From c0a0ee416f2c637d268811e0b6bad0ec32c7406d Mon Sep 17 00:00:00 2001 From: root Date: Fri, 1 May 2026 20:44:49 +0000 Subject: [PATCH 1/3] =?UTF-8?q?feat(state+runtime):=20option=5Fchain=5Fsna?= =?UTF-8?q?pshots=20=E2=80=94=20catena=20opzioni=20storica=20per=20backtes?= =?UTF-8?q?t=20reale?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Aggiunge la persistence della option chain Deribit con cron settimanale ``55 13 * * MON`` (5 minuti prima del trigger entry alle 14:00 UTC), sbloccando il backtest non-stilizzato e la calibrazione empirica dello skew premium. **Schema (migrazione 0004)** Nuova tabella ``option_chain_snapshots`` con primary key composta ``(timestamp, instrument_name)`` — tutti i quote prelevati nello stesso tick condividono il timestamp, così le query "lo snapshot del 2026-05-04 alle 13:55" diventano una singola WHERE timestamp = X. Indici su (asset, timestamp DESC) e (asset, expiry) per supportare sia listing recenti sia query per scadenza specifica. Campi: instrument_name, strike, expiry, option_type (C/P), bid, ask, mid, iv, delta, gamma, theta, vega, open_interest, volume_24h, book_depth_top3. Tutti i numerici sono nullable: il collector è best-effort, un ticker mancante produce comunque una riga (utile per sapere che lo strumento esisteva ma non era quotato). **Modello + repository** - ``OptionChainQuoteRecord`` (Pydantic, in ``state/models.py``). - ``Repository.record_option_chain_snapshot`` (bulk insert idempotente). - ``Repository.list_option_chain_snapshots`` (filtri su asset, timestamp window, expiry window, limit default 50000). - ``Repository.latest_option_chain_timestamp`` (freshness check per dashboard GUI). **Collector** Nuovo ``runtime/option_chain_snapshot_cycle.py`` che: 1. Calcola la finestra scadenze ``[now+dte_min, now+dte_max]`` da ``cfg.structure``: niente richieste su scadenze che il rule engine non userebbe mai. 2. Chiama ``deribit.options_chain()`` con ``min_open_interest=cfg.liquidity.open_interest_min``. 3. Batch ``deribit.get_tickers()`` (max 20 per call, limite Deribit) con error-isolation per batch — un batch fallito non blocca gli altri. 4. NON chiama l'order book per ogni strike (rate-limit guard); ``book_depth_top3`` resta NULL e il liquidity gate live lo chiede on-the-fly per gli strike candidati al picker. Best-effort end-to-end: chain assente, get_tickers giù, persist fallito → ritorna 0 senza alzare eccezioni, logga sempre. **Schedulazione** Wired in ``Orchestrator.install_scheduler`` come job parallelo a ``market_snapshot``, attivo solo quando ``ENABLE_DATA_ANALYSIS=true``. Cron parametrizzabile via il nuovo kwarg ``option_chain_cron`` (default ``55 13 * * MON``). **Test** - 4 unit test del collector (happy path, ticker mancante, chain vuota, fetch fail best-effort) con mock di RuntimeContext. - Aggiornato ``test_install_scheduler_registers_canonical_jobs`` per includere il nuovo job nel set canonico. **Cosa sblocca** - Backtest non-stilizzato: il PR ``feat/backtest-engine`` può dropparsi il modello BS+skew_premium e leggere prezzi reali ``mid`` dalla chain registrata. - Calibrazione empirica dello skew premium (hardcoded a 1.5 nel backtest stilizzato): plot del rapporto fra quote reali Deribit e BS per delta/expiry, regressione → valore data-driven. - Validazione ex-post: "il delta-0.12 era davvero a 25% OTM in quella settimana?" diventa una query SELECT. - Dimensione attesa: ~50 strike × 3 scadenze × 1 snapshot/settimana × 17 colonne ≈ 12 KB/settimana, ~600 KB/anno. Trascurabile. Suite: 409 passed. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../runtime/option_chain_snapshot_cycle.py | 185 ++++++++++++++++++ src/cerbero_bite/runtime/orchestrator.py | 21 ++ .../0004_option_chain_snapshots.sql | 42 ++++ src/cerbero_bite/state/models.py | 31 +++ src/cerbero_bite/state/repository.py | 130 ++++++++++++ tests/integration/test_orchestrator.py | 1 + .../unit/test_option_chain_snapshot_cycle.py | 134 +++++++++++++ 7 files changed, 544 insertions(+) create mode 100644 src/cerbero_bite/runtime/option_chain_snapshot_cycle.py create mode 100644 src/cerbero_bite/state/migrations/0004_option_chain_snapshots.sql create mode 100644 tests/unit/test_option_chain_snapshot_cycle.py diff --git a/src/cerbero_bite/runtime/option_chain_snapshot_cycle.py b/src/cerbero_bite/runtime/option_chain_snapshot_cycle.py new file mode 100644 index 0000000..d2b6d0c --- /dev/null +++ b/src/cerbero_bite/runtime/option_chain_snapshot_cycle.py @@ -0,0 +1,185 @@ +"""Periodic option-chain snapshot collector (§13). + +Fetches the Deribit option chain for every strike entro la finestra +DTE configurata, prima del trigger entry settimanale (cron +``55 13 * * MON`` di default). Persiste un quote per ogni strumento +in ``option_chain_snapshots`` con un timestamp condiviso, che diventa +il dato di base per: + +* il backtest non-stilizzato (vedi ``core/backtest.py``), +* la calibrazione empirica dello skew premium e del credit/width + ratio sui regimi reali, +* l'analisi ex-post degli strike picker. + +Il collector è **best-effort**: se ``get_tickers`` fallisce per un +batch, gli altri batch passano comunque; se manca completamente la +chain, il job ritorna 0 senza alzare eccezioni e logga il problema. +Non chiama l'order book per ogni strike (sarebbe troppo costoso) — +``book_depth_top3`` resta NULL nel quote, il liquidity gate del live +lo legge al volo solo per gli strike che gli interessano. +""" + +from __future__ import annotations + +import asyncio +import logging +from datetime import UTC, datetime, timedelta +from decimal import Decimal +from typing import TYPE_CHECKING, Any + +from cerbero_bite.state import connect, transaction +from cerbero_bite.state.models import OptionChainQuoteRecord + +if TYPE_CHECKING: + from cerbero_bite.runtime.dependencies import RuntimeContext + +__all__ = ["DEFAULT_BATCH_SIZE", "collect_option_chain_snapshot"] + + +_log = logging.getLogger("cerbero_bite.runtime.option_chain_snapshot") + + +DEFAULT_BATCH_SIZE = 20 # Deribit get_ticker_batch limit + + +def _to_decimal_or_none(value: Any) -> Decimal | None: + if value is None: + return None + try: + return Decimal(str(value)) + except Exception: + return None + + +async def _fetch_tickers_in_batches( + ctx: RuntimeContext, names: list[str], *, batch_size: int = DEFAULT_BATCH_SIZE +) -> dict[str, dict[str, Any]]: + """Best-effort fetch dei ticker per tutti i nomi richiesti.""" + out: dict[str, dict[str, Any]] = {} + for i in range(0, len(names), batch_size): + batch = names[i : i + batch_size] + try: + tickers = await ctx.deribit.get_tickers(batch) + except Exception as exc: + _log.warning( + "get_tickers failed for batch starting %s: %s", + batch[0] if batch else "", exc, + ) + continue + for t in tickers: + name = t.get("instrument_name") or t.get("instrument") + if isinstance(name, str): + out[name] = t + return out + + +async def collect_option_chain_snapshot( + ctx: RuntimeContext, + *, + asset: str = "ETH", + now: datetime | None = None, + batch_size: int = DEFAULT_BATCH_SIZE, +) -> int: + """Collect + persist a single chain snapshot for ``asset``. Returns + the number of quotes persisted (0 on best-effort failure). + + Filtra le scadenze nella finestra ``[dte_min, dte_max]`` di + ``cfg.structure`` per non sprecare richieste su scadenze che il + rule engine non userebbe mai. + """ + when = (now or datetime.now(UTC)).astimezone(UTC) + cfg = ctx.cfg + + expiry_from = when + timedelta(days=cfg.structure.dte_min) + expiry_to = when + timedelta(days=cfg.structure.dte_max) + + try: + chain = await ctx.deribit.options_chain( + currency=asset.upper(), + expiry_from=expiry_from, + expiry_to=expiry_to, + min_open_interest=int(cfg.liquidity.open_interest_min), + ) + except Exception: + _log.exception("option chain fetch failed") + return 0 + + if not chain: + _log.info("option chain empty for %s in window", asset) + return 0 + + names = [meta.name for meta in chain] + tickers = await _fetch_tickers_in_batches(ctx, names, batch_size=batch_size) + + quotes: list[OptionChainQuoteRecord] = [] + for meta in chain: + ticker = tickers.get(meta.name) + if ticker is None: + # Lasciamo comunque la riga senza quote: utile sapere + # che lo strumento esisteva. + quotes.append( + OptionChainQuoteRecord( + timestamp=when, + asset=asset.upper(), + instrument_name=meta.name, + strike=meta.strike, + expiry=meta.expiry, + option_type=meta.option_type, + open_interest=int(meta.open_interest) + if meta.open_interest is not None + else None, + ) + ) + continue + greeks = ticker.get("greeks") or {} + quotes.append( + OptionChainQuoteRecord( + timestamp=when, + asset=asset.upper(), + instrument_name=meta.name, + strike=meta.strike, + expiry=meta.expiry, + option_type=meta.option_type, + bid=_to_decimal_or_none(ticker.get("bid")), + ask=_to_decimal_or_none(ticker.get("ask")), + mid=_to_decimal_or_none(ticker.get("mark_price")), + iv=_to_decimal_or_none(ticker.get("mark_iv")), + delta=_to_decimal_or_none(greeks.get("delta")), + gamma=_to_decimal_or_none(greeks.get("gamma")), + theta=_to_decimal_or_none(greeks.get("theta")), + vega=_to_decimal_or_none(greeks.get("vega")), + open_interest=int(meta.open_interest) + if meta.open_interest is not None + else None, + volume_24h=( + int(ticker["volume_24h"]) + if ticker.get("volume_24h") is not None + else None + ), + # book_depth_top3: NULL — non lo prendiamo per ogni + # strike per non saturare l'API. Il liquidity gate + # del live lo chiede on-the-fly per gli strike + # candidati al picker. + ) + ) + + persisted = 0 + try: + conn = connect(ctx.db_path) + try: + with transaction(conn): + persisted = ctx.repository.record_option_chain_snapshot( + conn, quotes + ) + finally: + conn.close() + except Exception: + _log.exception("persist option chain snapshot failed") + return 0 + + _log.info("option_chain_snapshot persisted %d quote(s)", persisted) + return persisted + + +# Avoid unused import warning for asyncio in lint when only used as type +_ = asyncio diff --git a/src/cerbero_bite/runtime/orchestrator.py b/src/cerbero_bite/runtime/orchestrator.py index 4ba5a83..3b064f5 100644 --- a/src/cerbero_bite/runtime/orchestrator.py +++ b/src/cerbero_bite/runtime/orchestrator.py @@ -34,6 +34,9 @@ from cerbero_bite.runtime.market_snapshot_cycle import ( DEFAULT_ASSETS, collect_market_snapshot, ) +from cerbero_bite.runtime.option_chain_snapshot_cycle import ( + collect_option_chain_snapshot, +) from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle from cerbero_bite.runtime.recovery import recover_state from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler @@ -53,6 +56,7 @@ _CRON_HEALTH = "*/5 * * * *" _CRON_BACKUP = "0 * * * *" _CRON_MANUAL_ACTIONS = "*/1 * * * *" _CRON_MARKET_SNAPSHOT = "*/15 * * * *" +_CRON_OPTION_CHAIN_SNAPSHOT = "55 13 * * MON" # 5 min prima del trigger entry _BACKUP_RETENTION_DAYS = 30 @@ -217,6 +221,8 @@ class Orchestrator: manual_actions_cron: str = _CRON_MANUAL_ACTIONS, market_snapshot_cron: str = _CRON_MARKET_SNAPSHOT, market_snapshot_assets: tuple[str, ...] = DEFAULT_ASSETS, + option_chain_cron: str = _CRON_OPTION_CHAIN_SNAPSHOT, + option_chain_asset: str = "ETH", backup_dir: Path | None = None, backup_retention_days: int = _BACKUP_RETENTION_DAYS, ) -> AsyncIOScheduler: @@ -282,6 +288,14 @@ class Orchestrator: await _safe("market_snapshot", _do) + async def _option_chain_snapshot() -> None: + async def _do() -> None: + await collect_option_chain_snapshot( + self._ctx, asset=option_chain_asset + ) + + await _safe("option_chain_snapshot", _do) + jobs: list[JobSpec] = [ JobSpec(name="health", cron=health_cron, coro_factory=_health), JobSpec(name="backup", cron=backup_cron, coro_factory=_backup), @@ -309,6 +323,13 @@ class Orchestrator: coro_factory=_market_snapshot, ) ) + jobs.append( + JobSpec( + name="option_chain_snapshot", + cron=option_chain_cron, + coro_factory=_option_chain_snapshot, + ) + ) else: _log.warning( "data analysis disabled (CERBERO_BITE_ENABLE_DATA_ANALYSIS=" diff --git a/src/cerbero_bite/state/migrations/0004_option_chain_snapshots.sql b/src/cerbero_bite/state/migrations/0004_option_chain_snapshots.sql new file mode 100644 index 0000000..c9b7957 --- /dev/null +++ b/src/cerbero_bite/state/migrations/0004_option_chain_snapshots.sql @@ -0,0 +1,42 @@ +-- 0004_option_chain_snapshots.sql — catena opzioni storica +-- +-- Snapshot della option chain Deribit, prelevata settimanalmente (cron +-- 55 13 * * MON, appena prima del trigger entry alle 14:00 UTC) per +-- ogni strike entro ±30% dallo spot e per ogni scadenza in finestra +-- 14-28 DTE. Dato di base per il backtest non-stilizzato e per +-- calibrare empiricamente lo skew premium del modello BS. +-- +-- Granularità: una riga per (snapshot_ts, instrument). Lo +-- snapshot_ts è il timestamp del cron tick — TUTTI i quote raccolti +-- in quello stesso tick condividono il timestamp, così filtrare per +-- "lo snapshot del 2026-05-04 alle 13:55" è una semplice +-- WHERE timestamp = X. + +CREATE TABLE option_chain_snapshots ( + timestamp TEXT NOT NULL, + asset TEXT NOT NULL, + instrument_name TEXT NOT NULL, + strike TEXT NOT NULL, + expiry TEXT NOT NULL, + option_type TEXT NOT NULL CHECK (option_type IN ('C','P')), + bid TEXT, + ask TEXT, + mid TEXT, + iv TEXT, + delta TEXT, + gamma TEXT, + theta TEXT, + vega TEXT, + open_interest INTEGER, + volume_24h INTEGER, + book_depth_top3 INTEGER, + PRIMARY KEY (timestamp, instrument_name) +) WITHOUT ROWID; + +CREATE INDEX idx_option_chain_asset_ts + ON option_chain_snapshots(asset, timestamp DESC); + +CREATE INDEX idx_option_chain_expiry + ON option_chain_snapshots(asset, expiry); + +PRAGMA user_version = 4; diff --git a/src/cerbero_bite/state/models.py b/src/cerbero_bite/state/models.py index f7cd965..faa5a9f 100644 --- a/src/cerbero_bite/state/models.py +++ b/src/cerbero_bite/state/models.py @@ -22,6 +22,7 @@ __all__ = [ "InstructionRecord", "ManualAction", "MarketSnapshotRecord", + "OptionChainQuoteRecord", "PositionRecord", "PositionStatus", "SystemStateRecord", @@ -148,6 +149,36 @@ class MarketSnapshotRecord(BaseModel): fetch_errors_json: str | None = None +class OptionChainQuoteRecord(BaseModel): + """Row of the ``option_chain_snapshots`` table. + + One row per (snapshot_ts, instrument) — the same ``timestamp`` is + shared by every quote prelevato nello stesso tick del cron. Tutti + i campi numerici sono opzionali perché il collector è + best-effort: un ticker mancante non invalida il resto della chain. + """ + + model_config = ConfigDict(extra="forbid") + + timestamp: datetime + asset: str + instrument_name: str + strike: Decimal + expiry: datetime + option_type: Literal["C", "P"] + bid: Decimal | None = None + ask: Decimal | None = None + mid: Decimal | None = None + iv: Decimal | None = None + delta: Decimal | None = None + gamma: Decimal | None = None + theta: Decimal | None = None + vega: Decimal | None = None + open_interest: int | None = None + volume_24h: int | None = None + book_depth_top3: int | None = None + + class ManualAction(BaseModel): """Row of the ``manual_actions`` table.""" diff --git a/src/cerbero_bite/state/repository.py b/src/cerbero_bite/state/repository.py index aae344d..64c7715 100644 --- a/src/cerbero_bite/state/repository.py +++ b/src/cerbero_bite/state/repository.py @@ -24,6 +24,7 @@ from cerbero_bite.state.models import ( InstructionRecord, ManualAction, MarketSnapshotRecord, + OptionChainQuoteRecord, PositionRecord, PositionStatus, SystemStateRecord, @@ -407,6 +408,103 @@ class Repository: ).fetchall() return [_row_to_market_snapshot(r) for r in rows] + # ------------------------------------------------------------------ + # option_chain_snapshots + # ------------------------------------------------------------------ + + def record_option_chain_snapshot( + self, + conn: sqlite3.Connection, + quotes: list[OptionChainQuoteRecord], + ) -> int: + """Bulk-insert dei quote di un singolo tick. Tutti i quote + condividono lo stesso ``timestamp``. Idempotente per + (timestamp, instrument_name).""" + if not quotes: + return 0 + rows = [ + ( + _enc_dt(q.timestamp), + q.asset, + q.instrument_name, + _enc_dec(q.strike), + _enc_dt(q.expiry), + q.option_type, + _enc_dec(q.bid), + _enc_dec(q.ask), + _enc_dec(q.mid), + _enc_dec(q.iv), + _enc_dec(q.delta), + _enc_dec(q.gamma), + _enc_dec(q.theta), + _enc_dec(q.vega), + q.open_interest, + q.volume_24h, + q.book_depth_top3, + ) + for q in quotes + ] + conn.executemany( + "INSERT OR REPLACE INTO option_chain_snapshots(" + "timestamp, asset, instrument_name, strike, expiry, option_type, " + "bid, ask, mid, iv, delta, gamma, theta, vega, " + "open_interest, volume_24h, book_depth_top3) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", + rows, + ) + return len(rows) + + def list_option_chain_snapshots( + self, + conn: sqlite3.Connection, + *, + asset: str, + start: datetime | None = None, + end: datetime | None = None, + expiry_from: datetime | None = None, + expiry_to: datetime | None = None, + limit: int = 50000, + ) -> list[OptionChainQuoteRecord]: + clauses: list[str] = ["asset = ?"] + params: list[Any] = [asset] + if start is not None: + clauses.append("timestamp >= ?") + params.append(_enc_dt(start)) + if end is not None: + clauses.append("timestamp <= ?") + params.append(_enc_dt(end)) + if expiry_from is not None: + clauses.append("expiry >= ?") + params.append(_enc_dt(expiry_from)) + if expiry_to is not None: + clauses.append("expiry <= ?") + params.append(_enc_dt(expiry_to)) + params.append(int(limit)) + rows = conn.execute( + f"SELECT * FROM option_chain_snapshots " + f"WHERE {' AND '.join(clauses)} " + f"ORDER BY timestamp DESC, instrument_name ASC LIMIT ?", + params, + ).fetchall() + return [_row_to_option_chain_quote(r) for r in rows] + + def latest_option_chain_timestamp( + self, + conn: sqlite3.Connection, + *, + asset: str, + ) -> datetime | None: + """Timestamp dell'ultimo snapshot raccolto per ``asset``, + utile per stimare la freschezza del dato dalla GUI.""" + row = conn.execute( + "SELECT timestamp FROM option_chain_snapshots " + "WHERE asset = ? ORDER BY timestamp DESC LIMIT 1", + (asset,), + ).fetchone() + if row is None: + return None + return _dec_dt(row["timestamp"]) + # ------------------------------------------------------------------ # manual_actions # ------------------------------------------------------------------ @@ -645,6 +743,38 @@ def _row_to_market_snapshot(row: sqlite3.Row) -> MarketSnapshotRecord: ) +def _row_to_option_chain_quote(row: sqlite3.Row) -> OptionChainQuoteRecord: + return OptionChainQuoteRecord( + timestamp=_dec_dt_required(row["timestamp"]), + asset=row["asset"], + instrument_name=row["instrument_name"], + strike=_dec_dec_required(row["strike"]), + expiry=_dec_dt_required(row["expiry"]), + option_type=row["option_type"], + bid=_dec_dec(row["bid"]), + ask=_dec_dec(row["ask"]), + mid=_dec_dec(row["mid"]), + iv=_dec_dec(row["iv"]), + delta=_dec_dec(row["delta"]), + gamma=_dec_dec(row["gamma"]), + theta=_dec_dec(row["theta"]), + vega=_dec_dec(row["vega"]), + open_interest=( + int(row["open_interest"]) + if row["open_interest"] is not None + else None + ), + volume_24h=( + int(row["volume_24h"]) if row["volume_24h"] is not None else None + ), + book_depth_top3=( + int(row["book_depth_top3"]) + if row["book_depth_top3"] is not None + else None + ), + ) + + def _dec_dec_required(value: Any) -> Decimal: out = _dec_dec(value) if out is None: diff --git a/tests/integration/test_orchestrator.py b/tests/integration/test_orchestrator.py index e5934c1..01ce88a 100644 --- a/tests/integration/test_orchestrator.py +++ b/tests/integration/test_orchestrator.py @@ -129,6 +129,7 @@ def test_install_scheduler_registers_canonical_jobs(tmp_path: Path) -> None: "backup", "manual_actions", "market_snapshot", + "option_chain_snapshot", } diff --git a/tests/unit/test_option_chain_snapshot_cycle.py b/tests/unit/test_option_chain_snapshot_cycle.py new file mode 100644 index 0000000..0cf65e8 --- /dev/null +++ b/tests/unit/test_option_chain_snapshot_cycle.py @@ -0,0 +1,134 @@ +"""TDD per :mod:`cerbero_bite.runtime.option_chain_snapshot_cycle`.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from decimal import Decimal +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from cerbero_bite.clients.deribit import InstrumentMeta +from cerbero_bite.runtime.option_chain_snapshot_cycle import ( + collect_option_chain_snapshot, +) +from cerbero_bite.state.models import OptionChainQuoteRecord + + +_NOW = datetime(2026, 5, 4, 13, 55, tzinfo=UTC) + + +def _meta(name: str, strike: int, expiry_dte: int = 18) -> InstrumentMeta: + expiry = _NOW.replace(hour=8, minute=0, second=0) + expiry = expiry.replace(day=expiry.day) + ( + # add days + __import__("datetime").timedelta(days=expiry_dte) + ) + return InstrumentMeta( + name=name, + strike=Decimal(str(strike)), + expiry=expiry, + option_type="P", + open_interest=Decimal("100"), + tick_size=Decimal("0.0005"), + min_trade_amount=Decimal("1"), + ) + + +def _ticker(name: str, *, mark: float = 0.020, bid: float = 0.018, + ask: float = 0.022, delta: float = -0.12) -> dict: + return { + "instrument_name": name, + "bid": bid, + "ask": ask, + "mark_price": mark, + "mark_iv": 60.0, + "volume_24h": 50, + "greeks": { + "delta": delta, + "gamma": 0.001, + "theta": -0.0005, + "vega": 0.10, + }, + } + + +@pytest.fixture +def cfg() -> object: + from cerbero_bite.config import golden_config + return golden_config() + + +@pytest.fixture +def fake_ctx(cfg: object) -> MagicMock: + """Mock minimal RuntimeContext.""" + ctx = MagicMock() + ctx.cfg = cfg + ctx.db_path = ":memory:" + return ctx + + +@pytest.mark.asyncio +async def test_collector_persists_one_quote_per_instrument( + fake_ctx: MagicMock, +) -> None: + metas = [_meta("ETH-21MAY26-2475-P", 2475), _meta("ETH-21MAY26-2400-P", 2400)] + fake_ctx.deribit.options_chain = AsyncMock(return_value=metas) + fake_ctx.deribit.get_tickers = AsyncMock( + return_value=[_ticker(m.name) for m in metas] + ) + persisted: list[list[OptionChainQuoteRecord]] = [] + + def _record(_conn: object, qs: list[OptionChainQuoteRecord]) -> int: + persisted.append(qs) + return len(qs) + + fake_ctx.repository.record_option_chain_snapshot = _record + + n = await collect_option_chain_snapshot(fake_ctx, asset="ETH", now=_NOW) + assert n == 2 + assert len(persisted) == 1 + assert {q.instrument_name for q in persisted[0]} == { + "ETH-21MAY26-2475-P", "ETH-21MAY26-2400-P", + } + # Tutti i quote condividono il timestamp del cron tick. + assert all(q.timestamp == _NOW for q in persisted[0]) + + +@pytest.mark.asyncio +async def test_collector_handles_missing_tickers_with_null_fields( + fake_ctx: MagicMock, +) -> None: + metas = [_meta("ETH-21MAY26-2475-P", 2475)] + fake_ctx.deribit.options_chain = AsyncMock(return_value=metas) + fake_ctx.deribit.get_tickers = AsyncMock(return_value=[]) # vuoto + persisted: list[list[OptionChainQuoteRecord]] = [] + + def _record(_conn: object, qs: list[OptionChainQuoteRecord]) -> int: + persisted.append(qs) + return len(qs) + + fake_ctx.repository.record_option_chain_snapshot = _record + + n = await collect_option_chain_snapshot(fake_ctx, now=_NOW) + assert n == 1 + assert persisted[0][0].mid is None # ticker mancante ⇒ campi NULL + assert persisted[0][0].instrument_name == "ETH-21MAY26-2475-P" + + +@pytest.mark.asyncio +async def test_collector_returns_zero_when_chain_empty( + fake_ctx: MagicMock, +) -> None: + fake_ctx.deribit.options_chain = AsyncMock(return_value=[]) + n = await collect_option_chain_snapshot(fake_ctx, now=_NOW) + assert n == 0 + + +@pytest.mark.asyncio +async def test_collector_swallows_chain_fetch_failure( + fake_ctx: MagicMock, +) -> None: + fake_ctx.deribit.options_chain = AsyncMock(side_effect=RuntimeError("boom")) + n = await collect_option_chain_snapshot(fake_ctx, now=_NOW) + assert n == 0 # best-effort: non rilancia From 3e461692785e6602cb1fc0f32e2b1973c55c4ab4 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 1 May 2026 20:52:11 +0000 Subject: [PATCH 2/3] =?UTF-8?q?fix(migrations):=20rinomina=200004=20?= =?UTF-8?q?=E2=86=92=200005=20per=20coesistenza=20con=20auto=5Fpause?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit La migrazione `0004_option_chain_snapshots.sql` collide con quella parallela `0004_auto_pause.sql` del PR `feat/strategy-improvements-fdac`: entrambe puntano allo stesso slot e bumpano user_version a 4. Rinominata a 0005 (con `PRAGMA user_version = 5`) così le due migrazioni possono coesistere senza conflitti, indipendentemente dall'ordine di merge dei due PR. Quando i due PR landeranno in main, basterà conservare la sequenza 0004 (auto_pause) → 0005 (option_chain). Verificato in locale: deploy con DB già a v4 (post-FDAC) ora applica correttamente la migrazione e crea la tabella `option_chain_snapshots`. Co-Authored-By: Claude Opus 4.7 (1M context) --- ...tion_chain_snapshots.sql => 0005_option_chain_snapshots.sql} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename src/cerbero_bite/state/migrations/{0004_option_chain_snapshots.sql => 0005_option_chain_snapshots.sql} (98%) diff --git a/src/cerbero_bite/state/migrations/0004_option_chain_snapshots.sql b/src/cerbero_bite/state/migrations/0005_option_chain_snapshots.sql similarity index 98% rename from src/cerbero_bite/state/migrations/0004_option_chain_snapshots.sql rename to src/cerbero_bite/state/migrations/0005_option_chain_snapshots.sql index c9b7957..edafbc2 100644 --- a/src/cerbero_bite/state/migrations/0004_option_chain_snapshots.sql +++ b/src/cerbero_bite/state/migrations/0005_option_chain_snapshots.sql @@ -39,4 +39,4 @@ CREATE INDEX idx_option_chain_asset_ts CREATE INDEX idx_option_chain_expiry ON option_chain_snapshots(asset, expiry); -PRAGMA user_version = 4; +PRAGMA user_version = 5; From 954baaa354d7ca74c4998ccfffab7c207e713425 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 1 May 2026 20:57:40 +0000 Subject: [PATCH 3/3] feat(cli): comando `option-chain` (trigger + analyze) per la catena opzioni MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Espone direttamente da CLI le due operazioni più utili sui dati di ``option_chain_snapshots`` raccolti dal cron settimanale: - ``cerbero-bite option-chain trigger`` — esegue UNA volta il collector della catena. Riusa la stessa pipeline schedulata (cron ``55 13 * * MON``) ma on-demand. Utile per popolare il DB senza aspettare lunedì. - ``cerbero-bite option-chain analyze [--bias bull_put|bear_call]`` — legge l'ultimo snapshot, simula il selector di strike (``select_strikes``) con la strategy passata e stampa una tabella con: short/long strike, delta, width, credito reale, ratio credit/width, e PASS/FAIL del gate ``credit_to_width_ratio_min``. Il comando ``analyze`` rende immediatamente actionable la catena appena raccolta: invece di stime ex-ante via Black-Scholes (modulo ``core/backtest.py``), legge i mid REALI di Deribit e dice "il rule engine aprirebbe questo trade qui? credit/width ratio passa o no?". Esempio di output sui primi snapshot raccolti (regime ETH ~2200, DTE ~14g): Snapshot del 2026-05-01T20:53:49 — 21 quote totali Il rule engine NON aprirebbe trade con questa catena (no strike compatibile coi gate delta/distance/width/credit-ratio). Conferma empirica del messaggio del documento ``13-strategia-spiegata``: con delta target 0.12 + width 4% + credit/width ≥ 30%, il regime attuale di ETH options non è abbastanza ricco per produrre trade — serve calibrare soglie o aspettare un regime IV più alto. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/cerbero_bite/cli.py | 237 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 236 insertions(+), 1 deletion(-) diff --git a/src/cerbero_bite/cli.py b/src/cerbero_bite/cli.py index 78635cd..9c9b267 100644 --- a/src/cerbero_bite/cli.py +++ b/src/cerbero_bite/cli.py @@ -13,7 +13,7 @@ import asyncio import os import sys from collections.abc import Callable -from datetime import UTC, datetime +from datetime import UTC, datetime, timedelta from decimal import Decimal from pathlib import Path from typing import Any @@ -812,6 +812,241 @@ def state_inspect(db: Path) -> None: console.print(table) +@main.group(name="option-chain") +def option_chain() -> None: + """Strumenti per la catena opzioni storica (`option_chain_snapshots`).""" + + +@option_chain.command(name="trigger") +@click.option( + "--strategy", + "strategy_path", + type=click.Path(exists=True, dir_okay=False, path_type=Path), + default=_DEFAULT_STRATEGY_PATH, + show_default=True, +) +@click.option( + "--db", + "db_path", + type=click.Path(dir_okay=False, path_type=Path), + default=_DEFAULT_DB_PATH, + show_default=True, +) +@click.option( + "--audit", + "audit_path", + type=click.Path(dir_okay=False, path_type=Path), + default=_DEFAULT_AUDIT_PATH, + show_default=True, +) +@click.option( + "--token", + type=str, + default=None, + help="MCP bearer token (override su CERBERO_BITE_MCP_TOKEN).", +) +@click.option("--asset", default="ETH", show_default=True) +def option_chain_trigger( + strategy_path: Path, + db_path: Path, + audit_path: Path, + token: str | None, + asset: str, +) -> None: + """Esegue UNA volta il collector della catena opzioni e persiste in DB. + + Utile per popolare i dati senza aspettare il cron settimanale del + job ``option_chain_snapshot``. Riusa esattamente la stessa pipeline + schedulata. + """ + from cerbero_bite.runtime.dependencies import build_runtime # noqa: PLC0415 + from cerbero_bite.runtime.option_chain_snapshot_cycle import ( # noqa: PLC0415 + collect_option_chain_snapshot, + ) + + cfg = load_strategy(strategy_path).config + ctx = build_runtime( + cfg=cfg, + endpoints=load_endpoints(), + token=load_token(value=token), + db_path=db_path, + audit_path=audit_path, + bot_tag=load_bot_tag(), + ) + n = asyncio.run(collect_option_chain_snapshot(ctx, asset=asset)) + console.print( + f"[green]Persisted {n} option chain quote(s) for {asset}[/green]" + if n > 0 + else f"[yellow]No quotes persisted (chain empty or fetch failed)[/yellow]" + ) + + +@option_chain.command(name="analyze") +@click.option( + "--strategy", + "strategy_path", + type=click.Path(exists=True, dir_okay=False, path_type=Path), + default=_DEFAULT_STRATEGY_PATH, + show_default=True, +) +@click.option( + "--db", + "db_path", + type=click.Path(dir_okay=False, path_type=Path), + default=_DEFAULT_DB_PATH, + show_default=True, +) +@click.option("--asset", default="ETH", show_default=True) +@click.option( + "--bias", + type=click.Choice(["bull_put", "bear_call"], case_sensitive=False), + default="bull_put", + show_default=True, + help="Direzione da simulare (il rule engine lo deciderebbe da trend×funding).", +) +def option_chain_analyze( + strategy_path: Path, + db_path: Path, + asset: str, + bias: str, +) -> None: + """Analizza l'ultimo snapshot di catena salvato. + + Per la strategia indicata, simula la selezione strike (delta + target, OTM range, width 4%, credit/width ratio min) e mostra: + * lo strike che il rule engine sceglierebbe come short e long, + * credito atteso, larghezza, rapporto credit/width, + * pass/fail del gate `credit_to_width_ratio_min`. + """ + from cerbero_bite.core.combo_builder import select_strikes # noqa: PLC0415 + from cerbero_bite.core.types import OptionQuote # noqa: PLC0415 + + cfg = load_strategy(strategy_path).config + + conn = connect_state(db_path) + try: + repo = Repository() + latest_ts = repo.latest_option_chain_timestamp(conn, asset=asset.upper()) + if latest_ts is None: + console.print( + "[red]Nessuno snapshot di catena trovato. Lancia prima " + "`cerbero-bite option-chain trigger`.[/red]" + ) + sys.exit(1) + quotes_records = repo.list_option_chain_snapshots( + conn, asset=asset.upper(), start=latest_ts, end=latest_ts, + ) + finally: + conn.close() + + console.print( + f"[cyan]Snapshot del {latest_ts.isoformat()} — {len(quotes_records)} " + f"quote totali[/cyan]" + ) + + # Costruzione OptionQuote da OptionChainQuoteRecord per riusare select_strikes. + quotes: list[OptionQuote] = [] + for q in quotes_records: + if q.bid is None or q.ask is None or q.mid is None or q.delta is None: + continue + quotes.append( + OptionQuote( + instrument=q.instrument_name, + strike=q.strike, + expiry=q.expiry, + option_type=q.option_type, + bid=q.bid, + ask=q.ask, + mid=q.mid, + delta=q.delta, + gamma=q.gamma or Decimal("0"), + theta=q.theta or Decimal("0"), + vega=q.vega or Decimal("0"), + open_interest=q.open_interest or 0, + volume_24h=q.volume_24h or 0, + book_depth_top3=q.book_depth_top3 or 0, + ) + ) + + if not quotes: + console.print("[red]Nessun quote completo per la simulazione.[/red]") + sys.exit(1) + + # Lo spot al momento dello snapshot: estraiamo dall'ultimo + # `market_snapshot` ETH a quel timestamp (tolleranza ±15 min). + spot = _resolve_spot_at(db_path, asset=asset.upper(), at=latest_ts) + if spot is None: + console.print( + "[yellow]Spot non recuperabile dai market_snapshots; " + "stimato dal mid ATM.[/yellow]" + ) + spot = _atm_spot_proxy(quotes) + + selection = select_strikes( + chain=quotes, + bias=bias, # type: ignore[arg-type] + spot=spot, + now=latest_ts, + cfg=cfg, + ) + if selection is None: + console.print( + "[red]Il rule engine NON aprirebbe trade con questa catena[/red] " + "(no strike compatibile coi gate delta/distance/width/credit-ratio)." + ) + sys.exit(0) + + short, long_ = selection + width_usd = (short.strike - long_.strike).copy_abs() + credit_eth = short.mid - long_.mid + credit_usd = credit_eth * spot + ratio = credit_usd / width_usd if width_usd > 0 else Decimal("0") + ratio_target = cfg.structure.credit_to_width_ratio_min + + table = Table(title=f"Simulazione picker — bias={bias}, spot={spot:.0f}") + table.add_column("Campo", style="cyan") + table.add_column("Valore", style="bold") + table.add_row("Short strike", f"{short.strike} ({short.delta:+.3f}δ)") + table.add_row("Long strike", f"{long_.strike} ({long_.delta:+.3f}δ)") + table.add_row("Width", f"{width_usd:.0f} USD") + table.add_row("Credit", f"{credit_eth:.4f} ETH ≈ {credit_usd:.2f} USD") + table.add_row( + "Credit/width ratio", + f"{ratio:.2%} (gate ≥ {float(ratio_target):.0%})", + ) + pass_str = ( + "[green]PASS — entry possibile[/green]" + if ratio >= ratio_target + else "[red]FAIL — premio troppo magro[/red]" + ) + table.add_row("Verdetto gate ratio", pass_str) + console.print(table) + + +def _resolve_spot_at(db_path: Path, *, asset: str, at: datetime) -> Decimal | None: + """Best-effort lookup dello spot al timestamp ``at`` ± 15 min.""" + conn = connect_state(db_path) + try: + rows = Repository().list_market_snapshots( + conn, + asset=asset, + start=at - timedelta(minutes=15), + end=at + timedelta(minutes=15), + limit=1, + ) + finally: + conn.close() + if not rows: + return None + return rows[0].spot + + +def _atm_spot_proxy(quotes: list[Any]) -> Decimal: + """Stima dello spot prendendo lo strike il cui delta è più vicino a 0.5.""" + quote = min(quotes, key=lambda q: abs(abs(q.delta) - Decimal("0.5"))) + return quote.strike + + def _entrypoint() -> None: """Wrapper used by ``cerbero-bite`` console script.""" try: