7f7c2f13f0
keyringctl: Add `sanitize_certificate_file()` to potentially split per-user input files that contain more than one certificate. Change `packet_split()` to add documentation and rename the key parameter to certificate, as it is more generic. Change `convert_certificate()` to use named parameters when calling `packet_split()`. Change `convert()` to call `convert_certificate()` on a list of sanitized certificates (generated using `sanitized_certificate_file()`) to be able to deal with multi-certificate files per user.
599 lines
21 KiB
Python
Executable File
599 lines
21 KiB
Python
Executable File
#!/usr/bin/env python
|
|
from argparse import ArgumentParser
|
|
|
|
from collections import defaultdict
|
|
|
|
from os import chdir
|
|
from os import getcwd
|
|
|
|
from pathlib import Path
|
|
|
|
from shutil import copytree
|
|
|
|
from re import escape
|
|
from re import split
|
|
from re import sub
|
|
|
|
from subprocess import CalledProcessError
|
|
from subprocess import check_output
|
|
from subprocess import PIPE
|
|
|
|
from sys import exit
|
|
from sys import stderr
|
|
|
|
from tempfile import TemporaryDirectory
|
|
from tempfile import mkdtemp
|
|
|
|
from logging import basicConfig
|
|
from logging import debug
|
|
from logging import DEBUG
|
|
|
|
from typing import Any
|
|
from typing import Dict
|
|
from typing import List
|
|
from typing import Optional
|
|
from typing import Iterable
|
|
|
|
from contextlib import contextmanager
|
|
|
|
|
|
@contextmanager
def cwd(new_dir: Path):
    """Temporarily change the current working directory

    The working directory that was active on entry is restored when the context is left, also if the body raises.

    Parameters
    ----------
    new_dir: Path
        The directory to change into for the duration of the context
    """

    origin = getcwd()
    chdir(new_dir)
    try:
        yield
    finally:
        # always return to where we came from, even on errors in the body
        chdir(origin)
|
|
|
|
# class Key():
|
|
# fingerprint: str = ""
|
|
# pubkey: Path
|
|
# uids: List[]
|
|
# uid_certification: List[]
|
|
# subkeys: List[]
|
|
# subkey_certification: List[]
|
|
# uid_signatures: List[]
|
|
|
|
|
|
def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]:
    """Sort paths by their file name in natural (human) order

    The name of each path is cut into runs of digits and runs of other characters. Digit runs are compared as
    integers, everything else is compared case-insensitively.

    Parameters
    ----------
    _list: Iterable[Path]
        The paths to sort

    Returns
    -------
    Iterable[Path]
        A new sorted list of the given paths
    """

    def natural_key(path: Path) -> List[Any]:
        chunks = split('([0-9]+)', str(path.name))
        return [int(chunk) if chunk.isdigit() else chunk.lower() for chunk in chunks]

    return sorted(_list, key=natural_key)
|
|
|
|
|
|
def system(cmd: List[str], exit_on_error: bool = True) -> str:
    """Run a command and return its decoded standard output

    Parameters
    ----------
    cmd: List[str]
        The command and its arguments
    exit_on_error: bool
        Whether to exit the process with the command's return code if the command fails (defaults to True)

    Returns
    -------
    str
        The decoded standard output of the command

    Raises
    ------
    CalledProcessError
        If the command fails and exit_on_error is False
    """

    try:
        output = check_output(cmd, stderr=PIPE)
    except CalledProcessError as error:
        # forward whatever the failed command printed on stderr
        stderr.buffer.write(error.stderr)
        if exit_on_error:
            exit(error.returncode)
        raise
    return output.decode()
