Files
Cerbero-Bite/tests/integration/test_orchestrator.py
root c0a0ee416f feat(state+runtime): option_chain_snapshots — catena opzioni storica per backtest reale
Aggiunge la persistence della option chain Deribit con cron settimanale
``55 13 * * MON`` (5 minuti prima del trigger entry alle 14:00 UTC),
sbloccando il backtest non-stilizzato e la calibrazione empirica
dello skew premium.

**Schema (migrazione 0004)**

Nuova tabella ``option_chain_snapshots`` con primary key composta
``(timestamp, instrument_name)`` — tutti i quote prelevati nello
stesso tick condividono il timestamp, così le query "lo snapshot del
2026-05-04 alle 13:55" diventano una singola WHERE timestamp = X.
Indici su (asset, timestamp DESC) e (asset, expiry) per supportare
sia listing recenti sia query per scadenza specifica.

Campi: instrument_name, strike, expiry, option_type (C/P), bid, ask,
mid, iv, delta, gamma, theta, vega, open_interest, volume_24h,
book_depth_top3. Tutti i numerici sono nullable: il collector è
best-effort, un ticker mancante produce comunque una riga (utile
per sapere che lo strumento esisteva ma non era quotato).

**Modello + repository**

- ``OptionChainQuoteRecord`` (Pydantic, in ``state/models.py``).
- ``Repository.record_option_chain_snapshot`` (bulk insert
  idempotente).
- ``Repository.list_option_chain_snapshots`` (filtri su asset,
  timestamp window, expiry window, limit default 50000).
- ``Repository.latest_option_chain_timestamp`` (freshness check
  per dashboard GUI).

**Collector**

Nuovo ``runtime/option_chain_snapshot_cycle.py`` che:

1. Calcola la finestra scadenze ``[now+dte_min, now+dte_max]`` da
   ``cfg.structure``: niente richieste su scadenze che il rule
   engine non userebbe mai.
2. Chiama ``deribit.options_chain()`` con
   ``min_open_interest=cfg.liquidity.open_interest_min``.
3. Batch ``deribit.get_tickers()`` (max 20 per call, limite Deribit)
   con error-isolation per batch — un batch fallito non blocca
   gli altri.
4. NON chiama l'order book per ogni strike (rate-limit guard);
   ``book_depth_top3`` resta NULL e il liquidity gate live lo
   chiede on-the-fly per gli strike candidati al picker.

Best-effort end-to-end: chain assente, get_tickers giù, persist
fallito → ritorna 0 senza alzare eccezioni, logga sempre.

**Schedulazione**

Wired in ``Orchestrator.install_scheduler`` come job parallelo a
``market_snapshot``, attivo solo quando
``ENABLE_DATA_ANALYSIS=true``. Cron parametrizzabile tramite il nuovo
kwarg ``option_chain_cron`` (default ``55 13 * * MON``).

**Test**

- 4 unit test del collector (happy path, ticker mancante, chain
  vuota, fetch fail best-effort) con mock di RuntimeContext.
- Aggiornato ``test_install_scheduler_registers_canonical_jobs``
  per includere il nuovo job nel set canonico.

**Cosa sblocca**

- Backtest non-stilizzato: il PR ``feat/backtest-engine`` può
  abbandonare il modello BS+skew_premium e leggere prezzi reali
  ``mid`` dalla chain registrata.
- Calibrazione empirica dello skew premium (hardcoded a 1.5 nel
  backtest stilizzato): plot del rapporto fra quote reali Deribit
  e BS per delta/expiry, regressione → valore data-driven.
- Validazione ex-post: "il delta-0.12 era davvero a 25% OTM in
  quella settimana?" diventa una query SELECT.
- Dimensione attesa: ~50 strike × 3 scadenze × 1 snapshot/settimana
  × 17 colonne ≈ 12 KB/settimana, ~600 KB/anno. Trascurabile.

Suite: 409 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:44:49 +00:00

172 lines
5.1 KiB
Python

"""Integration tests for the Orchestrator façade (boot + cycle wiring)."""
from __future__ import annotations
from datetime import UTC, datetime
from decimal import Decimal
from pathlib import Path
import pytest
from pytest_httpx import HTTPXMock
from cerbero_bite.config import golden_config
from cerbero_bite.config.mcp_endpoints import load_endpoints
from cerbero_bite.config.runtime_flags import RuntimeFlags
from cerbero_bite.runtime import Orchestrator
from cerbero_bite.runtime.dependencies import build_runtime
# Not every test consumes every wired response; don't fail on leftover mocks.
pytestmark = pytest.mark.httpx_mock(assert_all_responses_were_requested=False)
def _now() -> datetime:
return datetime(2026, 4, 27, 14, 0, tzinfo=UTC)
def _wire_environment_info(
    httpx_mock: HTTPXMock,
    *,
    environment: str = "testnet",
) -> None:
    """Stub the Deribit MCP ``environment_info`` tool with a reusable response."""
    payload = {
        "exchange": "deribit",
        "environment": environment,
        "source": "env",
        "env_value": "true" if environment == "testnet" else "false",
        "base_url": "https://test.deribit.com/api/v2",
        "max_leverage": 3,
    }
    httpx_mock.add_response(
        url="http://mcp-deribit:9011/tools/environment_info",
        json=payload,
        is_reusable=True,
    )
