feat(V2): IBKR live session token mint via DH key exchange
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -5,8 +5,8 @@ Reference: https://www.interactivebrokers.com/api/doc.html (Self-Service OAuth)
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import base64
|
import base64
|
||||||
import hashlib # noqa: F401 -- used in Task 3 (DH live session token mint)
|
import hashlib
|
||||||
import hmac # noqa: F401 -- used in Task 3 (DH live session token mint)
|
import hmac
|
||||||
import secrets
|
import secrets
|
||||||
import time
|
import time
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
@@ -83,3 +83,85 @@ class OAuth1aSigner:
|
|||||||
"oauth_signature_method": "RSA-SHA256",
|
"oauth_signature_method": "RSA-SHA256",
|
||||||
"oauth_version": "1.0",
|
"oauth_version": "1.0",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async def get_live_session_token(self, *, base_url: str) -> str:
|
||||||
|
"""Restituisce LST cached, riminta se mancante o vicino a scadenza."""
|
||||||
|
if self._live_session_token and time.monotonic() < self._lst_expires_at:
|
||||||
|
return self._live_session_token
|
||||||
|
return await self._mint_live_session_token(base_url)
|
||||||
|
|
||||||
|
async def _mint_live_session_token(self, base_url: str) -> str:
|
||||||
|
"""DH key exchange + RSA-signed POST /oauth/live_session_token.
|
||||||
|
|
||||||
|
1. Generate random dh_random
|
||||||
|
2. Compute dh_challenge = 2^dh_random mod dh_prime
|
||||||
|
3. Decrypt access_token_secret via encryption RSA key
|
||||||
|
4. POST signed request with diffie_hellman_challenge
|
||||||
|
5. shared = dh_response^dh_random mod dh_prime
|
||||||
|
6. LST = HMAC-SHA1(shared, decrypted_secret), base64
|
||||||
|
"""
|
||||||
|
from cerbero_mcp.common.http import async_client
|
||||||
|
|
||||||
|
url = f"{base_url}/oauth/live_session_token"
|
||||||
|
|
||||||
|
prime = int(self.dh_prime, 16)
|
||||||
|
dh_random = secrets.randbits(256)
|
||||||
|
dh_challenge = pow(2, dh_random, prime)
|
||||||
|
dh_challenge_hex = format(dh_challenge, "x")
|
||||||
|
|
||||||
|
try:
|
||||||
|
assert self._encryption_key is not None
|
||||||
|
encrypted = bytes.fromhex(self.access_token_secret)
|
||||||
|
decrypted_secret = self._encryption_key.decrypt(
|
||||||
|
encrypted, padding.PKCS1v15()
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
raise IBKRAuthError(f"access_token_secret decrypt failed: {e}") from e
|
||||||
|
|
||||||
|
oauth_params = self.make_oauth_params()
|
||||||
|
oauth_params["diffie_hellman_challenge"] = dh_challenge_hex
|
||||||
|
signature = self.sign("POST", url, oauth_params)
|
||||||
|
oauth_params["oauth_signature"] = signature
|
||||||
|
|
||||||
|
auth_header = "OAuth " + ", ".join(
|
||||||
|
f'{k}="{_percent_encode(v)}"' for k, v in sorted(oauth_params.items())
|
||||||
|
)
|
||||||
|
|
||||||
|
async with async_client(timeout=15.0) as http:
|
||||||
|
resp = await http.post(
|
||||||
|
url,
|
||||||
|
headers={"Authorization": auth_header, "User-Agent": "cerbero-mcp/2.0"},
|
||||||
|
)
|
||||||
|
if resp.status_code != 200:
|
||||||
|
raise IBKRAuthError(
|
||||||
|
f"LST mint failed status={resp.status_code} body={resp.text[:300]}"
|
||||||
|
)
|
||||||
|
data = resp.json()
|
||||||
|
dh_response = int(data["diffie_hellman_response"], 16)
|
||||||
|
expires_ms = data.get("live_session_token_expiration", 0)
|
||||||
|
|
||||||
|
shared = pow(dh_response, dh_random, prime)
|
||||||
|
shared_bytes = shared.to_bytes((shared.bit_length() + 7) // 8, "big")
|
||||||
|
if shared_bytes and shared_bytes[0] & 0x80:
|
||||||
|
shared_bytes = b"\x00" + shared_bytes
|
||||||
|
|
||||||
|
lst_raw = hmac.new(shared_bytes, decrypted_secret, hashlib.sha1).digest()
|
||||||
|
lst = base64.b64encode(lst_raw).decode("ascii")
|
||||||
|
|
||||||
|
self._live_session_token = lst
|
||||||
|
ttl = max(60.0, expires_ms / 1000 - time.time() - 32400) if expires_ms else 54000.0
|
||||||
|
self._lst_expires_at = time.monotonic() + ttl
|
||||||
|
return lst
|
||||||
|
|
||||||
|
def sign_with_lst(self, method: str, url: str, params: dict[str, str]) -> str:
|
||||||
|
"""Firma HMAC-SHA256 con LST come key (per request post-mint)."""
|
||||||
|
if not self._live_session_token:
|
||||||
|
raise IBKRAuthError("LST not minted yet; call get_live_session_token first")
|
||||||
|
base = build_signature_base_string(method, url, params)
|
||||||
|
lst_bytes = base64.b64decode(self._live_session_token)
|
||||||
|
sig = hmac.new(lst_bytes, base.encode("utf-8"), hashlib.sha256).digest()
|
||||||
|
return base64.b64encode(sig).decode("ascii")
|
||||||
|
|
||||||
|
|
||||||
|
class IBKRAuthError(Exception):
|
||||||
|
"""OAuth flow failed (key invalid, consumer revoked, mint failed)."""
|
||||||
|
|||||||
@@ -1,13 +1,17 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import base64 as _b64
|
import base64 as _b64
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
|
||||||
|
import pytest
|
||||||
from cerbero_mcp.exchanges.ibkr.oauth import (
|
from cerbero_mcp.exchanges.ibkr.oauth import (
|
||||||
OAuth1aSigner,
|
OAuth1aSigner,
|
||||||
build_signature_base_string,
|
build_signature_base_string,
|
||||||
)
|
)
|
||||||
from cryptography.hazmat.primitives import hashes, serialization
|
from cryptography.hazmat.primitives import hashes, serialization
|
||||||
from cryptography.hazmat.primitives.asymmetric import padding, rsa
|
from cryptography.hazmat.primitives.asymmetric import padding, rsa
|
||||||
|
from pytest_httpx import HTTPXMock
|
||||||
|
|
||||||
|
|
||||||
def test_signature_base_string_canonical_order():
|
def test_signature_base_string_canonical_order():
|
||||||
@@ -83,3 +87,47 @@ def test_oauth_signer_signs_with_rsa(tmp_path):
|
|||||||
padding.PKCS1v15(),
|
padding.PKCS1v15(),
|
||||||
hashes.SHA256(),
|
hashes.SHA256(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_live_session_token_mint(httpx_mock: HTTPXMock, tmp_path):
|
||||||
|
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
||||||
|
pem = key.private_bytes(
|
||||||
|
encoding=serialization.Encoding.PEM,
|
||||||
|
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||||
|
encryption_algorithm=serialization.NoEncryption(),
|
||||||
|
)
|
||||||
|
(tmp_path / "sig.pem").write_bytes(pem)
|
||||||
|
(tmp_path / "enc.pem").write_bytes(pem)
|
||||||
|
|
||||||
|
# Encrypt fake "real" secret with our public key
|
||||||
|
raw_secret = b"my_real_token_secret"
|
||||||
|
encrypted = key.public_key().encrypt(raw_secret, padding.PKCS1v15())
|
||||||
|
encrypted_hex = encrypted.hex()
|
||||||
|
|
||||||
|
httpx_mock.add_response(
|
||||||
|
url=re.compile(r".*/oauth/live_session_token"),
|
||||||
|
json={
|
||||||
|
"diffie_hellman_response": "ff",
|
||||||
|
"live_session_token_signature": "00" * 20,
|
||||||
|
"live_session_token_expiration": int(time.time() * 1000) + 86400000,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
from cerbero_mcp.exchanges.ibkr.oauth import OAuth1aSigner
|
||||||
|
signer = OAuth1aSigner(
|
||||||
|
consumer_key="TEST_CK",
|
||||||
|
access_token="TEST_AT",
|
||||||
|
access_token_secret=encrypted_hex,
|
||||||
|
signature_key_path=str(tmp_path / "sig.pem"),
|
||||||
|
encryption_key_path=str(tmp_path / "enc.pem"),
|
||||||
|
dh_prime="ff",
|
||||||
|
)
|
||||||
|
lst = await signer.get_live_session_token(
|
||||||
|
base_url="https://api.ibkr.com/v1/api"
|
||||||
|
)
|
||||||
|
assert isinstance(lst, str) and len(lst) > 0
|
||||||
|
lst2 = await signer.get_live_session_token(
|
||||||
|
base_url="https://api.ibkr.com/v1/api"
|
||||||
|
)
|
||||||
|
assert lst == lst2 # cached
|
||||||
|
|||||||
Reference in New Issue
Block a user