a2e7a78f8a
Popola dvol_history dentro la stessa transazione di market_snapshots, così lo storico è disponibile anche in modalità data-only (STRATEGY=false). Evita il warm-up vuoto di return_4h quando si abilita la strategia: il monitor_cycle trova subito i campioni locali invece di dipendere dal fallback Deribit get_historical. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
212 lines
6.7 KiB
Python
212 lines
6.7 KiB
Python
"""Tests for runtime.market_snapshot_cycle (best-effort collector)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from datetime import UTC, datetime
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from cerbero_bite.clients._exceptions import McpDataAnomalyError
|
|
from cerbero_bite.clients.deribit import DealerGammaSnapshot
|
|
from cerbero_bite.clients.sentiment import LiquidationHeatmap
|
|
from cerbero_bite.config import golden_config
|
|
from cerbero_bite.runtime.market_snapshot_cycle import collect_market_snapshot
|
|
from cerbero_bite.state import Repository, connect, run_migrations, transaction
|
|
|
|
|
|
def _now() -> datetime:
|
|
return datetime(2026, 4, 30, 12, 0, tzinfo=UTC)
|
|
|
|
|
|
def _ctx(tmp_path: Path) -> MagicMock:
|
|
db_path = tmp_path / "state.sqlite"
|
|
repo = Repository()
|
|
conn = connect(db_path)
|
|
run_migrations(conn)
|
|
with transaction(conn):
|
|
repo.init_system_state(conn, config_version="1.0.0", now=_now())
|
|
conn.close()
|
|
|
|
ctx = MagicMock()
|
|
ctx.db_path = db_path
|
|
ctx.repository = repo
|
|
ctx.cfg = golden_config()
|
|
|
|
# Default: every feed succeeds with sane mock values.
|
|
ctx.deribit = MagicMock()
|
|
ctx.deribit.spot_perp_price = AsyncMock(return_value=Decimal("3000"))
|
|
ctx.deribit.latest_dvol = AsyncMock(return_value=Decimal("55"))
|
|
ctx.deribit.realized_vol = AsyncMock(
|
|
return_value={
|
|
"rv_14d": Decimal("28"),
|
|
"rv_30d": Decimal("35"),
|
|
"iv_minus_rv_30d": Decimal("20"),
|
|
}
|
|
)
|
|
ctx.deribit.dealer_gamma_profile = AsyncMock(
|
|
return_value=DealerGammaSnapshot(
|
|
spot_price=Decimal("3000"),
|
|
total_net_dealer_gamma=Decimal("-66000000"),
|
|
gamma_flip_level=Decimal("2900"),
|
|
strikes_analyzed=42,
|
|
)
|
|
)
|
|
|
|
ctx.hyperliquid = MagicMock()
|
|
ctx.hyperliquid.funding_rate_annualized = AsyncMock(
|
|
return_value=Decimal("0.45")
|
|
)
|
|
|
|
ctx.sentiment = MagicMock()
|
|
ctx.sentiment.funding_cross_median_annualized = AsyncMock(
|
|
return_value=Decimal("0.30")
|
|
)
|
|
ctx.sentiment.liquidation_heatmap = AsyncMock(
|
|
return_value=LiquidationHeatmap(
|
|
asset="ETH",
|
|
avg_funding_rate=Decimal("0.0003"),
|
|
oi_delta_pct_4h=Decimal("1.2"),
|
|
oi_delta_pct_24h=None,
|
|
long_squeeze_risk="low",
|
|
short_squeeze_risk="low",
|
|
)
|
|
)
|
|
|
|
ctx.macro = MagicMock()
|
|
ctx.macro.next_high_severity_within = AsyncMock(return_value=3)
|
|
|
|
return ctx
|
|
|
|
|
|
def _read_snapshots(ctx: MagicMock, asset: str) -> list[dict]:
|
|
import sqlite3
|
|
|
|
conn = connect(ctx.db_path)
|
|
conn.row_factory = sqlite3.Row
|
|
try:
|
|
rows = conn.execute(
|
|
"SELECT * FROM market_snapshots WHERE asset = ? ORDER BY timestamp",
|
|
(asset,),
|
|
).fetchall()
|
|
finally:
|
|
conn.close()
|
|
return [dict(r) for r in rows]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_happy_path_persists_one_row_per_asset(tmp_path: Path) -> None:
|
|
ctx = _ctx(tmp_path)
|
|
n = await collect_market_snapshot(ctx, assets=("ETH", "BTC"), now=_now())
|
|
assert n == 2
|
|
eth_rows = _read_snapshots(ctx, "ETH")
|
|
btc_rows = _read_snapshots(ctx, "BTC")
|
|
assert len(eth_rows) == 1
|
|
assert len(btc_rows) == 1
|
|
eth = eth_rows[0]
|
|
assert eth["fetch_ok"] == 1
|
|
assert eth["fetch_errors_json"] is None
|
|
assert Decimal(str(eth["spot"])) == Decimal("3000")
|
|
assert Decimal(str(eth["dealer_net_gamma"])) == Decimal("-66000000")
|
|
assert eth["macro_days_to_event"] == 3
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_failure_in_one_metric_keeps_row_with_error(
|
|
tmp_path: Path,
|
|
) -> None:
|
|
ctx = _ctx(tmp_path)
|
|
ctx.deribit.dealer_gamma_profile = AsyncMock(
|
|
side_effect=McpDataAnomalyError(
|
|
"boom", service="deribit", tool="get_dealer_gamma_profile"
|
|
)
|
|
)
|
|
n = await collect_market_snapshot(ctx, assets=("ETH",), now=_now())
|
|
assert n == 1
|
|
rows = _read_snapshots(ctx, "ETH")
|
|
assert len(rows) == 1
|
|
assert rows[0]["fetch_ok"] == 0
|
|
errors = json.loads(rows[0]["fetch_errors_json"])
|
|
assert "dealer_gamma" in errors
|
|
assert rows[0]["dealer_net_gamma"] is None
|
|
# Other metrics still populated.
|
|
assert Decimal(str(rows[0]["spot"])) == Decimal("3000")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_btc_uses_btc_in_calls(tmp_path: Path) -> None:
|
|
ctx = _ctx(tmp_path)
|
|
await collect_market_snapshot(ctx, assets=("BTC",), now=_now())
|
|
ctx.deribit.spot_perp_price.assert_awaited_with("BTC")
|
|
ctx.hyperliquid.funding_rate_annualized.assert_awaited_with("BTC")
|
|
ctx.sentiment.liquidation_heatmap.assert_awaited_with("BTC")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_macro_failure_only_nulls_macro(tmp_path: Path) -> None:
|
|
ctx = _ctx(tmp_path)
|
|
ctx.macro.next_high_severity_within = AsyncMock(
|
|
side_effect=RuntimeError("calendar down")
|
|
)
|
|
await collect_market_snapshot(ctx, assets=("ETH",), now=_now())
|
|
rows = _read_snapshots(ctx, "ETH")
|
|
assert rows[0]["macro_days_to_event"] is None
|
|
assert rows[0]["fetch_ok"] == 0
|
|
errors = json.loads(rows[0]["fetch_errors_json"])
|
|
assert "macro" in errors
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_returns_zero_for_empty_assets(tmp_path: Path) -> None:
|
|
ctx = _ctx(tmp_path)
|
|
n = await collect_market_snapshot(ctx, assets=(), now=_now())
|
|
assert n == 0
|
|
|
|
|
|
def _read_dvol_history(ctx: MagicMock) -> list[dict]:
|
|
import sqlite3
|
|
|
|
conn = connect(ctx.db_path)
|
|
conn.row_factory = sqlite3.Row
|
|
try:
|
|
rows = conn.execute(
|
|
"SELECT * FROM dvol_history ORDER BY timestamp"
|
|
).fetchall()
|
|
finally:
|
|
conn.close()
|
|
return [dict(r) for r in rows]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_eth_snapshot_mirrors_into_dvol_history(tmp_path: Path) -> None:
|
|
ctx = _ctx(tmp_path)
|
|
await collect_market_snapshot(ctx, assets=("ETH", "BTC"), now=_now())
|
|
rows = _read_dvol_history(ctx)
|
|
assert len(rows) == 1
|
|
assert Decimal(str(rows[0]["dvol"])) == Decimal("55")
|
|
assert Decimal(str(rows[0]["eth_spot"])) == Decimal("3000")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_btc_only_snapshot_does_not_touch_dvol_history(
|
|
tmp_path: Path,
|
|
) -> None:
|
|
ctx = _ctx(tmp_path)
|
|
await collect_market_snapshot(ctx, assets=("BTC",), now=_now())
|
|
assert _read_dvol_history(ctx) == []
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_eth_snapshot_skips_dvol_history_when_dvol_missing(
|
|
tmp_path: Path,
|
|
) -> None:
|
|
ctx = _ctx(tmp_path)
|
|
ctx.deribit.latest_dvol = AsyncMock(side_effect=RuntimeError("no dvol"))
|
|
await collect_market_snapshot(ctx, assets=("ETH",), now=_now())
|
|
# market_snapshots row still persisted, but dvol_history must stay empty
|
|
# because its schema enforces NOT NULL on dvol/eth_spot.
|
|
assert _read_dvol_history(ctx) == []
|