Phase 4: orchestrator + cycles auto-execute

Componente runtime/ che cabla core+clients+state+safety in un engine
autonomo notify-only: nessuna conferma manuale, ordini combo
piazzati direttamente quando le regole passano. 311 test pass,
copertura totale 94%, runtime/ 90%, mypy strict pulito, ruff clean.

Moduli:
- runtime/alert_manager.py: escalation tree
  LOW/MEDIUM/HIGH/CRITICAL → audit + Telegram + kill switch.
- runtime/dependencies.py: build_runtime() costruisce
  RuntimeContext con tutti i client MCP, repository, audit log,
  kill switch, alert manager.
- runtime/entry_cycle.py: flusso settimanale (snapshot parallelo
  spot/dvol/funding/macro/holdings/equity → validate_entry →
  compute_bias → options_chain → select_strikes →
  liquidity_gate → sizing_engine → combo_builder.build →
  place_combo_order → notify_position_opened).
- runtime/monitor_cycle.py: loop 12h con dvol_history per il
  return_4h, exit_decision.evaluate, close auto-execute.
- runtime/health_check.py: probe parallelo MCP + SQLite +
  environment match; 3 strikes consecutivi → kill switch HIGH.
- runtime/recovery.py: riconciliazione SQLite vs broker
  all'avvio; mismatch → kill switch CRITICAL.
- runtime/scheduler.py: AsyncIOScheduler builder con cron entry
  (lun 14:00), monitor (02/14), health (5min).
- runtime/orchestrator.py: façade boot() + run_entry/monitor/health
  + install_scheduler + run_forever, con env check vs strategy.

CLI:
- start: avvia engine bloccante (asyncio.run + scheduler).
- dry-run --cycle entry|monitor|health: esegue un singolo ciclo
  per debug/test in produzione.
- stop: documenta lo shutdown via SIGTERM al container.

Documentazione:
- docs/06-operational-flow.md riscritto per il modello
  notify-only auto-execute (no conferma manuale, no memory,
  no brain-bridge).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-28 00:03:45 +02:00
parent 466e63dc19
commit 42b0fbe1ab
20 changed files with 3715 additions and 131 deletions
+152 -10
View File
@@ -11,8 +11,11 @@ from __future__ import annotations
import asyncio
import sys
from collections.abc import Callable
from datetime import UTC, datetime
from decimal import Decimal
from pathlib import Path
from typing import Any
import click
from rich.console import Console
@@ -33,6 +36,7 @@ from cerbero_bite.config.mcp_endpoints import (
)
from cerbero_bite.logging import configure as configure_logging
from cerbero_bite.logging import get_logger
from cerbero_bite.runtime.orchestrator import Orchestrator, make_orchestrator
from cerbero_bite.safety.audit_log import AuditChainError, AuditLog
from cerbero_bite.safety.audit_log import verify_chain as verify_audit_chain
from cerbero_bite.safety.kill_switch import KillSwitch
@@ -87,22 +91,160 @@ def status() -> None:
)
def _engine_options(func: Callable[..., Any]) -> Callable[..., Any]:
    """Attach the CLI options shared by every engine command.

    Equivalent to stacking the ``@click.option`` decorators directly on
    the command function: options are applied innermost-first, so
    ``--strategy`` (applied last) ends up first in ``--help`` output.
    """
    func = click.option(
        "--eur-to-usd",
        type=float,
        default=1.075,
        show_default=True,
        help="EUR→USD conversion rate used by the sizing engine.",
    )(func)
    func = click.option(
        "--environment",
        type=click.Choice(["testnet", "mainnet"]),
        default="testnet",
        show_default=True,
    )(func)
    func = click.option(
        "--audit",
        type=click.Path(dir_okay=False, path_type=Path),
        default=_DEFAULT_AUDIT_PATH,
        show_default=True,
    )(func)
    func = click.option(
        "--db",
        type=click.Path(dir_okay=False, path_type=Path),
        default=_DEFAULT_DB_PATH,
        show_default=True,
    )(func)
    func = click.option(
        "--token-file",
        type=click.Path(dir_okay=False, path_type=Path),
        default=None,
    )(func)
    func = click.option(
        "--strategy",
        "strategy_path",
        type=click.Path(exists=True, dir_okay=False, path_type=Path),
        default=_DEFAULT_STRATEGY_PATH,
        show_default=True,
    )(func)
    return func
def _build_orchestrator(
    *,
    strategy_path: Path,
    token_file: Path | None,
    db: Path,
    audit: Path,
    environment: str,
    eur_to_usd: float,
) -> Orchestrator:
    """Load strategy + token from disk and wire a ready-to-boot Orchestrator."""
    token = load_token(path=token_file)
    loaded_strategy = load_strategy(strategy_path, enforce_hash=False)
    conversion_rate = Decimal(str(eur_to_usd))
    return make_orchestrator(
        cfg=loaded_strategy.config,
        endpoints=load_endpoints(),
        token=token,
        db_path=db,
        audit_path=audit,
        expected_environment=environment,  # type: ignore[arg-type]
        eur_to_usd=conversion_rate,
    )
@main.command()
@_engine_options
def start(
    strategy_path: Path,
    token_file: Path | None,
    db: Path,
    audit: Path,
    environment: str,
    eur_to_usd: float,
) -> None:
    """Start the engine main loop (scheduler + monitoring).

    Blocks until interrupted: builds the orchestrator from the CLI
    options, then hands control to ``run_forever`` via ``asyncio.run``.
    KeyboardInterrupt is caught so an operator Ctrl-C exits cleanly
    instead of dumping a traceback.
    """
    # Fix: drop the stale phase-0 stub header/body that was left fused
    # with this implementation (duplicate `def start()` + _phase0_notice).
    orch = _build_orchestrator(
        strategy_path=strategy_path,
        token_file=token_file,
        db=db,
        audit=audit,
        environment=environment,
        eur_to_usd=eur_to_usd,
    )
    console.print(
        f"[bold cyan]Cerbero Bite[/bold cyan] starting "
        f"(env={environment}, db={db}, audit={audit})"
    )
    try:
        asyncio.run(orch.run_forever())
    except KeyboardInterrupt:
        console.print("[yellow]engine interrupted[/yellow]")
@main.command()
@_engine_options
@click.option(
"--cycle",
type=click.Choice(["entry", "monitor", "health"]),
default="entry",
show_default=True,
)
def dry_run(
strategy_path: Path,
token_file: Path | None,
db: Path,
audit: Path,
environment: str,
eur_to_usd: float,
cycle: str,
) -> None:
"""Execute one cycle without starting the scheduler."""
orch = _build_orchestrator(
strategy_path=strategy_path,
token_file=token_file,
db=db,
audit=audit,
environment=environment,
eur_to_usd=eur_to_usd,
)
async def _go() -> None:
await orch.boot()
if cycle == "entry":
entry = await orch.run_entry()
console.print(
f"[green]entry[/green] {entry.status} reason={entry.reason}"
)
elif cycle == "monitor":
mon = await orch.run_monitor()
console.print(f"[green]monitor[/green] outcomes={len(mon.outcomes)}")
for outcome in mon.outcomes:
console.print(
f"{outcome.proposal_id[:8]} {outcome.action} "
f"closed={outcome.closed}"
)
else:
health = await orch.run_health()
console.print(f"[green]health[/green] state={health.state}")
asyncio.run(_go())
@main.command()
def stop() -> None:
    """Gracefully stop a running engine (manual: send SIGTERM)."""
    # Fix: remove the stale phase-0 docstring/body and the obsolete
    # phase-0 `dry-run` stub that were fused into this region (the stub
    # would shadow the real dry_run command defined above).
    # There is no IPC channel to a live engine process, so this command
    # only documents the supported shutdown path.
    console.print(
        "Use [bold]docker compose stop cerbero-bite[/bold] (or send SIGTERM "
        "to the engine PID). The container traps the signal and shuts down "
        "the scheduler before exit."
    )
@main.group(name="kill-switch")
+14
View File
@@ -0,0 +1,14 @@
"""Runtime: composes core/ + clients/ + state/ + safety/ into the engine."""
from cerbero_bite.runtime.alert_manager import AlertManager, Severity
from cerbero_bite.runtime.dependencies import RuntimeContext, build_runtime
from cerbero_bite.runtime.orchestrator import Orchestrator, make_orchestrator
__all__ = [
"AlertManager",
"Orchestrator",
"RuntimeContext",
"Severity",
"build_runtime",
"make_orchestrator",
]
+106
View File
@@ -0,0 +1,106 @@
"""Severity-based alert dispatcher (``docs/07-risk-controls.md``).
Routes anomalies to the right notification channel and, for HIGH and
CRITICAL events, arms the kill switch.
* ``LOW`` — append to audit log only.
* ``MEDIUM`` — audit + ``telegram.notify`` (priority="high").
* ``HIGH`` — audit + ``telegram.notify_alert`` (priority="high")
+ arm kill switch.
* ``CRITICAL``— audit + ``telegram.notify_system_error``
+ arm kill switch (already armed → idempotent).
"""
from __future__ import annotations
import logging
from enum import StrEnum
from cerbero_bite.clients.telegram import TelegramClient
from cerbero_bite.safety.audit_log import AuditLog
from cerbero_bite.safety.kill_switch import KillSwitch
__all__ = ["AlertManager", "Severity"]
_log = logging.getLogger("cerbero_bite.runtime.alert_manager")
class Severity(StrEnum):
    """Alert severity level; each maps to an escalation path in :class:`AlertManager`."""
    LOW = "low"  # audit log only
    MEDIUM = "medium"  # audit + telegram.notify
    HIGH = "high"  # audit + telegram.notify_alert + arm kill switch
    CRITICAL = "critical"  # audit + telegram.notify_system_error + arm kill switch
