diff --git a/keyringctl b/keyringctl index a512104..e459b8f 100755 --- a/keyringctl +++ b/keyringctl @@ -80,7 +80,7 @@ def system(cmd: List[str], exit_on_error: bool = True) -> str: def convert_certificate(working_dir: Path, certificate: Path, owner: str) -> Path: certificate_fingerprint: Optional[str] = None pubkey: Optional[Path] = None - direct_keys: List[Path] = [] + direct_keys: Dict[str, Dict[str, List[Path]]] = {} current_packet_mode: Optional[str] = None current_packet_key: Optional[str] = None uids: Dict[str, Path] = {} @@ -116,15 +116,17 @@ def convert_certificate(working_dir: Path, certificate: Path, owner: str) -> Pat elif packet.name.endswith('--Signature'): if not certificate_fingerprint: raise Exception('missing certificate fingerprint for "{packet.name}"') + if not current_packet_key: + raise Exception('missing current packet key for "{packet.name}"') issuer = packet_dump_field(packet, 'Issuer') signature_type = packet_dump_field(packet, 'Type') - # TODO: handle Revocation key via self Issuer if signature_type == 'DirectKey': - direct_keys.append(packet) - # TODO - breakpoint() + if not direct_keys.get(current_packet_key): + direct_keys = direct_keys | {current_packet_key: defaultdict(list)} + + direct_keys[current_packet_key][issuer].append(packet) continue if not current_packet_key: @@ -169,16 +171,136 @@ def convert_certificate(working_dir: Path, certificate: Path, owner: str) -> Pat root_dir = (working_dir / certificate_fingerprint) root_dir.mkdir() - # TODO: DirectKeys + persist_basic_key( + certificate_fingerprint=certificate_fingerprint, + pubkey=pubkey, + root_dir=root_dir, + subkey=subkey, + subkey_binding_sig=subkey_binding_sig, + uid_binding_sig=uid_binding_sig, + uids=uids, + ) + + persist_direct_keys( + direct_keys=direct_keys, + pubkey=pubkey, + root_dir=root_dir, + ) + + persist_certifications( + certifications=certifications, + pubkey=pubkey, + root_dir=root_dir, + uid_binding_sig=uid_binding_sig, + uids=uids, + ) + + persist_revocations( + pubkey=pubkey, + revocations=revocations, + root_dir=root_dir, + uid_binding_sig=uid_binding_sig, + uids=uids, + ) + + return root_dir + + +def persist_basic_key( + certificate_fingerprint: str, + pubkey: Path, + root_dir: Path, + subkey: Dict[str, Path], + subkey_binding_sig: Dict[str, Path], + uid_binding_sig: Dict[str, Path], + uids: Dict[str, Path], +) -> None: + """Persist the basic key material of a root key to file + + The basic key material consists of the root key's public key, any PublicSubkeys and their SubkeyBindings, all User + IDs and the per User ID PositiveCertifications. + The file is written to root_dir and is named after the root key's certificate fingerprint. + + Parameters + ---------- + certificate_fingerprint: str + The unique fingerprint of the public key + pubkey: Path + The path to the public key of the root key + root_dir: Path + The root directory below which the basic key material is persisted + subkey: Dict[str, Path] + The PublicSubkeys of a key + subkey_binding_sig: Dict[str, Path] + The SubkeyBinding signatures of a Public-Key (the root key) + uid_binding_sig: Dict[str, Path] + The PositiveCertifications of a User ID and Public-Key packet + uids: Dict[str, Path] + The User IDs of a Public-Key (the root key) + """ + packets: List[Path] = [pubkey] - packets.extend(direct_keys) for key in uid_binding_sig.keys(): packets.extend([uids[key], uid_binding_sig[key]]) for key in subkey_binding_sig.keys(): packets.extend([subkey[key], subkey_binding_sig[key]]) - minimal_certificate = root_dir / f'{certificate_fingerprint}.asc' - packet_join(packets, minimal_certificate) + packet_join(packets, root_dir / f'{certificate_fingerprint}.asc') + + +def persist_direct_keys( + direct_keys: Dict[str, Dict[str, List[Path]]], + pubkey: Path, + root_dir: Path, +) -> None: + """Persist the DirectKeys on a root key to file(s) + + Parameters + ---------- + certifications: Dict[str, List[Path]] + The certifications to write to file + pubkey: Path + The path to the public key of the root key + root_dir: Path + The root directory below which the Directkeys are persisted + """ + + for key, current_certifications in direct_keys.items(): + for issuer, certifications in current_certifications.items(): + direct_key_dir = root_dir / 'certification' + direct_key_dir.mkdir(parents=True, exist_ok=True) + packets = [pubkey] + certifications + output_file = direct_key_dir / f'{issuer}.asc' + debug(f'Writing file {output_file} from {[str(cert) for cert in certifications]}') + packet_join(packets, output_file) + + +def persist_certifications( + certifications: Dict[str, List[Path]], + pubkey: Path, + root_dir: Path, + uid_binding_sig: Dict[str, Path], + uids: Dict[str, Path], +) -> None: + """Persist the certifications of a root key to file(s) + + The certifications include all CasualCertifications, GenericCertifications, PersonaCertifications and + PositiveCertifications for all User IDs of the given root key. + All certifications are persisted in per User ID certification directories below root_dir. + + Parameters + ---------- + certifications: Dict[str, List[Path]] + The certifications to write to file + pubkey: Path + The path to the public key of the root key + root_dir: Path + The root directory below which certifications are persisted + uid_binding_sig: Dict[str, Path] + The PositiveCertifications of a User ID and Public-Key packet + uids: Dict[str, Path] + The User IDs of a Public-Key (the root key) + """ for key, current_certifications in certifications.items(): for certification in current_certifications: @@ -199,6 +321,33 @@ def convert_certificate(working_dir: Path, certificate: Path, owner: str) -> Pat debug(f'Writing file {output_file} from {certification}') packet_join(packets, output_file) + +def persist_revocations( + pubkey: Path, + revocations: Dict[str, List[Path]], + root_dir: Path, + uid_binding_sig: Dict[str, Path], + uids: Dict[str, Path], +) -> None: + """Persist the revocations of a root key to file(s) + + The revocations include all CertificationRevocations for all User IDs of the given root key. + All revocations are persisted in per User ID 'revocation' directories below root_dir. + + Parameters + ---------- + pubkey: Path + The path to the public key of the root key + revocations: Dict[str, List[Path]] + The revocations to write to file + root_dir: Path + The root directory below which revocations will be persisted + uid_binding_sig: Dict[str, Path] + The PositiveCertifications of a User ID and Public-Key packet + uids: Dict[str, Path] + The User IDs of a Public-Key (the root key) + """ + for key, current_revocations in revocations.items(): for revocation in current_revocations: revocation_dir = root_dir / key / 'revocation' @@ -214,8 +363,6 @@ def convert_certificate(working_dir: Path, certificate: Path, owner: str) -> Pat debug(f'Writing file {output_file} from {revocation}') packet_join(packets, output_file) - return root_dir - def packet_dump(packet: Path) -> str: return system(['sq', 'packet', 'dump', str(packet)]) @@ -288,9 +435,12 @@ if __name__ == '__main__': parser.add_argument('--wait', action='store_true', help='Block before cleaning up the temp directory') subcommands = parser.add_subparsers(dest="subcommand") - convert_parser = subcommands.add_parser('convert') - convert_parser.add_argument('source', type=absolute_path, help='File or directory') - convert_parser.add_argument('--target', type=absolute_path, help='Target directory') + convert_parser = subcommands.add_parser( + 'convert', + help="convert a legacy-style PGP cert or directory containing PGP certs to the new format", + ) + convert_parser.add_argument('source', type=absolute_path, help='File or directory to convert') + convert_parser.add_argument('--target', type=absolute_path, help='target directory') import_parser = subcommands.add_parser('import') import_parser.add_argument('source', type=absolute_path, help='File or directory')