Phase 2: persistence + safety controls
Aggiunge la persistenza SQLite, l'audit log a hash chain, il kill switch coordinato e i CLI di gestione documentati in docs/05-data-model.md e docs/07-risk-controls.md. 197 test pass, 1 skipped (sqlite3 CLI mancante), copertura totale 97%. State (`state/`): - 0001_init.sql con positions, instructions, decisions, dvol_history, manual_actions, system_state. - db.py: connect con WAL + foreign_keys + transaction ctx, runner forward-only basato su PRAGMA user_version. - models.py: record Pydantic, Decimal preservato come TEXT. - repository.py: CRUD typed con singola connessione passata, cache aware, posizioni concorrenti. Safety (`safety/`): - audit_log.py: AuditLog append-only con SHA-256 chain e fsync, verify_chain riconosce ogni manomissione (payload, prev_hash, hash, JSON, separatori). - kill_switch.py: arm/disarm transazionali, idempotenti, accoppiati all'audit chain. Config (`config/loader.py` + `strategy.yaml`): - Loader YAML con deep-merge di strategy.local.yaml. - Verifica config_hash SHA-256 (riga config_hash esclusa). - File golden strategy.yaml + esempio override. Scripts: - dead_man.sh: watchdog shell indipendente da Python. - backup.py: VACUUM INTO orario con retention 30 giorni. CLI: - audit verify (exit 2 su tampering). - kill-switch arm/disarm/status su SQLite reale. - state inspect con tabella posizioni aperte. - config hash, config validate. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,160 @@
|
||||
"""End-to-end CLI tests for the Phase 2 commands.
|
||||
|
||||
The commands hit real on-disk paths (tmp_path) so the assertions run
|
||||
the production code paths verbatim.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
|
||||
from click.testing import CliRunner
|
||||
|
||||
from cerbero_bite.cli import main as cli_main
|
||||
from cerbero_bite.safety import AuditLog
|
||||
from cerbero_bite.state import Repository, connect, run_migrations, transaction
|
||||
|
||||
|
||||
def _seed_state(db_path: Path) -> None:
|
||||
conn = connect(db_path)
|
||||
try:
|
||||
run_migrations(conn)
|
||||
with transaction(conn):
|
||||
Repository().init_system_state(
|
||||
conn,
|
||||
config_version="1.0.0",
|
||||
now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
|
||||
)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_audit_verify_reports_ok_on_clean_chain(tmp_path: Path) -> None:
|
||||
audit = AuditLog(tmp_path / "audit.log")
|
||||
audit.append(event="A", payload={}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC))
|
||||
audit.append(event="B", payload={}, now=datetime(2026, 4, 27, 14, 1, tzinfo=UTC))
|
||||
|
||||
result = CliRunner().invoke(
|
||||
cli_main, ["audit", "verify", "--file", str(tmp_path / "audit.log")]
|
||||
)
|
||||
assert result.exit_code == 0, result.output
|
||||
assert "ok" in result.output
|
||||
assert "2" in result.output
|
||||
|
||||
|
||||
def test_audit_verify_handles_empty_file(tmp_path: Path) -> None:
|
||||
target = tmp_path / "audit.log"
|
||||
target.write_text("", encoding="utf-8")
|
||||
result = CliRunner().invoke(cli_main, ["audit", "verify", "--file", str(target)])
|
||||
assert result.exit_code == 0
|
||||
assert "empty" in result.output
|
||||
|
||||
|
||||
def test_audit_verify_exits_nonzero_on_tampering(tmp_path: Path) -> None:
|
||||
target = tmp_path / "audit.log"
|
||||
audit = AuditLog(target)
|
||||
audit.append(event="A", payload={}, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC))
|
||||
target.write_text(
|
||||
target.read_text(encoding="utf-8").replace('"event":"A"', '"event":"X"'),
|
||||
encoding="utf-8",
|
||||
)
|
||||
# NB: we mutated the JSON payload, but the actual line still has event=A.
|
||||
# Force tampering by editing the literal "A" in the line text.
|
||||
raw = target.read_text(encoding="utf-8")
|
||||
target.write_text(raw.replace("|A|", "|X|", 1), encoding="utf-8")
|
||||
|
||||
result = CliRunner().invoke(cli_main, ["audit", "verify", "--file", str(target)])
|
||||
assert result.exit_code == 2
|
||||
assert "TAMPERED" in result.output
|
||||
|
||||
|
||||
def test_kill_switch_status_prints_disarmed(tmp_path: Path) -> None:
|
||||
db = tmp_path / "state.sqlite"
|
||||
_seed_state(db)
|
||||
result = CliRunner().invoke(cli_main, ["kill-switch", "status", "--db", str(db)])
|
||||
assert result.exit_code == 0
|
||||
assert "disarmed" in result.output
|
||||
|
||||
|
||||
def test_kill_switch_arm_then_status_shows_armed(tmp_path: Path) -> None:
|
||||
db = tmp_path / "state.sqlite"
|
||||
audit = tmp_path / "audit.log"
|
||||
runner = CliRunner()
|
||||
arm = runner.invoke(
|
||||
cli_main,
|
||||
[
|
||||
"kill-switch",
|
||||
"arm",
|
||||
"--reason",
|
||||
"manual smoke",
|
||||
"--db",
|
||||
str(db),
|
||||
"--audit",
|
||||
str(audit),
|
||||
],
|
||||
)
|
||||
assert arm.exit_code == 0, arm.output
|
||||
status = runner.invoke(cli_main, ["kill-switch", "status", "--db", str(db)])
|
||||
assert status.exit_code == 0
|
||||
assert "ARMED" in status.output
|
||||
|
||||
|
||||
def test_kill_switch_status_handles_missing_db(tmp_path: Path) -> None:
|
||||
result = CliRunner().invoke(
|
||||
cli_main, ["kill-switch", "status", "--db", str(tmp_path / "absent.sqlite")]
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
assert "not found" in result.output
|
||||
|
||||
|
||||
def test_state_inspect_shows_no_open_positions(tmp_path: Path) -> None:
|
||||
db = tmp_path / "state.sqlite"
|
||||
_seed_state(db)
|
||||
result = CliRunner().invoke(cli_main, ["state", "inspect", "--db", str(db)])
|
||||
assert result.exit_code == 0
|
||||
assert "no open positions" in result.output
|
||||
|
||||
|
||||
def test_state_inspect_handles_missing_db(tmp_path: Path) -> None:
|
||||
result = CliRunner().invoke(
|
||||
cli_main, ["state", "inspect", "--db", str(tmp_path / "absent.sqlite")]
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
assert "not found" in result.output
|
||||
|
||||
|
||||
def test_config_hash_matches_loader(tmp_path: Path) -> None:
|
||||
target = tmp_path / "strategy.yaml"
|
||||
target.write_text(
|
||||
'config_version: "1.0.0"\nconfig_hash: "0000"\nasset:\n symbol: ETH\n',
|
||||
encoding="utf-8",
|
||||
)
|
||||
result = CliRunner().invoke(cli_main, ["config", "hash", "--file", str(target)])
|
||||
assert result.exit_code == 0
|
||||
assert len(result.output.strip()) == 64 # sha256 hex
|
||||
|
||||
|
||||
def test_config_validate_repo_strategy_yaml() -> None:
|
||||
repo_root = Path(__file__).resolve().parents[2]
|
||||
yaml_path = repo_root / "strategy.yaml"
|
||||
result = CliRunner().invoke(
|
||||
cli_main, ["config", "validate", "--file", str(yaml_path)]
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
assert "ok" in result.output
|
||||
|
||||
|
||||
def test_config_validate_with_no_enforce_hash_skips_check(tmp_path: Path) -> None:
|
||||
target = tmp_path / "strategy.yaml"
|
||||
target.write_text(
|
||||
'config_version: "1.0.0"\nconfig_hash: "wrong"\n'
|
||||
'last_review: "2026-04-26"\nlast_reviewer: "test"\n',
|
||||
encoding="utf-8",
|
||||
)
|
||||
result = CliRunner().invoke(
|
||||
cli_main,
|
||||
["config", "validate", "--file", str(target), "--no-enforce-hash"],
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
assert "ok" in result.output
|
||||
Reference in New Issue
Block a user