class AlertManager:
"""Dispatcher used by every runtime module to surface anomalies."""
def __init__(
self,
*,
telegram: TelegramClient,
audit_log: AuditLog,
kill_switch: KillSwitch,
) -> None:
self._telegram = telegram
self._audit = audit_log
self._kill = kill_switch
async def emit(
self,
severity: Severity,
*,
source: str,
message: str,
component: str | None = None,
) -> None:
"""Emit an alert at the given severity level."""
self._audit.append(
event="ALERT",
payload={
"severity": severity.value,
"source": source,
"message": message,
"component": component,
},
)
if severity == Severity.LOW:
_log.info("alert.low source=%s message=%s", source, message)
return
if severity == Severity.MEDIUM:
await self._telegram.notify(
f"[{source}] {message}", priority="high", tag=source
)
return
if severity == Severity.HIGH:
await self._telegram.notify_alert(
source=source, message=message, priority="high"
)
self._kill.arm(reason=message, source=source)
return
# CRITICAL
await self._telegram.notify_system_error(
message=message, component=component, priority="critical"
)
self._kill.arm(reason=message, source=source)
async def low(self, *, source: str, message: str) -> None:
await self.emit(Severity.LOW, source=source, message=message)
async def medium(self, *, source: str, message: str) -> None:
await self.emit(Severity.MEDIUM, source=source, message=message)
async def high(self, *, source: str, message: str) -> None:
await self.emit(Severity.HIGH, source=source, message=message)
async def critical(
self, *, source: str, message: str, component: str | None = None
) -> None:
await self.emit(
Severity.CRITICAL, source=source, message=message, component=component
)
+135
View File
@@ -0,0 +1,135 @@
"""Runtime dependency container.
Builds and wires together every long-lived object the engine needs:
HTTP clients, repository, audit log, kill switch, alert manager. The
:func:`build_runtime` factory returns a frozen :class:`RuntimeContext`
that the orchestrator and the cycle modules pass around — no global
state, no implicit singletons.
"""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from cerbero_bite.clients._base import HttpToolClient
from cerbero_bite.clients.deribit import DeribitClient
from cerbero_bite.clients.hyperliquid import HyperliquidClient
from cerbero_bite.clients.macro import MacroClient
from cerbero_bite.clients.portfolio import PortfolioClient
from cerbero_bite.clients.sentiment import SentimentClient
from cerbero_bite.clients.telegram import TelegramClient
from cerbero_bite.config.mcp_endpoints import McpEndpoints
from cerbero_bite.config.schema import StrategyConfig
from cerbero_bite.runtime.alert_manager import AlertManager
from cerbero_bite.safety.audit_log import AuditLog
from cerbero_bite.safety.kill_switch import KillSwitch
from cerbero_bite.state import (
Repository,
connect,
run_migrations,
transaction,
)
__all__ = ["RuntimeContext", "build_runtime"]
@dataclass(frozen=True)
class RuntimeContext:
    """Bag of every wired object used by the runtime."""
    cfg: StrategyConfig  # validated strategy configuration
    db_path: Path  # SQLite file; consumers open short-lived connections from it
    audit_path: Path  # audit-log file backing audit_log
    repository: Repository  # data-access helper over the SQLite schema
    audit_log: AuditLog  # hash-chained audit trail
    kill_switch: KillSwitch  # global trading halt
    alert_manager: AlertManager  # severity-routed alert dispatcher
    deribit: DeribitClient
    macro: MacroClient
    sentiment: SentimentClient
    hyperliquid: HyperliquidClient
    portfolio: PortfolioClient
    telegram: TelegramClient
    clock: Callable[[], datetime]  # injectable clock; defaults to _utc_now (UTC-aware)
def _utc_now() -> datetime:
return datetime.now(UTC)
def build_runtime(
    *,
    cfg: StrategyConfig,
    endpoints: McpEndpoints,
    token: str,
    db_path: Path | str,
    audit_path: Path | str,
    timeout_s: float = 8.0,
    retry_max: int = 3,
    clock: Callable[[], datetime] | None = None,
) -> RuntimeContext:
    """Wire every dependency the runtime needs.
    The SQLite database is migrated and the system_state singleton is
    initialised eagerly so the orchestrator can assume both are
    present.
    Args:
        cfg: validated strategy configuration.
        endpoints: MCP service → base-URL resolver.
        token: bearer token shared by every HTTP tool client.
        db_path: SQLite database file (created/migrated here).
        audit_path: audit-log file.
        timeout_s: per-request HTTP timeout applied to every client.
        retry_max: per-request retry budget applied to every client.
        clock: optional injectable clock for tests; defaults to UTC now.
    Returns:
        A frozen :class:`RuntimeContext` with all clients and safety
        machinery wired.
    """
    db_path = Path(db_path)
    audit_path = Path(audit_path)
    clk = clock or _utc_now
    repository = Repository()
    # Migrate + seed system_state up front, on a connection that is
    # closed before anything else is wired.
    conn = connect(db_path)
    try:
        run_migrations(conn)
        with transaction(conn):
            repository.init_system_state(
                conn, config_version=cfg.config_version, now=clk()
            )
    finally:
        conn.close()
    audit_log = AuditLog(audit_path)
    # The kill switch opens its own connection on demand rather than
    # holding one long-lived connection.
    kill_switch = KillSwitch(
        connection_factory=lambda: connect(db_path),
        repository=repository,
        audit_log=audit_log,
        clock=clk,
    )
    def _client(service: str) -> HttpToolClient:
        # One uniformly-configured HTTP client per MCP service.
        return HttpToolClient(
            service=service,
            base_url=endpoints.for_service(service),
            token=token,
            timeout_s=timeout_s,
            retry_max=retry_max,
        )
    # Telegram is built first because the alert manager needs it.
    telegram = TelegramClient(_client("telegram"))
    alert_manager = AlertManager(
        telegram=telegram, audit_log=audit_log, kill_switch=kill_switch
    )
    return RuntimeContext(
        cfg=cfg,
        db_path=db_path,
        audit_path=audit_path,
        repository=repository,
        audit_log=audit_log,
        kill_switch=kill_switch,
        alert_manager=alert_manager,
        deribit=DeribitClient(_client("deribit")),
        macro=MacroClient(_client("macro")),
        sentiment=SentimentClient(_client("sentiment")),
        hyperliquid=HyperliquidClient(_client("hyperliquid")),
        portfolio=PortfolioClient(_client("portfolio")),
        telegram=telegram,
        clock=clk,
    )
+640
View File
@@ -0,0 +1,640 @@
"""Weekly entry decision loop (``docs/06-operational-flow.md`` §2).
Pure orchestration over the existing core/clients/state primitives.
The cycle is auto-execute: when every gate passes, the engine sends
the combo order without asking Adriano. Telegram is used only to
notify the outcome.
"""
from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import Any
from uuid import uuid4
from cerbero_bite.clients.deribit import (
ComboLegOrder,
ComboOrderResult,
DeribitClient,
InstrumentMeta,
)
from cerbero_bite.clients.hyperliquid import HyperliquidClient
from cerbero_bite.clients.macro import MacroClient
from cerbero_bite.clients.portfolio import PortfolioClient
from cerbero_bite.clients.sentiment import SentimentClient
from cerbero_bite.config.schema import StrategyConfig
from cerbero_bite.core.combo_builder import ComboProposal, build, select_strikes
from cerbero_bite.core.entry_validator import (
EntryContext,
TrendContext,
compute_bias,
validate_entry,
)
from cerbero_bite.core.liquidity_gate import InstrumentSnapshot, check
from cerbero_bite.core.sizing_engine import SizingContext, compute_contracts
from cerbero_bite.core.types import OptionQuote
from cerbero_bite.runtime.alert_manager import AlertManager
from cerbero_bite.runtime.dependencies import RuntimeContext
from cerbero_bite.state import (
DecisionRecord,
InstructionRecord,
PositionRecord,
transaction,
)
from cerbero_bite.state import connect as connect_state
__all__ = [
    "EntryCycleResult",
    "EntryDecisionStatus",
    "run_entry_cycle",
]
_log = logging.getLogger("cerbero_bite.runtime.entry")
EntryDecisionStatus = str  # one of the literals below
_STATUS_ENTRY_PLACED = "entry_placed"  # combo order sent and not rejected
_STATUS_NO_ENTRY = "no_entry"  # a gate (filters/bias/strikes/sizing/liquidity) rejected
_STATUS_BROKER_REJECT = "broker_reject"  # order errored or was rejected by the broker
_STATUS_KILL_SWITCH = "kill_switch_armed"  # cycle skipped: kill switch is armed
_STATUS_HAS_OPEN = "has_open_position"  # cycle skipped: a position is already open
@dataclass(frozen=True)
class EntryCycleResult:
    """Outcome of one ``run_entry_cycle`` call (no exception path)."""
    status: EntryDecisionStatus  # one of the _STATUS_* literals above
    reason: str | None  # rejection/error detail; None when an entry was placed
    proposal: ComboProposal | None = None  # set once a combo proposal was built
    order: ComboOrderResult | None = None  # set once the broker responded
