Files
Cerbero-mcp/tests/unit/exchanges/cross/test_symbol_map.py
T
root 0ba5a05219 feat(V2): /mcp-cross/tools/get_historical with cross-exchange consensus
Add a unified historical endpoint that fans out to every exchange
supporting the requested (asset_class, symbol) pair, then merges the
results into a single consensus candle series with per-bar divergence
metrics:
  - candles[i].close = median across sources
  - candles[i].sources = count of contributing exchanges
  - candles[i].div_pct = (max-min)/median for that bar's close

Crypto routes BTC/ETH/SOL across Bybit + Hyperliquid + Deribit; equities
route to Alpaca for now (IBKR omitted from MVP because its bars endpoint
takes a relative period instead of start/end). Partial failures return a
warning envelope (failed_sources) instead of failing the whole request;
all sources failing → HTTP 502.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 21:41:18 +00:00

48 lines
1.3 KiB
Python

from __future__ import annotations
import pytest
from cerbero_mcp.exchanges.cross.symbol_map import (
get_sources,
to_native_interval,
to_native_symbol,
)
def test_btc_crypto_sources():
    """Check that ("crypto", "BTC") resolves to bybit, hyperliquid and deribit.

    The result is compared as a set, so the order returned by
    ``get_sources`` does not matter.
    """
    expected_exchanges = {"bybit", "hyperliquid", "deribit"}
    actual_exchanges = set(get_sources("crypto", "BTC"))
    assert actual_exchanges == expected_exchanges
def test_eth_crypto_sources():
    """Check that ("crypto", "ETH") resolves to bybit, hyperliquid and deribit.

    The result is compared as a set, so the order returned by
    ``get_sources`` does not matter.
    """
    expected_exchanges = {"bybit", "hyperliquid", "deribit"}
    actual_exchanges = set(get_sources("crypto", "ETH"))
    assert actual_exchanges == expected_exchanges
def test_unknown_crypto_symbol_returns_empty():
    """Check that an unrecognised crypto symbol yields an empty list."""
    sources = get_sources("crypto", "DOGEFAKE")
    # Compared against a literal list on purpose: the test pins equality
    # with ``[]``, not mere falsiness.
    assert sources == []
def test_stocks_aapl_sources():
    """Check that ("stocks", "AAPL") resolves to alpaca only."""
    expected_exchanges = {"alpaca"}
    actual_exchanges = set(get_sources("stocks", "AAPL"))
    assert actual_exchanges == expected_exchanges
def test_native_symbol_btc():
    """Check the native symbol returned for crypto BTC on each exchange."""
    # Exchange name -> symbol string expected from to_native_symbol.
    expected_by_exchange = {
        "bybit": "BTCUSDT",
        "hyperliquid": "BTC",
        "deribit": "BTC-PERPETUAL",
    }
    for exchange, native_symbol in expected_by_exchange.items():
        assert to_native_symbol("crypto", "BTC", exchange) == native_symbol
def test_native_symbol_unsupported_pair_raises():
    """Check that to_native_symbol raises KeyError for crypto BTC on alpaca."""
    asset_class, symbol, exchange = "crypto", "BTC", "alpaca"
    with pytest.raises(KeyError):
        to_native_symbol(asset_class, symbol, exchange)
def test_native_interval_1h():
    """Check the native interval returned for canonical "1h" on each exchange."""
    # Exchange name -> interval string expected from to_native_interval.
    expected_by_exchange = {
        "bybit": "60",
        "hyperliquid": "1h",
        "deribit": "1h",
        "alpaca": "1h",
    }
    for exchange, native_interval in expected_by_exchange.items():
        assert to_native_interval("1h", exchange) == native_interval
def test_native_interval_unknown_canonical_raises():
    """Check that to_native_interval raises KeyError for "3h" on bybit."""
    canonical_interval, exchange = "3h", "bybit"
    with pytest.raises(KeyError):
        to_native_interval(canonical_interval, exchange)