#!/usr/bin/env python from argparse import ArgumentParser from collections import defaultdict from os import chdir from os import getcwd from pathlib import Path from shutil import copytree from re import escape from re import split from re import sub from subprocess import CalledProcessError from subprocess import check_output from subprocess import PIPE from sys import exit from sys import stderr from tempfile import TemporaryDirectory from tempfile import mkdtemp from logging import basicConfig from logging import debug from logging import DEBUG from typing import Any from typing import Dict from typing import List from typing import Optional from typing import Iterable from contextlib import contextmanager @contextmanager def cwd(new_dir: Path): previous_dir = getcwd() chdir(new_dir) try: yield finally: chdir(previous_dir) # class Key(): # fingerprint: str = "" # pubkey: Path # uids: List[] # uid_certification: List[] # subkeys: List[] # subkey_certification: List[] # uid_signatures: List[] def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]: def convert(text: str) -> Any: return int(text) if text.isdigit() else text.lower() def alphanum_key(key: Path) -> List[Any]: return [convert(c) for c in split('([0-9]+)', str(key.name))] return sorted(_list, key=alphanum_key) def system(cmd: List[str], exit_on_error: bool = True) -> str: try: return check_output(cmd, stderr=PIPE).decode() except CalledProcessError as e: stderr.buffer.write(e.stderr) if exit_on_error: exit(e.returncode) raise e def convert_certificate(working_dir: Path, certificate: Path, name_override: Optional[str] = None) -> Path: certificate_fingerprint: Optional[str] = None pubkey: Optional[Path] = None direct_sigs: Dict[str, Dict[str, List[Path]]] = {} direct_revocations: Dict[str, Dict[str, List[Path]]] = {} current_packet_mode: Optional[str] = None current_packet_key: Optional[str] = None uids: Dict[str, Path] = {} uid_binding_sigs: Dict[str, Path] = {} subkeys: Dict[str, Dict[str, Path]] = {} subkey_binding_sigs: Dict[str, Dict[str, Path]] = {} subkey_revocations: Dict[str, Dict[str, Path]] = {} certifications: Dict[str, List[Path]] = defaultdict(list) revocations: Dict[str, List[Path]] = defaultdict(list) username = name_override or certificate.name.split(".")[0] def add_packet_to_direct_sigs( direct_sigs: Dict[str, Dict[str, List[Path]]], issuer: str, packet_key: str, packet: Path, ) -> Dict[str, Dict[str, List[Path]]]: """Add a packet to the set of DirectKeys If no key with the given packet_key exists yet, it is created. Parameters ---------- direct_sigs: Dict[str, Dict[str, List[Path]]] The signatures directly on a root key (such as DirectKey or *Certifications without a specific User ID) issuer: str The issuer of the signature packet: Path The path to the packet packet_key: str The key identifying the packet (e.g. its Fingerprint) """ if not direct_sigs.get(packet_key): direct_sigs = direct_sigs | {packet_key: defaultdict(list)} direct_sigs[packet_key][issuer].append(packet) return direct_sigs # XXX: PrimaryKeyBinding # TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean debug(f'Processing certificate {certificate}') for packet in packet_split(working_dir=working_dir, certificate=certificate): debug(f'Processing packet {packet.name}') if packet.name.endswith('--PublicKey'): pubkey = packet certificate_fingerprint = packet_dump_field(packet, 'Fingerprint') current_packet_mode = 'pubkey' current_packet_key = certificate_fingerprint elif packet.name.endswith('--UserID'): value = simplify_user_id(packet_dump_field(packet, 'Value')) current_packet_mode = 'uid' current_packet_key = value uids[value] = packet elif packet.name.endswith('--PublicSubkey'): fingerprint = packet_dump_field(packet, 'Fingerprint') current_packet_mode = 'subkey' current_packet_key = fingerprint if not certificate_fingerprint: raise Exception('missing certificate fingerprint for "{packet.name}"') if not subkeys.get(certificate_fingerprint): subkeys |= {certificate_fingerprint: {fingerprint: packet}} else: subkeys[certificate_fingerprint] |= {fingerprint: packet} elif packet.name.endswith('--Signature'): if not certificate_fingerprint: raise Exception('missing certificate fingerprint for "{packet.name}"') if not current_packet_key: raise Exception('missing current packet key for "{packet.name}"') issuer = packet_dump_field(packet, 'Issuer') signature_type = packet_dump_field(packet, 'Type') if current_packet_mode == 'pubkey': if signature_type == 'KeyRevocation' and certificate_fingerprint.endswith(issuer): direct_revocations = add_packet_to_direct_sigs( direct_sigs=direct_revocations, issuer=issuer, packet_key=current_packet_key, packet=packet, ) elif signature_type in ['DirectKey', 'GenericCertification']: direct_sigs = add_packet_to_direct_sigs( direct_sigs=direct_sigs, issuer=issuer, packet_key=current_packet_key, packet=packet, ) else: raise Exception(f'unknown signature type: {signature_type}') elif current_packet_mode == 'uid': if signature_type == 'CertificationRevocation': revocations[current_packet_key].append(packet) elif signature_type == 'PositiveCertification' and certificate_fingerprint.endswith(issuer): uid_binding_sigs[current_packet_key] = packet elif signature_type.endswith('Certification'): certifications[current_packet_key].append(packet) else: raise Exception(f'unknown signature type: {signature_type}') elif current_packet_mode == 'subkey': if signature_type == 'SubkeyBinding': if not subkey_binding_sigs.get(certificate_fingerprint): subkey_binding_sigs |= {certificate_fingerprint: {fingerprint: packet}} else: subkey_binding_sigs[certificate_fingerprint] |= {fingerprint: packet} elif signature_type == 'SubkeyRevocation': if not subkey_revocations.get(certificate_fingerprint): subkey_revocations |= {certificate_fingerprint: {fingerprint: packet}} else: subkey_revocations[certificate_fingerprint] |= {fingerprint: packet} else: raise Exception(f'unknown signature type: {signature_type}') else: raise Exception(f'unknown signature root for "{packet.name}"') else: raise Exception(f'unknown packet type "{packet.name}"') if not certificate_fingerprint: raise Exception('missing certificate fingerprint') if not pubkey: raise Exception('missing certificate public-key') user_dir = (working_dir / username) key_dir = (user_dir / certificate_fingerprint) key_dir.mkdir(parents=True, exist_ok=True) persist_public_key( certificate_fingerprint=certificate_fingerprint, pubkey=pubkey, key_dir=key_dir, ) persist_uids( key_dir=key_dir, uid_binding_sigs=uid_binding_sigs, uids=uids, ) persist_subkeys( certificate_fingerprint=certificate_fingerprint, key_dir=key_dir, subkeys=subkeys, subkey_binding_sigs=subkey_binding_sigs, ) persist_subkey_revocations( certificate_fingerprint=certificate_fingerprint, key_dir=key_dir, subkey_revocations=subkey_revocations, ) persist_direct_sigs( direct_sigs=direct_sigs, pubkey=pubkey, key_dir=key_dir, ) persist_direct_sigs( direct_sigs=direct_revocations, pubkey=pubkey, key_dir=key_dir, sig_type="revocation", ) persist_certifications( certifications=certifications, pubkey=pubkey, key_dir=key_dir, uid_binding_sig=uid_binding_sigs, uids=uids, ) persist_revocations( pubkey=pubkey, revocations=revocations, key_dir=key_dir, uid_binding_sig=uid_binding_sigs, uids=uids, ) return user_dir def persist_public_key( certificate_fingerprint: str, pubkey: Path, key_dir: Path, ) -> None: """Persist the Public-Key packet Parameters ---------- certificate_fingerprint: str The unique fingerprint of the public key pubkey: Path The path to the public key of the root key key_dir: Path The root directory below which the basic key material is persisted """ packets: List[Path] = [pubkey] output_file = key_dir / f'{certificate_fingerprint}.asc' debug(f'Writing file {output_file} from {[str(packet) for packet in packets]}') packet_join(packets, output_file) def persist_uids( key_dir: Path, uid_binding_sigs: Dict[str, Path], uids: Dict[str, Path], ) -> None: """Persist the User IDs that belong to a PublicKey The User ID material consists of PublicSubkeys and their SubkeyBindings. The files are written to a UID specific directory and file below key_dir/uids. Parameters ---------- key_dir: Path The root directory below which the basic key material is persisted uid_binding_sigs: Dict[str, Path] The PositiveCertifications of a User ID and Public-Key packet uids: Dict[str, Path] The User IDs of a Public-Key (the root key) """ for key in uid_binding_sigs.keys(): packets = [uids[key], uid_binding_sigs[key]] output_file = key_dir / f'uids/{key}/{key}.asc' (key_dir / Path(f'uids/{key}')).mkdir(parents=True, exist_ok=True) debug(f'Writing file {output_file} from {[str(packet) for packet in packets]}') packet_join(packets, output_file) def persist_subkeys( certificate_fingerprint: str, key_dir: Path, subkeys: Dict[str, Dict[str, Path]], subkey_binding_sigs: Dict[str, Dict[str, Path]], ) -> None: """Persist all Public-Subkeys and their PublicSubkeyBinding of a root key file to file(s) Parameters ---------- certificate_fingerprint: str The unique fingerprint of the public key key_dir: Path The root directory below which the basic key material is persisted subkeys: Dict[str, Dict[str, Path]] The PublicSubkeys of a key subkey_binding_sigs: Dict[str, Dict[str, Path]] The SubkeyBinding signatures of a Public-Key (the root key) """ if subkeys.get(certificate_fingerprint): for signature, subkey in subkeys[certificate_fingerprint].items(): packets: List[Path] = [] packets.extend([subkey, subkey_binding_sigs[certificate_fingerprint][signature]]) output_file = key_dir / Path(f'subkeys/{signature}/{signature}.asc') debug(f'Writing file {output_file} from {[str(packet) for packet in packets]}') (key_dir / Path(f"subkeys/{signature}")).mkdir(parents=True, exist_ok=True) packet_join(packets=packets, output=output_file) def persist_subkey_revocations( certificate_fingerprint: str, key_dir: Path, subkey_revocations: Dict[str, Dict[str, Path]], ) -> None: """Persist the SubkeyRevocations of all Public-Subkeys of a root key to file(s) Parameters ---------- certificate_fingerprint: str The unique fingerprint of the public key key_dir: Path The root directory below which the basic key material is persisted subkey_revocations: Dict[str, Dict[str, Path]] The SubkeyRevocations of PublicSubkeys of a key """ if subkey_revocations.get(certificate_fingerprint): for signature, revocation in subkey_revocations[certificate_fingerprint].items(): issuer = packet_dump_field(revocation, 'Issuer') output_file = (key_dir / Path(f'subkeys/{signature}/revocation/{issuer}.asc')) debug(f'Writing file {output_file} from {revocation}') (key_dir / Path(f"subkeys/{signature}/revocation")).mkdir(parents=True, exist_ok=True) packet_join(packets=[revocation], output=output_file) def persist_direct_sigs( direct_sigs: Dict[str, Dict[str, List[Path]]], pubkey: Path, key_dir: Path, sig_type: str = "certification", ) -> None: """Persist the signatures directly on a root key (such as DirectKeys or *Certifications without a User ID) to file(s) Parameters ---------- certifications: Dict[str, List[Path]] The certifications to write to file pubkey: Path The path to the public key of the root key key_dir: Path The root directory below which the Directkeys are persisted sig_type: str The type of direct certification to persist (defaults to 'certification'). This influences the directory name """ for key, current_certifications in direct_sigs.items(): for issuer, certifications in current_certifications.items(): direct_key_dir = key_dir / sig_type direct_key_dir.mkdir(parents=True, exist_ok=True) packets = [pubkey] + certifications output_file = direct_key_dir / f'{issuer}.asc' debug(f'Writing file {output_file} from {[str(cert) for cert in certifications]}') packet_join(packets, output_file) def persist_certifications( certifications: Dict[str, List[Path]], pubkey: Path, key_dir: Path, uid_binding_sig: Dict[str, Path], uids: Dict[str, Path], ) -> None: """Persist the certifications of a root key to file(s) The certifications include all CasualCertifications, GenericCertifications, PersonaCertifications and PositiveCertifications for all User IDs of the given root key. All certifications are persisted in per User ID certification directories below key_dir. Parameters ---------- certifications: Dict[str, List[Path]] The certifications to write to file pubkey: Path The path to the public key of the root key key_dir: Path The root directory below which certifications are persisted uid_binding_sig: Dict[str, Path] The PositiveCertifications of a User ID and Public-Key packet uids: Dict[str, Path] The User IDs of a Public-Key (the root key) """ for key, current_certifications in certifications.items(): for certification in current_certifications: certification_dir = key_dir / Path("uids") / key / 'certification' certification_dir.mkdir(parents=True, exist_ok=True) issuer = packet_dump_field(certification, 'Issuer') packets = [pubkey, uids[key], uid_binding_sig[key], certification] output_file = certification_dir / f'{issuer}.asc' debug(f'Writing file {output_file} from {certification}') packet_join(packets, output_file) def persist_revocations( pubkey: Path, revocations: Dict[str, List[Path]], key_dir: Path, uid_binding_sig: Dict[str, Path], uids: Dict[str, Path], ) -> None: """Persist the revocations of a root key to file(s) The revocations include all CertificationRevocations for all User IDs of the given root key. All revocations are persisted in per User ID 'revocation' directories below key_dir. Parameters ---------- pubkey: Path The path to the public key of the root key revocations: Dict[str, List[Path]] The revocations to write to file key_dir: Path The root directory below which revocations will be persisted uid_binding_sig: Dict[str, Path] The PositiveCertifications of a User ID and Public-Key packet uids: Dict[str, Path] The User IDs of a Public-Key (the root key) """ for key, current_revocations in revocations.items(): for revocation in current_revocations: revocation_dir = key_dir / Path("uids") / key / 'revocation' revocation_dir.mkdir(parents=True, exist_ok=True) issuer = packet_dump_field(revocation, 'Issuer') packets = [pubkey, uids[key]] # Binding sigs only exist for 3rd-party revocations if key in uid_binding_sig: packets.append(uid_binding_sig[key]) packets.append(revocation) output_file = revocation_dir / f'{issuer}.asc' debug(f'Writing file {output_file} from {revocation}') packet_join(packets, output_file) def packet_dump(packet: Path) -> str: return system(['sq', 'packet', 'dump', str(packet)]) def packet_dump_field(packet: Path, field: str) -> str: dump = packet_dump(packet) lines = [line.strip() for line in dump.splitlines()] lines = list(filter(lambda line: line.startswith(f'{field}: '), lines)) if not lines: raise Exception(f'Packet has no field "{field}"') return lines[0].split(maxsplit=1)[1] def sanitize_certificate_file(working_dir: Path, certificate: Path) -> List[Path]: """Sanitize a certificate file, potentially containing several certificates If the input file holds several certificates, they are split into respective files below working_dir and their paths are returned. Else the path to the input certificate file is returned. Parameters ---------- working_dir: Path The path of the working directory below which to create split certificates certificate: Path The absolute path of a file containing one or several PGP certificates Returns ------- List[Path] A list of paths that either contain the input certificate or several split certificates """ begin_cert = "-----BEGIN PGP PUBLIC KEY BLOCK-----" end_cert = "-----END PGP PUBLIC KEY BLOCK-----" certificate_list: List[Path] = [] with open(file=certificate, mode="r") as certificate_file: file = certificate_file.read() if file.count(begin_cert) > 1 and file.count(end_cert) > 1: debug(f"Several public keys detected in file: {certificate}") for split_cert in [f"{cert}{end_cert}\n" for cert in list(filter(None, file.split(f"{end_cert}\n")))]: split_cert_dir = Path(mkdtemp(dir=working_dir)).absolute() split_cert_path = split_cert_dir / certificate.name with open(file=split_cert_path, mode="w") as split_cert_file: split_cert_file.write(split_cert) debug(f"Writing split cert to file: {split_cert_path}") certificate_list.append(split_cert_path) return certificate_list else: return [certificate] def packet_split(working_dir: Path, certificate: Path) -> Iterable[Path]: """Split a file containing a PGP certificate into separate packet files The files are split using sq Parameters ---------- working_dir: Path The path of the working directory below which to create the output files certificate: Path The absolute path of a file containing one PGP certificate Returns ------- Iterable[Path] An iterable over the naturally sorted list of packet files derived from certificate """ packet_dir = Path(mkdtemp(dir=working_dir)).absolute() with cwd(packet_dir): system(['sq', 'packet', 'split', '--prefix', '', str(certificate)]) return natural_sort_path(packet_dir.iterdir()) def packet_join(packets: List[Path], output: Path, force: bool = False) -> None: """Join PGP packet data in files to a single output file Parameters ---------- packets: List[Path] A list of paths to files that contain PGP packet data output: Path A file to which all PGP packet data is written force: bool Whether to force the execution of sq (defaults to False) """ cmd = ['sq', 'packet', 'join'] if force: cmd.insert(1, '--force') packets_str = list(map(lambda path: str(path), packets)) cmd.extend(packets_str) cmd.extend(['--output', str(output)]) system(cmd, exit_on_error=False) def simplify_user_id(user_id: str) -> str: user_id = user_id.replace('@', '_at_') user_id = sub('[<>]', '', user_id) user_id = sub('[' + escape(r' !@#$%^&*()_-+=[]{}\|;:,.<>/?') + ']', '_', user_id) return user_id def convert( working_dir: Path, source: Path, target_dir: Path, name_override: Optional[str] = None, ) -> Path: directories: List[Path] = [] if source.is_dir(): for key in source.iterdir(): for sane_cert in sanitize_certificate_file(working_dir=working_dir, certificate=key): directories.append( convert_certificate(working_dir=working_dir, certificate=sane_cert, name_override=name_override) ) else: for sane_cert in sanitize_certificate_file(working_dir=working_dir, certificate=source): directories.append( convert_certificate(working_dir=working_dir, certificate=sane_cert, name_override=name_override) ) for path in directories: (target_dir / path.name).mkdir(exist_ok=True) copytree(src=path, dst=(target_dir / path.name), dirs_exist_ok=True) return target_dir def keyring_import(working_dir: Path, source: Path, target_dir: Optional[Path] = None): pass def export_keyring(working_dir: Path, sources: List[Path], output: Path, force: bool) -> None: """Export all provided PGP packet files to a single output file If sources contains directories, any .asc files below them are considered. Parameters ---------- working_dir: Path A directory to use for temporary files sources: List[Path] A list of directories or files from which to read PGP packet information output: Path An output file that all PGP packet data is written to force: bool Whether to force the execution of packet_join() """ sources = [source.absolute() for source in sources] cert_dir = Path(mkdtemp(dir=working_dir)).absolute() output = output.absolute() certs: List[Path] = [] debug(f"Creating keyring {output} from {[str(source_dir) for source_dir in sources]}.") for source_number, source in enumerate(sources): if source.is_dir(): for user_number, user_dir in enumerate(sorted(source.iterdir())): if user_dir.is_dir(): for user_cert_number, user_cert_dir in enumerate(sorted(user_dir.iterdir())): if user_cert_dir.is_dir(): cert_path = ( cert_dir / ( f"{str(source_number).zfill(4)}" f"-{str(user_number).zfill(4)}" f"-{str(user_cert_number).zfill(4)}.asc" ) ) debug(f"Joining {user_dir.name}/{user_cert_dir.name} in {cert_path}.") packet_join( packets=sorted(user_cert_dir.glob("**/*.asc")), output=cert_path, force=force, ) certs.append(cert_path) elif source.is_file() and not source.is_symlink(): certs.append(source) cmd = ['sq', 'keyring', 'merge', '-o', str(output)] if force: cmd.insert(1, '--force') cmd += [str(cert) for cert in sorted(certs)] system(cmd, exit_on_error=False) def absolute_path(path: str) -> Path: return Path(path).absolute() if __name__ == '__main__': parser = ArgumentParser() parser.add_argument('-v', '--verbose', action='store_true', help='Causes to print debugging messages about the progress') parser.add_argument('--wait', action='store_true', help='Block before cleaning up the temp directory') parser.add_argument( '-f', '--force', action='store_true', default=False, help='force the execution of subcommands (e.g. overwriting of files)', ) subcommands = parser.add_subparsers(dest="subcommand") convert_parser = subcommands.add_parser( 'convert', help="convert a legacy-style PGP cert or directory containing PGP certs to the new format", ) convert_parser.add_argument('source', type=absolute_path, help='File or directory to convert') convert_parser.add_argument('--target', type=absolute_path, help='target directory') convert_parser.add_argument( '--name', type=str, default=None, help='override the username to use (only useful when targetting a single file)', ) import_parser = subcommands.add_parser('import') import_parser.add_argument('source', type=absolute_path, help='File or directory') import_parser.add_argument('--target', type=absolute_path, help='Target directory') export_parser = subcommands.add_parser( 'export', help="export a directory structure of PGP packet data to a combined file", ) export_parser.add_argument('output', type=absolute_path, help='file to write PGP packet data to') export_parser.add_argument( '-s', '--source', action="append", help='files or directories containing PGP packet data (can be provided multiple times)', required=True, type=absolute_path, ) args = parser.parse_args() if args.verbose: basicConfig(level=DEBUG) # temporary working directory that gets auto cleaned with TemporaryDirectory() as tempdir: working_dir = Path(tempdir) chdir(working_dir) debug(f'Working directory: {working_dir}') if args.subcommand in ["convert", "import"]: if args.target: args.target.mkdir(parents=True, exist_ok=True) target_dir = args.target else: # persistent target directory target_dir = Path(mkdtemp()).absolute() if 'convert' == args.subcommand: print(convert(working_dir, args.source, target_dir)) elif 'import' == args.subcommand: keyring_import(working_dir, args.source, target_dir) elif 'export' == args.subcommand: export_keyring(working_dir=working_dir, sources=args.source, output=args.output, force=args.force) if args.wait: print('Press [ENTER] to continue') input()