From 6266708e159285c460236539c40d777e192bf717 Mon Sep 17 00:00:00 2001 From: root Date: Sun, 3 May 2026 21:18:57 +0000 Subject: [PATCH] =?UTF-8?q?refactor(V2):=20IBKR=20WebSocket=20=E2=80=94=20?= =?UTF-8?q?fix=20stop/start=20cycle,=20guard=20rails,=20log=20disconnect?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Code review fixes (commit 17700d2): - _stopped reset on start() (was stuck True after stop→start) - _require_started guard on subscribe_*/unsubscribe (clear WSError vs AttributeError) - _reader_loop logs disconnect via logger.warning + sets _ws=None for `connected` signal - Class docstring documents stale-snapshot behavior + deferred reconnect - New tests: subscribe-before-start, stop→start cycle resumption Co-Authored-By: Claude Opus 4.7 (1M context) --- src/cerbero_mcp/exchanges/ibkr/ws.py | 26 ++++++++++++++++++- tests/unit/exchanges/ibkr/test_ws.py | 38 ++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/src/cerbero_mcp/exchanges/ibkr/ws.py b/src/cerbero_mcp/exchanges/ibkr/ws.py index 06b7e22..6e34a64 100644 --- a/src/cerbero_mcp/exchanges/ibkr/ws.py +++ b/src/cerbero_mcp/exchanges/ibkr/ws.py @@ -4,6 +4,7 @@ from __future__ import annotations import asyncio import contextlib import json +import logging import time from dataclasses import dataclass, field from typing import Any @@ -12,6 +13,8 @@ from websockets import connect as websockets_connect # exposed for tests from cerbero_mcp.exchanges.ibkr.oauth import OAuth1aSigner +logger = logging.getLogger(__name__) + class WSError(Exception): """WebSocket layer error.""" @@ -39,6 +42,15 @@ _SMD_FIELDS = ["31", "84", "86", "7295", "7296"] @dataclass class IBKRWebSocket: + """Persistent WSS to IBKR Client Portal with smd/sbd subs. + + Snapshot lifetime: each (tick|depth) cache entry is overwritten on every + incoming message. On disconnect, the reader loop logs and exits leaving + the existing cache intact. Consumers should check `connected` before + trusting a stale snapshot, or compare `timestamp_ms` against wall clock. + Automatic reconnect is deferred to a follow-up; V1 surfaces disconnects + via `connected=False` so the higher-level tool layer can rebuild the WS. + """ signer: OAuth1aSigner ws_url: str base_url: str @@ -63,6 +75,7 @@ class IBKRWebSocket: async def start(self) -> None: if self.connected: return + self._stopped = False # reset on every start (supports stop→start cycles) lst = await self.signer.get_live_session_token(base_url=self.base_url) self._ws = await websockets_connect( self.ws_url, @@ -87,6 +100,7 @@ class IBKRWebSocket: self._ws = None async def subscribe_tick(self, conid: int, *, forced: bool = False) -> None: + self._require_started() await self._ensure_capacity(conid) if conid in self._subs: self._last_polled_at[conid] = time.monotonic() @@ -103,6 +117,7 @@ class IBKRWebSocket: async def subscribe_depth( self, conid: int, *, exchange: str = "SMART", rows: int = 5 ) -> None: + self._require_started() await self._ensure_capacity(conid) if conid in self._depth_subs: self._last_polled_at[conid] = time.monotonic() @@ -113,6 +128,7 @@ class IBKRWebSocket: self._last_polled_at[conid] = time.monotonic() async def unsubscribe(self, conid: int) -> None: + self._require_started() if conid in self._subs: await self._ws.send(f"umd+{conid}+{{}}") self._subs.discard(conid) @@ -151,6 +167,10 @@ class IBKRWebSocket: "timestamp_ms": snap.timestamp_ms, } + def _require_started(self) -> None: + if self._ws is None: + raise WSError("IBKR_WS_NOT_STARTED: call start() first") + async def _ensure_capacity(self, conid: int) -> None: if (conid in self._subs) or (conid in self._depth_subs): return @@ -173,7 +193,11 @@ class IBKRWebSocket: self._on_depth(topic, msg) except asyncio.CancelledError: raise - except Exception: + except Exception as exc: + # Disconnect / parse error / network — leave cache as-is, mark dead. + # V1: no automatic reconnect; consumers detect via stale timestamp_ms. + logger.warning("ibkr ws reader exited: %s", exc) + self._ws = None return def _on_tick(self, topic: str, msg: dict) -> None: diff --git a/tests/unit/exchanges/ibkr/test_ws.py b/tests/unit/exchanges/ibkr/test_ws.py index 47770b1..ae76d4d 100644 --- a/tests/unit/exchanges/ibkr/test_ws.py +++ b/tests/unit/exchanges/ibkr/test_ws.py @@ -84,3 +84,41 @@ async def test_subscribe_limit(fake_signer, monkeypatch): with pytest.raises(WSError, match="IBKR_WS_SUB_LIMIT"): await ws.subscribe_tick(3) await ws.stop() + + +@pytest.mark.asyncio +async def test_subscribe_before_start_raises(fake_signer): + ws = IBKRWebSocket( + signer=fake_signer, + ws_url="wss://x", base_url="https://x", + max_subs=10, idle_timeout_s=300, + ) + with pytest.raises(WSError, match="IBKR_WS_NOT_STARTED"): + await ws.subscribe_tick(1) + + +@pytest.mark.asyncio +async def test_start_after_stop_resumes_reader(fake_signer, monkeypatch): + fake_ws_a = FakeWS() + fake_ws_b = FakeWS() + fakes = iter([fake_ws_a, fake_ws_b]) + + async def fake_connect(url, **kw): + return next(fakes) + + monkeypatch.setattr("cerbero_mcp.exchanges.ibkr.ws.websockets_connect", fake_connect) + + ws = IBKRWebSocket( + signer=fake_signer, + ws_url="wss://x", base_url="https://x", + max_subs=10, idle_timeout_s=300, + ) + await ws.start() + await ws.stop() + # Restart with fresh fake_ws_b + await ws.start() + await ws.subscribe_tick(42) + await fake_ws_b.push({"topic": "smd+42", "31": "100"}) + await asyncio.sleep(0.05) + assert ws.get_tick_snapshot(42) is not None + await ws.stop()