|
|
|
|
|
|
def convert_certificate(working_dir: Path, certificate: Path, name_override: Optional[str] = None) -> Path:
    """Convert a single PGP certificate file into the per-user directory structure

    The certificate is split into its packets. Each packet is classified (PublicKey, UserID, PublicSubkey or
    Signature) and the collected packets are then persisted below working_dir/<username>/<certificate fingerprint>.

    Parameters
    ----------
    working_dir: Path
        The path of the working directory below which the output is created
    certificate: Path
        The path of a file containing one PGP certificate
    name_override: Optional[str]
        The username to use. If unset, the part of the certificate file name before the first dot is used

    Returns
    -------
    Path
        The user directory (working_dir/<username>) below which the certificate has been persisted

    Raises
    ------
    Exception
        If a packet or signature of unknown type is encountered, if a signature appears before any key or User ID
        packet, or if the certificate contains no public key
    FileExistsError
        If the directory for the certificate fingerprint already exists below the user directory
    """

    certificate_fingerprint: Optional[str] = None
    pubkey: Optional[Path] = None
    direct_sigs: Dict[str, Dict[str, List[Path]]] = {}
    direct_revocations: Dict[str, Dict[str, List[Path]]] = {}
    current_packet_mode: Optional[str] = None
    current_packet_key: Optional[str] = None
    uids: Dict[str, Path] = {}
    uid_binding_sig: Dict[str, Path] = {}
    subkey: Dict[str, Path] = {}
    subkey_binding_sig: Dict[str, Path] = {}
    certifications: Dict[str, List[Path]] = defaultdict(list)
    revocations: Dict[str, List[Path]] = defaultdict(list)
    username = name_override or certificate.name.split(".")[0]

    def add_packet_to_direct_sigs(
        direct_sigs: Dict[str, Dict[str, List[Path]]],
        issuer: str,
        packet_key: str,
        packet: Path,
    ) -> Dict[str, Dict[str, List[Path]]]:
        """Add a packet to the set of DirectKeys

        If no key with the given packet_key exists yet, it is created.

        Parameters
        ----------
        direct_sigs: Dict[str, Dict[str, List[Path]]]
            The signatures directly on a root key (such as DirectKey or *Certifications without a specific User ID)
        issuer: str
            The issuer of the signature
        packet_key: str
            The key identifying the packet (e.g. its Fingerprint)
        packet: Path
            The path to the packet

        Returns
        -------
        Dict[str, Dict[str, List[Path]]]
            The direct signatures including the added packet
        """

        if not direct_sigs.get(packet_key):
            direct_sigs = direct_sigs | {packet_key: defaultdict(list)}

        direct_sigs[packet_key][issuer].append(packet)
        return direct_sigs

    # XXX: PrimaryKeyBinding

    # TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean

    debug(f'Processing certificate {certificate}')

    for packet in packet_split(working_dir=working_dir, certificate=certificate):
        debug(f'Processing packet {packet.name}')
        if packet.name.endswith('--PublicKey'):
            pubkey = packet
            certificate_fingerprint = packet_dump_field(packet, 'Fingerprint')
            current_packet_mode = 'pubkey'
            current_packet_key = certificate_fingerprint
        elif packet.name.endswith('--UserID'):
            value = packet_dump_field(packet, 'Value')
            value = simplify_user_id(value)
            current_packet_mode = 'uid'
            current_packet_key = value
            uids[value] = packet
        elif packet.name.endswith('--PublicSubkey'):
            fingerprint = packet_dump_field(packet, 'Fingerprint')
            current_packet_mode = 'subkey'
            current_packet_key = fingerprint
            subkey[fingerprint] = packet
        elif packet.name.endswith('--Signature'):
            # a signature always refers to the most recently seen key or User ID packet
            if not certificate_fingerprint:
                raise Exception(f'missing certificate fingerprint for "{packet.name}"')
            if not current_packet_key:
                # TODO GenericCertification PersonaCertification CasualCertification PositiveCertification
                raise Exception(f'missing current packet key for "{packet.name}"')

            issuer = packet_dump_field(packet, 'Issuer')
            signature_type = packet_dump_field(packet, 'Type')

            if signature_type == 'DirectKey':
                direct_sigs = add_packet_to_direct_sigs(
                    direct_sigs=direct_sigs,
                    issuer=issuer,
                    packet_key=current_packet_key,
                    packet=packet,
                )
                continue

            if current_packet_mode == 'uid' or current_packet_mode == 'pubkey':
                if certificate_fingerprint.endswith(issuer):
                    # signature issued by the certificate itself
                    if signature_type == 'PositiveCertification':
                        uid_binding_sig[current_packet_key] = packet
                    elif signature_type == 'CertificationRevocation':
                        # XXX:
                        revocations[current_packet_key].append(packet)
                    elif signature_type == 'KeyRevocation':
                        direct_revocations = add_packet_to_direct_sigs(
                            direct_sigs=direct_revocations,
                            issuer=issuer,
                            packet_key=current_packet_key,
                            packet=packet,
                        )
                    else:
                        raise Exception(f'unknown signature type: {signature_type}')
                else:
                    # signature issued by another certificate
                    if signature_type.endswith('Certification'):
                        # NOTE: here we are only considering signatures directly on the root key
                        # signatures on a User ID, that are not tied to it via a SubkeyBinding are not addressed
                        if current_packet_key not in uids:
                            direct_sigs = add_packet_to_direct_sigs(
                                direct_sigs=direct_sigs,
                                issuer=issuer,
                                packet_key=current_packet_key,
                                packet=packet,
                            )
                        # NOTE: here we address all signatures on User IDs (those that are tied to it with a
                        # SubkeyBinding and those that are not)
                        else:
                            certifications[current_packet_key].append(packet)
                    elif signature_type == 'CertificationRevocation':
                        revocations[current_packet_key].append(packet)
                    else:
                        raise Exception(f'unknown signature type: {signature_type}')
            elif current_packet_mode == 'subkey':
                if signature_type == 'SubkeyBinding':
                    subkey_binding_sig[current_packet_key] = packet
                elif signature_type == 'SubkeyRevocation':
                    # XXX:
                    pass
                else:
                    raise Exception(f'unknown signature type: {signature_type}')
            else:
                raise Exception(f'unknown signature root for "{packet.name}"')
        else:
            raise Exception(f'unknown packet type "{packet.name}"')

    if not certificate_fingerprint:
        raise Exception('missing certificate fingerprint')

    if not pubkey:
        raise Exception('missing certificate public-key')

    user_dir = (working_dir / username)
    key_dir = (user_dir / certificate_fingerprint)
    key_dir.mkdir(parents=True)

    persist_basic_key(
        certificate_fingerprint=certificate_fingerprint,
        pubkey=pubkey,
        key_dir=key_dir,
        subkey=subkey,
        subkey_binding_sig=subkey_binding_sig,
        uid_binding_sig=uid_binding_sig,
        uids=uids,
    )

    persist_direct_sigs(
        direct_sigs=direct_sigs,
        pubkey=pubkey,
        key_dir=key_dir,
    )

    persist_direct_sigs(
        direct_sigs=direct_revocations,
        pubkey=pubkey,
        key_dir=key_dir,
        sig_type="revocation",
    )

    persist_certifications(
        certifications=certifications,
        pubkey=pubkey,
        key_dir=key_dir,
        uid_binding_sig=uid_binding_sig,
        uids=uids,
    )

    persist_revocations(
        pubkey=pubkey,
        revocations=revocations,
        key_dir=key_dir,
        uid_binding_sig=uid_binding_sig,
        uids=uids,
    )

    return user_dir
