keyringctl: Format file

keyringctl:
Use black to format the file, isort to auto-sort all imports.
Remove commented code and (for now) ignore the high complexity in
`convert()` so that flake8 can be used.
This commit is contained in:
David Runge 2021-10-17 00:50:34 +02:00 committed by Levente Polyak
parent 0e54261242
commit 1cbd360d17
No known key found for this signature in database
GPG Key ID: FC1B547C8D8172C8

View File

@ -1,42 +1,19 @@
#!/usr/bin/env python
#
# SPDX-License-Identifier: GPL-3.0-or-later
from argparse import ArgumentParser
from collections import defaultdict
from os import chdir
from os import getcwd
from pathlib import Path
from shutil import copytree
from re import escape
from re import split
from re import sub
from subprocess import CalledProcessError
from subprocess import check_output
from subprocess import PIPE
from sys import exit
from sys import stderr
from tempfile import TemporaryDirectory
from tempfile import mkdtemp
from logging import basicConfig
from logging import debug, error
from logging import DEBUG
from typing import Dict
from typing import List
from typing import Optional
from typing import Iterable
from typing import Iterator
from typing import Tuple
from typing import Union
from contextlib import contextmanager
from logging import DEBUG, basicConfig, debug, error
from os import chdir, getcwd
from pathlib import Path
from re import escape, split, sub
from shutil import copytree
from subprocess import PIPE, CalledProcessError, check_output
from sys import exit, stderr
from tempfile import TemporaryDirectory, mkdtemp
from typing import Dict, Iterable, Iterator, List, Optional, Tuple, Union
@contextmanager
@ -56,15 +33,6 @@ def cwd(new_dir: Path) -> Iterator[None]:
finally:
chdir(previous_dir)
# class Key():
# fingerprint: str = ""
# pubkey: Path
# uids: List[]
# uid_certification: List[]
# subkeys: List[]
# subkey_certification: List[]
# uid_signatures: List[]
def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]:
"""Sort an Iterable of Paths naturally
@ -110,7 +78,7 @@ def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]:
A list of either int or str objects that may serve as 'key' argument for sorted()
"""
return [convert(c) for c in split('([0-9]+)', str(key.name))]
return [convert(c) for c in split("([0-9]+)", str(key.name))]
return sorted(_list, key=alphanum_key)
@ -145,7 +113,12 @@ def system(cmd: List[str], exit_on_error: bool = True) -> str:
raise e
def convert_certificate(working_dir: Path, certificate: Path, name_override: Optional[str] = None) -> Path:
# TODO: simplify to lower complexity
def convert_certificate( # noqa: ignore=C901
working_dir: Path,
certificate: Path,
name_override: Optional[str] = None,
) -> Path:
"""Convert a single file public key certificate into a decomposed directory structure of multiple PGP packets
The output directory structure is created per user. The username is derived from the certificate (or overridden).
@ -219,23 +192,23 @@ def convert_certificate(working_dir: Path, certificate: Path, name_override: Opt
# TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean
debug(f'Processing certificate {certificate}')
debug(f"Processing certificate {certificate}")
for packet in packet_split(working_dir=working_dir, certificate=certificate):
debug(f'Processing packet {packet.name}')
if packet.name.endswith('--PublicKey'):
debug(f"Processing packet {packet.name}")
if packet.name.endswith("--PublicKey"):
pubkey = packet
certificate_fingerprint = packet_dump_field(packet, 'Fingerprint')
current_packet_mode = 'pubkey'
certificate_fingerprint = packet_dump_field(packet, "Fingerprint")
current_packet_mode = "pubkey"
current_packet_key = certificate_fingerprint
elif packet.name.endswith('--UserID'):
value = simplify_user_id(packet_dump_field(packet, 'Value'))
current_packet_mode = 'uid'
elif packet.name.endswith("--UserID"):
value = simplify_user_id(packet_dump_field(packet, "Value"))
current_packet_mode = "uid"
current_packet_key = value
uids[value] = packet
elif packet.name.endswith('--PublicSubkey'):
fingerprint = packet_dump_field(packet, 'Fingerprint')
current_packet_mode = 'subkey'
elif packet.name.endswith("--PublicSubkey"):
fingerprint = packet_dump_field(packet, "Fingerprint")
current_packet_mode = "subkey"
current_packet_key = fingerprint
if not certificate_fingerprint:
@ -246,24 +219,24 @@ def convert_certificate(working_dir: Path, certificate: Path, name_override: Opt
else:
subkeys[certificate_fingerprint] |= {fingerprint: packet}
elif packet.name.endswith('--Signature'):
elif packet.name.endswith("--Signature"):
if not certificate_fingerprint:
raise Exception('missing certificate fingerprint for "{packet.name}"')
if not current_packet_key:
raise Exception('missing current packet key for "{packet.name}"')
issuer = packet_dump_field(packet, 'Issuer')
signature_type = packet_dump_field(packet, 'Type')
issuer = packet_dump_field(packet, "Issuer")
signature_type = packet_dump_field(packet, "Type")
if current_packet_mode == 'pubkey':
if signature_type == 'KeyRevocation' and certificate_fingerprint.endswith(issuer):
if current_packet_mode == "pubkey":
if signature_type == "KeyRevocation" and certificate_fingerprint.endswith(issuer):
direct_revocations = add_packet_to_direct_sigs(
direct_sigs=direct_revocations,
issuer=issuer,
packet_key=current_packet_key,
packet=packet,
)
elif signature_type in ['DirectKey', 'GenericCertification']:
elif signature_type in ["DirectKey", "GenericCertification"]:
direct_sigs = add_packet_to_direct_sigs(
direct_sigs=direct_sigs,
issuer=issuer,
@ -271,42 +244,42 @@ def convert_certificate(working_dir: Path, certificate: Path, name_override: Opt
packet=packet,
)
else:
raise Exception(f'unknown signature type: {signature_type}')
elif current_packet_mode == 'uid':
if signature_type == 'CertificationRevocation':
raise Exception(f"unknown signature type: {signature_type}")
elif current_packet_mode == "uid":
if signature_type == "CertificationRevocation":
revocations[current_packet_key].append(packet)
elif signature_type == 'PositiveCertification' and certificate_fingerprint.endswith(issuer):
elif signature_type == "PositiveCertification" and certificate_fingerprint.endswith(issuer):
uid_binding_sigs[current_packet_key] = packet
elif signature_type.endswith('Certification'):
elif signature_type.endswith("Certification"):
certifications[current_packet_key].append(packet)
else:
raise Exception(f'unknown signature type: {signature_type}')
elif current_packet_mode == 'subkey':
if signature_type == 'SubkeyBinding':
raise Exception(f"unknown signature type: {signature_type}")
elif current_packet_mode == "subkey":
if signature_type == "SubkeyBinding":
if not subkey_binding_sigs.get(certificate_fingerprint):
subkey_binding_sigs |= {certificate_fingerprint: {fingerprint: packet}}
else:
subkey_binding_sigs[certificate_fingerprint] |= {fingerprint: packet}
elif signature_type == 'SubkeyRevocation':
elif signature_type == "SubkeyRevocation":
if not subkey_revocations.get(certificate_fingerprint):
subkey_revocations |= {certificate_fingerprint: {fingerprint: packet}}
else:
subkey_revocations[certificate_fingerprint] |= {fingerprint: packet}
else:
raise Exception(f'unknown signature type: {signature_type}')
raise Exception(f"unknown signature type: {signature_type}")
else:
raise Exception(f'unknown signature root for "{packet.name}"')
else:
raise Exception(f'unknown packet type "{packet.name}"')
if not certificate_fingerprint:
raise Exception('missing certificate fingerprint')
raise Exception("missing certificate fingerprint")
if not pubkey:
raise Exception('missing certificate public-key')
raise Exception("missing certificate public-key")
user_dir = (working_dir / username)
key_dir = (user_dir / certificate_fingerprint)
user_dir = working_dir / username
key_dir = user_dir / certificate_fingerprint
key_dir.mkdir(parents=True, exist_ok=True)
persist_public_key(
@ -384,9 +357,9 @@ def persist_public_key(
"""
packets: List[Path] = [pubkey]
output_file = key_dir / f'{certificate_fingerprint}.asc'
output_file = key_dir / f"{certificate_fingerprint}.asc"
output_file.parent.mkdir(parents=True, exist_ok=True)
debug(f'Writing file {output_file} from {[str(packet) for packet in packets]}')
debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}")
packet_join(packets, output_file)
@ -412,9 +385,9 @@ def persist_uids(
for key in uid_binding_sigs.keys():
packets = [uids[key], uid_binding_sigs[key]]
output_file = key_dir / 'uid' / key / f'{key}.asc'
output_file = key_dir / "uid" / key / f"{key}.asc"
output_file.parent.mkdir(parents=True, exist_ok=True)
debug(f'Writing file {output_file} from {[str(packet) for packet in packets]}')
debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}")
packet_join(packets, output_file)
@ -442,9 +415,9 @@ def persist_subkeys(
for signature, subkey in subkeys[certificate_fingerprint].items():
packets: List[Path] = []
packets.extend([subkey, subkey_binding_sigs[certificate_fingerprint][signature]])
output_file = key_dir / 'subkey' / signature / f'{signature}.asc'
output_file = key_dir / "subkey" / signature / f"{signature}.asc"
output_file.parent.mkdir(parents=True, exist_ok=True)
debug(f'Writing file {output_file} from {[str(packet) for packet in packets]}')
debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}")
packet_join(packets=packets, output=output_file)
@ -467,10 +440,10 @@ def persist_subkey_revocations(
if subkey_revocations.get(certificate_fingerprint):
for signature, revocation in subkey_revocations[certificate_fingerprint].items():
issuer = packet_dump_field(revocation, 'Issuer')
output_file = key_dir / 'subkey' / signature / 'revocation' / f'{issuer}.asc'
issuer = packet_dump_field(revocation, "Issuer")
output_file = key_dir / "subkey" / signature / "revocation" / f"{issuer}.asc"
output_file.parent.mkdir(parents=True, exist_ok=True)
debug(f'Writing file {output_file} from {revocation}')
debug(f"Writing file {output_file} from {revocation}")
packet_join(packets=[revocation], output=output_file)
@ -498,9 +471,9 @@ def persist_direct_sigs(
for key, current_certifications in direct_sigs.items():
for issuer, certifications in current_certifications.items():
packets = [pubkey] + certifications
output_file = key_dir / sig_type / f'{issuer}.asc'
output_file = key_dir / sig_type / f"{issuer}.asc"
output_file.parent.mkdir(parents=True, exist_ok=True)
debug(f'Writing file {output_file} from {[str(cert) for cert in certifications]}')
debug(f"Writing file {output_file} from {[str(cert) for cert in certifications]}")
packet_join(packets, output_file)
@ -533,20 +506,21 @@ def persist_certifications(
for key, current_certifications in certifications.items():
for certification in current_certifications:
certification_dir = key_dir / 'uid' / key / 'certification'
certification_dir = key_dir / "uid" / key / "certification"
certification_dir.mkdir(parents=True, exist_ok=True)
issuer = packet_dump_field(certification, 'Issuer')
issuer = packet_dump_field(certification, "Issuer")
if uids.get(key) and uid_binding_sig.get(key):
packets = [pubkey, uids[key], uid_binding_sig[key], certification]
output_file = certification_dir / f'{issuer}.asc'
debug(f'Writing file {output_file} from {certification}')
output_file = certification_dir / f"{issuer}.asc"
debug(f"Writing file {output_file} from {certification}")
packet_join(packets, output_file)
else:
error(
f"Public key '{pubkey}' does not provide "
f"{'the UID binding signature' if not uid_binding_sig.get(key) else ''} for UID '{key}', "
"so its certifications can not be used!")
"so its certifications can not be used!"
)
def persist_revocations(
@ -577,17 +551,17 @@ def persist_revocations(
for key, current_revocations in revocations.items():
for revocation in current_revocations:
revocation_dir = key_dir / 'uid' / key / 'revocation'
revocation_dir = key_dir / "uid" / key / "revocation"
revocation_dir.mkdir(parents=True, exist_ok=True)
issuer = packet_dump_field(revocation, 'Issuer')
issuer = packet_dump_field(revocation, "Issuer")
packets = [pubkey, uids[key]]
# Binding sigs only exist for 3rd-party revocations
if key in uid_binding_sig:
packets.append(uid_binding_sig[key])
packets.append(revocation)
output_file = revocation_dir / f'{issuer}.asc'
debug(f'Writing file {output_file} from {revocation}')
output_file = revocation_dir / f"{issuer}.asc"
debug(f"Writing file {output_file} from {revocation}")
packet_join(packets, output_file)
@ -607,7 +581,7 @@ def packet_dump(packet: Path) -> str:
The contents of the packet dump
"""
return system(['sq', 'packet', 'dump', str(packet)])
return system(["sq", "packet", "dump", str(packet)])
def packet_dump_field(packet: Path, field: str) -> str:
@ -633,7 +607,7 @@ def packet_dump_field(packet: Path, field: str) -> str:
dump = packet_dump(packet)
lines = [line.strip() for line in dump.splitlines()]
lines = list(filter(lambda line: line.startswith(f'{field}: '), lines))
lines = list(filter(lambda line: line.startswith(f"{field}: "), lines))
if not lines:
raise Exception(f'Packet has no field "{field}"')
return lines[0].split(maxsplit=1)[1]
@ -657,10 +631,10 @@ def keyring_split(working_dir: Path, keyring: Path) -> Iterable[Path]:
An iterable over the naturally sorted list of certificate files derived from a keyring
"""
keyring_dir = Path(mkdtemp(dir=working_dir, prefix='keyring-')).absolute()
keyring_dir = Path(mkdtemp(dir=working_dir, prefix="keyring-")).absolute()
with cwd(keyring_dir):
system(['sq', 'keyring', 'split', '--prefix', '', str(keyring)])
system(["sq", "keyring", "split", "--prefix", "", str(keyring)])
return natural_sort_path(keyring_dir.iterdir())
@ -682,10 +656,10 @@ def packet_split(working_dir: Path, certificate: Path) -> Iterable[Path]:
An iterable over the naturally sorted list of packet files derived from certificate
"""
packet_dir = Path(mkdtemp(dir=working_dir, prefix='packet-')).absolute()
packet_dir = Path(mkdtemp(dir=working_dir, prefix="packet-")).absolute()
with cwd(packet_dir):
system(['sq', 'packet', 'split', '--prefix', '', str(certificate)])
system(["sq", "packet", "split", "--prefix", "", str(certificate)])
return natural_sort_path(packet_dir.iterdir())
@ -702,12 +676,12 @@ def packet_join(packets: List[Path], output: Path, force: bool = False) -> None:
Whether to force the execution of sq (defaults to False)
"""
cmd = ['sq', 'packet', 'join']
cmd = ["sq", "packet", "join"]
if force:
cmd.insert(1, '--force')
cmd.insert(1, "--force")
packets_str = list(map(lambda path: str(path), packets))
cmd.extend(packets_str)
cmd.extend(['--output', str(output)])
cmd.extend(["--output", str(output)])
system(cmd, exit_on_error=False)
@ -725,9 +699,9 @@ def simplify_user_id(user_id: str) -> str:
The simplified representation of user_id
"""
user_id = user_id.replace('@', '_at_')
user_id = sub('[<>]', '', user_id)
user_id = sub('[' + escape(r' !@#$%^&*()_-+=[]{}\|;:,.<>/?') + ']', '_', user_id)
user_id = user_id.replace("@", "_at_")
user_id = sub("[<>]", "", user_id)
user_id = sub("[" + escape(r" !@#$%^&*()_-+=[]{}\|;:,.<>/?") + "]", "_", user_id)
return user_id
@ -764,8 +738,7 @@ def convert(
for key in keys:
name = name_override or key.stem
for cert in keyring_split(working_dir=working_dir, keyring=key):
directories.append(
convert_certificate(working_dir=working_dir, certificate=cert, name_override=name))
directories.append(convert_certificate(working_dir=working_dir, certificate=cert, name_override=name))
for path in directories:
(target_dir / path.name).mkdir(parents=True, exist_ok=True)
@ -795,12 +768,10 @@ def temp_join_keys(sources: List[Path], temp_dir: Path, force: bool) -> List[Pat
if user_dir.is_dir():
for user_cert_number, user_cert_dir in enumerate(sorted(user_dir.iterdir())):
if user_cert_dir.is_dir():
cert_path = (
temp_dir / (
f"{str(source_number).zfill(4)}"
f"-{str(user_number).zfill(4)}"
f"-{str(user_cert_number).zfill(4)}.asc"
)
cert_path = temp_dir / (
f"{str(source_number).zfill(4)}"
f"-{str(user_number).zfill(4)}"
f"-{str(user_cert_number).zfill(4)}.asc"
)
debug(f"Joining {user_dir.name}/{user_cert_dir.name} in {cert_path}.")
packet_join(
@ -909,7 +880,8 @@ def export_revoked(certs: List[Path], main_keys: List[str], output: Path, min_re
foreign_revocations[cert_dir.stem] = []
for revocation_cert in cert_dir.glob("uid/*/revocation/*.asc"):
foreign_revocations[cert_dir.stem] += [
revocation_cert.stem for main_key in main_keys
revocation_cert.stem
for main_key in main_keys
if main_key.endswith(revocation_cert.stem)
]
@ -967,9 +939,9 @@ def export_keyring(
)
output.parent.mkdir(parents=True, exist_ok=True)
cmd = ['sq', 'keyring', 'merge', '-o', str(output)]
cmd = ["sq", "keyring", "merge", "-o", str(output)]
if force:
cmd.insert(1, '--force')
cmd.insert(1, "--force")
cmd += [str(cert) for cert in sorted(main_certs)]
cmd += [str(cert) for cert in sorted(sources_certs)]
system(cmd, exit_on_error=False)
@ -1003,89 +975,88 @@ def absolute_path(path: str) -> Path:
return Path(path).absolute()
if __name__ == '__main__':
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument('-v', '--verbose', action='store_true',
help='Causes to print debugging messages about the progress')
parser.add_argument('--wait', action='store_true', help='Block before cleaning up the temp directory')
parser.add_argument(
'-f',
'--force',
action='store_true',
"-v", "--verbose", action="store_true", help="Causes to print debugging messages about the progress"
)
parser.add_argument("--wait", action="store_true", help="Block before cleaning up the temp directory")
parser.add_argument(
"-f",
"--force",
action="store_true",
default=False,
help='force the execution of subcommands (e.g. overwriting of files)',
help="force the execution of subcommands (e.g. overwriting of files)",
)
subcommands = parser.add_subparsers(dest="subcommand")
convert_parser = subcommands.add_parser(
'convert',
"convert",
help="import one or multiple PGP public keys and convert them to a decomposed directory structure",
)
convert_parser.add_argument('source', type=absolute_path, help='File or directory to convert')
convert_parser.add_argument('--target', type=absolute_path, help='target directory')
convert_parser.add_argument("source", type=absolute_path, help="File or directory to convert")
convert_parser.add_argument("--target", type=absolute_path, help="target directory")
convert_parser.add_argument(
'--name',
"--name",
type=str,
default=None,
help='override the username to use (only useful when using a single file as source)',
help="override the username to use (only useful when using a single file as source)",
)
import_main_parser = subcommands.add_parser(
'import-main',
help="import one or several PGP keys to the main signing keys"
"import-main", help="import one or several PGP keys to the main signing keys"
)
import_main_parser.add_argument('source', type=absolute_path, help='File or directory')
import_main_parser.add_argument("source", type=absolute_path, help="File or directory")
import_main_parser.add_argument(
'--name',
"--name",
type=str,
default=None,
help='override the username to use (only useful when using a single file as source)',
help="override the username to use (only useful when using a single file as source)",
)
import_packager_parser = subcommands.add_parser(
'import-packager',
help="import one or several PGP keys to the packager keys"
"import-packager", help="import one or several PGP keys to the packager keys"
)
import_packager_parser.add_argument('source', type=absolute_path, help='File or directory')
import_packager_parser.add_argument("source", type=absolute_path, help="File or directory")
import_packager_parser.add_argument(
'--name',
"--name",
type=str,
default=None,
help='override the username to use (only useful when using a single file as source)',
help="override the username to use (only useful when using a single file as source)",
)
export_keyring_parser = subcommands.add_parser(
'export-keyring',
"export-keyring",
help="export PGP packet data below main/ and packagers/ to output/archlinux.gpg alongside pacman integration",
)
export_parser = subcommands.add_parser(
'export',
"export",
help="export a directory structure of PGP packet data to a combined file",
)
export_parser.add_argument('output', type=absolute_path, help='file to write PGP packet data to')
export_parser.add_argument("output", type=absolute_path, help="file to write PGP packet data to")
export_parser.add_argument(
'-m',
'--main',
"-m",
"--main",
action="append",
help='files or directories containing PGP packet data that is trusted (can be provided multiple times)',
help="files or directories containing PGP packet data that is trusted (can be provided multiple times)",
required=True,
type=absolute_path,
)
export_parser.add_argument(
'-s',
'--source',
"-s",
"--source",
action="append",
help='files or directories containing PGP packet data (can be provided multiple times)',
help="files or directories containing PGP packet data (can be provided multiple times)",
required=True,
type=absolute_path,
)
export_parser.add_argument(
'-p',
'--pacman-integration',
action='store_true',
"-p",
"--pacman-integration",
action="store_true",
default=False,
help='export trusted and revoked files (used by pacman) alongside the keyring',
help="export trusted and revoked files (used by pacman) alongside the keyring",
)
args = parser.parse_args()
@ -1094,16 +1065,14 @@ if __name__ == '__main__':
basicConfig(level=DEBUG)
# temporary working directory that gets auto cleaned
with TemporaryDirectory(prefix='arch-keyringctl-') as tempdir:
with TemporaryDirectory(prefix="arch-keyringctl-") as tempdir:
keyring_root = Path().absolute()
working_dir = Path(tempdir)
debug(f'Working directory: {working_dir}')
debug(f"Working directory: {working_dir}")
with cwd(working_dir):
if 'convert' == args.subcommand:
print(
convert(working_dir, args.source, target_dir=Path(mkdtemp(prefix='arch-keyringctl-')).absolute())
)
elif 'import-main' == args.subcommand:
if "convert" == args.subcommand:
print(convert(working_dir, args.source, target_dir=Path(mkdtemp(prefix="arch-keyringctl-")).absolute()))
elif "import-main" == args.subcommand:
print(
convert(
working_dir=working_dir,
@ -1112,7 +1081,7 @@ if __name__ == '__main__':
name_override=args.name,
)
)
elif 'import-packager' == args.subcommand:
elif "import-packager" == args.subcommand:
print(
convert(
working_dir=working_dir,
@ -1121,7 +1090,7 @@ if __name__ == '__main__':
name_override=args.name,
)
)
elif 'export-keyring' == args.subcommand:
elif "export-keyring" == args.subcommand:
export_keyring(
working_dir=working_dir,
main=[keyring_root / "main"],
@ -1130,7 +1099,7 @@ if __name__ == '__main__':
force=True,
pacman_integration=True,
)
elif 'export' == args.subcommand:
elif "export" == args.subcommand:
export_keyring(
working_dir=working_dir,
main=args.main,
@ -1141,5 +1110,5 @@ if __name__ == '__main__':
)
if args.wait:
print('Press [ENTER] to continue')
print("Press [ENTER] to continue")
input()