chore(keyringctl): modularize the code for overview and testing
This commit is contained in:
224
libkeyringctl/sequoia.py
Normal file
224
libkeyringctl/sequoia.py
Normal file
@ -0,0 +1,224 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
from collections.abc import Iterable
|
||||
from datetime import datetime
|
||||
from functools import reduce
|
||||
from pathlib import Path
|
||||
from re import sub
|
||||
from tempfile import mkdtemp
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
|
||||
from .types import Fingerprint
|
||||
from .types import Username
|
||||
from .util import cwd
|
||||
from .util import natural_sort_path
|
||||
from .util import system
|
||||
|
||||
|
||||
def keyring_split(working_dir: Path, keyring: Path, preserve_filename: bool = False) -> Iterable[Path]:
|
||||
"""Split a file containing a PGP keyring into separate certificate files
|
||||
|
||||
The original keyring filename is preserved if the split only yields a single certificate.
|
||||
If preserve_filename is True, all keyrings are placed into separate directories while preserving
|
||||
the filename.
|
||||
|
||||
The file is split using sq.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
working_dir: The path of the working directory below which to create the output files
|
||||
keyring: The path of a file containing a PGP keyring
|
||||
preserve_filename: If True, all keyrings are placed into separate directories while preserving the filename
|
||||
|
||||
Returns
|
||||
-------
|
||||
An iterable over the naturally sorted list of certificate files derived from a keyring
|
||||
"""
|
||||
|
||||
keyring_dir = Path(mkdtemp(dir=working_dir, prefix="keyring-")).absolute()
|
||||
|
||||
with cwd(keyring_dir):
|
||||
system(["sq", "keyring", "split", "--prefix", "", str(keyring)])
|
||||
|
||||
keyrings: List[Path] = list(natural_sort_path(keyring_dir.iterdir()))
|
||||
|
||||
if 1 == len(keyrings) or preserve_filename:
|
||||
for index, key in enumerate(keyrings):
|
||||
keyring_sub_dir = Path(mkdtemp(dir=keyring_dir, prefix=f"{keyring.name}-")).absolute()
|
||||
keyrings[index] = key.rename(keyring_sub_dir / keyring.name)
|
||||
|
||||
return keyrings
|
||||
|
||||
|
||||
def keyring_merge(certificates: List[Path], output: Optional[Path] = None, force: bool = False) -> str:
|
||||
"""Merge multiple certificates into a keyring
|
||||
|
||||
Parameters
|
||||
----------
|
||||
certificates: List of paths to certificates to merge into a keyring
|
||||
output: Path to a file which the keyring is written, return the result instead if None
|
||||
force: Whether to force overwriting existing files (defaults to False)
|
||||
|
||||
Returns
|
||||
-------
|
||||
The result if no output file has been used
|
||||
"""
|
||||
|
||||
cmd = ["sq", "keyring", "merge"]
|
||||
if force:
|
||||
cmd.insert(1, "--force")
|
||||
if output:
|
||||
cmd += ["--output", str(output)]
|
||||
cmd += [str(cert) for cert in sorted(certificates)]
|
||||
|
||||
return system(cmd)
|
||||
|
||||
|
||||
def packet_split(working_dir: Path, certificate: Path) -> Iterable[Path]:
|
||||
"""Split a file containing a PGP certificate into separate packet files
|
||||
|
||||
The files are split using sq
|
||||
|
||||
Parameters
|
||||
----------
|
||||
working_dir: The path of the working directory below which to create the output files
|
||||
certificate: The absolute path of a file containing one PGP certificate
|
||||
|
||||
Returns
|
||||
-------
|
||||
An iterable over the naturally sorted list of packet files derived from certificate
|
||||
"""
|
||||
|
||||
packet_dir = Path(mkdtemp(dir=working_dir, prefix="packet-")).absolute()
|
||||
|
||||
with cwd(packet_dir):
|
||||
system(["sq", "packet", "split", "--prefix", "", str(certificate)])
|
||||
return natural_sort_path(packet_dir.iterdir())
|
||||
|
||||
|
||||
def packet_join(packets: List[Path], output: Optional[Path] = None, force: bool = False) -> str:
|
||||
"""Join PGP packet data in files to a single output file
|
||||
|
||||
Parameters
|
||||
----------
|
||||
packets: A list of paths to files that contain PGP packet data
|
||||
output: Path to a file to which all PGP packet data is written, return the result instead if None
|
||||
force: Whether to force overwriting existing files (defaults to False)
|
||||
|
||||
Returns
|
||||
-------
|
||||
The result if no output file has been used
|
||||
"""
|
||||
|
||||
cmd = ["sq", "packet", "join"]
|
||||
if force:
|
||||
cmd.insert(1, "--force")
|
||||
packets_str = list(map(lambda path: str(path), packets))
|
||||
cmd.extend(packets_str)
|
||||
cmd.extend(["--output", str(output)])
|
||||
return system(cmd)
|
||||
|
||||
|
||||
def inspect(
|
||||
packet: Path, certifications: bool = True, fingerprints: Optional[Dict[Fingerprint, Username]] = None
|
||||
) -> str:
|
||||
"""Inspect PGP packet data and return the result
|
||||
|
||||
Parameters
|
||||
----------
|
||||
packet: Path to a file that contain PGP data
|
||||
certifications: Whether to print third-party certifications
|
||||
fingerprints: Optional dict of fingerprints to usernames to enrich the output with
|
||||
|
||||
Returns
|
||||
-------
|
||||
The result of the inspection
|
||||
"""
|
||||
|
||||
cmd = ["sq", "inspect"]
|
||||
if certifications:
|
||||
cmd.append("--certifications")
|
||||
cmd.append(str(packet))
|
||||
result: str = system(cmd)
|
||||
|
||||
if fingerprints:
|
||||
for fingerprint, username in fingerprints.items():
|
||||
result = sub(f"{fingerprint}", f"{fingerprint} {username}", result)
|
||||
result = sub(f" {fingerprint[24:]}", f" {fingerprint[24:]} {username}", result)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def packet_dump(packet: Path) -> str:
|
||||
"""Dump a PGP packet to string
|
||||
|
||||
The `sq packet dump` command is used to retrieve a dump of information from a PGP packet
|
||||
|
||||
Parameters
|
||||
----------
|
||||
packet: The path to the PGP packet to retrieve the value from
|
||||
|
||||
Returns
|
||||
-------
|
||||
The contents of the packet dump
|
||||
"""
|
||||
|
||||
return system(["sq", "packet", "dump", str(packet)])
|
||||
|
||||
|
||||
def packet_dump_field(packet: Path, field: str) -> str:
|
||||
"""Retrieve the value of a field from a PGP packet
|
||||
|
||||
Parameters
|
||||
----------
|
||||
packet: The path to the PGP packet to retrieve the value from
|
||||
field: The name of the field
|
||||
|
||||
Raises
|
||||
------
|
||||
Exception: If the field is not found in the PGP packet
|
||||
|
||||
Returns
|
||||
-------
|
||||
The value of the field found in packet
|
||||
"""
|
||||
|
||||
dump = packet_dump(packet)
|
||||
lines = [line.strip() for line in dump.splitlines()]
|
||||
lines = list(filter(lambda line: line.strip().startswith(f"{field}: "), lines))
|
||||
if not lines:
|
||||
raise Exception(f'Packet has no field "{field}"')
|
||||
return lines[0].split(sep=": ", maxsplit=1)[1]
|
||||
|
||||
|
||||
def packet_signature_creation_time(packet: Path) -> datetime:
|
||||
"""Retrieve the signature creation time field as datetime
|
||||
|
||||
Parameters
|
||||
----------
|
||||
packet: The path to the PGP packet to retrieve the value from
|
||||
|
||||
Returns
|
||||
-------
|
||||
The signature creation time as datetime
|
||||
"""
|
||||
return datetime.strptime(packet_dump_field(packet, "Signature creation time"), "%Y-%m-%d %H:%M:%S %Z")
|
||||
|
||||
|
||||
def latest_certification(certifications: Iterable[Path]) -> Path:
|
||||
"""Returns the latest certification based on the signature creation time from a list of packets.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
certifications: List of certification from which to choose the latest from
|
||||
|
||||
Returns
|
||||
-------
|
||||
The latest certification from a list of packets
|
||||
"""
|
||||
return reduce(
|
||||
lambda a, b: a if packet_signature_creation_time(a) > packet_signature_creation_time(b) else b,
|
||||
certifications,
|
||||
)
|
Reference in New Issue
Block a user