Files
Cerbero-Bite/tests/unit/test_manual_actions_consumer.py
Adriano 63d1aa4262 feat(gui): traduzione italiana, logo Cerbero, saldi live e Forza ciclo
* Localizzazione italiana di tutte le pagine (Stato, Audit, Equity,
  Storico, Posizione) e della home; date relative ("5s fa", "12m fa").
* Logo Cerbero (cane a tre teste) in src/cerbero_bite/gui/assets/
  cerbero_logo.png — sostituisce l'emoji 🐺 (lupo, semanticamente
  errata) sia come favicon (`page_icon`) sia in sidebar e header.
* Caricamento automatico di `.env` dal CWD all'avvio della CLI (skip
  sotto pytest tramite PYTEST_CURRENT_TEST), evitando di doversi
  esportare manualmente le 4 URL MCP. Aggiunto python-dotenv come
  dipendenza, `.env.example` committato come template, `.env` resta
  ignorato da git.
* Pagina Stato: nuovo pannello "Saldi exchange" che fa fetch live
  via gateway MCP (Deribit USDC + USDT, Hyperliquid USDC + opzionale
  USDT spot) con cache TTL 60s e bottone refresh; tile riassuntivi
  totale USD / EUR / cambio.
* Pagina Stato: nuovo pannello "Forza ciclo" con tre bottoni
  (entry/monitor/health) che accodano azioni `run_cycle` nella tabella
  manual_actions; il consumer dell'engine — quando in esecuzione —
  dispatcha al `Orchestrator.run_*` corrispondente.
* manual_actions: nuovo `kind="run_cycle"` nello schema
  ManualAction; consumer accetta dict di cycle_runners che
  l'orchestrator popola in install_scheduler. 3 nuovi test (dispatch
  entry, ciclo sconosciuto, fallback senza runner).
* gui/live_data.py — modulo dedicato al fetch MCP dalla GUI
  (relax controllato della regola "no MCP from GUI" solo per i saldi,
  non per i dati di trading).

363/363 tests pass; ruff clean; mypy strict src clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 14:11:40 +02:00

206 lines
5.9 KiB
Python

