"""OHLCV loader backed by Cerbero MCP (replaces ccxt in Phase 1). Cerbero is the single source of truth for market data; Phase 1 uses only OHLCV. Default exchange = deribit (BTC-PERPETUAL / ETH-PERPETUAL). Cerbero get_historical request schemas (from OpenAPI at ``https://cerbero-mcp.tielogic.xyz/openapi.json``): - ``mcp-deribit/tools/get_historical``: required ``instrument``, ``start_date``, ``end_date``; optional ``resolution`` (default ``1h``). - ``mcp-bybit/tools/get_historical``: ``symbol`` + ``interval`` (minutes) + range params (``start``/``end`` ms epoch). - ``mcp-hyperliquid/tools/get_historical``: ``instrument`` (or ``asset``), ``start_date``, ``end_date``, optional ``resolution``. Response shape is not statically guaranteed by the OpenAPI schema (200 returns an opaque object). The parser is therefore defensive: it tries the three plausible shapes (object-of-records under ``candles``/``data``/ ``result``/``ohlcv``/``klines``/``bars``, array-of-arrays ccxt-style, or a raw list at the top level) and raises a clear error if none matches. Cerbero/Deribit applicano un cap soft di ~5000 candele per call: il loader pagina internamente in chunk da 4500 barre, concatena e dedupe. """ from __future__ import annotations import hashlib from dataclasses import dataclass from datetime import datetime, timedelta from pathlib import Path from typing import Any, ClassVar import pandas as pd # type: ignore[import-untyped] from ..cerbero.client import CerberoClient @dataclass(frozen=True) class OHLCVRequest: symbol: str # e.g. "BTC-PERPETUAL" for deribit, "BTCUSDT" for bybit timeframe: str # e.g. "1h", "5m" start: datetime end: datetime exchange: str = "deribit" def cache_key(self) -> str: s = ( f"{self.exchange}|{self.symbol}|{self.timeframe}|" f"{self.start.isoformat()}|{self.end.isoformat()}" ) return hashlib.sha1(s.encode()).hexdigest()[:16] class CerberoOHLCVLoader: """Carica OHLCV chiamando Cerbero MCP get_historical, cachea in parquet.""" _COLUMNS: ClassVar[list[str]] = ["open", "high", "low", "close", "volume"] def __init__(self, client: CerberoClient, cache_dir: Path): self.client = client self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(parents=True, exist_ok=True) def load(self, req: OHLCVRequest) -> pd.DataFrame: cache_file = self.cache_dir / f"{req.cache_key()}.parquet" if cache_file.exists(): df = pd.read_parquet(cache_file) assert isinstance(df, pd.DataFrame) return df df = self._fetch(req) df.to_parquet(cache_file) return df # Cerbero/Deribit hanno un cap soft di ~5000 candele per call. # Paginiamo in chunk piu' piccoli per intervalli lunghi. _CHUNK_BARS: ClassVar[int] = 4500 def _fetch(self, req: OHLCVRequest) -> pd.DataFrame: bar_seconds = _timeframe_to_minutes(req.timeframe) * 60 chunk_seconds = self._CHUNK_BARS * bar_seconds chunks: list[pd.DataFrame] = [] cursor = req.start while cursor < req.end: chunk_end = min(req.end, cursor + timedelta(seconds=chunk_seconds)) chunk_req = OHLCVRequest( symbol=req.symbol, timeframe=req.timeframe, start=cursor, end=chunk_end, exchange=req.exchange, ) args = self._build_args(chunk_req) response = self.client.call_tool(req.exchange, "get_historical", args) chunk = self._parse_response(response) if not chunk.empty: chunks.append(chunk) last_ts = chunk.index[-1].to_pydatetime() # avanza di un bar oltre l'ultimo per evitare overlap cursor = max(last_ts + timedelta(seconds=bar_seconds), chunk_end) else: cursor = chunk_end if not chunks: return pd.DataFrame(columns=self._COLUMNS).set_index( pd.DatetimeIndex([], tz="UTC", name="ts") ) df = pd.concat(chunks) df = df[~df.index.duplicated(keep="first")].sort_index() return df def _build_args(self, req: OHLCVRequest) -> dict[str, Any]: if req.exchange == "deribit": return { "instrument": req.symbol, "start_date": req.start.isoformat(), "end_date": req.end.isoformat(), "resolution": req.timeframe, } if req.exchange == "bybit": return { "symbol": req.symbol, "interval": _timeframe_to_minutes(req.timeframe), "start": int(req.start.timestamp() * 1000), "end": int(req.end.timestamp() * 1000), } if req.exchange == "hyperliquid": return { "instrument": req.symbol, "start_date": req.start.isoformat(), "end_date": req.end.isoformat(), "resolution": req.timeframe, } raise ValueError(f"unsupported exchange: {req.exchange}") def _parse_response(self, response: Any) -> pd.DataFrame: records = self._extract_records(response) if not records: empty = pd.DataFrame(columns=self._COLUMNS) empty.index = pd.DatetimeIndex([], tz="UTC", name="ts") return empty.astype("float64") first = records[0] if isinstance(first, dict): df = pd.DataFrame(records) ts_col = self._guess_ts_col(df.columns) df = df.rename(columns={ts_col: "ts"}) elif isinstance(first, list | tuple) and len(first) >= 6: df = pd.DataFrame( records, columns=["ts", "open", "high", "low", "close", "volume"] ) else: raise RuntimeError(f"unrecognized record shape: {first!r}") df["ts"] = self._to_utc_index(df["ts"]) df = df.drop_duplicates(subset=["ts"]).sort_values("ts").set_index("ts") keep = [c for c in self._COLUMNS if c in df.columns] return df[keep].astype("float64") @staticmethod def _extract_records(response: Any) -> list[Any]: if isinstance(response, list): return response if isinstance(response, dict): for key in ("candles", "data", "result", "ohlcv", "klines", "bars"): if key in response and isinstance(response[key], list): return list(response[key]) raise RuntimeError( f"cannot find records list in response: {type(response).__name__}" ) @staticmethod def _guess_ts_col(columns: pd.Index) -> str: for cand in ("ts", "timestamp", "time", "date", "datetime", "open_time", "t"): if cand in columns: return cand raise RuntimeError(f"no timestamp column in {list(columns)}") @staticmethod def _to_utc_index(s: pd.Series) -> pd.Series: # Accepts ms-epoch ints, sec-epoch ints, ISO strings. if pd.api.types.is_numeric_dtype(s): mx = float(s.max()) unit = "ms" if mx > 1e12 else "s" return pd.to_datetime(s, unit=unit, utc=True) return pd.to_datetime(s, utc=True) def _timeframe_to_minutes(tf: str) -> int: units = {"m": 1, "h": 60, "d": 60 * 24} if not tf or tf[-1] not in units: raise ValueError(f"unsupported timeframe: {tf}") return int(tf[:-1]) * units[tf[-1]]