diff --git a/keyringctl b/keyringctl index c7ae1bd..2bba305 100755 --- a/keyringctl +++ b/keyringctl @@ -496,8 +496,22 @@ def packet_split(working_dir: Path, certificate: Path) -> Iterable[Path]: return natural_sort_path(packet_dir.iterdir()) -def packet_join(packets: List[Path], output: Path) -> None: +def packet_join(packets: List[Path], output: Path, force: bool = False) -> None: + """Join PGP packet data in files to a single output file + + Parameters + ---------- + packets: List[Path] + A list of paths to files that contain PGP packet data + output: Path + A file to which all PGP packet data is written + force: bool + Whether to force the execution of sq (defaults to False) + """ + cmd = ['sq', 'packet', 'join'] + if force: + cmd.insert(1, '--force') packets_str = list(map(lambda path: str(path), packets)) cmd.extend(packets_str) cmd.extend(['--output', str(output)]) @@ -541,6 +555,61 @@ def keyring_import(working_dir: Path, source: Path, target_dir: Optional[Path] = pass +def export_keyring(working_dir: Path, sources: List[Path], output: Path, force: bool) -> None: + """Export all provided PGP packet files to a single output file + + If sources contains directories, any .asc files below them are considered. + + Parameters + ---------- + working_dir: Path + A directory to use for temporary files + sources: List[Path] + A list of directories or files from which to read PGP packet information + output: Path + An output file that all PGP packet data is written to + force: bool + Whether to force the execution of packet_join() + """ + + sources = [source.absolute() for source in sources] + cert_dir = Path(mkdtemp(dir=working_dir)).absolute() + output = output.absolute() + certs: List[Path] = [] + + debug(f"Creating keyring {output} from {[str(source_dir) for source_dir in sources]}.") + + for source_number, source in enumerate(sources): + if source.is_dir(): + for user_number, user_dir in enumerate(sorted(source.iterdir())): + if user_dir.is_dir(): + for user_cert_number, user_cert_dir in enumerate(sorted(user_dir.iterdir())): + if user_cert_dir.is_dir(): + cert_path = ( + cert_dir + / ( + f"{str(source_number).zfill(4)}" + f"-{str(user_number).zfill(4)}" + f"-{str(user_cert_number).zfill(4)}.asc" + ) + ) + debug(f"Joining {user_dir.name}/{user_cert_dir.name} in {cert_path}.") + packet_join( + packets=sorted(user_cert_dir.glob("**/*.asc")), + output=cert_path, + force=force, + ) + certs.append(cert_path) + elif source.is_file() and not source.is_symlink(): + certs.append(source) + + cmd = ['sq', 'keyring', 'merge', '-o', str(output)] + if force: + cmd.insert(1, '--force') + cmd += [str(cert) for cert in sorted(certs)] + system(cmd, exit_on_error=False) + + def absolute_path(path: str) -> Path: return Path(path).absolute() @@ -550,6 +619,13 @@ if __name__ == '__main__': parser.add_argument('-v', '--verbose', action='store_true', help='Causes to print debugging messages about the progress') parser.add_argument('--wait', action='store_true', help='Block before cleaning up the temp directory') + parser.add_argument( + '-f', + '--force', + action='store_true', + default=False, + help='force the execution of subcommands (e.g. overwriting of files)', + ) subcommands = parser.add_subparsers(dest="subcommand") convert_parser = subcommands.add_parser( @@ -569,29 +645,45 @@ if __name__ == '__main__': import_parser.add_argument('source', type=absolute_path, help='File or directory') import_parser.add_argument('--target', type=absolute_path, help='Target directory') + export_parser = subcommands.add_parser( + 'export', + help="export a directory structure of PGP packet data to a combined file", + ) + export_parser.add_argument('output', type=absolute_path, help='file to write PGP packet data to') + export_parser.add_argument( + '-s', + '--source', + action="append", + help='files or directories containing PGP packet data (can be provided multiple times)', + required=True, + type=absolute_path, + ) + args = parser.parse_args() if args.verbose: basicConfig(level=DEBUG) - if args.target: - args.target.mkdir(parents=True, exist_ok=True) - target_dir = args.target - else: - # persistent target directory - target_dir = Path(mkdtemp()).absolute() - # temporary working directory that gets auto cleaned with TemporaryDirectory() as tempdir: working_dir = Path(tempdir) - start_dir = Path().absolute() chdir(working_dir) debug(f'Working directory: {working_dir}') + if args.subcommand in ["convert", "import"]: + if args.target: + args.target.mkdir(parents=True, exist_ok=True) + target_dir = args.target + else: + # persistent target directory + target_dir = Path(mkdtemp()).absolute() + if 'convert' == args.subcommand: print(convert(working_dir, args.source, target_dir)) elif 'import' == args.subcommand: keyring_import(working_dir, args.source, target_dir) + elif 'export' == args.subcommand: + export_keyring(working_dir=working_dir, sources=args.source, output=args.output, force=args.force) if args.wait: print('Press [ENTER] to continue')