feat(V2): IBKR OAuth1a signer + RSA-SHA256 signature

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root
2026-05-03 20:08:15 +00:00
parent 92cc45c896
commit ae63aaf69a
4 changed files with 144 additions and 0 deletions
+82
View File
@@ -0,0 +1,82 @@
"""OAuth 1.0a Self-Service signer for IBKR Client Portal Web API.
Reference: https://www.interactivebrokers.com/api/doc.html (Self-Service OAuth)
"""
from __future__ import annotations
import base64
import hashlib # noqa: F401
import hmac # noqa: F401
import secrets
import time
from dataclasses import dataclass, field
from urllib.parse import quote
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
def _percent_encode(value: str) -> str:
"""RFC 3986 percent-encoding for OAuth (no `+` for space)."""
return quote(str(value), safe="")
def build_signature_base_string(
method: str, url: str, params: dict[str, str]
) -> str:
"""Costruisce signature base string OAuth 1.0a:
`<METHOD>&<encoded-url>&<encoded-sorted-params>`
"""
sorted_params = sorted(params.items())
encoded_pairs = [
f"{_percent_encode(k)}%3D{_percent_encode(v)}"
for k, v in sorted_params
]
params_str = "%26".join(encoded_pairs)
return f"{method.upper()}&{_percent_encode(url)}&{params_str}"
@dataclass
class OAuth1aSigner:
consumer_key: str
access_token: str
access_token_secret: str
signature_key_path: str
encryption_key_path: str
dh_prime: str # hex string
_signature_key: object = field(default=None, init=False, repr=False)
_encryption_key: object = field(default=None, init=False, repr=False)
_live_session_token: str | None = field(default=None, init=False, repr=False)
_lst_expires_at: float = field(default=0.0, init=False, repr=False)
def __post_init__(self) -> None:
with open(self.signature_key_path, "rb") as f:
self._signature_key = serialization.load_pem_private_key(
f.read(), password=None
)
with open(self.encryption_key_path, "rb") as f:
self._encryption_key = serialization.load_pem_private_key(
f.read(), password=None
)
def sign(self, method: str, url: str, params: dict[str, str]) -> str:
"""Firma RSA-SHA256 della signature base string. Ritorna base64."""
base = build_signature_base_string(method, url, params)
signature = self._signature_key.sign( # type: ignore[attr-defined]
base.encode("utf-8"),
padding.PKCS1v15(),
hashes.SHA256(),
)
return base64.b64encode(signature).decode("ascii")
def make_oauth_params(self) -> dict[str, str]:
"""Genera oauth_nonce/timestamp/version standard."""
return {
"oauth_consumer_key": self.consumer_key,
"oauth_token": self.access_token,
"oauth_nonce": secrets.token_hex(16),
"oauth_timestamp": str(int(time.time())),
"oauth_signature_method": "RSA-SHA256",
"oauth_version": "1.0",
}