From d9454fc996e4eacd91491c0980674f0badc610d0 Mon Sep 17 00:00:00 2001 From: AdrianoDev Date: Thu, 30 Apr 2026 14:39:09 +0200 Subject: [PATCH] =?UTF-8?q?feat(state+runtime+gui):=20market=5Fsnapshots?= =?UTF-8?q?=20=E2=80=94=20calibrazione=20soglie=20da=20dati?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sistema dedicato di raccolta dati per scegliere le soglie dei filtri sui percentili reali invece di valori a istinto. Nuovi componenti: * state/migrations/0003_market_snapshots.sql — tabella + index, PK composta (timestamp, asset). Ogni colonna numerica è NULL-able per preservare la continuità della serie quando un singolo MCP fallisce. * state/models.py — MarketSnapshotRecord Pydantic. * state/repository.py — record_market_snapshot, list_market_snapshots, _row_to_market_snapshot. * runtime/market_snapshot_cycle.py — collettore best-effort che chiama spot/dvol/realized_vol/dealer_gamma/funding_perp/funding_cross/ liquidation_heatmap/macro per ogni asset; raccoglie gli errori in fetch_errors_json e segna fetch_ok=false ma persiste comunque la riga. * clients/deribit.py — generalizzati dealer_gamma_profile(currency), realized_vol(currency), spot_perp_price(asset). dealer_gamma_profile_eth resta come alias per la chiamata dell'entry cycle. * runtime/orchestrator.py — nuovo job APScheduler `market_snapshot` cron */15 con assets configurabili (default ETH+BTC); il consumer manual_actions ora dispatcha anche kind=run_cycle cycle=market_snapshot per la GUI. * gui/data_layer.py — load_market_snapshots, enqueue_run_cycle accetta market_snapshot; tipo MarketSnapshotRecord esposto. * gui/pages/6_📐_Calibrazione.py — selezione asset+finestra, conteggio fetch_ok, per ogni metrica: istogramma, soglia da strategy.yaml come vline rossa, percentili P5/P10/P25/P50/P75/P90/P95, % di tick che la soglia avrebbe filtrato. * gui/pages/1_📊_Status.py — bottone "📐 Forza snapshot" (4° del pannello Forza ciclo) per popolare la tabella senza aspettare il cron. 5 nuovi test sul collector (happy, fault tolerance, asset switch, macro fail, empty assets); test_orchestrator job set aggiornato. 368/368 tests pass; ruff clean; mypy strict src clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/cerbero_bite/clients/deribit.py | 69 +++- src/cerbero_bite/gui/data_layer.py | 29 +- src/cerbero_bite/gui/pages/1_📊_Status.py | 9 +- .../gui/pages/6_📐_Calibrazione.py | 309 ++++++++++++++++++ .../runtime/market_snapshot_cycle.py | 192 +++++++++++ src/cerbero_bite/runtime/orchestrator.py | 26 ++ .../migrations/0003_market_snapshots.sql | 38 +++ src/cerbero_bite/state/models.py | 30 ++ src/cerbero_bite/state/repository.py | 86 +++++ tests/integration/test_orchestrator.py | 9 +- tests/unit/test_market_snapshot_cycle.py | 166 ++++++++++ 11 files changed, 956 insertions(+), 7 deletions(-) create mode 100644 src/cerbero_bite/gui/pages/6_📐_Calibrazione.py create mode 100644 src/cerbero_bite/runtime/market_snapshot_cycle.py create mode 100644 src/cerbero_bite/state/migrations/0003_market_snapshots.sql create mode 100644 tests/unit/test_market_snapshot_cycle.py diff --git a/src/cerbero_bite/clients/deribit.py b/src/cerbero_bite/clients/deribit.py index ea614d7..88ac2a1 100644 --- a/src/cerbero_bite/clients/deribit.py +++ b/src/cerbero_bite/clients/deribit.py @@ -303,14 +303,15 @@ class DeribitClient: return Decimal(str(entry["close"])) return None - async def dealer_gamma_profile_eth( + async def dealer_gamma_profile( self, + currency: str, *, expiry_from: datetime | None = None, expiry_to: datetime | None = None, top_n_strikes: int = 50, ) -> DealerGammaSnapshot: - """Return the aggregated dealer net gamma snapshot for ETH options. + """Return the aggregated dealer net gamma snapshot for ``currency``. Long-gamma regime (``total_net_dealer_gamma > 0``) is associated with vol-suppressing dealer hedging — the entry filter §2.8 uses @@ -318,7 +319,7 @@ class DeribitClient: (vol-amplifying dealer flow). """ body: dict[str, Any] = { - "currency": "ETH", + "currency": currency.upper(), "top_n_strikes": top_n_strikes, } if expiry_from is not None: @@ -347,6 +348,68 @@ class DeribitClient: strikes_analyzed=int(raw.get("strikes_analyzed") or 0), ) + async def dealer_gamma_profile_eth( + self, + *, + expiry_from: datetime | None = None, + expiry_to: datetime | None = None, + top_n_strikes: int = 50, + ) -> DealerGammaSnapshot: + """Backwards-compatible alias of :py:meth:`dealer_gamma_profile`.""" + return await self.dealer_gamma_profile( + "ETH", + expiry_from=expiry_from, + expiry_to=expiry_to, + top_n_strikes=top_n_strikes, + ) + + async def realized_vol( + self, + currency: str, + *, + windows: tuple[int, ...] = (14, 30), + ) -> dict[str, Decimal | None]: + """Annualised realised vol for ``currency`` plus IV-RV spread. + + Returns ``{"rv_14d", "rv_30d", "iv_minus_rv_30d", "iv_current"}`` + (``None`` for any missing field). Pure read-only — no side + effects on the engine. + """ + raw = await self._http.call( + "get_realized_vol", + {"currency": currency.upper(), "windows": list(windows)}, + ) + if not isinstance(raw, dict): + return {} + rv = raw.get("realized_vol_pct") or {} + spread = raw.get("iv_minus_rv_pct") or {} + return { + "rv_14d": _to_decimal(rv.get("14d")), + "rv_30d": _to_decimal(rv.get("30d")), + "iv_current": _to_decimal(raw.get("iv_current_pct")), + "iv_minus_rv_30d": _to_decimal(spread.get("30d")), + "iv_minus_rv_14d": _to_decimal(spread.get("14d")), + } + + async def spot_perp_price(self, asset: str) -> Decimal: + """Mark price of ``-PERPETUAL`` (cheap proxy for spot).""" + instrument = f"{asset.upper()}-PERPETUAL" + raw = await self._http.call("get_ticker", {"instrument": instrument}) + if not isinstance(raw, dict): + raise McpDataAnomalyError( + f"get_ticker: unexpected shape for {instrument}", + service=self.SERVICE, + tool="get_ticker", + ) + mark = raw.get("mark_price") or raw.get("last_price") + if mark is None: + raise McpDataAnomalyError( + f"get_ticker: missing mark_price for {instrument}", + service=self.SERVICE, + tool="get_ticker", + ) + return Decimal(str(mark)) + async def adx_14( self, *, diff --git a/src/cerbero_bite/gui/data_layer.py b/src/cerbero_bite/gui/data_layer.py index d204c9f..b79de16 100644 --- a/src/cerbero_bite/gui/data_layer.py +++ b/src/cerbero_bite/gui/data_layer.py @@ -32,6 +32,7 @@ from cerbero_bite.state import Repository, connect, transaction from cerbero_bite.state.models import ( DecisionRecord, ManualAction, + MarketSnapshotRecord, PositionRecord, SystemStateRecord, ) @@ -61,6 +62,7 @@ __all__ = [ "load_closed_positions", "load_decisions_for_position", "load_engine_snapshot", + "load_market_snapshots", "load_open_positions", "load_pending_manual_actions", "load_position_by_id", @@ -634,9 +636,10 @@ def enqueue_run_cycle( method on the next minute tick. """ cycle_norm = cycle.strip().lower() - if cycle_norm not in {"entry", "monitor", "health"}: + if cycle_norm not in {"entry", "monitor", "health", "market_snapshot"}: raise ValueError( - f"cycle must be entry|monitor|health, got '{cycle}'" + f"cycle must be entry|monitor|health|market_snapshot, " + f"got '{cycle}'" ) return _enqueue_action( db_path=db_path, @@ -645,6 +648,28 @@ def enqueue_run_cycle( ) +def load_market_snapshots( + *, + asset: str, + db_path: Path | str = DEFAULT_DB_PATH, + start: datetime | None = None, + end: datetime | None = None, + limit: int = 5000, +) -> list[MarketSnapshotRecord]: + """Return market_snapshots rows for the asset, newest-first.""" + db_path = Path(db_path) + if not db_path.exists(): + return [] + repo = Repository() + conn = connect(db_path) + try: + return repo.list_market_snapshots( + conn, asset=asset, start=start, end=end, limit=limit + ) + finally: + conn.close() + + def load_pending_manual_actions( *, db_path: Path | str = DEFAULT_DB_PATH ) -> list[ManualAction]: diff --git a/src/cerbero_bite/gui/pages/1_📊_Status.py b/src/cerbero_bite/gui/pages/1_📊_Status.py index 3911d03..c241db5 100644 --- a/src/cerbero_bite/gui/pages/1_📊_Status.py +++ b/src/cerbero_bite/gui/pages/1_📊_Status.py @@ -47,7 +47,7 @@ def _render_force_cycle_panel(db_path: Path) -> None: "solo se il motore è in esecuzione (`cerbero-bite start`); il job " "`manual_actions` consuma la coda ogni minuto." ) - cols = st.columns(3) + cols = st.columns(4) if cols[0].button( "▶ Forza entry", use_container_width=True, @@ -72,6 +72,13 @@ def _render_force_cycle_panel(db_path: Path) -> None: ): aid = enqueue_run_cycle(cycle="health", db_path=db_path) st.success(f"✅ ciclo health accodato (id #{aid}).") + if cols[3].button( + "📐 Forza snapshot", + use_container_width=True, + help="Esegue subito una raccolta market_snapshot (alimenta Calibrazione).", + ): + aid = enqueue_run_cycle(cycle="market_snapshot", db_path=db_path) + st.success(f"✅ snapshot accodato (id #{aid}).") @st.cache_data(ttl=60, show_spinner=False) diff --git a/src/cerbero_bite/gui/pages/6_📐_Calibrazione.py b/src/cerbero_bite/gui/pages/6_📐_Calibrazione.py new file mode 100644 index 0000000..1af37f5 --- /dev/null +++ b/src/cerbero_bite/gui/pages/6_📐_Calibrazione.py @@ -0,0 +1,309 @@ +"""Calibrazione page — distribuzioni storiche dei segnali per tarare le soglie. + +Legge dalla tabella ``market_snapshots`` (popolata dal job dedicato cron +``*/15``). Per ogni metrica osservabile mostra: + +* istogramma + linea verticale della soglia attuale di config, +* percentili P5/P10/P25/P50/P75/P90/P95, +* percentuale di tick che la soglia attuale avrebbe filtrato. + +L'idea è scegliere le soglie sui percentili reali del proprio +ambiente (testnet o mainnet), invece di valori fissati a istinto. +""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from datetime import UTC, datetime, timedelta +from pathlib import Path + +import pandas as pd +import plotly.graph_objects as go +import streamlit as st + +from cerbero_bite.config.loader import load_strategy +from cerbero_bite.gui.data_layer import ( + DEFAULT_DB_PATH, + humanize_dt, + load_market_snapshots, +) +from cerbero_bite.state.models import MarketSnapshotRecord + + +def _resolve_db() -> Path: + return Path(os.environ.get("CERBERO_BITE_GUI_DB", DEFAULT_DB_PATH)) + + +@dataclass(frozen=True) +class MetricSpec: + """Descrittore della metrica da plottare.""" + + field: str + title: str + unit: str + threshold_label: str | None + threshold_value: float | None + threshold_direction: str # "below" o "above" (filtra se valore è X soglia) + + +def _metric_specs(strategy: object | None) -> list[MetricSpec]: + """Costruisce gli spec leggendo le soglie correnti da strategy.yaml.""" + funding_max: float | None = None + dealer_min: float | None = None + dvol_min: float | None = None + if strategy is not None: + try: + funding_max = float(strategy.entry.funding_max_abs_annualized) # type: ignore[attr-defined] + except Exception: + funding_max = None + try: + dealer_min = float(strategy.entry.dealer_gamma_min) # type: ignore[attr-defined] + except Exception: + dealer_min = None + try: + dvol_min = float(strategy.entry.dvol_min) # type: ignore[attr-defined] + except Exception: + dvol_min = None + + specs: list[MetricSpec] = [ + MetricSpec( + field="dvol", + title="DVOL", + unit="%", + threshold_label=( + f"DVOL min={dvol_min:.0f}" if dvol_min is not None else None + ), + threshold_value=dvol_min, + threshold_direction="below", + ), + MetricSpec( + field="realized_vol_30d", + title="Realized vol 30d", + unit="%", + threshold_label=None, + threshold_value=None, + threshold_direction="below", + ), + MetricSpec( + field="iv_minus_rv", + title="IV − RV (30d)", + unit="%", + threshold_label=None, + threshold_value=None, + threshold_direction="below", + ), + MetricSpec( + field="funding_perp_annualized", + title="Funding perp annualized", + unit="frazione", + threshold_label=( + f"|funding| max={funding_max:.2f}" + if funding_max is not None + else None + ), + threshold_value=funding_max, + threshold_direction="above_abs", + ), + MetricSpec( + field="funding_cross_annualized", + title="Funding cross median annualized", + unit="frazione", + threshold_label=None, + threshold_value=None, + threshold_direction="above_abs", + ), + MetricSpec( + field="dealer_net_gamma", + title="Dealer net gamma", + unit="USD", + threshold_label=( + f"min={dealer_min:.0f}" + if dealer_min is not None + else None + ), + threshold_value=dealer_min, + threshold_direction="below", + ), + MetricSpec( + field="oi_delta_pct_4h", + title="OI delta % (4h)", + unit="%", + threshold_label=None, + threshold_value=None, + threshold_direction="below", + ), + ] + return specs + + +def _series(records: list[MarketSnapshotRecord], field: str) -> pd.Series: + values: list[float] = [] + for r in records: + v = getattr(r, field, None) + if v is None: + continue + try: + values.append(float(v)) + except (TypeError, ValueError): + continue + return pd.Series(values, dtype="float64") + + +def _percent_blocked(s: pd.Series, spec: MetricSpec) -> float | None: + if spec.threshold_value is None or s.empty: + return None + if spec.threshold_direction == "below": + return float((s < spec.threshold_value).mean()) + if spec.threshold_direction == "above_abs": + return float((s.abs() > spec.threshold_value).mean()) + if spec.threshold_direction == "above": + return float((s > spec.threshold_value).mean()) + return None + + +def _percentiles_strip(s: pd.Series) -> None: + if s.empty: + st.caption("(nessun dato)") + return + quantiles = [0.05, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95] + cols = st.columns(len(quantiles)) + for col, q in zip(cols, quantiles, strict=False): + col.metric(f"P{int(q * 100)}", f"{s.quantile(q):.4g}") + + +def _render_metric(spec: MetricSpec, records: list[MarketSnapshotRecord]) -> None: + s = _series(records, spec.field) + if s.empty: + st.subheader(f"{spec.title}") + st.info( + f"Nessun valore disponibile per `{spec.field}`. " + "Avvia il job `market_snapshot` (engine attivo, cron */15) per " + "popolare la tabella." + ) + return + + st.subheader(f"{spec.title} ({spec.unit})") + + pct_blocked = _percent_blocked(s, spec) + cols = st.columns(4) + cols[0].metric("Tick raccolti", len(s)) + cols[1].metric("Min", f"{s.min():.4g}") + cols[2].metric("Max", f"{s.max():.4g}") + cols[3].metric( + "% bloccato dalla soglia", + f"{pct_blocked:.0%}" if pct_blocked is not None else "—", + help=( + "Frazione di tick che la soglia di config avrebbe filtrato" + f" se applicata a questa serie ({spec.threshold_direction})." + ), + ) + + fig = go.Figure() + fig.add_trace(go.Histogram(x=s, nbinsx=40, opacity=0.85, name="distrib.")) + if spec.threshold_value is not None: + fig.add_vline( + x=spec.threshold_value, + line_dash="dash", + line_color="red", + line_width=2, + annotation_text=spec.threshold_label or f"soglia {spec.threshold_value}", + annotation_position="top", + ) + if spec.threshold_direction == "above_abs": + # Disegna anche il bound negativo per i filtri simmetrici. + fig.add_vline( + x=-spec.threshold_value, + line_dash="dash", + line_color="red", + line_width=2, + annotation_text=None, + ) + fig.update_layout( + height=280, + margin={"l": 10, "r": 10, "t": 30, "b": 10}, + xaxis_title=spec.unit, + yaxis_title="numero tick", + ) + st.plotly_chart(fig, use_container_width=True) + + _percentiles_strip(s) + + +def render() -> None: + st.title("📐 Calibrazione") + st.caption( + "Distribuzioni storiche dei segnali raccolti dal job " + "`market_snapshot` (cron */15). Usa i percentili reali per " + "tarare le soglie in `strategy.yaml` invece di valori a istinto." + ) + + db_path = _resolve_db() + + col_a, col_b = st.columns(2) + asset = col_a.selectbox("Asset", options=["ETH", "BTC"], index=0) + window = col_b.selectbox( + "Finestra", + options=[ + "Tutto lo storico", + "Ultime 24h", + "Ultimi 7 giorni", + "Ultimi 30 giorni", + ], + index=0, + ) + + now = datetime.now(UTC) + start: datetime | None = None + if window == "Ultime 24h": + start = now - timedelta(hours=24) + elif window == "Ultimi 7 giorni": + start = now - timedelta(days=7) + elif window == "Ultimi 30 giorni": + start = now - timedelta(days=30) + + records = load_market_snapshots( + asset=asset, db_path=db_path, start=start, limit=5000 + ) + + if not records: + st.info( + "Nessun snapshot disponibile in questa finestra per " + f"`{asset}`. Avvia l'engine (`cerbero-bite start`) e attendi " + "almeno un tick del job `market_snapshot` (cron */15)." + ) + return + + st.caption( + f"{len(records)} snapshot · primo {humanize_dt(records[-1].timestamp)} " + f"· ultimo {humanize_dt(records[0].timestamp)}" + ) + + # Conteggio fetch_ok per qualità delle serie + n_ok = sum(1 for r in records if r.fetch_ok) + cols = st.columns(3) + cols[0].metric("Snapshot totali", len(records)) + cols[1].metric("fetch_ok = true", n_ok) + cols[2].metric( + "Tasso ok", + f"{n_ok / len(records):.0%}" if records else "—", + ) + st.divider() + + # Carica strategy.yaml per leggere le soglie correnti + try: + strategy = load_strategy(Path("strategy.yaml")) + except Exception as exc: + st.warning( + f"Impossibile leggere `strategy.yaml`: {type(exc).__name__}: {exc}" + ) + strategy = None + + specs = _metric_specs(strategy) + + for spec in specs: + _render_metric(spec, records) + st.divider() + + +render() diff --git a/src/cerbero_bite/runtime/market_snapshot_cycle.py b/src/cerbero_bite/runtime/market_snapshot_cycle.py new file mode 100644 index 0000000..244809f --- /dev/null +++ b/src/cerbero_bite/runtime/market_snapshot_cycle.py @@ -0,0 +1,192 @@ +"""Periodic market-snapshot collector. + +Drives the ``market_snapshots`` table populated by the scheduler job +``market_snapshot`` (cron */15 by default). For every traded asset the +collector calls the same MCP feeds the entry/monitor cycles consume, +but in **best-effort mode**: a single failure leaves the corresponding +column NULL and the row is still persisted, with an error map in +``fetch_errors_json`` for debugging. This keeps the time series +continuous even when one of the feeds is briefly down — the +distributions are what matters for threshold calibration, not the +real-time correctness of any single tick. +""" + +from __future__ import annotations + +import json +import logging +from collections.abc import Awaitable, Callable +from datetime import UTC, datetime +from decimal import Decimal +from typing import TYPE_CHECKING, Any + +from cerbero_bite.clients._exceptions import McpError +from cerbero_bite.state import connect, transaction +from cerbero_bite.state.models import MarketSnapshotRecord + +if TYPE_CHECKING: + from cerbero_bite.runtime.dependencies import RuntimeContext + +__all__ = ["DEFAULT_ASSETS", "collect_market_snapshot"] + + +_log = logging.getLogger("cerbero_bite.runtime.market_snapshot") + + +DEFAULT_ASSETS: tuple[str, ...] = ("ETH", "BTC") + + +async def _safe_call( + label: str, + factory: Callable[[], Awaitable[Any]], + errors: dict[str, str], +) -> Any: + try: + return await factory() + except (McpError, Exception) as exc: # pragma: no branch — best-effort + errors[label] = f"{type(exc).__name__}: {exc}" + return None + + +def _decimal_or_none(value: Any) -> Decimal | None: + if value is None: + return None + if isinstance(value, Decimal): + return value + try: + return Decimal(str(value)) + except (ValueError, ArithmeticError): + return None + + +async def _collect_one( + ctx: RuntimeContext, asset: str, *, when: datetime +) -> MarketSnapshotRecord: + errors: dict[str, str] = {} + asset_upper = asset.upper() + + spot = await _safe_call( + "spot", + lambda: ctx.deribit.spot_perp_price(asset_upper), + errors, + ) + dvol_value = await _safe_call( + "dvol", + lambda: ctx.deribit.latest_dvol(currency=asset_upper, now=when), + errors, + ) + rv = await _safe_call( + "realized_vol", + lambda: ctx.deribit.realized_vol(asset_upper), + errors, + ) + gamma = await _safe_call( + "dealer_gamma", + lambda: ctx.deribit.dealer_gamma_profile(asset_upper), + errors, + ) + funding_perp = await _safe_call( + "funding_perp", + lambda: ctx.hyperliquid.funding_rate_annualized(asset_upper), + errors, + ) + funding_cross = await _safe_call( + "funding_cross", + lambda: ctx.sentiment.funding_cross_median_annualized(asset_upper), + errors, + ) + heatmap = await _safe_call( + "liquidation", + lambda: ctx.sentiment.liquidation_heatmap(asset_upper), + errors, + ) + macro_days = await _safe_call( + "macro", + lambda: ctx.macro.next_high_severity_within( + days=ctx.cfg.structure.dte_target, + countries=list(ctx.cfg.entry.exclude_macro_countries), + now=when, + ), + errors, + ) + + rv_30 = (rv or {}).get("rv_30d") if isinstance(rv, dict) else None + iv_minus_rv_30 = ( + (rv or {}).get("iv_minus_rv_30d") if isinstance(rv, dict) else None + ) + + return MarketSnapshotRecord( + timestamp=when, + asset=asset_upper, + spot=_decimal_or_none(spot), + dvol=_decimal_or_none(dvol_value), + realized_vol_30d=_decimal_or_none(rv_30), + iv_minus_rv=_decimal_or_none(iv_minus_rv_30), + funding_perp_annualized=_decimal_or_none(funding_perp), + funding_cross_annualized=_decimal_or_none(funding_cross), + dealer_net_gamma=( + _decimal_or_none(gamma.total_net_dealer_gamma) + if gamma is not None + else None + ), + gamma_flip_level=( + _decimal_or_none(gamma.gamma_flip_level) + if gamma is not None + else None + ), + oi_delta_pct_4h=( + _decimal_or_none(heatmap.oi_delta_pct_4h) + if heatmap is not None + else None + ), + liquidation_long_risk=( + heatmap.long_squeeze_risk if heatmap is not None else None + ), + liquidation_short_risk=( + heatmap.short_squeeze_risk if heatmap is not None else None + ), + macro_days_to_event=( + int(macro_days) if isinstance(macro_days, int) else None + ), + fetch_ok=not errors, + fetch_errors_json=(json.dumps(errors) if errors else None), + ) + + +async def collect_market_snapshot( + ctx: RuntimeContext, + *, + assets: tuple[str, ...] = DEFAULT_ASSETS, + now: datetime | None = None, +) -> int: + """Collect + persist one snapshot per asset. Returns count persisted. + + The function is sync at heart (sequential per asset to keep MCP + load light) but kept ``async def`` so APScheduler can schedule it + directly. A single asset failing does not abort the loop — the + other assets are still snapshotted. + """ + when = (now or datetime.now(UTC)).astimezone(UTC) + persisted = 0 + + for asset in assets: + try: + record = await _collect_one(ctx, asset, when=when) + except Exception: # pragma: no cover — defensive + _log.exception("snapshot for %s failed catastrophically", asset) + continue + + try: + conn = connect(ctx.db_path) + try: + with transaction(conn): + ctx.repository.record_market_snapshot(conn, record) + finally: + conn.close() + persisted += 1 + except Exception: # pragma: no cover — defensive + _log.exception("persist snapshot for %s failed", asset) + + if persisted: + _log.info("market_snapshot persisted %d row(s)", persisted) + return persisted diff --git a/src/cerbero_bite/runtime/orchestrator.py b/src/cerbero_bite/runtime/orchestrator.py index 99df6d0..c606436 100644 --- a/src/cerbero_bite/runtime/orchestrator.py +++ b/src/cerbero_bite/runtime/orchestrator.py @@ -29,6 +29,10 @@ from cerbero_bite.runtime.entry_cycle import EntryCycleResult, run_entry_cycle from cerbero_bite.runtime.health_check import HealthCheck, HealthCheckResult from cerbero_bite.runtime.lockfile import EngineLock from cerbero_bite.runtime.manual_actions_consumer import consume_manual_actions +from cerbero_bite.runtime.market_snapshot_cycle import ( + DEFAULT_ASSETS, + collect_market_snapshot, +) from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle from cerbero_bite.runtime.recovery import recover_state from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler @@ -47,6 +51,7 @@ _CRON_MONITOR = "0 2,14 * * *" _CRON_HEALTH = "*/5 * * * *" _CRON_BACKUP = "0 * * * *" _CRON_MANUAL_ACTIONS = "*/1 * * * *" +_CRON_MARKET_SNAPSHOT = "*/15 * * * *" _BACKUP_RETENTION_DAYS = 30 @@ -194,6 +199,8 @@ class Orchestrator: health_cron: str = _CRON_HEALTH, backup_cron: str = _CRON_BACKUP, manual_actions_cron: str = _CRON_MANUAL_ACTIONS, + market_snapshot_cron: str = _CRON_MARKET_SNAPSHOT, + market_snapshot_assets: tuple[str, ...] = DEFAULT_ASSETS, backup_dir: Path | None = None, backup_retention_days: int = _BACKUP_RETENTION_DAYS, ) -> AsyncIOScheduler: @@ -232,6 +239,11 @@ class Orchestrator: await _safe("backup", _do) + async def _run_market_snapshot_via_action() -> None: + await collect_market_snapshot( + self._ctx, assets=market_snapshot_assets + ) + async def _manual_actions() -> None: async def _do() -> None: await consume_manual_actions( @@ -240,11 +252,20 @@ class Orchestrator: "entry": self.run_entry, "monitor": self.run_monitor, "health": self.run_health, + "market_snapshot": _run_market_snapshot_via_action, }, ) await _safe("manual_actions", _do) + async def _market_snapshot() -> None: + async def _do() -> None: + await collect_market_snapshot( + self._ctx, assets=market_snapshot_assets + ) + + await _safe("market_snapshot", _do) + self._scheduler = build_scheduler( [ JobSpec(name="entry", cron=entry_cron, coro_factory=_entry), @@ -256,6 +277,11 @@ class Orchestrator: cron=manual_actions_cron, coro_factory=_manual_actions, ), + JobSpec( + name="market_snapshot", + cron=market_snapshot_cron, + coro_factory=_market_snapshot, + ), ] ) return self._scheduler diff --git a/src/cerbero_bite/state/migrations/0003_market_snapshots.sql b/src/cerbero_bite/state/migrations/0003_market_snapshots.sql new file mode 100644 index 0000000..8db13a1 --- /dev/null +++ b/src/cerbero_bite/state/migrations/0003_market_snapshots.sql @@ -0,0 +1,38 @@ +-- 0003_market_snapshots.sql — periodic market snapshot table. +-- +-- Populated by the `market_snapshot` scheduler job (cron */15) for +-- every asset traded by the engine (ETH primary, BTC as benchmark). +-- The table backs the "Calibrazione" GUI page: histograms, percentiles +-- and "% of ticks the current threshold would have blocked" let the +-- operator pick filter thresholds from observed distributions instead +-- of guessing. +-- +-- Every column except (timestamp, asset, fetch_ok) is NULL-able: a +-- single MCP call may fail and we still want to keep the row so the +-- time series stays continuous. fetch_errors_json carries the per-feed +-- error messages for offline debugging. + +CREATE TABLE market_snapshots ( + timestamp TEXT NOT NULL, + asset TEXT NOT NULL, + spot NUMERIC, + dvol NUMERIC, + realized_vol_30d NUMERIC, + iv_minus_rv NUMERIC, + funding_perp_annualized NUMERIC, + funding_cross_annualized NUMERIC, + dealer_net_gamma NUMERIC, + gamma_flip_level NUMERIC, + oi_delta_pct_4h NUMERIC, + liquidation_long_risk TEXT, + liquidation_short_risk TEXT, + macro_days_to_event INTEGER, + fetch_ok INTEGER NOT NULL, + fetch_errors_json TEXT, + PRIMARY KEY (timestamp, asset) +); + +CREATE INDEX idx_market_snapshots_asset_ts + ON market_snapshots(asset, timestamp DESC); + +PRAGMA user_version = 3; diff --git a/src/cerbero_bite/state/models.py b/src/cerbero_bite/state/models.py index 1fd5fc4..f7cd965 100644 --- a/src/cerbero_bite/state/models.py +++ b/src/cerbero_bite/state/models.py @@ -21,6 +21,7 @@ __all__ = [ "DvolSnapshot", "InstructionRecord", "ManualAction", + "MarketSnapshotRecord", "PositionRecord", "PositionStatus", "SystemStateRecord", @@ -118,6 +119,35 @@ class DvolSnapshot(BaseModel): eth_spot: Decimal +class MarketSnapshotRecord(BaseModel): + """Row of the ``market_snapshots`` table. + + Single point in time, single asset. Every numeric field is + optional because the ``market_snapshot`` collector is best-effort: + a single MCP failure NULLs the affected metric without dropping + the row. + """ + + model_config = ConfigDict(extra="forbid") + + timestamp: datetime + asset: str # "ETH", "BTC" + spot: Decimal | None = None + dvol: Decimal | None = None + realized_vol_30d: Decimal | None = None + iv_minus_rv: Decimal | None = None + funding_perp_annualized: Decimal | None = None + funding_cross_annualized: Decimal | None = None + dealer_net_gamma: Decimal | None = None + gamma_flip_level: Decimal | None = None + oi_delta_pct_4h: Decimal | None = None + liquidation_long_risk: str | None = None + liquidation_short_risk: str | None = None + macro_days_to_event: int | None = None + fetch_ok: bool + fetch_errors_json: str | None = None + + class ManualAction(BaseModel): """Row of the ``manual_actions`` table.""" diff --git a/src/cerbero_bite/state/repository.py b/src/cerbero_bite/state/repository.py index 4c46e95..aae344d 100644 --- a/src/cerbero_bite/state/repository.py +++ b/src/cerbero_bite/state/repository.py @@ -23,6 +23,7 @@ from cerbero_bite.state.models import ( DvolSnapshot, InstructionRecord, ManualAction, + MarketSnapshotRecord, PositionRecord, PositionStatus, SystemStateRecord, @@ -346,6 +347,66 @@ class Repository: ), ) + # ------------------------------------------------------------------ + # market_snapshots + # ------------------------------------------------------------------ + + def record_market_snapshot( + self, conn: sqlite3.Connection, snapshot: MarketSnapshotRecord + ) -> None: + conn.execute( + "INSERT OR REPLACE INTO market_snapshots(" + "timestamp, asset, spot, dvol, realized_vol_30d, iv_minus_rv, " + "funding_perp_annualized, funding_cross_annualized, " + "dealer_net_gamma, gamma_flip_level, oi_delta_pct_4h, " + "liquidation_long_risk, liquidation_short_risk, " + "macro_days_to_event, fetch_ok, fetch_errors_json) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", + ( + _enc_dt(snapshot.timestamp), + snapshot.asset, + _enc_dec(snapshot.spot), + _enc_dec(snapshot.dvol), + _enc_dec(snapshot.realized_vol_30d), + _enc_dec(snapshot.iv_minus_rv), + _enc_dec(snapshot.funding_perp_annualized), + _enc_dec(snapshot.funding_cross_annualized), + _enc_dec(snapshot.dealer_net_gamma), + _enc_dec(snapshot.gamma_flip_level), + _enc_dec(snapshot.oi_delta_pct_4h), + snapshot.liquidation_long_risk, + snapshot.liquidation_short_risk, + snapshot.macro_days_to_event, + 1 if snapshot.fetch_ok else 0, + snapshot.fetch_errors_json, + ), + ) + + def list_market_snapshots( + self, + conn: sqlite3.Connection, + *, + asset: str, + start: datetime | None = None, + end: datetime | None = None, + limit: int = 5000, + ) -> list[MarketSnapshotRecord]: + clauses: list[str] = ["asset = ?"] + params: list[Any] = [asset] + if start is not None: + clauses.append("timestamp >= ?") + params.append(_enc_dt(start)) + if end is not None: + clauses.append("timestamp <= ?") + params.append(_enc_dt(end)) + params.append(int(limit)) + rows = conn.execute( + f"SELECT * FROM market_snapshots WHERE {' AND '.join(clauses)} " + f"ORDER BY timestamp DESC LIMIT ?", + params, + ).fetchall() + return [_row_to_market_snapshot(r) for r in rows] + # ------------------------------------------------------------------ # manual_actions # ------------------------------------------------------------------ @@ -559,6 +620,31 @@ def _row_to_manual(row: sqlite3.Row) -> ManualAction: ) +def _row_to_market_snapshot(row: sqlite3.Row) -> MarketSnapshotRecord: + return MarketSnapshotRecord( + timestamp=_dec_dt_required(row["timestamp"]), + asset=row["asset"], + spot=_dec_dec(row["spot"]), + dvol=_dec_dec(row["dvol"]), + realized_vol_30d=_dec_dec(row["realized_vol_30d"]), + iv_minus_rv=_dec_dec(row["iv_minus_rv"]), + funding_perp_annualized=_dec_dec(row["funding_perp_annualized"]), + funding_cross_annualized=_dec_dec(row["funding_cross_annualized"]), + dealer_net_gamma=_dec_dec(row["dealer_net_gamma"]), + gamma_flip_level=_dec_dec(row["gamma_flip_level"]), + oi_delta_pct_4h=_dec_dec(row["oi_delta_pct_4h"]), + liquidation_long_risk=row["liquidation_long_risk"], + liquidation_short_risk=row["liquidation_short_risk"], + macro_days_to_event=( + int(row["macro_days_to_event"]) + if row["macro_days_to_event"] is not None + else None + ), + fetch_ok=bool(int(row["fetch_ok"])), + fetch_errors_json=row["fetch_errors_json"], + ) + + def _dec_dec_required(value: Any) -> Decimal: out = _dec_dec(value) if out is None: diff --git a/tests/integration/test_orchestrator.py b/tests/integration/test_orchestrator.py index 5659fab..5f5cbc2 100644 --- a/tests/integration/test_orchestrator.py +++ b/tests/integration/test_orchestrator.py @@ -114,4 +114,11 @@ def test_install_scheduler_registers_canonical_jobs(tmp_path: Path) -> None: orch = _build_orch(tmp_path) sched = orch.install_scheduler() job_ids = {j.id for j in sched.get_jobs()} - assert job_ids == {"entry", "monitor", "health", "backup", "manual_actions"} + assert job_ids == { + "entry", + "monitor", + "health", + "backup", + "manual_actions", + "market_snapshot", + } diff --git a/tests/unit/test_market_snapshot_cycle.py b/tests/unit/test_market_snapshot_cycle.py new file mode 100644 index 0000000..12d6782 --- /dev/null +++ b/tests/unit/test_market_snapshot_cycle.py @@ -0,0 +1,166 @@ +"""Tests for runtime.market_snapshot_cycle (best-effort collector).""" + +from __future__ import annotations + +import json +from datetime import UTC, datetime +from decimal import Decimal +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from cerbero_bite.clients._exceptions import McpDataAnomalyError +from cerbero_bite.clients.deribit import DealerGammaSnapshot +from cerbero_bite.clients.sentiment import LiquidationHeatmap +from cerbero_bite.config import golden_config +from cerbero_bite.runtime.market_snapshot_cycle import collect_market_snapshot +from cerbero_bite.state import Repository, connect, run_migrations, transaction + + +def _now() -> datetime: + return datetime(2026, 4, 30, 12, 0, tzinfo=UTC) + + +def _ctx(tmp_path: Path) -> MagicMock: + db_path = tmp_path / "state.sqlite" + repo = Repository() + conn = connect(db_path) + run_migrations(conn) + with transaction(conn): + repo.init_system_state(conn, config_version="1.0.0", now=_now()) + conn.close() + + ctx = MagicMock() + ctx.db_path = db_path + ctx.repository = repo + ctx.cfg = golden_config() + + # Default: every feed succeeds with sane mock values. + ctx.deribit = MagicMock() + ctx.deribit.spot_perp_price = AsyncMock(return_value=Decimal("3000")) + ctx.deribit.latest_dvol = AsyncMock(return_value=Decimal("55")) + ctx.deribit.realized_vol = AsyncMock( + return_value={ + "rv_14d": Decimal("28"), + "rv_30d": Decimal("35"), + "iv_minus_rv_30d": Decimal("20"), + } + ) + ctx.deribit.dealer_gamma_profile = AsyncMock( + return_value=DealerGammaSnapshot( + spot_price=Decimal("3000"), + total_net_dealer_gamma=Decimal("-66000000"), + gamma_flip_level=Decimal("2900"), + strikes_analyzed=42, + ) + ) + + ctx.hyperliquid = MagicMock() + ctx.hyperliquid.funding_rate_annualized = AsyncMock( + return_value=Decimal("0.45") + ) + + ctx.sentiment = MagicMock() + ctx.sentiment.funding_cross_median_annualized = AsyncMock( + return_value=Decimal("0.30") + ) + ctx.sentiment.liquidation_heatmap = AsyncMock( + return_value=LiquidationHeatmap( + asset="ETH", + avg_funding_rate=Decimal("0.0003"), + oi_delta_pct_4h=Decimal("1.2"), + oi_delta_pct_24h=None, + long_squeeze_risk="low", + short_squeeze_risk="low", + ) + ) + + ctx.macro = MagicMock() + ctx.macro.next_high_severity_within = AsyncMock(return_value=3) + + return ctx + + +def _read_snapshots(ctx: MagicMock, asset: str) -> list[dict]: + import sqlite3 + + conn = connect(ctx.db_path) + conn.row_factory = sqlite3.Row + try: + rows = conn.execute( + "SELECT * FROM market_snapshots WHERE asset = ? ORDER BY timestamp", + (asset,), + ).fetchall() + finally: + conn.close() + return [dict(r) for r in rows] + + +@pytest.mark.asyncio +async def test_happy_path_persists_one_row_per_asset(tmp_path: Path) -> None: + ctx = _ctx(tmp_path) + n = await collect_market_snapshot(ctx, assets=("ETH", "BTC"), now=_now()) + assert n == 2 + eth_rows = _read_snapshots(ctx, "ETH") + btc_rows = _read_snapshots(ctx, "BTC") + assert len(eth_rows) == 1 + assert len(btc_rows) == 1 + eth = eth_rows[0] + assert eth["fetch_ok"] == 1 + assert eth["fetch_errors_json"] is None + assert Decimal(str(eth["spot"])) == Decimal("3000") + assert Decimal(str(eth["dealer_net_gamma"])) == Decimal("-66000000") + assert eth["macro_days_to_event"] == 3 + + +@pytest.mark.asyncio +async def test_failure_in_one_metric_keeps_row_with_error( + tmp_path: Path, +) -> None: + ctx = _ctx(tmp_path) + ctx.deribit.dealer_gamma_profile = AsyncMock( + side_effect=McpDataAnomalyError( + "boom", service="deribit", tool="get_dealer_gamma_profile" + ) + ) + n = await collect_market_snapshot(ctx, assets=("ETH",), now=_now()) + assert n == 1 + rows = _read_snapshots(ctx, "ETH") + assert len(rows) == 1 + assert rows[0]["fetch_ok"] == 0 + errors = json.loads(rows[0]["fetch_errors_json"]) + assert "dealer_gamma" in errors + assert rows[0]["dealer_net_gamma"] is None + # Other metrics still populated. + assert Decimal(str(rows[0]["spot"])) == Decimal("3000") + + +@pytest.mark.asyncio +async def test_btc_uses_btc_in_calls(tmp_path: Path) -> None: + ctx = _ctx(tmp_path) + await collect_market_snapshot(ctx, assets=("BTC",), now=_now()) + ctx.deribit.spot_perp_price.assert_awaited_with("BTC") + ctx.hyperliquid.funding_rate_annualized.assert_awaited_with("BTC") + ctx.sentiment.liquidation_heatmap.assert_awaited_with("BTC") + + +@pytest.mark.asyncio +async def test_macro_failure_only_nulls_macro(tmp_path: Path) -> None: + ctx = _ctx(tmp_path) + ctx.macro.next_high_severity_within = AsyncMock( + side_effect=RuntimeError("calendar down") + ) + await collect_market_snapshot(ctx, assets=("ETH",), now=_now()) + rows = _read_snapshots(ctx, "ETH") + assert rows[0]["macro_days_to_event"] is None + assert rows[0]["fetch_ok"] == 0 + errors = json.loads(rows[0]["fetch_errors_json"]) + assert "macro" in errors + + +@pytest.mark.asyncio +async def test_returns_zero_for_empty_assets(tmp_path: Path) -> None: + ctx = _ctx(tmp_path) + n = await collect_market_snapshot(ctx, assets=(), now=_now()) + assert n == 0