d9423a1ab5
Run reale phase1-real-002 ha rivelato: 1. Cerbero/Deribit cap ~5000 candele per call. Una richiesta di 2 anni 1h (17500 candele) ritorna troncata. CerberoOHLCVLoader._fetch ora pagina in chunk da 4500 barre, concatena e dedupe. 2. _ind_macd accettava solo (df, fast, slow). Il prompt suggerisce "(indicator macd 12 26 9)" con 3 numeri (fast/slow/signal). Aggiunto signal=9 default e calcolo histogram (macd_line - signal_line). Test suite 122 PASSED, ruff e mypy clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
190 lines
7.3 KiB
Python
190 lines
7.3 KiB
Python
"""OHLCV loader backed by Cerbero MCP (replaces ccxt in Phase 1).
|
|
|
|
Cerbero is the single source of truth for market data; Phase 1 uses only
|
|
OHLCV. Default exchange = deribit (BTC-PERPETUAL / ETH-PERPETUAL).
|
|
|
|
Cerbero get_historical request schemas (from OpenAPI at
|
|
``https://cerbero-mcp.tielogic.xyz/openapi.json``):
|
|
|
|
- ``mcp-deribit/tools/get_historical``: required ``instrument``,
|
|
``start_date``, ``end_date``; optional ``resolution`` (default ``1h``).
|
|
- ``mcp-bybit/tools/get_historical``: ``symbol`` + ``interval`` (minutes)
|
|
+ range params (``start``/``end`` ms epoch).
|
|
- ``mcp-hyperliquid/tools/get_historical``: ``instrument`` (or ``asset``),
|
|
``start_date``, ``end_date``, optional ``resolution``.
|
|
|
|
Response shape is not statically guaranteed by the OpenAPI schema (200
|
|
returns an opaque object). The parser is therefore defensive: it tries
|
|
the three plausible shapes (object-of-records under ``candles``/``data``/
|
|
``result``/``ohlcv``/``klines``/``bars``, array-of-arrays ccxt-style, or
|
|
a raw list at the top level) and raises a clear error if none matches.
|
|
|
|
Cerbero/Deribit apply a soft cap of roughly 5000 candles per call: the
loader paginates internally in 4500-bar chunks, then concatenates and
de-duplicates the results.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Any, ClassVar
|
|
|
|
import pandas as pd # type: ignore[import-untyped]
|
|
|
|
from ..cerbero.client import CerberoClient
|
|
|
|
|
|
@dataclass(frozen=True)
class OHLCVRequest:
    """Immutable description of one OHLCV request.

    Mirrors what Cerbero's ``get_historical`` tools need: an instrument
    symbol, a timeframe string, a time range, and the target exchange
    (``deribit`` by default).
    """

    symbol: str  # e.g. "BTC-PERPETUAL" for deribit, "BTCUSDT" for bybit
    timeframe: str  # e.g. "1h", "5m"
    start: datetime
    end: datetime
    exchange: str = "deribit"

    def cache_key(self) -> str:
        """Return a short, stable hex digest uniquely identifying this request."""
        parts = (
            self.exchange,
            self.symbol,
            self.timeframe,
            self.start.isoformat(),
            self.end.isoformat(),
        )
        digest = hashlib.sha1("|".join(parts).encode())
        return digest.hexdigest()[:16]
|
|
|
|
|
|
class CerberoOHLCVLoader:
|
|
"""Carica OHLCV chiamando Cerbero MCP get_historical, cachea in parquet."""
|
|
|
|
_COLUMNS: ClassVar[list[str]] = ["open", "high", "low", "close", "volume"]
|
|
|
|
def __init__(self, client: CerberoClient, cache_dir: Path):
|
|
self.client = client
|
|
self.cache_dir = Path(cache_dir)
|
|
self.cache_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
def load(self, req: OHLCVRequest) -> pd.DataFrame:
|
|
cache_file = self.cache_dir / f"{req.cache_key()}.parquet"
|
|
if cache_file.exists():
|
|
df = pd.read_parquet(cache_file)
|
|
assert isinstance(df, pd.DataFrame)
|
|
return df
|
|
df = self._fetch(req)
|
|
df.to_parquet(cache_file)
|
|
return df
|
|
|
|
# Cerbero/Deribit hanno un cap soft di ~5000 candele per call.
|
|
# Paginiamo in chunk piu' piccoli per intervalli lunghi.
|
|
_CHUNK_BARS: ClassVar[int] = 4500
|
|
|
|
def _fetch(self, req: OHLCVRequest) -> pd.DataFrame:
|
|
bar_seconds = _timeframe_to_minutes(req.timeframe) * 60
|
|
chunk_seconds = self._CHUNK_BARS * bar_seconds
|
|
chunks: list[pd.DataFrame] = []
|
|
cursor = req.start
|
|
while cursor < req.end:
|
|
chunk_end = min(req.end, cursor + timedelta(seconds=chunk_seconds))
|
|
chunk_req = OHLCVRequest(
|
|
symbol=req.symbol, timeframe=req.timeframe,
|
|
start=cursor, end=chunk_end, exchange=req.exchange,
|
|
)
|
|
args = self._build_args(chunk_req)
|
|
response = self.client.call_tool(req.exchange, "get_historical", args)
|
|
chunk = self._parse_response(response)
|
|
if not chunk.empty:
|
|
chunks.append(chunk)
|
|
last_ts = chunk.index[-1].to_pydatetime()
|
|
# avanza di un bar oltre l'ultimo per evitare overlap
|
|
cursor = max(last_ts + timedelta(seconds=bar_seconds), chunk_end)
|
|
else:
|
|
cursor = chunk_end
|
|
if not chunks:
|
|
return pd.DataFrame(columns=self._COLUMNS).set_index(
|
|
pd.DatetimeIndex([], tz="UTC", name="ts")
|
|
)
|
|
df = pd.concat(chunks)
|
|
df = df[~df.index.duplicated(keep="first")].sort_index()
|
|
return df
|
|
|
|
def _build_args(self, req: OHLCVRequest) -> dict[str, Any]:
|
|
if req.exchange == "deribit":
|
|
return {
|
|
"instrument": req.symbol,
|
|
"start_date": req.start.isoformat(),
|
|
"end_date": req.end.isoformat(),
|
|
"resolution": req.timeframe,
|
|
}
|
|
if req.exchange == "bybit":
|
|
return {
|
|
"symbol": req.symbol,
|
|
"interval": _timeframe_to_minutes(req.timeframe),
|
|
"start": int(req.start.timestamp() * 1000),
|
|
"end": int(req.end.timestamp() * 1000),
|
|
}
|
|
if req.exchange == "hyperliquid":
|
|
return {
|
|
"instrument": req.symbol,
|
|
"start_date": req.start.isoformat(),
|
|
"end_date": req.end.isoformat(),
|
|
"resolution": req.timeframe,
|
|
}
|
|
raise ValueError(f"unsupported exchange: {req.exchange}")
|
|
|
|
def _parse_response(self, response: Any) -> pd.DataFrame:
|
|
records = self._extract_records(response)
|
|
if not records:
|
|
empty = pd.DataFrame(columns=self._COLUMNS)
|
|
empty.index = pd.DatetimeIndex([], tz="UTC", name="ts")
|
|
return empty.astype("float64")
|
|
|
|
first = records[0]
|
|
if isinstance(first, dict):
|
|
df = pd.DataFrame(records)
|
|
ts_col = self._guess_ts_col(df.columns)
|
|
df = df.rename(columns={ts_col: "ts"})
|
|
elif isinstance(first, list | tuple) and len(first) >= 6:
|
|
df = pd.DataFrame(
|
|
records, columns=["ts", "open", "high", "low", "close", "volume"]
|
|
)
|
|
else:
|
|
raise RuntimeError(f"unrecognized record shape: {first!r}")
|
|
|
|
df["ts"] = self._to_utc_index(df["ts"])
|
|
df = df.drop_duplicates(subset=["ts"]).sort_values("ts").set_index("ts")
|
|
keep = [c for c in self._COLUMNS if c in df.columns]
|
|
return df[keep].astype("float64")
|
|
|
|
@staticmethod
|
|
def _extract_records(response: Any) -> list[Any]:
|
|
if isinstance(response, list):
|
|
return response
|
|
if isinstance(response, dict):
|
|
for key in ("candles", "data", "result", "ohlcv", "klines", "bars"):
|
|
if key in response and isinstance(response[key], list):
|
|
return list(response[key])
|
|
raise RuntimeError(
|
|
f"cannot find records list in response: {type(response).__name__}"
|
|
)
|
|
|
|
@staticmethod
|
|
def _guess_ts_col(columns: pd.Index) -> str:
|
|
for cand in ("ts", "timestamp", "time", "date", "datetime", "open_time", "t"):
|
|
if cand in columns:
|
|
return cand
|
|
raise RuntimeError(f"no timestamp column in {list(columns)}")
|
|
|
|
@staticmethod
|
|
def _to_utc_index(s: pd.Series) -> pd.Series:
|
|
# Accepts ms-epoch ints, sec-epoch ints, ISO strings.
|
|
if pd.api.types.is_numeric_dtype(s):
|
|
mx = float(s.max())
|
|
unit = "ms" if mx > 1e12 else "s"
|
|
return pd.to_datetime(s, unit=unit, utc=True)
|
|
return pd.to_datetime(s, utc=True)
|
|
|
|
|
|
def _timeframe_to_minutes(tf: str) -> int:
|
|
units = {"m": 1, "h": 60, "d": 60 * 24}
|
|
if not tf or tf[-1] not in units:
|
|
raise ValueError(f"unsupported timeframe: {tf}")
|
|
return int(tf[:-1]) * units[tf[-1]]
|