33 Commits

Author SHA1 Message Date
root b1836d91c2 refactor(core): IV-RV adattivo distinct-days policy + backfill Deribit
Sblocca il warmup hard del gate IV-RV adattivo (~21 giorni residui)
permettendo di mischiare cadenze diverse (tick live 15min + backfill
giornaliero) senza assumere il fattore costante 96 tick/giorno.

API change (no backwards-compat shims):
* compute_adaptive_threshold(history, *, n_days, percentile,
  absolute_floor): rimossi `min_days`/`target_days`. La selezione
  finestra (target_days/min_days/intera storia) si sposta al caller.
  Warmup hard quando `n_days == 0`.
* repository: rimosso `iv_rv_history`; aggiunti
  `count_iv_rv_distinct_days` (COUNT DISTINCT substr(ts,1,10)) e
  `iv_rv_values_for_window`.
* EntryContext aggiunge `iv_rv_n_days: int = 0`. entry_cycle calcola
  n_days, sceglie window_days e popola il context. Audit
  `iv_rv_n_days` reale (non più len/96).
* GUI Calibrazione: counter giorni distinti tramite set di date.
* Spec aggiornata con errata 2026-05-10 e nuova warmup table.

Backfill (scripts/backfill_iv_rv.py, stdlib-only):
* Fetch DVOL daily + ETH/BTC-PERPETUAL closes da Deribit public REST.
* Calcolo RV30d annualizzato (stdev log-return × √365 × 100).
* INSERT OR REPLACE in market_snapshots con timestamp 12:00 UTC e
  fetch_errors_json='{"backfill":true}' per distinzione audit.
* Compute layer testato (9 test): RV su prezzi costanti/monotoni/
  alternati, build_records con cutoff e missing data.

Verifica live post-deploy (10 mag 2026 08:50 UTC):
* ETH: n_days=46, P25=2.21 vol pt, IV-RV=10.05 → gate PASS
* BTC: n_days=46, P25=5.69 vol pt, IV-RV=8.60  → gate PASS

509 test passati (500 esistenti + 9 backfill), ruff pulito.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 08:52:05 +00:00
root 6f4f2ce02e feat(runtime): audit log include threshold rolling e window usata
Risponde al final review (spec §6.4): il decisions log ora
contiene iv_rv_threshold_used (la soglia P_q effettivamente
applicata) e iv_rv_window_used_days (giorni di history nella
finestra). Permette ricostruire ex-post perché un trade è stato
saltato e con quali numeri.

Helper privi di I/O — la soglia viene ricomputata in base alla
history già caricata, costo trascurabile.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:27:40 +00:00
root d2ff29fde3 fix(config): aggressiva config_hash ricalcolato post adaptive gate
L'hash dichiarato non rispecchiava più il contenuto del file dopo
l'attivazione del gate adattivo (commit 080acf8). Senza questo fix
il loader sollevava ConfigHashError e l'orchestrator rifiutava il
profilo Aggressiva al boot.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:26:24 +00:00
root eb0662e44d chore(lint): adaptive_threshold imports + entry_validator top-level import
- Sequence importato da collections.abc invece di typing (PYI001).
- compute_adaptive_threshold spostato a top-level (PLC0415):
  niente circular dep risk perché adaptive_threshold non importa
  da entry_validator.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:20:54 +00:00
root 64f4d4e09e feat(gui): pannello informativo Gate IV-RV adattivo in Calibrazione
Mostra status (warmup/attivo), soglia P25 rolling corrente, IV-RV
ultimo tick, floor assoluto, decisione hypothetical e sezione
Vol-of-Vol guard. Read-only: i percentili statici esistenti
restano per analisi.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:14:34 +00:00
root 080acf829d feat(config): profilo Aggressiva attiva gate IV-RV adattivo + VoV
P25 rolling 60g, warmup a finestra disponibile, VoV guard 5pt.
Conservativa e golden invariati (default disabled).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:11:46 +00:00
root 98111814d2 test(integration): IV-RV adaptive gate end-to-end con SQLite reale
Verifica integrazione tra Repository.iv_rv_history,
compute_adaptive_threshold e dvol_lookback su un DB reale
seedato con 30 giorni di market_snapshots bimodale.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:09:27 +00:00
root 3190764f64 feat(runtime): entry_cycle popola iv_rv_history e dvol_24h_ago
Quando i flag adaptive_enabled / vol_of_vol_guard_enabled sono
attivi, entry_cycle carica history e lookback dal repository
prima di costruire EntryContext. Il decisions log riceve i meta
n_history e dvol_24h_ago per audit ex-post.

Quando i flag sono off, niente query DB extra (zero overhead).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:04:19 +00:00
root 8221aba10f fix(state): repository iv_rv_history time-stable + input validation
Risponde alla code review di 395191e:
- iv_rv_history accetta as_of (default now UTC) invece di
  affidarsi al clock SQLite, rendendo i test time-stable.
- Valida max_days > 0 e raise se as_of/reference sono naive.
- Aggiunge 3 test sulle nuove guard.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:00:06 +00:00
root 395191ea13 feat(state): Repository.iv_rv_history + dvol_lookback per gate adaptive
Due nuovi metodi che leggono market_snapshots filtrando NULL e
fetch_ok=0. iv_rv_history limita a max_days; dvol_lookback trova
il tick più vicino a un istante con tolerance configurabile.

Tests: ordered ASC, asset filter, NULL skip, fetch_ok=0 skip,
lookback closest, gap returns None.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 22:53:19 +00:00
root d36cdff609 feat(core): Vol-of-Vol guard in validate_entry + tests
Blocca entry se |DVOL_now - DVOL_24h_ago| >= threshold (default
5 pt). Fail-open quando dvol_24h_ago è None (gap dati). Independente
dal gate IV-RV: i due gate sono additivi.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 22:36:59 +00:00
root 3a5cf2554b feat(core): IV-RV adaptive gate in validate_entry + tests
Quando iv_minus_rv_adaptive_enabled=True, la soglia diventa
max(P_q rolling, iv_minus_rv_min). Path legacy (statico) e
None-bypass restano invariati.

Aggiunge anche due model_validator a StrategyConfig per
fail-fast su config invalida (window_min_days < target_days,
percentile in (0,1)) — risponde alla code review T1.

Tests: pass/skip su rolling, warmup hard, floor binding,
backwards compat statico, None bypass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 22:29:48 +00:00
root ef3c512684 feat(core): EntryContext aggiunge iv_rv_history e dvol_24h_ago
Campi opzionali con default vuoto/None per non rompere i caller
esistenti. Saranno popolati da entry_cycle quando i flag
adaptive_enabled / vol_of_vol_guard_enabled sono True.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 22:24:43 +00:00
root 6eff8aab0f fix(core): adaptive_threshold input validation + boundary tests
Risponde alla code review di 7dc2fda:
- Valida percentile in [0,1] e 0 < min_days < target_days, raise
  ValueError quando out-of-range. Fail-fast invece di IndexError o
  silent wrong result.
- Aggiunge test boundary esattamente a min_days*96 e target_days*96
  (spec §9.1 item 9 era mancante).
- Aggiunge 4 test sulle nuove guards.
- Fix typo docstring (Determinismic → Deterministic).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 22:11:17 +00:00
root 7dc2fda524 feat(core): compute_adaptive_threshold pure function + tests
Implementa il calcolo del percentile rolling con warmup,
transizione min_days → target_days e floor assoluto. Pure
function senza I/O: il caller passa la sequenza pre-filtrata
(NULL e fetch_ok=0 esclusi).

Tests: warmup, transizione finestra, floor, percentili.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 21:36:50 +00:00
root 0fcfff7d7e feat(config): EntryConfig campi adaptive IV-RV gate + VoV guard
Aggiunge i flag e i parametri per il gate IV-RV adattivo (P25
rolling) e per il Vol-of-Vol guard. Default disabilitati per
non cambiare comportamento dei profili attuali.

Vedi docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 20:10:07 +00:00
root f889258952 docs(plans): IV-RV adaptive gate implementation plan
Piano TDD bite-sized in 11 task con steps dettagliati, codice
completo, comandi e expected output. Coverage completa dello
spec 2026-05-08-iv-rv-adaptive-gate-design.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 19:58:32 +00:00
root 2a4a82c8ef docs(specs): IV-RV adaptive gate design
Spec del gate IV-RV adattivo (P25 rolling 60g + Vol-of-Vol guard 5pt
24h) — riprende roadmap §4-quater di 13-strategia-spiegata.md punti
1 e 2 e li promuove a design pronto per implementazione.

Decisioni emerse dal brainstorming:
- Hybrid (percentile rolling + VoV guard), non regime detection
- Window target 60g, min 30g, sotto usa storia disponibile (warmup)
- Floor assoluto via vecchio iv_minus_rv_min (backwards compat)
- Inline nel validator, stateless, no DB cache
- GUI Calibrazione: pannello informativo, slider esistenti invariati
- Fail-open su tutti i casi di dato mancante

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 19:50:56 +00:00
root 467c8952e3 docs(migrations): 0005 — commento allineato a cadenza */15 reale
Il commento dichiarava cron settimanale (55 13 * * MON) ma lo
scheduler reale (orchestrator._CRON_OPTION_CHAIN_SNAPSHOT) è */15
24/7, allineato a market_snapshot. Aggiornato per evitare confusione
nei lettori futuri. Anche fixato l'header file (0004 → 0005).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 16:37:31 +00:00
root 3aaa059417 fix(deribit): DVOL midnight — finestra 1D estesa a ieri+oggi
Alle 00:00 UTC Deribit non ha ancora costruito il candle 1D di oggi:
con start_date=oggi la response è vuota e il client tirava
McpDataAnomalyError ('neither latest nor candles'). Includendo ieri
nello start_date, candles[-1] resta valido come fallback.

Verificato sui dati raccolti: 3 fail consecutivi 2026-05-02/03/04 a
00:00 UTC su ETH, zero fail dal 2026-05-05 in poi (container
rebuildato in mezzo al periodo).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 16:37:23 +00:00
root a2e7a78f8a feat(data): mirror ETH spot+DVOL in dvol_history dal market_snapshot
Popola dvol_history dentro la stessa transazione di market_snapshots,
così lo storico è disponibile anche in modalità data-only (STRATEGY=false).
Evita il warm-up vuoto di return_4h quando si abilita la strategia: il
monitor_cycle trova subito i campioni locali invece di dipendere dal
fallback Deribit get_historical.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 21:59:27 +00:00
root 6ff021fbf4 feat(strategy): abbandono gating settimanale — entry daily 24/7
Crypto opera 24/7: la cadenza settimanale lunedì-only era un retaggio
TradFi senza giustificazione. La nuova cadenza è giornaliera (cron
0 14 * * *), con i gate quantitativi a decidere se entrare o saltare.

Cambiamenti principali:

* runtime/orchestrator.py — _CRON_ENTRY 0 14 * * * (era MON)
* runtime/auto_pause.py — pause_until(days=) (era weeks=); minimo
  clamp 1 giorno (era 1 settimana)
