Simplify libkeyringctl.keyring.convert_certificate

libkeyringctl/keyring.py:
Simplify `convert_certificate()` by splitting out the conversion of
signature packets to `convert_signature_packet()` and the persistence of
packet material to `persist_key_material()`.
Add `convert_pubkey_signature_packet()`,
`convert_uid_signature_packet()` and
`convert_subkey_signature_packet()` to deal with the conversion of
public key signatures, UID signatures and subkey signatures
(respectively).

tests/test_keyring.py:
Add tests for `convert_certificate()`, `convert_signature_packet()`,
`convert_{pubkey,uid,subkey}_signature_packet()` and
`persist_subkey_revocations()`.
This commit is contained in:
David Runge
2021-11-14 15:49:18 +01:00
committed by Levente Polyak
parent bb30e3d2fd
commit e43a28f4a7
2 changed files with 614 additions and 69 deletions

View File

@ -88,8 +88,184 @@ def transform_fingerprint_to_keyring_path(keyring_root: Path, paths: List[Path])
paths[index] = fingerprint_paths[0]
# TODO: simplify to lower complexity
def convert_certificate( # noqa: ignore=C901
def convert_pubkey_signature_packet(
packet: Path,
certificate_fingerprint: Fingerprint,
fingerprint_filter: Optional[Set[Fingerprint]],
current_packet_fingerprint: Optional[Fingerprint],
direct_revocations: Dict[Fingerprint, List[Path]],
direct_sigs: Dict[Fingerprint, List[Path]],
) -> None:
"""Convert a public key signature packet
packet: The Path of the packet file to process
certificate_fingerprint: The public key certificate fingerprint
fingerprint_filter: Optional list of fingerprints of PGP public keys that all certifications will be filtered with
current_packet_fingerprint: Optional certificate fingerprint of the current packet
direct_revocations: A dictionary of direct key revocations
direct_sigs: A dictionary of direct key signatures
"""
if not current_packet_fingerprint:
raise Exception('missing current packet fingerprint for "{packet.name}"')
signature_type = packet_dump_field(packet=packet, field="Type")
issuer = get_fingerprint_from_partial(fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Issuer")))
if not issuer:
debug(f"failed to resolve partial fingerprint {issuer}, skipping packet")
else:
if signature_type == "KeyRevocation" and certificate_fingerprint.endswith(issuer):
direct_revocations[issuer].append(packet)
elif signature_type in ["DirectKey", "GenericCertification"]:
direct_sigs[issuer].append(packet)
else:
raise Exception(f"unknown signature type: {signature_type}")
def convert_uid_signature_packet(
packet: Path,
current_packet_uid: Optional[Uid],
fingerprint_filter: Optional[Set[Fingerprint]],
certifications: Dict[Uid, Dict[Fingerprint, List[Path]]],
revocations: Dict[Uid, Dict[Fingerprint, List[Path]]],
) -> None:
"""Convert a UID signature packet
packet: The Path of the packet file to process
current_packet_uid: Optional Uid of the current packet
fingerprint_filter: Optional list of fingerprints of PGP public keys that all certifications will be filtered with
certifications: A dictionary containing all certificaions
revocations: A dictionary containing all revocations
"""
if not current_packet_uid:
raise Exception('missing current packet uid for "{packet.name}"')
signature_type = packet_dump_field(packet=packet, field="Type")
issuer = get_fingerprint_from_partial(fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Issuer")))
if not issuer:
debug(f"failed to resolve partial fingerprint {issuer}, skipping packet")
else:
if signature_type == "CertificationRevocation":
revocations[current_packet_uid][issuer].append(packet)
elif signature_type.endswith("Certification"):
# TODO: extend fp filter to all certifications
# TODO: use contains_fingerprint
if fingerprint_filter is None or any([fp.endswith(issuer) for fp in fingerprint_filter]):
debug(f"The certification by issuer {issuer} is appended as it is found in the filter.")
certifications[current_packet_uid][issuer].append(packet)
else:
debug(f"The certification by issuer {issuer} is not appended because it is not in the filter")
else:
raise Exception(f"unknown signature type: {signature_type}")
def convert_subkey_signature_packet(
packet: Path,
certificate_fingerprint: Fingerprint,
current_packet_fingerprint: Optional[Fingerprint],
fingerprint_filter: Optional[Set[Fingerprint]],
subkey_bindings: Dict[Fingerprint, List[Path]],
subkey_revocations: Dict[Fingerprint, List[Path]],
) -> None:
"""Convert a subkey signature packet
packet: The Path of the packet file to process
certificate_fingerprint: The public key certificate fingerprint
current_packet_fingerprint: Optional certificate fingerprint of the current packet
fingerprint_filter: Optional list of fingerprints of PGP public keys that all certifications will be filtered with
subkey_bindings: A dictionary containing all subkey binding signatures
subkey_revocations: A dictionary containing all subkey revocations
"""
if not current_packet_fingerprint:
raise Exception('missing current packet fingerprint for "{packet.name}"')
signature_type = packet_dump_field(packet=packet, field="Type")
issuer = get_fingerprint_from_partial(fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Issuer")))
if not issuer:
debug(f"failed to resolve partial fingerprint {issuer}, skipping packet")
else:
if issuer != certificate_fingerprint:
raise Exception(f"subkey packet does not belong to {certificate_fingerprint}, issuer: {issuer}")
if signature_type == "SubkeyBinding":
subkey_bindings[current_packet_fingerprint].append(packet)
elif signature_type == "SubkeyRevocation":
subkey_revocations[current_packet_fingerprint].append(packet)
else:
raise Exception(f"unknown signature type: {signature_type}")
def convert_signature_packet(
packet: Path,
current_packet_mode: Optional[str],
certificate_fingerprint: Optional[Fingerprint],
fingerprint_filter: Optional[Set[Fingerprint]],
current_packet_fingerprint: Optional[Fingerprint],
current_packet_uid: Optional[Uid],
direct_revocations: Dict[Fingerprint, List[Path]],
direct_sigs: Dict[Fingerprint, List[Path]],
certifications: Dict[Uid, Dict[Fingerprint, List[Path]]],
revocations: Dict[Uid, Dict[Fingerprint, List[Path]]],
subkey_bindings: Dict[Fingerprint, List[Path]],
subkey_revocations: Dict[Fingerprint, List[Path]],
) -> None:
"""Convert a signature packet
packet: The Path of the packet file to process
certificate_fingerprint: The public key certificate fingerprint
current_packet_fingerprint: Optional certificate fingerprint of the current packet
current_packet_uid: Optional Uid of the current packet
direct_revocations: A dictionary of direct key revocations
direct_sigs: A dictionary of direct key signatures
fingerprint_filter: Optional list of fingerprints of PGP public keys that all certifications will be filtered with
certifications: A dictionary containing all certificaions
revocations: A dictionary containing all revocations
subkey_bindings: A dictionary containing all subkey binding signatures
subkey_revocations: A dictionary containing all subkey revocations
"""
if not certificate_fingerprint:
raise Exception('missing certificate fingerprint for "{packet.name}"')
if current_packet_mode == "pubkey":
convert_pubkey_signature_packet(
packet=packet,
certificate_fingerprint=certificate_fingerprint,
fingerprint_filter=fingerprint_filter,
current_packet_fingerprint=current_packet_fingerprint,
direct_revocations=direct_revocations,
direct_sigs=direct_sigs,
)
elif current_packet_mode == "uid":
convert_uid_signature_packet(
packet=packet,
current_packet_uid=current_packet_uid,
fingerprint_filter=fingerprint_filter,
certifications=certifications,
revocations=revocations,
)
elif current_packet_mode == "subkey":
convert_subkey_signature_packet(
packet=packet,
current_packet_fingerprint=current_packet_fingerprint,
certificate_fingerprint=certificate_fingerprint,
fingerprint_filter=fingerprint_filter,
subkey_bindings=subkey_bindings,
subkey_revocations=subkey_revocations,
)
elif current_packet_mode == "uattr":
# ignore user attributes and related signatures
debug("skipping user attribute signature packet")
else:
raise Exception(f'unknown signature root for "{packet.name}"')
def convert_certificate(
working_dir: Path,
certificate: Path,
keyring_dir: Path,
@ -183,75 +359,25 @@ def convert_certificate( # noqa: ignore=C901
)
raise Exception("Secret key detected, aborting")
elif packet.name.endswith("--Signature"):
# ignore user attributes and related signatures
if current_packet_mode == "uattr":
debug("skipping user attribute signature packet")
continue
if not certificate_fingerprint:
raise Exception('missing certificate fingerprint for "{packet.name}"')
issuer = get_fingerprint_from_partial(
fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Issuer"))
convert_signature_packet(
packet=packet,
current_packet_mode=current_packet_mode,
certificate_fingerprint=certificate_fingerprint,
fingerprint_filter=fingerprint_filter,
current_packet_fingerprint=current_packet_fingerprint,
current_packet_uid=current_packet_uid,
direct_revocations=direct_revocations,
direct_sigs=direct_sigs,
certifications=certifications,
revocations=revocations,
subkey_bindings=subkey_bindings,
subkey_revocations=subkey_revocations,
)
if not issuer:
debug(f"failed to resolve partial fingerprint {issuer}, skipping packet")
continue
signature_type = packet_dump_field(packet, "Type")
if current_packet_mode == "pubkey":
if not current_packet_fingerprint:
raise Exception('missing current packet fingerprint for "{packet.name}"')
if signature_type == "KeyRevocation" and certificate_fingerprint.endswith(issuer):
direct_revocations[issuer].append(packet)
elif signature_type in ["DirectKey", "GenericCertification"]:
direct_sigs[issuer].append(packet)
else:
raise Exception(f"unknown signature type: {signature_type}")
elif current_packet_mode == "uid":
if not current_packet_uid:
raise Exception('missing current packet uid for "{packet.name}"')
if signature_type == "CertificationRevocation":
revocations[current_packet_uid][issuer].append(packet)
elif signature_type.endswith("Certification"):
# TODO: extend fp filter to all certifications
# TODO: use contains_fingerprint
if fingerprint_filter is None or any([fp.endswith(issuer) for fp in fingerprint_filter]):
debug(f"The certification by issuer {issuer} is appended as it is found in the filter.")
certifications[current_packet_uid][issuer].append(packet)
else:
debug(f"The certification by issuer {issuer} is not appended because it is not in the filter")
else:
raise Exception(f"unknown signature type: {signature_type}")
elif current_packet_mode == "subkey":
if not current_packet_fingerprint:
raise Exception('missing current packet fingerprint for "{packet.name}"')
issuer = get_fingerprint_from_partial(
fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Issuer"))
)
if issuer != certificate_fingerprint:
raise Exception(f"subkey packet does not belong to {certificate_fingerprint}, issuer: {issuer}")
if signature_type == "SubkeyBinding":
subkey_bindings[current_packet_fingerprint].append(packet)
elif signature_type == "SubkeyRevocation":
subkey_revocations[current_packet_fingerprint].append(packet)
else:
raise Exception(f"unknown signature type: {signature_type}")
else:
raise Exception(f'unknown signature root for "{packet.name}"')
else:
raise Exception(f'unknown packet type "{packet.name}"')
if not certificate_fingerprint:
raise Exception("missing certificate fingerprint")
if not pubkey:
raise Exception("missing certificate public-key")
if not certificate_fingerprint or not pubkey:
raise Exception("missing certificate public key")
name_override = (
name_override
@ -263,6 +389,51 @@ def convert_certificate( # noqa: ignore=C901
key_dir = user_dir / certificate_fingerprint
key_dir.mkdir(parents=True, exist_ok=True)
persist_key_material(
key_dir=key_dir,
direct_sigs=direct_sigs,
direct_revocations=direct_revocations,
certificate_fingerprint=certificate_fingerprint,
pubkey=pubkey,
subkeys=subkeys,
subkey_bindings=subkey_bindings,
subkey_revocations=subkey_revocations,
uids=uids,
certifications=certifications,
revocations=revocations,
)
return key_dir
def persist_key_material(
key_dir: Path,
direct_sigs: Dict[Fingerprint, List[Path]],
direct_revocations: Dict[Fingerprint, List[Path]],
certificate_fingerprint: Fingerprint,
pubkey: Path,
subkeys: Dict[Fingerprint, Path],
subkey_bindings: Dict[Fingerprint, List[Path]],
subkey_revocations: Dict[Fingerprint, List[Path]],
uids: Dict[Uid, Path],
certifications: Dict[Uid, Dict[Fingerprint, List[Path]]],
revocations: Dict[Uid, Dict[Fingerprint, List[Path]]],
) -> None:
"""Persist the key material found in a certificate to decomposed directory structure
key_dir: The Path below which to create a decomposed directory structure for the certificate
direct_sigs: A dictionary of direct key signatures
direct_revocations: A dictionary of direct key revocations
certificate_fingerprint: The public key certificate fingerprint
pubkey: The Path of the PGP packet representing the public key material
subkeys: A dictionary of Paths per Fingerprint that represent the subkey material of the certificate
subkey_bindings: A dictionary containing all subkey binding signatures
subkey_revocations: A dictionary containing all subkey revocations
uids: A dictionary of Path per Uid, that represent the UID packets of the certificate
certifications: A dictionary containing all certificaions
revocations: A dictionary containing all revocations
"""
persist_public_key(
certificate_fingerprint=certificate_fingerprint,
pubkey=pubkey,
@ -311,8 +482,6 @@ def convert_certificate( # noqa: ignore=C901
key_dir=key_dir,
)
return key_dir
def persist_public_key(
certificate_fingerprint: Fingerprint,