#!/usr/bin/env python
#
# SPDX-License-Identifier: GPL-3.0-or-later

from argparse import ArgumentParser
from collections import defaultdict
from contextlib import contextmanager
from logging import DEBUG
from logging import basicConfig
from logging import debug
from logging import error
from os import chdir
from os import getcwd
from pathlib import Path
from re import escape
from re import split
from re import sub
from shutil import copytree
from subprocess import PIPE
from subprocess import CalledProcessError
from subprocess import check_output
from sys import exit
from sys import stderr
from tempfile import TemporaryDirectory
from tempfile import mkdtemp
from typing import Dict
from typing import Iterable
from typing import Iterator
from typing import List
from typing import NewType
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union

Fingerprint = NewType("Fingerprint", str)
Uid = NewType("Uid", str)
Username = NewType("Username", str)


@contextmanager
def cwd(new_dir: Path) -> Iterator[None]:
    """Change to a new current working directory in a context and go back to the previous dir after the context is done

    Parameters
    ----------
    new_dir: Path
        A path to change to
    """

    previous_dir = getcwd()
    chdir(new_dir)
    try:
        yield
    finally:
        # always restore the previous directory, even if the body raised
        chdir(previous_dir)


def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]:
    """Sort an Iterable of Paths naturally

    Only the last path component (`Path.name`) is used for ordering. Runs of digits are compared numerically, all other
    text is compared case-insensitively.

    Parameters
    ----------
    _list: Iterable[Path]
        An iterable containing paths to be sorted

    Return
    ------
    Iterable[Path]
        An Iterable of paths that are naturally sorted
    """

    def convert_text_chunk(text: str) -> Union[int, str]:
        """Convert input text to int or str

        Parameters
        ----------
        text: str
            An input string

        Returns
        -------
        Union[int, str]
            Either an integer if text is a digit, else text in lower-case representation
        """

        return int(text) if text.isdigit() else text.lower()

    def alphanum_key(key: Path) -> List[Union[int, str]]:
        """Retrieve an alphanumeric key from a Path, that can be used in sorted()

        Parameters
        ----------
        key: Path
            A path for which to create a key

        Returns
        -------
        List[Union[int, str]]
            A list of either int or str objects that may serve as 'key' argument for sorted()
        """

        # the capturing group keeps the digit runs in the result of split()
        return [convert_text_chunk(c) for c in split("([0-9]+)", str(key.name))]

    return sorted(_list, key=alphanum_key)


def system(cmd: List[str], exit_on_error: bool = True) -> str:
    """Execute a command using check_output

    The stderr of a failing command is forwarded to the stderr of this process.

    Parameters
    ----------
    cmd: List[str]
        A list of strings to be fed to check_output
    exit_on_error: bool
        Whether to exit the script when encountering an error (defaults to True)

    Raises
    ------
    CalledProcessError
        If not exit_on_error and `check_output()` encounters an error

    Returns
    -------
    str
        The output of cmd
    """

    try:
        return check_output(cmd, stderr=PIPE).decode()
    except CalledProcessError as e:
        stderr.buffer.write(e.stderr)
        if exit_on_error:
            exit(e.returncode)
        raise


