Files
Cerbero-Bite/tests/unit/test_market_snapshot_cycle.py
root a2e7a78f8a feat(data): mirror ETH spot+DVOL in dvol_history dal market_snapshot
Popola dvol_history dentro la stessa transazione di market_snapshots,
così lo storico è disponibile anche in modalità data-only (STRATEGY=false).
Evita il warm-up vuoto di return_4h quando si abilita la strategia: il
monitor_cycle trova subito i campioni locali invece di dipendere dal
fallback Deribit get_historical.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 21:59:27 +00:00

212 lines
6.7 KiB
Python

"""Tests for runtime.market_snapshot_cycle (best-effort collector)."""
from __future__ import annotations
import json
from datetime import UTC, datetime
from decimal import Decimal
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from cerbero_bite.clients._exceptions import McpDataAnomalyError
from cerbero_bite.clients.deribit import DealerGammaSnapshot
from cerbero_bite.clients.sentiment import LiquidationHeatmap
from cerbero_bite.config import golden_config
from cerbero_bite.runtime.market_snapshot_cycle import collect_market_snapshot
from cerbero_bite.state import Repository, connect, run_migrations, transaction
def _now() -> datetime:
return datetime(2026, 4, 30, 12, 0, tzinfo=UTC)
def _ctx(tmp_path: Path) -> MagicMock:
    """Build a mocked runtime context backed by a real, migrated SQLite file.

    The database is created under ``tmp_path``, migrated and seeded with the
    initial system state. Every feed client attached to the returned mock is
    an ``AsyncMock`` that succeeds with a fixed value, so a test only has to
    override the single feed it wants to break.

    Args:
        tmp_path: Directory in which ``state.sqlite`` is created.

    Returns:
        A ``MagicMock`` exposing ``db_path``, ``repository``, ``cfg`` and the
        ``deribit``, ``hyperliquid``, ``sentiment`` and ``macro`` clients.
    """
    db_path = tmp_path / "state.sqlite"
    repo = Repository()
    conn = connect(db_path)
    try:
        run_migrations(conn)
        with transaction(conn):
            repo.init_system_state(conn, config_version="1.0.0", now=_now())
    finally:
        # Close the handle even when migration or seeding raises, so a setup
        # failure does not leave the SQLite file open behind the failing test.
        conn.close()

    ctx = MagicMock()
    ctx.db_path = db_path
    ctx.repository = repo
    ctx.cfg = golden_config()

    # Default: every feed succeeds with sane mock values.
    ctx.deribit = MagicMock()
    ctx.deribit.spot_perp_price = AsyncMock(return_value=Decimal("3000"))
    ctx.deribit.latest_dvol = AsyncMock(return_value=Decimal("55"))
    ctx.deribit.realized_vol = AsyncMock(
        return_value={
            "rv_14d": Decimal("28"),
            "rv_30d": Decimal("35"),
            "iv_minus_rv_30d": Decimal("20"),
        }
    )
    ctx.deribit.dealer_gamma_profile = AsyncMock(
        return_value=DealerGammaSnapshot(
            spot_price=Decimal("3000"),
            total_net_dealer_gamma=Decimal("-66000000"),
            gamma_flip_level=Decimal("2900"),
            strikes_analyzed=42,
        )
    )

    ctx.hyperliquid = MagicMock()
    ctx.hyperliquid.funding_rate_annualized = AsyncMock(
        return_value=Decimal("0.45")
    )

    ctx.sentiment = MagicMock()
    ctx.sentiment.funding_cross_median_annualized = AsyncMock(
        return_value=Decimal("0.30")
    )
    ctx.sentiment.liquidation_heatmap = AsyncMock(
        return_value=LiquidationHeatmap(
            asset="ETH",
            avg_funding_rate=Decimal("0.0003"),
            oi_delta_pct_4h=Decimal("1.2"),
            oi_delta_pct_24h=None,
            long_squeeze_risk="low",
            short_squeeze_risk="low",
        )
    )

    ctx.macro = MagicMock()
    ctx.macro.next_high_severity_within = AsyncMock(return_value=3)
    return ctx
def _read_snapshots(ctx: MagicMock, asset: str) -> list[dict]:
    """Return the ``market_snapshots`` rows for ``asset`` as plain dicts.

    Rows are read from the database at ``ctx.db_path`` and ordered by
    ``timestamp``; the connection is always closed before returning.
    """
    import sqlite3

    query = "SELECT * FROM market_snapshots WHERE asset = ? ORDER BY timestamp"
    conn = connect(ctx.db_path)
    conn.row_factory = sqlite3.Row
    try:
        return [dict(row) for row in conn.execute(query, (asset,)).fetchall()]
    finally:
        conn.close()
@pytest.mark.asyncio
async def test_happy_path_persists_one_row_per_asset(tmp_path: Path) -> None:
    """With all default feeds, each requested asset gets exactly one clean row."""
    ctx = _ctx(tmp_path)
    requested = ("ETH", "BTC")

    written = await collect_market_snapshot(ctx, assets=requested, now=_now())
    assert written == 2

    rows_by_asset = {asset: _read_snapshots(ctx, asset) for asset in requested}
    assert len(rows_by_asset["ETH"]) == 1
    assert len(rows_by_asset["BTC"]) == 1

    eth_row = rows_by_asset["ETH"][0]
    assert eth_row["fetch_ok"] == 1
    assert eth_row["fetch_errors_json"] is None
    assert Decimal(str(eth_row["spot"])) == Decimal("3000")
    assert Decimal(str(eth_row["dealer_net_gamma"])) == Decimal("-66000000")
    assert eth_row["macro_days_to_event"] == 3
