condorcore-keyring/keyringctl

1419 lines
48 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env python
#
# SPDX-License-Identifier: GPL-3.0-or-later
from argparse import ArgumentParser
from collections import defaultdict
from collections.abc import Iterable
from collections.abc import Iterator
from contextlib import contextmanager
from itertools import chain
from logging import DEBUG
from logging import basicConfig
from logging import debug
from logging import error
from os import chdir
from os import getcwd
from pathlib import Path
from re import escape
from re import split
from re import sub
from shutil import copytree
from subprocess import PIPE
from subprocess import CalledProcessError
from subprocess import check_output
from sys import exit
from sys import stderr
from tempfile import TemporaryDirectory
from tempfile import mkdtemp
from tempfile import mkstemp
from traceback import print_stack
from typing import Dict
from typing import List
from typing import NewType
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union
# Distinct string types, so that type checkers can tell the different kinds of identifiers apart
# A PGP key fingerprint (as read from the "Fingerprint" field of a packet dump or from a directory name)
Fingerprint = NewType("Fingerprint", str)
# A User ID string (see simplify_user_id() for the filesystem friendly representation)
Uid = NewType("Uid", str)
# The name of the per-user directory that certificates are stored below
Username = NewType("Username", str)
@contextmanager
def cwd(new_dir: Path) -> Iterator[None]:
    """Temporarily switch the current working directory of the process

    The directory that was current when entering the context is restored when the context is left, also if the body
    of the context raises.

    Parameters
    ----------
    new_dir: Path
        The directory to make current for the duration of the context
    """

    origin = getcwd()
    chdir(new_dir)
    try:
        yield
    finally:
        # always go back to where we came from
        chdir(origin)
def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]:
    """Sort an Iterable of Paths naturally

    Only the final component of each path (`Path.name`) is used for sorting. Runs of ASCII digits are compared as
    integers and everything else is compared case-insensitively, so that e.g. "2--UserID" sorts before
    "10--Signature".

    Parameters
    ----------
    _list: Iterable[Path]
        An iterable containing paths to be sorted

    Return
    ------
    Iterable[Path]
        An Iterable of paths that are naturally sorted
    """

    def convert_text_chunk(text: str) -> Union[int, str]:
        """Convert input text to int or str

        Parameters
        ----------
        text: str
            An input string

        Returns
        -------
        Union[int, str]
            Either an integer if text consists of ASCII digits only, else text in lower-case representation
        """

        # The split pattern in alphanum_key() only isolates ASCII digits. str.isdigit() alone also accepts other
        # Unicode digits (e.g. superscripts), which int() can not parse or which would place an int where all other
        # keys carry a str. Restricting to ASCII keeps the keys comparable.
        return int(text) if text.isascii() and text.isdigit() else text.lower()

    def alphanum_key(key: Path) -> List[Union[int, str]]:
        """Retrieve an alphanumeric key from a Path, that can be used in sorted()

        Parameters
        ----------
        key: Path
            A path for which to create a key

        Returns
        -------
        List[Union[int, str]]
            A list of either int or str objects that may serve as 'key' argument for sorted()
        """

        # splitting on a capturing group yields alternating text and digit chunks, always starting with text
        return [convert_text_chunk(c) for c in split("([0-9]+)", str(key.name))]

    return sorted(_list, key=alphanum_key)
def system(cmd: List[str], exit_on_error: bool = False) -> str:
    """Run a command and return what it wrote to stdout

    The command is run using check_output. If it fails, its stderr output and the current stack are written to
    stderr before the error is handed on.

    Parameters
    ----------
    cmd: List[str]
        The command and its arguments, as passed to check_output
    exit_on_error: bool
        Whether to exit the script with the return code of the command when it fails (defaults to False)

    Raises
    ------
    CalledProcessError
        If not exit_on_error and `check_output()` encounters an error

    Returns
    -------
    str
        The decoded stdout of cmd
    """

    try:
        raw_output = check_output(cmd, stderr=PIPE)
    except CalledProcessError as process_error:
        # surface the captured stderr of the failed command together with our own call stack
        stderr.buffer.write(process_error.stderr)
        print_stack()
        if not exit_on_error:
            raise process_error
        exit(process_error.returncode)
    return raw_output.decode()
def get_cert_paths(paths: Iterable[Path]) -> Set[Path]:
    """Collect the certificate directories found at or below a list of paths

    Each path is searched depth first. A directory that directly contains at least one `*.asc` file is considered a
    certificate directory: it is collected and not descended into any further.

    Parameters
    ----------
    paths: Iterable[Path]
        The paths at which to start searching

    Returns
    -------
    Set[Path]
        The set of directories that contain certificate files
    """

    found: Set[Path] = set()
    pending: List[Path] = list(paths)

    while pending:
        current = pending.pop()
        if any(current.glob("*.asc")):
            # this level contains a certificate, no need to look any deeper
            found.add(current)
        else:
            pending.extend(child for child in current.iterdir() if child.is_dir())

    return found
