2 Commits

Author SHA1 Message Date
root c4cd2986a4 feat(gui): aggiunge max drawdown atteso (P99) e tail/gap nei profili
Due metriche per ciascun profilo nel pannello P/L:

- **Max DD attesa (P99)**: streak di stop consecutivi con probabilità
  ≤ 1% nell'anno (union-bound: N_trade × p_loss^N ≤ 0.01) ×
  perdita stop × contratti × posizioni concorrenti.
- **Max DD coda (gap)**: scenario gap notturno in cui il mark salta
  oltre la copertura long PRIMA che lo stop sia eseguibile —
  perdita = larghezza intera meno credito iniziale, su tutte le
  posizioni aperte.

Aggiunge anche colonna "Max DD" nella tabella di sensibilità
win-rate, così si vede immediatamente il trade-off
APR-vs-drawdown al variare del win-rate (da 65% a 82%).

Effetto pratico: a default cap=10k, spot=3000, win=0.75, trades=18:
- Conservativa: APR ≈ +1.8%, Max DD attesa ≈ −2.2% capitale
- Aggressiva: APR ≈ +14%, Max DD attesa ≈ −30% capitale

Numeri che rendono molto più tangibile la frase "drawdown scala con
lo stesso fattore" del §4-ter del documento.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 19:50:09 +00:00
root 4ab7590745 feat(entry): IV richness gate (§2.9) + golden config bump 1.0.0 → 1.1.0
Aggiunge il filtro a maggior impatto sul win-rate atteso: l'entry
salta se la IV implicita non sta pagando un margine misurabile sopra
la realized vol. La letteratura short-vol systematic indica che
l'edge sostenibile della strategia esiste solo quando IV30g − RV30g
supera una soglia di alcuni punti vol; senza questo gate il selling
vol nudo è strutturalmente neutro a win-rate 70-72%.

Implementazione end-to-end:

- `EntryConfig`: due nuovi campi `iv_minus_rv_min` e
  `iv_minus_rv_filter_enabled`, con default `0` / `false` per non
  rompere setup pre-calibrazione.
- `validate_entry`: §2.9 hard gate che blocca l'entry se
  `iv_minus_rv < iv_minus_rv_min` (skip silenzioso quando il dato è
  `None`, coerente con il pattern §2.8 dei filtri quant).
- `entry_cycle._gather_snapshot`: nuovo `_safe_iv_minus_rv` che
  legge `deribit.realized_vol("ETH")["iv_minus_rv_30d"]` in
  best-effort e lo propaga via `_MarketSnapshot.iv_minus_rv` →
  `EntryContext.iv_minus_rv` → audit `inputs.snapshot.iv_minus_rv`.
- `tests/unit/test_entry_validator.py`: 5 nuovi casi (default
  permissivo, gate sotto/sopra/uguale soglia, dato mancante).
- `tests/integration/test_entry_cycle.py`: stub `get_realized_vol`
  nel mock helper così tutti gli scenari di happy/edge path
  continuano a passare.

Configurazione di profili coerente con la disciplina:

- `strategy.yaml` (golden 1.1.0) e `strategy.conservativa.yaml`:
  gate `enabled=false, min=0`. Manteniamo i lunedì pre-calibrazione
  per accumulare dati sulla distribuzione di `iv_minus_rv`.
- `strategy.aggressiva.yaml` (1.1.0-aggressiva): gate
  `enabled=true, min=3`. Coerente con la filosofia del profilo —
  size più grande pretende win-rate più alto. La soglia 3 è
  conservativa; la documentazione raccomanda 5 dopo 4-8 settimane di
  calibrazione.

Doc + GUI:

- `docs/13-strategia-spiegata.md` §4-quater: spiega gate, parametri,
  default per profilo, effetto atteso sul P/L (trade/anno scendono
  ma E[trade] sale → APR cresce comunque), roadmap di hardening
  (soglia adattiva, vol-of-vol guard, multi-asset).
- pagina `📚 Strategia`: la riga "IV − RV" passa da informativa a
  pass/fail reale; mostra "filtro DISABILITATO (info-only)" quando
  spento, / contro la soglia di config quando acceso.

Bump versioni e hash di tutti e tre i file YAML
(`config_version: 1.1.0`, hash ricalcolato). Test pinning aggiornato
(`test_load_repo_strategy_yaml`).

