3 Commits

Author SHA1 Message Date
root 954baaa354 feat(cli): comando option-chain (trigger + analyze) per la catena opzioni
Espone direttamente da CLI le due operazioni più utili sui dati di
``option_chain_snapshots`` raccolti dal cron settimanale:

- ``cerbero-bite option-chain trigger`` — esegue UNA volta il
  collector della catena. Riusa la stessa pipeline schedulata (cron
  ``55 13 * * MON``) ma on-demand. Utile per popolare il DB senza
  aspettare lunedì.
- ``cerbero-bite option-chain analyze [--bias bull_put|bear_call]`` —
  legge l'ultimo snapshot, simula il selector di strike
  (``select_strikes``) con la strategy passata e stampa una tabella
  con: short/long strike, delta, width, credito reale, ratio
  credit/width, e PASS/FAIL del gate ``credit_to_width_ratio_min``.

Il comando ``analyze`` rende immediatamente actionable la catena
appena raccolta: invece di stime ex-ante via Black-Scholes (modulo
``core/backtest.py``), legge i mid REALI di Deribit e dice "il rule
engine aprirebbe questo trade qui? credit/width ratio passa o no?".

Esempio di output sui primi snapshot raccolti (regime ETH ~2200,
DTE ~14g):

    Snapshot del 2026-05-01T20:53:49 — 21 quote totali
    Il rule engine NON aprirebbe trade con questa catena
    (no strike compatibile coi gate delta/distance/width/credit-ratio).

Conferma empirica del messaggio del documento ``13-strategia-spiegata``:
con delta target 0.12 + width 4% + credit/width ≥ 30%, il regime
attuale di ETH options non è abbastanza ricco per produrre trade —
serve calibrare soglie o aspettare un regime IV più alto.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:57:40 +00:00
root 3e46169278 fix(migrations): rinomina 0004 → 0005 per coesistenza con auto_pause
La migrazione `0004_option_chain_snapshots.sql` collide con quella
parallela `0004_auto_pause.sql` del PR `feat/strategy-improvements-fdac`:
entrambe puntano allo stesso slot e bumpano user_version a 4.

Rinominata a 0005 (con `PRAGMA user_version = 5`) così le due
migrazioni possono coesistere senza conflitti, indipendentemente
dall'ordine di merge dei due PR. Quando i due PR landeranno in main,
basterà conservare la sequenza 0004 (auto_pause) → 0005 (option_chain).

Verificato in locale: deploy con DB già a v4 (post-FDAC) ora applica
correttamente la migrazione e crea la tabella `option_chain_snapshots`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:52:11 +00:00
root c0a0ee416f feat(state+runtime): option_chain_snapshots — catena opzioni storica per backtest reale
Aggiunge la persistence della option chain Deribit con cron settimanale
``55 13 * * MON`` (5 minuti prima del trigger entry alle 14:00 UTC),
sbloccando il backtest non-stilizzato e la calibrazione empirica
dello skew premium.

**Schema (migrazione 0004)**

Nuova tabella ``option_chain_snapshots`` con primary key composta
``(timestamp, instrument_name)`` — tutti i quote prelevati nello
stesso tick condividono il timestamp, così le query "lo snapshot del
2026-05-04 alle 13:55" diventano una singola WHERE timestamp = X.
Indici su (asset, timestamp DESC) e (asset, expiry) per supportare
sia listing recenti sia query per scadenza specifica.

Campi: instrument_name, strike, expiry, option_type (C/P), bid, ask,
mid, iv, delta, gamma, theta, vega, open_interest, volume_24h,
book_depth_top3. Tutti i numerici sono nullable: il collector è
best-effort, un ticker mancante produce comunque una riga (utile
per sapere che lo strumento esisteva ma non era quotato).

**Modello + repository**

- ``OptionChainQuoteRecord`` (Pydantic, in ``state/models.py``).
- ``Repository.record_option_chain_snapshot`` (bulk insert
  idempotente).
- ``Repository.list_option_chain_snapshots`` (filtri su asset,
  timestamp window, expiry window, limit default 50000).
- ``Repository.latest_option_chain_timestamp`` (freshness check
  per dashboard GUI).

**Collector**

Nuovo ``runtime/option_chain_snapshot_cycle.py`` che:

1. Calcola la finestra scadenze ``[now+dte_min, now+dte_max]`` da
   ``cfg.structure``: niente richieste su scadenze che il rule
   engine non userebbe mai.
2. Chiama ``deribit.options_chain()`` con
   ``min_open_interest=cfg.liquidity.open_interest_min``.
3. Batch ``deribit.get_tickers()`` (max 20 per call, limite Deribit)
   con error-isolation per batch — un batch fallito non blocca
   gli altri.
4. NON chiama l'order book per ogni strike (rate-limit guard);
   ``book_depth_top3`` resta NULL e il liquidity gate live lo
   chiede on-the-fly per gli strike candidati al picker.

Best-effort end-to-end: chain assente, get_tickers giù, persist
fallito → ritorna 0 senza alzare eccezioni, logga sempre.

**Schedulazione**

Wired in ``Orchestrator.install_scheduler`` come job parallelo a
``market_snapshot``, attivo solo quando
``ENABLE_DATA_ANALYSIS=true``. Cron parametrizzabile via il nuovo
kwarg ``option_chain_cron`` (default ``55 13 * * MON``).

**Test**

- 4 unit test del collector (happy path, ticker mancante, chain
  vuota, fetch fail best-effort) con mock di RuntimeContext.
- Aggiornato ``test_install_scheduler_registers_canonical_jobs``
  per includere il nuovo job nel set canonico.

**Cosa sblocca**

- Backtest non-stilizzato: il PR ``feat/backtest-engine`` può
  dropparsi il modello BS+skew_premium e leggere prezzi reali
  ``mid`` dalla chain registrata.
- Calibrazione empirica dello skew premium (hardcoded a 1.5 nel
  backtest stilizzato): plot del rapporto fra quote reali Deribit
  e BS per delta/expiry, regressione → valore data-driven.
- Validazione ex-post: "il delta-0.12 era davvero a 25% OTM in
  quella settimana?" diventa una query SELECT.
- Dimensione attesa: ~50 strike × 3 scadenze × 1 snapshot/settimana
  × 17 colonne ≈ 12 KB/settimana, ~600 KB/anno. Trascurabile.

