From daa4e02971a0fed593681916a9369c613d3c96fc Mon Sep 17 00:00:00 2001 From: AdrianoDev Date: Thu, 30 Apr 2026 18:23:44 +0200 Subject: [PATCH] feat(V2): migrazione deribit (client, leverage_cap, tools) Task 6.1 V2.0.0: copia client.py + leverage_cap.py da services/mcp-deribit con import riscritti (mcp_common -> cerbero_mcp.common, mcp_deribit -> cerbero_mcp.exchanges.deribit). Estratte 34 tool async (28 endpoint + is_testnet/environment_info + helpers) in tools.py: pure logica senza FastAPI/ACL. Audit calls per ora rimossi (TODO: cabling via router su request.state.environment). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/cerbero_mcp/exchanges/deribit/__init__.py | 0 src/cerbero_mcp/exchanges/deribit/client.py | 1673 +++++++++++++++++ .../exchanges/deribit/leverage_cap.py | 56 + src/cerbero_mcp/exchanges/deribit/tools.py | 528 ++++++ 4 files changed, 2257 insertions(+) create mode 100644 src/cerbero_mcp/exchanges/deribit/__init__.py create mode 100644 src/cerbero_mcp/exchanges/deribit/client.py create mode 100644 src/cerbero_mcp/exchanges/deribit/leverage_cap.py create mode 100644 src/cerbero_mcp/exchanges/deribit/tools.py diff --git a/src/cerbero_mcp/exchanges/deribit/__init__.py b/src/cerbero_mcp/exchanges/deribit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/cerbero_mcp/exchanges/deribit/client.py b/src/cerbero_mcp/exchanges/deribit/client.py new file mode 100644 index 0000000..4dbb6b9 --- /dev/null +++ b/src/cerbero_mcp/exchanges/deribit/client.py @@ -0,0 +1,1673 @@ +from __future__ import annotations + +import contextlib +import time +from dataclasses import dataclass, field +from typing import Any + +from cerbero_mcp.common import indicators as ind +from cerbero_mcp.common import microstructure as micro +from cerbero_mcp.common import options as opt +from cerbero_mcp.common.http import async_client + +BASE_LIVE = "https://www.deribit.com/api/v2" +BASE_TESTNET = "https://test.deribit.com/api/v2" + +RESOLUTION_MAP = { + "1m": "1", + "5m": "5", + "15m": "15", + "1h": "60", + "4h": "240", + "1d": "1D", +} + + +@dataclass +class DeribitClient: + client_id: str + client_secret: str + testnet: bool = True + _token: str | None = field(default=None, init=False, repr=False) + _token_expires_at: float = field(default=0.0, init=False, repr=False) + + @property + def base_url(self) -> str: + return BASE_TESTNET if self.testnet else BASE_LIVE + + # ── Auth ───────────────────────────────────────────────────── + + async def _authenticate(self) -> str: + url = f"{self.base_url}/public/auth" + params = { + "grant_type": "client_credentials", + "client_id": self.client_id, + "client_secret": self.client_secret, + } + async with async_client(timeout=15.0) as http: + resp = await http.get(url, params=params) + data = resp.json() + result = data["result"] + self._token = result["access_token"] + self._token_expires_at = time.monotonic() + result.get("expires_in", 900) - 30 + return self._token + + async def _get_token(self) -> str: + if self._token is None or time.monotonic() >= self._token_expires_at: + await self._authenticate() + return self._token # type: ignore[return-value] + + async def _request(self, method: str, params: dict[str, Any] | None = None) -> dict: + is_private = method.startswith("private/") + if is_private: + await self._get_token() + + url = f"{self.base_url}/{method}" + request_params = dict(params) if params else {} + headers: dict[str, str] = {} + if is_private and self._token: + headers["Authorization"] = f"Bearer {self._token}" + + async with async_client(timeout=15.0) as http: + resp = await http.get(url, params=request_params, headers=headers) + data = resp.json() + + if "result" not in data: + error = data.get("error", {}) + error_code = error.get("code", 0) if isinstance(error, dict) else 0 + error_msg = error.get("message", str(data)) if isinstance(error, dict) else str(error) + # Re-authenticate on auth errors and retry once + if is_private and error_code in (13004, 13009, 13015): + self._token = None + await self._authenticate() + headers["Authorization"] = f"Bearer {self._token}" + resp = await http.get(url, params=request_params, headers=headers) + data = resp.json() + if "result" in data: + return data + return {"result": None, "error": error_msg} + + return data + + # ── Read tools ─────────────────────────────────────────────── + + def is_testnet(self) -> dict: + return {"testnet": self.testnet, "base_url": self.base_url} + + async def get_ticker(self, instrument_name: str) -> dict: + import datetime as _dt + raw = await self._request("public/ticker", {"instrument_name": instrument_name}) + r = raw.get("result") or {} + is_perp = instrument_name.upper().endswith("-PERPETUAL") + greeks = r.get("greeks") + if is_perp and greeks is None: + greeks = {"delta": 1.0, "gamma": 0.0, "vega": 0.0, "theta": 0.0, "rho": 0.0} + return { + "instrument_name": instrument_name, + "last_price": r.get("last_price"), + "mark_price": r.get("mark_price"), + "bid": r.get("best_bid_price"), + "ask": r.get("best_ask_price"), + "volume_24h": r.get("stats", {}).get("volume"), + "open_interest": r.get("open_interest"), + "greeks": greeks, + "mark_iv": r.get("mark_iv"), + "testnet": self.testnet, + "data_timestamp": _dt.datetime.now(_dt.UTC).isoformat(), + } + + async def get_ticker_batch(self, instrument_names: list[str]) -> dict: + """Fetch multiple tickers in parallel (max 20).""" + import asyncio + import datetime as _dt + + if not instrument_names: + return {"tickers": [], "data_timestamp": _dt.datetime.now(_dt.UTC).isoformat()} + if len(instrument_names) > 20: + return {"error": "max 20 instruments per batch"} + + results = await asyncio.gather( + *[self.get_ticker(n) for n in instrument_names], + return_exceptions=True, + ) + tickers = [] + errors = [] + for name, res in zip(instrument_names, results, strict=True): + if isinstance(res, Exception): + errors.append({"instrument": name, "error": str(res)}) + else: + tickers.append(res) + return { + "tickers": tickers, + "errors": errors, + "count": len(tickers), + "testnet": self.testnet, + "data_timestamp": _dt.datetime.now(_dt.UTC).isoformat(), + } + + async def get_instruments( + self, + currency: str, + kind: str | None = None, + expired: bool = False, + expiry_from: str | None = None, + expiry_to: str | None = None, + strike_min: float | None = None, + strike_max: float | None = None, + min_open_interest: float | None = None, + limit: int = 100, + offset: int = 0, + ) -> dict: + import asyncio + import datetime as _dt + + def _parse_iso(s: str | None) -> int | None: + if not s: + return None + try: + dt = _dt.datetime.fromisoformat(s) + except ValueError: + dt = _dt.datetime.strptime(s, "%Y-%m-%d") + return int(dt.timestamp() * 1000) + + expiry_from_ms = _parse_iso(expiry_from) + expiry_to_ms = _parse_iso(expiry_to) + + params: dict[str, Any] = {"currency": currency, "expired": expired} + if kind: + params["kind"] = kind + + # CER-008: fetch metadata + book_summary in parallelo per popolare OI. + # `public/get_instruments` non include open_interest; book_summary sì. + summary_params: dict[str, Any] = {"currency": currency} + if kind: + summary_params["kind"] = kind + instruments_raw, summary_raw = await asyncio.gather( + self._request("public/get_instruments", params), + self._request("public/get_book_summary_by_currency", summary_params), + return_exceptions=True, + ) + raw = instruments_raw if isinstance(instruments_raw, dict) else {} + summary_items = ( + summary_raw.get("result") if isinstance(summary_raw, dict) else None + ) or [] + oi_by_name: dict[str, float] = {} + for s in summary_items: + name = s.get("instrument_name") + oi = s.get("open_interest") + if name and oi is not None: + with contextlib.suppress(TypeError, ValueError): + oi_by_name[name] = float(oi) + + all_items = raw.get("result") or [] + filtered: list[dict] = [] + for i in all_items: + exp_ms = i.get("expiration_timestamp") + strike = i.get("strike") + name = i.get("instrument_name") + # CER-008: popola OI dal book_summary se mancante + oi = i.get("open_interest") + if oi is None and name in oi_by_name: + oi = oi_by_name[name] + i["open_interest"] = oi + if expiry_from_ms is not None and exp_ms is not None and exp_ms < expiry_from_ms: + continue + if expiry_to_ms is not None and exp_ms is not None and exp_ms > expiry_to_ms: + continue + if strike_min is not None and strike is not None and strike < strike_min: + continue + if strike_max is not None and strike is not None and strike > strike_max: + continue + if ( + min_open_interest is not None + and oi is not None + and oi < min_open_interest + ): + continue + filtered.append(i) + + total = len(filtered) + page = filtered[offset : offset + limit] + instruments = [ + { + "name": i.get("instrument_name"), + "strike": i.get("strike"), + "expiry": i.get("expiration_timestamp"), + "option_type": i.get("option_type"), + "tick_size": i.get("tick_size"), + "min_trade_amount": i.get("min_trade_amount"), + "open_interest": i.get("open_interest"), + } + for i in page + ] + return { + "instruments": instruments, + "total": total, + "offset": offset, + "limit": limit, + "has_more": offset + limit < total, + "testnet": self.testnet, + } + + async def get_orderbook(self, instrument_name: str, depth: int = 10) -> dict: + import datetime as _dt + raw = await self._request( + "public/get_order_book", {"instrument_name": instrument_name, "depth": depth} + ) + r = raw.get("result") or {} + return { + "bids": r.get("bids", []), + "asks": r.get("asks", []), + "timestamp": r.get("timestamp"), + "testnet": self.testnet, + "data_timestamp": _dt.datetime.now(_dt.UTC).isoformat(), + } + + async def get_orderbook_imbalance(self, instrument_name: str, depth: int = 10) -> dict: + """Microstructure: bid/ask imbalance + microprice + slope su top-N livelli.""" + ob = await self.get_orderbook(instrument_name, depth=max(depth, 10)) + result = micro.orderbook_imbalance(ob.get("bids") or [], ob.get("asks") or [], depth=depth) + return { + "instrument_name": instrument_name, + "depth": depth, + **result, + "timestamp": ob.get("timestamp"), + "testnet": self.testnet, + } + + async def get_positions(self, currency: str = "USDC") -> list: + raw = await self._request("private/get_positions", {"currency": currency}) + result = raw.get("result") or [] + positions = [] + for p in result: + size = p.get("size", 0) + if size == 0: + continue + positions.append({ + "instrument": p.get("instrument_name"), + "size": abs(size), + "direction": "long" if size > 0 else "short", + "avg_price": p.get("average_price"), + "mark_price": p.get("mark_price"), + "unrealized_pnl": p.get("floating_profit_loss"), + "realized_pnl": p.get("realized_profit_loss"), + "leverage": p.get("leverage"), + }) + return positions + + async def get_account_summary(self, currency: str = "USDC") -> dict: + raw = await self._request("private/get_account_summary", {"currency": currency}) + r = raw.get("result") + if not r: + return { + "equity": 0, + "balance": 0, + "margin_balance": 0, + "available_funds": 0, + "unrealized_pnl": 0, + "total_pnl": 0, + "testnet": self.testnet, + "error": raw.get("error", "no result"), + } + return { + "equity": r.get("equity", 0), + "balance": r.get("balance", 0), + "margin_balance": r.get("margin_balance", 0), + "available_funds": r.get("available_funds", 0), + "unrealized_pnl": r.get("unrealized_pnl", 0), + "total_pnl": r.get("total_pnl", 0), + "testnet": self.testnet, + } + + async def get_trade_history( + self, limit: int = 100, instrument_name: str | None = None + ) -> list: + if instrument_name: + raw = await self._request( + "private/get_user_trades_by_instrument", + {"instrument_name": instrument_name, "count": limit}, + ) + else: + raw = await self._request( + "private/get_user_trades_by_currency", + {"currency": "BTC", "count": limit}, + ) + result = raw.get("result") or {} + if not isinstance(result, dict): + return [] + trades_raw = result.get("trades") or [] + trades = [] + for t in trades_raw: + if not isinstance(t, dict): + continue + trades.append({ + "instrument": t.get("instrument_name"), + "direction": t.get("direction"), + "price": t.get("price"), + "amount": t.get("amount"), + "fee": t.get("fee"), + "timestamp": t.get("timestamp"), + "order_id": t.get("order_id"), + }) + return trades + + async def get_historical( + self, instrument: str, start_date: str, end_date: str, resolution: str + ) -> dict: + # Convert ISO date strings to millisecond timestamps + import datetime as _dt + + def _to_ms(date_str: str) -> int: + try: + dt = _dt.datetime.fromisoformat(date_str) + except ValueError: + dt = _dt.datetime.strptime(date_str, "%Y-%m-%d") + return int(dt.timestamp() * 1000) + + start_ms = _to_ms(start_date) + end_ms = _to_ms(end_date) + res = RESOLUTION_MAP.get(resolution, resolution) + raw = await self._request( + "public/get_tradingview_chart_data", + { + "instrument_name": instrument, + "start_timestamp": start_ms, + "end_timestamp": end_ms, + "resolution": res, + }, + ) + r = raw.get("result") or {} + candles = [] + ticks = r.get("ticks", []) or [] + opens = r.get("open", []) or [] + highs = r.get("high", []) or [] + lows = r.get("low", []) or [] + closes = r.get("close", []) or [] + volumes = r.get("volume", []) or [] + for idx, ts in enumerate(ticks): + if idx >= min(len(opens), len(highs), len(lows), len(closes), len(volumes)): + break + candles.append({ + "timestamp": ts, + "open": opens[idx], + "high": highs[idx], + "low": lows[idx], + "close": closes[idx], + "volume": volumes[idx], + }) + return {"candles": candles} + + async def get_dvol( + self, + currency: str, + start_date: str, + end_date: str, + resolution: str = "1D", + ) -> dict: + import datetime as _dt + + def _to_ms(date_str: str) -> int: + try: + dt = _dt.datetime.fromisoformat(date_str) + except ValueError: + dt = _dt.datetime.strptime(date_str, "%Y-%m-%d") + return int(dt.timestamp() * 1000) + + start_ms = _to_ms(start_date) + end_ms = _to_ms(end_date) + res = RESOLUTION_MAP.get(resolution, resolution) + raw = await self._request( + "public/get_volatility_index_data", + { + "currency": currency.upper(), + "start_timestamp": start_ms, + "end_timestamp": end_ms, + "resolution": res, + }, + ) + r = raw.get("result") or {} + rows = r.get("data") or [] + candles = [ + { + "timestamp": row[0], + "open": row[1], + "high": row[2], + "low": row[3], + "close": row[4], + } + for row in rows + if len(row) >= 5 + ] + latest = candles[-1]["close"] if candles else None + return {"currency": currency.upper(), "latest": latest, "candles": candles} + + async def get_gex( + self, + currency: str, + expiry_from: str | None = None, + expiry_to: str | None = None, + top_n_strikes: int = 50, + ) -> dict: + import asyncio + import datetime as _dt + + currency = currency.upper() + try: + idx_tk = await self.get_ticker(f"{currency}-PERPETUAL") + spot = float(idx_tk.get("mark_price") or 0) + except Exception: + spot = 0.0 + + chain = await self.get_instruments( + currency=currency, + kind="option", + expiry_from=expiry_from, + expiry_to=expiry_to, + limit=2000, + ) + items = chain.get("instruments", []) + items.sort(key=lambda x: -(x.get("open_interest") or 0)) + top = items[:top_n_strikes] + + async def _ticker(name: str) -> dict: + try: + return await self.get_ticker(name) + except Exception: + return {} + + tickers = await asyncio.gather(*[_ticker(i["name"]) for i in top]) + + by_strike: dict[float, dict[str, float]] = {} + for meta, tk in zip(top, tickers, strict=True): + strike = meta.get("strike") + if strike is None: + continue + greeks = tk.get("greeks") or {} + gamma = greeks.get("gamma") + oi = meta.get("open_interest") or 0 + if gamma is None or spot <= 0: + continue + gex_contribution = float(gamma) * oi * (spot ** 2) * 0.01 + entry = by_strike.setdefault( + float(strike), {"strike": float(strike), "call_gex": 0.0, "put_gex": 0.0} + ) + if meta.get("option_type") == "call": + entry["call_gex"] += gex_contribution + else: + entry["put_gex"] -= gex_contribution + + rows = [] + for s in sorted(by_strike.keys()): + e = by_strike[s] + e["net_gex"] = e["call_gex"] + e["put_gex"] + rows.append(e) + + total_gex = sum(r["net_gex"] for r in rows) + zero_gamma = None + for a, b in zip(rows, rows[1:], strict=False): + if (a["net_gex"] < 0 <= b["net_gex"]) or (a["net_gex"] > 0 >= b["net_gex"]): + denom = b["net_gex"] - a["net_gex"] + if denom != 0: + frac = -a["net_gex"] / denom + zero_gamma = round(a["strike"] + frac * (b["strike"] - a["strike"]), 2) + break + + max_gex_level = max(rows, key=lambda r: r["net_gex"])["strike"] if rows else None + min_gex_level = min(rows, key=lambda r: r["net_gex"])["strike"] if rows else None + + return { + "currency": currency, + "expiry_from": expiry_from, + "expiry_to": expiry_to, + "spot_price": spot, + "total_gex_usd": round(total_gex, 2), + "zero_gamma_level": zero_gamma, + "gex_by_strike": [ + { + "strike": r["strike"], + "call_gex": round(r["call_gex"], 2), + "put_gex": round(r["put_gex"], 2), + "net_gex": round(r["net_gex"], 2), + } + for r in rows + ], + "max_gex_level": max_gex_level, + "min_gex_level": min_gex_level, + "strikes_analyzed": len(rows), + "data_timestamp": _dt.datetime.now(_dt.UTC).isoformat(), + "testnet": self.testnet, + } + + async def _fetch_chain_legs( + self, + currency: str, + expiry_from: str | None = None, + expiry_to: str | None = None, + top_n_strikes: int = 50, + ) -> tuple[float, list[dict[str, Any]]]: + """Fetch chain options + ticker per top-N strikes per OI; restituisce + (spot, legs[]) con campi normalizzati per le funzioni in cerbero_mcp.common.options. + """ + import asyncio + + currency = currency.upper() + try: + idx_tk = await self.get_ticker(f"{currency}-PERPETUAL") + spot = float(idx_tk.get("mark_price") or 0) + except Exception: + spot = 0.0 + + chain = await self.get_instruments( + currency=currency, + kind="option", + expiry_from=expiry_from, + expiry_to=expiry_to, + limit=2000, + ) + items = chain.get("instruments", []) + items.sort(key=lambda x: -(x.get("open_interest") or 0)) + top = items[:top_n_strikes] + + async def _ticker(name: str) -> dict: + try: + return await self.get_ticker(name) + except Exception: + return {} + + tickers = await asyncio.gather(*[_ticker(i["name"]) for i in top]) + legs: list[dict[str, Any]] = [] + for meta, tk in zip(top, tickers, strict=True): + greeks = tk.get("greeks") or {} + legs.append({ + "strike": meta.get("strike"), + "option_type": meta.get("option_type"), + "oi": meta.get("open_interest") or 0, + "iv": tk.get("mark_iv"), + "delta": greeks.get("delta"), + "gamma": greeks.get("gamma"), + "vanna": greeks.get("vanna"), + "charm": greeks.get("charm"), + "vega": greeks.get("vega"), + }) + return spot, legs + + async def get_dealer_gamma_profile( + self, + currency: str, + expiry_from: str | None = None, + expiry_to: str | None = None, + top_n_strikes: int = 50, + ) -> dict: + """Net dealer gamma per strike (assume dealer short calls/long puts). + Identifica il gamma flip level: sopra → mercato pinning, sotto → squeeze. + """ + import datetime as _dt + spot, legs = await self._fetch_chain_legs(currency, expiry_from, expiry_to, top_n_strikes) + result = opt.dealer_gamma_profile(legs, spot) + return { + "currency": currency.upper(), + "spot_price": spot, + **result, + "strikes_analyzed": len(result["by_strike"]), + "data_timestamp": _dt.datetime.now(_dt.UTC).isoformat(), + "testnet": self.testnet, + } + + async def get_vanna_charm( + self, + currency: str, + expiry_from: str | None = None, + expiry_to: str | None = None, + top_n_strikes: int = 50, + ) -> dict: + """Vanna (∂delta/∂IV) e Charm (∂delta/∂t) aggregati pesati per OI. + Vanna positiva: dealer compra spot quando IV sale. + Charm negativa: time decay erode delta hedging. + """ + import datetime as _dt + spot, legs = await self._fetch_chain_legs(currency, expiry_from, expiry_to, top_n_strikes) + result = opt.vanna_charm_aggregate(legs, spot) + return { + "currency": currency.upper(), + **result, + "data_timestamp": _dt.datetime.now(_dt.UTC).isoformat(), + "testnet": self.testnet, + } + + async def get_oi_weighted_skew( + self, + currency: str, + expiry_from: str | None = None, + expiry_to: str | None = None, + top_n_strikes: int = 100, + ) -> dict: + """Skew aggregato pesato OI: IV media puts - calls. Positivo = paura. + """ + import datetime as _dt + _, legs = await self._fetch_chain_legs(currency, expiry_from, expiry_to, top_n_strikes) + result = opt.oi_weighted_skew(legs) + return { + "currency": currency.upper(), + **result, + "data_timestamp": _dt.datetime.now(_dt.UTC).isoformat(), + "testnet": self.testnet, + } + + async def get_smile_asymmetry( + self, + currency: str, + expiry_from: str | None = None, + expiry_to: str | None = None, + top_n_strikes: int = 100, + ) -> dict: + """Asymmetry IV otm-puts vs otm-calls. Positivo = put-side richer.""" + import datetime as _dt + spot, legs = await self._fetch_chain_legs(currency, expiry_from, expiry_to, top_n_strikes) + result = opt.smile_asymmetry(legs, spot) + return { + "currency": currency.upper(), + "spot_price": spot, + **result, + "data_timestamp": _dt.datetime.now(_dt.UTC).isoformat(), + "testnet": self.testnet, + } + + async def get_atm_vs_wings_vol( + self, + currency: str, + expiry_from: str | None = None, + expiry_to: str | None = None, + top_n_strikes: int = 100, + ) -> dict: + """IV ATM vs IV alle ali 25-delta. wing_richness > 0 → smile (kurtosis).""" + import datetime as _dt + spot, legs = await self._fetch_chain_legs(currency, expiry_from, expiry_to, top_n_strikes) + result = opt.atm_vs_wings_vol(legs, spot) + return { + "currency": currency.upper(), + "spot_price": spot, + **result, + "data_timestamp": _dt.datetime.now(_dt.UTC).isoformat(), + "testnet": self.testnet, + } + + async def get_pc_ratio(self, currency: str) -> dict: + import datetime as _dt + + currency = currency.upper() + raw = await self._request( + "public/get_book_summary_by_currency", + {"currency": currency, "kind": "option"}, + ) + rows = raw.get("result") or [] + call_oi = 0.0 + put_oi = 0.0 + call_vol = 0.0 + put_vol = 0.0 + for r in rows: + name = r.get("instrument_name", "") + oi = r.get("open_interest") or 0 + vol = r.get("volume") or 0 + if name.endswith("-C"): + call_oi += oi + call_vol += vol + elif name.endswith("-P"): + put_oi += oi + put_vol += vol + + def _ratio(put: float, call: float) -> float | None: + return round(put / call, 4) if call > 0 else None + + def _interp(ratio: float | None) -> str: + if ratio is None: + return "insufficient_data" + if ratio > 1.0: + return "puts_dominant" + if ratio < 0.7: + return "calls_dominant" + return "balanced" + + pc_oi = _ratio(put_oi, call_oi) + pc_vol = _ratio(put_vol, call_vol) + + return { + "currency": currency, + "pc_ratio_oi": pc_oi, + "pc_ratio_volume_24h": pc_vol, + "total_call_oi": call_oi, + "total_put_oi": put_oi, + "total_call_volume_24h": call_vol, + "total_put_volume_24h": put_vol, + "interpretation": { + "oi": _interp(pc_oi), + "volume_24h": _interp(pc_vol), + "percentile_90d": None, + }, + "data_timestamp": _dt.datetime.now(_dt.UTC).isoformat(), + "testnet": self.testnet, + } + + async def get_skew_25d(self, currency: str, expiry: str) -> dict: + import asyncio + import datetime as _dt + + currency = currency.upper() + put_task = self.find_by_delta(currency, expiry, -0.25, "put", 1, 0, 0) + call_task = self.find_by_delta(currency, expiry, 0.25, "call", 1, 0, 0) + try: + idx_tk = await self.get_ticker(f"{currency}-PERPETUAL") + spot = float(idx_tk.get("mark_price") or 0) + except Exception: + spot = 0.0 + put_res, call_res = await asyncio.gather(put_task, call_task) + + put_best = put_res.get("best_match") or {} + call_best = call_res.get("best_match") or {} + put_iv = put_best.get("mark_iv") + call_iv = call_best.get("mark_iv") + + # ATM IV: find instrument closest to spot on this expiry + exp_dt = _dt.datetime.fromisoformat(expiry) if "T" in expiry or "-" in expiry else _dt.datetime.strptime(expiry, "%Y-%m-%d") + chain = await self.get_instruments( + currency=currency, + kind="option", + expiry_from=expiry, + expiry_to=(exp_dt + _dt.timedelta(days=1)).strftime("%Y-%m-%d"), + limit=500, + ) + items = chain.get("instruments", []) + atm_iv = None + if items and spot > 0: + call_items = [i for i in items if i.get("option_type") == "call"] + if call_items: + call_items.sort(key=lambda x: abs((x.get("strike") or 0) - spot)) + atm_tk = await self.get_ticker(call_items[0]["name"]) + atm_iv = atm_tk.get("mark_iv") + + skew = None + if put_iv is not None and call_iv is not None: + skew = round(put_iv - call_iv, 4) + + if skew is None: + skew_sign = "unknown" + elif skew > 1: + skew_sign = "puts_rich" + elif skew < -1: + skew_sign = "calls_rich" + else: + skew_sign = "neutral" + + butterfly = None + if put_iv is not None and call_iv is not None and atm_iv is not None: + butterfly = round((put_iv + call_iv) / 2 - atm_iv, 4) + + return { + "currency": currency, + "expiry": expiry, + "put_25d_iv": put_iv, + "call_25d_iv": call_iv, + "atm_iv": atm_iv, + "skew_25d": skew, + "skew_sign": skew_sign, + "risk_reversal_25d": round(-skew, 4) if skew is not None else None, + "butterfly_25d": butterfly, + "skew_percentile_90d": None, + "data_timestamp": _dt.datetime.now(_dt.UTC).isoformat(), + "testnet": self.testnet, + } + + async def get_term_structure(self, currency: str) -> dict: + import asyncio + import datetime as _dt + + currency = currency.upper() + try: + idx_tk = await self.get_ticker(f"{currency}-PERPETUAL") + spot = float(idx_tk.get("mark_price") or 0) + except Exception: + spot = 0.0 + + chain = await self.get_instruments( + currency=currency, kind="option", limit=2000, offset=0 + ) + items = chain.get("instruments", []) + now_ms = int(_dt.datetime.now(_dt.UTC).timestamp() * 1000) + by_exp: dict[int, list[dict]] = {} + for i in items: + exp = i.get("expiry") + if exp is None or exp < now_ms: + continue + by_exp.setdefault(int(exp), []).append(i) + + atm_names: list[tuple[int, str]] = [] + for exp_ms, ins in sorted(by_exp.items()): + if spot > 0: + ins.sort(key=lambda x: abs((x.get("strike") or 0) - spot)) + atm_names.append((exp_ms, ins[0]["name"])) + + async def _ticker(name: str) -> dict: + try: + return await self.get_ticker(name) + except Exception: + return {} + + tickers = await asyncio.gather(*[_ticker(n) for _, n in atm_names]) + + ts = [] + for (exp_ms, name), tk in zip(atm_names, tickers, strict=True): + iv = tk.get("mark_iv") + if iv is None: + continue + exp_dt = _dt.datetime.fromtimestamp(exp_ms / 1000, _dt.UTC) + dte = max(0, (exp_dt - _dt.datetime.now(_dt.UTC)).days) + ts.append({ + "expiry": exp_dt.strftime("%Y-%m-%d"), + "dte": dte, + "atm_iv": iv, + "atm_instrument": name, + }) + + shape = "flat" + contango_steep = False + calendar_opp = False + if len(ts) >= 2: + diffs = [ts[i + 1]["atm_iv"] - ts[i]["atm_iv"] for i in range(len(ts) - 1)] + up = sum(1 for d in diffs if d > 0) + down = sum(1 for d in diffs if d < 0) + if up > len(diffs) / 2: + shape = "contango" + elif down > len(diffs) / 2: + shape = "backwardation" + short_term = next((x for x in ts if 8 <= x["dte"] <= 14), None) + mid_term = next((x for x in ts if 35 <= x["dte"] <= 45), None) + if short_term and mid_term and mid_term["atm_iv"] - short_term["atm_iv"] > 5: + contango_steep = True + calendar_opp = True + + return { + "currency": currency, + "spot": spot, + "term_structure": ts, + "shape": shape, + "contango_steep": contango_steep, + "calendar_spread_opportunity": calendar_opp, + "testnet": self.testnet, + } + + async def run_backtest( + self, + strategy_name: str, + underlying: str = "BTC", + lookback_days: int = 30, + resolution: str = "4h", + entry_rules: dict | None = None, + exit_rules: dict | None = None, + ) -> dict: + """Heuristic backtest: OHLCV lookback + simple rule-based entry/exit. + Approximate; does not simulate full options chain. + """ + import datetime as _dt + + end = _dt.datetime.now(_dt.UTC) + start = end - _dt.timedelta(days=lookback_days) + historical = await self.get_historical( + f"{underlying.upper()}-PERPETUAL", + start.strftime("%Y-%m-%d"), + end.strftime("%Y-%m-%d"), + resolution, + ) + candles = historical.get("candles", []) + if len(candles) < 20: + return { + "strategy_name": strategy_name, + "error": "insufficient history", + "trades_simulated": 0, + "recommendation": "reject", + } + + closes = [c["close"] for c in candles] + from cerbero_mcp.common import indicators as _ind + + rsi_thr_low = (entry_rules or {}).get("rsi_below", 35) + rsi_thr_high = (entry_rules or {}).get("rsi_above", 65) + target_pct = (exit_rules or {}).get("target_pct", 2.0) + stop_pct = (exit_rules or {}).get("stop_pct", -1.5) + max_hold_bars = (exit_rules or {}).get("max_hold_bars", 20) + + trades: list[dict] = [] + i = 14 + while i < len(closes) - 1: + sub = closes[: i + 1] + rsi_val = _ind.rsi(sub) + if rsi_val is None: + i += 1 + continue + side: str | None = None + if rsi_val < rsi_thr_low: + side = "long" + elif rsi_val > rsi_thr_high: + side = "short" + if side is None: + i += 1 + continue + entry = closes[i] + exit_bar = None + exit_price = entry + for j in range(1, min(max_hold_bars, len(closes) - i)): + p = closes[i + j] + pct = (p - entry) / entry * 100 * (1 if side == "long" else -1) + if pct >= target_pct or pct <= stop_pct: + exit_bar = j + exit_price = p + break + if exit_bar is None: + exit_bar = min(max_hold_bars, len(closes) - i - 1) + exit_price = closes[i + exit_bar] + pnl_pct = (exit_price - entry) / entry * 100 * (1 if side == "long" else -1) + trades.append({ + "entry_idx": i, + "side": side, + "entry": entry, + "exit": exit_price, + "bars_held": exit_bar, + "pnl_pct": pnl_pct, + }) + i += exit_bar + 1 + + if not trades: + return { + "strategy_name": strategy_name, + "trades_simulated": 0, + "recommendation": "reject", + "notes": "no entries triggered", + } + + wins = [t for t in trades if t["pnl_pct"] > 0] + losses = [t for t in trades if t["pnl_pct"] <= 0] + win_rate = len(wins) / len(trades) + total_profit = sum(t["pnl_pct"] for t in wins) + total_loss = -sum(t["pnl_pct"] for t in losses) + profit_factor = (total_profit / total_loss) if total_loss > 0 else float("inf") + equity = 0.0 + peak = 0.0 + max_dd = 0.0 + for t in trades: + equity += t["pnl_pct"] + peak = max(peak, equity) + dd = peak - equity + max_dd = max(max_dd, dd) + mean_r = sum(t["pnl_pct"] for t in trades) / len(trades) + var_r = sum((t["pnl_pct"] - mean_r) ** 2 for t in trades) / len(trades) + std_r = var_r ** 0.5 + sharpe = round(mean_r / std_r, 2) if std_r > 0 else None + + recommendation = "reject" + if win_rate > 0.55 and max_dd < 4.0 and profit_factor > 1.5: + recommendation = "accept" + elif win_rate > 0.50 and max_dd < 5.0 and profit_factor > 1.35: + recommendation = "marginal" + + return { + "strategy_name": strategy_name, + "underlying": underlying.upper(), + "lookback_days": lookback_days, + "resolution": resolution, + "trades_simulated": len(trades), + "win_rate": round(win_rate, 3), + "max_drawdown_pct": round(max_dd, 2), + "profit_factor": round(profit_factor, 2) if profit_factor != float("inf") else None, + "sharpe": sharpe, + "recommendation": recommendation, + "notes": "heuristic RSI-based backtest — approximate, not full options simulation", + "testnet": self.testnet, + "data_timestamp": _dt.datetime.now(_dt.UTC).isoformat(), + } + + async def calculate_spread_payoff( + self, + legs: list[dict], + quote_currency: str = "USD", + ) -> dict: + import asyncio + import re as _re + + if not legs or len(legs) > 4: + return {"error": "legs must be 1..4"} + + async def _fetch(name: str) -> dict: + try: + return await self.get_ticker(name) + except Exception as e: + return {"error": str(e)} + + tickers = await asyncio.gather(*[_fetch(l["instrument_name"]) for l in legs]) + + enriched: list[dict] = [] + spot_est: float | None = None + for leg, tk in zip(legs, tickers, strict=True): + name = leg["instrument_name"] + match = _re.match( + r"^([A-Z]+)-(\d{1,2}[A-Z]{3}\d{2})-(\d+(?:\.\d+)?)-(C|P)$", name + ) + if not match: + return {"error": f"invalid option name: {name}"} + coin, exp_str, strike_s, opt_type = match.groups() + strike = float(strike_s) + action = leg.get("action", "long").lower() + qty = float(leg.get("quantity", 1)) + sign = 1 if action == "long" else -1 + mark_price = tk.get("mark_price") or 0.0 + greeks = tk.get("greeks") or {} + if spot_est is None and mark_price: + spot_est = float(strike) + enriched.append({ + "name": name, + "coin": coin, + "expiry": exp_str, + "strike": strike, + "type": opt_type, + "action": action, + "quantity": qty, + "sign": sign, + "mark_price": float(mark_price), + "greeks": greeks, + }) + + strikes = [l["strike"] for l in enriched] + spot = spot_est or (sum(strikes) / len(strikes)) if strikes else 0.0 + if enriched and enriched[0]["coin"]: + try: + idx_name = f"{enriched[0]['coin']}-PERPETUAL" + idx_tk = await self.get_ticker(idx_name) + if idx_tk.get("mark_price"): + spot = float(idx_tk["mark_price"]) + except Exception: + pass + + net_premium = sum(l["sign"] * l["quantity"] * l["mark_price"] for l in enriched) + credit = net_premium < 0 + + greeks_net = {k: 0.0 for k in ("delta", "gamma", "vega", "theta", "rho")} + for l in enriched: + for k in greeks_net: + v = l["greeks"].get(k) + if v is not None: + greeks_net[k] += l["sign"] * l["quantity"] * float(v) + + def _payoff(s: float) -> float: + total = 0.0 + for l in enriched: + k = l["strike"] + intrinsic = max(0.0, s - k) if l["type"] == "C" else max(0.0, k - s) + total += l["sign"] * l["quantity"] * (intrinsic - l["mark_price"]) + return total + + lo = max(spot * 0.5, min(strikes) * 0.7) if strikes else spot * 0.5 + hi = max(spot * 1.5, max(strikes) * 1.3) if strikes else spot * 1.5 + steps = 14 + points = [] + for i in range(steps + 1): + s = lo + (hi - lo) * (i / steps) + points.append({"spot": round(s, 2), "pnl": round(_payoff(s), 4)}) + key_points = sorted({int(lo), int(spot), int(hi), *(int(k) for k in strikes)}) + for s in key_points: + points.append({"spot": float(s), "pnl": round(_payoff(float(s)), 4)}) + points.sort(key=lambda p: p["spot"]) + + pnls = [p["pnl"] for p in points] + max_profit = max(pnls) if pnls else 0.0 + max_loss = min(pnls) if pnls else 0.0 + + break_evens: list[float] = [] + for a, b in zip(points, points[1:], strict=False): + if a["pnl"] == 0: + break_evens.append(a["spot"]) + elif (a["pnl"] < 0 < b["pnl"]) or (a["pnl"] > 0 > b["pnl"]): + frac = -a["pnl"] / (b["pnl"] - a["pnl"]) + break_evens.append(round(a["spot"] + frac * (b["spot"] - a["spot"]), 2)) + + structure = self._guess_structure(enriched) + + sum(l["quantity"] * spot for l in enriched) if spot else 0.0 + fee_per_leg = min(0.0003 * (spot or 1) * sum(l["quantity"] for l in enriched), + 0.125 * abs(net_premium)) if spot else 0.0 + fees_open = round(fee_per_leg, 4) + fees_total = round(fees_open * 2, 4) + + warnings = [] + for l in enriched: + if l["action"] == "short": + any_long_same_type = any( + o["type"] == l["type"] and o["action"] == "long" and o["coin"] == l["coin"] + for o in enriched + ) + if not any_long_same_type: + warnings.append( + f"naked short {l['type']} {l['name']} — viola hard prohibition" + ) + + pl_ratio = ( + round(abs(max_profit / max_loss), 2) + if max_loss and max_loss != 0 else None + ) + + return { + "structure_name_guess": structure, + "net_premium": round(abs(net_premium), 4), + "net_premium_sign": "credit" if credit else "debit", + "max_profit": round(max_profit, 4), + "max_loss": round(max_loss, 4), + "break_even": break_evens, + "profit_loss_ratio": pl_ratio, + "greeks_net": {k: round(v, 6) for k, v in greeks_net.items()}, + "fees_estimate": { + "open": fees_open, + "close_estimate": fees_open, + "total": fees_total, + "cap_hit": fees_total >= 0.125 * abs(net_premium) if net_premium else False, + }, + "payoff_table": points, + "spot_assumed": round(spot, 2), + "warnings": warnings, + "testnet": self.testnet, + } + + @staticmethod + def _guess_structure(legs: list[dict]) -> str: + n = len(legs) + if n == 1: + l = legs[0] + return f"long {l['type']}" if l["action"] == "long" else f"short {l['type']}" + if n == 2: + types = {l["type"] for l in legs} + actions = {l["action"] for l in legs} + strikes = sorted(set(l["strike"] for l in legs)) + if types == {"P"} and actions == {"long", "short"}: + short = next(l for l in legs if l["action"] == "short") + long_ = next(l for l in legs if l["action"] == "long") + return "bull put spread" if short["strike"] > long_["strike"] else "bear put spread" + if types == {"C"} and actions == {"long", "short"}: + short = next(l for l in legs if l["action"] == "short") + long_ = next(l for l in legs if l["action"] == "long") + return "bear call spread" if short["strike"] < long_["strike"] else "bull call spread" + if types == {"C", "P"} and len(strikes) == 1 and len(actions) == 1: + return f"{list(actions)[0]} straddle" + if types == {"C", "P"} and len(strikes) == 2 and len(actions) == 1: + return f"{list(actions)[0]} strangle" + if len({l["expiry"] for l in legs}) == 2 and len(strikes) == 1: + return "calendar spread" + if n == 4: + types = [l["type"] for l in legs] + if types.count("P") == 2 and types.count("C") == 2: + return "iron condor" + return "custom" + + async def find_by_delta( + self, + currency: str, + expiry: str, + target_delta: float, + option_type: str, + max_results: int = 3, + min_open_interest: float = 100.0, + min_volume_24h: float = 20.0, + ) -> dict: + import asyncio + import datetime as _dt + + currency = currency.upper() + option_type = option_type.lower() + try: + exp_dt = _dt.datetime.fromisoformat(expiry) + except ValueError: + exp_dt = _dt.datetime.strptime(expiry, "%Y-%m-%d") + exp_ms = int(exp_dt.replace(tzinfo=_dt.UTC).timestamp() * 1000) + day_ms = 24 * 3600 * 1000 + + chain = await self.get_instruments( + currency=currency, + kind="option", + expiry_from=expiry, + expiry_to=(exp_dt + _dt.timedelta(days=1)).strftime("%Y-%m-%d"), + limit=500, + offset=0, + ) + items = [ + i for i in chain.get("instruments", []) + if i.get("option_type") == option_type + and i.get("expiry") is not None + and abs(i["expiry"] - exp_ms) < day_ms + ] + if not items: + return {"matches": [], "best_match": None, "note": "no instruments for expiry"} + + async def _ticker(inst: str) -> dict: + try: + return await self.get_ticker(inst) + except Exception: + return {} + + tickers = await asyncio.gather(*[_ticker(i["name"]) for i in items]) + + candidates = [] + for meta, tk in zip(items, tickers, strict=True): + greeks = tk.get("greeks") or {} + delta = greeks.get("delta") + oi = meta.get("open_interest") or 0 + vol = tk.get("volume_24h") or 0 + bid = tk.get("bid") or 0 + ask = tk.get("ask") or 0 + if delta is None: + continue + if oi < min_open_interest or vol < min_volume_24h: + continue + mid = (bid + ask) / 2 if (bid and ask) else None + spread_pct = round(100 * (ask - bid) / mid, 2) if mid and mid > 0 else None + candidates.append({ + "instrument_name": meta["name"], + "strike": meta["strike"], + "delta": delta, + "delta_distance": abs(delta - target_delta), + "mark_iv": tk.get("mark_iv"), + "mark_price_usd": tk.get("mark_price"), + "bid_ask_spread_pct": spread_pct, + "open_interest": oi, + "volume_24h": vol, + "greeks": greeks, + }) + candidates.sort(key=lambda x: x["delta_distance"]) + top = candidates[:max_results] + return { + "currency": currency, + "expiry": expiry, + "target_delta": target_delta, + "option_type": option_type, + "matches": top, + "best_match": top[0] if top else None, + "candidates_considered": len(candidates), + "testnet": self.testnet, + } + + async def get_iv_rank(self, instrument: str) -> dict: + currency = instrument.split("-", 1)[0].upper() + if currency not in ("BTC", "ETH"): + return { + "instrument": instrument, + "error": f"currency {currency} not supported for IV rank", + } + + ticker_raw = await self._request( + "public/ticker", {"instrument_name": instrument} + ) + tr = ticker_raw.get("result") or {} + current_iv = tr.get("mark_iv") + + import datetime as _dt + + now = _dt.datetime.now(_dt.UTC) + series_by_lookback: dict[int, list[float]] = {} + for lb in (30, 60, 90, 365): + start = now - _dt.timedelta(days=lb) + raw = await self._request( + "public/get_volatility_index_data", + { + "currency": currency, + "start_timestamp": int(start.timestamp() * 1000), + "end_timestamp": int(now.timestamp() * 1000), + "resolution": "1D", + }, + ) + rows = (raw.get("result") or {}).get("data") or [] + series_by_lookback[lb] = [ + float(r[4]) for r in rows if len(r) >= 5 + ] + + if current_iv is None: + latest = series_by_lookback.get(30) or series_by_lookback.get(90) or [] + current_iv = latest[-1] if latest else None + + def _percentile(values: list[float], target: float | None) -> float | None: + if target is None or not values: + return None + below = sum(1 for v in values if v <= target) + return round(100.0 * below / len(values), 2) + + def _rank(values: list[float], target: float | None) -> float | None: + if target is None or not values: + return None + lo, hi = min(values), max(values) + if hi == lo: + return None + return round(100.0 * (target - lo) / (hi - lo), 2) + + def _stats(values: list[float]) -> tuple[float | None, float | None]: + if not values: + return None, None + m = sum(values) / len(values) + var = sum((v - m) ** 2 for v in values) / len(values) + return m, var ** 0.5 + + mean_30, std_30 = _stats(series_by_lookback[30]) + warnings: list[str] = [] + if len(series_by_lookback[30]) < 10: + warnings.append("limited_history") + return { + "instrument": instrument, + "currency": currency, + "current_iv": current_iv, + "iv_percentile_30d": _percentile(series_by_lookback[30], current_iv), + "iv_percentile_60d": _percentile(series_by_lookback[60], current_iv), + "iv_percentile_90d": _percentile(series_by_lookback[90], current_iv), + "iv_percentile_365d": _percentile(series_by_lookback[365], current_iv), + "iv_rank_30d": _rank(series_by_lookback[30], current_iv), + "iv_rank_60d": _rank(series_by_lookback[60], current_iv), + "iv_rank_90d": _rank(series_by_lookback[90], current_iv), + "iv_rank_365d": _rank(series_by_lookback[365], current_iv), + "mean_30d": mean_30, + "stddev_30d": std_30, + "data_points_30d": len(series_by_lookback[30]), + "data_timestamp": now.isoformat(), + "warnings": warnings, + "testnet": self.testnet, + } + + async def get_realized_vol( + self, + currency: str, + windows: list[int] | None = None, + ) -> dict: + """Annualized realized volatility (log-return std) su daily closes di Deribit index.""" + import datetime as _dt + import math + + currency = currency.upper() + if currency not in ("BTC", "ETH"): + return {"currency": currency, "error": "currency not supported"} + windows = windows or [14, 30] + max_w = max(windows) + + now = _dt.datetime.now(_dt.UTC) + start = now - _dt.timedelta(days=max_w + 10) + raw = await self._request( + "public/get_tradingview_chart_data", + { + "instrument_name": f"{currency}-PERPETUAL", + "start_timestamp": int(start.timestamp() * 1000), + "end_timestamp": int(now.timestamp() * 1000), + "resolution": "1D", + }, + ) + result = raw.get("result") or {} + closes = [float(c) for c in (result.get("close") or [])] + rv_by_window: dict[str, float | None] = {} + for w in windows: + if len(closes) <= w: + rv_by_window[f"{w}d"] = None + continue + segment = closes[-(w + 1):] + rets = [ + math.log(segment[i] / segment[i - 1]) + for i in range(1, len(segment)) + if segment[i - 1] > 0 + ] + if len(rets) < 2: + rv_by_window[f"{w}d"] = None + continue + m = sum(rets) / len(rets) + var = sum((r - m) ** 2 for r in rets) / (len(rets) - 1) + rv_by_window[f"{w}d"] = round(math.sqrt(var * 365) * 100, 2) + + iv_current = None + try: + dvol_raw = await self._request( + "public/get_volatility_index_data", + { + "currency": currency, + "start_timestamp": int((now - _dt.timedelta(days=2)).timestamp() * 1000), + "end_timestamp": int(now.timestamp() * 1000), + "resolution": "1D", + }, + ) + dv = (dvol_raw.get("result") or {}).get("data") or [] + if dv: + iv_current = float(dv[-1][4]) + except Exception: + iv_current = None + + iv_rv_spread: dict[str, float | None] = {} + for w_key, rv in rv_by_window.items(): + iv_rv_spread[w_key] = ( + round(iv_current - rv, 2) if (iv_current is not None and rv is not None) else None + ) + + return { + "currency": currency, + "realized_vol_pct": rv_by_window, + "iv_current_pct": iv_current, + "iv_minus_rv_pct": iv_rv_spread, + "data_points": len(closes), + "data_timestamp": now.isoformat(), + "testnet": self.testnet, + } + + async def get_dvol_history( + self, + currency: str, + lookback_days: int = 90, + ) -> dict: + import datetime as _dt + + now = _dt.datetime.now(_dt.UTC) + start = now - _dt.timedelta(days=lookback_days) + raw = await self._request( + "public/get_volatility_index_data", + { + "currency": currency.upper(), + "start_timestamp": int(start.timestamp() * 1000), + "end_timestamp": int(now.timestamp() * 1000), + "resolution": "1D", + }, + ) + rows = (raw.get("result") or {}).get("data") or [] + series = [ + {"timestamp": r[0], "dvol": float(r[4])} + for r in rows + if len(r) >= 5 + ] + values = [s["dvol"] for s in series] + values_sorted = sorted(values) + + def _pct(p: float) -> float | None: + if not values_sorted: + return None + idx = int(round((len(values_sorted) - 1) * p)) + return values_sorted[idx] + + mean = sum(values) / len(values) if values else None + return { + "currency": currency.upper(), + "lookback_days": lookback_days, + "series": series, + "current": values[-1] if values else None, + "mean": mean, + "p25": _pct(0.25), + "p50": _pct(0.50), + "p75": _pct(0.75), + "p95": _pct(0.95), + "data_points": len(values), + "testnet": self.testnet, + } + + async def get_technical_indicators( + self, + instrument: str, + indicators: list[str], + start_date: str, + end_date: str, + resolution: str = "1h", + ) -> dict: + historical = await self.get_historical(instrument, start_date, end_date, resolution) + candles = historical.get("candles", []) + closes = [c["close"] for c in candles] + highs = [c["high"] for c in candles] + lows = [c["low"] for c in candles] + + result: dict[str, Any] = {} + for indicator in indicators: + name = indicator.lower() + if name == "sma": + result["sma"] = ind.sma(closes, 20) + elif name == "rsi": + result["rsi"] = ind.rsi(closes) + elif name == "atr": + result["atr"] = ind.atr(highs, lows, closes) + elif name == "macd": + result["macd"] = ind.macd(closes) + elif name == "adx": + result["adx"] = ind.adx(highs, lows, closes) + else: + result[name] = None + return result + + # ── Write tools ────────────────────────────────────────────── + + async def place_order( + self, + instrument_name: str, + side: str, + amount: float, + type: str = "limit", + price: float | None = None, + reduce_only: bool = False, + post_only: bool = False, + label: str | None = None, + ) -> dict: + endpoint = "private/buy" if side == "buy" else "private/sell" + params: dict[str, Any] = { + "instrument_name": instrument_name, + "amount": amount, + "type": type, + } + if price is not None: + if type in ("stop_market", "take_profit_market"): + params["trigger_price"] = price + params["trigger"] = "mark_price" + else: + params["price"] = price + if label: + params["label"] = label + if reduce_only: + params["reduce_only"] = True + if post_only: + params["post_only"] = True + raw = await self._request(endpoint, params) + r = raw.get("result") + if r is None: + return {"error": raw.get("error", "unknown"), "state": "error"} + return r + + async def place_combo_order( + self, + legs: list[dict[str, Any]], + side: str, + amount: float, + type: str = "limit", + price: float | None = None, + label: str | None = None, + ) -> dict: + """Crea un combo via private/create_combo poi piazza un singolo ordine + (buy/sell) sull'instrument_name del combo. Una sola crociata di spread + invece di N (uno per leg) → minor slippage su strutture liquide. + + legs: [{instrument_name, direction: 'buy'|'sell', ratio: int}]. + """ + combo_raw = await self._request("private/create_combo", {"trades": legs}) + combo = combo_raw.get("result") + if combo is None: + return {"state": "error", "error": combo_raw.get("error", "unknown")} + combo_instrument = combo.get("instrument_name") or combo.get("id") + order = await self.place_order( + instrument_name=combo_instrument, + side=side, + amount=amount, + type=type, + price=price, + label=label, + ) + if order.get("state") == "error": + return {"state": "error", "error": order.get("error"), "combo_instrument": combo_instrument} + return {"combo_instrument": combo_instrument, **order} + + async def set_leverage(self, instrument_name: str, leverage: int) -> dict: + """CER-016: pre-set account leverage per evitare default 50x testnet.""" + raw = await self._request( + "private/set_leverage", + {"instrument_name": instrument_name, "leverage": leverage}, + ) + if raw.get("result") is None: + return {"state": "error", "error": raw.get("error", "unknown")} + return {"state": "ok", "instrument": instrument_name, "leverage": leverage} + + async def cancel_order(self, order_id: str) -> dict: + raw = await self._request("private/cancel", {"order_id": order_id}) + if raw.get("result") is None: + return { + "order_id": order_id, + "state": "error", + "error": raw.get("error", "unknown"), + } + r = raw["result"] + return { + "order_id": r.get("order_id"), + "state": r.get("order_state"), + } + + async def set_stop_loss(self, order_id: str, stop_price: float) -> dict: + """ + Amend an existing order to add a stop-loss trigger via edit endpoint. + Deribit does not have a standalone set_stop_loss; we use private/edit + to update stop_price on the order. + """ + raw = await self._request( + "private/edit", + {"order_id": order_id, "stop_price": stop_price}, + ) + if raw.get("result") is None: + return {"order_id": order_id, "error": raw.get("error", "unknown")} + r = raw["result"].get("order", raw["result"]) + return { + "order_id": r.get("order_id"), + "state": r.get("order_state"), + "stop_price": r.get("stop_price"), + } + + async def set_take_profit(self, order_id: str, tp_price: float) -> dict: + """ + Amend an existing order to add a take-profit trigger via private/edit. + """ + raw = await self._request( + "private/edit", + {"order_id": order_id, "stop_price": tp_price}, + ) + if raw.get("result") is None: + return {"order_id": order_id, "error": raw.get("error", "unknown")} + r = raw["result"].get("order", raw["result"]) + return { + "order_id": r.get("order_id"), + "state": r.get("order_state"), + "tp_price": tp_price, + } + + async def close_position(self, instrument_name: str) -> dict: + raw = await self._request( + "private/close_position", + {"instrument_name": instrument_name, "type": "market"}, + ) + r = raw.get("result") + if r is None: + return {"instrument": instrument_name, "error": raw.get("error", "unknown"), "state": "error"} + order = r.get("order", r) + return { + "order_id": order.get("order_id"), + "state": order.get("order_state"), + } diff --git a/src/cerbero_mcp/exchanges/deribit/leverage_cap.py b/src/cerbero_mcp/exchanges/deribit/leverage_cap.py new file mode 100644 index 0000000..d04dd51 --- /dev/null +++ b/src/cerbero_mcp/exchanges/deribit/leverage_cap.py @@ -0,0 +1,56 @@ +"""Leverage cap server-side per place_order. + +Cap letto dal secret JSON via campo `max_leverage`. Default 1 (cash) se assente. +""" +from __future__ import annotations + +from fastapi import HTTPException + + +def get_max_leverage(creds: dict) -> int: + """Legge max_leverage dal secret. Default 1 se mancante.""" + raw = creds.get("max_leverage", 1) + try: + value = int(raw) + except (TypeError, ValueError): + value = 1 + return max(1, value) + + +def enforce_leverage( + requested: int | float | None, + *, + creds: dict, + exchange: str, +) -> int: + """Verifica e applica leverage cap. Ritorna leverage applicabile. + + Solleva HTTPException(403, LEVERAGE_CAP_EXCEEDED) se requested > cap. + Se requested is None, applica il cap come default. + """ + cap = get_max_leverage(creds) + if requested is None: + return cap + lev = int(requested) + if lev < 1: + raise HTTPException( + status_code=403, + detail={ + "error": "LEVERAGE_CAP_EXCEEDED", + "exchange": exchange, + "requested": lev, + "max": cap, + "reason": "leverage must be >= 1", + }, + ) + if lev > cap: + raise HTTPException( + status_code=403, + detail={ + "error": "LEVERAGE_CAP_EXCEEDED", + "exchange": exchange, + "requested": lev, + "max": cap, + }, + ) + return lev diff --git a/src/cerbero_mcp/exchanges/deribit/tools.py b/src/cerbero_mcp/exchanges/deribit/tools.py new file mode 100644 index 0000000..4bf2ce3 --- /dev/null +++ b/src/cerbero_mcp/exchanges/deribit/tools.py @@ -0,0 +1,528 @@ +"""Tool deribit V2: pydantic schemas + async functions. + +Ogni funzione prende (client: DeribitClient, params: ) e restituisce +un dict (o un model Pydantic). Pure logica, no FastAPI dependency, no ACL. +L'autenticazione bearer è gestita dal middleware in cerbero_mcp.auth; +l'audit verrà cablato dal router via request.state.environment. +""" +from __future__ import annotations + +import contextlib +from typing import Any + +from pydantic import BaseModel, field_validator, model_validator + +from cerbero_mcp.exchanges.deribit.client import DeribitClient +from cerbero_mcp.exchanges.deribit.leverage_cap import ( + enforce_leverage as _enforce_leverage, +) +from cerbero_mcp.exchanges.deribit.leverage_cap import get_max_leverage + +# === Schemas === + + +class GetTickerReq(BaseModel): + instrument_name: str | None = None + instrument: str | None = None + + model_config = {"extra": "allow"} + + @model_validator(mode="after") + def _normalize(self): + sym = self.instrument_name or self.instrument + if not sym: + raise ValueError("instrument_name (or instrument) is required") + self.instrument_name = sym + return self + + +class GetTickerBatchReq(BaseModel): + instrument_names: list[str] | None = None + instruments: list[str] | None = None + + model_config = {"extra": "allow"} + + @model_validator(mode="after") + def _normalize(self): + names = self.instrument_names or self.instruments + if not names: + raise ValueError("instrument_names (or instruments) is required") + self.instrument_names = names + return self + + +class GetInstrumentsReq(BaseModel): + currency: str + kind: str | None = None + expiry_from: str | None = None + expiry_to: str | None = None + strike_min: float | None = None + strike_max: float | None = None + min_open_interest: float | None = None + limit: int = 100 + offset: int = 0 + + +class GetOrderbookReq(BaseModel): + instrument_name: str + depth: int = 10 + + +class OrderbookImbalanceReq(BaseModel): + instrument_name: str + depth: int = 10 + + +class GetPositionsReq(BaseModel): + currency: str = "USDC" + + +class GetAccountSummaryReq(BaseModel): + currency: str = "USDC" + + +class GetTradeHistoryReq(BaseModel): + limit: int = 100 + instrument_name: str | None = None + + +class GetHistoricalReq(BaseModel): + instrument: str + start_date: str + end_date: str + resolution: str = "1h" + + +class GetDvolReq(BaseModel): + currency: str = "BTC" + start_date: str + end_date: str + resolution: str = "1D" + + +class GetDvolHistoryReq(BaseModel): + currency: str = "BTC" + lookback_days: int = 90 + + +class GetIvRankReq(BaseModel): + instrument: str + + +class GetRealizedVolReq(BaseModel): + currency: str = "BTC" + windows: list[int] = [14, 30] + + +class GetGexReq(BaseModel): + currency: str + expiry_from: str | None = None + expiry_to: str | None = None + top_n_strikes: int = 50 + + +class OptionFlowReq(BaseModel): + """Body comune per indicatori option-flow (dealer gamma, vanna/charm, + OI-weighted skew, smile asymmetry, ATM vs wings).""" + + currency: str + expiry_from: str | None = None + expiry_to: str | None = None + top_n_strikes: int = 100 + + +class GetPcRatioReq(BaseModel): + currency: str + + +class GetSkew25dReq(BaseModel): + currency: str + expiry: str + + +class GetTermStructureReq(BaseModel): + currency: str + + +class CalculateSpreadPayoffReq(BaseModel): + legs: list[dict] + quote_currency: str = "USD" + + +class RunBacktestReq(BaseModel): + strategy_name: str + underlying: str = "BTC" + lookback_days: int = 30 + resolution: str = "4h" + entry_rules: dict | None = None + exit_rules: dict | None = None + + +class FindByDeltaReq(BaseModel): + currency: str + expiry: str + target_delta: float + option_type: str + max_results: int = 3 + min_open_interest: float = 100.0 + min_volume_24h: float = 20.0 + + +class GetIndicatorsReq(BaseModel): + instrument: str + indicators: list[str] + start_date: str + end_date: str + resolution: str = "1h" + + @field_validator("indicators", mode="before") + @classmethod + def _coerce_indicators(cls, v): + if isinstance(v, str): + import json + + s = v.strip() + if s.startswith("["): + try: + parsed = json.loads(s) + if isinstance(parsed, list): + return [str(x).strip() for x in parsed if str(x).strip()] + except json.JSONDecodeError: + pass + return [x.strip() for x in s.split(",") if x.strip()] + if isinstance(v, list): + return v + raise ValueError( + "indicators must be a list like ['rsi','atr','macd'] " + "or a comma-separated string like 'rsi,atr,macd'" + ) + + +class PlaceOrderReq(BaseModel): + instrument_name: str + side: str # "buy" | "sell" + amount: float + type: str = "limit" + price: float | None = None + reduce_only: bool = False + post_only: bool = False + label: str | None = None + leverage: int | None = None # CER-016: None → default cap (3x) + + model_config = { + "json_schema_extra": { + "examples": [ + { + "summary": "Market buy 0.1 BTC perpetual", + "value": { + "instrument_name": "BTC-PERPETUAL", + "type": "market", + "amount": 0.1, + "side": "buy", + }, + } + ] + } + } + + +class ComboLeg(BaseModel): + instrument_name: str + direction: str # "buy" | "sell" + ratio: int = 1 + + +class PlaceComboOrderReq(BaseModel): + legs: list[ComboLeg] + side: str # "buy" | "sell" + amount: float + type: str = "limit" + price: float | None = None + label: str | None = None + leverage: int | None = None + + @model_validator(mode="after") + def _at_least_two_legs(self): + if len(self.legs) < 2: + raise ValueError("combo requires at least 2 legs") + return self + + +class CancelOrderReq(BaseModel): + order_id: str + + +class SetStopLossReq(BaseModel): + order_id: str + stop_price: float + + +class SetTakeProfitReq(BaseModel): + order_id: str + tp_price: float + + +class ClosePositionReq(BaseModel): + instrument_name: str + + +# === Tools (reads) === + + +async def is_testnet(client: DeribitClient) -> dict: + return client.is_testnet() + + +async def environment_info( + client: DeribitClient, *, creds: dict, env_info: Any | None = None +) -> dict: + if env_info is None: + return { + "exchange": "deribit", + "environment": "testnet" if client.is_testnet().get("testnet") else "mainnet", + "source": "credentials", + "env_value": None, + "base_url": client.base_url, + "max_leverage": get_max_leverage(creds), + } + return { + "exchange": env_info.exchange, + "environment": env_info.environment, + "source": env_info.source, + "env_value": env_info.env_value, + "base_url": env_info.base_url, + "max_leverage": get_max_leverage(creds), + } + + +async def get_ticker(client: DeribitClient, params: GetTickerReq) -> dict: + return await client.get_ticker(params.instrument_name) + + +async def get_ticker_batch(client: DeribitClient, params: GetTickerBatchReq) -> dict: + return await client.get_ticker_batch(params.instrument_names) + + +async def get_instruments(client: DeribitClient, params: GetInstrumentsReq) -> dict: + return await client.get_instruments( + currency=params.currency, + kind=params.kind, + expiry_from=params.expiry_from, + expiry_to=params.expiry_to, + strike_min=params.strike_min, + strike_max=params.strike_max, + min_open_interest=params.min_open_interest, + limit=params.limit, + offset=params.offset, + ) + + +async def get_orderbook(client: DeribitClient, params: GetOrderbookReq) -> dict: + return await client.get_orderbook(params.instrument_name, params.depth) + + +async def get_orderbook_imbalance( + client: DeribitClient, params: OrderbookImbalanceReq +) -> dict: + return await client.get_orderbook_imbalance(params.instrument_name, params.depth) + + +async def get_positions(client: DeribitClient, params: GetPositionsReq) -> dict: + return await client.get_positions(params.currency) + + +async def get_account_summary( + client: DeribitClient, params: GetAccountSummaryReq +) -> dict: + return await client.get_account_summary(params.currency) + + +async def get_trade_history(client: DeribitClient, params: GetTradeHistoryReq) -> dict: + return await client.get_trade_history(params.limit, params.instrument_name) + + +async def get_historical(client: DeribitClient, params: GetHistoricalReq) -> dict: + return await client.get_historical( + params.instrument, params.start_date, params.end_date, params.resolution + ) + + +async def get_dvol(client: DeribitClient, params: GetDvolReq) -> dict: + return await client.get_dvol( + params.currency, params.start_date, params.end_date, params.resolution + ) + + +async def get_gex(client: DeribitClient, params: GetGexReq) -> dict: + return await client.get_gex( + params.currency, params.expiry_from, params.expiry_to, params.top_n_strikes + ) + + +async def get_dealer_gamma_profile( + client: DeribitClient, params: OptionFlowReq +) -> dict: + return await client.get_dealer_gamma_profile( + params.currency, params.expiry_from, params.expiry_to, params.top_n_strikes + ) + + +async def get_vanna_charm(client: DeribitClient, params: OptionFlowReq) -> dict: + return await client.get_vanna_charm( + params.currency, params.expiry_from, params.expiry_to, params.top_n_strikes + ) + + +async def get_oi_weighted_skew(client: DeribitClient, params: OptionFlowReq) -> dict: + return await client.get_oi_weighted_skew( + params.currency, params.expiry_from, params.expiry_to, params.top_n_strikes + ) + + +async def get_smile_asymmetry(client: DeribitClient, params: OptionFlowReq) -> dict: + return await client.get_smile_asymmetry( + params.currency, params.expiry_from, params.expiry_to, params.top_n_strikes + ) + + +async def get_atm_vs_wings_vol(client: DeribitClient, params: OptionFlowReq) -> dict: + return await client.get_atm_vs_wings_vol( + params.currency, params.expiry_from, params.expiry_to, params.top_n_strikes + ) + + +async def get_pc_ratio(client: DeribitClient, params: GetPcRatioReq) -> dict: + return await client.get_pc_ratio(params.currency) + + +async def get_skew_25d(client: DeribitClient, params: GetSkew25dReq) -> dict: + return await client.get_skew_25d(params.currency, params.expiry) + + +async def get_term_structure( + client: DeribitClient, params: GetTermStructureReq +) -> dict: + return await client.get_term_structure(params.currency) + + +async def run_backtest(client: DeribitClient, params: RunBacktestReq) -> dict: + return await client.run_backtest( + strategy_name=params.strategy_name, + underlying=params.underlying, + lookback_days=params.lookback_days, + resolution=params.resolution, + entry_rules=params.entry_rules, + exit_rules=params.exit_rules, + ) + + +async def calculate_spread_payoff( + client: DeribitClient, params: CalculateSpreadPayoffReq +) -> dict: + return await client.calculate_spread_payoff(params.legs, params.quote_currency) + + +async def find_by_delta(client: DeribitClient, params: FindByDeltaReq) -> dict: + return await client.find_by_delta( + currency=params.currency, + expiry=params.expiry, + target_delta=params.target_delta, + option_type=params.option_type, + max_results=params.max_results, + min_open_interest=params.min_open_interest, + min_volume_24h=params.min_volume_24h, + ) + + +async def get_iv_rank(client: DeribitClient, params: GetIvRankReq) -> dict: + return await client.get_iv_rank(params.instrument) + + +async def get_dvol_history(client: DeribitClient, params: GetDvolHistoryReq) -> dict: + return await client.get_dvol_history(params.currency, params.lookback_days) + + +async def get_realized_vol(client: DeribitClient, params: GetRealizedVolReq) -> dict: + return await client.get_realized_vol(params.currency, params.windows) + + +async def get_technical_indicators( + client: DeribitClient, params: GetIndicatorsReq +) -> dict: + return await client.get_technical_indicators( + params.instrument, + params.indicators, + params.start_date, + params.end_date, + params.resolution, + ) + + +# === Tools (writes) === + + +async def place_order( + client: DeribitClient, params: PlaceOrderReq, *, creds: dict +) -> dict: + cap_default = get_max_leverage(creds) + lev = _enforce_leverage(params.leverage, creds=creds, exchange="deribit") + if lev != cap_default: + with contextlib.suppress(Exception): + await client.set_leverage(params.instrument_name, lev) + result = await client.place_order( + instrument_name=params.instrument_name, + side=params.side, + amount=params.amount, + type=params.type, + price=params.price, + reduce_only=params.reduce_only, + post_only=params.post_only, + label=params.label, + ) + # TODO V2: wire audit via request.state.environment in router + return result + + +async def place_combo_order( + client: DeribitClient, params: PlaceComboOrderReq, *, creds: dict +) -> dict: + cap_default = get_max_leverage(creds) + lev = _enforce_leverage(params.leverage, creds=creds, exchange="deribit") + if lev != cap_default: + for leg in params.legs: + with contextlib.suppress(Exception): + await client.set_leverage(leg.instrument_name, lev) + result = await client.place_combo_order( + legs=[leg.model_dump() for leg in params.legs], + side=params.side, + amount=params.amount, + type=params.type, + price=params.price, + label=params.label, + ) + # TODO V2: wire audit via request.state.environment in router + return result + + +async def cancel_order(client: DeribitClient, params: CancelOrderReq) -> dict: + result = await client.cancel_order(params.order_id) + # TODO V2: wire audit via request.state.environment in router + return result + + +async def set_stop_loss(client: DeribitClient, params: SetStopLossReq) -> dict: + result = await client.set_stop_loss(params.order_id, params.stop_price) + # TODO V2: wire audit via request.state.environment in router + return result + + +async def set_take_profit(client: DeribitClient, params: SetTakeProfitReq) -> dict: + result = await client.set_take_profit(params.order_id, params.tp_price) + # TODO V2: wire audit via request.state.environment in router + return result + + +async def close_position(client: DeribitClient, params: ClosePositionReq) -> dict: + result = await client.close_position(params.instrument_name) + # TODO V2: wire audit via request.state.environment in router + return result