From 1cbd360d17236a9354a8142c0ab1512bd01beb04 Mon Sep 17 00:00:00 2001 From: David Runge Date: Sun, 17 Oct 2021 00:50:34 +0200 Subject: [PATCH] keyringctl: Format file keyringctl: Use black to format the file, isort to auto-sort all imports. Remove commented code and (for now) ignore the high complexity in `convert()` so that flake8 can be used. --- keyringctl | 303 ++++++++++++++++++++++++----------------------------- 1 file changed, 136 insertions(+), 167 deletions(-) diff --git a/keyringctl b/keyringctl index 321a3e0..f41cacc 100755 --- a/keyringctl +++ b/keyringctl @@ -1,42 +1,19 @@ #!/usr/bin/env python +# +# SPDX-License-Identifier: GPL-3.0-or-later + from argparse import ArgumentParser - from collections import defaultdict - -from os import chdir -from os import getcwd - -from pathlib import Path - -from shutil import copytree - -from re import escape -from re import split -from re import sub - -from subprocess import CalledProcessError -from subprocess import check_output -from subprocess import PIPE - -from sys import exit -from sys import stderr - -from tempfile import TemporaryDirectory -from tempfile import mkdtemp - -from logging import basicConfig -from logging import debug, error -from logging import DEBUG - -from typing import Dict -from typing import List -from typing import Optional -from typing import Iterable -from typing import Iterator -from typing import Tuple -from typing import Union - from contextlib import contextmanager +from logging import DEBUG, basicConfig, debug, error +from os import chdir, getcwd +from pathlib import Path +from re import escape, split, sub +from shutil import copytree +from subprocess import PIPE, CalledProcessError, check_output +from sys import exit, stderr +from tempfile import TemporaryDirectory, mkdtemp +from typing import Dict, Iterable, Iterator, List, Optional, Tuple, Union @contextmanager @@ -56,15 +33,6 @@ def cwd(new_dir: Path) -> Iterator[None]: finally: chdir(previous_dir) -# class Key(): - # fingerprint: str = "" - # pubkey: Path - # uids: List[] - # uid_certification: List[] - # subkeys: List[] - # subkey_certification: List[] - # uid_signatures: List[] - def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]: """Sort an Iterable of Paths naturally @@ -110,7 +78,7 @@ def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]: A list of either int or str objects that may serve as 'key' argument for sorted() """ - return [convert(c) for c in split('([0-9]+)', str(key.name))] + return [convert(c) for c in split("([0-9]+)", str(key.name))] return sorted(_list, key=alphanum_key) @@ -145,7 +113,12 @@ def system(cmd: List[str], exit_on_error: bool = True) -> str: raise e -def convert_certificate(working_dir: Path, certificate: Path, name_override: Optional[str] = None) -> Path: +# TODO: simplify to lower complexity +def convert_certificate( # noqa: ignore=C901 + working_dir: Path, + certificate: Path, + name_override: Optional[str] = None, +) -> Path: """Convert a single file public key certificate into a decomposed directory structure of multiple PGP packets The output directory structure is created per user. The username is derived from the certificate (or overridden). @@ -219,23 +192,23 @@ def convert_certificate(working_dir: Path, certificate: Path, name_override: Opt # TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean - debug(f'Processing certificate {certificate}') + debug(f"Processing certificate {certificate}") for packet in packet_split(working_dir=working_dir, certificate=certificate): - debug(f'Processing packet {packet.name}') - if packet.name.endswith('--PublicKey'): + debug(f"Processing packet {packet.name}") + if packet.name.endswith("--PublicKey"): pubkey = packet - certificate_fingerprint = packet_dump_field(packet, 'Fingerprint') - current_packet_mode = 'pubkey' + certificate_fingerprint = packet_dump_field(packet, "Fingerprint") + current_packet_mode = "pubkey" current_packet_key = certificate_fingerprint - elif packet.name.endswith('--UserID'): - value = simplify_user_id(packet_dump_field(packet, 'Value')) - current_packet_mode = 'uid' + elif packet.name.endswith("--UserID"): + value = simplify_user_id(packet_dump_field(packet, "Value")) + current_packet_mode = "uid" current_packet_key = value uids[value] = packet - elif packet.name.endswith('--PublicSubkey'): - fingerprint = packet_dump_field(packet, 'Fingerprint') - current_packet_mode = 'subkey' + elif packet.name.endswith("--PublicSubkey"): + fingerprint = packet_dump_field(packet, "Fingerprint") + current_packet_mode = "subkey" current_packet_key = fingerprint if not certificate_fingerprint: @@ -246,24 +219,24 @@ def convert_certificate(working_dir: Path, certificate: Path, name_override: Opt else: subkeys[certificate_fingerprint] |= {fingerprint: packet} - elif packet.name.endswith('--Signature'): + elif packet.name.endswith("--Signature"): if not certificate_fingerprint: raise Exception('missing certificate fingerprint for "{packet.name}"') if not current_packet_key: raise Exception('missing current packet key for "{packet.name}"') - issuer = packet_dump_field(packet, 'Issuer') - signature_type = packet_dump_field(packet, 'Type') + issuer = packet_dump_field(packet, "Issuer") + signature_type = packet_dump_field(packet, "Type") - if current_packet_mode == 'pubkey': - if signature_type == 'KeyRevocation' and certificate_fingerprint.endswith(issuer): + if current_packet_mode == "pubkey": + if signature_type == "KeyRevocation" and certificate_fingerprint.endswith(issuer): direct_revocations = add_packet_to_direct_sigs( direct_sigs=direct_revocations, issuer=issuer, packet_key=current_packet_key, packet=packet, ) - elif signature_type in ['DirectKey', 'GenericCertification']: + elif signature_type in ["DirectKey", "GenericCertification"]: direct_sigs = add_packet_to_direct_sigs( direct_sigs=direct_sigs, issuer=issuer, @@ -271,42 +244,42 @@ def convert_certificate(working_dir: Path, certificate: Path, name_override: Opt packet=packet, ) else: - raise Exception(f'unknown signature type: {signature_type}') - elif current_packet_mode == 'uid': - if signature_type == 'CertificationRevocation': + raise Exception(f"unknown signature type: {signature_type}") + elif current_packet_mode == "uid": + if signature_type == "CertificationRevocation": revocations[current_packet_key].append(packet) - elif signature_type == 'PositiveCertification' and certificate_fingerprint.endswith(issuer): + elif signature_type == "PositiveCertification" and certificate_fingerprint.endswith(issuer): uid_binding_sigs[current_packet_key] = packet - elif signature_type.endswith('Certification'): + elif signature_type.endswith("Certification"): certifications[current_packet_key].append(packet) else: - raise Exception(f'unknown signature type: {signature_type}') - elif current_packet_mode == 'subkey': - if signature_type == 'SubkeyBinding': + raise Exception(f"unknown signature type: {signature_type}") + elif current_packet_mode == "subkey": + if signature_type == "SubkeyBinding": if not subkey_binding_sigs.get(certificate_fingerprint): subkey_binding_sigs |= {certificate_fingerprint: {fingerprint: packet}} else: subkey_binding_sigs[certificate_fingerprint] |= {fingerprint: packet} - elif signature_type == 'SubkeyRevocation': + elif signature_type == "SubkeyRevocation": if not subkey_revocations.get(certificate_fingerprint): subkey_revocations |= {certificate_fingerprint: {fingerprint: packet}} else: subkey_revocations[certificate_fingerprint] |= {fingerprint: packet} else: - raise Exception(f'unknown signature type: {signature_type}') + raise Exception(f"unknown signature type: {signature_type}") else: raise Exception(f'unknown signature root for "{packet.name}"') else: raise Exception(f'unknown packet type "{packet.name}"') if not certificate_fingerprint: - raise Exception('missing certificate fingerprint') + raise Exception("missing certificate fingerprint") if not pubkey: - raise Exception('missing certificate public-key') + raise Exception("missing certificate public-key") - user_dir = (working_dir / username) - key_dir = (user_dir / certificate_fingerprint) + user_dir = working_dir / username + key_dir = user_dir / certificate_fingerprint key_dir.mkdir(parents=True, exist_ok=True) persist_public_key( @@ -384,9 +357,9 @@ def persist_public_key( """ packets: List[Path] = [pubkey] - output_file = key_dir / f'{certificate_fingerprint}.asc' + output_file = key_dir / f"{certificate_fingerprint}.asc" output_file.parent.mkdir(parents=True, exist_ok=True) - debug(f'Writing file {output_file} from {[str(packet) for packet in packets]}') + debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}") packet_join(packets, output_file) @@ -412,9 +385,9 @@ def persist_uids( for key in uid_binding_sigs.keys(): packets = [uids[key], uid_binding_sigs[key]] - output_file = key_dir / 'uid' / key / f'{key}.asc' + output_file = key_dir / "uid" / key / f"{key}.asc" output_file.parent.mkdir(parents=True, exist_ok=True) - debug(f'Writing file {output_file} from {[str(packet) for packet in packets]}') + debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}") packet_join(packets, output_file) @@ -442,9 +415,9 @@ def persist_subkeys( for signature, subkey in subkeys[certificate_fingerprint].items(): packets: List[Path] = [] packets.extend([subkey, subkey_binding_sigs[certificate_fingerprint][signature]]) - output_file = key_dir / 'subkey' / signature / f'{signature}.asc' + output_file = key_dir / "subkey" / signature / f"{signature}.asc" output_file.parent.mkdir(parents=True, exist_ok=True) - debug(f'Writing file {output_file} from {[str(packet) for packet in packets]}') + debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}") packet_join(packets=packets, output=output_file) @@ -467,10 +440,10 @@ def persist_subkey_revocations( if subkey_revocations.get(certificate_fingerprint): for signature, revocation in subkey_revocations[certificate_fingerprint].items(): - issuer = packet_dump_field(revocation, 'Issuer') - output_file = key_dir / 'subkey' / signature / 'revocation' / f'{issuer}.asc' + issuer = packet_dump_field(revocation, "Issuer") + output_file = key_dir / "subkey" / signature / "revocation" / f"{issuer}.asc" output_file.parent.mkdir(parents=True, exist_ok=True) - debug(f'Writing file {output_file} from {revocation}') + debug(f"Writing file {output_file} from {revocation}") packet_join(packets=[revocation], output=output_file) @@ -498,9 +471,9 @@ def persist_direct_sigs( for key, current_certifications in direct_sigs.items(): for issuer, certifications in current_certifications.items(): packets = [pubkey] + certifications - output_file = key_dir / sig_type / f'{issuer}.asc' + output_file = key_dir / sig_type / f"{issuer}.asc" output_file.parent.mkdir(parents=True, exist_ok=True) - debug(f'Writing file {output_file} from {[str(cert) for cert in certifications]}') + debug(f"Writing file {output_file} from {[str(cert) for cert in certifications]}") packet_join(packets, output_file) @@ -533,20 +506,21 @@ def persist_certifications( for key, current_certifications in certifications.items(): for certification in current_certifications: - certification_dir = key_dir / 'uid' / key / 'certification' + certification_dir = key_dir / "uid" / key / "certification" certification_dir.mkdir(parents=True, exist_ok=True) - issuer = packet_dump_field(certification, 'Issuer') + issuer = packet_dump_field(certification, "Issuer") if uids.get(key) and uid_binding_sig.get(key): packets = [pubkey, uids[key], uid_binding_sig[key], certification] - output_file = certification_dir / f'{issuer}.asc' - debug(f'Writing file {output_file} from {certification}') + output_file = certification_dir / f"{issuer}.asc" + debug(f"Writing file {output_file} from {certification}") packet_join(packets, output_file) else: error( f"Public key '{pubkey}' does not provide " f"{'the UID binding signature' if not uid_binding_sig.get(key) else ''} for UID '{key}', " - "so its certifications can not be used!") + "so its certifications can not be used!" + ) def persist_revocations( @@ -577,17 +551,17 @@ def persist_revocations( for key, current_revocations in revocations.items(): for revocation in current_revocations: - revocation_dir = key_dir / 'uid' / key / 'revocation' + revocation_dir = key_dir / "uid" / key / "revocation" revocation_dir.mkdir(parents=True, exist_ok=True) - issuer = packet_dump_field(revocation, 'Issuer') + issuer = packet_dump_field(revocation, "Issuer") packets = [pubkey, uids[key]] # Binding sigs only exist for 3rd-party revocations if key in uid_binding_sig: packets.append(uid_binding_sig[key]) packets.append(revocation) - output_file = revocation_dir / f'{issuer}.asc' - debug(f'Writing file {output_file} from {revocation}') + output_file = revocation_dir / f"{issuer}.asc" + debug(f"Writing file {output_file} from {revocation}") packet_join(packets, output_file) @@ -607,7 +581,7 @@ def packet_dump(packet: Path) -> str: The contents of the packet dump """ - return system(['sq', 'packet', 'dump', str(packet)]) + return system(["sq", "packet", "dump", str(packet)]) def packet_dump_field(packet: Path, field: str) -> str: @@ -633,7 +607,7 @@ def packet_dump_field(packet: Path, field: str) -> str: dump = packet_dump(packet) lines = [line.strip() for line in dump.splitlines()] - lines = list(filter(lambda line: line.startswith(f'{field}: '), lines)) + lines = list(filter(lambda line: line.startswith(f"{field}: "), lines)) if not lines: raise Exception(f'Packet has no field "{field}"') return lines[0].split(maxsplit=1)[1] @@ -657,10 +631,10 @@ def keyring_split(working_dir: Path, keyring: Path) -> Iterable[Path]: An iterable over the naturally sorted list of certificate files derived from a keyring """ - keyring_dir = Path(mkdtemp(dir=working_dir, prefix='keyring-')).absolute() + keyring_dir = Path(mkdtemp(dir=working_dir, prefix="keyring-")).absolute() with cwd(keyring_dir): - system(['sq', 'keyring', 'split', '--prefix', '', str(keyring)]) + system(["sq", "keyring", "split", "--prefix", "", str(keyring)]) return natural_sort_path(keyring_dir.iterdir()) @@ -682,10 +656,10 @@ def packet_split(working_dir: Path, certificate: Path) -> Iterable[Path]: An iterable over the naturally sorted list of packet files derived from certificate """ - packet_dir = Path(mkdtemp(dir=working_dir, prefix='packet-')).absolute() + packet_dir = Path(mkdtemp(dir=working_dir, prefix="packet-")).absolute() with cwd(packet_dir): - system(['sq', 'packet', 'split', '--prefix', '', str(certificate)]) + system(["sq", "packet", "split", "--prefix", "", str(certificate)]) return natural_sort_path(packet_dir.iterdir()) @@ -702,12 +676,12 @@ def packet_join(packets: List[Path], output: Path, force: bool = False) -> None: Whether to force the execution of sq (defaults to False) """ - cmd = ['sq', 'packet', 'join'] + cmd = ["sq", "packet", "join"] if force: - cmd.insert(1, '--force') + cmd.insert(1, "--force") packets_str = list(map(lambda path: str(path), packets)) cmd.extend(packets_str) - cmd.extend(['--output', str(output)]) + cmd.extend(["--output", str(output)]) system(cmd, exit_on_error=False) @@ -725,9 +699,9 @@ def simplify_user_id(user_id: str) -> str: The simplified representation of user_id """ - user_id = user_id.replace('@', '_at_') - user_id = sub('[<>]', '', user_id) - user_id = sub('[' + escape(r' !@#$%^&*()_-+=[]{}\|;:,.<>/?') + ']', '_', user_id) + user_id = user_id.replace("@", "_at_") + user_id = sub("[<>]", "", user_id) + user_id = sub("[" + escape(r" !@#$%^&*()_-+=[]{}\|;:,.<>/?") + "]", "_", user_id) return user_id @@ -764,8 +738,7 @@ def convert( for key in keys: name = name_override or key.stem for cert in keyring_split(working_dir=working_dir, keyring=key): - directories.append( - convert_certificate(working_dir=working_dir, certificate=cert, name_override=name)) + directories.append(convert_certificate(working_dir=working_dir, certificate=cert, name_override=name)) for path in directories: (target_dir / path.name).mkdir(parents=True, exist_ok=True) @@ -795,12 +768,10 @@ def temp_join_keys(sources: List[Path], temp_dir: Path, force: bool) -> List[Pat if user_dir.is_dir(): for user_cert_number, user_cert_dir in enumerate(sorted(user_dir.iterdir())): if user_cert_dir.is_dir(): - cert_path = ( - temp_dir / ( - f"{str(source_number).zfill(4)}" - f"-{str(user_number).zfill(4)}" - f"-{str(user_cert_number).zfill(4)}.asc" - ) + cert_path = temp_dir / ( + f"{str(source_number).zfill(4)}" + f"-{str(user_number).zfill(4)}" + f"-{str(user_cert_number).zfill(4)}.asc" ) debug(f"Joining {user_dir.name}/{user_cert_dir.name} in {cert_path}.") packet_join( @@ -909,7 +880,8 @@ def export_revoked(certs: List[Path], main_keys: List[str], output: Path, min_re foreign_revocations[cert_dir.stem] = [] for revocation_cert in cert_dir.glob("uid/*/revocation/*.asc"): foreign_revocations[cert_dir.stem] += [ - revocation_cert.stem for main_key in main_keys + revocation_cert.stem + for main_key in main_keys if main_key.endswith(revocation_cert.stem) ] @@ -967,9 +939,9 @@ def export_keyring( ) output.parent.mkdir(parents=True, exist_ok=True) - cmd = ['sq', 'keyring', 'merge', '-o', str(output)] + cmd = ["sq", "keyring", "merge", "-o", str(output)] if force: - cmd.insert(1, '--force') + cmd.insert(1, "--force") cmd += [str(cert) for cert in sorted(main_certs)] cmd += [str(cert) for cert in sorted(sources_certs)] system(cmd, exit_on_error=False) @@ -1003,89 +975,88 @@ def absolute_path(path: str) -> Path: return Path(path).absolute() -if __name__ == '__main__': +if __name__ == "__main__": parser = ArgumentParser() - parser.add_argument('-v', '--verbose', action='store_true', - help='Causes to print debugging messages about the progress') - parser.add_argument('--wait', action='store_true', help='Block before cleaning up the temp directory') parser.add_argument( - '-f', - '--force', - action='store_true', + "-v", "--verbose", action="store_true", help="Causes to print debugging messages about the progress" + ) + parser.add_argument("--wait", action="store_true", help="Block before cleaning up the temp directory") + parser.add_argument( + "-f", + "--force", + action="store_true", default=False, - help='force the execution of subcommands (e.g. overwriting of files)', + help="force the execution of subcommands (e.g. overwriting of files)", ) subcommands = parser.add_subparsers(dest="subcommand") convert_parser = subcommands.add_parser( - 'convert', + "convert", help="import one or multiple PGP public keys and convert them to a decomposed directory structure", ) - convert_parser.add_argument('source', type=absolute_path, help='File or directory to convert') - convert_parser.add_argument('--target', type=absolute_path, help='target directory') + convert_parser.add_argument("source", type=absolute_path, help="File or directory to convert") + convert_parser.add_argument("--target", type=absolute_path, help="target directory") convert_parser.add_argument( - '--name', + "--name", type=str, default=None, - help='override the username to use (only useful when using a single file as source)', + help="override the username to use (only useful when using a single file as source)", ) import_main_parser = subcommands.add_parser( - 'import-main', - help="import one or several PGP keys to the main signing keys" + "import-main", help="import one or several PGP keys to the main signing keys" ) - import_main_parser.add_argument('source', type=absolute_path, help='File or directory') + import_main_parser.add_argument("source", type=absolute_path, help="File or directory") import_main_parser.add_argument( - '--name', + "--name", type=str, default=None, - help='override the username to use (only useful when using a single file as source)', + help="override the username to use (only useful when using a single file as source)", ) import_packager_parser = subcommands.add_parser( - 'import-packager', - help="import one or several PGP keys to the packager keys" + "import-packager", help="import one or several PGP keys to the packager keys" ) - import_packager_parser.add_argument('source', type=absolute_path, help='File or directory') + import_packager_parser.add_argument("source", type=absolute_path, help="File or directory") import_packager_parser.add_argument( - '--name', + "--name", type=str, default=None, - help='override the username to use (only useful when using a single file as source)', + help="override the username to use (only useful when using a single file as source)", ) export_keyring_parser = subcommands.add_parser( - 'export-keyring', + "export-keyring", help="export PGP packet data below main/ and packagers/ to output/archlinux.gpg alongside pacman integration", ) export_parser = subcommands.add_parser( - 'export', + "export", help="export a directory structure of PGP packet data to a combined file", ) - export_parser.add_argument('output', type=absolute_path, help='file to write PGP packet data to') + export_parser.add_argument("output", type=absolute_path, help="file to write PGP packet data to") export_parser.add_argument( - '-m', - '--main', + "-m", + "--main", action="append", - help='files or directories containing PGP packet data that is trusted (can be provided multiple times)', + help="files or directories containing PGP packet data that is trusted (can be provided multiple times)", required=True, type=absolute_path, ) export_parser.add_argument( - '-s', - '--source', + "-s", + "--source", action="append", - help='files or directories containing PGP packet data (can be provided multiple times)', + help="files or directories containing PGP packet data (can be provided multiple times)", required=True, type=absolute_path, ) export_parser.add_argument( - '-p', - '--pacman-integration', - action='store_true', + "-p", + "--pacman-integration", + action="store_true", default=False, - help='export trusted and revoked files (used by pacman) alongside the keyring', + help="export trusted and revoked files (used by pacman) alongside the keyring", ) args = parser.parse_args() @@ -1094,16 +1065,14 @@ if __name__ == '__main__': basicConfig(level=DEBUG) # temporary working directory that gets auto cleaned - with TemporaryDirectory(prefix='arch-keyringctl-') as tempdir: + with TemporaryDirectory(prefix="arch-keyringctl-") as tempdir: keyring_root = Path().absolute() working_dir = Path(tempdir) - debug(f'Working directory: {working_dir}') + debug(f"Working directory: {working_dir}") with cwd(working_dir): - if 'convert' == args.subcommand: - print( - convert(working_dir, args.source, target_dir=Path(mkdtemp(prefix='arch-keyringctl-')).absolute()) - ) - elif 'import-main' == args.subcommand: + if "convert" == args.subcommand: + print(convert(working_dir, args.source, target_dir=Path(mkdtemp(prefix="arch-keyringctl-")).absolute())) + elif "import-main" == args.subcommand: print( convert( working_dir=working_dir, @@ -1112,7 +1081,7 @@ if __name__ == '__main__': name_override=args.name, ) ) - elif 'import-packager' == args.subcommand: + elif "import-packager" == args.subcommand: print( convert( working_dir=working_dir, @@ -1121,7 +1090,7 @@ if __name__ == '__main__': name_override=args.name, ) ) - elif 'export-keyring' == args.subcommand: + elif "export-keyring" == args.subcommand: export_keyring( working_dir=working_dir, main=[keyring_root / "main"], @@ -1130,7 +1099,7 @@ if __name__ == '__main__': force=True, pacman_integration=True, ) - elif 'export' == args.subcommand: + elif "export" == args.subcommand: export_keyring( working_dir=working_dir, main=args.main, @@ -1141,5 +1110,5 @@ if __name__ == '__main__': ) if args.wait: - print('Press [ENTER] to continue') + print("Press [ENTER] to continue") input()