keyringctl: Persist direct signatures generically

keyringctl:
Rename `persist_direct_keys()` to `persist_direct_sigs()` as it is now
not only handling the persistence of DirectKeys but also *Certifications
directly on a root key (those without an explicit User ID).
Add inline function `add_packet_to_direct_sigs()` to
`convert_certificate()` to generically add direct signatures on a root
key, grouped by issuer.
Change `convert_certificate()` to add Certifications on a root key
(without a specified User ID) to the list of direct_sigs, so that they
are persisted alongside any existing DirectKeys.
Remove breakpoints from `persist_certifications()` as they are no longer
reached. The function is now solely used for Certifications on User IDs.
This commit is contained in:
David Runge 2021-10-03 19:24:49 +02:00 committed by Levente Polyak
parent 0d32d2f00a
commit a77b334859
No known key found for this signature in database
GPG Key ID: FC1B547C8D8172C8

View File

@ -26,7 +26,6 @@ from tempfile import mkdtemp
from logging import basicConfig
from logging import debug
from logging import error
from logging import DEBUG
from typing import Any
@ -80,7 +79,7 @@ def system(cmd: List[str], exit_on_error: bool = True) -> str:
def convert_certificate(working_dir: Path, certificate: Path, owner: str) -> Path:
certificate_fingerprint: Optional[str] = None
pubkey: Optional[Path] = None
direct_keys: Dict[str, Dict[str, List[Path]]] = {}
direct_sigs: Dict[str, Dict[str, List[Path]]] = {}
current_packet_mode: Optional[str] = None
current_packet_key: Optional[str] = None
uids: Dict[str, Path] = {}
@ -90,6 +89,34 @@ def convert_certificate(working_dir: Path, certificate: Path, owner: str) -> Pat
certifications: Dict[str, List[Path]] = defaultdict(list)
revocations: Dict[str, List[Path]] = defaultdict(list)
def add_packet_to_direct_sigs(
direct_sigs: Dict[str, Dict[str, List[Path]]],
issuer: str,
packet_key: str,
packet: Path,
) -> Dict[str, Dict[str, List[Path]]]:
"""Add a packet to the set of DirectKeys
If no key with the given packet_key exists yet, it is created.
Parameters
----------
direct_sigs: Dict[str, Dict[str, List[Path]]]
The signatures directly on a root key (such as DirectKey or *Certifications without a specific User ID)
issuer: str
The issuer of the signature
packet: Path
The path to the packet
packet_key: str
The key identifying the packet (e.g. its Fingerprint)
"""
if not direct_sigs.get(packet_key):
direct_sigs = direct_sigs | {packet_key: defaultdict(list)}
direct_sigs[packet_key][issuer].append(packet)
return direct_sigs
# XXX: KeyRevocation
# XXX: PrimaryKeyBinding
@ -123,10 +150,12 @@ def convert_certificate(working_dir: Path, certificate: Path, owner: str) -> Pat
signature_type = packet_dump_field(packet, 'Type')
if signature_type == 'DirectKey':
if not direct_keys.get(current_packet_key):
direct_keys = direct_keys | {current_packet_key: defaultdict(list)}
direct_keys[current_packet_key][issuer].append(packet)
direct_sigs = add_packet_to_direct_sigs(
direct_sigs=direct_sigs,
issuer=issuer,
packet_key=current_packet_key,
packet=packet,
)
continue
if not current_packet_key:
@ -144,6 +173,18 @@ def convert_certificate(working_dir: Path, certificate: Path, owner: str) -> Pat
raise Exception(f'unknown signature type: {signature_type}')
else:
if signature_type.endswith('Certification'):
# NOTE: here we are only considering signatures directly on the root key
# signatures on a User ID, that are not tied to it via a SubkeyBinding are not addressed
if current_packet_key not in uids:
direct_sigs = add_packet_to_direct_sigs(
direct_sigs=direct_sigs,
issuer=issuer,
packet_key=current_packet_key,
packet=packet,
)
# NOTE: here we address all signatures on User IDs (those that are tied to it with a
# SubkeyBinding and those that are not)
else:
certifications[current_packet_key].append(packet)
elif signature_type == 'CertificationRevocation':
revocations[current_packet_key].append(packet)
@ -181,8 +222,8 @@ def convert_certificate(working_dir: Path, certificate: Path, owner: str) -> Pat
uids=uids,
)
persist_direct_keys(
direct_keys=direct_keys,
persist_direct_sigs(
direct_sigs=direct_sigs,
pubkey=pubkey,
root_dir=root_dir,
)
@ -248,12 +289,13 @@ def persist_basic_key(
packet_join(packets, root_dir / f'{certificate_fingerprint}.asc')
def persist_direct_keys(
direct_keys: Dict[str, Dict[str, List[Path]]],
def persist_direct_sigs(
direct_sigs: Dict[str, Dict[str, List[Path]]],
pubkey: Path,
root_dir: Path,
) -> None:
"""Persist the DirectKeys on a root key to file(s)
"""Persist the signatures directly on a root key (such as DirectKeys or *Certifications without a User ID) to
file(s)
Parameters
----------
@ -265,7 +307,7 @@ def persist_direct_keys(
The root directory below which the Directkeys are persisted
"""
for key, current_certifications in direct_keys.items():
for key, current_certifications in direct_sigs.items():
for issuer, certifications in current_certifications.items():
direct_key_dir = root_dir / 'certification'
direct_key_dir.mkdir(parents=True, exist_ok=True)
@ -306,15 +348,7 @@ def persist_certifications(
for certification in current_certifications:
certification_dir = root_dir / key / 'certification'
certification_dir.mkdir(parents=True, exist_ok=True)
issuer = packet_dump_field(certification, 'Issuer')
# TODO: find a way to get uid binding for pubkey certs
if key not in uids:
error('missing uid')
breakpoint()
if key not in uid_binding_sig:
error('missing binding sig')
breakpoint()
packets = [pubkey, uids[key], uid_binding_sig[key], certification]
output_file = certification_dir / f'{issuer}.asc'