condorcore-keyring/keyringctl
David Runge 5320f2491e
keyringctl: Implement export of ownertrust/ revoker status
keyringctl:
Add `temp_join_keys()` to generically join PGP packets in a directory
below a temporary directory.
Add `get_all_and_revoked_certs()` to retrieve a tuple containing a list
of all public key fingerprints and a list of all self-revoked public key
fingerprints in a list of paths.
Add `export_ownertrust()` to export a list of fingerprints of
non-revoked public keys to a file that can be imported using `gpg
--import-ownertrust`.
Add `export_revoked()` to export the fingerprints of all self-revoked
public keys and the fingerprints of public keys that have been revoked
by third party signing keys (the latter is still fairly naive).
Change `export_keyring()` to make use of `temp_join_keys()` for
preparing main signing keys and general keys for the export to file. Add
integration for exporting ownertrust and revoker status (using
`export_ownertrust()` and `export_revoked()`, respectively).
Change `__main__` by extending the export_parser by a `-m`/ `--main`
argument to provide one or multiple files or directories, that serve as
the signing authority for key material located below `-s`/ `--source`.
Add a `-p`/`--pacman-integration` argument to provide the means to export
ownertrust and revoker status on demand.
2021-11-30 22:54:06 +01:00

942 lines
34 KiB
Python
Executable File

#!/usr/bin/env python
from argparse import ArgumentParser
from collections import defaultdict
from os import chdir
from os import getcwd
from pathlib import Path
from shutil import copytree
from re import escape
from re import split
from re import sub
from subprocess import CalledProcessError
from subprocess import check_output
from subprocess import PIPE
from sys import exit
from sys import stderr
from tempfile import TemporaryDirectory
from tempfile import mkdtemp
from logging import basicConfig
from logging import debug
from logging import DEBUG
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Iterable
from typing import Tuple
from contextlib import contextmanager
@contextmanager
def cwd(new_dir: Path):
    """Temporarily change the current working directory

    The working directory that was active when entering the context is restored when leaving it, also if the body
    of the `with` statement raises.

    Parameters
    ----------
    new_dir: Path
        The directory to change to for the duration of the context
    """

    original_dir = getcwd()
    chdir(new_dir)
    try:
        yield
    finally:
        # always return to where we started, even on error
        chdir(original_dir)
# class Key():
# fingerprint: str = ""
# pubkey: Path
# uids: List[]
# uid_certification: List[]
# subkeys: List[]
# subkey_certification: List[]
# uid_signatures: List[]
def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]:
    """Sort paths by their file name in natural (human) order

    Runs of digits in the name are compared numerically and all other text is compared case-insensitively, so that
    e.g. "2--UserID" sorts before "10--Signature".

    Parameters
    ----------
    _list: Iterable[Path]
        The paths to sort

    Returns
    -------
    Iterable[Path]
        A new sorted list of the input paths
    """

    def sort_key(path: Path) -> List[Any]:
        # splitting on a capturing group keeps the digit runs as their own chunks
        chunks = split('([0-9]+)', str(path.name))
        return [int(chunk) if chunk.isdigit() else chunk.lower() for chunk in chunks]

    return sorted(_list, key=sort_key)
def system(cmd: List[str], exit_on_error: bool = True) -> str:
    """Run a command and return its decoded standard output

    Parameters
    ----------
    cmd: List[str]
        The command and its arguments
    exit_on_error: bool
        Whether to exit the program with the command's return code if the command fails (defaults to True)

    Raises
    ------
    CalledProcessError
        If the command fails and exit_on_error is False

    Returns
    -------
    str
        The decoded standard output of the command
    """

    try:
        raw_output = check_output(cmd, stderr=PIPE)
    except CalledProcessError as e:
        # forward the captured error output of the failed command
        stderr.buffer.write(e.stderr)
        if exit_on_error:
            exit(e.returncode)
        raise e

    return raw_output.decode()
def convert_certificate(working_dir: Path, certificate: Path, name_override: Optional[str] = None) -> Path:
    """Convert a single PGP certificate file to the decomposed directory structure

    The certificate is split into its packets. Each packet is assigned to the component it follows (the root key, a
    User ID or a subkey) and the grouped packets are then persisted below
    working_dir/<username>/<certificate fingerprint>.

    Parameters
    ----------
    working_dir: Path
        The path of the working directory below which packets are split and the output structure is created
    certificate: Path
        The path to a file containing one PGP certificate
    name_override: Optional[str]
        The username to use for the output directory (defaults to None, in which case the part of the certificate's
        file name before the first dot is used)

    Raises
    ------
    Exception
        If a packet or signature of unknown type is encountered, if a subkey or signature packet is found before the
        Public-Key packet, or if the certificate contains no Public-Key packet

    Returns
    -------
    Path
        The path of the user directory (working_dir/<username>) below which the key material is persisted
    """

    certificate_fingerprint: Optional[str] = None
    pubkey: Optional[Path] = None
    direct_sigs: Dict[str, Dict[str, List[Path]]] = {}
    direct_revocations: Dict[str, Dict[str, List[Path]]] = {}
    current_packet_mode: Optional[str] = None
    current_packet_key: Optional[str] = None
    uids: Dict[str, Path] = {}
    uid_binding_sigs: Dict[str, Path] = {}
    subkeys: Dict[str, Dict[str, Path]] = {}
    subkey_binding_sigs: Dict[str, Dict[str, Path]] = {}
    subkey_revocations: Dict[str, Dict[str, Path]] = {}
    certifications: Dict[str, List[Path]] = defaultdict(list)
    revocations: Dict[str, List[Path]] = defaultdict(list)
    username = name_override or certificate.name.split(".")[0]

    def add_packet_to_direct_sigs(
        direct_sigs: Dict[str, Dict[str, List[Path]]],
        issuer: str,
        packet_key: str,
        packet: Path,
    ) -> Dict[str, Dict[str, List[Path]]]:
        """Add a packet to the set of DirectKeys

        If no key with the given packet_key exists yet, it is created.

        Parameters
        ----------
        direct_sigs: Dict[str, Dict[str, List[Path]]]
            The signatures directly on a root key (such as DirectKey or *Certifications without a specific User ID)
        issuer: str
            The issuer of the signature
        packet_key: str
            The key identifying the packet (e.g. its Fingerprint)
        packet: Path
            The path to the packet

        Returns
        -------
        Dict[str, Dict[str, List[Path]]]
            The signatures directly on a root key, including the added packet
        """

        if not direct_sigs.get(packet_key):
            direct_sigs = direct_sigs | {packet_key: defaultdict(list)}

        direct_sigs[packet_key][issuer].append(packet)
        return direct_sigs

    # XXX: PrimaryKeyBinding

    # TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean

    debug(f'Processing certificate {certificate}')

    for packet in packet_split(working_dir=working_dir, certificate=certificate):
        debug(f'Processing packet {packet.name}')
        if packet.name.endswith('--PublicKey'):
            pubkey = packet
            certificate_fingerprint = packet_dump_field(packet, 'Fingerprint')
            current_packet_mode = 'pubkey'
            current_packet_key = certificate_fingerprint
        elif packet.name.endswith('--UserID'):
            value = simplify_user_id(packet_dump_field(packet, 'Value'))
            current_packet_mode = 'uid'
            current_packet_key = value
            uids[value] = packet
        elif packet.name.endswith('--PublicSubkey'):
            fingerprint = packet_dump_field(packet, 'Fingerprint')
            current_packet_mode = 'subkey'
            current_packet_key = fingerprint

            if not certificate_fingerprint:
                raise Exception(f'missing certificate fingerprint for "{packet.name}"')

            subkeys.setdefault(certificate_fingerprint, {})[fingerprint] = packet
        elif packet.name.endswith('--Signature'):
            if not certificate_fingerprint:
                raise Exception(f'missing certificate fingerprint for "{packet.name}"')
            if not current_packet_key:
                raise Exception(f'missing current packet key for "{packet.name}"')

            issuer = packet_dump_field(packet, 'Issuer')
            signature_type = packet_dump_field(packet, 'Type')

            # a signature belongs to the component (root key, User ID or subkey) that precedes it
            if current_packet_mode == 'pubkey':
                if signature_type == 'KeyRevocation' and certificate_fingerprint.endswith(issuer):
                    direct_revocations = add_packet_to_direct_sigs(
                        direct_sigs=direct_revocations,
                        issuer=issuer,
                        packet_key=current_packet_key,
                        packet=packet,
                    )
                elif signature_type in ['DirectKey', 'GenericCertification']:
                    direct_sigs = add_packet_to_direct_sigs(
                        direct_sigs=direct_sigs,
                        issuer=issuer,
                        packet_key=current_packet_key,
                        packet=packet,
                    )
                else:
                    raise Exception(f'unknown signature type: {signature_type}')
            elif current_packet_mode == 'uid':
                if signature_type == 'CertificationRevocation':
                    revocations[current_packet_key].append(packet)
                elif signature_type == 'PositiveCertification' and certificate_fingerprint.endswith(issuer):
                    # self-issued PositiveCertification: binds the User ID to the root key
                    uid_binding_sigs[current_packet_key] = packet
                elif signature_type.endswith('Certification'):
                    certifications[current_packet_key].append(packet)
                else:
                    raise Exception(f'unknown signature type: {signature_type}')
            elif current_packet_mode == 'subkey':
                # fingerprint still refers to the most recently seen PublicSubkey packet
                if signature_type == 'SubkeyBinding':
                    subkey_binding_sigs.setdefault(certificate_fingerprint, {})[fingerprint] = packet
                elif signature_type == 'SubkeyRevocation':
                    subkey_revocations.setdefault(certificate_fingerprint, {})[fingerprint] = packet
                else:
                    raise Exception(f'unknown signature type: {signature_type}')
            else:
                raise Exception(f'unknown signature root for "{packet.name}"')
        else:
            raise Exception(f'unknown packet type "{packet.name}"')

    if not certificate_fingerprint:
        raise Exception('missing certificate fingerprint')

    if not pubkey:
        raise Exception('missing certificate public-key')

    user_dir = (working_dir / username)
    key_dir = (user_dir / certificate_fingerprint)
    key_dir.mkdir(parents=True, exist_ok=True)

    persist_public_key(
        certificate_fingerprint=certificate_fingerprint,
        pubkey=pubkey,
        key_dir=key_dir,
    )

    persist_uids(
        key_dir=key_dir,
        uid_binding_sigs=uid_binding_sigs,
        uids=uids,
    )

    persist_subkeys(
        certificate_fingerprint=certificate_fingerprint,
        key_dir=key_dir,
        subkeys=subkeys,
        subkey_binding_sigs=subkey_binding_sigs,
    )

    persist_subkey_revocations(
        certificate_fingerprint=certificate_fingerprint,
        key_dir=key_dir,
        subkey_revocations=subkey_revocations,
    )

    persist_direct_sigs(
        direct_sigs=direct_sigs,
        pubkey=pubkey,
        key_dir=key_dir,
    )

    persist_direct_sigs(
        direct_sigs=direct_revocations,
        pubkey=pubkey,
        key_dir=key_dir,
        sig_type="revocation",
    )

    persist_certifications(
        certifications=certifications,
        pubkey=pubkey,
        key_dir=key_dir,
        uid_binding_sig=uid_binding_sigs,
        uids=uids,
    )

    persist_revocations(
        pubkey=pubkey,
        revocations=revocations,
        key_dir=key_dir,
        uid_binding_sig=uid_binding_sigs,
        uids=uids,
    )

    return user_dir
