feat(data): OHLCV loader via ccxt with parquet cache

Adjust .gitignore to keep src/multi_swarm/data/ tracked (only
top-level /data/ cache directory remains ignored).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-09 19:01:25 +02:00
parent b73416f482
commit b977c371e5
4 changed files with 138 additions and 1 deletions
View File
+73
View File
@@ -0,0 +1,73 @@
from __future__ import annotations
import hashlib
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import ccxt # type: ignore[import-untyped]
import pandas as pd # type: ignore[import-untyped]
@dataclass(frozen=True)
class OHLCVRequest:
    """Immutable description of one OHLCV download: what, how fine-grained, and when.

    Frozen so instances are hashable and safe to share between loaders.
    """

    symbol: str      # exchange symbol, e.g. "BTC/USDT"
    timeframe: str   # ccxt timeframe string, e.g. "1h"
    start: datetime  # inclusive lower bound of the requested window
    end: datetime    # exclusive upper bound of the requested window

    def cache_key(self) -> str:
        """Return a short, stable hex digest uniquely identifying this request.

        The digest covers every field, so any change to the request yields a
        different cache file name.
        """
        fingerprint = "|".join(
            (self.symbol, self.timeframe, self.start.isoformat(), self.end.isoformat())
        )
        return hashlib.sha1(fingerprint.encode()).hexdigest()[:16]
class OHLCVLoader:
    """Fetch OHLCV candles via ccxt and cache results as parquet files.

    Each distinct (exchange, symbol, timeframe, start, end) request is
    persisted under ``cache_dir``, so repeated loads never hit the network.
    """

    def __init__(self, cache_dir: Path, exchange_name: str = "binance"):
        """Create a loader whose parquet cache lives under *cache_dir*.

        Args:
            cache_dir: Directory for cached parquet files (created if missing).
            exchange_name: A ccxt exchange id, e.g. ``"binance"``.
        """
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.exchange_name = exchange_name

    def load(self, req: OHLCVRequest) -> pd.DataFrame:
        """Return the OHLCV frame for *req*, fetching and caching on a miss.

        Returns:
            A frame with a UTC ``DatetimeIndex`` named ``ts`` and float64
            columns ``open, high, low, close, volume``.
        """
        # Namespace the cache file by exchange: the request's cache_key()
        # does not know the exchange, and identical requests against two
        # exchanges must not collide on the same parquet file.
        cache_file = self.cache_dir / f"{self.exchange_name}_{req.cache_key()}.parquet"
        if cache_file.exists():
            return pd.read_parquet(cache_file)
        df = self._fetch_paginated(req)
        df.to_parquet(cache_file)
        return df

    @staticmethod
    def _timeframe_to_ms(timeframe: str) -> int:
        """Convert a ccxt timeframe string (e.g. ``"5m"``, ``"1d"``) to milliseconds.

        Raises:
            ValueError: If the unit suffix is not one of m/h/d/w.
        """
        units = {"m": 60, "h": 3600, "d": 86400, "w": 604800}
        unit = timeframe[-1]
        if unit not in units:
            raise ValueError(f"Unsupported timeframe: {timeframe}")
        return int(timeframe[:-1]) * units[unit] * 1000

    @staticmethod
    def _to_utc(dt: datetime) -> pd.Timestamp:
        """Interpret *dt* as UTC when naive; otherwise convert it to UTC."""
        ts = pd.Timestamp(dt)
        return ts.tz_localize("UTC") if ts.tzinfo is None else ts.tz_convert("UTC")

    def _fetch_paginated(self, req: OHLCVRequest) -> pd.DataFrame:
        """Page through ``fetch_ohlcv`` from ``req.start`` until ``req.end``."""
        exchange = getattr(ccxt, self.exchange_name)({"enableRateLimit": True})
        timeframe_ms = self._timeframe_to_ms(req.timeframe)
        # Normalize both bounds to UTC up front. Naive datetimes would
        # otherwise (a) use the *local* timezone for the `since` cursor via
        # .timestamp(), and (b) raise TypeError when compared against the
        # tz-aware index built below.
        start_ts = self._to_utc(req.start)
        end_ts = self._to_utc(req.end)
        since = int(start_ts.timestamp() * 1000)
        end_ms = int(end_ts.timestamp() * 1000)
        all_rows: list[list[float]] = []
        limit = 1000  # max candles per request on most ccxt exchanges
        while since <= end_ms:
            rows = exchange.fetch_ohlcv(req.symbol, req.timeframe, since=since, limit=limit)
            if not rows:
                break
            all_rows.extend(rows)
            last_ts = rows[-1][0]
            new_since = last_ts + timeframe_ms
            if new_since <= since:
                # Cursor made no progress — bail out instead of looping forever.
                break
            since = new_since
        df = pd.DataFrame(all_rows, columns=["ts", "open", "high", "low", "close", "volume"])
        df = df.drop_duplicates(subset=["ts"]).sort_values("ts")
        df["ts"] = pd.to_datetime(df["ts"], unit="ms", utc=True)
        df = df.set_index("ts")
        # Half-open interval [start, end): the exchange may return candles
        # past `end` on the final page.
        df = df[(df.index >= start_ts) & (df.index < end_ts)]
        return df[["open", "high", "low", "close", "volume"]].astype("float64")