# TODO: simplify to lower complexity
def convert_certificate(  # noqa: ignore=C901
    working_dir: Path,
    certificate: Path,
    keyring_dir: Path,
    name_override: Optional[Username] = None,
    fingerprint_filter: Optional[Set[Fingerprint]] = None,
) -> Path:
    """Convert a single file public key certificate into a decomposed directory structure of multiple PGP packets

    The output directory structure is created per user. The username is taken from `name_override` if provided,
    else it is derived from the certificate via `derive_username_from_fingerprint`, else the stem of the certificate
    file name is used.
    Below the username directory a directory tree describes the public keys components split up into certifications
    and revocations, as well as per subkey and per uid certifications and revocations.

    Third-party certifications on User IDs are only kept if their issuer matches an entry of `fingerprint_filter`.
    Without a filter none of them are kept.

    Parameters
    ----------
    working_dir: Path
        The path of the working directory below which to create split certificates
    certificate: Path
        The path to a public key certificate
    keyring_dir: Path
        The path of the keyring used to try to derive the username from the public key fingerprint
    name_override: Optional[Username]
        An optional string to override the username in the to be created output directory structure
    fingerprint_filter: Optional[Set[Fingerprint]]
        An optional set of strings defining fingerprints of PGP public keys that all certificates will be filtered
        with

    Raises
    ------
    Exception
        If required PGP packets are not found or if an unknown packet or signature type is encountered

    Returns
    -------
    Path
        The path of the user_dir (which is located below working_dir)
    """

    # root packets
    certificate_fingerprint: Optional[Fingerprint] = None
    pubkey: Optional[Path] = None
    direct_sigs: Dict[Fingerprint, List[Path]] = defaultdict(list)
    direct_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list)

    # subkey packets
    subkeys: Dict[Fingerprint, Path] = {}
    subkey_binding_sigs: Dict[Fingerprint, Path] = {}
    subkey_revocations: Dict[Fingerprint, Path] = {}

    # uid packets
    uids: Dict[Uid, Path] = {}
    uid_binding_sigs: Dict[Uid, Path] = {}
    certifications: Dict[Uid, List[Path]] = defaultdict(list)
    revocations: Dict[Uid, List[Path]] = defaultdict(list)

    # intermediate variables: they track the packet that the signatures following it belong to
    current_packet_mode: Optional[str] = None
    current_packet_fingerprint: Optional[Fingerprint] = None
    current_packet_uid: Optional[Uid] = None

    # XXX: PrimaryKeyBinding

    # TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean

    debug(f"Processing certificate {certificate}")

    for packet in packet_split(working_dir=working_dir, certificate=certificate):
        debug(f"Processing packet {packet.name}")
        if packet.name.endswith("--PublicKey"):
            current_packet_mode = "pubkey"
            current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint"))
            current_packet_uid = None

            certificate_fingerprint = current_packet_fingerprint
            pubkey = packet
        elif packet.name.endswith("--UserID"):
            current_packet_mode = "uid"
            current_packet_fingerprint = None
            current_packet_uid = simplify_user_id(Uid(packet_dump_field(packet, "Value")))

            uids[current_packet_uid] = packet
        elif packet.name.endswith("--PublicSubkey"):
            current_packet_mode = "subkey"
            current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint"))
            current_packet_uid = None

            subkeys[current_packet_fingerprint] = packet
        elif packet.name.endswith("--Signature"):
            if not certificate_fingerprint:
                raise Exception(f'missing certificate fingerprint for "{packet.name}"')

            issuer: Fingerprint = Fingerprint(packet_dump_field(packet, "Issuer"))
            signature_type = packet_dump_field(packet, "Type")

            if current_packet_mode == "pubkey":
                if not current_packet_fingerprint:
                    raise Exception(f'missing current packet fingerprint for "{packet.name}"')

                # the issuer may be shorter than a full fingerprint, hence the suffix comparison
                if signature_type == "KeyRevocation" and certificate_fingerprint.endswith(issuer):
                    direct_revocations[issuer].append(packet)
                elif signature_type in ["DirectKey", "GenericCertification"]:
                    direct_sigs[issuer].append(packet)
                else:
                    raise Exception(f"unknown signature type: {signature_type}")
            elif current_packet_mode == "uid":
                if not current_packet_uid:
                    raise Exception(f'missing current packet uid for "{packet.name}"')

                if signature_type == "CertificationRevocation":
                    revocations[current_packet_uid].append(packet)
                elif signature_type == "PositiveCertification" and certificate_fingerprint.endswith(issuer):
                    # a positive certification issued by the certificate itself binds the User ID to the key
                    uid_binding_sigs[current_packet_uid] = packet
                elif signature_type.endswith("Certification"):
                    if fingerprint_filter is not None and any(fp.endswith(issuer) for fp in fingerprint_filter):
                        debug(f"The certification by issuer {issuer} is appended as it is found in the filter.")
                        certifications[current_packet_uid].append(packet)
                    else:
                        debug(f"The certification by issuer {issuer} is not appended because it is not in the filter")
                else:
                    raise Exception(f"unknown signature type: {signature_type}")
            elif current_packet_mode == "subkey":
                if not current_packet_fingerprint:
                    raise Exception(f'missing current packet fingerprint for "{packet.name}"')

                if signature_type == "SubkeyBinding":
                    subkey_binding_sigs[current_packet_fingerprint] = packet
                elif signature_type == "SubkeyRevocation":
                    # key by the revoked subkey (not the root key), so that the revocation is persisted below the
                    # directory of that subkey and revocations of different subkeys do not overwrite each other
                    subkey_revocations[current_packet_fingerprint] = packet
                else:
                    raise Exception(f"unknown signature type: {signature_type}")
            else:
                raise Exception(f'unknown signature root for "{packet.name}"')
        else:
            raise Exception(f'unknown packet type "{packet.name}"')

    if not certificate_fingerprint:
        raise Exception("missing certificate fingerprint")

    if not pubkey:
        raise Exception("missing certificate public-key")

    name_override = (
        name_override
        or derive_username_from_fingerprint(keyring_dir=keyring_dir, certificate_fingerprint=certificate_fingerprint)
        or Username(certificate.stem)
    )

    user_dir = working_dir / name_override
    key_dir = user_dir / certificate_fingerprint
    key_dir.mkdir(parents=True, exist_ok=True)

    persist_public_key(
        certificate_fingerprint=certificate_fingerprint,
        pubkey=pubkey,
        key_dir=key_dir,
    )

    persist_uids(
        key_dir=key_dir,
        uid_binding_sigs=uid_binding_sigs,
        uids=uids,
    )

    persist_subkeys(
        key_dir=key_dir,
        subkeys=subkeys,
        subkey_binding_sigs=subkey_binding_sigs,
    )

    persist_subkey_revocations(
        key_dir=key_dir,
        subkey_revocations=subkey_revocations,
    )

    persist_direct_sigs(
        direct_sigs=direct_sigs,
        pubkey=pubkey,
        key_dir=key_dir,
    )

    persist_direct_sigs(
        direct_sigs=direct_revocations,
        pubkey=pubkey,
        key_dir=key_dir,
        sig_type="revocation",
    )

    persist_certifications(
        certifications=certifications,
        pubkey=pubkey,
        key_dir=key_dir,
        uid_binding_sig=uid_binding_sigs,
        uids=uids,
    )

    persist_revocations(
        pubkey=pubkey,
        revocations=revocations,
        key_dir=key_dir,
        uid_binding_sig=uid_binding_sigs,
        uids=uids,
    )

    return user_dir
def persist_public_key(
    certificate_fingerprint: Fingerprint,
    pubkey: Path,
    key_dir: Path,
) -> None:
    """Write the Public-Key packet of a root key to file

    The packet is written to a file named after the fingerprint directly below key_dir.

    Parameters
    ----------
    certificate_fingerprint: Fingerprint
        The unique fingerprint of the public key
    pubkey: Path
        The path to the public key of the root key
    key_dir: Path
        The root directory below which the basic key material is persisted
    """

    target = key_dir / f"{certificate_fingerprint}.asc"
    target.parent.mkdir(parents=True, exist_ok=True)

    sources: List[Path] = [pubkey]
    debug(f"Writing file {target} from {[str(source) for source in sources]}")
    packet_join(sources, target, force=True)