|
|
|
|
|
|
def persist_basic_key(
    certificate_fingerprint: str,
    pubkey: Path,
    key_dir: Path,
    subkey: Dict[str, Path],
    subkey_binding_sig: Dict[str, Path],
    uid_binding_sig: Dict[str, Path],
    uids: Dict[str, Path],
) -> None:
    """Write the basic key material of a root key to a single file

    The file starts with the root key's public key, followed by each User ID that has a binding signature (User ID
    packet, then its signature) and each PublicSubkey that has a binding signature (subkey packet, then its
    signature). It is created in key_dir and named <certificate_fingerprint>.asc.

    Parameters
    ----------
    certificate_fingerprint: str
        The unique fingerprint of the public key
    pubkey: Path
        The path to the public key of the root key
    key_dir: Path
        The root directory below which the basic key material is persisted
    subkey: Dict[str, Path]
        The PublicSubkeys of a key
    subkey_binding_sig: Dict[str, Path]
        The SubkeyBinding signatures of a Public-Key (the root key)
    uid_binding_sig: Dict[str, Path]
        The PositiveCertifications of a User ID and Public-Key packet
    uids: Dict[str, Path]
        The User IDs of a Public-Key (the root key)
    """

    packets: List[Path] = [pubkey]

    # only User IDs and subkeys that carry a binding signature are written
    for uid, binding_sig in uid_binding_sig.items():
        packets += [uids[uid], binding_sig]
    for fingerprint, binding_sig in subkey_binding_sig.items():
        packets += [subkey[fingerprint], binding_sig]

    output_file = key_dir / f'{certificate_fingerprint}.asc'
    packet_join(packets, output_file)
