13 Commits

Author SHA1 Message Date
root dabcc8d15b docs: aggiornamento Phase 5 — IV-RV gate, F+D+A, backtest, option chain
- 01-strategy-rules.md:
  * §2.8 (filtri quant: dealer gamma + liquidation risk)
  * §2.9 (IV richness gate, opt-in, default disabled)
  * §3.2 — variante delta_by_dvol step-function
  * §7-bis.1 (vol-collapse harvest D)
  * §7-bis.2 (graduated profit-take C — scaffolding)
  * §7-bis.3 (auto-pause su drawdown F)

- 05-data-model.md:
  * `system_state.auto_pause_until / _reason` (migration 0004)
  * Nuova tabella `option_chain_snapshots` (migration 0005)
  * Tabella migrations completa (1→5)

- 13-strategia-spiegata.md:
  * §4-quinquies — catena opzioni storica (Phase 5):
    cosa raccoglie, cosa sblocca, CLI `option-chain
    trigger|analyze`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 21:29:00 +00:00
root 7fdd8b47a5 fix(gui): percentili Calibrazione in riga compatta, no truncation
I valori percentili (es. -0.0298, 0.05323) renderizzati come
``st.metric`` su 7 colonne venivano tagliati su viewport stretti:
ogni metric ha label sopra, font fisso, nessun shrink. Sostituito
con render markdown inline a font 0.85rem, single-line, scrollabile
orizzontalmente se serve. Tutti e 7 i percentili visibili senza
troncamento e senza wrap.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 21:21:32 +00:00
root a1a9f74ed2 Merge feat/option-chain-snapshots 2026-05-01 21:08:28 +00:00
root a9df399db4 Merge feat/backtest-engine 2026-05-01 21:08:22 +00:00
root e06f4d5c96 Merge feat/strategy-improvements-fdac
# Conflicts:
#	src/cerbero_bite/gui/pages/7_📚_Strategia.py
#	strategy.aggressiva.yaml
#	strategy.conservativa.yaml
#	strategy.yaml
#	tests/unit/test_config_loader.py
2026-05-01 21:08:12 +00:00
root f24511fcad Merge feat/iv-rv-hard-gate 2026-05-01 21:06:32 +00:00
root 954baaa354 feat(cli): comando option-chain (trigger + analyze) per la catena opzioni
Espone direttamente da CLI le due operazioni più utili sui dati di
``option_chain_snapshots`` raccolti dal cron settimanale:

- ``cerbero-bite option-chain trigger`` — esegue UNA volta il
  collector della catena. Riusa la stessa pipeline schedulata (cron
  ``55 13 * * MON``) ma on-demand. Utile per popolare il DB senza
  aspettare lunedì.
- ``cerbero-bite option-chain analyze [--bias bull_put|bear_call]`` —
  legge l'ultimo snapshot, simula il selector di strike
  (``select_strikes``) con la strategy passata e stampa una tabella
  con: short/long strike, delta, width, credito reale, ratio
  credit/width, e PASS/FAIL del gate ``credit_to_width_ratio_min``.

Il comando ``analyze`` rende immediatamente actionable la catena
appena raccolta: invece di stime ex-ante via Black-Scholes (modulo
``core/backtest.py``), legge i mid REALI di Deribit e dice "il rule
engine aprirebbe questo trade qui? credit/width ratio passa o no?".

Esempio di output sui primi snapshot raccolti (regime ETH ~2200,
DTE ~14g):

    Snapshot del 2026-05-01T20:53:49 — 21 quote totali
    Il rule engine NON aprirebbe trade con questa catena
    (no strike compatibile coi gate delta/distance/width/credit-ratio).

Conferma empirica del messaggio del documento ``13-strategia-spiegata``:
con delta target 0.12 + width 4% + credit/width ≥ 30%, il regime
attuale di ETH options non è abbastanza ricco per produrre trade —
serve calibrare soglie o aspettare un regime IV più alto.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:57:40 +00:00
root 3e46169278 fix(migrations): rinomina 0004 → 0005 per coesistenza con auto_pause
La migrazione `0004_option_chain_snapshots.sql` collide con quella
parallela `0004_auto_pause.sql` del PR `feat/strategy-improvements-fdac`:
entrambe puntano allo stesso slot e bumpano user_version a 4.

Rinominata a 0005 (con `PRAGMA user_version = 5`) così le due
migrazioni possono coesistere senza conflitti, indipendentemente
dall'ordine di merge dei due PR. Quando i due PR landeranno in main,
basterà conservare la sequenza 0004 (auto_pause) → 0005 (option_chain).

Verificato in locale: deploy con DB già a v4 (post-FDAC) ora applica
correttamente la migrazione e crea la tabella `option_chain_snapshots`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:52:11 +00:00
root c0a0ee416f feat(state+runtime): option_chain_snapshots — catena opzioni storica per backtest reale
Aggiunge la persistence della option chain Deribit con cron settimanale
``55 13 * * MON`` (5 minuti prima del trigger entry alle 14:00 UTC),
sbloccando il backtest non-stilizzato e la calibrazione empirica
dello skew premium.

**Schema (migrazione 0004)**

Nuova tabella ``option_chain_snapshots`` con primary key composta
``(timestamp, instrument_name)`` — tutti i quote prelevati nello
stesso tick condividono il timestamp, così le query "lo snapshot del
2026-05-04 alle 13:55" diventano una singola WHERE timestamp = X.
Indici su (asset, timestamp DESC) e (asset, expiry) per supportare
sia listing recenti sia query per scadenza specifica.

Campi: instrument_name, strike, expiry, option_type (C/P), bid, ask,
mid, iv, delta, gamma, theta, vega, open_interest, volume_24h,
book_depth_top3. Tutti i numerici sono nullable: il collector è
best-effort, un ticker mancante produce comunque una riga (utile
per sapere che lo strumento esisteva ma non era quotato).

**Modello + repository**

- ``OptionChainQuoteRecord`` (Pydantic, in ``state/models.py``).
- ``Repository.record_option_chain_snapshot`` (bulk insert
  idempotente).
- ``Repository.list_option_chain_snapshots`` (filtri su asset,
  timestamp window, expiry window, limit default 50000).
- ``Repository.latest_option_chain_timestamp`` (freshness check
  per dashboard GUI).

**Collector**

Nuovo ``runtime/option_chain_snapshot_cycle.py`` che:

1. Calcola la finestra scadenze ``[now+dte_min, now+dte_max]`` da
   ``cfg.structure``: niente richieste su scadenze che il rule
   engine non userebbe mai.
2. Chiama ``deribit.options_chain()`` con
   ``min_open_interest=cfg.liquidity.open_interest_min``.
3. Batch ``deribit.get_tickers()`` (max 20 per call, limite Deribit)
   con error-isolation per batch — un batch fallito non blocca
   gli altri.
4. NON chiama l'order book per ogni strike (rate-limit guard);
   ``book_depth_top3`` resta NULL e il liquidity gate live lo
   chiede on-the-fly per gli strike candidati al picker.

Best-effort end-to-end: chain assente, get_tickers giù, persist
fallito → ritorna 0 senza alzare eccezioni, logga sempre.

**Schedulazione**

Wired in ``Orchestrator.install_scheduler`` come job parallelo a
``market_snapshot``, attivo solo quando
``ENABLE_DATA_ANALYSIS=true``. Cron parametrizzabile via il nuovo
kwarg ``option_chain_cron`` (default ``55 13 * * MON``).

**Test**

- 4 unit test del collector (happy path, ticker mancante, chain
  vuota, fetch fail best-effort) con mock di RuntimeContext.
- Aggiornato ``test_install_scheduler_registers_canonical_jobs``
  per includere il nuovo job nel set canonico.

**Cosa sblocca**

- Backtest non-stilizzato: il PR ``feat/backtest-engine`` può
  dropparsi il modello BS+skew_premium e leggere prezzi reali
  ``mid`` dalla chain registrata.
- Calibrazione empirica dello skew premium (hardcoded a 1.5 nel
  backtest stilizzato): plot del rapporto fra quote reali Deribit
  e BS per delta/expiry, regressione → valore data-driven.
- Validazione ex-post: "il delta-0.12 era davvero a 25% OTM in
  quella settimana?" diventa una query SELECT.
- Dimensione attesa: ~50 strike × 3 scadenze × 1 snapshot/settimana
  × 17 colonne ≈ 12 KB/settimana, ~600 KB/anno. Trascurabile.

Suite: 409 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:44:49 +00:00
root 18cc27a76e feat(gui): simulazione P/L con effetti dei miglioramenti FDAC + IV-RV
Estende il pannello "💰 P/L atteso" della pagina `📚 Strategia` per
applicare gli effetti stimati di IV-RV gate, A (delta dinamico),
D (vol-harvest) e F (auto-pause) leggendoli direttamente dai
`strategy.*.yaml` di ciascun profilo.

- Nuova `_detect_features(strategy)` che ispeziona la config:
    A → `short_strike.delta_by_dvol` non vuoto
    D → `exit.vol_harvest_dvol_decrease > 0`
    F → `auto_pause.enabled`
    IV → `entry.iv_minus_rv_filter_enabled`
- `_compute_pl` accetta ora un dict `features` opzionale e applica:
    IV: +5 pp win-rate, −25% trade/anno (skip-week aggressivo)
    A: +1.5 pp win-rate, sl_loss × 0.95 (strike picking migliore)
    D: 5% trade convertiti da loss a harvest exit (+0.20×credito)
    F: −8% trade/anno (skip-week dopo streak)
- `_render_profile_card` mostra ora:
    badge "🟢 Miglioramenti attivi" con la lista per profilo,
    delta vs base in E[trade] e P/L annuo,
    help con win_rate effettivo / prob_loss / trade/anno.
- Checkbox "Applica effetti dei miglioramenti" (default ON) per
  switchare tra simulazione realistica e formula base.
- Nuova mini-tabella "Contributo marginale di ogni feature": per
  ogni miglioramento mostra ΔP/L annuo e ΔAPR isolando l'effetto
  del singolo feature, con marker " attiva nel YAML".
- Sensibilità win-rate ora applica le feature attive ai due profili.

Effetti dichiarati come **stime ex-ante** dalla letteratura
short-vol systematic; i valori puntuali (+5 pp win, etc.) andranno
calibrati sul dataset accumulato.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:17:24 +00:00
root 1c6baaee83 feat(strategy): F+D+A miglioramenti — auto-pause, vol-harvest, delta dinamico
Implementa tre miglioramenti dalla roadmap di "📚 Strategia" + scaffolding del quarto.
Tutti retro-compatibili: i defaults della golden config disabilitano le nuove funzioni
così il comportamento attuale resta invariato finché l'operatore non le accende
esplicitamente in `strategy.yaml`. Il profilo `strategy.aggressiva.yaml` opta-in
agli incrementi più impattanti.

**F — Auto-pause su drawdown rolling (§7-bis)**

Circuit breaker sopra il kill-switch tecnico. Quando le ultime N posizioni
chiuse hanno cumulato perdite oltre `max_drawdown_pct × capitale_attuale`,
l'engine si auto-mette in pausa per `pause_weeks` settimane. Difende dai
regime change non rilevati dai filtri quant — se i filtri stanno fallendo
sistematicamente, fermarsi è meglio che continuare a sanguinare.

- `AutoPauseConfig` + `cfg.auto_pause` (top-level, default disabled).
- Migrazione SQL `0004_auto_pause.sql`: `system_state.auto_pause_until`
  e `auto_pause_reason` (NULL = engine attivo).
- Nuovo modulo puro `runtime/auto_pause.py` con `is_paused()` (gate I/O-free)
  e `evaluate_drawdown_breach()` (decide se armare).
- `entry_cycle` consulta `is_paused` subito dopo il kill-switch e arma
  la pausa dopo aver calcolato il capitale; nuovo status `_STATUS_AUTO_PAUSED`.
- Repository: `set_auto_pause`, `recent_closed_position_pnls_usd`.
- 12 test unitari: gate filter on/off, lookback insufficiente, soglia
  esatta, capitale non valido, transizioni paused → not-paused.

**D — Vol-collapse harvest (§7-bis)**

Exit opportunistica: quando DVOL è scesa di tot punti rispetto all'entry
e siamo in profit, esce subito. Edge IV-RV catturato, non c'è motivo di
tenere fino al profit-take. Nuovo `ExitAction = "CLOSE_VOL_HARVEST"`,
gate `exit.vol_harvest_dvol_decrease` (default 0 = off). 5 test unitari.

**A — Delta target dinamico per regime DVOL (§3.2)**

Strike short adattivo alla volatilità: a DVOL bassa il margine OTM è
generoso ⇒ posso prendere più premio (delta 0.15); a DVOL alta voglio
più safety distance (delta 0.10). Nuovo `DeltaByDvolBand` (step
function); quando `delta_by_dvol` è popolato, `_select_short` legge
la prima banda ascending con `dvol_now ≤ dvol_under`. Default vuoto =
comportamento invariato. `select_strikes` accetta nuovo kwarg
`dvol_now`, propagato da `entry_cycle`. 4 test unitari.

**C — Scaffolding profit-take graduale (§7.1bis)**

Schema in place ma runtime non ancora wirato. Aggiunge `PartialProfitLevel`
e `exit.profit_take_partial_levels` (default vuoto). Nuovo
`ExitAction = "CLOSE_PROFIT_PARTIAL"` nella Literal. La pipeline di
chiusure parziali nel runtime (entry_cycle / repository / clients)
richiede refactor del position model — lasciato come TODO per un PR
dedicato. La schema è pronta a recepire la config futura senza altri
breaking change.

**Profili aggiornati**

- `strategy.yaml` (golden, 1.2.0): tutto disabilitato by default.
- `strategy.conservativa.yaml` (1.2.0-cons): identico al golden.
- `strategy.aggressiva.yaml` (1.2.0-aggr): A+D+F enabled
  (delta_by_dvol 0.15/0.12/0.10, vol_harvest a 15 pt vol,
  auto_pause @ 15% DD su 5 trade, 2 settimane pausa).

Bump versioni 1.1.0 → 1.2.0, hash ricalcolati, test pinning aggiornato.

Suite: 426 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:07:25 +00:00
root c4cd2986a4 feat(gui): aggiunge max drawdown atteso (P99) e tail/gap nei profili
Due metriche per ciascun profilo nel pannello P/L:

- **Max DD attesa (P99)**: streak di stop consecutivi con probabilità
  ≤ 1% nell'anno (union-bound: N_trade × p_loss^N ≤ 0.01) ×
  perdita stop × contratti × posizioni concorrenti.
- **Max DD coda (gap)**: scenario gap notturno in cui il mark salta
  oltre la copertura long PRIMA che lo stop sia eseguibile —
  perdita = larghezza intera meno credito iniziale, su tutte le
  posizioni aperte.

Aggiunge anche colonna "Max DD" nella tabella di sensibilità
win-rate, così si vede immediatamente il trade-off
APR-vs-drawdown al variare del win-rate (da 65% a 82%).

Effetto pratico: a default cap=10k, spot=3000, win=0.75, trades=18:
- Conservativa: APR ≈ +1.8%, Max DD attesa ≈ −2.2% capitale
- Aggressiva: APR ≈ +14%, Max DD attesa ≈ −30% capitale

Numeri che rendono molto più tangibile la frase "drawdown scala con
lo stesso fattore" del §4-ter del documento.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 19:50:09 +00:00
root 4ab7590745 feat(entry): IV richness gate (§2.9) + golden config bump 1.0.0 → 1.1.0
Aggiunge il filtro a maggior impatto sul win-rate atteso: l'entry
salta se la IV implicita non sta pagando un margine misurabile sopra
la realized vol. La letteratura short-vol systematic indica che
l'edge sostenibile della strategia esiste solo quando IV30g − RV30g
supera una soglia di alcuni punti vol; senza questo gate il selling
vol nudo è strutturalmente neutro a win-rate 70-72%.

Implementazione end-to-end:

- `EntryConfig`: due nuovi campi `iv_minus_rv_min` e
  `iv_minus_rv_filter_enabled`, con default `0` / `false` per non
  rompere setup pre-calibrazione.
- `validate_entry`: §2.9 hard gate che blocca l'entry se
  `iv_minus_rv < iv_minus_rv_min` (skip silenzioso quando il dato è
  `None`, coerente con il pattern §2.8 dei filtri quant).
- `entry_cycle._gather_snapshot`: nuovo `_safe_iv_minus_rv` che
  legge `deribit.realized_vol("ETH")["iv_minus_rv_30d"]` in
  best-effort e lo propaga via `_MarketSnapshot.iv_minus_rv` →
  `EntryContext.iv_minus_rv` → audit `inputs.snapshot.iv_minus_rv`.
- `tests/unit/test_entry_validator.py`: 5 nuovi casi (default
  permissivo, gate sotto/sopra/uguale soglia, dato mancante).
- `tests/integration/test_entry_cycle.py`: stub `get_realized_vol`
  nel mock helper così tutti gli scenari di happy/edge path
  continuano a passare.

Configurazione di profili coerente con la disciplina:

- `strategy.yaml` (golden 1.1.0) e `strategy.conservativa.yaml`:
  gate `enabled=false, min=0`. Manteniamo i lunedì pre-calibrazione
  per accumulare dati sulla distribuzione di `iv_minus_rv`.
- `strategy.aggressiva.yaml` (1.1.0-aggressiva): gate
  `enabled=true, min=3`. Coerente con la filosofia del profilo —
  size più grande pretende win-rate più alto. La soglia 3 è
  conservativa; la documentazione raccomanda 5 dopo 4-8 settimane di
  calibrazione.

Doc + GUI:

