"""Audit chain writer + verifier tests."""

from __future__ import annotations

import hashlib
from datetime import UTC, datetime, timedelta
from pathlib import Path

import pytest

from cerbero_bite.safety.audit_log import (
    GENESIS_HASH,
    AuditChainError,
    AuditLog,
    iter_entries,
    verify_chain,
)


def test_empty_file_verifies_with_zero_entries(tmp_path: Path) -> None:
    """A log path that was never written to verifies as a chain of zero entries."""
    path = tmp_path / "audit.log"
    assert verify_chain(path) == 0


def test_first_entry_uses_genesis_prev_hash(tmp_path: Path) -> None:
    """The first appended entry links back to GENESIS_HASH and gets its own hash."""
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    entry = log.append(
        event="ENGINE_START",
        payload={"version": "1.0.0"},
        now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
    )
    assert entry.prev_hash == GENESIS_HASH
    assert entry.hash != GENESIS_HASH


def test_chain_links_subsequent_entries(tmp_path: Path) -> None:
    """Each entry's prev_hash equals the hash of the entry appended before it."""
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    e1 = log.append(event="A", payload={"i": 1}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC))
    e2 = log.append(event="B", payload={"i": 2}, now=datetime(2026, 4, 27, 14, 1, tzinfo=UTC))
    e3 = log.append(event="C", payload={"i": 3}, now=datetime(2026, 4, 27, 14, 2, tzinfo=UTC))
    assert e2.prev_hash == e1.hash
    assert e3.prev_hash == e2.hash
    assert verify_chain(path) == 3


def test_iter_entries_yields_in_order(tmp_path: Path) -> None:
    """iter_entries returns entries in the order they were appended."""
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    log.append(event="A", payload={"i": 1}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC))
    log.append(event="B", payload={"i": 2}, now=datetime(2026, 4, 27, 14, 1, tzinfo=UTC))
    events = [e.event for e in iter_entries(path)]
    assert events == ["A", "B"]


def test_log_resumes_chain_after_reopen(tmp_path: Path) -> None:
    """A second AuditLog on the same path continues from the last written hash."""
    path = tmp_path / "audit.log"
    first = AuditLog(path)
    e1 = first.append(
        event="A", payload={}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC)
    )
    second = AuditLog(path)
    assert second.last_hash == e1.hash
    e2 = second.append(
        event="B", payload={"k": "v"}, now=datetime(2026, 4, 27, 14, 1, tzinfo=UTC)
    )
    assert e2.prev_hash == e1.hash
    assert verify_chain(path) == 2


def test_payload_with_pipe_character_round_trips(tmp_path: Path) -> None:
    """A payload value containing the `|` field separator survives write + parse."""
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    log.append(
        event="NOTE",
        payload={"text": "first|second|third"},
        now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
    )
    entries = list(iter_entries(path))
    assert entries[0].payload == {"text": "first|second|third"}
    assert verify_chain(path) == 1


def test_tampered_payload_breaks_chain(tmp_path: Path) -> None:
    """Editing a stored payload without recomputing the hash is detected."""
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    log.append(event="A", payload={"i": 1}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC))
    log.append(event="B", payload={"i": 2}, now=datetime(2026, 4, 27, 14, 1, tzinfo=UTC))
    # Mutate the first line's payload by hand.
    text = path.read_text(encoding="utf-8").splitlines()
    tampered = text[0].replace('"i":1', '"i":99')
    # Guard against a silent no-op if the on-disk JSON formatting ever changes.
    assert tampered != text[0], "payload substring not found; nothing was tampered"
    text[0] = tampered
    path.write_text("\n".join(text) + "\n", encoding="utf-8")
    with pytest.raises(AuditChainError, match="hash mismatch"):
        verify_chain(path)


def test_verify_chain_skips_blank_lines(tmp_path: Path) -> None:
    """Empty and whitespace-only lines around an entry do not affect verification."""
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    log.append(event="A", payload={}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC))
    raw = path.read_text(encoding="utf-8")
    path.write_text("\n" + raw + "\n \n", encoding="utf-8")
    # The chain still verifies despite the surrounding whitespace lines.
    assert verify_chain(path) == 1


def test_prev_hash_mismatch_between_entries_is_caught(tmp_path: Path) -> None:
    """Second line's prev_hash points to a different chain — verify_chain rejects."""
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    e1 = log.append(event="A", payload={}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC))
    # Build a synthetic second line whose prev_hash != e1.hash but whose
    # own hash is correctly computed from that bogus prev_hash.
    fake_prev = "0" * 32 + "f" * 32
    ts2 = "2026-04-27T14:01:00+00:00"
    payload_json = "{}"
    raw = f"{ts2}|B|{payload_json}|{fake_prev}"
    fake_hash = hashlib.sha256(raw.encode()).hexdigest()
    line = f"{ts2}|B|{payload_json}|prev_hash={fake_prev}|hash={fake_hash}\n"
    with path.open("a", encoding="utf-8") as fh:
        fh.write(line)
    assert e1.hash != fake_prev  # sanity
    with pytest.raises(AuditChainError, match="prev_hash mismatch"):
        verify_chain(path)


