Cerbero-Bite/src/cerbero_bite/runtime/orchestrator.py
Adriano e8345a29c8 feat(gui+runtime): Phase D — kill-switch arm/disarm from the dashboard
Wires the GUI's first write path through the manual_actions queue:

* runtime/manual_actions_consumer.py — drains the queue and
  dispatches arm_kill / disarm_kill via KillSwitch (preserving the
  audit chain). Unsupported kinds (force_close, approve/reject_proposal)
  are marked result="not_supported" so they don't sit forever.
* runtime/orchestrator.py — adds a `manual_actions` job at */1 cron
  to the canonical scheduler manifest.
* gui/data_layer.py — write helpers enqueue_arm_kill /
  enqueue_disarm_kill (the only write path the GUI uses) plus
  load_pending_manual_actions for the pending strip.
* gui/pages/1_📊_Status.py — kill-switch arm/disarm panel with typed
  confirmation ("yes I am sure") + reason field; pending-actions table
  rendered when the queue is non-empty.

End-to-end smoke against the testnet state.sqlite:
  GUI enqueue → consumer dispatch → KillSwitch transition → audit
  chain hash linkage holds, "source":"manual_gui" recorded.
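
A rough sketch of that round-trip (illustrative only; the module paths are
inferred from the file list above and the helper signatures are assumptions):

  # GUI side: the Status page enqueues the action into manual_actions.
  from cerbero_bite.gui.data_layer import enqueue_arm_kill
  enqueue_arm_kill(reason="operator-initiated arm")  # argument name assumed

  # Engine side: the */1 scheduler job drains the queue and dispatches
  # arm_kill / disarm_kill through KillSwitch, keeping the audit chain intact.
  from cerbero_bite.runtime.manual_actions_consumer import consume_manual_actions
  await consume_manual_actions(ctx)  # ctx: the engine's RuntimeContext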

7 new unit tests for the consumer (arm, disarm, drain, unsupported,
default-reason, KillSwitchError handling, empty queue); 360/360 pass.
ruff clean; mypy strict src clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 12:33:58 +02:00

360 lines
13 KiB
Python

"""Façade that ties the runtime modules into a runnable engine.
The :class:`Orchestrator` is the single entry point for the CLI: it
holds the :class:`RuntimeContext`, the :class:`HealthCheck` state, and
the boot procedure (recover + boot environment check + scheduler
arming). Every concrete cycle is delegated to its own module so each
piece stays independently testable.
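
Example (a minimal sketch; assumes the CLI has already loaded ``cfg``,
``endpoints``, ``token`` and the two paths, and chosen an EUR→USD rate)::

    orch = make_orchestrator(
        cfg=cfg,
        endpoints=endpoints,
        token=token,
        db_path=db_path,
        audit_path=audit_path,
        expected_environment="testnet",
        eur_to_usd=Decimal("1.08"),
    )
    asyncio.run(orch.run_forever())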
"""
from __future__ import annotations

import asyncio
import logging
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from datetime import UTC, datetime
from decimal import Decimal
from pathlib import Path
from typing import Literal
from uuid import uuid4

import structlog
from apscheduler.schedulers.asyncio import AsyncIOScheduler

from cerbero_bite.config.mcp_endpoints import McpEndpoints
from cerbero_bite.config.schema import StrategyConfig
from cerbero_bite.runtime.dependencies import RuntimeContext, build_runtime
from cerbero_bite.runtime.entry_cycle import EntryCycleResult, run_entry_cycle
from cerbero_bite.runtime.health_check import HealthCheck, HealthCheckResult
from cerbero_bite.runtime.lockfile import EngineLock
from cerbero_bite.runtime.manual_actions_consumer import consume_manual_actions
from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle
from cerbero_bite.runtime.recovery import recover_state
from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler
from cerbero_bite.state import connect as connect_state

__all__ = ["Orchestrator"]

_log = logging.getLogger("cerbero_bite.runtime.orchestrator")

Environment = Literal["testnet", "mainnet"]

# Default cron schedule (matches docs/06-operational-flow.md table).
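# In plain terms: entry on Mondays at 14:00, monitor at 02:00 and 14:00 daily,
# health every 5 minutes, backup hourly, manual actions drained every minute.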
_CRON_ENTRY = "0 14 * * MON"
_CRON_MONITOR = "0 2,14 * * *"
_CRON_HEALTH = "*/5 * * * *"
_CRON_BACKUP = "0 * * * *"
_CRON_MANUAL_ACTIONS = "*/1 * * * *"
_BACKUP_RETENTION_DAYS = 30


@dataclass(frozen=True)
class _BootResult:
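    """Boot outcome: detected Deribit environment plus the first health probe."""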
    environment: Environment
    health: HealthCheckResult


class Orchestrator:
    """Engine façade — boot, scheduler, manual cycle invocation."""

    def __init__(
        self,
        ctx: RuntimeContext,
        *,
        expected_environment: Environment,
        eur_to_usd: Decimal,
    ) -> None:
        self._ctx = ctx
        self._expected_env = expected_environment
        self._eur_to_usd = eur_to_usd
        self._health = HealthCheck(ctx, expected_environment=expected_environment)
        self._scheduler: AsyncIOScheduler | None = None

    @property
    def context(self) -> RuntimeContext:
        return self._ctx

    @property
    def expected_environment(self) -> Environment:
        return self._expected_env

    # ------------------------------------------------------------------
    # Boot
    # ------------------------------------------------------------------
    async def boot(self) -> _BootResult:
        """Reconcile state, verify environment, run a first health probe."""
        when = self._ctx.clock()
        await self._verify_audit_anchor(now=when)
        await recover_state(self._ctx, now=when)
        info = await self._ctx.deribit.environment_info()
        if info.environment != self._expected_env:
            await self._ctx.alert_manager.critical(
                source="orchestrator.boot",
                message=(
                    f"Deribit environment mismatch at boot: expected "
                    f"{self._expected_env}, got {info.environment}"
                ),
                component="runtime.orchestrator",
            )
        health = await self._health.run(now=when)
        self._ctx.audit_log.append(
            event="ENGINE_START",
            payload={
                "environment": info.environment,
                "health": health.state,
                "config_version": self._ctx.cfg.config_version,
            },
            now=when,
        )
        return _BootResult(environment=info.environment, health=health)

    # ------------------------------------------------------------------
    # Cycle invocations (used by scheduler jobs and CLI dry-run)
    # ------------------------------------------------------------------
    async def _verify_audit_anchor(self, *, now: datetime) -> None:  # noqa: ARG002
        """Compare the audit log tail with the SQLite anchor.

        ``now`` is accepted for symmetry with the other ``boot``
        helpers but unused: the comparison is purely between the
        in-memory tail hash and the value persisted on the previous
        run.
        """
        conn = connect_state(self._ctx.db_path)
        try:
            state = self._ctx.repository.get_system_state(conn)
        finally:
            conn.close()
        if state is None or state.last_audit_hash is None:
            return  # first boot, nothing to compare against
        actual_tail = self._ctx.audit_log.last_hash
        if actual_tail != state.last_audit_hash:
            await self._ctx.alert_manager.critical(
                source="orchestrator.boot",
                message=(
                    f"audit log anchor mismatch: anchor="
                    f"{state.last_audit_hash[:12]}…, file tail="
                    f"{actual_tail[:12]}… — possible tampering or truncation"
                ),
                component="safety.audit_log",
            )

    async def run_entry(
        self, *, now: datetime | None = None
    ) -> EntryCycleResult:
        cycle_id = str(uuid4())
        token = structlog.contextvars.bind_contextvars(
            cycle="entry", cycle_id=cycle_id
        )
        try:
            return await run_entry_cycle(
                self._ctx, eur_to_usd_rate=self._eur_to_usd, now=now
            )
        finally:
            structlog.contextvars.reset_contextvars(**token)

    async def run_monitor(
        self, *, now: datetime | None = None
    ) -> MonitorCycleResult:
        cycle_id = str(uuid4())
        token = structlog.contextvars.bind_contextvars(
            cycle="monitor", cycle_id=cycle_id
        )
        try:
            return await run_monitor_cycle(self._ctx, now=now)
        finally:
            structlog.contextvars.reset_contextvars(**token)

    async def run_health(
        self, *, now: datetime | None = None
    ) -> HealthCheckResult:
        cycle_id = str(uuid4())
        token = structlog.contextvars.bind_contextvars(
            cycle="health", cycle_id=cycle_id
        )
        try:
            return await self._health.run(now=now)
        finally:
            structlog.contextvars.reset_contextvars(**token)

    # ------------------------------------------------------------------
    # Scheduler lifecycle
    # ------------------------------------------------------------------
    def install_scheduler(
        self,
        *,
        entry_cron: str = _CRON_ENTRY,
        monitor_cron: str = _CRON_MONITOR,
        health_cron: str = _CRON_HEALTH,
        backup_cron: str = _CRON_BACKUP,
        manual_actions_cron: str = _CRON_MANUAL_ACTIONS,
        backup_dir: Path | None = None,
        backup_retention_days: int = _BACKUP_RETENTION_DAYS,
    ) -> AsyncIOScheduler:
        """Build the scheduler with the canonical job set, ready to start."""

        async def _safe(name: str, coro_factory: Callable[[], Awaitable[object]]) -> None:
            try:
                await coro_factory()
            except Exception as exc:  # never let a tick kill the scheduler
                _log.exception("scheduler tick %s raised", name)
                await self._ctx.alert_manager.critical(
                    source=f"scheduler.{name}",
                    message=f"{type(exc).__name__}: {exc}",
                    component=f"runtime.{name}",
                )

        async def _entry() -> None:
            await _safe("entry", self.run_entry)

        async def _monitor() -> None:
            await _safe("monitor", self.run_monitor)

        async def _health() -> None:
            await _safe("health", self.run_health)

        backups_target = backup_dir or self._ctx.db_path.parent / "backups"

        async def _backup() -> None:
            async def _do() -> None:
                await asyncio.to_thread(
                    _run_backup,
                    db_path=self._ctx.db_path,
                    backup_dir=backups_target,
                    retention_days=backup_retention_days,
                )

            await _safe("backup", _do)

        async def _manual_actions() -> None:
            async def _do() -> None:
                await consume_manual_actions(self._ctx)

            await _safe("manual_actions", _do)

        self._scheduler = build_scheduler(
            [
                JobSpec(name="entry", cron=entry_cron, coro_factory=_entry),
                JobSpec(name="monitor", cron=monitor_cron, coro_factory=_monitor),
                JobSpec(name="health", cron=health_cron, coro_factory=_health),
                JobSpec(name="backup", cron=backup_cron, coro_factory=_backup),
                JobSpec(
                    name="manual_actions",
                    cron=manual_actions_cron,
                    coro_factory=_manual_actions,
                ),
            ]
        )
        return self._scheduler

    async def run_forever(self, *, lock_path: Path | None = None) -> None:
        """Boot, acquire the single-instance lock, install the scheduler.

        ``lock_path`` defaults to ``<db_path.parent>/.lockfile`` so two
        containers cannot trade against the same SQLite file. SIGTERM
        and SIGINT are intercepted so Docker (or the operator) can
        signal a clean shutdown — the scheduler is stopped, in-flight
        cycles complete, the audit log fsyncs, and the HTTP client is
        closed before the process exits.
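
        Typically driven from the CLI entry point as
        ``asyncio.run(orchestrator.run_forever())`` (the exact CLI wiring is
        assumed here; it lives outside this module).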
"""
import signal # noqa: PLC0415 — only needed by run_forever
lock = EngineLock(
lock_path or self._ctx.db_path.parent / ".lockfile"
)
with lock:
try:
await self.boot()
scheduler = self.install_scheduler()
scheduler.start()
stop_event = asyncio.Event()
def _on_signal(signame: str) -> None:
_log.info("received %s — initiating shutdown", signame)
stop_event.set()
loop = asyncio.get_running_loop()
for sig_name in ("SIGTERM", "SIGINT"):
sig = getattr(signal, sig_name, None)
if sig is None: # pragma: no cover — Windows fallback
continue
try:
loop.add_signal_handler(
sig, _on_signal, sig_name
)
                    except NotImplementedError:  # pragma: no cover
                        # Some sandboxes (Windows asyncio) don't support
                        # add_signal_handler; fall back to signal.signal().
                        signal.signal(sig, lambda *_: stop_event.set())
                try:
                    await stop_event.wait()
                finally:
                    scheduler.shutdown(wait=False)
            finally:
                await self._ctx.aclose()


def _run_backup(
    *, db_path: Path, backup_dir: Path, retention_days: int
) -> None:
    """Synchronous helper invoked from the scheduler via ``asyncio.to_thread``.

    Keeps the import of ``scripts.backup`` lazy: importing the module
    eagerly at orchestrator load time would mean the scheduler depends
    on a script that lives outside the ``cerbero_bite`` package, which
    breaks ``importlib.util.spec_from_file_location`` if the cwd shifts
    at runtime.
    """
    import sys  # noqa: PLC0415 — kept lazy to keep module load cheap
    from importlib.util import (  # noqa: PLC0415
        module_from_spec,
        spec_from_file_location,
    )

    backup_py = Path(__file__).resolve().parents[3] / "scripts" / "backup.py"
    spec = spec_from_file_location("_cerbero_bite_backup", backup_py)
    if spec is None or spec.loader is None:  # pragma: no cover — only on broken installs
        raise RuntimeError(f"cannot load scripts/backup.py from {backup_py}")
    module = module_from_spec(spec)
    sys.modules.setdefault(spec.name, module)
    spec.loader.exec_module(module)
    module.backup_database(db_path=db_path, backup_dir=backup_dir)
    module.prune_backups(backup_dir, retention_days=retention_days)


# ---------------------------------------------------------------------------
# Convenience builder for the CLI
# ---------------------------------------------------------------------------
def make_orchestrator(
    *,
    cfg: StrategyConfig,
    endpoints: McpEndpoints,
    token: str,
    db_path: Path,
    audit_path: Path,
    expected_environment: Environment,
    eur_to_usd: Decimal,
    clock: Callable[[], datetime] | None = None,
) -> Orchestrator:
    """Build a fresh :class:`Orchestrator` ready for ``boot``/``run_*``."""
    ctx = build_runtime(
        cfg=cfg,
        endpoints=endpoints,
        token=token,
        db_path=db_path,
        audit_path=audit_path,
        clock=clock or (lambda: datetime.now(UTC)),
    )
    return Orchestrator(
        ctx, expected_environment=expected_environment, eur_to_usd=eur_to_usd
    )