condorcore-keyring/keyringctl
David Runge 0d32d2f00a
keyringctl: Dedicated functions for writing to file
keyringctl:
Add `persist_basic_key()`, `persist_direct_keys()`,
`persist_certifications()` and `persist_revocations()` to allow for
dedicated writing of basic key material, direct key signatures,
per UID certificates and per UID revocations (respectively).
Change `convert_certificate()` to call the new dedicated write functions
instead of implementing the functionality.
Change `convert_certificate()` to raise on missing current_packet_key
when trying to work on signature files (this is unlikely to occur,
unless the input data is somehow broken, but it keeps the linter happy).
Change `convert_certificate()` to handle direct_keys by issuer on a
given root key (DirectKey signatures by the same issuer are combined).
Change the argparse subparser for the 'convert' command to include a
help text.
2021-11-30 22:54:05 +01:00

469 lines
16 KiB
Python
Executable File

#!/usr/bin/env python
from argparse import ArgumentParser
from collections import defaultdict
from os import chdir
from os import getcwd
from pathlib import Path
from shutil import move
from re import escape
from re import split
from re import sub
from subprocess import CalledProcessError
from subprocess import check_output
from subprocess import PIPE
from sys import exit
from sys import stderr
from tempfile import TemporaryDirectory
from tempfile import mkdtemp
from logging import basicConfig
from logging import debug
from logging import error
from logging import DEBUG
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Iterable
from contextlib import contextmanager
@contextmanager
def cwd(new_dir: Path):
    """Temporarily change the current working directory

    The working directory that was active on entry is restored on exit, also when the managed block raises.

    Parameters
    ----------
    new_dir: Path
        The directory to change to for the duration of the context
    """
    origin = getcwd()
    chdir(new_dir)
    try:
        yield
    finally:
        # always return to where we started
        chdir(origin)
# class Key():
# fingerprint: str = ""
# pubkey: Path
# uids: List[]
# uid_certification: List[]
# subkeys: List[]
# subkey_certification: List[]
# uid_signatures: List[]
def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]:
    """Sort paths by their file name in natural (human) order

    Runs of ASCII digits in the name are compared as integers and all other text is compared case-insensitively, so
    that e.g. '2--x' sorts before '10--x'.

    Parameters
    ----------
    _list: Iterable[Path]
        The paths to sort

    Returns
    -------
    Iterable[Path]
        A new sorted list of the given paths
    """

    def sort_key(path: Path) -> List[Any]:
        # the capture group keeps the digit runs in the split result
        return [
            int(chunk) if chunk.isdigit() else chunk.lower()
            for chunk in split('([0-9]+)', str(path.name))
        ]

    return sorted(_list, key=sort_key)
def system(cmd: List[str], exit_on_error: bool = True) -> str:
    """Run a command and return its decoded standard output

    If the command fails, its captured standard error is forwarded to this process' standard error.

    Parameters
    ----------
    cmd: List[str]
        The command and its arguments
    exit_on_error: bool
        Whether to exit the interpreter with the command's return code on failure (defaults to True)

    Raises
    ------
    CalledProcessError
        If the command fails and exit_on_error is False

    Returns
    -------
    str
        The decoded standard output of the command
    """
    try:
        output = check_output(cmd, stderr=PIPE)
    except CalledProcessError as failure:
        stderr.buffer.write(failure.stderr)
        if exit_on_error:
            exit(failure.returncode)
        raise failure
    return output.decode()
