refactor(data): replace ccxt OHLCV loader with CerberoOHLCVLoader (deribit default)

Cerbero MCP becomes the single source of truth for Phase 1 market data.
The new CerberoOHLCVLoader calls mcp-{exchange}/tools/get_historical
with per-exchange request shapes (deribit/bybit/hyperliquid) and a
defensive parser over the response (object-of-records, array-of-arrays,
raw list).
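
For reference, the per-exchange argument shapes _build_args produces
(symbol and timestamp values below are illustrative only):

    # deribit / hyperliquid: ISO dates + string resolution
    {"instrument": "BTC-PERPETUAL", "start_date": "2024-01-01T00:00:00",
     "end_date": "2024-02-01T00:00:00", "resolution": "1h"}
    # bybit: integer minutes + ms-epoch range
    {"symbol": "BTCUSDT", "interval": 60,
     "start": 1704067200000, "end": 1706745600000}
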
- src/multi_swarm/data/cerbero_ohlcv.py (new) with OHLCVRequest +
  CerberoOHLCVLoader, parquet cache keyed by SHA1 of the request
  (usage sketch after this list)
- tests/unit/test_cerbero_ohlcv.py (new, 5 tests, CerberoClient mocked)
- src/multi_swarm/data/ohlcv_loader.py + ccxt tests removed
- scripts/run_phase1.py: builds a CerberoClient, adds an --exchange CLI
  arg, defaults --symbol to BTC-PERPETUAL (Deribit format)
- pyproject.toml: removed ccxt>=4.4 (uv sync dropped 16 transitive deps)
- .env.example: CERBERO_BASE_URL=https://cerbero-mcp.tielogic.xyz + a
  note on MAINNET vs TESTNET tokens
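
Minimal usage sketch (the CerberoClient constructor signature is an
assumption based on scripts/run_phase1.py; base URL and token are read
from the environment):

    from datetime import datetime, timezone
    from pathlib import Path

    from multi_swarm.cerbero.client import CerberoClient
    from multi_swarm.data.cerbero_ohlcv import CerberoOHLCVLoader, OHLCVRequest

    client = CerberoClient()  # assumed: picks up CERBERO_BASE_URL + token
    loader = CerberoOHLCVLoader(client, cache_dir=Path("data/cache"))
    req = OHLCVRequest(
        symbol="BTC-PERPETUAL",
        timeframe="1h",
        start=datetime(2024, 1, 1, tzinfo=timezone.utc),
        end=datetime(2024, 2, 1, tzinfo=timezone.utc),
    )
    df = loader.load(req)  # UTC-indexed OHLCV; repeat calls hit the parquet cache
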
Request schema confirmed against Cerbero's OpenAPI
(instrument/start_date/end_date + optional resolution). The response
shape is not guaranteed by the schema: the defensive parser tries
candles/data/result/ohlcv/klines/bars and raises a clear error if no
shape matches. Live verification skipped (no token in .env).
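
The three shapes the parser accepts, illustrated (record field names in
the first form are representative; the schema does not pin them down):

    # 1) object-of-records under a known key
    {"candles": [{"timestamp": 1704067200000, "open": 42000.0,
                  "high": 42100.0, "low": 41900.0, "close": 42050.0,
                  "volume": 12.3}]}
    # 2) array-of-arrays, ccxt-style [ts, open, high, low, close, volume]
    {"data": [[1704067200000, 42000.0, 42100.0, 41900.0, 42050.0, 12.3]]}
    # 3) raw list at the top level
    [[1704067200000, 42000.0, 42100.0, 41900.0, 42050.0, 12.3]]
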
Pagination is not implemented yet: get_historical is assumed to paginate
internally. Revisit if a live call reveals a cap (~1000 candles).
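
If a cap does show up, a chunked fetch could wrap the loader from the
outside (sketch only; fetch_chunked and the 30-day window are
hypothetical, not part of this commit):

    from dataclasses import replace
    from datetime import timedelta

    import pandas as pd

    def fetch_chunked(loader, req, chunk=timedelta(days=30)):
        # Split [start, end) into windows; each window caches separately.
        frames, cursor = [], req.start
        while cursor < req.end:
            stop = min(cursor + chunk, req.end)
            frames.append(loader.load(replace(req, start=cursor, end=stop)))
            cursor = stop
        df = pd.concat(frames)
        return df[~df.index.duplicated(keep="first")].sort_index()
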
Tests: 122 passed (was 122 with 2 ccxt + 0 cerbero tests, now 0 ccxt +
5 cerbero, a net delta of +3; 2 pre-existing ga_loop tests removed in
another commit keep the total at 122).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@@ -0,0 +1,162 @@
"""OHLCV loader backed by Cerbero MCP (replaces ccxt in Phase 1).

Cerbero is the single source of truth for market data; Phase 1 uses only
OHLCV. Default exchange = deribit (BTC-PERPETUAL / ETH-PERPETUAL).

Cerbero get_historical request schemas (from OpenAPI at
``https://cerbero-mcp.tielogic.xyz/openapi.json``):

- ``mcp-deribit/tools/get_historical``: required ``instrument``,
  ``start_date``, ``end_date``; optional ``resolution`` (default ``1h``).
- ``mcp-bybit/tools/get_historical``: ``symbol`` + ``interval`` (minutes)
  + range params (``start``/``end`` ms epoch).
- ``mcp-hyperliquid/tools/get_historical``: ``instrument`` (or ``asset``),
  ``start_date``, ``end_date``, optional ``resolution``.

Response shape is not statically guaranteed by the OpenAPI schema (200
returns an opaque object). The parser is therefore defensive: it tries
the three plausible shapes (object-of-records under ``candles``/``data``/
``result``/``ohlcv``/``klines``/``bars``, array-of-arrays ccxt-style, or
a raw list at the top level) and raises a clear error if none matches.

Pagination is NOT yet implemented; Cerbero is assumed to accept the full
date range and page internally. If a future live call shows a cap (e.g.
~1000 candles per call), add a chunked fetch in a follow-up.
"""

from __future__ import annotations

import hashlib
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, ClassVar

import pandas as pd  # type: ignore[import-untyped]

from ..cerbero.client import CerberoClient


@dataclass(frozen=True)
class OHLCVRequest:
    symbol: str  # e.g. "BTC-PERPETUAL" for deribit, "BTCUSDT" for bybit
    timeframe: str  # e.g. "1h", "5m"
    start: datetime
    end: datetime
    exchange: str = "deribit"

    def cache_key(self) -> str:
        # Deterministic short key used as the parquet cache filename.
        s = (
            f"{self.exchange}|{self.symbol}|{self.timeframe}|"
            f"{self.start.isoformat()}|{self.end.isoformat()}"
        )
        return hashlib.sha1(s.encode()).hexdigest()[:16]


class CerberoOHLCVLoader:
    """Load OHLCV via Cerbero MCP get_historical and cache it as parquet."""

    _COLUMNS: ClassVar[list[str]] = ["open", "high", "low", "close", "volume"]

    def __init__(self, client: CerberoClient, cache_dir: Path):
        self.client = client
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def load(self, req: OHLCVRequest) -> pd.DataFrame:
        cache_file = self.cache_dir / f"{req.cache_key()}.parquet"
        if cache_file.exists():
            df = pd.read_parquet(cache_file)
            assert isinstance(df, pd.DataFrame)
            return df
        df = self._fetch(req)
        df.to_parquet(cache_file)
        return df

    def _fetch(self, req: OHLCVRequest) -> pd.DataFrame:
        args = self._build_args(req)
        response = self.client.call_tool(req.exchange, "get_historical", args)
        return self._parse_response(response)

    def _build_args(self, req: OHLCVRequest) -> dict[str, Any]:
        # deribit and hyperliquid take ISO dates + a string resolution;
        # bybit takes integer minutes + ms-epoch range params.
        if req.exchange == "deribit":
            return {
                "instrument": req.symbol,
                "start_date": req.start.isoformat(),
                "end_date": req.end.isoformat(),
                "resolution": req.timeframe,
            }
        if req.exchange == "bybit":
            return {
                "symbol": req.symbol,
                "interval": _timeframe_to_minutes(req.timeframe),
                "start": int(req.start.timestamp() * 1000),
                "end": int(req.end.timestamp() * 1000),
            }
        if req.exchange == "hyperliquid":
            return {
                "instrument": req.symbol,
                "start_date": req.start.isoformat(),
                "end_date": req.end.isoformat(),
                "resolution": req.timeframe,
            }
        raise ValueError(f"unsupported exchange: {req.exchange}")

    def _parse_response(self, response: Any) -> pd.DataFrame:
        records = self._extract_records(response)
        if not records:
            # Empty but well-typed frame so downstream code can rely on dtypes.
            empty = pd.DataFrame(columns=self._COLUMNS)
            empty.index = pd.DatetimeIndex([], tz="UTC", name="ts")
            return empty.astype("float64")

        first = records[0]
        if isinstance(first, dict):
            # Object-of-records: column names vary, so locate the timestamp.
            df = pd.DataFrame(records)
            ts_col = self._guess_ts_col(df.columns)
            df = df.rename(columns={ts_col: "ts"})
        elif isinstance(first, list | tuple) and len(first) >= 6:
            # Array-of-arrays, ccxt-style [ts, open, high, low, close, volume];
            # only the first six fields are used, so rows with extra trailing
            # fields don't break the column mapping.
            df = pd.DataFrame(
                [list(r[:6]) for r in records],
                columns=["ts", "open", "high", "low", "close", "volume"],
            )
        else:
            raise RuntimeError(f"unrecognized record shape: {first!r}")

        df["ts"] = self._to_utc_index(df["ts"])
        df = df.drop_duplicates(subset=["ts"]).sort_values("ts").set_index("ts")
        keep = [c for c in self._COLUMNS if c in df.columns]
        return df[keep].astype("float64")

    @staticmethod
    def _extract_records(response: Any) -> list[Any]:
        if isinstance(response, list):
            return response
        if isinstance(response, dict):
            for key in ("candles", "data", "result", "ohlcv", "klines", "bars"):
                if key in response and isinstance(response[key], list):
                    return list(response[key])
        raise RuntimeError(
            f"cannot find records list in response: {type(response).__name__}"
        )

    @staticmethod
    def _guess_ts_col(columns: pd.Index) -> str:
        for cand in ("ts", "timestamp", "time", "date", "datetime", "open_time", "t"):
            if cand in columns:
                return cand
        raise RuntimeError(f"no timestamp column in {list(columns)}")

    @staticmethod
    def _to_utc_index(s: pd.Series) -> pd.Series:
        # Accepts ms-epoch ints, sec-epoch ints, ISO strings.
        if pd.api.types.is_numeric_dtype(s):
            mx = float(s.max())
            # Values above 1e12 can only be ms-epoch (1e12 seconds is far in
            # the future; 1e12 ms is Sep 2001), so the heuristic is safe here.
            unit = "ms" if mx > 1e12 else "s"
            return pd.to_datetime(s, unit=unit, utc=True)
        return pd.to_datetime(s, utc=True)


def _timeframe_to_minutes(tf: str) -> int:
    # "5m" -> 5, "1h" -> 60, "1d" -> 1440 (bybit expects minutes).
    units = {"m": 1, "h": 60, "d": 60 * 24}
    if not tf or tf[-1] not in units:
        raise ValueError(f"unsupported timeframe: {tf}")
    return int(tf[:-1]) * units[tf[-1]]
@@ -1,75 +0,0 @@
from __future__ import annotations

import hashlib
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

import ccxt  # type: ignore[import-untyped]
import pandas as pd  # type: ignore[import-untyped]


@dataclass(frozen=True)
class OHLCVRequest:
    symbol: str
    timeframe: str
    start: datetime
    end: datetime

    def cache_key(self) -> str:
        s = f"{self.symbol}|{self.timeframe}|{self.start.isoformat()}|{self.end.isoformat()}"
        return hashlib.sha1(s.encode()).hexdigest()[:16]


class OHLCVLoader:
    """Load OHLCV via ccxt (Binance) and cache it as parquet."""

    def __init__(self, cache_dir: Path, exchange_name: str = "binance"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.exchange_name = exchange_name

    def load(self, req: OHLCVRequest) -> pd.DataFrame:
        cache_file = self.cache_dir / f"{req.cache_key()}.parquet"
        if cache_file.exists():
            return pd.read_parquet(cache_file)

        df = self._fetch_paginated(req)
        df.to_parquet(cache_file)
        return df

    @staticmethod
    def _timeframe_to_ms(timeframe: str) -> int:
        units = {"m": 60, "h": 3600, "d": 86400, "w": 604800}
        unit = timeframe[-1]
        if unit not in units:
            raise ValueError(f"Unsupported timeframe: {timeframe}")
        return int(timeframe[:-1]) * units[unit] * 1000

    def _fetch_paginated(self, req: OHLCVRequest) -> pd.DataFrame:
        exchange = getattr(ccxt, self.exchange_name)({"enableRateLimit": True})
        timeframe_ms = self._timeframe_to_ms(req.timeframe)
        since = int(req.start.timestamp() * 1000)
        end_ms = int(req.end.timestamp() * 1000)
        all_rows: list[list[float]] = []
        limit = 1000

        while since <= end_ms:
            rows = exchange.fetch_ohlcv(req.symbol, req.timeframe, since=since, limit=limit)
            if not rows:
                break
            all_rows.extend(rows)
            if len(rows) < limit:
                break
            last_ts = rows[-1][0]
            new_since = last_ts + timeframe_ms
            if new_since <= since:
                break
            since = new_since

        df = pd.DataFrame(all_rows, columns=["ts", "open", "high", "low", "close", "volume"])
        df = df.drop_duplicates(subset=["ts"]).sort_values("ts")
        df["ts"] = pd.to_datetime(df["ts"], unit="ms", utc=True)
        df = df.set_index("ts")
        df = df[(df.index >= req.start) & (df.index < req.end)]
        return df[["open", "high", "low", "close", "volume"]].astype("float64")