225 lines
6.8 KiB
Python
225 lines
6.8 KiB
Python
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
from collections.abc import Iterable
|
|
from datetime import datetime
|
|
from functools import reduce
|
|
from pathlib import Path
|
|
from re import sub
|
|
from tempfile import mkdtemp
|
|
from typing import Dict
|
|
from typing import List
|
|
from typing import Optional
|
|
|
|
from .types import Fingerprint
|
|
from .types import Username
|
|
from .util import cwd
|
|
from .util import natural_sort_path
|
|
from .util import system
|
|
|
|
|
|
def keyring_split(working_dir: Path, keyring: Path, preserve_filename: bool = False) -> Iterable[Path]:
    """Split a PGP keyring file into one file per certificate

    The split is delegated to `sq keyring split`, which is run from inside a freshly created temporary
    directory below working_dir.

    If the split yields exactly one certificate, or if preserve_filename is True, every resulting file is
    moved into its own temporary sub-directory and renamed to the filename of the original keyring.

    Parameters
    ----------
    working_dir: The path of the working directory below which to create the output files
    keyring: The path of a file containing a PGP keyring
        (NOTE(review): sq is invoked via the cwd() helper, presumably from a different working directory,
        so an absolute path is likely required - confirm)
    preserve_filename: If True, all certificates are placed into separate directories while keeping the
        filename of the keyring

    Returns
    -------
    The naturally sorted list of certificate files derived from the keyring
    """

    split_dir = Path(mkdtemp(dir=working_dir, prefix="keyring-")).absolute()

    with cwd(split_dir):
        system(["sq", "keyring", "split", "--prefix", "", str(keyring)])

    certificates: List[Path] = list(natural_sort_path(split_dir.iterdir()))

    # several certificates and no request to keep the filename: hand back the files as sq named them
    if not preserve_filename and len(certificates) != 1:
        return certificates

    relocated: List[Path] = []
    for certificate in certificates:
        # one sub-directory per certificate so that all of them can carry the same filename
        target_dir = Path(mkdtemp(dir=split_dir, prefix=f"{keyring.name}-")).absolute()
        relocated.append(certificate.rename(target_dir / keyring.name))

    return relocated
|
|
|
|
|
|
def keyring_merge(certificates: List[Path], output: Optional[Path] = None, force: bool = False) -> str:
    """Merge several certificates into a single keyring using `sq keyring merge`

    Parameters
    ----------
    certificates: List of paths to certificates to merge into a keyring (passed to sq in sorted order)
    output: Path to a file to which the keyring is written; if None, no `--output` option is passed
    force: Whether to pass `--force` to sq to overwrite existing files (defaults to False)

    Returns
    -------
    The value returned by running the sq command (the merged keyring if no output file has been used)
    """

    # --force is a global sq option and therefore has to precede the sub-command
    cmd: List[str] = ["sq", "--force", "keyring", "merge"] if force else ["sq", "keyring", "merge"]

    if output:
        cmd.extend(("--output", str(output)))

    # sort the Path objects (not their string form) before converting them to arguments
    cmd.extend(map(str, sorted(certificates)))

    return system(cmd)
|
|
|
|
|
|
def packet_split(working_dir: Path, certificate: Path) -> Iterable[Path]:
    """Split a PGP certificate file into one file per packet

    The split is delegated to `sq packet split`, which is run from inside a freshly created temporary
    directory below working_dir.

    Parameters
    ----------
    working_dir: The path of the working directory below which to create the output files
    certificate: The absolute path of a file containing one PGP certificate

    Returns
    -------
    An iterable over the naturally sorted list of packet files derived from certificate
    """

    packet_dir = Path(mkdtemp(dir=working_dir, prefix="packet-")).absolute()
    split_cmd = ["sq", "packet", "split", "--prefix", "", str(certificate)]

    with cwd(packet_dir):
        system(split_cmd)

    # packet_dir is absolute, so listing it does not depend on the current working directory
    packet_files = packet_dir.iterdir()
    return natural_sort_path(packet_files)