def test_tampered_prev_hash_breaks_chain(tmp_path: Path) -> None:
    """Overwriting a stored prev_hash with an unrelated digest is detected."""
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    log.append(event="A", payload={}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC))
    log.append(event="B", payload={}, now=datetime(2026, 4, 27, 14, 1, tzinfo=UTC))
    # Inject an unrelated prev_hash on the second line.
    lines = path.read_text(encoding="utf-8").splitlines()
    marker = "prev_hash="
    head, sep, tail = lines[1].partition(marker)
    assert sep, "second line has no prev_hash field"
    # Overwrite exactly the 64 hex digits of the field with all-ff so the line
    # keeps its original length and shape; only the digest value is wrong.
    lines[1] = head + marker + "f" * 64 + tail[64:]
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    with pytest.raises(AuditChainError):
        verify_chain(path)


def test_malformed_line_raises_chain_error(tmp_path: Path) -> None:
    """A line that does not follow the entry format makes verify_chain raise."""
    path = tmp_path / "audit.log"
    path.write_text("not-a-valid-line\n", encoding="utf-8")
    with pytest.raises(AuditChainError):
        verify_chain(path)


def test_parser_rejects_missing_hash_field(tmp_path: Path) -> None:
    """A line that ends after prev_hash, with no hash field, is rejected."""
    path = tmp_path / "audit.log"
    path.write_text(
        "2026-04-27T14:00:00+00:00|EVT|{}|prev_hash=" + "0" * 64 + "\n",
        encoding="utf-8",
    )
    with pytest.raises(AuditChainError, match="hash="):
        verify_chain(path)


def test_parser_rejects_missing_prev_hash_field(tmp_path: Path) -> None:
    """A line carrying a hash field but no prev_hash field is rejected."""
    path = tmp_path / "audit.log"
    path.write_text(
        "2026-04-27T14:00:00+00:00|EVT|{}|hash=" + "f" * 64 + "\n",
        encoding="utf-8",
    )
    with pytest.raises(AuditChainError, match="prev_hash"):
        verify_chain(path)


def test_parser_rejects_line_with_no_separators(tmp_path: Path) -> None:
    """A single blob followed only by a hash field is rejected for lacking prev_hash."""
    path = tmp_path / "audit.log"
    path.write_text("just-a-blob|hash=" + "f" * 64 + "\n", encoding="utf-8")
    with pytest.raises(AuditChainError, match="prev_hash"):
        verify_chain(path)


def test_parser_rejects_malformed_leading_section(tmp_path: Path) -> None:
    """Valid trailing hash fields do not rescue a line whose leading section is short."""
    path = tmp_path / "audit.log"
    # Two `|` only: rsplit succeeds twice, leading parts has 1 element ≠ 3.
    path.write_text(
        "tooshort|prev_hash=" + "0" * 64 + "|hash=" + "f" * 64 + "\n",
        encoding="utf-8",
    )
    with pytest.raises(AuditChainError, match="leading section"):
        verify_chain(path)


def test_parser_rejects_payload_not_a_json_object(tmp_path: Path) -> None:
    """A payload that is valid JSON but not an object (here a list) is rejected."""
    path = tmp_path / "audit.log"
    path.write_text(
        "2026-04-27T14:00:00+00:00|EVT|[1,2]|prev_hash="
        + "0" * 64
        + "|hash="
        + "f" * 64
        + "\n",
        encoding="utf-8",
    )
    with pytest.raises(AuditChainError, match="JSON object"):
        verify_chain(path)


def test_parser_rejects_payload_with_invalid_json(tmp_path: Path) -> None:
    """A payload that is not parseable JSON is rejected."""
    path = tmp_path / "audit.log"
    path.write_text(
        "2026-04-27T14:00:00+00:00|EVT|{not-json}|prev_hash="
        + "0" * 64
        + "|hash="
        + "f" * 64
        + "\n",
        encoding="utf-8",
    )
    with pytest.raises(AuditChainError, match="JSON"):
        verify_chain(path)


def test_iter_entries_returns_empty_when_file_missing(tmp_path: Path) -> None:
    """iter_entries yields nothing for a path that does not exist."""
    path = tmp_path / "missing.log"
    assert list(iter_entries(path)) == []


def test_iter_entries_skips_blank_lines(tmp_path: Path) -> None:
    """Trailing empty lines are not reported as entries by iter_entries."""
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    log.append(event="A", payload={}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC))
    raw = path.read_text(encoding="utf-8")
    path.write_text(raw + "\n\n", encoding="utf-8")
    entries = list(iter_entries(path))
    assert len(entries) == 1


def test_log_resumes_chain_with_large_file(tmp_path: Path) -> None:
    """Tail-seek reads past the 4096-byte chunk boundary."""
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    base = datetime(2026, 4, 27, 14, 0, tzinfo=UTC)
    # Each line exceeds 200 chars (two 64-hex digests plus 80 filler chars),
    # so 50 lines are comfortably > 4096 bytes.
    for i in range(50):
        log.append(
            event=f"E{i}",
            payload={"i": i, "filler": "x" * 80},
            now=base + timedelta(seconds=i),
        )
    last_hash = log.last_hash
    reopened = AuditLog(path)
    assert reopened.last_hash == last_hash
    assert verify_chain(path) == 50


def test_payload_serialisation_is_canonical(tmp_path: Path) -> None:
    """Two logs given the same payload with different key order produce equal hashes."""
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    # Different key order must produce identical hashes.
    e1 = log.append(
        event="A",
        payload={"b": 1, "a": 2},
        now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
    )
    other = tmp_path / "audit_other.log"
    log2 = AuditLog(other)
    e2 = log2.append(
        event="A",
        payload={"a": 2, "b": 1},
        now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
    )
    assert e1.hash == e2.hash