fix(keyringctl): deterministic assembling order with single packet files
Fix the assembling of a certificate by joining the packets in the correct order and splitting combined files into individual single packets per file.
This commit is contained in:
parent
4b136dd6f6
commit
32469720f8
116
keyringctl
116
keyringctl
@ -247,12 +247,11 @@ def convert_certificate( # noqa: ignore=C901
|
|||||||
|
|
||||||
# subkey packets
|
# subkey packets
|
||||||
subkeys: Dict[Fingerprint, Path] = {}
|
subkeys: Dict[Fingerprint, Path] = {}
|
||||||
subkey_binding_sigs: Dict[Fingerprint, Path] = {}
|
subkey_bindings: Dict[Fingerprint, Path] = {}
|
||||||
subkey_revocations: Dict[Fingerprint, Path] = {}
|
subkey_revocations: Dict[Fingerprint, Path] = {}
|
||||||
|
|
||||||
# uid packets
|
# uid packets
|
||||||
uids: Dict[Uid, Path] = {}
|
uids: Dict[Uid, Path] = {}
|
||||||
uid_binding_sigs: Dict[Uid, Path] = {}
|
|
||||||
certifications: Dict[Uid, List[Path]] = defaultdict(list)
|
certifications: Dict[Uid, List[Path]] = defaultdict(list)
|
||||||
revocations: Dict[Uid, List[Path]] = defaultdict(list)
|
revocations: Dict[Uid, List[Path]] = defaultdict(list)
|
||||||
|
|
||||||
@ -311,8 +310,6 @@ def convert_certificate( # noqa: ignore=C901
|
|||||||
|
|
||||||
if signature_type == "CertificationRevocation":
|
if signature_type == "CertificationRevocation":
|
||||||
revocations[current_packet_uid].append(packet)
|
revocations[current_packet_uid].append(packet)
|
||||||
elif signature_type == "PositiveCertification" and certificate_fingerprint.endswith(issuer):
|
|
||||||
uid_binding_sigs[current_packet_uid] = packet
|
|
||||||
elif signature_type.endswith("Certification"):
|
elif signature_type.endswith("Certification"):
|
||||||
if fingerprint_filter is not None and any([fp.endswith(issuer) for fp in fingerprint_filter]):
|
if fingerprint_filter is not None and any([fp.endswith(issuer) for fp in fingerprint_filter]):
|
||||||
debug(f"The certification by issuer {issuer} is appended as it is found in the filter.")
|
debug(f"The certification by issuer {issuer} is appended as it is found in the filter.")
|
||||||
@ -326,7 +323,7 @@ def convert_certificate( # noqa: ignore=C901
|
|||||||
raise Exception('missing current packet fingerprint for "{packet.name}"')
|
raise Exception('missing current packet fingerprint for "{packet.name}"')
|
||||||
|
|
||||||
if signature_type == "SubkeyBinding":
|
if signature_type == "SubkeyBinding":
|
||||||
subkey_binding_sigs[current_packet_fingerprint] = packet
|
subkey_bindings[current_packet_fingerprint] = packet
|
||||||
elif signature_type == "SubkeyRevocation":
|
elif signature_type == "SubkeyRevocation":
|
||||||
subkey_revocations[certificate_fingerprint] = packet
|
subkey_revocations[certificate_fingerprint] = packet
|
||||||
else:
|
else:
|
||||||
@ -371,7 +368,11 @@ def convert_certificate( # noqa: ignore=C901
|
|||||||
persist_subkeys(
|
persist_subkeys(
|
||||||
key_dir=key_dir,
|
key_dir=key_dir,
|
||||||
subkeys=subkeys,
|
subkeys=subkeys,
|
||||||
subkey_binding_sigs=subkey_binding_sigs,
|
)
|
||||||
|
|
||||||
|
persist_subkey_bindings(
|
||||||
|
key_dir=key_dir,
|
||||||
|
subkey_bindings=subkey_bindings,
|
||||||
)
|
)
|
||||||
|
|
||||||
persist_subkey_revocations(
|
persist_subkey_revocations(
|
||||||
@ -381,7 +382,6 @@ def convert_certificate( # noqa: ignore=C901
|
|||||||
|
|
||||||
persist_uids(
|
persist_uids(
|
||||||
key_dir=key_dir,
|
key_dir=key_dir,
|
||||||
uid_binding_sigs=uid_binding_sigs,
|
|
||||||
uids=uids,
|
uids=uids,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -421,49 +421,63 @@ def persist_public_key(
|
|||||||
|
|
||||||
def persist_uids(
|
def persist_uids(
|
||||||
key_dir: Path,
|
key_dir: Path,
|
||||||
uid_binding_sigs: Dict[Uid, Path],
|
|
||||||
uids: Dict[Uid, Path],
|
uids: Dict[Uid, Path],
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Persist the User IDs that belong to a PublicKey
|
"""Persist the User IDs that belong to a PublicKey
|
||||||
|
|
||||||
The User ID material consists of a User ID Packet and a binding signature.
|
The User ID material consists of a single User ID Packet.
|
||||||
The files are written to a UID specific directory and file below key_dir/uid.
|
The files are written to a UID specific directory and file below key_dir/uid.
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
key_dir: The root directory below which the basic key material is persisted
|
key_dir: The root directory below which the basic key material is persisted
|
||||||
uid_binding_sigs: The PositiveCertifications of a User ID
|
|
||||||
uids: The User IDs of a Public-Key (the root key)
|
uids: The User IDs of a Public-Key (the root key)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
for key in uid_binding_sigs.keys():
|
for uid, uid_packet in uids.items():
|
||||||
packets = [uids[key], uid_binding_sigs[key]]
|
output_file = key_dir / "uid" / uid / f"{uid}.asc"
|
||||||
output_file = key_dir / "uid" / key / f"{key}.asc"
|
|
||||||
output_file.parent.mkdir(parents=True, exist_ok=True)
|
output_file.parent.mkdir(parents=True, exist_ok=True)
|
||||||
debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}")
|
debug(f"Writing file {output_file} from {uid_packet}")
|
||||||
packet_join(packets, output_file, force=True)
|
packet_join(packets=[uid_packet], output=output_file, force=True)
|
||||||
|
|
||||||
|
|
||||||
def persist_subkeys(
|
def persist_subkeys(
|
||||||
key_dir: Path,
|
key_dir: Path,
|
||||||
subkeys: Dict[Fingerprint, Path],
|
subkeys: Dict[Fingerprint, Path],
|
||||||
subkey_binding_sigs: Dict[Fingerprint, Path],
|
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Persist all Public-Subkeys and their PublicSubkeyBinding of a root key file to file(s)
|
"""Persist all Public-Subkeys of a root key file to file(s)
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
key_dir: The root directory below which the basic key material is persisted
|
key_dir: The root directory below which the basic key material is persisted
|
||||||
subkeys: The PublicSubkeys of a key
|
subkeys: The PublicSubkeys of a key
|
||||||
subkey_binding_sigs: The SubkeyBinding signatures of a Public-Key (the root key)
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
for fingerprint, subkey in subkeys.items():
|
for fingerprint, subkey in subkeys.items():
|
||||||
packets: List[Path] = [subkey, subkey_binding_sigs[fingerprint]]
|
|
||||||
output_file = key_dir / "subkey" / fingerprint / f"{fingerprint}.asc"
|
output_file = key_dir / "subkey" / fingerprint / f"{fingerprint}.asc"
|
||||||
output_file.parent.mkdir(parents=True, exist_ok=True)
|
output_file.parent.mkdir(parents=True, exist_ok=True)
|
||||||
debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}")
|
debug(f"Writing file {output_file} from {str(subkey)}")
|
||||||
packet_join(packets=packets, output=output_file, force=True)
|
packet_join(packets=[subkey], output=output_file, force=True)
|
||||||
|
|
||||||
|
|
||||||
|
def persist_subkey_bindings(
|
||||||
|
key_dir: Path,
|
||||||
|
subkey_bindings: Dict[Fingerprint, Path],
|
||||||
|
) -> None:
|
||||||
|
"""Persist all SubkeyBinding of a root key file to file(s)
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
key_dir: The root directory below which the basic key material is persisted
|
||||||
|
subkey_bindings: The SubkeyBinding signatures of a Public-Subkey
|
||||||
|
"""
|
||||||
|
|
||||||
|
for fingerprint, subkey_binding in subkey_bindings.items():
|
||||||
|
issuer = packet_dump_field(subkey_binding, "Issuer")
|
||||||
|
output_file = key_dir / "subkey" / fingerprint / "certification" / f"{issuer}.asc"
|
||||||
|
output_file.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
debug(f"Writing file {output_file} from {str(subkey_binding)}")
|
||||||
|
packet_join(packets=[subkey_binding], output=output_file, force=True)
|
||||||
|
|
||||||
|
|
||||||
def persist_subkey_revocations(
|
def persist_subkey_revocations(
|
||||||
@ -1041,6 +1055,51 @@ def get_fingerprints(working_dir: Path, sources: Iterable[Path], paths: List[Pat
|
|||||||
return fingerprints
|
return fingerprints
|
||||||
|
|
||||||
|
|
||||||
|
def get_packets_from_path(path: Path) -> List[Path]:
|
||||||
|
"""Collects packets from one level by appending the root, certifications and revocations.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
path: Filesystem path used to collect the packets from
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
A list of packets ordered by root, certification, revocation
|
||||||
|
"""
|
||||||
|
if not path.exists():
|
||||||
|
return []
|
||||||
|
|
||||||
|
packets: List[Path] = []
|
||||||
|
packets += sorted(path.glob("*.asc"))
|
||||||
|
certifications = path / "certification"
|
||||||
|
if certifications.exists():
|
||||||
|
packets += sorted(certifications.glob("*.asc"))
|
||||||
|
revocations = path / "revocation"
|
||||||
|
if revocations.exists():
|
||||||
|
packets += sorted(revocations.glob("*.asc"))
|
||||||
|
return packets
|
||||||
|
|
||||||
|
|
||||||
|
def get_packets_from_listing(path: Path) -> List[Path]:
|
||||||
|
"""Collects packets from a listing of directories holding one level each by calling `get_get_packets_from_path`.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
path: Filesystem path used as listing to collect the packets from
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
A list of packets ordered by root, certification, revocation for each level
|
||||||
|
"""
|
||||||
|
if not path.exists():
|
||||||
|
return []
|
||||||
|
|
||||||
|
packets: List[Path] = []
|
||||||
|
for sub_path in sorted(path.iterdir()):
|
||||||
|
packets += get_packets_from_path(sub_path)
|
||||||
|
return packets
|
||||||
|
|
||||||
|
|
||||||
def export(
|
def export(
|
||||||
working_dir: Path,
|
working_dir: Path,
|
||||||
keyring_root: Path,
|
keyring_root: Path,
|
||||||
@ -1076,14 +1135,19 @@ def export(
|
|||||||
certificates: List[Path] = []
|
certificates: List[Path] = []
|
||||||
|
|
||||||
for cert_dir in sorted(cert_paths):
|
for cert_dir in sorted(cert_paths):
|
||||||
cert_path = temp_dir / f"{cert_dir.name}.asc"
|
packets: List[Path] = []
|
||||||
debug(f"Joining {cert_dir} in {cert_path}")
|
packets += get_packets_from_path(cert_dir)
|
||||||
|
packets += get_packets_from_listing(cert_dir / "subkey")
|
||||||
|
packets += get_packets_from_listing(cert_dir / "uid")
|
||||||
|
|
||||||
|
output_path = temp_dir / f"{cert_dir.name}.asc"
|
||||||
|
debug(f"Joining {cert_dir} in {output_path}")
|
||||||
packet_join(
|
packet_join(
|
||||||
packets=sorted(cert_dir.glob("**/*.asc")),
|
packets=packets,
|
||||||
output=cert_path,
|
output=output_path,
|
||||||
force=True,
|
force=True,
|
||||||
)
|
)
|
||||||
certificates.append(cert_path)
|
certificates.append(output_path)
|
||||||
|
|
||||||
return keyring_merge(certificates, output, force=True)
|
return keyring_merge(certificates, output, force=True)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user