"""Backfill IV-RV history from Deribit public REST API. Use case: il gate IV-RV adattivo richiede ≥30 giorni di storia per attivarsi (spec ``docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md``). Quando la pipeline ha pochi giorni di tick live, questo script popola ``market_snapshots`` con record giornalieri storici calcolati da DVOL Deribit + closes ETH-PERPETUAL/BTC-PERPETUAL pubblici. Idempotente: usa ``INSERT OR REPLACE`` sulla PK ``(timestamp, asset)`` con timestamp fissato a 12:00 UTC del giorno di calendario. ``fetch_errors_json='{"backfill":true}'`` permette di distinguere i record sintetici dai tick live in audit. I record contribuiscono al gate adattivo come singoli punti (distinct-days policy), uno per giorno: lo statistical bias è coperto dalla spec §4.1. Esempio: python scripts/backfill_iv_rv.py --db data/state.sqlite --days 45 """ from __future__ import annotations import argparse import json import math import sqlite3 import statistics import urllib.request from dataclasses import dataclass from datetime import UTC, date, datetime, timedelta from decimal import Decimal __all__ = [ "BackfillRow", "build_backfill_records", "compute_rv30d_annualized", ] _DERIBIT = "https://www.deribit.com/api/v2/public" _RV_LOOKBACK_DAYS = 30 _TRADING_DAYS_PER_YEAR = 365 @dataclass(frozen=True) class BackfillRow: """Una riga sintetica destinata a ``market_snapshots``.""" timestamp: datetime asset: str spot: Decimal dvol: Decimal realized_vol_30d: Decimal iv_minus_rv: Decimal fetch_ok: bool = True # --------------------------------------------------------------------------- # Pure compute layer (TDD: tests/unit/test_backfill_iv_rv.py) # --------------------------------------------------------------------------- def compute_rv30d_annualized(closes: list[Decimal]) -> Decimal: """Volatilità realizzata 30g annualizzata in **punti vol** (% annuali). Args: closes: ``31`` close consecutivi (uno al giorno) — produce 30 log-returns. Returns: ``stdev(log_returns) * sqrt(365) * 100`` come ``Decimal``. Raises: ValueError: se ``len(closes) < 31``. """ if len(closes) < _RV_LOOKBACK_DAYS + 1: raise ValueError( f"need at least {_RV_LOOKBACK_DAYS + 1} closes, got {len(closes)}" ) log_returns = [ math.log(float(closes[i] / closes[i - 1])) for i in range(1, _RV_LOOKBACK_DAYS + 1) ] sigma_daily = statistics.stdev(log_returns) annualized = sigma_daily * math.sqrt(_TRADING_DAYS_PER_YEAR) * 100.0 return Decimal(str(annualized)) def build_backfill_records( *, asset: str, spots_by_day: dict[str, Decimal], dvols_by_day: dict[str, Decimal], oldest_day: date, ) -> list[BackfillRow]: """Compone le righe di backfill per i giorni nella finestra richiesta. Per ogni giorno target ``D`` (da ``oldest_day`` a oggi compreso) la riga viene emessa solo se: (a) DVOL e spot sono presenti per ``D``, (b) la serie di spot dispone dei 30 giorni precedenti necessari per il calcolo di RV30d. Il timestamp è fissato a 12:00 UTC, scelta che evita il rollover delle candele Deribit (vedi anomalia DVOL 00:00 UTC nei market snapshots live). """ sorted_days = sorted(spots_by_day.keys()) records: list[BackfillRow] = [] for day_str in sorted_days: day = date.fromisoformat(day_str) if day < oldest_day: continue if day_str not in dvols_by_day: continue rv_window = [ day - timedelta(days=i) for i in range(_RV_LOOKBACK_DAYS, -1, -1) ] if not all(d.isoformat() in spots_by_day for d in rv_window): continue closes = [spots_by_day[d.isoformat()] for d in rv_window] rv = compute_rv30d_annualized(closes) dvol = dvols_by_day[day_str] spot = spots_by_day[day_str] records.append( BackfillRow( timestamp=datetime(day.year, day.month, day.day, 12, 0, tzinfo=UTC), asset=asset, spot=spot, dvol=dvol, realized_vol_30d=rv, iv_minus_rv=dvol - rv, ) ) return records # --------------------------------------------------------------------------- # I/O layer (network + sqlite) # --------------------------------------------------------------------------- def _http_get_json(url: str, timeout_s: float = 30.0) -> dict: with urllib.request.urlopen(url, timeout=timeout_s) as resp: return json.loads(resp.read()) def fetch_dvol_daily(currency: str, days: int) -> dict[str, Decimal]: """Mappa ``YYYY-MM-DD -> DVOL close`` per gli ultimi ``days`` giorni.""" end_ms = int(datetime.now(UTC).timestamp() * 1000) start_ms = end_ms - days * 86_400_000 url = ( f"{_DERIBIT}/get_volatility_index_data" f"?currency={currency}" f"&start_timestamp={start_ms}&end_timestamp={end_ms}" f"&resolution=86400" ) payload = _http_get_json(url) data = (payload.get("result") or {}).get("data") or [] out: dict[str, Decimal] = {} for row in data: # row = [ts_ms, open, high, low, close] if not isinstance(row, list) or len(row) < 5: continue ts = datetime.fromtimestamp(row[0] / 1000, tz=UTC).date().isoformat() out[ts] = Decimal(str(row[4])) return out def fetch_spot_daily(instrument: str, days: int) -> dict[str, Decimal]: """Mappa ``YYYY-MM-DD -> close USD`` per ``instrument`` su ``days`` giorni.""" end_ms = int(datetime.now(UTC).timestamp() * 1000) start_ms = end_ms - days * 86_400_000 url = ( f"{_DERIBIT}/get_tradingview_chart_data" f"?instrument_name={instrument}" f"&start_timestamp={start_ms}&end_timestamp={end_ms}" f"&resolution=1D" ) payload = _http_get_json(url) result = payload.get("result") or {} ticks = result.get("ticks") or [] closes = result.get("close") or [] out: dict[str, Decimal] = {} for ts_ms, close in zip(ticks, closes, strict=False): ts = datetime.fromtimestamp(ts_ms / 1000, tz=UTC).date().isoformat() out[ts] = Decimal(str(close)) return out def write_records(db_path: str, records: list[BackfillRow]) -> int: """Insert/replace dei record in market_snapshots. Ritorna la rowcount.""" if not records: return 0 conn = sqlite3.connect(db_path) try: with conn: for r in records: conn.execute( "INSERT OR REPLACE INTO market_snapshots (" "timestamp, asset, spot, dvol, realized_vol_30d, iv_minus_rv, " "funding_perp_annualized, funding_cross_annualized, " "dealer_net_gamma, gamma_flip_level, oi_delta_pct_4h, " "liquidation_long_risk, liquidation_short_risk, " "macro_days_to_event, fetch_ok, fetch_errors_json" ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", ( r.timestamp.astimezone(UTC).isoformat(), r.asset, str(r.spot), str(r.dvol), str(r.realized_vol_30d), str(r.iv_minus_rv), None, None, None, None, None, None, None, None, 1 if r.fetch_ok else 0, '{"backfill":true}', ), ) return len(records) finally: conn.close() def backfill_asset(db_path: str, asset: str, days: int) -> int: """Esegue l'intero backfill per ``asset`` e ritorna il numero di record inseriti/sostituiti. """ instrument = f"{asset.upper()}-PERPETUAL" fetch_window_days = days + _RV_LOOKBACK_DAYS + 5 # margine per il lookback RV spots = fetch_spot_daily(instrument, fetch_window_days) dvols = fetch_dvol_daily(asset.upper(), fetch_window_days) today = datetime.now(UTC).date() oldest = today - timedelta(days=days) records = build_backfill_records( asset=asset.upper(), spots_by_day=spots, dvols_by_day=dvols, oldest_day=oldest, ) return write_records(db_path, records) def main() -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--db", default="data/state.sqlite", help="path a state.sqlite (default: data/state.sqlite)", ) parser.add_argument( "--days", type=int, default=45, help="quanti giorni di backfill emettere (default: 45)", ) parser.add_argument( "--assets", nargs="+", default=["ETH", "BTC"], help="asset symbols (default: ETH BTC)", ) args = parser.parse_args() total = 0 for asset in args.assets: n = backfill_asset(args.db, asset, args.days) print(f"{asset}: inserted/replaced {n} backfill rows") total += n print(f"TOTAL: {total}") return 0 if __name__ == "__main__": raise SystemExit(main())