@pytest.mark.asyncio
async def test_failure_in_one_metric_keeps_row_with_error(
    tmp_path: Path,
) -> None:
    """A failing dealer-gamma feed still yields a row, flagged with the error."""
    ctx = _ctx(tmp_path)
    gamma_failure = McpDataAnomalyError(
        "boom", service="deribit", tool="get_dealer_gamma_profile"
    )
    ctx.deribit.dealer_gamma_profile = AsyncMock(side_effect=gamma_failure)

    written = await collect_market_snapshot(ctx, assets=("ETH",), now=_now())
    assert written == 1

    stored = _read_snapshots(ctx, "ETH")
    assert len(stored) == 1
    row = stored[0]
    assert row["fetch_ok"] == 0
    recorded_errors = json.loads(row["fetch_errors_json"])
    assert "dealer_gamma" in recorded_errors
    assert row["dealer_net_gamma"] is None
    # The unaffected metrics must still be filled in.
    assert Decimal(str(row["spot"])) == Decimal("3000")
@pytest.mark.asyncio
async def test_btc_uses_btc_in_calls(tmp_path: Path) -> None:
    """Collecting BTC awaits the per-asset feeds with ``"BTC"`` as argument."""
    ctx = _ctx(tmp_path)
    await collect_market_snapshot(ctx, assets=("BTC",), now=_now())

    per_asset_feeds = (
        ctx.deribit.spot_perp_price,
        ctx.hyperliquid.funding_rate_annualized,
        ctx.sentiment.liquidation_heatmap,
    )
    for feed in per_asset_feeds:
        feed.assert_awaited_with("BTC")
@pytest.mark.asyncio
async def test_macro_failure_only_nulls_macro(tmp_path: Path) -> None:
    """A macro-calendar failure nulls the macro column and is recorded as an error."""
    ctx = _ctx(tmp_path)
    calendar_outage = RuntimeError("calendar down")
    ctx.macro.next_high_severity_within = AsyncMock(side_effect=calendar_outage)

    await collect_market_snapshot(ctx, assets=("ETH",), now=_now())

    row = _read_snapshots(ctx, "ETH")[0]
    assert row["macro_days_to_event"] is None
    assert row["fetch_ok"] == 0
    assert "macro" in json.loads(row["fetch_errors_json"])
@pytest.mark.asyncio
async def test_returns_zero_for_empty_assets(tmp_path: Path) -> None:
    """An empty asset tuple makes the collector return 0."""
    ctx = _ctx(tmp_path)
    assert await collect_market_snapshot(ctx, assets=(), now=_now()) == 0
def _read_dvol_history(ctx: MagicMock) -> list[dict]:
    """Return every ``dvol_history`` row as a plain dict, ordered by timestamp.

    Rows are read from the database at ``ctx.db_path``; the connection is
    closed before the rows are converted and returned.
    """
    import sqlite3

    conn = connect(ctx.db_path)
    conn.row_factory = sqlite3.Row
    try:
        cursor = conn.execute("SELECT * FROM dvol_history ORDER BY timestamp")
        fetched = cursor.fetchall()
    finally:
        conn.close()
    return list(map(dict, fetched))
@pytest.mark.asyncio
async def test_eth_snapshot_mirrors_into_dvol_history(tmp_path: Path) -> None:
    """An ETH+BTC collection leaves one ``dvol_history`` row with the mocked DVOL and spot."""
    ctx = _ctx(tmp_path)
    await collect_market_snapshot(ctx, assets=("ETH", "BTC"), now=_now())

    history = _read_dvol_history(ctx)
    assert len(history) == 1
    mirrored = history[0]
    assert Decimal(str(mirrored["dvol"])) == Decimal("55")
    assert Decimal(str(mirrored["eth_spot"])) == Decimal("3000")
@pytest.mark.asyncio
async def test_btc_only_snapshot_does_not_touch_dvol_history(
    tmp_path: Path,
) -> None:
    """Collecting only BTC leaves ``dvol_history`` empty."""
    ctx = _ctx(tmp_path)
    await collect_market_snapshot(ctx, assets=("BTC",), now=_now())

    history = _read_dvol_history(ctx)
    assert history == []
@pytest.mark.asyncio
async def test_eth_snapshot_skips_dvol_history_when_dvol_missing(
    tmp_path: Path,
) -> None:
    """When the DVOL feed raises, no ``dvol_history`` row is written for ETH."""
    ctx = _ctx(tmp_path)
    dvol_outage = RuntimeError("no dvol")
    ctx.deribit.latest_dvol = AsyncMock(side_effect=dvol_outage)

    await collect_market_snapshot(ctx, assets=("ETH",), now=_now())

    # market_snapshots row still persisted, but dvol_history must stay empty
    # because its schema enforces NOT NULL on dvol/eth_spot.
    history = _read_dvol_history(ctx)
    assert history == []