# TODO: simplify to lower complexity
def convert_certificate(  # noqa: ignore=C901
    working_dir: Path,
    certificate: Path,
    keyring_dir: Path,
    name_override: Optional[Username] = None,
    fingerprint_filter: Optional[Set[Fingerprint]] = None,
) -> Path:
    """Convert a single file public key certificate into a decomposed directory structure of multiple PGP packets

    The output directory structure is created per user. The username is derived from the certificate via
    `derive_username_from_fingerprint` or overridden via `name_override`.
    Below the username directory a directory tree describes the public keys components split up into certifications
    and revocations, as well as per subkey and per uid certifications and revocations.

    Parameters
    ----------
    working_dir: Path
        The path of the working directory below which to create split certificates
    certificate: Path
        The path to a public key certificate
    keyring_dir: Path
        The path of the keyring used to try to derive the username from the public key fingerprint
    name_override: Optional[Username]
        An optional string to override the username in the to be created output directory structure
    fingerprint_filter: Optional[Set[Fingerprint]]
        An optional set of strings defining fingerprints of PGP public keys that all certificates will be filtered
        with. Third-party certifications are only kept if their issuer matches an entry of this set.

    Raises
    ------
    Exception
        If required PGP packets are not found or a packet/signature of unknown type is encountered

    Returns
    -------
    Path
        The path of the user_dir (which is located below working_dir)
    """

    # root packets
    certificate_fingerprint: Optional[Fingerprint] = None
    pubkey: Optional[Path] = None
    direct_sigs: Dict[Fingerprint, List[Path]] = defaultdict(list)
    direct_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list)

    # subkey packets
    subkeys: Dict[Fingerprint, Path] = {}
    subkey_binding_sigs: Dict[Fingerprint, Path] = {}
    subkey_revocations: Dict[Fingerprint, Path] = {}

    # uid packets
    uids: Dict[Uid, Path] = {}
    uid_binding_sigs: Dict[Uid, Path] = {}
    certifications: Dict[Uid, List[Path]] = defaultdict(list)
    revocations: Dict[Uid, List[Path]] = defaultdict(list)

    # intermediate variables: a signature packet always belongs to the most recent key/uid/subkey packet
    current_packet_mode: Optional[str] = None
    current_packet_fingerprint: Optional[Fingerprint] = None
    current_packet_uid: Optional[Uid] = None

    # XXX: PrimaryKeyBinding

    # TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean

    debug(f"Processing certificate {certificate}")

    for packet in packet_split(working_dir=working_dir, certificate=certificate):
        debug(f"Processing packet {packet.name}")
        if packet.name.endswith("--PublicKey"):
            current_packet_mode = "pubkey"
            current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint"))
            current_packet_uid = None

            certificate_fingerprint = current_packet_fingerprint
            pubkey = packet
        elif packet.name.endswith("--UserID"):
            current_packet_mode = "uid"
            current_packet_fingerprint = None
            current_packet_uid = simplify_user_id(Uid(packet_dump_field(packet, "Value")))

            uids[current_packet_uid] = packet
        elif packet.name.endswith("--PublicSubkey"):
            current_packet_mode = "subkey"
            current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint"))
            current_packet_uid = None

            subkeys[current_packet_fingerprint] = packet
        elif packet.name.endswith("--Signature"):
            if not certificate_fingerprint:
                raise Exception(f'missing certificate fingerprint for "{packet.name}"')

            issuer: Fingerprint = Fingerprint(packet_dump_field(packet, "Issuer"))
            signature_type = packet_dump_field(packet, "Type")

            if current_packet_mode == "pubkey":
                if not current_packet_fingerprint:
                    raise Exception(f'missing current packet fingerprint for "{packet.name}"')

                # the issuer may be shorter than the fingerprint, hence the suffix comparison
                if signature_type == "KeyRevocation" and certificate_fingerprint.endswith(issuer):
                    direct_revocations[issuer].append(packet)
                elif signature_type in ["DirectKey", "GenericCertification"]:
                    direct_sigs[issuer].append(packet)
                else:
                    raise Exception(f"unknown signature type: {signature_type}")
            elif current_packet_mode == "uid":
                if not current_packet_uid:
                    raise Exception(f'missing current packet uid for "{packet.name}"')

                if signature_type == "CertificationRevocation":
                    revocations[current_packet_uid].append(packet)
                elif signature_type == "PositiveCertification" and certificate_fingerprint.endswith(issuer):
                    # self-issued positive certification: binds the User ID to the root key
                    uid_binding_sigs[current_packet_uid] = packet
                elif signature_type.endswith("Certification"):
                    # NOTE(review): without a fingerprint_filter (None) no third-party certification is kept at all
                    # - confirm that this is intended for the 'convert' subcommand
                    if fingerprint_filter is not None and any(fp.endswith(issuer) for fp in fingerprint_filter):
                        debug(f"The certification by issuer {issuer} is appended as it is found in the filter.")
                        certifications[current_packet_uid].append(packet)
                    else:
                        debug(f"The certification by issuer {issuer} is not appended because it is not in the filter")
                else:
                    raise Exception(f"unknown signature type: {signature_type}")
            elif current_packet_mode == "subkey":
                if not current_packet_fingerprint:
                    raise Exception(f'missing current packet fingerprint for "{packet.name}"')

                if signature_type == "SubkeyBinding":
                    subkey_binding_sigs[current_packet_fingerprint] = packet
                elif signature_type == "SubkeyRevocation":
                    # keyed by the subkey (not the root key), so that the revocation is persisted below the
                    # directory of the subkey it revokes
                    subkey_revocations[current_packet_fingerprint] = packet
                else:
                    raise Exception(f"unknown signature type: {signature_type}")
            else:
                raise Exception(f'unknown signature root for "{packet.name}"')
        else:
            raise Exception(f'unknown packet type "{packet.name}"')

    if not certificate_fingerprint:
        raise Exception("missing certificate fingerprint")

    if not pubkey:
        raise Exception("missing certificate public-key")

    # precedence: explicit override, existing entry in the keyring, file name of the certificate
    username = (
        name_override
        or derive_username_from_fingerprint(keyring_dir=keyring_dir, certificate_fingerprint=certificate_fingerprint)
        or Username(certificate.stem)
    )

    user_dir = working_dir / username
    key_dir = user_dir / certificate_fingerprint
    key_dir.mkdir(parents=True, exist_ok=True)

    persist_public_key(
        certificate_fingerprint=certificate_fingerprint,
        pubkey=pubkey,
        key_dir=key_dir,
    )

    persist_uids(
        key_dir=key_dir,
        uid_binding_sigs=uid_binding_sigs,
        uids=uids,
    )

    persist_subkeys(
        key_dir=key_dir,
        subkeys=subkeys,
        subkey_binding_sigs=subkey_binding_sigs,
    )

    persist_subkey_revocations(
        key_dir=key_dir,
        subkey_revocations=subkey_revocations,
    )

    persist_direct_sigs(
        direct_sigs=direct_sigs,
        pubkey=pubkey,
        key_dir=key_dir,
    )

    persist_direct_sigs(
        direct_sigs=direct_revocations,
        pubkey=pubkey,
        key_dir=key_dir,
        sig_type="revocation",
    )

    persist_certifications(
        certifications=certifications,
        pubkey=pubkey,
        key_dir=key_dir,
        uid_binding_sig=uid_binding_sigs,
        uids=uids,
    )

    persist_revocations(
        pubkey=pubkey,
        revocations=revocations,
        key_dir=key_dir,
        uid_binding_sig=uid_binding_sigs,
        uids=uids,
    )

    return user_dir


