feat(V2): shared Candle validator + uniform 'candles' response key

Introduce common/candles.py with a Pydantic Candle model enforcing OHLC
consistency (high≥max, low≤min), non-negative volume and positive
timestamp. validate_candles() coerces upstream rows, sorts by timestamp
and raises HTTPException(502) on malformed data — surfacing upstream
data corruption as a retryable envelope instead of silently returning
nonsense.

Wired into all five exchange historical endpoints (Bybit, Hyperliquid,
Deribit, Alpaca, IBKR). BREAKING: Alpaca get_bars and IBKR get_bars now
return 'candles' (was 'bars') to align with the other exchanges.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root
2026-05-10 19:19:20 +00:00
parent 110ca7f5cf
commit c94312d79f
8 changed files with 192 additions and 54 deletions
+53
View File
@@ -0,0 +1,53 @@
"""Shared OHLCV candle model + validator for exchange historical endpoints."""
from __future__ import annotations
from typing import Any
from fastapi import HTTPException
from pydantic import BaseModel, ConfigDict, ValidationError, model_validator
class Candle(BaseModel):
model_config = ConfigDict(extra="ignore")
timestamp: int
open: float
high: float
low: float
close: float
volume: float
@model_validator(mode="after")
def _check(self) -> Candle:
if self.timestamp <= 0:
raise ValueError(f"timestamp must be > 0, got {self.timestamp}")
if self.volume < 0:
raise ValueError(f"volume must be >= 0, got {self.volume}")
if self.high < max(self.open, self.close, self.low):
raise ValueError(
f"high {self.high} < max(open={self.open}, "
f"close={self.close}, low={self.low})"
)
if self.low > min(self.open, self.close, self.high):
raise ValueError(
f"low {self.low} > min(open={self.open}, "
f"close={self.close}, high={self.high})"
)
return self
def validate_candles(raw: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Coerce upstream rows into validated candle dicts, sorted by timestamp.
Raises HTTPException(502) if any row violates OHLC consistency or schema —
upstream data corruption is mapped to a retryable error envelope.
"""
try:
candles = [Candle.model_validate(row) for row in raw]
except ValidationError as e:
raise HTTPException(
status_code=502,
detail=f"upstream returned malformed candle: {e.errors()[0]['msg']}",
) from e
candles.sort(key=lambda c: c.timestamp)
return [c.model_dump() for c in candles]
+13 -4
View File
@@ -20,6 +20,7 @@ from typing import Any
import httpx import httpx
from cerbero_mcp.common.candles import validate_candles
from cerbero_mcp.common.http import async_client from cerbero_mcp.common.http import async_client
# ── Endpoint base ──────────────────────────────────────────────── # ── Endpoint base ────────────────────────────────────────────────
@@ -301,9 +302,17 @@ class AlpacaClient:
bars_dict = (data or {}).get("bars") or {} bars_dict = (data or {}).get("bars") or {}
rows = bars_dict.get(symbol) or [] rows = bars_dict.get(symbol) or []
bars = [
def _iso_to_ms(ts: str | int | None) -> int | None:
if ts is None or isinstance(ts, int):
return ts
return int(_dt.datetime.fromisoformat(
ts.replace("Z", "+00:00")
).timestamp() * 1000)
candles = validate_candles([
{ {
"timestamp": b.get("t"), "timestamp": _iso_to_ms(b.get("t")),
"open": b.get("o"), "open": b.get("o"),
"high": b.get("h"), "high": b.get("h"),
"low": b.get("l"), "low": b.get("l"),
@@ -311,12 +320,12 @@ class AlpacaClient:
"volume": b.get("v"), "volume": b.get("v"),
} }
for b in rows for b in rows
] ])
return { return {
"symbol": symbol, "symbol": symbol,
"asset_class": ac, "asset_class": ac,
"interval": interval, "interval": interval,
"bars": bars, "candles": candles,
} }
async def get_snapshot(self, symbol: str) -> dict: async def get_snapshot(self, symbol: str) -> dict:
+9 -9
View File
@@ -22,6 +22,7 @@ import httpx
from cerbero_mcp.common import indicators as ind from cerbero_mcp.common import indicators as ind
from cerbero_mcp.common import microstructure as micro from cerbero_mcp.common import microstructure as micro
from cerbero_mcp.common.candles import validate_candles
BASE_MAINNET = "https://api.bybit.com" BASE_MAINNET = "https://api.bybit.com"
BASE_TESTNET = "https://api-testnet.bybit.com" BASE_TESTNET = "https://api-testnet.bybit.com"
@@ -254,18 +255,17 @@ class BybitClient:
params["end"] = end params["end"] = end
resp = await self._request_public("GET", "/v5/market/kline", params=params) resp = await self._request_public("GET", "/v5/market/kline", params=params)
rows = (resp.get("result") or {}).get("list") or [] rows = (resp.get("result") or {}).get("list") or []
rows_sorted = sorted(rows, key=lambda r: int(r[0])) candles = validate_candles([
candles = [
{ {
"timestamp": int(r[0]), "timestamp": int(r[0]),
"open": float(r[1]), "open": r[1],
"high": float(r[2]), "high": r[2],
"low": float(r[3]), "low": r[3],
"close": float(r[4]), "close": r[4],
"volume": float(r[5]), "volume": r[5],
} }
for r in rows_sorted for r in rows
] ])
return {"symbol": symbol, "candles": candles} return {"symbol": symbol, "candles": candles}
async def get_indicators( async def get_indicators(
+18 -16
View File
@@ -11,10 +11,11 @@ from fastapi import HTTPException
from cerbero_mcp.common import indicators as ind from cerbero_mcp.common import indicators as ind
from cerbero_mcp.common import microstructure as micro from cerbero_mcp.common import microstructure as micro
from cerbero_mcp.common import options as opt from cerbero_mcp.common import options as opt
from cerbero_mcp.common.candles import validate_candles
from cerbero_mcp.common.http import async_client from cerbero_mcp.common.http import async_client
def _parse_deribit_response(resp) -> dict: def _parse_deribit_response(resp: Any) -> dict[str, Any]:
"""Map Deribit upstream errors to a clean HTTP 502 (retryable) instead of """Map Deribit upstream errors to a clean HTTP 502 (retryable) instead of
leaking JSONDecodeError when the body is HTML (e.g. Cloudflare 5xx page).""" leaking JSONDecodeError when the body is HTML (e.g. Cloudflare 5xx page)."""
if resp.status_code >= 500: if resp.status_code >= 500:
@@ -23,7 +24,8 @@ def _parse_deribit_response(resp) -> dict:
detail=f"Deribit upstream HTTP {resp.status_code}", detail=f"Deribit upstream HTTP {resp.status_code}",
) )
try: try:
return resp.json() data: dict[str, Any] = resp.json()
return data
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
raise HTTPException( raise HTTPException(
status_code=502, status_code=502,
@@ -121,10 +123,10 @@ class DeribitClient:
resp = await http.get(url, params=request_params, headers=headers) resp = await http.get(url, params=request_params, headers=headers)
data = _parse_deribit_response(resp) data = _parse_deribit_response(resp)
if "result" in data: if "result" in data:
return data # type: ignore[no-any-return] return data
return {"result": None, "error": error_msg} return {"result": None, "error": error_msg}
return data # type: ignore[no-any-return] return data
# ── Read tools ─────────────────────────────────────────────── # ── Read tools ───────────────────────────────────────────────
@@ -418,24 +420,24 @@ class DeribitClient:
}, },
) )
r = raw.get("result") or {} r = raw.get("result") or {}
candles = []
ticks = r.get("ticks", []) or [] ticks = r.get("ticks", []) or []
opens = r.get("open", []) or [] opens = r.get("open", []) or []
highs = r.get("high", []) or [] highs = r.get("high", []) or []
lows = r.get("low", []) or [] lows = r.get("low", []) or []
closes = r.get("close", []) or [] closes = r.get("close", []) or []
volumes = r.get("volume", []) or [] volumes = r.get("volume", []) or []
for idx, ts in enumerate(ticks): n = min(len(ticks), len(opens), len(highs), len(lows), len(closes), len(volumes))
if idx >= min(len(opens), len(highs), len(lows), len(closes), len(volumes)): candles = validate_candles([
break {
candles.append({ "timestamp": ticks[i],
"timestamp": ts, "open": opens[i],
"open": opens[idx], "high": highs[i],
"high": highs[idx], "low": lows[i],
"low": lows[idx], "close": closes[i],
"close": closes[idx], "volume": volumes[i],
"volume": volumes[idx], }
}) for i in range(n)
])
return {"candles": candles} return {"candles": candles}
async def get_dvol( async def get_dvol(
+10 -10
View File
@@ -27,6 +27,7 @@ from eth_account.messages import encode_typed_data
from eth_utils import keccak, to_hex from eth_utils import keccak, to_hex
from cerbero_mcp.common import indicators as ind from cerbero_mcp.common import indicators as ind
from cerbero_mcp.common.candles import validate_candles
from cerbero_mcp.common.http import async_client from cerbero_mcp.common.http import async_client
BASE_LIVE = "https://api.hyperliquid.xyz" BASE_LIVE = "https://api.hyperliquid.xyz"
@@ -408,18 +409,17 @@ class HyperliquidClient:
}, },
} }
) )
candles = [] candles = validate_candles([
for c in data:
candles.append(
{ {
"timestamp": c.get("t", 0), "timestamp": c.get("t"),
"open": float(c.get("o", 0)), "open": c.get("o"),
"high": float(c.get("h", 0)), "high": c.get("h"),
"low": float(c.get("l", 0)), "low": c.get("l"),
"close": float(c.get("c", 0)), "close": c.get("c"),
"volume": float(c.get("v", 0)), "volume": c.get("v"),
} }
) for c in data
])
return {"candles": candles} return {"candles": candles}
async def get_open_orders(self) -> list[dict[str, Any]]: async def get_open_orders(self) -> list[dict[str, Any]]:
+8 -6
View File
@@ -9,6 +9,7 @@ from typing import Any
import httpx import httpx
from cerbero_mcp.common.candles import validate_candles
from cerbero_mcp.common.http import async_client from cerbero_mcp.common.http import async_client
from cerbero_mcp.exchanges.ibkr.oauth import ( from cerbero_mcp.exchanges.ibkr.oauth import (
IBKRAuthError, IBKRAuthError,
@@ -234,11 +235,7 @@ class IBKRClient:
params={"conid": str(conid), "period": period, "bar": bar}, params={"conid": str(conid), "period": period, "bar": bar},
) )
rows = (data or {}).get("data") or [] rows = (data or {}).get("data") or []
return { candles = validate_candles([
"symbol": symbol,
"asset_class": asset_class,
"interval": bar,
"bars": [
{ {
"timestamp": r.get("t"), "timestamp": r.get("t"),
"open": r.get("o"), "open": r.get("o"),
@@ -248,7 +245,12 @@ class IBKRClient:
"volume": r.get("v"), "volume": r.get("v"),
} }
for r in rows for r in rows
], ])
return {
"symbol": symbol,
"asset_class": asset_class,
"interval": bar,
"candles": candles,
} }
async def get_option_chain( async def get_option_chain(
+72
View File
@@ -0,0 +1,72 @@
from __future__ import annotations
import pytest
from cerbero_mcp.common.candles import Candle, validate_candles
from fastapi import HTTPException
def test_valid_candle():
c = Candle(timestamp=1_700_000_000_000, open=100.0, high=110.0,
low=95.0, close=105.0, volume=12.5)
assert c.high == 110.0
def test_high_below_close_rejected():
with pytest.raises(ValueError):
Candle(timestamp=1, open=100, high=90, low=80, close=95, volume=1)
def test_high_below_open_rejected():
with pytest.raises(ValueError):
Candle(timestamp=1, open=100, high=90, low=80, close=85, volume=1)
def test_low_above_close_rejected():
with pytest.raises(ValueError):
Candle(timestamp=1, open=100, high=110, low=105, close=102, volume=1)
def test_low_above_open_rejected():
with pytest.raises(ValueError):
Candle(timestamp=1, open=95, high=110, low=100, close=105, volume=1)
def test_negative_volume_rejected():
with pytest.raises(ValueError):
Candle(timestamp=1, open=100, high=110, low=90, close=105, volume=-1)
def test_non_positive_timestamp_rejected():
with pytest.raises(ValueError):
Candle(timestamp=0, open=100, high=110, low=90, close=105, volume=1)
def test_validate_candles_sorts_by_timestamp():
raw = [
{"timestamp": 3, "open": 1, "high": 2, "low": 1, "close": 1, "volume": 0},
{"timestamp": 1, "open": 1, "high": 2, "low": 1, "close": 1, "volume": 0},
{"timestamp": 2, "open": 1, "high": 2, "low": 1, "close": 1, "volume": 0},
]
out = validate_candles(raw)
assert [c["timestamp"] for c in out] == [1, 2, 3]
def test_validate_candles_coerces_string_numerics():
raw = [{"timestamp": "1", "open": "100", "high": "110",
"low": "90", "close": "105", "volume": "10"}]
out = validate_candles(raw)
assert out[0]["open"] == 100.0
assert isinstance(out[0]["volume"], float)
def test_validate_candles_malformed_raises_http_502():
raw = [{"timestamp": 1, "open": 100, "high": 50, "low": 90,
"close": 105, "volume": 1}]
with pytest.raises(HTTPException) as exc_info:
validate_candles(raw)
assert exc_info.value.status_code == 502
assert "candle" in str(exc_info.value.detail).lower()
def test_validate_candles_empty_list():
assert validate_candles([]) == []
+2 -2
View File
@@ -271,8 +271,8 @@ async def test_get_bars_stocks(httpx_mock: HTTPXMock, client: AlpacaClient):
) )
assert result["symbol"] == "AAPL" assert result["symbol"] == "AAPL"
assert result["interval"] == "1d" assert result["interval"] == "1d"
assert len(result["bars"]) == 1 assert len(result["candles"]) == 1
assert result["bars"][0]["close"] == 175.0 assert result["candles"][0]["close"] == 175.0
@pytest.mark.asyncio @pytest.mark.asyncio