keyringctl: Add documentation to all functions
keyringctl: Add documentation to all functions. Change the inlined functions `convert()` and `alphanum_key()` in `natural_sort_path()` to rely on type Union[int, str] instead of type Any. Change `convert_certificate()` to derive the username using the stem of the provided certificate.
This commit is contained in:
parent
5320f2491e
commit
5170319717
189
keyringctl
189
keyringctl
@ -28,18 +28,26 @@ from logging import basicConfig
|
|||||||
from logging import debug
|
from logging import debug
|
||||||
from logging import DEBUG
|
from logging import DEBUG
|
||||||
|
|
||||||
from typing import Any
|
|
||||||
from typing import Dict
|
from typing import Dict
|
||||||
from typing import List
|
from typing import List
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from typing import Iterable
|
from typing import Iterable
|
||||||
from typing import Tuple
|
from typing import Tuple
|
||||||
|
from typing import Union
|
||||||
|
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def cwd(new_dir: Path):
|
def cwd(new_dir: Path):
|
||||||
|
"""Change to a new current working directory in a context and go back to the previous dir after the context is done
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
new_dir: Path
|
||||||
|
A path to change to
|
||||||
|
"""
|
||||||
|
|
||||||
previous_dir = getcwd()
|
previous_dir = getcwd()
|
||||||
chdir(new_dir)
|
chdir(new_dir)
|
||||||
try:
|
try:
|
||||||
@ -58,16 +66,75 @@ def cwd(new_dir: Path):
|
|||||||
|
|
||||||
|
|
||||||
def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]:
|
def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]:
|
||||||
def convert(text: str) -> Any:
|
"""Sort an Iterable of Paths naturally
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
_list: Iterable[Path]
|
||||||
|
An iterable containing paths to be sorted
|
||||||
|
|
||||||
|
Return
|
||||||
|
------
|
||||||
|
Iterable[Path]
|
||||||
|
An Iterable of paths that are naturally sorted
|
||||||
|
"""
|
||||||
|
|
||||||
|
def convert(text: str) -> Union[int, str]:
|
||||||
|
"""Convert input text to int or str
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
text: str
|
||||||
|
An input string
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
Union[int, str]
|
||||||
|
Either an integer if text is a digit, else text in lower-case representation
|
||||||
|
"""
|
||||||
|
|
||||||
return int(text) if text.isdigit() else text.lower()
|
return int(text) if text.isdigit() else text.lower()
|
||||||
|
|
||||||
def alphanum_key(key: Path) -> List[Any]:
|
def alphanum_key(key: Path) -> List[Union[int, str]]:
|
||||||
|
"""Retrieve an alphanumeric key from a Path, that can be used in sorted()
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
key: Path
|
||||||
|
A path for which to create a key
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
List[Union[int, str]]
|
||||||
|
A list of either int or str objects that may serve as 'key' argument for sorted()
|
||||||
|
"""
|
||||||
|
|
||||||
return [convert(c) for c in split('([0-9]+)', str(key.name))]
|
return [convert(c) for c in split('([0-9]+)', str(key.name))]
|
||||||
|
|
||||||
return sorted(_list, key=alphanum_key)
|
return sorted(_list, key=alphanum_key)
|
||||||
|
|
||||||
|
|
||||||
def system(cmd: List[str], exit_on_error: bool = True) -> str:
|
def system(cmd: List[str], exit_on_error: bool = True) -> str:
|
||||||
|
"""Execute a command using check_output
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
cmd: List[str]
|
||||||
|
A list of strings to be fed to check_output
|
||||||
|
exit_on_error: bool
|
||||||
|
Whether to exit the script when encountering an error (defaults to True)
|
||||||
|
|
||||||
|
Raises
|
||||||
|
------
|
||||||
|
CalledProcessError
|
||||||
|
If not exit_on_error and `check_output()` encounters an error
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
str
|
||||||
|
The output of cmd
|
||||||
|
"""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return check_output(cmd, stderr=PIPE).decode()
|
return check_output(cmd, stderr=PIPE).decode()
|
||||||
except CalledProcessError as e:
|
except CalledProcessError as e:
|
||||||
@ -78,6 +145,32 @@ def system(cmd: List[str], exit_on_error: bool = True) -> str:
|
|||||||
|
|
||||||
|
|
||||||
def convert_certificate(working_dir: Path, certificate: Path, name_override: Optional[str] = None) -> Path:
|
def convert_certificate(working_dir: Path, certificate: Path, name_override: Optional[str] = None) -> Path:
|
||||||
|
"""Convert a single file public key certificate into a decomposed directory structure of multiple PGP packets
|
||||||
|
|
||||||
|
The output directory structure is created per user. The username is derived from the certificate (or overridden).
|
||||||
|
Below the username directory a directory tree describes the public keys componentes split up into certifications
|
||||||
|
and revocations, as well as per subkey and per uid certifications and revocations.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
working_dir: Path
|
||||||
|
The path of the working directory below which to create split certificates
|
||||||
|
certificate: Path
|
||||||
|
The path to a public key certificate
|
||||||
|
name_override: Optional[str]
|
||||||
|
An optional string to override the username in the to be created output directory structure
|
||||||
|
|
||||||
|
Raises
|
||||||
|
------
|
||||||
|
Exception
|
||||||
|
If required PGP packets are not found
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
Path
|
||||||
|
The path of the user_dir (which is located below working_dir)
|
||||||
|
"""
|
||||||
|
|
||||||
certificate_fingerprint: Optional[str] = None
|
certificate_fingerprint: Optional[str] = None
|
||||||
pubkey: Optional[Path] = None
|
pubkey: Optional[Path] = None
|
||||||
direct_sigs: Dict[str, Dict[str, List[Path]]] = {}
|
direct_sigs: Dict[str, Dict[str, List[Path]]] = {}
|
||||||
@ -91,7 +184,7 @@ def convert_certificate(working_dir: Path, certificate: Path, name_override: Opt
|
|||||||
subkey_revocations: Dict[str, Dict[str, Path]] = {}
|
subkey_revocations: Dict[str, Dict[str, Path]] = {}
|
||||||
certifications: Dict[str, List[Path]] = defaultdict(list)
|
certifications: Dict[str, List[Path]] = defaultdict(list)
|
||||||
revocations: Dict[str, List[Path]] = defaultdict(list)
|
revocations: Dict[str, List[Path]] = defaultdict(list)
|
||||||
username = name_override or certificate.name.split(".")[0]
|
username = name_override or certificate.stem
|
||||||
|
|
||||||
def add_packet_to_direct_sigs(
|
def add_packet_to_direct_sigs(
|
||||||
direct_sigs: Dict[str, Dict[str, List[Path]]],
|
direct_sigs: Dict[str, Dict[str, List[Path]]],
|
||||||
@ -492,10 +585,45 @@ def persist_revocations(
|
|||||||
|
|
||||||
|
|
||||||
def packet_dump(packet: Path) -> str:
|
def packet_dump(packet: Path) -> str:
|
||||||
|
"""Dump a PGP packet to string
|
||||||
|
|
||||||
|
The `sq packet dump` command is used to retrieve a dump of information from a PGP packet
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
packet: Path
|
||||||
|
The path to the PGP packet to retrieve the value from
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
str
|
||||||
|
The contents of the packet dump
|
||||||
|
"""
|
||||||
|
|
||||||
return system(['sq', 'packet', 'dump', str(packet)])
|
return system(['sq', 'packet', 'dump', str(packet)])
|
||||||
|
|
||||||
|
|
||||||
def packet_dump_field(packet: Path, field: str) -> str:
|
def packet_dump_field(packet: Path, field: str) -> str:
|
||||||
|
"""Retrieve the value of a field from a PGP packet
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
packet: Path
|
||||||
|
The path to the PGP packet to retrieve the value from
|
||||||
|
field: str
|
||||||
|
The name of the field
|
||||||
|
|
||||||
|
Raises
|
||||||
|
------
|
||||||
|
Exception
|
||||||
|
If the field is not found in the PGP packet
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
str
|
||||||
|
The value of the field found in packet
|
||||||
|
"""
|
||||||
|
|
||||||
dump = packet_dump(packet)
|
dump = packet_dump(packet)
|
||||||
lines = [line.strip() for line in dump.splitlines()]
|
lines = [line.strip() for line in dump.splitlines()]
|
||||||
lines = list(filter(lambda line: line.startswith(f'{field}: '), lines))
|
lines = list(filter(lambda line: line.startswith(f'{field}: '), lines))
|
||||||
@ -510,6 +638,8 @@ def sanitize_certificate_file(working_dir: Path, certificate: Path) -> List[Path
|
|||||||
If the input file holds several certificates, they are split into respective files below working_dir and their
|
If the input file holds several certificates, they are split into respective files below working_dir and their
|
||||||
paths are returned. Else the path to the input certificate file is returned.
|
paths are returned. Else the path to the input certificate file is returned.
|
||||||
|
|
||||||
|
This is done to be able to read all certificates contained in a file (`sq` only reads the first).
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
working_dir: Path
|
working_dir: Path
|
||||||
@ -591,6 +721,19 @@ def packet_join(packets: List[Path], output: Path, force: bool = False) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def simplify_user_id(user_id: str) -> str:
|
def simplify_user_id(user_id: str) -> str:
|
||||||
|
"""Simplify the User ID string to contain more filesystem friendly characters
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
user_id: str
|
||||||
|
A User ID string (e.g. 'Foobar McFooface <foobar@foo.face>')
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
str
|
||||||
|
The simplified representation of user_id
|
||||||
|
"""
|
||||||
|
|
||||||
user_id = user_id.replace('@', '_at_')
|
user_id = user_id.replace('@', '_at_')
|
||||||
user_id = sub('[<>]', '', user_id)
|
user_id = sub('[<>]', '', user_id)
|
||||||
user_id = sub('[' + escape(r' !@#$%^&*()_-+=[]{}\|;:,.<>/?') + ']', '_', user_id)
|
user_id = sub('[' + escape(r' !@#$%^&*()_-+=[]{}\|;:,.<>/?') + ']', '_', user_id)
|
||||||
@ -603,6 +746,27 @@ def convert(
|
|||||||
target_dir: Path,
|
target_dir: Path,
|
||||||
name_override: Optional[str] = None,
|
name_override: Optional[str] = None,
|
||||||
) -> Path:
|
) -> Path:
|
||||||
|
"""Convert a path containing PGP public key material to a decomposed directory structure
|
||||||
|
|
||||||
|
Any PGP public key material described by source is first sanitized (split) by `sanitize_certificate_file()`.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
working_dir: Path
|
||||||
|
A directory to use for temporary files
|
||||||
|
source: Path
|
||||||
|
A path to a file or directory
|
||||||
|
target_dir: Path
|
||||||
|
A directory path to write the new directory structure to
|
||||||
|
name_override: Optional[str]
|
||||||
|
An optional username override for the call to `convert_certificate()`
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
Path
|
||||||
|
The directory that contains the resulting directory structure (target_dir)
|
||||||
|
"""
|
||||||
|
|
||||||
directories: List[Path] = []
|
directories: List[Path] = []
|
||||||
if source.is_dir():
|
if source.is_dir():
|
||||||
for key in source.iterdir():
|
for key in source.iterdir():
|
||||||
@ -840,6 +1004,19 @@ def export_keyring(
|
|||||||
|
|
||||||
|
|
||||||
def absolute_path(path: str) -> Path:
|
def absolute_path(path: str) -> Path:
|
||||||
|
"""Return the absolute path of a given str
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
path: str
|
||||||
|
A string representing a path
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
Path
|
||||||
|
The absolute path representation of path
|
||||||
|
"""
|
||||||
|
|
||||||
return Path(path).absolute()
|
return Path(path).absolute()
|
||||||
|
|
||||||
|
|
||||||
@ -859,7 +1036,7 @@ if __name__ == '__main__':
|
|||||||
|
|
||||||
convert_parser = subcommands.add_parser(
|
convert_parser = subcommands.add_parser(
|
||||||
'convert',
|
'convert',
|
||||||
help="convert a legacy-style PGP cert or directory containing PGP certs to the new format",
|
help="import one or multiple PGP public keys and convert them to a decomposed directory structure",
|
||||||
)
|
)
|
||||||
convert_parser.add_argument('source', type=absolute_path, help='File or directory to convert')
|
convert_parser.add_argument('source', type=absolute_path, help='File or directory to convert')
|
||||||
convert_parser.add_argument('--target', type=absolute_path, help='target directory')
|
convert_parser.add_argument('--target', type=absolute_path, help='target directory')
|
||||||
@ -867,7 +1044,7 @@ if __name__ == '__main__':
|
|||||||
'--name',
|
'--name',
|
||||||
type=str,
|
type=str,
|
||||||
default=None,
|
default=None,
|
||||||
help='override the username to use (only useful when targetting a single file)',
|
help='override the username to use (only useful when using a single file as source)',
|
||||||
)
|
)
|
||||||
|
|
||||||
import_parser = subcommands.add_parser('import')
|
import_parser = subcommands.add_parser('import')
|
||||||
|
Loading…
x
Reference in New Issue
Block a user