def persist_public_key(
    certificate_fingerprint: Fingerprint,
    pubkey: Path,
    key_dir: Path,
) -> None:
    """Persist the Public-Key packet

    The packet is written to key_dir/<certificate_fingerprint>.asc.

    Parameters
    ----------
    certificate_fingerprint: Fingerprint
        The unique fingerprint of the public key
    pubkey: Path
        The path to the public key of the root key
    key_dir: Path
        The root directory below which the basic key material is persisted
    """

    packets: List[Path] = [pubkey]
    output_file = key_dir / f"{certificate_fingerprint}.asc"
    output_file.parent.mkdir(parents=True, exist_ok=True)
    debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}")
    packet_join(packets, output_file)


def persist_uids(
    key_dir: Path,
    uid_binding_sigs: Dict[Uid, Path],
    uids: Dict[Uid, Path],
) -> None:
    """Persist the User IDs that belong to a PublicKey

    The User ID material consists of the User ID packets and their binding signatures (self-issued
    PositiveCertifications). Only User IDs for which a binding signature exists are written.
    The files are written to a UID specific directory and file below key_dir/uid.

    Parameters
    ----------
    key_dir: Path
        The root directory below which the basic key material is persisted
    uid_binding_sigs: Dict[Uid, Path]
        The PositiveCertifications of a User ID and Public-Key packet
    uids: Dict[Uid, Path]
        The User IDs of a Public-Key (the root key)
    """

    for key, binding_sig in uid_binding_sigs.items():
        packets = [uids[key], binding_sig]
        output_file = key_dir / "uid" / key / f"{key}.asc"
        output_file.parent.mkdir(parents=True, exist_ok=True)
        debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}")
        packet_join(packets, output_file)


def persist_subkeys(
    key_dir: Path,
    subkeys: Dict[Fingerprint, Path],
    subkey_binding_sigs: Dict[Fingerprint, Path],
) -> None:
    """Persist all Public-Subkeys and their PublicSubkeyBinding of a root key file to file(s)

    The files are written to key_dir/subkey/<fingerprint>/<fingerprint>.asc.

    Parameters
    ----------
    key_dir: Path
        The root directory below which the basic key material is persisted
    subkeys: Dict[Fingerprint, Path]
        The PublicSubkeys of a key
    subkey_binding_sigs: Dict[Fingerprint, Path]
        The SubkeyBinding signatures of a Public-Key (the root key)

    Raises
    ------
    KeyError
        If a subkey has no entry in subkey_binding_sigs
    """

    for fingerprint, subkey in subkeys.items():
        packets: List[Path] = [subkey, subkey_binding_sigs[fingerprint]]
        output_file = key_dir / "subkey" / fingerprint / f"{fingerprint}.asc"
        output_file.parent.mkdir(parents=True, exist_ok=True)
        debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}")
        packet_join(packets=packets, output=output_file)


def persist_subkey_revocations(
    key_dir: Path,
    subkey_revocations: Dict[Fingerprint, Path],
) -> None:
    """Persist the SubkeyRevocations of all Public-Subkeys of a root key to file(s)

    The files are written to key_dir/subkey/<fingerprint>/revocation/<issuer>.asc, where fingerprint is the key of
    the respective entry in subkey_revocations.

    Parameters
    ----------
    key_dir: Path
        The root directory below which the basic key material is persisted
    subkey_revocations: Dict[Fingerprint, Path]
        The SubkeyRevocations of PublicSubkeys of a key
    """

    for fingerprint, revocation in subkey_revocations.items():
        issuer = packet_dump_field(revocation, "Issuer")
        output_file = key_dir / "subkey" / fingerprint / "revocation" / f"{issuer}.asc"
        output_file.parent.mkdir(parents=True, exist_ok=True)
        debug(f"Writing file {output_file} from {revocation}")
        packet_join(packets=[revocation], output=output_file)


def persist_direct_sigs(
    direct_sigs: Dict[Fingerprint, List[Path]],
    pubkey: Path,
    key_dir: Path,
    sig_type: str = "certification",
) -> None:
    """Persist the signatures directly on a root key (such as DirectKeys or *Certifications without a User ID) to
    file(s)

    Each output file contains the public key followed by all signatures of one issuer.

    Parameters
    ----------
    direct_sigs: Dict[Fingerprint, List[Path]]
        The direct sigs to write to file
    pubkey: Path
        The path to the public key of the root key
    key_dir: Path
        The root directory below which the Directkeys are persisted
    sig_type: str
        The type of direct certification to persist (defaults to 'certification'). This influences the directory name
    """

    for issuer, certifications in direct_sigs.items():
        packets = [pubkey] + certifications
        output_file = key_dir / sig_type / f"{issuer}.asc"
        output_file.parent.mkdir(parents=True, exist_ok=True)
        debug(f"Writing file {output_file} from {[str(cert) for cert in certifications]}")
        packet_join(packets, output_file)


