3 Commits

Author SHA1 Message Date
root 954baaa354 feat(cli): comando option-chain (trigger + analyze) per la catena opzioni
Espone direttamente da CLI le due operazioni più utili sui dati di
``option_chain_snapshots`` raccolti dal cron settimanale:

- ``cerbero-bite option-chain trigger`` — esegue UNA volta il
  collector della catena. Riusa la stessa pipeline schedulata (cron
  ``55 13 * * MON``) ma on-demand. Utile per popolare il DB senza
  aspettare lunedì.
- ``cerbero-bite option-chain analyze [--bias bull_put|bear_call]`` —
  legge l'ultimo snapshot, simula il selector di strike
  (``select_strikes``) con la strategy passata e stampa una tabella
  con: short/long strike, delta, width, credito reale, ratio
  credit/width, e PASS/FAIL del gate ``credit_to_width_ratio_min``.

Il comando ``analyze`` rende immediatamente actionable la catena
appena raccolta: invece di stime ex-ante via Black-Scholes (modulo
``core/backtest.py``), legge i mid REALI di Deribit e dice "il rule
engine aprirebbe questo trade qui? credit/width ratio passa o no?".

Esempio di output sui primi snapshot raccolti (regime ETH ~2200,
DTE ~14g):

    Snapshot del 2026-05-01T20:53:49 — 21 quote totali
    Il rule engine NON aprirebbe trade con questa catena
    (no strike compatibile coi gate delta/distance/width/credit-ratio).

Conferma empirica del messaggio del documento ``13-strategia-spiegata``:
con delta target 0.12 + width 4% + credit/width ≥ 30%, il regime
attuale di ETH options non è abbastanza ricco per produrre trade —
serve calibrare soglie o aspettare un regime IV più alto.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:57:40 +00:00
root 3e46169278 fix(migrations): rinomina 0004 → 0005 per coesistenza con auto_pause
La migrazione `0004_option_chain_snapshots.sql` collide con quella
parallela `0004_auto_pause.sql` del PR `feat/strategy-improvements-fdac`:
entrambe puntano allo stesso slot e bumpano user_version a 4.

Rinominata a 0005 (con `PRAGMA user_version = 5`) così le due
migrazioni possono coesistere senza conflitti, indipendentemente
dall'ordine di merge dei due PR. Quando i due PR landeranno in main,
basterà conservare la sequenza 0004 (auto_pause) → 0005 (option_chain).

Verificato in locale: deploy con DB già a v4 (post-FDAC) ora applica
correttamente la migrazione e crea la tabella `option_chain_snapshots`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:52:11 +00:00
root c0a0ee416f feat(state+runtime): option_chain_snapshots — catena opzioni storica per backtest reale
Aggiunge la persistence della option chain Deribit con cron settimanale
``55 13 * * MON`` (5 minuti prima del trigger entry alle 14:00 UTC),
sbloccando il backtest non-stilizzato e la calibrazione empirica
dello skew premium.

**Schema (migrazione 0004)**

Nuova tabella ``option_chain_snapshots`` con primary key composta
``(timestamp, instrument_name)`` — tutti i quote prelevati nello
stesso tick condividono il timestamp, così le query "lo snapshot del
2026-05-04 alle 13:55" diventano una singola WHERE timestamp = X.
Indici su (asset, timestamp DESC) e (asset, expiry) per supportare
sia listing recenti sia query per scadenza specifica.

Campi: instrument_name, strike, expiry, option_type (C/P), bid, ask,
mid, iv, delta, gamma, theta, vega, open_interest, volume_24h,
book_depth_top3. Tutti i numerici sono nullable: il collector è
best-effort, un ticker mancante produce comunque una riga (utile
per sapere che lo strumento esisteva ma non era quotato).

**Modello + repository**

- ``OptionChainQuoteRecord`` (Pydantic, in ``state/models.py``).
- ``Repository.record_option_chain_snapshot`` (bulk insert
  idempotente).
- ``Repository.list_option_chain_snapshots`` (filtri su asset,
  timestamp window, expiry window, limit default 50000).
- ``Repository.latest_option_chain_timestamp`` (freshness check
  per dashboard GUI).

**Collector**

Nuovo ``runtime/option_chain_snapshot_cycle.py`` che:

1. Calcola la finestra scadenze ``[now+dte_min, now+dte_max]`` da
   ``cfg.structure``: niente richieste su scadenze che il rule
   engine non userebbe mai.
2. Chiama ``deribit.options_chain()`` con
   ``min_open_interest=cfg.liquidity.open_interest_min``.
3. Batch ``deribit.get_tickers()`` (max 20 per call, limite Deribit)
   con error-isolation per batch — un batch fallito non blocca
   gli altri.
4. NON chiama l'order book per ogni strike (rate-limit guard);
   ``book_depth_top3`` resta NULL e il liquidity gate live lo
   chiede on-the-fly per gli strike candidati al picker.

Best-effort end-to-end: chain assente, get_tickers giù, persist
fallito → ritorna 0 senza alzare eccezioni, logga sempre.

**Schedulazione**

Wired in ``Orchestrator.install_scheduler`` come job parallelo a
``market_snapshot``, attivo solo quando
``ENABLE_DATA_ANALYSIS=true``. Cron parametrizzabile via il nuovo
kwarg ``option_chain_cron`` (default ``55 13 * * MON``).

**Test**

- 4 unit test del collector (happy path, ticker mancante, chain
  vuota, fetch fail best-effort) con mock di RuntimeContext.
- Aggiornato ``test_install_scheduler_registers_canonical_jobs``
  per includere il nuovo job nel set canonico.

**Cosa sblocca**

- Backtest non-stilizzato: il PR ``feat/backtest-engine`` può
  dropparsi il modello BS+skew_premium e leggere prezzi reali
  ``mid`` dalla chain registrata.
- Calibrazione empirica dello skew premium (hardcoded a 1.5 nel
  backtest stilizzato): plot del rapporto fra quote reali Deribit
  e BS per delta/expiry, regressione → valore data-driven.
- Validazione ex-post: "il delta-0.12 era davvero a 25% OTM in
  quella settimana?" diventa una query SELECT.
- Dimensione attesa: ~50 strike × 3 scadenze × 1 snapshot/settimana
  × 17 colonne ≈ 12 KB/settimana, ~600 KB/anno. Trascurabile.

