From 099df52a0452e35830563632af4963e080734a83 Mon Sep 17 00:00:00 2001 From: Levente Polyak Date: Wed, 20 Jul 2022 18:27:42 +0200 Subject: [PATCH] feature(keyringctl): support query expressions for packet field selection Instead of simply string matching a line, we now traverse the packet as a tree and match the path based on a depth first search. While traversing, we support logical OR and current depth * wildcard processed as a component based query expression. Callee's are adjusted to specifically select the appropriate Issuer at the correct depth. Fixes #185 --- libkeyringctl/keyring.py | 18 ++++-- libkeyringctl/sequoia.py | 65 +++++++++++++++++--- libkeyringctl/verify.py | 18 +++--- tests/test_sequoia.py | 126 ++++++++++++++++++++++++++++++++++++--- 4 files changed, 199 insertions(+), 28 deletions(-) diff --git a/libkeyringctl/keyring.py b/libkeyringctl/keyring.py index 3deecbc..cd6085e 100644 --- a/libkeyringctl/keyring.py +++ b/libkeyringctl/keyring.py @@ -125,8 +125,10 @@ def convert_pubkey_signature_packet( if not current_packet_fingerprint: raise Exception('missing current packet fingerprint for "{packet.name}"') - signature_type = packet_dump_field(packet=packet, field="Type") - issuer = get_fingerprint_from_partial(fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Issuer"))) + signature_type = packet_dump_field(packet=packet, query="Type") + issuer = get_fingerprint_from_partial( + fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Hashed area|Unhashed area.Issuer")) + ) if not issuer: debug(f"failed to resolve partial fingerprint {issuer}, skipping packet") @@ -165,8 +167,10 @@ def convert_uid_signature_packet( if not current_packet_uid: raise Exception('missing current packet uid for "{packet.name}"') - signature_type = packet_dump_field(packet=packet, field="Type") - issuer = get_fingerprint_from_partial(fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Issuer"))) + signature_type = packet_dump_field(packet=packet, query="Type") + issuer = get_fingerprint_from_partial( + fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Hashed area|Unhashed area.Issuer")) + ) if not issuer: debug(f"failed to resolve partial fingerprint {issuer}, skipping packet") @@ -207,8 +211,10 @@ def convert_subkey_signature_packet( if not current_packet_fingerprint: raise Exception('missing current packet fingerprint for "{packet.name}"') - signature_type = packet_dump_field(packet=packet, field="Type") - issuer = get_fingerprint_from_partial(fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Issuer"))) + signature_type = packet_dump_field(packet=packet, query="Type") + issuer = get_fingerprint_from_partial( + fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Hashed area|Unhashed area.Issuer")) + ) if not issuer: debug(f"failed to resolve partial fingerprint {issuer}, skipping packet") diff --git a/libkeyringctl/sequoia.py b/libkeyringctl/sequoia.py index 8eb368c..941f5e0 100644 --- a/libkeyringctl/sequoia.py +++ b/libkeyringctl/sequoia.py @@ -1,5 +1,6 @@ # SPDX-License-Identifier: GPL-3.0-or-later +from collections import deque from datetime import datetime from functools import reduce from pathlib import Path @@ -176,13 +177,23 @@ def packet_dump(packet: Path) -> str: return system(["sq", "packet", "dump", str(packet)]) -def packet_dump_field(packet: Path, field: str) -> str: +def packet_dump_field(packet: Path, query: str) -> str: """Retrieve the value of a field from a PGP packet + Field queries are possible with the following notation during tree traversal: + - Use '.' to separate the parent section + - Use '*' as a wildcard for the current section + - Use '|' inside the current level as a logical OR + + Example: + - Version + - Hashed area|Unhashed area.Issuer + - *.Issuer + Parameters ---------- packet: The path to the PGP packet to retrieve the value from - field: The name of the field + query: The name of the field as a query notation Raises ------ @@ -194,11 +205,49 @@ def packet_dump_field(packet: Path, field: str) -> str: """ dump = packet_dump(packet) - lines = [line.strip() for line in dump.splitlines()] - lines = list(filter(lambda line: line.strip().startswith(f"{field}: "), lines)) - if not lines: - raise Exception(f'Packet has no field "{field}"') - return lines[0].split(sep=": ", maxsplit=1)[1] + + queries = deque(query.split(".")) + path = [queries.popleft()] + depth = 0 + + # remove leading 4 space indention + lines = list(filter(lambda line: line.startswith(" "), dump.splitlines())) + lines = [sub(r"^ {4}", "", line, count=1) for line in lines] + # filter empty lines + lines = list(filter(lambda line: line.strip(), lines)) + + for line in lines: + # determine current line depth by counting whitespace pairs + depth_line = int((len(line) - len(line.lstrip(" "))) / 2) + line = line.lstrip(" ") + + # skip nodes that are deeper as our currently matched path + if depth < depth_line: + continue + + # unwind the current query path until reaching previous match depth + while depth > depth_line: + queries.appendleft(path.pop()) + depth -= 1 + matcher = path[-1].split("|") + + # check if current field matches the query expression + field = line.split(sep=":", maxsplit=1)[0] + if field not in matcher and "*" not in matcher: + continue + + # next depth is one level deeper as the current line + depth = depth_line + 1 + + # check if matcher is not the leaf of the query expression + if queries: + path.append(queries.popleft()) + continue + + # return final match + return line.split(sep=": ", maxsplit=1)[1] if ": " in line else line + + raise Exception(f"Packet '{packet}' did not match the query '{query}'") def packet_signature_creation_time(packet: Path) -> datetime: @@ -212,7 +261,7 @@ def packet_signature_creation_time(packet: Path) -> datetime: ------- The signature creation time as datetime """ - field = packet_dump_field(packet, "Signature creation time") + field = packet_dump_field(packet, "Hashed area.Signature creation time") field = " ".join(field.split(" ", 3)[0:3]) return datetime.strptime(field, "%Y-%m-%d %H:%M:%S %Z") diff --git a/libkeyringctl/verify.py b/libkeyringctl/verify.py index 8673e6f..5b7fc7d 100644 --- a/libkeyringctl/verify.py +++ b/libkeyringctl/verify.py @@ -123,7 +123,7 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N assert_packet_kind(path=uid_path, expected="User") - uid_value = simplify_uid(Uid(packet_dump_field(packet=uid_path, field="Value"))) + uid_value = simplify_uid(Uid(packet_dump_field(packet=uid_path, query="Value"))) if uid_value != uid.name: raise Exception(f"Unexpected uid in file {str(uid_path)}: {uid_value}") elif not uid_path.is_dir(): @@ -139,7 +139,9 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N issuer = get_fingerprint_from_partial( fingerprints=all_fingerprints, - fingerprint=Fingerprint(packet_dump_field(packet=sig, field="Issuer")), + fingerprint=Fingerprint( + packet_dump_field(packet=sig, query="Hashed area|Unhashed area.Issuer") + ), ) if issuer != sig.stem: raise Exception(f"Unexpected issuer in file {str(sig)}: {issuer}") @@ -155,7 +157,9 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N issuer = get_fingerprint_from_partial( fingerprints=all_fingerprints, - fingerprint=Fingerprint(packet_dump_field(packet=sig, field="Issuer")), + fingerprint=Fingerprint( + packet_dump_field(packet=sig, query="Hashed area|Unhashed area.Issuer") + ), ) if issuer != sig.stem: raise Exception(f"Unexpected issuer in file {str(sig)}: {issuer}") @@ -236,13 +240,13 @@ def assert_packet_kind(path: Path, expected: str) -> None: def assert_signature_type(path: Path, expected: str) -> None: - sig_type = packet_dump_field(packet=path, field="Type") + sig_type = packet_dump_field(packet=path, query="Type") if sig_type != expected: raise Exception(f"Unexpected packet type in file {str(path)} type: {sig_type} expected: {expected}") def assert_signature_type_certification(path: Path) -> None: - sig_type = packet_dump_field(packet=path, field="Type") + sig_type = packet_dump_field(packet=path, query="Type") if sig_type not in ["GenericCertification", "PersonaCertification", "CasualCertification", "PositiveCertification"]: raise Exception(f"Unexpected packet certification type in file {str(path)} type: {sig_type}") @@ -253,13 +257,13 @@ def assert_is_pgp_fingerprint(path: Path, _str: str) -> None: def assert_filename_matches_packet_issuer_fingerprint(path: Path, check: str) -> None: - fingerprint = packet_dump_field(packet=path, field="Issuer Fingerprint") + fingerprint = packet_dump_field(packet=path, query="Unhashed area|Hashed area.Issuer Fingerprint") if not fingerprint == check: raise Exception(f"Unexpected packet fingerprint in file {str(path)}: {fingerprint}") def assert_filename_matches_packet_fingerprint(path: Path, check: str) -> None: - fingerprint = packet_dump_field(packet=path, field="Fingerprint") + fingerprint = packet_dump_field(packet=path, query="Fingerprint") if not fingerprint == check: raise Exception(f"Unexpected packet fingerprint in file {str(path)}: {fingerprint}") diff --git a/tests/test_sequoia.py b/tests/test_sequoia.py index 28cdbb6..6213a91 100644 --- a/tests/test_sequoia.py +++ b/tests/test_sequoia.py @@ -170,16 +170,127 @@ def test_packet_dump(system_mock: Mock) -> None: @mark.parametrize( - "packet_dump_return, field, expectation", + "packet_dump_return, query, result, expectation", [ ( - "foo: bar", - "foo", + """ +Signature Packet + Version: 4 + Type: SubkeyBinding + Hash algo: SHA512 +""", + "Type", + "SubkeyBinding", does_not_raise(), ), ( - "foo: bar", - "baz", + """ +Signature Packet + Version: 4 + Type: SubkeyBinding + Hash algo: SHA512 + Hashed area: + Signature creation time: 2022-12-31 15:53:59 UTC + Issuer: BBBBBB + Unhashed area: + Issuer: 42424242 +""", + "Unhashed area.Issuer", + "42424242", + does_not_raise(), + ), + ( + """ +Signature Packet + Version: 4 + Type: SubkeyBinding + Hash algo: SHA512 + Hashed area: + Signature creation time: 2022-12-31 15:53:59 UTC + Unhashed area: + Issuer: 42424242 +""", + "Hashed area|Unhashed area.Issuer", + "42424242", + does_not_raise(), + ), + ( + """ +Signature Packet + Version: 4 + Type: SubkeyBinding + Hash algo: SHA1 + Hashed area: + Signature creation time: 2022-12-31 +""", + "*.Signature creation time", + "2022-12-31", + does_not_raise(), + ), + ( + """ +Signature Packet + a: + b: + x: foo + b: + b: + c: bar +""", + "*.b.c", + "bar", + does_not_raise(), + ), + ( + """ +Signature Packet + a: + b: + x: + y: + z: foo + b: + b: + x: + y: + z: foo + w: + w: foo + k: + i: + c: bar +""", + "*.b.*.*.c", + "bar", + does_not_raise(), + ), + ( + """ +Signature Packet + a: + c: + b: foo + a: + b: bar +""", + "a.b", + "bar", + does_not_raise(), + ), + ( + """ +Signature Packet + Version: 4 + Type: SubkeyBinding + Hash algo: SHA512 + Hashed area: + Signature creation time: 2022-12-31 15:53:59 UTC + Unhashed area: + Issuer: 42424242 + Issuer: BBBBBBBB +""", + "Hashed area.Issuer", + None, raises(Exception), ), ], @@ -188,13 +299,14 @@ def test_packet_dump(system_mock: Mock) -> None: def test_packet_dump_field( packet_dump_mock: Mock, packet_dump_return: str, - field: str, + query: str, + result: str, expectation: ContextManager[str], ) -> None: packet_dump_mock.return_value = packet_dump_return with expectation: - sequoia.packet_dump_field(packet=Path("packet"), field=field) + assert sequoia.packet_dump_field(packet=Path("packet"), query=query) == result @patch("libkeyringctl.sequoia.packet_dump_field")