From d9423a1ab5e3987c0d1a98934bcf9ad24d0debf4 Mon Sep 17 00:00:00 2001 From: AdrianoDev Date: Sun, 10 May 2026 11:27:27 +0200 Subject: [PATCH] fix(data,protocol): paginazione OHLCV + macd accetta signal param Run reale phase1-real-002 ha rivelato: 1. Cerbero/Deribit cap ~5000 candele per call. Una richiesta di 2 anni 1h (17500 candele) ritorna troncata. CerberoOHLCVLoader._fetch ora pagina in chunk da 4500 barre, concatena e dedupe. 2. _ind_macd accettava solo (df, fast, slow). Il prompt suggerisce "(indicator macd 12 26 9)" con 3 numeri (fast/slow/signal). Aggiunto signal=9 default e calcolo histogram (macd_line - signal_line). Test suite 122 PASSED, ruff e mypy clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/multi_swarm/data/cerbero_ohlcv.py | 41 ++++++++++++++++++++++----- src/multi_swarm/protocol/compiler.py | 8 ++++-- 2 files changed, 40 insertions(+), 9 deletions(-) diff --git a/src/multi_swarm/data/cerbero_ohlcv.py b/src/multi_swarm/data/cerbero_ohlcv.py index 83efdb7..1439f6a 100644 --- a/src/multi_swarm/data/cerbero_ohlcv.py +++ b/src/multi_swarm/data/cerbero_ohlcv.py @@ -19,16 +19,15 @@ the three plausible shapes (object-of-records under ``candles``/``data``/ ``result``/``ohlcv``/``klines``/``bars``, array-of-arrays ccxt-style, or a raw list at the top level) and raises a clear error if none matches. -Pagination is NOT yet implemented — Cerbero is assumed to accept the full -date range and page internally. If a future live call shows a cap (e.g. -~1000 candles per call), add a chunked fetch in a follow-up. +Cerbero/Deribit applicano un cap soft di ~5000 candele per call: il +loader pagina internamente in chunk da 4500 barre, concatena e dedupe. """ from __future__ import annotations import hashlib from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, timedelta from pathlib import Path from typing import Any, ClassVar @@ -73,10 +72,38 @@ class CerberoOHLCVLoader: df.to_parquet(cache_file) return df + # Cerbero/Deribit hanno un cap soft di ~5000 candele per call. + # Paginiamo in chunk piu' piccoli per intervalli lunghi. + _CHUNK_BARS: ClassVar[int] = 4500 + def _fetch(self, req: OHLCVRequest) -> pd.DataFrame: - args = self._build_args(req) - response = self.client.call_tool(req.exchange, "get_historical", args) - return self._parse_response(response) + bar_seconds = _timeframe_to_minutes(req.timeframe) * 60 + chunk_seconds = self._CHUNK_BARS * bar_seconds + chunks: list[pd.DataFrame] = [] + cursor = req.start + while cursor < req.end: + chunk_end = min(req.end, cursor + timedelta(seconds=chunk_seconds)) + chunk_req = OHLCVRequest( + symbol=req.symbol, timeframe=req.timeframe, + start=cursor, end=chunk_end, exchange=req.exchange, + ) + args = self._build_args(chunk_req) + response = self.client.call_tool(req.exchange, "get_historical", args) + chunk = self._parse_response(response) + if not chunk.empty: + chunks.append(chunk) + last_ts = chunk.index[-1].to_pydatetime() + # avanza di un bar oltre l'ultimo per evitare overlap + cursor = max(last_ts + timedelta(seconds=bar_seconds), chunk_end) + else: + cursor = chunk_end + if not chunks: + return pd.DataFrame(columns=self._COLUMNS).set_index( + pd.DatetimeIndex([], tz="UTC", name="ts") + ) + df = pd.concat(chunks) + df = df[~df.index.duplicated(keep="first")].sort_index() + return df def _build_args(self, req: OHLCVRequest) -> dict[str, Any]: if req.exchange == "deribit": diff --git a/src/multi_swarm/protocol/compiler.py b/src/multi_swarm/protocol/compiler.py index 6e2ba8a..9deefdd 100644 --- a/src/multi_swarm/protocol/compiler.py +++ b/src/multi_swarm/protocol/compiler.py @@ -77,8 +77,12 @@ def _ind_realized_vol(df: pd.DataFrame, window: int) -> pd.Series: return _realized_vol(df["close"], window) -def _ind_macd(df: pd.DataFrame, fast: int = 12, slow: int = 26) -> pd.Series: - return _sma(df["close"], fast) - _sma(df["close"], slow) +def _ind_macd( + df: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9, +) -> pd.Series: + macd_line = _sma(df["close"], fast) - _sma(df["close"], slow) + signal_line = _sma(macd_line, signal) + return macd_line - signal_line # Annotated as ``dict[str, Any]`` deliberately: each indicator has its own