"""Bybit V5 REST API client (httpx puro, no SDK). Implementazione diretta su `httpx.AsyncClient` per i tool Cerbero MCP V2. Mantiene parità di interfaccia con la versione precedente basata su `pybit.unified_trading.HTTP` per non rompere `tools.py` né i router. Auth Bybit V5: Header X-BAPI-SIGN = HMAC_SHA256(secret, timestamp + api_key + recv_window + (body_json | querystring)) """ from __future__ import annotations import hashlib import hmac import json import time import uuid from typing import Any from urllib.parse import urlencode import httpx from cerbero_mcp.common import indicators as ind from cerbero_mcp.common import microstructure as micro BASE_MAINNET = "https://api.bybit.com" BASE_TESTNET = "https://api-testnet.bybit.com" DEFAULT_RECV_WINDOW = "5000" DEFAULT_TIMEOUT = 15.0 def _f(v: Any) -> float | None: try: return float(v) except (TypeError, ValueError): return None def _i(v: Any) -> int | None: try: return int(v) except (TypeError, ValueError): return None class BybitAPIError(RuntimeError): """Errore di trasporto Bybit V5 (non gestito a livello envelope).""" class BybitClient: """Async REST client per Bybit V5 (linear/inverse/spot/option).""" def __init__( self, api_key: str, api_secret: str, testnet: bool = True, http: httpx.AsyncClient | None = None, base_url: str | None = None, ) -> None: self.api_key = api_key self.api_secret = api_secret self.testnet = testnet self.base_url = base_url or (BASE_TESTNET if testnet else BASE_MAINNET) self.recv_window = DEFAULT_RECV_WINDOW # `http` injection è usato dai test per montare un AsyncClient con # `httpx.MockTransport`. In produzione creiamo un client dedicato. self._owns_http = http is None self._http: httpx.AsyncClient = http or httpx.AsyncClient( timeout=DEFAULT_TIMEOUT ) async def aclose(self) -> None: """Chiude l'AsyncClient httpx se di nostra proprietà.""" if self._owns_http: await self._http.aclose() async def health(self) -> dict[str, Any]: """Probe minimo per /health/ready: nessuna chiamata di rete.""" return {"status": "ok", "testnet": self.testnet} # ── auth helpers ─────────────────────────────────────────── def _timestamp_ms(self) -> str: return str(int(time.time() * 1000)) def _sign(self, timestamp: str, payload: str) -> str: msg = timestamp + self.api_key + self.recv_window + payload return hmac.new( self.api_secret.encode("utf-8"), msg.encode("utf-8"), hashlib.sha256, ).hexdigest() def _signed_headers(self, payload: str) -> dict[str, str]: ts = self._timestamp_ms() sig = self._sign(ts, payload) return { "X-BAPI-API-KEY": self.api_key, "X-BAPI-TIMESTAMP": ts, "X-BAPI-RECV-WINDOW": self.recv_window, "X-BAPI-SIGN": sig, "Content-Type": "application/json", } @staticmethod def _clean_params(params: dict[str, Any] | None) -> dict[str, Any]: if not params: return {} return {k: v for k, v in params.items() if v is not None} @staticmethod def _querystring(params: dict[str, Any]) -> str: # Bybit accetta querystring nell'ordine in cui viene serializzata la # request. Per la signature usiamo lo stesso urlencode (ordine # inserzione dict). In Python 3.7+ dict mantiene insertion order: # mantenere coerenza tra signature payload e URL effettivo. return urlencode(params) # ── request primitives ───────────────────────────────────── async def _request_public( self, method: str, path: str, params: dict[str, Any] | None = None, ) -> dict[str, Any]: clean = self._clean_params(params) url = self.base_url + path resp = await self._http.request( method, url, params=clean if clean else None ) return self._parse_response(resp) async def _request_signed( self, method: str, path: str, params: dict[str, Any] | None = None, body: dict[str, Any] | None = None, ) -> dict[str, Any]: url = self.base_url + path method = method.upper() if method == "GET": clean = self._clean_params(params) qs = self._querystring(clean) headers = self._signed_headers(qs) resp = await self._http.request( method, url, params=clean if clean else None, headers=headers ) else: payload_body = body or {} body_json = json.dumps(payload_body, separators=(",", ":")) headers = self._signed_headers(body_json) resp = await self._http.request( method, url, content=body_json, headers=headers ) return self._parse_response(resp) @staticmethod def _parse_response(resp: httpx.Response) -> dict[str, Any]: try: data = resp.json() except Exception as e: # pragma: no cover - difficilmente raggiungibile raise BybitAPIError( f"invalid JSON from Bybit (status={resp.status_code}): {resp.text[:200]}" ) from e if resp.status_code >= 500: raise BybitAPIError( f"bybit server error {resp.status_code}: " f"{data.get('retMsg', resp.text[:200])}" ) if not isinstance(data, dict): raise BybitAPIError(f"unexpected payload type: {type(data).__name__}") return data def _envelope(self, resp: dict[str, Any], payload: dict[str, Any]) -> dict[str, Any]: code = resp.get("retCode", 0) if code != 0: return {"error": resp.get("retMsg", "bybit_error"), "code": code} return payload # ── parsers shared ───────────────────────────────────────── @staticmethod def _parse_ticker(row: dict[str, Any]) -> dict[str, Any]: return { "symbol": row.get("symbol"), "last_price": _f(row.get("lastPrice")), "mark_price": _f(row.get("markPrice")), "bid": _f(row.get("bid1Price")), "ask": _f(row.get("ask1Price")), "volume_24h": _f(row.get("volume24h")), "turnover_24h": _f(row.get("turnover24h")), "funding_rate": _f(row.get("fundingRate")), "open_interest": _f(row.get("openInterest")), } # ── market data (public) ─────────────────────────────────── async def get_ticker(self, symbol: str, category: str = "linear") -> dict: resp = await self._request_public( "GET", "/v5/market/tickers", params={"category": category, "symbol": symbol}, ) rows = (resp.get("result") or {}).get("list") or [] if not rows: return {"symbol": symbol, "error": "not_found"} return self._parse_ticker(rows[0]) async def get_ticker_batch( self, symbols: list[str], category: str = "linear" ) -> dict[str, dict]: out: dict[str, dict] = {} for sym in symbols: out[sym] = await self.get_ticker(sym, category=category) return out async def get_orderbook( self, symbol: str, category: str = "linear", limit: int = 50 ) -> dict: resp = await self._request_public( "GET", "/v5/market/orderbook", params={"category": category, "symbol": symbol, "limit": limit}, ) r = resp.get("result") or {} return { "symbol": r.get("s"), "bids": [[float(p), float(q)] for p, q in (r.get("b") or [])], "asks": [[float(p), float(q)] for p, q in (r.get("a") or [])], "timestamp": r.get("ts"), } async def get_historical( self, symbol: str, category: str = "linear", interval: str = "60", start: int | None = None, end: int | None = None, limit: int = 1000, ) -> dict: params: dict[str, Any] = { "category": category, "symbol": symbol, "interval": interval, "limit": limit, } if start is not None: params["start"] = start if end is not None: params["end"] = end resp = await self._request_public("GET", "/v5/market/kline", params=params) rows = (resp.get("result") or {}).get("list") or [] rows_sorted = sorted(rows, key=lambda r: int(r[0])) candles = [ { "timestamp": int(r[0]), "open": float(r[1]), "high": float(r[2]), "low": float(r[3]), "close": float(r[4]), "volume": float(r[5]), } for r in rows_sorted ] return {"symbol": symbol, "candles": candles} async def get_indicators( self, symbol: str, category: str = "linear", indicators: list[str] | None = None, interval: str = "60", start: int | None = None, end: int | None = None, ) -> dict: indicators = indicators or ["rsi", "atr", "macd", "adx"] historical = await self.get_historical( symbol, category=category, interval=interval, start=start, end=end ) candles = historical.get("candles", []) closes = [c["close"] for c in candles] highs = [c["high"] for c in candles] lows = [c["low"] for c in candles] out: dict[str, Any] = {"symbol": symbol, "category": category} for name in indicators: n = name.lower() if n == "sma": out["sma"] = ind.sma(closes, 20) elif n == "rsi": out["rsi"] = ind.rsi(closes) elif n == "atr": out["atr"] = ind.atr(highs, lows, closes) elif n == "macd": out["macd"] = ind.macd(closes) elif n == "adx": out["adx"] = ind.adx(highs, lows, closes) else: out[n] = None return out async def get_funding_rate(self, symbol: str, category: str = "linear") -> dict: resp = await self._request_public( "GET", "/v5/market/tickers", params={"category": category, "symbol": symbol}, ) rows = (resp.get("result") or {}).get("list") or [] if not rows: return {"symbol": symbol, "error": "not_found"} row = rows[0] return { "symbol": row.get("symbol"), "funding_rate": _f(row.get("fundingRate")), "next_funding_time": _i(row.get("nextFundingTime")), } async def get_funding_history( self, symbol: str, category: str = "linear", limit: int = 100 ) -> dict: resp = await self._request_public( "GET", "/v5/market/funding/history", params={"category": category, "symbol": symbol, "limit": limit}, ) rows = (resp.get("result") or {}).get("list") or [] hist = [ { "timestamp": int(r.get("fundingRateTimestamp", 0)), "rate": float(r.get("fundingRate", 0)), } for r in rows ] return {"symbol": symbol, "history": hist} async def get_open_interest( self, symbol: str, category: str = "linear", interval: str = "5min", limit: int = 288, ) -> dict: resp = await self._request_public( "GET", "/v5/market/open-interest", params={ "category": category, "symbol": symbol, "intervalTime": interval, "limit": limit, }, ) rows = (resp.get("result") or {}).get("list") or [] points = [ { "timestamp": int(r.get("timestamp", 0)), "oi": float(r.get("openInterest", 0)), } for r in rows ] current_oi = points[0]["oi"] if points else None return { "symbol": symbol, "category": category, "interval": interval, "current_oi": current_oi, "points": points, } async def get_instruments( self, category: str = "linear", symbol: str | None = None ) -> dict: params: dict[str, Any] = {"category": category} if symbol: params["symbol"] = symbol resp = await self._request_public( "GET", "/v5/market/instruments-info", params=params ) rows = (resp.get("result") or {}).get("list") or [] instruments = [] for r in rows: pf = r.get("priceFilter") or {} lf = r.get("lotSizeFilter") or {} instruments.append( { "symbol": r.get("symbol"), "status": r.get("status"), "base_coin": r.get("baseCoin"), "quote_coin": r.get("quoteCoin"), "tick_size": _f(pf.get("tickSize")), "qty_step": _f(lf.get("qtyStep")), "min_qty": _f(lf.get("minOrderQty")), } ) return {"category": category, "instruments": instruments} async def get_option_chain(self, base_coin: str, expiry: str | None = None) -> dict: resp = await self._request_public( "GET", "/v5/market/instruments-info", params={"category": "option", "baseCoin": base_coin.upper()}, ) rows = (resp.get("result") or {}).get("list") or [] options = [] for r in rows: delivery = r.get("deliveryTime") if expiry and expiry not in r.get("symbol", ""): continue options.append( { "symbol": r.get("symbol"), "base_coin": r.get("baseCoin"), "settle_coin": r.get("settleCoin"), "type": r.get("optionsType"), "launch_time": int(r.get("launchTime", 0)), "delivery_time": int(delivery) if delivery else None, } ) return {"base_coin": base_coin.upper(), "options": options} # ── account / positions / orders (signed) ───────────────── async def get_positions( self, category: str = "linear", settle_coin: str = "USDT" ) -> list[dict]: params: dict[str, Any] = {"category": category} if category in ("linear", "inverse"): params["settleCoin"] = settle_coin resp = await self._request_signed("GET", "/v5/position/list", params=params) rows = (resp.get("result") or {}).get("list") or [] out = [] for r in rows: out.append( { "symbol": r.get("symbol"), "side": r.get("side"), "size": _f(r.get("size")), "entry_price": _f(r.get("avgPrice")), "unrealized_pnl": _f(r.get("unrealisedPnl")), "leverage": _f(r.get("leverage")), "liquidation_price": _f(r.get("liqPrice")), "position_value": _f(r.get("positionValue")), } ) return out async def get_account_summary(self, account_type: str = "UNIFIED") -> dict: resp = await self._request_signed( "GET", "/v5/account/wallet-balance", params={"accountType": account_type}, ) rows = (resp.get("result") or {}).get("list") or [] if not rows: return {"error": "no_account"} a = rows[0] coins = [] for c in a.get("coin") or []: coins.append( { "coin": c.get("coin"), "wallet_balance": _f(c.get("walletBalance")), "equity": _f(c.get("equity")), } ) return { "account_type": a.get("accountType"), "equity": _f(a.get("totalEquity")), "wallet_balance": _f(a.get("totalWalletBalance")), "margin_balance": _f(a.get("totalMarginBalance")), "available_balance": _f(a.get("totalAvailableBalance")), "unrealized_pnl": _f(a.get("totalPerpUPL")), "coins": coins, } async def get_trade_history( self, category: str = "linear", limit: int = 50 ) -> list[dict]: resp = await self._request_signed( "GET", "/v5/execution/list", params={"category": category, "limit": limit}, ) rows = (resp.get("result") or {}).get("list") or [] return [ { "symbol": r.get("symbol"), "side": r.get("side"), "size": _f(r.get("execQty")), "price": _f(r.get("execPrice")), "fee": _f(r.get("execFee")), "timestamp": _i(r.get("execTime")), "order_id": r.get("orderId"), } for r in rows ] async def get_open_orders( self, category: str = "linear", symbol: str | None = None, settle_coin: str = "USDT", ) -> list[dict]: params: dict[str, Any] = {"category": category} if category in ("linear", "inverse") and not symbol: params["settleCoin"] = settle_coin if symbol: params["symbol"] = symbol resp = await self._request_signed( "GET", "/v5/order/realtime", params=params ) rows = (resp.get("result") or {}).get("list") or [] return [ { "order_id": r.get("orderId"), "symbol": r.get("symbol"), "side": r.get("side"), "qty": _f(r.get("qty")), "price": _f(r.get("price")), "type": r.get("orderType"), "status": r.get("orderStatus"), "reduce_only": bool(r.get("reduceOnly")), } for r in rows ] # ── microstructure / basis ───────────────────────────────── async def get_orderbook_imbalance( self, symbol: str, category: str = "linear", depth: int = 10, ) -> dict: ob = await self.get_orderbook( symbol=symbol, category=category, limit=max(depth, 50) ) result = micro.orderbook_imbalance( ob.get("bids") or [], ob.get("asks") or [], depth=depth ) return { "symbol": symbol, "category": category, "depth": depth, **result, "timestamp": ob.get("timestamp"), } async def get_basis_term_structure(self, asset: str) -> dict: import datetime as _dt asset = asset.upper() spot = await self.get_ticker(f"{asset}USDT", category="spot") perp = await self.get_ticker(f"{asset}USDT", category="linear") sp = spot.get("last_price") pp = perp.get("last_price") instr = await self.get_instruments(category="linear") items = instr.get("instruments") or [] futures = [ x for x in items if x.get("symbol", "").startswith(f"{asset}-") or x.get("symbol", "").startswith(f"{asset}USDT-") ] rows: list[dict[str, Any]] = [] if sp: now_ms = int(_dt.datetime.now(_dt.UTC).timestamp() * 1000) for f in futures[:10]: tk = await self.get_ticker(f["symbol"], category="linear") fp = tk.get("last_price") expiry_ms = f.get("delivery_time") if not fp or not expiry_ms: continue days = max((int(expiry_ms) - now_ms) / 86_400_000, 1) basis_pct = 100.0 * (fp - sp) / sp annualized = basis_pct * 365.0 / days rows.append( { "symbol": f["symbol"], "expiry_ms": int(expiry_ms), "days_to_expiry": round(days, 2), "future_price": fp, "basis_pct": round(basis_pct, 4), "annualized_basis_pct": round(annualized, 4), } ) rows.sort(key=lambda r: r["days_to_expiry"]) return { "asset": asset, "spot_price": sp, "perp_price": pp, "perp_basis_pct": round(100.0 * (pp - sp) / sp, 4) if (sp and pp) else None, "term_structure": rows, "data_timestamp": _dt.datetime.now(_dt.UTC).isoformat(), } async def get_basis_spot_perp(self, asset: str) -> dict: asset = asset.upper() symbol = f"{asset}USDT" spot = await self.get_ticker(symbol, category="spot") perp = await self.get_ticker(symbol, category="linear") sp = spot.get("last_price") pp = perp.get("last_price") basis_abs = basis_pct = None if sp and pp: basis_abs = pp - sp basis_pct = 100.0 * basis_abs / sp return { "asset": asset, "symbol": symbol, "spot_price": sp, "perp_price": pp, "basis_abs": basis_abs, "basis_pct": basis_pct, "funding_rate": perp.get("funding_rate"), } # ── trading (signed, write) ──────────────────────────────── async def place_order( self, category: str, symbol: str, side: str, qty: float, order_type: str = "Limit", price: float | None = None, tif: str = "GTC", reduce_only: bool = False, position_idx: int | None = None, ) -> dict: body: dict[str, Any] = { "category": category, "symbol": symbol, "side": side, "qty": str(qty), "orderType": order_type, "timeInForce": tif, "reduceOnly": reduce_only, } if price is not None: body["price"] = str(price) if position_idx is not None: body["positionIdx"] = position_idx if category == "option": body["orderLinkId"] = f"cerbero-{uuid.uuid4().hex[:16]}" resp = await self._request_signed("POST", "/v5/order/create", body=body) r = resp.get("result") or {} return self._envelope( resp, { "order_id": r.get("orderId"), "order_link_id": r.get("orderLinkId"), "status": "submitted", }, ) async def place_combo_order( self, category: str, legs: list[dict[str, Any]], ) -> dict: if category != "option": raise ValueError( "place_combo_order: Bybit batch_order è disponibile solo su category='option'" ) if len(legs) < 2: raise ValueError("combo requires at least 2 legs") request: list[dict[str, Any]] = [] for leg in legs: entry: dict[str, Any] = { "symbol": leg["symbol"], "side": leg["side"], "qty": str(leg["qty"]), "orderType": leg.get("order_type", "Limit"), "timeInForce": leg.get("tif", "GTC"), "reduceOnly": leg.get("reduce_only", False), "orderLinkId": f"cerbero-{uuid.uuid4().hex[:16]}", } if leg.get("price") is not None: entry["price"] = str(leg["price"]) request.append(entry) body = {"category": category, "request": request} resp = await self._request_signed( "POST", "/v5/order/create-batch", body=body ) result_list = (resp.get("result") or {}).get("list") or [] orders = [ { "order_id": r.get("orderId"), "order_link_id": r.get("orderLinkId"), "status": "submitted", } for r in result_list ] return self._envelope(resp, {"orders": orders}) async def amend_order( self, category: str, symbol: str, order_id: str, new_qty: float | None = None, new_price: float | None = None, ) -> dict: body: dict[str, Any] = { "category": category, "symbol": symbol, "orderId": order_id, } if new_qty is not None: body["qty"] = str(new_qty) if new_price is not None: body["price"] = str(new_price) resp = await self._request_signed("POST", "/v5/order/amend", body=body) r = resp.get("result") or {} return self._envelope( resp, { "order_id": r.get("orderId", order_id), "status": "amended", }, ) async def cancel_order(self, category: str, symbol: str, order_id: str) -> dict: body = {"category": category, "symbol": symbol, "orderId": order_id} resp = await self._request_signed("POST", "/v5/order/cancel", body=body) r = resp.get("result") or {} return self._envelope( resp, { "order_id": r.get("orderId", order_id), "status": "cancelled", }, ) async def cancel_all_orders( self, category: str, symbol: str | None = None ) -> dict: body: dict[str, Any] = {"category": category} if symbol: body["symbol"] = symbol resp = await self._request_signed( "POST", "/v5/order/cancel-all", body=body ) r = resp.get("result") or {} ids = [x.get("orderId") for x in (r.get("list") or [])] return self._envelope( resp, { "cancelled_ids": ids, "count": len(ids), }, ) async def set_stop_loss( self, category: str, symbol: str, stop_loss: float, position_idx: int = 0, ) -> dict: body = { "category": category, "symbol": symbol, "stopLoss": str(stop_loss), "positionIdx": position_idx, } resp = await self._request_signed( "POST", "/v5/position/trading-stop", body=body ) return self._envelope( resp, { "symbol": symbol, "stop_loss": stop_loss, "status": "stop_loss_set", }, ) async def set_take_profit( self, category: str, symbol: str, take_profit: float, position_idx: int = 0, ) -> dict: body = { "category": category, "symbol": symbol, "takeProfit": str(take_profit), "positionIdx": position_idx, } resp = await self._request_signed( "POST", "/v5/position/trading-stop", body=body ) return self._envelope( resp, { "symbol": symbol, "take_profit": take_profit, "status": "take_profit_set", }, ) async def close_position(self, category: str, symbol: str) -> dict: positions = await self.get_positions(category=category) target = next( (p for p in positions if p["symbol"] == symbol and (p["size"] or 0) > 0), None, ) if not target: return {"error": "no_open_position", "symbol": symbol} close_side = "Sell" if target["side"] == "Buy" else "Buy" return await self.place_order( category=category, symbol=symbol, side=close_side, qty=target["size"], order_type="Market", reduce_only=True, tif="IOC", ) async def set_leverage( self, category: str, symbol: str, leverage: int ) -> dict: body = { "category": category, "symbol": symbol, "buyLeverage": str(leverage), "sellLeverage": str(leverage), } resp = await self._request_signed( "POST", "/v5/position/set-leverage", body=body ) return self._envelope( resp, { "symbol": symbol, "leverage": leverage, "status": "leverage_set", }, ) async def switch_position_mode( self, category: str, symbol: str, mode: str ) -> dict: mode_code = 3 if mode.lower() == "hedge" else 0 body = { "category": category, "symbol": symbol, "mode": mode_code, } resp = await self._request_signed( "POST", "/v5/position/switch-mode", body=body ) return self._envelope( resp, { "symbol": symbol, "mode": mode, "status": "mode_switched", }, ) async def transfer_asset( self, coin: str, amount: float, from_type: str, to_type: str, ) -> dict: body = { "transferId": str(uuid.uuid4()), "coin": coin, "amount": str(amount), "fromAccountType": from_type, "toAccountType": to_type, } resp = await self._request_signed( "POST", "/v5/asset/transfer/inter-transfer", body=body ) r = resp.get("result") or {} return self._envelope( resp, { "transfer_id": r.get("transferId"), "coin": coin, "amount": amount, "status": "submitted", }, )