def persist_public_key(
    certificate_fingerprint: str,
    pubkey: Path,
    key_dir: Path,
) -> None:
    """Write the Public-Key packet of a root key to its own file

    The packet is written to a file named after the certificate fingerprint, directly below key_dir.

    Parameters
    ----------
    certificate_fingerprint: str
        The unique fingerprint of the public key
    pubkey: Path
        The path to the public key of the root key
    key_dir: Path
        The root directory below which the basic key material is persisted
    """

    target = key_dir / f'{certificate_fingerprint}.asc'
    sources: List[Path] = [pubkey]
    debug(f'Writing file {target} from {[str(source) for source in sources]}')
    packet_join(sources, target)
def persist_uids(
    key_dir: Path,
    uid_binding_sigs: Dict[str, Path],
    uids: Dict[str, Path],
) -> None:
    """Write the User IDs of a root key to file(s)

    For every User ID that has a binding signature, the User ID packet and that binding signature are joined into
    one file. The files are written to a UID specific directory and file below key_dir/uids.

    Parameters
    ----------
    key_dir: Path
        The root directory below which the basic key material is persisted
    uid_binding_sigs: Dict[str, Path]
        The PositiveCertifications of a User ID and Public-Key packet
    uids: Dict[str, Path]
        The User IDs of a Public-Key (the root key)
    """

    for uid, binding_sig in uid_binding_sigs.items():
        sources = [uids[uid], binding_sig]
        target = key_dir / f'uids/{uid}/{uid}.asc'
        uid_dir = key_dir / Path(f'uids/{uid}')
        uid_dir.mkdir(parents=True, exist_ok=True)
        debug(f'Writing file {target} from {[str(source) for source in sources]}')
        packet_join(sources, target)
def persist_subkeys(
    certificate_fingerprint: str,
    key_dir: Path,
    subkeys: Dict[str, Dict[str, Path]],
    subkey_binding_sigs: Dict[str, Dict[str, Path]],
) -> None:
    """Write all Public-Subkeys of a root key, together with their SubkeyBinding, to file(s)

    Each subkey is written to key_dir/subkeys/<subkey fingerprint>/<subkey fingerprint>.asc. Nothing is written if
    no subkeys are registered for the given certificate fingerprint.

    Parameters
    ----------
    certificate_fingerprint: str
        The unique fingerprint of the public key
    key_dir: Path
        The root directory below which the basic key material is persisted
    subkeys: Dict[str, Dict[str, Path]]
        The PublicSubkeys of a key
    subkey_binding_sigs: Dict[str, Dict[str, Path]]
        The SubkeyBinding signatures of a Public-Key (the root key)
    """

    own_subkeys = subkeys.get(certificate_fingerprint)
    if not own_subkeys:
        return

    for subkey_fingerprint, subkey in own_subkeys.items():
        binding_sig = subkey_binding_sigs[certificate_fingerprint][subkey_fingerprint]
        sources: List[Path] = [subkey, binding_sig]
        target = key_dir / Path(f'subkeys/{subkey_fingerprint}/{subkey_fingerprint}.asc')
        debug(f'Writing file {target} from {[str(source) for source in sources]}')
        (key_dir / Path(f"subkeys/{subkey_fingerprint}")).mkdir(parents=True, exist_ok=True)
        packet_join(packets=sources, output=target)
