2021-10-24 07:37:27 -05:00
|
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
|
|
|
|
from collections.abc import Iterable
|
|
|
|
from collections.abc import Iterator
|
|
|
|
from contextlib import contextmanager
|
|
|
|
from os import chdir
|
|
|
|
from os import getcwd
|
|
|
|
from pathlib import Path
|
|
|
|
from re import split
|
2021-10-24 14:49:55 -05:00
|
|
|
from subprocess import STDOUT
|
2021-10-24 07:37:27 -05:00
|
|
|
from subprocess import CalledProcessError
|
|
|
|
from subprocess import check_output
|
|
|
|
from sys import exit
|
|
|
|
from sys import stderr
|
2021-10-26 12:24:30 -05:00
|
|
|
from tempfile import mkstemp
|
2021-10-24 07:37:27 -05:00
|
|
|
from traceback import print_stack
|
2021-10-24 14:49:55 -05:00
|
|
|
from typing import IO
|
|
|
|
from typing import AnyStr
|
2021-11-01 11:37:00 -06:00
|
|
|
from typing import Dict
|
2021-10-24 07:37:27 -05:00
|
|
|
from typing import List
|
2021-10-24 14:49:55 -05:00
|
|
|
from typing import Optional
|
2021-10-29 17:59:23 -05:00
|
|
|
from typing import Set
|
2021-10-24 07:37:27 -05:00
|
|
|
from typing import Union
|
|
|
|
|
2021-10-29 17:59:23 -05:00
|
|
|
from libkeyringctl.types import Fingerprint
|
2021-11-01 11:37:00 -06:00
|
|
|
from libkeyringctl.types import Trust
|
2021-10-29 17:59:23 -05:00
|
|
|
|
2021-10-24 07:37:27 -05:00
|
|
|
|
|
|
|
@contextmanager
|
|
|
|
def cwd(new_dir: Path) -> Iterator[None]:
|
|
|
|
"""Change to a new current working directory in a context and go back to the previous dir after the context is done
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
new_dir: A path to change to
|
|
|
|
"""
|
|
|
|
|
|
|
|
previous_dir = getcwd()
|
|
|
|
chdir(new_dir)
|
|
|
|
try:
|
|
|
|
yield
|
|
|
|
finally:
|
|
|
|
chdir(previous_dir)
|
|
|
|
|
|
|
|
|
|
|
|
def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]:
|
|
|
|
"""Sort an Iterable of Paths naturally
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
_list: An iterable containing paths to be sorted
|
|
|
|
|
|
|
|
Return
|
|
|
|
------
|
|
|
|
An Iterable of paths that are naturally sorted
|
|
|
|
"""
|
|
|
|
|
|
|
|
def convert_text_chunk(text: str) -> Union[int, str]:
|
|
|
|
"""Convert input text to int or str
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
text: An input string
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
Either an integer if text is a digit, else text in lower-case representation
|
|
|
|
"""
|
|
|
|
|
|
|
|
return int(text) if text.isdigit() else text.lower()
|
|
|
|
|
|
|
|
def alphanum_key(key: Path) -> List[Union[int, str]]:
|
|
|
|
"""Retrieve an alphanumeric key from a Path, that can be used in sorted()
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
key: A path for which to create a key
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
A list of either int or str objects that may serve as 'key' argument for sorted()
|
|
|
|
"""
|
|
|
|
|
|
|
|
return [convert_text_chunk(c) for c in split("([0-9]+)", str(key.name))]
|
|
|
|
|
|
|
|
return sorted(_list, key=alphanum_key)
|
|
|
|
|
|
|
|
|
2021-11-04 12:26:11 -06:00
|
|
|
def system(
|
|
|
|
cmd: List[str],
|
|
|
|
_stdin: Optional[IO[AnyStr]] = None,
|
|
|
|
exit_on_error: bool = False,
|
|
|
|
env: Optional[Dict[str, str]] = None,
|
|
|
|
) -> str:
|
2021-10-24 07:37:27 -05:00
|
|
|
"""Execute a command using check_output
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
cmd: A list of strings to be fed to check_output
|
2021-10-24 14:49:55 -05:00
|
|
|
_stdin: input fd used for the spawned process
|
2021-10-24 07:37:27 -05:00
|
|
|
exit_on_error: Whether to exit the script when encountering an error (defaults to False)
|
2021-11-04 12:26:11 -06:00
|
|
|
env: Optional environment vars for the shell invocation
|
2021-10-24 07:37:27 -05:00
|
|
|
|
|
|
|
Raises
|
|
|
|
------
|
|
|
|
CalledProcessError: If not exit_on_error and `check_output()` encounters an error
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
The output of cmd
|
|
|
|
"""
|
2021-11-04 12:26:11 -06:00
|
|
|
if not env:
|
|
|
|
env = {}
|
2021-10-24 07:37:27 -05:00
|
|
|
|
|
|
|
try:
|
2021-11-04 12:26:11 -06:00
|
|
|
return check_output(cmd, stderr=STDOUT, stdin=_stdin, env=env).decode()
|
2021-10-24 07:37:27 -05:00
|
|
|
except CalledProcessError as e:
|
2021-11-07 14:22:00 -06:00
|
|
|
stderr.buffer.write(e.stdout)
|
2021-10-24 07:37:27 -05:00
|
|
|
print_stack()
|
|
|
|
if exit_on_error:
|
|
|
|
exit(e.returncode)
|
|
|
|
raise e
|
|
|
|
|
|
|
|
|
|
|
|
def absolute_path(path: str) -> Path:
|
|
|
|
"""Return the absolute path of a given str
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
path: A string representing a path
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
The absolute path representation of path
|
|
|
|
"""
|
|
|
|
|
|
|
|
return Path(path).absolute()
|
2021-10-26 12:24:30 -05:00
|
|
|
|
|
|
|
|
|
|
|
def transform_fd_to_tmpfile(working_dir: Path, sources: List[Path]) -> None:
|
|
|
|
"""Transforms an input list of paths from any file descriptor of the current process to a tempfile in working_dir.
|
|
|
|
|
|
|
|
Using this function on fd inputs allow to pass the content to another process while hidepid is active and /proc
|
|
|
|
not visible for the other process.
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
working_dir: A directory to use for temporary files
|
|
|
|
sources: Paths that should be iterated and all fd's transformed to tmpfiles
|
|
|
|
"""
|
|
|
|
for index, source in enumerate(sources):
|
|
|
|
if str(source).startswith("/proc/self/fd"):
|
|
|
|
file = mkstemp(dir=working_dir, prefix=f"{source.name}", suffix=".fd")[1]
|
|
|
|
with open(file, mode="wb") as f:
|
|
|
|
f.write(source.read_bytes())
|
|
|
|
f.flush()
|
|
|
|
sources[index] = Path(file)
|
2021-10-29 17:59:23 -05:00
|
|
|
|
|
|
|
|
|
|
|
def get_cert_paths(paths: Iterable[Path]) -> Set[Path]:
|
|
|
|
"""Walks a list of paths and resolves all discovered certificate paths
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
paths: A list of paths to walk and resolve to certificate paths.
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
A set of paths to certificates
|
|
|
|
"""
|
|
|
|
|
|
|
|
# depth first search certificate paths
|
|
|
|
cert_paths: Set[Path] = set()
|
|
|
|
visit: List[Path] = list(paths)
|
|
|
|
while visit:
|
|
|
|
path = visit.pop()
|
|
|
|
# this level contains a certificate, abort depth search
|
|
|
|
if list(path.glob("*.asc")):
|
|
|
|
cert_paths.add(path)
|
|
|
|
continue
|
|
|
|
visit.extend([path for path in path.iterdir() if path.is_dir()])
|
|
|
|
return cert_paths
|
|
|
|
|
|
|
|
|
|
|
|
def get_parent_cert_paths(paths: Iterable[Path]) -> Set[Path]:
|
|
|
|
"""Walks a list of paths upwards and resolves all discovered parent certificate paths
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
paths: A list of paths to walk and resolve to certificate paths.
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
A set of paths to certificates
|
|
|
|
"""
|
|
|
|
|
|
|
|
# depth first search certificate paths
|
|
|
|
cert_paths: Set[Path] = set()
|
|
|
|
visit: List[Path] = list(paths)
|
|
|
|
while visit:
|
|
|
|
node = visit.pop().parent
|
|
|
|
# this level contains a certificate, abort depth search
|
|
|
|
if "keyring" == node.parent.parent.parent.name:
|
|
|
|
cert_paths.add(node)
|
|
|
|
continue
|
|
|
|
visit.append(node)
|
|
|
|
return cert_paths
|
|
|
|
|
|
|
|
|
|
|
|
def contains_fingerprint(fingerprints: Iterable[Fingerprint], fingerprint: Fingerprint) -> bool:
|
2021-11-01 11:37:00 -06:00
|
|
|
"""Returns weather an iterable structure of fingerprints contains a specific fingerprint
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
fingerprints: Iteratable structure of fingerprints that should be searched
|
|
|
|
fingerprint: Fingerprint to search for
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
Weather an iterable structure of fingerprints contains a specific fingerprint
|
|
|
|
"""
|
|
|
|
|
2021-10-29 17:59:23 -05:00
|
|
|
return any(filter(lambda e: str(e).endswith(fingerprint), fingerprints))
|
2021-11-01 11:37:00 -06:00
|
|
|
|
|
|
|
|
2021-11-04 12:26:11 -06:00
|
|
|
def get_fingerprint_from_partial(
|
|
|
|
fingerprints: Iterable[Fingerprint], fingerprint: Fingerprint
|
|
|
|
) -> Optional[Fingerprint]:
|
|
|
|
"""Returns the full fingerprint looked up from a partial fingerprint like a key-id
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
fingerprints: Iteratable structure of fingerprints that should be searched
|
|
|
|
fingerprint: Partial fingerprint to search for
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
The full fingerprint or None
|
|
|
|
"""
|
|
|
|
|
|
|
|
for fingerprint in filter(lambda e: str(e).endswith(fingerprint), fingerprints):
|
|
|
|
return fingerprint
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
2021-11-01 11:37:00 -06:00
|
|
|
def filter_fingerprints_by_trust(trusts: Dict[Fingerprint, Trust], trust: Trust) -> List[Fingerprint]:
|
|
|
|
"""Filters a dict of Fingerprint to Trust by a passed Trust parameter and returns the matching fingerprints.
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
trusts: Dict of Fingerprint to Trust that should be filtered based on the trust parameter
|
|
|
|
trust: Trust that should be used to filter the trusts dict
|
|
|
|
|
|
|
|
Returns
|
|
|
|
-------
|
|
|
|
The matching fingerprints of the dict filtered by trust
|
|
|
|
"""
|
|
|
|
|
|
|
|
return list(
|
|
|
|
map(
|
|
|
|
lambda item: item[0],
|
|
|
|
filter(lambda item: trust == item[1], trusts.items()),
|
|
|
|
)
|
|
|
|
)
|