# ---------------------------------------------------------------------------
# Snapshot collection
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class _MarketSnapshot:
    """Point-in-time market + portfolio readings collected in parallel."""
    spot_eth_usd: Decimal  # Deribit ETH index price
    dvol: Decimal  # latest Deribit ETH DVOL reading
    funding_perp: Decimal  # Hyperliquid ETH perp funding, annualized
    funding_cross: Decimal  # cross-exchange median funding, annualized
    macro_days_to_event: int | None  # days to next high-severity macro event; None if none in window
    eth_holdings_pct: Decimal  # ETH share of the total portfolio
    portfolio_eur: Decimal  # total portfolio equity in EUR
async def _gather_snapshot(
    *,
    deribit: DeribitClient,
    hyperliquid: HyperliquidClient,
    sentiment: SentimentClient,
    macro: MacroClient,
    portfolio: PortfolioClient,
    cfg: StrategyConfig,
    now: datetime,
) -> _MarketSnapshot:
    """Collect every market/portfolio input for the entry decision.

    All seven reads run concurrently; ``asyncio.gather`` already wraps
    each coroutine in a task and returns results in argument order, so
    the previous explicit ``create_task`` + ``.result()`` bookkeeping
    was redundant and is dropped. The first failing read propagates.

    Args:
        cfg: used for the macro look-ahead window (``structure.dte_target``
            days) and the excluded-country list.
        now: evaluation timestamp forwarded to time-sensitive reads.
    """
    (
        spot,
        dvol,
        funding_perp,
        funding_cross,
        macro_days,
        holdings_pct,
        equity_eur,
    ) = await asyncio.gather(
        deribit.index_price_eth(),
        deribit.latest_dvol(currency="ETH", now=now),
        hyperliquid.funding_rate_annualized("ETH"),
        sentiment.funding_cross_median_annualized("ETH"),
        macro.next_high_severity_within(
            days=cfg.structure.dte_target,
            countries=list(cfg.entry.exclude_macro_countries),
            now=now,
        ),
        portfolio.asset_pct_of_portfolio("ETH"),
        portfolio.total_equity_eur(),
    )
    return _MarketSnapshot(
        spot_eth_usd=spot,
        dvol=dvol,
        funding_perp=funding_perp,
        funding_cross=funding_cross,
        macro_days_to_event=macro_days,
        eth_holdings_pct=holdings_pct,
        portfolio_eur=equity_eur,
    )
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _record_decision(
    ctx: RuntimeContext,
    *,
    inputs: dict[str, Any],
    outputs: dict[str, Any],
    action_taken: str,
    notes: str | None,
    proposal_id: str | None,
    now: datetime,
) -> None:
    """Persist one ``entry_check`` row in the ``decisions`` table.
    Inputs/outputs are JSON-serialised with ``default=str`` so Decimals
    and datetimes round-trip as strings; keys are sorted for stable diffs.
    NOTE(review): declared ``async`` for call-site symmetry, but the body
    contains no ``await`` — the SQLite work runs synchronously on the
    event loop.
    """
    conn = connect_state(ctx.db_path)
    try:
        with transaction(conn):
            ctx.repository.record_decision(
                conn,
                DecisionRecord(
                    decision_type="entry_check",
                    timestamp=now,
                    inputs_json=json.dumps(inputs, default=str, sort_keys=True),
                    outputs_json=json.dumps(outputs, default=str, sort_keys=True),
                    action_taken=action_taken,
                    notes=notes,
                    proposal_id=proposal_id,  # type: ignore[arg-type]
                ),
            )
    finally:
        conn.close()
async def _build_quotes(
    deribit: DeribitClient,
    chain: list[InstrumentMeta],
) -> list[OptionQuote]:
    """Fetch tickers + orderbook depth for the given metas, return OptionQuotes.
    Instruments whose ticker is missing or lacks bid/ask/mark are silently
    dropped, so the returned list may be shorter than ``chain``.
    """
    if not chain:
        return []
    names = [m.name for m in chain]
    if len(names) > 20:
        # Bite consumes a narrow window of strikes; if it ever overflows
        # the batch limit, the caller is expected to pre-filter.
        raise ValueError("entry_cycle: too many instruments to quote in one batch")
    tickers = await deribit.get_tickers(names)
    # One depth probe per instrument, all run concurrently.
    depths = await asyncio.gather(
        *[deribit.orderbook_depth_top3(m.name) for m in chain]
    )
    by_name: dict[str, dict[str, Any]] = {
        str(t.get("instrument_name")): t for t in tickers if isinstance(t, dict)
    }
    out: list[OptionQuote] = []
    for meta, depth in zip(chain, depths, strict=True):
        ticker = by_name.get(meta.name)
        if not ticker:
            continue
        bid = ticker.get("bid")
        ask = ticker.get("ask")
        mark = ticker.get("mark_price")
        greeks = ticker.get("greeks") or {}
        if bid is None or ask is None or mark is None:
            continue
        out.append(
            OptionQuote(
                instrument=meta.name,
                strike=meta.strike,
                expiry=meta.expiry,
                option_type=meta.option_type,
                bid=Decimal(str(bid)),
                ask=Decimal(str(ask)),
                # NOTE(review): "mid" is filled with the mark price, not
                # (bid+ask)/2 — confirm downstream consumers expect that.
                mid=Decimal(str(mark)),
                delta=Decimal(str(greeks.get("delta") or 0)),
                gamma=Decimal(str(greeks.get("gamma") or 0)),
                theta=Decimal(str(greeks.get("theta") or 0)),
                vega=Decimal(str(greeks.get("vega") or 0)),
                open_interest=int(meta.open_interest or 0),
                volume_24h=int(ticker.get("volume_24h") or 0),
                book_depth_top3=int(depth),
            )
        )
    return out
