"""FastAPI app exposing a Deribit client as ``/tools/*`` endpoints plus an MCP bridge."""
from __future__ import annotations

import json
import logging
import os
from contextlib import asynccontextmanager

from fastapi import Depends, FastAPI, HTTPException
from mcp_common.audit import audit_write_op
from mcp_common.auth import Principal, TokenStore, require_principal
from mcp_common.environment import EnvironmentInfo
from mcp_common.mcp_bridge import mount_mcp_endpoint
from mcp_common.server import build_app
from pydantic import BaseModel, field_validator, model_validator

from mcp_deribit.client import DeribitClient
from mcp_deribit.leverage_cap import enforce_leverage as _enforce_leverage
from mcp_deribit.leverage_cap import get_max_leverage

logger = logging.getLogger(__name__)


# --- Body models ---

# Accepts either ``instrument_name`` or the alias ``instrument``; after
# validation ``instrument_name`` always holds the resolved value.
class GetTickerReq(BaseModel):
    instrument_name: str | None = None
    instrument: str | None = None
    model_config = {"extra": "allow"}

    @model_validator(mode="after")
    def _normalize(self):
        """Resolve the alias into ``instrument_name``; reject if both are empty."""
        sym = self.instrument_name or self.instrument
        if not sym:
            raise ValueError("instrument_name (or instrument) is required")
        self.instrument_name = sym
        return self


# Accepts either ``instrument_names`` or the alias ``instruments``; after
# validation ``instrument_names`` always holds the resolved list.
class GetTickerBatchReq(BaseModel):
    instrument_names: list[str] | None = None
    instruments: list[str] | None = None
    model_config = {"extra": "allow"}

    @model_validator(mode="after")
    def _normalize(self):
        """Resolve the alias into ``instrument_names``; reject if both are empty."""
        names = self.instrument_names or self.instruments
        if not names:
            raise ValueError("instrument_names (or instruments) is required")
        self.instrument_names = names
        return self


class GetInstrumentsReq(BaseModel):
    currency: str
    kind: str | None = None
    expiry_from: str | None = None
    expiry_to: str | None = None
    strike_min: float | None = None
    strike_max: float | None = None
    min_open_interest: float | None = None
    limit: int = 100
    offset: int = 0


class GetOrderbookReq(BaseModel):
    instrument_name: str
    depth: int = 10


class OrderbookImbalanceReq(BaseModel):
    instrument_name: str
    depth: int = 10


class GetPositionsReq(BaseModel):
    currency: str = "USDC"


class GetAccountSummaryReq(BaseModel):
    currency: str = "USDC"


class GetTradeHistoryReq(BaseModel):
    limit: int = 100
    instrument_name: str | None = None


class GetHistoricalReq(BaseModel):
    instrument: str
    start_date: str
    end_date: str
    resolution: str = "1h"


class GetDvolReq(BaseModel):
    currency: str = "BTC"
    start_date: str
    end_date: str
    resolution: str = "1D"


class GetDvolHistoryReq(BaseModel):
    currency: str = "BTC"
    lookback_days: int = 90


class GetIvRankReq(BaseModel):
    instrument: str


class GetRealizedVolReq(BaseModel):
    currency: str = "BTC"
    windows: list[int] = [14, 30]


class GetGexReq(BaseModel):
    currency: str
    expiry_from: str | None = None
    expiry_to: str | None = None
    top_n_strikes: int = 50


class OptionFlowReq(BaseModel):
    """Shared body for the option-flow indicators (dealer gamma, vanna/charm,
    OI-weighted skew, smile asymmetry, ATM vs wings)."""

    currency: str
    expiry_from: str | None = None
    expiry_to: str | None = None
    top_n_strikes: int = 100


class GetPcRatioReq(BaseModel):
    currency: str


class GetSkew25dReq(BaseModel):
    currency: str
    expiry: str


class GetTermStructureReq(BaseModel):
    currency: str


class CalculateSpreadPayoffReq(BaseModel):
    legs: list[dict]
    quote_currency: str = "USD"


