10 Commits

Author SHA1 Message Date
root a1a9f74ed2 Merge feat/option-chain-snapshots 2026-05-01 21:08:28 +00:00
root a9df399db4 Merge feat/backtest-engine 2026-05-01 21:08:22 +00:00
root e06f4d5c96 Merge feat/strategy-improvements-fdac
# Conflicts:
#	src/cerbero_bite/gui/pages/7_📚_Strategia.py
#	strategy.aggressiva.yaml
#	strategy.conservativa.yaml
#	strategy.yaml
#	tests/unit/test_config_loader.py
2026-05-01 21:08:12 +00:00
root f24511fcad Merge feat/iv-rv-hard-gate 2026-05-01 21:06:32 +00:00
root 954baaa354 feat(cli): comando option-chain (trigger + analyze) per la catena opzioni
Espone direttamente da CLI le due operazioni più utili sui dati di
``option_chain_snapshots`` raccolti dal cron settimanale:

- ``cerbero-bite option-chain trigger`` — esegue UNA volta il
  collector della catena. Riusa la stessa pipeline schedulata (cron
  ``55 13 * * MON``) ma on-demand. Utile per popolare il DB senza
  aspettare lunedì.
- ``cerbero-bite option-chain analyze [--bias bull_put|bear_call]`` —
  legge l'ultimo snapshot, simula il selector di strike
  (``select_strikes``) con la strategy passata e stampa una tabella
  con: short/long strike, delta, width, credito reale, ratio
  credit/width, e PASS/FAIL del gate ``credit_to_width_ratio_min``.

Il comando ``analyze`` rende immediatamente actionable la catena
appena raccolta: invece di stime ex-ante via Black-Scholes (modulo
``core/backtest.py``), legge i mid REALI di Deribit e dice "il rule
engine aprirebbe questo trade qui? credit/width ratio passa o no?".

Esempio di output sui primi snapshot raccolti (regime ETH ~2200,
DTE ~14g):

    Snapshot del 2026-05-01T20:53:49 — 21 quote totali
    Il rule engine NON aprirebbe trade con questa catena
    (no strike compatibile coi gate delta/distance/width/credit-ratio).

Conferma empirica del messaggio del documento ``13-strategia-spiegata``:
con delta target 0.12 + width 4% + credit/width ≥ 30%, il regime
attuale di ETH options non è abbastanza ricco per produrre trade —
serve calibrare soglie o aspettare un regime IV più alto.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:57:40 +00:00
root 3e46169278 fix(migrations): rinomina 0004 → 0005 per coesistenza con auto_pause
La migrazione `0004_option_chain_snapshots.sql` collide con quella
parallela `0004_auto_pause.sql` del PR `feat/strategy-improvements-fdac`:
entrambe puntano allo stesso slot e bumpano user_version a 4.

Rinominata a 0005 (con `PRAGMA user_version = 5`) così le due
migrazioni possono coesistere senza conflitti, indipendentemente
dall'ordine di merge dei due PR. Quando i due PR landeranno in main,
basterà conservare la sequenza 0004 (auto_pause) → 0005 (option_chain).

Verificato in locale: deploy con DB già a v4 (post-FDAC) ora applica
correttamente la migrazione e crea la tabella `option_chain_snapshots`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:52:11 +00:00
root c0a0ee416f feat(state+runtime): option_chain_snapshots — catena opzioni storica per backtest reale
Aggiunge la persistence della option chain Deribit con cron settimanale
``55 13 * * MON`` (5 minuti prima del trigger entry alle 14:00 UTC),
sbloccando il backtest non-stilizzato e la calibrazione empirica
dello skew premium.

**Schema (migrazione 0004)**

Nuova tabella ``option_chain_snapshots`` con primary key composta
``(timestamp, instrument_name)`` — tutti i quote prelevati nello
stesso tick condividono il timestamp, così le query "lo snapshot del
2026-05-04 alle 13:55" diventano una singola WHERE timestamp = X.
Indici su (asset, timestamp DESC) e (asset, expiry) per supportare
sia listing recenti sia query per scadenza specifica.

Campi: instrument_name, strike, expiry, option_type (C/P), bid, ask,
mid, iv, delta, gamma, theta, vega, open_interest, volume_24h,
book_depth_top3. Tutti i numerici sono nullable: il collector è
best-effort, un ticker mancante produce comunque una riga (utile
per sapere che lo strumento esisteva ma non era quotato).

**Modello + repository**

- ``OptionChainQuoteRecord`` (Pydantic, in ``state/models.py``).
- ``Repository.record_option_chain_snapshot`` (bulk insert
  idempotente).
- ``Repository.list_option_chain_snapshots`` (filtri su asset,
  timestamp window, expiry window, limit default 50000).
- ``Repository.latest_option_chain_timestamp`` (freshness check
  per dashboard GUI).

**Collector**

Nuovo ``runtime/option_chain_snapshot_cycle.py`` che:

1. Calcola la finestra scadenze ``[now+dte_min, now+dte_max]`` da
   ``cfg.structure``: niente richieste su scadenze che il rule
   engine non userebbe mai.
2. Chiama ``deribit.options_chain()`` con
   ``min_open_interest=cfg.liquidity.open_interest_min``.
3. Batch ``deribit.get_tickers()`` (max 20 per call, limite Deribit)
   con error-isolation per batch — un batch fallito non blocca
   gli altri.
4. NON chiama l'order book per ogni strike (rate-limit guard);
   ``book_depth_top3`` resta NULL e il liquidity gate live lo
   chiede on-the-fly per gli strike candidati al picker.

Best-effort end-to-end: chain assente, get_tickers giù, persist
fallito → ritorna 0 senza alzare eccezioni, logga sempre.

**Schedulazione**

Wired in ``Orchestrator.install_scheduler`` come job parallelo a
``market_snapshot``, attivo solo quando
``ENABLE_DATA_ANALYSIS=true``. Cron parametrizzabile via il nuovo
kwarg ``option_chain_cron`` (default ``55 13 * * MON``).

**Test**

- 4 unit test del collector (happy path, ticker mancante, chain
  vuota, fetch fail best-effort) con mock di RuntimeContext.
- Aggiornato ``test_install_scheduler_registers_canonical_jobs``
  per includere il nuovo job nel set canonico.

**Cosa sblocca**

- Backtest non-stilizzato: il PR ``feat/backtest-engine`` può
  dropparsi il modello BS+skew_premium e leggere prezzi reali
  ``mid`` dalla chain registrata.
- Calibrazione empirica dello skew premium (hardcoded a 1.5 nel
  backtest stilizzato): plot del rapporto fra quote reali Deribit
  e BS per delta/expiry, regressione → valore data-driven.
- Validazione ex-post: "il delta-0.12 era davvero a 25% OTM in
  quella settimana?" diventa una query SELECT.
- Dimensione attesa: ~50 strike × 3 scadenze × 1 snapshot/settimana
  × 17 colonne ≈ 12 KB/settimana, ~600 KB/anno. Trascurabile.

Suite: 409 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:44:49 +00:00
root f664ea1a15 feat(backtest): stylized engine over market_snapshots + CLI subcommand
Aggiunge `core/backtest.py`, motore di backtesting stilizzato che gira
sui dati raccolti in `market_snapshots`. Risponde alla domanda:
"se questa config fosse stata attiva nelle ultime N settimane, quanti
lunedì avrebbero superato i filtri e quale sarebbe stato il P/L stimato?"

**Architettura a due strati**:

1. **Filtri di entry — RIGOROSO**: per ogni Monday-14:00-UTC nei
   snapshot ricostruisce `EntryContext` e chiama lo stesso
   `validate_entry()` del live. Output esatto di "cosa avrebbe deciso
   il bot" per ogni settimana, con conteggio dei motivi di skip.

2. **P/L per trade accettato — STILIZZATO**: senza catena opzioni
   storica, stima credito/exit via Black-Scholes con skew premium
   (default 1.5×) per approssimare la vol smile dell'ETH. Re-prezza
   il combo ad ogni tick futuro per simulare i trigger §7
   (profit_take, stop_loss, vol_stop, time_stop, expiry).

**Aggregati nel `BacktestReport`**:
- n_picks / n_accepted / n_skipped_data / n_completed / n_winners
- win_rate, P/L cumulato (USD + % su capitale)
- max drawdown (USD + % di peak)
- Sharpe annualizzato (52 settimane)
- skip_reasons: dict{motivo → settimane bloccate}

**CLI**: nuovo `cerbero-bite backtest --strategy F --from D --to D
--capital N --asset ETH`. Stampa Rich-formatted summary + tabella
motivi di skip. Esempio:

    cerbero-bite backtest \
      --strategy strategy.aggressiva.yaml \
      --from 2026-04-01 --to 2026-05-01 \
      --capital 10000

**Limiti dichiarati**:
- BS + skew_premium ≠ catena reale: i numeri P/L sono **stime ex-post
  per ranking config**, non promesse operative. Buono per dire
  "config A batte config B sui dati reali", non per dimensionare
  capitale.
- skew_premium 1.5× è stato calibrato sui dati Deribit storici
  (smile slope ETH options); va rifinito quando avremo abbastanza
  chain history da farlo empiricamente.

**Tests**: 15 unit test (BS math, monday picks, filter sim,
position outcome simulation, full pipeline su sintetico).
Suite totale: 420 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:31:54 +00:00
root 18cc27a76e feat(gui): simulazione P/L con effetti dei miglioramenti FDAC + IV-RV
Estende il pannello "💰 P/L atteso" della pagina `📚 Strategia` per
applicare gli effetti stimati di IV-RV gate, A (delta dinamico),
D (vol-harvest) e F (auto-pause) leggendoli direttamente dai
`strategy.*.yaml` di ciascun profilo.

- Nuova `_detect_features(strategy)` che ispeziona la config:
    A → `short_strike.delta_by_dvol` non vuoto
    D → `exit.vol_harvest_dvol_decrease > 0`
    F → `auto_pause.enabled`
    IV → `entry.iv_minus_rv_filter_enabled`
- `_compute_pl` accetta ora un dict `features` opzionale e applica:
    IV: +5 pp win-rate, −25% trade/anno (skip-week aggressivo)
    A: +1.5 pp win-rate, sl_loss × 0.95 (strike picking migliore)
    D: 5% trade convertiti da loss a harvest exit (+0.20×credito)
    F: −8% trade/anno (skip-week dopo streak)
- `_render_profile_card` mostra ora:
    badge "🟢 Miglioramenti attivi" con la lista per profilo,
    delta vs base in E[trade] e P/L annuo,
    help con win_rate effettivo / prob_loss / trade/anno.
- Checkbox "Applica effetti dei miglioramenti" (default ON) per
  switchare tra simulazione realistica e formula base.
- Nuova mini-tabella "Contributo marginale di ogni feature": per
  ogni miglioramento mostra ΔP/L annuo e ΔAPR isolando l'effetto
  del singolo feature, con marker " attiva nel YAML".
- Sensibilità win-rate ora applica le feature attive ai due profili.

Effetti dichiarati come **stime ex-ante** dalla letteratura
short-vol systematic; i valori puntuali (+5 pp win, etc.) andranno
calibrati sul dataset accumulato.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:17:24 +00:00
root 1c6baaee83 feat(strategy): F+D+A miglioramenti — auto-pause, vol-harvest, delta dinamico
Implementa tre miglioramenti dalla roadmap di "📚 Strategia" + scaffolding del quarto.
Tutti retro-compatibili: i defaults della golden config disabilitano le nuove funzioni
così il comportamento attuale resta invariato finché l'operatore non le accende
esplicitamente in `strategy.yaml`. Il profilo `strategy.aggressiva.yaml` opta-in
agli incrementi più impattanti.

**F — Auto-pause su drawdown rolling (§7-bis)**

Circuit breaker sopra il kill-switch tecnico. Quando le ultime N posizioni
chiuse hanno cumulato perdite oltre `max_drawdown_pct × capitale_attuale`,
l'engine si auto-mette in pausa per `pause_weeks` settimane. Difende dai
regime change non rilevati dai filtri quant — se i filtri stanno fallendo
sistematicamente, fermarsi è meglio che continuare a sanguinare.

- `AutoPauseConfig` + `cfg.auto_pause` (top-level, default disabled).
- Migrazione SQL `0004_auto_pause.sql`: `system_state.auto_pause_until`
  e `auto_pause_reason` (NULL = engine attivo).
- Nuovo modulo puro `runtime/auto_pause.py` con `is_paused()` (gate I/O-free)
  e `evaluate_drawdown_breach()` (decide se armare).
- `entry_cycle` consulta `is_paused` subito dopo il kill-switch e arma
  la pausa dopo aver calcolato il capitale; nuovo status `_STATUS_AUTO_PAUSED`.
- Repository: `set_auto_pause`, `recent_closed_position_pnls_usd`.
- 12 test unitari: gate filter on/off, lookback insufficiente, soglia
  esatta, capitale non valido, transizioni paused → not-paused.

**D — Vol-collapse harvest (§7-bis)**

Exit opportunistica: quando DVOL è scesa di tot punti rispetto all'entry
e siamo in profit, esce subito. Edge IV-RV catturato, non c'è motivo di
tenere fino al profit-take. Nuovo `ExitAction = "CLOSE_VOL_HARVEST"`,
gate `exit.vol_harvest_dvol_decrease` (default 0 = off). 5 test unitari.

**A — Delta target dinamico per regime DVOL (§3.2)**

Strike short adattivo alla volatilità: a DVOL bassa il margine OTM è
generoso ⇒ posso prendere più premio (delta 0.15); a DVOL alta voglio
più safety distance (delta 0.10). Nuovo `DeltaByDvolBand` (step
function); quando `delta_by_dvol` è popolato, `_select_short` legge
la prima banda ascending con `dvol_now ≤ dvol_under`. Default vuoto =
comportamento invariato. `select_strikes` accetta nuovo kwarg
`dvol_now`, propagato da `entry_cycle`. 4 test unitari.

**C — Scaffolding profit-take graduale (§7.1bis)**

Schema in place ma runtime non ancora wirato. Aggiunge `PartialProfitLevel`
e `exit.profit_take_partial_levels` (default vuoto). Nuovo
`ExitAction = "CLOSE_PROFIT_PARTIAL"` nella Literal. La pipeline di
chiusure parziali nel runtime (entry_cycle / repository / clients)
richiede refactor del position model — lasciato come TODO per un PR
dedicato. La schema è pronta a recepire la config futura senza altri
breaking change.

**Profili aggiornati**

- `strategy.yaml` (golden, 1.2.0): tutto disabilitato by default.
- `strategy.conservativa.yaml` (1.2.0-cons): identico al golden.
- `strategy.aggressiva.yaml` (1.2.0-aggr): A+D+F enabled
  (delta_by_dvol 0.15/0.12/0.10, vol_harvest a 15 pt vol,
  auto_pause @ 15% DD su 5 trade, 2 settimane pausa).

Bump versioni 1.1.0 → 1.2.0, hash ricalcolati, test pinning aggiornato.

Suite: 426 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:07:25 +00:00
24 changed files with 2946 additions and 39 deletions
+385 -1
View File
@@ -13,7 +13,7 @@ import asyncio
import os
import sys
from collections.abc import Callable
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from pathlib import Path
from typing import Any
@@ -679,6 +679,155 @@ def replay(date_from: str, date_to: str, capital: float, dry_run: bool) -> None:
)
@main.command()
@click.option(
"--strategy",
"strategy_path",
type=click.Path(path_type=Path),
default=Path("strategy.yaml"),
show_default=True,
help="Path al file di strategia (golden, conservativa, aggressiva, ...).",
)
@click.option(
"--db",
"db_path",
type=click.Path(path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
help="SQLite con `market_snapshots` storiche.",
)
@click.option(
"--from",
"date_from",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=None,
help="ISO date YYYY-MM-DD (default: 90 giorni fa).",
)
@click.option(
"--to",
"date_to",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=None,
help="ISO date YYYY-MM-DD (default: oggi).",
)
@click.option(
"--capital",
type=float,
default=1500.0,
show_default=True,
help="Capitale di partenza per il backtest, in USD.",
)
@click.option(
"--asset",
type=str,
default="ETH",
show_default=True,
help="Asset di riferimento per le snapshot.",
)
@click.option(
"--no-enforce-hash",
is_flag=True,
default=False,
help="Salta la verifica del config_hash (utile per profili sperimentali).",
)
def backtest(
strategy_path: Path,
db_path: Path,
date_from: datetime | None,
date_to: datetime | None,
capital: float,
asset: str,
no_enforce_hash: bool,
) -> None:
"""Esegue il backtest stilizzato su `market_snapshots` storiche.
Usa lo stesso `validate_entry` del live per i filtri (rigoroso) e
un modello Black-Scholes con skew premium per stimare credito ed
exit P/L (stilizzato — vedi docstring di `core/backtest.py`).
"""
from cerbero_bite.config.loader import load_strategy # noqa: PLC0415
from cerbero_bite.core.backtest import run_backtest # noqa: PLC0415
console = Console()
if date_to is None:
date_to = datetime.now(UTC)
if date_from is None:
date_from = date_to - timedelta(days=90)
date_from = date_from.replace(tzinfo=UTC) if date_from.tzinfo is None else date_from
date_to = date_to.replace(tzinfo=UTC) if date_to.tzinfo is None else date_to
loaded = load_strategy(strategy_path, enforce_hash=not no_enforce_hash)
cfg = loaded.config
conn = connect_state(db_path)
try:
repo = Repository()
snapshots = repo.list_market_snapshots(
conn,
asset=asset.upper(),
start=date_from,
end=date_to,
limit=10000,
)
finally:
conn.close()
if not snapshots:
console.print(
f"[yellow]Nessuno snapshot {asset} trovato fra {date_from.date()} "
f"e {date_to.date()}.[/yellow]"
)
sys.exit(1)
console.print(
f"[green]Caricate {len(snapshots)} snapshot {asset} "
f"({snapshots[-1].timestamp.date()}{snapshots[0].timestamp.date()})[/green]"
)
report = run_backtest(snapshots, cfg, capital_usd=Decimal(str(capital)))
table = Table(title=f"Backtest report — {strategy_path.name}")
table.add_column("Metrica", style="cyan")
table.add_column("Valore", style="bold")
table.add_row("Picks (lunedì 14:00)", str(report.n_picks))
table.add_row(
"Accettati dai filtri",
f"{report.n_accepted} ({report.n_accepted / max(1, report.n_picks):.0%})",
)
table.add_row("Saltati per dato mancante", str(report.n_skipped_data))
table.add_row("Trade completati (con P/L)", str(report.n_completed))
table.add_row("Vincenti", f"{report.n_winners} ({report.win_rate:.0%})")
table.add_row("P/L cumulato (USD)", f"{report.cumulative_pnl_usd:+.2f}")
table.add_row(
"P/L su capitale", f"{report.cumulative_pnl_pct_of_capital:+.2%}"
)
table.add_row(
"Max drawdown", f"{report.max_drawdown_usd:.0f} USD "
f"({report.max_drawdown_pct:.1%})",
)
table.add_row(
"Sharpe (annualized)",
f"{report.sharpe_annualized}" if report.sharpe_annualized is not None
else "",
)
console.print(table)
if report.skip_reasons:
skip_table = Table(title="Motivi di skip aggregati")
skip_table.add_column("Motivo")
skip_table.add_column("Settimane", justify="right")
for reason, count in sorted(
report.skip_reasons.items(), key=lambda kv: -kv[1]
):
skip_table.add_row(reason, str(count))
console.print(skip_table)
console.print(
"[dim]Il modello P/L è stilizzato: BS + skew premium 1.5×. "
"Numeri ottimi per ranking config, non per promesse operative.[/dim]"
)
@main.group()
def config() -> None:
"""Strategy configuration utilities."""
@@ -812,6 +961,241 @@ def state_inspect(db: Path) -> None:
console.print(table)
@main.group(name="option-chain")
def option_chain() -> None:
"""Strumenti per la catena opzioni storica (`option_chain_snapshots`)."""
@option_chain.command(name="trigger")
@click.option(
"--strategy",
"strategy_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=_DEFAULT_STRATEGY_PATH,
show_default=True,
)
@click.option(
"--db",
"db_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
@click.option(
"--audit",
"audit_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_AUDIT_PATH,
show_default=True,
)
@click.option(
"--token",
type=str,
default=None,
help="MCP bearer token (override su CERBERO_BITE_MCP_TOKEN).",
)
@click.option("--asset", default="ETH", show_default=True)
def option_chain_trigger(
strategy_path: Path,
db_path: Path,
audit_path: Path,
token: str | None,
asset: str,
) -> None:
"""Esegue UNA volta il collector della catena opzioni e persiste in DB.
Utile per popolare i dati senza aspettare il cron settimanale del
job ``option_chain_snapshot``. Riusa esattamente la stessa pipeline
schedulata.
"""
from cerbero_bite.runtime.dependencies import build_runtime # noqa: PLC0415
from cerbero_bite.runtime.option_chain_snapshot_cycle import ( # noqa: PLC0415
collect_option_chain_snapshot,
)
cfg = load_strategy(strategy_path).config
ctx = build_runtime(
cfg=cfg,
endpoints=load_endpoints(),
token=load_token(value=token),
db_path=db_path,
audit_path=audit_path,
bot_tag=load_bot_tag(),
)
n = asyncio.run(collect_option_chain_snapshot(ctx, asset=asset))
console.print(
f"[green]Persisted {n} option chain quote(s) for {asset}[/green]"
if n > 0
else f"[yellow]No quotes persisted (chain empty or fetch failed)[/yellow]"
)
@option_chain.command(name="analyze")
@click.option(
"--strategy",
"strategy_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=_DEFAULT_STRATEGY_PATH,
show_default=True,
)
@click.option(
"--db",
"db_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
@click.option("--asset", default="ETH", show_default=True)
@click.option(
"--bias",
type=click.Choice(["bull_put", "bear_call"], case_sensitive=False),
default="bull_put",
show_default=True,
help="Direzione da simulare (il rule engine lo deciderebbe da trend×funding).",
)
def option_chain_analyze(
strategy_path: Path,
db_path: Path,
asset: str,
bias: str,
) -> None:
"""Analizza l'ultimo snapshot di catena salvato.
Per la strategia indicata, simula la selezione strike (delta
target, OTM range, width 4%, credit/width ratio min) e mostra:
* lo strike che il rule engine sceglierebbe come short e long,
* credito atteso, larghezza, rapporto credit/width,
* pass/fail del gate `credit_to_width_ratio_min`.
"""
from cerbero_bite.core.combo_builder import select_strikes # noqa: PLC0415
from cerbero_bite.core.types import OptionQuote # noqa: PLC0415
cfg = load_strategy(strategy_path).config
conn = connect_state(db_path)
try:
repo = Repository()
latest_ts = repo.latest_option_chain_timestamp(conn, asset=asset.upper())
if latest_ts is None:
console.print(
"[red]Nessuno snapshot di catena trovato. Lancia prima "
"`cerbero-bite option-chain trigger`.[/red]"
)
sys.exit(1)
quotes_records = repo.list_option_chain_snapshots(
conn, asset=asset.upper(), start=latest_ts, end=latest_ts,
)
finally:
conn.close()
console.print(
f"[cyan]Snapshot del {latest_ts.isoformat()}{len(quotes_records)} "
f"quote totali[/cyan]"
)
# Costruzione OptionQuote da OptionChainQuoteRecord per riusare select_strikes.
quotes: list[OptionQuote] = []
for q in quotes_records:
if q.bid is None or q.ask is None or q.mid is None or q.delta is None:
continue
quotes.append(
OptionQuote(
instrument=q.instrument_name,
strike=q.strike,
expiry=q.expiry,
option_type=q.option_type,
bid=q.bid,
ask=q.ask,
mid=q.mid,
delta=q.delta,
gamma=q.gamma or Decimal("0"),
theta=q.theta or Decimal("0"),
vega=q.vega or Decimal("0"),
open_interest=q.open_interest or 0,
volume_24h=q.volume_24h or 0,
book_depth_top3=q.book_depth_top3 or 0,
)
)
if not quotes:
console.print("[red]Nessun quote completo per la simulazione.[/red]")
sys.exit(1)
# Lo spot al momento dello snapshot: estraiamo dall'ultimo
# `market_snapshot` ETH a quel timestamp (tolleranza ±15 min).
spot = _resolve_spot_at(db_path, asset=asset.upper(), at=latest_ts)
if spot is None:
console.print(
"[yellow]Spot non recuperabile dai market_snapshots; "
"stimato dal mid ATM.[/yellow]"
)
spot = _atm_spot_proxy(quotes)
selection = select_strikes(
chain=quotes,
bias=bias, # type: ignore[arg-type]
spot=spot,
now=latest_ts,
cfg=cfg,
)
if selection is None:
console.print(
"[red]Il rule engine NON aprirebbe trade con questa catena[/red] "
"(no strike compatibile coi gate delta/distance/width/credit-ratio)."
)
sys.exit(0)
short, long_ = selection
width_usd = (short.strike - long_.strike).copy_abs()
credit_eth = short.mid - long_.mid
credit_usd = credit_eth * spot
ratio = credit_usd / width_usd if width_usd > 0 else Decimal("0")
ratio_target = cfg.structure.credit_to_width_ratio_min
table = Table(title=f"Simulazione picker — bias={bias}, spot={spot:.0f}")
table.add_column("Campo", style="cyan")
table.add_column("Valore", style="bold")
table.add_row("Short strike", f"{short.strike} ({short.delta:+.3f}δ)")
table.add_row("Long strike", f"{long_.strike} ({long_.delta:+.3f}δ)")
table.add_row("Width", f"{width_usd:.0f} USD")
table.add_row("Credit", f"{credit_eth:.4f} ETH ≈ {credit_usd:.2f} USD")
table.add_row(
"Credit/width ratio",
f"{ratio:.2%} (gate ≥ {float(ratio_target):.0%})",
)
pass_str = (
"[green]PASS — entry possibile[/green]"
if ratio >= ratio_target
else "[red]FAIL — premio troppo magro[/red]"
)
table.add_row("Verdetto gate ratio", pass_str)
console.print(table)
def _resolve_spot_at(db_path: Path, *, asset: str, at: datetime) -> Decimal | None:
"""Best-effort lookup dello spot al timestamp ``at`` ± 15 min."""
conn = connect_state(db_path)
try:
rows = Repository().list_market_snapshots(
conn,
asset=asset,
start=at - timedelta(minutes=15),
end=at + timedelta(minutes=15),
limit=1,
)
finally:
conn.close()
if not rows:
return None
return rows[0].spot
def _atm_spot_proxy(quotes: list[Any]) -> Decimal:
"""Stima dello spot prendendo lo strike il cui delta è più vicino a 0.5."""
quote = min(quotes, key=lambda q: abs(abs(q.delta) - Decimal("0.5")))
return quote.strike
def _entrypoint() -> None:
"""Wrapper used by ``cerbero-bite`` console script."""
try:
+94
View File
@@ -90,6 +90,17 @@ class EntryConfig(BaseModel):
# ---------------------------------------------------------------------------
class DeltaByDvolBand(BaseModel):
"""Banda della step function delta-target per regime DVOL (§3.2 A)."""
model_config = ConfigDict(frozen=True, extra="forbid")
dvol_under: Decimal
delta_target: Decimal
delta_min: Decimal
delta_max: Decimal
class ShortStrikeSpec(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")
@@ -99,6 +110,16 @@ class ShortStrikeSpec(BaseModel):
distance_otm_pct_min: Decimal = Field(default=Decimal("0.15"))
distance_otm_pct_max: Decimal = Field(default=Decimal("0.25"))
# §3.2 enhancement (A): step function delta-target by DVOL regime.
# Empty list = behaviour invariato (delta_target sopra è il singolo
# valore). Quando popolato, il combo_builder sceglie la prima
# banda ordinata ascending su `dvol_under` con
# `dvol_now ≤ dvol_under`. Esempio:
# - dvol_under=50 → delta 0.15 (bassa vol → più premio)
# - dvol_under=70 → delta 0.12
# - dvol_under=90 → delta 0.10 (alta vol → più safety)
delta_by_dvol: list[DeltaByDvolBand] = Field(default_factory=list)
class SpreadWidthSpec(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")
@@ -174,6 +195,25 @@ class SizingConfig(BaseModel):
# ---------------------------------------------------------------------------
class PartialProfitLevel(BaseModel):
"""Livello della scala di profit-take graduale (§7.1bis C).
`mark_at_pct_credit`: il livello è triggerato quando
`mark_combo ≤ mark_at_pct_credit × credito_iniziale` (es. 0.25 =
25% del credito = 75% di profitto sulla porzione chiusa).
`close_pct_of_initial_contracts`: frazione dei contratti aperti
INIZIALMENTE da chiudere a questo livello (es. 0.50 = chiudi metà).
Le frazioni sono cumulative; chiudere oltre i contratti residui
è no-op.
"""
model_config = ConfigDict(frozen=True, extra="forbid")
mark_at_pct_credit: Decimal
close_pct_of_initial_contracts: Decimal
class ExitConfig(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")
@@ -185,6 +225,29 @@ class ExitConfig(BaseModel):
delta_breach_threshold: Decimal = Field(default=Decimal("0.30"))
adverse_move_4h_pct: Decimal = Field(default=Decimal("0.05"))
# §7.1ter (D): vol-collapse harvest. Esce in profit anche se il
# profit-take non è ancora colpito quando DVOL è scesa di tot
# punti rispetto all'entry (edge IV-RV catturato, vol attesa già
# rientrata). 0 = filtro disabilitato.
vol_harvest_dvol_decrease: Decimal = Field(default=Decimal("0"))
# §7.1bis (C): scala graduata di profit-take. Lista vuota =
# comportamento invariato (chiusura atomica al
# `profit_take_pct_of_credit`). Quando popolata, l'engine
# interpreta come "chiudi N% dei contratti iniziali al livello
# di mark M%×credito". Le entry sono ordinate dal mark più alto
# (più profit, livello triggerato prima) al più basso. Vedi
# `core/exit_decision.py` per la semantica esatta.
#
# ATTENZIONE: questa funzione richiede il supporto di chiusure
# parziali nel runtime (entry_cycle / repository / clients).
# Fino al merge della partial-close pipeline, l'engine la mappa
# a CLOSE_PROFIT atomico al primo livello triggerato (vedi
# commento in `evaluate`). Default vuoto = no-op.
profit_take_partial_levels: list[PartialProfitLevel] = Field(
default_factory=list
)
monitor_cron: str = "0 2,14 * * *"
user_confirmation_timeout_min: int = 30
escalate_on_timeout: list[str] = Field(
@@ -192,6 +255,36 @@ class ExitConfig(BaseModel):
)
# ---------------------------------------------------------------------------
# Auto-pause (F): circuit breaker su drawdown rolling
# ---------------------------------------------------------------------------
class AutoPauseConfig(BaseModel):
"""Configurazione del circuit breaker su drawdown.
Quando abilitato, il rule engine valuta — prima di ogni entry —
il P/L cumulato delle ultime `lookback_trades` posizioni chiuse
in proporzione al capitale attuale. Se la perdita supera la
soglia, l'engine si auto-mette in pausa per `pause_weeks`
settimane (skip-week). La pausa si annulla automaticamente alla
scadenza, oppure manualmente via comando dalla GUI.
Difende da regime change non rilevati dai filtri quant: se i
filtri stanno fallendo sistematicamente, vale la pena fermarsi
e attendere che le condizioni cambino, invece di continuare a
sanguinare. È un'estensione conservativa del kill switch
(che oggi reagisce solo a errori tecnici).
"""
model_config = ConfigDict(frozen=True, extra="forbid")
enabled: bool = False
lookback_trades: int = 5
max_drawdown_pct: Decimal = Field(default=Decimal("0.10"))
pause_weeks: int = 2
# ---------------------------------------------------------------------------
# Kelly recalibration
# ---------------------------------------------------------------------------
@@ -265,6 +358,7 @@ class StrategyConfig(BaseModel):
sizing: SizingConfig = Field(default_factory=SizingConfig)
exit: ExitConfig = Field(default_factory=ExitConfig)
kelly_recalibration: KellyConfig = Field(default_factory=KellyConfig)
auto_pause: AutoPauseConfig = Field(default_factory=AutoPauseConfig)
execution: ExecutionConfig = Field(default_factory=ExecutionConfig)
monitoring: MonitoringConfig = Field(default_factory=MonitoringConfig)
+652
View File
@@ -0,0 +1,652 @@
"""Stylized backtest engine over ``market_snapshots`` (§13).
Two layers, both pure functions:
1. **Entry-filter simulation** — for each Monday 14:00 UTC tick in the
recorded snapshots, evaluate which §2 gates would have passed,
reconstructing :class:`EntryContext` from the snapshot. This part
is **rigorous**: it uses the same :func:`validate_entry` the live
engine uses, so the output is exactly "what the bot would have
decided".
2. **P/L estimation per accepted entry** — since ``market_snapshots``
does NOT record the option chain (we only collect spot, DVOL,
funding, etc.), credit and exit P/L are estimated via a stylized
Black-Scholes model: given ``spot``, ``DVOL`` (as IV), and the
strategy's delta target, we solve for the short strike, the long
strike at ``width_pct`` distance, and the combo mid-price. Future
ticks are then re-priced under the same model to detect the first
exit trigger from §7.
The stylized layer is **intentionally approximate**: it captures the
geometry of the strategy (DVOL band sets credit, ETH path drives
exit triggers) but not the second-order effects (chain liquidity,
borrow rates, exchange fees beyond the 0.03% notional cap, dealer
hedging skew). Numbers are good for ranking and tuning, not for
operational P/L promises.
The engine is deterministic and side-effect-free: it does **not**
write to SQLite, does not call MCP, does not place orders. It
operates entirely on a list of :class:`MarketSnapshotRecord` rows
the caller has already loaded.
"""
from __future__ import annotations
import math
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import Literal
from pydantic import BaseModel, ConfigDict
from cerbero_bite.config.schema import StrategyConfig
from cerbero_bite.core.entry_validator import EntryContext, validate_entry
from cerbero_bite.state.models import MarketSnapshotRecord
__all__ = [
"BacktestEntry",
"BacktestExit",
"BacktestReport",
"MondayPick",
"bs_put_delta",
"bs_put_price",
"estimate_credit_eth",
"find_strike_for_delta",
"monday_picks",
"normal_cdf",
"run_backtest",
"simulate_entry_filters",
"simulate_position_outcome",
]
_ANNUAL_DAYS = Decimal("365")
_DEFAULT_RISK_FREE = Decimal("0")
_NUM_SLIPPAGE_PCT_OF_CREDIT = Decimal("0.03")
_NUM_FEE_PCT_OF_NOTIONAL = Decimal("0.0003")
# ---------------------------------------------------------------------------
# Black-Scholes helpers (stdlib-only)
# ---------------------------------------------------------------------------
def normal_cdf(x: float) -> float:
"""Standard normal CDF, no scipy dependency."""
return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))
def bs_put_price(*, spot: float, strike: float, t_years: float, sigma: float) -> float:
"""European put price under r=0, q=0 Black-Scholes.
Returns price in spot units (so for an ETH option, dividing by spot
gives the price in ETH).
"""
if t_years <= 0 or sigma <= 0 or spot <= 0 or strike <= 0:
return max(0.0, strike - spot)
sqrt_t = math.sqrt(t_years)
d1 = (math.log(spot / strike) + 0.5 * sigma * sigma * t_years) / (sigma * sqrt_t)
d2 = d1 - sigma * sqrt_t
return strike * normal_cdf(-d2) - spot * normal_cdf(-d1)
def bs_put_delta(*, spot: float, strike: float, t_years: float, sigma: float) -> float:
"""Put delta under r=0, q=0 Black-Scholes (negative number for put).
Returns 0 for expired options.
"""
if t_years <= 0 or sigma <= 0 or spot <= 0 or strike <= 0:
return 0.0
sqrt_t = math.sqrt(t_years)
d1 = (math.log(spot / strike) + 0.5 * sigma * sigma * t_years) / (sigma * sqrt_t)
return normal_cdf(d1) - 1.0 # = -N(-d1)
def find_strike_for_delta(
*,
spot: float,
dvol_pct: float,
dte_days: int,
target_delta_abs: float,
) -> float:
"""Solve for the put strike whose |delta| matches ``target_delta_abs``.
Bisection on a monotone-decreasing |delta(strike)| relationship.
Returns the strike in absolute USD terms.
"""
sigma = max(0.01, dvol_pct / 100.0)
t_years = max(1e-6, dte_days / 365.0)
# Bracket: from 50% of spot (deep OTM, small |delta|) up to spot
# (ATM, |delta| ≈ 0.5).
low = max(1.0, spot * 0.30)
high = spot
for _ in range(64):
mid = 0.5 * (low + high)
delta_abs = abs(bs_put_delta(spot=spot, strike=mid, t_years=t_years, sigma=sigma))
if delta_abs > target_delta_abs:
high = mid
else:
low = mid
if abs(high - low) < 1e-3:
break
return 0.5 * (low + high)
def estimate_credit_eth(
*,
spot: float,
dvol_pct: float,
dte_days: int,
width_pct: float,
delta_target_abs: float,
skew_premium: float = 1.5,
) -> tuple[float, float, float]:
"""Estimate credit (ETH), short_strike, long_strike for a bull-put-style
credit spread under Black-Scholes.
``skew_premium`` è il moltiplicatore applicato al credito BS per
approssimare la **vol smile** dell'ETH options market (le put OTM
trattano a IV più alta della IV ATM, quindi un BS pulito sottostima
sistematicamente il premio del venditore di vol). Il default 1.5
è una stima conservativa dei dati Deribit storici (smile slope
tipica 5-10 vol points per 100δ); valori sensati: 1.3 (smile
blanda) … 1.8 (regime "stress IV"). Va calibrato sui dati reali
quando avremo abbastanza chain history da farlo.
Returns ``(credit_eth, short_strike, long_strike)``. Credit è
già moltiplicato per ``skew_premium``.
"""
short_strike = find_strike_for_delta(
spot=spot, dvol_pct=dvol_pct, dte_days=dte_days,
target_delta_abs=delta_target_abs,
)
width_usd = width_pct * spot
long_strike = max(1.0, short_strike - width_usd)
sigma = max(0.01, dvol_pct / 100.0)
t_years = max(1e-6, dte_days / 365.0)
short_mid_usd = bs_put_price(
spot=spot, strike=short_strike, t_years=t_years, sigma=sigma,
)
long_mid_usd = bs_put_price(
spot=spot, strike=long_strike, t_years=t_years, sigma=sigma,
)
short_mid_eth = short_mid_usd / spot
long_mid_eth = long_mid_usd / spot
credit_eth = (short_mid_eth - long_mid_eth) * skew_premium
return credit_eth, short_strike, long_strike
# ---------------------------------------------------------------------------
# Entry filter simulation — rigorous (uses validate_entry exactly)
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class MondayPick:
"""Indice di un tick "Monday 14:00 UTC" nella time-series."""
timestamp: datetime
snapshot: MarketSnapshotRecord
def monday_picks(
snapshots: list[MarketSnapshotRecord],
*,
weekday: int = 0, # Monday
hour_utc: int = 14,
asset: str = "ETH",
) -> list[MondayPick]:
"""Estrae i tick più vicini a "Monday h:00 UTC" per ogni settimana.
``snapshots`` deve essere ordinato per timestamp ascending. Per ogni
occorrenza di ``weekday + hour_utc`` (es. lun 14:00) presa l'unica
riga ETH che la copre. Settimane senza tick a quell'ora vengono
saltate.
"""
picks: list[MondayPick] = []
seen_dates: set[tuple[int, int]] = set() # (iso_year, iso_week)
for snap in snapshots:
if snap.asset.upper() != asset.upper():
continue
ts = snap.timestamp.astimezone(UTC)
if ts.weekday() != weekday or ts.hour != hour_utc:
continue
iso_y, iso_w, _ = ts.isocalendar()
key = (iso_y, iso_w)
if key in seen_dates:
continue
seen_dates.add(key)
picks.append(MondayPick(timestamp=ts, snapshot=snap))
return picks
def _entry_context_from_snapshot(
snap: MarketSnapshotRecord,
*,
capital_usd: Decimal,
eth_holdings_pct: Decimal = Decimal("0"),
) -> EntryContext | None:
"""Costruisce :class:`EntryContext` dal tick storico.
``None`` quando la riga non ha i campi minimi (spot, dvol, funding).
Nel filtro questo si traduce in "skip della settimana" — è la
stessa logica del live: un tick incompleto è meglio di un'entry
al buio.
"""
if snap.dvol is None or snap.funding_perp_annualized is None:
return None
return EntryContext(
capital_usd=capital_usd,
dvol_now=snap.dvol,
funding_perp_annualized=snap.funding_perp_annualized,
eth_holdings_pct_of_portfolio=eth_holdings_pct,
next_macro_event_in_days=snap.macro_days_to_event,
has_open_position=False,
dealer_net_gamma=snap.dealer_net_gamma,
liquidation_squeeze_risk_high=(
snap.liquidation_long_risk == "high"
or snap.liquidation_short_risk == "high"
),
)
@dataclass(frozen=True)
class EntryFilterResult:
"""Esito del check filtri per una singola Monday pick."""
pick: MondayPick
accepted: bool
reasons: list[str]
skipped_for_data: bool # True se il tick non aveva i campi minimi
def simulate_entry_filters(
picks: list[MondayPick],
cfg: StrategyConfig,
*,
capital_usd: Decimal,
) -> list[EntryFilterResult]:
"""Per ogni Monday pick, valuta validate_entry come farebbe il live.
Rigoroso: usa esattamente :func:`validate_entry` e :class:`EntryContext`.
Restituisce la lista degli esiti, una entry per pick.
"""
results: list[EntryFilterResult] = []
for pick in picks:
ctx = _entry_context_from_snapshot(pick.snapshot, capital_usd=capital_usd)
if ctx is None:
results.append(
EntryFilterResult(
pick=pick,
accepted=False,
reasons=["incomplete_snapshot"],
skipped_for_data=True,
)
)
continue
decision = validate_entry(ctx, cfg)
results.append(
EntryFilterResult(
pick=pick,
accepted=decision.accepted,
reasons=list(decision.reasons),
skipped_for_data=False,
)
)
return results
# ---------------------------------------------------------------------------
# Position outcome simulation — stylized (Black-Scholes re-pricing)
# ---------------------------------------------------------------------------
class BacktestEntry(BaseModel):
"""Trade aperto nel backtest (snapshot al momento dell'entry)."""
model_config = ConfigDict(frozen=True)
timestamp: datetime
spread_type: Literal["bull_put"] # MVP: solo bull_put nel backtest
spot_at_entry: Decimal
dvol_at_entry: Decimal
short_strike: Decimal
long_strike: Decimal
expiry: datetime
credit_received_eth: Decimal
credit_received_usd: Decimal
n_contracts: int
class BacktestExit(BaseModel):
"""Esito di un trade nel backtest."""
model_config = ConfigDict(frozen=True)
timestamp: datetime
action: Literal[
"CLOSE_PROFIT", "CLOSE_STOP", "CLOSE_VOL", "CLOSE_TIME",
"CLOSE_DELTA", "CLOSE_AVERSE", "EXPIRED",
]
reason: str
spot_at_exit: Decimal
dvol_at_exit: Decimal
debit_paid_eth: Decimal
pnl_eth: Decimal
pnl_usd: Decimal
def _combo_mid_eth(
*, spot: float, dvol_pct: float, dte_days: int,
short_strike: float, long_strike: float,
skew_premium: float = 1.5,
) -> float:
"""Re-prezza il combo bull-put usando BS sul nuovo spot/dvol/dte."""
sigma = max(0.01, dvol_pct / 100.0)
t_years = max(1e-6, dte_days / 365.0)
short_mid_usd = bs_put_price(
spot=spot, strike=short_strike, t_years=t_years, sigma=sigma,
)
long_mid_usd = bs_put_price(
spot=spot, strike=long_strike, t_years=t_years, sigma=sigma,
)
return (short_mid_usd - long_mid_usd) / spot * skew_premium
def simulate_position_outcome(
entry: BacktestEntry,
future_snapshots: list[MarketSnapshotRecord],
cfg: StrategyConfig,
) -> BacktestExit:
"""Re-prezza il combo a ogni tick futuro fino al primo exit trigger.
Triggers in ordine §7:
1. profit_take (debit ≤ 0.5×credit)
2. stop_loss (debit ≥ 2.5×credit)
3. vol_stop (DVOL salita di ≥10 pt rispetto entry)
4. time_stop (DTE ≤ 7 e debit > 0.7×credit)
5. expiry (uscita per scadenza, P/L = credit intrinsic)
"""
ec = cfg.exit
credit = float(entry.credit_received_eth)
short = float(entry.short_strike)
long_ = float(entry.long_strike)
profit_thresh = float(ec.profit_take_pct_of_credit) * credit
stop_thresh = float(ec.stop_loss_mark_x_credit) * credit
skip_time_thresh = float(ec.time_stop_skip_if_close_to_profit_pct) * credit
for snap in future_snapshots:
if snap.timestamp <= entry.timestamp:
continue
if snap.timestamp >= entry.expiry:
break
if snap.dvol is None or snap.spot is None:
continue
spot_now = float(snap.spot)
dvol_now = float(snap.dvol)
dte = max(0, (entry.expiry - snap.timestamp).days)
debit = _combo_mid_eth(
spot=spot_now, dvol_pct=dvol_now, dte_days=dte,
short_strike=short, long_strike=long_,
)
if debit <= profit_thresh:
return _exit(
snap, entry, debit,
action="CLOSE_PROFIT",
reason=f"debit {debit:.4f}{profit_thresh:.4f}",
)
if debit >= stop_thresh:
return _exit(
snap, entry, debit,
action="CLOSE_STOP",
reason=f"debit {debit:.4f}{stop_thresh:.4f}",
)
if dvol_now >= float(entry.dvol_at_entry) + float(ec.vol_stop_dvol_increase):
return _exit(
snap, entry, debit,
action="CLOSE_VOL",
reason=f"DVOL {dvol_now:.1f} ≥ entry+{ec.vol_stop_dvol_increase}",
)
if dte <= ec.time_stop_dte_remaining and debit > skip_time_thresh:
return _exit(
snap, entry, debit,
action="CLOSE_TIME",
reason=f"DTE {dte}{ec.time_stop_dte_remaining}",
)
# Tick passati senza trigger: scadenza naturale.
last = future_snapshots[-1] if future_snapshots else None
intrinsic = max(0.0, short - float(last.spot if last and last.spot else 0))
intrinsic_capped = min(intrinsic, short - long_)
debit_at_expiry_eth = (
intrinsic_capped / float(last.spot)
if last is not None and last.spot is not None and float(last.spot) > 0
else 0.0
)
return _exit(
last or _synthetic_expiry_snapshot(entry),
entry,
debit_at_expiry_eth,
action="EXPIRED",
reason="held to expiry",
)
def _synthetic_expiry_snapshot(entry: BacktestEntry) -> MarketSnapshotRecord:
return MarketSnapshotRecord(
timestamp=entry.expiry,
asset="ETH",
spot=entry.spot_at_entry,
dvol=entry.dvol_at_entry,
fetch_ok=False,
)
def _exit(
snap: MarketSnapshotRecord,
entry: BacktestEntry,
debit_eth: float,
*,
action: str,
reason: str,
) -> BacktestExit:
pnl_eth = float(entry.credit_received_eth) - debit_eth
spot = float(snap.spot) if snap.spot is not None else float(entry.spot_at_entry)
dvol = float(snap.dvol) if snap.dvol is not None else float(entry.dvol_at_entry)
return BacktestExit(
timestamp=snap.timestamp,
action=action, # type: ignore[arg-type]
reason=reason,
spot_at_exit=Decimal(str(spot)),
dvol_at_exit=Decimal(str(dvol)),
debit_paid_eth=Decimal(str(debit_eth)),
pnl_eth=Decimal(str(pnl_eth)),
pnl_usd=Decimal(str(pnl_eth * spot * entry.n_contracts)),
)
# ---------------------------------------------------------------------------
# Full pipeline
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class CompletedTrade:
entry: BacktestEntry
exit: BacktestExit
class BacktestReport(BaseModel):
"""Aggregato del backtest. Tutti i numeri sono **stime**."""
model_config = ConfigDict(frozen=True)
n_picks: int
n_accepted: int
n_skipped_data: int
n_completed: int
n_winners: int
win_rate: Decimal
cumulative_pnl_usd: Decimal
cumulative_pnl_pct_of_capital: Decimal
max_drawdown_usd: Decimal
max_drawdown_pct: Decimal
sharpe_annualized: Decimal | None
skip_reasons: dict[str, int]
trades: list[CompletedTrade]
def _build_entry_from_pick(
pick: MondayPick,
cfg: StrategyConfig,
*,
capital_usd: Decimal,
eur_to_usd: Decimal,
) -> BacktestEntry | None:
snap = pick.snapshot
if snap.spot is None or snap.dvol is None:
return None
spot = float(snap.spot)
dvol = float(snap.dvol)
width_pct = float(cfg.structure.spread_width.target_pct_of_spot)
delta_target = float(cfg.structure.short_strike.delta_target)
dte = cfg.structure.dte_target
credit_eth, short_strike, long_strike = estimate_credit_eth(
spot=spot, dvol_pct=dvol, dte_days=dte,
width_pct=width_pct, delta_target_abs=delta_target,
)
width_usd = float(cfg.structure.spread_width.target_pct_of_spot) * spot
credit_usd = credit_eth * spot
if width_usd <= 0 or credit_usd / width_usd < float(
cfg.structure.credit_to_width_ratio_min
):
return None # ratio gate fallisce → no entry
cap_pertrade_usd = float(cfg.sizing.cap_per_trade_eur) * float(eur_to_usd)
risk_target = min(float(cfg.sizing.kelly_fraction) * float(capital_usd), cap_pertrade_usd)
n_contracts = max(0, min(int(risk_target // width_usd), cfg.sizing.max_contracts_per_trade))
if n_contracts == 0:
return None
expiry = pick.timestamp + timedelta(days=dte)
return BacktestEntry(
timestamp=pick.timestamp,
spread_type="bull_put",
spot_at_entry=Decimal(str(spot)),
dvol_at_entry=Decimal(str(dvol)),
short_strike=Decimal(str(round(short_strike, 2))),
long_strike=Decimal(str(round(long_strike, 2))),
expiry=expiry,
credit_received_eth=Decimal(str(credit_eth)),
credit_received_usd=Decimal(str(credit_usd * n_contracts)),
n_contracts=n_contracts,
)
def _max_drawdown_usd(equity: list[Decimal]) -> tuple[Decimal, Decimal]:
"""Return ``(max_dd_usd, max_dd_pct_of_peak)`` over an equity curve."""
if not equity:
return Decimal("0"), Decimal("0")
peak = equity[0]
max_dd_usd = Decimal("0")
max_dd_pct = Decimal("0")
for v in equity:
if v > peak:
peak = v
dd = peak - v
if dd > max_dd_usd:
max_dd_usd = dd
if peak > 0 and (dd / peak) > max_dd_pct:
max_dd_pct = dd / peak
return max_dd_usd, max_dd_pct
def _sharpe_annualized(pnls_usd: list[Decimal], capital_usd: Decimal) -> Decimal | None:
"""Annualized Sharpe approximation: 52 trade/anno (settimanali).
Restituisce ``None`` se ci sono <5 trade o stdev = 0.
"""
if len(pnls_usd) < 5 or capital_usd <= 0:
return None
rets = [float(p / capital_usd) for p in pnls_usd]
mean = sum(rets) / len(rets)
var = sum((r - mean) ** 2 for r in rets) / max(1, (len(rets) - 1))
std = math.sqrt(var)
if std == 0:
return None
sharpe = mean / std * math.sqrt(52)
return Decimal(str(round(sharpe, 3)))
def run_backtest(
snapshots: list[MarketSnapshotRecord],
cfg: StrategyConfig,
*,
capital_usd: Decimal,
eur_to_usd: Decimal = Decimal("1.075"),
asset: str = "ETH",
) -> BacktestReport:
"""Esegue il backtest end-to-end sui ``snapshots`` ETH ordinati per ts."""
snapshots = sorted(snapshots, key=lambda s: s.timestamp)
eth_snapshots = [s for s in snapshots if s.asset.upper() == asset.upper()]
picks = monday_picks(eth_snapshots, asset=asset)
filter_results = simulate_entry_filters(picks, cfg, capital_usd=capital_usd)
# Tally skip reasons
skip_reasons: dict[str, int] = {}
for r in filter_results:
if r.accepted:
continue
for reason in r.reasons:
skip_reasons[reason] = skip_reasons.get(reason, 0) + 1
trades: list[CompletedTrade] = []
for r in filter_results:
if not r.accepted:
continue
entry = _build_entry_from_pick(
r.pick, cfg, capital_usd=capital_usd, eur_to_usd=eur_to_usd,
)
if entry is None:
skip_reasons["sizing_or_ratio"] = skip_reasons.get("sizing_or_ratio", 0) + 1
continue
future = [s for s in eth_snapshots if s.timestamp > r.pick.timestamp]
exit_ = simulate_position_outcome(entry, future, cfg)
trades.append(CompletedTrade(entry=entry, exit=exit_))
pnls = [t.exit.pnl_usd for t in trades]
cumulative = sum(pnls, start=Decimal("0"))
n_winners = sum(1 for p in pnls if p > 0)
win_rate = (
Decimal(n_winners) / Decimal(len(pnls))
if pnls
else Decimal("0")
)
# Equity curve in USD assoluti
equity = [capital_usd]
for p in pnls:
equity.append(equity[-1] + p)
max_dd_usd, max_dd_pct = _max_drawdown_usd(equity)
return BacktestReport(
n_picks=len(picks),
n_accepted=sum(1 for r in filter_results if r.accepted),
n_skipped_data=sum(1 for r in filter_results if r.skipped_for_data),
n_completed=len(trades),
n_winners=n_winners,
win_rate=win_rate,
cumulative_pnl_usd=cumulative,
cumulative_pnl_pct_of_capital=(
cumulative / capital_usd if capital_usd > 0 else Decimal("0")
),
max_drawdown_usd=max_dd_usd,
max_drawdown_pct=max_dd_pct,
sharpe_annualized=_sharpe_annualized(pnls, capital_usd),
skip_reasons=skip_reasons,
trades=trades,
)
+27 -3
View File
@@ -83,26 +83,49 @@ def _pick_expiry(
return min(candidates, key=lambda exp: abs(candidates[exp] - sc.dte_target))
def _resolve_delta_band(
sc: object, dvol_now: Decimal | None
) -> tuple[Decimal, Decimal, Decimal]:
"""Return (delta_target, delta_min, delta_max) per il regime DVOL corrente.
Quando ``sc.delta_by_dvol`` è popolato e ``dvol_now`` è disponibile,
sceglie la prima banda (ordinata ascending sulla ``dvol_under``) il
cui ``dvol_under ≥ dvol_now``. Altrimenti torna ai valori statici di
``sc``.
"""
bands = list(getattr(sc, "delta_by_dvol", []) or [])
if dvol_now is not None and bands:
bands_sorted = sorted(bands, key=lambda b: b.dvol_under)
for band in bands_sorted:
if dvol_now <= band.dvol_under:
return band.delta_target, band.delta_min, band.delta_max
last = bands_sorted[-1]
return last.delta_target, last.delta_min, last.delta_max
return sc.delta_target, sc.delta_min, sc.delta_max
def _select_short(
quotes: list[OptionQuote],
*,
spot: Decimal,
cfg: StrategyConfig,
dvol_now: Decimal | None = None,
) -> OptionQuote | None:
"""Pick the short-leg quote with delta closest to target inside both bands."""
sc = cfg.structure.short_strike
delta_target, delta_min, delta_max = _resolve_delta_band(sc, dvol_now)
eligible: list[OptionQuote] = []
for q in quotes:
dist = (q.strike - spot).copy_abs() / spot
if not (sc.distance_otm_pct_min <= dist <= sc.distance_otm_pct_max):
continue
abs_delta = q.delta.copy_abs()
if not (sc.delta_min <= abs_delta <= sc.delta_max):
if not (delta_min <= abs_delta <= delta_max):
continue
eligible.append(q)
if not eligible:
return None
return min(eligible, key=lambda q: abs(q.delta.copy_abs() - sc.delta_target))
return min(eligible, key=lambda q: abs(q.delta.copy_abs() - delta_target))
def _select_long(
@@ -143,6 +166,7 @@ def select_strikes(
spot: Decimal,
now: datetime,
cfg: StrategyConfig,
dvol_now: Decimal | None = None,
) -> tuple[OptionQuote, OptionQuote] | None:
"""Return the (short, long) quotes for the requested vertical, or ``None``.
@@ -161,7 +185,7 @@ def select_strikes(
if not typed:
return None
short = _select_short(typed, spot=spot, cfg=cfg)
short = _select_short(typed, spot=spot, cfg=cfg, dvol_now=dvol_now)
if short is None:
return None
+18
View File
@@ -28,8 +28,10 @@ __all__ = ["ExitAction", "ExitDecisionResult", "PositionSnapshot", "evaluate"]
ExitAction = Literal[
"HOLD",
"CLOSE_PROFIT",
"CLOSE_PROFIT_PARTIAL",
"CLOSE_STOP",
"CLOSE_VOL",
"CLOSE_VOL_HARVEST",
"CLOSE_TIME",
"CLOSE_DELTA",
"CLOSE_AVERSE",
@@ -115,6 +117,22 @@ def evaluate(snapshot: PositionSnapshot, cfg: StrategyConfig) -> ExitDecisionRes
f"mark {debit}{ec.profit_take_pct_of_credit:.0%} of credit {credit}",
)
# 1bis. Vol-collapse harvest (D): siamo IN profit (debit < credit) e
# la DVOL è scesa di tot punti rispetto all'entry. Edge IV-RV già
# catturato, non c'è motivo di tenere fino a profit_take. Esce
# opportunisticamente quando il regime di vol che giustificava
# l'entry non c'è più.
if (
ec.vol_harvest_dvol_decrease > 0
and debit < credit
and snapshot.dvol_now <= snapshot.dvol_at_entry - ec.vol_harvest_dvol_decrease
):
return _result(
"CLOSE_VOL_HARVEST",
f"DVOL {snapshot.dvol_now} ≤ entry {snapshot.dvol_at_entry} "
f"{ec.vol_harvest_dvol_decrease}, harvest while in profit",
)
# 2. Stop loss
if debit >= stop_thresh:
return _result(
+212 -15
View File
@@ -378,6 +378,44 @@ def _profile_caps(strategy: object | None) -> dict[str, float]:
return out
def _detect_features(strategy: object | None) -> dict[str, bool]:
"""Quali miglioramenti del PR FDAC sono ATTIVI in questa strategia.
- **A** (delta dinamico): `short_strike.delta_by_dvol` non vuoto.
- **D** (vol-harvest): `exit.vol_harvest_dvol_decrease > 0`.
- **F** (auto-pause): `auto_pause.enabled = true`.
- **IV** (IV-richness gate, dal PR precedente): `entry.iv_minus_rv_filter_enabled`.
"""
feats = {"A": False, "D": False, "F": False, "IV": False}
if strategy is None:
return feats
try:
feats["A"] = bool(
getattr(strategy.structure.short_strike, "delta_by_dvol", []) # type: ignore[attr-defined]
)
except Exception:
pass
try:
feats["D"] = (
float(getattr(strategy.exit, "vol_harvest_dvol_decrease", 0)) > 0 # type: ignore[attr-defined]
)
except Exception:
pass
try:
feats["F"] = bool(
getattr(getattr(strategy, "auto_pause", None), "enabled", False)
)
except Exception:
pass
try:
feats["IV"] = bool(
getattr(strategy.entry, "iv_minus_rv_filter_enabled", False) # type: ignore[attr-defined]
)
except Exception:
pass
return feats
def _compute_pl(
caps: dict[str, float],
*,
@@ -386,13 +424,56 @@ def _compute_pl(
win_rate: float,
trades_per_year: int,
eur_to_usd: float = 1.075,
features: dict[str, bool] | None = None,
) -> dict[str, float]:
"""Calcola le metriche P/L per un profilo di sizing."""
"""Calcola le metriche P/L per un profilo di sizing.
Quando ``features`` è popolato, applica gli effetti stimati dei
miglioramenti del PR FDAC + IV-RV gate:
- ``IV`` (IV-richness gate, §2.9): +5 pp win-rate, 25% trade/anno.
- ``A`` (delta dinamico, §3.2): +1.5 pp win-rate, sl_loss × 0.95.
- ``D`` (vol-harvest, §7-bis): 5% delle would-be-loss diventano
harvest exit a +0.20 × credito.
- ``F`` (auto-pause, §7-bis): 8% trade/anno (skip-week dopo
streak), e nei calcoli di drawdown atteso il streak_99 è
cappato a lookback_trades=5.
Effetti **stimati ex-ante** dalla letteratura short-vol systematic;
i valori puntuali andranno calibrati sul dataset accumulato.
"""
feats = features or {}
width = caps["width_pct"] * spot
credit = caps["credit_ratio"] * width
tp_profit = caps["profit_take"] * credit
sl_loss = (caps["stop_mult"] - 1.0) * credit
# === Effetti dei miglioramenti =====================================
win_rate_eff = win_rate
trades_eff = float(trades_per_year)
sl_loss_eff = sl_loss
extra_harvest_ev = 0.0
prob_harvest = 0.0
if feats.get("IV"):
# Skip più aggressivo + qualità migliore: +5 pp win, 25% trade.
win_rate_eff = min(0.95, win_rate_eff + 0.05)
trades_eff *= 0.75
if feats.get("A"):
# Migliore strike picking → +1.5 pp win-rate; riduzione del
# tail della perdita (5%) per le bande high-DVOL.
win_rate_eff = min(0.95, win_rate_eff + 0.015)
sl_loss_eff *= 0.95
if feats.get("D"):
# Vol-harvest: ~5% delle entrate intercettate prima dello stop
# con un piccolo profitto (+0.20×credit). Sottrae lo stesso
# volume dalle prob_loss.
prob_harvest = 0.05
extra_harvest_ev = 0.20 * credit
# F (auto-pause) agisce su streak_99 più sotto, e sul trades_eff.
if feats.get("F"):
trades_eff *= 0.92
cap_pertrade_usd = caps["cap_pertrade_eur"] * eur_to_usd
risk_target = min(caps["kelly"] * capital, cap_pertrade_usd)
n_kelly = int(risk_target // width) if width > 0 else 0
@@ -400,25 +481,24 @@ def _compute_pl(
prob_time_stop = 0.07
prob_other_stop = 0.03
prob_loss = max(0.0, 1.0 - win_rate - prob_time_stop - prob_other_stop)
prob_loss = max(
0.0,
1.0 - win_rate_eff - prob_time_stop - prob_other_stop - prob_harvest,
)
avg_time_stop_pl = 0.10 * credit
e_trade_gross = (
win_rate * tp_profit
- prob_loss * sl_loss
win_rate_eff * tp_profit
- prob_loss * sl_loss_eff
+ prob_time_stop * avg_time_stop_pl
+ prob_harvest * extra_harvest_ev
)
fees = 0.0003 * spot * 2
slippage = 0.03 * credit
e_trade_net = e_trade_gross - fees - slippage
# Multi-posizione concorrente: il P/L scala col numero di posizioni
# aperte simultaneamente (il loop entry crea N trade indipendenti
# quando max_concurrent > 1). Vedi caveat aggressiva.yaml: il
# supporto multi-asset richiede modifiche di codice; questo
# moltiplicatore stima cosa otterresti DOPO.
concurrency = max(1.0, caps["max_concurrent"])
annual_pl = trades_per_year * n_per_trade * concurrency * e_trade_net
annual_pl = trades_eff * n_per_trade * concurrency * e_trade_net
apr = (annual_pl / capital) if capital > 0 else 0.0
# --- Max drawdown -------------------------------------------------
@@ -456,7 +536,7 @@ def _compute_pl(
"width": width,
"credit": credit,
"tp_profit": tp_profit,
"sl_loss": sl_loss,
"sl_loss": sl_loss_eff,
"risk_target": risk_target,
"n_per_trade": float(n_per_trade),
"concurrency": concurrency,
@@ -466,11 +546,14 @@ def _compute_pl(
"fees": fees,
"slippage": slippage,
"prob_loss": prob_loss,
"prob_harvest": prob_harvest,
"streak_99": float(streak_99),
"expected_dd_usd": expected_dd_usd,
"expected_dd_pct": expected_dd_pct,
"tail_dd_usd": tail_dd_usd,
"tail_dd_pct": tail_dd_pct,
"win_rate_eff": win_rate_eff,
"trades_eff": trades_eff,
}
@@ -479,6 +562,8 @@ def _render_profile_card(
caps: dict[str, float],
metrics: dict[str, float],
badge: str,
features: dict[str, bool] | None = None,
metrics_base: dict[str, float] | None = None,
) -> None:
"""Rendering di un profilo (conservativo o aggressivo) in una colonna."""
st.markdown(f"### {label} {badge}")
@@ -488,23 +573,58 @@ def _render_profile_card(
f"max {caps['max_n']:.0f} contratti × "
f"{caps['max_concurrent']:.0f} pos. concorrenti"
)
if features:
active = [k for k, v in features.items() if v]
if active:
st.caption(
"🟢 Miglioramenti attivi: "
+ " · ".join(
{
"IV": "**IV-RV gate**",
"A": "**A** delta dinamico",
"D": "**D** vol-harvest",
"F": "**F** auto-pause",
}.get(k, k)
for k in active
)
)
else:
st.caption("⚪ Nessun miglioramento attivo (formula base)")
cols = st.columns(2)
cols[0].metric("Contratti per trade", f"{metrics['n_per_trade']:.0f}")
cols[1].metric("Posizioni concorrenti", f"{metrics['concurrency']:.0f}")
cols = st.columns(2)
e_delta = (
f"{metrics['e_trade_net'] - metrics_base['e_trade_net']:+.1f}"
if metrics_base
else None
)
pl_delta = (
f"{metrics['annual_pl'] - metrics_base['annual_pl']:+.0f} USD vs base"
if metrics_base
else f"{metrics['apr']:+.1%} APR"
)
cols[0].metric(
"E[trade] netto",
f"{metrics['e_trade_net']:+.1f} USD",
delta=e_delta,
help=(
f"fees={metrics['fees']:.2f} USD, "
f"slippage={metrics['slippage']:.2f} USD"
f"win_rate effettivo={metrics['win_rate_eff']:.0%}, "
f"prob_loss={metrics['prob_loss']:.0%}, "
f"trade/anno={metrics['trades_eff']:.0f}"
),
)
cols[1].metric(
"P/L annuo stimato",
f"{metrics['annual_pl']:+.0f} USD",
delta=f"{metrics['apr']:+.1%} APR",
delta=f"{metrics['apr']:+.1%} APR" + (
f" ({metrics['annual_pl'] - metrics_base['annual_pl']:+.0f} vs base)"
if metrics_base
else ""
),
)
cols = st.columns(2)
@@ -577,12 +697,45 @@ def _render_pl_panel(
cons_caps = _profile_caps(strategy_conservativa or strategy_main)
aggr_caps = _profile_caps(strategy_aggressiva)
cons_feats = _detect_features(strategy_conservativa or strategy_main)
aggr_feats = _detect_features(strategy_aggressiva)
apply_features = st.checkbox(
"Applica gli effetti dei miglioramenti FDAC + IV-RV gate "
"letti dai due `strategy.*.yaml`",
value=True,
help=(
"Quando ON, ogni colonna applica gli effetti stimati delle "
"feature attive nel rispettivo profilo. OFF = formula base "
"(senza miglioramenti) per confronto pulito."
),
)
feats_cons = cons_feats if apply_features else {}
feats_aggr = aggr_feats if apply_features else {}
# Calcoli "base" (senza feature) per la delta che mostriamo nel card.
cons_base = _compute_pl(
cons_caps,
capital=capital,
spot=spot,
win_rate=win_rate,
trades_per_year=trades_per_year,
)
aggr_base = _compute_pl(
aggr_caps,
capital=capital,
spot=spot,
win_rate=win_rate,
trades_per_year=trades_per_year,
)
cons = _compute_pl(
cons_caps,
capital=capital,
spot=spot,
win_rate=win_rate,
trades_per_year=trades_per_year,
features=feats_cons,
)
aggr = _compute_pl(
aggr_caps,
@@ -590,6 +743,7 @@ def _render_pl_panel(
spot=spot,
win_rate=win_rate,
trades_per_year=trades_per_year,
features=feats_aggr,
)
col_cons, col_aggr = st.columns(2)
@@ -598,7 +752,9 @@ def _render_pl_panel(
"🛡️ Conservativa",
cons_caps,
cons,
"_(golden config v1.0.0)_",
"_(golden config v1.2.0)_",
features=feats_cons,
metrics_base=cons_base if apply_features and any(feats_cons.values()) else None,
)
with col_aggr:
_render_profile_card(
@@ -606,6 +762,8 @@ def _render_pl_panel(
aggr_caps,
aggr,
"_(deroga §11, richiede paper trading)_",
features=feats_aggr,
metrics_base=aggr_base if apply_features and any(feats_aggr.values()) else None,
)
if aggr["annual_pl"] > 0 and cons["annual_pl"] > 0:
@@ -626,6 +784,43 @@ def _render_pl_panel(
"viable."
)
# === Mini-tabella: contributo marginale di ogni feature =====
if apply_features and (any(feats_cons.values()) or any(feats_aggr.values())):
st.markdown("**Contributo marginale di ogni feature** (profilo aggressivo)")
contrib_rows = []
for label, key in [
("IV — IV-richness gate", "IV"),
("A — Delta dinamico", "A"),
("D — Vol-harvest", "D"),
("F — Auto-pause", "F"),
]:
single_feat = {key: True}
m = _compute_pl(
aggr_caps,
capital=capital,
spot=spot,
win_rate=win_rate,
trades_per_year=trades_per_year,
features=single_feat,
)
delta_pl = m["annual_pl"] - aggr_base["annual_pl"]
delta_apr = m["apr"] - aggr_base["apr"]
active = "" if aggr_feats.get(key) else ""
contrib_rows.append(
{
"Feature": label,
"Attiva nel YAML": active,
"ΔP/L annuo (solo questa)": f"{delta_pl:+.0f} USD",
"ΔAPR": f"{delta_apr:+.1%}",
}
)
st.table(contrib_rows)
st.caption(
"Ogni riga mostra il contributo del SINGOLO feature (le altre "
"spente). Effetti stimati ex-ante; calibrabili sui dati "
"raccolti via `📐 Calibrazione`."
)
# Sensibilità win-rate per il profilo aggressivo (più informativo)
st.markdown("**Sensibilità al win rate** (profilo aggressivo)")
sens_rows = []
@@ -636,6 +831,7 @@ def _render_pl_panel(
spot=spot,
win_rate=wr,
trades_per_year=trades_per_year,
features=feats_aggr,
)
m_c = _compute_pl(
cons_caps,
@@ -643,6 +839,7 @@ def _render_pl_panel(
spot=spot,
win_rate=wr,
trades_per_year=trades_per_year,
features=feats_cons,
)
sens_rows.append(
{
+175
View File
@@ -0,0 +1,175 @@
"""Auto-pause circuit breaker (§7-bis F).
Pure-function evaluation that consults `system_state.auto_pause_until`
and the rolling P/L of the last N closed positions to decide whether
the engine should skip an entry cycle.
Two responsibilities, both deterministic at call time:
* :func:`is_paused` — returns ``True`` when the persisted
``auto_pause_until`` is in the future. Independent from the kill
switch, which targets technical errors.
* :func:`evaluate_drawdown_breach` — given the last N closed P/Ls and
the current capital, returns whether the rolling drawdown breached
the configured ``max_drawdown_pct`` threshold. The orchestrator
layer is the one that flips the persisted state on breach (this
module stays I/O-free for testability).
The two are separated on purpose: ``is_paused`` is the cheap,
read-only gate consulted at the start of every entry cycle; the
breach evaluation runs once per cycle right after the entry
filtering, before the entry is actually placed.
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta
from decimal import Decimal
from cerbero_bite.config.schema import AutoPauseConfig
from cerbero_bite.state.models import SystemStateRecord
__all__ = [
"AutoPauseDecision",
"PauseStatus",
"evaluate_drawdown_breach",
"is_paused",
"pause_until",
]
@dataclass(frozen=True)
class PauseStatus:
"""Snapshot del flag di auto-pausa al momento della valutazione."""
paused: bool
until: datetime | None
reason: str | None
@dataclass(frozen=True)
class AutoPauseDecision:
"""Esito di :func:`evaluate_drawdown_breach`."""
should_pause: bool
cumulative_pnl_usd: Decimal
drawdown_pct: Decimal
threshold_pct: Decimal
reason: str | None
def is_paused(
state: SystemStateRecord | None, *, now: datetime
) -> PauseStatus:
"""Restituisce lo stato della pausa rispetto a ``now``.
``state == None`` o ``auto_pause_until == None`` o
``auto_pause_until <= now`` ⇒ engine attivo.
"""
if state is None or state.auto_pause_until is None:
return PauseStatus(paused=False, until=None, reason=None)
until = state.auto_pause_until
if until.tzinfo is not None and now.tzinfo is None:
# Coerenza: se il valore persistito è tz-aware, normalizziamo.
return PauseStatus(
paused=until > now.replace(tzinfo=until.tzinfo),
until=until,
reason=state.auto_pause_reason,
)
return PauseStatus(
paused=until > now,
until=until,
reason=state.auto_pause_reason,
)
def pause_until(now: datetime, weeks: int) -> datetime:
"""Calcola la scadenza della pausa (``now + weeks``).
Estratto in funzione separata per facilitare i test e per ricordare
che la pausa è espressa in **settimane** (la strategia ha cron
settimanale; pause più corte non avrebbero modo di evitare una
settimana di entry).
"""
return now + timedelta(weeks=max(1, weeks))
def evaluate_drawdown_breach(
*,
cfg: AutoPauseConfig,
recent_pnl_usd: list[Decimal],
capital_usd: Decimal,
) -> AutoPauseDecision:
"""Decide se la pausa va armata ora dato il rolling P/L.
Regola: se la somma dei P/L delle ultime ``cfg.lookback_trades``
posizioni chiuse è negativa e in valore assoluto eccede
``cfg.max_drawdown_pct × capital_usd``, ritorna
``should_pause=True``. Tutte le altre condizioni → False.
``cfg.enabled=False`` → ritorna sempre False (filtro disabilitato).
Lookback insufficiente → ritorna False (non scattiamo finché non
abbiamo abbastanza storia per giudicare).
"""
threshold_pct = cfg.max_drawdown_pct
cumulative = sum((p for p in recent_pnl_usd), start=Decimal("0"))
if not cfg.enabled:
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=Decimal("0"),
threshold_pct=threshold_pct,
reason=None,
)
if len(recent_pnl_usd) < cfg.lookback_trades:
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=Decimal("0"),
threshold_pct=threshold_pct,
reason=None,
)
if capital_usd <= 0:
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=Decimal("0"),
threshold_pct=threshold_pct,
reason=None,
)
# Solo perdite ci interessano: vincite cumulate non scattano la pausa.
if cumulative >= 0:
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=cumulative / capital_usd,
threshold_pct=threshold_pct,
reason=None,
)
drawdown_pct = (-cumulative) / capital_usd
if drawdown_pct >= threshold_pct:
return AutoPauseDecision(
should_pause=True,
cumulative_pnl_usd=cumulative,
drawdown_pct=drawdown_pct,
threshold_pct=threshold_pct,
reason=(
f"rolling DD {drawdown_pct:.2%}{threshold_pct:.2%} "
f"(last {cfg.lookback_trades} trades, "
f"cumulative {cumulative} USD)"
),
)
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=drawdown_pct,
threshold_pct=threshold_pct,
reason=None,
)
+68 -1
View File
@@ -38,6 +38,7 @@ from cerbero_bite.core.entry_validator import (
from cerbero_bite.core.liquidity_gate import InstrumentSnapshot, check
from cerbero_bite.core.sizing_engine import SizingContext, compute_contracts
from cerbero_bite.core.types import OptionQuote
from cerbero_bite.runtime import auto_pause as auto_pause_module
from cerbero_bite.runtime.alert_manager import AlertManager
from cerbero_bite.runtime.dependencies import RuntimeContext
from cerbero_bite.state import (
@@ -64,6 +65,7 @@ _STATUS_NO_ENTRY = "no_entry"
_STATUS_BROKER_REJECT = "broker_reject"
_STATUS_KILL_SWITCH = "kill_switch_armed"
_STATUS_HAS_OPEN = "has_open_position"
_STATUS_AUTO_PAUSED = "auto_paused"
@dataclass(frozen=True)
@@ -342,6 +344,28 @@ async def run_entry_cycle(
)
return EntryCycleResult(status=_STATUS_KILL_SWITCH, reason="kill_switch")
# §7-bis (F): auto-pause circuit breaker. Read-only consultation
# of the persisted state — the breach evaluation runs later, after
# capital is known.
conn = connect_state(ctx.db_path)
try:
sys_state = ctx.repository.get_system_state(conn)
finally:
conn.close()
pause_status = auto_pause_module.is_paused(sys_state, now=when)
if pause_status.paused:
await alert.low(
source="entry_cycle",
message=(
f"auto-paused until {pause_status.until} "
f"({pause_status.reason or 'no reason'}) — skipping"
),
)
return EntryCycleResult(
status=_STATUS_AUTO_PAUSED,
reason=pause_status.reason or "auto_paused",
)
# Has open position?
conn = connect_state(ctx.db_path)
try:
@@ -364,6 +388,44 @@ async def run_entry_cycle(
)
capital_usd = snap.portfolio_eur * eur_to_usd_rate
# §7-bis (F): rolling drawdown breach evaluation. Se le ultime N
# posizioni chiuse hanno cumulato perdite oltre la soglia, armiamo
# la pausa e usciamo subito (l'entry di questo ciclo è saltata).
auto_cfg = cfg.auto_pause
if auto_cfg.enabled:
conn = connect_state(ctx.db_path)
try:
recent_pnls = ctx.repository.recent_closed_position_pnls_usd(
conn, limit=auto_cfg.lookback_trades
)
finally:
conn.close()
breach = auto_pause_module.evaluate_drawdown_breach(
cfg=auto_cfg,
recent_pnl_usd=recent_pnls,
capital_usd=capital_usd,
)
if breach.should_pause:
until = auto_pause_module.pause_until(when, auto_cfg.pause_weeks)
conn = connect_state(ctx.db_path)
try:
with transaction(conn):
ctx.repository.set_auto_pause(
conn, until=until, reason=breach.reason
)
finally:
conn.close()
await alert.high(
source="entry_cycle",
message=(
f"auto-pause armed: {breach.reason} — paused until {until}"
),
)
return EntryCycleResult(
status=_STATUS_AUTO_PAUSED,
reason=breach.reason or "auto_paused",
)
# 2. Entry filters
entry_ctx = EntryContext(
capital_usd=capital_usd,
@@ -460,7 +522,12 @@ async def run_entry_cycle(
)
quotes = await _build_quotes(ctx.deribit, chain_meta)
selection = select_strikes(
chain=quotes, bias=bias, spot=snap.spot_eth_usd, now=when, cfg=cfg
chain=quotes,
bias=bias,
spot=snap.spot_eth_usd,
now=when,
cfg=cfg,
dvol_now=snap.dvol, # §3.2 (A) — strike picker dipendente dal regime DVOL
)
if selection is None:
await _record_decision(
@@ -0,0 +1,185 @@
"""Periodic option-chain snapshot collector (§13).
Fetches the Deribit option chain for every strike entro la finestra
DTE configurata, prima del trigger entry settimanale (cron
``55 13 * * MON`` di default). Persiste un quote per ogni strumento
in ``option_chain_snapshots`` con un timestamp condiviso, che diventa
il dato di base per:
* il backtest non-stilizzato (vedi ``core/backtest.py``),
* la calibrazione empirica dello skew premium e del credit/width
ratio sui regimi reali,
* l'analisi ex-post degli strike picker.
Il collector è **best-effort**: se ``get_tickers`` fallisce per un
batch, gli altri batch passano comunque; se manca completamente la
chain, il job ritorna 0 senza alzare eccezioni e logga il problema.
Non chiama l'order book per ogni strike (sarebbe troppo costoso) —
``book_depth_top3`` resta NULL nel quote, il liquidity gate del live
lo legge al volo solo per gli strike che gli interessano.
"""
from __future__ import annotations
import asyncio
import logging
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import TYPE_CHECKING, Any
from cerbero_bite.state import connect, transaction
from cerbero_bite.state.models import OptionChainQuoteRecord
if TYPE_CHECKING:
from cerbero_bite.runtime.dependencies import RuntimeContext
__all__ = ["DEFAULT_BATCH_SIZE", "collect_option_chain_snapshot"]
_log = logging.getLogger("cerbero_bite.runtime.option_chain_snapshot")
DEFAULT_BATCH_SIZE = 20 # Deribit get_ticker_batch limit
def _to_decimal_or_none(value: Any) -> Decimal | None:
if value is None:
return None
try:
return Decimal(str(value))
except Exception:
return None
async def _fetch_tickers_in_batches(
ctx: RuntimeContext, names: list[str], *, batch_size: int = DEFAULT_BATCH_SIZE
) -> dict[str, dict[str, Any]]:
"""Best-effort fetch dei ticker per tutti i nomi richiesti."""
out: dict[str, dict[str, Any]] = {}
for i in range(0, len(names), batch_size):
batch = names[i : i + batch_size]
try:
tickers = await ctx.deribit.get_tickers(batch)
except Exception as exc:
_log.warning(
"get_tickers failed for batch starting %s: %s",
batch[0] if batch else "<empty>", exc,
)
continue
for t in tickers:
name = t.get("instrument_name") or t.get("instrument")
if isinstance(name, str):
out[name] = t
return out
async def collect_option_chain_snapshot(
ctx: RuntimeContext,
*,
asset: str = "ETH",
now: datetime | None = None,
batch_size: int = DEFAULT_BATCH_SIZE,
) -> int:
"""Collect + persist a single chain snapshot for ``asset``. Returns
the number of quotes persisted (0 on best-effort failure).
Filtra le scadenze nella finestra ``[dte_min, dte_max]`` di
``cfg.structure`` per non sprecare richieste su scadenze che il
rule engine non userebbe mai.
"""
when = (now or datetime.now(UTC)).astimezone(UTC)
cfg = ctx.cfg
expiry_from = when + timedelta(days=cfg.structure.dte_min)
expiry_to = when + timedelta(days=cfg.structure.dte_max)
try:
chain = await ctx.deribit.options_chain(
currency=asset.upper(),
expiry_from=expiry_from,
expiry_to=expiry_to,
min_open_interest=int(cfg.liquidity.open_interest_min),
)
except Exception:
_log.exception("option chain fetch failed")
return 0
if not chain:
_log.info("option chain empty for %s in window", asset)
return 0
names = [meta.name for meta in chain]
tickers = await _fetch_tickers_in_batches(ctx, names, batch_size=batch_size)
quotes: list[OptionChainQuoteRecord] = []
for meta in chain:
ticker = tickers.get(meta.name)
if ticker is None:
# Lasciamo comunque la riga senza quote: utile sapere
# che lo strumento esisteva.
quotes.append(
OptionChainQuoteRecord(
timestamp=when,
asset=asset.upper(),
instrument_name=meta.name,
strike=meta.strike,
expiry=meta.expiry,
option_type=meta.option_type,
open_interest=int(meta.open_interest)
if meta.open_interest is not None
else None,
)
)
continue
greeks = ticker.get("greeks") or {}
quotes.append(
OptionChainQuoteRecord(
timestamp=when,
asset=asset.upper(),
instrument_name=meta.name,
strike=meta.strike,
expiry=meta.expiry,
option_type=meta.option_type,
bid=_to_decimal_or_none(ticker.get("bid")),
ask=_to_decimal_or_none(ticker.get("ask")),
mid=_to_decimal_or_none(ticker.get("mark_price")),
iv=_to_decimal_or_none(ticker.get("mark_iv")),
delta=_to_decimal_or_none(greeks.get("delta")),
gamma=_to_decimal_or_none(greeks.get("gamma")),
theta=_to_decimal_or_none(greeks.get("theta")),
vega=_to_decimal_or_none(greeks.get("vega")),
open_interest=int(meta.open_interest)
if meta.open_interest is not None
else None,
volume_24h=(
int(ticker["volume_24h"])
if ticker.get("volume_24h") is not None
else None
),
# book_depth_top3: NULL — non lo prendiamo per ogni
# strike per non saturare l'API. Il liquidity gate
# del live lo chiede on-the-fly per gli strike
# candidati al picker.
)
)
persisted = 0
try:
conn = connect(ctx.db_path)
try:
with transaction(conn):
persisted = ctx.repository.record_option_chain_snapshot(
conn, quotes
)
finally:
conn.close()
except Exception:
_log.exception("persist option chain snapshot failed")
return 0
_log.info("option_chain_snapshot persisted %d quote(s)", persisted)
return persisted
# Avoid unused import warning for asyncio in lint when only used as type
_ = asyncio
+21
View File
@@ -34,6 +34,9 @@ from cerbero_bite.runtime.market_snapshot_cycle import (
DEFAULT_ASSETS,
collect_market_snapshot,
)
from cerbero_bite.runtime.option_chain_snapshot_cycle import (
collect_option_chain_snapshot,
)
from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle
from cerbero_bite.runtime.recovery import recover_state
from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler
@@ -53,6 +56,7 @@ _CRON_HEALTH = "*/5 * * * *"
_CRON_BACKUP = "0 * * * *"
_CRON_MANUAL_ACTIONS = "*/1 * * * *"
_CRON_MARKET_SNAPSHOT = "*/15 * * * *"
_CRON_OPTION_CHAIN_SNAPSHOT = "55 13 * * MON" # 5 min prima del trigger entry
_BACKUP_RETENTION_DAYS = 30
@@ -217,6 +221,8 @@ class Orchestrator:
manual_actions_cron: str = _CRON_MANUAL_ACTIONS,
market_snapshot_cron: str = _CRON_MARKET_SNAPSHOT,
market_snapshot_assets: tuple[str, ...] = DEFAULT_ASSETS,
option_chain_cron: str = _CRON_OPTION_CHAIN_SNAPSHOT,
option_chain_asset: str = "ETH",
backup_dir: Path | None = None,
backup_retention_days: int = _BACKUP_RETENTION_DAYS,
) -> AsyncIOScheduler:
@@ -282,6 +288,14 @@ class Orchestrator:
await _safe("market_snapshot", _do)
async def _option_chain_snapshot() -> None:
async def _do() -> None:
await collect_option_chain_snapshot(
self._ctx, asset=option_chain_asset
)
await _safe("option_chain_snapshot", _do)
jobs: list[JobSpec] = [
JobSpec(name="health", cron=health_cron, coro_factory=_health),
JobSpec(name="backup", cron=backup_cron, coro_factory=_backup),
@@ -309,6 +323,13 @@ class Orchestrator:
coro_factory=_market_snapshot,
)
)
jobs.append(
JobSpec(
name="option_chain_snapshot",
cron=option_chain_cron,
coro_factory=_option_chain_snapshot,
)
)
else:
_log.warning(
"data analysis disabled (CERBERO_BITE_ENABLE_DATA_ANALYSIS="
@@ -0,0 +1,14 @@
-- 0004_auto_pause.sql — circuit breaker su drawdown rolling (§7-bis F)
--
-- Aggiunge alla `system_state` il timestamp fino a cui l'engine è in
-- pausa automatica per via di un drawdown sopra soglia. NULL = engine
-- attivo. Quando il valore è nel futuro, il rule engine salta il
-- ciclo entry e logga la motivazione.
--
-- Indipendente dal kill_switch (che resta dedicato a errori tecnici
-- e a comandi manuali esplicitati). Le due tutele coesistono.
ALTER TABLE system_state ADD COLUMN auto_pause_until TEXT;
ALTER TABLE system_state ADD COLUMN auto_pause_reason TEXT;
PRAGMA user_version = 4;
@@ -0,0 +1,42 @@
-- 0004_option_chain_snapshots.sql — catena opzioni storica
--
-- Snapshot della option chain Deribit, prelevata settimanalmente (cron
-- 55 13 * * MON, appena prima del trigger entry alle 14:00 UTC) per
-- ogni strike entro ±30% dallo spot e per ogni scadenza in finestra
-- 14-28 DTE. Dato di base per il backtest non-stilizzato e per
-- calibrare empiricamente lo skew premium del modello BS.
--
-- Granularità: una riga per (snapshot_ts, instrument). Lo
-- snapshot_ts è il timestamp del cron tick — TUTTI i quote raccolti
-- in quello stesso tick condividono il timestamp, così filtrare per
-- "lo snapshot del 2026-05-04 alle 13:55" è una semplice
-- WHERE timestamp = X.
CREATE TABLE option_chain_snapshots (
timestamp TEXT NOT NULL,
asset TEXT NOT NULL,
instrument_name TEXT NOT NULL,
strike TEXT NOT NULL,
expiry TEXT NOT NULL,
option_type TEXT NOT NULL CHECK (option_type IN ('C','P')),
bid TEXT,
ask TEXT,
mid TEXT,
iv TEXT,
delta TEXT,
gamma TEXT,
theta TEXT,
vega TEXT,
open_interest INTEGER,
volume_24h INTEGER,
book_depth_top3 INTEGER,
PRIMARY KEY (timestamp, instrument_name)
) WITHOUT ROWID;
CREATE INDEX idx_option_chain_asset_ts
ON option_chain_snapshots(asset, timestamp DESC);
CREATE INDEX idx_option_chain_expiry
ON option_chain_snapshots(asset, expiry);
PRAGMA user_version = 5;
+33
View File
@@ -22,6 +22,7 @@ __all__ = [
"InstructionRecord",
"ManualAction",
"MarketSnapshotRecord",
"OptionChainQuoteRecord",
"PositionRecord",
"PositionStatus",
"SystemStateRecord",
@@ -148,6 +149,36 @@ class MarketSnapshotRecord(BaseModel):
fetch_errors_json: str | None = None
class OptionChainQuoteRecord(BaseModel):
"""Row of the ``option_chain_snapshots`` table.
One row per (snapshot_ts, instrument) — the same ``timestamp`` is
shared by every quote prelevato nello stesso tick del cron. Tutti
i campi numerici sono opzionali perché il collector è
best-effort: un ticker mancante non invalida il resto della chain.
"""
model_config = ConfigDict(extra="forbid")
timestamp: datetime
asset: str
instrument_name: str
strike: Decimal
expiry: datetime
option_type: Literal["C", "P"]
bid: Decimal | None = None
ask: Decimal | None = None
mid: Decimal | None = None
iv: Decimal | None = None
delta: Decimal | None = None
gamma: Decimal | None = None
theta: Decimal | None = None
vega: Decimal | None = None
open_interest: int | None = None
volume_24h: int | None = None
book_depth_top3: int | None = None
class ManualAction(BaseModel):
"""Row of the ``manual_actions`` table."""
@@ -184,3 +215,5 @@ class SystemStateRecord(BaseModel):
config_version: str
started_at: datetime
last_audit_hash: str | None = None
auto_pause_until: datetime | None = None
auto_pause_reason: str | None = None
+177
View File
@@ -24,6 +24,7 @@ from cerbero_bite.state.models import (
InstructionRecord,
ManualAction,
MarketSnapshotRecord,
OptionChainQuoteRecord,
PositionRecord,
PositionStatus,
SystemStateRecord,
@@ -407,6 +408,103 @@ class Repository:
).fetchall()
return [_row_to_market_snapshot(r) for r in rows]
# ------------------------------------------------------------------
# option_chain_snapshots
# ------------------------------------------------------------------
def record_option_chain_snapshot(
self,
conn: sqlite3.Connection,
quotes: list[OptionChainQuoteRecord],
) -> int:
"""Bulk-insert dei quote di un singolo tick. Tutti i quote
condividono lo stesso ``timestamp``. Idempotente per
(timestamp, instrument_name)."""
if not quotes:
return 0
rows = [
(
_enc_dt(q.timestamp),
q.asset,
q.instrument_name,
_enc_dec(q.strike),
_enc_dt(q.expiry),
q.option_type,
_enc_dec(q.bid),
_enc_dec(q.ask),
_enc_dec(q.mid),
_enc_dec(q.iv),
_enc_dec(q.delta),
_enc_dec(q.gamma),
_enc_dec(q.theta),
_enc_dec(q.vega),
q.open_interest,
q.volume_24h,
q.book_depth_top3,
)
for q in quotes
]
conn.executemany(
"INSERT OR REPLACE INTO option_chain_snapshots("
"timestamp, asset, instrument_name, strike, expiry, option_type, "
"bid, ask, mid, iv, delta, gamma, theta, vega, "
"open_interest, volume_24h, book_depth_top3) "
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
rows,
)
return len(rows)
def list_option_chain_snapshots(
self,
conn: sqlite3.Connection,
*,
asset: str,
start: datetime | None = None,
end: datetime | None = None,
expiry_from: datetime | None = None,
expiry_to: datetime | None = None,
limit: int = 50000,
) -> list[OptionChainQuoteRecord]:
clauses: list[str] = ["asset = ?"]
params: list[Any] = [asset]
if start is not None:
clauses.append("timestamp >= ?")
params.append(_enc_dt(start))
if end is not None:
clauses.append("timestamp <= ?")
params.append(_enc_dt(end))
if expiry_from is not None:
clauses.append("expiry >= ?")
params.append(_enc_dt(expiry_from))
if expiry_to is not None:
clauses.append("expiry <= ?")
params.append(_enc_dt(expiry_to))
params.append(int(limit))
rows = conn.execute(
f"SELECT * FROM option_chain_snapshots "
f"WHERE {' AND '.join(clauses)} "
f"ORDER BY timestamp DESC, instrument_name ASC LIMIT ?",
params,
).fetchall()
return [_row_to_option_chain_quote(r) for r in rows]
def latest_option_chain_timestamp(
self,
conn: sqlite3.Connection,
*,
asset: str,
) -> datetime | None:
"""Timestamp dell'ultimo snapshot raccolto per ``asset``,
utile per stimare la freschezza del dato dalla GUI."""
row = conn.execute(
"SELECT timestamp FROM option_chain_snapshots "
"WHERE asset = ? ORDER BY timestamp DESC LIMIT 1",
(asset,),
).fetchone()
if row is None:
return None
return _dec_dt(row["timestamp"])
# ------------------------------------------------------------------
# manual_actions
# ------------------------------------------------------------------
@@ -488,6 +586,16 @@ class Repository:
last_audit_hash=(
row["last_audit_hash"] if "last_audit_hash" in keys else None
),
auto_pause_until=(
_dec_dt(row["auto_pause_until"])
if "auto_pause_until" in keys
else None
),
auto_pause_reason=(
row["auto_pause_reason"]
if "auto_pause_reason" in keys
else None
),
)
def set_last_audit_hash(
@@ -526,6 +634,43 @@ class Repository:
(_enc_dt(now),),
)
def set_auto_pause(
self,
conn: sqlite3.Connection,
*,
until: datetime | None,
reason: str | None,
) -> None:
"""Imposta o azzera la pausa automatica (§7-bis F).
``until = None`` annulla la pausa (l'engine torna attivo).
Il setter è idempotente: chiamarlo con un until già nel passato
è equivalente a clear.
"""
conn.execute(
"UPDATE system_state SET auto_pause_until = ?, "
"auto_pause_reason = ? WHERE id = 1",
(_enc_dt(until) if until is not None else None, reason),
)
def recent_closed_position_pnls_usd(
self, conn: sqlite3.Connection, *, limit: int
) -> list[Decimal]:
"""Ritorna la lista dei pnl_usd delle ultime ``limit`` posizioni chiuse,
ordinate dalla più recente alla più vecchia. Posizioni con
``pnl_usd`` ``NULL`` (es. chiuse di emergenza senza P/L noto)
sono saltate. Usato dal circuit breaker §7-bis F.
"""
if limit <= 0:
return []
rows = conn.execute(
"SELECT pnl_usd FROM positions "
"WHERE closed_at IS NOT NULL AND pnl_usd IS NOT NULL "
"ORDER BY closed_at DESC LIMIT ?",
(limit,),
).fetchall()
return [Decimal(row["pnl_usd"]) for row in rows]
# ---------------------------------------------------------------------------
# Row → model converters
@@ -645,6 +790,38 @@ def _row_to_market_snapshot(row: sqlite3.Row) -> MarketSnapshotRecord:
)
def _row_to_option_chain_quote(row: sqlite3.Row) -> OptionChainQuoteRecord:
return OptionChainQuoteRecord(
timestamp=_dec_dt_required(row["timestamp"]),
asset=row["asset"],
instrument_name=row["instrument_name"],
strike=_dec_dec_required(row["strike"]),
expiry=_dec_dt_required(row["expiry"]),
option_type=row["option_type"],
bid=_dec_dec(row["bid"]),
ask=_dec_dec(row["ask"]),
mid=_dec_dec(row["mid"]),
iv=_dec_dec(row["iv"]),
delta=_dec_dec(row["delta"]),
gamma=_dec_dec(row["gamma"]),
theta=_dec_dec(row["theta"]),
vega=_dec_dec(row["vega"]),
open_interest=(
int(row["open_interest"])
if row["open_interest"] is not None
else None
),
volume_24h=(
int(row["volume_24h"]) if row["volume_24h"] is not None else None
),
book_depth_top3=(
int(row["book_depth_top3"])
if row["book_depth_top3"] is not None
else None
),
)
def _dec_dec_required(value: Any) -> Decimal:
out = _dec_dec(value)
if out is None:
+28 -7
View File
@@ -28,8 +28,8 @@
# 2× via "ETH + BTC" indicato in `📚 Strategia` è una **stima ex-ante**
# di cosa otterresti DOPO quel lavoro di codice.
config_version: "1.1.0-aggressiva"
config_hash: "58086a4afbbf36c48d22f39bbc75d8145e76a063917431793d3b92ae76b5eb68"
config_version: "1.3.0-aggressiva"
config_hash: "e983e156bf0c270941765e7b9639a35fdc6de7b091076bf5a9b360e294e81e4c"
last_review: "2026-04-26"
last_reviewer: "Adriano"
@@ -65,14 +65,11 @@ entry:
dealer_gamma_min: "0"
dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true
# IV richness gate (§2.9) — abilitato con soglia 3 pt vol.
# Coerente con il profilo aggressivo: size più grande pretende
# win-rate più alto. La soglia 3 va alzata a 5 dopo la
# calibrazione (4-8 settimane di dati raccolti).
# IV richness gate (§2.9) — abilitato a 3 pt vol per profilo aggressivo.
iv_minus_rv_min: "3"
iv_minus_rv_filter_enabled: true
structure:
dte_target: 18
dte_min: 14
@@ -85,6 +82,13 @@ structure:
distance_otm_pct_min: "0.15"
distance_otm_pct_max: "0.25"
# §3.2 (A): step-function delta-target per regime DVOL.
# DVOL bassa (≤50) → più premio; alta (>70) → più safety.
delta_by_dvol:
- {dvol_under: "50", delta_target: "0.15", delta_min: "0.13", delta_max: "0.17"}
- {dvol_under: "70", delta_target: "0.12", delta_min: "0.10", delta_max: "0.15"}
- {dvol_under: "90", delta_target: "0.10", delta_min: "0.08", delta_max: "0.12"}
spread_width:
target_pct_of_spot: "0.04"
min_pct_of_spot: "0.03"
@@ -123,6 +127,14 @@ exit:
delta_breach_threshold: "0.30"
adverse_move_4h_pct: "0.05"
# §7-bis (D): vol-harvest abilitato a 15 punti vol di crollo.
vol_harvest_dvol_decrease: "15"
# §7.1bis (C): scala graduata di profit-take. Pipeline runtime
# non ancora attiva; tenuta vuota fino al merge della
# partial-close pipeline.
profit_take_partial_levels: []
monitor_cron: "0 2,14 * * *"
user_confirmation_timeout_min: 30
@@ -131,6 +143,15 @@ exit:
- "CLOSE_VOL"
- "CLOSE_DELTA"
# §7-bis (F): circuit breaker abilitato. Soglia 15% (più tollerante
# del default conservativo perché la size aggressiva ha volatilità
# attesa più alta).
auto_pause:
enabled: true
lookback_trades: 5
max_drawdown_pct: "0.15"
pause_weeks: 2
execution:
environment: "testnet"
eur_to_usd: "1.075"
+13 -4
View File
@@ -15,8 +15,8 @@
# cerbero-bite config hash --file strategy.conservativa.yaml
# e bumpare config_version.
config_version: "1.1.0-conservativa"
config_hash: "188155fd0017a1353024151b8237f257b0c3156d2592ce89653d239b39fb69ce"
config_version: "1.3.0-conservativa"
config_hash: "900646beb1dd0a7bfaf553f76adb4b55004eff1f094585f779302131625919e8"
last_review: "2026-04-26"
last_reviewer: "Adriano"
@@ -49,11 +49,11 @@ entry:
dealer_gamma_min: "0"
dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true
# IV richness gate (§2.9) — disabilitato finché non calibrato.
# IV richness gate (§2.9). Disabilitato di default.
iv_minus_rv_min: "0"
iv_minus_rv_filter_enabled: false
structure:
dte_target: 18
dte_min: 14
@@ -104,6 +104,9 @@ exit:
delta_breach_threshold: "0.30"
adverse_move_4h_pct: "0.05"
vol_harvest_dvol_decrease: "0"
profit_take_partial_levels: []
monitor_cron: "0 2,14 * * *"
user_confirmation_timeout_min: 30
@@ -112,6 +115,12 @@ exit:
- "CLOSE_VOL"
- "CLOSE_DELTA"
auto_pause:
enabled: false
lookback_trades: 5
max_drawdown_pct: "0.10"
pause_weeks: 2
execution:
environment: "testnet"
eur_to_usd: "1.075"
+19 -7
View File
@@ -6,8 +6,8 @@
# config hash), and lands as a separate commit with the motivation in
# the commit message.
config_version: "1.1.0"
config_hash: "e0504e6936e9ec5013e7901cf98532e29ff2414b1cce10461cfe97790119b724"
config_version: "1.3.0"
config_hash: "178a87467707d54d1ffef2d585a3a01be54de5ccc7e23493356eac47fd1c24d8"
last_review: "2026-04-26"
last_reviewer: "Adriano"
@@ -45,14 +45,11 @@ entry:
dealer_gamma_min: "0"
dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true
# IV richness gate (§2.9). Disabilitato di default: è il filtro
# con maggior impatto sul win-rate ma va calibrato sui dati
# raccolti in `market_snapshots` prima di metterlo in produzione.
# Vedi `docs/13-strategia-spiegata.md` §4-quater.
# IV richness gate (§2.9). Disabilitato di default.
iv_minus_rv_min: "0"
iv_minus_rv_filter_enabled: false
structure:
dte_target: 18
dte_min: 14
@@ -103,6 +100,13 @@ exit:
delta_breach_threshold: "0.30"
adverse_move_4h_pct: "0.05"
# §7-bis (D): vol-collapse harvest. 0 = disabilitato.
vol_harvest_dvol_decrease: "0"
# §7.1bis (C): scala graduata di profit-take. Vuoto = chiusura
# atomica. Pipeline runtime non ancora attiva (hook futuro).
profit_take_partial_levels: []
monitor_cron: "0 2,14 * * *"
user_confirmation_timeout_min: 30
@@ -111,6 +115,14 @@ exit:
- "CLOSE_VOL"
- "CLOSE_DELTA"
# §7-bis (F): circuit breaker su drawdown rolling. Disabilitato di
# default — abilitarlo solo dopo abbastanza posizioni chiuse.
auto_pause:
enabled: false
lookback_trades: 5
max_drawdown_pct: "0.10"
pause_weeks: 2
execution:
environment: "testnet" # testnet|mainnet — kill switch on broker mismatch
eur_to_usd: "1.075" # default FX rate for sizing engine; override at boot
+1
View File
@@ -129,6 +129,7 @@ def test_install_scheduler_registers_canonical_jobs(tmp_path: Path) -> None:
"backup",
"manual_actions",
"market_snapshot",
"option_chain_snapshot",
}
+157
View File
@@ -0,0 +1,157 @@
"""TDD per :mod:`cerbero_bite.runtime.auto_pause` (§7-bis F)."""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
from cerbero_bite.config.schema import AutoPauseConfig
from cerbero_bite.runtime.auto_pause import (
evaluate_drawdown_breach,
is_paused,
pause_until,
)
from cerbero_bite.state.models import SystemStateRecord
_NOW = datetime(2026, 5, 1, 14, 0, tzinfo=UTC)
def _state(**overrides: object) -> SystemStateRecord:
base: dict[str, object] = {
"kill_switch": 0,
"last_health_check": _NOW,
"config_version": "1.0.0",
"started_at": _NOW - timedelta(hours=1),
}
base.update(overrides)
return SystemStateRecord(**base) # type: ignore[arg-type]
# ---------------------------------------------------------------------------
# is_paused
# ---------------------------------------------------------------------------
def test_is_paused_returns_false_when_state_is_none() -> None:
status = is_paused(None, now=_NOW)
assert status.paused is False
def test_is_paused_returns_false_when_until_is_none() -> None:
status = is_paused(_state(), now=_NOW)
assert status.paused is False
def test_is_paused_returns_true_when_until_in_future() -> None:
status = is_paused(
_state(auto_pause_until=_NOW + timedelta(weeks=2),
auto_pause_reason="DD breach"),
now=_NOW,
)
assert status.paused is True
assert status.reason == "DD breach"
def test_is_paused_returns_false_when_until_in_past() -> None:
status = is_paused(
_state(auto_pause_until=_NOW - timedelta(seconds=1)),
now=_NOW,
)
assert status.paused is False
# ---------------------------------------------------------------------------
# pause_until
# ---------------------------------------------------------------------------
def test_pause_until_adds_weeks() -> None:
until = pause_until(_NOW, weeks=2)
assert until == _NOW + timedelta(weeks=2)
def test_pause_until_clamps_to_one_week_minimum() -> None:
# weeks <= 0 deve cmq dare almeno 1 settimana di pausa, altrimenti
# la cron settimanale potrebbe scattare comunque.
assert pause_until(_NOW, weeks=0) == _NOW + timedelta(weeks=1)
assert pause_until(_NOW, weeks=-3) == _NOW + timedelta(weeks=1)
# ---------------------------------------------------------------------------
# evaluate_drawdown_breach
# ---------------------------------------------------------------------------
def _cfg(**overrides: object) -> AutoPauseConfig:
base: dict[str, object] = {
"enabled": True,
"lookback_trades": 5,
"max_drawdown_pct": Decimal("0.10"),
"pause_weeks": 2,
}
base.update(overrides)
return AutoPauseConfig(**base) # type: ignore[arg-type]
def test_drawdown_breach_when_enabled_and_threshold_exceeded() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(),
recent_pnl_usd=[Decimal("-50"), Decimal("-60"), Decimal("-40"),
Decimal("-30"), Decimal("-20")], # cum 200 USD
capital_usd=Decimal("1500"),
)
# |200| / 1500 = 0.133 > 0.10
assert decision.should_pause is True
assert decision.reason is not None
assert "rolling DD" in decision.reason
def test_no_breach_when_filter_disabled() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(enabled=False),
recent_pnl_usd=[Decimal("-200")] * 5, # massacro
capital_usd=Decimal("1500"),
)
assert decision.should_pause is False
def test_no_breach_when_lookback_insufficient() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(lookback_trades=5),
recent_pnl_usd=[Decimal("-100")] * 3, # solo 3 trade, serve 5
capital_usd=Decimal("1500"),
)
assert decision.should_pause is False
def test_no_breach_when_cumulative_positive() -> None:
# Anche con tante perdite, se la somma è positiva non scattiamo.
decision = evaluate_drawdown_breach(
cfg=_cfg(),
recent_pnl_usd=[Decimal("-100"), Decimal("-50"),
Decimal("300"), Decimal("-20"), Decimal("-10")],
capital_usd=Decimal("1500"),
)
assert decision.should_pause is False
def test_no_breach_when_below_threshold() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(),
recent_pnl_usd=[Decimal("-30")] * 5, # cum 150 / 1500 = 10% esatto
capital_usd=Decimal("1500"),
)
# esattamente alla soglia (>=) ⇒ pausa armata
assert decision.should_pause is True
def test_no_breach_when_capital_zero_or_negative() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(),
recent_pnl_usd=[Decimal("-100")] * 5,
capital_usd=Decimal("0"),
)
assert decision.should_pause is False
+259
View File
@@ -0,0 +1,259 @@
"""TDD per :mod:`cerbero_bite.core.backtest`."""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
from cerbero_bite.config import StrategyConfig, golden_config
from cerbero_bite.core.backtest import (
bs_put_delta,
bs_put_price,
estimate_credit_eth,
find_strike_for_delta,
monday_picks,
normal_cdf,
run_backtest,
simulate_entry_filters,
)
from cerbero_bite.state.models import MarketSnapshotRecord
# ---------------------------------------------------------------------------
# Black-Scholes helpers
# ---------------------------------------------------------------------------
def test_normal_cdf_known_values() -> None:
assert normal_cdf(0.0) == pytest.approx(0.5, abs=1e-6)
assert normal_cdf(1.0) == pytest.approx(0.8413, abs=1e-3)
assert normal_cdf(-1.0) == pytest.approx(0.1587, abs=1e-3)
assert normal_cdf(2.0) == pytest.approx(0.9772, abs=1e-3)
def test_bs_put_price_atm_positive_and_less_than_strike() -> None:
p = bs_put_price(spot=3000, strike=3000, t_years=18 / 365, sigma=0.50)
assert p > 0
assert p < 3000 # cap
def test_bs_put_price_far_otm_close_to_zero() -> None:
p = bs_put_price(spot=3000, strike=1500, t_years=18 / 365, sigma=0.50)
assert 0 <= p < 5 # essentially zero
def test_bs_put_delta_atm_around_minus_half() -> None:
d = bs_put_delta(spot=3000, strike=3000, t_years=18 / 365, sigma=0.50)
assert d == pytest.approx(-0.475, abs=0.05)
def test_bs_put_delta_far_otm_close_to_zero() -> None:
d = bs_put_delta(spot=3000, strike=1500, t_years=18 / 365, sigma=0.50)
assert -0.05 < d <= 0
def test_find_strike_for_delta_monotone() -> None:
spot = 3000.0
dvol = 50.0
dte = 18
s_010 = find_strike_for_delta(
spot=spot, dvol_pct=dvol, dte_days=dte, target_delta_abs=0.10,
)
s_020 = find_strike_for_delta(
spot=spot, dvol_pct=dvol, dte_days=dte, target_delta_abs=0.20,
)
# |Δ|=0.20 (più ITM) ⇒ strike più alto di |Δ|=0.10 (più OTM).
assert s_020 > s_010
# Verifica che il delta corrisponda a target ± tolleranza.
achieved = abs(
bs_put_delta(
spot=spot, strike=s_020, t_years=dte / 365, sigma=dvol / 100,
)
)
assert achieved == pytest.approx(0.20, abs=0.02)
def test_estimate_credit_returns_positive_credit_in_normal_regime() -> None:
credit_eth, short_k, long_k = estimate_credit_eth(
spot=3000, dvol_pct=50, dte_days=18, width_pct=0.04, delta_target_abs=0.12,
)
# Sanity: credit > 0, short_k < spot, long_k = short_k - 4%×spot
assert credit_eth > 0
assert short_k < 3000
assert long_k < short_k
assert short_k - long_k == pytest.approx(0.04 * 3000, abs=1.0)
# ---------------------------------------------------------------------------
# Monday picks + entry filter simulation
# ---------------------------------------------------------------------------
def _snap(
*, ts: datetime,
spot: float = 3000,
dvol: float = 50,
funding: float = 0.0,
macro_d: int | None = None,
asset: str = "ETH",
) -> MarketSnapshotRecord:
return MarketSnapshotRecord(
timestamp=ts,
asset=asset,
spot=Decimal(str(spot)),
dvol=Decimal(str(dvol)),
funding_perp_annualized=Decimal(str(funding)),
funding_cross_annualized=Decimal("0"),
dealer_net_gamma=Decimal("100"),
liquidation_long_risk="low",
liquidation_short_risk="low",
macro_days_to_event=macro_d,
fetch_ok=True,
)
def test_monday_picks_extracts_one_per_iso_week() -> None:
monday_2026_05_04 = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
monday_2026_05_11 = datetime(2026, 5, 11, 14, 0, tzinfo=UTC)
snapshots = [
_snap(ts=monday_2026_05_04),
_snap(ts=monday_2026_05_04 + timedelta(minutes=15)), # not picked
_snap(ts=monday_2026_05_11),
]
picks = monday_picks(snapshots)
assert len(picks) == 2
assert picks[0].timestamp == monday_2026_05_04
assert picks[1].timestamp == monday_2026_05_11
def test_monday_picks_skips_other_days_and_hours() -> None:
snapshots = [
_snap(ts=datetime(2026, 5, 4, 13, 0, tzinfo=UTC)), # Monday 13:00
_snap(ts=datetime(2026, 5, 5, 14, 0, tzinfo=UTC)), # Tuesday 14:00
]
assert monday_picks(snapshots) == []
def test_monday_picks_filters_by_asset() -> None:
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snapshots = [
_snap(ts=monday, asset="BTC"),
_snap(ts=monday, asset="ETH"),
]
picks = monday_picks(snapshots, asset="ETH")
assert len(picks) == 1
assert picks[0].snapshot.asset == "ETH"
def test_simulate_entry_filters_accepts_clean_snapshot(
) -> None:
cfg: StrategyConfig = golden_config()
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snap = _snap(ts=monday, dvol=50, funding=0.10)
picks = [
type("MP", (), {"timestamp": monday, "snapshot": snap})() # type: ignore[arg-type]
]
# Hack: build via real dataclass
from cerbero_bite.core.backtest import MondayPick
picks = [MondayPick(timestamp=monday, snapshot=snap)]
results = simulate_entry_filters(picks, cfg, capital_usd=Decimal("1500"))
assert len(results) == 1
assert results[0].accepted is True
def test_simulate_entry_filters_rejects_dvol_out_of_band() -> None:
cfg = golden_config()
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snap = _snap(ts=monday, dvol=20, funding=0.10) # below 35
from cerbero_bite.core.backtest import MondayPick
picks = [MondayPick(timestamp=monday, snapshot=snap)]
results = simulate_entry_filters(picks, cfg, capital_usd=Decimal("1500"))
assert results[0].accepted is False
assert any("dvol" in r.lower() for r in results[0].reasons)
def test_simulate_entry_filters_skips_incomplete_snapshot() -> None:
cfg = golden_config()
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
incomplete = MarketSnapshotRecord(
timestamp=monday, asset="ETH", spot=Decimal("3000"),
# dvol=None ⇒ skipped
fetch_ok=False,
)
from cerbero_bite.core.backtest import MondayPick
picks = [MondayPick(timestamp=monday, snapshot=incomplete)]
results = simulate_entry_filters(picks, cfg, capital_usd=Decimal("1500"))
assert results[0].accepted is False
assert results[0].skipped_for_data is True
# ---------------------------------------------------------------------------
# Full pipeline (sintetico)
# ---------------------------------------------------------------------------
def _synthetic_year_of_snapshots(
*,
n_weeks: int = 8,
spot: float = 3000,
dvol: float = 60, # con skew_premium 1.5 ⇒ credit/width ≈ 35% (sopra soglia 30%)
funding: float = 0.10,
) -> list[MarketSnapshotRecord]:
"""Genera N settimane di snapshot sintetici ETH a 4 tick/settimana."""
rows: list[MarketSnapshotRecord] = []
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
for week in range(n_weeks):
base = monday + timedelta(weeks=week)
# Lunedì 14:00 è il pick
rows.append(_snap(ts=base, spot=spot, dvol=dvol, funding=funding))
# Tick intermedi che NON cadono di lunedì alle 14:00:
# offset +1h così vengono ignorati da `monday_picks`.
for d in (2, 8, 14, 19):
rows.append(
_snap(
ts=base + timedelta(days=d, hours=1),
spot=spot * (1 + 0.005 * d), # +0.5% al giorno
dvol=dvol - 1.5 * d, # vol che scende lentamente
funding=funding,
)
)
return rows
def test_run_backtest_produces_report_with_trades() -> None:
# Per il test scaliamo il credit/width gate al 15%: il modello BS
# senza skew completo sottostima i premi OTM rispetto al reale.
# Vedi `estimate_credit_eth.skew_premium` docstring per dettagli.
from cerbero_bite.config.schema import StructureConfig
cfg = golden_config()
cfg = cfg.model_copy(
update={
"structure": StructureConfig(
**{
**cfg.structure.model_dump(),
"credit_to_width_ratio_min": Decimal("0.15"),
}
)
}
)
snapshots = _synthetic_year_of_snapshots(n_weeks=4)
report = run_backtest(snapshots, cfg, capital_usd=Decimal("1500"))
# Sanity: 4 picks, almeno 1 trade chiuso
assert report.n_picks == 4
assert report.n_completed >= 1
assert report.cumulative_pnl_usd != Decimal("0")
# Bull-put + ETH al rialzo + DVOL che scende ⇒ atteso win
assert report.n_winners >= 1
def test_run_backtest_handles_no_picks_gracefully() -> None:
cfg = golden_config()
# Solo tick infrasettimanali, niente Monday 14:00.
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snapshots = [_snap(ts=monday + timedelta(hours=1))]
report = run_backtest(snapshots, cfg, capital_usd=Decimal("1500"))
assert report.n_picks == 0
assert report.n_completed == 0
assert report.cumulative_pnl_usd == Decimal("0")
+143
View File
@@ -329,3 +329,146 @@ def test_build_bear_call_breakeven_above_short_strike(
# breakeven = 3525 + 15 = 3540
assert proposal.breakeven == Decimal("3540")
assert proposal.spread_type == "bear_call"
# ---------------------------------------------------------------------------
# §3.2 (A): dynamic delta target by DVOL regime
# ---------------------------------------------------------------------------
def _cfg_with_delta_bands(cfg: StrategyConfig) -> StrategyConfig:
"""Profilo con step-function delta su DVOL.
Vol bassa (≤50) → delta 0.15 (più premio), vol media (≤70) →
0.12 (default), vol alta (≤90) → 0.10 (più safety distance).
"""
from cerbero_bite.config.schema import (
DeltaByDvolBand,
ShortStrikeSpec,
StructureConfig,
)
bands = [
DeltaByDvolBand(
dvol_under=Decimal("50"),
delta_target=Decimal("0.15"),
delta_min=Decimal("0.13"),
delta_max=Decimal("0.17"),
),
DeltaByDvolBand(
dvol_under=Decimal("70"),
delta_target=Decimal("0.12"),
delta_min=Decimal("0.10"),
delta_max=Decimal("0.15"),
),
DeltaByDvolBand(
dvol_under=Decimal("90"),
delta_target=Decimal("0.10"),
delta_min=Decimal("0.08"),
delta_max=Decimal("0.12"),
),
]
new_short = ShortStrikeSpec(
**{**cfg.structure.short_strike.model_dump(), "delta_by_dvol": bands}
)
return cfg.model_copy(
update={
"structure": StructureConfig(
**{**cfg.structure.model_dump(exclude={"short_strike"}),
"short_strike": new_short}
)
}
)
def _bull_put_chain_wide(now_dt: datetime) -> list[OptionQuote]:
"""Chain con shorts e longs per delta 0.10, 0.12, 0.15.
I mid sono tarati per superare il credit/width ≥ 30% per ogni
accoppiamento short→long testato (vedi commento §3.4).
"""
return [
# Shorts a delta 0.10 / 0.12 / 0.15 in OTM range [15-25%].
_quote(strike="2535", delta="-0.15", mid="0.026", now_dt=now_dt),
_quote(strike="2475", delta="-0.12", mid="0.020", now_dt=now_dt),
_quote(strike="2400", delta="-0.10", mid="0.015", now_dt=now_dt),
# Long candidati ~4% sotto ciascuno short.
_quote(strike="2415", delta="-0.10", mid="0.012", now_dt=now_dt),
_quote(strike="2355", delta="-0.08", mid="0.006", now_dt=now_dt),
_quote(strike="2280", delta="-0.06", mid="0.002", now_dt=now_dt),
]
def test_dynamic_delta_low_dvol_picks_higher_delta(
cfg: StrategyConfig, now: datetime
) -> None:
"""DVOL=40 → banda con delta_target=0.15."""
cfg_dyn = _cfg_with_delta_bands(cfg)
chain = _bull_put_chain_wide(now)
res = select_strikes(
chain=chain,
bias="bull_put",
spot=Decimal("3000"),
now=now,
cfg=cfg_dyn,
dvol_now=Decimal("40"),
)
assert res is not None
short, _ = res
assert short.delta == Decimal("-0.15")
def test_dynamic_delta_mid_dvol_picks_default_delta(
cfg: StrategyConfig, now: datetime
) -> None:
"""DVOL=60 → banda con delta_target=0.12."""
cfg_dyn = _cfg_with_delta_bands(cfg)
chain = _bull_put_chain_wide(now)
res = select_strikes(
chain=chain,
bias="bull_put",
spot=Decimal("3000"),
now=now,
cfg=cfg_dyn,
dvol_now=Decimal("60"),
)
assert res is not None
short, _ = res
assert short.delta == Decimal("-0.12")
def test_dynamic_delta_high_dvol_picks_lower_delta(
cfg: StrategyConfig, now: datetime
) -> None:
"""DVOL=85 → banda con delta_target=0.10 (più safety distance)."""
cfg_dyn = _cfg_with_delta_bands(cfg)
chain = _bull_put_chain_wide(now)
res = select_strikes(
chain=chain,
bias="bull_put",
spot=Decimal("3000"),
now=now,
cfg=cfg_dyn,
dvol_now=Decimal("85"),
)
assert res is not None
short, _ = res
assert short.delta == Decimal("-0.10")
def test_dynamic_delta_disabled_default_uses_static_delta(
cfg: StrategyConfig, now: datetime
) -> None:
"""delta_by_dvol vuoto (default) → comportamento invariato."""
chain = _bull_put_chain_wide(now)
res = select_strikes(
chain=chain,
bias="bull_put",
spot=Decimal("3000"),
now=now,
cfg=cfg, # golden config: delta_by_dvol=[]
dvol_now=Decimal("40"),
)
assert res is not None
short, _ = res
# Delta target statico = 0.12, quindi torna lo strike a -0.12.
assert short.delta == Decimal("-0.12")
+1 -1
View File
@@ -68,7 +68,7 @@ def test_compute_hash_is_independent_of_recorded_hash_value(tmp_path: Path) -> N
def test_load_repo_strategy_yaml(tmp_path: Path) -> None:
"""The committed strategy.yaml validates with the recorded hash."""
result = load_strategy(REPO_ROOT / "strategy.yaml")
assert result.config.config_version == "1.1.0"
assert result.config.config_version == "1.3.0"
assert result.config.sizing.kelly_fraction == Decimal("0.13")
assert result.computed_hash == result.config.config_hash
+88
View File
@@ -271,3 +271,91 @@ def test_iron_condor_adverse_move_either_direction(cfg: StrategyConfig) -> None:
)
res = evaluate(snap, cfg)
assert res.action == "CLOSE_AVERSE"
# ---------------------------------------------------------------------------
# §7-bis (D): vol-collapse harvest
# ---------------------------------------------------------------------------
def _harvest_cfg(
cfg: StrategyConfig, *, threshold: str = "15"
) -> StrategyConfig:
"""Clona la golden config con la soglia di vol-harvest abilitata."""
from cerbero_bite.config import ExitConfig
return cfg.model_copy(
update={
"exit": ExitConfig(
**{
**cfg.exit.model_dump(),
"vol_harvest_dvol_decrease": Decimal(threshold),
}
)
}
)
def test_vol_harvest_disabled_by_default_does_not_fire(cfg: StrategyConfig) -> None:
# Default: vol_harvest_dvol_decrease = 0 ⇒ filtro disabilitato.
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.022", # in profit (debit < credit)
dvol_at_entry="60",
dvol_now="40", # crollato di 20 punti
)
res = evaluate(snap, cfg)
assert res.action == "HOLD"
def test_vol_harvest_fires_when_dvol_collapsed_in_profit(
cfg: StrategyConfig,
) -> None:
harvest = _harvest_cfg(cfg, threshold="15")
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.022", # in profit ma sopra profit_take 50%
dvol_at_entry="60",
dvol_now="42", # 18, supera la soglia 15
)
res = evaluate(snap, harvest)
assert res.action == "CLOSE_VOL_HARVEST"
assert "harvest" in res.reason
def test_vol_harvest_does_not_fire_when_in_loss(cfg: StrategyConfig) -> None:
# Anche se DVOL crolla, se siamo in perdita non vogliamo harvest:
# è una funzione di "esci con il profitto in mano", non un panico.
harvest = _harvest_cfg(cfg, threshold="15")
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.040", # debit > credit ⇒ in perdita
dvol_at_entry="60",
dvol_now="42",
)
res = evaluate(snap, harvest)
assert res.action != "CLOSE_VOL_HARVEST"
def test_vol_harvest_does_not_fire_below_threshold(cfg: StrategyConfig) -> None:
harvest = _harvest_cfg(cfg, threshold="15")
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.022",
dvol_at_entry="60",
dvol_now="50", # 10, sotto la soglia 15
)
res = evaluate(snap, harvest)
assert res.action == "HOLD"
def test_profit_take_wins_over_vol_harvest(cfg: StrategyConfig) -> None:
# Quando il profit-take è già colpito, non passiamo per vol-harvest.
harvest = _harvest_cfg(cfg, threshold="15")
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.014", # ≤ 50% credit ⇒ profit-take
dvol_at_entry="60",
dvol_now="42",
)
res = evaluate(snap, harvest)
assert res.action == "CLOSE_PROFIT"
@@ -0,0 +1,134 @@
"""TDD per :mod:`cerbero_bite.runtime.option_chain_snapshot_cycle`."""
from __future__ import annotations
from datetime import UTC, datetime
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
import pytest
from cerbero_bite.clients.deribit import InstrumentMeta
from cerbero_bite.runtime.option_chain_snapshot_cycle import (
collect_option_chain_snapshot,
)
from cerbero_bite.state.models import OptionChainQuoteRecord
_NOW = datetime(2026, 5, 4, 13, 55, tzinfo=UTC)
def _meta(name: str, strike: int, expiry_dte: int = 18) -> InstrumentMeta:
expiry = _NOW.replace(hour=8, minute=0, second=0)
expiry = expiry.replace(day=expiry.day) + (
# add days
__import__("datetime").timedelta(days=expiry_dte)
)
return InstrumentMeta(
name=name,
strike=Decimal(str(strike)),
expiry=expiry,
option_type="P",
open_interest=Decimal("100"),
tick_size=Decimal("0.0005"),
min_trade_amount=Decimal("1"),
)
def _ticker(name: str, *, mark: float = 0.020, bid: float = 0.018,
ask: float = 0.022, delta: float = -0.12) -> dict:
return {
"instrument_name": name,
"bid": bid,
"ask": ask,
"mark_price": mark,
"mark_iv": 60.0,
"volume_24h": 50,
"greeks": {
"delta": delta,
"gamma": 0.001,
"theta": -0.0005,
"vega": 0.10,
},
}
@pytest.fixture
def cfg() -> object:
from cerbero_bite.config import golden_config
return golden_config()
@pytest.fixture
def fake_ctx(cfg: object) -> MagicMock:
"""Mock minimal RuntimeContext."""
ctx = MagicMock()
ctx.cfg = cfg
ctx.db_path = ":memory:"
return ctx
@pytest.mark.asyncio
async def test_collector_persists_one_quote_per_instrument(
fake_ctx: MagicMock,
) -> None:
metas = [_meta("ETH-21MAY26-2475-P", 2475), _meta("ETH-21MAY26-2400-P", 2400)]
fake_ctx.deribit.options_chain = AsyncMock(return_value=metas)
fake_ctx.deribit.get_tickers = AsyncMock(
return_value=[_ticker(m.name) for m in metas]
)
persisted: list[list[OptionChainQuoteRecord]] = []
def _record(_conn: object, qs: list[OptionChainQuoteRecord]) -> int:
persisted.append(qs)
return len(qs)
fake_ctx.repository.record_option_chain_snapshot = _record
n = await collect_option_chain_snapshot(fake_ctx, asset="ETH", now=_NOW)
assert n == 2
assert len(persisted) == 1
assert {q.instrument_name for q in persisted[0]} == {
"ETH-21MAY26-2475-P", "ETH-21MAY26-2400-P",
}
# Tutti i quote condividono il timestamp del cron tick.
assert all(q.timestamp == _NOW for q in persisted[0])
@pytest.mark.asyncio
async def test_collector_handles_missing_tickers_with_null_fields(
fake_ctx: MagicMock,
) -> None:
metas = [_meta("ETH-21MAY26-2475-P", 2475)]
fake_ctx.deribit.options_chain = AsyncMock(return_value=metas)
fake_ctx.deribit.get_tickers = AsyncMock(return_value=[]) # vuoto
persisted: list[list[OptionChainQuoteRecord]] = []
def _record(_conn: object, qs: list[OptionChainQuoteRecord]) -> int:
persisted.append(qs)
return len(qs)
fake_ctx.repository.record_option_chain_snapshot = _record
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 1
assert persisted[0][0].mid is None # ticker mancante ⇒ campi NULL
assert persisted[0][0].instrument_name == "ETH-21MAY26-2475-P"
@pytest.mark.asyncio
async def test_collector_returns_zero_when_chain_empty(
fake_ctx: MagicMock,
) -> None:
fake_ctx.deribit.options_chain = AsyncMock(return_value=[])
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 0
@pytest.mark.asyncio
async def test_collector_swallows_chain_fetch_failure(
fake_ctx: MagicMock,
) -> None:
fake_ctx.deribit.options_chain = AsyncMock(side_effect=RuntimeError("boom"))
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 0 # best-effort: non rilancia