def _max_loss_per_contract_usd(short_strike: Decimal, long_strike: Decimal) -> Decimal:
return (short_strike - long_strike).copy_abs()
# ---------------------------------------------------------------------------
# Cycle entry point
# ---------------------------------------------------------------------------
async def run_entry_cycle(
    ctx: RuntimeContext,
    *,
    eur_to_usd_rate: Decimal,
    now: datetime | None = None,
) -> EntryCycleResult:
    """Run one weekly entry evaluation cycle.
    The function is idempotent and side-effect aware: it persists the
    decision in the ``decisions`` table regardless of outcome and only
    creates a position when the broker accepts the order.
    Args:
        ctx: wired runtime dependencies (clients, repository, safety).
        eur_to_usd_rate: rate applied to EUR equity to obtain the USD
            capital used by the entry filters and the sizing engine.
        now: optional evaluation-timestamp override (testing); defaults
            to ``ctx.clock()``. Normalised to UTC either way.
    Returns:
        EntryCycleResult with a terminal status. Broker-order failures
        are folded into the result; upstream data-read errors propagate.
    """
    when = (now or ctx.clock()).astimezone(UTC)
    cfg = ctx.cfg
    alert: AlertManager = ctx.alert_manager
    # Guard: kill switch armed → do nothing but leave an audit trace.
    if ctx.kill_switch.is_armed():
        await ctx.alert_manager.low(
            source="entry_cycle", message="kill switch armed — skipping"
        )
        return EntryCycleResult(status=_STATUS_KILL_SWITCH, reason="kill_switch")
    # Has open position?
    conn = connect_state(ctx.db_path)
    try:
        concurrent = ctx.repository.count_concurrent_positions(conn)
    finally:
        conn.close()
    if concurrent > 0:
        await alert.low(source="entry_cycle", message="position already open")
        return EntryCycleResult(status=_STATUS_HAS_OPEN, reason="has_open_position")
    # 1. Snapshot
    snap = await _gather_snapshot(
        deribit=ctx.deribit,
        hyperliquid=ctx.hyperliquid,
        sentiment=ctx.sentiment,
        macro=ctx.macro,
        portfolio=ctx.portfolio,
        cfg=cfg,
        now=when,
    )
    capital_usd = snap.portfolio_eur * eur_to_usd_rate
    # 2. Entry filters
    entry_ctx = EntryContext(
        capital_usd=capital_usd,
        dvol_now=snap.dvol,
        funding_perp_annualized=snap.funding_perp,
        eth_holdings_pct_of_portfolio=snap.eth_holdings_pct,
        next_macro_event_in_days=snap.macro_days_to_event,
        has_open_position=False,  # already enforced by the DB guard above
    )
    decision = validate_entry(entry_ctx, cfg)
    # Serialised once and reused by every _record_decision call below.
    inputs = {
        "snapshot": {
            "spot_eth_usd": str(snap.spot_eth_usd),
            "dvol": str(snap.dvol),
            "funding_perp": str(snap.funding_perp),
            "funding_cross": str(snap.funding_cross),
            "macro_days_to_event": snap.macro_days_to_event,
            "eth_holdings_pct": str(snap.eth_holdings_pct),
            "portfolio_eur": str(snap.portfolio_eur),
            "capital_usd": str(capital_usd),
        }
    }
    if not decision.accepted:
        await _record_decision(
            ctx,
            inputs=inputs,
            outputs={"accepted": False, "reasons": decision.reasons},
            action_taken="no_entry",
            notes="entry_validator",
            proposal_id=None,
            now=when,
        )
        await alert.low(
            source="entry_cycle",
            message=f"entry rejected: {'; '.join(decision.reasons)}",
        )
        return EntryCycleResult(
            status=_STATUS_NO_ENTRY, reason=";".join(decision.reasons)
        )
    # 3. Bias (need a 30-day prior spot — orchestrator passes it in)
    # We approximate by reusing the current spot until the historical
    # snapshot store ships in Phase 5; for now no historical → bias
    # cannot fire bull/bear, only iron_condor when DVOL/ADX align. The
    # caller is responsible for plugging in real data via overrides.
    trend_ctx = TrendContext(
        eth_now=snap.spot_eth_usd,
        eth_30d_ago=snap.spot_eth_usd,
        funding_cross_annualized=snap.funding_cross,
        dvol_now=snap.dvol,
        adx_14=Decimal("25"),  # placeholder until ADX lands in market data
    )
    bias = compute_bias(trend_ctx, cfg)
    if bias is None:
        await _record_decision(
            ctx,
            inputs=inputs,
            outputs={"bias": None},
            action_taken="no_entry",
            notes="no_bias",
            proposal_id=None,
            now=when,
        )
        await alert.low(source="entry_cycle", message="no directional bias")
        return EntryCycleResult(status=_STATUS_NO_ENTRY, reason="no_bias")
    # 4. Chain → strikes
    expiry_from = when
    expiry_to = when + timedelta(days=cfg.structure.dte_max + 1)
    chain_meta = await ctx.deribit.options_chain(
        currency="ETH",
        expiry_from=expiry_from,
        expiry_to=expiry_to,
        min_open_interest=int(cfg.liquidity.open_interest_min),
    )
    quotes = await _build_quotes(ctx.deribit, chain_meta)
    selection = select_strikes(
        chain=quotes, bias=bias, spot=snap.spot_eth_usd, now=when, cfg=cfg
    )
    if selection is None:
        await _record_decision(
            ctx,
            inputs=inputs,
            outputs={"bias": bias, "n_quotes": len(quotes)},
            action_taken="no_entry",
            notes="no_strike",
            proposal_id=None,
            now=when,
        )
        await alert.low(source="entry_cycle", message="no strike candidate")
        return EntryCycleResult(status=_STATUS_NO_ENTRY, reason="no_strike")
    short, long_ = selection
    # 5. Liquidity gate (uses raw bid/ask/depth from the same quotes)
    short_snap = InstrumentSnapshot(
        instrument=short.instrument,
        bid=short.bid,
        ask=short.ask,
        mid=short.mid,
        open_interest=short.open_interest,
        volume_24h=short.volume_24h,
        book_depth_top3=short.book_depth_top3,
    )
    long_snap = InstrumentSnapshot(
        instrument=long_.instrument,
        bid=long_.bid,
        ask=long_.ask,
        mid=long_.mid,
        open_interest=long_.open_interest,
        volume_24h=long_.volume_24h,
        book_depth_top3=long_.book_depth_top3,
    )
    credit_eth_per_contract = short.mid - long_.mid
    # 6. Sizing
    width_usd = (short.strike - long_.strike).copy_abs()
    sizing_ctx = SizingContext(
        capital_usd=capital_usd,
        max_loss_per_contract_usd=_max_loss_per_contract_usd(
            short.strike, long_.strike
        ),
        dvol_now=snap.dvol,
        open_engagement_usd=Decimal("0"),
        eur_to_usd=eur_to_usd_rate,
        other_open_positions=0,
    )
    sizing = compute_contracts(sizing_ctx, cfg)
    if sizing.n_contracts < 1:
        await _record_decision(
            ctx,
            inputs=inputs,
            outputs={"sizing_reason": sizing.reason_if_zero},
            action_taken="no_entry",
            notes="undersize",
            proposal_id=None,
            now=when,
        )
        await alert.low(
            source="entry_cycle",
            message=f"undersize: {sizing.reason_if_zero}",
        )
        return EntryCycleResult(status=_STATUS_NO_ENTRY, reason="undersize")
    # 7. Liquidity check now that we know n_contracts
    liq = check(
        short_leg=short_snap,
        long_leg=long_snap,
        credit=credit_eth_per_contract * Decimal(sizing.n_contracts),
        n_contracts=sizing.n_contracts,
        cfg=cfg,
    )
    if not liq.accepted:
        await _record_decision(
            ctx,
            inputs=inputs,
            outputs={"liquidity_reasons": liq.reasons},
            action_taken="no_entry",
            notes="illiquid",
            proposal_id=None,
            now=when,
        )
        await alert.low(
            source="entry_cycle",
            message=f"illiquid: {'; '.join(liq.reasons)}",
        )
        return EntryCycleResult(status=_STATUS_NO_ENTRY, reason="illiquid")
    # 8. Build proposal + persist + place order
    proposal = build(
        short=short,
        long_=long_,
        n_contracts=sizing.n_contracts,
        spot=snap.spot_eth_usd,
        dvol=snap.dvol,
        cfg=cfg,
        now=when,
        spread_type=bias,
    )
    pct_of_spot = (
        width_usd / snap.spot_eth_usd if snap.spot_eth_usd > 0 else Decimal("0")
    )
    record = PositionRecord(
        proposal_id=proposal.proposal_id,
        spread_type=bias,
        expiry=proposal.expiry,
        short_strike=short.strike,
        long_strike=long_.strike,
        short_instrument=short.instrument,
        long_instrument=long_.instrument,
        n_contracts=sizing.n_contracts,
        spread_width_usd=width_usd,
        spread_width_pct=pct_of_spot,
        credit_eth=proposal.credit_target_eth,
        credit_usd=proposal.credit_target_usd,
        max_loss_usd=proposal.max_loss_usd,
        spot_at_entry=snap.spot_eth_usd,
        dvol_at_entry=snap.dvol,
        delta_at_entry=short.delta,
        eth_price_at_entry=snap.spot_eth_usd,
        proposed_at=when,
        status="proposed",
        created_at=when,
        updated_at=when,
    )
    # The position row is written as "proposed" BEFORE the order goes
    # out, so a crash between persist and broker reply still leaves a
    # row to reconcile (see runtime/recovery.py).
    conn = connect_state(ctx.db_path)
    try:
        with transaction(conn):
            ctx.repository.create_position(conn, record)
    finally:
        conn.close()
    # Two-leg credit combo: sell the short strike, buy the protective long.
    legs = [
        ComboLegOrder(instrument_name=short.instrument, direction="sell"),
        ComboLegOrder(instrument_name=long_.instrument, direction="buy"),
    ]
    try:
        order = await ctx.deribit.place_combo_order(
            legs=legs,
            side="sell",
            n_contracts=sizing.n_contracts,
            limit_price_eth=credit_eth_per_contract,
            label=f"bite-{proposal.proposal_id}",
        )
    except Exception as exc:
        # Broker call failed outright: cancel the just-created row,
        # escalate (HIGH), and report via the result instead of raising.
        conn = connect_state(ctx.db_path)
        try:
            with transaction(conn):
                ctx.repository.update_position_status(
                    conn,
                    proposal.proposal_id,
                    status="cancelled",
                    closed_at=when,
                    close_reason="broker_error",
                    now=when,
                )
        finally:
            conn.close()
        await alert.high(
            source="entry_cycle",
            message=f"place_combo_order failed: {type(exc).__name__}: {exc}",
        )
        await _record_decision(
            ctx,
            inputs=inputs,
            outputs={"error": str(exc)},
            action_taken="broker_error",
            notes=type(exc).__name__,
            proposal_id=str(proposal.proposal_id),
            now=when,
        )
        return EntryCycleResult(
            status=_STATUS_BROKER_REJECT,
            reason=f"{type(exc).__name__}: {exc}",
            proposal=proposal,
        )
    # 9. Persist instruction + update status
    # Map broker state → position status: filled/open → "open",
    # rejected → "cancelled", anything else → "awaiting_fill".
    next_status = "open" if order.state in {"filled", "open"} else "awaiting_fill"
    if order.state == "rejected":
        next_status = "cancelled"
    instruction_id = uuid4()
    conn = connect_state(ctx.db_path)
    try:
        with transaction(conn):
            ctx.repository.create_instruction(
                conn,
                InstructionRecord(
                    instruction_id=instruction_id,
                    proposal_id=proposal.proposal_id,
                    kind="open_combo",
                    payload_json=json.dumps(order.raw, default=str, sort_keys=True),
                    sent_at=when,
                    actual_fill_eth=order.average_price_eth,
                ),
            )
            ctx.repository.update_position_status(
                conn,
                proposal.proposal_id,
                status=next_status,  # type: ignore[arg-type]
                opened_at=when if next_status == "open" else None,
                closed_at=when if next_status == "cancelled" else None,
                close_reason="broker_reject" if next_status == "cancelled" else None,
                now=when,
            )
    finally:
        conn.close()
    await _record_decision(
        ctx,
        inputs=inputs,
        outputs={
            "n_contracts": sizing.n_contracts,
            "credit_eth": str(proposal.credit_target_eth),
            "max_loss_usd": str(proposal.max_loss_usd),
            "broker_state": order.state,
        },
        action_taken="propose_open",
        notes=None,
        proposal_id=str(proposal.proposal_id),
        now=when,
    )
    if next_status == "cancelled":
        await alert.high(
            source="entry_cycle",
            message=f"broker rejected combo order: state={order.state}",
        )
        return EntryCycleResult(
            status=_STATUS_BROKER_REJECT,
            reason="broker_reject",
            proposal=proposal,
            order=order,
        )
    # Notify-only model: Telegram reports the fait accompli.
    await ctx.telegram.notify_position_opened(
        instrument=order.combo_instrument,
        side="SELL",
        size=sizing.n_contracts,
        strategy=bias,
        greeks={
            "delta_short": short.delta,
            "credit_eth": proposal.credit_target_eth,
            "max_loss_usd": proposal.max_loss_usd,
        },
        expected_pnl_usd=proposal.credit_target_usd,
    )
    ctx.audit_log.append(
        event="ENTRY_PLACED",
        payload={
            "proposal_id": str(proposal.proposal_id),
            "spread_type": bias,
            "n_contracts": sizing.n_contracts,
            "combo_instrument": order.combo_instrument,
            "broker_state": order.state,
        },
        now=when,
    )
    _log.info(
        "entry placed: proposal=%s combo=%s contracts=%d state=%s",
        proposal.proposal_id,
        order.combo_instrument,
        sizing.n_contracts,
        order.state,
    )
    return EntryCycleResult(
        status=_STATUS_ENTRY_PLACED,
        reason=None,
        proposal=proposal,
        order=order,
    )
