feature(keyringctl): add tool to work with key data

This commit is contained in:
Levente Polyak 2021-08-25 18:53:07 +02:00
parent 4116f94fbb
commit f626e40b84
No known key found for this signature in database
GPG Key ID: FC1B547C8D8172C8
2 changed files with 319 additions and 1 deletions

View File

@ -8,7 +8,7 @@ indent_size = 4
insert_final_newline = true insert_final_newline = true
trim_trailing_whitespace = true trim_trailing_whitespace = true
[*.py] [{*.py,keyringctl}]
indent_style = space indent_style = space
indent_size = 4 indent_size = 4
max_line_length = 119 max_line_length = 119

318
keyringctl Executable file
View File

@ -0,0 +1,318 @@
#!/usr/bin/env python
from argparse import ArgumentParser
from collections import defaultdict
from os import chdir
from os import getcwd
from pathlib import Path
from shutil import move
from re import escape
from re import split
from re import sub
from subprocess import CalledProcessError
from subprocess import check_output
from subprocess import PIPE
from sys import exit
from sys import stderr
from tempfile import TemporaryDirectory
from tempfile import mkdtemp
from logging import basicConfig
from logging import debug
from logging import error
from logging import DEBUG
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Iterable
from contextlib import contextmanager
@contextmanager
def cwd(new_dir: Path):
previous_dir = getcwd()
chdir(new_dir)
try:
yield
finally:
chdir(previous_dir)
# class Key():
# fingerprint: str = ""
# pubkey: Path
# uids: List[]
# uid_certification: List[]
# subkeys: List[]
# subkey_certification: List[]
# uid_signatures: List[]
def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]:
def convert(text: str) -> Any:
return int(text) if text.isdigit() else text.lower()
def alphanum_key(key: Path) -> List[Any]:
return [convert(c) for c in split('([0-9]+)', str(key.name))]
return sorted(_list, key=alphanum_key)
def system(cmd: List[str], exit_on_error: bool = True) -> str:
try:
return check_output(cmd, stderr=PIPE).decode()
except CalledProcessError as e:
stderr.buffer.write(e.stderr)
if exit_on_error:
exit(e.returncode)
raise e
def convert_certificate(working_dir: Path, certificate: Path, owner: str) -> Path:
certificate_fingerprint: Optional[str] = None
pubkey: Optional[Path] = None
direct_keys: List[Path] = []
current_packet_mode: Optional[str] = None
current_packet_key: Optional[str] = None
uids: Dict[str, Path] = {}
uid_binding_sig: Dict[str, Path] = {}
subkey: Dict[str, Path] = {}
subkey_binding_sig: Dict[str, Path] = {}
certifications: Dict[str, List[Path]] = defaultdict(list)
revocations: Dict[str, List[Path]] = defaultdict(list)
# XXX: KeyRevocation
# XXX: PrimaryKeyBinding
# TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean
for packet in packet_split(working_dir, certificate):
debug(f'Processing packet {packet.name}')
if packet.name.endswith('--PublicKey'):
pubkey = packet
certificate_fingerprint = packet_dump_field(packet, 'Fingerprint')
current_packet_mode = 'pubkey'
current_packet_key = certificate_fingerprint
elif packet.name.endswith('--UserID'):
value = packet_dump_field(packet, 'Value')
value = simplify_user_id(value)
current_packet_mode = 'uid'
current_packet_key = value
uids[value] = packet
elif packet.name.endswith('--PublicSubkey'):
fingerprint = packet_dump_field(packet, 'Fingerprint')
current_packet_mode = 'subkey'
current_packet_key = fingerprint
subkey[fingerprint] = packet
elif packet.name.endswith('--Signature'):
if not certificate_fingerprint:
raise Exception('missing certificate fingerprint for "{packet.name}"')
issuer = packet_dump_field(packet, 'Issuer')
signature_type = packet_dump_field(packet, 'Type')
# TODO: handle Revocation key via self Issuer
if signature_type == 'DirectKey':
direct_keys.append(packet)
# TODO
breakpoint()
continue
if not current_packet_key:
# TODO GenericCertification PersonaCertification CasualCertification PositiveCertification
raise Exception(f'unknown packet key for "{packet.name}"')
if current_packet_mode == 'uid' or current_packet_mode == 'pubkey':
if certificate_fingerprint.endswith(issuer):
if signature_type == 'PositiveCertification':
uid_binding_sig[current_packet_key] = packet
elif signature_type == 'CertificationRevocation':
# XXX:
revocations[current_packet_key].append(packet)
else:
raise Exception(f'unknown signature type: {signature_type}')
else:
if signature_type.endswith('Certification'):
certifications[current_packet_key].append(packet)
elif signature_type == 'CertificationRevocation':
revocations[current_packet_key].append(packet)
else:
raise Exception(f'unknown signature type: {signature_type}')
elif current_packet_mode == 'subkey':
if signature_type == 'SubkeyBinding':
subkey_binding_sig[current_packet_key] = packet
elif signature_type == 'SubkeyRevocation':
# XXX:
pass
else:
raise Exception(f'unknown signature type: {signature_type}')
else:
raise Exception(f'unknown signature root for "{packet.name}"')
else:
raise Exception(f'unknown packet type "{packet.name}"')
if not certificate_fingerprint:
raise Exception('missing certificate fingerprint')
if not pubkey:
raise Exception('missing certificate public-key')
root_dir = (working_dir / certificate_fingerprint)
root_dir.mkdir()
# TODO: DirectKeys
packets: List[Path] = [pubkey]
packets.extend(direct_keys)
for key in uid_binding_sig.keys():
packets.extend([uids[key], uid_binding_sig[key]])
for key in subkey_binding_sig.keys():
packets.extend([subkey[key], subkey_binding_sig[key]])
minimal_certificate = root_dir / f'{certificate_fingerprint}.asc'
packet_join(packets, minimal_certificate)
for key, current_certifications in certifications.items():
for certification in current_certifications:
certification_dir = root_dir / key / 'certification'
certification_dir.mkdir(parents=True, exist_ok=True)
issuer = packet_dump_field(certification, 'Issuer')
# TODO: find a way to get uid binding for pubkey certs
if key not in uids:
error('missing uid')
breakpoint()
if key not in uid_binding_sig:
error('missing binding sig')
breakpoint()
packets = [pubkey, uids[key], uid_binding_sig[key], certification]
output_file = certification_dir / f'{issuer}.asc'
debug(f'Writing file {output_file} from {certification}')
packet_join(packets, output_file)
for key, current_revocations in revocations.items():
for revocation in current_revocations:
revocation_dir = root_dir / key / 'revocation'
revocation_dir.mkdir(parents=True, exist_ok=True)
issuer = packet_dump_field(revocation, 'Issuer')
packets = [pubkey, uids[key]]
# Binding sigs only exist for 3rd-party revocations
if key in uid_binding_sig:
packets.append(uid_binding_sig[key])
packets.append(revocation)
output_file = revocation_dir / f'{issuer}.asc'
debug(f'Writing file {output_file} from {revocation}')
packet_join(packets, output_file)
return root_dir
def packet_dump(packet: Path) -> str:
return system(['sq', 'packet', 'dump', str(packet)])
def packet_dump_field(packet: Path, field: str) -> str:
dump = packet_dump(packet)
lines = [line.strip() for line in dump.splitlines()]
lines = list(filter(lambda line: line.startswith(f'{field}: '), lines))
if not lines:
raise Exception(f'Packet has no field "{field}"')
return lines[0].split(maxsplit=1)[1]
def packet_split(working_dir: Path, key: Path) -> Iterable[Path]:
packet_dir = Path(mkdtemp(dir=working_dir)).absolute()
with cwd(packet_dir):
system(['sq', 'packet', 'split', '--prefix', '', str(key)])
return natural_sort_path(packet_dir.iterdir())
def packet_join(packets: List[Path], output: Path) -> None:
cmd = ['sq', 'packet', 'join']
packets_str = list(map(lambda path: str(path), packets))
cmd.extend(packets_str)
cmd.extend(['--output', str(output)])
system(cmd, exit_on_error=False)
def simplify_user_id(user_id: str) -> str:
user_id = user_id.replace('@', '_at_')
user_id = sub('[<>]', '', user_id)
user_id = sub('[' + escape(r' !@#$%^&*()_-+=[]{}\|;:,.<>/?') + ']', '_', user_id)
return user_id
def convert(working_dir: Path, source: Path, target_dir: Optional[Path] = None) -> Path:
directories: List[Path] = []
if source.is_dir():
for key in source.iterdir():
directories.append(convert_certificate(working_dir, key, 'anthraxx'))
else:
directories.append(convert_certificate(working_dir, source, 'anthraxx'))
if target_dir:
target_dir.mkdir(parents=True, exist_ok=True)
else:
# persistent target directory
target_dir = Path(mkdtemp()).absolute()
for path in directories:
target_dir.mkdir(exist_ok=True)
move(path, target_dir)
return target_dir
def keyring_import(working_dir: Path, source: Path, target_dir: Optional[Path] = None):
pass
def absolute_path(path: str) -> Path:
return Path(path).absolute()
if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument('-v', '--verbose', action='store_true',
help='Causes to print debugging messages about the progress')
parser.add_argument('--wait', action='store_true', help='Block before cleaning up the temp directory')
subcommands = parser.add_subparsers(dest="subcommand")
convert_parser = subcommands.add_parser('convert')
convert_parser.add_argument('source', type=absolute_path, help='File or directory')
convert_parser.add_argument('--target', type=absolute_path, help='Target directory')
import_parser = subcommands.add_parser('import')
import_parser.add_argument('source', type=absolute_path, help='File or directory')
import_parser.add_argument('--target', type=absolute_path, help='Target directory')
args = parser.parse_args()
if args.verbose:
basicConfig(level=DEBUG)
# temporary working directory that gets auto cleaned
with TemporaryDirectory() as tempdir:
working_dir = Path(tempdir)
start_dir = Path().absolute()
chdir(working_dir)
debug(f'Working directory: {working_dir}')
if 'convert' == args.subcommand:
print(convert(working_dir, args.source, args.target))
elif 'import' == args.subcommand:
keyring_import(working_dir, args.source, args.target)
if args.wait:
print('Press [ENTER] to continue')
input()