From e8345a29c8f3d9ff8de28d93a1e7bedbefd7c18f Mon Sep 17 00:00:00 2001 From: AdrianoDev Date: Thu, 30 Apr 2026 12:33:58 +0200 Subject: [PATCH] =?UTF-8?q?feat(gui+runtime):=20Phase=20D=20=E2=80=94=20ki?= =?UTF-8?q?ll-switch=20arm/disarm=20from=20the=20dashboard?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the GUI's first write path through the manual_actions queue: * runtime/manual_actions_consumer.py — drains the queue and dispatches arm_kill / disarm_kill via KillSwitch (preserving the audit chain). Unsupported kinds (force_close, approve/reject_proposal) are marked result="not_supported" so they don't sit forever. * runtime/orchestrator.py — adds a `manual_actions` job at */1 cron to the canonical scheduler manifest. * gui/data_layer.py — write helpers enqueue_arm_kill / enqueue_disarm_kill (the only write path the GUI uses) plus load_pending_manual_actions for the pending strip. * gui/pages/1_📊_Status.py — kill-switch arm/disarm panel with typed confirmation ("yes I am sure") + reason field; pending-actions table rendered when the queue is non-empty. End-to-end smoke against the testnet state.sqlite: GUI enqueue → consumer dispatch → KillSwitch transition → audit chain hash linkage holds, "source":"manual_gui" recorded. 7 new unit tests for the consumer (arm, disarm, drain, unsupported, default-reason, KillSwitchError handling, empty queue); 360/360 pass. ruff clean; mypy strict src clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/cerbero_bite/gui/data_layer.py | 85 +++++++++- src/cerbero_bite/gui/pages/1_📊_Status.py | 100 ++++++++++- .../runtime/manual_actions_consumer.py | 114 +++++++++++++ src/cerbero_bite/runtime/orchestrator.py | 14 ++ tests/integration/test_orchestrator.py | 2 +- tests/unit/test_manual_actions_consumer.py | 159 ++++++++++++++++++ 6 files changed, 470 insertions(+), 4 deletions(-) create mode 100644 src/cerbero_bite/runtime/manual_actions_consumer.py create mode 100644 tests/unit/test_manual_actions_consumer.py diff --git a/src/cerbero_bite/gui/data_layer.py b/src/cerbero_bite/gui/data_layer.py index dd48ee6..4c0e12e 100644 --- a/src/cerbero_bite/gui/data_layer.py +++ b/src/cerbero_bite/gui/data_layer.py @@ -14,6 +14,7 @@ poking at the repository directly. from __future__ import annotations +import json from dataclasses import dataclass from datetime import UTC, datetime, timedelta from decimal import Decimal @@ -27,12 +28,14 @@ from cerbero_bite.safety.audit_log import ( iter_entries, verify_chain, ) -from cerbero_bite.state import Repository, connect +from cerbero_bite.state import Repository, connect, transaction from cerbero_bite.state.models import ( DecisionRecord, + ManualAction, PositionRecord, SystemStateRecord, ) +from cerbero_bite.state.repository import _row_to_manual __all__ = [ "DEFAULT_AUDIT_PATH", @@ -50,12 +53,15 @@ __all__ = [ "compute_kpis", "compute_monthly_stats", "compute_payoff_curve", + "enqueue_arm_kill", + "enqueue_disarm_kill", "load_audit_chain_status", "load_audit_tail", "load_closed_positions", "load_decisions_for_position", "load_engine_snapshot", "load_open_positions", + "load_pending_manual_actions", "load_position_by_id", ] @@ -559,6 +565,83 @@ def compute_distance_metrics( ) +# --------------------------------------------------------------------------- +# Manual actions queue (the GUI's only write path) +# --------------------------------------------------------------------------- + + +def _enqueue_action( + *, + db_path: Path | str, + kind: str, + payload: dict[str, object], + proposal_id: UUID | None = None, +) -> int: + """Insert a row in ``manual_actions``. The engine consumer applies it.""" + db_path = Path(db_path) + repo = Repository() + now = datetime.now(UTC) + conn = connect(db_path) + try: + with transaction(conn): + return repo.enqueue_manual_action( + conn, + ManualAction( + kind=kind, # type: ignore[arg-type] + proposal_id=proposal_id, + payload_json=json.dumps(payload), + created_at=now, + ), + ) + finally: + conn.close() + + +def enqueue_arm_kill( + *, reason: str, db_path: Path | str = DEFAULT_DB_PATH +) -> int: + """Queue an ``arm_kill`` action for the engine consumer.""" + if not reason or not reason.strip(): + raise ValueError("reason is required") + return _enqueue_action( + db_path=db_path, + kind="arm_kill", + payload={"reason": reason.strip()}, + ) + + +def enqueue_disarm_kill( + *, reason: str, db_path: Path | str = DEFAULT_DB_PATH +) -> int: + """Queue a ``disarm_kill`` action for the engine consumer.""" + if not reason or not reason.strip(): + raise ValueError("reason is required") + return _enqueue_action( + db_path=db_path, + kind="disarm_kill", + payload={"reason": reason.strip()}, + ) + + +def load_pending_manual_actions( + *, db_path: Path | str = DEFAULT_DB_PATH +) -> list[ManualAction]: + """All unconsumed actions, oldest first (used for the pending strip).""" + db_path = Path(db_path) + if not db_path.exists(): + return [] + conn = connect(db_path) + try: + rows = conn.execute( + "SELECT * FROM manual_actions WHERE consumed_at IS NULL " + "ORDER BY created_at ASC" + ).fetchall() + finally: + conn.close() + + return [_row_to_manual(row) for row in rows] + + def load_audit_tail( *, audit_path: Path | str = DEFAULT_AUDIT_PATH, diff --git a/src/cerbero_bite/gui/pages/1_📊_Status.py b/src/cerbero_bite/gui/pages/1_📊_Status.py index d4d16df..c0e237b 100644 --- a/src/cerbero_bite/gui/pages/1_📊_Status.py +++ b/src/cerbero_bite/gui/pages/1_📊_Status.py @@ -10,10 +10,14 @@ import streamlit as st from cerbero_bite.gui.data_layer import ( DEFAULT_AUDIT_PATH, DEFAULT_DB_PATH, + EngineSnapshot, + enqueue_arm_kill, + enqueue_disarm_kill, humanize_age, humanize_dt, load_engine_snapshot, load_open_positions, + load_pending_manual_actions, ) @@ -31,6 +35,74 @@ _HEALTH_COLORS = { "unknown": ("⚪", "info"), } +_TYPED_PHRASE = "yes I am sure" + + +def _render_kill_switch_panel(db_path: Path, snap: EngineSnapshot) -> None: + st.subheader("Kill switch controls") + + if snap.kill_switch_armed: + st.warning( + "Kill switch is **armed**. Disarming queues a `disarm_kill` " + "action; the engine consumer applies it on the next minute " + "tick and the transition is recorded in the audit chain." + ) + with st.form("kill_disarm_form", clear_on_submit=True): + reason = st.text_input( + "Reason (required)", + placeholder="e.g. macro window passed", + ) + confirm = st.text_input( + f"Type `{_TYPED_PHRASE}` to confirm", + placeholder=_TYPED_PHRASE, + ) + submitted = st.form_submit_button( + "🟢 Queue disarm", + type="primary", + use_container_width=True, + ) + if submitted: + if confirm.strip() != _TYPED_PHRASE: + st.error(f"Type exactly `{_TYPED_PHRASE}` to confirm.") + elif not reason.strip(): + st.error("Reason is required.") + else: + aid = enqueue_disarm_kill(reason=reason, db_path=db_path) + st.success( + f"✅ disarm queued (id #{aid}). " + "The engine will pick it up within ~1 minute." + ) + else: + st.info( + "Kill switch is **disarmed**. Arming queues an `arm_kill` " + "action; the engine consumer applies it on the next minute tick." + ) + with st.form("kill_arm_form", clear_on_submit=True): + reason = st.text_input( + "Reason (required)", + placeholder="e.g. macro shock — pause trading", + ) + confirm = st.text_input( + f"Type `{_TYPED_PHRASE}` to confirm", + placeholder=_TYPED_PHRASE, + ) + submitted = st.form_submit_button( + "🔴 Queue arm", + type="secondary", + use_container_width=True, + ) + if submitted: + if confirm.strip() != _TYPED_PHRASE: + st.error(f"Type exactly `{_TYPED_PHRASE}` to confirm.") + elif not reason.strip(): + st.error("Reason is required.") + else: + aid = enqueue_arm_kill(reason=reason, db_path=db_path) + st.success( + f"✅ arm queued (id #{aid}). " + "The engine will pick it up within ~1 minute." + ) + def render() -> None: st.title("📊 Status") @@ -54,8 +126,7 @@ def render() -> None: st.error( f"**Kill switch armed** — engine will refuse new entries.\n\n" f"- reason: `{snap.kill_reason or '—'}`\n" - f"- since: `{humanize_dt(snap.kill_at)}`\n\n" - "Disarm via CLI: `cerbero-bite kill-switch disarm --reason ''`" + f"- since: `{humanize_dt(snap.kill_at)}`" ) # Top metrics @@ -69,6 +140,31 @@ def render() -> None: st.divider() + # Kill switch controls + _render_kill_switch_panel(db_path, snap) + + st.divider() + + # Pending manual actions + pending = load_pending_manual_actions(db_path=db_path) + if pending: + st.subheader("Pending manual actions") + st.caption( + "Queued from this dashboard, not yet consumed. The engine " + "drains the queue every minute via the `manual_actions` job." + ) + rows_pending = [ + { + "id": a.id, + "kind": a.kind, + "payload": a.payload_json or "", + "created_at": humanize_dt(a.created_at), + } + for a in pending + ] + st.dataframe(rows_pending, use_container_width=True, hide_index=True) + st.divider() + # Audit anchor st.subheader("Audit anchor") if snap.last_audit_hash is None: diff --git a/src/cerbero_bite/runtime/manual_actions_consumer.py b/src/cerbero_bite/runtime/manual_actions_consumer.py new file mode 100644 index 0000000..53c5d07 --- /dev/null +++ b/src/cerbero_bite/runtime/manual_actions_consumer.py @@ -0,0 +1,114 @@ +"""Consumer of the ``manual_actions`` queue. + +The GUI (and other out-of-band tooling) records operator intent in the +SQLite ``manual_actions`` table; this consumer pulls those rows and +dispatches them through the same primitives the engine uses internally +(``KillSwitch.arm`` / ``disarm``) so the audit chain remains the single +source of truth for state transitions. + +Currently supported kinds: + +* ``arm_kill`` — payload ``{"reason": str}``; arms the kill switch. +* ``disarm_kill`` — payload ``{"reason": str}``; disarms it. + +Future kinds (``force_close``, ``approve_proposal``, +``reject_proposal``) are recognised by the ``ManualAction`` schema but +not yet wired up — the consumer marks them as +``result="not_supported"`` so they don't sit in the queue forever. +""" + +from __future__ import annotations + +import json +import logging +from datetime import UTC, datetime +from typing import TYPE_CHECKING + +from cerbero_bite.safety.kill_switch import KillSwitchError +from cerbero_bite.state import connect, transaction + +if TYPE_CHECKING: + from cerbero_bite.runtime.dependencies import RuntimeContext + +__all__ = ["consume_manual_actions"] + + +_log = logging.getLogger("cerbero_bite.runtime.manual_actions") +_CONSUMER_ID = "engine" + + +def _parse_payload(raw: str | None) -> dict[str, object]: + if not raw: + return {} + try: + parsed = json.loads(raw) + except (TypeError, ValueError): + return {} + return parsed if isinstance(parsed, dict) else {} + + +async def consume_manual_actions( + ctx: RuntimeContext, *, now: datetime | None = None +) -> int: + """Drain the queue. Return the number of actions processed. + + The function is synchronous at heart (SQLite + KillSwitch), but kept + ``async def`` so the orchestrator can register it as an APScheduler + coroutine without an extra wrapper. Each iteration fetches the next + unconsumed row and processes it; the loop terminates when the queue + is empty so a single tick can catch up after a long pause. + """ + reference = (now or datetime.now(UTC)).astimezone(UTC) + processed = 0 + + while True: + conn = connect(ctx.db_path) + try: + action = ctx.repository.next_unconsumed_action(conn) + finally: + conn.close() + if action is None: + break + if action.id is None: + _log.warning("manual_action without id, skipping") + break + + payload = _parse_payload(action.payload_json) + result = "ok" + + try: + if action.kind == "arm_kill": + reason = str(payload.get("reason", "manual via GUI")) + ctx.kill_switch.arm(reason=reason, source="manual_gui") + elif action.kind == "disarm_kill": + reason = str(payload.get("reason", "manual via GUI")) + ctx.kill_switch.disarm(reason=reason, source="manual_gui") + else: + result = "not_supported" + _log.warning( + "manual_action kind=%s not supported yet", action.kind + ) + except KillSwitchError as exc: + _log.exception("kill switch transition failed") + result = f"error: {type(exc).__name__}: {exc}" + except Exception as exc: # pragma: no cover — defensive + _log.exception("manual_action dispatch failed") + result = f"error: {type(exc).__name__}: {exc}" + + conn = connect(ctx.db_path) + try: + with transaction(conn): + ctx.repository.mark_action_consumed( + conn, + action.id, + consumed_by=_CONSUMER_ID, + result=result, + now=reference, + ) + finally: + conn.close() + processed += 1 + + if processed: + _log.info("processed %d manual_actions", processed) + return processed diff --git a/src/cerbero_bite/runtime/orchestrator.py b/src/cerbero_bite/runtime/orchestrator.py index 4cf38a0..002d407 100644 --- a/src/cerbero_bite/runtime/orchestrator.py +++ b/src/cerbero_bite/runtime/orchestrator.py @@ -28,6 +28,7 @@ from cerbero_bite.runtime.dependencies import RuntimeContext, build_runtime from cerbero_bite.runtime.entry_cycle import EntryCycleResult, run_entry_cycle from cerbero_bite.runtime.health_check import HealthCheck, HealthCheckResult from cerbero_bite.runtime.lockfile import EngineLock +from cerbero_bite.runtime.manual_actions_consumer import consume_manual_actions from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle from cerbero_bite.runtime.recovery import recover_state from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler @@ -45,6 +46,7 @@ _CRON_ENTRY = "0 14 * * MON" _CRON_MONITOR = "0 2,14 * * *" _CRON_HEALTH = "*/5 * * * *" _CRON_BACKUP = "0 * * * *" +_CRON_MANUAL_ACTIONS = "*/1 * * * *" _BACKUP_RETENTION_DAYS = 30 @@ -191,6 +193,7 @@ class Orchestrator: monitor_cron: str = _CRON_MONITOR, health_cron: str = _CRON_HEALTH, backup_cron: str = _CRON_BACKUP, + manual_actions_cron: str = _CRON_MANUAL_ACTIONS, backup_dir: Path | None = None, backup_retention_days: int = _BACKUP_RETENTION_DAYS, ) -> AsyncIOScheduler: @@ -229,12 +232,23 @@ class Orchestrator: await _safe("backup", _do) + async def _manual_actions() -> None: + async def _do() -> None: + await consume_manual_actions(self._ctx) + + await _safe("manual_actions", _do) + self._scheduler = build_scheduler( [ JobSpec(name="entry", cron=entry_cron, coro_factory=_entry), JobSpec(name="monitor", cron=monitor_cron, coro_factory=_monitor), JobSpec(name="health", cron=health_cron, coro_factory=_health), JobSpec(name="backup", cron=backup_cron, coro_factory=_backup), + JobSpec( + name="manual_actions", + cron=manual_actions_cron, + coro_factory=_manual_actions, + ), ] ) return self._scheduler diff --git a/tests/integration/test_orchestrator.py b/tests/integration/test_orchestrator.py index ae71c55..5659fab 100644 --- a/tests/integration/test_orchestrator.py +++ b/tests/integration/test_orchestrator.py @@ -114,4 +114,4 @@ def test_install_scheduler_registers_canonical_jobs(tmp_path: Path) -> None: orch = _build_orch(tmp_path) sched = orch.install_scheduler() job_ids = {j.id for j in sched.get_jobs()} - assert job_ids == {"entry", "monitor", "health", "backup"} + assert job_ids == {"entry", "monitor", "health", "backup", "manual_actions"} diff --git a/tests/unit/test_manual_actions_consumer.py b/tests/unit/test_manual_actions_consumer.py new file mode 100644 index 0000000..7e88a85 --- /dev/null +++ b/tests/unit/test_manual_actions_consumer.py @@ -0,0 +1,159 @@ +"""Tests for runtime.manual_actions_consumer.""" + +from __future__ import annotations + +import json +from datetime import UTC, datetime +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +from cerbero_bite.runtime.manual_actions_consumer import consume_manual_actions +from cerbero_bite.safety.audit_log import AuditLog +from cerbero_bite.safety.kill_switch import KillSwitch, KillSwitchError +from cerbero_bite.state import Repository, connect, run_migrations, transaction +from cerbero_bite.state.models import ManualAction + + +def _now() -> datetime: + return datetime(2026, 4, 30, 12, 0, tzinfo=UTC) + + +def _ctx(tmp_path: Path): + db_path = tmp_path / "state.sqlite" + audit_path = tmp_path / "audit.log" + + repo = Repository() + conn = connect(db_path) + run_migrations(conn) + with transaction(conn): + repo.init_system_state(conn, config_version="1.0.0", now=_now()) + conn.close() + + audit = AuditLog(audit_path) + ks = KillSwitch( + connection_factory=lambda: connect(db_path), + repository=repo, + audit_log=audit, + clock=_now, + ) + + ctx = MagicMock() + ctx.db_path = db_path + ctx.repository = repo + ctx.kill_switch = ks + ctx.audit_log = audit + return ctx + + +def _enqueue(ctx, kind: str, payload: dict[str, object]) -> int: + conn = connect(ctx.db_path) + try: + with transaction(conn): + return ctx.repository.enqueue_manual_action( + conn, + ManualAction( + kind=kind, # type: ignore[arg-type] + payload_json=json.dumps(payload), + created_at=_now(), + ), + ) + finally: + conn.close() + + +def _fetch_action(ctx, action_id: int): + conn = connect(ctx.db_path) + try: + row = conn.execute( + "SELECT consumed_at, consumed_by, result FROM manual_actions WHERE id = ?", + (action_id,), + ).fetchone() + finally: + conn.close() + return row + + +@pytest.mark.asyncio +async def test_arm_kill_arms_kill_switch(tmp_path: Path) -> None: + ctx = _ctx(tmp_path) + aid = _enqueue(ctx, "arm_kill", {"reason": "GUI typed yes"}) + assert ctx.kill_switch.is_armed() is False + + n = await consume_manual_actions(ctx, now=_now()) + assert n == 1 + assert ctx.kill_switch.is_armed() is True + + row = _fetch_action(ctx, aid) + assert row["consumed_by"] == "engine" + assert row["result"] == "ok" + assert row["consumed_at"] is not None + + +@pytest.mark.asyncio +async def test_disarm_kill_disarms_kill_switch(tmp_path: Path) -> None: + ctx = _ctx(tmp_path) + ctx.kill_switch.arm(reason="prior", source="manual") + assert ctx.kill_switch.is_armed() is True + + aid = _enqueue(ctx, "disarm_kill", {"reason": "operator override"}) + n = await consume_manual_actions(ctx, now=_now()) + assert n == 1 + assert ctx.kill_switch.is_armed() is False + row = _fetch_action(ctx, aid) + assert row["result"] == "ok" + + +@pytest.mark.asyncio +async def test_consumer_drains_queue(tmp_path: Path) -> None: + ctx = _ctx(tmp_path) + _enqueue(ctx, "arm_kill", {"reason": "first"}) + _enqueue(ctx, "disarm_kill", {"reason": "second"}) + _enqueue(ctx, "arm_kill", {"reason": "third"}) + + n = await consume_manual_actions(ctx, now=_now()) + assert n == 3 + assert ctx.kill_switch.is_armed() is True + + +@pytest.mark.asyncio +async def test_unsupported_kind_marked_not_supported(tmp_path: Path) -> None: + ctx = _ctx(tmp_path) + aid = _enqueue(ctx, "force_close", {"proposal_id": "abc"}) + n = await consume_manual_actions(ctx, now=_now()) + assert n == 1 + row = _fetch_action(ctx, aid) + assert row["result"] == "not_supported" + + +@pytest.mark.asyncio +async def test_missing_payload_uses_default_reason(tmp_path: Path) -> None: + ctx = _ctx(tmp_path) + _enqueue(ctx, "arm_kill", {}) + n = await consume_manual_actions(ctx, now=_now()) + assert n == 1 + assert ctx.kill_switch.is_armed() is True + + +@pytest.mark.asyncio +async def test_kill_switch_error_caught_and_recorded(tmp_path: Path) -> None: + ctx = _ctx(tmp_path) + # Replace the kill switch with one whose arm raises. + bad_ks = MagicMock() + bad_ks.arm.side_effect = KillSwitchError("simulated") + bad_ks.is_armed.return_value = False + ctx.kill_switch = bad_ks + + aid = _enqueue(ctx, "arm_kill", {"reason": "x"}) + n = await consume_manual_actions(ctx, now=_now()) + assert n == 1 + row = _fetch_action(ctx, aid) + assert "KillSwitchError" in (row["result"] or "") + + +@pytest.mark.asyncio +async def test_empty_queue_returns_zero(tmp_path: Path) -> None: + ctx = _ctx(tmp_path) + n = await consume_manual_actions(ctx, now=_now()) + assert n == 0