|
|
|
|
|
|
def packet_join(packets: List[Path], output: Optional[Path] = None, force: bool = False) -> str:
    """Join PGP packet data in files to a single output file

    The packets are joined using `sq packet join`.

    Parameters
    ----------
    packets: A list of paths to files that contain PGP packet data
    output: Path to a file to which all PGP packet data is written, return the result instead if None
    force: Whether to force overwriting existing files (defaults to False)

    Returns
    -------
    The result if no output file has been used
    """

    cmd = ["sq", "packet", "join"]
    if force:
        # --force is a global sq option and therefore has to precede the sub-command
        cmd.insert(1, "--force")

    cmd.extend(str(packet) for packet in packets)

    # Only request an output file if one was given: appending "--output" unconditionally would hand the
    # literal string "None" to sq as the output filename when output is None.
    if output:
        cmd.extend(["--output", str(output)])

    return system(cmd)
|
|
|
|
|
|
def inspect(
    packet: Path, certifications: bool = True, fingerprints: Optional[Dict[Fingerprint, Username]] = None
) -> str:
    """Inspect PGP packet data and return the result

    The inspection is done using `sq inspect`. If fingerprints is provided, each occurrence of a known
    fingerprint in the output is followed by the matching username. The same is done for the part of the
    fingerprint starting at index 24 when it is preceded by a space (presumably the long key ID of a
    40 character fingerprint).

    Parameters
    ----------
    packet: Path to a file that contain PGP data
    certifications: Whether to print third-party certifications
    fingerprints: Optional dict of fingerprints to usernames to enrich the output with

    Returns
    -------
    The result of the inspection
    """

    cmd = ["sq", "inspect"]
    if certifications:
        cmd.append("--certifications")
    cmd.append(str(packet))
    result: str = system(cmd)

    if fingerprints:
        for fingerprint, username in fingerprints.items():
            # Plain string replacement on purpose: with a regular expression substitution the username
            # would be interpreted as a replacement template (backslashes, group references) and the
            # fingerprint as a pattern, although both are meant literally.
            result = result.replace(f"{fingerprint}", f"{fingerprint} {username}")
            # The leading space keeps this from matching the tail of the full fingerprint handled above
            result = result.replace(f" {fingerprint[24:]}", f" {fingerprint[24:]} {username}")

    return result
|
|
|
|
|
|
def packet_dump(packet: Path) -> str:
    """Return the textual dump of a PGP packet

    The dump is produced by running `sq packet dump` on the given file.

    Parameters
    ----------
    packet: The path to the PGP packet to dump

    Returns
    -------
    The contents of the packet dump
    """

    dump_cmd = ["sq", "packet", "dump", str(packet)]
    return system(dump_cmd)
|
|
|
|
|
|
def packet_dump_field(packet: Path, field: str) -> str:
    """Look up the value of a named field in the dump of a PGP packet

    The packet dump is scanned line by line (ignoring surrounding whitespace) and the first line that
    starts with "<field>: " provides the value.

    Parameters
    ----------
    packet: The path to the PGP packet to retrieve the value from
    field: The name of the field

    Raises
    ------
    Exception: If the field is not found in the PGP packet

    Returns
    -------
    The value of the field found in packet
    """

    wanted_prefix = f"{field}: "

    for raw_line in packet_dump(packet).splitlines():
        candidate = raw_line.strip()
        if candidate.startswith(wanted_prefix):
            # everything after the first ": " separator of the line is the value
            return candidate.split(sep=": ", maxsplit=1)[1]

    raise Exception(f'Packet has no field "{field}"')
|
|
|
|
|
|
def packet_signature_creation_time(packet: Path) -> datetime:
    """Read the "Signature creation time" field of a PGP packet and parse it

    Parameters
    ----------
    packet: The path to the PGP packet to retrieve the value from

    Returns
    -------
    The signature creation time as datetime
    """

    timestamp_format = "%Y-%m-%d %H:%M:%S %Z"
    raw_timestamp = packet_dump_field(packet, "Signature creation time")
    return datetime.strptime(raw_timestamp, timestamp_format)
|
|
|
|
|
|
def latest_certification(certifications: Iterable[Path]) -> Path:
    """Pick the certification with the most recent signature creation time

    The certifications are compared pairwise from left to right. If two of them share the same creation
    time, the one that comes later in the iteration wins.

    Parameters
    ----------
    certifications: The certification packets from which to choose the latest

    Returns
    -------
    The latest certification from a list of packets
    """

    def newer_of(current: Path, candidate: Path) -> Path:
        # keep the current pick only if it is strictly newer than the candidate
        if packet_signature_creation_time(current) > packet_signature_creation_time(candidate):
            return current
        return candidate

    return reduce(newer_of, certifications)
|