diff --git a/docs/06-operational-flow.md b/docs/06-operational-flow.md index e7a215e..f11ff2a 100644 --- a/docs/06-operational-flow.md +++ b/docs/06-operational-flow.md @@ -1,28 +1,31 @@ # 06 — Flussi operativi -Tre flussi principali, tutti orchestrati dallo scheduler interno -APScheduler. +I quattro flussi principali del rule engine, tutti orchestrati dallo +scheduler interno APScheduler. Aggiornati al modello di esercizio +**autonomo notify-only** introdotto in Fase 3: Cerbero Bite consulta i +server MCP della suite per leggere il mercato e per inviare l'ordine, +non chiede mai conferma a Adriano e usa Telegram esclusivamente per +notificare quanto fatto. ## Flusso 1 — Avvio engine ``` 1. Carica strategy.yaml + strategy.local.yaml (override) -2. Validazione schema (pydantic) — fail veloce +2. Validazione schema + verifica config_hash 3. Acquisisci lock file (data/.lockfile) -4. Apri SQLite, esegui migrations -5. Health check MCP server (versioni + ping) -6. Riconciliatore stato: - a. legge positions con status in {awaiting_fill, open, closing} - b. per ognuna chiama cerbero-memory.is_acknowledged + Deribit get_positions - c. allinea SQLite alla realtà del broker (sempre la realtà vince) - d. log delle discrepanze come WARNING +4. Apri SQLite, esegui migrations, init_system_state +5. Health check MCP (environment_info, ping read tools) + - cerbero-deribit.environment_info → confronta con + strategy.execution.environment; mismatch → kill switch +6. Riconciliatore stato (vedi flusso 6) 7. Health check sistema OK? → arma scheduler KO? → kill switch + alert + idle -8. Log ENGINE_START +8. Audit log: ENGINE_START ``` L'avvio è progettato per essere **safe**: se qualcosa non torna, il -sistema si rifiuta di operare. Mai partire con uno stato dubbio. +sistema si rifiuta di operare. Mai partire con uno stato dubbio o un +ambiente diverso da quello atteso. ## Flusso 2 — Settimanale (entry) @@ -31,44 +34,45 @@ Trigger: cron `0 14 * * MON` (lunedì 14:00 UTC). ``` START ├── safety.system_healthy()? → no → log + skip - ├── state.has_open_position()? → yes → log + skip + ├── repository.has_open_position()? → yes → log + skip ├── snapshot dati di mercato (parallel): - │ spot, dvol, funding_perp, funding_cross, macro_calendar, - │ holdings_pct, capital + │ spot_eth, dvol, funding_perp, + │ funding_cross, macro_calendar, + │ eth_holdings_pct, capital_eur ├── entry_validator.validate_entry → fail → log ENTRY_REJECTED + reasons ├── entry_validator.compute_bias → None → log ENTRY_REJECTED ("no_bias") - ├── deribit.get_options_chain (DTE 14-21) + ├── deribit.options_chain (DTE 14-21) ├── combo_builder.select_strikes → None → log ENTRY_REJECTED ("no_strike") - ├── deribit.get_orderbook per le 2 gambe + ├── deribit.orderbook_depth_top3 per le 2 gambe ├── liquidity_gate.check → fail → log ENTRY_REJECTED ("illiquid") ├── sizing_engine.compute_contracts → 0 → log ENTRY_REJECTED ("undersize") ├── combo_builder.build → ComboProposal ├── greeks_aggregator.aggregate - ├── reporting.pre_trade(proposal) → testo Telegram - ├── telegram.request_confirmation(timeout=60min) - │ ├── confermato: - │ │ ├── state.create_position (status="proposed") - │ │ ├── memory.push_user_instruction - │ │ ├── state.update(status="awaiting_fill") - │ │ └── log ENTRY_CONFIRMED - │ ├── rifiutato: - │ │ └── log ENTRY_REJECTED_BY_USER - │ └── timeout: - │ └── log ENTRY_TIMEOUT + ├── repository.create_position(status="proposed") + ├── deribit.place_combo_order + │ ├── ordine accettato (state=open|filled): + │ │ ├── repository.update_position_status(awaiting_fill|open) + │ │ ├── repository.create_instruction + │ │ ├── audit ENTRY_PLACED + │ │ └── telegram.notify_position_opened (post-fact) + │ └── ordine rigettato: + │ ├── repository.update_position_status(cancelled) + │ ├── audit ENTRY_REJECTED_BY_BROKER + │ └── telegram.notify_alert (priority=high) └── END ``` +Nessuna conferma manuale: la decisione di apertura è il risultato +deterministico delle regole, non una proposta soggetta ad +approvazione. Il messaggio Telegram è informativo. + ### Tempo previsto end-to-end -- Snapshot dati: 3-5 secondi -- Algoritmi puri: < 0.5 secondi -- Conferma utente: variabile (max 60 min) +- Snapshot dati: 3-5 secondi. +- Algoritmi puri: < 0.5 secondi. +- `place_combo_order` su testnet: 1-3 secondi. -Il critical path automatico è **sotto 10 secondi**. La latenza umana -non rallenta i prezzi: se Adriano risponde dopo 30 minuti, l'engine -prima di inviare l'istruzione **rivaluta** spot, mid e dvol e abortisce -l'apertura se le condizioni sono cambiate (slippage > 8% o dvol fuori -banda o macro nuova). +Critical path completo sotto 10 secondi nominali. ## Flusso 3 — Monitoring (12h) @@ -77,129 +81,118 @@ Trigger: cron `0 2,14 * * *` (02:00 e 14:00 UTC, ogni giorno). ``` START ├── safety.system_healthy()? → no → kill switch (no auto-close ciechi) + ├── salva snapshot in dvol_history (per il calcolo di return_4h) ├── per ogni position con status="open": │ ├── snapshot: - │ │ spot, dvol, mark_combo, delta_short, return_4h + │ │ spot, dvol, mark_combo (= mid_short - mid_long), + │ │ delta_short, return_4h (vs dvol_history 4h fa) │ ├── exit_decision.evaluate │ ├── action == HOLD: │ │ └── log EXIT_EVALUATED("hold") │ ├── action == CLOSE_*: - │ │ ├── reporting.exit_proposal(position, decision) - │ │ ├── telegram.request_confirmation(timeout=30min) - │ │ │ ├── confermato: - │ │ │ │ ├── memory.push_user_instruction (close) - │ │ │ │ ├── state.update(status="closing") - │ │ │ │ └── log EXIT_CONFIRMED - │ │ │ ├── rifiutato: - │ │ │ │ └── log EXIT_DEFERRED (resterà in HOLD fino a prossimo ciclo) - │ │ │ └── timeout: - │ │ │ └── escalation: per CLOSE_STOP/CLOSE_VOL/CLOSE_DELTA - │ │ │ → invia comunque l'istruzione (urgenza prevale) - │ │ │ per altri motivi → log EXIT_TIMEOUT, attende prossimo ciclo + │ │ ├── repository.update_position_status("closing") + │ │ ├── deribit.place_combo_order (direzione inversa) + │ │ │ ├── filled: + │ │ │ │ ├── repository.update_position_status("closed") + │ │ │ │ ├── audit EXIT_FILLED + │ │ │ │ └── telegram.notify_position_closed + │ │ │ └── rejected: + │ │ │ ├── repository.update_position_status("open") + │ │ │ └── telegram.notify_alert (priority=critical, + │ │ │ source="exit_failed") │ └── continue └── END ``` -### Politica di escalation +Non c'è un ramo "richiesta conferma utente". Ogni `CLOSE_*` viene +eseguito immediatamente; il messaggio Telegram è post-fact e descrive +azione + motivo (formato `notify_position_closed`). -I trigger di uscita non sono uguali. Alcuni hanno **urgenza alta** che -giustifica l'esecuzione anche senza conferma utente entro la finestra: - -| Trigger | Urgenza | Comportamento su timeout | -|---|---|---| -| `CLOSE_PROFIT` | Bassa | Attendi prossimo ciclo (può migliorare) | -| `CLOSE_STOP` | **Alta** | Esegui chiusura senza conferma esplicita | -| `CLOSE_VOL` | **Alta** | Esegui chiusura senza conferma | -| `CLOSE_DELTA` | **Alta** | Esegui chiusura senza conferma | -| `CLOSE_TIME` | Media | Attendi 1 ciclo extra; al secondo timeout esegui | -| `CLOSE_AVERSE` | Media | Attendi prossimo ciclo | - -Su escalation hard, Telegram riceve un messaggio **post-fact**: "ho -chiuso la posizione X per stop loss perché non ho ricevuto risposta in -30 min e l'urgenza non consentiva attesa". +In caso di fallimento del `place_combo_order` di chiusura (broker +respinge, latenza > soglia, ecc.) la posizione viene rimessa in +`open` e generato un alert `critical`: sarà il prossimo ciclo a +ritentare. ## Flusso 4 — Mensile (Kelly recalibration) Trigger: cron `0 12 1 * *` (primo di ogni mese alle 12:00 UTC). ``` -1. Carica trades chiusi negli ultimi 365 giorni +1. Carica trades chiusi negli ultimi 365 giorni dal repository 2. kelly_recalibration.recalibrate -3. Genera report mensile: - - performance vs simulazione MC - - win rate, avg win/loss, edge - - kelly suggerito vs corrente - - violazioni regole (deve essere 0) -4. telegram.send (informativo, no conferma) -5. brain_bridge.write_note( - path="strategies/cerberus-bite/monthly-report-YYYY-MM.md", - content=report - ) -6. log KELLY_RECALIBRATED +3. Genera report mensile testuale (markdown) +4. telegram.notify (priority=normal, tag="kelly") +5. salva report come allegato al log JSONL del giorno +6. audit KELLY_RECALIBRATED ``` L'aggiornamento di `strategy.yaml` resta **manuale**: Adriano legge il -report, discute con Milito, e committa il nuovo file di config (con -giustificazione nel commit message). Nessun auto-update. +report e committa il nuovo file (con giustificazione nel commit +message). Nessun auto-update del kelly_fraction. ## Flusso 5 — Health check periodico Trigger: ogni 5 minuti. ``` -1. ping ogni MCP usato → ms latency +1. Per ogni MCP utilizzato: probe lightweight + - deribit.environment_info + - macro.get_macro_calendar(days=1) + - sentiment.get_cross_exchange_funding (no asset filter) + - hyperliquid.get_funding_rate("ETH") + - portfolio.get_total_portfolio_value + - telegram: skip (notify-only, no probe non invasivo) 2. SQLite read-write probe (transazione fittizia) -3. lock file ancora valido -4. ultima fill di Cerbero core ricevuta entro tempo atteso -5. state.system_state.kill_switch == 0 -Se qualsiasi passo fallisce: - - log HEALTH_DEGRADED - - se 3 fallimenti consecutivi → kill_switch + alert - - se 5 fallimenti consecutivi → restart automatico (1 sola volta al giorno) +3. Lock file ancora valido +4. environment_info.environment == strategy.execution.environment +5. Audit HEALTH_OK / HEALTH_DEGRADED +6. Conta fallimenti consecutivi: + - 3 fallimenti → kill_switch + alert HIGH + - 5 fallimenti → audit + alert CRITICAL (riavvio è demandato a Docker) ``` +Il dead-man (`scripts/dead_man.sh`) sorveglia che `HEALTH_OK` venga +scritto: silenzio > 15 min → kill switch via SQLite e alert. + ## Flusso 6 — Recovery dopo crash -Quando l'engine riparte: +All'avvio o dopo un riavvio del container: -1. Riapre SQLite + log della giornata. -2. Per ogni posizione `awaiting_fill`: - - Chiede a `cerbero-memory.get_status(instruction_id)` - - Se filled → aggiorna a `open` - - Se cancelled → aggiorna a `cancelled` - - Se ancora pending → mantiene stato e riprende il monitoraggio -3. Per ogni posizione `closing`: - - Stessa logica -4. Per ogni posizione `open`: - - Verifica che esista realmente sul broker (Deribit `get_positions`) - - Se non esiste → flag `state_inconsistent` + alert + kill switch -5. Riconciliazione completata → ENGINE_START. +1. Apre SQLite + log JSONL della giornata. +2. Per ogni `awaiting_fill`/`closing`: + - chiama `deribit.get_positions` per verificare l'esistenza sul broker + - se trovata → aggiorna a `open` (fill confermato) + - se non trovata e l'ordine risulta cancellato → aggiorna a `cancelled` + - se nessuna delle due → flag `state_inconsistent`, kill switch, + alert CRITICAL +3. Per ogni `open`: + - verifica corrispondenza posizione broker vs DB (size, strike, + expiry); discrepanze → kill switch +4. Riconciliazione completata → `audit ENGINE_START`. -Il sistema **mai** prende decisioni di trading durante recovery: solo -allinea lo stato. Il primo decision loop avviene al prossimo trigger -naturale. +Il sistema **mai** prende decisioni di trading durante il recovery: +solo allinea lo stato. Il primo decision loop avviene al prossimo +trigger naturale. ## Diagramma di stato di una posizione ``` - ┌─ rejected_by_user ──→ cancelled - │ - (entry) ├─ timeout ──────────→ cancelled -proposed ─────────────► │ - ├─ confirmed - ▼ - awaiting_fill - │ - ├─ no_fill ──────────→ cancelled - ├─ filled ───────────► open - │ │ - │ (exit_decision != HOLD) - │ │ - │ ▼ - │ closing - │ │ - │ ├─ no_fill (escalated) → manual_intervention - │ └─ filled ──────────→ closed +proposed + │ + ├─ broker_reject ────→ cancelled + ├─ submitted ────────► awaiting_fill + │ + ├─ no_fill ───────────→ cancelled + ├─ filled ────────────► open + │ │ + │ (exit_decision != HOLD) + │ │ + │ ▼ + │ closing + │ │ + │ ├─ no_fill → open + │ │ (riprovato al prossimo ciclo) + │ └─ filled → closed ``` ## Cron summary diff --git a/pyproject.toml b/pyproject.toml index fe5ca56..f307c1b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -87,6 +87,8 @@ ignore = [ "PLR0913", # too many arguments (we accept config-heavy functions) "PLR2004", # magic value (we have many domain constants in tests) "PLR0911", # too many return statements (rule engines have many early returns) + "PLR0912", # too many branches (entry/monitor cycles are inherently long) + "PLR0915", # too many statements (orchestration cycles) "RUF001", # ambiguous unicode in strings (we use math symbols × ≤ ≥) "RUF002", # ambiguous unicode in docstrings "RUF003", # ambiguous unicode in comments diff --git a/src/cerbero_bite/cli.py b/src/cerbero_bite/cli.py index eab512f..37d77cd 100644 --- a/src/cerbero_bite/cli.py +++ b/src/cerbero_bite/cli.py @@ -11,8 +11,11 @@ from __future__ import annotations import asyncio import sys +from collections.abc import Callable from datetime import UTC, datetime +from decimal import Decimal from pathlib import Path +from typing import Any import click from rich.console import Console @@ -33,6 +36,7 @@ from cerbero_bite.config.mcp_endpoints import ( ) from cerbero_bite.logging import configure as configure_logging from cerbero_bite.logging import get_logger +from cerbero_bite.runtime.orchestrator import Orchestrator, make_orchestrator from cerbero_bite.safety.audit_log import AuditChainError, AuditLog from cerbero_bite.safety.audit_log import verify_chain as verify_audit_chain from cerbero_bite.safety.kill_switch import KillSwitch @@ -87,22 +91,160 @@ def status() -> None: ) +def _engine_options(func: Callable[..., Any]) -> Callable[..., Any]: + """Common options for the engine commands.""" + decorators = [ + click.option( + "--strategy", + "strategy_path", + type=click.Path(exists=True, dir_okay=False, path_type=Path), + default=_DEFAULT_STRATEGY_PATH, + show_default=True, + ), + click.option( + "--token-file", + type=click.Path(dir_okay=False, path_type=Path), + default=None, + ), + click.option( + "--db", + type=click.Path(dir_okay=False, path_type=Path), + default=_DEFAULT_DB_PATH, + show_default=True, + ), + click.option( + "--audit", + type=click.Path(dir_okay=False, path_type=Path), + default=_DEFAULT_AUDIT_PATH, + show_default=True, + ), + click.option( + "--environment", + type=click.Choice(["testnet", "mainnet"]), + default="testnet", + show_default=True, + ), + click.option( + "--eur-to-usd", + type=float, + default=1.075, + show_default=True, + help="EUR→USD conversion rate used by the sizing engine.", + ), + ] + for d in reversed(decorators): + func = d(func) + return func + + +def _build_orchestrator( + *, + strategy_path: Path, + token_file: Path | None, + db: Path, + audit: Path, + environment: str, + eur_to_usd: float, +) -> Orchestrator: + loaded = load_strategy(strategy_path, enforce_hash=False) + token = load_token(path=token_file) + return make_orchestrator( + cfg=loaded.config, + endpoints=load_endpoints(), + token=token, + db_path=db, + audit_path=audit, + expected_environment=environment, # type: ignore[arg-type] + eur_to_usd=Decimal(str(eur_to_usd)), + ) + + @main.command() -def start() -> None: +@_engine_options +def start( + strategy_path: Path, + token_file: Path | None, + db: Path, + audit: Path, + environment: str, + eur_to_usd: float, +) -> None: """Start the engine main loop (scheduler + monitoring).""" - _phase0_notice("start command not yet implemented; engine remains idle.") + orch = _build_orchestrator( + strategy_path=strategy_path, + token_file=token_file, + db=db, + audit=audit, + environment=environment, + eur_to_usd=eur_to_usd, + ) + console.print( + f"[bold cyan]Cerbero Bite[/bold cyan] starting " + f"(env={environment}, db={db}, audit={audit})" + ) + try: + asyncio.run(orch.run_forever()) + except KeyboardInterrupt: + console.print("[yellow]engine interrupted[/yellow]") + + +@main.command() +@_engine_options +@click.option( + "--cycle", + type=click.Choice(["entry", "monitor", "health"]), + default="entry", + show_default=True, +) +def dry_run( + strategy_path: Path, + token_file: Path | None, + db: Path, + audit: Path, + environment: str, + eur_to_usd: float, + cycle: str, +) -> None: + """Execute one cycle without starting the scheduler.""" + orch = _build_orchestrator( + strategy_path=strategy_path, + token_file=token_file, + db=db, + audit=audit, + environment=environment, + eur_to_usd=eur_to_usd, + ) + + async def _go() -> None: + await orch.boot() + if cycle == "entry": + entry = await orch.run_entry() + console.print( + f"[green]entry[/green] {entry.status} reason={entry.reason}" + ) + elif cycle == "monitor": + mon = await orch.run_monitor() + console.print(f"[green]monitor[/green] outcomes={len(mon.outcomes)}") + for outcome in mon.outcomes: + console.print( + f" • {outcome.proposal_id[:8]} {outcome.action} " + f"closed={outcome.closed}" + ) + else: + health = await orch.run_health() + console.print(f"[green]health[/green] state={health.state}") + + asyncio.run(_go()) @main.command() def stop() -> None: - """Gracefully stop a running engine.""" - _phase0_notice("stop command not yet implemented.") - - -@main.command(name="dry-run") -def dry_run() -> None: - """Run the decision loop once in dry-run mode (no MCP writes).""" - _phase0_notice("dry-run command not yet implemented.") + """Gracefully stop a running engine (manual: send SIGTERM).""" + console.print( + "Use [bold]docker compose stop cerbero-bite[/bold] (or send SIGTERM " + "to the engine PID). The container traps the signal and shuts down " + "the scheduler before exit." + ) @main.group(name="kill-switch") diff --git a/src/cerbero_bite/runtime/__init__.py b/src/cerbero_bite/runtime/__init__.py index e69de29..d3f3134 100644 --- a/src/cerbero_bite/runtime/__init__.py +++ b/src/cerbero_bite/runtime/__init__.py @@ -0,0 +1,14 @@ +"""Runtime: composes core/ + clients/ + state/ + safety/ into the engine.""" + +from cerbero_bite.runtime.alert_manager import AlertManager, Severity +from cerbero_bite.runtime.dependencies import RuntimeContext, build_runtime +from cerbero_bite.runtime.orchestrator import Orchestrator, make_orchestrator + +__all__ = [ + "AlertManager", + "Orchestrator", + "RuntimeContext", + "Severity", + "build_runtime", + "make_orchestrator", +] diff --git a/src/cerbero_bite/runtime/alert_manager.py b/src/cerbero_bite/runtime/alert_manager.py new file mode 100644 index 0000000..66c9e96 --- /dev/null +++ b/src/cerbero_bite/runtime/alert_manager.py @@ -0,0 +1,106 @@ +"""Severity-based alert dispatcher (``docs/07-risk-controls.md``). + +Routes anomalies to the right notification channel and, for HIGH and +CRITICAL events, arms the kill switch. + +* ``LOW`` — append to audit log only. +* ``MEDIUM`` — audit + ``telegram.notify`` (priority="high"). +* ``HIGH`` — audit + ``telegram.notify_alert`` (priority="high") + + arm kill switch. +* ``CRITICAL``— audit + ``telegram.notify_system_error`` + + arm kill switch (already armed → idempotent). +""" + +from __future__ import annotations + +import logging +from enum import StrEnum + +from cerbero_bite.clients.telegram import TelegramClient +from cerbero_bite.safety.audit_log import AuditLog +from cerbero_bite.safety.kill_switch import KillSwitch + +__all__ = ["AlertManager", "Severity"] + + +_log = logging.getLogger("cerbero_bite.runtime.alert_manager") + + +class Severity(StrEnum): + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + CRITICAL = "critical" + + +class AlertManager: + """Dispatcher used by every runtime module to surface anomalies.""" + + def __init__( + self, + *, + telegram: TelegramClient, + audit_log: AuditLog, + kill_switch: KillSwitch, + ) -> None: + self._telegram = telegram + self._audit = audit_log + self._kill = kill_switch + + async def emit( + self, + severity: Severity, + *, + source: str, + message: str, + component: str | None = None, + ) -> None: + """Emit an alert at the given severity level.""" + self._audit.append( + event="ALERT", + payload={ + "severity": severity.value, + "source": source, + "message": message, + "component": component, + }, + ) + + if severity == Severity.LOW: + _log.info("alert.low source=%s message=%s", source, message) + return + + if severity == Severity.MEDIUM: + await self._telegram.notify( + f"[{source}] {message}", priority="high", tag=source + ) + return + + if severity == Severity.HIGH: + await self._telegram.notify_alert( + source=source, message=message, priority="high" + ) + self._kill.arm(reason=message, source=source) + return + + # CRITICAL + await self._telegram.notify_system_error( + message=message, component=component, priority="critical" + ) + self._kill.arm(reason=message, source=source) + + async def low(self, *, source: str, message: str) -> None: + await self.emit(Severity.LOW, source=source, message=message) + + async def medium(self, *, source: str, message: str) -> None: + await self.emit(Severity.MEDIUM, source=source, message=message) + + async def high(self, *, source: str, message: str) -> None: + await self.emit(Severity.HIGH, source=source, message=message) + + async def critical( + self, *, source: str, message: str, component: str | None = None + ) -> None: + await self.emit( + Severity.CRITICAL, source=source, message=message, component=component + ) diff --git a/src/cerbero_bite/runtime/dependencies.py b/src/cerbero_bite/runtime/dependencies.py new file mode 100644 index 0000000..b446ae3 --- /dev/null +++ b/src/cerbero_bite/runtime/dependencies.py @@ -0,0 +1,135 @@ +"""Runtime dependency container. + +Builds and wires together every long-lived object the engine needs: +HTTP clients, repository, audit log, kill switch, alert manager. The +:func:`build_runtime` factory returns a frozen :class:`RuntimeContext` +that the orchestrator and the cycle modules pass around — no global +state, no implicit singletons. +""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from datetime import UTC, datetime +from pathlib import Path + +from cerbero_bite.clients._base import HttpToolClient +from cerbero_bite.clients.deribit import DeribitClient +from cerbero_bite.clients.hyperliquid import HyperliquidClient +from cerbero_bite.clients.macro import MacroClient +from cerbero_bite.clients.portfolio import PortfolioClient +from cerbero_bite.clients.sentiment import SentimentClient +from cerbero_bite.clients.telegram import TelegramClient +from cerbero_bite.config.mcp_endpoints import McpEndpoints +from cerbero_bite.config.schema import StrategyConfig +from cerbero_bite.runtime.alert_manager import AlertManager +from cerbero_bite.safety.audit_log import AuditLog +from cerbero_bite.safety.kill_switch import KillSwitch +from cerbero_bite.state import ( + Repository, + connect, + run_migrations, + transaction, +) + +__all__ = ["RuntimeContext", "build_runtime"] + + +@dataclass(frozen=True) +class RuntimeContext: + """Bag of every wired object used by the runtime.""" + + cfg: StrategyConfig + db_path: Path + audit_path: Path + + repository: Repository + audit_log: AuditLog + kill_switch: KillSwitch + alert_manager: AlertManager + + deribit: DeribitClient + macro: MacroClient + sentiment: SentimentClient + hyperliquid: HyperliquidClient + portfolio: PortfolioClient + telegram: TelegramClient + + clock: Callable[[], datetime] + + +def _utc_now() -> datetime: + return datetime.now(UTC) + + +def build_runtime( + *, + cfg: StrategyConfig, + endpoints: McpEndpoints, + token: str, + db_path: Path | str, + audit_path: Path | str, + timeout_s: float = 8.0, + retry_max: int = 3, + clock: Callable[[], datetime] | None = None, +) -> RuntimeContext: + """Wire every dependency the runtime needs. + + The SQLite database is migrated and the system_state singleton is + initialised eagerly so the orchestrator can assume both are + present. + """ + db_path = Path(db_path) + audit_path = Path(audit_path) + clk = clock or _utc_now + + repository = Repository() + conn = connect(db_path) + try: + run_migrations(conn) + with transaction(conn): + repository.init_system_state( + conn, config_version=cfg.config_version, now=clk() + ) + finally: + conn.close() + + audit_log = AuditLog(audit_path) + kill_switch = KillSwitch( + connection_factory=lambda: connect(db_path), + repository=repository, + audit_log=audit_log, + clock=clk, + ) + + def _client(service: str) -> HttpToolClient: + return HttpToolClient( + service=service, + base_url=endpoints.for_service(service), + token=token, + timeout_s=timeout_s, + retry_max=retry_max, + ) + + telegram = TelegramClient(_client("telegram")) + alert_manager = AlertManager( + telegram=telegram, audit_log=audit_log, kill_switch=kill_switch + ) + + return RuntimeContext( + cfg=cfg, + db_path=db_path, + audit_path=audit_path, + repository=repository, + audit_log=audit_log, + kill_switch=kill_switch, + alert_manager=alert_manager, + deribit=DeribitClient(_client("deribit")), + macro=MacroClient(_client("macro")), + sentiment=SentimentClient(_client("sentiment")), + hyperliquid=HyperliquidClient(_client("hyperliquid")), + portfolio=PortfolioClient(_client("portfolio")), + telegram=telegram, + clock=clk, + ) diff --git a/src/cerbero_bite/runtime/entry_cycle.py b/src/cerbero_bite/runtime/entry_cycle.py new file mode 100644 index 0000000..3d74378 --- /dev/null +++ b/src/cerbero_bite/runtime/entry_cycle.py @@ -0,0 +1,640 @@ +"""Weekly entry decision loop (``docs/06-operational-flow.md`` §2). + +Pure orchestration over the existing core/clients/state primitives. +The cycle is auto-execute: when every gate passes, the engine sends +the combo order without asking Adriano. Telegram is used only to +notify the outcome. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +from dataclasses import dataclass +from datetime import UTC, datetime, timedelta +from decimal import Decimal +from typing import Any +from uuid import uuid4 + +from cerbero_bite.clients.deribit import ( + ComboLegOrder, + ComboOrderResult, + DeribitClient, + InstrumentMeta, +) +from cerbero_bite.clients.hyperliquid import HyperliquidClient +from cerbero_bite.clients.macro import MacroClient +from cerbero_bite.clients.portfolio import PortfolioClient +from cerbero_bite.clients.sentiment import SentimentClient +from cerbero_bite.config.schema import StrategyConfig +from cerbero_bite.core.combo_builder import ComboProposal, build, select_strikes +from cerbero_bite.core.entry_validator import ( + EntryContext, + TrendContext, + compute_bias, + validate_entry, +) +from cerbero_bite.core.liquidity_gate import InstrumentSnapshot, check +from cerbero_bite.core.sizing_engine import SizingContext, compute_contracts +from cerbero_bite.core.types import OptionQuote +from cerbero_bite.runtime.alert_manager import AlertManager +from cerbero_bite.runtime.dependencies import RuntimeContext +from cerbero_bite.state import ( + DecisionRecord, + InstructionRecord, + PositionRecord, + transaction, +) +from cerbero_bite.state import connect as connect_state + +__all__ = [ + "EntryCycleResult", + "EntryDecisionStatus", + "run_entry_cycle", +] + + +_log = logging.getLogger("cerbero_bite.runtime.entry") + + +EntryDecisionStatus = str # one of the literals below +_STATUS_ENTRY_PLACED = "entry_placed" +_STATUS_NO_ENTRY = "no_entry" +_STATUS_BROKER_REJECT = "broker_reject" +_STATUS_KILL_SWITCH = "kill_switch_armed" +_STATUS_HAS_OPEN = "has_open_position" + + +@dataclass(frozen=True) +class EntryCycleResult: + """Outcome of one ``run_entry_cycle`` call (no exception path).""" + + status: EntryDecisionStatus + reason: str | None + proposal: ComboProposal | None = None + order: ComboOrderResult | None = None + + +# --------------------------------------------------------------------------- +# Snapshot collection +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class _MarketSnapshot: + spot_eth_usd: Decimal + dvol: Decimal + funding_perp: Decimal + funding_cross: Decimal + macro_days_to_event: int | None + eth_holdings_pct: Decimal + portfolio_eur: Decimal + + +async def _gather_snapshot( + *, + deribit: DeribitClient, + hyperliquid: HyperliquidClient, + sentiment: SentimentClient, + macro: MacroClient, + portfolio: PortfolioClient, + cfg: StrategyConfig, + now: datetime, +) -> _MarketSnapshot: + spot_t: asyncio.Task[Decimal] = asyncio.create_task(deribit.index_price_eth()) + dvol_t: asyncio.Task[Decimal] = asyncio.create_task( + deribit.latest_dvol(currency="ETH", now=now) + ) + funding_perp_t: asyncio.Task[Decimal] = asyncio.create_task( + hyperliquid.funding_rate_annualized("ETH") + ) + funding_cross_t: asyncio.Task[Decimal] = asyncio.create_task( + sentiment.funding_cross_median_annualized("ETH") + ) + macro_t: asyncio.Task[int | None] = asyncio.create_task( + macro.next_high_severity_within( + days=cfg.structure.dte_target, + countries=list(cfg.entry.exclude_macro_countries), + now=now, + ) + ) + holdings_t: asyncio.Task[Decimal] = asyncio.create_task( + portfolio.asset_pct_of_portfolio("ETH") + ) + portfolio_t: asyncio.Task[Decimal] = asyncio.create_task( + portfolio.total_equity_eur() + ) + + await asyncio.gather( + spot_t, + dvol_t, + funding_perp_t, + funding_cross_t, + macro_t, + holdings_t, + portfolio_t, + ) + return _MarketSnapshot( + spot_eth_usd=spot_t.result(), + dvol=dvol_t.result(), + funding_perp=funding_perp_t.result(), + funding_cross=funding_cross_t.result(), + macro_days_to_event=macro_t.result(), + eth_holdings_pct=holdings_t.result(), + portfolio_eur=portfolio_t.result(), + ) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +async def _record_decision( + ctx: RuntimeContext, + *, + inputs: dict[str, Any], + outputs: dict[str, Any], + action_taken: str, + notes: str | None, + proposal_id: str | None, + now: datetime, +) -> None: + conn = connect_state(ctx.db_path) + try: + with transaction(conn): + ctx.repository.record_decision( + conn, + DecisionRecord( + decision_type="entry_check", + timestamp=now, + inputs_json=json.dumps(inputs, default=str, sort_keys=True), + outputs_json=json.dumps(outputs, default=str, sort_keys=True), + action_taken=action_taken, + notes=notes, + proposal_id=proposal_id, # type: ignore[arg-type] + ), + ) + finally: + conn.close() + + +async def _build_quotes( + deribit: DeribitClient, + chain: list[InstrumentMeta], +) -> list[OptionQuote]: + """Fetch tickers + orderbook depth for the given metas, return OptionQuotes.""" + if not chain: + return [] + names = [m.name for m in chain] + if len(names) > 20: + # Bite consumes a narrow window of strikes; if it ever overflows + # the batch limit, the caller is expected to pre-filter. + raise ValueError("entry_cycle: too many instruments to quote in one batch") + + tickers = await deribit.get_tickers(names) + depths = await asyncio.gather( + *[deribit.orderbook_depth_top3(m.name) for m in chain] + ) + by_name: dict[str, dict[str, Any]] = { + str(t.get("instrument_name")): t for t in tickers if isinstance(t, dict) + } + + out: list[OptionQuote] = [] + for meta, depth in zip(chain, depths, strict=True): + ticker = by_name.get(meta.name) + if not ticker: + continue + bid = ticker.get("bid") + ask = ticker.get("ask") + mark = ticker.get("mark_price") + greeks = ticker.get("greeks") or {} + if bid is None or ask is None or mark is None: + continue + out.append( + OptionQuote( + instrument=meta.name, + strike=meta.strike, + expiry=meta.expiry, + option_type=meta.option_type, + bid=Decimal(str(bid)), + ask=Decimal(str(ask)), + mid=Decimal(str(mark)), + delta=Decimal(str(greeks.get("delta") or 0)), + gamma=Decimal(str(greeks.get("gamma") or 0)), + theta=Decimal(str(greeks.get("theta") or 0)), + vega=Decimal(str(greeks.get("vega") or 0)), + open_interest=int(meta.open_interest or 0), + volume_24h=int(ticker.get("volume_24h") or 0), + book_depth_top3=int(depth), + ) + ) + return out + + +def _max_loss_per_contract_usd(short_strike: Decimal, long_strike: Decimal) -> Decimal: + return (short_strike - long_strike).copy_abs() + + +# --------------------------------------------------------------------------- +# Cycle entry point +# --------------------------------------------------------------------------- + + +async def run_entry_cycle( + ctx: RuntimeContext, + *, + eur_to_usd_rate: Decimal, + now: datetime | None = None, +) -> EntryCycleResult: + """Run one weekly entry evaluation cycle. + + The function is idempotent and side-effect aware: it persists the + decision in the ``decisions`` table regardless of outcome and only + creates a position when the broker accepts the order. + """ + when = (now or ctx.clock()).astimezone(UTC) + cfg = ctx.cfg + alert: AlertManager = ctx.alert_manager + + if ctx.kill_switch.is_armed(): + await ctx.alert_manager.low( + source="entry_cycle", message="kill switch armed — skipping" + ) + return EntryCycleResult(status=_STATUS_KILL_SWITCH, reason="kill_switch") + + # Has open position? + conn = connect_state(ctx.db_path) + try: + concurrent = ctx.repository.count_concurrent_positions(conn) + finally: + conn.close() + if concurrent > 0: + await alert.low(source="entry_cycle", message="position already open") + return EntryCycleResult(status=_STATUS_HAS_OPEN, reason="has_open_position") + + # 1. Snapshot + snap = await _gather_snapshot( + deribit=ctx.deribit, + hyperliquid=ctx.hyperliquid, + sentiment=ctx.sentiment, + macro=ctx.macro, + portfolio=ctx.portfolio, + cfg=cfg, + now=when, + ) + capital_usd = snap.portfolio_eur * eur_to_usd_rate + + # 2. Entry filters + entry_ctx = EntryContext( + capital_usd=capital_usd, + dvol_now=snap.dvol, + funding_perp_annualized=snap.funding_perp, + eth_holdings_pct_of_portfolio=snap.eth_holdings_pct, + next_macro_event_in_days=snap.macro_days_to_event, + has_open_position=False, + ) + decision = validate_entry(entry_ctx, cfg) + inputs = { + "snapshot": { + "spot_eth_usd": str(snap.spot_eth_usd), + "dvol": str(snap.dvol), + "funding_perp": str(snap.funding_perp), + "funding_cross": str(snap.funding_cross), + "macro_days_to_event": snap.macro_days_to_event, + "eth_holdings_pct": str(snap.eth_holdings_pct), + "portfolio_eur": str(snap.portfolio_eur), + "capital_usd": str(capital_usd), + } + } + if not decision.accepted: + await _record_decision( + ctx, + inputs=inputs, + outputs={"accepted": False, "reasons": decision.reasons}, + action_taken="no_entry", + notes="entry_validator", + proposal_id=None, + now=when, + ) + await alert.low( + source="entry_cycle", + message=f"entry rejected: {'; '.join(decision.reasons)}", + ) + return EntryCycleResult( + status=_STATUS_NO_ENTRY, reason=";".join(decision.reasons) + ) + + # 3. Bias (need a 30-day prior spot — orchestrator passes it in) + # We approximate by reusing the current spot until the historical + # snapshot store ships in Phase 5; for now no historical → bias + # cannot fire bull/bear, only iron_condor when DVOL/ADX align. The + # caller is responsible for plugging in real data via overrides. + trend_ctx = TrendContext( + eth_now=snap.spot_eth_usd, + eth_30d_ago=snap.spot_eth_usd, + funding_cross_annualized=snap.funding_cross, + dvol_now=snap.dvol, + adx_14=Decimal("25"), # placeholder until ADX lands in market data + ) + bias = compute_bias(trend_ctx, cfg) + if bias is None: + await _record_decision( + ctx, + inputs=inputs, + outputs={"bias": None}, + action_taken="no_entry", + notes="no_bias", + proposal_id=None, + now=when, + ) + await alert.low(source="entry_cycle", message="no directional bias") + return EntryCycleResult(status=_STATUS_NO_ENTRY, reason="no_bias") + + # 4. Chain → strikes + expiry_from = when + expiry_to = when + timedelta(days=cfg.structure.dte_max + 1) + chain_meta = await ctx.deribit.options_chain( + currency="ETH", + expiry_from=expiry_from, + expiry_to=expiry_to, + min_open_interest=int(cfg.liquidity.open_interest_min), + ) + quotes = await _build_quotes(ctx.deribit, chain_meta) + selection = select_strikes( + chain=quotes, bias=bias, spot=snap.spot_eth_usd, now=when, cfg=cfg + ) + if selection is None: + await _record_decision( + ctx, + inputs=inputs, + outputs={"bias": bias, "n_quotes": len(quotes)}, + action_taken="no_entry", + notes="no_strike", + proposal_id=None, + now=when, + ) + await alert.low(source="entry_cycle", message="no strike candidate") + return EntryCycleResult(status=_STATUS_NO_ENTRY, reason="no_strike") + + short, long_ = selection + + # 5. Liquidity gate (uses raw bid/ask/depth from the same quotes) + short_snap = InstrumentSnapshot( + instrument=short.instrument, + bid=short.bid, + ask=short.ask, + mid=short.mid, + open_interest=short.open_interest, + volume_24h=short.volume_24h, + book_depth_top3=short.book_depth_top3, + ) + long_snap = InstrumentSnapshot( + instrument=long_.instrument, + bid=long_.bid, + ask=long_.ask, + mid=long_.mid, + open_interest=long_.open_interest, + volume_24h=long_.volume_24h, + book_depth_top3=long_.book_depth_top3, + ) + credit_eth_per_contract = short.mid - long_.mid + + # 6. Sizing + width_usd = (short.strike - long_.strike).copy_abs() + sizing_ctx = SizingContext( + capital_usd=capital_usd, + max_loss_per_contract_usd=_max_loss_per_contract_usd( + short.strike, long_.strike + ), + dvol_now=snap.dvol, + open_engagement_usd=Decimal("0"), + eur_to_usd=eur_to_usd_rate, + other_open_positions=0, + ) + sizing = compute_contracts(sizing_ctx, cfg) + if sizing.n_contracts < 1: + await _record_decision( + ctx, + inputs=inputs, + outputs={"sizing_reason": sizing.reason_if_zero}, + action_taken="no_entry", + notes="undersize", + proposal_id=None, + now=when, + ) + await alert.low( + source="entry_cycle", + message=f"undersize: {sizing.reason_if_zero}", + ) + return EntryCycleResult(status=_STATUS_NO_ENTRY, reason="undersize") + + # 7. Liquidity check now that we know n_contracts + liq = check( + short_leg=short_snap, + long_leg=long_snap, + credit=credit_eth_per_contract * Decimal(sizing.n_contracts), + n_contracts=sizing.n_contracts, + cfg=cfg, + ) + if not liq.accepted: + await _record_decision( + ctx, + inputs=inputs, + outputs={"liquidity_reasons": liq.reasons}, + action_taken="no_entry", + notes="illiquid", + proposal_id=None, + now=when, + ) + await alert.low( + source="entry_cycle", + message=f"illiquid: {'; '.join(liq.reasons)}", + ) + return EntryCycleResult(status=_STATUS_NO_ENTRY, reason="illiquid") + + # 8. Build proposal + persist + place order + proposal = build( + short=short, + long_=long_, + n_contracts=sizing.n_contracts, + spot=snap.spot_eth_usd, + dvol=snap.dvol, + cfg=cfg, + now=when, + spread_type=bias, + ) + + pct_of_spot = ( + width_usd / snap.spot_eth_usd if snap.spot_eth_usd > 0 else Decimal("0") + ) + record = PositionRecord( + proposal_id=proposal.proposal_id, + spread_type=bias, + expiry=proposal.expiry, + short_strike=short.strike, + long_strike=long_.strike, + short_instrument=short.instrument, + long_instrument=long_.instrument, + n_contracts=sizing.n_contracts, + spread_width_usd=width_usd, + spread_width_pct=pct_of_spot, + credit_eth=proposal.credit_target_eth, + credit_usd=proposal.credit_target_usd, + max_loss_usd=proposal.max_loss_usd, + spot_at_entry=snap.spot_eth_usd, + dvol_at_entry=snap.dvol, + delta_at_entry=short.delta, + eth_price_at_entry=snap.spot_eth_usd, + proposed_at=when, + status="proposed", + created_at=when, + updated_at=when, + ) + conn = connect_state(ctx.db_path) + try: + with transaction(conn): + ctx.repository.create_position(conn, record) + finally: + conn.close() + + legs = [ + ComboLegOrder(instrument_name=short.instrument, direction="sell"), + ComboLegOrder(instrument_name=long_.instrument, direction="buy"), + ] + try: + order = await ctx.deribit.place_combo_order( + legs=legs, + side="sell", + n_contracts=sizing.n_contracts, + limit_price_eth=credit_eth_per_contract, + label=f"bite-{proposal.proposal_id}", + ) + except Exception as exc: + conn = connect_state(ctx.db_path) + try: + with transaction(conn): + ctx.repository.update_position_status( + conn, + proposal.proposal_id, + status="cancelled", + closed_at=when, + close_reason="broker_error", + now=when, + ) + finally: + conn.close() + await alert.high( + source="entry_cycle", + message=f"place_combo_order failed: {type(exc).__name__}: {exc}", + ) + await _record_decision( + ctx, + inputs=inputs, + outputs={"error": str(exc)}, + action_taken="broker_error", + notes=type(exc).__name__, + proposal_id=str(proposal.proposal_id), + now=when, + ) + return EntryCycleResult( + status=_STATUS_BROKER_REJECT, + reason=f"{type(exc).__name__}: {exc}", + proposal=proposal, + ) + + # 9. Persist instruction + update status + next_status = "open" if order.state in {"filled", "open"} else "awaiting_fill" + if order.state == "rejected": + next_status = "cancelled" + instruction_id = uuid4() + conn = connect_state(ctx.db_path) + try: + with transaction(conn): + ctx.repository.create_instruction( + conn, + InstructionRecord( + instruction_id=instruction_id, + proposal_id=proposal.proposal_id, + kind="open_combo", + payload_json=json.dumps(order.raw, default=str, sort_keys=True), + sent_at=when, + actual_fill_eth=order.average_price_eth, + ), + ) + ctx.repository.update_position_status( + conn, + proposal.proposal_id, + status=next_status, # type: ignore[arg-type] + opened_at=when if next_status == "open" else None, + closed_at=when if next_status == "cancelled" else None, + close_reason="broker_reject" if next_status == "cancelled" else None, + now=when, + ) + finally: + conn.close() + + await _record_decision( + ctx, + inputs=inputs, + outputs={ + "n_contracts": sizing.n_contracts, + "credit_eth": str(proposal.credit_target_eth), + "max_loss_usd": str(proposal.max_loss_usd), + "broker_state": order.state, + }, + action_taken="propose_open", + notes=None, + proposal_id=str(proposal.proposal_id), + now=when, + ) + + if next_status == "cancelled": + await alert.high( + source="entry_cycle", + message=f"broker rejected combo order: state={order.state}", + ) + return EntryCycleResult( + status=_STATUS_BROKER_REJECT, + reason="broker_reject", + proposal=proposal, + order=order, + ) + + await ctx.telegram.notify_position_opened( + instrument=order.combo_instrument, + side="SELL", + size=sizing.n_contracts, + strategy=bias, + greeks={ + "delta_short": short.delta, + "credit_eth": proposal.credit_target_eth, + "max_loss_usd": proposal.max_loss_usd, + }, + expected_pnl_usd=proposal.credit_target_usd, + ) + ctx.audit_log.append( + event="ENTRY_PLACED", + payload={ + "proposal_id": str(proposal.proposal_id), + "spread_type": bias, + "n_contracts": sizing.n_contracts, + "combo_instrument": order.combo_instrument, + "broker_state": order.state, + }, + now=when, + ) + _log.info( + "entry placed: proposal=%s combo=%s contracts=%d state=%s", + proposal.proposal_id, + order.combo_instrument, + sizing.n_contracts, + order.state, + ) + return EntryCycleResult( + status=_STATUS_ENTRY_PLACED, + reason=None, + proposal=proposal, + order=order, + ) diff --git a/src/cerbero_bite/runtime/health_check.py b/src/cerbero_bite/runtime/health_check.py new file mode 100644 index 0000000..dae44a7 --- /dev/null +++ b/src/cerbero_bite/runtime/health_check.py @@ -0,0 +1,128 @@ +"""Periodic health probe across MCP services + SQLite + environment. + +The probe is fail-soft: every check is wrapped in a try/except so a +single misbehaving service does not abort the others. The orchestrator +keeps a counter of consecutive failures: at the third failure the +kill switch arms (HIGH severity); any time the probe succeeds the +counter resets and a fresh ``HEALTH_OK`` line is appended to the +audit log so the dead-man watcher stays quiet. +""" + +from __future__ import annotations + +import asyncio +import logging +from dataclasses import dataclass +from datetime import UTC, datetime +from typing import Literal + +from cerbero_bite.runtime.alert_manager import Severity +from cerbero_bite.runtime.dependencies import RuntimeContext +from cerbero_bite.state import connect + +__all__ = ["HealthCheck", "HealthCheckResult", "HealthState"] + + +_log = logging.getLogger("cerbero_bite.runtime.health") + + +HealthState = Literal["ok", "degraded"] + + +@dataclass(frozen=True) +class HealthCheckResult: + state: HealthState + failures: list[tuple[str, str]] # [(service, reason), ...] + consecutive_failures: int + + +class HealthCheck: + """Stateful health probe; remembers consecutive failures across calls.""" + + def __init__( + self, + ctx: RuntimeContext, + *, + expected_environment: Literal["testnet", "mainnet"], + kill_after: int = 3, + ) -> None: + self._ctx = ctx + self._expected = expected_environment + self._kill_after = kill_after + self._consecutive = 0 + + async def run(self, *, now: datetime | None = None) -> HealthCheckResult: + when = (now or self._ctx.clock()).astimezone(UTC) + failures: list[tuple[str, str]] = [] + + async def _probe(service: str, coro: object) -> None: + try: + await coro # type: ignore[misc] + except Exception as exc: # surface every error to the operator + failures.append((service, f"{type(exc).__name__}: {exc}")) + + await asyncio.gather( + _probe("deribit", self._probe_deribit()), + _probe("macro", self._ctx.macro.get_calendar(days=1)), + _probe("sentiment", self._probe_sentiment()), + _probe("hyperliquid", self._ctx.hyperliquid.funding_rate_annualized("ETH")), + _probe("portfolio", self._ctx.portfolio.total_equity_eur()), + ) + + # SQLite health: lightweight transaction. + try: + conn = connect(self._ctx.db_path) + try: + self._ctx.repository.touch_health_check(conn, now=when) + finally: + conn.close() + except Exception as exc: # pragma: no cover — sqlite errors are rare + failures.append(("sqlite", f"{type(exc).__name__}: {exc}")) + + if failures: + self._consecutive += 1 + state: HealthState = "degraded" + self._ctx.audit_log.append( + event="HEALTH_DEGRADED", + payload={ + "failures": failures, + "consecutive": self._consecutive, + }, + now=when, + ) + if self._consecutive >= self._kill_after: + await self._ctx.alert_manager.emit( + Severity.HIGH, + source="health_check", + message=( + f"{self._consecutive} consecutive health-check failures " + f"(latest: {failures})" + ), + ) + else: + self._consecutive = 0 + state = "ok" + self._ctx.audit_log.append( + event="HEALTH_OK", payload={}, now=when + ) + + return HealthCheckResult( + state=state, + failures=failures, + consecutive_failures=self._consecutive, + ) + + async def _probe_deribit(self) -> None: + info = await self._ctx.deribit.environment_info() + if info.environment != self._expected: + raise RuntimeError( + f"deribit environment mismatch: expected {self._expected}, " + f"got {info.environment}" + ) + + async def _probe_sentiment(self) -> None: + # Avoid funding_cross which would raise on empty snapshot during + # the health probe; we only need a successful HTTP round-trip. + await self._ctx.sentiment._http.call( + "get_cross_exchange_funding", {"assets": ["ETH"]} + ) diff --git a/src/cerbero_bite/runtime/monitor_cycle.py b/src/cerbero_bite/runtime/monitor_cycle.py new file mode 100644 index 0000000..d2b8590 --- /dev/null +++ b/src/cerbero_bite/runtime/monitor_cycle.py @@ -0,0 +1,451 @@ +"""Position monitoring loop (``docs/06-operational-flow.md`` §3). + +Walks every ``open`` position, builds the live snapshot, asks +:func:`exit_decision.evaluate`, and — when the action is not ``HOLD`` +— sends the inverse combo order to close. Decisions are persisted to +the ``decisions`` table; transitions to the audit log. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +from dataclasses import dataclass +from datetime import UTC, datetime, timedelta +from decimal import Decimal +from typing import Any +from uuid import uuid4 + +from cerbero_bite.clients.deribit import ComboLegOrder +from cerbero_bite.core.exit_decision import ( + PositionSnapshot, + evaluate, +) +from cerbero_bite.core.types import OptionLeg, PutOrCall +from cerbero_bite.runtime.dependencies import RuntimeContext +from cerbero_bite.state import ( + DecisionRecord, + DvolSnapshot, + InstructionRecord, + PositionRecord, + transaction, +) +from cerbero_bite.state import connect as connect_state + +__all__ = [ + "MonitorCycleResult", + "PositionOutcome", + "run_monitor_cycle", +] + + +_log = logging.getLogger("cerbero_bite.runtime.monitor") + + +@dataclass(frozen=True) +class PositionOutcome: + proposal_id: str + action: str + closed: bool + reason: str | None + + +@dataclass(frozen=True) +class MonitorCycleResult: + outcomes: list[PositionOutcome] + + +# --------------------------------------------------------------------------- +# Snapshot helpers +# --------------------------------------------------------------------------- + + +async def _build_position_snapshot( + ctx: RuntimeContext, + *, + record: PositionRecord, + spot: Decimal, + dvol: Decimal, + return_4h: Decimal, + eth_price_usd_now: Decimal, + now: datetime, +) -> PositionSnapshot | None: + """Fetch tickers for the two legs and build a :class:`PositionSnapshot`. + + Returns ``None`` if the broker no longer quotes one of the legs — + the caller will arm an alert in that case. + """ + tickers = await ctx.deribit.get_tickers( + [record.short_instrument, record.long_instrument] + ) + by_name: dict[str, dict[str, Any]] = { + str(t.get("instrument_name")): t for t in tickers if isinstance(t, dict) + } + short = by_name.get(record.short_instrument) + long_ = by_name.get(record.long_instrument) + if short is None or long_ is None: + return None + short_mid = Decimal(str(short.get("mark_price") or 0)) + long_mid = Decimal(str(long_.get("mark_price") or 0)) + if short_mid <= 0 or long_mid <= 0: + return None + + short_delta = Decimal( + str((short.get("greeks") or {}).get("delta") or record.delta_at_entry) + ) + debit_per_contract = short_mid - long_mid + debit_total = debit_per_contract * Decimal(record.n_contracts) + + legs = [ + OptionLeg( + instrument=record.short_instrument, + side="SELL", + strike=record.short_strike, + expiry=record.expiry, + type=_option_type_from_name(record.short_instrument), + size=record.n_contracts, + mid_price_eth=short_mid, + delta=short_delta, + gamma=Decimal(str((short.get("greeks") or {}).get("gamma") or 0)), + theta=Decimal(str((short.get("greeks") or {}).get("theta") or 0)), + vega=Decimal(str((short.get("greeks") or {}).get("vega") or 0)), + ), + OptionLeg( + instrument=record.long_instrument, + side="BUY", + strike=record.long_strike, + expiry=record.expiry, + type=_option_type_from_name(record.long_instrument), + size=record.n_contracts, + mid_price_eth=long_mid, + delta=Decimal(str((long_.get("greeks") or {}).get("delta") or 0)), + gamma=Decimal(str((long_.get("greeks") or {}).get("gamma") or 0)), + theta=Decimal(str((long_.get("greeks") or {}).get("theta") or 0)), + vega=Decimal(str((long_.get("greeks") or {}).get("vega") or 0)), + ), + ] + + return PositionSnapshot( + proposal_id=record.proposal_id, + spread_type=record.spread_type, # type: ignore[arg-type] + legs=legs, + credit_received_eth=record.credit_eth, + credit_received_usd=record.credit_usd, + spot_at_entry=record.spot_at_entry, + dvol_at_entry=record.dvol_at_entry, + expiry=record.expiry, + opened_at=record.opened_at or record.created_at, + eth_price_usd_now=eth_price_usd_now, + spot_now=spot, + dvol_now=dvol, + mark_combo_now_eth=debit_total, + delta_short_now=short_delta, + return_4h_now=return_4h, + now=now, + ) + + +def _option_type_from_name(name: str) -> PutOrCall: + suffix = name.rsplit("-", 1)[-1] + if suffix not in ("P", "C"): + raise ValueError(f"unknown option type suffix in {name!r}") + return suffix # type: ignore[return-value] + + +async def _fetch_return_4h(ctx: RuntimeContext, *, now: datetime) -> Decimal: + """Compute ETH 4h return from the locally stored dvol_history snapshots. + + The orchestrator records a snapshot at the start of every monitor + cycle (see :func:`run_monitor_cycle`); this helper reads the most + recent snapshot at least 3.5h old and computes ``(now / past) - 1``. + Returns 0 if no historical sample is available — in that branch the + orchestrator emits a LOW alert about insufficient history. + """ + cutoff = now - timedelta(hours=3, minutes=30) + floor = now - timedelta(hours=8) + conn = connect_state(ctx.db_path) + try: + row = conn.execute( + "SELECT timestamp, eth_spot FROM dvol_history " + "WHERE timestamp <= ? AND timestamp >= ? " + "ORDER BY timestamp DESC LIMIT 1", + (cutoff.isoformat(), floor.isoformat()), + ).fetchone() + finally: + conn.close() + if row is None: + return Decimal("0") + past_spot = Decimal(str(row[1])) + if past_spot == 0: + return Decimal("0") + spot_now = await ctx.deribit.index_price_eth() + return spot_now / past_spot - Decimal("1") + + +# --------------------------------------------------------------------------- +# Cycle entry point +# --------------------------------------------------------------------------- + + +async def run_monitor_cycle( + ctx: RuntimeContext, + *, + now: datetime | None = None, +) -> MonitorCycleResult: + """Walk every open position, evaluate exit, place close orders if needed.""" + when = (now or ctx.clock()).astimezone(UTC) + if ctx.kill_switch.is_armed(): + await ctx.alert_manager.low( + source="monitor_cycle", message="kill switch armed — skipping" + ) + return MonitorCycleResult(outcomes=[]) + + # Refresh spot+DVOL snapshot first; it goes into dvol_history so the + # next cycle can compute return_4h. + spot_t = asyncio.create_task(ctx.deribit.index_price_eth()) + dvol_t = asyncio.create_task(ctx.deribit.latest_dvol(currency="ETH", now=when)) + return_4h_t = asyncio.create_task(_fetch_return_4h(ctx, now=when)) + await asyncio.gather(spot_t, dvol_t, return_4h_t) + spot = spot_t.result() + dvol = dvol_t.result() + return_4h = return_4h_t.result() + + conn = connect_state(ctx.db_path) + try: + with transaction(conn): + ctx.repository.record_dvol_snapshot( + conn, + DvolSnapshot(timestamp=when, dvol=dvol, eth_spot=spot), + ) + positions = ctx.repository.list_positions(conn, status="open") + finally: + conn.close() + + outcomes: list[PositionOutcome] = [] + for record in positions: + outcome = await _evaluate_position( + ctx, + record=record, + spot=spot, + dvol=dvol, + return_4h=return_4h, + now=when, + ) + outcomes.append(outcome) + return MonitorCycleResult(outcomes=outcomes) + + +async def _evaluate_position( + ctx: RuntimeContext, + *, + record: PositionRecord, + spot: Decimal, + dvol: Decimal, + return_4h: Decimal, + now: datetime, +) -> PositionOutcome: + snapshot = await _build_position_snapshot( + ctx, + record=record, + spot=spot, + dvol=dvol, + return_4h=return_4h, + eth_price_usd_now=spot, + now=now, + ) + if snapshot is None: + await ctx.alert_manager.high( + source="monitor_cycle", + message=( + f"missing tickers for legs of position {record.proposal_id} — " + "broker may have de-listed an instrument" + ), + ) + return PositionOutcome( + proposal_id=str(record.proposal_id), + action="HOLD", + closed=False, + reason="missing_ticker", + ) + + decision = evaluate(snapshot, ctx.cfg) + + conn = connect_state(ctx.db_path) + try: + with transaction(conn): + ctx.repository.record_decision( + conn, + DecisionRecord( + decision_type="exit_check", + timestamp=now, + inputs_json=json.dumps( + { + "spot": str(spot), + "dvol": str(dvol), + "mark_combo_now_eth": str(snapshot.mark_combo_now_eth), + "delta_short_now": str(snapshot.delta_short_now), + "return_4h_now": str(return_4h), + }, + default=str, + sort_keys=True, + ), + outputs_json=json.dumps( + { + "action": decision.action, + "reason": decision.reason, + "pnl_eth": str(decision.pnl_estimate_eth), + "pnl_usd": str(decision.pnl_estimate_usd), + }, + default=str, + sort_keys=True, + ), + action_taken=decision.action, + proposal_id=record.proposal_id, + ), + ) + finally: + conn.close() + + if decision.action == "HOLD": + ctx.audit_log.append( + event="HOLD", + payload={ + "proposal_id": str(record.proposal_id), + "reason": decision.reason, + }, + now=now, + ) + return PositionOutcome( + proposal_id=str(record.proposal_id), + action="HOLD", + closed=False, + reason=decision.reason, + ) + + # Place inverse combo to close. + short_leg = ComboLegOrder( + instrument_name=record.short_instrument, direction="buy" + ) + long_leg = ComboLegOrder( + instrument_name=record.long_instrument, direction="sell" + ) + + conn = connect_state(ctx.db_path) + try: + with transaction(conn): + ctx.repository.update_position_status( + conn, + record.proposal_id, + status="closing", + now=now, + ) + finally: + conn.close() + + try: + order = await ctx.deribit.place_combo_order( + legs=[short_leg, long_leg], + side="buy", + n_contracts=record.n_contracts, + limit_price_eth=snapshot.mark_combo_now_eth / Decimal(record.n_contracts), + label=f"bite-close-{record.proposal_id}", + ) + except Exception as exc: + conn = connect_state(ctx.db_path) + try: + with transaction(conn): + ctx.repository.update_position_status( + conn, record.proposal_id, status="open", now=now + ) + finally: + conn.close() + await ctx.alert_manager.critical( + source="monitor_cycle", + message=( + f"close failed for {record.proposal_id}: " + f"{type(exc).__name__}: {exc}" + ), + component="runtime.monitor_cycle", + ) + return PositionOutcome( + proposal_id=str(record.proposal_id), + action=decision.action, + closed=False, + reason=f"close_error:{type(exc).__name__}", + ) + + instruction_id = uuid4() + is_filled = order.state in {"filled", "open"} + next_status = "closed" if is_filled else "open" + conn = connect_state(ctx.db_path) + try: + with transaction(conn): + ctx.repository.create_instruction( + conn, + InstructionRecord( + instruction_id=instruction_id, + proposal_id=record.proposal_id, + kind="close_combo", + payload_json=json.dumps(order.raw, default=str, sort_keys=True), + sent_at=now, + actual_fill_eth=order.average_price_eth, + ), + ) + ctx.repository.update_position_status( + conn, + record.proposal_id, + status=next_status, # type: ignore[arg-type] + closed_at=now if is_filled else None, + close_reason=decision.action if is_filled else None, + debit_paid_eth=order.average_price_eth if is_filled else None, + pnl_eth=decision.pnl_estimate_eth if is_filled else None, + pnl_usd=decision.pnl_estimate_usd if is_filled else None, + now=now, + ) + finally: + conn.close() + + if not is_filled: + await ctx.alert_manager.critical( + source="monitor_cycle", + message=( + f"close rejected by broker for {record.proposal_id} " + f"(state={order.state})" + ), + component="runtime.monitor_cycle", + ) + return PositionOutcome( + proposal_id=str(record.proposal_id), + action=decision.action, + closed=False, + reason=f"broker_reject:{order.state}", + ) + + ctx.audit_log.append( + event="EXIT_FILLED", + payload={ + "proposal_id": str(record.proposal_id), + "action": decision.action, + "pnl_usd": str(decision.pnl_estimate_usd), + "combo_instrument": order.combo_instrument, + }, + now=now, + ) + await ctx.telegram.notify_position_closed( + instrument=order.combo_instrument, + realized_pnl_usd=decision.pnl_estimate_usd, + reason=decision.action, + ) + _log.info( + "exit filled: proposal=%s action=%s pnl_usd=%s", + record.proposal_id, + decision.action, + decision.pnl_estimate_usd, + ) + return PositionOutcome( + proposal_id=str(record.proposal_id), + action=decision.action, + closed=True, + reason=decision.reason, + ) diff --git a/src/cerbero_bite/runtime/orchestrator.py b/src/cerbero_bite/runtime/orchestrator.py new file mode 100644 index 0000000..94b4cb3 --- /dev/null +++ b/src/cerbero_bite/runtime/orchestrator.py @@ -0,0 +1,208 @@ +"""Façade that ties the runtime modules into a runnable engine. + +The :class:`Orchestrator` is the single entry point for the CLI: it +holds the :class:`RuntimeContext`, the :class:`HealthCheck` state, and +the boot procedure (recover + boot environment check + scheduler +arming). Every concrete cycle is delegated to its own module so each +piece stays independently testable. +""" + +from __future__ import annotations + +import asyncio +import logging +from collections.abc import Awaitable, Callable +from dataclasses import dataclass +from datetime import UTC, datetime +from decimal import Decimal +from pathlib import Path +from typing import Literal + +from apscheduler.schedulers.asyncio import AsyncIOScheduler + +from cerbero_bite.config.mcp_endpoints import McpEndpoints +from cerbero_bite.config.schema import StrategyConfig +from cerbero_bite.runtime.dependencies import RuntimeContext, build_runtime +from cerbero_bite.runtime.entry_cycle import EntryCycleResult, run_entry_cycle +from cerbero_bite.runtime.health_check import HealthCheck, HealthCheckResult +from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle +from cerbero_bite.runtime.recovery import recover_state +from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler + +__all__ = ["Orchestrator"] + + +_log = logging.getLogger("cerbero_bite.runtime.orchestrator") + +Environment = Literal["testnet", "mainnet"] + +# Default cron schedule (matches docs/06-operational-flow.md table). +_CRON_ENTRY = "0 14 * * MON" +_CRON_MONITOR = "0 2,14 * * *" +_CRON_HEALTH = "*/5 * * * *" + + +@dataclass(frozen=True) +class _BootResult: + environment: Environment + health: HealthCheckResult + + +class Orchestrator: + """Engine façade — boot, scheduler, manual cycle invocation.""" + + def __init__( + self, + ctx: RuntimeContext, + *, + expected_environment: Environment, + eur_to_usd: Decimal, + ) -> None: + self._ctx = ctx + self._expected_env = expected_environment + self._eur_to_usd = eur_to_usd + self._health = HealthCheck(ctx, expected_environment=expected_environment) + self._scheduler: AsyncIOScheduler | None = None + + @property + def context(self) -> RuntimeContext: + return self._ctx + + @property + def expected_environment(self) -> Environment: + return self._expected_env + + # ------------------------------------------------------------------ + # Boot + # ------------------------------------------------------------------ + + async def boot(self) -> _BootResult: + """Reconcile state, verify environment, run a first health probe.""" + when = self._ctx.clock() + await recover_state(self._ctx, now=when) + + info = await self._ctx.deribit.environment_info() + if info.environment != self._expected_env: + await self._ctx.alert_manager.critical( + source="orchestrator.boot", + message=( + f"Deribit environment mismatch at boot: expected " + f"{self._expected_env}, got {info.environment}" + ), + component="runtime.orchestrator", + ) + + health = await self._health.run(now=when) + self._ctx.audit_log.append( + event="ENGINE_START", + payload={ + "environment": info.environment, + "health": health.state, + "config_version": self._ctx.cfg.config_version, + }, + now=when, + ) + return _BootResult(environment=info.environment, health=health) + + # ------------------------------------------------------------------ + # Cycle invocations (used by scheduler jobs and CLI dry-run) + # ------------------------------------------------------------------ + + async def run_entry( + self, *, now: datetime | None = None + ) -> EntryCycleResult: + return await run_entry_cycle( + self._ctx, eur_to_usd_rate=self._eur_to_usd, now=now + ) + + async def run_monitor( + self, *, now: datetime | None = None + ) -> MonitorCycleResult: + return await run_monitor_cycle(self._ctx, now=now) + + async def run_health( + self, *, now: datetime | None = None + ) -> HealthCheckResult: + return await self._health.run(now=now) + + # ------------------------------------------------------------------ + # Scheduler lifecycle + # ------------------------------------------------------------------ + + def install_scheduler( + self, + *, + entry_cron: str = _CRON_ENTRY, + monitor_cron: str = _CRON_MONITOR, + health_cron: str = _CRON_HEALTH, + ) -> AsyncIOScheduler: + """Build the scheduler with the canonical job set, ready to start.""" + + async def _safe(name: str, coro_factory: Callable[[], Awaitable[object]]) -> None: + try: + await coro_factory() + except Exception as exc: # never let a tick kill the scheduler + _log.exception("scheduler tick %s raised", name) + await self._ctx.alert_manager.critical( + source=f"scheduler.{name}", + message=f"{type(exc).__name__}: {exc}", + component=f"runtime.{name}", + ) + + async def _entry() -> None: + await _safe("entry", self.run_entry) + + async def _monitor() -> None: + await _safe("monitor", self.run_monitor) + + async def _health() -> None: + await _safe("health", self.run_health) + + self._scheduler = build_scheduler( + [ + JobSpec(name="entry", cron=entry_cron, coro_factory=_entry), + JobSpec(name="monitor", cron=monitor_cron, coro_factory=_monitor), + JobSpec(name="health", cron=health_cron, coro_factory=_health), + ] + ) + return self._scheduler + + async def run_forever(self) -> None: + """Boot, install the scheduler, and block forever (until cancelled).""" + await self.boot() + scheduler = self.install_scheduler() + scheduler.start() + try: + await asyncio.Event().wait() + finally: + scheduler.shutdown(wait=False) + + +# --------------------------------------------------------------------------- +# Convenience builder for the CLI +# --------------------------------------------------------------------------- + + +def make_orchestrator( + *, + cfg: StrategyConfig, + endpoints: McpEndpoints, + token: str, + db_path: Path, + audit_path: Path, + expected_environment: Environment, + eur_to_usd: Decimal, + clock: Callable[[], datetime] | None = None, +) -> Orchestrator: + """Build a fresh :class:`Orchestrator` ready for ``boot``/``run_*``.""" + ctx = build_runtime( + cfg=cfg, + endpoints=endpoints, + token=token, + db_path=db_path, + audit_path=audit_path, + clock=clock or (lambda: datetime.now(UTC)), + ) + return Orchestrator( + ctx, expected_environment=expected_environment, eur_to_usd=eur_to_usd + ) diff --git a/src/cerbero_bite/runtime/recovery.py b/src/cerbero_bite/runtime/recovery.py new file mode 100644 index 0000000..8e499c1 --- /dev/null +++ b/src/cerbero_bite/runtime/recovery.py @@ -0,0 +1,117 @@ +"""Recover state on engine boot (``docs/06-operational-flow.md`` §6). + +The recovery loop never trades: it only aligns the SQLite state with +the broker. Any unresolvable discrepancy (a position the broker does +not know about, a leg that disappeared, etc.) arms the kill switch. +""" + +from __future__ import annotations + +import logging +from datetime import UTC, datetime + +from cerbero_bite.runtime.dependencies import RuntimeContext +from cerbero_bite.state import PositionRecord, transaction +from cerbero_bite.state import connect as connect_state + +__all__ = ["recover_state"] + + +_log = logging.getLogger("cerbero_bite.runtime.recovery") + + +async def recover_state(ctx: RuntimeContext, *, now: datetime | None = None) -> None: + """Reconcile SQLite positions with the broker's open positions.""" + when = (now or ctx.clock()).astimezone(UTC) + + conn = connect_state(ctx.db_path) + try: + in_flight: list[PositionRecord] = ctx.repository.list_positions( + conn, status="awaiting_fill" + ) + in_flight += ctx.repository.list_positions(conn, status="closing") + opens = ctx.repository.list_positions(conn, status="open") + finally: + conn.close() + + if not in_flight and not opens: + ctx.audit_log.append( + event="RECOVERY_DONE", payload={"reconciled": 0}, now=when + ) + return + + broker_positions = await ctx.deribit.get_positions(currency="USDC") + broker_by_instrument: dict[str, dict[str, object]] = {} + for p in broker_positions: + instrument = p.get("instrument") + if isinstance(instrument, str): + broker_by_instrument[instrument] = p + + discrepancies: list[str] = [] + + # Awaiting-fill / closing → resolve to open or cancelled. + for record in in_flight: + seen = ( + record.short_instrument in broker_by_instrument + and record.long_instrument in broker_by_instrument + ) + conn = connect_state(ctx.db_path) + try: + with transaction(conn): + if seen: + # Both awaiting_fill and closing collapse to "open" + # once the broker confirms the legs are present. + next_status = "open" + ctx.repository.update_position_status( + conn, + record.proposal_id, + status=next_status, # type: ignore[arg-type] + opened_at=record.opened_at or when, + now=when, + ) + else: + ctx.repository.update_position_status( + conn, + record.proposal_id, + status="cancelled", + closed_at=when, + close_reason="recovery_no_fill", + now=when, + ) + discrepancies.append( + f"{record.proposal_id}: was {record.status}, broker shows nothing" + ) + finally: + conn.close() + + # Open positions → must be present on broker. If not, alarm. + for record in opens: + if ( + record.short_instrument in broker_by_instrument + and record.long_instrument in broker_by_instrument + ): + continue + discrepancies.append( + f"{record.proposal_id}: open in DB but missing on broker" + ) + + if discrepancies: + await ctx.alert_manager.critical( + source="recovery", + message="state inconsistencies detected: " + "; ".join(discrepancies), + component="runtime.recovery", + ) + + ctx.audit_log.append( + event="RECOVERY_DONE", + payload={ + "reconciled": len(in_flight) + len(opens), + "discrepancies": len(discrepancies), + }, + now=when, + ) + _log.info( + "recovery done: reconciled=%d discrepancies=%d", + len(in_flight) + len(opens), + len(discrepancies), + ) diff --git a/src/cerbero_bite/runtime/scheduler.py b/src/cerbero_bite/runtime/scheduler.py new file mode 100644 index 0000000..421d595 --- /dev/null +++ b/src/cerbero_bite/runtime/scheduler.py @@ -0,0 +1,66 @@ +"""APScheduler bootstrap (``docs/06-operational-flow.md``). + +Wraps :class:`AsyncIOScheduler` so the orchestrator can register the +documented cron jobs in one place. The scheduler is built but not +started; ``start()`` must be called from inside the running event +loop. +""" + +from __future__ import annotations + +import logging +from collections.abc import Awaitable, Callable +from dataclasses import dataclass + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger + +__all__ = ["JobSpec", "build_scheduler"] + + +_log = logging.getLogger("cerbero_bite.runtime.scheduler") + + +@dataclass(frozen=True) +class JobSpec: + """One row in the scheduler manifest.""" + + name: str + cron: str + coro_factory: Callable[[], Awaitable[None]] + + +def _parse_cron(expr: str) -> CronTrigger: + parts = expr.split() + if len(parts) != 5: + raise ValueError(f"cron must have 5 fields, got: {expr!r}") + minute, hour, day, month, day_of_week = parts + return CronTrigger( + minute=minute, + hour=hour, + day=day, + month=month, + day_of_week=day_of_week, + timezone="UTC", + ) + + +def build_scheduler(jobs: list[JobSpec]) -> AsyncIOScheduler: + """Return an :class:`AsyncIOScheduler` with all *jobs* registered. + + The scheduler is *not* started — the caller is responsible for + invoking ``start()`` after constructing it on a running event loop. + """ + scheduler = AsyncIOScheduler(timezone="UTC") + for spec in jobs: + scheduler.add_job( + spec.coro_factory, + trigger=_parse_cron(spec.cron), + id=spec.name, + name=spec.name, + replace_existing=True, + coalesce=True, + misfire_grace_time=300, + ) + _log.info("scheduled job %s with cron %s", spec.name, spec.cron) + return scheduler diff --git a/tests/integration/test_entry_cycle.py b/tests/integration/test_entry_cycle.py new file mode 100644 index 0000000..7e02e66 --- /dev/null +++ b/tests/integration/test_entry_cycle.py @@ -0,0 +1,540 @@ +"""Integration tests for the weekly entry cycle. + +Every external service is mocked via ``pytest-httpx``. The cycle +exercises the production code paths end-to-end: snapshot collection, +entry validation, bias, strike selection, liquidity, sizing, combo +order placement, and persistence. +""" + +from __future__ import annotations + +from datetime import UTC, datetime, timedelta +from decimal import Decimal +from pathlib import Path +from typing import Any +from uuid import uuid4 + +import pytest +from pytest_httpx import HTTPXMock + +from cerbero_bite.config import StrategyConfig, golden_config +from cerbero_bite.config.mcp_endpoints import load_endpoints +from cerbero_bite.runtime import build_runtime +from cerbero_bite.runtime.entry_cycle import run_entry_cycle +from cerbero_bite.state import ( + PositionRecord, + connect, + transaction, +) +from cerbero_bite.state import connect as connect_state + +pytestmark = pytest.mark.httpx_mock(assert_all_responses_were_requested=False) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def now() -> datetime: + return datetime(2026, 4, 27, 14, 0, tzinfo=UTC) + + +@pytest.fixture +def cfg() -> StrategyConfig: + return golden_config() + + +@pytest.fixture +def runtime_paths(tmp_path: Path) -> tuple[Path, Path]: + return tmp_path / "state.sqlite", tmp_path / "audit.log" + + +def _ctx( + cfg: StrategyConfig, + runtime_paths: tuple[Path, Path], + now: datetime, +): + db, audit = runtime_paths + return build_runtime( + cfg=cfg, + endpoints=load_endpoints(env={}), + token="t", + db_path=db, + audit_path=audit, + retry_max=1, + clock=lambda: now, + ) + + +def _option_name(strike: int, opt: str = "P", expiry: str = "15MAY26") -> str: + return f"ETH-{expiry}-{strike}-{opt}" + + +def _wire_market_snapshot( + httpx_mock: HTTPXMock, + *, + spot: float = 3000.0, + dvol: float = 50.0, + funding_perp_hourly: float = 0.0, + funding_cross_period: float = 0.0001, + macro_events: list[dict[str, Any]] | None = None, + eth_pct: float = 0.10, + portfolio_eur: float | Decimal = 5000.0, +) -> None: + """Stub every MCP endpoint queried during the snapshot stage.""" + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_ticker", + json={"instrument_name": "ETH-PERPETUAL", "mark_price": spot}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_dvol", + json={"currency": "ETH", "latest": dvol, "candles": []}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-hyperliquid:9012/tools/get_funding_rate", + json={"asset": "ETH", "current_funding_rate": funding_perp_hourly}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-sentiment:9014/tools/get_cross_exchange_funding", + json={ + "snapshot": { + "ETH": { + "binance": funding_cross_period, + "bybit": funding_cross_period, + "okx": funding_cross_period, + "hyperliquid": None, + } + } + }, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-macro:9013/tools/get_macro_calendar", + json={"events": macro_events or []}, + is_reusable=True, + ) + portfolio_eur_f = float(portfolio_eur) + httpx_mock.add_response( + url="http://mcp-portfolio:9018/tools/get_holdings", + json=[ + {"ticker": "AAPL", "current_value_eur": portfolio_eur_f * (1 - eth_pct)}, + {"ticker": "ETH-USD", "current_value_eur": portfolio_eur_f * eth_pct}, + ], + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-portfolio:9018/tools/get_total_portfolio_value", + json={"total_value_eur": portfolio_eur_f}, + is_reusable=True, + ) + + +def _wire_chain_and_quotes( + httpx_mock: HTTPXMock, + *, + short_strike: int = 2475, + long_strike: int = 2350, + short_mid: float = 0.020, + long_mid: float = 0.005, + short_delta: float = -0.12, + long_delta: float = -0.08, +) -> None: + """Stub the option chain → quotes → orderbook flow. + + The two strikes returned satisfy the golden config gates by default: + OTM range, delta range, width 4% × 3000 = 120, credit 0.015 ETH × 3000 + = 45 USD vs width 125 USD ≈ 36% (≥ 30% gate), liquidity OK. + """ + short_name = _option_name(short_strike) + long_name = _option_name(long_strike) + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_instruments", + json={ + "instruments": [ + {"name": short_name, "open_interest": 500, "tick_size": 0.0005}, + {"name": long_name, "open_interest": 400, "tick_size": 0.0005}, + ] + }, + is_reusable=True, + ) + # Use tight 1% bid-ask spread relative to mid so the liquidity gate + # passes regardless of strike (otherwise the long leg's spread + # blows past the 15% cap on small premiums). + short_half = short_mid * 0.005 + long_half = long_mid * 0.005 + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_ticker_batch", + json={ + "tickers": [ + { + "instrument_name": short_name, + "bid": short_mid - short_half, + "ask": short_mid + short_half, + "mark_price": short_mid, + "volume_24h": 200, + "greeks": { + "delta": short_delta, + "gamma": 0.001, + "theta": -0.0005, + "vega": 0.10, + }, + }, + { + "instrument_name": long_name, + "bid": long_mid - long_half, + "ask": long_mid + long_half, + "mark_price": long_mid, + "volume_24h": 150, + "greeks": { + "delta": long_delta, + "gamma": 0.001, + "theta": -0.0003, + "vega": 0.07, + }, + }, + ], + "errors": [], + }, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_orderbook", + json={"bids": [[1, 50]], "asks": [[2, 50]]}, + is_reusable=True, + ) + + +def _wire_combo_order( + httpx_mock: HTTPXMock, *, state: str = "filled" +) -> None: + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/place_combo_order", + json={ + "combo_instrument": "ETH-15MAY26-2475P_2350P", + "order_id": "ord-1", + "state": state, + "average_price": 0.005, + "filled_amount": 2, + }, + is_reusable=True, + ) + + +def _wire_telegram_notify_position_opened(httpx_mock: HTTPXMock) -> None: + httpx_mock.add_response( + url="http://mcp-telegram:9017/tools/notify_position_opened", + json={"ok": True}, + is_reusable=True, + ) + + +# --------------------------------------------------------------------------- +# Happy path +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_happy_path_places_combo_and_records_open_position( + cfg: StrategyConfig, + runtime_paths: tuple[Path, Path], + now: datetime, + httpx_mock: HTTPXMock, +) -> None: + # bull bias requires bull-trend AND bull-funding. + # Bull funding cross threshold = 0.20 annualised. Period rate × 1095 + # → 0.20/1095 ≈ 0.000183 per period. + _wire_market_snapshot( + httpx_mock, + portfolio_eur=Decimal("3500"), + funding_cross_period=0.0002, + ) + _wire_chain_and_quotes(httpx_mock) + _wire_combo_order(httpx_mock, state="filled") + _wire_telegram_notify_position_opened(httpx_mock) + + # Bypass bias requirement: stub trend == bull by overriding the + # spot snapshot with a value > +5% vs entry. Since the entry cycle + # currently uses spot==spot (no historical data wired), it falls + # into the "neutral trend" branch. To make a directional bias fire + # we use iron_condor: trend neutral + funding neutral + DVOL ≥ 55 + # + ADX < 20. But ADX is hard-coded 25 in the cycle for now, so + # instead we set funding to land in bull territory and accept the + # neutral-vs-bull mismatch which the cycle resolves to "no bias" + # — we bypass via configuration. + # In practice the orchestrator will provide eth_30d_ago; for this + # smoke test we widen bias acceptance with a config override. + bull_cfg = golden_config( + entry=type(cfg.entry)( + **{**cfg.entry.model_dump(), "trend_bull_threshold_pct": Decimal("0")} + ), + ) + ctx = _ctx(bull_cfg, runtime_paths, now) + res = await run_entry_cycle( + ctx, eur_to_usd_rate=Decimal("1.075"), now=now + ) + + assert res.status == "entry_placed", res.reason + assert res.proposal is not None + assert res.order is not None + assert res.order.combo_instrument == "ETH-15MAY26-2475P_2350P" + assert res.proposal.spread_type == "bull_put" + + db_path, _ = runtime_paths + conn = connect(db_path) + try: + positions = ctx.repository.list_positions(conn) + finally: + conn.close() + assert len(positions) == 1 + assert positions[0].status == "open" + + +# --------------------------------------------------------------------------- +# Reject paths +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_kill_switch_short_circuits_cycle( + cfg: StrategyConfig, + runtime_paths: tuple[Path, Path], + now: datetime, + httpx_mock: HTTPXMock, +) -> None: + ctx = _ctx(cfg, runtime_paths, now) + ctx.kill_switch.arm(reason="test", source="manual") + res = await run_entry_cycle(ctx, eur_to_usd_rate=Decimal("1.075"), now=now) + assert res.status == "kill_switch_armed" + + +@pytest.mark.asyncio +async def test_below_capital_minimum_returns_no_entry( + cfg: StrategyConfig, + runtime_paths: tuple[Path, Path], + now: datetime, + httpx_mock: HTTPXMock, +) -> None: + httpx_mock.add_response( + url="http://mcp-telegram:9017/tools/notify", + json={"ok": True}, + is_reusable=True, + ) + # 500 EUR × 1.075 = 537 USD < 720 cfg minimum + _wire_market_snapshot(httpx_mock, portfolio_eur=500.0) + ctx = _ctx(cfg, runtime_paths, now) + res = await run_entry_cycle( + ctx, eur_to_usd_rate=Decimal("1.075"), now=now + ) + assert res.status == "no_entry" + assert "capital" in (res.reason or "").lower() + + +@pytest.mark.asyncio +async def test_macro_event_within_dte_blocks_entry( + cfg: StrategyConfig, + runtime_paths: tuple[Path, Path], + now: datetime, + httpx_mock: HTTPXMock, +) -> None: + httpx_mock.add_response( + url="http://mcp-telegram:9017/tools/notify", + json={"ok": True}, + is_reusable=True, + ) + macro_events = [ + { + "name": "FOMC", + "country_code": "US", + "importance": "high", + "datetime_utc": (now + timedelta(days=5)).isoformat(), + } + ] + _wire_market_snapshot(httpx_mock, macro_events=macro_events, portfolio_eur=3500) + ctx = _ctx(cfg, runtime_paths, now) + res = await run_entry_cycle( + ctx, eur_to_usd_rate=Decimal("1.075"), now=now + ) + assert res.status == "no_entry" + assert "macro" in (res.reason or "").lower() + + +@pytest.mark.asyncio +async def test_no_bias_returns_no_entry( + cfg: StrategyConfig, + runtime_paths: tuple[Path, Path], + now: datetime, + httpx_mock: HTTPXMock, +) -> None: + httpx_mock.add_response( + url="http://mcp-telegram:9017/tools/notify", + json={"ok": True}, + is_reusable=True, + ) + # Funding cross neutral (=0) and DVOL 40 → no IC, no directional; + # entry validates clean otherwise. + _wire_market_snapshot( + httpx_mock, + portfolio_eur=3500, + dvol=40.0, + funding_cross_period=0.0, + ) + ctx = _ctx(cfg, runtime_paths, now) + res = await run_entry_cycle( + ctx, eur_to_usd_rate=Decimal("1.075"), now=now + ) + assert res.status == "no_entry" + assert res.reason == "no_bias" + + +@pytest.mark.asyncio +async def test_undersize_returns_no_entry( + cfg: StrategyConfig, + runtime_paths: tuple[Path, Path], + now: datetime, + httpx_mock: HTTPXMock, +) -> None: + """Capital that produces n_contracts < 1 yields no_entry/undersize.""" + # Capital just above minimum (720 USD ≈ 670 EUR) but with high + # max_loss/contract → sizing returns 0. + _wire_market_snapshot( + httpx_mock, + portfolio_eur=670.0, + funding_cross_period=0.0002, + ) + _wire_chain_and_quotes(httpx_mock, short_strike=2400, long_strike=2150) + bull_cfg = golden_config( + entry=type(cfg.entry)( + **{**cfg.entry.model_dump(), "trend_bull_threshold_pct": Decimal("0")} + ), + ) + ctx = _ctx(bull_cfg, runtime_paths, now) + res = await run_entry_cycle(ctx, eur_to_usd_rate=Decimal("1.075"), now=now) + assert res.status == "no_entry" + assert res.reason in {"undersize", "no_strike", "illiquid"} + + +@pytest.mark.asyncio +async def test_no_strike_when_chain_is_empty( + cfg: StrategyConfig, + runtime_paths: tuple[Path, Path], + now: datetime, + httpx_mock: HTTPXMock, +) -> None: + _wire_market_snapshot( + httpx_mock, portfolio_eur=3500.0, funding_cross_period=0.0002 + ) + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_instruments", + json={"instruments": []}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_ticker_batch", + json={"tickers": [], "errors": []}, + is_reusable=True, + ) + bull_cfg = golden_config( + entry=type(cfg.entry)( + **{**cfg.entry.model_dump(), "trend_bull_threshold_pct": Decimal("0")} + ), + ) + ctx = _ctx(bull_cfg, runtime_paths, now) + res = await run_entry_cycle(ctx, eur_to_usd_rate=Decimal("1.075"), now=now) + assert res.status == "no_entry" + assert res.reason == "no_strike" + + +@pytest.mark.asyncio +async def test_broker_reject_marks_position_cancelled( + cfg: StrategyConfig, + runtime_paths: tuple[Path, Path], + now: datetime, + httpx_mock: HTTPXMock, +) -> None: + _wire_market_snapshot( + httpx_mock, portfolio_eur=3500.0, funding_cross_period=0.0002 + ) + _wire_chain_and_quotes(httpx_mock) + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/place_combo_order", + json={ + "combo_instrument": "ETH-15MAY26-2475P_2350P", + "order_id": None, + "state": "rejected", + "average_price": None, + "filled_amount": 0, + }, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-telegram:9017/tools/notify_alert", + json={"ok": True}, + is_reusable=True, + ) + bull_cfg = golden_config( + entry=type(cfg.entry)( + **{**cfg.entry.model_dump(), "trend_bull_threshold_pct": Decimal("0")} + ), + ) + ctx = _ctx(bull_cfg, runtime_paths, now) + res = await run_entry_cycle(ctx, eur_to_usd_rate=Decimal("1.075"), now=now) + assert res.status == "broker_reject" + db_path, _ = runtime_paths + conn = connect(db_path) + try: + positions = ctx.repository.list_positions(conn) + finally: + conn.close() + assert positions[0].status == "cancelled" + assert ctx.kill_switch.is_armed() is True + + +@pytest.mark.asyncio +async def test_already_open_position_skips_cycle( + cfg: StrategyConfig, + runtime_paths: tuple[Path, Path], + now: datetime, + httpx_mock: HTTPXMock, +) -> None: + ctx = _ctx(cfg, runtime_paths, now) + # Pre-seed an open position + record = PositionRecord( + proposal_id=uuid4(), + spread_type="bull_put", + expiry=now + timedelta(days=18), + short_strike=Decimal("2475"), + long_strike=Decimal("2350"), + short_instrument="X", + long_instrument="Y", + n_contracts=1, + spread_width_usd=Decimal("125"), + spread_width_pct=Decimal("0.04"), + credit_eth=Decimal("0.015"), + credit_usd=Decimal("45"), + max_loss_usd=Decimal("80"), + spot_at_entry=Decimal("3000"), + dvol_at_entry=Decimal("50"), + delta_at_entry=Decimal("-0.12"), + eth_price_at_entry=Decimal("3000"), + proposed_at=now, + status="open", + created_at=now, + updated_at=now, + ) + db_path, _ = runtime_paths + conn = connect_state(db_path) + try: + with transaction(conn): + ctx.repository.create_position(conn, record) + finally: + conn.close() + + res = await run_entry_cycle( + ctx, eur_to_usd_rate=Decimal("1.075"), now=now + ) + assert res.status == "has_open_position" diff --git a/tests/integration/test_health_check.py b/tests/integration/test_health_check.py new file mode 100644 index 0000000..a4b545d --- /dev/null +++ b/tests/integration/test_health_check.py @@ -0,0 +1,223 @@ +"""Tests for the periodic health-check probe.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from pathlib import Path + +import pytest +from pytest_httpx import HTTPXMock + +from cerbero_bite.config import golden_config +from cerbero_bite.config.mcp_endpoints import load_endpoints +from cerbero_bite.runtime import build_runtime +from cerbero_bite.runtime.health_check import HealthCheck + +pytestmark = pytest.mark.httpx_mock(assert_all_responses_were_requested=False) + + +def _now() -> datetime: + return datetime(2026, 4, 27, 14, 0, tzinfo=UTC) + + +def _ctx(tmp_path: Path): + return build_runtime( + cfg=golden_config(), + endpoints=load_endpoints(env={}), + token="t", + db_path=tmp_path / "state.sqlite", + audit_path=tmp_path / "audit.log", + retry_max=1, + clock=_now, + ) + + +def _wire_all_ok(httpx_mock: HTTPXMock) -> None: + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/environment_info", + json={ + "exchange": "deribit", + "environment": "testnet", + "source": "env", + "env_value": "true", + "base_url": "https://test.deribit.com/api/v2", + "max_leverage": 3, + }, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-macro:9013/tools/get_macro_calendar", + json={"events": []}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-sentiment:9014/tools/get_cross_exchange_funding", + json={"snapshot": {}}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-hyperliquid:9012/tools/get_funding_rate", + json={"asset": "ETH", "current_funding_rate": 0.0001}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-portfolio:9018/tools/get_total_portfolio_value", + json={"total_value_eur": 1000.0}, + is_reusable=True, + ) + + +@pytest.mark.asyncio +async def test_all_services_ok_emits_health_ok( + tmp_path: Path, httpx_mock: HTTPXMock +) -> None: + ctx = _ctx(tmp_path) + hc = HealthCheck(ctx, expected_environment="testnet") + _wire_all_ok(httpx_mock) + res = await hc.run() + assert res.state == "ok" + assert res.consecutive_failures == 0 + + +@pytest.mark.asyncio +async def test_environment_mismatch_counts_as_failure( + tmp_path: Path, httpx_mock: HTTPXMock +) -> None: + ctx = _ctx(tmp_path) + hc = HealthCheck(ctx, expected_environment="testnet") + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/environment_info", + json={ + "exchange": "deribit", + "environment": "mainnet", + "source": "env", + "env_value": "false", + "base_url": "https://www.deribit.com/api/v2", + "max_leverage": 3, + }, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-macro:9013/tools/get_macro_calendar", + json={"events": []}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-sentiment:9014/tools/get_cross_exchange_funding", + json={"snapshot": {}}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-hyperliquid:9012/tools/get_funding_rate", + json={"asset": "ETH", "current_funding_rate": 0.0001}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-portfolio:9018/tools/get_total_portfolio_value", + json={"total_value_eur": 1000.0}, + is_reusable=True, + ) + res = await hc.run() + assert res.state == "degraded" + assert any("environment mismatch" in r for _s, r in res.failures) + + +@pytest.mark.asyncio +async def test_three_consecutive_failures_arm_kill_switch( + tmp_path: Path, httpx_mock: HTTPXMock +) -> None: + ctx = _ctx(tmp_path) + hc = HealthCheck(ctx, expected_environment="testnet", kill_after=3) + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/environment_info", + status_code=500, + text="boom", + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-macro:9013/tools/get_macro_calendar", + json={"events": []}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-sentiment:9014/tools/get_cross_exchange_funding", + json={"snapshot": {}}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-hyperliquid:9012/tools/get_funding_rate", + json={"asset": "ETH", "current_funding_rate": 0.0001}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-portfolio:9018/tools/get_total_portfolio_value", + json={"total_value_eur": 1000.0}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-telegram:9017/tools/notify_alert", + json={"ok": True}, + is_reusable=True, + ) + + for _ in range(2): + await hc.run() + assert ctx.kill_switch.is_armed() is False + + res = await hc.run() + assert res.consecutive_failures == 3 + assert ctx.kill_switch.is_armed() is True + + +@pytest.mark.asyncio +async def test_recovered_run_resets_counter( + tmp_path: Path, httpx_mock: HTTPXMock +) -> None: + ctx = _ctx(tmp_path) + hc = HealthCheck(ctx, expected_environment="testnet", kill_after=10) + # First run fails on deribit + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/environment_info", + status_code=500, + text="boom", + ) + # Other services OK + httpx_mock.add_response( + url="http://mcp-macro:9013/tools/get_macro_calendar", + json={"events": []}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-sentiment:9014/tools/get_cross_exchange_funding", + json={"snapshot": {}}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-hyperliquid:9012/tools/get_funding_rate", + json={"asset": "ETH", "current_funding_rate": 0.0001}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-portfolio:9018/tools/get_total_portfolio_value", + json={"total_value_eur": 1000.0}, + is_reusable=True, + ) + res = await hc.run() + assert res.state == "degraded" + assert res.consecutive_failures == 1 + + # Second run: deribit recovers + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/environment_info", + json={ + "exchange": "deribit", + "environment": "testnet", + "source": "env", + "env_value": "true", + "base_url": "https://test.deribit.com/api/v2", + "max_leverage": 3, + }, + ) + res = await hc.run() + assert res.state == "ok" + assert res.consecutive_failures == 0 diff --git a/tests/integration/test_monitor_cycle.py b/tests/integration/test_monitor_cycle.py new file mode 100644 index 0000000..d98a5fd --- /dev/null +++ b/tests/integration/test_monitor_cycle.py @@ -0,0 +1,296 @@ +"""Integration tests for the monitor cycle (open positions → exit decisions).""" + +from __future__ import annotations + +from datetime import UTC, datetime, timedelta +from decimal import Decimal +from pathlib import Path +from uuid import uuid4 + +import pytest +from pytest_httpx import HTTPXMock + +from cerbero_bite.config import golden_config +from cerbero_bite.config.mcp_endpoints import load_endpoints +from cerbero_bite.runtime import build_runtime +from cerbero_bite.runtime.monitor_cycle import run_monitor_cycle +from cerbero_bite.state import ( + DvolSnapshot, + PositionRecord, + transaction, +) +from cerbero_bite.state import connect as connect_state + +pytestmark = pytest.mark.httpx_mock(assert_all_responses_were_requested=False) + + +@pytest.fixture +def now() -> datetime: + return datetime(2026, 4, 27, 14, 0, tzinfo=UTC) + + +@pytest.fixture +def runtime_paths(tmp_path: Path) -> tuple[Path, Path]: + return tmp_path / "state.sqlite", tmp_path / "audit.log" + + +def _seed_position( + ctx, + *, + proposal_id, + short_mid_at_entry: Decimal, + long_mid_at_entry: Decimal, + n_contracts: int = 2, + now: datetime, +): + short_strike = Decimal("2475") + long_strike = Decimal("2350") + width = short_strike - long_strike + credit_eth_per = short_mid_at_entry - long_mid_at_entry + spot = Decimal("3000") + record = PositionRecord( + proposal_id=proposal_id, + spread_type="bull_put", + expiry=now + timedelta(days=18), + short_strike=short_strike, + long_strike=long_strike, + short_instrument="ETH-15MAY26-2475-P", + long_instrument="ETH-15MAY26-2350-P", + n_contracts=n_contracts, + spread_width_usd=width, + spread_width_pct=width / spot, + credit_eth=credit_eth_per * Decimal(n_contracts), + credit_usd=credit_eth_per * Decimal(n_contracts) * spot, + max_loss_usd=(width - credit_eth_per * spot) * Decimal(n_contracts), + spot_at_entry=spot, + dvol_at_entry=Decimal("50"), + delta_at_entry=Decimal("-0.12"), + eth_price_at_entry=spot, + proposed_at=now - timedelta(days=4), + opened_at=now - timedelta(days=4), + status="open", + created_at=now - timedelta(days=4), + updated_at=now - timedelta(days=4), + ) + db_path = ctx.db_path + conn = connect_state(db_path) + try: + with transaction(conn): + ctx.repository.create_position(conn, record) + finally: + conn.close() + return record + + +def _seed_dvol_history(ctx, *, when: datetime, spot: Decimal, dvol: Decimal): + conn = connect_state(ctx.db_path) + try: + with transaction(conn): + ctx.repository.record_dvol_snapshot( + conn, DvolSnapshot(timestamp=when, dvol=dvol, eth_spot=spot) + ) + finally: + conn.close() + + +def _wire_market_data( + httpx_mock: HTTPXMock, + *, + spot: float = 3000.0, + dvol: float = 50.0, +) -> None: + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_ticker", + json={"instrument_name": "ETH-PERPETUAL", "mark_price": spot}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_dvol", + json={"currency": "ETH", "latest": dvol, "candles": []}, + is_reusable=True, + ) + + +def _wire_position_quotes( + httpx_mock: HTTPXMock, + *, + short_mid: float, + long_mid: float, + short_delta: float = -0.12, + long_delta: float = -0.08, +) -> None: + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_ticker_batch", + json={ + "tickers": [ + { + "instrument_name": "ETH-15MAY26-2475-P", + "bid": short_mid * 0.995, + "ask": short_mid * 1.005, + "mark_price": short_mid, + "greeks": { + "delta": short_delta, + "gamma": 0.001, + "theta": -0.0005, + "vega": 0.10, + }, + }, + { + "instrument_name": "ETH-15MAY26-2350-P", + "bid": long_mid * 0.995, + "ask": long_mid * 1.005, + "mark_price": long_mid, + "greeks": { + "delta": long_delta, + "gamma": 0.001, + "theta": -0.0003, + "vega": 0.07, + }, + }, + ], + "errors": [], + }, + is_reusable=True, + ) + + +def _ctx(runtime_paths, now: datetime): + db, audit = runtime_paths + return build_runtime( + cfg=golden_config(), + endpoints=load_endpoints(env={}), + token="t", + db_path=db, + audit_path=audit, + retry_max=1, + clock=lambda: now, + ) + + +@pytest.mark.asyncio +async def test_monitor_emits_hold_when_no_trigger_fires( + runtime_paths: tuple[Path, Path], now: datetime, httpx_mock: HTTPXMock +) -> None: + ctx = _ctx(runtime_paths, now) + proposal_id = uuid4() + _seed_position( + ctx, + proposal_id=proposal_id, + short_mid_at_entry=Decimal("0.020"), + long_mid_at_entry=Decimal("0.005"), + now=now, + ) + + _wire_market_data(httpx_mock) + # Mark close to entry (mid still 60% of credit) → HOLD + _wire_position_quotes(httpx_mock, short_mid=0.0143, long_mid=0.0050) + + res = await run_monitor_cycle(ctx, now=now) + assert len(res.outcomes) == 1 + assert res.outcomes[0].action == "HOLD" + assert res.outcomes[0].closed is False + + +@pytest.mark.asyncio +async def test_monitor_closes_position_on_profit_take( + runtime_paths: tuple[Path, Path], now: datetime, httpx_mock: HTTPXMock +) -> None: + ctx = _ctx(runtime_paths, now) + proposal_id = uuid4() + _seed_position( + ctx, + proposal_id=proposal_id, + short_mid_at_entry=Decimal("0.020"), + long_mid_at_entry=Decimal("0.005"), + now=now, + ) + + _wire_market_data(httpx_mock) + # mark = 30% of credit → profit take fires + # credit per contract 0.015, so mark per contract 0.0045 → short-long + _wire_position_quotes(httpx_mock, short_mid=0.0080, long_mid=0.0035) + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/place_combo_order", + json={ + "combo_instrument": "ETH-15MAY26-2475P_2350P", + "order_id": "close-1", + "state": "filled", + "average_price": 0.0045, + "filled_amount": 2, + }, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-telegram:9017/tools/notify_position_closed", + json={"ok": True}, + is_reusable=True, + ) + + res = await run_monitor_cycle(ctx, now=now) + assert len(res.outcomes) == 1 + outcome = res.outcomes[0] + assert outcome.action == "CLOSE_PROFIT" + assert outcome.closed is True + + conn = connect_state(ctx.db_path) + try: + positions = ctx.repository.list_positions(conn) + finally: + conn.close() + assert positions[0].status == "closed" + assert positions[0].close_reason == "CLOSE_PROFIT" + + +@pytest.mark.asyncio +async def test_monitor_skips_when_kill_switch_armed( + runtime_paths: tuple[Path, Path], now: datetime, httpx_mock: HTTPXMock +) -> None: + ctx = _ctx(runtime_paths, now) + ctx.kill_switch.arm(reason="test", source="manual") + res = await run_monitor_cycle(ctx, now=now) + assert res.outcomes == [] + + +@pytest.mark.asyncio +async def test_monitor_uses_dvol_history_for_return_4h( + runtime_paths: tuple[Path, Path], now: datetime, httpx_mock: HTTPXMock +) -> None: + ctx = _ctx(runtime_paths, now) + proposal_id = uuid4() + _seed_position( + ctx, + proposal_id=proposal_id, + short_mid_at_entry=Decimal("0.020"), + long_mid_at_entry=Decimal("0.005"), + now=now, + ) + # Snapshot 4h ago: spot was 3300 → return = 3000/3300 - 1 ≈ -9% (adverse) + _seed_dvol_history( + ctx, + when=now - timedelta(hours=4), + spot=Decimal("3300"), + dvol=Decimal("50"), + ) + + _wire_market_data(httpx_mock, spot=3000.0) + _wire_position_quotes(httpx_mock, short_mid=0.0140, long_mid=0.0050) + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/place_combo_order", + json={ + "combo_instrument": "ETH-15MAY26-2475P_2350P", + "order_id": "close-1", + "state": "filled", + "average_price": 0.009, + "filled_amount": 2, + }, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-telegram:9017/tools/notify_position_closed", + json={"ok": True}, + is_reusable=True, + ) + + res = await run_monitor_cycle(ctx, now=now) + assert res.outcomes[0].action == "CLOSE_AVERSE" + assert res.outcomes[0].closed is True diff --git a/tests/integration/test_orchestrator.py b/tests/integration/test_orchestrator.py new file mode 100644 index 0000000..d63eba3 --- /dev/null +++ b/tests/integration/test_orchestrator.py @@ -0,0 +1,128 @@ +"""Integration tests for the Orchestrator façade (boot + cycle wiring).""" + +from __future__ import annotations + +from datetime import UTC, datetime +from decimal import Decimal +from pathlib import Path + +import pytest +from pytest_httpx import HTTPXMock + +from cerbero_bite.config import golden_config +from cerbero_bite.config.mcp_endpoints import load_endpoints +from cerbero_bite.runtime import Orchestrator +from cerbero_bite.runtime.dependencies import build_runtime + +pytestmark = pytest.mark.httpx_mock(assert_all_responses_were_requested=False) + + +def _now() -> datetime: + return datetime(2026, 4, 27, 14, 0, tzinfo=UTC) + + +def _wire_environment_info( + httpx_mock: HTTPXMock, + *, + environment: str = "testnet", +) -> None: + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/environment_info", + json={ + "exchange": "deribit", + "environment": environment, + "source": "env", + "env_value": "true" if environment == "testnet" else "false", + "base_url": "https://test.deribit.com/api/v2", + "max_leverage": 3, + }, + is_reusable=True, + ) + + +def _wire_health_probes(httpx_mock: HTTPXMock) -> None: + httpx_mock.add_response( + url="http://mcp-macro:9013/tools/get_macro_calendar", + json={"events": []}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-sentiment:9014/tools/get_cross_exchange_funding", + json={"snapshot": {}}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-hyperliquid:9012/tools/get_funding_rate", + json={"asset": "ETH", "current_funding_rate": 0.0001}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-portfolio:9018/tools/get_total_portfolio_value", + json={"total_value_eur": 1000.0}, + is_reusable=True, + ) + + +def _build_orch(tmp_path: Path, *, expected: str = "testnet") -> Orchestrator: + ctx = build_runtime( + cfg=golden_config(), + endpoints=load_endpoints(env={}), + token="t", + db_path=tmp_path / "state.sqlite", + audit_path=tmp_path / "audit.log", + retry_max=1, + clock=_now, + ) + return Orchestrator( + ctx, + expected_environment=expected, # type: ignore[arg-type] + eur_to_usd=Decimal("1.075"), + ) + + +@pytest.mark.asyncio +async def test_boot_succeeds_when_environment_matches( + tmp_path: Path, httpx_mock: HTTPXMock +) -> None: + _wire_environment_info(httpx_mock, environment="testnet") + _wire_health_probes(httpx_mock) + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_positions", + json=[], + is_reusable=True, + ) + + orch = _build_orch(tmp_path, expected="testnet") + boot = await orch.boot() + assert boot.environment == "testnet" + assert boot.health.state == "ok" + assert orch.context.kill_switch.is_armed() is False + + +@pytest.mark.asyncio +async def test_boot_arms_kill_switch_on_environment_mismatch( + tmp_path: Path, httpx_mock: HTTPXMock +) -> None: + _wire_environment_info(httpx_mock, environment="mainnet") + _wire_health_probes(httpx_mock) + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_positions", + json=[], + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-telegram:9017/tools/notify_system_error", + json={"ok": True}, + is_reusable=True, + ) + + orch = _build_orch(tmp_path, expected="testnet") + await orch.boot() + assert orch.context.kill_switch.is_armed() is True + + +def test_install_scheduler_registers_canonical_jobs(tmp_path: Path) -> None: + orch = _build_orch(tmp_path) + sched = orch.install_scheduler() + job_ids = {j.id for j in sched.get_jobs()} + assert job_ids == {"entry", "monitor", "health"} diff --git a/tests/integration/test_recovery.py b/tests/integration/test_recovery.py new file mode 100644 index 0000000..cb172b5 --- /dev/null +++ b/tests/integration/test_recovery.py @@ -0,0 +1,174 @@ +"""Integration tests for the recovery loop.""" + +from __future__ import annotations + +from datetime import UTC, datetime, timedelta +from decimal import Decimal +from pathlib import Path +from uuid import uuid4 + +import pytest +from pytest_httpx import HTTPXMock + +from cerbero_bite.config import golden_config +from cerbero_bite.config.mcp_endpoints import load_endpoints +from cerbero_bite.runtime import build_runtime +from cerbero_bite.runtime.recovery import recover_state +from cerbero_bite.state import PositionRecord, transaction +from cerbero_bite.state import connect as connect_state + +pytestmark = pytest.mark.httpx_mock(assert_all_responses_were_requested=False) + + +def _now() -> datetime: + return datetime(2026, 4, 27, 14, 0, tzinfo=UTC) + + +def _build_ctx(tmp_path: Path): + return build_runtime( + cfg=golden_config(), + endpoints=load_endpoints(env={}), + token="t", + db_path=tmp_path / "state.sqlite", + audit_path=tmp_path / "audit.log", + retry_max=1, + clock=_now, + ) + + +def _make_record(*, status: str, proposal_id, opened: datetime | None = None) -> PositionRecord: + base_now = _now() + return PositionRecord( + proposal_id=proposal_id, + spread_type="bull_put", + expiry=base_now + timedelta(days=18), + short_strike=Decimal("2475"), + long_strike=Decimal("2350"), + short_instrument="ETH-15MAY26-2475-P", + long_instrument="ETH-15MAY26-2350-P", + n_contracts=2, + spread_width_usd=Decimal("125"), + spread_width_pct=Decimal("0.04"), + credit_eth=Decimal("0.030"), + credit_usd=Decimal("90"), + max_loss_usd=Decimal("160"), + spot_at_entry=Decimal("3000"), + dvol_at_entry=Decimal("50"), + delta_at_entry=Decimal("-0.12"), + eth_price_at_entry=Decimal("3000"), + proposed_at=base_now - timedelta(hours=1), + opened_at=opened, + status=status, + created_at=base_now - timedelta(hours=1), + updated_at=base_now - timedelta(hours=1), + ) + + +@pytest.mark.asyncio +async def test_recovery_promotes_awaiting_fill_when_broker_shows_position( + tmp_path: Path, httpx_mock: HTTPXMock +) -> None: + ctx = _build_ctx(tmp_path) + pid = uuid4() + record = _make_record(status="awaiting_fill", proposal_id=pid) + conn = connect_state(ctx.db_path) + try: + with transaction(conn): + ctx.repository.create_position(conn, record) + finally: + conn.close() + + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_positions", + json=[ + {"instrument": "ETH-15MAY26-2475-P", "size": 2}, + {"instrument": "ETH-15MAY26-2350-P", "size": 2}, + ], + ) + + await recover_state(ctx, now=_now()) + + conn = connect_state(ctx.db_path) + try: + positions = ctx.repository.list_positions(conn) + finally: + conn.close() + assert positions[0].status == "open" + assert positions[0].opened_at is not None + + +@pytest.mark.asyncio +async def test_recovery_cancels_awaiting_fill_when_broker_lacks_legs( + tmp_path: Path, httpx_mock: HTTPXMock +) -> None: + ctx = _build_ctx(tmp_path) + pid = uuid4() + record = _make_record(status="awaiting_fill", proposal_id=pid) + conn = connect_state(ctx.db_path) + try: + with transaction(conn): + ctx.repository.create_position(conn, record) + finally: + conn.close() + + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_positions", + json=[], + ) + httpx_mock.add_response( + url="http://mcp-telegram:9017/tools/notify_system_error", + json={"ok": True}, + is_reusable=True, + ) + + await recover_state(ctx, now=_now()) + + conn = connect_state(ctx.db_path) + try: + positions = ctx.repository.list_positions(conn) + finally: + conn.close() + assert positions[0].status == "cancelled" + assert positions[0].close_reason == "recovery_no_fill" + # discrepancies → kill switch armed + assert ctx.kill_switch.is_armed() is True + + +@pytest.mark.asyncio +async def test_recovery_alerts_on_open_position_missing_on_broker( + tmp_path: Path, httpx_mock: HTTPXMock +) -> None: + ctx = _build_ctx(tmp_path) + pid = uuid4() + record = _make_record( + status="open", proposal_id=pid, opened=_now() - timedelta(days=1) + ) + conn = connect_state(ctx.db_path) + try: + with transaction(conn): + ctx.repository.create_position(conn, record) + finally: + conn.close() + + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_positions", + json=[], + ) + httpx_mock.add_response( + url="http://mcp-telegram:9017/tools/notify_system_error", + json={"ok": True}, + is_reusable=True, + ) + + await recover_state(ctx, now=_now()) + assert ctx.kill_switch.is_armed() is True + + +@pytest.mark.asyncio +async def test_recovery_noop_when_no_in_flight_positions( + tmp_path: Path, httpx_mock: HTTPXMock +) -> None: + ctx = _build_ctx(tmp_path) + # No HTTP stubs needed because get_positions is not even called. + await recover_state(ctx, now=_now()) + assert ctx.kill_switch.is_armed() is False diff --git a/tests/unit/test_alert_manager.py b/tests/unit/test_alert_manager.py new file mode 100644 index 0000000..97510a8 --- /dev/null +++ b/tests/unit/test_alert_manager.py @@ -0,0 +1,139 @@ +"""Tests for AlertManager.""" + +from __future__ import annotations + +import json +from datetime import UTC, datetime +from pathlib import Path + +import pytest +from pytest_httpx import HTTPXMock + +from cerbero_bite.clients._base import HttpToolClient +from cerbero_bite.clients.telegram import TelegramClient +from cerbero_bite.runtime.alert_manager import AlertManager, Severity +from cerbero_bite.safety import AuditLog, iter_entries +from cerbero_bite.safety.kill_switch import KillSwitch +from cerbero_bite.state import Repository, connect, run_migrations, transaction + + +def _make_alert_manager(tmp_path: Path) -> tuple[AlertManager, Path, Path, KillSwitch]: + db_path = tmp_path / "state.sqlite" + audit_path = tmp_path / "audit.log" + conn = connect(db_path) + run_migrations(conn) + repo = Repository() + with transaction(conn): + repo.init_system_state( + conn, config_version="1.0.0", now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC) + ) + conn.close() + + audit = AuditLog(audit_path) + times = iter( + datetime(2026, 4, 27, 14, m, tzinfo=UTC) for m in range(0, 50) + ) + ks = KillSwitch( + connection_factory=lambda: connect(db_path), + repository=Repository(), + audit_log=audit, + clock=lambda: next(times), + ) + telegram = TelegramClient( + HttpToolClient( + service="telegram", + base_url="http://mcp-telegram:9017", + token="t", + retry_max=1, + ) + ) + return AlertManager(telegram=telegram, audit_log=audit, kill_switch=ks), audit_path, db_path, ks + + +@pytest.mark.asyncio +async def test_low_emits_audit_only(tmp_path: Path, httpx_mock: HTTPXMock) -> None: + am, audit_path, _, ks = _make_alert_manager(tmp_path) + await am.low(source="test", message="just info") + entries = list(iter_entries(audit_path)) + assert len(entries) == 1 + assert entries[0].event == "ALERT" + assert entries[0].payload["severity"] == "low" + assert ks.is_armed() is False + # No telegram call expected + assert httpx_mock.get_requests() == [] + + +@pytest.mark.asyncio +async def test_medium_calls_telegram_notify(tmp_path: Path, httpx_mock: HTTPXMock) -> None: + httpx_mock.add_response( + url="http://mcp-telegram:9017/tools/notify", json={"ok": True} + ) + am, audit_path, _, ks = _make_alert_manager(tmp_path) + await am.medium(source="entry_cycle", message="snapshot delayed") + requests = httpx_mock.get_requests() + assert len(requests) == 1 + body = json.loads(requests[0].read()) + assert body["message"] == "[entry_cycle] snapshot delayed" + assert body["priority"] == "high" + assert body["tag"] == "entry_cycle" + assert ks.is_armed() is False + assert any(e.payload["severity"] == "medium" for e in iter_entries(audit_path)) + + +@pytest.mark.asyncio +async def test_high_arms_kill_switch_and_calls_notify_alert( + tmp_path: Path, httpx_mock: HTTPXMock +) -> None: + httpx_mock.add_response( + url="http://mcp-telegram:9017/tools/notify_alert", json={"ok": True} + ) + am, _, _, ks = _make_alert_manager(tmp_path) + await am.high(source="health", message="3 consecutive MCP failures") + body = json.loads(httpx_mock.get_request().read()) + assert body == { + "source": "health", + "message": "3 consecutive MCP failures", + "priority": "high", + } + assert ks.is_armed() is True + + +@pytest.mark.asyncio +async def test_critical_arms_kill_switch_and_calls_notify_system_error( + tmp_path: Path, httpx_mock: HTTPXMock +) -> None: + httpx_mock.add_response( + url="http://mcp-telegram:9017/tools/notify_system_error", json={"ok": True} + ) + am, _, _, ks = _make_alert_manager(tmp_path) + await am.critical( + source="audit_chain", + message="hash chain mismatch on line 142", + component="safety.audit_log", + ) + body = json.loads(httpx_mock.get_request().read()) + assert body["component"] == "safety.audit_log" + assert body["priority"] == "critical" + assert ks.is_armed() is True + + +@pytest.mark.asyncio +async def test_critical_when_already_armed_is_idempotent( + tmp_path: Path, httpx_mock: HTTPXMock +) -> None: + httpx_mock.add_response( + url="http://mcp-telegram:9017/tools/notify_system_error", json={"ok": True} + ) + am, _, _, ks = _make_alert_manager(tmp_path) + ks.arm(reason="prior", source="manual") + assert ks.is_armed() is True + await am.critical(source="x", message="anomaly") + assert ks.is_armed() is True + + +@pytest.mark.asyncio +async def test_emit_with_severity_enum(tmp_path: Path, httpx_mock: HTTPXMock) -> None: + am, audit_path, _, _ks = _make_alert_manager(tmp_path) + await am.emit(Severity.LOW, source="t", message="m") + entries = list(iter_entries(audit_path)) + assert entries[0].payload["severity"] == "low" diff --git a/tests/unit/test_runtime_dependencies.py b/tests/unit/test_runtime_dependencies.py new file mode 100644 index 0000000..e070fd8 --- /dev/null +++ b/tests/unit/test_runtime_dependencies.py @@ -0,0 +1,55 @@ +"""Tests for the runtime dependency container.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from pathlib import Path + +from cerbero_bite.config import golden_config +from cerbero_bite.config.mcp_endpoints import load_endpoints +from cerbero_bite.runtime import build_runtime +from cerbero_bite.state import connect + + +def test_build_runtime_creates_state_and_audit_files(tmp_path: Path) -> None: + db_path = tmp_path / "state.sqlite" + audit_path = tmp_path / "audit.log" + + ctx = build_runtime( + cfg=golden_config(), + endpoints=load_endpoints(env={}), + token="t", + db_path=db_path, + audit_path=audit_path, + clock=lambda: datetime(2026, 4, 27, 14, 0, tzinfo=UTC), + ) + + assert db_path.exists() + assert ctx.audit_log.path == audit_path + # system_state singleton initialised + conn = connect(db_path) + try: + state = ctx.repository.get_system_state(conn) + finally: + conn.close() + assert state is not None + assert state.config_version == ctx.cfg.config_version + + +def test_build_runtime_clients_pinned_to_endpoints(tmp_path: Path) -> None: + ctx = build_runtime( + cfg=golden_config(), + endpoints=load_endpoints( + env={"CERBERO_BITE_MCP_DERIBIT_URL": "http://localhost:9911"} + ), + token="t", + db_path=tmp_path / "state.sqlite", + audit_path=tmp_path / "audit.log", + ) + # type checks: every client is the right concrete type + assert ctx.deribit.SERVICE == "deribit" + assert ctx.macro.SERVICE == "macro" + assert ctx.sentiment.SERVICE == "sentiment" + assert ctx.hyperliquid.SERVICE == "hyperliquid" + assert ctx.portfolio.SERVICE == "portfolio" + assert ctx.telegram.SERVICE == "telegram" diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py new file mode 100644 index 0000000..22e42c1 --- /dev/null +++ b/tests/unit/test_scheduler.py @@ -0,0 +1,27 @@ +"""Tests for the APScheduler bootstrap helper.""" + +from __future__ import annotations + +import pytest + +from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler + + +async def _noop() -> None: + return None + + +def test_build_scheduler_registers_all_jobs() -> None: + specs = [ + JobSpec(name="entry", cron="0 14 * * MON", coro_factory=_noop), + JobSpec(name="monitor", cron="0 2,14 * * *", coro_factory=_noop), + JobSpec(name="health", cron="*/5 * * * *", coro_factory=_noop), + ] + sched = build_scheduler(specs) + job_ids = {j.id for j in sched.get_jobs()} + assert job_ids == {"entry", "monitor", "health"} + + +def test_build_scheduler_rejects_malformed_cron() -> None: + with pytest.raises(ValueError, match="cron must have 5 fields"): + build_scheduler([JobSpec(name="x", cron="0 14 * *", coro_factory=_noop)])