from __future__ import annotations import asyncio import json from unittest.mock import AsyncMock, MagicMock import pytest from cerbero_mcp.exchanges.ibkr.ws import IBKRWebSocket, WSError class FakeWS: """Bidirectional async fake for WSS messages.""" def __init__(self) -> None: self.sent: list[str] = [] self._inbox: asyncio.Queue[str] = asyncio.Queue() self.closed = False async def send(self, msg: str) -> None: self.sent.append(msg) async def recv(self) -> str: return await self._inbox.get() async def close(self) -> None: self.closed = True async def push(self, payload: dict) -> None: await self._inbox.put(json.dumps(payload)) @pytest.fixture def fake_signer(): s = MagicMock() s.get_live_session_token = AsyncMock(return_value="LST==") return s @pytest.mark.asyncio async def test_subscribe_tick_caches_snapshot(fake_signer, monkeypatch): fake_ws = FakeWS() async def fake_connect(url, **kw): return fake_ws monkeypatch.setattr("cerbero_mcp.exchanges.ibkr.ws.websockets_connect", fake_connect) ws = IBKRWebSocket( signer=fake_signer, ws_url="wss://api.ibkr.com/v1/api/ws", base_url="https://api.ibkr.com/v1/api", max_subs=80, idle_timeout_s=300, ) await ws.start() await ws.subscribe_tick(265598) await fake_ws.push({ "topic": "smd+265598", "31": "150.5", "84": "150.4", "86": "150.6", "7295": "100", "7296": "200", }) await asyncio.sleep(0.05) snap = ws.get_tick_snapshot(265598) assert snap is not None assert snap["last_price"] == 150.5 assert snap["bid"] == 150.4 await ws.stop() @pytest.mark.asyncio async def test_subscribe_limit(fake_signer, monkeypatch): fake_ws = FakeWS() async def fake_connect(url, **kw): return fake_ws monkeypatch.setattr("cerbero_mcp.exchanges.ibkr.ws.websockets_connect", fake_connect) ws = IBKRWebSocket( signer=fake_signer, ws_url="wss://x", base_url="https://x", max_subs=2, idle_timeout_s=300, ) await ws.start() await ws.subscribe_tick(1) await ws.subscribe_tick(2) with pytest.raises(WSError, match="IBKR_WS_SUB_LIMIT"): await ws.subscribe_tick(3) await ws.stop() @pytest.mark.asyncio async def test_subscribe_before_start_raises(fake_signer): ws = IBKRWebSocket( signer=fake_signer, ws_url="wss://x", base_url="https://x", max_subs=10, idle_timeout_s=300, ) with pytest.raises(WSError, match="IBKR_WS_NOT_STARTED"): await ws.subscribe_tick(1) @pytest.mark.asyncio async def test_start_after_stop_resumes_reader(fake_signer, monkeypatch): fake_ws_a = FakeWS() fake_ws_b = FakeWS() fakes = iter([fake_ws_a, fake_ws_b]) async def fake_connect(url, **kw): return next(fakes) monkeypatch.setattr("cerbero_mcp.exchanges.ibkr.ws.websockets_connect", fake_connect) ws = IBKRWebSocket( signer=fake_signer, ws_url="wss://x", base_url="https://x", max_subs=10, idle_timeout_s=300, ) await ws.start() await ws.stop() # Restart with fresh fake_ws_b await ws.start() await ws.subscribe_tick(42) await fake_ws_b.push({"topic": "smd+42", "31": "100"}) await asyncio.sleep(0.05) assert ws.get_tick_snapshot(42) is not None await ws.stop()