10 Commits

Author SHA1 Message Date
root a1a9f74ed2 Merge feat/option-chain-snapshots 2026-05-01 21:08:28 +00:00
root a9df399db4 Merge feat/backtest-engine 2026-05-01 21:08:22 +00:00
root e06f4d5c96 Merge feat/strategy-improvements-fdac
# Conflicts:
#	src/cerbero_bite/gui/pages/7_📚_Strategia.py
#	strategy.aggressiva.yaml
#	strategy.conservativa.yaml
#	strategy.yaml
#	tests/unit/test_config_loader.py
2026-05-01 21:08:12 +00:00
root f24511fcad Merge feat/iv-rv-hard-gate 2026-05-01 21:06:32 +00:00
root 954baaa354 feat(cli): comando option-chain (trigger + analyze) per la catena opzioni
Espone direttamente da CLI le due operazioni più utili sui dati di
``option_chain_snapshots`` raccolti dal cron settimanale:

- ``cerbero-bite option-chain trigger`` — esegue UNA volta il
  collector della catena. Riusa la stessa pipeline schedulata (cron
  ``55 13 * * MON``) ma on-demand. Utile per popolare il DB senza
  aspettare lunedì.
- ``cerbero-bite option-chain analyze [--bias bull_put|bear_call]`` —
  legge l'ultimo snapshot, simula il selector di strike
  (``select_strikes``) con la strategy passata e stampa una tabella
  con: short/long strike, delta, width, credito reale, ratio
  credit/width, e PASS/FAIL del gate ``credit_to_width_ratio_min``.

Il comando ``analyze`` rende immediatamente actionable la catena
appena raccolta: invece di stime ex-ante via Black-Scholes (modulo
``core/backtest.py``), legge i mid REALI di Deribit e dice "il rule
engine aprirebbe questo trade qui? credit/width ratio passa o no?".

Esempio di output sui primi snapshot raccolti (regime ETH ~2200,
DTE ~14g):

    Snapshot del 2026-05-01T20:53:49 — 21 quote totali
    Il rule engine NON aprirebbe trade con questa catena
    (no strike compatibile coi gate delta/distance/width/credit-ratio).

Conferma empirica del messaggio del documento ``13-strategia-spiegata``:
con delta target 0.12 + width 4% + credit/width ≥ 30%, il regime
attuale di ETH options non è abbastanza ricco per produrre trade —
serve calibrare soglie o aspettare un regime IV più alto.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:57:40 +00:00
root 3e46169278 fix(migrations): rinomina 0004 → 0005 per coesistenza con auto_pause
La migrazione `0004_option_chain_snapshots.sql` collide con quella
parallela `0004_auto_pause.sql` del PR `feat/strategy-improvements-fdac`:
entrambe puntano allo stesso slot e bumpano user_version a 4.

Rinominata a 0005 (con `PRAGMA user_version = 5`) così le due
migrazioni possono coesistere senza conflitti, indipendentemente
dall'ordine di merge dei due PR. Quando i due PR landeranno in main,
basterà conservare la sequenza 0004 (auto_pause) → 0005 (option_chain).

Verificato in locale: deploy con DB già a v4 (post-FDAC) ora applica
correttamente la migrazione e crea la tabella `option_chain_snapshots`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:52:11 +00:00
root c0a0ee416f feat(state+runtime): option_chain_snapshots — catena opzioni storica per backtest reale
Aggiunge la persistence della option chain Deribit con cron settimanale
``55 13 * * MON`` (5 minuti prima del trigger entry alle 14:00 UTC),
sbloccando il backtest non-stilizzato e la calibrazione empirica
dello skew premium.

**Schema (migrazione 0004)**

Nuova tabella ``option_chain_snapshots`` con primary key composta
``(timestamp, instrument_name)`` — tutti i quote prelevati nello
stesso tick condividono il timestamp, così le query "lo snapshot del
2026-05-04 alle 13:55" diventano una singola WHERE timestamp = X.
Indici su (asset, timestamp DESC) e (asset, expiry) per supportare
sia listing recenti sia query per scadenza specifica.

Campi: instrument_name, strike, expiry, option_type (C/P), bid, ask,
mid, iv, delta, gamma, theta, vega, open_interest, volume_24h,
book_depth_top3. Tutti i numerici sono nullable: il collector è
best-effort, un ticker mancante produce comunque una riga (utile
per sapere che lo strumento esisteva ma non era quotato).

**Modello + repository**

- ``OptionChainQuoteRecord`` (Pydantic, in ``state/models.py``).
- ``Repository.record_option_chain_snapshot`` (bulk insert
  idempotente).
- ``Repository.list_option_chain_snapshots`` (filtri su asset,
  timestamp window, expiry window, limit default 50000).
- ``Repository.latest_option_chain_timestamp`` (freshness check
  per dashboard GUI).

**Collector**

Nuovo ``runtime/option_chain_snapshot_cycle.py`` che:

1. Calcola la finestra scadenze ``[now+dte_min, now+dte_max]`` da
   ``cfg.structure``: niente richieste su scadenze che il rule
   engine non userebbe mai.
2. Chiama ``deribit.options_chain()`` con
   ``min_open_interest=cfg.liquidity.open_interest_min``.
3. Batch ``deribit.get_tickers()`` (max 20 per call, limite Deribit)
   con error-isolation per batch — un batch fallito non blocca
   gli altri.
4. NON chiama l'order book per ogni strike (rate-limit guard);
   ``book_depth_top3`` resta NULL e il liquidity gate live lo
   chiede on-the-fly per gli strike candidati al picker.

Best-effort end-to-end: chain assente, get_tickers giù, persist
fallito → ritorna 0 senza alzare eccezioni, logga sempre.

**Schedulazione**

Wired in ``Orchestrator.install_scheduler`` come job parallelo a
``market_snapshot``, attivo solo quando
``ENABLE_DATA_ANALYSIS=true``. Cron parametrizzabile via il nuovo
kwarg ``option_chain_cron`` (default ``55 13 * * MON``).

**Test**

- 4 unit test del collector (happy path, ticker mancante, chain
  vuota, fetch fail best-effort) con mock di RuntimeContext.
- Aggiornato ``test_install_scheduler_registers_canonical_jobs``
  per includere il nuovo job nel set canonico.

**Cosa sblocca**

- Backtest non-stilizzato: il PR ``feat/backtest-engine`` può
  dropparsi il modello BS+skew_premium e leggere prezzi reali
  ``mid`` dalla chain registrata.
- Calibrazione empirica dello skew premium (hardcoded a 1.5 nel
  backtest stilizzato): plot del rapporto fra quote reali Deribit
  e BS per delta/expiry, regressione → valore data-driven.
- Validazione ex-post: "il delta-0.12 era davvero a 25% OTM in
  quella settimana?" diventa una query SELECT.
- Dimensione attesa: ~50 strike × 3 scadenze × 1 snapshot/settimana
  × 17 colonne ≈ 12 KB/settimana, ~600 KB/anno. Trascurabile.

Suite: 409 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:44:49 +00:00
root f664ea1a15 feat(backtest): stylized engine over market_snapshots + CLI subcommand
Aggiunge `core/backtest.py`, motore di backtesting stilizzato che gira
sui dati raccolti in `market_snapshots`. Risponde alla domanda:
"se questa config fosse stata attiva nelle ultime N settimane, quanti
lunedì avrebbero superato i filtri e quale sarebbe stato il P/L stimato?"

**Architettura a due strati**:

1. **Filtri di entry — RIGOROSO**: per ogni Monday-14:00-UTC nei
   snapshot ricostruisce `EntryContext` e chiama lo stesso
   `validate_entry()` del live. Output esatto di "cosa avrebbe deciso
   il bot" per ogni settimana, con conteggio dei motivi di skip.

2. **P/L per trade accettato — STILIZZATO**: senza catena opzioni
   storica, stima credito/exit via Black-Scholes con skew premium
   (default 1.5×) per approssimare la vol smile dell'ETH. Re-prezza
   il combo ad ogni tick futuro per simulare i trigger §7
   (profit_take, stop_loss, vol_stop, time_stop, expiry).

**Aggregati nel `BacktestReport`**:
- n_picks / n_accepted / n_skipped_data / n_completed / n_winners
- win_rate, P/L cumulato (USD + % su capitale)
- max drawdown (USD + % di peak)
- Sharpe annualizzato (52 settimane)
- skip_reasons: dict{motivo → settimane bloccate}

**CLI**: nuovo `cerbero-bite backtest --strategy F --from D --to D
--capital N --asset ETH`. Stampa Rich-formatted summary + tabella
motivi di skip. Esempio:

    cerbero-bite backtest \
      --strategy strategy.aggressiva.yaml \
      --from 2026-04-01 --to 2026-05-01 \
      --capital 10000

**Limiti dichiarati**:
- BS + skew_premium ≠ catena reale: i numeri P/L sono **stime ex-post
  per ranking config**, non promesse operative. Buono per dire
  "config A batte config B sui dati reali", non per dimensionare
  capitale.
- skew_premium 1.5× è stato calibrato sui dati Deribit storici
  (smile slope ETH options); va rifinito quando avremo abbastanza
  chain history da farlo empiricamente.

**Tests**: 15 unit test (BS math, monday picks, filter sim,
position outcome simulation, full pipeline su sintetico).
Suite totale: 420 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:31:54 +00:00
root c4cd2986a4 feat(gui): aggiunge max drawdown atteso (P99) e tail/gap nei profili
Due metriche per ciascun profilo nel pannello P/L:

- **Max DD attesa (P99)**: streak di stop consecutivi con probabilità
  ≤ 1% nell'anno (union-bound: N_trade × p_loss^N ≤ 0.01) ×
  perdita stop × contratti × posizioni concorrenti.
- **Max DD coda (gap)**: scenario gap notturno in cui il mark salta
  oltre la copertura long PRIMA che lo stop sia eseguibile —
  perdita = larghezza intera meno credito iniziale, su tutte le
  posizioni aperte.

Aggiunge anche colonna "Max DD" nella tabella di sensibilità
win-rate, così si vede immediatamente il trade-off
APR-vs-drawdown al variare del win-rate (da 65% a 82%).

Effetto pratico: a default cap=10k, spot=3000, win=0.75, trades=18:
- Conservativa: APR ≈ +1.8%, Max DD attesa ≈ −2.2% capitale
- Aggressiva: APR ≈ +14%, Max DD attesa ≈ −30% capitale

Numeri che rendono molto più tangibile la frase "drawdown scala con
lo stesso fattore" del §4-ter del documento.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 19:50:09 +00:00
root 4ab7590745 feat(entry): IV richness gate (§2.9) + golden config bump 1.0.0 → 1.1.0
Aggiunge il filtro a maggior impatto sul win-rate atteso: l'entry
salta se la IV implicita non sta pagando un margine misurabile sopra
la realized vol. La letteratura short-vol systematic indica che
l'edge sostenibile della strategia esiste solo quando IV30g − RV30g
supera una soglia di alcuni punti vol; senza questo gate il selling
vol nudo è strutturalmente neutro a win-rate 70-72%.

Implementazione end-to-end:

- `EntryConfig`: due nuovi campi `iv_minus_rv_min` e
  `iv_minus_rv_filter_enabled`, con default `0` / `false` per non
  rompere setup pre-calibrazione.
- `validate_entry`: §2.9 hard gate che blocca l'entry se
  `iv_minus_rv < iv_minus_rv_min` (skip silenzioso quando il dato è
  `None`, coerente con il pattern §2.8 dei filtri quant).
- `entry_cycle._gather_snapshot`: nuovo `_safe_iv_minus_rv` che
  legge `deribit.realized_vol("ETH")["iv_minus_rv_30d"]` in
  best-effort e lo propaga via `_MarketSnapshot.iv_minus_rv` →
  `EntryContext.iv_minus_rv` → audit `inputs.snapshot.iv_minus_rv`.
- `tests/unit/test_entry_validator.py`: 5 nuovi casi (default
  permissivo, gate sotto/sopra/uguale soglia, dato mancante).
- `tests/integration/test_entry_cycle.py`: stub `get_realized_vol`
  nel mock helper così tutti gli scenari di happy/edge path
  continuano a passare.

Configurazione di profili coerente con la disciplina:

- `strategy.yaml` (golden 1.1.0) e `strategy.conservativa.yaml`:
  gate `enabled=false, min=0`. Manteniamo i lunedì pre-calibrazione
  per accumulare dati sulla distribuzione di `iv_minus_rv`.
- `strategy.aggressiva.yaml` (1.1.0-aggressiva): gate
  `enabled=true, min=3`. Coerente con la filosofia del profilo —
  size più grande pretende win-rate più alto. La soglia 3 è
  conservativa; la documentazione raccomanda 5 dopo 4-8 settimane di
  calibrazione.

Doc + GUI:

- `docs/13-strategia-spiegata.md` §4-quater: spiega gate, parametri,
  default per profilo, effetto atteso sul P/L (trade/anno scendono
  ma E[trade] sale → APR cresce comunque), roadmap di hardening
  (soglia adattiva, vol-of-vol guard, multi-asset).