|
|
|
|
|
|
def persist_direct_sigs(
    direct_sigs: Dict[str, Dict[str, List[Path]]],
    pubkey: Path,
    key_dir: Path,
    sig_type: str = "certification",
) -> None:
    """Persist the signatures directly on a root key (such as DirectKeys or *Certifications without a User ID) to
    file(s)

    For every issuer one file named <issuer>.asc is written to key_dir/<sig_type>. It contains the public key
    followed by the signatures of that issuer.

    Parameters
    ----------
    direct_sigs: Dict[str, Dict[str, List[Path]]]
        The direct signatures to write to file, as a mapping of packet key to issuer to signature packets
    pubkey: Path
        The path to the public key of the root key
    key_dir: Path
        The root directory below which the Directkeys are persisted
    sig_type: str
        The type of direct certification to persist (defaults to 'certification'). This influences the directory name
    """

    direct_key_dir = key_dir / sig_type

    for signatures_by_issuer in direct_sigs.values():
        for issuer, signatures in signatures_by_issuer.items():
            # the directory is only created once there is something to write
            direct_key_dir.mkdir(parents=True, exist_ok=True)
            packets = [pubkey] + signatures
            output_file = direct_key_dir / f'{issuer}.asc'
            debug(f'Writing file {output_file} from {[str(cert) for cert in signatures]}')
            packet_join(packets, output_file)
|
|
|
|
|
|
def persist_certifications(
    certifications: Dict[str, List[Path]],
    pubkey: Path,
    key_dir: Path,
    uid_binding_sig: Dict[str, Path],
    uids: Dict[str, Path],
) -> None:
    """Write the certifications on the User IDs of a root key to file(s)

    Each certification ends up in key_dir/<User ID>/certification/<issuer>.asc, where the issuer is read from the
    certification packet itself. The file consists of the public key, the User ID packet, the User ID's binding
    signature and the certification.

    Parameters
    ----------
    certifications: Dict[str, List[Path]]
        The certifications to write to file, keyed by User ID
    pubkey: Path
        The path to the public key of the root key
    key_dir: Path
        The root directory below which certifications are persisted
    uid_binding_sig: Dict[str, Path]
        The PositiveCertifications of a User ID and Public-Key packet
    uids: Dict[str, Path]
        The User IDs of a Public-Key (the root key)
    """

    for uid, uid_certifications in certifications.items():
        certification_dir = key_dir / uid / 'certification'

        for certification in uid_certifications:
            certification_dir.mkdir(parents=True, exist_ok=True)
            issuer = packet_dump_field(certification, 'Issuer')

            packets = [pubkey, uids[uid], uid_binding_sig[uid], certification]
            output_file = certification_dir / f'{issuer}.asc'
            debug(f'Writing file {output_file} from {certification}')
            packet_join(packets, output_file)
|
|
|
|
|
|
def persist_revocations(
    pubkey: Path,
    revocations: Dict[str, List[Path]],
    key_dir: Path,
    uid_binding_sig: Dict[str, Path],
    uids: Dict[str, Path],
) -> None:
    """Write the revocations on the User IDs of a root key to file(s)

    Each revocation ends up in key_dir/<User ID>/revocation/<issuer>.asc, where the issuer is read from the
    revocation packet itself. The file consists of the public key, the User ID packet, the User ID's binding
    signature (if there is one) and the revocation.

    Parameters
    ----------
    pubkey: Path
        The path to the public key of the root key
    revocations: Dict[str, List[Path]]
        The revocations to write to file, keyed by User ID
    key_dir: Path
        The root directory below which revocations will be persisted
    uid_binding_sig: Dict[str, Path]
        The PositiveCertifications of a User ID and Public-Key packet
    uids: Dict[str, Path]
        The User IDs of a Public-Key (the root key)
    """

    for uid, uid_revocations in revocations.items():
        revocation_dir = key_dir / uid / 'revocation'

        for revocation in uid_revocations:
            revocation_dir.mkdir(parents=True, exist_ok=True)

            issuer = packet_dump_field(revocation, 'Issuer')
            uid_packet = uids[uid]
            # Binding sigs only exist for 3rd-party revocations
            binding = [uid_binding_sig[uid]] if uid in uid_binding_sig else []
            packets = [pubkey, uid_packet, *binding, revocation]
            output_file = revocation_dir / f'{issuer}.asc'
            debug(f'Writing file {output_file} from {revocation}')
            packet_join(packets, output_file)
|
|
|
|
|
|
def packet_dump(packet: Path) -> str:
    """Return the output of `sq packet dump` for a packet file

    Parameters
    ----------
    packet: Path
        The path to the packet file to dump

    Returns
    -------
    str
        The standard output of the sq call
    """

    dump_command = ['sq', 'packet', 'dump', str(packet)]
    return system(dump_command)
