feature(keyringctl): add clean functionality to match import-clean

De-duplicate not needed certifications by cleaning the keyring after
import to remove old files when processing revocations. This basically
adds the functionality compared to import-clean.
This commit is contained in:
Levente Polyak 2021-12-08 20:50:45 +01:00
parent d0ea790c6a
commit 37ad62d6e5
No known key found for this signature in database
GPG Key ID: FC1B547C8D8172C8
3 changed files with 49 additions and 2 deletions

View File

@ -286,6 +286,26 @@ def convert_signature_packet(
raise Exception(f'unknown signature root for "{packet.name}"') raise Exception(f'unknown signature root for "{packet.name}"')
def clean_keyring(keyring: Path) -> None:
"""Clean the keyring by f.e. removing old obsolete certifications with matching revocations.
Parameters
----------
keyring: Root directory of the keyring containing all keys to clean.
"""
for cert in get_cert_paths(paths=[keyring]):
for uid in (cert / "uid").iterdir():
certifications = uid / "certification"
revocations = uid / "revocation"
if not certifications.exists() or not revocations.exists():
continue
for revocation in revocations.iterdir():
certification = certifications / revocation.name
if certification.exists():
debug(f"Cleaning up old certification {certification} for revocation {revocation}")
certification.unlink()
def convert_certificate( def convert_certificate(
working_dir: Path, working_dir: Path,
certificate: Path, certificate: Path,
@ -503,6 +523,7 @@ def persist_key_material(
persist_uid_certifications( persist_uid_certifications(
certifications=certifications, certifications=certifications,
revocations=revocations,
key_dir=key_dir, key_dir=key_dir,
) )
@ -688,6 +709,7 @@ def persist_direct_key_revocations(
def persist_uid_certifications( def persist_uid_certifications(
certifications: Dict[Uid, Dict[Fingerprint, List[Path]]], certifications: Dict[Uid, Dict[Fingerprint, List[Path]]],
revocations: Dict[Uid, Dict[Fingerprint, List[Path]]],
key_dir: Path, key_dir: Path,
) -> None: ) -> None:
"""Persist the certifications of a root key to file(s) """Persist the certifications of a root key to file(s)
@ -696,14 +718,20 @@ def persist_uid_certifications(
PositiveCertifications for all User IDs of the given root key. PositiveCertifications for all User IDs of the given root key.
All certifications are persisted in per User ID certification directories below key_dir. All certifications are persisted in per User ID certification directories below key_dir.
Certifications that have a matching revocation are skipped to match behavior of import-clean.
Parameters Parameters
---------- ----------
certifications: The certifications to write to file certifications: The certifications to write to file
revocations: The revocations to check against if certifications need to be persisted.
key_dir: The root directory below which certifications are persisted key_dir: The root directory below which certifications are persisted
""" """
for uid, uid_certifications in certifications.items(): for uid, uid_certifications in certifications.items():
for issuer, issuer_certifications in uid_certifications.items(): for issuer, issuer_certifications in uid_certifications.items():
# skip certifications if there is a revocation present to match import-clean behavior
if uid in revocations and issuer in revocations[uid] and revocations[uid][issuer]:
continue
certification_dir = key_dir / "uid" / simplify_uid(uid) / "certification" certification_dir = key_dir / "uid" / simplify_uid(uid) / "certification"
certification_dir.mkdir(parents=True, exist_ok=True) certification_dir.mkdir(parents=True, exist_ok=True)
certification = latest_certification(issuer_certifications) certification = latest_certification(issuer_certifications)
@ -826,6 +854,8 @@ def convert(
(target_dir / user_dir.name).mkdir(parents=True, exist_ok=True) (target_dir / user_dir.name).mkdir(parents=True, exist_ok=True)
copytree(src=user_dir, dst=(target_dir / user_dir.name), dirs_exist_ok=True) copytree(src=user_dir, dst=(target_dir / user_dir.name), dirs_exist_ok=True)
clean_keyring(keyring=target_dir)
return target_dir return target_dir

View File

@ -159,6 +159,11 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N
) )
if issuer != sig.stem: if issuer != sig.stem:
raise Exception(f"Unexpected issuer in file {str(sig)}: {issuer}") raise Exception(f"Unexpected issuer in file {str(sig)}: {issuer}")
certification = uid_path.parent / "certification" / sig.name
if certification.exists():
raise Exception(f"Certification exists for revocation {str(sig)}: {certification}")
debug(f"OK: {sig}") debug(f"OK: {sig}")
else: else:
raise Exception(f"Unexpected directory in certificate {certificate.name}: {str(uid_path)}") raise Exception(f"Unexpected directory in certificate {certificate.name}: {str(uid_path)}")

View File

@ -25,6 +25,7 @@ from libkeyringctl.types import Username
from .conftest import create_certificate from .conftest import create_certificate
from .conftest import create_key_revocation from .conftest import create_key_revocation
from .conftest import create_signature_revocation
from .conftest import create_uid_certification from .conftest import create_uid_certification
from .conftest import test_all_fingerprints from .conftest import test_all_fingerprints
from .conftest import test_certificates from .conftest import test_certificates
@ -563,7 +564,7 @@ def test_convert(working_dir: Path, keyring_dir: Path) -> None:
working_dir=working_dir, working_dir=working_dir,
keyring_root=keyring_dir, keyring_root=keyring_dir,
sources=test_certificates[Username("foobar")], sources=test_certificates[Username("foobar")],
target_dir=keyring_dir, target_dir=keyring_dir / "packager",
) )
with raises(Exception): with raises(Exception):
@ -571,10 +572,21 @@ def test_convert(working_dir: Path, keyring_dir: Path) -> None:
working_dir=working_dir, working_dir=working_dir,
keyring_root=keyring_dir, keyring_root=keyring_dir,
sources=test_keys[Username("foobar")], sources=test_keys[Username("foobar")],
target_dir=keyring_dir, target_dir=keyring_dir / "packager",
) )
@create_certificate(username=Username("main"), uids=[Uid("main <foo@bar.xyz>")], keyring_type="main")
@create_certificate(username=Username("foobar"), uids=[Uid("foobar <foo@bar.xyz>")])
@create_uid_certification(issuer=Username("main"), certified=Username("foobar"), uid=Uid("foobar <foo@bar.xyz>"))
@create_signature_revocation(issuer=Username("main"), certified=Username("foobar"), uid=Uid("foobar <foo@bar.xyz>"))
def test_clean_keyring(working_dir: Path, keyring_dir: Path) -> None:
# first pass clean up certification
keyring.clean_keyring(keyring=keyring_dir)
# second pass skipping clean up because lack of certification
keyring.clean_keyring(keyring=keyring_dir)
@create_certificate(username=Username("main"), uids=[Uid("main <foo@bar.xyz>")], keyring_type="main") @create_certificate(username=Username("main"), uids=[Uid("main <foo@bar.xyz>")], keyring_type="main")
@create_certificate(username=Username("other_main"), uids=[Uid("other main <foo@bar.xyz>")], keyring_type="main") @create_certificate(username=Username("other_main"), uids=[Uid("other main <foo@bar.xyz>")], keyring_type="main")
@create_certificate(username=Username("foobar"), uids=[Uid("foobar <foo@bar.xyz>")]) @create_certificate(username=Username("foobar"), uids=[Uid("foobar <foo@bar.xyz>")])