class RunBacktestReq(BaseModel):
    strategy_name: str
    underlying: str = "BTC"
    lookback_days: int = 30
    resolution: str = "4h"
    entry_rules: dict | None = None
    exit_rules: dict | None = None


class FindByDeltaReq(BaseModel):
    currency: str
    expiry: str
    target_delta: float
    option_type: str
    max_results: int = 3
    min_open_interest: float = 100.0
    min_volume_24h: float = 20.0


class GetIndicatorsReq(BaseModel):
    instrument: str
    indicators: list[str]
    start_date: str
    end_date: str
    resolution: str = "1h"

    @field_validator("indicators", mode="before")
    @classmethod
    def _coerce_indicators(cls, v):
        """Coerce ``indicators`` into a list before field validation.

        A list is passed through unchanged. A string is first tried as a
        JSON array (when it starts with ``[``) and otherwise split on
        commas; blank items are dropped. Any other type is rejected.
        """
        if isinstance(v, str):
            s = v.strip()
            if s.startswith("["):
                try:
                    parsed = json.loads(s)
                except json.JSONDecodeError:
                    # Not valid JSON: fall through to the comma-split below.
                    pass
                else:
                    if isinstance(parsed, list):
                        return [str(x).strip() for x in parsed if str(x).strip()]
            return [x.strip() for x in s.split(",") if x.strip()]
        if isinstance(v, list):
            return v
        raise ValueError(
            "indicators must be a list like ['rsi','atr','macd'] "
            "or a comma-separated string like 'rsi,atr,macd'"
        )


class PlaceOrderReq(BaseModel):
    instrument_name: str
    side: str  # "buy" | "sell"
    amount: float
    type: str = "limit"
    price: float | None = None
    reduce_only: bool = False
    post_only: bool = False
    label: str | None = None
    leverage: int | None = None  # CER-016: None → default cap (3x)


class ComboLeg(BaseModel):
    instrument_name: str
    direction: str  # "buy" | "sell"
    ratio: int = 1


class PlaceComboOrderReq(BaseModel):
    legs: list[ComboLeg]
    side: str  # "buy" | "sell"
    amount: float
    type: str = "limit"
    price: float | None = None
    label: str | None = None
    leverage: int | None = None

    @model_validator(mode="after")
    def _at_least_two_legs(self):
        """Reject combos with fewer than two legs."""
        if len(self.legs) < 2:
            raise ValueError("combo requires at least 2 legs")
        return self


class CancelOrderReq(BaseModel):
    order_id: str


class SetStopLossReq(BaseModel):
    order_id: str
    stop_price: float


class SetTakeProfitReq(BaseModel):
    order_id: str
    tp_price: float


class ClosePositionReq(BaseModel):
    instrument_name: str


# --- ACL helper ---

def _check(principal: Principal, *, core: bool = False, observer: bool = False) -> None:
    """Require that ``principal`` holds at least one of the selected capabilities.

    Args:
        principal: Authenticated caller; its ``capabilities`` are intersected
            with the allowed set.
        core: Allow callers holding the ``"core"`` capability.
        observer: Allow callers holding the ``"observer"`` capability.

    Raises:
        HTTPException: 403 when the intersection is empty. With neither flag
            set the allowed set is empty, so the check cannot pass.
    """
    allowed: set[str] = set()
    if core:
        allowed.add("core")
    if observer:
        allowed.add("observer")
    if not (principal.capabilities & allowed):
        raise HTTPException(403, f"capability required: {allowed}")


# --- App factory ---