def persist_certifications(
    certifications: Dict[Uid, List[Path]],
    pubkey: Path,
    key_dir: Path,
    uid_binding_sig: Dict[Uid, Path],
    uids: Dict[Uid, Path],
) -> None:
    """Persist the certifications of a root key to file(s)

    The certifications include all CasualCertifications, GenericCertifications, PersonaCertifications and
    PositiveCertifications for all User IDs of the given root key.
    All certifications are persisted in per User ID certification directories below key_dir.
    Certifications of a User ID that lacks its packet or its binding signature are skipped and reported as error.

    Parameters
    ----------
    certifications: Dict[Uid, List[Path]]
        The certifications to write to file
    pubkey: Path
        The path to the public key of the root key
    key_dir: Path
        The root directory below which certifications are persisted
    uid_binding_sig: Dict[Uid, Path]
        The PositiveCertifications of a User ID and Public-Key packet
    uids: Dict[Uid, Path]
        The User IDs of a Public-Key (the root key)
    """

    for key, current_certifications in certifications.items():
        for certification in current_certifications:
            certification_dir = key_dir / "uid" / key / "certification"
            certification_dir.mkdir(parents=True, exist_ok=True)
            issuer = packet_dump_field(certification, "Issuer")

            if uids.get(key) and uid_binding_sig.get(key):
                packets = [pubkey, uids[key], uid_binding_sig[key], certification]
                output_file = certification_dir / f"{issuer}.asc"
                debug(f"Writing file {output_file} from {certification}")
                packet_join(packets, output_file)
            else:
                # name the part that is actually missing, so the message never has an empty slot
                missing = "the UID binding signature" if not uid_binding_sig.get(key) else "the User ID packet"
                error(
                    f"Public key '{pubkey}' does not provide {missing} for UID '{key}', "
                    "so its certifications can not be used!"
                )


def persist_revocations(
    pubkey: Path,
    revocations: Dict[Uid, List[Path]],
    key_dir: Path,
    uid_binding_sig: Dict[Uid, Path],
    uids: Dict[Uid, Path],
) -> None:
    """Persist the revocations of a root key to file(s)

    The revocations include all CertificationRevocations for all User IDs of the given root key.
    All revocations are persisted in per User ID 'revocation' directories below key_dir.

    Parameters
    ----------
    pubkey: Path
        The path to the public key of the root key
    revocations: Dict[Uid, List[Path]]
        The revocations to write to file
    key_dir: Path
        The root directory below which revocations will be persisted
    uid_binding_sig: Dict[Uid, Path]
        The PositiveCertifications of a User ID and Public-Key packet
    uids: Dict[Uid, Path]
        The User IDs of a Public-Key (the root key)
    """

    for key, current_revocations in revocations.items():
        for revocation in current_revocations:
            revocation_dir = key_dir / "uid" / key / "revocation"
            revocation_dir.mkdir(parents=True, exist_ok=True)
            issuer = packet_dump_field(revocation, "Issuer")
            packets = [pubkey, uids[key]]
            # Binding sigs only exist for 3rd-party revocations
            if key in uid_binding_sig:
                packets.append(uid_binding_sig[key])
            packets.append(revocation)
            output_file = revocation_dir / f"{issuer}.asc"
            debug(f"Writing file {output_file} from {revocation}")
            packet_join(packets, output_file)


def packet_dump(packet: Path) -> str:
    """Dump a PGP packet to string

    The `sq packet dump` command is used to retrieve a dump of information from a PGP packet

    Parameters
    ----------
    packet: Path
        The path to the PGP packet to retrieve the value from

    Returns
    -------
    str
        The contents of the packet dump
    """

    return system(["sq", "packet", "dump", str(packet)])


def packet_dump_field(packet: Path, field: str) -> str:
    """Retrieve the value of a field from a PGP packet

    The first line of the packet dump that (ignoring surrounding whitespace) starts with '<field>: ' is used.

    Parameters
    ----------
    packet: Path
        The path to the PGP packet to retrieve the value from
    field: str
        The name of the field

    Raises
    ------
    Exception
        If the field is not found in the PGP packet

    Returns
    -------
    str
        The value of the field found in packet
    """

    prefix = f"{field}: "
    for line in packet_dump(packet).splitlines():
        stripped = line.strip()
        if stripped.startswith(prefix):
            # cut off the matched prefix instead of splitting on whitespace, so that field names that contain
            # spaces yield their value as well
            return stripped[len(prefix) :].lstrip()

    raise Exception(f'Packet has no field "{field}"')


def keyring_split(working_dir: Path, keyring: Path) -> Iterable[Path]:
    """Split a file containing a PGP keyring into separate certificate files

    The original keyring filename is preserved if the split only yields a single certificate.
    The file is split using sq.

    Parameters
    ----------
    working_dir: Path
        The path of the working directory below which to create the output files
    keyring: Path
        The path of a file containing a PGP keyring

    Returns
    -------
    Iterable[Path]
        An iterable over the naturally sorted list of certificate files derived from a keyring
    """

    keyring_dir = Path(mkdtemp(dir=working_dir, prefix="keyring-")).absolute()

    # sq is executed inside of the fresh directory, which afterwards is listed to collect the result
    with cwd(keyring_dir):
        system(["sq", "keyring", "split", "--prefix", "", str(keyring)])

    keyrings: List[Path] = list(natural_sort_path(keyring_dir.iterdir()))

    if 1 == len(keyrings):
        keyrings[0] = keyrings[0].rename(keyrings[0].parent / keyring.name)

    return keyrings


def packet_split(working_dir: Path, certificate: Path) -> Iterable[Path]:
    """Split a file containing a PGP certificate into separate packet files

    The files are split using sq

    Parameters
    ----------
    working_dir: Path
        The path of the working directory below which to create the output files
    certificate: Path
        The absolute path of a file containing one PGP certificate

    Returns
    -------
    Iterable[Path]
        An iterable over the naturally sorted list of packet files derived from certificate
    """

    packet_dir = Path(mkdtemp(dir=working_dir, prefix="packet-")).absolute()

    # sq is executed inside of the fresh directory, which afterwards is listed to collect the result
    with cwd(packet_dir):
        system(["sq", "packet", "split", "--prefix", "", str(certificate)])
    return natural_sort_path(packet_dir.iterdir())


def packet_join(packets: List[Path], output: Path, force: bool = False) -> None:
    """Join PGP packet data in files to a single output file

    Parameters
    ----------
    packets: List[Path]
        A list of paths to files that contain PGP packet data
    output: Path
        A file to which all PGP packet data is written
    force: bool
        Whether to force the execution of sq (defaults to False)

    Raises
    ------
    CalledProcessError
        If sq exits with an error
    """

    cmd = ["sq", "packet", "join"]
    if force:
        # --force is an option of sq itself and therefore goes in front of the subcommand
        cmd.insert(1, "--force")
    cmd.extend(str(packet) for packet in packets)
    cmd.extend(["--output", str(output)])
    system(cmd, exit_on_error=False)


def simplify_user_id(user_id: Uid) -> Uid:
    """Simplify the User ID string to contain more filesystem friendly characters

    '@' is replaced by '_at_', angle brackets are removed and whitespace as well as punctuation is replaced by '_'.

    Parameters
    ----------
    user_id: Uid
        A User ID string (e.g. 'Foobar McFooface <foobar@foo.face>')

    Returns
    -------
    Uid
        The simplified representation of user_id
    """

    user_id_str: str = user_id.replace("@", "_at_")
    user_id_str = sub("[<>]", "", user_id_str)
    user_id_str = sub("[" + escape(r" !@#$%^&*()_-+=[]{}\|;:,.<>/?") + "]", "_", user_id_str)
    return Uid(user_id_str)


def derive_username_from_fingerprint(keyring_dir: Path, certificate_fingerprint: Fingerprint) -> Optional[Username]:
    """Attempt to derive the username of a public key fingerprint from a keyring directory

    The username is the name of the directory below keyring_dir, that contains an entry ending in the fingerprint.

    Parameters
    ----------
    keyring_dir: Path
        The directory in which to look up a username
    certificate_fingerprint: Fingerprint
        The public key fingerprint to derive the username from

    Raises
    ------
    Exception
        If more than one username is found (a public key can only belong to one individual)

    Returns
    -------
    Optional[Username]
        A string representing the username a public key certificate belongs to, None otherwise
    """

    matches = list(keyring_dir.glob(f"*/*{certificate_fingerprint}"))

    if len(matches) > 1:
        raise Exception(
            f"More than one username found in {keyring_dir} when probing for fingerprint '{certificate_fingerprint}': "
            f"{matches}"
        )
    elif not matches:
        debug(f"Can not derive username from target directory for fingerprint {certificate_fingerprint}")
        return None
    else:
        username = matches[0].parent.stem
        debug(
            f"Successfully derived username '{username}' from target directory for fingerprint "
            f"{certificate_fingerprint}"
        )
        return Username(username)


def convert(
    working_dir: Path,
    source: Path,
    target_dir: Path,
    name_override: Optional[Username] = None,
    fingerprint_filter: Optional[Set[Fingerprint]] = None,
) -> Path:
    """Convert a path containing PGP certificate material to a decomposed directory structure

    Any input is first split by `keyring_split()` into individual certificates.
    All certificates are converted below working_dir first and are copied to target_dir afterwards.

    Parameters
    ----------
    working_dir: Path
        A directory to use for temporary files
    source: Path
        A path to a file or directory to decompose
    target_dir: Path
        A directory path to write the new directory structure to
    name_override: Optional[Username]
        An optional username override for the call to `convert_certificate()`
    fingerprint_filter: Optional[Set[Fingerprint]]
        An optional set of strings defining fingerprints of PGP public keys that all certificates will be filtered
        with

    Returns
    -------
    Path
        The directory that contains the resulting directory structure (target_dir)
    """

    directories: List[Path] = []
    keys: Iterable[Path] = source.iterdir() if source.is_dir() else [source]

    for key in keys:
        for cert in keyring_split(working_dir=working_dir, keyring=key):
            directories.append(
                convert_certificate(
                    working_dir=working_dir,
                    certificate=cert,
                    keyring_dir=target_dir,
                    name_override=name_override,
                    fingerprint_filter=fingerprint_filter,
                )
            )

    for path in directories:
        (target_dir / path.name).mkdir(parents=True, exist_ok=True)
        copytree(src=path, dst=(target_dir / path.name), dirs_exist_ok=True)

    return target_dir


