a5be572136
keyringctl: Change `convert_certificates()` to use a more descriptive `name_override` parameter in its signature to allow the overriding of the username directory name into which key material is persisted. Distinguish between the per-username directory and the eventual key material directory. Instead of the key directory return the username directory. Change the `persist*` functions to use the `key_dir` instead of the `root_dir` terminology as well. Change `convert()` to optionally allow a `name_override` as well and use that in the calls to `convert_certificate()`. Make the moving of files more robust, by at least allowing to move the per-key directories for a username, if the username target directory exists already. NOTE: This needs expansion for the use-case where existing files should be updated/extended by new files. Add an additional argument to the 'convert' argparse parser to allow users to override the target username directory name.
543 lines
19 KiB
Python
Executable File
543 lines
19 KiB
Python
Executable File
#!/usr/bin/env python
|
|
from argparse import ArgumentParser
|
|
|
|
from collections import defaultdict
|
|
|
|
from os import chdir
|
|
from os import getcwd
|
|
|
|
from pathlib import Path
|
|
|
|
from shutil import move
|
|
|
|
from re import escape
|
|
from re import split
|
|
from re import sub
|
|
|
|
from subprocess import CalledProcessError
|
|
from subprocess import check_output
|
|
from subprocess import PIPE
|
|
|
|
from sys import exit
|
|
from sys import stderr
|
|
|
|
from tempfile import TemporaryDirectory
|
|
from tempfile import mkdtemp
|
|
|
|
from logging import basicConfig
|
|
from logging import debug
|
|
from logging import DEBUG
|
|
|
|
from typing import Any
|
|
from typing import Dict
|
|
from typing import List
|
|
from typing import Optional
|
|
from typing import Iterable
|
|
|
|
from contextlib import contextmanager
|
|
|
|
|
|
@contextmanager
|
|
def cwd(new_dir: Path):
|
|
previous_dir = getcwd()
|
|
chdir(new_dir)
|
|
try:
|
|
yield
|
|
finally:
|
|
chdir(previous_dir)
|
|
|
|
# class Key():
|
|
# fingerprint: str = ""
|
|
# pubkey: Path
|
|
# uids: List[]
|
|
# uid_certification: List[]
|
|
# subkeys: List[]
|
|
# subkey_certification: List[]
|
|
# uid_signatures: List[]
|
|
|
|
|
|
def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]:
|
|
def convert(text: str) -> Any:
|
|
return int(text) if text.isdigit() else text.lower()
|
|
|
|
def alphanum_key(key: Path) -> List[Any]:
|
|
return [convert(c) for c in split('([0-9]+)', str(key.name))]
|
|
|
|
return sorted(_list, key=alphanum_key)
|
|
|
|
|
|
def system(cmd: List[str], exit_on_error: bool = True) -> str:
|
|
try:
|
|
return check_output(cmd, stderr=PIPE).decode()
|
|
except CalledProcessError as e:
|
|
stderr.buffer.write(e.stderr)
|
|
if exit_on_error:
|
|
exit(e.returncode)
|
|
raise e
|
|
|
|
|
|
def convert_certificate(working_dir: Path, certificate: Path, name_override: Optional[str] = None) -> Path:
|
|
certificate_fingerprint: Optional[str] = None
|
|
pubkey: Optional[Path] = None
|
|
direct_sigs: Dict[str, Dict[str, List[Path]]] = {}
|
|
direct_revocations: Dict[str, Dict[str, List[Path]]] = {}
|
|
current_packet_mode: Optional[str] = None
|
|
current_packet_key: Optional[str] = None
|
|
uids: Dict[str, Path] = {}
|
|
uid_binding_sig: Dict[str, Path] = {}
|
|
subkey: Dict[str, Path] = {}
|
|
subkey_binding_sig: Dict[str, Path] = {}
|
|
certifications: Dict[str, List[Path]] = defaultdict(list)
|
|
revocations: Dict[str, List[Path]] = defaultdict(list)
|
|
username = name_override or certificate.name.split(".")[0]
|
|
|
|
def add_packet_to_direct_sigs(
|
|
direct_sigs: Dict[str, Dict[str, List[Path]]],
|
|
issuer: str,
|
|
packet_key: str,
|
|
packet: Path,
|
|
) -> Dict[str, Dict[str, List[Path]]]:
|
|
"""Add a packet to the set of DirectKeys
|
|
|
|
If no key with the given packet_key exists yet, it is created.
|
|
|
|
Parameters
|
|
----------
|
|
direct_sigs: Dict[str, Dict[str, List[Path]]]
|
|
The signatures directly on a root key (such as DirectKey or *Certifications without a specific User ID)
|
|
issuer: str
|
|
The issuer of the signature
|
|
packet: Path
|
|
The path to the packet
|
|
packet_key: str
|
|
The key identifying the packet (e.g. its Fingerprint)
|
|
"""
|
|
|
|
if not direct_sigs.get(packet_key):
|
|
direct_sigs = direct_sigs | {packet_key: defaultdict(list)}
|
|
|
|
direct_sigs[packet_key][issuer].append(packet)
|
|
return direct_sigs
|
|
|
|
# XXX: PrimaryKeyBinding
|
|
|
|
# TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean
|
|
|
|
debug(f'Processing certificate {certificate}')
|
|
|
|
for packet in packet_split(working_dir, certificate):
|
|
debug(f'Processing packet {packet.name}')
|
|
if packet.name.endswith('--PublicKey'):
|
|
pubkey = packet
|
|
certificate_fingerprint = packet_dump_field(packet, 'Fingerprint')
|
|
current_packet_mode = 'pubkey'
|
|
current_packet_key = certificate_fingerprint
|
|
elif packet.name.endswith('--UserID'):
|
|
value = packet_dump_field(packet, 'Value')
|
|
value = simplify_user_id(value)
|
|
current_packet_mode = 'uid'
|
|
current_packet_key = value
|
|
uids[value] = packet
|
|
elif packet.name.endswith('--PublicSubkey'):
|
|
fingerprint = packet_dump_field(packet, 'Fingerprint')
|
|
current_packet_mode = 'subkey'
|
|
current_packet_key = fingerprint
|
|
subkey[fingerprint] = packet
|
|
elif packet.name.endswith('--Signature'):
|
|
if not certificate_fingerprint:
|
|
raise Exception('missing certificate fingerprint for "{packet.name}"')
|
|
if not current_packet_key:
|
|
raise Exception('missing current packet key for "{packet.name}"')
|
|
|
|
issuer = packet_dump_field(packet, 'Issuer')
|
|
signature_type = packet_dump_field(packet, 'Type')
|
|
|
|
if signature_type == 'DirectKey':
|
|
direct_sigs = add_packet_to_direct_sigs(
|
|
direct_sigs=direct_sigs,
|
|
issuer=issuer,
|
|
packet_key=current_packet_key,
|
|
packet=packet,
|
|
)
|
|
continue
|
|
|
|
if not current_packet_key:
|
|
# TODO GenericCertification PersonaCertification CasualCertification PositiveCertification
|
|
raise Exception(f'unknown packet key for "{packet.name}"')
|
|
|
|
if current_packet_mode == 'uid' or current_packet_mode == 'pubkey':
|
|
if certificate_fingerprint.endswith(issuer):
|
|
if signature_type == 'PositiveCertification':
|
|
uid_binding_sig[current_packet_key] = packet
|
|
elif signature_type == 'CertificationRevocation':
|
|
# XXX:
|
|
revocations[current_packet_key].append(packet)
|
|
elif signature_type == 'KeyRevocation':
|
|
direct_revocations = add_packet_to_direct_sigs(
|
|
direct_sigs=direct_revocations,
|
|
issuer=issuer,
|
|
packet_key=current_packet_key,
|
|
packet=packet,
|
|
)
|
|
else:
|
|
raise Exception(f'unknown signature type: {signature_type}')
|
|
else:
|
|
if signature_type.endswith('Certification'):
|
|
# NOTE: here we are only considering signatures directly on the root key
|
|
# signatures on a User ID, that are not tied to it via a SubkeyBinding are not addressed
|
|
if current_packet_key not in uids:
|
|
direct_sigs = add_packet_to_direct_sigs(
|
|
direct_sigs=direct_sigs,
|
|
issuer=issuer,
|
|
packet_key=current_packet_key,
|
|
packet=packet,
|
|
)
|
|
# NOTE: here we address all signatures on User IDs (those that are tied to it with a
|
|
# SubkeyBinding and those that are not)
|
|
else:
|
|
certifications[current_packet_key].append(packet)
|
|
elif signature_type == 'CertificationRevocation':
|
|
revocations[current_packet_key].append(packet)
|
|
else:
|
|
raise Exception(f'unknown signature type: {signature_type}')
|
|
elif current_packet_mode == 'subkey':
|
|
if signature_type == 'SubkeyBinding':
|
|
subkey_binding_sig[current_packet_key] = packet
|
|
elif signature_type == 'SubkeyRevocation':
|
|
# XXX:
|
|
pass
|
|
else:
|
|
raise Exception(f'unknown signature type: {signature_type}')
|
|
else:
|
|
raise Exception(f'unknown signature root for "{packet.name}"')
|
|
else:
|
|
raise Exception(f'unknown packet type "{packet.name}"')
|
|
|
|
if not certificate_fingerprint:
|
|
raise Exception('missing certificate fingerprint')
|
|
|
|
if not pubkey:
|
|
raise Exception('missing certificate public-key')
|
|
|
|
user_dir = (working_dir / username)
|
|
key_dir = (user_dir / certificate_fingerprint)
|
|
key_dir.mkdir(parents=True)
|
|
|
|
persist_basic_key(
|
|
certificate_fingerprint=certificate_fingerprint,
|
|
pubkey=pubkey,
|
|
key_dir=key_dir,
|
|
subkey=subkey,
|
|
subkey_binding_sig=subkey_binding_sig,
|
|
uid_binding_sig=uid_binding_sig,
|
|
uids=uids,
|
|
)
|
|
|
|
persist_direct_sigs(
|
|
direct_sigs=direct_sigs,
|
|
pubkey=pubkey,
|
|
key_dir=key_dir,
|
|
)
|
|
|
|
persist_direct_sigs(
|
|
direct_sigs=direct_revocations,
|
|
pubkey=pubkey,
|
|
key_dir=key_dir,
|
|
sig_type="revocation",
|
|
)
|
|
|
|
persist_certifications(
|
|
certifications=certifications,
|
|
pubkey=pubkey,
|
|
key_dir=key_dir,
|
|
uid_binding_sig=uid_binding_sig,
|
|
uids=uids,
|
|
)
|
|
|
|
persist_revocations(
|
|
pubkey=pubkey,
|
|
revocations=revocations,
|
|
key_dir=key_dir,
|
|
uid_binding_sig=uid_binding_sig,
|
|
uids=uids,
|
|
)
|
|
|
|
return user_dir
|
|
|
|
|
|
def persist_basic_key(
|
|
certificate_fingerprint: str,
|
|
pubkey: Path,
|
|
key_dir: Path,
|
|
subkey: Dict[str, Path],
|
|
subkey_binding_sig: Dict[str, Path],
|
|
uid_binding_sig: Dict[str, Path],
|
|
uids: Dict[str, Path],
|
|
) -> None:
|
|
"""Persist the basic key material of a root key to file
|
|
|
|
The basic key material consists of the root key's public key, any PublicSubkeys and their SubkeyBindings, all User
|
|
IDs and the per User ID PositiveCertifications.
|
|
The file is written to key_dir and is named after the root key's certificate fingerprint.
|
|
|
|
Parameters
|
|
----------
|
|
certificate_fingerprint: str
|
|
The unique fingerprint of the public key
|
|
pubkey: Path
|
|
The path to the public key of the root key
|
|
key_dir: Path
|
|
The root directory below which the basic key material is persisted
|
|
subkey: Dict[str, Path]
|
|
The PublicSubkeys of a key
|
|
subkey_binding_sig: Dict[str, Path]
|
|
The SubkeyBinding signatures of a Public-Key (the root key)
|
|
uid_binding_sig: Dict[str, Path]
|
|
The PositiveCertifications of a User ID and Public-Key packet
|
|
uids: Dict[str, Path]
|
|
The User IDs of a Public-Key (the root key)
|
|
"""
|
|
|
|
packets: List[Path] = [pubkey]
|
|
for key in uid_binding_sig.keys():
|
|
packets.extend([uids[key], uid_binding_sig[key]])
|
|
for key in subkey_binding_sig.keys():
|
|
packets.extend([subkey[key], subkey_binding_sig[key]])
|
|
|
|
packet_join(packets, key_dir / f'{certificate_fingerprint}.asc')
|
|
|
|
|
|
def persist_direct_sigs(
|
|
direct_sigs: Dict[str, Dict[str, List[Path]]],
|
|
pubkey: Path,
|
|
key_dir: Path,
|
|
sig_type: str = "certification",
|
|
) -> None:
|
|
"""Persist the signatures directly on a root key (such as DirectKeys or *Certifications without a User ID) to
|
|
file(s)
|
|
|
|
Parameters
|
|
----------
|
|
certifications: Dict[str, List[Path]]
|
|
The certifications to write to file
|
|
pubkey: Path
|
|
The path to the public key of the root key
|
|
key_dir: Path
|
|
The root directory below which the Directkeys are persisted
|
|
sig_type: str
|
|
The type of direct certification to persist (defaults to 'certification'). This influences the directory name
|
|
"""
|
|
|
|
for key, current_certifications in direct_sigs.items():
|
|
for issuer, certifications in current_certifications.items():
|
|
direct_key_dir = key_dir / sig_type
|
|
direct_key_dir.mkdir(parents=True, exist_ok=True)
|
|
packets = [pubkey] + certifications
|
|
output_file = direct_key_dir / f'{issuer}.asc'
|
|
debug(f'Writing file {output_file} from {[str(cert) for cert in certifications]}')
|
|
packet_join(packets, output_file)
|
|
|
|
|
|
def persist_certifications(
|
|
certifications: Dict[str, List[Path]],
|
|
pubkey: Path,
|
|
key_dir: Path,
|
|
uid_binding_sig: Dict[str, Path],
|
|
uids: Dict[str, Path],
|
|
) -> None:
|
|
"""Persist the certifications of a root key to file(s)
|
|
|
|
The certifications include all CasualCertifications, GenericCertifications, PersonaCertifications and
|
|
PositiveCertifications for all User IDs of the given root key.
|
|
All certifications are persisted in per User ID certification directories below key_dir.
|
|
|
|
Parameters
|
|
----------
|
|
certifications: Dict[str, List[Path]]
|
|
The certifications to write to file
|
|
pubkey: Path
|
|
The path to the public key of the root key
|
|
key_dir: Path
|
|
The root directory below which certifications are persisted
|
|
uid_binding_sig: Dict[str, Path]
|
|
The PositiveCertifications of a User ID and Public-Key packet
|
|
uids: Dict[str, Path]
|
|
The User IDs of a Public-Key (the root key)
|
|
"""
|
|
|
|
for key, current_certifications in certifications.items():
|
|
for certification in current_certifications:
|
|
certification_dir = key_dir / key / 'certification'
|
|
certification_dir.mkdir(parents=True, exist_ok=True)
|
|
issuer = packet_dump_field(certification, 'Issuer')
|
|
|
|
packets = [pubkey, uids[key], uid_binding_sig[key], certification]
|
|
output_file = certification_dir / f'{issuer}.asc'
|
|
debug(f'Writing file {output_file} from {certification}')
|
|
packet_join(packets, output_file)
|
|
|
|
|
|
def persist_revocations(
|
|
pubkey: Path,
|
|
revocations: Dict[str, List[Path]],
|
|
key_dir: Path,
|
|
uid_binding_sig: Dict[str, Path],
|
|
uids: Dict[str, Path],
|
|
) -> None:
|
|
"""Persist the revocations of a root key to file(s)
|
|
|
|
The revocations include all CertificationRevocations for all User IDs of the given root key.
|
|
All revocations are persisted in per User ID 'revocation' directories below key_dir.
|
|
|
|
Parameters
|
|
----------
|
|
pubkey: Path
|
|
The path to the public key of the root key
|
|
revocations: Dict[str, List[Path]]
|
|
The revocations to write to file
|
|
key_dir: Path
|
|
The root directory below which revocations will be persisted
|
|
uid_binding_sig: Dict[str, Path]
|
|
The PositiveCertifications of a User ID and Public-Key packet
|
|
uids: Dict[str, Path]
|
|
The User IDs of a Public-Key (the root key)
|
|
"""
|
|
|
|
for key, current_revocations in revocations.items():
|
|
for revocation in current_revocations:
|
|
revocation_dir = key_dir / key / 'revocation'
|
|
revocation_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
issuer = packet_dump_field(revocation, 'Issuer')
|
|
packets = [pubkey, uids[key]]
|
|
# Binding sigs only exist for 3rd-party revocations
|
|
if key in uid_binding_sig:
|
|
packets.append(uid_binding_sig[key])
|
|
packets.append(revocation)
|
|
output_file = revocation_dir / f'{issuer}.asc'
|
|
debug(f'Writing file {output_file} from {revocation}')
|
|
packet_join(packets, output_file)
|
|
|
|
|
|
def packet_dump(packet: Path) -> str:
|
|
return system(['sq', 'packet', 'dump', str(packet)])
|
|
|
|
|
|
def packet_dump_field(packet: Path, field: str) -> str:
|
|
dump = packet_dump(packet)
|
|
lines = [line.strip() for line in dump.splitlines()]
|
|
lines = list(filter(lambda line: line.startswith(f'{field}: '), lines))
|
|
if not lines:
|
|
raise Exception(f'Packet has no field "{field}"')
|
|
return lines[0].split(maxsplit=1)[1]
|
|
|
|
|
|
def packet_split(working_dir: Path, key: Path) -> Iterable[Path]:
|
|
packet_dir = Path(mkdtemp(dir=working_dir)).absolute()
|
|
with cwd(packet_dir):
|
|
system(['sq', 'packet', 'split', '--prefix', '', str(key)])
|
|
return natural_sort_path(packet_dir.iterdir())
|
|
|
|
|
|
def packet_join(packets: List[Path], output: Path) -> None:
|
|
cmd = ['sq', 'packet', 'join']
|
|
packets_str = list(map(lambda path: str(path), packets))
|
|
cmd.extend(packets_str)
|
|
cmd.extend(['--output', str(output)])
|
|
system(cmd, exit_on_error=False)
|
|
|
|
|
|
def simplify_user_id(user_id: str) -> str:
|
|
user_id = user_id.replace('@', '_at_')
|
|
user_id = sub('[<>]', '', user_id)
|
|
user_id = sub('[' + escape(r' !@#$%^&*()_-+=[]{}\|;:,.<>/?') + ']', '_', user_id)
|
|
return user_id
|
|
|
|
|
|
def convert(
|
|
working_dir: Path,
|
|
source: Path,
|
|
target_dir: Optional[Path] = None,
|
|
name_override: Optional[str] = None,
|
|
) -> Path:
|
|
directories: List[Path] = []
|
|
if source.is_dir():
|
|
for key in source.iterdir():
|
|
directories.append(
|
|
convert_certificate(working_dir=working_dir, certificate=key, name_override=name_override)
|
|
)
|
|
else:
|
|
directories.append(
|
|
convert_certificate(working_dir=working_dir, certificate=source, name_override=name_override)
|
|
)
|
|
|
|
if target_dir:
|
|
target_dir.mkdir(parents=True, exist_ok=True)
|
|
else:
|
|
# persistent target directory
|
|
target_dir = Path(mkdtemp()).absolute()
|
|
|
|
for path in directories:
|
|
target_dir.mkdir(exist_ok=True)
|
|
if (target_dir / path.name).exists():
|
|
for key_dir in path.iterdir():
|
|
move(key_dir, target_dir / path.name)
|
|
else:
|
|
move(path, target_dir)
|
|
|
|
return target_dir
|
|
|
|
|
|
def keyring_import(working_dir: Path, source: Path, target_dir: Optional[Path] = None):
|
|
pass
|
|
|
|
|
|
def absolute_path(path: str) -> Path:
|
|
return Path(path).absolute()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
parser = ArgumentParser()
|
|
parser.add_argument('-v', '--verbose', action='store_true',
|
|
help='Causes to print debugging messages about the progress')
|
|
parser.add_argument('--wait', action='store_true', help='Block before cleaning up the temp directory')
|
|
subcommands = parser.add_subparsers(dest="subcommand")
|
|
|
|
convert_parser = subcommands.add_parser(
|
|
'convert',
|
|
help="convert a legacy-style PGP cert or directory containing PGP certs to the new format",
|
|
)
|
|
convert_parser.add_argument('source', type=absolute_path, help='File or directory to convert')
|
|
convert_parser.add_argument('--target', type=absolute_path, help='target directory')
|
|
convert_parser.add_argument(
|
|
'--name',
|
|
type=str,
|
|
default=None,
|
|
help='override the username to use (only useful when targetting a single file)',
|
|
)
|
|
|
|
import_parser = subcommands.add_parser('import')
|
|
import_parser.add_argument('source', type=absolute_path, help='File or directory')
|
|
import_parser.add_argument('--target', type=absolute_path, help='Target directory')
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.verbose:
|
|
basicConfig(level=DEBUG)
|
|
|
|
# temporary working directory that gets auto cleaned
|
|
with TemporaryDirectory() as tempdir:
|
|
working_dir = Path(tempdir)
|
|
start_dir = Path().absolute()
|
|
chdir(working_dir)
|
|
debug(f'Working directory: {working_dir}')
|
|
|
|
if 'convert' == args.subcommand:
|
|
print(convert(working_dir, args.source, args.target))
|
|
elif 'import' == args.subcommand:
|
|
keyring_import(working_dir, args.source, args.target)
|
|
|
|
if args.wait:
|
|
print('Press [ENTER] to continue')
|
|
input()
|