feature(keyringctl): add type hinting for fingerprint and uid

This drastically improves readability and type safety when joggling with
different keys in the data structures.
This commit is contained in:
Levente Polyak 2021-10-19 17:35:59 +02:00
parent cd0a2005a7
commit 77b1eab89e
No known key found for this signature in database
GPG Key ID: FC1B547C8D8172C8

View File

@ -27,12 +27,17 @@ from typing import Dict
from typing import Iterable
from typing import Iterator
from typing import List
from typing import NewType
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union
Fingerprint = NewType('Fingerprint', str)
Uid = NewType('Uid', str)
@contextmanager
def cwd(new_dir: Path) -> Iterator[None]:
"""Change to a new current working directory in a context and go back to the previous dir after the context is done
@ -135,7 +140,7 @@ def convert_certificate( # noqa: ignore=C901
working_dir: Path,
certificate: Path,
name_override: Optional[str] = None,
fingerprint_filter: Optional[Set[str]] = None,
fingerprint_filter: Optional[Set[Fingerprint]] = None,
) -> Path:
"""Convert a single file public key certificate into a decomposed directory structure of multiple PGP packets
@ -151,7 +156,7 @@ def convert_certificate( # noqa: ignore=C901
The path to a public key certificate
name_override: Optional[str]
An optional string to override the username in the to be created output directory structure
fingerprint_filter: Optional[Set[str]]
fingerprint_filter: Optional[Set[Fingerprint]]
An optional list of strings defining fingerprints of PGP public keys that all certificates will be filtered
with
@ -166,40 +171,48 @@ def convert_certificate( # noqa: ignore=C901
The path of the user_dir (which is located below working_dir)
"""
certificate_fingerprint: Optional[str] = None
# root packets
certificate_fingerprint: Optional[Fingerprint] = None
pubkey: Optional[Path] = None
direct_sigs: Dict[str, Dict[str, List[Path]]] = {}
direct_revocations: Dict[str, Dict[str, List[Path]]] = {}
direct_sigs: Dict[Fingerprint, Dict[str, List[Path]]] = {}
direct_revocations: Dict[Fingerprint, Dict[str, List[Path]]] = {}
# subkey packets
subkeys: Dict[Fingerprint, Dict[Fingerprint, Path]] = {}
subkey_binding_sigs: Dict[Fingerprint, Dict[Fingerprint, Path]] = {}
subkey_revocations: Dict[Fingerprint, Dict[Fingerprint, Path]] = {}
# uid packets
uids: Dict[Uid, Path] = {}
uid_binding_sigs: Dict[Uid, Path] = {}
certifications: Dict[Uid, List[Path]] = defaultdict(list)
revocations: Dict[Uid, List[Path]] = defaultdict(list)
# intermediate variables
username: str = name_override or certificate.stem
current_packet_mode: Optional[str] = None
current_packet_key: Optional[str] = None
uids: Dict[str, Path] = {}
uid_binding_sigs: Dict[str, Path] = {}
subkeys: Dict[str, Dict[str, Path]] = {}
subkey_binding_sigs: Dict[str, Dict[str, Path]] = {}
subkey_revocations: Dict[str, Dict[str, Path]] = {}
certifications: Dict[str, List[Path]] = defaultdict(list)
revocations: Dict[str, List[Path]] = defaultdict(list)
username = name_override or certificate.stem
current_packet_fingerprint: Optional[Fingerprint] = None
current_packet_uid: Optional[Uid] = None
def add_packet_to_direct_sigs(
direct_sigs: Dict[str, Dict[str, List[Path]]],
direct_sigs: Dict[Fingerprint, Dict[str, List[Path]]],
issuer: str,
packet_key: str,
packet_key: Fingerprint,
packet: Path,
) -> Dict[str, Dict[str, List[Path]]]:
) -> Dict[Fingerprint, Dict[str, List[Path]]]:
"""Add a packet to the set of DirectKeys
If no key with the given packet_key exists yet, it is created.
Parameters
----------
direct_sigs: Dict[str, Dict[str, List[Path]]]
direct_sigs: Dict[Fingerprint, Dict[str, List[Path]]]
The signatures directly on a root key (such as DirectKey or *Certifications without a specific User ID)
issuer: str
The issuer of the signature
packet: Path
The path to the packet
packet_key: str
packet_key: Fingerprint
The key identifying the packet (e.g. its Fingerprint)
"""
@ -218,78 +231,88 @@ def convert_certificate( # noqa: ignore=C901
for packet in packet_split(working_dir=working_dir, certificate=certificate):
debug(f"Processing packet {packet.name}")
if packet.name.endswith("--PublicKey"):
pubkey = packet
certificate_fingerprint = packet_dump_field(packet, "Fingerprint")
current_packet_mode = "pubkey"
current_packet_key = certificate_fingerprint
current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint"))
current_packet_uid = None
certificate_fingerprint = current_packet_fingerprint
pubkey = packet
elif packet.name.endswith("--UserID"):
value = simplify_user_id(packet_dump_field(packet, "Value"))
current_packet_mode = "uid"
current_packet_key = value
uids[value] = packet
current_packet_fingerprint = None
current_packet_uid = simplify_user_id(Uid(packet_dump_field(packet, "Value")))
uids[current_packet_uid] = packet
elif packet.name.endswith("--PublicSubkey"):
fingerprint = packet_dump_field(packet, "Fingerprint")
current_packet_mode = "subkey"
current_packet_key = fingerprint
current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint"))
current_packet_uid = None
if not certificate_fingerprint:
raise Exception('missing certificate fingerprint for "{packet.name}"')
if not subkeys.get(certificate_fingerprint):
subkeys |= {certificate_fingerprint: {fingerprint: packet}}
subkeys |= {certificate_fingerprint: {current_packet_fingerprint: packet}}
else:
subkeys[certificate_fingerprint] |= {fingerprint: packet}
subkeys[certificate_fingerprint] |= {current_packet_fingerprint: packet}
elif packet.name.endswith("--Signature"):
if not certificate_fingerprint:
raise Exception('missing certificate fingerprint for "{packet.name}"')
if not current_packet_key:
raise Exception('missing current packet key for "{packet.name}"')
issuer = packet_dump_field(packet, "Issuer")
signature_type = packet_dump_field(packet, "Type")
if current_packet_mode == "pubkey":
if not current_packet_fingerprint:
raise Exception('missing current packet fingerprint for "{packet.name}"')
if signature_type == "KeyRevocation" and certificate_fingerprint.endswith(issuer):
direct_revocations = add_packet_to_direct_sigs(
direct_sigs=direct_revocations,
issuer=issuer,
packet_key=current_packet_key,
packet_key=current_packet_fingerprint,
packet=packet,
)
elif signature_type in ["DirectKey", "GenericCertification"]:
direct_sigs = add_packet_to_direct_sigs(
direct_sigs=direct_sigs,
issuer=issuer,
packet_key=current_packet_key,
packet_key=current_packet_fingerprint,
packet=packet,
)
else:
raise Exception(f"unknown signature type: {signature_type}")
elif current_packet_mode == "uid":
if not current_packet_uid:
raise Exception('missing current packet uid for "{packet.name}"')
if signature_type == "CertificationRevocation":
revocations[current_packet_key].append(packet)
revocations[current_packet_uid].append(packet)
elif signature_type == "PositiveCertification" and certificate_fingerprint.endswith(issuer):
uid_binding_sigs[current_packet_key] = packet
uid_binding_sigs[current_packet_uid] = packet
elif signature_type.endswith("Certification"):
if fingerprint_filter is not None and any([fp.endswith(issuer) for fp in fingerprint_filter]):
debug(f"The certification by issuer {issuer} is appended as it is found in the filter.")
certifications[current_packet_key].append(packet)
certifications[current_packet_uid].append(packet)
else:
debug(f"The certification by issuer {issuer} is not appended because it is not in the filter")
else:
raise Exception(f"unknown signature type: {signature_type}")
elif current_packet_mode == "subkey":
if not current_packet_fingerprint:
raise Exception('missing current packet fingerprint for "{packet.name}"')
if signature_type == "SubkeyBinding":
if not subkey_binding_sigs.get(certificate_fingerprint):
subkey_binding_sigs |= {certificate_fingerprint: {fingerprint: packet}}
subkey_binding_sigs |= {certificate_fingerprint: {current_packet_fingerprint: packet}}
else:
subkey_binding_sigs[certificate_fingerprint] |= {fingerprint: packet}
subkey_binding_sigs[certificate_fingerprint] |= {current_packet_fingerprint: packet}
elif signature_type == "SubkeyRevocation":
if not subkey_revocations.get(certificate_fingerprint):
subkey_revocations |= {certificate_fingerprint: {fingerprint: packet}}
subkey_revocations |= {certificate_fingerprint: {current_packet_fingerprint: packet}}
else:
subkey_revocations[certificate_fingerprint] |= {fingerprint: packet}
subkey_revocations[certificate_fingerprint] |= {current_packet_fingerprint: packet}
else:
raise Exception(f"unknown signature type: {signature_type}")
else:
@ -365,7 +388,7 @@ def convert_certificate( # noqa: ignore=C901
def persist_public_key(
certificate_fingerprint: str,
certificate_fingerprint: Fingerprint,
pubkey: Path,
key_dir: Path,
) -> None:
@ -373,7 +396,7 @@ def persist_public_key(
Parameters
----------
certificate_fingerprint: str
certificate_fingerprint: Fingerprint
The unique fingerprint of the public key
pubkey: Path
The path to the public key of the root key
@ -390,8 +413,8 @@ def persist_public_key(
def persist_uids(
key_dir: Path,
uid_binding_sigs: Dict[str, Path],
uids: Dict[str, Path],
uid_binding_sigs: Dict[Uid, Path],
uids: Dict[Uid, Path],
) -> None:
"""Persist the User IDs that belong to a PublicKey
@ -402,9 +425,9 @@ def persist_uids(
----------
key_dir: Path
The root directory below which the basic key material is persisted
uid_binding_sigs: Dict[str, Path]
uid_binding_sigs: Dict[Uid, Path]
The PositiveCertifications of a User ID and Public-Key packet
uids: Dict[str, Path]
uids: Dict[Uid, Path]
The User IDs of a Public-Key (the root key)
"""
@ -417,22 +440,22 @@ def persist_uids(
def persist_subkeys(
certificate_fingerprint: str,
certificate_fingerprint: Fingerprint,
key_dir: Path,
subkeys: Dict[str, Dict[str, Path]],
subkey_binding_sigs: Dict[str, Dict[str, Path]],
subkeys: Dict[Fingerprint, Dict[Fingerprint, Path]],
subkey_binding_sigs: Dict[Fingerprint, Dict[Fingerprint, Path]],
) -> None:
"""Persist all Public-Subkeys and their PublicSubkeyBinding of a root key file to file(s)
Parameters
----------
certificate_fingerprint: str
certificate_fingerprint: Fingerprint
The unique fingerprint of the public key
key_dir: Path
The root directory below which the basic key material is persisted
subkeys: Dict[str, Dict[str, Path]]
subkeys: Dict[Fingerprint, Dict[Fingerprint, Path]]
The PublicSubkeys of a key
subkey_binding_sigs: Dict[str, Dict[str, Path]]
subkey_binding_sigs: Dict[Fingerprint, Dict[Fingerprint, Path]]
The SubkeyBinding signatures of a Public-Key (the root key)
"""
@ -447,19 +470,19 @@ def persist_subkeys(
def persist_subkey_revocations(
certificate_fingerprint: str,
certificate_fingerprint: Fingerprint,
key_dir: Path,
subkey_revocations: Dict[str, Dict[str, Path]],
subkey_revocations: Dict[Fingerprint, Dict[Fingerprint, Path]],
) -> None:
"""Persist the SubkeyRevocations of all Public-Subkeys of a root key to file(s)
Parameters
----------
certificate_fingerprint: str
certificate_fingerprint: Fingerprint
The unique fingerprint of the public key
key_dir: Path
The root directory below which the basic key material is persisted
subkey_revocations: Dict[str, Dict[str, Path]]
subkey_revocations: Dict[Fingerprint, Dict[Fingerprint, Path]]
The SubkeyRevocations of PublicSubkeys of a key
"""
@ -473,7 +496,7 @@ def persist_subkey_revocations(
def persist_direct_sigs(
direct_sigs: Dict[str, Dict[str, List[Path]]],
direct_sigs: Dict[Fingerprint, Dict[str, List[Path]]],
pubkey: Path,
key_dir: Path,
sig_type: str = "certification",
@ -483,7 +506,7 @@ def persist_direct_sigs(
Parameters
----------
certifications: Dict[str, List[Path]]
direct_sigs: Dict[Fingerprint, Dict[str, List[Path]]]
The certifications to write to file
pubkey: Path
The path to the public key of the root key
@ -503,11 +526,11 @@ def persist_direct_sigs(
def persist_certifications(
certifications: Dict[str, List[Path]],
certifications: Dict[Uid, List[Path]],
pubkey: Path,
key_dir: Path,
uid_binding_sig: Dict[str, Path],
uids: Dict[str, Path],
uid_binding_sig: Dict[Uid, Path],
uids: Dict[Uid, Path],
) -> None:
"""Persist the certifications of a root key to file(s)
@ -517,15 +540,15 @@ def persist_certifications(
Parameters
----------
certifications: Dict[str, List[Path]]
certifications: Dict[Uid, List[Path]]
The certifications to write to file
pubkey: Path
The path to the public key of the root key
key_dir: Path
The root directory below which certifications are persisted
uid_binding_sig: Dict[str, Path]
uid_binding_sig: Dict[Uid, Path]
The PositiveCertifications of a User ID and Public-Key packet
uids: Dict[str, Path]
uids: Dict[Uid, Path]
The User IDs of a Public-Key (the root key)
"""
@ -550,10 +573,10 @@ def persist_certifications(
def persist_revocations(
pubkey: Path,
revocations: Dict[str, List[Path]],
revocations: Dict[Uid, List[Path]],
key_dir: Path,
uid_binding_sig: Dict[str, Path],
uids: Dict[str, Path],
uid_binding_sig: Dict[Uid, Path],
uids: Dict[Uid, Path],
) -> None:
"""Persist the revocations of a root key to file(s)
@ -564,13 +587,13 @@ def persist_revocations(
----------
pubkey: Path
The path to the public key of the root key
revocations: Dict[str, List[Path]]
revocations: Dict[Uid, List[Path]]
The revocations to write to file
key_dir: Path
The root directory below which revocations will be persisted
uid_binding_sig: Dict[str, Path]
uid_binding_sig: Dict[Uid, Path]
The PositiveCertifications of a User ID and Public-Key packet
uids: Dict[str, Path]
uids: Dict[Uid, Path]
The User IDs of a Public-Key (the root key)
"""
@ -710,24 +733,24 @@ def packet_join(packets: List[Path], output: Path, force: bool = False) -> None:
system(cmd, exit_on_error=False)
def simplify_user_id(user_id: str) -> str:
def simplify_user_id(user_id: Uid) -> Uid:
"""Simplify the User ID string to contain more filesystem friendly characters
Parameters
----------
user_id: str
user_id: Uid
A User ID string (e.g. 'Foobar McFooface <foobar@foo.face>')
Returns
-------
str
Uid
The simplified representation of user_id
"""
user_id = user_id.replace("@", "_at_")
user_id = sub("[<>]", "", user_id)
user_id = sub("[" + escape(r" !@#$%^&*()_-+=[]{}\|;:,.<>/?") + "]", "_", user_id)
return user_id
user_id_str: str = user_id.replace("@", "_at_")
user_id_str = sub("[<>]", "", user_id_str)
user_id_str = sub("[" + escape(r" !@#$%^&*()_-+=[]{}\|;:,.<>/?") + "]", "_", user_id_str)
return Uid(user_id_str)
def derive_user_from_target(working_dir: Path, target_dir: Path, certificate: Path) -> Optional[str]:
@ -807,7 +830,7 @@ def convert(
source: Path,
target_dir: Path,
name_override: Optional[str] = None,
fingerprint_filter: Optional[Set[str]] = None,
fingerprint_filter: Optional[Set[Fingerprint]] = None,
) -> Path:
"""Convert a path containing PGP certificate material to a decomposed directory structure
@ -823,7 +846,7 @@ def convert(
A directory path to write the new directory structure to
name_override: Optional[str]
An optional username override for the call to `convert_certificate()`
fingerprint_filter: Optional[Set[str]]
fingerprint_filter: Optional[Set[Fingerprint]]
An optional set of strings defining fingerprints of PGP public keys that all certificates will be filtered with
Returns
@ -897,7 +920,7 @@ def temp_join_keys(sources: List[Path], temp_dir: Path, force: bool) -> List[Pat
return certs
def get_all_and_revoked_certs(certs: List[Path]) -> Tuple[List[str], List[str]]:
def get_all_and_revoked_certs(certs: List[Path]) -> Tuple[List[Fingerprint], List[Fingerprint]]:
"""Get the fingerprints of all public keys and all fingerprints of all (self) revoked public keys in a directory
Parameters
@ -912,8 +935,10 @@ def get_all_and_revoked_certs(certs: List[Path]) -> Tuple[List[str], List[str]]:
fingerprints of all self-revoked public keys
"""
all_fingerprints: List[str] = []
revoked_fingerprints: List[str] = []
all_fingerprints: List[Fingerprint] = []
revoked_fingerprints: List[Fingerprint] = []
# TODO: what about direct key revocations/signatures?
debug(f"Retrieving all and self-revoked certificates from {[str(cert_dir) for cert_dir in certs]}")
for cert_collection in certs:
@ -921,16 +946,17 @@ def get_all_and_revoked_certs(certs: List[Path]) -> Tuple[List[str], List[str]]:
for user_dir in cert_collection.iterdir():
if user_dir.is_dir():
for cert_dir in user_dir.iterdir():
all_fingerprints.append(cert_dir.stem)
cert_fingerprint = Fingerprint(cert_dir.stem)
all_fingerprints.append(cert_fingerprint)
for revocation_cert in cert_dir.glob("revocation/*.asc"):
if cert_dir.stem.endswith(revocation_cert.stem):
debug(f"Revoking {cert_dir.stem} due to self-revocation")
revoked_fingerprints.append(cert_dir.stem)
if cert_fingerprint.endswith(revocation_cert.stem):
debug(f"Revoking {cert_fingerprint} due to self-revocation")
revoked_fingerprints.append(cert_fingerprint)
return (all_fingerprints, revoked_fingerprints)
def export_ownertrust(certs: List[Path], output: Path) -> Tuple[List[str], List[str]]:
def export_ownertrust(certs: List[Path], output: Path) -> Tuple[List[Fingerprint], List[Fingerprint]]:
"""Export ownertrust from a set of keys
The output file format is compatible with `gpg --import-ownertrust` and lists the main fingerprint ID of all
@ -946,7 +972,7 @@ def export_ownertrust(certs: List[Path], output: Path) -> Tuple[List[str], List[
The file path to write to
"""
(all_certs, revoked_certs) = get_all_and_revoked_certs(certs=certs)
all_certs, revoked_certs = get_all_and_revoked_certs(certs=certs)
trusted_certs = [cert for cert in all_certs if cert not in revoked_certs]
with open(file=output, mode="w") as trusted_certs_file:
@ -957,7 +983,7 @@ def export_ownertrust(certs: List[Path], output: Path) -> Tuple[List[str], List[
return (trusted_certs, all_certs)
def export_revoked(certs: List[Path], main_keys: List[str], output: Path, min_revoker: int = 2) -> None:
def export_revoked(certs: List[Path], main_keys: List[Fingerprint], output: Path, min_revoker: int = 2) -> None:
"""Export the PGP revoked status from a set of keys
The output file contains the fingerprints of all self-revoked keys and all keys for which at least two revocations
@ -969,7 +995,7 @@ def export_revoked(certs: List[Path], main_keys: List[str], output: Path, min_re
----------
certs: List[Path]
A list of directories with keys to check for their revocation status
main_keys: List[str]
main_keys: List[Fingerprint]
A list of strings representing the fingerprints of (current and/or revoked) main keys
output: Path
The file path to write to
@ -978,7 +1004,7 @@ def export_revoked(certs: List[Path], main_keys: List[str], output: Path, min_re
(defaults to 2)
"""
(all_certs, revoked_certs) = get_all_and_revoked_certs(certs=certs)
all_certs, revoked_certs = get_all_and_revoked_certs(certs=certs)
debug(f"Retrieving certificates revoked by main keys from {[str(cert_dir) for cert_dir in certs]}")
foreign_revocations: Dict[str, List[str]] = {}
@ -1003,7 +1029,7 @@ def export_revoked(certs: List[Path], main_keys: List[str], output: Path, min_re
f"Revoking {cert_dir.name} due to {set(foreign_revocations[cert_dir.stem])} "
"being main key revocations"
)
revoked_certs.append(cert_dir.stem)
revoked_certs.append(Fingerprint(cert_dir.stem))
with open(file=output, mode="w") as trusted_certs_file:
for cert in set(revoked_certs):
@ -1011,7 +1037,7 @@ def export_revoked(certs: List[Path], main_keys: List[str], output: Path, min_re
trusted_certs_file.write(f"{cert}\n")
def get_fingerprints_from_import_source(working_dir: Path, source: Path) -> List[str]:
def get_fingerprints_from_import_source(working_dir: Path, source: Path) -> List[Fingerprint]:
"""Get all fingerprints of PGP public keys from import file(s)
Parameters
@ -1023,24 +1049,24 @@ def get_fingerprints_from_import_source(working_dir: Path, source: Path) -> List
Returns
-------
List[str]
List[Fingerprint]
A list of strings representing the fingerprints of PGP public keys found in source
"""
fingerprints: List[str] = []
fingerprints: List[Fingerprint] = []
keys: List[Path] = list(source.iterdir()) if source.is_dir() else [source]
for key in keys:
for certificate in keyring_split(working_dir=working_dir, keyring=key):
for packet in packet_split(working_dir=working_dir, certificate=certificate):
if packet.name.endswith("--PublicKey"):
fingerprints += [packet_dump_field(packet, "Fingerprint")]
fingerprints += [Fingerprint(packet_dump_field(packet, "Fingerprint"))]
debug(f"Fingerprints of PGP public keys in {source}: {fingerprints}")
return fingerprints
def get_fingerprints_from_decomposed_dir(path: Path) -> List[str]:
def get_fingerprints_from_decomposed_dir(path: Path) -> List[Fingerprint]:
"""Get all fingerprints of PGP public keys from a decomposed directory structure
Parameters
@ -1050,16 +1076,16 @@ def get_fingerprints_from_decomposed_dir(path: Path) -> List[str]:
Returns
-------
List[str]
List[Fingerprint]
A list of strings representing all fingerprints of PGP public keys below path
"""
fingerprints = [path.stem for path in list(path.absolute().glob("*/*"))]
fingerprints = [Fingerprint(path.stem) for path in list(path.absolute().glob("*/*"))]
debug(f"Fingerprints of PGP public keys in {path}: {fingerprints}")
return fingerprints
def get_fingerprints(working_dir: Path, input_path: Path, decomposed_paths: List[Path]) -> Set[str]:
def get_fingerprints(working_dir: Path, input_path: Path, decomposed_paths: List[Path]) -> Set[Fingerprint]:
"""Get the fingerprints of PGP public keys from input paths and decomposed directory structures
@ -1074,11 +1100,11 @@ def get_fingerprints(working_dir: Path, input_path: Path, decomposed_paths: List
Returns
-------
Set[str]
Set[Fingerprint]
A set of strings describing fingerprints of PGP public keys
"""
fingerprints: Set[str] = set()
fingerprints: Set[Fingerprint] = set()
fingerprints.update(
get_fingerprints_from_import_source(