a2e7a78f8a
Popola dvol_history dentro la stessa transazione di market_snapshots, così lo storico è disponibile anche in modalità data-only (STRATEGY=false). Evita il warm-up vuoto di return_4h quando si abilita la strategia: il monitor_cycle trova subito i campioni locali invece di dipendere dal fallback Deribit get_historical. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
208 lines
6.6 KiB
Python
208 lines
6.6 KiB
Python
"""Periodic market-snapshot collector.
|
|
|
|
Drives the ``market_snapshots`` table populated by the scheduler job
|
|
``market_snapshot`` (cron */15 by default). For every traded asset the
|
|
collector calls the same MCP feeds the entry/monitor cycles consume,
|
|
but in **best-effort mode**: a single failure leaves the corresponding
|
|
column NULL and the row is still persisted, with an error map in
|
|
``fetch_errors_json`` for debugging. This keeps the time series
|
|
continuous even when one of the feeds is briefly down — the
|
|
distributions are what matters for threshold calibration, not the
|
|
real-time correctness of any single tick.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from collections.abc import Awaitable, Callable
|
|
from datetime import UTC, datetime
|
|
from decimal import Decimal
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from cerbero_bite.clients._exceptions import McpError
|
|
from cerbero_bite.state import connect, transaction
|
|
from cerbero_bite.state.models import DvolSnapshot, MarketSnapshotRecord
|
|
|
|
if TYPE_CHECKING:
|
|
from cerbero_bite.runtime.dependencies import RuntimeContext
|
|
|
|
__all__ = ["DEFAULT_ASSETS", "collect_market_snapshot"]
|
|
|
|
|
|
_log = logging.getLogger("cerbero_bite.runtime.market_snapshot")
|
|
|
|
|
|
DEFAULT_ASSETS: tuple[str, ...] = ("ETH", "BTC")
|
|
|
|
|
|
async def _safe_call(
|
|
label: str,
|
|
factory: Callable[[], Awaitable[Any]],
|
|
errors: dict[str, str],
|
|
) -> Any:
|
|
try:
|
|
return await factory()
|
|
except (McpError, Exception) as exc: # pragma: no branch — best-effort
|
|
errors[label] = f"{type(exc).__name__}: {exc}"
|
|
return None
|
|
|
|
|
|
def _decimal_or_none(value: Any) -> Decimal | None:
|
|
if value is None:
|
|
return None
|
|
if isinstance(value, Decimal):
|
|
return value
|
|
try:
|
|
return Decimal(str(value))
|
|
except (ValueError, ArithmeticError):
|
|
return None
|
|
|
|
|
|
async def _collect_one(
|
|
ctx: RuntimeContext, asset: str, *, when: datetime
|
|
) -> MarketSnapshotRecord:
|
|
errors: dict[str, str] = {}
|
|
asset_upper = asset.upper()
|
|
|
|
spot = await _safe_call(
|
|
"spot",
|
|
lambda: ctx.deribit.spot_perp_price(asset_upper),
|
|
errors,
|
|
)
|
|
dvol_value = await _safe_call(
|
|
"dvol",
|
|
lambda: ctx.deribit.latest_dvol(currency=asset_upper, now=when),
|
|
errors,
|
|
)
|
|
rv = await _safe_call(
|
|
"realized_vol",
|
|
lambda: ctx.deribit.realized_vol(asset_upper),
|
|
errors,
|
|
)
|
|
gamma = await _safe_call(
|
|
"dealer_gamma",
|
|
lambda: ctx.deribit.dealer_gamma_profile(asset_upper),
|
|
errors,
|
|
)
|
|
funding_perp = await _safe_call(
|
|
"funding_perp",
|
|
lambda: ctx.hyperliquid.funding_rate_annualized(asset_upper),
|
|
errors,
|
|
)
|
|
funding_cross = await _safe_call(
|
|
"funding_cross",
|
|
lambda: ctx.sentiment.funding_cross_median_annualized(asset_upper),
|
|
errors,
|
|
)
|
|
heatmap = await _safe_call(
|
|
"liquidation",
|
|
lambda: ctx.sentiment.liquidation_heatmap(asset_upper),
|
|
errors,
|
|
)
|
|
macro_days = await _safe_call(
|
|
"macro",
|
|
lambda: ctx.macro.next_high_severity_within(
|
|
days=ctx.cfg.structure.dte_target,
|
|
countries=list(ctx.cfg.entry.exclude_macro_countries),
|
|
now=when,
|
|
),
|
|
errors,
|
|
)
|
|
|
|
rv_30 = (rv or {}).get("rv_30d") if isinstance(rv, dict) else None
|
|
iv_minus_rv_30 = (
|
|
(rv or {}).get("iv_minus_rv_30d") if isinstance(rv, dict) else None
|
|
)
|
|
|
|
return MarketSnapshotRecord(
|
|
timestamp=when,
|
|
asset=asset_upper,
|
|
spot=_decimal_or_none(spot),
|
|
dvol=_decimal_or_none(dvol_value),
|
|
realized_vol_30d=_decimal_or_none(rv_30),
|
|
iv_minus_rv=_decimal_or_none(iv_minus_rv_30),
|
|
funding_perp_annualized=_decimal_or_none(funding_perp),
|
|
funding_cross_annualized=_decimal_or_none(funding_cross),
|
|
dealer_net_gamma=(
|
|
_decimal_or_none(gamma.total_net_dealer_gamma)
|
|
if gamma is not None
|
|
else None
|
|
),
|
|
gamma_flip_level=(
|
|
_decimal_or_none(gamma.gamma_flip_level)
|
|
if gamma is not None
|
|
else None
|
|
),
|
|
oi_delta_pct_4h=(
|
|
_decimal_or_none(heatmap.oi_delta_pct_4h)
|
|
if heatmap is not None
|
|
else None
|
|
),
|
|
liquidation_long_risk=(
|
|
heatmap.long_squeeze_risk if heatmap is not None else None
|
|
),
|
|
liquidation_short_risk=(
|
|
heatmap.short_squeeze_risk if heatmap is not None else None
|
|
),
|
|
macro_days_to_event=(
|
|
int(macro_days) if isinstance(macro_days, int) else None
|
|
),
|
|
fetch_ok=not errors,
|
|
fetch_errors_json=(json.dumps(errors) if errors else None),
|
|
)
|
|
|
|
|
|
async def collect_market_snapshot(
|
|
ctx: RuntimeContext,
|
|
*,
|
|
assets: tuple[str, ...] = DEFAULT_ASSETS,
|
|
now: datetime | None = None,
|
|
) -> int:
|
|
"""Collect + persist one snapshot per asset. Returns count persisted.
|
|
|
|
The function is sync at heart (sequential per asset to keep MCP
|
|
load light) but kept ``async def`` so APScheduler can schedule it
|
|
directly. A single asset failing does not abort the loop — the
|
|
other assets are still snapshotted.
|
|
"""
|
|
when = (now or datetime.now(UTC)).astimezone(UTC)
|
|
persisted = 0
|
|
|
|
for asset in assets:
|
|
try:
|
|
record = await _collect_one(ctx, asset, when=when)
|
|
except Exception: # pragma: no cover — defensive
|
|
_log.exception("snapshot for %s failed catastrophically", asset)
|
|
continue
|
|
|
|
try:
|
|
conn = connect(ctx.db_path)
|
|
try:
|
|
with transaction(conn):
|
|
ctx.repository.record_market_snapshot(conn, record)
|
|
# Mirror ETH spot+DVOL into dvol_history so monitor_cycle's
|
|
# return_4h lookup has local samples even in data-only mode.
|
|
if (
|
|
record.asset == "ETH"
|
|
and record.spot is not None
|
|
and record.dvol is not None
|
|
):
|
|
ctx.repository.record_dvol_snapshot(
|
|
conn,
|
|
DvolSnapshot(
|
|
timestamp=record.timestamp,
|
|
dvol=record.dvol,
|
|
eth_spot=record.spot,
|
|
),
|
|
)
|
|
finally:
|
|
conn.close()
|
|
persisted += 1
|
|
except Exception: # pragma: no cover — defensive
|
|
_log.exception("persist snapshot for %s failed", asset)
|
|
|
|
if persisted:
|
|
_log.info("market_snapshot persisted %d row(s)", persisted)
|
|
return persisted
|