def persist_uids(
    key_dir: Path,
    uid_binding_sigs: Dict[Uid, Path],
    uids: Dict[Uid, Path],
) -> None:
    """Write the User IDs of a root key and their binding signatures to file(s)

    For each User ID that has a binding signature, the User ID packet and that signature are joined into one file.
    The files are written to a UID specific directory and file below key_dir/uid.

    Parameters
    ----------
    key_dir: Path
        The root directory below which the basic key material is persisted
    uid_binding_sigs: Dict[Uid, Path]
        The PositiveCertifications of a User ID and Public-Key packet
    uids: Dict[Uid, Path]
        The User IDs of a Public-Key (the root key)
    """

    for uid, binding_sig in uid_binding_sigs.items():
        uid_packets = [uids[uid], binding_sig]
        uid_file = key_dir / "uid" / uid / f"{uid}.asc"
        uid_file.parent.mkdir(parents=True, exist_ok=True)

        debug(f"Writing file {uid_file} from {[str(item) for item in uid_packets]}")
        packet_join(uid_packets, uid_file, force=True)
def persist_subkeys(
    key_dir: Path,
    subkeys: Dict[Fingerprint, Path],
    subkey_binding_sigs: Dict[Fingerprint, Path],
) -> None:
    """Write each Public-Subkey of a root key together with its SubkeyBinding to file(s)

    One file per subkey is written to a fingerprint specific directory below key_dir/subkey.

    Parameters
    ----------
    key_dir: Path
        The root directory below which the basic key material is persisted
    subkeys: Dict[Fingerprint, Path]
        The PublicSubkeys of a key
    subkey_binding_sigs: Dict[Fingerprint, Path]
        The SubkeyBinding signatures of a Public-Key (the root key)
    """

    for subkey_fingerprint, subkey_packet in subkeys.items():
        # every subkey is expected to have a binding signature, the lookup fails otherwise
        joined: List[Path] = [subkey_packet, subkey_binding_sigs[subkey_fingerprint]]

        subkey_file = key_dir / "subkey" / subkey_fingerprint / f"{subkey_fingerprint}.asc"
        subkey_file.parent.mkdir(parents=True, exist_ok=True)

        debug(f"Writing file {subkey_file} from {[str(item) for item in joined]}")
        packet_join(packets=joined, output=subkey_file, force=True)
def persist_subkey_revocations(
    key_dir: Path,
    subkey_revocations: Dict[Fingerprint, Path],
) -> None:
    """Write the SubkeyRevocations of a root key to file(s)

    Each revocation is written to a 'revocation' directory below key_dir/subkey/<fingerprint>, where <fingerprint>
    is the key of the revocation in subkey_revocations. The file is named after the issuer of the revocation.

    Parameters
    ----------
    key_dir: Path
        The root directory below which the basic key material is persisted
    subkey_revocations: Dict[Fingerprint, Path]
        The SubkeyRevocations of PublicSubkeys of a key
    """

    for subkey_fingerprint, revocation_packet in subkey_revocations.items():
        revoker = packet_dump_field(revocation_packet, "Issuer")

        revocation_file = key_dir / "subkey" / subkey_fingerprint / "revocation" / f"{revoker}.asc"
        revocation_file.parent.mkdir(parents=True, exist_ok=True)

        debug(f"Writing file {revocation_file} from {revocation_packet}")
        packet_join(packets=[revocation_packet], output=revocation_file, force=True)
def persist_direct_sigs(
    direct_sigs: Dict[Fingerprint, List[Path]],
    pubkey: Path,
    key_dir: Path,
    sig_type: str = "certification",
) -> None:
    """Write the signatures made directly on a root key (i.e. not on one of its User IDs) to file(s)

    For each issuer one file is written, that contains the public key followed by all signatures of that issuer.

    Parameters
    ----------
    direct_sigs: Dict[Fingerprint, List[Path]]
        The direct sigs to write to file, grouped by issuer
    pubkey: Path
        The path to the public key of the root key
    key_dir: Path
        The root directory below which the Directkeys are persisted
    sig_type: str
        The type of direct certification to persist (defaults to 'certification'). This is used as the name of the
        directory below key_dir
    """

    for issuer, issuer_sigs in direct_sigs.items():
        sig_file = key_dir / sig_type / f"{issuer}.asc"
        sig_file.parent.mkdir(parents=True, exist_ok=True)

        debug(f"Writing file {sig_file} from {[str(cert) for cert in issuer_sigs]}")
        packet_join([pubkey, *issuer_sigs], sig_file, force=True)
def persist_certifications(
    certifications: Dict[Uid, List[Path]],
    pubkey: Path,
    key_dir: Path,
    uid_binding_sig: Dict[Uid, Path],
    uids: Dict[Uid, Path],
) -> None:
    """Write the User ID certifications of a root key to file(s)

    Each certification is written to a file named after its issuer in the 'certification' directory of the User ID
    below key_dir/uid. The file contains the public key, the User ID, its binding signature and the certification.
    If the User ID packet or its binding signature is not available, an error is logged and the certification is
    not written.

    Parameters
    ----------
    certifications: Dict[Uid, List[Path]]
        The certifications to write to file
    pubkey: Path
        The path to the public key of the root key
    key_dir: Path
        The root directory below which certifications are persisted
    uid_binding_sig: Dict[Uid, Path]
        The PositiveCertifications of a User ID and Public-Key packet
    uids: Dict[Uid, Path]
        The User IDs of a Public-Key (the root key)
    """

    for uid, uid_certifications in certifications.items():
        target_dir = key_dir / "uid" / uid / "certification"

        for certification in uid_certifications:
            target_dir.mkdir(parents=True, exist_ok=True)
            issuer = packet_dump_field(certification, "Issuer")

            uid_packet = uids.get(uid)
            binding_sig = uid_binding_sig.get(uid)

            if not (uid_packet and binding_sig):
                error(
                    f"Public key '{pubkey}' does not provide "
                    f"{'the UID binding signature' if not binding_sig else ''} for UID '{uid}', "
                    "so its certifications can not be used!"
                )
                continue

            certification_file = target_dir / f"{issuer}.asc"
            debug(f"Writing file {certification_file} from {certification}")
            packet_join([pubkey, uid_packet, binding_sig, certification], certification_file, force=True)
def persist_revocations(
    pubkey: Path,
    revocations: Dict[Uid, List[Path]],
    key_dir: Path,
    uid_binding_sig: Dict[Uid, Path],
    uids: Dict[Uid, Path],
) -> None:
    """Write the User ID revocations of a root key to file(s)

    Each revocation is written to a file named after its issuer in the 'revocation' directory of the User ID below
    key_dir/uid. The file contains the public key, the User ID, the binding signature of the User ID (if there is
    one) and the revocation.

    Parameters
    ----------
    pubkey: Path
        The path to the public key of the root key
    revocations: Dict[Uid, List[Path]]
        The revocations to write to file
    key_dir: Path
        The root directory below which revocations will be persisted
    uid_binding_sig: Dict[Uid, Path]
        The PositiveCertifications of a User ID and Public-Key packet
    uids: Dict[Uid, Path]
        The User IDs of a Public-Key (the root key)
    """

    for uid, uid_revocations in revocations.items():
        target_dir = key_dir / "uid" / uid / "revocation"

        for revocation in uid_revocations:
            target_dir.mkdir(parents=True, exist_ok=True)
            issuer = packet_dump_field(revocation, "Issuer")

            # a binding signature is not available for every User ID, it is only included if there is one
            binding = [uid_binding_sig[uid]] if uid in uid_binding_sig else []
            joined = [pubkey, uids[uid], *binding, revocation]

            revocation_file = target_dir / f"{issuer}.asc"
            debug(f"Writing file {revocation_file} from {revocation}")
            packet_join(joined, revocation_file, force=True)
def packet_dump(packet: Path) -> str:
    """Return a textual dump of a PGP packet

    The dump is created by running `sq packet dump` on the packet file.

    Parameters
    ----------
    packet: Path
        The path to the PGP packet to dump

    Returns
    -------
    str
        The contents of the packet dump
    """

    dump_command = ["sq", "packet", "dump", str(packet)]
    return system(dump_command)
def packet_dump_field(packet: Path, field: str) -> str:
    """Retrieve the value of a field from a PGP packet

    The packet is dumped using `packet_dump()` and the first line of the form "<field>: <value>" (ignoring leading
    and trailing whitespace) provides the value. Field names may contain whitespace.

    Parameters
    ----------
    packet: Path
        The path to the PGP packet to retrieve the value from
    field: str
        The name of the field

    Raises
    ------
    Exception
        If the field is not found in the PGP packet

    Returns
    -------
    str
        The value of the field found in packet
    """

    prefix = f"{field}: "

    for line in packet_dump(packet).splitlines():
        stripped = line.strip()
        if stripped.startswith(prefix):
            # cut off exactly the matched prefix instead of splitting on the first whitespace, as that would leave
            # parts of a multi-word field name in the value
            return stripped[len(prefix) :].lstrip()

    raise Exception(f'Packet has no field "{field}"')
def keyring_split(working_dir: Path, keyring: Path, preserve_filename: bool = False) -> Iterable[Path]:
    """Split a file containing a PGP keyring into separate certificate files

    The split is done by `sq keyring split`, which is run from inside a newly created temporary directory below
    working_dir (a relative keyring path is therefore looked up relative to that directory).

    If the split yields exactly one certificate or if preserve_filename is True, each certificate is moved into a
    separate sub directory and named like the original keyring file.

    Parameters
    ----------
    working_dir: Path
        The path of the working directory below which to create the output files
    keyring: Path
        The path of a file containing a PGP keyring
    preserve_filename: bool
        If True, all certificates are placed into separate directories while preserving the filename of the keyring

    Returns
    -------
    Iterable[Path]
        An iterable over the naturally sorted list of certificate files derived from a keyring
    """

    split_dir = Path(mkdtemp(dir=working_dir, prefix="keyring-")).absolute()

    with cwd(split_dir):
        system(["sq", "keyring", "split", "--prefix", "", str(keyring)])

    certificates: List[Path] = list(natural_sort_path(split_dir.iterdir()))

    if not (preserve_filename or len(certificates) == 1):
        return certificates

    for position, certificate in enumerate(certificates):
        # a dedicated directory per certificate allows all of them to carry the name of the keyring file
        sub_dir = Path(mkdtemp(dir=split_dir, prefix=f"{keyring.name}-")).absolute()
        certificates[position] = certificate.rename(sub_dir / keyring.name)

    return certificates
def keyring_merge(certificates: List[Path], output: Optional[Path] = None, force: bool = False) -> str:
    """Merge multiple certificates into a keyring

    The merge is done by `sq keyring merge`, which receives the certificates in sorted order.

    Parameters
    ----------
    certificates: List[Path]
        List of paths to certificates to merge into a keyring
    output: Optional[Path]
        Path to a file which the keyring is written, return the result instead if None
    force: bool
        Whether to force overwriting existing files (defaults to False)

    Returns
    -------
    str
        The result if no output file has been used
    """

    # --force is an option of sq itself, hence it is placed in front of the subcommand
    merge_command = ["sq", "--force", "keyring", "merge"] if force else ["sq", "keyring", "merge"]

    if output:
        merge_command.extend(["--output", str(output)])

    merge_command.extend(str(cert) for cert in sorted(certificates))

    return system(merge_command)
def packet_split(working_dir: Path, certificate: Path) -> Iterable[Path]:
    """Split a file containing a PGP certificate into separate packet files

    The split is done by `sq packet split`, which is run from inside a newly created temporary directory below
    working_dir.

    Parameters
    ----------
    working_dir: Path
        The path of the working directory below which to create the output files
    certificate: Path
        The absolute path of a file containing one PGP certificate

    Returns
    -------
    Iterable[Path]
        An iterable over the naturally sorted list of packet files derived from certificate
    """

    split_dir = Path(mkdtemp(dir=working_dir, prefix="packet-")).absolute()
    split_command = ["sq", "packet", "split", "--prefix", "", str(certificate)]

    with cwd(split_dir):
        system(split_command)

    return natural_sort_path(split_dir.iterdir())
def packet_join(packets: List[Path], output: Optional[Path] = None, force: bool = False) -> str:
    """Join PGP packet data in files to a single output file

    The join is done by `sq packet join`, which receives the packets in the order they are provided in.

    Parameters
    ----------
    packets: List[Path]
        A list of paths to files that contain PGP packet data
    output: Optional[Path]
        Path to a file to which all PGP packet data is written, return the result instead if None
    force: bool
        Whether to force overwriting existing files (defaults to False)

    Returns
    -------
    str
        The result if no output file has been used
    """

    cmd = ["sq", "packet", "join"]
    if force:
        # --force is an option of sq itself, hence it is placed in front of the subcommand
        cmd.insert(1, "--force")

    cmd.extend(str(packet) for packet in packets)

    # only request an output file if there is one, else the data would end up in a file literally named "None"
    if output is not None:
        cmd.extend(["--output", str(output)])

    return system(cmd)
def inspect(
    packet: Path, certifications: bool = True, fingerprints: Optional[Dict[Fingerprint, Username]] = None
) -> str:
    """Inspect PGP packet data and return the result

    The inspection is done by `sq inspect`. If fingerprints are provided, each occurrence of a known fingerprint in
    the output (either in full or, following a space, everything after its first 24 characters) is suffixed with the
    username it belongs to.

    Parameters
    ----------
    packet: Path
        Path to a file that contain PGP data
    certifications: bool
        Whether to print third-party certifications
    fingerprints: Optional[Dict[Fingerprint, Username]]
        Optional dict of fingerprints to usernames to enrich the output with

    Returns
    -------
    str
        The result of the inspection
    """

    inspect_command = ["sq", "inspect"]
    if certifications:
        inspect_command.append("--certifications")
    inspect_command.append(str(packet))

    report: str = system(inspect_command)

    for fingerprint, username in (fingerprints or {}).items():
        short_id = fingerprint[24:]
        report = sub(f"{fingerprint}", f"{fingerprint} {username}", report)
        report = sub(f" {short_id}", f" {short_id} {username}", report)

    return report
def simplify_user_id(user_id: Uid) -> Uid:
    """Turn a User ID string into a more filesystem friendly representation

    Each "@" becomes "_at_", angle brackets are removed and spaces as well as the remaining punctuation characters
    are replaced by underscores.

    Parameters
    ----------
    user_id: Uid
        A User ID string (e.g. 'Foobar McFooface <foobar@foo.face>')

    Returns
    -------
    Uid
        The simplified representation of user_id
    """

    special_characters = "[" + escape(r" !@#$%^&*()_-+=[]{}\|;:,.<>/?") + "]"

    without_brackets: str = sub("[<>]", "", user_id.replace("@", "_at_"))

    return Uid(sub(special_characters, "_", without_brackets))
def derive_username_from_fingerprint(keyring_dir: Path, certificate_fingerprint: Fingerprint) -> Optional[Username]:
    """Attempt to derive the username of a public key fingerprint from a keyring directory

    The username is the name of the directory, that contains an entry ending in the fingerprint, one level below
    keyring_dir.

    Parameters
    ----------
    keyring_dir: Path
        The directory in which to look up a username
    certificate_fingerprint: Fingerprint
        The public key fingerprint to derive the username from

    Raises
    ------
    Exception
        If more than one username is found (a public key can only belong to one individual)

    Returns
    -------
    Optional[Username]
        A string representing the username a public key certificate belongs to, None otherwise
    """

    matches = list(keyring_dir.glob(f"*/*{certificate_fingerprint}"))

    if len(matches) > 1:
        raise Exception(
            f"More than one username found in {keyring_dir} when probing for fingerprint '{certificate_fingerprint}': "
            f"{matches}"
        )

    if not matches:
        debug(f"Can not derive username from target directory for fingerprint {certificate_fingerprint}")
        return None

    # use the full directory name: .stem would cut off everything after the last dot of a username
    username = matches[0].parent.name
    debug(
        f"Successfully derived username '{username}' from target directory for fingerprint "
        f"{certificate_fingerprint}"
    )
    return Username(username)
def convert(
    working_dir: Path,
    source: Iterable[Path],
    target_dir: Path,
    name_override: Optional[Username] = None,
    fingerprint_filter: Optional[Set[Fingerprint]] = None,
) -> Path:
    """Convert paths containing PGP certificate material to a decomposed directory structure

    Directories in source are replaced by the entries directly below them. Each resulting file is split by
    `keyring_split()` into individual certificates, which are decomposed by `convert_certificate()` below
    working_dir. Afterwards all resulting user directories are copied to target_dir.

    Parameters
    ----------
    working_dir: Path
        A directory to use for temporary files
    source: Iterable[Path]
        Paths to files or directories to decompose
    target_dir: Path
        A directory path to write the new directory structure to
    name_override: Optional[Username]
        An optional username override for the call to `convert_certificate()`
    fingerprint_filter: Optional[Set[Fingerprint]]
        An optional set of strings defining fingerprints of PGP public keys that all certificates will be filtered with

    Returns
    -------
    Path
        The directory that contains the resulting directory structure (target_dir)
    """

    keyring_files: Set[Path] = set()
    for entry in source:
        if entry.is_dir():
            keyring_files.update(entry.iterdir())
        else:
            keyring_files.add(entry)

    user_dirs: List[Path] = [
        convert_certificate(
            working_dir=working_dir,
            certificate=cert,
            keyring_dir=target_dir,
            name_override=name_override,
            fingerprint_filter=fingerprint_filter,
        )
        for keyring_file in keyring_files
        for cert in keyring_split(working_dir=working_dir, keyring=keyring_file, preserve_filename=True)
    ]

    # the target directory is only written to after all certificates have been decomposed
    for user_dir in user_dirs:
        destination = target_dir / user_dir.name
        destination.mkdir(parents=True, exist_ok=True)
        copytree(src=user_dir, dst=destination, dirs_exist_ok=True)

    return target_dir
def get_trusted_and_revoked_certs(certs: List[Path]) -> Tuple[List[Fingerprint], List[Fingerprint]]:
    """Get the fingerprints of all trusted and all self-revoked public keys below a list of paths

    A public key is considered self-revoked, if its 'revocation' directory contains a file whose name (without
    suffix) is the end of the fingerprint of the key. All other keys are considered trusted.

    Parameters
    ----------
    certs: List[Path]
        The certificates to trust

    Returns
    -------
    Tuple[List[Fingerprint], List[Fingerprint]]
        A tuple with the first item containing the fingerprints of all public keys that are not self-revoked and the
        second item containing the fingerprints of all self-revoked public keys
    """

    known: List[Fingerprint] = []
    revoked: List[Fingerprint] = []

    # TODO: what about direct key revocations/signatures?

    debug(f"Retrieving trusted and self-revoked certificates from {[str(cert_dir) for cert_dir in certs]}")

    for cert_dir in sorted(get_cert_paths(certs)):
        fingerprint = Fingerprint(cert_dir.stem)
        known.append(fingerprint)

        self_revocations = [
            revocation for revocation in cert_dir.glob("revocation/*.asc") if fingerprint.endswith(revocation.stem)
        ]
        for _ in self_revocations:
            debug(f"Revoking {fingerprint} due to self-revocation")
            revoked.append(fingerprint)

    trusted = [fingerprint for fingerprint in known if fingerprint not in revoked]

    return trusted, revoked
