From 58307c629d10b6baa638de6149693ed2d5cc5ba9 Mon Sep 17 00:00:00 2001 From: Levente Polyak Date: Sun, 24 Oct 2021 14:37:27 +0200 Subject: [PATCH] chore(keyringctl): modularize the code for overview and testing --- Makefile | 6 +- keyringctl | 1450 +------------------------------------ libkeyringctl/__init__.py | 0 libkeyringctl/cli.py | 171 +++++ libkeyringctl/keyring.py | 978 +++++++++++++++++++++++++ libkeyringctl/sequoia.py | 224 ++++++ libkeyringctl/types.py | 7 + libkeyringctl/util.py | 119 +++ 8 files changed, 1504 insertions(+), 1451 deletions(-) create mode 100644 libkeyringctl/__init__.py create mode 100644 libkeyringctl/cli.py create mode 100644 libkeyringctl/keyring.py create mode 100644 libkeyringctl/sequoia.py create mode 100644 libkeyringctl/types.py create mode 100644 libkeyringctl/util.py diff --git a/Makefile b/Makefile index c8dacac..49bfd0f 100644 --- a/Makefile +++ b/Makefile @@ -5,10 +5,10 @@ KEYRING_FILES=$(wildcard build/*.gpg) $(wildcard build/*-revoked) $(wildcard bui all: build lint: - black --check --diff keyringctl + black --check --diff keyringctl libkeyringctl isort --diff . - flake8 keyringctl - mypy --install-types --non-interactive keyringctl + flake8 keyringctl libkeyringctl + mypy --install-types --non-interactive keyringctl libkeyringctl build: ./keyringctl -v build diff --git a/keyringctl b/keyringctl index 31263cc..7922426 100755 --- a/keyringctl +++ b/keyringctl @@ -2,1453 +2,7 @@ # # SPDX-License-Identifier: GPL-3.0-or-later -from argparse import ArgumentParser -from collections import defaultdict -from collections.abc import Iterable -from collections.abc import Iterator -from contextlib import contextmanager -from datetime import datetime -from functools import reduce -from itertools import chain -from logging import DEBUG -from logging import basicConfig -from logging import debug -from os import chdir -from os import getcwd -from pathlib import Path -from re import escape -from re import match -from re import split -from re import sub -from shutil import copytree -from subprocess import PIPE -from subprocess import CalledProcessError -from subprocess import check_output -from sys import exit -from sys import stderr -from tempfile import TemporaryDirectory -from tempfile import mkdtemp -from tempfile import mkstemp -from traceback import print_stack -from typing import Dict -from typing import List -from typing import NewType -from typing import Optional -from typing import Set -from typing import Tuple -from typing import Union - -Fingerprint = NewType("Fingerprint", str) -Uid = NewType("Uid", str) -Username = NewType("Username", str) - - -@contextmanager -def cwd(new_dir: Path) -> Iterator[None]: - """Change to a new current working directory in a context and go back to the previous dir after the context is done - - Parameters - ---------- - new_dir: A path to change to - """ - - previous_dir = getcwd() - chdir(new_dir) - try: - yield - finally: - chdir(previous_dir) - - -def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]: - """Sort an Iterable of Paths naturally - - Parameters - ---------- - _list: An iterable containing paths to be sorted - - Return - ------ - An Iterable of paths that are naturally sorted - """ - - def convert_text_chunk(text: str) -> Union[int, str]: - """Convert input text to int or str - - Parameters - ---------- - text: An input string - - Returns - ------- - Either an integer if text is a digit, else text in lower-case representation - """ - - return int(text) if text.isdigit() else text.lower() - - def alphanum_key(key: Path) -> List[Union[int, str]]: - """Retrieve an alphanumeric key from a Path, that can be used in sorted() - - Parameters - ---------- - key: A path for which to create a key - - Returns - ------- - A list of either int or str objects that may serve as 'key' argument for sorted() - """ - - return [convert_text_chunk(c) for c in split("([0-9]+)", str(key.name))] - - return sorted(_list, key=alphanum_key) - - -def system(cmd: List[str], exit_on_error: bool = False) -> str: - """Execute a command using check_output - - Parameters - ---------- - cmd: A list of strings to be fed to check_output - exit_on_error: Whether to exit the script when encountering an error (defaults to False) - - Raises - ------ - CalledProcessError: If not exit_on_error and `check_output()` encounters an error - - Returns - ------- - The output of cmd - """ - - try: - return check_output(cmd, stderr=PIPE).decode() - except CalledProcessError as e: - stderr.buffer.write(e.stderr) - print_stack() - if exit_on_error: - exit(e.returncode) - raise e - - -def is_pgp_fingerprint(string: str) -> bool: - """Returns whether the passed string looks like a PGP (long) fingerprint - - Parameters - ---------- - string: Input to consider as a fingerprint - - Returns - ------- - RWhether string is a fingerprint - """ - if len(string) not in [16, 40]: - return False - return match("^[A-F0-9]+$", string) is not None - - -def get_cert_paths(paths: Iterable[Path]) -> Set[Path]: - """Walks a list of paths and resolves all discovered certificate paths - - Parameters - ---------- - paths: A list of paths to walk and resolve to certificate paths. - - Returns - ------- - The list of paths to certificates - """ - - # depth first search certificate paths - cert_paths: Set[Path] = set() - visit: List[Path] = list(paths) - while visit: - path = visit.pop() - if not path.exists(): - continue - # this level contains a certificate, abort depth search - if list(path.glob("*.asc")): - cert_paths.add(path) - continue - visit.extend([path for path in path.iterdir() if path.is_dir()]) - return cert_paths - - -def transform_username_to_keyring_path(keyring_dir: Path, paths: List[Path]) -> None: - """Mutates the input sources by transforming passed usernames to keyring paths - - Parameters - ---------- - keyring_dir: The directory underneath the username needs to exist - paths: A list of paths to mutate and replace usernames to keyring paths - """ - for index, source in enumerate(paths): - if source.exists(): - continue - packager_source = keyring_dir / source.name - if not packager_source.exists(): - continue - paths[index] = packager_source - - -def transform_fingerprint_to_keyring_path(keyring_root: Path, paths: List[Path]) -> None: - """Mutates the input sources by transforming passed fingerprints to keyring paths - - Parameters - ---------- - keyring_root: The keyring root directory to look up fingerprints in - paths: A list of paths to mutate and replace fingerprints to keyring paths - """ - for index, source in enumerate(paths): - if source.exists(): - continue - if not is_pgp_fingerprint(source.name): - continue - fingerprint_paths = list(keyring_root.glob(f"*/*/*{source.name}")) - if not fingerprint_paths: - continue - paths[index] = fingerprint_paths[0].parent - - -# TODO: simplify to lower complexity -def convert_certificate( # noqa: ignore=C901 - working_dir: Path, - certificate: Path, - keyring_dir: Path, - name_override: Optional[Username] = None, - fingerprint_filter: Optional[Set[Fingerprint]] = None, -) -> Path: - """Convert a single file public key certificate into a decomposed directory structure of multiple PGP packets - - The output directory structure is created per user. The username is derived from the certificate via - `derive_username_from_fingerprint` or overridden via `name_override`. - Below the username directory a directory tree describes the public keys components split up into certifications - and revocations, as well as per subkey and per uid certifications and revocations. - - Parameters - ---------- - working_dir: The path of the working directory below which to create split certificates - certificate: The path to a public key certificate - keyring_dir: The path of the keyring used to try to derive the username from the public key fingerprint - name_override: An optional string to override the username in the to be created output directory structure - fingerprint_filter: Optional list of fingerprints of PGP public keys that all certifications will be filtered with - - Raises - ------ - Exception: If required PGP packets are not found - - Returns - ------- - The path of the user_dir (which is located below working_dir) - """ - - # root packets - certificate_fingerprint: Optional[Fingerprint] = None - pubkey: Optional[Path] = None - # TODO: direct key certifications are not yet selecting the latest sig, owner may have multiple - # TODO: direct key certifications are not yet single packet per file - direct_sigs: Dict[Fingerprint, List[Path]] = defaultdict(list) - direct_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list) - - # subkey packets - subkeys: Dict[Fingerprint, Path] = {} - subkey_bindings: Dict[Fingerprint, List[Path]] = defaultdict(list) - subkey_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list) - - # uid packets - uids: Dict[Uid, Path] = {} - certifications: Dict[Uid, Dict[Fingerprint, List[Path]]] = defaultdict(lambda: defaultdict(list)) - revocations: Dict[Uid, Dict[Fingerprint, List[Path]]] = defaultdict(lambda: defaultdict(list)) - - # intermediate variables - current_packet_mode: Optional[str] = None - current_packet_fingerprint: Optional[Fingerprint] = None - current_packet_uid: Optional[Uid] = None - - # XXX: PrimaryKeyBinding - - # TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean - - debug(f"Processing certificate {certificate}") - - for packet in packet_split(working_dir=working_dir, certificate=certificate): - debug(f"Processing packet {packet.name}") - if packet.name.endswith("--PublicKey"): - current_packet_mode = "pubkey" - current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint")) - current_packet_uid = None - - certificate_fingerprint = current_packet_fingerprint - pubkey = packet - elif packet.name.endswith("--UserID"): - current_packet_mode = "uid" - current_packet_fingerprint = None - current_packet_uid = simplify_user_id(Uid(packet_dump_field(packet, "Value"))) - - uids[current_packet_uid] = packet - elif packet.name.endswith("--PublicSubkey"): - current_packet_mode = "subkey" - current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint")) - current_packet_uid = None - - subkeys[current_packet_fingerprint] = packet - elif packet.name.endswith("--Signature"): - if not certificate_fingerprint: - raise Exception('missing certificate fingerprint for "{packet.name}"') - - issuer: Fingerprint = Fingerprint(packet_dump_field(packet, "Issuer")) - signature_type = packet_dump_field(packet, "Type") - - if current_packet_mode == "pubkey": - if not current_packet_fingerprint: - raise Exception('missing current packet fingerprint for "{packet.name}"') - - if signature_type == "KeyRevocation" and certificate_fingerprint.endswith(issuer): - direct_revocations[issuer].append(packet) - elif signature_type in ["DirectKey", "GenericCertification"]: - direct_sigs[issuer].append(packet) - else: - raise Exception(f"unknown signature type: {signature_type}") - elif current_packet_mode == "uid": - if not current_packet_uid: - raise Exception('missing current packet uid for "{packet.name}"') - - if signature_type == "CertificationRevocation": - revocations[current_packet_uid][issuer].append(packet) - elif signature_type.endswith("Certification"): - if fingerprint_filter is not None and any([fp.endswith(issuer) for fp in fingerprint_filter]): - debug(f"The certification by issuer {issuer} is appended as it is found in the filter.") - certifications[current_packet_uid][issuer].append(packet) - else: - debug(f"The certification by issuer {issuer} is not appended because it is not in the filter") - else: - raise Exception(f"unknown signature type: {signature_type}") - elif current_packet_mode == "subkey": - if not current_packet_fingerprint: - raise Exception('missing current packet fingerprint for "{packet.name}"') - - if signature_type == "SubkeyBinding": - subkey_bindings[current_packet_fingerprint].append(packet) - elif signature_type == "SubkeyRevocation": - subkey_revocations[certificate_fingerprint].append(packet) - else: - raise Exception(f"unknown signature type: {signature_type}") - else: - raise Exception(f'unknown signature root for "{packet.name}"') - else: - raise Exception(f'unknown packet type "{packet.name}"') - - if not certificate_fingerprint: - raise Exception("missing certificate fingerprint") - - if not pubkey: - raise Exception("missing certificate public-key") - - name_override = ( - name_override - or derive_username_from_fingerprint(keyring_dir=keyring_dir, certificate_fingerprint=certificate_fingerprint) - or Username(certificate.stem) - ) - - user_dir = working_dir / name_override - key_dir = user_dir / certificate_fingerprint - key_dir.mkdir(parents=True, exist_ok=True) - - persist_public_key( - certificate_fingerprint=certificate_fingerprint, - pubkey=pubkey, - key_dir=key_dir, - ) - - persist_direct_key_certifications( - direct_key_certifications=direct_sigs, - key_dir=key_dir, - ) - - persist_direct_key_revocations( - direct_key_revocations=direct_revocations, - key_dir=key_dir, - ) - - persist_subkeys( - key_dir=key_dir, - subkeys=subkeys, - ) - - persist_subkey_bindings( - key_dir=key_dir, - subkey_bindings=subkey_bindings, - ) - - persist_subkey_revocations( - key_dir=key_dir, - subkey_revocations=subkey_revocations, - ) - - persist_uids( - key_dir=key_dir, - uids=uids, - ) - - persist_uid_certifications( - certifications=certifications, - key_dir=key_dir, - ) - - persist_uid_revocations( - revocations=revocations, - key_dir=key_dir, - ) - - return user_dir - - -def persist_public_key( - certificate_fingerprint: Fingerprint, - pubkey: Path, - key_dir: Path, -) -> None: - """Persist the Public-Key packet - - Parameters - ---------- - certificate_fingerprint: The unique fingerprint of the public key - pubkey: The path to the public key of the root key - key_dir: The root directory below which the basic key material is persisted - """ - - packets: List[Path] = [pubkey] - output_file = key_dir / f"{certificate_fingerprint}.asc" - output_file.parent.mkdir(parents=True, exist_ok=True) - debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}") - packet_join(packets, output_file, force=True) - - -def persist_uids( - key_dir: Path, - uids: Dict[Uid, Path], -) -> None: - """Persist the User IDs that belong to a PublicKey - - The User ID material consists of a single User ID Packet. - The files are written to a UID specific directory and file below key_dir/uid. - - Parameters - ---------- - key_dir: The root directory below which the basic key material is persisted - uids: The User IDs of a Public-Key (the root key) - """ - - for uid, uid_packet in uids.items(): - output_file = key_dir / "uid" / uid / f"{uid}.asc" - output_file.parent.mkdir(parents=True, exist_ok=True) - debug(f"Writing file {output_file} from {uid_packet}") - packet_join(packets=[uid_packet], output=output_file, force=True) - - -def persist_subkeys( - key_dir: Path, - subkeys: Dict[Fingerprint, Path], -) -> None: - """Persist all Public-Subkeys of a root key file to file(s) - - Parameters - ---------- - key_dir: The root directory below which the basic key material is persisted - subkeys: The PublicSubkeys of a key - """ - - for fingerprint, subkey in subkeys.items(): - output_file = key_dir / "subkey" / fingerprint / f"{fingerprint}.asc" - output_file.parent.mkdir(parents=True, exist_ok=True) - debug(f"Writing file {output_file} from {str(subkey)}") - packet_join(packets=[subkey], output=output_file, force=True) - - -def persist_subkey_bindings( - key_dir: Path, - subkey_bindings: Dict[Fingerprint, List[Path]], -) -> None: - """Persist all SubkeyBinding of a root key file to file(s) - - Parameters - ---------- - key_dir: The root directory below which the basic key material is persisted - subkey_bindings: The SubkeyBinding signatures of a Public-Subkey - """ - - for fingerprint, bindings in subkey_bindings.items(): - subkey_binding = latest_certification(bindings) - issuer = packet_dump_field(subkey_binding, "Issuer") - output_file = key_dir / "subkey" / fingerprint / "certification" / f"{issuer}.asc" - output_file.parent.mkdir(parents=True, exist_ok=True) - debug(f"Writing file {output_file} from {str(subkey_binding)}") - packet_join(packets=[subkey_binding], output=output_file, force=True) - - -def persist_subkey_revocations( - key_dir: Path, - subkey_revocations: Dict[Fingerprint, List[Path]], -) -> None: - """Persist the SubkeyRevocations of all Public-Subkeys of a root key to file(s) - - Parameters - ---------- - key_dir: The root directory below which the basic key material is persisted - subkey_revocations: The SubkeyRevocations of PublicSubkeys of a key - """ - - for fingerprint, revocations in subkey_revocations.items(): - revocation = latest_certification(revocations) - issuer = packet_dump_field(revocation, "Issuer") - output_file = key_dir / "subkey" / fingerprint / "revocation" / f"{issuer}.asc" - output_file.parent.mkdir(parents=True, exist_ok=True) - debug(f"Writing file {output_file} from {revocation}") - packet_join(packets=[revocation], output=output_file, force=True) - - -def persist_direct_key_certifications( - direct_key_certifications: Dict[Fingerprint, List[Path]], - key_dir: Path, -) -> None: - """Persist the signatures directly on a root key (such as DirectKeys or *Certifications without a User ID) to - file(s) - - Parameters - ---------- - direct_key_certifications: The direct key certifications to write to file - key_dir: The root directory below which the Directkeys are persisted - """ - - for issuer, certifications in direct_key_certifications.items(): - output_file = key_dir / "certification" / f"{issuer}.asc" - output_file.parent.mkdir(parents=True, exist_ok=True) - debug(f"Writing file {output_file} from {[str(cert) for cert in certifications]}") - packet_join(packets=certifications, output=output_file, force=True) - - -def persist_direct_key_revocations( - direct_key_revocations: Dict[Fingerprint, List[Path]], - key_dir: Path, -) -> None: - """Persist the revocations directly on a root key (such as KeyRevocation) to file(s) - - Parameters - ---------- - direct_key_revocations: The direct key revocations to write to file - key_dir: The root directory below which the Directkeys are persisted - """ - - for issuer, certifications in direct_key_revocations.items(): - output_file = key_dir / "revocation" / f"{issuer}.asc" - output_file.parent.mkdir(parents=True, exist_ok=True) - debug(f"Writing file {output_file} from {[str(cert) for cert in certifications]}") - packet_join(packets=certifications, output=output_file, force=True) - - -def persist_uid_certifications( - certifications: Dict[Uid, Dict[Fingerprint, List[Path]]], - key_dir: Path, -) -> None: - """Persist the certifications of a root key to file(s) - - The certifications include all CasualCertifications, GenericCertifications, PersonaCertifications and - PositiveCertifications for all User IDs of the given root key. - All certifications are persisted in per User ID certification directories below key_dir. - - Parameters - ---------- - certifications: The certifications to write to file - key_dir: The root directory below which certifications are persisted - """ - - for uid, uid_certifications in certifications.items(): - for issuer, issuer_certifications in uid_certifications.items(): - certification_dir = key_dir / "uid" / uid / "certification" - certification_dir.mkdir(parents=True, exist_ok=True) - certification = latest_certification(issuer_certifications) - output_file = certification_dir / f"{issuer}.asc" - debug(f"Writing file {output_file} from {certification}") - packet_join(packets=[certification], output=output_file, force=True) - - -def persist_uid_revocations( - revocations: Dict[Uid, Dict[Fingerprint, List[Path]]], - key_dir: Path, -) -> None: - """Persist the revocations of a root key to file(s) - - The revocations include all CertificationRevocations for all User IDs of the given root key. - All revocations are persisted in per User ID 'revocation' directories below key_dir. - - Parameters - ---------- - revocations: The revocations to write to file - key_dir: The root directory below which revocations will be persisted - """ - - for uid, uid_revocations in revocations.items(): - for issuer, issuer_revocations in uid_revocations.items(): - revocation_dir = key_dir / "uid" / uid / "revocation" - revocation_dir.mkdir(parents=True, exist_ok=True) - revocation = latest_certification(issuer_revocations) - output_file = revocation_dir / f"{issuer}.asc" - debug(f"Writing file {output_file} from {revocation}") - packet_join(packets=[revocation], output=output_file, force=True) - - -def packet_dump(packet: Path) -> str: - """Dump a PGP packet to string - - The `sq packet dump` command is used to retrieve a dump of information from a PGP packet - - Parameters - ---------- - packet: The path to the PGP packet to retrieve the value from - - Returns - ------- - The contents of the packet dump - """ - - return system(["sq", "packet", "dump", str(packet)]) - - -def packet_dump_field(packet: Path, field: str) -> str: - """Retrieve the value of a field from a PGP packet - - Parameters - ---------- - packet: The path to the PGP packet to retrieve the value from - field: The name of the field - - Raises - ------ - Exception: If the field is not found in the PGP packet - - Returns - ------- - The value of the field found in packet - """ - - dump = packet_dump(packet) - lines = [line.strip() for line in dump.splitlines()] - lines = list(filter(lambda line: line.strip().startswith(f"{field}: "), lines)) - if not lines: - raise Exception(f'Packet has no field "{field}"') - return lines[0].split(sep=": ", maxsplit=1)[1] - - -def packet_signature_creation_time(packet: Path) -> datetime: - """Retrieve the signature creation time field as datetime - - Parameters - ---------- - packet: The path to the PGP packet to retrieve the value from - - Returns - ------- - The signature creation time as datetime - """ - return datetime.strptime(packet_dump_field(packet, "Signature creation time"), "%Y-%m-%d %H:%M:%S %Z") - - -def latest_certification(certifications: Iterable[Path]) -> Path: - """Returns the latest certification based on the signature creation time from a list of packets. - - Parameters - ---------- - certifications: List of certification from which to choose the latest from - - Returns - ------- - The latest certification from a list of packets - """ - return reduce( - lambda a, b: a if packet_signature_creation_time(a) > packet_signature_creation_time(b) else b, - certifications, - ) - - -def keyring_split(working_dir: Path, keyring: Path, preserve_filename: bool = False) -> Iterable[Path]: - """Split a file containing a PGP keyring into separate certificate files - - The original keyring filename is preserved if the split only yields a single certificate. - If preserve_filename is True, all keyrings are placed into separate directories while preserving - the filename. - - The file is split using sq. - - Parameters - ---------- - working_dir: The path of the working directory below which to create the output files - keyring: The path of a file containing a PGP keyring - preserve_filename: If True, all keyrings are placed into separate directories while preserving the filename - - Returns - ------- - An iterable over the naturally sorted list of certificate files derived from a keyring - """ - - keyring_dir = Path(mkdtemp(dir=working_dir, prefix="keyring-")).absolute() - - with cwd(keyring_dir): - system(["sq", "keyring", "split", "--prefix", "", str(keyring)]) - - keyrings: List[Path] = list(natural_sort_path(keyring_dir.iterdir())) - - if 1 == len(keyrings) or preserve_filename: - for index, key in enumerate(keyrings): - keyring_sub_dir = Path(mkdtemp(dir=keyring_dir, prefix=f"{keyring.name}-")).absolute() - keyrings[index] = key.rename(keyring_sub_dir / keyring.name) - - return keyrings - - -def keyring_merge(certificates: List[Path], output: Optional[Path] = None, force: bool = False) -> str: - """Merge multiple certificates into a keyring - - Parameters - ---------- - certificates: List of paths to certificates to merge into a keyring - output: Path to a file which the keyring is written, return the result instead if None - force: Whether to force overwriting existing files (defaults to False) - - Returns - ------- - The result if no output file has been used - """ - - cmd = ["sq", "keyring", "merge"] - if force: - cmd.insert(1, "--force") - if output: - cmd += ["--output", str(output)] - cmd += [str(cert) for cert in sorted(certificates)] - - return system(cmd) - - -def packet_split(working_dir: Path, certificate: Path) -> Iterable[Path]: - """Split a file containing a PGP certificate into separate packet files - - The files are split using sq - - Parameters - ---------- - working_dir: The path of the working directory below which to create the output files - certificate: The absolute path of a file containing one PGP certificate - - Returns - ------- - An iterable over the naturally sorted list of packet files derived from certificate - """ - - packet_dir = Path(mkdtemp(dir=working_dir, prefix="packet-")).absolute() - - with cwd(packet_dir): - system(["sq", "packet", "split", "--prefix", "", str(certificate)]) - return natural_sort_path(packet_dir.iterdir()) - - -def packet_join(packets: List[Path], output: Optional[Path] = None, force: bool = False) -> str: - """Join PGP packet data in files to a single output file - - Parameters - ---------- - packets: A list of paths to files that contain PGP packet data - output: Path to a file to which all PGP packet data is written, return the result instead if None - force: Whether to force overwriting existing files (defaults to False) - - Returns - ------- - The result if no output file has been used - """ - - cmd = ["sq", "packet", "join"] - if force: - cmd.insert(1, "--force") - packets_str = list(map(lambda path: str(path), packets)) - cmd.extend(packets_str) - cmd.extend(["--output", str(output)]) - return system(cmd) - - -def inspect( - packet: Path, certifications: bool = True, fingerprints: Optional[Dict[Fingerprint, Username]] = None -) -> str: - """Inspect PGP packet data and return the result - - Parameters - ---------- - packet: Path to a file that contain PGP data - certifications: Whether to print third-party certifications - fingerprints: Optional dict of fingerprints to usernames to enrich the output with - - Returns - ------- - The result of the inspection - """ - - cmd = ["sq", "inspect"] - if certifications: - cmd.append("--certifications") - cmd.append(str(packet)) - result: str = system(cmd) - - if fingerprints: - for fingerprint, username in fingerprints.items(): - result = sub(f"{fingerprint}", f"{fingerprint} {username}", result) - result = sub(f" {fingerprint[24:]}", f" {fingerprint[24:]} {username}", result) - - return result - - -def simplify_user_id(user_id: Uid) -> Uid: - """Simplify the User ID string to contain more filesystem friendly characters - - Parameters - ---------- - user_id: A User ID string (e.g. 'Foobar McFooface ') - - Returns - ------- - The simplified representation of user_id - """ - - user_id_str: str = user_id.replace("@", "_at_") - user_id_str = sub("[<>]", "", user_id_str) - user_id_str = sub("[" + escape(r" !@#$%^&*()_-+=[]{}\|;:,.<>/?") + "]", "_", user_id_str) - return Uid(user_id_str) - - -def derive_username_from_fingerprint(keyring_dir: Path, certificate_fingerprint: Fingerprint) -> Optional[Username]: - """Attempt to derive the username of a public key fingerprint from a keyring directory - - Parameters - ---------- - keyring_dir: The directory in which to look up a username - certificate_fingerprint: The public key fingerprint to derive the username from - - Raises - ------ - Exception: If more than one username is found (a public key can only belong to one individual) - - Returns - ------- - A string representing the username a public key certificate belongs to, None otherwise - """ - - matches = list(keyring_dir.glob(f"*/*{certificate_fingerprint}")) - - if len(matches) > 1: - raise Exception( - f"More than one username found in {keyring_dir} when probing for fingerprint '{certificate_fingerprint}': " - f"{matches}" - ) - elif not matches: - debug(f"Can not derive username from target directory for fingerprint {certificate_fingerprint}") - return None - else: - username = matches[0].parent.stem - debug( - f"Successfully derived username '{username}' from target directory for fingerprint " - f"{certificate_fingerprint}" - ) - return Username(username) - - -def convert( - working_dir: Path, - keyring_root: Path, - source: Iterable[Path], - target_dir: Path, - name_override: Optional[Username] = None, -) -> Path: - """Convert a path containing PGP certificate material to a decomposed directory structure - - Any input is first split by `keyring_split()` into individual certificates. - - Parameters - ---------- - working_dir: A directory to use for temporary files - keyring_root: The keyring root directory to look up accepted fingerprints for certifications - source: A path to a file or directory to decompose - target_dir: A directory path to write the new directory structure to - name_override: An optional username override for the call to `convert_certificate()` - - Returns - ------- - The directory that contains the resulting directory structure (target_dir) - """ - - directories: List[Path] = [] - keys: Iterable[Path] = set(chain.from_iterable(map(lambda s: s.iterdir() if s.is_dir() else [s], source))) - - fingerprint_filter = set( - get_fingerprints( - working_dir=working_dir, - sources=source, - paths=[keyring_root], - ).keys() - ) - - for key in keys: - for cert in keyring_split(working_dir=working_dir, keyring=key, preserve_filename=True): - directories.append( - convert_certificate( - working_dir=working_dir, - certificate=cert, - keyring_dir=target_dir, - name_override=name_override, - fingerprint_filter=fingerprint_filter, - ) - ) - - for path in directories: - (target_dir / path.name).mkdir(parents=True, exist_ok=True) - copytree(src=path, dst=(target_dir / path.name), dirs_exist_ok=True) - - return target_dir - - -def get_trusted_and_revoked_certs(certs: List[Path]) -> Tuple[List[Fingerprint], List[Fingerprint]]: - """Get the fingerprints of all trusted and all self revoked public keys in a directory - - Parameters - ---------- - certs: The certificates to trust - - Returns - ------- - A tuple with the first item containing the fingerprints of all public keys and the second item containing the - fingerprints of all self-revoked public keys - """ - - all_certs: List[Fingerprint] = [] - revoked_certs: List[Fingerprint] = [] - - # TODO: what about direct key revocations/signatures? - - debug(f"Retrieving trusted and self-revoked certificates from {[str(cert_dir) for cert_dir in certs]}") - - for cert_dir in sorted(get_cert_paths(certs)): - cert_fingerprint = Fingerprint(cert_dir.stem) - all_certs.append(cert_fingerprint) - for revocation_cert in cert_dir.glob("revocation/*.asc"): - if cert_fingerprint.endswith(revocation_cert.stem): - debug(f"Revoking {cert_fingerprint} due to self-revocation") - revoked_certs.append(cert_fingerprint) - - trusted_keys = [cert for cert in all_certs if cert not in revoked_certs] - - return trusted_keys, revoked_certs - - -def export_ownertrust(certs: List[Path], output: Path) -> Tuple[List[Fingerprint], List[Fingerprint]]: - """Export ownertrust from a set of keys and return the trusted and revoked fingerprints - - The output file format is compatible with `gpg --import-ownertrust` and lists the main fingerprint ID of all - non-revoked keys as fully trusted. - The exported file is used by pacman-key when importing a keyring (see - https://man.archlinux.org/man/pacman-key.8#PROVIDING_A_KEYRING_FOR_IMPORT). - - Parameters - ---------- - certs: The certificates to trust - output: The file path to write to - """ - - trusted_certs, revoked_certs = get_trusted_and_revoked_certs(certs=certs) - - with open(file=output, mode="w") as trusted_certs_file: - for cert in sorted(set(trusted_certs)): - debug(f"Writing {cert} to {output}") - trusted_certs_file.write(f"{cert}:4:\n") - - return trusted_certs, revoked_certs - - -def export_revoked(certs: List[Path], main_keys: List[Fingerprint], output: Path, min_revoker: int = 2) -> None: - """Export the PGP revoked status from a set of keys - - The output file contains the fingerprints of all self-revoked keys and all keys for which at least two revocations - by any main key exist. - The exported file is used by pacman-key when importing a keyring (see - https://man.archlinux.org/man/pacman-key.8#PROVIDING_A_KEYRING_FOR_IMPORT). - - Parameters - ---------- - certs: A list of directories with keys to check for their revocation status - main_keys: A list of strings representing the fingerprints of (current and/or revoked) main keys - output: The file path to write to - min_revoker: The minimum amount of revocation certificates on a User ID from any main key to deem a public key as - revoked - """ - - trusted_certs, revoked_certs = get_trusted_and_revoked_certs(certs=certs) - - debug(f"Retrieving certificates revoked by main keys from {[str(cert_dir) for cert_dir in certs]}") - foreign_revocations: Dict[Fingerprint, Set[Fingerprint]] = defaultdict(set) - for cert_dir in sorted(get_cert_paths(certs)): - fingerprint = Fingerprint(cert_dir.name) - debug(f"Inspecting public key {fingerprint}") - for revocation_cert in cert_dir.glob("uid/*/revocation/*.asc"): - revocation_fingerprint = Fingerprint(revocation_cert.stem) - foreign_revocations[fingerprint].update( - [revocation_fingerprint for main_key in main_keys if main_key.endswith(revocation_fingerprint)] - ) - - # TODO: find a better (less naive) approach, as this would also match on public certificates, - # where some UIDs are signed and others are revoked - if len(foreign_revocations[fingerprint]) >= min_revoker: - debug( - f"Revoking {cert_dir.name} due to {set(foreign_revocations[fingerprint])} " "being main key revocations" - ) - revoked_certs.append(fingerprint) - - with open(file=output, mode="w") as trusted_certs_file: - for cert in sorted(set(revoked_certs)): - debug(f"Writing {cert} to {output}") - trusted_certs_file.write(f"{cert}\n") - - -def get_fingerprints_from_keyring_files(working_dir: Path, source: Iterable[Path]) -> Dict[Fingerprint, Username]: - """Get all fingerprints of PGP public keys from import file(s) - - Parameters - ---------- - working_dir: A directory to use for temporary files - source: The path to a source file or directory containing keyrings - - Returns - ------- - A dict of all fingerprints and their usernames of PGP public keys below path - """ - - fingerprints: Dict[Fingerprint, Username] = {} - keys: Iterable[Path] = set(chain.from_iterable(map(lambda s: s.iterdir() if s.is_dir() else [s], source))) - - for key in keys: - for certificate in keyring_split(working_dir=working_dir, keyring=key, preserve_filename=True): - for packet in packet_split(working_dir=working_dir, certificate=certificate): - if packet.name.endswith("--PublicKey"): - fingerprints[Fingerprint(packet_dump_field(packet, "Fingerprint"))] = Username(certificate.stem) - - debug(f"Fingerprints of PGP public keys in {source}: {fingerprints}") - return fingerprints - - -def get_fingerprints_from_certificate_directory( - paths: List[Path], prefix: str = "", postfix: str = "" -) -> Dict[Fingerprint, Username]: - """Get all fingerprints of PGP public keys from decomposed directory structures - - Parameters - ---------- - paths: The path to a decomposed directory structure - prefix: Prefix to add to each username - postfix: Postfix to add to each username - - Returns - ------- - A dict of all fingerprints and their usernames of PGP public keys below path - """ - - fingerprints: Dict[Fingerprint, Username] = {} - for cert in sorted(get_cert_paths(paths)): - fingerprints[Fingerprint(cert.name)] = Username(f"{prefix}{cert.parent.name}{postfix}") - - debug(f"Fingerprints of PGP public keys in {paths}: {fingerprints}") - return fingerprints - - -def get_fingerprints(working_dir: Path, sources: Iterable[Path], paths: List[Path]) -> Dict[Fingerprint, Username]: - """Get the fingerprints of PGP public keys from input paths and decomposed directory structures - - Parameters - ---------- - working_dir: A directory to use for temporary files - sources: A list of directories or files from which to read PGP keyring information - paths: A list of paths that identify decomposed PGP data in directory structures - - Returns - ------- - A dict of all fingerprints and their usernames of PGP public keys below path - """ - - fingerprints: Dict[Fingerprint, Username] = {} - - fingerprints.update( - get_fingerprints_from_keyring_files( - working_dir=working_dir, - source=sources, - ) - ) - - fingerprints.update(get_fingerprints_from_certificate_directory(paths=paths)) - - return fingerprints - - -def get_packets_from_path(path: Path) -> List[Path]: - """Collects packets from one level by appending the root, certifications and revocations. - - Parameters - ---------- - path: Filesystem path used to collect the packets from - - Returns - ------- - A list of packets ordered by root, certification, revocation - """ - if not path.exists(): - return [] - - packets: List[Path] = [] - packets += sorted(path.glob("*.asc")) - certifications = path / "certification" - if certifications.exists(): - packets += sorted(certifications.glob("*.asc")) - revocations = path / "revocation" - if revocations.exists(): - packets += sorted(revocations.glob("*.asc")) - return packets - - -def get_packets_from_listing(path: Path) -> List[Path]: - """Collects packets from a listing of directories holding one level each by calling `get_get_packets_from_path`. - - Parameters - ---------- - path: Filesystem path used as listing to collect the packets from - - Returns - ------- - A list of packets ordered by root, certification, revocation for each level - """ - if not path.exists(): - return [] - - packets: List[Path] = [] - for sub_path in sorted(path.iterdir()): - packets += get_packets_from_path(sub_path) - return packets - - -def export( - working_dir: Path, - keyring_root: Path, - sources: Optional[List[Path]] = None, - output: Optional[Path] = None, -) -> str: - """Export all provided PGP packet files to a single output file - - If sources contains directories, any .asc files below them are considered. - - Parameters - ---------- - working_dir: A directory to use for temporary files - keyring_root: The keyring root directory to look up username shorthand sources - sources: A list of username, fingerprint or directories from which to read PGP packet information - (defaults to `keyring_root`) - output: An output file that all PGP packet data is written to, return the result instead if None - - Returns - ------- - The result if no output file has been used - """ - - if not sources: - sources = [keyring_root] - - # transform shorthand paths to actual keyring paths - transform_username_to_keyring_path(keyring_dir=keyring_root / "packager", paths=sources) - transform_fingerprint_to_keyring_path(keyring_root=keyring_root, paths=sources) - - temp_dir = Path(mkdtemp(dir=working_dir, prefix="arch-keyringctl-export-join-")).absolute() - cert_paths: Set[Path] = get_cert_paths(sources) - certificates: List[Path] = [] - - for cert_dir in sorted(cert_paths): - packets: List[Path] = [] - packets += get_packets_from_path(cert_dir) - packets += get_packets_from_listing(cert_dir / "subkey") - packets += get_packets_from_listing(cert_dir / "uid") - - output_path = temp_dir / f"{cert_dir.name}.asc" - debug(f"Joining {cert_dir} in {output_path}") - packet_join( - packets=packets, - output=output_path, - force=True, - ) - certificates.append(output_path) - - return keyring_merge(certificates, output, force=True) - - -def build( - working_dir: Path, - keyring_root: Path, - target_dir: Path, -) -> None: - """Build keyring PGP artifacts alongside ownertrust and revoked status files - - Parameters - ---------- - working_dir: A directory to use for temporary files - keyring_root: The keyring root directory to build the artifacts from - target_dir: Output directory that all artifacts are written to - """ - - target_dir.mkdir(parents=True, exist_ok=True) - - keyring: Path = target_dir / Path("archlinux.gpg") - export(working_dir=working_dir, keyring_root=keyring_root, output=keyring) - - [trusted_main_keys, revoked_main_keys] = export_ownertrust( - certs=[keyring_root / "main"], - output=target_dir / "archlinux-trusted", - ) - export_revoked( - certs=[keyring_root], - main_keys=trusted_main_keys + revoked_main_keys, - output=target_dir / "archlinux-revoked", - ) - - -def list_keyring(keyring_root: Path, sources: Optional[List[Path]] = None, main_keys: bool = False) -> None: - """List certificates in the keyring - - If sources contains directories, all certificate below them are considered. - - Parameters - ---------- - keyring_root: The keyring root directory to look up username shorthand sources - sources: A list of username, fingerprint or directories from which to read PGP packet information - (defaults to `keyring_root`) - main_keys: List main keys instead of packager keys (defaults to False) - """ - - keyring_dir = keyring_root / ("main" if main_keys else "packager") - - if not sources: - sources = list(sorted(keyring_dir.iterdir(), key=lambda path: path.name.casefold())) - - # transform shorthand paths to actual keyring paths - transform_username_to_keyring_path(keyring_dir=keyring_dir, paths=sources) - transform_fingerprint_to_keyring_path(keyring_root=keyring_root, paths=sources) - - username_length = max([len(source.name) for source in sources]) - - for user_path in sources: - if is_pgp_fingerprint(user_path.name): - user_path = user_path.parent - certificates = [cert.name for cert in user_path.iterdir()] - print(f"{user_path.name:<{username_length}} {' '.join(certificates)}") - - -def inspect_keyring(working_dir: Path, keyring_root: Path, sources: Optional[List[Path]]) -> str: - """Inspect certificates in the keyring and pretty print the data - - If sources contains directories, all certificate below them are considered. - - Parameters - ---------- - working_dir: A directory to use for temporary files - keyring_root: The keyring root directory to look up username shorthand sources - sources: A list of username, fingerprint or directories from which to read PGP packet information - (defaults to `keyring_root`) - - Returns - ------- - The result of the inspect - """ - - if not sources: - sources = [keyring_root] - - # transform shorthand paths to actual keyring paths - transform_username_to_keyring_path(keyring_dir=keyring_root / "packager", paths=sources) - transform_fingerprint_to_keyring_path(keyring_root=keyring_root, paths=sources) - - keyring = Path(mkstemp(dir=working_dir, prefix="packet-", suffix=".asc")[1]).absolute() - export(working_dir=working_dir, keyring_root=keyring_root, sources=sources, output=keyring) - - fingerprints: Dict[Fingerprint, Username] = get_fingerprints_from_certificate_directory( - paths=[keyring_root / "packager"] - ) | get_fingerprints_from_certificate_directory(paths=[keyring_root / "main"], postfix=" (main)") - - return inspect( - packet=keyring, - certifications=True, - fingerprints=fingerprints, - ) - - -def absolute_path(path: str) -> Path: - """Return the absolute path of a given str - - Parameters - ---------- - path: A string representing a path - - Returns - ------- - The absolute path representation of path - """ - - return Path(path).absolute() - +from libkeyringctl.cli import main if __name__ == "__main__": - parser = ArgumentParser() - parser.add_argument( - "-v", "--verbose", action="store_true", help="Causes to print debugging messages about the progress" - ) - parser.add_argument("--wait", action="store_true", help="Block before cleaning up the temp directory") - parser.add_argument( - "-f", - "--force", - action="store_true", - default=False, - help="force the execution of subcommands (e.g. overwriting of files)", - ) - subcommands = parser.add_subparsers(dest="subcommand") - - convert_parser = subcommands.add_parser( - "convert", - help="convert one or multiple PGP public keys to a decomposed directory structure", - ) - convert_parser.add_argument("source", type=absolute_path, nargs="+", help="Files or directorie to convert") - convert_parser.add_argument("--target", type=absolute_path, help="Target directory instead of a random tmpdir") - convert_parser.add_argument( - "--name", - type=Username, - default=None, - help="override the username to use (only useful when using a single file as source)", - ) - - import_parser = subcommands.add_parser( - "import", - help="import one or several PGP keys to the keyring directory structure", - ) - import_parser.add_argument("source", type=absolute_path, nargs="+", help="Files or directories to import") - import_parser.add_argument( - "--name", - type=Username, - default=None, - help="override the username to use (only useful when using a single file as source)", - ) - import_parser.add_argument("--main", action="store_true", help="Import a main signing key into the keyring") - - export_parser = subcommands.add_parser( - "export", - help="export a directory structure of PGP packet data to a combined file", - ) - export_parser.add_argument("-o", "--output", type=absolute_path, help="file to write PGP packet data to") - export_parser.add_argument( - "source", - nargs="*", - help="username, fingerprint or directories containing certificates", - type=absolute_path, - ) - - build_parser = subcommands.add_parser( - "build", - help="build keyring PGP artifacts alongside ownertrust and revoked status files", - ) - - list_parser = subcommands.add_parser( - "list", - help="list the certificates in the keyring", - ) - list_parser.add_argument("--main", action="store_true", help="List main signing keys instead of packager keys") - list_parser.add_argument( - "source", - nargs="*", - help="username, fingerprint or directories containing certificates", - type=absolute_path, - ) - - inspect_parser = subcommands.add_parser( - "inspect", - help="inspect certificates in the keyring and pretty print the data", - ) - inspect_parser.add_argument( - "source", - nargs="*", - help="username, fingerprint or directories containing certificates", - type=absolute_path, - ) - - args = parser.parse_args() - - if args.verbose: - basicConfig(level=DEBUG) - - # temporary working directory that gets auto cleaned - with TemporaryDirectory(prefix="arch-keyringctl-") as tempdir: - keyring_root = Path("keyring").absolute() - working_dir = Path(tempdir) - debug(f"Working directory: {working_dir}") - with cwd(working_dir): - if "convert" == args.subcommand: - target_dir = args.target or Path(mkdtemp(prefix="arch-keyringctl-")).absolute() - print( - convert( - working_dir=working_dir, - keyring_root=keyring_root, - source=args.source, - target_dir=target_dir, - name_override=args.name, - ) - ) - elif "import" == args.subcommand: - target_dir = "main" if args.main else "packager" - print( - convert( - working_dir=working_dir, - keyring_root=keyring_root, - source=args.source, - target_dir=keyring_root / target_dir, - name_override=args.name, - ) - ) - elif "export" == args.subcommand: - print( - export( - working_dir=working_dir, - keyring_root=keyring_root, - sources=args.source, - output=args.output, - ), - end="", - ) - elif "build" == args.subcommand: - build( - working_dir=working_dir, - keyring_root=keyring_root, - target_dir=keyring_root.parent / "build", - ) - elif "list" == args.subcommand: - list_keyring( - keyring_root=keyring_root, - sources=args.source, - main_keys=args.main, - ) - elif "inspect" == args.subcommand: - print( - inspect_keyring( - working_dir=working_dir, - keyring_root=keyring_root, - sources=args.source, - ), - end="", - ) - else: - parser.print_help() - - if args.wait: - print("Press [ENTER] to continue") - input() + main() diff --git a/libkeyringctl/__init__.py b/libkeyringctl/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/libkeyringctl/cli.py b/libkeyringctl/cli.py new file mode 100644 index 0000000..881c666 --- /dev/null +++ b/libkeyringctl/cli.py @@ -0,0 +1,171 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +from argparse import ArgumentParser +from logging import DEBUG +from logging import basicConfig +from logging import debug +from pathlib import Path +from tempfile import TemporaryDirectory +from tempfile import mkdtemp + +from .keyring import Username +from .keyring import build +from .keyring import convert +from .keyring import export +from .keyring import inspect_keyring +from .keyring import list_keyring +from .util import absolute_path +from .util import cwd + +parser = ArgumentParser() +parser.add_argument( + "-v", "--verbose", action="store_true", help="Causes to print debugging messages about the progress" +) +parser.add_argument("--wait", action="store_true", help="Block before cleaning up the temp directory") +parser.add_argument( + "-f", + "--force", + action="store_true", + default=False, + help="force the execution of subcommands (e.g. overwriting of files)", +) +subcommands = parser.add_subparsers(dest="subcommand") + +convert_parser = subcommands.add_parser( + "convert", + help="convert one or multiple PGP public keys to a decomposed directory structure", +) +convert_parser.add_argument("source", type=absolute_path, nargs="+", help="Files or directorie to convert") +convert_parser.add_argument("--target", type=absolute_path, help="Target directory instead of a random tmpdir") +convert_parser.add_argument( + "--name", + type=Username, + default=None, + help="override the username to use (only useful when using a single file as source)", +) + +import_parser = subcommands.add_parser( + "import", + help="import one or several PGP keys to the keyring directory structure", +) +import_parser.add_argument("source", type=absolute_path, nargs="+", help="Files or directories to import") +import_parser.add_argument( + "--name", + type=Username, + default=None, + help="override the username to use (only useful when using a single file as source)", +) +import_parser.add_argument("--main", action="store_true", help="Import a main signing key into the keyring") + +export_parser = subcommands.add_parser( + "export", + help="export a directory structure of PGP packet data to a combined file", +) +export_parser.add_argument("-o", "--output", type=absolute_path, help="file to write PGP packet data to") +export_parser.add_argument( + "source", + nargs="*", + help="username, fingerprint or directories containing certificates", + type=absolute_path, +) + +build_parser = subcommands.add_parser( + "build", + help="build keyring PGP artifacts alongside ownertrust and revoked status files", +) + +list_parser = subcommands.add_parser( + "list", + help="list the certificates in the keyring", +) +list_parser.add_argument("--main", action="store_true", help="List main signing keys instead of packager keys") +list_parser.add_argument( + "source", + nargs="*", + help="username, fingerprint or directories containing certificates", + type=absolute_path, +) + +inspect_parser = subcommands.add_parser( + "inspect", + help="inspect certificates in the keyring and pretty print the data", +) +inspect_parser.add_argument( + "source", + nargs="*", + help="username, fingerprint or directories containing certificates", + type=absolute_path, +) + + +def main() -> None: + args = parser.parse_args() + + if args.verbose: + basicConfig(level=DEBUG) + + # temporary working directory that gets auto cleaned + with TemporaryDirectory(prefix="arch-keyringctl-") as tempdir: + keyring_root = Path("keyring").absolute() + working_dir = Path(tempdir) + debug(f"Working directory: {working_dir}") + with cwd(working_dir): + if "convert" == args.subcommand: + target_dir = args.target or Path(mkdtemp(prefix="arch-keyringctl-")).absolute() + print( + convert( + working_dir=working_dir, + keyring_root=keyring_root, + source=args.source, + target_dir=target_dir, + name_override=args.name, + ) + ) + elif "import" == args.subcommand: + target_dir = "main" if args.main else "packager" + print( + convert( + working_dir=working_dir, + keyring_root=keyring_root, + source=args.source, + target_dir=keyring_root / target_dir, + name_override=args.name, + ) + ) + elif "export" == args.subcommand: + print( + export( + working_dir=working_dir, + keyring_root=keyring_root, + sources=args.source, + output=args.output, + ), + end="", + ) + elif "build" == args.subcommand: + build( + working_dir=working_dir, + keyring_root=keyring_root, + target_dir=keyring_root.parent / "build", + ) + elif "list" == args.subcommand: + list_keyring( + keyring_root=keyring_root, + sources=args.source, + main_keys=args.main, + ) + elif "inspect" == args.subcommand: + print( + inspect_keyring( + working_dir=working_dir, + keyring_root=keyring_root, + sources=args.source, + ), + end="", + ) + else: + parser.print_help() + + if args.wait: + print("Press [ENTER] to continue") + input() diff --git a/libkeyringctl/keyring.py b/libkeyringctl/keyring.py new file mode 100644 index 0000000..07f9768 --- /dev/null +++ b/libkeyringctl/keyring.py @@ -0,0 +1,978 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +from collections import defaultdict +from collections.abc import Iterable +from itertools import chain +from logging import debug +from pathlib import Path +from re import escape +from re import match +from re import sub +from shutil import copytree +from tempfile import mkdtemp +from tempfile import mkstemp +from typing import Dict +from typing import List +from typing import Optional +from typing import Set +from typing import Tuple + +from .sequoia import inspect +from .sequoia import keyring_merge +from .sequoia import keyring_split +from .sequoia import latest_certification +from .sequoia import packet_dump_field +from .sequoia import packet_join +from .sequoia import packet_split +from .types import Fingerprint +from .types import Uid +from .types import Username + + +def is_pgp_fingerprint(string: str) -> bool: + """Returns whether the passed string looks like a PGP (long) fingerprint + + Parameters + ---------- + string: Input to consider as a fingerprint + + Returns + ------- + RWhether string is a fingerprint + """ + if len(string) not in [16, 40]: + return False + return match("^[A-F0-9]+$", string) is not None + + +def get_cert_paths(paths: Iterable[Path]) -> Set[Path]: + """Walks a list of paths and resolves all discovered certificate paths + + Parameters + ---------- + paths: A list of paths to walk and resolve to certificate paths. + + Returns + ------- + The list of paths to certificates + """ + + # depth first search certificate paths + cert_paths: Set[Path] = set() + visit: List[Path] = list(paths) + while visit: + path = visit.pop() + if not path.exists(): + continue + # this level contains a certificate, abort depth search + if list(path.glob("*.asc")): + cert_paths.add(path) + continue + visit.extend([path for path in path.iterdir() if path.is_dir()]) + return cert_paths + + +def transform_username_to_keyring_path(keyring_dir: Path, paths: List[Path]) -> None: + """Mutates the input sources by transforming passed usernames to keyring paths + + Parameters + ---------- + keyring_dir: The directory underneath the username needs to exist + paths: A list of paths to mutate and replace usernames to keyring paths + """ + for index, source in enumerate(paths): + if source.exists(): + continue + packager_source = keyring_dir / source.name + if not packager_source.exists(): + continue + paths[index] = packager_source + + +def transform_fingerprint_to_keyring_path(keyring_root: Path, paths: List[Path]) -> None: + """Mutates the input sources by transforming passed fingerprints to keyring paths + + Parameters + ---------- + keyring_root: The keyring root directory to look up fingerprints in + paths: A list of paths to mutate and replace fingerprints to keyring paths + """ + for index, source in enumerate(paths): + if source.exists(): + continue + if not is_pgp_fingerprint(source.name): + continue + fingerprint_paths = list(keyring_root.glob(f"*/*/*{source.name}")) + if not fingerprint_paths: + continue + paths[index] = fingerprint_paths[0].parent + + +# TODO: simplify to lower complexity +def convert_certificate( # noqa: ignore=C901 + working_dir: Path, + certificate: Path, + keyring_dir: Path, + name_override: Optional[Username] = None, + fingerprint_filter: Optional[Set[Fingerprint]] = None, +) -> Path: + """Convert a single file public key certificate into a decomposed directory structure of multiple PGP packets + + The output directory structure is created per user. The username is derived from the certificate via + `derive_username_from_fingerprint` or overridden via `name_override`. + Below the username directory a directory tree describes the public keys components split up into certifications + and revocations, as well as per subkey and per uid certifications and revocations. + + Parameters + ---------- + working_dir: The path of the working directory below which to create split certificates + certificate: The path to a public key certificate + keyring_dir: The path of the keyring used to try to derive the username from the public key fingerprint + name_override: An optional string to override the username in the to be created output directory structure + fingerprint_filter: Optional list of fingerprints of PGP public keys that all certifications will be filtered with + + Raises + ------ + Exception: If required PGP packets are not found + + Returns + ------- + The path of the user_dir (which is located below working_dir) + """ + + # root packets + certificate_fingerprint: Optional[Fingerprint] = None + pubkey: Optional[Path] = None + # TODO: direct key certifications are not yet selecting the latest sig, owner may have multiple + # TODO: direct key certifications are not yet single packet per file + direct_sigs: Dict[Fingerprint, List[Path]] = defaultdict(list) + direct_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list) + + # subkey packets + subkeys: Dict[Fingerprint, Path] = {} + subkey_bindings: Dict[Fingerprint, List[Path]] = defaultdict(list) + subkey_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list) + + # uid packets + uids: Dict[Uid, Path] = {} + certifications: Dict[Uid, Dict[Fingerprint, List[Path]]] = defaultdict(lambda: defaultdict(list)) + revocations: Dict[Uid, Dict[Fingerprint, List[Path]]] = defaultdict(lambda: defaultdict(list)) + + # intermediate variables + current_packet_mode: Optional[str] = None + current_packet_fingerprint: Optional[Fingerprint] = None + current_packet_uid: Optional[Uid] = None + + # XXX: PrimaryKeyBinding + + # TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean + + debug(f"Processing certificate {certificate}") + + for packet in packet_split(working_dir=working_dir, certificate=certificate): + debug(f"Processing packet {packet.name}") + if packet.name.endswith("--PublicKey"): + current_packet_mode = "pubkey" + current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint")) + current_packet_uid = None + + certificate_fingerprint = current_packet_fingerprint + pubkey = packet + elif packet.name.endswith("--UserID"): + current_packet_mode = "uid" + current_packet_fingerprint = None + current_packet_uid = simplify_user_id(Uid(packet_dump_field(packet, "Value"))) + + uids[current_packet_uid] = packet + elif packet.name.endswith("--PublicSubkey"): + current_packet_mode = "subkey" + current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint")) + current_packet_uid = None + + subkeys[current_packet_fingerprint] = packet + elif packet.name.endswith("--Signature"): + if not certificate_fingerprint: + raise Exception('missing certificate fingerprint for "{packet.name}"') + + issuer: Fingerprint = Fingerprint(packet_dump_field(packet, "Issuer")) + signature_type = packet_dump_field(packet, "Type") + + if current_packet_mode == "pubkey": + if not current_packet_fingerprint: + raise Exception('missing current packet fingerprint for "{packet.name}"') + + if signature_type == "KeyRevocation" and certificate_fingerprint.endswith(issuer): + direct_revocations[issuer].append(packet) + elif signature_type in ["DirectKey", "GenericCertification"]: + direct_sigs[issuer].append(packet) + else: + raise Exception(f"unknown signature type: {signature_type}") + elif current_packet_mode == "uid": + if not current_packet_uid: + raise Exception('missing current packet uid for "{packet.name}"') + + if signature_type == "CertificationRevocation": + revocations[current_packet_uid][issuer].append(packet) + elif signature_type.endswith("Certification"): + if fingerprint_filter is not None and any([fp.endswith(issuer) for fp in fingerprint_filter]): + debug(f"The certification by issuer {issuer} is appended as it is found in the filter.") + certifications[current_packet_uid][issuer].append(packet) + else: + debug(f"The certification by issuer {issuer} is not appended because it is not in the filter") + else: + raise Exception(f"unknown signature type: {signature_type}") + elif current_packet_mode == "subkey": + if not current_packet_fingerprint: + raise Exception('missing current packet fingerprint for "{packet.name}"') + + if signature_type == "SubkeyBinding": + subkey_bindings[current_packet_fingerprint].append(packet) + elif signature_type == "SubkeyRevocation": + subkey_revocations[certificate_fingerprint].append(packet) + else: + raise Exception(f"unknown signature type: {signature_type}") + else: + raise Exception(f'unknown signature root for "{packet.name}"') + else: + raise Exception(f'unknown packet type "{packet.name}"') + + if not certificate_fingerprint: + raise Exception("missing certificate fingerprint") + + if not pubkey: + raise Exception("missing certificate public-key") + + name_override = ( + name_override + or derive_username_from_fingerprint(keyring_dir=keyring_dir, certificate_fingerprint=certificate_fingerprint) + or Username(certificate.stem) + ) + + user_dir = working_dir / name_override + key_dir = user_dir / certificate_fingerprint + key_dir.mkdir(parents=True, exist_ok=True) + + persist_public_key( + certificate_fingerprint=certificate_fingerprint, + pubkey=pubkey, + key_dir=key_dir, + ) + + persist_direct_key_certifications( + direct_key_certifications=direct_sigs, + key_dir=key_dir, + ) + + persist_direct_key_revocations( + direct_key_revocations=direct_revocations, + key_dir=key_dir, + ) + + persist_subkeys( + key_dir=key_dir, + subkeys=subkeys, + ) + + persist_subkey_bindings( + key_dir=key_dir, + subkey_bindings=subkey_bindings, + ) + + persist_subkey_revocations( + key_dir=key_dir, + subkey_revocations=subkey_revocations, + ) + + persist_uids( + key_dir=key_dir, + uids=uids, + ) + + persist_uid_certifications( + certifications=certifications, + key_dir=key_dir, + ) + + persist_uid_revocations( + revocations=revocations, + key_dir=key_dir, + ) + + return user_dir + + +def persist_public_key( + certificate_fingerprint: Fingerprint, + pubkey: Path, + key_dir: Path, +) -> None: + """Persist the Public-Key packet + + Parameters + ---------- + certificate_fingerprint: The unique fingerprint of the public key + pubkey: The path to the public key of the root key + key_dir: The root directory below which the basic key material is persisted + """ + + packets: List[Path] = [pubkey] + output_file = key_dir / f"{certificate_fingerprint}.asc" + output_file.parent.mkdir(parents=True, exist_ok=True) + debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}") + packet_join(packets, output_file, force=True) + + +def persist_uids( + key_dir: Path, + uids: Dict[Uid, Path], +) -> None: + """Persist the User IDs that belong to a PublicKey + + The User ID material consists of a single User ID Packet. + The files are written to a UID specific directory and file below key_dir/uid. + + Parameters + ---------- + key_dir: The root directory below which the basic key material is persisted + uids: The User IDs of a Public-Key (the root key) + """ + + for uid, uid_packet in uids.items(): + output_file = key_dir / "uid" / uid / f"{uid}.asc" + output_file.parent.mkdir(parents=True, exist_ok=True) + debug(f"Writing file {output_file} from {uid_packet}") + packet_join(packets=[uid_packet], output=output_file, force=True) + + +def persist_subkeys( + key_dir: Path, + subkeys: Dict[Fingerprint, Path], +) -> None: + """Persist all Public-Subkeys of a root key file to file(s) + + Parameters + ---------- + key_dir: The root directory below which the basic key material is persisted + subkeys: The PublicSubkeys of a key + """ + + for fingerprint, subkey in subkeys.items(): + output_file = key_dir / "subkey" / fingerprint / f"{fingerprint}.asc" + output_file.parent.mkdir(parents=True, exist_ok=True) + debug(f"Writing file {output_file} from {str(subkey)}") + packet_join(packets=[subkey], output=output_file, force=True) + + +def persist_subkey_bindings( + key_dir: Path, + subkey_bindings: Dict[Fingerprint, List[Path]], +) -> None: + """Persist all SubkeyBinding of a root key file to file(s) + + Parameters + ---------- + key_dir: The root directory below which the basic key material is persisted + subkey_bindings: The SubkeyBinding signatures of a Public-Subkey + """ + + for fingerprint, bindings in subkey_bindings.items(): + subkey_binding = latest_certification(bindings) + issuer = packet_dump_field(subkey_binding, "Issuer") + output_file = key_dir / "subkey" / fingerprint / "certification" / f"{issuer}.asc" + output_file.parent.mkdir(parents=True, exist_ok=True) + debug(f"Writing file {output_file} from {str(subkey_binding)}") + packet_join(packets=[subkey_binding], output=output_file, force=True) + + +def persist_subkey_revocations( + key_dir: Path, + subkey_revocations: Dict[Fingerprint, List[Path]], +) -> None: + """Persist the SubkeyRevocations of all Public-Subkeys of a root key to file(s) + + Parameters + ---------- + key_dir: The root directory below which the basic key material is persisted + subkey_revocations: The SubkeyRevocations of PublicSubkeys of a key + """ + + for fingerprint, revocations in subkey_revocations.items(): + revocation = latest_certification(revocations) + issuer = packet_dump_field(revocation, "Issuer") + output_file = key_dir / "subkey" / fingerprint / "revocation" / f"{issuer}.asc" + output_file.parent.mkdir(parents=True, exist_ok=True) + debug(f"Writing file {output_file} from {revocation}") + packet_join(packets=[revocation], output=output_file, force=True) + + +def persist_direct_key_certifications( + direct_key_certifications: Dict[Fingerprint, List[Path]], + key_dir: Path, +) -> None: + """Persist the signatures directly on a root key (such as DirectKeys or *Certifications without a User ID) to + file(s) + + Parameters + ---------- + direct_key_certifications: The direct key certifications to write to file + key_dir: The root directory below which the Directkeys are persisted + """ + + for issuer, certifications in direct_key_certifications.items(): + output_file = key_dir / "certification" / f"{issuer}.asc" + output_file.parent.mkdir(parents=True, exist_ok=True) + debug(f"Writing file {output_file} from {[str(cert) for cert in certifications]}") + packet_join(packets=certifications, output=output_file, force=True) + + +def persist_direct_key_revocations( + direct_key_revocations: Dict[Fingerprint, List[Path]], + key_dir: Path, +) -> None: + """Persist the revocations directly on a root key (such as KeyRevocation) to file(s) + + Parameters + ---------- + direct_key_revocations: The direct key revocations to write to file + key_dir: The root directory below which the Directkeys are persisted + """ + + for issuer, certifications in direct_key_revocations.items(): + output_file = key_dir / "revocation" / f"{issuer}.asc" + output_file.parent.mkdir(parents=True, exist_ok=True) + debug(f"Writing file {output_file} from {[str(cert) for cert in certifications]}") + packet_join(packets=certifications, output=output_file, force=True) + + +def persist_uid_certifications( + certifications: Dict[Uid, Dict[Fingerprint, List[Path]]], + key_dir: Path, +) -> None: + """Persist the certifications of a root key to file(s) + + The certifications include all CasualCertifications, GenericCertifications, PersonaCertifications and + PositiveCertifications for all User IDs of the given root key. + All certifications are persisted in per User ID certification directories below key_dir. + + Parameters + ---------- + certifications: The certifications to write to file + key_dir: The root directory below which certifications are persisted + """ + + for uid, uid_certifications in certifications.items(): + for issuer, issuer_certifications in uid_certifications.items(): + certification_dir = key_dir / "uid" / uid / "certification" + certification_dir.mkdir(parents=True, exist_ok=True) + certification = latest_certification(issuer_certifications) + output_file = certification_dir / f"{issuer}.asc" + debug(f"Writing file {output_file} from {certification}") + packet_join(packets=[certification], output=output_file, force=True) + + +def persist_uid_revocations( + revocations: Dict[Uid, Dict[Fingerprint, List[Path]]], + key_dir: Path, +) -> None: + """Persist the revocations of a root key to file(s) + + The revocations include all CertificationRevocations for all User IDs of the given root key. + All revocations are persisted in per User ID 'revocation' directories below key_dir. + + Parameters + ---------- + revocations: The revocations to write to file + key_dir: The root directory below which revocations will be persisted + """ + + for uid, uid_revocations in revocations.items(): + for issuer, issuer_revocations in uid_revocations.items(): + revocation_dir = key_dir / "uid" / uid / "revocation" + revocation_dir.mkdir(parents=True, exist_ok=True) + revocation = latest_certification(issuer_revocations) + output_file = revocation_dir / f"{issuer}.asc" + debug(f"Writing file {output_file} from {revocation}") + packet_join(packets=[revocation], output=output_file, force=True) + + +def simplify_user_id(user_id: Uid) -> Uid: + """Simplify the User ID string to contain more filesystem friendly characters + + Parameters + ---------- + user_id: A User ID string (e.g. 'Foobar McFooface ') + + Returns + ------- + The simplified representation of user_id + """ + + user_id_str: str = user_id.replace("@", "_at_") + user_id_str = sub("[<>]", "", user_id_str) + user_id_str = sub("[" + escape(r" !@#$%^&*()_-+=[]{}\|;:,.<>/?") + "]", "_", user_id_str) + return Uid(user_id_str) + + +def derive_username_from_fingerprint(keyring_dir: Path, certificate_fingerprint: Fingerprint) -> Optional[Username]: + """Attempt to derive the username of a public key fingerprint from a keyring directory + + Parameters + ---------- + keyring_dir: The directory in which to look up a username + certificate_fingerprint: The public key fingerprint to derive the username from + + Raises + ------ + Exception: If more than one username is found (a public key can only belong to one individual) + + Returns + ------- + A string representing the username a public key certificate belongs to, None otherwise + """ + + matches = list(keyring_dir.glob(f"*/*{certificate_fingerprint}")) + + if len(matches) > 1: + raise Exception( + f"More than one username found in {keyring_dir} when probing for fingerprint '{certificate_fingerprint}': " + f"{matches}" + ) + elif not matches: + debug(f"Can not derive username from target directory for fingerprint {certificate_fingerprint}") + return None + else: + username = matches[0].parent.stem + debug( + f"Successfully derived username '{username}' from target directory for fingerprint " + f"{certificate_fingerprint}" + ) + return Username(username) + + +def convert( + working_dir: Path, + keyring_root: Path, + source: Iterable[Path], + target_dir: Path, + name_override: Optional[Username] = None, +) -> Path: + """Convert a path containing PGP certificate material to a decomposed directory structure + + Any input is first split by `keyring_split()` into individual certificates. + + Parameters + ---------- + working_dir: A directory to use for temporary files + keyring_root: The keyring root directory to look up accepted fingerprints for certifications + source: A path to a file or directory to decompose + target_dir: A directory path to write the new directory structure to + name_override: An optional username override for the call to `convert_certificate()` + + Returns + ------- + The directory that contains the resulting directory structure (target_dir) + """ + + directories: List[Path] = [] + keys: Iterable[Path] = set(chain.from_iterable(map(lambda s: s.iterdir() if s.is_dir() else [s], source))) + + fingerprint_filter = set( + get_fingerprints( + working_dir=working_dir, + sources=source, + paths=[keyring_root], + ).keys() + ) + + for key in keys: + for cert in keyring_split(working_dir=working_dir, keyring=key, preserve_filename=True): + directories.append( + convert_certificate( + working_dir=working_dir, + certificate=cert, + keyring_dir=target_dir, + name_override=name_override, + fingerprint_filter=fingerprint_filter, + ) + ) + + for path in directories: + (target_dir / path.name).mkdir(parents=True, exist_ok=True) + copytree(src=path, dst=(target_dir / path.name), dirs_exist_ok=True) + + return target_dir + + +def get_trusted_and_revoked_certs(certs: List[Path]) -> Tuple[List[Fingerprint], List[Fingerprint]]: + """Get the fingerprints of all trusted and all self revoked public keys in a directory + + Parameters + ---------- + certs: The certificates to trust + + Returns + ------- + A tuple with the first item containing the fingerprints of all public keys and the second item containing the + fingerprints of all self-revoked public keys + """ + + all_certs: List[Fingerprint] = [] + revoked_certs: List[Fingerprint] = [] + + # TODO: what about direct key revocations/signatures? + + debug(f"Retrieving trusted and self-revoked certificates from {[str(cert_dir) for cert_dir in certs]}") + + for cert_dir in sorted(get_cert_paths(certs)): + cert_fingerprint = Fingerprint(cert_dir.stem) + all_certs.append(cert_fingerprint) + for revocation_cert in cert_dir.glob("revocation/*.asc"): + if cert_fingerprint.endswith(revocation_cert.stem): + debug(f"Revoking {cert_fingerprint} due to self-revocation") + revoked_certs.append(cert_fingerprint) + + trusted_keys = [cert for cert in all_certs if cert not in revoked_certs] + + return trusted_keys, revoked_certs + + +def export_ownertrust(certs: List[Path], output: Path) -> Tuple[List[Fingerprint], List[Fingerprint]]: + """Export ownertrust from a set of keys and return the trusted and revoked fingerprints + + The output file format is compatible with `gpg --import-ownertrust` and lists the main fingerprint ID of all + non-revoked keys as fully trusted. + The exported file is used by pacman-key when importing a keyring (see + https://man.archlinux.org/man/pacman-key.8#PROVIDING_A_KEYRING_FOR_IMPORT). + + Parameters + ---------- + certs: The certificates to trust + output: The file path to write to + """ + + trusted_certs, revoked_certs = get_trusted_and_revoked_certs(certs=certs) + + with open(file=output, mode="w") as trusted_certs_file: + for cert in sorted(set(trusted_certs)): + debug(f"Writing {cert} to {output}") + trusted_certs_file.write(f"{cert}:4:\n") + + return trusted_certs, revoked_certs + + +def export_revoked(certs: List[Path], main_keys: List[Fingerprint], output: Path, min_revoker: int = 2) -> None: + """Export the PGP revoked status from a set of keys + + The output file contains the fingerprints of all self-revoked keys and all keys for which at least two revocations + by any main key exist. + The exported file is used by pacman-key when importing a keyring (see + https://man.archlinux.org/man/pacman-key.8#PROVIDING_A_KEYRING_FOR_IMPORT). + + Parameters + ---------- + certs: A list of directories with keys to check for their revocation status + main_keys: A list of strings representing the fingerprints of (current and/or revoked) main keys + output: The file path to write to + min_revoker: The minimum amount of revocation certificates on a User ID from any main key to deem a public key as + revoked + """ + + trusted_certs, revoked_certs = get_trusted_and_revoked_certs(certs=certs) + + debug(f"Retrieving certificates revoked by main keys from {[str(cert_dir) for cert_dir in certs]}") + foreign_revocations: Dict[Fingerprint, Set[Fingerprint]] = defaultdict(set) + for cert_dir in sorted(get_cert_paths(certs)): + fingerprint = Fingerprint(cert_dir.name) + debug(f"Inspecting public key {fingerprint}") + for revocation_cert in cert_dir.glob("uid/*/revocation/*.asc"): + revocation_fingerprint = Fingerprint(revocation_cert.stem) + foreign_revocations[fingerprint].update( + [revocation_fingerprint for main_key in main_keys if main_key.endswith(revocation_fingerprint)] + ) + + # TODO: find a better (less naive) approach, as this would also match on public certificates, + # where some UIDs are signed and others are revoked + if len(foreign_revocations[fingerprint]) >= min_revoker: + debug( + f"Revoking {cert_dir.name} due to {set(foreign_revocations[fingerprint])} " "being main key revocations" + ) + revoked_certs.append(fingerprint) + + with open(file=output, mode="w") as trusted_certs_file: + for cert in sorted(set(revoked_certs)): + debug(f"Writing {cert} to {output}") + trusted_certs_file.write(f"{cert}\n") + + +def get_fingerprints_from_keyring_files(working_dir: Path, source: Iterable[Path]) -> Dict[Fingerprint, Username]: + """Get all fingerprints of PGP public keys from import file(s) + + Parameters + ---------- + working_dir: A directory to use for temporary files + source: The path to a source file or directory containing keyrings + + Returns + ------- + A dict of all fingerprints and their usernames of PGP public keys below path + """ + + fingerprints: Dict[Fingerprint, Username] = {} + keys: Iterable[Path] = set(chain.from_iterable(map(lambda s: s.iterdir() if s.is_dir() else [s], source))) + + for key in keys: + for certificate in keyring_split(working_dir=working_dir, keyring=key, preserve_filename=True): + for packet in packet_split(working_dir=working_dir, certificate=certificate): + if packet.name.endswith("--PublicKey"): + fingerprints[Fingerprint(packet_dump_field(packet, "Fingerprint"))] = Username(certificate.stem) + + debug(f"Fingerprints of PGP public keys in {source}: {fingerprints}") + return fingerprints + + +def get_fingerprints_from_certificate_directory( + paths: List[Path], prefix: str = "", postfix: str = "" +) -> Dict[Fingerprint, Username]: + """Get all fingerprints of PGP public keys from decomposed directory structures + + Parameters + ---------- + paths: The path to a decomposed directory structure + prefix: Prefix to add to each username + postfix: Postfix to add to each username + + Returns + ------- + A dict of all fingerprints and their usernames of PGP public keys below path + """ + + fingerprints: Dict[Fingerprint, Username] = {} + for cert in sorted(get_cert_paths(paths)): + fingerprints[Fingerprint(cert.name)] = Username(f"{prefix}{cert.parent.name}{postfix}") + + debug(f"Fingerprints of PGP public keys in {paths}: {fingerprints}") + return fingerprints + + +def get_fingerprints(working_dir: Path, sources: Iterable[Path], paths: List[Path]) -> Dict[Fingerprint, Username]: + """Get the fingerprints of PGP public keys from input paths and decomposed directory structures + + Parameters + ---------- + working_dir: A directory to use for temporary files + sources: A list of directories or files from which to read PGP keyring information + paths: A list of paths that identify decomposed PGP data in directory structures + + Returns + ------- + A dict of all fingerprints and their usernames of PGP public keys below path + """ + + fingerprints: Dict[Fingerprint, Username] = {} + + fingerprints.update( + get_fingerprints_from_keyring_files( + working_dir=working_dir, + source=sources, + ) + ) + + fingerprints.update(get_fingerprints_from_certificate_directory(paths=paths)) + + return fingerprints + + +def get_packets_from_path(path: Path) -> List[Path]: + """Collects packets from one level by appending the root, certifications and revocations. + + Parameters + ---------- + path: Filesystem path used to collect the packets from + + Returns + ------- + A list of packets ordered by root, certification, revocation + """ + if not path.exists(): + return [] + + packets: List[Path] = [] + packets += sorted(path.glob("*.asc")) + certifications = path / "certification" + if certifications.exists(): + packets += sorted(certifications.glob("*.asc")) + revocations = path / "revocation" + if revocations.exists(): + packets += sorted(revocations.glob("*.asc")) + return packets + + +def get_packets_from_listing(path: Path) -> List[Path]: + """Collects packets from a listing of directories holding one level each by calling `get_get_packets_from_path`. + + Parameters + ---------- + path: Filesystem path used as listing to collect the packets from + + Returns + ------- + A list of packets ordered by root, certification, revocation for each level + """ + if not path.exists(): + return [] + + packets: List[Path] = [] + for sub_path in sorted(path.iterdir()): + packets += get_packets_from_path(sub_path) + return packets + + +def export( + working_dir: Path, + keyring_root: Path, + sources: Optional[List[Path]] = None, + output: Optional[Path] = None, +) -> str: + """Export all provided PGP packet files to a single output file + + If sources contains directories, any .asc files below them are considered. + + Parameters + ---------- + working_dir: A directory to use for temporary files + keyring_root: The keyring root directory to look up username shorthand sources + sources: A list of username, fingerprint or directories from which to read PGP packet information + (defaults to `keyring_root`) + output: An output file that all PGP packet data is written to, return the result instead if None + + Returns + ------- + The result if no output file has been used + """ + + if not sources: + sources = [keyring_root] + + # transform shorthand paths to actual keyring paths + transform_username_to_keyring_path(keyring_dir=keyring_root / "packager", paths=sources) + transform_fingerprint_to_keyring_path(keyring_root=keyring_root, paths=sources) + + temp_dir = Path(mkdtemp(dir=working_dir, prefix="arch-keyringctl-export-join-")).absolute() + cert_paths: Set[Path] = get_cert_paths(sources) + certificates: List[Path] = [] + + for cert_dir in sorted(cert_paths): + packets: List[Path] = [] + packets += get_packets_from_path(cert_dir) + packets += get_packets_from_listing(cert_dir / "subkey") + packets += get_packets_from_listing(cert_dir / "uid") + + output_path = temp_dir / f"{cert_dir.name}.asc" + debug(f"Joining {cert_dir} in {output_path}") + packet_join( + packets=packets, + output=output_path, + force=True, + ) + certificates.append(output_path) + + return keyring_merge(certificates, output, force=True) + + +def build( + working_dir: Path, + keyring_root: Path, + target_dir: Path, +) -> None: + """Build keyring PGP artifacts alongside ownertrust and revoked status files + + Parameters + ---------- + working_dir: A directory to use for temporary files + keyring_root: The keyring root directory to build the artifacts from + target_dir: Output directory that all artifacts are written to + """ + + target_dir.mkdir(parents=True, exist_ok=True) + + keyring: Path = target_dir / Path("archlinux.gpg") + export(working_dir=working_dir, keyring_root=keyring_root, output=keyring) + + [trusted_main_keys, revoked_main_keys] = export_ownertrust( + certs=[keyring_root / "main"], + output=target_dir / "archlinux-trusted", + ) + export_revoked( + certs=[keyring_root], + main_keys=trusted_main_keys + revoked_main_keys, + output=target_dir / "archlinux-revoked", + ) + + +def list_keyring(keyring_root: Path, sources: Optional[List[Path]] = None, main_keys: bool = False) -> None: + """List certificates in the keyring + + If sources contains directories, all certificate below them are considered. + + Parameters + ---------- + keyring_root: The keyring root directory to look up username shorthand sources + sources: A list of username, fingerprint or directories from which to read PGP packet information + (defaults to `keyring_root`) + main_keys: List main keys instead of packager keys (defaults to False) + """ + + keyring_dir = keyring_root / ("main" if main_keys else "packager") + + if not sources: + sources = list(sorted(keyring_dir.iterdir(), key=lambda path: path.name.casefold())) + + # transform shorthand paths to actual keyring paths + transform_username_to_keyring_path(keyring_dir=keyring_dir, paths=sources) + transform_fingerprint_to_keyring_path(keyring_root=keyring_root, paths=sources) + + username_length = max([len(source.name) for source in sources]) + + for user_path in sources: + if is_pgp_fingerprint(user_path.name): + user_path = user_path.parent + certificates = [cert.name for cert in user_path.iterdir()] + print(f"{user_path.name:<{username_length}} {' '.join(certificates)}") + + +def inspect_keyring(working_dir: Path, keyring_root: Path, sources: Optional[List[Path]]) -> str: + """Inspect certificates in the keyring and pretty print the data + + If sources contains directories, all certificate below them are considered. + + Parameters + ---------- + working_dir: A directory to use for temporary files + keyring_root: The keyring root directory to look up username shorthand sources + sources: A list of username, fingerprint or directories from which to read PGP packet information + (defaults to `keyring_root`) + + Returns + ------- + The result of the inspect + """ + + if not sources: + sources = [keyring_root] + + # transform shorthand paths to actual keyring paths + transform_username_to_keyring_path(keyring_dir=keyring_root / "packager", paths=sources) + transform_fingerprint_to_keyring_path(keyring_root=keyring_root, paths=sources) + + keyring = Path(mkstemp(dir=working_dir, prefix="packet-", suffix=".asc")[1]).absolute() + export(working_dir=working_dir, keyring_root=keyring_root, sources=sources, output=keyring) + + fingerprints: Dict[Fingerprint, Username] = get_fingerprints_from_certificate_directory( + paths=[keyring_root / "packager"] + ) | get_fingerprints_from_certificate_directory(paths=[keyring_root / "main"], postfix=" (main)") + + return inspect( + packet=keyring, + certifications=True, + fingerprints=fingerprints, + ) diff --git a/libkeyringctl/sequoia.py b/libkeyringctl/sequoia.py new file mode 100644 index 0000000..a7a68ff --- /dev/null +++ b/libkeyringctl/sequoia.py @@ -0,0 +1,224 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +from collections.abc import Iterable +from datetime import datetime +from functools import reduce +from pathlib import Path +from re import sub +from tempfile import mkdtemp +from typing import Dict +from typing import List +from typing import Optional + +from .types import Fingerprint +from .types import Username +from .util import cwd +from .util import natural_sort_path +from .util import system + + +def keyring_split(working_dir: Path, keyring: Path, preserve_filename: bool = False) -> Iterable[Path]: + """Split a file containing a PGP keyring into separate certificate files + + The original keyring filename is preserved if the split only yields a single certificate. + If preserve_filename is True, all keyrings are placed into separate directories while preserving + the filename. + + The file is split using sq. + + Parameters + ---------- + working_dir: The path of the working directory below which to create the output files + keyring: The path of a file containing a PGP keyring + preserve_filename: If True, all keyrings are placed into separate directories while preserving the filename + + Returns + ------- + An iterable over the naturally sorted list of certificate files derived from a keyring + """ + + keyring_dir = Path(mkdtemp(dir=working_dir, prefix="keyring-")).absolute() + + with cwd(keyring_dir): + system(["sq", "keyring", "split", "--prefix", "", str(keyring)]) + + keyrings: List[Path] = list(natural_sort_path(keyring_dir.iterdir())) + + if 1 == len(keyrings) or preserve_filename: + for index, key in enumerate(keyrings): + keyring_sub_dir = Path(mkdtemp(dir=keyring_dir, prefix=f"{keyring.name}-")).absolute() + keyrings[index] = key.rename(keyring_sub_dir / keyring.name) + + return keyrings + + +def keyring_merge(certificates: List[Path], output: Optional[Path] = None, force: bool = False) -> str: + """Merge multiple certificates into a keyring + + Parameters + ---------- + certificates: List of paths to certificates to merge into a keyring + output: Path to a file which the keyring is written, return the result instead if None + force: Whether to force overwriting existing files (defaults to False) + + Returns + ------- + The result if no output file has been used + """ + + cmd = ["sq", "keyring", "merge"] + if force: + cmd.insert(1, "--force") + if output: + cmd += ["--output", str(output)] + cmd += [str(cert) for cert in sorted(certificates)] + + return system(cmd) + + +def packet_split(working_dir: Path, certificate: Path) -> Iterable[Path]: + """Split a file containing a PGP certificate into separate packet files + + The files are split using sq + + Parameters + ---------- + working_dir: The path of the working directory below which to create the output files + certificate: The absolute path of a file containing one PGP certificate + + Returns + ------- + An iterable over the naturally sorted list of packet files derived from certificate + """ + + packet_dir = Path(mkdtemp(dir=working_dir, prefix="packet-")).absolute() + + with cwd(packet_dir): + system(["sq", "packet", "split", "--prefix", "", str(certificate)]) + return natural_sort_path(packet_dir.iterdir()) + + +def packet_join(packets: List[Path], output: Optional[Path] = None, force: bool = False) -> str: + """Join PGP packet data in files to a single output file + + Parameters + ---------- + packets: A list of paths to files that contain PGP packet data + output: Path to a file to which all PGP packet data is written, return the result instead if None + force: Whether to force overwriting existing files (defaults to False) + + Returns + ------- + The result if no output file has been used + """ + + cmd = ["sq", "packet", "join"] + if force: + cmd.insert(1, "--force") + packets_str = list(map(lambda path: str(path), packets)) + cmd.extend(packets_str) + cmd.extend(["--output", str(output)]) + return system(cmd) + + +def inspect( + packet: Path, certifications: bool = True, fingerprints: Optional[Dict[Fingerprint, Username]] = None +) -> str: + """Inspect PGP packet data and return the result + + Parameters + ---------- + packet: Path to a file that contain PGP data + certifications: Whether to print third-party certifications + fingerprints: Optional dict of fingerprints to usernames to enrich the output with + + Returns + ------- + The result of the inspection + """ + + cmd = ["sq", "inspect"] + if certifications: + cmd.append("--certifications") + cmd.append(str(packet)) + result: str = system(cmd) + + if fingerprints: + for fingerprint, username in fingerprints.items(): + result = sub(f"{fingerprint}", f"{fingerprint} {username}", result) + result = sub(f" {fingerprint[24:]}", f" {fingerprint[24:]} {username}", result) + + return result + + +def packet_dump(packet: Path) -> str: + """Dump a PGP packet to string + + The `sq packet dump` command is used to retrieve a dump of information from a PGP packet + + Parameters + ---------- + packet: The path to the PGP packet to retrieve the value from + + Returns + ------- + The contents of the packet dump + """ + + return system(["sq", "packet", "dump", str(packet)]) + + +def packet_dump_field(packet: Path, field: str) -> str: + """Retrieve the value of a field from a PGP packet + + Parameters + ---------- + packet: The path to the PGP packet to retrieve the value from + field: The name of the field + + Raises + ------ + Exception: If the field is not found in the PGP packet + + Returns + ------- + The value of the field found in packet + """ + + dump = packet_dump(packet) + lines = [line.strip() for line in dump.splitlines()] + lines = list(filter(lambda line: line.strip().startswith(f"{field}: "), lines)) + if not lines: + raise Exception(f'Packet has no field "{field}"') + return lines[0].split(sep=": ", maxsplit=1)[1] + + +def packet_signature_creation_time(packet: Path) -> datetime: + """Retrieve the signature creation time field as datetime + + Parameters + ---------- + packet: The path to the PGP packet to retrieve the value from + + Returns + ------- + The signature creation time as datetime + """ + return datetime.strptime(packet_dump_field(packet, "Signature creation time"), "%Y-%m-%d %H:%M:%S %Z") + + +def latest_certification(certifications: Iterable[Path]) -> Path: + """Returns the latest certification based on the signature creation time from a list of packets. + + Parameters + ---------- + certifications: List of certification from which to choose the latest from + + Returns + ------- + The latest certification from a list of packets + """ + return reduce( + lambda a, b: a if packet_signature_creation_time(a) > packet_signature_creation_time(b) else b, + certifications, + ) diff --git a/libkeyringctl/types.py b/libkeyringctl/types.py new file mode 100644 index 0000000..14a4646 --- /dev/null +++ b/libkeyringctl/types.py @@ -0,0 +1,7 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +from typing import NewType + +Fingerprint = NewType("Fingerprint", str) +Uid = NewType("Uid", str) +Username = NewType("Username", str) diff --git a/libkeyringctl/util.py b/libkeyringctl/util.py new file mode 100644 index 0000000..71ca09e --- /dev/null +++ b/libkeyringctl/util.py @@ -0,0 +1,119 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +from collections.abc import Iterable +from collections.abc import Iterator +from contextlib import contextmanager +from os import chdir +from os import getcwd +from pathlib import Path +from re import split +from subprocess import PIPE +from subprocess import CalledProcessError +from subprocess import check_output +from sys import exit +from sys import stderr +from traceback import print_stack +from typing import List +from typing import Union + + +@contextmanager +def cwd(new_dir: Path) -> Iterator[None]: + """Change to a new current working directory in a context and go back to the previous dir after the context is done + + Parameters + ---------- + new_dir: A path to change to + """ + + previous_dir = getcwd() + chdir(new_dir) + try: + yield + finally: + chdir(previous_dir) + + +def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]: + """Sort an Iterable of Paths naturally + + Parameters + ---------- + _list: An iterable containing paths to be sorted + + Return + ------ + An Iterable of paths that are naturally sorted + """ + + def convert_text_chunk(text: str) -> Union[int, str]: + """Convert input text to int or str + + Parameters + ---------- + text: An input string + + Returns + ------- + Either an integer if text is a digit, else text in lower-case representation + """ + + return int(text) if text.isdigit() else text.lower() + + def alphanum_key(key: Path) -> List[Union[int, str]]: + """Retrieve an alphanumeric key from a Path, that can be used in sorted() + + Parameters + ---------- + key: A path for which to create a key + + Returns + ------- + A list of either int or str objects that may serve as 'key' argument for sorted() + """ + + return [convert_text_chunk(c) for c in split("([0-9]+)", str(key.name))] + + return sorted(_list, key=alphanum_key) + + +def system(cmd: List[str], exit_on_error: bool = False) -> str: + """Execute a command using check_output + + Parameters + ---------- + cmd: A list of strings to be fed to check_output + exit_on_error: Whether to exit the script when encountering an error (defaults to False) + + Raises + ------ + CalledProcessError: If not exit_on_error and `check_output()` encounters an error + + Returns + ------- + The output of cmd + """ + + try: + return check_output(cmd, stderr=PIPE).decode() + except CalledProcessError as e: + stderr.buffer.write(e.stderr) + print_stack() + if exit_on_error: + exit(e.returncode) + raise e + + +def absolute_path(path: str) -> Path: + """Return the absolute path of a given str + + Parameters + ---------- + path: A string representing a path + + Returns + ------- + The absolute path representation of path + """ + + return Path(path).absolute()