From 49ff2df1fc96c5f5855ab7fc516c5b979a8dbb72 Mon Sep 17 00:00:00 2001 From: David Runge Date: Mon, 18 Oct 2021 12:47:12 +0200 Subject: [PATCH] keyringctl: Implement filtering of certifications keyringctl: Add `get_fingerprints_from_import_source()` to derive all fingerprints of PGP public keys found in the import source. Add `get_fingerprints_from_decomposed_dir()` to derive all fingerprints of PGP public keys found in a directory structure holding decomposed PGP packet data. Add `get_fingerprints()` to derive a set of fingerprints of PGP public keys provided through `get_fingerprints_from_import_source()` and `get_fingerprints_from_decomposed_dir()`. Change `convert()` and `convert_certificate()` to accept an optional set of strings (`fingerprint_filter`) that may be used as a filter for valid fingerprints when considering certifications. Change `__main__` to call `convert()` when importing keys to packager or main dir, providing `fingerprint_filter` which will attempt to look up fingerprints in the source as well as the target. --- keyringctl | 116 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 113 insertions(+), 3 deletions(-) diff --git a/keyringctl b/keyringctl index 51a3d4c..51e2001 100755 --- a/keyringctl +++ b/keyringctl @@ -13,7 +13,7 @@ from shutil import copytree from subprocess import PIPE, CalledProcessError, check_output from sys import exit, stderr from tempfile import TemporaryDirectory, mkdtemp -from typing import Dict, Iterable, Iterator, List, Optional, Tuple, Union +from typing import Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union @contextmanager @@ -118,6 +118,7 @@ def convert_certificate( # noqa: ignore=C901 working_dir: Path, certificate: Path, name_override: Optional[str] = None, + fingerprint_filter: Optional[Set[str]] = None, ) -> Path: """Convert a single file public key certificate into a decomposed directory structure of multiple PGP packets @@ -133,6 +134,9 @@ def convert_certificate( # noqa: ignore=C901 The path to a public key certificate name_override: Optional[str] An optional string to override the username in the to be created output directory structure + fingerprint_filter: Optional[Set[str]] + An optional list of strings defining fingerprints of PGP public keys that all certificates will be filtered + with Raises ------ @@ -251,7 +255,11 @@ def convert_certificate( # noqa: ignore=C901 elif signature_type == "PositiveCertification" and certificate_fingerprint.endswith(issuer): uid_binding_sigs[current_packet_key] = packet elif signature_type.endswith("Certification"): - certifications[current_packet_key].append(packet) + if fingerprint_filter is not None and any([fp.endswith(issuer) for fp in fingerprint_filter]): + debug(f"The certification by issuer {issuer} is appended as it is found in the filter.") + certifications[current_packet_key].append(packet) + else: + debug(f"The certification by issuer {issuer} is not appended because it is not in the filter") else: raise Exception(f"unknown signature type: {signature_type}") elif current_packet_mode == "subkey": @@ -782,6 +790,7 @@ def convert( source: Path, target_dir: Path, name_override: Optional[str] = None, + fingerprint_filter: Optional[Set[str]] = None, ) -> Path: """Convert a path containing PGP certificate material to a decomposed directory structure @@ -797,6 +806,8 @@ def convert( A directory path to write the new directory structure to name_override: Optional[str] An optional username override for the call to `convert_certificate()` + fingerprint_filter: Optional[Set[str]] + An optional set of strings defining fingerprints of PGP public keys that all certificates will be filtered with Returns ------- @@ -814,7 +825,14 @@ def convert( or derive_user_from_target(working_dir=working_dir, target_dir=target_dir, certificate=cert) or key.stem ) - directories.append(convert_certificate(working_dir=working_dir, certificate=cert, name_override=name)) + directories.append( + convert_certificate( + working_dir=working_dir, + certificate=cert, + name_override=name, + fingerprint_filter=fingerprint_filter, + ) + ) for path in directories: (target_dir / path.name).mkdir(parents=True, exist_ok=True) @@ -976,6 +994,88 @@ def export_revoked(certs: List[Path], main_keys: List[str], output: Path, min_re trusted_certs_file.write(f"{cert}\n") +def get_fingerprints_from_import_source(working_dir: Path, source: Path) -> List[str]: + """Get all fingerprints of PGP public keys from import file(s) + + Parameters + ---------- + working_dir: Path + A directory to use for temporary files + source: Path + The path to a source file or directory + + Returns + ------- + List[str] + A list of strings representing the fingerprints of PGP public keys found in source + """ + + fingerprints: List[str] = [] + keys: List[Path] = list(source.iterdir()) if source.is_dir() else [source] + + for key in keys: + for certificate in keyring_split(working_dir=working_dir, keyring=key): + for packet in packet_split(working_dir=working_dir, certificate=certificate): + if packet.name.endswith("--PublicKey"): + fingerprints += [packet_dump_field(packet, "Fingerprint")] + + debug(f"Fingerprints of PGP public keys in {source}: {fingerprints}") + return fingerprints + + +def get_fingerprints_from_decomposed_dir(path: Path) -> List[str]: + """Get all fingerprints of PGP public keys from a decomposed directory structure + + Parameters + ---------- + path: Path + The path to a decomposed directory structure + + Returns + ------- + List[str] + A list of strings representing all fingerprints of PGP public keys below path + """ + + fingerprints = [path.stem for path in list(path.absolute().glob("*/*"))] + debug(f"Fingerprints of PGP public keys in {path}: {fingerprints}") + return fingerprints + + +def get_fingerprints(working_dir: Path, input_path: Path, decomposed_paths: List[Path]) -> Set[str]: + """Get the fingerprints of PGP public keys from input paths and decomposed directory structures + + + Parameters + ---------- + working_dir: Path + A directory to use for temporary files + input_path: Path + The path to a source file or directory + decomposed_paths: List[Path] + A list of paths that identify decomposed PGP data in directory structures + + Returns + ------- + Set[str] + A set of strings describing fingerprints of PGP public keys + """ + + fingerprints: Set[str] = set() + + fingerprints.update( + get_fingerprints_from_import_source( + working_dir=working_dir, + source=args.source, + ) + ) + + for decomposed_path in decomposed_paths: + fingerprints.update(get_fingerprints_from_decomposed_dir(path=decomposed_path)) + + return fingerprints + + def export_keyring( working_dir: Path, main: List[Path], @@ -1155,6 +1255,11 @@ if __name__ == "__main__": source=args.source, target_dir=keyring_root / "main", name_override=args.name, + fingerprint_filter=get_fingerprints( + working_dir=working_dir, + input_path=args.source, + decomposed_paths=[keyring_root / "main", keyring_root / "packagers"], + ), ) ) elif "import-packager" == args.subcommand: @@ -1164,6 +1269,11 @@ if __name__ == "__main__": source=args.source, target_dir=keyring_root / "packagers", name_override=args.name, + fingerprint_filter=get_fingerprints( + working_dir=working_dir, + input_path=args.source, + decomposed_paths=[keyring_root / "main", keyring_root / "packagers"], + ), ) ) elif "export-keyring" == args.subcommand: