from __future__ import annotations import os from fastapi import Depends, FastAPI, HTTPException from mcp_common.audit import audit_write_op from mcp_common.auth import Principal, TokenStore, require_principal from mcp_common.environment import EnvironmentInfo from mcp_common.mcp_bridge import mount_mcp_endpoint from mcp_common.server import build_app from pydantic import BaseModel, field_validator, model_validator from mcp_hyperliquid.client import HyperliquidClient from mcp_hyperliquid.leverage_cap import enforce_leverage as _enforce_leverage from mcp_hyperliquid.leverage_cap import get_max_leverage # --- Body models --- class GetMarketsReq(BaseModel): pass class GetTickerReq(BaseModel): instrument: str class GetOrderbookReq(BaseModel): instrument: str depth: int = 10 class GetPositionsReq(BaseModel): pass class GetAccountSummaryReq(BaseModel): pass class GetTradeHistoryReq(BaseModel): limit: int = 100 class GetHistoricalReq(BaseModel): instrument: str | None = None asset: str | None = None start_date: str | None = None end_date: str | None = None resolution: str = "1h" interval: str | None = None limit: int = 50 model_config = {"extra": "allow"} @model_validator(mode="after") def _normalize(self): from datetime import UTC, datetime, timedelta sym = self.instrument or self.asset if not sym: raise ValueError("instrument (or asset) is required") self.instrument = sym if self.interval: self.resolution = self.interval if not self.end_date: self.end_date = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S") if not self.start_date: days = max(1, self.limit // 6) self.start_date = ( datetime.now(UTC) - timedelta(days=days) ).strftime("%Y-%m-%dT%H:%M:%S") return self class GetOpenOrdersReq(BaseModel): pass class GetFundingRateReq(BaseModel): instrument: str class BasisSpotPerpReq(BaseModel): asset: str class GetIndicatorsReq(BaseModel): instrument: str | None = None asset: str | None = None indicators: list[str] = ["rsi", "atr", "macd", "adx"] start_date: str | None = None end_date: str | None = None resolution: str = "1h" interval: str | None = None limit: int = 50 model_config = {"extra": "allow"} @model_validator(mode="after") def _normalize(self): from datetime import UTC, datetime, timedelta sym = self.instrument or self.asset if not sym: raise ValueError("instrument (or asset) is required") self.instrument = sym if self.interval: self.resolution = self.interval if not self.end_date: self.end_date = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S") if not self.start_date: days = max(2, self.limit // 6) self.start_date = ( datetime.now(UTC) - timedelta(days=days) ).strftime("%Y-%m-%dT%H:%M:%S") return self @field_validator("indicators", mode="before") @classmethod def _coerce_indicators(cls, v): if isinstance(v, str): import json s = v.strip() if s.startswith("["): try: parsed = json.loads(s) if isinstance(parsed, list): return [str(x).strip() for x in parsed if str(x).strip()] except json.JSONDecodeError: pass return [x.strip() for x in s.split(",") if x.strip()] if isinstance(v, list): return v raise ValueError( "indicators must be a list like ['rsi','atr','macd'] " "or a comma-separated string like 'rsi,atr,macd'" ) class PlaceOrderReq(BaseModel): instrument: str side: str # "buy" | "sell" amount: float type: str = "limit" price: float | None = None reduce_only: bool = False leverage: int | None = None # CER-016: None → default cap (3x) class CancelOrderReq(BaseModel): order_id: str instrument: str class SetStopLossReq(BaseModel): instrument: str stop_price: float size: float class SetTakeProfitReq(BaseModel): instrument: str tp_price: float size: float class ClosePositionReq(BaseModel): instrument: str # --- ACL helper --- def _check(principal: Principal, *, core: bool = False, observer: bool = False) -> None: allowed: set[str] = set() if core: allowed.add("core") if observer: allowed.add("observer") if not (principal.capabilities & allowed): raise HTTPException(403, f"capability required: {allowed}") # --- App factory --- def create_app( *, client: HyperliquidClient, token_store: TokenStore, creds: dict | None = None, env_info: EnvironmentInfo | None = None, ) -> FastAPI: creds = creds or {} app = build_app(name="mcp-hyperliquid", version="0.1.0", token_store=token_store) # --- Read tools: core + observer --- @app.post("/tools/environment_info", tags=["reads"]) async def t_environment_info(principal: Principal = Depends(require_principal)): _check(principal, core=True, observer=True) if env_info is None: return { "exchange": "hyperliquid", "environment": "testnet" if getattr(client, "testnet", True) else "mainnet", "source": "credentials", "env_value": None, "base_url": getattr(client, "base_url", None), "max_leverage": get_max_leverage(creds), } return { "exchange": env_info.exchange, "environment": env_info.environment, "source": env_info.source, "env_value": env_info.env_value, "base_url": env_info.base_url, "max_leverage": get_max_leverage(creds), } @app.post("/tools/get_markets", tags=["reads"]) async def t_get_markets( body: GetMarketsReq, principal: Principal = Depends(require_principal) ): _check(principal, core=True, observer=True) return await client.get_markets() @app.post("/tools/get_ticker", tags=["reads"]) async def t_get_ticker( body: GetTickerReq, principal: Principal = Depends(require_principal) ): _check(principal, core=True, observer=True) return await client.get_ticker(body.instrument) @app.post("/tools/get_orderbook", tags=["reads"]) async def t_get_orderbook( body: GetOrderbookReq, principal: Principal = Depends(require_principal) ): _check(principal, core=True, observer=True) return await client.get_orderbook(body.instrument, body.depth) @app.post("/tools/get_positions", tags=["reads"]) async def t_get_positions( body: GetPositionsReq, principal: Principal = Depends(require_principal) ): _check(principal, core=True, observer=True) return await client.get_positions() @app.post("/tools/get_account_summary", tags=["reads"]) async def t_get_account_summary( body: GetAccountSummaryReq, principal: Principal = Depends(require_principal) ): _check(principal, core=True, observer=True) return await client.get_account_summary() @app.post("/tools/get_trade_history", tags=["reads"]) async def t_get_trade_history( body: GetTradeHistoryReq, principal: Principal = Depends(require_principal) ): _check(principal, core=True, observer=True) return await client.get_trade_history(body.limit) @app.post("/tools/get_historical", tags=["reads"]) async def t_get_historical( body: GetHistoricalReq, principal: Principal = Depends(require_principal) ): _check(principal, core=True, observer=True) return await client.get_historical( body.instrument, body.start_date, body.end_date, body.resolution ) @app.post("/tools/get_open_orders", tags=["reads"]) async def t_get_open_orders( body: GetOpenOrdersReq, principal: Principal = Depends(require_principal) ): _check(principal, core=True, observer=True) return await client.get_open_orders() @app.post("/tools/get_funding_rate", tags=["reads"]) async def t_get_funding_rate( body: GetFundingRateReq, principal: Principal = Depends(require_principal) ): _check(principal, core=True, observer=True) return await client.get_funding_rate(body.instrument) @app.post("/tools/basis_spot_perp", tags=["writes"]) async def t_basis_spot_perp( body: BasisSpotPerpReq, principal: Principal = Depends(require_principal) ): _check(principal, core=True, observer=True) return await client.basis_spot_perp(body.asset) @app.post("/tools/get_indicators", tags=["reads"]) async def t_get_indicators( body: GetIndicatorsReq, principal: Principal = Depends(require_principal) ): _check(principal, core=True, observer=True) return await client.get_indicators( body.instrument, body.indicators, body.start_date, body.end_date, body.resolution, ) # --- Write tools: core only --- @app.post("/tools/place_order", tags=["writes"]) async def t_place_order( body: PlaceOrderReq, principal: Principal = Depends(require_principal) ): _check(principal, core=True) _enforce_leverage(body.leverage, creds=creds, exchange="hyperliquid") result = await client.place_order( instrument=body.instrument, side=body.side, amount=body.amount, type=body.type, price=body.price, reduce_only=body.reduce_only, ) audit_write_op( principal=principal, action="place_order", exchange="hyperliquid", target=body.instrument, payload={"side": body.side, "amount": body.amount, "type": body.type, "price": body.price, "reduce_only": body.reduce_only, "leverage": body.leverage}, result=result, ) return result @app.post("/tools/cancel_order", tags=["writes"]) async def t_cancel_order( body: CancelOrderReq, principal: Principal = Depends(require_principal) ): _check(principal, core=True) result = await client.cancel_order(body.order_id, body.instrument) audit_write_op( principal=principal, action="cancel_order", exchange="hyperliquid", target=body.order_id, payload={"instrument": body.instrument}, result=result, ) return result @app.post("/tools/set_stop_loss", tags=["writes"]) async def t_set_sl( body: SetStopLossReq, principal: Principal = Depends(require_principal) ): _check(principal, core=True) result = await client.set_stop_loss(body.instrument, body.stop_price, body.size) audit_write_op( principal=principal, action="set_stop_loss", exchange="hyperliquid", target=body.instrument, payload={"stop_price": body.stop_price, "size": body.size}, result=result, ) return result @app.post("/tools/set_take_profit", tags=["writes"]) async def t_set_tp( body: SetTakeProfitReq, principal: Principal = Depends(require_principal) ): _check(principal, core=True) result = await client.set_take_profit(body.instrument, body.tp_price, body.size) audit_write_op( principal=principal, action="set_take_profit", exchange="hyperliquid", target=body.instrument, payload={"tp_price": body.tp_price, "size": body.size}, result=result, ) return result @app.post("/tools/close_position", tags=["writes"]) async def t_close_position( body: ClosePositionReq, principal: Principal = Depends(require_principal) ): _check(principal, core=True) result = await client.close_position(body.instrument) audit_write_op( principal=principal, action="close_position", exchange="hyperliquid", target=body.instrument, payload={}, result=result, ) return result # ───── MCP endpoint (/mcp) — bridge verso /tools/* ───── port = int(os.environ.get("PORT", "9012")) mount_mcp_endpoint( app, name="cerbero-hyperliquid", version="0.1.0", token_store=token_store, internal_base_url=f"http://localhost:{port}", tools=[ {"name": "environment_info", "description": "Ambiente operativo (testnet/mainnet), source, base_url, max_leverage cap."}, {"name": "get_markets", "description": "Lista mercati perp disponibili."}, {"name": "get_ticker", "description": "Ticker di un perp."}, {"name": "get_orderbook", "description": "Orderbook L2."}, {"name": "get_positions", "description": "Posizioni aperte."}, {"name": "get_account_summary", "description": "Account summary (spot + perp equity)."}, {"name": "get_trade_history", "description": "Storia trade."}, {"name": "get_historical", "description": "OHLCV storico."}, {"name": "get_open_orders", "description": "Ordini aperti."}, {"name": "get_funding_rate", "description": "Funding rate corrente per simbolo."}, {"name": "basis_spot_perp", "description": "Basis spot-perp annualizzato + carry opportunity detection."}, {"name": "get_indicators", "description": "Indicatori tecnici."}, {"name": "place_order", "description": "Invia ordine (CORE only)."}, {"name": "cancel_order", "description": "Cancella ordine."}, {"name": "set_stop_loss", "description": "Stop loss su posizione."}, {"name": "set_take_profit", "description": "Take profit su posizione."}, {"name": "close_position", "description": "Chiude posizione."}, ], ) return app