feature(keyringctl): support importing from a piped fd
This feature allows to import from a piped fd like: > ./keyringctl import --name foobar <(gpg --export foo@bar) We achieve this even with hidepid by taking the naive approach of copying the processes fd source to a tempfile and pass around latter.
This commit is contained in:
parent
2384d0337b
commit
26c7027660
@ -140,7 +140,7 @@ def main() -> None: # noqa: ignore=C901
|
|||||||
convert(
|
convert(
|
||||||
working_dir=working_dir,
|
working_dir=working_dir,
|
||||||
keyring_root=keyring_root,
|
keyring_root=keyring_root,
|
||||||
source=args.source,
|
sources=args.source,
|
||||||
target_dir=target_dir,
|
target_dir=target_dir,
|
||||||
name_override=args.name,
|
name_override=args.name,
|
||||||
)
|
)
|
||||||
@ -151,7 +151,7 @@ def main() -> None: # noqa: ignore=C901
|
|||||||
convert(
|
convert(
|
||||||
working_dir=working_dir,
|
working_dir=working_dir,
|
||||||
keyring_root=keyring_root,
|
keyring_root=keyring_root,
|
||||||
source=args.source,
|
sources=args.source,
|
||||||
target_dir=keyring_root / target_dir,
|
target_dir=keyring_root / target_dir,
|
||||||
name_override=args.name,
|
name_override=args.name,
|
||||||
)
|
)
|
||||||
|
@ -30,6 +30,7 @@ from .types import Fingerprint
|
|||||||
from .types import Uid
|
from .types import Uid
|
||||||
from .types import Username
|
from .types import Username
|
||||||
from .util import system
|
from .util import system
|
||||||
|
from .util import transform_fd_to_tmpfile
|
||||||
|
|
||||||
|
|
||||||
def is_pgp_fingerprint(string: str) -> bool:
|
def is_pgp_fingerprint(string: str) -> bool:
|
||||||
@ -578,7 +579,7 @@ def derive_username_from_fingerprint(keyring_dir: Path, certificate_fingerprint:
|
|||||||
def convert(
|
def convert(
|
||||||
working_dir: Path,
|
working_dir: Path,
|
||||||
keyring_root: Path,
|
keyring_root: Path,
|
||||||
source: Iterable[Path],
|
sources: List[Path],
|
||||||
target_dir: Path,
|
target_dir: Path,
|
||||||
name_override: Optional[Username] = None,
|
name_override: Optional[Username] = None,
|
||||||
) -> Path:
|
) -> Path:
|
||||||
@ -590,7 +591,7 @@ def convert(
|
|||||||
----------
|
----------
|
||||||
working_dir: A directory to use for temporary files
|
working_dir: A directory to use for temporary files
|
||||||
keyring_root: The keyring root directory to look up accepted fingerprints for certifications
|
keyring_root: The keyring root directory to look up accepted fingerprints for certifications
|
||||||
source: A path to a file or directory to decompose
|
sources: A path to a file or directory to decompose
|
||||||
target_dir: A directory path to write the new directory structure to
|
target_dir: A directory path to write the new directory structure to
|
||||||
name_override: An optional username override for the call to `convert_certificate()`
|
name_override: An optional username override for the call to `convert_certificate()`
|
||||||
|
|
||||||
@ -600,12 +601,13 @@ def convert(
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
directories: List[Path] = []
|
directories: List[Path] = []
|
||||||
keys: Iterable[Path] = set(chain.from_iterable(map(lambda s: s.iterdir() if s.is_dir() else [s], source)))
|
transform_fd_to_tmpfile(working_dir=working_dir, sources=sources)
|
||||||
|
keys: Iterable[Path] = set(chain.from_iterable(map(lambda s: s.iterdir() if s.is_dir() else [s], sources)))
|
||||||
|
|
||||||
fingerprint_filter = set(
|
fingerprint_filter = set(
|
||||||
get_fingerprints(
|
get_fingerprints(
|
||||||
working_dir=working_dir,
|
working_dir=working_dir,
|
||||||
sources=source,
|
sources=sources,
|
||||||
paths=[keyring_root] if keyring_root.exists() else [],
|
paths=[keyring_root] if keyring_root.exists() else [],
|
||||||
).keys()
|
).keys()
|
||||||
)
|
)
|
||||||
|
@ -12,6 +12,7 @@ from subprocess import CalledProcessError
|
|||||||
from subprocess import check_output
|
from subprocess import check_output
|
||||||
from sys import exit
|
from sys import exit
|
||||||
from sys import stderr
|
from sys import stderr
|
||||||
|
from tempfile import mkstemp
|
||||||
from traceback import print_stack
|
from traceback import print_stack
|
||||||
from typing import IO
|
from typing import IO
|
||||||
from typing import AnyStr
|
from typing import AnyStr
|
||||||
@ -121,3 +122,23 @@ def absolute_path(path: str) -> Path:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
return Path(path).absolute()
|
return Path(path).absolute()
|
||||||
|
|
||||||
|
|
||||||
|
def transform_fd_to_tmpfile(working_dir: Path, sources: List[Path]) -> None:
|
||||||
|
"""Transforms an input list of paths from any file descriptor of the current process to a tempfile in working_dir.
|
||||||
|
|
||||||
|
Using this function on fd inputs allow to pass the content to another process while hidepid is active and /proc
|
||||||
|
not visible for the other process.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
working_dir: A directory to use for temporary files
|
||||||
|
sources: Paths that should be iterated and all fd's transformed to tmpfiles
|
||||||
|
"""
|
||||||
|
for index, source in enumerate(sources):
|
||||||
|
if str(source).startswith("/proc/self/fd"):
|
||||||
|
file = mkstemp(dir=working_dir, prefix=f"{source.name}", suffix=".fd")[1]
|
||||||
|
with open(file, mode="wb") as f:
|
||||||
|
f.write(source.read_bytes())
|
||||||
|
f.flush()
|
||||||
|
sources[index] = Path(file)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user