From b1836d91c27bec4cbc5aee66f3cd0d901513c07b Mon Sep 17 00:00:00 2001 From: root Date: Sun, 10 May 2026 08:52:05 +0000 Subject: [PATCH] refactor(core): IV-RV adattivo distinct-days policy + backfill Deribit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sblocca il warmup hard del gate IV-RV adattivo (~21 giorni residui) permettendo di mischiare cadenze diverse (tick live 15min + backfill giornaliero) senza assumere il fattore costante 96 tick/giorno. API change (no backwards-compat shims): * compute_adaptive_threshold(history, *, n_days, percentile, absolute_floor): rimossi `min_days`/`target_days`. La selezione finestra (target_days/min_days/intera storia) si sposta al caller. Warmup hard quando `n_days == 0`. * repository: rimosso `iv_rv_history`; aggiunti `count_iv_rv_distinct_days` (COUNT DISTINCT substr(ts,1,10)) e `iv_rv_values_for_window`. * EntryContext aggiunge `iv_rv_n_days: int = 0`. entry_cycle calcola n_days, sceglie window_days e popola il context. Audit `iv_rv_n_days` reale (non più len/96). * GUI Calibrazione: counter giorni distinti tramite set di date. * Spec aggiornata con errata 2026-05-10 e nuova warmup table. Backfill (scripts/backfill_iv_rv.py, stdlib-only): * Fetch DVOL daily + ETH/BTC-PERPETUAL closes da Deribit public REST. * Calcolo RV30d annualizzato (stdev log-return × √365 × 100). * INSERT OR REPLACE in market_snapshots con timestamp 12:00 UTC e fetch_errors_json='{"backfill":true}' per distinzione audit. * Compute layer testato (9 test): RV su prezzi costanti/monotoni/ alternati, build_records con cutoff e missing data. Verifica live post-deploy (10 mag 2026 08:50 UTC): * ETH: n_days=46, P25=2.21 vol pt, IV-RV=10.05 → gate PASS * BTC: n_days=46, P25=5.69 vol pt, IV-RV=8.60 → gate PASS 509 test passati (500 esistenti + 9 backfill), ruff pulito. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../2026-05-08-iv-rv-adaptive-gate-design.md | 80 ++-- scripts/backfill_iv_rv.py | 281 +++++++++++ src/cerbero_bite/core/adaptive_threshold.py | 53 +-- src/cerbero_bite/core/entry_validator.py | 19 +- .../gui/pages/6_📐_Calibrazione.py | 11 +- src/cerbero_bite/runtime/entry_cycle.py | 92 ++-- src/cerbero_bite/state/repository.py | 44 +- .../test_entry_cycle_iv_rv_adaptive.py | 48 +- tests/unit/test_adaptive_threshold.py | 177 +++---- tests/unit/test_backfill_iv_rv.py | 176 +++++++ tests/unit/test_entry_validator.py | 69 ++- tests/unit/test_repository_iv_rv_helpers.py | 441 +++++++++++++----- 12 files changed, 1131 insertions(+), 360 deletions(-) create mode 100644 scripts/backfill_iv_rv.py create mode 100644 tests/unit/test_backfill_iv_rv.py diff --git a/docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md b/docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md index 135f6ca..69e05ff 100644 --- a/docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md +++ b/docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md @@ -35,26 +35,33 @@ L'hybrid bilancia due controlli additivi: ## 4. Comportamento del gate +> **Errata 2026-05-10** — design originale assumeva `n_days = len(history) // 96` (cadenza fissa 96 tick/giorno). Refattorizzato a **distinct-days policy**: il caller interroga il repository per (a) il numero di giorni di calendario distinti coperti e (b) i valori della finestra scelta. Questo permette di mischiare cadenze (tick live 15 min + backfill daily) senza assumere un fattore costante. Sotto il pseudo-codice aggiornato. + ``` def validate_iv_richness_adaptive(ctx, cfg, repo): if not cfg.entry.iv_minus_rv_filter_enabled: return PASS # gate off - # 1) Soglia adattiva - history = repo.iv_rv_history(asset=ctx.asset, max_days=cfg.window_target_days) - n_days = len(history) / 96 # 96 tick/giorno + # 1) Soglia adattiva — distinct-days policy + n_days = repo.count_iv_rv_distinct_days( + asset=ctx.asset, max_days=cfg.window_target_days, + ) if n_days < 1: - return PASS # warmup hard: <1g, no gate + return PASS # warmup hard: nessun giorno coperto if n_days >= cfg.window_target_days: - window = history[-cfg.window_target_days*96:] # ≥60g → finestra fissa 60g + window_days = cfg.window_target_days # ≥60g → finestra fissa 60g elif n_days >= cfg.window_min_days: - window = history[-cfg.window_min_days*96:] # 30-60g → finestra fissa 30g + window_days = cfg.window_min_days # 30-60g → finestra fissa 30g else: - window = history # 1-30g → tutta la storia disponibile + window_days = cfg.window_target_days # 1-30g → query tutta la storia disp. - threshold = max(percentile(window, cfg.percentile), + history = repo.iv_rv_values_for_window( + asset=ctx.asset, window_days=window_days, + ) + + threshold = max(percentile(history, cfg.percentile), cfg.absolute_floor) if ctx.iv_minus_rv < threshold: @@ -72,13 +79,17 @@ def validate_iv_richness_adaptive(ctx, cfg, repo): ### 4.1 Warmup behavior +Tutte le soglie sono espresse in **giorni di calendario distinti** coperti da almeno un record valido (`fetch_ok=1` ∧ `iv_minus_rv IS NOT NULL`). + | storia disponibile | finestra usata | comportamento | |---|---|---| -| < 1 giorno (96 tick) | — | gate disabled (PASS), log `GATE_WARMUP_INSUFFICIENT` | -| 1 g ≤ storia < 30 g | tutta la storia | percentile della finestra disponibile (decisione utente) | -| 30 g ≤ storia < 60 g | ultimi 30 g | finestra fissa 30g | +| 0 giorni distinti | — | gate disabled (PASS), log `GATE_WARMUP_INSUFFICIENT` | +| 1 g ≤ giorni < 30 g | tutta la storia | percentile della finestra disponibile (decisione utente) | +| 30 g ≤ giorni < 60 g | ultimi 30 g | finestra fissa 30g | | ≥ 60 g | ultimi 60 g | finestra fissa 60g (target) | +I valori della finestra contribuiscono uno-a-uno al percentile: un tick a 15 min e un record di backfill daily hanno lo stesso peso. Mix di cadenze diverse è statisticamente sbilanciato finché i tick live non saturano la finestra; questa è una scelta deliberata per non rinunciare allo storico backfill. + ### 4.2 Soglia = `max(P25, floor)` `floor` è il vecchio `iv_minus_rv_min` riutilizzato come *absolute floor*. Permette: @@ -137,29 +148,38 @@ Se `iv_minus_rv_adaptive_enabled=False` e `iv_minus_rv_filter_enabled=True`, il ## 6. Architettura -### 6.1 Nuovo modulo `core/adaptive_threshold.py` +### 6.1 Modulo `core/adaptive_threshold.py` -Funzione pura, testabile senza I/O: +Funzione pura, testabile senza I/O. La selezione della finestra è +delegata al caller (separation of concerns): ```python def compute_adaptive_threshold( - history: list[Decimal], + history: Sequence[Decimal], *, + n_days: int, percentile: Decimal, absolute_floor: Decimal, - min_days: int, - target_days: int, ) -> Decimal | None: - """Ritorna None se warmup hard (<96 tick), altrimenti max(P_q, floor).""" + """Ritorna None se warmup hard (n_days==0 o history vuota), + altrimenti max(P_q(history), absolute_floor).""" ``` ### 6.2 Repository (`state/repository.py`) -Due nuovi metodi su `MarketSnapshotRepository`: +Tre metodi su `Repository` (uno preesistente): ```python -def iv_rv_history(self, *, asset: str, max_days: int) -> list[Decimal]: - """IV-RV ordinato ASC, finestra max_days, fetch_ok=1, NULL filtrati.""" +def count_iv_rv_distinct_days( + self, *, asset: str, max_days: int, as_of: datetime | None = None, +) -> int: + """Numero di giorni di calendario distinti con almeno un IV-RV + valido nell'intervallo [as_of - max_days, as_of].""" + +def iv_rv_values_for_window( + self, *, asset: str, window_days: int, as_of: datetime | None = None, +) -> list[Decimal]: + """Valori IV-RV ordinati ASC su [as_of - window_days, as_of].""" def dvol_lookback(self, *, asset: str, hours: int) -> Decimal | None: """DVOL del tick più vicino a now-hours, ±15min tolerance. None se gap.""" @@ -219,15 +239,23 @@ Principio: **fail-open** in tutti i casi di dato mancante. Il gate adattivo è a ### 9.1 Unit — `core/adaptive_threshold.py` -- Warmup: history vuota → None -- Warmup: history < 96 tick → None -- Sotto min_days: 200 tick → P25 sui 200 -- Tra min e target: 4000 tick → P25 sui 4000 -- Oltre target: 10000 tick → P25 sugli ultimi `target_days*96` +- Warmup: `n_days=0` → None +- Warmup difensivo: `n_days=0` ma history non vuota → None +- Difensivo: history vuota con `n_days>0` → None +- `n_days=1`, 96 tick → P25 sui 96 +- Mix di cadenze (30 daily + 96 live) → percentile uno-a-uno - Floor binding: P25=0.5, floor=3 → 3 - Floor non binding: P25=5, floor=0 → 5 - Percentile diverso: percentile=0.5 → mediana -- Boundary: esattamente min_days → window = min_days +- Validation: percentile ∉ [0,1] o `n_days<0` → ValueError + +### 9.1bis Unit — `state/repository.py` + +- `count_iv_rv_distinct_days`: 1 giorno → 1; 3 giorni misti → 3 +- esclusione asset diversi, NULL e fetch_ok=0 +- rispetto del cutoff `max_days` +- ValueError su `as_of` naive o `max_days≤0` +- `iv_rv_values_for_window`: ordine ASC, filtri equivalenti, ValueError input ### 9.2 Unit — `core/entry_validator.py` diff --git a/scripts/backfill_iv_rv.py b/scripts/backfill_iv_rv.py new file mode 100644 index 0000000..0b8abeb --- /dev/null +++ b/scripts/backfill_iv_rv.py @@ -0,0 +1,281 @@ +"""Backfill IV-RV history from Deribit public REST API. + +Use case: il gate IV-RV adattivo richiede ≥30 giorni di storia per +attivarsi (spec ``docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md``). +Quando la pipeline ha pochi giorni di tick live, questo script popola +``market_snapshots`` con record giornalieri storici calcolati da +DVOL Deribit + closes ETH-PERPETUAL/BTC-PERPETUAL pubblici. + +Idempotente: usa ``INSERT OR REPLACE`` sulla PK ``(timestamp, asset)`` +con timestamp fissato a 12:00 UTC del giorno di calendario. +``fetch_errors_json='{"backfill":true}'`` permette di distinguere i +record sintetici dai tick live in audit. + +I record contribuiscono al gate adattivo come singoli punti +(distinct-days policy), uno per giorno: lo statistical bias è coperto +dalla spec §4.1. + +Esempio: + python scripts/backfill_iv_rv.py --db data/state.sqlite --days 45 +""" + +from __future__ import annotations + +import argparse +import json +import math +import sqlite3 +import statistics +import urllib.request +from dataclasses import dataclass +from datetime import UTC, date, datetime, timedelta +from decimal import Decimal + +__all__ = [ + "BackfillRow", + "build_backfill_records", + "compute_rv30d_annualized", +] + + +_DERIBIT = "https://www.deribit.com/api/v2/public" +_RV_LOOKBACK_DAYS = 30 +_TRADING_DAYS_PER_YEAR = 365 + + +@dataclass(frozen=True) +class BackfillRow: + """Una riga sintetica destinata a ``market_snapshots``.""" + + timestamp: datetime + asset: str + spot: Decimal + dvol: Decimal + realized_vol_30d: Decimal + iv_minus_rv: Decimal + fetch_ok: bool = True + + +# --------------------------------------------------------------------------- +# Pure compute layer (TDD: tests/unit/test_backfill_iv_rv.py) +# --------------------------------------------------------------------------- + + +def compute_rv30d_annualized(closes: list[Decimal]) -> Decimal: + """Volatilità realizzata 30g annualizzata in **punti vol** (% annuali). + + Args: + closes: ``31`` close consecutivi (uno al giorno) — produce 30 + log-returns. + + Returns: + ``stdev(log_returns) * sqrt(365) * 100`` come ``Decimal``. + + Raises: + ValueError: se ``len(closes) < 31``. + """ + if len(closes) < _RV_LOOKBACK_DAYS + 1: + raise ValueError( + f"need at least {_RV_LOOKBACK_DAYS + 1} closes, got {len(closes)}" + ) + log_returns = [ + math.log(float(closes[i] / closes[i - 1])) + for i in range(1, _RV_LOOKBACK_DAYS + 1) + ] + sigma_daily = statistics.stdev(log_returns) + annualized = sigma_daily * math.sqrt(_TRADING_DAYS_PER_YEAR) * 100.0 + return Decimal(str(annualized)) + + +def build_backfill_records( + *, + asset: str, + spots_by_day: dict[str, Decimal], + dvols_by_day: dict[str, Decimal], + oldest_day: date, +) -> list[BackfillRow]: + """Compone le righe di backfill per i giorni nella finestra richiesta. + + Per ogni giorno target ``D`` (da ``oldest_day`` a oggi compreso) la + riga viene emessa solo se: (a) DVOL e spot sono presenti per ``D``, + (b) la serie di spot dispone dei 30 giorni precedenti necessari per + il calcolo di RV30d. + + Il timestamp è fissato a 12:00 UTC, scelta che evita il rollover + delle candele Deribit (vedi anomalia DVOL 00:00 UTC nei market + snapshots live). + """ + sorted_days = sorted(spots_by_day.keys()) + records: list[BackfillRow] = [] + for day_str in sorted_days: + day = date.fromisoformat(day_str) + if day < oldest_day: + continue + if day_str not in dvols_by_day: + continue + rv_window = [ + day - timedelta(days=i) for i in range(_RV_LOOKBACK_DAYS, -1, -1) + ] + if not all(d.isoformat() in spots_by_day for d in rv_window): + continue + closes = [spots_by_day[d.isoformat()] for d in rv_window] + rv = compute_rv30d_annualized(closes) + dvol = dvols_by_day[day_str] + spot = spots_by_day[day_str] + records.append( + BackfillRow( + timestamp=datetime(day.year, day.month, day.day, 12, 0, tzinfo=UTC), + asset=asset, + spot=spot, + dvol=dvol, + realized_vol_30d=rv, + iv_minus_rv=dvol - rv, + ) + ) + return records + + +# --------------------------------------------------------------------------- +# I/O layer (network + sqlite) +# --------------------------------------------------------------------------- + + +def _http_get_json(url: str, timeout_s: float = 30.0) -> dict: + with urllib.request.urlopen(url, timeout=timeout_s) as resp: + return json.loads(resp.read()) + + +def fetch_dvol_daily(currency: str, days: int) -> dict[str, Decimal]: + """Mappa ``YYYY-MM-DD -> DVOL close`` per gli ultimi ``days`` giorni.""" + end_ms = int(datetime.now(UTC).timestamp() * 1000) + start_ms = end_ms - days * 86_400_000 + url = ( + f"{_DERIBIT}/get_volatility_index_data" + f"?currency={currency}" + f"&start_timestamp={start_ms}&end_timestamp={end_ms}" + f"&resolution=86400" + ) + payload = _http_get_json(url) + data = (payload.get("result") or {}).get("data") or [] + out: dict[str, Decimal] = {} + for row in data: + # row = [ts_ms, open, high, low, close] + if not isinstance(row, list) or len(row) < 5: + continue + ts = datetime.fromtimestamp(row[0] / 1000, tz=UTC).date().isoformat() + out[ts] = Decimal(str(row[4])) + return out + + +def fetch_spot_daily(instrument: str, days: int) -> dict[str, Decimal]: + """Mappa ``YYYY-MM-DD -> close USD`` per ``instrument`` su ``days`` giorni.""" + end_ms = int(datetime.now(UTC).timestamp() * 1000) + start_ms = end_ms - days * 86_400_000 + url = ( + f"{_DERIBIT}/get_tradingview_chart_data" + f"?instrument_name={instrument}" + f"&start_timestamp={start_ms}&end_timestamp={end_ms}" + f"&resolution=1D" + ) + payload = _http_get_json(url) + result = payload.get("result") or {} + ticks = result.get("ticks") or [] + closes = result.get("close") or [] + out: dict[str, Decimal] = {} + for ts_ms, close in zip(ticks, closes, strict=False): + ts = datetime.fromtimestamp(ts_ms / 1000, tz=UTC).date().isoformat() + out[ts] = Decimal(str(close)) + return out + + +def write_records(db_path: str, records: list[BackfillRow]) -> int: + """Insert/replace dei record in market_snapshots. Ritorna la rowcount.""" + if not records: + return 0 + conn = sqlite3.connect(db_path) + try: + with conn: + for r in records: + conn.execute( + "INSERT OR REPLACE INTO market_snapshots (" + "timestamp, asset, spot, dvol, realized_vol_30d, iv_minus_rv, " + "funding_perp_annualized, funding_cross_annualized, " + "dealer_net_gamma, gamma_flip_level, oi_delta_pct_4h, " + "liquidation_long_risk, liquidation_short_risk, " + "macro_days_to_event, fetch_ok, fetch_errors_json" + ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", + ( + r.timestamp.astimezone(UTC).isoformat(), + r.asset, + str(r.spot), + str(r.dvol), + str(r.realized_vol_30d), + str(r.iv_minus_rv), + None, + None, + None, + None, + None, + None, + None, + None, + 1 if r.fetch_ok else 0, + '{"backfill":true}', + ), + ) + return len(records) + finally: + conn.close() + + +def backfill_asset(db_path: str, asset: str, days: int) -> int: + """Esegue l'intero backfill per ``asset`` e ritorna il numero di + record inseriti/sostituiti. + """ + instrument = f"{asset.upper()}-PERPETUAL" + fetch_window_days = days + _RV_LOOKBACK_DAYS + 5 # margine per il lookback RV + spots = fetch_spot_daily(instrument, fetch_window_days) + dvols = fetch_dvol_daily(asset.upper(), fetch_window_days) + today = datetime.now(UTC).date() + oldest = today - timedelta(days=days) + records = build_backfill_records( + asset=asset.upper(), + spots_by_day=spots, + dvols_by_day=dvols, + oldest_day=oldest, + ) + return write_records(db_path, records) + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--db", + default="data/state.sqlite", + help="path a state.sqlite (default: data/state.sqlite)", + ) + parser.add_argument( + "--days", + type=int, + default=45, + help="quanti giorni di backfill emettere (default: 45)", + ) + parser.add_argument( + "--assets", + nargs="+", + default=["ETH", "BTC"], + help="asset symbols (default: ETH BTC)", + ) + args = parser.parse_args() + + total = 0 + for asset in args.assets: + n = backfill_asset(args.db, asset, args.days) + print(f"{asset}: inserted/replaced {n} backfill rows") + total += n + print(f"TOTAL: {total}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/cerbero_bite/core/adaptive_threshold.py b/src/cerbero_bite/core/adaptive_threshold.py index 0a77133..a0915d4 100644 --- a/src/cerbero_bite/core/adaptive_threshold.py +++ b/src/cerbero_bite/core/adaptive_threshold.py @@ -2,8 +2,13 @@ Spec: ``docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md``. -Deterministic, no I/O. La query del repository è effettuata dal caller -(``runtime/entry_cycle``) prima di chiamare questa funzione. +Deterministic, no I/O. La selezione della finestra (target_days vs +min_days vs intera storia disponibile) è responsabilità del caller, che +interroga il repository con i parametri corretti e passa qui sia i +valori (``history``) sia il numero di giorni distinti coperti +(``n_days``). Questo permette di mischiare cadenze diverse — tick live a +15 min e backfill daily — senza assumere un fattore costante +``ticks_per_day``. """ from __future__ import annotations @@ -13,59 +18,43 @@ from decimal import Decimal __all__ = ["compute_adaptive_threshold"] -_TICKS_PER_DAY = 96 # cron */15 → 4 tick/h × 24h - def compute_adaptive_threshold( history: Sequence[Decimal], *, + n_days: int, percentile: Decimal, absolute_floor: Decimal, - min_days: int, - target_days: int, ) -> Decimal | None: """Ritorna la soglia adattiva o ``None`` durante il warmup hard. Args: - history: Sequenza ordinata ASC dei valori IV-RV (un valore per - ogni tick disponibile, max ``target_days * 96``). NULL e - tick non riusciti devono essere già stati filtrati dal - caller. + history: Sequenza dei valori IV-RV nella finestra scelta dal + caller. NULL e tick non riusciti devono essere già stati + filtrati upstream. L'ordine non è significativo per il + percentile. + n_days: Numero di giorni distinti coperti dalla storia + disponibile (calcolato dal caller, tipicamente con + ``COUNT(DISTINCT date(timestamp))``). ``0`` → warmup hard. percentile: Quantile target nella distribuzione (es. ``0.25``). absolute_floor: Floor minimo applicato dopo il calcolo del percentile. La soglia restituita è ``max(P_q, absolute_floor)``. - min_days: Sotto questa soglia di giorni di storia, la finestra - usata è "tutta la storia disponibile". Sopra, la finestra è - fissa a ``min_days`` finché non si raggiunge ``target_days``. - target_days: Finestra finale stabile. Returns: - ``None`` se la storia è < 1 giorno (warmup hard, gate - disabilitato), altrimenti il percentile della finestra, + ``None`` se ``n_days == 0`` o ``history`` è vuota (warmup hard, + gate disabilitato), altrimenti il percentile della finestra bounded dal floor. """ if not (Decimal(0) <= percentile <= Decimal(1)): raise ValueError( f"percentile must be in [0, 1], got {percentile}" ) - if min_days <= 0 or target_days <= 0 or min_days >= target_days: - raise ValueError( - f"require 0 < min_days < target_days, " - f"got min_days={min_days}, target_days={target_days}" - ) - if not history: + if n_days < 0: + raise ValueError(f"n_days must be >= 0, got {n_days}") + if n_days == 0 or not history: return None - n_ticks = len(history) - if n_ticks < _TICKS_PER_DAY: - return None - if n_ticks >= target_days * _TICKS_PER_DAY: - window = history[-target_days * _TICKS_PER_DAY:] - elif n_ticks >= min_days * _TICKS_PER_DAY: - window = history[-min_days * _TICKS_PER_DAY:] - else: - window = list(history) - return max(_percentile(window, percentile), absolute_floor) + return max(_percentile(history, percentile), absolute_floor) def _percentile(values: Sequence[Decimal], q: Decimal) -> Decimal: diff --git a/src/cerbero_bite/core/entry_validator.py b/src/cerbero_bite/core/entry_validator.py index c9cd5ae..2ea2349 100644 --- a/src/cerbero_bite/core/entry_validator.py +++ b/src/cerbero_bite/core/entry_validator.py @@ -51,13 +51,19 @@ class EntryContext(BaseModel): # invalida l'entry). iv_minus_rv: Decimal | None = None - # Storia recente di IV-RV (un valore per ogni tick di - # market_snapshots, ASC, NULL e fetch_ok=0 esclusi). Caricata dal - # repository in `entry_cycle` quando `iv_minus_rv_adaptive_enabled` - # è True. Tuple per coerenza con frozen=True. Vuoto = warmup hard - # (gate disabilitato). + # Valori IV-RV nella finestra rolling già scelta dal caller + # (entry_cycle): tutti i record validi su window_days, ASC, NULL e + # fetch_ok=0 esclusi. Caricata dal repository quando + # `iv_minus_rv_adaptive_enabled` è True. Tuple per coerenza con + # frozen=True. iv_rv_history: tuple[Decimal, ...] = () + # Numero di giorni di calendario distinti coperti dalla storia + # IV-RV disponibile (non solo dalla finestra `iv_rv_history`). + # ``0`` = warmup hard, gate disabilitato (fail-open). Calcolato dal + # caller via `repository.count_iv_rv_distinct_days`. + iv_rv_n_days: int = 0 + # DVOL al tick più vicino a now - vol_of_vol_lookback_hours. # ``None`` = gap nel dato (es. cron mancante 24h fa) → VoV guard # skip. Caricato dal repository in `entry_cycle` quando @@ -162,10 +168,9 @@ def validate_entry(ctx: EntryContext, cfg: StrategyConfig) -> EntryDecision: if entry_cfg.iv_minus_rv_adaptive_enabled: threshold = compute_adaptive_threshold( history=ctx.iv_rv_history, + n_days=ctx.iv_rv_n_days, percentile=entry_cfg.iv_minus_rv_percentile, absolute_floor=entry_cfg.iv_minus_rv_min, - min_days=entry_cfg.iv_minus_rv_window_min_days, - target_days=entry_cfg.iv_minus_rv_window_target_days, ) if threshold is not None and ctx.iv_minus_rv < threshold: pct = int(entry_cfg.iv_minus_rv_percentile * 100) diff --git a/src/cerbero_bite/gui/pages/6_📐_Calibrazione.py b/src/cerbero_bite/gui/pages/6_📐_Calibrazione.py index 02a5ee5..cdd671a 100644 --- a/src/cerbero_bite/gui/pages/6_📐_Calibrazione.py +++ b/src/cerbero_bite/gui/pages/6_📐_Calibrazione.py @@ -267,24 +267,26 @@ def _render_adaptive_gate_panel( # records DESC (newest first) → history ASC con NULL/fetch_ok=0 esclusi iv_rv_history: list[Decimal] = [] + distinct_days: set[str] = set() for r in reversed(records): if r.fetch_ok and r.iv_minus_rv is not None: iv_rv_history.append(r.iv_minus_rv) + distinct_days.add(r.timestamp.date().isoformat()) n_ticks = len(iv_rv_history) + n_days = len(distinct_days) target = int(getattr(entry, "iv_minus_rv_window_target_days", 60)) min_days = int(getattr(entry, "iv_minus_rv_window_min_days", 30)) - n_days = n_ticks // 96 if n_days < 1: - status = f"🟡 Warmup hard ({n_ticks}/96 tick)" + status = "🟡 Warmup hard (nessun giorno coperto)" elif n_days < min_days: status = f"🟡 Warmup ({n_days}/{min_days}g — finestra crescente)" elif n_days < target: status = f"🟢 Attivo (finestra {min_days}g, target {target}g)" else: status = f"🟢 Attivo (finestra stabile {target}g)" - st.markdown(f"**Status:** {status}") + st.markdown(f"**Status:** {status} · {n_ticks} tick complessivi") # Latest tick iv_rv_now: Decimal | None = None @@ -307,10 +309,9 @@ def _render_adaptive_gate_panel( try: threshold = compute_adaptive_threshold( history=iv_rv_history, + n_days=n_days, percentile=percentile, absolute_floor=floor, - min_days=min_days, - target_days=target, ) except ValueError as exc: st.warning(f"Configurazione gate non valida: {exc}") diff --git a/src/cerbero_bite/runtime/entry_cycle.py b/src/cerbero_bite/runtime/entry_cycle.py index d96479e..e81dc7d 100644 --- a/src/cerbero_bite/runtime/entry_cycle.py +++ b/src/cerbero_bite/runtime/entry_cycle.py @@ -316,33 +316,14 @@ async def _build_quotes( return out -def _audit_threshold( - entry_cfg: object, - iv_rv_history: tuple[Decimal, ...], -) -> str | None: - """Soglia P_q rolling effettivamente usata dal gate, per il decisions log.""" - if not getattr(entry_cfg, "iv_minus_rv_filter_enabled", False): - return None - if not getattr(entry_cfg, "iv_minus_rv_adaptive_enabled", False): - return str(getattr(entry_cfg, "iv_minus_rv_min", Decimal("0"))) - threshold = compute_adaptive_threshold( - history=iv_rv_history, - percentile=entry_cfg.iv_minus_rv_percentile, # type: ignore[attr-defined] - absolute_floor=entry_cfg.iv_minus_rv_min, # type: ignore[attr-defined] - min_days=entry_cfg.iv_minus_rv_window_min_days, # type: ignore[attr-defined] - target_days=entry_cfg.iv_minus_rv_window_target_days, # type: ignore[attr-defined] - ) - return None if threshold is None else str(threshold) +def _select_window_days(entry_cfg: object, n_days: int) -> int: + """Sceglie la finestra in giorni per il gate adattivo dato n_days + disponibili. - -def _audit_window_days( - entry_cfg: object, - iv_rv_history: tuple[Decimal, ...], -) -> int | None: - """Numero di giorni effettivamente usati dalla finestra rolling.""" - if not getattr(entry_cfg, "iv_minus_rv_adaptive_enabled", False): - return None - n_days = len(iv_rv_history) // 96 + Spec: warmup hard se ``n_days == 0`` → 0; finestra ``target_days`` + se ``n_days >= target_days``; ``min_days`` se ``n_days >= min_days``; + altrimenti tutta la storia disponibile (capped a ``target_days``). + """ target = int(getattr(entry_cfg, "iv_minus_rv_window_target_days", 60)) min_days = int(getattr(entry_cfg, "iv_minus_rv_window_min_days", 30)) if n_days < 1: @@ -351,7 +332,33 @@ def _audit_window_days( return target if n_days >= min_days: return min_days - return n_days + return target # storia parziale: query fino a target, repository ne ritorna n_days + + +def _audit_threshold( + entry_cfg: object, + iv_rv_history: tuple[Decimal, ...], + n_days: int, +) -> str | None: + """Soglia P_q rolling effettivamente usata dal gate, per il decisions log.""" + if not getattr(entry_cfg, "iv_minus_rv_filter_enabled", False): + return None + if not getattr(entry_cfg, "iv_minus_rv_adaptive_enabled", False): + return str(getattr(entry_cfg, "iv_minus_rv_min", Decimal("0"))) + threshold = compute_adaptive_threshold( + history=iv_rv_history, + n_days=n_days, + percentile=entry_cfg.iv_minus_rv_percentile, # type: ignore[attr-defined] + absolute_floor=entry_cfg.iv_minus_rv_min, # type: ignore[attr-defined] + ) + return None if threshold is None else str(threshold) + + +def _audit_window_days(entry_cfg: object, n_days: int) -> int | None: + """Numero di giorni effettivamente usati dalla finestra rolling.""" + if not getattr(entry_cfg, "iv_minus_rv_adaptive_enabled", False): + return None + return _select_window_days(entry_cfg, n_days) def _max_loss_per_contract_usd(short_strike: Decimal, long_strike: Decimal) -> Decimal: @@ -472,18 +479,27 @@ async def run_entry_cycle( asset = cfg.asset.symbol iv_rv_history: tuple[Decimal, ...] = () + iv_rv_n_days: int = 0 dvol_24h_ago: Decimal | None = None if entry_cfg.iv_minus_rv_filter_enabled and entry_cfg.iv_minus_rv_adaptive_enabled: conn = connect_state(ctx.db_path) try: - iv_rv_history = tuple( - ctx.repository.iv_rv_history( - conn, - asset=asset, - max_days=entry_cfg.iv_minus_rv_window_target_days, - as_of=when, - ) + iv_rv_n_days = ctx.repository.count_iv_rv_distinct_days( + conn, + asset=asset, + max_days=entry_cfg.iv_minus_rv_window_target_days, + as_of=when, ) + window_days = _select_window_days(entry_cfg, iv_rv_n_days) + if window_days > 0: + iv_rv_history = tuple( + ctx.repository.iv_rv_values_for_window( + conn, + asset=asset, + window_days=window_days, + as_of=when, + ) + ) finally: conn.close() if entry_cfg.vol_of_vol_guard_enabled: @@ -508,6 +524,7 @@ async def run_entry_cycle( iv_minus_rv=snap.iv_minus_rv, liquidation_squeeze_risk_high=snap.liquidation_squeeze_risk_high, iv_rv_history=iv_rv_history, + iv_rv_n_days=iv_rv_n_days, dvol_24h_ago=dvol_24h_ago, ) decision = validate_entry(entry_ctx, cfg) @@ -529,9 +546,12 @@ async def run_entry_cycle( str(snap.iv_minus_rv) if snap.iv_minus_rv is not None else None ), "iv_rv_history_n": len(iv_rv_history), - "iv_rv_threshold_used": _audit_threshold(entry_cfg, iv_rv_history), + "iv_rv_n_days": iv_rv_n_days, + "iv_rv_threshold_used": _audit_threshold( + entry_cfg, iv_rv_history, iv_rv_n_days + ), "iv_rv_window_used_days": _audit_window_days( - entry_cfg, iv_rv_history + entry_cfg, iv_rv_n_days ), "dvol_24h_ago": ( str(dvol_24h_ago) if dvol_24h_ago is not None else None diff --git a/src/cerbero_bite/state/repository.py b/src/cerbero_bite/state/repository.py index acf7353..5f62c05 100644 --- a/src/cerbero_bite/state/repository.py +++ b/src/cerbero_bite/state/repository.py @@ -408,22 +408,23 @@ class Repository: ).fetchall() return [_row_to_market_snapshot(r) for r in rows] - def iv_rv_history( + def count_iv_rv_distinct_days( self, conn: sqlite3.Connection, *, asset: str, max_days: int, as_of: datetime | None = None, - ) -> list[Decimal]: - """Lista IV-RV ordinata ASC sull'intervallo `[as_of - max_days, as_of]`. + ) -> int: + """Numero di giorni di calendario distinti coperti da IV-RV validi. Esclude righe con ``fetch_ok=0`` o ``iv_minus_rv IS NULL``. - Usata dal validator quando il gate adattivo è abilitato. + Usato dal caller del gate adattivo per decidere la finestra + (warmup hard / min_days / target_days). Args: as_of: Reference time for the rolling window. Defaults to - ``datetime.now(UTC)``. Tests can pin a fixed value. + ``datetime.now(UTC)``. """ if max_days <= 0: raise ValueError(f"max_days must be positive, got {max_days}") @@ -431,6 +432,39 @@ class Repository: if ref.tzinfo is None: raise ValueError("as_of must be timezone-aware") cutoff = ref - timedelta(days=max_days) + row = conn.execute( + "SELECT COUNT(DISTINCT substr(timestamp, 1, 10)) AS n " + "FROM market_snapshots " + "WHERE asset = ? " + " AND fetch_ok = 1 " + " AND iv_minus_rv IS NOT NULL " + " AND timestamp >= ?", + (asset, _enc_dt(cutoff)), + ).fetchone() + return int(row["n"]) if row is not None else 0 + + def iv_rv_values_for_window( + self, + conn: sqlite3.Connection, + *, + asset: str, + window_days: int, + as_of: datetime | None = None, + ) -> list[Decimal]: + """Valori IV-RV ordinati ASC su ``[as_of - window_days, as_of]``. + + Esclude righe con ``fetch_ok=0`` o ``iv_minus_rv IS NULL``. + Tutti i record validi della finestra concorrono come singoli + contributi alla statistica del percentile, indipendentemente + dalla cadenza con cui sono stati raccolti (tick live vs backfill + daily). + """ + if window_days <= 0: + raise ValueError(f"window_days must be positive, got {window_days}") + ref = as_of if as_of is not None else datetime.now(UTC) + if ref.tzinfo is None: + raise ValueError("as_of must be timezone-aware") + cutoff = ref - timedelta(days=window_days) rows = conn.execute( "SELECT iv_minus_rv FROM market_snapshots " "WHERE asset = ? " diff --git a/tests/integration/test_entry_cycle_iv_rv_adaptive.py b/tests/integration/test_entry_cycle_iv_rv_adaptive.py index 5f21cec..0d20270 100644 --- a/tests/integration/test_entry_cycle_iv_rv_adaptive.py +++ b/tests/integration/test_entry_cycle_iv_rv_adaptive.py @@ -1,4 +1,8 @@ -"""End-to-end test del gate IV-RV adattivo + Vol-of-Vol guard via Repository.""" +"""End-to-end test del gate IV-RV adattivo + Vol-of-Vol guard via Repository. + +Verifica che la nuova API distinct-days componga correttamente repository +helpers + ``compute_adaptive_threshold``. +""" from __future__ import annotations @@ -7,6 +11,7 @@ from decimal import Decimal import pytest +from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold from cerbero_bite.state.db import connect, run_migrations from cerbero_bite.state.models import MarketSnapshotRecord from cerbero_bite.state.repository import Repository @@ -67,26 +72,49 @@ def db_30d(tmp_path): return conn, repo -def test_iv_rv_history_p25_picks_up_recent_regime(db_30d) -> None: - """Sanity: con bimodale 1.0/5.0 e finestra 30g, P25 di tutta la - storia è 1.0 (il 25° centile è ancora nella metà bassa).""" +def test_distinct_days_count_matches_calendar_days(db_30d) -> None: + """30 giorni di calendario seedati → COUNT DISTINCT = 30.""" conn, repo = db_30d - history = repo.iv_rv_history( + n = repo.count_iv_rv_distinct_days( conn, asset="ETH", max_days=60, as_of=datetime(2026, 5, 1, 0, 0, tzinfo=UTC), ) - assert len(history) == 2880 - from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold + assert n == 30 + +def test_window_values_returned_for_full_history(db_30d) -> None: + conn, repo = db_30d + values = repo.iv_rv_values_for_window( + conn, + asset="ETH", + window_days=60, + as_of=datetime(2026, 5, 1, 0, 0, tzinfo=UTC), + ) + assert len(values) == 2880 + # Bimodale: 1440 valori 1.0 e 1440 valori 5.0 + assert sum(1 for v in values if v == Decimal("1.0")) == 1440 + assert sum(1 for v in values if v == Decimal("5.0")) == 1440 + + +def test_p25_of_bimodal_history_picks_low_regime(db_30d) -> None: + """Comporre repository + adaptive_threshold come fa entry_cycle.""" + conn, repo = db_30d + as_of = datetime(2026, 5, 1, 0, 0, tzinfo=UTC) + n_days = repo.count_iv_rv_distinct_days( + conn, asset="ETH", max_days=60, as_of=as_of + ) + values = repo.iv_rv_values_for_window( + conn, asset="ETH", window_days=60, as_of=as_of + ) threshold = compute_adaptive_threshold( - history=history, + history=values, + n_days=n_days, percentile=Decimal("0.25"), absolute_floor=Decimal("0"), - min_days=30, - target_days=60, ) + # P25 di 2880 valori bimodali: 1440 ×1.0, 1440 ×5.0 → soglia = 1.0 assert threshold == Decimal("1.0") diff --git a/tests/unit/test_adaptive_threshold.py b/tests/unit/test_adaptive_threshold.py index 719a56a..f5b37e9 100644 --- a/tests/unit/test_adaptive_threshold.py +++ b/tests/unit/test_adaptive_threshold.py @@ -1,6 +1,12 @@ """TDD per :mod:`cerbero_bite.core.adaptive_threshold`. Spec: ``docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md``. + +La funzione è una pura statistica: riceve già la finestra di valori scelta +dal caller e il numero di giorni distinti coperti dalla storia disponibile +(``n_days``), e restituisce ``max(percentile, floor)`` o ``None`` durante +il warmup hard. La selezione della finestra (target_days vs min_days vs +intera storia) è responsabilità del caller (repository + entry_cycle). """ from __future__ import annotations @@ -11,107 +17,109 @@ import pytest from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold - # --------------------------------------------------------------------------- -# Warmup +# Warmup hard: nessun giorno disponibile # --------------------------------------------------------------------------- -def test_empty_history_returns_none() -> None: +def test_n_days_zero_returns_none() -> None: + """Storia vuota o nessun giorno coperto → warmup hard.""" out = compute_adaptive_threshold( history=[], + n_days=0, percentile=Decimal("0.25"), absolute_floor=Decimal("0"), - min_days=30, - target_days=60, ) assert out is None -def test_history_under_one_day_returns_none() -> None: +def test_n_days_zero_with_values_still_returns_none() -> None: + """Difensivo: se il caller passa n_days=0 ma valori non vuoti, warmup + hard vince comunque (gate disabilitato).""" out = compute_adaptive_threshold( - history=[Decimal("3")] * 50, # 50 tick < 96 + history=[Decimal("3")] * 10, + n_days=0, percentile=Decimal("0.25"), absolute_floor=Decimal("0"), - min_days=30, - target_days=60, ) assert out is None -def test_history_exactly_one_day_returns_percentile() -> None: +def test_empty_history_with_positive_n_days_returns_none() -> None: + """Difensivo: history vuota anche con n_days>0 → None.""" + out = compute_adaptive_threshold( + history=[], + n_days=5, + percentile=Decimal("0.25"), + absolute_floor=Decimal("0"), + ) + assert out is None + + +# --------------------------------------------------------------------------- +# Calcolo percentile sulla finestra ricevuta +# --------------------------------------------------------------------------- + + +def test_n_days_one_returns_percentile_of_history() -> None: + """Singolo giorno con tick a 15 min (96 valori): P25 standard.""" history = [Decimal(i) / Decimal("10") for i in range(96)] # 0.0..9.5 out = compute_adaptive_threshold( history=history, + n_days=1, percentile=Decimal("0.25"), absolute_floor=Decimal("0"), - min_days=30, - target_days=60, ) # P25 di [0.0..9.5] passo 0.1 con method='linear': k=23.75, val ≈ 2.375 assert out is not None assert Decimal("2.3") < out < Decimal("2.5") -def _ramp(n: int, base: Decimal = Decimal("1")) -> list[Decimal]: - """Ramp lineare 1, 2, 3, ... per testare in modo predicibile il P25.""" - return [base * Decimal(i + 1) for i in range(n)] - - -def test_below_min_days_uses_full_history() -> None: - # 5 giorni di storia (5*96=480 tick), min_days=30, target=60. - # Window = full history. - history = _ramp(480) +def test_window_chosen_by_caller_is_used_verbatim() -> None: + """La funzione NON fa slicing: usa esattamente la history ricevuta.""" + history = [Decimal(i) for i in range(1, 201)] # 1..200 out = compute_adaptive_threshold( history=history, - percentile=Decimal("0.25"), + n_days=30, + percentile=Decimal("0.5"), absolute_floor=Decimal("0"), - min_days=30, - target_days=60, ) - # P25 di ramp [1..480] = ~120.75 (k=479*0.25=119.75, ramp[119]=120, ramp[120]=121) + # P50 di [1..200] = (200+1)/2 = 100.5 assert out is not None - assert Decimal("120") <= out <= Decimal("121") + assert Decimal("100") <= out <= Decimal("101") -def test_between_min_and_target_uses_min_window() -> None: - # 50 giorni di storia (4800 tick), min_days=30, target=60. Window = ultimi 30g. - history = _ramp(4800) # values 1..4800 +def test_mixed_cadence_window_no_special_treatment() -> None: + """Mix di valori (es. backfill daily + tick live) trattato come una + distribuzione qualunque: il caller ha già scelto la finestra; la + funzione calcola il percentile sui valori ricevuti uno-a-uno.""" + # 30 valori "daily backfill" (uno per giorno) + 96 tick "live" + history = [Decimal("5")] * 30 + [Decimal("8")] * 96 out = compute_adaptive_threshold( history=history, + n_days=31, percentile=Decimal("0.25"), absolute_floor=Decimal("0"), - min_days=30, - target_days=60, ) - # Window = ultimi 30*96=2880, valori 1921..4800, P25 ≈ 2640 + # Sorted: 30 ×5, 96 ×8. P25 a indice 0.25*125 = 31.25 → tra 5 e 8. + # NumPy linear: sorted_v[31]=8, sorted_v[32]=8 → 8. + # Verifica solo l'estremo superiore della famiglia di valori sorted. assert out is not None - assert Decimal("2630") <= out <= Decimal("2650") + assert out in (Decimal("5"), Decimal("8")) -def test_above_target_uses_target_window() -> None: - # 100 giorni (9600 tick), target=60. Window = ultimi 60g. - history = _ramp(9600) # values 1..9600 - out = compute_adaptive_threshold( - history=history, - percentile=Decimal("0.25"), - absolute_floor=Decimal("0"), - min_days=30, - target_days=60, - ) - # Window = ultimi 5760, valori 3841..9600, P25 ≈ 5280 - assert out is not None - assert Decimal("5270") <= out <= Decimal("5290") +# --------------------------------------------------------------------------- +# Floor binding +# --------------------------------------------------------------------------- def test_floor_binding_overrides_low_percentile() -> None: history = [Decimal("0.5")] * 200 out = compute_adaptive_threshold( history=history, + n_days=30, percentile=Decimal("0.25"), absolute_floor=Decimal("3"), - min_days=30, - target_days=60, ) assert out == Decimal("3") @@ -120,58 +128,13 @@ def test_floor_not_binding_returns_percentile() -> None: history = [Decimal("5")] * 200 out = compute_adaptive_threshold( history=history, + n_days=30, percentile=Decimal("0.25"), absolute_floor=Decimal("0"), - min_days=30, - target_days=60, ) assert out == Decimal("5") -def test_median_percentile_returns_p50() -> None: - history = _ramp(200) - out = compute_adaptive_threshold( - history=history, - percentile=Decimal("0.5"), - absolute_floor=Decimal("0"), - min_days=30, - target_days=60, - ) - # P50 di [1..200] = (200+1)/2 = 100.5 - assert out is not None - assert Decimal("100") <= out <= Decimal("101") - - -def test_exactly_min_days_uses_min_window() -> None: - """Boundary: history == min_days*96 → window is min_days (per spec 9.1 item 9).""" - history = _ramp(30 * 96) # exactly 2880 ticks - out = compute_adaptive_threshold( - history=history, - percentile=Decimal("0.25"), - absolute_floor=Decimal("0"), - min_days=30, - target_days=60, - ) - # Window = last 2880 = all history; P25 of ramp [1..2880] ≈ 720.75 - assert out is not None - assert Decimal("720") <= out <= Decimal("721") - - -def test_exactly_target_days_uses_target_window() -> None: - """Boundary: history == target_days*96 → window is target_days.""" - history = _ramp(60 * 96) # exactly 5760 ticks - out = compute_adaptive_threshold( - history=history, - percentile=Decimal("0.25"), - absolute_floor=Decimal("0"), - min_days=30, - target_days=60, - ) - # Window = last 5760 = all history; P25 of ramp [1..5760] ≈ 1440.75 - assert out is not None - assert Decimal("1440") <= out <= Decimal("1441") - - # --------------------------------------------------------------------------- # Input validation # --------------------------------------------------------------------------- @@ -181,10 +144,9 @@ def test_invalid_percentile_above_one_raises() -> None: with pytest.raises(ValueError, match="percentile must be in"): compute_adaptive_threshold( history=[Decimal("1")] * 200, + n_days=10, percentile=Decimal("1.5"), absolute_floor=Decimal("0"), - min_days=30, - target_days=60, ) @@ -192,30 +154,17 @@ def test_invalid_percentile_negative_raises() -> None: with pytest.raises(ValueError, match="percentile must be in"): compute_adaptive_threshold( history=[Decimal("1")] * 200, + n_days=10, percentile=Decimal("-0.1"), absolute_floor=Decimal("0"), - min_days=30, - target_days=60, ) -def test_invalid_window_inverted_raises() -> None: - with pytest.raises(ValueError, match="min_days < target_days"): +def test_invalid_negative_n_days_raises() -> None: + with pytest.raises(ValueError, match="n_days must be >= 0"): compute_adaptive_threshold( - history=[Decimal("1")] * 200, + history=[Decimal("1")] * 10, + n_days=-1, percentile=Decimal("0.25"), absolute_floor=Decimal("0"), - min_days=60, - target_days=30, - ) - - -def test_invalid_window_zero_raises() -> None: - with pytest.raises(ValueError, match="min_days < target_days"): - compute_adaptive_threshold( - history=[Decimal("1")] * 200, - percentile=Decimal("0.25"), - absolute_floor=Decimal("0"), - min_days=0, - target_days=60, ) diff --git a/tests/unit/test_backfill_iv_rv.py b/tests/unit/test_backfill_iv_rv.py new file mode 100644 index 0000000..4204e41 --- /dev/null +++ b/tests/unit/test_backfill_iv_rv.py @@ -0,0 +1,176 @@ +"""TDD per il backfill IV-RV (``scripts/backfill_iv_rv.py``). + +Testa solo la parte pura (compute RV + assemblaggio record). I/O HTTP +e SQLite restano nel main del CLI: testati manualmente al deploy. +""" + +from __future__ import annotations + +import importlib.util +import sys +from datetime import UTC, date, datetime, timedelta +from decimal import Decimal +from pathlib import Path + +import pytest + +REPO_ROOT = Path(__file__).resolve().parents[2] + + +def _load_backfill_module() -> object: + """Load scripts/backfill_iv_rv.py as a module without polluting sys.path.""" + spec = importlib.util.spec_from_file_location( + "_cerbero_bite_backfill_iv_rv", REPO_ROOT / "scripts" / "backfill_iv_rv.py" + ) + if spec is None or spec.loader is None: + raise RuntimeError("cannot load backfill_iv_rv module") + module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + + +@pytest.fixture(scope="module") +def mod(): + return _load_backfill_module() + + +# --------------------------------------------------------------------------- +# compute_rv30d_annualized +# --------------------------------------------------------------------------- + + +def test_constant_prices_yield_zero_rv(mod) -> None: + closes = [Decimal("100")] * 31 # 30 returns of log(1)=0 + rv = mod.compute_rv30d_annualized(closes) + assert rv == Decimal("0") + + +def test_too_few_closes_raises(mod) -> None: + with pytest.raises(ValueError, match="need at least 31 closes"): + mod.compute_rv30d_annualized([Decimal("100")] * 10) + + +def test_monotonic_growth_yields_low_rv(mod) -> None: + """Crescita +1% ogni giorno: log returns costanti → stdev = 0 → RV = 0.""" + closes = [Decimal("100") * (Decimal("1.01") ** i) for i in range(31)] + rv = mod.compute_rv30d_annualized(closes) + # Tutti i log returns sono identici (log 1.01) → stdev zero + assert rv == Decimal("0") + + +def test_alternating_returns_yield_known_rv(mod) -> None: + """Returns alternati ±2% ogni giorno: stdev nota.""" + # closes: 100, 102, 100, 102, ... (ricorda: returns = log(c[i]/c[i-1])) + closes = [Decimal("100")] + [ + Decimal("102") if i % 2 == 0 else Decimal("100") for i in range(30) + ] + rv = mod.compute_rv30d_annualized(closes) + # |log return| ~ 0.0198, stdev ≈ 0.0198 (alternano segno con media ≈ 0) + # Annualized = 0.0198 * sqrt(365) * 100 ≈ 37.86 vol pts + assert Decimal("36") <= rv <= Decimal("40") + + +# --------------------------------------------------------------------------- +# build_backfill_records +# --------------------------------------------------------------------------- + + +def test_build_records_skips_days_without_30d_history(mod) -> None: + """Per i primi 30 giorni della serie spot, RV30d non è calcolabile.""" + today = date(2026, 5, 10) + days = [today - timedelta(days=i) for i in range(45)] + spots = {d.isoformat(): Decimal("100") for d in days} + dvols = {d.isoformat(): Decimal("50") for d in days} + + records = mod.build_backfill_records( + asset="ETH", + spots_by_day=spots, + dvols_by_day=dvols, + oldest_day=today - timedelta(days=40), + ) + # Per ogni record day, servono 30 giorni precedenti di spot. + # Lo spot più vecchio è today-44; quindi il primo giorno computabile + # è today-44+30 = today-14. Cap a oldest_day=today-40 → window day-14..day-0. + assert len(records) == 15 # day-14..day-0 incluso + for r in records: + assert r.asset == "ETH" + assert r.fetch_ok is True + assert r.iv_minus_rv == Decimal("50") # rv=0 con prezzi costanti + assert r.timestamp.tzinfo == UTC + assert r.timestamp.hour == 12 + + +def test_build_records_filters_to_requested_window(mod) -> None: + """oldest_day applicato come cutoff inferiore inclusivo.""" + today = date(2026, 5, 10) + days = [today - timedelta(days=i) for i in range(45)] + spots = {d.isoformat(): Decimal("100") for d in days} + dvols = {d.isoformat(): Decimal("50") for d in days} + + records = mod.build_backfill_records( + asset="BTC", + spots_by_day=spots, + dvols_by_day=dvols, + oldest_day=today - timedelta(days=5), + ) + # day-5..day-0 → 6 record + assert len(records) == 6 + record_days = {r.timestamp.date() for r in records} + assert record_days == {today - timedelta(days=i) for i in range(6)} + + +def test_build_records_skips_days_missing_dvol(mod) -> None: + """Se manca DVOL per un giorno della finestra, lo si salta (no record).""" + today = date(2026, 5, 10) + days = [today - timedelta(days=i) for i in range(45)] + spots = {d.isoformat(): Decimal("100") for d in days} + dvols = { + d.isoformat(): Decimal("50") + for d in days + if d != today - timedelta(days=2) + } + records = mod.build_backfill_records( + asset="ETH", + spots_by_day=spots, + dvols_by_day=dvols, + oldest_day=today - timedelta(days=5), + ) + record_days = {r.timestamp.date() for r in records} + assert today - timedelta(days=2) not in record_days + assert len(records) == 5 + + +def test_build_records_skips_days_missing_spot(mod) -> None: + """Se manca lo spot del giorno target, no record per quel giorno.""" + today = date(2026, 5, 10) + days = [today - timedelta(days=i) for i in range(45)] + spots = { + d.isoformat(): Decimal("100") + for d in days + if d != today - timedelta(days=2) + } + dvols = {d.isoformat(): Decimal("50") for d in days} + records = mod.build_backfill_records( + asset="ETH", + spots_by_day=spots, + dvols_by_day=dvols, + oldest_day=today - timedelta(days=5), + ) + record_days = {r.timestamp.date() for r in records} + assert today - timedelta(days=2) not in record_days + + +def test_build_records_uses_noon_utc_timestamp(mod) -> None: + today = date(2026, 5, 10) + days = [today - timedelta(days=i) for i in range(35)] + spots = {d.isoformat(): Decimal("100") for d in days} + dvols = {d.isoformat(): Decimal("50") for d in days} + records = mod.build_backfill_records( + asset="ETH", + spots_by_day=spots, + dvols_by_day=dvols, + oldest_day=today, + ) + assert len(records) == 1 + assert records[0].timestamp == datetime(2026, 5, 10, 12, 0, tzinfo=UTC) diff --git a/tests/unit/test_entry_validator.py b/tests/unit/test_entry_validator.py index b2e3f6c..960a8ef 100644 --- a/tests/unit/test_entry_validator.py +++ b/tests/unit/test_entry_validator.py @@ -376,7 +376,12 @@ def test_adaptive_pass_when_iv_rv_above_p25() -> None: cfg = _adaptive_cfg() history = tuple(Decimal(i) for i in range(1, 201)) decision = validate_entry( - _good_ctx(iv_minus_rv=Decimal("80"), iv_rv_history=history), cfg + _good_ctx( + iv_minus_rv=Decimal("80"), + iv_rv_history=history, + iv_rv_n_days=30, + ), + cfg, ) assert decision.accepted is True assert not any("IV richness" in r for r in decision.reasons) @@ -386,16 +391,27 @@ def test_adaptive_blocks_when_iv_rv_below_p25() -> None: cfg = _adaptive_cfg() history = tuple(Decimal(i) for i in range(1, 201)) decision = validate_entry( - _good_ctx(iv_minus_rv=Decimal("20"), iv_rv_history=history), cfg + _good_ctx( + iv_minus_rv=Decimal("20"), + iv_rv_history=history, + iv_rv_n_days=30, + ), + cfg, ) assert decision.accepted is False assert any("IV richness" in r and "rolling" in r for r in decision.reasons) -def test_adaptive_with_empty_history_passes_warmup() -> None: +def test_adaptive_with_n_days_zero_passes_warmup() -> None: + """Warmup hard: nessun giorno coperto → gate skip (fail-open).""" cfg = _adaptive_cfg() decision = validate_entry( - _good_ctx(iv_minus_rv=Decimal("0.1"), iv_rv_history=()), cfg + _good_ctx( + iv_minus_rv=Decimal("0.1"), + iv_rv_history=(), + iv_rv_n_days=0, + ), + cfg, ) assert decision.accepted is True @@ -404,7 +420,12 @@ def test_adaptive_with_floor_floor_binds_when_p25_low() -> None: cfg = _adaptive_cfg(iv_minus_rv_min=Decimal("3")) history = tuple(Decimal("0.5") for _ in range(200)) decision = validate_entry( - _good_ctx(iv_minus_rv=Decimal("1"), iv_rv_history=history), cfg + _good_ctx( + iv_minus_rv=Decimal("1"), + iv_rv_history=history, + iv_rv_n_days=30, + ), + cfg, ) assert decision.accepted is False assert any("IV richness" in r for r in decision.reasons) @@ -417,7 +438,8 @@ def test_legacy_static_gate_still_works_when_adaptive_disabled() -> None: "iv_minus_rv_min": Decimal("3"), }) decision = validate_entry( - _good_ctx(iv_minus_rv=Decimal("2"), iv_rv_history=()), cfg + _good_ctx(iv_minus_rv=Decimal("2"), iv_rv_history=(), iv_rv_n_days=0), + cfg, ) assert decision.accepted is False assert any("IV richness below floor" in r for r in decision.reasons) @@ -426,12 +448,45 @@ def test_legacy_static_gate_still_works_when_adaptive_disabled() -> None: def test_iv_minus_rv_none_skips_gate_in_both_modes() -> None: cfg = _adaptive_cfg() decision = validate_entry( - _good_ctx(iv_minus_rv=None, iv_rv_history=tuple(Decimal(i) for i in range(1, 201))), + _good_ctx( + iv_minus_rv=None, + iv_rv_history=tuple(Decimal(i) for i in range(1, 201)), + iv_rv_n_days=30, + ), cfg, ) assert decision.accepted is True +def test_adaptive_with_n_days_one_uses_history_for_percentile() -> None: + """Singolo giorno disponibile (cadenza qualunque): gate attivo, + soglia = P25 della finestra ricevuta. Dimostra che il warmup hard + finisce a n_days=1 (non 30 come nella vecchia implementazione).""" + cfg = _adaptive_cfg() + history = tuple(Decimal(i) for i in range(1, 101)) # 1..100, P25 = 25.75 + # IV-RV sopra P25 → pass + pass_decision = validate_entry( + _good_ctx( + iv_minus_rv=Decimal("30"), + iv_rv_history=history, + iv_rv_n_days=1, + ), + cfg, + ) + assert pass_decision.accepted is True + # IV-RV sotto P25 → block + block_decision = validate_entry( + _good_ctx( + iv_minus_rv=Decimal("10"), + iv_rv_history=history, + iv_rv_n_days=1, + ), + cfg, + ) + assert block_decision.accepted is False + assert any("IV richness" in r and "rolling" in r for r in block_decision.reasons) + + # --------------------------------------------------------------------------- # Vol-of-Vol guard # --------------------------------------------------------------------------- diff --git a/tests/unit/test_repository_iv_rv_helpers.py b/tests/unit/test_repository_iv_rv_helpers.py index 0f8a0ac..14f0701 100644 --- a/tests/unit/test_repository_iv_rv_helpers.py +++ b/tests/unit/test_repository_iv_rv_helpers.py @@ -1,4 +1,15 @@ -"""TDD per Repository.iv_rv_history e Repository.dvol_lookback.""" +"""TDD per i nuovi helper repository del gate IV-RV adattivo. + +Spec: distinct-days policy — il caller (entry_cycle) interroga il +numero di giorni coperti separatamente dai valori della finestra, +così che cadenze miste (tick live 15min + backfill daily) restino +statisticamente coerenti. + +Helpers: + * ``count_iv_rv_distinct_days(asset, max_days, as_of) -> int`` + * ``iv_rv_values_for_window(asset, window_days, as_of) -> list[Decimal]`` + * ``dvol_lookback`` (invariato, riusato dal Vol-of-Vol guard) +""" from __future__ import annotations @@ -13,178 +24,372 @@ from cerbero_bite.state.models import MarketSnapshotRecord from cerbero_bite.state.repository import Repository -@pytest.fixture -def db_with_history(tmp_path) -> sqlite3.Connection: - """SQLite temp con 96 tick ETH a 15min ciascuno (1 giorno) e fetch_ok=1.""" - db_path = tmp_path / "test.sqlite" - conn = connect(str(db_path)) - run_migrations(conn) +def _snap( + *, + ts: datetime, + asset: str = "ETH", + iv_minus_rv: Decimal | None = Decimal("2"), + fetch_ok: bool = True, + dvol: Decimal = Decimal("50"), +) -> MarketSnapshotRecord: + return MarketSnapshotRecord( + timestamp=ts, + asset=asset, + spot=Decimal("2000"), + dvol=dvol, + realized_vol_30d=Decimal("48"), + iv_minus_rv=iv_minus_rv, + funding_perp_annualized=Decimal("0"), + funding_cross_annualized=Decimal("0"), + dealer_net_gamma=Decimal("0"), + gamma_flip_level=None, + oi_delta_pct_4h=None, + liquidation_long_risk="low", + liquidation_short_risk="low", + macro_days_to_event=None, + fetch_ok=fetch_ok, + fetch_errors_json=None, + ) + +@pytest.fixture +def db_one_day(tmp_path) -> sqlite3.Connection: + """SQLite temp con 96 tick ETH a 15min (1 giorno) e fetch_ok=1.""" + conn = connect(str(tmp_path / "test.sqlite")) + run_migrations(conn) repo = Repository() base = datetime(2026, 5, 1, 0, 0, tzinfo=UTC) for i in range(96): repo.record_market_snapshot( conn, - MarketSnapshotRecord( - timestamp=base + timedelta(minutes=15 * i), - asset="ETH", - spot=Decimal("2000"), - dvol=Decimal("50") + Decimal(i) / Decimal("10"), - realized_vol_30d=Decimal("48"), + _snap( + ts=base + timedelta(minutes=15 * i), iv_minus_rv=Decimal("2") + Decimal(i) / Decimal("100"), - funding_perp_annualized=Decimal("0"), - funding_cross_annualized=Decimal("0"), - dealer_net_gamma=Decimal("0"), - gamma_flip_level=None, - oi_delta_pct_4h=None, - liquidation_long_risk="low", - liquidation_short_risk="low", - macro_days_to_event=None, - fetch_ok=True, - fetch_errors_json=None, + dvol=Decimal("50") + Decimal(i) / Decimal("10"), ), ) conn.commit() return conn -def test_iv_rv_history_returns_ordered_asc(db_with_history) -> None: +@pytest.fixture +def db_three_days_mixed(tmp_path) -> sqlite3.Connection: + """SQLite temp con 3 giorni ETH: + - day1 (2026-05-01): 96 tick @ 15min, valori 1..96 + - day2 (2026-05-02): 1 record daily a 12:00, valore 100 (backfill style) + - day3 (2026-05-03): 4 tick orari, valori 200, 201, 202, 203 + Più 1 giorno BTC isolato (per cross-asset isolation). + """ + conn = connect(str(tmp_path / "test.sqlite")) + run_migrations(conn) repo = Repository() - history = repo.iv_rv_history( - db_with_history, + + day1 = datetime(2026, 5, 1, 0, 0, tzinfo=UTC) + for i in range(96): + repo.record_market_snapshot( + conn, + _snap( + ts=day1 + timedelta(minutes=15 * i), + iv_minus_rv=Decimal(i + 1), + ), + ) + repo.record_market_snapshot( + conn, + _snap(ts=datetime(2026, 5, 2, 12, 0, tzinfo=UTC), iv_minus_rv=Decimal("100")), + ) + day3 = datetime(2026, 5, 3, 0, 0, tzinfo=UTC) + for i in range(4): + repo.record_market_snapshot( + conn, + _snap( + ts=day3 + timedelta(hours=i), + iv_minus_rv=Decimal(200 + i), + ), + ) + repo.record_market_snapshot( + conn, + _snap( + ts=datetime(2026, 4, 30, 0, 0, tzinfo=UTC), + asset="BTC", + iv_minus_rv=Decimal("999"), + ), + ) + conn.commit() + return conn + + +# --------------------------------------------------------------------------- +# count_iv_rv_distinct_days +# --------------------------------------------------------------------------- + + +def test_count_distinct_days_returns_one_for_single_day_history(db_one_day) -> None: + repo = Repository() + n = repo.count_iv_rv_distinct_days( + db_one_day, asset="ETH", max_days=60, as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC), ) - assert len(history) == 96 - assert history == sorted(history) - assert history[0] == Decimal("2.00") + assert n == 1 -def test_iv_rv_history_filters_other_asset(db_with_history) -> None: +def test_count_distinct_days_returns_zero_for_other_asset(db_one_day) -> None: repo = Repository() - history = repo.iv_rv_history( - db_with_history, + n = repo.count_iv_rv_distinct_days( + db_one_day, asset="BTC", max_days=60, as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC), ) - assert history == [] + assert n == 0 -def test_iv_rv_history_skips_null_values(db_with_history) -> None: +def test_count_distinct_days_counts_unique_calendar_days( + db_three_days_mixed, +) -> None: repo = Repository() - repo.record_market_snapshot( - db_with_history, - MarketSnapshotRecord( - timestamp=datetime(2026, 5, 2, 0, 0, tzinfo=UTC), - asset="ETH", - spot=Decimal("2000"), - dvol=Decimal("50"), - realized_vol_30d=None, - iv_minus_rv=None, - funding_perp_annualized=Decimal("0"), - funding_cross_annualized=Decimal("0"), - dealer_net_gamma=Decimal("0"), - gamma_flip_level=None, - oi_delta_pct_4h=None, - liquidation_long_risk="low", - liquidation_short_risk="low", - macro_days_to_event=None, - fetch_ok=True, - fetch_errors_json=None, - ), - ) - db_with_history.commit() - history = repo.iv_rv_history( - db_with_history, - asset="ETH", - max_days=60, - as_of=datetime(2026, 5, 3, 0, 0, tzinfo=UTC), - ) - assert len(history) == 96 - - -def test_iv_rv_history_skips_fetch_failed(db_with_history) -> None: - repo = Repository() - repo.record_market_snapshot( - db_with_history, - MarketSnapshotRecord( - timestamp=datetime(2026, 5, 3, 0, 0, tzinfo=UTC), - asset="ETH", - spot=Decimal("2000"), - dvol=Decimal("50"), - realized_vol_30d=None, - iv_minus_rv=Decimal("99"), - funding_perp_annualized=Decimal("0"), - funding_cross_annualized=Decimal("0"), - dealer_net_gamma=None, - gamma_flip_level=None, - oi_delta_pct_4h=None, - liquidation_long_risk=None, - liquidation_short_risk=None, - macro_days_to_event=None, - fetch_ok=False, - fetch_errors_json='{"x":"y"}', - ), - ) - db_with_history.commit() - history = repo.iv_rv_history( - db_with_history, + n = repo.count_iv_rv_distinct_days( + db_three_days_mixed, asset="ETH", max_days=60, as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC), ) - assert Decimal("99") not in history + assert n == 3 -def test_dvol_lookback_returns_closest_tick(db_with_history) -> None: +def test_count_distinct_days_excludes_other_assets( + db_three_days_mixed, +) -> None: repo = Repository() - base = datetime(2026, 5, 1, 0, 0, tzinfo=UTC) - target = base + timedelta(hours=12) - out = repo.dvol_lookback( - db_with_history, asset="ETH", reference=target, tolerance_minutes=15 + n_btc = repo.count_iv_rv_distinct_days( + db_three_days_mixed, + asset="BTC", + max_days=60, + as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC), ) - # i=48 → dvol = 50 + 4.8 = 54.8 - assert out == Decimal("54.8") + assert n_btc == 1 -def test_dvol_lookback_returns_none_when_gap(db_with_history) -> None: +def test_count_distinct_days_respects_window_cutoff( + db_three_days_mixed, +) -> None: + """max_days=1 da as_of=2026-05-04 → cutoff=2026-05-03 → solo day3.""" repo = Repository() - target = datetime(2025, 1, 1, 0, 0, tzinfo=UTC) - out = repo.dvol_lookback( - db_with_history, asset="ETH", reference=target, tolerance_minutes=15 + n = repo.count_iv_rv_distinct_days( + db_three_days_mixed, + asset="ETH", + max_days=1, + as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC), ) - assert out is None + assert n == 1 -def test_iv_rv_history_rejects_non_positive_max_days(db_with_history) -> None: +def test_count_distinct_days_excludes_null_iv_rv(tmp_path) -> None: + conn = connect(str(tmp_path / "test.sqlite")) + run_migrations(conn) repo = Repository() - with pytest.raises(ValueError, match="max_days must be positive"): - repo.iv_rv_history( - db_with_history, - asset="ETH", - max_days=0, - as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC), - ) + repo.record_market_snapshot( + conn, + _snap(ts=datetime(2026, 5, 1, 12, 0, tzinfo=UTC), iv_minus_rv=None), + ) + conn.commit() + n = repo.count_iv_rv_distinct_days( + conn, + asset="ETH", + max_days=60, + as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC), + ) + assert n == 0 -def test_iv_rv_history_rejects_naive_as_of(db_with_history) -> None: +def test_count_distinct_days_excludes_fetch_failed(tmp_path) -> None: + conn = connect(str(tmp_path / "test.sqlite")) + run_migrations(conn) + repo = Repository() + repo.record_market_snapshot( + conn, + _snap( + ts=datetime(2026, 5, 1, 12, 0, tzinfo=UTC), + iv_minus_rv=Decimal("99"), + fetch_ok=False, + ), + ) + conn.commit() + n = repo.count_iv_rv_distinct_days( + conn, + asset="ETH", + max_days=60, + as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC), + ) + assert n == 0 + + +def test_count_distinct_days_rejects_naive_as_of(db_one_day) -> None: repo = Repository() with pytest.raises(ValueError, match="timezone-aware"): - repo.iv_rv_history( - db_with_history, + repo.count_iv_rv_distinct_days( + db_one_day, asset="ETH", max_days=60, as_of=datetime(2026, 5, 2, 0, 0), # naive ) -def test_dvol_lookback_rejects_naive_reference(db_with_history) -> None: +def test_count_distinct_days_rejects_non_positive_max_days(db_one_day) -> None: + repo = Repository() + with pytest.raises(ValueError, match="max_days must be positive"): + repo.count_iv_rv_distinct_days( + db_one_day, + asset="ETH", + max_days=0, + as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC), + ) + + +# --------------------------------------------------------------------------- +# iv_rv_values_for_window +# --------------------------------------------------------------------------- + + +def test_values_for_window_returns_ordered_asc(db_one_day) -> None: + repo = Repository() + values = repo.iv_rv_values_for_window( + db_one_day, + asset="ETH", + window_days=60, + as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC), + ) + assert len(values) == 96 + assert values == sorted(values) + assert values[0] == Decimal("2.00") + + +def test_values_for_window_filters_other_asset(db_one_day) -> None: + repo = Repository() + values = repo.iv_rv_values_for_window( + db_one_day, + asset="BTC", + window_days=60, + as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC), + ) + assert values == [] + + +def test_values_for_window_skips_null(db_one_day) -> None: + repo = Repository() + repo.record_market_snapshot( + db_one_day, + _snap(ts=datetime(2026, 5, 2, 0, 0, tzinfo=UTC), iv_minus_rv=None), + ) + db_one_day.commit() + values = repo.iv_rv_values_for_window( + db_one_day, + asset="ETH", + window_days=60, + as_of=datetime(2026, 5, 3, 0, 0, tzinfo=UTC), + ) + assert len(values) == 96 + + +def test_values_for_window_skips_fetch_failed(db_one_day) -> None: + repo = Repository() + repo.record_market_snapshot( + db_one_day, + _snap( + ts=datetime(2026, 5, 3, 0, 0, tzinfo=UTC), + iv_minus_rv=Decimal("99"), + fetch_ok=False, + ), + ) + db_one_day.commit() + values = repo.iv_rv_values_for_window( + db_one_day, + asset="ETH", + window_days=60, + as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC), + ) + assert Decimal("99") not in values + + +def test_values_for_window_respects_window_cutoff( + db_three_days_mixed, +) -> None: + """window_days=1 da as_of=2026-05-04 → solo day3 (4 valori 200..203).""" + repo = Repository() + values = repo.iv_rv_values_for_window( + db_three_days_mixed, + asset="ETH", + window_days=1, + as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC), + ) + assert values == [Decimal(200 + i) for i in range(4)] + + +def test_values_for_window_full_window(db_three_days_mixed) -> None: + """window_days=60: tutti i valori dei 3 giorni (96 + 1 + 4 = 101).""" + repo = Repository() + values = repo.iv_rv_values_for_window( + db_three_days_mixed, + asset="ETH", + window_days=60, + as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC), + ) + assert len(values) == 101 + + +def test_values_for_window_rejects_naive_as_of(db_one_day) -> None: + repo = Repository() + with pytest.raises(ValueError, match="timezone-aware"): + repo.iv_rv_values_for_window( + db_one_day, + asset="ETH", + window_days=60, + as_of=datetime(2026, 5, 2, 0, 0), + ) + + +def test_values_for_window_rejects_non_positive_window(db_one_day) -> None: + repo = Repository() + with pytest.raises(ValueError, match="window_days must be positive"): + repo.iv_rv_values_for_window( + db_one_day, + asset="ETH", + window_days=0, + as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC), + ) + + +# --------------------------------------------------------------------------- +# dvol_lookback (regression — invariato dopo refactor) +# --------------------------------------------------------------------------- + + +def test_dvol_lookback_returns_closest_tick(db_one_day) -> None: + repo = Repository() + base = datetime(2026, 5, 1, 0, 0, tzinfo=UTC) + target = base + timedelta(hours=12) + out = repo.dvol_lookback( + db_one_day, asset="ETH", reference=target, tolerance_minutes=15 + ) + # i=48 → dvol = 50 + 4.8 = 54.8 + assert out == Decimal("54.8") + + +def test_dvol_lookback_returns_none_when_gap(db_one_day) -> None: + repo = Repository() + target = datetime(2025, 1, 1, 0, 0, tzinfo=UTC) + out = repo.dvol_lookback( + db_one_day, asset="ETH", reference=target, tolerance_minutes=15 + ) + assert out is None + + +def test_dvol_lookback_rejects_naive_reference(db_one_day) -> None: repo = Repository() with pytest.raises(ValueError, match="timezone-aware"): repo.dvol_lookback( - db_with_history, + db_one_day, asset="ETH", - reference=datetime(2026, 5, 1, 12, 0), # naive + reference=datetime(2026, 5, 1, 12, 0), )