6266708e15
Code review fixes (commit 17700d2):
- _stopped reset on start() (was stuck True after stop→start)
- _require_started guard on subscribe_*/unsubscribe (clear WSError vs AttributeError)
- _reader_loop logs disconnect via logger.warning + sets _ws=None for `connected` signal
- Class docstring documents stale-snapshot behavior + deferred reconnect
- New tests: subscribe-before-start, stop→start cycle resumption
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
125 lines
3.4 KiB
Python
125 lines
3.4 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
from cerbero_mcp.exchanges.ibkr.ws import IBKRWebSocket, WSError
|
|
|
|
|
|
class FakeWS:
|
|
"""Bidirectional async fake for WSS messages."""
|
|
def __init__(self) -> None:
|
|
self.sent: list[str] = []
|
|
self._inbox: asyncio.Queue[str] = asyncio.Queue()
|
|
self.closed = False
|
|
async def send(self, msg: str) -> None:
|
|
self.sent.append(msg)
|
|
async def recv(self) -> str:
|
|
return await self._inbox.get()
|
|
async def close(self) -> None:
|
|
self.closed = True
|
|
async def push(self, payload: dict) -> None:
|
|
await self._inbox.put(json.dumps(payload))
|
|
|
|
|
|
@pytest.fixture
|
|
def fake_signer():
|
|
s = MagicMock()
|
|
s.get_live_session_token = AsyncMock(return_value="LST==")
|
|
return s
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_subscribe_tick_caches_snapshot(fake_signer, monkeypatch):
|
|
fake_ws = FakeWS()
|
|
|
|
async def fake_connect(url, **kw):
|
|
return fake_ws
|
|
|
|
monkeypatch.setattr("cerbero_mcp.exchanges.ibkr.ws.websockets_connect", fake_connect)
|
|
|
|
ws = IBKRWebSocket(
|
|
signer=fake_signer,
|
|
ws_url="wss://api.ibkr.com/v1/api/ws",
|
|
base_url="https://api.ibkr.com/v1/api",
|
|
max_subs=80, idle_timeout_s=300,
|
|
)
|
|
await ws.start()
|
|
await ws.subscribe_tick(265598)
|
|
|
|
await fake_ws.push({
|
|
"topic": "smd+265598",
|
|
"31": "150.5", "84": "150.4", "86": "150.6",
|
|
"7295": "100", "7296": "200",
|
|
})
|
|
await asyncio.sleep(0.05)
|
|
|
|
snap = ws.get_tick_snapshot(265598)
|
|
assert snap is not None
|
|
assert snap["last_price"] == 150.5
|
|
assert snap["bid"] == 150.4
|
|
|
|
await ws.stop()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_subscribe_limit(fake_signer, monkeypatch):
|
|
fake_ws = FakeWS()
|
|
|
|
async def fake_connect(url, **kw):
|
|
return fake_ws
|
|
|
|
monkeypatch.setattr("cerbero_mcp.exchanges.ibkr.ws.websockets_connect", fake_connect)
|
|
|
|
ws = IBKRWebSocket(
|
|
signer=fake_signer,
|
|
ws_url="wss://x", base_url="https://x",
|
|
max_subs=2, idle_timeout_s=300,
|
|
)
|
|
await ws.start()
|
|
await ws.subscribe_tick(1)
|
|
await ws.subscribe_tick(2)
|
|
with pytest.raises(WSError, match="IBKR_WS_SUB_LIMIT"):
|
|
await ws.subscribe_tick(3)
|
|
await ws.stop()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_subscribe_before_start_raises(fake_signer):
|
|
ws = IBKRWebSocket(
|
|
signer=fake_signer,
|
|
ws_url="wss://x", base_url="https://x",
|
|
max_subs=10, idle_timeout_s=300,
|
|
)
|
|
with pytest.raises(WSError, match="IBKR_WS_NOT_STARTED"):
|
|
await ws.subscribe_tick(1)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_after_stop_resumes_reader(fake_signer, monkeypatch):
|
|
fake_ws_a = FakeWS()
|
|
fake_ws_b = FakeWS()
|
|
fakes = iter([fake_ws_a, fake_ws_b])
|
|
|
|
async def fake_connect(url, **kw):
|
|
return next(fakes)
|
|
|
|
monkeypatch.setattr("cerbero_mcp.exchanges.ibkr.ws.websockets_connect", fake_connect)
|
|
|
|
ws = IBKRWebSocket(
|
|
signer=fake_signer,
|
|
ws_url="wss://x", base_url="https://x",
|
|
max_subs=10, idle_timeout_s=300,
|
|
)
|
|
await ws.start()
|
|
await ws.stop()
|
|
# Restart with fresh fake_ws_b
|
|
await ws.start()
|
|
await ws.subscribe_tick(42)
|
|
await fake_ws_b.push({"topic": "smd+42", "31": "100"})
|
|
await asyncio.sleep(0.05)
|
|
assert ws.get_tick_snapshot(42) is not None
|
|
await ws.stop()
|