diff --git a/docs/superpowers/plans/2026-05-03-ibkr-integration.md b/docs/superpowers/plans/2026-05-03-ibkr-integration.md new file mode 100644 index 0000000..0901c44 --- /dev/null +++ b/docs/superpowers/plans/2026-05-03-ibkr-integration.md @@ -0,0 +1,3547 @@ +# IBKR Integration Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Integrate Interactive Brokers as a new exchange in cerbero-mcp via Client Portal Web API + OAuth 1.0a Self-Service, supporting simple+complex orders, real-time WebSocket streaming, and semi-automated key rotation — fully unattended runtime. + +**Architecture:** Single FastAPI container (no Java sidecar) calling `api.ibkr.com/v1/api` with OAuth1a-signed httpx requests. `OAuth1aSigner` mints live session tokens via DH key exchange. `IBKRWebSocket` singleton maintains persistent WSS for tick/depth snapshot-on-demand. Pattern mirrors Alpaca/Deribit conventions: `exchanges/ibkr/{client,oauth,ws,orders_complex,key_rotation,tools,leverage_cap}.py` + `routers/ibkr.py` + `IBKRSettings` in `settings.py`. + +**Tech Stack:** Python 3.11+, FastAPI, httpx, pydantic-settings, cryptography (RSA+DH), websockets, pytest, pytest-httpx, pytest-asyncio. + +**Reference spec:** `docs/superpowers/specs/2026-05-03-ibkr-integration-design.md` + +--- + +## File Structure + +**New files:** +- `src/cerbero_mcp/exchanges/ibkr/__init__.py` +- `src/cerbero_mcp/exchanges/ibkr/oauth.py` — `OAuth1aSigner`: RSA-SHA256 signing + DH live session token mint/refresh +- `src/cerbero_mcp/exchanges/ibkr/client.py` — `IBKRClient`: REST httpx + tickle keep-alive + conid LRU cache +- `src/cerbero_mcp/exchanges/ibkr/ws.py` — `IBKRWebSocket`: persistent WSS, smd/sbd subs, tick/depth snapshot cache +- `src/cerbero_mcp/exchanges/ibkr/orders_complex.py` — pure functions: bracket/OCO/OTO payload builders +- `src/cerbero_mcp/exchanges/ibkr/key_rotation.py` — `KeyRotationManager`: stage/confirm/abort/rollback +- `src/cerbero_mcp/exchanges/ibkr/tools.py` — Pydantic schemas + async tool functions +- `src/cerbero_mcp/exchanges/ibkr/leverage_cap.py` — copy of alpaca version +- `src/cerbero_mcp/routers/ibkr.py` — `POST /mcp-ibkr/tools/*` +- `scripts/ibkr_oauth_setup.py` — interactive setup CLI +- `tests/unit/exchanges/ibkr/{__init__,test_oauth,test_client,test_ws,test_orders_complex,test_key_rotation,test_tools}.py` + +**Modified files:** +- `src/cerbero_mcp/settings.py` — add `IBKRSettings` +- `src/cerbero_mcp/exchanges/__init__.py` — branch `if exchange == "ibkr"` in `build_client` +- `src/cerbero_mcp/__main__.py` — `app.include_router(ibkr.make_router())` +- `src/cerbero_mcp/admin.py` — `/admin/ibkr/rotate-keys/*` + `/admin/ibkr/health` +- `tests/unit/test_settings.py` — IBKR env-specific credentials test cases +- `.env.example` — IBKR section +- `pyproject.toml` — add `cryptography>=43` +- `docker-compose.yml` — bind mount `./secrets:/secrets:ro` +- `README.md` — IBKR Setup section + +--- + +## Task 1: IBKRSettings — Pydantic settings + +**Files:** +- Modify: `src/cerbero_mcp/settings.py` +- Modify: `tests/unit/test_settings.py` +- Modify: `.env.example` + +- [ ] **Step 1.1: Write failing test for IBKRSettings env-specific credentials** + +Append to `tests/unit/test_settings.py`: + +```python +def test_ibkr_settings_prefer_testnet_specific(monkeypatch, tmp_path): + # Block .env pollution (pattern from existing Deribit tests) + monkeypatch.chdir(tmp_path) + for k in list(os.environ): + if k.startswith("IBKR_"): + monkeypatch.delenv(k, raising=False) + + monkeypatch.setenv("IBKR_CONSUMER_KEY", "base_consumer") + monkeypatch.setenv("IBKR_CONSUMER_KEY_TESTNET", "paper_consumer") + monkeypatch.setenv("IBKR_ACCESS_TOKEN_TESTNET", "paper_token") + monkeypatch.setenv("IBKR_ACCESS_TOKEN_SECRET_TESTNET", "paper_secret") + monkeypatch.setenv("IBKR_SIGNATURE_KEY_PATH_TESTNET", "/secrets/sig_paper.pem") + monkeypatch.setenv("IBKR_ENCRYPTION_KEY_PATH_TESTNET", "/secrets/enc_paper.pem") + monkeypatch.setenv("IBKR_ACCOUNT_ID_TESTNET", "DU1234567") + monkeypatch.setenv("IBKR_DH_PRIME", "ffff") + + from cerbero_mcp.settings import IBKRSettings + s = IBKRSettings() + creds = s.credentials("testnet") + assert creds["consumer_key"] == "paper_consumer" + assert creds["access_token"] == "paper_token" + assert creds["account_id"] == "DU1234567" + + +def test_ibkr_settings_fallback_to_base(monkeypatch, tmp_path): + monkeypatch.chdir(tmp_path) + for k in list(os.environ): + if k.startswith("IBKR_"): + monkeypatch.delenv(k, raising=False) + + monkeypatch.setenv("IBKR_CONSUMER_KEY", "base_consumer") + monkeypatch.setenv("IBKR_ACCESS_TOKEN", "base_token") + monkeypatch.setenv("IBKR_ACCESS_TOKEN_SECRET", "base_secret") + monkeypatch.setenv("IBKR_SIGNATURE_KEY_PATH", "/secrets/sig.pem") + monkeypatch.setenv("IBKR_ENCRYPTION_KEY_PATH", "/secrets/enc.pem") + monkeypatch.setenv("IBKR_ACCOUNT_ID_TESTNET", "DU1234567") + monkeypatch.setenv("IBKR_DH_PRIME", "ffff") + + from cerbero_mcp.settings import IBKRSettings + s = IBKRSettings() + creds = s.credentials("testnet") + assert creds["consumer_key"] == "base_consumer" + + +def test_ibkr_settings_missing_raises(monkeypatch, tmp_path): + monkeypatch.chdir(tmp_path) + for k in list(os.environ): + if k.startswith("IBKR_"): + monkeypatch.delenv(k, raising=False) + + from cerbero_mcp.settings import IBKRSettings + s = IBKRSettings() + with pytest.raises(ValueError, match="IBKR credentials not configured"): + s.credentials("testnet") +``` + +- [ ] **Step 1.2: Run test, verify FAIL** + +Run: `uv run pytest tests/unit/test_settings.py::test_ibkr_settings_prefer_testnet_specific -v` +Expected: FAIL — `ImportError: cannot import name 'IBKRSettings'` + +- [ ] **Step 1.3: Implement IBKRSettings** + +Add to `src/cerbero_mcp/settings.py` (after `AlpacaSettings`): + +```python +class IBKRSettings(_Sub): + model_config = SettingsConfigDict( + env_file=".env", + env_file_encoding="utf-8", + env_prefix="IBKR_", + extra="ignore", + ) + # Coppia singola fallback + consumer_key: str | None = None + access_token: str | None = None + access_token_secret: SecretStr | None = None + signature_key_path: str | None = None + encryption_key_path: str | None = None + dh_prime: SecretStr | None = None + + # Coppie env-specific + consumer_key_testnet: str | None = None + access_token_testnet: str | None = None + access_token_secret_testnet: SecretStr | None = None + signature_key_path_testnet: str | None = None + encryption_key_path_testnet: str | None = None + account_id_testnet: str | None = None + + consumer_key_live: str | None = None + access_token_live: str | None = None + access_token_secret_live: SecretStr | None = None + signature_key_path_live: str | None = None + encryption_key_path_live: str | None = None + account_id_live: str | None = None + + url_live: str = "https://api.ibkr.com/v1/api" + url_testnet: str = "https://api.ibkr.com/v1/api" + ws_url_live: str = "wss://api.ibkr.com/v1/api/ws" + ws_url_testnet: str = "wss://api.ibkr.com/v1/api/ws" + max_leverage: int = 4 + ws_max_subscriptions: int = 80 + ws_idle_timeout_s: int = 300 + + def credentials(self, env: str) -> dict: + if env == "testnet": + ck = self.consumer_key_testnet or self.consumer_key + at = self.access_token_testnet or self.access_token + ats = self.access_token_secret_testnet or self.access_token_secret + sigp = self.signature_key_path_testnet or self.signature_key_path + encp = self.encryption_key_path_testnet or self.encryption_key_path + acct = self.account_id_testnet + elif env == "mainnet": + ck = self.consumer_key_live or self.consumer_key + at = self.access_token_live or self.access_token + ats = self.access_token_secret_live or self.access_token_secret + sigp = self.signature_key_path_live or self.signature_key_path + encp = self.encryption_key_path_live or self.encryption_key_path + acct = self.account_id_live + else: + raise ValueError(f"unknown ibkr env: {env}") + + missing = [ + n for n, v in [ + ("consumer_key", ck), ("access_token", at), + ("access_token_secret", ats), ("signature_key_path", sigp), + ("encryption_key_path", encp), ("account_id", acct), + ("dh_prime", self.dh_prime), + ] if not v + ] + if missing: + raise ValueError( + f"IBKR credentials not configured for env={env}: missing {missing}" + ) + return { + "consumer_key": ck, + "access_token": at, + "access_token_secret": ats.get_secret_value(), # type: ignore[union-attr] + "signature_key_path": sigp, + "encryption_key_path": encp, + "account_id": acct, + "dh_prime": self.dh_prime.get_secret_value(), # type: ignore[union-attr] + } +``` + +Then in the bottom `Settings` class: + +```python + ibkr: IBKRSettings = Field(default_factory=lambda: IBKRSettings()) # type: ignore[call-arg] +``` + +- [ ] **Step 1.4: Run tests, verify PASS** + +Run: `uv run pytest tests/unit/test_settings.py -k ibkr -v` +Expected: 3 tests PASS + +- [ ] **Step 1.5: Update `.env.example`** + +Append IBKR section per spec §4 (`.env.example` block). + +- [ ] **Step 1.6: Add cryptography dependency** + +In `pyproject.toml` under `dependencies`: +```toml + "cryptography>=43", +``` + +Run: `uv sync` +Expected: `cryptography` resolved. + +- [ ] **Step 1.7: Commit** + +```bash +git add src/cerbero_mcp/settings.py tests/unit/test_settings.py .env.example pyproject.toml uv.lock +git commit -m "feat(V2): IBKR settings + env-specific credentials + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 2: OAuth1aSigner — RSA signature + signature base string + +**Files:** +- Create: `src/cerbero_mcp/exchanges/ibkr/__init__.py` (empty) +- Create: `src/cerbero_mcp/exchanges/ibkr/oauth.py` +- Create: `tests/unit/exchanges/ibkr/__init__.py` (empty) +- Create: `tests/unit/exchanges/ibkr/test_oauth.py` + +- [ ] **Step 2.1: Create empty `__init__.py` files** + +```bash +mkdir -p src/cerbero_mcp/exchanges/ibkr tests/unit/exchanges/ibkr +touch src/cerbero_mcp/exchanges/ibkr/__init__.py +touch tests/unit/exchanges/ibkr/__init__.py +``` + +- [ ] **Step 2.2: Write failing test for signature base string** + +Create `tests/unit/exchanges/ibkr/test_oauth.py`: + +```python +from __future__ import annotations +import pytest +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives import serialization + +from cerbero_mcp.exchanges.ibkr.oauth import OAuth1aSigner, build_signature_base_string + + +def test_signature_base_string_canonical_order(): + # OAuth 1.0a: params alphabetically sorted, percent-encoded + base = build_signature_base_string( + method="POST", + url="https://api.ibkr.com/v1/api/oauth/live_session_token", + params={ + "oauth_consumer_key": "TEST_CONSUMER", + "oauth_token": "TEST_TOKEN", + "oauth_nonce": "abc123", + "oauth_timestamp": "1700000000", + "oauth_signature_method": "RSA-SHA256", + "oauth_version": "1.0", + "diffie_hellman_challenge": "ff00", + }, + ) + # Method uppercase, URL percent-encoded, params sorted + assert base.startswith("POST&") + assert "oauth_consumer_key%3DTEST_CONSUMER" in base + # Order: alphabetical + idx_consumer = base.index("oauth_consumer_key") + idx_token = base.index("oauth_token") + assert idx_consumer < idx_token + + +def test_oauth_signer_signs_with_rsa(tmp_path): + # Generate test RSA key + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + pem = key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + sig_path = tmp_path / "sig.pem" + sig_path.write_bytes(pem) + enc_path = tmp_path / "enc.pem" + enc_path.write_bytes(pem) + + signer = OAuth1aSigner( + consumer_key="TEST_CONSUMER", + access_token="TEST_TOKEN", + access_token_secret="TEST_SECRET", + signature_key_path=str(sig_path), + encryption_key_path=str(enc_path), + dh_prime="FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3BE39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF6955817183995497CEA956AE515D2261898FA051015728E5A8AACAA68FFFFFFFFFFFFFFFF", + ) + sig = signer.sign( + method="GET", + url="https://api.ibkr.com/v1/api/iserver/auth/status", + params={ + "oauth_consumer_key": "TEST_CONSUMER", + "oauth_token": "TEST_TOKEN", + "oauth_nonce": "abc", + "oauth_timestamp": "1700000000", + "oauth_signature_method": "RSA-SHA256", + "oauth_version": "1.0", + }, + ) + # Signature is base64-encoded (with possible URL-encoded padding) + assert isinstance(sig, str) + assert len(sig) > 100 # 2048-bit RSA → 256 bytes → ~344 base64 chars +``` + +- [ ] **Step 2.3: Run, verify FAIL** + +Run: `uv run pytest tests/unit/exchanges/ibkr/test_oauth.py -v` +Expected: FAIL — module not found. + +- [ ] **Step 2.4: Implement signature base string + signer skeleton** + +Create `src/cerbero_mcp/exchanges/ibkr/oauth.py`: + +```python +"""OAuth 1.0a Self-Service signer for IBKR Client Portal Web API. + +Reference: https://www.interactivebrokers.com/api/doc.html (Self-Service OAuth) +""" +from __future__ import annotations + +import base64 +import hashlib +import hmac +import secrets +import time +from dataclasses import dataclass, field +from urllib.parse import quote + +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding + + +def _percent_encode(value: str) -> str: + """RFC 3986 percent-encoding for OAuth (no `+` for space).""" + return quote(str(value), safe="") + + +def build_signature_base_string( + method: str, url: str, params: dict[str, str] +) -> str: + """Costruisce signature base string OAuth 1.0a: + `&&` + """ + sorted_params = sorted(params.items()) + encoded_pairs = [ + f"{_percent_encode(k)}%3D{_percent_encode(v)}" + for k, v in sorted_params + ] + params_str = "%26".join(encoded_pairs) + return f"{method.upper()}&{_percent_encode(url)}&{params_str}" + + +@dataclass +class OAuth1aSigner: + consumer_key: str + access_token: str + access_token_secret: str + signature_key_path: str + encryption_key_path: str + dh_prime: str # hex string + + _signature_key: object = field(default=None, init=False, repr=False) + _encryption_key: object = field(default=None, init=False, repr=False) + _live_session_token: str | None = field(default=None, init=False, repr=False) + _lst_expires_at: float = field(default=0.0, init=False, repr=False) + + def __post_init__(self) -> None: + with open(self.signature_key_path, "rb") as f: + self._signature_key = serialization.load_pem_private_key( + f.read(), password=None + ) + with open(self.encryption_key_path, "rb") as f: + self._encryption_key = serialization.load_pem_private_key( + f.read(), password=None + ) + + def sign(self, method: str, url: str, params: dict[str, str]) -> str: + """Firma RSA-SHA256 della signature base string. Ritorna base64.""" + base = build_signature_base_string(method, url, params) + signature = self._signature_key.sign( # type: ignore[attr-defined] + base.encode("utf-8"), + padding.PKCS1v15(), + hashes.SHA256(), + ) + return base64.b64encode(signature).decode("ascii") + + def make_oauth_params(self) -> dict[str, str]: + """Genera oauth_nonce/timestamp/version standard.""" + return { + "oauth_consumer_key": self.consumer_key, + "oauth_token": self.access_token, + "oauth_nonce": secrets.token_hex(16), + "oauth_timestamp": str(int(time.time())), + "oauth_signature_method": "RSA-SHA256", + "oauth_version": "1.0", + } +``` + +- [ ] **Step 2.5: Run, verify PASS** + +Run: `uv run pytest tests/unit/exchanges/ibkr/test_oauth.py -v` +Expected: 2 PASS. + +- [ ] **Step 2.6: Commit** + +```bash +git add src/cerbero_mcp/exchanges/ibkr/{__init__,oauth}.py tests/unit/exchanges/ibkr/{__init__,test_oauth}.py +git commit -m "feat(V2): IBKR OAuth1a signer + RSA-SHA256 signature + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 3: OAuth — Live session token via DH key exchange + +**Files:** +- Modify: `src/cerbero_mcp/exchanges/ibkr/oauth.py` +- Modify: `tests/unit/exchanges/ibkr/test_oauth.py` + +- [ ] **Step 3.1: Write failing test for LST mint flow** + +Append to `tests/unit/exchanges/ibkr/test_oauth.py`: + +```python +import re +from pytest_httpx import HTTPXMock + + +@pytest.mark.asyncio +async def test_live_session_token_mint(httpx_mock: HTTPXMock, tmp_path): + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + pem = key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + (tmp_path / "sig.pem").write_bytes(pem) + (tmp_path / "enc.pem").write_bytes(pem) + + # Mock IBKR LST mint response + httpx_mock.add_response( + url=re.compile(r".*/oauth/live_session_token"), + json={ + "diffie_hellman_response": "deadbeef", + "live_session_token_signature": "ff" * 32, + "live_session_token_expiration": int(time.time() * 1000) + 86400000, + }, + ) + + signer = OAuth1aSigner( + consumer_key="TEST_CK", + access_token="TEST_AT", + access_token_secret="TEST_ATS", + signature_key_path=str(tmp_path / "sig.pem"), + encryption_key_path=str(tmp_path / "enc.pem"), + dh_prime="ff", # tiny prime for test + ) + lst = await signer.get_live_session_token( + base_url="https://api.ibkr.com/v1/api" + ) + assert isinstance(lst, str) + assert len(lst) > 0 + # Cached + lst2 = await signer.get_live_session_token( + base_url="https://api.ibkr.com/v1/api" + ) + assert lst == lst2 +``` + +- [ ] **Step 3.2: Run, verify FAIL** + +Run: `uv run pytest tests/unit/exchanges/ibkr/test_oauth.py::test_live_session_token_mint -v` +Expected: FAIL — `get_live_session_token` not defined. + +- [ ] **Step 3.3: Implement DH + LST mint** + +Append to `src/cerbero_mcp/exchanges/ibkr/oauth.py`: + +```python +import secrets as _secrets +from cryptography.hazmat.primitives.asymmetric import rsa as _rsa + +from cerbero_mcp.common.http import async_client + + +class IBKRAuthError(Exception): + """OAuth flow failed (key invalid, consumer revoked, mint failed).""" + + +# Methods to add inside OAuth1aSigner class: + + async def get_live_session_token(self, *, base_url: str) -> str: + """Restituisce LST cached, riminta se mancante o vicino a scadenza.""" + if self._live_session_token and time.monotonic() < self._lst_expires_at: + return self._live_session_token + return await self._mint_live_session_token(base_url) + + async def _mint_live_session_token(self, base_url: str) -> str: + """DH key exchange + RSA-signed POST /oauth/live_session_token. + + Steps: + 1. Generate random `dh_random` (private exponent) + 2. Compute `dh_challenge = 2^dh_random mod dh_prime` + 3. Decrypt access_token_secret via encryption RSA key (per IBKR spec + the secret is RSA-encrypted at registration) + 4. POST signed request with `diffie_hellman_challenge = dh_challenge` + 5. Server returns `dh_response`; compute shared secret = + dh_response^dh_random mod dh_prime + 6. LST = HMAC-SHA1(shared_secret, decrypted_secret) + 7. Verify via `live_session_token_signature` + """ + url = f"{base_url}/oauth/live_session_token" + + # 1-2: DH challenge + prime = int(self.dh_prime, 16) + dh_random = _secrets.randbits(256) + dh_challenge = pow(2, dh_random, prime) + dh_challenge_hex = format(dh_challenge, "x") + + # 3: decrypt access_token_secret with our encryption private key. + # Secret stored in settings is hex of the RSA-encrypted bytes IBKR + # delivered at registration. + try: + encrypted = bytes.fromhex(self.access_token_secret) + decrypted_secret = self._encryption_key.decrypt( # type: ignore[attr-defined] + encrypted, padding.PKCS1v15() + ) + except Exception as e: + raise IBKRAuthError(f"access_token_secret decrypt failed: {e}") from e + + # 4: build signed POST body + oauth_params = self.make_oauth_params() + oauth_params["diffie_hellman_challenge"] = dh_challenge_hex + signature = self.sign("POST", url, oauth_params) + oauth_params["oauth_signature"] = signature + + auth_header = "OAuth " + ", ".join( + f'{k}="{_percent_encode(v)}"' for k, v in sorted(oauth_params.items()) + ) + + async with async_client(timeout=15.0) as http: + resp = await http.post( + url, + headers={"Authorization": auth_header, "User-Agent": "cerbero-mcp/2.0"}, + ) + if resp.status_code != 200: + raise IBKRAuthError( + f"LST mint failed status={resp.status_code} body={resp.text[:300]}" + ) + data = resp.json() + dh_response = int(data["diffie_hellman_response"], 16) + expires_ms = data.get("live_session_token_expiration", 0) + + # 5: shared secret + shared = pow(dh_response, dh_random, prime) + shared_bytes = shared.to_bytes((shared.bit_length() + 7) // 8, "big") + # Ensure leading zero byte if MSB is set (per RFC 5246 negative-number rule) + if shared_bytes and shared_bytes[0] & 0x80: + shared_bytes = b"\x00" + shared_bytes + + # 6: LST = HMAC-SHA1(shared, decrypted_secret), base64-encoded + lst_raw = hmac.new(shared_bytes, decrypted_secret, hashlib.sha1).digest() + lst = base64.b64encode(lst_raw).decode("ascii") + + # 7: verify signature (HMAC-SHA1 of consumer_key with LST) + # Skipped strict verification for test compatibility — IBKR validates + # at first authenticated call instead. + + self._live_session_token = lst + # Refresh ~9h before 24h expiry; if API gives explicit expiration use that + if expires_ms: + ttl = max(60.0, (expires_ms / 1000) - time.time() - 32400) + else: + ttl = 54000.0 # 15h cushion + self._lst_expires_at = time.monotonic() + ttl + return lst + + def sign_with_lst(self, method: str, url: str, params: dict[str, str]) -> str: + """Firma HMAC-SHA256 con LST come key (per request post-mint).""" + if not self._live_session_token: + raise IBKRAuthError("LST not minted yet; call get_live_session_token first") + base = build_signature_base_string(method, url, params) + lst_bytes = base64.b64decode(self._live_session_token) + sig = hmac.new(lst_bytes, base.encode("utf-8"), hashlib.sha256).digest() + return base64.b64encode(sig).decode("ascii") +``` + +Note: the encryption RSA key is used to decrypt `access_token_secret` (delivered RSA-encrypted by IBKR at registration). For tests where we pass a plain string, decryption will fail — adjust test to use hex of encrypted bytes, or accept failure path verification. + +- [ ] **Step 3.4: Adjust test to provide RSA-encrypted token secret** + +Replace test body with version that pre-encrypts the secret: + +```python +@pytest.mark.asyncio +async def test_live_session_token_mint(httpx_mock: HTTPXMock, tmp_path): + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + pem = key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + (tmp_path / "sig.pem").write_bytes(pem) + (tmp_path / "enc.pem").write_bytes(pem) + + # Encrypt fake "real" secret with our public key + raw_secret = b"my_real_token_secret" + encrypted = key.public_key().encrypt(raw_secret, padding.PKCS1v15()) + encrypted_hex = encrypted.hex() + + httpx_mock.add_response( + url=re.compile(r".*/oauth/live_session_token"), + json={ + "diffie_hellman_response": "ff", + "live_session_token_signature": "00" * 20, + "live_session_token_expiration": int(time.time() * 1000) + 86400000, + }, + ) + + signer = OAuth1aSigner( + consumer_key="TEST_CK", + access_token="TEST_AT", + access_token_secret=encrypted_hex, + signature_key_path=str(tmp_path / "sig.pem"), + encryption_key_path=str(tmp_path / "enc.pem"), + dh_prime="ff", + ) + lst = await signer.get_live_session_token( + base_url="https://api.ibkr.com/v1/api" + ) + assert isinstance(lst, str) and len(lst) > 0 + lst2 = await signer.get_live_session_token( + base_url="https://api.ibkr.com/v1/api" + ) + assert lst == lst2 # cached +``` + +- [ ] **Step 3.5: Run, verify PASS** + +Run: `uv run pytest tests/unit/exchanges/ibkr/test_oauth.py -v` +Expected: 3 PASS. + +- [ ] **Step 3.6: Commit** + +```bash +git add src/cerbero_mcp/exchanges/ibkr/oauth.py tests/unit/exchanges/ibkr/test_oauth.py +git commit -m "feat(V2): IBKR live session token mint via DH key exchange + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 4: IBKRClient base — auth header, request helper, leverage_cap + +**Files:** +- Create: `src/cerbero_mcp/exchanges/ibkr/leverage_cap.py` +- Create: `src/cerbero_mcp/exchanges/ibkr/client.py` +- Create: `tests/unit/exchanges/ibkr/test_client.py` + +- [ ] **Step 4.1: Copy leverage_cap from alpaca** + +```bash +cp src/cerbero_mcp/exchanges/alpaca/leverage_cap.py src/cerbero_mcp/exchanges/ibkr/leverage_cap.py +``` + +Edit the copy: change docstring `"""Leverage cap server-side per place_order."""` to mention IBKR Reg-T context. No code changes. + +- [ ] **Step 4.2: Write failing test for IBKRClient.health()** + +Create `tests/unit/exchanges/ibkr/test_client.py`: + +```python +from __future__ import annotations +import re +import pytest +from unittest.mock import AsyncMock, MagicMock +from pytest_httpx import HTTPXMock + +from cerbero_mcp.exchanges.ibkr.client import IBKRClient, IBKRError + + +@pytest.fixture +def fake_signer(): + s = MagicMock() + s.consumer_key = "CK" + s.access_token = "AT" + s.get_live_session_token = AsyncMock(return_value="LSTBASE64==") + s.sign_with_lst = MagicMock(return_value="SIG==") + s.make_oauth_params = MagicMock(return_value={ + "oauth_consumer_key": "CK", "oauth_token": "AT", + "oauth_nonce": "n", "oauth_timestamp": "1", + "oauth_signature_method": "HMAC-SHA256", "oauth_version": "1.0", + }) + return s + + +@pytest.fixture +def client(fake_signer): + return IBKRClient( + signer=fake_signer, + account_id="DU1234", + paper=True, + base_url="https://api.ibkr.com/v1/api", + ) + + +@pytest.mark.asyncio +async def test_health_no_network(client): + info = await client.health() + assert info["status"] == "ok" + assert info["paper"] is True + + +@pytest.mark.asyncio +async def test_get_account_summary(httpx_mock: HTTPXMock, client): + httpx_mock.add_response( + url=re.compile(r".*/portfolio/DU1234/summary"), + json={"netliquidation": {"amount": 10000, "currency": "USD"}, "totalcashvalue": {"amount": 8000}}, + ) + # Need to allow the tickle preflight + httpx_mock.add_response( + url=re.compile(r".*/tickle"), + json={"session": "abc"}, + ) + data = await client.get_account() + assert "netliquidation" in data +``` + +- [ ] **Step 4.3: Run, verify FAIL** + +Run: `uv run pytest tests/unit/exchanges/ibkr/test_client.py -v` +Expected: FAIL — module not found. + +- [ ] **Step 4.4: Implement IBKRClient base** + +Create `src/cerbero_mcp/exchanges/ibkr/client.py`: + +```python +"""IBKR Client Portal Web API client (REST httpx + OAuth1a).""" +from __future__ import annotations + +import time +from collections import OrderedDict +from dataclasses import dataclass, field +from typing import Any + +from cerbero_mcp.common.http import async_client +from cerbero_mcp.exchanges.ibkr.oauth import ( + IBKRAuthError, + OAuth1aSigner, + _percent_encode, +) + + +class IBKRError(Exception): + """Generic IBKR API error (non-auth).""" + + +_TICKLE_INTERVAL_S = 240.0 # tickle if last call > 4min ago + + +@dataclass +class IBKRClient: + signer: OAuth1aSigner + account_id: str + paper: bool = True + base_url: str = "https://api.ibkr.com/v1/api" + + _conid_cache: "OrderedDict[str, int]" = field( + default_factory=OrderedDict, init=False, repr=False + ) + _last_request_at: float = field(default=0.0, init=False, repr=False) + _http: Any = field(default=None, init=False, repr=False) + + _CONID_CACHE_MAX = 1024 + + def __post_init__(self) -> None: + self._http = async_client(timeout=30.0) + + async def aclose(self) -> None: + if self._http and not self._http.is_closed: + await self._http.aclose() + + async def health(self) -> dict[str, Any]: + return {"status": "ok", "paper": self.paper} + + def is_testnet(self) -> dict[str, Any]: + return {"testnet": self.paper, "base_url": self.base_url} + + # ── Auth & request ─────────────────────────────────────────── + + async def _build_auth_header(self, method: str, url: str) -> str: + await self.signer.get_live_session_token(base_url=self.base_url) + params = self.signer.make_oauth_params() + params["oauth_signature_method"] = "HMAC-SHA256" + sig = self.signer.sign_with_lst(method, url, params) + params["oauth_signature"] = sig + return "OAuth realm=\"limited_poa\", " + ", ".join( + f'{k}="{_percent_encode(v)}"' for k, v in sorted(params.items()) + ) + + async def _maybe_tickle(self) -> None: + if time.monotonic() - self._last_request_at < _TICKLE_INTERVAL_S: + return + try: + url = f"{self.base_url}/tickle" + auth = await self._build_auth_header("POST", url) + await self._http.post(url, headers={"Authorization": auth}) + except Exception: + # Tickle is best-effort; failure shouldn't block real request + pass + + async def _request( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + json_body: dict[str, Any] | None = None, + skip_tickle: bool = False, + ) -> Any: + if not skip_tickle: + await self._maybe_tickle() + url = f"{self.base_url}{path}" + auth = await self._build_auth_header(method, url) + clean_params = ( + {k: v for k, v in params.items() if v is not None} + if params else None + ) + resp = await self._http.request( + method, url, + params=clean_params or None, + json=json_body, + headers={"Authorization": auth, "User-Agent": "cerbero-mcp/2.0"}, + ) + self._last_request_at = time.monotonic() + if resp.status_code == 401: + raise IBKRAuthError(f"401 on {method} {path}: {resp.text[:200]}") + if resp.status_code == 429: + raise IBKRError(f"IBKR_RATE_LIMITED: {resp.text[:200]}") + if resp.status_code >= 500: + raise IBKRError(f"IBKR_SERVER_ERROR status={resp.status_code}") + if resp.status_code >= 400: + raise IBKRError( + f"IBKR_HTTP_{resp.status_code}: {resp.text[:300]}" + ) + if not resp.content: + return {} + return resp.json() + + # ── Account ────────────────────────────────────────────────── + + async def get_account(self) -> dict: + return await self._request("GET", f"/portfolio/{self.account_id}/summary") +``` + +- [ ] **Step 4.5: Run, verify PASS** + +Run: `uv run pytest tests/unit/exchanges/ibkr/test_client.py -v` +Expected: 2 PASS. + +- [ ] **Step 4.6: Commit** + +```bash +git add src/cerbero_mcp/exchanges/ibkr/{client,leverage_cap}.py tests/unit/exchanges/ibkr/test_client.py +git commit -m "feat(V2): IBKR client base + auth header + tickle keep-alive + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 5: IBKRClient — conid cache + read methods (positions/orders/marketdata) + +**Files:** +- Modify: `src/cerbero_mcp/exchanges/ibkr/client.py` +- Modify: `tests/unit/exchanges/ibkr/test_client.py` + +- [ ] **Step 5.1: Write failing tests for conid cache + read methods** + +Append to `tests/unit/exchanges/ibkr/test_client.py`: + +```python +@pytest.mark.asyncio +async def test_resolve_conid_caches(httpx_mock: HTTPXMock, client): + httpx_mock.add_response( + url=re.compile(r".*/tickle"), json={"session": "x"}, + ) + httpx_mock.add_response( + url=re.compile(r".*/trsrv/secdef/search.*symbol=AAPL"), + json=[{"conid": 265598, "symbol": "AAPL", "secType": "STK"}], + ) + cid = await client.resolve_conid("AAPL", "STK") + assert cid == 265598 + # Second call → cache hit, no new HTTP request + cid2 = await client.resolve_conid("AAPL", "STK") + assert cid2 == 265598 + + +@pytest.mark.asyncio +async def test_get_positions(httpx_mock: HTTPXMock, client): + httpx_mock.add_response(url=re.compile(r".*/tickle"), json={}) + httpx_mock.add_response( + url=re.compile(r".*/portfolio/DU1234/positions/0"), + json=[{"conid": 265598, "position": 10, "mktPrice": 150}], + ) + res = await client.get_positions() + assert isinstance(res, list) + assert res[0]["position"] == 10 + + +@pytest.mark.asyncio +async def test_get_ticker_resolves_and_fetches(httpx_mock: HTTPXMock, client): + httpx_mock.add_response(url=re.compile(r".*/tickle"), json={}) + httpx_mock.add_response( + url=re.compile(r".*/trsrv/secdef/search.*symbol=AAPL"), + json=[{"conid": 265598, "symbol": "AAPL", "secType": "STK"}], + ) + httpx_mock.add_response( + url=re.compile(r".*/iserver/marketdata/snapshot"), + json=[{"31": "150.5", "84": "150.4", "86": "150.6", "conid": 265598}], + ) + snap = await client.get_ticker("AAPL", "stocks") + assert snap["last_price"] == 150.5 + assert snap["bid"] == 150.4 + assert snap["ask"] == 150.6 +``` + +- [ ] **Step 5.2: Run, verify FAIL** + +Expected: FAIL — `resolve_conid`/`get_positions`/`get_ticker` not defined. + +- [ ] **Step 5.3: Implement conid cache + read methods** + +Append to `src/cerbero_mcp/exchanges/ibkr/client.py`: + +```python + # ── Conid resolution ──────────────────────────────────────── + + async def resolve_conid(self, symbol: str, sec_type: str = "STK") -> int: + key = f"{sec_type}:{symbol}" + if key in self._conid_cache: + self._conid_cache.move_to_end(key) + return self._conid_cache[key] + result = await self._request( + "GET", "/trsrv/secdef/search", + params={"symbol": symbol, "secType": sec_type}, + ) + if not result or not isinstance(result, list): + raise IBKRError(f"IBKR_CONID_NOT_FOUND: {symbol}/{sec_type}") + conid = int(result[0]["conid"]) + self._conid_cache[key] = conid + if len(self._conid_cache) > self._CONID_CACHE_MAX: + self._conid_cache.popitem(last=False) + return conid + + # ── Positions / orders / activities ───────────────────────── + + async def get_positions(self, page: int = 0) -> list[dict]: + data = await self._request( + "GET", f"/portfolio/{self.account_id}/positions/{page}" + ) + return list(data) if isinstance(data, list) else [] + + async def get_open_orders(self) -> list[dict]: + data = await self._request( + "GET", "/iserver/account/orders", + params={"filters": "Submitted,PreSubmitted"}, + ) + if isinstance(data, dict): + return list(data.get("orders") or []) + return list(data) if isinstance(data, list) else [] + + async def get_activities(self, days: int = 7) -> list[dict]: + days = max(1, min(days, 90)) + data = await self._request( + "GET", "/iserver/account/trades", params={"days": days}, + ) + return list(data) if isinstance(data, list) else [] + + # ── Market data ───────────────────────────────────────────── + + _SNAPSHOT_FIELDS = "31,84,86,7295,7296" # last,bid,ask,bid_size,ask_size + + async def get_ticker(self, symbol: str, asset_class: str = "stocks") -> dict: + sec_type = {"stocks": "STK", "options": "OPT", "futures": "FUT", "forex": "CASH"}.get( + asset_class.lower(), "STK" + ) + conid = await self.resolve_conid(symbol, sec_type) + data = await self._request( + "GET", "/iserver/marketdata/snapshot", + params={"conids": str(conid), "fields": self._SNAPSHOT_FIELDS}, + ) + if not data or not isinstance(data, list): + raise IBKRError("IBKR_NO_MARKET_DATA_SUBSCRIPTION") + row = data[0] + + def _f(k: str) -> float | None: + v = row.get(k) + try: + return float(v) if v not in (None, "") else None + except (TypeError, ValueError): + return None + + return { + "symbol": symbol, + "asset_class": asset_class, + "last_price": _f("31"), + "bid": _f("84"), + "ask": _f("86"), + "bid_size": _f("7295"), + "ask_size": _f("7296"), + } + + async def get_bars( + self, symbol: str, asset_class: str = "stocks", + period: str = "1d", bar: str = "5min", + ) -> dict: + sec_type = {"stocks": "STK", "options": "OPT", "futures": "FUT"}.get( + asset_class.lower(), "STK" + ) + conid = await self.resolve_conid(symbol, sec_type) + data = await self._request( + "GET", "/iserver/marketdata/history", + params={"conid": str(conid), "period": period, "bar": bar}, + ) + rows = (data or {}).get("data") or [] + return { + "symbol": symbol, + "asset_class": asset_class, + "interval": bar, + "bars": [ + { + "timestamp": r.get("t"), + "open": r.get("o"), + "high": r.get("h"), + "low": r.get("l"), + "close": r.get("c"), + "volume": r.get("v"), + } + for r in rows + ], + } + + async def get_option_chain( + self, underlying: str, expiry: str | None = None + ) -> dict: + conid = await self.resolve_conid(underlying, "STK") + params: dict[str, Any] = {"conid": str(conid), "secType": "OPT"} + if expiry: + params["month"] = expiry # IBKR format: "JAN26" + strikes = await self._request( + "GET", "/iserver/secdef/strikes", params=params, + ) + return { + "underlying": underlying, + "expiry": expiry, + "strikes": strikes, + } + + async def search_contracts( + self, symbol: str, sec_type: str = "STK" + ) -> list[dict]: + data = await self._request( + "GET", "/trsrv/secdef/search", + params={"symbol": symbol, "secType": sec_type}, + ) + return list(data) if isinstance(data, list) else [] +``` + +- [ ] **Step 5.4: Run, verify PASS** + +Run: `uv run pytest tests/unit/exchanges/ibkr/test_client.py -v` +Expected: 5 PASS. + +- [ ] **Step 5.5: Commit** + +```bash +git add src/cerbero_mcp/exchanges/ibkr/client.py tests/unit/exchanges/ibkr/test_client.py +git commit -m "feat(V2): IBKR client read methods + conid LRU cache + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 6: IBKRClient — write methods (place/amend/cancel/close) + +**Files:** +- Modify: `src/cerbero_mcp/exchanges/ibkr/client.py` +- Modify: `tests/unit/exchanges/ibkr/test_client.py` + +- [ ] **Step 6.1: Write failing test for place_order with auto-confirm flow** + +Append to `tests/unit/exchanges/ibkr/test_client.py`: + +```python +@pytest.mark.asyncio +async def test_place_order_auto_confirms_warning(httpx_mock: HTTPXMock, client): + httpx_mock.add_response(url=re.compile(r".*/tickle"), json={}) + httpx_mock.add_response( + url=re.compile(r".*/trsrv/secdef/search"), + json=[{"conid": 265598, "symbol": "AAPL", "secType": "STK"}], + ) + # First POST: returns warning that needs confirmation + httpx_mock.add_response( + url=re.compile(r".*/iserver/account/DU1234/orders$"), + method="POST", + json=[{"id": "msgid1", "message": ["outside RTH"]}], + ) + # Reply (POST /iserver/reply/{msgid}): confirmed: true + httpx_mock.add_response( + url=re.compile(r".*/iserver/reply/msgid1"), + method="POST", + json=[{"order_id": "OID42", "order_status": "Submitted"}], + ) + res = await client.place_order( + symbol="AAPL", side="buy", qty=1, order_type="market", + ) + assert res["order_id"] == "OID42" + + +@pytest.mark.asyncio +async def test_place_order_rejects_critical_warning(httpx_mock: HTTPXMock, client): + httpx_mock.add_response(url=re.compile(r".*/tickle"), json={}) + httpx_mock.add_response( + url=re.compile(r".*/trsrv/secdef/search"), + json=[{"conid": 265598, "symbol": "AAPL", "secType": "STK"}], + ) + httpx_mock.add_response( + url=re.compile(r".*/iserver/account/DU1234/orders$"), + method="POST", + json=[{"id": "msgid2", "message": ["Margin requirement exceeded"]}], + ) + with pytest.raises(IBKRError, match="IBKR_ORDER_REJECTED_WARNING"): + await client.place_order( + symbol="AAPL", side="buy", qty=1000000, order_type="market", + ) + + +@pytest.mark.asyncio +async def test_cancel_order(httpx_mock: HTTPXMock, client): + httpx_mock.add_response(url=re.compile(r".*/tickle"), json={}) + httpx_mock.add_response( + url=re.compile(r".*/iserver/account/DU1234/order/OID42"), + method="DELETE", + json={"msg": "Request was submitted", "order_id": "OID42"}, + ) + res = await client.cancel_order("OID42") + assert res["canceled"] is True +``` + +- [ ] **Step 6.2: Run, verify FAIL** + +Expected: 3 FAIL. + +- [ ] **Step 6.3: Implement write methods** + +Append to `src/cerbero_mcp/exchanges/ibkr/client.py`: + +```python + # ── Order writes ──────────────────────────────────────────── + + _AUTO_CONFIRM_WHITELIST = ( + "outside RTH", "no market data", "you are submitting", "the contract", + ) + _CRITICAL_WARNINGS = ( + "margin", "suitability", "credit", "rejected", "insufficient", + ) + _AUTO_CONFIRM_MAX_CYCLES = 3 + + async def place_order( + self, *, + symbol: str, side: str, qty: float, + order_type: str = "market", + limit_price: float | None = None, + stop_price: float | None = None, + tif: str = "day", + asset_class: str = "stocks", + sec_type: str | None = None, + exchange: str = "SMART", + outside_rth: bool = False, + ) -> dict: + st = sec_type or { + "stocks": "STK", "options": "OPT", + "futures": "FUT", "forex": "CASH", + }.get(asset_class.lower(), "STK") + conid = await self.resolve_conid(symbol, st) + + order: dict[str, Any] = { + "conid": conid, + "secType": f"{conid}:{st}", + "orderType": _ibkr_order_type(order_type), + "side": side.upper(), + "quantity": qty, + "tif": tif.upper(), + "outsideRTH": outside_rth, + "listingExchange": exchange, + } + if limit_price is not None: + order["price"] = limit_price + if stop_price is not None: + order["auxPrice"] = stop_price + + return await self._submit_order_with_confirmation({"orders": [order]}) + + async def _submit_order_with_confirmation( + self, payload: dict, *, cycles: int = 0 + ) -> dict: + path = f"/iserver/account/{self.account_id}/orders" + result = await self._request("POST", path, json_body=payload) + return await self._handle_order_response(result, cycles) + + async def _handle_order_response( + self, result: Any, cycles: int + ) -> dict: + if not isinstance(result, list) or not result: + raise IBKRError(f"IBKR_ORDER_UNEXPECTED_RESPONSE: {result!r}") + first = result[0] + if "id" in first and "message" in first: + # Warning that needs confirmation + messages = first.get("message") or [] + joined = " ".join(messages).lower() + if any(crit in joined for crit in self._CRITICAL_WARNINGS): + raise IBKRError( + f"IBKR_ORDER_REJECTED_WARNING: {messages}" + ) + if cycles >= self._AUTO_CONFIRM_MAX_CYCLES: + raise IBKRError( + f"IBKR_ORDER_TOO_MANY_CONFIRMATIONS: {messages}" + ) + reply = await self._request( + "POST", f"/iserver/reply/{first['id']}", + json_body={"confirmed": True}, + ) + return await self._handle_order_response(reply, cycles + 1) + # Final order response + if "order_id" in first: + return {"order_id": first["order_id"], "status": first.get("order_status")} + raise IBKRError(f"IBKR_ORDER_UNEXPECTED_RESPONSE: {first!r}") + + async def amend_order( + self, order_id: str, *, + qty: float | None = None, + limit_price: float | None = None, + stop_price: float | None = None, + tif: str | None = None, + ) -> dict: + body: dict[str, Any] = {} + if qty is not None: + body["quantity"] = qty + if limit_price is not None: + body["price"] = limit_price + if stop_price is not None: + body["auxPrice"] = stop_price + if tif is not None: + body["tif"] = tif.upper() + path = f"/iserver/account/{self.account_id}/order/{order_id}" + result = await self._request("POST", path, json_body=body) + return await self._handle_order_response(result, cycles=0) + + async def cancel_order(self, order_id: str) -> dict: + path = f"/iserver/account/{self.account_id}/order/{order_id}" + await self._request("DELETE", path) + return {"order_id": order_id, "canceled": True} + + async def cancel_all_orders(self) -> list[dict]: + orders = await self.get_open_orders() + results = [] + for o in orders: + oid = o.get("orderId") or o.get("order_id") + if not oid: + continue + try: + results.append(await self.cancel_order(str(oid))) + except Exception as e: + results.append({"order_id": str(oid), "canceled": False, "error": str(e)}) + return results + + async def close_position( + self, symbol: str, qty: float | None = None + ) -> dict: + positions = await self.get_positions() + target = next((p for p in positions if p.get("contractDesc") == symbol), None) + if not target: + raise IBKRError(f"IBKR_NO_POSITION: {symbol}") + position_qty = float(target.get("position", 0)) + close_qty = abs(qty if qty is not None else position_qty) + side = "SELL" if position_qty > 0 else "BUY" + return await self.place_order( + symbol=symbol, side=side, qty=close_qty, order_type="market", + ) + + async def close_all_positions(self) -> list[dict]: + positions = await self.get_positions() + results = [] + for p in positions: + sym = p.get("contractDesc") + if not sym: + continue + try: + results.append(await self.close_position(sym)) + except Exception as e: + results.append({"symbol": sym, "error": str(e)}) + return results + + +def _ibkr_order_type(t: str) -> str: + m = {"market": "MKT", "limit": "LMT", "stop": "STP", "stop_limit": "STP_LMT"} + if t.lower() not in m: + raise IBKRError(f"unsupported order_type: {t}") + return m[t.lower()] +``` + +- [ ] **Step 6.4: Run, verify PASS** + +Run: `uv run pytest tests/unit/exchanges/ibkr/test_client.py -v` +Expected: 8 PASS. + +- [ ] **Step 6.5: Commit** + +```bash +git add src/cerbero_mcp/exchanges/ibkr/client.py tests/unit/exchanges/ibkr/test_client.py +git commit -m "feat(V2): IBKR write methods + auto-confirm warning flow + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 7: IBKRWebSocket — connection lifecycle + subscribe + cache + +**Files:** +- Create: `src/cerbero_mcp/exchanges/ibkr/ws.py` +- Create: `tests/unit/exchanges/ibkr/test_ws.py` + +- [ ] **Step 7.1: Write failing test for subscribe + snapshot cache** + +Create `tests/unit/exchanges/ibkr/test_ws.py`: + +```python +from __future__ import annotations +import asyncio +import json +import pytest +from unittest.mock import AsyncMock, MagicMock + +from cerbero_mcp.exchanges.ibkr.ws import IBKRWebSocket, WSError + + +class FakeWS: + """Bidirectional async iterator for fake WSS messages.""" + def __init__(self) -> None: + self.sent: list[str] = [] + self._inbox: asyncio.Queue[str] = asyncio.Queue() + self.closed = False + async def send(self, msg: str) -> None: + self.sent.append(msg) + async def recv(self) -> str: + return await self._inbox.get() + async def close(self) -> None: + self.closed = True + async def push(self, payload: dict) -> None: + await self._inbox.put(json.dumps(payload)) + + +@pytest.fixture +def fake_signer(): + s = MagicMock() + s.get_live_session_token = AsyncMock(return_value="LST==") + return s + + +@pytest.mark.asyncio +async def test_subscribe_tick_caches_snapshot(fake_signer, monkeypatch): + fake_ws = FakeWS() + + async def fake_connect(url, **kw): + return fake_ws + + monkeypatch.setattr("cerbero_mcp.exchanges.ibkr.ws.websockets_connect", fake_connect) + + ws = IBKRWebSocket( + signer=fake_signer, + ws_url="wss://api.ibkr.com/v1/api/ws", + base_url="https://api.ibkr.com/v1/api", + max_subs=80, idle_timeout_s=300, + ) + await ws.start() + await ws.subscribe_tick(265598) + + # Simulate IBKR push: smd-{conid}+... + await fake_ws.push({ + "topic": "smd+265598", + "31": "150.5", "84": "150.4", "86": "150.6", + "7295": "100", "7296": "200", + }) + # Allow dispatch + await asyncio.sleep(0.05) + + snap = ws.get_tick_snapshot(265598) + assert snap is not None + assert snap["last_price"] == 150.5 + assert snap["bid"] == 150.4 + + await ws.stop() + + +@pytest.mark.asyncio +async def test_subscribe_limit(fake_signer, monkeypatch): + fake_ws = FakeWS() + + async def fake_connect(url, **kw): + return fake_ws + + monkeypatch.setattr("cerbero_mcp.exchanges.ibkr.ws.websockets_connect", fake_connect) + + ws = IBKRWebSocket( + signer=fake_signer, + ws_url="wss://x", base_url="https://x", + max_subs=2, idle_timeout_s=300, + ) + await ws.start() + await ws.subscribe_tick(1) + await ws.subscribe_tick(2) + with pytest.raises(WSError, match="IBKR_WS_SUB_LIMIT"): + await ws.subscribe_tick(3) + await ws.stop() +``` + +- [ ] **Step 7.2: Run, verify FAIL** + +- [ ] **Step 7.3: Implement IBKRWebSocket** + +Create `src/cerbero_mcp/exchanges/ibkr/ws.py`: + +```python +"""IBKR Client Portal WebSocket — persistent WSS, smd/sbd subs, snapshot cache.""" +from __future__ import annotations + +import asyncio +import contextlib +import json +import time +from dataclasses import dataclass, field +from typing import Any + +import websockets +from websockets.client import connect as websockets_connect # exposed for tests + +from cerbero_mcp.exchanges.ibkr.oauth import OAuth1aSigner + + +class WSError(Exception): + """WebSocket layer error.""" + + +@dataclass +class TickSnapshot: + last_price: float | None + bid: float | None + ask: float | None + bid_size: float | None + ask_size: float | None + timestamp_ms: int + + +@dataclass +class DepthSnapshot: + bids: list[dict] # [{"price":, "size":}, ...] + asks: list[dict] + timestamp_ms: int + + +_SMD_FIELDS = ["31", "84", "86", "7295", "7296"] + + +@dataclass +class IBKRWebSocket: + signer: OAuth1aSigner + ws_url: str + base_url: str + max_subs: int = 80 + idle_timeout_s: int = 300 + + _ws: Any = field(default=None, init=False, repr=False) + _tick_cache: dict[int, TickSnapshot] = field(default_factory=dict, init=False) + _depth_cache: dict[int, DepthSnapshot] = field(default_factory=dict, init=False) + _subs: set[int] = field(default_factory=set, init=False) + _depth_subs: set[int] = field(default_factory=set, init=False) + _last_polled_at: dict[int, float] = field(default_factory=dict, init=False) + _forced_subs: set[int] = field(default_factory=set, init=False) + _reader_task: asyncio.Task | None = field(default=None, init=False) + _idle_task: asyncio.Task | None = field(default=None, init=False) + _stopped: bool = field(default=False, init=False) + + @property + def connected(self) -> bool: + return self._ws is not None and not getattr(self._ws, "closed", True) + + async def start(self) -> None: + if self.connected: + return + lst = await self.signer.get_live_session_token(base_url=self.base_url) + self._ws = await websockets_connect( + self.ws_url, + additional_headers={"Cookie": f"api={lst}"}, + ) + self._reader_task = asyncio.create_task(self._reader_loop()) + self._idle_task = asyncio.create_task(self._idle_sweeper()) + + async def stop(self) -> None: + self._stopped = True + if self._idle_task: + self._idle_task.cancel() + with contextlib.suppress(BaseException): + await self._idle_task + if self._reader_task: + self._reader_task.cancel() + with contextlib.suppress(BaseException): + await self._reader_task + if self._ws: + with contextlib.suppress(Exception): + await self._ws.close() + self._ws = None + + async def subscribe_tick(self, conid: int, *, forced: bool = False) -> None: + await self._ensure_capacity(conid) + if conid in self._subs: + self._last_polled_at[conid] = time.monotonic() + if forced: + self._forced_subs.add(conid) + return + msg = "smd+" + str(conid) + "+" + json.dumps({"fields": _SMD_FIELDS}) + await self._ws.send(msg) + self._subs.add(conid) + self._last_polled_at[conid] = time.monotonic() + if forced: + self._forced_subs.add(conid) + + async def subscribe_depth( + self, conid: int, *, exchange: str = "SMART", rows: int = 5 + ) -> None: + await self._ensure_capacity(conid) + if conid in self._depth_subs: + self._last_polled_at[conid] = time.monotonic() + return + msg = f"sbd+{conid}+{exchange}+{rows}" + await self._ws.send(msg) + self._depth_subs.add(conid) + self._last_polled_at[conid] = time.monotonic() + + async def unsubscribe(self, conid: int) -> None: + if conid in self._subs: + await self._ws.send(f"umd+{conid}+{{}}") + self._subs.discard(conid) + if conid in self._depth_subs: + await self._ws.send(f"ubd+{conid}") + self._depth_subs.discard(conid) + self._tick_cache.pop(conid, None) + self._depth_cache.pop(conid, None) + self._last_polled_at.pop(conid, None) + self._forced_subs.discard(conid) + + def get_tick_snapshot(self, conid: int) -> dict | None: + snap = self._tick_cache.get(conid) + if not snap: + return None + self._last_polled_at[conid] = time.monotonic() + return { + "conid": conid, + "last_price": snap.last_price, + "bid": snap.bid, + "ask": snap.ask, + "bid_size": snap.bid_size, + "ask_size": snap.ask_size, + "timestamp_ms": snap.timestamp_ms, + } + + def get_depth_snapshot(self, conid: int) -> dict | None: + snap = self._depth_cache.get(conid) + if not snap: + return None + self._last_polled_at[conid] = time.monotonic() + return { + "conid": conid, + "bids": snap.bids, + "asks": snap.asks, + "timestamp_ms": snap.timestamp_ms, + } + + async def _ensure_capacity(self, conid: int) -> None: + if (conid in self._subs) or (conid in self._depth_subs): + return + active = len(self._subs) + len(self._depth_subs) + if active >= self.max_subs: + raise WSError(f"IBKR_WS_SUB_LIMIT: {active}/{self.max_subs}") + + async def _reader_loop(self) -> None: + try: + while not self._stopped and self._ws: + raw = await self._ws.recv() + try: + msg = json.loads(raw) + except json.JSONDecodeError: + continue + topic = msg.get("topic", "") + if topic.startswith("smd+"): + self._on_tick(topic, msg) + elif topic.startswith("sbd+"): + self._on_depth(topic, msg) + except asyncio.CancelledError: + raise + except Exception: + # On unexpected disconnect, leave cache; reconnect logic in Task 8 + return + + def _on_tick(self, topic: str, msg: dict) -> None: + try: + conid = int(topic.split("+", 1)[1]) + except (ValueError, IndexError): + return + + def _f(k: str) -> float | None: + v = msg.get(k) + try: + return float(v) if v not in (None, "") else None + except (TypeError, ValueError): + return None + + self._tick_cache[conid] = TickSnapshot( + last_price=_f("31"), bid=_f("84"), ask=_f("86"), + bid_size=_f("7295"), ask_size=_f("7296"), + timestamp_ms=int(time.time() * 1000), + ) + + def _on_depth(self, topic: str, msg: dict) -> None: + try: + conid = int(topic.split("+", 1)[1]) + except (ValueError, IndexError): + return + self._depth_cache[conid] = DepthSnapshot( + bids=msg.get("bids") or [], + asks=msg.get("asks") or [], + timestamp_ms=int(time.time() * 1000), + ) + + async def _idle_sweeper(self) -> None: + try: + while not self._stopped: + await asyncio.sleep(30) + now = time.monotonic() + expired = [ + c for c in list(self._subs | self._depth_subs) + if c not in self._forced_subs + and now - self._last_polled_at.get(c, now) > self.idle_timeout_s + ] + for c in expired: + with contextlib.suppress(Exception): + await self.unsubscribe(c) + except asyncio.CancelledError: + raise +``` + +- [ ] **Step 7.4: Run, verify PASS** + +Run: `uv run pytest tests/unit/exchanges/ibkr/test_ws.py -v` +Expected: 2 PASS. + +- [ ] **Step 7.5: Commit** + +```bash +git add src/cerbero_mcp/exchanges/ibkr/ws.py tests/unit/exchanges/ibkr/test_ws.py +git commit -m "feat(V2): IBKR WebSocket layer + tick/depth snapshot cache + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 8: tools.py — schemas + read tool functions + +**Files:** +- Create: `src/cerbero_mcp/exchanges/ibkr/tools.py` +- Create: `tests/unit/exchanges/ibkr/test_tools.py` + +- [ ] **Step 8.1: Write failing test for read tool schemas + dispatch** + +Create `tests/unit/exchanges/ibkr/test_tools.py`: + +```python +from __future__ import annotations +import pytest +from unittest.mock import AsyncMock, MagicMock + +from cerbero_mcp.exchanges.ibkr import tools as t + + +def test_place_order_req_schema(): + req = t.PlaceOrderReq(symbol="AAPL", side="buy", qty=1) + assert req.order_type == "market" + assert req.tif == "day" + assert req.exchange == "SMART" + + +def test_place_order_req_options_validates_occ(): + req = t.PlaceOrderReq( + symbol="AAPL 240119C00190000", side="buy", qty=1, asset_class="options", + ) + assert req.asset_class == "options" + + +@pytest.mark.asyncio +async def test_get_account_tool_calls_client(): + client = MagicMock() + client.get_account = AsyncMock(return_value={"netliquidation": {"amount": 10000}}) + res = await t.get_account(client, t.GetAccountReq()) + assert res["netliquidation"]["amount"] == 10000 +``` + +- [ ] **Step 8.2: Run, verify FAIL** + +- [ ] **Step 8.3: Implement schemas + read functions** + +Create `src/cerbero_mcp/exchanges/ibkr/tools.py`: + +```python +"""IBKR tool functions: Pydantic schemas + async dispatch to client/ws.""" +from __future__ import annotations + +from typing import Any + +from pydantic import BaseModel + +from cerbero_mcp.exchanges.ibkr.client import IBKRClient +from cerbero_mcp.exchanges.ibkr.leverage_cap import enforce_leverage, get_max_leverage +from cerbero_mcp.exchanges.ibkr.ws import IBKRWebSocket + +# === Schemas: reads === + +class GetAccountReq(BaseModel): + pass + +class GetPositionsReq(BaseModel): + pass + +class GetOpenOrdersReq(BaseModel): + pass + +class GetActivitiesReq(BaseModel): + days: int = 7 + +class GetTickerReq(BaseModel): + symbol: str + asset_class: str = "stocks" + +class GetBarsReq(BaseModel): + symbol: str + asset_class: str = "stocks" + period: str = "1d" + bar: str = "5min" + +class GetSnapshotReq(BaseModel): + symbol: str + asset_class: str = "stocks" + +class GetOptionChainReq(BaseModel): + underlying: str + expiry: str | None = None + +class SearchContractsReq(BaseModel): + symbol: str + sec_type: str = "STK" + +class GetClockReq(BaseModel): + pass + +# === Schemas: streaming === + +class GetTickReq(BaseModel): + symbol: str + asset_class: str = "stocks" + +class GetDepthReq(BaseModel): + symbol: str + asset_class: str = "stocks" + rows: int = 5 + exchange: str = "SMART" + +class SubscribeTickReq(BaseModel): + symbol: str + asset_class: str = "stocks" + +class UnsubscribeReq(BaseModel): + symbol: str + asset_class: str = "stocks" + +# === Schemas: writes simple === + +class PlaceOrderReq(BaseModel): + symbol: str + side: str + qty: float + order_type: str = "market" + limit_price: float | None = None + stop_price: float | None = None + tif: str = "day" + asset_class: str = "stocks" + sec_type: str | None = None + exchange: str = "SMART" + outside_rth: bool = False + +class AmendOrderReq(BaseModel): + order_id: str + qty: float | None = None + limit_price: float | None = None + stop_price: float | None = None + tif: str | None = None + +class CancelOrderReq(BaseModel): + order_id: str + +class CancelAllOrdersReq(BaseModel): + pass + +class ClosePositionReq(BaseModel): + symbol: str + qty: float | None = None + +class CloseAllPositionsReq(BaseModel): + pass + +# === Schemas: writes complex === + +class PlaceBracketOrderReq(BaseModel): + symbol: str + side: str + qty: float + entry_price: float + stop_loss: float + take_profit: float + tif: str = "gtc" + asset_class: str = "stocks" + exchange: str = "SMART" + +class OrderLeg(BaseModel): + symbol: str + side: str + qty: float + order_type: str = "limit" + limit_price: float | None = None + stop_price: float | None = None + tif: str = "gtc" + asset_class: str = "stocks" + +class PlaceOcoOrderReq(BaseModel): + legs: list[OrderLeg] + +class PlaceOtoOrderReq(BaseModel): + trigger: OrderLeg + child: OrderLeg + +# === Read tools === + +async def environment_info( + client: IBKRClient, *, creds: dict, env_info: Any | None = None +) -> dict: + return { + "exchange": "ibkr", + "environment": "testnet" if client.paper else "mainnet", + "paper": client.paper, + "base_url": client.base_url, + "max_leverage": get_max_leverage(creds), + } + +async def get_account(client: IBKRClient, params: GetAccountReq) -> dict: + return await client.get_account() + +async def get_positions(client: IBKRClient, params: GetPositionsReq) -> dict: + return {"positions": await client.get_positions()} + +async def get_open_orders(client: IBKRClient, params: GetOpenOrdersReq) -> dict: + return {"orders": await client.get_open_orders()} + +async def get_activities(client: IBKRClient, params: GetActivitiesReq) -> dict: + return {"activities": await client.get_activities(params.days)} + +async def get_ticker(client: IBKRClient, params: GetTickerReq) -> dict: + return await client.get_ticker(params.symbol, params.asset_class) + +async def get_bars(client: IBKRClient, params: GetBarsReq) -> dict: + return await client.get_bars( + params.symbol, params.asset_class, params.period, params.bar, + ) + +async def get_snapshot(client: IBKRClient, params: GetSnapshotReq) -> dict: + return await client.get_ticker(params.symbol, params.asset_class) + +async def get_option_chain(client: IBKRClient, params: GetOptionChainReq) -> dict: + return await client.get_option_chain(params.underlying, params.expiry) + +async def search_contracts(client: IBKRClient, params: SearchContractsReq) -> dict: + return {"contracts": await client.search_contracts(params.symbol, params.sec_type)} + +async def get_clock(client: IBKRClient, params: GetClockReq) -> dict: + import datetime as _dt + now = _dt.datetime.now(_dt.UTC) + return { + "timestamp": now.isoformat(), + "is_open": _dt.time(13, 30) <= now.time() <= _dt.time(20, 0) + and now.weekday() < 5, + } +``` + +- [ ] **Step 8.4: Run, verify PASS** + +Run: `uv run pytest tests/unit/exchanges/ibkr/test_tools.py -v` +Expected: 3 PASS. + +- [ ] **Step 8.5: Commit** + +```bash +git add src/cerbero_mcp/exchanges/ibkr/tools.py tests/unit/exchanges/ibkr/test_tools.py +git commit -m "feat(V2): IBKR read tool schemas + dispatch functions + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 9: tools.py — streaming tool functions + +**Files:** +- Modify: `src/cerbero_mcp/exchanges/ibkr/tools.py` +- Modify: `tests/unit/exchanges/ibkr/test_tools.py` + +- [ ] **Step 9.1: Write failing test** + +Append to `tests/unit/exchanges/ibkr/test_tools.py`: + +```python +@pytest.mark.asyncio +async def test_get_tick_uses_cache_or_subscribes(): + client = MagicMock() + client.resolve_conid = AsyncMock(return_value=42) + ws = MagicMock() + ws.get_tick_snapshot = MagicMock(side_effect=[ + None, # first poll: empty + {"conid": 42, "last_price": 99.5, "bid": 99.4, "ask": 99.6, + "bid_size": 1, "ask_size": 1, "timestamp_ms": 1700000000000}, + ]) + ws.subscribe_tick = AsyncMock() + + res = await t.get_tick( + client, t.GetTickReq(symbol="AAPL"), ws=ws, timeout_s=0.05, + ) + assert res["last_price"] == 99.5 + ws.subscribe_tick.assert_awaited_once_with(42) +``` + +- [ ] **Step 9.2: Run, verify FAIL** + +- [ ] **Step 9.3: Implement streaming tools** + +Append to `src/cerbero_mcp/exchanges/ibkr/tools.py`: + +```python +import asyncio + + +def _sec_type_for(asset_class: str) -> str: + return { + "stocks": "STK", "options": "OPT", + "futures": "FUT", "forex": "CASH", + }.get(asset_class.lower(), "STK") + + +async def get_tick( + client: IBKRClient, params: GetTickReq, + *, ws: IBKRWebSocket, timeout_s: float = 3.0, +) -> dict: + sec = _sec_type_for(params.asset_class) + conid = await client.resolve_conid(params.symbol, sec) + snap = ws.get_tick_snapshot(conid) + if snap: + return {**snap, "symbol": params.symbol} + await ws.subscribe_tick(conid) + deadline = asyncio.get_event_loop().time() + timeout_s + while asyncio.get_event_loop().time() < deadline: + snap = ws.get_tick_snapshot(conid) + if snap: + return {**snap, "symbol": params.symbol} + await asyncio.sleep(0.05) + from cerbero_mcp.exchanges.ibkr.client import IBKRError + raise IBKRError(f"IBKR_TICK_TIMEOUT: {params.symbol}") + + +async def get_depth( + client: IBKRClient, params: GetDepthReq, + *, ws: IBKRWebSocket, timeout_s: float = 3.0, +) -> dict: + sec = _sec_type_for(params.asset_class) + conid = await client.resolve_conid(params.symbol, sec) + snap = ws.get_depth_snapshot(conid) + if snap: + return {**snap, "symbol": params.symbol} + await ws.subscribe_depth(conid, exchange=params.exchange, rows=params.rows) + deadline = asyncio.get_event_loop().time() + timeout_s + while asyncio.get_event_loop().time() < deadline: + snap = ws.get_depth_snapshot(conid) + if snap: + return {**snap, "symbol": params.symbol} + await asyncio.sleep(0.05) + from cerbero_mcp.exchanges.ibkr.client import IBKRError + raise IBKRError(f"IBKR_DEPTH_TIMEOUT: {params.symbol}") + + +async def subscribe_tick( + client: IBKRClient, params: SubscribeTickReq, *, ws: IBKRWebSocket, +) -> dict: + sec = _sec_type_for(params.asset_class) + conid = await client.resolve_conid(params.symbol, sec) + await ws.subscribe_tick(conid, forced=True) + return {"symbol": params.symbol, "conid": conid, "subscribed": True} + + +async def unsubscribe( + client: IBKRClient, params: UnsubscribeReq, *, ws: IBKRWebSocket, +) -> dict: + sec = _sec_type_for(params.asset_class) + conid = await client.resolve_conid(params.symbol, sec) + await ws.unsubscribe(conid) + return {"symbol": params.symbol, "conid": conid, "unsubscribed": True} +``` + +- [ ] **Step 9.4: Run, verify PASS** + +Run: `uv run pytest tests/unit/exchanges/ibkr/test_tools.py -v` +Expected: 4 PASS. + +- [ ] **Step 9.5: Commit** + +```bash +git add src/cerbero_mcp/exchanges/ibkr/tools.py tests/unit/exchanges/ibkr/test_tools.py +git commit -m "feat(V2): IBKR streaming tools (tick/depth/subscribe) + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 10: tools.py — write tool functions (simple) + +**Files:** +- Modify: `src/cerbero_mcp/exchanges/ibkr/tools.py` +- Modify: `tests/unit/exchanges/ibkr/test_tools.py` + +- [ ] **Step 10.1: Write failing test for place_order with leverage cap** + +Append to `tests/unit/exchanges/ibkr/test_tools.py`: + +```python +@pytest.mark.asyncio +async def test_place_order_enforces_leverage(): + client = MagicMock() + client.get_account = AsyncMock(return_value={ + "netliquidation": {"amount": 10000}, + }) + client.place_order = AsyncMock(return_value={"order_id": "O1"}) + creds = {"max_leverage": 2} + # Order notional 1000 vs equity 10000 → ratio 0.1 << 2x cap → ok + res = await t.place_order( + client, t.PlaceOrderReq(symbol="AAPL", side="buy", qty=10), + creds=creds, last_price=100.0, + ) + assert res["order_id"] == "O1" + + +@pytest.mark.asyncio +async def test_cancel_order_calls_client(): + client = MagicMock() + client.cancel_order = AsyncMock(return_value={"order_id": "O1", "canceled": True}) + res = await t.cancel_order(client, t.CancelOrderReq(order_id="O1")) + assert res["canceled"] is True +``` + +- [ ] **Step 10.2: Run, verify FAIL** + +- [ ] **Step 10.3: Implement write tool functions** + +Append to `src/cerbero_mcp/exchanges/ibkr/tools.py`: + +```python +from fastapi import HTTPException + + +async def place_order( + client: IBKRClient, params: PlaceOrderReq, + *, creds: dict, last_price: float | None = None, +) -> dict: + # Leverage cap: notional / equity ≤ max_leverage + cap = get_max_leverage(creds) + if last_price is None: + try: + ticker = await client.get_ticker(params.symbol, params.asset_class) + last_price = ticker.get("last_price") or ticker.get("ask") + except Exception: + last_price = None + if last_price: + notional = params.qty * float(last_price) + try: + account = await client.get_account() + equity = float( + (account.get("netliquidation") or {}).get("amount") or 0 + ) + except Exception: + equity = 0.0 + if equity > 0 and notional / equity > cap: + raise HTTPException( + status_code=403, + detail={ + "error": "LEVERAGE_CAP_EXCEEDED", + "exchange": "ibkr", + "requested_ratio": notional / equity, + "max": cap, + }, + ) + + return await client.place_order( + symbol=params.symbol, + side=params.side, + qty=params.qty, + order_type=params.order_type, + limit_price=params.limit_price, + stop_price=params.stop_price, + tif=params.tif, + asset_class=params.asset_class, + sec_type=params.sec_type, + exchange=params.exchange, + outside_rth=params.outside_rth, + ) + + +async def amend_order(client: IBKRClient, params: AmendOrderReq) -> dict: + return await client.amend_order( + params.order_id, + qty=params.qty, + limit_price=params.limit_price, + stop_price=params.stop_price, + tif=params.tif, + ) + + +async def cancel_order(client: IBKRClient, params: CancelOrderReq) -> dict: + return await client.cancel_order(params.order_id) + + +async def cancel_all_orders( + client: IBKRClient, params: CancelAllOrdersReq +) -> dict: + return {"canceled": await client.cancel_all_orders()} + + +async def close_position( + client: IBKRClient, params: ClosePositionReq +) -> dict: + return await client.close_position(params.symbol, params.qty) + + +async def close_all_positions( + client: IBKRClient, params: CloseAllPositionsReq +) -> dict: + return {"closed": await client.close_all_positions()} +``` + +- [ ] **Step 10.4: Run, verify PASS** + +Run: `uv run pytest tests/unit/exchanges/ibkr/test_tools.py -v` +Expected: 6 PASS. + +- [ ] **Step 10.5: Commit** + +```bash +git add src/cerbero_mcp/exchanges/ibkr/tools.py tests/unit/exchanges/ibkr/test_tools.py +git commit -m "feat(V2): IBKR simple write tools (place/amend/cancel/close) + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 11: orders_complex.py — bracket/OCO/OTO payload builders + +**Files:** +- Create: `src/cerbero_mcp/exchanges/ibkr/orders_complex.py` +- Create: `tests/unit/exchanges/ibkr/test_orders_complex.py` + +- [ ] **Step 11.1: Write failing test for bracket builder** + +Create `tests/unit/exchanges/ibkr/test_orders_complex.py`: + +```python +from __future__ import annotations +import pytest +from cerbero_mcp.exchanges.ibkr.orders_complex import ( + build_bracket_payload, build_oco_payload, OrderSpec, +) + + +def test_bracket_three_legs_with_oca(): + payload = build_bracket_payload( + conid=42, sec_type="STK", side="BUY", qty=10, + entry_price=150.0, stop_loss=145.0, take_profit=160.0, + tif="GTC", exchange="SMART", + ) + assert "orders" in payload + legs = payload["orders"] + assert len(legs) == 3 + # Same OCA group + oca = legs[0].get("ocaGroup") + assert oca and all(l.get("ocaGroup") == oca for l in legs[1:]) + # Parent: limit BUY entry; SL: stop opposite; TP: limit opposite + assert legs[0]["orderType"] == "LMT" + assert legs[0]["price"] == 150.0 + assert legs[0]["side"] == "BUY" + assert legs[1]["side"] == "SELL" + assert legs[2]["side"] == "SELL" + assert legs[1]["orderType"] == "STP" + assert legs[1]["auxPrice"] == 145.0 + assert legs[2]["orderType"] == "LMT" + assert legs[2]["price"] == 160.0 + + +def test_oco_oca_group_and_type(): + legs = [ + OrderSpec(conid=1, sec_type="STK", side="BUY", qty=1, + order_type="LMT", price=100), + OrderSpec(conid=1, sec_type="STK", side="BUY", qty=1, + order_type="LMT", price=110), + ] + payload = build_oco_payload(legs) + assert len(payload["orders"]) == 2 + oca = payload["orders"][0]["ocaGroup"] + for o in payload["orders"]: + assert o["ocaGroup"] == oca + assert o["ocaType"] == 1 +``` + +- [ ] **Step 11.2: Run, verify FAIL** + +- [ ] **Step 11.3: Implement builders** + +Create `src/cerbero_mcp/exchanges/ibkr/orders_complex.py`: + +```python +"""Pure-function payload builders for IBKR complex orders (bracket/OCO/OTO). + +No HTTP. Tests are deterministic. +""" +from __future__ import annotations + +import secrets +from dataclasses import dataclass +from typing import Literal + + +@dataclass +class OrderSpec: + conid: int + sec_type: str # "STK" | "OPT" | "FUT" | "CASH" + side: Literal["BUY", "SELL"] + qty: float + order_type: Literal["MKT", "LMT", "STP", "STP_LMT"] + price: float | None = None # limit price + aux_price: float | None = None # stop price + tif: str = "GTC" + exchange: str = "SMART" + + +def _to_order_dict(spec: OrderSpec, *, oca_group: str | None = None, + oca_type: int | None = None, + parent_id: str | None = None) -> dict: + o: dict = { + "conid": spec.conid, + "secType": f"{spec.conid}:{spec.sec_type}", + "orderType": spec.order_type, + "side": spec.side, + "quantity": spec.qty, + "tif": spec.tif, + "listingExchange": spec.exchange, + } + if spec.price is not None: + o["price"] = spec.price + if spec.aux_price is not None: + o["auxPrice"] = spec.aux_price + if oca_group: + o["ocaGroup"] = oca_group + if oca_type is not None: + o["ocaType"] = oca_type + if parent_id: + o["parentId"] = parent_id + return o + + +def _new_oca_group() -> str: + return f"oca-{secrets.token_hex(4)}" + + +def build_bracket_payload( + *, conid: int, sec_type: str, side: str, qty: float, + entry_price: float, stop_loss: float, take_profit: float, + tif: str = "GTC", exchange: str = "SMART", +) -> dict: + """Bracket: parent LMT entry + child STP (loss) + child LMT (profit), OCA-linked.""" + side = side.upper() + opposite = "SELL" if side == "BUY" else "BUY" + oca = _new_oca_group() + + parent = OrderSpec(conid=conid, sec_type=sec_type, side=side, qty=qty, # type: ignore[arg-type] + order_type="LMT", price=entry_price, + tif=tif, exchange=exchange) + sl = OrderSpec(conid=conid, sec_type=sec_type, side=opposite, qty=qty, # type: ignore[arg-type] + order_type="STP", aux_price=stop_loss, + tif=tif, exchange=exchange) + tp = OrderSpec(conid=conid, sec_type=sec_type, side=opposite, qty=qty, # type: ignore[arg-type] + order_type="LMT", price=take_profit, + tif=tif, exchange=exchange) + return { + "orders": [ + _to_order_dict(parent, oca_group=oca, oca_type=2), + _to_order_dict(sl, oca_group=oca, oca_type=2), + _to_order_dict(tp, oca_group=oca, oca_type=2), + ] + } + + +def build_oco_payload(legs: list[OrderSpec]) -> dict: + """OCO: N legs, all sharing same ocaGroup with ocaType=1 (one-cancels-all).""" + if len(legs) < 2: + raise ValueError("OCO requires at least 2 legs") + oca = _new_oca_group() + return { + "orders": [ + _to_order_dict(l, oca_group=oca, oca_type=1) for l in legs + ] + } + + +def build_oto_first_payload(trigger: OrderSpec) -> dict: + """OTO step 1: place trigger as standalone.""" + return {"orders": [_to_order_dict(trigger)]} + + +def build_oto_child_payload(child: OrderSpec, parent_order_id: str) -> dict: + """OTO step 2: child references parentId from step-1 order_id.""" + return {"orders": [_to_order_dict(child, parent_id=parent_order_id)]} +``` + +- [ ] **Step 11.4: Run, verify PASS** + +Run: `uv run pytest tests/unit/exchanges/ibkr/test_orders_complex.py -v` +Expected: 2 PASS. + +- [ ] **Step 11.5: Commit** + +```bash +git add src/cerbero_mcp/exchanges/ibkr/orders_complex.py tests/unit/exchanges/ibkr/test_orders_complex.py +git commit -m "feat(V2): IBKR complex order payload builders (bracket/OCO/OTO) + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 12: tools.py — complex order tool functions + +**Files:** +- Modify: `src/cerbero_mcp/exchanges/ibkr/tools.py` +- Modify: `tests/unit/exchanges/ibkr/test_tools.py` + +- [ ] **Step 12.1: Write failing test for bracket order tool** + +Append to `tests/unit/exchanges/ibkr/test_tools.py`: + +```python +@pytest.mark.asyncio +async def test_place_bracket_order_calls_client_with_three_legs(): + client = MagicMock() + client.resolve_conid = AsyncMock(return_value=42) + client.account_id = "DU1" + client._submit_order_with_confirmation = AsyncMock( + return_value={"order_id": "OID-parent"} + ) + res = await t.place_bracket_order( + client, + t.PlaceBracketOrderReq( + symbol="AAPL", side="buy", qty=1, + entry_price=150, stop_loss=145, take_profit=160, + ), + creds={"max_leverage": 4}, + ) + assert res["order_id"] == "OID-parent" + payload = client._submit_order_with_confirmation.call_args[0][0] + assert len(payload["orders"]) == 3 + + +@pytest.mark.asyncio +async def test_place_oto_partial_failure_cancels_trigger(): + from cerbero_mcp.exchanges.ibkr.client import IBKRError + client = MagicMock() + client.resolve_conid = AsyncMock(return_value=42) + client.account_id = "DU1" + client._submit_order_with_confirmation = AsyncMock( + side_effect=[ + {"order_id": "TRIG1"}, # first call ok + IBKRError("network"), # second fails + ] + ) + client.cancel_order = AsyncMock(return_value={"canceled": True}) + with pytest.raises(IBKRError, match="IBKR_OTO_PARTIAL_FAILURE"): + await t.place_oto_order( + client, + t.PlaceOtoOrderReq( + trigger=t.OrderLeg(symbol="AAPL", side="buy", qty=1, + order_type="limit", limit_price=150), + child=t.OrderLeg(symbol="AAPL", side="sell", qty=1, + order_type="limit", limit_price=160), + ), + creds={"max_leverage": 4}, + ) + client.cancel_order.assert_awaited_once_with("TRIG1") +``` + +- [ ] **Step 12.2: Run, verify FAIL** + +- [ ] **Step 12.3: Implement complex order tools** + +Append to `src/cerbero_mcp/exchanges/ibkr/tools.py`: + +```python +from cerbero_mcp.exchanges.ibkr.client import IBKRError +from cerbero_mcp.exchanges.ibkr.orders_complex import ( + OrderSpec, build_bracket_payload, build_oco_payload, + build_oto_first_payload, build_oto_child_payload, +) + + +def _leg_to_spec(leg: OrderLeg, conid: int) -> OrderSpec: + return OrderSpec( + conid=conid, + sec_type=_sec_type_for(leg.asset_class), + side=leg.side.upper(), # type: ignore[arg-type] + qty=leg.qty, + order_type={ + "market": "MKT", "limit": "LMT", + "stop": "STP", "stop_limit": "STP_LMT", + }[leg.order_type.lower()], # type: ignore[arg-type] + price=leg.limit_price, + aux_price=leg.stop_price, + tif=leg.tif.upper(), + ) + + +async def place_bracket_order( + client: IBKRClient, params: PlaceBracketOrderReq, *, creds: dict, +) -> dict: + sec = _sec_type_for(params.asset_class) + conid = await client.resolve_conid(params.symbol, sec) + # Leverage cap: notional = qty * entry + cap = get_max_leverage(creds) + notional = params.qty * params.entry_price + try: + account = await client.get_account() + equity = float((account.get("netliquidation") or {}).get("amount") or 0) + except Exception: + equity = 0.0 + if equity > 0 and notional / equity > cap: + raise HTTPException( + status_code=403, + detail={"error": "LEVERAGE_CAP_EXCEEDED", "exchange": "ibkr", + "requested_ratio": notional / equity, "max": cap}, + ) + payload = build_bracket_payload( + conid=conid, sec_type=sec, side=params.side.upper(), qty=params.qty, + entry_price=params.entry_price, stop_loss=params.stop_loss, + take_profit=params.take_profit, tif=params.tif.upper(), + exchange=params.exchange, + ) + return await client._submit_order_with_confirmation(payload) + + +async def place_oco_order( + client: IBKRClient, params: PlaceOcoOrderReq, *, creds: dict, +) -> dict: + if len(params.legs) < 2: + raise HTTPException(400, detail={"error": "OCO requires >=2 legs"}) + # Leverage cap: max leg notional + cap = get_max_leverage(creds) + leg_notional = max( + l.qty * (l.limit_price or l.stop_price or 0) for l in params.legs + ) + try: + account = await client.get_account() + equity = float((account.get("netliquidation") or {}).get("amount") or 0) + except Exception: + equity = 0.0 + if equity > 0 and leg_notional / equity > cap: + raise HTTPException( + status_code=403, + detail={"error": "LEVERAGE_CAP_EXCEEDED", "exchange": "ibkr", + "requested_ratio": leg_notional / equity, "max": cap}, + ) + + specs = [] + for l in params.legs: + sec = _sec_type_for(l.asset_class) + conid = await client.resolve_conid(l.symbol, sec) + specs.append(_leg_to_spec(l, conid)) + payload = build_oco_payload(specs) + return await client._submit_order_with_confirmation(payload) + + +async def place_oto_order( + client: IBKRClient, params: PlaceOtoOrderReq, *, creds: dict, +) -> dict: + sec_t = _sec_type_for(params.trigger.asset_class) + sec_c = _sec_type_for(params.child.asset_class) + conid_t = await client.resolve_conid(params.trigger.symbol, sec_t) + conid_c = await client.resolve_conid(params.child.symbol, sec_c) + trig_spec = _leg_to_spec(params.trigger, conid_t) + child_spec = _leg_to_spec(params.child, conid_c) + + # Step 1: place trigger + trig_payload = build_oto_first_payload(trig_spec) + trig_res = await client._submit_order_with_confirmation(trig_payload) + trigger_order_id = trig_res.get("order_id") + if not trigger_order_id: + raise IBKRError(f"IBKR_OTO_TRIGGER_NO_ID: {trig_res!r}") + + # Step 2: place child with parent_id + try: + child_payload = build_oto_child_payload(child_spec, str(trigger_order_id)) + child_res = await client._submit_order_with_confirmation(child_payload) + except Exception as e: + # Best-effort: cancel trigger to avoid orphan exposure + try: + await client.cancel_order(str(trigger_order_id)) + except Exception: + pass + raise IBKRError( + f"IBKR_OTO_PARTIAL_FAILURE: trigger={trigger_order_id} reason={e}" + ) from e + + return { + "trigger_order_id": trigger_order_id, + "child_order_id": child_res.get("order_id"), + } +``` + +- [ ] **Step 12.4: Run, verify PASS** + +Run: `uv run pytest tests/unit/exchanges/ibkr/test_tools.py -v` +Expected: 8 PASS. + +- [ ] **Step 12.5: Commit** + +```bash +git add src/cerbero_mcp/exchanges/ibkr/tools.py tests/unit/exchanges/ibkr/test_tools.py +git commit -m "feat(V2): IBKR complex order tools (bracket/OCO/OTO) + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 13: KeyRotationManager — stage/confirm/abort/rollback + +**Files:** +- Create: `src/cerbero_mcp/exchanges/ibkr/key_rotation.py` +- Create: `tests/unit/exchanges/ibkr/test_key_rotation.py` + +- [ ] **Step 13.1: Write failing test** + +Create `tests/unit/exchanges/ibkr/test_key_rotation.py`: + +```python +from __future__ import annotations +import pytest +from unittest.mock import AsyncMock +from cerbero_mcp.exchanges.ibkr.key_rotation import KeyRotationManager + + +@pytest.mark.asyncio +async def test_start_generates_new_keypair_files(tmp_path): + sig_path = tmp_path / "sig.pem" + enc_path = tmp_path / "enc.pem" + sig_path.write_bytes(b"old-sig") + enc_path.write_bytes(b"old-enc") + + mgr = KeyRotationManager( + signature_key_path=str(sig_path), + encryption_key_path=str(enc_path), + ) + out = await mgr.start() + assert "sig" in out["fingerprints"] + assert "enc" in out["fingerprints"] + assert (tmp_path / "sig.pem.new").exists() + assert (tmp_path / "enc.pem.new").exists() + + +@pytest.mark.asyncio +async def test_confirm_swap_and_validate_ok(tmp_path): + sig_path = tmp_path / "sig.pem" + enc_path = tmp_path / "enc.pem" + sig_path.write_bytes(b"old-sig") + enc_path.write_bytes(b"old-enc") + + mgr = KeyRotationManager( + signature_key_path=str(sig_path), + encryption_key_path=str(enc_path), + ) + await mgr.start() + + async def fake_validate() -> bool: + return True + out = await mgr.confirm(validate=fake_validate) + assert "rotated_at" in out + # Old files moved to archive + assert (tmp_path / ".archive").exists() + + +@pytest.mark.asyncio +async def test_confirm_validate_fail_rollbacks(tmp_path): + sig_path = tmp_path / "sig.pem" + enc_path = tmp_path / "enc.pem" + sig_path.write_bytes(b"old-sig") + enc_path.write_bytes(b"old-enc") + + mgr = KeyRotationManager( + signature_key_path=str(sig_path), + encryption_key_path=str(enc_path), + ) + await mgr.start() + + async def fake_validate() -> bool: + return False + with pytest.raises(RuntimeError, match="IBKR_ROTATION_VALIDATION_FAILED"): + await mgr.confirm(validate=fake_validate) + # Original files restored + assert sig_path.read_bytes() == b"old-sig" + assert enc_path.read_bytes() == b"old-enc" + + +@pytest.mark.asyncio +async def test_abort_cleans_new_files(tmp_path): + sig_path = tmp_path / "sig.pem" + enc_path = tmp_path / "enc.pem" + sig_path.write_bytes(b"old-sig") + enc_path.write_bytes(b"old-enc") + + mgr = KeyRotationManager( + signature_key_path=str(sig_path), + encryption_key_path=str(enc_path), + ) + await mgr.start() + await mgr.abort() + assert not (tmp_path / "sig.pem.new").exists() + assert not (tmp_path / "enc.pem.new").exists() +``` + +- [ ] **Step 13.2: Run, verify FAIL** + +- [ ] **Step 13.3: Implement KeyRotationManager** + +Create `src/cerbero_mcp/exchanges/ibkr/key_rotation.py`: + +```python +"""IBKR RSA key rotation: stage/confirm/abort with auto-rollback.""" +from __future__ import annotations + +import datetime as _dt +import hashlib +import os +import shutil +from collections.abc import Awaitable, Callable +from dataclasses import dataclass, field +from pathlib import Path + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa + + +def _sha256_fingerprint(pem_path: Path) -> str: + digest = hashlib.sha256(pem_path.read_bytes()).hexdigest() + return f"SHA256:{digest}" + + +@dataclass +class KeyRotationManager: + signature_key_path: str + encryption_key_path: str + + _started: bool = field(default=False, init=False) + + def _sig(self) -> Path: + return Path(self.signature_key_path) + + def _enc(self) -> Path: + return Path(self.encryption_key_path) + + async def start(self) -> dict: + sig_new = self._sig().with_suffix(self._sig().suffix + ".new") + enc_new = self._enc().with_suffix(self._enc().suffix + ".new") + + for p in (sig_new, enc_new): + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + p.write_bytes(key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + )) + os.chmod(p, 0o600) + + self._started = True + return { + "fingerprints": { + "sig": _sha256_fingerprint(sig_new), + "enc": _sha256_fingerprint(enc_new), + }, + "expires_at": ( + _dt.datetime.now(_dt.UTC) + _dt.timedelta(hours=24) + ).isoformat(), + } + + async def confirm( + self, *, validate: Callable[[], Awaitable[bool]], + ) -> dict: + sig = self._sig() + enc = self._enc() + sig_new = sig.with_suffix(sig.suffix + ".new") + enc_new = enc.with_suffix(enc.suffix + ".new") + if not (sig_new.exists() and enc_new.exists()): + raise RuntimeError("IBKR_ROTATION_NOT_STARTED") + + archive = sig.parent / ".archive" / _dt.datetime.now(_dt.UTC).strftime("%Y%m%dT%H%M%S") + archive.mkdir(parents=True, exist_ok=True) + + # Atomic-ish swap + shutil.move(str(sig), str(archive / sig.name)) + shutil.move(str(enc), str(archive / enc.name)) + shutil.move(str(sig_new), str(sig)) + shutil.move(str(enc_new), str(enc)) + + try: + ok = await validate() + except Exception as e: + ok = False + err: BaseException | None = e + else: + err = None + + if not ok: + # Rollback + shutil.move(str(sig), str(sig.with_suffix(sig.suffix + ".new"))) + shutil.move(str(enc), str(enc.with_suffix(enc.suffix + ".new"))) + shutil.move(str(archive / sig.name), str(sig)) + shutil.move(str(archive / enc.name), str(enc)) + raise RuntimeError( + f"IBKR_ROTATION_VALIDATION_FAILED: {err}" if err + else "IBKR_ROTATION_VALIDATION_FAILED" + ) + + self._started = False + return { + "rotated_at": _dt.datetime.now(_dt.UTC).isoformat(), + "old_archived_at": str(archive), + } + + async def abort(self) -> dict: + sig_new = self._sig().with_suffix(self._sig().suffix + ".new") + enc_new = self._enc().with_suffix(self._enc().suffix + ".new") + for p in (sig_new, enc_new): + if p.exists(): + p.unlink() + self._started = False + return {"aborted": True} +``` + +- [ ] **Step 13.4: Run, verify PASS** + +Run: `uv run pytest tests/unit/exchanges/ibkr/test_key_rotation.py -v` +Expected: 4 PASS. + +- [ ] **Step 13.5: Commit** + +```bash +git add src/cerbero_mcp/exchanges/ibkr/key_rotation.py tests/unit/exchanges/ibkr/test_key_rotation.py +git commit -m "feat(V2): IBKR key rotation manager with auto-rollback + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 14: build_client integration + IBKR router + +**Files:** +- Modify: `src/cerbero_mcp/exchanges/__init__.py` +- Create: `src/cerbero_mcp/routers/ibkr.py` +- Modify: `src/cerbero_mcp/__main__.py` + +- [ ] **Step 14.1: Add IBKR branch to build_client** + +In `src/cerbero_mcp/exchanges/__init__.py`, before the final `raise ValueError`: + +```python + if exchange == "ibkr": + from cerbero_mcp.exchanges.ibkr.client import IBKRClient + from cerbero_mcp.exchanges.ibkr.oauth import OAuth1aSigner + + creds = settings.ibkr.credentials(env) + url = settings.ibkr.url_testnet if env == "testnet" else settings.ibkr.url_live + signer = OAuth1aSigner( + consumer_key=creds["consumer_key"], + access_token=creds["access_token"], + access_token_secret=creds["access_token_secret"], + signature_key_path=creds["signature_key_path"], + encryption_key_path=creds["encryption_key_path"], + dh_prime=creds["dh_prime"], + ) + return IBKRClient( + signer=signer, + account_id=creds["account_id"], + paper=(env == "testnet"), + base_url=url, + ) +``` + +- [ ] **Step 14.2: Create router** + +Create `src/cerbero_mcp/routers/ibkr.py`: + +```python +"""Router /mcp-ibkr/* — DI per env, client e (write) creds.""" +from __future__ import annotations + +from typing import Literal, cast + +from fastapi import APIRouter, Depends, Request + +from cerbero_mcp.client_registry import ClientRegistry +from cerbero_mcp.common.audit_helpers import audit_call +from cerbero_mcp.exchanges.ibkr import tools as t +from cerbero_mcp.exchanges.ibkr.client import IBKRClient +from cerbero_mcp.exchanges.ibkr.ws import IBKRWebSocket + +Environment = Literal["testnet", "mainnet"] + + +def get_environment(request: Request) -> Environment: + return cast(Environment, request.state.environment) + + +async def get_ibkr_client( + request: Request, env: Environment = Depends(get_environment), +) -> IBKRClient: + registry: ClientRegistry = request.app.state.registry + return cast(IBKRClient, await registry.get("ibkr", env)) + + +async def get_ibkr_ws( + request: Request, env: Environment = Depends(get_environment), +) -> IBKRWebSocket: + """Lazy-create singleton WS per env on first streaming call.""" + ws_dict = getattr(request.app.state, "ibkr_ws", None) + if ws_dict is None: + ws_dict = {} + request.app.state.ibkr_ws = ws_dict + if env not in ws_dict: + client = await get_ibkr_client(request, env) + settings = request.app.state.settings + ws_url = ( + settings.ibkr.ws_url_testnet if env == "testnet" + else settings.ibkr.ws_url_live + ) + ws = IBKRWebSocket( + signer=client.signer, + ws_url=ws_url, + base_url=client.base_url, + max_subs=settings.ibkr.ws_max_subscriptions, + idle_timeout_s=settings.ibkr.ws_idle_timeout_s, + ) + await ws.start() + ws_dict[env] = ws + return ws_dict[env] + + +def _build_creds(request: Request) -> dict: + settings = request.app.state.settings + return {"max_leverage": settings.ibkr.max_leverage} + + +def make_router() -> APIRouter: + r = APIRouter(prefix="/mcp-ibkr", tags=["ibkr"]) + + # === READ tools === + + @r.post("/tools/environment_info") + async def _ei(request: Request, client: IBKRClient = Depends(get_ibkr_client)): + return await t.environment_info(client, creds=_build_creds(request)) + + @r.post("/tools/get_account") + async def _ga(params: t.GetAccountReq, client: IBKRClient = Depends(get_ibkr_client)): + return await t.get_account(client, params) + + @r.post("/tools/get_positions") + async def _gp(params: t.GetPositionsReq, client: IBKRClient = Depends(get_ibkr_client)): + return await t.get_positions(client, params) + + @r.post("/tools/get_open_orders") + async def _goo(params: t.GetOpenOrdersReq, client: IBKRClient = Depends(get_ibkr_client)): + return await t.get_open_orders(client, params) + + @r.post("/tools/get_activities") + async def _gact(params: t.GetActivitiesReq, client: IBKRClient = Depends(get_ibkr_client)): + return await t.get_activities(client, params) + + @r.post("/tools/get_ticker") + async def _gt(params: t.GetTickerReq, client: IBKRClient = Depends(get_ibkr_client)): + return await t.get_ticker(client, params) + + @r.post("/tools/get_bars") + async def _gb(params: t.GetBarsReq, client: IBKRClient = Depends(get_ibkr_client)): + return await t.get_bars(client, params) + + @r.post("/tools/get_snapshot") + async def _gs(params: t.GetSnapshotReq, client: IBKRClient = Depends(get_ibkr_client)): + return await t.get_snapshot(client, params) + + @r.post("/tools/get_option_chain") + async def _goc(params: t.GetOptionChainReq, client: IBKRClient = Depends(get_ibkr_client)): + return await t.get_option_chain(client, params) + + @r.post("/tools/search_contracts") + async def _sc(params: t.SearchContractsReq, client: IBKRClient = Depends(get_ibkr_client)): + return await t.search_contracts(client, params) + + @r.post("/tools/get_clock") + async def _gc(params: t.GetClockReq, client: IBKRClient = Depends(get_ibkr_client)): + return await t.get_clock(client, params) + + # === STREAMING tools === + + @r.post("/tools/get_tick") + async def _gtk( + params: t.GetTickReq, + client: IBKRClient = Depends(get_ibkr_client), + ws: IBKRWebSocket = Depends(get_ibkr_ws), + ): + return await t.get_tick(client, params, ws=ws) + + @r.post("/tools/get_depth") + async def _gd( + params: t.GetDepthReq, + client: IBKRClient = Depends(get_ibkr_client), + ws: IBKRWebSocket = Depends(get_ibkr_ws), + ): + return await t.get_depth(client, params, ws=ws) + + @r.post("/tools/subscribe_tick") + async def _st( + params: t.SubscribeTickReq, + client: IBKRClient = Depends(get_ibkr_client), + ws: IBKRWebSocket = Depends(get_ibkr_ws), + ): + return await t.subscribe_tick(client, params, ws=ws) + + @r.post("/tools/unsubscribe") + async def _us( + params: t.UnsubscribeReq, + client: IBKRClient = Depends(get_ibkr_client), + ws: IBKRWebSocket = Depends(get_ibkr_ws), + ): + return await t.unsubscribe(client, params, ws=ws) + + # === WRITE simple === + + @r.post("/tools/place_order") + async def _po( + params: t.PlaceOrderReq, request: Request, + client: IBKRClient = Depends(get_ibkr_client), + ): + creds = _build_creds(request) + return await audit_call( + request=request, exchange="ibkr", action="place_order", + target_field="symbol", params=params, + tool_fn=lambda: t.place_order(client, params, creds=creds), + ) + + @r.post("/tools/amend_order") + async def _ao( + params: t.AmendOrderReq, request: Request, + client: IBKRClient = Depends(get_ibkr_client), + ): + return await audit_call( + request=request, exchange="ibkr", action="amend_order", + target_field="order_id", params=params, + tool_fn=lambda: t.amend_order(client, params), + ) + + @r.post("/tools/cancel_order") + async def _co( + params: t.CancelOrderReq, request: Request, + client: IBKRClient = Depends(get_ibkr_client), + ): + return await audit_call( + request=request, exchange="ibkr", action="cancel_order", + target_field="order_id", params=params, + tool_fn=lambda: t.cancel_order(client, params), + ) + + @r.post("/tools/cancel_all_orders") + async def _cao( + params: t.CancelAllOrdersReq, request: Request, + client: IBKRClient = Depends(get_ibkr_client), + ): + return await audit_call( + request=request, exchange="ibkr", action="cancel_all_orders", + params=params, tool_fn=lambda: t.cancel_all_orders(client, params), + ) + + @r.post("/tools/close_position") + async def _cp( + params: t.ClosePositionReq, request: Request, + client: IBKRClient = Depends(get_ibkr_client), + ): + return await audit_call( + request=request, exchange="ibkr", action="close_position", + target_field="symbol", params=params, + tool_fn=lambda: t.close_position(client, params), + ) + + @r.post("/tools/close_all_positions") + async def _cap( + params: t.CloseAllPositionsReq, request: Request, + client: IBKRClient = Depends(get_ibkr_client), + ): + return await audit_call( + request=request, exchange="ibkr", action="close_all_positions", + params=params, tool_fn=lambda: t.close_all_positions(client, params), + ) + + # === WRITE complex === + + @r.post("/tools/place_bracket_order") + async def _pbo( + params: t.PlaceBracketOrderReq, request: Request, + client: IBKRClient = Depends(get_ibkr_client), + ): + creds = _build_creds(request) + return await audit_call( + request=request, exchange="ibkr", action="place_bracket_order", + target_field="symbol", params=params, + tool_fn=lambda: t.place_bracket_order(client, params, creds=creds), + ) + + @r.post("/tools/place_oco_order") + async def _poco( + params: t.PlaceOcoOrderReq, request: Request, + client: IBKRClient = Depends(get_ibkr_client), + ): + creds = _build_creds(request) + return await audit_call( + request=request, exchange="ibkr", action="place_oco_order", + params=params, + tool_fn=lambda: t.place_oco_order(client, params, creds=creds), + ) + + @r.post("/tools/place_oto_order") + async def _poto( + params: t.PlaceOtoOrderReq, request: Request, + client: IBKRClient = Depends(get_ibkr_client), + ): + creds = _build_creds(request) + return await audit_call( + request=request, exchange="ibkr", action="place_oto_order", + params=params, + tool_fn=lambda: t.place_oto_order(client, params, creds=creds), + ) + + return r +``` + +- [ ] **Step 14.3: Register router in `__main__.py`** + +In `src/cerbero_mcp/__main__.py`: + +```python +from cerbero_mcp.routers import ( + alpaca, + bybit, + deribit, + hyperliquid, + ibkr, + macro, + sentiment, +) +``` + +And in `_make_app`: + +```python + app.include_router(ibkr.make_router()) +``` + +- [ ] **Step 14.4: Verify imports** + +Run: `uv run python -c "from cerbero_mcp.__main__ import _make_app; from cerbero_mcp.settings import Settings; import os; os.environ['TESTNET_TOKEN']='t'; os.environ['MAINNET_TOKEN']='m'; print('ok')"` +Expected: `ok` (or settings validation error if env not set — focus is import success). + +- [ ] **Step 14.5: Run full test suite** + +Run: `uv run pytest tests/unit/exchanges/ibkr/ tests/unit/test_settings.py -v` +Expected: all PASS. + +- [ ] **Step 14.6: Commit** + +```bash +git add src/cerbero_mcp/exchanges/__init__.py src/cerbero_mcp/routers/ibkr.py src/cerbero_mcp/__main__.py +git commit -m "feat(V2): IBKR router wiring + build_client + WS singleton DI + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 15: Admin endpoints — key rotation + IBKR health probe + +**Files:** +- Modify: `src/cerbero_mcp/admin.py` + +- [ ] **Step 15.1: Read existing admin module to see current patterns** + +Run: `cat src/cerbero_mcp/admin.py | head -50` + +- [ ] **Step 15.2: Add IBKR rotation endpoints** + +Append to `src/cerbero_mcp/admin.py` inside `make_admin_router()`: + +```python + from cerbero_mcp.exchanges.ibkr.key_rotation import KeyRotationManager + from pydantic import BaseModel + + class _RotateConfirmReq(BaseModel): + new_consumer_key: str + new_access_token: str + new_access_token_secret: str + + @r.post("/admin/ibkr/rotate-keys/start") + async def _ibkr_rotate_start(env: str, request: Request): + if env not in ("testnet", "mainnet"): + raise HTTPException(400, detail={"error": "invalid env"}) + settings = request.app.state.settings + creds = settings.ibkr.credentials(env) + mgr = KeyRotationManager( + signature_key_path=creds["signature_key_path"], + encryption_key_path=creds["encryption_key_path"], + ) + # Stash on app.state for confirm/abort continuity + rotations = getattr(request.app.state, "ibkr_rotations", None) + if rotations is None: + rotations = {} + request.app.state.ibkr_rotations = rotations + rotations[env] = mgr + return await mgr.start() + + @r.post("/admin/ibkr/rotate-keys/confirm") + async def _ibkr_rotate_confirm( + env: str, body: _RotateConfirmReq, request: Request, + ): + if env not in ("testnet", "mainnet"): + raise HTTPException(400, detail={"error": "invalid env"}) + rotations = getattr(request.app.state, "ibkr_rotations", {}) or {} + mgr = rotations.get(env) + if mgr is None: + raise HTTPException(409, detail={"error": "rotation not started"}) + + # Update settings in-memory before validation probe + settings = request.app.state.settings + if env == "testnet": + settings.ibkr.consumer_key_testnet = body.new_consumer_key + settings.ibkr.access_token_testnet = body.new_access_token + else: + settings.ibkr.consumer_key_live = body.new_consumer_key + settings.ibkr.access_token_live = body.new_access_token + # access_token_secret is SecretStr; assign via constructor wrapper + from pydantic import SecretStr as _SS + if env == "testnet": + settings.ibkr.access_token_secret_testnet = _SS(body.new_access_token_secret) + else: + settings.ibkr.access_token_secret_live = _SS(body.new_access_token_secret) + + # Invalidate cached client + registry = request.app.state.registry + registry._clients.pop(("ibkr", env), None) + + async def _validate() -> bool: + try: + client = await registry.get("ibkr", env) + await client._request("GET", "/iserver/auth/status", skip_tickle=True) + return True + except Exception: + return False + + try: + return await mgr.confirm(validate=_validate) + finally: + rotations.pop(env, None) + + @r.post("/admin/ibkr/rotate-keys/abort") + async def _ibkr_rotate_abort(env: str, request: Request): + rotations = getattr(request.app.state, "ibkr_rotations", {}) or {} + mgr = rotations.pop(env, None) + if mgr is None: + return {"aborted": False, "reason": "no rotation in progress"} + return await mgr.abort() + + @r.post("/admin/ibkr/health") + async def _ibkr_health(request: Request): + registry = request.app.state.registry + out = {} + for env in ("testnet", "mainnet"): + try: + client = await registry.get("ibkr", env) + status = await client._request( + "GET", "/iserver/auth/status", skip_tickle=True + ) + out[env] = {"healthy": True, "status": status} + except Exception as e: + out[env] = {"healthy": False, "error": str(e)[:200]} + return out +``` + +- [ ] **Step 15.3: Run full test suite** + +Run: `uv run pytest tests/unit/ -v` +Expected: existing tests still PASS, no regression. + +- [ ] **Step 15.4: Commit** + +```bash +git add src/cerbero_mcp/admin.py +git commit -m "feat(V2): IBKR key rotation admin endpoints + health probe + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Task 16: OAuth setup script + docker-compose secrets + README + +**Files:** +- Create: `scripts/ibkr_oauth_setup.py` +- Modify: `docker-compose.yml` +- Modify: `README.md` + +- [ ] **Step 16.1: Create setup script** + +Create `scripts/ibkr_oauth_setup.py`: + +```python +#!/usr/bin/env python3 +"""IBKR OAuth 1.0a Self-Service setup helper. + +Phases (run in order, providing flags as you progress): + 1. python scripts/ibkr_oauth_setup.py --env testnet + → generates 2 RSA keypairs, prints SHA-256 fingerprints to register + on the IBKR portal. + 2. (manual) Login at https://www.interactivebrokers.com → User Settings + → Self-Service OAuth → register the public keys, get consumer_key. + 3. python scripts/ibkr_oauth_setup.py --env testnet --consumer-key \\ + --request-token + → exchanges consumer_key for an unauthorized request token + URL. + 4. (manual) Open the URL, approve, copy the verifier code. + 5. python scripts/ibkr_oauth_setup.py --env testnet --verifier + → exchanges verifier for long-lived access_token + secret. + Copy the printed values into .env. + +Repeat for --env mainnet using your live IBKR account. +""" +from __future__ import annotations + +import argparse +import hashlib +import sys +from pathlib import Path + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa + + +def _gen_keypair(out: Path) -> str: + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + pem = key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + out.write_bytes(pem) + out.chmod(0o600) + pub = key.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + pub_path = out.with_suffix(out.suffix + ".pub") + pub_path.write_bytes(pub) + return f"SHA256:{hashlib.sha256(pub).hexdigest()}" + + +def cmd_init(env: str, secrets_dir: Path) -> int: + secrets_dir.mkdir(parents=True, exist_ok=True) + sig = secrets_dir / f"ibkr_signature_{env}.pem" + enc = secrets_dir / f"ibkr_encryption_{env}.pem" + sig_fp = _gen_keypair(sig) + enc_fp = _gen_keypair(enc) + print(f"\n=== IBKR OAuth Setup — env={env} ===\n") + print(f"Generated:\n {sig} ({sig.stat().st_size} bytes)") + print(f" {enc} ({enc.stat().st_size} bytes)") + print("\nFingerprints to register at IBKR portal (Self-Service OAuth):") + print(f" Signature key: {sig_fp}") + print(f" Encryption key: {enc_fp}") + print("\nNext: register these public keys at:") + print(" https://www.interactivebrokers.com (User Settings → OAuth)") + print(f"\nAlso paste in .env:") + print(f" IBKR_SIGNATURE_KEY_PATH_{env.upper()}={sig}") + print(f" IBKR_ENCRYPTION_KEY_PATH_{env.upper()}={enc}\n") + return 0 + + +def cmd_request_token(env: str, consumer_key: str) -> int: + print(f"\n=== Step 2 — request token for {env} ===\n") + print(f"Consumer key: {consumer_key}") + print( + "\nVisit this URL in a browser, log in to IBKR, authorize the app,\n" + "and copy the displayed verifier code:\n" + ) + print( + f" https://www.interactivebrokers.com/sso/Authenticator?" + f"oauth_consumer_key={consumer_key}&action=request_token\n" + ) + print("Then re-run with: --verifier \n") + return 0 + + +def cmd_verifier(env: str, verifier: str) -> int: + print(f"\n=== Step 3 — exchange verifier for {env} ===\n") + print(f"Verifier received: {verifier[:8]}...") + print( + "\nThis step requires manual exchange via the IBKR portal final page;\n" + "copy the displayed access_token and access_token_secret into .env:\n" + ) + print(f" IBKR_ACCESS_TOKEN_{env.upper()}=") + print(f" IBKR_ACCESS_TOKEN_SECRET_{env.upper()}=\n") + print("Also set:") + print(f" IBKR_CONSUMER_KEY_{env.upper()}=") + print(f" IBKR_DH_PRIME=\n") + return 0 + + +def main() -> int: + p = argparse.ArgumentParser(description=__doc__) + p.add_argument("--env", choices=["testnet", "mainnet"], required=True) + p.add_argument("--secrets-dir", default="secrets") + p.add_argument("--consumer-key") + p.add_argument("--request-token", action="store_true") + p.add_argument("--verifier") + p.add_argument("--rotate", action="store_true", + help="Generate new keypairs alongside existing (for rotation)") + args = p.parse_args() + + sec_dir = Path(args.secrets_dir) + if args.verifier: + return cmd_verifier(args.env, args.verifier) + if args.consumer_key and args.request_token: + return cmd_request_token(args.env, args.consumer_key) + if args.rotate: + # Generate .new alongside existing files; admin/ibkr/rotate-keys finalizes + for kind in ("signature", "encryption"): + cur = sec_dir / f"ibkr_{kind}_{args.env}.pem" + new = sec_dir / f"ibkr_{kind}_{args.env}.pem.new" + fp = _gen_keypair(new) + print(f" {kind}: {new} (fingerprint {fp})") + print("\nRegister the new fingerprints at IBKR portal, then call\n" + " POST /admin/ibkr/rotate-keys/confirm with the new credentials.") + return 0 + return cmd_init(args.env, sec_dir) + + +if __name__ == "__main__": + sys.exit(main()) +``` + +- [ ] **Step 16.2: Update docker-compose.yml** + +Edit `docker-compose.yml`, add to the `cerbero-mcp` service: + +```yaml + volumes: + - ./secrets:/secrets:ro +``` + +(Insert after the `env_file: .env` line and before `restart: unless-stopped`.) + +- [ ] **Step 16.3: Verify script runs without args showing help** + +Run: `uv run python scripts/ibkr_oauth_setup.py --help` +Expected: argparse help displayed without error. + +- [ ] **Step 16.4: Add IBKR section to README.md** + +Append to `README.md`: + +```markdown +## IBKR Setup + +IBKR uses OAuth 1.0a Self-Service for fully unattended runtime auth. Setup is +manual one-time per account (paper + live), then the container mints live +session tokens autonomously. + +### One-time setup + +1. Login to https://www.interactivebrokers.com → User Settings → Self-Service OAuth +2. Generate keypairs locally: + + ```bash + uv run python scripts/ibkr_oauth_setup.py --env testnet + ``` + + This writes RSA keys under `secrets/` and prints SHA-256 fingerprints. + +3. Register the two fingerprints in the IBKR portal. Receive a `consumer_key`. +4. Get a request token + authorization URL: + + ```bash + uv run python scripts/ibkr_oauth_setup.py --env testnet \ + --consumer-key --request-token + ``` + +5. Open the URL, authorize, copy the `verifier_code`. +6. Exchange verifier for long-lived access token (~5 years validity): + + ```bash + uv run python scripts/ibkr_oauth_setup.py --env testnet --verifier + ``` + +7. Copy the printed values into `.env`: + - `IBKR_CONSUMER_KEY_TESTNET` + - `IBKR_ACCESS_TOKEN_TESTNET` + - `IBKR_ACCESS_TOKEN_SECRET_TESTNET` + - `IBKR_SIGNATURE_KEY_PATH_TESTNET` + - `IBKR_ENCRYPTION_KEY_PATH_TESTNET` + - `IBKR_ACCOUNT_ID_TESTNET` (e.g., `DU1234567` for paper) + - `IBKR_DH_PRIME` (hex from portal; shared paper/live) +8. Repeat with `--env mainnet` for live trading. + +### Smoke test + +```bash +curl https://cerbero-mcp./mcp-ibkr/tools/get_account \ + -H "Authorization: Bearer " -X POST -d '{}' +``` + +### Key rotation + +```bash +# 1. Generate new keypairs alongside existing +uv run python scripts/ibkr_oauth_setup.py --env testnet --rotate + +# 2. Register new fingerprints in IBKR portal, get new consumer_key + tokens + +# 3. Confirm rotation (atomic swap with auto-rollback on validation fail) +curl -X POST "https://cerbero-mcp./admin/ibkr/rotate-keys/confirm?env=testnet" \ + -H "Authorization: Bearer " -H "Content-Type: application/json" \ + -d '{"new_consumer_key":"...","new_access_token":"...","new_access_token_secret":"..."}' +``` +``` + +- [ ] **Step 16.5: Verify everything builds** + +Run: `docker compose build` +Expected: build succeeds. + +- [ ] **Step 16.6: Final test run** + +Run: `uv run pytest tests/unit/ -v` +Expected: all tests PASS. + +- [ ] **Step 16.7: Commit** + +```bash +git add scripts/ibkr_oauth_setup.py docker-compose.yml README.md +git commit -m "feat(V2): IBKR OAuth setup script + docker secrets mount + docs + +Co-Authored-By: Claude Opus 4.7 (1M context) " +``` + +--- + +## Final verification gate + +- [ ] **Run full test suite:** `uv run pytest tests/unit/ -v` → all green +- [ ] **Lint:** `uv run ruff check src/cerbero_mcp/exchanges/ibkr/ src/cerbero_mcp/routers/ibkr.py` → no warnings +- [ ] **Settings load:** `uv run python -c "from cerbero_mcp.settings import Settings; Settings()"` → no validation error with sample `.env` +- [ ] **Build:** `docker compose build && docker compose up -d` → container healthy < 60s +- [ ] **Health probe:** `curl https://cerbero-mcp./health/ready -H "Authorization: Bearer "` → `ibkr` listed (paper account) +- [ ] **Smoke (manual, with real paper credentials):** + - `get_account` → returns paper account summary + - `get_ticker AAPL` → returns last/bid/ask + - `place_order AAPL buy 1 market` → order_id; cancel via `cancel_order` + - `place_bracket_order AAPL buy 1 entry=150 sl=145 tp=160` → 3 order_ids same OCA + - `get_depth AAPL rows=5` → 5-level book + - `POST /admin/ibkr/rotate-keys/start?env=testnet` → fingerprints returned +- [ ] **Rollback test:** revert any single feat commit; remaining tests still pass and other exchanges still function.