feature(keyringctl): support query expressions for packet field selection

Instead of simply string matching a line, we now traverse the packet as
a tree and match the path based on a depth first search.

While traversing, we support logical OR and current depth * wildcard
processed as a component based query expression.

Callee's are adjusted to specifically select the appropriate Issuer at
the correct depth.

Fixes #185
This commit is contained in:
Levente Polyak 2022-07-20 18:27:42 +02:00
parent 9d4c7057f4
commit 099df52a04
No known key found for this signature in database
GPG Key ID: FC1B547C8D8172C8
4 changed files with 199 additions and 28 deletions

View File

@ -125,8 +125,10 @@ def convert_pubkey_signature_packet(
if not current_packet_fingerprint:
raise Exception('missing current packet fingerprint for "{packet.name}"')
signature_type = packet_dump_field(packet=packet, field="Type")
issuer = get_fingerprint_from_partial(fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Issuer")))
signature_type = packet_dump_field(packet=packet, query="Type")
issuer = get_fingerprint_from_partial(
fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Hashed area|Unhashed area.Issuer"))
)
if not issuer:
debug(f"failed to resolve partial fingerprint {issuer}, skipping packet")
@ -165,8 +167,10 @@ def convert_uid_signature_packet(
if not current_packet_uid:
raise Exception('missing current packet uid for "{packet.name}"')
signature_type = packet_dump_field(packet=packet, field="Type")
issuer = get_fingerprint_from_partial(fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Issuer")))
signature_type = packet_dump_field(packet=packet, query="Type")
issuer = get_fingerprint_from_partial(
fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Hashed area|Unhashed area.Issuer"))
)
if not issuer:
debug(f"failed to resolve partial fingerprint {issuer}, skipping packet")
@ -207,8 +211,10 @@ def convert_subkey_signature_packet(
if not current_packet_fingerprint:
raise Exception('missing current packet fingerprint for "{packet.name}"')
signature_type = packet_dump_field(packet=packet, field="Type")
issuer = get_fingerprint_from_partial(fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Issuer")))
signature_type = packet_dump_field(packet=packet, query="Type")
issuer = get_fingerprint_from_partial(
fingerprint_filter or set(), Fingerprint(packet_dump_field(packet, "Hashed area|Unhashed area.Issuer"))
)
if not issuer:
debug(f"failed to resolve partial fingerprint {issuer}, skipping packet")

View File

@ -1,5 +1,6 @@
# SPDX-License-Identifier: GPL-3.0-or-later
from collections import deque
from datetime import datetime
from functools import reduce
from pathlib import Path
@ -176,13 +177,23 @@ def packet_dump(packet: Path) -> str:
return system(["sq", "packet", "dump", str(packet)])
def packet_dump_field(packet: Path, field: str) -> str:
def packet_dump_field(packet: Path, query: str) -> str:
"""Retrieve the value of a field from a PGP packet
Field queries are possible with the following notation during tree traversal:
- Use '.' to separate the parent section
- Use '*' as a wildcard for the current section
- Use '|' inside the current level as a logical OR
Example:
- Version
- Hashed area|Unhashed area.Issuer
- *.Issuer
Parameters
----------
packet: The path to the PGP packet to retrieve the value from
field: The name of the field
query: The name of the field as a query notation
Raises
------
@ -194,11 +205,49 @@ def packet_dump_field(packet: Path, field: str) -> str:
"""
dump = packet_dump(packet)
lines = [line.strip() for line in dump.splitlines()]
lines = list(filter(lambda line: line.strip().startswith(f"{field}: "), lines))
if not lines:
raise Exception(f'Packet has no field "{field}"')
return lines[0].split(sep=": ", maxsplit=1)[1]
queries = deque(query.split("."))
path = [queries.popleft()]
depth = 0
# remove leading 4 space indention
lines = list(filter(lambda line: line.startswith(" "), dump.splitlines()))
lines = [sub(r"^ {4}", "", line, count=1) for line in lines]
# filter empty lines
lines = list(filter(lambda line: line.strip(), lines))
for line in lines:
# determine current line depth by counting whitespace pairs
depth_line = int((len(line) - len(line.lstrip(" "))) / 2)
line = line.lstrip(" ")
# skip nodes that are deeper as our currently matched path
if depth < depth_line:
continue
# unwind the current query path until reaching previous match depth
while depth > depth_line:
queries.appendleft(path.pop())
depth -= 1
matcher = path[-1].split("|")
# check if current field matches the query expression
field = line.split(sep=":", maxsplit=1)[0]
if field not in matcher and "*" not in matcher:
continue
# next depth is one level deeper as the current line
depth = depth_line + 1
# check if matcher is not the leaf of the query expression
if queries:
path.append(queries.popleft())
continue
# return final match
return line.split(sep=": ", maxsplit=1)[1] if ": " in line else line
raise Exception(f"Packet '{packet}' did not match the query '{query}'")
def packet_signature_creation_time(packet: Path) -> datetime:
@ -212,7 +261,7 @@ def packet_signature_creation_time(packet: Path) -> datetime:
-------
The signature creation time as datetime
"""
field = packet_dump_field(packet, "Signature creation time")
field = packet_dump_field(packet, "Hashed area.Signature creation time")
field = " ".join(field.split(" ", 3)[0:3])
return datetime.strptime(field, "%Y-%m-%d %H:%M:%S %Z")

View File

@ -123,7 +123,7 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N
assert_packet_kind(path=uid_path, expected="User")
uid_value = simplify_uid(Uid(packet_dump_field(packet=uid_path, field="Value")))
uid_value = simplify_uid(Uid(packet_dump_field(packet=uid_path, query="Value")))
if uid_value != uid.name:
raise Exception(f"Unexpected uid in file {str(uid_path)}: {uid_value}")
elif not uid_path.is_dir():
@ -139,7 +139,9 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N
issuer = get_fingerprint_from_partial(
fingerprints=all_fingerprints,
fingerprint=Fingerprint(packet_dump_field(packet=sig, field="Issuer")),
fingerprint=Fingerprint(
packet_dump_field(packet=sig, query="Hashed area|Unhashed area.Issuer")
),
)
if issuer != sig.stem:
raise Exception(f"Unexpected issuer in file {str(sig)}: {issuer}")
@ -155,7 +157,9 @@ def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> N
issuer = get_fingerprint_from_partial(
fingerprints=all_fingerprints,
fingerprint=Fingerprint(packet_dump_field(packet=sig, field="Issuer")),
fingerprint=Fingerprint(
packet_dump_field(packet=sig, query="Hashed area|Unhashed area.Issuer")
),
)
if issuer != sig.stem:
raise Exception(f"Unexpected issuer in file {str(sig)}: {issuer}")
@ -236,13 +240,13 @@ def assert_packet_kind(path: Path, expected: str) -> None:
def assert_signature_type(path: Path, expected: str) -> None:
sig_type = packet_dump_field(packet=path, field="Type")
sig_type = packet_dump_field(packet=path, query="Type")
if sig_type != expected:
raise Exception(f"Unexpected packet type in file {str(path)} type: {sig_type} expected: {expected}")
def assert_signature_type_certification(path: Path) -> None:
sig_type = packet_dump_field(packet=path, field="Type")
sig_type = packet_dump_field(packet=path, query="Type")
if sig_type not in ["GenericCertification", "PersonaCertification", "CasualCertification", "PositiveCertification"]:
raise Exception(f"Unexpected packet certification type in file {str(path)} type: {sig_type}")
@ -253,13 +257,13 @@ def assert_is_pgp_fingerprint(path: Path, _str: str) -> None:
def assert_filename_matches_packet_issuer_fingerprint(path: Path, check: str) -> None:
fingerprint = packet_dump_field(packet=path, field="Issuer Fingerprint")
fingerprint = packet_dump_field(packet=path, query="Unhashed area|Hashed area.Issuer Fingerprint")
if not fingerprint == check:
raise Exception(f"Unexpected packet fingerprint in file {str(path)}: {fingerprint}")
def assert_filename_matches_packet_fingerprint(path: Path, check: str) -> None:
fingerprint = packet_dump_field(packet=path, field="Fingerprint")
fingerprint = packet_dump_field(packet=path, query="Fingerprint")
if not fingerprint == check:
raise Exception(f"Unexpected packet fingerprint in file {str(path)}: {fingerprint}")

View File

@ -170,16 +170,127 @@ def test_packet_dump(system_mock: Mock) -> None:
@mark.parametrize(
"packet_dump_return, field, expectation",
"packet_dump_return, query, result, expectation",
[
(
"foo: bar",
"foo",
"""
Signature Packet
Version: 4
Type: SubkeyBinding
Hash algo: SHA512
""",
"Type",
"SubkeyBinding",
does_not_raise(),
),
(
"foo: bar",
"baz",
"""
Signature Packet
Version: 4
Type: SubkeyBinding
Hash algo: SHA512
Hashed area:
Signature creation time: 2022-12-31 15:53:59 UTC
Issuer: BBBBBB
Unhashed area:
Issuer: 42424242
""",
"Unhashed area.Issuer",
"42424242",
does_not_raise(),
),
(
"""
Signature Packet
Version: 4
Type: SubkeyBinding
Hash algo: SHA512
Hashed area:
Signature creation time: 2022-12-31 15:53:59 UTC
Unhashed area:
Issuer: 42424242
""",
"Hashed area|Unhashed area.Issuer",
"42424242",
does_not_raise(),
),
(
"""
Signature Packet
Version: 4
Type: SubkeyBinding
Hash algo: SHA1
Hashed area:
Signature creation time: 2022-12-31
""",
"*.Signature creation time",
"2022-12-31",
does_not_raise(),
),
(
"""
Signature Packet
a:
b:
x: foo
b:
b:
c: bar
""",
"*.b.c",
"bar",
does_not_raise(),
),
(
"""
Signature Packet
a:
b:
x:
y:
z: foo
b:
b:
x:
y:
z: foo
w:
w: foo
k:
i:
c: bar
""",
"*.b.*.*.c",
"bar",
does_not_raise(),
),
(
"""
Signature Packet
a:
c:
b: foo
a:
b: bar
""",
"a.b",
"bar",
does_not_raise(),
),
(
"""
Signature Packet
Version: 4
Type: SubkeyBinding
Hash algo: SHA512
Hashed area:
Signature creation time: 2022-12-31 15:53:59 UTC
Unhashed area:
Issuer: 42424242
Issuer: BBBBBBBB
""",
"Hashed area.Issuer",
None,
raises(Exception),
),
],
@ -188,13 +299,14 @@ def test_packet_dump(system_mock: Mock) -> None:
def test_packet_dump_field(
packet_dump_mock: Mock,
packet_dump_return: str,
field: str,
query: str,
result: str,
expectation: ContextManager[str],
) -> None:
packet_dump_mock.return_value = packet_dump_return
with expectation:
sequoia.packet_dump_field(packet=Path("packet"), field=field)
assert sequoia.packet_dump_field(packet=Path("packet"), query=query) == result
@patch("libkeyringctl.sequoia.packet_dump_field")