+128
View File
@@ -0,0 +1,128 @@
"""Periodic health probe across MCP services + SQLite + environment.
The probe is fail-soft: every check is wrapped in a try/except so a
single misbehaving service does not abort the others. The orchestrator
keeps a counter of consecutive failures: at the third failure the
kill switch arms (HIGH severity); any time the probe succeeds the
counter resets and a fresh ``HEALTH_OK`` line is appended to the
audit log so the dead-man watcher stays quiet.
"""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Literal
from cerbero_bite.runtime.alert_manager import Severity
from cerbero_bite.runtime.dependencies import RuntimeContext
from cerbero_bite.state import connect
__all__ = ["HealthCheck", "HealthCheckResult", "HealthState"]
_log = logging.getLogger("cerbero_bite.runtime.health")
# "ok" when every probe passed this run; "degraded" otherwise.
HealthState = Literal["ok", "degraded"]
@dataclass(frozen=True)
class HealthCheckResult:
    """Aggregated outcome of one health-probe run."""
    state: HealthState  # "ok" iff no probe failed this run
    failures: list[tuple[str, str]]  # [(service, reason), ...]
    consecutive_failures: int  # failed runs in a row (0 after a clean run)
class HealthCheck:
    """Stateful health probe; remembers consecutive failures across calls."""
    def __init__(
        self,
        ctx: RuntimeContext,
        *,
        expected_environment: Literal["testnet", "mainnet"],
        kill_after: int = 3,
    ) -> None:
        # kill_after: consecutive failed runs before escalating at HIGH
        # severity (which also arms the kill switch via AlertManager).
        self._ctx = ctx
        self._expected = expected_environment
        self._kill_after = kill_after
        self._consecutive = 0
    async def run(self, *, now: datetime | None = None) -> HealthCheckResult:
        """Probe every MCP service + SQLite once and return the aggregate.
        Fail-soft: each probe is individually wrapped, so one failing
        service never prevents the others from being checked.
        """
        when = (now or self._ctx.clock()).astimezone(UTC)
        failures: list[tuple[str, str]] = []
        async def _probe(service: str, coro: object) -> None:
            # Collect (service, reason) instead of letting errors abort run().
            try:
                await coro  # type: ignore[misc]
            except Exception as exc:  # surface every error to the operator
                failures.append((service, f"{type(exc).__name__}: {exc}"))
        await asyncio.gather(
            _probe("deribit", self._probe_deribit()),
            _probe("macro", self._ctx.macro.get_calendar(days=1)),
            _probe("sentiment", self._probe_sentiment()),
            _probe("hyperliquid", self._ctx.hyperliquid.funding_rate_annualized("ETH")),
            _probe("portfolio", self._ctx.portfolio.total_equity_eur()),
        )
        # SQLite health: lightweight transaction.
        try:
            conn = connect(self._ctx.db_path)
            try:
                self._ctx.repository.touch_health_check(conn, now=when)
            finally:
                conn.close()
        except Exception as exc:  # pragma: no cover — sqlite errors are rare
            failures.append(("sqlite", f"{type(exc).__name__}: {exc}"))
        if failures:
            self._consecutive += 1
            state: HealthState = "degraded"
            self._ctx.audit_log.append(
                event="HEALTH_DEGRADED",
                payload={
                    "failures": failures,
                    "consecutive": self._consecutive,
                },
                now=when,
            )
            # Escalate only at the threshold; below it we just audit.
            if self._consecutive >= self._kill_after:
                await self._ctx.alert_manager.emit(
                    Severity.HIGH,
                    source="health_check",
                    message=(
                        f"{self._consecutive} consecutive health-check failures "
                        f"(latest: {failures})"
                    ),
                )
        else:
            # Any clean run resets the strike counter and feeds the
            # dead-man watcher with a fresh HEALTH_OK line.
            self._consecutive = 0
            state = "ok"
            self._ctx.audit_log.append(
                event="HEALTH_OK", payload={}, now=when
            )
        return HealthCheckResult(
            state=state,
            failures=failures,
            consecutive_failures=self._consecutive,
        )
    async def _probe_deribit(self) -> None:
        # Beyond reachability, assert the broker environment matches the
        # configured one (testnet vs mainnet) — a mismatch is a failure.
        info = await self._ctx.deribit.environment_info()
        if info.environment != self._expected:
            raise RuntimeError(
                f"deribit environment mismatch: expected {self._expected}, "
                f"got {info.environment}"
            )
    async def _probe_sentiment(self) -> None:
        # Avoid funding_cross which would raise on empty snapshot during
        # the health probe; we only need a successful HTTP round-trip.
        # NOTE(review): reaches into the client's private ``_http`` — a
        # public ping method on SentimentClient would be cleaner.
        await self._ctx.sentiment._http.call(
            "get_cross_exchange_funding", {"assets": ["ETH"]}
        )
+451
View File
@@ -0,0 +1,451 @@
"""Position monitoring loop (``docs/06-operational-flow.md`` §3).
Walks every ``open`` position, builds the live snapshot, asks
:func:`exit_decision.evaluate`, and — when the action is not ``HOLD``
— sends the inverse combo order to close. Decisions are persisted to
the ``decisions`` table; transitions to the audit log.
"""
from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import Any
from uuid import uuid4
from cerbero_bite.clients.deribit import ComboLegOrder
from cerbero_bite.core.exit_decision import (
PositionSnapshot,
evaluate,
)
from cerbero_bite.core.types import OptionLeg, PutOrCall
from cerbero_bite.runtime.dependencies import RuntimeContext
from cerbero_bite.state import (
DecisionRecord,
DvolSnapshot,
InstructionRecord,
PositionRecord,
transaction,
)
from cerbero_bite.state import connect as connect_state
__all__ = [
"MonitorCycleResult",
"PositionOutcome",
"run_monitor_cycle",
]
_log = logging.getLogger("cerbero_bite.runtime.monitor")
@dataclass(frozen=True)
class PositionOutcome:
    """Result of evaluating one open position during a monitor pass."""

    # Stringified UUID of the position's originating proposal.
    proposal_id: str
    # Exit-decision action ("HOLD" or the close action name).
    action: str
    # True only when a close order was accepted and treated as filled.
    closed: bool
    # Decision reason, or an error tag such as "missing_ticker"; may be None.
    reason: str | None
@dataclass(frozen=True)
class MonitorCycleResult:
    """Aggregate outcome of one monitor cycle — one entry per open position."""

    outcomes: list[PositionOutcome]