- `docs/13-strategia-spiegata.md` §4-quater: spiega gate, parametri,
  default per profilo, effetto atteso sul P/L (trade/anno scendono
  ma E[trade] sale → APR cresce comunque), roadmap di hardening
  (soglia adattiva, vol-of-vol guard, multi-asset).
- pagina `📚 Strategia`: la riga "IV − RV" passa da informativa a
  pass/fail reale; mostra "filtro DISABILITATO (info-only)" quando
  spento, / contro la soglia di config quando acceso.

Bump versioni e hash di tutti e tre i file YAML
(`config_version: 1.1.0`, hash ricalcolato). Test pinning aggiornato
(`test_load_repo_strategy_yaml`).

Suite: 410 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 19:32:21 +00:00
29 changed files with 2390 additions and 52 deletions
+85
View File
@@ -41,6 +41,34 @@ sono vere:
patrimonio totale (correlazione direzionale già alta). patrimonio totale (correlazione direzionale già alta).
8. **Liquidità degli strike candidati** entro le soglie del §3. 8. **Liquidità degli strike candidati** entro le soglie del §3.
### 2.8 Filtri quant (introdotti in Phase 4)
Due gate aggiuntivi che leggono i campi del `market_snapshot`:
- **Dealer net gamma > 0** (default: `dealer_gamma_min: 0`). Long-gamma
regime = dealer hedge che sopprime la vol → ideale per vendere
credit spread. Short-gamma = vol-amplifying flow, statisticamente
perdente. Disattivabile via `dealer_gamma_filter_enabled: false`.
- **Liquidation risk ≠ high** (default: `liquidation_filter_enabled:
true`). Salta entry quando il modulo sentiment flagga uno squeeze
imminente su long o short side.
### 2.9 IV richness gate (introdotto in Phase 5, opt-in)
Filtro a maggior impatto sul win-rate. **Disabilitato** di default
nella golden config — abilitato esplicitamente nel profilo
`strategy.aggressiva.yaml`:
- `iv_minus_rv_filter_enabled: true|false` — master switch.
- `iv_minus_rv_min: <pt vol>` — soglia: l'entry passa solo se la
IV implicita 30g supera la realized vol 30g di almeno tot punti.
Default 0; valori sensati 3-5 dopo calibrazione sui dati raccolti.
Razionale: il selling vol nudo è strutturalmente neutro a win-rate
70-72%. L'edge della strategia esiste solo quando il premio è
"ricco" — IV30 > RV30 + N. Vedere `13-strategia-spiegata.md §4-quater`
per il razionale completo.
Se anche **una sola** condizione fallisce → **no entry**, log con motivo, Se anche **una sola** condizione fallisce → **no entry**, log con motivo,
ritento la settimana successiva. ritento la settimana successiva.
@@ -77,6 +105,20 @@ assoluto.
| Distanza minima OTM | 15% (anche se delta è dentro tolleranza) | | Distanza minima OTM | 15% (anche se delta è dentro tolleranza) |
| Distanza massima OTM | 25% | | Distanza massima OTM | 25% |
**Variante dinamica per regime DVOL (Phase 5, opt-in).** Il campo
`short_strike.delta_by_dvol` (lista step-function) sostituisce il
delta target singolo con bande ordinate per `dvol_under`: a DVOL
bassa il bot prende delta più alto (più premio), a DVOL alta sceglie
delta più basso (più safety distance). Lista vuota = comportamento
classico col delta target sopra. Esempio bande nel profilo
`strategy.aggressiva.yaml`:
| dvol_under | delta_target | delta_min | delta_max |
|---|---|---|---|
| 50 | 0.15 | 0.13 | 0.17 |
| 70 | 0.12 | 0.10 | 0.15 |
| 90 | 0.10 | 0.08 | 0.12 |
Se nessuno strike disponibile rientra in entrambe le tolleranze → no entry. Se nessuno strike disponibile rientra in entrambe le tolleranze → no entry.
### 3.3 Strike long (protezione) ### 3.3 Strike long (protezione)
@@ -179,6 +221,49 @@ Per ogni posizione aperta, il rule engine valuta in ordine:
L'ordine è importante: il primo trigger soddisfatto vince. L'ordine è importante: il primo trigger soddisfatto vince.
## 7-bis. Estensioni opzionali (Phase 5)
Tre miglioramenti opt-in al decision loop. Tutti **disabled** di
default nella golden config; il profilo `strategy.aggressiva.yaml`
li abilita.
### 7-bis.1 Vol-collapse harvest (D)
Inserito tra il profit-take §7.1 e lo stop-loss §7.2:
> Se `vol_harvest_dvol_decrease > 0`, siamo in profit
> (`debit < credit`) E `DVOL_now ≤ DVOL_entry vol_harvest_dvol_decrease`,
> `decision = CLOSE_VOL_HARVEST`.
Razionale: edge IV-RV già catturato, vol attesa rientrata, non c'è
motivo di tenere fino al profit-take. Default disabilitato (`0`);
profilo aggressivo: `15` punti vol.
### 7-bis.2 Profit-take graduale (C — scaffolding)
Schema in place per chiusure parziali; pipeline runtime di
chiusura partial-close NON ancora wirata. Default vuoto. Quando
popolato, ogni livello `{mark_at_pct_credit, close_pct_of_initial_contracts}`
emette un'azione `CLOSE_PROFIT_PARTIAL` advisory che il runtime
attualmente ignora. Il completamento richiede refactor del position
model (contracts_open vs contracts_initial) — PR dedicato.
### 7-bis.3 Auto-pause su drawdown (F)
Circuit breaker sopra il kill-switch tecnico. Valutato all'inizio di
ogni entry-cycle:
> Se `auto_pause.enabled` e P/L cumulato delle ultime
> `lookback_trades` posizioni chiuse < `max_drawdown_pct ×
> capitale_corrente`, l'engine si auto-mette in pausa per
> `pause_weeks` settimane (skip-week mode).
Difende dai regime change non rilevati dai filtri quant. La pausa
si annulla automaticamente alla scadenza, oppure manualmente con
`UPDATE system_state SET auto_pause_until = NULL`. Default
disabilitato; profilo aggressivo: lookback 5 trade, soglia 15%, 2
settimane di pausa.
## 8. Esecuzione di apertura ## 8. Esecuzione di apertura
1. Engine costruisce **combo order Deribit** (un solo ordine atomico 1. Engine costruisce **combo order Deribit** (un solo ordine atomico
+63 -6
View File
@@ -202,7 +202,9 @@ CREATE TABLE system_state (
last_kelly_calib TEXT, last_kelly_calib TEXT,
config_version TEXT NOT NULL, config_version TEXT NOT NULL,
started_at TEXT NOT NULL, started_at TEXT NOT NULL,
last_audit_hash TEXT -- aggiunto dalla migration 0002 last_audit_hash TEXT, -- aggiunto dalla migration 0002
auto_pause_until TEXT, -- aggiunto dalla migration 0004 (§7-bis.3)
auto_pause_reason TEXT -- aggiunto dalla migration 0004
); );
``` ```
@@ -212,6 +214,54 @@ Al boot l'orchestrator confronta questo valore con il tail del file
`audit.log`: discrepanza → kill switch CRITICAL, vedi `audit.log`: discrepanza → kill switch CRITICAL, vedi
`07-risk-controls.md`. `07-risk-controls.md`.
I campi `auto_pause_until` / `auto_pause_reason` implementano il
circuit breaker §7-bis.3 (pausa automatica su drawdown rolling).
NULL = engine attivo.
### `option_chain_snapshots`
Snapshot della catena opzioni Deribit prelevata settimanalmente
(cron `55 13 * * MON`, 5 minuti prima del trigger entry). Ogni
tick contiene un quote per strumento entro la finestra
`[dte_min, dte_max]` di config; tutti i quote prelevati nello stesso
tick condividono ``timestamp``. Migration `0005`.
```sql
CREATE TABLE option_chain_snapshots (
timestamp TEXT NOT NULL,
asset TEXT NOT NULL,
instrument_name TEXT NOT NULL,
strike TEXT NOT NULL,
expiry TEXT NOT NULL,
option_type TEXT NOT NULL CHECK (option_type IN ('C','P')),
bid TEXT,
ask TEXT,
mid TEXT,
iv TEXT,
delta TEXT,
gamma TEXT,
theta TEXT,
vega TEXT,
open_interest INTEGER,
volume_24h INTEGER,
book_depth_top3 INTEGER,
PRIMARY KEY (timestamp, instrument_name)
) WITHOUT ROWID;
```
Indici: `(asset, timestamp DESC)` per listing recenti, `(asset,
expiry)` per query per scadenza specifica. ``book_depth_top3`` è
NULL by design — il collector non chiama l'order book per ogni
strike per non saturare l'API; lo legge il liquidity gate live solo
sugli strike candidati al picker.
**Sblocca**: il backtest non-stilizzato (modulo `core/backtest.py`
con prezzi reali invece di Black-Scholes), la calibrazione empirica
dello skew premium, la validazione ex-post dello strike picker.
Volume atteso: ~50 strike × 3 scadenze × 1 snapshot/settimana ×
17 colonne ≈ 12 KB/settimana, ~600 KB/anno.
## Log file ## Log file
Sotto `data/log/` un file per giorno: `cerbero-bite-YYYY-MM-DD.jsonl`. Sotto `data/log/` un file per giorno: `cerbero-bite-YYYY-MM-DD.jsonl`.
@@ -289,11 +339,18 @@ da altri processi (es. CLI `state inspect`) non vedano stati parziali.
## Migrations ## Migrations
Lo schema viene tracciato con il counter `PRAGMA user_version`. La Lo schema viene tracciato con il counter `PRAGMA user_version`.
prima volta `0001_init.sql` viene applicato e versione → 1; alla `state.db.run_migrations` applica in ordine ogni file
seconda esecuzione (o su DB già a versione 1) `0002_audit_anchor.sql` `NNNN_<name>.sql` con versione superiore a quella corrente,
viene applicato e versione → 2. `state.db.run_migrations` è idempotente, forward-only:
idempotente. Nessun rollback supportato (migrations forward-only).
| Versione | File | Cosa aggiunge |
|---|---|---|
| 1 | `0001_init.sql` | tabelle base (positions, decisions, ...) |
| 2 | `0002_audit_anchor.sql` | `system_state.last_audit_hash` |
| 3 | `0003_market_snapshots.sql` | tabella `market_snapshots` |
| 4 | `0004_auto_pause.sql` | `system_state.auto_pause_until / _reason` |
| 5 | `0005_option_chain_snapshots.sql` | tabella `option_chain_snapshots` |
## Backup ## Backup
+104
View File
@@ -532,6 +532,110 @@ quella che il sistema parte ad eseguire.
--- ---
## 4-quater. IV richness gate (§2.9): il filtro che alza il win-rate
Il filtro a maggior impatto sull'edge è anche il più semplice da
descrivere: **non vendere vol quando la IV non sta pagando un margine
misurabile sopra la RV**. È implementato come gate hard nel
`validate_entry`:
```
if iv_minus_rv_filter_enabled and iv_minus_rv < iv_minus_rv_min:
skip entry
```
con due parametri in `entry:` di `strategy.yaml`:
| Parametro | Default | Effetto |
|---|---|---|
| `iv_minus_rv_filter_enabled` | `false` (golden) / `true` (aggressiva) | Master switch del gate |
| `iv_minus_rv_min` | `0` (golden) / `3` (aggressiva) | Soglia in punti vol che IV30g RV30g deve eccedere |
Il dato è già raccolto in `market_snapshots.iv_minus_rv` ogni 15
minuti. Il gate consulta l'ultimo tick disponibile al momento
dell'entry cycle (non un percentile rolling — quello è il prossimo
step di calibrazione, vedi §4-quinquies in roadmap).
**Profili di default ragionati.**
- **Conservativa / golden config**: `enabled=false, min=0`. Tutti i
setup passano questo gate, anche con IV-RV negativa. Motivo: nei
primi 8 turni di lunedì non si hanno abbastanza tick per stabilire
che soglia ha senso nel proprio regime. Lasciamo la pagina
`📐 Calibrazione` mostrare la distribuzione e poi alziamo
manualmente.
- **Aggressiva**: `enabled=true, min=3`. Il profilo aggressivo già di
suo prende size più grande; pretendere `IV-RV ≥ 3 vol points` come
prerequisito è coerente — se stai betting più grosso, vuoi
win-rate più alto. La soglia 3 è conservativa; la letteratura
short-vol systematic suggerisce 5 dopo calibrazione.
**Cosa cambia nel P/L atteso quando attivi il gate.**
Il gate **riduce** il numero di entry (saltiamo settimane con premio
magro) ma **alza** la qualità di quelle che passano (premio ricco =
win-rate empirico più alto). Effetto netto sul P/L annuo:
- Trade/anno: 18 → 12-14 (skip più aggressivo)
- Win-rate atteso: 0.72 → 0.78-0.80
- E[trade] netto: +0.6 USD → +4-6 USD per contratto
- **P/L annuo proiettato sale anche se i trade scendono**, perché
ogni trade ha edge più alto.
La pagina `📚 Strategia` ha lo slider win-rate già coerente con
questa logica: muovi da 0.72 a 0.78 e vedi l'APR scattare.
**Roadmap di hardening (passi successivi al merge di questo PR).**
1. **Soglia adattiva**: sostituire `iv_minus_rv_min: 3` con un valore
calcolato a runtime come `P25 rolling 60d` di `market_snapshots.iv_minus_rv`.
2. **Vol-of-vol guard**: bloccare entry quando `dvol` è cambiato di
≥5 punti nelle ultime 24h, anche se `iv_minus_rv` è alto (regime
instabile).
3. **Multi-asset (ETH+BTC)**: come da §4-ter, sblocca il
moltiplicatore 2× sulle opportunità a parità di filtri.
## 4-quinquies. Catena opzioni storica (Phase 5)
In aggiunta a `market_snapshots` (cron `*/15`), il bot raccoglie
ora una seconda fonte di dati: la **catena opzioni Deribit completa**
ogni lunedì alle 13:55 UTC (5 minuti prima del trigger entry).
Tabella `option_chain_snapshots` — vedi `05-data-model.md` per lo
schema. Cosa registra per ogni strumento entro la finestra
`[dte_min, dte_max]`:
| Campo | Cosa misura | A che serve |
|---|---|---|
| `bid` / `ask` / `mid` | Prezzi reali Deribit in ETH | P/L reale per backtest |
| `iv` | IV implicita allo strike | Calibrazione skew premium |
| `delta`/`gamma`/`theta`/`vega` | Greeks | Sizing + risk monitoring |
| `open_interest` / `volume_24h` | Liquidità | Validazione gate §4 |
**Cosa sblocca:**
- **Backtest non-stilizzato**: il modulo `core/backtest.py` può
sostituire la stima Black-Scholes (con `skew_premium` hardcoded a
1.5×) leggendo i `mid` reali. Numeri da "stime ex-ante per
ranking" a "P/L empirico per dimensionare capitale".
- **Calibrazione empirica dello skew**: rapporto fra prezzi
Deribit reali e BS pulito → valore data-driven invece di stima a
istinto. Va fatto dopo 4-8 settimane di accumulo.
- **Validazione ex-post strike picker**: "il delta-0.12 era davvero
a 25% OTM in quella settimana?" diventa una `SELECT`.
**Strumenti CLI:**
- `cerbero-bite option-chain trigger` — esegue UNA volta il
collector senza aspettare il cron. Utile per test e per popolare
prima del primo lunedì utile.
- `cerbero-bite option-chain analyze [--bias bull_put|bear_call]`
legge l'ultimo snapshot, simula il selector di strike con la
strategy passata e stampa: short/long strike, delta, width,
credito reale, ratio credit/width, PASS/FAIL del gate
`credit_to_width_ratio_min`. Risponde in tempo reale: "il rule
engine aprirebbe un trade adesso?".
## 5. Come leggere il dato giorno per giorno ## 5. Come leggere il dato giorno per giorno
Tre euristiche operative sui campi raccolti: Tre euristiche operative sui campi raccolti:
+235
View File
@@ -961,6 +961,241 @@ def state_inspect(db: Path) -> None:
console.print(table) console.print(table)
@main.group(name="option-chain")
def option_chain() -> None:
"""Strumenti per la catena opzioni storica (`option_chain_snapshots`)."""
@option_chain.command(name="trigger")
@click.option(
"--strategy",
"strategy_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=_DEFAULT_STRATEGY_PATH,
show_default=True,
)
@click.option(
"--db",
"db_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
@click.option(
"--audit",
"audit_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_AUDIT_PATH,
show_default=True,
)
@click.option(
"--token",
type=str,
default=None,
help="MCP bearer token (override su CERBERO_BITE_MCP_TOKEN).",
)
@click.option("--asset", default="ETH", show_default=True)
def option_chain_trigger(
strategy_path: Path,
db_path: Path,
audit_path: Path,
token: str | None,
asset: str,
) -> None:
"""Esegue UNA volta il collector della catena opzioni e persiste in DB.
Utile per popolare i dati senza aspettare il cron settimanale del
job ``option_chain_snapshot``. Riusa esattamente la stessa pipeline
schedulata.
"""
from cerbero_bite.runtime.dependencies import build_runtime # noqa: PLC0415
from cerbero_bite.runtime.option_chain_snapshot_cycle import ( # noqa: PLC0415
collect_option_chain_snapshot,
)
cfg = load_strategy(strategy_path).config
ctx = build_runtime(
cfg=cfg,
endpoints=load_endpoints(),
token=load_token(value=token),
db_path=db_path,
audit_path=audit_path,
bot_tag=load_bot_tag(),
)
n = asyncio.run(collect_option_chain_snapshot(ctx, asset=asset))
console.print(
f"[green]Persisted {n} option chain quote(s) for {asset}[/green]"
if n > 0
else f"[yellow]No quotes persisted (chain empty or fetch failed)[/yellow]"
)
@option_chain.command(name="analyze")
@click.option(
"--strategy",
"strategy_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=_DEFAULT_STRATEGY_PATH,
show_default=True,
)
@click.option(
"--db",
"db_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
@click.option("--asset", default="ETH", show_default=True)
@click.option(
"--bias",
type=click.Choice(["bull_put", "bear_call"], case_sensitive=False),
default="bull_put",
show_default=True,
help="Direzione da simulare (il rule engine lo deciderebbe da trend×funding).",
)
def option_chain_analyze(
strategy_path: Path,
db_path: Path,
asset: str,
bias: str,
) -> None:
"""Analizza l'ultimo snapshot di catena salvato.
Per la strategia indicata, simula la selezione strike (delta
target, OTM range, width 4%, credit/width ratio min) e mostra:
* lo strike che il rule engine sceglierebbe come short e long,
* credito atteso, larghezza, rapporto credit/width,
* pass/fail del gate `credit_to_width_ratio_min`.
"""
from cerbero_bite.core.combo_builder import select_strikes # noqa: PLC0415
from cerbero_bite.core.types import OptionQuote # noqa: PLC0415
cfg = load_strategy(strategy_path).config
conn = connect_state(db_path)
try:
repo = Repository()
latest_ts = repo.latest_option_chain_timestamp(conn, asset=asset.upper())
if latest_ts is None:
console.print(
"[red]Nessuno snapshot di catena trovato. Lancia prima "
"`cerbero-bite option-chain trigger`.[/red]"
)
sys.exit(1)
quotes_records = repo.list_option_chain_snapshots(
conn, asset=asset.upper(), start=latest_ts, end=latest_ts,
)
finally:
conn.close()
console.print(
f"[cyan]Snapshot del {latest_ts.isoformat()}{len(quotes_records)} "
f"quote totali[/cyan]"
)
# Costruzione OptionQuote da OptionChainQuoteRecord per riusare select_strikes.
quotes: list[OptionQuote] = []
for q in quotes_records:
if q.bid is None or q.ask is None or q.mid is None or q.delta is None:
continue
quotes.append(
OptionQuote(
instrument=q.instrument_name,
strike=q.strike,
expiry=q.expiry,
option_type=q.option_type,
bid=q.bid,
ask=q.ask,
mid=q.mid,
delta=q.delta,
gamma=q.gamma or Decimal("0"),
theta=q.theta or Decimal("0"),
vega=q.vega or Decimal("0"),
open_interest=q.open_interest or 0,
volume_24h=q.volume_24h or 0,
book_depth_top3=q.book_depth_top3 or 0,
)
)
if not quotes:
console.print("[red]Nessun quote completo per la simulazione.[/red]")
sys.exit(1)
# Lo spot al momento dello snapshot: estraiamo dall'ultimo
# `market_snapshot` ETH a quel timestamp (tolleranza ±15 min).
spot = _resolve_spot_at(db_path, asset=asset.upper(), at=latest_ts)
if spot is None:
console.print(
"[yellow]Spot non recuperabile dai market_snapshots; "
"stimato dal mid ATM.[/yellow]"
)
spot = _atm_spot_proxy(quotes)
selection = select_strikes(
chain=quotes,
bias=bias, # type: ignore[arg-type]
spot=spot,
now=latest_ts,
cfg=cfg,
)
if selection is None:
console.print(
"[red]Il rule engine NON aprirebbe trade con questa catena[/red] "
"(no strike compatibile coi gate delta/distance/width/credit-ratio)."
)
sys.exit(0)
short, long_ = selection
width_usd = (short.strike - long_.strike).copy_abs()
credit_eth = short.mid - long_.mid
credit_usd = credit_eth * spot
ratio = credit_usd / width_usd if width_usd > 0 else Decimal("0")
ratio_target = cfg.structure.credit_to_width_ratio_min
table = Table(title=f"Simulazione picker — bias={bias}, spot={spot:.0f}")
table.add_column("Campo", style="cyan")
table.add_column("Valore", style="bold")
table.add_row("Short strike", f"{short.strike} ({short.delta:+.3f}δ)")
table.add_row("Long strike", f"{long_.strike} ({long_.delta:+.3f}δ)")
table.add_row("Width", f"{width_usd:.0f} USD")
table.add_row("Credit", f"{credit_eth:.4f} ETH ≈ {credit_usd:.2f} USD")
table.add_row(
"Credit/width ratio",
f"{ratio:.2%} (gate ≥ {float(ratio_target):.0%})",
)
pass_str = (
"[green]PASS — entry possibile[/green]"
if ratio >= ratio_target
else "[red]FAIL — premio troppo magro[/red]"
)
table.add_row("Verdetto gate ratio", pass_str)
console.print(table)
def _resolve_spot_at(db_path: Path, *, asset: str, at: datetime) -> Decimal | None:
"""Best-effort lookup dello spot al timestamp ``at`` ± 15 min."""
conn = connect_state(db_path)
try:
rows = Repository().list_market_snapshots(
conn,
asset=asset,
start=at - timedelta(minutes=15),
end=at + timedelta(minutes=15),
limit=1,
)
finally:
conn.close()
if not rows:
return None
return rows[0].spot
def _atm_spot_proxy(quotes: list[Any]) -> Decimal:
"""Stima dello spot prendendo lo strike il cui delta è più vicino a 0.5."""
quote = min(quotes, key=lambda q: abs(abs(q.delta) - Decimal("0.5")))
return quote.strike
def _entrypoint() -> None: def _entrypoint() -> None:
"""Wrapper used by ``cerbero-bite`` console script.""" """Wrapper used by ``cerbero-bite`` console script."""
try: try:
+103
View File
@@ -75,12 +75,32 @@ class EntryConfig(BaseModel):
dealer_gamma_filter_enabled: bool = True dealer_gamma_filter_enabled: bool = True
liquidation_filter_enabled: bool = True liquidation_filter_enabled: bool = True
# IV richness filter (§2.9). `iv_minus_rv_min` è la soglia in
# punti vol che la IV implicita 30g deve eccedere la RV30g per
# ammettere l'entry. Letteratura short-vol systematic: l'edge
# sostenibile esiste solo con un margine misurabile fra IV e RV.
# Default disabilitato + soglia 0 per non bloccare l'avvio finché
# non si è calibrato sui dati raccolti (vedi `📐 Calibrazione`).
iv_minus_rv_min: Decimal = Field(default=Decimal("0"))
iv_minus_rv_filter_enabled: bool = False
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Structure # Structure
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
class DeltaByDvolBand(BaseModel):
"""Banda della step function delta-target per regime DVOL (§3.2 A)."""
model_config = ConfigDict(frozen=True, extra="forbid")
dvol_under: Decimal
delta_target: Decimal
delta_min: Decimal
delta_max: Decimal
class ShortStrikeSpec(BaseModel): class ShortStrikeSpec(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid") model_config = ConfigDict(frozen=True, extra="forbid")
@@ -90,6 +110,16 @@ class ShortStrikeSpec(BaseModel):
distance_otm_pct_min: Decimal = Field(default=Decimal("0.15")) distance_otm_pct_min: Decimal = Field(default=Decimal("0.15"))
distance_otm_pct_max: Decimal = Field(default=Decimal("0.25")) distance_otm_pct_max: Decimal = Field(default=Decimal("0.25"))
# §3.2 enhancement (A): step function delta-target by DVOL regime.
# Empty list = behaviour invariato (delta_target sopra è il singolo
# valore). Quando popolato, il combo_builder sceglie la prima
# banda ordinata ascending su `dvol_under` con
# `dvol_now ≤ dvol_under`. Esempio:
# - dvol_under=50 → delta 0.15 (bassa vol → più premio)
# - dvol_under=70 → delta 0.12
# - dvol_under=90 → delta 0.10 (alta vol → più safety)
delta_by_dvol: list[DeltaByDvolBand] = Field(default_factory=list)
class SpreadWidthSpec(BaseModel): class SpreadWidthSpec(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid") model_config = ConfigDict(frozen=True, extra="forbid")
@@ -165,6 +195,25 @@ class SizingConfig(BaseModel):
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
class PartialProfitLevel(BaseModel):
"""Livello della scala di profit-take graduale (§7.1bis C).
`mark_at_pct_credit`: il livello è triggerato quando
`mark_combo ≤ mark_at_pct_credit × credito_iniziale` (es. 0.25 =
25% del credito = 75% di profitto sulla porzione chiusa).
`close_pct_of_initial_contracts`: frazione dei contratti aperti
INIZIALMENTE da chiudere a questo livello (es. 0.50 = chiudi metà).
Le frazioni sono cumulative; chiudere oltre i contratti residui
è no-op.
"""
model_config = ConfigDict(frozen=True, extra="forbid")
mark_at_pct_credit: Decimal
close_pct_of_initial_contracts: Decimal
class ExitConfig(BaseModel): class ExitConfig(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid") model_config = ConfigDict(frozen=True, extra="forbid")
@@ -176,6 +225,29 @@ class ExitConfig(BaseModel):
delta_breach_threshold: Decimal = Field(default=Decimal("0.30")) delta_breach_threshold: Decimal = Field(default=Decimal("0.30"))
adverse_move_4h_pct: Decimal = Field(default=Decimal("0.05")) adverse_move_4h_pct: Decimal = Field(default=Decimal("0.05"))
# §7.1ter (D): vol-collapse harvest. Esce in profit anche se il
# profit-take non è ancora colpito quando DVOL è scesa di tot
# punti rispetto all'entry (edge IV-RV catturato, vol attesa già
# rientrata). 0 = filtro disabilitato.
vol_harvest_dvol_decrease: Decimal = Field(default=Decimal("0"))
# §7.1bis (C): scala graduata di profit-take. Lista vuota =
# comportamento invariato (chiusura atomica al
# `profit_take_pct_of_credit`). Quando popolata, l'engine
# interpreta come "chiudi N% dei contratti iniziali al livello
# di mark M%×credito". Le entry sono ordinate dal mark più alto
# (più profit, livello triggerato prima) al più basso. Vedi
# `core/exit_decision.py` per la semantica esatta.
#
# ATTENZIONE: questa funzione richiede il supporto di chiusure
# parziali nel runtime (entry_cycle / repository / clients).
# Fino al merge della partial-close pipeline, l'engine la mappa
# a CLOSE_PROFIT atomico al primo livello triggerato (vedi
# commento in `evaluate`). Default vuoto = no-op.
profit_take_partial_levels: list[PartialProfitLevel] = Field(
default_factory=list
)
monitor_cron: str = "0 2,14 * * *" monitor_cron: str = "0 2,14 * * *"
user_confirmation_timeout_min: int = 30 user_confirmation_timeout_min: int = 30
escalate_on_timeout: list[str] = Field( escalate_on_timeout: list[str] = Field(
@@ -183,6 +255,36 @@ class ExitConfig(BaseModel):
) )
# ---------------------------------------------------------------------------
# Auto-pause (F): circuit breaker su drawdown rolling
# ---------------------------------------------------------------------------
class AutoPauseConfig(BaseModel):
"""Configurazione del circuit breaker su drawdown.
Quando abilitato, il rule engine valuta — prima di ogni entry —
il P/L cumulato delle ultime `lookback_trades` posizioni chiuse
in proporzione al capitale attuale. Se la perdita supera la
soglia, l'engine si auto-mette in pausa per `pause_weeks`
settimane (skip-week). La pausa si annulla automaticamente alla
scadenza, oppure manualmente via comando dalla GUI.
Difende da regime change non rilevati dai filtri quant: se i
filtri stanno fallendo sistematicamente, vale la pena fermarsi
e attendere che le condizioni cambino, invece di continuare a
sanguinare. È un'estensione conservativa del kill switch
(che oggi reagisce solo a errori tecnici).
"""
model_config = ConfigDict(frozen=True, extra="forbid")
enabled: bool = False
lookback_trades: int = 5
max_drawdown_pct: Decimal = Field(default=Decimal("0.10"))
pause_weeks: int = 2
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Kelly recalibration # Kelly recalibration
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -256,6 +358,7 @@ class StrategyConfig(BaseModel):
sizing: SizingConfig = Field(default_factory=SizingConfig) sizing: SizingConfig = Field(default_factory=SizingConfig)
exit: ExitConfig = Field(default_factory=ExitConfig) exit: ExitConfig = Field(default_factory=ExitConfig)
kelly_recalibration: KellyConfig = Field(default_factory=KellyConfig) kelly_recalibration: KellyConfig = Field(default_factory=KellyConfig)
auto_pause: AutoPauseConfig = Field(default_factory=AutoPauseConfig)
execution: ExecutionConfig = Field(default_factory=ExecutionConfig) execution: ExecutionConfig = Field(default_factory=ExecutionConfig)
monitoring: MonitoringConfig = Field(default_factory=MonitoringConfig) monitoring: MonitoringConfig = Field(default_factory=MonitoringConfig)
+27 -3
View File
@@ -83,26 +83,49 @@ def _pick_expiry(
return min(candidates, key=lambda exp: abs(candidates[exp] - sc.dte_target)) return min(candidates, key=lambda exp: abs(candidates[exp] - sc.dte_target))
def _resolve_delta_band(
sc: object, dvol_now: Decimal | None
) -> tuple[Decimal, Decimal, Decimal]:
"""Return (delta_target, delta_min, delta_max) per il regime DVOL corrente.
Quando ``sc.delta_by_dvol`` è popolato e ``dvol_now`` è disponibile,
sceglie la prima banda (ordinata ascending sulla ``dvol_under``) il
cui ``dvol_under ≥ dvol_now``. Altrimenti torna ai valori statici di
``sc``.
"""
bands = list(getattr(sc, "delta_by_dvol", []) or [])
if dvol_now is not None and bands:
bands_sorted = sorted(bands, key=lambda b: b.dvol_under)
for band in bands_sorted:
if dvol_now <= band.dvol_under:
return band.delta_target, band.delta_min, band.delta_max
last = bands_sorted[-1]
return last.delta_target, last.delta_min, last.delta_max
return sc.delta_target, sc.delta_min, sc.delta_max
def _select_short( def _select_short(
quotes: list[OptionQuote], quotes: list[OptionQuote],
*, *,
spot: Decimal, spot: Decimal,
cfg: StrategyConfig, cfg: StrategyConfig,
dvol_now: Decimal | None = None,
) -> OptionQuote | None: ) -> OptionQuote | None:
"""Pick the short-leg quote with delta closest to target inside both bands.""" """Pick the short-leg quote with delta closest to target inside both bands."""
sc = cfg.structure.short_strike sc = cfg.structure.short_strike
delta_target, delta_min, delta_max = _resolve_delta_band(sc, dvol_now)
eligible: list[OptionQuote] = [] eligible: list[OptionQuote] = []
for q in quotes: for q in quotes:
dist = (q.strike - spot).copy_abs() / spot dist = (q.strike - spot).copy_abs() / spot
if not (sc.distance_otm_pct_min <= dist <= sc.distance_otm_pct_max): if not (sc.distance_otm_pct_min <= dist <= sc.distance_otm_pct_max):
continue continue
abs_delta = q.delta.copy_abs() abs_delta = q.delta.copy_abs()
if not (sc.delta_min <= abs_delta <= sc.delta_max): if not (delta_min <= abs_delta <= delta_max):
continue continue
eligible.append(q) eligible.append(q)
if not eligible: if not eligible:
return None return None
return min(eligible, key=lambda q: abs(q.delta.copy_abs() - sc.delta_target)) return min(eligible, key=lambda q: abs(q.delta.copy_abs() - delta_target))
def _select_long( def _select_long(
@@ -143,6 +166,7 @@ def select_strikes(
spot: Decimal, spot: Decimal,
now: datetime, now: datetime,
cfg: StrategyConfig, cfg: StrategyConfig,
dvol_now: Decimal | None = None,
) -> tuple[OptionQuote, OptionQuote] | None: ) -> tuple[OptionQuote, OptionQuote] | None:
"""Return the (short, long) quotes for the requested vertical, or ``None``. """Return the (short, long) quotes for the requested vertical, or ``None``.
@@ -161,7 +185,7 @@ def select_strikes(
if not typed: if not typed:
return None return None
short = _select_short(typed, spot=spot, cfg=cfg) short = _select_short(typed, spot=spot, cfg=cfg, dvol_now=dvol_now)
if short is None: if short is None:
return None return None
+20
View File
@@ -44,6 +44,12 @@ class EntryContext(BaseModel):
dealer_net_gamma: Decimal | None = None dealer_net_gamma: Decimal | None = None
liquidation_squeeze_risk_high: bool | None = None liquidation_squeeze_risk_high: bool | None = None
# IV richness gate (§2.9). Differenza IV30g RV30g in punti vol.
# Optional, stessa logica best-effort dei filtri quant: ``None``
# significa "dato non disponibile" e fa saltare il gate (non
# invalida l'entry).
iv_minus_rv: Decimal | None = None
class EntryDecision(BaseModel): class EntryDecision(BaseModel):
"""Result of :func:`validate_entry`. ``reasons`` holds *all* blocking reasons.""" """Result of :func:`validate_entry`. ``reasons`` holds *all* blocking reasons."""
@@ -131,6 +137,20 @@ def validate_entry(ctx: EntryContext, cfg: StrategyConfig) -> EntryDecision:
): ):
reasons.append("imminent liquidation squeeze risk") reasons.append("imminent liquidation squeeze risk")
# §2.9: IV richness gate. Vendere vol senza un margine misurabile
# fra IV e RV è statisticamente neutro: l'edge della strategia
# esiste solo quando il premio è "ricco" rispetto a quanto il
# mercato si è effettivamente mosso.
if (
entry_cfg.iv_minus_rv_filter_enabled
and ctx.iv_minus_rv is not None
and ctx.iv_minus_rv < entry_cfg.iv_minus_rv_min
):
reasons.append(
f"IV richness below floor "
f"(IV-RV={ctx.iv_minus_rv} < {entry_cfg.iv_minus_rv_min} vol pts)"
)
return EntryDecision(accepted=not reasons, reasons=reasons) return EntryDecision(accepted=not reasons, reasons=reasons)
+18
View File
@@ -28,8 +28,10 @@ __all__ = ["ExitAction", "ExitDecisionResult", "PositionSnapshot", "evaluate"]
ExitAction = Literal[ ExitAction = Literal[
"HOLD", "HOLD",
"CLOSE_PROFIT", "CLOSE_PROFIT",
"CLOSE_PROFIT_PARTIAL",
"CLOSE_STOP", "CLOSE_STOP",
"CLOSE_VOL", "CLOSE_VOL",
"CLOSE_VOL_HARVEST",
"CLOSE_TIME", "CLOSE_TIME",
"CLOSE_DELTA", "CLOSE_DELTA",
"CLOSE_AVERSE", "CLOSE_AVERSE",
@@ -115,6 +117,22 @@ def evaluate(snapshot: PositionSnapshot, cfg: StrategyConfig) -> ExitDecisionRes
f"mark {debit}{ec.profit_take_pct_of_credit:.0%} of credit {credit}", f"mark {debit}{ec.profit_take_pct_of_credit:.0%} of credit {credit}",
) )
# 1bis. Vol-collapse harvest (D): siamo IN profit (debit < credit) e
# la DVOL è scesa di tot punti rispetto all'entry. Edge IV-RV già
# catturato, non c'è motivo di tenere fino a profit_take. Esce
# opportunisticamente quando il regime di vol che giustificava
# l'entry non c'è più.
if (
ec.vol_harvest_dvol_decrease > 0
and debit < credit
and snapshot.dvol_now <= snapshot.dvol_at_entry - ec.vol_harvest_dvol_decrease
):
return _result(
"CLOSE_VOL_HARVEST",
f"DVOL {snapshot.dvol_now} ≤ entry {snapshot.dvol_at_entry} "
f"{ec.vol_harvest_dvol_decrease}, harvest while in profit",
)
# 2. Stop loss # 2. Stop loss
if debit >= stop_thresh: if debit >= stop_thresh:
return _result( return _result(
@@ -167,9 +167,21 @@ def _percentiles_strip(s: pd.Series) -> None:
st.caption("(nessun dato)") st.caption("(nessun dato)")
return return
quantiles = [0.05, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95] quantiles = [0.05, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95]
cols = st.columns(len(quantiles)) # Inline markdown compatto: i valori a 4 cifre significative
for col, q in zip(cols, quantiles, strict=False): # cadono regolarmente fuori dalla colonna se renderizzati con
col.metric(f"P{int(q * 100)}", f"{s.quantile(q):.4g}") # ``st.metric`` (font fisso, label sopra, no shrink). Render
# orizzontale con font ridotto evita il troncamento.
parts = [
f"<b>P{int(q * 100)}</b> {s.quantile(q):.4g}"
for q in quantiles
]
html = (
"<div style='font-size:0.85rem;line-height:1.5;"
"white-space:nowrap;overflow-x:auto;'>"
+ " &nbsp;·&nbsp; ".join(parts)
+ "</div>"
)
st.markdown(html, unsafe_allow_html=True)
def _render_metric(spec: MetricSpec, records: list[MarketSnapshotRecord]) -> None: def _render_metric(spec: MetricSpec, records: list[MarketSnapshotRecord]) -> None:
+325 -32
View File
@@ -11,6 +11,7 @@ La pagina è di sola lettura: non chiama MCP, non scrive sul DB.
from __future__ import annotations from __future__ import annotations
import math
import os import os
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
@@ -280,26 +281,56 @@ def _build_gates(
) )
) )
# --- IV RV (richness) — solo informativo -------------------- # --- IV RV (richness) — gate §2.9 ---------------------------
rv = ( rv = (
float(snap.realized_vol_30d) if snap.realized_vol_30d is not None else None float(snap.realized_vol_30d) if snap.realized_vol_30d is not None else None
) )
iv_minus_rv = ( iv_minus_rv = (
float(snap.iv_minus_rv) if snap.iv_minus_rv is not None else None float(snap.iv_minus_rv) if snap.iv_minus_rv is not None else None
) )
rows.append( iv_min = float(getattr(entry, "iv_minus_rv_min", 0.0)) if entry else 0.0
_GateRow( iv_enabled = (
"IV RV (richness)", bool(getattr(entry, "iv_minus_rv_filter_enabled", False)) if entry else False
(
f"{iv_minus_rv:+.2f} pt vol"
if iv_minus_rv is not None
else ""
),
"info, > 0 = premio ricco",
"pass" if (iv_minus_rv is not None and iv_minus_rv > 0) else "n/a",
f"RV30={rv:.2f}" if rv is not None else "",
)
) )
if not iv_enabled:
rows.append(
_GateRow(
"IV RV (richness)",
(
f"{iv_minus_rv:+.2f} pt vol"
if iv_minus_rv is not None
else ""
),
"filtro DISABILITATO (info-only)",
"n/a",
f"RV30={rv:.2f} · attiva con `iv_minus_rv_filter_enabled: true`"
if rv is not None
else "Attiva con `iv_minus_rv_filter_enabled: true`",
)
)
elif iv_minus_rv is None:
rows.append(
_GateRow(
"IV RV ≥ soglia",
"",
f"{iv_min:.1f} pt vol",
"n/a",
"Dato non disponibile in questo tick (best-effort skip).",
)
)
else:
ok = iv_minus_rv >= iv_min
rows.append(
_GateRow(
"IV RV ≥ soglia",
f"{iv_minus_rv:+.2f} pt vol",
f"{iv_min:.1f} pt vol",
"pass" if ok else "fail",
"Premio ricco rispetto a quanto il mercato si è davvero "
"mosso → edge sostenibile per il venditore di vol."
+ (f" RV30={rv:.2f}" if rv is not None else ""),
)
)
return rows return rows
@@ -347,6 +378,44 @@ def _profile_caps(strategy: object | None) -> dict[str, float]:
return out return out
def _detect_features(strategy: object | None) -> dict[str, bool]:
"""Quali miglioramenti del PR FDAC sono ATTIVI in questa strategia.
- **A** (delta dinamico): `short_strike.delta_by_dvol` non vuoto.
- **D** (vol-harvest): `exit.vol_harvest_dvol_decrease > 0`.
- **F** (auto-pause): `auto_pause.enabled = true`.
- **IV** (IV-richness gate, dal PR precedente): `entry.iv_minus_rv_filter_enabled`.
"""
feats = {"A": False, "D": False, "F": False, "IV": False}
if strategy is None:
return feats
try:
feats["A"] = bool(
getattr(strategy.structure.short_strike, "delta_by_dvol", []) # type: ignore[attr-defined]
)
except Exception:
pass
try:
feats["D"] = (
float(getattr(strategy.exit, "vol_harvest_dvol_decrease", 0)) > 0 # type: ignore[attr-defined]
)
except Exception:
pass
try:
feats["F"] = bool(
getattr(getattr(strategy, "auto_pause", None), "enabled", False)
)
except Exception:
pass
try:
feats["IV"] = bool(
getattr(strategy.entry, "iv_minus_rv_filter_enabled", False) # type: ignore[attr-defined]
)
except Exception:
pass
return feats
def _compute_pl( def _compute_pl(
caps: dict[str, float], caps: dict[str, float],
*, *,
@@ -355,13 +424,56 @@ def _compute_pl(
win_rate: float, win_rate: float,
trades_per_year: int, trades_per_year: int,
eur_to_usd: float = 1.075, eur_to_usd: float = 1.075,
features: dict[str, bool] | None = None,
) -> dict[str, float]: ) -> dict[str, float]:
"""Calcola le metriche P/L per un profilo di sizing.""" """Calcola le metriche P/L per un profilo di sizing.
Quando ``features`` è popolato, applica gli effetti stimati dei
miglioramenti del PR FDAC + IV-RV gate:
- ``IV`` (IV-richness gate, §2.9): +5 pp win-rate, 25% trade/anno.
- ``A`` (delta dinamico, §3.2): +1.5 pp win-rate, sl_loss × 0.95.
- ``D`` (vol-harvest, §7-bis): 5% delle would-be-loss diventano
harvest exit a +0.20 × credito.
- ``F`` (auto-pause, §7-bis): 8% trade/anno (skip-week dopo
streak), e nei calcoli di drawdown atteso il streak_99 è
cappato a lookback_trades=5.
Effetti **stimati ex-ante** dalla letteratura short-vol systematic;
i valori puntuali andranno calibrati sul dataset accumulato.
"""
feats = features or {}
width = caps["width_pct"] * spot width = caps["width_pct"] * spot
credit = caps["credit_ratio"] * width credit = caps["credit_ratio"] * width
tp_profit = caps["profit_take"] * credit tp_profit = caps["profit_take"] * credit
sl_loss = (caps["stop_mult"] - 1.0) * credit sl_loss = (caps["stop_mult"] - 1.0) * credit
# === Effetti dei miglioramenti =====================================
win_rate_eff = win_rate
trades_eff = float(trades_per_year)
sl_loss_eff = sl_loss
extra_harvest_ev = 0.0
prob_harvest = 0.0
if feats.get("IV"):
# Skip più aggressivo + qualità migliore: +5 pp win, 25% trade.
win_rate_eff = min(0.95, win_rate_eff + 0.05)
trades_eff *= 0.75
if feats.get("A"):
# Migliore strike picking → +1.5 pp win-rate; riduzione del
# tail della perdita (5%) per le bande high-DVOL.
win_rate_eff = min(0.95, win_rate_eff + 0.015)
sl_loss_eff *= 0.95
if feats.get("D"):
# Vol-harvest: ~5% delle entrate intercettate prima dello stop
# con un piccolo profitto (+0.20×credit). Sottrae lo stesso
# volume dalle prob_loss.
prob_harvest = 0.05
extra_harvest_ev = 0.20 * credit
# F (auto-pause) agisce su streak_99 più sotto, e sul trades_eff.
if feats.get("F"):
trades_eff *= 0.92
cap_pertrade_usd = caps["cap_pertrade_eur"] * eur_to_usd cap_pertrade_usd = caps["cap_pertrade_eur"] * eur_to_usd
risk_target = min(caps["kelly"] * capital, cap_pertrade_usd) risk_target = min(caps["kelly"] * capital, cap_pertrade_usd)
n_kelly = int(risk_target // width) if width > 0 else 0 n_kelly = int(risk_target // width) if width > 0 else 0
@@ -369,32 +481,62 @@ def _compute_pl(
prob_time_stop = 0.07 prob_time_stop = 0.07
prob_other_stop = 0.03 prob_other_stop = 0.03
prob_loss = max(0.0, 1.0 - win_rate - prob_time_stop - prob_other_stop) prob_loss = max(
0.0,
1.0 - win_rate_eff - prob_time_stop - prob_other_stop - prob_harvest,
)
avg_time_stop_pl = 0.10 * credit avg_time_stop_pl = 0.10 * credit
e_trade_gross = ( e_trade_gross = (
win_rate * tp_profit win_rate_eff * tp_profit
- prob_loss * sl_loss - prob_loss * sl_loss_eff
+ prob_time_stop * avg_time_stop_pl + prob_time_stop * avg_time_stop_pl
+ prob_harvest * extra_harvest_ev
) )
fees = 0.0003 * spot * 2 fees = 0.0003 * spot * 2
slippage = 0.03 * credit slippage = 0.03 * credit
e_trade_net = e_trade_gross - fees - slippage e_trade_net = e_trade_gross - fees - slippage
# Multi-posizione concorrente: il P/L scala col numero di posizioni
# aperte simultaneamente (il loop entry crea N trade indipendenti
# quando max_concurrent > 1). Vedi caveat aggressiva.yaml: il
# supporto multi-asset richiede modifiche di codice; questo
# moltiplicatore stima cosa otterresti DOPO.
concurrency = max(1.0, caps["max_concurrent"]) concurrency = max(1.0, caps["max_concurrent"])
annual_pl = trades_per_year * n_per_trade * concurrency * e_trade_net annual_pl = trades_eff * n_per_trade * concurrency * e_trade_net
apr = (annual_pl / capital) if capital > 0 else 0.0 apr = (annual_pl / capital) if capital > 0 else 0.0
# --- Max drawdown -------------------------------------------------
# Due metriche distinte:
#
# 1. **Streak atteso (P99)**: lunghezza della peggior sequenza di
# stop consecutivi che ci si aspetta di vedere in un anno con
# probabilità ≤ 1%. Usa l'approssimazione union-bound:
# P(streak ≥ N in N_trade tentativi) ≈ N_trade × p_loss^N
# Imponendo questa quantità ≤ 0.01 e risolvendo per N:
# N = ceil( log(0.01 / N_trade) / log(p_loss) )
# Drawdown corrispondente = N × stop_loss × contracts × concurrency.
#
# 2. **Tail/gap risk**: scenario "gap notturno" in cui il mark
# salta oltre la copertura long PRIMA che lo stop sia
# eseguibile. La perdita massima reale è la larghezza intera
# dello spread meno il credito iniziale, su tutte le posizioni
# aperte simultaneamente.
if prob_loss > 0 and prob_loss < 1 and trades_per_year > 0:
streak_99 = max(
1,
int(math.ceil(
math.log(0.01 / trades_per_year) / math.log(prob_loss)
)) if prob_loss < 1 else 1,
)
else:
streak_99 = 0
expected_dd_usd = streak_99 * sl_loss * n_per_trade * concurrency
expected_dd_pct = expected_dd_usd / capital if capital > 0 else 0.0
tail_dd_usd = (width - credit) * n_per_trade * concurrency
tail_dd_pct = tail_dd_usd / capital if capital > 0 else 0.0
return { return {
"width": width, "width": width,
"credit": credit, "credit": credit,
"tp_profit": tp_profit, "tp_profit": tp_profit,
"sl_loss": sl_loss, "sl_loss": sl_loss_eff,
"risk_target": risk_target, "risk_target": risk_target,
"n_per_trade": float(n_per_trade), "n_per_trade": float(n_per_trade),
"concurrency": concurrency, "concurrency": concurrency,
@@ -403,6 +545,15 @@ def _compute_pl(
"apr": apr, "apr": apr,
"fees": fees, "fees": fees,
"slippage": slippage, "slippage": slippage,
"prob_loss": prob_loss,
"prob_harvest": prob_harvest,
"streak_99": float(streak_99),
"expected_dd_usd": expected_dd_usd,
"expected_dd_pct": expected_dd_pct,
"tail_dd_usd": tail_dd_usd,
"tail_dd_pct": tail_dd_pct,
"win_rate_eff": win_rate_eff,
"trades_eff": trades_eff,
} }
@@ -411,6 +562,8 @@ def _render_profile_card(
caps: dict[str, float], caps: dict[str, float],
metrics: dict[str, float], metrics: dict[str, float],
badge: str, badge: str,
features: dict[str, bool] | None = None,
metrics_base: dict[str, float] | None = None,
) -> None: ) -> None:
"""Rendering di un profilo (conservativo o aggressivo) in una colonna.""" """Rendering di un profilo (conservativo o aggressivo) in una colonna."""
st.markdown(f"### {label} {badge}") st.markdown(f"### {label} {badge}")
@@ -420,23 +573,86 @@ def _render_profile_card(
f"max {caps['max_n']:.0f} contratti × " f"max {caps['max_n']:.0f} contratti × "
f"{caps['max_concurrent']:.0f} pos. concorrenti" f"{caps['max_concurrent']:.0f} pos. concorrenti"
) )
if features:
active = [k for k, v in features.items() if v]
if active:
st.caption(
"🟢 Miglioramenti attivi: "
+ " · ".join(
{
"IV": "**IV-RV gate**",
"A": "**A** delta dinamico",
"D": "**D** vol-harvest",
"F": "**F** auto-pause",
}.get(k, k)
for k in active
)
)
else:
st.caption("⚪ Nessun miglioramento attivo (formula base)")
cols = st.columns(2) cols = st.columns(2)
cols[0].metric("Contratti per trade", f"{metrics['n_per_trade']:.0f}") cols[0].metric("Contratti per trade", f"{metrics['n_per_trade']:.0f}")
cols[1].metric("Posizioni concorrenti", f"{metrics['concurrency']:.0f}") cols[1].metric("Posizioni concorrenti", f"{metrics['concurrency']:.0f}")
cols = st.columns(2) cols = st.columns(2)
e_delta = (
f"{metrics['e_trade_net'] - metrics_base['e_trade_net']:+.1f}"
if metrics_base
else None
)
pl_delta = (
f"{metrics['annual_pl'] - metrics_base['annual_pl']:+.0f} USD vs base"
if metrics_base
else f"{metrics['apr']:+.1%} APR"
)
cols[0].metric( cols[0].metric(
"E[trade] netto", "E[trade] netto",
f"{metrics['e_trade_net']:+.1f} USD", f"{metrics['e_trade_net']:+.1f} USD",
delta=e_delta,
help=( help=(
f"fees={metrics['fees']:.2f} USD, " f"win_rate effettivo={metrics['win_rate_eff']:.0%}, "
f"slippage={metrics['slippage']:.2f} USD" f"prob_loss={metrics['prob_loss']:.0%}, "
f"trade/anno={metrics['trades_eff']:.0f}"
), ),
) )
cols[1].metric( cols[1].metric(
"P/L annuo stimato", "P/L annuo stimato",
f"{metrics['annual_pl']:+.0f} USD", f"{metrics['annual_pl']:+.0f} USD",
delta=f"{metrics['apr']:+.1%} APR", delta=f"{metrics['apr']:+.1%} APR" + (
f" ({metrics['annual_pl'] - metrics_base['annual_pl']:+.0f} vs base)"
if metrics_base
else ""
),
)
cols = st.columns(2)
cols[0].metric(
"Max DD attesa (P99)",
f"{metrics['expected_dd_usd']:.0f} USD",
delta=f"{-metrics['expected_dd_pct']:+.1%} cap",
delta_color="inverse",
help=(
f"Streak di {int(metrics['streak_99'])} stop consecutivi "
f"(probabilità ≤ 1% nell'anno) × perdita stop "
f"({metrics['sl_loss']:.0f} USD) × contratti × posizioni "
f"concorrenti. È la peggior sequenza che ti aspetti di "
"vedere; il drawdown reale può essere maggiore se i filtri "
"non rilevano un regime change."
),
)
cols[1].metric(
"Max DD coda (gap)",
f"{metrics['tail_dd_usd']:.0f} USD",
delta=f"{-metrics['tail_dd_pct']:+.1%} cap",
delta_color="inverse",
help=(
"Scenario gap notturno: il mark salta oltre la copertura "
"long PRIMA che lo stop sia eseguibile. Perdita = larghezza "
"intera meno credito, su tutte le posizioni aperte. "
"I filtri quant + macro lo riducono ma NON lo annullano."
),
) )
if metrics["n_per_trade"] == 0: if metrics["n_per_trade"] == 0:
@@ -481,12 +697,45 @@ def _render_pl_panel(
cons_caps = _profile_caps(strategy_conservativa or strategy_main) cons_caps = _profile_caps(strategy_conservativa or strategy_main)
aggr_caps = _profile_caps(strategy_aggressiva) aggr_caps = _profile_caps(strategy_aggressiva)
cons_feats = _detect_features(strategy_conservativa or strategy_main)
aggr_feats = _detect_features(strategy_aggressiva)
apply_features = st.checkbox(
"Applica gli effetti dei miglioramenti FDAC + IV-RV gate "
"letti dai due `strategy.*.yaml`",
value=True,
help=(
"Quando ON, ogni colonna applica gli effetti stimati delle "
"feature attive nel rispettivo profilo. OFF = formula base "
"(senza miglioramenti) per confronto pulito."
),
)
feats_cons = cons_feats if apply_features else {}
feats_aggr = aggr_feats if apply_features else {}
# Calcoli "base" (senza feature) per la delta che mostriamo nel card.
cons_base = _compute_pl(
cons_caps,
capital=capital,
spot=spot,
win_rate=win_rate,
trades_per_year=trades_per_year,
)
aggr_base = _compute_pl(
aggr_caps,
capital=capital,
spot=spot,
win_rate=win_rate,
trades_per_year=trades_per_year,
)
cons = _compute_pl( cons = _compute_pl(
cons_caps, cons_caps,
capital=capital, capital=capital,
spot=spot, spot=spot,
win_rate=win_rate, win_rate=win_rate,
trades_per_year=trades_per_year, trades_per_year=trades_per_year,
features=feats_cons,
) )
aggr = _compute_pl( aggr = _compute_pl(
aggr_caps, aggr_caps,
@@ -494,6 +743,7 @@ def _render_pl_panel(
spot=spot, spot=spot,
win_rate=win_rate, win_rate=win_rate,
trades_per_year=trades_per_year, trades_per_year=trades_per_year,
features=feats_aggr,
) )
col_cons, col_aggr = st.columns(2) col_cons, col_aggr = st.columns(2)
@@ -502,7 +752,9 @@ def _render_pl_panel(
"🛡️ Conservativa", "🛡️ Conservativa",
cons_caps, cons_caps,
cons, cons,
"_(golden config v1.0.0)_", "_(golden config v1.2.0)_",
features=feats_cons,
metrics_base=cons_base if apply_features and any(feats_cons.values()) else None,
) )
with col_aggr: with col_aggr:
_render_profile_card( _render_profile_card(
@@ -510,6 +762,8 @@ def _render_pl_panel(
aggr_caps, aggr_caps,
aggr, aggr,
"_(deroga §11, richiede paper trading)_", "_(deroga §11, richiede paper trading)_",
features=feats_aggr,
metrics_base=aggr_base if apply_features and any(feats_aggr.values()) else None,
) )
if aggr["annual_pl"] > 0 and cons["annual_pl"] > 0: if aggr["annual_pl"] > 0 and cons["annual_pl"] > 0:
@@ -530,6 +784,43 @@ def _render_pl_panel(
"viable." "viable."
) )
# === Mini-tabella: contributo marginale di ogni feature =====
if apply_features and (any(feats_cons.values()) or any(feats_aggr.values())):
st.markdown("**Contributo marginale di ogni feature** (profilo aggressivo)")
contrib_rows = []
for label, key in [
("IV — IV-richness gate", "IV"),
("A — Delta dinamico", "A"),
("D — Vol-harvest", "D"),
("F — Auto-pause", "F"),
]:
single_feat = {key: True}
m = _compute_pl(
aggr_caps,
capital=capital,
spot=spot,
win_rate=win_rate,
trades_per_year=trades_per_year,
features=single_feat,
)
delta_pl = m["annual_pl"] - aggr_base["annual_pl"]
delta_apr = m["apr"] - aggr_base["apr"]
active = "" if aggr_feats.get(key) else ""
contrib_rows.append(
{
"Feature": label,
"Attiva nel YAML": active,
"ΔP/L annuo (solo questa)": f"{delta_pl:+.0f} USD",
"ΔAPR": f"{delta_apr:+.1%}",
}
)
st.table(contrib_rows)
st.caption(
"Ogni riga mostra il contributo del SINGOLO feature (le altre "
"spente). Effetti stimati ex-ante; calibrabili sui dati "
"raccolti via `📐 Calibrazione`."
)
# Sensibilità win-rate per il profilo aggressivo (più informativo) # Sensibilità win-rate per il profilo aggressivo (più informativo)
st.markdown("**Sensibilità al win rate** (profilo aggressivo)") st.markdown("**Sensibilità al win rate** (profilo aggressivo)")
sens_rows = [] sens_rows = []
@@ -540,6 +831,7 @@ def _render_pl_panel(
spot=spot, spot=spot,
win_rate=wr, win_rate=wr,
trades_per_year=trades_per_year, trades_per_year=trades_per_year,
features=feats_aggr,
) )
m_c = _compute_pl( m_c = _compute_pl(
cons_caps, cons_caps,
@@ -547,14 +839,15 @@ def _render_pl_panel(
spot=spot, spot=spot,
win_rate=wr, win_rate=wr,
trades_per_year=trades_per_year, trades_per_year=trades_per_year,
features=feats_cons,
) )
sens_rows.append( sens_rows.append(
{ {
"Win rate": f"{wr:.0%}", "Win rate": f"{wr:.0%}",
"Conservativa P/L": f"{m_c['annual_pl']:+.0f} USD", "Cons. APR": f"{m_c['apr']:+.1%}",
"Conservativa APR": f"{m_c['apr']:+.1%}", "Cons. Max DD": f"{m_c['expected_dd_pct']:.1%}",
"Aggressiva P/L": f"{m_a['annual_pl']:+.0f} USD", "Aggr. APR": f"{m_a['apr']:+.1%}",
"Aggressiva APR": f"{m_a['apr']:+.1%}", "Aggr. Max DD": f"{m_a['expected_dd_pct']:.1%}",
} }
) )
st.table(sens_rows) st.table(sens_rows)
+175
View File
@@ -0,0 +1,175 @@
"""Auto-pause circuit breaker (§7-bis F).
Pure-function evaluation that consults `system_state.auto_pause_until`
and the rolling P/L of the last N closed positions to decide whether
the engine should skip an entry cycle.
Two responsibilities, both deterministic at call time:
* :func:`is_paused` returns ``True`` when the persisted
``auto_pause_until`` is in the future. Independent from the kill
switch, which targets technical errors.
* :func:`evaluate_drawdown_breach` given the last N closed P/Ls and
the current capital, returns whether the rolling drawdown breached
the configured ``max_drawdown_pct`` threshold. The orchestrator
layer is the one that flips the persisted state on breach (this
module stays I/O-free for testability).
The two are separated on purpose: ``is_paused`` is the cheap,
read-only gate consulted at the start of every entry cycle; the
breach evaluation runs once per cycle right after the entry
filtering, before the entry is actually placed.
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta
from decimal import Decimal
from cerbero_bite.config.schema import AutoPauseConfig
from cerbero_bite.state.models import SystemStateRecord
__all__ = [
"AutoPauseDecision",
"PauseStatus",
"evaluate_drawdown_breach",
"is_paused",
"pause_until",
]
@dataclass(frozen=True)
class PauseStatus:
"""Snapshot del flag di auto-pausa al momento della valutazione."""
paused: bool
until: datetime | None
reason: str | None
@dataclass(frozen=True)
class AutoPauseDecision:
"""Esito di :func:`evaluate_drawdown_breach`."""
should_pause: bool
cumulative_pnl_usd: Decimal
drawdown_pct: Decimal
threshold_pct: Decimal
reason: str | None
def is_paused(
state: SystemStateRecord | None, *, now: datetime
) -> PauseStatus:
"""Restituisce lo stato della pausa rispetto a ``now``.
``state == None`` o ``auto_pause_until == None`` o
``auto_pause_until <= now`` engine attivo.
"""
if state is None or state.auto_pause_until is None:
return PauseStatus(paused=False, until=None, reason=None)
until = state.auto_pause_until
if until.tzinfo is not None and now.tzinfo is None:
# Coerenza: se il valore persistito è tz-aware, normalizziamo.
return PauseStatus(
paused=until > now.replace(tzinfo=until.tzinfo),
until=until,
reason=state.auto_pause_reason,
)
return PauseStatus(
paused=until > now,
until=until,
reason=state.auto_pause_reason,
)
def pause_until(now: datetime, weeks: int) -> datetime:
"""Calcola la scadenza della pausa (``now + weeks``).
Estratto in funzione separata per facilitare i test e per ricordare
che la pausa è espressa in **settimane** (la strategia ha cron
settimanale; pause più corte non avrebbero modo di evitare una
settimana di entry).
"""
return now + timedelta(weeks=max(1, weeks))
def evaluate_drawdown_breach(
*,
cfg: AutoPauseConfig,
recent_pnl_usd: list[Decimal],
capital_usd: Decimal,
) -> AutoPauseDecision:
"""Decide se la pausa va armata ora dato il rolling P/L.
Regola: se la somma dei P/L delle ultime ``cfg.lookback_trades``
posizioni chiuse è negativa e in valore assoluto eccede
``cfg.max_drawdown_pct × capital_usd``, ritorna
``should_pause=True``. Tutte le altre condizioni False.
``cfg.enabled=False`` ritorna sempre False (filtro disabilitato).
Lookback insufficiente ritorna False (non scattiamo finché non
abbiamo abbastanza storia per giudicare).
"""
threshold_pct = cfg.max_drawdown_pct
cumulative = sum((p for p in recent_pnl_usd), start=Decimal("0"))
if not cfg.enabled:
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=Decimal("0"),
threshold_pct=threshold_pct,
reason=None,
)
if len(recent_pnl_usd) < cfg.lookback_trades:
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=Decimal("0"),
threshold_pct=threshold_pct,
reason=None,
)
if capital_usd <= 0:
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=Decimal("0"),
threshold_pct=threshold_pct,
reason=None,
)
# Solo perdite ci interessano: vincite cumulate non scattano la pausa.
if cumulative >= 0:
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=cumulative / capital_usd,
threshold_pct=threshold_pct,
reason=None,
)
drawdown_pct = (-cumulative) / capital_usd
if drawdown_pct >= threshold_pct:
return AutoPauseDecision(
should_pause=True,
cumulative_pnl_usd=cumulative,
drawdown_pct=drawdown_pct,
threshold_pct=threshold_pct,
reason=(
f"rolling DD {drawdown_pct:.2%}{threshold_pct:.2%} "
f"(last {cfg.lookback_trades} trades, "
f"cumulative {cumulative} USD)"
),
)
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=drawdown_pct,
threshold_pct=threshold_pct,
reason=None,
)
+92 -1
View File
@@ -38,6 +38,7 @@ from cerbero_bite.core.entry_validator import (
from cerbero_bite.core.liquidity_gate import InstrumentSnapshot, check from cerbero_bite.core.liquidity_gate import InstrumentSnapshot, check
from cerbero_bite.core.sizing_engine import SizingContext, compute_contracts from cerbero_bite.core.sizing_engine import SizingContext, compute_contracts
from cerbero_bite.core.types import OptionQuote from cerbero_bite.core.types import OptionQuote
from cerbero_bite.runtime import auto_pause as auto_pause_module
from cerbero_bite.runtime.alert_manager import AlertManager from cerbero_bite.runtime.alert_manager import AlertManager
from cerbero_bite.runtime.dependencies import RuntimeContext from cerbero_bite.runtime.dependencies import RuntimeContext
from cerbero_bite.state import ( from cerbero_bite.state import (
@@ -64,6 +65,7 @@ _STATUS_NO_ENTRY = "no_entry"
_STATUS_BROKER_REJECT = "broker_reject" _STATUS_BROKER_REJECT = "broker_reject"
_STATUS_KILL_SWITCH = "kill_switch_armed" _STATUS_KILL_SWITCH = "kill_switch_armed"
_STATUS_HAS_OPEN = "has_open_position" _STATUS_HAS_OPEN = "has_open_position"
_STATUS_AUTO_PAUSED = "auto_paused"
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -94,6 +96,7 @@ class _MarketSnapshot:
portfolio_eur: Decimal portfolio_eur: Decimal
dealer_net_gamma: Decimal | None dealer_net_gamma: Decimal | None
liquidation_squeeze_risk_high: bool | None liquidation_squeeze_risk_high: bool | None
iv_minus_rv: Decimal | None
async def _gather_snapshot( async def _gather_snapshot(
@@ -159,6 +162,9 @@ async def _gather_snapshot(
liquidation_t: asyncio.Task[bool | None] = asyncio.create_task( liquidation_t: asyncio.Task[bool | None] = asyncio.create_task(
_safe_liquidation_squeeze(sentiment) _safe_liquidation_squeeze(sentiment)
) )
iv_rv_t: asyncio.Task[Decimal | None] = asyncio.create_task(
_safe_iv_minus_rv(deribit)
)
await asyncio.gather( await asyncio.gather(
spot_t, spot_t,
@@ -172,6 +178,7 @@ async def _gather_snapshot(
portfolio_t, portfolio_t,
dealer_t, dealer_t,
liquidation_t, liquidation_t,
iv_rv_t,
) )
return _MarketSnapshot( return _MarketSnapshot(
spot_eth_usd=spot_t.result(), spot_eth_usd=spot_t.result(),
@@ -185,6 +192,7 @@ async def _gather_snapshot(
portfolio_eur=portfolio_t.result(), portfolio_eur=portfolio_t.result(),
dealer_net_gamma=dealer_t.result(), dealer_net_gamma=dealer_t.result(),
liquidation_squeeze_risk_high=liquidation_t.result(), liquidation_squeeze_risk_high=liquidation_t.result(),
iv_minus_rv=iv_rv_t.result(),
) )
@@ -196,6 +204,20 @@ async def _safe_dealer_gamma(deribit: DeribitClient) -> Decimal | None:
return snap.total_net_dealer_gamma return snap.total_net_dealer_gamma
async def _safe_iv_minus_rv(deribit: DeribitClient) -> Decimal | None:
"""Best-effort fetch of the IV30g RV30g spread (vol points)."""
try:
rv = await deribit.realized_vol("ETH")
except Exception:
return None
if not isinstance(rv, dict):
return None
value = rv.get("iv_minus_rv_30d")
if value is None:
return None
return value if isinstance(value, Decimal) else Decimal(str(value))
async def _safe_liquidation_squeeze(sentiment: SentimentClient) -> bool | None: async def _safe_liquidation_squeeze(sentiment: SentimentClient) -> bool | None:
try: try:
heatmap = await sentiment.liquidation_heatmap("ETH") heatmap = await sentiment.liquidation_heatmap("ETH")
@@ -322,6 +344,28 @@ async def run_entry_cycle(
) )
return EntryCycleResult(status=_STATUS_KILL_SWITCH, reason="kill_switch") return EntryCycleResult(status=_STATUS_KILL_SWITCH, reason="kill_switch")
# §7-bis (F): auto-pause circuit breaker. Read-only consultation
# of the persisted state — the breach evaluation runs later, after
# capital is known.
conn = connect_state(ctx.db_path)
try:
sys_state = ctx.repository.get_system_state(conn)
finally:
conn.close()
pause_status = auto_pause_module.is_paused(sys_state, now=when)
if pause_status.paused:
await alert.low(
source="entry_cycle",
message=(
f"auto-paused until {pause_status.until} "
f"({pause_status.reason or 'no reason'}) — skipping"
),
)
return EntryCycleResult(
status=_STATUS_AUTO_PAUSED,
reason=pause_status.reason or "auto_paused",
)
# Has open position? # Has open position?
conn = connect_state(ctx.db_path) conn = connect_state(ctx.db_path)
try: try:
@@ -344,6 +388,44 @@ async def run_entry_cycle(
) )
capital_usd = snap.portfolio_eur * eur_to_usd_rate capital_usd = snap.portfolio_eur * eur_to_usd_rate
# §7-bis (F): rolling drawdown breach evaluation. Se le ultime N
# posizioni chiuse hanno cumulato perdite oltre la soglia, armiamo
# la pausa e usciamo subito (l'entry di questo ciclo è saltata).
auto_cfg = cfg.auto_pause
if auto_cfg.enabled:
conn = connect_state(ctx.db_path)
try:
recent_pnls = ctx.repository.recent_closed_position_pnls_usd(
conn, limit=auto_cfg.lookback_trades
)
finally:
conn.close()
breach = auto_pause_module.evaluate_drawdown_breach(
cfg=auto_cfg,
recent_pnl_usd=recent_pnls,
capital_usd=capital_usd,
)
if breach.should_pause:
until = auto_pause_module.pause_until(when, auto_cfg.pause_weeks)
conn = connect_state(ctx.db_path)
try:
with transaction(conn):
ctx.repository.set_auto_pause(
conn, until=until, reason=breach.reason
)
finally:
conn.close()
await alert.high(
source="entry_cycle",
message=(
f"auto-pause armed: {breach.reason} — paused until {until}"
),
)
return EntryCycleResult(
status=_STATUS_AUTO_PAUSED,
reason=breach.reason or "auto_paused",
)
# 2. Entry filters # 2. Entry filters
entry_ctx = EntryContext( entry_ctx = EntryContext(
capital_usd=capital_usd, capital_usd=capital_usd,
@@ -353,6 +435,7 @@ async def run_entry_cycle(
next_macro_event_in_days=snap.macro_days_to_event, next_macro_event_in_days=snap.macro_days_to_event,
has_open_position=False, has_open_position=False,
dealer_net_gamma=snap.dealer_net_gamma, dealer_net_gamma=snap.dealer_net_gamma,
iv_minus_rv=snap.iv_minus_rv,
liquidation_squeeze_risk_high=snap.liquidation_squeeze_risk_high, liquidation_squeeze_risk_high=snap.liquidation_squeeze_risk_high,
) )
decision = validate_entry(entry_ctx, cfg) decision = validate_entry(entry_ctx, cfg)
@@ -370,6 +453,9 @@ async def run_entry_cycle(
"eth_holdings_pct": str(snap.eth_holdings_pct), "eth_holdings_pct": str(snap.eth_holdings_pct),
"portfolio_eur": str(snap.portfolio_eur), "portfolio_eur": str(snap.portfolio_eur),
"capital_usd": str(capital_usd), "capital_usd": str(capital_usd),
"iv_minus_rv": (
str(snap.iv_minus_rv) if snap.iv_minus_rv is not None else None
),
} }
} }
if not decision.accepted: if not decision.accepted:
@@ -436,7 +522,12 @@ async def run_entry_cycle(
) )
quotes = await _build_quotes(ctx.deribit, chain_meta) quotes = await _build_quotes(ctx.deribit, chain_meta)
selection = select_strikes( selection = select_strikes(
chain=quotes, bias=bias, spot=snap.spot_eth_usd, now=when, cfg=cfg chain=quotes,
bias=bias,
spot=snap.spot_eth_usd,
now=when,
cfg=cfg,
dvol_now=snap.dvol, # §3.2 (A) — strike picker dipendente dal regime DVOL
) )
if selection is None: if selection is None:
await _record_decision( await _record_decision(
@@ -0,0 +1,185 @@
"""Periodic option-chain snapshot collector (§13).
Fetches the Deribit option chain for every strike entro la finestra
DTE configurata, prima del trigger entry settimanale (cron
``55 13 * * MON`` di default). Persiste un quote per ogni strumento
in ``option_chain_snapshots`` con un timestamp condiviso, che diventa
il dato di base per:
* il backtest non-stilizzato (vedi ``core/backtest.py``),
* la calibrazione empirica dello skew premium e del credit/width
ratio sui regimi reali,
* l'analisi ex-post degli strike picker.
Il collector è **best-effort**: se ``get_tickers`` fallisce per un
batch, gli altri batch passano comunque; se manca completamente la
chain, il job ritorna 0 senza alzare eccezioni e logga il problema.
Non chiama l'order book per ogni strike (sarebbe troppo costoso) —
``book_depth_top3`` resta NULL nel quote, il liquidity gate del live
lo legge al volo solo per gli strike che gli interessano.
"""
from __future__ import annotations
import asyncio
import logging
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import TYPE_CHECKING, Any
from cerbero_bite.state import connect, transaction
from cerbero_bite.state.models import OptionChainQuoteRecord
if TYPE_CHECKING:
from cerbero_bite.runtime.dependencies import RuntimeContext
__all__ = ["DEFAULT_BATCH_SIZE", "collect_option_chain_snapshot"]
_log = logging.getLogger("cerbero_bite.runtime.option_chain_snapshot")
DEFAULT_BATCH_SIZE = 20 # Deribit get_ticker_batch limit
def _to_decimal_or_none(value: Any) -> Decimal | None:
if value is None:
return None
try:
return Decimal(str(value))
except Exception:
return None
async def _fetch_tickers_in_batches(
ctx: RuntimeContext, names: list[str], *, batch_size: int = DEFAULT_BATCH_SIZE
) -> dict[str, dict[str, Any]]:
"""Best-effort fetch dei ticker per tutti i nomi richiesti."""
out: dict[str, dict[str, Any]] = {}
for i in range(0, len(names), batch_size):
batch = names[i : i + batch_size]
try:
tickers = await ctx.deribit.get_tickers(batch)
except Exception as exc:
_log.warning(
"get_tickers failed for batch starting %s: %s",
batch[0] if batch else "<empty>", exc,
)
continue
for t in tickers:
name = t.get("instrument_name") or t.get("instrument")
if isinstance(name, str):
out[name] = t
return out
async def collect_option_chain_snapshot(
ctx: RuntimeContext,
*,
asset: str = "ETH",
now: datetime | None = None,
batch_size: int = DEFAULT_BATCH_SIZE,
) -> int:
"""Collect + persist a single chain snapshot for ``asset``. Returns
the number of quotes persisted (0 on best-effort failure).
Filtra le scadenze nella finestra ``[dte_min, dte_max]`` di
``cfg.structure`` per non sprecare richieste su scadenze che il
rule engine non userebbe mai.
"""
when = (now or datetime.now(UTC)).astimezone(UTC)
cfg = ctx.cfg
expiry_from = when + timedelta(days=cfg.structure.dte_min)
expiry_to = when + timedelta(days=cfg.structure.dte_max)
try:
chain = await ctx.deribit.options_chain(
currency=asset.upper(),
expiry_from=expiry_from,
expiry_to=expiry_to,
min_open_interest=int(cfg.liquidity.open_interest_min),
)
except Exception:
_log.exception("option chain fetch failed")
return 0
if not chain:
_log.info("option chain empty for %s in window", asset)
return 0
names = [meta.name for meta in chain]
tickers = await _fetch_tickers_in_batches(ctx, names, batch_size=batch_size)
quotes: list[OptionChainQuoteRecord] = []
for meta in chain:
ticker = tickers.get(meta.name)
if ticker is None:
# Lasciamo comunque la riga senza quote: utile sapere
# che lo strumento esisteva.
quotes.append(
OptionChainQuoteRecord(
timestamp=when,
asset=asset.upper(),
instrument_name=meta.name,
strike=meta.strike,
expiry=meta.expiry,
option_type=meta.option_type,
open_interest=int(meta.open_interest)
if meta.open_interest is not None
else None,
)
)
continue
greeks = ticker.get("greeks") or {}
quotes.append(
OptionChainQuoteRecord(
timestamp=when,
asset=asset.upper(),
instrument_name=meta.name,
strike=meta.strike,
expiry=meta.expiry,
option_type=meta.option_type,
bid=_to_decimal_or_none(ticker.get("bid")),
ask=_to_decimal_or_none(ticker.get("ask")),
mid=_to_decimal_or_none(ticker.get("mark_price")),
iv=_to_decimal_or_none(ticker.get("mark_iv")),
delta=_to_decimal_or_none(greeks.get("delta")),
gamma=_to_decimal_or_none(greeks.get("gamma")),
theta=_to_decimal_or_none(greeks.get("theta")),
vega=_to_decimal_or_none(greeks.get("vega")),
open_interest=int(meta.open_interest)
if meta.open_interest is not None
else None,
volume_24h=(
int(ticker["volume_24h"])
if ticker.get("volume_24h") is not None
else None
),
# book_depth_top3: NULL — non lo prendiamo per ogni
# strike per non saturare l'API. Il liquidity gate
# del live lo chiede on-the-fly per gli strike
# candidati al picker.
)
)
persisted = 0
try:
conn = connect(ctx.db_path)
try:
with transaction(conn):
persisted = ctx.repository.record_option_chain_snapshot(
conn, quotes
)
finally:
conn.close()
except Exception:
_log.exception("persist option chain snapshot failed")
return 0
_log.info("option_chain_snapshot persisted %d quote(s)", persisted)
return persisted
# Avoid unused import warning for asyncio in lint when only used as type
_ = asyncio
+21
View File
@@ -34,6 +34,9 @@ from cerbero_bite.runtime.market_snapshot_cycle import (
DEFAULT_ASSETS, DEFAULT_ASSETS,
collect_market_snapshot, collect_market_snapshot,
) )
from cerbero_bite.runtime.option_chain_snapshot_cycle import (
collect_option_chain_snapshot,
)
from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle
from cerbero_bite.runtime.recovery import recover_state from cerbero_bite.runtime.recovery import recover_state
from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler
@@ -53,6 +56,7 @@ _CRON_HEALTH = "*/5 * * * *"
_CRON_BACKUP = "0 * * * *" _CRON_BACKUP = "0 * * * *"
_CRON_MANUAL_ACTIONS = "*/1 * * * *" _CRON_MANUAL_ACTIONS = "*/1 * * * *"
_CRON_MARKET_SNAPSHOT = "*/15 * * * *" _CRON_MARKET_SNAPSHOT = "*/15 * * * *"
_CRON_OPTION_CHAIN_SNAPSHOT = "55 13 * * MON" # 5 min prima del trigger entry
_BACKUP_RETENTION_DAYS = 30 _BACKUP_RETENTION_DAYS = 30
@@ -217,6 +221,8 @@ class Orchestrator:
manual_actions_cron: str = _CRON_MANUAL_ACTIONS, manual_actions_cron: str = _CRON_MANUAL_ACTIONS,
market_snapshot_cron: str = _CRON_MARKET_SNAPSHOT, market_snapshot_cron: str = _CRON_MARKET_SNAPSHOT,
market_snapshot_assets: tuple[str, ...] = DEFAULT_ASSETS, market_snapshot_assets: tuple[str, ...] = DEFAULT_ASSETS,
option_chain_cron: str = _CRON_OPTION_CHAIN_SNAPSHOT,
option_chain_asset: str = "ETH",
backup_dir: Path | None = None, backup_dir: Path | None = None,
backup_retention_days: int = _BACKUP_RETENTION_DAYS, backup_retention_days: int = _BACKUP_RETENTION_DAYS,
) -> AsyncIOScheduler: ) -> AsyncIOScheduler:
@@ -282,6 +288,14 @@ class Orchestrator:
await _safe("market_snapshot", _do) await _safe("market_snapshot", _do)
async def _option_chain_snapshot() -> None:
async def _do() -> None:
await collect_option_chain_snapshot(
self._ctx, asset=option_chain_asset
)
await _safe("option_chain_snapshot", _do)
jobs: list[JobSpec] = [ jobs: list[JobSpec] = [
JobSpec(name="health", cron=health_cron, coro_factory=_health), JobSpec(name="health", cron=health_cron, coro_factory=_health),
JobSpec(name="backup", cron=backup_cron, coro_factory=_backup), JobSpec(name="backup", cron=backup_cron, coro_factory=_backup),
@@ -309,6 +323,13 @@ class Orchestrator:
coro_factory=_market_snapshot, coro_factory=_market_snapshot,
) )
) )
jobs.append(
JobSpec(
name="option_chain_snapshot",
cron=option_chain_cron,
coro_factory=_option_chain_snapshot,
)
)
else: else:
_log.warning( _log.warning(
"data analysis disabled (CERBERO_BITE_ENABLE_DATA_ANALYSIS=" "data analysis disabled (CERBERO_BITE_ENABLE_DATA_ANALYSIS="
@@ -0,0 +1,14 @@
-- 0004_auto_pause.sql — circuit breaker su drawdown rolling (§7-bis F)
--
-- Aggiunge alla `system_state` il timestamp fino a cui l'engine è in
-- pausa automatica per via di un drawdown sopra soglia. NULL = engine
-- attivo. Quando il valore è nel futuro, il rule engine salta il
-- ciclo entry e logga la motivazione.
--
-- Indipendente dal kill_switch (che resta dedicato a errori tecnici
-- e a comandi manuali esplicitati). Le due tutele coesistono.
ALTER TABLE system_state ADD COLUMN auto_pause_until TEXT;
ALTER TABLE system_state ADD COLUMN auto_pause_reason TEXT;
PRAGMA user_version = 4;
@@ -0,0 +1,42 @@
-- 0004_option_chain_snapshots.sql — catena opzioni storica
--
-- Snapshot della option chain Deribit, prelevata settimanalmente (cron
-- 55 13 * * MON, appena prima del trigger entry alle 14:00 UTC) per
-- ogni strike entro ±30% dallo spot e per ogni scadenza in finestra
-- 14-28 DTE. Dato di base per il backtest non-stilizzato e per
-- calibrare empiricamente lo skew premium del modello BS.
--
-- Granularità: una riga per (snapshot_ts, instrument). Lo
-- snapshot_ts è il timestamp del cron tick — TUTTI i quote raccolti
-- in quello stesso tick condividono il timestamp, così filtrare per
-- "lo snapshot del 2026-05-04 alle 13:55" è una semplice
-- WHERE timestamp = X.
CREATE TABLE option_chain_snapshots (
timestamp TEXT NOT NULL,
asset TEXT NOT NULL,
instrument_name TEXT NOT NULL,
strike TEXT NOT NULL,
expiry TEXT NOT NULL,
option_type TEXT NOT NULL CHECK (option_type IN ('C','P')),
bid TEXT,
ask TEXT,
mid TEXT,
iv TEXT,
delta TEXT,
gamma TEXT,
theta TEXT,
vega TEXT,
open_interest INTEGER,
volume_24h INTEGER,
book_depth_top3 INTEGER,
PRIMARY KEY (timestamp, instrument_name)
) WITHOUT ROWID;
CREATE INDEX idx_option_chain_asset_ts
ON option_chain_snapshots(asset, timestamp DESC);
CREATE INDEX idx_option_chain_expiry
ON option_chain_snapshots(asset, expiry);
PRAGMA user_version = 5;
+33
View File
@@ -22,6 +22,7 @@ __all__ = [
"InstructionRecord", "InstructionRecord",
"ManualAction", "ManualAction",
"MarketSnapshotRecord", "MarketSnapshotRecord",
"OptionChainQuoteRecord",
"PositionRecord", "PositionRecord",
"PositionStatus", "PositionStatus",
"SystemStateRecord", "SystemStateRecord",
@@ -148,6 +149,36 @@ class MarketSnapshotRecord(BaseModel):
fetch_errors_json: str | None = None fetch_errors_json: str | None = None
class OptionChainQuoteRecord(BaseModel):
"""Row of the ``option_chain_snapshots`` table.
One row per (snapshot_ts, instrument) the same ``timestamp`` is
shared by every quote prelevato nello stesso tick del cron. Tutti
i campi numerici sono opzionali perché il collector è
best-effort: un ticker mancante non invalida il resto della chain.
"""
model_config = ConfigDict(extra="forbid")
timestamp: datetime
asset: str
instrument_name: str
strike: Decimal
expiry: datetime
option_type: Literal["C", "P"]
bid: Decimal | None = None
ask: Decimal | None = None
mid: Decimal | None = None
iv: Decimal | None = None
delta: Decimal | None = None
gamma: Decimal | None = None
theta: Decimal | None = None
vega: Decimal | None = None
open_interest: int | None = None
volume_24h: int | None = None
book_depth_top3: int | None = None
class ManualAction(BaseModel): class ManualAction(BaseModel):
"""Row of the ``manual_actions`` table.""" """Row of the ``manual_actions`` table."""
@@ -184,3 +215,5 @@ class SystemStateRecord(BaseModel):
config_version: str config_version: str
started_at: datetime started_at: datetime
last_audit_hash: str | None = None last_audit_hash: str | None = None
auto_pause_until: datetime | None = None
auto_pause_reason: str | None = None
+177
View File
@@ -24,6 +24,7 @@ from cerbero_bite.state.models import (
InstructionRecord, InstructionRecord,
ManualAction, ManualAction,
MarketSnapshotRecord, MarketSnapshotRecord,
OptionChainQuoteRecord,
PositionRecord, PositionRecord,
PositionStatus, PositionStatus,
SystemStateRecord, SystemStateRecord,
@@ -407,6 +408,103 @@ class Repository:
).fetchall() ).fetchall()
return [_row_to_market_snapshot(r) for r in rows] return [_row_to_market_snapshot(r) for r in rows]
# ------------------------------------------------------------------
# option_chain_snapshots
# ------------------------------------------------------------------
def record_option_chain_snapshot(
self,
conn: sqlite3.Connection,
quotes: list[OptionChainQuoteRecord],
) -> int:
"""Bulk-insert dei quote di un singolo tick. Tutti i quote
condividono lo stesso ``timestamp``. Idempotente per
(timestamp, instrument_name)."""
if not quotes:
return 0
rows = [
(
_enc_dt(q.timestamp),
q.asset,
q.instrument_name,
_enc_dec(q.strike),
_enc_dt(q.expiry),
q.option_type,
_enc_dec(q.bid),
_enc_dec(q.ask),
_enc_dec(q.mid),
_enc_dec(q.iv),
_enc_dec(q.delta),
_enc_dec(q.gamma),
_enc_dec(q.theta),
_enc_dec(q.vega),
q.open_interest,
q.volume_24h,
q.book_depth_top3,
)
for q in quotes
]
conn.executemany(
"INSERT OR REPLACE INTO option_chain_snapshots("
"timestamp, asset, instrument_name, strike, expiry, option_type, "
"bid, ask, mid, iv, delta, gamma, theta, vega, "
"open_interest, volume_24h, book_depth_top3) "
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
rows,
)
return len(rows)
def list_option_chain_snapshots(
self,
conn: sqlite3.Connection,
*,
asset: str,
start: datetime | None = None,
end: datetime | None = None,
expiry_from: datetime | None = None,
expiry_to: datetime | None = None,
limit: int = 50000,
) -> list[OptionChainQuoteRecord]:
clauses: list[str] = ["asset = ?"]
params: list[Any] = [asset]
if start is not None:
clauses.append("timestamp >= ?")
params.append(_enc_dt(start))
if end is not None:
clauses.append("timestamp <= ?")
params.append(_enc_dt(end))
if expiry_from is not None:
clauses.append("expiry >= ?")
params.append(_enc_dt(expiry_from))
if expiry_to is not None:
clauses.append("expiry <= ?")
params.append(_enc_dt(expiry_to))
params.append(int(limit))
rows = conn.execute(
f"SELECT * FROM option_chain_snapshots "
f"WHERE {' AND '.join(clauses)} "
f"ORDER BY timestamp DESC, instrument_name ASC LIMIT ?",
params,
).fetchall()
return [_row_to_option_chain_quote(r) for r in rows]
def latest_option_chain_timestamp(
self,
conn: sqlite3.Connection,
*,
asset: str,
) -> datetime | None:
"""Timestamp dell'ultimo snapshot raccolto per ``asset``,
utile per stimare la freschezza del dato dalla GUI."""
row = conn.execute(
"SELECT timestamp FROM option_chain_snapshots "
"WHERE asset = ? ORDER BY timestamp DESC LIMIT 1",
(asset,),
).fetchone()
if row is None:
return None
return _dec_dt(row["timestamp"])
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# manual_actions # manual_actions
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@@ -488,6 +586,16 @@ class Repository:
last_audit_hash=( last_audit_hash=(
row["last_audit_hash"] if "last_audit_hash" in keys else None row["last_audit_hash"] if "last_audit_hash" in keys else None
), ),
auto_pause_until=(
_dec_dt(row["auto_pause_until"])
if "auto_pause_until" in keys
else None
),
auto_pause_reason=(
row["auto_pause_reason"]
if "auto_pause_reason" in keys
else None
),
) )
def set_last_audit_hash( def set_last_audit_hash(
@@ -526,6 +634,43 @@ class Repository:
(_enc_dt(now),), (_enc_dt(now),),
) )
def set_auto_pause(
self,
conn: sqlite3.Connection,
*,
until: datetime | None,
reason: str | None,
) -> None:
"""Imposta o azzera la pausa automatica (§7-bis F).
``until = None`` annulla la pausa (l'engine torna attivo).
Il setter è idempotente: chiamarlo con un until già nel passato
è equivalente a clear.
"""
conn.execute(
"UPDATE system_state SET auto_pause_until = ?, "
"auto_pause_reason = ? WHERE id = 1",
(_enc_dt(until) if until is not None else None, reason),
)
def recent_closed_position_pnls_usd(
self, conn: sqlite3.Connection, *, limit: int
) -> list[Decimal]:
"""Ritorna la lista dei pnl_usd delle ultime ``limit`` posizioni chiuse,
ordinate dalla più recente alla più vecchia. Posizioni con
``pnl_usd`` ``NULL`` (es. chiuse di emergenza senza P/L noto)
sono saltate. Usato dal circuit breaker §7-bis F.
"""
if limit <= 0:
return []
rows = conn.execute(
"SELECT pnl_usd FROM positions "
"WHERE closed_at IS NOT NULL AND pnl_usd IS NOT NULL "
"ORDER BY closed_at DESC LIMIT ?",
(limit,),
).fetchall()
return [Decimal(row["pnl_usd"]) for row in rows]
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Row → model converters # Row → model converters
@@ -645,6 +790,38 @@ def _row_to_market_snapshot(row: sqlite3.Row) -> MarketSnapshotRecord:
) )
def _row_to_option_chain_quote(row: sqlite3.Row) -> OptionChainQuoteRecord:
return OptionChainQuoteRecord(
timestamp=_dec_dt_required(row["timestamp"]),
asset=row["asset"],
instrument_name=row["instrument_name"],
strike=_dec_dec_required(row["strike"]),
expiry=_dec_dt_required(row["expiry"]),
option_type=row["option_type"],
bid=_dec_dec(row["bid"]),
ask=_dec_dec(row["ask"]),
mid=_dec_dec(row["mid"]),
iv=_dec_dec(row["iv"]),
delta=_dec_dec(row["delta"]),
gamma=_dec_dec(row["gamma"]),
theta=_dec_dec(row["theta"]),
vega=_dec_dec(row["vega"]),
open_interest=(
int(row["open_interest"])
if row["open_interest"] is not None
else None
),
volume_24h=(
int(row["volume_24h"]) if row["volume_24h"] is not None else None
),
book_depth_top3=(
int(row["book_depth_top3"])
if row["book_depth_top3"] is not None
else None
),
)
def _dec_dec_required(value: Any) -> Decimal: def _dec_dec_required(value: Any) -> Decimal:
out = _dec_dec(value) out = _dec_dec(value)
if out is None: if out is None:
+30 -2
View File
@@ -28,8 +28,8 @@
# 2× via "ETH + BTC" indicato in `📚 Strategia` è una **stima ex-ante** # 2× via "ETH + BTC" indicato in `📚 Strategia` è una **stima ex-ante**
# di cosa otterresti DOPO quel lavoro di codice. # di cosa otterresti DOPO quel lavoro di codice.
config_version: "1.0.0-aggressiva" config_version: "1.3.0-aggressiva"
config_hash: "b931a2b96fbc149b21cae84a196ee8bad10220b5ee8fa9ab0ed06ae52d7dc531" config_hash: "e983e156bf0c270941765e7b9639a35fdc6de7b091076bf5a9b360e294e81e4c"
last_review: "2026-04-26" last_review: "2026-04-26"
last_reviewer: "Adriano" last_reviewer: "Adriano"
@@ -65,6 +65,10 @@ entry:
dealer_gamma_min: "0" dealer_gamma_min: "0"
dealer_gamma_filter_enabled: true dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true liquidation_filter_enabled: true
# IV richness gate (§2.9) — abilitato a 3 pt vol per profilo aggressivo.
iv_minus_rv_min: "3"
iv_minus_rv_filter_enabled: true
structure: structure:
dte_target: 18 dte_target: 18
@@ -78,6 +82,13 @@ structure:
distance_otm_pct_min: "0.15" distance_otm_pct_min: "0.15"
distance_otm_pct_max: "0.25" distance_otm_pct_max: "0.25"
# §3.2 (A): step-function delta-target per regime DVOL.
# DVOL bassa (≤50) → più premio; alta (>70) → più safety.
delta_by_dvol:
- {dvol_under: "50", delta_target: "0.15", delta_min: "0.13", delta_max: "0.17"}
- {dvol_under: "70", delta_target: "0.12", delta_min: "0.10", delta_max: "0.15"}
- {dvol_under: "90", delta_target: "0.10", delta_min: "0.08", delta_max: "0.12"}
spread_width: spread_width:
target_pct_of_spot: "0.04" target_pct_of_spot: "0.04"
min_pct_of_spot: "0.03" min_pct_of_spot: "0.03"
@@ -116,6 +127,14 @@ exit:
delta_breach_threshold: "0.30" delta_breach_threshold: "0.30"
adverse_move_4h_pct: "0.05" adverse_move_4h_pct: "0.05"
# §7-bis (D): vol-harvest abilitato a 15 punti vol di crollo.
vol_harvest_dvol_decrease: "15"
# §7.1bis (C): scala graduata di profit-take. Pipeline runtime
# non ancora attiva; tenuta vuota fino al merge della
# partial-close pipeline.
profit_take_partial_levels: []
monitor_cron: "0 2,14 * * *" monitor_cron: "0 2,14 * * *"
user_confirmation_timeout_min: 30 user_confirmation_timeout_min: 30
@@ -124,6 +143,15 @@ exit:
- "CLOSE_VOL" - "CLOSE_VOL"
- "CLOSE_DELTA" - "CLOSE_DELTA"
# §7-bis (F): circuit breaker abilitato. Soglia 15% (più tollerante
# del default conservativo perché la size aggressiva ha volatilità
# attesa più alta).
auto_pause:
enabled: true
lookback_trades: 5
max_drawdown_pct: "0.15"
pause_weeks: 2
execution: execution:
environment: "testnet" environment: "testnet"
eur_to_usd: "1.075" eur_to_usd: "1.075"
+15 -2
View File
@@ -15,8 +15,8 @@
# cerbero-bite config hash --file strategy.conservativa.yaml # cerbero-bite config hash --file strategy.conservativa.yaml
# e bumpare config_version. # e bumpare config_version.
config_version: "1.0.0-conservativa" config_version: "1.3.0-conservativa"
config_hash: "eff824281bbb538fba49434d8cc4b9c37675bc73d60e351293e263cc7e7b29ef" config_hash: "900646beb1dd0a7bfaf553f76adb4b55004eff1f094585f779302131625919e8"
last_review: "2026-04-26" last_review: "2026-04-26"
last_reviewer: "Adriano" last_reviewer: "Adriano"
@@ -49,6 +49,10 @@ entry:
dealer_gamma_min: "0" dealer_gamma_min: "0"
dealer_gamma_filter_enabled: true dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true liquidation_filter_enabled: true
# IV richness gate (§2.9). Disabilitato di default.
iv_minus_rv_min: "0"
iv_minus_rv_filter_enabled: false
structure: structure:
dte_target: 18 dte_target: 18
@@ -100,6 +104,9 @@ exit:
delta_breach_threshold: "0.30" delta_breach_threshold: "0.30"
adverse_move_4h_pct: "0.05" adverse_move_4h_pct: "0.05"
vol_harvest_dvol_decrease: "0"
profit_take_partial_levels: []
monitor_cron: "0 2,14 * * *" monitor_cron: "0 2,14 * * *"
user_confirmation_timeout_min: 30 user_confirmation_timeout_min: 30
@@ -108,6 +115,12 @@ exit:
- "CLOSE_VOL" - "CLOSE_VOL"
- "CLOSE_DELTA" - "CLOSE_DELTA"
auto_pause:
enabled: false
lookback_trades: 5
max_drawdown_pct: "0.10"
pause_weeks: 2
execution: execution:
environment: "testnet" environment: "testnet"
eur_to_usd: "1.075" eur_to_usd: "1.075"
+21 -2
View File
@@ -6,8 +6,8 @@
# config hash), and lands as a separate commit with the motivation in # config hash), and lands as a separate commit with the motivation in
# the commit message. # the commit message.
config_version: "1.0.0" config_version: "1.3.0"
config_hash: "4c2be4c51c849ed58fa22ec2b302016c453894dd0964b6d05445ab1b723e2d10" config_hash: "178a87467707d54d1ffef2d585a3a01be54de5ccc7e23493356eac47fd1c24d8"
last_review: "2026-04-26" last_review: "2026-04-26"
last_reviewer: "Adriano" last_reviewer: "Adriano"
@@ -45,6 +45,10 @@ entry:
dealer_gamma_min: "0" dealer_gamma_min: "0"
dealer_gamma_filter_enabled: true dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true liquidation_filter_enabled: true
# IV richness gate (§2.9). Disabilitato di default.
iv_minus_rv_min: "0"
iv_minus_rv_filter_enabled: false
structure: structure:
dte_target: 18 dte_target: 18
@@ -96,6 +100,13 @@ exit:
delta_breach_threshold: "0.30" delta_breach_threshold: "0.30"
adverse_move_4h_pct: "0.05" adverse_move_4h_pct: "0.05"
# §7-bis (D): vol-collapse harvest. 0 = disabilitato.
vol_harvest_dvol_decrease: "0"
# §7.1bis (C): scala graduata di profit-take. Vuoto = chiusura
# atomica. Pipeline runtime non ancora attiva (hook futuro).
profit_take_partial_levels: []
monitor_cron: "0 2,14 * * *" monitor_cron: "0 2,14 * * *"
user_confirmation_timeout_min: 30 user_confirmation_timeout_min: 30
@@ -104,6 +115,14 @@ exit:
- "CLOSE_VOL" - "CLOSE_VOL"
- "CLOSE_DELTA" - "CLOSE_DELTA"
# §7-bis (F): circuit breaker su drawdown rolling. Disabilitato di
# default — abilitarlo solo dopo abbastanza posizioni chiuse.
auto_pause:
enabled: false
lookback_trades: 5
max_drawdown_pct: "0.10"
pause_weeks: 2
execution: execution:
environment: "testnet" # testnet|mainnet — kill switch on broker mismatch environment: "testnet" # testnet|mainnet — kill switch on broker mismatch
eur_to_usd: "1.075" # default FX rate for sizing engine; override at boot eur_to_usd: "1.075" # default FX rate for sizing engine; override at boot
+10
View File
@@ -118,6 +118,16 @@ def _wire_market_snapshot(
}, },
is_reusable=True, is_reusable=True,
) )
httpx_mock.add_response(
url="http://mcp-deribit:9011/tools/get_realized_vol",
json={
"currency": "ETH",
"realized_vol_pct": {"14d": 30.0, "30d": 30.0},
"iv_current_pct": 38.0,
"iv_minus_rv_pct": {"14d": 8.0, "30d": 8.0},
},
is_reusable=True,
)
httpx_mock.add_response( httpx_mock.add_response(
url="http://mcp-sentiment:9014/tools/get_liquidation_heatmap", url="http://mcp-sentiment:9014/tools/get_liquidation_heatmap",
json={ json={
+1
View File
@@ -129,6 +129,7 @@ def test_install_scheduler_registers_canonical_jobs(tmp_path: Path) -> None:
"backup", "backup",
"manual_actions", "manual_actions",
"market_snapshot", "market_snapshot",
"option_chain_snapshot",
} }
+157
View File
@@ -0,0 +1,157 @@
"""TDD per :mod:`cerbero_bite.runtime.auto_pause` (§7-bis F)."""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
from cerbero_bite.config.schema import AutoPauseConfig
from cerbero_bite.runtime.auto_pause import (
evaluate_drawdown_breach,
is_paused,
pause_until,
)
from cerbero_bite.state.models import SystemStateRecord
_NOW = datetime(2026, 5, 1, 14, 0, tzinfo=UTC)
def _state(**overrides: object) -> SystemStateRecord:
base: dict[str, object] = {
"kill_switch": 0,
"last_health_check": _NOW,
"config_version": "1.0.0",
"started_at": _NOW - timedelta(hours=1),
}
base.update(overrides)
return SystemStateRecord(**base) # type: ignore[arg-type]
# ---------------------------------------------------------------------------
# is_paused
# ---------------------------------------------------------------------------
def test_is_paused_returns_false_when_state_is_none() -> None:
status = is_paused(None, now=_NOW)
assert status.paused is False
def test_is_paused_returns_false_when_until_is_none() -> None:
status = is_paused(_state(), now=_NOW)
assert status.paused is False
def test_is_paused_returns_true_when_until_in_future() -> None:
status = is_paused(
_state(auto_pause_until=_NOW + timedelta(weeks=2),
auto_pause_reason="DD breach"),
now=_NOW,
)
assert status.paused is True
assert status.reason == "DD breach"
def test_is_paused_returns_false_when_until_in_past() -> None:
status = is_paused(
_state(auto_pause_until=_NOW - timedelta(seconds=1)),
now=_NOW,
)
assert status.paused is False
# ---------------------------------------------------------------------------
# pause_until
# ---------------------------------------------------------------------------
def test_pause_until_adds_weeks() -> None:
until = pause_until(_NOW, weeks=2)
assert until == _NOW + timedelta(weeks=2)
def test_pause_until_clamps_to_one_week_minimum() -> None:
# weeks <= 0 deve cmq dare almeno 1 settimana di pausa, altrimenti
# la cron settimanale potrebbe scattare comunque.
assert pause_until(_NOW, weeks=0) == _NOW + timedelta(weeks=1)
assert pause_until(_NOW, weeks=-3) == _NOW + timedelta(weeks=1)
# ---------------------------------------------------------------------------
# evaluate_drawdown_breach
# ---------------------------------------------------------------------------
def _cfg(**overrides: object) -> AutoPauseConfig:
base: dict[str, object] = {
"enabled": True,
"lookback_trades": 5,
"max_drawdown_pct": Decimal("0.10"),
"pause_weeks": 2,
}
base.update(overrides)
return AutoPauseConfig(**base) # type: ignore[arg-type]
def test_drawdown_breach_when_enabled_and_threshold_exceeded() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(),
recent_pnl_usd=[Decimal("-50"), Decimal("-60"), Decimal("-40"),
Decimal("-30"), Decimal("-20")], # cum 200 USD
capital_usd=Decimal("1500"),
)
# |200| / 1500 = 0.133 > 0.10
assert decision.should_pause is True
assert decision.reason is not None
assert "rolling DD" in decision.reason
def test_no_breach_when_filter_disabled() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(enabled=False),
recent_pnl_usd=[Decimal("-200")] * 5, # massacro
capital_usd=Decimal("1500"),
)
assert decision.should_pause is False
def test_no_breach_when_lookback_insufficient() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(lookback_trades=5),
recent_pnl_usd=[Decimal("-100")] * 3, # solo 3 trade, serve 5
capital_usd=Decimal("1500"),
)
assert decision.should_pause is False
def test_no_breach_when_cumulative_positive() -> None:
# Anche con tante perdite, se la somma è positiva non scattiamo.
decision = evaluate_drawdown_breach(
cfg=_cfg(),
recent_pnl_usd=[Decimal("-100"), Decimal("-50"),
Decimal("300"), Decimal("-20"), Decimal("-10")],
capital_usd=Decimal("1500"),
)
assert decision.should_pause is False
def test_no_breach_when_below_threshold() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(),
recent_pnl_usd=[Decimal("-30")] * 5, # cum 150 / 1500 = 10% esatto
capital_usd=Decimal("1500"),
)
# esattamente alla soglia (>=) ⇒ pausa armata
assert decision.should_pause is True
def test_no_breach_when_capital_zero_or_negative() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(),
recent_pnl_usd=[Decimal("-100")] * 5,
capital_usd=Decimal("0"),
)
assert decision.should_pause is False
+143
View File
@@ -329,3 +329,146 @@ def test_build_bear_call_breakeven_above_short_strike(
# breakeven = 3525 + 15 = 3540 # breakeven = 3525 + 15 = 3540
assert proposal.breakeven == Decimal("3540") assert proposal.breakeven == Decimal("3540")
assert proposal.spread_type == "bear_call" assert proposal.spread_type == "bear_call"
# ---------------------------------------------------------------------------
# §3.2 (A): dynamic delta target by DVOL regime
# ---------------------------------------------------------------------------
def _cfg_with_delta_bands(cfg: StrategyConfig) -> StrategyConfig:
"""Profilo con step-function delta su DVOL.
Vol bassa (50) delta 0.15 (più premio), vol media (70)
0.12 (default), vol alta (90) 0.10 (più safety distance).
"""
from cerbero_bite.config.schema import (
DeltaByDvolBand,
ShortStrikeSpec,
StructureConfig,
)
bands = [
DeltaByDvolBand(
dvol_under=Decimal("50"),
delta_target=Decimal("0.15"),
delta_min=Decimal("0.13"),
delta_max=Decimal("0.17"),
),
DeltaByDvolBand(
dvol_under=Decimal("70"),
delta_target=Decimal("0.12"),
delta_min=Decimal("0.10"),
delta_max=Decimal("0.15"),
),
DeltaByDvolBand(
dvol_under=Decimal("90"),
delta_target=Decimal("0.10"),
delta_min=Decimal("0.08"),
delta_max=Decimal("0.12"),
),
]
new_short = ShortStrikeSpec(
**{**cfg.structure.short_strike.model_dump(), "delta_by_dvol": bands}
)
return cfg.model_copy(
update={
"structure": StructureConfig(
**{**cfg.structure.model_dump(exclude={"short_strike"}),
"short_strike": new_short}
)
}
)
def _bull_put_chain_wide(now_dt: datetime) -> list[OptionQuote]:
"""Chain con shorts e longs per delta 0.10, 0.12, 0.15.
I mid sono tarati per superare il credit/width 30% per ogni
accoppiamento shortlong testato (vedi commento §3.4).
"""
return [
# Shorts a delta 0.10 / 0.12 / 0.15 in OTM range [15-25%].
_quote(strike="2535", delta="-0.15", mid="0.026", now_dt=now_dt),
_quote(strike="2475", delta="-0.12", mid="0.020", now_dt=now_dt),
_quote(strike="2400", delta="-0.10", mid="0.015", now_dt=now_dt),
# Long candidati ~4% sotto ciascuno short.
_quote(strike="2415", delta="-0.10", mid="0.012", now_dt=now_dt),
_quote(strike="2355", delta="-0.08", mid="0.006", now_dt=now_dt),
_quote(strike="2280", delta="-0.06", mid="0.002", now_dt=now_dt),
]
def test_dynamic_delta_low_dvol_picks_higher_delta(
cfg: StrategyConfig, now: datetime
) -> None:
"""DVOL=40 → banda con delta_target=0.15."""
cfg_dyn = _cfg_with_delta_bands(cfg)
chain = _bull_put_chain_wide(now)
res = select_strikes(
chain=chain,
bias="bull_put",
spot=Decimal("3000"),
now=now,
cfg=cfg_dyn,
dvol_now=Decimal("40"),
)
assert res is not None
short, _ = res
assert short.delta == Decimal("-0.15")
def test_dynamic_delta_mid_dvol_picks_default_delta(
cfg: StrategyConfig, now: datetime
) -> None:
"""DVOL=60 → banda con delta_target=0.12."""
cfg_dyn = _cfg_with_delta_bands(cfg)
chain = _bull_put_chain_wide(now)
res = select_strikes(
chain=chain,
bias="bull_put",
spot=Decimal("3000"),
now=now,
cfg=cfg_dyn,
dvol_now=Decimal("60"),
)
assert res is not None
short, _ = res
assert short.delta == Decimal("-0.12")
def test_dynamic_delta_high_dvol_picks_lower_delta(
cfg: StrategyConfig, now: datetime
) -> None:
"""DVOL=85 → banda con delta_target=0.10 (più safety distance)."""
cfg_dyn = _cfg_with_delta_bands(cfg)
chain = _bull_put_chain_wide(now)
res = select_strikes(
chain=chain,
bias="bull_put",
spot=Decimal("3000"),
now=now,
cfg=cfg_dyn,
dvol_now=Decimal("85"),
)
assert res is not None
short, _ = res
assert short.delta == Decimal("-0.10")
def test_dynamic_delta_disabled_default_uses_static_delta(
cfg: StrategyConfig, now: datetime
) -> None:
"""delta_by_dvol vuoto (default) → comportamento invariato."""
chain = _bull_put_chain_wide(now)
res = select_strikes(
chain=chain,
bias="bull_put",
spot=Decimal("3000"),
now=now,
cfg=cfg, # golden config: delta_by_dvol=[]
dvol_now=Decimal("40"),
)
assert res is not None
short, _ = res
# Delta target statico = 0.12, quindi torna lo strike a -0.12.
assert short.delta == Decimal("-0.12")
+1 -1
View File
@@ -68,7 +68,7 @@ def test_compute_hash_is_independent_of_recorded_hash_value(tmp_path: Path) -> N
def test_load_repo_strategy_yaml(tmp_path: Path) -> None: def test_load_repo_strategy_yaml(tmp_path: Path) -> None:
"""The committed strategy.yaml validates with the recorded hash.""" """The committed strategy.yaml validates with the recorded hash."""
result = load_strategy(REPO_ROOT / "strategy.yaml") result = load_strategy(REPO_ROOT / "strategy.yaml")
assert result.config.config_version == "1.0.0" assert result.config.config_version == "1.3.0"
assert result.config.sizing.kelly_fraction == Decimal("0.13") assert result.config.sizing.kelly_fraction == Decimal("0.13")
assert result.computed_hash == result.config.config_hash assert result.computed_hash == result.config.config_hash
+56
View File
@@ -194,6 +194,62 @@ def test_dealer_gamma_filter_disabled_in_config(cfg: StrategyConfig) -> None:
assert decision.accepted is True assert decision.accepted is True
# ---------------------------------------------------------------------------
# IV richness gate (§2.9)
# ---------------------------------------------------------------------------
def _strict_iv_rv_cfg(
cfg: StrategyConfig, *, threshold: Decimal = Decimal("5")
) -> StrategyConfig:
return golden_config(
entry=EntryConfig(
**{
**cfg.entry.model_dump(),
"iv_minus_rv_filter_enabled": True,
"iv_minus_rv_min": threshold,
}
)
)
def test_iv_richness_gate_disabled_by_default_lets_thin_premium_pass(
cfg: StrategyConfig,
) -> None:
# Default config: filter disabled. Anche con IV-RV negativa (RV>IV)
# l'entry deve passare per non rompere setup pre-calibrazione.
decision = validate_entry(_good_ctx(iv_minus_rv=Decimal("-2")), cfg)
assert decision.accepted is True
def test_iv_richness_gate_blocks_when_below_floor(cfg: StrategyConfig) -> None:
strict = _strict_iv_rv_cfg(cfg, threshold=Decimal("5"))
decision = validate_entry(_good_ctx(iv_minus_rv=Decimal("3")), strict)
assert decision.accepted is False
assert any("IV richness" in r for r in decision.reasons)
def test_iv_richness_gate_passes_when_above_floor(cfg: StrategyConfig) -> None:
strict = _strict_iv_rv_cfg(cfg, threshold=Decimal("5"))
decision = validate_entry(_good_ctx(iv_minus_rv=Decimal("6")), strict)
assert decision.accepted is True
def test_iv_richness_gate_passes_at_exact_threshold(cfg: StrategyConfig) -> None:
# Soglia inclusiva: IV-RV == soglia → accettato (gate è "<", non "<=").
strict = _strict_iv_rv_cfg(cfg, threshold=Decimal("5"))
decision = validate_entry(_good_ctx(iv_minus_rv=Decimal("5")), strict)
assert decision.accepted is True
def test_iv_richness_gate_skipped_when_data_missing(cfg: StrategyConfig) -> None:
# MCP irraggiungibile: best-effort skip, non bloccare l'entry per
# un problema di infrastruttura.
strict = _strict_iv_rv_cfg(cfg, threshold=Decimal("5"))
decision = validate_entry(_good_ctx(iv_minus_rv=None), strict)
assert decision.accepted is True
def test_validate_entry_accumulates_all_reasons(cfg: StrategyConfig) -> None: def test_validate_entry_accumulates_all_reasons(cfg: StrategyConfig) -> None:
decision = validate_entry( decision = validate_entry(
_good_ctx( _good_ctx(
+88
View File
@@ -271,3 +271,91 @@ def test_iron_condor_adverse_move_either_direction(cfg: StrategyConfig) -> None:
) )
res = evaluate(snap, cfg) res = evaluate(snap, cfg)
assert res.action == "CLOSE_AVERSE" assert res.action == "CLOSE_AVERSE"
# ---------------------------------------------------------------------------
# §7-bis (D): vol-collapse harvest
# ---------------------------------------------------------------------------
def _harvest_cfg(
cfg: StrategyConfig, *, threshold: str = "15"
) -> StrategyConfig:
"""Clona la golden config con la soglia di vol-harvest abilitata."""
from cerbero_bite.config import ExitConfig
return cfg.model_copy(
update={
"exit": ExitConfig(
**{
**cfg.exit.model_dump(),
"vol_harvest_dvol_decrease": Decimal(threshold),
}
)
}
)
def test_vol_harvest_disabled_by_default_does_not_fire(cfg: StrategyConfig) -> None:
# Default: vol_harvest_dvol_decrease = 0 ⇒ filtro disabilitato.
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.022", # in profit (debit < credit)
dvol_at_entry="60",
dvol_now="40", # crollato di 20 punti
)
res = evaluate(snap, cfg)
assert res.action == "HOLD"
def test_vol_harvest_fires_when_dvol_collapsed_in_profit(
cfg: StrategyConfig,
) -> None:
harvest = _harvest_cfg(cfg, threshold="15")
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.022", # in profit ma sopra profit_take 50%
dvol_at_entry="60",
dvol_now="42", # 18, supera la soglia 15
)
res = evaluate(snap, harvest)
assert res.action == "CLOSE_VOL_HARVEST"
assert "harvest" in res.reason
def test_vol_harvest_does_not_fire_when_in_loss(cfg: StrategyConfig) -> None:
# Anche se DVOL crolla, se siamo in perdita non vogliamo harvest:
# è una funzione di "esci con il profitto in mano", non un panico.
harvest = _harvest_cfg(cfg, threshold="15")
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.040", # debit > credit ⇒ in perdita
dvol_at_entry="60",
dvol_now="42",
)
res = evaluate(snap, harvest)
assert res.action != "CLOSE_VOL_HARVEST"
def test_vol_harvest_does_not_fire_below_threshold(cfg: StrategyConfig) -> None:
harvest = _harvest_cfg(cfg, threshold="15")
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.022",
dvol_at_entry="60",
dvol_now="50", # 10, sotto la soglia 15
)
res = evaluate(snap, harvest)
assert res.action == "HOLD"
def test_profit_take_wins_over_vol_harvest(cfg: StrategyConfig) -> None:
# Quando il profit-take è già colpito, non passiamo per vol-harvest.
harvest = _harvest_cfg(cfg, threshold="15")
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.014", # ≤ 50% credit ⇒ profit-take
dvol_at_entry="60",
dvol_now="42",
)
res = evaluate(snap, harvest)
assert res.action == "CLOSE_PROFIT"
@@ -0,0 +1,134 @@
"""TDD per :mod:`cerbero_bite.runtime.option_chain_snapshot_cycle`."""
from __future__ import annotations
from datetime import UTC, datetime
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
import pytest
from cerbero_bite.clients.deribit import InstrumentMeta
from cerbero_bite.runtime.option_chain_snapshot_cycle import (
collect_option_chain_snapshot,
)
from cerbero_bite.state.models import OptionChainQuoteRecord
_NOW = datetime(2026, 5, 4, 13, 55, tzinfo=UTC)
def _meta(name: str, strike: int, expiry_dte: int = 18) -> InstrumentMeta:
expiry = _NOW.replace(hour=8, minute=0, second=0)
expiry = expiry.replace(day=expiry.day) + (
# add days
__import__("datetime").timedelta(days=expiry_dte)
)
return InstrumentMeta(
name=name,
strike=Decimal(str(strike)),
expiry=expiry,
option_type="P",
open_interest=Decimal("100"),
tick_size=Decimal("0.0005"),
min_trade_amount=Decimal("1"),
)
def _ticker(name: str, *, mark: float = 0.020, bid: float = 0.018,
ask: float = 0.022, delta: float = -0.12) -> dict:
return {
"instrument_name": name,
"bid": bid,
"ask": ask,
"mark_price": mark,
"mark_iv": 60.0,
"volume_24h": 50,
"greeks": {
"delta": delta,
"gamma": 0.001,
"theta": -0.0005,
"vega": 0.10,
},
}
@pytest.fixture
def cfg() -> object:
from cerbero_bite.config import golden_config
return golden_config()
@pytest.fixture
def fake_ctx(cfg: object) -> MagicMock:
"""Mock minimal RuntimeContext."""
ctx = MagicMock()
ctx.cfg = cfg
ctx.db_path = ":memory:"
return ctx
@pytest.mark.asyncio
async def test_collector_persists_one_quote_per_instrument(
fake_ctx: MagicMock,
) -> None:
metas = [_meta("ETH-21MAY26-2475-P", 2475), _meta("ETH-21MAY26-2400-P", 2400)]
fake_ctx.deribit.options_chain = AsyncMock(return_value=metas)
fake_ctx.deribit.get_tickers = AsyncMock(
return_value=[_ticker(m.name) for m in metas]
)
persisted: list[list[OptionChainQuoteRecord]] = []
def _record(_conn: object, qs: list[OptionChainQuoteRecord]) -> int:
persisted.append(qs)
return len(qs)
fake_ctx.repository.record_option_chain_snapshot = _record
n = await collect_option_chain_snapshot(fake_ctx, asset="ETH", now=_NOW)
assert n == 2
assert len(persisted) == 1
assert {q.instrument_name for q in persisted[0]} == {
"ETH-21MAY26-2475-P", "ETH-21MAY26-2400-P",
}
# Tutti i quote condividono il timestamp del cron tick.
assert all(q.timestamp == _NOW for q in persisted[0])
@pytest.mark.asyncio
async def test_collector_handles_missing_tickers_with_null_fields(
fake_ctx: MagicMock,
) -> None:
metas = [_meta("ETH-21MAY26-2475-P", 2475)]
fake_ctx.deribit.options_chain = AsyncMock(return_value=metas)
fake_ctx.deribit.get_tickers = AsyncMock(return_value=[]) # vuoto
persisted: list[list[OptionChainQuoteRecord]] = []
def _record(_conn: object, qs: list[OptionChainQuoteRecord]) -> int:
persisted.append(qs)
return len(qs)
fake_ctx.repository.record_option_chain_snapshot = _record
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 1
assert persisted[0][0].mid is None # ticker mancante ⇒ campi NULL
assert persisted[0][0].instrument_name == "ETH-21MAY26-2475-P"
@pytest.mark.asyncio
async def test_collector_returns_zero_when_chain_empty(
fake_ctx: MagicMock,
) -> None:
fake_ctx.deribit.options_chain = AsyncMock(return_value=[])
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 0
@pytest.mark.asyncio
async def test_collector_swallows_chain_fetch_failure(
fake_ctx: MagicMock,
) -> None:
fake_ctx.deribit.options_chain = AsyncMock(side_effect=RuntimeError("boom"))
n = await collect_option_chain_snapshot(fake_ctx, now=_NOW)
assert n == 0 # best-effort: non rilancia