chore(keyringctl): move cert depth first search to own function

This way we can reuse the functionality which will allow to simplify a
lot of places that have multiple layers of inflexible nested loops.
This commit is contained in:
Levente Polyak 2021-10-22 20:17:50 +02:00
parent 3776ca942c
commit 2d1eb93a39
No known key found for this signature in database
GPG Key ID: FC1B547C8D8172C8

View File

@ -139,6 +139,33 @@ def system(cmd: List[str], exit_on_error: bool = False) -> str:
raise e raise e
def get_cert_paths(paths: Iterable[Path]) -> Set[Path]:
"""Walks a list of paths and resolves all discovered certificate paths
Parameters
----------
paths: Iterable[Path]
A list of paths to walk and resolve to certificate paths.
Returns
-------
List[Path]
The list of paths to certificates
"""
# depth first search certificate paths
cert_paths: Set[Path] = set()
visit: List[Path] = list(paths)
while visit:
path = visit.pop()
# this level contains a certificate, abort depth search
if list(path.glob("*.asc")):
cert_paths.add(path)
continue
visit.extend([path for path in path.iterdir() if path.is_dir()])
return cert_paths
# TODO: simplify to lower complexity # TODO: simplify to lower complexity
def convert_certificate( # noqa: ignore=C901 def convert_certificate( # noqa: ignore=C901
working_dir: Path, working_dir: Path,
@ -1099,21 +1126,11 @@ def export(
if not source.exists() and packager_source.exists(): if not source.exists() and packager_source.exists():
sources[index] = packager_source sources[index] = packager_source
# depth first search certificate paths
cert_dirs: Set[Path] = set()
visit: List[Path] = sources
while visit:
path = visit.pop()
# this level contains a certificate, abort depth search
if list(path.glob("*.asc")):
cert_dirs.add(path)
continue
visit.extend([path for path in path.iterdir() if path.is_dir()])
temp_dir = Path(mkdtemp(dir=working_dir, prefix="arch-keyringctl-export-join-")).absolute() temp_dir = Path(mkdtemp(dir=working_dir, prefix="arch-keyringctl-export-join-")).absolute()
cert_paths: Set[Path] = get_cert_paths(sources)
certificates: List[Path] = []
certificates = [] for cert_dir in sorted(cert_paths):
for cert_dir in sorted(cert_dirs):
cert_path = temp_dir / f"{cert_dir.name}.asc" cert_path = temp_dir / f"{cert_dir.name}.asc"
debug(f"Joining {cert_dir} in {cert_path}") debug(f"Joining {cert_dir} in {cert_path}")
packet_join( packet_join(