# ---------------------------------------------------------------------------
# Snapshot helpers
# ---------------------------------------------------------------------------
def _greek(ticker: dict[str, Any], name: str, default: object = 0) -> Decimal:
    """Read greek *name* from a ticker payload, falling back to *default*.

    Mirrors the previous inline pattern exactly: a missing ``greeks``
    dict, a missing key, or a falsy value (including an explicit 0) all
    resolve to *default* before the Decimal conversion.
    """
    return Decimal(str((ticker.get("greeks") or {}).get(name) or default))


async def _build_position_snapshot(
    ctx: RuntimeContext,
    *,
    record: PositionRecord,
    spot: Decimal,
    dvol: Decimal,
    return_4h: Decimal,
    eth_price_usd_now: Decimal,
    now: datetime,
) -> PositionSnapshot | None:
    """Fetch tickers for the two legs and build a :class:`PositionSnapshot`.

    Returns ``None`` if the broker no longer quotes one of the legs —
    the caller will arm an alert in that case.
    """
    tickers = await ctx.deribit.get_tickers(
        [record.short_instrument, record.long_instrument]
    )
    by_name: dict[str, dict[str, Any]] = {
        str(t.get("instrument_name")): t for t in tickers if isinstance(t, dict)
    }
    short = by_name.get(record.short_instrument)
    long_ = by_name.get(record.long_instrument)
    if short is None or long_ is None:
        return None
    short_mid = Decimal(str(short.get("mark_price") or 0))
    long_mid = Decimal(str(long_.get("mark_price") or 0))
    if short_mid <= 0 or long_mid <= 0:
        # A zero or absent mark is treated the same as a missing ticker.
        return None
    # Fall back to the entry delta when the live greek is unavailable.
    short_delta = _greek(short, "delta", record.delta_at_entry)
    # Cost to unwind: buy back the short leg, sell out the long leg.
    debit_per_contract = short_mid - long_mid
    debit_total = debit_per_contract * Decimal(record.n_contracts)
    legs = [
        OptionLeg(
            instrument=record.short_instrument,
            side="SELL",
            strike=record.short_strike,
            expiry=record.expiry,
            type=_option_type_from_name(record.short_instrument),
            size=record.n_contracts,
            mid_price_eth=short_mid,
            delta=short_delta,
            gamma=_greek(short, "gamma"),
            theta=_greek(short, "theta"),
            vega=_greek(short, "vega"),
        ),
        OptionLeg(
            instrument=record.long_instrument,
            side="BUY",
            strike=record.long_strike,
            expiry=record.expiry,
            type=_option_type_from_name(record.long_instrument),
            size=record.n_contracts,
            mid_price_eth=long_mid,
            delta=_greek(long_, "delta"),
            gamma=_greek(long_, "gamma"),
            theta=_greek(long_, "theta"),
            vega=_greek(long_, "vega"),
        ),
    ]
    return PositionSnapshot(
        proposal_id=record.proposal_id,
        spread_type=record.spread_type,  # type: ignore[arg-type]
        legs=legs,
        credit_received_eth=record.credit_eth,
        credit_received_usd=record.credit_usd,
        spot_at_entry=record.spot_at_entry,
        dvol_at_entry=record.dvol_at_entry,
        expiry=record.expiry,
        opened_at=record.opened_at or record.created_at,
        eth_price_usd_now=eth_price_usd_now,
        spot_now=spot,
        dvol_now=dvol,
        mark_combo_now_eth=debit_total,
        delta_short_now=short_delta,
        return_4h_now=return_4h,
        now=now,
    )
def _option_type_from_name(name: str) -> PutOrCall:
suffix = name.rsplit("-", 1)[-1]
if suffix not in ("P", "C"):
raise ValueError(f"unknown option type suffix in {name!r}")
return suffix # type: ignore[return-value]
async def _fetch_return_4h(ctx: RuntimeContext, *, now: datetime) -> Decimal:
    """Compute the ETH ~4h return from locally stored dvol_history rows.

    Every monitor cycle records a snapshot (see :func:`run_monitor_cycle`).
    This helper picks the newest snapshot that is at least 3.5h old (but
    no older than 8h) and returns ``(spot_now / spot_then) - 1``.
    Falls back to 0 when no usable sample exists.
    """
    newest_allowed = now - timedelta(hours=3, minutes=30)
    oldest_allowed = now - timedelta(hours=8)
    conn = connect_state(ctx.db_path)
    try:
        sample = conn.execute(
            "SELECT timestamp, eth_spot FROM dvol_history "
            "WHERE timestamp <= ? AND timestamp >= ? "
            "ORDER BY timestamp DESC LIMIT 1",
            (newest_allowed.isoformat(), oldest_allowed.isoformat()),
        ).fetchone()
    finally:
        conn.close()
    if sample is None:
        return Decimal("0")
    reference_spot = Decimal(str(sample[1]))
    if reference_spot == 0:
        # Guard against a corrupt/zero historical spot.
        return Decimal("0")
    current_spot = await ctx.deribit.index_price_eth()
    return current_spot / reference_spot - Decimal("1")
# ---------------------------------------------------------------------------
# Cycle entry point
# ---------------------------------------------------------------------------
async def run_monitor_cycle(
    ctx: RuntimeContext,
    *,
    now: datetime | None = None,
) -> MonitorCycleResult:
    """Walk every open position, evaluate exit, place close orders if needed.

    Skips entirely (with a LOW alert) when the kill switch is armed.
    Always records a fresh DVOL/spot snapshot first so the *next* cycle
    can compute ``return_4h``.  ``now`` overrides the cycle timestamp.
    """
    when = (now or ctx.clock()).astimezone(UTC)
    if ctx.kill_switch.is_armed():
        await ctx.alert_manager.low(
            source="monitor_cycle", message="kill switch armed — skipping"
        )
        return MonitorCycleResult(outcomes=[])
    # Refresh spot+DVOL snapshot first; it goes into dvol_history so the
    # next cycle can compute return_4h.  gather() runs the three fetches
    # concurrently and returns their results in order — no manual
    # create_task/.result() plumbing needed.
    spot, dvol, return_4h = await asyncio.gather(
        ctx.deribit.index_price_eth(),
        ctx.deribit.latest_dvol(currency="ETH", now=when),
        _fetch_return_4h(ctx, now=when),
    )
    conn = connect_state(ctx.db_path)
    try:
        with transaction(conn):
            ctx.repository.record_dvol_snapshot(
                conn,
                DvolSnapshot(timestamp=when, dvol=dvol, eth_spot=spot),
            )
            positions = ctx.repository.list_positions(conn, status="open")
    finally:
        conn.close()
    outcomes: list[PositionOutcome] = []
    # Positions are evaluated sequentially so broker calls never overlap.
    for record in positions:
        outcome = await _evaluate_position(
            ctx,
            record=record,
            spot=spot,
            dvol=dvol,
            return_4h=return_4h,
            now=when,
        )
        outcomes.append(outcome)
    return MonitorCycleResult(outcomes=outcomes)