Suite: 409 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:44:49 +00:00
10 changed files with 779 additions and 1060 deletions
+235 -149
View File
@@ -679,155 +679,6 @@ def replay(date_from: str, date_to: str, capital: float, dry_run: bool) -> None:
) )
@main.command()
@click.option(
"--strategy",
"strategy_path",
type=click.Path(path_type=Path),
default=Path("strategy.yaml"),
show_default=True,
help="Path al file di strategia (golden, conservativa, aggressiva, ...).",
)
@click.option(
"--db",
"db_path",
type=click.Path(path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
help="SQLite con `market_snapshots` storiche.",
)
@click.option(
"--from",
"date_from",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=None,
help="ISO date YYYY-MM-DD (default: 90 giorni fa).",
)
@click.option(
"--to",
"date_to",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=None,
help="ISO date YYYY-MM-DD (default: oggi).",
)
@click.option(
"--capital",
type=float,
default=1500.0,
show_default=True,
help="Capitale di partenza per il backtest, in USD.",
)
@click.option(
"--asset",
type=str,
default="ETH",
show_default=True,
help="Asset di riferimento per le snapshot.",
)
@click.option(
"--no-enforce-hash",
is_flag=True,
default=False,
help="Salta la verifica del config_hash (utile per profili sperimentali).",
)
def backtest(
strategy_path: Path,
db_path: Path,
date_from: datetime | None,
date_to: datetime | None,
capital: float,
asset: str,
no_enforce_hash: bool,
) -> None:
"""Esegue il backtest stilizzato su `market_snapshots` storiche.
Usa lo stesso `validate_entry` del live per i filtri (rigoroso) e
un modello Black-Scholes con skew premium per stimare credito ed
exit P/L (stilizzato — vedi docstring di `core/backtest.py`).
"""
from cerbero_bite.config.loader import load_strategy # noqa: PLC0415
from cerbero_bite.core.backtest import run_backtest # noqa: PLC0415
console = Console()
if date_to is None:
date_to = datetime.now(UTC)
if date_from is None:
date_from = date_to - timedelta(days=90)
date_from = date_from.replace(tzinfo=UTC) if date_from.tzinfo is None else date_from
date_to = date_to.replace(tzinfo=UTC) if date_to.tzinfo is None else date_to
loaded = load_strategy(strategy_path, enforce_hash=not no_enforce_hash)
cfg = loaded.config
conn = connect_state(db_path)
try:
repo = Repository()
snapshots = repo.list_market_snapshots(
conn,
asset=asset.upper(),
start=date_from,
end=date_to,
limit=10000,
)
finally:
conn.close()
if not snapshots:
console.print(
f"[yellow]Nessuno snapshot {asset} trovato fra {date_from.date()} "
f"e {date_to.date()}.[/yellow]"
)
sys.exit(1)
console.print(
f"[green]Caricate {len(snapshots)} snapshot {asset} "
f"({snapshots[-1].timestamp.date()}{snapshots[0].timestamp.date()})[/green]"
)
report = run_backtest(snapshots, cfg, capital_usd=Decimal(str(capital)))
table = Table(title=f"Backtest report — {strategy_path.name}")
table.add_column("Metrica", style="cyan")
table.add_column("Valore", style="bold")
table.add_row("Picks (lunedì 14:00)", str(report.n_picks))
table.add_row(
"Accettati dai filtri",
f"{report.n_accepted} ({report.n_accepted / max(1, report.n_picks):.0%})",
)
table.add_row("Saltati per dato mancante", str(report.n_skipped_data))
table.add_row("Trade completati (con P/L)", str(report.n_completed))
table.add_row("Vincenti", f"{report.n_winners} ({report.win_rate:.0%})")
table.add_row("P/L cumulato (USD)", f"{report.cumulative_pnl_usd:+.2f}")
table.add_row(
"P/L su capitale", f"{report.cumulative_pnl_pct_of_capital:+.2%}"
)
table.add_row(
"Max drawdown", f"{report.max_drawdown_usd:.0f} USD "
f"({report.max_drawdown_pct:.1%})",
)
table.add_row(
"Sharpe (annualized)",
f"{report.sharpe_annualized}" if report.sharpe_annualized is not None
else "",
)
console.print(table)
if report.skip_reasons:
skip_table = Table(title="Motivi di skip aggregati")
skip_table.add_column("Motivo")
skip_table.add_column("Settimane", justify="right")
for reason, count in sorted(
report.skip_reasons.items(), key=lambda kv: -kv[1]
):
skip_table.add_row(reason, str(count))
console.print(skip_table)
console.print(
"[dim]Il modello P/L è stilizzato: BS + skew premium 1.5×. "
"Numeri ottimi per ranking config, non per promesse operative.[/dim]"
)
@main.group() @main.group()
def config() -> None: def config() -> None:
"""Strategy configuration utilities.""" """Strategy configuration utilities."""
@@ -961,6 +812,241 @@ def state_inspect(db: Path) -> None:
console.print(table) console.print(table)
@main.group(name="option-chain")
def option_chain() -> None:
"""Strumenti per la catena opzioni storica (`option_chain_snapshots`)."""
@option_chain.command(name="trigger")
@click.option(
"--strategy",
"strategy_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=_DEFAULT_STRATEGY_PATH,
show_default=True,
)
@click.option(
"--db",
"db_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
@click.option(
"--audit",
"audit_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_AUDIT_PATH,
show_default=True,
)
@click.option(
"--token",
type=str,
default=None,
help="MCP bearer token (override su CERBERO_BITE_MCP_TOKEN).",
)
@click.option("--asset", default="ETH", show_default=True)
def option_chain_trigger(
strategy_path: Path,
db_path: Path,
audit_path: Path,
token: str | None,
asset: str,
) -> None:
"""Esegue UNA volta il collector della catena opzioni e persiste in DB.
Utile per popolare i dati senza aspettare il cron settimanale del
job ``option_chain_snapshot``. Riusa esattamente la stessa pipeline
schedulata.
"""
from cerbero_bite.runtime.dependencies import build_runtime # noqa: PLC0415
from cerbero_bite.runtime.option_chain_snapshot_cycle import ( # noqa: PLC0415
collect_option_chain_snapshot,
)
cfg = load_strategy(strategy_path).config
ctx = build_runtime(
cfg=cfg,
endpoints=load_endpoints(),
token=load_token(value=token),
db_path=db_path,
audit_path=audit_path,
bot_tag=load_bot_tag(),
)
n = asyncio.run(collect_option_chain_snapshot(ctx, asset=asset))
console.print(
f"[green]Persisted {n} option chain quote(s) for {asset}[/green]"
if n > 0
else f"[yellow]No quotes persisted (chain empty or fetch failed)[/yellow]"
)
@option_chain.command(name="analyze")
@click.option(
"--strategy",
"strategy_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=_DEFAULT_STRATEGY_PATH,
show_default=True,
)
@click.option(
"--db",
"db_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
@click.option("--asset", default="ETH", show_default=True)
@click.option(
"--bias",
type=click.Choice(["bull_put", "bear_call"], case_sensitive=False),
default="bull_put",
show_default=True,
help="Direzione da simulare (il rule engine lo deciderebbe da trend×funding).",
)
def option_chain_analyze(
strategy_path: Path,
db_path: Path,
asset: str,
bias: str,
) -> None:
"""Analizza l'ultimo snapshot di catena salvato.
Per la strategia indicata, simula la selezione strike (delta
target, OTM range, width 4%, credit/width ratio min) e mostra:
* lo strike che il rule engine sceglierebbe come short e long,
* credito atteso, larghezza, rapporto credit/width,
* pass/fail del gate `credit_to_width_ratio_min`.
"""
from cerbero_bite.core.combo_builder import select_strikes # noqa: PLC0415
from cerbero_bite.core.types import OptionQuote # noqa: PLC0415
cfg = load_strategy(strategy_path).config
conn = connect_state(db_path)
try:
repo = Repository()
latest_ts = repo.latest_option_chain_timestamp(conn, asset=asset.upper())
if latest_ts is None:
console.print(
"[red]Nessuno snapshot di catena trovato. Lancia prima "
"`cerbero-bite option-chain trigger`.[/red]"
)
sys.exit(1)
quotes_records = repo.list_option_chain_snapshots(
conn, asset=asset.upper(), start=latest_ts, end=latest_ts,
)
finally:
conn.close()
console.print(
f"[cyan]Snapshot del {latest_ts.isoformat()}{len(quotes_records)} "
f"quote totali[/cyan]"
)
# Costruzione OptionQuote da OptionChainQuoteRecord per riusare select_strikes.
quotes: list[OptionQuote] = []
for q in quotes_records:
if q.bid is None or q.ask is None or q.mid is None or q.delta is None:
continue
quotes.append(
OptionQuote(
instrument=q.instrument_name,
strike=q.strike,
expiry=q.expiry,
option_type=q.option_type,
bid=q.bid,
ask=q.ask,
mid=q.mid,
delta=q.delta,
gamma=q.gamma or Decimal("0"),
theta=q.theta or Decimal("0"),
vega=q.vega or Decimal("0"),
open_interest=q.open_interest or 0,
volume_24h=q.volume_24h or 0,
book_depth_top3=q.book_depth_top3 or 0,
)
)
if not quotes:
console.print("[red]Nessun quote completo per la simulazione.[/red]")
sys.exit(1)
# Lo spot al momento dello snapshot: estraiamo dall'ultimo
# `market_snapshot` ETH a quel timestamp (tolleranza ±15 min).
spot = _resolve_spot_at(db_path, asset=asset.upper(), at=latest_ts)
if spot is None:
console.print(
"[yellow]Spot non recuperabile dai market_snapshots; "
"stimato dal mid ATM.[/yellow]"
)
spot = _atm_spot_proxy(quotes)
selection = select_strikes(
chain=quotes,
bias=bias, # type: ignore[arg-type]
spot=spot,
now=latest_ts,
cfg=cfg,
)
if selection is None:
console.print(
"[red]Il rule engine NON aprirebbe trade con questa catena[/red] "
"(no strike compatibile coi gate delta/distance/width/credit-ratio)."
)
sys.exit(0)
short, long_ = selection
width_usd = (short.strike - long_.strike).copy_abs()
credit_eth = short.mid - long_.mid
credit_usd = credit_eth * spot
ratio = credit_usd / width_usd if width_usd > 0 else Decimal("0")
ratio_target = cfg.structure.credit_to_width_ratio_min
table = Table(title=f"Simulazione picker — bias={bias}, spot={spot:.0f}")
table.add_column("Campo", style="cyan")
table.add_column("Valore", style="bold")
table.add_row("Short strike", f"{short.strike} ({short.delta:+.3f}δ)")
table.add_row("Long strike", f"{long_.strike} ({long_.delta:+.3f}δ)")
table.add_row("Width", f"{width_usd:.0f} USD")
table.add_row("Credit", f"{credit_eth:.4f} ETH ≈ {credit_usd:.2f} USD")
table.add_row(
"Credit/width ratio",
f"{ratio:.2%} (gate ≥ {float(ratio_target):.0%})",
)
pass_str = (
"[green]PASS — entry possibile[/green]"
if ratio >= ratio_target
else "[red]FAIL — premio troppo magro[/red]"
)
table.add_row("Verdetto gate ratio", pass_str)
console.print(table)
def _resolve_spot_at(db_path: Path, *, asset: str, at: datetime) -> Decimal | None:
"""Best-effort lookup dello spot al timestamp ``at`` ± 15 min."""
conn = connect_state(db_path)
try:
rows = Repository().list_market_snapshots(
conn,
asset=asset,
start=at - timedelta(minutes=15),
end=at + timedelta(minutes=15),
limit=1,
)
finally:
conn.close()
if not rows:
return None
return rows[0].spot
def _atm_spot_proxy(quotes: list[Any]) -> Decimal:
"""Stima dello spot prendendo lo strike il cui delta è più vicino a 0.5."""
quote = min(quotes, key=lambda q: abs(abs(q.delta) - Decimal("0.5")))
return quote.strike
def _entrypoint() -> None: def _entrypoint() -> None:
"""Wrapper used by ``cerbero-bite`` console script.""" """Wrapper used by ``cerbero-bite`` console script."""
try: try:
-652
View File
@@ -1,652 +0,0 @@
"""Stylized backtest engine over ``market_snapshots`` (§13).
Two layers, both pure functions:
1. **Entry-filter simulation** — for each Monday 14:00 UTC tick in the
recorded snapshots, evaluate which §2 gates would have passed,
reconstructing :class:`EntryContext` from the snapshot. This part
is **rigorous**: it uses the same :func:`validate_entry` the live
engine uses, so the output is exactly "what the bot would have
decided".
2. **P/L estimation per accepted entry** — since ``market_snapshots``
does NOT record the option chain (we only collect spot, DVOL,
funding, etc.), credit and exit P/L are estimated via a stylized
Black-Scholes model: given ``spot``, ``DVOL`` (as IV), and the
strategy's delta target, we solve for the short strike, the long
strike at ``width_pct`` distance, and the combo mid-price. Future
ticks are then re-priced under the same model to detect the first
exit trigger from §7.
The stylized layer is **intentionally approximate**: it captures the
geometry of the strategy (DVOL band sets credit, ETH path drives
exit triggers) but not the second-order effects (chain liquidity,
borrow rates, exchange fees beyond the 0.03% notional cap, dealer
hedging skew). Numbers are good for ranking and tuning, not for
operational P/L promises.
The engine is deterministic and side-effect-free: it does **not**
write to SQLite, does not call MCP, does not place orders. It
operates entirely on a list of :class:`MarketSnapshotRecord` rows
the caller has already loaded.
"""
from __future__ import annotations
import math
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import Literal
from pydantic import BaseModel, ConfigDict
from cerbero_bite.config.schema import StrategyConfig
from cerbero_bite.core.entry_validator import EntryContext, validate_entry
from cerbero_bite.state.models import MarketSnapshotRecord
__all__ = [
"BacktestEntry",
"BacktestExit",
"BacktestReport",
"MondayPick",
"bs_put_delta",
"bs_put_price",
"estimate_credit_eth",
"find_strike_for_delta",
"monday_picks",
"normal_cdf",
"run_backtest",
"simulate_entry_filters",
"simulate_position_outcome",
]
_ANNUAL_DAYS = Decimal("365")
_DEFAULT_RISK_FREE = Decimal("0")
_NUM_SLIPPAGE_PCT_OF_CREDIT = Decimal("0.03")
_NUM_FEE_PCT_OF_NOTIONAL = Decimal("0.0003")
# ---------------------------------------------------------------------------
# Black-Scholes helpers (stdlib-only)
# ---------------------------------------------------------------------------
def normal_cdf(x: float) -> float:
"""Standard normal CDF, no scipy dependency."""
return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))
def bs_put_price(*, spot: float, strike: float, t_years: float, sigma: float) -> float:
"""European put price under r=0, q=0 Black-Scholes.
Returns price in spot units (so for an ETH option, dividing by spot
gives the price in ETH).
"""
if t_years <= 0 or sigma <= 0 or spot <= 0 or strike <= 0:
return max(0.0, strike - spot)
sqrt_t = math.sqrt(t_years)
d1 = (math.log(spot / strike) + 0.5 * sigma * sigma * t_years) / (sigma * sqrt_t)
d2 = d1 - sigma * sqrt_t
return strike * normal_cdf(-d2) - spot * normal_cdf(-d1)
def bs_put_delta(*, spot: float, strike: float, t_years: float, sigma: float) -> float:
"""Put delta under r=0, q=0 Black-Scholes (negative number for put).
Returns 0 for expired options.
"""
if t_years <= 0 or sigma <= 0 or spot <= 0 or strike <= 0:
return 0.0
sqrt_t = math.sqrt(t_years)
d1 = (math.log(spot / strike) + 0.5 * sigma * sigma * t_years) / (sigma * sqrt_t)
return normal_cdf(d1) - 1.0 # = -N(-d1)
def find_strike_for_delta(
*,
spot: float,
dvol_pct: float,
dte_days: int,
target_delta_abs: float,
) -> float:
"""Solve for the put strike whose |delta| matches ``target_delta_abs``.
Bisection on a monotone-decreasing |delta(strike)| relationship.
Returns the strike in absolute USD terms.
"""
sigma = max(0.01, dvol_pct / 100.0)
t_years = max(1e-6, dte_days / 365.0)
# Bracket: from 50% of spot (deep OTM, small |delta|) up to spot
# (ATM, |delta| ≈ 0.5).
low = max(1.0, spot * 0.30)
high = spot
for _ in range(64):
mid = 0.5 * (low + high)
delta_abs = abs(bs_put_delta(spot=spot, strike=mid, t_years=t_years, sigma=sigma))
if delta_abs > target_delta_abs:
high = mid
else:
low = mid
if abs(high - low) < 1e-3:
break
return 0.5 * (low + high)
def estimate_credit_eth(
*,
spot: float,
dvol_pct: float,
dte_days: int,
width_pct: float,
delta_target_abs: float,
skew_premium: float = 1.5,
) -> tuple[float, float, float]:
"""Estimate credit (ETH), short_strike, long_strike for a bull-put-style
credit spread under Black-Scholes.
``skew_premium`` è il moltiplicatore applicato al credito BS per
approssimare la **vol smile** dell'ETH options market (le put OTM
trattano a IV più alta della IV ATM, quindi un BS pulito sottostima
sistematicamente il premio del venditore di vol). Il default 1.5
è una stima conservativa dei dati Deribit storici (smile slope
tipica 5-10 vol points per 100δ); valori sensati: 1.3 (smile
blanda) … 1.8 (regime "stress IV"). Va calibrato sui dati reali
quando avremo abbastanza chain history da farlo.
Returns ``(credit_eth, short_strike, long_strike)``. Credit è
già moltiplicato per ``skew_premium``.
"""
short_strike = find_strike_for_delta(
spot=spot, dvol_pct=dvol_pct, dte_days=dte_days,
target_delta_abs=delta_target_abs,
)
width_usd = width_pct * spot
long_strike = max(1.0, short_strike - width_usd)
sigma = max(0.01, dvol_pct / 100.0)
t_years = max(1e-6, dte_days / 365.0)
short_mid_usd = bs_put_price(
spot=spot, strike=short_strike, t_years=t_years, sigma=sigma,
)
long_mid_usd = bs_put_price(
spot=spot, strike=long_strike, t_years=t_years, sigma=sigma,
)
short_mid_eth = short_mid_usd / spot
long_mid_eth = long_mid_usd / spot
credit_eth = (short_mid_eth - long_mid_eth) * skew_premium
return credit_eth, short_strike, long_strike
# ---------------------------------------------------------------------------
# Entry filter simulation — rigorous (uses validate_entry exactly)
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class MondayPick:
"""Indice di un tick "Monday 14:00 UTC" nella time-series."""
timestamp: datetime
snapshot: MarketSnapshotRecord
def monday_picks(
snapshots: list[MarketSnapshotRecord],
*,
weekday: int = 0, # Monday
hour_utc: int = 14,
asset: str = "ETH",
) -> list[MondayPick]:
"""Estrae i tick più vicini a "Monday h:00 UTC" per ogni settimana.
``snapshots`` deve essere ordinato per timestamp ascending. Per ogni
occorrenza di ``weekday + hour_utc`` (es. lun 14:00) presa l'unica
riga ETH che la copre. Settimane senza tick a quell'ora vengono
saltate.
"""
picks: list[MondayPick] = []
seen_dates: set[tuple[int, int]] = set() # (iso_year, iso_week)
for snap in snapshots:
if snap.asset.upper() != asset.upper():
continue
ts = snap.timestamp.astimezone(UTC)
if ts.weekday() != weekday or ts.hour != hour_utc:
continue
iso_y, iso_w, _ = ts.isocalendar()
key = (iso_y, iso_w)
if key in seen_dates:
continue
seen_dates.add(key)
picks.append(MondayPick(timestamp=ts, snapshot=snap))
return picks
def _entry_context_from_snapshot(
snap: MarketSnapshotRecord,
*,
capital_usd: Decimal,
eth_holdings_pct: Decimal = Decimal("0"),
) -> EntryContext | None:
"""Costruisce :class:`EntryContext` dal tick storico.
``None`` quando la riga non ha i campi minimi (spot, dvol, funding).
Nel filtro questo si traduce in "skip della settimana" — è la
stessa logica del live: un tick incompleto è meglio di un'entry
al buio.
"""
if snap.dvol is None or snap.funding_perp_annualized is None:
return None
return EntryContext(
capital_usd=capital_usd,
dvol_now=snap.dvol,
funding_perp_annualized=snap.funding_perp_annualized,
eth_holdings_pct_of_portfolio=eth_holdings_pct,
next_macro_event_in_days=snap.macro_days_to_event,
has_open_position=False,
dealer_net_gamma=snap.dealer_net_gamma,
liquidation_squeeze_risk_high=(
snap.liquidation_long_risk == "high"
or snap.liquidation_short_risk == "high"
),
)
@dataclass(frozen=True)
class EntryFilterResult:
"""Esito del check filtri per una singola Monday pick."""
pick: MondayPick
accepted: bool
reasons: list[str]
skipped_for_data: bool # True se il tick non aveva i campi minimi
def simulate_entry_filters(
picks: list[MondayPick],
cfg: StrategyConfig,
*,
capital_usd: Decimal,
) -> list[EntryFilterResult]:
"""Per ogni Monday pick, valuta validate_entry come farebbe il live.
Rigoroso: usa esattamente :func:`validate_entry` e :class:`EntryContext`.
Restituisce la lista degli esiti, una entry per pick.
"""
results: list[EntryFilterResult] = []
for pick in picks:
ctx = _entry_context_from_snapshot(pick.snapshot, capital_usd=capital_usd)
if ctx is None:
results.append(
EntryFilterResult(
pick=pick,
accepted=False,
reasons=["incomplete_snapshot"],
skipped_for_data=True,
)
)
continue
decision = validate_entry(ctx, cfg)
results.append(
EntryFilterResult(
pick=pick,
accepted=decision.accepted,
reasons=list(decision.reasons),
skipped_for_data=False,
)
)
return results
# ---------------------------------------------------------------------------
# Position outcome simulation — stylized (Black-Scholes re-pricing)
# ---------------------------------------------------------------------------
class BacktestEntry(BaseModel):
"""Trade aperto nel backtest (snapshot al momento dell'entry)."""
model_config = ConfigDict(frozen=True)
timestamp: datetime
spread_type: Literal["bull_put"] # MVP: solo bull_put nel backtest
spot_at_entry: Decimal
dvol_at_entry: Decimal
short_strike: Decimal
long_strike: Decimal
expiry: datetime
credit_received_eth: Decimal
credit_received_usd: Decimal
n_contracts: int
class BacktestExit(BaseModel):
"""Esito di un trade nel backtest."""
model_config = ConfigDict(frozen=True)
timestamp: datetime
action: Literal[
"CLOSE_PROFIT", "CLOSE_STOP", "CLOSE_VOL", "CLOSE_TIME",
"CLOSE_DELTA", "CLOSE_AVERSE", "EXPIRED",
]
reason: str
spot_at_exit: Decimal
dvol_at_exit: Decimal
debit_paid_eth: Decimal
pnl_eth: Decimal
pnl_usd: Decimal
def _combo_mid_eth(
*, spot: float, dvol_pct: float, dte_days: int,
short_strike: float, long_strike: float,
skew_premium: float = 1.5,
) -> float:
"""Re-prezza il combo bull-put usando BS sul nuovo spot/dvol/dte."""
sigma = max(0.01, dvol_pct / 100.0)
t_years = max(1e-6, dte_days / 365.0)
short_mid_usd = bs_put_price(
spot=spot, strike=short_strike, t_years=t_years, sigma=sigma,
)
long_mid_usd = bs_put_price(
spot=spot, strike=long_strike, t_years=t_years, sigma=sigma,
)
return (short_mid_usd - long_mid_usd) / spot * skew_premium
def simulate_position_outcome(
entry: BacktestEntry,
future_snapshots: list[MarketSnapshotRecord],
cfg: StrategyConfig,
) -> BacktestExit:
"""Re-prezza il combo a ogni tick futuro fino al primo exit trigger.
Triggers in ordine §7:
1. profit_take (debit ≤ 0.5×credit)
2. stop_loss (debit ≥ 2.5×credit)
3. vol_stop (DVOL salita di ≥10 pt rispetto entry)
4. time_stop (DTE ≤ 7 e debit > 0.7×credit)
5. expiry (uscita per scadenza, P/L = credit intrinsic)
"""
ec = cfg.exit
credit = float(entry.credit_received_eth)
short = float(entry.short_strike)
long_ = float(entry.long_strike)
profit_thresh = float(ec.profit_take_pct_of_credit) * credit
stop_thresh = float(ec.stop_loss_mark_x_credit) * credit
skip_time_thresh = float(ec.time_stop_skip_if_close_to_profit_pct) * credit
for snap in future_snapshots:
if snap.timestamp <= entry.timestamp:
continue
if snap.timestamp >= entry.expiry:
break
if snap.dvol is None or snap.spot is None:
continue
spot_now = float(snap.spot)
dvol_now = float(snap.dvol)
dte = max(0, (entry.expiry - snap.timestamp).days)
debit = _combo_mid_eth(
spot=spot_now, dvol_pct=dvol_now, dte_days=dte,
short_strike=short, long_strike=long_,
)
if debit <= profit_thresh:
return _exit(
snap, entry, debit,
action="CLOSE_PROFIT",
reason=f"debit {debit:.4f}{profit_thresh:.4f}",
)
if debit >= stop_thresh:
return _exit(
snap, entry, debit,
action="CLOSE_STOP",
reason=f"debit {debit:.4f}{stop_thresh:.4f}",
)
if dvol_now >= float(entry.dvol_at_entry) + float(ec.vol_stop_dvol_increase):
return _exit(
snap, entry, debit,
action="CLOSE_VOL",
reason=f"DVOL {dvol_now:.1f} ≥ entry+{ec.vol_stop_dvol_increase}",
)
if dte <= ec.time_stop_dte_remaining and debit > skip_time_thresh:
return _exit(
snap, entry, debit,
action="CLOSE_TIME",
reason=f"DTE {dte}{ec.time_stop_dte_remaining}",
)
# Tick passati senza trigger: scadenza naturale.
last = future_snapshots[-1] if future_snapshots else None
intrinsic = max(0.0, short - float(last.spot if last and last.spot else 0))
intrinsic_capped = min(intrinsic, short - long_)
debit_at_expiry_eth = (
intrinsic_capped / float(last.spot)
if last is not None and last.spot is not None and float(last.spot) > 0
else 0.0
)
return _exit(
last or _synthetic_expiry_snapshot(entry),
entry,
debit_at_expiry_eth,
action="EXPIRED",
reason="held to expiry",
)
def _synthetic_expiry_snapshot(entry: BacktestEntry) -> MarketSnapshotRecord:
return MarketSnapshotRecord(
timestamp=entry.expiry,
asset="ETH",
spot=entry.spot_at_entry,
dvol=entry.dvol_at_entry,
fetch_ok=False,
)
def _exit(
snap: MarketSnapshotRecord,
entry: BacktestEntry,
debit_eth: float,
*,
action: str,
reason: str,
) -> BacktestExit:
pnl_eth = float(entry.credit_received_eth) - debit_eth
spot = float(snap.spot) if snap.spot is not None else float(entry.spot_at_entry)
dvol = float(snap.dvol) if snap.dvol is not None else float(entry.dvol_at_entry)
return BacktestExit(
timestamp=snap.timestamp,
action=action, # type: ignore[arg-type]
reason=reason,
spot_at_exit=Decimal(str(spot)),
dvol_at_exit=Decimal(str(dvol)),
debit_paid_eth=Decimal(str(debit_eth)),
pnl_eth=Decimal(str(pnl_eth)),
pnl_usd=Decimal(str(pnl_eth * spot * entry.n_contracts)),
)
# ---------------------------------------------------------------------------
# Full pipeline
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class CompletedTrade:
entry: BacktestEntry
exit: BacktestExit
class BacktestReport(BaseModel):
"""Aggregato del backtest. Tutti i numeri sono **stime**."""
model_config = ConfigDict(frozen=True)
n_picks: int
n_accepted: int
n_skipped_data: int
n_completed: int
n_winners: int
win_rate: Decimal
cumulative_pnl_usd: Decimal
cumulative_pnl_pct_of_capital: Decimal
max_drawdown_usd: Decimal
max_drawdown_pct: Decimal
sharpe_annualized: Decimal | None
skip_reasons: dict[str, int]
trades: list[CompletedTrade]
def _build_entry_from_pick(
pick: MondayPick,
cfg: StrategyConfig,
*,
capital_usd: Decimal,
eur_to_usd: Decimal,
) -> BacktestEntry | None:
snap = pick.snapshot
if snap.spot is None or snap.dvol is None:
return None
spot = float(snap.spot)
dvol = float(snap.dvol)
width_pct = float(cfg.structure.spread_width.target_pct_of_spot)
delta_target = float(cfg.structure.short_strike.delta_target)
dte = cfg.structure.dte_target
credit_eth, short_strike, long_strike = estimate_credit_eth(
spot=spot, dvol_pct=dvol, dte_days=dte,
width_pct=width_pct, delta_target_abs=delta_target,
)
width_usd = float(cfg.structure.spread_width.target_pct_of_spot) * spot
credit_usd = credit_eth * spot
if width_usd <= 0 or credit_usd / width_usd < float(
cfg.structure.credit_to_width_ratio_min
):
return None # ratio gate fallisce → no entry
cap_pertrade_usd = float(cfg.sizing.cap_per_trade_eur) * float(eur_to_usd)
risk_target = min(float(cfg.sizing.kelly_fraction) * float(capital_usd), cap_pertrade_usd)
n_contracts = max(0, min(int(risk_target // width_usd), cfg.sizing.max_contracts_per_trade))
if n_contracts == 0:
return None
expiry = pick.timestamp + timedelta(days=dte)
return BacktestEntry(
timestamp=pick.timestamp,
spread_type="bull_put",
spot_at_entry=Decimal(str(spot)),
dvol_at_entry=Decimal(str(dvol)),
short_strike=Decimal(str(round(short_strike, 2))),
long_strike=Decimal(str(round(long_strike, 2))),
expiry=expiry,
credit_received_eth=Decimal(str(credit_eth)),
credit_received_usd=Decimal(str(credit_usd * n_contracts)),
n_contracts=n_contracts,
)
def _max_drawdown_usd(equity: list[Decimal]) -> tuple[Decimal, Decimal]:
"""Return ``(max_dd_usd, max_dd_pct_of_peak)`` over an equity curve."""
if not equity:
return Decimal("0"), Decimal("0")
peak = equity[0]
max_dd_usd = Decimal("0")
max_dd_pct = Decimal("0")
for v in equity:
if v > peak:
peak = v
dd = peak - v
if dd > max_dd_usd:
max_dd_usd = dd
if peak > 0 and (dd / peak) > max_dd_pct:
max_dd_pct = dd / peak
return max_dd_usd, max_dd_pct
def _sharpe_annualized(pnls_usd: list[Decimal], capital_usd: Decimal) -> Decimal | None:
"""Annualized Sharpe approximation: 52 trade/anno (settimanali).
Restituisce ``None`` se ci sono <5 trade o stdev = 0.
"""
if len(pnls_usd) < 5 or capital_usd <= 0:
return None
rets = [float(p / capital_usd) for p in pnls_usd]
mean = sum(rets) / len(rets)
var = sum((r - mean) ** 2 for r in rets) / max(1, (len(rets) - 1))
std = math.sqrt(var)
if std == 0:
return None
sharpe = mean / std * math.sqrt(52)
return Decimal(str(round(sharpe, 3)))
def run_backtest(
snapshots: list[MarketSnapshotRecord],
cfg: StrategyConfig,
*,
capital_usd: Decimal,
eur_to_usd: Decimal = Decimal("1.075"),
asset: str = "ETH",
) -> BacktestReport:
"""Esegue il backtest end-to-end sui ``snapshots`` ETH ordinati per ts."""
snapshots = sorted(snapshots, key=lambda s: s.timestamp)
eth_snapshots = [s for s in snapshots if s.asset.upper() == asset.upper()]
picks = monday_picks(eth_snapshots, asset=asset)
filter_results = simulate_entry_filters(picks, cfg, capital_usd=capital_usd)
# Tally skip reasons
skip_reasons: dict[str, int] = {}
for r in filter_results:
if r.accepted:
continue
for reason in r.reasons:
skip_reasons[reason] = skip_reasons.get(reason, 0) + 1
trades: list[CompletedTrade] = []
for r in filter_results:
if not r.accepted:
continue
entry = _build_entry_from_pick(
r.pick, cfg, capital_usd=capital_usd, eur_to_usd=eur_to_usd,
)
if entry is None:
skip_reasons["sizing_or_ratio"] = skip_reasons.get("sizing_or_ratio", 0) + 1
continue
future = [s for s in eth_snapshots if s.timestamp > r.pick.timestamp]
exit_ = simulate_position_outcome(entry, future, cfg)
trades.append(CompletedTrade(entry=entry, exit=exit_))
pnls = [t.exit.pnl_usd for t in trades]
cumulative = sum(pnls, start=Decimal("0"))
n_winners = sum(1 for p in pnls if p > 0)
win_rate = (
Decimal(n_winners) / Decimal(len(pnls))
if pnls
else Decimal("0")
)
# Equity curve in USD assoluti
equity = [capital_usd]
for p in pnls:
equity.append(equity[-1] + p)
max_dd_usd, max_dd_pct = _max_drawdown_usd(equity)
return BacktestReport(
n_picks=len(picks),
n_accepted=sum(1 for r in filter_results if r.accepted),
n_skipped_data=sum(1 for r in filter_results if r.skipped_for_data),
n_completed=len(trades),
n_winners=n_winners,
win_rate=win_rate,
cumulative_pnl_usd=cumulative,
cumulative_pnl_pct_of_capital=(
cumulative / capital_usd if capital_usd > 0 else Decimal("0")
),
max_drawdown_usd=max_dd_usd,
max_drawdown_pct=max_dd_pct,
sharpe_annualized=_sharpe_annualized(pnls, capital_usd),
skip_reasons=skip_reasons,
trades=trades,
)
@@ -0,0 +1,185 @@
"""Periodic option-chain snapshot collector (§13).
Fetches the Deribit option chain for every strike entro la finestra
DTE configurata, prima del trigger entry settimanale (cron
``55 13 * * MON`` di default). Persiste un quote per ogni strumento
in ``option_chain_snapshots`` con un timestamp condiviso, che diventa
il dato di base per:
* il backtest non-stilizzato (vedi ``core/backtest.py``),
* la calibrazione empirica dello skew premium e del credit/width
ratio sui regimi reali,
* l'analisi ex-post degli strike picker.
Il collector è **best-effort**: se ``get_tickers`` fallisce per un
batch, gli altri batch passano comunque; se manca completamente la
chain, il job ritorna 0 senza alzare eccezioni e logga il problema.
Non chiama l'order book per ogni strike (sarebbe troppo costoso) —
``book_depth_top3`` resta NULL nel quote, il liquidity gate del live
lo legge al volo solo per gli strike che gli interessano.
"""
from __future__ import annotations
import asyncio
import logging
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import TYPE_CHECKING, Any
from cerbero_bite.state import connect, transaction
from cerbero_bite.state.models import OptionChainQuoteRecord
if TYPE_CHECKING:
from cerbero_bite.runtime.dependencies import RuntimeContext
__all__ = ["DEFAULT_BATCH_SIZE", "collect_option_chain_snapshot"]
_log = logging.getLogger("cerbero_bite.runtime.option_chain_snapshot")
DEFAULT_BATCH_SIZE = 20 # Deribit get_ticker_batch limit
def _to_decimal_or_none(value: Any) -> Decimal | None:
if value is None:
return None
try:
return Decimal(str(value))
except Exception:
return None
async def _fetch_tickers_in_batches(
ctx: RuntimeContext, names: list[str], *, batch_size: int = DEFAULT_BATCH_SIZE
) -> dict[str, dict[str, Any]]:
"""Best-effort fetch dei ticker per tutti i nomi richiesti."""
out: dict[str, dict[str, Any]] = {}
for i in range(0, len(names), batch_size):
batch = names[i : i + batch_size]
try:
tickers = await ctx.deribit.get_tickers(batch)
except Exception as exc:
_log.warning(
"get_tickers failed for batch starting %s: %s",
batch[0] if batch else "<empty>", exc,
)
continue
for t in tickers:
name = t.get("instrument_name") or t.get("instrument")
if isinstance(name, str):
out[name] = t
return out
async def collect_option_chain_snapshot(
ctx: RuntimeContext,
*,
asset: str = "ETH",
now: datetime | None = None,
batch_size: int = DEFAULT_BATCH_SIZE,
) -> int:
"""Collect + persist a single chain snapshot for ``asset``. Returns
the number of quotes persisted (0 on best-effort failure).
Filtra le scadenze nella finestra ``[dte_min, dte_max]`` di
``cfg.structure`` per non sprecare richieste su scadenze che il
rule engine non userebbe mai.
"""
when = (now or datetime.now(UTC)).astimezone(UTC)
cfg = ctx.cfg
expiry_from = when + timedelta(days=cfg.structure.dte_min)
expiry_to = when + timedelta(days=cfg.structure.dte_max)
try:
chain = await ctx.deribit.options_chain(
currency=asset.upper(),
expiry_from=expiry_from,
expiry_to=expiry_to,
min_open_interest=int(cfg.liquidity.open_interest_min),
)
except Exception:
_log.exception("option chain fetch failed")
return 0
if not chain:
_log.info("option chain empty for %s in window", asset)
return 0
names = [meta.name for meta in chain]
tickers = await _fetch_tickers_in_batches(ctx, names, batch_size=batch_size)
quotes: list[OptionChainQuoteRecord] = []
for meta in chain:
ticker = tickers.get(meta.name)
if ticker is None:
# Lasciamo comunque la riga senza quote: utile sapere
# che lo strumento esisteva.
quotes.append(
OptionChainQuoteRecord(
timestamp=when,
asset=asset.upper(),
instrument_name=meta.name,
strike=meta.strike,
expiry=meta.expiry,
option_type=meta.option_type,
open_interest=int(meta.open_interest)
if meta.open_interest is not None
else None,
)
)
continue
greeks = ticker.get("greeks") or {}
quotes.append(
OptionChainQuoteRecord(
timestamp=when,
asset=asset.upper(),
instrument_name=meta.name,
strike=meta.strike,
expiry=meta.expiry,
option_type=meta.option_type,
bid=_to_decimal_or_none(ticker.get("bid")),
ask=_to_decimal_or_none(ticker.get("ask")),
mid=_to_decimal_or_none(ticker.get("mark_price")),
iv=_to_decimal_or_none(ticker.get("mark_iv")),
delta=_to_decimal_or_none(greeks.get("delta")),
gamma=_to_decimal_or_none(greeks.get("gamma")),
theta=_to_decimal_or_none(greeks.get("theta")),
vega=_to_decimal_or_none(greeks.get("vega")),
open_interest=int(meta.open_interest)
if meta.open_interest is not None
else None,
volume_24h=(
int(ticker["volume_24h"])
if ticker.get("volume_24h") is not None
else None
),
# book_depth_top3: NULL — non lo prendiamo per ogni
# strike per non saturare l'API. Il liquidity gate
# del live lo chiede on-the-fly per gli strike
# candidati al picker.
)
)
persisted = 0
try:
conn = connect(ctx.db_path)
try:
with transaction(conn):
persisted = ctx.repository.record_option_chain_snapshot(
conn, quotes
)
finally:
conn.close()
except Exception:
_log.exception("persist option chain snapshot failed")
return 0
_log.info("option_chain_snapshot persisted %d quote(s)", persisted)
return persisted
# Avoid unused import warning for asyncio in lint when only used as type
_ = asyncio
+21
View File
@@ -34,6 +34,9 @@ from cerbero_bite.runtime.market_snapshot_cycle import (
DEFAULT_ASSETS, DEFAULT_ASSETS,
collect_market_snapshot, collect_market_snapshot,
) )
from cerbero_bite.runtime.option_chain_snapshot_cycle import (
collect_option_chain_snapshot,
)
from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle
from cerbero_bite.runtime.recovery import recover_state from cerbero_bite.runtime.recovery import recover_state
from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler
@@ -53,6 +56,7 @@ _CRON_HEALTH = "*/5 * * * *"
_CRON_BACKUP = "0 * * * *" _CRON_BACKUP = "0 * * * *"
_CRON_MANUAL_ACTIONS = "*/1 * * * *" _CRON_MANUAL_ACTIONS = "*/1 * * * *"
_CRON_MARKET_SNAPSHOT = "*/15 * * * *" _CRON_MARKET_SNAPSHOT = "*/15 * * * *"
_CRON_OPTION_CHAIN_SNAPSHOT = "55 13 * * MON" # 5 min prima del trigger entry
_BACKUP_RETENTION_DAYS = 30 _BACKUP_RETENTION_DAYS = 30
@@ -217,6 +221,8 @@ class Orchestrator:
manual_actions_cron: str = _CRON_MANUAL_ACTIONS, manual_actions_cron: str = _CRON_MANUAL_ACTIONS,
market_snapshot_cron: str = _CRON_MARKET_SNAPSHOT, market_snapshot_cron: str = _CRON_MARKET_SNAPSHOT,
market_snapshot_assets: tuple[str, ...] = DEFAULT_ASSETS, market_snapshot_assets: tuple[str, ...] = DEFAULT_ASSETS,
option_chain_cron: str = _CRON_OPTION_CHAIN_SNAPSHOT,
option_chain_asset: str = "ETH",
backup_dir: Path | None = None, backup_dir: Path | None = None,
backup_retention_days: int = _BACKUP_RETENTION_DAYS, backup_retention_days: int = _BACKUP_RETENTION_DAYS,
) -> AsyncIOScheduler: ) -> AsyncIOScheduler:
@@ -282,6 +288,14 @@ class Orchestrator:
await _safe("market_snapshot", _do) await _safe("market_snapshot", _do)
async def _option_chain_snapshot() -> None:
async def _do() -> None:
await collect_option_chain_snapshot(
self._ctx, asset=option_chain_asset
)
await _safe("option_chain_snapshot", _do)
jobs: list[JobSpec] = [ jobs: list[JobSpec] = [
JobSpec(name="health", cron=health_cron, coro_factory=_health), JobSpec(name="health", cron=health_cron, coro_factory=_health),
JobSpec(name="backup", cron=backup_cron, coro_factory=_backup), JobSpec(name="backup", cron=backup_cron, coro_factory=_backup),
@@ -309,6 +323,13 @@ class Orchestrator:
coro_factory=_market_snapshot, coro_factory=_market_snapshot,
) )
) )
jobs.append(
JobSpec(
name="option_chain_snapshot",
cron=option_chain_cron,
coro_factory=_option_chain_snapshot,
)
)
else: else:
_log.warning( _log.warning(
"data analysis disabled (CERBERO_BITE_ENABLE_DATA_ANALYSIS=" "data analysis disabled (CERBERO_BITE_ENABLE_DATA_ANALYSIS="
@@ -0,0 +1,42 @@
-- 0004_option_chain_snapshots.sql — catena opzioni storica
--
-- Snapshot della option chain Deribit, prelevata settimanalmente (cron
-- 55 13 * * MON, appena prima del trigger entry alle 14:00 UTC) per
-- ogni strike entro ±30% dallo spot e per ogni scadenza in finestra
-- 14-28 DTE. Dato di base per il backtest non-stilizzato e per
-- calibrare empiricamente lo skew premium del modello BS.
--
-- Granularità: una riga per (snapshot_ts, instrument). Lo
-- snapshot_ts è il timestamp del cron tick — TUTTI i quote raccolti
-- in quello stesso tick condividono il timestamp, così filtrare per
-- "lo snapshot del 2026-05-04 alle 13:55" è una semplice
-- WHERE timestamp = X.
CREATE TABLE option_chain_snapshots (
timestamp TEXT NOT NULL,
asset TEXT NOT NULL,
instrument_name TEXT NOT NULL,
strike TEXT NOT NULL,
expiry TEXT NOT NULL,
option_type TEXT NOT NULL CHECK (option_type IN ('C','P')),
bid TEXT,
ask TEXT,
mid TEXT,
iv TEXT,
delta TEXT,
gamma TEXT,
theta TEXT,
vega TEXT,
open_interest INTEGER,
volume_24h INTEGER,
book_depth_top3 INTEGER,
PRIMARY KEY (timestamp, instrument_name)
) WITHOUT ROWID;
CREATE INDEX idx_option_chain_asset_ts
ON option_chain_snapshots(asset, timestamp DESC);
CREATE INDEX idx_option_chain_expiry
ON option_chain_snapshots(asset, expiry);
PRAGMA user_version = 5;
+31
View File
@@ -22,6 +22,7 @@ __all__ = [
"InstructionRecord", "InstructionRecord",
"ManualAction", "ManualAction",
"MarketSnapshotRecord", "MarketSnapshotRecord",
"OptionChainQuoteRecord",
"PositionRecord", "PositionRecord",
"PositionStatus", "PositionStatus",
"SystemStateRecord", "SystemStateRecord",
@@ -148,6 +149,36 @@ class MarketSnapshotRecord(BaseModel):
fetch_errors_json: str | None = None fetch_errors_json: str | None = None
class OptionChainQuoteRecord(BaseModel):
"""Row of the ``option_chain_snapshots`` table.
One row per (snapshot_ts, instrument) the same ``timestamp`` is
shared by every quote prelevato nello stesso tick del cron. Tutti
i campi numerici sono opzionali perché il collector è
best-effort: un ticker mancante non invalida il resto della chain.
"""
model_config = ConfigDict(extra="forbid")
timestamp: datetime
asset: str
instrument_name: str
strike: Decimal
expiry: datetime
option_type: Literal["C", "P"]
bid: Decimal | None = None
ask: Decimal | None = None
mid: Decimal | None = None
iv: Decimal | None = None
delta: Decimal | None = None
gamma: Decimal | None = None
theta: Decimal | None = None
vega: Decimal | None = None
open_interest: int | None = None
volume_24h: int | None = None
book_depth_top3: int | None = None
class ManualAction(BaseModel): class ManualAction(BaseModel):
"""Row of the ``manual_actions`` table.""" """Row of the ``manual_actions`` table."""
+130
View File
@@ -24,6 +24,7 @@ from cerbero_bite.state.models import (
InstructionRecord, InstructionRecord,
ManualAction, ManualAction,
MarketSnapshotRecord, MarketSnapshotRecord,
OptionChainQuoteRecord,
PositionRecord, PositionRecord,
PositionStatus, PositionStatus,
SystemStateRecord, SystemStateRecord,
@@ -407,6 +408,103 @@ class Repository:
).fetchall() ).fetchall()
return [_row_to_market_snapshot(r) for r in rows] return [_row_to_market_snapshot(r) for r in rows]
# ------------------------------------------------------------------
# option_chain_snapshots
# ------------------------------------------------------------------
def record_option_chain_snapshot(
self,
conn: sqlite3.Connection,
quotes: list[OptionChainQuoteRecord],
) -> int:
"""Bulk-insert dei quote di un singolo tick. Tutti i quote
condividono lo stesso ``timestamp``. Idempotente per
(timestamp, instrument_name)."""
if not quotes:
return 0
rows = [
(
_enc_dt(q.timestamp),
q.asset,
q.instrument_name,
_enc_dec(q.strike),
_enc_dt(q.expiry),
q.option_type,
_enc_dec(q.bid),
_enc_dec(q.ask),
_enc_dec(q.mid),
_enc_dec(q.iv),
_enc_dec(q.delta),
_enc_dec(q.gamma),
_enc_dec(q.theta),
_enc_dec(q.vega),
q.open_interest,
q.volume_24h,
q.book_depth_top3,
)
for q in quotes
]
conn.executemany(
"INSERT OR REPLACE INTO option_chain_snapshots("
"timestamp, asset, instrument_name, strike, expiry, option_type, "
"bid, ask, mid, iv, delta, gamma, theta, vega, "
"open_interest, volume_24h, book_depth_top3) "
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
rows,
)
return len(rows)
def list_option_chain_snapshots(
self,
conn: sqlite3.Connection,
*,
asset: str,
start: datetime | None = None,
end: datetime | None = None,
expiry_from: datetime | None = None,
expiry_to: datetime | None = None,
limit: int = 50000,
) -> list[OptionChainQuoteRecord]:
clauses: list[str] = ["asset = ?"]
params: list[Any] = [asset]
if start is not None:
clauses.append("timestamp >= ?")
params.append(_enc_dt(start))
if end is not None:
clauses.append("timestamp <= ?")
params.append(_enc_dt(end))
if expiry_from is not None:
clauses.append("expiry >= ?")
params.append(_enc_dt(expiry_from))
if expiry_to is not None:
clauses.append("expiry <= ?")
params.append(_enc_dt(expiry_to))
params.append(int(limit))
rows = conn.execute(
f"SELECT * FROM option_chain_snapshots "
f"WHERE {' AND '.join(clauses)} "
f"ORDER BY timestamp DESC, instrument_name ASC LIMIT ?",
params,
).fetchall()
return [_row_to_option_chain_quote(r) for r in rows]
def latest_option_chain_timestamp(
self,
conn: sqlite3.Connection,
*,
asset: str,
) -> datetime | None:
"""Timestamp dell'ultimo snapshot raccolto per ``asset``,
utile per stimare la freschezza del dato dalla GUI."""
row = conn.execute(
"SELECT timestamp FROM option_chain_snapshots "
"WHERE asset = ? ORDER BY timestamp DESC LIMIT 1",
(asset,),
).fetchone()
if row is None:
return None
return _dec_dt(row["timestamp"])
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# manual_actions # manual_actions
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@@ -645,6 +743,38 @@ def _row_to_market_snapshot(row: sqlite3.Row) -> MarketSnapshotRecord:
) )
def _row_to_option_chain_quote(row: sqlite3.Row) -> OptionChainQuoteRecord:
return OptionChainQuoteRecord(
timestamp=_dec_dt_required(row["timestamp"]),
asset=row["asset"],
instrument_name=row["instrument_name"],
strike=_dec_dec_required(row["strike"]),
expiry=_dec_dt_required(row["expiry"]),
option_type=row["option_type"],
bid=_dec_dec(row["bid"]),
ask=_dec_dec(row["ask"]),
mid=_dec_dec(row["mid"]),
iv=_dec_dec(row["iv"]),
delta=_dec_dec(row["delta"]),
gamma=_dec_dec(row["gamma"]),
theta=_dec_dec(row["theta"]),
vega=_dec_dec(row["vega"]),
open_interest=(
int(row["open_interest"])
if row["open_interest"] is not None
else None
),
volume_24h=(
int(row["volume_24h"]) if row["volume_24h"] is not None else None
),
book_depth_top3=(
int(row["book_depth_top3"])
if row["book_depth_top3"] is not None
else None
),
)
def _dec_dec_required(value: Any) -> Decimal: def _dec_dec_required(value: Any) -> Decimal:
out = _dec_dec(value) out = _dec_dec(value)
if out is None: if out is None:
+1
View File
@@ -129,6 +129,7 @@ def test_install_scheduler_registers_canonical_jobs(tmp_path: Path) -> None:
"backup", "backup",
"manual_actions", "manual_actions",
"market_snapshot", "market_snapshot",
"option_chain_snapshot",
} }
-259
View File
@@ -1,259 +0,0 @@
"""TDD per :mod:`cerbero_bite.core.backtest`."""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
from cerbero_bite.config import StrategyConfig, golden_config
from cerbero_bite.core.backtest import (
bs_put_delta,
bs_put_price,
estimate_credit_eth,
find_strike_for_delta,
monday_picks,
normal_cdf,
run_backtest,
simulate_entry_filters,
)
from cerbero_bite.state.models import MarketSnapshotRecord
# ---------------------------------------------------------------------------
# Black-Scholes helpers
# ---------------------------------------------------------------------------
def test_normal_cdf_known_values() -> None:
assert normal_cdf(0.0) == pytest.approx(0.5, abs=1e-6)
assert normal_cdf(1.0) == pytest.approx(0.8413, abs=1e-3)
assert normal_cdf(-1.0) == pytest.approx(0.1587, abs=1e-3)
assert normal_cdf(2.0) == pytest.approx(0.9772, abs=1e-3)
def test_bs_put_price_atm_positive_and_less_than_strike() -> None:
p = bs_put_price(spot=3000, strike=3000, t_years=18 / 365, sigma=0.50)
assert p > 0
assert p < 3000 # cap
def test_bs_put_price_far_otm_close_to_zero() -> None:
p = bs_put_price(spot=3000, strike=1500, t_years=18 / 365, sigma=0.50)
assert 0 <= p < 5 # essentially zero
def test_bs_put_delta_atm_around_minus_half() -> None:
d = bs_put_delta(spot=3000, strike=3000, t_years=18 / 365, sigma=0.50)
assert d == pytest.approx(-0.475, abs=0.05)
def test_bs_put_delta_far_otm_close_to_zero() -> None:
d = bs_put_delta(spot=3000, strike=1500, t_years=18 / 365, sigma=0.50)
assert -0.05 < d <= 0
def test_find_strike_for_delta_monotone() -> None:
spot = 3000.0
dvol = 50.0
dte = 18
s_010 = find_strike_for_delta(
spot=spot, dvol_pct=dvol, dte_days=dte, target_delta_abs=0.10,
)
s_020 = find_strike_for_delta(
spot=spot, dvol_pct=dvol, dte_days=dte, target_delta_abs=0.20,
)
# |Δ|=0.20 (più ITM) ⇒ strike più alto di |Δ|=0.10 (più OTM).
assert s_020 > s_010
# Verifica che il delta corrisponda a target ± tolleranza.
achieved = abs(
bs_put_delta(
spot=spot, strike=s_020, t_years=dte / 365, sigma=dvol / 100,
)
)
assert achieved == pytest.approx(0.20, abs=0.02)
def test_estimate_credit_returns_positive_credit_in_normal_regime() -> None:
credit_eth, short_k, long_k = estimate_credit_eth(
spot=3000, dvol_pct=50, dte_days=18, width_pct=0.04, delta_target_abs=0.12,
)
# Sanity: credit > 0, short_k < spot, long_k = short_k - 4%×spot
assert credit_eth > 0
assert short_k < 3000
assert long_k < short_k
assert short_k - long_k == pytest.approx(0.04 * 3000, abs=1.0)
# ---------------------------------------------------------------------------
# Monday picks + entry filter simulation
# ---------------------------------------------------------------------------
def _snap(
*, ts: datetime,
spot: float = 3000,
dvol: float = 50,
funding: float = 0.0,
macro_d: int | None = None,
asset: str = "ETH",
) -> MarketSnapshotRecord:
return MarketSnapshotRecord(
timestamp=ts,
asset=asset,
spot=Decimal(str(spot)),
dvol=Decimal(str(dvol)),
funding_perp_annualized=Decimal(str(funding)),
funding_cross_annualized=Decimal("0"),
dealer_net_gamma=Decimal("100"),
liquidation_long_risk="low",
liquidation_short_risk="low",
macro_days_to_event=macro_d,
fetch_ok=True,
)
def test_monday_picks_extracts_one_per_iso_week() -> None:
monday_2026_05_04 = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
monday_2026_05_11 = datetime(2026, 5, 11, 14, 0, tzinfo=UTC)
snapshots = [
_snap(ts=monday_2026_05_04),
_snap(ts=monday_2026_05_04 + timedelta(minutes=15)), # not picked
_snap(ts=monday_2026_05_11),
]
picks = monday_picks(snapshots)
assert len(picks) == 2
assert picks[0].timestamp == monday_2026_05_04
assert picks[1].timestamp == monday_2026_05_11
def test_monday_picks_skips_other_days_and_hours() -> None:
snapshots = [
_snap(ts=datetime(2026, 5, 4, 13, 0, tzinfo=UTC)), # Monday 13:00
_snap(ts=datetime(2026, 5, 5, 14, 0, tzinfo=UTC)), # Tuesday 14:00
]
assert monday_picks(snapshots) == []
def test_monday_picks_filters_by_asset() -> None:
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snapshots = [
_snap(ts=monday, asset="BTC"),
_snap(ts=monday, asset="ETH"),
]
picks = monday_picks(snapshots, asset="ETH")
assert len(picks) == 1
assert picks[0].snapshot.asset == "ETH"
def test_simulate_entry_filters_accepts_clean_snapshot(
) -> None:
cfg: StrategyConfig = golden_config()
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snap = _snap(ts=monday, dvol=50, funding=0.10)
picks = [
type("MP", (), {"timestamp": monday, "snapshot": snap})() # type: ignore[arg-type]
]
# Hack: build via real dataclass
from cerbero_bite.core.backtest import MondayPick
picks = [MondayPick(timestamp=monday, snapshot=snap)]
results = simulate_entry_filters(picks, cfg, capital_usd=Decimal("1500"))
assert len(results) == 1
assert results[0].accepted is True
def test_simulate_entry_filters_rejects_dvol_out_of_band() -> None:
cfg = golden_config()
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snap = _snap(ts=monday, dvol=20, funding=0.10) # below 35
from cerbero_bite.core.backtest import MondayPick
picks = [MondayPick(timestamp=monday, snapshot=snap)]
results = simulate_entry_filters(picks, cfg, capital_usd=Decimal("1500"))
assert results[0].accepted is False
assert any("dvol" in r.lower() for r in results[0].reasons)
def test_simulate_entry_filters_skips_incomplete_snapshot() -> None:
cfg = golden_config()
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
incomplete = MarketSnapshotRecord(
timestamp=monday, asset="ETH", spot=Decimal("3000"),
# dvol=None ⇒ skipped
fetch_ok=False,
)
from cerbero_bite.core.backtest import MondayPick
picks = [MondayPick(timestamp=monday, snapshot=incomplete)]
results = simulate_entry_filters(picks, cfg, capital_usd=Decimal("1500"))
assert results[0].accepted is False
assert results[0].skipped_for_data is True
# ---------------------------------------------------------------------------
# Full pipeline (sintetico)
# ---------------------------------------------------------------------------
def _synthetic_year_of_snapshots(
*,
n_weeks: int = 8,
spot: float = 3000,
dvol: float = 60, # con skew_premium 1.5 ⇒ credit/width ≈ 35% (sopra soglia 30%)
funding: float = 0.10,
) -> list[MarketSnapshotRecord]:
"""Genera N settimane di snapshot sintetici ETH a 4 tick/settimana."""
rows: list[MarketSnapshotRecord] = []
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
for week in range(n_weeks):
base = monday + timedelta(weeks=week)
# Lunedì 14:00 è il pick
rows.append(_snap(ts=base, spot=spot, dvol=dvol, funding=funding))
# Tick intermedi che NON cadono di lunedì alle 14:00:
# offset +1h così vengono ignorati da `monday_picks`.
for d in (2, 8, 14, 19):
rows.append(
_snap(
ts=base + timedelta(days=d, hours=1),
spot=spot * (1 + 0.005 * d), # +0.5% al giorno
dvol=dvol - 1.5 * d, # vol che scende lentamente
funding=funding,
)
)
return rows
def test_run_backtest_produces_report_with_trades() -> None:
# Per il test scaliamo il credit/width gate al 15%: il modello BS
# senza skew completo sottostima i premi OTM rispetto al reale.
# Vedi `estimate_credit_eth.skew_premium` docstring per dettagli.
from cerbero_bite.config.schema import StructureConfig
cfg = golden_config()
cfg = cfg.model_copy(
update={
"structure": StructureConfig(
**{
**cfg.structure.model_dump(),
"credit_to_width_ratio_min": Decimal("0.15"),
}
)
}
)
snapshots = _synthetic_year_of_snapshots(n_weeks=4)
report = run_backtest(snapshots, cfg, capital_usd=Decimal("1500"))
# Sanity: 4 picks, almeno 1 trade chiuso
assert report.n_picks == 4
assert report.n_completed >= 1
assert report.cumulative_pnl_usd != Decimal("0")
# Bull-put + ETH al rialzo + DVOL che scende ⇒ atteso win
assert report.n_winners >= 1
def test_run_backtest_handles_no_picks_gracefully() -> None:
cfg = golden_config()
# Solo tick infrasettimanali, niente Monday 14:00.
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snapshots = [_snap(ts=monday + timedelta(hours=1))]
report = run_backtest(snapshots, cfg, capital_usd=Decimal("1500"))
assert report.n_picks == 0
assert report.n_completed == 0
assert report.cumulative_pnl_usd == Decimal("0")
@@ -0,0 +1,134 @@
"""TDD per :mod:`cerbero_bite.runtime.option_chain_snapshot_cycle`."""
from __future__ import annotations
from datetime import UTC, datetime
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
import pytest
from cerbero_bite.clients.deribit import InstrumentMeta
from cerbero_bite.runtime.option_chain_snapshot_cycle import (
collect_option_chain_snapshot,
)
from cerbero_bite.state.models import OptionChainQuoteRecord
_NOW = datetime(2026, 5, 4, 13, 55, tzinfo=UTC)
def _meta(name: str, strike: int, expiry_dte: int = 18) -> InstrumentMeta:
expiry = _NOW.replace(hour=8, minute=0, second=0)
expiry = expiry.replace(day=expiry.day) + (
# add days
__import__("datetime").timedelta(days=expiry_dte)
)
return InstrumentMeta(
name=name,
strike=Decimal(str(strike)),
expiry=expiry,
option_type="P",
open_interest=Decimal("100"),
tick_size=Decimal("0.0005"),
min_trade_amount=Decimal("1"),
)
def _ticker(name: str, *, mark: float = 0.020, bid: float = 0.018,
ask: float = 0.022, delta: float = -0.12) -> dict:
return {
"instrument_name": name,
"bid": bid,
"ask": ask,
"mark_price": mark,
"mark_iv": 60.0,
"volume_24h": 50,
"greeks": {
"delta": delta,
"gamma": 0.001,
"theta": -0.0005,
"vega": 0.10,
},
}
@pytest.fixture
def cfg() -> object:
from cerbero_bite.config import golden_config
return golden_config()
@pytest.fixture
def fake_ctx(cfg: object) -> MagicMock:
"""Mock minimal RuntimeContext."""
ctx = MagicMock()
ctx.cfg = cfg
ctx.db_path = ":memory:"
return ctx
@pytest.mark.asyncio
async def test_collector_persists_one_quote_per_instrument(
fake_ctx: MagicMock,
) -> None:
metas = [_meta("ETH-21MAY26-2475-P", 2475), _meta("ETH-21MAY26-2400-P", 2400)]
fake_ctx.deribit.options_chain = AsyncMock(return_value=metas)
fake_ctx.deribit.get_tickers = AsyncMock(
return_value=[_ticker(m.name) for m in metas]
)
persisted: list[list[OptionChainQuoteRecord]] = []
def _record(_conn: object, qs: list[OptionChainQuoteRecord]) -> int:
persisted.append(qs)
return len(qs)
fake_ctx.repository.record_option_chain_snapshot = _record
n = await collect_option_chain_snapshot(fake_ctx, asset="ETH", now=_NOW)
assert n == 2
assert len(persisted) == 1
assert {q.instrument_name for q in persisted[0]} == {
"ETH-21MAY26-2475-P", "ETH-21MAY26-2400-P",
}
# Tutti i quote condividono il timestamp del cron tick.
assert all(q.timestamp == _NOW for q in persisted[0])
@pytest.mark.asyncio
async def test_collector_handles_missing_tickers_with_null_fields(
fake_ctx: MagicMock,
) -> None:
metas = [_meta("ETH-21MAY26-2475-P", 2475)]
fake_ctx.deribit.options_chain = AsyncMock(return_value=metas)
fake_ctx.deribit.get_tickers = AsyncMock(return_value=[]) # vuoto
persisted: list[list[OptionChainQuoteRecord]] = []
def _record(_conn: object, qs: list[OptionChainQuoteRecord]) -> int:
persisted.append(qs)
return len(qs)
fake_ctx.repository.record_option_chain_snapshot = _record
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 1
assert persisted[0][0].mid is None # ticker mancante ⇒ campi NULL
assert persisted[0][0].instrument_name == "ETH-21MAY26-2475-P"
@pytest.mark.asyncio
async def test_collector_returns_zero_when_chain_empty(
fake_ctx: MagicMock,
) -> None:
fake_ctx.deribit.options_chain = AsyncMock(return_value=[])
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 0
@pytest.mark.asyncio
async def test_collector_swallows_chain_fetch_failure(
fake_ctx: MagicMock,
) -> None:
fake_ctx.deribit.options_chain = AsyncMock(side_effect=RuntimeError("boom"))
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 0 # best-effort: non rilancia