"""Kill switch behaviour: SQLite + audit log stay in lock-step."""

from __future__ import annotations

from datetime import UTC, datetime, timedelta
from pathlib import Path

import pytest

from cerbero_bite.safety import AuditLog, verify_chain
from cerbero_bite.safety.kill_switch import KillSwitch, KillSwitchError
from cerbero_bite.state import Repository, connect, run_migrations, transaction


def _make_kill_switch(tmp_path: Path) -> tuple[KillSwitch, AuditLog, Path, Repository]:
    """Build a KillSwitch backed by a migrated, initialised database.

    The database lives at ``tmp_path / "state.sqlite"`` and the audit log at
    ``tmp_path / "audit.log"``. The injected clock is a finite fake that
    yields 14:10, 14:20, 14:30, 14:40 and 14:50 UTC on 2026-04-27, one value
    per call; a sixth call raises ``StopIteration``.

    Returns:
        ``(kill_switch, audit_log, audit_path, repository)``.
    """
    db_path = tmp_path / "state.sqlite"
    audit_path = tmp_path / "audit.log"
    repo = Repository()

    conn = connect(db_path)
    try:
        run_migrations(conn)
        with transaction(conn):
            repo.init_system_state(
                conn,
                config_version="1.0.0",
                now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
            )
    finally:
        # Close the setup connection even if migration/initialisation fails.
        conn.close()

    audit = AuditLog(audit_path)
    # A generator expression is already an iterator; no iter() wrapper needed.
    times = (
        datetime(2026, 4, 27, 14, minute, tzinfo=UTC)
        for minute in (10, 20, 30, 40, 50)
    )
    ks = KillSwitch(
        connection_factory=lambda: connect(db_path),
        repository=repo,
        audit_log=audit,
        clock=lambda: next(times),
    )
    return ks, audit, audit_path, repo


def _make_uninitialised_kill_switch(tmp_path: Path) -> KillSwitch:
    """Build a KillSwitch whose database is migrated but never initialised.

    Migrations are applied, but ``init_system_state`` is deliberately not
    called, so the tests can exercise the missing-singleton code paths.
    The clock always returns 2026-04-27 14:00 UTC.
    """
    db_path = tmp_path / "state.sqlite"
    audit_path = tmp_path / "audit.log"

    conn = connect(db_path)
    try:
        run_migrations(conn)
    finally:
        conn.close()

    return KillSwitch(
        connection_factory=lambda: connect(db_path),
        repository=Repository(),
        audit_log=AuditLog(audit_path),
        clock=lambda: datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
    )


def test_arm_persists_state_and_appends_audit(tmp_path: Path) -> None:
    """Arming flips the persisted flag, stores the reason and audits once."""
    ks, _audit, audit_path, repo = _make_kill_switch(tmp_path)
    assert ks.is_armed() is False

    ks.arm(reason="manual test", source="manual")

    assert ks.is_armed() is True
    conn = connect(tmp_path / "state.sqlite")
    try:
        state = repo.get_system_state(conn)
    finally:
        conn.close()
    assert state is not None
    assert state.kill_switch == 1
    assert state.kill_reason == "manual test"
    assert state.kill_at is not None

    assert verify_chain(audit_path) == 1


def test_arm_is_idempotent_on_second_call(tmp_path: Path) -> None:
    """A second arm() while already armed leaves the audit chain untouched."""
    ks, _audit, audit_path, _repo = _make_kill_switch(tmp_path)
    ks.arm(reason="first", source="manual")
    ks.arm(reason="second", source="manual")  # no-op
    # only one audit line because the second call short-circuits
    assert verify_chain(audit_path) == 1


def test_disarm_resets_kill_switch(tmp_path: Path) -> None:
    """Disarming clears the armed flag and the persisted kill timestamp."""
    ks, _audit, audit_path, repo = _make_kill_switch(tmp_path)
    ks.arm(reason="test", source="manual")
    ks.disarm(reason="cleared", source="manual")

    assert ks.is_armed() is False
    conn = connect(tmp_path / "state.sqlite")
    try:
        state = repo.get_system_state(conn)
    finally:
        conn.close()
    assert state is not None
    assert state.kill_at is None
    # arm + disarm = 2 audit lines
    assert verify_chain(audit_path) == 2


def test_disarm_when_not_armed_is_noop(tmp_path: Path) -> None:
    """Disarming a switch that was never armed writes nothing to the audit log."""
    ks, _audit, audit_path, _repo = _make_kill_switch(tmp_path)
    ks.disarm(reason="nothing to do", source="manual")
    assert verify_chain(audit_path) == 0


def test_arm_requires_reason(tmp_path: Path) -> None:
    """arm() rejects an empty reason with KillSwitchError."""
    ks, _audit, _audit_path, _repo = _make_kill_switch(tmp_path)
    with pytest.raises(KillSwitchError, match="reason is required"):
        ks.arm(reason="", source="manual")


def test_arm_without_initialised_state_raises(tmp_path: Path) -> None:
    """arm() fails loudly when the system_state singleton row is absent."""
    ks = _make_uninitialised_kill_switch(tmp_path)
    with pytest.raises(KillSwitchError, match="system_state singleton missing"):
        ks.arm(reason="x", source="manual")


def test_audit_chain_records_event_kind(tmp_path: Path) -> None:
    """Both the armed and disarmed event kinds appear in the audit file text."""
    ks, _audit, audit_path, _repo = _make_kill_switch(tmp_path)
    ks.arm(reason="x", source="mcp_timeout")
    ks.disarm(reason="y", source="manual")

    text = audit_path.read_text(encoding="utf-8")
    assert "KILL_SWITCH_ARMED" in text
    assert "KILL_SWITCH_DISARMED" in text


def test_is_armed_returns_false_when_singleton_missing(tmp_path: Path) -> None:
    """is_armed() reports False rather than raising when no state row exists."""
    ks = _make_uninitialised_kill_switch(tmp_path)
    assert ks.is_armed() is False


def test_disarm_requires_reason(tmp_path: Path) -> None:
    """disarm() rejects an empty reason with KillSwitchError."""
    ks, _audit, _audit_path, _repo = _make_kill_switch(tmp_path)
    with pytest.raises(KillSwitchError, match="reason is required"):
        ks.disarm(reason="", source="manual")


def test_disarm_without_initialised_state_raises(tmp_path: Path) -> None:
    """disarm() fails loudly when the system_state singleton row is absent."""
    ks = _make_uninitialised_kill_switch(tmp_path)
    with pytest.raises(KillSwitchError, match="system_state singleton missing"):
        ks.disarm(reason="x", source="manual")


def test_clock_is_advanced_for_each_call(tmp_path: Path) -> None:
    """After arm + disarm, last_health_check is later than the first clock tick."""
    ks, _audit, _audit_path, repo = _make_kill_switch(tmp_path)
    ks.arm(reason="x", source="manual")
    ks.disarm(reason="y", source="manual")

    conn = connect(tmp_path / "state.sqlite")
    try:
        state = repo.get_system_state(conn)
    finally:
        conn.close()
    assert state is not None
    # last_health_check should reflect the disarm time (14:20 from the fake
    # clock, presumably one tick per operation -- confirm against KillSwitch).
    # The bound is deliberately loose: anything at or after 14:14:59 passes,
    # which rules out the first tick (14:10) without pinning the exact value.
    lower_bound = datetime(2026, 4, 27, 14, 15, tzinfo=UTC) - timedelta(seconds=1)
    assert state.last_health_check >= lower_bound