Files
Cerbero-mcp/tests/unit/exchanges/ibkr/test_oauth.py
T
root a90c5c4d6f refactor(V2): IBKR OAuth signer — type tightening + verify-based test
Code review polish:
- _signature_key/_encryption_key typed as RSAPrivateKey | None
- sign() uses assert instead of type: ignore
- test_oauth_signer_signs_with_rsa verifies signature against public key
- Clarifying comments on %3D/%26 manual encoding and Task 3 imports

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 20:12:48 +00:00

86 lines
3.2 KiB
Python

from __future__ import annotations
import base64 as _b64
from cerbero_mcp.exchanges.ibkr.oauth import (
OAuth1aSigner,
build_signature_base_string,
)
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
def test_signature_base_string_canonical_order():
base = build_signature_base_string(
method="POST",
url="https://api.ibkr.com/v1/api/oauth/live_session_token",
params={
"oauth_consumer_key": "TEST_CONSUMER",
"oauth_token": "TEST_TOKEN",
"oauth_nonce": "abc123",
"oauth_timestamp": "1700000000",
"oauth_signature_method": "RSA-SHA256",
"oauth_version": "1.0",
"diffie_hellman_challenge": "ff00",
},
)
assert base.startswith("POST&")
assert "oauth_consumer_key%3DTEST_CONSUMER" in base
idx_consumer = base.index("oauth_consumer_key")
idx_token = base.index("oauth_token")
assert idx_consumer < idx_token
def test_oauth_signer_signs_with_rsa(tmp_path):
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
pem = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
sig_path = tmp_path / "sig.pem"
sig_path.write_bytes(pem)
enc_path = tmp_path / "enc.pem"
enc_path.write_bytes(pem)
signer = OAuth1aSigner(
consumer_key="TEST_CONSUMER",
access_token="TEST_TOKEN",
access_token_secret="TEST_SECRET",
signature_key_path=str(sig_path),
encryption_key_path=str(enc_path),
dh_prime="FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3BE39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF6955817183995497CEA956AE515D2261898FA051015728E5A8AACAA68FFFFFFFFFFFFFFFF",
)
sig = signer.sign(
method="GET",
url="https://api.ibkr.com/v1/api/iserver/auth/status",
params={
"oauth_consumer_key": "TEST_CONSUMER",
"oauth_token": "TEST_TOKEN",
"oauth_nonce": "abc",
"oauth_timestamp": "1700000000",
"oauth_signature_method": "RSA-SHA256",
"oauth_version": "1.0",
},
)
# Verify signature against the public key — proves correctness, not just shape
base = build_signature_base_string(
method="GET",
url="https://api.ibkr.com/v1/api/iserver/auth/status",
params={
"oauth_consumer_key": "TEST_CONSUMER",
"oauth_token": "TEST_TOKEN",
"oauth_nonce": "abc",
"oauth_timestamp": "1700000000",
"oauth_signature_method": "RSA-SHA256",
"oauth_version": "1.0",
},
)
key.public_key().verify(
_b64.b64decode(sig),
base.encode("utf-8"),
padding.PKCS1v15(),
hashes.SHA256(),
)