diff --git a/src/cerbero_mcp/exchanges/ibkr/oauth.py b/src/cerbero_mcp/exchanges/ibkr/oauth.py index f8b462f..56245e5 100644 --- a/src/cerbero_mcp/exchanges/ibkr/oauth.py +++ b/src/cerbero_mcp/exchanges/ibkr/oauth.py @@ -5,8 +5,8 @@ Reference: https://www.interactivebrokers.com/api/doc.html (Self-Service OAuth) from __future__ import annotations import base64 -import hashlib # noqa: F401 -import hmac # noqa: F401 +import hashlib # noqa: F401 -- used in Task 3 (DH live session token mint) +import hmac # noqa: F401 -- used in Task 3 (DH live session token mint) import secrets import time from dataclasses import dataclass, field @@ -14,6 +14,7 @@ from urllib.parse import quote from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey def _percent_encode(value: str) -> str: @@ -32,6 +33,7 @@ def build_signature_base_string( f"{_percent_encode(k)}%3D{_percent_encode(v)}" for k, v in sorted_params ] + # Manual %3D / %26 = double-encoding of '=' and '&' per RFC 5849 §3.4.1.3.2 params_str = "%26".join(encoded_pairs) return f"{method.upper()}&{_percent_encode(url)}&{params_str}" @@ -45,8 +47,8 @@ class OAuth1aSigner: encryption_key_path: str dh_prime: str # hex string - _signature_key: object = field(default=None, init=False, repr=False) - _encryption_key: object = field(default=None, init=False, repr=False) + _signature_key: RSAPrivateKey | None = field(default=None, init=False, repr=False) + _encryption_key: RSAPrivateKey | None = field(default=None, init=False, repr=False) _live_session_token: str | None = field(default=None, init=False, repr=False) _lst_expires_at: float = field(default=0.0, init=False, repr=False) @@ -62,8 +64,9 @@ class OAuth1aSigner: def sign(self, method: str, url: str, params: dict[str, str]) -> str: """Firma RSA-SHA256 della signature base string. Ritorna base64.""" + assert self._signature_key is not None # set in __post_init__ base = build_signature_base_string(method, url, params) - signature = self._signature_key.sign( # type: ignore[attr-defined] + signature = self._signature_key.sign( base.encode("utf-8"), padding.PKCS1v15(), hashes.SHA256(), diff --git a/tests/unit/exchanges/ibkr/test_oauth.py b/tests/unit/exchanges/ibkr/test_oauth.py index f652105..303309c 100644 --- a/tests/unit/exchanges/ibkr/test_oauth.py +++ b/tests/unit/exchanges/ibkr/test_oauth.py @@ -1,8 +1,13 @@ from __future__ import annotations -from cerbero_mcp.exchanges.ibkr.oauth import OAuth1aSigner, build_signature_base_string -from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import rsa # noqa: I001 +import base64 as _b64 + +from cerbero_mcp.exchanges.ibkr.oauth import ( + OAuth1aSigner, + build_signature_base_string, +) +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding, rsa def test_signature_base_string_canonical_order(): @@ -58,5 +63,23 @@ def test_oauth_signer_signs_with_rsa(tmp_path): "oauth_version": "1.0", }, ) - assert isinstance(sig, str) - assert len(sig) > 100 + + # Verify signature against the public key — proves correctness, not just shape + base = build_signature_base_string( + method="GET", + url="https://api.ibkr.com/v1/api/iserver/auth/status", + params={ + "oauth_consumer_key": "TEST_CONSUMER", + "oauth_token": "TEST_TOKEN", + "oauth_nonce": "abc", + "oauth_timestamp": "1700000000", + "oauth_signature_method": "RSA-SHA256", + "oauth_version": "1.0", + }, + ) + key.public_key().verify( + _b64.b64decode(sig), + base.encode("utf-8"), + padding.PKCS1v15(), + hashes.SHA256(), + )