2021-08-25 11:53:07 -05:00
|
|
|
#!/usr/bin/env python
|
2021-10-16 17:50:34 -05:00
|
|
|
#
|
|
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
2021-08-25 11:53:07 -05:00
|
|
|
|
2021-10-16 17:50:34 -05:00
|
|
|
from argparse import ArgumentParser
|
2021-08-25 11:53:07 -05:00
|
|
|
from collections import defaultdict
|
2021-10-16 17:50:34 -05:00
|
|
|
from contextlib import contextmanager
|
2021-10-18 05:58:23 -05:00
|
|
|
from logging import DEBUG
|
|
|
|
from logging import basicConfig
|
|
|
|
from logging import debug
|
|
|
|
from logging import error
|
|
|
|
from os import chdir
|
|
|
|
from os import getcwd
|
2021-08-25 11:53:07 -05:00
|
|
|
from pathlib import Path
|
2021-10-18 05:58:23 -05:00
|
|
|
from re import escape
|
|
|
|
from re import split
|
|
|
|
from re import sub
|
2021-10-04 12:25:24 -05:00
|
|
|
from shutil import copytree
|
2021-10-18 05:58:23 -05:00
|
|
|
from subprocess import PIPE
|
|
|
|
from subprocess import CalledProcessError
|
|
|
|
from subprocess import check_output
|
|
|
|
from sys import exit
|
|
|
|
from sys import stderr
|
|
|
|
from tempfile import TemporaryDirectory
|
|
|
|
from tempfile import mkdtemp
|
|
|
|
from typing import Dict
|
|
|
|
from typing import Iterable
|
|
|
|
from typing import Iterator
|
|
|
|
from typing import List
|
2021-10-19 10:35:59 -05:00
|
|
|
from typing import NewType
|
2021-10-18 05:58:23 -05:00
|
|
|
from typing import Optional
|
|
|
|
from typing import Set
|
|
|
|
from typing import Tuple
|
|
|
|
from typing import Union
|
2021-08-25 11:53:07 -05:00
|
|
|
|
|
|
|
|
2021-10-19 10:35:59 -05:00
|
|
|
Fingerprint = NewType('Fingerprint', str)
|
|
|
|
Uid = NewType('Uid', str)
|
|
|
|
|
|
|
|
|
2021-08-25 11:53:07 -05:00
|
|
|
@contextmanager
|
2021-10-12 12:35:18 -05:00
|
|
|
def cwd(new_dir: Path) -> Iterator[None]:
|
2021-10-11 06:00:48 -05:00
|
|
|
"""Change to a new current working directory in a context and go back to the previous dir after the context is done
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
new_dir: Path
|
|
|
|
A path to change to
|
|
|
|
"""
|
|
|
|
|
2021-08-25 11:53:07 -05:00
|
|
|
previous_dir = getcwd()
|
|
|
|
chdir(new_dir)
|
|
|
|
try:
|
|
|
|
yield
|
|
|
|
finally:
|
|
|
|
chdir(previous_dir)
|
|
|
|
|
|
|
|
|
|
|
|
def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]:
|
2021-10-11 06:00:48 -05:00
|
|
|
"""Sort an Iterable of Paths naturally
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
_list: Iterable[Path]
|
|
|
|
An iterable containing paths to be sorted
|
|
|
|
|
|
|
|
Return
|
|
|
|
------
|
|
|
|
Iterable[Path]
|
|
|
|
An Iterable of paths that are naturally sorted
|
|
|
|
"""
|
|
|
|
|
2021-10-19 11:23:08 -05:00
|
|
|
def convert_text_chunk(text: str) -> Union[int, str]:
|
2021-10-11 06:00:48 -05:00
|
|
|
"""Convert input text to int or str
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
text: str
|
|
|
|
An input string
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
Union[int, str]
|
|
|
|
Either an integer if text is a digit, else text in lower-case representation
|
|
|
|
"""
|
|
|
|
|
2021-08-25 11:53:07 -05:00
|
|
|
return int(text) if text.isdigit() else text.lower()
|
|
|
|
|
2021-10-11 06:00:48 -05:00
|
|
|
def alphanum_key(key: Path) -> List[Union[int, str]]:
|
|
|
|
"""Retrieve an alphanumeric key from a Path, that can be used in sorted()
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
key: Path
|
|
|
|
A path for which to create a key
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
List[Union[int, str]]
|
|
|
|
A list of either int or str objects that may serve as 'key' argument for sorted()
|
|
|
|
"""
|
|
|
|
|
2021-10-19 11:23:08 -05:00
|
|
|
return [convert_text_chunk(c) for c in split("([0-9]+)", str(key.name))]
|
2021-08-25 11:53:07 -05:00
|
|
|
|
|
|
|
return sorted(_list, key=alphanum_key)
|
|
|
|
|
|
|
|
|
|
|
|
def system(cmd: List[str], exit_on_error: bool = True) -> str:
|
2021-10-11 06:00:48 -05:00
|
|
|
"""Execute a command using check_output
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
cmd: List[str]
|
|
|
|
A list of strings to be fed to check_output
|
|
|
|
exit_on_error: bool
|
|
|
|
Whether to exit the script when encountering an error (defaults to True)
|
|
|
|
|
|
|
|
Raises
|
|
|
|
------
|
|
|
|
CalledProcessError
|
|
|
|
If not exit_on_error and `check_output()` encounters an error
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
str
|
|
|
|
The output of cmd
|
|
|
|
"""
|
|
|
|
|
2021-08-25 11:53:07 -05:00
|
|
|
try:
|
|
|
|
return check_output(cmd, stderr=PIPE).decode()
|
|
|
|
except CalledProcessError as e:
|
|
|
|
stderr.buffer.write(e.stderr)
|
|
|
|
if exit_on_error:
|
|
|
|
exit(e.returncode)
|
|
|
|
raise e
|
|
|
|
|
|
|
|
|
2021-10-16 17:50:34 -05:00
|
|
|
# TODO: simplify to lower complexity
|
|
|
|
def convert_certificate( # noqa: ignore=C901
|
|
|
|
working_dir: Path,
|
|
|
|
certificate: Path,
|
|
|
|
name_override: Optional[str] = None,
|
2021-10-19 10:35:59 -05:00
|
|
|
fingerprint_filter: Optional[Set[Fingerprint]] = None,
|
2021-10-16 17:50:34 -05:00
|
|
|
) -> Path:
|
2021-10-11 06:00:48 -05:00
|
|
|
"""Convert a single file public key certificate into a decomposed directory structure of multiple PGP packets
|
|
|
|
|
|
|
|
The output directory structure is created per user. The username is derived from the certificate (or overridden).
|
|
|
|
Below the username directory a directory tree describes the public keys componentes split up into certifications
|
|
|
|
and revocations, as well as per subkey and per uid certifications and revocations.
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
working_dir: Path
|
|
|
|
The path of the working directory below which to create split certificates
|
|
|
|
certificate: Path
|
|
|
|
The path to a public key certificate
|
|
|
|
name_override: Optional[str]
|
|
|
|
An optional string to override the username in the to be created output directory structure
|
2021-10-19 10:35:59 -05:00
|
|
|
fingerprint_filter: Optional[Set[Fingerprint]]
|
2021-10-18 05:47:12 -05:00
|
|
|
An optional list of strings defining fingerprints of PGP public keys that all certificates will be filtered
|
|
|
|
with
|
2021-10-11 06:00:48 -05:00
|
|
|
|
|
|
|
Raises
|
|
|
|
------
|
|
|
|
Exception
|
|
|
|
If required PGP packets are not found
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
Path
|
|
|
|
The path of the user_dir (which is located below working_dir)
|
|
|
|
"""
|
|
|
|
|
2021-10-19 10:35:59 -05:00
|
|
|
# root packets
|
|
|
|
certificate_fingerprint: Optional[Fingerprint] = None
|
2021-08-25 11:53:07 -05:00
|
|
|
pubkey: Optional[Path] = None
|
2021-10-19 10:35:59 -05:00
|
|
|
direct_sigs: Dict[Fingerprint, Dict[str, List[Path]]] = {}
|
|
|
|
direct_revocations: Dict[Fingerprint, Dict[str, List[Path]]] = {}
|
|
|
|
|
|
|
|
# subkey packets
|
|
|
|
subkeys: Dict[Fingerprint, Dict[Fingerprint, Path]] = {}
|
|
|
|
subkey_binding_sigs: Dict[Fingerprint, Dict[Fingerprint, Path]] = {}
|
|
|
|
subkey_revocations: Dict[Fingerprint, Dict[Fingerprint, Path]] = {}
|
|
|
|
|
|
|
|
# uid packets
|
|
|
|
uids: Dict[Uid, Path] = {}
|
|
|
|
uid_binding_sigs: Dict[Uid, Path] = {}
|
|
|
|
certifications: Dict[Uid, List[Path]] = defaultdict(list)
|
|
|
|
revocations: Dict[Uid, List[Path]] = defaultdict(list)
|
|
|
|
|
|
|
|
# intermediate variables
|
|
|
|
username: str = name_override or certificate.stem
|
2021-08-25 11:53:07 -05:00
|
|
|
current_packet_mode: Optional[str] = None
|
2021-10-19 10:35:59 -05:00
|
|
|
current_packet_fingerprint: Optional[Fingerprint] = None
|
|
|
|
current_packet_uid: Optional[Uid] = None
|
2021-08-25 11:53:07 -05:00
|
|
|
|
2021-10-03 12:24:49 -05:00
|
|
|
def add_packet_to_direct_sigs(
|
2021-10-19 10:35:59 -05:00
|
|
|
direct_sigs: Dict[Fingerprint, Dict[str, List[Path]]],
|
2021-10-03 12:24:49 -05:00
|
|
|
issuer: str,
|
2021-10-19 10:35:59 -05:00
|
|
|
packet_key: Fingerprint,
|
2021-10-03 12:24:49 -05:00
|
|
|
packet: Path,
|
2021-10-19 10:35:59 -05:00
|
|
|
) -> Dict[Fingerprint, Dict[str, List[Path]]]:
|
2021-10-03 12:24:49 -05:00
|
|
|
"""Add a packet to the set of DirectKeys
|
|
|
|
|
|
|
|
If no key with the given packet_key exists yet, it is created.
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
2021-10-19 10:35:59 -05:00
|
|
|
direct_sigs: Dict[Fingerprint, Dict[str, List[Path]]]
|
2021-10-03 12:24:49 -05:00
|
|
|
The signatures directly on a root key (such as DirectKey or *Certifications without a specific User ID)
|
|
|
|
issuer: str
|
|
|
|
The issuer of the signature
|
|
|
|
packet: Path
|
|
|
|
The path to the packet
|
2021-10-19 10:35:59 -05:00
|
|
|
packet_key: Fingerprint
|
2021-10-03 12:24:49 -05:00
|
|
|
The key identifying the packet (e.g. its Fingerprint)
|
|
|
|
"""
|
|
|
|
|
|
|
|
if not direct_sigs.get(packet_key):
|
|
|
|
direct_sigs = direct_sigs | {packet_key: defaultdict(list)}
|
|
|
|
|
|
|
|
direct_sigs[packet_key][issuer].append(packet)
|
|
|
|
return direct_sigs
|
|
|
|
|
2021-08-25 11:53:07 -05:00
|
|
|
# XXX: PrimaryKeyBinding
|
|
|
|
|
|
|
|
# TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean
|
|
|
|
|
2021-10-16 17:50:34 -05:00
|
|
|
debug(f"Processing certificate {certificate}")
|
2021-10-03 13:05:11 -05:00
|
|
|
|
2021-10-04 15:12:33 -05:00
|
|
|
for packet in packet_split(working_dir=working_dir, certificate=certificate):
|
2021-10-16 17:50:34 -05:00
|
|
|
debug(f"Processing packet {packet.name}")
|
|
|
|
if packet.name.endswith("--PublicKey"):
|
|
|
|
current_packet_mode = "pubkey"
|
2021-10-19 10:35:59 -05:00
|
|
|
current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint"))
|
|
|
|
current_packet_uid = None
|
|
|
|
|
|
|
|
certificate_fingerprint = current_packet_fingerprint
|
|
|
|
pubkey = packet
|
2021-10-16 17:50:34 -05:00
|
|
|
elif packet.name.endswith("--UserID"):
|
|
|
|
current_packet_mode = "uid"
|
2021-10-19 10:35:59 -05:00
|
|
|
current_packet_fingerprint = None
|
|
|
|
current_packet_uid = simplify_user_id(Uid(packet_dump_field(packet, "Value")))
|
|
|
|
|
|
|
|
uids[current_packet_uid] = packet
|
2021-10-16 17:50:34 -05:00
|
|
|
elif packet.name.endswith("--PublicSubkey"):
|
|
|
|
current_packet_mode = "subkey"
|
2021-10-19 10:35:59 -05:00
|
|
|
current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint"))
|
|
|
|
current_packet_uid = None
|
2021-10-09 05:56:33 -05:00
|
|
|
|
|
|
|
if not certificate_fingerprint:
|
|
|
|
raise Exception('missing certificate fingerprint for "{packet.name}"')
|
|
|
|
|
|
|
|
if not subkeys.get(certificate_fingerprint):
|
2021-10-19 10:35:59 -05:00
|
|
|
subkeys |= {certificate_fingerprint: {current_packet_fingerprint: packet}}
|
2021-10-09 05:56:33 -05:00
|
|
|
else:
|
2021-10-19 10:35:59 -05:00
|
|
|
subkeys[certificate_fingerprint] |= {current_packet_fingerprint: packet}
|
2021-10-09 05:56:33 -05:00
|
|
|
|
2021-10-16 17:50:34 -05:00
|
|
|
elif packet.name.endswith("--Signature"):
|
2021-08-25 11:53:07 -05:00
|
|
|
if not certificate_fingerprint:
|
|
|
|
raise Exception('missing certificate fingerprint for "{packet.name}"')
|
|
|
|
|
2021-10-16 17:50:34 -05:00
|
|
|
issuer = packet_dump_field(packet, "Issuer")
|
|
|
|
signature_type = packet_dump_field(packet, "Type")
|
2021-08-25 11:53:07 -05:00
|
|
|
|
2021-10-16 17:50:34 -05:00
|
|
|
if current_packet_mode == "pubkey":
|
2021-10-19 10:35:59 -05:00
|
|
|
if not current_packet_fingerprint:
|
|
|
|
raise Exception('missing current packet fingerprint for "{packet.name}"')
|
|
|
|
|
2021-10-16 17:50:34 -05:00
|
|
|
if signature_type == "KeyRevocation" and certificate_fingerprint.endswith(issuer):
|
2021-10-09 16:42:13 -05:00
|
|
|
direct_revocations = add_packet_to_direct_sigs(
|
|
|
|
direct_sigs=direct_revocations,
|
|
|
|
issuer=issuer,
|
2021-10-19 10:35:59 -05:00
|
|
|
packet_key=current_packet_fingerprint,
|
2021-10-09 16:42:13 -05:00
|
|
|
packet=packet,
|
|
|
|
)
|
2021-10-16 17:50:34 -05:00
|
|
|
elif signature_type in ["DirectKey", "GenericCertification"]:
|
2021-10-09 16:42:13 -05:00
|
|
|
direct_sigs = add_packet_to_direct_sigs(
|
|
|
|
direct_sigs=direct_sigs,
|
|
|
|
issuer=issuer,
|
2021-10-19 10:35:59 -05:00
|
|
|
packet_key=current_packet_fingerprint,
|
2021-10-09 16:42:13 -05:00
|
|
|
packet=packet,
|
|
|
|
)
|
2021-08-25 11:53:07 -05:00
|
|
|
else:
|
2021-10-16 17:50:34 -05:00
|
|
|
raise Exception(f"unknown signature type: {signature_type}")
|
|
|
|
elif current_packet_mode == "uid":
|
2021-10-19 10:35:59 -05:00
|
|
|
if not current_packet_uid:
|
|
|
|
raise Exception('missing current packet uid for "{packet.name}"')
|
|
|
|
|
2021-10-16 17:50:34 -05:00
|
|
|
if signature_type == "CertificationRevocation":
|
2021-10-19 10:35:59 -05:00
|
|
|
revocations[current_packet_uid].append(packet)
|
2021-10-16 17:50:34 -05:00
|
|
|
elif signature_type == "PositiveCertification" and certificate_fingerprint.endswith(issuer):
|
2021-10-19 10:35:59 -05:00
|
|
|
uid_binding_sigs[current_packet_uid] = packet
|
2021-10-16 17:50:34 -05:00
|
|
|
elif signature_type.endswith("Certification"):
|
2021-10-18 05:47:12 -05:00
|
|
|
if fingerprint_filter is not None and any([fp.endswith(issuer) for fp in fingerprint_filter]):
|
|
|
|
debug(f"The certification by issuer {issuer} is appended as it is found in the filter.")
|
2021-10-19 10:35:59 -05:00
|
|
|
certifications[current_packet_uid].append(packet)
|
2021-10-18 05:47:12 -05:00
|
|
|
else:
|
|
|
|
debug(f"The certification by issuer {issuer} is not appended because it is not in the filter")
|
2021-10-09 16:42:13 -05:00
|
|
|
else:
|
2021-10-16 17:50:34 -05:00
|
|
|
raise Exception(f"unknown signature type: {signature_type}")
|
|
|
|
elif current_packet_mode == "subkey":
|
2021-10-19 10:35:59 -05:00
|
|
|
if not current_packet_fingerprint:
|
|
|
|
raise Exception('missing current packet fingerprint for "{packet.name}"')
|
|
|
|
|
2021-10-16 17:50:34 -05:00
|
|
|
if signature_type == "SubkeyBinding":
|
2021-10-09 05:56:33 -05:00
|
|
|
if not subkey_binding_sigs.get(certificate_fingerprint):
|
2021-10-19 10:35:59 -05:00
|
|
|
subkey_binding_sigs |= {certificate_fingerprint: {current_packet_fingerprint: packet}}
|
2021-10-09 05:56:33 -05:00
|
|
|
else:
|
2021-10-19 10:35:59 -05:00
|
|
|
subkey_binding_sigs[certificate_fingerprint] |= {current_packet_fingerprint: packet}
|
2021-10-16 17:50:34 -05:00
|
|
|
elif signature_type == "SubkeyRevocation":
|
2021-10-09 05:56:33 -05:00
|
|
|
if not subkey_revocations.get(certificate_fingerprint):
|
2021-10-19 10:35:59 -05:00
|
|
|
subkey_revocations |= {certificate_fingerprint: {current_packet_fingerprint: packet}}
|
2021-10-09 05:56:33 -05:00
|
|
|
else:
|
2021-10-19 10:35:59 -05:00
|
|
|
subkey_revocations[certificate_fingerprint] |= {current_packet_fingerprint: packet}
|
2021-08-25 11:53:07 -05:00
|
|
|
else:
|
2021-10-16 17:50:34 -05:00
|
|
|
raise Exception(f"unknown signature type: {signature_type}")
|
2021-08-25 11:53:07 -05:00
|
|
|
else:
|
|
|
|
raise Exception(f'unknown signature root for "{packet.name}"')
|
|
|
|
else:
|
|
|
|
raise Exception(f'unknown packet type "{packet.name}"')
|
|
|
|
|
|
|
|
if not certificate_fingerprint:
|
2021-10-16 17:50:34 -05:00
|
|
|
raise Exception("missing certificate fingerprint")
|
2021-08-25 11:53:07 -05:00
|
|
|
|
|
|
|
if not pubkey:
|
2021-10-16 17:50:34 -05:00
|
|
|
raise Exception("missing certificate public-key")
|
2021-08-25 11:53:07 -05:00
|
|
|
|
2021-10-16 17:50:34 -05:00
|
|
|
user_dir = working_dir / username
|
|
|
|
key_dir = user_dir / certificate_fingerprint
|
2021-10-09 05:56:33 -05:00
|
|
|
key_dir.mkdir(parents=True, exist_ok=True)
|
2021-08-25 11:53:07 -05:00
|
|
|
|
2021-10-09 16:42:13 -05:00
|
|
|
persist_public_key(
|
2021-10-03 08:06:33 -05:00
|
|
|
certificate_fingerprint=certificate_fingerprint,
|
|
|
|
pubkey=pubkey,
|
2021-10-03 14:41:09 -05:00
|
|
|
key_dir=key_dir,
|
2021-10-09 16:42:13 -05:00
|
|
|
)
|
|
|
|
|
|
|
|
persist_uids(
|
|
|
|
key_dir=key_dir,
|
|
|
|
uid_binding_sigs=uid_binding_sigs,
|
2021-10-03 08:06:33 -05:00
|
|
|
uids=uids,
|
|
|
|
)
|
|
|
|
|
2021-10-09 05:56:33 -05:00
|
|
|
persist_subkeys(
|
|
|
|
certificate_fingerprint=certificate_fingerprint,
|
|
|
|
key_dir=key_dir,
|
|
|
|
subkeys=subkeys,
|
|
|
|
subkey_binding_sigs=subkey_binding_sigs,
|
|
|
|
)
|
|
|
|
|
|
|
|
persist_subkey_revocations(
|
|
|
|
certificate_fingerprint=certificate_fingerprint,
|
|
|
|
key_dir=key_dir,
|
|
|
|
subkey_revocations=subkey_revocations,
|
|
|
|
)
|
|
|
|
|
2021-10-03 12:24:49 -05:00
|
|
|
persist_direct_sigs(
|
|
|
|
direct_sigs=direct_sigs,
|
2021-10-03 08:06:33 -05:00
|
|
|
pubkey=pubkey,
|
2021-10-03 14:41:09 -05:00
|
|
|
key_dir=key_dir,
|
2021-10-03 08:06:33 -05:00
|
|
|
)
|
|
|
|
|
2021-10-03 13:05:11 -05:00
|
|
|
persist_direct_sigs(
|
|
|
|
direct_sigs=direct_revocations,
|
|
|
|
pubkey=pubkey,
|
2021-10-03 14:41:09 -05:00
|
|
|
key_dir=key_dir,
|
|
|
|
sig_type="revocation",
|
2021-10-03 13:05:11 -05:00
|
|
|
)
|
|
|
|
|
2021-10-03 08:06:33 -05:00
|
|
|
persist_certifications(
|
|
|
|
certifications=certifications,
|
|
|
|
pubkey=pubkey,
|
2021-10-03 14:41:09 -05:00
|
|
|
key_dir=key_dir,
|
2021-10-09 16:42:13 -05:00
|
|
|
uid_binding_sig=uid_binding_sigs,
|
2021-10-03 08:06:33 -05:00
|
|
|
uids=uids,
|
|
|
|
)
|
|
|
|
|
|
|
|
persist_revocations(
|
|
|
|
pubkey=pubkey,
|
|
|
|
revocations=revocations,
|
2021-10-03 14:41:09 -05:00
|
|
|
key_dir=key_dir,
|
2021-10-09 16:42:13 -05:00
|
|
|
uid_binding_sig=uid_binding_sigs,
|
2021-10-03 08:06:33 -05:00
|
|
|
uids=uids,
|
|
|
|
)
|
|
|
|
|
2021-10-03 14:41:09 -05:00
|
|
|
return user_dir
|
2021-10-03 08:06:33 -05:00
|
|
|
|
|
|
|
|
2021-10-09 16:42:13 -05:00
|
|
|
def persist_public_key(
|
2021-10-19 10:35:59 -05:00
|
|
|
certificate_fingerprint: Fingerprint,
|
2021-10-03 08:06:33 -05:00
|
|
|
pubkey: Path,
|
2021-10-03 14:41:09 -05:00
|
|
|
key_dir: Path,
|
2021-10-03 08:06:33 -05:00
|
|
|
) -> None:
|
2021-10-09 16:42:13 -05:00
|
|
|
"""Persist the Public-Key packet
|
2021-10-03 08:06:33 -05:00
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
2021-10-19 10:35:59 -05:00
|
|
|
certificate_fingerprint: Fingerprint
|
2021-10-03 08:06:33 -05:00
|
|
|
The unique fingerprint of the public key
|
|
|
|
pubkey: Path
|
|
|
|
The path to the public key of the root key
|
2021-10-03 14:41:09 -05:00
|
|
|
key_dir: Path
|
2021-10-03 08:06:33 -05:00
|
|
|
The root directory below which the basic key material is persisted
|
|
|
|
"""
|
|
|
|
|
2021-08-25 11:53:07 -05:00
|
|
|
packets: List[Path] = [pubkey]
|
2021-10-16 17:50:34 -05:00
|
|
|
output_file = key_dir / f"{certificate_fingerprint}.asc"
|
2021-10-12 17:09:58 -05:00
|
|
|
output_file.parent.mkdir(parents=True, exist_ok=True)
|
2021-10-16 17:50:34 -05:00
|
|
|
debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}")
|
2021-10-09 05:56:33 -05:00
|
|
|
packet_join(packets, output_file)
|
|
|
|
|
|
|
|
|
2021-10-09 16:42:13 -05:00
|
|
|
def persist_uids(
|
|
|
|
key_dir: Path,
|
2021-10-19 10:35:59 -05:00
|
|
|
uid_binding_sigs: Dict[Uid, Path],
|
|
|
|
uids: Dict[Uid, Path],
|
2021-10-09 16:42:13 -05:00
|
|
|
) -> None:
|
|
|
|
"""Persist the User IDs that belong to a PublicKey
|
|
|
|
|
|
|
|
The User ID material consists of PublicSubkeys and their SubkeyBindings.
|
2021-10-12 16:57:20 -05:00
|
|
|
The files are written to a UID specific directory and file below key_dir/uid.
|
2021-10-09 16:42:13 -05:00
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
key_dir: Path
|
|
|
|
The root directory below which the basic key material is persisted
|
2021-10-19 10:35:59 -05:00
|
|
|
uid_binding_sigs: Dict[Uid, Path]
|
2021-10-09 16:42:13 -05:00
|
|
|
The PositiveCertifications of a User ID and Public-Key packet
|
2021-10-19 10:35:59 -05:00
|
|
|
uids: Dict[Uid, Path]
|
2021-10-09 16:42:13 -05:00
|
|
|
The User IDs of a Public-Key (the root key)
|
|
|
|
"""
|
|
|
|
|
|
|
|
for key in uid_binding_sigs.keys():
|
|
|
|
packets = [uids[key], uid_binding_sigs[key]]
|
2021-10-16 17:50:34 -05:00
|
|
|
output_file = key_dir / "uid" / key / f"{key}.asc"
|
2021-10-12 17:09:58 -05:00
|
|
|
output_file.parent.mkdir(parents=True, exist_ok=True)
|
2021-10-16 17:50:34 -05:00
|
|
|
debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}")
|
2021-10-09 16:42:13 -05:00
|
|
|
packet_join(packets, output_file)
|
|
|
|
|
|
|
|
|
2021-10-09 05:56:33 -05:00
|
|
|
def persist_subkeys(
|
2021-10-19 10:35:59 -05:00
|
|
|
certificate_fingerprint: Fingerprint,
|
2021-10-09 05:56:33 -05:00
|
|
|
key_dir: Path,
|
2021-10-19 10:35:59 -05:00
|
|
|
subkeys: Dict[Fingerprint, Dict[Fingerprint, Path]],
|
|
|
|
subkey_binding_sigs: Dict[Fingerprint, Dict[Fingerprint, Path]],
|
2021-10-09 05:56:33 -05:00
|
|
|
) -> None:
|
|
|
|
"""Persist all Public-Subkeys and their PublicSubkeyBinding of a root key file to file(s)
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
2021-10-19 10:35:59 -05:00
|
|
|
certificate_fingerprint: Fingerprint
|
2021-10-09 05:56:33 -05:00
|
|
|
The unique fingerprint of the public key
|
|
|
|
key_dir: Path
|
|
|
|
The root directory below which the basic key material is persisted
|
2021-10-19 10:35:59 -05:00
|
|
|
subkeys: Dict[Fingerprint, Dict[Fingerprint, Path]]
|
2021-10-09 05:56:33 -05:00
|
|
|
The PublicSubkeys of a key
|
2021-10-19 10:35:59 -05:00
|
|
|
subkey_binding_sigs: Dict[Fingerprint, Dict[Fingerprint, Path]]
|
2021-10-09 05:56:33 -05:00
|
|
|
The SubkeyBinding signatures of a Public-Key (the root key)
|
|
|
|
"""
|
|
|
|
|
|
|
|
if subkeys.get(certificate_fingerprint):
|
|
|
|
for signature, subkey in subkeys[certificate_fingerprint].items():
|
|
|
|
packets: List[Path] = []
|
|
|
|
packets.extend([subkey, subkey_binding_sigs[certificate_fingerprint][signature]])
|
2021-10-16 17:50:34 -05:00
|
|
|
output_file = key_dir / "subkey" / signature / f"{signature}.asc"
|
2021-10-12 17:09:58 -05:00
|
|
|
output_file.parent.mkdir(parents=True, exist_ok=True)
|
2021-10-16 17:50:34 -05:00
|
|
|
debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}")
|
2021-10-09 05:56:33 -05:00
|
|
|
packet_join(packets=packets, output=output_file)
|
|
|
|
|
|
|
|
|
|
|
|
def persist_subkey_revocations(
|
2021-10-19 10:35:59 -05:00
|
|
|
certificate_fingerprint: Fingerprint,
|
2021-10-09 05:56:33 -05:00
|
|
|
key_dir: Path,
|
2021-10-19 10:35:59 -05:00
|
|
|
subkey_revocations: Dict[Fingerprint, Dict[Fingerprint, Path]],
|
2021-10-09 05:56:33 -05:00
|
|
|
) -> None:
|
|
|
|
"""Persist the SubkeyRevocations of all Public-Subkeys of a root key to file(s)
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
2021-10-19 10:35:59 -05:00
|
|
|
certificate_fingerprint: Fingerprint
|
2021-10-09 05:56:33 -05:00
|
|
|
The unique fingerprint of the public key
|
|
|
|
key_dir: Path
|
|
|
|
The root directory below which the basic key material is persisted
|
2021-10-19 10:35:59 -05:00
|
|
|
subkey_revocations: Dict[Fingerprint, Dict[Fingerprint, Path]]
|
2021-10-09 05:56:33 -05:00
|
|
|
The SubkeyRevocations of PublicSubkeys of a key
|
|
|
|
"""
|
|
|
|
|
|
|
|
if subkey_revocations.get(certificate_fingerprint):
|
|
|
|
for signature, revocation in subkey_revocations[certificate_fingerprint].items():
|
2021-10-16 17:50:34 -05:00
|
|
|
issuer = packet_dump_field(revocation, "Issuer")
|
|
|
|
output_file = key_dir / "subkey" / signature / "revocation" / f"{issuer}.asc"
|
2021-10-12 17:09:58 -05:00
|
|
|
output_file.parent.mkdir(parents=True, exist_ok=True)
|
2021-10-16 17:50:34 -05:00
|
|
|
debug(f"Writing file {output_file} from {revocation}")
|
2021-10-09 05:56:33 -05:00
|
|
|
packet_join(packets=[revocation], output=output_file)
|
2021-10-03 08:06:33 -05:00
|
|
|
|
|
|
|
|
2021-10-03 12:24:49 -05:00
|
|
|
def persist_direct_sigs(
|
2021-10-19 10:35:59 -05:00
|
|
|
direct_sigs: Dict[Fingerprint, Dict[str, List[Path]]],
|
2021-10-03 08:06:33 -05:00
|
|
|
pubkey: Path,
|
2021-10-03 14:41:09 -05:00
|
|
|
key_dir: Path,
|
2021-10-03 13:05:11 -05:00
|
|
|
sig_type: str = "certification",
|
2021-10-03 08:06:33 -05:00
|
|
|
) -> None:
|
2021-10-03 12:24:49 -05:00
|
|
|
"""Persist the signatures directly on a root key (such as DirectKeys or *Certifications without a User ID) to
|
|
|
|
file(s)
|
2021-10-03 08:06:33 -05:00
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
2021-10-19 10:35:59 -05:00
|
|
|
direct_sigs: Dict[Fingerprint, Dict[str, List[Path]]]
|
2021-10-03 08:06:33 -05:00
|
|
|
The certifications to write to file
|
|
|
|
pubkey: Path
|
|
|
|
The path to the public key of the root key
|
2021-10-03 14:41:09 -05:00
|
|
|
key_dir: Path
|
2021-10-03 08:06:33 -05:00
|
|
|
The root directory below which the Directkeys are persisted
|
2021-10-03 13:05:11 -05:00
|
|
|
sig_type: str
|
|
|
|
The type of direct certification to persist (defaults to 'certification'). This influences the directory name
|
2021-10-03 08:06:33 -05:00
|
|
|
"""
|
|
|
|
|
2021-10-03 12:24:49 -05:00
|
|
|
for key, current_certifications in direct_sigs.items():
|
2021-10-03 08:06:33 -05:00
|
|
|
for issuer, certifications in current_certifications.items():
|
|
|
|
packets = [pubkey] + certifications
|
2021-10-16 17:50:34 -05:00
|
|
|
output_file = key_dir / sig_type / f"{issuer}.asc"
|
2021-10-12 17:09:58 -05:00
|
|
|
output_file.parent.mkdir(parents=True, exist_ok=True)
|
2021-10-16 17:50:34 -05:00
|
|
|
debug(f"Writing file {output_file} from {[str(cert) for cert in certifications]}")
|
2021-10-03 08:06:33 -05:00
|
|
|
packet_join(packets, output_file)
|
|
|
|
|
|
|
|
|
|
|
|
def persist_certifications(
|
2021-10-19 10:35:59 -05:00
|
|
|
certifications: Dict[Uid, List[Path]],
|
2021-10-03 08:06:33 -05:00
|
|
|
pubkey: Path,
|
2021-10-03 14:41:09 -05:00
|
|
|
key_dir: Path,
|
2021-10-19 10:35:59 -05:00
|
|
|
uid_binding_sig: Dict[Uid, Path],
|
|
|
|
uids: Dict[Uid, Path],
|
2021-10-03 08:06:33 -05:00
|
|
|
) -> None:
|
|
|
|
"""Persist the certifications of a root key to file(s)
|
|
|
|
|
|
|
|
The certifications include all CasualCertifications, GenericCertifications, PersonaCertifications and
|
|
|
|
PositiveCertifications for all User IDs of the given root key.
|
2021-10-03 14:41:09 -05:00
|
|
|
All certifications are persisted in per User ID certification directories below key_dir.
|
2021-10-03 08:06:33 -05:00
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
2021-10-19 10:35:59 -05:00
|
|
|
certifications: Dict[Uid, List[Path]]
|
2021-10-03 08:06:33 -05:00
|
|
|
The certifications to write to file
|
|
|
|
pubkey: Path
|
|
|
|
The path to the public key of the root key
|
2021-10-03 14:41:09 -05:00
|
|
|
key_dir: Path
|
2021-10-03 08:06:33 -05:00
|
|
|
The root directory below which certifications are persisted
|
2021-10-19 10:35:59 -05:00
|
|
|
uid_binding_sig: Dict[Uid, Path]
|
2021-10-03 08:06:33 -05:00
|
|
|
The PositiveCertifications of a User ID and Public-Key packet
|
2021-10-19 10:35:59 -05:00
|
|
|
uids: Dict[Uid, Path]
|
2021-10-03 08:06:33 -05:00
|
|
|
The User IDs of a Public-Key (the root key)
|
|
|
|
"""
|
2021-08-25 11:53:07 -05:00
|
|
|
|
|
|
|
for key, current_certifications in certifications.items():
|
|
|
|
for certification in current_certifications:
|
2021-10-16 17:50:34 -05:00
|
|
|
certification_dir = key_dir / "uid" / key / "certification"
|
2021-08-25 11:53:07 -05:00
|
|
|
certification_dir.mkdir(parents=True, exist_ok=True)
|
2021-10-16 17:50:34 -05:00
|
|
|
issuer = packet_dump_field(certification, "Issuer")
|
2021-08-25 11:53:07 -05:00
|
|
|
|
2021-10-16 17:21:08 -05:00
|
|
|
if uids.get(key) and uid_binding_sig.get(key):
|
|
|
|
packets = [pubkey, uids[key], uid_binding_sig[key], certification]
|
2021-10-16 17:50:34 -05:00
|
|
|
output_file = certification_dir / f"{issuer}.asc"
|
|
|
|
debug(f"Writing file {output_file} from {certification}")
|
2021-10-16 17:21:08 -05:00
|
|
|
packet_join(packets, output_file)
|
|
|
|
else:
|
|
|
|
error(
|
|
|
|
f"Public key '{pubkey}' does not provide "
|
|
|
|
f"{'the UID binding signature' if not uid_binding_sig.get(key) else ''} for UID '{key}', "
|
2021-10-16 17:50:34 -05:00
|
|
|
"so its certifications can not be used!"
|
|
|
|
)
|
2021-08-25 11:53:07 -05:00
|
|
|
|
2021-10-03 08:06:33 -05:00
|
|
|
|
|
|
|
def persist_revocations(
|
|
|
|
pubkey: Path,
|
2021-10-19 10:35:59 -05:00
|
|
|
revocations: Dict[Uid, List[Path]],
|
2021-10-03 14:41:09 -05:00
|
|
|
key_dir: Path,
|
2021-10-19 10:35:59 -05:00
|
|
|
uid_binding_sig: Dict[Uid, Path],
|
|
|
|
uids: Dict[Uid, Path],
|
2021-10-03 08:06:33 -05:00
|
|
|
) -> None:
|
|
|
|
"""Persist the revocations of a root key to file(s)
|
|
|
|
|
|
|
|
The revocations include all CertificationRevocations for all User IDs of the given root key.
|
2021-10-03 14:41:09 -05:00
|
|
|
All revocations are persisted in per User ID 'revocation' directories below key_dir.
|
2021-10-03 08:06:33 -05:00
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
pubkey: Path
|
|
|
|
The path to the public key of the root key
|
2021-10-19 10:35:59 -05:00
|
|
|
revocations: Dict[Uid, List[Path]]
|
2021-10-03 08:06:33 -05:00
|
|
|
The revocations to write to file
|
2021-10-03 14:41:09 -05:00
|
|
|
key_dir: Path
|
2021-10-03 08:06:33 -05:00
|
|
|
The root directory below which revocations will be persisted
|
2021-10-19 10:35:59 -05:00
|
|
|
uid_binding_sig: Dict[Uid, Path]
|
2021-10-03 08:06:33 -05:00
|
|
|
The PositiveCertifications of a User ID and Public-Key packet
|
2021-10-19 10:35:59 -05:00
|
|
|
uids: Dict[Uid, Path]
|
2021-10-03 08:06:33 -05:00
|
|
|
The User IDs of a Public-Key (the root key)
|
|
|
|
"""
|
|
|
|
|
2021-08-25 11:53:07 -05:00
|
|
|
for key, current_revocations in revocations.items():
|
|
|
|
for revocation in current_revocations:
|
2021-10-16 17:50:34 -05:00
|
|
|
revocation_dir = key_dir / "uid" / key / "revocation"
|
2021-08-25 11:53:07 -05:00
|
|
|
revocation_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
2021-10-16 17:50:34 -05:00
|
|
|
issuer = packet_dump_field(revocation, "Issuer")
|
2021-08-25 11:53:07 -05:00
|
|
|
packets = [pubkey, uids[key]]
|
|
|
|
# Binding sigs only exist for 3rd-party revocations
|
|
|
|
if key in uid_binding_sig:
|
|
|
|
packets.append(uid_binding_sig[key])
|
|
|
|
packets.append(revocation)
|
2021-10-16 17:50:34 -05:00
|
|
|
output_file = revocation_dir / f"{issuer}.asc"
|
|
|
|
debug(f"Writing file {output_file} from {revocation}")
|
2021-08-25 11:53:07 -05:00
|
|
|
packet_join(packets, output_file)
|
|
|
|
|
|
|
|
|
|
|
|
def packet_dump(packet: Path) -> str:
|
2021-10-11 06:00:48 -05:00
|
|
|
"""Dump a PGP packet to string
|
|
|
|
|
|
|
|
The `sq packet dump` command is used to retrieve a dump of information from a PGP packet
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
packet: Path
|
|
|
|
The path to the PGP packet to retrieve the value from
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
str
|
|
|
|
The contents of the packet dump
|
|
|
|
"""
|
|
|
|
|
2021-10-16 17:50:34 -05:00
|
|
|
return system(["sq", "packet", "dump", str(packet)])
|
2021-08-25 11:53:07 -05:00
|
|
|
|
|
|
|
|
|
|
|
def packet_dump_field(packet: Path, field: str) -> str:
|
2021-10-11 06:00:48 -05:00
|
|
|
"""Retrieve the value of a field from a PGP packet
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
packet: Path
|
|
|
|
The path to the PGP packet to retrieve the value from
|
|
|
|
field: str
|
|
|
|
The name of the field
|
|
|
|
|
|
|
|
Raises
|
|
|
|
------
|
|
|
|
Exception
|
|
|
|
If the field is not found in the PGP packet
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
str
|
|
|
|
The value of the field found in packet
|
|
|
|
"""
|
|
|
|
|
2021-08-25 11:53:07 -05:00
|
|
|
dump = packet_dump(packet)
|
|
|
|
lines = [line.strip() for line in dump.splitlines()]
|
2021-10-16 17:50:34 -05:00
|
|
|
lines = list(filter(lambda line: line.startswith(f"{field}: "), lines))
|
2021-08-25 11:53:07 -05:00
|
|
|
if not lines:
|
|
|
|
raise Exception(f'Packet has no field "{field}"')
|
|
|
|
return lines[0].split(maxsplit=1)[1]
|
|
|
|
|
|
|
|
|
2021-10-12 12:20:33 -05:00
|
|
|
def keyring_split(working_dir: Path, keyring: Path) -> Iterable[Path]:
|
|
|
|
"""Split a file containing a PGP keyring into separate certificate files
|
2021-10-04 15:12:33 -05:00
|
|
|
|
2021-10-12 12:20:33 -05:00
|
|
|
The file is split using sq
|
2021-10-11 06:00:48 -05:00
|
|
|
|
2021-10-04 15:12:33 -05:00
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
working_dir: Path
|
2021-10-12 12:20:33 -05:00
|
|
|
The path of the working directory below which to create the output files
|
|
|
|
keyring: Path
|
|
|
|
The path of a file containing a PGP keyring
|
2021-10-04 15:12:33 -05:00
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
2021-10-12 12:20:33 -05:00
|
|
|
Iterable[Path]
|
|
|
|
An iterable over the naturally sorted list of certificate files derived from a keyring
|
2021-10-04 15:12:33 -05:00
|
|
|
"""
|
|
|
|
|
2021-10-16 17:50:34 -05:00
|
|
|
keyring_dir = Path(mkdtemp(dir=working_dir, prefix="keyring-")).absolute()
|
2021-10-12 12:20:33 -05:00
|
|
|
|
|
|
|
with cwd(keyring_dir):
|
2021-10-16 17:50:34 -05:00
|
|
|
system(["sq", "keyring", "split", "--prefix", "", str(keyring)])
|
2021-10-12 12:20:33 -05:00
|
|
|
return natural_sort_path(keyring_dir.iterdir())
|
2021-10-04 15:12:33 -05:00
|
|
|
|
|
|
|
|
|
|
|
def packet_split(working_dir: Path, certificate: Path) -> Iterable[Path]:
|
|
|
|
"""Split a file containing a PGP certificate into separate packet files
|
|
|
|
|
|
|
|
The files are split using sq
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
working_dir: Path
|
|
|
|
The path of the working directory below which to create the output files
|
|
|
|
certificate: Path
|
|
|
|
The absolute path of a file containing one PGP certificate
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
Iterable[Path]
|
|
|
|
An iterable over the naturally sorted list of packet files derived from certificate
|
|
|
|
"""
|
|
|
|
|
2021-10-16 17:50:34 -05:00
|
|
|
packet_dir = Path(mkdtemp(dir=working_dir, prefix="packet-")).absolute()
|
2021-10-04 15:12:33 -05:00
|
|
|
|
2021-08-25 11:53:07 -05:00
|
|
|
with cwd(packet_dir):
|
2021-10-16 17:50:34 -05:00
|
|
|
system(["sq", "packet", "split", "--prefix", "", str(certificate)])
|
2021-08-25 11:53:07 -05:00
|
|
|
return natural_sort_path(packet_dir.iterdir())
|
|
|
|
|
|
|
|
|
2021-10-05 11:31:31 -05:00
|
|
|
def packet_join(packets: List[Path], output: Path, force: bool = False) -> None:
|
|
|
|
"""Join PGP packet data in files to a single output file
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
packets: List[Path]
|
|
|
|
A list of paths to files that contain PGP packet data
|
|
|
|
output: Path
|
|
|
|
A file to which all PGP packet data is written
|
|
|
|
force: bool
|
|
|
|
Whether to force the execution of sq (defaults to False)
|
|
|
|
"""
|
|
|
|
|
2021-10-16 17:50:34 -05:00
|
|
|
cmd = ["sq", "packet", "join"]
|
2021-10-05 11:31:31 -05:00
|
|
|
if force:
|
2021-10-16 17:50:34 -05:00
|
|
|
cmd.insert(1, "--force")
|
2021-08-25 11:53:07 -05:00
|
|
|
packets_str = list(map(lambda path: str(path), packets))
|
|
|
|
cmd.extend(packets_str)
|
2021-10-16 17:50:34 -05:00
|
|
|
cmd.extend(["--output", str(output)])
|
2021-08-25 11:53:07 -05:00
|
|
|
system(cmd, exit_on_error=False)
|
|
|
|
|
|
|
|
|
2021-10-19 10:35:59 -05:00
|
|
|
def simplify_user_id(user_id: Uid) -> Uid:
|
2021-10-11 06:00:48 -05:00
|
|
|
"""Simplify the User ID string to contain more filesystem friendly characters
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
2021-10-19 10:35:59 -05:00
|
|
|
user_id: Uid
|
2021-10-11 06:00:48 -05:00
|
|
|
A User ID string (e.g. 'Foobar McFooface <foobar@foo.face>')
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
2021-10-19 10:35:59 -05:00
|
|
|
Uid
|
2021-10-11 06:00:48 -05:00
|
|
|
The simplified representation of user_id
|
|
|
|
"""
|
|
|
|
|
2021-10-19 10:35:59 -05:00
|
|
|
user_id_str: str = user_id.replace("@", "_at_")
|
|
|
|
user_id_str = sub("[<>]", "", user_id_str)
|
|
|
|
user_id_str = sub("[" + escape(r" !@#$%^&*()_-+=[]{}\|;:,.<>/?") + "]", "_", user_id_str)
|
|
|
|
return Uid(user_id_str)
|
2021-08-25 11:53:07 -05:00
|
|
|
|
|
|
|
|
2021-10-17 05:58:19 -05:00
|
|
|
def derive_user_from_target(working_dir: Path, target_dir: Path, certificate: Path) -> Optional[str]:
|
|
|
|
"""Attempt to derive the username of a public key from a target directory
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
working_dir: Path
|
|
|
|
A directory to use for temporary files
|
|
|
|
target_dir: Path
|
|
|
|
The directory in which to look up a username
|
|
|
|
certificate: Path
|
|
|
|
A public key file
|
|
|
|
|
|
|
|
Raises
|
|
|
|
------
|
|
|
|
Exception
|
|
|
|
If more than one username is found (a public key can only belong to one individual)
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
Optional[str]
|
|
|
|
A string representing the username a public key certificate belongs to, None otherwise
|
|
|
|
"""
|
|
|
|
|
|
|
|
def get_certificate_fingerprint(working_dir: Path, certificate: Path) -> str:
|
|
|
|
"""Get the certificate fingerprint from a PGP public key file
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
working_dir: Path
|
|
|
|
A directory to use for temporary files
|
|
|
|
certificate: Path
|
|
|
|
A PGP public key file
|
|
|
|
|
|
|
|
Raises
|
|
|
|
------
|
|
|
|
Exception
|
|
|
|
If no fingerprint can be found in the provided PGP public key file
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
str
|
|
|
|
The fingerprint of the PGP public key file
|
|
|
|
"""
|
|
|
|
|
|
|
|
certificate_fingerprint = ""
|
|
|
|
for packet in packet_split(working_dir=working_dir, certificate=certificate):
|
|
|
|
if packet.name.endswith("--PublicKey"):
|
|
|
|
certificate_fingerprint = packet_dump_field(packet, "Fingerprint")
|
|
|
|
|
|
|
|
if not certificate_fingerprint:
|
|
|
|
raise Exception(f"The public key file {certificate} does not provide a PublicKey fingerprint.")
|
|
|
|
|
|
|
|
return certificate_fingerprint
|
|
|
|
|
|
|
|
certificate_fingerprint = get_certificate_fingerprint(working_dir=working_dir, certificate=certificate)
|
|
|
|
debug(f"Derived fingerprint '{certificate_fingerprint}' from {certificate}")
|
|
|
|
|
|
|
|
matches = list(target_dir.glob(f"*/{certificate_fingerprint}"))
|
|
|
|
|
|
|
|
if len(matches) > 1:
|
|
|
|
raise Exception(
|
|
|
|
f"More than one username found in {target_dir} when probing for fingerprint '{certificate_fingerprint}'!"
|
|
|
|
)
|
|
|
|
elif len(matches) == 0:
|
|
|
|
debug(f"Can not derive username from target directory for certificate {certificate}")
|
|
|
|
return None
|
|
|
|
else:
|
|
|
|
username = matches[0].parent.stem
|
|
|
|
debug(f"Successfully derived username '{username}' from target directory for certificate {certificate}")
|
|
|
|
return username
|
|
|
|
|
|
|
|
|
2021-10-03 14:41:09 -05:00
|
|
|
def convert(
|
|
|
|
working_dir: Path,
|
|
|
|
source: Path,
|
2021-10-04 12:25:24 -05:00
|
|
|
target_dir: Path,
|
2021-10-03 14:41:09 -05:00
|
|
|
name_override: Optional[str] = None,
|
2021-10-19 10:35:59 -05:00
|
|
|
fingerprint_filter: Optional[Set[Fingerprint]] = None,
|
2021-10-03 14:41:09 -05:00
|
|
|
) -> Path:
|
2021-10-12 12:20:33 -05:00
|
|
|
"""Convert a path containing PGP certificate material to a decomposed directory structure
|
2021-10-11 06:00:48 -05:00
|
|
|
|
2021-10-12 12:20:33 -05:00
|
|
|
Any input is first split by `keyring_split()` into individual certificates.
|
2021-10-11 06:00:48 -05:00
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
working_dir: Path
|
|
|
|
A directory to use for temporary files
|
|
|
|
source: Path
|
2021-10-12 12:39:07 -05:00
|
|
|
A path to a file or directory to decompose
|
2021-10-11 06:00:48 -05:00
|
|
|
target_dir: Path
|
|
|
|
A directory path to write the new directory structure to
|
|
|
|
name_override: Optional[str]
|
|
|
|
An optional username override for the call to `convert_certificate()`
|
2021-10-19 10:35:59 -05:00
|
|
|
fingerprint_filter: Optional[Set[Fingerprint]]
|
2021-10-18 05:47:12 -05:00
|
|
|
An optional set of strings defining fingerprints of PGP public keys that all certificates will be filtered with
|
2021-10-11 06:00:48 -05:00
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
Path
|
|
|
|
The directory that contains the resulting directory structure (target_dir)
|
|
|
|
"""
|
|
|
|
|
2021-08-25 11:53:07 -05:00
|
|
|
directories: List[Path] = []
|
2021-10-12 12:42:22 -05:00
|
|
|
keys: Iterable[Path] = source.iterdir() if source.is_dir() else [source]
|
|
|
|
|
|
|
|
for key in keys:
|
|
|
|
for cert in keyring_split(working_dir=working_dir, keyring=key):
|
2021-10-17 05:58:19 -05:00
|
|
|
name = (
|
|
|
|
name_override
|
|
|
|
or derive_user_from_target(working_dir=working_dir, target_dir=target_dir, certificate=cert)
|
|
|
|
or key.stem
|
|
|
|
)
|
2021-10-18 05:47:12 -05:00
|
|
|
directories.append(
|
|
|
|
convert_certificate(
|
|
|
|
working_dir=working_dir,
|
|
|
|
certificate=cert,
|
|
|
|
name_override=name,
|
|
|
|
fingerprint_filter=fingerprint_filter,
|
|
|
|
)
|
|
|
|
)
|
2021-08-25 11:53:07 -05:00
|
|
|
|
|
|
|
for path in directories:
|
2021-10-16 17:12:57 -05:00
|
|
|
(target_dir / path.name).mkdir(parents=True, exist_ok=True)
|
2021-10-04 12:25:24 -05:00
|
|
|
copytree(src=path, dst=(target_dir / path.name), dirs_exist_ok=True)
|
2021-08-25 11:53:07 -05:00
|
|
|
|
|
|
|
return target_dir
|
|
|
|
|
|
|
|
|
2021-10-10 11:28:30 -05:00
|
|
|
def temp_join_keys(sources: List[Path], temp_dir: Path, force: bool) -> List[Path]:
|
|
|
|
"""Temporarily join the key material of a given set of keys in a temporary location and return their paths
|
2021-10-05 11:31:31 -05:00
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
sources: List[Path]
|
2021-10-10 11:28:30 -05:00
|
|
|
A list of paths below which PGP packets are found
|
|
|
|
temp_dir: Path
|
|
|
|
The temporary directory below which to join PGP keys
|
2021-10-05 11:31:31 -05:00
|
|
|
force: bool
|
2021-10-10 11:28:30 -05:00
|
|
|
Whether to force the joining of files
|
2021-10-05 11:31:31 -05:00
|
|
|
"""
|
|
|
|
|
|
|
|
certs: List[Path] = []
|
|
|
|
|
|
|
|
for source_number, source in enumerate(sources):
|
|
|
|
if source.is_dir():
|
|
|
|
for user_number, user_dir in enumerate(sorted(source.iterdir())):
|
|
|
|
if user_dir.is_dir():
|
|
|
|
for user_cert_number, user_cert_dir in enumerate(sorted(user_dir.iterdir())):
|
|
|
|
if user_cert_dir.is_dir():
|
2021-10-16 17:50:34 -05:00
|
|
|
cert_path = temp_dir / (
|
|
|
|
f"{str(source_number).zfill(4)}"
|
|
|
|
f"-{str(user_number).zfill(4)}"
|
|
|
|
f"-{str(user_cert_number).zfill(4)}.asc"
|
2021-10-05 11:31:31 -05:00
|
|
|
)
|
|
|
|
debug(f"Joining {user_dir.name}/{user_cert_dir.name} in {cert_path}.")
|
|
|
|
packet_join(
|
|
|
|
packets=sorted(user_cert_dir.glob("**/*.asc")),
|
|
|
|
output=cert_path,
|
|
|
|
force=force,
|
|
|
|
)
|
|
|
|
certs.append(cert_path)
|
|
|
|
elif source.is_file() and not source.is_symlink():
|
|
|
|
certs.append(source)
|
|
|
|
|
2021-10-10 11:28:30 -05:00
|
|
|
return certs
|
|
|
|
|
|
|
|
|
2021-10-19 10:35:59 -05:00
|
|
|
def get_all_and_revoked_certs(certs: List[Path]) -> Tuple[List[Fingerprint], List[Fingerprint]]:
|
2021-10-10 11:28:30 -05:00
|
|
|
"""Get the fingerprints of all public keys and all fingerprints of all (self) revoked public keys in a directory
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
certs: List[Path]
|
|
|
|
The certificates to trust
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
Tuple[List[str], List[str]]
|
|
|
|
A tuple with the first item containing the fingerprints of all public keys and the second item containing the
|
|
|
|
fingerprints of all self-revoked public keys
|
|
|
|
"""
|
|
|
|
|
2021-10-19 10:35:59 -05:00
|
|
|
all_fingerprints: List[Fingerprint] = []
|
|
|
|
revoked_fingerprints: List[Fingerprint] = []
|
|
|
|
|
|
|
|
# TODO: what about direct key revocations/signatures?
|
2021-10-10 11:28:30 -05:00
|
|
|
|
|
|
|
debug(f"Retrieving all and self-revoked certificates from {[str(cert_dir) for cert_dir in certs]}")
|
|
|
|
for cert_collection in certs:
|
|
|
|
if cert_collection.is_dir():
|
|
|
|
for user_dir in cert_collection.iterdir():
|
|
|
|
if user_dir.is_dir():
|
|
|
|
for cert_dir in user_dir.iterdir():
|
2021-10-19 10:35:59 -05:00
|
|
|
cert_fingerprint = Fingerprint(cert_dir.stem)
|
|
|
|
all_fingerprints.append(cert_fingerprint)
|
2021-10-10 11:28:30 -05:00
|
|
|
for revocation_cert in cert_dir.glob("revocation/*.asc"):
|
2021-10-19 10:35:59 -05:00
|
|
|
if cert_fingerprint.endswith(revocation_cert.stem):
|
|
|
|
debug(f"Revoking {cert_fingerprint} due to self-revocation")
|
|
|
|
revoked_fingerprints.append(cert_fingerprint)
|
2021-10-10 11:28:30 -05:00
|
|
|
|
|
|
|
return (all_fingerprints, revoked_fingerprints)
|
|
|
|
|
|
|
|
|
2021-10-19 10:35:59 -05:00
|
|
|
def export_ownertrust(certs: List[Path], output: Path) -> Tuple[List[Fingerprint], List[Fingerprint]]:
|
2021-10-10 11:28:30 -05:00
|
|
|
"""Export ownertrust from a set of keys
|
|
|
|
|
|
|
|
The output file format is compatible with `gpg --import-ownertrust` and lists the main fingerprint ID of all
|
|
|
|
non-revoked keys as fully trusted.
|
|
|
|
The exported file is used by pacman-key when importing a keyring (see
|
|
|
|
https://man.archlinux.org/man/pacman-key.8#PROVIDING_A_KEYRING_FOR_IMPORT).
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
certs: List[Path]
|
|
|
|
The certificates to trust
|
|
|
|
output: Path
|
|
|
|
The file path to write to
|
|
|
|
"""
|
|
|
|
|
2021-10-19 10:35:59 -05:00
|
|
|
all_certs, revoked_certs = get_all_and_revoked_certs(certs=certs)
|
2021-10-10 11:28:30 -05:00
|
|
|
trusted_certs = [cert for cert in all_certs if cert not in revoked_certs]
|
|
|
|
|
|
|
|
with open(file=output, mode="w") as trusted_certs_file:
|
|
|
|
for cert in trusted_certs:
|
|
|
|
debug(f"Writing {cert} to {output}")
|
|
|
|
trusted_certs_file.write(f"{cert}:4:\n")
|
|
|
|
|
|
|
|
return (trusted_certs, all_certs)
|
|
|
|
|
|
|
|
|
2021-10-19 10:35:59 -05:00
|
|
|
def export_revoked(certs: List[Path], main_keys: List[Fingerprint], output: Path, min_revoker: int = 2) -> None:
|
2021-10-10 11:28:30 -05:00
|
|
|
"""Export the PGP revoked status from a set of keys
|
|
|
|
|
|
|
|
The output file contains the fingerprints of all self-revoked keys and all keys for which at least two revocations
|
|
|
|
by any main key exist.
|
|
|
|
The exported file is used by pacman-key when importing a keyring (see
|
|
|
|
https://man.archlinux.org/man/pacman-key.8#PROVIDING_A_KEYRING_FOR_IMPORT).
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
certs: List[Path]
|
|
|
|
A list of directories with keys to check for their revocation status
|
2021-10-19 10:35:59 -05:00
|
|
|
main_keys: List[Fingerprint]
|
2021-10-10 11:28:30 -05:00
|
|
|
A list of strings representing the fingerprints of (current and/or revoked) main keys
|
|
|
|
output: Path
|
|
|
|
The file path to write to
|
|
|
|
min_revoker: int
|
|
|
|
The minimum amount of revocation certificates on a User ID from any main key to deem a public key as revoked
|
|
|
|
(defaults to 2)
|
|
|
|
"""
|
|
|
|
|
2021-10-19 10:35:59 -05:00
|
|
|
all_certs, revoked_certs = get_all_and_revoked_certs(certs=certs)
|
2021-10-10 11:28:30 -05:00
|
|
|
|
|
|
|
debug(f"Retrieving certificates revoked by main keys from {[str(cert_dir) for cert_dir in certs]}")
|
|
|
|
foreign_revocations: Dict[str, List[str]] = {}
|
|
|
|
for cert_collection in certs:
|
|
|
|
if cert_collection.is_dir():
|
|
|
|
for user_dir in cert_collection.iterdir():
|
|
|
|
if user_dir.is_dir():
|
|
|
|
for cert_dir in user_dir.iterdir():
|
|
|
|
debug(f"Inspecting public key {cert_dir.name}")
|
|
|
|
foreign_revocations[cert_dir.stem] = []
|
2021-10-12 16:57:20 -05:00
|
|
|
for revocation_cert in cert_dir.glob("uid/*/revocation/*.asc"):
|
2021-10-10 11:28:30 -05:00
|
|
|
foreign_revocations[cert_dir.stem] += [
|
2021-10-16 17:50:34 -05:00
|
|
|
revocation_cert.stem
|
|
|
|
for main_key in main_keys
|
2021-10-10 11:28:30 -05:00
|
|
|
if main_key.endswith(revocation_cert.stem)
|
|
|
|
]
|
|
|
|
|
|
|
|
# TODO: find a better (less naive) approach, as this would also match on public certificates,
|
|
|
|
# where some UIDs are signed and others are revoked
|
|
|
|
if len(set(foreign_revocations[cert_dir.stem])) >= min_revoker:
|
|
|
|
debug(
|
|
|
|
f"Revoking {cert_dir.name} due to {set(foreign_revocations[cert_dir.stem])} "
|
|
|
|
"being main key revocations"
|
|
|
|
)
|
2021-10-19 10:35:59 -05:00
|
|
|
revoked_certs.append(Fingerprint(cert_dir.stem))
|
2021-10-10 11:28:30 -05:00
|
|
|
|
|
|
|
with open(file=output, mode="w") as trusted_certs_file:
|
|
|
|
for cert in set(revoked_certs):
|
|
|
|
debug(f"Writing {cert} to {output}")
|
|
|
|
trusted_certs_file.write(f"{cert}\n")
|
|
|
|
|
|
|
|
|
2021-10-19 10:35:59 -05:00
|
|
|
def get_fingerprints_from_import_source(working_dir: Path, source: Path) -> List[Fingerprint]:
|
2021-10-18 05:47:12 -05:00
|
|
|
"""Get all fingerprints of PGP public keys from import file(s)
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
working_dir: Path
|
|
|
|
A directory to use for temporary files
|
|
|
|
source: Path
|
|
|
|
The path to a source file or directory
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
2021-10-19 10:35:59 -05:00
|
|
|
List[Fingerprint]
|
2021-10-18 05:47:12 -05:00
|
|
|
A list of strings representing the fingerprints of PGP public keys found in source
|
|
|
|
"""
|
|
|
|
|
2021-10-19 10:35:59 -05:00
|
|
|
fingerprints: List[Fingerprint] = []
|
2021-10-18 05:47:12 -05:00
|
|
|
keys: List[Path] = list(source.iterdir()) if source.is_dir() else [source]
|
|
|
|
|
|
|
|
for key in keys:
|
|
|
|
for certificate in keyring_split(working_dir=working_dir, keyring=key):
|
|
|
|
for packet in packet_split(working_dir=working_dir, certificate=certificate):
|
|
|
|
if packet.name.endswith("--PublicKey"):
|
2021-10-19 10:35:59 -05:00
|
|
|
fingerprints += [Fingerprint(packet_dump_field(packet, "Fingerprint"))]
|
2021-10-18 05:47:12 -05:00
|
|
|
|
|
|
|
debug(f"Fingerprints of PGP public keys in {source}: {fingerprints}")
|
|
|
|
return fingerprints
|
|
|
|
|
|
|
|
|
2021-10-19 10:35:59 -05:00
|
|
|
def get_fingerprints_from_decomposed_dir(path: Path) -> List[Fingerprint]:
|
2021-10-18 05:47:12 -05:00
|
|
|
"""Get all fingerprints of PGP public keys from a decomposed directory structure
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
path: Path
|
|
|
|
The path to a decomposed directory structure
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
2021-10-19 10:35:59 -05:00
|
|
|
List[Fingerprint]
|
2021-10-18 05:47:12 -05:00
|
|
|
A list of strings representing all fingerprints of PGP public keys below path
|
|
|
|
"""
|
|
|
|
|
2021-10-19 10:35:59 -05:00
|
|
|
fingerprints = [Fingerprint(path.stem) for path in list(path.absolute().glob("*/*"))]
|
2021-10-18 05:47:12 -05:00
|
|
|
debug(f"Fingerprints of PGP public keys in {path}: {fingerprints}")
|
|
|
|
return fingerprints
|
|
|
|
|
|
|
|
|
2021-10-19 10:35:59 -05:00
|
|
|
def get_fingerprints(working_dir: Path, input_path: Path, decomposed_paths: List[Path]) -> Set[Fingerprint]:
|
2021-10-18 05:47:12 -05:00
|
|
|
"""Get the fingerprints of PGP public keys from input paths and decomposed directory structures
|
|
|
|
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
working_dir: Path
|
|
|
|
A directory to use for temporary files
|
|
|
|
input_path: Path
|
|
|
|
The path to a source file or directory
|
|
|
|
decomposed_paths: List[Path]
|
|
|
|
A list of paths that identify decomposed PGP data in directory structures
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
2021-10-19 10:35:59 -05:00
|
|
|
Set[Fingerprint]
|
2021-10-18 05:47:12 -05:00
|
|
|
A set of strings describing fingerprints of PGP public keys
|
|
|
|
"""
|
|
|
|
|
2021-10-19 10:35:59 -05:00
|
|
|
fingerprints: Set[Fingerprint] = set()
|
2021-10-18 05:47:12 -05:00
|
|
|
|
|
|
|
fingerprints.update(
|
|
|
|
get_fingerprints_from_import_source(
|
|
|
|
working_dir=working_dir,
|
|
|
|
source=args.source,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
for decomposed_path in decomposed_paths:
|
|
|
|
fingerprints.update(get_fingerprints_from_decomposed_dir(path=decomposed_path))
|
|
|
|
|
|
|
|
return fingerprints
|
|
|
|
|
|
|
|
|
2021-10-10 11:28:30 -05:00
|
|
|
def export_keyring(
|
|
|
|
working_dir: Path,
|
|
|
|
main: List[Path],
|
|
|
|
sources: List[Path],
|
|
|
|
output: Path,
|
|
|
|
force: bool,
|
|
|
|
pacman_integration: bool,
|
|
|
|
) -> None:
|
|
|
|
"""Export all provided PGP packet files to a single output file
|
|
|
|
|
|
|
|
If sources contains directories, any .asc files below them are considered.
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
working_dir: Path
|
|
|
|
A directory to use for temporary files
|
|
|
|
main: List[Path]
|
|
|
|
A list of directories or files from which to read PGP packet information, that is considered as public keys
|
|
|
|
that are used to sign those found in sources
|
|
|
|
sources: List[Path]
|
|
|
|
A list of directories or files from which to read PGP packet information
|
|
|
|
output: Path
|
|
|
|
An output file that all PGP packet data is written to
|
|
|
|
force: bool
|
|
|
|
Whether to force the execution of packet_join()
|
|
|
|
"""
|
|
|
|
|
|
|
|
main = [source.absolute() for source in main]
|
|
|
|
sources = [source.absolute() for source in sources]
|
|
|
|
output = output.absolute()
|
|
|
|
|
|
|
|
main_certs = temp_join_keys(sources=main, temp_dir=Path(mkdtemp(dir=working_dir)).absolute(), force=force)
|
|
|
|
sources_certs = temp_join_keys(sources=sources, temp_dir=Path(mkdtemp(dir=working_dir)).absolute(), force=force)
|
|
|
|
debug(
|
|
|
|
f"Creating keyring {output} from {[str(source_dir) for source_dir in main]} "
|
|
|
|
f"and {[str(source_dir) for source_dir in sources]}."
|
|
|
|
)
|
|
|
|
|
2021-10-16 17:12:57 -05:00
|
|
|
output.parent.mkdir(parents=True, exist_ok=True)
|
2021-10-16 17:50:34 -05:00
|
|
|
cmd = ["sq", "keyring", "merge", "-o", str(output)]
|
2021-10-05 11:31:31 -05:00
|
|
|
if force:
|
2021-10-16 17:50:34 -05:00
|
|
|
cmd.insert(1, "--force")
|
2021-10-10 11:28:30 -05:00
|
|
|
cmd += [str(cert) for cert in sorted(main_certs)]
|
|
|
|
cmd += [str(cert) for cert in sorted(sources_certs)]
|
2021-10-05 11:31:31 -05:00
|
|
|
system(cmd, exit_on_error=False)
|
|
|
|
|
2021-10-10 11:28:30 -05:00
|
|
|
if pacman_integration:
|
|
|
|
[trusted_main_keys, all_main_keys] = export_ownertrust(
|
|
|
|
certs=main,
|
|
|
|
output=Path(f"{str(output).split('.gpg')[0]}-trusted"),
|
|
|
|
)
|
|
|
|
export_revoked(
|
|
|
|
certs=main + sources,
|
|
|
|
main_keys=all_main_keys,
|
|
|
|
output=Path(f"{str(output).split('.gpg')[0]}-revoked"),
|
|
|
|
)
|
|
|
|
|
2021-10-05 11:31:31 -05:00
|
|
|
|
2021-08-25 11:53:07 -05:00
|
|
|
def absolute_path(path: str) -> Path:
|
2021-10-11 06:00:48 -05:00
|
|
|
"""Return the absolute path of a given str
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
path: str
|
|
|
|
A string representing a path
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
Path
|
|
|
|
The absolute path representation of path
|
|
|
|
"""
|
|
|
|
|
2021-08-25 11:53:07 -05:00
|
|
|
return Path(path).absolute()
|
|
|
|
|
|
|
|
|
2021-10-16 17:50:34 -05:00
|
|
|
if __name__ == "__main__":
|
2021-08-25 11:53:07 -05:00
|
|
|
parser = ArgumentParser()
|
2021-10-05 11:31:31 -05:00
|
|
|
parser.add_argument(
|
2021-10-16 17:50:34 -05:00
|
|
|
"-v", "--verbose", action="store_true", help="Causes to print debugging messages about the progress"
|
|
|
|
)
|
|
|
|
parser.add_argument("--wait", action="store_true", help="Block before cleaning up the temp directory")
|
|
|
|
parser.add_argument(
|
|
|
|
"-f",
|
|
|
|
"--force",
|
|
|
|
action="store_true",
|
2021-10-05 11:31:31 -05:00
|
|
|
default=False,
|
2021-10-16 17:50:34 -05:00
|
|
|
help="force the execution of subcommands (e.g. overwriting of files)",
|
2021-10-05 11:31:31 -05:00
|
|
|
)
|
2021-08-25 11:53:07 -05:00
|
|
|
subcommands = parser.add_subparsers(dest="subcommand")
|
|
|
|
|
2021-10-03 08:06:33 -05:00
|
|
|
convert_parser = subcommands.add_parser(
|
2021-10-16 17:50:34 -05:00
|
|
|
"convert",
|
2021-10-11 06:00:48 -05:00
|
|
|
help="import one or multiple PGP public keys and convert them to a decomposed directory structure",
|
2021-10-03 08:06:33 -05:00
|
|
|
)
|
2021-10-16 17:50:34 -05:00
|
|
|
convert_parser.add_argument("source", type=absolute_path, help="File or directory to convert")
|
|
|
|
convert_parser.add_argument("--target", type=absolute_path, help="target directory")
|
2021-10-03 14:41:09 -05:00
|
|
|
convert_parser.add_argument(
|
2021-10-16 17:50:34 -05:00
|
|
|
"--name",
|
2021-10-03 14:41:09 -05:00
|
|
|
type=str,
|
|
|
|
default=None,
|
2021-10-16 17:50:34 -05:00
|
|
|
help="override the username to use (only useful when using a single file as source)",
|
2021-10-03 14:41:09 -05:00
|
|
|
)
|
2021-08-25 11:53:07 -05:00
|
|
|
|
2021-10-16 17:12:57 -05:00
|
|
|
import_main_parser = subcommands.add_parser(
|
2021-10-16 17:50:34 -05:00
|
|
|
"import-main", help="import one or several PGP keys to the main signing keys"
|
2021-10-16 17:12:57 -05:00
|
|
|
)
|
2021-10-16 17:50:34 -05:00
|
|
|
import_main_parser.add_argument("source", type=absolute_path, help="File or directory")
|
2021-10-16 17:12:57 -05:00
|
|
|
import_main_parser.add_argument(
|
2021-10-16 17:50:34 -05:00
|
|
|
"--name",
|
2021-10-16 17:12:57 -05:00
|
|
|
type=str,
|
|
|
|
default=None,
|
2021-10-16 17:50:34 -05:00
|
|
|
help="override the username to use (only useful when using a single file as source)",
|
2021-10-16 17:12:57 -05:00
|
|
|
)
|
|
|
|
|
|
|
|
import_packager_parser = subcommands.add_parser(
|
2021-10-16 17:50:34 -05:00
|
|
|
"import-packager", help="import one or several PGP keys to the packager keys"
|
2021-10-16 17:12:57 -05:00
|
|
|
)
|
2021-10-16 17:50:34 -05:00
|
|
|
import_packager_parser.add_argument("source", type=absolute_path, help="File or directory")
|
2021-10-16 17:12:57 -05:00
|
|
|
import_packager_parser.add_argument(
|
2021-10-16 17:50:34 -05:00
|
|
|
"--name",
|
2021-10-16 17:12:57 -05:00
|
|
|
type=str,
|
|
|
|
default=None,
|
2021-10-16 17:50:34 -05:00
|
|
|
help="override the username to use (only useful when using a single file as source)",
|
2021-10-16 17:12:57 -05:00
|
|
|
)
|
|
|
|
|
|
|
|
export_keyring_parser = subcommands.add_parser(
|
2021-10-16 17:50:34 -05:00
|
|
|
"export-keyring",
|
2021-10-16 17:12:57 -05:00
|
|
|
help="export PGP packet data below main/ and packagers/ to output/archlinux.gpg alongside pacman integration",
|
|
|
|
)
|
2021-08-25 11:53:07 -05:00
|
|
|
|
2021-10-05 11:31:31 -05:00
|
|
|
export_parser = subcommands.add_parser(
|
2021-10-16 17:50:34 -05:00
|
|
|
"export",
|
2021-10-05 11:31:31 -05:00
|
|
|
help="export a directory structure of PGP packet data to a combined file",
|
|
|
|
)
|
2021-10-16 17:50:34 -05:00
|
|
|
export_parser.add_argument("output", type=absolute_path, help="file to write PGP packet data to")
|
2021-10-10 11:28:30 -05:00
|
|
|
export_parser.add_argument(
|
2021-10-16 17:50:34 -05:00
|
|
|
"-m",
|
|
|
|
"--main",
|
2021-10-10 11:28:30 -05:00
|
|
|
action="append",
|
2021-10-16 17:50:34 -05:00
|
|
|
help="files or directories containing PGP packet data that is trusted (can be provided multiple times)",
|
2021-10-10 11:28:30 -05:00
|
|
|
required=True,
|
|
|
|
type=absolute_path,
|
|
|
|
)
|
2021-10-05 11:31:31 -05:00
|
|
|
export_parser.add_argument(
|
2021-10-16 17:50:34 -05:00
|
|
|
"-s",
|
|
|
|
"--source",
|
2021-10-05 11:31:31 -05:00
|
|
|
action="append",
|
2021-10-16 17:50:34 -05:00
|
|
|
help="files or directories containing PGP packet data (can be provided multiple times)",
|
2021-10-05 11:31:31 -05:00
|
|
|
required=True,
|
|
|
|
type=absolute_path,
|
|
|
|
)
|
2021-10-10 11:28:30 -05:00
|
|
|
export_parser.add_argument(
|
2021-10-16 17:50:34 -05:00
|
|
|
"-p",
|
|
|
|
"--pacman-integration",
|
|
|
|
action="store_true",
|
2021-10-10 11:28:30 -05:00
|
|
|
default=False,
|
2021-10-16 17:50:34 -05:00
|
|
|
help="export trusted and revoked files (used by pacman) alongside the keyring",
|
2021-10-10 11:28:30 -05:00
|
|
|
)
|
2021-10-05 11:31:31 -05:00
|
|
|
|
2021-08-25 11:53:07 -05:00
|
|
|
args = parser.parse_args()
|
|
|
|
|
|
|
|
if args.verbose:
|
|
|
|
basicConfig(level=DEBUG)
|
|
|
|
|
|
|
|
# temporary working directory that gets auto cleaned
|
2021-10-16 17:50:34 -05:00
|
|
|
with TemporaryDirectory(prefix="arch-keyringctl-") as tempdir:
|
2021-10-12 12:39:30 -05:00
|
|
|
keyring_root = Path().absolute()
|
2021-08-25 11:53:07 -05:00
|
|
|
working_dir = Path(tempdir)
|
2021-10-16 17:50:34 -05:00
|
|
|
debug(f"Working directory: {working_dir}")
|
2021-10-12 12:22:30 -05:00
|
|
|
with cwd(working_dir):
|
2021-10-16 17:50:34 -05:00
|
|
|
if "convert" == args.subcommand:
|
|
|
|
print(convert(working_dir, args.source, target_dir=Path(mkdtemp(prefix="arch-keyringctl-")).absolute()))
|
|
|
|
elif "import-main" == args.subcommand:
|
2021-10-16 17:12:57 -05:00
|
|
|
print(
|
|
|
|
convert(
|
|
|
|
working_dir=working_dir,
|
|
|
|
source=args.source,
|
|
|
|
target_dir=keyring_root / "main",
|
|
|
|
name_override=args.name,
|
2021-10-18 05:47:12 -05:00
|
|
|
fingerprint_filter=get_fingerprints(
|
|
|
|
working_dir=working_dir,
|
|
|
|
input_path=args.source,
|
|
|
|
decomposed_paths=[keyring_root / "main", keyring_root / "packagers"],
|
|
|
|
),
|
2021-10-16 17:12:57 -05:00
|
|
|
)
|
|
|
|
)
|
2021-10-16 17:50:34 -05:00
|
|
|
elif "import-packager" == args.subcommand:
|
2021-10-16 17:12:57 -05:00
|
|
|
print(
|
|
|
|
convert(
|
|
|
|
working_dir=working_dir,
|
|
|
|
source=args.source,
|
|
|
|
target_dir=keyring_root / "packagers",
|
|
|
|
name_override=args.name,
|
2021-10-18 05:47:12 -05:00
|
|
|
fingerprint_filter=get_fingerprints(
|
|
|
|
working_dir=working_dir,
|
|
|
|
input_path=args.source,
|
|
|
|
decomposed_paths=[keyring_root / "main", keyring_root / "packagers"],
|
|
|
|
),
|
2021-10-16 17:12:57 -05:00
|
|
|
)
|
|
|
|
)
|
2021-10-16 17:50:34 -05:00
|
|
|
elif "export-keyring" == args.subcommand:
|
2021-10-16 17:12:57 -05:00
|
|
|
export_keyring(
|
|
|
|
working_dir=working_dir,
|
|
|
|
main=[keyring_root / "main"],
|
|
|
|
sources=[keyring_root / "packagers"],
|
|
|
|
output=keyring_root / "output" / "archlinux.gpg",
|
|
|
|
force=True,
|
|
|
|
pacman_integration=True,
|
|
|
|
)
|
2021-10-16 17:50:34 -05:00
|
|
|
elif "export" == args.subcommand:
|
2021-10-12 12:22:30 -05:00
|
|
|
export_keyring(
|
|
|
|
working_dir=working_dir,
|
|
|
|
main=args.main,
|
|
|
|
sources=args.source,
|
|
|
|
output=args.output,
|
|
|
|
force=args.force,
|
|
|
|
pacman_integration=args.pacman_integration,
|
|
|
|
)
|
2021-08-25 11:53:07 -05:00
|
|
|
|
2021-10-12 12:22:30 -05:00
|
|
|
if args.wait:
|
2021-10-16 17:50:34 -05:00
|
|
|
print("Press [ENTER] to continue")
|
2021-10-12 12:22:30 -05:00
|
|
|
input()
|