chore(keyringctl): simplify convert usage by moving fingerprint filter
This commit is contained in:
parent
fc9ad848ab
commit
619f2a3b68
61
keyringctl
61
keyringctl
@ -854,10 +854,10 @@ def derive_username_from_fingerprint(keyring_dir: Path, certificate_fingerprint:
|
|||||||
|
|
||||||
def convert(
|
def convert(
|
||||||
working_dir: Path,
|
working_dir: Path,
|
||||||
|
keyring_root: Path,
|
||||||
source: Iterable[Path],
|
source: Iterable[Path],
|
||||||
target_dir: Path,
|
target_dir: Path,
|
||||||
name_override: Optional[Username] = None,
|
name_override: Optional[Username] = None,
|
||||||
fingerprint_filter: Optional[Set[Fingerprint]] = None,
|
|
||||||
) -> Path:
|
) -> Path:
|
||||||
"""Convert a path containing PGP certificate material to a decomposed directory structure
|
"""Convert a path containing PGP certificate material to a decomposed directory structure
|
||||||
|
|
||||||
@ -867,14 +867,14 @@ def convert(
|
|||||||
----------
|
----------
|
||||||
working_dir: Path
|
working_dir: Path
|
||||||
A directory to use for temporary files
|
A directory to use for temporary files
|
||||||
|
keyring_root: Path
|
||||||
|
The keyring root directory to look up accepted fingerprints for certifications
|
||||||
source: Iterable[Path]
|
source: Iterable[Path]
|
||||||
A path to a file or directory to decompose
|
A path to a file or directory to decompose
|
||||||
target_dir: Path
|
target_dir: Path
|
||||||
A directory path to write the new directory structure to
|
A directory path to write the new directory structure to
|
||||||
name_override: Optional[Username]
|
name_override: Optional[Username]
|
||||||
An optional username override for the call to `convert_certificate()`
|
An optional username override for the call to `convert_certificate()`
|
||||||
fingerprint_filter: Optional[Set[Fingerprint]]
|
|
||||||
An optional set of strings defining fingerprints of PGP public keys that all certificates will be filtered with
|
|
||||||
|
|
||||||
Returns
|
Returns
|
||||||
-------
|
-------
|
||||||
@ -885,6 +885,14 @@ def convert(
|
|||||||
directories: List[Path] = []
|
directories: List[Path] = []
|
||||||
keys: Iterable[Path] = set(chain.from_iterable(map(lambda s: s.iterdir() if s.is_dir() else [s], source)))
|
keys: Iterable[Path] = set(chain.from_iterable(map(lambda s: s.iterdir() if s.is_dir() else [s], source)))
|
||||||
|
|
||||||
|
fingerprint_filter = set(
|
||||||
|
get_fingerprints(
|
||||||
|
working_dir=working_dir,
|
||||||
|
sources=source,
|
||||||
|
paths=[keyring_root],
|
||||||
|
).keys()
|
||||||
|
)
|
||||||
|
|
||||||
for key in keys:
|
for key in keys:
|
||||||
for cert in keyring_split(working_dir=working_dir, keyring=key, preserve_filename=True):
|
for cert in keyring_split(working_dir=working_dir, keyring=key, preserve_filename=True):
|
||||||
directories.append(
|
directories.append(
|
||||||
@ -1003,8 +1011,7 @@ def export_revoked(certs: List[Path], main_keys: List[Fingerprint], output: Path
|
|||||||
# where some UIDs are signed and others are revoked
|
# where some UIDs are signed and others are revoked
|
||||||
if len(foreign_revocations[fingerprint]) >= min_revoker:
|
if len(foreign_revocations[fingerprint]) >= min_revoker:
|
||||||
debug(
|
debug(
|
||||||
f"Revoking {cert_dir.name} due to {set(foreign_revocations[fingerprint])} "
|
f"Revoking {cert_dir.name} due to {set(foreign_revocations[fingerprint])} " "being main key revocations"
|
||||||
"being main key revocations"
|
|
||||||
)
|
)
|
||||||
revoked_certs.append(fingerprint)
|
revoked_certs.append(fingerprint)
|
||||||
|
|
||||||
@ -1014,14 +1021,14 @@ def export_revoked(certs: List[Path], main_keys: List[Fingerprint], output: Path
|
|||||||
trusted_certs_file.write(f"{cert}\n")
|
trusted_certs_file.write(f"{cert}\n")
|
||||||
|
|
||||||
|
|
||||||
def get_fingerprints_from_import_source(working_dir: Path, source: List[Path]) -> Dict[Fingerprint, Username]:
|
def get_fingerprints_from_keyring_files(working_dir: Path, source: Iterable[Path]) -> Dict[Fingerprint, Username]:
|
||||||
"""Get all fingerprints of PGP public keys from import file(s)
|
"""Get all fingerprints of PGP public keys from import file(s)
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
working_dir: Path
|
working_dir: Path
|
||||||
A directory to use for temporary files
|
A directory to use for temporary files
|
||||||
source: List[Path]
|
source: Interable[Path]
|
||||||
The path to a source file or directory containing keyrings
|
The path to a source file or directory containing keyrings
|
||||||
|
|
||||||
Returns
|
Returns
|
||||||
@ -1043,12 +1050,12 @@ def get_fingerprints_from_import_source(working_dir: Path, source: List[Path]) -
|
|||||||
return fingerprints
|
return fingerprints
|
||||||
|
|
||||||
|
|
||||||
def get_fingerprints_from_decomposed_dir(path: Path) -> Dict[Fingerprint, Username]:
|
def get_fingerprints_from_certificate_directory(paths: List[Path]) -> Dict[Fingerprint, Username]:
|
||||||
"""Get all fingerprints of PGP public keys from a decomposed directory structure
|
"""Get all fingerprints of PGP public keys from decomposed directory structures
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
path: Path
|
paths: List[Path]
|
||||||
The path to a decomposed directory structure
|
The path to a decomposed directory structure
|
||||||
|
|
||||||
Returns
|
Returns
|
||||||
@ -1058,14 +1065,14 @@ def get_fingerprints_from_decomposed_dir(path: Path) -> Dict[Fingerprint, Userna
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
fingerprints: Dict[Fingerprint, Username] = {}
|
fingerprints: Dict[Fingerprint, Username] = {}
|
||||||
for cert in sorted(get_cert_paths([path])):
|
for cert in sorted(get_cert_paths(paths)):
|
||||||
fingerprints[Fingerprint(cert.name)] = Username(cert.parent.name)
|
fingerprints[Fingerprint(cert.name)] = Username(cert.parent.name)
|
||||||
|
|
||||||
debug(f"Fingerprints of PGP public keys in {path}: {fingerprints}")
|
debug(f"Fingerprints of PGP public keys in {paths}: {fingerprints}")
|
||||||
return fingerprints
|
return fingerprints
|
||||||
|
|
||||||
|
|
||||||
def get_fingerprints(working_dir: Path, decomposed_paths: List[Path]) -> Dict[Fingerprint, Username]:
|
def get_fingerprints(working_dir: Path, sources: Iterable[Path], paths: List[Path]) -> Dict[Fingerprint, Username]:
|
||||||
"""Get the fingerprints of PGP public keys from input paths and decomposed directory structures
|
"""Get the fingerprints of PGP public keys from input paths and decomposed directory structures
|
||||||
|
|
||||||
|
|
||||||
@ -1073,7 +1080,9 @@ def get_fingerprints(working_dir: Path, decomposed_paths: List[Path]) -> Dict[Fi
|
|||||||
----------
|
----------
|
||||||
working_dir: Path
|
working_dir: Path
|
||||||
A directory to use for temporary files
|
A directory to use for temporary files
|
||||||
decomposed_paths: List[Path]
|
sources: Iterable[Path]
|
||||||
|
A list of directories or files from which to read PGP keyring information
|
||||||
|
paths: List[Path]
|
||||||
A list of paths that identify decomposed PGP data in directory structures
|
A list of paths that identify decomposed PGP data in directory structures
|
||||||
|
|
||||||
Returns
|
Returns
|
||||||
@ -1085,14 +1094,13 @@ def get_fingerprints(working_dir: Path, decomposed_paths: List[Path]) -> Dict[Fi
|
|||||||
fingerprints: Dict[Fingerprint, Username] = {}
|
fingerprints: Dict[Fingerprint, Username] = {}
|
||||||
|
|
||||||
fingerprints.update(
|
fingerprints.update(
|
||||||
get_fingerprints_from_import_source(
|
get_fingerprints_from_keyring_files(
|
||||||
working_dir=working_dir,
|
working_dir=working_dir,
|
||||||
source=args.source,
|
source=sources,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
for decomposed_path in decomposed_paths:
|
fingerprints.update(get_fingerprints_from_certificate_directory(paths=paths))
|
||||||
fingerprints.update(get_fingerprints_from_decomposed_dir(path=decomposed_path))
|
|
||||||
|
|
||||||
return fingerprints
|
return fingerprints
|
||||||
|
|
||||||
@ -1249,7 +1257,7 @@ def inspect_keyring(working_dir: Path, keyring_root: Path, sources: Optional[Lis
|
|||||||
export(working_dir=working_dir, keyring_root=keyring_root, sources=sources, output=keyring)
|
export(working_dir=working_dir, keyring_root=keyring_root, sources=sources, output=keyring)
|
||||||
|
|
||||||
return inspect(
|
return inspect(
|
||||||
packet=keyring, certifications=True, fingerprints=get_fingerprints_from_decomposed_dir(path=keyring_root)
|
packet=keyring, certifications=True, fingerprints=get_fingerprints_from_certificate_directory(paths=[keyring_root])
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -1364,19 +1372,24 @@ if __name__ == "__main__":
|
|||||||
with cwd(working_dir):
|
with cwd(working_dir):
|
||||||
if "convert" == args.subcommand:
|
if "convert" == args.subcommand:
|
||||||
target_dir = args.target or Path(mkdtemp(prefix="arch-keyringctl-")).absolute()
|
target_dir = args.target or Path(mkdtemp(prefix="arch-keyringctl-")).absolute()
|
||||||
print(convert(working_dir, args.source, target_dir=target_dir))
|
print(
|
||||||
|
convert(
|
||||||
|
working_dir=working_dir,
|
||||||
|
keyring_root=keyring_root,
|
||||||
|
source=args.source,
|
||||||
|
target_dir=target_dir,
|
||||||
|
name_override=args.name,
|
||||||
|
)
|
||||||
|
)
|
||||||
elif "import" == args.subcommand:
|
elif "import" == args.subcommand:
|
||||||
target_dir = "main" if args.main else "packager"
|
target_dir = "main" if args.main else "packager"
|
||||||
print(
|
print(
|
||||||
convert(
|
convert(
|
||||||
working_dir=working_dir,
|
working_dir=working_dir,
|
||||||
|
keyring_root=keyring_root,
|
||||||
source=args.source,
|
source=args.source,
|
||||||
target_dir=keyring_root / target_dir,
|
target_dir=keyring_root / target_dir,
|
||||||
name_override=args.name,
|
name_override=args.name,
|
||||||
fingerprint_filter=set(get_fingerprints(
|
|
||||||
working_dir=working_dir,
|
|
||||||
decomposed_paths=[keyring_root / "main", keyring_root / "packager"],
|
|
||||||
).keys()),
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
elif "export" == args.subcommand:
|
elif "export" == args.subcommand:
|
||||||
|
Loading…
Reference in New Issue
Block a user