keyringctl: Add writing to keyring output file
keyringctl: Change `packet_join()` to add documentation and a `force` parameter with which sq's force parameter may be toggled (defaults to False). Add `export_keyring()` to allow writing all provided PGP packet files to a single output file using `sq keyring merge`. Change `__main__` to add an `export` subcommand to allow for providing multiple input sources and one output file. Add an optional `-f/--force` parameter that can be used to force subcommands that support it. Remove the unused `start_dir` variable. Move the creation of `target_dir` below the context that creates the working directory and only create it when using the `convert` or `import` subcommands (as it is not used otherwise). Call `export_keyring()` when using the `export` subcommand.
This commit is contained in:
parent
7f7c2f13f0
commit
c4fbd95041
110
keyringctl
110
keyringctl
@ -496,8 +496,22 @@ def packet_split(working_dir: Path, certificate: Path) -> Iterable[Path]:
|
||||
return natural_sort_path(packet_dir.iterdir())
|
||||
|
||||
|
||||
def packet_join(packets: List[Path], output: Path) -> None:
|
||||
def packet_join(packets: List[Path], output: Path, force: bool = False) -> None:
|
||||
"""Join PGP packet data in files to a single output file
|
||||
|
||||
Parameters
|
||||
----------
|
||||
packets: List[Path]
|
||||
A list of paths to files that contain PGP packet data
|
||||
output: Path
|
||||
A file to which all PGP packet data is written
|
||||
force: bool
|
||||
Whether to force the execution of sq (defaults to False)
|
||||
"""
|
||||
|
||||
cmd = ['sq', 'packet', 'join']
|
||||
if force:
|
||||
cmd.insert(1, '--force')
|
||||
packets_str = list(map(lambda path: str(path), packets))
|
||||
cmd.extend(packets_str)
|
||||
cmd.extend(['--output', str(output)])
|
||||
@ -541,6 +555,61 @@ def keyring_import(working_dir: Path, source: Path, target_dir: Optional[Path] =
|
||||
pass
|
||||
|
||||
|
||||
def export_keyring(working_dir: Path, sources: List[Path], output: Path, force: bool) -> None:
|
||||
"""Export all provided PGP packet files to a single output file
|
||||
|
||||
If sources contains directories, any .asc files below them are considered.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
working_dir: Path
|
||||
A directory to use for temporary files
|
||||
sources: List[Path]
|
||||
A list of directories or files from which to read PGP packet information
|
||||
output: Path
|
||||
An output file that all PGP packet data is written to
|
||||
force: bool
|
||||
Whether to force the execution of packet_join()
|
||||
"""
|
||||
|
||||
sources = [source.absolute() for source in sources]
|
||||
cert_dir = Path(mkdtemp(dir=working_dir)).absolute()
|
||||
output = output.absolute()
|
||||
certs: List[Path] = []
|
||||
|
||||
debug(f"Creating keyring {output} from {[str(source_dir) for source_dir in sources]}.")
|
||||
|
||||
for source_number, source in enumerate(sources):
|
||||
if source.is_dir():
|
||||
for user_number, user_dir in enumerate(sorted(source.iterdir())):
|
||||
if user_dir.is_dir():
|
||||
for user_cert_number, user_cert_dir in enumerate(sorted(user_dir.iterdir())):
|
||||
if user_cert_dir.is_dir():
|
||||
cert_path = (
|
||||
cert_dir
|
||||
/ (
|
||||
f"{str(source_number).zfill(4)}"
|
||||
f"-{str(user_number).zfill(4)}"
|
||||
f"-{str(user_cert_number).zfill(4)}.asc"
|
||||
)
|
||||
)
|
||||
debug(f"Joining {user_dir.name}/{user_cert_dir.name} in {cert_path}.")
|
||||
packet_join(
|
||||
packets=sorted(user_cert_dir.glob("**/*.asc")),
|
||||
output=cert_path,
|
||||
force=force,
|
||||
)
|
||||
certs.append(cert_path)
|
||||
elif source.is_file() and not source.is_symlink():
|
||||
certs.append(source)
|
||||
|
||||
cmd = ['sq', 'keyring', 'merge', '-o', str(output)]
|
||||
if force:
|
||||
cmd.insert(1, '--force')
|
||||
cmd += [str(cert) for cert in sorted(certs)]
|
||||
system(cmd, exit_on_error=False)
|
||||
|
||||
|
||||
def absolute_path(path: str) -> Path:
|
||||
return Path(path).absolute()
|
||||
|
||||
@ -550,6 +619,13 @@ if __name__ == '__main__':
|
||||
parser.add_argument('-v', '--verbose', action='store_true',
|
||||
help='Causes to print debugging messages about the progress')
|
||||
parser.add_argument('--wait', action='store_true', help='Block before cleaning up the temp directory')
|
||||
parser.add_argument(
|
||||
'-f',
|
||||
'--force',
|
||||
action='store_true',
|
||||
default=False,
|
||||
help='force the execution of subcommands (e.g. overwriting of files)',
|
||||
)
|
||||
subcommands = parser.add_subparsers(dest="subcommand")
|
||||
|
||||
convert_parser = subcommands.add_parser(
|
||||
@ -569,29 +645,45 @@ if __name__ == '__main__':
|
||||
import_parser.add_argument('source', type=absolute_path, help='File or directory')
|
||||
import_parser.add_argument('--target', type=absolute_path, help='Target directory')
|
||||
|
||||
export_parser = subcommands.add_parser(
|
||||
'export',
|
||||
help="export a directory structure of PGP packet data to a combined file",
|
||||
)
|
||||
export_parser.add_argument('output', type=absolute_path, help='file to write PGP packet data to')
|
||||
export_parser.add_argument(
|
||||
'-s',
|
||||
'--source',
|
||||
action="append",
|
||||
help='files or directories containing PGP packet data (can be provided multiple times)',
|
||||
required=True,
|
||||
type=absolute_path,
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.verbose:
|
||||
basicConfig(level=DEBUG)
|
||||
|
||||
if args.target:
|
||||
args.target.mkdir(parents=True, exist_ok=True)
|
||||
target_dir = args.target
|
||||
else:
|
||||
# persistent target directory
|
||||
target_dir = Path(mkdtemp()).absolute()
|
||||
|
||||
# temporary working directory that gets auto cleaned
|
||||
with TemporaryDirectory() as tempdir:
|
||||
working_dir = Path(tempdir)
|
||||
start_dir = Path().absolute()
|
||||
chdir(working_dir)
|
||||
debug(f'Working directory: {working_dir}')
|
||||
|
||||
if args.subcommand in ["convert", "import"]:
|
||||
if args.target:
|
||||
args.target.mkdir(parents=True, exist_ok=True)
|
||||
target_dir = args.target
|
||||
else:
|
||||
# persistent target directory
|
||||
target_dir = Path(mkdtemp()).absolute()
|
||||
|
||||
if 'convert' == args.subcommand:
|
||||
print(convert(working_dir, args.source, target_dir))
|
||||
elif 'import' == args.subcommand:
|
||||
keyring_import(working_dir, args.source, target_dir)
|
||||
elif 'export' == args.subcommand:
|
||||
export_keyring(working_dir=working_dir, sources=args.source, output=args.output, force=args.force)
|
||||
|
||||
if args.wait:
|
||||
print('Press [ENTER] to continue')
|
||||
|
Loading…
Reference in New Issue
Block a user