async def _evaluate_position(
    ctx: RuntimeContext,
    *,
    record: PositionRecord,
    spot: Decimal,
    dvol: Decimal,
    return_4h: Decimal,
    now: datetime,
) -> PositionOutcome:
    """Evaluate one open position and auto-execute the exit if required.

    Flow: live snapshot → :func:`exit_decision.evaluate` → persist the
    decision row → on any non-HOLD action, flip the position to
    ``closing``, send the inverse combo, and record the instruction plus
    the final status.  Every failure path restores a sane DB state and
    raises an alert instead of propagating an exception.
    """
    snapshot = await _build_position_snapshot(
        ctx,
        record=record,
        spot=spot,
        dvol=dvol,
        return_4h=return_4h,
        eth_price_usd_now=spot,
        now=now,
    )
    if snapshot is None:
        # One or both legs are no longer quoted — cannot evaluate safely.
        # (Separator added: the two f-string parts previously concatenated
        # into one garbled word.)
        await ctx.alert_manager.high(
            source="monitor_cycle",
            message=(
                f"missing tickers for legs of position {record.proposal_id}; "
                "broker may have de-listed an instrument"
            ),
        )
        return PositionOutcome(
            proposal_id=str(record.proposal_id),
            action="HOLD",
            closed=False,
            reason="missing_ticker",
        )
    decision = evaluate(snapshot, ctx.cfg)
    # Persist the decision (inputs + outputs) regardless of the action, so
    # every exit_check is auditable even when nothing is traded.
    conn = connect_state(ctx.db_path)
    try:
        with transaction(conn):
            ctx.repository.record_decision(
                conn,
                DecisionRecord(
                    decision_type="exit_check",
                    timestamp=now,
                    inputs_json=json.dumps(
                        {
                            "spot": str(spot),
                            "dvol": str(dvol),
                            "mark_combo_now_eth": str(snapshot.mark_combo_now_eth),
                            "delta_short_now": str(snapshot.delta_short_now),
                            "return_4h_now": str(return_4h),
                        },
                        default=str,
                        sort_keys=True,
                    ),
                    outputs_json=json.dumps(
                        {
                            "action": decision.action,
                            "reason": decision.reason,
                            "pnl_eth": str(decision.pnl_estimate_eth),
                            "pnl_usd": str(decision.pnl_estimate_usd),
                        },
                        default=str,
                        sort_keys=True,
                    ),
                    action_taken=decision.action,
                    proposal_id=record.proposal_id,
                ),
            )
    finally:
        conn.close()
    if decision.action == "HOLD":
        ctx.audit_log.append(
            event="HOLD",
            payload={
                "proposal_id": str(record.proposal_id),
                "reason": decision.reason,
            },
            now=now,
        )
        return PositionOutcome(
            proposal_id=str(record.proposal_id),
            action="HOLD",
            closed=False,
            reason=decision.reason,
        )
    # Place inverse combo to close: buy back the short leg, sell the long.
    short_leg = ComboLegOrder(
        instrument_name=record.short_instrument, direction="buy"
    )
    long_leg = ComboLegOrder(
        instrument_name=record.long_instrument, direction="sell"
    )
    # Mark "closing" *before* touching the broker so a crash between the
    # two steps stays visible to the recovery pass at next boot.
    conn = connect_state(ctx.db_path)
    try:
        with transaction(conn):
            ctx.repository.update_position_status(
                conn,
                record.proposal_id,
                status="closing",
                now=now,
            )
    finally:
        conn.close()
    try:
        order = await ctx.deribit.place_combo_order(
            legs=[short_leg, long_leg],
            side="buy",
            n_contracts=record.n_contracts,
            # mark_combo_now_eth is the *total* debit; the broker expects
            # a per-contract limit price.
            limit_price_eth=snapshot.mark_combo_now_eth / Decimal(record.n_contracts),
            label=f"bite-close-{record.proposal_id}",
        )
    except Exception as exc:
        # Roll the status back to "open" so the next cycle retries.
        conn = connect_state(ctx.db_path)
        try:
            with transaction(conn):
                ctx.repository.update_position_status(
                    conn, record.proposal_id, status="open", now=now
                )
        finally:
            conn.close()
        await ctx.alert_manager.critical(
            source="monitor_cycle",
            message=(
                f"close failed for {record.proposal_id}: "
                f"{type(exc).__name__}: {exc}"
            ),
            component="runtime.monitor_cycle",
        )
        return PositionOutcome(
            proposal_id=str(record.proposal_id),
            action=decision.action,
            closed=False,
            reason=f"close_error:{type(exc).__name__}",
        )
    instruction_id = uuid4()
    # NOTE(review): "open" (a resting, not-yet-filled limit order) is
    # treated as a successful close here — confirm this matches the
    # broker client's state semantics.
    is_filled = order.state in {"filled", "open"}
    next_status = "closed" if is_filled else "open"
    conn = connect_state(ctx.db_path)
    try:
        with transaction(conn):
            ctx.repository.create_instruction(
                conn,
                InstructionRecord(
                    instruction_id=instruction_id,
                    proposal_id=record.proposal_id,
                    kind="close_combo",
                    payload_json=json.dumps(order.raw, default=str, sort_keys=True),
                    sent_at=now,
                    actual_fill_eth=order.average_price_eth,
                ),
            )
            ctx.repository.update_position_status(
                conn,
                record.proposal_id,
                status=next_status,  # type: ignore[arg-type]
                closed_at=now if is_filled else None,
                close_reason=decision.action if is_filled else None,
                debit_paid_eth=order.average_price_eth if is_filled else None,
                pnl_eth=decision.pnl_estimate_eth if is_filled else None,
                pnl_usd=decision.pnl_estimate_usd if is_filled else None,
                now=now,
            )
    finally:
        conn.close()
    if not is_filled:
        await ctx.alert_manager.critical(
            source="monitor_cycle",
            message=(
                f"close rejected by broker for {record.proposal_id} "
                f"(state={order.state})"
            ),
            component="runtime.monitor_cycle",
        )
        return PositionOutcome(
            proposal_id=str(record.proposal_id),
            action=decision.action,
            closed=False,
            reason=f"broker_reject:{order.state}",
        )
    ctx.audit_log.append(
        event="EXIT_FILLED",
        payload={
            "proposal_id": str(record.proposal_id),
            "action": decision.action,
            "pnl_usd": str(decision.pnl_estimate_usd),
            "combo_instrument": order.combo_instrument,
        },
        now=now,
    )
    await ctx.telegram.notify_position_closed(
        instrument=order.combo_instrument,
        realized_pnl_usd=decision.pnl_estimate_usd,
        reason=decision.action,
    )
    _log.info(
        "exit filled: proposal=%s action=%s pnl_usd=%s",
        record.proposal_id,
        decision.action,
        decision.pnl_estimate_usd,
    )
    return PositionOutcome(
        proposal_id=str(record.proposal_id),
        action=decision.action,
        closed=True,
        reason=decision.reason,
    )
+208
View File
@@ -0,0 +1,208 @@
"""Façade that ties the runtime modules into a runnable engine.
The :class:`Orchestrator` is the single entry point for the CLI: it
holds the :class:`RuntimeContext`, the :class:`HealthCheck` state, and
the boot procedure (recover + boot environment check + scheduler
arming). Every concrete cycle is delegated to its own module so each
piece stays independently testable.
"""
from __future__ import annotations
import asyncio
import logging
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from datetime import UTC, datetime
from decimal import Decimal
from pathlib import Path
from typing import Literal
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from cerbero_bite.config.mcp_endpoints import McpEndpoints
from cerbero_bite.config.schema import StrategyConfig
from cerbero_bite.runtime.dependencies import RuntimeContext, build_runtime
from cerbero_bite.runtime.entry_cycle import EntryCycleResult, run_entry_cycle
from cerbero_bite.runtime.health_check import HealthCheck, HealthCheckResult
from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle
from cerbero_bite.runtime.recovery import recover_state
from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler
__all__ = ["Orchestrator"]
_log = logging.getLogger("cerbero_bite.runtime.orchestrator")
Environment = Literal["testnet", "mainnet"]
# Default cron schedule (matches docs/06-operational-flow.md table).
_CRON_ENTRY = "0 14 * * MON"
_CRON_MONITOR = "0 2,14 * * *"
_CRON_HEALTH = "*/5 * * * *"
@dataclass(frozen=True)
class _BootResult:
    """Snapshot returned by :meth:`Orchestrator.boot`."""

    # Environment reported by Deribit at boot (may differ from expected —
    # in that case a CRITICAL alert was already emitted).
    environment: Environment
    # Result of the first health probe executed during boot.
    health: HealthCheckResult
class Orchestrator:
    """Engine façade — boot, scheduler, manual cycle invocation.

    Owns the :class:`RuntimeContext` and the stateful
    :class:`HealthCheck`; every concrete cycle is delegated to its own
    runtime module so each piece stays independently testable.
    """

    def __init__(
        self,
        ctx: RuntimeContext,
        *,
        expected_environment: Environment,
        eur_to_usd: Decimal,
    ) -> None:
        self._ctx = ctx
        self._expected_env = expected_environment
        self._eur_to_usd = eur_to_usd
        self._health = HealthCheck(ctx, expected_environment=expected_environment)
        self._scheduler: AsyncIOScheduler | None = None

    @property
    def context(self) -> RuntimeContext:
        """The wired runtime context this engine operates on."""
        return self._ctx

    @property
    def expected_environment(self) -> Environment:
        """Environment the strategy expects the broker to report."""
        return self._expected_env

    # ------------------------------------------------------------------
    # Boot
    # ------------------------------------------------------------------
    async def boot(self) -> _BootResult:
        """Reconcile state, verify environment, run a first health probe."""
        started_at = self._ctx.clock()
        # 1. Align SQLite with the broker before anything else runs.
        await recover_state(self._ctx, now=started_at)
        # 2. Hard check: broker environment must match the strategy.
        info = await self._ctx.deribit.environment_info()
        if info.environment != self._expected_env:
            await self._ctx.alert_manager.critical(
                source="orchestrator.boot",
                message=(
                    f"Deribit environment mismatch at boot: expected "
                    f"{self._expected_env}, got {info.environment}"
                ),
                component="runtime.orchestrator",
            )
        # 3. First probe seeds the health counter and the audit trail.
        health = await self._health.run(now=started_at)
        self._ctx.audit_log.append(
            event="ENGINE_START",
            payload={
                "environment": info.environment,
                "health": health.state,
                "config_version": self._ctx.cfg.config_version,
            },
            now=started_at,
        )
        return _BootResult(environment=info.environment, health=health)

    # ------------------------------------------------------------------
    # Cycle invocations (used by scheduler jobs and CLI dry-run)
    # ------------------------------------------------------------------
    async def run_entry(self, *, now: datetime | None = None) -> EntryCycleResult:
        """Execute one entry cycle."""
        return await run_entry_cycle(
            self._ctx, eur_to_usd_rate=self._eur_to_usd, now=now
        )

    async def run_monitor(self, *, now: datetime | None = None) -> MonitorCycleResult:
        """Execute one monitor cycle."""
        return await run_monitor_cycle(self._ctx, now=now)

    async def run_health(self, *, now: datetime | None = None) -> HealthCheckResult:
        """Execute one health probe."""
        return await self._health.run(now=now)

    # ------------------------------------------------------------------
    # Scheduler lifecycle
    # ------------------------------------------------------------------
    def install_scheduler(
        self,
        *,
        entry_cron: str = _CRON_ENTRY,
        monitor_cron: str = _CRON_MONITOR,
        health_cron: str = _CRON_HEALTH,
    ) -> AsyncIOScheduler:
        """Build the scheduler with the canonical job set, ready to start."""

        def _guarded(
            name: str, factory: Callable[[], Awaitable[object]]
        ) -> Callable[[], Awaitable[None]]:
            # Factory closure binds name/factory explicitly, so each job
            # gets its own guard and a tick can never kill the scheduler.
            async def _tick() -> None:
                try:
                    await factory()
                except Exception as exc:
                    _log.exception("scheduler tick %s raised", name)
                    await self._ctx.alert_manager.critical(
                        source=f"scheduler.{name}",
                        message=f"{type(exc).__name__}: {exc}",
                        component=f"runtime.{name}",
                    )

            return _tick

        self._scheduler = build_scheduler(
            [
                JobSpec(
                    name="entry",
                    cron=entry_cron,
                    coro_factory=_guarded("entry", self.run_entry),
                ),
                JobSpec(
                    name="monitor",
                    cron=monitor_cron,
                    coro_factory=_guarded("monitor", self.run_monitor),
                ),
                JobSpec(
                    name="health",
                    cron=health_cron,
                    coro_factory=_guarded("health", self.run_health),
                ),
            ]
        )
        return self._scheduler

    async def run_forever(self) -> None:
        """Boot, install the scheduler, and block forever (until cancelled)."""
        await self.boot()
        sched = self.install_scheduler()
        sched.start()
        try:
            # Park the task indefinitely; cancellation unwinds via finally.
            await asyncio.Event().wait()
        finally:
            sched.shutdown(wait=False)