def persist_subkey_revocations(
    certificate_fingerprint: str,
    key_dir: Path,
    subkey_revocations: Dict[str, Dict[str, Path]],
) -> None:
    """Write the SubkeyRevocations of all Public-Subkeys of a root key to file(s)

    Each revocation is written to key_dir/subkeys/<subkey fingerprint>/revocation/<issuer>.asc. Nothing is written
    if no revocations are registered for the given certificate fingerprint.

    Parameters
    ----------
    certificate_fingerprint: str
        The unique fingerprint of the public key
    key_dir: Path
        The root directory below which the basic key material is persisted
    subkey_revocations: Dict[str, Dict[str, Path]]
        The SubkeyRevocations of PublicSubkeys of a key
    """

    own_revocations = subkey_revocations.get(certificate_fingerprint)
    if not own_revocations:
        return

    for subkey_fingerprint, revocation in own_revocations.items():
        issuer = packet_dump_field(revocation, 'Issuer')
        target = (key_dir / Path(f'subkeys/{subkey_fingerprint}/revocation/{issuer}.asc'))
        debug(f'Writing file {target} from {revocation}')
        (key_dir / Path(f"subkeys/{subkey_fingerprint}/revocation")).mkdir(parents=True, exist_ok=True)
        packet_join(packets=[revocation], output=target)
def persist_direct_sigs(
    direct_sigs: Dict[str, Dict[str, List[Path]]],
    pubkey: Path,
    key_dir: Path,
    sig_type: str = "certification",
) -> None:
    """Write the signatures directly on a root key (such as DirectKeys or *Certifications without a User ID) to
    file(s)

    For every issuer, the public key and all of that issuer's signatures are joined into
    key_dir/<sig_type>/<issuer>.asc.

    Parameters
    ----------
    direct_sigs: Dict[str, Dict[str, List[Path]]]
        The direct signatures to write to file, grouped by packet key and then by issuer
    pubkey: Path
        The path to the public key of the root key
    key_dir: Path
        The root directory below which the Directkeys are persisted
    sig_type: str
        The type of direct certification to persist (defaults to 'certification'). This influences the directory name
    """

    for sigs_by_issuer in direct_sigs.values():
        for issuer, signatures in sigs_by_issuer.items():
            sig_dir = key_dir / sig_type
            sig_dir.mkdir(parents=True, exist_ok=True)
            sources = [pubkey, *signatures]
            target = sig_dir / f'{issuer}.asc'
            debug(f'Writing file {target} from {[str(signature) for signature in signatures]}')
            packet_join(sources, target)
def persist_certifications(
    certifications: Dict[str, List[Path]],
    pubkey: Path,
    key_dir: Path,
    uid_binding_sig: Dict[str, Path],
    uids: Dict[str, Path],
) -> None:
    """Write the certifications of a root key to file(s)

    The certifications include all CasualCertifications, GenericCertifications, PersonaCertifications and
    PositiveCertifications for all User IDs of the given root key.
    Every certification is joined with the public key, the User ID and the User ID's binding signature and written
    to key_dir/uids/<User ID>/certification/<issuer>.asc.

    Parameters
    ----------
    certifications: Dict[str, List[Path]]
        The certifications to write to file
    pubkey: Path
        The path to the public key of the root key
    key_dir: Path
        The root directory below which certifications are persisted
    uid_binding_sig: Dict[str, Path]
        The PositiveCertifications of a User ID and Public-Key packet
    uids: Dict[str, Path]
        The User IDs of a Public-Key (the root key)
    """

    for uid, uid_certifications in certifications.items():
        for certification in uid_certifications:
            target_dir = key_dir / Path("uids") / uid / 'certification'
            target_dir.mkdir(parents=True, exist_ok=True)
            issuer = packet_dump_field(certification, 'Issuer')
            sources = [pubkey, uids[uid], uid_binding_sig[uid], certification]
            target = target_dir / f'{issuer}.asc'
            debug(f'Writing file {target} from {certification}')
            packet_join(sources, target)
def persist_revocations(
    pubkey: Path,
    revocations: Dict[str, List[Path]],
    key_dir: Path,
    uid_binding_sig: Dict[str, Path],
    uids: Dict[str, Path],
) -> None:
    """Write the revocations of a root key to file(s)

    The revocations include all CertificationRevocations for all User IDs of the given root key.
    Every revocation is joined with the public key, the User ID and (if there is one) the User ID's binding
    signature and written to key_dir/uids/<User ID>/revocation/<issuer>.asc.

    Parameters
    ----------
    pubkey: Path
        The path to the public key of the root key
    revocations: Dict[str, List[Path]]
        The revocations to write to file
    key_dir: Path
        The root directory below which revocations will be persisted
    uid_binding_sig: Dict[str, Path]
        The PositiveCertifications of a User ID and Public-Key packet
    uids: Dict[str, Path]
        The User IDs of a Public-Key (the root key)
    """

    for uid, uid_revocations in revocations.items():
        for revocation in uid_revocations:
            target_dir = key_dir / Path("uids") / uid / 'revocation'
            target_dir.mkdir(parents=True, exist_ok=True)
            issuer = packet_dump_field(revocation, 'Issuer')

            # Binding sigs only exist for 3rd-party revocations
            binding = [uid_binding_sig[uid]] if uid in uid_binding_sig else []
            sources = [pubkey, uids[uid], *binding, revocation]

            target = target_dir / f'{issuer}.asc'
            debug(f'Writing file {target} from {revocation}')
            packet_join(sources, target)
