feat(V2): IBKR live session token mint via DH key exchange

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root
2026-05-03 20:15:17 +00:00
parent a90c5c4d6f
commit 92da6aa842
2 changed files with 132 additions and 2 deletions
+84 -2
View File
@@ -5,8 +5,8 @@ Reference: https://www.interactivebrokers.com/api/doc.html (Self-Service OAuth)
from __future__ import annotations from __future__ import annotations
import base64 import base64
import hashlib # noqa: F401 -- used in Task 3 (DH live session token mint) import hashlib
import hmac # noqa: F401 -- used in Task 3 (DH live session token mint) import hmac
import secrets import secrets
import time import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
@@ -83,3 +83,85 @@ class OAuth1aSigner:
"oauth_signature_method": "RSA-SHA256", "oauth_signature_method": "RSA-SHA256",
"oauth_version": "1.0", "oauth_version": "1.0",
} }
async def get_live_session_token(self, *, base_url: str) -> str:
"""Restituisce LST cached, riminta se mancante o vicino a scadenza."""
if self._live_session_token and time.monotonic() < self._lst_expires_at:
return self._live_session_token
return await self._mint_live_session_token(base_url)
async def _mint_live_session_token(self, base_url: str) -> str:
"""DH key exchange + RSA-signed POST /oauth/live_session_token.
1. Generate random dh_random
2. Compute dh_challenge = 2^dh_random mod dh_prime
3. Decrypt access_token_secret via encryption RSA key
4. POST signed request with diffie_hellman_challenge
5. shared = dh_response^dh_random mod dh_prime
6. LST = HMAC-SHA1(shared, decrypted_secret), base64
"""
from cerbero_mcp.common.http import async_client
url = f"{base_url}/oauth/live_session_token"
prime = int(self.dh_prime, 16)
dh_random = secrets.randbits(256)
dh_challenge = pow(2, dh_random, prime)
dh_challenge_hex = format(dh_challenge, "x")
try:
assert self._encryption_key is not None
encrypted = bytes.fromhex(self.access_token_secret)
decrypted_secret = self._encryption_key.decrypt(
encrypted, padding.PKCS1v15()
)
except Exception as e:
raise IBKRAuthError(f"access_token_secret decrypt failed: {e}") from e
oauth_params = self.make_oauth_params()
oauth_params["diffie_hellman_challenge"] = dh_challenge_hex
signature = self.sign("POST", url, oauth_params)
oauth_params["oauth_signature"] = signature
auth_header = "OAuth " + ", ".join(
f'{k}="{_percent_encode(v)}"' for k, v in sorted(oauth_params.items())
)
async with async_client(timeout=15.0) as http:
resp = await http.post(
url,
headers={"Authorization": auth_header, "User-Agent": "cerbero-mcp/2.0"},
)
if resp.status_code != 200:
raise IBKRAuthError(
f"LST mint failed status={resp.status_code} body={resp.text[:300]}"
)
data = resp.json()
dh_response = int(data["diffie_hellman_response"], 16)
expires_ms = data.get("live_session_token_expiration", 0)
shared = pow(dh_response, dh_random, prime)
shared_bytes = shared.to_bytes((shared.bit_length() + 7) // 8, "big")
if shared_bytes and shared_bytes[0] & 0x80:
shared_bytes = b"\x00" + shared_bytes
lst_raw = hmac.new(shared_bytes, decrypted_secret, hashlib.sha1).digest()
lst = base64.b64encode(lst_raw).decode("ascii")
self._live_session_token = lst
ttl = max(60.0, expires_ms / 1000 - time.time() - 32400) if expires_ms else 54000.0
self._lst_expires_at = time.monotonic() + ttl
return lst
def sign_with_lst(self, method: str, url: str, params: dict[str, str]) -> str:
"""Firma HMAC-SHA256 con LST come key (per request post-mint)."""
if not self._live_session_token:
raise IBKRAuthError("LST not minted yet; call get_live_session_token first")
base = build_signature_base_string(method, url, params)
lst_bytes = base64.b64decode(self._live_session_token)
sig = hmac.new(lst_bytes, base.encode("utf-8"), hashlib.sha256).digest()
return base64.b64encode(sig).decode("ascii")
class IBKRAuthError(Exception):
"""OAuth flow failed (key invalid, consumer revoked, mint failed)."""
+48
View File
@@ -1,13 +1,17 @@
from __future__ import annotations from __future__ import annotations
import base64 as _b64 import base64 as _b64
import re
import time
import pytest
from cerbero_mcp.exchanges.ibkr.oauth import ( from cerbero_mcp.exchanges.ibkr.oauth import (
OAuth1aSigner, OAuth1aSigner,
build_signature_base_string, build_signature_base_string,
) )
from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa from cryptography.hazmat.primitives.asymmetric import padding, rsa
from pytest_httpx import HTTPXMock
def test_signature_base_string_canonical_order(): def test_signature_base_string_canonical_order():
@@ -83,3 +87,47 @@ def test_oauth_signer_signs_with_rsa(tmp_path):
padding.PKCS1v15(), padding.PKCS1v15(),
hashes.SHA256(), hashes.SHA256(),
) )
@pytest.mark.asyncio
async def test_live_session_token_mint(httpx_mock: HTTPXMock, tmp_path):
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
pem = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
(tmp_path / "sig.pem").write_bytes(pem)
(tmp_path / "enc.pem").write_bytes(pem)
# Encrypt fake "real" secret with our public key
raw_secret = b"my_real_token_secret"
encrypted = key.public_key().encrypt(raw_secret, padding.PKCS1v15())
encrypted_hex = encrypted.hex()
httpx_mock.add_response(
url=re.compile(r".*/oauth/live_session_token"),
json={
"diffie_hellman_response": "ff",
"live_session_token_signature": "00" * 20,
"live_session_token_expiration": int(time.time() * 1000) + 86400000,
},
)
from cerbero_mcp.exchanges.ibkr.oauth import OAuth1aSigner
signer = OAuth1aSigner(
consumer_key="TEST_CK",
access_token="TEST_AT",
access_token_secret=encrypted_hex,
signature_key_path=str(tmp_path / "sig.pem"),
encryption_key_path=str(tmp_path / "enc.pem"),
dh_prime="ff",
)
lst = await signer.get_live_session_token(
base_url="https://api.ibkr.com/v1/api"
)
assert isinstance(lst, str) and len(lst) > 0
lst2 = await signer.get_live_session_token(
base_url="https://api.ibkr.com/v1/api"
)
assert lst == lst2 # cached