b49b2b36e0
Code review fixes (commit 92da6aa):
- LST refresh buffer / fallback TTL extracted as named module constants
- Replace `assert` with explicit `if/raise` (asserts stripped under -O)
- Move IBKRAuthError above OAuth1aSigner (forward declaration)
- async_client import lifted to module level
- Test uses actual prime (23) instead of composite (255)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
133 lines
4.8 KiB
Python
133 lines
4.8 KiB
Python
from __future__ import annotations
|
|
|
|
import base64 as _b64
|
|
import re
|
|
import time
|
|
|
|
import pytest
|
|
from cerbero_mcp.exchanges.ibkr.oauth import (
|
|
OAuth1aSigner,
|
|
build_signature_base_string,
|
|
)
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import padding, rsa
|
|
from pytest_httpx import HTTPXMock
|
|
|
|
|
|
def test_signature_base_string_canonical_order():
    """Check the base string's method prefix, encoding and parameter ordering.

    The parameters are passed in non-alphabetical order on purpose: an
    ordering assertion that already holds in insertion order would pass
    even if the builder never sorted anything.
    """
    base = build_signature_base_string(
        method="POST",
        url="https://api.ibkr.com/v1/api/oauth/live_session_token",
        params={
            "oauth_consumer_key": "TEST_CONSUMER",
            "oauth_token": "TEST_TOKEN",
            "oauth_nonce": "abc123",
            "oauth_timestamp": "1700000000",
            "oauth_signature_method": "RSA-SHA256",
            "oauth_version": "1.0",
            "diffie_hellman_challenge": "ff00",
        },
    )

    # The HTTP method is the first "&"-separated component.
    assert base.startswith("POST&")
    # "key=value" pairs are percent-encoded inside the base string ("=" -> "%3D").
    assert "oauth_consumer_key%3DTEST_CONSUMER" in base

    idx_consumer = base.index("oauth_consumer_key")
    idx_nonce = base.index("oauth_nonce")
    idx_token = base.index("oauth_token")

    assert idx_consumer < idx_token
    # "oauth_nonce" is inserted AFTER "oauth_token" in the dict above, so this
    # can only hold when the builder sorts parameters by name.
    assert idx_nonce < idx_token
|
|
|
|
|
|
def test_oauth_signer_signs_with_rsa(tmp_path):
    """Sign a request and verify the signature with the matching public key.

    A throwaway RSA key is written to disk for both the signature and the
    encryption key paths; the base64 signature returned by ``sign`` must
    verify (PKCS#1 v1.5, SHA-256) against the independently built base string.
    """
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    pem_bytes = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )

    # The same PEM serves as both signing and encryption key in this test.
    sig_path = tmp_path / "sig.pem"
    enc_path = tmp_path / "enc.pem"
    for pem_file in (sig_path, enc_path):
        pem_file.write_bytes(pem_bytes)

    signer = OAuth1aSigner(
        consumer_key="TEST_CONSUMER",
        access_token="TEST_TOKEN",
        access_token_secret="TEST_SECRET",
        signature_key_path=str(sig_path),
        encryption_key_path=str(enc_path),
        dh_prime="FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3BE39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF6955817183995497CEA956AE515D2261898FA051015728E5A8AACAA68FFFFFFFFFFFFFFFF",
    )

    http_method = "GET"
    endpoint = "https://api.ibkr.com/v1/api/iserver/auth/status"
    oauth_params = {
        "oauth_consumer_key": "TEST_CONSUMER",
        "oauth_token": "TEST_TOKEN",
        "oauth_nonce": "abc",
        "oauth_timestamp": "1700000000",
        "oauth_signature_method": "RSA-SHA256",
        "oauth_version": "1.0",
    }

    # Each call receives its own copy so neither can observe changes the
    # other might make to the mapping.
    sig = signer.sign(
        method=http_method,
        url=endpoint,
        params=dict(oauth_params),
    )

    # Verify signature against the public key — proves correctness, not just shape
    expected_base = build_signature_base_string(
        method=http_method,
        url=endpoint,
        params=dict(oauth_params),
    )
    signature_bytes = _b64.b64decode(sig)
    message_bytes = expected_base.encode("utf-8")
    # verify() raises on mismatch, which fails the test; it returns nothing.
    private_key.public_key().verify(
        signature_bytes,
        message_bytes,
        padding.PKCS1v15(),
        hashes.SHA256(),
    )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_live_session_token_mint(httpx_mock: HTTPXMock, tmp_path):
    """Mint a live session token against a mocked endpoint and check it is cached.

    The access token secret handed to the signer is a hex-encoded RSA
    (PKCS#1 v1.5) ciphertext produced with the test key's public half. A
    second call must return the same token without another HTTP request.
    """
    key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    (tmp_path / "sig.pem").write_bytes(pem)
    (tmp_path / "enc.pem").write_bytes(pem)

    # Encrypt fake "real" secret with our public key
    raw_secret = b"my_real_token_secret"
    encrypted = key.public_key().encrypt(raw_secret, padding.PKCS1v15())
    encrypted_hex = encrypted.hex()

    one_day_ms = 86_400_000
    httpx_mock.add_response(
        url=re.compile(r".*/oauth/live_session_token"),
        json={
            "diffie_hellman_response": "ff",
            "live_session_token_signature": "00" * 20,
            # Expiration is an epoch timestamp in milliseconds, one day ahead.
            "live_session_token_expiration": int(time.time() * 1000) + one_day_ms,
        },
    )

    signer = OAuth1aSigner(
        consumer_key="TEST_CK",
        access_token="TEST_AT",
        access_token_secret=encrypted_hex,
        signature_key_path=str(tmp_path / "sig.pem"),
        encryption_key_path=str(tmp_path / "enc.pem"),
        dh_prime="17",  # hex 0x17 == 23: a small prime that fits a 1-byte modulus
    )

    lst = await signer.get_live_session_token(
        base_url="https://api.ibkr.com/v1/api"
    )
    assert isinstance(lst, str)
    assert len(lst) > 0

    lst2 = await signer.get_live_session_token(
        base_url="https://api.ibkr.com/v1/api"
    )
    assert lst == lst2  # cached
    # Equal values alone do not prove caching (with a 1-byte modulus two
    # independent mints could coincide), so also require a single HTTP call.
    assert len(httpx_mock.get_requests()) == 1
|