From 19695e4730431546c191c09ce9cdd45adc78aa8d Mon Sep 17 00:00:00 2001 From: root Date: Tue, 12 May 2026 13:38:34 +0000 Subject: [PATCH] feat(state): dvol_history multi-asset (ETH+BTC) + backfill ETH legacy rows Migration 0006 promuove dvol_history da PK=(timestamp) mono-ETH a PK=(timestamp, asset), rinomina eth_spot -> spot, e backfilla con asset='ETH' le righe storiche. market_snapshot_cycle ora scrive sia per ETH che per BTC; monitor_cycle resta ETH-only via WHERE asset='ETH' nella lookup di return_4h. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/02-architecture.md | 2 +- docs/05-data-model.md | 19 +++++++--- .../runtime/market_snapshot_cycle.py | 15 ++++---- src/cerbero_bite/runtime/monitor_cycle.py | 6 ++-- .../0006_dvol_history_multi_asset.sql | 30 ++++++++++++++++ src/cerbero_bite/state/models.py | 3 +- src/cerbero_bite/state/repository.py | 7 ++-- tests/integration/test_monitor_cycle.py | 3 +- tests/unit/test_market_snapshot_cycle.py | 24 ++++++++----- tests/unit/test_state_repository.py | 35 +++++++++++++++++-- 10 files changed, 111 insertions(+), 33 deletions(-) create mode 100644 src/cerbero_bite/state/migrations/0006_dvol_history_multi_asset.sql diff --git a/docs/02-architecture.md b/docs/02-architecture.md index 878da16..d1bccc8 100644 --- a/docs/02-architecture.md +++ b/docs/02-architecture.md @@ -242,7 +242,7 @@ async def run_monitor_cycle(ctx: RuntimeContext, *, now): dvol = await deribit.latest_dvol(currency="ETH", now=now) return_4h = await _fetch_return_4h(ctx, now=now) # usa dvol_history o # fallback get_historical - repo.record_dvol_snapshot(DvolSnapshot(timestamp=now, dvol=dvol, eth_spot=spot)) + repo.record_dvol_snapshot(DvolSnapshot(timestamp=now, asset="ETH", dvol=dvol, spot=spot)) for record in repo.list_positions(status="open"): snapshot = await _build_position_snapshot(...) diff --git a/docs/05-data-model.md b/docs/05-data-model.md index b21e293..bcdb27c 100644 --- a/docs/05-data-model.md +++ b/docs/05-data-model.md @@ -137,16 +137,25 @@ CREATE INDEX idx_decisions_proposal ON decisions(proposal_id); ### `dvol_history` -Snapshot DVOL + ETH spot ad ogni evaluation. Utile per il calcolo di -`return_4h` durante il monitor (vedi `runtime/monitor_cycle.py -_fetch_return_4h`) e per analisi post-mortem. +Snapshot DVOL + spot per asset ad ogni evaluation. Utile per il +calcolo di `return_4h` durante il monitor (vedi +`runtime/monitor_cycle.py _fetch_return_4h`, che filtra `asset='ETH'`) +e per analisi post-mortem comparate fra ETH e BTC. Lo schema è +multi-asset dal migration `0006_dvol_history_multi_asset.sql`; le +righe storiche pre-migration sono state backfillate con +`asset='ETH'`. ```sql CREATE TABLE dvol_history ( - timestamp TEXT PRIMARY KEY, + timestamp TEXT NOT NULL, + asset TEXT NOT NULL, -- "ETH", "BTC" dvol NUMERIC NOT NULL, - eth_spot NUMERIC NOT NULL + spot NUMERIC NOT NULL, + PRIMARY KEY (timestamp, asset) ); + +CREATE INDEX idx_dvol_history_asset_ts + ON dvol_history(asset, timestamp DESC); ``` ### `manual_actions` diff --git a/src/cerbero_bite/runtime/market_snapshot_cycle.py b/src/cerbero_bite/runtime/market_snapshot_cycle.py index 854725e..54fe478 100644 --- a/src/cerbero_bite/runtime/market_snapshot_cycle.py +++ b/src/cerbero_bite/runtime/market_snapshot_cycle.py @@ -181,19 +181,18 @@ async def collect_market_snapshot( try: with transaction(conn): ctx.repository.record_market_snapshot(conn, record) - # Mirror ETH spot+DVOL into dvol_history so monitor_cycle's - # return_4h lookup has local samples even in data-only mode. - if ( - record.asset == "ETH" - and record.spot is not None - and record.dvol is not None - ): + # Mirror spot+DVOL into dvol_history (per asset) so + # monitor_cycle's return_4h lookup has local samples even + # in data-only mode. dvol_history enforces NOT NULL on + # dvol/spot so skip if either is missing. + if record.spot is not None and record.dvol is not None: ctx.repository.record_dvol_snapshot( conn, DvolSnapshot( timestamp=record.timestamp, + asset=record.asset, dvol=record.dvol, - eth_spot=record.spot, + spot=record.spot, ), ) finally: diff --git a/src/cerbero_bite/runtime/monitor_cycle.py b/src/cerbero_bite/runtime/monitor_cycle.py index c7b3875..d520b66 100644 --- a/src/cerbero_bite/runtime/monitor_cycle.py +++ b/src/cerbero_bite/runtime/monitor_cycle.py @@ -173,8 +173,8 @@ async def _fetch_return_4h(ctx: RuntimeContext, *, now: datetime) -> Decimal: conn = connect_state(ctx.db_path) try: row = conn.execute( - "SELECT timestamp, eth_spot FROM dvol_history " - "WHERE timestamp <= ? AND timestamp >= ? " + "SELECT timestamp, spot FROM dvol_history " + "WHERE asset = 'ETH' AND timestamp <= ? AND timestamp >= ? " "ORDER BY timestamp DESC LIMIT 1", (cutoff.isoformat(), floor.isoformat()), ).fetchone() @@ -239,7 +239,7 @@ async def run_monitor_cycle( with transaction(conn): ctx.repository.record_dvol_snapshot( conn, - DvolSnapshot(timestamp=when, dvol=dvol, eth_spot=spot), + DvolSnapshot(timestamp=when, asset="ETH", dvol=dvol, spot=spot), ) positions = ctx.repository.list_positions(conn, status="open") finally: diff --git a/src/cerbero_bite/state/migrations/0006_dvol_history_multi_asset.sql b/src/cerbero_bite/state/migrations/0006_dvol_history_multi_asset.sql new file mode 100644 index 0000000..5776af2 --- /dev/null +++ b/src/cerbero_bite/state/migrations/0006_dvol_history_multi_asset.sql @@ -0,0 +1,30 @@ +-- 0006_dvol_history_multi_asset.sql — promote dvol_history to multi-asset +-- +-- Original schema (0001_init.sql) treated dvol_history as ETH-only: +-- PRIMARY KEY (timestamp) and a column named eth_spot. With the +-- orchestrator now snapshotting BTC in addition to ETH (commit +-- e978a44), the table needs an asset dimension so we can store a +-- DVOL/spot sample per asset per tick. +-- +-- Forward-only. The 1028 existing rows are all ETH (the only writer +-- was the ETH branch of market_snapshot_cycle) so we backfill +-- asset='ETH' before swapping the table in place. + +CREATE TABLE dvol_history_v2 ( + timestamp TEXT NOT NULL, + asset TEXT NOT NULL, + dvol NUMERIC NOT NULL, + spot NUMERIC NOT NULL, + PRIMARY KEY (timestamp, asset) +); + +INSERT INTO dvol_history_v2(timestamp, asset, dvol, spot) +SELECT timestamp, 'ETH', dvol, eth_spot FROM dvol_history; + +DROP TABLE dvol_history; +ALTER TABLE dvol_history_v2 RENAME TO dvol_history; + +CREATE INDEX idx_dvol_history_asset_ts + ON dvol_history(asset, timestamp DESC); + +PRAGMA user_version = 6; diff --git a/src/cerbero_bite/state/models.py b/src/cerbero_bite/state/models.py index 15d4633..336e092 100644 --- a/src/cerbero_bite/state/models.py +++ b/src/cerbero_bite/state/models.py @@ -116,8 +116,9 @@ class DvolSnapshot(BaseModel): model_config = ConfigDict(extra="forbid") timestamp: datetime + asset: str # "ETH", "BTC" dvol: Decimal - eth_spot: Decimal + spot: Decimal class MarketSnapshotRecord(BaseModel): diff --git a/src/cerbero_bite/state/repository.py b/src/cerbero_bite/state/repository.py index 5f62c05..b3c3373 100644 --- a/src/cerbero_bite/state/repository.py +++ b/src/cerbero_bite/state/repository.py @@ -339,12 +339,13 @@ class Repository: self, conn: sqlite3.Connection, snapshot: DvolSnapshot ) -> None: conn.execute( - "INSERT OR REPLACE INTO dvol_history(timestamp, dvol, eth_spot) " - "VALUES (?,?,?)", + "INSERT OR REPLACE INTO dvol_history(timestamp, asset, dvol, spot) " + "VALUES (?,?,?,?)", ( _enc_dt(snapshot.timestamp), + snapshot.asset, _enc_dec(snapshot.dvol), - _enc_dec(snapshot.eth_spot), + _enc_dec(snapshot.spot), ), ) diff --git a/tests/integration/test_monitor_cycle.py b/tests/integration/test_monitor_cycle.py index f3b7981..00c446e 100644 --- a/tests/integration/test_monitor_cycle.py +++ b/tests/integration/test_monitor_cycle.py @@ -87,7 +87,8 @@ def _seed_dvol_history(ctx, *, when: datetime, spot: Decimal, dvol: Decimal): try: with transaction(conn): ctx.repository.record_dvol_snapshot( - conn, DvolSnapshot(timestamp=when, dvol=dvol, eth_spot=spot) + conn, + DvolSnapshot(timestamp=when, asset="ETH", dvol=dvol, spot=spot), ) finally: conn.close() diff --git a/tests/unit/test_market_snapshot_cycle.py b/tests/unit/test_market_snapshot_cycle.py index ec09cd5..8ba5164 100644 --- a/tests/unit/test_market_snapshot_cycle.py +++ b/tests/unit/test_market_snapshot_cycle.py @@ -38,7 +38,9 @@ def _ctx(tmp_path: Path) -> MagicMock: # Default: every feed succeeds with sane mock values. ctx.deribit = MagicMock() - ctx.deribit.spot_perp_price = AsyncMock(return_value=Decimal("3000")) + ctx.deribit.spot_perp_price = AsyncMock( + side_effect=lambda asset: Decimal("65000") if asset == "BTC" else Decimal("3000") + ) ctx.deribit.latest_dvol = AsyncMock(return_value=Decimal("55")) ctx.deribit.realized_vol = AsyncMock( return_value={ @@ -181,31 +183,35 @@ def _read_dvol_history(ctx: MagicMock) -> list[dict]: @pytest.mark.asyncio -async def test_eth_snapshot_mirrors_into_dvol_history(tmp_path: Path) -> None: +async def test_snapshot_mirrors_each_asset_into_dvol_history(tmp_path: Path) -> None: ctx = _ctx(tmp_path) await collect_market_snapshot(ctx, assets=("ETH", "BTC"), now=_now()) rows = _read_dvol_history(ctx) - assert len(rows) == 1 - assert Decimal(str(rows[0]["dvol"])) == Decimal("55") - assert Decimal(str(rows[0]["eth_spot"])) == Decimal("3000") + by_asset = {r["asset"]: r for r in rows} + assert set(by_asset) == {"ETH", "BTC"} + assert Decimal(str(by_asset["ETH"]["spot"])) == Decimal("3000") + assert Decimal(str(by_asset["BTC"]["spot"])) == Decimal("65000") @pytest.mark.asyncio -async def test_btc_only_snapshot_does_not_touch_dvol_history( +async def test_btc_only_snapshot_mirrors_into_dvol_history( tmp_path: Path, ) -> None: ctx = _ctx(tmp_path) await collect_market_snapshot(ctx, assets=("BTC",), now=_now()) - assert _read_dvol_history(ctx) == [] + rows = _read_dvol_history(ctx) + assert len(rows) == 1 + assert rows[0]["asset"] == "BTC" + assert Decimal(str(rows[0]["spot"])) == Decimal("65000") @pytest.mark.asyncio -async def test_eth_snapshot_skips_dvol_history_when_dvol_missing( +async def test_snapshot_skips_dvol_history_when_dvol_missing( tmp_path: Path, ) -> None: ctx = _ctx(tmp_path) ctx.deribit.latest_dvol = AsyncMock(side_effect=RuntimeError("no dvol")) await collect_market_snapshot(ctx, assets=("ETH",), now=_now()) # market_snapshots row still persisted, but dvol_history must stay empty - # because its schema enforces NOT NULL on dvol/eth_spot. + # because its schema enforces NOT NULL on dvol/spot. assert _read_dvol_history(ctx) == [] diff --git a/tests/unit/test_state_repository.py b/tests/unit/test_state_repository.py index 1264574..67b16b8 100644 --- a/tests/unit/test_state_repository.py +++ b/tests/unit/test_state_repository.py @@ -277,15 +277,46 @@ def test_record_dvol_snapshot_replaces_on_duplicate_timestamp( ts = datetime(2026, 4, 27, 14, 0, tzinfo=UTC) with transaction(conn): repo.record_dvol_snapshot( - conn, DvolSnapshot(timestamp=ts, dvol=Decimal("50"), eth_spot=Decimal("3000")) + conn, + DvolSnapshot( + timestamp=ts, asset="ETH", dvol=Decimal("50"), spot=Decimal("3000") + ), ) repo.record_dvol_snapshot( - conn, DvolSnapshot(timestamp=ts, dvol=Decimal("55"), eth_spot=Decimal("3050")) + conn, + DvolSnapshot( + timestamp=ts, asset="ETH", dvol=Decimal("55"), spot=Decimal("3050") + ), ) rows = conn.execute("SELECT COUNT(*) FROM dvol_history").fetchone() assert rows[0] == 1 +def test_record_dvol_snapshot_keeps_assets_distinct_on_same_timestamp( + conn: sqlite3.Connection, repo: Repository +) -> None: + ts = datetime(2026, 4, 27, 14, 0, tzinfo=UTC) + with transaction(conn): + repo.record_dvol_snapshot( + conn, + DvolSnapshot( + timestamp=ts, asset="ETH", dvol=Decimal("50"), spot=Decimal("3000") + ), + ) + repo.record_dvol_snapshot( + conn, + DvolSnapshot( + timestamp=ts, asset="BTC", dvol=Decimal("45"), spot=Decimal("65000") + ), + ) + rows = conn.execute( + "SELECT asset, dvol, spot FROM dvol_history ORDER BY asset" + ).fetchall() + assert [r["asset"] for r in rows] == ["BTC", "ETH"] + assert Decimal(str(rows[0]["spot"])) == Decimal("65000") + assert Decimal(str(rows[1]["spot"])) == Decimal("3000") + + def test_manual_action_enqueue_consume_cycle( conn: sqlite3.Connection, repo: Repository ) -> None: