Files
Cerbero-Bite/src/cerbero_bite/runtime/orchestrator.py
T
root 6ff021fbf4 feat(strategy): abbandono gating settimanale — entry daily 24/7
Crypto opera 24/7: la cadenza settimanale lunedì-only era un retaggio
TradFi senza giustificazione. La nuova cadenza è giornaliera (cron
0 14 * * *), con i gate quantitativi a decidere se entrare o saltare.

Cambiamenti principali:

* runtime/orchestrator.py — _CRON_ENTRY 0 14 * * * (era MON)
* runtime/auto_pause.py — pause_until(days=) (era weeks=); minimo
  clamp 1 giorno (era 1 settimana)
* core/backtest.py — MondayPick→DailyPick, monday_picks→daily_picks
  (1 pick per calendar-day all'ora target); Sharpe annualization su
  ~120 trade/anno (era 52)
* config/schema.py — default cron daily; max_concurrent_positions 1→5;
  AutoPauseConfig.pause_weeks→pause_days, default 14
* runtime/option_chain_snapshot_cycle.py + orchestrator — cron */15
  per accumulo continuo dataset di backtest empirico

Strategy yamls (config_version 1.3.0 → 1.4.0, hash rigenerati):

* strategy.yaml — max_concurrent 1→5, cap_aggregate coerente
* strategy.aggressiva.yaml — max_concurrent 2→8, cap_aggregate
  3200→6400, max_contracts_per_trade invariato a 16
* strategy.conservativa.yaml — max_concurrent 1→3
* tutti — pause_weeks→pause_days: 14

GUI (pages/7_📚_Strategia.py):

* slider Trade/anno: range 20-200 (era 8-30), default 110, help
  riallineato sulla math 365 candidature × pass-rate 30-40%
* card profili: versione letta dinamicamente da config_version invece
  che hard-coded "v1.2.0"
* warning "entrambi perdono soldi" ora valuta i P/L effettivi
  (cons['annual_pl'], aggr['annual_pl']) invece del win_rate grezzo;
  aggiunto stato intermedio quando solo conservativo è in perdita

Tests (450/450 passati):

* test_auto_pause: pause_days, clamp ≥1 giorno
* test_backtest: rinomina + ridisegno daily picks (assert su
  calendar-day dedupe e hour filter)
* test_sizing_engine: other_open_positions=5 per cap default
* test_config_loader: version 1.4.0

Docs (README + 9 file in docs/) — tutti i riferimenti weekly/lunedì
allineati a daily/24-7, volume option_chain ricalcolato per cron
*/15 (~1.1 MB/giorno, ~400 MB/anno).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 16:21:16 +00:00

454 lines
16 KiB
Python

"""Façade that ties the runtime modules into a runnable engine.
The :class:`Orchestrator` is the single entry point for the CLI: it
holds the :class:`RuntimeContext`, the :class:`HealthCheck` state, and
the boot procedure (recover + boot environment check + scheduler
arming). Every concrete cycle is delegated to its own module so each
piece stays independently testable.
"""
from __future__ import annotations
import asyncio
import logging
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from datetime import UTC, datetime
from decimal import Decimal
from pathlib import Path
from typing import Literal
from uuid import uuid4
import structlog
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from cerbero_bite.config.mcp_endpoints import McpEndpoints
from cerbero_bite.config.runtime_flags import RuntimeFlags
from cerbero_bite.config.schema import StrategyConfig
from cerbero_bite.runtime.dependencies import RuntimeContext, build_runtime
from cerbero_bite.runtime.entry_cycle import EntryCycleResult, run_entry_cycle
from cerbero_bite.runtime.health_check import HealthCheck, HealthCheckResult
from cerbero_bite.runtime.lockfile import EngineLock
from cerbero_bite.runtime.manual_actions_consumer import consume_manual_actions
from cerbero_bite.runtime.market_snapshot_cycle import (
DEFAULT_ASSETS,
collect_market_snapshot,
)
from cerbero_bite.runtime.option_chain_snapshot_cycle import (
collect_option_chain_snapshot,
)
from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle
from cerbero_bite.runtime.recovery import recover_state
from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler
from cerbero_bite.state import connect as connect_state
__all__ = ["Orchestrator"]
_log = logging.getLogger("cerbero_bite.runtime.orchestrator")
Environment = Literal["testnet", "mainnet"]
# Default cron schedule (matches docs/06-operational-flow.md table).
_CRON_ENTRY = "0 14 * * *" # crypto 24/7: candidatura giornaliera; i gate decidono se entrare
_CRON_MONITOR = "0 2,14 * * *"
_CRON_HEALTH = "*/5 * * * *"
_CRON_BACKUP = "0 * * * *"
_CRON_MANUAL_ACTIONS = "*/1 * * * *"
_CRON_MARKET_SNAPSHOT = "*/15 * * * *"
_CRON_OPTION_CHAIN_SNAPSHOT = "*/15 * * * *" # crypto è 24/7: cadenza continua allineata a market_snapshot
_BACKUP_RETENTION_DAYS = 30
@dataclass(frozen=True)
class _BootResult:
    """Outcome of :meth:`Orchestrator.boot`."""

    # Environment actually reported by Deribit (may differ from the expected one;
    # a mismatch raises a critical alert during boot but does not abort).
    environment: Environment
    # Result of the first health probe run at the end of boot.
    health: HealthCheckResult
class Orchestrator:
    """Engine façade — boot, scheduler, manual cycle invocation."""

    def __init__(
        self,
        ctx: RuntimeContext,
        *,
        expected_environment: Environment,
        eur_to_usd: Decimal,
        flags: RuntimeFlags | None = None,
    ) -> None:
        self._ctx = ctx
        self._expected_env = expected_environment
        self._eur_to_usd = eur_to_usd
        # Missing flags fall back to the RuntimeFlags defaults.
        self._flags = flags or RuntimeFlags()
        self._health = HealthCheck(ctx, expected_environment=expected_environment)
        # Created lazily by install_scheduler(); None until then.
        self._scheduler: AsyncIOScheduler | None = None

    @property
    def context(self) -> RuntimeContext:
        """The shared runtime dependency container."""
        return self._ctx

    @property
    def expected_environment(self) -> Environment:
        """Environment this engine was configured to trade against."""
        return self._expected_env

    @property
    def flags(self) -> RuntimeFlags:
        """Feature flags controlling which jobs get scheduled."""
        return self._flags

    # ------------------------------------------------------------------
    # Boot
    # ------------------------------------------------------------------
    async def boot(self) -> _BootResult:
        """Reconcile state, verify environment, run a first health probe."""
        when = self._ctx.clock()
        # Tamper check first: compare audit-log tail with the SQLite anchor.
        await self._verify_audit_anchor(now=when)
        await recover_state(self._ctx, now=when)
        info = await self._ctx.deribit.environment_info()
        if info.environment != self._expected_env:
            # Alert but do not abort: the operator decides how to react.
            await self._ctx.alert_manager.critical(
                source="orchestrator.boot",
                message=(
                    f"Deribit environment mismatch at boot: expected "
                    f"{self._expected_env}, got {info.environment}"
                ),
                component="runtime.orchestrator",
            )
        health = await self._health.run(now=when)
        # Record the boot in the append-only audit log, including which
        # feature flags were active for this run.
        self._ctx.audit_log.append(
            event="ENGINE_START",
            payload={
                "environment": info.environment,
                "health": health.state,
                "config_version": self._ctx.cfg.config_version,
                "data_analysis_enabled": self._flags.data_analysis_enabled,
                "strategy_enabled": self._flags.strategy_enabled,
            },
            now=when,
        )
        _log.info(
            "engine started: env=%s health=%s data_analysis=%s strategy=%s",
            info.environment,
            health.state,
            self._flags.data_analysis_enabled,
            self._flags.strategy_enabled,
        )
        return _BootResult(environment=info.environment, health=health)

    # ------------------------------------------------------------------
    # Cycle invocations (used by scheduler jobs and CLI dry-run)
    # ------------------------------------------------------------------
    async def _verify_audit_anchor(self, *, now: datetime) -> None:  # noqa: ARG002
        """Compare the audit log tail with the SQLite anchor.

        ``now`` is accepted for symmetry with the other ``boot``
        helpers but unused: the comparison is purely between the
        in-memory tail hash and the value persisted on the previous
        run.
        """
        conn = connect_state(self._ctx.db_path)
        try:
            state = self._ctx.repository.get_system_state(conn)
        finally:
            conn.close()
        if state is None or state.last_audit_hash is None:
            return  # first boot, nothing to compare against
        actual_tail = self._ctx.audit_log.last_hash
        if actual_tail != state.last_audit_hash:
            # Hash divergence means the audit file changed behind our back.
            await self._ctx.alert_manager.critical(
                source="orchestrator.boot",
                message=(
                    f"audit log anchor mismatch: anchor="
                    f"{state.last_audit_hash[:12]}…, file tail="
                    f"{actual_tail[:12]}… — possible tampering or truncation"
                ),
                component="safety.audit_log",
            )

    async def run_entry(
        self, *, now: datetime | None = None
    ) -> EntryCycleResult:
        """Run one entry cycle, tagging all logs with a fresh cycle_id."""
        cycle_id = str(uuid4())
        # Bind cycle metadata into structlog contextvars so every log line
        # emitted inside the cycle carries the id; reset afterwards.
        token = structlog.contextvars.bind_contextvars(
            cycle="entry", cycle_id=cycle_id
        )
        try:
            return await run_entry_cycle(
                self._ctx, eur_to_usd_rate=self._eur_to_usd, now=now
            )
        finally:
            structlog.contextvars.reset_contextvars(**token)

    async def run_monitor(
        self, *, now: datetime | None = None
    ) -> MonitorCycleResult:
        """Run one monitor cycle, tagging all logs with a fresh cycle_id."""
        cycle_id = str(uuid4())
        token = structlog.contextvars.bind_contextvars(
            cycle="monitor", cycle_id=cycle_id
        )
        try:
            return await run_monitor_cycle(self._ctx, now=now)
        finally:
            structlog.contextvars.reset_contextvars(**token)

    async def run_health(
        self, *, now: datetime | None = None
    ) -> HealthCheckResult:
        """Run one health probe, tagging all logs with a fresh cycle_id."""
        cycle_id = str(uuid4())
        token = structlog.contextvars.bind_contextvars(
            cycle="health", cycle_id=cycle_id
        )
        try:
            return await self._health.run(now=now)
        finally:
            structlog.contextvars.reset_contextvars(**token)

    # ------------------------------------------------------------------
    # Scheduler lifecycle
    # ------------------------------------------------------------------
    def install_scheduler(
        self,
        *,
        entry_cron: str = _CRON_ENTRY,
        monitor_cron: str = _CRON_MONITOR,
        health_cron: str = _CRON_HEALTH,
        backup_cron: str = _CRON_BACKUP,
        manual_actions_cron: str = _CRON_MANUAL_ACTIONS,
        market_snapshot_cron: str = _CRON_MARKET_SNAPSHOT,
        market_snapshot_assets: tuple[str, ...] = DEFAULT_ASSETS,
        option_chain_cron: str = _CRON_OPTION_CHAIN_SNAPSHOT,
        option_chain_asset: str = "ETH",
        backup_dir: Path | None = None,
        backup_retention_days: int = _BACKUP_RETENTION_DAYS,
    ) -> AsyncIOScheduler:
        """Build the scheduler with the canonical job set, ready to start."""

        # Wrapper that converts any exception from a tick into a critical
        # alert instead of letting it propagate into APScheduler.
        async def _safe(name: str, coro_factory: Callable[[], Awaitable[object]]) -> None:
            try:
                await coro_factory()
            except Exception as exc:  # never let a tick kill the scheduler
                _log.exception("scheduler tick %s raised", name)
                await self._ctx.alert_manager.critical(
                    source=f"scheduler.{name}",
                    message=f"{type(exc).__name__}: {exc}",
                    component=f"runtime.{name}",
                )

        async def _entry() -> None:
            await _safe("entry", self.run_entry)

        async def _monitor() -> None:
            await _safe("monitor", self.run_monitor)

        async def _health() -> None:
            await _safe("health", self.run_health)

        # Default backup destination lives next to the SQLite file.
        backups_target = backup_dir or self._ctx.db_path.parent / "backups"

        async def _backup() -> None:
            async def _do() -> None:
                # The backup helper is synchronous; run it off the loop.
                await asyncio.to_thread(
                    _run_backup,
                    db_path=self._ctx.db_path,
                    backup_dir=backups_target,
                    retention_days=backup_retention_days,
                )
            await _safe("backup", _do)

        async def _run_market_snapshot_via_action() -> None:
            await collect_market_snapshot(
                self._ctx, assets=market_snapshot_assets
            )

        async def _manual_actions() -> None:
            async def _do() -> None:
                # Drain operator-queued actions, dispatching to the cycle
                # runners by name.
                await consume_manual_actions(
                    self._ctx,
                    cycle_runners={
                        "entry": self.run_entry,
                        "monitor": self.run_monitor,
                        "health": self.run_health,
                        "market_snapshot": _run_market_snapshot_via_action,
                    },
                )
            await _safe("manual_actions", _do)

        async def _market_snapshot() -> None:
            async def _do() -> None:
                await collect_market_snapshot(
                    self._ctx, assets=market_snapshot_assets
                )
            await _safe("market_snapshot", _do)

        async def _option_chain_snapshot() -> None:
            async def _do() -> None:
                await collect_option_chain_snapshot(
                    self._ctx, asset=option_chain_asset
                )
            await _safe("option_chain_snapshot", _do)

        # Jobs that always run, regardless of feature flags.
        jobs: list[JobSpec] = [
            JobSpec(name="health", cron=health_cron, coro_factory=_health),
            JobSpec(name="backup", cron=backup_cron, coro_factory=_backup),
            JobSpec(
                name="manual_actions",
                cron=manual_actions_cron,
                coro_factory=_manual_actions,
            ),
        ]
        if self._flags.strategy_enabled:
            jobs.append(JobSpec(name="entry", cron=entry_cron, coro_factory=_entry))
            jobs.append(
                JobSpec(name="monitor", cron=monitor_cron, coro_factory=_monitor)
            )
        else:
            _log.warning(
                "strategy disabled (CERBERO_BITE_ENABLE_STRATEGY=false): "
                "entry and monitor cycles are NOT scheduled"
            )
        if self._flags.data_analysis_enabled:
            jobs.append(
                JobSpec(
                    name="market_snapshot",
                    cron=market_snapshot_cron,
                    coro_factory=_market_snapshot,
                )
            )
            jobs.append(
                JobSpec(
                    name="option_chain_snapshot",
                    cron=option_chain_cron,
                    coro_factory=_option_chain_snapshot,
                )
            )
        else:
            _log.warning(
                "data analysis disabled (CERBERO_BITE_ENABLE_DATA_ANALYSIS="
                "false): market_snapshot job is NOT scheduled"
            )
        self._scheduler = build_scheduler(jobs)
        return self._scheduler

    async def run_forever(self, *, lock_path: Path | None = None) -> None:
        """Boot, acquire the single-instance lock, install the scheduler.

        ``lock_path`` defaults to ``<db_path.parent>/.lockfile`` so two
        containers cannot trade against the same SQLite file. SIGTERM
        and SIGINT are intercepted so Docker (or the operator) can
        signal a clean shutdown — the scheduler is stopped, in-flight
        cycles complete, the audit log fsyncs, and the HTTP client is
        closed before the process exits.
        """
        import signal  # noqa: PLC0415 — only needed by run_forever

        lock = EngineLock(
            lock_path or self._ctx.db_path.parent / ".lockfile"
        )
        with lock:
            try:
                await self.boot()
                scheduler = self.install_scheduler()
                scheduler.start()
                stop_event = asyncio.Event()

                def _on_signal(signame: str) -> None:
                    _log.info("received %s — initiating shutdown", signame)
                    stop_event.set()

                loop = asyncio.get_running_loop()
                for sig_name in ("SIGTERM", "SIGINT"):
                    # getattr guard: some platforms lack one of the signals.
                    sig = getattr(signal, sig_name, None)
                    if sig is None:  # pragma: no cover — Windows fallback
                        continue
                    try:
                        loop.add_signal_handler(
                            sig, _on_signal, sig_name
                        )
                    except NotImplementedError:  # pragma: no cover
                        # Some sandboxes (Windows asyncio) don't support
                        # add_signal_handler; fall back to the classic
                        # signal.signal hook that just sets the stop event.
                        signal.signal(sig, lambda *_: stop_event.set())
                try:
                    # Park here until a signal (or test) sets the event.
                    await stop_event.wait()
                finally:
                    scheduler.shutdown(wait=False)
            finally:
                # Always release runtime resources (HTTP client, files).
                await self._ctx.aclose()
def _run_backup(
    *, db_path: Path, backup_dir: Path, retention_days: int
) -> None:
    """Synchronous backup helper, invoked via ``asyncio.to_thread``.

    ``scripts/backup.py`` lives outside the ``cerbero_bite`` package, so
    it is loaded lazily by file path here rather than imported at module
    load time: an eager import would couple the orchestrator to a script
    whose location breaks ``importlib.util.spec_from_file_location`` if
    the cwd shifts at runtime.

    Raises:
        RuntimeError: if the backup script cannot be loaded.
    """
    import sys  # noqa: PLC0415 — kept lazy to keep module load cheap
    from importlib.util import (  # noqa: PLC0415
        module_from_spec,
        spec_from_file_location,
    )

    # <repo root>/scripts/backup.py, resolved relative to this file.
    backup_py = Path(__file__).resolve().parents[3] / "scripts" / "backup.py"
    spec = spec_from_file_location("_cerbero_bite_backup", backup_py)
    if spec is None or spec.loader is None:  # pragma: no cover — only on broken installs
        raise RuntimeError(f"cannot load scripts/backup.py from {backup_py}")
    backup_module = module_from_spec(spec)
    # Register before exec so the script can self-reference if it needs to.
    sys.modules.setdefault(spec.name, backup_module)
    spec.loader.exec_module(backup_module)
    # Take the backup first, then prune anything past the retention window.
    backup_module.backup_database(db_path=db_path, backup_dir=backup_dir)
    backup_module.prune_backups(backup_dir, retention_days=retention_days)
# ---------------------------------------------------------------------------
# Convenience builder for the CLI
# ---------------------------------------------------------------------------
def make_orchestrator(
    *,
    cfg: StrategyConfig,
    endpoints: McpEndpoints,
    token: str,
    db_path: Path,
    audit_path: Path,
    expected_environment: Environment,
    eur_to_usd: Decimal,
    bot_tag: str | None = None,
    flags: RuntimeFlags | None = None,
    clock: Callable[[], datetime] | None = None,
) -> Orchestrator:
    """Build a fresh :class:`Orchestrator` ready for ``boot``/``run_*``."""
    # Assemble the build_runtime kwargs; the clock defaults to UTC "now".
    runtime_kwargs: dict[str, object] = dict(
        cfg=cfg,
        endpoints=endpoints,
        token=token,
        db_path=db_path,
        audit_path=audit_path,
        clock=clock if clock is not None else (lambda: datetime.now(UTC)),
    )
    # bot_tag is only forwarded when given so build_runtime keeps its own
    # default otherwise.
    if bot_tag is not None:
        runtime_kwargs["bot_tag"] = bot_tag
    runtime_ctx = build_runtime(**runtime_kwargs)  # type: ignore[arg-type]
    return Orchestrator(
        runtime_ctx,
        expected_environment=expected_environment,
        eur_to_usd=eur_to_usd,
        flags=flags,
    )