|
|
|
|
|
|
def packet_dump_field(packet: Path, field: str) -> str:
    """Return the value of a field in the dump of a packet file

    The dump is searched line by line (ignoring surrounding whitespace) for the first line that starts with
    '<field>: '. Everything following that prefix is returned.

    Parameters
    ----------
    packet: Path
        The path to the packet file to inspect
    field: str
        The name of the field to look up (may itself contain whitespace)

    Returns
    -------
    str
        The value of the first occurrence of the field

    Raises
    ------
    Exception
        If the dump of the packet contains no such field
    """

    prefix = f'{field}: '
    for line in packet_dump(packet).splitlines():
        line = line.strip()
        if line.startswith(prefix):
            # cut off the matched prefix instead of splitting on the first whitespace, so that field names
            # containing spaces yield their actual value
            return line[len(prefix):].lstrip()
    raise Exception(f'Packet has no field "{field}"')
|
|
|
|
|
|
def sanitize_certificate_file(working_dir: Path, certificate: Path) -> List[Path]:
    """Sanitize a certificate file, potentially containing several certificates

    If the input file holds several ASCII-armored certificates, each armored block is written to its own file (named
    like the input file) in a fresh temporary directory below working_dir and the paths of these files are returned.
    Else the path to the input certificate file is returned.

    Parameters
    ----------
    working_dir: Path
        The path of the working directory below which to create split certificates
    certificate: Path
        The absolute path of a file containing one or several PGP certificates

    Returns
    -------
    List[Path]
        A list of paths that either contain the input certificate or several split certificates
    """

    # imported locally, as the module only pulls escape, split and sub from re
    from re import DOTALL
    from re import findall

    # the markers contain no regular expression metacharacters, so they can be used in the pattern verbatim
    armored_block = rb"-----BEGIN PGP PUBLIC KEY BLOCK-----.*?-----END PGP PUBLIC KEY BLOCK-----"

    # Read bytes so that files which are not valid text do not fail here.
    # Matching whole blocks (instead of splitting on the end marker plus newline) copes with trailing blank lines,
    # a missing final newline and CRLF line endings.
    blocks = findall(armored_block, certificate.read_bytes(), flags=DOTALL)
    if len(blocks) < 2:
        return [certificate]

    debug(f"Several public keys detected in file: {certificate}")
    certificate_list: List[Path] = []
    for block in blocks:
        # every split certificate gets its own directory so that all of them can keep the original file name
        split_cert_dir = Path(mkdtemp(dir=working_dir)).absolute()
        split_cert_path = split_cert_dir / certificate.name
        debug(f"Writing split cert to file: {split_cert_path}")
        split_cert_path.write_bytes(block + b"\n")
        certificate_list.append(split_cert_path)
    return certificate_list
|
|
|
|
|
|
def packet_split(working_dir: Path, certificate: Path) -> Iterable[Path]:
    """Split a file containing a PGP certificate into separate packet files

    `sq packet split` is run with an empty prefix from within a fresh temporary directory below working_dir. The
    entries of that directory are returned afterwards.

    Parameters
    ----------
    working_dir: Path
        The path of the working directory below which to create the output files
    certificate: Path
        The absolute path of a file containing one PGP certificate

    Returns
    -------
    Iterable[Path]
        An iterable over the naturally sorted list of packet files derived from certificate
    """

    packet_dir = Path(mkdtemp(dir=working_dir)).absolute()
    split_command = ['sq', 'packet', 'split', '--prefix', '', str(certificate)]

    # sq is started from within packet_dir - presumably it writes the packet files to its working directory
    with cwd(packet_dir):
        system(split_command)

    return natural_sort_path(packet_dir.iterdir())
|
|
|
|
|
|
def packet_join(packets: List[Path], output: Path) -> None:
    """Join several packet files into one file using `sq packet join`

    A failing sq call is not turned into a process exit, it raises instead.

    Parameters
    ----------
    packets: List[Path]
        The packet files to join, in the order in which they are passed to sq
    output: Path
        The path of the file to write
    """

    cmd = ['sq', 'packet', 'join', *(str(packet) for packet in packets), '--output', str(output)]
    system(cmd, exit_on_error=False)