def temp_join_keys(sources: List[Path], temp_dir: Path, force: bool) -> List[Path]:
    """Temporarily join the key material of a given set of keys in a temporary location and return their paths

    For each directory in sources, all .asc files below every <user>/<certificate> directory are joined to one file
    in temp_dir. Regular files (that are not symlinks) in sources are passed through as is.

    Parameters
    ----------
    sources: List[Path]
        A list of paths below which PGP packets are found
    temp_dir: Path
        The temporary directory below which to join PGP keys
    force: bool
        Whether to force the joining of files

    Returns
    -------
    List[Path]
        The paths of the joined certificates and of the passed through source files
    """

    certs: List[Path] = []

    for source_number, source in enumerate(sources):
        if source.is_dir():
            for user_number, user_dir in enumerate(sorted(source.iterdir())):
                if user_dir.is_dir():
                    for user_cert_number, user_cert_dir in enumerate(sorted(user_dir.iterdir())):
                        if user_cert_dir.is_dir():
                            # zero padded counters keep the lexical order of the files stable
                            cert_path = temp_dir / (
                                f"{str(source_number).zfill(4)}"
                                f"-{str(user_number).zfill(4)}"
                                f"-{str(user_cert_number).zfill(4)}.asc"
                            )
                            debug(f"Joining {user_dir.name}/{user_cert_dir.name} in {cert_path}.")
                            packet_join(
                                packets=sorted(user_cert_dir.glob("**/*.asc")),
                                output=cert_path,
                                force=force,
                            )
                            certs.append(cert_path)
        elif source.is_file() and not source.is_symlink():
            certs.append(source)

    return certs


def get_all_and_revoked_certs(certs: List[Path]) -> Tuple[List[Fingerprint], List[Fingerprint]]:
    """Get the fingerprints of all public keys and all fingerprints of all (self) revoked public keys in a directory

    A public key is considered self-revoked, if a file in its 'revocation' directory is named after (the end of) its
    own fingerprint. Entries of certs that are not directories are ignored.

    Parameters
    ----------
    certs: List[Path]
        The certificates to trust

    Returns
    -------
    Tuple[List[Fingerprint], List[Fingerprint]]
        A tuple with the first item containing the fingerprints of all public keys and the second item containing the
        fingerprints of all self-revoked public keys
    """

    all_fingerprints: List[Fingerprint] = []
    revoked_fingerprints: List[Fingerprint] = []

    # TODO: what about direct key revocations/signatures?

    debug(f"Retrieving all and self-revoked certificates from {[str(cert_dir) for cert_dir in certs]}")

    for cert_collection in certs:
        if cert_collection.is_dir():
            for user_dir in cert_collection.iterdir():
                if user_dir.is_dir():
                    for cert_dir in user_dir.iterdir():
                        cert_fingerprint = Fingerprint(cert_dir.stem)
                        all_fingerprints.append(cert_fingerprint)
                        for revocation_cert in cert_dir.glob("revocation/*.asc"):
                            if cert_fingerprint.endswith(revocation_cert.stem):
                                debug(f"Revoking {cert_fingerprint} due to self-revocation")
                                revoked_fingerprints.append(cert_fingerprint)

    return all_fingerprints, revoked_fingerprints


def export_ownertrust(certs: List[Path], output: Path) -> Tuple[List[Fingerprint], List[Fingerprint]]:
    """Export ownertrust from a set of keys

    The output file format is compatible with `gpg --import-ownertrust` and lists the main fingerprint ID of all
    non-revoked keys as fully trusted. The entries are written in sorted order to keep the file reproducible.
    The exported file is used by pacman-key when importing a keyring (see
    https://man.archlinux.org/man/pacman-key.8#PROVIDING_A_KEYRING_FOR_IMPORT).

    Parameters
    ----------
    certs: List[Path]
        The certificates to trust
    output: Path
        The file path to write to

    Returns
    -------
    Tuple[List[Fingerprint], List[Fingerprint]]
        A tuple with the first item containing the fingerprints of all trusted (non self-revoked) public keys and the
        second item containing the fingerprints of all public keys
    """

    all_certs, revoked_certs = get_all_and_revoked_certs(certs=certs)
    revoked = set(revoked_certs)
    trusted_certs = [cert for cert in all_certs if cert not in revoked]

    with open(file=output, mode="w") as trusted_certs_file:
        for cert in sorted(trusted_certs):
            debug(f"Writing {cert} to {output}")
            trusted_certs_file.write(f"{cert}:4:\n")

    return trusted_certs, all_certs


def export_revoked(certs: List[Path], main_keys: List[Fingerprint], output: Path, min_revoker: int = 2) -> None:
    """Export the PGP revoked status from a set of keys

    The output file contains the fingerprints of all self-revoked keys and all keys for which at least min_revoker
    revocations by distinct main keys exist. Each fingerprint is written once and in sorted order to keep the file
    reproducible.
    The exported file is used by pacman-key when importing a keyring (see
    https://man.archlinux.org/man/pacman-key.8#PROVIDING_A_KEYRING_FOR_IMPORT).

    Parameters
    ----------
    certs: List[Path]
        A list of directories with keys to check for their revocation status
    main_keys: List[Fingerprint]
        A list of strings representing the fingerprints of (current and/or revoked) main keys
    output: Path
        The file path to write to
    min_revoker: int
        The minimum amount of revocation certificates on a User ID from any main key to deem a public key as revoked
        (defaults to 2)
    """

    all_certs, revoked_certs = get_all_and_revoked_certs(certs=certs)

    debug(f"Retrieving certificates revoked by main keys from {[str(cert_dir) for cert_dir in certs]}")
    foreign_revocations: Dict[str, List[str]] = {}
    for cert_collection in certs:
        if cert_collection.is_dir():
            for user_dir in cert_collection.iterdir():
                if user_dir.is_dir():
                    for cert_dir in user_dir.iterdir():
                        debug(f"Inspecting public key {cert_dir.name}")
                        foreign_revocations[cert_dir.stem] = []
                        for revocation_cert in cert_dir.glob("uid/*/revocation/*.asc"):
                            # the revocation files are named after their issuer
                            foreign_revocations[cert_dir.stem] += [
                                revocation_cert.stem
                                for main_key in main_keys
                                if main_key.endswith(revocation_cert.stem)
                            ]

                        # TODO: find a better (less naive) approach, as this would also match on public certificates,
                        # where some UIDs are signed and others are revoked
                        if len(set(foreign_revocations[cert_dir.stem])) >= min_revoker:
                            debug(
                                f"Revoking {cert_dir.name} due to {set(foreign_revocations[cert_dir.stem])} "
                                "being main key revocations"
                            )
                            revoked_certs.append(Fingerprint(cert_dir.stem))

    with open(file=output, mode="w") as trusted_certs_file:
        for cert in sorted(set(revoked_certs)):
            debug(f"Writing {cert} to {output}")
            trusted_certs_file.write(f"{cert}\n")


