feature(keyringctl): simplification by removing static data from types

The certificate fingerprint in the convert function remains always the
same as we only process a single certificate and loop outside over
multiple keyrings. Therefor remove that layer from the data structures
and implicitly simplify all the assignments and usages.
This commit is contained in:
Levente Polyak 2021-10-19 18:26:50 +02:00
parent a43d3dfac6
commit 2206fe07b6
No known key found for this signature in database
GPG Key ID: FC1B547C8D8172C8

View File

@ -174,13 +174,13 @@ def convert_certificate( # noqa: ignore=C901
# root packets # root packets
certificate_fingerprint: Optional[Fingerprint] = None certificate_fingerprint: Optional[Fingerprint] = None
pubkey: Optional[Path] = None pubkey: Optional[Path] = None
direct_sigs: Dict[Fingerprint, Dict[str, List[Path]]] = {} direct_sigs: Dict[str, List[Path]] = defaultdict(list)
direct_revocations: Dict[Fingerprint, Dict[str, List[Path]]] = {} direct_revocations: Dict[str, List[Path]] = defaultdict(list)
# subkey packets # subkey packets
subkeys: Dict[Fingerprint, Dict[Fingerprint, Path]] = {} subkeys: Dict[Fingerprint, Path] = {}
subkey_binding_sigs: Dict[Fingerprint, Dict[Fingerprint, Path]] = {} subkey_binding_sigs: Dict[Fingerprint, Path] = {}
subkey_revocations: Dict[Fingerprint, Dict[Fingerprint, Path]] = {} subkey_revocations: Dict[Fingerprint, Path] = {}
# uid packets # uid packets
uids: Dict[Uid, Path] = {} uids: Dict[Uid, Path] = {}
@ -194,34 +194,6 @@ def convert_certificate( # noqa: ignore=C901
current_packet_fingerprint: Optional[Fingerprint] = None current_packet_fingerprint: Optional[Fingerprint] = None
current_packet_uid: Optional[Uid] = None current_packet_uid: Optional[Uid] = None
def add_packet_to_direct_sigs(
direct_sigs: Dict[Fingerprint, Dict[str, List[Path]]],
issuer: str,
packet_key: Fingerprint,
packet: Path,
) -> Dict[Fingerprint, Dict[str, List[Path]]]:
"""Add a packet to the set of DirectKeys
If no key with the given packet_key exists yet, it is created.
Parameters
----------
direct_sigs: Dict[Fingerprint, Dict[str, List[Path]]]
The signatures directly on a root key (such as DirectKey or *Certifications without a specific User ID)
issuer: str
The issuer of the signature
packet: Path
The path to the packet
packet_key: Fingerprint
The key identifying the packet (e.g. its Fingerprint)
"""
if not direct_sigs.get(packet_key):
direct_sigs = direct_sigs | {packet_key: defaultdict(list)}
direct_sigs[packet_key][issuer].append(packet)
return direct_sigs
# XXX: PrimaryKeyBinding # XXX: PrimaryKeyBinding
# TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean # TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean
@ -248,14 +220,7 @@ def convert_certificate( # noqa: ignore=C901
current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint")) current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint"))
current_packet_uid = None current_packet_uid = None
if not certificate_fingerprint: subkeys[current_packet_fingerprint] = packet
raise Exception('missing certificate fingerprint for "{packet.name}"')
if not subkeys.get(certificate_fingerprint):
subkeys |= {certificate_fingerprint: {current_packet_fingerprint: packet}}
else:
subkeys[certificate_fingerprint] |= {current_packet_fingerprint: packet}
elif packet.name.endswith("--Signature"): elif packet.name.endswith("--Signature"):
if not certificate_fingerprint: if not certificate_fingerprint:
raise Exception('missing certificate fingerprint for "{packet.name}"') raise Exception('missing certificate fingerprint for "{packet.name}"')
@ -268,19 +233,9 @@ def convert_certificate( # noqa: ignore=C901
raise Exception('missing current packet fingerprint for "{packet.name}"') raise Exception('missing current packet fingerprint for "{packet.name}"')
if signature_type == "KeyRevocation" and certificate_fingerprint.endswith(issuer): if signature_type == "KeyRevocation" and certificate_fingerprint.endswith(issuer):
direct_revocations = add_packet_to_direct_sigs( direct_revocations[issuer].append(packet)
direct_sigs=direct_revocations,
issuer=issuer,
packet_key=current_packet_fingerprint,
packet=packet,
)
elif signature_type in ["DirectKey", "GenericCertification"]: elif signature_type in ["DirectKey", "GenericCertification"]:
direct_sigs = add_packet_to_direct_sigs( direct_sigs[issuer].append(packet)
direct_sigs=direct_sigs,
issuer=issuer,
packet_key=current_packet_fingerprint,
packet=packet,
)
else: else:
raise Exception(f"unknown signature type: {signature_type}") raise Exception(f"unknown signature type: {signature_type}")
elif current_packet_mode == "uid": elif current_packet_mode == "uid":
@ -304,15 +259,9 @@ def convert_certificate( # noqa: ignore=C901
raise Exception('missing current packet fingerprint for "{packet.name}"') raise Exception('missing current packet fingerprint for "{packet.name}"')
if signature_type == "SubkeyBinding": if signature_type == "SubkeyBinding":
if not subkey_binding_sigs.get(certificate_fingerprint): subkey_binding_sigs[current_packet_fingerprint] = packet
subkey_binding_sigs |= {certificate_fingerprint: {current_packet_fingerprint: packet}}
else:
subkey_binding_sigs[certificate_fingerprint] |= {current_packet_fingerprint: packet}
elif signature_type == "SubkeyRevocation": elif signature_type == "SubkeyRevocation":
if not subkey_revocations.get(certificate_fingerprint): subkey_revocations[certificate_fingerprint] = packet
subkey_revocations |= {certificate_fingerprint: {current_packet_fingerprint: packet}}
else:
subkey_revocations[certificate_fingerprint] |= {current_packet_fingerprint: packet}
else: else:
raise Exception(f"unknown signature type: {signature_type}") raise Exception(f"unknown signature type: {signature_type}")
else: else:
@ -343,14 +292,12 @@ def convert_certificate( # noqa: ignore=C901
) )
persist_subkeys( persist_subkeys(
certificate_fingerprint=certificate_fingerprint,
key_dir=key_dir, key_dir=key_dir,
subkeys=subkeys, subkeys=subkeys,
subkey_binding_sigs=subkey_binding_sigs, subkey_binding_sigs=subkey_binding_sigs,
) )
persist_subkey_revocations( persist_subkey_revocations(
certificate_fingerprint=certificate_fingerprint,
key_dir=key_dir, key_dir=key_dir,
subkey_revocations=subkey_revocations, subkey_revocations=subkey_revocations,
) )
@ -440,63 +387,55 @@ def persist_uids(
def persist_subkeys( def persist_subkeys(
certificate_fingerprint: Fingerprint,
key_dir: Path, key_dir: Path,
subkeys: Dict[Fingerprint, Dict[Fingerprint, Path]], subkeys: Dict[Fingerprint, Path],
subkey_binding_sigs: Dict[Fingerprint, Dict[Fingerprint, Path]], subkey_binding_sigs: Dict[Fingerprint, Path],
) -> None: ) -> None:
"""Persist all Public-Subkeys and their PublicSubkeyBinding of a root key file to file(s) """Persist all Public-Subkeys and their PublicSubkeyBinding of a root key file to file(s)
Parameters Parameters
---------- ----------
certificate_fingerprint: Fingerprint
The unique fingerprint of the public key
key_dir: Path key_dir: Path
The root directory below which the basic key material is persisted The root directory below which the basic key material is persisted
subkeys: Dict[Fingerprint, Dict[Fingerprint, Path]] subkeys: Dict[Fingerprint, Path]
The PublicSubkeys of a key The PublicSubkeys of a key
subkey_binding_sigs: Dict[Fingerprint, Dict[Fingerprint, Path]] subkey_binding_sigs: Dict[Fingerprint, Path]
The SubkeyBinding signatures of a Public-Key (the root key) The SubkeyBinding signatures of a Public-Key (the root key)
""" """
if subkeys.get(certificate_fingerprint): for fingerprint, subkey in subkeys.items():
for signature, subkey in subkeys[certificate_fingerprint].items(): packets: List[Path] = []
packets: List[Path] = [] packets.extend([subkey, subkey_binding_sigs[fingerprint]])
packets.extend([subkey, subkey_binding_sigs[certificate_fingerprint][signature]]) output_file = key_dir / "subkey" / fingerprint / f"{fingerprint}.asc"
output_file = key_dir / "subkey" / signature / f"{signature}.asc" output_file.parent.mkdir(parents=True, exist_ok=True)
output_file.parent.mkdir(parents=True, exist_ok=True) debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}")
debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}") packet_join(packets=packets, output=output_file)
packet_join(packets=packets, output=output_file)
def persist_subkey_revocations( def persist_subkey_revocations(
certificate_fingerprint: Fingerprint,
key_dir: Path, key_dir: Path,
subkey_revocations: Dict[Fingerprint, Dict[Fingerprint, Path]], subkey_revocations: Dict[Fingerprint, Path],
) -> None: ) -> None:
"""Persist the SubkeyRevocations of all Public-Subkeys of a root key to file(s) """Persist the SubkeyRevocations of all Public-Subkeys of a root key to file(s)
Parameters Parameters
---------- ----------
certificate_fingerprint: Fingerprint
The unique fingerprint of the public key
key_dir: Path key_dir: Path
The root directory below which the basic key material is persisted The root directory below which the basic key material is persisted
subkey_revocations: Dict[Fingerprint, Dict[Fingerprint, Path]] subkey_revocations: Dict[Fingerprint, Path]
The SubkeyRevocations of PublicSubkeys of a key The SubkeyRevocations of PublicSubkeys of a key
""" """
if subkey_revocations.get(certificate_fingerprint): for fingerprint, revocation in subkey_revocations.items():
for signature, revocation in subkey_revocations[certificate_fingerprint].items(): issuer = packet_dump_field(revocation, "Issuer")
issuer = packet_dump_field(revocation, "Issuer") output_file = key_dir / "subkey" / fingerprint / "revocation" / f"{issuer}.asc"
output_file = key_dir / "subkey" / signature / "revocation" / f"{issuer}.asc" output_file.parent.mkdir(parents=True, exist_ok=True)
output_file.parent.mkdir(parents=True, exist_ok=True) debug(f"Writing file {output_file} from {revocation}")
debug(f"Writing file {output_file} from {revocation}") packet_join(packets=[revocation], output=output_file)
packet_join(packets=[revocation], output=output_file)
def persist_direct_sigs( def persist_direct_sigs(
direct_sigs: Dict[Fingerprint, Dict[str, List[Path]]], direct_sigs: Dict[str, List[Path]],
pubkey: Path, pubkey: Path,
key_dir: Path, key_dir: Path,
sig_type: str = "certification", sig_type: str = "certification",
@ -506,8 +445,8 @@ def persist_direct_sigs(
Parameters Parameters
---------- ----------
direct_sigs: Dict[Fingerprint, Dict[str, List[Path]]] direct_sigs: Dict[str, List[Path]]
The certifications to write to file The direct sigs to write to file
pubkey: Path pubkey: Path
The path to the public key of the root key The path to the public key of the root key
key_dir: Path key_dir: Path
@ -516,13 +455,12 @@ def persist_direct_sigs(
The type of direct certification to persist (defaults to 'certification'). This influences the directory name The type of direct certification to persist (defaults to 'certification'). This influences the directory name
""" """
for key, current_certifications in direct_sigs.items(): for issuer, certifications in direct_sigs.items():
for issuer, certifications in current_certifications.items(): packets = [pubkey] + certifications
packets = [pubkey] + certifications output_file = key_dir / sig_type / f"{issuer}.asc"
output_file = key_dir / sig_type / f"{issuer}.asc" output_file.parent.mkdir(parents=True, exist_ok=True)
output_file.parent.mkdir(parents=True, exist_ok=True) debug(f"Writing file {output_file} from {[str(cert) for cert in certifications]}")
debug(f"Writing file {output_file} from {[str(cert) for cert in certifications]}") packet_join(packets, output_file)
packet_join(packets, output_file)
def persist_certifications( def persist_certifications(
@ -952,7 +890,7 @@ def get_all_and_revoked_certs(certs: List[Path]) -> Tuple[List[Fingerprint], Lis
debug(f"Revoking {cert_fingerprint} due to self-revocation") debug(f"Revoking {cert_fingerprint} due to self-revocation")
revoked_fingerprints.append(cert_fingerprint) revoked_fingerprints.append(cert_fingerprint)
return (all_fingerprints, revoked_fingerprints) return all_fingerprints, revoked_fingerprints
def export_ownertrust(certs: List[Path], output: Path) -> Tuple[List[Fingerprint], List[Fingerprint]]: def export_ownertrust(certs: List[Path], output: Path) -> Tuple[List[Fingerprint], List[Fingerprint]]:
@ -979,7 +917,7 @@ def export_ownertrust(certs: List[Path], output: Path) -> Tuple[List[Fingerprint
debug(f"Writing {cert} to {output}") debug(f"Writing {cert} to {output}")
trusted_certs_file.write(f"{cert}:4:\n") trusted_certs_file.write(f"{cert}:4:\n")
return (trusted_certs, all_certs) return trusted_certs, all_certs
def export_revoked(certs: List[Path], main_keys: List[Fingerprint], output: Path, min_revoker: int = 2) -> None: def export_revoked(certs: List[Path], main_keys: List[Fingerprint], output: Path, min_revoker: int = 2) -> None: