Adriano 263470786d Phase 2: persistence + safety controls
Adds SQLite persistence, the hash-chain audit log, the coordinated kill
switch, and the management CLIs documented in docs/05-data-model.md and
docs/07-risk-controls.md. 197 tests pass, 1 skipped (sqlite3 CLI not
installed), total coverage 97%.

State (`state/`):
- 0001_init.sql with the positions, instructions, decisions, dvol_history,
  manual_actions and system_state tables.
- db.py: connect with WAL + foreign_keys + a transaction context manager,
  plus a forward-only migration runner keyed on PRAGMA user_version
  (sketched below).
- models.py: Pydantic records, Decimal values preserved as TEXT.
- repository.py: typed CRUD over a single injected connection, cache-aware,
  supports concurrent positions.
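
A minimal sketch of the connection and migration pattern described above;
the helper names beyond connect (transaction, run_migrations) and the glob
pattern are illustrative assumptions, not necessarily the actual db.py API:

    import sqlite3
    from contextlib import contextmanager
    from pathlib import Path

    def connect(db_path: Path) -> sqlite3.Connection:
        # WAL allows concurrent readers; foreign keys must be enabled per connection.
        conn = sqlite3.connect(db_path)
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
        return conn

    @contextmanager
    def transaction(conn: sqlite3.Connection):
        # Commit on success, roll back on any exception.
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise

    def run_migrations(conn: sqlite3.Connection, migrations_dir: Path) -> None:
        # Forward-only: apply every NNNN_*.sql above the current PRAGMA user_version.
        current = conn.execute("PRAGMA user_version").fetchone()[0]
        for script in sorted(migrations_dir.glob("*.sql")):
            version = int(script.name.split("_", 1)[0])
            if version <= current:
                continue
            conn.executescript(script.read_text(encoding="utf-8"))
            conn.execute(f"PRAGMA user_version = {version}")
            conn.commit()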

Safety (`safety/`):
- audit_log.py: append-only AuditLog with an SHA-256 hash chain and fsync;
  verify_chain detects every class of tampering (payload, prev_hash, hash,
  JSON, separators). The line format is sketched below.
- kill_switch.py: transactional, idempotent arm/disarm, coupled to the
  audit chain.
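
The on-disk line format can be read off the tests below; here is a minimal
sketch of how one link of the chain is computed. The GENESIS_HASH value and
the helper name are assumptions, and the real AuditLog also fsyncs after each
append, which the sketch omits:

    import hashlib
    import json
    from datetime import datetime

    GENESIS_HASH = "0" * 64  # assumed value; only its role as the first prev_hash matters

    def chain_line(now: datetime, event: str, payload: dict, prev_hash: str) -> tuple[str, str]:
        # Canonical JSON (sorted keys, compact separators) so key order never changes the hash.
        payload_json = json.dumps(payload, sort_keys=True, separators=(",", ":"))
        raw = f"{now.isoformat()}|{event}|{payload_json}|{prev_hash}"
        entry_hash = hashlib.sha256(raw.encode("utf-8")).hexdigest()
        line = f"{now.isoformat()}|{event}|{payload_json}|prev_hash={prev_hash}|hash={entry_hash}\n"
        return line, entry_hash

verify_chain recomputes this hash for every line and additionally checks that
each prev_hash equals the previous line's hash field.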

Config (`config/loader.py` + `strategy.yaml`):
- YAML loader with deep-merge of strategy.local.yaml (see the sketch below).
- SHA-256 config_hash verification (the config_hash line itself is excluded).
- Golden strategy.yaml file + an override example.
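
A sketch of the two loader behaviours, assuming config_hash sits on its own
top-level line in strategy.yaml (both function names are illustrative):

    import hashlib
    from pathlib import Path

    def deep_merge(base: dict, override: dict) -> dict:
        # Nested dicts are merged key by key; any other value in the override wins.
        merged = dict(base)
        for key, value in override.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = deep_merge(merged[key], value)
            else:
                merged[key] = value
        return merged

    def compute_config_hash(path: Path) -> str:
        # Hash every line except the config_hash line itself, so the recorded
        # hash does not invalidate the file it is written into.
        lines = [
            line
            for line in path.read_text(encoding="utf-8").splitlines(keepends=True)
            if not line.lstrip().startswith("config_hash:")
        ]
        return hashlib.sha256("".join(lines).encode("utf-8")).hexdigest()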

Scripts:
- dead_man.sh: shell watchdog, independent of Python.
- backup.py: hourly VACUUM INTO with 30-day retention (sketched below).
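
A sketch of the backup job; the file naming, directory layout and pruning
rule are assumptions beyond what the commit states (hourly VACUUM INTO,
30-day retention):

    import sqlite3
    from datetime import UTC, datetime, timedelta
    from pathlib import Path

    RETENTION = timedelta(days=30)

    def backup_once(db_path: Path, backup_dir: Path) -> Path:
        backup_dir.mkdir(parents=True, exist_ok=True)
        stamp = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")
        target = backup_dir / f"cerbero-{stamp}.sqlite3"
        conn = sqlite3.connect(db_path)
        try:
            # VACUUM INTO writes a compacted, consistent copy to a new file.
            conn.execute("VACUUM INTO ?", (str(target),))
        finally:
            conn.close()
        return target

    def prune_old_backups(backup_dir: Path) -> None:
        cutoff = datetime.now(UTC) - RETENTION
        for backup in backup_dir.glob("cerbero-*.sqlite3"):
            mtime = datetime.fromtimestamp(backup.stat().st_mtime, tz=UTC)
            if mtime < cutoff:
                backup.unlink()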

CLI:
- audit verify (exit 2 on tampering; see the sketch below).
- kill-switch arm/disarm/status against the real SQLite database.
- state inspect with a table of open positions.
- config hash, config validate.
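
An illustrative sketch of how the exit-2 contract of audit verify might be
wired; the flag name and default log path are assumptions, while verify_chain
and AuditChainError are the real safety API exercised by the tests below:

    import argparse
    import sys
    from pathlib import Path

    from cerbero_bite.safety.audit_log import AuditChainError, verify_chain

    def main(argv: list[str] | None = None) -> int:
        parser = argparse.ArgumentParser(prog="audit")
        sub = parser.add_subparsers(dest="command", required=True)
        verify = sub.add_parser("verify")
        verify.add_argument("--log", type=Path, default=Path("state/audit.log"))
        args = parser.parse_args(argv)
        try:
            entries = verify_chain(args.log)
        except AuditChainError as exc:
            print(f"audit chain TAMPERED: {exc}", file=sys.stderr)
            return 2  # exit 2 on tampering, per the contract above
        print(f"audit chain OK: {entries} entries")
        return 0

    if __name__ == "__main__":
        sys.exit(main())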

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 13:35:35 +02:00


"""Audit chain writer + verifier tests."""
from __future__ import annotations
import hashlib
from datetime import UTC, datetime, timedelta
from pathlib import Path
import pytest
from cerbero_bite.safety.audit_log import (
GENESIS_HASH,
AuditChainError,
AuditLog,
iter_entries,
verify_chain,
)
def test_empty_file_verifies_with_zero_entries(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    assert verify_chain(path) == 0


def test_first_entry_uses_genesis_prev_hash(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    entry = log.append(
        event="ENGINE_START",
        payload={"version": "1.0.0"},
        now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
    )
    assert entry.prev_hash == GENESIS_HASH
    assert entry.hash != GENESIS_HASH


def test_chain_links_subsequent_entries(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    e1 = log.append(event="A", payload={"i": 1}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC))
    e2 = log.append(event="B", payload={"i": 2}, now=datetime(2026, 4, 27, 14, 1, tzinfo=UTC))
    e3 = log.append(event="C", payload={"i": 3}, now=datetime(2026, 4, 27, 14, 2, tzinfo=UTC))
    assert e2.prev_hash == e1.hash
    assert e3.prev_hash == e2.hash
    assert verify_chain(path) == 3


def test_iter_entries_yields_in_order(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    log.append(event="A", payload={"i": 1}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC))
    log.append(event="B", payload={"i": 2}, now=datetime(2026, 4, 27, 14, 1, tzinfo=UTC))
    events = [e.event for e in iter_entries(path)]
    assert events == ["A", "B"]


def test_log_resumes_chain_after_reopen(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    first = AuditLog(path)
    e1 = first.append(
        event="A", payload={}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC)
    )
    second = AuditLog(path)
    assert second.last_hash == e1.hash
    e2 = second.append(
        event="B", payload={"k": "v"}, now=datetime(2026, 4, 27, 14, 1, tzinfo=UTC)
    )
    assert e2.prev_hash == e1.hash
    assert verify_chain(path) == 2


def test_payload_with_pipe_character_round_trips(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    log.append(
        event="NOTE",
        payload={"text": "first|second|third"},
        now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
    )
    entries = list(iter_entries(path))
    assert entries[0].payload == {"text": "first|second|third"}
    assert verify_chain(path) == 1


def test_tampered_payload_breaks_chain(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    log.append(event="A", payload={"i": 1}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC))
    log.append(event="B", payload={"i": 2}, now=datetime(2026, 4, 27, 14, 1, tzinfo=UTC))
    # Mutate the first line's payload by hand.
    text = path.read_text(encoding="utf-8").splitlines()
    text[0] = text[0].replace('"i":1', '"i":99')
    path.write_text("\n".join(text) + "\n", encoding="utf-8")
    with pytest.raises(AuditChainError, match="hash mismatch"):
        verify_chain(path)


def test_verify_chain_skips_blank_lines(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    log.append(event="A", payload={}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC))
    raw = path.read_text(encoding="utf-8")
    path.write_text("\n" + raw + "\n \n", encoding="utf-8")
    # The chain still verifies despite the surrounding whitespace lines.
    assert verify_chain(path) == 1


def test_prev_hash_mismatch_between_entries_is_caught(tmp_path: Path) -> None:
    """Second line's prev_hash points to a different chain — verify_chain rejects."""
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    e1 = log.append(event="A", payload={}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC))
    # Build a synthetic second line whose prev_hash != e1.hash but whose
    # own hash is correctly computed from that bogus prev_hash.
    fake_prev = "0" * 32 + "f" * 32
    ts2 = "2026-04-27T14:01:00+00:00"
    payload_json = "{}"
    raw = f"{ts2}|B|{payload_json}|{fake_prev}"
    fake_hash = hashlib.sha256(raw.encode()).hexdigest()
    line = f"{ts2}|B|{payload_json}|prev_hash={fake_prev}|hash={fake_hash}\n"
    with path.open("a", encoding="utf-8") as fh:
        fh.write(line)
    assert e1.hash != fake_prev  # sanity
    with pytest.raises(AuditChainError, match="prev_hash mismatch"):
        verify_chain(path)


def test_tampered_prev_hash_breaks_chain(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    log.append(event="A", payload={}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC))
    log.append(event="B", payload={}, now=datetime(2026, 4, 27, 14, 1, tzinfo=UTC))
    # Corrupt the second line's prev_hash field: prepend 64 bogus hex chars so
    # it no longer matches the first entry's hash.
    lines = path.read_text(encoding="utf-8").splitlines()
    lines[1] = lines[1].replace("prev_hash=", "prev_hash=" + "f" * 64)
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    with pytest.raises(AuditChainError):
        verify_chain(path)


def test_malformed_line_raises_chain_error(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    path.write_text("not-a-valid-line\n", encoding="utf-8")
    with pytest.raises(AuditChainError):
        verify_chain(path)


def test_parser_rejects_missing_hash_field(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    path.write_text(
        "2026-04-27T14:00:00+00:00|EVT|{}|prev_hash=" + "0" * 64 + "\n",
        encoding="utf-8",
    )
    with pytest.raises(AuditChainError, match="hash="):
        verify_chain(path)


def test_parser_rejects_missing_prev_hash_field(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    path.write_text(
        "2026-04-27T14:00:00+00:00|EVT|{}|hash=" + "f" * 64 + "\n",
        encoding="utf-8",
    )
    with pytest.raises(AuditChainError, match="prev_hash"):
        verify_chain(path)


def test_parser_rejects_line_with_no_separators(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    path.write_text("just-a-blob|hash=" + "f" * 64 + "\n", encoding="utf-8")
    with pytest.raises(AuditChainError, match="prev_hash"):
        verify_chain(path)


def test_parser_rejects_malformed_leading_section(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    # Two `|` only: rsplit succeeds twice, leading parts has 1 element ≠ 3.
    path.write_text(
        "tooshort|prev_hash=" + "0" * 64 + "|hash=" + "f" * 64 + "\n",
        encoding="utf-8",
    )
    with pytest.raises(AuditChainError, match="leading section"):
        verify_chain(path)


def test_parser_rejects_payload_not_a_json_object(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    path.write_text(
        "2026-04-27T14:00:00+00:00|EVT|[1,2]|prev_hash="
        + "0" * 64
        + "|hash="
        + "f" * 64
        + "\n",
        encoding="utf-8",
    )
    with pytest.raises(AuditChainError, match="JSON object"):
        verify_chain(path)


def test_parser_rejects_payload_with_invalid_json(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    path.write_text(
        "2026-04-27T14:00:00+00:00|EVT|{not-json}|prev_hash="
        + "0" * 64
        + "|hash="
        + "f" * 64
        + "\n",
        encoding="utf-8",
    )
    with pytest.raises(AuditChainError, match="JSON"):
        verify_chain(path)


def test_iter_entries_returns_empty_when_file_missing(tmp_path: Path) -> None:
    path = tmp_path / "missing.log"
    assert list(iter_entries(path)) == []


def test_iter_entries_skips_blank_lines(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    log.append(event="A", payload={}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC))
    raw = path.read_text(encoding="utf-8")
    path.write_text(raw + "\n\n", encoding="utf-8")
    entries = list(iter_entries(path))
    assert len(entries) == 1


def test_log_resumes_chain_with_large_file(tmp_path: Path) -> None:
    """Tail-seek reads past the 4096-byte chunk boundary."""
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    base = datetime(2026, 4, 27, 14, 0, tzinfo=UTC)
    # Each line ~150 chars; 50 lines is comfortably > 4096 bytes.
    for i in range(50):
        log.append(
            event=f"E{i}",
            payload={"i": i, "filler": "x" * 80},
            now=base + timedelta(seconds=i),
        )
    last_hash = log.last_hash
    reopened = AuditLog(path)
    assert reopened.last_hash == last_hash
    assert verify_chain(path) == 50


def test_payload_serialisation_is_canonical(tmp_path: Path) -> None:
    path = tmp_path / "audit.log"
    log = AuditLog(path)
    # Different key order must produce identical hashes.
    e1 = log.append(
        event="A",
        payload={"b": 1, "a": 2},
        now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
    )
    other = tmp_path / "audit_other.log"
    log2 = AuditLog(other)
    e2 = log2.append(
        event="A",
        payload={"a": 2, "b": 1},
        now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
    )
    assert e1.hash == e2.hash