# ---------------------------------------------------------------------------
# Convenience builder for the CLI
# ---------------------------------------------------------------------------
def make_orchestrator(
    *,
    cfg: StrategyConfig,
    endpoints: McpEndpoints,
    token: str,
    db_path: Path,
    audit_path: Path,
    expected_environment: Environment,
    eur_to_usd: Decimal,
    clock: Callable[[], datetime] | None = None,
) -> Orchestrator:
    """Build a fresh :class:`Orchestrator` ready for ``boot``/``run_*``."""
    # Default clock is wall time in UTC.
    runtime_clock = clock if clock is not None else (lambda: datetime.now(UTC))
    context = build_runtime(
        cfg=cfg,
        endpoints=endpoints,
        token=token,
        db_path=db_path,
        audit_path=audit_path,
        clock=runtime_clock,
    )
    return Orchestrator(
        context,
        expected_environment=expected_environment,
        eur_to_usd=eur_to_usd,
    )
+117
View File
@@ -0,0 +1,117 @@
"""Recover state on engine boot (``docs/06-operational-flow.md`` §6).
The recovery loop never trades: it only aligns the SQLite state with
the broker. Any unresolvable discrepancy (a position the broker does
not know about, a leg that disappeared, etc.) arms the kill switch.
"""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from cerbero_bite.runtime.dependencies import RuntimeContext
from cerbero_bite.state import PositionRecord, transaction
from cerbero_bite.state import connect as connect_state
__all__ = ["recover_state"]
_log = logging.getLogger("cerbero_bite.runtime.recovery")
async def recover_state(ctx: RuntimeContext, *, now: datetime | None = None) -> None:
    """Reconcile SQLite positions with the broker's open positions.

    Never trades.  In-flight positions (``awaiting_fill``/``closing``)
    collapse to ``open`` when the broker shows both legs, otherwise they
    are ``cancelled``; positions ``open`` in the DB but missing on the
    broker are reported.  Any discrepancy raises a CRITICAL alert (which
    arms the kill switch); a ``RECOVERY_DONE`` audit line is always
    written.  ``now`` overrides the reconciliation timestamp.
    """
    when = (now or ctx.clock()).astimezone(UTC)
    conn = connect_state(ctx.db_path)
    try:
        in_flight: list[PositionRecord] = ctx.repository.list_positions(
            conn, status="awaiting_fill"
        )
        in_flight += ctx.repository.list_positions(conn, status="closing")
        opens = ctx.repository.list_positions(conn, status="open")
    finally:
        conn.close()
    if not in_flight and not opens:
        # Nothing to reconcile — skip the broker round-trip entirely.
        ctx.audit_log.append(
            event="RECOVERY_DONE", payload={"reconciled": 0}, now=when
        )
        return
    broker_positions = await ctx.deribit.get_positions(currency="USDC")
    # NOTE(review): keys off "instrument" — confirm the client normalizes
    # Deribit's "instrument_name" field to this key.
    broker_by_instrument: dict[str, dict[str, object]] = {}
    for p in broker_positions:
        instrument = p.get("instrument")
        if isinstance(instrument, str):
            broker_by_instrument[instrument] = p
    discrepancies: list[str] = []
    # Awaiting-fill / closing → resolve to open or cancelled.  One shared
    # connection for the whole loop (instead of one per record); each
    # record still commits in its own transaction so a mid-loop failure
    # keeps earlier reconciliations durable.
    conn = connect_state(ctx.db_path)
    try:
        for record in in_flight:
            seen = (
                record.short_instrument in broker_by_instrument
                and record.long_instrument in broker_by_instrument
            )
            with transaction(conn):
                if seen:
                    # Both awaiting_fill and closing collapse to "open"
                    # once the broker confirms the legs are present.
                    next_status = "open"
                    ctx.repository.update_position_status(
                        conn,
                        record.proposal_id,
                        status=next_status,  # type: ignore[arg-type]
                        opened_at=record.opened_at or when,
                        now=when,
                    )
                else:
                    ctx.repository.update_position_status(
                        conn,
                        record.proposal_id,
                        status="cancelled",
                        closed_at=when,
                        close_reason="recovery_no_fill",
                        now=when,
                    )
                    discrepancies.append(
                        f"{record.proposal_id}: was {record.status}, broker shows nothing"
                    )
    finally:
        conn.close()
    # Open positions → must be present on broker. If not, alarm.
    for record in opens:
        if (
            record.short_instrument in broker_by_instrument
            and record.long_instrument in broker_by_instrument
        ):
            continue
        discrepancies.append(
            f"{record.proposal_id}: open in DB but missing on broker"
        )
    if discrepancies:
        await ctx.alert_manager.critical(
            source="recovery",
            message="state inconsistencies detected: " + "; ".join(discrepancies),
            component="runtime.recovery",
        )
    ctx.audit_log.append(
        event="RECOVERY_DONE",
        payload={
            "reconciled": len(in_flight) + len(opens),
            "discrepancies": len(discrepancies),
        },
        now=when,
    )
    _log.info(
        "recovery done: reconciled=%d discrepancies=%d",
        len(in_flight) + len(opens),
        len(discrepancies),
    )
+66
View File
@@ -0,0 +1,66 @@
"""APScheduler bootstrap (``docs/06-operational-flow.md``).
Wraps :class:`AsyncIOScheduler` so the orchestrator can register the
documented cron jobs in one place. The scheduler is built but not
started; ``start()`` must be called from inside the running event
loop.
"""
from __future__ import annotations
import logging
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
__all__ = ["JobSpec", "build_scheduler"]
_log = logging.getLogger("cerbero_bite.runtime.scheduler")
@dataclass(frozen=True)
class JobSpec:
    """One row in the scheduler manifest."""

    # Job id/name registered with APScheduler (must be unique per scheduler).
    name: str
    # Five-field cron expression (minute hour day month day_of_week), UTC.
    cron: str
    # Zero-arg factory returning the awaitable executed on every tick.
    coro_factory: Callable[[], Awaitable[None]]
def _parse_cron(expr: str) -> CronTrigger:
    """Translate a classic 5-field cron expression into a UTC CronTrigger."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError(f"cron must have 5 fields, got: {expr!r}")
    field_names = ("minute", "hour", "day", "month", "day_of_week")
    return CronTrigger(timezone="UTC", **dict(zip(field_names, fields)))
def build_scheduler(jobs: list[JobSpec]) -> AsyncIOScheduler:
    """Create an :class:`AsyncIOScheduler` and register every *jobs* entry.

    The returned scheduler is idle: the caller must invoke ``start()``
    from within a running event loop.
    """
    sched = AsyncIOScheduler(timezone="UTC")
    for job in jobs:
        trigger = _parse_cron(job.cron)
        sched.add_job(
            job.coro_factory,
            trigger=trigger,
            id=job.name,
            name=job.name,
            replace_existing=True,
            coalesce=True,
            misfire_grace_time=300,
        )
        _log.info("scheduled job %s with cron %s", job.name, job.cron)
    return sched