Suite: 410 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 19:32:21 +00:00
19 changed files with 321 additions and 804 deletions
+63
View File
@@ -532,6 +532,69 @@ quella che il sistema parte ad eseguire.
---
## 4-quater. IV richness gate (§2.9): il filtro che alza il win-rate
Il filtro a maggior impatto sull'edge è anche il più semplice da
descrivere: **non vendere vol quando la IV non sta pagando un margine
misurabile sopra la RV**. È implementato come gate hard nel
`validate_entry`:
```
if iv_minus_rv_filter_enabled and iv_minus_rv < iv_minus_rv_min:
skip entry
```
con due parametri in `entry:` di `strategy.yaml`:
| Parametro | Default | Effetto |
|---|---|---|
| `iv_minus_rv_filter_enabled` | `false` (golden) / `true` (aggressiva) | Master switch del gate |
| `iv_minus_rv_min` | `0` (golden) / `3` (aggressiva) | Soglia in punti vol che IV30g RV30g deve eccedere |
Il dato è già raccolto in `market_snapshots.iv_minus_rv` ogni 15
minuti. Il gate consulta l'ultimo tick disponibile al momento
dell'entry cycle (non un percentile rolling — quello è il prossimo
step di calibrazione, vedi §4-quinquies in roadmap).
**Profili di default ragionati.**
- **Conservativa / golden config**: `enabled=false, min=0`. Tutti i
setup passano questo gate, anche con IV-RV negativa. Motivo: nei
primi 8 turni di lunedì non si hanno abbastanza tick per stabilire
che soglia ha senso nel proprio regime. Lasciamo la pagina
`📐 Calibrazione` mostrare la distribuzione e poi alziamo
manualmente.
- **Aggressiva**: `enabled=true, min=3`. Il profilo aggressivo già di
suo prende size più grande; pretendere `IV-RV ≥ 3 vol points` come
prerequisito è coerente — se stai betting più grosso, vuoi
win-rate più alto. La soglia 3 è conservativa; la letteratura
short-vol systematic suggerisce 5 dopo calibrazione.
**Cosa cambia nel P/L atteso quando attivi il gate.**
Il gate **riduce** il numero di entry (saltiamo settimane con premio
magro) ma **alza** la qualità di quelle che passano (premio ricco =
win-rate empirico più alto). Effetto netto sul P/L annuo:
- Trade/anno: 18 → 12-14 (skip più aggressivo)
- Win-rate atteso: 0.72 → 0.78-0.80
- E[trade] netto: +0.6 USD → +4-6 USD per contratto
- **P/L annuo proiettato sale anche se i trade scendono**, perché
ogni trade ha edge più alto.
La pagina `📚 Strategia` ha lo slider win-rate già coerente con
questa logica: muovi da 0.72 a 0.78 e vedi l'APR scattare.
**Roadmap di hardening (passi successivi al merge di questo PR).**
1. **Soglia adattiva**: sostituire `iv_minus_rv_min: 3` con un valore
calcolato a runtime come `P25 rolling 60d` di `market_snapshots.iv_minus_rv`.
2. **Vol-of-vol guard**: bloccare entry quando `dvol` è cambiato di
≥5 punti nelle ultime 24h, anche se `iv_minus_rv` è alto (regime
instabile).
3. **Multi-asset (ETH+BTC)**: come da §4-ter, sblocca il
moltiplicatore 2× sulle opportunità a parità di filtri.
## 5. Come leggere il dato giorno per giorno
Tre euristiche operative sui campi raccolti:
+1 -236
View File
@@ -13,7 +13,7 @@ import asyncio
import os
import sys
from collections.abc import Callable
from datetime import UTC, datetime, timedelta
from datetime import UTC, datetime
from decimal import Decimal
from pathlib import Path
from typing import Any
@@ -812,241 +812,6 @@ def state_inspect(db: Path) -> None:
console.print(table)
@main.group(name="option-chain")
def option_chain() -> None:
"""Strumenti per la catena opzioni storica (`option_chain_snapshots`)."""
@option_chain.command(name="trigger")
@click.option(
"--strategy",
"strategy_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=_DEFAULT_STRATEGY_PATH,
show_default=True,
)
@click.option(
"--db",
"db_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
@click.option(
"--audit",
"audit_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_AUDIT_PATH,
show_default=True,
)
@click.option(
"--token",
type=str,
default=None,
help="MCP bearer token (override su CERBERO_BITE_MCP_TOKEN).",
)
@click.option("--asset", default="ETH", show_default=True)
def option_chain_trigger(
strategy_path: Path,
db_path: Path,
audit_path: Path,
token: str | None,
asset: str,
) -> None:
"""Esegue UNA volta il collector della catena opzioni e persiste in DB.
Utile per popolare i dati senza aspettare il cron settimanale del
job ``option_chain_snapshot``. Riusa esattamente la stessa pipeline
schedulata.
"""
from cerbero_bite.runtime.dependencies import build_runtime # noqa: PLC0415
from cerbero_bite.runtime.option_chain_snapshot_cycle import ( # noqa: PLC0415
collect_option_chain_snapshot,
)
cfg = load_strategy(strategy_path).config
ctx = build_runtime(
cfg=cfg,
endpoints=load_endpoints(),
token=load_token(value=token),
db_path=db_path,
audit_path=audit_path,
bot_tag=load_bot_tag(),
)
n = asyncio.run(collect_option_chain_snapshot(ctx, asset=asset))
console.print(
f"[green]Persisted {n} option chain quote(s) for {asset}[/green]"
if n > 0
else f"[yellow]No quotes persisted (chain empty or fetch failed)[/yellow]"
)
@option_chain.command(name="analyze")
@click.option(
"--strategy",
"strategy_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=_DEFAULT_STRATEGY_PATH,
show_default=True,
)
@click.option(
"--db",
"db_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
@click.option("--asset", default="ETH", show_default=True)
@click.option(
"--bias",
type=click.Choice(["bull_put", "bear_call"], case_sensitive=False),
default="bull_put",
show_default=True,
help="Direzione da simulare (il rule engine lo deciderebbe da trend×funding).",
)
def option_chain_analyze(
strategy_path: Path,
db_path: Path,
asset: str,
bias: str,
) -> None:
"""Analizza l'ultimo snapshot di catena salvato.
Per la strategia indicata, simula la selezione strike (delta
target, OTM range, width 4%, credit/width ratio min) e mostra:
* lo strike che il rule engine sceglierebbe come short e long,
* credito atteso, larghezza, rapporto credit/width,
* pass/fail del gate `credit_to_width_ratio_min`.
"""
from cerbero_bite.core.combo_builder import select_strikes # noqa: PLC0415
from cerbero_bite.core.types import OptionQuote # noqa: PLC0415
cfg = load_strategy(strategy_path).config
conn = connect_state(db_path)
try:
repo = Repository()
latest_ts = repo.latest_option_chain_timestamp(conn, asset=asset.upper())
if latest_ts is None:
console.print(
"[red]Nessuno snapshot di catena trovato. Lancia prima "
"`cerbero-bite option-chain trigger`.[/red]"
)
sys.exit(1)
quotes_records = repo.list_option_chain_snapshots(
conn, asset=asset.upper(), start=latest_ts, end=latest_ts,
)
finally:
conn.close()
console.print(
f"[cyan]Snapshot del {latest_ts.isoformat()}{len(quotes_records)} "
f"quote totali[/cyan]"
)
# Costruzione OptionQuote da OptionChainQuoteRecord per riusare select_strikes.
quotes: list[OptionQuote] = []
for q in quotes_records:
if q.bid is None or q.ask is None or q.mid is None or q.delta is None:
continue
quotes.append(
OptionQuote(
instrument=q.instrument_name,
strike=q.strike,
expiry=q.expiry,
option_type=q.option_type,
bid=q.bid,
ask=q.ask,
mid=q.mid,
delta=q.delta,
gamma=q.gamma or Decimal("0"),
theta=q.theta or Decimal("0"),
vega=q.vega or Decimal("0"),
open_interest=q.open_interest or 0,
volume_24h=q.volume_24h or 0,
book_depth_top3=q.book_depth_top3 or 0,
)
)
if not quotes:
console.print("[red]Nessun quote completo per la simulazione.[/red]")
sys.exit(1)
# Lo spot al momento dello snapshot: estraiamo dall'ultimo
# `market_snapshot` ETH a quel timestamp (tolleranza ±15 min).
spot = _resolve_spot_at(db_path, asset=asset.upper(), at=latest_ts)
if spot is None:
console.print(
"[yellow]Spot non recuperabile dai market_snapshots; "
"stimato dal mid ATM.[/yellow]"
)
spot = _atm_spot_proxy(quotes)
selection = select_strikes(
chain=quotes,
bias=bias, # type: ignore[arg-type]
spot=spot,
now=latest_ts,
cfg=cfg,
)
if selection is None:
console.print(
"[red]Il rule engine NON aprirebbe trade con questa catena[/red] "
"(no strike compatibile coi gate delta/distance/width/credit-ratio)."
)
sys.exit(0)
short, long_ = selection
width_usd = (short.strike - long_.strike).copy_abs()
credit_eth = short.mid - long_.mid
credit_usd = credit_eth * spot
ratio = credit_usd / width_usd if width_usd > 0 else Decimal("0")
ratio_target = cfg.structure.credit_to_width_ratio_min
table = Table(title=f"Simulazione picker — bias={bias}, spot={spot:.0f}")
table.add_column("Campo", style="cyan")
table.add_column("Valore", style="bold")
table.add_row("Short strike", f"{short.strike} ({short.delta:+.3f}δ)")
table.add_row("Long strike", f"{long_.strike} ({long_.delta:+.3f}δ)")
table.add_row("Width", f"{width_usd:.0f} USD")
table.add_row("Credit", f"{credit_eth:.4f} ETH ≈ {credit_usd:.2f} USD")
table.add_row(
"Credit/width ratio",
f"{ratio:.2%} (gate ≥ {float(ratio_target):.0%})",
)
pass_str = (
"[green]PASS — entry possibile[/green]"
if ratio >= ratio_target
else "[red]FAIL — premio troppo magro[/red]"
)
table.add_row("Verdetto gate ratio", pass_str)
console.print(table)
def _resolve_spot_at(db_path: Path, *, asset: str, at: datetime) -> Decimal | None:
"""Best-effort lookup dello spot al timestamp ``at`` ± 15 min."""
conn = connect_state(db_path)
try:
rows = Repository().list_market_snapshots(
conn,
asset=asset,
start=at - timedelta(minutes=15),
end=at + timedelta(minutes=15),
limit=1,
)
finally:
conn.close()
if not rows:
return None
return rows[0].spot
def _atm_spot_proxy(quotes: list[Any]) -> Decimal:
"""Stima dello spot prendendo lo strike il cui delta è più vicino a 0.5."""
quote = min(quotes, key=lambda q: abs(abs(q.delta) - Decimal("0.5")))
return quote.strike
def _entrypoint() -> None:
"""Wrapper used by ``cerbero-bite`` console script."""
try:
+9
View File
@@ -75,6 +75,15 @@ class EntryConfig(BaseModel):
dealer_gamma_filter_enabled: bool = True
liquidation_filter_enabled: bool = True
# IV richness filter (§2.9). `iv_minus_rv_min` è la soglia in
# punti vol che la IV implicita 30g deve eccedere la RV30g per
# ammettere l'entry. Letteratura short-vol systematic: l'edge
# sostenibile esiste solo con un margine misurabile fra IV e RV.
# Default disabilitato + soglia 0 per non bloccare l'avvio finché
# non si è calibrato sui dati raccolti (vedi `📐 Calibrazione`).
iv_minus_rv_min: Decimal = Field(default=Decimal("0"))
iv_minus_rv_filter_enabled: bool = False
# ---------------------------------------------------------------------------
# Structure
+20
View File
@@ -44,6 +44,12 @@ class EntryContext(BaseModel):
dealer_net_gamma: Decimal | None = None
liquidation_squeeze_risk_high: bool | None = None
# IV richness gate (§2.9). Differenza IV30g RV30g in punti vol.
# Optional, stessa logica best-effort dei filtri quant: ``None``
# significa "dato non disponibile" e fa saltare il gate (non
# invalida l'entry).
iv_minus_rv: Decimal | None = None
class EntryDecision(BaseModel):
"""Result of :func:`validate_entry`. ``reasons`` holds *all* blocking reasons."""
@@ -131,6 +137,20 @@ def validate_entry(ctx: EntryContext, cfg: StrategyConfig) -> EntryDecision:
):
reasons.append("imminent liquidation squeeze risk")
# §2.9: IV richness gate. Vendere vol senza un margine misurabile
# fra IV e RV è statisticamente neutro: l'edge della strategia
# esiste solo quando il premio è "ricco" rispetto a quanto il
# mercato si è effettivamente mosso.
if (
entry_cfg.iv_minus_rv_filter_enabled
and ctx.iv_minus_rv is not None
and ctx.iv_minus_rv < entry_cfg.iv_minus_rv_min
):
reasons.append(
f"IV richness below floor "
f"(IV-RV={ctx.iv_minus_rv} < {entry_cfg.iv_minus_rv_min} vol pts)"
)
return EntryDecision(accepted=not reasons, reasons=reasons)
+104 -8
View File
@@ -11,6 +11,7 @@ La pagina è di sola lettura: non chiama MCP, non scrive sul DB.
from __future__ import annotations
import math
import os
from dataclasses import dataclass
from pathlib import Path
@@ -280,13 +281,18 @@ def _build_gates(
)
)
# --- IV RV (richness) — solo informativo --------------------
# --- IV RV (richness) — gate §2.9 ---------------------------
rv = (
float(snap.realized_vol_30d) if snap.realized_vol_30d is not None else None
)
iv_minus_rv = (
float(snap.iv_minus_rv) if snap.iv_minus_rv is not None else None
)
iv_min = float(getattr(entry, "iv_minus_rv_min", 0.0)) if entry else 0.0
iv_enabled = (
bool(getattr(entry, "iv_minus_rv_filter_enabled", False)) if entry else False
)
if not iv_enabled:
rows.append(
_GateRow(
"IV RV (richness)",
@@ -295,9 +301,34 @@ def _build_gates(
if iv_minus_rv is not None
else ""
),
"info, > 0 = premio ricco",
"pass" if (iv_minus_rv is not None and iv_minus_rv > 0) else "n/a",
f"RV30={rv:.2f}" if rv is not None else "",
"filtro DISABILITATO (info-only)",
"n/a",
f"RV30={rv:.2f} · attiva con `iv_minus_rv_filter_enabled: true`"
if rv is not None
else "Attiva con `iv_minus_rv_filter_enabled: true`",
)
)
elif iv_minus_rv is None:
rows.append(
_GateRow(
"IV RV ≥ soglia",
"",
f"{iv_min:.1f} pt vol",
"n/a",
"Dato non disponibile in questo tick (best-effort skip).",
)
)
else:
ok = iv_minus_rv >= iv_min
rows.append(
_GateRow(
"IV RV ≥ soglia",
f"{iv_minus_rv:+.2f} pt vol",
f"{iv_min:.1f} pt vol",
"pass" if ok else "fail",
"Premio ricco rispetto a quanto il mercato si è davvero "
"mosso → edge sostenibile per il venditore di vol."
+ (f" RV30={rv:.2f}" if rv is not None else ""),
)
)
@@ -390,6 +421,37 @@ def _compute_pl(
annual_pl = trades_per_year * n_per_trade * concurrency * e_trade_net
apr = (annual_pl / capital) if capital > 0 else 0.0
# --- Max drawdown -------------------------------------------------
# Due metriche distinte:
#
# 1. **Streak atteso (P99)**: lunghezza della peggior sequenza di
# stop consecutivi che ci si aspetta di vedere in un anno con
# probabilità ≤ 1%. Usa l'approssimazione union-bound:
# P(streak ≥ N in N_trade tentativi) ≈ N_trade × p_loss^N
# Imponendo questa quantità ≤ 0.01 e risolvendo per N:
# N = ceil( log(0.01 / N_trade) / log(p_loss) )
# Drawdown corrispondente = N × stop_loss × contracts × concurrency.
#
# 2. **Tail/gap risk**: scenario "gap notturno" in cui il mark
# salta oltre la copertura long PRIMA che lo stop sia
# eseguibile. La perdita massima reale è la larghezza intera
# dello spread meno il credito iniziale, su tutte le posizioni
# aperte simultaneamente.
if prob_loss > 0 and prob_loss < 1 and trades_per_year > 0:
streak_99 = max(
1,
int(math.ceil(
math.log(0.01 / trades_per_year) / math.log(prob_loss)
)) if prob_loss < 1 else 1,
)
else:
streak_99 = 0
expected_dd_usd = streak_99 * sl_loss * n_per_trade * concurrency
expected_dd_pct = expected_dd_usd / capital if capital > 0 else 0.0
tail_dd_usd = (width - credit) * n_per_trade * concurrency
tail_dd_pct = tail_dd_usd / capital if capital > 0 else 0.0
return {
"width": width,
"credit": credit,
@@ -403,6 +465,12 @@ def _compute_pl(
"apr": apr,
"fees": fees,
"slippage": slippage,
"prob_loss": prob_loss,
"streak_99": float(streak_99),
"expected_dd_usd": expected_dd_usd,
"expected_dd_pct": expected_dd_pct,
"tail_dd_usd": tail_dd_usd,
"tail_dd_pct": tail_dd_pct,
}
@@ -439,6 +507,34 @@ def _render_profile_card(
delta=f"{metrics['apr']:+.1%} APR",
)
cols = st.columns(2)
cols[0].metric(
"Max DD attesa (P99)",
f"{metrics['expected_dd_usd']:.0f} USD",
delta=f"{-metrics['expected_dd_pct']:+.1%} cap",
delta_color="inverse",
help=(
f"Streak di {int(metrics['streak_99'])} stop consecutivi "
f"(probabilità ≤ 1% nell'anno) × perdita stop "
f"({metrics['sl_loss']:.0f} USD) × contratti × posizioni "
f"concorrenti. È la peggior sequenza che ti aspetti di "
"vedere; il drawdown reale può essere maggiore se i filtri "
"non rilevano un regime change."
),
)
cols[1].metric(
"Max DD coda (gap)",
f"{metrics['tail_dd_usd']:.0f} USD",
delta=f"{-metrics['tail_dd_pct']:+.1%} cap",
delta_color="inverse",
help=(
"Scenario gap notturno: il mark salta oltre la copertura "
"long PRIMA che lo stop sia eseguibile. Perdita = larghezza "
"intera meno credito, su tutte le posizioni aperte. "
"I filtri quant + macro lo riducono ma NON lo annullano."
),
)
if metrics["n_per_trade"] == 0:
st.warning(
"Sizing 0 contratti: capitale insufficiente per i cap di "
@@ -551,10 +647,10 @@ def _render_pl_panel(
sens_rows.append(
{
"Win rate": f"{wr:.0%}",
"Conservativa P/L": f"{m_c['annual_pl']:+.0f} USD",
"Conservativa APR": f"{m_c['apr']:+.1%}",
"Aggressiva P/L": f"{m_a['annual_pl']:+.0f} USD",
"Aggressiva APR": f"{m_a['apr']:+.1%}",
"Cons. APR": f"{m_c['apr']:+.1%}",
"Cons. Max DD": f"{m_c['expected_dd_pct']:.1%}",
"Aggr. APR": f"{m_a['apr']:+.1%}",
"Aggr. Max DD": f"{m_a['expected_dd_pct']:.1%}",
}
)
st.table(sens_rows)
+24
View File
@@ -94,6 +94,7 @@ class _MarketSnapshot:
portfolio_eur: Decimal
dealer_net_gamma: Decimal | None
liquidation_squeeze_risk_high: bool | None
iv_minus_rv: Decimal | None
async def _gather_snapshot(
@@ -159,6 +160,9 @@ async def _gather_snapshot(
liquidation_t: asyncio.Task[bool | None] = asyncio.create_task(
_safe_liquidation_squeeze(sentiment)
)
iv_rv_t: asyncio.Task[Decimal | None] = asyncio.create_task(
_safe_iv_minus_rv(deribit)
)
await asyncio.gather(
spot_t,
@@ -172,6 +176,7 @@ async def _gather_snapshot(
portfolio_t,
dealer_t,
liquidation_t,
iv_rv_t,
)
return _MarketSnapshot(
spot_eth_usd=spot_t.result(),
@@ -185,6 +190,7 @@ async def _gather_snapshot(
portfolio_eur=portfolio_t.result(),
dealer_net_gamma=dealer_t.result(),
liquidation_squeeze_risk_high=liquidation_t.result(),
iv_minus_rv=iv_rv_t.result(),
)
@@ -196,6 +202,20 @@ async def _safe_dealer_gamma(deribit: DeribitClient) -> Decimal | None:
return snap.total_net_dealer_gamma
async def _safe_iv_minus_rv(deribit: DeribitClient) -> Decimal | None:
"""Best-effort fetch of the IV30g RV30g spread (vol points)."""
try:
rv = await deribit.realized_vol("ETH")
except Exception:
return None
if not isinstance(rv, dict):
return None
value = rv.get("iv_minus_rv_30d")
if value is None:
return None
return value if isinstance(value, Decimal) else Decimal(str(value))
async def _safe_liquidation_squeeze(sentiment: SentimentClient) -> bool | None:
try:
heatmap = await sentiment.liquidation_heatmap("ETH")
@@ -353,6 +373,7 @@ async def run_entry_cycle(
next_macro_event_in_days=snap.macro_days_to_event,
has_open_position=False,
dealer_net_gamma=snap.dealer_net_gamma,
iv_minus_rv=snap.iv_minus_rv,
liquidation_squeeze_risk_high=snap.liquidation_squeeze_risk_high,
)
decision = validate_entry(entry_ctx, cfg)
@@ -370,6 +391,9 @@ async def run_entry_cycle(
"eth_holdings_pct": str(snap.eth_holdings_pct),
"portfolio_eur": str(snap.portfolio_eur),
"capital_usd": str(capital_usd),
"iv_minus_rv": (
str(snap.iv_minus_rv) if snap.iv_minus_rv is not None else None
),
}
}
if not decision.accepted:
@@ -1,185 +0,0 @@
"""Periodic option-chain snapshot collector (§13).
Fetches the Deribit option chain for every strike entro la finestra
DTE configurata, prima del trigger entry settimanale (cron
``55 13 * * MON`` di default). Persiste un quote per ogni strumento
in ``option_chain_snapshots`` con un timestamp condiviso, che diventa
il dato di base per:
* il backtest non-stilizzato (vedi ``core/backtest.py``),
* la calibrazione empirica dello skew premium e del credit/width
ratio sui regimi reali,
* l'analisi ex-post degli strike picker.
Il collector è **best-effort**: se ``get_tickers`` fallisce per un
batch, gli altri batch passano comunque; se manca completamente la
chain, il job ritorna 0 senza alzare eccezioni e logga il problema.
Non chiama l'order book per ogni strike (sarebbe troppo costoso) —
``book_depth_top3`` resta NULL nel quote, il liquidity gate del live
lo legge al volo solo per gli strike che gli interessano.
"""
from __future__ import annotations
import asyncio
import logging
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import TYPE_CHECKING, Any
from cerbero_bite.state import connect, transaction
from cerbero_bite.state.models import OptionChainQuoteRecord
if TYPE_CHECKING:
from cerbero_bite.runtime.dependencies import RuntimeContext
__all__ = ["DEFAULT_BATCH_SIZE", "collect_option_chain_snapshot"]
_log = logging.getLogger("cerbero_bite.runtime.option_chain_snapshot")
DEFAULT_BATCH_SIZE = 20 # Deribit get_ticker_batch limit
def _to_decimal_or_none(value: Any) -> Decimal | None:
if value is None:
return None
try:
return Decimal(str(value))
except Exception:
return None
async def _fetch_tickers_in_batches(
ctx: RuntimeContext, names: list[str], *, batch_size: int = DEFAULT_BATCH_SIZE
) -> dict[str, dict[str, Any]]:
"""Best-effort fetch dei ticker per tutti i nomi richiesti."""
out: dict[str, dict[str, Any]] = {}
for i in range(0, len(names), batch_size):
batch = names[i : i + batch_size]
try:
tickers = await ctx.deribit.get_tickers(batch)
except Exception as exc:
_log.warning(
"get_tickers failed for batch starting %s: %s",
batch[0] if batch else "<empty>", exc,
)
continue
for t in tickers:
name = t.get("instrument_name") or t.get("instrument")
if isinstance(name, str):
out[name] = t
return out
async def collect_option_chain_snapshot(
ctx: RuntimeContext,
*,
asset: str = "ETH",
now: datetime | None = None,
batch_size: int = DEFAULT_BATCH_SIZE,
) -> int:
"""Collect + persist a single chain snapshot for ``asset``. Returns
the number of quotes persisted (0 on best-effort failure).
Filtra le scadenze nella finestra ``[dte_min, dte_max]`` di
``cfg.structure`` per non sprecare richieste su scadenze che il
rule engine non userebbe mai.
"""
when = (now or datetime.now(UTC)).astimezone(UTC)
cfg = ctx.cfg
expiry_from = when + timedelta(days=cfg.structure.dte_min)
expiry_to = when + timedelta(days=cfg.structure.dte_max)
try:
chain = await ctx.deribit.options_chain(
currency=asset.upper(),
expiry_from=expiry_from,
expiry_to=expiry_to,
min_open_interest=int(cfg.liquidity.open_interest_min),
)
except Exception:
_log.exception("option chain fetch failed")
return 0
if not chain:
_log.info("option chain empty for %s in window", asset)
return 0
names = [meta.name for meta in chain]
tickers = await _fetch_tickers_in_batches(ctx, names, batch_size=batch_size)
quotes: list[OptionChainQuoteRecord] = []
for meta in chain:
ticker = tickers.get(meta.name)
if ticker is None:
# Lasciamo comunque la riga senza quote: utile sapere
# che lo strumento esisteva.
quotes.append(
OptionChainQuoteRecord(
timestamp=when,
asset=asset.upper(),
instrument_name=meta.name,
strike=meta.strike,
expiry=meta.expiry,
option_type=meta.option_type,
open_interest=int(meta.open_interest)
if meta.open_interest is not None
else None,
)
)
continue
greeks = ticker.get("greeks") or {}
quotes.append(
OptionChainQuoteRecord(
timestamp=when,
asset=asset.upper(),
instrument_name=meta.name,
strike=meta.strike,
expiry=meta.expiry,
option_type=meta.option_type,
bid=_to_decimal_or_none(ticker.get("bid")),
ask=_to_decimal_or_none(ticker.get("ask")),
mid=_to_decimal_or_none(ticker.get("mark_price")),
iv=_to_decimal_or_none(ticker.get("mark_iv")),
delta=_to_decimal_or_none(greeks.get("delta")),
gamma=_to_decimal_or_none(greeks.get("gamma")),
theta=_to_decimal_or_none(greeks.get("theta")),
vega=_to_decimal_or_none(greeks.get("vega")),
open_interest=int(meta.open_interest)
if meta.open_interest is not None
else None,
volume_24h=(
int(ticker["volume_24h"])
if ticker.get("volume_24h") is not None
else None
),
# book_depth_top3: NULL — non lo prendiamo per ogni
# strike per non saturare l'API. Il liquidity gate
# del live lo chiede on-the-fly per gli strike
# candidati al picker.
)
)
persisted = 0
try:
conn = connect(ctx.db_path)
try:
with transaction(conn):
persisted = ctx.repository.record_option_chain_snapshot(
conn, quotes
)
finally:
conn.close()
except Exception:
_log.exception("persist option chain snapshot failed")
return 0
_log.info("option_chain_snapshot persisted %d quote(s)", persisted)
return persisted
# Avoid unused import warning for asyncio in lint when only used as type
_ = asyncio
-21
View File
@@ -34,9 +34,6 @@ from cerbero_bite.runtime.market_snapshot_cycle import (
DEFAULT_ASSETS,
collect_market_snapshot,
)
from cerbero_bite.runtime.option_chain_snapshot_cycle import (
collect_option_chain_snapshot,
)
from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle
from cerbero_bite.runtime.recovery import recover_state
from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler
@@ -56,7 +53,6 @@ _CRON_HEALTH = "*/5 * * * *"
_CRON_BACKUP = "0 * * * *"
_CRON_MANUAL_ACTIONS = "*/1 * * * *"
_CRON_MARKET_SNAPSHOT = "*/15 * * * *"
_CRON_OPTION_CHAIN_SNAPSHOT = "55 13 * * MON" # 5 min prima del trigger entry
_BACKUP_RETENTION_DAYS = 30
@@ -221,8 +217,6 @@ class Orchestrator:
manual_actions_cron: str = _CRON_MANUAL_ACTIONS,
market_snapshot_cron: str = _CRON_MARKET_SNAPSHOT,
market_snapshot_assets: tuple[str, ...] = DEFAULT_ASSETS,
option_chain_cron: str = _CRON_OPTION_CHAIN_SNAPSHOT,
option_chain_asset: str = "ETH",
backup_dir: Path | None = None,
backup_retention_days: int = _BACKUP_RETENTION_DAYS,
) -> AsyncIOScheduler:
@@ -288,14 +282,6 @@ class Orchestrator:
await _safe("market_snapshot", _do)
async def _option_chain_snapshot() -> None:
async def _do() -> None:
await collect_option_chain_snapshot(
self._ctx, asset=option_chain_asset
)
await _safe("option_chain_snapshot", _do)
jobs: list[JobSpec] = [
JobSpec(name="health", cron=health_cron, coro_factory=_health),
JobSpec(name="backup", cron=backup_cron, coro_factory=_backup),
@@ -323,13 +309,6 @@ class Orchestrator:
coro_factory=_market_snapshot,
)
)
jobs.append(
JobSpec(
name="option_chain_snapshot",
cron=option_chain_cron,
coro_factory=_option_chain_snapshot,
)
)
else:
_log.warning(
"data analysis disabled (CERBERO_BITE_ENABLE_DATA_ANALYSIS="
@@ -1,42 +0,0 @@
-- 0004_option_chain_snapshots.sql — catena opzioni storica
--
-- Snapshot della option chain Deribit, prelevata settimanalmente (cron
-- 55 13 * * MON, appena prima del trigger entry alle 14:00 UTC) per
-- ogni strike entro ±30% dallo spot e per ogni scadenza in finestra
-- 14-28 DTE. Dato di base per il backtest non-stilizzato e per
-- calibrare empiricamente lo skew premium del modello BS.
--
-- Granularità: una riga per (snapshot_ts, instrument). Lo
-- snapshot_ts è il timestamp del cron tick — TUTTI i quote raccolti
-- in quello stesso tick condividono il timestamp, così filtrare per
-- "lo snapshot del 2026-05-04 alle 13:55" è una semplice
-- WHERE timestamp = X.
CREATE TABLE option_chain_snapshots (
timestamp TEXT NOT NULL,
asset TEXT NOT NULL,
instrument_name TEXT NOT NULL,
strike TEXT NOT NULL,
expiry TEXT NOT NULL,
option_type TEXT NOT NULL CHECK (option_type IN ('C','P')),
bid TEXT,
ask TEXT,
mid TEXT,
iv TEXT,
delta TEXT,
gamma TEXT,
theta TEXT,
vega TEXT,
open_interest INTEGER,
volume_24h INTEGER,
book_depth_top3 INTEGER,
PRIMARY KEY (timestamp, instrument_name)
) WITHOUT ROWID;
CREATE INDEX idx_option_chain_asset_ts
ON option_chain_snapshots(asset, timestamp DESC);
CREATE INDEX idx_option_chain_expiry
ON option_chain_snapshots(asset, expiry);
PRAGMA user_version = 5;
-31
View File
@@ -22,7 +22,6 @@ __all__ = [
"InstructionRecord",
"ManualAction",
"MarketSnapshotRecord",
"OptionChainQuoteRecord",
"PositionRecord",
"PositionStatus",
"SystemStateRecord",
@@ -149,36 +148,6 @@ class MarketSnapshotRecord(BaseModel):
fetch_errors_json: str | None = None
class OptionChainQuoteRecord(BaseModel):
"""Row of the ``option_chain_snapshots`` table.
One row per (snapshot_ts, instrument) the same ``timestamp`` is
shared by every quote prelevato nello stesso tick del cron. Tutti
i campi numerici sono opzionali perché il collector è
best-effort: un ticker mancante non invalida il resto della chain.
"""
model_config = ConfigDict(extra="forbid")
timestamp: datetime
asset: str
instrument_name: str
strike: Decimal
expiry: datetime
option_type: Literal["C", "P"]
bid: Decimal | None = None
ask: Decimal | None = None
mid: Decimal | None = None
iv: Decimal | None = None
delta: Decimal | None = None
gamma: Decimal | None = None
theta: Decimal | None = None
vega: Decimal | None = None
open_interest: int | None = None
volume_24h: int | None = None
book_depth_top3: int | None = None
class ManualAction(BaseModel):
"""Row of the ``manual_actions`` table."""
-130
View File
@@ -24,7 +24,6 @@ from cerbero_bite.state.models import (
InstructionRecord,
ManualAction,
MarketSnapshotRecord,
OptionChainQuoteRecord,
PositionRecord,
PositionStatus,
SystemStateRecord,
@@ -408,103 +407,6 @@ class Repository:
).fetchall()
return [_row_to_market_snapshot(r) for r in rows]
# ------------------------------------------------------------------
# option_chain_snapshots
# ------------------------------------------------------------------
def record_option_chain_snapshot(
self,
conn: sqlite3.Connection,
quotes: list[OptionChainQuoteRecord],
) -> int:
"""Bulk-insert dei quote di un singolo tick. Tutti i quote
condividono lo stesso ``timestamp``. Idempotente per
(timestamp, instrument_name)."""
if not quotes:
return 0
rows = [
(
_enc_dt(q.timestamp),
q.asset,
q.instrument_name,
_enc_dec(q.strike),
_enc_dt(q.expiry),
q.option_type,
_enc_dec(q.bid),
_enc_dec(q.ask),
_enc_dec(q.mid),
_enc_dec(q.iv),
_enc_dec(q.delta),
_enc_dec(q.gamma),
_enc_dec(q.theta),
_enc_dec(q.vega),
q.open_interest,
q.volume_24h,
q.book_depth_top3,
)
for q in quotes
]
conn.executemany(
"INSERT OR REPLACE INTO option_chain_snapshots("
"timestamp, asset, instrument_name, strike, expiry, option_type, "
"bid, ask, mid, iv, delta, gamma, theta, vega, "
"open_interest, volume_24h, book_depth_top3) "
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
rows,
)
return len(rows)
def list_option_chain_snapshots(
self,
conn: sqlite3.Connection,
*,
asset: str,
start: datetime | None = None,
end: datetime | None = None,
expiry_from: datetime | None = None,
expiry_to: datetime | None = None,
limit: int = 50000,
) -> list[OptionChainQuoteRecord]:
clauses: list[str] = ["asset = ?"]
params: list[Any] = [asset]
if start is not None:
clauses.append("timestamp >= ?")
params.append(_enc_dt(start))
if end is not None:
clauses.append("timestamp <= ?")
params.append(_enc_dt(end))
if expiry_from is not None:
clauses.append("expiry >= ?")
params.append(_enc_dt(expiry_from))
if expiry_to is not None:
clauses.append("expiry <= ?")
params.append(_enc_dt(expiry_to))
params.append(int(limit))
rows = conn.execute(
f"SELECT * FROM option_chain_snapshots "
f"WHERE {' AND '.join(clauses)} "
f"ORDER BY timestamp DESC, instrument_name ASC LIMIT ?",
params,
).fetchall()
return [_row_to_option_chain_quote(r) for r in rows]
def latest_option_chain_timestamp(
self,
conn: sqlite3.Connection,
*,
asset: str,
) -> datetime | None:
"""Timestamp dell'ultimo snapshot raccolto per ``asset``,
utile per stimare la freschezza del dato dalla GUI."""
row = conn.execute(
"SELECT timestamp FROM option_chain_snapshots "
"WHERE asset = ? ORDER BY timestamp DESC LIMIT 1",
(asset,),
).fetchone()
if row is None:
return None
return _dec_dt(row["timestamp"])
# ------------------------------------------------------------------
# manual_actions
# ------------------------------------------------------------------
@@ -743,38 +645,6 @@ def _row_to_market_snapshot(row: sqlite3.Row) -> MarketSnapshotRecord:
)
def _row_to_option_chain_quote(row: sqlite3.Row) -> OptionChainQuoteRecord:
return OptionChainQuoteRecord(
timestamp=_dec_dt_required(row["timestamp"]),
asset=row["asset"],
instrument_name=row["instrument_name"],
strike=_dec_dec_required(row["strike"]),
expiry=_dec_dt_required(row["expiry"]),
option_type=row["option_type"],
bid=_dec_dec(row["bid"]),
ask=_dec_dec(row["ask"]),
mid=_dec_dec(row["mid"]),
iv=_dec_dec(row["iv"]),
delta=_dec_dec(row["delta"]),
gamma=_dec_dec(row["gamma"]),
theta=_dec_dec(row["theta"]),
vega=_dec_dec(row["vega"]),
open_interest=(
int(row["open_interest"])
if row["open_interest"] is not None
else None
),
volume_24h=(
int(row["volume_24h"]) if row["volume_24h"] is not None else None
),
book_depth_top3=(
int(row["book_depth_top3"])
if row["book_depth_top3"] is not None
else None
),
)
def _dec_dec_required(value: Any) -> Decimal:
out = _dec_dec(value)
if out is None:
+9 -2
View File
@@ -28,8 +28,8 @@
# 2× via "ETH + BTC" indicato in `📚 Strategia` è una **stima ex-ante**
# di cosa otterresti DOPO quel lavoro di codice.
config_version: "1.0.0-aggressiva"
config_hash: "b931a2b96fbc149b21cae84a196ee8bad10220b5ee8fa9ab0ed06ae52d7dc531"
config_version: "1.1.0-aggressiva"
config_hash: "58086a4afbbf36c48d22f39bbc75d8145e76a063917431793d3b92ae76b5eb68"
last_review: "2026-04-26"
last_reviewer: "Adriano"
@@ -66,6 +66,13 @@ entry:
dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true
# IV richness gate (§2.9) — abilitato con soglia 3 pt vol.
# Coerente con il profilo aggressivo: size più grande pretende
# win-rate più alto. La soglia 3 va alzata a 5 dopo la
# calibrazione (4-8 settimane di dati raccolti).
iv_minus_rv_min: "3"
iv_minus_rv_filter_enabled: true
structure:
dte_target: 18
dte_min: 14
+6 -2
View File
@@ -15,8 +15,8 @@
# cerbero-bite config hash --file strategy.conservativa.yaml
# e bumpare config_version.
config_version: "1.0.0-conservativa"
config_hash: "eff824281bbb538fba49434d8cc4b9c37675bc73d60e351293e263cc7e7b29ef"
config_version: "1.1.0-conservativa"
config_hash: "188155fd0017a1353024151b8237f257b0c3156d2592ce89653d239b39fb69ce"
last_review: "2026-04-26"
last_reviewer: "Adriano"
@@ -50,6 +50,10 @@ entry:
dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true
# IV richness gate (§2.9) — disabilitato finché non calibrato.
iv_minus_rv_min: "0"
iv_minus_rv_filter_enabled: false
structure:
dte_target: 18
dte_min: 14
+9 -2
View File
@@ -6,8 +6,8 @@
# config hash), and lands as a separate commit with the motivation in
# the commit message.
config_version: "1.0.0"
config_hash: "4c2be4c51c849ed58fa22ec2b302016c453894dd0964b6d05445ab1b723e2d10"
config_version: "1.1.0"
config_hash: "e0504e6936e9ec5013e7901cf98532e29ff2414b1cce10461cfe97790119b724"
last_review: "2026-04-26"
last_reviewer: "Adriano"
@@ -46,6 +46,13 @@ entry:
dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true
# IV richness gate (§2.9). Disabilitato di default: è il filtro
# con maggior impatto sul win-rate ma va calibrato sui dati
# raccolti in `market_snapshots` prima di metterlo in produzione.
# Vedi `docs/13-strategia-spiegata.md` §4-quater.
iv_minus_rv_min: "0"
iv_minus_rv_filter_enabled: false
structure:
dte_target: 18
dte_min: 14
+10
View File
@@ -118,6 +118,16 @@ def _wire_market_snapshot(
},
is_reusable=True,
)
httpx_mock.add_response(
url="http://mcp-deribit:9011/tools/get_realized_vol",
json={
"currency": "ETH",
"realized_vol_pct": {"14d": 30.0, "30d": 30.0},
"iv_current_pct": 38.0,
"iv_minus_rv_pct": {"14d": 8.0, "30d": 8.0},
},
is_reusable=True,
)
httpx_mock.add_response(
url="http://mcp-sentiment:9014/tools/get_liquidation_heatmap",
json={
-1
View File
@@ -129,7 +129,6 @@ def test_install_scheduler_registers_canonical_jobs(tmp_path: Path) -> None:
"backup",
"manual_actions",
"market_snapshot",
"option_chain_snapshot",
}
+1 -1
View File
@@ -68,7 +68,7 @@ def test_compute_hash_is_independent_of_recorded_hash_value(tmp_path: Path) -> N
def test_load_repo_strategy_yaml(tmp_path: Path) -> None:
"""The committed strategy.yaml validates with the recorded hash."""
result = load_strategy(REPO_ROOT / "strategy.yaml")
assert result.config.config_version == "1.0.0"
assert result.config.config_version == "1.1.0"
assert result.config.sizing.kelly_fraction == Decimal("0.13")
assert result.computed_hash == result.config.config_hash
+56
View File
@@ -194,6 +194,62 @@ def test_dealer_gamma_filter_disabled_in_config(cfg: StrategyConfig) -> None:
assert decision.accepted is True
# ---------------------------------------------------------------------------
# IV richness gate (§2.9)
# ---------------------------------------------------------------------------
def _strict_iv_rv_cfg(
cfg: StrategyConfig, *, threshold: Decimal = Decimal("5")
) -> StrategyConfig:
return golden_config(
entry=EntryConfig(
**{
**cfg.entry.model_dump(),
"iv_minus_rv_filter_enabled": True,
"iv_minus_rv_min": threshold,
}
)
)
def test_iv_richness_gate_disabled_by_default_lets_thin_premium_pass(
cfg: StrategyConfig,
) -> None:
# Default config: filter disabled. Anche con IV-RV negativa (RV>IV)
# l'entry deve passare per non rompere setup pre-calibrazione.
decision = validate_entry(_good_ctx(iv_minus_rv=Decimal("-2")), cfg)
assert decision.accepted is True
def test_iv_richness_gate_blocks_when_below_floor(cfg: StrategyConfig) -> None:
strict = _strict_iv_rv_cfg(cfg, threshold=Decimal("5"))
decision = validate_entry(_good_ctx(iv_minus_rv=Decimal("3")), strict)
assert decision.accepted is False
assert any("IV richness" in r for r in decision.reasons)
def test_iv_richness_gate_passes_when_above_floor(cfg: StrategyConfig) -> None:
strict = _strict_iv_rv_cfg(cfg, threshold=Decimal("5"))
decision = validate_entry(_good_ctx(iv_minus_rv=Decimal("6")), strict)
assert decision.accepted is True
def test_iv_richness_gate_passes_at_exact_threshold(cfg: StrategyConfig) -> None:
# Soglia inclusiva: IV-RV == soglia → accettato (gate è "<", non "<=").
strict = _strict_iv_rv_cfg(cfg, threshold=Decimal("5"))
decision = validate_entry(_good_ctx(iv_minus_rv=Decimal("5")), strict)
assert decision.accepted is True
def test_iv_richness_gate_skipped_when_data_missing(cfg: StrategyConfig) -> None:
# MCP irraggiungibile: best-effort skip, non bloccare l'entry per
# un problema di infrastruttura.
strict = _strict_iv_rv_cfg(cfg, threshold=Decimal("5"))
decision = validate_entry(_good_ctx(iv_minus_rv=None), strict)
assert decision.accepted is True
def test_validate_entry_accumulates_all_reasons(cfg: StrategyConfig) -> None:
decision = validate_entry(
_good_ctx(
@@ -1,134 +0,0 @@
"""TDD per :mod:`cerbero_bite.runtime.option_chain_snapshot_cycle`."""
from __future__ import annotations
from datetime import UTC, datetime
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
import pytest
from cerbero_bite.clients.deribit import InstrumentMeta
from cerbero_bite.runtime.option_chain_snapshot_cycle import (
collect_option_chain_snapshot,
)
from cerbero_bite.state.models import OptionChainQuoteRecord
_NOW = datetime(2026, 5, 4, 13, 55, tzinfo=UTC)
def _meta(name: str, strike: int, expiry_dte: int = 18) -> InstrumentMeta:
expiry = _NOW.replace(hour=8, minute=0, second=0)
expiry = expiry.replace(day=expiry.day) + (
# add days
__import__("datetime").timedelta(days=expiry_dte)
)
return InstrumentMeta(
name=name,
strike=Decimal(str(strike)),
expiry=expiry,
option_type="P",
open_interest=Decimal("100"),
tick_size=Decimal("0.0005"),
min_trade_amount=Decimal("1"),
)
def _ticker(name: str, *, mark: float = 0.020, bid: float = 0.018,
ask: float = 0.022, delta: float = -0.12) -> dict:
return {
"instrument_name": name,
"bid": bid,
"ask": ask,
"mark_price": mark,
"mark_iv": 60.0,
"volume_24h": 50,
"greeks": {
"delta": delta,
"gamma": 0.001,
"theta": -0.0005,
"vega": 0.10,
},
}
@pytest.fixture
def cfg() -> object:
from cerbero_bite.config import golden_config
return golden_config()
@pytest.fixture
def fake_ctx(cfg: object) -> MagicMock:
"""Mock minimal RuntimeContext."""
ctx = MagicMock()
ctx.cfg = cfg
ctx.db_path = ":memory:"
return ctx
@pytest.mark.asyncio
async def test_collector_persists_one_quote_per_instrument(
fake_ctx: MagicMock,
) -> None:
metas = [_meta("ETH-21MAY26-2475-P", 2475), _meta("ETH-21MAY26-2400-P", 2400)]
fake_ctx.deribit.options_chain = AsyncMock(return_value=metas)
fake_ctx.deribit.get_tickers = AsyncMock(
return_value=[_ticker(m.name) for m in metas]
)
persisted: list[list[OptionChainQuoteRecord]] = []
def _record(_conn: object, qs: list[OptionChainQuoteRecord]) -> int:
persisted.append(qs)
return len(qs)
fake_ctx.repository.record_option_chain_snapshot = _record
n = await collect_option_chain_snapshot(fake_ctx, asset="ETH", now=_NOW)
assert n == 2
assert len(persisted) == 1
assert {q.instrument_name for q in persisted[0]} == {
"ETH-21MAY26-2475-P", "ETH-21MAY26-2400-P",
}
# Tutti i quote condividono il timestamp del cron tick.
assert all(q.timestamp == _NOW for q in persisted[0])
@pytest.mark.asyncio
async def test_collector_handles_missing_tickers_with_null_fields(
fake_ctx: MagicMock,
) -> None:
metas = [_meta("ETH-21MAY26-2475-P", 2475)]
fake_ctx.deribit.options_chain = AsyncMock(return_value=metas)
fake_ctx.deribit.get_tickers = AsyncMock(return_value=[]) # vuoto
persisted: list[list[OptionChainQuoteRecord]] = []
def _record(_conn: object, qs: list[OptionChainQuoteRecord]) -> int:
persisted.append(qs)
return len(qs)
fake_ctx.repository.record_option_chain_snapshot = _record
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 1
assert persisted[0][0].mid is None # ticker mancante ⇒ campi NULL
assert persisted[0][0].instrument_name == "ETH-21MAY26-2475-P"
@pytest.mark.asyncio
async def test_collector_returns_zero_when_chain_empty(
fake_ctx: MagicMock,
) -> None:
fake_ctx.deribit.options_chain = AsyncMock(return_value=[])
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 0
@pytest.mark.asyncio
async def test_collector_swallows_chain_fetch_failure(
fake_ctx: MagicMock,
) -> None:
fake_ctx.deribit.options_chain = AsyncMock(side_effect=RuntimeError("boom"))
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 0 # best-effort: non rilancia