def export_ownertrust(certs: List[Path], output: Path) -> Tuple[List[Fingerprint], List[Fingerprint]]:
    """Export ownertrust from a set of keys and return the trusted and revoked fingerprints

    The output file format is compatible with `gpg --import-ownertrust` and lists the main fingerprint ID of all
    non-revoked keys as fully trusted.
    The exported file is used by pacman-key when importing a keyring (see
    https://man.archlinux.org/man/pacman-key.8#PROVIDING_A_KEYRING_FOR_IMPORT).

    Parameters
    ----------
    certs: List[Path]
        The certificates to trust
    output: Path
        The file path to write to

    Returns
    -------
    Tuple[List[Fingerprint], List[Fingerprint]]
        The fingerprints of the trusted and of the self-revoked public keys, as returned by
        `get_trusted_and_revoked_certs()`
    """

    trusted, revoked = get_trusted_and_revoked_certs(certs=certs)

    with output.open(mode="w") as ownertrust_file:
        # one sorted, de-duplicated "<fingerprint>:4:" line per trusted key
        for fingerprint in sorted(set(trusted)):
            debug(f"Writing {fingerprint} to {output}")
            ownertrust_file.write(f"{fingerprint}:4:\n")

    return trusted, revoked
def export_revoked(certs: List[Path], main_keys: List[Fingerprint], output: Path, min_revoker: int = 2) -> None:
    """Export the PGP revoked status from a set of keys

    The output file contains the fingerprints of all self-revoked keys and all keys for which at least min_revoker
    revocations by distinct main keys exist on their User IDs.
    The exported file is used by pacman-key when importing a keyring (see
    https://man.archlinux.org/man/pacman-key.8#PROVIDING_A_KEYRING_FOR_IMPORT).

    Parameters
    ----------
    certs: List[Path]
        A list of directories with keys to check for their revocation status
    main_keys: List[Fingerprint]
        A list of strings representing the fingerprints of (current and/or revoked) main keys
    output: Path
        The file path to write to
    min_revoker: int
        The minimum amount of revocation certificates on a User ID from any main key to deem a public key as revoked
        (defaults to 2)
    """

    _, revoked = get_trusted_and_revoked_certs(certs=certs)

    debug(f"Retrieving certificates revoked by main keys from {[str(cert_dir) for cert_dir in certs]}")
    foreign_revocations: Dict[Fingerprint, Set[Fingerprint]] = defaultdict(set)

    for cert_dir in sorted(get_cert_paths(certs)):
        fingerprint = Fingerprint(cert_dir.name)
        debug(f"Inspecting public key {fingerprint}")

        revokers = foreign_revocations[fingerprint]
        for revocation_cert in cert_dir.glob("uid/*/revocation/*.asc"):
            revoker = Fingerprint(revocation_cert.stem)
            # only revocations issued by one of the main keys are taken into account
            if any(main_key.endswith(revoker) for main_key in main_keys):
                revokers.add(revoker)

        # TODO: find a better (less naive) approach, as this would also match on public certificates,
        # where some UIDs are signed and others are revoked
        if len(revokers) < min_revoker:
            continue

        debug(f"Revoking {cert_dir.name} due to {set(revokers)} being main key revocations")
        revoked.append(fingerprint)

    with output.open(mode="w") as revoked_file:
        for fingerprint in sorted(set(revoked)):
            debug(f"Writing {fingerprint} to {output}")
            revoked_file.write(f"{fingerprint}\n")
def get_fingerprints_from_import_source(working_dir: Path, source: List[Path]) -> Dict[Fingerprint, Username]:
    """Get all fingerprints of PGP public keys from import file(s)

    Directories in source are replaced by the entries directly below them. Each resulting file is split into
    certificates and the fingerprint of each PublicKey packet is mapped to the stem of the certificate file name.

    Parameters
    ----------
    working_dir: Path
        A directory to use for temporary files
    source: List[Path]
        The paths to source files or directories containing keyrings

    Returns
    -------
    Dict[Fingerprint, Username]
        A dict of all fingerprints and their usernames of PGP public keys found in source
    """

    keyring_files: Set[Path] = set()
    for entry in source:
        if entry.is_dir():
            keyring_files.update(entry.iterdir())
        else:
            keyring_files.add(entry)

    fingerprints: Dict[Fingerprint, Username] = {}

    for keyring_file in keyring_files:
        for certificate in keyring_split(working_dir=working_dir, keyring=keyring_file, preserve_filename=True):
            username = Username(certificate.stem)
            for packet in packet_split(working_dir=working_dir, certificate=certificate):
                if not packet.name.endswith("--PublicKey"):
                    continue
                fingerprints[Fingerprint(packet_dump_field(packet, "Fingerprint"))] = username

    debug(f"Fingerprints of PGP public keys in {source}: {fingerprints}")
    return fingerprints
def get_fingerprints_from_decomposed_dir(path: Path) -> Dict[Fingerprint, Username]:
    """Get all fingerprints of PGP public keys from a decomposed directory structure

    The name of each certificate directory is used as fingerprint and the name of its parent directory as username.

    Parameters
    ----------
    path: Path
        The path to a decomposed directory structure

    Returns
    -------
    Dict[Fingerprint, Username]
        A dict of all fingerprints and their usernames of PGP public keys below path
    """

    fingerprints: Dict[Fingerprint, Username] = {
        Fingerprint(cert_dir.name): Username(cert_dir.parent.name) for cert_dir in sorted(get_cert_paths([path]))
    }

    debug(f"Fingerprints of PGP public keys in {path}: {fingerprints}")
    return fingerprints
def get_fingerprints(
    working_dir: Path, decomposed_paths: List[Path], sources: Optional[List[Path]] = None
) -> Dict[Fingerprint, Username]:
    """Get the fingerprints of PGP public keys from input paths and decomposed directory structures

    The fingerprints found in decomposed directory structures take precedence over those found in import sources.

    Parameters
    ----------
    working_dir: Path
        A directory to use for temporary files
    decomposed_paths: List[Path]
        A list of paths that identify decomposed PGP data in directory structures
    sources: Optional[List[Path]]
        The source files or directories containing keyrings to import. If None, the `source` attribute of the
        module-level `args` object is used (this requires `args` to be defined by the time of the call)

    Returns
    -------
    Dict[Fingerprint, Username]
        A dict of all fingerprints and their usernames of PGP public keys found in the sources and below the
        decomposed paths
    """

    if sources is None:
        # backwards compatibility: callers that do not provide sources rely on the global command line arguments
        sources = args.source  # noqa: F821

    fingerprints: Dict[Fingerprint, Username] = {}

    fingerprints.update(
        get_fingerprints_from_import_source(
            working_dir=working_dir,
            source=sources,
        )
    )

    for decomposed_path in decomposed_paths:
        fingerprints.update(get_fingerprints_from_decomposed_dir(path=decomposed_path))

    return fingerprints
def export(
    working_dir: Path,
    keyring_root: Path,
    sources: Optional[List[Path]] = None,
    output: Optional[Path] = None,
) -> str:
    """Export all provided PGP packet files to a single output file

    If sources contains directories, any .asc files below them are considered.
    A source that does not exist is treated as a username shorthand and replaced by the directory of the same name
    below `keyring_root / "packager"`, if that directory exists. The list passed in as sources is not modified.

    Parameters
    ----------
    working_dir: Path
        A directory to use for temporary files
    keyring_root: Path
        The keyring root directory to look up username shorthand sources
    sources: Optional[List[Path]]
        A list of directories or files from which to read PGP packet information (defaults to `keyring_root`)
    output: Optional[Path]
        An output file that all PGP packet data is written to, return the result instead if None

    Returns
    -------
    str
        The result if no output file has been used
    """

    if not sources:
        sources = [keyring_root]

    # resolve shorthand username exports for packager keys into a new list, so the caller's list stays untouched
    packager_dir = keyring_root / "packager"
    resolved_sources: List[Path] = []
    for source in sources:
        packager_source = packager_dir / source.name
        use_shorthand = not source.exists() and packager_source.exists()
        resolved_sources.append(packager_source if use_shorthand else source)

    temp_dir = Path(mkdtemp(dir=working_dir, prefix="arch-keyringctl-export-join-")).absolute()

    # join the packets of each certificate into one file per certificate, then merge those files into one keyring
    certificates: List[Path] = []
    for cert_dir in sorted(get_cert_paths(resolved_sources)):
        cert_path = temp_dir / f"{cert_dir.name}.asc"
        debug(f"Joining {cert_dir} in {cert_path}")
        packet_join(
            packets=sorted(cert_dir.glob("**/*.asc")),
            output=cert_path,
            force=True,
        )
        certificates.append(cert_path)

    return keyring_merge(certificates, output, force=True)
def build(
    working_dir: Path,
    keyring_root: Path,
    target_dir: Path,
) -> None:
    """Create the keyring PGP artifacts together with the ownertrust and revoked status files

    The target directory is created if it does not exist yet. Three files are written below it: `archlinux.gpg`,
    `archlinux-trusted` and `archlinux-revoked`.

    Parameters
    ----------
    working_dir: Path
        A directory to use for temporary files
    keyring_root: Path
        The keyring root directory to build the artifacts from
    target_dir: Path
        Output directory that all artifacts are written to
    """

    target_dir.mkdir(parents=True, exist_ok=True)

    # the combined keyring of everything below the keyring root
    export(
        working_dir=working_dir,
        keyring_root=keyring_root,
        output=target_dir / Path("archlinux.gpg"),
    )

    # ownertrust is only exported for the main keys
    trusted, revoked = export_ownertrust(
        certs=[keyring_root / "main"],
        output=target_dir / "archlinux-trusted",
    )

    # the revoked status is exported for the whole keyring, using all main keys reported above
    all_main_keys = trusted + revoked
    export_revoked(
        certs=[keyring_root],
        main_keys=all_main_keys,
        output=target_dir / "archlinux-revoked",
    )
def list_keyring(keyring_root: Path, sources: Optional[List[Path]] = None, main_keys: bool = False) -> None:
    """List certificates in the keyring

    Prints one line per user directory: the directory name (left aligned and padded to the longest name), followed
    by the names of all entries below that directory, separated by spaces.

    A source that does not exist is treated as a username shorthand and replaced by the directory of the same name
    below the selected keyring directory, if that directory exists. The list passed in as sources is not modified.
    Nothing is printed if there is nothing to list.

    Parameters
    ----------
    keyring_root: Path
        The keyring root directory to look up username shorthand sources
    sources: Optional[List[Path]]
        A list of username or files/directories from which to read PGP packet information (defaults to all entries
        of the selected keyring directory, in sorted order)
    main_keys: bool
        List main keys instead of packager keys (defaults to False)
    """

    keyring_dir = keyring_root / ("main" if main_keys else "packager")

    user_dirs: List[Path] = []
    if sources:
        # resolve shorthand usernames into a new list, keeping the order requested by the caller
        for source in sources:
            shorthand = keyring_dir / source.name
            user_dirs.append(shorthand if not source.exists() and shorthand.exists() else source)
    else:
        # iterdir() yields entries in arbitrary order, sort them for a reproducible listing
        user_dirs = sorted(keyring_dir.iterdir())

    # default=0 keeps an empty keyring directory from raising ValueError in max()
    username_length = max((len(user_dir.name) for user_dir in user_dirs), default=0)

    for user_dir in user_dirs:
        certificates = sorted(cert.name for cert in user_dir.iterdir())
        print(f"{user_dir.name:<{username_length}} {' '.join(certificates)}")
def inspect_keyring(working_dir: Path, keyring_root: Path, sources: Optional[List[Path]]) -> str:
    """Inspect certificates in the keyring and pretty print the data

    If sources contains directories, all certificate below them are considered.
    A source that does not exist is treated as a username shorthand and replaced by the directory of the same name
    below `keyring_root / "packager"`, if that directory exists. The list passed in as sources is not modified.

    Parameters
    ----------
    working_dir: Path
        A directory to use for temporary files
    keyring_root: Path
        The keyring root directory to look up username shorthand sources
    sources: Optional[List[Path]]
        A list of username or files/directories from which to read PGP packet information (defaults to `keyring_root`)

    Returns
    -------
    str
        The result of the inspect
    """

    # function scope import: the module itself only imports chdir and getcwd from os
    from os import close

    if not sources:
        sources = [keyring_root]

    # resolve shorthand username exports for packager keys into a new list, so the caller's list stays untouched
    packager_dir = keyring_root / "packager"
    resolved_sources: List[Path] = []
    for source in sources:
        packager_source = packager_dir / source.name
        use_shorthand = not source.exists() and packager_source.exists()
        resolved_sources.append(packager_source if use_shorthand else source)

    # mkstemp returns an already opened file descriptor next to the file name: only the name is needed here, so
    # the descriptor is closed right away instead of being leaked
    descriptor, keyring_name = mkstemp(dir=working_dir, prefix="packet-", suffix=".asc")
    close(descriptor)
    keyring = Path(keyring_name).absolute()

    export(working_dir=working_dir, keyring_root=keyring_root, sources=resolved_sources, output=keyring)

    # fingerprints of the whole keyring are handed to inspect, not only those of the selected sources
    return inspect(
        packet=keyring, certifications=True, fingerprints=get_fingerprints_from_decomposed_dir(path=keyring_root)
    )
def absolute_path(path: str) -> Path:
    """Turn a path given as string into an absolute Path

    Used as `type` converter for path arguments of the command line interface.

    Parameters
    ----------
    path: str
        A string representing a path

    Returns
    -------
    Path
        The absolute path representation of path
    """

    candidate = Path(path)
    return candidate.absolute()
if __name__ == "__main__":
    # command line interface: global options, followed by one subcommand with its own arguments
    parser = ArgumentParser()
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Causes to print debugging messages about the progress"
    )
    parser.add_argument("--wait", action="store_true", help="Block before cleaning up the temp directory")
    parser.add_argument(
        "-f",
        "--force",
        action="store_true",
        default=False,
        help="force the execution of subcommands (e.g. overwriting of files)",
    )
    subcommands = parser.add_subparsers(dest="subcommand")

    convert_parser = subcommands.add_parser(
        "convert",
        help="convert one or multiple PGP public keys to a decomposed directory structure",
    )
    convert_parser.add_argument("source", type=absolute_path, nargs="+", help="Files or directories to convert")
    convert_parser.add_argument("--target", type=absolute_path, help="Target directory instead of a random tmpdir")
    convert_parser.add_argument(
        "--name",
        type=Username,
        default=None,
        help="override the username to use (only useful when using a single file as source)",
    )

    import_parser = subcommands.add_parser(
        "import",
        help="import one or several PGP keys to the keyring directory structure",
    )
    import_parser.add_argument("source", type=absolute_path, nargs="+", help="Files or directories to import")
    import_parser.add_argument(
        "--name",
        type=Username,
        default=None,
        help="override the username to use (only useful when using a single file as source)",
    )
    import_parser.add_argument("--main", action="store_true", help="Import a main signing key into the keyring")

    export_parser = subcommands.add_parser(
        "export",
        help="export a directory structure of PGP packet data to a combined file",
    )
    export_parser.add_argument("-o", "--output", type=absolute_path, help="file to write PGP packet data to")
    export_parser.add_argument(
        "source",
        nargs="*",
        help="username or files/directories containing PGP packet data (can be provided multiple times)",
        type=absolute_path,
    )

    build_parser = subcommands.add_parser(
        "build",
        help="build keyring PGP artifacts alongside ownertrust and revoked status files",
    )

    list_parser = subcommands.add_parser(
        "list",
        help="list the certificates in the keyring",
    )
    list_parser.add_argument("--main", action="store_true", help="List main signing keys instead of packager keys")
    list_parser.add_argument(
        "source",
        nargs="*",
        help="username or files/directories containing certificates (can be provided multiple times)",
        type=absolute_path,
    )

    inspect_parser = subcommands.add_parser(
        "inspect",
        help="inspect certificates in the keyring and pretty print the data",
    )
    inspect_parser.add_argument(
        "source",
        nargs="*",
        help="username or directories containing certificates",
        type=absolute_path,
    )

    args = parser.parse_args()

    if args.verbose:
        basicConfig(level=DEBUG)

    # temporary working directory that gets auto cleaned
    with TemporaryDirectory(prefix="arch-keyringctl-") as tempdir:
        # the keyring root is resolved relative to the directory the script is started from, before the current
        # working directory is changed to the temporary directory below
        keyring_root = Path("keyring").absolute()
        working_dir = Path(tempdir)
        debug(f"Working directory: {working_dir}")
        with cwd(working_dir):
            if "convert" == args.subcommand:
                target_dir = args.target or Path(mkdtemp(prefix="arch-keyringctl-")).absolute()
                # hand --name through to convert, otherwise the option is accepted but has no effect
                print(
                    convert(
                        working_dir=working_dir,
                        source=args.source,
                        target_dir=target_dir,
                        name_override=args.name,
                    )
                )
            elif "import" == args.subcommand:
                target_dir = "main" if args.main else "packager"
                print(
                    convert(
                        working_dir=working_dir,
                        source=args.source,
                        target_dir=keyring_root / target_dir,
                        name_override=args.name,
                        fingerprint_filter=set(
                            get_fingerprints(
                                working_dir=working_dir,
                                decomposed_paths=[keyring_root / "main", keyring_root / "packager"],
                            ).keys()
                        ),
                    )
                )
            elif "export" == args.subcommand:
                # the exported data carries its own line endings, hence end=""
                print(
                    export(
                        working_dir=working_dir,
                        keyring_root=keyring_root,
                        sources=args.source,
                        output=args.output,
                    ),
                    end="",
                )
            elif "build" == args.subcommand:
                build(
                    working_dir=working_dir,
                    keyring_root=keyring_root,
                    target_dir=keyring_root.parent / "build",
                )
            elif "list" == args.subcommand:
                list_keyring(
                    keyring_root=keyring_root,
                    sources=args.source,
                    main_keys=args.main,
                )
            elif "inspect" == args.subcommand:
                print(
                    inspect_keyring(
                        working_dir=working_dir,
                        keyring_root=keyring_root,
                        sources=args.source,
                    ),
                    end="",
                )
            else:
                parser.print_help()

            # block while the temporary directory still exists, so that its content can be looked at
            if args.wait:
                print("Press [ENTER] to continue")
                input()