Suite: 409 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:44:49 +00:00
22 changed files with 806 additions and 1102 deletions
+236 -1
View File
@@ -13,7 +13,7 @@ import asyncio
import os import os
import sys import sys
from collections.abc import Callable from collections.abc import Callable
from datetime import UTC, datetime from datetime import UTC, datetime, timedelta
from decimal import Decimal from decimal import Decimal
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@@ -812,6 +812,241 @@ def state_inspect(db: Path) -> None:
console.print(table) console.print(table)
@main.group(name="option-chain")
def option_chain() -> None:
"""Strumenti per la catena opzioni storica (`option_chain_snapshots`)."""
@option_chain.command(name="trigger")
@click.option(
"--strategy",
"strategy_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=_DEFAULT_STRATEGY_PATH,
show_default=True,
)
@click.option(
"--db",
"db_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
@click.option(
"--audit",
"audit_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_AUDIT_PATH,
show_default=True,
)
@click.option(
"--token",
type=str,
default=None,
help="MCP bearer token (override su CERBERO_BITE_MCP_TOKEN).",
)
@click.option("--asset", default="ETH", show_default=True)
def option_chain_trigger(
strategy_path: Path,
db_path: Path,
audit_path: Path,
token: str | None,
asset: str,
) -> None:
"""Esegue UNA volta il collector della catena opzioni e persiste in DB.
Utile per popolare i dati senza aspettare il cron settimanale del
job ``option_chain_snapshot``. Riusa esattamente la stessa pipeline
schedulata.
"""
from cerbero_bite.runtime.dependencies import build_runtime # noqa: PLC0415
from cerbero_bite.runtime.option_chain_snapshot_cycle import ( # noqa: PLC0415
collect_option_chain_snapshot,
)
cfg = load_strategy(strategy_path).config
ctx = build_runtime(
cfg=cfg,
endpoints=load_endpoints(),
token=load_token(value=token),
db_path=db_path,
audit_path=audit_path,
bot_tag=load_bot_tag(),
)
n = asyncio.run(collect_option_chain_snapshot(ctx, asset=asset))
console.print(
f"[green]Persisted {n} option chain quote(s) for {asset}[/green]"
if n > 0
else f"[yellow]No quotes persisted (chain empty or fetch failed)[/yellow]"
)
@option_chain.command(name="analyze")
@click.option(
"--strategy",
"strategy_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=_DEFAULT_STRATEGY_PATH,
show_default=True,
)
@click.option(
"--db",
"db_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
@click.option("--asset", default="ETH", show_default=True)
@click.option(
"--bias",
type=click.Choice(["bull_put", "bear_call"], case_sensitive=False),
default="bull_put",
show_default=True,
help="Direzione da simulare (il rule engine lo deciderebbe da trend×funding).",
)
def option_chain_analyze(
strategy_path: Path,
db_path: Path,
asset: str,
bias: str,
) -> None:
"""Analizza l'ultimo snapshot di catena salvato.
Per la strategia indicata, simula la selezione strike (delta
target, OTM range, width 4%, credit/width ratio min) e mostra:
* lo strike che il rule engine sceglierebbe come short e long,
* credito atteso, larghezza, rapporto credit/width,
* pass/fail del gate `credit_to_width_ratio_min`.
"""
from cerbero_bite.core.combo_builder import select_strikes # noqa: PLC0415
from cerbero_bite.core.types import OptionQuote # noqa: PLC0415
cfg = load_strategy(strategy_path).config
conn = connect_state(db_path)
try:
repo = Repository()
latest_ts = repo.latest_option_chain_timestamp(conn, asset=asset.upper())
if latest_ts is None:
console.print(
"[red]Nessuno snapshot di catena trovato. Lancia prima "
"`cerbero-bite option-chain trigger`.[/red]"
)
sys.exit(1)
quotes_records = repo.list_option_chain_snapshots(
conn, asset=asset.upper(), start=latest_ts, end=latest_ts,
)
finally:
conn.close()
console.print(
f"[cyan]Snapshot del {latest_ts.isoformat()}{len(quotes_records)} "
f"quote totali[/cyan]"
)
# Costruzione OptionQuote da OptionChainQuoteRecord per riusare select_strikes.
quotes: list[OptionQuote] = []
for q in quotes_records:
if q.bid is None or q.ask is None or q.mid is None or q.delta is None:
continue
quotes.append(
OptionQuote(
instrument=q.instrument_name,
strike=q.strike,
expiry=q.expiry,
option_type=q.option_type,
bid=q.bid,
ask=q.ask,
mid=q.mid,
delta=q.delta,
gamma=q.gamma or Decimal("0"),
theta=q.theta or Decimal("0"),
vega=q.vega or Decimal("0"),
open_interest=q.open_interest or 0,
volume_24h=q.volume_24h or 0,
book_depth_top3=q.book_depth_top3 or 0,
)
)
if not quotes:
console.print("[red]Nessun quote completo per la simulazione.[/red]")
sys.exit(1)
# Lo spot al momento dello snapshot: estraiamo dall'ultimo
# `market_snapshot` ETH a quel timestamp (tolleranza ±15 min).
spot = _resolve_spot_at(db_path, asset=asset.upper(), at=latest_ts)
if spot is None:
console.print(
"[yellow]Spot non recuperabile dai market_snapshots; "
"stimato dal mid ATM.[/yellow]"
)
spot = _atm_spot_proxy(quotes)
selection = select_strikes(
chain=quotes,
bias=bias, # type: ignore[arg-type]
spot=spot,
now=latest_ts,
cfg=cfg,
)
if selection is None:
console.print(
"[red]Il rule engine NON aprirebbe trade con questa catena[/red] "
"(no strike compatibile coi gate delta/distance/width/credit-ratio)."
)
sys.exit(0)
short, long_ = selection
width_usd = (short.strike - long_.strike).copy_abs()
credit_eth = short.mid - long_.mid
credit_usd = credit_eth * spot
ratio = credit_usd / width_usd if width_usd > 0 else Decimal("0")
ratio_target = cfg.structure.credit_to_width_ratio_min
table = Table(title=f"Simulazione picker — bias={bias}, spot={spot:.0f}")
table.add_column("Campo", style="cyan")
table.add_column("Valore", style="bold")
table.add_row("Short strike", f"{short.strike} ({short.delta:+.3f}δ)")
table.add_row("Long strike", f"{long_.strike} ({long_.delta:+.3f}δ)")
table.add_row("Width", f"{width_usd:.0f} USD")
table.add_row("Credit", f"{credit_eth:.4f} ETH ≈ {credit_usd:.2f} USD")
table.add_row(
"Credit/width ratio",
f"{ratio:.2%} (gate ≥ {float(ratio_target):.0%})",
)
pass_str = (
"[green]PASS — entry possibile[/green]"
if ratio >= ratio_target
else "[red]FAIL — premio troppo magro[/red]"
)
table.add_row("Verdetto gate ratio", pass_str)
console.print(table)
def _resolve_spot_at(db_path: Path, *, asset: str, at: datetime) -> Decimal | None:
"""Best-effort lookup dello spot al timestamp ``at`` ± 15 min."""
conn = connect_state(db_path)
try:
rows = Repository().list_market_snapshots(
conn,
asset=asset,
start=at - timedelta(minutes=15),
end=at + timedelta(minutes=15),
limit=1,
)
finally:
conn.close()
if not rows:
return None
return rows[0].spot
def _atm_spot_proxy(quotes: list[Any]) -> Decimal:
"""Stima dello spot prendendo lo strike il cui delta è più vicino a 0.5."""
quote = min(quotes, key=lambda q: abs(abs(q.delta) - Decimal("0.5")))
return quote.strike
def _entrypoint() -> None: def _entrypoint() -> None:
"""Wrapper used by ``cerbero-bite`` console script.""" """Wrapper used by ``cerbero-bite`` console script."""
try: try:
-94
View File
@@ -81,17 +81,6 @@ class EntryConfig(BaseModel):
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
class DeltaByDvolBand(BaseModel):
"""Banda della step function delta-target per regime DVOL (§3.2 A)."""
model_config = ConfigDict(frozen=True, extra="forbid")
dvol_under: Decimal
delta_target: Decimal
delta_min: Decimal
delta_max: Decimal
class ShortStrikeSpec(BaseModel): class ShortStrikeSpec(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid") model_config = ConfigDict(frozen=True, extra="forbid")
@@ -101,16 +90,6 @@ class ShortStrikeSpec(BaseModel):
distance_otm_pct_min: Decimal = Field(default=Decimal("0.15")) distance_otm_pct_min: Decimal = Field(default=Decimal("0.15"))
distance_otm_pct_max: Decimal = Field(default=Decimal("0.25")) distance_otm_pct_max: Decimal = Field(default=Decimal("0.25"))
# §3.2 enhancement (A): step function delta-target by DVOL regime.
# Empty list = behaviour invariato (delta_target sopra è il singolo
# valore). Quando popolato, il combo_builder sceglie la prima
# banda ordinata ascending su `dvol_under` con
# `dvol_now ≤ dvol_under`. Esempio:
# - dvol_under=50 → delta 0.15 (bassa vol → più premio)
# - dvol_under=70 → delta 0.12
# - dvol_under=90 → delta 0.10 (alta vol → più safety)
delta_by_dvol: list[DeltaByDvolBand] = Field(default_factory=list)
class SpreadWidthSpec(BaseModel): class SpreadWidthSpec(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid") model_config = ConfigDict(frozen=True, extra="forbid")
@@ -186,25 +165,6 @@ class SizingConfig(BaseModel):
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
class PartialProfitLevel(BaseModel):
"""Livello della scala di profit-take graduale (§7.1bis C).
`mark_at_pct_credit`: il livello è triggerato quando
`mark_combo ≤ mark_at_pct_credit × credito_iniziale` (es. 0.25 =
25% del credito = 75% di profitto sulla porzione chiusa).
`close_pct_of_initial_contracts`: frazione dei contratti aperti
INIZIALMENTE da chiudere a questo livello (es. 0.50 = chiudi metà).
Le frazioni sono cumulative; chiudere oltre i contratti residui
è no-op.
"""
model_config = ConfigDict(frozen=True, extra="forbid")
mark_at_pct_credit: Decimal
close_pct_of_initial_contracts: Decimal
class ExitConfig(BaseModel): class ExitConfig(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid") model_config = ConfigDict(frozen=True, extra="forbid")
@@ -216,29 +176,6 @@ class ExitConfig(BaseModel):
delta_breach_threshold: Decimal = Field(default=Decimal("0.30")) delta_breach_threshold: Decimal = Field(default=Decimal("0.30"))
adverse_move_4h_pct: Decimal = Field(default=Decimal("0.05")) adverse_move_4h_pct: Decimal = Field(default=Decimal("0.05"))
# §7.1ter (D): vol-collapse harvest. Esce in profit anche se il
# profit-take non è ancora colpito quando DVOL è scesa di tot
# punti rispetto all'entry (edge IV-RV catturato, vol attesa già
# rientrata). 0 = filtro disabilitato.
vol_harvest_dvol_decrease: Decimal = Field(default=Decimal("0"))
# §7.1bis (C): scala graduata di profit-take. Lista vuota =
# comportamento invariato (chiusura atomica al
# `profit_take_pct_of_credit`). Quando popolata, l'engine
# interpreta come "chiudi N% dei contratti iniziali al livello
# di mark M%×credito". Le entry sono ordinate dal mark più alto
# (più profit, livello triggerato prima) al più basso. Vedi
# `core/exit_decision.py` per la semantica esatta.
#
# ATTENZIONE: questa funzione richiede il supporto di chiusure
# parziali nel runtime (entry_cycle / repository / clients).
# Fino al merge della partial-close pipeline, l'engine la mappa
# a CLOSE_PROFIT atomico al primo livello triggerato (vedi
# commento in `evaluate`). Default vuoto = no-op.
profit_take_partial_levels: list[PartialProfitLevel] = Field(
default_factory=list
)
monitor_cron: str = "0 2,14 * * *" monitor_cron: str = "0 2,14 * * *"
user_confirmation_timeout_min: int = 30 user_confirmation_timeout_min: int = 30
escalate_on_timeout: list[str] = Field( escalate_on_timeout: list[str] = Field(
@@ -246,36 +183,6 @@ class ExitConfig(BaseModel):
) )
# ---------------------------------------------------------------------------
# Auto-pause (F): circuit breaker su drawdown rolling
# ---------------------------------------------------------------------------
class AutoPauseConfig(BaseModel):
"""Configurazione del circuit breaker su drawdown.
Quando abilitato, il rule engine valuta — prima di ogni entry —
il P/L cumulato delle ultime `lookback_trades` posizioni chiuse
in proporzione al capitale attuale. Se la perdita supera la
soglia, l'engine si auto-mette in pausa per `pause_weeks`
settimane (skip-week). La pausa si annulla automaticamente alla
scadenza, oppure manualmente via comando dalla GUI.
Difende da regime change non rilevati dai filtri quant: se i
filtri stanno fallendo sistematicamente, vale la pena fermarsi
e attendere che le condizioni cambino, invece di continuare a
sanguinare. È un'estensione conservativa del kill switch
(che oggi reagisce solo a errori tecnici).
"""
model_config = ConfigDict(frozen=True, extra="forbid")
enabled: bool = False
lookback_trades: int = 5
max_drawdown_pct: Decimal = Field(default=Decimal("0.10"))
pause_weeks: int = 2
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Kelly recalibration # Kelly recalibration
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -349,7 +256,6 @@ class StrategyConfig(BaseModel):
sizing: SizingConfig = Field(default_factory=SizingConfig) sizing: SizingConfig = Field(default_factory=SizingConfig)
exit: ExitConfig = Field(default_factory=ExitConfig) exit: ExitConfig = Field(default_factory=ExitConfig)
kelly_recalibration: KellyConfig = Field(default_factory=KellyConfig) kelly_recalibration: KellyConfig = Field(default_factory=KellyConfig)
auto_pause: AutoPauseConfig = Field(default_factory=AutoPauseConfig)
execution: ExecutionConfig = Field(default_factory=ExecutionConfig) execution: ExecutionConfig = Field(default_factory=ExecutionConfig)
monitoring: MonitoringConfig = Field(default_factory=MonitoringConfig) monitoring: MonitoringConfig = Field(default_factory=MonitoringConfig)
+3 -27
View File
@@ -83,49 +83,26 @@ def _pick_expiry(
return min(candidates, key=lambda exp: abs(candidates[exp] - sc.dte_target)) return min(candidates, key=lambda exp: abs(candidates[exp] - sc.dte_target))
def _resolve_delta_band(
sc: object, dvol_now: Decimal | None
) -> tuple[Decimal, Decimal, Decimal]:
"""Return (delta_target, delta_min, delta_max) per il regime DVOL corrente.
Quando ``sc.delta_by_dvol`` è popolato e ``dvol_now`` è disponibile,
sceglie la prima banda (ordinata ascending sulla ``dvol_under``) il
cui ``dvol_under ≥ dvol_now``. Altrimenti torna ai valori statici di
``sc``.
"""
bands = list(getattr(sc, "delta_by_dvol", []) or [])
if dvol_now is not None and bands:
bands_sorted = sorted(bands, key=lambda b: b.dvol_under)
for band in bands_sorted:
if dvol_now <= band.dvol_under:
return band.delta_target, band.delta_min, band.delta_max
last = bands_sorted[-1]
return last.delta_target, last.delta_min, last.delta_max
return sc.delta_target, sc.delta_min, sc.delta_max
def _select_short( def _select_short(
quotes: list[OptionQuote], quotes: list[OptionQuote],
*, *,
spot: Decimal, spot: Decimal,
cfg: StrategyConfig, cfg: StrategyConfig,
dvol_now: Decimal | None = None,
) -> OptionQuote | None: ) -> OptionQuote | None:
"""Pick the short-leg quote with delta closest to target inside both bands.""" """Pick the short-leg quote with delta closest to target inside both bands."""
sc = cfg.structure.short_strike sc = cfg.structure.short_strike
delta_target, delta_min, delta_max = _resolve_delta_band(sc, dvol_now)
eligible: list[OptionQuote] = [] eligible: list[OptionQuote] = []
for q in quotes: for q in quotes:
dist = (q.strike - spot).copy_abs() / spot dist = (q.strike - spot).copy_abs() / spot
if not (sc.distance_otm_pct_min <= dist <= sc.distance_otm_pct_max): if not (sc.distance_otm_pct_min <= dist <= sc.distance_otm_pct_max):
continue continue
abs_delta = q.delta.copy_abs() abs_delta = q.delta.copy_abs()
if not (delta_min <= abs_delta <= delta_max): if not (sc.delta_min <= abs_delta <= sc.delta_max):
continue continue
eligible.append(q) eligible.append(q)
if not eligible: if not eligible:
return None return None
return min(eligible, key=lambda q: abs(q.delta.copy_abs() - delta_target)) return min(eligible, key=lambda q: abs(q.delta.copy_abs() - sc.delta_target))
def _select_long( def _select_long(
@@ -166,7 +143,6 @@ def select_strikes(
spot: Decimal, spot: Decimal,
now: datetime, now: datetime,
cfg: StrategyConfig, cfg: StrategyConfig,
dvol_now: Decimal | None = None,
) -> tuple[OptionQuote, OptionQuote] | None: ) -> tuple[OptionQuote, OptionQuote] | None:
"""Return the (short, long) quotes for the requested vertical, or ``None``. """Return the (short, long) quotes for the requested vertical, or ``None``.
@@ -185,7 +161,7 @@ def select_strikes(
if not typed: if not typed:
return None return None
short = _select_short(typed, spot=spot, cfg=cfg, dvol_now=dvol_now) short = _select_short(typed, spot=spot, cfg=cfg)
if short is None: if short is None:
return None return None
-18
View File
@@ -28,10 +28,8 @@ __all__ = ["ExitAction", "ExitDecisionResult", "PositionSnapshot", "evaluate"]
ExitAction = Literal[ ExitAction = Literal[
"HOLD", "HOLD",
"CLOSE_PROFIT", "CLOSE_PROFIT",
"CLOSE_PROFIT_PARTIAL",
"CLOSE_STOP", "CLOSE_STOP",
"CLOSE_VOL", "CLOSE_VOL",
"CLOSE_VOL_HARVEST",
"CLOSE_TIME", "CLOSE_TIME",
"CLOSE_DELTA", "CLOSE_DELTA",
"CLOSE_AVERSE", "CLOSE_AVERSE",
@@ -117,22 +115,6 @@ def evaluate(snapshot: PositionSnapshot, cfg: StrategyConfig) -> ExitDecisionRes
f"mark {debit}{ec.profit_take_pct_of_credit:.0%} of credit {credit}", f"mark {debit}{ec.profit_take_pct_of_credit:.0%} of credit {credit}",
) )
# 1bis. Vol-collapse harvest (D): siamo IN profit (debit < credit) e
# la DVOL è scesa di tot punti rispetto all'entry. Edge IV-RV già
# catturato, non c'è motivo di tenere fino a profit_take. Esce
# opportunisticamente quando il regime di vol che giustificava
# l'entry non c'è più.
if (
ec.vol_harvest_dvol_decrease > 0
and debit < credit
and snapshot.dvol_now <= snapshot.dvol_at_entry - ec.vol_harvest_dvol_decrease
):
return _result(
"CLOSE_VOL_HARVEST",
f"DVOL {snapshot.dvol_now} ≤ entry {snapshot.dvol_at_entry} "
f"{ec.vol_harvest_dvol_decrease}, harvest while in profit",
)
# 2. Stop loss # 2. Stop loss
if debit >= stop_thresh: if debit >= stop_thresh:
return _result( return _result(
+15 -213
View File
@@ -347,44 +347,6 @@ def _profile_caps(strategy: object | None) -> dict[str, float]:
return out return out
def _detect_features(strategy: object | None) -> dict[str, bool]:
"""Quali miglioramenti del PR FDAC sono ATTIVI in questa strategia.
- **A** (delta dinamico): `short_strike.delta_by_dvol` non vuoto.
- **D** (vol-harvest): `exit.vol_harvest_dvol_decrease > 0`.
- **F** (auto-pause): `auto_pause.enabled = true`.
- **IV** (IV-richness gate, dal PR precedente): `entry.iv_minus_rv_filter_enabled`.
"""
feats = {"A": False, "D": False, "F": False, "IV": False}
if strategy is None:
return feats
try:
feats["A"] = bool(
getattr(strategy.structure.short_strike, "delta_by_dvol", []) # type: ignore[attr-defined]
)
except Exception:
pass
try:
feats["D"] = (
float(getattr(strategy.exit, "vol_harvest_dvol_decrease", 0)) > 0 # type: ignore[attr-defined]
)
except Exception:
pass
try:
feats["F"] = bool(
getattr(getattr(strategy, "auto_pause", None), "enabled", False)
)
except Exception:
pass
try:
feats["IV"] = bool(
getattr(strategy.entry, "iv_minus_rv_filter_enabled", False) # type: ignore[attr-defined]
)
except Exception:
pass
return feats
def _compute_pl( def _compute_pl(
caps: dict[str, float], caps: dict[str, float],
*, *,
@@ -393,56 +355,13 @@ def _compute_pl(
win_rate: float, win_rate: float,
trades_per_year: int, trades_per_year: int,
eur_to_usd: float = 1.075, eur_to_usd: float = 1.075,
features: dict[str, bool] | None = None,
) -> dict[str, float]: ) -> dict[str, float]:
"""Calcola le metriche P/L per un profilo di sizing. """Calcola le metriche P/L per un profilo di sizing."""
Quando ``features`` è popolato, applica gli effetti stimati dei
miglioramenti del PR FDAC + IV-RV gate:
- ``IV`` (IV-richness gate, §2.9): +5 pp win-rate, 25% trade/anno.
- ``A`` (delta dinamico, §3.2): +1.5 pp win-rate, sl_loss × 0.95.
- ``D`` (vol-harvest, §7-bis): 5% delle would-be-loss diventano
harvest exit a +0.20 × credito.
- ``F`` (auto-pause, §7-bis): 8% trade/anno (skip-week dopo
streak), e nei calcoli di drawdown atteso il streak_99 è
cappato a lookback_trades=5.
Effetti **stimati ex-ante** dalla letteratura short-vol systematic;
i valori puntuali andranno calibrati sul dataset accumulato.
"""
feats = features or {}
width = caps["width_pct"] * spot width = caps["width_pct"] * spot
credit = caps["credit_ratio"] * width credit = caps["credit_ratio"] * width
tp_profit = caps["profit_take"] * credit tp_profit = caps["profit_take"] * credit
sl_loss = (caps["stop_mult"] - 1.0) * credit sl_loss = (caps["stop_mult"] - 1.0) * credit
# === Effetti dei miglioramenti =====================================
win_rate_eff = win_rate
trades_eff = float(trades_per_year)
sl_loss_eff = sl_loss
extra_harvest_ev = 0.0
prob_harvest = 0.0
if feats.get("IV"):
# Skip più aggressivo + qualità migliore: +5 pp win, 25% trade.
win_rate_eff = min(0.95, win_rate_eff + 0.05)
trades_eff *= 0.75
if feats.get("A"):
# Migliore strike picking → +1.5 pp win-rate; riduzione del
# tail della perdita (5%) per le bande high-DVOL.
win_rate_eff = min(0.95, win_rate_eff + 0.015)
sl_loss_eff *= 0.95
if feats.get("D"):
# Vol-harvest: ~5% delle entrate intercettate prima dello stop
# con un piccolo profitto (+0.20×credit). Sottrae lo stesso
# volume dalle prob_loss.
prob_harvest = 0.05
extra_harvest_ev = 0.20 * credit
# F (auto-pause) agisce su streak_99 più sotto, e sul trades_eff.
if feats.get("F"):
trades_eff *= 0.92
cap_pertrade_usd = caps["cap_pertrade_eur"] * eur_to_usd cap_pertrade_usd = caps["cap_pertrade_eur"] * eur_to_usd
risk_target = min(caps["kelly"] * capital, cap_pertrade_usd) risk_target = min(caps["kelly"] * capital, cap_pertrade_usd)
n_kelly = int(risk_target // width) if width > 0 else 0 n_kelly = int(risk_target // width) if width > 0 else 0
@@ -450,31 +369,32 @@ def _compute_pl(
prob_time_stop = 0.07 prob_time_stop = 0.07
prob_other_stop = 0.03 prob_other_stop = 0.03
prob_loss = max( prob_loss = max(0.0, 1.0 - win_rate - prob_time_stop - prob_other_stop)
0.0,
1.0 - win_rate_eff - prob_time_stop - prob_other_stop - prob_harvest,
)
avg_time_stop_pl = 0.10 * credit avg_time_stop_pl = 0.10 * credit
e_trade_gross = ( e_trade_gross = (
win_rate_eff * tp_profit win_rate * tp_profit
- prob_loss * sl_loss_eff - prob_loss * sl_loss
+ prob_time_stop * avg_time_stop_pl + prob_time_stop * avg_time_stop_pl
+ prob_harvest * extra_harvest_ev
) )
fees = 0.0003 * spot * 2 fees = 0.0003 * spot * 2
slippage = 0.03 * credit slippage = 0.03 * credit
e_trade_net = e_trade_gross - fees - slippage e_trade_net = e_trade_gross - fees - slippage
# Multi-posizione concorrente: il P/L scala col numero di posizioni
# aperte simultaneamente (il loop entry crea N trade indipendenti
# quando max_concurrent > 1). Vedi caveat aggressiva.yaml: il
# supporto multi-asset richiede modifiche di codice; questo
# moltiplicatore stima cosa otterresti DOPO.
concurrency = max(1.0, caps["max_concurrent"]) concurrency = max(1.0, caps["max_concurrent"])
annual_pl = trades_eff * n_per_trade * concurrency * e_trade_net annual_pl = trades_per_year * n_per_trade * concurrency * e_trade_net
apr = (annual_pl / capital) if capital > 0 else 0.0 apr = (annual_pl / capital) if capital > 0 else 0.0
return { return {
"width": width, "width": width,
"credit": credit, "credit": credit,
"tp_profit": tp_profit, "tp_profit": tp_profit,
"sl_loss": sl_loss_eff, "sl_loss": sl_loss,
"risk_target": risk_target, "risk_target": risk_target,
"n_per_trade": float(n_per_trade), "n_per_trade": float(n_per_trade),
"concurrency": concurrency, "concurrency": concurrency,
@@ -483,10 +403,6 @@ def _compute_pl(
"apr": apr, "apr": apr,
"fees": fees, "fees": fees,
"slippage": slippage, "slippage": slippage,
"win_rate_eff": win_rate_eff,
"trades_eff": trades_eff,
"prob_loss": prob_loss,
"prob_harvest": prob_harvest,
} }
@@ -495,8 +411,6 @@ def _render_profile_card(
caps: dict[str, float], caps: dict[str, float],
metrics: dict[str, float], metrics: dict[str, float],
badge: str, badge: str,
features: dict[str, bool] | None = None,
metrics_base: dict[str, float] | None = None,
) -> None: ) -> None:
"""Rendering di un profilo (conservativo o aggressivo) in una colonna.""" """Rendering di un profilo (conservativo o aggressivo) in una colonna."""
st.markdown(f"### {label} {badge}") st.markdown(f"### {label} {badge}")
@@ -506,58 +420,23 @@ def _render_profile_card(
f"max {caps['max_n']:.0f} contratti × " f"max {caps['max_n']:.0f} contratti × "
f"{caps['max_concurrent']:.0f} pos. concorrenti" f"{caps['max_concurrent']:.0f} pos. concorrenti"
) )
if features:
active = [k for k, v in features.items() if v]
if active:
st.caption(
"🟢 Miglioramenti attivi: "
+ " · ".join(
{
"IV": "**IV-RV gate**",
"A": "**A** delta dinamico",
"D": "**D** vol-harvest",
"F": "**F** auto-pause",
}.get(k, k)
for k in active
)
)
else:
st.caption("⚪ Nessun miglioramento attivo (formula base)")
cols = st.columns(2) cols = st.columns(2)
cols[0].metric("Contratti per trade", f"{metrics['n_per_trade']:.0f}") cols[0].metric("Contratti per trade", f"{metrics['n_per_trade']:.0f}")
cols[1].metric("Posizioni concorrenti", f"{metrics['concurrency']:.0f}") cols[1].metric("Posizioni concorrenti", f"{metrics['concurrency']:.0f}")
cols = st.columns(2) cols = st.columns(2)
e_delta = (
f"{metrics['e_trade_net'] - metrics_base['e_trade_net']:+.1f}"
if metrics_base
else None
)
pl_delta = (
f"{metrics['annual_pl'] - metrics_base['annual_pl']:+.0f} USD vs base"
if metrics_base
else f"{metrics['apr']:+.1%} APR"
)
cols[0].metric( cols[0].metric(
"E[trade] netto", "E[trade] netto",
f"{metrics['e_trade_net']:+.1f} USD", f"{metrics['e_trade_net']:+.1f} USD",
delta=e_delta,
help=( help=(
f"win_rate effettivo={metrics['win_rate_eff']:.0%}, " f"fees={metrics['fees']:.2f} USD, "
f"prob_loss={metrics['prob_loss']:.0%}, " f"slippage={metrics['slippage']:.2f} USD"
f"trade/anno={metrics['trades_eff']:.0f}"
), ),
) )
cols[1].metric( cols[1].metric(
"P/L annuo stimato", "P/L annuo stimato",
f"{metrics['annual_pl']:+.0f} USD", f"{metrics['annual_pl']:+.0f} USD",
delta=f"{metrics['apr']:+.1%} APR" + ( delta=f"{metrics['apr']:+.1%} APR",
f" ({metrics['annual_pl'] - metrics_base['annual_pl']:+.0f} vs base)"
if metrics_base
else ""
),
) )
if metrics["n_per_trade"] == 0: if metrics["n_per_trade"] == 0:
@@ -602,45 +481,12 @@ def _render_pl_panel(
cons_caps = _profile_caps(strategy_conservativa or strategy_main) cons_caps = _profile_caps(strategy_conservativa or strategy_main)
aggr_caps = _profile_caps(strategy_aggressiva) aggr_caps = _profile_caps(strategy_aggressiva)
cons_feats = _detect_features(strategy_conservativa or strategy_main)
aggr_feats = _detect_features(strategy_aggressiva)
apply_features = st.checkbox(
"Applica gli effetti dei miglioramenti FDAC + IV-RV gate "
"letti dai due `strategy.*.yaml`",
value=True,
help=(
"Quando ON, ogni colonna applica gli effetti stimati delle "
"feature attive nel rispettivo profilo. OFF = formula base "
"(senza miglioramenti) per confronto pulito."
),
)
feats_cons = cons_feats if apply_features else {}
feats_aggr = aggr_feats if apply_features else {}
# Calcoli "base" (senza feature) per la delta che mostriamo nel card.
cons_base = _compute_pl(
cons_caps,
capital=capital,
spot=spot,
win_rate=win_rate,
trades_per_year=trades_per_year,
)
aggr_base = _compute_pl(
aggr_caps,
capital=capital,
spot=spot,
win_rate=win_rate,
trades_per_year=trades_per_year,
)
cons = _compute_pl( cons = _compute_pl(
cons_caps, cons_caps,
capital=capital, capital=capital,
spot=spot, spot=spot,
win_rate=win_rate, win_rate=win_rate,
trades_per_year=trades_per_year, trades_per_year=trades_per_year,
features=feats_cons,
) )
aggr = _compute_pl( aggr = _compute_pl(
aggr_caps, aggr_caps,
@@ -648,7 +494,6 @@ def _render_pl_panel(
spot=spot, spot=spot,
win_rate=win_rate, win_rate=win_rate,
trades_per_year=trades_per_year, trades_per_year=trades_per_year,
features=feats_aggr,
) )
col_cons, col_aggr = st.columns(2) col_cons, col_aggr = st.columns(2)
@@ -657,9 +502,7 @@ def _render_pl_panel(
"🛡️ Conservativa", "🛡️ Conservativa",
cons_caps, cons_caps,
cons, cons,
"_(golden config v1.2.0)_", "_(golden config v1.0.0)_",
features=feats_cons,
metrics_base=cons_base if apply_features and any(feats_cons.values()) else None,
) )
with col_aggr: with col_aggr:
_render_profile_card( _render_profile_card(
@@ -667,8 +510,6 @@ def _render_pl_panel(
aggr_caps, aggr_caps,
aggr, aggr,
"_(deroga §11, richiede paper trading)_", "_(deroga §11, richiede paper trading)_",
features=feats_aggr,
metrics_base=aggr_base if apply_features and any(feats_aggr.values()) else None,
) )
if aggr["annual_pl"] > 0 and cons["annual_pl"] > 0: if aggr["annual_pl"] > 0 and cons["annual_pl"] > 0:
@@ -689,43 +530,6 @@ def _render_pl_panel(
"viable." "viable."
) )
# === Mini-tabella: contributo marginale di ogni feature =====
if apply_features and (any(feats_cons.values()) or any(feats_aggr.values())):
st.markdown("**Contributo marginale di ogni feature** (profilo aggressivo)")
contrib_rows = []
for label, key in [
("IV — IV-richness gate", "IV"),
("A — Delta dinamico", "A"),
("D — Vol-harvest", "D"),
("F — Auto-pause", "F"),
]:
single_feat = {key: True}
m = _compute_pl(
aggr_caps,
capital=capital,
spot=spot,
win_rate=win_rate,
trades_per_year=trades_per_year,
features=single_feat,
)
delta_pl = m["annual_pl"] - aggr_base["annual_pl"]
delta_apr = m["apr"] - aggr_base["apr"]
active = "" if aggr_feats.get(key) else ""
contrib_rows.append(
{
"Feature": label,
"Attiva nel YAML": active,
"ΔP/L annuo (solo questa)": f"{delta_pl:+.0f} USD",
"ΔAPR": f"{delta_apr:+.1%}",
}
)
st.table(contrib_rows)
st.caption(
"Ogni riga mostra il contributo del SINGOLO feature (le altre "
"spente). Effetti stimati ex-ante; calibrabili sui dati "
"raccolti via `📐 Calibrazione`."
)
# Sensibilità win-rate per il profilo aggressivo (più informativo) # Sensibilità win-rate per il profilo aggressivo (più informativo)
st.markdown("**Sensibilità al win rate** (profilo aggressivo)") st.markdown("**Sensibilità al win rate** (profilo aggressivo)")
sens_rows = [] sens_rows = []
@@ -736,7 +540,6 @@ def _render_pl_panel(
spot=spot, spot=spot,
win_rate=wr, win_rate=wr,
trades_per_year=trades_per_year, trades_per_year=trades_per_year,
features=feats_aggr,
) )
m_c = _compute_pl( m_c = _compute_pl(
cons_caps, cons_caps,
@@ -744,7 +547,6 @@ def _render_pl_panel(
spot=spot, spot=spot,
win_rate=wr, win_rate=wr,
trades_per_year=trades_per_year, trades_per_year=trades_per_year,
features=feats_cons,
) )
sens_rows.append( sens_rows.append(
{ {
-175
View File
@@ -1,175 +0,0 @@
"""Auto-pause circuit breaker (§7-bis F).
Pure-function evaluation that consults `system_state.auto_pause_until`
and the rolling P/L of the last N closed positions to decide whether
the engine should skip an entry cycle.
Two responsibilities, both deterministic at call time:
* :func:`is_paused` — returns ``True`` when the persisted
``auto_pause_until`` is in the future. Independent from the kill
switch, which targets technical errors.
* :func:`evaluate_drawdown_breach` — given the last N closed P/Ls and
the current capital, returns whether the rolling drawdown breached
the configured ``max_drawdown_pct`` threshold. The orchestrator
layer is the one that flips the persisted state on breach (this
module stays I/O-free for testability).
The two are separated on purpose: ``is_paused`` is the cheap,
read-only gate consulted at the start of every entry cycle; the
breach evaluation runs once per cycle right after the entry
filtering, before the entry is actually placed.
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta
from decimal import Decimal
from cerbero_bite.config.schema import AutoPauseConfig
from cerbero_bite.state.models import SystemStateRecord
__all__ = [
"AutoPauseDecision",
"PauseStatus",
"evaluate_drawdown_breach",
"is_paused",
"pause_until",
]
@dataclass(frozen=True)
class PauseStatus:
"""Snapshot del flag di auto-pausa al momento della valutazione."""
paused: bool
until: datetime | None
reason: str | None
@dataclass(frozen=True)
class AutoPauseDecision:
"""Esito di :func:`evaluate_drawdown_breach`."""
should_pause: bool
cumulative_pnl_usd: Decimal
drawdown_pct: Decimal
threshold_pct: Decimal
reason: str | None
def is_paused(
state: SystemStateRecord | None, *, now: datetime
) -> PauseStatus:
"""Restituisce lo stato della pausa rispetto a ``now``.
``state == None`` o ``auto_pause_until == None`` o
``auto_pause_until <= now`` ⇒ engine attivo.
"""
if state is None or state.auto_pause_until is None:
return PauseStatus(paused=False, until=None, reason=None)
until = state.auto_pause_until
if until.tzinfo is not None and now.tzinfo is None:
# Coerenza: se il valore persistito è tz-aware, normalizziamo.
return PauseStatus(
paused=until > now.replace(tzinfo=until.tzinfo),
until=until,
reason=state.auto_pause_reason,
)
return PauseStatus(
paused=until > now,
until=until,
reason=state.auto_pause_reason,
)
def pause_until(now: datetime, weeks: int) -> datetime:
"""Calcola la scadenza della pausa (``now + weeks``).
Estratto in funzione separata per facilitare i test e per ricordare
che la pausa è espressa in **settimane** (la strategia ha cron
settimanale; pause più corte non avrebbero modo di evitare una
settimana di entry).
"""
return now + timedelta(weeks=max(1, weeks))
def evaluate_drawdown_breach(
*,
cfg: AutoPauseConfig,
recent_pnl_usd: list[Decimal],
capital_usd: Decimal,
) -> AutoPauseDecision:
"""Decide se la pausa va armata ora dato il rolling P/L.
Regola: se la somma dei P/L delle ultime ``cfg.lookback_trades``
posizioni chiuse è negativa e in valore assoluto eccede
``cfg.max_drawdown_pct × capital_usd``, ritorna
``should_pause=True``. Tutte le altre condizioni → False.
``cfg.enabled=False`` → ritorna sempre False (filtro disabilitato).
Lookback insufficiente → ritorna False (non scattiamo finché non
abbiamo abbastanza storia per giudicare).
"""
threshold_pct = cfg.max_drawdown_pct
cumulative = sum((p for p in recent_pnl_usd), start=Decimal("0"))
if not cfg.enabled:
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=Decimal("0"),
threshold_pct=threshold_pct,
reason=None,
)
if len(recent_pnl_usd) < cfg.lookback_trades:
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=Decimal("0"),
threshold_pct=threshold_pct,
reason=None,
)
if capital_usd <= 0:
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=Decimal("0"),
threshold_pct=threshold_pct,
reason=None,
)
# Solo perdite ci interessano: vincite cumulate non scattano la pausa.
if cumulative >= 0:
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=cumulative / capital_usd,
threshold_pct=threshold_pct,
reason=None,
)
drawdown_pct = (-cumulative) / capital_usd
if drawdown_pct >= threshold_pct:
return AutoPauseDecision(
should_pause=True,
cumulative_pnl_usd=cumulative,
drawdown_pct=drawdown_pct,
threshold_pct=threshold_pct,
reason=(
f"rolling DD {drawdown_pct:.2%}{threshold_pct:.2%} "
f"(last {cfg.lookback_trades} trades, "
f"cumulative {cumulative} USD)"
),
)
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=drawdown_pct,
threshold_pct=threshold_pct,
reason=None,
)
+1 -68
View File
@@ -38,7 +38,6 @@ from cerbero_bite.core.entry_validator import (
from cerbero_bite.core.liquidity_gate import InstrumentSnapshot, check from cerbero_bite.core.liquidity_gate import InstrumentSnapshot, check
from cerbero_bite.core.sizing_engine import SizingContext, compute_contracts from cerbero_bite.core.sizing_engine import SizingContext, compute_contracts
from cerbero_bite.core.types import OptionQuote from cerbero_bite.core.types import OptionQuote
from cerbero_bite.runtime import auto_pause as auto_pause_module
from cerbero_bite.runtime.alert_manager import AlertManager from cerbero_bite.runtime.alert_manager import AlertManager
from cerbero_bite.runtime.dependencies import RuntimeContext from cerbero_bite.runtime.dependencies import RuntimeContext
from cerbero_bite.state import ( from cerbero_bite.state import (
@@ -65,7 +64,6 @@ _STATUS_NO_ENTRY = "no_entry"
_STATUS_BROKER_REJECT = "broker_reject" _STATUS_BROKER_REJECT = "broker_reject"
_STATUS_KILL_SWITCH = "kill_switch_armed" _STATUS_KILL_SWITCH = "kill_switch_armed"
_STATUS_HAS_OPEN = "has_open_position" _STATUS_HAS_OPEN = "has_open_position"
_STATUS_AUTO_PAUSED = "auto_paused"
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -324,28 +322,6 @@ async def run_entry_cycle(
) )
return EntryCycleResult(status=_STATUS_KILL_SWITCH, reason="kill_switch") return EntryCycleResult(status=_STATUS_KILL_SWITCH, reason="kill_switch")
# §7-bis (F): auto-pause circuit breaker. Read-only consultation
# of the persisted state — the breach evaluation runs later, after
# capital is known.
conn = connect_state(ctx.db_path)
try:
sys_state = ctx.repository.get_system_state(conn)
finally:
conn.close()
pause_status = auto_pause_module.is_paused(sys_state, now=when)
if pause_status.paused:
await alert.low(
source="entry_cycle",
message=(
f"auto-paused until {pause_status.until} "
f"({pause_status.reason or 'no reason'}) — skipping"
),
)
return EntryCycleResult(
status=_STATUS_AUTO_PAUSED,
reason=pause_status.reason or "auto_paused",
)
# Has open position? # Has open position?
conn = connect_state(ctx.db_path) conn = connect_state(ctx.db_path)
try: try:
@@ -368,44 +344,6 @@ async def run_entry_cycle(
) )
capital_usd = snap.portfolio_eur * eur_to_usd_rate capital_usd = snap.portfolio_eur * eur_to_usd_rate
# §7-bis (F): rolling drawdown breach evaluation. Se le ultime N
# posizioni chiuse hanno cumulato perdite oltre la soglia, armiamo
# la pausa e usciamo subito (l'entry di questo ciclo è saltata).
auto_cfg = cfg.auto_pause
if auto_cfg.enabled:
conn = connect_state(ctx.db_path)
try:
recent_pnls = ctx.repository.recent_closed_position_pnls_usd(
conn, limit=auto_cfg.lookback_trades
)
finally:
conn.close()
breach = auto_pause_module.evaluate_drawdown_breach(
cfg=auto_cfg,
recent_pnl_usd=recent_pnls,
capital_usd=capital_usd,
)
if breach.should_pause:
until = auto_pause_module.pause_until(when, auto_cfg.pause_weeks)
conn = connect_state(ctx.db_path)
try:
with transaction(conn):
ctx.repository.set_auto_pause(
conn, until=until, reason=breach.reason
)
finally:
conn.close()
await alert.high(
source="entry_cycle",
message=(
f"auto-pause armed: {breach.reason} — paused until {until}"
),
)
return EntryCycleResult(
status=_STATUS_AUTO_PAUSED,
reason=breach.reason or "auto_paused",
)
# 2. Entry filters # 2. Entry filters
entry_ctx = EntryContext( entry_ctx = EntryContext(
capital_usd=capital_usd, capital_usd=capital_usd,
@@ -498,12 +436,7 @@ async def run_entry_cycle(
) )
quotes = await _build_quotes(ctx.deribit, chain_meta) quotes = await _build_quotes(ctx.deribit, chain_meta)
selection = select_strikes( selection = select_strikes(
chain=quotes, chain=quotes, bias=bias, spot=snap.spot_eth_usd, now=when, cfg=cfg
bias=bias,
spot=snap.spot_eth_usd,
now=when,
cfg=cfg,
dvol_now=snap.dvol, # §3.2 (A) — strike picker dipendente dal regime DVOL
) )
if selection is None: if selection is None:
await _record_decision( await _record_decision(
@@ -0,0 +1,185 @@
"""Periodic option-chain snapshot collector (§13).
Fetches the Deribit option chain for every strike entro la finestra
DTE configurata, prima del trigger entry settimanale (cron
``55 13 * * MON`` di default). Persiste un quote per ogni strumento
in ``option_chain_snapshots`` con un timestamp condiviso, che diventa
il dato di base per:
* il backtest non-stilizzato (vedi ``core/backtest.py``),
* la calibrazione empirica dello skew premium e del credit/width
ratio sui regimi reali,
* l'analisi ex-post degli strike picker.
Il collector è **best-effort**: se ``get_tickers`` fallisce per un
batch, gli altri batch passano comunque; se manca completamente la
chain, il job ritorna 0 senza alzare eccezioni e logga il problema.
Non chiama l'order book per ogni strike (sarebbe troppo costoso) —
``book_depth_top3`` resta NULL nel quote, il liquidity gate del live
lo legge al volo solo per gli strike che gli interessano.
"""
from __future__ import annotations
import asyncio
import logging
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import TYPE_CHECKING, Any
from cerbero_bite.state import connect, transaction
from cerbero_bite.state.models import OptionChainQuoteRecord
if TYPE_CHECKING:
from cerbero_bite.runtime.dependencies import RuntimeContext
__all__ = ["DEFAULT_BATCH_SIZE", "collect_option_chain_snapshot"]
_log = logging.getLogger("cerbero_bite.runtime.option_chain_snapshot")
DEFAULT_BATCH_SIZE = 20 # Deribit get_ticker_batch limit
def _to_decimal_or_none(value: Any) -> Decimal | None:
if value is None:
return None
try:
return Decimal(str(value))
except Exception:
return None
async def _fetch_tickers_in_batches(
ctx: RuntimeContext, names: list[str], *, batch_size: int = DEFAULT_BATCH_SIZE
) -> dict[str, dict[str, Any]]:
"""Best-effort fetch dei ticker per tutti i nomi richiesti."""
out: dict[str, dict[str, Any]] = {}
for i in range(0, len(names), batch_size):
batch = names[i : i + batch_size]
try:
tickers = await ctx.deribit.get_tickers(batch)
except Exception as exc:
_log.warning(
"get_tickers failed for batch starting %s: %s",
batch[0] if batch else "<empty>", exc,
)
continue
for t in tickers:
name = t.get("instrument_name") or t.get("instrument")
if isinstance(name, str):
out[name] = t
return out
async def collect_option_chain_snapshot(
ctx: RuntimeContext,
*,
asset: str = "ETH",
now: datetime | None = None,
batch_size: int = DEFAULT_BATCH_SIZE,
) -> int:
"""Collect + persist a single chain snapshot for ``asset``. Returns
the number of quotes persisted (0 on best-effort failure).
Filtra le scadenze nella finestra ``[dte_min, dte_max]`` di
``cfg.structure`` per non sprecare richieste su scadenze che il
rule engine non userebbe mai.
"""
when = (now or datetime.now(UTC)).astimezone(UTC)
cfg = ctx.cfg
expiry_from = when + timedelta(days=cfg.structure.dte_min)
expiry_to = when + timedelta(days=cfg.structure.dte_max)
try:
chain = await ctx.deribit.options_chain(
currency=asset.upper(),
expiry_from=expiry_from,
expiry_to=expiry_to,
min_open_interest=int(cfg.liquidity.open_interest_min),
)
except Exception:
_log.exception("option chain fetch failed")
return 0
if not chain:
_log.info("option chain empty for %s in window", asset)
return 0
names = [meta.name for meta in chain]
tickers = await _fetch_tickers_in_batches(ctx, names, batch_size=batch_size)
quotes: list[OptionChainQuoteRecord] = []
for meta in chain:
ticker = tickers.get(meta.name)
if ticker is None:
# Lasciamo comunque la riga senza quote: utile sapere
# che lo strumento esisteva.
quotes.append(
OptionChainQuoteRecord(
timestamp=when,
asset=asset.upper(),
instrument_name=meta.name,
strike=meta.strike,
expiry=meta.expiry,
option_type=meta.option_type,
open_interest=int(meta.open_interest)
if meta.open_interest is not None
else None,
)
)
continue
greeks = ticker.get("greeks") or {}
quotes.append(
OptionChainQuoteRecord(
timestamp=when,
asset=asset.upper(),
instrument_name=meta.name,
strike=meta.strike,
expiry=meta.expiry,
option_type=meta.option_type,
bid=_to_decimal_or_none(ticker.get("bid")),
ask=_to_decimal_or_none(ticker.get("ask")),
mid=_to_decimal_or_none(ticker.get("mark_price")),
iv=_to_decimal_or_none(ticker.get("mark_iv")),
delta=_to_decimal_or_none(greeks.get("delta")),
gamma=_to_decimal_or_none(greeks.get("gamma")),
theta=_to_decimal_or_none(greeks.get("theta")),
vega=_to_decimal_or_none(greeks.get("vega")),
open_interest=int(meta.open_interest)
if meta.open_interest is not None
else None,
volume_24h=(
int(ticker["volume_24h"])
if ticker.get("volume_24h") is not None
else None
),
# book_depth_top3: NULL — non lo prendiamo per ogni
# strike per non saturare l'API. Il liquidity gate
# del live lo chiede on-the-fly per gli strike
# candidati al picker.
)
)
persisted = 0
try:
conn = connect(ctx.db_path)
try:
with transaction(conn):
persisted = ctx.repository.record_option_chain_snapshot(
conn, quotes
)
finally:
conn.close()
except Exception:
_log.exception("persist option chain snapshot failed")
return 0
_log.info("option_chain_snapshot persisted %d quote(s)", persisted)
return persisted
# Avoid unused import warning for asyncio in lint when only used as type
_ = asyncio
+21
View File
@@ -34,6 +34,9 @@ from cerbero_bite.runtime.market_snapshot_cycle import (
DEFAULT_ASSETS, DEFAULT_ASSETS,
collect_market_snapshot, collect_market_snapshot,
) )
from cerbero_bite.runtime.option_chain_snapshot_cycle import (
collect_option_chain_snapshot,
)
from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle
from cerbero_bite.runtime.recovery import recover_state from cerbero_bite.runtime.recovery import recover_state
from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler
@@ -53,6 +56,7 @@ _CRON_HEALTH = "*/5 * * * *"
_CRON_BACKUP = "0 * * * *" _CRON_BACKUP = "0 * * * *"
_CRON_MANUAL_ACTIONS = "*/1 * * * *" _CRON_MANUAL_ACTIONS = "*/1 * * * *"
_CRON_MARKET_SNAPSHOT = "*/15 * * * *" _CRON_MARKET_SNAPSHOT = "*/15 * * * *"
_CRON_OPTION_CHAIN_SNAPSHOT = "55 13 * * MON" # 5 min prima del trigger entry
_BACKUP_RETENTION_DAYS = 30 _BACKUP_RETENTION_DAYS = 30
@@ -217,6 +221,8 @@ class Orchestrator:
manual_actions_cron: str = _CRON_MANUAL_ACTIONS, manual_actions_cron: str = _CRON_MANUAL_ACTIONS,
market_snapshot_cron: str = _CRON_MARKET_SNAPSHOT, market_snapshot_cron: str = _CRON_MARKET_SNAPSHOT,
market_snapshot_assets: tuple[str, ...] = DEFAULT_ASSETS, market_snapshot_assets: tuple[str, ...] = DEFAULT_ASSETS,
option_chain_cron: str = _CRON_OPTION_CHAIN_SNAPSHOT,
option_chain_asset: str = "ETH",
backup_dir: Path | None = None, backup_dir: Path | None = None,
backup_retention_days: int = _BACKUP_RETENTION_DAYS, backup_retention_days: int = _BACKUP_RETENTION_DAYS,
) -> AsyncIOScheduler: ) -> AsyncIOScheduler:
@@ -282,6 +288,14 @@ class Orchestrator:
await _safe("market_snapshot", _do) await _safe("market_snapshot", _do)
async def _option_chain_snapshot() -> None:
async def _do() -> None:
await collect_option_chain_snapshot(
self._ctx, asset=option_chain_asset
)
await _safe("option_chain_snapshot", _do)
jobs: list[JobSpec] = [ jobs: list[JobSpec] = [
JobSpec(name="health", cron=health_cron, coro_factory=_health), JobSpec(name="health", cron=health_cron, coro_factory=_health),
JobSpec(name="backup", cron=backup_cron, coro_factory=_backup), JobSpec(name="backup", cron=backup_cron, coro_factory=_backup),
@@ -309,6 +323,13 @@ class Orchestrator:
coro_factory=_market_snapshot, coro_factory=_market_snapshot,
) )
) )
jobs.append(
JobSpec(
name="option_chain_snapshot",
cron=option_chain_cron,
coro_factory=_option_chain_snapshot,
)
)
else: else:
_log.warning( _log.warning(
"data analysis disabled (CERBERO_BITE_ENABLE_DATA_ANALYSIS=" "data analysis disabled (CERBERO_BITE_ENABLE_DATA_ANALYSIS="
@@ -1,14 +0,0 @@
-- 0004_auto_pause.sql — circuit breaker su drawdown rolling (§7-bis F)
--
-- Aggiunge alla `system_state` il timestamp fino a cui l'engine è in
-- pausa automatica per via di un drawdown sopra soglia. NULL = engine
-- attivo. Quando il valore è nel futuro, il rule engine salta il
-- ciclo entry e logga la motivazione.
--
-- Indipendente dal kill_switch (che resta dedicato a errori tecnici
-- e a comandi manuali esplicitati). Le due tutele coesistono.
ALTER TABLE system_state ADD COLUMN auto_pause_until TEXT;
ALTER TABLE system_state ADD COLUMN auto_pause_reason TEXT;
PRAGMA user_version = 4;
@@ -0,0 +1,42 @@
-- 0004_option_chain_snapshots.sql — catena opzioni storica
--
-- Snapshot della option chain Deribit, prelevata settimanalmente (cron
-- 55 13 * * MON, appena prima del trigger entry alle 14:00 UTC) per
-- ogni strike entro ±30% dallo spot e per ogni scadenza in finestra
-- 14-28 DTE. Dato di base per il backtest non-stilizzato e per
-- calibrare empiricamente lo skew premium del modello BS.
--
-- Granularità: una riga per (snapshot_ts, instrument). Lo
-- snapshot_ts è il timestamp del cron tick — TUTTI i quote raccolti
-- in quello stesso tick condividono il timestamp, così filtrare per
-- "lo snapshot del 2026-05-04 alle 13:55" è una semplice
-- WHERE timestamp = X.
CREATE TABLE option_chain_snapshots (
timestamp TEXT NOT NULL,
asset TEXT NOT NULL,
instrument_name TEXT NOT NULL,
strike TEXT NOT NULL,
expiry TEXT NOT NULL,
option_type TEXT NOT NULL CHECK (option_type IN ('C','P')),
bid TEXT,
ask TEXT,
mid TEXT,
iv TEXT,
delta TEXT,
gamma TEXT,
theta TEXT,
vega TEXT,
open_interest INTEGER,
volume_24h INTEGER,
book_depth_top3 INTEGER,
PRIMARY KEY (timestamp, instrument_name)
) WITHOUT ROWID;
CREATE INDEX idx_option_chain_asset_ts
ON option_chain_snapshots(asset, timestamp DESC);
CREATE INDEX idx_option_chain_expiry
ON option_chain_snapshots(asset, expiry);
PRAGMA user_version = 5;
+31 -2
View File
@@ -22,6 +22,7 @@ __all__ = [
"InstructionRecord", "InstructionRecord",
"ManualAction", "ManualAction",
"MarketSnapshotRecord", "MarketSnapshotRecord",
"OptionChainQuoteRecord",
"PositionRecord", "PositionRecord",
"PositionStatus", "PositionStatus",
"SystemStateRecord", "SystemStateRecord",
@@ -148,6 +149,36 @@ class MarketSnapshotRecord(BaseModel):
fetch_errors_json: str | None = None fetch_errors_json: str | None = None
class OptionChainQuoteRecord(BaseModel):
"""Row of the ``option_chain_snapshots`` table.
One row per (snapshot_ts, instrument) — the same ``timestamp`` is
shared by every quote prelevato nello stesso tick del cron. Tutti
i campi numerici sono opzionali perché il collector è
best-effort: un ticker mancante non invalida il resto della chain.
"""
model_config = ConfigDict(extra="forbid")
timestamp: datetime
asset: str
instrument_name: str
strike: Decimal
expiry: datetime
option_type: Literal["C", "P"]
bid: Decimal | None = None
ask: Decimal | None = None
mid: Decimal | None = None
iv: Decimal | None = None
delta: Decimal | None = None
gamma: Decimal | None = None
theta: Decimal | None = None
vega: Decimal | None = None
open_interest: int | None = None
volume_24h: int | None = None
book_depth_top3: int | None = None
class ManualAction(BaseModel): class ManualAction(BaseModel):
"""Row of the ``manual_actions`` table.""" """Row of the ``manual_actions`` table."""
@@ -184,5 +215,3 @@ class SystemStateRecord(BaseModel):
config_version: str config_version: str
started_at: datetime started_at: datetime
last_audit_hash: str | None = None last_audit_hash: str | None = None
auto_pause_until: datetime | None = None
auto_pause_reason: str | None = None
+130 -47
View File
@@ -24,6 +24,7 @@ from cerbero_bite.state.models import (
InstructionRecord, InstructionRecord,
ManualAction, ManualAction,
MarketSnapshotRecord, MarketSnapshotRecord,
OptionChainQuoteRecord,
PositionRecord, PositionRecord,
PositionStatus, PositionStatus,
SystemStateRecord, SystemStateRecord,
@@ -407,6 +408,103 @@ class Repository:
).fetchall() ).fetchall()
return [_row_to_market_snapshot(r) for r in rows] return [_row_to_market_snapshot(r) for r in rows]
# ------------------------------------------------------------------
# option_chain_snapshots
# ------------------------------------------------------------------
def record_option_chain_snapshot(
self,
conn: sqlite3.Connection,
quotes: list[OptionChainQuoteRecord],
) -> int:
"""Bulk-insert dei quote di un singolo tick. Tutti i quote
condividono lo stesso ``timestamp``. Idempotente per
(timestamp, instrument_name)."""
if not quotes:
return 0
rows = [
(
_enc_dt(q.timestamp),
q.asset,
q.instrument_name,
_enc_dec(q.strike),
_enc_dt(q.expiry),
q.option_type,
_enc_dec(q.bid),
_enc_dec(q.ask),
_enc_dec(q.mid),
_enc_dec(q.iv),
_enc_dec(q.delta),
_enc_dec(q.gamma),
_enc_dec(q.theta),
_enc_dec(q.vega),
q.open_interest,
q.volume_24h,
q.book_depth_top3,
)
for q in quotes
]
conn.executemany(
"INSERT OR REPLACE INTO option_chain_snapshots("
"timestamp, asset, instrument_name, strike, expiry, option_type, "
"bid, ask, mid, iv, delta, gamma, theta, vega, "
"open_interest, volume_24h, book_depth_top3) "
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
rows,
)
return len(rows)
def list_option_chain_snapshots(
self,
conn: sqlite3.Connection,
*,
asset: str,
start: datetime | None = None,
end: datetime | None = None,
expiry_from: datetime | None = None,
expiry_to: datetime | None = None,
limit: int = 50000,
) -> list[OptionChainQuoteRecord]:
clauses: list[str] = ["asset = ?"]
params: list[Any] = [asset]
if start is not None:
clauses.append("timestamp >= ?")
params.append(_enc_dt(start))
if end is not None:
clauses.append("timestamp <= ?")
params.append(_enc_dt(end))
if expiry_from is not None:
clauses.append("expiry >= ?")
params.append(_enc_dt(expiry_from))
if expiry_to is not None:
clauses.append("expiry <= ?")
params.append(_enc_dt(expiry_to))
params.append(int(limit))
rows = conn.execute(
f"SELECT * FROM option_chain_snapshots "
f"WHERE {' AND '.join(clauses)} "
f"ORDER BY timestamp DESC, instrument_name ASC LIMIT ?",
params,
).fetchall()
return [_row_to_option_chain_quote(r) for r in rows]
def latest_option_chain_timestamp(
self,
conn: sqlite3.Connection,
*,
asset: str,
) -> datetime | None:
"""Timestamp dell'ultimo snapshot raccolto per ``asset``,
utile per stimare la freschezza del dato dalla GUI."""
row = conn.execute(
"SELECT timestamp FROM option_chain_snapshots "
"WHERE asset = ? ORDER BY timestamp DESC LIMIT 1",
(asset,),
).fetchone()
if row is None:
return None
return _dec_dt(row["timestamp"])
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# manual_actions # manual_actions
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@@ -488,16 +586,6 @@ class Repository:
last_audit_hash=( last_audit_hash=(
row["last_audit_hash"] if "last_audit_hash" in keys else None row["last_audit_hash"] if "last_audit_hash" in keys else None
), ),
auto_pause_until=(
_dec_dt(row["auto_pause_until"])
if "auto_pause_until" in keys
else None
),
auto_pause_reason=(
row["auto_pause_reason"]
if "auto_pause_reason" in keys
else None
),
) )
def set_last_audit_hash( def set_last_audit_hash(
@@ -536,43 +624,6 @@ class Repository:
(_enc_dt(now),), (_enc_dt(now),),
) )
def set_auto_pause(
self,
conn: sqlite3.Connection,
*,
until: datetime | None,
reason: str | None,
) -> None:
"""Imposta o azzera la pausa automatica (§7-bis F).
``until = None`` annulla la pausa (l'engine torna attivo).
Il setter è idempotente: chiamarlo con un until già nel passato
è equivalente a clear.
"""
conn.execute(
"UPDATE system_state SET auto_pause_until = ?, "
"auto_pause_reason = ? WHERE id = 1",
(_enc_dt(until) if until is not None else None, reason),
)
def recent_closed_position_pnls_usd(
self, conn: sqlite3.Connection, *, limit: int
) -> list[Decimal]:
"""Ritorna la lista dei pnl_usd delle ultime ``limit`` posizioni chiuse,
ordinate dalla più recente alla più vecchia. Posizioni con
``pnl_usd`` ``NULL`` (es. chiuse di emergenza senza P/L noto)
sono saltate. Usato dal circuit breaker §7-bis F.
"""
if limit <= 0:
return []
rows = conn.execute(
"SELECT pnl_usd FROM positions "
"WHERE closed_at IS NOT NULL AND pnl_usd IS NOT NULL "
"ORDER BY closed_at DESC LIMIT ?",
(limit,),
).fetchall()
return [Decimal(row["pnl_usd"]) for row in rows]
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Row → model converters # Row → model converters
@@ -692,6 +743,38 @@ def _row_to_market_snapshot(row: sqlite3.Row) -> MarketSnapshotRecord:
) )
def _row_to_option_chain_quote(row: sqlite3.Row) -> OptionChainQuoteRecord:
return OptionChainQuoteRecord(
timestamp=_dec_dt_required(row["timestamp"]),
asset=row["asset"],
instrument_name=row["instrument_name"],
strike=_dec_dec_required(row["strike"]),
expiry=_dec_dt_required(row["expiry"]),
option_type=row["option_type"],
bid=_dec_dec(row["bid"]),
ask=_dec_dec(row["ask"]),
mid=_dec_dec(row["mid"]),
iv=_dec_dec(row["iv"]),
delta=_dec_dec(row["delta"]),
gamma=_dec_dec(row["gamma"]),
theta=_dec_dec(row["theta"]),
vega=_dec_dec(row["vega"]),
open_interest=(
int(row["open_interest"])
if row["open_interest"] is not None
else None
),
volume_24h=(
int(row["volume_24h"]) if row["volume_24h"] is not None else None
),
book_depth_top3=(
int(row["book_depth_top3"])
if row["book_depth_top3"] is not None
else None
),
)
def _dec_dec_required(value: Any) -> Decimal: def _dec_dec_required(value: Any) -> Decimal:
out = _dec_dec(value) out = _dec_dec(value)
if out is None: if out is None:
+2 -26
View File
@@ -28,8 +28,8 @@
# 2× via "ETH + BTC" indicato in `📚 Strategia` è una **stima ex-ante** # 2× via "ETH + BTC" indicato in `📚 Strategia` è una **stima ex-ante**
# di cosa otterresti DOPO quel lavoro di codice. # di cosa otterresti DOPO quel lavoro di codice.
config_version: "1.2.0-aggressiva" config_version: "1.0.0-aggressiva"
config_hash: "e3a583cabfaa4781cd0ebcc8b62fc8f200648153738f93ab8726b062e46cacef" config_hash: "b931a2b96fbc149b21cae84a196ee8bad10220b5ee8fa9ab0ed06ae52d7dc531"
last_review: "2026-04-26" last_review: "2026-04-26"
last_reviewer: "Adriano" last_reviewer: "Adriano"
@@ -78,13 +78,6 @@ structure:
distance_otm_pct_min: "0.15" distance_otm_pct_min: "0.15"
distance_otm_pct_max: "0.25" distance_otm_pct_max: "0.25"
# §3.2 (A): step-function delta-target per regime DVOL.
# DVOL bassa (≤50) → più premio; alta (>70) → più safety.
delta_by_dvol:
- {dvol_under: "50", delta_target: "0.15", delta_min: "0.13", delta_max: "0.17"}
- {dvol_under: "70", delta_target: "0.12", delta_min: "0.10", delta_max: "0.15"}
- {dvol_under: "90", delta_target: "0.10", delta_min: "0.08", delta_max: "0.12"}
spread_width: spread_width:
target_pct_of_spot: "0.04" target_pct_of_spot: "0.04"
min_pct_of_spot: "0.03" min_pct_of_spot: "0.03"
@@ -123,14 +116,6 @@ exit:
delta_breach_threshold: "0.30" delta_breach_threshold: "0.30"
adverse_move_4h_pct: "0.05" adverse_move_4h_pct: "0.05"
# §7-bis (D): vol-harvest abilitato a 15 punti vol di crollo.
vol_harvest_dvol_decrease: "15"
# §7.1bis (C): scala graduata di profit-take. Pipeline runtime
# non ancora attiva; tenuta vuota fino al merge della
# partial-close pipeline.
profit_take_partial_levels: []
monitor_cron: "0 2,14 * * *" monitor_cron: "0 2,14 * * *"
user_confirmation_timeout_min: 30 user_confirmation_timeout_min: 30
@@ -139,15 +124,6 @@ exit:
- "CLOSE_VOL" - "CLOSE_VOL"
- "CLOSE_DELTA" - "CLOSE_DELTA"
# §7-bis (F): circuit breaker abilitato. Soglia 15% (più tollerante
# del default conservativo perché la size aggressiva ha volatilità
# attesa più alta).
auto_pause:
enabled: true
lookback_trades: 5
max_drawdown_pct: "0.15"
pause_weeks: 2
execution: execution:
environment: "testnet" environment: "testnet"
eur_to_usd: "1.075" eur_to_usd: "1.075"
+2 -11
View File
@@ -15,8 +15,8 @@
# cerbero-bite config hash --file strategy.conservativa.yaml # cerbero-bite config hash --file strategy.conservativa.yaml
# e bumpare config_version. # e bumpare config_version.
config_version: "1.2.0-conservativa" config_version: "1.0.0-conservativa"
config_hash: "fa09dad9cfa40a8ab006ec85157635603e0c4b6381ecd5d721504e00c4119a1b" config_hash: "eff824281bbb538fba49434d8cc4b9c37675bc73d60e351293e263cc7e7b29ef"
last_review: "2026-04-26" last_review: "2026-04-26"
last_reviewer: "Adriano" last_reviewer: "Adriano"
@@ -100,9 +100,6 @@ exit:
delta_breach_threshold: "0.30" delta_breach_threshold: "0.30"
adverse_move_4h_pct: "0.05" adverse_move_4h_pct: "0.05"
vol_harvest_dvol_decrease: "0"
profit_take_partial_levels: []
monitor_cron: "0 2,14 * * *" monitor_cron: "0 2,14 * * *"
user_confirmation_timeout_min: 30 user_confirmation_timeout_min: 30
@@ -111,12 +108,6 @@ exit:
- "CLOSE_VOL" - "CLOSE_VOL"
- "CLOSE_DELTA" - "CLOSE_DELTA"
auto_pause:
enabled: false
lookback_trades: 5
max_drawdown_pct: "0.10"
pause_weeks: 2
execution: execution:
environment: "testnet" environment: "testnet"
eur_to_usd: "1.075" eur_to_usd: "1.075"
+2 -17
View File
@@ -6,8 +6,8 @@
# config hash), and lands as a separate commit with the motivation in # config hash), and lands as a separate commit with the motivation in
# the commit message. # the commit message.
config_version: "1.2.0" config_version: "1.0.0"
config_hash: "33263a313b26b24b41269f93f93783784451ac9b4b6460005b95c2fb3624fcdc" config_hash: "4c2be4c51c849ed58fa22ec2b302016c453894dd0964b6d05445ab1b723e2d10"
last_review: "2026-04-26" last_review: "2026-04-26"
last_reviewer: "Adriano" last_reviewer: "Adriano"
@@ -96,13 +96,6 @@ exit:
delta_breach_threshold: "0.30" delta_breach_threshold: "0.30"
adverse_move_4h_pct: "0.05" adverse_move_4h_pct: "0.05"
# §7-bis (D): vol-collapse harvest. 0 = disabilitato.
vol_harvest_dvol_decrease: "0"
# §7.1bis (C): scala graduata di profit-take. Vuoto = chiusura
# atomica. Pipeline runtime non ancora attiva (hook futuro).
profit_take_partial_levels: []
monitor_cron: "0 2,14 * * *" monitor_cron: "0 2,14 * * *"
user_confirmation_timeout_min: 30 user_confirmation_timeout_min: 30
@@ -111,14 +104,6 @@ exit:
- "CLOSE_VOL" - "CLOSE_VOL"
- "CLOSE_DELTA" - "CLOSE_DELTA"
# §7-bis (F): circuit breaker su drawdown rolling. Disabilitato di
# default — abilitarlo solo dopo abbastanza posizioni chiuse.
auto_pause:
enabled: false
lookback_trades: 5
max_drawdown_pct: "0.10"
pause_weeks: 2
execution: execution:
environment: "testnet" # testnet|mainnet — kill switch on broker mismatch environment: "testnet" # testnet|mainnet — kill switch on broker mismatch
eur_to_usd: "1.075" # default FX rate for sizing engine; override at boot eur_to_usd: "1.075" # default FX rate for sizing engine; override at boot
+1
View File
@@ -129,6 +129,7 @@ def test_install_scheduler_registers_canonical_jobs(tmp_path: Path) -> None:
"backup", "backup",
"manual_actions", "manual_actions",
"market_snapshot", "market_snapshot",
"option_chain_snapshot",
} }
-157
View File
@@ -1,157 +0,0 @@
"""TDD per :mod:`cerbero_bite.runtime.auto_pause` (§7-bis F)."""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
from cerbero_bite.config.schema import AutoPauseConfig
from cerbero_bite.runtime.auto_pause import (
evaluate_drawdown_breach,
is_paused,
pause_until,
)
from cerbero_bite.state.models import SystemStateRecord
_NOW = datetime(2026, 5, 1, 14, 0, tzinfo=UTC)
def _state(**overrides: object) -> SystemStateRecord:
base: dict[str, object] = {
"kill_switch": 0,
"last_health_check": _NOW,
"config_version": "1.0.0",
"started_at": _NOW - timedelta(hours=1),
}
base.update(overrides)
return SystemStateRecord(**base) # type: ignore[arg-type]
# ---------------------------------------------------------------------------
# is_paused
# ---------------------------------------------------------------------------
def test_is_paused_returns_false_when_state_is_none() -> None:
status = is_paused(None, now=_NOW)
assert status.paused is False
def test_is_paused_returns_false_when_until_is_none() -> None:
status = is_paused(_state(), now=_NOW)
assert status.paused is False
def test_is_paused_returns_true_when_until_in_future() -> None:
status = is_paused(
_state(auto_pause_until=_NOW + timedelta(weeks=2),
auto_pause_reason="DD breach"),
now=_NOW,
)
assert status.paused is True
assert status.reason == "DD breach"
def test_is_paused_returns_false_when_until_in_past() -> None:
status = is_paused(
_state(auto_pause_until=_NOW - timedelta(seconds=1)),
now=_NOW,
)
assert status.paused is False
# ---------------------------------------------------------------------------
# pause_until
# ---------------------------------------------------------------------------
def test_pause_until_adds_weeks() -> None:
until = pause_until(_NOW, weeks=2)
assert until == _NOW + timedelta(weeks=2)
def test_pause_until_clamps_to_one_week_minimum() -> None:
# weeks <= 0 deve cmq dare almeno 1 settimana di pausa, altrimenti
# la cron settimanale potrebbe scattare comunque.
assert pause_until(_NOW, weeks=0) == _NOW + timedelta(weeks=1)
assert pause_until(_NOW, weeks=-3) == _NOW + timedelta(weeks=1)
# ---------------------------------------------------------------------------
# evaluate_drawdown_breach
# ---------------------------------------------------------------------------
def _cfg(**overrides: object) -> AutoPauseConfig:
base: dict[str, object] = {
"enabled": True,
"lookback_trades": 5,
"max_drawdown_pct": Decimal("0.10"),
"pause_weeks": 2,
}
base.update(overrides)
return AutoPauseConfig(**base) # type: ignore[arg-type]
def test_drawdown_breach_when_enabled_and_threshold_exceeded() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(),
recent_pnl_usd=[Decimal("-50"), Decimal("-60"), Decimal("-40"),
Decimal("-30"), Decimal("-20")], # cum 200 USD
capital_usd=Decimal("1500"),
)
# |200| / 1500 = 0.133 > 0.10
assert decision.should_pause is True
assert decision.reason is not None
assert "rolling DD" in decision.reason
def test_no_breach_when_filter_disabled() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(enabled=False),
recent_pnl_usd=[Decimal("-200")] * 5, # massacro
capital_usd=Decimal("1500"),
)
assert decision.should_pause is False
def test_no_breach_when_lookback_insufficient() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(lookback_trades=5),
recent_pnl_usd=[Decimal("-100")] * 3, # solo 3 trade, serve 5
capital_usd=Decimal("1500"),
)
assert decision.should_pause is False
def test_no_breach_when_cumulative_positive() -> None:
# Anche con tante perdite, se la somma è positiva non scattiamo.
decision = evaluate_drawdown_breach(
cfg=_cfg(),
recent_pnl_usd=[Decimal("-100"), Decimal("-50"),
Decimal("300"), Decimal("-20"), Decimal("-10")],
capital_usd=Decimal("1500"),
)
assert decision.should_pause is False
def test_no_breach_when_below_threshold() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(),
recent_pnl_usd=[Decimal("-30")] * 5, # cum 150 / 1500 = 10% esatto
capital_usd=Decimal("1500"),
)
# esattamente alla soglia (>=) ⇒ pausa armata
assert decision.should_pause is True
def test_no_breach_when_capital_zero_or_negative() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(),
recent_pnl_usd=[Decimal("-100")] * 5,
capital_usd=Decimal("0"),
)
assert decision.should_pause is False
-143
View File
@@ -329,146 +329,3 @@ def test_build_bear_call_breakeven_above_short_strike(
# breakeven = 3525 + 15 = 3540 # breakeven = 3525 + 15 = 3540
assert proposal.breakeven == Decimal("3540") assert proposal.breakeven == Decimal("3540")
assert proposal.spread_type == "bear_call" assert proposal.spread_type == "bear_call"
# ---------------------------------------------------------------------------
# §3.2 (A): dynamic delta target by DVOL regime
# ---------------------------------------------------------------------------
def _cfg_with_delta_bands(cfg: StrategyConfig) -> StrategyConfig:
"""Profilo con step-function delta su DVOL.
Vol bassa (≤50) → delta 0.15 (più premio), vol media (≤70) →
0.12 (default), vol alta (≤90) → 0.10 (più safety distance).
"""
from cerbero_bite.config.schema import (
DeltaByDvolBand,
ShortStrikeSpec,
StructureConfig,
)
bands = [
DeltaByDvolBand(
dvol_under=Decimal("50"),
delta_target=Decimal("0.15"),
delta_min=Decimal("0.13"),
delta_max=Decimal("0.17"),
),
DeltaByDvolBand(
dvol_under=Decimal("70"),
delta_target=Decimal("0.12"),
delta_min=Decimal("0.10"),
delta_max=Decimal("0.15"),
),
DeltaByDvolBand(
dvol_under=Decimal("90"),
delta_target=Decimal("0.10"),
delta_min=Decimal("0.08"),
delta_max=Decimal("0.12"),
),
]
new_short = ShortStrikeSpec(
**{**cfg.structure.short_strike.model_dump(), "delta_by_dvol": bands}
)
return cfg.model_copy(
update={
"structure": StructureConfig(
**{**cfg.structure.model_dump(exclude={"short_strike"}),
"short_strike": new_short}
)
}
)
def _bull_put_chain_wide(now_dt: datetime) -> list[OptionQuote]:
"""Chain con shorts e longs per delta 0.10, 0.12, 0.15.
I mid sono tarati per superare il credit/width ≥ 30% per ogni
accoppiamento short→long testato (vedi commento §3.4).
"""
return [
# Shorts a delta 0.10 / 0.12 / 0.15 in OTM range [15-25%].
_quote(strike="2535", delta="-0.15", mid="0.026", now_dt=now_dt),
_quote(strike="2475", delta="-0.12", mid="0.020", now_dt=now_dt),
_quote(strike="2400", delta="-0.10", mid="0.015", now_dt=now_dt),
# Long candidati ~4% sotto ciascuno short.
_quote(strike="2415", delta="-0.10", mid="0.012", now_dt=now_dt),
_quote(strike="2355", delta="-0.08", mid="0.006", now_dt=now_dt),
_quote(strike="2280", delta="-0.06", mid="0.002", now_dt=now_dt),
]
def test_dynamic_delta_low_dvol_picks_higher_delta(
cfg: StrategyConfig, now: datetime
) -> None:
"""DVOL=40 → banda con delta_target=0.15."""
cfg_dyn = _cfg_with_delta_bands(cfg)
chain = _bull_put_chain_wide(now)
res = select_strikes(
chain=chain,
bias="bull_put",
spot=Decimal("3000"),
now=now,
cfg=cfg_dyn,
dvol_now=Decimal("40"),
)
assert res is not None
short, _ = res
assert short.delta == Decimal("-0.15")
def test_dynamic_delta_mid_dvol_picks_default_delta(
cfg: StrategyConfig, now: datetime
) -> None:
"""DVOL=60 → banda con delta_target=0.12."""
cfg_dyn = _cfg_with_delta_bands(cfg)
chain = _bull_put_chain_wide(now)
res = select_strikes(
chain=chain,
bias="bull_put",
spot=Decimal("3000"),
now=now,
cfg=cfg_dyn,
dvol_now=Decimal("60"),
)
assert res is not None
short, _ = res
assert short.delta == Decimal("-0.12")
def test_dynamic_delta_high_dvol_picks_lower_delta(
cfg: StrategyConfig, now: datetime
) -> None:
"""DVOL=85 → banda con delta_target=0.10 (più safety distance)."""
cfg_dyn = _cfg_with_delta_bands(cfg)
chain = _bull_put_chain_wide(now)
res = select_strikes(
chain=chain,
bias="bull_put",
spot=Decimal("3000"),
now=now,
cfg=cfg_dyn,
dvol_now=Decimal("85"),
)
assert res is not None
short, _ = res
assert short.delta == Decimal("-0.10")
def test_dynamic_delta_disabled_default_uses_static_delta(
cfg: StrategyConfig, now: datetime
) -> None:
"""delta_by_dvol vuoto (default) → comportamento invariato."""
chain = _bull_put_chain_wide(now)
res = select_strikes(
chain=chain,
bias="bull_put",
spot=Decimal("3000"),
now=now,
cfg=cfg, # golden config: delta_by_dvol=[]
dvol_now=Decimal("40"),
)
assert res is not None
short, _ = res
# Delta target statico = 0.12, quindi torna lo strike a -0.12.
assert short.delta == Decimal("-0.12")
+1 -1
View File
@@ -68,7 +68,7 @@ def test_compute_hash_is_independent_of_recorded_hash_value(tmp_path: Path) -> N
def test_load_repo_strategy_yaml(tmp_path: Path) -> None: def test_load_repo_strategy_yaml(tmp_path: Path) -> None:
"""The committed strategy.yaml validates with the recorded hash.""" """The committed strategy.yaml validates with the recorded hash."""
result = load_strategy(REPO_ROOT / "strategy.yaml") result = load_strategy(REPO_ROOT / "strategy.yaml")
assert result.config.config_version == "1.2.0" assert result.config.config_version == "1.0.0"
assert result.config.sizing.kelly_fraction == Decimal("0.13") assert result.config.sizing.kelly_fraction == Decimal("0.13")
assert result.computed_hash == result.config.config_hash assert result.computed_hash == result.config.config_hash
-88
View File
@@ -271,91 +271,3 @@ def test_iron_condor_adverse_move_either_direction(cfg: StrategyConfig) -> None:
) )
res = evaluate(snap, cfg) res = evaluate(snap, cfg)
assert res.action == "CLOSE_AVERSE" assert res.action == "CLOSE_AVERSE"
# ---------------------------------------------------------------------------
# §7-bis (D): vol-collapse harvest
# ---------------------------------------------------------------------------
def _harvest_cfg(
cfg: StrategyConfig, *, threshold: str = "15"
) -> StrategyConfig:
"""Clona la golden config con la soglia di vol-harvest abilitata."""
from cerbero_bite.config import ExitConfig
return cfg.model_copy(
update={
"exit": ExitConfig(
**{
**cfg.exit.model_dump(),
"vol_harvest_dvol_decrease": Decimal(threshold),
}
)
}
)
def test_vol_harvest_disabled_by_default_does_not_fire(cfg: StrategyConfig) -> None:
# Default: vol_harvest_dvol_decrease = 0 ⇒ filtro disabilitato.
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.022", # in profit (debit < credit)
dvol_at_entry="60",
dvol_now="40", # crollato di 20 punti
)
res = evaluate(snap, cfg)
assert res.action == "HOLD"
def test_vol_harvest_fires_when_dvol_collapsed_in_profit(
cfg: StrategyConfig,
) -> None:
harvest = _harvest_cfg(cfg, threshold="15")
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.022", # in profit ma sopra profit_take 50%
dvol_at_entry="60",
dvol_now="42", # 18, supera la soglia 15
)
res = evaluate(snap, harvest)
assert res.action == "CLOSE_VOL_HARVEST"
assert "harvest" in res.reason
def test_vol_harvest_does_not_fire_when_in_loss(cfg: StrategyConfig) -> None:
# Anche se DVOL crolla, se siamo in perdita non vogliamo harvest:
# è una funzione di "esci con il profitto in mano", non un panico.
harvest = _harvest_cfg(cfg, threshold="15")
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.040", # debit > credit ⇒ in perdita
dvol_at_entry="60",
dvol_now="42",
)
res = evaluate(snap, harvest)
assert res.action != "CLOSE_VOL_HARVEST"
def test_vol_harvest_does_not_fire_below_threshold(cfg: StrategyConfig) -> None:
harvest = _harvest_cfg(cfg, threshold="15")
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.022",
dvol_at_entry="60",
dvol_now="50", # 10, sotto la soglia 15
)
res = evaluate(snap, harvest)
assert res.action == "HOLD"
def test_profit_take_wins_over_vol_harvest(cfg: StrategyConfig) -> None:
# Quando il profit-take è già colpito, non passiamo per vol-harvest.
harvest = _harvest_cfg(cfg, threshold="15")
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.014", # ≤ 50% credit ⇒ profit-take
dvol_at_entry="60",
dvol_now="42",
)
res = evaluate(snap, harvest)
assert res.action == "CLOSE_PROFIT"
@@ -0,0 +1,134 @@
"""TDD per :mod:`cerbero_bite.runtime.option_chain_snapshot_cycle`."""
from __future__ import annotations
from datetime import UTC, datetime
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
import pytest
from cerbero_bite.clients.deribit import InstrumentMeta
from cerbero_bite.runtime.option_chain_snapshot_cycle import (
collect_option_chain_snapshot,
)
from cerbero_bite.state.models import OptionChainQuoteRecord
_NOW = datetime(2026, 5, 4, 13, 55, tzinfo=UTC)
def _meta(name: str, strike: int, expiry_dte: int = 18) -> InstrumentMeta:
expiry = _NOW.replace(hour=8, minute=0, second=0)
expiry = expiry.replace(day=expiry.day) + (
# add days
__import__("datetime").timedelta(days=expiry_dte)
)
return InstrumentMeta(
name=name,
strike=Decimal(str(strike)),
expiry=expiry,
option_type="P",
open_interest=Decimal("100"),
tick_size=Decimal("0.0005"),
min_trade_amount=Decimal("1"),
)
def _ticker(name: str, *, mark: float = 0.020, bid: float = 0.018,
ask: float = 0.022, delta: float = -0.12) -> dict:
return {
"instrument_name": name,
"bid": bid,
"ask": ask,
"mark_price": mark,
"mark_iv": 60.0,
"volume_24h": 50,
"greeks": {
"delta": delta,
"gamma": 0.001,
"theta": -0.0005,
"vega": 0.10,
},
}
@pytest.fixture
def cfg() -> object:
from cerbero_bite.config import golden_config
return golden_config()
@pytest.fixture
def fake_ctx(cfg: object) -> MagicMock:
"""Mock minimal RuntimeContext."""
ctx = MagicMock()
ctx.cfg = cfg
ctx.db_path = ":memory:"
return ctx
@pytest.mark.asyncio
async def test_collector_persists_one_quote_per_instrument(
fake_ctx: MagicMock,
) -> None:
metas = [_meta("ETH-21MAY26-2475-P", 2475), _meta("ETH-21MAY26-2400-P", 2400)]
fake_ctx.deribit.options_chain = AsyncMock(return_value=metas)
fake_ctx.deribit.get_tickers = AsyncMock(
return_value=[_ticker(m.name) for m in metas]
)
persisted: list[list[OptionChainQuoteRecord]] = []
def _record(_conn: object, qs: list[OptionChainQuoteRecord]) -> int:
persisted.append(qs)
return len(qs)
fake_ctx.repository.record_option_chain_snapshot = _record
n = await collect_option_chain_snapshot(fake_ctx, asset="ETH", now=_NOW)
assert n == 2
assert len(persisted) == 1
assert {q.instrument_name for q in persisted[0]} == {
"ETH-21MAY26-2475-P", "ETH-21MAY26-2400-P",
}
# Tutti i quote condividono il timestamp del cron tick.
assert all(q.timestamp == _NOW for q in persisted[0])
@pytest.mark.asyncio
async def test_collector_handles_missing_tickers_with_null_fields(
fake_ctx: MagicMock,
) -> None:
metas = [_meta("ETH-21MAY26-2475-P", 2475)]
fake_ctx.deribit.options_chain = AsyncMock(return_value=metas)
fake_ctx.deribit.get_tickers = AsyncMock(return_value=[]) # vuoto
persisted: list[list[OptionChainQuoteRecord]] = []
def _record(_conn: object, qs: list[OptionChainQuoteRecord]) -> int:
persisted.append(qs)
return len(qs)
fake_ctx.repository.record_option_chain_snapshot = _record
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 1
assert persisted[0][0].mid is None # ticker mancante ⇒ campi NULL
assert persisted[0][0].instrument_name == "ETH-21MAY26-2475-P"
@pytest.mark.asyncio
async def test_collector_returns_zero_when_chain_empty(
fake_ctx: MagicMock,
) -> None:
fake_ctx.deribit.options_chain = AsyncMock(return_value=[])
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 0
@pytest.mark.asyncio
async def test_collector_swallows_chain_fetch_failure(
fake_ctx: MagicMock,
) -> None:
fake_ctx.deribit.options_chain = AsyncMock(side_effect=RuntimeError("boom"))
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 0 # best-effort: non rilancia