from __future__ import annotations import hashlib from dataclasses import dataclass from datetime import datetime from pathlib import Path import ccxt # type: ignore[import-untyped] import pandas as pd # type: ignore[import-untyped] @dataclass(frozen=True) class OHLCVRequest: symbol: str timeframe: str start: datetime end: datetime def cache_key(self) -> str: s = f"{self.symbol}|{self.timeframe}|{self.start.isoformat()}|{self.end.isoformat()}" return hashlib.sha1(s.encode()).hexdigest()[:16] class OHLCVLoader: """Carica OHLCV via ccxt (Binance) e cachea in parquet.""" def __init__(self, cache_dir: Path, exchange_name: str = "binance"): self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(parents=True, exist_ok=True) self.exchange_name = exchange_name def load(self, req: OHLCVRequest) -> pd.DataFrame: cache_file = self.cache_dir / f"{req.cache_key()}.parquet" if cache_file.exists(): return pd.read_parquet(cache_file) df = self._fetch_paginated(req) df.to_parquet(cache_file) return df @staticmethod def _timeframe_to_ms(timeframe: str) -> int: units = {"m": 60, "h": 3600, "d": 86400, "w": 604800} unit = timeframe[-1] if unit not in units: raise ValueError(f"Unsupported timeframe: {timeframe}") return int(timeframe[:-1]) * units[unit] * 1000 def _fetch_paginated(self, req: OHLCVRequest) -> pd.DataFrame: exchange = getattr(ccxt, self.exchange_name)({"enableRateLimit": True}) timeframe_ms = self._timeframe_to_ms(req.timeframe) since = int(req.start.timestamp() * 1000) end_ms = int(req.end.timestamp() * 1000) all_rows: list[list[float]] = [] limit = 1000 while since <= end_ms: rows = exchange.fetch_ohlcv(req.symbol, req.timeframe, since=since, limit=limit) if not rows: break all_rows.extend(rows) if len(rows) < limit: break last_ts = rows[-1][0] new_since = last_ts + timeframe_ms if new_since <= since: break since = new_since df = pd.DataFrame(all_rows, columns=["ts", "open", "high", "low", "close", "volume"]) df = df.drop_duplicates(subset=["ts"]).sort_values("ts") df["ts"] = pd.to_datetime(df["ts"], unit="ms", utc=True) df = df.set_index("ts") df = df[(df.index >= req.start) & (df.index < req.end)] return df[["open", "high", "low", "close", "volume"]].astype("float64")