def packet_dump(packet: Path) -> str:
    """Return the output of `sq packet dump` for a packet file

    Parameters
    ----------
    packet: Path
        The path to the PGP packet file to dump

    Returns
    -------
    str
        The textual dump of the packet
    """

    dump_cmd = ['sq', 'packet', 'dump', str(packet)]
    return system(dump_cmd)
def packet_dump_field(packet: Path, field: str) -> str:
    """Extract the value of a field from the dump of a packet

    The first line of the dump that (ignoring surrounding whitespace) starts with "<field>: " is used.

    Parameters
    ----------
    packet: Path
        The path to the PGP packet file
    field: str
        The name of the field to look up

    Raises
    ------
    Exception
        If no line of the dump starts with the field name

    Returns
    -------
    str
        The text following the first whitespace of the matching line
    """

    prefix = f'{field}: '
    for raw_line in packet_dump(packet).splitlines():
        line = raw_line.strip()
        if line.startswith(prefix):
            return line.split(maxsplit=1)[1]

    raise Exception(f'Packet has no field "{field}"')
def sanitize_certificate_file(working_dir: Path, certificate: Path) -> List[Path]:
    """Sanitize a certificate file, potentially containing several certificates

    If the input file holds several certificates, they are split into respective files below working_dir and their
    paths are returned. Else the path to the input certificate file is returned.

    Only complete armored blocks (from the BEGIN to the matching END marker) are extracted. Text between or after
    the blocks, such as trailing blank lines, never ends up as a certificate of its own, and a missing newline at
    the end of the file does not corrupt the last certificate.

    Parameters
    ----------
    working_dir: Path
        The path of the working directory below which to create split certificates
    certificate: Path
        The absolute path of a file containing one or several PGP certificates

    Returns
    -------
    List[Path]
        A list of paths that either contain the input certificate or several split certificates
    """

    begin_cert = "-----BEGIN PGP PUBLIC KEY BLOCK-----"
    end_cert = "-----END PGP PUBLIC KEY BLOCK-----"

    with open(file=certificate, mode="r") as certificate_file:
        content = certificate_file.read()

    # collect every complete BEGIN..END block, normalized to end with exactly one newline
    armored_certs: List[str] = []
    position = 0
    while True:
        start = content.find(begin_cert, position)
        if start == -1:
            break
        end = content.find(end_cert, start)
        if end == -1:
            # unterminated block: nothing more to extract
            break
        position = end + len(end_cert)
        armored_certs.append(f"{content[start:position]}\n")

    if len(armored_certs) < 2:
        return [certificate]

    debug(f"Several public keys detected in file: {certificate}")
    certificate_list: List[Path] = []
    for split_cert in armored_certs:
        # each split certificate keeps the original file name, in a directory of its own
        split_cert_dir = Path(mkdtemp(dir=working_dir)).absolute()
        split_cert_path = split_cert_dir / certificate.name
        with open(file=split_cert_path, mode="w") as split_cert_file:
            split_cert_file.write(split_cert)
        debug(f"Writing split cert to file: {split_cert_path}")
        certificate_list.append(split_cert_path)

    return certificate_list
def packet_split(working_dir: Path, certificate: Path) -> Iterable[Path]:
    """Split a file containing a PGP certificate into separate packet files

    The files are split using sq, into a fresh temporary directory that is created below working_dir.

    Parameters
    ----------
    working_dir: Path
        The path of the working directory below which to create the output files
    certificate: Path
        The absolute path of a file containing one PGP certificate

    Returns
    -------
    Iterable[Path]
        An iterable over the naturally sorted list of packet files derived from certificate
    """

    split_dir = Path(mkdtemp(dir=working_dir)).absolute()
    split_cmd = ['sq', 'packet', 'split', '--prefix', '', str(certificate)]

    # sq is run from inside the fresh directory, which is where the packet files are collected from afterwards
    with cwd(split_dir):
        system(split_cmd)

    packet_files = split_dir.iterdir()
    return natural_sort_path(packet_files)
def packet_join(packets: List[Path], output: Path, force: bool = False) -> None:
    """Join PGP packet data in files to a single output file

    The files are joined using sq. A failing sq invocation raises instead of exiting the program.

    Parameters
    ----------
    packets: List[Path]
        A list of paths to files that contain PGP packet data
    output: Path
        A file to which all PGP packet data is written
    force: bool
        Whether to force the execution of sq (defaults to False)
    """

    # --force is a global option and therefore goes right after the program name
    cmd = ['sq', '--force', 'packet', 'join'] if force else ['sq', 'packet', 'join']
    cmd += [str(packet) for packet in packets]
    cmd += ['--output', str(output)]
    system(cmd, exit_on_error=False)
