diff --git a/src/cerbero_mcp/exchanges/ibkr/oauth.py b/src/cerbero_mcp/exchanges/ibkr/oauth.py index 56245e5..11fb4b4 100644 --- a/src/cerbero_mcp/exchanges/ibkr/oauth.py +++ b/src/cerbero_mcp/exchanges/ibkr/oauth.py @@ -5,8 +5,8 @@ Reference: https://www.interactivebrokers.com/api/doc.html (Self-Service OAuth) from __future__ import annotations import base64 -import hashlib # noqa: F401 -- used in Task 3 (DH live session token mint) -import hmac # noqa: F401 -- used in Task 3 (DH live session token mint) +import hashlib +import hmac import secrets import time from dataclasses import dataclass, field @@ -83,3 +83,85 @@ class OAuth1aSigner: "oauth_signature_method": "RSA-SHA256", "oauth_version": "1.0", } + + async def get_live_session_token(self, *, base_url: str) -> str: + """Restituisce LST cached, riminta se mancante o vicino a scadenza.""" + if self._live_session_token and time.monotonic() < self._lst_expires_at: + return self._live_session_token + return await self._mint_live_session_token(base_url) + + async def _mint_live_session_token(self, base_url: str) -> str: + """DH key exchange + RSA-signed POST /oauth/live_session_token. + + 1. Generate random dh_random + 2. Compute dh_challenge = 2^dh_random mod dh_prime + 3. Decrypt access_token_secret via encryption RSA key + 4. POST signed request with diffie_hellman_challenge + 5. shared = dh_response^dh_random mod dh_prime + 6. LST = HMAC-SHA1(shared, decrypted_secret), base64 + """ + from cerbero_mcp.common.http import async_client + + url = f"{base_url}/oauth/live_session_token" + + prime = int(self.dh_prime, 16) + dh_random = secrets.randbits(256) + dh_challenge = pow(2, dh_random, prime) + dh_challenge_hex = format(dh_challenge, "x") + + try: + assert self._encryption_key is not None + encrypted = bytes.fromhex(self.access_token_secret) + decrypted_secret = self._encryption_key.decrypt( + encrypted, padding.PKCS1v15() + ) + except Exception as e: + raise IBKRAuthError(f"access_token_secret decrypt failed: {e}") from e + + oauth_params = self.make_oauth_params() + oauth_params["diffie_hellman_challenge"] = dh_challenge_hex + signature = self.sign("POST", url, oauth_params) + oauth_params["oauth_signature"] = signature + + auth_header = "OAuth " + ", ".join( + f'{k}="{_percent_encode(v)}"' for k, v in sorted(oauth_params.items()) + ) + + async with async_client(timeout=15.0) as http: + resp = await http.post( + url, + headers={"Authorization": auth_header, "User-Agent": "cerbero-mcp/2.0"}, + ) + if resp.status_code != 200: + raise IBKRAuthError( + f"LST mint failed status={resp.status_code} body={resp.text[:300]}" + ) + data = resp.json() + dh_response = int(data["diffie_hellman_response"], 16) + expires_ms = data.get("live_session_token_expiration", 0) + + shared = pow(dh_response, dh_random, prime) + shared_bytes = shared.to_bytes((shared.bit_length() + 7) // 8, "big") + if shared_bytes and shared_bytes[0] & 0x80: + shared_bytes = b"\x00" + shared_bytes + + lst_raw = hmac.new(shared_bytes, decrypted_secret, hashlib.sha1).digest() + lst = base64.b64encode(lst_raw).decode("ascii") + + self._live_session_token = lst + ttl = max(60.0, expires_ms / 1000 - time.time() - 32400) if expires_ms else 54000.0 + self._lst_expires_at = time.monotonic() + ttl + return lst + + def sign_with_lst(self, method: str, url: str, params: dict[str, str]) -> str: + """Firma HMAC-SHA256 con LST come key (per request post-mint).""" + if not self._live_session_token: + raise IBKRAuthError("LST not minted yet; call get_live_session_token first") + base = build_signature_base_string(method, url, params) + lst_bytes = base64.b64decode(self._live_session_token) + sig = hmac.new(lst_bytes, base.encode("utf-8"), hashlib.sha256).digest() + return base64.b64encode(sig).decode("ascii") + + +class IBKRAuthError(Exception): + """OAuth flow failed (key invalid, consumer revoked, mint failed).""" diff --git a/tests/unit/exchanges/ibkr/test_oauth.py b/tests/unit/exchanges/ibkr/test_oauth.py index 303309c..2cee491 100644 --- a/tests/unit/exchanges/ibkr/test_oauth.py +++ b/tests/unit/exchanges/ibkr/test_oauth.py @@ -1,13 +1,17 @@ from __future__ import annotations import base64 as _b64 +import re +import time +import pytest from cerbero_mcp.exchanges.ibkr.oauth import ( OAuth1aSigner, build_signature_base_string, ) from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding, rsa +from pytest_httpx import HTTPXMock def test_signature_base_string_canonical_order(): @@ -83,3 +87,47 @@ def test_oauth_signer_signs_with_rsa(tmp_path): padding.PKCS1v15(), hashes.SHA256(), ) + + +@pytest.mark.asyncio +async def test_live_session_token_mint(httpx_mock: HTTPXMock, tmp_path): + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + pem = key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + (tmp_path / "sig.pem").write_bytes(pem) + (tmp_path / "enc.pem").write_bytes(pem) + + # Encrypt fake "real" secret with our public key + raw_secret = b"my_real_token_secret" + encrypted = key.public_key().encrypt(raw_secret, padding.PKCS1v15()) + encrypted_hex = encrypted.hex() + + httpx_mock.add_response( + url=re.compile(r".*/oauth/live_session_token"), + json={ + "diffie_hellman_response": "ff", + "live_session_token_signature": "00" * 20, + "live_session_token_expiration": int(time.time() * 1000) + 86400000, + }, + ) + + from cerbero_mcp.exchanges.ibkr.oauth import OAuth1aSigner + signer = OAuth1aSigner( + consumer_key="TEST_CK", + access_token="TEST_AT", + access_token_secret=encrypted_hex, + signature_key_path=str(tmp_path / "sig.pem"), + encryption_key_path=str(tmp_path / "enc.pem"), + dh_prime="ff", + ) + lst = await signer.get_live_session_token( + base_url="https://api.ibkr.com/v1/api" + ) + assert isinstance(lst, str) and len(lst) > 0 + lst2 = await signer.get_live_session_token( + base_url="https://api.ibkr.com/v1/api" + ) + assert lst == lst2 # cached