def get_fingerprints_from_import_source(working_dir: Path, source: Path) -> List[Fingerprint]:
    """Get all fingerprints of PGP public keys from import file(s)

    Parameters
    ----------
    working_dir: Path
        A directory to use for temporary files
    source: Path
        The path to a source file or directory

    Returns
    -------
    List[Fingerprint]
        A list of strings representing the fingerprints of PGP public keys found in source
    """

    fingerprints: List[Fingerprint] = []
    keys: List[Path] = list(source.iterdir()) if source.is_dir() else [source]

    for key in keys:
        for certificate in keyring_split(working_dir=working_dir, keyring=key):
            for packet in packet_split(working_dir=working_dir, certificate=certificate):
                if packet.name.endswith("--PublicKey"):
                    fingerprints += [Fingerprint(packet_dump_field(packet, "Fingerprint"))]

    debug(f"Fingerprints of PGP public keys in {source}: {fingerprints}")
    return fingerprints


def get_fingerprints_from_decomposed_dir(path: Path) -> List[Fingerprint]:
    """Get all fingerprints of PGP public keys from a decomposed directory structure

    Every entry on the second level below path (<user>/<fingerprint>) is considered to be named after a fingerprint.

    Parameters
    ----------
    path: Path
        The path to a decomposed directory structure

    Returns
    -------
    List[Fingerprint]
        A list of strings representing all fingerprints of PGP public keys below path
    """

    fingerprints = [Fingerprint(cert_dir.stem) for cert_dir in path.absolute().glob("*/*")]
    debug(f"Fingerprints of PGP public keys in {path}: {fingerprints}")
    return fingerprints


def get_fingerprints(
    working_dir: Path,
    decomposed_paths: List[Path],
    source: Optional[Path] = None,
) -> Set[Fingerprint]:
    """Get the fingerprints of PGP public keys from input paths and decomposed directory structures

    Parameters
    ----------
    working_dir: Path
        A directory to use for temporary files
    decomposed_paths: List[Path]
        A list of paths that identify decomposed PGP data in directory structures
    source: Optional[Path]
        An optional path to a file or directory with PGP public keys that are about to be imported. If None, only
        decomposed_paths are considered (defaults to None)

    Returns
    -------
    Set[Fingerprint]
        A set of strings describing fingerprints of PGP public keys
    """

    fingerprints: Set[Fingerprint] = set()

    # the import source is handed in explicitly, so that this function does not rely on module level state
    if source is not None:
        fingerprints.update(
            get_fingerprints_from_import_source(
                working_dir=working_dir,
                source=source,
            )
        )

    for decomposed_path in decomposed_paths:
        fingerprints.update(get_fingerprints_from_decomposed_dir(path=decomposed_path))

    return fingerprints


def _pacman_integration_file(output: Path, kind: str) -> Path:
    """Derive the path of a pacman integration file, that is located next to the keyring file

    A trailing '.gpg' of the file name is replaced by '-<kind>'. Other occurrences of '.gpg' in the path are left
    untouched.
    """

    name = output.name
    if name.endswith(".gpg"):
        name = name[: -len(".gpg")]
    return output.with_name(f"{name}-{kind}")


def export_keyring(
    working_dir: Path,
    main: List[Path],
    sources: List[Path],
    output: Path,
    force: bool,
    pacman_integration: bool,
) -> None:
    """Export all provided PGP packet files to a single output file

    If sources contains directories, any .asc files below them are considered.
    With pacman_integration the files '<output without .gpg>-trusted' and '<output without .gpg>-revoked' are
    written next to output.

    Parameters
    ----------
    working_dir: Path
        A directory to use for temporary files
    main: List[Path]
        A list of directories or files from which to read PGP packet information, that is considered as public keys
        that are used to sign those found in sources
    sources: List[Path]
        A list of directories or files from which to read PGP packet information
    output: Path
        An output file that all PGP packet data is written to
    force: bool
        Whether to force the execution of packet_join()
    pacman_integration: bool
        Whether to write pacman compatible trust files

    Raises
    ------
    CalledProcessError
        If sq exits with an error
    """

    main = [source.absolute() for source in main]
    sources = [source.absolute() for source in sources]
    output = output.absolute()

    main_certs = temp_join_keys(sources=main, temp_dir=Path(mkdtemp(dir=working_dir)).absolute(), force=force)
    sources_certs = temp_join_keys(sources=sources, temp_dir=Path(mkdtemp(dir=working_dir)).absolute(), force=force)

    debug(
        f"Creating keyring {output} from {[str(source_dir) for source_dir in main]} "
        f"and {[str(source_dir) for source_dir in sources]}."
    )

    output.parent.mkdir(parents=True, exist_ok=True)
    cmd = ["sq", "keyring", "merge", "-o", str(output)]
    if force:
        # --force is an option of sq itself and therefore goes in front of the subcommand
        cmd.insert(1, "--force")
    cmd += [str(cert) for cert in sorted(main_certs)]
    cmd += [str(cert) for cert in sorted(sources_certs)]
    system(cmd, exit_on_error=False)

    if pacman_integration:
        _, all_main_keys = export_ownertrust(
            certs=main,
            output=_pacman_integration_file(output, "trusted"),
        )
        export_revoked(
            certs=main + sources,
            main_keys=all_main_keys,
            output=_pacman_integration_file(output, "revoked"),
        )


