keyringctl: Split out subkeys to separate structure

keyringctl:
Add `persist_subkeys()` and `persist_subkey_revocations()` to persist
the Public-Subkeys and the SubkeyRevocations of a root key out into a
dedicated directory structure below the respective Public-Key.
Change `persist_basic_key()` to not persist the Public-Subkeys and
SubkeyRevocations of a root key anymore and to output debug information
before writing to file.
Change `convert_certificate()` to refer to Public-Subkeys and
PublicSubkeyBinding as `subkeys` and `subkey_binding_sigs`
(respectively) and to explicitly refer to the main certificate
fingerprint when aggregating the data about them. Add
`subkey_revocations` to track any SubkeyRevocations of a given
Public-Subkey, so that it can be persisted to file.
This commit is contained in:
David Runge 2021-10-09 12:56:33 +02:00 committed by Levente Polyak
parent c4fbd95041
commit 7e04c50a16
No known key found for this signature in database
GPG Key ID: FC1B547C8D8172C8

View File

@ -85,8 +85,9 @@ def convert_certificate(working_dir: Path, certificate: Path, name_override: Opt
current_packet_key: Optional[str] = None current_packet_key: Optional[str] = None
uids: Dict[str, Path] = {} uids: Dict[str, Path] = {}
uid_binding_sig: Dict[str, Path] = {} uid_binding_sig: Dict[str, Path] = {}
subkey: Dict[str, Path] = {} subkeys: Dict[str, Dict[str, Path]] = {}
subkey_binding_sig: Dict[str, Path] = {} subkey_binding_sigs: Dict[str, Dict[str, Path]] = {}
subkey_revocations: Dict[str, Dict[str, Path]] = {}
certifications: Dict[str, List[Path]] = defaultdict(list) certifications: Dict[str, List[Path]] = defaultdict(list)
revocations: Dict[str, List[Path]] = defaultdict(list) revocations: Dict[str, List[Path]] = defaultdict(list)
username = name_override or certificate.name.split(".")[0] username = name_override or certificate.name.split(".")[0]
@ -142,7 +143,15 @@ def convert_certificate(working_dir: Path, certificate: Path, name_override: Opt
fingerprint = packet_dump_field(packet, 'Fingerprint') fingerprint = packet_dump_field(packet, 'Fingerprint')
current_packet_mode = 'subkey' current_packet_mode = 'subkey'
current_packet_key = fingerprint current_packet_key = fingerprint
subkey[fingerprint] = packet
if not certificate_fingerprint:
raise Exception('missing certificate fingerprint for "{packet.name}"')
if not subkeys.get(certificate_fingerprint):
subkeys |= {certificate_fingerprint: {fingerprint: packet}}
else:
subkeys[certificate_fingerprint] |= {fingerprint: packet}
elif packet.name.endswith('--Signature'): elif packet.name.endswith('--Signature'):
if not certificate_fingerprint: if not certificate_fingerprint:
raise Exception('missing certificate fingerprint for "{packet.name}"') raise Exception('missing certificate fingerprint for "{packet.name}"')
@ -170,7 +179,6 @@ def convert_certificate(working_dir: Path, certificate: Path, name_override: Opt
if signature_type == 'PositiveCertification': if signature_type == 'PositiveCertification':
uid_binding_sig[current_packet_key] = packet uid_binding_sig[current_packet_key] = packet
elif signature_type == 'CertificationRevocation': elif signature_type == 'CertificationRevocation':
# XXX:
revocations[current_packet_key].append(packet) revocations[current_packet_key].append(packet)
elif signature_type == 'KeyRevocation': elif signature_type == 'KeyRevocation':
direct_revocations = add_packet_to_direct_sigs( direct_revocations = add_packet_to_direct_sigs(
@ -202,10 +210,15 @@ def convert_certificate(working_dir: Path, certificate: Path, name_override: Opt
raise Exception(f'unknown signature type: {signature_type}') raise Exception(f'unknown signature type: {signature_type}')
elif current_packet_mode == 'subkey': elif current_packet_mode == 'subkey':
if signature_type == 'SubkeyBinding': if signature_type == 'SubkeyBinding':
subkey_binding_sig[current_packet_key] = packet if not subkey_binding_sigs.get(certificate_fingerprint):
subkey_binding_sigs |= {certificate_fingerprint: {fingerprint: packet}}
else:
subkey_binding_sigs[certificate_fingerprint] |= {fingerprint: packet}
elif signature_type == 'SubkeyRevocation': elif signature_type == 'SubkeyRevocation':
# XXX: if not subkey_revocations.get(certificate_fingerprint):
pass subkey_revocations |= {certificate_fingerprint: {fingerprint: packet}}
else:
subkey_revocations[certificate_fingerprint] |= {fingerprint: packet}
else: else:
raise Exception(f'unknown signature type: {signature_type}') raise Exception(f'unknown signature type: {signature_type}')
else: else:
@ -221,18 +234,29 @@ def convert_certificate(working_dir: Path, certificate: Path, name_override: Opt
user_dir = (working_dir / username) user_dir = (working_dir / username)
key_dir = (user_dir / certificate_fingerprint) key_dir = (user_dir / certificate_fingerprint)
key_dir.mkdir(parents=True) key_dir.mkdir(parents=True, exist_ok=True)
persist_basic_key( persist_basic_key(
certificate_fingerprint=certificate_fingerprint, certificate_fingerprint=certificate_fingerprint,
pubkey=pubkey, pubkey=pubkey,
key_dir=key_dir, key_dir=key_dir,
subkey=subkey,
subkey_binding_sig=subkey_binding_sig,
uid_binding_sig=uid_binding_sig, uid_binding_sig=uid_binding_sig,
uids=uids, uids=uids,
) )
persist_subkeys(
certificate_fingerprint=certificate_fingerprint,
key_dir=key_dir,
subkeys=subkeys,
subkey_binding_sigs=subkey_binding_sigs,
)
persist_subkey_revocations(
certificate_fingerprint=certificate_fingerprint,
key_dir=key_dir,
subkey_revocations=subkey_revocations,
)
persist_direct_sigs( persist_direct_sigs(
direct_sigs=direct_sigs, direct_sigs=direct_sigs,
pubkey=pubkey, pubkey=pubkey,
@ -269,8 +293,6 @@ def persist_basic_key(
certificate_fingerprint: str, certificate_fingerprint: str,
pubkey: Path, pubkey: Path,
key_dir: Path, key_dir: Path,
subkey: Dict[str, Path],
subkey_binding_sig: Dict[str, Path],
uid_binding_sig: Dict[str, Path], uid_binding_sig: Dict[str, Path],
uids: Dict[str, Path], uids: Dict[str, Path],
) -> None: ) -> None:
@ -288,10 +310,6 @@ def persist_basic_key(
The path to the public key of the root key The path to the public key of the root key
key_dir: Path key_dir: Path
The root directory below which the basic key material is persisted The root directory below which the basic key material is persisted
subkey: Dict[str, Path]
The PublicSubkeys of a key
subkey_binding_sig: Dict[str, Path]
The SubkeyBinding signatures of a Public-Key (the root key)
uid_binding_sig: Dict[str, Path] uid_binding_sig: Dict[str, Path]
The PositiveCertifications of a User ID and Public-Key packet The PositiveCertifications of a User ID and Public-Key packet
uids: Dict[str, Path] uids: Dict[str, Path]
@ -301,10 +319,66 @@ def persist_basic_key(
packets: List[Path] = [pubkey] packets: List[Path] = [pubkey]
for key in uid_binding_sig.keys(): for key in uid_binding_sig.keys():
packets.extend([uids[key], uid_binding_sig[key]]) packets.extend([uids[key], uid_binding_sig[key]])
for key in subkey_binding_sig.keys():
packets.extend([subkey[key], subkey_binding_sig[key]])
packet_join(packets, key_dir / f'{certificate_fingerprint}.asc') output_file = key_dir / f'{certificate_fingerprint}.asc'
debug(f'Writing file {output_file} from {[str(packet) for packet in packets]}')
packet_join(packets, output_file)
def persist_subkeys(
certificate_fingerprint: str,
key_dir: Path,
subkeys: Dict[str, Dict[str, Path]],
subkey_binding_sigs: Dict[str, Dict[str, Path]],
) -> None:
"""Persist all Public-Subkeys and their PublicSubkeyBinding of a root key file to file(s)
Parameters
----------
certificate_fingerprint: str
The unique fingerprint of the public key
key_dir: Path
The root directory below which the basic key material is persisted
subkeys: Dict[str, Dict[str, Path]]
The PublicSubkeys of a key
subkey_binding_sigs: Dict[str, Dict[str, Path]]
The SubkeyBinding signatures of a Public-Key (the root key)
"""
if subkeys.get(certificate_fingerprint):
for signature, subkey in subkeys[certificate_fingerprint].items():
packets: List[Path] = []
packets.extend([subkey, subkey_binding_sigs[certificate_fingerprint][signature]])
output_file = key_dir / Path(f'subkeys/{signature}/{signature}.asc')
debug(f'Writing file {output_file} from {[str(packet) for packet in packets]}')
(key_dir / Path(f"subkeys/{signature}")).mkdir(parents=True, exist_ok=True)
packet_join(packets=packets, output=output_file)
def persist_subkey_revocations(
certificate_fingerprint: str,
key_dir: Path,
subkey_revocations: Dict[str, Dict[str, Path]],
) -> None:
"""Persist the SubkeyRevocations of all Public-Subkeys of a root key to file(s)
Parameters
----------
certificate_fingerprint: str
The unique fingerprint of the public key
key_dir: Path
The root directory below which the basic key material is persisted
subkey_revocations: Dict[str, Dict[str, Path]]
The SubkeyRevocations of PublicSubkeys of a key
"""
if subkey_revocations.get(certificate_fingerprint):
for signature, revocation in subkey_revocations[certificate_fingerprint].items():
issuer = packet_dump_field(revocation, 'Issuer')
output_file = (key_dir / Path(f'subkeys/{signature}/revocation/{issuer}.asc'))
debug(f'Writing file {output_file} from {revocation}')
(key_dir / Path(f"subkeys/{signature}/revocation")).mkdir(parents=True, exist_ok=True)
packet_join(packets=[revocation], output=output_file)
def persist_direct_sigs( def persist_direct_sigs(