def convert_certificate(working_dir: Path, certificate: Path, owner: str) -> Path:
    """Convert a single certificate to the decomposed directory structure

    The certificate is split into its packets, which are classified by the suffix of their file name (PublicKey,
    UserID, PublicSubkey, Signature). Signatures are attributed to the most recently seen PublicKey, UserID or
    PublicSubkey packet. The collected packets are then persisted below a directory named after the certificate
    fingerprint.

    Parameters
    ----------
    working_dir: Path
        The directory in which the certificate is split and below which the output directory is created
    certificate: Path
        The certificate file to convert
    owner: str
        Currently unused

    Raises
    ------
    Exception
        If a packet or signature type is unknown, a signature appears before any packet it could belong to, or the
        certificate contains no PublicKey packet

    Returns
    -------
    Path
        The directory (working_dir / certificate fingerprint) below which the key material has been persisted
    """
    certificate_fingerprint: Optional[str] = None
    pubkey: Optional[Path] = None
    direct_keys: Dict[str, Dict[str, List[Path]]] = {}
    current_packet_mode: Optional[str] = None
    current_packet_key: Optional[str] = None
    uids: Dict[str, Path] = {}
    uid_binding_sig: Dict[str, Path] = {}
    subkey: Dict[str, Path] = {}
    subkey_binding_sig: Dict[str, Path] = {}
    certifications: Dict[str, List[Path]] = defaultdict(list)
    revocations: Dict[str, List[Path]] = defaultdict(list)

    # XXX: KeyRevocation
    # XXX: PrimaryKeyBinding

    # TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean

    for packet in packet_split(working_dir, certificate):
        debug(f'Processing packet {packet.name}')
        if packet.name.endswith('--PublicKey'):
            pubkey = packet
            certificate_fingerprint = packet_dump_field(packet, 'Fingerprint')
            current_packet_mode = 'pubkey'
            current_packet_key = certificate_fingerprint
        elif packet.name.endswith('--UserID'):
            value = packet_dump_field(packet, 'Value')
            value = simplify_user_id(value)
            current_packet_mode = 'uid'
            current_packet_key = value
            uids[value] = packet
        elif packet.name.endswith('--PublicSubkey'):
            fingerprint = packet_dump_field(packet, 'Fingerprint')
            current_packet_mode = 'subkey'
            current_packet_key = fingerprint
            subkey[fingerprint] = packet
        elif packet.name.endswith('--Signature'):
            # a signature always refers to a previously seen packet
            if not certificate_fingerprint:
                raise Exception(f'missing certificate fingerprint for "{packet.name}"')
            if not current_packet_key:
                raise Exception(f'missing current packet key for "{packet.name}"')

            issuer = packet_dump_field(packet, 'Issuer')
            signature_type = packet_dump_field(packet, 'Type')

            if signature_type == 'DirectKey':
                # DirectKey signatures are grouped by the packet they follow and then by their issuer
                direct_keys.setdefault(current_packet_key, defaultdict(list))[issuer].append(packet)
                continue

            # TODO GenericCertification PersonaCertification CasualCertification PositiveCertification
            if current_packet_mode in ('uid', 'pubkey'):
                if certificate_fingerprint.endswith(issuer):
                    # signature issued by the certificate itself
                    if signature_type == 'PositiveCertification':
                        uid_binding_sig[current_packet_key] = packet
                    elif signature_type == 'CertificationRevocation':
                        # XXX:
                        revocations[current_packet_key].append(packet)
                    else:
                        raise Exception(f'unknown signature type: {signature_type}')
                else:
                    # signature issued by a different key
                    if signature_type.endswith('Certification'):
                        certifications[current_packet_key].append(packet)
                    elif signature_type == 'CertificationRevocation':
                        revocations[current_packet_key].append(packet)
                    else:
                        raise Exception(f'unknown signature type: {signature_type}')
            elif current_packet_mode == 'subkey':
                if signature_type == 'SubkeyBinding':
                    subkey_binding_sig[current_packet_key] = packet
                elif signature_type == 'SubkeyRevocation':
                    # XXX:
                    pass
                else:
                    raise Exception(f'unknown signature type: {signature_type}')
            else:
                raise Exception(f'unknown signature root for "{packet.name}"')
        else:
            raise Exception(f'unknown packet type "{packet.name}"')

    if not certificate_fingerprint:
        raise Exception('missing certificate fingerprint')

    if not pubkey:
        raise Exception('missing certificate public-key')

    root_dir = (working_dir / certificate_fingerprint)
    root_dir.mkdir()

    persist_basic_key(
        certificate_fingerprint=certificate_fingerprint,
        pubkey=pubkey,
        root_dir=root_dir,
        subkey=subkey,
        subkey_binding_sig=subkey_binding_sig,
        uid_binding_sig=uid_binding_sig,
        uids=uids,
    )

    persist_direct_keys(
        direct_keys=direct_keys,
        pubkey=pubkey,
        root_dir=root_dir,
    )

    persist_certifications(
        certifications=certifications,
        pubkey=pubkey,
        root_dir=root_dir,
        uid_binding_sig=uid_binding_sig,
        uids=uids,
    )

    persist_revocations(
        pubkey=pubkey,
        revocations=revocations,
        root_dir=root_dir,
        uid_binding_sig=uid_binding_sig,
        uids=uids,
    )

    return root_dir
def persist_basic_key(
    certificate_fingerprint: str,
    pubkey: Path,
    root_dir: Path,
    subkey: Dict[str, Path],
    subkey_binding_sig: Dict[str, Path],
    uid_binding_sig: Dict[str, Path],
    uids: Dict[str, Path],
) -> None:
    """Write the basic key material of a root key to a single file

    The file contains the root key's public key, followed by each User ID that has a PositiveCertification (together
    with that certification) and each PublicSubkey that has a SubkeyBinding (together with that binding).

    The file is created in root_dir and is named '<certificate_fingerprint>.asc'.

    Parameters
    ----------
    certificate_fingerprint: str
        The unique fingerprint of the public key
    pubkey: Path
        The path to the public key of the root key
    root_dir: Path
        The root directory below which the basic key material is persisted
    subkey: Dict[str, Path]
        The PublicSubkeys of a key
    subkey_binding_sig: Dict[str, Path]
        The SubkeyBinding signatures of a Public-Key (the root key)
    uid_binding_sig: Dict[str, Path]
        The PositiveCertifications of a User ID and Public-Key packet
    uids: Dict[str, Path]
        The User IDs of a Public-Key (the root key)
    """
    packets: List[Path] = [pubkey]

    # only User IDs and subkeys that carry a binding signature are written
    for uid, binding in uid_binding_sig.items():
        packets += [uids[uid], binding]
    for fingerprint, binding in subkey_binding_sig.items():
        packets += [subkey[fingerprint], binding]

    output_file = root_dir / f'{certificate_fingerprint}.asc'
    packet_join(packets, output_file)
def persist_direct_keys(
    direct_keys: Dict[str, Dict[str, List[Path]]],
    pubkey: Path,
    root_dir: Path,
) -> None:
    """Persist the DirectKeys on a root key to file(s)

    For each issuer one file named '<issuer>.asc' is written to the 'certification' directory below root_dir. Each
    file consists of the public key followed by the DirectKey signatures of that issuer.

    Parameters
    ----------
    direct_keys: Dict[str, Dict[str, List[Path]]]
        The DirectKey signatures to write to file, grouped by packet key and then by issuer
    pubkey: Path
        The path to the public key of the root key
    root_dir: Path
        The root directory below which the Directkeys are persisted
    """
    direct_key_dir = root_dir / 'certification'

    for current_certifications in direct_keys.values():
        for issuer, certifications in current_certifications.items():
            # only create the directory when there is something to write
            direct_key_dir.mkdir(parents=True, exist_ok=True)
            packets = [pubkey] + certifications
            output_file = direct_key_dir / f'{issuer}.asc'
            debug(f'Writing file {output_file} from {[str(cert) for cert in certifications]}')
            packet_join(packets, output_file)
def persist_certifications(
    certifications: Dict[str, List[Path]],
    pubkey: Path,
    root_dir: Path,
    uid_binding_sig: Dict[str, Path],
    uids: Dict[str, Path],
) -> None:
    """Persist the certifications of a root key to file(s)

    The certifications include all CasualCertifications, GenericCertifications, PersonaCertifications and
    PositiveCertifications for all User IDs of the given root key.
    All certifications are persisted in per User ID certification directories below root_dir.

    Each file is named '<issuer>.asc' and consists of the public key, the User ID, the User ID's binding signature
    and the certification.

    Parameters
    ----------
    certifications: Dict[str, List[Path]]
        The certifications to write to file
    pubkey: Path
        The path to the public key of the root key
    root_dir: Path
        The root directory below which certifications are persisted
    uid_binding_sig: Dict[str, Path]
        The PositiveCertifications of a User ID and Public-Key packet
    uids: Dict[str, Path]
        The User IDs of a Public-Key (the root key)

    Raises
    ------
    Exception
        If certifications exist for a key that has no User ID packet or no binding signature
    """
    for key, current_certifications in certifications.items():
        if not current_certifications:
            continue

        # TODO: find a way to get uid binding for pubkey certs
        # fail early with a clear message, before anything is written to disk
        if key not in uids:
            raise Exception(f'missing uid for "{key}"')
        if key not in uid_binding_sig:
            raise Exception(f'missing binding sig for "{key}"')

        certification_dir = root_dir / key / 'certification'
        certification_dir.mkdir(parents=True, exist_ok=True)

        for certification in current_certifications:
            issuer = packet_dump_field(certification, 'Issuer')
            packets = [pubkey, uids[key], uid_binding_sig[key], certification]
            output_file = certification_dir / f'{issuer}.asc'
            debug(f'Writing file {output_file} from {certification}')
            packet_join(packets, output_file)
def persist_revocations(
    pubkey: Path,
    revocations: Dict[str, List[Path]],
    root_dir: Path,
    uid_binding_sig: Dict[str, Path],
    uids: Dict[str, Path],
) -> None:
    """Write the revocations of a root key to file(s)

    Every revocation is written to '<root_dir>/<key>/revocation/<issuer>.asc'. Each file consists of the public key,
    the User ID, the User ID's binding signature (if one is known) and the revocation itself.

    Parameters
    ----------
    pubkey: Path
        The path to the public key of the root key
    revocations: Dict[str, List[Path]]
        The revocations to write to file
    root_dir: Path
        The root directory below which revocations will be persisted
    uid_binding_sig: Dict[str, Path]
        The PositiveCertifications of a User ID and Public-Key packet
    uids: Dict[str, Path]
        The User IDs of a Public-Key (the root key)
    """
    for key, current_revocations in revocations.items():
        for revocation in current_revocations:
            revocation_dir = root_dir / key / 'revocation'
            revocation_dir.mkdir(parents=True, exist_ok=True)
            issuer = packet_dump_field(revocation, 'Issuer')

            uid_packet = uids[key]
            # Binding sigs only exist for 3rd-party revocations
            binding = [uid_binding_sig[key]] if key in uid_binding_sig else []
            packets = [pubkey, uid_packet, *binding, revocation]

            output_file = revocation_dir / f'{issuer}.asc'
            debug(f'Writing file {output_file} from {revocation}')
            packet_join(packets, output_file)
def packet_dump(packet: Path) -> str:
    """Return the output of `sq packet dump` for a packet file

    Parameters
    ----------
    packet: Path
        The packet file to dump

    Returns
    -------
    str
        The standard output of the command
    """
    command = ['sq', 'packet', 'dump', str(packet)]
    return system(command)
def packet_dump_field(packet: Path, field: str) -> str:
    """Return the value of a field from the dump of a packet

    The first line of the dump that (after stripping surrounding whitespace) starts with '<field>: ' is used. The
    returned value is everything after the first run of whitespace on that line.

    Parameters
    ----------
    packet: Path
        The packet file to inspect
    field: str
        The name of the field to look up

    Raises
    ------
    Exception
        If no line of the dump starts with the field

    Returns
    -------
    str
        The value of the field
    """
    prefix = f'{field}: '
    for raw_line in packet_dump(packet).splitlines():
        line = raw_line.strip()
        if line.startswith(prefix):
            return line.split(maxsplit=1)[1]
    raise Exception(f'Packet has no field "{field}"')
def packet_split(working_dir: Path, key: Path) -> Iterable[Path]:
    """Split a key into its packets using `sq packet split`

    The command is run from within a freshly created temporary directory below working_dir.

    Parameters
    ----------
    working_dir: Path
        The directory below which the temporary packet directory is created
    key: Path
        The key file to split

    Returns
    -------
    Iterable[Path]
        The entries of the packet directory in natural sort order
    """
    packet_dir = Path(mkdtemp(dir=working_dir)).absolute()
    command = ['sq', 'packet', 'split', '--prefix', '', str(key)]

    # sq is run from inside the packet directory
    with cwd(packet_dir):
        system(command)

    return natural_sort_path(packet_dir.iterdir())
def packet_join(packets: List[Path], output: Path) -> None:
    """Join packets into a single file using `sq packet join`

    A failing command raises instead of exiting the interpreter, as system() is called with exit_on_error=False.

    Parameters
    ----------
    packets: List[Path]
        The packet files to join, in the order in which they are passed to the command
    output: Path
        The file to write the joined packets to
    """
    command = [
        'sq',
        'packet',
        'join',
        *(str(packet) for packet in packets),
        '--output',
        str(output),
    ]
    system(command, exit_on_error=False)
def simplify_user_id(user_id: str) -> str:
    """Simplify a User ID so that it only consists of word-like characters

    Each '@' is replaced by '_at_', angle brackets are removed and every remaining space or punctuation character of
    the set below is replaced by an underscore.

    Parameters
    ----------
    user_id: str
        The User ID to simplify

    Returns
    -------
    str
        The simplified User ID
    """
    special_characters = '[' + escape(r' !@#$%^&*()_-+=[]{}\|;:,.<>/?') + ']'

    without_at = user_id.replace('@', '_at_')
    without_brackets = sub('[<>]', '', without_at)
    return sub(special_characters, '_', without_brackets)
def convert(working_dir: Path, source: Path, target_dir: Optional[Path] = None) -> Path:
    """Convert a certificate file, or all certificates in a directory, and move the results to a target directory

    Parameters
    ----------
    working_dir: Path
        The directory in which the certificates are converted
    source: Path
        A certificate file, or a directory of which every entry is converted
    target_dir: Optional[Path]
        The directory to move the converted certificates to. It is created if it does not exist. If not provided, a
        new temporary directory (which is not cleaned up) is created and used.

    Returns
    -------
    Path
        The directory that the converted certificates have been moved to
    """
    certificates = source.iterdir() if source.is_dir() else [source]
    directories: List[Path] = [
        convert_certificate(working_dir, certificate, 'anthraxx') for certificate in certificates
    ]

    if target_dir:
        target_dir.mkdir(parents=True, exist_ok=True)
    else:
        # persistent target directory
        target_dir = Path(mkdtemp()).absolute()

    for directory in directories:
        target_dir.mkdir(exist_ok=True)
        move(directory, target_dir)

    return target_dir
def keyring_import(working_dir: Path, source: Path, target_dir: Optional[Path] = None):
    """Placeholder for the 'import' subcommand (not implemented, does nothing)

    Parameters
    ----------
    working_dir: Path
        Currently unused
    source: Path
        Currently unused
    target_dir: Optional[Path]
        Currently unused
    """
    return None
def absolute_path(path: str) -> Path:
    """Turn a path string into an absolute Path (used as argparse type)

    Parameters
    ----------
    path: str
        The path to convert

    Returns
    -------
    Path
        The absolute version of the given path
    """
    candidate = Path(path)
    return candidate.absolute()
# Command line entry point: parse the arguments and run the selected subcommand in a temporary working directory
if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Causes to print debugging messages about the progress')
    parser.add_argument('--wait', action='store_true', help='Block before cleaning up the temp directory')

    subcommands = parser.add_subparsers(dest="subcommand")

    convert_parser = subcommands.add_parser(
        'convert',
        help="convert a legacy-style PGP cert or directory containing PGP certs to the new format",
    )
    convert_parser.add_argument('source', type=absolute_path, help='File or directory to convert')
    convert_parser.add_argument('--target', type=absolute_path, help='target directory')

    import_parser = subcommands.add_parser('import')
    import_parser.add_argument('source', type=absolute_path, help='File or directory')
    import_parser.add_argument('--target', type=absolute_path, help='Target directory')

    args = parser.parse_args()

    if args.verbose:
        basicConfig(level=DEBUG)

    # temporary working directory that gets auto cleaned
    with TemporaryDirectory() as tempdir:
        working_dir = Path(tempdir)
        start_dir = Path().absolute()
        chdir(working_dir)
        try:
            debug(f'Working directory: {working_dir}')

            if 'convert' == args.subcommand:
                print(convert(working_dir, args.source, args.target))
            elif 'import' == args.subcommand:
                keyring_import(working_dir, args.source, args.target)

            if args.wait:
                print('Press [ENTER] to continue')
                input()
        finally:
            # leave the temporary directory again before it gets removed
            chdir(start_dir)