def create_app(
    *,
    client: DeribitClient,
    token_store: TokenStore,
    creds: dict,
    env_info: EnvironmentInfo | None = None,
) -> FastAPI:
    """Build the FastAPI app with all ``/tools/*`` endpoints and the MCP bridge.

    Read endpoints accept principals with the ``core`` or ``observer``
    capability. Order-management endpoints accept ``core`` only and record
    each call through ``audit_write_op``.

    Args:
        client: Deribit client every endpoint delegates to.
        token_store: Passed to ``build_app`` and ``mount_mcp_endpoint``.
        creds: Credentials mapping handed to ``get_max_leverage`` and
            ``enforce_leverage`` to derive the leverage cap.
        env_info: Optional pre-resolved environment description returned by
            ``/tools/environment_info``; when ``None`` the response is
            derived from ``client``.

    Returns:
        The configured application.
    """
    cap_default = get_max_leverage(creds)

    async def _set_leverage_best_effort(instrument: str, leverage) -> None:
        """Try to set leverage on ``instrument``; log and continue on failure."""
        try:
            await client.set_leverage(instrument, leverage)
        except Exception:
            # Deliberately best-effort: a failure here must not block startup
            # or order placement, but it must leave a trace for operators.
            logger.warning(
                "set_leverage(%s, %s) failed; continuing",
                instrument,
                leverage,
                exc_info=True,
            )

    # CER-016: pre-set the leverage cap on the main perpetuals at boot
    # (best-effort).
    @asynccontextmanager
    async def _lifespan(_app: FastAPI):
        for inst in ("BTC-PERPETUAL", "ETH-PERPETUAL"):
            await _set_leverage_best_effort(inst, cap_default)
        yield

    app = build_app(
        name="mcp-deribit",
        version="0.1.0",
        token_store=token_store,
        lifespan=_lifespan,
    )

    # --- Read tools: core + observer ---

    @app.post("/tools/is_testnet", tags=["reads"])
    async def t_is_testnet(principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        return client.is_testnet()

    @app.post("/tools/environment_info", tags=["reads"])
    async def t_environment_info(principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        if env_info is None:
            # No explicit environment info supplied: derive it from the client.
            return {
                "exchange": "deribit",
                "environment": "testnet" if client.is_testnet().get("testnet") else "mainnet",
                "source": "credentials",
                "env_value": None,
                "base_url": client.base_url,
                "max_leverage": get_max_leverage(creds),
            }
        return {
            "exchange": env_info.exchange,
            "environment": env_info.environment,
            "source": env_info.source,
            "env_value": env_info.env_value,
            "base_url": env_info.base_url,
            "max_leverage": get_max_leverage(creds),
        }

    @app.post("/tools/get_ticker", tags=["reads"])
    async def t_get_ticker(
        body: GetTickerReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_ticker(body.instrument_name)

    @app.post("/tools/get_ticker_batch", tags=["reads"])
    async def t_get_ticker_batch(
        body: GetTickerBatchReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_ticker_batch(body.instrument_names)

    @app.post("/tools/get_instruments", tags=["reads"])
    async def t_get_instruments(
        body: GetInstrumentsReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_instruments(
            currency=body.currency,
            kind=body.kind,
            expiry_from=body.expiry_from,
            expiry_to=body.expiry_to,
            strike_min=body.strike_min,
            strike_max=body.strike_max,
            min_open_interest=body.min_open_interest,
            limit=body.limit,
            offset=body.offset,
        )

    @app.post("/tools/get_orderbook", tags=["reads"])
    async def t_get_orderbook(
        body: GetOrderbookReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_orderbook(body.instrument_name, body.depth)

    @app.post("/tools/get_orderbook_imbalance", tags=["reads"])
    async def t_get_ob_imbalance(
        body: OrderbookImbalanceReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_orderbook_imbalance(body.instrument_name, body.depth)

    @app.post("/tools/get_positions", tags=["reads"])
    async def t_get_positions(
        body: GetPositionsReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_positions(body.currency)

    @app.post("/tools/get_account_summary", tags=["reads"])
    async def t_get_account_summary(
        body: GetAccountSummaryReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_account_summary(body.currency)

    @app.post("/tools/get_trade_history", tags=["reads"])
    async def t_get_trade_history(
        body: GetTradeHistoryReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_trade_history(body.limit, body.instrument_name)

    @app.post("/tools/get_historical", tags=["reads"])
    async def t_get_historical(
        body: GetHistoricalReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_historical(
            body.instrument, body.start_date, body.end_date, body.resolution
        )

    @app.post("/tools/get_dvol", tags=["reads"])
    async def t_get_dvol(
        body: GetDvolReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_dvol(
            body.currency, body.start_date, body.end_date, body.resolution
        )

    @app.post("/tools/get_gex", tags=["reads"])
    async def t_get_gex(
        body: GetGexReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_gex(
            body.currency, body.expiry_from, body.expiry_to, body.top_n_strikes
        )

    @app.post("/tools/get_dealer_gamma_profile", tags=["reads"])
    async def t_get_dealer_gamma_profile(
        body: OptionFlowReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_dealer_gamma_profile(
            body.currency, body.expiry_from, body.expiry_to, body.top_n_strikes
        )

    @app.post("/tools/get_vanna_charm", tags=["reads"])
    async def t_get_vanna_charm(
        body: OptionFlowReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_vanna_charm(
            body.currency, body.expiry_from, body.expiry_to, body.top_n_strikes
        )

    @app.post("/tools/get_oi_weighted_skew", tags=["reads"])
    async def t_get_oi_weighted_skew(
        body: OptionFlowReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_oi_weighted_skew(
            body.currency, body.expiry_from, body.expiry_to, body.top_n_strikes
        )

    @app.post("/tools/get_smile_asymmetry", tags=["reads"])
    async def t_get_smile_asymmetry(
        body: OptionFlowReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_smile_asymmetry(
            body.currency, body.expiry_from, body.expiry_to, body.top_n_strikes
        )

    @app.post("/tools/get_atm_vs_wings_vol", tags=["reads"])
    async def t_get_atm_vs_wings_vol(
        body: OptionFlowReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_atm_vs_wings_vol(
            body.currency, body.expiry_from, body.expiry_to, body.top_n_strikes
        )

    @app.post("/tools/get_pc_ratio", tags=["reads"])
    async def t_get_pc_ratio(
        body: GetPcRatioReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_pc_ratio(body.currency)

    @app.post("/tools/get_skew_25d", tags=["reads"])
    async def t_get_skew_25d(
        body: GetSkew25dReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_skew_25d(body.currency, body.expiry)

    @app.post("/tools/get_term_structure", tags=["reads"])
    async def t_get_term_structure(
        body: GetTermStructureReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_term_structure(body.currency)

    # NOTE(review): the next three endpoints allow observers and sit with the
    # read tools, yet are tagged "writes" — confirm whether the tag is
    # intentional before changing it.
    @app.post("/tools/run_backtest", tags=["writes"])
    async def t_run_backtest(
        body: RunBacktestReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.run_backtest(
            strategy_name=body.strategy_name,
            underlying=body.underlying,
            lookback_days=body.lookback_days,
            resolution=body.resolution,
            entry_rules=body.entry_rules,
            exit_rules=body.exit_rules,
        )

    @app.post("/tools/calculate_spread_payoff", tags=["writes"])
    async def t_calculate_spread_payoff(
        body: CalculateSpreadPayoffReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.calculate_spread_payoff(body.legs, body.quote_currency)

    @app.post("/tools/find_by_delta", tags=["writes"])
    async def t_find_by_delta(
        body: FindByDeltaReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.find_by_delta(
            currency=body.currency,
            expiry=body.expiry,
            target_delta=body.target_delta,
            option_type=body.option_type,
            max_results=body.max_results,
            min_open_interest=body.min_open_interest,
            min_volume_24h=body.min_volume_24h,
        )

    @app.post("/tools/get_iv_rank", tags=["reads"])
    async def t_get_iv_rank(
        body: GetIvRankReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_iv_rank(body.instrument)

    @app.post("/tools/get_dvol_history", tags=["reads"])
    async def t_get_dvol_history(
        body: GetDvolHistoryReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_dvol_history(body.currency, body.lookback_days)

    @app.post("/tools/get_realized_vol", tags=["reads"])
    async def t_get_realized_vol(
        body: GetRealizedVolReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_realized_vol(body.currency, body.windows)

    @app.post("/tools/get_technical_indicators", tags=["reads"])
    async def t_get_indicators(
        body: GetIndicatorsReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await client.get_technical_indicators(
            body.instrument,
            body.indicators,
            body.start_date,
            body.end_date,
            body.resolution,
        )

    # --- Write tools: core only ---

    @app.post("/tools/place_order", tags=["writes"])
    async def t_place_order(
        body: PlaceOrderReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True)
        lev = _enforce_leverage(body.leverage, creds=creds, exchange="deribit")
        # The default cap is pre-set at boot for the main perpetuals, so only
        # a differing leverage needs an explicit call here.
        if lev != cap_default:
            await _set_leverage_best_effort(body.instrument_name, lev)
        result = await client.place_order(
            instrument_name=body.instrument_name,
            side=body.side,
            amount=body.amount,
            type=body.type,
            price=body.price,
            reduce_only=body.reduce_only,
            post_only=body.post_only,
            label=body.label,
        )
        audit_write_op(
            principal=principal,
            action="place_order",
            exchange="deribit",
            target=body.instrument_name,
            payload={
                "side": body.side,
                "amount": body.amount,
                "type": body.type,
                "price": body.price,
                "leverage": lev,
                "label": body.label,
            },
            result=result,
        )
        return result

    @app.post("/tools/place_combo_order", tags=["writes"])
    async def t_place_combo_order(
        body: PlaceComboOrderReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True)
        lev = _enforce_leverage(body.leverage, creds=creds, exchange="deribit")
        if lev != cap_default:
            for leg in body.legs:
                await _set_leverage_best_effort(leg.instrument_name, lev)
        result = await client.place_combo_order(
            legs=[leg.model_dump() for leg in body.legs],
            side=body.side,
            amount=body.amount,
            type=body.type,
            price=body.price,
            label=body.label,
        )
        audit_write_op(
            principal=principal,
            action="place_combo_order",
            exchange="deribit",
            target=result.get("combo_instrument") if isinstance(result, dict) else None,
            payload={
                "legs": [leg.model_dump() for leg in body.legs],
                "side": body.side,
                "amount": body.amount,
                "leverage": lev,
            },
            result=result if isinstance(result, dict) else None,
        )
        return result

    @app.post("/tools/cancel_order", tags=["writes"])
    async def t_cancel_order(
        body: CancelOrderReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True)
        result = await client.cancel_order(body.order_id)
        audit_write_op(
            principal=principal,
            action="cancel_order",
            exchange="deribit",
            target=body.order_id,
            payload={},
            result=result,
        )
        return result

    @app.post("/tools/set_stop_loss", tags=["writes"])
    async def t_set_sl(
        body: SetStopLossReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True)
        result = await client.set_stop_loss(body.order_id, body.stop_price)
        audit_write_op(
            principal=principal,
            action="set_stop_loss",
            exchange="deribit",
            target=body.order_id,
            payload={"stop_price": body.stop_price},
            result=result,
        )
        return result

    @app.post("/tools/set_take_profit", tags=["writes"])
    async def t_set_tp(
        body: SetTakeProfitReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True)
        result = await client.set_take_profit(body.order_id, body.tp_price)
        audit_write_op(
            principal=principal,
            action="set_take_profit",
            exchange="deribit",
            target=body.order_id,
            payload={"tp_price": body.tp_price},
            result=result,
        )
        return result

    @app.post("/tools/close_position", tags=["writes"])
    async def t_close_position(
        body: ClosePositionReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True)
        result = await client.close_position(body.instrument_name)
        audit_write_op(
            principal=principal,
            action="close_position",
            exchange="deribit",
            target=body.instrument_name,
            payload={},
            result=result,
        )
        return result

    # ───── MCP endpoint (/mcp) — bridge to /tools/* ─────
    port = int(os.environ.get("PORT", "9011"))
    mount_mcp_endpoint(
        app,
        name="cerbero-deribit",
        version="0.1.0",
        token_store=token_store,
        internal_base_url=f"http://localhost:{port}",
        tools=[
            {"name": "is_testnet", "description": "True se client Deribit è in modalità testnet."},
            {"name": "environment_info", "description": "Ambiente operativo (testnet/mainnet), source, base_url, max_leverage cap."},
            {"name": "get_ticker", "description": "Ticker di un instrument Deribit."},
            {"name": "get_ticker_batch", "description": "Ticker per N instruments in parallelo (max 20)."},
            {"name": "get_instruments", "description": "Lista instruments per currency."},
            {"name": "get_orderbook", "description": "Orderbook L1/L2 per instrument."},
            {"name": "get_orderbook_imbalance", "description": "Microstructure: imbalance ratio + microprice + slope."},
            {"name": "get_positions", "description": "Posizioni aperte."},
            {"name": "get_account_summary", "description": "Summary account (equity, balance)."},
            {"name": "get_trade_history", "description": "Storia trade recenti."},
            {"name": "get_historical", "description": "OHLCV storico."},
            {"name": "get_dvol", "description": "Deribit Volatility Index (DVOL) OHLC per currency (BTC/ETH)."},
            {"name": "get_dvol_history", "description": "DVOL time series + percentili su lookback_days."},
            {"name": "get_iv_rank", "description": "IV rank 30/90/365d di un instrument vs DVOL storico della currency."},
            {"name": "find_by_delta", "description": "Trova strike con delta più vicino a target, filtrato per liquidità (OI/vol)."},
            {"name": "calculate_spread_payoff", "description": "Payoff/greci/max P-L/break-even/fee per struttura multi-leg."},
            {"name": "run_backtest", "description": "Heuristic backtest RSI-based su storia OHLCV per threshold accept/marginal/reject."},
            {"name": "get_term_structure", "description": "IV ATM per ogni expiry disponibile, detect contango/backwardation."},
            {"name": "get_skew_25d", "description": "Skew 25-delta put/call IV + risk reversal + butterfly per expiry."},
            {"name": "get_pc_ratio", "description": "Put/Call ratio aggregato su OI e volume 24h."},
            {"name": "get_gex", "description": "Gamma exposure per strike + zero gamma level (top N strikes per OI)."},
            {"name": "get_dealer_gamma_profile", "description": "Net dealer gamma per strike (short calls/long puts) + gamma flip level."},
            {"name": "get_vanna_charm", "description": "Vanna (∂delta/∂IV) e Charm (∂delta/∂t) aggregati pesati OI."},
            {"name": "get_oi_weighted_skew", "description": "Skew aggregato pesato per OI: IV puts - IV calls. Positivo = paura."},
            {"name": "get_smile_asymmetry", "description": "Asymmetry IV otm-puts vs otm-calls + ATM IV reference."},
            {"name": "get_atm_vs_wings_vol", "description": "IV ATM vs IV ali 25-delta. wing_richness > 0 = smile/kurtosis."},
            {"name": "get_technical_indicators", "description": "Indicatori tecnici (RSI, MACD, ATR, ADX)."},
            {"name": "get_realized_vol", "description": "Volatilità realizzata annualizzata (log-return std) BTC/ETH + spread IV−RV."},
            {"name": "place_order", "description": "Invia ordine (CORE only, testnet)."},
            {"name": "place_combo_order", "description": "Crea combo via private/create_combo + piazza ordine sul combo (1 cross spread invece di N)."},
            {"name": "cancel_order", "description": "Cancella ordine."},
            {"name": "set_stop_loss", "description": "Setta stop loss su posizione."},
            {"name": "set_take_profit", "description": "Setta take profit su posizione."},
            {"name": "close_position", "description": "Chiude posizione aperta."},
        ],
    )
    return app