|
|
|
|
|
|
def simplify_user_id(user_id: str) -> str:
    """Turn a User ID into a simplified string

    Every '@' becomes '_at_', angle brackets are dropped and each remaining whitespace or punctuation character of a
    fixed set is replaced by an underscore.

    Parameters
    ----------
    user_id: str
        The User ID to simplify

    Returns
    -------
    str
        The simplified User ID
    """

    replaced_by_underscore = '[' + escape(r' !@#$%^&*()_-+=[]{}\|;:,.<>/?') + ']'
    # '@' and the angle brackets have to be handled before the generic underscore replacement
    without_brackets = sub('[<>]', '', user_id.replace('@', '_at_'))
    return sub(replaced_by_underscore, '_', without_brackets)
|
|
|
|
|
|
def convert(
    working_dir: Path,
    source: Path,
    target_dir: Path,
    name_override: Optional[str] = None,
) -> Path:
    """Convert a certificate file or a directory of certificate files and copy the result to target_dir

    Each input file is sanitized first (files holding several certificates are split), then every resulting
    certificate is converted below working_dir. Finally all resulting user directories are copied into target_dir,
    merging with directories that already exist there.

    Parameters
    ----------
    working_dir: Path
        The path of the working directory below which intermediate files are created
    source: Path
        A certificate file or a directory whose entries are certificate files
    target_dir: Path
        The directory into which the converted user directories are copied
    name_override: Optional[str]
        The username to use for all converted certificates instead of the one derived from the file name

    Returns
    -------
    Path
        The target directory
    """

    certificate_files = source.iterdir() if source.is_dir() else [source]

    directories: List[Path] = [
        convert_certificate(working_dir=working_dir, certificate=sane_cert, name_override=name_override)
        for certificate_file in certificate_files
        for sane_cert in sanitize_certificate_file(working_dir=working_dir, certificate=certificate_file)
    ]

    for user_dir in directories:
        destination = target_dir / user_dir.name
        destination.mkdir(exist_ok=True)
        copytree(src=user_dir, dst=destination, dirs_exist_ok=True)

    return target_dir
|
|
|
|
|
|
def keyring_import(working_dir: Path, source: Path, target_dir: Optional[Path] = None):
    """Placeholder for the 'import' subcommand, which is not implemented: all arguments are ignored"""
|
|
|
|
|
|
def absolute_path(path: str) -> Path:
    """Turn a path string into an absolute Path (used as argparse type)

    Parameters
    ----------
    path: str
        The path as given on the command line

    Returns
    -------
    Path
        The absolute version of path
    """

    given_path = Path(path)
    return given_path.absolute()
|
|
|
|
|
|
if __name__ == '__main__':
    # command line entry point: parse the arguments, prepare target and working directory, run the subcommand
    parser = ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Causes to print debugging messages about the progress')
    parser.add_argument('--wait', action='store_true', help='Block before cleaning up the temp directory')
    # a subcommand is mandatory: 'source' and 'target' are only defined by the subcommand parsers, so accessing
    # them below without a subcommand would fail with an AttributeError
    subcommands = parser.add_subparsers(dest="subcommand", required=True)

    convert_parser = subcommands.add_parser(
        'convert',
        help="convert a legacy-style PGP cert or directory containing PGP certs to the new format",
    )
    convert_parser.add_argument('source', type=absolute_path, help='File or directory to convert')
    convert_parser.add_argument('--target', type=absolute_path, help='target directory')
    convert_parser.add_argument(
        '--name',
        type=str,
        default=None,
        help='override the username to use (only useful when targetting a single file)',
    )

    import_parser = subcommands.add_parser('import')
    import_parser.add_argument('source', type=absolute_path, help='File or directory')
    import_parser.add_argument('--target', type=absolute_path, help='Target directory')

    args = parser.parse_args()

    if args.verbose:
        basicConfig(level=DEBUG)

    if args.target:
        args.target.mkdir(parents=True, exist_ok=True)
        target_dir = args.target
    else:
        # persistent target directory
        target_dir = Path(mkdtemp()).absolute()

    # temporary working directory that gets auto cleaned
    with TemporaryDirectory() as tempdir:
        working_dir = Path(tempdir)
        # leave the working directory again before it gets removed
        with cwd(working_dir):
            debug(f'Working directory: {working_dir}')

            if 'convert' == args.subcommand:
                # --name only exists for the convert subcommand and has to be handed on explicitly
                print(convert(working_dir, args.source, target_dir, name_override=args.name))
            elif 'import' == args.subcommand:
                keyring_import(working_dir, args.source, target_dir)

            if args.wait:
                print('Press [ENTER] to continue')
                input()
|