"""Tests for runtime.manual_actions_consumer.""" from __future__ import annotations import json from datetime import UTC, datetime from pathlib import Path from unittest.mock import MagicMock import pytest from cerbero_bite.runtime.manual_actions_consumer import consume_manual_actions from cerbero_bite.safety.audit_log import AuditLog from cerbero_bite.safety.kill_switch import KillSwitch, KillSwitchError from cerbero_bite.state import Repository, connect, run_migrations, transaction from cerbero_bite.state.models import ManualAction def _now() -> datetime: return datetime(2026, 4, 30, 12, 0, tzinfo=UTC) def _ctx(tmp_path: Path): db_path = tmp_path / "state.sqlite" audit_path = tmp_path / "audit.log" repo = Repository() conn = connect(db_path) run_migrations(conn) with transaction(conn): repo.init_system_state(conn, config_version="1.0.0", now=_now()) conn.close() audit = AuditLog(audit_path) ks = KillSwitch( connection_factory=lambda: connect(db_path), repository=repo, audit_log=audit, clock=_now, ) ctx = MagicMock() ctx.db_path = db_path ctx.repository = repo ctx.kill_switch = ks ctx.audit_log = audit return ctx def _enqueue(ctx, kind: str, payload: dict[str, object]) -> int: conn = connect(ctx.db_path) try: with transaction(conn): return ctx.repository.enqueue_manual_action( conn, ManualAction( kind=kind, # type: ignore[arg-type] payload_json=json.dumps(payload), created_at=_now(), ), ) finally: conn.close() def _fetch_action(ctx, action_id: int): conn = connect(ctx.db_path) try: row = conn.execute( "SELECT consumed_at, consumed_by, result FROM manual_actions WHERE id = ?", (action_id,), ).fetchone() finally: conn.close() return row @pytest.mark.asyncio async def test_arm_kill_arms_kill_switch(tmp_path: Path) -> None: ctx = _ctx(tmp_path) aid = _enqueue(ctx, "arm_kill", {"reason": "GUI typed yes"}) assert ctx.kill_switch.is_armed() is False n = await consume_manual_actions(ctx, now=_now()) assert n == 1 assert ctx.kill_switch.is_armed() is True row = _fetch_action(ctx, aid) assert row["consumed_by"] == "engine" assert row["result"] == "ok" assert row["consumed_at"] is not None @pytest.mark.asyncio async def test_disarm_kill_disarms_kill_switch(tmp_path: Path) -> None: ctx = _ctx(tmp_path) ctx.kill_switch.arm(reason="prior", source="manual") assert ctx.kill_switch.is_armed() is True aid = _enqueue(ctx, "disarm_kill", {"reason": "operator override"}) n = await consume_manual_actions(ctx, now=_now()) assert n == 1 assert ctx.kill_switch.is_armed() is False row = _fetch_action(ctx, aid) assert row["result"] == "ok" @pytest.mark.asyncio async def test_consumer_drains_queue(tmp_path: Path) -> None: ctx = _ctx(tmp_path) _enqueue(ctx, "arm_kill", {"reason": "first"}) _enqueue(ctx, "disarm_kill", {"reason": "second"}) _enqueue(ctx, "arm_kill", {"reason": "third"}) n = await consume_manual_actions(ctx, now=_now()) assert n == 3 assert ctx.kill_switch.is_armed() is True @pytest.mark.asyncio async def test_unsupported_kind_marked_not_supported(tmp_path: Path) -> None: ctx = _ctx(tmp_path) aid = _enqueue(ctx, "force_close", {"proposal_id": "abc"}) n = await consume_manual_actions(ctx, now=_now()) assert n == 1 row = _fetch_action(ctx, aid) assert row["result"] == "not_supported" @pytest.mark.asyncio async def test_missing_payload_uses_default_reason(tmp_path: Path) -> None: ctx = _ctx(tmp_path) _enqueue(ctx, "arm_kill", {}) n = await consume_manual_actions(ctx, now=_now()) assert n == 1 assert ctx.kill_switch.is_armed() is True @pytest.mark.asyncio async def test_kill_switch_error_caught_and_recorded(tmp_path: Path) -> None: ctx = _ctx(tmp_path) # Replace the kill switch with one whose arm raises. bad_ks = MagicMock() bad_ks.arm.side_effect = KillSwitchError("simulated") bad_ks.is_armed.return_value = False ctx.kill_switch = bad_ks aid = _enqueue(ctx, "arm_kill", {"reason": "x"}) n = await consume_manual_actions(ctx, now=_now()) assert n == 1 row = _fetch_action(ctx, aid) assert "KillSwitchError" in (row["result"] or "") @pytest.mark.asyncio async def test_empty_queue_returns_zero(tmp_path: Path) -> None: ctx = _ctx(tmp_path) n = await consume_manual_actions(ctx, now=_now()) assert n == 0