"""Tests for runtime.manual_actions_consumer."""
from __future__ import annotations
import json
from datetime import UTC, datetime
from pathlib import Path
from unittest.mock import MagicMock
import pytest
from cerbero_bite.runtime.manual_actions_consumer import consume_manual_actions
from cerbero_bite.safety.audit_log import AuditLog
from cerbero_bite.safety.kill_switch import KillSwitch, KillSwitchError
from cerbero_bite.state import Repository, connect, run_migrations, transaction
from cerbero_bite.state.models import ManualAction
def _now() -> datetime:
return datetime(2026, 4, 30, 12, 0, tzinfo=UTC)
def _ctx(tmp_path: Path):
db_path = tmp_path / "state.sqlite"
audit_path = tmp_path / "audit.log"
repo = Repository()
conn = connect(db_path)
run_migrations(conn)
with transaction(conn):
repo.init_system_state(conn, config_version="1.0.0", now=_now())
conn.close()
audit = AuditLog(audit_path)
ks = KillSwitch(
connection_factory=lambda: connect(db_path),
repository=repo,
audit_log=audit,
clock=_now,
)
ctx = MagicMock()
ctx.db_path = db_path
ctx.repository = repo
ctx.kill_switch = ks
ctx.audit_log = audit
return ctx
def _enqueue(ctx, kind: str, payload: dict[str, object]) -> int:
conn = connect(ctx.db_path)
try:
with transaction(conn):
return ctx.repository.enqueue_manual_action(
conn,
ManualAction(
kind=kind, # type: ignore[arg-type]
payload_json=json.dumps(payload),
created_at=_now(),
),
)
finally:
conn.close()
def _fetch_action(ctx, action_id: int):
conn = connect(ctx.db_path)
try:
row = conn.execute(
"SELECT consumed_at, consumed_by, result FROM manual_actions WHERE id = ?",
(action_id,),
).fetchone()
finally:
conn.close()
return row
@pytest.mark.asyncio
async def test_arm_kill_arms_kill_switch(tmp_path: Path) -> None:
ctx = _ctx(tmp_path)
aid = _enqueue(ctx, "arm_kill", {"reason": "GUI typed yes"})
assert ctx.kill_switch.is_armed() is False
n = await consume_manual_actions(ctx, now=_now())
assert n == 1
assert ctx.kill_switch.is_armed() is True
row = _fetch_action(ctx, aid)
assert row["consumed_by"] == "engine"
assert row["result"] == "ok"
assert row["consumed_at"] is not None
@pytest.mark.asyncio
async def test_disarm_kill_disarms_kill_switch(tmp_path: Path) -> None:
ctx = _ctx(tmp_path)
ctx.kill_switch.arm(reason="prior", source="manual")
assert ctx.kill_switch.is_armed() is True
aid = _enqueue(ctx, "disarm_kill", {"reason": "operator override"})
n = await consume_manual_actions(ctx, now=_now())
assert n == 1
assert ctx.kill_switch.is_armed() is False
row = _fetch_action(ctx, aid)
assert row["result"] == "ok"
@pytest.mark.asyncio
async def test_consumer_drains_queue(tmp_path: Path) -> None:
ctx = _ctx(tmp_path)
_enqueue(ctx, "arm_kill", {"reason": "first"})
_enqueue(ctx, "disarm_kill", {"reason": "second"})
_enqueue(ctx, "arm_kill", {"reason": "third"})
n = await consume_manual_actions(ctx, now=_now())
assert n == 3
assert ctx.kill_switch.is_armed() is True
@pytest.mark.asyncio
async def test_unsupported_kind_marked_not_supported(tmp_path: Path) -> None:
ctx = _ctx(tmp_path)
aid = _enqueue(ctx, "force_close", {"proposal_id": "abc"})
n = await consume_manual_actions(ctx, now=_now())
assert n == 1
row = _fetch_action(ctx, aid)
assert row["result"] == "not_supported"
@pytest.mark.asyncio
async def test_missing_payload_uses_default_reason(tmp_path: Path) -> None:
ctx = _ctx(tmp_path)
_enqueue(ctx, "arm_kill", {})
n = await consume_manual_actions(ctx, now=_now())
assert n == 1
assert ctx.kill_switch.is_armed() is True
@pytest.mark.asyncio
async def test_kill_switch_error_caught_and_recorded(tmp_path: Path) -> None:
ctx = _ctx(tmp_path)
# Replace the kill switch with one whose arm raises.
bad_ks = MagicMock()
bad_ks.arm.side_effect = KillSwitchError("simulated")
bad_ks.is_armed.return_value = False
ctx.kill_switch = bad_ks
aid = _enqueue(ctx, "arm_kill", {"reason": "x"})
n = await consume_manual_actions(ctx, now=_now())
assert n == 1
row = _fetch_action(ctx, aid)
assert "KillSwitchError" in (row["result"] or "")
@pytest.mark.asyncio
async def test_empty_queue_returns_zero(tmp_path: Path) -> None:
ctx = _ctx(tmp_path)
n = await consume_manual_actions(ctx, now=_now())
assert n == 0
@pytest.mark.asyncio
async def test_run_cycle_dispatches_to_runner(tmp_path: Path) -> None:
ctx = _ctx(tmp_path)
calls: list[str] = []
async def _entry() -> None:
calls.append("entry")
aid = _enqueue(ctx, "run_cycle", {"cycle": "entry"})
n = await consume_manual_actions(
ctx, cycle_runners={"entry": _entry}, now=_now()
)
assert n == 1
assert calls == ["entry"]
row = _fetch_action(ctx, aid)
assert row["result"] == "ok: ran entry"
@pytest.mark.asyncio
async def test_run_cycle_unknown_marked_error(tmp_path: Path) -> None:
ctx = _ctx(tmp_path)
async def _entry() -> None:
raise AssertionError("should not run")
aid = _enqueue(ctx, "run_cycle", {"cycle": "monitor"})
n = await consume_manual_actions(
ctx, cycle_runners={"entry": _entry}, now=_now()
)
assert n == 1
row = _fetch_action(ctx, aid)
assert "unknown cycle" in (row["result"] or "")
@pytest.mark.asyncio
async def test_run_cycle_without_runners_marks_not_supported(
tmp_path: Path,
) -> None:
ctx = _ctx(tmp_path)
aid = _enqueue(ctx, "run_cycle", {"cycle": "entry"})
n = await consume_manual_actions(ctx, now=_now())
assert n == 1
row = _fetch_action(ctx, aid)
assert row["result"] == "not_supported"