feature(keyringctl): split direct key packets into own directory

This commit is contained in:
Levente Polyak 2021-11-18 20:07:34 +01:00
parent 31a49c89a3
commit e9dc04df32
No known key found for this signature in database
GPG Key ID: FC1B547C8D8172C8
4 changed files with 312 additions and 204 deletions

View File

@ -21,6 +21,7 @@ from .sequoia import keyring_split
from .sequoia import latest_certification from .sequoia import latest_certification
from .sequoia import packet_dump_field from .sequoia import packet_dump_field
from .sequoia import packet_join from .sequoia import packet_join
from .sequoia import packet_signature_creation_time
from .sequoia import packet_split from .sequoia import packet_split
from .trust import certificate_trust from .trust import certificate_trust
from .trust import certificate_trust_from_paths from .trust import certificate_trust_from_paths
@ -29,12 +30,15 @@ from .types import Fingerprint
from .types import Trust from .types import Trust
from .types import Uid from .types import Uid
from .types import Username from .types import Username
from .util import contains_fingerprint
from .util import filter_fingerprints_by_trust from .util import filter_fingerprints_by_trust
from .util import get_cert_paths from .util import get_cert_paths
from .util import get_fingerprint_from_partial from .util import get_fingerprint_from_partial
from .util import simplify_ascii from .util import simplify_ascii
from .util import transform_fd_to_tmpfile from .util import transform_fd_to_tmpfile
PACKET_FILENAME_DATETIME_FORMAT: str = "%Y-%m-%d_%H-%M-%S"
def is_pgp_fingerprint(string: str) -> bool: def is_pgp_fingerprint(string: str) -> bool:
"""Returns whether the passed string looks like a PGP (long) fingerprint """Returns whether the passed string looks like a PGP (long) fingerprint
@ -93,6 +97,7 @@ def convert_pubkey_signature_packet(
certificate_fingerprint: Fingerprint, certificate_fingerprint: Fingerprint,
fingerprint_filter: Optional[Set[Fingerprint]], fingerprint_filter: Optional[Set[Fingerprint]],
current_packet_fingerprint: Optional[Fingerprint], current_packet_fingerprint: Optional[Fingerprint],
key_revocations: Dict[Fingerprint, Path],
direct_revocations: Dict[Fingerprint, List[Path]], direct_revocations: Dict[Fingerprint, List[Path]],
direct_sigs: Dict[Fingerprint, List[Path]], direct_sigs: Dict[Fingerprint, List[Path]],
) -> None: ) -> None:
@ -102,6 +107,7 @@ def convert_pubkey_signature_packet(
certificate_fingerprint: The public key certificate fingerprint certificate_fingerprint: The public key certificate fingerprint
fingerprint_filter: Optional list of fingerprints of PGP public keys that all certifications will be filtered with fingerprint_filter: Optional list of fingerprints of PGP public keys that all certifications will be filtered with
current_packet_fingerprint: Optional certificate fingerprint of the current packet current_packet_fingerprint: Optional certificate fingerprint of the current packet
key_revocation: A dictionary of key revocation packets
direct_revocations: A dictionary of direct key revocations direct_revocations: A dictionary of direct key revocations
direct_sigs: A dictionary of direct key signatures direct_sigs: A dictionary of direct key signatures
""" """
@ -114,13 +120,20 @@ def convert_pubkey_signature_packet(
if not issuer: if not issuer:
debug(f"failed to resolve partial fingerprint {issuer}, skipping packet") debug(f"failed to resolve partial fingerprint {issuer}, skipping packet")
return
if not certificate_fingerprint.endswith(issuer):
debug(f"skipping direct key signature because {issuer} is a third-party and not a self signature")
return
if signature_type == "KeyRevocation":
key_revocations[issuer] = packet
elif signature_type == "DirectKey" or signature_type.endswith("Certification"):
direct_sigs[issuer].append(packet)
elif signature_type == "CertificationRevocation":
direct_revocations[issuer].append(packet)
else: else:
if signature_type == "KeyRevocation" and certificate_fingerprint.endswith(issuer): raise Exception(f"unknown signature type: {signature_type}")
direct_revocations[issuer].append(packet)
elif signature_type in ["DirectKey", "GenericCertification"]:
direct_sigs[issuer].append(packet)
else:
raise Exception(f"unknown signature type: {signature_type}")
def convert_uid_signature_packet( def convert_uid_signature_packet(
@ -149,11 +162,12 @@ def convert_uid_signature_packet(
debug(f"failed to resolve partial fingerprint {issuer}, skipping packet") debug(f"failed to resolve partial fingerprint {issuer}, skipping packet")
else: else:
if signature_type == "CertificationRevocation": if signature_type == "CertificationRevocation":
revocations[current_packet_uid][issuer].append(packet) if fingerprint_filter is None or contains_fingerprint(fingerprints=fingerprint_filter, fingerprint=issuer):
revocations[current_packet_uid][issuer].append(packet)
else:
debug(f"The revocation by issuer {issuer} is not appended because it is not in the filter")
elif signature_type.endswith("Certification"): elif signature_type.endswith("Certification"):
# TODO: extend fp filter to all certifications if fingerprint_filter is None or contains_fingerprint(fingerprints=fingerprint_filter, fingerprint=issuer):
# TODO: use contains_fingerprint
if fingerprint_filter is None or any([fp.endswith(issuer) for fp in fingerprint_filter]):
debug(f"The certification by issuer {issuer} is appended as it is found in the filter.") debug(f"The certification by issuer {issuer} is appended as it is found in the filter.")
certifications[current_packet_uid][issuer].append(packet) certifications[current_packet_uid][issuer].append(packet)
else: else:
@ -205,6 +219,7 @@ def convert_signature_packet(
current_packet_mode: Optional[str], current_packet_mode: Optional[str],
certificate_fingerprint: Optional[Fingerprint], certificate_fingerprint: Optional[Fingerprint],
fingerprint_filter: Optional[Set[Fingerprint]], fingerprint_filter: Optional[Set[Fingerprint]],
key_revocations: Dict[Fingerprint, Path],
current_packet_fingerprint: Optional[Fingerprint], current_packet_fingerprint: Optional[Fingerprint],
current_packet_uid: Optional[Uid], current_packet_uid: Optional[Uid],
direct_revocations: Dict[Fingerprint, List[Path]], direct_revocations: Dict[Fingerprint, List[Path]],
@ -218,11 +233,12 @@ def convert_signature_packet(
packet: The Path of the packet file to process packet: The Path of the packet file to process
certificate_fingerprint: The public key certificate fingerprint certificate_fingerprint: The public key certificate fingerprint
fingerprint_filter: Optional list of fingerprints of PGP public keys that all certifications will be filtered with
key_revocation: A dictionary containing all key revocation packet
current_packet_fingerprint: Optional certificate fingerprint of the current packet current_packet_fingerprint: Optional certificate fingerprint of the current packet
current_packet_uid: Optional Uid of the current packet current_packet_uid: Optional Uid of the current packet
direct_revocations: A dictionary of direct key revocations direct_revocations: A dictionary of direct key revocations
direct_sigs: A dictionary of direct key signatures direct_sigs: A dictionary of direct key signatures
fingerprint_filter: Optional list of fingerprints of PGP public keys that all certifications will be filtered with
certifications: A dictionary containing all certificaions certifications: A dictionary containing all certificaions
revocations: A dictionary containing all revocations revocations: A dictionary containing all revocations
subkey_bindings: A dictionary containing all subkey binding signatures subkey_bindings: A dictionary containing all subkey binding signatures
@ -238,6 +254,7 @@ def convert_signature_packet(
certificate_fingerprint=certificate_fingerprint, certificate_fingerprint=certificate_fingerprint,
fingerprint_filter=fingerprint_filter, fingerprint_filter=fingerprint_filter,
current_packet_fingerprint=current_packet_fingerprint, current_packet_fingerprint=current_packet_fingerprint,
key_revocations=key_revocations,
direct_revocations=direct_revocations, direct_revocations=direct_revocations,
direct_sigs=direct_sigs, direct_sigs=direct_sigs,
) )
@ -299,8 +316,7 @@ def convert_certificate(
# root packets # root packets
certificate_fingerprint: Optional[Fingerprint] = None certificate_fingerprint: Optional[Fingerprint] = None
pubkey: Optional[Path] = None pubkey: Optional[Path] = None
# TODO: direct key certifications are not yet selecting the latest sig, owner may have multiple key_revocations: Dict[Fingerprint, Path] = {}
# TODO: direct key certifications are not yet single packet per file
direct_sigs: Dict[Fingerprint, List[Path]] = defaultdict(list) direct_sigs: Dict[Fingerprint, List[Path]] = defaultdict(list)
direct_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list) direct_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list)
@ -319,10 +335,6 @@ def convert_certificate(
current_packet_fingerprint: Optional[Fingerprint] = None current_packet_fingerprint: Optional[Fingerprint] = None
current_packet_uid: Optional[Uid] = None current_packet_uid: Optional[Uid] = None
# XXX: PrimaryKeyBinding
# TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean
debug(f"Processing certificate {certificate}") debug(f"Processing certificate {certificate}")
for packet in packet_split(working_dir=working_dir, certificate=certificate): for packet in packet_split(working_dir=working_dir, certificate=certificate):
@ -366,6 +378,7 @@ def convert_certificate(
fingerprint_filter=fingerprint_filter, fingerprint_filter=fingerprint_filter,
current_packet_fingerprint=current_packet_fingerprint, current_packet_fingerprint=current_packet_fingerprint,
current_packet_uid=current_packet_uid, current_packet_uid=current_packet_uid,
key_revocations=key_revocations,
direct_revocations=direct_revocations, direct_revocations=direct_revocations,
direct_sigs=direct_sigs, direct_sigs=direct_sigs,
certifications=certifications, certifications=certifications,
@ -395,6 +408,7 @@ def convert_certificate(
direct_revocations=direct_revocations, direct_revocations=direct_revocations,
certificate_fingerprint=certificate_fingerprint, certificate_fingerprint=certificate_fingerprint,
pubkey=pubkey, pubkey=pubkey,
key_revocations=key_revocations,
subkeys=subkeys, subkeys=subkeys,
subkey_bindings=subkey_bindings, subkey_bindings=subkey_bindings,
subkey_revocations=subkey_revocations, subkey_revocations=subkey_revocations,
@ -412,6 +426,7 @@ def persist_key_material(
direct_revocations: Dict[Fingerprint, List[Path]], direct_revocations: Dict[Fingerprint, List[Path]],
certificate_fingerprint: Fingerprint, certificate_fingerprint: Fingerprint,
pubkey: Path, pubkey: Path,
key_revocations: Dict[Fingerprint, Path],
subkeys: Dict[Fingerprint, Path], subkeys: Dict[Fingerprint, Path],
subkey_bindings: Dict[Fingerprint, List[Path]], subkey_bindings: Dict[Fingerprint, List[Path]],
subkey_revocations: Dict[Fingerprint, List[Path]], subkey_revocations: Dict[Fingerprint, List[Path]],
@ -426,6 +441,7 @@ def persist_key_material(
direct_revocations: A dictionary of direct key revocations direct_revocations: A dictionary of direct key revocations
certificate_fingerprint: The public key certificate fingerprint certificate_fingerprint: The public key certificate fingerprint
pubkey: The Path of the PGP packet representing the public key material pubkey: The Path of the PGP packet representing the public key material
key_revocations: A dictionary containing all key revocations
subkeys: A dictionary of Paths per Fingerprint that represent the subkey material of the certificate subkeys: A dictionary of Paths per Fingerprint that represent the subkey material of the certificate
subkey_bindings: A dictionary containing all subkey binding signatures subkey_bindings: A dictionary containing all subkey binding signatures
subkey_revocations: A dictionary containing all subkey revocations subkey_revocations: A dictionary containing all subkey revocations
@ -440,6 +456,11 @@ def persist_key_material(
key_dir=key_dir, key_dir=key_dir,
) )
persist_key_revocations(
key_dir=key_dir,
key_revocations=key_revocations,
)
persist_direct_key_certifications( persist_direct_key_certifications(
direct_key_certifications=direct_sigs, direct_key_certifications=direct_sigs,
key_dir=key_dir, key_dir=key_dir,
@ -589,6 +610,26 @@ def persist_subkey_revocations(
packet_join(packets=[revocation], output=output_file, force=True) packet_join(packets=[revocation], output=output_file, force=True)
def persist_key_revocations(
key_revocations: Dict[Fingerprint, Path],
key_dir: Path,
) -> None:
"""Persist the key revocation
Parameters
----------
key_revocations: Dictionary with key revocation
key_dir: The root directory below which the revocation is persisted
"""
for issuer, revocation in key_revocations.items():
output_file = key_dir / "revocation" / f"{issuer}.asc"
output_file.parent.mkdir(parents=True, exist_ok=True)
debug(f"Writing file {output_file} from {str(revocation)}")
packet_join(packets=[revocation], output=output_file, force=True)
def persist_direct_key_certifications( def persist_direct_key_certifications(
direct_key_certifications: Dict[Fingerprint, List[Path]], direct_key_certifications: Dict[Fingerprint, List[Path]],
key_dir: Path, key_dir: Path,
@ -603,10 +644,14 @@ def persist_direct_key_certifications(
""" """
for issuer, certifications in direct_key_certifications.items(): for issuer, certifications in direct_key_certifications.items():
output_file = key_dir / "certification" / f"{issuer}.asc" output_dir = key_dir / "directkey" / "certification" / issuer
output_file.parent.mkdir(parents=True, exist_ok=True) output_dir.mkdir(parents=True, exist_ok=True)
debug(f"Writing file {output_file} from {[str(cert) for cert in certifications]}")
packet_join(packets=certifications, output=output_file, force=True) for certification in certifications:
creation_time = packet_signature_creation_time(certification).strftime(PACKET_FILENAME_DATETIME_FORMAT)
output_file = output_dir / f"{creation_time}.asc"
debug(f"Writing file {output_file} from {str(certification)}")
packet_join(packets=[certification], output=output_file, force=True)
def persist_direct_key_revocations( def persist_direct_key_revocations(
@ -622,10 +667,14 @@ def persist_direct_key_revocations(
""" """
for issuer, certifications in direct_key_revocations.items(): for issuer, certifications in direct_key_revocations.items():
output_file = key_dir / "revocation" / f"{issuer}.asc" output_dir = key_dir / "directkey" / "revocation" / issuer
output_file.parent.mkdir(parents=True, exist_ok=True) output_dir.mkdir(parents=True, exist_ok=True)
debug(f"Writing file {output_file} from {[str(cert) for cert in certifications]}")
packet_join(packets=certifications, output=output_file, force=True) for certification in certifications:
creation_time = packet_signature_creation_time(certification).strftime(PACKET_FILENAME_DATETIME_FORMAT)
output_file = output_dir / f"{creation_time}.asc"
debug(f"Writing file {output_file} from {str(certification)}")
packet_join(packets=[certification], output=output_file, force=True)
def persist_uid_certifications( def persist_uid_certifications(
@ -937,7 +986,7 @@ def get_packets_from_path(path: Path) -> List[Path]:
def get_packets_from_listing(path: Path) -> List[Path]: def get_packets_from_listing(path: Path) -> List[Path]:
"""Collects packets from a listing of directories holding one level each by calling `get_get_packets_from_path`. """Collects packets from a listing of directories holding one level each by calling `get_packets_from_path`.
Parameters Parameters
---------- ----------
@ -996,6 +1045,11 @@ def export(
packets += get_packets_from_listing(cert_dir / "subkey") packets += get_packets_from_listing(cert_dir / "subkey")
packets += get_packets_from_listing(cert_dir / "uid") packets += get_packets_from_listing(cert_dir / "uid")
directkey_path = cert_dir / "directkey"
directkeys = directkey_path.iterdir() if directkey_path.exists() else []
for path in directkeys:
packets += get_packets_from_listing(path)
output_path = temp_dir / f"{cert_dir.name}.asc" output_path = temp_dir / f"{cert_dir.name}.asc"
debug(f"Joining {cert_dir} in {output_path}") debug(f"Joining {cert_dir} in {output_path}")
packet_join( packet_join(

View File

@ -206,7 +206,9 @@ def packet_signature_creation_time(packet: Path) -> datetime:
------- -------
The signature creation time as datetime The signature creation time as datetime
""" """
return datetime.strptime(packet_dump_field(packet, "Signature creation time"), "%Y-%m-%d %H:%M:%S %Z") field = packet_dump_field(packet, "Signature creation time")
field = " ".join(field.split(" ", 3)[0:3])
return datetime.strptime(field, "%Y-%m-%d %H:%M:%S %Z")
def packet_kinds(packet: Path) -> List[PacketKind]: def packet_kinds(packet: Path) -> List[PacketKind]:

View File

@ -15,7 +15,6 @@ from libkeyringctl.keyring import transform_username_to_keyring_path
from libkeyringctl.sequoia import packet_dump_field from libkeyringctl.sequoia import packet_dump_field
from libkeyringctl.sequoia import packet_kinds from libkeyringctl.sequoia import packet_kinds
from libkeyringctl.types import Fingerprint from libkeyringctl.types import Fingerprint
from libkeyringctl.types import PacketKind
from libkeyringctl.types import Uid from libkeyringctl.types import Uid
from libkeyringctl.util import get_cert_paths from libkeyringctl.util import get_cert_paths
from libkeyringctl.util import get_fingerprint_from_partial from libkeyringctl.util import get_fingerprint_from_partial
@ -91,60 +90,26 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N
if path.is_file(): if path.is_file():
if path.name != f"{certificate.name}.asc": if path.name != f"{certificate.name}.asc":
raise Exception(f"Unexpected file in certificate {certificate.name}: {str(path)}") raise Exception(f"Unexpected file in certificate {certificate.name}: {str(path)}")
kinds: List[PacketKind] = packet_kinds(packet=path) assert_packet_kind(path=path, expected="Public-Key")
if not kinds or len(kinds) > 1: assert_filename_matches_packet_fingerprint(path=path, check=certificate.name)
raise Exception(f"Unexpected amount of packets in file {str(path)}: {kinds}")
kind = kinds[0]
if kind != "Public-Key":
raise Exception(f"Unexpected packet in file {str(path)}: {kind}")
fingerprint = packet_dump_field(packet=path, field="Fingerprint")
if fingerprint != certificate.name:
raise Exception(f"Unexpected packet fingerprint in file {str(path)}: {fingerprint}")
debug(f"OK: {path}") debug(f"OK: {path}")
elif path.is_dir(): elif path.is_dir():
# TODO: check direct key types, multiple if "revocation" == path.name:
if "certification" == path.name: verify_integrity_key_revocations(path=path)
for sig in path.iterdir(): elif "directkey" == path.name:
if not sig.is_file(): for directkey in path.iterdir():
raise Exception(f"Unexpected file type in certificate {certificate.name}: {str(sig)}") assert_is_dir(path=directkey)
if not is_pgp_fingerprint(sig.stem): if "certification" == directkey.name:
raise Exception(f"Unexpected file name in certificate {certificate.name}: {str(sig)}") verify_integrity_direct_key_certifications(path=directkey)
if sig.suffix != ".asc": elif "revocation" == directkey.name:
raise Exception(f"Unexpected file suffix in certificate {certificate.name}: {str(sig)}") verify_integrity_direct_key_revocations(path=directkey)
kinds = packet_kinds(packet=sig) else:
if not kinds: raise_unexpected_file(path=directkey)
raise Exception(f"Unexpected amount of packets in file {str(sig)}: {kinds}")
if any(filter(lambda kind: not kind == "Signature", kinds)):
raise Exception(f"Unexpected packet in file {str(sig)}: {kinds}")
debug(f"OK: {path}")
elif "revocation" == path.name:
for sig in path.iterdir():
if not sig.is_file():
raise Exception(f"Unexpected file type in certificate {certificate.name}: {str(sig)}")
if not is_pgp_fingerprint(sig.stem):
raise Exception(f"Unexpected file name in certificate {certificate.name}: {str(sig)}")
if sig.suffix != ".asc":
raise Exception(f"Unexpected file suffix in certificate {certificate.name}: {str(sig)}")
kinds = packet_kinds(packet=sig)
if not kinds or len(kinds) > 1:
raise Exception(f"Unexpected amount of packets in file {str(sig)}: {kinds}")
kind = kinds[0]
if kind != "Signature":
raise Exception(f"Unexpected packet in file {str(sig)}: {kind}")
fingerprint = packet_dump_field(packet=sig, field="Issuer Fingerprint")
if not fingerprint == sig.stem:
raise Exception(f"Unexpected packet fingerprint in file {str(sig)}: {fingerprint}")
sig_type = packet_dump_field(packet=sig, field="Type")
if "KeyRevocation" != sig_type:
raise Exception(f"Unexpected packet type in file {str(sig)}: {sig_type}")
debug(f"OK: {sig}")
elif "uid" == path.name: elif "uid" == path.name:
for uid in path.iterdir(): for uid in path.iterdir():
if not uid.is_dir(): assert_is_dir(path=uid)
raise Exception(f"Unexpected file type in certificate {certificate.name}: {str(uid)}")
uid_packet = uid / f"{uid.name}.asc" uid_packet = uid / f"{uid.name}.asc"
if not uid_packet.is_file(): assert_is_file(path=uid_packet)
raise Exception(f"Missing uid packet for {certificate.name}: {str(uid_packet)}")
uid_binding_sig = uid / "certification" / f"{certificate.name}.asc" uid_binding_sig = uid / "certification" / f"{certificate.name}.asc"
uid_revocation_sig = uid / "revocation" / f"{certificate.name}.asc" uid_revocation_sig = uid / "revocation" / f"{certificate.name}.asc"
@ -155,12 +120,9 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N
if uid_path.is_file(): if uid_path.is_file():
if uid_path.name != f"{uid.name}.asc": if uid_path.name != f"{uid.name}.asc":
raise Exception(f"Unexpected file in certificate {certificate.name}: {str(uid_path)}") raise Exception(f"Unexpected file in certificate {certificate.name}: {str(uid_path)}")
kinds = packet_kinds(packet=uid_path)
if not kinds or len(kinds) > 1: assert_packet_kind(path=uid_path, expected="User")
raise Exception(f"Unexpected amount of packets in file {str(uid_path)}: {kinds}")
kind = kinds[0]
if kind != "User":
raise Exception(f"Unexpected packet in file {str(uid_path)}: {kind}")
uid_value = Uid(simplify_ascii(packet_dump_field(packet=uid_path, field="Value"))) uid_value = Uid(simplify_ascii(packet_dump_field(packet=uid_path, field="Value")))
if uid_value != uid.name: if uid_value != uid.name:
raise Exception(f"Unexpected uid in file {str(uid_path)}: {uid_value}") raise Exception(f"Unexpected uid in file {str(uid_path)}: {uid_value}")
@ -168,63 +130,35 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N
raise Exception(f"Unexpected file type in certificate {certificate.name}: {str(uid_path)}") raise Exception(f"Unexpected file type in certificate {certificate.name}: {str(uid_path)}")
elif "certification" == uid_path.name: elif "certification" == uid_path.name:
for sig in uid_path.iterdir(): for sig in uid_path.iterdir():
if not sig.is_file(): assert_is_file(path=sig)
raise Exception( assert_is_pgp_fingerprint(path=sig, _str=sig.stem)
f"Unexpected file type in certificate {certificate.name}: {str(sig)}" assert_has_suffix(path=sig, suffix=".asc")
)
if not is_pgp_fingerprint(sig.stem): assert_packet_kind(path=sig, expected="Signature")
raise Exception( assert_signature_type_certification(path=sig)
f"Unexpected file name in certificate {certificate.name}: {str(sig)}"
)
if sig.suffix != ".asc":
raise Exception(
f"Unexpected file suffix in certificate {certificate.name}: {str(sig)}"
)
kinds = packet_kinds(packet=sig)
if not kinds or len(kinds) > 1:
raise Exception(f"Unexpected amount of packets in file {str(sig)}: {kinds}")
kind = kinds[0]
if kind != "Signature":
raise Exception(f"Unexpected packet in file {str(sig)}: {kind}")
issuer = get_fingerprint_from_partial( issuer = get_fingerprint_from_partial(
fingerprints=all_fingerprints, fingerprints=all_fingerprints,
fingerprint=Fingerprint(packet_dump_field(packet=sig, field="Issuer")), fingerprint=Fingerprint(packet_dump_field(packet=sig, field="Issuer")),
) )
if issuer != sig.stem: if issuer != sig.stem:
raise Exception(f"Unexpected issuer in file {str(sig)}: {issuer}") raise Exception(f"Unexpected issuer in file {str(sig)}: {issuer}")
sig_type = packet_dump_field(packet=sig, field="Type")
if not sig_type.endswith("Certification"):
raise Exception(f"Unexpected packet type in file {str(sig)}: {sig_type}")
debug(f"OK: {sig}") debug(f"OK: {sig}")
elif "revocation" == uid_path.name: elif "revocation" == uid_path.name:
for sig in uid_path.iterdir(): for sig in uid_path.iterdir():
if not sig.is_file(): assert_is_file(path=sig)
raise Exception( assert_is_pgp_fingerprint(path=sig, _str=sig.stem)
f"Unexpected file type in certificate {certificate.name}: {str(sig)}" assert_has_suffix(path=sig, suffix=".asc")
)
if not is_pgp_fingerprint(sig.stem): assert_packet_kind(path=sig, expected="Signature")
raise Exception( assert_signature_type(path=sig, expected="CertificationRevocation")
f"Unexpected file name in certificate {certificate.name}: {str(sig)}"
)
if sig.suffix != ".asc":
raise Exception(
f"Unexpected file suffix in certificate {certificate.name}: {str(sig)}"
)
kinds = packet_kinds(packet=sig)
if not kinds or len(kinds) > 1:
raise Exception(f"Unexpected amount of packets in file {str(sig)}: {kinds}")
kind = kinds[0]
if kind != "Signature":
raise Exception(f"Unexpected packet in file {str(sig)}: {kind}")
issuer = get_fingerprint_from_partial( issuer = get_fingerprint_from_partial(
fingerprints=all_fingerprints, fingerprints=all_fingerprints,
fingerprint=Fingerprint(packet_dump_field(packet=sig, field="Issuer")), fingerprint=Fingerprint(packet_dump_field(packet=sig, field="Issuer")),
) )
if issuer != sig.stem: if issuer != sig.stem:
raise Exception(f"Unexpected issuer in file {str(sig)}: {issuer}") raise Exception(f"Unexpected issuer in file {str(sig)}: {issuer}")
sig_type = packet_dump_field(packet=sig, field="Type")
if sig_type != "CertificationRevocation":
raise Exception(f"Unexpected packet type in file {str(sig)}: {sig_type}")
debug(f"OK: {sig}") debug(f"OK: {sig}")
else: else:
raise Exception(f"Unexpected directory in certificate {certificate.name}: {str(uid_path)}") raise Exception(f"Unexpected directory in certificate {certificate.name}: {str(uid_path)}")
@ -232,13 +166,11 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N
debug(f"OK: {uid}") debug(f"OK: {uid}")
elif "subkey" == path.name: elif "subkey" == path.name:
for subkey in path.iterdir(): for subkey in path.iterdir():
if not subkey.is_dir(): assert_is_dir(path=subkey)
raise Exception(f"Unexpected file type in certificate {certificate.name}: {str(subkey)}") assert_is_pgp_fingerprint(path=subkey, _str=subkey.name)
if not is_pgp_fingerprint(subkey.name):
raise Exception(f"Unexpected file name in certificate {certificate.name}: {str(subkey)}")
subkey_packet = subkey / f"{subkey.name}.asc" subkey_packet = subkey / f"{subkey.name}.asc"
if not subkey_packet.is_file(): assert_is_file(path=subkey_packet)
raise Exception(f"Missing subkey packet for {certificate.name}: {str(subkey_packet)}")
subkey_binding_sig = subkey / "certification" / f"{certificate.name}.asc" subkey_binding_sig = subkey / "certification" / f"{certificate.name}.asc"
subkey_revocation_sig = subkey / "revocation" / f"{certificate.name}.asc" subkey_revocation_sig = subkey / "revocation" / f"{certificate.name}.asc"
@ -251,73 +183,33 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N
raise Exception( raise Exception(
f"Unexpected file in certificate {certificate.name}: {str(subkey_path)}" f"Unexpected file in certificate {certificate.name}: {str(subkey_path)}"
) )
kinds = packet_kinds(packet=subkey_path)
if not kinds or len(kinds) > 1: assert_packet_kind(path=subkey_path, expected="Public-Subkey")
raise Exception(f"Unexpected amount of packets in file {str(subkey_path)}: {kinds}") assert_filename_matches_packet_fingerprint(path=subkey_path, check=subkey_path.stem)
kind = kinds[0]
if kind != "Public-Subkey":
raise Exception(f"Unexpected packet in file {str(subkey_path)}: {kind}")
fingerprint = packet_dump_field(packet=subkey_path, field="Fingerprint")
if fingerprint != subkey_path.stem:
raise Exception(
f"Unexpected packet fingerprint in file {str(subkey_path)}: {fingerprint}"
)
elif not subkey_path.is_dir(): elif not subkey_path.is_dir():
raise Exception( raise Exception(
f"Unexpected file type in certificate {certificate.name}: {str(subkey_path)}" f"Unexpected file type in certificate {certificate.name}: {str(subkey_path)}"
) )
elif "certification" == subkey_path.name: elif "certification" == subkey_path.name:
for sig in subkey_path.iterdir(): for sig in subkey_path.iterdir():
if not sig.is_file(): assert_is_file(path=sig)
raise Exception( assert_is_pgp_fingerprint(path=sig, _str=sig.stem)
f"Unexpected file type in certificate {certificate.name}: {str(sig)}" assert_has_suffix(path=sig, suffix=".asc")
)
if not is_pgp_fingerprint(sig.stem): assert_packet_kind(path=sig, expected="Signature")
raise Exception( assert_signature_type(path=sig, expected="SubkeyBinding")
f"Unexpected file name in certificate {certificate.name}: {str(sig)}"
) assert_filename_matches_packet_issuer_fingerprint(path=sig, check=certificate.name)
if sig.suffix != ".asc":
raise Exception(
f"Unexpected file suffix in certificate {certificate.name}: {str(sig)}"
)
kinds = packet_kinds(packet=sig)
if not kinds or len(kinds) > 1:
raise Exception(f"Unexpected amount of packets in file {str(sig)}: {kinds}")
kind = kinds[0]
if kind != "Signature":
raise Exception(f"Unexpected packet in file {str(sig)}: {kind}")
fingerprint = packet_dump_field(packet=sig, field="Issuer Fingerprint")
if fingerprint != certificate.name:
raise Exception(f"Unexpected packet fingerprint in file {str(sig)}: {fingerprint}")
sig_type = packet_dump_field(packet=sig, field="Type")
if sig_type != "SubkeyBinding":
raise Exception(f"Unexpected packet type in file {str(sig)}: {sig_type}")
elif "revocation" == subkey_path.name: elif "revocation" == subkey_path.name:
for sig in subkey_path.iterdir(): for sig in subkey_path.iterdir():
if not sig.is_file(): assert_is_file(path=sig)
raise Exception( assert_is_pgp_fingerprint(path=sig, _str=sig.stem)
f"Unexpected file type in certificate {certificate.name}: {str(sig)}" assert_has_suffix(path=sig, suffix=".asc")
)
if not is_pgp_fingerprint(sig.stem): assert_packet_kind(path=sig, expected="Signature")
raise Exception( assert_signature_type(path=sig, expected="SubkeyRevocation")
f"Unexpected file name in certificate {certificate.name}: {str(sig)}"
) assert_filename_matches_packet_issuer_fingerprint(path=sig, check=certificate.name)
if sig.suffix != ".asc":
raise Exception(
f"Unexpected file suffix in certificate {certificate.name}: {str(sig)}"
)
kinds = packet_kinds(packet=sig)
if not kinds or len(kinds) > 1:
raise Exception(f"Unexpected amount of packets in file {str(sig)}: {kinds}")
kind = kinds[0]
if kind != "Signature":
raise Exception(f"Unexpected packet in file {str(sig)}: {kind}")
fingerprint = packet_dump_field(packet=sig, field="Issuer Fingerprint")
if fingerprint != certificate.name:
raise Exception(f"Unexpected packet fingerprint in file {str(sig)}: {fingerprint}")
sig_type = packet_dump_field(packet=sig, field="Type")
if sig_type != "SubkeyRevocation":
raise Exception(f"Unexpected packet type in file {str(sig)}: {sig_type}")
else: else:
raise Exception( raise Exception(
f"Unexpected directory in certificate {certificate.name}: {str(subkey_path)}" f"Unexpected directory in certificate {certificate.name}: {str(subkey_path)}"
@ -327,3 +219,115 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N
raise Exception(f"Unexpected directory in certificate {certificate.name}: {str(path)}") raise Exception(f"Unexpected directory in certificate {certificate.name}: {str(path)}")
else: else:
raise Exception(f"Unexpected file type in certificate {certificate.name}: {str(path)}") raise Exception(f"Unexpected file type in certificate {certificate.name}: {str(path)}")
def assert_packet_kind(path: Path, expected: str) -> None:
kinds = packet_kinds(packet=path)
if not kinds or len(kinds) != 1:
raise Exception(f"Unexpected amount of packets in file {str(path)}: {kinds}")
kind = kinds[0]
if kind != expected:
raise Exception(f"Unexpected packet in file {str(path)} kind: {kind} expected: {expected}")
def assert_signature_type(path: Path, expected: str) -> None:
sig_type = packet_dump_field(packet=path, field="Type")
if sig_type != expected:
raise Exception(f"Unexpected packet type in file {str(path)} type: {sig_type} expected: {expected}")
def assert_signature_type_certification(path: Path) -> None:
sig_type = packet_dump_field(packet=path, field="Type")
if sig_type not in ["GenericCertification", "PersonaCertification", "CasualCertification", "PositiveCertification"]:
raise Exception(f"Unexpected packet certification type in file {str(path)} type: {sig_type}")
def assert_is_pgp_fingerprint(path: Path, _str: str) -> None:
if not is_pgp_fingerprint(_str):
raise Exception(f"Unexpected file name, not a pgp fingerprint: {str(path)}")
def assert_filename_matches_packet_issuer_fingerprint(path: Path, check: str) -> None:
fingerprint = packet_dump_field(packet=path, field="Issuer Fingerprint")
if not fingerprint == check:
raise Exception(f"Unexpected packet fingerprint in file {str(path)}: {fingerprint}")
def assert_filename_matches_packet_fingerprint(path: Path, check: str) -> None:
fingerprint = packet_dump_field(packet=path, field="Fingerprint")
if not fingerprint == check:
raise Exception(f"Unexpected packet fingerprint in file {str(path)}: {fingerprint}")
def assert_has_suffix(path: Path, suffix: str) -> None:
if path.suffix != suffix:
raise Exception(f"Unexpected file suffix in {str(path)} expected: {suffix}")
def assert_is_file(path: Path) -> None:
if not path.is_file():
raise Exception(f"Unexpected type, should be file: {str(path)}")
def assert_is_dir(path: Path) -> None:
if not path.is_dir():
raise Exception(f"Unexpected type, should be directory: {str(path)}")
def raise_unexpected_file(path: Path) -> None:
raise Exception(f"Unexpected file in directory: {str(path)}")
def verify_integrity_key_revocations(path: Path) -> None:
assert_is_dir(path=path)
for sig in path.iterdir():
assert_is_file(path=sig)
assert_is_pgp_fingerprint(path=sig, _str=sig.stem)
assert_has_suffix(path=sig, suffix=".asc")
assert_packet_kind(path=sig, expected="Signature")
assert_signature_type(path=sig, expected="KeyRevocation")
assert_filename_matches_packet_issuer_fingerprint(path=sig, check=sig.stem)
debug(f"OK: {sig}")
def verify_integrity_direct_key_certifications(path: Path) -> None:
for issuer_dir in path.iterdir():
assert_is_dir(path=issuer_dir)
assert_is_pgp_fingerprint(path=issuer_dir, _str=issuer_dir.name)
for certification in issuer_dir.iterdir():
verify_integrity_direct_key_certification(path=certification)
def verify_integrity_direct_key_revocations(path: Path) -> None:
for issuer_dir in path.iterdir():
assert_is_dir(path=issuer_dir)
assert_is_pgp_fingerprint(path=issuer_dir, _str=issuer_dir.name)
for certification in issuer_dir.iterdir():
verify_integrity_direct_key_revocation(path=certification)
def verify_integrity_direct_key_certification(path: Path) -> None:
assert_is_file(path=path)
assert_has_suffix(path=path, suffix=".asc")
assert_packet_kind(path=path, expected="Signature")
assert_signature_type(path=path, expected="DirectKey")
assert_filename_matches_packet_issuer_fingerprint(path=path, check=path.parent.name)
debug(f"OK: {path}")
def verify_integrity_direct_key_revocation(path: Path) -> None:
assert_is_file(path=path)
assert_has_suffix(path=path, suffix=".asc")
assert_packet_kind(path=path, expected="Signature")
assert_signature_type(path=path, expected="CertificationRevocation")
assert_filename_matches_packet_issuer_fingerprint(path=path, check=path.parent.name)
debug(f"OK: {path}")

View File

@ -1,6 +1,7 @@
from collections import defaultdict from collections import defaultdict
from contextlib import nullcontext as does_not_raise from contextlib import nullcontext as does_not_raise
from copy import deepcopy from copy import deepcopy
from datetime import datetime
from pathlib import Path from pathlib import Path
from random import choice from random import choice
from string import digits from string import digits
@ -16,7 +17,9 @@ from pytest import mark
from pytest import raises from pytest import raises
from libkeyringctl import keyring from libkeyringctl import keyring
from libkeyringctl.keyring import PACKET_FILENAME_DATETIME_FORMAT
from libkeyringctl.types import Fingerprint from libkeyringctl.types import Fingerprint
from libkeyringctl.types import TrustFilter
from libkeyringctl.types import Uid from libkeyringctl.types import Uid
from libkeyringctl.types import Username from libkeyringctl.types import Username
@ -131,15 +134,19 @@ def test_transform_fingerprint_to_keyring_path(
(True, "DirectKey", "self", does_not_raise()), (True, "DirectKey", "self", does_not_raise()),
(True, "GenericCertification", "self", does_not_raise()), (True, "GenericCertification", "self", does_not_raise()),
(True, "KeyRevocation", None, does_not_raise()), (True, "KeyRevocation", None, does_not_raise()),
(True, "CertificationRevocation", None, does_not_raise()),
(True, "CertificationRevocation", "self", does_not_raise()),
(True, "DirectKey", None, does_not_raise()), (True, "DirectKey", None, does_not_raise()),
(True, "GenericCertification", None, does_not_raise()), (True, "GenericCertification", None, does_not_raise()),
(True, "KeyRevocation", "foo", raises(Exception)), (True, "KeyRevocation", "foo", raises(Exception)),
(True, "DirectKey", "foo", does_not_raise()), (True, "DirectKey", "foo", does_not_raise()),
(True, "GenericCertification", "foo", does_not_raise()), (True, "GenericCertification", "foo", does_not_raise()),
(True, "foo", "foo", raises(Exception)), (True, "foo", "foo", does_not_raise()),
(True, "foo", "self", raises(Exception)),
(False, "KeyRevocation", True, raises(Exception)), (False, "KeyRevocation", True, raises(Exception)),
(False, "DirectKey", True, raises(Exception)), (False, "DirectKey", True, raises(Exception)),
(False, "GenericCertification", True, raises(Exception)), (False, "GenericCertification", True, raises(Exception)),
(False, "CertificationRevocation", True, raises(Exception)),
], ],
) )
@patch("libkeyringctl.keyring.get_fingerprint_from_partial") @patch("libkeyringctl.keyring.get_fingerprint_from_partial")
@ -155,6 +162,7 @@ def test_convert_pubkey_signature_packet(
valid_fingerprint: Fingerprint, valid_fingerprint: Fingerprint,
) -> None: ) -> None:
packet = working_dir / "packet" packet = working_dir / "packet"
key_revocations: Dict[Fingerprint, Path] = {}
direct_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list) direct_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list)
direct_sigs: Dict[Fingerprint, List[Path]] = defaultdict(list) direct_sigs: Dict[Fingerprint, List[Path]] = defaultdict(list)
current_packet_fingerprint = None current_packet_fingerprint = None
@ -174,20 +182,28 @@ def test_convert_pubkey_signature_packet(
certificate_fingerprint=valid_fingerprint, certificate_fingerprint=valid_fingerprint,
fingerprint_filter=None, fingerprint_filter=None,
current_packet_fingerprint=current_packet_fingerprint, current_packet_fingerprint=current_packet_fingerprint,
key_revocations=key_revocations,
direct_revocations=direct_revocations, direct_revocations=direct_revocations,
direct_sigs=direct_sigs, direct_sigs=direct_sigs,
) )
if issuer is None or current_packet_fingerprint is None: if issuer is None or current_packet_fingerprint is None:
assert not direct_revocations and not direct_sigs assert not direct_revocations and not direct_sigs and not key_revocations
else: else:
if packet_type == "KeyRevocation": if packet_type == "KeyRevocation":
if issuer == "self": assert key_revocations[valid_fingerprint] == packet
assert direct_revocations[valid_fingerprint] == [packet] elif packet_type in ["CertificationRevocation"]:
else: if issuer != "self":
assert not direct_revocations assert not direct_revocations
else:
assert direct_revocations[valid_fingerprint if issuer == "self" else Fingerprint(issuer)] == [
packet
]
elif packet_type in ["DirectKey", "GenericCertification"]: elif packet_type in ["DirectKey", "GenericCertification"]:
assert direct_sigs[valid_fingerprint if issuer == "self" else Fingerprint(issuer)] == [packet] if issuer != "self":
assert not direct_sigs
else:
assert direct_sigs[valid_fingerprint if issuer == "self" else Fingerprint(issuer)] == [packet]
@mark.parametrize( @mark.parametrize(
@ -227,7 +243,7 @@ def test_convert_uid_signature_packet(
revocations: Dict[Uid, Dict[Fingerprint, List[Path]]] = defaultdict(lambda: defaultdict(list)) revocations: Dict[Uid, Dict[Fingerprint, List[Path]]] = defaultdict(lambda: defaultdict(list))
current_packet_uid = None current_packet_uid = None
issuer = None issuer = None
fingerprint_filter: Set[Fingerprint] = set([Fingerprint("foo")]) fingerprint_filter: Set[Fingerprint] = {Fingerprint("foo")}
if valid_current_packet_uid: if valid_current_packet_uid:
current_packet_uid = Uid("Foobar McFooface <foo@barmcfoofa.ce>") current_packet_uid = Uid("Foobar McFooface <foo@barmcfoofa.ce>")
@ -256,7 +272,7 @@ def test_convert_uid_signature_packet(
if not valid_current_packet_uid or issuer is None: if not valid_current_packet_uid or issuer is None:
assert not certifications and not revocations assert not certifications and not revocations
else: else:
if packet_type == "CertificationRevocation" and valid_current_packet_uid: if packet_type == "CertificationRevocation" and valid_current_packet_uid and issuer_in_filter:
assert revocations[current_packet_uid][issuer] == [packet] # type: ignore assert revocations[current_packet_uid][issuer] == [packet] # type: ignore
elif packet_type.endswith("Certification") and issuer_in_filter: elif packet_type.endswith("Certification") and issuer_in_filter:
assert certifications[current_packet_uid][issuer] == [packet] # type: ignore assert certifications[current_packet_uid][issuer] == [packet] # type: ignore
@ -350,6 +366,7 @@ def test_convert_signature_packet(
valid_fingerprint: Fingerprint, valid_fingerprint: Fingerprint,
) -> None: ) -> None:
certificate_fingerprint = None certificate_fingerprint = None
key_revocations: Dict[Fingerprint, Path] = {}
direct_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list) direct_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list)
direct_sigs: Dict[Fingerprint, List[Path]] = defaultdict(list) direct_sigs: Dict[Fingerprint, List[Path]] = defaultdict(list)
certifications: Dict[Uid, Dict[Fingerprint, List[Path]]] = defaultdict(lambda: defaultdict(list)) certifications: Dict[Uid, Dict[Fingerprint, List[Path]]] = defaultdict(lambda: defaultdict(list))
@ -366,6 +383,7 @@ def test_convert_signature_packet(
current_packet_mode=current_packet_mode, current_packet_mode=current_packet_mode,
certificate_fingerprint=certificate_fingerprint, certificate_fingerprint=certificate_fingerprint,
fingerprint_filter=None, fingerprint_filter=None,
key_revocations=key_revocations,
current_packet_fingerprint=None, current_packet_fingerprint=None,
current_packet_uid=None, current_packet_uid=None,
direct_revocations=direct_revocations, direct_revocations=direct_revocations,
@ -391,15 +409,16 @@ def test_convert_signature_packet(
Path("foo.asc"), Path("foo.asc"),
[ [
Path("--PublicKey"), Path("--PublicKey"),
Path("--Signature"),
Path("--UserID"), Path("--UserID"),
Path("--UserAttribute"), Path("--UserAttribute"),
Path("--PublicSubkey"), Path("--PublicSubkey"),
Path("--Signature"), Path("--Signature"),
], ],
[ [
"".join(choice("ABCDEF" + digits) for x in range(40)), "".join(choice("ABCDEF" + digits) for _ in range(40)),
"foo <foo@bar.com>", "foo <foo@bar.com>",
"".join(choice("ABCDEF" + digits) for x in range(40)), "".join(choice("ABCDEF" + digits) for _ in range(40)),
], ],
"bar", "bar",
does_not_raise(), does_not_raise(),
@ -493,6 +512,35 @@ def test_persist_subkey_revocations(
) )
@patch("libkeyringctl.keyring.packet_signature_creation_time")
@patch("libkeyringctl.keyring.packet_join")
def test_persist_directkey_revocations(
packet_join_mock: Mock,
packet_signature_creation_time_mock: Mock,
working_dir: Path,
keyring_dir: Path,
valid_fingerprint: Fingerprint,
) -> None:
revocation_packet = working_dir / "latest_revocation.asc"
directkey_revocations: Dict[Fingerprint, List[Path]] = {valid_fingerprint: [revocation_packet]}
dt = datetime(2000, 1, 12, 11, 22, 33)
packet_signature_creation_time_mock.return_value = dt
keyring.persist_direct_key_revocations(
key_dir=keyring_dir,
direct_key_revocations=directkey_revocations,
)
packet_join_mock.assert_called_once_with(
packets=[revocation_packet],
output=keyring_dir
/ "directkey"
/ "revocation"
/ valid_fingerprint
/ f"{dt.strftime(PACKET_FILENAME_DATETIME_FORMAT)}.asc",
force=True,
)
@create_certificate(username=Username("foobar"), uids=[Uid("foobar <foo@bar.xyz>")]) @create_certificate(username=Username("foobar"), uids=[Uid("foobar <foo@bar.xyz>")])
def test_convert(working_dir: Path, keyring_dir: Path) -> None: def test_convert(working_dir: Path, keyring_dir: Path) -> None:
keyring.convert( keyring.convert(
@ -719,7 +767,7 @@ def test_get_fingerprints_from_paths(keyring_dir: Path, valid_fingerprint: str,
fingerprint_subkey_asc = fingerprint_subkey_dir / (fingerprint_subkey_dir.name + ".asc") fingerprint_subkey_asc = fingerprint_subkey_dir / (fingerprint_subkey_dir.name + ".asc")
fingerprint_subkey_asc.touch() fingerprint_subkey_asc.touch()
assert keyring.get_fingerprints_from_paths(sources=[fingerprint_subkey_dir]) == set( assert keyring.get_fingerprints_from_paths(sources=[fingerprint_subkey_dir]) == {
[Fingerprint(valid_subkey_fingerprint)] Fingerprint(valid_subkey_fingerprint)
) }
assert keyring.get_fingerprints_from_paths(sources=[fingerprint_dir]) == set([Fingerprint(valid_fingerprint)]) assert keyring.get_fingerprints_from_paths(sources=[fingerprint_dir]) == {Fingerprint(valid_fingerprint)}