from collections import defaultdict
from functools import wraps
from pathlib import Path
from shutil import copytree
from subprocess import PIPE
from subprocess import Popen
from tempfile import NamedTemporaryFile
from tempfile import TemporaryDirectory
from typing import Any
from typing import Callable
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Set

from pytest import fixture

from libkeyringctl.keyring import convert_certificate
from libkeyringctl.keyring import export
from libkeyringctl.keyring import get_fingerprints_from_keyring_files
from libkeyringctl.keyring import simplify_user_id
from libkeyringctl.sequoia import certify
from libkeyringctl.sequoia import key_extract_certificate
from libkeyringctl.sequoia import key_generate
from libkeyringctl.sequoia import keyring_merge
from libkeyringctl.sequoia import packet_join
from libkeyringctl.types import Fingerprint
from libkeyringctl.types import Uid
from libkeyringctl.types import Username
from libkeyringctl.util import cwd
from libkeyringctl.util import system

# Module-level registries shared by the decorators below.
# They are cleared before every test by the reset_storage() fixture.
test_keys: Dict[Username, List[Path]] = defaultdict(list)
test_key_revocation: Dict[Username, List[Path]] = defaultdict(list)
test_certificates: Dict[Username, List[Path]] = defaultdict(list)
test_certificate_uids: Dict[Username, List[List[Uid]]] = defaultdict(list)
test_keyring_certificates: Dict[Username, List[Path]] = defaultdict(list)
test_main_fingerprints: Set[Fingerprint] = set()
test_all_fingerprints: Set[Fingerprint] = set()


@fixture(autouse=True)
def reset_storage() -> None:
    """Clear all module-level registries so that tests do not leak state into each other."""
    test_keys.clear()
    test_key_revocation.clear()
    test_certificates.clear()
    test_certificate_uids.clear()
    test_keyring_certificates.clear()
    test_main_fingerprints.clear()
    test_all_fingerprints.clear()


def create_certificate(
    username: Username,
    uids: List[Uid],
    keyring_type: str = "packager",
    func: Optional[Callable[..., Any]] = None,
) -> Callable[..., Any]:
    """Decorator: generate a key and certificate for username and decompose it into the test keyring."""

    def decorator(decorated_func: Callable[..., None]) -> Callable[..., Any]:
        @wraps(decorated_func)
        def wrapper(working_dir: Path, *args: Any, **kwargs: Any) -> None:
            key_directory = working_dir / "secret" / f"{username}"
            key_directory.mkdir(parents=True, exist_ok=True)

            key_file: Path = key_directory / f"{username}.asc"
            key_generate(uids=uids, outfile=key_file)
            test_keys[username].append(key_file)

            certificate_directory = working_dir / "certificate" / f"{username}"
            certificate_directory.mkdir(parents=True, exist_ok=True)

            keyring_root: Path = working_dir / "keyring"
            keyring_root.mkdir(parents=True, exist_ok=True)
            certificate_file: Path = certificate_directory / f"{username}.asc"

            key_extract_certificate(key=key_file, output=certificate_file)
            test_certificates[username].append(certificate_file)
            test_certificate_uids[username].append(uids)

            # The revocation packet is expected to exist next to the key file after key generation.
            key_revocation_packet = key_file.parent / f"{key_file.name}.rev"
            key_revocation_joined = key_file.parent / f"{key_file.name}.joined.rev"
            key_revocation_cert = key_file.parent / f"{key_file.name}.cert.rev"
            packet_join(packets=[certificate_file, key_revocation_packet], output=key_revocation_joined)
            keyring_merge(certificates=[key_revocation_joined], output=key_revocation_cert)
            test_key_revocation[username].append(key_revocation_cert)

            target_dir = keyring_root / keyring_type

            for fingerprint in get_fingerprints_from_keyring_files(
                working_dir=working_dir, source=[certificate_file]
            ).keys():
                test_all_fingerprints.add(fingerprint)

            decomposed_path: Path = convert_certificate(
                working_dir=working_dir,
                certificate=certificate_file,
                keyring_dir=keyring_root / keyring_type,
                fingerprint_filter=test_all_fingerprints,
            )
            user_dir = decomposed_path.parent
            (target_dir / user_dir.name).mkdir(parents=True, exist_ok=True)
            copytree(src=user_dir, dst=(target_dir / user_dir.name), dirs_exist_ok=True)
            test_keyring_certificates[username].append(target_dir / user_dir.name / decomposed_path.name)

            certificate_fingerprint: Fingerprint = Fingerprint(decomposed_path.name)
            if "main" == keyring_type:
                test_main_fingerprints.add(certificate_fingerprint)
            test_all_fingerprints.add(certificate_fingerprint)

            decorated_func(working_dir=working_dir, *args, **kwargs)

        return wrapper

    # Support use both as a decorator factory and as a direct wrapper around func.
    if not func:
        return decorator
    return decorator(func)


def create_uid_certification(
    issuer: Username, certified: Username, uid: Uid, func: Optional[Callable[[Any], None]] = None
) -> Callable[..., Any]:
    """Decorator: let issuer certify uid of certified and store the certification in the packager keyring."""

    def decorator(decorated_func: Callable[..., None]) -> Callable[..., Any]:
        @wraps(decorated_func)
        def wrapper(working_dir: Path, *args: Any, **kwargs: Any) -> None:
            key: Path = test_keys[issuer][0]
            certificate: Path = test_certificates[certified][0]
            fingerprint: Fingerprint = Fingerprint(test_keyring_certificates[certified][0].name)
            issuer_fingerprint: Fingerprint = Fingerprint(test_keyring_certificates[issuer][0].name)
            simplified_uid = simplify_user_id(uid)

            output: Path = (
                working_dir
                / "keyring"
                / "packager"
                / certified
                / fingerprint
                / "uid"
                / simplified_uid
                / "certification"
                / f"{issuer_fingerprint}.asc"
            )
            output.parent.mkdir(parents=True, exist_ok=True)

            certify(key, certificate, uid, output)

            decorated_func(working_dir=working_dir, *args, **kwargs)

        return wrapper

    if not func:
        return decorator
    return decorator(func)


