refactor(V2): hyperliquid client da SDK a httpx + eth-account EIP-712 (parità V1)

Riscritto interamente HyperliquidClient su httpx puro + eth-account per la
firma EIP-712 L1 (chainId 1337, phantom agent source 'a'/'b' per
mainnet/testnet). Bit-parity verificata contro hyperliquid.utils.signing
in test_signing_parity_with_canonical_sdk.

16 metodi pubblici, 26 test passanti. Aggiunte deps: eth-account, msgpack,
eth-utils. hyperliquid-python-sdk ancora presente nel pyproject; rimossa
nel sweep finale.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
AdrianoDev
2026-05-01 01:39:23 +02:00
parent 44c7a18d3e
commit c0b4cb5d5c
4 changed files with 619 additions and 127 deletions
+335 -110
View File
@@ -1,11 +1,31 @@
"""Hyperliquid REST API client for perpetual futures trading."""
"""Hyperliquid REST API client for perpetual futures trading.
Pure ``httpx`` + ``eth-account`` implementation: no dependency on
``hyperliquid-python-sdk``. Read endpoints hit ``POST /info`` (no auth);
write endpoints hit ``POST /exchange`` and require an EIP-712 L1 signature.
The signing scheme is bit-for-bit equivalent to the canonical SDK:
action_hash = keccak( msgpack(action) || nonce[u64 BE] || vault_marker
|| (expires_after marker || expires_after[u64 BE])? )
phantom = {"source": "a"|"b", "connectionId": action_hash} # a=mainnet, b=testnet
EIP-712 domain: name="Exchange", version="1", chainId=1337,
verifyingContract=0x0
"""
from __future__ import annotations
import asyncio
import datetime as _dt
import time as _time
from decimal import Decimal
from typing import Any
import httpx
import msgpack
from eth_account import Account
from eth_account.messages import encode_typed_data
from eth_utils import keccak, to_hex
from cerbero_mcp.common import indicators as ind
from cerbero_mcp.common.http import async_client
@@ -21,14 +41,8 @@ RESOLUTION_MAP = {
"1d": "1d",
}
try:
from eth_account import Account
from hyperliquid.exchange import Exchange
from hyperliquid.utils import constants as hl_constants
_SDK_AVAILABLE = True
except ImportError: # pragma: no cover
_SDK_AVAILABLE = False
# Slippage usato per market order / market_close (parità con SDK).
DEFAULT_SLIPPAGE = 0.05
def _to_ms(date_str: str) -> int:
@@ -39,11 +53,91 @@ def _to_ms(date_str: str) -> int:
return int(dt.timestamp() * 1000)
class HyperliquidClient:
"""Async client for the Hyperliquid API.
def _float_to_wire(x: float) -> str:
"""Convert a price/size float to Hyperliquid wire string format.
Read operations use direct HTTP calls via httpx against /info.
Write operations delegate to hyperliquid-python-sdk for EIP-712 signing.
8 decimal places, no trailing zeros (matching SDK ``float_to_wire``).
"""
rounded = f"{x:.8f}"
if abs(float(rounded) - x) >= 1e-12:
raise ValueError("float_to_wire causes rounding", x)
if rounded == "-0":
rounded = "0"
normalized = Decimal(rounded).normalize()
return f"{normalized:f}"
def _address_to_bytes(address: str) -> bytes:
return bytes.fromhex(address.removeprefix("0x"))
def _action_hash(
action: Any,
vault_address: str | None,
nonce: int,
expires_after: int | None,
) -> bytes:
"""Deterministic action hash (msgpack + nonce + vault + expires)."""
data = msgpack.packb(action)
data += nonce.to_bytes(8, "big")
if vault_address is None:
data += b"\x00"
else:
data += b"\x01"
data += _address_to_bytes(vault_address)
if expires_after is not None:
data += b"\x00"
data += expires_after.to_bytes(8, "big")
return keccak(data)
def _l1_payload(phantom_agent: dict[str, Any]) -> dict[str, Any]:
return {
"domain": {
"chainId": 1337,
"name": "Exchange",
"verifyingContract": "0x0000000000000000000000000000000000000000",
"version": "1",
},
"types": {
"Agent": [
{"name": "source", "type": "string"},
{"name": "connectionId", "type": "bytes32"},
],
"EIP712Domain": [
{"name": "name", "type": "string"},
{"name": "version", "type": "string"},
{"name": "chainId", "type": "uint256"},
{"name": "verifyingContract", "type": "address"},
],
},
"primaryType": "Agent",
"message": phantom_agent,
}
def _sign_l1_action(
private_key: str,
action: Any,
vault_address: str | None,
nonce: int,
expires_after: int | None,
is_mainnet: bool,
) -> dict[str, Any]:
h = _action_hash(action, vault_address, nonce, expires_after)
phantom_agent = {"source": "a" if is_mainnet else "b", "connectionId": h}
payload = _l1_payload(phantom_agent)
encoded = encode_typed_data(full_message=payload)
signed = Account.from_key(private_key).sign_message(encoded)
return {"r": to_hex(signed["r"]), "s": to_hex(signed["s"]), "v": signed["v"]}
class HyperliquidClient:
"""Async client for the Hyperliquid REST API.
Read operations call ``POST /info`` directly via ``httpx``.
Write operations build an EIP-712 L1 signature in-process (no SDK)
and call ``POST /exchange``.
"""
def __init__(
@@ -63,53 +157,99 @@ class HyperliquidClient:
self.base_url = base_url
else:
self.base_url = BASE_TESTNET if testnet else BASE_LIVE
self._exchange: Any | None = None
self._is_mainnet = self.base_url == BASE_LIVE
self.vault_address: str | None = None
# Persistent async client (riutilizzato per /exchange e /info).
self._http: httpx.AsyncClient | None = None
# Cache name → asset id (perp universe).
self._name_to_asset: dict[str, int] | None = None
# ── SDK exchange (lazy) ────────────────────────────────────
def _get_exchange(self) -> Any:
"""Return (and cache) an SDK Exchange instance for write ops."""
if not _SDK_AVAILABLE:
raise RuntimeError(
"hyperliquid-python-sdk is not installed; write operations unavailable."
)
if self._exchange is None:
account = Account.from_key(self.private_key)
if self._base_url_override:
sdk_base_url = self._base_url_override
else:
sdk_base_url = (
hl_constants.TESTNET_API_URL if self.testnet else hl_constants.MAINNET_API_URL
)
empty_spot_meta: dict[str, Any] = {"universe": [], "tokens": []}
self._exchange = Exchange(
account,
sdk_base_url,
account_address=self.wallet_address,
spot_meta=empty_spot_meta,
)
return self._exchange
async def aclose(self) -> None:
"""Close the underlying HTTP client (if any)."""
if self._http is not None:
await self._http.aclose()
self._http = None
# ── Internal helpers ───────────────────────────────────────
async def _post(self, payload: dict[str, Any]) -> Any:
"""POST JSON to the /info endpoint."""
async def _post_info(self, payload: dict[str, Any]) -> Any:
"""POST a JSON payload to ``/info`` (read-only, no auth)."""
async with async_client(timeout=15.0) as http:
resp = await http.post(f"{self.base_url}/info", json=payload)
resp.raise_for_status()
return resp.json()
# backward-compat alias (interno).
async def _post(self, payload: dict[str, Any]) -> Any:
return await self._post_info(payload)
async def _post_exchange(
self,
action: dict[str, Any],
nonce: int | None = None,
vault_address: str | None = None,
) -> Any:
"""Sign and POST an action to ``/exchange``."""
if nonce is None:
nonce = int(_time.time() * 1000)
vault = vault_address if vault_address is not None else self.vault_address
signature = _sign_l1_action(
self.private_key,
action,
vault,
nonce,
None, # expires_after: not used here
self._is_mainnet,
)
payload: dict[str, Any] = {
"action": action,
"nonce": nonce,
"signature": signature,
"vaultAddress": vault,
"expiresAfter": None,
}
async with async_client(timeout=15.0) as http:
resp = await http.post(f"{self.base_url}/exchange", json=payload)
resp.raise_for_status()
return resp.json()
async def _name_to_asset_id(self, name: str) -> int:
"""Resolve a perp coin name (e.g. ``BTC``) to its asset id.
The asset id is the index in the ``meta.universe`` array. Cached
per-client; refreshed if the requested name is missing.
"""
upper = name.upper()
if self._name_to_asset is None or upper not in self._name_to_asset:
meta = await self._post_info({"type": "meta"})
universe = meta.get("universe", [])
self._name_to_asset = {
m["name"].upper(): idx for idx, m in enumerate(universe)
}
if upper not in self._name_to_asset:
raise ValueError(f"Unknown asset: {name}")
return self._name_to_asset[upper]
@staticmethod
async def _run_sync(func: Any, *args: Any, **kwargs: Any) -> Any:
"""Run a synchronous SDK call in the default executor."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
def _order_type_to_wire(order_type: dict[str, Any]) -> dict[str, Any]:
if "limit" in order_type:
return {"limit": order_type["limit"]}
if "trigger" in order_type:
t = order_type["trigger"]
return {
"trigger": {
"isMarket": t["isMarket"],
"triggerPx": _float_to_wire(float(t["triggerPx"])),
"tpsl": t["tpsl"],
}
}
raise ValueError("Invalid order type", order_type)
# ── Read tools ─────────────────────────────────────────────
async def get_markets(self) -> list[dict[str, Any]]:
"""List all perp markets with metadata and current stats."""
data = await self._post({"type": "metaAndAssetCtxs"})
data = await self._post_info({"type": "metaAndAssetCtxs"})
universe = data[0]["universe"]
ctx_list = data[1]
markets = []
@@ -144,7 +284,7 @@ class HyperliquidClient:
async def get_orderbook(self, instrument: str, depth: int = 10) -> dict[str, Any]:
"""Get L2 order book for an asset."""
data = await self._post({"type": "l2Book", "coin": instrument.upper()})
data = await self._post_info({"type": "l2Book", "coin": instrument.upper()})
levels = data.get("levels", [[], []])
bids = [{"price": float(b["px"]), "size": float(b["sz"])} for b in levels[0][:depth]]
asks = [{"price": float(a["px"]), "size": float(a["sz"])} for a in levels[1][:depth]]
@@ -152,7 +292,7 @@ class HyperliquidClient:
async def get_positions(self) -> list[dict[str, Any]]:
"""Get open positions for the wallet."""
data = await self._post(
data = await self._post_info(
{"type": "clearinghouseState", "user": self.wallet_address}
)
positions = []
@@ -184,9 +324,9 @@ class HyperliquidClient:
"""Get account summary (equity, balance, margin) including spot balances.
Con Unified Account, spot USDC e perps condividono collaterale.
`spot_fetch_ok` / `perps_fetch_ok` indicano se i due lati sono stati
``spot_fetch_ok`` / ``perps_fetch_ok`` indicano se i due lati sono stati
letti correttamente: se uno dei due è False il chiamante dovrebbe
considerare `equity`/`available_balance` un lower bound.
considerare ``equity``/``available_balance`` un lower bound.
"""
perps_fetch_ok = True
perps_equity = 0.0
@@ -194,7 +334,7 @@ class HyperliquidClient:
margin_used = 0.0
unrealized_pnl = 0.0
try:
data = await self._post(
data = await self._post_info(
{"type": "clearinghouseState", "user": self.wallet_address}
)
margin = data.get("marginSummary") or {}
@@ -208,7 +348,7 @@ class HyperliquidClient:
spot_fetch_ok = True
spot_usdc = 0.0
try:
spot_data = await self._post(
spot_data = await self._post_info(
{"type": "spotClearinghouseState", "user": self.wallet_address}
)
for b in spot_data.get("balances", []) or []:
@@ -233,7 +373,9 @@ class HyperliquidClient:
async def get_trade_history(self, limit: int = 100) -> list[dict[str, Any]]:
"""Get recent trade fills."""
data = await self._post({"type": "userFills", "user": self.wallet_address})
data = await self._post_info(
{"type": "userFills", "user": self.wallet_address}
)
trades = []
for t in data[:limit]:
trades.append(
@@ -255,7 +397,7 @@ class HyperliquidClient:
start_ms = _to_ms(start_date)
end_ms = _to_ms(end_date)
interval = RESOLUTION_MAP.get(resolution, resolution)
data = await self._post(
data = await self._post_info(
{
"type": "candleSnapshot",
"req": {
@@ -282,7 +424,9 @@ class HyperliquidClient:
async def get_open_orders(self) -> list[dict[str, Any]]:
"""Get all open orders for the wallet."""
data = await self._post({"type": "openOrders", "user": self.wallet_address})
data = await self._post_info(
{"type": "openOrders", "user": self.wallet_address}
)
orders = []
for o in data:
orders.append(
@@ -326,7 +470,7 @@ class HyperliquidClient:
# Perp price + funding from HL
try:
ctx = await self._post({"type": "metaAndAssetCtxs"})
ctx = await self._post_info({"type": "metaAndAssetCtxs"})
universe = ctx[0]["universe"]
ctx_list = ctx[1]
perp_price = None
@@ -375,7 +519,7 @@ class HyperliquidClient:
async def get_funding_rate(self, instrument: str) -> dict[str, Any]:
"""Get current and recent historical funding rates for an asset."""
data = await self._post({"type": "metaAndAssetCtxs"})
data = await self._post_info({"type": "metaAndAssetCtxs"})
universe = data[0]["universe"]
ctx_list = data[1]
current_rate = None
@@ -389,7 +533,7 @@ class HyperliquidClient:
# Fetch funding history (last 7 days)
end_ms = int(_dt.datetime.utcnow().timestamp() * 1000)
start_ms = end_ms - 7 * 24 * 3600 * 1000
history_data = await self._post(
history_data = await self._post_info(
{
"type": "fundingHistory",
"coin": instrument.upper(),
@@ -443,44 +587,10 @@ class HyperliquidClient:
result[name] = None
return result
# ── Write tools (via SDK) ──────────────────────────────────
async def place_order(
self,
instrument: str,
side: str,
amount: float,
type: str = "limit",
price: float | None = None,
reduce_only: bool = False,
) -> dict[str, Any]:
"""Place an order on Hyperliquid using the SDK for EIP-712 signing."""
exchange = self._get_exchange()
is_buy = side.lower() in ("buy", "long")
coin = instrument.upper()
if type == "market":
ot: dict[str, Any] = {"limit": {"tif": "Ioc"}}
if price is None:
ticker = await self.get_ticker(coin)
mark = ticker.get("mark_price", 0)
price = round(mark * 1.03, 1) if is_buy else round(mark * 0.97, 1)
elif type in ("stop_market", "stop_loss"):
assert price is not None
ot = {"trigger": {"triggerPx": float(price), "isMarket": True, "tpsl": "sl"}}
elif type == "take_profit":
assert price is not None
ot = {"trigger": {"triggerPx": float(price), "isMarket": True, "tpsl": "tp"}}
else:
ot = {"limit": {"tif": "Gtc"}}
if price is None:
return {"error": "price is required for limit orders"}
result = await self._run_sync(
exchange.order, coin, is_buy, amount, price, ot, reduce_only
)
# ── Write tools (signed) ──────────────────────────────────
@staticmethod
def _parse_order_response(result: dict[str, Any]) -> dict[str, Any]:
status = result.get("status", "unknown")
response = result.get("response", {})
if isinstance(response, str):
@@ -491,7 +601,6 @@ class HyperliquidClient:
"filled_size": 0,
"avg_fill_price": 0,
}
statuses = response.get("data", {}).get("statuses", [{}])
first = statuses[0] if statuses else {}
if isinstance(first, str):
@@ -511,12 +620,95 @@ class HyperliquidClient:
"avg_fill_price": float(first.get("filled", {}).get("avgPx", 0)),
}
async def place_order(
self,
instrument: str,
side: str,
amount: float,
type: str = "limit",
price: float | None = None,
reduce_only: bool = False,
) -> dict[str, Any]:
"""Place an order on Hyperliquid (signed via EIP-712)."""
is_buy = side.lower() in ("buy", "long")
coin = instrument.upper()
if type == "market":
order_type: dict[str, Any] = {"limit": {"tif": "Ioc"}}
if price is None:
ticker = await self.get_ticker(coin)
mark = ticker.get("mark_price", 0)
price = round(mark * 1.03, 1) if is_buy else round(mark * 0.97, 1)
elif type in ("stop_market", "stop_loss"):
assert price is not None
order_type = {
"trigger": {
"triggerPx": float(price),
"isMarket": True,
"tpsl": "sl",
}
}
elif type == "take_profit":
assert price is not None
order_type = {
"trigger": {
"triggerPx": float(price),
"isMarket": True,
"tpsl": "tp",
}
}
else:
order_type = {"limit": {"tif": "Gtc"}}
if price is None:
return {"error": "price is required for limit orders"}
try:
asset_id = await self._name_to_asset_id(coin)
except ValueError as exc:
return {"error": str(exc), "order_id": "", "filled_size": 0, "avg_fill_price": 0}
order_wire: dict[str, Any] = {
"a": asset_id,
"b": is_buy,
"p": _float_to_wire(float(price)),
"s": _float_to_wire(float(amount)),
"r": reduce_only,
"t": self._order_type_to_wire(order_type),
}
action: dict[str, Any] = {
"type": "order",
"orders": [order_wire],
"grouping": "na",
}
try:
result = await self._post_exchange(action)
except httpx.HTTPError as exc:
return {
"status": "error",
"error": str(exc),
"order_id": "",
"filled_size": 0,
"avg_fill_price": 0,
}
return self._parse_order_response(result)
async def cancel_order(self, order_id: str, instrument: str) -> dict[str, Any]:
"""Cancel an existing order using the SDK."""
exchange = self._get_exchange()
result = await self._run_sync(
exchange.cancel, instrument.upper(), int(order_id)
)
"""Cancel an existing order via signed ``cancel`` action."""
try:
asset_id = await self._name_to_asset_id(instrument)
except ValueError as exc:
return {"order_id": order_id, "status": "error", "error": str(exc)}
action: dict[str, Any] = {
"type": "cancel",
"cancels": [{"a": asset_id, "o": int(order_id)}],
}
try:
result = await self._post_exchange(action)
except httpx.HTTPError as exc:
return {"order_id": order_id, "status": "error", "error": str(exc)}
status = result.get("status", "unknown")
response = result.get("response", "")
if isinstance(response, str) and status == "err":
@@ -526,8 +718,7 @@ class HyperliquidClient:
async def set_stop_loss(
self, instrument: str, stop_price: float, size: float
) -> dict[str, Any]:
"""Set a stop-loss trigger order."""
# Determine direction by checking open position
"""Set a stop-loss trigger order (reduce-only)."""
positions = await self.get_positions()
direction = "sell" # default: assume long
for pos in positions:
@@ -548,7 +739,7 @@ class HyperliquidClient:
async def set_take_profit(
self, instrument: str, tp_price: float, size: float
) -> dict[str, Any]:
"""Set a take-profit trigger order."""
"""Set a take-profit trigger order (reduce-only)."""
positions = await self.get_positions()
direction = "sell" # default: assume long
for pos in positions:
@@ -567,21 +758,55 @@ class HyperliquidClient:
)
async def close_position(self, instrument: str) -> dict[str, Any]:
"""Close an open position for the given asset using market_close."""
exchange = self._get_exchange()
"""Close an open position using an aggressive IOC reduce-only order."""
coin = instrument.upper()
try:
result = await self._run_sync(exchange.market_close, instrument.upper())
data = await self._post_info(
{"type": "clearinghouseState", "user": self.wallet_address}
)
target = None
for ap in data.get("assetPositions", []):
pos = ap.get("position", {})
if (pos.get("coin") or "").upper() == coin:
target = pos
break
if target is None:
return {"error": f"No open position for {instrument}", "asset": instrument}
szi = float(target.get("szi", 0) or 0)
if szi == 0:
return {"error": f"No open position for {instrument}", "asset": instrument}
sz = abs(szi)
is_buy = szi < 0 # short → buy to close
# Slippage price: usa mark price * (1±slippage), arrotonda a 5 sig figs.
ticker = await self.get_ticker(coin)
mark = float(ticker.get("mark_price", 0) or 0)
if mark <= 0:
return {"error": "missing mark price for slippage calc", "asset": instrument}
px = mark * (1 + DEFAULT_SLIPPAGE) if is_buy else mark * (1 - DEFAULT_SLIPPAGE)
px = round(float(f"{px:.5g}"), 6)
result = await self.place_order(
instrument=coin,
side="buy" if is_buy else "sell",
amount=sz,
type="limit",
price=px,
reduce_only=True,
)
return {
"status": result.get("status", "unknown"),
"status": result.get("status", "ok"),
"asset": instrument,
**{k: v for k, v in result.items() if k != "status"},
}
except Exception as exc:
return {"error": str(exc), "asset": instrument}
async def health(self) -> dict[str, Any]:
"""Health check — ping /info for server status."""
"""Health check — ping ``/info`` for server status."""
try:
await self._post({"type": "meta"})
await self._post_info({"type": "meta"})
return {"status": "ok", "testnet": self.testnet}
except Exception as exc:
return {"status": "error", "error": str(exc)}