diff --git a/keyringctl b/keyringctl index a731965..31263cc 100755 --- a/keyringctl +++ b/keyringctl @@ -7,6 +7,8 @@ from collections import defaultdict from collections.abc import Iterable from collections.abc import Iterator from contextlib import contextmanager +from datetime import datetime +from functools import reduce from itertools import chain from logging import DEBUG from logging import basicConfig @@ -242,18 +244,20 @@ def convert_certificate( # noqa: ignore=C901 # root packets certificate_fingerprint: Optional[Fingerprint] = None pubkey: Optional[Path] = None + # TODO: direct key certifications are not yet selecting the latest sig, owner may have multiple + # TODO: direct key certifications are not yet single packet per file direct_sigs: Dict[Fingerprint, List[Path]] = defaultdict(list) direct_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list) # subkey packets subkeys: Dict[Fingerprint, Path] = {} - subkey_bindings: Dict[Fingerprint, Path] = {} - subkey_revocations: Dict[Fingerprint, Path] = {} + subkey_bindings: Dict[Fingerprint, List[Path]] = defaultdict(list) + subkey_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list) # uid packets uids: Dict[Uid, Path] = {} - certifications: Dict[Uid, List[Path]] = defaultdict(list) - revocations: Dict[Uid, List[Path]] = defaultdict(list) + certifications: Dict[Uid, Dict[Fingerprint, List[Path]]] = defaultdict(lambda: defaultdict(list)) + revocations: Dict[Uid, Dict[Fingerprint, List[Path]]] = defaultdict(lambda: defaultdict(list)) # intermediate variables current_packet_mode: Optional[str] = None @@ -309,11 +313,11 @@ def convert_certificate( # noqa: ignore=C901 raise Exception('missing current packet uid for "{packet.name}"') if signature_type == "CertificationRevocation": - revocations[current_packet_uid].append(packet) + revocations[current_packet_uid][issuer].append(packet) elif signature_type.endswith("Certification"): if fingerprint_filter is not None and any([fp.endswith(issuer) for fp in fingerprint_filter]): debug(f"The certification by issuer {issuer} is appended as it is found in the filter.") - certifications[current_packet_uid].append(packet) + certifications[current_packet_uid][issuer].append(packet) else: debug(f"The certification by issuer {issuer} is not appended because it is not in the filter") else: @@ -323,9 +327,9 @@ def convert_certificate( # noqa: ignore=C901 raise Exception('missing current packet fingerprint for "{packet.name}"') if signature_type == "SubkeyBinding": - subkey_bindings[current_packet_fingerprint] = packet + subkey_bindings[current_packet_fingerprint].append(packet) elif signature_type == "SubkeyRevocation": - subkey_revocations[certificate_fingerprint] = packet + subkey_revocations[certificate_fingerprint].append(packet) else: raise Exception(f"unknown signature type: {signature_type}") else: @@ -462,7 +466,7 @@ def persist_subkeys( def persist_subkey_bindings( key_dir: Path, - subkey_bindings: Dict[Fingerprint, Path], + subkey_bindings: Dict[Fingerprint, List[Path]], ) -> None: """Persist all SubkeyBinding of a root key file to file(s) @@ -472,7 +476,8 @@ def persist_subkey_bindings( subkey_bindings: The SubkeyBinding signatures of a Public-Subkey """ - for fingerprint, subkey_binding in subkey_bindings.items(): + for fingerprint, bindings in subkey_bindings.items(): + subkey_binding = latest_certification(bindings) issuer = packet_dump_field(subkey_binding, "Issuer") output_file = key_dir / "subkey" / fingerprint / "certification" / f"{issuer}.asc" output_file.parent.mkdir(parents=True, exist_ok=True) @@ -482,7 +487,7 @@ def persist_subkey_bindings( def persist_subkey_revocations( key_dir: Path, - subkey_revocations: Dict[Fingerprint, Path], + subkey_revocations: Dict[Fingerprint, List[Path]], ) -> None: """Persist the SubkeyRevocations of all Public-Subkeys of a root key to file(s) @@ -492,7 +497,8 @@ def persist_subkey_revocations( subkey_revocations: The SubkeyRevocations of PublicSubkeys of a key """ - for fingerprint, revocation in subkey_revocations.items(): + for fingerprint, revocations in subkey_revocations.items(): + revocation = latest_certification(revocations) issuer = packet_dump_field(revocation, "Issuer") output_file = key_dir / "subkey" / fingerprint / "revocation" / f"{issuer}.asc" output_file.parent.mkdir(parents=True, exist_ok=True) @@ -540,7 +546,7 @@ def persist_direct_key_revocations( def persist_uid_certifications( - certifications: Dict[Uid, List[Path]], + certifications: Dict[Uid, Dict[Fingerprint, List[Path]]], key_dir: Path, ) -> None: """Persist the certifications of a root key to file(s) @@ -555,19 +561,18 @@ def persist_uid_certifications( key_dir: The root directory below which certifications are persisted """ - for key, current_certifications in certifications.items(): - for certification in current_certifications: - certification_dir = key_dir / "uid" / key / "certification" + for uid, uid_certifications in certifications.items(): + for issuer, issuer_certifications in uid_certifications.items(): + certification_dir = key_dir / "uid" / uid / "certification" certification_dir.mkdir(parents=True, exist_ok=True) - issuer = packet_dump_field(certification, "Issuer") - + certification = latest_certification(issuer_certifications) output_file = certification_dir / f"{issuer}.asc" debug(f"Writing file {output_file} from {certification}") packet_join(packets=[certification], output=output_file, force=True) def persist_uid_revocations( - revocations: Dict[Uid, List[Path]], + revocations: Dict[Uid, Dict[Fingerprint, List[Path]]], key_dir: Path, ) -> None: """Persist the revocations of a root key to file(s) @@ -581,12 +586,11 @@ def persist_uid_revocations( key_dir: The root directory below which revocations will be persisted """ - for key, current_revocations in revocations.items(): - for revocation in current_revocations: - revocation_dir = key_dir / "uid" / key / "revocation" + for uid, uid_revocations in revocations.items(): + for issuer, issuer_revocations in uid_revocations.items(): + revocation_dir = key_dir / "uid" / uid / "revocation" revocation_dir.mkdir(parents=True, exist_ok=True) - - issuer = packet_dump_field(revocation, "Issuer") + revocation = latest_certification(issuer_revocations) output_file = revocation_dir / f"{issuer}.asc" debug(f"Writing file {output_file} from {revocation}") packet_join(packets=[revocation], output=output_file, force=True) @@ -628,10 +632,41 @@ def packet_dump_field(packet: Path, field: str) -> str: dump = packet_dump(packet) lines = [line.strip() for line in dump.splitlines()] - lines = list(filter(lambda line: line.startswith(f"{field}: "), lines)) + lines = list(filter(lambda line: line.strip().startswith(f"{field}: "), lines)) if not lines: raise Exception(f'Packet has no field "{field}"') - return lines[0].split(maxsplit=1)[1] + return lines[0].split(sep=": ", maxsplit=1)[1] + + +def packet_signature_creation_time(packet: Path) -> datetime: + """Retrieve the signature creation time field as datetime + + Parameters + ---------- + packet: The path to the PGP packet to retrieve the value from + + Returns + ------- + The signature creation time as datetime + """ + return datetime.strptime(packet_dump_field(packet, "Signature creation time"), "%Y-%m-%d %H:%M:%S %Z") + + +def latest_certification(certifications: Iterable[Path]) -> Path: + """Returns the latest certification based on the signature creation time from a list of packets. + + Parameters + ---------- + certifications: List of certification from which to choose the latest from + + Returns + ------- + The latest certification from a list of packets + """ + return reduce( + lambda a, b: a if packet_signature_creation_time(a) > packet_signature_creation_time(b) else b, + certifications, + ) def keyring_split(working_dir: Path, keyring: Path, preserve_filename: bool = False) -> Iterable[Path]: