23 Commits

Author SHA1 Message Date
root 76d1a4a32d chore(gitignore): ignore .omc/ (oh-my-claudecode session/memory dir)
Directory creata localmente dall'infrastruttura OMC per stato sessione
(project-memory.json, research/, sessions/, state/). Non è artefatto
del progetto cerbero-bite.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 09:08:00 +00:00
root e978a44bff feat(gui): Strategia pannello P/L con slider sizing + fix max_loss
Pannello "P/L atteso — Conservativa vs Aggressiva":
* Sostituiti slider Capitale/Spot con slider parametrici Cap/trade
  (EUR) + posizioni concorrenti. Il capitale richiesto viene calcolato
  in automatico via Kelly-binding aggregato:
  capital = cap_pertrade_usd × concorrenza / max(kelly, 1e-3).
* Profili Conservativa/Aggressiva ora ereditano dai yaml SOLO le leve
  qualitative (width_pct, credit_ratio, kelly_fraction, feature
  attive); le leve di sizing (cap, concorrenza) sono comandate dagli
  slider per confronti omogenei.
* Tre metriche header: capitale richiesto, cap aggregato notional,
  cap per trade USD.

Fix in `_compute_pl`:
* Max loss per contratto era `width` (errato per credit spread).
  Corretto a `width − credit` allineato a core/sizing_engine.py.
  Effetto: n_kelly aumenta proporzionalmente al credit incassato →
  P/L stimato più realistico per spread con credit_to_width_ratio
  alto (es. 0.30+ in profilo Aggressiva).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 09:00:42 +00:00
root efa829f7aa feat(runtime): orchestrator option-chain snapshot multi-asset (ETH+BTC)
Sostituisce `option_chain_asset: str = "ETH"` con
`option_chain_assets: tuple[str, ...] = ("ETH", "BTC")` e itera nel
job schedulato. Coerente con `market_snapshot_assets` già multi-asset
e con i 64 strikes BTC + 51 strikes ETH già visibili in
option_chain_snapshots.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 09:00:24 +00:00
root b1836d91c2 refactor(core): IV-RV adattivo distinct-days policy + backfill Deribit
Sblocca il warmup hard del gate IV-RV adattivo (~21 giorni residui)
permettendo di mischiare cadenze diverse (tick live 15min + backfill
giornaliero) senza assumere il fattore costante 96 tick/giorno.

API change (no backwards-compat shims):
* compute_adaptive_threshold(history, *, n_days, percentile,
  absolute_floor): rimossi `min_days`/`target_days`. La selezione
  finestra (target_days/min_days/intera storia) si sposta al caller.
  Warmup hard quando `n_days == 0`.
* repository: rimosso `iv_rv_history`; aggiunti
  `count_iv_rv_distinct_days` (COUNT DISTINCT substr(ts,1,10)) e
  `iv_rv_values_for_window`.
* EntryContext aggiunge `iv_rv_n_days: int = 0`. entry_cycle calcola
  n_days, sceglie window_days e popola il context. Audit
  `iv_rv_n_days` reale (non più len/96).
* GUI Calibrazione: counter giorni distinti tramite set di date.
* Spec aggiornata con errata 2026-05-10 e nuova warmup table.

Backfill (scripts/backfill_iv_rv.py, stdlib-only):
* Fetch DVOL daily + ETH/BTC-PERPETUAL closes da Deribit public REST.
* Calcolo RV30d annualizzato (stdev log-return × √365 × 100).
* INSERT OR REPLACE in market_snapshots con timestamp 12:00 UTC e
  fetch_errors_json='{"backfill":true}' per distinzione audit.
* Compute layer testato (9 test): RV su prezzi costanti/monotoni/
  alternati, build_records con cutoff e missing data.

Verifica live post-deploy (10 mag 2026 08:50 UTC):
* ETH: n_days=46, P25=2.21 vol pt, IV-RV=10.05 → gate PASS
* BTC: n_days=46, P25=5.69 vol pt, IV-RV=8.60  → gate PASS

509 test passati (500 esistenti + 9 backfill), ruff pulito.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 08:52:05 +00:00
root 6f4f2ce02e feat(runtime): audit log include threshold rolling e window usata
Risponde al final review (spec §6.4): il decisions log ora
contiene iv_rv_threshold_used (la soglia P_q effettivamente
applicata) e iv_rv_window_used_days (giorni di history nella
finestra). Permette ricostruire ex-post perché un trade è stato
saltato e con quali numeri.

Helper privi di I/O — la soglia viene ricomputata in base alla
history già caricata, costo trascurabile.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:27:40 +00:00
root d2ff29fde3 fix(config): aggressiva config_hash ricalcolato post adaptive gate
L'hash dichiarato non rispecchiava più il contenuto del file dopo
l'attivazione del gate adattivo (commit 080acf8). Senza questo fix
il loader sollevava ConfigHashError e l'orchestrator rifiutava il
profilo Aggressiva al boot.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:26:24 +00:00
root eb0662e44d chore(lint): adaptive_threshold imports + entry_validator top-level import
- Sequence importato da collections.abc invece di typing (PYI001).
- compute_adaptive_threshold spostato a top-level (PLC0415):
  niente circular dep risk perché adaptive_threshold non importa
  da entry_validator.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:20:54 +00:00
root 64f4d4e09e feat(gui): pannello informativo Gate IV-RV adattivo in Calibrazione
Mostra status (warmup/attivo), soglia P25 rolling corrente, IV-RV
ultimo tick, floor assoluto, decisione hypothetical e sezione
Vol-of-Vol guard. Read-only: i percentili statici esistenti
restano per analisi.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:14:34 +00:00
root 080acf829d feat(config): profilo Aggressiva attiva gate IV-RV adattivo + VoV
P25 rolling 60g, warmup a finestra disponibile, VoV guard 5pt.
Conservativa e golden invariati (default disabled).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:11:46 +00:00
root 98111814d2 test(integration): IV-RV adaptive gate end-to-end con SQLite reale
Verifica integrazione tra Repository.iv_rv_history,
compute_adaptive_threshold e dvol_lookback su un DB reale
seedato con 30 giorni di market_snapshots bimodale.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:09:27 +00:00
root 3190764f64 feat(runtime): entry_cycle popola iv_rv_history e dvol_24h_ago
Quando i flag adaptive_enabled / vol_of_vol_guard_enabled sono
attivi, entry_cycle carica history e lookback dal repository
prima di costruire EntryContext. Il decisions log riceve i meta
n_history e dvol_24h_ago per audit ex-post.

Quando i flag sono off, niente query DB extra (zero overhead).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:04:19 +00:00
root 8221aba10f fix(state): repository iv_rv_history time-stable + input validation
Risponde alla code review di 395191e:
- iv_rv_history accetta as_of (default now UTC) invece di
  affidarsi al clock SQLite, rendendo i test time-stable.
- Valida max_days > 0 e raise se as_of/reference sono naive.
- Aggiunge 3 test sulle nuove guard.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:00:06 +00:00
root 395191ea13 feat(state): Repository.iv_rv_history + dvol_lookback per gate adaptive
Due nuovi metodi che leggono market_snapshots filtrando NULL e
fetch_ok=0. iv_rv_history limita a max_days; dvol_lookback trova
il tick più vicino a un istante con tolerance configurabile.

Tests: ordered ASC, asset filter, NULL skip, fetch_ok=0 skip,
lookback closest, gap returns None.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 22:53:19 +00:00
root d36cdff609 feat(core): Vol-of-Vol guard in validate_entry + tests
Blocca entry se |DVOL_now - DVOL_24h_ago| >= threshold (default
5 pt). Fail-open quando dvol_24h_ago è None (gap dati). Independente
dal gate IV-RV: i due gate sono additivi.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 22:36:59 +00:00
root 3a5cf2554b feat(core): IV-RV adaptive gate in validate_entry + tests
Quando iv_minus_rv_adaptive_enabled=True, la soglia diventa
max(P_q rolling, iv_minus_rv_min). Path legacy (statico) e
None-bypass restano invariati.

Aggiunge anche due model_validator a StrategyConfig per
fail-fast su config invalida (window_min_days < target_days,
percentile in (0,1)) — risponde alla code review T1.

Tests: pass/skip su rolling, warmup hard, floor binding,
backwards compat statico, None bypass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 22:29:48 +00:00
root ef3c512684 feat(core): EntryContext aggiunge iv_rv_history e dvol_24h_ago
Campi opzionali con default vuoto/None per non rompere i caller
esistenti. Saranno popolati da entry_cycle quando i flag
adaptive_enabled / vol_of_vol_guard_enabled sono True.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 22:24:43 +00:00
root 6eff8aab0f fix(core): adaptive_threshold input validation + boundary tests
Risponde alla code review di 7dc2fda:
- Valida percentile in [0,1] e 0 < min_days < target_days, raise
  ValueError quando out-of-range. Fail-fast invece di IndexError o
  silent wrong result.
- Aggiunge test boundary esattamente a min_days*96 e target_days*96
  (spec §9.1 item 9 era mancante).
- Aggiunge 4 test sulle nuove guards.
- Fix typo docstring (Determinismic → Deterministic).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 22:11:17 +00:00
root 7dc2fda524 feat(core): compute_adaptive_threshold pure function + tests
Implementa il calcolo del percentile rolling con warmup,
transizione min_days → target_days e floor assoluto. Pure
function senza I/O: il caller passa la sequenza pre-filtrata
(NULL e fetch_ok=0 esclusi).

Tests: warmup, transizione finestra, floor, percentili.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 21:36:50 +00:00
root 0fcfff7d7e feat(config): EntryConfig campi adaptive IV-RV gate + VoV guard
Aggiunge i flag e i parametri per il gate IV-RV adattivo (P25
rolling) e per il Vol-of-Vol guard. Default disabilitati per
non cambiare comportamento dei profili attuali.

Vedi docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 20:10:07 +00:00
root f889258952 docs(plans): IV-RV adaptive gate implementation plan
Piano TDD bite-sized in 11 task con steps dettagliati, codice
completo, comandi e expected output. Coverage completa dello
spec 2026-05-08-iv-rv-adaptive-gate-design.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 19:58:32 +00:00
root 2a4a82c8ef docs(specs): IV-RV adaptive gate design
Spec del gate IV-RV adattivo (P25 rolling 60g + Vol-of-Vol guard 5pt
24h) — riprende roadmap §4-quater di 13-strategia-spiegata.md punti
1 e 2 e li promuove a design pronto per implementazione.

Decisioni emerse dal brainstorming:
- Hybrid (percentile rolling + VoV guard), non regime detection
- Window target 60g, min 30g, sotto usa storia disponibile (warmup)
- Floor assoluto via vecchio iv_minus_rv_min (backwards compat)
- Inline nel validator, stateless, no DB cache
- GUI Calibrazione: pannello informativo, slider esistenti invariati
- Fail-open su tutti i casi di dato mancante

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 19:50:56 +00:00
root 467c8952e3 docs(migrations): 0005 — commento allineato a cadenza */15 reale
Il commento dichiarava cron settimanale (55 13 * * MON) ma lo
scheduler reale (orchestrator._CRON_OPTION_CHAIN_SNAPSHOT) è */15
24/7, allineato a market_snapshot. Aggiornato per evitare confusione
nei lettori futuri. Anche fixato l'header file (0004 → 0005).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 16:37:31 +00:00
root 3aaa059417 fix(deribit): DVOL midnight — finestra 1D estesa a ieri+oggi
Alle 00:00 UTC Deribit non ha ancora costruito il candle 1D di oggi:
con start_date=oggi la response è vuota e il client tirava
McpDataAnomalyError ('neither latest nor candles'). Includendo ieri
nello start_date, candles[-1] resta valido come fallback.

