from __future__ import annotations import base64 as _b64 import re import time import pytest from cerbero_mcp.exchanges.ibkr.oauth import ( OAuth1aSigner, build_signature_base_string, ) from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding, rsa from pytest_httpx import HTTPXMock def test_signature_base_string_canonical_order(): base = build_signature_base_string( method="POST", url="https://api.ibkr.com/v1/api/oauth/live_session_token", params={ "oauth_consumer_key": "TEST_CONSUMER", "oauth_token": "TEST_TOKEN", "oauth_nonce": "abc123", "oauth_timestamp": "1700000000", "oauth_signature_method": "RSA-SHA256", "oauth_version": "1.0", "diffie_hellman_challenge": "ff00", }, ) assert base.startswith("POST&") assert "oauth_consumer_key%3DTEST_CONSUMER" in base idx_consumer = base.index("oauth_consumer_key") idx_token = base.index("oauth_token") assert idx_consumer < idx_token def test_oauth_signer_signs_with_rsa(tmp_path): key = rsa.generate_private_key(public_exponent=65537, key_size=2048) pem = key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ) sig_path = tmp_path / "sig.pem" sig_path.write_bytes(pem) enc_path = tmp_path / "enc.pem" enc_path.write_bytes(pem) signer = OAuth1aSigner( consumer_key="TEST_CONSUMER", access_token="TEST_TOKEN", access_token_secret="TEST_SECRET", signature_key_path=str(sig_path), encryption_key_path=str(enc_path), dh_prime="FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3BE39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF6955817183995497CEA956AE515D2261898FA051015728E5A8AACAA68FFFFFFFFFFFFFFFF", ) sig = signer.sign( method="GET", url="https://api.ibkr.com/v1/api/iserver/auth/status", params={ "oauth_consumer_key": "TEST_CONSUMER", "oauth_token": "TEST_TOKEN", "oauth_nonce": "abc", "oauth_timestamp": "1700000000", "oauth_signature_method": "RSA-SHA256", "oauth_version": "1.0", }, ) # Verify signature against the public key — proves correctness, not just shape base = build_signature_base_string( method="GET", url="https://api.ibkr.com/v1/api/iserver/auth/status", params={ "oauth_consumer_key": "TEST_CONSUMER", "oauth_token": "TEST_TOKEN", "oauth_nonce": "abc", "oauth_timestamp": "1700000000", "oauth_signature_method": "RSA-SHA256", "oauth_version": "1.0", }, ) key.public_key().verify( _b64.b64decode(sig), base.encode("utf-8"), padding.PKCS1v15(), hashes.SHA256(), ) @pytest.mark.asyncio async def test_live_session_token_mint(httpx_mock: HTTPXMock, tmp_path): key = rsa.generate_private_key(public_exponent=65537, key_size=2048) pem = key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ) (tmp_path / "sig.pem").write_bytes(pem) (tmp_path / "enc.pem").write_bytes(pem) # Encrypt fake "real" secret with our public key raw_secret = b"my_real_token_secret" encrypted = key.public_key().encrypt(raw_secret, padding.PKCS1v15()) encrypted_hex = encrypted.hex() httpx_mock.add_response( url=re.compile(r".*/oauth/live_session_token"), json={ "diffie_hellman_response": "ff", "live_session_token_signature": "00" * 20, "live_session_token_expiration": int(time.time() * 1000) + 86400000, }, ) from cerbero_mcp.exchanges.ibkr.oauth import OAuth1aSigner signer = OAuth1aSigner( consumer_key="TEST_CK", access_token="TEST_AT", access_token_secret=encrypted_hex, signature_key_path=str(tmp_path / "sig.pem"), encryption_key_path=str(tmp_path / "enc.pem"), dh_prime="ff", ) lst = await signer.get_live_session_token( base_url="https://api.ibkr.com/v1/api" ) assert isinstance(lst, str) and len(lst) > 0 lst2 = await signer.get_live_session_token( base_url="https://api.ibkr.com/v1/api" ) assert lst == lst2 # cached