* core/backtest.py — MondayPick→DailyPick, monday_picks→daily_picks
  (1 pick per calendar-day all'ora target); Sharpe annualization su
  ~120 trade/anno (era 52)
* config/schema.py — default cron daily; max_concurrent_positions 1→5;
  AutoPauseConfig.pause_weeks→pause_days, default 14
* runtime/option_chain_snapshot_cycle.py + orchestrator — cron */15
  per accumulo continuo dataset di backtest empirico

Strategy yamls (config_version 1.3.0 → 1.4.0, hash rigenerati):

* strategy.yaml — max_concurrent 1→5, cap_aggregate coerente
* strategy.aggressiva.yaml — max_concurrent 2→8, cap_aggregate
  3200→6400, max_contracts_per_trade invariato a 16
* strategy.conservativa.yaml — max_concurrent 1→3
* tutti — pause_weeks→pause_days: 14

GUI (pages/7_📚_Strategia.py):

* slider Trade/anno: range 20-200 (era 8-30), default 110, help
  riallineato sulla math 365 candidature × pass-rate 30-40%
* card profili: versione letta dinamicamente da config_version invece
  che hard-coded "v1.2.0"
* warning "entrambi perdono soldi" ora valuta i P/L effettivi
  (cons['annual_pl'], aggr['annual_pl']) invece del win_rate grezzo;
  aggiunto stato intermedio quando solo conservativo è in perdita

Tests (450/450 passati):

* test_auto_pause: pause_days, clamp ≥1 giorno
* test_backtest: rinomina + ridisegno daily picks (assert su
  calendar-day dedupe e hour filter)
* test_sizing_engine: other_open_positions=5 per cap default
* test_config_loader: version 1.4.0

Docs (README + 9 file in docs/) — tutti i riferimenti weekly/lunedì
allineati a daily/24-7, volume option_chain ricalcolato per cron
*/15 (~1.1 MB/giorno, ~400 MB/anno).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 16:21:16 +00:00
root dabcc8d15b docs: aggiornamento Phase 5 — IV-RV gate, F+D+A, backtest, option chain
- 01-strategy-rules.md:
  * §2.8 (filtri quant: dealer gamma + liquidation risk)
  * §2.9 (IV richness gate, opt-in, default disabled)
  * §3.2 — variante delta_by_dvol step-function
  * §7-bis.1 (vol-collapse harvest D)
  * §7-bis.2 (graduated profit-take C — scaffolding)
  * §7-bis.3 (auto-pause su drawdown F)

- 05-data-model.md:
  * `system_state.auto_pause_until / _reason` (migration 0004)
  * Nuova tabella `option_chain_snapshots` (migration 0005)
  * Tabella migrations completa (1→5)

- 13-strategia-spiegata.md:
  * §4-quinquies — catena opzioni storica (Phase 5):
    cosa raccoglie, cosa sblocca, CLI `option-chain
    trigger|analyze`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 21:29:00 +00:00
root 7fdd8b47a5 fix(gui): percentili Calibrazione in riga compatta, no truncation
I valori percentili (es. -0.0298, 0.05323) renderizzati come
``st.metric`` su 7 colonne venivano tagliati su viewport stretti:
ogni metric ha label sopra, font fisso, nessun shrink. Sostituito
con render markdown inline a font 0.85rem, single-line, scrollabile
orizzontalmente se serve. Tutti e 7 i percentili visibili senza
troncamento e senza wrap.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 21:21:32 +00:00
root a1a9f74ed2 Merge feat/option-chain-snapshots 2026-05-01 21:08:28 +00:00
root a9df399db4 Merge feat/backtest-engine 2026-05-01 21:08:22 +00:00
root e06f4d5c96 Merge feat/strategy-improvements-fdac
# Conflicts:
#	src/cerbero_bite/gui/pages/7_📚_Strategia.py
#	strategy.aggressiva.yaml
#	strategy.conservativa.yaml
#	strategy.yaml
#	tests/unit/test_config_loader.py
2026-05-01 21:08:12 +00:00
root f24511fcad Merge feat/iv-rv-hard-gate 2026-05-01 21:06:32 +00:00
root f664ea1a15 feat(backtest): stylized engine over market_snapshots + CLI subcommand
Aggiunge `core/backtest.py`, motore di backtesting stilizzato che gira
sui dati raccolti in `market_snapshots`. Risponde alla domanda:
"se questa config fosse stata attiva nelle ultime N settimane, quanti
lunedì avrebbero superato i filtri e quale sarebbe stato il P/L stimato?"

**Architettura a due strati**:

1. **Filtri di entry — RIGOROSO**: per ogni Monday-14:00-UTC nei
   snapshot ricostruisce `EntryContext` e chiama lo stesso
   `validate_entry()` del live. Output esatto di "cosa avrebbe deciso
   il bot" per ogni settimana, con conteggio dei motivi di skip.

2. **P/L per trade accettato — STILIZZATO**: senza catena opzioni
   storica, stima credito/exit via Black-Scholes con skew premium
   (default 1.5×) per approssimare la vol smile dell'ETH. Re-prezza
   il combo ad ogni tick futuro per simulare i trigger §7
   (profit_take, stop_loss, vol_stop, time_stop, expiry).

**Aggregati nel `BacktestReport`**:
- n_picks / n_accepted / n_skipped_data / n_completed / n_winners
- win_rate, P/L cumulato (USD + % su capitale)
- max drawdown (USD + % di peak)
- Sharpe annualizzato (52 settimane)
- skip_reasons: dict{motivo → settimane bloccate}

**CLI**: nuovo `cerbero-bite backtest --strategy F --from D --to D
--capital N --asset ETH`. Stampa Rich-formatted summary + tabella
motivi di skip. Esempio:

    cerbero-bite backtest \
      --strategy strategy.aggressiva.yaml \
      --from 2026-04-01 --to 2026-05-01 \
      --capital 10000

**Limiti dichiarati**:
- BS + skew_premium ≠ catena reale: i numeri P/L sono **stime ex-post
  per ranking config**, non promesse operative. Buono per dire
  "config A batte config B sui dati reali", non per dimensionare
  capitale.
- skew_premium 1.5× è stato calibrato sui dati Deribit storici
  (smile slope ETH options); va rifinito quando avremo abbastanza
  chain history da farlo empiricamente.

**Tests**: 15 unit test (BS math, monday picks, filter sim,
position outcome simulation, full pipeline su sintetico).
Suite totale: 420 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:31:54 +00:00
root 18cc27a76e feat(gui): simulazione P/L con effetti dei miglioramenti FDAC + IV-RV
Estende il pannello "💰 P/L atteso" della pagina `📚 Strategia` per
applicare gli effetti stimati di IV-RV gate, A (delta dinamico),
D (vol-harvest) e F (auto-pause) leggendoli direttamente dai
`strategy.*.yaml` di ciascun profilo.

- Nuova `_detect_features(strategy)` che ispeziona la config:
    A → `short_strike.delta_by_dvol` non vuoto
    D → `exit.vol_harvest_dvol_decrease > 0`
    F → `auto_pause.enabled`
    IV → `entry.iv_minus_rv_filter_enabled`
- `_compute_pl` accetta ora un dict `features` opzionale e applica:
    IV: +5 pp win-rate, −25% trade/anno (skip-week aggressivo)
    A: +1.5 pp win-rate, sl_loss × 0.95 (strike picking migliore)
    D: 5% trade convertiti da loss a harvest exit (+0.20×credito)
    F: −8% trade/anno (skip-week dopo streak)
- `_render_profile_card` mostra ora:
    badge "🟢 Miglioramenti attivi" con la lista per profilo,
    delta vs base in E[trade] e P/L annuo,
    help con win_rate effettivo / prob_loss / trade/anno.
- Checkbox "Applica effetti dei miglioramenti" (default ON) per
  switchare tra simulazione realistica e formula base.
- Nuova mini-tabella "Contributo marginale di ogni feature": per
  ogni miglioramento mostra ΔP/L annuo e ΔAPR isolando l'effetto
  del singolo feature, con marker " attiva nel YAML".
- Sensibilità win-rate ora applica le feature attive ai due profili.

Effetti dichiarati come **stime ex-ante** dalla letteratura
short-vol systematic; i valori puntuali (+5 pp win, etc.) andranno
calibrati sul dataset accumulato.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:17:24 +00:00
root 1c6baaee83 feat(strategy): F+D+A miglioramenti — auto-pause, vol-harvest, delta dinamico
Implementa tre miglioramenti dalla roadmap di "📚 Strategia" + scaffolding del quarto.
Tutti retro-compatibili: i defaults della golden config disabilitano le nuove funzioni
così il comportamento attuale resta invariato finché l'operatore non le accende
esplicitamente in `strategy.yaml`. Il profilo `strategy.aggressiva.yaml` opta-in
agli incrementi più impattanti.

**F — Auto-pause su drawdown rolling (§7-bis)**

Circuit breaker sopra il kill-switch tecnico. Quando le ultime N posizioni
chiuse hanno cumulato perdite oltre `max_drawdown_pct × capitale_attuale`,
l'engine si auto-mette in pausa per `pause_weeks` settimane. Difende dai
regime change non rilevati dai filtri quant — se i filtri stanno fallendo
sistematicamente, fermarsi è meglio che continuare a sanguinare.

- `AutoPauseConfig` + `cfg.auto_pause` (top-level, default disabled).
- Migrazione SQL `0004_auto_pause.sql`: `system_state.auto_pause_until`
  e `auto_pause_reason` (NULL = engine attivo).
- Nuovo modulo puro `runtime/auto_pause.py` con `is_paused()` (gate I/O-free)
  e `evaluate_drawdown_breach()` (decide se armare).
- `entry_cycle` consulta `is_paused` subito dopo il kill-switch e arma
  la pausa dopo aver calcolato il capitale; nuovo status `_STATUS_AUTO_PAUSED`.
- Repository: `set_auto_pause`, `recent_closed_position_pnls_usd`.
- 12 test unitari: gate filter on/off, lookback insufficiente, soglia
  esatta, capitale non valido, transizioni paused → not-paused.

**D — Vol-collapse harvest (§7-bis)**

Exit opportunistica: quando DVOL è scesa di tot punti rispetto all'entry
e siamo in profit, esce subito. Edge IV-RV catturato, non c'è motivo di
tenere fino al profit-take. Nuovo `ExitAction = "CLOSE_VOL_HARVEST"`,
gate `exit.vol_harvest_dvol_decrease` (default 0 = off). 5 test unitari.

**A — Delta target dinamico per regime DVOL (§3.2)**

Strike short adattivo alla volatilità: a DVOL bassa il margine OTM è
generoso ⇒ posso prendere più premio (delta 0.15); a DVOL alta voglio
più safety distance (delta 0.10). Nuovo `DeltaByDvolBand` (step
function); quando `delta_by_dvol` è popolato, `_select_short` legge
la prima banda ascending con `dvol_now ≤ dvol_under`. Default vuoto =
comportamento invariato. `select_strikes` accetta nuovo kwarg
`dvol_now`, propagato da `entry_cycle`. 4 test unitari.

**C — Scaffolding profit-take graduale (§7.1bis)**

Schema in place ma runtime non ancora wirato. Aggiunge `PartialProfitLevel`
e `exit.profit_take_partial_levels` (default vuoto). Nuovo
`ExitAction = "CLOSE_PROFIT_PARTIAL"` nella Literal. La pipeline di
chiusure parziali nel runtime (entry_cycle / repository / clients)
richiede refactor del position model — lasciato come TODO per un PR
dedicato. La schema è pronta a recepire la config futura senza altri
breaking change.

**Profili aggiornati**

- `strategy.yaml` (golden, 1.2.0): tutto disabilitato by default.
- `strategy.conservativa.yaml` (1.2.0-cons): identico al golden.
- `strategy.aggressiva.yaml` (1.2.0-aggr): A+D+F enabled
  (delta_by_dvol 0.15/0.12/0.10, vol_harvest a 15 pt vol,
  auto_pause @ 15% DD su 5 trade, 2 settimane pausa).

Bump versioni 1.1.0 → 1.2.0, hash ricalcolati, test pinning aggiornato.

Suite: 426 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:07:25 +00:00
root c4cd2986a4 feat(gui): aggiunge max drawdown atteso (P99) e tail/gap nei profili
Due metriche per ciascun profilo nel pannello P/L:

- **Max DD attesa (P99)**: streak di stop consecutivi con probabilità
  ≤ 1% nell'anno (union-bound: N_trade × p_loss^N ≤ 0.01) ×
  perdita stop × contratti × posizioni concorrenti.
- **Max DD coda (gap)**: scenario gap notturno in cui il mark salta
  oltre la copertura long PRIMA che lo stop sia eseguibile —
  perdita = larghezza intera meno credito iniziale, su tutte le
  posizioni aperte.

Aggiunge anche colonna "Max DD" nella tabella di sensibilità
win-rate, così si vede immediatamente il trade-off
APR-vs-drawdown al variare del win-rate (da 65% a 82%).

Effetto pratico: a default cap=10k, spot=3000, win=0.75, trades=18:
- Conservativa: APR ≈ +1.8%, Max DD attesa ≈ −2.2% capitale
- Aggressiva: APR ≈ +14%, Max DD attesa ≈ −30% capitale

Numeri che rendono molto più tangibile la frase "drawdown scala con
lo stesso fattore" del §4-ter del documento.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 19:50:09 +00:00
root 4ab7590745 feat(entry): IV richness gate (§2.9) + golden config bump 1.0.0 → 1.1.0
Aggiunge il filtro a maggior impatto sul win-rate atteso: l'entry
salta se la IV implicita non sta pagando un margine misurabile sopra
la realized vol. La letteratura short-vol systematic indica che
l'edge sostenibile della strategia esiste solo quando IV30g − RV30g
supera una soglia di alcuni punti vol; senza questo gate il selling
vol nudo è strutturalmente neutro a win-rate 70-72%.

Implementazione end-to-end:

- `EntryConfig`: due nuovi campi `iv_minus_rv_min` e
  `iv_minus_rv_filter_enabled`, con default `0` / `false` per non
  rompere setup pre-calibrazione.
- `validate_entry`: §2.9 hard gate che blocca l'entry se
  `iv_minus_rv < iv_minus_rv_min` (skip silenzioso quando il dato è
  `None`, coerente con il pattern §2.8 dei filtri quant).
- `entry_cycle._gather_snapshot`: nuovo `_safe_iv_minus_rv` che
  legge `deribit.realized_vol("ETH")["iv_minus_rv_30d"]` in
  best-effort e lo propaga via `_MarketSnapshot.iv_minus_rv` →
  `EntryContext.iv_minus_rv` → audit `inputs.snapshot.iv_minus_rv`.
- `tests/unit/test_entry_validator.py`: 5 nuovi casi (default
  permissivo, gate sotto/sopra/uguale soglia, dato mancante).
- `tests/integration/test_entry_cycle.py`: stub `get_realized_vol`
  nel mock helper così tutti gli scenari di happy/edge path
  continuano a passare.

Configurazione di profili coerente con la disciplina:

- `strategy.yaml` (golden 1.1.0) e `strategy.conservativa.yaml`:
  gate `enabled=false, min=0`. Manteniamo i lunedì pre-calibrazione
  per accumulare dati sulla distribuzione di `iv_minus_rv`.
- `strategy.aggressiva.yaml` (1.1.0-aggressiva): gate
  `enabled=true, min=3`. Coerente con la filosofia del profilo —
  size più grande pretende win-rate più alto. La soglia 3 è
  conservativa; la documentazione raccomanda 5 dopo 4-8 settimane di
  calibrazione.

Doc + GUI:

- `docs/13-strategia-spiegata.md` §4-quater: spiega gate, parametri,
  default per profilo, effetto atteso sul P/L (trade/anno scendono
  ma E[trade] sale → APR cresce comunque), roadmap di hardening
  (soglia adattiva, vol-of-vol guard, multi-asset).
- pagina `📚 Strategia`: la riga "IV − RV" passa da informativa a
  pass/fail reale; mostra "filtro DISABILITATO (info-only)" quando
  spento, / contro la soglia di config quando acceso.

Bump versioni e hash di tutti e tre i file YAML
(`config_version: 1.1.0`, hash ricalcolato). Test pinning aggiornato
(`test_load_repo_strategy_yaml`).

Suite: 410 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 19:32:21 +00:00
49 changed files with 6606 additions and 138 deletions
+5 -2
View File
@@ -15,7 +15,10 @@ attiva, sizing Quarter Kelly e disciplina di uscita rigida.
- **Gestione attiva:** profit take 50% credito, stop loss 1.5× credito,
vol stop +10 punti DVOL, time stop 7 DTE, exit su short strike testato
(|delta| ≥ 0.30). Su ETH **non si difende rollando**: si esce.
- **Frequenza:** apertura ogni 7 giorni, una posizione alla volta.
- **Frequenza:** candidatura giornaliera (cron `0 14 * * *`, crypto è 24/7),
fino a **5 posizioni concorrenti** sul profilo principale (3 sul
conservativo, 8 sull'aggressivo). I gate quantitativi decidono se entrare;
nei giorni in cui falliscono, niente trade.
Il sistema è **deterministico**: nessun LLM partecipa al decision loop.
Le regole sono codificate, le soglie sono parametri di configurazione,
@@ -48,7 +51,7 @@ leggere in ordine per chi implementa.
| `docs/03-algorithms.md` | Specifiche dettagliate dei sette algoritmi core |
| `docs/04-mcp-integration.md` | Mappa dei tool MCP usati e contratti |
| `docs/05-data-model.md` | Schema persistenza posizioni, log, KB |
| `docs/06-operational-flow.md` | Flussi operativi: avvio, settimanale, monitoring |
| `docs/06-operational-flow.md` | Flussi operativi: avvio, entry daily, monitoring |
| `docs/07-risk-controls.md` | Kill switch, cap, dead-man, audit |
| `docs/08-testing-validation.md` | TDD, paper trading, golden tests |
| `docs/09-development-roadmap.md` | Fasi di sviluppo e milestone |
+1 -1
View File
@@ -36,7 +36,7 @@ per imparare, ma **non sta nel loop di esecuzione**.
### Cosa fa Cerbero Bite
1. Legge dati di mercato dagli MCP (Deribit, Hyperliquid, sentiment, macro).
2. Valuta condizioni di entrata su finestra temporale fissa (settimanale).
2. Valuta condizioni di entrata su finestra temporale fissa (giornaliera, 14:00 UTC; crypto è 24/7).
3. Calcola la struttura ottimale dello spread secondo le regole.
4. Verifica liquidità, cap di rischio, calendar macro.
5. Calcola sizing in contratti.
+93 -6
View File
@@ -19,10 +19,12 @@ Sorgente teorica: `Cerbero_Office/NewStrategy/strategia-credit-spread-eth.md`
## 2. Trigger di apertura (entry)
Il rule engine valuta l'apertura di un nuovo trade **una sola volta al
giorno**, alle **14:00 UTC** del lunedì (orario UE pomeridiano stabile,
fuori dai picchi di funding statunitensi). Se il lunedì è festività
italiana, l'engine ignora la regola e attende il lunedì successivo.
Il rule engine valuta l'apertura di un nuovo trade **una volta al
giorno**, alle **14:00 UTC** (orario UE pomeridiano stabile, fuori
dai picchi di funding statunitensi). Crypto è 24/7: non c'è un "giorno
buono" intrinseco, sono i gate quantitativi a decidere se entrare o
saltare. Se il giorno è festività italiana e `skip_holidays_country`
è attivo, l'engine attende il giorno successivo.
Una nuova posizione viene aperta **solo se tutte** le seguenti condizioni
sono vere:
@@ -41,8 +43,36 @@ sono vere:
patrimonio totale (correlazione direzionale già alta).
8. **Liquidità degli strike candidati** entro le soglie del §3.
### 2.8 Filtri quant (introdotti in Phase 4)
Due gate aggiuntivi che leggono i campi del `market_snapshot`:
- **Dealer net gamma > 0** (default: `dealer_gamma_min: 0`). Long-gamma
regime = dealer hedge che sopprime la vol → ideale per vendere
credit spread. Short-gamma = vol-amplifying flow, statisticamente
perdente. Disattivabile via `dealer_gamma_filter_enabled: false`.
- **Liquidation risk ≠ high** (default: `liquidation_filter_enabled:
true`). Salta entry quando il modulo sentiment flagga uno squeeze
imminente su long o short side.
### 2.9 IV richness gate (introdotto in Phase 5, opt-in)
Filtro a maggior impatto sul win-rate. **Disabilitato** di default
nella golden config — abilitato esplicitamente nel profilo
`strategy.aggressiva.yaml`:
- `iv_minus_rv_filter_enabled: true|false` — master switch.
- `iv_minus_rv_min: <pt vol>` — soglia: l'entry passa solo se la
IV implicita 30g supera la realized vol 30g di almeno tot punti.
Default 0; valori sensati 3-5 dopo calibrazione sui dati raccolti.
Razionale: il selling vol nudo è strutturalmente neutro a win-rate
70-72%. L'edge della strategia esiste solo quando il premio è
"ricco" — IV30 > RV30 + N. Vedere `13-strategia-spiegata.md §4-quater`
per il razionale completo.
Se anche **una sola** condizione fallisce → **no entry**, log con motivo,
ritento la settimana successiva.
ritento il giorno successivo.
## 3. Selezione struttura
@@ -77,6 +107,20 @@ assoluto.
| Distanza minima OTM | 15% (anche se delta è dentro tolleranza) |
| Distanza massima OTM | 25% |
**Variante dinamica per regime DVOL (Phase 5, opt-in).** Il campo
`short_strike.delta_by_dvol` (lista step-function) sostituisce il
delta target singolo con bande ordinate per `dvol_under`: a DVOL
bassa il bot prende delta più alto (più premio), a DVOL alta sceglie
delta più basso (più safety distance). Lista vuota = comportamento
classico col delta target sopra. Esempio bande nel profilo
`strategy.aggressiva.yaml`:
| dvol_under | delta_target | delta_min | delta_max |
|---|---|---|---|
| 50 | 0.15 | 0.13 | 0.17 |
| 70 | 0.12 | 0.10 | 0.15 |
| 90 | 0.10 | 0.08 | 0.12 |
Se nessuno strike disponibile rientra in entrambe le tolleranze → no entry.
### 3.3 Strike long (protezione)
@@ -179,6 +223,49 @@ Per ogni posizione aperta, il rule engine valuta in ordine:
L'ordine è importante: il primo trigger soddisfatto vince.
## 7-bis. Estensioni opzionali (Phase 5)
Tre miglioramenti opt-in al decision loop. Tutti **disabled** di
default nella golden config; il profilo `strategy.aggressiva.yaml`
li abilita.
### 7-bis.1 Vol-collapse harvest (D)
Inserito tra il profit-take §7.1 e lo stop-loss §7.2:
> Se `vol_harvest_dvol_decrease > 0`, siamo in profit
> (`debit < credit`) E `DVOL_now ≤ DVOL_entry vol_harvest_dvol_decrease`,
> `decision = CLOSE_VOL_HARVEST`.
Razionale: edge IV-RV già catturato, vol attesa rientrata, non c'è
motivo di tenere fino al profit-take. Default disabilitato (`0`);
profilo aggressivo: `15` punti vol.
### 7-bis.2 Profit-take graduale (C — scaffolding)
Schema in place per chiusure parziali; pipeline runtime di
chiusura partial-close NON ancora wirata. Default vuoto. Quando
popolato, ogni livello `{mark_at_pct_credit, close_pct_of_initial_contracts}`
emette un'azione `CLOSE_PROFIT_PARTIAL` advisory che il runtime
attualmente ignora. Il completamento richiede refactor del position
model (contracts_open vs contracts_initial) — PR dedicato.
### 7-bis.3 Auto-pause su drawdown (F)
Circuit breaker sopra il kill-switch tecnico. Valutato all'inizio di
ogni entry-cycle:
> Se `auto_pause.enabled` e P/L cumulato delle ultime
> `lookback_trades` posizioni chiuse < `max_drawdown_pct ×
> capitale_corrente`, l'engine si auto-mette in pausa per
> `pause_days` giorni (skip-day mode).
Difende dai regime change non rilevati dai filtri quant. La pausa
si annulla automaticamente alla scadenza, oppure manualmente con
`UPDATE system_state SET auto_pause_until = NULL`. Default
disabilitato; profilo aggressivo: lookback 5 trade, soglia 15%, 14
giorni di pausa.
## 8. Esecuzione di apertura
1. Engine costruisce **combo order Deribit** (un solo ordine atomico
@@ -211,7 +298,7 @@ L'ordine è importante: il primo trigger soddisfatto vince.
durante un trade aperto).
- Non aggiusta strike o size dopo l'apertura.
- Non apre nuovi trade per "compensare" perdite recenti.
- Non opera fuori dalla finestra del lunedì 14:00 UTC, eccetto chiusure.
- Non opera fuori dalla finestra delle 14:00 UTC, eccetto chiusure.
- Non deroga ai cap nemmeno per "opportunità eccezionali".
- Non si aggiorna automaticamente: nuovo set di regole = nuovo deploy
con review esplicita.
+1 -1
View File
@@ -122,7 +122,7 @@ Cerbero_Bite/
│ │ ├── lockfile.py # fcntl.flock single-instance
│ │ ├── alert_manager.py # severity routing
│ │ ├── health_check.py # ping + 3-strikes kill switch
│ │ ├── entry_cycle.py # weekly entry auto-execute
│ │ ├── entry_cycle.py # daily entry auto-execute (crypto 24/7)
│ │ ├── monitor_cycle.py # 12h exit auto-execute
│ │ └── recovery.py # state reconcile al boot
│ ├── state/ # persistenza
+66 -6
View File
@@ -202,7 +202,9 @@ CREATE TABLE system_state (
last_kelly_calib TEXT,
config_version TEXT NOT NULL,
started_at TEXT NOT NULL,
last_audit_hash TEXT -- aggiunto dalla migration 0002
last_audit_hash TEXT, -- aggiunto dalla migration 0002
auto_pause_until TEXT, -- aggiunto dalla migration 0004 (§7-bis.3)
auto_pause_reason TEXT -- aggiunto dalla migration 0004
);
```
@@ -212,6 +214,57 @@ Al boot l'orchestrator confronta questo valore con il tail del file
`audit.log`: discrepanza → kill switch CRITICAL, vedi
`07-risk-controls.md`.
I campi `auto_pause_until` / `auto_pause_reason` implementano il
circuit breaker §7-bis.3 (pausa automatica su drawdown rolling).
NULL = engine attivo.
### `option_chain_snapshots`
Snapshot della catena opzioni Deribit prelevata in continuo
(cron `*/15 * * * *`, allineato a `market_snapshots`). Crypto è
24/7: l'accumulo dataset deve essere continuo, non gateato sulla
settimana. Ogni tick contiene un quote per strumento entro la
finestra `[dte_min, dte_max]` di config; tutti i quote prelevati
nello stesso tick condividono ``timestamp``. Migration `0005`.
```sql
CREATE TABLE option_chain_snapshots (
timestamp TEXT NOT NULL,
asset TEXT NOT NULL,
instrument_name TEXT NOT NULL,
strike TEXT NOT NULL,
expiry TEXT NOT NULL,
option_type TEXT NOT NULL CHECK (option_type IN ('C','P')),
bid TEXT,
ask TEXT,
mid TEXT,
iv TEXT,
delta TEXT,
gamma TEXT,
theta TEXT,
vega TEXT,
open_interest INTEGER,
volume_24h INTEGER,
book_depth_top3 INTEGER,
PRIMARY KEY (timestamp, instrument_name)
) WITHOUT ROWID;
```
Indici: `(asset, timestamp DESC)` per listing recenti, `(asset,
expiry)` per query per scadenza specifica. ``book_depth_top3`` è
NULL by design — il collector non chiama l'order book per ogni
strike per non saturare l'API; lo legge il liquidity gate live solo
sugli strike candidati al picker.
**Sblocca**: il backtest non-stilizzato (modulo `core/backtest.py`
con prezzi reali invece di Black-Scholes), la calibrazione empirica
dello skew premium, la validazione ex-post dello strike picker.
Volume atteso (cron `*/15 * * * *`, ~96 snapshot/giorno):
~50 strike × 3 scadenze × 96 snap/giorno × 17 colonne ≈ ~1.1 MB/giorno,
~400 MB/anno. Considerare politiche di retention (archive trimestrale
in parquet) se il bot gira a lungo.
## Log file
Sotto `data/log/` un file per giorno: `cerbero-bite-YYYY-MM-DD.jsonl`.
@@ -289,11 +342,18 @@ da altri processi (es. CLI `state inspect`) non vedano stati parziali.
## Migrations
Lo schema viene tracciato con il counter `PRAGMA user_version`. La
prima volta `0001_init.sql` viene applicato e versione → 1; alla
seconda esecuzione (o su DB già a versione 1) `0002_audit_anchor.sql`
viene applicato e versione → 2. `state.db.run_migrations` è
idempotente. Nessun rollback supportato (migrations forward-only).
Lo schema viene tracciato con il counter `PRAGMA user_version`.
`state.db.run_migrations` applica in ordine ogni file
`NNNN_<name>.sql` con versione superiore a quella corrente,
idempotente, forward-only:
| Versione | File | Cosa aggiunge |
|---|---|---|
| 1 | `0001_init.sql` | tabelle base (positions, decisions, ...) |
| 2 | `0002_audit_anchor.sql` | `system_state.last_audit_hash` |
| 3 | `0003_market_snapshots.sql` | tabella `market_snapshots` |
| 4 | `0004_auto_pause.sql` | `system_state.auto_pause_until / _reason` |
| 5 | `0005_option_chain_snapshots.sql` | tabella `option_chain_snapshots` |
## Backup
+6 -4
View File
@@ -27,9 +27,11 @@ L'avvio è progettato per essere **safe**: se qualcosa non torna, il
sistema si rifiuta di operare. Mai partire con uno stato dubbio o un
ambiente diverso da quello atteso.
## Flusso 2 — Settimanale (entry)
## Flusso 2 — Daily (entry)
Trigger: cron `0 14 * * MON` (lunedì 14:00 UTC).
Trigger: cron `0 14 * * *` (ogni giorno 14:00 UTC). Crypto è 24/7:
la cadenza di candidatura non è gateata sulla settimana — sono i gate
quantitativi a decidere se entrare o saltare il giorno.
```
START
@@ -219,7 +221,7 @@ proposed
| Cron | Trigger | Frequenza |
|---|---|---|
| `0 14 * * MON` | Entry evaluation | Settimanale |
| `0 14 * * *` | Entry evaluation | Giornaliera |
| `0 2,14 * * *` | Position monitoring | 2× giorno |
| `0 12 1 * *` | Kelly recalibration | Mensile |
| `*/5 * * * *` | Health check | 5 min |
@@ -237,7 +239,7 @@ Il bot riconosce due interruttori indipendenti, letti da
| Variabile d'ambiente | Default | Cosa abilita |
|---|---|---|
| `CERBERO_BITE_ENABLE_DATA_ANALYSIS` | `true` | Job `market_snapshot` ogni 15 min: raccolta dati MCP, scrittura tabella `market_snapshots`, calibrazione soglie. |
| `CERBERO_BITE_ENABLE_STRATEGY` | `false` | Job `entry` (lunedì 14:00 UTC) e `monitor` (2× giorno): valutazione regole §2-§9 di `01-strategy-rules.md` e proposta/esecuzione ordini. |
| `CERBERO_BITE_ENABLE_STRATEGY` | `false` | Job `entry` (daily 14:00 UTC) e `monitor` (2× giorno): valutazione regole §2-§9 di `01-strategy-rules.md` e proposta/esecuzione ordini. |
I job di infrastruttura (`health`, `backup`, `manual_actions`) sono
**sempre attivi**, indipendentemente dai flag, perché tengono in vita il
+1 -1
View File
@@ -54,7 +54,7 @@ cerbero-bite kill-switch disarm --reason "<motivo>" \
L'operazione è transazionale: SQLite `system_state.kill_switch = 0` +
una linea `KILL_SWITCH_DISARMED` nella audit chain con il motivo. Il
disarm non riavvia automaticamente lo scheduler; è il prossimo tick
naturale (entry settimanale o monitor 12h) a far ripartire la
naturale (entry giornaliero o monitor 12h) a far ripartire la
decisione.
## Cap di rischio (oltre alle regole di strategia)
+5 -5
View File
@@ -157,9 +157,9 @@ chain:
| Test | Scenario |
|---|---|
| `test_weekly_open_happy_path` | Tutto OK → proposta inviata |
| `test_weekly_open_no_strike_available` | Chain vuota nel range delta |
| `test_weekly_open_macro_blocks` | FOMC entro 5 giorni |
| `test_daily_open_happy_path` | Tutto OK → proposta inviata |
| `test_daily_open_no_strike_available` | Chain vuota nel range delta |
| `test_daily_open_macro_blocks` | FOMC entro 5 giorni |
| `test_monitor_profit_take` | Mark = 50% credito → close_profit |
| `test_monitor_vol_stop` | DVOL +12 → close_vol |
| `test_recovery_after_crash_open_position` | Crash mid-fill, restart, riconcilia |
@@ -175,8 +175,8 @@ checked-in.
```
tests/golden/
├── 2026-04-27_weekly_open_bull_put.yaml # input snapshot
├── 2026-04-27_weekly_open_bull_put.golden # output atteso
├── 2026-04-27_daily_open_bull_put.yaml # input snapshot
├── 2026-04-27_daily_open_bull_put.golden # output atteso
└── runner.py
```
+2 -2
View File
@@ -114,7 +114,7 @@ Tasks:
4. `runtime/alert_manager.py` — escalation policy
Test integration su scenari completi (vedi `08-testing-validation.md`):
- weekly open happy path
- daily open happy path
- monitor profit take
- monitor vol stop
- recovery dopo crash
@@ -206,7 +206,7 @@ Setup:
Metriche da raccogliere:
- Numero proposte settimanali emesse
- Numero proposte giornaliere emesse
- Quante passano i filtri
- Win rate, avg P&L paper
- Discrepanze tra mid stimato e fill reale (slippage)
+2 -2
View File
@@ -24,8 +24,8 @@ asset:
# === ENTRY ===
entry:
# finestra di valutazione settimanale
cron: "0 14 * * MON" # lunedì 14:00 UTC
# finestra di valutazione giornaliera (crypto 24/7)
cron: "0 14 * * *" # ogni giorno 14:00 UTC
skip_holidays_country: "IT"
# filtri di accesso (vedi 01-strategy-rules.md §2)
+124 -17
View File
@@ -12,7 +12,7 @@
## TL;DR
Cerbero Bite vende **credit spread settimanali su ETH/Deribit** quando
Cerbero Bite vende **credit spread su ETH/Deribit (DTE 14-21, valutati ogni giorno)** quando
la volatilità implicita è **abbastanza alta da pagare bene**, il
mercato non è in **stress di liquidazione**, non ci sono **eventi macro
forti** in finestra, e il bias direzionale è **chiaro** (bull o bear).
@@ -22,8 +22,9 @@ strategia.
Ogni 15 minuti raccoglie 1 riga per asset (ETH e BTC) nella tabella
`market_snapshots`. Quei dati alimentano tre obiettivi distinti:
1. **Decisione live** — l'entry ciclo del lunedì 14:00 UTC legge i
campi più freschi per dire "go/no-go".
1. **Decisione live** — l'entry ciclo daily alle 14:00 UTC legge i
campi più freschi per dire "go/no-go" (crypto è 24/7: la cadenza
non è gateata sulla settimana, decidono i gate quantitativi).
2. **Monitoring continuo** — il decision loop di gestione attiva
confronta la situazione con quella all'apertura.
3. **Calibrazione** — la pagina `📐 Calibrazione` usa la distribuzione
@@ -197,7 +198,7 @@ Quanto segue è la versione "leggibile" delle regole §2-§9 di
`01-strategy-rules.md`. Ogni passo cita i campi di
`market_snapshots` che lo alimentano.
### Fase 1 — Trigger (lunedì 14:00 UTC, festività italiane escluse)
### Fase 1 — Trigger (daily 14:00 UTC, festività italiane escluse se `skip_holidays_country` è on)
```
SE NESSUNA posizione aperta
@@ -211,7 +212,7 @@ SE NESSUNA posizione aperta
ALLORA
procedi alla Fase 2
ALTRIMENTI
no entry, log motivo, ritento la settimana successiva
no entry, log motivo, ritento il giorno successivo
```
### Fase 2 — Bias e struttura
@@ -363,8 +364,8 @@ capitale **non aumenta** i contratti per trade.
### Frequenza realistica di entry
La regola si valuta una volta a settimana, ma la maggioranza dei
lunedì viene saltata per:
La regola si valuta **una volta al giorno** (crypto è 24/7), ma la
maggioranza dei giorni viene saltata per:
| Motivo di skip | Frequenza tipica |
|---|---|
@@ -372,11 +373,12 @@ lunedì viene saltata per:
| Bias non chiaro (trend × funding discordi o entrambi neutri senza IC) | 2535% |
| Macro entro DTE | 1020% |
| Funding o liquidation risk fuori soglia | 515% |
| Capitale o sizing insufficiente | 05% |
| Capitale, sizing insufficiente o concurrency cap raggiunto | 515% |
**Risultato netto: 3050% delle settimane finisce in entry effettiva
⇒ 1525 trade / anno** (52 lunedì × 3050%). Le altre settimane il
bot sta fermo. È il design.
**Risultato netto: ~3040% dei giorni finisce in entry effettiva
⇒ 110145 trade / anno** (365 candidature × pass-rate, capped da
`max_concurrent_positions`). I restanti giorni il bot sta fermo:
è il design — la disciplina è la strategia.
### Win-rate atteso (short delta 0.12 + profit-take 50%)
@@ -450,11 +452,11 @@ In modalità data-only (oggi) il P/L atteso è **0** — l'engine
2. **Validare** i filtri quant osservando ex-post quanti tick
sarebbero stati filtrati (vedi pagina `📐 Calibrazione`, colonna
"% bloccato dalla soglia").
3. **Misurare** la quota effettiva di lunedì che superano i filtri
3. **Misurare** la quota effettiva di giorni che superano i filtri
nel proprio regime, prima di committare capitale.
> Suggerimento: 4 settimane di dati = 4 lunedì × probabilità entry =
> 12 candidate entry effettive. **Aspettare almeno 8 settimane**
> Suggerimento: 30 giorni di dati = 30 candidature × probabilità entry
> ≈ 912 candidate entry effettive. **Aspettare almeno 60 giorni**
> prima di tarare le soglie dà uno storico con dispersione
> sufficiente per decisioni non-rumorose.
@@ -532,6 +534,111 @@ quella che il sistema parte ad eseguire.
---
## 4-quater. IV richness gate (§2.9): il filtro che alza il win-rate
Il filtro a maggior impatto sull'edge è anche il più semplice da
descrivere: **non vendere vol quando la IV non sta pagando un margine
misurabile sopra la RV**. È implementato come gate hard nel
`validate_entry`:
```
if iv_minus_rv_filter_enabled and iv_minus_rv < iv_minus_rv_min:
skip entry
```
con due parametri in `entry:` di `strategy.yaml`:
| Parametro | Default | Effetto |
|---|---|---|
| `iv_minus_rv_filter_enabled` | `false` (golden) / `true` (aggressiva) | Master switch del gate |
| `iv_minus_rv_min` | `0` (golden) / `3` (aggressiva) | Soglia in punti vol che IV30g RV30g deve eccedere |
Il dato è già raccolto in `market_snapshots.iv_minus_rv` ogni 15
minuti. Il gate consulta l'ultimo tick disponibile al momento
dell'entry cycle (non un percentile rolling — quello è il prossimo
step di calibrazione, vedi §4-quinquies in roadmap).
**Profili di default ragionati.**
- **Conservativa / golden config**: `enabled=false, min=0`. Tutti i
setup passano questo gate, anche con IV-RV negativa. Motivo: nei
primi 60 giorni non si hanno abbastanza tick per stabilire che
soglia ha senso nel proprio regime. Lasciamo la pagina
`📐 Calibrazione` mostrare la distribuzione e poi alziamo
manualmente.
- **Aggressiva**: `enabled=true, min=3`. Il profilo aggressivo già di
suo prende size più grande; pretendere `IV-RV ≥ 3 vol points` come
prerequisito è coerente — se stai betting più grosso, vuoi
win-rate più alto. La soglia 3 è conservativa; la letteratura
short-vol systematic suggerisce 5 dopo calibrazione.
**Cosa cambia nel P/L atteso quando attivi il gate.**
Il gate **riduce** il numero di entry (saltiamo settimane con premio
magro) ma **alza** la qualità di quelle che passano (premio ricco =
win-rate empirico più alto). Effetto netto sul P/L annuo:
- Trade/anno: 18 → 12-14 (skip più aggressivo)
- Win-rate atteso: 0.72 → 0.78-0.80
- E[trade] netto: +0.6 USD → +4-6 USD per contratto
- **P/L annuo proiettato sale anche se i trade scendono**, perché
ogni trade ha edge più alto.
La pagina `📚 Strategia` ha lo slider win-rate già coerente con
questa logica: muovi da 0.72 a 0.78 e vedi l'APR scattare.
**Roadmap di hardening (passi successivi al merge di questo PR).**
1. **Soglia adattiva**: sostituire `iv_minus_rv_min: 3` con un valore
calcolato a runtime come `P25 rolling 60d` di `market_snapshots.iv_minus_rv`.
2. **Vol-of-vol guard**: bloccare entry quando `dvol` è cambiato di
≥5 punti nelle ultime 24h, anche se `iv_minus_rv` è alto (regime
instabile).
3. **Multi-asset (ETH+BTC)**: come da §4-ter, sblocca il
moltiplicatore 2× sulle opportunità a parità di filtri.
## 4-quinquies. Catena opzioni storica (Phase 5)
In aggiunta a `market_snapshots` (cron `*/15`), il bot raccoglie
ora una seconda fonte di dati: la **catena opzioni Deribit completa**
ogni 15 minuti (cron `*/15`, allineato a `market_snapshots` — crypto
è 24/7, l'accumulo dataset deve essere continuo).
Tabella `option_chain_snapshots` — vedi `05-data-model.md` per lo
schema. Cosa registra per ogni strumento entro la finestra
`[dte_min, dte_max]`:
| Campo | Cosa misura | A che serve |
|---|---|---|
| `bid` / `ask` / `mid` | Prezzi reali Deribit in ETH | P/L reale per backtest |
| `iv` | IV implicita allo strike | Calibrazione skew premium |
| `delta`/`gamma`/`theta`/`vega` | Greeks | Sizing + risk monitoring |
| `open_interest` / `volume_24h` | Liquidità | Validazione gate §4 |
**Cosa sblocca:**
- **Backtest non-stilizzato**: il modulo `core/backtest.py` può
sostituire la stima Black-Scholes (con `skew_premium` hardcoded a
1.5×) leggendo i `mid` reali. Numeri da "stime ex-ante per
ranking" a "P/L empirico per dimensionare capitale".
- **Calibrazione empirica dello skew**: rapporto fra prezzi
Deribit reali e BS pulito → valore data-driven invece di stima a
istinto. Va fatto dopo 4-8 settimane di accumulo.
- **Validazione ex-post strike picker**: "il delta-0.12 era davvero
a 25% OTM in quella settimana?" diventa una `SELECT`.
**Strumenti CLI:**
- `cerbero-bite option-chain trigger` — esegue UNA volta il
collector senza aspettare il cron. Utile per test e per popolare
prima del primo tick utile.
- `cerbero-bite option-chain analyze [--bias bull_put|bear_call]`
legge l'ultimo snapshot, simula il selector di strike con la
strategy passata e stampa: short/long strike, delta, width,
credito reale, ratio credit/width, PASS/FAIL del gate
`credit_to_width_ratio_min`. Risponde in tempo reale: "il rule
engine aprirebbe un trade adesso?".
## 5. Come leggere il dato giorno per giorno
Tre euristiche operative sui campi raccolti:
@@ -539,9 +646,9 @@ Tre euristiche operative sui campi raccolti:
1. **Premio "ricco":** `iv_minus_rv` consistentemente > 5 punti per
N giorni → il regime sta pagando bene la vendita di vol. Sono i
periodi in cui la strategia ha edge maggiore.
2. **Premio "magro":** `dvol < 35` per più giorni → la finestra del
lunedì viene saltata. Non è un fallimento: è la disciplina che
funziona.
2. **Premio "magro":** `dvol < 35` per più giorni → la finestra
giornaliera viene saltata. Non è un fallimento: è la disciplina
che funziona.
3. **Stress imminente:** `liquidation_*_risk = high` o spike di
`oi_delta_pct_4h` (> 5% in valore assoluto) + funding ai limiti
→ atteso vol stop / time stop attivi nei prossimi cicli, anche
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,316 @@
# IV-RV adaptive entry gate — design
**Status**: drafted, awaiting implementation plan
**Date**: 2026-05-08
**Author**: brainstorming session (operator + Claude)
**Roadmap origin**: `docs/13-strategia-spiegata.md` §4-quater, hardening punti 1 e 2
## 1. Problema
Il gate IV-richness in `core/entry_validator.py:140-152` confronta `ctx.iv_minus_rv` con la soglia statica `entry.iv_minus_rv_min` (config). I dati raccolti in `market_snapshots` mostrano due problemi sul campione 2026-05-01 → 2026-05-08:
| metrica | ETH | BTC |
|---|---|---|
| IV-RV p25 | 1.87 | 5.48 |
| IV-RV p50 | 2.70 | 6.88 |
| IV-RV p90 | 8.52 | 8.26 |
| Pearson(\|drift%\|, IV-RV mean) | **0.02** | **+0.54** |
| Trend giornaliero IV-RV mean | 1.64 → 8.96 (+5.4×) | 4.78 → 8.11 (+1.7×) |
ETH mostra un IV richening monotono **decoupled dal drift realised** — la soglia statica `min=3` (profilo Aggressiva) avrebbe escluso il 50%+ dei tick nei primi 4 giorni e il 0% degli ultimi 3, sopra una distribuzione che non è stazionaria. Su BTC è meno drammatico ma il problema è strutturale: il regime di IV cambia, la soglia no.
## 2. Obiettivo
Sostituire la soglia statica con un meccanismo adattivo che si auto-calibra al regime corrente, senza richiedere intervento manuale dell'operatore. Il gate deve restare semanticamente "non vendere vol senza margine misurabile sopra la RV", solo che il "margine misurabile" è ora derivato dalla distribuzione storica recente invece che hardcoded.
## 3. Approccio scelto: Hybrid (P25 rolling + Vol-of-Vol guard)
Decisione presa nel brainstorming dopo aver scartato:
- **Solo percentile rolling**: insufficiente, non protegge da regime shift bruschi (DVOL salta di 5+ pt in 24h)
- **Solo regime detection (HMM/cluster)**: troppo opaco e ad alto rischio di overfit con 8 giorni di dati
L'hybrid bilancia due controlli additivi:
1. **Soglia adattiva** = P25 di IV-RV nella finestra rolling
2. **Vol-of-Vol guard** = blocco se |ΔDVOL_24h| ≥ 5 pt (regime shift detector)
## 4. Comportamento del gate
> **Errata 2026-05-10** — design originale assumeva `n_days = len(history) // 96` (cadenza fissa 96 tick/giorno). Refattorizzato a **distinct-days policy**: il caller interroga il repository per (a) il numero di giorni di calendario distinti coperti e (b) i valori della finestra scelta. Questo permette di mischiare cadenze (tick live 15 min + backfill daily) senza assumere un fattore costante. Sotto il pseudo-codice aggiornato.
```
def validate_iv_richness_adaptive(ctx, cfg, repo):
if not cfg.entry.iv_minus_rv_filter_enabled:
return PASS # gate off
# 1) Soglia adattiva — distinct-days policy
n_days = repo.count_iv_rv_distinct_days(
asset=ctx.asset, max_days=cfg.window_target_days,
)
if n_days < 1:
return PASS # warmup hard: nessun giorno coperto
if n_days >= cfg.window_target_days:
window_days = cfg.window_target_days # ≥60g → finestra fissa 60g
elif n_days >= cfg.window_min_days:
window_days = cfg.window_min_days # 30-60g → finestra fissa 30g
else:
window_days = cfg.window_target_days # 1-30g → query tutta la storia disp.
history = repo.iv_rv_values_for_window(
asset=ctx.asset, window_days=window_days,
)
threshold = max(percentile(history, cfg.percentile),
cfg.absolute_floor)
if ctx.iv_minus_rv < threshold:
return SKIP("IV richness below P25 rolling")
# 2) Vol-of-vol guard (additivo)
if cfg.vol_of_vol_guard_enabled:
dvol_24h_ago = repo.dvol_lookback(asset=ctx.asset, hours=24)
if dvol_24h_ago is not None and \
abs(ctx.dvol - dvol_24h_ago) >= cfg.vol_of_vol_threshold:
return SKIP("DVOL shifted ≥5pt in 24h")
return PASS
```
### 4.1 Warmup behavior
Tutte le soglie sono espresse in **giorni di calendario distinti** coperti da almeno un record valido (`fetch_ok=1``iv_minus_rv IS NOT NULL`).
| storia disponibile | finestra usata | comportamento |
|---|---|---|
| 0 giorni distinti | — | gate disabled (PASS), log `GATE_WARMUP_INSUFFICIENT` |
| 1 g ≤ giorni < 30 g | tutta la storia | percentile della finestra disponibile (decisione utente) |
| 30 g ≤ giorni < 60 g | ultimi 30 g | finestra fissa 30g |
| ≥ 60 g | ultimi 60 g | finestra fissa 60g (target) |
I valori della finestra contribuiscono uno-a-uno al percentile: un tick a 15 min e un record di backfill daily hanno lo stesso peso. Mix di cadenze diverse è statisticamente sbilanciato finché i tick live non saturano la finestra; questa è una scelta deliberata per non rinunciare allo storico backfill.
### 4.2 Soglia = `max(P25, floor)`
`floor` è il vecchio `iv_minus_rv_min` riutilizzato come *absolute floor*. Permette:
- backwards compat: se `adaptive_enabled=False`, comportamento identico ad oggi
- safety: anche se P25 storico fosse ≈0 (regime IV bassa persistente), l'operatore può tenere un floor minimo (es. 1 vol pt) per evitare di vendere vol mai
## 5. Schema config (`config/schema.py`)
Aggiunte alla classe `EntryConfig`:
```python
class EntryConfig(BaseModel):
# campi esistenti
iv_minus_rv_filter_enabled: bool = False
iv_minus_rv_min: Decimal = Decimal("0") # ora è absolute_floor
# nuovi — gate adattivo
iv_minus_rv_adaptive_enabled: bool = False
iv_minus_rv_percentile: Decimal = Decimal("0.25")
iv_minus_rv_window_target_days: int = 60
iv_minus_rv_window_min_days: int = 30
# nuovi — vol-of-vol guard
vol_of_vol_guard_enabled: bool = False
vol_of_vol_threshold_pt: Decimal = Decimal("5")
vol_of_vol_lookback_hours: int = 24
```
### 5.1 Profili predefiniti
**Conservativa / golden** (`config/golden.yaml`):
```yaml
entry:
iv_minus_rv_filter_enabled: false
iv_minus_rv_adaptive_enabled: false
vol_of_vol_guard_enabled: false
```
Comportamento invariato rispetto a oggi.
**Aggressiva** (`config/aggressive.yaml`):
```yaml
entry:
iv_minus_rv_filter_enabled: true
iv_minus_rv_adaptive_enabled: true
iv_minus_rv_min: 0 # floor 0, lascia decidere il P25 rolling
iv_minus_rv_percentile: 0.25
iv_minus_rv_window_target_days: 60
iv_minus_rv_window_min_days: 30
vol_of_vol_guard_enabled: true
vol_of_vol_threshold_pt: 5
```
### 5.2 Backwards compat
Se `iv_minus_rv_adaptive_enabled=False` e `iv_minus_rv_filter_enabled=True`, il validator usa il path legacy `iv_rv < iv_minus_rv_min` esattamente come oggi. Nessuna regressione comportamentale per chi non ha attivato l'adaptive.
## 6. Architettura
### 6.1 Modulo `core/adaptive_threshold.py`
Funzione pura, testabile senza I/O. La selezione della finestra è
delegata al caller (separation of concerns):
```python
def compute_adaptive_threshold(
history: Sequence[Decimal],
*,
n_days: int,
percentile: Decimal,
absolute_floor: Decimal,
) -> Decimal | None:
"""Ritorna None se warmup hard (n_days==0 o history vuota),
altrimenti max(P_q(history), absolute_floor)."""
```
### 6.2 Repository (`state/repository.py`)
Tre metodi su `Repository` (uno preesistente):
```python
def count_iv_rv_distinct_days(
self, *, asset: str, max_days: int, as_of: datetime | None = None,
) -> int:
"""Numero di giorni di calendario distinti con almeno un IV-RV
valido nell'intervallo [as_of - max_days, as_of]."""
def iv_rv_values_for_window(
self, *, asset: str, window_days: int, as_of: datetime | None = None,
) -> list[Decimal]:
"""Valori IV-RV ordinati ASC su [as_of - window_days, as_of]."""
def dvol_lookback(self, *, asset: str, hours: int) -> Decimal | None:
"""DVOL del tick più vicino a now-hours, ±15min tolerance. None se gap."""
```
Usa l'index esistente `idx_market_snapshots_asset_ts`. Nessuna nuova migration.
### 6.3 Inline nel validator
`core/entry_validator.py` chiama `compute_adaptive_threshold` con i dati dal repo. Nessun caching, stateless. La query per finestra 60g (5760 righe per asset) costa ms-level con index — non vale la pena introdurre cache da invalidare.
### 6.4 Audit / logging
Ogni entry cycle scrive in `decisions`:
- `inputs_json`: `{iv_rv_now, threshold_used, dvol_now, dvol_24h_ago, n_history, window_used_days}`
- `outputs_json`: `{gate: "iv_richness_adaptive", verdict: PASS|SKIP, reason}`
Permette ricostruzione ex-post: perché un trade è stato saltato e con quali numeri.
## 7. GUI Calibrazione (`pages/6_📐_Calibrazione.py`)
Aggiunta sezione "Gate adattivo" sopra ai percentili statici esistenti — questi ultimi NON vengono modificati (restano per analisi).
```
┌─ 🎯 Gate IV-RV adattivo ──────────────────────────────────┐
│ Status: 🟢 Attivo (Aggressiva) | 🟡 Warmup (n=8/30g) │
│ │
│ Soglia P25 rolling (corrente) 2.74 vol pts │
│ IV-RV ultimo tick 8.96 vol pts ✅ │
│ Floor assoluto 0.00 vol pts │
│ │
│ Evoluzione 7g (sparkline) ▁▂▂▃▄▆▇ │
│ │
│ ── VoV guard ── │
│ ΔDVOL ultime 24h 0.43 pt ✅ │
│ Soglia VoV 5.00 pt │
│ │
│ Decisione hypothetical: PASS │
└──────────────────────────────────────────────────────────┘
```
La GUI usa la stessa funzione `compute_adaptive_threshold` del validator → unica fonte di verità. Refresh manuale al page load (coerente con resto GUI).
## 8. Error handling
Principio: **fail-open** in tutti i casi di dato mancante. Il gate adattivo è additivo sopra ai gate hard esistenti (delta band, credit, ecc.); il dato mancante non deve trasformare un trade non voluto in trade fatto, ma neppure deve bloccare entry valide se è il dato del gate stesso a mancare.
| Scenario | Comportamento | Loggato come |
|---|---|---|
| `iv_rv_history` ritorna 0 righe | gate = PASS | `GATE_WARMUP_NO_DATA` |
| Storia < 96 tick (1g) | gate = PASS, threshold None | `GATE_WARMUP_INSUFFICIENT` |
| `dvol_lookback` = None (gap dati 24h fa) | VoV guard = PASS | `VOV_GUARD_NO_LOOKBACK` |
| `ctx.iv_minus_rv` = None | gate bypassato (riga 146 esistente) | invariato |
| `ctx.dvol` = None | VoV guard bypassato, gate adattivo prosegue | invariato |
## 9. Testing strategy
### 9.1 Unit — `core/adaptive_threshold.py`
- Warmup: `n_days=0` → None
- Warmup difensivo: `n_days=0` ma history non vuota → None
- Difensivo: history vuota con `n_days>0` → None
- `n_days=1`, 96 tick → P25 sui 96
- Mix di cadenze (30 daily + 96 live) → percentile uno-a-uno
- Floor binding: P25=0.5, floor=3 → 3
- Floor non binding: P25=5, floor=0 → 5
- Percentile diverso: percentile=0.5 → mediana
- Validation: percentile ∉ [0,1] o `n_days<0` → ValueError
### 9.1bis Unit — `state/repository.py`
- `count_iv_rv_distinct_days`: 1 giorno → 1; 3 giorni misti → 3
- esclusione asset diversi, NULL e fetch_ok=0
- rispetto del cutoff `max_days`
- ValueError su `as_of` naive o `max_days≤0`
- `iv_rv_values_for_window`: ordine ASC, filtri equivalenti, ValueError input
### 9.2 Unit — `core/entry_validator.py`
Mock repo, focus sul flusso decisionale:
- Adaptive disabled, statico passa → PASS legacy
- Adaptive disabled, statico fail → SKIP legacy
- Adaptive enabled, IV-RV sopra P25 → PASS
- Adaptive enabled, IV-RV sotto P25 sopra floor → SKIP("rolling")
- VoV guard ON, ΔDVOL=6 pt → SKIP("vov")
- VoV guard ON, ΔDVOL=4 pt, gate principale pass → PASS
- VoV guard ON, dvol_lookback=None → guard bypass
- ctx.iv_minus_rv=None → bypass
- decisions log popolato con threshold, n_history, dvol_lookback
### 9.3 Integration — `tests/integration/test_entry_cycle_adaptive.py`
SQLite temp + fixture market_snapshots con 5760 tick (60g):
- Aggressiva con flag adattivo → entry passa solo nei tick sopra P25 della fixture
- Golden → invariato
- Warmup: DB con 50 tick → tutti pass, log `GATE_WARMUP_INSUFFICIENT`
- Regime shift fixture: DVOL salta da 50 a 56 in 24h → VoV guard scatta
### 9.4 Backtest sanity
Aggiunta nel report del CLI `backtest`: count distinto skip-reasons (`iv_rv_static`, `iv_rv_rolling`, `vov_guard`) per analisi ex-post.
### 9.5 GUI smoke
Manuale al deploy:
- Calibrazione carica con `enabled=False` (fallback grafico)
- Calibrazione mostra warmup status quando DB < 30g
- Refresh ricalcola coerente
### 9.6 Cosa NON testiamo
- Performance query (5760 righe con index = trascurabile)
- Concorrenza entry cycle / GUI (WAL abilitato)
- Migrazione (nessuna tabella nuova)
## 10. Out of scope
- Regime detection avanzato (HMM, cluster) — esplicitamente scartato per opacità
- Soglie per-asset diverse — il P25 si calibra naturalmente per asset (history filtrata per asset)
- Auto-attivazione adaptive su Conservativa quando warmup è completo — l'operatore decide manualmente quando passare al profilo aggressivo
- Multi-asset (ETH+BTC simultanei) — già scope §4-ter, indipendente da questo design
- Override manuale soglia da GUI — explicit no, l'obiettivo è autocalibrante
## 11. Decisioni prese durante brainstorming
| # | Domanda | Scelta |
|---|---|---|
| 1 | Approccio | Hybrid (percentile + VoV guard) |
| 2 | Warmup | Percentile della finestra disponibile (anche se <30g) |
| 3 | Percentile | P25 (allineato roadmap) |
| 4 | Window | Target 60g, attivazione a 30g, sotto usa quel che c'è |
| 5 | VoV soglia | 5 pt vol in 24h |
| 6 | Architettura | Inline nel validator, stateless, no cache DB |
| 7 | GUI | Pannello informativo aggiunto, slider esistenti invariati |
+281
View File
@@ -0,0 +1,281 @@
"""Backfill IV-RV history from Deribit public REST API.
Use case: il gate IV-RV adattivo richiede ≥30 giorni di storia per
attivarsi (spec ``docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md``).
Quando la pipeline ha pochi giorni di tick live, questo script popola
``market_snapshots`` con record giornalieri storici calcolati da
DVOL Deribit + closes ETH-PERPETUAL/BTC-PERPETUAL pubblici.
Idempotente: usa ``INSERT OR REPLACE`` sulla PK ``(timestamp, asset)``
con timestamp fissato a 12:00 UTC del giorno di calendario.
``fetch_errors_json='{"backfill":true}'`` permette di distinguere i
record sintetici dai tick live in audit.
I record contribuiscono al gate adattivo come singoli punti
(distinct-days policy), uno per giorno: lo statistical bias è coperto
dalla spec §4.1.
Esempio:
python scripts/backfill_iv_rv.py --db data/state.sqlite --days 45
"""
from __future__ import annotations
import argparse
import json
import math
import sqlite3
import statistics
import urllib.request
from dataclasses import dataclass
from datetime import UTC, date, datetime, timedelta
from decimal import Decimal
__all__ = [
"BackfillRow",
"build_backfill_records",
"compute_rv30d_annualized",
]
_DERIBIT = "https://www.deribit.com/api/v2/public"
_RV_LOOKBACK_DAYS = 30
_TRADING_DAYS_PER_YEAR = 365
@dataclass(frozen=True)
class BackfillRow:
"""Una riga sintetica destinata a ``market_snapshots``."""
timestamp: datetime
asset: str
spot: Decimal
dvol: Decimal
realized_vol_30d: Decimal
iv_minus_rv: Decimal
fetch_ok: bool = True
# ---------------------------------------------------------------------------
# Pure compute layer (TDD: tests/unit/test_backfill_iv_rv.py)
# ---------------------------------------------------------------------------
def compute_rv30d_annualized(closes: list[Decimal]) -> Decimal:
"""Volatilità realizzata 30g annualizzata in **punti vol** (% annuali).
Args:
closes: ``31`` close consecutivi (uno al giorno) — produce 30
log-returns.
Returns:
``stdev(log_returns) * sqrt(365) * 100`` come ``Decimal``.
Raises:
ValueError: se ``len(closes) < 31``.
"""
if len(closes) < _RV_LOOKBACK_DAYS + 1:
raise ValueError(
f"need at least {_RV_LOOKBACK_DAYS + 1} closes, got {len(closes)}"
)
log_returns = [
math.log(float(closes[i] / closes[i - 1]))
for i in range(1, _RV_LOOKBACK_DAYS + 1)
]
sigma_daily = statistics.stdev(log_returns)
annualized = sigma_daily * math.sqrt(_TRADING_DAYS_PER_YEAR) * 100.0
return Decimal(str(annualized))
def build_backfill_records(
*,
asset: str,
spots_by_day: dict[str, Decimal],
dvols_by_day: dict[str, Decimal],
oldest_day: date,
) -> list[BackfillRow]:
"""Compone le righe di backfill per i giorni nella finestra richiesta.
Per ogni giorno target ``D`` (da ``oldest_day`` a oggi compreso) la
riga viene emessa solo se: (a) DVOL e spot sono presenti per ``D``,
(b) la serie di spot dispone dei 30 giorni precedenti necessari per
il calcolo di RV30d.
Il timestamp è fissato a 12:00 UTC, scelta che evita il rollover
delle candele Deribit (vedi anomalia DVOL 00:00 UTC nei market
snapshots live).
"""
sorted_days = sorted(spots_by_day.keys())
records: list[BackfillRow] = []
for day_str in sorted_days:
day = date.fromisoformat(day_str)
if day < oldest_day:
continue
if day_str not in dvols_by_day:
continue
rv_window = [
day - timedelta(days=i) for i in range(_RV_LOOKBACK_DAYS, -1, -1)
]
if not all(d.isoformat() in spots_by_day for d in rv_window):
continue
closes = [spots_by_day[d.isoformat()] for d in rv_window]
rv = compute_rv30d_annualized(closes)
dvol = dvols_by_day[day_str]
spot = spots_by_day[day_str]
records.append(
BackfillRow(
timestamp=datetime(day.year, day.month, day.day, 12, 0, tzinfo=UTC),
asset=asset,
spot=spot,
dvol=dvol,
realized_vol_30d=rv,
iv_minus_rv=dvol - rv,
)
)
return records
# ---------------------------------------------------------------------------
# I/O layer (network + sqlite)
# ---------------------------------------------------------------------------
def _http_get_json(url: str, timeout_s: float = 30.0) -> dict:
with urllib.request.urlopen(url, timeout=timeout_s) as resp:
return json.loads(resp.read())
def fetch_dvol_daily(currency: str, days: int) -> dict[str, Decimal]:
"""Mappa ``YYYY-MM-DD -> DVOL close`` per gli ultimi ``days`` giorni."""
end_ms = int(datetime.now(UTC).timestamp() * 1000)
start_ms = end_ms - days * 86_400_000
url = (
f"{_DERIBIT}/get_volatility_index_data"
f"?currency={currency}"
f"&start_timestamp={start_ms}&end_timestamp={end_ms}"
f"&resolution=86400"
)
payload = _http_get_json(url)
data = (payload.get("result") or {}).get("data") or []
out: dict[str, Decimal] = {}
for row in data:
# row = [ts_ms, open, high, low, close]
if not isinstance(row, list) or len(row) < 5:
continue
ts = datetime.fromtimestamp(row[0] / 1000, tz=UTC).date().isoformat()
out[ts] = Decimal(str(row[4]))
return out
def fetch_spot_daily(instrument: str, days: int) -> dict[str, Decimal]:
"""Mappa ``YYYY-MM-DD -> close USD`` per ``instrument`` su ``days`` giorni."""
end_ms = int(datetime.now(UTC).timestamp() * 1000)
start_ms = end_ms - days * 86_400_000
url = (
f"{_DERIBIT}/get_tradingview_chart_data"
f"?instrument_name={instrument}"
f"&start_timestamp={start_ms}&end_timestamp={end_ms}"
f"&resolution=1D"
)
payload = _http_get_json(url)
result = payload.get("result") or {}
ticks = result.get("ticks") or []
closes = result.get("close") or []
out: dict[str, Decimal] = {}
for ts_ms, close in zip(ticks, closes, strict=False):
ts = datetime.fromtimestamp(ts_ms / 1000, tz=UTC).date().isoformat()
out[ts] = Decimal(str(close))
return out
def write_records(db_path: str, records: list[BackfillRow]) -> int:
"""Insert/replace dei record in market_snapshots. Ritorna la rowcount."""
if not records:
return 0
conn = sqlite3.connect(db_path)
try:
with conn:
for r in records:
conn.execute(
"INSERT OR REPLACE INTO market_snapshots ("
"timestamp, asset, spot, dvol, realized_vol_30d, iv_minus_rv, "
"funding_perp_annualized, funding_cross_annualized, "
"dealer_net_gamma, gamma_flip_level, oi_delta_pct_4h, "
"liquidation_long_risk, liquidation_short_risk, "
"macro_days_to_event, fetch_ok, fetch_errors_json"
") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
(
r.timestamp.astimezone(UTC).isoformat(),
r.asset,
str(r.spot),
str(r.dvol),
str(r.realized_vol_30d),
str(r.iv_minus_rv),
None,
None,
None,
None,
None,
None,
None,
None,
1 if r.fetch_ok else 0,
'{"backfill":true}',
),
)
return len(records)
finally:
conn.close()
def backfill_asset(db_path: str, asset: str, days: int) -> int:
"""Esegue l'intero backfill per ``asset`` e ritorna il numero di
record inseriti/sostituiti.
"""
instrument = f"{asset.upper()}-PERPETUAL"
fetch_window_days = days + _RV_LOOKBACK_DAYS + 5 # margine per il lookback RV
spots = fetch_spot_daily(instrument, fetch_window_days)
dvols = fetch_dvol_daily(asset.upper(), fetch_window_days)
today = datetime.now(UTC).date()
oldest = today - timedelta(days=days)
records = build_backfill_records(
asset=asset.upper(),
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=oldest,
)
return write_records(db_path, records)
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--db",
default="data/state.sqlite",
help="path a state.sqlite (default: data/state.sqlite)",
)
parser.add_argument(
"--days",
type=int,
default=45,
help="quanti giorni di backfill emettere (default: 45)",
)
parser.add_argument(
"--assets",
nargs="+",
default=["ETH", "BTC"],
help="asset symbols (default: ETH BTC)",
)
args = parser.parse_args()
total = 0
for asset in args.assets:
n = backfill_asset(args.db, asset, args.days)
print(f"{asset}: inserted/replaced {n} backfill rows")
total += n
print(f"TOTAL: {total}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+151 -2
View File
@@ -679,6 +679,155 @@ def replay(date_from: str, date_to: str, capital: float, dry_run: bool) -> None:
)
@main.command()
@click.option(
"--strategy",
"strategy_path",
type=click.Path(path_type=Path),
default=Path("strategy.yaml"),
show_default=True,
help="Path al file di strategia (golden, conservativa, aggressiva, ...).",
)
@click.option(
"--db",
"db_path",
type=click.Path(path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
help="SQLite con `market_snapshots` storiche.",
)
@click.option(
"--from",
"date_from",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=None,
help="ISO date YYYY-MM-DD (default: 90 giorni fa).",
)
@click.option(
"--to",
"date_to",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=None,
help="ISO date YYYY-MM-DD (default: oggi).",
)
@click.option(
"--capital",
type=float,
default=1500.0,
show_default=True,
help="Capitale di partenza per il backtest, in USD.",
)
@click.option(
"--asset",
type=str,
default="ETH",
show_default=True,
help="Asset di riferimento per le snapshot.",
)
@click.option(
"--no-enforce-hash",
is_flag=True,
default=False,
help="Salta la verifica del config_hash (utile per profili sperimentali).",
)
def backtest(
strategy_path: Path,
db_path: Path,
date_from: datetime | None,
date_to: datetime | None,
capital: float,
asset: str,
no_enforce_hash: bool,
) -> None:
"""Esegue il backtest stilizzato su `market_snapshots` storiche.
Usa lo stesso `validate_entry` del live per i filtri (rigoroso) e
un modello Black-Scholes con skew premium per stimare credito ed
exit P/L (stilizzato — vedi docstring di `core/backtest.py`).
"""
from cerbero_bite.config.loader import load_strategy # noqa: PLC0415
from cerbero_bite.core.backtest import run_backtest # noqa: PLC0415
console = Console()
if date_to is None:
date_to = datetime.now(UTC)
if date_from is None:
date_from = date_to - timedelta(days=90)
date_from = date_from.replace(tzinfo=UTC) if date_from.tzinfo is None else date_from
date_to = date_to.replace(tzinfo=UTC) if date_to.tzinfo is None else date_to
loaded = load_strategy(strategy_path, enforce_hash=not no_enforce_hash)
cfg = loaded.config
conn = connect_state(db_path)
try:
repo = Repository()
snapshots = repo.list_market_snapshots(
conn,
asset=asset.upper(),
start=date_from,
end=date_to,
limit=10000,
)
finally:
conn.close()
if not snapshots:
console.print(
f"[yellow]Nessuno snapshot {asset} trovato fra {date_from.date()} "
f"e {date_to.date()}.[/yellow]"
)
sys.exit(1)
console.print(
f"[green]Caricate {len(snapshots)} snapshot {asset} "
f"({snapshots[-1].timestamp.date()}{snapshots[0].timestamp.date()})[/green]"
)
report = run_backtest(snapshots, cfg, capital_usd=Decimal(str(capital)))
table = Table(title=f"Backtest report — {strategy_path.name}")
table.add_column("Metrica", style="cyan")
table.add_column("Valore", style="bold")
table.add_row("Picks (daily 14:00)", str(report.n_picks))
table.add_row(
"Accettati dai filtri",
f"{report.n_accepted} ({report.n_accepted / max(1, report.n_picks):.0%})",
)
table.add_row("Saltati per dato mancante", str(report.n_skipped_data))
table.add_row("Trade completati (con P/L)", str(report.n_completed))
table.add_row("Vincenti", f"{report.n_winners} ({report.win_rate:.0%})")
table.add_row("P/L cumulato (USD)", f"{report.cumulative_pnl_usd:+.2f}")
table.add_row(
"P/L su capitale", f"{report.cumulative_pnl_pct_of_capital:+.2%}"
)
table.add_row(
"Max drawdown", f"{report.max_drawdown_usd:.0f} USD "
f"({report.max_drawdown_pct:.1%})",
)
table.add_row(
"Sharpe (annualized)",
f"{report.sharpe_annualized}" if report.sharpe_annualized is not None
else "",
)
console.print(table)
if report.skip_reasons:
skip_table = Table(title="Motivi di skip aggregati")
skip_table.add_column("Motivo")
skip_table.add_column("Giorni", justify="right")
for reason, count in sorted(
report.skip_reasons.items(), key=lambda kv: -kv[1]
):
skip_table.add_row(reason, str(count))
console.print(skip_table)
console.print(
"[dim]Il modello P/L è stilizzato: BS + skew premium 1.5×. "
"Numeri ottimi per ranking config, non per promesse operative.[/dim]"
)
@main.group()
def config() -> None:
"""Strategy configuration utilities."""
@@ -855,8 +1004,8 @@ def option_chain_trigger(
) -> None:
"""Esegue UNA volta il collector della catena opzioni e persiste in DB.
Utile per popolare i dati senza aspettare il cron settimanale del
job ``option_chain_snapshot``. Riusa esattamente la stessa pipeline
Utile per popolare i dati senza aspettare il cron del job
``option_chain_snapshot``. Riusa esattamente la stessa pipeline
schedulata.
"""
from cerbero_bite.runtime.dependencies import build_runtime # noqa: PLC0415
+5 -2
View File
@@ -9,7 +9,7 @@ the ``core/`` algorithms stay in their preferred numeric domain.
from __future__ import annotations
import re
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import Any, Literal
@@ -167,9 +167,12 @@ class DeribitClient:
) -> Decimal:
"""Return the latest DVOL value for ``currency``."""
when = (now or datetime.now(UTC)).astimezone(UTC)
# Window starts one day back so a tick fired exactly at 00:00 UTC
# — before Deribit has built today's 1D candle — still has
# yesterday's close to fall back on (see candles[-1] branch).
body = {
"currency": currency,
"start_date": (when.date()).isoformat(),
"start_date": (when.date() - timedelta(days=1)).isoformat(),
"end_date": when.date().isoformat(),
"resolution": "1D",
}
+133 -2
View File
@@ -47,7 +47,7 @@ class EntryConfig(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")
cron: str = "0 14 * * MON"
cron: str = "0 14 * * *"
skip_holidays_country: str = "IT"
# access filters (§2)
@@ -75,12 +75,50 @@ class EntryConfig(BaseModel):
dealer_gamma_filter_enabled: bool = True
liquidation_filter_enabled: bool = True
# IV richness filter (§2.9). `iv_minus_rv_min` è la soglia in
# punti vol che la IV implicita 30g deve eccedere la RV30g per
# ammettere l'entry. Letteratura short-vol systematic: l'edge
# sostenibile esiste solo con un margine misurabile fra IV e RV.
# Default disabilitato + soglia 0 per non bloccare l'avvio finché
# non si è calibrato sui dati raccolti (vedi `📐 Calibrazione`).
iv_minus_rv_min: Decimal = Field(default=Decimal("0"))
iv_minus_rv_filter_enabled: bool = False
# IV richness gate adattivo (Phase 5+). Quando
# `iv_minus_rv_adaptive_enabled=True`, la soglia statica
# `iv_minus_rv_min` diventa il floor assoluto e la soglia
# effettiva è `max(P_q rolling, floor)` calcolata su
# `market_snapshots`. Vedi
# `docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md`.
iv_minus_rv_adaptive_enabled: bool = False
iv_minus_rv_percentile: Decimal = Field(default=Decimal("0.25"))
iv_minus_rv_window_target_days: int = 60
iv_minus_rv_window_min_days: int = 30
# Vol-of-Vol guard (§4-quater roadmap punto 2): blocca entry se
# |DVOL_now - DVOL_24h_ago| supera la soglia. Cattura regime
# shift bruschi non riflessi nel percentile rolling.
vol_of_vol_guard_enabled: bool = False
vol_of_vol_threshold_pt: Decimal = Field(default=Decimal("5"))
vol_of_vol_lookback_hours: int = 24
# ---------------------------------------------------------------------------
# Structure
# ---------------------------------------------------------------------------
class DeltaByDvolBand(BaseModel):
"""Banda della step function delta-target per regime DVOL (§3.2 A)."""
model_config = ConfigDict(frozen=True, extra="forbid")
dvol_under: Decimal
delta_target: Decimal
delta_min: Decimal
delta_max: Decimal
class ShortStrikeSpec(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")
@@ -90,6 +128,16 @@ class ShortStrikeSpec(BaseModel):
distance_otm_pct_min: Decimal = Field(default=Decimal("0.15"))
distance_otm_pct_max: Decimal = Field(default=Decimal("0.25"))
# §3.2 enhancement (A): step function delta-target by DVOL regime.
# Empty list = behaviour invariato (delta_target sopra è il singolo
# valore). Quando popolato, il combo_builder sceglie la prima
# banda ordinata ascending su `dvol_under` con
# `dvol_now ≤ dvol_under`. Esempio:
# - dvol_under=50 → delta 0.15 (bassa vol → più premio)
# - dvol_under=70 → delta 0.12
# - dvol_under=90 → delta 0.10 (alta vol → più safety)
delta_by_dvol: list[DeltaByDvolBand] = Field(default_factory=list)
class SpreadWidthSpec(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")
@@ -153,7 +201,7 @@ class SizingConfig(BaseModel):
cap_per_trade_eur: Decimal = Field(default=Decimal("200"))
cap_aggregate_open_eur: Decimal = Field(default=Decimal("1000"))
max_concurrent_positions: int = 1
max_concurrent_positions: int = 5
max_contracts_per_trade: int = 4
dvol_adjustment: list[DvolAdjustmentBand] = Field(default_factory=_default_dvol_bands)
@@ -165,6 +213,25 @@ class SizingConfig(BaseModel):
# ---------------------------------------------------------------------------
class PartialProfitLevel(BaseModel):
"""Livello della scala di profit-take graduale (§7.1bis C).
`mark_at_pct_credit`: il livello è triggerato quando
`mark_combo mark_at_pct_credit × credito_iniziale` (es. 0.25 =
25% del credito = 75% di profitto sulla porzione chiusa).
`close_pct_of_initial_contracts`: frazione dei contratti aperti
INIZIALMENTE da chiudere a questo livello (es. 0.50 = chiudi metà).
Le frazioni sono cumulative; chiudere oltre i contratti residui
è no-op.
"""
model_config = ConfigDict(frozen=True, extra="forbid")
mark_at_pct_credit: Decimal
close_pct_of_initial_contracts: Decimal
class ExitConfig(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")
@@ -176,6 +243,29 @@ class ExitConfig(BaseModel):
delta_breach_threshold: Decimal = Field(default=Decimal("0.30"))
adverse_move_4h_pct: Decimal = Field(default=Decimal("0.05"))
# §7.1ter (D): vol-collapse harvest. Esce in profit anche se il
# profit-take non è ancora colpito quando DVOL è scesa di tot
# punti rispetto all'entry (edge IV-RV catturato, vol attesa già
# rientrata). 0 = filtro disabilitato.
vol_harvest_dvol_decrease: Decimal = Field(default=Decimal("0"))
# §7.1bis (C): scala graduata di profit-take. Lista vuota =
# comportamento invariato (chiusura atomica al
# `profit_take_pct_of_credit`). Quando popolata, l'engine
# interpreta come "chiudi N% dei contratti iniziali al livello
# di mark M%×credito". Le entry sono ordinate dal mark più alto
# (più profit, livello triggerato prima) al più basso. Vedi
# `core/exit_decision.py` per la semantica esatta.
#
# ATTENZIONE: questa funzione richiede il supporto di chiusure
# parziali nel runtime (entry_cycle / repository / clients).
# Fino al merge della partial-close pipeline, l'engine la mappa
# a CLOSE_PROFIT atomico al primo livello triggerato (vedi
# commento in `evaluate`). Default vuoto = no-op.
profit_take_partial_levels: list[PartialProfitLevel] = Field(
default_factory=list
)
monitor_cron: str = "0 2,14 * * *"
user_confirmation_timeout_min: int = 30
escalate_on_timeout: list[str] = Field(
@@ -183,6 +273,36 @@ class ExitConfig(BaseModel):
)
# ---------------------------------------------------------------------------
# Auto-pause (F): circuit breaker su drawdown rolling
# ---------------------------------------------------------------------------
class AutoPauseConfig(BaseModel):
"""Configurazione del circuit breaker su drawdown.
Quando abilitato, il rule engine valuta prima di ogni entry
il P/L cumulato delle ultime `lookback_trades` posizioni chiuse
in proporzione al capitale attuale. Se la perdita supera la
soglia, l'engine si auto-mette in pausa per `pause_days`
giorni (skip-day). La pausa si annulla automaticamente alla
scadenza, oppure manualmente via comando dalla GUI.
Difende da regime change non rilevati dai filtri quant: se i
filtri stanno fallendo sistematicamente, vale la pena fermarsi
e attendere che le condizioni cambino, invece di continuare a
sanguinare. È un'estensione conservativa del kill switch
(che oggi reagisce solo a errori tecnici).
"""
model_config = ConfigDict(frozen=True, extra="forbid")
enabled: bool = False
lookback_trades: int = 5
max_drawdown_pct: Decimal = Field(default=Decimal("0.10"))
pause_days: int = 14
# ---------------------------------------------------------------------------
# Kelly recalibration
# ---------------------------------------------------------------------------
@@ -256,6 +376,7 @@ class StrategyConfig(BaseModel):
sizing: SizingConfig = Field(default_factory=SizingConfig)
exit: ExitConfig = Field(default_factory=ExitConfig)
kelly_recalibration: KellyConfig = Field(default_factory=KellyConfig)
auto_pause: AutoPauseConfig = Field(default_factory=AutoPauseConfig)
execution: ExecutionConfig = Field(default_factory=ExecutionConfig)
monitoring: MonitoringConfig = Field(default_factory=MonitoringConfig)
@@ -286,6 +407,16 @@ class StrategyConfig(BaseModel):
if self.entry.dvol_min >= self.entry.dvol_max:
raise ValueError("dvol_min must be < dvol_max")
e = self.entry
if e.iv_minus_rv_window_min_days >= e.iv_minus_rv_window_target_days:
raise ValueError(
"iv_minus_rv_window_min_days must be < iv_minus_rv_window_target_days"
)
if not (Decimal("0") < e.iv_minus_rv_percentile < Decimal("1")):
raise ValueError(
"iv_minus_rv_percentile must be in (0, 1)"
)
return self
@@ -0,0 +1,75 @@
"""Funzione pura per calcolare la soglia adattiva del gate IV-RV.
Spec: ``docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md``.
Deterministic, no I/O. La selezione della finestra (target_days vs
min_days vs intera storia disponibile) è responsabilità del caller, che
interroga il repository con i parametri corretti e passa qui sia i
valori (``history``) sia il numero di giorni distinti coperti
(``n_days``). Questo permette di mischiare cadenze diverse tick live a
15 min e backfill daily senza assumere un fattore costante
``ticks_per_day``.
"""
from __future__ import annotations
from collections.abc import Sequence
from decimal import Decimal
__all__ = ["compute_adaptive_threshold"]
def compute_adaptive_threshold(
history: Sequence[Decimal],
*,
n_days: int,
percentile: Decimal,
absolute_floor: Decimal,
) -> Decimal | None:
"""Ritorna la soglia adattiva o ``None`` durante il warmup hard.
Args:
history: Sequenza dei valori IV-RV nella finestra scelta dal
caller. NULL e tick non riusciti devono essere già stati
filtrati upstream. L'ordine non è significativo per il
percentile.
n_days: Numero di giorni distinti coperti dalla storia
disponibile (calcolato dal caller, tipicamente con
``COUNT(DISTINCT date(timestamp))``). ``0`` warmup hard.
percentile: Quantile target nella distribuzione (es. ``0.25``).
absolute_floor: Floor minimo applicato dopo il calcolo del
percentile. La soglia restituita è
``max(P_q, absolute_floor)``.
Returns:
``None`` se ``n_days == 0`` o ``history`` è vuota (warmup hard,
gate disabilitato), altrimenti il percentile della finestra
bounded dal floor.
"""
if not (Decimal(0) <= percentile <= Decimal(1)):
raise ValueError(
f"percentile must be in [0, 1], got {percentile}"
)
if n_days < 0:
raise ValueError(f"n_days must be >= 0, got {n_days}")
if n_days == 0 or not history:
return None
return max(_percentile(history, percentile), absolute_floor)
def _percentile(values: Sequence[Decimal], q: Decimal) -> Decimal:
"""Linear-interpolated percentile, NumPy-compatible (method='linear').
Implementato in Decimal puro per evitare dipendenze numpy nel core.
"""
if not values:
raise ValueError("percentile of empty sequence")
sorted_v = sorted(values)
n = len(sorted_v)
k = (Decimal(n) - Decimal(1)) * q
f = int(k) # floor
c = min(f + 1, n - 1)
if f == c:
return sorted_v[f]
frac = k - Decimal(f)
return sorted_v[f] + (sorted_v[c] - sorted_v[f]) * frac
+651
View File
@@ -0,0 +1,651 @@
"""Stylized backtest engine over ``market_snapshots`` (§13).
Two layers, both pure functions:
1. **Entry-filter simulation** for each daily 14:00 UTC tick in the
recorded snapshots, evaluate which §2 gates would have passed,
reconstructing :class:`EntryContext` from the snapshot. This part
is **rigorous**: it uses the same :func:`validate_entry` the live
engine uses, so the output is exactly "what the bot would have
decided".
2. **P/L estimation per accepted entry** since ``market_snapshots``
does NOT record the option chain (we only collect spot, DVOL,
funding, etc.), credit and exit P/L are estimated via a stylized
Black-Scholes model: given ``spot``, ``DVOL`` (as IV), and the
strategy's delta target, we solve for the short strike, the long
strike at ``width_pct`` distance, and the combo mid-price. Future
ticks are then re-priced under the same model to detect the first
exit trigger from §7.
The stylized layer is **intentionally approximate**: it captures the
geometry of the strategy (DVOL band sets credit, ETH path drives
exit triggers) but not the second-order effects (chain liquidity,
borrow rates, exchange fees beyond the 0.03% notional cap, dealer
hedging skew). Numbers are good for ranking and tuning, not for
operational P/L promises.
The engine is deterministic and side-effect-free: it does **not**
write to SQLite, does not call MCP, does not place orders. It
operates entirely on a list of :class:`MarketSnapshotRecord` rows
the caller has already loaded.
"""
from __future__ import annotations
import math
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import Literal
from pydantic import BaseModel, ConfigDict
from cerbero_bite.config.schema import StrategyConfig
from cerbero_bite.core.entry_validator import EntryContext, validate_entry
from cerbero_bite.state.models import MarketSnapshotRecord
__all__ = [
"BacktestEntry",
"BacktestExit",
"BacktestReport",
"DailyPick",
"bs_put_delta",
"bs_put_price",
"daily_picks",
"estimate_credit_eth",
"find_strike_for_delta",
"normal_cdf",
"run_backtest",
"simulate_entry_filters",
"simulate_position_outcome",
]
_ANNUAL_DAYS = Decimal("365")
_DEFAULT_RISK_FREE = Decimal("0")
_NUM_SLIPPAGE_PCT_OF_CREDIT = Decimal("0.03")
_NUM_FEE_PCT_OF_NOTIONAL = Decimal("0.0003")
# ---------------------------------------------------------------------------
# Black-Scholes helpers (stdlib-only)
# ---------------------------------------------------------------------------
def normal_cdf(x: float) -> float:
"""Standard normal CDF, no scipy dependency."""
return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))
def bs_put_price(*, spot: float, strike: float, t_years: float, sigma: float) -> float:
"""European put price under r=0, q=0 Black-Scholes.
Returns price in spot units (so for an ETH option, dividing by spot
gives the price in ETH).
"""
if t_years <= 0 or sigma <= 0 or spot <= 0 or strike <= 0:
return max(0.0, strike - spot)
sqrt_t = math.sqrt(t_years)
d1 = (math.log(spot / strike) + 0.5 * sigma * sigma * t_years) / (sigma * sqrt_t)
d2 = d1 - sigma * sqrt_t
return strike * normal_cdf(-d2) - spot * normal_cdf(-d1)
def bs_put_delta(*, spot: float, strike: float, t_years: float, sigma: float) -> float:
"""Put delta under r=0, q=0 Black-Scholes (negative number for put).
Returns 0 for expired options.
"""
if t_years <= 0 or sigma <= 0 or spot <= 0 or strike <= 0:
return 0.0
sqrt_t = math.sqrt(t_years)
d1 = (math.log(spot / strike) + 0.5 * sigma * sigma * t_years) / (sigma * sqrt_t)
return normal_cdf(d1) - 1.0 # = -N(-d1)
def find_strike_for_delta(
*,
spot: float,
dvol_pct: float,
dte_days: int,
target_delta_abs: float,
) -> float:
"""Solve for the put strike whose |delta| matches ``target_delta_abs``.
Bisection on a monotone-decreasing |delta(strike)| relationship.
Returns the strike in absolute USD terms.
"""
sigma = max(0.01, dvol_pct / 100.0)
t_years = max(1e-6, dte_days / 365.0)
# Bracket: from 50% of spot (deep OTM, small |delta|) up to spot
# (ATM, |delta| ≈ 0.5).
low = max(1.0, spot * 0.30)
high = spot
for _ in range(64):
mid = 0.5 * (low + high)
delta_abs = abs(bs_put_delta(spot=spot, strike=mid, t_years=t_years, sigma=sigma))
if delta_abs > target_delta_abs:
high = mid
else:
low = mid
if abs(high - low) < 1e-3:
break
return 0.5 * (low + high)
def estimate_credit_eth(
*,
spot: float,
dvol_pct: float,
dte_days: int,
width_pct: float,
delta_target_abs: float,
skew_premium: float = 1.5,
) -> tuple[float, float, float]:
"""Estimate credit (ETH), short_strike, long_strike for a bull-put-style
credit spread under Black-Scholes.
``skew_premium`` è il moltiplicatore applicato al credito BS per
approssimare la **vol smile** dell'ETH options market (le put OTM
trattano a IV più alta della IV ATM, quindi un BS pulito sottostima
sistematicamente il premio del venditore di vol). Il default 1.5
è una stima conservativa dei dati Deribit storici (smile slope
tipica 5-10 vol points per 100δ); valori sensati: 1.3 (smile
blanda) 1.8 (regime "stress IV"). Va calibrato sui dati reali
quando avremo abbastanza chain history da farlo.
Returns ``(credit_eth, short_strike, long_strike)``. Credit è
già moltiplicato per ``skew_premium``.
"""
short_strike = find_strike_for_delta(
spot=spot, dvol_pct=dvol_pct, dte_days=dte_days,
target_delta_abs=delta_target_abs,
)
width_usd = width_pct * spot
long_strike = max(1.0, short_strike - width_usd)
sigma = max(0.01, dvol_pct / 100.0)
t_years = max(1e-6, dte_days / 365.0)
short_mid_usd = bs_put_price(
spot=spot, strike=short_strike, t_years=t_years, sigma=sigma,
)
long_mid_usd = bs_put_price(
spot=spot, strike=long_strike, t_years=t_years, sigma=sigma,
)
short_mid_eth = short_mid_usd / spot
long_mid_eth = long_mid_usd / spot
credit_eth = (short_mid_eth - long_mid_eth) * skew_premium
return credit_eth, short_strike, long_strike
# ---------------------------------------------------------------------------
# Entry filter simulation — rigorous (uses validate_entry exactly)
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class DailyPick:
"""Indice di un tick "daily h:00 UTC" nella time-series."""
timestamp: datetime
snapshot: MarketSnapshotRecord
def daily_picks(
snapshots: list[MarketSnapshotRecord],
*,
hour_utc: int = 14,
asset: str = "ETH",
) -> list[DailyPick]:
"""Estrae un tick per giorno all'ora ``hour_utc``.
Crypto è 24/7, quindi non gateiamo sul giorno della settimana: per
ogni giorno di calendario presente in ``snapshots``, prendiamo la
riga ETH che cade a ``hour_utc:00``. Giorni senza tick a quell'ora
vengono saltati. ``snapshots`` deve essere ordinato per timestamp
ascending.
"""
picks: list[DailyPick] = []
seen_dates: set[tuple[int, int, int]] = set() # (year, month, day)
for snap in snapshots:
if snap.asset.upper() != asset.upper():
continue
ts = snap.timestamp.astimezone(UTC)
if ts.hour != hour_utc:
continue
key = (ts.year, ts.month, ts.day)
if key in seen_dates:
continue
seen_dates.add(key)
picks.append(DailyPick(timestamp=ts, snapshot=snap))
return picks
def _entry_context_from_snapshot(
snap: MarketSnapshotRecord,
*,
capital_usd: Decimal,
eth_holdings_pct: Decimal = Decimal("0"),
) -> EntryContext | None:
"""Costruisce :class:`EntryContext` dal tick storico.
``None`` quando la riga non ha i campi minimi (spot, dvol, funding).
Nel filtro questo si traduce in "skip del giorno" è la stessa
logica del live: un tick incompleto è meglio di un'entry al buio.
"""
if snap.dvol is None or snap.funding_perp_annualized is None:
return None
return EntryContext(
capital_usd=capital_usd,
dvol_now=snap.dvol,
funding_perp_annualized=snap.funding_perp_annualized,
eth_holdings_pct_of_portfolio=eth_holdings_pct,
next_macro_event_in_days=snap.macro_days_to_event,
has_open_position=False,
dealer_net_gamma=snap.dealer_net_gamma,
liquidation_squeeze_risk_high=(
snap.liquidation_long_risk == "high"
or snap.liquidation_short_risk == "high"
),
)
@dataclass(frozen=True)
class EntryFilterResult:
"""Esito del check filtri per una singola daily pick."""
pick: DailyPick
accepted: bool
reasons: list[str]
skipped_for_data: bool # True se il tick non aveva i campi minimi
def simulate_entry_filters(
picks: list[DailyPick],
cfg: StrategyConfig,
*,
capital_usd: Decimal,
) -> list[EntryFilterResult]:
"""Per ogni daily pick, valuta validate_entry come farebbe il live.
Rigoroso: usa esattamente :func:`validate_entry` e :class:`EntryContext`.
Restituisce la lista degli esiti, una entry per pick.
"""
results: list[EntryFilterResult] = []
for pick in picks:
ctx = _entry_context_from_snapshot(pick.snapshot, capital_usd=capital_usd)
if ctx is None:
results.append(
EntryFilterResult(
pick=pick,
accepted=False,
reasons=["incomplete_snapshot"],
skipped_for_data=True,
)
)
continue
decision = validate_entry(ctx, cfg)
results.append(
EntryFilterResult(
pick=pick,
accepted=decision.accepted,
reasons=list(decision.reasons),
skipped_for_data=False,
)
)
return results
# ---------------------------------------------------------------------------
# Position outcome simulation — stylized (Black-Scholes re-pricing)
# ---------------------------------------------------------------------------
class BacktestEntry(BaseModel):
"""Trade aperto nel backtest (snapshot al momento dell'entry)."""
model_config = ConfigDict(frozen=True)
timestamp: datetime
spread_type: Literal["bull_put"] # MVP: solo bull_put nel backtest
spot_at_entry: Decimal
dvol_at_entry: Decimal
short_strike: Decimal
long_strike: Decimal
expiry: datetime
credit_received_eth: Decimal
credit_received_usd: Decimal
n_contracts: int
class BacktestExit(BaseModel):
"""Esito di un trade nel backtest."""
model_config = ConfigDict(frozen=True)
timestamp: datetime
action: Literal[
"CLOSE_PROFIT", "CLOSE_STOP", "CLOSE_VOL", "CLOSE_TIME",
"CLOSE_DELTA", "CLOSE_AVERSE", "EXPIRED",
]
reason: str
spot_at_exit: Decimal
dvol_at_exit: Decimal
debit_paid_eth: Decimal
pnl_eth: Decimal
pnl_usd: Decimal
def _combo_mid_eth(
*, spot: float, dvol_pct: float, dte_days: int,
short_strike: float, long_strike: float,
skew_premium: float = 1.5,
) -> float:
"""Re-prezza il combo bull-put usando BS sul nuovo spot/dvol/dte."""
sigma = max(0.01, dvol_pct / 100.0)
t_years = max(1e-6, dte_days / 365.0)
short_mid_usd = bs_put_price(
spot=spot, strike=short_strike, t_years=t_years, sigma=sigma,
)
long_mid_usd = bs_put_price(
spot=spot, strike=long_strike, t_years=t_years, sigma=sigma,
)
return (short_mid_usd - long_mid_usd) / spot * skew_premium
def simulate_position_outcome(
entry: BacktestEntry,
future_snapshots: list[MarketSnapshotRecord],
cfg: StrategyConfig,
) -> BacktestExit:
"""Re-prezza il combo a ogni tick futuro fino al primo exit trigger.
Triggers in ordine §7:
1. profit_take (debit 0.5×credit)
2. stop_loss (debit 2.5×credit)
3. vol_stop (DVOL salita di 10 pt rispetto entry)
4. time_stop (DTE 7 e debit > 0.7×credit)
5. expiry (uscita per scadenza, P/L = credit intrinsic)
"""
ec = cfg.exit
credit = float(entry.credit_received_eth)
short = float(entry.short_strike)
long_ = float(entry.long_strike)
profit_thresh = float(ec.profit_take_pct_of_credit) * credit
stop_thresh = float(ec.stop_loss_mark_x_credit) * credit
skip_time_thresh = float(ec.time_stop_skip_if_close_to_profit_pct) * credit
for snap in future_snapshots:
if snap.timestamp <= entry.timestamp:
continue
if snap.timestamp >= entry.expiry:
break
if snap.dvol is None or snap.spot is None:
continue
spot_now = float(snap.spot)
dvol_now = float(snap.dvol)
dte = max(0, (entry.expiry - snap.timestamp).days)
debit = _combo_mid_eth(
spot=spot_now, dvol_pct=dvol_now, dte_days=dte,
short_strike=short, long_strike=long_,
)
if debit <= profit_thresh:
return _exit(
snap, entry, debit,
action="CLOSE_PROFIT",
reason=f"debit {debit:.4f}{profit_thresh:.4f}",
)
if debit >= stop_thresh:
return _exit(
snap, entry, debit,
action="CLOSE_STOP",
reason=f"debit {debit:.4f}{stop_thresh:.4f}",
)
if dvol_now >= float(entry.dvol_at_entry) + float(ec.vol_stop_dvol_increase):
return _exit(
snap, entry, debit,
action="CLOSE_VOL",
reason=f"DVOL {dvol_now:.1f} ≥ entry+{ec.vol_stop_dvol_increase}",
)
if dte <= ec.time_stop_dte_remaining and debit > skip_time_thresh:
return _exit(
snap, entry, debit,
action="CLOSE_TIME",
reason=f"DTE {dte}{ec.time_stop_dte_remaining}",
)
# Tick passati senza trigger: scadenza naturale.
last = future_snapshots[-1] if future_snapshots else None
intrinsic = max(0.0, short - float(last.spot if last and last.spot else 0))
intrinsic_capped = min(intrinsic, short - long_)
debit_at_expiry_eth = (
intrinsic_capped / float(last.spot)
if last is not None and last.spot is not None and float(last.spot) > 0
else 0.0
)
return _exit(
last or _synthetic_expiry_snapshot(entry),
entry,
debit_at_expiry_eth,
action="EXPIRED",
reason="held to expiry",
)
def _synthetic_expiry_snapshot(entry: BacktestEntry) -> MarketSnapshotRecord:
return MarketSnapshotRecord(
timestamp=entry.expiry,
asset="ETH",
spot=entry.spot_at_entry,
dvol=entry.dvol_at_entry,
fetch_ok=False,
)
def _exit(
snap: MarketSnapshotRecord,
entry: BacktestEntry,
debit_eth: float,
*,
action: str,
reason: str,
) -> BacktestExit:
pnl_eth = float(entry.credit_received_eth) - debit_eth
spot = float(snap.spot) if snap.spot is not None else float(entry.spot_at_entry)
dvol = float(snap.dvol) if snap.dvol is not None else float(entry.dvol_at_entry)
return BacktestExit(
timestamp=snap.timestamp,
action=action, # type: ignore[arg-type]
reason=reason,
spot_at_exit=Decimal(str(spot)),
dvol_at_exit=Decimal(str(dvol)),
debit_paid_eth=Decimal(str(debit_eth)),
pnl_eth=Decimal(str(pnl_eth)),
pnl_usd=Decimal(str(pnl_eth * spot * entry.n_contracts)),
)
# ---------------------------------------------------------------------------
# Full pipeline
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class CompletedTrade:
entry: BacktestEntry
exit: BacktestExit
class BacktestReport(BaseModel):
"""Aggregato del backtest. Tutti i numeri sono **stime**."""
model_config = ConfigDict(frozen=True)
n_picks: int
n_accepted: int
n_skipped_data: int
n_completed: int
n_winners: int
win_rate: Decimal
cumulative_pnl_usd: Decimal
cumulative_pnl_pct_of_capital: Decimal
max_drawdown_usd: Decimal
max_drawdown_pct: Decimal
sharpe_annualized: Decimal | None
skip_reasons: dict[str, int]
trades: list[CompletedTrade]
def _build_entry_from_pick(
pick: DailyPick,
cfg: StrategyConfig,
*,
capital_usd: Decimal,
eur_to_usd: Decimal,
) -> BacktestEntry | None:
snap = pick.snapshot
if snap.spot is None or snap.dvol is None:
return None
spot = float(snap.spot)
dvol = float(snap.dvol)
width_pct = float(cfg.structure.spread_width.target_pct_of_spot)
delta_target = float(cfg.structure.short_strike.delta_target)
dte = cfg.structure.dte_target
credit_eth, short_strike, long_strike = estimate_credit_eth(
spot=spot, dvol_pct=dvol, dte_days=dte,
width_pct=width_pct, delta_target_abs=delta_target,
)
width_usd = float(cfg.structure.spread_width.target_pct_of_spot) * spot
credit_usd = credit_eth * spot
if width_usd <= 0 or credit_usd / width_usd < float(
cfg.structure.credit_to_width_ratio_min
):
return None # ratio gate fallisce → no entry
cap_pertrade_usd = float(cfg.sizing.cap_per_trade_eur) * float(eur_to_usd)
risk_target = min(float(cfg.sizing.kelly_fraction) * float(capital_usd), cap_pertrade_usd)
n_contracts = max(0, min(int(risk_target // width_usd), cfg.sizing.max_contracts_per_trade))
if n_contracts == 0:
return None
expiry = pick.timestamp + timedelta(days=dte)
return BacktestEntry(
timestamp=pick.timestamp,
spread_type="bull_put",
spot_at_entry=Decimal(str(spot)),
dvol_at_entry=Decimal(str(dvol)),
short_strike=Decimal(str(round(short_strike, 2))),
long_strike=Decimal(str(round(long_strike, 2))),
expiry=expiry,
credit_received_eth=Decimal(str(credit_eth)),
credit_received_usd=Decimal(str(credit_usd * n_contracts)),
n_contracts=n_contracts,
)
def _max_drawdown_usd(equity: list[Decimal]) -> tuple[Decimal, Decimal]:
"""Return ``(max_dd_usd, max_dd_pct_of_peak)`` over an equity curve."""
if not equity:
return Decimal("0"), Decimal("0")
peak = equity[0]
max_dd_usd = Decimal("0")
max_dd_pct = Decimal("0")
for v in equity:
if v > peak:
peak = v
dd = peak - v
if dd > max_dd_usd:
max_dd_usd = dd
if peak > 0 and (dd / peak) > max_dd_pct:
max_dd_pct = dd / peak
return max_dd_usd, max_dd_pct
def _sharpe_annualized(pnls_usd: list[Decimal], capital_usd: Decimal) -> Decimal | None:
"""Annualized Sharpe approximation: ~120 trade/anno (entry daily,
crypto 24/7, post-filtri + concurrency cap).
Restituisce ``None`` se ci sono <5 trade o stdev = 0.
"""
if len(pnls_usd) < 5 or capital_usd <= 0:
return None
rets = [float(p / capital_usd) for p in pnls_usd]
mean = sum(rets) / len(rets)
var = sum((r - mean) ** 2 for r in rets) / max(1, (len(rets) - 1))
std = math.sqrt(var)
if std == 0:
return None
sharpe = mean / std * math.sqrt(120)
return Decimal(str(round(sharpe, 3)))
def run_backtest(
snapshots: list[MarketSnapshotRecord],
cfg: StrategyConfig,
*,
capital_usd: Decimal,
eur_to_usd: Decimal = Decimal("1.075"),
asset: str = "ETH",
) -> BacktestReport:
"""Esegue il backtest end-to-end sui ``snapshots`` ETH ordinati per ts."""
snapshots = sorted(snapshots, key=lambda s: s.timestamp)
eth_snapshots = [s for s in snapshots if s.asset.upper() == asset.upper()]
picks = daily_picks(eth_snapshots, asset=asset)
filter_results = simulate_entry_filters(picks, cfg, capital_usd=capital_usd)
# Tally skip reasons
skip_reasons: dict[str, int] = {}
for r in filter_results:
if r.accepted:
continue
for reason in r.reasons:
skip_reasons[reason] = skip_reasons.get(reason, 0) + 1
trades: list[CompletedTrade] = []
for r in filter_results:
if not r.accepted:
continue
entry = _build_entry_from_pick(
r.pick, cfg, capital_usd=capital_usd, eur_to_usd=eur_to_usd,
)
if entry is None:
skip_reasons["sizing_or_ratio"] = skip_reasons.get("sizing_or_ratio", 0) + 1
continue
future = [s for s in eth_snapshots if s.timestamp > r.pick.timestamp]
exit_ = simulate_position_outcome(entry, future, cfg)
trades.append(CompletedTrade(entry=entry, exit=exit_))
pnls = [t.exit.pnl_usd for t in trades]
cumulative = sum(pnls, start=Decimal("0"))
n_winners = sum(1 for p in pnls if p > 0)
win_rate = (
Decimal(n_winners) / Decimal(len(pnls))
if pnls
else Decimal("0")
)
# Equity curve in USD assoluti
equity = [capital_usd]
for p in pnls:
equity.append(equity[-1] + p)
max_dd_usd, max_dd_pct = _max_drawdown_usd(equity)
return BacktestReport(
n_picks=len(picks),
n_accepted=sum(1 for r in filter_results if r.accepted),
n_skipped_data=sum(1 for r in filter_results if r.skipped_for_data),
n_completed=len(trades),
n_winners=n_winners,
win_rate=win_rate,
cumulative_pnl_usd=cumulative,
cumulative_pnl_pct_of_capital=(
cumulative / capital_usd if capital_usd > 0 else Decimal("0")
),
max_drawdown_usd=max_dd_usd,
max_drawdown_pct=max_dd_pct,
sharpe_annualized=_sharpe_annualized(pnls, capital_usd),
skip_reasons=skip_reasons,
trades=trades,
)
+27 -3
View File
@@ -83,26 +83,49 @@ def _pick_expiry(
return min(candidates, key=lambda exp: abs(candidates[exp] - sc.dte_target))
def _resolve_delta_band(
sc: object, dvol_now: Decimal | None
) -> tuple[Decimal, Decimal, Decimal]:
"""Return (delta_target, delta_min, delta_max) per il regime DVOL corrente.
Quando ``sc.delta_by_dvol`` è popolato e ``dvol_now`` è disponibile,
sceglie la prima banda (ordinata ascending sulla ``dvol_under``) il
cui ``dvol_under dvol_now``. Altrimenti torna ai valori statici di
``sc``.
"""
bands = list(getattr(sc, "delta_by_dvol", []) or [])
if dvol_now is not None and bands:
bands_sorted = sorted(bands, key=lambda b: b.dvol_under)
for band in bands_sorted:
if dvol_now <= band.dvol_under:
return band.delta_target, band.delta_min, band.delta_max
last = bands_sorted[-1]
return last.delta_target, last.delta_min, last.delta_max
return sc.delta_target, sc.delta_min, sc.delta_max
def _select_short(
quotes: list[OptionQuote],
*,
spot: Decimal,
cfg: StrategyConfig,
dvol_now: Decimal | None = None,
) -> OptionQuote | None:
"""Pick the short-leg quote with delta closest to target inside both bands."""
sc = cfg.structure.short_strike
delta_target, delta_min, delta_max = _resolve_delta_band(sc, dvol_now)
eligible: list[OptionQuote] = []
for q in quotes:
dist = (q.strike - spot).copy_abs() / spot
if not (sc.distance_otm_pct_min <= dist <= sc.distance_otm_pct_max):
continue
abs_delta = q.delta.copy_abs()
if not (sc.delta_min <= abs_delta <= sc.delta_max):
if not (delta_min <= abs_delta <= delta_max):
continue
eligible.append(q)
if not eligible:
return None
return min(eligible, key=lambda q: abs(q.delta.copy_abs() - sc.delta_target))
return min(eligible, key=lambda q: abs(q.delta.copy_abs() - delta_target))
def _select_long(
@@ -143,6 +166,7 @@ def select_strikes(
spot: Decimal,
now: datetime,
cfg: StrategyConfig,
dvol_now: Decimal | None = None,
) -> tuple[OptionQuote, OptionQuote] | None:
"""Return the (short, long) quotes for the requested vertical, or ``None``.
@@ -161,7 +185,7 @@ def select_strikes(
if not typed:
return None
short = _select_short(typed, spot=spot, cfg=cfg)
short = _select_short(typed, spot=spot, cfg=cfg, dvol_now=dvol_now)
if short is None:
return None
+67
View File
@@ -15,6 +15,7 @@ from decimal import Decimal
from pydantic import BaseModel, ConfigDict
from cerbero_bite.config import SpreadType, StrategyConfig
from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold
__all__ = [
"EntryContext",
@@ -44,6 +45,31 @@ class EntryContext(BaseModel):
dealer_net_gamma: Decimal | None = None
liquidation_squeeze_risk_high: bool | None = None
# IV richness gate (§2.9). Differenza IV30g RV30g in punti vol.
# Optional, stessa logica best-effort dei filtri quant: ``None``
# significa "dato non disponibile" e fa saltare il gate (non
# invalida l'entry).
iv_minus_rv: Decimal | None = None
# Valori IV-RV nella finestra rolling già scelta dal caller
# (entry_cycle): tutti i record validi su window_days, ASC, NULL e
# fetch_ok=0 esclusi. Caricata dal repository quando
# `iv_minus_rv_adaptive_enabled` è True. Tuple per coerenza con
# frozen=True.
iv_rv_history: tuple[Decimal, ...] = ()
# Numero di giorni di calendario distinti coperti dalla storia
# IV-RV disponibile (non solo dalla finestra `iv_rv_history`).
# ``0`` = warmup hard, gate disabilitato (fail-open). Calcolato dal
# caller via `repository.count_iv_rv_distinct_days`.
iv_rv_n_days: int = 0
# DVOL al tick più vicino a now - vol_of_vol_lookback_hours.
# ``None`` = gap nel dato (es. cron mancante 24h fa) → VoV guard
# skip. Caricato dal repository in `entry_cycle` quando
# `vol_of_vol_guard_enabled` è True.
dvol_24h_ago: Decimal | None = None
class EntryDecision(BaseModel):
"""Result of :func:`validate_entry`. ``reasons`` holds *all* blocking reasons."""
@@ -131,6 +157,47 @@ def validate_entry(ctx: EntryContext, cfg: StrategyConfig) -> EntryDecision:
):
reasons.append("imminent liquidation squeeze risk")
# §2.9: IV richness gate. Vendere vol senza un margine misurabile
# fra IV e RV è statisticamente neutro: l'edge della strategia
# esiste solo quando il premio è "ricco" rispetto a quanto il
# mercato si è effettivamente mosso. La modalità adattiva calcola
# la soglia come max(P_q rolling, iv_minus_rv_min) sulla storia
# disponibile in market_snapshots; altrimenti fallback alla
# soglia statica `iv_minus_rv_min`.
if entry_cfg.iv_minus_rv_filter_enabled and ctx.iv_minus_rv is not None:
if entry_cfg.iv_minus_rv_adaptive_enabled:
threshold = compute_adaptive_threshold(
history=ctx.iv_rv_history,
n_days=ctx.iv_rv_n_days,
percentile=entry_cfg.iv_minus_rv_percentile,
absolute_floor=entry_cfg.iv_minus_rv_min,
)
if threshold is not None and ctx.iv_minus_rv < threshold:
pct = int(entry_cfg.iv_minus_rv_percentile * 100)
reasons.append(
f"IV richness below P{pct} rolling "
f"(IV-RV={ctx.iv_minus_rv} < {threshold} vol pts)"
)
elif ctx.iv_minus_rv < entry_cfg.iv_minus_rv_min:
reasons.append(
f"IV richness below floor "
f"(IV-RV={ctx.iv_minus_rv} < {entry_cfg.iv_minus_rv_min} vol pts)"
)
# §4-quater roadmap: vol-of-vol guard. Blocca entry quando il
# regime di volatilità sta cambiando bruscamente, anche se IV-RV
# è alto. Fail-open su gap dati 24h fa.
if (
entry_cfg.vol_of_vol_guard_enabled
and ctx.dvol_24h_ago is not None
):
delta = abs(ctx.dvol_now - ctx.dvol_24h_ago)
if delta >= entry_cfg.vol_of_vol_threshold_pt:
reasons.append(
f"DVOL shifted {delta} pt in {entry_cfg.vol_of_vol_lookback_hours}h "
f"(threshold {entry_cfg.vol_of_vol_threshold_pt})"
)
return EntryDecision(accepted=not reasons, reasons=reasons)
+18
View File
@@ -28,8 +28,10 @@ __all__ = ["ExitAction", "ExitDecisionResult", "PositionSnapshot", "evaluate"]
ExitAction = Literal[
"HOLD",
"CLOSE_PROFIT",
"CLOSE_PROFIT_PARTIAL",
"CLOSE_STOP",
"CLOSE_VOL",
"CLOSE_VOL_HARVEST",
"CLOSE_TIME",
"CLOSE_DELTA",
"CLOSE_AVERSE",
@@ -115,6 +117,22 @@ def evaluate(snapshot: PositionSnapshot, cfg: StrategyConfig) -> ExitDecisionRes
f"mark {debit}{ec.profit_take_pct_of_credit:.0%} of credit {credit}",
)
# 1bis. Vol-collapse harvest (D): siamo IN profit (debit < credit) e
# la DVOL è scesa di tot punti rispetto all'entry. Edge IV-RV già
# catturato, non c'è motivo di tenere fino a profit_take. Esce
# opportunisticamente quando il regime di vol che giustificava
# l'entry non c'è più.
if (
ec.vol_harvest_dvol_decrease > 0
and debit < credit
and snapshot.dvol_now <= snapshot.dvol_at_entry - ec.vol_harvest_dvol_decrease
):
return _result(
"CLOSE_VOL_HARVEST",
f"DVOL {snapshot.dvol_now} ≤ entry {snapshot.dvol_at_entry} "
f"{ec.vol_harvest_dvol_decrease}, harvest while in profit",
)
# 2. Stop loss
if debit >= stop_thresh:
return _result(
@@ -16,6 +16,7 @@ from __future__ import annotations
import os
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from pathlib import Path
import pandas as pd
@@ -23,6 +24,7 @@ import plotly.graph_objects as go
import streamlit as st
from cerbero_bite.config.loader import load_strategy
from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold
from cerbero_bite.gui.data_layer import (
DEFAULT_DB_PATH,
humanize_dt,
@@ -167,9 +169,21 @@ def _percentiles_strip(s: pd.Series) -> None:
st.caption("(nessun dato)")
return
quantiles = [0.05, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95]
cols = st.columns(len(quantiles))
for col, q in zip(cols, quantiles, strict=False):
col.metric(f"P{int(q * 100)}", f"{s.quantile(q):.4g}")
# Inline markdown compatto: i valori a 4 cifre significative
# cadono regolarmente fuori dalla colonna se renderizzati con
# ``st.metric`` (font fisso, label sopra, no shrink). Render
# orizzontale con font ridotto evita il troncamento.
parts = [
f"<b>P{int(q * 100)}</b> {s.quantile(q):.4g}"
for q in quantiles
]
html = (
"<div style='font-size:0.85rem;line-height:1.5;"
"white-space:nowrap;overflow-x:auto;'>"
+ " &nbsp;·&nbsp; ".join(parts)
+ "</div>"
)
st.markdown(html, unsafe_allow_html=True)
def _render_metric(spec: MetricSpec, records: list[MarketSnapshotRecord]) -> None:
@@ -230,6 +244,132 @@ def _render_metric(spec: MetricSpec, records: list[MarketSnapshotRecord]) -> Non
_percentiles_strip(s)
def _render_adaptive_gate_panel(
strategy: object | None,
records: list[MarketSnapshotRecord],
) -> None:
"""Pannello informativo sul gate IV-RV adattivo (read-only)."""
if strategy is None:
return
try:
entry = strategy.entry # type: ignore[attr-defined]
except AttributeError:
return
if not getattr(entry, "iv_minus_rv_filter_enabled", False):
st.subheader("🎯 Gate IV-RV adattivo")
st.info("Gate IV-RV disabilitato nel profilo corrente.")
st.divider()
return
st.subheader("🎯 Gate IV-RV adattivo")
# records DESC (newest first) → history ASC con NULL/fetch_ok=0 esclusi
iv_rv_history: list[Decimal] = []
distinct_days: set[str] = set()
for r in reversed(records):
if r.fetch_ok and r.iv_minus_rv is not None:
iv_rv_history.append(r.iv_minus_rv)
distinct_days.add(r.timestamp.date().isoformat())
n_ticks = len(iv_rv_history)
n_days = len(distinct_days)
target = int(getattr(entry, "iv_minus_rv_window_target_days", 60))
min_days = int(getattr(entry, "iv_minus_rv_window_min_days", 30))
if n_days < 1:
status = "🟡 Warmup hard (nessun giorno coperto)"
elif n_days < min_days:
status = f"🟡 Warmup ({n_days}/{min_days}g — finestra crescente)"
elif n_days < target:
status = f"🟢 Attivo (finestra {min_days}g, target {target}g)"
else:
status = f"🟢 Attivo (finestra stabile {target}g)"
st.markdown(f"**Status:** {status} · {n_ticks} tick complessivi")
# Latest tick
iv_rv_now: Decimal | None = None
dvol_now: Decimal | None = None
latest_ts: datetime | None = None
for r in records: # records DESC
if r.fetch_ok:
iv_rv_now = r.iv_minus_rv
dvol_now = r.dvol
latest_ts = r.timestamp
break
adaptive_on = bool(getattr(entry, "iv_minus_rv_adaptive_enabled", False))
floor = Decimal(str(getattr(entry, "iv_minus_rv_min", "0")))
if adaptive_on:
percentile = Decimal(
str(getattr(entry, "iv_minus_rv_percentile", "0.25"))
)
try:
threshold = compute_adaptive_threshold(
history=iv_rv_history,
n_days=n_days,
percentile=percentile,
absolute_floor=floor,
)
except ValueError as exc:
st.warning(f"Configurazione gate non valida: {exc}")
threshold = None
c1, c2, c3 = st.columns(3)
pct_label = int(percentile * 100)
c1.metric(
f"Soglia P{pct_label} rolling",
f"{threshold:.2f}" if threshold is not None else "",
help="Soglia adattiva = max(percentile, floor)",
)
c2.metric(
"IV-RV ultimo tick",
f"{iv_rv_now:.2f}" if iv_rv_now is not None else "",
)
c3.metric("Floor assoluto", f"{floor:.2f}")
if threshold is not None and iv_rv_now is not None:
verdict = "✅ PASS" if iv_rv_now >= threshold else "❌ SKIP"
st.markdown(f"**Decisione hypothetical:** {verdict}")
else:
st.write(f"Modalità statica: floor = {floor} vol pts")
if bool(getattr(entry, "vol_of_vol_guard_enabled", False)):
st.markdown("---")
st.markdown("**Vol-of-Vol guard**")
threshold_pt = Decimal(
str(getattr(entry, "vol_of_vol_threshold_pt", "5"))
)
lookback_h = int(getattr(entry, "vol_of_vol_lookback_hours", 24))
# Find tick closest to latest_ts - lookback hours, tolerance 15 min
dvol_lookback: Decimal | None = None
if latest_ts is not None and dvol_now is not None:
target_ts = latest_ts - timedelta(hours=lookback_h)
best_delta = timedelta(minutes=15)
for r in records:
if not r.fetch_ok or r.dvol is None:
continue
d = abs(r.timestamp - target_ts)
if d <= best_delta:
best_delta = d
dvol_lookback = r.dvol
if dvol_lookback is not None and dvol_now is not None:
delta = abs(dvol_now - dvol_lookback)
c1, c2 = st.columns(2)
c1.metric(f"|ΔDVOL {lookback_h}h|", f"{delta:.2f}")
c2.metric("Soglia VoV", f"{threshold_pt:.2f}")
verdict = "✅ PASS" if delta < threshold_pt else "❌ SKIP"
st.markdown(f"**Verdict:** {verdict}")
else:
st.info(f"Lookback {lookback_h}h non disponibile (gap dati).")
st.divider()
def render() -> None:
st.title("📐 Calibrazione")
st.caption(
@@ -301,6 +441,8 @@ def render() -> None:
specs = _metric_specs(strategy)
_render_adaptive_gate_panel(strategy, records)
for spec in specs:
_render_metric(spec, records)
st.divider()
+354 -42
View File
@@ -11,6 +11,7 @@ La pagina è di sola lettura: non chiama MCP, non scrive sul DB.
from __future__ import annotations
import math
import os
from dataclasses import dataclass
from pathlib import Path
@@ -280,26 +281,56 @@ def _build_gates(
)
)
# --- IV RV (richness) — solo informativo --------------------
# --- IV RV (richness) — gate §2.9 ---------------------------
rv = (
float(snap.realized_vol_30d) if snap.realized_vol_30d is not None else None
)
iv_minus_rv = (
float(snap.iv_minus_rv) if snap.iv_minus_rv is not None else None
)
rows.append(
_GateRow(
"IV RV (richness)",
(
f"{iv_minus_rv:+.2f} pt vol"
if iv_minus_rv is not None
else ""
),
"info, > 0 = premio ricco",
"pass" if (iv_minus_rv is not None and iv_minus_rv > 0) else "n/a",
f"RV30={rv:.2f}" if rv is not None else "",
)
iv_min = float(getattr(entry, "iv_minus_rv_min", 0.0)) if entry else 0.0
iv_enabled = (
bool(getattr(entry, "iv_minus_rv_filter_enabled", False)) if entry else False
)
if not iv_enabled:
rows.append(
_GateRow(
"IV RV (richness)",
(
f"{iv_minus_rv:+.2f} pt vol"
if iv_minus_rv is not None
else ""
),
"filtro DISABILITATO (info-only)",
"n/a",
f"RV30={rv:.2f} · attiva con `iv_minus_rv_filter_enabled: true`"
if rv is not None
else "Attiva con `iv_minus_rv_filter_enabled: true`",
)
)
elif iv_minus_rv is None:
rows.append(
_GateRow(
"IV RV ≥ soglia",
"",
f"{iv_min:.1f} pt vol",
"n/a",
"Dato non disponibile in questo tick (best-effort skip).",
)
)
else:
ok = iv_minus_rv >= iv_min
rows.append(
_GateRow(
"IV RV ≥ soglia",
f"{iv_minus_rv:+.2f} pt vol",
f"{iv_min:.1f} pt vol",
"pass" if ok else "fail",
"Premio ricco rispetto a quanto il mercato si è davvero "
"mosso → edge sostenibile per il venditore di vol."
+ (f" RV30={rv:.2f}" if rv is not None else ""),
)
)
return rows
@@ -347,6 +378,44 @@ def _profile_caps(strategy: object | None) -> dict[str, float]:
return out
def _detect_features(strategy: object | None) -> dict[str, bool]:
"""Quali miglioramenti del PR FDAC sono ATTIVI in questa strategia.
- **A** (delta dinamico): `short_strike.delta_by_dvol` non vuoto.
- **D** (vol-harvest): `exit.vol_harvest_dvol_decrease > 0`.
- **F** (auto-pause): `auto_pause.enabled = true`.
- **IV** (IV-richness gate, dal PR precedente): `entry.iv_minus_rv_filter_enabled`.
"""
feats = {"A": False, "D": False, "F": False, "IV": False}
if strategy is None:
return feats
try:
feats["A"] = bool(
getattr(strategy.structure.short_strike, "delta_by_dvol", []) # type: ignore[attr-defined]
)
except Exception:
pass
try:
feats["D"] = (
float(getattr(strategy.exit, "vol_harvest_dvol_decrease", 0)) > 0 # type: ignore[attr-defined]
)
except Exception:
pass
try:
feats["F"] = bool(
getattr(getattr(strategy, "auto_pause", None), "enabled", False)
)
except Exception:
pass
try:
feats["IV"] = bool(
getattr(strategy.entry, "iv_minus_rv_filter_enabled", False) # type: ignore[attr-defined]
)
except Exception:
pass
return feats
def _compute_pl(
caps: dict[str, float],
*,
@@ -355,13 +424,56 @@ def _compute_pl(
win_rate: float,
trades_per_year: int,
eur_to_usd: float = 1.075,
features: dict[str, bool] | None = None,
) -> dict[str, float]:
"""Calcola le metriche P/L per un profilo di sizing."""
"""Calcola le metriche P/L per un profilo di sizing.
Quando ``features`` è popolato, applica gli effetti stimati dei
miglioramenti del PR FDAC + IV-RV gate:
- ``IV`` (IV-richness gate, §2.9): +5 pp win-rate, 25% trade/anno.
- ``A`` (delta dinamico, §3.2): +1.5 pp win-rate, sl_loss × 0.95.
- ``D`` (vol-harvest, §7-bis): 5% delle would-be-loss diventano
harvest exit a +0.20 × credito.
- ``F`` (auto-pause, §7-bis): 8% trade/anno (skip-day dopo
streak), e nei calcoli di drawdown atteso il streak_99 è
cappato a lookback_trades=5.
Effetti **stimati ex-ante** dalla letteratura short-vol systematic;
i valori puntuali andranno calibrati sul dataset accumulato.
"""
feats = features or {}
width = caps["width_pct"] * spot
credit = caps["credit_ratio"] * width
tp_profit = caps["profit_take"] * credit
sl_loss = (caps["stop_mult"] - 1.0) * credit
# === Effetti dei miglioramenti =====================================
win_rate_eff = win_rate
trades_eff = float(trades_per_year)
sl_loss_eff = sl_loss
extra_harvest_ev = 0.0
prob_harvest = 0.0
if feats.get("IV"):
# Skip più aggressivo + qualità migliore: +5 pp win, 25% trade.
win_rate_eff = min(0.95, win_rate_eff + 0.05)
trades_eff *= 0.75
if feats.get("A"):
# Migliore strike picking → +1.5 pp win-rate; riduzione del
# tail della perdita (5%) per le bande high-DVOL.
win_rate_eff = min(0.95, win_rate_eff + 0.015)
sl_loss_eff *= 0.95
if feats.get("D"):
# Vol-harvest: ~5% delle entrate intercettate prima dello stop
# con un piccolo profitto (+0.20×credit). Sottrae lo stesso
# volume dalle prob_loss.
prob_harvest = 0.05
extra_harvest_ev = 0.20 * credit
# F (auto-pause) agisce su streak_99 più sotto, e sul trades_eff.
if feats.get("F"):
trades_eff *= 0.92
cap_pertrade_usd = caps["cap_pertrade_eur"] * eur_to_usd
risk_target = min(caps["kelly"] * capital, cap_pertrade_usd)
n_kelly = int(risk_target // width) if width > 0 else 0
@@ -369,32 +481,62 @@ def _compute_pl(
prob_time_stop = 0.07
prob_other_stop = 0.03
prob_loss = max(0.0, 1.0 - win_rate - prob_time_stop - prob_other_stop)
prob_loss = max(
0.0,
1.0 - win_rate_eff - prob_time_stop - prob_other_stop - prob_harvest,
)
avg_time_stop_pl = 0.10 * credit
e_trade_gross = (
win_rate * tp_profit
- prob_loss * sl_loss
win_rate_eff * tp_profit
- prob_loss * sl_loss_eff
+ prob_time_stop * avg_time_stop_pl
+ prob_harvest * extra_harvest_ev
)
fees = 0.0003 * spot * 2
slippage = 0.03 * credit
e_trade_net = e_trade_gross - fees - slippage
# Multi-posizione concorrente: il P/L scala col numero di posizioni
# aperte simultaneamente (il loop entry crea N trade indipendenti
# quando max_concurrent > 1). Vedi caveat aggressiva.yaml: il
# supporto multi-asset richiede modifiche di codice; questo
# moltiplicatore stima cosa otterresti DOPO.
concurrency = max(1.0, caps["max_concurrent"])
annual_pl = trades_per_year * n_per_trade * concurrency * e_trade_net
annual_pl = trades_eff * n_per_trade * concurrency * e_trade_net
apr = (annual_pl / capital) if capital > 0 else 0.0
# --- Max drawdown -------------------------------------------------
# Due metriche distinte:
#
# 1. **Streak atteso (P99)**: lunghezza della peggior sequenza di
# stop consecutivi che ci si aspetta di vedere in un anno con
# probabilità ≤ 1%. Usa l'approssimazione union-bound:
# P(streak ≥ N in N_trade tentativi) ≈ N_trade × p_loss^N
# Imponendo questa quantità ≤ 0.01 e risolvendo per N:
# N = ceil( log(0.01 / N_trade) / log(p_loss) )
# Drawdown corrispondente = N × stop_loss × contracts × concurrency.
#
# 2. **Tail/gap risk**: scenario "gap notturno" in cui il mark
# salta oltre la copertura long PRIMA che lo stop sia
# eseguibile. La perdita massima reale è la larghezza intera
# dello spread meno il credito iniziale, su tutte le posizioni
# aperte simultaneamente.
if prob_loss > 0 and prob_loss < 1 and trades_per_year > 0:
streak_99 = max(
1,
int(math.ceil(
math.log(0.01 / trades_per_year) / math.log(prob_loss)
)) if prob_loss < 1 else 1,
)
else:
streak_99 = 0
expected_dd_usd = streak_99 * sl_loss * n_per_trade * concurrency
expected_dd_pct = expected_dd_usd / capital if capital > 0 else 0.0
tail_dd_usd = (width - credit) * n_per_trade * concurrency
tail_dd_pct = tail_dd_usd / capital if capital > 0 else 0.0
return {
"width": width,
"credit": credit,
"tp_profit": tp_profit,
"sl_loss": sl_loss,
"sl_loss": sl_loss_eff,
"risk_target": risk_target,
"n_per_trade": float(n_per_trade),
"concurrency": concurrency,
@@ -403,6 +545,15 @@ def _compute_pl(
"apr": apr,
"fees": fees,
"slippage": slippage,
"prob_loss": prob_loss,
"prob_harvest": prob_harvest,
"streak_99": float(streak_99),
"expected_dd_usd": expected_dd_usd,
"expected_dd_pct": expected_dd_pct,
"tail_dd_usd": tail_dd_usd,
"tail_dd_pct": tail_dd_pct,
"win_rate_eff": win_rate_eff,
"trades_eff": trades_eff,
}
@@ -411,6 +562,8 @@ def _render_profile_card(
caps: dict[str, float],
metrics: dict[str, float],
badge: str,
features: dict[str, bool] | None = None,
metrics_base: dict[str, float] | None = None,
) -> None:
"""Rendering di un profilo (conservativo o aggressivo) in una colonna."""
st.markdown(f"### {label} {badge}")
@@ -420,23 +573,86 @@ def _render_profile_card(
f"max {caps['max_n']:.0f} contratti × "
f"{caps['max_concurrent']:.0f} pos. concorrenti"
)
if features:
active = [k for k, v in features.items() if v]
if active:
st.caption(
"🟢 Miglioramenti attivi: "
+ " · ".join(
{
"IV": "**IV-RV gate**",
"A": "**A** delta dinamico",
"D": "**D** vol-harvest",
"F": "**F** auto-pause",
}.get(k, k)
for k in active
)
)
else:
st.caption("⚪ Nessun miglioramento attivo (formula base)")
cols = st.columns(2)
cols[0].metric("Contratti per trade", f"{metrics['n_per_trade']:.0f}")
cols[1].metric("Posizioni concorrenti", f"{metrics['concurrency']:.0f}")
cols = st.columns(2)
e_delta = (
f"{metrics['e_trade_net'] - metrics_base['e_trade_net']:+.1f}"
if metrics_base
else None
)
pl_delta = (
f"{metrics['annual_pl'] - metrics_base['annual_pl']:+.0f} USD vs base"
if metrics_base
else f"{metrics['apr']:+.1%} APR"
)
cols[0].metric(
"E[trade] netto",
f"{metrics['e_trade_net']:+.1f} USD",
delta=e_delta,
help=(
f"fees={metrics['fees']:.2f} USD, "
f"slippage={metrics['slippage']:.2f} USD"
f"win_rate effettivo={metrics['win_rate_eff']:.0%}, "
f"prob_loss={metrics['prob_loss']:.0%}, "
f"trade/anno={metrics['trades_eff']:.0f}"
),
)
cols[1].metric(
"P/L annuo stimato",
f"{metrics['annual_pl']:+.0f} USD",
delta=f"{metrics['apr']:+.1%} APR",
delta=f"{metrics['apr']:+.1%} APR" + (
f" ({metrics['annual_pl'] - metrics_base['annual_pl']:+.0f} vs base)"
if metrics_base
else ""
),
)
cols = st.columns(2)
cols[0].metric(
"Max DD attesa (P99)",
f"{metrics['expected_dd_usd']:.0f} USD",
delta=f"{-metrics['expected_dd_pct']:+.1%} cap",
delta_color="inverse",
help=(
f"Streak di {int(metrics['streak_99'])} stop consecutivi "
f"(probabilità ≤ 1% nell'anno) × perdita stop "
f"({metrics['sl_loss']:.0f} USD) × contratti × posizioni "
f"concorrenti. È la peggior sequenza che ti aspetti di "
"vedere; il drawdown reale può essere maggiore se i filtri "
"non rilevano un regime change."
),
)
cols[1].metric(
"Max DD coda (gap)",
f"{metrics['tail_dd_usd']:.0f} USD",
delta=f"{-metrics['tail_dd_pct']:+.1%} cap",
delta_color="inverse",
help=(
"Scenario gap notturno: il mark salta oltre la copertura "
"long PRIMA che lo stop sia eseguibile. Perdita = larghezza "
"intera meno credito, su tutte le posizioni aperte. "
"I filtri quant + macro lo riducono ma NON lo annullano."
),
)
if metrics["n_per_trade"] == 0:
@@ -475,18 +691,56 @@ def _render_pl_panel(
),
)
trades_per_year = col_d.slider(
"Trade / anno (post-filtri)", 8, 30, value=18, step=1,
help="52 lunedì × probabilità di superare i filtri (3050%).",
"Trade / anno (post-filtri)", 20, 200, value=110, step=5,
help=(
"Crypto è 24/7: l'entry cycle gira ogni giorno alle 14:00 UTC "
"(`0 14 * * *`). 365 candidature × ~30-50% pass-rate effettivo "
"(post-filtri + cap concorrenza) ≈ 110-180/anno. Auto-pause F "
"riduce ulteriormente di ~8% in regime drawdown."
),
)
cons_caps = _profile_caps(strategy_conservativa or strategy_main)
aggr_caps = _profile_caps(strategy_aggressiva)
cons_feats = _detect_features(strategy_conservativa or strategy_main)
aggr_feats = _detect_features(strategy_aggressiva)
apply_features = st.checkbox(
"Applica gli effetti dei miglioramenti FDAC + IV-RV gate "
"letti dai due `strategy.*.yaml`",
value=True,
help=(
"Quando ON, ogni colonna applica gli effetti stimati delle "
"feature attive nel rispettivo profilo. OFF = formula base "
"(senza miglioramenti) per confronto pulito."
),
)
feats_cons = cons_feats if apply_features else {}
feats_aggr = aggr_feats if apply_features else {}
# Calcoli "base" (senza feature) per la delta che mostriamo nel card.
cons_base = _compute_pl(
cons_caps,
capital=capital,
spot=spot,
win_rate=win_rate,
trades_per_year=trades_per_year,
)
aggr_base = _compute_pl(
aggr_caps,
capital=capital,
spot=spot,
win_rate=win_rate,
trades_per_year=trades_per_year,
)
cons = _compute_pl(
cons_caps,
capital=capital,
spot=spot,
win_rate=win_rate,
trades_per_year=trades_per_year,
features=feats_cons,
)
aggr = _compute_pl(
aggr_caps,
@@ -494,22 +748,31 @@ def _render_pl_panel(
spot=spot,
win_rate=win_rate,
trades_per_year=trades_per_year,
features=feats_aggr,
)
cons_version = getattr(
strategy_conservativa or strategy_main, "config_version", "?"
)
aggr_version = getattr(strategy_aggressiva, "config_version", "?")
col_cons, col_aggr = st.columns(2)
with col_cons:
_render_profile_card(
"🛡️ Conservativa",
cons_caps,
cons,
"_(golden config v1.0.0)_",
f"_(golden config v{cons_version})_",
features=feats_cons,
metrics_base=cons_base if apply_features and any(feats_cons.values()) else None,
)
with col_aggr:
_render_profile_card(
"🔥 Aggressiva",
aggr_caps,
aggr,
"_(deroga §11, richiede paper trading)_",
f"_(v{aggr_version} · deroga §11, richiede paper trading)_",
features=feats_aggr,
metrics_base=aggr_base if apply_features and any(feats_aggr.values()) else None,
)
if aggr["annual_pl"] > 0 and cons["annual_pl"] > 0:
@@ -520,14 +783,61 @@ def _render_pl_panel(
"APR). Drawdown atteso scala con lo stesso fattore."
)
if win_rate < 0.72:
if cons["annual_pl"] < 0 and aggr["annual_pl"] < 0:
st.error(
"**Win rate sotto 0.72: entrambi i profili perdono soldi.** "
"Selling vol nudo è strutturalmente neutro qui. L'edge della "
"strategia sono i FILTRI (dealer gamma>0, no macro, "
"liquidation≠high, bias chiaro) che alzano il win rate sopra "
"il 0.75. Senza filtri attivi nessuno dei due profili è "
"viable."
f"**Entrambi i profili in perdita** (cons {cons['apr']:+.1%}, "
f"aggr {aggr['apr']:+.1%} APR). Selling vol nudo a win rate "
f"{win_rate:.0%} è strutturalmente non profittevole. L'edge "
"sono i FILTRI (dealer gamma>0, no macro, liquidation≠high, "
"bias chiaro) e i miglioramenti F+D+A+IV-RV gate, che alzano "
"il win rate effettivo sopra ~0.75 e/o riducono i tail loss. "
"Spunta l'opzione 'Applica gli effetti dei miglioramenti' qui "
"sopra per vedere i numeri con i filtri attivi."
)
elif cons["annual_pl"] < 0:
st.warning(
f"**Conservativo in perdita** ({cons['apr']:+.1%} APR), "
f"aggressivo positivo ({aggr['apr']:+.1%} APR). I miglioramenti "
"F+D+A+IV-RV gate stanno facendo il loro lavoro sull'aggressivo. "
"Sul conservativo i cap stretti riducono troppo il P/L atteso "
"a questo win rate."
)
# === Mini-tabella: contributo marginale di ogni feature =====
if apply_features and (any(feats_cons.values()) or any(feats_aggr.values())):
st.markdown("**Contributo marginale di ogni feature** (profilo aggressivo)")
contrib_rows = []
for label, key in [
("IV — IV-richness gate", "IV"),
("A — Delta dinamico", "A"),
("D — Vol-harvest", "D"),
("F — Auto-pause", "F"),
]:
single_feat = {key: True}
m = _compute_pl(
aggr_caps,
capital=capital,
spot=spot,
win_rate=win_rate,
trades_per_year=trades_per_year,
features=single_feat,
)
delta_pl = m["annual_pl"] - aggr_base["annual_pl"]
delta_apr = m["apr"] - aggr_base["apr"]
active = "" if aggr_feats.get(key) else ""
contrib_rows.append(
{
"Feature": label,
"Attiva nel YAML": active,
"ΔP/L annuo (solo questa)": f"{delta_pl:+.0f} USD",
"ΔAPR": f"{delta_apr:+.1%}",
}
)
st.table(contrib_rows)
st.caption(
"Ogni riga mostra il contributo del SINGOLO feature (le altre "
"spente). Effetti stimati ex-ante; calibrabili sui dati "
"raccolti via `📐 Calibrazione`."
)
# Sensibilità win-rate per il profilo aggressivo (più informativo)
@@ -540,6 +850,7 @@ def _render_pl_panel(
spot=spot,
win_rate=wr,
trades_per_year=trades_per_year,
features=feats_aggr,
)
m_c = _compute_pl(
cons_caps,
@@ -547,14 +858,15 @@ def _render_pl_panel(
spot=spot,
win_rate=wr,
trades_per_year=trades_per_year,
features=feats_cons,
)
sens_rows.append(
{
"Win rate": f"{wr:.0%}",
"Conservativa P/L": f"{m_c['annual_pl']:+.0f} USD",
"Conservativa APR": f"{m_c['apr']:+.1%}",
"Aggressiva P/L": f"{m_a['annual_pl']:+.0f} USD",
"Aggressiva APR": f"{m_a['apr']:+.1%}",
"Cons. APR": f"{m_c['apr']:+.1%}",
"Cons. Max DD": f"{m_c['expected_dd_pct']:.1%}",
"Aggr. APR": f"{m_a['apr']:+.1%}",
"Aggr. Max DD": f"{m_a['expected_dd_pct']:.1%}",
}
)
st.table(sens_rows)
+175
View File
@@ -0,0 +1,175 @@
"""Auto-pause circuit breaker (§7-bis F).
Pure-function evaluation that consults `system_state.auto_pause_until`
and the rolling P/L of the last N closed positions to decide whether
the engine should skip an entry cycle.
Two responsibilities, both deterministic at call time:
* :func:`is_paused` returns ``True`` when the persisted
``auto_pause_until`` is in the future. Independent from the kill
switch, which targets technical errors.
* :func:`evaluate_drawdown_breach` given the last N closed P/Ls and
the current capital, returns whether the rolling drawdown breached
the configured ``max_drawdown_pct`` threshold. The orchestrator
layer is the one that flips the persisted state on breach (this
module stays I/O-free for testability).
The two are separated on purpose: ``is_paused`` is the cheap,
read-only gate consulted at the start of every entry cycle; the
breach evaluation runs once per cycle right after the entry
filtering, before the entry is actually placed.
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta
from decimal import Decimal
from cerbero_bite.config.schema import AutoPauseConfig
from cerbero_bite.state.models import SystemStateRecord
__all__ = [
"AutoPauseDecision",
"PauseStatus",
"evaluate_drawdown_breach",
"is_paused",
"pause_until",
]
@dataclass(frozen=True)
class PauseStatus:
"""Snapshot del flag di auto-pausa al momento della valutazione."""
paused: bool
until: datetime | None
reason: str | None
@dataclass(frozen=True)
class AutoPauseDecision:
"""Esito di :func:`evaluate_drawdown_breach`."""
should_pause: bool
cumulative_pnl_usd: Decimal
drawdown_pct: Decimal
threshold_pct: Decimal
reason: str | None
def is_paused(
state: SystemStateRecord | None, *, now: datetime
) -> PauseStatus:
"""Restituisce lo stato della pausa rispetto a ``now``.
``state == None`` o ``auto_pause_until == None`` o
``auto_pause_until <= now`` engine attivo.
"""
if state is None or state.auto_pause_until is None:
return PauseStatus(paused=False, until=None, reason=None)
until = state.auto_pause_until
if until.tzinfo is not None and now.tzinfo is None:
# Coerenza: se il valore persistito è tz-aware, normalizziamo.
return PauseStatus(
paused=until > now.replace(tzinfo=until.tzinfo),
until=until,
reason=state.auto_pause_reason,
)
return PauseStatus(
paused=until > now,
until=until,
reason=state.auto_pause_reason,
)
def pause_until(now: datetime, days: int) -> datetime:
"""Calcola la scadenza della pausa (``now + days``).
Estratto in funzione separata per facilitare i test e per ricordare
che la pausa è espressa in **giorni** (la strategia ha cron
giornaliero su crypto 24/7; pause sub-giornaliere non avrebbero
modo di evitare un'entry).
"""
return now + timedelta(days=max(1, days))
def evaluate_drawdown_breach(
*,
cfg: AutoPauseConfig,
recent_pnl_usd: list[Decimal],
capital_usd: Decimal,
) -> AutoPauseDecision:
"""Decide se la pausa va armata ora dato il rolling P/L.
Regola: se la somma dei P/L delle ultime ``cfg.lookback_trades``
posizioni chiuse è negativa e in valore assoluto eccede
``cfg.max_drawdown_pct × capital_usd``, ritorna
``should_pause=True``. Tutte le altre condizioni False.
``cfg.enabled=False`` ritorna sempre False (filtro disabilitato).
Lookback insufficiente ritorna False (non scattiamo finché non
abbiamo abbastanza storia per giudicare).
"""
threshold_pct = cfg.max_drawdown_pct
cumulative = sum((p for p in recent_pnl_usd), start=Decimal("0"))
if not cfg.enabled:
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=Decimal("0"),
threshold_pct=threshold_pct,
reason=None,
)
if len(recent_pnl_usd) < cfg.lookback_trades:
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=Decimal("0"),
threshold_pct=threshold_pct,
reason=None,
)
if capital_usd <= 0:
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=Decimal("0"),
threshold_pct=threshold_pct,
reason=None,
)
# Solo perdite ci interessano: vincite cumulate non scattano la pausa.
if cumulative >= 0:
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=cumulative / capital_usd,
threshold_pct=threshold_pct,
reason=None,
)
drawdown_pct = (-cumulative) / capital_usd
if drawdown_pct >= threshold_pct:
return AutoPauseDecision(
should_pause=True,
cumulative_pnl_usd=cumulative,
drawdown_pct=drawdown_pct,
threshold_pct=threshold_pct,
reason=(
f"rolling DD {drawdown_pct:.2%}{threshold_pct:.2%} "
f"(last {cfg.lookback_trades} trades, "
f"cumulative {cumulative} USD)"
),
)
return AutoPauseDecision(
should_pause=False,
cumulative_pnl_usd=cumulative,
drawdown_pct=drawdown_pct,
threshold_pct=threshold_pct,
reason=None,
)
+198 -7
View File
@@ -1,9 +1,11 @@
"""Weekly entry decision loop (``docs/06-operational-flow.md`` §2).
"""Daily entry decision loop (``docs/06-operational-flow.md`` §2).
Pure orchestration over the existing core/clients/state primitives.
The cycle is auto-execute: when every gate passes, the engine sends
the combo order without asking Adriano. Telegram is used only to
notify the outcome.
Crypto è 24/7: la cadenza di candidatura non è gateata sulla
settimana, sono i gate quantitativi a decidere se entrare o saltare
il giorno. Pure orchestration over the existing core/clients/state
primitives. The cycle is auto-execute: when every gate passes, the
engine sends the combo order without asking Adriano. Telegram is
used only to notify the outcome.
"""
from __future__ import annotations
@@ -28,6 +30,7 @@ from cerbero_bite.clients.macro import MacroClient
from cerbero_bite.clients.portfolio import PortfolioClient
from cerbero_bite.clients.sentiment import SentimentClient
from cerbero_bite.config.schema import StrategyConfig
from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold
from cerbero_bite.core.combo_builder import ComboProposal, build, select_strikes
from cerbero_bite.core.entry_validator import (
EntryContext,
@@ -38,6 +41,7 @@ from cerbero_bite.core.entry_validator import (
from cerbero_bite.core.liquidity_gate import InstrumentSnapshot, check
from cerbero_bite.core.sizing_engine import SizingContext, compute_contracts
from cerbero_bite.core.types import OptionQuote
from cerbero_bite.runtime import auto_pause as auto_pause_module
from cerbero_bite.runtime.alert_manager import AlertManager
from cerbero_bite.runtime.dependencies import RuntimeContext
from cerbero_bite.state import (
@@ -64,6 +68,7 @@ _STATUS_NO_ENTRY = "no_entry"
_STATUS_BROKER_REJECT = "broker_reject"
_STATUS_KILL_SWITCH = "kill_switch_armed"
_STATUS_HAS_OPEN = "has_open_position"
_STATUS_AUTO_PAUSED = "auto_paused"
@dataclass(frozen=True)
@@ -94,6 +99,7 @@ class _MarketSnapshot:
portfolio_eur: Decimal
dealer_net_gamma: Decimal | None
liquidation_squeeze_risk_high: bool | None
iv_minus_rv: Decimal | None
async def _gather_snapshot(
@@ -159,6 +165,9 @@ async def _gather_snapshot(
liquidation_t: asyncio.Task[bool | None] = asyncio.create_task(
_safe_liquidation_squeeze(sentiment)
)
iv_rv_t: asyncio.Task[Decimal | None] = asyncio.create_task(
_safe_iv_minus_rv(deribit)
)
await asyncio.gather(
spot_t,
@@ -172,6 +181,7 @@ async def _gather_snapshot(
portfolio_t,
dealer_t,
liquidation_t,
iv_rv_t,
)
return _MarketSnapshot(
spot_eth_usd=spot_t.result(),
@@ -185,6 +195,7 @@ async def _gather_snapshot(
portfolio_eur=portfolio_t.result(),
dealer_net_gamma=dealer_t.result(),
liquidation_squeeze_risk_high=liquidation_t.result(),
iv_minus_rv=iv_rv_t.result(),
)
@@ -196,6 +207,20 @@ async def _safe_dealer_gamma(deribit: DeribitClient) -> Decimal | None:
return snap.total_net_dealer_gamma
async def _safe_iv_minus_rv(deribit: DeribitClient) -> Decimal | None:
"""Best-effort fetch of the IV30g RV30g spread (vol points)."""
try:
rv = await deribit.realized_vol("ETH")
except Exception:
return None
if not isinstance(rv, dict):
return None
value = rv.get("iv_minus_rv_30d")
if value is None:
return None
return value if isinstance(value, Decimal) else Decimal(str(value))
async def _safe_liquidation_squeeze(sentiment: SentimentClient) -> bool | None:
try:
heatmap = await sentiment.liquidation_heatmap("ETH")
@@ -291,6 +316,51 @@ async def _build_quotes(
return out
def _select_window_days(entry_cfg: object, n_days: int) -> int:
"""Sceglie la finestra in giorni per il gate adattivo dato n_days
disponibili.
Spec: warmup hard se ``n_days == 0`` 0; finestra ``target_days``
se ``n_days >= target_days``; ``min_days`` se ``n_days >= min_days``;
altrimenti tutta la storia disponibile (capped a ``target_days``).
"""
target = int(getattr(entry_cfg, "iv_minus_rv_window_target_days", 60))
min_days = int(getattr(entry_cfg, "iv_minus_rv_window_min_days", 30))
if n_days < 1:
return 0
if n_days >= target:
return target
if n_days >= min_days:
return min_days
return target # storia parziale: query fino a target, repository ne ritorna n_days
def _audit_threshold(
entry_cfg: object,
iv_rv_history: tuple[Decimal, ...],
n_days: int,
) -> str | None:
"""Soglia P_q rolling effettivamente usata dal gate, per il decisions log."""
if not getattr(entry_cfg, "iv_minus_rv_filter_enabled", False):
return None
if not getattr(entry_cfg, "iv_minus_rv_adaptive_enabled", False):
return str(getattr(entry_cfg, "iv_minus_rv_min", Decimal("0")))
threshold = compute_adaptive_threshold(
history=iv_rv_history,
n_days=n_days,
percentile=entry_cfg.iv_minus_rv_percentile, # type: ignore[attr-defined]
absolute_floor=entry_cfg.iv_minus_rv_min, # type: ignore[attr-defined]
)
return None if threshold is None else str(threshold)
def _audit_window_days(entry_cfg: object, n_days: int) -> int | None:
"""Numero di giorni effettivamente usati dalla finestra rolling."""
if not getattr(entry_cfg, "iv_minus_rv_adaptive_enabled", False):
return None
return _select_window_days(entry_cfg, n_days)
def _max_loss_per_contract_usd(short_strike: Decimal, long_strike: Decimal) -> Decimal:
return (short_strike - long_strike).copy_abs()
@@ -306,7 +376,7 @@ async def run_entry_cycle(
eur_to_usd_rate: Decimal,
now: datetime | None = None,
) -> EntryCycleResult:
"""Run one weekly entry evaluation cycle.
"""Run one daily entry evaluation cycle.
The function is idempotent and side-effect aware: it persists the
decision in the ``decisions`` table regardless of outcome and only
@@ -322,6 +392,28 @@ async def run_entry_cycle(
)
return EntryCycleResult(status=_STATUS_KILL_SWITCH, reason="kill_switch")
# §7-bis (F): auto-pause circuit breaker. Read-only consultation
# of the persisted state — the breach evaluation runs later, after
# capital is known.
conn = connect_state(ctx.db_path)
try:
sys_state = ctx.repository.get_system_state(conn)
finally:
conn.close()
pause_status = auto_pause_module.is_paused(sys_state, now=when)
if pause_status.paused:
await alert.low(
source="entry_cycle",
message=(
f"auto-paused until {pause_status.until} "
f"({pause_status.reason or 'no reason'}) — skipping"
),
)
return EntryCycleResult(
status=_STATUS_AUTO_PAUSED,
reason=pause_status.reason or "auto_paused",
)
# Has open position?
conn = connect_state(ctx.db_path)
try:
@@ -344,7 +436,83 @@ async def run_entry_cycle(
)
capital_usd = snap.portfolio_eur * eur_to_usd_rate
# §7-bis (F): rolling drawdown breach evaluation. Se le ultime N
# posizioni chiuse hanno cumulato perdite oltre la soglia, armiamo
# la pausa e usciamo subito (l'entry di questo ciclo è saltata).
auto_cfg = cfg.auto_pause
if auto_cfg.enabled:
conn = connect_state(ctx.db_path)
try:
recent_pnls = ctx.repository.recent_closed_position_pnls_usd(
conn, limit=auto_cfg.lookback_trades
)
finally:
conn.close()
breach = auto_pause_module.evaluate_drawdown_breach(
cfg=auto_cfg,
recent_pnl_usd=recent_pnls,
capital_usd=capital_usd,
)
if breach.should_pause:
until = auto_pause_module.pause_until(when, auto_cfg.pause_days)
conn = connect_state(ctx.db_path)
try:
with transaction(conn):
ctx.repository.set_auto_pause(
conn, until=until, reason=breach.reason
)
finally:
conn.close()
await alert.high(
source="entry_cycle",
message=(
f"auto-pause armed: {breach.reason} — paused until {until}"
),
)
return EntryCycleResult(
status=_STATUS_AUTO_PAUSED,
reason=breach.reason or "auto_paused",
)
# 2. Entry filters
entry_cfg = cfg.entry
asset = cfg.asset.symbol
iv_rv_history: tuple[Decimal, ...] = ()
iv_rv_n_days: int = 0
dvol_24h_ago: Decimal | None = None
if entry_cfg.iv_minus_rv_filter_enabled and entry_cfg.iv_minus_rv_adaptive_enabled:
conn = connect_state(ctx.db_path)
try:
iv_rv_n_days = ctx.repository.count_iv_rv_distinct_days(
conn,
asset=asset,
max_days=entry_cfg.iv_minus_rv_window_target_days,
as_of=when,
)
window_days = _select_window_days(entry_cfg, iv_rv_n_days)
if window_days > 0:
iv_rv_history = tuple(
ctx.repository.iv_rv_values_for_window(
conn,
asset=asset,
window_days=window_days,
as_of=when,
)
)
finally:
conn.close()
if entry_cfg.vol_of_vol_guard_enabled:
conn = connect_state(ctx.db_path)
try:
dvol_24h_ago = ctx.repository.dvol_lookback(
conn,
asset=asset,
reference=when - timedelta(hours=entry_cfg.vol_of_vol_lookback_hours),
)
finally:
conn.close()
entry_ctx = EntryContext(
capital_usd=capital_usd,
dvol_now=snap.dvol,
@@ -353,7 +521,11 @@ async def run_entry_cycle(
next_macro_event_in_days=snap.macro_days_to_event,
has_open_position=False,
dealer_net_gamma=snap.dealer_net_gamma,
iv_minus_rv=snap.iv_minus_rv,
liquidation_squeeze_risk_high=snap.liquidation_squeeze_risk_high,
iv_rv_history=iv_rv_history,
iv_rv_n_days=iv_rv_n_days,
dvol_24h_ago=dvol_24h_ago,
)
decision = validate_entry(entry_ctx, cfg)
inputs = {
@@ -370,6 +542,20 @@ async def run_entry_cycle(
"eth_holdings_pct": str(snap.eth_holdings_pct),
"portfolio_eur": str(snap.portfolio_eur),
"capital_usd": str(capital_usd),
"iv_minus_rv": (
str(snap.iv_minus_rv) if snap.iv_minus_rv is not None else None
),
"iv_rv_history_n": len(iv_rv_history),
"iv_rv_n_days": iv_rv_n_days,
"iv_rv_threshold_used": _audit_threshold(
entry_cfg, iv_rv_history, iv_rv_n_days
),
"iv_rv_window_used_days": _audit_window_days(
entry_cfg, iv_rv_n_days
),
"dvol_24h_ago": (
str(dvol_24h_ago) if dvol_24h_ago is not None else None
),
}
}
if not decision.accepted:
@@ -436,7 +622,12 @@ async def run_entry_cycle(
)
quotes = await _build_quotes(ctx.deribit, chain_meta)
selection = select_strikes(
chain=quotes, bias=bias, spot=snap.spot_eth_usd, now=when, cfg=cfg
chain=quotes,
bias=bias,
spot=snap.spot_eth_usd,
now=when,
cfg=cfg,
dvol_now=snap.dvol, # §3.2 (A) — strike picker dipendente dal regime DVOL
)
if selection is None:
await _record_decision(
@@ -22,7 +22,7 @@ from typing import TYPE_CHECKING, Any
from cerbero_bite.clients._exceptions import McpError
from cerbero_bite.state import connect, transaction
from cerbero_bite.state.models import MarketSnapshotRecord
from cerbero_bite.state.models import DvolSnapshot, MarketSnapshotRecord
if TYPE_CHECKING:
from cerbero_bite.runtime.dependencies import RuntimeContext
@@ -181,6 +181,21 @@ async def collect_market_snapshot(
try:
with transaction(conn):
ctx.repository.record_market_snapshot(conn, record)
# Mirror ETH spot+DVOL into dvol_history so monitor_cycle's
# return_4h lookup has local samples even in data-only mode.
if (
record.asset == "ETH"
and record.spot is not None
and record.dvol is not None
):
ctx.repository.record_dvol_snapshot(
conn,
DvolSnapshot(
timestamp=record.timestamp,
dvol=record.dvol,
eth_spot=record.spot,
),
)
finally:
conn.close()
persisted += 1
@@ -1,10 +1,11 @@
"""Periodic option-chain snapshot collector (§13).
Fetches the Deribit option chain for every strike entro la finestra
DTE configurata, prima del trigger entry settimanale (cron
``55 13 * * MON`` di default). Persiste un quote per ogni strumento
in ``option_chain_snapshots`` con un timestamp condiviso, che diventa
il dato di base per:
DTE configurata. Cadenza di default ``*/15 * * * *`` (allineata a
``market_snapshot``: crypto è 24/7 e l'accumulo dataset deve essere
continuo, non gateato sui rollover TradFi-style). Persiste un quote
per ogni strumento in ``option_chain_snapshots`` con un timestamp
condiviso, che diventa il dato di base per:
* il backtest non-stilizzato (vedi ``core/backtest.py``),
* la calibrazione empirica dello skew premium e del credit/width
+2 -2
View File
@@ -50,13 +50,13 @@ _log = logging.getLogger("cerbero_bite.runtime.orchestrator")
Environment = Literal["testnet", "mainnet"]
# Default cron schedule (matches docs/06-operational-flow.md table).
_CRON_ENTRY = "0 14 * * MON"
_CRON_ENTRY = "0 14 * * *" # crypto 24/7: candidatura giornaliera; i gate decidono se entrare
_CRON_MONITOR = "0 2,14 * * *"
_CRON_HEALTH = "*/5 * * * *"
_CRON_BACKUP = "0 * * * *"
_CRON_MANUAL_ACTIONS = "*/1 * * * *"
_CRON_MARKET_SNAPSHOT = "*/15 * * * *"
_CRON_OPTION_CHAIN_SNAPSHOT = "55 13 * * MON" # 5 min prima del trigger entry
_CRON_OPTION_CHAIN_SNAPSHOT = "*/15 * * * *" # crypto è 24/7: cadenza continua allineata a market_snapshot
_BACKUP_RETENTION_DAYS = 30
@@ -0,0 +1,14 @@
-- 0004_auto_pause.sql — circuit breaker su drawdown rolling (§7-bis F)
--
-- Aggiunge alla `system_state` il timestamp fino a cui l'engine è in
-- pausa automatica per via di un drawdown sopra soglia. NULL = engine
-- attivo. Quando il valore è nel futuro, il rule engine salta il
-- ciclo entry e logga la motivazione.
--
-- Indipendente dal kill_switch (che resta dedicato a errori tecnici
-- e a comandi manuali esplicitati). Le due tutele coesistono.
ALTER TABLE system_state ADD COLUMN auto_pause_until TEXT;
ALTER TABLE system_state ADD COLUMN auto_pause_reason TEXT;
PRAGMA user_version = 4;
@@ -1,10 +1,10 @@
-- 0004_option_chain_snapshots.sql — catena opzioni storica
-- 0005_option_chain_snapshots.sql — catena opzioni storica
--
-- Snapshot della option chain Deribit, prelevata settimanalmente (cron
-- 55 13 * * MON, appena prima del trigger entry alle 14:00 UTC) per
-- ogni strike entro ±30% dallo spot e per ogni scadenza in finestra
-- 14-28 DTE. Dato di base per il backtest non-stilizzato e per
-- calibrare empiricamente lo skew premium del modello BS.
-- Snapshot della option chain Deribit prelevata ogni 15 minuti (stesso
-- scheduler di market_snapshots, cron */15) per ogni strike entro ±30%
-- dallo spot e per ogni scadenza nella finestra 14-28 DTE. Dato di
-- base per il backtest non-stilizzato e per calibrare empiricamente
-- lo skew premium del modello BS.
--
-- Granularità: una riga per (snapshot_ts, instrument). Lo
-- snapshot_ts è il timestamp del cron tick — TUTTI i quote raccolti
+2
View File
@@ -215,3 +215,5 @@ class SystemStateRecord(BaseModel):
config_version: str
started_at: datetime
last_audit_hash: str | None = None
auto_pause_until: datetime | None = None
auto_pause_reason: str | None = None
+153 -1
View File
@@ -13,7 +13,7 @@ Decimals are stored as TEXT to preserve precision (see
from __future__ import annotations
import sqlite3
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import Any
from uuid import UUID
@@ -408,6 +408,111 @@ class Repository:
).fetchall()
return [_row_to_market_snapshot(r) for r in rows]
def count_iv_rv_distinct_days(
self,
conn: sqlite3.Connection,
*,
asset: str,
max_days: int,
as_of: datetime | None = None,
) -> int:
"""Numero di giorni di calendario distinti coperti da IV-RV validi.
Esclude righe con ``fetch_ok=0`` o ``iv_minus_rv IS NULL``.
Usato dal caller del gate adattivo per decidere la finestra
(warmup hard / min_days / target_days).
Args:
as_of: Reference time for the rolling window. Defaults to
``datetime.now(UTC)``.
"""
if max_days <= 0:
raise ValueError(f"max_days must be positive, got {max_days}")
ref = as_of if as_of is not None else datetime.now(UTC)
if ref.tzinfo is None:
raise ValueError("as_of must be timezone-aware")
cutoff = ref - timedelta(days=max_days)
row = conn.execute(
"SELECT COUNT(DISTINCT substr(timestamp, 1, 10)) AS n "
"FROM market_snapshots "
"WHERE asset = ? "
" AND fetch_ok = 1 "
" AND iv_minus_rv IS NOT NULL "
" AND timestamp >= ?",
(asset, _enc_dt(cutoff)),
).fetchone()
return int(row["n"]) if row is not None else 0
def iv_rv_values_for_window(
self,
conn: sqlite3.Connection,
*,
asset: str,
window_days: int,
as_of: datetime | None = None,
) -> list[Decimal]:
"""Valori IV-RV ordinati ASC su ``[as_of - window_days, as_of]``.
Esclude righe con ``fetch_ok=0`` o ``iv_minus_rv IS NULL``.
Tutti i record validi della finestra concorrono come singoli
contributi alla statistica del percentile, indipendentemente
dalla cadenza con cui sono stati raccolti (tick live vs backfill
daily).
"""
if window_days <= 0:
raise ValueError(f"window_days must be positive, got {window_days}")
ref = as_of if as_of is not None else datetime.now(UTC)
if ref.tzinfo is None:
raise ValueError("as_of must be timezone-aware")
cutoff = ref - timedelta(days=window_days)
rows = conn.execute(
"SELECT iv_minus_rv FROM market_snapshots "
"WHERE asset = ? "
" AND fetch_ok = 1 "
" AND iv_minus_rv IS NOT NULL "
" AND timestamp >= ? "
"ORDER BY timestamp ASC",
(asset, _enc_dt(cutoff)),
).fetchall()
return [Decimal(str(r["iv_minus_rv"])) for r in rows]
def dvol_lookback(
self,
conn: sqlite3.Connection,
*,
asset: str,
reference: datetime,
tolerance_minutes: int = 15,
) -> Decimal | None:
"""DVOL al tick più vicino a `reference`, entro ±tolerance_minutes.
Ritorna ``None`` se non esiste un tick valido (``fetch_ok=1``,
``dvol IS NOT NULL``) entro la tolerance. Usato dal Vol-of-Vol
guard per stimare DVOL N ore fa.
"""
if reference.tzinfo is None:
raise ValueError("reference must be timezone-aware")
ref_lo = (reference - timedelta(minutes=tolerance_minutes)).astimezone(UTC)
ref_hi = (reference + timedelta(minutes=tolerance_minutes)).astimezone(UTC)
row = conn.execute(
"SELECT dvol, timestamp FROM market_snapshots "
"WHERE asset = ? "
" AND fetch_ok = 1 "
" AND dvol IS NOT NULL "
" AND timestamp >= ? "
" AND timestamp <= ? "
"ORDER BY ABS(julianday(timestamp) - julianday(?)) ASC LIMIT 1",
(
asset,
_enc_dt(ref_lo),
_enc_dt(ref_hi),
_enc_dt(reference),
),
).fetchone()
if row is None:
return None
return Decimal(str(row["dvol"]))
# ------------------------------------------------------------------
# option_chain_snapshots
# ------------------------------------------------------------------
@@ -586,6 +691,16 @@ class Repository:
last_audit_hash=(
row["last_audit_hash"] if "last_audit_hash" in keys else None
),
auto_pause_until=(
_dec_dt(row["auto_pause_until"])
if "auto_pause_until" in keys
else None
),
auto_pause_reason=(
row["auto_pause_reason"]
if "auto_pause_reason" in keys
else None
),
)
def set_last_audit_hash(
@@ -624,6 +739,43 @@ class Repository:
(_enc_dt(now),),
)
def set_auto_pause(
self,
conn: sqlite3.Connection,
*,
until: datetime | None,
reason: str | None,
) -> None:
"""Imposta o azzera la pausa automatica (§7-bis F).
``until = None`` annulla la pausa (l'engine torna attivo).
Il setter è idempotente: chiamarlo con un until già nel passato
è equivalente a clear.
"""
conn.execute(
"UPDATE system_state SET auto_pause_until = ?, "
"auto_pause_reason = ? WHERE id = 1",
(_enc_dt(until) if until is not None else None, reason),
)
def recent_closed_position_pnls_usd(
self, conn: sqlite3.Connection, *, limit: int
) -> list[Decimal]:
"""Ritorna la lista dei pnl_usd delle ultime ``limit`` posizioni chiuse,
ordinate dalla più recente alla più vecchia. Posizioni con
``pnl_usd`` ``NULL`` (es. chiuse di emergenza senza P/L noto)
sono saltate. Usato dal circuit breaker §7-bis F.
"""
if limit <= 0:
return []
rows = conn.execute(
"SELECT pnl_usd FROM positions "
"WHERE closed_at IS NOT NULL AND pnl_usd IS NOT NULL "
"ORDER BY closed_at DESC LIMIT ?",
(limit,),
).fetchall()
return [Decimal(row["pnl_usd"]) for row in rows]
# ---------------------------------------------------------------------------
# Row → model converters
+46 -5
View File
@@ -28,8 +28,8 @@
# 2× via "ETH + BTC" indicato in `📚 Strategia` è una **stima ex-ante**
# di cosa otterresti DOPO quel lavoro di codice.
config_version: "1.0.0-aggressiva"
config_hash: "b931a2b96fbc149b21cae84a196ee8bad10220b5ee8fa9ab0ed06ae52d7dc531"
config_version: "1.4.0-aggressiva"
config_hash: "7fa9b0be5b56517293421bc19838b700da595725360fe018a1be13b802dea859"
last_review: "2026-04-26"
last_reviewer: "Adriano"
@@ -38,7 +38,7 @@ asset:
exchange: "deribit"
entry:
cron: "0 14 * * MON"
cron: "0 14 * * *"
skip_holidays_country: "IT"
capital_min_usd: "2880" # 4× del minimo conservativo (720)
@@ -65,6 +65,23 @@ entry:
dealer_gamma_min: "0"
dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true
# IV richness gate (§2.9). In Aggressiva il gate è in modalità
# adattiva: la soglia è il P25 rolling sui market_snapshots
# (warmup: usa la storia disponibile finché < 30g, poi finestra 30g
# fino a 60g, poi fissa 60g). `iv_minus_rv_min: 0` = floor zero,
# lascia decidere al P25.
iv_minus_rv_filter_enabled: true
iv_minus_rv_adaptive_enabled: true
iv_minus_rv_min: "0"
iv_minus_rv_percentile: "0.25"
iv_minus_rv_window_target_days: 60
iv_minus_rv_window_min_days: 30
# Vol-of-Vol guard: blocca entry su shift bruschi DVOL.
vol_of_vol_guard_enabled: true
vol_of_vol_threshold_pt: "5"
vol_of_vol_lookback_hours: 24
structure:
dte_target: 18
@@ -78,6 +95,13 @@ structure:
distance_otm_pct_min: "0.15"
distance_otm_pct_max: "0.25"
# §3.2 (A): step-function delta-target per regime DVOL.
# DVOL bassa (≤50) → più premio; alta (>70) → più safety.
delta_by_dvol:
- {dvol_under: "50", delta_target: "0.15", delta_min: "0.13", delta_max: "0.17"}
- {dvol_under: "70", delta_target: "0.12", delta_min: "0.10", delta_max: "0.15"}
- {dvol_under: "90", delta_target: "0.10", delta_min: "0.08", delta_max: "0.12"}
spread_width:
target_pct_of_spot: "0.04"
min_pct_of_spot: "0.03"
@@ -97,8 +121,8 @@ sizing:
# Le tre leve dominanti:
cap_per_trade_eur: "800" # era 200 → 4×
cap_aggregate_open_eur: "3200" # era 1000 → 4× (proporzionato a 2 posizioni × cap_per_trade × 2 ruote)
max_concurrent_positions: 2 # era 1
cap_aggregate_open_eur: "6400" # 8 posizioni concorrenti × 800 cap_per_trade (entry daily)
max_concurrent_positions: 8 # era 2 (entry daily × DTE 14-21 × pass-rate)
max_contracts_per_trade: 16 # era 4 → 4×
dvol_adjustment:
@@ -116,6 +140,14 @@ exit:
delta_breach_threshold: "0.30"
adverse_move_4h_pct: "0.05"
# §7-bis (D): vol-harvest abilitato a 15 punti vol di crollo.
vol_harvest_dvol_decrease: "15"
# §7.1bis (C): scala graduata di profit-take. Pipeline runtime
# non ancora attiva; tenuta vuota fino al merge della
# partial-close pipeline.
profit_take_partial_levels: []
monitor_cron: "0 2,14 * * *"
user_confirmation_timeout_min: 30
@@ -124,6 +156,15 @@ exit:
- "CLOSE_VOL"
- "CLOSE_DELTA"
# §7-bis (F): circuit breaker abilitato. Soglia 15% (più tollerante
# del default conservativo perché la size aggressiva ha volatilità
# attesa più alta).
auto_pause:
enabled: true
lookback_trades: 5
max_drawdown_pct: "0.15"
pause_days: 14
execution:
environment: "testnet"
eur_to_usd: "1.075"
+18 -5
View File
@@ -15,8 +15,8 @@
# cerbero-bite config hash --file strategy.conservativa.yaml
# e bumpare config_version.
config_version: "1.0.0-conservativa"
config_hash: "eff824281bbb538fba49434d8cc4b9c37675bc73d60e351293e263cc7e7b29ef"
config_version: "1.4.0-conservativa"
config_hash: "b6af7b041508a67846eba5985e27e655526fe89105653f86bc88b8a4a437ac3a"
last_review: "2026-04-26"
last_reviewer: "Adriano"
@@ -25,7 +25,7 @@ asset:
exchange: "deribit"
entry:
cron: "0 14 * * MON"
cron: "0 14 * * *"
skip_holidays_country: "IT"
capital_min_usd: "720"
@@ -49,6 +49,10 @@ entry:
dealer_gamma_min: "0"
dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true
# IV richness gate (§2.9). Disabilitato di default.
iv_minus_rv_min: "0"
iv_minus_rv_filter_enabled: false
structure:
dte_target: 18
@@ -80,8 +84,8 @@ sizing:
kelly_fraction: "0.13"
cap_per_trade_eur: "200"
cap_aggregate_open_eur: "1000"
max_concurrent_positions: 1
cap_aggregate_open_eur: "1000" # 3 posizioni × ~200 + headroom (entry daily, profilo conservativo)
max_concurrent_positions: 3
max_contracts_per_trade: 4
@@ -100,6 +104,9 @@ exit:
delta_breach_threshold: "0.30"
adverse_move_4h_pct: "0.05"
vol_harvest_dvol_decrease: "0"
profit_take_partial_levels: []
monitor_cron: "0 2,14 * * *"
user_confirmation_timeout_min: 30
@@ -108,6 +115,12 @@ exit:
- "CLOSE_VOL"
- "CLOSE_DELTA"
auto_pause:
enabled: false
lookback_trades: 5
max_drawdown_pct: "0.10"
pause_days: 14
execution:
environment: "testnet"
eur_to_usd: "1.075"
+23 -4
View File
@@ -6,8 +6,8 @@
# config hash), and lands as a separate commit with the motivation in
# the commit message.
config_version: "1.0.0"
config_hash: "4c2be4c51c849ed58fa22ec2b302016c453894dd0964b6d05445ab1b723e2d10"
config_version: "1.4.0"
config_hash: "22182814216190331e0b69b3bc99493e6d69cc813f7ed937394986eecc1f5d11"
last_review: "2026-04-26"
last_reviewer: "Adriano"
@@ -16,7 +16,7 @@ asset:
exchange: "deribit"
entry:
cron: "0 14 * * MON"
cron: "0 14 * * *"
skip_holidays_country: "IT"
capital_min_usd: "720"
@@ -45,6 +45,10 @@ entry:
dealer_gamma_min: "0"
dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true
# IV richness gate (§2.9). Disabilitato di default.
iv_minus_rv_min: "0"
iv_minus_rv_filter_enabled: false
structure:
dte_target: 18
@@ -77,7 +81,7 @@ sizing:
cap_per_trade_eur: "200"
cap_aggregate_open_eur: "1000"
max_concurrent_positions: 1
max_concurrent_positions: 5
max_contracts_per_trade: 4
@@ -96,6 +100,13 @@ exit:
delta_breach_threshold: "0.30"
adverse_move_4h_pct: "0.05"
# §7-bis (D): vol-collapse harvest. 0 = disabilitato.
vol_harvest_dvol_decrease: "0"
# §7.1bis (C): scala graduata di profit-take. Vuoto = chiusura
# atomica. Pipeline runtime non ancora attiva (hook futuro).
profit_take_partial_levels: []
monitor_cron: "0 2,14 * * *"
user_confirmation_timeout_min: 30
@@ -104,6 +115,14 @@ exit:
- "CLOSE_VOL"
- "CLOSE_DELTA"
# §7-bis (F): circuit breaker su drawdown rolling. Disabilitato di
# default — abilitarlo solo dopo abbastanza posizioni chiuse.
auto_pause:
enabled: false
lookback_trades: 5
max_drawdown_pct: "0.10"
pause_days: 14
execution:
environment: "testnet" # testnet|mainnet — kill switch on broker mismatch
eur_to_usd: "1.075" # default FX rate for sizing engine; override at boot
+10
View File
@@ -118,6 +118,16 @@ def _wire_market_snapshot(
},
is_reusable=True,
)
httpx_mock.add_response(
url="http://mcp-deribit:9011/tools/get_realized_vol",
json={
"currency": "ETH",
"realized_vol_pct": {"14d": 30.0, "30d": 30.0},
"iv_current_pct": 38.0,
"iv_minus_rv_pct": {"14d": 8.0, "30d": 8.0},
},
is_reusable=True,
)
httpx_mock.add_response(
url="http://mcp-sentiment:9014/tools/get_liquidation_heatmap",
json={
@@ -0,0 +1,136 @@
"""End-to-end test del gate IV-RV adattivo + Vol-of-Vol guard via Repository.
Verifica che la nuova API distinct-days componga correttamente repository
helpers + ``compute_adaptive_threshold``.
"""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold
from cerbero_bite.state.db import connect, run_migrations
from cerbero_bite.state.models import MarketSnapshotRecord
from cerbero_bite.state.repository import Repository
def _seed_history(
conn,
repo: Repository,
asset: str,
base: datetime,
n_ticks: int,
iv_rv_value: Decimal,
dvol_value: Decimal,
) -> None:
for i in range(n_ticks):
repo.record_market_snapshot(
conn,
MarketSnapshotRecord(
timestamp=base + timedelta(minutes=15 * i),
asset=asset,
spot=Decimal("2000"),
dvol=dvol_value,
realized_vol_30d=Decimal("48"),
iv_minus_rv=iv_rv_value,
funding_perp_annualized=Decimal("0"),
funding_cross_annualized=Decimal("0"),
dealer_net_gamma=Decimal("0"),
gamma_flip_level=None,
oi_delta_pct_4h=None,
liquidation_long_risk="low",
liquidation_short_risk="low",
macro_days_to_event=None,
fetch_ok=True,
fetch_errors_json=None,
),
)
conn.commit()
@pytest.fixture
def db_30d(tmp_path):
"""30 giorni di storia con IV-RV bimodale: prima metà 1.0, seconda metà 5.0."""
db_path = tmp_path / "e2e.sqlite"
conn = connect(str(db_path))
run_migrations(conn)
repo = Repository()
base = datetime(2026, 4, 1, 0, 0, tzinfo=UTC)
_seed_history(conn, repo, "ETH", base, 1440, Decimal("1.0"), Decimal("50"))
_seed_history(
conn,
repo,
"ETH",
base + timedelta(days=15),
1440,
Decimal("5.0"),
Decimal("50"),
)
return conn, repo
def test_distinct_days_count_matches_calendar_days(db_30d) -> None:
"""30 giorni di calendario seedati → COUNT DISTINCT = 30."""
conn, repo = db_30d
n = repo.count_iv_rv_distinct_days(
conn,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 1, 0, 0, tzinfo=UTC),
)
assert n == 30
def test_window_values_returned_for_full_history(db_30d) -> None:
conn, repo = db_30d
values = repo.iv_rv_values_for_window(
conn,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 1, 0, 0, tzinfo=UTC),
)
assert len(values) == 2880
# Bimodale: 1440 valori 1.0 e 1440 valori 5.0
assert sum(1 for v in values if v == Decimal("1.0")) == 1440
assert sum(1 for v in values if v == Decimal("5.0")) == 1440
def test_p25_of_bimodal_history_picks_low_regime(db_30d) -> None:
"""Comporre repository + adaptive_threshold come fa entry_cycle."""
conn, repo = db_30d
as_of = datetime(2026, 5, 1, 0, 0, tzinfo=UTC)
n_days = repo.count_iv_rv_distinct_days(
conn, asset="ETH", max_days=60, as_of=as_of
)
values = repo.iv_rv_values_for_window(
conn, asset="ETH", window_days=60, as_of=as_of
)
threshold = compute_adaptive_threshold(
history=values,
n_days=n_days,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
# P25 di 2880 valori bimodali: 1440 ×1.0, 1440 ×5.0 → soglia = 1.0
assert threshold == Decimal("1.0")
def test_dvol_lookback_within_tolerance(db_30d) -> None:
conn, repo = db_30d
base = datetime(2026, 4, 1, 0, 0, tzinfo=UTC)
out = repo.dvol_lookback(conn, asset="ETH", reference=base + timedelta(hours=24))
assert out == Decimal("50")
def test_dvol_lookback_returns_none_outside_tolerance(db_30d) -> None:
conn, repo = db_30d
out = repo.dvol_lookback(
conn,
asset="ETH",
reference=datetime(2025, 1, 1, tzinfo=UTC),
tolerance_minutes=15,
)
assert out is None
+170
View File
@@ -0,0 +1,170 @@
"""TDD per :mod:`cerbero_bite.core.adaptive_threshold`.
Spec: ``docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md``.
La funzione è una pura statistica: riceve già la finestra di valori scelta
dal caller e il numero di giorni distinti coperti dalla storia disponibile
(``n_days``), e restituisce ``max(percentile, floor)`` o ``None`` durante
il warmup hard. La selezione della finestra (target_days vs min_days vs
intera storia) è responsabilità del caller (repository + entry_cycle).
"""
from __future__ import annotations
from decimal import Decimal
import pytest
from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold
# ---------------------------------------------------------------------------
# Warmup hard: nessun giorno disponibile
# ---------------------------------------------------------------------------
def test_n_days_zero_returns_none() -> None:
"""Storia vuota o nessun giorno coperto → warmup hard."""
out = compute_adaptive_threshold(
history=[],
n_days=0,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
assert out is None
def test_n_days_zero_with_values_still_returns_none() -> None:
"""Difensivo: se il caller passa n_days=0 ma valori non vuoti, warmup
hard vince comunque (gate disabilitato)."""
out = compute_adaptive_threshold(
history=[Decimal("3")] * 10,
n_days=0,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
assert out is None
def test_empty_history_with_positive_n_days_returns_none() -> None:
"""Difensivo: history vuota anche con n_days>0 → None."""
out = compute_adaptive_threshold(
history=[],
n_days=5,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
assert out is None
# ---------------------------------------------------------------------------
# Calcolo percentile sulla finestra ricevuta
# ---------------------------------------------------------------------------
def test_n_days_one_returns_percentile_of_history() -> None:
"""Singolo giorno con tick a 15 min (96 valori): P25 standard."""
history = [Decimal(i) / Decimal("10") for i in range(96)] # 0.0..9.5
out = compute_adaptive_threshold(
history=history,
n_days=1,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
# P25 di [0.0..9.5] passo 0.1 con method='linear': k=23.75, val ≈ 2.375
assert out is not None
assert Decimal("2.3") < out < Decimal("2.5")
def test_window_chosen_by_caller_is_used_verbatim() -> None:
"""La funzione NON fa slicing: usa esattamente la history ricevuta."""
history = [Decimal(i) for i in range(1, 201)] # 1..200
out = compute_adaptive_threshold(
history=history,
n_days=30,
percentile=Decimal("0.5"),
absolute_floor=Decimal("0"),
)
# P50 di [1..200] = (200+1)/2 = 100.5
assert out is not None
assert Decimal("100") <= out <= Decimal("101")
def test_mixed_cadence_window_no_special_treatment() -> None:
"""Mix di valori (es. backfill daily + tick live) trattato come una
distribuzione qualunque: il caller ha già scelto la finestra; la
funzione calcola il percentile sui valori ricevuti uno-a-uno."""
# 30 valori "daily backfill" (uno per giorno) + 96 tick "live"
history = [Decimal("5")] * 30 + [Decimal("8")] * 96
out = compute_adaptive_threshold(
history=history,
n_days=31,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
# Sorted: 30 ×5, 96 ×8. P25 a indice 0.25*125 = 31.25 → tra 5 e 8.
# NumPy linear: sorted_v[31]=8, sorted_v[32]=8 → 8.
# Verifica solo l'estremo superiore della famiglia di valori sorted.
assert out is not None
assert out in (Decimal("5"), Decimal("8"))
# ---------------------------------------------------------------------------
# Floor binding
# ---------------------------------------------------------------------------
def test_floor_binding_overrides_low_percentile() -> None:
history = [Decimal("0.5")] * 200
out = compute_adaptive_threshold(
history=history,
n_days=30,
percentile=Decimal("0.25"),
absolute_floor=Decimal("3"),
)
assert out == Decimal("3")
def test_floor_not_binding_returns_percentile() -> None:
history = [Decimal("5")] * 200
out = compute_adaptive_threshold(
history=history,
n_days=30,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
assert out == Decimal("5")
# ---------------------------------------------------------------------------
# Input validation
# ---------------------------------------------------------------------------
def test_invalid_percentile_above_one_raises() -> None:
with pytest.raises(ValueError, match="percentile must be in"):
compute_adaptive_threshold(
history=[Decimal("1")] * 200,
n_days=10,
percentile=Decimal("1.5"),
absolute_floor=Decimal("0"),
)
def test_invalid_percentile_negative_raises() -> None:
with pytest.raises(ValueError, match="percentile must be in"):
compute_adaptive_threshold(
history=[Decimal("1")] * 200,
n_days=10,
percentile=Decimal("-0.1"),
absolute_floor=Decimal("0"),
)
def test_invalid_negative_n_days_raises() -> None:
with pytest.raises(ValueError, match="n_days must be >= 0"):
compute_adaptive_threshold(
history=[Decimal("1")] * 10,
n_days=-1,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
+157
View File
@@ -0,0 +1,157 @@
"""TDD per :mod:`cerbero_bite.runtime.auto_pause` (§7-bis F)."""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
from cerbero_bite.config.schema import AutoPauseConfig
from cerbero_bite.runtime.auto_pause import (
evaluate_drawdown_breach,
is_paused,
pause_until,
)
from cerbero_bite.state.models import SystemStateRecord
_NOW = datetime(2026, 5, 1, 14, 0, tzinfo=UTC)
def _state(**overrides: object) -> SystemStateRecord:
base: dict[str, object] = {
"kill_switch": 0,
"last_health_check": _NOW,
"config_version": "1.0.0",
"started_at": _NOW - timedelta(hours=1),
}
base.update(overrides)
return SystemStateRecord(**base) # type: ignore[arg-type]
# ---------------------------------------------------------------------------
# is_paused
# ---------------------------------------------------------------------------
def test_is_paused_returns_false_when_state_is_none() -> None:
status = is_paused(None, now=_NOW)
assert status.paused is False
def test_is_paused_returns_false_when_until_is_none() -> None:
status = is_paused(_state(), now=_NOW)
assert status.paused is False
def test_is_paused_returns_true_when_until_in_future() -> None:
status = is_paused(
_state(auto_pause_until=_NOW + timedelta(weeks=2),
auto_pause_reason="DD breach"),
now=_NOW,
)
assert status.paused is True
assert status.reason == "DD breach"
def test_is_paused_returns_false_when_until_in_past() -> None:
status = is_paused(
_state(auto_pause_until=_NOW - timedelta(seconds=1)),
now=_NOW,
)
assert status.paused is False
# ---------------------------------------------------------------------------
# pause_until
# ---------------------------------------------------------------------------
def test_pause_until_adds_days() -> None:
until = pause_until(_NOW, days=14)
assert until == _NOW + timedelta(days=14)
def test_pause_until_clamps_to_one_day_minimum() -> None:
# days <= 0 deve cmq dare almeno 1 giorno di pausa, altrimenti
# la cron giornaliera potrebbe scattare comunque.
assert pause_until(_NOW, days=0) == _NOW + timedelta(days=1)
assert pause_until(_NOW, days=-3) == _NOW + timedelta(days=1)
# ---------------------------------------------------------------------------
# evaluate_drawdown_breach
# ---------------------------------------------------------------------------
def _cfg(**overrides: object) -> AutoPauseConfig:
base: dict[str, object] = {
"enabled": True,
"lookback_trades": 5,
"max_drawdown_pct": Decimal("0.10"),
"pause_days": 14,
}
base.update(overrides)
return AutoPauseConfig(**base) # type: ignore[arg-type]
def test_drawdown_breach_when_enabled_and_threshold_exceeded() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(),
recent_pnl_usd=[Decimal("-50"), Decimal("-60"), Decimal("-40"),
Decimal("-30"), Decimal("-20")], # cum 200 USD
capital_usd=Decimal("1500"),
)
# |200| / 1500 = 0.133 > 0.10
assert decision.should_pause is True
assert decision.reason is not None
assert "rolling DD" in decision.reason
def test_no_breach_when_filter_disabled() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(enabled=False),
recent_pnl_usd=[Decimal("-200")] * 5, # massacro
capital_usd=Decimal("1500"),
)
assert decision.should_pause is False
def test_no_breach_when_lookback_insufficient() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(lookback_trades=5),
recent_pnl_usd=[Decimal("-100")] * 3, # solo 3 trade, serve 5
capital_usd=Decimal("1500"),
)
assert decision.should_pause is False
def test_no_breach_when_cumulative_positive() -> None:
# Anche con tante perdite, se la somma è positiva non scattiamo.
decision = evaluate_drawdown_breach(
cfg=_cfg(),
recent_pnl_usd=[Decimal("-100"), Decimal("-50"),
Decimal("300"), Decimal("-20"), Decimal("-10")],
capital_usd=Decimal("1500"),
)
assert decision.should_pause is False
def test_no_breach_when_below_threshold() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(),
recent_pnl_usd=[Decimal("-30")] * 5, # cum 150 / 1500 = 10% esatto
capital_usd=Decimal("1500"),
)
# esattamente alla soglia (>=) ⇒ pausa armata
assert decision.should_pause is True
def test_no_breach_when_capital_zero_or_negative() -> None:
decision = evaluate_drawdown_breach(
cfg=_cfg(),
recent_pnl_usd=[Decimal("-100")] * 5,
capital_usd=Decimal("0"),
)
assert decision.should_pause is False
+176
View File
@@ -0,0 +1,176 @@
"""TDD per il backfill IV-RV (``scripts/backfill_iv_rv.py``).
Testa solo la parte pura (compute RV + assemblaggio record). I/O HTTP
e SQLite restano nel main del CLI: testati manualmente al deploy.
"""
from __future__ import annotations
import importlib.util
import sys
from datetime import UTC, date, datetime, timedelta
from decimal import Decimal
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parents[2]
def _load_backfill_module() -> object:
"""Load scripts/backfill_iv_rv.py as a module without polluting sys.path."""
spec = importlib.util.spec_from_file_location(
"_cerbero_bite_backfill_iv_rv", REPO_ROOT / "scripts" / "backfill_iv_rv.py"
)
if spec is None or spec.loader is None:
raise RuntimeError("cannot load backfill_iv_rv module")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
@pytest.fixture(scope="module")
def mod():
return _load_backfill_module()
# ---------------------------------------------------------------------------
# compute_rv30d_annualized
# ---------------------------------------------------------------------------
def test_constant_prices_yield_zero_rv(mod) -> None:
closes = [Decimal("100")] * 31 # 30 returns of log(1)=0
rv = mod.compute_rv30d_annualized(closes)
assert rv == Decimal("0")
def test_too_few_closes_raises(mod) -> None:
with pytest.raises(ValueError, match="need at least 31 closes"):
mod.compute_rv30d_annualized([Decimal("100")] * 10)
def test_monotonic_growth_yields_low_rv(mod) -> None:
"""Crescita +1% ogni giorno: log returns costanti → stdev = 0 → RV = 0."""
closes = [Decimal("100") * (Decimal("1.01") ** i) for i in range(31)]
rv = mod.compute_rv30d_annualized(closes)
# Tutti i log returns sono identici (log 1.01) → stdev zero
assert rv == Decimal("0")
def test_alternating_returns_yield_known_rv(mod) -> None:
"""Returns alternati ±2% ogni giorno: stdev nota."""
# closes: 100, 102, 100, 102, ... (ricorda: returns = log(c[i]/c[i-1]))
closes = [Decimal("100")] + [
Decimal("102") if i % 2 == 0 else Decimal("100") for i in range(30)
]
rv = mod.compute_rv30d_annualized(closes)
# |log return| ~ 0.0198, stdev ≈ 0.0198 (alternano segno con media ≈ 0)
# Annualized = 0.0198 * sqrt(365) * 100 ≈ 37.86 vol pts
assert Decimal("36") <= rv <= Decimal("40")
# ---------------------------------------------------------------------------
# build_backfill_records
# ---------------------------------------------------------------------------
def test_build_records_skips_days_without_30d_history(mod) -> None:
"""Per i primi 30 giorni della serie spot, RV30d non è calcolabile."""
today = date(2026, 5, 10)
days = [today - timedelta(days=i) for i in range(45)]
spots = {d.isoformat(): Decimal("100") for d in days}
dvols = {d.isoformat(): Decimal("50") for d in days}
records = mod.build_backfill_records(
asset="ETH",
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=today - timedelta(days=40),
)
# Per ogni record day, servono 30 giorni precedenti di spot.
# Lo spot più vecchio è today-44; quindi il primo giorno computabile
# è today-44+30 = today-14. Cap a oldest_day=today-40 → window day-14..day-0.
assert len(records) == 15 # day-14..day-0 incluso
for r in records:
assert r.asset == "ETH"
assert r.fetch_ok is True
assert r.iv_minus_rv == Decimal("50") # rv=0 con prezzi costanti
assert r.timestamp.tzinfo == UTC
assert r.timestamp.hour == 12
def test_build_records_filters_to_requested_window(mod) -> None:
"""oldest_day applicato come cutoff inferiore inclusivo."""
today = date(2026, 5, 10)
days = [today - timedelta(days=i) for i in range(45)]
spots = {d.isoformat(): Decimal("100") for d in days}
dvols = {d.isoformat(): Decimal("50") for d in days}
records = mod.build_backfill_records(
asset="BTC",
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=today - timedelta(days=5),
)
# day-5..day-0 → 6 record
assert len(records) == 6
record_days = {r.timestamp.date() for r in records}
assert record_days == {today - timedelta(days=i) for i in range(6)}
def test_build_records_skips_days_missing_dvol(mod) -> None:
"""Se manca DVOL per un giorno della finestra, lo si salta (no record)."""
today = date(2026, 5, 10)
days = [today - timedelta(days=i) for i in range(45)]
spots = {d.isoformat(): Decimal("100") for d in days}
dvols = {
d.isoformat(): Decimal("50")
for d in days
if d != today - timedelta(days=2)
}
records = mod.build_backfill_records(
asset="ETH",
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=today - timedelta(days=5),
)
record_days = {r.timestamp.date() for r in records}
assert today - timedelta(days=2) not in record_days
assert len(records) == 5
def test_build_records_skips_days_missing_spot(mod) -> None:
"""Se manca lo spot del giorno target, no record per quel giorno."""
today = date(2026, 5, 10)
days = [today - timedelta(days=i) for i in range(45)]
spots = {
d.isoformat(): Decimal("100")
for d in days
if d != today - timedelta(days=2)
}
dvols = {d.isoformat(): Decimal("50") for d in days}
records = mod.build_backfill_records(
asset="ETH",
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=today - timedelta(days=5),
)
record_days = {r.timestamp.date() for r in records}
assert today - timedelta(days=2) not in record_days
def test_build_records_uses_noon_utc_timestamp(mod) -> None:
today = date(2026, 5, 10)
days = [today - timedelta(days=i) for i in range(35)]
spots = {d.isoformat(): Decimal("100") for d in days}
dvols = {d.isoformat(): Decimal("50") for d in days}
records = mod.build_backfill_records(
asset="ETH",
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=today,
)
assert len(records) == 1
assert records[0].timestamp == datetime(2026, 5, 10, 12, 0, tzinfo=UTC)
+259
View File
@@ -0,0 +1,259 @@
"""TDD per :mod:`cerbero_bite.core.backtest`."""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
from cerbero_bite.config import StrategyConfig, golden_config
from cerbero_bite.core.backtest import (
bs_put_delta,
bs_put_price,
daily_picks,
estimate_credit_eth,
find_strike_for_delta,
normal_cdf,
run_backtest,
simulate_entry_filters,
)
from cerbero_bite.state.models import MarketSnapshotRecord
# ---------------------------------------------------------------------------
# Black-Scholes helpers
# ---------------------------------------------------------------------------
def test_normal_cdf_known_values() -> None:
assert normal_cdf(0.0) == pytest.approx(0.5, abs=1e-6)
assert normal_cdf(1.0) == pytest.approx(0.8413, abs=1e-3)
assert normal_cdf(-1.0) == pytest.approx(0.1587, abs=1e-3)
assert normal_cdf(2.0) == pytest.approx(0.9772, abs=1e-3)
def test_bs_put_price_atm_positive_and_less_than_strike() -> None:
p = bs_put_price(spot=3000, strike=3000, t_years=18 / 365, sigma=0.50)
assert p > 0
assert p < 3000 # cap
def test_bs_put_price_far_otm_close_to_zero() -> None:
p = bs_put_price(spot=3000, strike=1500, t_years=18 / 365, sigma=0.50)
assert 0 <= p < 5 # essentially zero
def test_bs_put_delta_atm_around_minus_half() -> None:
d = bs_put_delta(spot=3000, strike=3000, t_years=18 / 365, sigma=0.50)
assert d == pytest.approx(-0.475, abs=0.05)
def test_bs_put_delta_far_otm_close_to_zero() -> None:
d = bs_put_delta(spot=3000, strike=1500, t_years=18 / 365, sigma=0.50)
assert -0.05 < d <= 0
def test_find_strike_for_delta_monotone() -> None:
spot = 3000.0
dvol = 50.0
dte = 18
s_010 = find_strike_for_delta(
spot=spot, dvol_pct=dvol, dte_days=dte, target_delta_abs=0.10,
)
s_020 = find_strike_for_delta(
spot=spot, dvol_pct=dvol, dte_days=dte, target_delta_abs=0.20,
)
# |Δ|=0.20 (più ITM) ⇒ strike più alto di |Δ|=0.10 (più OTM).
assert s_020 > s_010
# Verifica che il delta corrisponda a target ± tolleranza.
achieved = abs(
bs_put_delta(
spot=spot, strike=s_020, t_years=dte / 365, sigma=dvol / 100,
)
)
assert achieved == pytest.approx(0.20, abs=0.02)
def test_estimate_credit_returns_positive_credit_in_normal_regime() -> None:
credit_eth, short_k, long_k = estimate_credit_eth(
spot=3000, dvol_pct=50, dte_days=18, width_pct=0.04, delta_target_abs=0.12,
)
# Sanity: credit > 0, short_k < spot, long_k = short_k - 4%×spot
assert credit_eth > 0
assert short_k < 3000
assert long_k < short_k
assert short_k - long_k == pytest.approx(0.04 * 3000, abs=1.0)
# ---------------------------------------------------------------------------
# Daily picks + entry filter simulation
# ---------------------------------------------------------------------------
def _snap(
*, ts: datetime,
spot: float = 3000,
dvol: float = 50,
funding: float = 0.0,
macro_d: int | None = None,
asset: str = "ETH",
) -> MarketSnapshotRecord:
return MarketSnapshotRecord(
timestamp=ts,
asset=asset,
spot=Decimal(str(spot)),
dvol=Decimal(str(dvol)),
funding_perp_annualized=Decimal(str(funding)),
funding_cross_annualized=Decimal("0"),
dealer_net_gamma=Decimal("100"),
liquidation_long_risk="low",
liquidation_short_risk="low",
macro_days_to_event=macro_d,
fetch_ok=True,
)
def test_daily_picks_extracts_one_per_calendar_day() -> None:
day1 = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
day2 = datetime(2026, 5, 5, 14, 0, tzinfo=UTC) # Tuesday: PICKED ora (crypto 24/7)
snapshots = [
_snap(ts=day1),
_snap(ts=day1 + timedelta(minutes=15)), # stesso giorno, deduplicato
_snap(ts=day2),
]
picks = daily_picks(snapshots)
assert len(picks) == 2
assert picks[0].timestamp == day1
assert picks[1].timestamp == day2
def test_daily_picks_skips_other_hours() -> None:
snapshots = [
_snap(ts=datetime(2026, 5, 4, 13, 0, tzinfo=UTC)), # 13:00 → skipped
_snap(ts=datetime(2026, 5, 5, 15, 30, tzinfo=UTC)), # 15:30 → skipped
]
assert daily_picks(snapshots) == []
def test_daily_picks_filters_by_asset() -> None:
day = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snapshots = [
_snap(ts=day, asset="BTC"),
_snap(ts=day, asset="ETH"),
]
picks = daily_picks(snapshots, asset="ETH")
assert len(picks) == 1
assert picks[0].snapshot.asset == "ETH"
def test_simulate_entry_filters_accepts_clean_snapshot(
) -> None:
cfg: StrategyConfig = golden_config()
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snap = _snap(ts=monday, dvol=50, funding=0.10)
picks = [
type("MP", (), {"timestamp": monday, "snapshot": snap})() # type: ignore[arg-type]
]
# Hack: build via real dataclass
from cerbero_bite.core.backtest import DailyPick
picks = [DailyPick(timestamp=monday, snapshot=snap)]
results = simulate_entry_filters(picks, cfg, capital_usd=Decimal("1500"))
assert len(results) == 1
assert results[0].accepted is True
def test_simulate_entry_filters_rejects_dvol_out_of_band() -> None:
cfg = golden_config()
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snap = _snap(ts=monday, dvol=20, funding=0.10) # below 35
from cerbero_bite.core.backtest import DailyPick
picks = [DailyPick(timestamp=monday, snapshot=snap)]
results = simulate_entry_filters(picks, cfg, capital_usd=Decimal("1500"))
assert results[0].accepted is False
assert any("dvol" in r.lower() for r in results[0].reasons)
def test_simulate_entry_filters_skips_incomplete_snapshot() -> None:
cfg = golden_config()
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
incomplete = MarketSnapshotRecord(
timestamp=monday, asset="ETH", spot=Decimal("3000"),
# dvol=None ⇒ skipped
fetch_ok=False,
)
from cerbero_bite.core.backtest import DailyPick
picks = [DailyPick(timestamp=monday, snapshot=incomplete)]
results = simulate_entry_filters(picks, cfg, capital_usd=Decimal("1500"))
assert results[0].accepted is False
assert results[0].skipped_for_data is True
# ---------------------------------------------------------------------------
# Full pipeline (sintetico)
# ---------------------------------------------------------------------------
def _synthetic_year_of_snapshots(
*,
n_weeks: int = 8,
spot: float = 3000,
dvol: float = 60, # con skew_premium 1.5 ⇒ credit/width ≈ 35% (sopra soglia 30%)
funding: float = 0.10,
) -> list[MarketSnapshotRecord]:
"""Genera N settimane di snapshot sintetici ETH a 4 tick/settimana."""
rows: list[MarketSnapshotRecord] = []
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
for week in range(n_weeks):
base = monday + timedelta(weeks=week)
# Lunedì 14:00 è il pick
rows.append(_snap(ts=base, spot=spot, dvol=dvol, funding=funding))
# Tick intermedi che NON cadono alle 14:00:
# offset +1h (=15:00) così vengono ignorati da `daily_picks`.
for d in (2, 8, 14, 19):
rows.append(
_snap(
ts=base + timedelta(days=d, hours=1),
spot=spot * (1 + 0.005 * d), # +0.5% al giorno
dvol=dvol - 1.5 * d, # vol che scende lentamente
funding=funding,
)
)
return rows
def test_run_backtest_produces_report_with_trades() -> None:
# Per il test scaliamo il credit/width gate al 15%: il modello BS
# senza skew completo sottostima i premi OTM rispetto al reale.
# Vedi `estimate_credit_eth.skew_premium` docstring per dettagli.
from cerbero_bite.config.schema import StructureConfig
cfg = golden_config()
cfg = cfg.model_copy(
update={
"structure": StructureConfig(
**{
**cfg.structure.model_dump(),
"credit_to_width_ratio_min": Decimal("0.15"),
}
)
}
)
snapshots = _synthetic_year_of_snapshots(n_weeks=4)
report = run_backtest(snapshots, cfg, capital_usd=Decimal("1500"))
# Sanity: 4 picks, almeno 1 trade chiuso
assert report.n_picks == 4
assert report.n_completed >= 1
assert report.cumulative_pnl_usd != Decimal("0")
# Bull-put + ETH al rialzo + DVOL che scende ⇒ atteso win
assert report.n_winners >= 1
def test_run_backtest_handles_no_picks_gracefully() -> None:
cfg = golden_config()
# Solo tick infrasettimanali, niente Monday 14:00.
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snapshots = [_snap(ts=monday + timedelta(hours=1))]
report = run_backtest(snapshots, cfg, capital_usd=Decimal("1500"))
assert report.n_picks == 0
assert report.n_completed == 0
assert report.cumulative_pnl_usd == Decimal("0")
+143
View File
@@ -329,3 +329,146 @@ def test_build_bear_call_breakeven_above_short_strike(
# breakeven = 3525 + 15 = 3540
assert proposal.breakeven == Decimal("3540")
assert proposal.spread_type == "bear_call"
# ---------------------------------------------------------------------------
# §3.2 (A): dynamic delta target by DVOL regime
# ---------------------------------------------------------------------------
def _cfg_with_delta_bands(cfg: StrategyConfig) -> StrategyConfig:
"""Profilo con step-function delta su DVOL.
Vol bassa (50) delta 0.15 (più premio), vol media (70)
0.12 (default), vol alta (90) 0.10 (più safety distance).
"""
from cerbero_bite.config.schema import (
DeltaByDvolBand,
ShortStrikeSpec,
StructureConfig,
)
bands = [
DeltaByDvolBand(
dvol_under=Decimal("50"),
delta_target=Decimal("0.15"),
delta_min=Decimal("0.13"),
delta_max=Decimal("0.17"),
),
DeltaByDvolBand(
dvol_under=Decimal("70"),
delta_target=Decimal("0.12"),
delta_min=Decimal("0.10"),
delta_max=Decimal("0.15"),
),
DeltaByDvolBand(
dvol_under=Decimal("90"),
delta_target=Decimal("0.10"),
delta_min=Decimal("0.08"),
delta_max=Decimal("0.12"),
),
]
new_short = ShortStrikeSpec(
**{**cfg.structure.short_strike.model_dump(), "delta_by_dvol": bands}
)
return cfg.model_copy(
update={
"structure": StructureConfig(
**{**cfg.structure.model_dump(exclude={"short_strike"}),
"short_strike": new_short}
)
}
)
def _bull_put_chain_wide(now_dt: datetime) -> list[OptionQuote]:
"""Chain con shorts e longs per delta 0.10, 0.12, 0.15.
I mid sono tarati per superare il credit/width 30% per ogni
accoppiamento shortlong testato (vedi commento §3.4).
"""
return [
# Shorts a delta 0.10 / 0.12 / 0.15 in OTM range [15-25%].
_quote(strike="2535", delta="-0.15", mid="0.026", now_dt=now_dt),
_quote(strike="2475", delta="-0.12", mid="0.020", now_dt=now_dt),
_quote(strike="2400", delta="-0.10", mid="0.015", now_dt=now_dt),
# Long candidati ~4% sotto ciascuno short.
_quote(strike="2415", delta="-0.10", mid="0.012", now_dt=now_dt),
_quote(strike="2355", delta="-0.08", mid="0.006", now_dt=now_dt),
_quote(strike="2280", delta="-0.06", mid="0.002", now_dt=now_dt),
]
def test_dynamic_delta_low_dvol_picks_higher_delta(
cfg: StrategyConfig, now: datetime
) -> None:
"""DVOL=40 → banda con delta_target=0.15."""
cfg_dyn = _cfg_with_delta_bands(cfg)
chain = _bull_put_chain_wide(now)
res = select_strikes(
chain=chain,
bias="bull_put",
spot=Decimal("3000"),
now=now,
cfg=cfg_dyn,
dvol_now=Decimal("40"),
)
assert res is not None
short, _ = res
assert short.delta == Decimal("-0.15")
def test_dynamic_delta_mid_dvol_picks_default_delta(
cfg: StrategyConfig, now: datetime
) -> None:
"""DVOL=60 → banda con delta_target=0.12."""
cfg_dyn = _cfg_with_delta_bands(cfg)
chain = _bull_put_chain_wide(now)
res = select_strikes(
chain=chain,
bias="bull_put",
spot=Decimal("3000"),
now=now,
cfg=cfg_dyn,
dvol_now=Decimal("60"),
)
assert res is not None
short, _ = res
assert short.delta == Decimal("-0.12")
def test_dynamic_delta_high_dvol_picks_lower_delta(
cfg: StrategyConfig, now: datetime
) -> None:
"""DVOL=85 → banda con delta_target=0.10 (più safety distance)."""
cfg_dyn = _cfg_with_delta_bands(cfg)
chain = _bull_put_chain_wide(now)
res = select_strikes(
chain=chain,
bias="bull_put",
spot=Decimal("3000"),
now=now,
cfg=cfg_dyn,
dvol_now=Decimal("85"),
)
assert res is not None
short, _ = res
assert short.delta == Decimal("-0.10")
def test_dynamic_delta_disabled_default_uses_static_delta(
cfg: StrategyConfig, now: datetime
) -> None:
"""delta_by_dvol vuoto (default) → comportamento invariato."""
chain = _bull_put_chain_wide(now)
res = select_strikes(
chain=chain,
bias="bull_put",
spot=Decimal("3000"),
now=now,
cfg=cfg, # golden config: delta_by_dvol=[]
dvol_now=Decimal("40"),
)
assert res is not None
short, _ = res
# Delta target statico = 0.12, quindi torna lo strike a -0.12.
assert short.delta == Decimal("-0.12")
+1 -1
View File
@@ -68,7 +68,7 @@ def test_compute_hash_is_independent_of_recorded_hash_value(tmp_path: Path) -> N
def test_load_repo_strategy_yaml(tmp_path: Path) -> None:
"""The committed strategy.yaml validates with the recorded hash."""
result = load_strategy(REPO_ROOT / "strategy.yaml")
assert result.config.config_version == "1.0.0"
assert result.config.config_version == "1.4.0"
assert result.config.sizing.kelly_fraction == Decimal("0.13")
assert result.computed_hash == result.config.config_hash
+241
View File
@@ -194,6 +194,62 @@ def test_dealer_gamma_filter_disabled_in_config(cfg: StrategyConfig) -> None:
assert decision.accepted is True
# ---------------------------------------------------------------------------
# IV richness gate (§2.9)
# ---------------------------------------------------------------------------
def _strict_iv_rv_cfg(
cfg: StrategyConfig, *, threshold: Decimal = Decimal("5")
) -> StrategyConfig:
return golden_config(
entry=EntryConfig(
**{
**cfg.entry.model_dump(),
"iv_minus_rv_filter_enabled": True,
"iv_minus_rv_min": threshold,
}
)
)
def test_iv_richness_gate_disabled_by_default_lets_thin_premium_pass(
cfg: StrategyConfig,
) -> None:
# Default config: filter disabled. Anche con IV-RV negativa (RV>IV)
# l'entry deve passare per non rompere setup pre-calibrazione.
decision = validate_entry(_good_ctx(iv_minus_rv=Decimal("-2")), cfg)
assert decision.accepted is True
def test_iv_richness_gate_blocks_when_below_floor(cfg: StrategyConfig) -> None:
strict = _strict_iv_rv_cfg(cfg, threshold=Decimal("5"))
decision = validate_entry(_good_ctx(iv_minus_rv=Decimal("3")), strict)
assert decision.accepted is False
assert any("IV richness" in r for r in decision.reasons)
def test_iv_richness_gate_passes_when_above_floor(cfg: StrategyConfig) -> None:
strict = _strict_iv_rv_cfg(cfg, threshold=Decimal("5"))
decision = validate_entry(_good_ctx(iv_minus_rv=Decimal("6")), strict)
assert decision.accepted is True
def test_iv_richness_gate_passes_at_exact_threshold(cfg: StrategyConfig) -> None:
# Soglia inclusiva: IV-RV == soglia → accettato (gate è "<", non "<=").
strict = _strict_iv_rv_cfg(cfg, threshold=Decimal("5"))
decision = validate_entry(_good_ctx(iv_minus_rv=Decimal("5")), strict)
assert decision.accepted is True
def test_iv_richness_gate_skipped_when_data_missing(cfg: StrategyConfig) -> None:
# MCP irraggiungibile: best-effort skip, non bloccare l'entry per
# un problema di infrastruttura.
strict = _strict_iv_rv_cfg(cfg, threshold=Decimal("5"))
decision = validate_entry(_good_ctx(iv_minus_rv=None), strict)
assert decision.accepted is True
def test_validate_entry_accumulates_all_reasons(cfg: StrategyConfig) -> None:
decision = validate_entry(
_good_ctx(
@@ -295,3 +351,188 @@ def test_bias_zero_division_safe(cfg: StrategyConfig) -> None:
# eth_30d_ago == 0 must not crash; treat as no-bias (neutral)
ctx = _trend(eth_now="3000", eth_30d_ago="0", funding_cross="0", dvol="60", adx="15")
assert compute_bias(ctx, cfg) is None
# ---------------------------------------------------------------------------
# IV-RV adaptive gate
# ---------------------------------------------------------------------------
def _adaptive_cfg(**entry_overrides: object) -> StrategyConfig:
"""Golden config con gate adattivo abilitato di default per test."""
base_entry: dict[str, object] = {
"iv_minus_rv_filter_enabled": True,
"iv_minus_rv_adaptive_enabled": True,
"iv_minus_rv_min": Decimal("0"),
"iv_minus_rv_percentile": Decimal("0.25"),
"iv_minus_rv_window_target_days": 60,
"iv_minus_rv_window_min_days": 30,
}
base_entry.update(entry_overrides)
return golden_config(entry=base_entry)
def test_adaptive_pass_when_iv_rv_above_p25() -> None:
cfg = _adaptive_cfg()
history = tuple(Decimal(i) for i in range(1, 201))
decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("80"),
iv_rv_history=history,
iv_rv_n_days=30,
),
cfg,
)
assert decision.accepted is True
assert not any("IV richness" in r for r in decision.reasons)
def test_adaptive_blocks_when_iv_rv_below_p25() -> None:
cfg = _adaptive_cfg()
history = tuple(Decimal(i) for i in range(1, 201))
decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("20"),
iv_rv_history=history,
iv_rv_n_days=30,
),
cfg,
)
assert decision.accepted is False
assert any("IV richness" in r and "rolling" in r for r in decision.reasons)
def test_adaptive_with_n_days_zero_passes_warmup() -> None:
"""Warmup hard: nessun giorno coperto → gate skip (fail-open)."""
cfg = _adaptive_cfg()
decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("0.1"),
iv_rv_history=(),
iv_rv_n_days=0,
),
cfg,
)
assert decision.accepted is True
def test_adaptive_with_floor_floor_binds_when_p25_low() -> None:
cfg = _adaptive_cfg(iv_minus_rv_min=Decimal("3"))
history = tuple(Decimal("0.5") for _ in range(200))
decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("1"),
iv_rv_history=history,
iv_rv_n_days=30,
),
cfg,
)
assert decision.accepted is False
assert any("IV richness" in r for r in decision.reasons)
def test_legacy_static_gate_still_works_when_adaptive_disabled() -> None:
cfg = golden_config(entry={
"iv_minus_rv_filter_enabled": True,
"iv_minus_rv_adaptive_enabled": False,
"iv_minus_rv_min": Decimal("3"),
})
decision = validate_entry(
_good_ctx(iv_minus_rv=Decimal("2"), iv_rv_history=(), iv_rv_n_days=0),
cfg,
)
assert decision.accepted is False
assert any("IV richness below floor" in r for r in decision.reasons)
def test_iv_minus_rv_none_skips_gate_in_both_modes() -> None:
cfg = _adaptive_cfg()
decision = validate_entry(
_good_ctx(
iv_minus_rv=None,
iv_rv_history=tuple(Decimal(i) for i in range(1, 201)),
iv_rv_n_days=30,
),
cfg,
)
assert decision.accepted is True
def test_adaptive_with_n_days_one_uses_history_for_percentile() -> None:
"""Singolo giorno disponibile (cadenza qualunque): gate attivo,
soglia = P25 della finestra ricevuta. Dimostra che il warmup hard
finisce a n_days=1 (non 30 come nella vecchia implementazione)."""
cfg = _adaptive_cfg()
history = tuple(Decimal(i) for i in range(1, 101)) # 1..100, P25 = 25.75
# IV-RV sopra P25 → pass
pass_decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("30"),
iv_rv_history=history,
iv_rv_n_days=1,
),
cfg,
)
assert pass_decision.accepted is True
# IV-RV sotto P25 → block
block_decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("10"),
iv_rv_history=history,
iv_rv_n_days=1,
),
cfg,
)
assert block_decision.accepted is False
assert any("IV richness" in r and "rolling" in r for r in block_decision.reasons)
# ---------------------------------------------------------------------------
# Vol-of-Vol guard
# ---------------------------------------------------------------------------
def _vov_cfg(threshold: Decimal = Decimal("5")) -> StrategyConfig:
return golden_config(entry={
"vol_of_vol_guard_enabled": True,
"vol_of_vol_threshold_pt": threshold,
"vol_of_vol_lookback_hours": 24,
})
def test_vov_guard_blocks_on_large_dvol_shift() -> None:
cfg = _vov_cfg()
decision = validate_entry(
_good_ctx(dvol_now=Decimal("56"), dvol_24h_ago=Decimal("50")), cfg
)
assert decision.accepted is False
assert any("DVOL shifted" in r for r in decision.reasons)
def test_vov_guard_passes_on_small_dvol_shift() -> None:
cfg = _vov_cfg()
decision = validate_entry(
_good_ctx(dvol_now=Decimal("52"), dvol_24h_ago=Decimal("50")), cfg
)
assert decision.accepted is True
def test_vov_guard_passes_when_lookback_missing() -> None:
"""fail-open su gap dati: se dvol_24h_ago=None il guard non scatta."""
cfg = _vov_cfg()
decision = validate_entry(
_good_ctx(dvol_now=Decimal("99"), dvol_24h_ago=None), cfg
)
# dvol_now=99 sarebbe oltre dvol_max=90; testiamo solo l'effetto VoV
# consultando le reasons (dvol_now potrebbe avere altre reason ma non
# quella VoV).
assert not any("DVOL shifted" in r for r in decision.reasons)
def test_vov_guard_disabled_does_nothing() -> None:
cfg = golden_config(entry={"vol_of_vol_guard_enabled": False})
decision = validate_entry(
_good_ctx(dvol_now=Decimal("55"), dvol_24h_ago=Decimal("50")), cfg
)
# Nessuna reason VoV (il delta=5 sarebbe oltre soglia se attivo)
assert not any("DVOL shifted" in r for r in decision.reasons)
+88
View File
@@ -271,3 +271,91 @@ def test_iron_condor_adverse_move_either_direction(cfg: StrategyConfig) -> None:
)
res = evaluate(snap, cfg)
assert res.action == "CLOSE_AVERSE"
# ---------------------------------------------------------------------------
# §7-bis (D): vol-collapse harvest
# ---------------------------------------------------------------------------
def _harvest_cfg(
cfg: StrategyConfig, *, threshold: str = "15"
) -> StrategyConfig:
"""Clona la golden config con la soglia di vol-harvest abilitata."""
from cerbero_bite.config import ExitConfig
return cfg.model_copy(
update={
"exit": ExitConfig(
**{
**cfg.exit.model_dump(),
"vol_harvest_dvol_decrease": Decimal(threshold),
}
)
}
)
def test_vol_harvest_disabled_by_default_does_not_fire(cfg: StrategyConfig) -> None:
# Default: vol_harvest_dvol_decrease = 0 ⇒ filtro disabilitato.
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.022", # in profit (debit < credit)
dvol_at_entry="60",
dvol_now="40", # crollato di 20 punti
)
res = evaluate(snap, cfg)
assert res.action == "HOLD"
def test_vol_harvest_fires_when_dvol_collapsed_in_profit(
cfg: StrategyConfig,
) -> None:
harvest = _harvest_cfg(cfg, threshold="15")
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.022", # in profit ma sopra profit_take 50%
dvol_at_entry="60",
dvol_now="42", # 18, supera la soglia 15
)
res = evaluate(snap, harvest)
assert res.action == "CLOSE_VOL_HARVEST"
assert "harvest" in res.reason
def test_vol_harvest_does_not_fire_when_in_loss(cfg: StrategyConfig) -> None:
# Anche se DVOL crolla, se siamo in perdita non vogliamo harvest:
# è una funzione di "esci con il profitto in mano", non un panico.
harvest = _harvest_cfg(cfg, threshold="15")
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.040", # debit > credit ⇒ in perdita
dvol_at_entry="60",
dvol_now="42",
)
res = evaluate(snap, harvest)
assert res.action != "CLOSE_VOL_HARVEST"
def test_vol_harvest_does_not_fire_below_threshold(cfg: StrategyConfig) -> None:
harvest = _harvest_cfg(cfg, threshold="15")
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.022",
dvol_at_entry="60",
dvol_now="50", # 10, sotto la soglia 15
)
res = evaluate(snap, harvest)
assert res.action == "HOLD"
def test_profit_take_wins_over_vol_harvest(cfg: StrategyConfig) -> None:
# Quando il profit-take è già colpito, non passiamo per vol-harvest.
harvest = _harvest_cfg(cfg, threshold="15")
snap = _snapshot(
credit_received_eth="0.030",
mark_combo_now_eth="0.014", # ≤ 50% credit ⇒ profit-take
dvol_at_entry="60",
dvol_now="42",
)
res = evaluate(snap, harvest)
assert res.action == "CLOSE_PROFIT"
+45
View File
@@ -164,3 +164,48 @@ async def test_returns_zero_for_empty_assets(tmp_path: Path) -> None:
ctx = _ctx(tmp_path)
n = await collect_market_snapshot(ctx, assets=(), now=_now())
assert n == 0
def _read_dvol_history(ctx: MagicMock) -> list[dict]:
import sqlite3
conn = connect(ctx.db_path)
conn.row_factory = sqlite3.Row
try:
rows = conn.execute(
"SELECT * FROM dvol_history ORDER BY timestamp"
).fetchall()
finally:
conn.close()
return [dict(r) for r in rows]
@pytest.mark.asyncio
async def test_eth_snapshot_mirrors_into_dvol_history(tmp_path: Path) -> None:
ctx = _ctx(tmp_path)
await collect_market_snapshot(ctx, assets=("ETH", "BTC"), now=_now())
rows = _read_dvol_history(ctx)
assert len(rows) == 1
assert Decimal(str(rows[0]["dvol"])) == Decimal("55")
assert Decimal(str(rows[0]["eth_spot"])) == Decimal("3000")
@pytest.mark.asyncio
async def test_btc_only_snapshot_does_not_touch_dvol_history(
tmp_path: Path,
) -> None:
ctx = _ctx(tmp_path)
await collect_market_snapshot(ctx, assets=("BTC",), now=_now())
assert _read_dvol_history(ctx) == []
@pytest.mark.asyncio
async def test_eth_snapshot_skips_dvol_history_when_dvol_missing(
tmp_path: Path,
) -> None:
ctx = _ctx(tmp_path)
ctx.deribit.latest_dvol = AsyncMock(side_effect=RuntimeError("no dvol"))
await collect_market_snapshot(ctx, assets=("ETH",), now=_now())
# market_snapshots row still persisted, but dvol_history must stay empty
# because its schema enforces NOT NULL on dvol/eth_spot.
assert _read_dvol_history(ctx) == []
+395
View File
@@ -0,0 +1,395 @@
"""TDD per i nuovi helper repository del gate IV-RV adattivo.
Spec: distinct-days policy il caller (entry_cycle) interroga il
numero di giorni coperti separatamente dai valori della finestra,
così che cadenze miste (tick live 15min + backfill daily) restino
statisticamente coerenti.
Helpers:
* ``count_iv_rv_distinct_days(asset, max_days, as_of) -> int``
* ``iv_rv_values_for_window(asset, window_days, as_of) -> list[Decimal]``
* ``dvol_lookback`` (invariato, riusato dal Vol-of-Vol guard)
"""
from __future__ import annotations
import sqlite3
from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
from cerbero_bite.state.db import connect, run_migrations
from cerbero_bite.state.models import MarketSnapshotRecord
from cerbero_bite.state.repository import Repository
def _snap(
*,
ts: datetime,
asset: str = "ETH",
iv_minus_rv: Decimal | None = Decimal("2"),
fetch_ok: bool = True,
dvol: Decimal = Decimal("50"),
) -> MarketSnapshotRecord:
return MarketSnapshotRecord(
timestamp=ts,
asset=asset,
spot=Decimal("2000"),
dvol=dvol,
realized_vol_30d=Decimal("48"),
iv_minus_rv=iv_minus_rv,
funding_perp_annualized=Decimal("0"),
funding_cross_annualized=Decimal("0"),
dealer_net_gamma=Decimal("0"),
gamma_flip_level=None,
oi_delta_pct_4h=None,
liquidation_long_risk="low",
liquidation_short_risk="low",
macro_days_to_event=None,
fetch_ok=fetch_ok,
fetch_errors_json=None,
)
@pytest.fixture
def db_one_day(tmp_path) -> sqlite3.Connection:
"""SQLite temp con 96 tick ETH a 15min (1 giorno) e fetch_ok=1."""
conn = connect(str(tmp_path / "test.sqlite"))
run_migrations(conn)
repo = Repository()
base = datetime(2026, 5, 1, 0, 0, tzinfo=UTC)
for i in range(96):
repo.record_market_snapshot(
conn,
_snap(
ts=base + timedelta(minutes=15 * i),
iv_minus_rv=Decimal("2") + Decimal(i) / Decimal("100"),
dvol=Decimal("50") + Decimal(i) / Decimal("10"),
),
)
conn.commit()
return conn
@pytest.fixture
def db_three_days_mixed(tmp_path) -> sqlite3.Connection:
"""SQLite temp con 3 giorni ETH:
- day1 (2026-05-01): 96 tick @ 15min, valori 1..96
- day2 (2026-05-02): 1 record daily a 12:00, valore 100 (backfill style)
- day3 (2026-05-03): 4 tick orari, valori 200, 201, 202, 203
Più 1 giorno BTC isolato (per cross-asset isolation).
"""
conn = connect(str(tmp_path / "test.sqlite"))
run_migrations(conn)
repo = Repository()
day1 = datetime(2026, 5, 1, 0, 0, tzinfo=UTC)
for i in range(96):
repo.record_market_snapshot(
conn,
_snap(
ts=day1 + timedelta(minutes=15 * i),
iv_minus_rv=Decimal(i + 1),
),
)
repo.record_market_snapshot(
conn,
_snap(ts=datetime(2026, 5, 2, 12, 0, tzinfo=UTC), iv_minus_rv=Decimal("100")),
)
day3 = datetime(2026, 5, 3, 0, 0, tzinfo=UTC)
for i in range(4):
repo.record_market_snapshot(
conn,
_snap(
ts=day3 + timedelta(hours=i),
iv_minus_rv=Decimal(200 + i),
),
)
repo.record_market_snapshot(
conn,
_snap(
ts=datetime(2026, 4, 30, 0, 0, tzinfo=UTC),
asset="BTC",
iv_minus_rv=Decimal("999"),
),
)
conn.commit()
return conn
# ---------------------------------------------------------------------------
# count_iv_rv_distinct_days
# ---------------------------------------------------------------------------
def test_count_distinct_days_returns_one_for_single_day_history(db_one_day) -> None:
repo = Repository()
n = repo.count_iv_rv_distinct_days(
db_one_day,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert n == 1
def test_count_distinct_days_returns_zero_for_other_asset(db_one_day) -> None:
repo = Repository()
n = repo.count_iv_rv_distinct_days(
db_one_day,
asset="BTC",
max_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert n == 0
def test_count_distinct_days_counts_unique_calendar_days(
db_three_days_mixed,
) -> None:
repo = Repository()
n = repo.count_iv_rv_distinct_days(
db_three_days_mixed,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert n == 3
def test_count_distinct_days_excludes_other_assets(
db_three_days_mixed,
) -> None:
repo = Repository()
n_btc = repo.count_iv_rv_distinct_days(
db_three_days_mixed,
asset="BTC",
max_days=60,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert n_btc == 1
def test_count_distinct_days_respects_window_cutoff(
db_three_days_mixed,
) -> None:
"""max_days=1 da as_of=2026-05-04 → cutoff=2026-05-03 → solo day3."""
repo = Repository()
n = repo.count_iv_rv_distinct_days(
db_three_days_mixed,
asset="ETH",
max_days=1,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert n == 1
def test_count_distinct_days_excludes_null_iv_rv(tmp_path) -> None:
conn = connect(str(tmp_path / "test.sqlite"))
run_migrations(conn)
repo = Repository()
repo.record_market_snapshot(
conn,
_snap(ts=datetime(2026, 5, 1, 12, 0, tzinfo=UTC), iv_minus_rv=None),
)
conn.commit()
n = repo.count_iv_rv_distinct_days(
conn,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert n == 0
def test_count_distinct_days_excludes_fetch_failed(tmp_path) -> None:
conn = connect(str(tmp_path / "test.sqlite"))
run_migrations(conn)
repo = Repository()
repo.record_market_snapshot(
conn,
_snap(
ts=datetime(2026, 5, 1, 12, 0, tzinfo=UTC),
iv_minus_rv=Decimal("99"),
fetch_ok=False,
),
)
conn.commit()
n = repo.count_iv_rv_distinct_days(
conn,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert n == 0
def test_count_distinct_days_rejects_naive_as_of(db_one_day) -> None:
repo = Repository()
with pytest.raises(ValueError, match="timezone-aware"):
repo.count_iv_rv_distinct_days(
db_one_day,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 2, 0, 0), # naive
)
def test_count_distinct_days_rejects_non_positive_max_days(db_one_day) -> None:
repo = Repository()
with pytest.raises(ValueError, match="max_days must be positive"):
repo.count_iv_rv_distinct_days(
db_one_day,
asset="ETH",
max_days=0,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
# ---------------------------------------------------------------------------
# iv_rv_values_for_window
# ---------------------------------------------------------------------------
def test_values_for_window_returns_ordered_asc(db_one_day) -> None:
repo = Repository()
values = repo.iv_rv_values_for_window(
db_one_day,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert len(values) == 96
assert values == sorted(values)
assert values[0] == Decimal("2.00")
def test_values_for_window_filters_other_asset(db_one_day) -> None:
repo = Repository()
values = repo.iv_rv_values_for_window(
db_one_day,
asset="BTC",
window_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert values == []
def test_values_for_window_skips_null(db_one_day) -> None:
repo = Repository()
repo.record_market_snapshot(
db_one_day,
_snap(ts=datetime(2026, 5, 2, 0, 0, tzinfo=UTC), iv_minus_rv=None),
)
db_one_day.commit()
values = repo.iv_rv_values_for_window(
db_one_day,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 3, 0, 0, tzinfo=UTC),
)
assert len(values) == 96
def test_values_for_window_skips_fetch_failed(db_one_day) -> None:
repo = Repository()
repo.record_market_snapshot(
db_one_day,
_snap(
ts=datetime(2026, 5, 3, 0, 0, tzinfo=UTC),
iv_minus_rv=Decimal("99"),
fetch_ok=False,
),
)
db_one_day.commit()
values = repo.iv_rv_values_for_window(
db_one_day,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert Decimal("99") not in values
def test_values_for_window_respects_window_cutoff(
db_three_days_mixed,
) -> None:
"""window_days=1 da as_of=2026-05-04 → solo day3 (4 valori 200..203)."""
repo = Repository()
values = repo.iv_rv_values_for_window(
db_three_days_mixed,
asset="ETH",
window_days=1,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert values == [Decimal(200 + i) for i in range(4)]
def test_values_for_window_full_window(db_three_days_mixed) -> None:
"""window_days=60: tutti i valori dei 3 giorni (96 + 1 + 4 = 101)."""
repo = Repository()
values = repo.iv_rv_values_for_window(
db_three_days_mixed,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert len(values) == 101
def test_values_for_window_rejects_naive_as_of(db_one_day) -> None:
repo = Repository()
with pytest.raises(ValueError, match="timezone-aware"):
repo.iv_rv_values_for_window(
db_one_day,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 2, 0, 0),
)
def test_values_for_window_rejects_non_positive_window(db_one_day) -> None:
repo = Repository()
with pytest.raises(ValueError, match="window_days must be positive"):
repo.iv_rv_values_for_window(
db_one_day,
asset="ETH",
window_days=0,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
# ---------------------------------------------------------------------------
# dvol_lookback (regression — invariato dopo refactor)
# ---------------------------------------------------------------------------
def test_dvol_lookback_returns_closest_tick(db_one_day) -> None:
repo = Repository()
base = datetime(2026, 5, 1, 0, 0, tzinfo=UTC)
target = base + timedelta(hours=12)
out = repo.dvol_lookback(
db_one_day, asset="ETH", reference=target, tolerance_minutes=15
)
# i=48 → dvol = 50 + 4.8 = 54.8
assert out == Decimal("54.8")
def test_dvol_lookback_returns_none_when_gap(db_one_day) -> None:
repo = Repository()
target = datetime(2025, 1, 1, 0, 0, tzinfo=UTC)
out = repo.dvol_lookback(
db_one_day, asset="ETH", reference=target, tolerance_minutes=15
)
assert out is None
def test_dvol_lookback_rejects_naive_reference(db_one_day) -> None:
repo = Repository()
with pytest.raises(ValueError, match="timezone-aware"):
repo.dvol_lookback(
db_one_day,
asset="ETH",
reference=datetime(2026, 5, 1, 12, 0),
)
+2 -1
View File
@@ -144,7 +144,8 @@ def test_aggregate_cap_at_975_reduces_to_one(cfg: StrategyConfig) -> None:
def test_concurrent_positions_at_cap_returns_zero(cfg: StrategyConfig) -> None:
res = compute_contracts(_ctx(capital_usd="1500", other_open_positions=1), cfg)
# Default cap = 5 (entry daily). other_open_positions=5 ⇒ cap raggiunto.
res = compute_contracts(_ctx(capital_usd="1500", other_open_positions=5), cfg)
assert res.n_contracts == 0
assert res.reason_if_zero is not None
assert "position" in res.reason_if_zero.lower()