"""Periodic option-chain snapshot collector (§13). Fetches the Deribit option chain for every strike entro la finestra DTE configurata, prima del trigger entry settimanale (cron ``55 13 * * MON`` di default). Persiste un quote per ogni strumento in ``option_chain_snapshots`` con un timestamp condiviso, che diventa il dato di base per: * il backtest non-stilizzato (vedi ``core/backtest.py``), * la calibrazione empirica dello skew premium e del credit/width ratio sui regimi reali, * l'analisi ex-post degli strike picker. Il collector è **best-effort**: se ``get_tickers`` fallisce per un batch, gli altri batch passano comunque; se manca completamente la chain, il job ritorna 0 senza alzare eccezioni e logga il problema. Non chiama l'order book per ogni strike (sarebbe troppo costoso) — ``book_depth_top3`` resta NULL nel quote, il liquidity gate del live lo legge al volo solo per gli strike che gli interessano. """ from __future__ import annotations import asyncio import logging from datetime import UTC, datetime, timedelta from decimal import Decimal from typing import TYPE_CHECKING, Any from cerbero_bite.state import connect, transaction from cerbero_bite.state.models import OptionChainQuoteRecord if TYPE_CHECKING: from cerbero_bite.runtime.dependencies import RuntimeContext __all__ = ["DEFAULT_BATCH_SIZE", "collect_option_chain_snapshot"] _log = logging.getLogger("cerbero_bite.runtime.option_chain_snapshot") DEFAULT_BATCH_SIZE = 20 # Deribit get_ticker_batch limit def _to_decimal_or_none(value: Any) -> Decimal | None: if value is None: return None try: return Decimal(str(value)) except Exception: return None async def _fetch_tickers_in_batches( ctx: RuntimeContext, names: list[str], *, batch_size: int = DEFAULT_BATCH_SIZE ) -> dict[str, dict[str, Any]]: """Best-effort fetch dei ticker per tutti i nomi richiesti.""" out: dict[str, dict[str, Any]] = {} for i in range(0, len(names), batch_size): batch = names[i : i + batch_size] try: tickers = await ctx.deribit.get_tickers(batch) except Exception as exc: _log.warning( "get_tickers failed for batch starting %s: %s", batch[0] if batch else "", exc, ) continue for t in tickers: name = t.get("instrument_name") or t.get("instrument") if isinstance(name, str): out[name] = t return out async def collect_option_chain_snapshot( ctx: RuntimeContext, *, asset: str = "ETH", now: datetime | None = None, batch_size: int = DEFAULT_BATCH_SIZE, ) -> int: """Collect + persist a single chain snapshot for ``asset``. Returns the number of quotes persisted (0 on best-effort failure). Filtra le scadenze nella finestra ``[dte_min, dte_max]`` di ``cfg.structure`` per non sprecare richieste su scadenze che il rule engine non userebbe mai. """ when = (now or datetime.now(UTC)).astimezone(UTC) cfg = ctx.cfg expiry_from = when + timedelta(days=cfg.structure.dte_min) expiry_to = when + timedelta(days=cfg.structure.dte_max) try: chain = await ctx.deribit.options_chain( currency=asset.upper(), expiry_from=expiry_from, expiry_to=expiry_to, min_open_interest=int(cfg.liquidity.open_interest_min), ) except Exception: _log.exception("option chain fetch failed") return 0 if not chain: _log.info("option chain empty for %s in window", asset) return 0 names = [meta.name for meta in chain] tickers = await _fetch_tickers_in_batches(ctx, names, batch_size=batch_size) quotes: list[OptionChainQuoteRecord] = [] for meta in chain: ticker = tickers.get(meta.name) if ticker is None: # Lasciamo comunque la riga senza quote: utile sapere # che lo strumento esisteva. quotes.append( OptionChainQuoteRecord( timestamp=when, asset=asset.upper(), instrument_name=meta.name, strike=meta.strike, expiry=meta.expiry, option_type=meta.option_type, open_interest=int(meta.open_interest) if meta.open_interest is not None else None, ) ) continue greeks = ticker.get("greeks") or {} quotes.append( OptionChainQuoteRecord( timestamp=when, asset=asset.upper(), instrument_name=meta.name, strike=meta.strike, expiry=meta.expiry, option_type=meta.option_type, bid=_to_decimal_or_none(ticker.get("bid")), ask=_to_decimal_or_none(ticker.get("ask")), mid=_to_decimal_or_none(ticker.get("mark_price")), iv=_to_decimal_or_none(ticker.get("mark_iv")), delta=_to_decimal_or_none(greeks.get("delta")), gamma=_to_decimal_or_none(greeks.get("gamma")), theta=_to_decimal_or_none(greeks.get("theta")), vega=_to_decimal_or_none(greeks.get("vega")), open_interest=int(meta.open_interest) if meta.open_interest is not None else None, volume_24h=( int(ticker["volume_24h"]) if ticker.get("volume_24h") is not None else None ), # book_depth_top3: NULL — non lo prendiamo per ogni # strike per non saturare l'API. Il liquidity gate # del live lo chiede on-the-fly per gli strike # candidati al picker. ) ) persisted = 0 try: conn = connect(ctx.db_path) try: with transaction(conn): persisted = ctx.repository.record_option_chain_snapshot( conn, quotes ) finally: conn.close() except Exception: _log.exception("persist option chain snapshot failed") return 0 _log.info("option_chain_snapshot persisted %d quote(s)", persisted) return persisted # Avoid unused import warning for asyncio in lint when only used as type _ = asyncio