def absolute_path(path: str) -> Path:
    """Return the absolute path of a given str

    Parameters
    ----------
    path: str
        A string representing a path

    Returns
    -------
    Path
        The absolute path representation of path
    """

    return Path(path).absolute()


if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Causes to print debugging messages about the progress"
    )
    parser.add_argument("--wait", action="store_true", help="Block before cleaning up the temp directory")
    parser.add_argument(
        "-f",
        "--force",
        action="store_true",
        default=False,
        help="force the execution of subcommands (e.g. overwriting of files)",
    )
    subcommands = parser.add_subparsers(dest="subcommand")

    convert_parser = subcommands.add_parser(
        "convert",
        help="convert one or multiple PGP public keys to a decomposed directory structure",
    )
    convert_parser.add_argument("source", type=absolute_path, help="File or directory to convert")
    convert_parser.add_argument("--target", type=absolute_path, help="target directory")
    convert_parser.add_argument(
        "--name",
        type=Username,
        default=None,
        help="override the username to use (only useful when using a single file as source)",
    )

    import_parser = subcommands.add_parser(
        "import",
        help="import one or several PGP keys to the keyring directory structure",
    )
    import_parser.add_argument("source", type=absolute_path, help="File or directory")
    import_parser.add_argument(
        "--name",
        type=Username,
        default=None,
        help="override the username to use (only useful when using a single file as source)",
    )
    import_parser.add_argument("--main", action="store_true", help="Import a main signing key into the keyring")

    export_keyring_parser = subcommands.add_parser(
        "export-keyring",
        help="export PGP packet data below main/ and packager/ to output/archlinux.gpg alongside pacman integration",
    )

    export_parser = subcommands.add_parser(
        "export",
        help="export a directory structure of PGP packet data to a combined file",
    )
    export_parser.add_argument("output", type=absolute_path, help="file to write PGP packet data to")
    export_parser.add_argument(
        "-m",
        "--main",
        action="append",
        help="files or directories containing PGP packet data that is trusted (can be provided multiple times)",
        required=True,
        type=absolute_path,
    )
    export_parser.add_argument(
        "-s",
        "--source",
        action="append",
        help="files or directories containing PGP packet data (can be provided multiple times)",
        required=True,
        type=absolute_path,
    )
    export_parser.add_argument(
        "-p",
        "--pacman-integration",
        action="store_true",
        default=False,
        help="export trusted and revoked files (used by pacman) alongside the keyring",
    )

    args = parser.parse_args()

    if args.verbose:
        basicConfig(level=DEBUG)

    # temporary working directory that gets auto cleaned
    with TemporaryDirectory(prefix="arch-keyringctl-") as tempdir:
        # resolved before changing the directory, hence relative to the directory the script is called from
        keyring_root = Path("keyring").absolute()
        working_dir = Path(tempdir)
        debug(f"Working directory: {working_dir}")
        with cwd(working_dir):
            if "convert" == args.subcommand:
                # honor --target and --name, fall back to a fresh directory that outlives the working directory
                print(
                    convert(
                        working_dir,
                        args.source,
                        target_dir=args.target or Path(mkdtemp(prefix="arch-keyringctl-")).absolute(),
                        name_override=args.name,
                    )
                )
            elif "import" == args.subcommand:
                target_dir = "main" if args.main else "packager"
                print(
                    convert(
                        working_dir=working_dir,
                        source=args.source,
                        target_dir=keyring_root / target_dir,
                        name_override=args.name,
                        fingerprint_filter=get_fingerprints(
                            working_dir=working_dir,
                            decomposed_paths=[keyring_root / "main", keyring_root / "packager"],
                            source=args.source,
                        ),
                    )
                )
            elif "export-keyring" == args.subcommand:
                export_keyring(
                    working_dir=working_dir,
                    main=[keyring_root / "main"],
                    sources=[keyring_root / "packager"],
                    output=keyring_root / "output" / "archlinux.gpg",
                    force=True,
                    pacman_integration=True,
                )
            elif "export" == args.subcommand:
                export_keyring(
                    working_dir=working_dir,
                    main=args.main,
                    sources=args.source,
                    output=args.output,
                    force=args.force,
                    pacman_integration=args.pacman_integration,
                )
            else:
                parser.print_help()

            if args.wait:
                print("Press [ENTER] to continue")
                input()