def _wire_health_probes(httpx_mock: HTTPXMock) -> None:
    """Stub the macro/sentiment/hyperliquid health-probe endpoints."""
    probes = (
        ("http://mcp-macro:9013/tools/get_macro_calendar", {"events": []}),
        (
            "http://mcp-sentiment:9014/tools/get_cross_exchange_funding",
            {"snapshot": {}},
        ),
        (
            "http://mcp-hyperliquid:9012/tools/get_funding_rate",
            {"asset": "ETH", "current_funding_rate": 0.0001},
        ),
    )
    for url, payload in probes:
        httpx_mock.add_response(url=url, json=payload, is_reusable=True)
def _build_orch(
    tmp_path: Path,
    *,
    expected: str = "testnet",
    flags: RuntimeFlags | None = None,
) -> Orchestrator:
    """Assemble an Orchestrator over a throwaway runtime rooted at *tmp_path*."""
    if flags is None:
        # Default profile: everything on, matching the production-like setup.
        flags = RuntimeFlags(data_analysis_enabled=True, strategy_enabled=True)
    runtime_ctx = build_runtime(
        cfg=golden_config(),
        endpoints=load_endpoints(env={}),
        token="t",
        db_path=tmp_path / "state.sqlite",
        audit_path=tmp_path / "audit.log",
        retry_max=1,
        clock=_now,
    )
    return Orchestrator(
        runtime_ctx,
        expected_environment=expected,  # type: ignore[arg-type]
        eur_to_usd=Decimal("1.075"),
        flags=flags,
    )
@pytest.mark.asyncio
async def test_boot_succeeds_when_environment_matches(
    tmp_path: Path, httpx_mock: HTTPXMock
) -> None:
    """A matching environment boots healthy and leaves the kill switch disarmed."""
    _wire_environment_info(httpx_mock, environment="testnet")
    _wire_health_probes(httpx_mock)
    httpx_mock.add_response(
        url="http://mcp-deribit:9011/tools/get_positions",
        json=[],
        is_reusable=True,
    )
    orchestrator = _build_orch(tmp_path, expected="testnet")
    report = await orchestrator.boot()
    assert report.environment == "testnet"
    assert report.health.state == "ok"
    assert orchestrator.context.kill_switch.is_armed() is False
@pytest.mark.asyncio
async def test_boot_arms_kill_switch_on_environment_mismatch(
    tmp_path: Path, httpx_mock: HTTPXMock
) -> None:
    """Booting against the wrong exchange environment must arm the kill switch."""
    _wire_environment_info(httpx_mock, environment="mainnet")
    _wire_health_probes(httpx_mock)
    httpx_mock.add_response(
        url="http://mcp-deribit:9011/tools/get_positions",
        json=[],
        is_reusable=True,
    )
    orchestrator = _build_orch(tmp_path, expected="testnet")
    await orchestrator.boot()
    assert orchestrator.context.kill_switch.is_armed() is True
def test_install_scheduler_registers_canonical_jobs(tmp_path: Path) -> None:
    """install_scheduler wires exactly the canonical job set — nothing extra."""
    scheduler = _build_orch(tmp_path).install_scheduler()
    expected_ids = {
        "entry",
        "monitor",
        "health",
        "backup",
        "manual_actions",
        "market_snapshot",
        "option_chain_snapshot",
    }
    assert {job.id for job in scheduler.get_jobs()} == expected_ids
def test_install_scheduler_skips_strategy_jobs_when_disabled(tmp_path: Path) -> None:
    """With strategy off, entry/monitor vanish while analysis + infra jobs remain."""
    orch = _build_orch(
        tmp_path,
        flags=RuntimeFlags(data_analysis_enabled=True, strategy_enabled=False),
    )
    job_ids = {job.id for job in orch.install_scheduler().get_jobs()}
    for strategy_job in ("entry", "monitor"):
        assert strategy_job not in job_ids
    # data analysis stays on, plus the always-on infra jobs.
    assert job_ids >= {"health", "backup", "manual_actions", "market_snapshot"}
def test_install_scheduler_skips_market_snapshot_when_data_analysis_off(
    tmp_path: Path,
) -> None:
    """Disabling data analysis drops market_snapshot but keeps strategy + infra."""
    orch = _build_orch(
        tmp_path,
        flags=RuntimeFlags(data_analysis_enabled=False, strategy_enabled=True),
    )
    job_ids = {job.id for job in orch.install_scheduler().get_jobs()}
    assert "market_snapshot" not in job_ids
    assert job_ids.issuperset(
        {"entry", "monitor", "health", "backup", "manual_actions"}
    )
def test_install_scheduler_analysis_only_default(tmp_path: Path) -> None:
    """The default RuntimeFlags profile (analysis only) drops entry/monitor."""
    scheduler = _build_orch(tmp_path, flags=RuntimeFlags()).install_scheduler()
    job_ids = {job.id for job in scheduler.get_jobs()}
    assert job_ids.isdisjoint({"entry", "monitor"})
    assert "market_snapshot" in job_ids