25 Commits

Author SHA1 Message Date
root 76d1a4a32d chore(gitignore): ignore .omc/ (oh-my-claudecode session/memory dir)
Directory creata localmente dall'infrastruttura OMC per stato sessione
(project-memory.json, research/, sessions/, state/). Non è artefatto
del progetto cerbero-bite.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 09:08:00 +00:00
root e978a44bff feat(gui): Strategia pannello P/L con slider sizing + fix max_loss
Pannello "P/L atteso — Conservativa vs Aggressiva":
* Sostituiti slider Capitale/Spot con slider parametrici Cap/trade
  (EUR) + posizioni concorrenti. Il capitale richiesto viene calcolato
  in automatico via Kelly-binding aggregato:
  capital = cap_pertrade_usd × concorrenza / max(kelly, 1e-3).
* Profili Conservativa/Aggressiva ora ereditano dai yaml SOLO le leve
  qualitative (width_pct, credit_ratio, kelly_fraction, feature
  attive); le leve di sizing (cap, concorrenza) sono comandate dagli
  slider per confronti omogenei.
* Tre metriche header: capitale richiesto, cap aggregato notional,
  cap per trade USD.

Fix in `_compute_pl`:
* Max loss per contratto era `width` (errato per credit spread).
  Corretto a `width − credit` allineato a core/sizing_engine.py.
  Effetto: n_kelly aumenta proporzionalmente al credit incassato →
  P/L stimato più realistico per spread con credit_to_width_ratio
  alto (es. 0.30+ in profilo Aggressiva).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 09:00:42 +00:00
root efa829f7aa feat(runtime): orchestrator option-chain snapshot multi-asset (ETH+BTC)
Sostituisce `option_chain_asset: str = "ETH"` con
`option_chain_assets: tuple[str, ...] = ("ETH", "BTC")` e itera nel
job schedulato. Coerente con `market_snapshot_assets` già multi-asset
e con i 64 strikes BTC + 51 strikes ETH già visibili in
option_chain_snapshots.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 09:00:24 +00:00
root b1836d91c2 refactor(core): IV-RV adattivo distinct-days policy + backfill Deribit
Sblocca il warmup hard del gate IV-RV adattivo (~21 giorni residui)
permettendo di mischiare cadenze diverse (tick live 15min + backfill
giornaliero) senza assumere il fattore costante 96 tick/giorno.

API change (no backwards-compat shims):
* compute_adaptive_threshold(history, *, n_days, percentile,
  absolute_floor): rimossi `min_days`/`target_days`. La selezione
  finestra (target_days/min_days/intera storia) si sposta al caller.
  Warmup hard quando `n_days == 0`.
* repository: rimosso `iv_rv_history`; aggiunti
  `count_iv_rv_distinct_days` (COUNT DISTINCT substr(ts,1,10)) e
  `iv_rv_values_for_window`.
* EntryContext aggiunge `iv_rv_n_days: int = 0`. entry_cycle calcola
  n_days, sceglie window_days e popola il context. Audit
  `iv_rv_n_days` reale (non più len/96).
* GUI Calibrazione: counter giorni distinti tramite set di date.
* Spec aggiornata con errata 2026-05-10 e nuova warmup table.

Backfill (scripts/backfill_iv_rv.py, stdlib-only):
* Fetch DVOL daily + ETH/BTC-PERPETUAL closes da Deribit public REST.
* Calcolo RV30d annualizzato (stdev log-return × √365 × 100).
* INSERT OR REPLACE in market_snapshots con timestamp 12:00 UTC e
  fetch_errors_json='{"backfill":true}' per distinzione audit.
* Compute layer testato (9 test): RV su prezzi costanti/monotoni/
  alternati, build_records con cutoff e missing data.

Verifica live post-deploy (10 mag 2026 08:50 UTC):
* ETH: n_days=46, P25=2.21 vol pt, IV-RV=10.05 → gate PASS
* BTC: n_days=46, P25=5.69 vol pt, IV-RV=8.60  → gate PASS

509 test passati (500 esistenti + 9 backfill), ruff pulito.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 08:52:05 +00:00
root 6f4f2ce02e feat(runtime): audit log include threshold rolling e window usata
Risponde al final review (spec §6.4): il decisions log ora
contiene iv_rv_threshold_used (la soglia P_q effettivamente
applicata) e iv_rv_window_used_days (giorni di history nella
finestra). Permette ricostruire ex-post perché un trade è stato
saltato e con quali numeri.

Helper privi di I/O — la soglia viene ricomputata in base alla
history già caricata, costo trascurabile.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:27:40 +00:00
root d2ff29fde3 fix(config): aggressiva config_hash ricalcolato post adaptive gate
L'hash dichiarato non rispecchiava più il contenuto del file dopo
l'attivazione del gate adattivo (commit 080acf8). Senza questo fix
il loader sollevava ConfigHashError e l'orchestrator rifiutava il
profilo Aggressiva al boot.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:26:24 +00:00
root eb0662e44d chore(lint): adaptive_threshold imports + entry_validator top-level import
- Sequence importato da collections.abc invece di typing (PYI001).
- compute_adaptive_threshold spostato a top-level (PLC0415):
  niente circular dep risk perché adaptive_threshold non importa
  da entry_validator.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:20:54 +00:00
root 64f4d4e09e feat(gui): pannello informativo Gate IV-RV adattivo in Calibrazione
Mostra status (warmup/attivo), soglia P25 rolling corrente, IV-RV
ultimo tick, floor assoluto, decisione hypothetical e sezione
Vol-of-Vol guard. Read-only: i percentili statici esistenti
restano per analisi.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:14:34 +00:00
root 080acf829d feat(config): profilo Aggressiva attiva gate IV-RV adattivo + VoV
P25 rolling 60g, warmup a finestra disponibile, VoV guard 5pt.
Conservativa e golden invariati (default disabled).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:11:46 +00:00
root 98111814d2 test(integration): IV-RV adaptive gate end-to-end con SQLite reale
Verifica integrazione tra Repository.iv_rv_history,
compute_adaptive_threshold e dvol_lookback su un DB reale
seedato con 30 giorni di market_snapshots bimodale.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:09:27 +00:00
root 3190764f64 feat(runtime): entry_cycle popola iv_rv_history e dvol_24h_ago
Quando i flag adaptive_enabled / vol_of_vol_guard_enabled sono
attivi, entry_cycle carica history e lookback dal repository
prima di costruire EntryContext. Il decisions log riceve i meta
n_history e dvol_24h_ago per audit ex-post.

Quando i flag sono off, niente query DB extra (zero overhead).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:04:19 +00:00
root 8221aba10f fix(state): repository iv_rv_history time-stable + input validation
Risponde alla code review di 395191e:
- iv_rv_history accetta as_of (default now UTC) invece di
  affidarsi al clock SQLite, rendendo i test time-stable.
- Valida max_days > 0 e raise se as_of/reference sono naive.
- Aggiunge 3 test sulle nuove guard.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 23:00:06 +00:00
root 395191ea13 feat(state): Repository.iv_rv_history + dvol_lookback per gate adaptive
Due nuovi metodi che leggono market_snapshots filtrando NULL e
fetch_ok=0. iv_rv_history limita a max_days; dvol_lookback trova
il tick più vicino a un istante con tolerance configurabile.

Tests: ordered ASC, asset filter, NULL skip, fetch_ok=0 skip,
lookback closest, gap returns None.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 22:53:19 +00:00
root d36cdff609 feat(core): Vol-of-Vol guard in validate_entry + tests
Blocca entry se |DVOL_now - DVOL_24h_ago| >= threshold (default
5 pt). Fail-open quando dvol_24h_ago è None (gap dati). Independente
dal gate IV-RV: i due gate sono additivi.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 22:36:59 +00:00
root 3a5cf2554b feat(core): IV-RV adaptive gate in validate_entry + tests
Quando iv_minus_rv_adaptive_enabled=True, la soglia diventa
max(P_q rolling, iv_minus_rv_min). Path legacy (statico) e
None-bypass restano invariati.

Aggiunge anche due model_validator a StrategyConfig per
fail-fast su config invalida (window_min_days < target_days,
percentile in (0,1)) — risponde alla code review T1.

Tests: pass/skip su rolling, warmup hard, floor binding,
backwards compat statico, None bypass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 22:29:48 +00:00
root ef3c512684 feat(core): EntryContext aggiunge iv_rv_history e dvol_24h_ago
Campi opzionali con default vuoto/None per non rompere i caller
esistenti. Saranno popolati da entry_cycle quando i flag
adaptive_enabled / vol_of_vol_guard_enabled sono True.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 22:24:43 +00:00
root 6eff8aab0f fix(core): adaptive_threshold input validation + boundary tests
Risponde alla code review di 7dc2fda:
- Valida percentile in [0,1] e 0 < min_days < target_days, raise
  ValueError quando out-of-range. Fail-fast invece di IndexError o
  silent wrong result.
- Aggiunge test boundary esattamente a min_days*96 e target_days*96
  (spec §9.1 item 9 era mancante).
- Aggiunge 4 test sulle nuove guards.
- Fix typo docstring (Determinismic → Deterministic).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 22:11:17 +00:00
root 7dc2fda524 feat(core): compute_adaptive_threshold pure function + tests
Implementa il calcolo del percentile rolling con warmup,
transizione min_days → target_days e floor assoluto. Pure
function senza I/O: il caller passa la sequenza pre-filtrata
(NULL e fetch_ok=0 esclusi).

Tests: warmup, transizione finestra, floor, percentili.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 21:36:50 +00:00
root 0fcfff7d7e feat(config): EntryConfig campi adaptive IV-RV gate + VoV guard
Aggiunge i flag e i parametri per il gate IV-RV adattivo (P25
rolling) e per il Vol-of-Vol guard. Default disabilitati per
non cambiare comportamento dei profili attuali.

Vedi docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 20:10:07 +00:00
root f889258952 docs(plans): IV-RV adaptive gate implementation plan
Piano TDD bite-sized in 11 task con steps dettagliati, codice
completo, comandi e expected output. Coverage completa dello
spec 2026-05-08-iv-rv-adaptive-gate-design.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 19:58:32 +00:00
root 2a4a82c8ef docs(specs): IV-RV adaptive gate design
Spec del gate IV-RV adattivo (P25 rolling 60g + Vol-of-Vol guard 5pt
24h) — riprende roadmap §4-quater di 13-strategia-spiegata.md punti
1 e 2 e li promuove a design pronto per implementazione.

Decisioni emerse dal brainstorming:
- Hybrid (percentile rolling + VoV guard), non regime detection
- Window target 60g, min 30g, sotto usa storia disponibile (warmup)
- Floor assoluto via vecchio iv_minus_rv_min (backwards compat)
- Inline nel validator, stateless, no DB cache
- GUI Calibrazione: pannello informativo, slider esistenti invariati
- Fail-open su tutti i casi di dato mancante

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 19:50:56 +00:00
root 467c8952e3 docs(migrations): 0005 — commento allineato a cadenza */15 reale
Il commento dichiarava cron settimanale (55 13 * * MON) ma lo
scheduler reale (orchestrator._CRON_OPTION_CHAIN_SNAPSHOT) è */15
24/7, allineato a market_snapshot. Aggiornato per evitare confusione
nei lettori futuri. Anche fixato l'header file (0004 → 0005).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 16:37:31 +00:00
root 3aaa059417 fix(deribit): DVOL midnight — finestra 1D estesa a ieri+oggi
Alle 00:00 UTC Deribit non ha ancora costruito il candle 1D di oggi:
con start_date=oggi la response è vuota e il client tirava
McpDataAnomalyError ('neither latest nor candles'). Includendo ieri
nello start_date, candles[-1] resta valido come fallback.

Verificato sui dati raccolti: 3 fail consecutivi 2026-05-02/03/04 a
00:00 UTC su ETH, zero fail dal 2026-05-05 in poi (container
rebuildato in mezzo al periodo).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 16:37:23 +00:00
root a2e7a78f8a feat(data): mirror ETH spot+DVOL in dvol_history dal market_snapshot
Popola dvol_history dentro la stessa transazione di market_snapshots,
così lo storico è disponibile anche in modalità data-only (STRATEGY=false).
Evita il warm-up vuoto di return_4h quando si abilita la strategia: il
monitor_cycle trova subito i campioni locali invece di dipendere dal
fallback Deribit get_historical.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 21:59:27 +00:00
root 6ff021fbf4 feat(strategy): abbandono gating settimanale — entry daily 24/7
Crypto opera 24/7: la cadenza settimanale lunedì-only era un retaggio
TradFi senza giustificazione. La nuova cadenza è giornaliera (cron
0 14 * * *), con i gate quantitativi a decidere se entrare o saltare.

Cambiamenti principali:

* runtime/orchestrator.py — _CRON_ENTRY 0 14 * * * (era MON)
* runtime/auto_pause.py — pause_until(days=) (era weeks=); minimo
  clamp 1 giorno (era 1 settimana)