def simplify_user_id(user_id: str) -> str:
    """Turn a User ID into a string that is usable as a file or directory name

    Every '@' becomes '_at_', angle brackets are dropped and each remaining blank or punctuation character is
    replaced by an underscore.

    Parameters
    ----------
    user_id: str
        The User ID to simplify

    Returns
    -------
    str
        The simplified User ID
    """

    punctuation = '[' + escape(r' !@#$%^&*()_-+=[]{}\|;:,.<>/?') + ']'

    with_at_spelled_out = user_id.replace('@', '_at_')
    without_brackets = sub('[<>]', '', with_at_spelled_out)
    return sub(punctuation, '_', without_brackets)
def convert(
    working_dir: Path,
    source: Path,
    target_dir: Path,
    name_override: Optional[str] = None,
) -> Path:
    """Convert a certificate file, or all certificate files in a directory, to the decomposed directory structure

    Every input file is first split into single certificates. Each certificate is converted below working_dir and
    the resulting user directories are then copied to target_dir.

    Parameters
    ----------
    working_dir: Path
        A directory to use for temporary files
    source: Path
        A certificate file, or a directory whose entries are certificate files
    target_dir: Path
        The directory below which the converted user directories are placed
    name_override: Optional[str]
        The username to use instead of the one derived from the file name (defaults to None)

    Returns
    -------
    Path
        The target directory
    """

    certificate_files = source.iterdir() if source.is_dir() else [source]

    user_dirs: List[Path] = []
    for certificate_file in certificate_files:
        for sane_cert in sanitize_certificate_file(working_dir=working_dir, certificate=certificate_file):
            user_dir = convert_certificate(
                working_dir=working_dir,
                certificate=sane_cert,
                name_override=name_override,
            )
            user_dirs.append(user_dir)

    for user_dir in user_dirs:
        destination = target_dir / user_dir.name
        destination.mkdir(exist_ok=True)
        # merge into whatever a previous certificate of the same user already put there
        copytree(src=user_dir, dst=destination, dirs_exist_ok=True)

    return target_dir
def keyring_import(working_dir: Path, source: Path, target_dir: Optional[Path] = None):
    """Placeholder for importing key material (not implemented)

    The arguments are accepted but ignored; the function does nothing and returns None.

    Parameters
    ----------
    working_dir: Path
        A directory to use for temporary files
    source: Path
        The file or directory to import from
    target_dir: Optional[Path]
        The directory to import to (defaults to None)
    """

    return None
def temp_join_keys(sources: List[Path], temp_dir: Path, force: bool) -> List[Path]:
    """Temporarily join the key material of a given set of keys in a temporary location and return their paths

    For a source directory, every <source>/<user>/<certificate> directory is joined (from all .asc files found
    below it) into one numbered file in temp_dir. A source that is a regular file (and not a symlink) is passed
    through unchanged.

    Parameters
    ----------
    sources: List[Path]
        A list of paths below which PGP packets are found
    temp_dir: Path
        The temporary directory below which to join PGP keys
    force: bool
        Whether to force the joining of files

    Returns
    -------
    List[Path]
        The paths of the joined certificate files and of the passed through source files
    """

    certs: List[Path] = []

    for source_number, source in enumerate(sources):
        if not source.is_dir():
            if source.is_file() and not source.is_symlink():
                certs.append(source)
            continue

        for user_number, user_dir in enumerate(sorted(source.iterdir())):
            if not user_dir.is_dir():
                continue

            for user_cert_number, user_cert_dir in enumerate(sorted(user_dir.iterdir())):
                if not user_cert_dir.is_dir():
                    continue

                # the zero-padded counters keep the joined files in source/user/certificate order
                cert_path = temp_dir / f"{source_number:04d}-{user_number:04d}-{user_cert_number:04d}.asc"
                debug(f"Joining {user_dir.name}/{user_cert_dir.name} in {cert_path}.")
                packet_join(
                    packets=sorted(user_cert_dir.glob("**/*.asc")),
                    output=cert_path,
                    force=force,
                )
                certs.append(cert_path)

    return certs
def get_all_and_revoked_certs(certs: List[Path]) -> Tuple[List[str], List[str]]:
    """Get the fingerprints of all public keys and of all self-revoked public keys below a list of directories

    Every entry in a <collection>/<user>/ directory is treated as a certificate that is named after its
    fingerprint. A certificate counts as self-revoked if its revocation/ directory holds an .asc file whose name
    (without suffix) matches the end of the certificate's own fingerprint. Collections that are not directories
    are skipped.

    Parameters
    ----------
    certs: List[Path]
        The certificate collections (directories) to inspect

    Returns
    -------
    Tuple[List[str], List[str]]
        A tuple with the first item containing the fingerprints of all public keys and the second item containing the
        fingerprints of all self-revoked public keys
    """

    all_fingerprints: List[str] = []
    revoked_fingerprints: List[str] = []

    debug(f"Retrieving all and self-revoked certificates from {[str(cert_dir) for cert_dir in certs]}")

    for cert_collection in certs:
        if not cert_collection.is_dir():
            continue

        for user_dir in cert_collection.iterdir():
            if not user_dir.is_dir():
                continue

            for cert_dir in user_dir.iterdir():
                fingerprint = cert_dir.stem
                all_fingerprints.append(fingerprint)

                for revocation_cert in cert_dir.glob("revocation/*.asc"):
                    # the revocation file is named after its issuer: a match means the key revoked itself
                    if fingerprint.endswith(revocation_cert.stem):
                        debug(f"Revoking {fingerprint} due to self-revocation")
                        revoked_fingerprints.append(fingerprint)

    return (all_fingerprints, revoked_fingerprints)
