"""IBKR Client Portal Web API client (REST httpx + OAuth1a).""" from __future__ import annotations import logging import time from collections import OrderedDict from dataclasses import dataclass, field from typing import Any import httpx from cerbero_mcp.common.http import async_client from cerbero_mcp.exchanges.ibkr.oauth import ( IBKRAuthError, OAuth1aSigner, _percent_encode, ) logger = logging.getLogger(__name__) class IBKRError(Exception): """Generic IBKR API error (non-auth).""" _TICKLE_INTERVAL_S = 240.0 # tickle if last call > 4min ago @dataclass class IBKRClient: signer: OAuth1aSigner account_id: str paper: bool = True base_url: str = "https://api.ibkr.com/v1/api" _conid_cache: OrderedDict[str, int] = field( default_factory=OrderedDict, init=False, repr=False ) _last_request_at: float = field(default=0.0, init=False, repr=False) _http: httpx.AsyncClient | None = field(default=None, init=False, repr=False) _CONID_CACHE_MAX = 1024 def __post_init__(self) -> None: # IBKR Client Portal gateway latency is higher than crypto exchanges # (lookup roundtrip + session validation); 30s matches Alpaca's choice. self._http = async_client(timeout=30.0) async def aclose(self) -> None: if self._http and not self._http.is_closed: await self._http.aclose() async def health(self) -> dict[str, Any]: return {"status": "ok", "paper": self.paper} def is_testnet(self) -> dict[str, Any]: return {"testnet": self.paper, "base_url": self.base_url} async def _build_auth_header(self, method: str, url: str) -> str: await self.signer.get_live_session_token(base_url=self.base_url) params = self.signer.make_oauth_params() params["oauth_signature_method"] = "HMAC-SHA256" sig = self.signer.sign_with_lst(method, url, params) params["oauth_signature"] = sig return "OAuth realm=\"limited_poa\", " + ", ".join( f'{k}="{_percent_encode(v)}"' for k, v in sorted(params.items()) ) async def _maybe_tickle(self) -> None: if time.monotonic() - self._last_request_at < _TICKLE_INTERVAL_S: return if self._http is None: # pragma: no cover return try: url = f"{self.base_url}/tickle" auth = await self._build_auth_header("POST", url) await self._http.post(url, headers={"Authorization": auth}) except Exception as exc: # Best-effort: failure shouldn't block the real request, but log # so misconfigured signer / dead session aren't invisible. logger.debug("ibkr tickle best-effort failed: %s", exc) async def _force_lst_refresh(self) -> None: """Invalidate cached LST and remint on next call.""" self.signer._live_session_token = None self.signer._lst_expires_at = 0.0 async def _request( self, method: str, path: str, *, params: dict[str, Any] | None = None, json_body: dict[str, Any] | None = None, skip_tickle: bool = False, _retried_auth: bool = False, ) -> Any: if self._http is None: # pragma: no cover — set in __post_init__ raise IBKRError("http client not initialized") if not skip_tickle: await self._maybe_tickle() url = f"{self.base_url}{path}" auth = await self._build_auth_header(method, url) clean_params = ( {k: v for k, v in params.items() if v is not None} if params else None ) resp = await self._http.request( method, url, params=clean_params or None, json=json_body, headers={"Authorization": auth, "User-Agent": "cerbero-mcp/2.0"}, ) self._last_request_at = time.monotonic() if resp.status_code == 401 and not _retried_auth: # Retry once with fresh LST (per spec: IBKR_AUTH_FAILED retryable) logger.info("ibkr 401 on %s %s — refreshing LST and retrying once", method, path) await self._force_lst_refresh() return await self._request( method, path, params=params, json_body=json_body, skip_tickle=skip_tickle, _retried_auth=True, ) if resp.status_code == 401: raise IBKRAuthError(f"401 on {method} {path} (after retry): {resp.text[:200]}") if resp.status_code == 429: raise IBKRError(f"IBKR_RATE_LIMITED: {resp.text[:200]}") if resp.status_code >= 500: raise IBKRError(f"IBKR_SERVER_ERROR status={resp.status_code}") if resp.status_code >= 400: raise IBKRError(f"IBKR_HTTP_{resp.status_code}: {resp.text[:300]}") if not resp.content: return {} return resp.json() async def get_account(self) -> dict: return await self._request("GET", f"/portfolio/{self.account_id}/summary")