Compare commits

...

5 Commits

Author SHA1 Message Date
root 91aadaea6a docs(V2): document /mcp-cross historical aggregator
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 21:42:20 +00:00
root 0ba5a05219 feat(V2): /mcp-cross/tools/get_historical with cross-exchange consensus
Add a unified historical endpoint that fans out to every exchange
supporting the requested (asset_class, symbol) pair, then merges the
results into a single consensus candle series with per-bar divergence
metrics:
  - candles[i].close = median across sources
  - candles[i].sources = count of contributing exchanges
  - candles[i].div_pct = (max-min)/median for that bar's close

Crypto routes BTC/ETH/SOL across Bybit + Hyperliquid + Deribit; equities
route to Alpaca for now (IBKR omitted from MVP because its bars endpoint
takes a relative period instead of start/end). Partial failures return a
warning envelope (failed_sources) instead of failing the whole request;
all sources failing → HTTP 502.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 21:41:18 +00:00
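The per-bar consensus arithmetic described in this commit message can be sketched self-contained (an illustration of the formulas only, not the repo's `merge_candles`):

```python
from statistics import median

def consensus_bar(closes: list[float]) -> dict:
    """Consensus math as in the commit message above:
    close = median across sources, sources = contributor count,
    div_pct = (max - min) / median of that bar's closes."""
    med = float(median(closes))
    return {
        "close": med,
        "sources": len(closes),
        "div_pct": (max(closes) - min(closes)) / med if med else 0.0,
    }
```

With closes `[100, 110, 105]` this yields a median close of 105 and `div_pct = 10/105 ≈ 0.095`.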
root c94312d79f feat(V2): shared Candle validator + uniform 'candles' response key
Introduce common/candles.py with a Pydantic Candle model enforcing OHLC
consistency (high ≥ max(open, close, low), low ≤ min(open, close, high)),
non-negative volume and a positive timestamp. validate_candles() coerces
upstream rows, sorts by timestamp
and raises HTTPException(502) on malformed data — surfacing upstream
data corruption as a retryable envelope instead of silently returning
nonsense.

Wired into all five exchange historical endpoints (Bybit, Hyperliquid,
Deribit, Alpaca, IBKR). BREAKING: Alpaca get_bars and IBKR get_bars now
return 'candles' (was 'bars') to align with the other exchanges.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 19:19:20 +00:00
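The two OHLC invariants this commit enforces reduce to a tiny predicate (a sketch of the rule, not the actual Pydantic validator):

```python
def ohlc_consistent(o: float, h: float, l: float, c: float) -> bool:
    # high must bound every other price; low must floor every other price
    return h >= max(o, c, l) and l <= min(o, c, h)
```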
root 110ca7f5cf docs(V2): update README for IBKR integration
Add IBKR to the exchange list, the endpoint table, the audit filter values,
and the "Tool disponibili" section. Bump the test count to 366 and move the
"IBKR Setup" section above "Licenza".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 08:54:54 +00:00
root a56baad3dd fix(V2): hoist _IBKRRotateConfirmReq to module level
Defining the Pydantic body model inside make_admin_router() leaves an
unresolved forward reference under `from __future__ import annotations`,
which breaks /openapi.json generation with PydanticUserError.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 08:45:20 +00:00
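The underlying mechanism generalizes beyond Pydantic: with `from __future__ import annotations` every annotation is stored as a string and resolved later against module globals, so a class defined inside a function can reference names the resolver cannot see. A stdlib analogue of the failure (names hypothetical; the repo's actual error was a PydanticUserError during /openapi.json generation):

```python
import typing

def make_router():
    class Payload:        # helper type local to the function
        pass
    class LocalReq:
        body: "Payload"   # what the __future__ import turns every annotation into
    return LocalReq

# typing.get_type_hints resolves the string against module globals, not the
# defining function's locals, so the lookup fails as an unresolved forward ref:
try:
    typing.get_type_hints(make_router())
except NameError as e:
    print("unresolved forward reference:", e)
```

Hoisting the class to module level, as this commit does, puts the referenced names into module globals where resolution succeeds.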
21 changed files with 809 additions and 68 deletions
+31 -9
@@ -9,8 +9,8 @@ on the bearer token provided by the client.
- **A single Docker image** (`cerbero-mcp`) hosts every exchange
router in a single FastAPI process
- **Four exchanges** (Deribit, Bybit, Hyperliquid, Alpaca) and **two
read-only data providers** (Macro, Sentiment)
- **Five exchanges** (Deribit, Bybit, Hyperliquid, Alpaca, IBKR) and **two
read-only data providers** (Macro, Sentiment)
- **Per-request testnet/mainnet switch** via the
`Authorization: Bearer <TOKEN>` header: the same container serves both
environments without restarts
@@ -19,7 +19,10 @@ on the bearer token provided by the client.
overridable via dedicated variables (`DERIBIT_URL_*`,
`BYBIT_URL_*`, `HYPERLIQUID_URL_*`, `ALPACA_URL_*`)
- **Interactive OpenAPI/Swagger documentation** exposed at `/apidocs`
- **Verified quality**: 310 tests (unit + integration + smoke), mypy
- **Unified cross-exchange endpoint** (`/mcp-cross/tools/get_historical`):
fan-out to every exchange that supports (symbol, asset_class), with
per-bar consensus (median OHLC + `div_pct` + `sources`)
- **Verified quality**: 399 tests (unit + integration + smoke), mypy
clean, ruff clean
## Quick start (development, without Docker)
@@ -88,8 +91,10 @@ is not required on the public endpoints (`/health`, `/apidocs`,
| `POST /mcp-bybit/tools/{tool}` | Bybit exchange tools |
| `POST /mcp-hyperliquid/tools/{tool}` | Hyperliquid exchange tools |
| `POST /mcp-alpaca/tools/{tool}` | Alpaca exchange tools |
| `POST /mcp-ibkr/tools/{tool}` | Interactive Brokers exchange tools |
| `POST /mcp-macro/tools/{tool}` | Macro/market data tools |
| `POST /mcp-sentiment/tools/{tool}` | Sentiment/news tools |
| `POST /mcp-cross/tools/get_historical` | Cross-exchange aggregated history with consensus + divergence |
| `GET /admin/audit` | Query of the JSONL audit log (bearer required, no X-Bot-Tag) |
## Observability
@@ -140,7 +145,7 @@ Query parameters (all optional):
- `from`, `to`: ISO 8601 datetimes (e.g. `2026-05-01` or `2026-05-01T12:34:56Z`)
- `actor`: `testnet` | `mainnet`
- `exchange`: exchange name (`deribit`, `bybit`, `hyperliquid`, `alpaca`)
- `exchange`: exchange name (`deribit`, `bybit`, `hyperliquid`, `alpaca`, `ibkr`)
- `action`: tool name (e.g. `place_order`)
- `bot_tag`: bot identifier
- `limit`: maximum number of records returned, default `1000`, max `10000`
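The filter parameters above compose into a plain query string; a stdlib sketch with illustrative values:

```python
from urllib.parse import urlencode

# Illustrative audit-log filters, matching the parameter list above
params = {
    "from": "2026-05-01",
    "to": "2026-05-10T00:00:00Z",
    "exchange": "ibkr",
    "action": "place_order",
    "limit": 100,
}
url = "/admin/audit?" + urlencode(params)
# urlencode percent-escapes the colons in the ISO timestamp
```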
@@ -186,6 +191,13 @@ rate, basis spot/perp, place_order, set_stop_loss, set_take_profit.
Account, positions, bars, snapshot, option chain, place_order,
amend_order, cancel_order, close_position.
### IBKR (Interactive Brokers)
Account, positions, activities, ticker, bars, snapshot, option chain,
search_contracts, clock, streaming (tick + depth via a WebSocket
singleton), place_order, amend_order, cancel_order, close_position,
bracket/OCO/OTO orders. Auth via OAuth 1.0a Self-Service with unattended
session-token minting (see the "IBKR Setup" section below).
### Macro
Treasury yields, FRED indicators, equity futures, asset prices, calendar,
get_yield_curve_slope, get_breakeven_inflation, get_cot_tff,
@@ -196,6 +208,16 @@ News (CryptoPanic/CoinDesk), social (LunarCrush), funding multi-exchange,
OI history, get_funding_arb_spread, get_liquidation_heatmap,
get_cointegration_pairs.
### Cross (unified history)
`get_historical` aggregates candles for the same symbol from every
exchange that supports it and returns a consensus series: the close is
the median, `sources` is the number of exchanges that contributed to the
bar, and `div_pct = (max-min)/median` flags disagreement between sources,
a quality gate for bots. Crypto: BTC/ETH/SOL via Bybit + Hyperliquid +
Deribit. Stocks: AAPL/SPY/QQQ/TSLA/NVDA via Alpaca. On partial failure it
returns the available data plus `failed_sources`; if *all* upstreams
fail → HTTP 502, retryable.
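Bot-side, the consensus fields double as a filter; a minimal sketch over the response shape described above (sample payload hand-written here, and the 1% threshold is an arbitrary choice for illustration):

```python
response = {  # hand-written sample in the documented response shape
    "candles": [
        {"timestamp": 1, "close": 105.0, "sources": 3, "div_pct": 0.0},
        {"timestamp": 2, "close": 231.0, "sources": 2, "div_pct": 0.034},
    ],
    "failed_sources": [],
}

MAX_DIV_PCT = 0.01  # arbitrary: drop bars where sources disagree by > 1%
clean = [
    c for c in response["candles"]
    if c["div_pct"] <= MAX_DIV_PCT and c["sources"] >= 2
]
```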
## Deploying on a VPS with Traefik
On the VPS, the public network layer (TLS, IP allowlist, rate limiting) is
handled by
@@ -280,7 +302,7 @@ PORT=9000 TESTNET_TOKEN="$TESTNET_TOKEN" bash tests/smoke/run.sh
```bash
uv sync
uv run pytest # full suite (310 tests expected)
uv run pytest # full suite (399 tests expected)
uv run pytest tests/unit -v # unit only
uv run pytest tests/integration -v
uv run ruff check src/ tests/
@@ -361,10 +383,6 @@ pybit (workaround documented in the client). For Alpaca the override is
applied to the trading endpoint only: the data endpoints
(`data.alpaca.markets`) keep the SDK defaults.
## License
Private.
## IBKR Setup
IBKR uses OAuth 1.0a Self-Service for fully unattended runtime auth. Setup is
@@ -427,3 +445,7 @@ curl -X POST "https://cerbero-mcp.<dom>/admin/ibkr/rotate-keys/confirm?env=testn
-H "Authorization: Bearer <ADMIN_TOKEN>" -H "Content-Type: application/json" \
-d '{"new_consumer_key":"...","new_access_token":"...","new_access_token_secret":"..."}'
```
## License
Private.
+2
@@ -23,6 +23,7 @@ from cerbero_mcp.exchanges import build_client
from cerbero_mcp.routers import (
alpaca,
bybit,
cross,
deribit,
hyperliquid,
ibkr,
@@ -71,6 +72,7 @@ def _make_app(settings: Settings) -> FastAPI:
app.include_router(ibkr.make_router())
app.include_router(macro.make_router())
app.include_router(sentiment.make_router())
app.include_router(cross.make_router())
app.include_router(admin.make_admin_router())
return app
+6 -5
@@ -16,6 +16,12 @@ MAX_RECORDS = 10000
DEFAULT_LIMIT = 1000
class _IBKRRotateConfirmReq(BaseModel):
new_consumer_key: str
new_access_token: str
new_access_token_secret: str
def _parse_iso(value: str | None) -> datetime | None:
if not value:
return None
@@ -158,11 +164,6 @@ def make_admin_router() -> APIRouter:
},
}
class _IBKRRotateConfirmReq(BaseModel):
new_consumer_key: str
new_access_token: str
new_access_token_secret: str
@r.post("/ibkr/rotate-keys/start")
async def _ibkr_rotate_start(env: str, request: Request):
if env not in ("testnet", "mainnet"):
+53
@@ -0,0 +1,53 @@
"""Shared OHLCV candle model + validator for exchange historical endpoints."""
from __future__ import annotations
from typing import Any
from fastapi import HTTPException
from pydantic import BaseModel, ConfigDict, ValidationError, model_validator
class Candle(BaseModel):
model_config = ConfigDict(extra="ignore")
timestamp: int
open: float
high: float
low: float
close: float
volume: float
@model_validator(mode="after")
def _check(self) -> Candle:
if self.timestamp <= 0:
raise ValueError(f"timestamp must be > 0, got {self.timestamp}")
if self.volume < 0:
raise ValueError(f"volume must be >= 0, got {self.volume}")
if self.high < max(self.open, self.close, self.low):
raise ValueError(
f"high {self.high} < max(open={self.open}, "
f"close={self.close}, low={self.low})"
)
if self.low > min(self.open, self.close, self.high):
raise ValueError(
f"low {self.low} > min(open={self.open}, "
f"close={self.close}, high={self.high})"
)
return self
def validate_candles(raw: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Coerce upstream rows into validated candle dicts, sorted by timestamp.
Raises HTTPException(502) if any row violates OHLC consistency or schema —
upstream data corruption is mapped to a retryable error envelope.
"""
try:
candles = [Candle.model_validate(row) for row in raw]
except ValidationError as e:
raise HTTPException(
status_code=502,
detail=f"upstream returned malformed candle: {e.errors()[0]['msg']}",
) from e
candles.sort(key=lambda c: c.timestamp)
return [c.model_dump() for c in candles]
+13 -4
@@ -20,6 +20,7 @@ from typing import Any
import httpx
from cerbero_mcp.common.candles import validate_candles
from cerbero_mcp.common.http import async_client
# ── Endpoint base ────────────────────────────────────────────────
@@ -301,9 +302,17 @@ class AlpacaClient:
bars_dict = (data or {}).get("bars") or {}
rows = bars_dict.get(symbol) or []
bars = [
def _iso_to_ms(ts: str | int | None) -> int | None:
if ts is None or isinstance(ts, int):
return ts
return int(_dt.datetime.fromisoformat(
ts.replace("Z", "+00:00")
).timestamp() * 1000)
candles = validate_candles([
{
"timestamp": b.get("t"),
"timestamp": _iso_to_ms(b.get("t")),
"open": b.get("o"),
"high": b.get("h"),
"low": b.get("l"),
@@ -311,12 +320,12 @@ class AlpacaClient:
"volume": b.get("v"),
}
for b in rows
]
])
return {
"symbol": symbol,
"asset_class": ac,
"interval": interval,
"bars": bars,
"candles": candles,
}
async def get_snapshot(self, symbol: str) -> dict:
+9 -9
@@ -22,6 +22,7 @@ import httpx
from cerbero_mcp.common import indicators as ind
from cerbero_mcp.common import microstructure as micro
from cerbero_mcp.common.candles import validate_candles
BASE_MAINNET = "https://api.bybit.com"
BASE_TESTNET = "https://api-testnet.bybit.com"
@@ -254,18 +255,17 @@ class BybitClient:
params["end"] = end
resp = await self._request_public("GET", "/v5/market/kline", params=params)
rows = (resp.get("result") or {}).get("list") or []
rows_sorted = sorted(rows, key=lambda r: int(r[0]))
candles = [
candles = validate_candles([
{
"timestamp": int(r[0]),
"open": float(r[1]),
"high": float(r[2]),
"low": float(r[3]),
"close": float(r[4]),
"volume": float(r[5]),
"open": r[1],
"high": r[2],
"low": r[3],
"close": r[4],
"volume": r[5],
}
for r in rows_sorted
]
for r in rows
])
return {"symbol": symbol, "candles": candles}
async def get_indicators(
+146
@@ -0,0 +1,146 @@
"""Cross-exchange historical aggregator.
Fan-out a canonical (symbol, asset_class, interval, start, end) request to
every active exchange that supports the pair, then merge the results into
a single consensus candle series with per-bar divergence metrics.
"""
from __future__ import annotations
import asyncio
import datetime as _dt
from typing import Any, Literal, Protocol
from fastapi import HTTPException
from cerbero_mcp.exchanges.cross.consensus import merge_candles
from cerbero_mcp.exchanges.cross.symbol_map import (
get_sources,
supported_intervals,
to_native_interval,
to_native_symbol,
)
Environment = Literal["testnet", "mainnet"]
class _Registry(Protocol):
async def get(self, exchange: str, env: Environment) -> Any: ...
def _iso_to_ms(s: str) -> int:
return int(_dt.datetime.fromisoformat(
s.replace("Z", "+00:00")
).timestamp() * 1000)
async def _call_bybit(client: Any, sym: str, interval: str,
start: str, end: str) -> dict[str, Any]:
resp: dict[str, Any] = await client.get_historical(
symbol=sym, category="linear", interval=interval,
start=_iso_to_ms(start), end=_iso_to_ms(end),
)
return resp
async def _call_hyperliquid(client: Any, sym: str, interval: str,
start: str, end: str) -> dict[str, Any]:
resp: dict[str, Any] = await client.get_historical(
instrument=sym, start_date=start, end_date=end, resolution=interval,
)
return resp
async def _call_deribit(client: Any, sym: str, interval: str,
start: str, end: str) -> dict[str, Any]:
resp: dict[str, Any] = await client.get_historical(
instrument=sym, start_date=start, end_date=end, resolution=interval,
)
return resp
async def _call_alpaca(client: Any, sym: str, interval: str,
start: str, end: str) -> dict[str, Any]:
resp: dict[str, Any] = await client.get_bars(
symbol=sym, asset_class="stocks", interval=interval,
start=start, end=end,
)
return resp
_DISPATCH = {
"bybit": _call_bybit,
"hyperliquid": _call_hyperliquid,
"deribit": _call_deribit,
"alpaca": _call_alpaca,
}
class CrossClient:
def __init__(self, registry: _Registry, *, env: Environment):
self._registry = registry
self._env = env
async def _fetch_one(
self, exchange: str, native_sym: str, native_interval: str,
start: str, end: str,
) -> tuple[str, list[dict[str, Any]] | Exception]:
try:
client = await self._registry.get(exchange, self._env)
resp = await _DISPATCH[exchange](
client, native_sym, native_interval, start, end,
)
return exchange, resp.get("candles", [])
except Exception as e: # noqa: BLE001
return exchange, e
async def get_historical(
self, *, symbol: str, asset_class: str, interval: str,
start_date: str, end_date: str,
) -> dict[str, Any]:
sources = get_sources(asset_class, symbol)
if not sources:
raise HTTPException(
status_code=400,
detail=f"unsupported symbol/asset_class: {symbol} ({asset_class})",
)
if interval not in supported_intervals():
raise HTTPException(
status_code=400,
detail=f"unsupported interval: {interval}; "
f"supported: {supported_intervals()}",
)
tasks = [
self._fetch_one(
ex,
to_native_symbol(asset_class, symbol, ex),
to_native_interval(interval, ex),
start_date, end_date,
)
for ex in sources
]
results = await asyncio.gather(*tasks)
by_source: dict[str, list[dict[str, Any]]] = {}
failed: list[dict[str, str]] = []
for ex, payload in results:
if isinstance(payload, Exception):
failed.append({"exchange": ex, "error": f"{type(payload).__name__}: {payload}"})
else:
by_source[ex] = payload
if not by_source:
raise HTTPException(
status_code=502,
detail={"error": "all sources failed", "failed_sources": failed},
)
return {
"symbol": symbol.upper(),
"asset_class": asset_class,
"interval": interval,
"candles": merge_candles(by_source),
"sources_used": sorted(by_source.keys()),
"failed_sources": failed,
}
@@ -0,0 +1,37 @@
"""Pure consensus aggregation: merge per-source OHLCV candles by timestamp.
The output is a single time-series with the median OHLC across sources,
mean volume, the contributing source count, and a divergence % computed
on the close range. div_pct gives a quick quality signal: 0 means full
agreement, > X% means at least one source is suspect.
"""
from __future__ import annotations
from collections import defaultdict
from statistics import median
from typing import Any
def merge_candles(by_source: dict[str, list[dict[str, Any]]]) -> list[dict[str, Any]]:
grouped: dict[int, list[dict[str, Any]]] = defaultdict(list)
for candles in by_source.values():
for c in candles:
grouped[int(c["timestamp"])].append(c)
out: list[dict[str, Any]] = []
for ts in sorted(grouped):
rows = grouped[ts]
closes = [float(r["close"]) for r in rows]
med_close = float(median(closes))
div_pct = (max(closes) - min(closes)) / med_close if med_close else 0.0
out.append({
"timestamp": ts,
"open": float(median(float(r["open"]) for r in rows)),
"high": float(median(float(r["high"]) for r in rows)),
"low": float(median(float(r["low"]) for r in rows)),
"close": med_close,
"volume": sum(float(r["volume"]) for r in rows) / len(rows),
"sources": len(rows),
"div_pct": div_pct,
})
return out
@@ -0,0 +1,60 @@
"""Routing table: canonical (asset_class, symbol, interval) → per-exchange native.
Crypto canonical symbols default to USD/USDT-quoted perpetuals on the most
liquid pair available. Equities currently route to Alpaca only — IBKR is
omitted from the cross MVP because its bars endpoint takes a relative
period instead of (start, end).
"""
from __future__ import annotations
AssetClass = str
_CRYPTO_SYMBOLS: dict[str, dict[str, str]] = {
"BTC": {"bybit": "BTCUSDT", "hyperliquid": "BTC", "deribit": "BTC-PERPETUAL"},
"ETH": {"bybit": "ETHUSDT", "hyperliquid": "ETH", "deribit": "ETH-PERPETUAL"},
"SOL": {"bybit": "SOLUSDT", "hyperliquid": "SOL"},
}
_STOCK_SYMBOLS: dict[str, dict[str, str]] = {
"AAPL": {"alpaca": "AAPL"},
"SPY": {"alpaca": "SPY"},
"QQQ": {"alpaca": "QQQ"},
"TSLA": {"alpaca": "TSLA"},
"NVDA": {"alpaca": "NVDA"},
}
_SYMBOLS: dict[AssetClass, dict[str, dict[str, str]]] = {
"crypto": _CRYPTO_SYMBOLS,
"stocks": _STOCK_SYMBOLS,
}
_INTERVALS: dict[str, dict[str, str]] = {
"1m": {"bybit": "1", "hyperliquid": "1m", "deribit": "1m", "alpaca": "1m"},
"5m": {"bybit": "5", "hyperliquid": "5m", "deribit": "5m", "alpaca": "5m"},
"15m": {"bybit": "15", "hyperliquid": "15m", "deribit": "15m", "alpaca": "15m"},
"1h": {"bybit": "60", "hyperliquid": "1h", "deribit": "1h", "alpaca": "1h"},
"4h": {"bybit": "240", "hyperliquid": "4h", "deribit": "4h", "alpaca": "4h"},
"1d": {"bybit": "D", "hyperliquid": "1d", "deribit": "1d", "alpaca": "1d"},
}
def get_sources(asset_class: AssetClass, symbol: str) -> list[str]:
table = _SYMBOLS.get(asset_class, {})
mapping = table.get(symbol.upper())
if mapping is None:
return []
return list(mapping.keys())
def to_native_symbol(
asset_class: AssetClass, symbol: str, exchange: str
) -> str:
return _SYMBOLS[asset_class][symbol.upper()][exchange]
def to_native_interval(interval: str, exchange: str) -> str:
return _INTERVALS[interval][exchange]
def supported_intervals() -> list[str]:
return list(_INTERVALS.keys())
+28
@@ -0,0 +1,28 @@
"""Pydantic schemas + thin tool wrappers for the /mcp-cross router."""
from __future__ import annotations
from typing import Literal
from pydantic import BaseModel
from cerbero_mcp.exchanges.cross.client import CrossClient
AssetClass = Literal["crypto", "stocks"]
class GetHistoricalReq(BaseModel):
symbol: str
asset_class: AssetClass = "crypto"
interval: str = "1h"
start_date: str
end_date: str
async def get_historical(client: CrossClient, params: GetHistoricalReq) -> dict:
return await client.get_historical(
symbol=params.symbol,
asset_class=params.asset_class,
interval=params.interval,
start_date=params.start_date,
end_date=params.end_date,
)
+18 -16
@@ -11,10 +11,11 @@ from fastapi import HTTPException
from cerbero_mcp.common import indicators as ind
from cerbero_mcp.common import microstructure as micro
from cerbero_mcp.common import options as opt
from cerbero_mcp.common.candles import validate_candles
from cerbero_mcp.common.http import async_client
def _parse_deribit_response(resp) -> dict:
def _parse_deribit_response(resp: Any) -> dict[str, Any]:
"""Map Deribit upstream errors to a clean HTTP 502 (retryable) instead of
leaking JSONDecodeError when the body is HTML (e.g. Cloudflare 5xx page)."""
if resp.status_code >= 500:
@@ -23,7 +24,8 @@ def _parse_deribit_response(resp) -> dict:
detail=f"Deribit upstream HTTP {resp.status_code}",
)
try:
return resp.json()
data: dict[str, Any] = resp.json()
return data
except json.JSONDecodeError as e:
raise HTTPException(
status_code=502,
@@ -121,10 +123,10 @@ class DeribitClient:
resp = await http.get(url, params=request_params, headers=headers)
data = _parse_deribit_response(resp)
if "result" in data:
return data # type: ignore[no-any-return]
return data
return {"result": None, "error": error_msg}
return data # type: ignore[no-any-return]
return data
# ── Read tools ───────────────────────────────────────────────
@@ -418,24 +420,24 @@ class DeribitClient:
},
)
r = raw.get("result") or {}
candles = []
ticks = r.get("ticks", []) or []
opens = r.get("open", []) or []
highs = r.get("high", []) or []
lows = r.get("low", []) or []
closes = r.get("close", []) or []
volumes = r.get("volume", []) or []
for idx, ts in enumerate(ticks):
if idx >= min(len(opens), len(highs), len(lows), len(closes), len(volumes)):
break
candles.append({
"timestamp": ts,
"open": opens[idx],
"high": highs[idx],
"low": lows[idx],
"close": closes[idx],
"volume": volumes[idx],
})
n = min(len(ticks), len(opens), len(highs), len(lows), len(closes), len(volumes))
candles = validate_candles([
{
"timestamp": ticks[i],
"open": opens[i],
"high": highs[i],
"low": lows[i],
"close": closes[i],
"volume": volumes[i],
}
for i in range(n)
])
return {"candles": candles}
async def get_dvol(
+10 -10
@@ -27,6 +27,7 @@ from eth_account.messages import encode_typed_data
from eth_utils import keccak, to_hex
from cerbero_mcp.common import indicators as ind
from cerbero_mcp.common.candles import validate_candles
from cerbero_mcp.common.http import async_client
BASE_LIVE = "https://api.hyperliquid.xyz"
@@ -408,18 +409,17 @@ class HyperliquidClient:
},
}
)
candles = []
for c in data:
candles.append(
candles = validate_candles([
{
"timestamp": c.get("t", 0),
"open": float(c.get("o", 0)),
"high": float(c.get("h", 0)),
"low": float(c.get("l", 0)),
"close": float(c.get("c", 0)),
"volume": float(c.get("v", 0)),
"timestamp": c.get("t"),
"open": c.get("o"),
"high": c.get("h"),
"low": c.get("l"),
"close": c.get("c"),
"volume": c.get("v"),
}
)
for c in data
])
return {"candles": candles}
async def get_open_orders(self) -> list[dict[str, Any]]:
+8 -6
@@ -9,6 +9,7 @@ from typing import Any
import httpx
from cerbero_mcp.common.candles import validate_candles
from cerbero_mcp.common.http import async_client
from cerbero_mcp.exchanges.ibkr.oauth import (
IBKRAuthError,
@@ -234,11 +235,7 @@ class IBKRClient:
params={"conid": str(conid), "period": period, "bar": bar},
)
rows = (data or {}).get("data") or []
return {
"symbol": symbol,
"asset_class": asset_class,
"interval": bar,
"bars": [
candles = validate_candles([
{
"timestamp": r.get("t"),
"open": r.get("o"),
@@ -248,7 +245,12 @@ class IBKRClient:
"volume": r.get("v"),
}
for r in rows
],
])
return {
"symbol": symbol,
"asset_class": asset_class,
"interval": bar,
"candles": candles,
}
async def get_option_chain(
+36
@@ -0,0 +1,36 @@
"""Router /mcp-cross/* — historical data with cross-exchange consensus."""
from __future__ import annotations
from typing import Literal, cast
from fastapi import APIRouter, Depends, Request
from cerbero_mcp.client_registry import ClientRegistry
from cerbero_mcp.exchanges.cross import tools as t
from cerbero_mcp.exchanges.cross.client import CrossClient
Environment = Literal["testnet", "mainnet"]
def get_environment(request: Request) -> Environment:
return cast(Environment, request.state.environment)
def get_cross_client(
request: Request, env: Environment = Depends(get_environment),
) -> CrossClient:
registry: ClientRegistry = request.app.state.registry
return CrossClient(registry, env=env)
def make_router() -> APIRouter:
r = APIRouter(prefix="/mcp-cross", tags=["cross"])
@r.post("/tools/get_historical")
async def _get_historical(
params: t.GetHistoricalReq,
client: CrossClient = Depends(get_cross_client),
):
return await t.get_historical(client, params)
return r
+72
@@ -0,0 +1,72 @@
from __future__ import annotations
import pytest
from cerbero_mcp.common.candles import Candle, validate_candles
from fastapi import HTTPException
def test_valid_candle():
c = Candle(timestamp=1_700_000_000_000, open=100.0, high=110.0,
low=95.0, close=105.0, volume=12.5)
assert c.high == 110.0
def test_high_below_close_rejected():
with pytest.raises(ValueError):
Candle(timestamp=1, open=100, high=90, low=80, close=95, volume=1)
def test_high_below_open_rejected():
with pytest.raises(ValueError):
Candle(timestamp=1, open=100, high=90, low=80, close=85, volume=1)
def test_low_above_close_rejected():
with pytest.raises(ValueError):
Candle(timestamp=1, open=100, high=110, low=105, close=102, volume=1)
def test_low_above_open_rejected():
with pytest.raises(ValueError):
Candle(timestamp=1, open=95, high=110, low=100, close=105, volume=1)
def test_negative_volume_rejected():
with pytest.raises(ValueError):
Candle(timestamp=1, open=100, high=110, low=90, close=105, volume=-1)
def test_non_positive_timestamp_rejected():
with pytest.raises(ValueError):
Candle(timestamp=0, open=100, high=110, low=90, close=105, volume=1)
def test_validate_candles_sorts_by_timestamp():
raw = [
{"timestamp": 3, "open": 1, "high": 2, "low": 1, "close": 1, "volume": 0},
{"timestamp": 1, "open": 1, "high": 2, "low": 1, "close": 1, "volume": 0},
{"timestamp": 2, "open": 1, "high": 2, "low": 1, "close": 1, "volume": 0},
]
out = validate_candles(raw)
assert [c["timestamp"] for c in out] == [1, 2, 3]
def test_validate_candles_coerces_string_numerics():
raw = [{"timestamp": "1", "open": "100", "high": "110",
"low": "90", "close": "105", "volume": "10"}]
out = validate_candles(raw)
assert out[0]["open"] == 100.0
assert isinstance(out[0]["volume"], float)
def test_validate_candles_malformed_raises_http_502():
raw = [{"timestamp": 1, "open": 100, "high": 50, "low": 90,
"close": 105, "volume": 1}]
with pytest.raises(HTTPException) as exc_info:
validate_candles(raw)
assert exc_info.value.status_code == 502
assert "candle" in str(exc_info.value.detail).lower()
def test_validate_candles_empty_list():
assert validate_candles([]) == []
+2 -2
@@ -271,8 +271,8 @@ async def test_get_bars_stocks(httpx_mock: HTTPXMock, client: AlpacaClient):
)
assert result["symbol"] == "AAPL"
assert result["interval"] == "1d"
assert len(result["bars"]) == 1
assert result["bars"][0]["close"] == 175.0
assert len(result["candles"]) == 1
assert result["candles"][0]["close"] == 175.0
@pytest.mark.asyncio
+134
@@ -0,0 +1,134 @@
from __future__ import annotations
from typing import Any
import pytest
from cerbero_mcp.exchanges.cross.client import CrossClient
from fastapi import HTTPException
class _Fake:
def __init__(self, candles: list[dict[str, Any]] | None = None,
*, raises: Exception | None = None):
self._candles = candles or []
self._raises = raises
self.calls: list[dict[str, Any]] = []
async def get_historical(self, **kwargs: Any) -> dict[str, Any]:
if self._raises:
raise self._raises
self.calls.append(kwargs)
return {"candles": list(self._candles)}
async def get_bars(self, **kwargs: Any) -> dict[str, Any]:
if self._raises:
raise self._raises
self.calls.append(kwargs)
return {"candles": list(self._candles)}
class _FakeRegistry:
def __init__(self, clients: dict[str, _Fake]):
self._clients = clients
async def get(self, exchange: str, env: str) -> _Fake:
if exchange not in self._clients:
raise KeyError(exchange)
return self._clients[exchange]
def _c(ts: int, close: float = 100.0) -> dict[str, Any]:
return {"timestamp": ts, "open": close, "high": close, "low": close,
"close": close, "volume": 1.0}
@pytest.mark.asyncio
async def test_crypto_three_sources_aggregates():
fakes = {
"bybit": _Fake([_c(1, 100), _c(2, 200)]),
"hyperliquid": _Fake([_c(1, 100), _c(2, 200)]),
"deribit": _Fake([_c(1, 100), _c(2, 200)]),
}
cc = CrossClient(_FakeRegistry(fakes), env="mainnet")
out = await cc.get_historical(
symbol="BTC", asset_class="crypto", interval="1h",
start_date="2026-05-09T00:00:00", end_date="2026-05-10T00:00:00",
)
assert out["symbol"] == "BTC"
assert out["asset_class"] == "crypto"
assert len(out["candles"]) == 2
assert out["candles"][0]["sources"] == 3
assert out["candles"][0]["div_pct"] == 0.0
assert set(out["sources_used"]) == {"bybit", "hyperliquid", "deribit"}
assert out["failed_sources"] == []
@pytest.mark.asyncio
async def test_crypto_partial_failure_returns_partial_with_warning():
fakes = {
"bybit": _Fake([_c(1, 100)]),
"hyperliquid": _Fake([_c(1, 100)]),
"deribit": _Fake(raises=RuntimeError("upstream down")),
}
cc = CrossClient(_FakeRegistry(fakes), env="mainnet")
out = await cc.get_historical(
symbol="BTC", asset_class="crypto", interval="1h",
start_date="2026-05-09T00:00:00", end_date="2026-05-10T00:00:00",
)
assert out["candles"][0]["sources"] == 2
assert set(out["sources_used"]) == {"bybit", "hyperliquid"}
assert len(out["failed_sources"]) == 1
assert out["failed_sources"][0]["exchange"] == "deribit"
assert "upstream down" in out["failed_sources"][0]["error"]
@pytest.mark.asyncio
async def test_all_sources_fail_raises_502():
fakes = {
"bybit": _Fake(raises=RuntimeError("a")),
"hyperliquid": _Fake(raises=RuntimeError("b")),
"deribit": _Fake(raises=RuntimeError("c")),
}
cc = CrossClient(_FakeRegistry(fakes), env="mainnet")
with pytest.raises(HTTPException) as exc_info:
await cc.get_historical(
symbol="BTC", asset_class="crypto", interval="1h",
start_date="2026-05-09T00:00:00", end_date="2026-05-10T00:00:00",
)
assert exc_info.value.status_code == 502
@pytest.mark.asyncio
async def test_unsupported_symbol_raises_400():
cc = CrossClient(_FakeRegistry({}), env="mainnet")
with pytest.raises(HTTPException) as exc_info:
await cc.get_historical(
symbol="NONEXISTENT", asset_class="crypto", interval="1h",
start_date="2026-05-09T00:00:00", end_date="2026-05-10T00:00:00",
)
assert exc_info.value.status_code == 400
@pytest.mark.asyncio
async def test_stocks_routes_to_alpaca_only():
fake = _Fake([_c(1, 175.0)])
cc = CrossClient(_FakeRegistry({"alpaca": fake}), env="mainnet")
out = await cc.get_historical(
symbol="AAPL", asset_class="stocks", interval="1d",
start_date="2026-04-09T00:00:00", end_date="2026-04-10T00:00:00",
)
assert out["sources_used"] == ["alpaca"]
assert out["candles"][0]["close"] == 175.0
# Alpaca was called with native symbol
assert fake.calls[0]["symbol"] == "AAPL"
@pytest.mark.asyncio
async def test_unsupported_interval_raises_400():
cc = CrossClient(_FakeRegistry({}), env="mainnet")
with pytest.raises(HTTPException) as exc_info:
await cc.get_historical(
symbol="BTC", asset_class="crypto", interval="3h",
start_date="2026-05-09T00:00:00", end_date="2026-05-10T00:00:00",
)
assert exc_info.value.status_code == 400
@@ -0,0 +1,90 @@
from __future__ import annotations
from cerbero_mcp.exchanges.cross.consensus import merge_candles
def _c(ts, o, h, l, c, v):
return {"timestamp": ts, "open": o, "high": h, "low": l, "close": c, "volume": v}
def test_empty_input():
assert merge_candles({}) == []
def test_single_source_passthrough():
out = merge_candles({"bybit": [_c(1, 100, 110, 90, 105, 5)]})
assert len(out) == 1
assert out[0]["timestamp"] == 1
assert out[0]["close"] == 105
assert out[0]["sources"] == 1
assert out[0]["div_pct"] == 0.0
def test_three_sources_identical_no_divergence():
src = {
"bybit": [_c(1, 100, 110, 90, 105, 5)],
"hyperliquid": [_c(1, 100, 110, 90, 105, 3)],
"deribit": [_c(1, 100, 110, 90, 105, 7)],
}
out = merge_candles(src)
assert len(out) == 1
assert out[0]["close"] == 105.0
assert out[0]["sources"] == 3
assert out[0]["div_pct"] == 0.0
# volume is mean across sources
assert abs(out[0]["volume"] - 5.0) < 1e-9
def test_three_sources_divergent_close():
src = {
"bybit": [_c(1, 100, 110, 90, 100, 1)],
"hyperliquid": [_c(1, 100, 110, 90, 110, 1)],
"deribit": [_c(1, 100, 110, 90, 105, 1)],
}
out = merge_candles(src)
# median of [100, 110, 105] = 105
assert out[0]["close"] == 105.0
# div_pct = (110 - 100) / 105 ≈ 0.0952
assert abs(out[0]["div_pct"] - 10 / 105) < 1e-6
assert out[0]["sources"] == 3
def test_misaligned_timestamps():
src = {
"bybit": [_c(1, 100, 110, 90, 105, 1), _c(2, 100, 110, 90, 105, 1)],
"hyperliquid": [_c(2, 100, 110, 90, 105, 1), _c(3, 100, 110, 90, 105, 1)],
}
out = merge_candles(src)
timestamps = [c["timestamp"] for c in out]
sources_by_ts = {c["timestamp"]: c["sources"] for c in out}
assert timestamps == [1, 2, 3]
assert sources_by_ts == {1: 1, 2: 2, 3: 1}
def test_two_sources_even_median():
src = {
"bybit": [_c(1, 100, 110, 90, 100, 1)],
"hyperliquid": [_c(1, 100, 110, 90, 110, 1)],
}
out = merge_candles(src)
# even median = mean of two = 105
assert out[0]["close"] == 105.0
def test_empty_source_ignored():
src = {
"bybit": [_c(1, 100, 110, 90, 105, 1)],
"hyperliquid": [],
}
out = merge_candles(src)
assert len(out) == 1
assert out[0]["sources"] == 1
def test_output_sorted_by_timestamp():
src = {
"bybit": [_c(3, 100, 110, 90, 105, 1), _c(1, 100, 110, 90, 105, 1),
_c(2, 100, 110, 90, 105, 1)],
}
out = merge_candles(src)
assert [c["timestamp"] for c in out] == [1, 2, 3]
@@ -0,0 +1,47 @@
from __future__ import annotations
import pytest
from cerbero_mcp.exchanges.cross.symbol_map import (
get_sources,
to_native_interval,
to_native_symbol,
)
def test_btc_crypto_sources():
assert set(get_sources("crypto", "BTC")) == {"bybit", "hyperliquid", "deribit"}
def test_eth_crypto_sources():
assert set(get_sources("crypto", "ETH")) == {"bybit", "hyperliquid", "deribit"}
def test_unknown_crypto_symbol_returns_empty():
assert get_sources("crypto", "DOGEFAKE") == []
def test_stocks_aapl_sources():
assert set(get_sources("stocks", "AAPL")) == {"alpaca"}
def test_native_symbol_btc():
assert to_native_symbol("crypto", "BTC", "bybit") == "BTCUSDT"
assert to_native_symbol("crypto", "BTC", "hyperliquid") == "BTC"
assert to_native_symbol("crypto", "BTC", "deribit") == "BTC-PERPETUAL"
def test_native_symbol_unsupported_pair_raises():
with pytest.raises(KeyError):
to_native_symbol("crypto", "BTC", "alpaca")
def test_native_interval_1h():
assert to_native_interval("1h", "bybit") == "60"
assert to_native_interval("1h", "hyperliquid") == "1h"
assert to_native_interval("1h", "deribit") == "1h"
assert to_native_interval("1h", "alpaca") == "1h"
def test_native_interval_unknown_canonical_raises():
with pytest.raises(KeyError):
to_native_interval("3h", "bybit")