def create_key_revocation(
    username: Username,
    keyring_type: str = "packager",
    func: Optional[Callable[..., Any]] = None,
) -> Callable[..., Any]:
    """Decorator: decompose the revocation certificate of username into the test keyring."""

    def decorator(decorated_func: Callable[..., None]) -> Callable[..., Any]:
        @wraps(decorated_func)
        def wrapper(working_dir: Path, *args: Any, **kwargs: Any) -> None:
            revocation = test_key_revocation[username][0]

            keyring_root: Path = working_dir / "keyring"
            keyring_root.mkdir(parents=True, exist_ok=True)
            target_dir = keyring_root / keyring_type

            decomposed_path: Path = convert_certificate(
                working_dir=working_dir,
                certificate=revocation,
                keyring_dir=keyring_root / keyring_type,
                fingerprint_filter=test_all_fingerprints,
            )
            user_dir = decomposed_path.parent
            (target_dir / user_dir.name).mkdir(parents=True, exist_ok=True)
            copytree(src=user_dir, dst=(target_dir / user_dir.name), dirs_exist_ok=True)

            decorated_func(working_dir=working_dir, *args, **kwargs)

        return wrapper

    if not func:
        return decorator
    return decorator(func)


def create_signature_revocation(
    issuer: Username, certified: Username, uid: Uid, func: Optional[Callable[[Any], None]] = None
) -> Callable[..., Any]:
    """Decorator: revoke, via gpg's revsig command, a certification on uid of certified."""

    def decorator(decorated_func: Callable[..., None]) -> Callable[..., Any]:
        @wraps(decorated_func)
        def wrapper(working_dir: Path, *args: Any, **kwargs: Any) -> None:
            issuer_key: Path = test_keys[issuer][0]
            keyring_root: Path = working_dir / "keyring"

            keyring_certificate: Path = test_keyring_certificates[certified][0]
            certified_fingerprint = keyring_certificate.name

            with NamedTemporaryFile(dir=str(working_dir), prefix=f"{certified}", suffix=".asc") as certificate:
                certificate_path: Path = Path(certificate.name)
                export(
                    working_dir=working_dir,
                    keyring_root=keyring_root,
                    sources=[keyring_certificate],
                    output=certificate_path,
                )

                # Use a throwaway GNUPGHOME so the user's own keyring is never touched.
                with TemporaryDirectory(prefix="gnupg") as gnupg_home:
                    env = {"GNUPGHOME": gnupg_home}

                    print(
                        system(
                            [
                                "gpg",
                                "--no-auto-check-trustdb",
                                "--import",
                                f"{str(issuer_key)}",
                                f"{str(certificate_path)}",
                            ],
                            env=env,
                        )
                    )

                    # Answer "y" only for the User ID whose certification should be revoked.
                    uid_confirmations = ""
                    for cert_uid in test_certificate_uids[certified][0]:
                        if uid == cert_uid:
                            uid_confirmations += "y\n"
                        else:
                            uid_confirmations += "n\n"

                    # Feed the answers to gpg's interactive prompts through stdin.
                    commands = Popen(["echo", "-e", f"{uid_confirmations}y\n0\ny\n\ny\ny\nsave\n"], stdout=PIPE)
                    system(
                        [
                            "gpg",
                            "--no-auto-check-trustdb",
                            "--command-fd",
                            "0",
                            "--expert",
                            "--yes",
                            "--batch",
                            "--edit-key",
                            f"{certified_fingerprint}",
                            "revsig",
                            "save",
                        ],
                        _stdin=commands.stdout,
                        env=env,
                    )

                    # Overwrite the temporary certificate with the exported, revoked version.
                    revoked_certificate = system(["gpg", "--armor", "--export", f"{certified_fingerprint}"], env=env)
                    certificate.truncate(0)
                    certificate.seek(0)
                    certificate.write(revoked_certificate.encode())
                    certificate.flush()

                    target_dir = keyring_root / "packager"
                    decomposed_path: Path = convert_certificate(
                        working_dir=working_dir,
                        certificate=certificate_path,
                        keyring_dir=target_dir,
                        fingerprint_filter=test_all_fingerprints,
                    )
                    user_dir = decomposed_path.parent
                    (target_dir / user_dir.name).mkdir(parents=True, exist_ok=True)
                    copytree(src=user_dir, dst=(target_dir / user_dir.name), dirs_exist_ok=True)

            decorated_func(working_dir=working_dir, *args, **kwargs)

        return wrapper

    if not func:
        return decorator
    return decorator(func)


@fixture(scope="function")
def working_dir() -> Generator[Path, None, None]:
    """Yield a fresh temporary directory and make it the current working directory for the test."""
    with TemporaryDirectory(prefix="arch-keyringctl-test-") as tempdir:
        path: Path = Path(tempdir)
        with cwd(path):
            yield path


@fixture(scope="function")
def keyring_dir(working_dir: Path) -> Generator[Path, None, None]:
    """Yield the path of the keyring directory below the per-test working directory."""
    yield working_dir / "keyring"