def export_ownertrust(certs: List[Path], output: Path) -> Tuple[List[str], List[str]]:
    """Export ownertrust from a set of keys

    The output file format is compatible with `gpg --import-ownertrust` and lists the main fingerprint ID of all
    non-revoked keys as fully trusted.

    The exported file is used by pacman-key when importing a keyring (see
    https://man.archlinux.org/man/pacman-key.8#PROVIDING_A_KEYRING_FOR_IMPORT).

    Parameters
    ----------
    certs: List[Path]
        The certificates to trust
    output: Path
        The file path to write to

    Returns
    -------
    Tuple[List[str], List[str]]
        A tuple with the first item containing the fingerprints of all trusted (non-revoked) keys and the second
        item containing the fingerprints of all keys
    """

    all_certs, revoked_certs = get_all_and_revoked_certs(certs=certs)
    revoked = set(revoked_certs)
    trusted_certs = [cert for cert in all_certs if cert not in revoked]

    with open(file=output, mode="w") as ownertrust_file:
        for cert in trusted_certs:
            debug(f"Writing {cert} to {output}")
            ownertrust_file.write(f"{cert}:4:\n")

    return (trusted_certs, all_certs)
def export_revoked(certs: List[Path], main_keys: List[str], output: Path, min_revoker: int = 2) -> None:
    """Export the PGP revoked status from a set of keys

    The output file contains the fingerprints of all self-revoked keys and all keys for which at least min_revoker
    revocations by distinct main keys exist.

    The exported file is used by pacman-key when importing a keyring (see
    https://man.archlinux.org/man/pacman-key.8#PROVIDING_A_KEYRING_FOR_IMPORT).

    Parameters
    ----------
    certs: List[Path]
        A list of directories with keys to check for their revocation status
    main_keys: List[str]
        A list of strings representing the fingerprints of (current and/or revoked) main keys
    output: Path
        The file path to write to
    min_revoker: int
        The minimum amount of revocation certificates on a User ID from any main key to deem a public key as revoked
        (defaults to 2)
    """

    _, revoked_certs = get_all_and_revoked_certs(certs=certs)

    debug(f"Retrieving certificates revoked by main keys from {[str(cert_dir) for cert_dir in certs]}")
    foreign_revocations: Dict[str, List[str]] = {}

    for cert_collection in certs:
        if not cert_collection.is_dir():
            continue

        for user_dir in cert_collection.iterdir():
            if not user_dir.is_dir():
                continue

            for cert_dir in user_dir.iterdir():
                debug(f"Inspecting public key {cert_dir.name}")
                fingerprint = cert_dir.stem
                revokers: List[str] = []
                foreign_revocations[fingerprint] = revokers

                for revocation_cert in cert_dir.glob("uids/*/revocation/*.asc"):
                    # the revocation file is named after its issuer; only main key issuers are counted
                    revoker = revocation_cert.stem
                    revokers.extend(revoker for main_key in main_keys if main_key.endswith(revoker))

                # TODO: find a better (less naive) approach, as this would also match on public certificates,
                # where some UIDs are signed and others are revoked
                distinct_revokers = set(revokers)
                if len(distinct_revokers) >= min_revoker:
                    debug(f"Revoking {cert_dir.name} due to {distinct_revokers} being main key revocations")
                    revoked_certs.append(fingerprint)

    with open(file=output, mode="w") as revoked_certs_file:
        # the set drops fingerprints that are revoked for more than one reason
        for cert in set(revoked_certs):
            debug(f"Writing {cert} to {output}")
            revoked_certs_file.write(f"{cert}\n")