Verificato sui dati raccolti: 3 fail consecutivi 2026-05-02/03/04 a
00:00 UTC su ETH, zero fail dal 2026-05-05 in poi (container
rebuildato in mezzo al periodo).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 16:37:23 +00:00
20 changed files with 3840 additions and 37 deletions
+3
View File
@@ -43,3 +43,6 @@ data/
.env
.env.*
!.env.example
# Tooling state (oh-my-claudecode session/memory dir)
.omc/
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,316 @@
# IV-RV adaptive entry gate — design
**Status**: drafted, awaiting implementation plan
**Date**: 2026-05-08
**Author**: brainstorming session (operator + Claude)
**Roadmap origin**: `docs/13-strategia-spiegata.md` §4-quater, hardening punti 1 e 2
## 1. Problema
Il gate IV-richness in `core/entry_validator.py:140-152` confronta `ctx.iv_minus_rv` con la soglia statica `entry.iv_minus_rv_min` (config). I dati raccolti in `market_snapshots` mostrano due problemi sul campione 2026-05-01 → 2026-05-08:
| metrica | ETH | BTC |
|---|---|---|
| IV-RV p25 | 1.87 | 5.48 |
| IV-RV p50 | 2.70 | 6.88 |
| IV-RV p90 | 8.52 | 8.26 |
| Pearson(\|drift%\|, IV-RV mean) | **0.02** | **+0.54** |
| Trend giornaliero IV-RV mean | 1.64 → 8.96 (+5.4×) | 4.78 → 8.11 (+1.7×) |
ETH mostra un IV richening monotono **decoupled dal drift realised** — la soglia statica `min=3` (profilo Aggressiva) avrebbe escluso il 50%+ dei tick nei primi 4 giorni e il 0% degli ultimi 3, sopra una distribuzione che non è stazionaria. Su BTC è meno drammatico ma il problema è strutturale: il regime di IV cambia, la soglia no.
## 2. Obiettivo
Sostituire la soglia statica con un meccanismo adattivo che si auto-calibra al regime corrente, senza richiedere intervento manuale dell'operatore. Il gate deve restare semanticamente "non vendere vol senza margine misurabile sopra la RV", solo che il "margine misurabile" è ora derivato dalla distribuzione storica recente invece che hardcoded.
## 3. Approccio scelto: Hybrid (P25 rolling + Vol-of-Vol guard)
Decisione presa nel brainstorming dopo aver scartato:
- **Solo percentile rolling**: insufficiente, non protegge da regime shift bruschi (DVOL salta di 5+ pt in 24h)
- **Solo regime detection (HMM/cluster)**: troppo opaco e ad alto rischio di overfit con 8 giorni di dati
L'hybrid bilancia due controlli additivi:
1. **Soglia adattiva** = P25 di IV-RV nella finestra rolling
2. **Vol-of-Vol guard** = blocco se |ΔDVOL_24h| ≥ 5 pt (regime shift detector)
## 4. Comportamento del gate
> **Errata 2026-05-10** — design originale assumeva `n_days = len(history) // 96` (cadenza fissa 96 tick/giorno). Refattorizzato a **distinct-days policy**: il caller interroga il repository per (a) il numero di giorni di calendario distinti coperti e (b) i valori della finestra scelta. Questo permette di mischiare cadenze (tick live 15 min + backfill daily) senza assumere un fattore costante. Sotto il pseudo-codice aggiornato.
```
def validate_iv_richness_adaptive(ctx, cfg, repo):
if not cfg.entry.iv_minus_rv_filter_enabled:
return PASS # gate off
# 1) Soglia adattiva — distinct-days policy
n_days = repo.count_iv_rv_distinct_days(
asset=ctx.asset, max_days=cfg.window_target_days,
)
if n_days < 1:
return PASS # warmup hard: nessun giorno coperto
if n_days >= cfg.window_target_days:
window_days = cfg.window_target_days # ≥60g → finestra fissa 60g
elif n_days >= cfg.window_min_days:
window_days = cfg.window_min_days # 30-60g → finestra fissa 30g
else:
window_days = cfg.window_target_days # 1-30g → query tutta la storia disp.
history = repo.iv_rv_values_for_window(
asset=ctx.asset, window_days=window_days,
)
threshold = max(percentile(history, cfg.percentile),
cfg.absolute_floor)
if ctx.iv_minus_rv < threshold:
return SKIP("IV richness below P25 rolling")
# 2) Vol-of-vol guard (additivo)
if cfg.vol_of_vol_guard_enabled:
dvol_24h_ago = repo.dvol_lookback(asset=ctx.asset, hours=24)
if dvol_24h_ago is not None and \
abs(ctx.dvol - dvol_24h_ago) >= cfg.vol_of_vol_threshold:
return SKIP("DVOL shifted ≥5pt in 24h")
return PASS
```
### 4.1 Warmup behavior
Tutte le soglie sono espresse in **giorni di calendario distinti** coperti da almeno un record valido (`fetch_ok=1``iv_minus_rv IS NOT NULL`).
| storia disponibile | finestra usata | comportamento |
|---|---|---|
| 0 giorni distinti | — | gate disabled (PASS), log `GATE_WARMUP_INSUFFICIENT` |
| 1 g ≤ giorni < 30 g | tutta la storia | percentile della finestra disponibile (decisione utente) |
| 30 g ≤ giorni < 60 g | ultimi 30 g | finestra fissa 30g |
| ≥ 60 g | ultimi 60 g | finestra fissa 60g (target) |
I valori della finestra contribuiscono uno-a-uno al percentile: un tick a 15 min e un record di backfill daily hanno lo stesso peso. Mix di cadenze diverse è statisticamente sbilanciato finché i tick live non saturano la finestra; questa è una scelta deliberata per non rinunciare allo storico backfill.
### 4.2 Soglia = `max(P25, floor)`
`floor` è il vecchio `iv_minus_rv_min` riutilizzato come *absolute floor*. Permette:
- backwards compat: se `adaptive_enabled=False`, comportamento identico ad oggi
- safety: anche se P25 storico fosse ≈0 (regime IV bassa persistente), l'operatore può tenere un floor minimo (es. 1 vol pt) per evitare di vendere vol mai
## 5. Schema config (`config/schema.py`)
Aggiunte alla classe `EntryConfig`:
```python
class EntryConfig(BaseModel):
# campi esistenti
iv_minus_rv_filter_enabled: bool = False
iv_minus_rv_min: Decimal = Decimal("0") # ora è absolute_floor
# nuovi — gate adattivo
iv_minus_rv_adaptive_enabled: bool = False
iv_minus_rv_percentile: Decimal = Decimal("0.25")
iv_minus_rv_window_target_days: int = 60
iv_minus_rv_window_min_days: int = 30
# nuovi — vol-of-vol guard
vol_of_vol_guard_enabled: bool = False
vol_of_vol_threshold_pt: Decimal = Decimal("5")
vol_of_vol_lookback_hours: int = 24
```
### 5.1 Profili predefiniti
**Conservativa / golden** (`config/golden.yaml`):
```yaml
entry:
iv_minus_rv_filter_enabled: false
iv_minus_rv_adaptive_enabled: false
vol_of_vol_guard_enabled: false
```
Comportamento invariato rispetto a oggi.
**Aggressiva** (`config/aggressive.yaml`):
```yaml
entry:
iv_minus_rv_filter_enabled: true
iv_minus_rv_adaptive_enabled: true
iv_minus_rv_min: 0 # floor 0, lascia decidere il P25 rolling
iv_minus_rv_percentile: 0.25
iv_minus_rv_window_target_days: 60
iv_minus_rv_window_min_days: 30
vol_of_vol_guard_enabled: true
vol_of_vol_threshold_pt: 5
```
### 5.2 Backwards compat
Se `iv_minus_rv_adaptive_enabled=False` e `iv_minus_rv_filter_enabled=True`, il validator usa il path legacy `iv_rv < iv_minus_rv_min` esattamente come oggi. Nessuna regressione comportamentale per chi non ha attivato l'adaptive.
## 6. Architettura
### 6.1 Modulo `core/adaptive_threshold.py`
Funzione pura, testabile senza I/O. La selezione della finestra è
delegata al caller (separation of concerns):
```python
def compute_adaptive_threshold(
history: Sequence[Decimal],
*,
n_days: int,
percentile: Decimal,
absolute_floor: Decimal,
) -> Decimal | None:
"""Ritorna None se warmup hard (n_days==0 o history vuota),
altrimenti max(P_q(history), absolute_floor)."""
```
### 6.2 Repository (`state/repository.py`)
Tre metodi su `Repository` (uno preesistente):
```python
def count_iv_rv_distinct_days(
self, *, asset: str, max_days: int, as_of: datetime | None = None,
) -> int:
"""Numero di giorni di calendario distinti con almeno un IV-RV
valido nell'intervallo [as_of - max_days, as_of]."""
def iv_rv_values_for_window(
self, *, asset: str, window_days: int, as_of: datetime | None = None,
) -> list[Decimal]:
"""Valori IV-RV ordinati ASC su [as_of - window_days, as_of]."""
def dvol_lookback(self, *, asset: str, hours: int) -> Decimal | None:
"""DVOL del tick più vicino a now-hours, ±15min tolerance. None se gap."""
```
Usa l'index esistente `idx_market_snapshots_asset_ts`. Nessuna nuova migration.
### 6.3 Inline nel validator
`core/entry_validator.py` chiama `compute_adaptive_threshold` con i dati dal repo. Nessun caching, stateless. La query per finestra 60g (5760 righe per asset) costa ms-level con index — non vale la pena introdurre cache da invalidare.
### 6.4 Audit / logging
Ogni entry cycle scrive in `decisions`:
- `inputs_json`: `{iv_rv_now, threshold_used, dvol_now, dvol_24h_ago, n_history, window_used_days}`
- `outputs_json`: `{gate: "iv_richness_adaptive", verdict: PASS|SKIP, reason}`
Permette ricostruzione ex-post: perché un trade è stato saltato e con quali numeri.
## 7. GUI Calibrazione (`pages/6_📐_Calibrazione.py`)
Aggiunta sezione "Gate adattivo" sopra ai percentili statici esistenti — questi ultimi NON vengono modificati (restano per analisi).
```
┌─ 🎯 Gate IV-RV adattivo ──────────────────────────────────┐
│ Status: 🟢 Attivo (Aggressiva) | 🟡 Warmup (n=8/30g) │
│ │
│ Soglia P25 rolling (corrente) 2.74 vol pts │
│ IV-RV ultimo tick 8.96 vol pts ✅ │
│ Floor assoluto 0.00 vol pts │
│ │
│ Evoluzione 7g (sparkline) ▁▂▂▃▄▆▇ │
│ │
│ ── VoV guard ── │
│ ΔDVOL ultime 24h 0.43 pt ✅ │
│ Soglia VoV 5.00 pt │
│ │
│ Decisione hypothetical: PASS │
└──────────────────────────────────────────────────────────┘
```
La GUI usa la stessa funzione `compute_adaptive_threshold` del validator → unica fonte di verità. Refresh manuale al page load (coerente con resto GUI).
## 8. Error handling
Principio: **fail-open** in tutti i casi di dato mancante. Il gate adattivo è additivo sopra ai gate hard esistenti (delta band, credit, ecc.); il dato mancante non deve trasformare un trade non voluto in trade fatto, ma neppure deve bloccare entry valide se è il dato del gate stesso a mancare.
| Scenario | Comportamento | Loggato come |
|---|---|---|
| `iv_rv_history` ritorna 0 righe | gate = PASS | `GATE_WARMUP_NO_DATA` |
| Storia < 96 tick (1g) | gate = PASS, threshold None | `GATE_WARMUP_INSUFFICIENT` |
| `dvol_lookback` = None (gap dati 24h fa) | VoV guard = PASS | `VOV_GUARD_NO_LOOKBACK` |
| `ctx.iv_minus_rv` = None | gate bypassato (riga 146 esistente) | invariato |
| `ctx.dvol` = None | VoV guard bypassato, gate adattivo prosegue | invariato |
## 9. Testing strategy
### 9.1 Unit — `core/adaptive_threshold.py`
- Warmup: `n_days=0` → None
- Warmup difensivo: `n_days=0` ma history non vuota → None
- Difensivo: history vuota con `n_days>0` → None
- `n_days=1`, 96 tick → P25 sui 96
- Mix di cadenze (30 daily + 96 live) → percentile uno-a-uno
- Floor binding: P25=0.5, floor=3 → 3
- Floor non binding: P25=5, floor=0 → 5
- Percentile diverso: percentile=0.5 → mediana
- Validation: percentile ∉ [0,1] o `n_days<0` → ValueError
### 9.1bis Unit — `state/repository.py`
- `count_iv_rv_distinct_days`: 1 giorno → 1; 3 giorni misti → 3
- esclusione asset diversi, NULL e fetch_ok=0
- rispetto del cutoff `max_days`
- ValueError su `as_of` naive o `max_days≤0`
- `iv_rv_values_for_window`: ordine ASC, filtri equivalenti, ValueError input
### 9.2 Unit — `core/entry_validator.py`
Mock repo, focus sul flusso decisionale:
- Adaptive disabled, statico passa → PASS legacy
- Adaptive disabled, statico fail → SKIP legacy
- Adaptive enabled, IV-RV sopra P25 → PASS
- Adaptive enabled, IV-RV sotto P25 sopra floor → SKIP("rolling")
- VoV guard ON, ΔDVOL=6 pt → SKIP("vov")
- VoV guard ON, ΔDVOL=4 pt, gate principale pass → PASS
- VoV guard ON, dvol_lookback=None → guard bypass
- ctx.iv_minus_rv=None → bypass
- decisions log popolato con threshold, n_history, dvol_lookback
### 9.3 Integration — `tests/integration/test_entry_cycle_adaptive.py`
SQLite temp + fixture market_snapshots con 5760 tick (60g):
- Aggressiva con flag adattivo → entry passa solo nei tick sopra P25 della fixture
- Golden → invariato
- Warmup: DB con 50 tick → tutti pass, log `GATE_WARMUP_INSUFFICIENT`
- Regime shift fixture: DVOL salta da 50 a 56 in 24h → VoV guard scatta
### 9.4 Backtest sanity
Aggiunta nel report del CLI `backtest`: count distinto skip-reasons (`iv_rv_static`, `iv_rv_rolling`, `vov_guard`) per analisi ex-post.
### 9.5 GUI smoke
Manuale al deploy:
- Calibrazione carica con `enabled=False` (fallback grafico)
- Calibrazione mostra warmup status quando DB < 30g
- Refresh ricalcola coerente
### 9.6 Cosa NON testiamo
- Performance query (5760 righe con index = trascurabile)
- Concorrenza entry cycle / GUI (WAL abilitato)
- Migrazione (nessuna tabella nuova)
## 10. Out of scope
- Regime detection avanzato (HMM, cluster) — esplicitamente scartato per opacità
- Soglie per-asset diverse — il P25 si calibra naturalmente per asset (history filtrata per asset)
- Auto-attivazione adaptive su Conservativa quando warmup è completo — l'operatore decide manualmente quando passare al profilo aggressivo
- Multi-asset (ETH+BTC simultanei) — già scope §4-ter, indipendente da questo design
- Override manuale soglia da GUI — explicit no, l'obiettivo è autocalibrante
## 11. Decisioni prese durante brainstorming
| # | Domanda | Scelta |
|---|---|---|
| 1 | Approccio | Hybrid (percentile + VoV guard) |
| 2 | Warmup | Percentile della finestra disponibile (anche se <30g) |
| 3 | Percentile | P25 (allineato roadmap) |
| 4 | Window | Target 60g, attivazione a 30g, sotto usa quel che c'è |
| 5 | VoV soglia | 5 pt vol in 24h |
| 6 | Architettura | Inline nel validator, stateless, no cache DB |
| 7 | GUI | Pannello informativo aggiunto, slider esistenti invariati |
+281
View File
@@ -0,0 +1,281 @@
"""Backfill IV-RV history from Deribit public REST API.
Use case: il gate IV-RV adattivo richiede ≥30 giorni di storia per
attivarsi (spec ``docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md``).
Quando la pipeline ha pochi giorni di tick live, questo script popola
``market_snapshots`` con record giornalieri storici calcolati da
DVOL Deribit + closes ETH-PERPETUAL/BTC-PERPETUAL pubblici.
Idempotente: usa ``INSERT OR REPLACE`` sulla PK ``(timestamp, asset)``
con timestamp fissato a 12:00 UTC del giorno di calendario.
``fetch_errors_json='{"backfill":true}'`` permette di distinguere i
record sintetici dai tick live in audit.
I record contribuiscono al gate adattivo come singoli punti
(distinct-days policy), uno per giorno: lo statistical bias è coperto
dalla spec §4.1.
Esempio:
python scripts/backfill_iv_rv.py --db data/state.sqlite --days 45
"""
from __future__ import annotations
import argparse
import json
import math
import sqlite3
import statistics
import urllib.request
from dataclasses import dataclass
from datetime import UTC, date, datetime, timedelta
from decimal import Decimal
__all__ = [
"BackfillRow",
"build_backfill_records",
"compute_rv30d_annualized",
]
_DERIBIT = "https://www.deribit.com/api/v2/public"
_RV_LOOKBACK_DAYS = 30
_TRADING_DAYS_PER_YEAR = 365
@dataclass(frozen=True)
class BackfillRow:
"""Una riga sintetica destinata a ``market_snapshots``."""
timestamp: datetime
asset: str
spot: Decimal
dvol: Decimal
realized_vol_30d: Decimal
iv_minus_rv: Decimal
fetch_ok: bool = True
# ---------------------------------------------------------------------------
# Pure compute layer (TDD: tests/unit/test_backfill_iv_rv.py)
# ---------------------------------------------------------------------------
def compute_rv30d_annualized(closes: list[Decimal]) -> Decimal:
"""Volatilità realizzata 30g annualizzata in **punti vol** (% annuali).
Args:
closes: ``31`` close consecutivi (uno al giorno) — produce 30
log-returns.
Returns:
``stdev(log_returns) * sqrt(365) * 100`` come ``Decimal``.
Raises:
ValueError: se ``len(closes) < 31``.
"""
if len(closes) < _RV_LOOKBACK_DAYS + 1:
raise ValueError(
f"need at least {_RV_LOOKBACK_DAYS + 1} closes, got {len(closes)}"
)
log_returns = [
math.log(float(closes[i] / closes[i - 1]))
for i in range(1, _RV_LOOKBACK_DAYS + 1)
]
sigma_daily = statistics.stdev(log_returns)
annualized = sigma_daily * math.sqrt(_TRADING_DAYS_PER_YEAR) * 100.0
return Decimal(str(annualized))
def build_backfill_records(
*,
asset: str,
spots_by_day: dict[str, Decimal],
dvols_by_day: dict[str, Decimal],
oldest_day: date,
) -> list[BackfillRow]:
"""Compone le righe di backfill per i giorni nella finestra richiesta.
Per ogni giorno target ``D`` (da ``oldest_day`` a oggi compreso) la
riga viene emessa solo se: (a) DVOL e spot sono presenti per ``D``,
(b) la serie di spot dispone dei 30 giorni precedenti necessari per
il calcolo di RV30d.
Il timestamp è fissato a 12:00 UTC, scelta che evita il rollover
delle candele Deribit (vedi anomalia DVOL 00:00 UTC nei market
snapshots live).
"""
sorted_days = sorted(spots_by_day.keys())
records: list[BackfillRow] = []
for day_str in sorted_days:
day = date.fromisoformat(day_str)
if day < oldest_day:
continue
if day_str not in dvols_by_day:
continue
rv_window = [
day - timedelta(days=i) for i in range(_RV_LOOKBACK_DAYS, -1, -1)
]
if not all(d.isoformat() in spots_by_day for d in rv_window):
continue
closes = [spots_by_day[d.isoformat()] for d in rv_window]
rv = compute_rv30d_annualized(closes)
dvol = dvols_by_day[day_str]
spot = spots_by_day[day_str]
records.append(
BackfillRow(
timestamp=datetime(day.year, day.month, day.day, 12, 0, tzinfo=UTC),
asset=asset,
spot=spot,
dvol=dvol,
realized_vol_30d=rv,
iv_minus_rv=dvol - rv,
)
)
return records
# ---------------------------------------------------------------------------
# I/O layer (network + sqlite)
# ---------------------------------------------------------------------------
def _http_get_json(url: str, timeout_s: float = 30.0) -> dict:
with urllib.request.urlopen(url, timeout=timeout_s) as resp:
return json.loads(resp.read())
def fetch_dvol_daily(currency: str, days: int) -> dict[str, Decimal]:
"""Mappa ``YYYY-MM-DD -> DVOL close`` per gli ultimi ``days`` giorni."""
end_ms = int(datetime.now(UTC).timestamp() * 1000)
start_ms = end_ms - days * 86_400_000
url = (
f"{_DERIBIT}/get_volatility_index_data"
f"?currency={currency}"
f"&start_timestamp={start_ms}&end_timestamp={end_ms}"
f"&resolution=86400"
)
payload = _http_get_json(url)
data = (payload.get("result") or {}).get("data") or []
out: dict[str, Decimal] = {}
for row in data:
# row = [ts_ms, open, high, low, close]
if not isinstance(row, list) or len(row) < 5:
continue
ts = datetime.fromtimestamp(row[0] / 1000, tz=UTC).date().isoformat()
out[ts] = Decimal(str(row[4]))
return out
def fetch_spot_daily(instrument: str, days: int) -> dict[str, Decimal]:
"""Mappa ``YYYY-MM-DD -> close USD`` per ``instrument`` su ``days`` giorni."""
end_ms = int(datetime.now(UTC).timestamp() * 1000)
start_ms = end_ms - days * 86_400_000
url = (
f"{_DERIBIT}/get_tradingview_chart_data"
f"?instrument_name={instrument}"
f"&start_timestamp={start_ms}&end_timestamp={end_ms}"
f"&resolution=1D"
)
payload = _http_get_json(url)
result = payload.get("result") or {}
ticks = result.get("ticks") or []
closes = result.get("close") or []
out: dict[str, Decimal] = {}
for ts_ms, close in zip(ticks, closes, strict=False):
ts = datetime.fromtimestamp(ts_ms / 1000, tz=UTC).date().isoformat()
out[ts] = Decimal(str(close))
return out
def write_records(db_path: str, records: list[BackfillRow]) -> int:
"""Insert/replace dei record in market_snapshots. Ritorna la rowcount."""
if not records:
return 0
conn = sqlite3.connect(db_path)
try:
with conn:
for r in records:
conn.execute(
"INSERT OR REPLACE INTO market_snapshots ("
"timestamp, asset, spot, dvol, realized_vol_30d, iv_minus_rv, "
"funding_perp_annualized, funding_cross_annualized, "
"dealer_net_gamma, gamma_flip_level, oi_delta_pct_4h, "
"liquidation_long_risk, liquidation_short_risk, "
"macro_days_to_event, fetch_ok, fetch_errors_json"
") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
(
r.timestamp.astimezone(UTC).isoformat(),
r.asset,
str(r.spot),
str(r.dvol),
str(r.realized_vol_30d),
str(r.iv_minus_rv),
None,
None,
None,
None,
None,
None,
None,
None,
1 if r.fetch_ok else 0,
'{"backfill":true}',
),
)
return len(records)
finally:
conn.close()
def backfill_asset(db_path: str, asset: str, days: int) -> int:
"""Esegue l'intero backfill per ``asset`` e ritorna il numero di
record inseriti/sostituiti.
"""
instrument = f"{asset.upper()}-PERPETUAL"
fetch_window_days = days + _RV_LOOKBACK_DAYS + 5 # margine per il lookback RV
spots = fetch_spot_daily(instrument, fetch_window_days)
dvols = fetch_dvol_daily(asset.upper(), fetch_window_days)
today = datetime.now(UTC).date()
oldest = today - timedelta(days=days)
records = build_backfill_records(
asset=asset.upper(),
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=oldest,
)
return write_records(db_path, records)
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--db",
default="data/state.sqlite",
help="path a state.sqlite (default: data/state.sqlite)",
)
parser.add_argument(
"--days",
type=int,
default=45,
help="quanti giorni di backfill emettere (default: 45)",
)
parser.add_argument(
"--assets",
nargs="+",
default=["ETH", "BTC"],
help="asset symbols (default: ETH BTC)",
)
args = parser.parse_args()
total = 0
for asset in args.assets:
n = backfill_asset(args.db, asset, args.days)
print(f"{asset}: inserted/replaced {n} backfill rows")
total += n
print(f"TOTAL: {total}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+5 -2
View File
@@ -9,7 +9,7 @@ the ``core/`` algorithms stay in their preferred numeric domain.
from __future__ import annotations
import re
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import Any, Literal
@@ -167,9 +167,12 @@ class DeribitClient:
) -> Decimal:
"""Return the latest DVOL value for ``currency``."""
when = (now or datetime.now(UTC)).astimezone(UTC)
# Window starts one day back so a tick fired exactly at 00:00 UTC
# — before Deribit has built today's 1D candle — still has
# yesterday's close to fall back on (see candles[-1] branch).
body = {
"currency": currency,
"start_date": (when.date()).isoformat(),
"start_date": (when.date() - timedelta(days=1)).isoformat(),
"end_date": when.date().isoformat(),
"resolution": "1D",
}
+28
View File
@@ -84,6 +84,24 @@ class EntryConfig(BaseModel):
iv_minus_rv_min: Decimal = Field(default=Decimal("0"))
iv_minus_rv_filter_enabled: bool = False
# IV richness gate adattivo (Phase 5+). Quando
# `iv_minus_rv_adaptive_enabled=True`, la soglia statica
# `iv_minus_rv_min` diventa il floor assoluto e la soglia
# effettiva è `max(P_q rolling, floor)` calcolata su
# `market_snapshots`. Vedi
# `docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md`.
iv_minus_rv_adaptive_enabled: bool = False
iv_minus_rv_percentile: Decimal = Field(default=Decimal("0.25"))
iv_minus_rv_window_target_days: int = 60
iv_minus_rv_window_min_days: int = 30
# Vol-of-Vol guard (§4-quater roadmap punto 2): blocca entry se
# |DVOL_now - DVOL_24h_ago| supera la soglia. Cattura regime
# shift bruschi non riflessi nel percentile rolling.
vol_of_vol_guard_enabled: bool = False
vol_of_vol_threshold_pt: Decimal = Field(default=Decimal("5"))
vol_of_vol_lookback_hours: int = 24
# ---------------------------------------------------------------------------
# Structure
@@ -389,6 +407,16 @@ class StrategyConfig(BaseModel):
if self.entry.dvol_min >= self.entry.dvol_max:
raise ValueError("dvol_min must be < dvol_max")
e = self.entry
if e.iv_minus_rv_window_min_days >= e.iv_minus_rv_window_target_days:
raise ValueError(
"iv_minus_rv_window_min_days must be < iv_minus_rv_window_target_days"
)
if not (Decimal("0") < e.iv_minus_rv_percentile < Decimal("1")):
raise ValueError(
"iv_minus_rv_percentile must be in (0, 1)"
)
return self
@@ -0,0 +1,75 @@
"""Funzione pura per calcolare la soglia adattiva del gate IV-RV.
Spec: ``docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md``.
Deterministic, no I/O. La selezione della finestra (target_days vs
min_days vs intera storia disponibile) è responsabilità del caller, che
interroga il repository con i parametri corretti e passa qui sia i
valori (``history``) sia il numero di giorni distinti coperti
(``n_days``). Questo permette di mischiare cadenze diverse — tick live a
15 min e backfill daily — senza assumere un fattore costante
``ticks_per_day``.
"""
from __future__ import annotations
from collections.abc import Sequence
from decimal import Decimal
__all__ = ["compute_adaptive_threshold"]
def compute_adaptive_threshold(
history: Sequence[Decimal],
*,
n_days: int,
percentile: Decimal,
absolute_floor: Decimal,
) -> Decimal | None:
"""Ritorna la soglia adattiva o ``None`` durante il warmup hard.
Args:
history: Sequenza dei valori IV-RV nella finestra scelta dal
caller. NULL e tick non riusciti devono essere già stati
filtrati upstream. L'ordine non è significativo per il
percentile.
n_days: Numero di giorni distinti coperti dalla storia
disponibile (calcolato dal caller, tipicamente con
``COUNT(DISTINCT date(timestamp))``). ``0`` → warmup hard.
percentile: Quantile target nella distribuzione (es. ``0.25``).
absolute_floor: Floor minimo applicato dopo il calcolo del
percentile. La soglia restituita è
``max(P_q, absolute_floor)``.
Returns:
``None`` se ``n_days == 0`` o ``history`` è vuota (warmup hard,
gate disabilitato), altrimenti il percentile della finestra
bounded dal floor.
"""
if not (Decimal(0) <= percentile <= Decimal(1)):
raise ValueError(
f"percentile must be in [0, 1], got {percentile}"
)
if n_days < 0:
raise ValueError(f"n_days must be >= 0, got {n_days}")
if n_days == 0 or not history:
return None
return max(_percentile(history, percentile), absolute_floor)
def _percentile(values: Sequence[Decimal], q: Decimal) -> Decimal:
"""Linear-interpolated percentile, NumPy-compatible (method='linear').
Implementato in Decimal puro per evitare dipendenze numpy nel core.
"""
if not values:
raise ValueError("percentile of empty sequence")
sorted_v = sorted(values)
n = len(sorted_v)
k = (Decimal(n) - Decimal(1)) * q
f = int(k) # floor
c = min(f + 1, n - 1)
if f == c:
return sorted_v[f]
frac = k - Decimal(f)
return sorted_v[f] + (sorted_v[c] - sorted_v[f]) * frac
+53 -6
View File
@@ -15,6 +15,7 @@ from decimal import Decimal
from pydantic import BaseModel, ConfigDict
from cerbero_bite.config import SpreadType, StrategyConfig
from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold
__all__ = [
"EntryContext",
@@ -50,6 +51,25 @@ class EntryContext(BaseModel):
# invalida l'entry).
iv_minus_rv: Decimal | None = None
# Valori IV-RV nella finestra rolling già scelta dal caller
# (entry_cycle): tutti i record validi su window_days, ASC, NULL e
# fetch_ok=0 esclusi. Caricata dal repository quando
# `iv_minus_rv_adaptive_enabled` è True. Tuple per coerenza con
# frozen=True.
iv_rv_history: tuple[Decimal, ...] = ()
# Numero di giorni di calendario distinti coperti dalla storia
# IV-RV disponibile (non solo dalla finestra `iv_rv_history`).
# ``0`` = warmup hard, gate disabilitato (fail-open). Calcolato dal
# caller via `repository.count_iv_rv_distinct_days`.
iv_rv_n_days: int = 0
# DVOL al tick più vicino a now - vol_of_vol_lookback_hours.
# ``None`` = gap nel dato (es. cron mancante 24h fa) → VoV guard
# skip. Caricato dal repository in `entry_cycle` quando
# `vol_of_vol_guard_enabled` è True.
dvol_24h_ago: Decimal | None = None
class EntryDecision(BaseModel):
"""Result of :func:`validate_entry`. ``reasons`` holds *all* blocking reasons."""
@@ -140,17 +160,44 @@ def validate_entry(ctx: EntryContext, cfg: StrategyConfig) -> EntryDecision:
# §2.9: IV richness gate. Vendere vol senza un margine misurabile
# fra IV e RV è statisticamente neutro: l'edge della strategia
# esiste solo quando il premio è "ricco" rispetto a quanto il
# mercato si è effettivamente mosso.
if (
entry_cfg.iv_minus_rv_filter_enabled
and ctx.iv_minus_rv is not None
and ctx.iv_minus_rv < entry_cfg.iv_minus_rv_min
):
# mercato si è effettivamente mosso. La modalità adattiva calcola
# la soglia come max(P_q rolling, iv_minus_rv_min) sulla storia
# disponibile in market_snapshots; altrimenti fallback alla
# soglia statica `iv_minus_rv_min`.
if entry_cfg.iv_minus_rv_filter_enabled and ctx.iv_minus_rv is not None:
if entry_cfg.iv_minus_rv_adaptive_enabled:
threshold = compute_adaptive_threshold(
history=ctx.iv_rv_history,
n_days=ctx.iv_rv_n_days,
percentile=entry_cfg.iv_minus_rv_percentile,
absolute_floor=entry_cfg.iv_minus_rv_min,
)
if threshold is not None and ctx.iv_minus_rv < threshold:
pct = int(entry_cfg.iv_minus_rv_percentile * 100)
reasons.append(
f"IV richness below P{pct} rolling "
f"(IV-RV={ctx.iv_minus_rv} < {threshold} vol pts)"
)
elif ctx.iv_minus_rv < entry_cfg.iv_minus_rv_min:
reasons.append(
f"IV richness below floor "
f"(IV-RV={ctx.iv_minus_rv} < {entry_cfg.iv_minus_rv_min} vol pts)"
)
# §4-quater roadmap: vol-of-vol guard. Blocca entry quando il
# regime di volatilità sta cambiando bruscamente, anche se IV-RV
# è alto. Fail-open su gap dati 24h fa.
if (
entry_cfg.vol_of_vol_guard_enabled
and ctx.dvol_24h_ago is not None
):
delta = abs(ctx.dvol_now - ctx.dvol_24h_ago)
if delta >= entry_cfg.vol_of_vol_threshold_pt:
reasons.append(
f"DVOL shifted {delta} pt in {entry_cfg.vol_of_vol_lookback_hours}h "
f"(threshold {entry_cfg.vol_of_vol_threshold_pt})"
)
return EntryDecision(accepted=not reasons, reasons=reasons)
@@ -16,6 +16,7 @@ from __future__ import annotations
import os
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from pathlib import Path
import pandas as pd
@@ -23,6 +24,7 @@ import plotly.graph_objects as go
import streamlit as st
from cerbero_bite.config.loader import load_strategy
from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold
from cerbero_bite.gui.data_layer import (
DEFAULT_DB_PATH,
humanize_dt,
@@ -242,6 +244,132 @@ def _render_metric(spec: MetricSpec, records: list[MarketSnapshotRecord]) -> Non
_percentiles_strip(s)
def _render_adaptive_gate_panel(
strategy: object | None,
records: list[MarketSnapshotRecord],
) -> None:
"""Pannello informativo sul gate IV-RV adattivo (read-only)."""
if strategy is None:
return
try:
entry = strategy.entry # type: ignore[attr-defined]
except AttributeError:
return
if not getattr(entry, "iv_minus_rv_filter_enabled", False):
st.subheader("🎯 Gate IV-RV adattivo")
st.info("Gate IV-RV disabilitato nel profilo corrente.")
st.divider()
return
st.subheader("🎯 Gate IV-RV adattivo")
# records DESC (newest first) → history ASC con NULL/fetch_ok=0 esclusi
iv_rv_history: list[Decimal] = []
distinct_days: set[str] = set()
for r in reversed(records):
if r.fetch_ok and r.iv_minus_rv is not None:
iv_rv_history.append(r.iv_minus_rv)
distinct_days.add(r.timestamp.date().isoformat())
n_ticks = len(iv_rv_history)
n_days = len(distinct_days)
target = int(getattr(entry, "iv_minus_rv_window_target_days", 60))
min_days = int(getattr(entry, "iv_minus_rv_window_min_days", 30))
if n_days < 1:
status = "🟡 Warmup hard (nessun giorno coperto)"
elif n_days < min_days:
status = f"🟡 Warmup ({n_days}/{min_days}g — finestra crescente)"
elif n_days < target:
status = f"🟢 Attivo (finestra {min_days}g, target {target}g)"
else:
status = f"🟢 Attivo (finestra stabile {target}g)"
st.markdown(f"**Status:** {status} · {n_ticks} tick complessivi")
# Latest tick
iv_rv_now: Decimal | None = None
dvol_now: Decimal | None = None
latest_ts: datetime | None = None
for r in records: # records DESC
if r.fetch_ok:
iv_rv_now = r.iv_minus_rv
dvol_now = r.dvol
latest_ts = r.timestamp
break
adaptive_on = bool(getattr(entry, "iv_minus_rv_adaptive_enabled", False))
floor = Decimal(str(getattr(entry, "iv_minus_rv_min", "0")))
if adaptive_on:
percentile = Decimal(
str(getattr(entry, "iv_minus_rv_percentile", "0.25"))
)
try:
threshold = compute_adaptive_threshold(
history=iv_rv_history,
n_days=n_days,
percentile=percentile,
absolute_floor=floor,
)
except ValueError as exc:
st.warning(f"Configurazione gate non valida: {exc}")
threshold = None
c1, c2, c3 = st.columns(3)
pct_label = int(percentile * 100)
c1.metric(
f"Soglia P{pct_label} rolling",
f"{threshold:.2f}" if threshold is not None else "",
help="Soglia adattiva = max(percentile, floor)",
)
c2.metric(
"IV-RV ultimo tick",
f"{iv_rv_now:.2f}" if iv_rv_now is not None else "",
)
c3.metric("Floor assoluto", f"{floor:.2f}")
if threshold is not None and iv_rv_now is not None:
verdict = "✅ PASS" if iv_rv_now >= threshold else "❌ SKIP"
st.markdown(f"**Decisione hypothetical:** {verdict}")
else:
st.write(f"Modalità statica: floor = {floor} vol pts")
if bool(getattr(entry, "vol_of_vol_guard_enabled", False)):
st.markdown("---")
st.markdown("**Vol-of-Vol guard**")
threshold_pt = Decimal(
str(getattr(entry, "vol_of_vol_threshold_pt", "5"))
)
lookback_h = int(getattr(entry, "vol_of_vol_lookback_hours", 24))
# Find tick closest to latest_ts - lookback hours, tolerance 15 min
dvol_lookback: Decimal | None = None
if latest_ts is not None and dvol_now is not None:
target_ts = latest_ts - timedelta(hours=lookback_h)
best_delta = timedelta(minutes=15)
for r in records:
if not r.fetch_ok or r.dvol is None:
continue
d = abs(r.timestamp - target_ts)
if d <= best_delta:
best_delta = d
dvol_lookback = r.dvol
if dvol_lookback is not None and dvol_now is not None:
delta = abs(dvol_now - dvol_lookback)
c1, c2 = st.columns(2)
c1.metric(f"|ΔDVOL {lookback_h}h|", f"{delta:.2f}")
c2.metric("Soglia VoV", f"{threshold_pt:.2f}")
verdict = "✅ PASS" if delta < threshold_pt else "❌ SKIP"
st.markdown(f"**Verdict:** {verdict}")
else:
st.info(f"Lookback {lookback_h}h non disponibile (gap dati).")
st.divider()
def render() -> None:
st.title("📐 Calibrazione")
st.caption(
@@ -313,6 +441,8 @@ def render() -> None:
specs = _metric_specs(strategy)
_render_adaptive_gate_panel(strategy, records)
for spec in specs:
_render_metric(spec, records)
st.divider()
+60 -13
View File
@@ -476,7 +476,11 @@ def _compute_pl(
cap_pertrade_usd = caps["cap_pertrade_eur"] * eur_to_usd
risk_target = min(caps["kelly"] * capital, cap_pertrade_usd)
n_kelly = int(risk_target // width) if width > 0 else 0
# Max loss per contratto = width credit (NON width). Su un put
# spread incassi `credit` upfront, quindi la perdita massima è la
# larghezza meno il credito (vedi core/sizing_engine.py).
max_loss_per_contract = max(width - credit, 1e-6)
n_kelly = int(risk_target // max_loss_per_contract)
n_per_trade = max(0, min(n_kelly, int(caps["max_n"])))
prob_time_stop = 0.07
@@ -670,27 +674,33 @@ def _render_pl_panel(
"""Pannello P/L: confronto Conservativa vs Aggressiva sugli stessi slider."""
st.subheader("💰 P/L atteso — Conservativa vs Aggressiva")
st.caption(
"Stessi slider, due profili di sizing. **Conservativa** = la "
"golden config attuale (`strategy.yaml`). **Aggressiva** = "
"`strategy.aggressiva.yaml` con cap_per_trade 4×, max contratti "
"4×, 2 posizioni concorrenti. Le regole §2-§9 sono identiche; "
"cambiano SOLO le leve di sizing — quello che il P/L "
"conservativo lascia sul tavolo."
"Slider parametrici: scegli **cap per trade** e **posizioni "
"concorrenti**, il capitale richiesto viene calcolato in "
"automatico (Kelly-binding × concurrency / kelly_fraction). "
"Conservativa e Aggressiva ereditano dai rispettivi yaml SOLO "
"le leve qualitative (width_pct, credit_ratio, kelly_fraction, "
"feature attive); le leve di sizing (cap, concorrenza) le "
"controlli qui sotto."
)
col_a, col_b, col_c, col_d = st.columns(4)
capital = col_a.slider(
"Capitale (USD)", 720, 50_000, value=10_000, step=100
col_a, col_b, col_c, col_d, col_e = st.columns(5)
cap_per_trade_eur = col_a.slider(
"Cap/trade (EUR)", 50, 2000, value=200, step=10,
help="Massima perdita per singolo trade. Bound al rischio.",
)
spot = col_b.slider("Spot ETH (USD)", 1500, 6000, value=3000, step=100)
win_rate = col_c.slider(
concurrency_override = col_b.slider(
"Pos. concorrenti", 1, 10, value=3, step=1,
help="Quanti trade simultanei. Cap aggregato = cap/trade × N.",
)
spot = col_c.slider("Spot ETH (USD)", 1500, 6000, value=3000, step=100)
win_rate = col_d.slider(
"Win rate atteso", 0.50, 0.90, value=0.75, step=0.01,
help=(
"Senza filtri quant ≈ 0.650.70. CON filtri (dealer gamma>0, "
"no macro, IVRV>0, liquidation_*_risk≠high) sale a 0.750.80."
),
)
trades_per_year = col_d.slider(
trades_per_year = col_e.slider(
"Trade / anno (post-filtri)", 20, 200, value=110, step=5,
help=(
"Crypto è 24/7: l'entry cycle gira ogni giorno alle 14:00 UTC "
@@ -702,6 +712,43 @@ def _render_pl_panel(
cons_caps = _profile_caps(strategy_conservativa or strategy_main)
aggr_caps = _profile_caps(strategy_aggressiva)
# Override sizing dai slider (sostituisce le leve cap/trade,
# cap_aggregate, max_concurrent dei yaml).
eur_to_usd = 1.075
cap_pertrade_usd = cap_per_trade_eur * eur_to_usd
cap_aggregate_override = float(cap_per_trade_eur * concurrency_override)
cons_caps = {
**cons_caps,
"cap_pertrade_eur": float(cap_per_trade_eur),
"cap_aggregate_eur": cap_aggregate_override,
"max_concurrent": float(concurrency_override),
}
aggr_caps = {
**aggr_caps,
"cap_pertrade_eur": float(cap_per_trade_eur),
"cap_aggregate_eur": cap_aggregate_override,
"max_concurrent": float(concurrency_override),
}
# Capitale richiesto: Kelly-binding aggregato.
# Per ogni trade slot, kelly × capital ≥ cap_pertrade_usd → capital
# ≥ cap_pertrade_usd / kelly. Per N concorrenti, scala linearmente
# come limite conservativo del notional cumulato.
kelly_cons = cons_caps.get("kelly", 0.13)
kelly_aggr = aggr_caps.get("kelly", 0.13)
capital_cons = int(
cap_pertrade_usd * concurrency_override / max(kelly_cons, 1e-3)
)
capital_aggr = int(
cap_pertrade_usd * concurrency_override / max(kelly_aggr, 1e-3)
)
capital = max(capital_cons, capital_aggr)
cap_col1, cap_col2, cap_col3 = st.columns(3)
cap_col1.metric("📊 Capitale richiesto", f"${capital:,}")
cap_col2.metric(
"💸 Cap aggregato (notional)",
f"${int(cap_pertrade_usd * concurrency_override):,}",
)
cap_col3.metric("🎯 Cap per trade (USD)", f"${int(cap_pertrade_usd):,}")
cons_feats = _detect_features(strategy_conservativa or strategy_main)
aggr_feats = _detect_features(strategy_aggressiva)
+98
View File
@@ -30,6 +30,7 @@ from cerbero_bite.clients.macro import MacroClient
from cerbero_bite.clients.portfolio import PortfolioClient
from cerbero_bite.clients.sentiment import SentimentClient
from cerbero_bite.config.schema import StrategyConfig
from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold
from cerbero_bite.core.combo_builder import ComboProposal, build, select_strikes
from cerbero_bite.core.entry_validator import (
EntryContext,
@@ -315,6 +316,51 @@ async def _build_quotes(
return out
def _select_window_days(entry_cfg: object, n_days: int) -> int:
"""Sceglie la finestra in giorni per il gate adattivo dato n_days
disponibili.
Spec: warmup hard se ``n_days == 0`` 0; finestra ``target_days``
se ``n_days >= target_days``; ``min_days`` se ``n_days >= min_days``;
altrimenti tutta la storia disponibile (capped a ``target_days``).
"""
target = int(getattr(entry_cfg, "iv_minus_rv_window_target_days", 60))
min_days = int(getattr(entry_cfg, "iv_minus_rv_window_min_days", 30))
if n_days < 1:
return 0
if n_days >= target:
return target
if n_days >= min_days:
return min_days
return target # storia parziale: query fino a target, repository ne ritorna n_days
def _audit_threshold(
entry_cfg: object,
iv_rv_history: tuple[Decimal, ...],
n_days: int,
) -> str | None:
"""Soglia P_q rolling effettivamente usata dal gate, per il decisions log."""
if not getattr(entry_cfg, "iv_minus_rv_filter_enabled", False):
return None
if not getattr(entry_cfg, "iv_minus_rv_adaptive_enabled", False):
return str(getattr(entry_cfg, "iv_minus_rv_min", Decimal("0")))
threshold = compute_adaptive_threshold(
history=iv_rv_history,
n_days=n_days,
percentile=entry_cfg.iv_minus_rv_percentile, # type: ignore[attr-defined]
absolute_floor=entry_cfg.iv_minus_rv_min, # type: ignore[attr-defined]
)
return None if threshold is None else str(threshold)
def _audit_window_days(entry_cfg: object, n_days: int) -> int | None:
"""Numero di giorni effettivamente usati dalla finestra rolling."""
if not getattr(entry_cfg, "iv_minus_rv_adaptive_enabled", False):
return None
return _select_window_days(entry_cfg, n_days)
def _max_loss_per_contract_usd(short_strike: Decimal, long_strike: Decimal) -> Decimal:
return (short_strike - long_strike).copy_abs()
@@ -429,6 +475,44 @@ async def run_entry_cycle(
)
# 2. Entry filters
entry_cfg = cfg.entry
asset = cfg.asset.symbol
iv_rv_history: tuple[Decimal, ...] = ()
iv_rv_n_days: int = 0
dvol_24h_ago: Decimal | None = None
if entry_cfg.iv_minus_rv_filter_enabled and entry_cfg.iv_minus_rv_adaptive_enabled:
conn = connect_state(ctx.db_path)
try:
iv_rv_n_days = ctx.repository.count_iv_rv_distinct_days(
conn,
asset=asset,
max_days=entry_cfg.iv_minus_rv_window_target_days,
as_of=when,
)
window_days = _select_window_days(entry_cfg, iv_rv_n_days)
if window_days > 0:
iv_rv_history = tuple(
ctx.repository.iv_rv_values_for_window(
conn,
asset=asset,
window_days=window_days,
as_of=when,
)
)
finally:
conn.close()
if entry_cfg.vol_of_vol_guard_enabled:
conn = connect_state(ctx.db_path)
try:
dvol_24h_ago = ctx.repository.dvol_lookback(
conn,
asset=asset,
reference=when - timedelta(hours=entry_cfg.vol_of_vol_lookback_hours),
)
finally:
conn.close()
entry_ctx = EntryContext(
capital_usd=capital_usd,
dvol_now=snap.dvol,
@@ -439,6 +523,9 @@ async def run_entry_cycle(
dealer_net_gamma=snap.dealer_net_gamma,
iv_minus_rv=snap.iv_minus_rv,
liquidation_squeeze_risk_high=snap.liquidation_squeeze_risk_high,
iv_rv_history=iv_rv_history,
iv_rv_n_days=iv_rv_n_days,
dvol_24h_ago=dvol_24h_ago,
)
decision = validate_entry(entry_ctx, cfg)
inputs = {
@@ -458,6 +545,17 @@ async def run_entry_cycle(
"iv_minus_rv": (
str(snap.iv_minus_rv) if snap.iv_minus_rv is not None else None
),
"iv_rv_history_n": len(iv_rv_history),
"iv_rv_n_days": iv_rv_n_days,
"iv_rv_threshold_used": _audit_threshold(
entry_cfg, iv_rv_history, iv_rv_n_days
),
"iv_rv_window_used_days": _audit_window_days(
entry_cfg, iv_rv_n_days
),
"dvol_24h_ago": (
str(dvol_24h_ago) if dvol_24h_ago is not None else None
),
}
}
if not decision.accepted:
+3 -4
View File
@@ -222,7 +222,7 @@ class Orchestrator:
market_snapshot_cron: str = _CRON_MARKET_SNAPSHOT,
market_snapshot_assets: tuple[str, ...] = DEFAULT_ASSETS,
option_chain_cron: str = _CRON_OPTION_CHAIN_SNAPSHOT,
option_chain_asset: str = "ETH",
option_chain_assets: tuple[str, ...] = ("ETH", "BTC"),
backup_dir: Path | None = None,
backup_retention_days: int = _BACKUP_RETENTION_DAYS,
) -> AsyncIOScheduler:
@@ -290,9 +290,8 @@ class Orchestrator:
async def _option_chain_snapshot() -> None:
async def _do() -> None:
await collect_option_chain_snapshot(
self._ctx, asset=option_chain_asset
)
for asset in option_chain_assets:
await collect_option_chain_snapshot(self._ctx, asset=asset)
await _safe("option_chain_snapshot", _do)
@@ -1,10 +1,10 @@
-- 0004_option_chain_snapshots.sql — catena opzioni storica
-- 0005_option_chain_snapshots.sql — catena opzioni storica
--
-- Snapshot della option chain Deribit, prelevata settimanalmente (cron
-- 55 13 * * MON, appena prima del trigger entry alle 14:00 UTC) per
-- ogni strike entro ±30% dallo spot e per ogni scadenza in finestra
-- 14-28 DTE. Dato di base per il backtest non-stilizzato e per
-- calibrare empiricamente lo skew premium del modello BS.
-- Snapshot della option chain Deribit prelevata ogni 15 minuti (stesso
-- scheduler di market_snapshots, cron */15) per ogni strike entro ±30%
-- dallo spot e per ogni scadenza nella finestra 14-28 DTE. Dato di
-- base per il backtest non-stilizzato e per calibrare empiricamente
-- lo skew premium del modello BS.
--
-- Granularità: una riga per (snapshot_ts, instrument). Lo
-- snapshot_ts è il timestamp del cron tick — TUTTI i quote raccolti
+106 -1
View File
@@ -13,7 +13,7 @@ Decimals are stored as TEXT to preserve precision (see
from __future__ import annotations
import sqlite3
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import Any
from uuid import UUID
@@ -408,6 +408,111 @@ class Repository:
).fetchall()
return [_row_to_market_snapshot(r) for r in rows]
def count_iv_rv_distinct_days(
self,
conn: sqlite3.Connection,
*,
asset: str,
max_days: int,
as_of: datetime | None = None,
) -> int:
"""Numero di giorni di calendario distinti coperti da IV-RV validi.
Esclude righe con ``fetch_ok=0`` o ``iv_minus_rv IS NULL``.
Usato dal caller del gate adattivo per decidere la finestra
(warmup hard / min_days / target_days).
Args:
as_of: Reference time for the rolling window. Defaults to
``datetime.now(UTC)``.
"""
if max_days <= 0:
raise ValueError(f"max_days must be positive, got {max_days}")
ref = as_of if as_of is not None else datetime.now(UTC)
if ref.tzinfo is None:
raise ValueError("as_of must be timezone-aware")
cutoff = ref - timedelta(days=max_days)
row = conn.execute(
"SELECT COUNT(DISTINCT substr(timestamp, 1, 10)) AS n "
"FROM market_snapshots "
"WHERE asset = ? "
" AND fetch_ok = 1 "
" AND iv_minus_rv IS NOT NULL "
" AND timestamp >= ?",
(asset, _enc_dt(cutoff)),
).fetchone()
return int(row["n"]) if row is not None else 0
def iv_rv_values_for_window(
self,
conn: sqlite3.Connection,
*,
asset: str,
window_days: int,
as_of: datetime | None = None,
) -> list[Decimal]:
"""Valori IV-RV ordinati ASC su ``[as_of - window_days, as_of]``.
Esclude righe con ``fetch_ok=0`` o ``iv_minus_rv IS NULL``.
Tutti i record validi della finestra concorrono come singoli
contributi alla statistica del percentile, indipendentemente
dalla cadenza con cui sono stati raccolti (tick live vs backfill
daily).
"""
if window_days <= 0:
raise ValueError(f"window_days must be positive, got {window_days}")
ref = as_of if as_of is not None else datetime.now(UTC)
if ref.tzinfo is None:
raise ValueError("as_of must be timezone-aware")
cutoff = ref - timedelta(days=window_days)
rows = conn.execute(
"SELECT iv_minus_rv FROM market_snapshots "
"WHERE asset = ? "
" AND fetch_ok = 1 "
" AND iv_minus_rv IS NOT NULL "
" AND timestamp >= ? "
"ORDER BY timestamp ASC",
(asset, _enc_dt(cutoff)),
).fetchall()
return [Decimal(str(r["iv_minus_rv"])) for r in rows]
def dvol_lookback(
self,
conn: sqlite3.Connection,
*,
asset: str,
reference: datetime,
tolerance_minutes: int = 15,
) -> Decimal | None:
"""DVOL al tick più vicino a `reference`, entro ±tolerance_minutes.
Ritorna ``None`` se non esiste un tick valido (``fetch_ok=1``,
``dvol IS NOT NULL``) entro la tolerance. Usato dal Vol-of-Vol
guard per stimare DVOL N ore fa.
"""
if reference.tzinfo is None:
raise ValueError("reference must be timezone-aware")
ref_lo = (reference - timedelta(minutes=tolerance_minutes)).astimezone(UTC)
ref_hi = (reference + timedelta(minutes=tolerance_minutes)).astimezone(UTC)
row = conn.execute(
"SELECT dvol, timestamp FROM market_snapshots "
"WHERE asset = ? "
" AND fetch_ok = 1 "
" AND dvol IS NOT NULL "
" AND timestamp >= ? "
" AND timestamp <= ? "
"ORDER BY ABS(julianday(timestamp) - julianday(?)) ASC LIMIT 1",
(
asset,
_enc_dt(ref_lo),
_enc_dt(ref_hi),
_enc_dt(reference),
),
).fetchone()
if row is None:
return None
return Decimal(str(row["dvol"]))
# ------------------------------------------------------------------
# option_chain_snapshots
# ------------------------------------------------------------------
+16 -3
View File
@@ -29,7 +29,7 @@
# di cosa otterresti DOPO quel lavoro di codice.
config_version: "1.4.0-aggressiva"
config_hash: "7a39214a7efd2861d22d465f6caf758fec84598775c3f01d922782d7f6f337b0"
config_hash: "7fa9b0be5b56517293421bc19838b700da595725360fe018a1be13b802dea859"
last_review: "2026-04-26"
last_reviewer: "Adriano"
@@ -65,9 +65,22 @@ entry:
dealer_gamma_min: "0"
dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true
# IV richness gate (§2.9) — abilitato a 3 pt vol per profilo aggressivo.
iv_minus_rv_min: "3"
# IV richness gate (§2.9). In Aggressiva il gate è in modalità
# adattiva: la soglia è il P25 rolling sui market_snapshots
# (warmup: usa la storia disponibile finché < 30g, poi finestra 30g
# fino a 60g, poi fissa 60g). `iv_minus_rv_min: 0` = floor zero,
# lascia decidere al P25.
iv_minus_rv_filter_enabled: true
iv_minus_rv_adaptive_enabled: true
iv_minus_rv_min: "0"
iv_minus_rv_percentile: "0.25"
iv_minus_rv_window_target_days: 60
iv_minus_rv_window_min_days: 30
# Vol-of-Vol guard: blocca entry su shift bruschi DVOL.
vol_of_vol_guard_enabled: true
vol_of_vol_threshold_pt: "5"
vol_of_vol_lookback_hours: 24
structure:
@@ -0,0 +1,136 @@
"""End-to-end test del gate IV-RV adattivo + Vol-of-Vol guard via Repository.
Verifica che la nuova API distinct-days componga correttamente repository
helpers + ``compute_adaptive_threshold``.
"""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold
from cerbero_bite.state.db import connect, run_migrations
from cerbero_bite.state.models import MarketSnapshotRecord
from cerbero_bite.state.repository import Repository
def _seed_history(
conn,
repo: Repository,
asset: str,
base: datetime,
n_ticks: int,
iv_rv_value: Decimal,
dvol_value: Decimal,
) -> None:
for i in range(n_ticks):
repo.record_market_snapshot(
conn,
MarketSnapshotRecord(
timestamp=base + timedelta(minutes=15 * i),
asset=asset,
spot=Decimal("2000"),
dvol=dvol_value,
realized_vol_30d=Decimal("48"),
iv_minus_rv=iv_rv_value,
funding_perp_annualized=Decimal("0"),
funding_cross_annualized=Decimal("0"),
dealer_net_gamma=Decimal("0"),
gamma_flip_level=None,
oi_delta_pct_4h=None,
liquidation_long_risk="low",
liquidation_short_risk="low",
macro_days_to_event=None,
fetch_ok=True,
fetch_errors_json=None,
),
)
conn.commit()
@pytest.fixture
def db_30d(tmp_path):
"""30 giorni di storia con IV-RV bimodale: prima metà 1.0, seconda metà 5.0."""
db_path = tmp_path / "e2e.sqlite"
conn = connect(str(db_path))
run_migrations(conn)
repo = Repository()
base = datetime(2026, 4, 1, 0, 0, tzinfo=UTC)
_seed_history(conn, repo, "ETH", base, 1440, Decimal("1.0"), Decimal("50"))
_seed_history(
conn,
repo,
"ETH",
base + timedelta(days=15),
1440,
Decimal("5.0"),
Decimal("50"),
)
return conn, repo
def test_distinct_days_count_matches_calendar_days(db_30d) -> None:
"""30 giorni di calendario seedati → COUNT DISTINCT = 30."""
conn, repo = db_30d
n = repo.count_iv_rv_distinct_days(
conn,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 1, 0, 0, tzinfo=UTC),
)
assert n == 30
def test_window_values_returned_for_full_history(db_30d) -> None:
conn, repo = db_30d
values = repo.iv_rv_values_for_window(
conn,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 1, 0, 0, tzinfo=UTC),
)
assert len(values) == 2880
# Bimodale: 1440 valori 1.0 e 1440 valori 5.0
assert sum(1 for v in values if v == Decimal("1.0")) == 1440
assert sum(1 for v in values if v == Decimal("5.0")) == 1440
def test_p25_of_bimodal_history_picks_low_regime(db_30d) -> None:
"""Comporre repository + adaptive_threshold come fa entry_cycle."""
conn, repo = db_30d
as_of = datetime(2026, 5, 1, 0, 0, tzinfo=UTC)
n_days = repo.count_iv_rv_distinct_days(
conn, asset="ETH", max_days=60, as_of=as_of
)
values = repo.iv_rv_values_for_window(
conn, asset="ETH", window_days=60, as_of=as_of
)
threshold = compute_adaptive_threshold(
history=values,
n_days=n_days,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
# P25 di 2880 valori bimodali: 1440 ×1.0, 1440 ×5.0 → soglia = 1.0
assert threshold == Decimal("1.0")
def test_dvol_lookback_within_tolerance(db_30d) -> None:
conn, repo = db_30d
base = datetime(2026, 4, 1, 0, 0, tzinfo=UTC)
out = repo.dvol_lookback(conn, asset="ETH", reference=base + timedelta(hours=24))
assert out == Decimal("50")
def test_dvol_lookback_returns_none_outside_tolerance(db_30d) -> None:
conn, repo = db_30d
out = repo.dvol_lookback(
conn,
asset="ETH",
reference=datetime(2025, 1, 1, tzinfo=UTC),
tolerance_minutes=15,
)
assert out is None
+170
View File
@@ -0,0 +1,170 @@
"""TDD per :mod:`cerbero_bite.core.adaptive_threshold`.
Spec: ``docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md``.
La funzione è una pura statistica: riceve già la finestra di valori scelta
dal caller e il numero di giorni distinti coperti dalla storia disponibile
(``n_days``), e restituisce ``max(percentile, floor)`` o ``None`` durante
il warmup hard. La selezione della finestra (target_days vs min_days vs
intera storia) è responsabilità del caller (repository + entry_cycle).
"""
from __future__ import annotations
from decimal import Decimal
import pytest
from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold
# ---------------------------------------------------------------------------
# Warmup hard: nessun giorno disponibile
# ---------------------------------------------------------------------------
def test_n_days_zero_returns_none() -> None:
"""Storia vuota o nessun giorno coperto → warmup hard."""
out = compute_adaptive_threshold(
history=[],
n_days=0,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
assert out is None
def test_n_days_zero_with_values_still_returns_none() -> None:
"""Difensivo: se il caller passa n_days=0 ma valori non vuoti, warmup
hard vince comunque (gate disabilitato)."""
out = compute_adaptive_threshold(
history=[Decimal("3")] * 10,
n_days=0,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
assert out is None
def test_empty_history_with_positive_n_days_returns_none() -> None:
"""Difensivo: history vuota anche con n_days>0 → None."""
out = compute_adaptive_threshold(
history=[],
n_days=5,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
assert out is None
# ---------------------------------------------------------------------------
# Calcolo percentile sulla finestra ricevuta
# ---------------------------------------------------------------------------
def test_n_days_one_returns_percentile_of_history() -> None:
"""Singolo giorno con tick a 15 min (96 valori): P25 standard."""
history = [Decimal(i) / Decimal("10") for i in range(96)] # 0.0..9.5
out = compute_adaptive_threshold(
history=history,
n_days=1,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
# P25 di [0.0..9.5] passo 0.1 con method='linear': k=23.75, val ≈ 2.375
assert out is not None
assert Decimal("2.3") < out < Decimal("2.5")
def test_window_chosen_by_caller_is_used_verbatim() -> None:
"""La funzione NON fa slicing: usa esattamente la history ricevuta."""
history = [Decimal(i) for i in range(1, 201)] # 1..200
out = compute_adaptive_threshold(
history=history,
n_days=30,
percentile=Decimal("0.5"),
absolute_floor=Decimal("0"),
)
# P50 di [1..200] = (200+1)/2 = 100.5
assert out is not None
assert Decimal("100") <= out <= Decimal("101")
def test_mixed_cadence_window_no_special_treatment() -> None:
"""Mix di valori (es. backfill daily + tick live) trattato come una
distribuzione qualunque: il caller ha già scelto la finestra; la
funzione calcola il percentile sui valori ricevuti uno-a-uno."""
# 30 valori "daily backfill" (uno per giorno) + 96 tick "live"
history = [Decimal("5")] * 30 + [Decimal("8")] * 96
out = compute_adaptive_threshold(
history=history,
n_days=31,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
# Sorted: 30 ×5, 96 ×8. P25 a indice 0.25*125 = 31.25 → tra 5 e 8.
# NumPy linear: sorted_v[31]=8, sorted_v[32]=8 → 8.
# Verifica solo l'estremo superiore della famiglia di valori sorted.
assert out is not None
assert out in (Decimal("5"), Decimal("8"))
# ---------------------------------------------------------------------------
# Floor binding
# ---------------------------------------------------------------------------
def test_floor_binding_overrides_low_percentile() -> None:
history = [Decimal("0.5")] * 200
out = compute_adaptive_threshold(
history=history,
n_days=30,
percentile=Decimal("0.25"),
absolute_floor=Decimal("3"),
)
assert out == Decimal("3")
def test_floor_not_binding_returns_percentile() -> None:
history = [Decimal("5")] * 200
out = compute_adaptive_threshold(
history=history,
n_days=30,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
assert out == Decimal("5")
# ---------------------------------------------------------------------------
# Input validation
# ---------------------------------------------------------------------------
def test_invalid_percentile_above_one_raises() -> None:
with pytest.raises(ValueError, match="percentile must be in"):
compute_adaptive_threshold(
history=[Decimal("1")] * 200,
n_days=10,
percentile=Decimal("1.5"),
absolute_floor=Decimal("0"),
)
def test_invalid_percentile_negative_raises() -> None:
with pytest.raises(ValueError, match="percentile must be in"):
compute_adaptive_threshold(
history=[Decimal("1")] * 200,
n_days=10,
percentile=Decimal("-0.1"),
absolute_floor=Decimal("0"),
)
def test_invalid_negative_n_days_raises() -> None:
with pytest.raises(ValueError, match="n_days must be >= 0"):
compute_adaptive_threshold(
history=[Decimal("1")] * 10,
n_days=-1,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
+176
View File
@@ -0,0 +1,176 @@
"""TDD per il backfill IV-RV (``scripts/backfill_iv_rv.py``).
Testa solo la parte pura (compute RV + assemblaggio record). I/O HTTP
e SQLite restano nel main del CLI: testati manualmente al deploy.
"""
from __future__ import annotations
import importlib.util
import sys
from datetime import UTC, date, datetime, timedelta
from decimal import Decimal
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parents[2]
def _load_backfill_module() -> object:
"""Load scripts/backfill_iv_rv.py as a module without polluting sys.path."""
spec = importlib.util.spec_from_file_location(
"_cerbero_bite_backfill_iv_rv", REPO_ROOT / "scripts" / "backfill_iv_rv.py"
)
if spec is None or spec.loader is None:
raise RuntimeError("cannot load backfill_iv_rv module")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
@pytest.fixture(scope="module")
def mod():
return _load_backfill_module()
# ---------------------------------------------------------------------------
# compute_rv30d_annualized
# ---------------------------------------------------------------------------
def test_constant_prices_yield_zero_rv(mod) -> None:
closes = [Decimal("100")] * 31 # 30 returns of log(1)=0
rv = mod.compute_rv30d_annualized(closes)
assert rv == Decimal("0")
def test_too_few_closes_raises(mod) -> None:
with pytest.raises(ValueError, match="need at least 31 closes"):
mod.compute_rv30d_annualized([Decimal("100")] * 10)
def test_monotonic_growth_yields_low_rv(mod) -> None:
"""Crescita +1% ogni giorno: log returns costanti → stdev = 0 → RV = 0."""
closes = [Decimal("100") * (Decimal("1.01") ** i) for i in range(31)]
rv = mod.compute_rv30d_annualized(closes)
# Tutti i log returns sono identici (log 1.01) → stdev zero
assert rv == Decimal("0")
def test_alternating_returns_yield_known_rv(mod) -> None:
"""Returns alternati ±2% ogni giorno: stdev nota."""
# closes: 100, 102, 100, 102, ... (ricorda: returns = log(c[i]/c[i-1]))
closes = [Decimal("100")] + [
Decimal("102") if i % 2 == 0 else Decimal("100") for i in range(30)
]
rv = mod.compute_rv30d_annualized(closes)
# |log return| ~ 0.0198, stdev ≈ 0.0198 (alternano segno con media ≈ 0)
# Annualized = 0.0198 * sqrt(365) * 100 ≈ 37.86 vol pts
assert Decimal("36") <= rv <= Decimal("40")
# ---------------------------------------------------------------------------
# build_backfill_records
# ---------------------------------------------------------------------------
def test_build_records_skips_days_without_30d_history(mod) -> None:
"""Per i primi 30 giorni della serie spot, RV30d non è calcolabile."""
today = date(2026, 5, 10)
days = [today - timedelta(days=i) for i in range(45)]
spots = {d.isoformat(): Decimal("100") for d in days}
dvols = {d.isoformat(): Decimal("50") for d in days}
records = mod.build_backfill_records(
asset="ETH",
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=today - timedelta(days=40),
)
# Per ogni record day, servono 30 giorni precedenti di spot.
# Lo spot più vecchio è today-44; quindi il primo giorno computabile
# è today-44+30 = today-14. Cap a oldest_day=today-40 → window day-14..day-0.
assert len(records) == 15 # day-14..day-0 incluso
for r in records:
assert r.asset == "ETH"
assert r.fetch_ok is True
assert r.iv_minus_rv == Decimal("50") # rv=0 con prezzi costanti
assert r.timestamp.tzinfo == UTC
assert r.timestamp.hour == 12
def test_build_records_filters_to_requested_window(mod) -> None:
"""oldest_day applicato come cutoff inferiore inclusivo."""
today = date(2026, 5, 10)
days = [today - timedelta(days=i) for i in range(45)]
spots = {d.isoformat(): Decimal("100") for d in days}
dvols = {d.isoformat(): Decimal("50") for d in days}
records = mod.build_backfill_records(
asset="BTC",
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=today - timedelta(days=5),
)
# day-5..day-0 → 6 record
assert len(records) == 6
record_days = {r.timestamp.date() for r in records}
assert record_days == {today - timedelta(days=i) for i in range(6)}
def test_build_records_skips_days_missing_dvol(mod) -> None:
"""Se manca DVOL per un giorno della finestra, lo si salta (no record)."""
today = date(2026, 5, 10)
days = [today - timedelta(days=i) for i in range(45)]
spots = {d.isoformat(): Decimal("100") for d in days}
dvols = {
d.isoformat(): Decimal("50")
for d in days
if d != today - timedelta(days=2)
}
records = mod.build_backfill_records(
asset="ETH",
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=today - timedelta(days=5),
)
record_days = {r.timestamp.date() for r in records}
assert today - timedelta(days=2) not in record_days
assert len(records) == 5
def test_build_records_skips_days_missing_spot(mod) -> None:
"""Se manca lo spot del giorno target, no record per quel giorno."""
today = date(2026, 5, 10)
days = [today - timedelta(days=i) for i in range(45)]
spots = {
d.isoformat(): Decimal("100")
for d in days
if d != today - timedelta(days=2)
}
dvols = {d.isoformat(): Decimal("50") for d in days}
records = mod.build_backfill_records(
asset="ETH",
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=today - timedelta(days=5),
)
record_days = {r.timestamp.date() for r in records}
assert today - timedelta(days=2) not in record_days
def test_build_records_uses_noon_utc_timestamp(mod) -> None:
today = date(2026, 5, 10)
days = [today - timedelta(days=i) for i in range(35)]
spots = {d.isoformat(): Decimal("100") for d in days}
dvols = {d.isoformat(): Decimal("50") for d in days}
records = mod.build_backfill_records(
asset="ETH",
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=today,
)
assert len(records) == 1
assert records[0].timestamp == datetime(2026, 5, 10, 12, 0, tzinfo=UTC)
+185
View File
@@ -351,3 +351,188 @@ def test_bias_zero_division_safe(cfg: StrategyConfig) -> None:
# eth_30d_ago == 0 must not crash; treat as no-bias (neutral)
ctx = _trend(eth_now="3000", eth_30d_ago="0", funding_cross="0", dvol="60", adx="15")
assert compute_bias(ctx, cfg) is None
# ---------------------------------------------------------------------------
# IV-RV adaptive gate
# ---------------------------------------------------------------------------
def _adaptive_cfg(**entry_overrides: object) -> StrategyConfig:
"""Golden config con gate adattivo abilitato di default per test."""
base_entry: dict[str, object] = {
"iv_minus_rv_filter_enabled": True,
"iv_minus_rv_adaptive_enabled": True,
"iv_minus_rv_min": Decimal("0"),
"iv_minus_rv_percentile": Decimal("0.25"),
"iv_minus_rv_window_target_days": 60,
"iv_minus_rv_window_min_days": 30,
}
base_entry.update(entry_overrides)
return golden_config(entry=base_entry)
def test_adaptive_pass_when_iv_rv_above_p25() -> None:
cfg = _adaptive_cfg()
history = tuple(Decimal(i) for i in range(1, 201))
decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("80"),
iv_rv_history=history,
iv_rv_n_days=30,
),
cfg,
)
assert decision.accepted is True
assert not any("IV richness" in r for r in decision.reasons)
def test_adaptive_blocks_when_iv_rv_below_p25() -> None:
cfg = _adaptive_cfg()
history = tuple(Decimal(i) for i in range(1, 201))
decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("20"),
iv_rv_history=history,
iv_rv_n_days=30,
),
cfg,
)
assert decision.accepted is False
assert any("IV richness" in r and "rolling" in r for r in decision.reasons)
def test_adaptive_with_n_days_zero_passes_warmup() -> None:
"""Warmup hard: nessun giorno coperto → gate skip (fail-open)."""
cfg = _adaptive_cfg()
decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("0.1"),
iv_rv_history=(),
iv_rv_n_days=0,
),
cfg,
)
assert decision.accepted is True
def test_adaptive_with_floor_floor_binds_when_p25_low() -> None:
cfg = _adaptive_cfg(iv_minus_rv_min=Decimal("3"))
history = tuple(Decimal("0.5") for _ in range(200))
decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("1"),
iv_rv_history=history,
iv_rv_n_days=30,
),
cfg,
)
assert decision.accepted is False
assert any("IV richness" in r for r in decision.reasons)
def test_legacy_static_gate_still_works_when_adaptive_disabled() -> None:
cfg = golden_config(entry={
"iv_minus_rv_filter_enabled": True,
"iv_minus_rv_adaptive_enabled": False,
"iv_minus_rv_min": Decimal("3"),
})
decision = validate_entry(
_good_ctx(iv_minus_rv=Decimal("2"), iv_rv_history=(), iv_rv_n_days=0),
cfg,
)
assert decision.accepted is False
assert any("IV richness below floor" in r for r in decision.reasons)
def test_iv_minus_rv_none_skips_gate_in_both_modes() -> None:
cfg = _adaptive_cfg()
decision = validate_entry(
_good_ctx(
iv_minus_rv=None,
iv_rv_history=tuple(Decimal(i) for i in range(1, 201)),
iv_rv_n_days=30,
),
cfg,
)
assert decision.accepted is True
def test_adaptive_with_n_days_one_uses_history_for_percentile() -> None:
"""Singolo giorno disponibile (cadenza qualunque): gate attivo,
soglia = P25 della finestra ricevuta. Dimostra che il warmup hard
finisce a n_days=1 (non 30 come nella vecchia implementazione)."""
cfg = _adaptive_cfg()
history = tuple(Decimal(i) for i in range(1, 101)) # 1..100, P25 = 25.75
# IV-RV sopra P25 → pass
pass_decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("30"),
iv_rv_history=history,
iv_rv_n_days=1,
),
cfg,
)
assert pass_decision.accepted is True
# IV-RV sotto P25 → block
block_decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("10"),
iv_rv_history=history,
iv_rv_n_days=1,
),
cfg,
)
assert block_decision.accepted is False
assert any("IV richness" in r and "rolling" in r for r in block_decision.reasons)
# ---------------------------------------------------------------------------
# Vol-of-Vol guard
# ---------------------------------------------------------------------------
def _vov_cfg(threshold: Decimal = Decimal("5")) -> StrategyConfig:
return golden_config(entry={
"vol_of_vol_guard_enabled": True,
"vol_of_vol_threshold_pt": threshold,
"vol_of_vol_lookback_hours": 24,
})
def test_vov_guard_blocks_on_large_dvol_shift() -> None:
cfg = _vov_cfg()
decision = validate_entry(
_good_ctx(dvol_now=Decimal("56"), dvol_24h_ago=Decimal("50")), cfg
)
assert decision.accepted is False
assert any("DVOL shifted" in r for r in decision.reasons)
def test_vov_guard_passes_on_small_dvol_shift() -> None:
cfg = _vov_cfg()
decision = validate_entry(
_good_ctx(dvol_now=Decimal("52"), dvol_24h_ago=Decimal("50")), cfg
)
assert decision.accepted is True
def test_vov_guard_passes_when_lookback_missing() -> None:
"""fail-open su gap dati: se dvol_24h_ago=None il guard non scatta."""
cfg = _vov_cfg()
decision = validate_entry(
_good_ctx(dvol_now=Decimal("99"), dvol_24h_ago=None), cfg
)
# dvol_now=99 sarebbe oltre dvol_max=90; testiamo solo l'effetto VoV
# consultando le reasons (dvol_now potrebbe avere altre reason ma non
# quella VoV).
assert not any("DVOL shifted" in r for r in decision.reasons)
def test_vov_guard_disabled_does_nothing() -> None:
cfg = golden_config(entry={"vol_of_vol_guard_enabled": False})
decision = validate_entry(
_good_ctx(dvol_now=Decimal("55"), dvol_24h_ago=Decimal("50")), cfg
)
# Nessuna reason VoV (il delta=5 sarebbe oltre soglia se attivo)
assert not any("DVOL shifted" in r for r in decision.reasons)
+395
View File
@@ -0,0 +1,395 @@
"""TDD per i nuovi helper repository del gate IV-RV adattivo.
Spec: distinct-days policy il caller (entry_cycle) interroga il
numero di giorni coperti separatamente dai valori della finestra,
così che cadenze miste (tick live 15min + backfill daily) restino
statisticamente coerenti.
Helpers:
* ``count_iv_rv_distinct_days(asset, max_days, as_of) -> int``
* ``iv_rv_values_for_window(asset, window_days, as_of) -> list[Decimal]``
* ``dvol_lookback`` (invariato, riusato dal Vol-of-Vol guard)
"""
from __future__ import annotations
import sqlite3
from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
from cerbero_bite.state.db import connect, run_migrations
from cerbero_bite.state.models import MarketSnapshotRecord
from cerbero_bite.state.repository import Repository
def _snap(
*,
ts: datetime,
asset: str = "ETH",
iv_minus_rv: Decimal | None = Decimal("2"),
fetch_ok: bool = True,
dvol: Decimal = Decimal("50"),
) -> MarketSnapshotRecord:
return MarketSnapshotRecord(
timestamp=ts,
asset=asset,
spot=Decimal("2000"),
dvol=dvol,
realized_vol_30d=Decimal("48"),
iv_minus_rv=iv_minus_rv,
funding_perp_annualized=Decimal("0"),
funding_cross_annualized=Decimal("0"),
dealer_net_gamma=Decimal("0"),
gamma_flip_level=None,
oi_delta_pct_4h=None,
liquidation_long_risk="low",
liquidation_short_risk="low",
macro_days_to_event=None,
fetch_ok=fetch_ok,
fetch_errors_json=None,
)
@pytest.fixture
def db_one_day(tmp_path) -> sqlite3.Connection:
"""SQLite temp con 96 tick ETH a 15min (1 giorno) e fetch_ok=1."""
conn = connect(str(tmp_path / "test.sqlite"))
run_migrations(conn)
repo = Repository()
base = datetime(2026, 5, 1, 0, 0, tzinfo=UTC)
for i in range(96):
repo.record_market_snapshot(
conn,
_snap(
ts=base + timedelta(minutes=15 * i),
iv_minus_rv=Decimal("2") + Decimal(i) / Decimal("100"),
dvol=Decimal("50") + Decimal(i) / Decimal("10"),
),
)
conn.commit()
return conn
@pytest.fixture
def db_three_days_mixed(tmp_path) -> sqlite3.Connection:
"""SQLite temp con 3 giorni ETH:
- day1 (2026-05-01): 96 tick @ 15min, valori 1..96
- day2 (2026-05-02): 1 record daily a 12:00, valore 100 (backfill style)
- day3 (2026-05-03): 4 tick orari, valori 200, 201, 202, 203
Più 1 giorno BTC isolato (per cross-asset isolation).
"""
conn = connect(str(tmp_path / "test.sqlite"))
run_migrations(conn)
repo = Repository()
day1 = datetime(2026, 5, 1, 0, 0, tzinfo=UTC)
for i in range(96):
repo.record_market_snapshot(
conn,
_snap(
ts=day1 + timedelta(minutes=15 * i),
iv_minus_rv=Decimal(i + 1),
),
)
repo.record_market_snapshot(
conn,
_snap(ts=datetime(2026, 5, 2, 12, 0, tzinfo=UTC), iv_minus_rv=Decimal("100")),
)
day3 = datetime(2026, 5, 3, 0, 0, tzinfo=UTC)
for i in range(4):
repo.record_market_snapshot(
conn,
_snap(
ts=day3 + timedelta(hours=i),
iv_minus_rv=Decimal(200 + i),
),
)
repo.record_market_snapshot(
conn,
_snap(
ts=datetime(2026, 4, 30, 0, 0, tzinfo=UTC),
asset="BTC",
iv_minus_rv=Decimal("999"),
),
)
conn.commit()
return conn
# ---------------------------------------------------------------------------
# count_iv_rv_distinct_days
# ---------------------------------------------------------------------------
def test_count_distinct_days_returns_one_for_single_day_history(db_one_day) -> None:
repo = Repository()
n = repo.count_iv_rv_distinct_days(
db_one_day,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert n == 1
def test_count_distinct_days_returns_zero_for_other_asset(db_one_day) -> None:
repo = Repository()
n = repo.count_iv_rv_distinct_days(
db_one_day,
asset="BTC",
max_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert n == 0
def test_count_distinct_days_counts_unique_calendar_days(
db_three_days_mixed,
) -> None:
repo = Repository()
n = repo.count_iv_rv_distinct_days(
db_three_days_mixed,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert n == 3
def test_count_distinct_days_excludes_other_assets(
db_three_days_mixed,
) -> None:
repo = Repository()
n_btc = repo.count_iv_rv_distinct_days(
db_three_days_mixed,
asset="BTC",
max_days=60,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert n_btc == 1
def test_count_distinct_days_respects_window_cutoff(
db_three_days_mixed,
) -> None:
"""max_days=1 da as_of=2026-05-04 → cutoff=2026-05-03 → solo day3."""
repo = Repository()
n = repo.count_iv_rv_distinct_days(
db_three_days_mixed,
asset="ETH",
max_days=1,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert n == 1
def test_count_distinct_days_excludes_null_iv_rv(tmp_path) -> None:
conn = connect(str(tmp_path / "test.sqlite"))
run_migrations(conn)
repo = Repository()
repo.record_market_snapshot(
conn,
_snap(ts=datetime(2026, 5, 1, 12, 0, tzinfo=UTC), iv_minus_rv=None),
)
conn.commit()
n = repo.count_iv_rv_distinct_days(
conn,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert n == 0
def test_count_distinct_days_excludes_fetch_failed(tmp_path) -> None:
conn = connect(str(tmp_path / "test.sqlite"))
run_migrations(conn)
repo = Repository()
repo.record_market_snapshot(
conn,
_snap(
ts=datetime(2026, 5, 1, 12, 0, tzinfo=UTC),
iv_minus_rv=Decimal("99"),
fetch_ok=False,
),
)
conn.commit()
n = repo.count_iv_rv_distinct_days(
conn,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert n == 0
def test_count_distinct_days_rejects_naive_as_of(db_one_day) -> None:
repo = Repository()
with pytest.raises(ValueError, match="timezone-aware"):
repo.count_iv_rv_distinct_days(
db_one_day,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 2, 0, 0), # naive
)
def test_count_distinct_days_rejects_non_positive_max_days(db_one_day) -> None:
repo = Repository()
with pytest.raises(ValueError, match="max_days must be positive"):
repo.count_iv_rv_distinct_days(
db_one_day,
asset="ETH",
max_days=0,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
# ---------------------------------------------------------------------------
# iv_rv_values_for_window
# ---------------------------------------------------------------------------
def test_values_for_window_returns_ordered_asc(db_one_day) -> None:
repo = Repository()
values = repo.iv_rv_values_for_window(
db_one_day,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert len(values) == 96
assert values == sorted(values)
assert values[0] == Decimal("2.00")
def test_values_for_window_filters_other_asset(db_one_day) -> None:
repo = Repository()
values = repo.iv_rv_values_for_window(
db_one_day,
asset="BTC",
window_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert values == []
def test_values_for_window_skips_null(db_one_day) -> None:
repo = Repository()
repo.record_market_snapshot(
db_one_day,
_snap(ts=datetime(2026, 5, 2, 0, 0, tzinfo=UTC), iv_minus_rv=None),
)
db_one_day.commit()
values = repo.iv_rv_values_for_window(
db_one_day,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 3, 0, 0, tzinfo=UTC),
)
assert len(values) == 96
def test_values_for_window_skips_fetch_failed(db_one_day) -> None:
repo = Repository()
repo.record_market_snapshot(
db_one_day,
_snap(
ts=datetime(2026, 5, 3, 0, 0, tzinfo=UTC),
iv_minus_rv=Decimal("99"),
fetch_ok=False,
),
)
db_one_day.commit()
values = repo.iv_rv_values_for_window(
db_one_day,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert Decimal("99") not in values
def test_values_for_window_respects_window_cutoff(
db_three_days_mixed,
) -> None:
"""window_days=1 da as_of=2026-05-04 → solo day3 (4 valori 200..203)."""
repo = Repository()
values = repo.iv_rv_values_for_window(
db_three_days_mixed,
asset="ETH",
window_days=1,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert values == [Decimal(200 + i) for i in range(4)]
def test_values_for_window_full_window(db_three_days_mixed) -> None:
"""window_days=60: tutti i valori dei 3 giorni (96 + 1 + 4 = 101)."""
repo = Repository()
values = repo.iv_rv_values_for_window(
db_three_days_mixed,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert len(values) == 101
def test_values_for_window_rejects_naive_as_of(db_one_day) -> None:
repo = Repository()
with pytest.raises(ValueError, match="timezone-aware"):
repo.iv_rv_values_for_window(
db_one_day,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 2, 0, 0),
)
def test_values_for_window_rejects_non_positive_window(db_one_day) -> None:
repo = Repository()
with pytest.raises(ValueError, match="window_days must be positive"):
repo.iv_rv_values_for_window(
db_one_day,
asset="ETH",
window_days=0,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
# ---------------------------------------------------------------------------
# dvol_lookback (regression — invariato dopo refactor)
# ---------------------------------------------------------------------------
def test_dvol_lookback_returns_closest_tick(db_one_day) -> None:
repo = Repository()
base = datetime(2026, 5, 1, 0, 0, tzinfo=UTC)
target = base + timedelta(hours=12)
out = repo.dvol_lookback(
db_one_day, asset="ETH", reference=target, tolerance_minutes=15
)
# i=48 → dvol = 50 + 4.8 = 54.8
assert out == Decimal("54.8")
def test_dvol_lookback_returns_none_when_gap(db_one_day) -> None:
repo = Repository()
target = datetime(2025, 1, 1, 0, 0, tzinfo=UTC)
out = repo.dvol_lookback(
db_one_day, asset="ETH", reference=target, tolerance_minutes=15
)
assert out is None
def test_dvol_lookback_rejects_naive_reference(db_one_day) -> None:
repo = Repository()
with pytest.raises(ValueError, match="timezone-aware"):
repo.dvol_lookback(
db_one_day,
asset="ETH",
reference=datetime(2026, 5, 1, 12, 0),
)