"""Hyperliquid REST API client for perpetual futures trading.""" from __future__ import annotations import asyncio import datetime as _dt from typing import Any import httpx from mcp_common import indicators as ind from mcp_common.http import async_client BASE_LIVE = "https://api.hyperliquid.xyz" BASE_TESTNET = "https://api.hyperliquid-testnet.xyz" RESOLUTION_MAP = { "1m": "1m", "5m": "5m", "15m": "15m", "1h": "1h", "4h": "4h", "1d": "1d", } try: from eth_account import Account from hyperliquid.exchange import Exchange from hyperliquid.utils import constants as hl_constants _SDK_AVAILABLE = True except ImportError: # pragma: no cover _SDK_AVAILABLE = False def _to_ms(date_str: str) -> int: try: dt = _dt.datetime.fromisoformat(date_str) except ValueError: dt = _dt.datetime.strptime(date_str, "%Y-%m-%d") return int(dt.timestamp() * 1000) class HyperliquidClient: """Async client for the Hyperliquid API. Read operations use direct HTTP calls via httpx against /info. Write operations delegate to hyperliquid-python-sdk for EIP-712 signing. """ def __init__( self, wallet_address: str, private_key: str, testnet: bool = True, api_wallet_address: str | None = None, ): self.wallet_address = wallet_address self.private_key = private_key self.testnet = testnet self.api_wallet_address = api_wallet_address or wallet_address self.base_url = BASE_TESTNET if testnet else BASE_LIVE self._exchange: Any | None = None # ── SDK exchange (lazy) ──────────────────────────────────── def _get_exchange(self) -> Any: """Return (and cache) an SDK Exchange instance for write ops.""" if not _SDK_AVAILABLE: raise RuntimeError( "hyperliquid-python-sdk is not installed; write operations unavailable." ) if self._exchange is None: account = Account.from_key(self.private_key) base_url = ( hl_constants.TESTNET_API_URL if self.testnet else hl_constants.MAINNET_API_URL ) empty_spot_meta: dict[str, Any] = {"universe": [], "tokens": []} self._exchange = Exchange( account, base_url, account_address=self.wallet_address, spot_meta=empty_spot_meta, ) return self._exchange # ── Internal helpers ─────────────────────────────────────── async def _post(self, payload: dict[str, Any]) -> Any: """POST JSON to the /info endpoint.""" async with async_client(timeout=15.0) as http: resp = await http.post(f"{self.base_url}/info", json=payload) resp.raise_for_status() return resp.json() @staticmethod async def _run_sync(func: Any, *args: Any, **kwargs: Any) -> Any: """Run a synchronous SDK call in the default executor.""" loop = asyncio.get_event_loop() return await loop.run_in_executor(None, lambda: func(*args, **kwargs)) # ── Read tools ───────────────────────────────────────────── async def get_markets(self) -> list[dict[str, Any]]: """List all perp markets with metadata and current stats.""" data = await self._post({"type": "metaAndAssetCtxs"}) universe = data[0]["universe"] ctx_list = data[1] markets = [] for meta, ctx in zip(universe, ctx_list, strict=True): markets.append( { "asset": meta["name"], "mark_price": float(ctx.get("markPx", 0)), "funding_rate": float(ctx.get("funding", 0)), "open_interest": float(ctx.get("openInterest", 0)), "volume_24h": float(ctx.get("dayNtlVlm", 0)), "max_leverage": int(meta.get("maxLeverage", 1)), } ) return markets async def get_ticker(self, instrument: str) -> dict[str, Any]: """Get ticker information for a specific asset.""" markets = await self.get_markets() for m in markets: if m["asset"].upper() == instrument.upper(): return { "asset": m["asset"], "mark_price": m["mark_price"], "mid_price": m["mark_price"], "funding_rate": m["funding_rate"], "open_interest": m["open_interest"], "volume_24h": m["volume_24h"], "premium": 0.0, } return {"error": f"Asset {instrument} not found"} async def get_orderbook(self, instrument: str, depth: int = 10) -> dict[str, Any]: """Get L2 order book for an asset.""" data = await self._post({"type": "l2Book", "coin": instrument.upper()}) levels = data.get("levels", [[], []]) bids = [{"price": float(b["px"]), "size": float(b["sz"])} for b in levels[0][:depth]] asks = [{"price": float(a["px"]), "size": float(a["sz"])} for a in levels[1][:depth]] return {"asset": instrument, "bids": bids, "asks": asks} async def get_positions(self) -> list[dict[str, Any]]: """Get open positions for the wallet.""" data = await self._post( {"type": "clearinghouseState", "user": self.wallet_address} ) positions = [] for ap in data.get("assetPositions", []): pos = ap.get("position", {}) size = float(pos.get("szi", 0)) if size == 0: continue leverage_data = pos.get("leverage", {}) lev_value = ( leverage_data.get("value", "1") if isinstance(leverage_data, dict) else str(leverage_data) ) positions.append( { "asset": pos.get("coin", ""), "size": abs(size), "direction": "long" if size > 0 else "short", "entry_price": float(pos.get("entryPx", 0) or 0), "unrealized_pnl": float(pos.get("unrealizedPnl", 0)), "leverage": float(lev_value), "liquidation_price": float(pos.get("liquidationPx", 0) or 0), } ) return positions async def get_account_summary(self) -> dict[str, Any]: """Get account summary (equity, balance, margin) including spot balances. Con Unified Account, spot USDC e perps condividono collaterale. `spot_fetch_ok` / `perps_fetch_ok` indicano se i due lati sono stati letti correttamente: se uno dei due è False il chiamante dovrebbe considerare `equity`/`available_balance` un lower bound. """ perps_fetch_ok = True perps_equity = 0.0 perps_available = 0.0 margin_used = 0.0 unrealized_pnl = 0.0 try: data = await self._post( {"type": "clearinghouseState", "user": self.wallet_address} ) margin = data.get("marginSummary") or {} perps_equity = float(margin.get("accountValue", 0) or 0) perps_available = float(margin.get("totalRawUsd", 0) or 0) margin_used = float(margin.get("totalMarginUsed", 0) or 0) unrealized_pnl = float(margin.get("totalNtlPos", 0) or 0) except Exception: perps_fetch_ok = False spot_fetch_ok = True spot_usdc = 0.0 try: spot_data = await self._post( {"type": "spotClearinghouseState", "user": self.wallet_address} ) for b in spot_data.get("balances", []) or []: if b.get("coin") == "USDC": spot_usdc = float(b.get("total", 0) or 0) except Exception: spot_fetch_ok = False total_equity = perps_equity + spot_usdc total_available = perps_available + spot_usdc return { "equity": total_equity, "perps_equity": perps_equity, "perps_available": perps_available, "spot_usdc": spot_usdc, "available_balance": total_available, "margin_used": margin_used, "unrealized_pnl": unrealized_pnl, "perps_fetch_ok": perps_fetch_ok, "spot_fetch_ok": spot_fetch_ok, } async def get_trade_history(self, limit: int = 100) -> list[dict[str, Any]]: """Get recent trade fills.""" data = await self._post({"type": "userFills", "user": self.wallet_address}) trades = [] for t in data[:limit]: trades.append( { "asset": t.get("coin", ""), "side": t.get("side", ""), "size": float(t.get("sz", 0)), "price": float(t.get("px", 0)), "fee": float(t.get("fee", 0)), "timestamp": t.get("time", ""), } ) return trades async def get_historical( self, instrument: str, start_date: str, end_date: str, resolution: str = "1h" ) -> dict[str, Any]: """Get OHLCV candles for an asset.""" start_ms = _to_ms(start_date) end_ms = _to_ms(end_date) interval = RESOLUTION_MAP.get(resolution, resolution) data = await self._post( { "type": "candleSnapshot", "req": { "coin": instrument.upper(), "interval": interval, "startTime": start_ms, "endTime": end_ms, }, } ) candles = [] for c in data: candles.append( { "timestamp": c.get("t", 0), "open": float(c.get("o", 0)), "high": float(c.get("h", 0)), "low": float(c.get("l", 0)), "close": float(c.get("c", 0)), "volume": float(c.get("v", 0)), } ) return {"candles": candles} async def get_open_orders(self) -> list[dict[str, Any]]: """Get all open orders for the wallet.""" data = await self._post({"type": "openOrders", "user": self.wallet_address}) orders = [] for o in data: orders.append( { "oid": o.get("oid"), "asset": o.get("coin", ""), "side": o.get("side", ""), "size": float(o.get("sz", 0)), "price": float(o.get("limitPx", 0)), "order_type": o.get("orderType", ""), } ) return orders async def basis_spot_perp(self, asset: str) -> dict[str, Any]: asset = asset.upper() # Spot reference price from Coinbase (mainnet reference, anche se HL è testnet) spot_price: float | None = None spot_source = "coinbase" try: async with async_client(timeout=8.0) as c: resp = await c.get(f"https://api.coinbase.com/v2/prices/{asset}-USD/spot") if resp.status_code == 200: spot_price = float(resp.json().get("data", {}).get("amount")) except Exception: spot_price = None if spot_price is None: try: async with async_client(timeout=8.0) as c: resp = await c.get( "https://api.kraken.com/0/public/Ticker", params={"pair": f"{asset}USD"} ) if resp.status_code == 200: res = resp.json().get("result") or {} first = next(iter(res.values()), {}) price = (first.get("c") or [None])[0] spot_price = float(price) if price else None spot_source = "kraken" except Exception: pass # Perp price + funding from HL try: ctx = await self._post({"type": "metaAndAssetCtxs"}) universe = ctx[0]["universe"] ctx_list = ctx[1] perp_price = None funding = None for meta, c in zip(universe, ctx_list, strict=True): if meta["name"].upper() == asset: perp_price = float(c.get("markPx", 0)) funding = float(c.get("funding", 0)) break except Exception: perp_price = None funding = None if spot_price is None or perp_price is None: return { "asset": asset, "spot_price": spot_price, "perp_price": perp_price, "error": "missing spot or perp price", } basis_abs = perp_price - spot_price basis_pct = round(basis_abs / spot_price * 100, 4) basis_ann_funding = ( round(funding * 24 * 365 * 100, 2) if funding is not None else None ) carry_opp = bool( basis_ann_funding is not None and basis_ann_funding > 5 and (funding or 0) > 0.0001 ) return { "asset": asset, "spot_price": spot_price, "spot_source": spot_source, "perp_price": perp_price, "basis_absolute": round(basis_abs, 4), "basis_pct": basis_pct, "current_funding_hourly": funding, "basis_annualized_funding_only": basis_ann_funding, "carry_opportunity": carry_opp, "testnet": self.testnet, "data_timestamp": _dt.datetime.now(_dt.UTC).isoformat(), } async def get_funding_rate(self, instrument: str) -> dict[str, Any]: """Get current and recent historical funding rates for an asset.""" data = await self._post({"type": "metaAndAssetCtxs"}) universe = data[0]["universe"] ctx_list = data[1] current_rate = None for meta, ctx in zip(universe, ctx_list, strict=True): if meta["name"].upper() == instrument.upper(): current_rate = float(ctx.get("funding", 0)) break if current_rate is None: return {"error": f"Asset {instrument} not found"} # Fetch funding history (last 7 days) end_ms = int(_dt.datetime.utcnow().timestamp() * 1000) start_ms = end_ms - 7 * 24 * 3600 * 1000 history_data = await self._post( { "type": "fundingHistory", "coin": instrument.upper(), "startTime": start_ms, "endTime": end_ms, } ) history = [] for entry in history_data: history.append( { "timestamp": entry.get("time", 0), "funding_rate": float(entry.get("fundingRate", 0)), } ) return { "asset": instrument, "current_funding_rate": current_rate, "history": history, } async def get_indicators( self, instrument: str, indicators: list[str], start_date: str, end_date: str, resolution: str = "1h", ) -> dict[str, Any]: """Compute technical indicators over OHLCV data.""" historical = await self.get_historical(instrument, start_date, end_date, resolution) candles = historical.get("candles", []) closes = [c["close"] for c in candles] highs = [c["high"] for c in candles] lows = [c["low"] for c in candles] result: dict[str, Any] = {} for indicator in indicators: name = indicator.lower() if name == "sma": result["sma"] = ind.sma(closes, 20) elif name == "rsi": result["rsi"] = ind.rsi(closes) elif name == "atr": result["atr"] = ind.atr(highs, lows, closes) elif name == "macd": result["macd"] = ind.macd(closes) elif name == "adx": result["adx"] = ind.adx(highs, lows, closes) else: result[name] = None return result # ── Write tools (via SDK) ────────────────────────────────── async def place_order( self, instrument: str, side: str, amount: float, type: str = "limit", price: float | None = None, reduce_only: bool = False, ) -> dict[str, Any]: """Place an order on Hyperliquid using the SDK for EIP-712 signing.""" exchange = self._get_exchange() is_buy = side.lower() in ("buy", "long") coin = instrument.upper() if type == "market": ot: dict[str, Any] = {"limit": {"tif": "Ioc"}} if price is None: ticker = await self.get_ticker(coin) mark = ticker.get("mark_price", 0) price = round(mark * 1.03, 1) if is_buy else round(mark * 0.97, 1) elif type in ("stop_market", "stop_loss"): ot = {"trigger": {"triggerPx": float(price), "isMarket": True, "tpsl": "sl"}} elif type == "take_profit": ot = {"trigger": {"triggerPx": float(price), "isMarket": True, "tpsl": "tp"}} else: ot = {"limit": {"tif": "Gtc"}} if price is None: return {"error": "price is required for limit orders"} result = await self._run_sync( exchange.order, coin, is_buy, amount, price, ot, reduce_only ) status = result.get("status", "unknown") response = result.get("response", {}) if isinstance(response, str): return { "status": status, "error": response, "order_id": "", "filled_size": 0, "avg_fill_price": 0, } statuses = response.get("data", {}).get("statuses", [{}]) first = statuses[0] if statuses else {} if isinstance(first, str): return { "status": status, "error": first, "order_id": "", "filled_size": 0, "avg_fill_price": 0, } return { "order_id": first.get("resting", {}).get( "oid", first.get("filled", {}).get("oid", "") ), "status": status, "filled_size": float(first.get("filled", {}).get("totalSz", 0)), "avg_fill_price": float(first.get("filled", {}).get("avgPx", 0)), } async def cancel_order(self, order_id: str, instrument: str) -> dict[str, Any]: """Cancel an existing order using the SDK.""" exchange = self._get_exchange() result = await self._run_sync( exchange.cancel, instrument.upper(), int(order_id) ) status = result.get("status", "unknown") response = result.get("response", "") if isinstance(response, str) and status == "err": return {"order_id": order_id, "status": status, "error": response} return {"order_id": order_id, "status": status} async def set_stop_loss( self, instrument: str, stop_price: float, size: float ) -> dict[str, Any]: """Set a stop-loss trigger order.""" # Determine direction by checking open position positions = await self.get_positions() direction = "sell" # default: assume long for pos in positions: if pos["asset"].upper() == instrument.upper(): direction = "sell" if pos["direction"] == "long" else "buy" if size == 0: size = pos["size"] break return await self.place_order( instrument=instrument, side=direction, amount=size, type="stop_loss", price=stop_price, reduce_only=True, ) async def set_take_profit( self, instrument: str, tp_price: float, size: float ) -> dict[str, Any]: """Set a take-profit trigger order.""" positions = await self.get_positions() direction = "sell" # default: assume long for pos in positions: if pos["asset"].upper() == instrument.upper(): direction = "sell" if pos["direction"] == "long" else "buy" if size == 0: size = pos["size"] break return await self.place_order( instrument=instrument, side=direction, amount=size, type="take_profit", price=tp_price, reduce_only=True, ) async def close_position(self, instrument: str) -> dict[str, Any]: """Close an open position for the given asset using market_close.""" exchange = self._get_exchange() try: result = await self._run_sync(exchange.market_close, instrument.upper()) return { "status": result.get("status", "unknown"), "asset": instrument, } except Exception as exc: return {"error": str(exc), "asset": instrument} async def health(self) -> dict[str, Any]: """Health check — ping /info for server status.""" try: await self._post({"type": "meta"}) return {"status": "ok", "testnet": self.testnet} except Exception as exc: return {"status": "error", "error": str(exc)}