def export_keyring(
    working_dir: Path,
    main: List[Path],
    sources: List[Path],
    output: Path,
    force: bool,
    pacman_integration: bool,
) -> None:
    """Export all provided PGP packet files to a single output file

    If sources contains directories, any .asc files below them are considered.

    Parameters
    ----------
    working_dir: Path
        A directory to use for temporary files
    main: List[Path]
        A list of directories or files from which to read PGP packet information, that is considered as public keys
        that are used to sign those found in sources
    sources: List[Path]
        A list of directories or files from which to read PGP packet information
    output: Path
        An output file that all PGP packet data is written to
    force: bool
        Whether to force the execution of packet_join()
    pacman_integration: bool
        Whether to additionally export the ownertrust and the revoker status. They are written to files that are
        named like output (without a trailing '.gpg'), with '-trusted' and '-revoked' appended respectively
    """

    main = [source.absolute() for source in main]
    sources = [source.absolute() for source in sources]
    output = output.absolute()

    main_certs = temp_join_keys(sources=main, temp_dir=Path(mkdtemp(dir=working_dir)).absolute(), force=force)
    sources_certs = temp_join_keys(sources=sources, temp_dir=Path(mkdtemp(dir=working_dir)).absolute(), force=force)

    debug(
        f"Creating keyring {output} from {[str(source_dir) for source_dir in main]} "
        f"and {[str(source_dir) for source_dir in sources]}."
    )

    cmd = ['sq', 'keyring', 'merge', '-o', str(output)]
    if force:
        # --force is a global option and therefore goes right after the program name
        cmd.insert(1, '--force')
    cmd += [str(cert) for cert in sorted(main_certs)]
    cmd += [str(cert) for cert in sorted(sources_certs)]
    system(cmd, exit_on_error=False)

    if pacman_integration:
        # Only strip a trailing '.gpg': splitting on the first occurrence would truncate any path that contains
        # '.gpg' somewhere else (e.g. in a parent directory name)
        output_base = str(output).removesuffix('.gpg')

        _, all_main_keys = export_ownertrust(
            certs=main,
            output=Path(f"{output_base}-trusted"),
        )
        export_revoked(
            certs=main + sources,
            main_keys=all_main_keys,
            output=Path(f"{output_base}-revoked"),
        )
def absolute_path(path: str) -> Path:
    """Turn a path string into an absolute Path (used as argparse type)

    Parameters
    ----------
    path: str
        The path to convert

    Returns
    -------
    Path
        The absolute version of the path
    """

    relative_or_absolute = Path(path)
    return relative_or_absolute.absolute()
# Command line entry point: parse the arguments and dispatch to the selected subcommand.
if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Causes to print debugging messages about the progress')
    parser.add_argument('--wait', action='store_true', help='Block before cleaning up the temp directory')
    parser.add_argument(
        '-f',
        '--force',
        action='store_true',
        default=False,
        help='force the execution of subcommands (e.g. overwriting of files)',
    )
    subcommands = parser.add_subparsers(dest="subcommand")

    convert_parser = subcommands.add_parser(
        'convert',
        help="convert a legacy-style PGP cert or directory containing PGP certs to the new format",
    )
    convert_parser.add_argument('source', type=absolute_path, help='File or directory to convert')
    convert_parser.add_argument('--target', type=absolute_path, help='target directory')
    convert_parser.add_argument(
        '--name',
        type=str,
        default=None,
        help='override the username to use (only useful when targetting a single file)',
    )

    import_parser = subcommands.add_parser('import')
    import_parser.add_argument('source', type=absolute_path, help='File or directory')
    import_parser.add_argument('--target', type=absolute_path, help='Target directory')

    export_parser = subcommands.add_parser(
        'export',
        help="export a directory structure of PGP packet data to a combined file",
    )
    export_parser.add_argument('output', type=absolute_path, help='file to write PGP packet data to')
    export_parser.add_argument(
        '-m',
        '--main',
        action="append",
        help='files or directories containing PGP packet data that is trusted (can be provided multiple times)',
        required=True,
        type=absolute_path,
    )
    export_parser.add_argument(
        '-s',
        '--source',
        action="append",
        help='files or directories containing PGP packet data (can be provided multiple times)',
        required=True,
        type=absolute_path,
    )
    export_parser.add_argument(
        '-p',
        '--pacman-integration',
        action='store_true',
        default=False,
        help='export trusted and revoked files (used by pacman) alongside the keyring',
    )

    args = parser.parse_args()

    if args.verbose:
        basicConfig(level=DEBUG)

    # temporary working directory that gets auto cleaned
    with TemporaryDirectory() as tempdir:
        working_dir = Path(tempdir)
        # all path arguments were made absolute while parsing, so changing directory does not affect them
        chdir(working_dir)
        debug(f'Working directory: {working_dir}')

        if args.subcommand in ["convert", "import"]:
            if args.target:
                args.target.mkdir(parents=True, exist_ok=True)
                target_dir = args.target
            else:
                # persistent target directory
                target_dir = Path(mkdtemp()).absolute()

        if 'convert' == args.subcommand:
            # hand over --name, which would otherwise be parsed but never take effect
            print(convert(working_dir, args.source, target_dir, name_override=args.name))
        elif 'import' == args.subcommand:
            keyring_import(working_dir, args.source, target_dir)
        elif 'export' == args.subcommand:
            export_keyring(
                working_dir=working_dir,
                main=args.main,
                sources=args.source,
                output=args.output,
                force=args.force,
                pacman_integration=args.pacman_integration,
            )

        # still inside the context, so the temporary directory exists while waiting
        if args.wait:
            print('Press [ENTER] to continue')
            input()