* core/backtest.py — MondayPick→DailyPick, monday_picks→daily_picks
  (1 pick per calendar-day all'ora target); Sharpe annualization su
  ~120 trade/anno (era 52)
* config/schema.py — default cron daily; max_concurrent_positions 1→5;
  AutoPauseConfig.pause_weeks→pause_days, default 14
* runtime/option_chain_snapshot_cycle.py + orchestrator — cron */15
  per accumulo continuo dataset di backtest empirico

Strategy yamls (config_version 1.3.0 → 1.4.0, hash rigenerati):

* strategy.yaml — max_concurrent 1→5, cap_aggregate coerente
* strategy.aggressiva.yaml — max_concurrent 2→8, cap_aggregate
  3200→6400, max_contracts_per_trade invariato a 16
* strategy.conservativa.yaml — max_concurrent 1→3
* tutti — pause_weeks→pause_days: 14

GUI (pages/7_📚_Strategia.py):

* slider Trade/anno: range 20-200 (era 8-30), default 110, help
  riallineato sulla math 365 candidature × pass-rate 30-40%
* card profili: versione letta dinamicamente da config_version invece
  che hard-coded "v1.2.0"
* warning "entrambi perdono soldi" ora valuta i P/L effettivi
  (cons['annual_pl'], aggr['annual_pl']) invece del win_rate grezzo;
  aggiunto stato intermedio quando solo conservativo è in perdita

Tests (450/450 passati):

* test_auto_pause: pause_days, clamp ≥1 giorno
* test_backtest: rinomina + ridisegno daily picks (assert su
  calendar-day dedupe e hour filter)
* test_sizing_engine: other_open_positions=5 per cap default
* test_config_loader: version 1.4.0

Docs (README + 9 file in docs/) — tutti i riferimenti weekly/lunedì
allineati a daily/24-7, volume option_chain ricalcolato per cron
*/15 (~1.1 MB/giorno, ~400 MB/anno).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 16:21:16 +00:00
43 changed files with 4116 additions and 218 deletions
+3
View File
@@ -43,3 +43,6 @@ data/
.env
.env.*
!.env.example
# Tooling state (oh-my-claudecode session/memory dir)
.omc/
+5 -2
View File
@@ -15,7 +15,10 @@ attiva, sizing Quarter Kelly e disciplina di uscita rigida.
- **Gestione attiva:** profit take 50% credito, stop loss 1.5× credito,
vol stop +10 punti DVOL, time stop 7 DTE, exit su short strike testato
(|delta| ≥ 0.30). Su ETH **non si difende rollando**: si esce.
- **Frequenza:** apertura ogni 7 giorni, una posizione alla volta.
- **Frequenza:** candidatura giornaliera (cron `0 14 * * *`, crypto è 24/7),
fino a **5 posizioni concorrenti** sul profilo principale (3 sul
conservativo, 8 sull'aggressivo). I gate quantitativi decidono se entrare;
nei giorni in cui falliscono, niente trade.
Il sistema è **deterministico**: nessun LLM partecipa al decision loop.
Le regole sono codificate, le soglie sono parametri di configurazione,
@@ -48,7 +51,7 @@ leggere in ordine per chi implementa.
| `docs/03-algorithms.md` | Specifiche dettagliate dei sette algoritmi core |
| `docs/04-mcp-integration.md` | Mappa dei tool MCP usati e contratti |
| `docs/05-data-model.md` | Schema persistenza posizioni, log, KB |
| `docs/06-operational-flow.md` | Flussi operativi: avvio, settimanale, monitoring |
| `docs/06-operational-flow.md` | Flussi operativi: avvio, entry daily, monitoring |
| `docs/07-risk-controls.md` | Kill switch, cap, dead-man, audit |
| `docs/08-testing-validation.md` | TDD, paper trading, golden tests |
| `docs/09-development-roadmap.md` | Fasi di sviluppo e milestone |
+1 -1
View File
@@ -36,7 +36,7 @@ per imparare, ma **non sta nel loop di esecuzione**.
### Cosa fa Cerbero Bite
1. Legge dati di mercato dagli MCP (Deribit, Hyperliquid, sentiment, macro).
2. Valuta condizioni di entrata su finestra temporale fissa (settimanale).
2. Valuta condizioni di entrata su finestra temporale fissa (giornaliera, 14:00 UTC; crypto è 24/7).
3. Calcola la struttura ottimale dello spread secondo le regole.
4. Verifica liquidità, cap di rischio, calendar macro.
5. Calcola sizing in contratti.
+11 -9
View File
@@ -19,10 +19,12 @@ Sorgente teorica: `Cerbero_Office/NewStrategy/strategia-credit-spread-eth.md`
## 2. Trigger di apertura (entry)
Il rule engine valuta l'apertura di un nuovo trade **una sola volta al
giorno**, alle **14:00 UTC** del lunedì (orario UE pomeridiano stabile,
fuori dai picchi di funding statunitensi). Se il lunedì è festività
italiana, l'engine ignora la regola e attende il lunedì successivo.
Il rule engine valuta l'apertura di un nuovo trade **una volta al
giorno**, alle **14:00 UTC** (orario UE pomeridiano stabile, fuori
dai picchi di funding statunitensi). Crypto è 24/7: non c'è un "giorno
buono" intrinseco, sono i gate quantitativi a decidere se entrare o
saltare. Se il giorno è festività italiana e `skip_holidays_country`
è attivo, l'engine attende il giorno successivo.
Una nuova posizione viene aperta **solo se tutte** le seguenti condizioni
sono vere:
@@ -70,7 +72,7 @@ Razionale: il selling vol nudo è strutturalmente neutro a win-rate
per il razionale completo.
Se anche **una sola** condizione fallisce → **no entry**, log con motivo,
ritento la settimana successiva.
ritento il giorno successivo.
## 3. Selezione struttura
@@ -256,13 +258,13 @@ ogni entry-cycle:
> Se `auto_pause.enabled` e P/L cumulato delle ultime
> `lookback_trades` posizioni chiuse < `max_drawdown_pct ×
> capitale_corrente`, l'engine si auto-mette in pausa per
> `pause_weeks` settimane (skip-week mode).
> `pause_days` giorni (skip-day mode).
Difende dai regime change non rilevati dai filtri quant. La pausa
si annulla automaticamente alla scadenza, oppure manualmente con
`UPDATE system_state SET auto_pause_until = NULL`. Default
disabilitato; profilo aggressivo: lookback 5 trade, soglia 15%, 2
settimane di pausa.
disabilitato; profilo aggressivo: lookback 5 trade, soglia 15%, 14
giorni di pausa.
## 8. Esecuzione di apertura
@@ -296,7 +298,7 @@ settimane di pausa.
durante un trade aperto).
- Non aggiusta strike o size dopo l'apertura.
- Non apre nuovi trade per "compensare" perdite recenti.
- Non opera fuori dalla finestra del lunedì 14:00 UTC, eccetto chiusure.
- Non opera fuori dalla finestra delle 14:00 UTC, eccetto chiusure.
- Non deroga ai cap nemmeno per "opportunità eccezionali".
- Non si aggiorna automaticamente: nuovo set di regole = nuovo deploy
con review esplicita.
+1 -1
View File
@@ -122,7 +122,7 @@ Cerbero_Bite/
│ │ ├── lockfile.py # fcntl.flock single-instance
│ │ ├── alert_manager.py # severity routing
│ │ ├── health_check.py # ping + 3-strikes kill switch
│ │ ├── entry_cycle.py # weekly entry auto-execute
│ │ ├── entry_cycle.py # daily entry auto-execute (crypto 24/7)
│ │ ├── monitor_cycle.py # 12h exit auto-execute
│ │ └── recovery.py # state reconcile al boot
│ ├── state/ # persistenza
+10 -7
View File
@@ -220,11 +220,12 @@ NULL = engine attivo.
### `option_chain_snapshots`
Snapshot della catena opzioni Deribit prelevata settimanalmente
(cron `55 13 * * MON`, 5 minuti prima del trigger entry). Ogni
tick contiene un quote per strumento entro la finestra
`[dte_min, dte_max]` di config; tutti i quote prelevati nello stesso
tick condividono ``timestamp``. Migration `0005`.
Snapshot della catena opzioni Deribit prelevata in continuo
(cron `*/15 * * * *`, allineato a `market_snapshots`). Crypto è
24/7: l'accumulo dataset deve essere continuo, non gateato sulla
settimana. Ogni tick contiene un quote per strumento entro la
finestra `[dte_min, dte_max]` di config; tutti i quote prelevati
nello stesso tick condividono ``timestamp``. Migration `0005`.
```sql
CREATE TABLE option_chain_snapshots (
@@ -259,8 +260,10 @@ sugli strike candidati al picker.
con prezzi reali invece di Black-Scholes), la calibrazione empirica
dello skew premium, la validazione ex-post dello strike picker.
Volume atteso: ~50 strike × 3 scadenze × 1 snapshot/settimana ×
17 colonne ≈ 12 KB/settimana, ~600 KB/anno.
Volume atteso (cron `*/15 * * * *`, ~96 snapshot/giorno):
~50 strike × 3 scadenze × 96 snap/giorno × 17 colonne ≈ ~1.1 MB/giorno,
~400 MB/anno. Considerare politiche di retention (archive trimestrale
in parquet) se il bot gira a lungo.
## Log file
+6 -4
View File
@@ -27,9 +27,11 @@ L'avvio è progettato per essere **safe**: se qualcosa non torna, il
sistema si rifiuta di operare. Mai partire con uno stato dubbio o un
ambiente diverso da quello atteso.
## Flusso 2 — Settimanale (entry)
## Flusso 2 — Daily (entry)
Trigger: cron `0 14 * * MON` (lunedì 14:00 UTC).
Trigger: cron `0 14 * * *` (ogni giorno 14:00 UTC). Crypto è 24/7:
la cadenza di candidatura non è gateata sulla settimana — sono i gate
quantitativi a decidere se entrare o saltare il giorno.
```
START
@@ -219,7 +221,7 @@ proposed
| Cron | Trigger | Frequenza |
|---|---|---|
| `0 14 * * MON` | Entry evaluation | Settimanale |
| `0 14 * * *` | Entry evaluation | Giornaliera |
| `0 2,14 * * *` | Position monitoring | 2× giorno |
| `0 12 1 * *` | Kelly recalibration | Mensile |
| `*/5 * * * *` | Health check | 5 min |
@@ -237,7 +239,7 @@ Il bot riconosce due interruttori indipendenti, letti da
| Variabile d'ambiente | Default | Cosa abilita |
|---|---|---|
| `CERBERO_BITE_ENABLE_DATA_ANALYSIS` | `true` | Job `market_snapshot` ogni 15 min: raccolta dati MCP, scrittura tabella `market_snapshots`, calibrazione soglie. |
| `CERBERO_BITE_ENABLE_STRATEGY` | `false` | Job `entry` (lunedì 14:00 UTC) e `monitor` (2× giorno): valutazione regole §2-§9 di `01-strategy-rules.md` e proposta/esecuzione ordini. |
| `CERBERO_BITE_ENABLE_STRATEGY` | `false` | Job `entry` (daily 14:00 UTC) e `monitor` (2× giorno): valutazione regole §2-§9 di `01-strategy-rules.md` e proposta/esecuzione ordini. |
I job di infrastruttura (`health`, `backup`, `manual_actions`) sono
**sempre attivi**, indipendentemente dai flag, perché tengono in vita il
+1 -1
View File
@@ -54,7 +54,7 @@ cerbero-bite kill-switch disarm --reason "<motivo>" \
L'operazione è transazionale: SQLite `system_state.kill_switch = 0` +
una linea `KILL_SWITCH_DISARMED` nella audit chain con il motivo. Il
disarm non riavvia automaticamente lo scheduler; è il prossimo tick
naturale (entry settimanale o monitor 12h) a far ripartire la
naturale (entry giornaliero o monitor 12h) a far ripartire la
decisione.
## Cap di rischio (oltre alle regole di strategia)
+5 -5
View File
@@ -157,9 +157,9 @@ chain:
| Test | Scenario |
|---|---|
| `test_weekly_open_happy_path` | Tutto OK → proposta inviata |
| `test_weekly_open_no_strike_available` | Chain vuota nel range delta |
| `test_weekly_open_macro_blocks` | FOMC entro 5 giorni |
| `test_daily_open_happy_path` | Tutto OK → proposta inviata |
| `test_daily_open_no_strike_available` | Chain vuota nel range delta |
| `test_daily_open_macro_blocks` | FOMC entro 5 giorni |
| `test_monitor_profit_take` | Mark = 50% credito → close_profit |
| `test_monitor_vol_stop` | DVOL +12 → close_vol |
| `test_recovery_after_crash_open_position` | Crash mid-fill, restart, riconcilia |
@@ -175,8 +175,8 @@ checked-in.
```
tests/golden/
├── 2026-04-27_weekly_open_bull_put.yaml # input snapshot
├── 2026-04-27_weekly_open_bull_put.golden # output atteso
├── 2026-04-27_daily_open_bull_put.yaml # input snapshot
├── 2026-04-27_daily_open_bull_put.golden # output atteso
└── runner.py
```
+2 -2
View File
@@ -114,7 +114,7 @@ Tasks:
4. `runtime/alert_manager.py` — escalation policy
Test integration su scenari completi (vedi `08-testing-validation.md`):
- weekly open happy path
- daily open happy path
- monitor profit take
- monitor vol stop
- recovery dopo crash
@@ -206,7 +206,7 @@ Setup:
Metriche da raccogliere:
- Numero proposte settimanali emesse
- Numero proposte giornaliere emesse
- Quante passano i filtri
- Win rate, avg P&L paper
- Discrepanze tra mid stimato e fill reale (slippage)
+2 -2
View File
@@ -24,8 +24,8 @@ asset:
# === ENTRY ===
entry:
# finestra di valutazione settimanale
cron: "0 14 * * MON" # lunedì 14:00 UTC
# finestra di valutazione giornaliera (crypto 24/7)
cron: "0 14 * * *" # ogni giorno 14:00 UTC
skip_holidays_country: "IT"
# filtri di accesso (vedi 01-strategy-rules.md §2)
+24 -21
View File
@@ -12,7 +12,7 @@
## TL;DR
Cerbero Bite vende **credit spread settimanali su ETH/Deribit** quando
Cerbero Bite vende **credit spread su ETH/Deribit (DTE 14-21, valutati ogni giorno)** quando
la volatilità implicita è **abbastanza alta da pagare bene**, il
mercato non è in **stress di liquidazione**, non ci sono **eventi macro
forti** in finestra, e il bias direzionale è **chiaro** (bull o bear).
@@ -22,8 +22,9 @@ strategia.
Ogni 15 minuti raccoglie 1 riga per asset (ETH e BTC) nella tabella
`market_snapshots`. Quei dati alimentano tre obiettivi distinti:
1. **Decisione live** — l'entry ciclo del lunedì 14:00 UTC legge i
campi più freschi per dire "go/no-go".
1. **Decisione live** — l'entry ciclo daily alle 14:00 UTC legge i
campi più freschi per dire "go/no-go" (crypto è 24/7: la cadenza
non è gateata sulla settimana, decidono i gate quantitativi).
2. **Monitoring continuo** — il decision loop di gestione attiva
confronta la situazione con quella all'apertura.
3. **Calibrazione** — la pagina `📐 Calibrazione` usa la distribuzione
@@ -197,7 +198,7 @@ Quanto segue è la versione "leggibile" delle regole §2-§9 di
`01-strategy-rules.md`. Ogni passo cita i campi di
`market_snapshots` che lo alimentano.
### Fase 1 — Trigger (lunedì 14:00 UTC, festività italiane escluse)
### Fase 1 — Trigger (daily 14:00 UTC, festività italiane escluse se `skip_holidays_country` è on)
```
SE NESSUNA posizione aperta
@@ -211,7 +212,7 @@ SE NESSUNA posizione aperta
ALLORA
procedi alla Fase 2
ALTRIMENTI
no entry, log motivo, ritento la settimana successiva
no entry, log motivo, ritento il giorno successivo
```
### Fase 2 — Bias e struttura
@@ -363,8 +364,8 @@ capitale **non aumenta** i contratti per trade.
### Frequenza realistica di entry
La regola si valuta una volta a settimana, ma la maggioranza dei
lunedì viene saltata per:
La regola si valuta **una volta al giorno** (crypto è 24/7), ma la
maggioranza dei giorni viene saltata per:
| Motivo di skip | Frequenza tipica |
|---|---|
@@ -372,11 +373,12 @@ lunedì viene saltata per:
| Bias non chiaro (trend × funding discordi o entrambi neutri senza IC) | 2535% |
| Macro entro DTE | 1020% |
| Funding o liquidation risk fuori soglia | 515% |
| Capitale o sizing insufficiente | 05% |
| Capitale, sizing insufficiente o concurrency cap raggiunto | 515% |
**Risultato netto: 3050% delle settimane finisce in entry effettiva
⇒ 1525 trade / anno** (52 lunedì × 3050%). Le altre settimane il
bot sta fermo. È il design.
**Risultato netto: ~3040% dei giorni finisce in entry effettiva
⇒ 110145 trade / anno** (365 candidature × pass-rate, capped da
`max_concurrent_positions`). I restanti giorni il bot sta fermo:
è il design — la disciplina è la strategia.
### Win-rate atteso (short delta 0.12 + profit-take 50%)
@@ -450,11 +452,11 @@ In modalità data-only (oggi) il P/L atteso è **0** — l'engine
2. **Validare** i filtri quant osservando ex-post quanti tick
sarebbero stati filtrati (vedi pagina `📐 Calibrazione`, colonna
"% bloccato dalla soglia").
3. **Misurare** la quota effettiva di lunedì che superano i filtri
3. **Misurare** la quota effettiva di giorni che superano i filtri
nel proprio regime, prima di committare capitale.
> Suggerimento: 4 settimane di dati = 4 lunedì × probabilità entry =
> 12 candidate entry effettive. **Aspettare almeno 8 settimane**
> Suggerimento: 30 giorni di dati = 30 candidature × probabilità entry
> ≈ 912 candidate entry effettive. **Aspettare almeno 60 giorni**
> prima di tarare le soglie dà uno storico con dispersione
> sufficiente per decisioni non-rumorose.
@@ -560,8 +562,8 @@ step di calibrazione, vedi §4-quinquies in roadmap).
- **Conservativa / golden config**: `enabled=false, min=0`. Tutti i
setup passano questo gate, anche con IV-RV negativa. Motivo: nei
primi 8 turni di lunedì non si hanno abbastanza tick per stabilire
che soglia ha senso nel proprio regime. Lasciamo la pagina
primi 60 giorni non si hanno abbastanza tick per stabilire che
soglia ha senso nel proprio regime. Lasciamo la pagina
`📐 Calibrazione` mostrare la distribuzione e poi alziamo
manualmente.
- **Aggressiva**: `enabled=true, min=3`. Il profilo aggressivo già di
@@ -599,7 +601,8 @@ questa logica: muovi da 0.72 a 0.78 e vedi l'APR scattare.
In aggiunta a `market_snapshots` (cron `*/15`), il bot raccoglie
ora una seconda fonte di dati: la **catena opzioni Deribit completa**
ogni lunedì alle 13:55 UTC (5 minuti prima del trigger entry).
ogni 15 minuti (cron `*/15`, allineato a `market_snapshots` — crypto
è 24/7, l'accumulo dataset deve essere continuo).
Tabella `option_chain_snapshots` — vedi `05-data-model.md` per lo
schema. Cosa registra per ogni strumento entro la finestra
@@ -628,7 +631,7 @@ schema. Cosa registra per ogni strumento entro la finestra
- `cerbero-bite option-chain trigger` — esegue UNA volta il
collector senza aspettare il cron. Utile per test e per popolare
prima del primo lunedì utile.
prima del primo tick utile.
- `cerbero-bite option-chain analyze [--bias bull_put|bear_call]`
legge l'ultimo snapshot, simula il selector di strike con la
strategy passata e stampa: short/long strike, delta, width,
@@ -643,9 +646,9 @@ Tre euristiche operative sui campi raccolti:
1. **Premio "ricco":** `iv_minus_rv` consistentemente > 5 punti per
N giorni → il regime sta pagando bene la vendita di vol. Sono i
periodi in cui la strategia ha edge maggiore.
2. **Premio "magro":** `dvol < 35` per più giorni → la finestra del
lunedì viene saltata. Non è un fallimento: è la disciplina che
funziona.
2. **Premio "magro":** `dvol < 35` per più giorni → la finestra
giornaliera viene saltata. Non è un fallimento: è la disciplina
che funziona.
3. **Stress imminente:** `liquidation_*_risk = high` o spike di
`oi_delta_pct_4h` (> 5% in valore assoluto) + funding ai limiti
→ atteso vol stop / time stop attivi nei prossimi cicli, anche
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,316 @@
# IV-RV adaptive entry gate — design
**Status**: drafted, awaiting implementation plan
**Date**: 2026-05-08
**Author**: brainstorming session (operator + Claude)
**Roadmap origin**: `docs/13-strategia-spiegata.md` §4-quater, hardening punti 1 e 2
## 1. Problema
Il gate IV-richness in `core/entry_validator.py:140-152` confronta `ctx.iv_minus_rv` con la soglia statica `entry.iv_minus_rv_min` (config). I dati raccolti in `market_snapshots` mostrano due problemi sul campione 2026-05-01 → 2026-05-08:
| metrica | ETH | BTC |
|---|---|---|
| IV-RV p25 | 1.87 | 5.48 |
| IV-RV p50 | 2.70 | 6.88 |
| IV-RV p90 | 8.52 | 8.26 |
| Pearson(\|drift%\|, IV-RV mean) | **0.02** | **+0.54** |
| Trend giornaliero IV-RV mean | 1.64 → 8.96 (+5.4×) | 4.78 → 8.11 (+1.7×) |
ETH mostra un IV richening monotono **decoupled dal drift realised** — la soglia statica `min=3` (profilo Aggressiva) avrebbe escluso il 50%+ dei tick nei primi 4 giorni e il 0% degli ultimi 3, sopra una distribuzione che non è stazionaria. Su BTC è meno drammatico ma il problema è strutturale: il regime di IV cambia, la soglia no.
## 2. Obiettivo
Sostituire la soglia statica con un meccanismo adattivo che si auto-calibra al regime corrente, senza richiedere intervento manuale dell'operatore. Il gate deve restare semanticamente "non vendere vol senza margine misurabile sopra la RV", solo che il "margine misurabile" è ora derivato dalla distribuzione storica recente invece che hardcoded.
## 3. Approccio scelto: Hybrid (P25 rolling + Vol-of-Vol guard)
Decisione presa nel brainstorming dopo aver scartato:
- **Solo percentile rolling**: insufficiente, non protegge da regime shift bruschi (DVOL salta di 5+ pt in 24h)
- **Solo regime detection (HMM/cluster)**: troppo opaco e ad alto rischio di overfit con 8 giorni di dati
L'hybrid bilancia due controlli additivi:
1. **Soglia adattiva** = P25 di IV-RV nella finestra rolling
2. **Vol-of-Vol guard** = blocco se |ΔDVOL_24h| ≥ 5 pt (regime shift detector)
## 4. Comportamento del gate
> **Errata 2026-05-10** — design originale assumeva `n_days = len(history) // 96` (cadenza fissa 96 tick/giorno). Refattorizzato a **distinct-days policy**: il caller interroga il repository per (a) il numero di giorni di calendario distinti coperti e (b) i valori della finestra scelta. Questo permette di mischiare cadenze (tick live 15 min + backfill daily) senza assumere un fattore costante. Sotto il pseudo-codice aggiornato.
```
def validate_iv_richness_adaptive(ctx, cfg, repo):
if not cfg.entry.iv_minus_rv_filter_enabled:
return PASS # gate off
# 1) Soglia adattiva — distinct-days policy
n_days = repo.count_iv_rv_distinct_days(
asset=ctx.asset, max_days=cfg.window_target_days,
)
if n_days < 1:
return PASS # warmup hard: nessun giorno coperto
if n_days >= cfg.window_target_days:
window_days = cfg.window_target_days # ≥60g → finestra fissa 60g
elif n_days >= cfg.window_min_days:
window_days = cfg.window_min_days # 30-60g → finestra fissa 30g
else:
window_days = cfg.window_target_days # 1-30g → query tutta la storia disp.
history = repo.iv_rv_values_for_window(
asset=ctx.asset, window_days=window_days,
)
threshold = max(percentile(history, cfg.percentile),
cfg.absolute_floor)
if ctx.iv_minus_rv < threshold:
return SKIP("IV richness below P25 rolling")
# 2) Vol-of-vol guard (additivo)
if cfg.vol_of_vol_guard_enabled:
dvol_24h_ago = repo.dvol_lookback(asset=ctx.asset, hours=24)
if dvol_24h_ago is not None and \
abs(ctx.dvol - dvol_24h_ago) >= cfg.vol_of_vol_threshold:
return SKIP("DVOL shifted ≥5pt in 24h")
return PASS
```
### 4.1 Warmup behavior
Tutte le soglie sono espresse in **giorni di calendario distinti** coperti da almeno un record valido (`fetch_ok=1``iv_minus_rv IS NOT NULL`).
| storia disponibile | finestra usata | comportamento |
|---|---|---|
| 0 giorni distinti | — | gate disabled (PASS), log `GATE_WARMUP_INSUFFICIENT` |
| 1 g ≤ giorni < 30 g | tutta la storia | percentile della finestra disponibile (decisione utente) |
| 30 g ≤ giorni < 60 g | ultimi 30 g | finestra fissa 30g |
| ≥ 60 g | ultimi 60 g | finestra fissa 60g (target) |
I valori della finestra contribuiscono uno-a-uno al percentile: un tick a 15 min e un record di backfill daily hanno lo stesso peso. Mix di cadenze diverse è statisticamente sbilanciato finché i tick live non saturano la finestra; questa è una scelta deliberata per non rinunciare allo storico backfill.
### 4.2 Soglia = `max(P25, floor)`
`floor` è il vecchio `iv_minus_rv_min` riutilizzato come *absolute floor*. Permette:
- backwards compat: se `adaptive_enabled=False`, comportamento identico ad oggi
- safety: anche se P25 storico fosse ≈0 (regime IV bassa persistente), l'operatore può tenere un floor minimo (es. 1 vol pt) per evitare di vendere vol mai
## 5. Schema config (`config/schema.py`)
Aggiunte alla classe `EntryConfig`:
```python
class EntryConfig(BaseModel):
# campi esistenti
iv_minus_rv_filter_enabled: bool = False
iv_minus_rv_min: Decimal = Decimal("0") # ora è absolute_floor
# nuovi — gate adattivo
iv_minus_rv_adaptive_enabled: bool = False
iv_minus_rv_percentile: Decimal = Decimal("0.25")
iv_minus_rv_window_target_days: int = 60
iv_minus_rv_window_min_days: int = 30
# nuovi — vol-of-vol guard
vol_of_vol_guard_enabled: bool = False
vol_of_vol_threshold_pt: Decimal = Decimal("5")
vol_of_vol_lookback_hours: int = 24
```
### 5.1 Profili predefiniti
**Conservativa / golden** (`config/golden.yaml`):
```yaml
entry:
iv_minus_rv_filter_enabled: false
iv_minus_rv_adaptive_enabled: false
vol_of_vol_guard_enabled: false
```
Comportamento invariato rispetto a oggi.
**Aggressiva** (`config/aggressive.yaml`):
```yaml
entry:
iv_minus_rv_filter_enabled: true
iv_minus_rv_adaptive_enabled: true
iv_minus_rv_min: 0 # floor 0, lascia decidere il P25 rolling
iv_minus_rv_percentile: 0.25
iv_minus_rv_window_target_days: 60
iv_minus_rv_window_min_days: 30
vol_of_vol_guard_enabled: true
vol_of_vol_threshold_pt: 5
```
### 5.2 Backwards compat
Se `iv_minus_rv_adaptive_enabled=False` e `iv_minus_rv_filter_enabled=True`, il validator usa il path legacy `iv_rv < iv_minus_rv_min` esattamente come oggi. Nessuna regressione comportamentale per chi non ha attivato l'adaptive.
## 6. Architettura
### 6.1 Modulo `core/adaptive_threshold.py`
Funzione pura, testabile senza I/O. La selezione della finestra è
delegata al caller (separation of concerns):
```python
def compute_adaptive_threshold(
history: Sequence[Decimal],
*,
n_days: int,
percentile: Decimal,
absolute_floor: Decimal,
) -> Decimal | None:
"""Ritorna None se warmup hard (n_days==0 o history vuota),
altrimenti max(P_q(history), absolute_floor)."""
```
### 6.2 Repository (`state/repository.py`)
Tre metodi su `Repository` (uno preesistente):
```python
def count_iv_rv_distinct_days(
self, *, asset: str, max_days: int, as_of: datetime | None = None,
) -> int:
"""Numero di giorni di calendario distinti con almeno un IV-RV
valido nell'intervallo [as_of - max_days, as_of]."""
def iv_rv_values_for_window(
self, *, asset: str, window_days: int, as_of: datetime | None = None,
) -> list[Decimal]:
"""Valori IV-RV ordinati ASC su [as_of - window_days, as_of]."""
def dvol_lookback(self, *, asset: str, hours: int) -> Decimal | None:
"""DVOL del tick più vicino a now-hours, ±15min tolerance. None se gap."""
```
Usa l'index esistente `idx_market_snapshots_asset_ts`. Nessuna nuova migration.
### 6.3 Inline nel validator
`core/entry_validator.py` chiama `compute_adaptive_threshold` con i dati dal repo. Nessun caching, stateless. La query per finestra 60g (5760 righe per asset) costa ms-level con index — non vale la pena introdurre cache da invalidare.
### 6.4 Audit / logging
Ogni entry cycle scrive in `decisions`:
- `inputs_json`: `{iv_rv_now, threshold_used, dvol_now, dvol_24h_ago, n_history, window_used_days}`
- `outputs_json`: `{gate: "iv_richness_adaptive", verdict: PASS|SKIP, reason}`
Permette ricostruzione ex-post: perché un trade è stato saltato e con quali numeri.
## 7. GUI Calibrazione (`pages/6_📐_Calibrazione.py`)
Aggiunta sezione "Gate adattivo" sopra ai percentili statici esistenti — questi ultimi NON vengono modificati (restano per analisi).
```
┌─ 🎯 Gate IV-RV adattivo ──────────────────────────────────┐
│ Status: 🟢 Attivo (Aggressiva) | 🟡 Warmup (n=8/30g) │
│ │
│ Soglia P25 rolling (corrente) 2.74 vol pts │
│ IV-RV ultimo tick 8.96 vol pts ✅ │
│ Floor assoluto 0.00 vol pts │
│ │
│ Evoluzione 7g (sparkline) ▁▂▂▃▄▆▇ │
│ │
│ ── VoV guard ── │
│ ΔDVOL ultime 24h 0.43 pt ✅ │
│ Soglia VoV 5.00 pt │
│ │
│ Decisione hypothetical: PASS │
└──────────────────────────────────────────────────────────┘
```
La GUI usa la stessa funzione `compute_adaptive_threshold` del validator → unica fonte di verità. Refresh manuale al page load (coerente con resto GUI).
## 8. Error handling
Principio: **fail-open** in tutti i casi di dato mancante. Il gate adattivo è additivo sopra ai gate hard esistenti (delta band, credit, ecc.); il dato mancante non deve trasformare un trade non voluto in trade fatto, ma neppure deve bloccare entry valide se è il dato del gate stesso a mancare.
| Scenario | Comportamento | Loggato come |
|---|---|---|
| `iv_rv_history` ritorna 0 righe | gate = PASS | `GATE_WARMUP_NO_DATA` |
| Storia < 96 tick (1g) | gate = PASS, threshold None | `GATE_WARMUP_INSUFFICIENT` |
| `dvol_lookback` = None (gap dati 24h fa) | VoV guard = PASS | `VOV_GUARD_NO_LOOKBACK` |
| `ctx.iv_minus_rv` = None | gate bypassato (riga 146 esistente) | invariato |
| `ctx.dvol` = None | VoV guard bypassato, gate adattivo prosegue | invariato |
## 9. Testing strategy
### 9.1 Unit — `core/adaptive_threshold.py`
- Warmup: `n_days=0` → None
- Warmup difensivo: `n_days=0` ma history non vuota → None
- Difensivo: history vuota con `n_days>0` → None
- `n_days=1`, 96 tick → P25 sui 96
- Mix di cadenze (30 daily + 96 live) → percentile uno-a-uno
- Floor binding: P25=0.5, floor=3 → 3
- Floor non binding: P25=5, floor=0 → 5
- Percentile diverso: percentile=0.5 → mediana
- Validation: percentile ∉ [0,1] o `n_days<0` → ValueError
### 9.1bis Unit — `state/repository.py`
- `count_iv_rv_distinct_days`: 1 giorno → 1; 3 giorni misti → 3
- esclusione asset diversi, NULL e fetch_ok=0
- rispetto del cutoff `max_days`
- ValueError su `as_of` naive o `max_days≤0`
- `iv_rv_values_for_window`: ordine ASC, filtri equivalenti, ValueError input
### 9.2 Unit — `core/entry_validator.py`
Mock repo, focus sul flusso decisionale:
- Adaptive disabled, statico passa → PASS legacy
- Adaptive disabled, statico fail → SKIP legacy
- Adaptive enabled, IV-RV sopra P25 → PASS
- Adaptive enabled, IV-RV sotto P25 sopra floor → SKIP("rolling")
- VoV guard ON, ΔDVOL=6 pt → SKIP("vov")
- VoV guard ON, ΔDVOL=4 pt, gate principale pass → PASS
- VoV guard ON, dvol_lookback=None → guard bypass
- ctx.iv_minus_rv=None → bypass
- decisions log popolato con threshold, n_history, dvol_lookback
### 9.3 Integration — `tests/integration/test_entry_cycle_adaptive.py`
SQLite temp + fixture market_snapshots con 5760 tick (60g):
- Aggressiva con flag adattivo → entry passa solo nei tick sopra P25 della fixture
- Golden → invariato
- Warmup: DB con 50 tick → tutti pass, log `GATE_WARMUP_INSUFFICIENT`
- Regime shift fixture: DVOL salta da 50 a 56 in 24h → VoV guard scatta
### 9.4 Backtest sanity
Aggiunta nel report del CLI `backtest`: count distinto skip-reasons (`iv_rv_static`, `iv_rv_rolling`, `vov_guard`) per analisi ex-post.
### 9.5 GUI smoke
Manuale al deploy:
- Calibrazione carica con `enabled=False` (fallback grafico)
- Calibrazione mostra warmup status quando DB < 30g
- Refresh ricalcola coerente
### 9.6 Cosa NON testiamo
- Performance query (5760 righe con index = trascurabile)
- Concorrenza entry cycle / GUI (WAL abilitato)
- Migrazione (nessuna tabella nuova)
## 10. Out of scope
- Regime detection avanzato (HMM, cluster) — esplicitamente scartato per opacità
- Soglie per-asset diverse — il P25 si calibra naturalmente per asset (history filtrata per asset)
- Auto-attivazione adaptive su Conservativa quando warmup è completo — l'operatore decide manualmente quando passare al profilo aggressivo
- Multi-asset (ETH+BTC simultanei) — già scope §4-ter, indipendente da questo design
- Override manuale soglia da GUI — explicit no, l'obiettivo è autocalibrante
## 11. Decisioni prese durante brainstorming
| # | Domanda | Scelta |
|---|---|---|
| 1 | Approccio | Hybrid (percentile + VoV guard) |
| 2 | Warmup | Percentile della finestra disponibile (anche se <30g) |
| 3 | Percentile | P25 (allineato roadmap) |
| 4 | Window | Target 60g, attivazione a 30g, sotto usa quel che c'è |
| 5 | VoV soglia | 5 pt vol in 24h |
| 6 | Architettura | Inline nel validator, stateless, no cache DB |
| 7 | GUI | Pannello informativo aggiunto, slider esistenti invariati |
+281
View File
@@ -0,0 +1,281 @@
"""Backfill IV-RV history from Deribit public REST API.
Use case: il gate IV-RV adattivo richiede 30 giorni di storia per
attivarsi (spec ``docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md``).
Quando la pipeline ha pochi giorni di tick live, questo script popola
``market_snapshots`` con record giornalieri storici calcolati da
DVOL Deribit + closes ETH-PERPETUAL/BTC-PERPETUAL pubblici.
Idempotente: usa ``INSERT OR REPLACE`` sulla PK ``(timestamp, asset)``
con timestamp fissato a 12:00 UTC del giorno di calendario.
``fetch_errors_json='{"backfill":true}'`` permette di distinguere i
record sintetici dai tick live in audit.
I record contribuiscono al gate adattivo come singoli punti
(distinct-days policy), uno per giorno: lo statistical bias è coperto
dalla spec §4.1.
Esempio:
python scripts/backfill_iv_rv.py --db data/state.sqlite --days 45
"""
from __future__ import annotations
import argparse
import json
import math
import sqlite3
import statistics
import urllib.request
from dataclasses import dataclass
from datetime import UTC, date, datetime, timedelta
from decimal import Decimal
__all__ = [
"BackfillRow",
"build_backfill_records",
"compute_rv30d_annualized",
]
_DERIBIT = "https://www.deribit.com/api/v2/public"
_RV_LOOKBACK_DAYS = 30
_TRADING_DAYS_PER_YEAR = 365
@dataclass(frozen=True)
class BackfillRow:
"""Una riga sintetica destinata a ``market_snapshots``."""
timestamp: datetime
asset: str
spot: Decimal
dvol: Decimal
realized_vol_30d: Decimal
iv_minus_rv: Decimal
fetch_ok: bool = True
# ---------------------------------------------------------------------------
# Pure compute layer (TDD: tests/unit/test_backfill_iv_rv.py)
# ---------------------------------------------------------------------------
def compute_rv30d_annualized(closes: list[Decimal]) -> Decimal:
"""Volatilità realizzata 30g annualizzata in **punti vol** (% annuali).
Args:
closes: ``31`` close consecutivi (uno al giorno) produce 30
log-returns.
Returns:
``stdev(log_returns) * sqrt(365) * 100`` come ``Decimal``.
Raises:
ValueError: se ``len(closes) < 31``.
"""
if len(closes) < _RV_LOOKBACK_DAYS + 1:
raise ValueError(
f"need at least {_RV_LOOKBACK_DAYS + 1} closes, got {len(closes)}"
)
log_returns = [
math.log(float(closes[i] / closes[i - 1]))
for i in range(1, _RV_LOOKBACK_DAYS + 1)
]
sigma_daily = statistics.stdev(log_returns)
annualized = sigma_daily * math.sqrt(_TRADING_DAYS_PER_YEAR) * 100.0
return Decimal(str(annualized))
def build_backfill_records(
*,
asset: str,
spots_by_day: dict[str, Decimal],
dvols_by_day: dict[str, Decimal],
oldest_day: date,
) -> list[BackfillRow]:
"""Compone le righe di backfill per i giorni nella finestra richiesta.
Per ogni giorno target ``D`` (da ``oldest_day`` a oggi compreso) la
riga viene emessa solo se: (a) DVOL e spot sono presenti per ``D``,
(b) la serie di spot dispone dei 30 giorni precedenti necessari per
il calcolo di RV30d.
Il timestamp è fissato a 12:00 UTC, scelta che evita il rollover
delle candele Deribit (vedi anomalia DVOL 00:00 UTC nei market
snapshots live).
"""
sorted_days = sorted(spots_by_day.keys())
records: list[BackfillRow] = []
for day_str in sorted_days:
day = date.fromisoformat(day_str)
if day < oldest_day:
continue
if day_str not in dvols_by_day:
continue
rv_window = [
day - timedelta(days=i) for i in range(_RV_LOOKBACK_DAYS, -1, -1)
]
if not all(d.isoformat() in spots_by_day for d in rv_window):
continue
closes = [spots_by_day[d.isoformat()] for d in rv_window]
rv = compute_rv30d_annualized(closes)
dvol = dvols_by_day[day_str]
spot = spots_by_day[day_str]
records.append(
BackfillRow(
timestamp=datetime(day.year, day.month, day.day, 12, 0, tzinfo=UTC),
asset=asset,
spot=spot,
dvol=dvol,
realized_vol_30d=rv,
iv_minus_rv=dvol - rv,
)
)
return records
# ---------------------------------------------------------------------------
# I/O layer (network + sqlite)
# ---------------------------------------------------------------------------
def _http_get_json(url: str, timeout_s: float = 30.0) -> dict:
with urllib.request.urlopen(url, timeout=timeout_s) as resp:
return json.loads(resp.read())
def fetch_dvol_daily(currency: str, days: int) -> dict[str, Decimal]:
"""Mappa ``YYYY-MM-DD -> DVOL close`` per gli ultimi ``days`` giorni."""
end_ms = int(datetime.now(UTC).timestamp() * 1000)
start_ms = end_ms - days * 86_400_000
url = (
f"{_DERIBIT}/get_volatility_index_data"
f"?currency={currency}"
f"&start_timestamp={start_ms}&end_timestamp={end_ms}"
f"&resolution=86400"
)
payload = _http_get_json(url)
data = (payload.get("result") or {}).get("data") or []
out: dict[str, Decimal] = {}
for row in data:
# row = [ts_ms, open, high, low, close]
if not isinstance(row, list) or len(row) < 5:
continue
ts = datetime.fromtimestamp(row[0] / 1000, tz=UTC).date().isoformat()
out[ts] = Decimal(str(row[4]))
return out
def fetch_spot_daily(instrument: str, days: int) -> dict[str, Decimal]:
"""Mappa ``YYYY-MM-DD -> close USD`` per ``instrument`` su ``days`` giorni."""
end_ms = int(datetime.now(UTC).timestamp() * 1000)
start_ms = end_ms - days * 86_400_000
url = (
f"{_DERIBIT}/get_tradingview_chart_data"
f"?instrument_name={instrument}"
f"&start_timestamp={start_ms}&end_timestamp={end_ms}"
f"&resolution=1D"
)
payload = _http_get_json(url)
result = payload.get("result") or {}
ticks = result.get("ticks") or []
closes = result.get("close") or []
out: dict[str, Decimal] = {}
for ts_ms, close in zip(ticks, closes, strict=False):
ts = datetime.fromtimestamp(ts_ms / 1000, tz=UTC).date().isoformat()
out[ts] = Decimal(str(close))
return out
def write_records(db_path: str, records: list[BackfillRow]) -> int:
"""Insert/replace dei record in market_snapshots. Ritorna la rowcount."""
if not records:
return 0
conn = sqlite3.connect(db_path)
try:
with conn:
for r in records:
conn.execute(
"INSERT OR REPLACE INTO market_snapshots ("
"timestamp, asset, spot, dvol, realized_vol_30d, iv_minus_rv, "
"funding_perp_annualized, funding_cross_annualized, "
"dealer_net_gamma, gamma_flip_level, oi_delta_pct_4h, "
"liquidation_long_risk, liquidation_short_risk, "
"macro_days_to_event, fetch_ok, fetch_errors_json"
") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
(
r.timestamp.astimezone(UTC).isoformat(),
r.asset,
str(r.spot),
str(r.dvol),
str(r.realized_vol_30d),
str(r.iv_minus_rv),
None,
None,
None,
None,
None,
None,
None,
None,
1 if r.fetch_ok else 0,
'{"backfill":true}',
),
)
return len(records)
finally:
conn.close()
def backfill_asset(db_path: str, asset: str, days: int) -> int:
"""Esegue l'intero backfill per ``asset`` e ritorna il numero di
record inseriti/sostituiti.
"""
instrument = f"{asset.upper()}-PERPETUAL"
fetch_window_days = days + _RV_LOOKBACK_DAYS + 5 # margine per il lookback RV
spots = fetch_spot_daily(instrument, fetch_window_days)
dvols = fetch_dvol_daily(asset.upper(), fetch_window_days)
today = datetime.now(UTC).date()
oldest = today - timedelta(days=days)
records = build_backfill_records(
asset=asset.upper(),
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=oldest,
)
return write_records(db_path, records)
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--db",
default="data/state.sqlite",
help="path a state.sqlite (default: data/state.sqlite)",
)
parser.add_argument(
"--days",
type=int,
default=45,
help="quanti giorni di backfill emettere (default: 45)",
)
parser.add_argument(
"--assets",
nargs="+",
default=["ETH", "BTC"],
help="asset symbols (default: ETH BTC)",
)
args = parser.parse_args()
total = 0
for asset in args.assets:
n = backfill_asset(args.db, asset, args.days)
print(f"{asset}: inserted/replaced {n} backfill rows")
total += n
print(f"TOTAL: {total}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+4 -4
View File
@@ -789,7 +789,7 @@ def backtest(
table = Table(title=f"Backtest report — {strategy_path.name}")
table.add_column("Metrica", style="cyan")
table.add_column("Valore", style="bold")
table.add_row("Picks (lunedì 14:00)", str(report.n_picks))
table.add_row("Picks (daily 14:00)", str(report.n_picks))
table.add_row(
"Accettati dai filtri",
f"{report.n_accepted} ({report.n_accepted / max(1, report.n_picks):.0%})",
@@ -815,7 +815,7 @@ def backtest(
if report.skip_reasons:
skip_table = Table(title="Motivi di skip aggregati")
skip_table.add_column("Motivo")
skip_table.add_column("Settimane", justify="right")
skip_table.add_column("Giorni", justify="right")
for reason, count in sorted(
report.skip_reasons.items(), key=lambda kv: -kv[1]
):
@@ -1004,8 +1004,8 @@ def option_chain_trigger(
) -> None:
"""Esegue UNA volta il collector della catena opzioni e persiste in DB.
Utile per popolare i dati senza aspettare il cron settimanale del
job ``option_chain_snapshot``. Riusa esattamente la stessa pipeline
Utile per popolare i dati senza aspettare il cron del job
``option_chain_snapshot``. Riusa esattamente la stessa pipeline
schedulata.
"""
from cerbero_bite.runtime.dependencies import build_runtime # noqa: PLC0415
+5 -2
View File
@@ -9,7 +9,7 @@ the ``core/`` algorithms stay in their preferred numeric domain.
from __future__ import annotations
import re
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import Any, Literal
@@ -167,9 +167,12 @@ class DeribitClient:
) -> Decimal:
"""Return the latest DVOL value for ``currency``."""
when = (now or datetime.now(UTC)).astimezone(UTC)
# Window starts one day back so a tick fired exactly at 00:00 UTC
# — before Deribit has built today's 1D candle — still has
# yesterday's close to fall back on (see candles[-1] branch).
body = {
"currency": currency,
"start_date": (when.date()).isoformat(),
"start_date": (when.date() - timedelta(days=1)).isoformat(),
"end_date": when.date().isoformat(),
"resolution": "1D",
}
+33 -5
View File
@@ -47,7 +47,7 @@ class EntryConfig(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")
cron: str = "0 14 * * MON"
cron: str = "0 14 * * *"
skip_holidays_country: str = "IT"
# access filters (§2)
@@ -84,6 +84,24 @@ class EntryConfig(BaseModel):
iv_minus_rv_min: Decimal = Field(default=Decimal("0"))
iv_minus_rv_filter_enabled: bool = False
# IV richness gate adattivo (Phase 5+). Quando
# `iv_minus_rv_adaptive_enabled=True`, la soglia statica
# `iv_minus_rv_min` diventa il floor assoluto e la soglia
# effettiva è `max(P_q rolling, floor)` calcolata su
# `market_snapshots`. Vedi
# `docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md`.
iv_minus_rv_adaptive_enabled: bool = False
iv_minus_rv_percentile: Decimal = Field(default=Decimal("0.25"))
iv_minus_rv_window_target_days: int = 60
iv_minus_rv_window_min_days: int = 30
# Vol-of-Vol guard (§4-quater roadmap punto 2): blocca entry se
# |DVOL_now - DVOL_24h_ago| supera la soglia. Cattura regime
# shift bruschi non riflessi nel percentile rolling.
vol_of_vol_guard_enabled: bool = False
vol_of_vol_threshold_pt: Decimal = Field(default=Decimal("5"))
vol_of_vol_lookback_hours: int = 24
# ---------------------------------------------------------------------------
# Structure
@@ -183,7 +201,7 @@ class SizingConfig(BaseModel):
cap_per_trade_eur: Decimal = Field(default=Decimal("200"))
cap_aggregate_open_eur: Decimal = Field(default=Decimal("1000"))
max_concurrent_positions: int = 1
max_concurrent_positions: int = 5
max_contracts_per_trade: int = 4
dvol_adjustment: list[DvolAdjustmentBand] = Field(default_factory=_default_dvol_bands)
@@ -266,8 +284,8 @@ class AutoPauseConfig(BaseModel):
Quando abilitato, il rule engine valuta prima di ogni entry
il P/L cumulato delle ultime `lookback_trades` posizioni chiuse
in proporzione al capitale attuale. Se la perdita supera la
soglia, l'engine si auto-mette in pausa per `pause_weeks`
settimane (skip-week). La pausa si annulla automaticamente alla
soglia, l'engine si auto-mette in pausa per `pause_days`
giorni (skip-day). La pausa si annulla automaticamente alla
scadenza, oppure manualmente via comando dalla GUI.
Difende da regime change non rilevati dai filtri quant: se i
@@ -282,7 +300,7 @@ class AutoPauseConfig(BaseModel):
enabled: bool = False
lookback_trades: int = 5
max_drawdown_pct: Decimal = Field(default=Decimal("0.10"))
pause_weeks: int = 2
pause_days: int = 14
# ---------------------------------------------------------------------------
@@ -389,6 +407,16 @@ class StrategyConfig(BaseModel):
if self.entry.dvol_min >= self.entry.dvol_max:
raise ValueError("dvol_min must be < dvol_max")
e = self.entry
if e.iv_minus_rv_window_min_days >= e.iv_minus_rv_window_target_days:
raise ValueError(
"iv_minus_rv_window_min_days must be < iv_minus_rv_window_target_days"
)
if not (Decimal("0") < e.iv_minus_rv_percentile < Decimal("1")):
raise ValueError(
"iv_minus_rv_percentile must be in (0, 1)"
)
return self
@@ -0,0 +1,75 @@
"""Funzione pura per calcolare la soglia adattiva del gate IV-RV.
Spec: ``docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md``.
Deterministic, no I/O. La selezione della finestra (target_days vs
min_days vs intera storia disponibile) è responsabilità del caller, che
interroga il repository con i parametri corretti e passa qui sia i
valori (``history``) sia il numero di giorni distinti coperti
(``n_days``). Questo permette di mischiare cadenze diverse tick live a
15 min e backfill daily senza assumere un fattore costante
``ticks_per_day``.
"""
from __future__ import annotations
from collections.abc import Sequence
from decimal import Decimal
__all__ = ["compute_adaptive_threshold"]
def compute_adaptive_threshold(
history: Sequence[Decimal],
*,
n_days: int,
percentile: Decimal,
absolute_floor: Decimal,
) -> Decimal | None:
"""Ritorna la soglia adattiva o ``None`` durante il warmup hard.
Args:
history: Sequenza dei valori IV-RV nella finestra scelta dal
caller. NULL e tick non riusciti devono essere già stati
filtrati upstream. L'ordine non è significativo per il
percentile.
n_days: Numero di giorni distinti coperti dalla storia
disponibile (calcolato dal caller, tipicamente con
``COUNT(DISTINCT date(timestamp))``). ``0`` warmup hard.
percentile: Quantile target nella distribuzione (es. ``0.25``).
absolute_floor: Floor minimo applicato dopo il calcolo del
percentile. La soglia restituita è
``max(P_q, absolute_floor)``.
Returns:
``None`` se ``n_days == 0`` o ``history`` è vuota (warmup hard,
gate disabilitato), altrimenti il percentile della finestra
bounded dal floor.
"""
if not (Decimal(0) <= percentile <= Decimal(1)):
raise ValueError(
f"percentile must be in [0, 1], got {percentile}"
)
if n_days < 0:
raise ValueError(f"n_days must be >= 0, got {n_days}")
if n_days == 0 or not history:
return None
return max(_percentile(history, percentile), absolute_floor)
def _percentile(values: Sequence[Decimal], q: Decimal) -> Decimal:
"""Linear-interpolated percentile, NumPy-compatible (method='linear').
Implementato in Decimal puro per evitare dipendenze numpy nel core.
"""
if not values:
raise ValueError("percentile of empty sequence")
sorted_v = sorted(values)
n = len(sorted_v)
k = (Decimal(n) - Decimal(1)) * q
f = int(k) # floor
c = min(f + 1, n - 1)
if f == c:
return sorted_v[f]
frac = k - Decimal(f)
return sorted_v[f] + (sorted_v[c] - sorted_v[f]) * frac
+29 -30
View File
@@ -2,7 +2,7 @@
Two layers, both pure functions:
1. **Entry-filter simulation** for each Monday 14:00 UTC tick in the
1. **Entry-filter simulation** for each daily 14:00 UTC tick in the
recorded snapshots, evaluate which §2 gates would have passed,
reconstructing :class:`EntryContext` from the snapshot. This part
is **rigorous**: it uses the same :func:`validate_entry` the live
@@ -49,12 +49,12 @@ __all__ = [
"BacktestEntry",
"BacktestExit",
"BacktestReport",
"MondayPick",
"DailyPick",
"bs_put_delta",
"bs_put_price",
"daily_picks",
"estimate_credit_eth",
"find_strike_for_delta",
"monday_picks",
"normal_cdf",
"run_backtest",
"simulate_entry_filters",
@@ -184,41 +184,40 @@ def estimate_credit_eth(
@dataclass(frozen=True)
class MondayPick:
"""Indice di un tick "Monday 14:00 UTC" nella time-series."""
class DailyPick:
"""Indice di un tick "daily h:00 UTC" nella time-series."""
timestamp: datetime
snapshot: MarketSnapshotRecord
def monday_picks(
def daily_picks(
snapshots: list[MarketSnapshotRecord],
*,
weekday: int = 0, # Monday
hour_utc: int = 14,
asset: str = "ETH",
) -> list[MondayPick]:
"""Estrae i tick più vicini a "Monday h:00 UTC" per ogni settimana.
) -> list[DailyPick]:
"""Estrae un tick per giorno all'ora ``hour_utc``.
``snapshots`` deve essere ordinato per timestamp ascending. Per ogni
occorrenza di ``weekday + hour_utc`` (es. lun 14:00) presa l'unica
riga ETH che la copre. Settimane senza tick a quell'ora vengono
saltate.
Crypto è 24/7, quindi non gateiamo sul giorno della settimana: per
ogni giorno di calendario presente in ``snapshots``, prendiamo la
riga ETH che cade a ``hour_utc:00``. Giorni senza tick a quell'ora
vengono saltati. ``snapshots`` deve essere ordinato per timestamp
ascending.
"""
picks: list[MondayPick] = []
seen_dates: set[tuple[int, int]] = set() # (iso_year, iso_week)
picks: list[DailyPick] = []
seen_dates: set[tuple[int, int, int]] = set() # (year, month, day)
for snap in snapshots:
if snap.asset.upper() != asset.upper():
continue
ts = snap.timestamp.astimezone(UTC)
if ts.weekday() != weekday or ts.hour != hour_utc:
if ts.hour != hour_utc:
continue
iso_y, iso_w, _ = ts.isocalendar()
key = (iso_y, iso_w)
key = (ts.year, ts.month, ts.day)
if key in seen_dates:
continue
seen_dates.add(key)
picks.append(MondayPick(timestamp=ts, snapshot=snap))
picks.append(DailyPick(timestamp=ts, snapshot=snap))
return picks
@@ -231,9 +230,8 @@ def _entry_context_from_snapshot(
"""Costruisce :class:`EntryContext` dal tick storico.
``None`` quando la riga non ha i campi minimi (spot, dvol, funding).
Nel filtro questo si traduce in "skip della settimana" è la
stessa logica del live: un tick incompleto è meglio di un'entry
al buio.
Nel filtro questo si traduce in "skip del giorno" è la stessa
logica del live: un tick incompleto è meglio di un'entry al buio.
"""
if snap.dvol is None or snap.funding_perp_annualized is None:
return None
@@ -254,21 +252,21 @@ def _entry_context_from_snapshot(
@dataclass(frozen=True)
class EntryFilterResult:
"""Esito del check filtri per una singola Monday pick."""
"""Esito del check filtri per una singola daily pick."""
pick: MondayPick
pick: DailyPick
accepted: bool
reasons: list[str]
skipped_for_data: bool # True se il tick non aveva i campi minimi
def simulate_entry_filters(
picks: list[MondayPick],
picks: list[DailyPick],
cfg: StrategyConfig,
*,
capital_usd: Decimal,
) -> list[EntryFilterResult]:
"""Per ogni Monday pick, valuta validate_entry come farebbe il live.
"""Per ogni daily pick, valuta validate_entry come farebbe il live.
Rigoroso: usa esattamente :func:`validate_entry` e :class:`EntryContext`.
Restituisce la lista degli esiti, una entry per pick.
@@ -500,7 +498,7 @@ class BacktestReport(BaseModel):
def _build_entry_from_pick(
pick: MondayPick,
pick: DailyPick,
cfg: StrategyConfig,
*,
capital_usd: Decimal,
@@ -566,7 +564,8 @@ def _max_drawdown_usd(equity: list[Decimal]) -> tuple[Decimal, Decimal]:
def _sharpe_annualized(pnls_usd: list[Decimal], capital_usd: Decimal) -> Decimal | None:
"""Annualized Sharpe approximation: 52 trade/anno (settimanali).
"""Annualized Sharpe approximation: ~120 trade/anno (entry daily,
crypto 24/7, post-filtri + concurrency cap).
Restituisce ``None`` se ci sono <5 trade o stdev = 0.
"""
@@ -578,7 +577,7 @@ def _sharpe_annualized(pnls_usd: list[Decimal], capital_usd: Decimal) -> Decimal
std = math.sqrt(var)
if std == 0:
return None
sharpe = mean / std * math.sqrt(52)
sharpe = mean / std * math.sqrt(120)
return Decimal(str(round(sharpe, 3)))
@@ -593,7 +592,7 @@ def run_backtest(
"""Esegue il backtest end-to-end sui ``snapshots`` ETH ordinati per ts."""
snapshots = sorted(snapshots, key=lambda s: s.timestamp)
eth_snapshots = [s for s in snapshots if s.asset.upper() == asset.upper()]
picks = monday_picks(eth_snapshots, asset=asset)
picks = daily_picks(eth_snapshots, asset=asset)
filter_results = simulate_entry_filters(picks, cfg, capital_usd=capital_usd)
# Tally skip reasons
+55 -8
View File
@@ -15,6 +15,7 @@ from decimal import Decimal
from pydantic import BaseModel, ConfigDict
from cerbero_bite.config import SpreadType, StrategyConfig
from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold
__all__ = [
"EntryContext",
@@ -50,6 +51,25 @@ class EntryContext(BaseModel):
# invalida l'entry).
iv_minus_rv: Decimal | None = None
# Valori IV-RV nella finestra rolling già scelta dal caller
# (entry_cycle): tutti i record validi su window_days, ASC, NULL e
# fetch_ok=0 esclusi. Caricata dal repository quando
# `iv_minus_rv_adaptive_enabled` è True. Tuple per coerenza con
# frozen=True.
iv_rv_history: tuple[Decimal, ...] = ()
# Numero di giorni di calendario distinti coperti dalla storia
# IV-RV disponibile (non solo dalla finestra `iv_rv_history`).
# ``0`` = warmup hard, gate disabilitato (fail-open). Calcolato dal
# caller via `repository.count_iv_rv_distinct_days`.
iv_rv_n_days: int = 0
# DVOL al tick più vicino a now - vol_of_vol_lookback_hours.
# ``None`` = gap nel dato (es. cron mancante 24h fa) → VoV guard
# skip. Caricato dal repository in `entry_cycle` quando
# `vol_of_vol_guard_enabled` è True.
dvol_24h_ago: Decimal | None = None
class EntryDecision(BaseModel):
"""Result of :func:`validate_entry`. ``reasons`` holds *all* blocking reasons."""
@@ -140,16 +160,43 @@ def validate_entry(ctx: EntryContext, cfg: StrategyConfig) -> EntryDecision:
# §2.9: IV richness gate. Vendere vol senza un margine misurabile
# fra IV e RV è statisticamente neutro: l'edge della strategia
# esiste solo quando il premio è "ricco" rispetto a quanto il
# mercato si è effettivamente mosso.
# mercato si è effettivamente mosso. La modalità adattiva calcola
# la soglia come max(P_q rolling, iv_minus_rv_min) sulla storia
# disponibile in market_snapshots; altrimenti fallback alla
# soglia statica `iv_minus_rv_min`.
if entry_cfg.iv_minus_rv_filter_enabled and ctx.iv_minus_rv is not None:
if entry_cfg.iv_minus_rv_adaptive_enabled:
threshold = compute_adaptive_threshold(
history=ctx.iv_rv_history,
n_days=ctx.iv_rv_n_days,
percentile=entry_cfg.iv_minus_rv_percentile,
absolute_floor=entry_cfg.iv_minus_rv_min,
)
if threshold is not None and ctx.iv_minus_rv < threshold:
pct = int(entry_cfg.iv_minus_rv_percentile * 100)
reasons.append(
f"IV richness below P{pct} rolling "
f"(IV-RV={ctx.iv_minus_rv} < {threshold} vol pts)"
)
elif ctx.iv_minus_rv < entry_cfg.iv_minus_rv_min:
reasons.append(
f"IV richness below floor "
f"(IV-RV={ctx.iv_minus_rv} < {entry_cfg.iv_minus_rv_min} vol pts)"
)
# §4-quater roadmap: vol-of-vol guard. Blocca entry quando il
# regime di volatilità sta cambiando bruscamente, anche se IV-RV
# è alto. Fail-open su gap dati 24h fa.
if (
entry_cfg.iv_minus_rv_filter_enabled
and ctx.iv_minus_rv is not None
and ctx.iv_minus_rv < entry_cfg.iv_minus_rv_min
entry_cfg.vol_of_vol_guard_enabled
and ctx.dvol_24h_ago is not None
):
reasons.append(
f"IV richness below floor "
f"(IV-RV={ctx.iv_minus_rv} < {entry_cfg.iv_minus_rv_min} vol pts)"
)
delta = abs(ctx.dvol_now - ctx.dvol_24h_ago)
if delta >= entry_cfg.vol_of_vol_threshold_pt:
reasons.append(
f"DVOL shifted {delta} pt in {entry_cfg.vol_of_vol_lookback_hours}h "
f"(threshold {entry_cfg.vol_of_vol_threshold_pt})"
)
return EntryDecision(accepted=not reasons, reasons=reasons)
@@ -16,6 +16,7 @@ from __future__ import annotations
import os
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from pathlib import Path
import pandas as pd
@@ -23,6 +24,7 @@ import plotly.graph_objects as go
import streamlit as st
from cerbero_bite.config.loader import load_strategy
from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold
from cerbero_bite.gui.data_layer import (
DEFAULT_DB_PATH,
humanize_dt,
@@ -242,6 +244,132 @@ def _render_metric(spec: MetricSpec, records: list[MarketSnapshotRecord]) -> Non
_percentiles_strip(s)
def _render_adaptive_gate_panel(
strategy: object | None,
records: list[MarketSnapshotRecord],
) -> None:
"""Pannello informativo sul gate IV-RV adattivo (read-only)."""
if strategy is None:
return
try:
entry = strategy.entry # type: ignore[attr-defined]
except AttributeError:
return
if not getattr(entry, "iv_minus_rv_filter_enabled", False):
st.subheader("🎯 Gate IV-RV adattivo")
st.info("Gate IV-RV disabilitato nel profilo corrente.")
st.divider()
return
st.subheader("🎯 Gate IV-RV adattivo")
# records DESC (newest first) → history ASC con NULL/fetch_ok=0 esclusi
iv_rv_history: list[Decimal] = []
distinct_days: set[str] = set()
for r in reversed(records):
if r.fetch_ok and r.iv_minus_rv is not None:
iv_rv_history.append(r.iv_minus_rv)
distinct_days.add(r.timestamp.date().isoformat())
n_ticks = len(iv_rv_history)
n_days = len(distinct_days)
target = int(getattr(entry, "iv_minus_rv_window_target_days", 60))
min_days = int(getattr(entry, "iv_minus_rv_window_min_days", 30))
if n_days < 1:
status = "🟡 Warmup hard (nessun giorno coperto)"
elif n_days < min_days:
status = f"🟡 Warmup ({n_days}/{min_days}g — finestra crescente)"
elif n_days < target:
status = f"🟢 Attivo (finestra {min_days}g, target {target}g)"
else:
status = f"🟢 Attivo (finestra stabile {target}g)"
st.markdown(f"**Status:** {status} · {n_ticks} tick complessivi")
# Latest tick
iv_rv_now: Decimal | None = None
dvol_now: Decimal | None = None
latest_ts: datetime | None = None
for r in records: # records DESC
if r.fetch_ok:
iv_rv_now = r.iv_minus_rv
dvol_now = r.dvol
latest_ts = r.timestamp
break
adaptive_on = bool(getattr(entry, "iv_minus_rv_adaptive_enabled", False))
floor = Decimal(str(getattr(entry, "iv_minus_rv_min", "0")))
if adaptive_on:
percentile = Decimal(
str(getattr(entry, "iv_minus_rv_percentile", "0.25"))
)
try:
threshold = compute_adaptive_threshold(
history=iv_rv_history,
n_days=n_days,
percentile=percentile,
absolute_floor=floor,
)
except ValueError as exc:
st.warning(f"Configurazione gate non valida: {exc}")
threshold = None
c1, c2, c3 = st.columns(3)
pct_label = int(percentile * 100)
c1.metric(
f"Soglia P{pct_label} rolling",
f"{threshold:.2f}" if threshold is not None else "",
help="Soglia adattiva = max(percentile, floor)",
)
c2.metric(
"IV-RV ultimo tick",
f"{iv_rv_now:.2f}" if iv_rv_now is not None else "",
)
c3.metric("Floor assoluto", f"{floor:.2f}")
if threshold is not None and iv_rv_now is not None:
verdict = "✅ PASS" if iv_rv_now >= threshold else "❌ SKIP"
st.markdown(f"**Decisione hypothetical:** {verdict}")
else:
st.write(f"Modalità statica: floor = {floor} vol pts")
if bool(getattr(entry, "vol_of_vol_guard_enabled", False)):
st.markdown("---")
st.markdown("**Vol-of-Vol guard**")
threshold_pt = Decimal(
str(getattr(entry, "vol_of_vol_threshold_pt", "5"))
)
lookback_h = int(getattr(entry, "vol_of_vol_lookback_hours", 24))
# Find tick closest to latest_ts - lookback hours, tolerance 15 min
dvol_lookback: Decimal | None = None
if latest_ts is not None and dvol_now is not None:
target_ts = latest_ts - timedelta(hours=lookback_h)
best_delta = timedelta(minutes=15)
for r in records:
if not r.fetch_ok or r.dvol is None:
continue
d = abs(r.timestamp - target_ts)
if d <= best_delta:
best_delta = d
dvol_lookback = r.dvol
if dvol_lookback is not None and dvol_now is not None:
delta = abs(dvol_now - dvol_lookback)
c1, c2 = st.columns(2)
c1.metric(f"|ΔDVOL {lookback_h}h|", f"{delta:.2f}")
c2.metric("Soglia VoV", f"{threshold_pt:.2f}")
verdict = "✅ PASS" if delta < threshold_pt else "❌ SKIP"
st.markdown(f"**Verdict:** {verdict}")
else:
st.info(f"Lookback {lookback_h}h non disponibile (gap dati).")
st.divider()
def render() -> None:
st.title("📐 Calibrazione")
st.caption(
@@ -313,6 +441,8 @@ def render() -> None:
specs = _metric_specs(strategy)
_render_adaptive_gate_panel(strategy, records)
for spec in specs:
_render_metric(spec, records)
st.divider()
+91 -25
View File
@@ -435,7 +435,7 @@ def _compute_pl(
- ``A`` (delta dinamico, §3.2): +1.5 pp win-rate, sl_loss × 0.95.
- ``D`` (vol-harvest, §7-bis): 5% delle would-be-loss diventano
harvest exit a +0.20 × credito.
- ``F`` (auto-pause, §7-bis): 8% trade/anno (skip-week dopo
- ``F`` (auto-pause, §7-bis): 8% trade/anno (skip-day dopo
streak), e nei calcoli di drawdown atteso il streak_99 è
cappato a lookback_trades=5.
@@ -476,7 +476,11 @@ def _compute_pl(
cap_pertrade_usd = caps["cap_pertrade_eur"] * eur_to_usd
risk_target = min(caps["kelly"] * capital, cap_pertrade_usd)
n_kelly = int(risk_target // width) if width > 0 else 0
# Max loss per contratto = width credit (NON width). Su un put
# spread incassi `credit` upfront, quindi la perdita massima è la
# larghezza meno il credito (vedi core/sizing_engine.py).
max_loss_per_contract = max(width - credit, 1e-6)
n_kelly = int(risk_target // max_loss_per_contract)
n_per_trade = max(0, min(n_kelly, int(caps["max_n"])))
prob_time_stop = 0.07
@@ -670,33 +674,81 @@ def _render_pl_panel(
"""Pannello P/L: confronto Conservativa vs Aggressiva sugli stessi slider."""
st.subheader("💰 P/L atteso — Conservativa vs Aggressiva")
st.caption(
"Stessi slider, due profili di sizing. **Conservativa** = la "
"golden config attuale (`strategy.yaml`). **Aggressiva** = "
"`strategy.aggressiva.yaml` con cap_per_trade 4×, max contratti "
"4×, 2 posizioni concorrenti. Le regole §2-§9 sono identiche; "
"cambiano SOLO le leve di sizing — quello che il P/L "
"conservativo lascia sul tavolo."
"Slider parametrici: scegli **cap per trade** e **posizioni "
"concorrenti**, il capitale richiesto viene calcolato in "
"automatico (Kelly-binding × concurrency / kelly_fraction). "
"Conservativa e Aggressiva ereditano dai rispettivi yaml SOLO "
"le leve qualitative (width_pct, credit_ratio, kelly_fraction, "
"feature attive); le leve di sizing (cap, concorrenza) le "
"controlli qui sotto."
)
col_a, col_b, col_c, col_d = st.columns(4)
capital = col_a.slider(
"Capitale (USD)", 720, 50_000, value=10_000, step=100
col_a, col_b, col_c, col_d, col_e = st.columns(5)
cap_per_trade_eur = col_a.slider(
"Cap/trade (EUR)", 50, 2000, value=200, step=10,
help="Massima perdita per singolo trade. Bound al rischio.",
)
spot = col_b.slider("Spot ETH (USD)", 1500, 6000, value=3000, step=100)
win_rate = col_c.slider(
concurrency_override = col_b.slider(
"Pos. concorrenti", 1, 10, value=3, step=1,
help="Quanti trade simultanei. Cap aggregato = cap/trade × N.",
)
spot = col_c.slider("Spot ETH (USD)", 1500, 6000, value=3000, step=100)
win_rate = col_d.slider(
"Win rate atteso", 0.50, 0.90, value=0.75, step=0.01,
help=(
"Senza filtri quant ≈ 0.650.70. CON filtri (dealer gamma>0, "
"no macro, IVRV>0, liquidation_*_risk≠high) sale a 0.750.80."
),
)
trades_per_year = col_d.slider(
"Trade / anno (post-filtri)", 8, 30, value=18, step=1,
help="52 lunedì × probabilità di superare i filtri (3050%).",
trades_per_year = col_e.slider(
"Trade / anno (post-filtri)", 20, 200, value=110, step=5,
help=(
"Crypto è 24/7: l'entry cycle gira ogni giorno alle 14:00 UTC "
"(`0 14 * * *`). 365 candidature × ~30-50% pass-rate effettivo "
"(post-filtri + cap concorrenza) ≈ 110-180/anno. Auto-pause F "
"riduce ulteriormente di ~8% in regime drawdown."
),
)
cons_caps = _profile_caps(strategy_conservativa or strategy_main)
aggr_caps = _profile_caps(strategy_aggressiva)
# Override sizing dai slider (sostituisce le leve cap/trade,
# cap_aggregate, max_concurrent dei yaml).
eur_to_usd = 1.075
cap_pertrade_usd = cap_per_trade_eur * eur_to_usd
cap_aggregate_override = float(cap_per_trade_eur * concurrency_override)
cons_caps = {
**cons_caps,
"cap_pertrade_eur": float(cap_per_trade_eur),
"cap_aggregate_eur": cap_aggregate_override,
"max_concurrent": float(concurrency_override),
}
aggr_caps = {
**aggr_caps,
"cap_pertrade_eur": float(cap_per_trade_eur),
"cap_aggregate_eur": cap_aggregate_override,
"max_concurrent": float(concurrency_override),
}
# Capitale richiesto: Kelly-binding aggregato.
# Per ogni trade slot, kelly × capital ≥ cap_pertrade_usd → capital
# ≥ cap_pertrade_usd / kelly. Per N concorrenti, scala linearmente
# come limite conservativo del notional cumulato.
kelly_cons = cons_caps.get("kelly", 0.13)
kelly_aggr = aggr_caps.get("kelly", 0.13)
capital_cons = int(
cap_pertrade_usd * concurrency_override / max(kelly_cons, 1e-3)
)
capital_aggr = int(
cap_pertrade_usd * concurrency_override / max(kelly_aggr, 1e-3)
)
capital = max(capital_cons, capital_aggr)
cap_col1, cap_col2, cap_col3 = st.columns(3)
cap_col1.metric("📊 Capitale richiesto", f"${capital:,}")
cap_col2.metric(
"💸 Cap aggregato (notional)",
f"${int(cap_pertrade_usd * concurrency_override):,}",
)
cap_col3.metric("🎯 Cap per trade (USD)", f"${int(cap_pertrade_usd):,}")
cons_feats = _detect_features(strategy_conservativa or strategy_main)
aggr_feats = _detect_features(strategy_aggressiva)
@@ -746,13 +798,17 @@ def _render_pl_panel(
features=feats_aggr,
)
cons_version = getattr(
strategy_conservativa or strategy_main, "config_version", "?"
)
aggr_version = getattr(strategy_aggressiva, "config_version", "?")
col_cons, col_aggr = st.columns(2)
with col_cons:
_render_profile_card(
"🛡️ Conservativa",
cons_caps,
cons,
"_(golden config v1.2.0)_",
f"_(golden config v{cons_version})_",
features=feats_cons,
metrics_base=cons_base if apply_features and any(feats_cons.values()) else None,
)
@@ -761,7 +817,7 @@ def _render_pl_panel(
"🔥 Aggressiva",
aggr_caps,
aggr,
"_(deroga §11, richiede paper trading)_",
f"_(v{aggr_version} · deroga §11, richiede paper trading)_",
features=feats_aggr,
metrics_base=aggr_base if apply_features and any(feats_aggr.values()) else None,
)
@@ -774,14 +830,24 @@ def _render_pl_panel(
"APR). Drawdown atteso scala con lo stesso fattore."
)
if win_rate < 0.72:
if cons["annual_pl"] < 0 and aggr["annual_pl"] < 0:
st.error(
"**Win rate sotto 0.72: entrambi i profili perdono soldi.** "
"Selling vol nudo è strutturalmente neutro qui. L'edge della "
"strategia sono i FILTRI (dealer gamma>0, no macro, "
"liquidation≠high, bias chiaro) che alzano il win rate sopra "
"il 0.75. Senza filtri attivi nessuno dei due profili è "
"viable."
f"**Entrambi i profili in perdita** (cons {cons['apr']:+.1%}, "
f"aggr {aggr['apr']:+.1%} APR). Selling vol nudo a win rate "
f"{win_rate:.0%} è strutturalmente non profittevole. L'edge "
"sono i FILTRI (dealer gamma>0, no macro, liquidation≠high, "
"bias chiaro) e i miglioramenti F+D+A+IV-RV gate, che alzano "
"il win rate effettivo sopra ~0.75 e/o riducono i tail loss. "
"Spunta l'opzione 'Applica gli effetti dei miglioramenti' qui "
"sopra per vedere i numeri con i filtri attivi."
)
elif cons["annual_pl"] < 0:
st.warning(
f"**Conservativo in perdita** ({cons['apr']:+.1%} APR), "
f"aggressivo positivo ({aggr['apr']:+.1%} APR). I miglioramenti "
"F+D+A+IV-RV gate stanno facendo il loro lavoro sull'aggressivo. "
"Sul conservativo i cap stretti riducono troppo il P/L atteso "
"a questo win rate."
)
# === Mini-tabella: contributo marginale di ogni feature =====
+6 -6
View File
@@ -84,15 +84,15 @@ def is_paused(
)
def pause_until(now: datetime, weeks: int) -> datetime:
"""Calcola la scadenza della pausa (``now + weeks``).
def pause_until(now: datetime, days: int) -> datetime:
"""Calcola la scadenza della pausa (``now + days``).
Estratto in funzione separata per facilitare i test e per ricordare
che la pausa è espressa in **settimane** (la strategia ha cron
settimanale; pause più corte non avrebbero modo di evitare una
settimana di entry).
che la pausa è espressa in **giorni** (la strategia ha cron
giornaliero su crypto 24/7; pause sub-giornaliere non avrebbero
modo di evitare un'entry).
"""
return now + timedelta(weeks=max(1, weeks))
return now + timedelta(days=max(1, days))
def evaluate_drawdown_breach(
+107 -7
View File
@@ -1,9 +1,11 @@
"""Weekly entry decision loop (``docs/06-operational-flow.md`` §2).
"""Daily entry decision loop (``docs/06-operational-flow.md`` §2).
Pure orchestration over the existing core/clients/state primitives.
The cycle is auto-execute: when every gate passes, the engine sends
the combo order without asking Adriano. Telegram is used only to
notify the outcome.
Crypto è 24/7: la cadenza di candidatura non è gateata sulla
settimana, sono i gate quantitativi a decidere se entrare o saltare
il giorno. Pure orchestration over the existing core/clients/state
primitives. The cycle is auto-execute: when every gate passes, the
engine sends the combo order without asking Adriano. Telegram is
used only to notify the outcome.
"""
from __future__ import annotations
@@ -28,6 +30,7 @@ from cerbero_bite.clients.macro import MacroClient
from cerbero_bite.clients.portfolio import PortfolioClient
from cerbero_bite.clients.sentiment import SentimentClient
from cerbero_bite.config.schema import StrategyConfig
from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold
from cerbero_bite.core.combo_builder import ComboProposal, build, select_strikes
from cerbero_bite.core.entry_validator import (
EntryContext,
@@ -313,6 +316,51 @@ async def _build_quotes(
return out
def _select_window_days(entry_cfg: object, n_days: int) -> int:
"""Sceglie la finestra in giorni per il gate adattivo dato n_days
disponibili.
Spec: warmup hard se ``n_days == 0`` 0; finestra ``target_days``
se ``n_days >= target_days``; ``min_days`` se ``n_days >= min_days``;
altrimenti tutta la storia disponibile (capped a ``target_days``).
"""
target = int(getattr(entry_cfg, "iv_minus_rv_window_target_days", 60))
min_days = int(getattr(entry_cfg, "iv_minus_rv_window_min_days", 30))
if n_days < 1:
return 0
if n_days >= target:
return target
if n_days >= min_days:
return min_days
return target # storia parziale: query fino a target, repository ne ritorna n_days
def _audit_threshold(
entry_cfg: object,
iv_rv_history: tuple[Decimal, ...],
n_days: int,
) -> str | None:
"""Soglia P_q rolling effettivamente usata dal gate, per il decisions log."""
if not getattr(entry_cfg, "iv_minus_rv_filter_enabled", False):
return None
if not getattr(entry_cfg, "iv_minus_rv_adaptive_enabled", False):
return str(getattr(entry_cfg, "iv_minus_rv_min", Decimal("0")))
threshold = compute_adaptive_threshold(
history=iv_rv_history,
n_days=n_days,
percentile=entry_cfg.iv_minus_rv_percentile, # type: ignore[attr-defined]
absolute_floor=entry_cfg.iv_minus_rv_min, # type: ignore[attr-defined]
)
return None if threshold is None else str(threshold)
def _audit_window_days(entry_cfg: object, n_days: int) -> int | None:
"""Numero di giorni effettivamente usati dalla finestra rolling."""
if not getattr(entry_cfg, "iv_minus_rv_adaptive_enabled", False):
return None
return _select_window_days(entry_cfg, n_days)
def _max_loss_per_contract_usd(short_strike: Decimal, long_strike: Decimal) -> Decimal:
return (short_strike - long_strike).copy_abs()
@@ -328,7 +376,7 @@ async def run_entry_cycle(
eur_to_usd_rate: Decimal,
now: datetime | None = None,
) -> EntryCycleResult:
"""Run one weekly entry evaluation cycle.
"""Run one daily entry evaluation cycle.
The function is idempotent and side-effect aware: it persists the
decision in the ``decisions`` table regardless of outcome and only
@@ -406,7 +454,7 @@ async def run_entry_cycle(
capital_usd=capital_usd,
)
if breach.should_pause:
until = auto_pause_module.pause_until(when, auto_cfg.pause_weeks)
until = auto_pause_module.pause_until(when, auto_cfg.pause_days)
conn = connect_state(ctx.db_path)
try:
with transaction(conn):
@@ -427,6 +475,44 @@ async def run_entry_cycle(
)
# 2. Entry filters
entry_cfg = cfg.entry
asset = cfg.asset.symbol
iv_rv_history: tuple[Decimal, ...] = ()
iv_rv_n_days: int = 0
dvol_24h_ago: Decimal | None = None
if entry_cfg.iv_minus_rv_filter_enabled and entry_cfg.iv_minus_rv_adaptive_enabled:
conn = connect_state(ctx.db_path)
try:
iv_rv_n_days = ctx.repository.count_iv_rv_distinct_days(
conn,
asset=asset,
max_days=entry_cfg.iv_minus_rv_window_target_days,
as_of=when,
)
window_days = _select_window_days(entry_cfg, iv_rv_n_days)
if window_days > 0:
iv_rv_history = tuple(
ctx.repository.iv_rv_values_for_window(
conn,
asset=asset,
window_days=window_days,
as_of=when,
)
)
finally:
conn.close()
if entry_cfg.vol_of_vol_guard_enabled:
conn = connect_state(ctx.db_path)
try:
dvol_24h_ago = ctx.repository.dvol_lookback(
conn,
asset=asset,
reference=when - timedelta(hours=entry_cfg.vol_of_vol_lookback_hours),
)
finally:
conn.close()
entry_ctx = EntryContext(
capital_usd=capital_usd,
dvol_now=snap.dvol,
@@ -437,6 +523,9 @@ async def run_entry_cycle(
dealer_net_gamma=snap.dealer_net_gamma,
iv_minus_rv=snap.iv_minus_rv,
liquidation_squeeze_risk_high=snap.liquidation_squeeze_risk_high,
iv_rv_history=iv_rv_history,
iv_rv_n_days=iv_rv_n_days,
dvol_24h_ago=dvol_24h_ago,
)
decision = validate_entry(entry_ctx, cfg)
inputs = {
@@ -456,6 +545,17 @@ async def run_entry_cycle(
"iv_minus_rv": (
str(snap.iv_minus_rv) if snap.iv_minus_rv is not None else None
),
"iv_rv_history_n": len(iv_rv_history),
"iv_rv_n_days": iv_rv_n_days,
"iv_rv_threshold_used": _audit_threshold(
entry_cfg, iv_rv_history, iv_rv_n_days
),
"iv_rv_window_used_days": _audit_window_days(
entry_cfg, iv_rv_n_days
),
"dvol_24h_ago": (
str(dvol_24h_ago) if dvol_24h_ago is not None else None
),
}
}
if not decision.accepted:
@@ -22,7 +22,7 @@ from typing import TYPE_CHECKING, Any
from cerbero_bite.clients._exceptions import McpError
from cerbero_bite.state import connect, transaction
from cerbero_bite.state.models import MarketSnapshotRecord
from cerbero_bite.state.models import DvolSnapshot, MarketSnapshotRecord
if TYPE_CHECKING:
from cerbero_bite.runtime.dependencies import RuntimeContext
@@ -181,6 +181,21 @@ async def collect_market_snapshot(
try:
with transaction(conn):
ctx.repository.record_market_snapshot(conn, record)
# Mirror ETH spot+DVOL into dvol_history so monitor_cycle's
# return_4h lookup has local samples even in data-only mode.
if (
record.asset == "ETH"
and record.spot is not None
and record.dvol is not None
):
ctx.repository.record_dvol_snapshot(
conn,
DvolSnapshot(
timestamp=record.timestamp,
dvol=record.dvol,
eth_spot=record.spot,
),
)
finally:
conn.close()
persisted += 1
@@ -1,10 +1,11 @@
"""Periodic option-chain snapshot collector (§13).
Fetches the Deribit option chain for every strike entro la finestra
DTE configurata, prima del trigger entry settimanale (cron
``55 13 * * MON`` di default). Persiste un quote per ogni strumento
in ``option_chain_snapshots`` con un timestamp condiviso, che diventa
il dato di base per:
DTE configurata. Cadenza di default ``*/15 * * * *`` (allineata a
``market_snapshot``: crypto è 24/7 e l'accumulo dataset deve essere
continuo, non gateato sui rollover TradFi-style). Persiste un quote
per ogni strumento in ``option_chain_snapshots`` con un timestamp
condiviso, che diventa il dato di base per:
* il backtest non-stilizzato (vedi ``core/backtest.py``),
* la calibrazione empirica dello skew premium e del credit/width
+5 -6
View File
@@ -50,13 +50,13 @@ _log = logging.getLogger("cerbero_bite.runtime.orchestrator")
Environment = Literal["testnet", "mainnet"]
# Default cron schedule (matches docs/06-operational-flow.md table).
_CRON_ENTRY = "0 14 * * MON"
_CRON_ENTRY = "0 14 * * *" # crypto 24/7: candidatura giornaliera; i gate decidono se entrare
_CRON_MONITOR = "0 2,14 * * *"
_CRON_HEALTH = "*/5 * * * *"
_CRON_BACKUP = "0 * * * *"
_CRON_MANUAL_ACTIONS = "*/1 * * * *"
_CRON_MARKET_SNAPSHOT = "*/15 * * * *"
_CRON_OPTION_CHAIN_SNAPSHOT = "55 13 * * MON" # 5 min prima del trigger entry
_CRON_OPTION_CHAIN_SNAPSHOT = "*/15 * * * *" # crypto è 24/7: cadenza continua allineata a market_snapshot
_BACKUP_RETENTION_DAYS = 30
@@ -222,7 +222,7 @@ class Orchestrator:
market_snapshot_cron: str = _CRON_MARKET_SNAPSHOT,
market_snapshot_assets: tuple[str, ...] = DEFAULT_ASSETS,
option_chain_cron: str = _CRON_OPTION_CHAIN_SNAPSHOT,
option_chain_asset: str = "ETH",
option_chain_assets: tuple[str, ...] = ("ETH", "BTC"),
backup_dir: Path | None = None,
backup_retention_days: int = _BACKUP_RETENTION_DAYS,
) -> AsyncIOScheduler:
@@ -290,9 +290,8 @@ class Orchestrator:
async def _option_chain_snapshot() -> None:
async def _do() -> None:
await collect_option_chain_snapshot(
self._ctx, asset=option_chain_asset
)
for asset in option_chain_assets:
await collect_option_chain_snapshot(self._ctx, asset=asset)
await _safe("option_chain_snapshot", _do)
@@ -1,10 +1,10 @@
-- 0004_option_chain_snapshots.sql — catena opzioni storica
-- 0005_option_chain_snapshots.sql — catena opzioni storica
--
-- Snapshot della option chain Deribit, prelevata settimanalmente (cron
-- 55 13 * * MON, appena prima del trigger entry alle 14:00 UTC) per
-- ogni strike entro ±30% dallo spot e per ogni scadenza in finestra
-- 14-28 DTE. Dato di base per il backtest non-stilizzato e per
-- calibrare empiricamente lo skew premium del modello BS.
-- Snapshot della option chain Deribit prelevata ogni 15 minuti (stesso
-- scheduler di market_snapshots, cron */15) per ogni strike entro ±30%
-- dallo spot e per ogni scadenza nella finestra 14-28 DTE. Dato di
-- base per il backtest non-stilizzato e per calibrare empiricamente
-- lo skew premium del modello BS.
--
-- Granularità: una riga per (snapshot_ts, instrument). Lo
-- snapshot_ts è il timestamp del cron tick — TUTTI i quote raccolti
+106 -1
View File
@@ -13,7 +13,7 @@ Decimals are stored as TEXT to preserve precision (see
from __future__ import annotations
import sqlite3
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import Any
from uuid import UUID
@@ -408,6 +408,111 @@ class Repository:
).fetchall()
return [_row_to_market_snapshot(r) for r in rows]
def count_iv_rv_distinct_days(
self,
conn: sqlite3.Connection,
*,
asset: str,
max_days: int,
as_of: datetime | None = None,
) -> int:
"""Numero di giorni di calendario distinti coperti da IV-RV validi.
Esclude righe con ``fetch_ok=0`` o ``iv_minus_rv IS NULL``.
Usato dal caller del gate adattivo per decidere la finestra
(warmup hard / min_days / target_days).
Args:
as_of: Reference time for the rolling window. Defaults to
``datetime.now(UTC)``.
"""
if max_days <= 0:
raise ValueError(f"max_days must be positive, got {max_days}")
ref = as_of if as_of is not None else datetime.now(UTC)
if ref.tzinfo is None:
raise ValueError("as_of must be timezone-aware")
cutoff = ref - timedelta(days=max_days)
row = conn.execute(
"SELECT COUNT(DISTINCT substr(timestamp, 1, 10)) AS n "
"FROM market_snapshots "
"WHERE asset = ? "
" AND fetch_ok = 1 "
" AND iv_minus_rv IS NOT NULL "
" AND timestamp >= ?",
(asset, _enc_dt(cutoff)),
).fetchone()
return int(row["n"]) if row is not None else 0
def iv_rv_values_for_window(
self,
conn: sqlite3.Connection,
*,
asset: str,
window_days: int,
as_of: datetime | None = None,
) -> list[Decimal]:
"""Valori IV-RV ordinati ASC su ``[as_of - window_days, as_of]``.
Esclude righe con ``fetch_ok=0`` o ``iv_minus_rv IS NULL``.
Tutti i record validi della finestra concorrono come singoli
contributi alla statistica del percentile, indipendentemente
dalla cadenza con cui sono stati raccolti (tick live vs backfill
daily).
"""
if window_days <= 0:
raise ValueError(f"window_days must be positive, got {window_days}")
ref = as_of if as_of is not None else datetime.now(UTC)
if ref.tzinfo is None:
raise ValueError("as_of must be timezone-aware")
cutoff = ref - timedelta(days=window_days)
rows = conn.execute(
"SELECT iv_minus_rv FROM market_snapshots "
"WHERE asset = ? "
" AND fetch_ok = 1 "
" AND iv_minus_rv IS NOT NULL "
" AND timestamp >= ? "
"ORDER BY timestamp ASC",
(asset, _enc_dt(cutoff)),
).fetchall()
return [Decimal(str(r["iv_minus_rv"])) for r in rows]
def dvol_lookback(
self,
conn: sqlite3.Connection,
*,
asset: str,
reference: datetime,
tolerance_minutes: int = 15,
) -> Decimal | None:
"""DVOL al tick più vicino a `reference`, entro ±tolerance_minutes.
Ritorna ``None`` se non esiste un tick valido (``fetch_ok=1``,
``dvol IS NOT NULL``) entro la tolerance. Usato dal Vol-of-Vol
guard per stimare DVOL N ore fa.
"""
if reference.tzinfo is None:
raise ValueError("reference must be timezone-aware")
ref_lo = (reference - timedelta(minutes=tolerance_minutes)).astimezone(UTC)
ref_hi = (reference + timedelta(minutes=tolerance_minutes)).astimezone(UTC)
row = conn.execute(
"SELECT dvol, timestamp FROM market_snapshots "
"WHERE asset = ? "
" AND fetch_ok = 1 "
" AND dvol IS NOT NULL "
" AND timestamp >= ? "
" AND timestamp <= ? "
"ORDER BY ABS(julianday(timestamp) - julianday(?)) ASC LIMIT 1",
(
asset,
_enc_dt(ref_lo),
_enc_dt(ref_hi),
_enc_dt(reference),
),
).fetchone()
if row is None:
return None
return Decimal(str(row["dvol"]))
# ------------------------------------------------------------------
# option_chain_snapshots
# ------------------------------------------------------------------
+21 -8
View File
@@ -28,8 +28,8 @@
# 2× via "ETH + BTC" indicato in `📚 Strategia` è una **stima ex-ante**
# di cosa otterresti DOPO quel lavoro di codice.
config_version: "1.3.0-aggressiva"
config_hash: "e983e156bf0c270941765e7b9639a35fdc6de7b091076bf5a9b360e294e81e4c"
config_version: "1.4.0-aggressiva"
config_hash: "7fa9b0be5b56517293421bc19838b700da595725360fe018a1be13b802dea859"
last_review: "2026-04-26"
last_reviewer: "Adriano"
@@ -38,7 +38,7 @@ asset:
exchange: "deribit"
entry:
cron: "0 14 * * MON"
cron: "0 14 * * *"
skip_holidays_country: "IT"
capital_min_usd: "2880" # 4× del minimo conservativo (720)
@@ -65,9 +65,22 @@ entry:
dealer_gamma_min: "0"
dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true
# IV richness gate (§2.9) — abilitato a 3 pt vol per profilo aggressivo.
iv_minus_rv_min: "3"
# IV richness gate (§2.9). In Aggressiva il gate è in modalità
# adattiva: la soglia è il P25 rolling sui market_snapshots
# (warmup: usa la storia disponibile finché < 30g, poi finestra 30g
# fino a 60g, poi fissa 60g). `iv_minus_rv_min: 0` = floor zero,
# lascia decidere al P25.
iv_minus_rv_filter_enabled: true
iv_minus_rv_adaptive_enabled: true
iv_minus_rv_min: "0"
iv_minus_rv_percentile: "0.25"
iv_minus_rv_window_target_days: 60
iv_minus_rv_window_min_days: 30
# Vol-of-Vol guard: blocca entry su shift bruschi DVOL.
vol_of_vol_guard_enabled: true
vol_of_vol_threshold_pt: "5"
vol_of_vol_lookback_hours: 24
structure:
@@ -108,8 +121,8 @@ sizing:
# Le tre leve dominanti:
cap_per_trade_eur: "800" # era 200 → 4×
cap_aggregate_open_eur: "3200" # era 1000 → 4× (proporzionato a 2 posizioni × cap_per_trade × 2 ruote)
max_concurrent_positions: 2 # era 1
cap_aggregate_open_eur: "6400" # 8 posizioni concorrenti × 800 cap_per_trade (entry daily)
max_concurrent_positions: 8 # era 2 (entry daily × DTE 14-21 × pass-rate)
max_contracts_per_trade: 16 # era 4 → 4×
dvol_adjustment:
@@ -150,7 +163,7 @@ auto_pause:
enabled: true
lookback_trades: 5
max_drawdown_pct: "0.15"
pause_weeks: 2
pause_days: 14
execution:
environment: "testnet"
+6 -6
View File
@@ -15,8 +15,8 @@
# cerbero-bite config hash --file strategy.conservativa.yaml
# e bumpare config_version.
config_version: "1.3.0-conservativa"
config_hash: "900646beb1dd0a7bfaf553f76adb4b55004eff1f094585f779302131625919e8"
config_version: "1.4.0-conservativa"
config_hash: "b6af7b041508a67846eba5985e27e655526fe89105653f86bc88b8a4a437ac3a"
last_review: "2026-04-26"
last_reviewer: "Adriano"
@@ -25,7 +25,7 @@ asset:
exchange: "deribit"
entry:
cron: "0 14 * * MON"
cron: "0 14 * * *"
skip_holidays_country: "IT"
capital_min_usd: "720"
@@ -84,8 +84,8 @@ sizing:
kelly_fraction: "0.13"
cap_per_trade_eur: "200"
cap_aggregate_open_eur: "1000"
max_concurrent_positions: 1
cap_aggregate_open_eur: "1000" # 3 posizioni × ~200 + headroom (entry daily, profilo conservativo)
max_concurrent_positions: 3
max_contracts_per_trade: 4
@@ -119,7 +119,7 @@ auto_pause:
enabled: false
lookback_trades: 5
max_drawdown_pct: "0.10"
pause_weeks: 2
pause_days: 14
execution:
environment: "testnet"
+5 -5
View File
@@ -6,8 +6,8 @@
# config hash), and lands as a separate commit with the motivation in
# the commit message.
config_version: "1.3.0"
config_hash: "178a87467707d54d1ffef2d585a3a01be54de5ccc7e23493356eac47fd1c24d8"
config_version: "1.4.0"
config_hash: "22182814216190331e0b69b3bc99493e6d69cc813f7ed937394986eecc1f5d11"
last_review: "2026-04-26"
last_reviewer: "Adriano"
@@ -16,7 +16,7 @@ asset:
exchange: "deribit"
entry:
cron: "0 14 * * MON"
cron: "0 14 * * *"
skip_holidays_country: "IT"
capital_min_usd: "720"
@@ -81,7 +81,7 @@ sizing:
cap_per_trade_eur: "200"
cap_aggregate_open_eur: "1000"
max_concurrent_positions: 1
max_concurrent_positions: 5
max_contracts_per_trade: 4
@@ -121,7 +121,7 @@ auto_pause:
enabled: false
lookback_trades: 5
max_drawdown_pct: "0.10"
pause_weeks: 2
pause_days: 14
execution:
environment: "testnet" # testnet|mainnet — kill switch on broker mismatch
@@ -0,0 +1,136 @@
"""End-to-end test del gate IV-RV adattivo + Vol-of-Vol guard via Repository.
Verifica che la nuova API distinct-days componga correttamente repository
helpers + ``compute_adaptive_threshold``.
"""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold
from cerbero_bite.state.db import connect, run_migrations
from cerbero_bite.state.models import MarketSnapshotRecord
from cerbero_bite.state.repository import Repository
def _seed_history(
conn,
repo: Repository,
asset: str,
base: datetime,
n_ticks: int,
iv_rv_value: Decimal,
dvol_value: Decimal,
) -> None:
for i in range(n_ticks):
repo.record_market_snapshot(
conn,
MarketSnapshotRecord(
timestamp=base + timedelta(minutes=15 * i),
asset=asset,
spot=Decimal("2000"),
dvol=dvol_value,
realized_vol_30d=Decimal("48"),
iv_minus_rv=iv_rv_value,
funding_perp_annualized=Decimal("0"),
funding_cross_annualized=Decimal("0"),
dealer_net_gamma=Decimal("0"),
gamma_flip_level=None,
oi_delta_pct_4h=None,
liquidation_long_risk="low",
liquidation_short_risk="low",
macro_days_to_event=None,
fetch_ok=True,
fetch_errors_json=None,
),
)
conn.commit()
@pytest.fixture
def db_30d(tmp_path):
"""30 giorni di storia con IV-RV bimodale: prima metà 1.0, seconda metà 5.0."""
db_path = tmp_path / "e2e.sqlite"
conn = connect(str(db_path))
run_migrations(conn)
repo = Repository()
base = datetime(2026, 4, 1, 0, 0, tzinfo=UTC)
_seed_history(conn, repo, "ETH", base, 1440, Decimal("1.0"), Decimal("50"))
_seed_history(
conn,
repo,
"ETH",
base + timedelta(days=15),
1440,
Decimal("5.0"),
Decimal("50"),
)
return conn, repo
def test_distinct_days_count_matches_calendar_days(db_30d) -> None:
"""30 giorni di calendario seedati → COUNT DISTINCT = 30."""
conn, repo = db_30d
n = repo.count_iv_rv_distinct_days(
conn,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 1, 0, 0, tzinfo=UTC),
)
assert n == 30
def test_window_values_returned_for_full_history(db_30d) -> None:
conn, repo = db_30d
values = repo.iv_rv_values_for_window(
conn,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 1, 0, 0, tzinfo=UTC),
)
assert len(values) == 2880
# Bimodale: 1440 valori 1.0 e 1440 valori 5.0
assert sum(1 for v in values if v == Decimal("1.0")) == 1440
assert sum(1 for v in values if v == Decimal("5.0")) == 1440
def test_p25_of_bimodal_history_picks_low_regime(db_30d) -> None:
"""Comporre repository + adaptive_threshold come fa entry_cycle."""
conn, repo = db_30d
as_of = datetime(2026, 5, 1, 0, 0, tzinfo=UTC)
n_days = repo.count_iv_rv_distinct_days(
conn, asset="ETH", max_days=60, as_of=as_of
)
values = repo.iv_rv_values_for_window(
conn, asset="ETH", window_days=60, as_of=as_of
)
threshold = compute_adaptive_threshold(
history=values,
n_days=n_days,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
# P25 di 2880 valori bimodali: 1440 ×1.0, 1440 ×5.0 → soglia = 1.0
assert threshold == Decimal("1.0")
def test_dvol_lookback_within_tolerance(db_30d) -> None:
conn, repo = db_30d
base = datetime(2026, 4, 1, 0, 0, tzinfo=UTC)
out = repo.dvol_lookback(conn, asset="ETH", reference=base + timedelta(hours=24))
assert out == Decimal("50")
def test_dvol_lookback_returns_none_outside_tolerance(db_30d) -> None:
conn, repo = db_30d
out = repo.dvol_lookback(
conn,
asset="ETH",
reference=datetime(2025, 1, 1, tzinfo=UTC),
tolerance_minutes=15,
)
assert out is None
+170
View File
@@ -0,0 +1,170 @@
"""TDD per :mod:`cerbero_bite.core.adaptive_threshold`.
Spec: ``docs/superpowers/specs/2026-05-08-iv-rv-adaptive-gate-design.md``.
La funzione è una pura statistica: riceve già la finestra di valori scelta
dal caller e il numero di giorni distinti coperti dalla storia disponibile
(``n_days``), e restituisce ``max(percentile, floor)`` o ``None`` durante
il warmup hard. La selezione della finestra (target_days vs min_days vs
intera storia) è responsabilità del caller (repository + entry_cycle).
"""
from __future__ import annotations
from decimal import Decimal
import pytest
from cerbero_bite.core.adaptive_threshold import compute_adaptive_threshold
# ---------------------------------------------------------------------------
# Warmup hard: nessun giorno disponibile
# ---------------------------------------------------------------------------
def test_n_days_zero_returns_none() -> None:
"""Storia vuota o nessun giorno coperto → warmup hard."""
out = compute_adaptive_threshold(
history=[],
n_days=0,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
assert out is None
def test_n_days_zero_with_values_still_returns_none() -> None:
"""Difensivo: se il caller passa n_days=0 ma valori non vuoti, warmup
hard vince comunque (gate disabilitato)."""
out = compute_adaptive_threshold(
history=[Decimal("3")] * 10,
n_days=0,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
assert out is None
def test_empty_history_with_positive_n_days_returns_none() -> None:
"""Difensivo: history vuota anche con n_days>0 → None."""
out = compute_adaptive_threshold(
history=[],
n_days=5,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
assert out is None
# ---------------------------------------------------------------------------
# Calcolo percentile sulla finestra ricevuta
# ---------------------------------------------------------------------------
def test_n_days_one_returns_percentile_of_history() -> None:
"""Singolo giorno con tick a 15 min (96 valori): P25 standard."""
history = [Decimal(i) / Decimal("10") for i in range(96)] # 0.0..9.5
out = compute_adaptive_threshold(
history=history,
n_days=1,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
# P25 di [0.0..9.5] passo 0.1 con method='linear': k=23.75, val ≈ 2.375
assert out is not None
assert Decimal("2.3") < out < Decimal("2.5")
def test_window_chosen_by_caller_is_used_verbatim() -> None:
"""La funzione NON fa slicing: usa esattamente la history ricevuta."""
history = [Decimal(i) for i in range(1, 201)] # 1..200
out = compute_adaptive_threshold(
history=history,
n_days=30,
percentile=Decimal("0.5"),
absolute_floor=Decimal("0"),
)
# P50 di [1..200] = (200+1)/2 = 100.5
assert out is not None
assert Decimal("100") <= out <= Decimal("101")
def test_mixed_cadence_window_no_special_treatment() -> None:
"""Mix di valori (es. backfill daily + tick live) trattato come una
distribuzione qualunque: il caller ha già scelto la finestra; la
funzione calcola il percentile sui valori ricevuti uno-a-uno."""
# 30 valori "daily backfill" (uno per giorno) + 96 tick "live"
history = [Decimal("5")] * 30 + [Decimal("8")] * 96
out = compute_adaptive_threshold(
history=history,
n_days=31,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
# Sorted: 30 ×5, 96 ×8. P25 a indice 0.25*125 = 31.25 → tra 5 e 8.
# NumPy linear: sorted_v[31]=8, sorted_v[32]=8 → 8.
# Verifica solo l'estremo superiore della famiglia di valori sorted.
assert out is not None
assert out in (Decimal("5"), Decimal("8"))
# ---------------------------------------------------------------------------
# Floor binding
# ---------------------------------------------------------------------------
def test_floor_binding_overrides_low_percentile() -> None:
history = [Decimal("0.5")] * 200
out = compute_adaptive_threshold(
history=history,
n_days=30,
percentile=Decimal("0.25"),
absolute_floor=Decimal("3"),
)
assert out == Decimal("3")
def test_floor_not_binding_returns_percentile() -> None:
history = [Decimal("5")] * 200
out = compute_adaptive_threshold(
history=history,
n_days=30,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
assert out == Decimal("5")
# ---------------------------------------------------------------------------
# Input validation
# ---------------------------------------------------------------------------
def test_invalid_percentile_above_one_raises() -> None:
with pytest.raises(ValueError, match="percentile must be in"):
compute_adaptive_threshold(
history=[Decimal("1")] * 200,
n_days=10,
percentile=Decimal("1.5"),
absolute_floor=Decimal("0"),
)
def test_invalid_percentile_negative_raises() -> None:
with pytest.raises(ValueError, match="percentile must be in"):
compute_adaptive_threshold(
history=[Decimal("1")] * 200,
n_days=10,
percentile=Decimal("-0.1"),
absolute_floor=Decimal("0"),
)
def test_invalid_negative_n_days_raises() -> None:
with pytest.raises(ValueError, match="n_days must be >= 0"):
compute_adaptive_threshold(
history=[Decimal("1")] * 10,
n_days=-1,
percentile=Decimal("0.25"),
absolute_floor=Decimal("0"),
)
+9 -9
View File
@@ -68,16 +68,16 @@ def test_is_paused_returns_false_when_until_in_past() -> None:
# ---------------------------------------------------------------------------
def test_pause_until_adds_weeks() -> None:
until = pause_until(_NOW, weeks=2)
assert until == _NOW + timedelta(weeks=2)
def test_pause_until_adds_days() -> None:
until = pause_until(_NOW, days=14)
assert until == _NOW + timedelta(days=14)
def test_pause_until_clamps_to_one_week_minimum() -> None:
# weeks <= 0 deve cmq dare almeno 1 settimana di pausa, altrimenti
# la cron settimanale potrebbe scattare comunque.
assert pause_until(_NOW, weeks=0) == _NOW + timedelta(weeks=1)
assert pause_until(_NOW, weeks=-3) == _NOW + timedelta(weeks=1)
def test_pause_until_clamps_to_one_day_minimum() -> None:
# days <= 0 deve cmq dare almeno 1 giorno di pausa, altrimenti
# la cron giornaliera potrebbe scattare comunque.
assert pause_until(_NOW, days=0) == _NOW + timedelta(days=1)
assert pause_until(_NOW, days=-3) == _NOW + timedelta(days=1)
# ---------------------------------------------------------------------------
@@ -90,7 +90,7 @@ def _cfg(**overrides: object) -> AutoPauseConfig:
"enabled": True,
"lookback_trades": 5,
"max_drawdown_pct": Decimal("0.10"),
"pause_weeks": 2,
"pause_days": 14,
}
base.update(overrides)
return AutoPauseConfig(**base) # type: ignore[arg-type]
+176
View File
@@ -0,0 +1,176 @@
"""TDD per il backfill IV-RV (``scripts/backfill_iv_rv.py``).
Testa solo la parte pura (compute RV + assemblaggio record). I/O HTTP
e SQLite restano nel main del CLI: testati manualmente al deploy.
"""
from __future__ import annotations
import importlib.util
import sys
from datetime import UTC, date, datetime, timedelta
from decimal import Decimal
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parents[2]
def _load_backfill_module() -> object:
"""Load scripts/backfill_iv_rv.py as a module without polluting sys.path."""
spec = importlib.util.spec_from_file_location(
"_cerbero_bite_backfill_iv_rv", REPO_ROOT / "scripts" / "backfill_iv_rv.py"
)
if spec is None or spec.loader is None:
raise RuntimeError("cannot load backfill_iv_rv module")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
@pytest.fixture(scope="module")
def mod():
return _load_backfill_module()
# ---------------------------------------------------------------------------
# compute_rv30d_annualized
# ---------------------------------------------------------------------------
def test_constant_prices_yield_zero_rv(mod) -> None:
closes = [Decimal("100")] * 31 # 30 returns of log(1)=0
rv = mod.compute_rv30d_annualized(closes)
assert rv == Decimal("0")
def test_too_few_closes_raises(mod) -> None:
with pytest.raises(ValueError, match="need at least 31 closes"):
mod.compute_rv30d_annualized([Decimal("100")] * 10)
def test_monotonic_growth_yields_low_rv(mod) -> None:
"""Crescita +1% ogni giorno: log returns costanti → stdev = 0 → RV = 0."""
closes = [Decimal("100") * (Decimal("1.01") ** i) for i in range(31)]
rv = mod.compute_rv30d_annualized(closes)
# Tutti i log returns sono identici (log 1.01) → stdev zero
assert rv == Decimal("0")
def test_alternating_returns_yield_known_rv(mod) -> None:
"""Returns alternati ±2% ogni giorno: stdev nota."""
# closes: 100, 102, 100, 102, ... (ricorda: returns = log(c[i]/c[i-1]))
closes = [Decimal("100")] + [
Decimal("102") if i % 2 == 0 else Decimal("100") for i in range(30)
]
rv = mod.compute_rv30d_annualized(closes)
# |log return| ~ 0.0198, stdev ≈ 0.0198 (alternano segno con media ≈ 0)
# Annualized = 0.0198 * sqrt(365) * 100 ≈ 37.86 vol pts
assert Decimal("36") <= rv <= Decimal("40")
# ---------------------------------------------------------------------------
# build_backfill_records
# ---------------------------------------------------------------------------
def test_build_records_skips_days_without_30d_history(mod) -> None:
"""Per i primi 30 giorni della serie spot, RV30d non è calcolabile."""
today = date(2026, 5, 10)
days = [today - timedelta(days=i) for i in range(45)]
spots = {d.isoformat(): Decimal("100") for d in days}
dvols = {d.isoformat(): Decimal("50") for d in days}
records = mod.build_backfill_records(
asset="ETH",
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=today - timedelta(days=40),
)
# Per ogni record day, servono 30 giorni precedenti di spot.
# Lo spot più vecchio è today-44; quindi il primo giorno computabile
# è today-44+30 = today-14. Cap a oldest_day=today-40 → window day-14..day-0.
assert len(records) == 15 # day-14..day-0 incluso
for r in records:
assert r.asset == "ETH"
assert r.fetch_ok is True
assert r.iv_minus_rv == Decimal("50") # rv=0 con prezzi costanti
assert r.timestamp.tzinfo == UTC
assert r.timestamp.hour == 12
def test_build_records_filters_to_requested_window(mod) -> None:
"""oldest_day applicato come cutoff inferiore inclusivo."""
today = date(2026, 5, 10)
days = [today - timedelta(days=i) for i in range(45)]
spots = {d.isoformat(): Decimal("100") for d in days}
dvols = {d.isoformat(): Decimal("50") for d in days}
records = mod.build_backfill_records(
asset="BTC",
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=today - timedelta(days=5),
)
# day-5..day-0 → 6 record
assert len(records) == 6
record_days = {r.timestamp.date() for r in records}
assert record_days == {today - timedelta(days=i) for i in range(6)}
def test_build_records_skips_days_missing_dvol(mod) -> None:
"""Se manca DVOL per un giorno della finestra, lo si salta (no record)."""
today = date(2026, 5, 10)
days = [today - timedelta(days=i) for i in range(45)]
spots = {d.isoformat(): Decimal("100") for d in days}
dvols = {
d.isoformat(): Decimal("50")
for d in days
if d != today - timedelta(days=2)
}
records = mod.build_backfill_records(
asset="ETH",
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=today - timedelta(days=5),
)
record_days = {r.timestamp.date() for r in records}
assert today - timedelta(days=2) not in record_days
assert len(records) == 5
def test_build_records_skips_days_missing_spot(mod) -> None:
"""Se manca lo spot del giorno target, no record per quel giorno."""
today = date(2026, 5, 10)
days = [today - timedelta(days=i) for i in range(45)]
spots = {
d.isoformat(): Decimal("100")
for d in days
if d != today - timedelta(days=2)
}
dvols = {d.isoformat(): Decimal("50") for d in days}
records = mod.build_backfill_records(
asset="ETH",
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=today - timedelta(days=5),
)
record_days = {r.timestamp.date() for r in records}
assert today - timedelta(days=2) not in record_days
def test_build_records_uses_noon_utc_timestamp(mod) -> None:
today = date(2026, 5, 10)
days = [today - timedelta(days=i) for i in range(35)]
spots = {d.isoformat(): Decimal("100") for d in days}
dvols = {d.isoformat(): Decimal("50") for d in days}
records = mod.build_backfill_records(
asset="ETH",
spots_by_day=spots,
dvols_by_day=dvols,
oldest_day=today,
)
assert len(records) == 1
assert records[0].timestamp == datetime(2026, 5, 10, 12, 0, tzinfo=UTC)
+28 -28
View File
@@ -11,9 +11,9 @@ from cerbero_bite.config import StrategyConfig, golden_config
from cerbero_bite.core.backtest import (
bs_put_delta,
bs_put_price,
daily_picks,
estimate_credit_eth,
find_strike_for_delta,
monday_picks,
normal_cdf,
run_backtest,
simulate_entry_filters,
@@ -87,7 +87,7 @@ def test_estimate_credit_returns_positive_credit_in_normal_regime() -> None:
# ---------------------------------------------------------------------------
# Monday picks + entry filter simulation
# Daily picks + entry filter simulation
# ---------------------------------------------------------------------------
@@ -114,35 +114,35 @@ def _snap(
)
def test_monday_picks_extracts_one_per_iso_week() -> None:
monday_2026_05_04 = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
monday_2026_05_11 = datetime(2026, 5, 11, 14, 0, tzinfo=UTC)
def test_daily_picks_extracts_one_per_calendar_day() -> None:
day1 = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
day2 = datetime(2026, 5, 5, 14, 0, tzinfo=UTC) # Tuesday: PICKED ora (crypto 24/7)
snapshots = [
_snap(ts=monday_2026_05_04),
_snap(ts=monday_2026_05_04 + timedelta(minutes=15)), # not picked
_snap(ts=monday_2026_05_11),
_snap(ts=day1),
_snap(ts=day1 + timedelta(minutes=15)), # stesso giorno, deduplicato
_snap(ts=day2),
]
picks = monday_picks(snapshots)
picks = daily_picks(snapshots)
assert len(picks) == 2
assert picks[0].timestamp == monday_2026_05_04
assert picks[1].timestamp == monday_2026_05_11
assert picks[0].timestamp == day1
assert picks[1].timestamp == day2
def test_monday_picks_skips_other_days_and_hours() -> None:
def test_daily_picks_skips_other_hours() -> None:
snapshots = [
_snap(ts=datetime(2026, 5, 4, 13, 0, tzinfo=UTC)), # Monday 13:00
_snap(ts=datetime(2026, 5, 5, 14, 0, tzinfo=UTC)), # Tuesday 14:00
_snap(ts=datetime(2026, 5, 4, 13, 0, tzinfo=UTC)), # 13:00 → skipped
_snap(ts=datetime(2026, 5, 5, 15, 30, tzinfo=UTC)), # 15:30 → skipped
]
assert monday_picks(snapshots) == []
assert daily_picks(snapshots) == []
def test_monday_picks_filters_by_asset() -> None:
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
def test_daily_picks_filters_by_asset() -> None:
day = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snapshots = [
_snap(ts=monday, asset="BTC"),
_snap(ts=monday, asset="ETH"),
_snap(ts=day, asset="BTC"),
_snap(ts=day, asset="ETH"),
]
picks = monday_picks(snapshots, asset="ETH")
picks = daily_picks(snapshots, asset="ETH")
assert len(picks) == 1
assert picks[0].snapshot.asset == "ETH"
@@ -156,8 +156,8 @@ def test_simulate_entry_filters_accepts_clean_snapshot(
type("MP", (), {"timestamp": monday, "snapshot": snap})() # type: ignore[arg-type]
]
# Hack: build via real dataclass
from cerbero_bite.core.backtest import MondayPick
picks = [MondayPick(timestamp=monday, snapshot=snap)]
from cerbero_bite.core.backtest import DailyPick
picks = [DailyPick(timestamp=monday, snapshot=snap)]
results = simulate_entry_filters(picks, cfg, capital_usd=Decimal("1500"))
assert len(results) == 1
assert results[0].accepted is True
@@ -167,8 +167,8 @@ def test_simulate_entry_filters_rejects_dvol_out_of_band() -> None:
cfg = golden_config()
monday = datetime(2026, 5, 4, 14, 0, tzinfo=UTC)
snap = _snap(ts=monday, dvol=20, funding=0.10) # below 35
from cerbero_bite.core.backtest import MondayPick
picks = [MondayPick(timestamp=monday, snapshot=snap)]
from cerbero_bite.core.backtest import DailyPick
picks = [DailyPick(timestamp=monday, snapshot=snap)]
results = simulate_entry_filters(picks, cfg, capital_usd=Decimal("1500"))
assert results[0].accepted is False
assert any("dvol" in r.lower() for r in results[0].reasons)
@@ -182,8 +182,8 @@ def test_simulate_entry_filters_skips_incomplete_snapshot() -> None:
# dvol=None ⇒ skipped
fetch_ok=False,
)
from cerbero_bite.core.backtest import MondayPick
picks = [MondayPick(timestamp=monday, snapshot=incomplete)]
from cerbero_bite.core.backtest import DailyPick
picks = [DailyPick(timestamp=monday, snapshot=incomplete)]
results = simulate_entry_filters(picks, cfg, capital_usd=Decimal("1500"))
assert results[0].accepted is False
assert results[0].skipped_for_data is True
@@ -208,8 +208,8 @@ def _synthetic_year_of_snapshots(
base = monday + timedelta(weeks=week)
# Lunedì 14:00 è il pick
rows.append(_snap(ts=base, spot=spot, dvol=dvol, funding=funding))
# Tick intermedi che NON cadono di lunedì alle 14:00:
# offset +1h così vengono ignorati da `monday_picks`.
# Tick intermedi che NON cadono alle 14:00:
# offset +1h (=15:00) così vengono ignorati da `daily_picks`.
for d in (2, 8, 14, 19):
rows.append(
_snap(
+1 -1
View File
@@ -68,7 +68,7 @@ def test_compute_hash_is_independent_of_recorded_hash_value(tmp_path: Path) -> N
def test_load_repo_strategy_yaml(tmp_path: Path) -> None:
"""The committed strategy.yaml validates with the recorded hash."""
result = load_strategy(REPO_ROOT / "strategy.yaml")
assert result.config.config_version == "1.3.0"
assert result.config.config_version == "1.4.0"
assert result.config.sizing.kelly_fraction == Decimal("0.13")
assert result.computed_hash == result.config.config_hash
+185
View File
@@ -351,3 +351,188 @@ def test_bias_zero_division_safe(cfg: StrategyConfig) -> None:
# eth_30d_ago == 0 must not crash; treat as no-bias (neutral)
ctx = _trend(eth_now="3000", eth_30d_ago="0", funding_cross="0", dvol="60", adx="15")
assert compute_bias(ctx, cfg) is None
# ---------------------------------------------------------------------------
# IV-RV adaptive gate
# ---------------------------------------------------------------------------
def _adaptive_cfg(**entry_overrides: object) -> StrategyConfig:
"""Golden config con gate adattivo abilitato di default per test."""
base_entry: dict[str, object] = {
"iv_minus_rv_filter_enabled": True,
"iv_minus_rv_adaptive_enabled": True,
"iv_minus_rv_min": Decimal("0"),
"iv_minus_rv_percentile": Decimal("0.25"),
"iv_minus_rv_window_target_days": 60,
"iv_minus_rv_window_min_days": 30,
}
base_entry.update(entry_overrides)
return golden_config(entry=base_entry)
def test_adaptive_pass_when_iv_rv_above_p25() -> None:
cfg = _adaptive_cfg()
history = tuple(Decimal(i) for i in range(1, 201))
decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("80"),
iv_rv_history=history,
iv_rv_n_days=30,
),
cfg,
)
assert decision.accepted is True
assert not any("IV richness" in r for r in decision.reasons)
def test_adaptive_blocks_when_iv_rv_below_p25() -> None:
cfg = _adaptive_cfg()
history = tuple(Decimal(i) for i in range(1, 201))
decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("20"),
iv_rv_history=history,
iv_rv_n_days=30,
),
cfg,
)
assert decision.accepted is False
assert any("IV richness" in r and "rolling" in r for r in decision.reasons)
def test_adaptive_with_n_days_zero_passes_warmup() -> None:
"""Warmup hard: nessun giorno coperto → gate skip (fail-open)."""
cfg = _adaptive_cfg()
decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("0.1"),
iv_rv_history=(),
iv_rv_n_days=0,
),
cfg,
)
assert decision.accepted is True
def test_adaptive_with_floor_floor_binds_when_p25_low() -> None:
cfg = _adaptive_cfg(iv_minus_rv_min=Decimal("3"))
history = tuple(Decimal("0.5") for _ in range(200))
decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("1"),
iv_rv_history=history,
iv_rv_n_days=30,
),
cfg,
)
assert decision.accepted is False
assert any("IV richness" in r for r in decision.reasons)
def test_legacy_static_gate_still_works_when_adaptive_disabled() -> None:
cfg = golden_config(entry={
"iv_minus_rv_filter_enabled": True,
"iv_minus_rv_adaptive_enabled": False,
"iv_minus_rv_min": Decimal("3"),
})
decision = validate_entry(
_good_ctx(iv_minus_rv=Decimal("2"), iv_rv_history=(), iv_rv_n_days=0),
cfg,
)
assert decision.accepted is False
assert any("IV richness below floor" in r for r in decision.reasons)
def test_iv_minus_rv_none_skips_gate_in_both_modes() -> None:
cfg = _adaptive_cfg()
decision = validate_entry(
_good_ctx(
iv_minus_rv=None,
iv_rv_history=tuple(Decimal(i) for i in range(1, 201)),
iv_rv_n_days=30,
),
cfg,
)
assert decision.accepted is True
def test_adaptive_with_n_days_one_uses_history_for_percentile() -> None:
"""Singolo giorno disponibile (cadenza qualunque): gate attivo,
soglia = P25 della finestra ricevuta. Dimostra che il warmup hard
finisce a n_days=1 (non 30 come nella vecchia implementazione)."""
cfg = _adaptive_cfg()
history = tuple(Decimal(i) for i in range(1, 101)) # 1..100, P25 = 25.75
# IV-RV sopra P25 → pass
pass_decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("30"),
iv_rv_history=history,
iv_rv_n_days=1,
),
cfg,
)
assert pass_decision.accepted is True
# IV-RV sotto P25 → block
block_decision = validate_entry(
_good_ctx(
iv_minus_rv=Decimal("10"),
iv_rv_history=history,
iv_rv_n_days=1,
),
cfg,
)
assert block_decision.accepted is False
assert any("IV richness" in r and "rolling" in r for r in block_decision.reasons)
# ---------------------------------------------------------------------------
# Vol-of-Vol guard
# ---------------------------------------------------------------------------
def _vov_cfg(threshold: Decimal = Decimal("5")) -> StrategyConfig:
return golden_config(entry={
"vol_of_vol_guard_enabled": True,
"vol_of_vol_threshold_pt": threshold,
"vol_of_vol_lookback_hours": 24,
})
def test_vov_guard_blocks_on_large_dvol_shift() -> None:
cfg = _vov_cfg()
decision = validate_entry(
_good_ctx(dvol_now=Decimal("56"), dvol_24h_ago=Decimal("50")), cfg
)
assert decision.accepted is False
assert any("DVOL shifted" in r for r in decision.reasons)
def test_vov_guard_passes_on_small_dvol_shift() -> None:
cfg = _vov_cfg()
decision = validate_entry(
_good_ctx(dvol_now=Decimal("52"), dvol_24h_ago=Decimal("50")), cfg
)
assert decision.accepted is True
def test_vov_guard_passes_when_lookback_missing() -> None:
"""fail-open su gap dati: se dvol_24h_ago=None il guard non scatta."""
cfg = _vov_cfg()
decision = validate_entry(
_good_ctx(dvol_now=Decimal("99"), dvol_24h_ago=None), cfg
)
# dvol_now=99 sarebbe oltre dvol_max=90; testiamo solo l'effetto VoV
# consultando le reasons (dvol_now potrebbe avere altre reason ma non
# quella VoV).
assert not any("DVOL shifted" in r for r in decision.reasons)
def test_vov_guard_disabled_does_nothing() -> None:
cfg = golden_config(entry={"vol_of_vol_guard_enabled": False})
decision = validate_entry(
_good_ctx(dvol_now=Decimal("55"), dvol_24h_ago=Decimal("50")), cfg
)
# Nessuna reason VoV (il delta=5 sarebbe oltre soglia se attivo)
assert not any("DVOL shifted" in r for r in decision.reasons)
+45
View File
@@ -164,3 +164,48 @@ async def test_returns_zero_for_empty_assets(tmp_path: Path) -> None:
ctx = _ctx(tmp_path)
n = await collect_market_snapshot(ctx, assets=(), now=_now())
assert n == 0
def _read_dvol_history(ctx: MagicMock) -> list[dict]:
import sqlite3
conn = connect(ctx.db_path)
conn.row_factory = sqlite3.Row
try:
rows = conn.execute(
"SELECT * FROM dvol_history ORDER BY timestamp"
).fetchall()
finally:
conn.close()
return [dict(r) for r in rows]
@pytest.mark.asyncio
async def test_eth_snapshot_mirrors_into_dvol_history(tmp_path: Path) -> None:
ctx = _ctx(tmp_path)
await collect_market_snapshot(ctx, assets=("ETH", "BTC"), now=_now())
rows = _read_dvol_history(ctx)
assert len(rows) == 1
assert Decimal(str(rows[0]["dvol"])) == Decimal("55")
assert Decimal(str(rows[0]["eth_spot"])) == Decimal("3000")
@pytest.mark.asyncio
async def test_btc_only_snapshot_does_not_touch_dvol_history(
tmp_path: Path,
) -> None:
ctx = _ctx(tmp_path)
await collect_market_snapshot(ctx, assets=("BTC",), now=_now())
assert _read_dvol_history(ctx) == []
@pytest.mark.asyncio
async def test_eth_snapshot_skips_dvol_history_when_dvol_missing(
tmp_path: Path,
) -> None:
ctx = _ctx(tmp_path)
ctx.deribit.latest_dvol = AsyncMock(side_effect=RuntimeError("no dvol"))
await collect_market_snapshot(ctx, assets=("ETH",), now=_now())
# market_snapshots row still persisted, but dvol_history must stay empty
# because its schema enforces NOT NULL on dvol/eth_spot.
assert _read_dvol_history(ctx) == []
+395
View File
@@ -0,0 +1,395 @@
"""TDD per i nuovi helper repository del gate IV-RV adattivo.
Spec: distinct-days policy il caller (entry_cycle) interroga il
numero di giorni coperti separatamente dai valori della finestra,
così che cadenze miste (tick live 15min + backfill daily) restino
statisticamente coerenti.
Helpers:
* ``count_iv_rv_distinct_days(asset, max_days, as_of) -> int``
* ``iv_rv_values_for_window(asset, window_days, as_of) -> list[Decimal]``
* ``dvol_lookback`` (invariato, riusato dal Vol-of-Vol guard)
"""
from __future__ import annotations
import sqlite3
from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
from cerbero_bite.state.db import connect, run_migrations
from cerbero_bite.state.models import MarketSnapshotRecord
from cerbero_bite.state.repository import Repository
def _snap(
*,
ts: datetime,
asset: str = "ETH",
iv_minus_rv: Decimal | None = Decimal("2"),
fetch_ok: bool = True,
dvol: Decimal = Decimal("50"),
) -> MarketSnapshotRecord:
return MarketSnapshotRecord(
timestamp=ts,
asset=asset,
spot=Decimal("2000"),
dvol=dvol,
realized_vol_30d=Decimal("48"),
iv_minus_rv=iv_minus_rv,
funding_perp_annualized=Decimal("0"),
funding_cross_annualized=Decimal("0"),
dealer_net_gamma=Decimal("0"),
gamma_flip_level=None,
oi_delta_pct_4h=None,
liquidation_long_risk="low",
liquidation_short_risk="low",
macro_days_to_event=None,
fetch_ok=fetch_ok,
fetch_errors_json=None,
)
@pytest.fixture
def db_one_day(tmp_path) -> sqlite3.Connection:
"""SQLite temp con 96 tick ETH a 15min (1 giorno) e fetch_ok=1."""
conn = connect(str(tmp_path / "test.sqlite"))
run_migrations(conn)
repo = Repository()
base = datetime(2026, 5, 1, 0, 0, tzinfo=UTC)
for i in range(96):
repo.record_market_snapshot(
conn,
_snap(
ts=base + timedelta(minutes=15 * i),
iv_minus_rv=Decimal("2") + Decimal(i) / Decimal("100"),
dvol=Decimal("50") + Decimal(i) / Decimal("10"),
),
)
conn.commit()
return conn
@pytest.fixture
def db_three_days_mixed(tmp_path) -> sqlite3.Connection:
"""SQLite temp con 3 giorni ETH:
- day1 (2026-05-01): 96 tick @ 15min, valori 1..96
- day2 (2026-05-02): 1 record daily a 12:00, valore 100 (backfill style)
- day3 (2026-05-03): 4 tick orari, valori 200, 201, 202, 203
Più 1 giorno BTC isolato (per cross-asset isolation).
"""
conn = connect(str(tmp_path / "test.sqlite"))
run_migrations(conn)
repo = Repository()
day1 = datetime(2026, 5, 1, 0, 0, tzinfo=UTC)
for i in range(96):
repo.record_market_snapshot(
conn,
_snap(
ts=day1 + timedelta(minutes=15 * i),
iv_minus_rv=Decimal(i + 1),
),
)
repo.record_market_snapshot(
conn,
_snap(ts=datetime(2026, 5, 2, 12, 0, tzinfo=UTC), iv_minus_rv=Decimal("100")),
)
day3 = datetime(2026, 5, 3, 0, 0, tzinfo=UTC)
for i in range(4):
repo.record_market_snapshot(
conn,
_snap(
ts=day3 + timedelta(hours=i),
iv_minus_rv=Decimal(200 + i),
),
)
repo.record_market_snapshot(
conn,
_snap(
ts=datetime(2026, 4, 30, 0, 0, tzinfo=UTC),
asset="BTC",
iv_minus_rv=Decimal("999"),
),
)
conn.commit()
return conn
# ---------------------------------------------------------------------------
# count_iv_rv_distinct_days
# ---------------------------------------------------------------------------
def test_count_distinct_days_returns_one_for_single_day_history(db_one_day) -> None:
repo = Repository()
n = repo.count_iv_rv_distinct_days(
db_one_day,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert n == 1
def test_count_distinct_days_returns_zero_for_other_asset(db_one_day) -> None:
repo = Repository()
n = repo.count_iv_rv_distinct_days(
db_one_day,
asset="BTC",
max_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert n == 0
def test_count_distinct_days_counts_unique_calendar_days(
db_three_days_mixed,
) -> None:
repo = Repository()
n = repo.count_iv_rv_distinct_days(
db_three_days_mixed,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert n == 3
def test_count_distinct_days_excludes_other_assets(
db_three_days_mixed,
) -> None:
repo = Repository()
n_btc = repo.count_iv_rv_distinct_days(
db_three_days_mixed,
asset="BTC",
max_days=60,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert n_btc == 1
def test_count_distinct_days_respects_window_cutoff(
db_three_days_mixed,
) -> None:
"""max_days=1 da as_of=2026-05-04 → cutoff=2026-05-03 → solo day3."""
repo = Repository()
n = repo.count_iv_rv_distinct_days(
db_three_days_mixed,
asset="ETH",
max_days=1,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert n == 1
def test_count_distinct_days_excludes_null_iv_rv(tmp_path) -> None:
conn = connect(str(tmp_path / "test.sqlite"))
run_migrations(conn)
repo = Repository()
repo.record_market_snapshot(
conn,
_snap(ts=datetime(2026, 5, 1, 12, 0, tzinfo=UTC), iv_minus_rv=None),
)
conn.commit()
n = repo.count_iv_rv_distinct_days(
conn,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert n == 0
def test_count_distinct_days_excludes_fetch_failed(tmp_path) -> None:
conn = connect(str(tmp_path / "test.sqlite"))
run_migrations(conn)
repo = Repository()
repo.record_market_snapshot(
conn,
_snap(
ts=datetime(2026, 5, 1, 12, 0, tzinfo=UTC),
iv_minus_rv=Decimal("99"),
fetch_ok=False,
),
)
conn.commit()
n = repo.count_iv_rv_distinct_days(
conn,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert n == 0
def test_count_distinct_days_rejects_naive_as_of(db_one_day) -> None:
repo = Repository()
with pytest.raises(ValueError, match="timezone-aware"):
repo.count_iv_rv_distinct_days(
db_one_day,
asset="ETH",
max_days=60,
as_of=datetime(2026, 5, 2, 0, 0), # naive
)
def test_count_distinct_days_rejects_non_positive_max_days(db_one_day) -> None:
repo = Repository()
with pytest.raises(ValueError, match="max_days must be positive"):
repo.count_iv_rv_distinct_days(
db_one_day,
asset="ETH",
max_days=0,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
# ---------------------------------------------------------------------------
# iv_rv_values_for_window
# ---------------------------------------------------------------------------
def test_values_for_window_returns_ordered_asc(db_one_day) -> None:
repo = Repository()
values = repo.iv_rv_values_for_window(
db_one_day,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert len(values) == 96
assert values == sorted(values)
assert values[0] == Decimal("2.00")
def test_values_for_window_filters_other_asset(db_one_day) -> None:
repo = Repository()
values = repo.iv_rv_values_for_window(
db_one_day,
asset="BTC",
window_days=60,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
assert values == []
def test_values_for_window_skips_null(db_one_day) -> None:
repo = Repository()
repo.record_market_snapshot(
db_one_day,
_snap(ts=datetime(2026, 5, 2, 0, 0, tzinfo=UTC), iv_minus_rv=None),
)
db_one_day.commit()
values = repo.iv_rv_values_for_window(
db_one_day,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 3, 0, 0, tzinfo=UTC),
)
assert len(values) == 96
def test_values_for_window_skips_fetch_failed(db_one_day) -> None:
repo = Repository()
repo.record_market_snapshot(
db_one_day,
_snap(
ts=datetime(2026, 5, 3, 0, 0, tzinfo=UTC),
iv_minus_rv=Decimal("99"),
fetch_ok=False,
),
)
db_one_day.commit()
values = repo.iv_rv_values_for_window(
db_one_day,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert Decimal("99") not in values
def test_values_for_window_respects_window_cutoff(
db_three_days_mixed,
) -> None:
"""window_days=1 da as_of=2026-05-04 → solo day3 (4 valori 200..203)."""
repo = Repository()
values = repo.iv_rv_values_for_window(
db_three_days_mixed,
asset="ETH",
window_days=1,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert values == [Decimal(200 + i) for i in range(4)]
def test_values_for_window_full_window(db_three_days_mixed) -> None:
"""window_days=60: tutti i valori dei 3 giorni (96 + 1 + 4 = 101)."""
repo = Repository()
values = repo.iv_rv_values_for_window(
db_three_days_mixed,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 4, 0, 0, tzinfo=UTC),
)
assert len(values) == 101
def test_values_for_window_rejects_naive_as_of(db_one_day) -> None:
repo = Repository()
with pytest.raises(ValueError, match="timezone-aware"):
repo.iv_rv_values_for_window(
db_one_day,
asset="ETH",
window_days=60,
as_of=datetime(2026, 5, 2, 0, 0),
)
def test_values_for_window_rejects_non_positive_window(db_one_day) -> None:
repo = Repository()
with pytest.raises(ValueError, match="window_days must be positive"):
repo.iv_rv_values_for_window(
db_one_day,
asset="ETH",
window_days=0,
as_of=datetime(2026, 5, 2, 0, 0, tzinfo=UTC),
)
# ---------------------------------------------------------------------------
# dvol_lookback (regression — invariato dopo refactor)
# ---------------------------------------------------------------------------
def test_dvol_lookback_returns_closest_tick(db_one_day) -> None:
repo = Repository()
base = datetime(2026, 5, 1, 0, 0, tzinfo=UTC)
target = base + timedelta(hours=12)
out = repo.dvol_lookback(
db_one_day, asset="ETH", reference=target, tolerance_minutes=15
)
# i=48 → dvol = 50 + 4.8 = 54.8
assert out == Decimal("54.8")
def test_dvol_lookback_returns_none_when_gap(db_one_day) -> None:
repo = Repository()
target = datetime(2025, 1, 1, 0, 0, tzinfo=UTC)
out = repo.dvol_lookback(
db_one_day, asset="ETH", reference=target, tolerance_minutes=15
)
assert out is None
def test_dvol_lookback_rejects_naive_reference(db_one_day) -> None:
repo = Repository()
with pytest.raises(ValueError, match="timezone-aware"):
repo.dvol_lookback(
db_one_day,
asset="ETH",
reference=datetime(2026, 5, 1, 12, 0),
)
+2 -1
View File
@@ -144,7 +144,8 @@ def test_aggregate_cap_at_975_reduces_to_one(cfg: StrategyConfig) -> None:
def test_concurrent_positions_at_cap_returns_zero(cfg: StrategyConfig) -> None:
res = compute_contracts(_ctx(capital_usd="1500", other_open_positions=1), cfg)
# Default cap = 5 (entry daily). other_open_positions=5 ⇒ cap raggiunto.
res = compute_contracts(_ctx(capital_usd="1500", other_open_positions=5), cfg)
assert res.n_contracts == 0
assert res.reason_if_zero is not None
assert "position" in res.reason_if_zero.lower()