diff --git a/libkeyringctl/keyring.py b/libkeyringctl/keyring.py index 8b0b740..64aafb9 100644 --- a/libkeyringctl/keyring.py +++ b/libkeyringctl/keyring.py @@ -21,6 +21,7 @@ from .sequoia import keyring_split from .sequoia import latest_certification from .sequoia import packet_dump_field from .sequoia import packet_join +from .sequoia import packet_signature_creation_time from .sequoia import packet_split from .trust import certificate_trust from .trust import certificate_trust_from_paths @@ -29,12 +30,15 @@ from .types import Fingerprint from .types import Trust from .types import Uid from .types import Username +from .util import contains_fingerprint from .util import filter_fingerprints_by_trust from .util import get_cert_paths from .util import get_fingerprint_from_partial from .util import simplify_ascii from .util import transform_fd_to_tmpfile +PACKET_FILENAME_DATETIME_FORMAT: str = "%Y-%m-%d_%H-%M-%S" + def is_pgp_fingerprint(string: str) -> bool: """Returns whether the passed string looks like a PGP (long) fingerprint @@ -93,6 +97,7 @@ def convert_pubkey_signature_packet( certificate_fingerprint: Fingerprint, fingerprint_filter: Optional[Set[Fingerprint]], current_packet_fingerprint: Optional[Fingerprint], + key_revocations: Dict[Fingerprint, Path], direct_revocations: Dict[Fingerprint, List[Path]], direct_sigs: Dict[Fingerprint, List[Path]], ) -> None: @@ -102,6 +107,7 @@ def convert_pubkey_signature_packet( certificate_fingerprint: The public key certificate fingerprint fingerprint_filter: Optional list of fingerprints of PGP public keys that all certifications will be filtered with current_packet_fingerprint: Optional certificate fingerprint of the current packet + key_revocation: A dictionary of key revocation packets direct_revocations: A dictionary of direct key revocations direct_sigs: A dictionary of direct key signatures """ @@ -114,13 +120,20 @@ def convert_pubkey_signature_packet( if not issuer: debug(f"failed to resolve partial fingerprint {issuer}, skipping packet") + return + + if not certificate_fingerprint.endswith(issuer): + debug(f"skipping direct key signature because {issuer} is a third-party and not a self signature") + return + + if signature_type == "KeyRevocation": + key_revocations[issuer] = packet + elif signature_type == "DirectKey" or signature_type.endswith("Certification"): + direct_sigs[issuer].append(packet) + elif signature_type == "CertificationRevocation": + direct_revocations[issuer].append(packet) else: - if signature_type == "KeyRevocation" and certificate_fingerprint.endswith(issuer): - direct_revocations[issuer].append(packet) - elif signature_type in ["DirectKey", "GenericCertification"]: - direct_sigs[issuer].append(packet) - else: - raise Exception(f"unknown signature type: {signature_type}") + raise Exception(f"unknown signature type: {signature_type}") def convert_uid_signature_packet( @@ -149,11 +162,12 @@ def convert_uid_signature_packet( debug(f"failed to resolve partial fingerprint {issuer}, skipping packet") else: if signature_type == "CertificationRevocation": - revocations[current_packet_uid][issuer].append(packet) + if fingerprint_filter is None or contains_fingerprint(fingerprints=fingerprint_filter, fingerprint=issuer): + revocations[current_packet_uid][issuer].append(packet) + else: + debug(f"The revocation by issuer {issuer} is not appended because it is not in the filter") elif signature_type.endswith("Certification"): - # TODO: extend fp filter to all certifications - # TODO: use contains_fingerprint - if fingerprint_filter is None or any([fp.endswith(issuer) for fp in fingerprint_filter]): + if fingerprint_filter is None or contains_fingerprint(fingerprints=fingerprint_filter, fingerprint=issuer): debug(f"The certification by issuer {issuer} is appended as it is found in the filter.") certifications[current_packet_uid][issuer].append(packet) else: @@ -205,6 +219,7 @@ def convert_signature_packet( current_packet_mode: Optional[str], certificate_fingerprint: Optional[Fingerprint], fingerprint_filter: Optional[Set[Fingerprint]], + key_revocations: Dict[Fingerprint, Path], current_packet_fingerprint: Optional[Fingerprint], current_packet_uid: Optional[Uid], direct_revocations: Dict[Fingerprint, List[Path]], @@ -218,11 +233,12 @@ def convert_signature_packet( packet: The Path of the packet file to process certificate_fingerprint: The public key certificate fingerprint + fingerprint_filter: Optional list of fingerprints of PGP public keys that all certifications will be filtered with + key_revocation: A dictionary containing all key revocation packet current_packet_fingerprint: Optional certificate fingerprint of the current packet current_packet_uid: Optional Uid of the current packet direct_revocations: A dictionary of direct key revocations direct_sigs: A dictionary of direct key signatures - fingerprint_filter: Optional list of fingerprints of PGP public keys that all certifications will be filtered with certifications: A dictionary containing all certificaions revocations: A dictionary containing all revocations subkey_bindings: A dictionary containing all subkey binding signatures @@ -238,6 +254,7 @@ def convert_signature_packet( certificate_fingerprint=certificate_fingerprint, fingerprint_filter=fingerprint_filter, current_packet_fingerprint=current_packet_fingerprint, + key_revocations=key_revocations, direct_revocations=direct_revocations, direct_sigs=direct_sigs, ) @@ -299,8 +316,7 @@ def convert_certificate( # root packets certificate_fingerprint: Optional[Fingerprint] = None pubkey: Optional[Path] = None - # TODO: direct key certifications are not yet selecting the latest sig, owner may have multiple - # TODO: direct key certifications are not yet single packet per file + key_revocations: Dict[Fingerprint, Path] = {} direct_sigs: Dict[Fingerprint, List[Path]] = defaultdict(list) direct_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list) @@ -319,10 +335,6 @@ def convert_certificate( current_packet_fingerprint: Optional[Fingerprint] = None current_packet_uid: Optional[Uid] = None - # XXX: PrimaryKeyBinding - - # TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean - debug(f"Processing certificate {certificate}") for packet in packet_split(working_dir=working_dir, certificate=certificate): @@ -366,6 +378,7 @@ def convert_certificate( fingerprint_filter=fingerprint_filter, current_packet_fingerprint=current_packet_fingerprint, current_packet_uid=current_packet_uid, + key_revocations=key_revocations, direct_revocations=direct_revocations, direct_sigs=direct_sigs, certifications=certifications, @@ -395,6 +408,7 @@ def convert_certificate( direct_revocations=direct_revocations, certificate_fingerprint=certificate_fingerprint, pubkey=pubkey, + key_revocations=key_revocations, subkeys=subkeys, subkey_bindings=subkey_bindings, subkey_revocations=subkey_revocations, @@ -412,6 +426,7 @@ def persist_key_material( direct_revocations: Dict[Fingerprint, List[Path]], certificate_fingerprint: Fingerprint, pubkey: Path, + key_revocations: Dict[Fingerprint, Path], subkeys: Dict[Fingerprint, Path], subkey_bindings: Dict[Fingerprint, List[Path]], subkey_revocations: Dict[Fingerprint, List[Path]], @@ -426,6 +441,7 @@ def persist_key_material( direct_revocations: A dictionary of direct key revocations certificate_fingerprint: The public key certificate fingerprint pubkey: The Path of the PGP packet representing the public key material + key_revocations: A dictionary containing all key revocations subkeys: A dictionary of Paths per Fingerprint that represent the subkey material of the certificate subkey_bindings: A dictionary containing all subkey binding signatures subkey_revocations: A dictionary containing all subkey revocations @@ -440,6 +456,11 @@ def persist_key_material( key_dir=key_dir, ) + persist_key_revocations( + key_dir=key_dir, + key_revocations=key_revocations, + ) + persist_direct_key_certifications( direct_key_certifications=direct_sigs, key_dir=key_dir, @@ -589,6 +610,26 @@ def persist_subkey_revocations( packet_join(packets=[revocation], output=output_file, force=True) +def persist_key_revocations( + key_revocations: Dict[Fingerprint, Path], + key_dir: Path, +) -> None: + """Persist the key revocation + + Parameters + ---------- + key_revocations: Dictionary with key revocation + key_dir: The root directory below which the revocation is persisted + """ + + for issuer, revocation in key_revocations.items(): + output_file = key_dir / "revocation" / f"{issuer}.asc" + output_file.parent.mkdir(parents=True, exist_ok=True) + + debug(f"Writing file {output_file} from {str(revocation)}") + packet_join(packets=[revocation], output=output_file, force=True) + + def persist_direct_key_certifications( direct_key_certifications: Dict[Fingerprint, List[Path]], key_dir: Path, @@ -603,10 +644,14 @@ def persist_direct_key_certifications( """ for issuer, certifications in direct_key_certifications.items(): - output_file = key_dir / "certification" / f"{issuer}.asc" - output_file.parent.mkdir(parents=True, exist_ok=True) - debug(f"Writing file {output_file} from {[str(cert) for cert in certifications]}") - packet_join(packets=certifications, output=output_file, force=True) + output_dir = key_dir / "directkey" / "certification" / issuer + output_dir.mkdir(parents=True, exist_ok=True) + + for certification in certifications: + creation_time = packet_signature_creation_time(certification).strftime(PACKET_FILENAME_DATETIME_FORMAT) + output_file = output_dir / f"{creation_time}.asc" + debug(f"Writing file {output_file} from {str(certification)}") + packet_join(packets=[certification], output=output_file, force=True) def persist_direct_key_revocations( @@ -622,10 +667,14 @@ def persist_direct_key_revocations( """ for issuer, certifications in direct_key_revocations.items(): - output_file = key_dir / "revocation" / f"{issuer}.asc" - output_file.parent.mkdir(parents=True, exist_ok=True) - debug(f"Writing file {output_file} from {[str(cert) for cert in certifications]}") - packet_join(packets=certifications, output=output_file, force=True) + output_dir = key_dir / "directkey" / "revocation" / issuer + output_dir.mkdir(parents=True, exist_ok=True) + + for certification in certifications: + creation_time = packet_signature_creation_time(certification).strftime(PACKET_FILENAME_DATETIME_FORMAT) + output_file = output_dir / f"{creation_time}.asc" + debug(f"Writing file {output_file} from {str(certification)}") + packet_join(packets=[certification], output=output_file, force=True) def persist_uid_certifications( @@ -937,7 +986,7 @@ def get_packets_from_path(path: Path) -> List[Path]: def get_packets_from_listing(path: Path) -> List[Path]: - """Collects packets from a listing of directories holding one level each by calling `get_get_packets_from_path`. + """Collects packets from a listing of directories holding one level each by calling `get_packets_from_path`. Parameters ---------- @@ -996,6 +1045,11 @@ def export( packets += get_packets_from_listing(cert_dir / "subkey") packets += get_packets_from_listing(cert_dir / "uid") + directkey_path = cert_dir / "directkey" + directkeys = directkey_path.iterdir() if directkey_path.exists() else [] + for path in directkeys: + packets += get_packets_from_listing(path) + output_path = temp_dir / f"{cert_dir.name}.asc" debug(f"Joining {cert_dir} in {output_path}") packet_join( diff --git a/libkeyringctl/sequoia.py b/libkeyringctl/sequoia.py index f53215d..4b1bc12 100644 --- a/libkeyringctl/sequoia.py +++ b/libkeyringctl/sequoia.py @@ -206,7 +206,9 @@ def packet_signature_creation_time(packet: Path) -> datetime: ------- The signature creation time as datetime """ - return datetime.strptime(packet_dump_field(packet, "Signature creation time"), "%Y-%m-%d %H:%M:%S %Z") + field = packet_dump_field(packet, "Signature creation time") + field = " ".join(field.split(" ", 3)[0:3]) + return datetime.strptime(field, "%Y-%m-%d %H:%M:%S %Z") def packet_kinds(packet: Path) -> List[PacketKind]: diff --git a/libkeyringctl/verify.py b/libkeyringctl/verify.py index fc2ac1f..d4ecbdc 100644 --- a/libkeyringctl/verify.py +++ b/libkeyringctl/verify.py @@ -15,7 +15,6 @@ from libkeyringctl.keyring import transform_username_to_keyring_path from libkeyringctl.sequoia import packet_dump_field from libkeyringctl.sequoia import packet_kinds from libkeyringctl.types import Fingerprint -from libkeyringctl.types import PacketKind from libkeyringctl.types import Uid from libkeyringctl.util import get_cert_paths from libkeyringctl.util import get_fingerprint_from_partial @@ -91,60 +90,26 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N if path.is_file(): if path.name != f"{certificate.name}.asc": raise Exception(f"Unexpected file in certificate {certificate.name}: {str(path)}") - kinds: List[PacketKind] = packet_kinds(packet=path) - if not kinds or len(kinds) > 1: - raise Exception(f"Unexpected amount of packets in file {str(path)}: {kinds}") - kind = kinds[0] - if kind != "Public-Key": - raise Exception(f"Unexpected packet in file {str(path)}: {kind}") - fingerprint = packet_dump_field(packet=path, field="Fingerprint") - if fingerprint != certificate.name: - raise Exception(f"Unexpected packet fingerprint in file {str(path)}: {fingerprint}") + assert_packet_kind(path=path, expected="Public-Key") + assert_filename_matches_packet_fingerprint(path=path, check=certificate.name) debug(f"OK: {path}") elif path.is_dir(): - # TODO: check direct key types, multiple - if "certification" == path.name: - for sig in path.iterdir(): - if not sig.is_file(): - raise Exception(f"Unexpected file type in certificate {certificate.name}: {str(sig)}") - if not is_pgp_fingerprint(sig.stem): - raise Exception(f"Unexpected file name in certificate {certificate.name}: {str(sig)}") - if sig.suffix != ".asc": - raise Exception(f"Unexpected file suffix in certificate {certificate.name}: {str(sig)}") - kinds = packet_kinds(packet=sig) - if not kinds: - raise Exception(f"Unexpected amount of packets in file {str(sig)}: {kinds}") - if any(filter(lambda kind: not kind == "Signature", kinds)): - raise Exception(f"Unexpected packet in file {str(sig)}: {kinds}") - debug(f"OK: {path}") - elif "revocation" == path.name: - for sig in path.iterdir(): - if not sig.is_file(): - raise Exception(f"Unexpected file type in certificate {certificate.name}: {str(sig)}") - if not is_pgp_fingerprint(sig.stem): - raise Exception(f"Unexpected file name in certificate {certificate.name}: {str(sig)}") - if sig.suffix != ".asc": - raise Exception(f"Unexpected file suffix in certificate {certificate.name}: {str(sig)}") - kinds = packet_kinds(packet=sig) - if not kinds or len(kinds) > 1: - raise Exception(f"Unexpected amount of packets in file {str(sig)}: {kinds}") - kind = kinds[0] - if kind != "Signature": - raise Exception(f"Unexpected packet in file {str(sig)}: {kind}") - fingerprint = packet_dump_field(packet=sig, field="Issuer Fingerprint") - if not fingerprint == sig.stem: - raise Exception(f"Unexpected packet fingerprint in file {str(sig)}: {fingerprint}") - sig_type = packet_dump_field(packet=sig, field="Type") - if "KeyRevocation" != sig_type: - raise Exception(f"Unexpected packet type in file {str(sig)}: {sig_type}") - debug(f"OK: {sig}") + if "revocation" == path.name: + verify_integrity_key_revocations(path=path) + elif "directkey" == path.name: + for directkey in path.iterdir(): + assert_is_dir(path=directkey) + if "certification" == directkey.name: + verify_integrity_direct_key_certifications(path=directkey) + elif "revocation" == directkey.name: + verify_integrity_direct_key_revocations(path=directkey) + else: + raise_unexpected_file(path=directkey) elif "uid" == path.name: for uid in path.iterdir(): - if not uid.is_dir(): - raise Exception(f"Unexpected file type in certificate {certificate.name}: {str(uid)}") + assert_is_dir(path=uid) uid_packet = uid / f"{uid.name}.asc" - if not uid_packet.is_file(): - raise Exception(f"Missing uid packet for {certificate.name}: {str(uid_packet)}") + assert_is_file(path=uid_packet) uid_binding_sig = uid / "certification" / f"{certificate.name}.asc" uid_revocation_sig = uid / "revocation" / f"{certificate.name}.asc" @@ -155,12 +120,9 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N if uid_path.is_file(): if uid_path.name != f"{uid.name}.asc": raise Exception(f"Unexpected file in certificate {certificate.name}: {str(uid_path)}") - kinds = packet_kinds(packet=uid_path) - if not kinds or len(kinds) > 1: - raise Exception(f"Unexpected amount of packets in file {str(uid_path)}: {kinds}") - kind = kinds[0] - if kind != "User": - raise Exception(f"Unexpected packet in file {str(uid_path)}: {kind}") + + assert_packet_kind(path=uid_path, expected="User") + uid_value = Uid(simplify_ascii(packet_dump_field(packet=uid_path, field="Value"))) if uid_value != uid.name: raise Exception(f"Unexpected uid in file {str(uid_path)}: {uid_value}") @@ -168,63 +130,35 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N raise Exception(f"Unexpected file type in certificate {certificate.name}: {str(uid_path)}") elif "certification" == uid_path.name: for sig in uid_path.iterdir(): - if not sig.is_file(): - raise Exception( - f"Unexpected file type in certificate {certificate.name}: {str(sig)}" - ) - if not is_pgp_fingerprint(sig.stem): - raise Exception( - f"Unexpected file name in certificate {certificate.name}: {str(sig)}" - ) - if sig.suffix != ".asc": - raise Exception( - f"Unexpected file suffix in certificate {certificate.name}: {str(sig)}" - ) - kinds = packet_kinds(packet=sig) - if not kinds or len(kinds) > 1: - raise Exception(f"Unexpected amount of packets in file {str(sig)}: {kinds}") - kind = kinds[0] - if kind != "Signature": - raise Exception(f"Unexpected packet in file {str(sig)}: {kind}") + assert_is_file(path=sig) + assert_is_pgp_fingerprint(path=sig, _str=sig.stem) + assert_has_suffix(path=sig, suffix=".asc") + + assert_packet_kind(path=sig, expected="Signature") + assert_signature_type_certification(path=sig) + issuer = get_fingerprint_from_partial( fingerprints=all_fingerprints, fingerprint=Fingerprint(packet_dump_field(packet=sig, field="Issuer")), ) if issuer != sig.stem: raise Exception(f"Unexpected issuer in file {str(sig)}: {issuer}") - sig_type = packet_dump_field(packet=sig, field="Type") - if not sig_type.endswith("Certification"): - raise Exception(f"Unexpected packet type in file {str(sig)}: {sig_type}") debug(f"OK: {sig}") elif "revocation" == uid_path.name: for sig in uid_path.iterdir(): - if not sig.is_file(): - raise Exception( - f"Unexpected file type in certificate {certificate.name}: {str(sig)}" - ) - if not is_pgp_fingerprint(sig.stem): - raise Exception( - f"Unexpected file name in certificate {certificate.name}: {str(sig)}" - ) - if sig.suffix != ".asc": - raise Exception( - f"Unexpected file suffix in certificate {certificate.name}: {str(sig)}" - ) - kinds = packet_kinds(packet=sig) - if not kinds or len(kinds) > 1: - raise Exception(f"Unexpected amount of packets in file {str(sig)}: {kinds}") - kind = kinds[0] - if kind != "Signature": - raise Exception(f"Unexpected packet in file {str(sig)}: {kind}") + assert_is_file(path=sig) + assert_is_pgp_fingerprint(path=sig, _str=sig.stem) + assert_has_suffix(path=sig, suffix=".asc") + + assert_packet_kind(path=sig, expected="Signature") + assert_signature_type(path=sig, expected="CertificationRevocation") + issuer = get_fingerprint_from_partial( fingerprints=all_fingerprints, fingerprint=Fingerprint(packet_dump_field(packet=sig, field="Issuer")), ) if issuer != sig.stem: raise Exception(f"Unexpected issuer in file {str(sig)}: {issuer}") - sig_type = packet_dump_field(packet=sig, field="Type") - if sig_type != "CertificationRevocation": - raise Exception(f"Unexpected packet type in file {str(sig)}: {sig_type}") debug(f"OK: {sig}") else: raise Exception(f"Unexpected directory in certificate {certificate.name}: {str(uid_path)}") @@ -232,13 +166,11 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N debug(f"OK: {uid}") elif "subkey" == path.name: for subkey in path.iterdir(): - if not subkey.is_dir(): - raise Exception(f"Unexpected file type in certificate {certificate.name}: {str(subkey)}") - if not is_pgp_fingerprint(subkey.name): - raise Exception(f"Unexpected file name in certificate {certificate.name}: {str(subkey)}") + assert_is_dir(path=subkey) + assert_is_pgp_fingerprint(path=subkey, _str=subkey.name) + subkey_packet = subkey / f"{subkey.name}.asc" - if not subkey_packet.is_file(): - raise Exception(f"Missing subkey packet for {certificate.name}: {str(subkey_packet)}") + assert_is_file(path=subkey_packet) subkey_binding_sig = subkey / "certification" / f"{certificate.name}.asc" subkey_revocation_sig = subkey / "revocation" / f"{certificate.name}.asc" @@ -251,73 +183,33 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N raise Exception( f"Unexpected file in certificate {certificate.name}: {str(subkey_path)}" ) - kinds = packet_kinds(packet=subkey_path) - if not kinds or len(kinds) > 1: - raise Exception(f"Unexpected amount of packets in file {str(subkey_path)}: {kinds}") - kind = kinds[0] - if kind != "Public-Subkey": - raise Exception(f"Unexpected packet in file {str(subkey_path)}: {kind}") - fingerprint = packet_dump_field(packet=subkey_path, field="Fingerprint") - if fingerprint != subkey_path.stem: - raise Exception( - f"Unexpected packet fingerprint in file {str(subkey_path)}: {fingerprint}" - ) + + assert_packet_kind(path=subkey_path, expected="Public-Subkey") + assert_filename_matches_packet_fingerprint(path=subkey_path, check=subkey_path.stem) elif not subkey_path.is_dir(): raise Exception( f"Unexpected file type in certificate {certificate.name}: {str(subkey_path)}" ) elif "certification" == subkey_path.name: for sig in subkey_path.iterdir(): - if not sig.is_file(): - raise Exception( - f"Unexpected file type in certificate {certificate.name}: {str(sig)}" - ) - if not is_pgp_fingerprint(sig.stem): - raise Exception( - f"Unexpected file name in certificate {certificate.name}: {str(sig)}" - ) - if sig.suffix != ".asc": - raise Exception( - f"Unexpected file suffix in certificate {certificate.name}: {str(sig)}" - ) - kinds = packet_kinds(packet=sig) - if not kinds or len(kinds) > 1: - raise Exception(f"Unexpected amount of packets in file {str(sig)}: {kinds}") - kind = kinds[0] - if kind != "Signature": - raise Exception(f"Unexpected packet in file {str(sig)}: {kind}") - fingerprint = packet_dump_field(packet=sig, field="Issuer Fingerprint") - if fingerprint != certificate.name: - raise Exception(f"Unexpected packet fingerprint in file {str(sig)}: {fingerprint}") - sig_type = packet_dump_field(packet=sig, field="Type") - if sig_type != "SubkeyBinding": - raise Exception(f"Unexpected packet type in file {str(sig)}: {sig_type}") + assert_is_file(path=sig) + assert_is_pgp_fingerprint(path=sig, _str=sig.stem) + assert_has_suffix(path=sig, suffix=".asc") + + assert_packet_kind(path=sig, expected="Signature") + assert_signature_type(path=sig, expected="SubkeyBinding") + + assert_filename_matches_packet_issuer_fingerprint(path=sig, check=certificate.name) elif "revocation" == subkey_path.name: for sig in subkey_path.iterdir(): - if not sig.is_file(): - raise Exception( - f"Unexpected file type in certificate {certificate.name}: {str(sig)}" - ) - if not is_pgp_fingerprint(sig.stem): - raise Exception( - f"Unexpected file name in certificate {certificate.name}: {str(sig)}" - ) - if sig.suffix != ".asc": - raise Exception( - f"Unexpected file suffix in certificate {certificate.name}: {str(sig)}" - ) - kinds = packet_kinds(packet=sig) - if not kinds or len(kinds) > 1: - raise Exception(f"Unexpected amount of packets in file {str(sig)}: {kinds}") - kind = kinds[0] - if kind != "Signature": - raise Exception(f"Unexpected packet in file {str(sig)}: {kind}") - fingerprint = packet_dump_field(packet=sig, field="Issuer Fingerprint") - if fingerprint != certificate.name: - raise Exception(f"Unexpected packet fingerprint in file {str(sig)}: {fingerprint}") - sig_type = packet_dump_field(packet=sig, field="Type") - if sig_type != "SubkeyRevocation": - raise Exception(f"Unexpected packet type in file {str(sig)}: {sig_type}") + assert_is_file(path=sig) + assert_is_pgp_fingerprint(path=sig, _str=sig.stem) + assert_has_suffix(path=sig, suffix=".asc") + + assert_packet_kind(path=sig, expected="Signature") + assert_signature_type(path=sig, expected="SubkeyRevocation") + + assert_filename_matches_packet_issuer_fingerprint(path=sig, check=certificate.name) else: raise Exception( f"Unexpected directory in certificate {certificate.name}: {str(subkey_path)}" @@ -327,3 +219,115 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N raise Exception(f"Unexpected directory in certificate {certificate.name}: {str(path)}") else: raise Exception(f"Unexpected file type in certificate {certificate.name}: {str(path)}") + + +def assert_packet_kind(path: Path, expected: str) -> None: + kinds = packet_kinds(packet=path) + if not kinds or len(kinds) != 1: + raise Exception(f"Unexpected amount of packets in file {str(path)}: {kinds}") + kind = kinds[0] + if kind != expected: + raise Exception(f"Unexpected packet in file {str(path)} kind: {kind} expected: {expected}") + + +def assert_signature_type(path: Path, expected: str) -> None: + sig_type = packet_dump_field(packet=path, field="Type") + if sig_type != expected: + raise Exception(f"Unexpected packet type in file {str(path)} type: {sig_type} expected: {expected}") + + +def assert_signature_type_certification(path: Path) -> None: + sig_type = packet_dump_field(packet=path, field="Type") + if sig_type not in ["GenericCertification", "PersonaCertification", "CasualCertification", "PositiveCertification"]: + raise Exception(f"Unexpected packet certification type in file {str(path)} type: {sig_type}") + + +def assert_is_pgp_fingerprint(path: Path, _str: str) -> None: + if not is_pgp_fingerprint(_str): + raise Exception(f"Unexpected file name, not a pgp fingerprint: {str(path)}") + + +def assert_filename_matches_packet_issuer_fingerprint(path: Path, check: str) -> None: + fingerprint = packet_dump_field(packet=path, field="Issuer Fingerprint") + if not fingerprint == check: + raise Exception(f"Unexpected packet fingerprint in file {str(path)}: {fingerprint}") + + +def assert_filename_matches_packet_fingerprint(path: Path, check: str) -> None: + fingerprint = packet_dump_field(packet=path, field="Fingerprint") + if not fingerprint == check: + raise Exception(f"Unexpected packet fingerprint in file {str(path)}: {fingerprint}") + + +def assert_has_suffix(path: Path, suffix: str) -> None: + if path.suffix != suffix: + raise Exception(f"Unexpected file suffix in {str(path)} expected: {suffix}") + + +def assert_is_file(path: Path) -> None: + if not path.is_file(): + raise Exception(f"Unexpected type, should be file: {str(path)}") + + +def assert_is_dir(path: Path) -> None: + if not path.is_dir(): + raise Exception(f"Unexpected type, should be directory: {str(path)}") + + +def raise_unexpected_file(path: Path) -> None: + raise Exception(f"Unexpected file in directory: {str(path)}") + + +def verify_integrity_key_revocations(path: Path) -> None: + assert_is_dir(path=path) + for sig in path.iterdir(): + assert_is_file(path=sig) + assert_is_pgp_fingerprint(path=sig, _str=sig.stem) + assert_has_suffix(path=sig, suffix=".asc") + + assert_packet_kind(path=sig, expected="Signature") + assert_signature_type(path=sig, expected="KeyRevocation") + + assert_filename_matches_packet_issuer_fingerprint(path=sig, check=sig.stem) + + debug(f"OK: {sig}") + + +def verify_integrity_direct_key_certifications(path: Path) -> None: + for issuer_dir in path.iterdir(): + assert_is_dir(path=issuer_dir) + assert_is_pgp_fingerprint(path=issuer_dir, _str=issuer_dir.name) + for certification in issuer_dir.iterdir(): + verify_integrity_direct_key_certification(path=certification) + + +def verify_integrity_direct_key_revocations(path: Path) -> None: + for issuer_dir in path.iterdir(): + assert_is_dir(path=issuer_dir) + assert_is_pgp_fingerprint(path=issuer_dir, _str=issuer_dir.name) + for certification in issuer_dir.iterdir(): + verify_integrity_direct_key_revocation(path=certification) + + +def verify_integrity_direct_key_certification(path: Path) -> None: + assert_is_file(path=path) + assert_has_suffix(path=path, suffix=".asc") + + assert_packet_kind(path=path, expected="Signature") + assert_signature_type(path=path, expected="DirectKey") + + assert_filename_matches_packet_issuer_fingerprint(path=path, check=path.parent.name) + + debug(f"OK: {path}") + + +def verify_integrity_direct_key_revocation(path: Path) -> None: + assert_is_file(path=path) + assert_has_suffix(path=path, suffix=".asc") + + assert_packet_kind(path=path, expected="Signature") + assert_signature_type(path=path, expected="CertificationRevocation") + + assert_filename_matches_packet_issuer_fingerprint(path=path, check=path.parent.name) + + debug(f"OK: {path}") diff --git a/tests/test_keyring.py b/tests/test_keyring.py index 45120b8..d014b13 100644 --- a/tests/test_keyring.py +++ b/tests/test_keyring.py @@ -1,6 +1,7 @@ from collections import defaultdict from contextlib import nullcontext as does_not_raise from copy import deepcopy +from datetime import datetime from pathlib import Path from random import choice from string import digits @@ -16,7 +17,9 @@ from pytest import mark from pytest import raises from libkeyringctl import keyring +from libkeyringctl.keyring import PACKET_FILENAME_DATETIME_FORMAT from libkeyringctl.types import Fingerprint +from libkeyringctl.types import TrustFilter from libkeyringctl.types import Uid from libkeyringctl.types import Username @@ -131,15 +134,19 @@ def test_transform_fingerprint_to_keyring_path( (True, "DirectKey", "self", does_not_raise()), (True, "GenericCertification", "self", does_not_raise()), (True, "KeyRevocation", None, does_not_raise()), + (True, "CertificationRevocation", None, does_not_raise()), + (True, "CertificationRevocation", "self", does_not_raise()), (True, "DirectKey", None, does_not_raise()), (True, "GenericCertification", None, does_not_raise()), (True, "KeyRevocation", "foo", raises(Exception)), (True, "DirectKey", "foo", does_not_raise()), (True, "GenericCertification", "foo", does_not_raise()), - (True, "foo", "foo", raises(Exception)), + (True, "foo", "foo", does_not_raise()), + (True, "foo", "self", raises(Exception)), (False, "KeyRevocation", True, raises(Exception)), (False, "DirectKey", True, raises(Exception)), (False, "GenericCertification", True, raises(Exception)), + (False, "CertificationRevocation", True, raises(Exception)), ], ) @patch("libkeyringctl.keyring.get_fingerprint_from_partial") @@ -155,6 +162,7 @@ def test_convert_pubkey_signature_packet( valid_fingerprint: Fingerprint, ) -> None: packet = working_dir / "packet" + key_revocations: Dict[Fingerprint, Path] = {} direct_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list) direct_sigs: Dict[Fingerprint, List[Path]] = defaultdict(list) current_packet_fingerprint = None @@ -174,20 +182,28 @@ def test_convert_pubkey_signature_packet( certificate_fingerprint=valid_fingerprint, fingerprint_filter=None, current_packet_fingerprint=current_packet_fingerprint, + key_revocations=key_revocations, direct_revocations=direct_revocations, direct_sigs=direct_sigs, ) if issuer is None or current_packet_fingerprint is None: - assert not direct_revocations and not direct_sigs + assert not direct_revocations and not direct_sigs and not key_revocations else: if packet_type == "KeyRevocation": - if issuer == "self": - assert direct_revocations[valid_fingerprint] == [packet] - else: + assert key_revocations[valid_fingerprint] == packet + elif packet_type in ["CertificationRevocation"]: + if issuer != "self": assert not direct_revocations + else: + assert direct_revocations[valid_fingerprint if issuer == "self" else Fingerprint(issuer)] == [ + packet + ] elif packet_type in ["DirectKey", "GenericCertification"]: - assert direct_sigs[valid_fingerprint if issuer == "self" else Fingerprint(issuer)] == [packet] + if issuer != "self": + assert not direct_sigs + else: + assert direct_sigs[valid_fingerprint if issuer == "self" else Fingerprint(issuer)] == [packet] @mark.parametrize( @@ -227,7 +243,7 @@ def test_convert_uid_signature_packet( revocations: Dict[Uid, Dict[Fingerprint, List[Path]]] = defaultdict(lambda: defaultdict(list)) current_packet_uid = None issuer = None - fingerprint_filter: Set[Fingerprint] = set([Fingerprint("foo")]) + fingerprint_filter: Set[Fingerprint] = {Fingerprint("foo")} if valid_current_packet_uid: current_packet_uid = Uid("Foobar McFooface ") @@ -256,7 +272,7 @@ def test_convert_uid_signature_packet( if not valid_current_packet_uid or issuer is None: assert not certifications and not revocations else: - if packet_type == "CertificationRevocation" and valid_current_packet_uid: + if packet_type == "CertificationRevocation" and valid_current_packet_uid and issuer_in_filter: assert revocations[current_packet_uid][issuer] == [packet] # type: ignore elif packet_type.endswith("Certification") and issuer_in_filter: assert certifications[current_packet_uid][issuer] == [packet] # type: ignore @@ -350,6 +366,7 @@ def test_convert_signature_packet( valid_fingerprint: Fingerprint, ) -> None: certificate_fingerprint = None + key_revocations: Dict[Fingerprint, Path] = {} direct_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list) direct_sigs: Dict[Fingerprint, List[Path]] = defaultdict(list) certifications: Dict[Uid, Dict[Fingerprint, List[Path]]] = defaultdict(lambda: defaultdict(list)) @@ -366,6 +383,7 @@ def test_convert_signature_packet( current_packet_mode=current_packet_mode, certificate_fingerprint=certificate_fingerprint, fingerprint_filter=None, + key_revocations=key_revocations, current_packet_fingerprint=None, current_packet_uid=None, direct_revocations=direct_revocations, @@ -391,15 +409,16 @@ def test_convert_signature_packet( Path("foo.asc"), [ Path("--PublicKey"), + Path("--Signature"), Path("--UserID"), Path("--UserAttribute"), Path("--PublicSubkey"), Path("--Signature"), ], [ - "".join(choice("ABCDEF" + digits) for x in range(40)), + "".join(choice("ABCDEF" + digits) for _ in range(40)), "foo ", - "".join(choice("ABCDEF" + digits) for x in range(40)), + "".join(choice("ABCDEF" + digits) for _ in range(40)), ], "bar", does_not_raise(), @@ -493,6 +512,35 @@ def test_persist_subkey_revocations( ) +@patch("libkeyringctl.keyring.packet_signature_creation_time") +@patch("libkeyringctl.keyring.packet_join") +def test_persist_directkey_revocations( + packet_join_mock: Mock, + packet_signature_creation_time_mock: Mock, + working_dir: Path, + keyring_dir: Path, + valid_fingerprint: Fingerprint, +) -> None: + revocation_packet = working_dir / "latest_revocation.asc" + directkey_revocations: Dict[Fingerprint, List[Path]] = {valid_fingerprint: [revocation_packet]} + + dt = datetime(2000, 1, 12, 11, 22, 33) + packet_signature_creation_time_mock.return_value = dt + keyring.persist_direct_key_revocations( + key_dir=keyring_dir, + direct_key_revocations=directkey_revocations, + ) + packet_join_mock.assert_called_once_with( + packets=[revocation_packet], + output=keyring_dir + / "directkey" + / "revocation" + / valid_fingerprint + / f"{dt.strftime(PACKET_FILENAME_DATETIME_FORMAT)}.asc", + force=True, + ) + + @create_certificate(username=Username("foobar"), uids=[Uid("foobar ")]) def test_convert(working_dir: Path, keyring_dir: Path) -> None: keyring.convert( @@ -719,7 +767,7 @@ def test_get_fingerprints_from_paths(keyring_dir: Path, valid_fingerprint: str, fingerprint_subkey_asc = fingerprint_subkey_dir / (fingerprint_subkey_dir.name + ".asc") fingerprint_subkey_asc.touch() - assert keyring.get_fingerprints_from_paths(sources=[fingerprint_subkey_dir]) == set( - [Fingerprint(valid_subkey_fingerprint)] - ) - assert keyring.get_fingerprints_from_paths(sources=[fingerprint_dir]) == set([Fingerprint(valid_fingerprint)]) + assert keyring.get_fingerprints_from_paths(sources=[fingerprint_subkey_dir]) == { + Fingerprint(valid_subkey_fingerprint) + } + assert keyring.get_fingerprints_from_paths(sources=[fingerprint_dir]) == {Fingerprint(valid_fingerprint)}