From ae63aaf69acbf374d5591764946647574f31e8e4 Mon Sep 17 00:00:00 2001 From: root Date: Sun, 3 May 2026 20:08:15 +0000 Subject: [PATCH] feat(V2): IBKR OAuth1a signer + RSA-SHA256 signature Co-Authored-By: Claude Opus 4.7 (1M context) --- src/cerbero_mcp/exchanges/ibkr/__init__.py | 0 src/cerbero_mcp/exchanges/ibkr/oauth.py | 82 ++++++++++++++++++++++ tests/unit/exchanges/ibkr/__init__.py | 0 tests/unit/exchanges/ibkr/test_oauth.py | 62 ++++++++++++++++ 4 files changed, 144 insertions(+) create mode 100644 src/cerbero_mcp/exchanges/ibkr/__init__.py create mode 100644 src/cerbero_mcp/exchanges/ibkr/oauth.py create mode 100644 tests/unit/exchanges/ibkr/__init__.py create mode 100644 tests/unit/exchanges/ibkr/test_oauth.py diff --git a/src/cerbero_mcp/exchanges/ibkr/__init__.py b/src/cerbero_mcp/exchanges/ibkr/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/cerbero_mcp/exchanges/ibkr/oauth.py b/src/cerbero_mcp/exchanges/ibkr/oauth.py new file mode 100644 index 0000000..f8b462f --- /dev/null +++ b/src/cerbero_mcp/exchanges/ibkr/oauth.py @@ -0,0 +1,82 @@ +"""OAuth 1.0a Self-Service signer for IBKR Client Portal Web API. + +Reference: https://www.interactivebrokers.com/api/doc.html (Self-Service OAuth) +""" +from __future__ import annotations + +import base64 +import hashlib # noqa: F401 +import hmac # noqa: F401 +import secrets +import time +from dataclasses import dataclass, field +from urllib.parse import quote + +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding + + +def _percent_encode(value: str) -> str: + """RFC 3986 percent-encoding for OAuth (no `+` for space).""" + return quote(str(value), safe="") + + +def build_signature_base_string( + method: str, url: str, params: dict[str, str] +) -> str: + """Costruisce signature base string OAuth 1.0a: + `&&` + """ + sorted_params = sorted(params.items()) + encoded_pairs = [ + f"{_percent_encode(k)}%3D{_percent_encode(v)}" + for k, v in sorted_params + ] + params_str = "%26".join(encoded_pairs) + return f"{method.upper()}&{_percent_encode(url)}&{params_str}" + + +@dataclass +class OAuth1aSigner: + consumer_key: str + access_token: str + access_token_secret: str + signature_key_path: str + encryption_key_path: str + dh_prime: str # hex string + + _signature_key: object = field(default=None, init=False, repr=False) + _encryption_key: object = field(default=None, init=False, repr=False) + _live_session_token: str | None = field(default=None, init=False, repr=False) + _lst_expires_at: float = field(default=0.0, init=False, repr=False) + + def __post_init__(self) -> None: + with open(self.signature_key_path, "rb") as f: + self._signature_key = serialization.load_pem_private_key( + f.read(), password=None + ) + with open(self.encryption_key_path, "rb") as f: + self._encryption_key = serialization.load_pem_private_key( + f.read(), password=None + ) + + def sign(self, method: str, url: str, params: dict[str, str]) -> str: + """Firma RSA-SHA256 della signature base string. Ritorna base64.""" + base = build_signature_base_string(method, url, params) + signature = self._signature_key.sign( # type: ignore[attr-defined] + base.encode("utf-8"), + padding.PKCS1v15(), + hashes.SHA256(), + ) + return base64.b64encode(signature).decode("ascii") + + def make_oauth_params(self) -> dict[str, str]: + """Genera oauth_nonce/timestamp/version standard.""" + return { + "oauth_consumer_key": self.consumer_key, + "oauth_token": self.access_token, + "oauth_nonce": secrets.token_hex(16), + "oauth_timestamp": str(int(time.time())), + "oauth_signature_method": "RSA-SHA256", + "oauth_version": "1.0", + } diff --git a/tests/unit/exchanges/ibkr/__init__.py b/tests/unit/exchanges/ibkr/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/exchanges/ibkr/test_oauth.py b/tests/unit/exchanges/ibkr/test_oauth.py new file mode 100644 index 0000000..f652105 --- /dev/null +++ b/tests/unit/exchanges/ibkr/test_oauth.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +from cerbero_mcp.exchanges.ibkr.oauth import OAuth1aSigner, build_signature_base_string +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa # noqa: I001 + + +def test_signature_base_string_canonical_order(): + base = build_signature_base_string( + method="POST", + url="https://api.ibkr.com/v1/api/oauth/live_session_token", + params={ + "oauth_consumer_key": "TEST_CONSUMER", + "oauth_token": "TEST_TOKEN", + "oauth_nonce": "abc123", + "oauth_timestamp": "1700000000", + "oauth_signature_method": "RSA-SHA256", + "oauth_version": "1.0", + "diffie_hellman_challenge": "ff00", + }, + ) + assert base.startswith("POST&") + assert "oauth_consumer_key%3DTEST_CONSUMER" in base + idx_consumer = base.index("oauth_consumer_key") + idx_token = base.index("oauth_token") + assert idx_consumer < idx_token + + +def test_oauth_signer_signs_with_rsa(tmp_path): + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + pem = key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + sig_path = tmp_path / "sig.pem" + sig_path.write_bytes(pem) + enc_path = tmp_path / "enc.pem" + enc_path.write_bytes(pem) + + signer = OAuth1aSigner( + consumer_key="TEST_CONSUMER", + access_token="TEST_TOKEN", + access_token_secret="TEST_SECRET", + signature_key_path=str(sig_path), + encryption_key_path=str(enc_path), + dh_prime="FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3BE39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF6955817183995497CEA956AE515D2261898FA051015728E5A8AACAA68FFFFFFFFFFFFFFFF", + ) + sig = signer.sign( + method="GET", + url="https://api.ibkr.com/v1/api/iserver/auth/status", + params={ + "oauth_consumer_key": "TEST_CONSUMER", + "oauth_token": "TEST_TOKEN", + "oauth_nonce": "abc", + "oauth_timestamp": "1700000000", + "oauth_signature_method": "RSA-SHA256", + "oauth_version": "1.0", + }, + ) + assert isinstance(sig, str) + assert len(sig) > 100