- pagina `📚 Strategia`: la riga "IV − RV" passa da informativa a
  pass/fail reale; mostra "filtro DISABILITATO (info-only)" quando
  spento, / contro la soglia di config quando acceso.

Bump versioni e hash di tutti e tre i file YAML
(`config_version: 1.1.0`, hash ricalcolato). Test pinning aggiornato
(`test_load_repo_strategy_yaml`).

Suite: 410 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 19:32:21 +00:00
21 changed files with 2155 additions and 27 deletions
+63
View File
@@ -532,6 +532,69 @@ quella che il sistema parte ad eseguire.
--- ---
## 4-quater. IV richness gate (§2.9): il filtro che alza il win-rate
Il filtro a maggior impatto sull'edge è anche il più semplice da
descrivere: **non vendere vol quando la IV non sta pagando un margine
misurabile sopra la RV**. È implementato come gate hard nel
`validate_entry`:
```
if iv_minus_rv_filter_enabled and iv_minus_rv < iv_minus_rv_min:
skip entry
```
con due parametri in `entry:` di `strategy.yaml`:
| Parametro | Default | Effetto |
|---|---|---|
| `iv_minus_rv_filter_enabled` | `false` (golden) / `true` (aggressiva) | Master switch del gate |
| `iv_minus_rv_min` | `0` (golden) / `3` (aggressiva) | Soglia in punti vol che IV30g RV30g deve eccedere |
Il dato è già raccolto in `market_snapshots.iv_minus_rv` ogni 15
minuti. Il gate consulta l'ultimo tick disponibile al momento
dell'entry cycle (non un percentile rolling — quello è il prossimo
step di calibrazione, vedi §4-quinquies in roadmap).
**Profili di default ragionati.**
- **Conservativa / golden config**: `enabled=false, min=0`. Tutti i
setup passano questo gate, anche con IV-RV negativa. Motivo: nei
primi 8 turni di lunedì non si hanno abbastanza tick per stabilire
che soglia ha senso nel proprio regime. Lasciamo la pagina
`📐 Calibrazione` mostrare la distribuzione e poi alziamo
manualmente.
- **Aggressiva**: `enabled=true, min=3`. Il profilo aggressivo già di
suo prende size più grande; pretendere `IV-RV ≥ 3 vol points` come
prerequisito è coerente — se stai betting più grosso, vuoi
win-rate più alto. La soglia 3 è conservativa; la letteratura
short-vol systematic suggerisce 5 dopo calibrazione.
**Cosa cambia nel P/L atteso quando attivi il gate.**
Il gate **riduce** il numero di entry (saltiamo settimane con premio
magro) ma **alza** la qualità di quelle che passano (premio ricco =
win-rate empirico più alto). Effetto netto sul P/L annuo:
- Trade/anno: 18 → 12-14 (skip più aggressivo)
- Win-rate atteso: 0.72 → 0.78-0.80
- E[trade] netto: +0.6 USD → +4-6 USD per contratto
- **P/L annuo proiettato sale anche se i trade scendono**, perché
ogni trade ha edge più alto.
La pagina `📚 Strategia` ha lo slider win-rate già coerente con
questa logica: muovi da 0.72 a 0.78 e vedi l'APR scattare.
**Roadmap di hardening (passi successivi al merge di questo PR).**
1. **Soglia adattiva**: sostituire `iv_minus_rv_min: 3` con un valore
calcolato a runtime come `P25 rolling 60d` di `market_snapshots.iv_minus_rv`.
2. **Vol-of-vol guard**: bloccare entry quando `dvol` è cambiato di
≥5 punti nelle ultime 24h, anche se `iv_minus_rv` è alto (regime
instabile).
3. **Multi-asset (ETH+BTC)**: come da §4-ter, sblocca il
moltiplicatore 2× sulle opportunità a parità di filtri.
## 5. Come leggere il dato giorno per giorno ## 5. Come leggere il dato giorno per giorno
Tre euristiche operative sui campi raccolti: Tre euristiche operative sui campi raccolti:
+385 -1
View File
@@ -13,7 +13,7 @@ import asyncio
import os import os
import sys import sys
from collections.abc import Callable from collections.abc import Callable
from datetime import UTC, datetime from datetime import UTC, datetime, timedelta
from decimal import Decimal from decimal import Decimal
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@@ -679,6 +679,155 @@ def replay(date_from: str, date_to: str, capital: float, dry_run: bool) -> None:
) )
@main.command()
@click.option(
"--strategy",
"strategy_path",
type=click.Path(path_type=Path),
default=Path("strategy.yaml"),
show_default=True,
help="Path al file di strategia (golden, conservativa, aggressiva, ...).",
)
@click.option(
"--db",
"db_path",
type=click.Path(path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
help="SQLite con `market_snapshots` storiche.",
)
@click.option(
"--from",
"date_from",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=None,
help="ISO date YYYY-MM-DD (default: 90 giorni fa).",
)
@click.option(
"--to",
"date_to",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=None,
help="ISO date YYYY-MM-DD (default: oggi).",
)
@click.option(
"--capital",
type=float,
default=1500.0,
show_default=True,
help="Capitale di partenza per il backtest, in USD.",
)
@click.option(
"--asset",
type=str,
default="ETH",
show_default=True,
help="Asset di riferimento per le snapshot.",
)
@click.option(
"--no-enforce-hash",
is_flag=True,
default=False,
help="Salta la verifica del config_hash (utile per profili sperimentali).",
)
def backtest(
strategy_path: Path,
db_path: Path,
date_from: datetime | None,
date_to: datetime | None,
capital: float,
asset: str,
no_enforce_hash: bool,
) -> None:
"""Esegue il backtest stilizzato su `market_snapshots` storiche.
Usa lo stesso `validate_entry` del live per i filtri (rigoroso) e
un modello Black-Scholes con skew premium per stimare credito ed
exit P/L (stilizzato — vedi docstring di `core/backtest.py`).
"""
from cerbero_bite.config.loader import load_strategy # noqa: PLC0415
from cerbero_bite.core.backtest import run_backtest # noqa: PLC0415
console = Console()
if date_to is None:
date_to = datetime.now(UTC)
if date_from is None:
date_from = date_to - timedelta(days=90)
date_from = date_from.replace(tzinfo=UTC) if date_from.tzinfo is None else date_from
date_to = date_to.replace(tzinfo=UTC) if date_to.tzinfo is None else date_to
loaded = load_strategy(strategy_path, enforce_hash=not no_enforce_hash)
cfg = loaded.config
conn = connect_state(db_path)
try:
repo = Repository()
snapshots = repo.list_market_snapshots(
conn,
asset=asset.upper(),
start=date_from,
end=date_to,
limit=10000,
)
finally:
conn.close()
if not snapshots:
console.print(
f"[yellow]Nessuno snapshot {asset} trovato fra {date_from.date()} "
f"e {date_to.date()}.[/yellow]"
)
sys.exit(1)
console.print(
f"[green]Caricate {len(snapshots)} snapshot {asset} "
f"({snapshots[-1].timestamp.date()}{snapshots[0].timestamp.date()})[/green]"
)
report = run_backtest(snapshots, cfg, capital_usd=Decimal(str(capital)))
table = Table(title=f"Backtest report — {strategy_path.name}")
table.add_column("Metrica", style="cyan")
table.add_column("Valore", style="bold")
table.add_row("Picks (lunedì 14:00)", str(report.n_picks))
table.add_row(
"Accettati dai filtri",
f"{report.n_accepted} ({report.n_accepted / max(1, report.n_picks):.0%})",
)
table.add_row("Saltati per dato mancante", str(report.n_skipped_data))
table.add_row("Trade completati (con P/L)", str(report.n_completed))
table.add_row("Vincenti", f"{report.n_winners} ({report.win_rate:.0%})")
table.add_row("P/L cumulato (USD)", f"{report.cumulative_pnl_usd:+.2f}")
table.add_row(
"P/L su capitale", f"{report.cumulative_pnl_pct_of_capital:+.2%}"
)
table.add_row(
"Max drawdown", f"{report.max_drawdown_usd:.0f} USD "
f"({report.max_drawdown_pct:.1%})",
)
table.add_row(
"Sharpe (annualized)",
f"{report.sharpe_annualized}" if report.sharpe_annualized is not None
else "",
)
console.print(table)
if report.skip_reasons:
skip_table = Table(title="Motivi di skip aggregati")
skip_table.add_column("Motivo")
skip_table.add_column("Settimane", justify="right")
for reason, count in sorted(
report.skip_reasons.items(), key=lambda kv: -kv[1]
):
skip_table.add_row(reason, str(count))
console.print(skip_table)
console.print(
"[dim]Il modello P/L è stilizzato: BS + skew premium 1.5×. "
"Numeri ottimi per ranking config, non per promesse operative.[/dim]"
)
@main.group() @main.group()
def config() -> None: def config() -> None:
"""Strategy configuration utilities.""" """Strategy configuration utilities."""
@@ -812,6 +961,241 @@ def state_inspect(db: Path) -> None:
console.print(table) console.print(table)
@main.group(name="option-chain")
def option_chain() -> None:
"""Strumenti per la catena opzioni storica (`option_chain_snapshots`)."""
@option_chain.command(name="trigger")
@click.option(
"--strategy",
"strategy_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=_DEFAULT_STRATEGY_PATH,
show_default=True,
)
@click.option(
"--db",
"db_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
@click.option(
"--audit",
"audit_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_AUDIT_PATH,
show_default=True,
)
@click.option(
"--token",
type=str,
default=None,
help="MCP bearer token (override su CERBERO_BITE_MCP_TOKEN).",
)
@click.option("--asset", default="ETH", show_default=True)
def option_chain_trigger(
strategy_path: Path,
db_path: Path,
audit_path: Path,
token: str | None,
asset: str,
) -> None:
"""Esegue UNA volta il collector della catena opzioni e persiste in DB.
Utile per popolare i dati senza aspettare il cron settimanale del
job ``option_chain_snapshot``. Riusa esattamente la stessa pipeline
schedulata.
"""
from cerbero_bite.runtime.dependencies import build_runtime # noqa: PLC0415
from cerbero_bite.runtime.option_chain_snapshot_cycle import ( # noqa: PLC0415
collect_option_chain_snapshot,
)
cfg = load_strategy(strategy_path).config
ctx = build_runtime(
cfg=cfg,
endpoints=load_endpoints(),
token=load_token(value=token),
db_path=db_path,
audit_path=audit_path,
bot_tag=load_bot_tag(),
)
n = asyncio.run(collect_option_chain_snapshot(ctx, asset=asset))
console.print(
f"[green]Persisted {n} option chain quote(s) for {asset}[/green]"
if n > 0
else f"[yellow]No quotes persisted (chain empty or fetch failed)[/yellow]"
)
@option_chain.command(name="analyze")
@click.option(
"--strategy",
"strategy_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=_DEFAULT_STRATEGY_PATH,
show_default=True,
)
@click.option(
"--db",
"db_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
@click.option("--asset", default="ETH", show_default=True)
@click.option(
"--bias",
type=click.Choice(["bull_put", "bear_call"], case_sensitive=False),
default="bull_put",
show_default=True,
help="Direzione da simulare (il rule engine lo deciderebbe da trend×funding).",
)
def option_chain_analyze(
strategy_path: Path,
db_path: Path,
asset: str,
bias: str,
) -> None:
"""Analizza l'ultimo snapshot di catena salvato.
Per la strategia indicata, simula la selezione strike (delta
target, OTM range, width 4%, credit/width ratio min) e mostra:
* lo strike che il rule engine sceglierebbe come short e long,
* credito atteso, larghezza, rapporto credit/width,
* pass/fail del gate `credit_to_width_ratio_min`.
"""
from cerbero_bite.core.combo_builder import select_strikes # noqa: PLC0415
from cerbero_bite.core.types import OptionQuote # noqa: PLC0415
cfg = load_strategy(strategy_path).config
conn = connect_state(db_path)
try:
repo = Repository()
latest_ts = repo.latest_option_chain_timestamp(conn, asset=asset.upper())
if latest_ts is None:
console.print(
"[red]Nessuno snapshot di catena trovato. Lancia prima "
"`cerbero-bite option-chain trigger`.[/red]"
)
sys.exit(1)
quotes_records = repo.list_option_chain_snapshots(
conn, asset=asset.upper(), start=latest_ts, end=latest_ts,
)
finally:
conn.close()
console.print(
f"[cyan]Snapshot del {latest_ts.isoformat()}{len(quotes_records)} "
f"quote totali[/cyan]"
)
# Costruzione OptionQuote da OptionChainQuoteRecord per riusare select_strikes.
quotes: list[OptionQuote] = []
for q in quotes_records:
if q.bid is None or q.ask is None or q.mid is None or q.delta is None:
continue
quotes.append(
OptionQuote(
instrument=q.instrument_name,
strike=q.strike,
expiry=q.expiry,
option_type=q.option_type,
bid=q.bid,
ask=q.ask,
mid=q.mid,
delta=q.delta,
gamma=q.gamma or Decimal("0"),
theta=q.theta or Decimal("0"),
vega=q.vega or Decimal("0"),
open_interest=q.open_interest or 0,
volume_24h=q.volume_24h or 0,
book_depth_top3=q.book_depth_top3 or 0,
)
)
if not quotes:
console.print("[red]Nessun quote completo per la simulazione.[/red]")
sys.exit(1)
# Lo spot al momento dello snapshot: estraiamo dall'ultimo
# `market_snapshot` ETH a quel timestamp (tolleranza ±15 min).
spot = _resolve_spot_at(db_path, asset=asset.upper(), at=latest_ts)
if spot is None:
console.print(
"[yellow]Spot non recuperabile dai market_snapshots; "
"stimato dal mid ATM.[/yellow]"
)
spot = _atm_spot_proxy(quotes)
selection = select_strikes(
chain=quotes,
bias=bias, # type: ignore[arg-type]
spot=spot,
now=latest_ts,
cfg=cfg,
)
if selection is None:
console.print(
"[red]Il rule engine NON aprirebbe trade con questa catena[/red] "
"(no strike compatibile coi gate delta/distance/width/credit-ratio)."
)
sys.exit(0)
short, long_ = selection
width_usd = (short.strike - long_.strike).copy_abs()
credit_eth = short.mid - long_.mid
credit_usd = credit_eth * spot
ratio = credit_usd / width_usd if width_usd > 0 else Decimal("0")
ratio_target = cfg.structure.credit_to_width_ratio_min
table = Table(title=f"Simulazione picker — bias={bias}, spot={spot:.0f}")
table.add_column("Campo", style="cyan")
table.add_column("Valore", style="bold")
table.add_row("Short strike", f"{short.strike} ({short.delta:+.3f}δ)")
table.add_row("Long strike", f"{long_.strike} ({long_.delta:+.3f}δ)")
table.add_row("Width", f"{width_usd:.0f} USD")
table.add_row("Credit", f"{credit_eth:.4f} ETH ≈ {credit_usd:.2f} USD")
table.add_row(
"Credit/width ratio",
f"{ratio:.2%} (gate ≥ {float(ratio_target):.0%})",
)
pass_str = (
"[green]PASS — entry possibile[/green]"
if ratio >= ratio_target
else "[red]FAIL — premio troppo magro[/red]"
)
table.add_row("Verdetto gate ratio", pass_str)
console.print(table)
def _resolve_spot_at(db_path: Path, *, asset: str, at: datetime) -> Decimal | None:
"""Best-effort lookup dello spot al timestamp ``at`` ± 15 min."""
conn = connect_state(db_path)
try:
rows = Repository().list_market_snapshots(
conn,
asset=asset,
start=at - timedelta(minutes=15),
end=at + timedelta(minutes=15),
limit=1,
)
finally:
conn.close()
if not rows:
return None
return rows[0].spot
def _atm_spot_proxy(quotes: list[Any]) -> Decimal:
"""Stima dello spot prendendo lo strike il cui delta è più vicino a 0.5."""
quote = min(quotes, key=lambda q: abs(abs(q.delta) - Decimal("0.5")))
return quote.strike
def _entrypoint() -> None: def _entrypoint() -> None:
"""Wrapper used by ``cerbero-bite`` console script.""" """Wrapper used by ``cerbero-bite`` console script."""
try: try:
+9
View File
@@ -75,6 +75,15 @@ class EntryConfig(BaseModel):
dealer_gamma_filter_enabled: bool = True dealer_gamma_filter_enabled: bool = True
liquidation_filter_enabled: bool = True liquidation_filter_enabled: bool = True
# IV richness filter (§2.9). `iv_minus_rv_min` è la soglia in
# punti vol che la IV implicita 30g deve eccedere la RV30g per
# ammettere l'entry. Letteratura short-vol systematic: l'edge
# sostenibile esiste solo con un margine misurabile fra IV e RV.
# Default disabilitato + soglia 0 per non bloccare l'avvio finché
# non si è calibrato sui dati raccolti (vedi `📐 Calibrazione`).
iv_minus_rv_min: Decimal = Field(default=Decimal("0"))
iv_minus_rv_filter_enabled: bool = False
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Structure # Structure
+652
View File
@@ -0,0 +1,652 @@
"""Stylized backtest engine over ``market_snapshots`` (§13).
Two layers, both pure functions:
1. **Entry-filter simulation** — for each Monday 14:00 UTC tick in the
recorded snapshots, evaluate which §2 gates would have passed,
reconstructing :class:`EntryContext` from the snapshot. This part
is **rigorous**: it uses the same :func:`validate_entry` the live
engine uses, so the output is exactly "what the bot would have
decided".
2. **P/L estimation per accepted entry** — since ``market_snapshots``
does NOT record the option chain (we only collect spot, DVOL,
funding, etc.), credit and exit P/L are estimated via a stylized
Black-Scholes model: given ``spot``, ``DVOL`` (as IV), and the
strategy's delta target, we solve for the short strike, the long
strike at ``width_pct`` distance, and the combo mid-price. Future
ticks are then re-priced under the same model to detect the first
exit trigger from §7.
The stylized layer is **intentionally approximate**: it captures the
geometry of the strategy (DVOL band sets credit, ETH path drives
exit triggers) but not the second-order effects (chain liquidity,
borrow rates, exchange fees beyond the 0.03% notional cap, dealer
hedging skew). Numbers are good for ranking and tuning, not for
operational P/L promises.
The engine is deterministic and side-effect-free: it does **not**
write to SQLite, does not call MCP, does not place orders. It
operates entirely on a list of :class:`MarketSnapshotRecord` rows
the caller has already loaded.
"""
from __future__ import annotations
import math
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import Literal
from pydantic import BaseModel, ConfigDict
from cerbero_bite.config.schema import StrategyConfig
from cerbero_bite.core.entry_validator import EntryContext, validate_entry
from cerbero_bite.state.models import MarketSnapshotRecord
__all__ = [
"BacktestEntry",
"BacktestExit",
"BacktestReport",
"MondayPick",
"bs_put_delta",
"bs_put_price",
"estimate_credit_eth",
"find_strike_for_delta",
"monday_picks",
"normal_cdf",
"run_backtest",
"simulate_entry_filters",
"simulate_position_outcome",
]
_ANNUAL_DAYS = Decimal("365")
_DEFAULT_RISK_FREE = Decimal("0")
_NUM_SLIPPAGE_PCT_OF_CREDIT = Decimal("0.03")
_NUM_FEE_PCT_OF_NOTIONAL = Decimal("0.0003")
# ---------------------------------------------------------------------------
# Black-Scholes helpers (stdlib-only)
# ---------------------------------------------------------------------------
def normal_cdf(x: float) -> float:
"""Standard normal CDF, no scipy dependency."""
return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))
def bs_put_price(*, spot: float, strike: float, t_years: float, sigma: float) -> float:
"""European put price under r=0, q=0 Black-Scholes.
Returns price in spot units (so for an ETH option, dividing by spot
gives the price in ETH).
"""
if t_years <= 0 or sigma <= 0 or spot <= 0 or strike <= 0:
return max(0.0, strike - spot)
sqrt_t = math.sqrt(t_years)
d1 = (math.log(spot / strike) + 0.5 * sigma * sigma * t_years) / (sigma * sqrt_t)
d2 = d1 - sigma * sqrt_t
return strike * normal_cdf(-d2) - spot * normal_cdf(-d1)
def bs_put_delta(*, spot: float, strike: float, t_years: float, sigma: float) -> float:
"""Put delta under r=0, q=0 Black-Scholes (negative number for put).
Returns 0 for expired options.
"""
if t_years <= 0 or sigma <= 0 or spot <= 0 or strike <= 0:
return 0.0
sqrt_t = math.sqrt(t_years)
d1 = (math.log(spot / strike) + 0.5 * sigma * sigma * t_years) / (sigma * sqrt_t)
return normal_cdf(d1) - 1.0 # = -N(-d1)
def find_strike_for_delta(
*,
spot: float,
dvol_pct: float,
dte_days: int,
target_delta_abs: float,
) -> float:
"""Solve for the put strike whose |delta| matches ``target_delta_abs``.
Bisection on a monotone-decreasing |delta(strike)| relationship.
Returns the strike in absolute USD terms.
"""
sigma = max(0.01, dvol_pct / 100.0)
t_years = max(1e-6, dte_days / 365.0)
# Bracket: from 50% of spot (deep OTM, small |delta|) up to spot
# (ATM, |delta| ≈ 0.5).
low = max(1.0, spot * 0.30)
high = spot
for _ in range(64):
mid = 0.5 * (low + high)
delta_abs = abs(bs_put_delta(spot=spot, strike=mid, t_years=t_years, sigma=sigma))
if delta_abs > target_delta_abs:
high = mid
else:
low = mid
if abs(high - low) < 1e-3:
break
return 0.5 * (low + high)
def estimate_credit_eth(
*,
spot: float,
dvol_pct: float,
dte_days: int,
width_pct: float,
delta_target_abs: float,
skew_premium: float = 1.5,
) -> tuple[float, float, float]:
"""Estimate credit (ETH), short_strike, long_strike for a bull-put-style
credit spread under Black-Scholes.
``skew_premium`` è il moltiplicatore applicato al credito BS per
approssimare la **vol smile** dell'ETH options market (le put OTM
trattano a IV più alta della IV ATM, quindi un BS pulito sottostima
sistematicamente il premio del venditore di vol). Il default 1.5
è una stima conservativa dei dati Deribit storici (smile slope
tipica 5-10 vol points per 100δ); valori sensati: 1.3 (smile
blanda) … 1.8 (regime "stress IV"). Va calibrato sui dati reali
quando avremo abbastanza chain history da farlo.
Returns ``(credit_eth, short_strike, long_strike)``. Credit è
già moltiplicato per ``skew_premium``.
"""
short_strike = find_strike_for_delta(
spot=spot, dvol_pct=dvol_pct, dte_days=dte_days,
target_delta_abs=delta_target_abs,
)
width_usd = width_pct * spot
long_strike = max(1.0, short_strike - width_usd)
sigma = max(0.01, dvol_pct / 100.0)
t_years = max(1e-6, dte_days / 365.0)
short_mid_usd = bs_put_price(
spot=spot, strike=short_strike, t_years=t_years, sigma=sigma,
)
long_mid_usd = bs_put_price(
spot=spot, strike=long_strike, t_years=t_years, sigma=sigma,
)
short_mid_eth = short_mid_usd / spot
long_mid_eth = long_mid_usd / spot
credit_eth = (short_mid_eth - long_mid_eth) * skew_premium
return credit_eth, short_strike, long_strike
# ---------------------------------------------------------------------------
# Entry filter simulation — rigorous (uses validate_entry exactly)
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class MondayPick:
"""Indice di un tick "Monday 14:00 UTC" nella time-series."""
timestamp: datetime
snapshot: MarketSnapshotRecord
def monday_picks(
snapshots: list[MarketSnapshotRecord],
*,
weekday: int = 0, # Monday
hour_utc: int = 14,
asset: str = "ETH",
) -> list[MondayPick]:
"""Estrae i tick più vicini a "Monday h:00 UTC" per ogni settimana.
``snapshots`` deve essere ordinato per timestamp ascending. Per ogni
occorrenza di ``weekday + hour_utc`` (es. lun 14:00) presa l'unica
riga ETH che la copre. Settimane senza tick a quell'ora vengono
saltate.
"""
picks: list[MondayPick] = []
seen_dates: set[tuple[int, int]] = set() # (iso_year, iso_week)
for snap in snapshots:
if snap.asset.upper() != asset.upper():
continue
ts = snap.timestamp.astimezone(UTC)
if ts.weekday() != weekday or ts.hour != hour_utc:
continue
iso_y, iso_w, _ = ts.isocalendar()
key = (iso_y, iso_w)
if key in seen_dates:
continue
seen_dates.add(key)
picks.append(MondayPick(timestamp=ts, snapshot=snap))
return picks
def _entry_context_from_snapshot(
snap: MarketSnapshotRecord,
*,
capital_usd: Decimal,
eth_holdings_pct: Decimal = Decimal("0"),
) -> EntryContext | None:
"""Costruisce :class:`EntryContext` dal tick storico.
``None`` quando la riga non ha i campi minimi (spot, dvol, funding).
Nel filtro questo si traduce in "skip della settimana" — è la
stessa logica del live: un tick incompleto è meglio di un'entry
al buio.
"""
if snap.dvol is None or snap.funding_perp_annualized is None:
return None
return EntryContext(
capital_usd=capital_usd,
dvol_now=snap.dvol,
funding_perp_annualized=snap.funding_perp_annualized,
eth_holdings_pct_of_portfolio=eth_holdings_pct,
next_macro_event_in_days=snap.macro_days_to_event,
has_open_position=False,
dealer_net_gamma=snap.dealer_net_gamma,
liquidation_squeeze_risk_high=(
snap.liquidation_long_risk == "high"
or snap.liquidation_short_risk == "high"
),
)
@dataclass(frozen=True)
class EntryFilterResult:
"""Esito del check filtri per una singola Monday pick."""
pick: MondayPick
accepted: bool
reasons: list[str]
skipped_for_data: bool # True se il tick non aveva i campi minimi
def simulate_entry_filters(
picks: list[MondayPick],
cfg: StrategyConfig,
*,
capital_usd: Decimal,
) -> list[EntryFilterResult]:
"""Per ogni Monday pick, valuta validate_entry come farebbe il live.
Rigoroso: usa esattamente :func:`validate_entry` e :class:`EntryContext`.
Restituisce la lista degli esiti, una entry per pick.
"""
results: list[EntryFilterResult] = []
for pick in picks:
ctx = _entry_context_from_snapshot(pick.snapshot, capital_usd=capital_usd)
if ctx is None:
results.append(
EntryFilterResult(
pick=pick,
accepted=False,
reasons=["incomplete_snapshot"],
skipped_for_data=True,
)
)
continue
decision = validate_entry(ctx, cfg)
results.append(
EntryFilterResult(
pick=pick,
accepted=decision.accepted,
reasons=list(decision.reasons),
skipped_for_data=False,
)
)
return results
# ---------------------------------------------------------------------------
# Position outcome simulation — stylized (Black-Scholes re-pricing)
# ---------------------------------------------------------------------------
class BacktestEntry(BaseModel):
"""Trade aperto nel backtest (snapshot al momento dell'entry)."""
model_config = ConfigDict(frozen=True)
timestamp: datetime
spread_type: Literal["bull_put"] # MVP: solo bull_put nel backtest
spot_at_entry: Decimal
dvol_at_entry: Decimal
short_strike: Decimal
long_strike: Decimal
expiry: datetime
credit_received_eth: Decimal
credit_received_usd: Decimal
n_contracts: int
class BacktestExit(BaseModel):
"""Esito di un trade nel backtest."""
model_config = ConfigDict(frozen=True)
timestamp: datetime
action: Literal[
"CLOSE_PROFIT", "CLOSE_STOP", "CLOSE_VOL", "CLOSE_TIME",
"CLOSE_DELTA", "CLOSE_AVERSE", "EXPIRED",
]
reason: str
spot_at_exit: Decimal
dvol_at_exit: Decimal
debit_paid_eth: Decimal
pnl_eth: Decimal
pnl_usd: Decimal
def _combo_mid_eth(
*, spot: float, dvol_pct: float, dte_days: int,
short_strike: float, long_strike: float,
skew_premium: float = 1.5,
) -> float:
"""Re-prezza il combo bull-put usando BS sul nuovo spot/dvol/dte."""
sigma = max(0.01, dvol_pct / 100.0)
t_years = max(1e-6, dte_days / 365.0)
short_mid_usd = bs_put_price(
spot=spot, strike=short_strike, t_years=t_years, sigma=sigma,
)
long_mid_usd = bs_put_price(
spot=spot, strike=long_strike, t_years=t_years, sigma=sigma,
)
return (short_mid_usd - long_mid_usd) / spot * skew_premium
def simulate_position_outcome(
entry: BacktestEntry,
future_snapshots: list[MarketSnapshotRecord],
cfg: StrategyConfig,
) -> BacktestExit:
"""Re-prezza il combo a ogni tick futuro fino al primo exit trigger.
Triggers in ordine §7:
1. profit_take (debit ≤ 0.5×credit)
2. stop_loss (debit ≥ 2.5×credit)
3. vol_stop (DVOL salita di ≥10 pt rispetto entry)
4. time_stop (DTE ≤ 7 e debit > 0.7×credit)
5. expiry (uscita per scadenza, P/L = credit intrinsic)
"""
ec = cfg.exit
credit = float(entry.credit_received_eth)
short = float(entry.short_strike)
long_ = float(entry.long_strike)
profit_thresh = float(ec.profit_take_pct_of_credit) * credit
stop_thresh = float(ec.stop_loss_mark_x_credit) * credit
skip_time_thresh = float(ec.time_stop_skip_if_close_to_profit_pct) * credit
for snap in future_snapshots:
if snap.timestamp <= entry.timestamp:
continue
if snap.timestamp >= entry.expiry:
break
if snap.dvol is None or snap.spot is None:
continue
spot_now = float(snap.spot)
dvol_now = float(snap.dvol)
dte = max(0, (entry.expiry - snap.timestamp).days)
debit = _combo_mid_eth(
spot=spot_now, dvol_pct=dvol_now, dte_days=dte,
short_strike=short, long_strike=long_,
)
if debit <= profit_thresh:
return _exit(
snap, entry, debit,
action="CLOSE_PROFIT",
reason=f"debit {debit:.4f}{profit_thresh:.4f}",
)
if debit >= stop_thresh:
return _exit(
snap, entry, debit,
action="CLOSE_STOP",
reason=f"debit {debit:.4f}{stop_thresh:.4f}",
)
if dvol_now >= float(entry.dvol_at_entry) + float(ec.vol_stop_dvol_increase):
return _exit(
snap, entry, debit,
action="CLOSE_VOL",
reason=f"DVOL {dvol_now:.1f} ≥ entry+{ec.vol_stop_dvol_increase}",
)
if dte <= ec.time_stop_dte_remaining and debit > skip_time_thresh:
return _exit(
snap, entry, debit,
action="CLOSE_TIME",
reason=f"DTE {dte}{ec.time_stop_dte_remaining}",
)
# Tick passati senza trigger: scadenza naturale.
last = future_snapshots[-1] if future_snapshots else None
intrinsic = max(0.0, short - float(last.spot if last and last.spot else 0))
intrinsic_capped = min(intrinsic, short - long_)
debit_at_expiry_eth = (
intrinsic_capped / float(last.spot)
if last is not None and last.spot is not None and float(last.spot) > 0
else 0.0
)
return _exit(
last or _synthetic_expiry_snapshot(entry),
entry,
debit_at_expiry_eth,
action="EXPIRED",
reason="held to expiry",
)
def _synthetic_expiry_snapshot(entry: BacktestEntry) -> MarketSnapshotRecord:
return MarketSnapshotRecord(
timestamp=entry.expiry,
asset="ETH",
spot=entry.spot_at_entry,
dvol=entry.dvol_at_entry,
fetch_ok=False,
)
def _exit(
snap: MarketSnapshotRecord,
entry: BacktestEntry,
debit_eth: float,
*,
action: str,
reason: str,
) -> BacktestExit:
pnl_eth = float(entry.credit_received_eth) - debit_eth
spot = float(snap.spot) if snap.spot is not None else float(entry.spot_at_entry)
dvol = float(snap.dvol) if snap.dvol is not None else float(entry.dvol_at_entry)
return BacktestExit(
timestamp=snap.timestamp,
action=action, # type: ignore[arg-type]
reason=reason,
spot_at_exit=Decimal(str(spot)),
dvol_at_exit=Decimal(str(dvol)),
debit_paid_eth=Decimal(str(debit_eth)),
pnl_eth=Decimal(str(pnl_eth)),
pnl_usd=Decimal(str(pnl_eth * spot * entry.n_contracts)),
)
# ---------------------------------------------------------------------------
# Full pipeline
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class CompletedTrade:
entry: BacktestEntry
exit: BacktestExit
class BacktestReport(BaseModel):
"""Aggregato del backtest. Tutti i numeri sono **stime**."""
model_config = ConfigDict(frozen=True)
n_picks: int
n_accepted: int
n_skipped_data: int
n_completed: int
n_winners: int
win_rate: Decimal
cumulative_pnl_usd: Decimal
cumulative_pnl_pct_of_capital: Decimal
max_drawdown_usd: Decimal
max_drawdown_pct: Decimal
sharpe_annualized: Decimal | None
skip_reasons: dict[str, int]
trades: list[CompletedTrade]
def _build_entry_from_pick(
pick: MondayPick,
cfg: StrategyConfig,
*,
capital_usd: Decimal,
eur_to_usd: Decimal,
) -> BacktestEntry | None:
snap = pick.snapshot
if snap.spot is None or snap.dvol is None:
return None
spot = float(snap.spot)
dvol = float(snap.dvol)
width_pct = float(cfg.structure.spread_width.target_pct_of_spot)
delta_target = float(cfg.structure.short_strike.delta_target)
dte = cfg.structure.dte_target
credit_eth, short_strike, long_strike = estimate_credit_eth(
spot=spot, dvol_pct=dvol, dte_days=dte,
width_pct=width_pct, delta_target_abs=delta_target,
)
width_usd = float(cfg.structure.spread_width.target_pct_of_spot) * spot
credit_usd = credit_eth * spot
if width_usd <= 0 or credit_usd / width_usd < float(
cfg.structure.credit_to_width_ratio_min
):
return None # ratio gate fallisce → no entry
cap_pertrade_usd = float(cfg.sizing.cap_per_trade_eur) * float(eur_to_usd)
risk_target = min(float(cfg.sizing.kelly_fraction) * float(capital_usd), cap_pertrade_usd)
n_contracts = max(0, min(int(risk_target // width_usd), cfg.sizing.max_contracts_per_trade))
if n_contracts == 0:
return None
expiry = pick.timestamp + timedelta(days=dte)
return BacktestEntry(
timestamp=pick.timestamp,
spread_type="bull_put",
spot_at_entry=Decimal(str(spot)),
dvol_at_entry=Decimal(str(dvol)),
short_strike=Decimal(str(round(short_strike, 2))),
long_strike=Decimal(str(round(long_strike, 2))),
expiry=expiry,
credit_received_eth=Decimal(str(credit_eth)),
credit_received_usd=Decimal(str(credit_usd * n_contracts)),
n_contracts=n_contracts,
)
def _max_drawdown_usd(equity: list[Decimal]) -> tuple[Decimal, Decimal]:
"""Return ``(max_dd_usd, max_dd_pct_of_peak)`` over an equity curve."""
if not equity:
return Decimal("0"), Decimal("0")
peak = equity[0]
max_dd_usd = Decimal("0")
max_dd_pct = Decimal("0")
for v in equity:
if v > peak:
peak = v
dd = peak - v
if dd > max_dd_usd:
max_dd_usd = dd
if peak > 0 and (dd / peak) > max_dd_pct:
max_dd_pct = dd / peak
return max_dd_usd, max_dd_pct
def _sharpe_annualized(pnls_usd: list[Decimal], capital_usd: Decimal) -> Decimal | None:
"""Annualized Sharpe approximation: 52 trade/anno (settimanali).
Restituisce ``None`` se ci sono <5 trade o stdev = 0.
"""
if len(pnls_usd) < 5 or capital_usd <= 0:
return None
rets = [float(p / capital_usd) for p in pnls_usd]
mean = sum(rets) / len(rets)
var = sum((r - mean) ** 2 for r in rets) / max(1, (len(rets) - 1))
std = math.sqrt(var)
if std == 0:
return None
sharpe = mean / std * math.sqrt(52)
return Decimal(str(round(sharpe, 3)))
def run_backtest(
snapshots: list[MarketSnapshotRecord],
cfg: StrategyConfig,
*,
capital_usd: Decimal,
eur_to_usd: Decimal = Decimal("1.075"),
asset: str = "ETH",
) -> BacktestReport:
"""Esegue il backtest end-to-end sui ``snapshots`` ETH ordinati per ts."""
snapshots = sorted(snapshots, key=lambda s: s.timestamp)
eth_snapshots = [s for s in snapshots if s.asset.upper() == asset.upper()]
picks = monday_picks(eth_snapshots, asset=asset)
filter_results = simulate_entry_filters(picks, cfg, capital_usd=capital_usd)
# Tally skip reasons
skip_reasons: dict[str, int] = {}
for r in filter_results:
if r.accepted:
continue
for reason in r.reasons:
skip_reasons[reason] = skip_reasons.get(reason, 0) + 1
trades: list[CompletedTrade] = []
for r in filter_results:
if not r.accepted:
continue
entry = _build_entry_from_pick(
r.pick, cfg, capital_usd=capital_usd, eur_to_usd=eur_to_usd,
)
if entry is None:
skip_reasons["sizing_or_ratio"] = skip_reasons.get("sizing_or_ratio", 0) + 1
continue
future = [s for s in eth_snapshots if s.timestamp > r.pick.timestamp]
exit_ = simulate_position_outcome(entry, future, cfg)
trades.append(CompletedTrade(entry=entry, exit=exit_))
pnls = [t.exit.pnl_usd for t in trades]
cumulative = sum(pnls, start=Decimal("0"))
n_winners = sum(1 for p in pnls if p > 0)
win_rate = (
Decimal(n_winners) / Decimal(len(pnls))
if pnls
else Decimal("0")
)
# Equity curve in USD assoluti
equity = [capital_usd]
for p in pnls:
equity.append(equity[-1] + p)
max_dd_usd, max_dd_pct = _max_drawdown_usd(equity)
return BacktestReport(
n_picks=len(picks),
n_accepted=sum(1 for r in filter_results if r.accepted),
n_skipped_data=sum(1 for r in filter_results if r.skipped_for_data),
n_completed=len(trades),
n_winners=n_winners,
win_rate=win_rate,
cumulative_pnl_usd=cumulative,
cumulative_pnl_pct_of_capital=(
cumulative / capital_usd if capital_usd > 0 else Decimal("0")
),
max_drawdown_usd=max_dd_usd,
max_drawdown_pct=max_dd_pct,
sharpe_annualized=_sharpe_annualized(pnls, capital_usd),
skip_reasons=skip_reasons,
trades=trades,
)
+20
View File
@@ -44,6 +44,12 @@ class EntryContext(BaseModel):
dealer_net_gamma: Decimal | None = None dealer_net_gamma: Decimal | None = None
liquidation_squeeze_risk_high: bool | None = None liquidation_squeeze_risk_high: bool | None = None
# IV richness gate (§2.9). Differenza IV30g RV30g in punti vol.
# Optional, stessa logica best-effort dei filtri quant: ``None``
# significa "dato non disponibile" e fa saltare il gate (non
# invalida l'entry).
iv_minus_rv: Decimal | None = None
class EntryDecision(BaseModel): class EntryDecision(BaseModel):
"""Result of :func:`validate_entry`. ``reasons`` holds *all* blocking reasons.""" """Result of :func:`validate_entry`. ``reasons`` holds *all* blocking reasons."""
@@ -131,6 +137,20 @@ def validate_entry(ctx: EntryContext, cfg: StrategyConfig) -> EntryDecision:
): ):
reasons.append("imminent liquidation squeeze risk") reasons.append("imminent liquidation squeeze risk")
# §2.9: IV richness gate. Vendere vol senza un margine misurabile
# fra IV e RV è statisticamente neutro: l'edge della strategia
# esiste solo quando il premio è "ricco" rispetto a quanto il
# mercato si è effettivamente mosso.
if (
entry_cfg.iv_minus_rv_filter_enabled
and ctx.iv_minus_rv is not None
and ctx.iv_minus_rv < entry_cfg.iv_minus_rv_min
):
reasons.append(
f"IV richness below floor "
f"(IV-RV={ctx.iv_minus_rv} < {entry_cfg.iv_minus_rv_min} vol pts)"
)
return EntryDecision(accepted=not reasons, reasons=reasons) return EntryDecision(accepted=not reasons, reasons=reasons)
+105 -10
View File
@@ -11,6 +11,7 @@ La pagina è di sola lettura: non chiama MCP, non scrive sul DB.
from __future__ import annotations from __future__ import annotations
import math
import os import os
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
@@ -280,13 +281,18 @@ def _build_gates(
) )
) )
# --- IV RV (richness) — solo informativo -------------------- # --- IV RV (richness) — gate §2.9 ---------------------------
rv = ( rv = (
float(snap.realized_vol_30d) if snap.realized_vol_30d is not None else None float(snap.realized_vol_30d) if snap.realized_vol_30d is not None else None
) )
iv_minus_rv = ( iv_minus_rv = (
float(snap.iv_minus_rv) if snap.iv_minus_rv is not None else None float(snap.iv_minus_rv) if snap.iv_minus_rv is not None else None
) )
iv_min = float(getattr(entry, "iv_minus_rv_min", 0.0)) if entry else 0.0
iv_enabled = (
bool(getattr(entry, "iv_minus_rv_filter_enabled", False)) if entry else False
)
if not iv_enabled:
rows.append( rows.append(
_GateRow( _GateRow(
"IV RV (richness)", "IV RV (richness)",
@@ -295,9 +301,34 @@ def _build_gates(
if iv_minus_rv is not None if iv_minus_rv is not None
else "" else ""
), ),
"info, > 0 = premio ricco", "filtro DISABILITATO (info-only)",
"pass" if (iv_minus_rv is not None and iv_minus_rv > 0) else "n/a", "n/a",
f"RV30={rv:.2f}" if rv is not None else "", f"RV30={rv:.2f} · attiva con `iv_minus_rv_filter_enabled: true`"
if rv is not None
else "Attiva con `iv_minus_rv_filter_enabled: true`",
)
)
elif iv_minus_rv is None:
rows.append(
_GateRow(
"IV RV ≥ soglia",
"",
f"{iv_min:.1f} pt vol",
"n/a",
"Dato non disponibile in questo tick (best-effort skip).",
)
)
else:
ok = iv_minus_rv >= iv_min
rows.append(
_GateRow(
"IV RV ≥ soglia",
f"{iv_minus_rv:+.2f} pt vol",
f"{iv_min:.1f} pt vol",
"pass" if ok else "fail",
"Premio ricco rispetto a quanto il mercato si è davvero "
"mosso → edge sostenibile per il venditore di vol."
+ (f" RV30={rv:.2f}" if rv is not None else ""),
) )
) )
@@ -470,6 +501,37 @@ def _compute_pl(
annual_pl = trades_eff * n_per_trade * concurrency * e_trade_net annual_pl = trades_eff * n_per_trade * concurrency * e_trade_net
apr = (annual_pl / capital) if capital > 0 else 0.0 apr = (annual_pl / capital) if capital > 0 else 0.0
# --- Max drawdown -------------------------------------------------
# Due metriche distinte:
#
# 1. **Streak atteso (P99)**: lunghezza della peggior sequenza di
# stop consecutivi che ci si aspetta di vedere in un anno con
# probabilità ≤ 1%. Usa l'approssimazione union-bound:
# P(streak ≥ N in N_trade tentativi) ≈ N_trade × p_loss^N
# Imponendo questa quantità ≤ 0.01 e risolvendo per N:
# N = ceil( log(0.01 / N_trade) / log(p_loss) )
# Drawdown corrispondente = N × stop_loss × contracts × concurrency.
#
# 2. **Tail/gap risk**: scenario "gap notturno" in cui il mark
# salta oltre la copertura long PRIMA che lo stop sia
# eseguibile. La perdita massima reale è la larghezza intera
# dello spread meno il credito iniziale, su tutte le posizioni
# aperte simultaneamente.
if prob_loss > 0 and prob_loss < 1 and trades_per_year > 0:
streak_99 = max(
1,
int(math.ceil(
math.log(0.01 / trades_per_year) / math.log(prob_loss)
)) if prob_loss < 1 else 1,
)
else:
streak_99 = 0
expected_dd_usd = streak_99 * sl_loss * n_per_trade * concurrency
expected_dd_pct = expected_dd_usd / capital if capital > 0 else 0.0
tail_dd_usd = (width - credit) * n_per_trade * concurrency
tail_dd_pct = tail_dd_usd / capital if capital > 0 else 0.0
return { return {
"width": width, "width": width,
"credit": credit, "credit": credit,
@@ -483,10 +545,15 @@ def _compute_pl(
"apr": apr, "apr": apr,
"fees": fees, "fees": fees,
"slippage": slippage, "slippage": slippage,
"win_rate_eff": win_rate_eff,
"trades_eff": trades_eff,
"prob_loss": prob_loss, "prob_loss": prob_loss,
"prob_harvest": prob_harvest, "prob_harvest": prob_harvest,
"streak_99": float(streak_99),
"expected_dd_usd": expected_dd_usd,
"expected_dd_pct": expected_dd_pct,
"tail_dd_usd": tail_dd_usd,
"tail_dd_pct": tail_dd_pct,
"win_rate_eff": win_rate_eff,
"trades_eff": trades_eff,
} }
@@ -560,6 +627,34 @@ def _render_profile_card(
), ),
) )
cols = st.columns(2)
cols[0].metric(
"Max DD attesa (P99)",
f"{metrics['expected_dd_usd']:.0f} USD",
delta=f"{-metrics['expected_dd_pct']:+.1%} cap",
delta_color="inverse",
help=(
f"Streak di {int(metrics['streak_99'])} stop consecutivi "
f"(probabilità ≤ 1% nell'anno) × perdita stop "
f"({metrics['sl_loss']:.0f} USD) × contratti × posizioni "
f"concorrenti. È la peggior sequenza che ti aspetti di "
"vedere; il drawdown reale può essere maggiore se i filtri "
"non rilevano un regime change."
),
)
cols[1].metric(
"Max DD coda (gap)",
f"{metrics['tail_dd_usd']:.0f} USD",
delta=f"{-metrics['tail_dd_pct']:+.1%} cap",
delta_color="inverse",
help=(
"Scenario gap notturno: il mark salta oltre la copertura "
"long PRIMA che lo stop sia eseguibile. Perdita = larghezza "
"intera meno credito, su tutte le posizioni aperte. "
"I filtri quant + macro lo riducono ma NON lo annullano."
),
)
if metrics["n_per_trade"] == 0: if metrics["n_per_trade"] == 0:
st.warning( st.warning(
"Sizing 0 contratti: capitale insufficiente per i cap di " "Sizing 0 contratti: capitale insufficiente per i cap di "
@@ -749,10 +844,10 @@ def _render_pl_panel(
sens_rows.append( sens_rows.append(
{ {
"Win rate": f"{wr:.0%}", "Win rate": f"{wr:.0%}",
"Conservativa P/L": f"{m_c['annual_pl']:+.0f} USD", "Cons. APR": f"{m_c['apr']:+.1%}",
"Conservativa APR": f"{m_c['apr']:+.1%}", "Cons. Max DD": f"{m_c['expected_dd_pct']:.1%}",
"Aggressiva P/L": f"{m_a['annual_pl']:+.0f} USD", "Aggr. APR": f"{m_a['apr']:+.1%}",
"Aggressiva APR": f"{m_a['apr']:+.1%}", "Aggr. Max DD": f"{m_a['expected_dd_pct']:.1%}",
} }
) )
st.table(sens_rows) st.table(sens_rows)
+24
View File
@@ -96,6 +96,7 @@ class _MarketSnapshot:
portfolio_eur: Decimal portfolio_eur: Decimal
dealer_net_gamma: Decimal | None dealer_net_gamma: Decimal | None
liquidation_squeeze_risk_high: bool | None liquidation_squeeze_risk_high: bool | None
iv_minus_rv: Decimal | None
async def _gather_snapshot( async def _gather_snapshot(
@@ -161,6 +162,9 @@ async def _gather_snapshot(
liquidation_t: asyncio.Task[bool | None] = asyncio.create_task( liquidation_t: asyncio.Task[bool | None] = asyncio.create_task(
_safe_liquidation_squeeze(sentiment) _safe_liquidation_squeeze(sentiment)
) )
iv_rv_t: asyncio.Task[Decimal | None] = asyncio.create_task(
_safe_iv_minus_rv(deribit)
)
await asyncio.gather( await asyncio.gather(
spot_t, spot_t,
@@ -174,6 +178,7 @@ async def _gather_snapshot(
portfolio_t, portfolio_t,
dealer_t, dealer_t,
liquidation_t, liquidation_t,
iv_rv_t,
) )
return _MarketSnapshot( return _MarketSnapshot(
spot_eth_usd=spot_t.result(), spot_eth_usd=spot_t.result(),
@@ -187,6 +192,7 @@ async def _gather_snapshot(
portfolio_eur=portfolio_t.result(), portfolio_eur=portfolio_t.result(),
dealer_net_gamma=dealer_t.result(), dealer_net_gamma=dealer_t.result(),
liquidation_squeeze_risk_high=liquidation_t.result(), liquidation_squeeze_risk_high=liquidation_t.result(),
iv_minus_rv=iv_rv_t.result(),
) )
@@ -198,6 +204,20 @@ async def _safe_dealer_gamma(deribit: DeribitClient) -> Decimal | None:
return snap.total_net_dealer_gamma return snap.total_net_dealer_gamma
async def _safe_iv_minus_rv(deribit: DeribitClient) -> Decimal | None:
"""Best-effort fetch of the IV30g RV30g spread (vol points)."""
try:
rv = await deribit.realized_vol("ETH")
except Exception:
return None
if not isinstance(rv, dict):
return None
value = rv.get("iv_minus_rv_30d")
if value is None:
return None
return value if isinstance(value, Decimal) else Decimal(str(value))
async def _safe_liquidation_squeeze(sentiment: SentimentClient) -> bool | None: async def _safe_liquidation_squeeze(sentiment: SentimentClient) -> bool | None:
try: try:
heatmap = await sentiment.liquidation_heatmap("ETH") heatmap = await sentiment.liquidation_heatmap("ETH")
@@ -415,6 +435,7 @@ async def run_entry_cycle(
next_macro_event_in_days=snap.macro_days_to_event, next_macro_event_in_days=snap.macro_days_to_event,
has_open_position=False, has_open_position=False,
dealer_net_gamma=snap.dealer_net_gamma, dealer_net_gamma=snap.dealer_net_gamma,
iv_minus_rv=snap.iv_minus_rv,
liquidation_squeeze_risk_high=snap.liquidation_squeeze_risk_high, liquidation_squeeze_risk_high=snap.liquidation_squeeze_risk_high,
) )
decision = validate_entry(entry_ctx, cfg) decision = validate_entry(entry_ctx, cfg)
@@ -432,6 +453,9 @@ async def run_entry_cycle(
"eth_holdings_pct": str(snap.eth_holdings_pct), "eth_holdings_pct": str(snap.eth_holdings_pct),
"portfolio_eur": str(snap.portfolio_eur), "portfolio_eur": str(snap.portfolio_eur),
"capital_usd": str(capital_usd), "capital_usd": str(capital_usd),
"iv_minus_rv": (
str(snap.iv_minus_rv) if snap.iv_minus_rv is not None else None
),
} }
} }
if not decision.accepted: if not decision.accepted:
@@ -0,0 +1,185 @@
"""Periodic option-chain snapshot collector (§13).
Fetches the Deribit option chain for every strike entro la finestra
DTE configurata, prima del trigger entry settimanale (cron
``55 13 * * MON`` di default). Persiste un quote per ogni strumento
in ``option_chain_snapshots`` con un timestamp condiviso, che diventa
il dato di base per:
* il backtest non-stilizzato (vedi ``core/backtest.py``),
* la calibrazione empirica dello skew premium e del credit/width
ratio sui regimi reali,
* l'analisi ex-post degli strike picker.
Il collector è **best-effort**: se ``get_tickers`` fallisce per un
batch, gli altri batch passano comunque; se manca completamente la
chain, il job ritorna 0 senza alzare eccezioni e logga il problema.
Non chiama l'order book per ogni strike (sarebbe troppo costoso) —
``book_depth_top3`` resta NULL nel quote, il liquidity gate del live
lo legge al volo solo per gli strike che gli interessano.
"""
from __future__ import annotations
import asyncio
import logging
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import TYPE_CHECKING, Any
from cerbero_bite.state import connect, transaction
from cerbero_bite.state.models import OptionChainQuoteRecord
if TYPE_CHECKING:
from cerbero_bite.runtime.dependencies import RuntimeContext
__all__ = ["DEFAULT_BATCH_SIZE", "collect_option_chain_snapshot"]
_log = logging.getLogger("cerbero_bite.runtime.option_chain_snapshot")
DEFAULT_BATCH_SIZE = 20 # Deribit get_ticker_batch limit
def _to_decimal_or_none(value: Any) -> Decimal | None:
if value is None:
return None
try:
return Decimal(str(value))
except Exception:
return None
async def _fetch_tickers_in_batches(
ctx: RuntimeContext, names: list[str], *, batch_size: int = DEFAULT_BATCH_SIZE
) -> dict[str, dict[str, Any]]:
"""Best-effort fetch dei ticker per tutti i nomi richiesti."""
out: dict[str, dict[str, Any]] = {}
for i in range(0, len(names), batch_size):
batch = names[i : i + batch_size]
try:
tickers = await ctx.deribit.get_tickers(batch)
except Exception as exc:
_log.warning(
"get_tickers failed for batch starting %s: %s",
batch[0] if batch else "<empty>", exc,
)
continue
for t in tickers:
name = t.get("instrument_name") or t.get("instrument")
if isinstance(name, str):
out[name] = t
return out
async def collect_option_chain_snapshot(
ctx: RuntimeContext,
*,
asset: str = "ETH",
now: datetime | None = None,
batch_size: int = DEFAULT_BATCH_SIZE,
) -> int:
"""Collect + persist a single chain snapshot for ``asset``. Returns
the number of quotes persisted (0 on best-effort failure).
Filtra le scadenze nella finestra ``[dte_min, dte_max]`` di
``cfg.structure`` per non sprecare richieste su scadenze che il
rule engine non userebbe mai.
"""
when = (now or datetime.now(UTC)).astimezone(UTC)
cfg = ctx.cfg
expiry_from = when + timedelta(days=cfg.structure.dte_min)
expiry_to = when + timedelta(days=cfg.structure.dte_max)
try:
chain = await ctx.deribit.options_chain(
currency=asset.upper(),
expiry_from=expiry_from,
expiry_to=expiry_to,
min_open_interest=int(cfg.liquidity.open_interest_min),
)
except Exception:
_log.exception("option chain fetch failed")
return 0
if not chain:
_log.info("option chain empty for %s in window", asset)
return 0
names = [meta.name for meta in chain]
tickers = await _fetch_tickers_in_batches(ctx, names, batch_size=batch_size)
quotes: list[OptionChainQuoteRecord] = []
for meta in chain:
ticker = tickers.get(meta.name)
if ticker is None:
# Lasciamo comunque la riga senza quote: utile sapere
# che lo strumento esisteva.
quotes.append(
OptionChainQuoteRecord(
timestamp=when,
asset=asset.upper(),
instrument_name=meta.name,
strike=meta.strike,
expiry=meta.expiry,
option_type=meta.option_type,
open_interest=int(meta.open_interest)
if meta.open_interest is not None
else None,
)
)
continue
greeks = ticker.get("greeks") or {}
quotes.append(
OptionChainQuoteRecord(
timestamp=when,
asset=asset.upper(),
instrument_name=meta.name,
strike=meta.strike,
expiry=meta.expiry,
option_type=meta.option_type,
bid=_to_decimal_or_none(ticker.get("bid")),
ask=_to_decimal_or_none(ticker.get("ask")),
mid=_to_decimal_or_none(ticker.get("mark_price")),
iv=_to_decimal_or_none(ticker.get("mark_iv")),
delta=_to_decimal_or_none(greeks.get("delta")),
gamma=_to_decimal_or_none(greeks.get("gamma")),
theta=_to_decimal_or_none(greeks.get("theta")),
vega=_to_decimal_or_none(greeks.get("vega")),
open_interest=int(meta.open_interest)
if meta.open_interest is not None
else None,
volume_24h=(
int(ticker["volume_24h"])
if ticker.get("volume_24h") is not None
else None
),
# book_depth_top3: NULL — non lo prendiamo per ogni
# strike per non saturare l'API. Il liquidity gate
# del live lo chiede on-the-fly per gli strike
# candidati al picker.
)
)
persisted = 0
try:
conn = connect(ctx.db_path)
try:
with transaction(conn):
persisted = ctx.repository.record_option_chain_snapshot(
conn, quotes
)
finally:
conn.close()
except Exception:
_log.exception("persist option chain snapshot failed")
return 0
_log.info("option_chain_snapshot persisted %d quote(s)", persisted)
return persisted
# Avoid unused import warning for asyncio in lint when only used as type
_ = asyncio
+21
View File
@@ -34,6 +34,9 @@ from cerbero_bite.runtime.market_snapshot_cycle import (
DEFAULT_ASSETS, DEFAULT_ASSETS,
collect_market_snapshot, collect_market_snapshot,
) )
from cerbero_bite.runtime.option_chain_snapshot_cycle import (
collect_option_chain_snapshot,
)
from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle
from cerbero_bite.runtime.recovery import recover_state from cerbero_bite.runtime.recovery import recover_state
from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler
@@ -53,6 +56,7 @@ _CRON_HEALTH = "*/5 * * * *"
_CRON_BACKUP = "0 * * * *" _CRON_BACKUP = "0 * * * *"
_CRON_MANUAL_ACTIONS = "*/1 * * * *" _CRON_MANUAL_ACTIONS = "*/1 * * * *"
_CRON_MARKET_SNAPSHOT = "*/15 * * * *" _CRON_MARKET_SNAPSHOT = "*/15 * * * *"
_CRON_OPTION_CHAIN_SNAPSHOT = "55 13 * * MON" # 5 min prima del trigger entry
_BACKUP_RETENTION_DAYS = 30 _BACKUP_RETENTION_DAYS = 30
@@ -217,6 +221,8 @@ class Orchestrator:
manual_actions_cron: str = _CRON_MANUAL_ACTIONS, manual_actions_cron: str = _CRON_MANUAL_ACTIONS,
market_snapshot_cron: str = _CRON_MARKET_SNAPSHOT, market_snapshot_cron: str = _CRON_MARKET_SNAPSHOT,
market_snapshot_assets: tuple[str, ...] = DEFAULT_ASSETS, market_snapshot_assets: tuple[str, ...] = DEFAULT_ASSETS,
option_chain_cron: str = _CRON_OPTION_CHAIN_SNAPSHOT,
option_chain_asset: str = "ETH",
backup_dir: Path | None = None, backup_dir: Path | None = None,
backup_retention_days: int = _BACKUP_RETENTION_DAYS, backup_retention_days: int = _BACKUP_RETENTION_DAYS,
) -> AsyncIOScheduler: ) -> AsyncIOScheduler:
@@ -282,6 +288,14 @@ class Orchestrator:
await _safe("market_snapshot", _do) await _safe("market_snapshot", _do)
async def _option_chain_snapshot() -> None:
async def _do() -> None:
await collect_option_chain_snapshot(
self._ctx, asset=option_chain_asset
)
await _safe("option_chain_snapshot", _do)
jobs: list[JobSpec] = [ jobs: list[JobSpec] = [
JobSpec(name="health", cron=health_cron, coro_factory=_health), JobSpec(name="health", cron=health_cron, coro_factory=_health),
JobSpec(name="backup", cron=backup_cron, coro_factory=_backup), JobSpec(name="backup", cron=backup_cron, coro_factory=_backup),
@@ -309,6 +323,13 @@ class Orchestrator:
coro_factory=_market_snapshot, coro_factory=_market_snapshot,
) )
) )
jobs.append(
JobSpec(
name="option_chain_snapshot",
cron=option_chain_cron,
coro_factory=_option_chain_snapshot,
)
)
else: else:
_log.warning( _log.warning(
"data analysis disabled (CERBERO_BITE_ENABLE_DATA_ANALYSIS=" "data analysis disabled (CERBERO_BITE_ENABLE_DATA_ANALYSIS="
@@ -0,0 +1,42 @@
-- 0004_option_chain_snapshots.sql — catena opzioni storica
--
-- Snapshot della option chain Deribit, prelevata settimanalmente (cron
-- 55 13 * * MON, appena prima del trigger entry alle 14:00 UTC) per
-- ogni strike entro ±30% dallo spot e per ogni scadenza in finestra
-- 14-28 DTE. Dato di base per il backtest non-stilizzato e per
-- calibrare empiricamente lo skew premium del modello BS.
--
-- Granularità: una riga per (snapshot_ts, instrument). Lo
-- snapshot_ts è il timestamp del cron tick — TUTTI i quote raccolti
-- in quello stesso tick condividono il timestamp, così filtrare per
-- "lo snapshot del 2026-05-04 alle 13:55" è una semplice
-- WHERE timestamp = X.
CREATE TABLE option_chain_snapshots (
timestamp TEXT NOT NULL,
asset TEXT NOT NULL,
instrument_name TEXT NOT NULL,
strike TEXT NOT NULL,
expiry TEXT NOT NULL,
option_type TEXT NOT NULL CHECK (option_type IN ('C','P')),
bid TEXT,
ask TEXT,
mid TEXT,
iv TEXT,
delta TEXT,
gamma TEXT,
theta TEXT,
vega TEXT,
open_interest INTEGER,
volume_24h INTEGER,
book_depth_top3 INTEGER,
PRIMARY KEY (timestamp, instrument_name)
) WITHOUT ROWID;
CREATE INDEX idx_option_chain_asset_ts
ON option_chain_snapshots(asset, timestamp DESC);
CREATE INDEX idx_option_chain_expiry
ON option_chain_snapshots(asset, expiry);
PRAGMA user_version = 5;
+31
View File
@@ -22,6 +22,7 @@ __all__ = [
"InstructionRecord", "InstructionRecord",
"ManualAction", "ManualAction",
"MarketSnapshotRecord", "MarketSnapshotRecord",
"OptionChainQuoteRecord",
"PositionRecord", "PositionRecord",
"PositionStatus", "PositionStatus",
"SystemStateRecord", "SystemStateRecord",
@@ -148,6 +149,36 @@ class MarketSnapshotRecord(BaseModel):
fetch_errors_json: str | None = None fetch_errors_json: str | None = None
class OptionChainQuoteRecord(BaseModel):
"""Row of the ``option_chain_snapshots`` table.
One row per (snapshot_ts, instrument) — the same ``timestamp`` is
shared by every quote prelevato nello stesso tick del cron. Tutti
i campi numerici sono opzionali perché il collector è
best-effort: un ticker mancante non invalida il resto della chain.
"""
model_config = ConfigDict(extra="forbid")
timestamp: datetime
asset: str
instrument_name: str
strike: Decimal
expiry: datetime
option_type: Literal["C", "P"]
bid: Decimal | None = None
ask: Decimal | None = None
mid: Decimal | None = None
iv: Decimal | None = None
delta: Decimal | None = None
gamma: Decimal | None = None
theta: Decimal | None = None
vega: Decimal | None = None
open_interest: int | None = None
volume_24h: int | None = None
book_depth_top3: int | None = None
class ManualAction(BaseModel): class ManualAction(BaseModel):
"""Row of the ``manual_actions`` table.""" """Row of the ``manual_actions`` table."""
+130
View File
@@ -24,6 +24,7 @@ from cerbero_bite.state.models import (
InstructionRecord, InstructionRecord,
ManualAction, ManualAction,
MarketSnapshotRecord, MarketSnapshotRecord,
OptionChainQuoteRecord,
PositionRecord, PositionRecord,
PositionStatus, PositionStatus,
SystemStateRecord, SystemStateRecord,
@@ -407,6 +408,103 @@ class Repository:
).fetchall() ).fetchall()
return [_row_to_market_snapshot(r) for r in rows] return [_row_to_market_snapshot(r) for r in rows]
# ------------------------------------------------------------------
# option_chain_snapshots
# ------------------------------------------------------------------
def record_option_chain_snapshot(
self,
conn: sqlite3.Connection,
quotes: list[OptionChainQuoteRecord],
) -> int:
"""Bulk-insert dei quote di un singolo tick. Tutti i quote
condividono lo stesso ``timestamp``. Idempotente per
(timestamp, instrument_name)."""
if not quotes:
return 0
rows = [
(
_enc_dt(q.timestamp),
q.asset,
q.instrument_name,
_enc_dec(q.strike),
_enc_dt(q.expiry),
q.option_type,
_enc_dec(q.bid),
_enc_dec(q.ask),
_enc_dec(q.mid),
_enc_dec(q.iv),
_enc_dec(q.delta),
_enc_dec(q.gamma),
_enc_dec(q.theta),
_enc_dec(q.vega),
q.open_interest,
q.volume_24h,
q.book_depth_top3,
)
for q in quotes
]
conn.executemany(
"INSERT OR REPLACE INTO option_chain_snapshots("
"timestamp, asset, instrument_name, strike, expiry, option_type, "
"bid, ask, mid, iv, delta, gamma, theta, vega, "
"open_interest, volume_24h, book_depth_top3) "
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
rows,
)
return len(rows)
def list_option_chain_snapshots(
self,
conn: sqlite3.Connection,
*,
asset: str,
start: datetime | None = None,
end: datetime | None = None,
expiry_from: datetime | None = None,
expiry_to: datetime | None = None,
limit: int = 50000,
) -> list[OptionChainQuoteRecord]:
clauses: list[str] = ["asset = ?"]
params: list[Any] = [asset]
if start is not None:
clauses.append("timestamp >= ?")
params.append(_enc_dt(start))
if end is not None:
clauses.append("timestamp <= ?")
params.append(_enc_dt(end))
if expiry_from is not None:
clauses.append("expiry >= ?")
params.append(_enc_dt(expiry_from))
if expiry_to is not None:
clauses.append("expiry <= ?")
params.append(_enc_dt(expiry_to))
params.append(int(limit))
rows = conn.execute(
f"SELECT * FROM option_chain_snapshots "
f"WHERE {' AND '.join(clauses)} "
f"ORDER BY timestamp DESC, instrument_name ASC LIMIT ?",
params,
).fetchall()
return [_row_to_option_chain_quote(r) for r in rows]
def latest_option_chain_timestamp(
self,
conn: sqlite3.Connection,
*,
asset: str,
) -> datetime | None:
"""Timestamp dell'ultimo snapshot raccolto per ``asset``,
utile per stimare la freschezza del dato dalla GUI."""
row = conn.execute(
"SELECT timestamp FROM option_chain_snapshots "
"WHERE asset = ? ORDER BY timestamp DESC LIMIT 1",
(asset,),
).fetchone()
if row is None:
return None
return _dec_dt(row["timestamp"])
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# manual_actions # manual_actions
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@@ -692,6 +790,38 @@ def _row_to_market_snapshot(row: sqlite3.Row) -> MarketSnapshotRecord:
) )
def _row_to_option_chain_quote(row: sqlite3.Row) -> OptionChainQuoteRecord:
return OptionChainQuoteRecord(
timestamp=_dec_dt_required(row["timestamp"]),
asset=row["asset"],
instrument_name=row["instrument_name"],
strike=_dec_dec_required(row["strike"]),
expiry=_dec_dt_required(row["expiry"]),
option_type=row["option_type"],
bid=_dec_dec(row["bid"]),
ask=_dec_dec(row["ask"]),
mid=_dec_dec(row["mid"]),
iv=_dec_dec(row["iv"]),
delta=_dec_dec(row["delta"]),
gamma=_dec_dec(row["gamma"]),
theta=_dec_dec(row["theta"]),
vega=_dec_dec(row["vega"]),
open_interest=(
int(row["open_interest"])
if row["open_interest"] is not None
else None
),
volume_24h=(
int(row["volume_24h"]) if row["volume_24h"] is not None else None
),
book_depth_top3=(
int(row["book_depth_top3"])
if row["book_depth_top3"] is not None
else None
),
)
def _dec_dec_required(value: Any) -> Decimal: def _dec_dec_required(value: Any) -> Decimal:
out = _dec_dec(value) out = _dec_dec(value)
if out is None: if out is None:
+6 -2
View File
@@ -28,8 +28,8 @@
# 2× via "ETH + BTC" indicato in `📚 Strategia` è una **stima ex-ante** # 2× via "ETH + BTC" indicato in `📚 Strategia` è una **stima ex-ante**
# di cosa otterresti DOPO quel lavoro di codice. # di cosa otterresti DOPO quel lavoro di codice.
config_version: "1.2.0-aggressiva" config_version: "1.3.0-aggressiva"
config_hash: "e3a583cabfaa4781cd0ebcc8b62fc8f200648153738f93ab8726b062e46cacef" config_hash: "e983e156bf0c270941765e7b9639a35fdc6de7b091076bf5a9b360e294e81e4c"
last_review: "2026-04-26" last_review: "2026-04-26"
last_reviewer: "Adriano" last_reviewer: "Adriano"
@@ -65,6 +65,10 @@ entry:
dealer_gamma_min: "0" dealer_gamma_min: "0"
dealer_gamma_filter_enabled: true dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true liquidation_filter_enabled: true
# IV richness gate (§2.9) — abilitato a 3 pt vol per profilo aggressivo.
iv_minus_rv_min: "3"
iv_minus_rv_filter_enabled: true
structure: structure:
dte_target: 18 dte_target: 18
+6 -2
View File
@@ -15,8 +15,8 @@
# cerbero-bite config hash --file strategy.conservativa.yaml # cerbero-bite config hash --file strategy.conservativa.yaml
# e bumpare config_version. # e bumpare config_version.
config_version: "1.2.0-conservativa" config_version: "1.3.0-conservativa"
config_hash: "fa09dad9cfa40a8ab006ec85157635603e0c4b6381ecd5d721504e00c4119a1b" config_hash: "900646beb1dd0a7bfaf553f76adb4b55004eff1f094585f779302131625919e8"
last_review: "2026-04-26" last_review: "2026-04-26"
last_reviewer: "Adriano" last_reviewer: "Adriano"
@@ -49,6 +49,10 @@ entry:
dealer_gamma_min: "0" dealer_gamma_min: "0"
dealer_gamma_filter_enabled: true dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true liquidation_filter_enabled: true
# IV richness gate (§2.9). Disabilitato di default.
iv_minus_rv_min: "0"
iv_minus_rv_filter_enabled: false
structure: structure:
dte_target: 18 dte_target: 18
+6 -2
View File
@@ -6,8 +6,8 @@
# config hash), and lands as a separate commit with the motivation in # config hash), and lands as a separate commit with the motivation in
# the commit message. # the commit message.
config_version: "1.2.0" config_version: "1.3.0"
config_hash: "33263a313b26b24b41269f93f93783784451ac9b4b6460005b95c2fb3624fcdc" config_hash: "178a87467707d54d1ffef2d585a3a01be54de5ccc7e23493356eac47fd1c24d8"
last_review: "2026-04-26" last_review: "2026-04-26"
last_reviewer: "Adriano" last_reviewer: "Adriano"
@@ -45,6 +45,10 @@ entry:
dealer_gamma_min: "0" dealer_gamma_min: "0"
dealer_gamma_filter_enabled: true dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true liquidation_filter_enabled: true
# IV richness gate (§2.9). Disabilitato di default.
iv_minus_rv_min: "0"
iv_minus_rv_filter_enabled: false
structure: structure:
dte_target: 18 dte_target: 18
+10
View File
@@ -118,6 +118,16 @@ def _wire_market_snapshot(
}, },
is_reusable=True, is_reusable=True,
) )
httpx_mock.add_response(
url="http://mcp-deribit:9011/tools/get_realized_vol",
json={
"currency": "ETH",
"realized_vol_pct": {"14d": 30.0, "30d": 30.0},
"iv_current_pct": 38.0,
"iv_minus_rv_pct": {"14d": 8.0, "30d": 8.0},
},
is_reusable=True,
)
httpx_mock.add_response( httpx_mock.add_response(
url="http://mcp-sentiment:9014/tools/get_liquidation_heatmap", url="http://mcp-sentiment:9014/tools/get_liquidation_heatmap",
json={ json={
+1
View File
@@ -129,6 +129,7 @@ def test_install_scheduler_registers_canonical_jobs(tmp_path: Path) -> None:
"backup", "backup",
"manual_actions", "manual_actions",
"market_snapshot", "market_snapshot",
"option_chain_snapshot",
} }
+259
View File
@@ -0,0 +1,259 @@
"""TDD per :mod:`cerbero_bite.core.backtest`."""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
from cerbero_bite.config import StrategyConfig, golden_config
from cerbero_bite.core.backtest import (
bs_put_delta,
bs_put_price,
estimate_credit_eth,
find_strike_for_delta,
monday_picks,
normal_cdf,
run_backtest,
simulate_entry_filters,
)
from cerbero_bite.state.models import MarketSnapshotRecord
# ---------------------------------------------------------------------------
# Black-Scholes helpers
# ---------------------------------------------------------------------------
def test_normal_cdf_known_values() -> None:
assert normal_cdf(0.0) == pytest.approx(0.5, abs=1e-6)
assert normal_cdf(1.0) == pytest.approx(0.8413, abs=1e-3)
assert normal_cdf(-1.0) == pytest.approx(0.1587, abs=1e-3)
assert normal_cdf(2.0) == pytest.approx(0.9772, abs=1e-3)
def test_bs_put_price_atm_positive_and_less_than_strike() -> None:
p = bs_put_price(spot=3000, strike=3000, t_years=18 / 365, sigma=0.50)
assert p > 0
assert p < 3000 # cap
def test_bs_put_price_far_otm_close_to_zero() -> None:
p = bs_put_price(spot=3000, strike=1500, t_years=18 / 365, sigma=0.50)
assert 0 <= p < 5 # essentially zero
def test_bs_put_delta_atm_around_minus_half() -> None:
d = bs_put_delta(spot=3000, strike=3000, t_years=18 / 365, sigma=0.50)
assert d == pytest.approx(-0.475, abs=0.05)
def test_bs_put_delta_far_otm_close_to_zero() -> None:
d = bs_put_delta(spot=3000, strike=1500, t_years=18 / 365, sigma=0.50)
assert -0.05 < d <= 0
def test_find_strike_for_delta_monotone() -> None:
spot = 3000.0
dvol = 50.0
dte = 18
s_010 = find_strike_for_delta(
spot=spot, dvol_pct=dvol, dte_days=dte, target_delta_abs=0.10,
)
s_020 = find_strike_for_delta(
spot=spot, dvol_pct=dvol, dte_days=dte, target_delta_abs=0.20,
)
# |Δ|=0.20 (più ITM) ⇒ strike più alto di |Δ|=0.10 (più OTM).
assert s_020 > s_010
# Verifica che il delta corrisponda a target ± tolleranza.
achieved = abs(
bs_put_delta(
spot=spot, strike=s_020, t_years=dte / 365, sigma=dvol / 100,
)
)
assert achieved == pytest.approx(0.20, abs=0.02)
def test_estimate_credit_returns_positive_credit_in_normal_regime() -> None:
credit_eth, short_k, long_k = estimate_credit_eth(
spot=3000, dvol_pct=50, dte_days=18, width_pct=0.04, delta_target_abs=0.12,
)
# Sanity: credit > 0, short_k < spot, long_k = short_k - 4%×spot
assert credit_eth > 0
assert short_k < 3000
assert long_k < short_k
assert short_k - long_k == pytest.approx(0.04 * 3000, abs=1.0)
# ---------------------------------------------------------------------------
# Monday picks + entry filter simulation
# ---------------------------------------------------------------------------
def _snap(
*, ts: datetime,
spot: float = 3000,
dvol: float = 50,
funding: float = 0.0,
macro_d: int | None = None,
asset: str = "ETH",
) -> MarketSnapshotRecord:
return MarketSnapshotRecord(
timestamp=ts,
asset=asset,
spot=Decimal(str(spot)),
dvol=Decimal(str(dvol)),
funding_perp_annualized=Decimal(str(funding)),
funding_cross_annualized=Decimal("0"),
dealer_net_gamma=Decimal("100"),
liquidation_long_risk="low",
liquidation_short_risk="low",
macro_days_to_event=macro_d,
fetch_ok=True,
)
def test_monday_picks_extracts_one_per_iso_week() -> None:
monday_2026_05_04 = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
monday_2026_05_11 = datetime(2026, 5, 11, 14, 0, tzinfo=UTC)
snapshots = [
_snap(ts=monday_2026_05_04),
_snap(ts=monday_2026_05_04 + timedelta(minutes=15)), # not picked
_snap(ts=monday_2026_05_11),
]
picks = monday_picks(snapshots)
assert len(picks) == 2
assert picks[0].timestamp == monday_2026_05_04
assert picks[1].timestamp == monday_2026_05_11
def test_monday_picks_skips_other_days_and_hours() -> None:
snapshots = [
_snap(ts=datetime(2026, 5, 4, 13, 0, tzinfo=UTC)), # Monday 13:00
_snap(ts=datetime(2026, 5, 5, 14, 0, tzinfo=UTC)), # Tuesday 14:00
]
assert monday_picks(snapshots) == []
def test_monday_picks_filters_by_asset() -> None:
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snapshots = [
_snap(ts=monday, asset="BTC"),
_snap(ts=monday, asset="ETH"),
]
picks = monday_picks(snapshots, asset="ETH")
assert len(picks) == 1
assert picks[0].snapshot.asset == "ETH"
def test_simulate_entry_filters_accepts_clean_snapshot(
) -> None:
cfg: StrategyConfig = golden_config()
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snap = _snap(ts=monday, dvol=50, funding=0.10)
picks = [
type("MP", (), {"timestamp": monday, "snapshot": snap})() # type: ignore[arg-type]
]
# Hack: build via real dataclass
from cerbero_bite.core.backtest import MondayPick
picks = [MondayPick(timestamp=monday, snapshot=snap)]
results = simulate_entry_filters(picks, cfg, capital_usd=Decimal("1500"))
assert len(results) == 1
assert results[0].accepted is True
def test_simulate_entry_filters_rejects_dvol_out_of_band() -> None:
cfg = golden_config()
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snap = _snap(ts=monday, dvol=20, funding=0.10) # below 35
from cerbero_bite.core.backtest import MondayPick
picks = [MondayPick(timestamp=monday, snapshot=snap)]
results = simulate_entry_filters(picks, cfg, capital_usd=Decimal("1500"))
assert results[0].accepted is False
assert any("dvol" in r.lower() for r in results[0].reasons)
def test_simulate_entry_filters_skips_incomplete_snapshot() -> None:
cfg = golden_config()
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
incomplete = MarketSnapshotRecord(
timestamp=monday, asset="ETH", spot=Decimal("3000"),
# dvol=None ⇒ skipped
fetch_ok=False,
)
from cerbero_bite.core.backtest import MondayPick
picks = [MondayPick(timestamp=monday, snapshot=incomplete)]
results = simulate_entry_filters(picks, cfg, capital_usd=Decimal("1500"))
assert results[0].accepted is False
assert results[0].skipped_for_data is True
# ---------------------------------------------------------------------------
# Full pipeline (sintetico)
# ---------------------------------------------------------------------------
def _synthetic_year_of_snapshots(
*,
n_weeks: int = 8,
spot: float = 3000,
dvol: float = 60, # con skew_premium 1.5 ⇒ credit/width ≈ 35% (sopra soglia 30%)
funding: float = 0.10,
) -> list[MarketSnapshotRecord]:
"""Genera N settimane di snapshot sintetici ETH a 4 tick/settimana."""
rows: list[MarketSnapshotRecord] = []
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
for week in range(n_weeks):
base = monday + timedelta(weeks=week)
# Lunedì 14:00 è il pick
rows.append(_snap(ts=base, spot=spot, dvol=dvol, funding=funding))
# Tick intermedi che NON cadono di lunedì alle 14:00:
# offset +1h così vengono ignorati da `monday_picks`.
for d in (2, 8, 14, 19):
rows.append(
_snap(
ts=base + timedelta(days=d, hours=1),
spot=spot * (1 + 0.005 * d), # +0.5% al giorno
dvol=dvol - 1.5 * d, # vol che scende lentamente
funding=funding,
)
)
return rows
def test_run_backtest_produces_report_with_trades() -> None:
# Per il test scaliamo il credit/width gate al 15%: il modello BS
# senza skew completo sottostima i premi OTM rispetto al reale.
# Vedi `estimate_credit_eth.skew_premium` docstring per dettagli.
from cerbero_bite.config.schema import StructureConfig
cfg = golden_config()
cfg = cfg.model_copy(
update={
"structure": StructureConfig(
**{
**cfg.structure.model_dump(),
"credit_to_width_ratio_min": Decimal("0.15"),
}
)
}
)
snapshots = _synthetic_year_of_snapshots(n_weeks=4)
report = run_backtest(snapshots, cfg, capital_usd=Decimal("1500"))
# Sanity: 4 picks, almeno 1 trade chiuso
assert report.n_picks == 4
assert report.n_completed >= 1
assert report.cumulative_pnl_usd != Decimal("0")
# Bull-put + ETH al rialzo + DVOL che scende ⇒ atteso win
assert report.n_winners >= 1
def test_run_backtest_handles_no_picks_gracefully() -> None:
cfg = golden_config()
# Solo tick infrasettimanali, niente Monday 14:00.
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snapshots = [_snap(ts=monday + timedelta(hours=1))]
report = run_backtest(snapshots, cfg, capital_usd=Decimal("1500"))
assert report.n_picks == 0
assert report.n_completed == 0
assert report.cumulative_pnl_usd == Decimal("0")
+1 -1
View File
@@ -68,7 +68,7 @@ def test_compute_hash_is_independent_of_recorded_hash_value(tmp_path: Path) -> N
def test_load_repo_strategy_yaml(tmp_path: Path) -> None: def test_load_repo_strategy_yaml(tmp_path: Path) -> None:
"""The committed strategy.yaml validates with the recorded hash.""" """The committed strategy.yaml validates with the recorded hash."""
result = load_strategy(REPO_ROOT / "strategy.yaml") result = load_strategy(REPO_ROOT / "strategy.yaml")
assert result.config.config_version == "1.2.0" assert result.config.config_version == "1.3.0"
assert result.config.sizing.kelly_fraction == Decimal("0.13") assert result.config.sizing.kelly_fraction == Decimal("0.13")
assert result.computed_hash == result.config.config_hash assert result.computed_hash == result.config.config_hash
+56
View File
@@ -194,6 +194,62 @@ def test_dealer_gamma_filter_disabled_in_config(cfg: StrategyConfig) -> None:
assert decision.accepted is True assert decision.accepted is True
# ---------------------------------------------------------------------------
# IV richness gate (§2.9)
# ---------------------------------------------------------------------------
def _strict_iv_rv_cfg(
cfg: StrategyConfig, *, threshold: Decimal = Decimal("5")
) -> StrategyConfig:
return golden_config(
entry=EntryConfig(
**{
**cfg.entry.model_dump(),
"iv_minus_rv_filter_enabled": True,
"iv_minus_rv_min": threshold,
}
)
)
def test_iv_richness_gate_disabled_by_default_lets_thin_premium_pass(
cfg: StrategyConfig,
) -> None:
# Default config: filter disabled. Anche con IV-RV negativa (RV>IV)
# l'entry deve passare per non rompere setup pre-calibrazione.
decision = validate_entry(_good_ctx(iv_minus_rv=Decimal("-2")), cfg)
assert decision.accepted is True
def test_iv_richness_gate_blocks_when_below_floor(cfg: StrategyConfig) -> None:
strict = _strict_iv_rv_cfg(cfg, threshold=Decimal("5"))
decision = validate_entry(_good_ctx(iv_minus_rv=Decimal("3")), strict)
assert decision.accepted is False
assert any("IV richness" in r for r in decision.reasons)
def test_iv_richness_gate_passes_when_above_floor(cfg: StrategyConfig) -> None:
strict = _strict_iv_rv_cfg(cfg, threshold=Decimal("5"))
decision = validate_entry(_good_ctx(iv_minus_rv=Decimal("6")), strict)
assert decision.accepted is True
def test_iv_richness_gate_passes_at_exact_threshold(cfg: StrategyConfig) -> None:
# Soglia inclusiva: IV-RV == soglia → accettato (gate è "<", non "<=").
strict = _strict_iv_rv_cfg(cfg, threshold=Decimal("5"))
decision = validate_entry(_good_ctx(iv_minus_rv=Decimal("5")), strict)
assert decision.accepted is True
def test_iv_richness_gate_skipped_when_data_missing(cfg: StrategyConfig) -> None:
# MCP irraggiungibile: best-effort skip, non bloccare l'entry per
# un problema di infrastruttura.
strict = _strict_iv_rv_cfg(cfg, threshold=Decimal("5"))
decision = validate_entry(_good_ctx(iv_minus_rv=None), strict)
assert decision.accepted is True
def test_validate_entry_accumulates_all_reasons(cfg: StrategyConfig) -> None: def test_validate_entry_accumulates_all_reasons(cfg: StrategyConfig) -> None:
decision = validate_entry( decision = validate_entry(
_good_ctx( _good_ctx(
@@ -0,0 +1,134 @@
"""TDD per :mod:`cerbero_bite.runtime.option_chain_snapshot_cycle`."""
from __future__ import annotations
from datetime import UTC, datetime
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
import pytest
from cerbero_bite.clients.deribit import InstrumentMeta
from cerbero_bite.runtime.option_chain_snapshot_cycle import (
collect_option_chain_snapshot,
)
from cerbero_bite.state.models import OptionChainQuoteRecord
_NOW = datetime(2026, 5, 4, 13, 55, tzinfo=UTC)
def _meta(name: str, strike: int, expiry_dte: int = 18) -> InstrumentMeta:
expiry = _NOW.replace(hour=8, minute=0, second=0)
expiry = expiry.replace(day=expiry.day) + (
# add days
__import__("datetime").timedelta(days=expiry_dte)
)
return InstrumentMeta(
name=name,
strike=Decimal(str(strike)),
expiry=expiry,
option_type="P",
open_interest=Decimal("100"),
tick_size=Decimal("0.0005"),
min_trade_amount=Decimal("1"),
)
def _ticker(name: str, *, mark: float = 0.020, bid: float = 0.018,
ask: float = 0.022, delta: float = -0.12) -> dict:
return {
"instrument_name": name,
"bid": bid,
"ask": ask,
"mark_price": mark,
"mark_iv": 60.0,
"volume_24h": 50,
"greeks": {
"delta": delta,
"gamma": 0.001,
"theta": -0.0005,
"vega": 0.10,
},
}
@pytest.fixture
def cfg() -> object:
from cerbero_bite.config import golden_config
return golden_config()
@pytest.fixture
def fake_ctx(cfg: object) -> MagicMock:
"""Mock minimal RuntimeContext."""
ctx = MagicMock()
ctx.cfg = cfg
ctx.db_path = ":memory:"
return ctx
@pytest.mark.asyncio
async def test_collector_persists_one_quote_per_instrument(
fake_ctx: MagicMock,
) -> None:
metas = [_meta("ETH-21MAY26-2475-P", 2475), _meta("ETH-21MAY26-2400-P", 2400)]
fake_ctx.deribit.options_chain = AsyncMock(return_value=metas)
fake_ctx.deribit.get_tickers = AsyncMock(
return_value=[_ticker(m.name) for m in metas]
)
persisted: list[list[OptionChainQuoteRecord]] = []
def _record(_conn: object, qs: list[OptionChainQuoteRecord]) -> int:
persisted.append(qs)
return len(qs)
fake_ctx.repository.record_option_chain_snapshot = _record
n = await collect_option_chain_snapshot(fake_ctx, asset="ETH", now=_NOW)
assert n == 2
assert len(persisted) == 1
assert {q.instrument_name for q in persisted[0]} == {
"ETH-21MAY26-2475-P", "ETH-21MAY26-2400-P",
}
# Tutti i quote condividono il timestamp del cron tick.
assert all(q.timestamp == _NOW for q in persisted[0])
@pytest.mark.asyncio
async def test_collector_handles_missing_tickers_with_null_fields(
fake_ctx: MagicMock,
) -> None:
metas = [_meta("ETH-21MAY26-2475-P", 2475)]
fake_ctx.deribit.options_chain = AsyncMock(return_value=metas)
fake_ctx.deribit.get_tickers = AsyncMock(return_value=[]) # vuoto
persisted: list[list[OptionChainQuoteRecord]] = []
def _record(_conn: object, qs: list[OptionChainQuoteRecord]) -> int:
persisted.append(qs)
return len(qs)
fake_ctx.repository.record_option_chain_snapshot = _record
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 1
assert persisted[0][0].mid is None # ticker mancante ⇒ campi NULL
assert persisted[0][0].instrument_name == "ETH-21MAY26-2475-P"
@pytest.mark.asyncio
async def test_collector_returns_zero_when_chain_empty(
fake_ctx: MagicMock,
) -> None:
fake_ctx.deribit.options_chain = AsyncMock(return_value=[])
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 0
@pytest.mark.asyncio
async def test_collector_swallows_chain_fetch_failure(
fake_ctx: MagicMock,
) -> None:
fake_ctx.deribit.options_chain = AsyncMock(side_effect=RuntimeError("boom"))
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 0 # best-effort: non rilancia