Files
Cerbero-mcp/services/mcp-bybit/src/mcp_bybit/server.py
T

397 lines
15 KiB
Python

from __future__ import annotations
import os
from fastapi import Depends, HTTPException
from mcp_common.auth import Principal, TokenStore, require_principal
from mcp_common.environment import EnvironmentInfo
from mcp_common.mcp_bridge import mount_mcp_endpoint
from mcp_common.server import build_app
from pydantic import BaseModel
from mcp_bybit.client import BybitClient
from mcp_bybit.leverage_cap import enforce_leverage as _enforce_leverage
from mcp_bybit.leverage_cap import get_max_leverage
# --- Body models: reads ---
# Request body for POST /tools/get_ticker.
# (Kept as a comment rather than a class docstring so the generated
# JSON schema for the model is unchanged.)
class TickerReq(BaseModel):
    symbol: str
    category: str = "linear"  # used when the caller omits the field
# Request body for POST /tools/get_ticker_batch: several symbols that
# share one category.
class TickerBatchReq(BaseModel):
    symbols: list[str]
    category: str = "linear"  # used when the caller omits the field
# Request body for POST /tools/get_orderbook.
class OrderbookReq(BaseModel):
    symbol: str
    category: str = "linear"
    limit: int = 50  # forwarded as the third argument of client.get_orderbook
# Request body for POST /tools/get_historical.
# `start` / `end` are optional bounds passed through to the client
# unchanged; their unit is not visible here — TODO confirm against
# BybitClient.get_historical.
class HistoricalReq(BaseModel):
    symbol: str
    category: str = "linear"
    interval: str = "60"
    start: int | None = None
    end: int | None = None
    limit: int = 1000
# Request body for POST /tools/get_indicators.
# When `indicators` is omitted the four default names below are requested.
class IndicatorsReq(BaseModel):
    symbol: str
    category: str = "linear"
    indicators: list[str] = ["rsi", "atr", "macd", "adx"]
    interval: str = "60"
    start: int | None = None
    end: int | None = None
# Request body for POST /tools/get_funding_rate.
class FundingRateReq(BaseModel):
    symbol: str
    category: str = "linear"  # used when the caller omits the field
# Request body for POST /tools/get_funding_history.
class FundingHistoryReq(BaseModel):
    symbol: str
    category: str = "linear"
    limit: int = 100  # forwarded to client.get_funding_history
# Request body for POST /tools/get_open_interest.
class OpenInterestReq(BaseModel):
    symbol: str
    category: str = "linear"
    interval: str = "5min"
    limit: int = 288  # forwarded to client.get_open_interest
# Request body for POST /tools/get_instruments.
# `symbol` is optional; None is passed through to the client as-is.
class InstrumentsReq(BaseModel):
    category: str = "linear"
    symbol: str | None = None
# Request body for POST /tools/get_option_chain.
# `expiry` is optional; None is passed through to the client as-is.
class OptionChainReq(BaseModel):
    base_coin: str
    expiry: str | None = None
# Request body for POST /tools/get_positions.
class PositionsReq(BaseModel):
    category: str = "linear"  # used when the caller omits the field
# Request body for POST /tools/get_account_summary.
# Declares no fields: the handler calls client.get_account_summary()
# without arguments, but still expects a JSON body to be sent.
class AccountSummaryReq(BaseModel):
    pass
# Request body for POST /tools/get_trade_history.
class TradeHistoryReq(BaseModel):
    category: str = "linear"
    limit: int = 50  # forwarded to client.get_trade_history
# Request body for POST /tools/get_open_orders.
# `symbol` is optional; None is passed through to the client as-is.
class OpenOrdersReq(BaseModel):
    category: str = "linear"
    symbol: str | None = None
# Request body for POST /tools/get_basis_spot_perp.
class BasisSpotPerpReq(BaseModel):
    asset: str  # forwarded as the only argument of client.get_basis_spot_perp
# --- Body models: writes ---
# Request body for POST /tools/place_order.
# Unlike the read models, `category` has no default here and must be
# supplied by the caller. All fields are forwarded positionally to
# client.place_order in declaration order.
class PlaceOrderReq(BaseModel):
    # Required fields
    category: str
    symbol: str
    side: str
    qty: float

    # Optional fields with their defaults
    order_type: str = "Limit"
    price: float | None = None
    tif: str = "GTC"
    reduce_only: bool = False
    position_idx: int | None = None
# Request body for POST /tools/amend_order.
# `new_qty` and `new_price` are both optional and are forwarded as-is
# (including None) to client.amend_order.
class AmendOrderReq(BaseModel):
    category: str
    symbol: str
    order_id: str
    new_qty: float | None = None
    new_price: float | None = None
# Request body for POST /tools/cancel_order; every field is required.
class CancelOrderReq(BaseModel):
    category: str
    symbol: str
    order_id: str
# Request body for POST /tools/cancel_all_orders.
# `symbol` is optional; None is passed through to the client as-is.
class CancelAllReq(BaseModel):
    category: str
    symbol: str | None = None
# Request body for POST /tools/set_stop_loss.
class SetStopLossReq(BaseModel):
    category: str
    symbol: str
    stop_loss: float
    position_idx: int = 0  # used when the caller omits the field
# Request body for POST /tools/set_take_profit.
class SetTakeProfitReq(BaseModel):
    category: str
    symbol: str
    take_profit: float
    position_idx: int = 0  # used when the caller omits the field
# Request body for POST /tools/close_position; every field is required.
class ClosePositionReq(BaseModel):
    category: str
    symbol: str
# Request body for POST /tools/set_leverage.
# The handler passes `leverage` through the leverage-cap check before
# forwarding it to client.set_leverage.
class SetLeverageReq(BaseModel):
    category: str
    symbol: str
    leverage: int
# Request body for POST /tools/switch_position_mode.
# `mode` is a free-form string forwarded to client.switch_position_mode;
# the accepted values are not visible in this module.
class SwitchModeReq(BaseModel):
    category: str
    symbol: str
    mode: str
# Request body for POST /tools/transfer_asset; every field is required
# and forwarded positionally to client.transfer_asset.
class TransferReq(BaseModel):
    coin: str
    amount: float
    from_type: str
    to_type: str
# --- ACL helper ---
def _check(principal: Principal, *, core: bool = False, observer: bool = False) -> None:
    """Reject the request unless the principal holds an accepted capability.

    Args:
        principal: Authenticated caller; its ``capabilities`` are
            intersected with the accepted set.
        core: Accept callers holding the ``"core"`` capability.
        observer: Accept callers holding the ``"observer"`` capability.

    Raises:
        HTTPException: 403 ``forbidden`` when the principal holds none of
            the accepted capabilities. With both flags false the accepted
            set is empty, so every caller is rejected.
    """
    accepted: set[str] = {
        capability
        for capability, enabled in (("core", core), ("observer", observer))
        if enabled
    }
    if principal.capabilities & accepted:
        return
    raise HTTPException(status_code=403, detail="forbidden")
def create_app(
    *,
    client: BybitClient,
    token_store: TokenStore,
    creds: dict | None = None,
    env_info: "EnvironmentInfo | None" = None,
):
    """Build the mcp-bybit application and register every tool route.

    The app is created by ``build_app`` and then populated with one
    ``POST /tools/<name>`` route per tool. Read routes accept principals
    holding either the ``core`` or the ``observer`` capability; write
    routes accept ``core`` only. Finally the MCP endpoint is mounted with
    the list of tool names and descriptions.

    Route handlers are documented with ``#`` comments rather than
    docstrings on purpose: FastAPI publishes a handler docstring as the
    route description, which would change the generated OpenAPI document.

    Args:
        client: Bybit client that every route delegates to.
        token_store: Token store handed to ``build_app`` and to
            ``mount_mcp_endpoint``.
        creds: Optional credentials mapping used for the leverage cap
            lookup; ``None`` (or any falsy value) is replaced by ``{}``.
        env_info: Optional environment description. When ``None`` the
            ``environment_info`` tool derives its answer from ``client``.

    Returns:
        The configured application object returned by ``build_app``.

    Raises:
        ValueError: If the ``PORT`` environment variable is set to a value
            that ``int()`` cannot parse.
    """
    creds = creds or {}
    app = build_app(name="mcp-bybit", version="0.1.0", token_store=token_store)

    # ── Reads (core + observer) ────────────────────────────

    @app.post("/tools/environment_info", tags=["reads"])
    async def t_environment_info(principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        if env_info is None:
            # No explicit environment description: fall back to what the
            # client itself exposes.
            return {
                "exchange": "bybit",
                "environment": "testnet" if client.testnet else "mainnet",
                "source": "credentials",
                "env_value": None,
                "base_url": getattr(client, "base_url", None),
                "max_leverage": get_max_leverage(creds),
            }
        return {
            "exchange": env_info.exchange,
            "environment": env_info.environment,
            "source": env_info.source,
            "env_value": env_info.env_value,
            "base_url": env_info.base_url,
            "max_leverage": get_max_leverage(creds),
        }

    @app.post("/tools/get_ticker", tags=["reads"])
    async def t_get_ticker(body: TickerReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        return await client.get_ticker(body.symbol, body.category)

    @app.post("/tools/get_ticker_batch", tags=["reads"])
    async def t_get_ticker_batch(body: TickerBatchReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        return await client.get_ticker_batch(body.symbols, body.category)

    @app.post("/tools/get_orderbook", tags=["reads"])
    async def t_get_orderbook(body: OrderbookReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        return await client.get_orderbook(body.symbol, body.category, body.limit)

    @app.post("/tools/get_historical", tags=["reads"])
    async def t_get_historical(body: HistoricalReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        return await client.get_historical(
            body.symbol, body.category, body.interval, body.start, body.end, body.limit,
        )

    @app.post("/tools/get_indicators", tags=["reads"])
    async def t_get_indicators(body: IndicatorsReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        return await client.get_indicators(
            body.symbol, body.category, body.indicators,
            body.interval, body.start, body.end,
        )

    @app.post("/tools/get_funding_rate", tags=["reads"])
    async def t_get_funding_rate(body: FundingRateReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        return await client.get_funding_rate(body.symbol, body.category)

    @app.post("/tools/get_funding_history", tags=["reads"])
    async def t_get_funding_history(body: FundingHistoryReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        return await client.get_funding_history(body.symbol, body.category, body.limit)

    @app.post("/tools/get_open_interest", tags=["reads"])
    async def t_get_open_interest(body: OpenInterestReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        return await client.get_open_interest(body.symbol, body.category, body.interval, body.limit)

    @app.post("/tools/get_instruments", tags=["reads"])
    async def t_get_instruments(body: InstrumentsReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        return await client.get_instruments(body.category, body.symbol)

    @app.post("/tools/get_option_chain", tags=["reads"])
    async def t_get_option_chain(body: OptionChainReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        return await client.get_option_chain(body.base_coin, body.expiry)

    @app.post("/tools/get_positions", tags=["reads"])
    async def t_get_positions(body: PositionsReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        return {"positions": await client.get_positions(body.category)}

    @app.post("/tools/get_account_summary", tags=["reads"])
    async def t_get_account_summary(body: AccountSummaryReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        return await client.get_account_summary()

    @app.post("/tools/get_trade_history", tags=["reads"])
    async def t_get_trade_history(body: TradeHistoryReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        return {"trades": await client.get_trade_history(body.category, body.limit)}

    @app.post("/tools/get_open_orders", tags=["reads"])
    async def t_get_open_orders(body: OpenOrdersReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        return {"orders": await client.get_open_orders(body.category, body.symbol)}

    @app.post("/tools/get_basis_spot_perp", tags=["reads"])
    async def t_get_basis_spot_perp(body: BasisSpotPerpReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True, observer=True)
        return await client.get_basis_spot_perp(body.asset)

    # ── Writes (core only) ─────────────────────────────────

    @app.post("/tools/place_order", tags=["writes"])
    async def t_place_order(body: PlaceOrderReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True)
        return await client.place_order(
            body.category, body.symbol, body.side, body.qty,
            body.order_type, body.price, body.tif, body.reduce_only, body.position_idx,
        )

    @app.post("/tools/amend_order", tags=["writes"])
    async def t_amend_order(body: AmendOrderReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True)
        return await client.amend_order(
            body.category, body.symbol, body.order_id, body.new_qty, body.new_price,
        )

    @app.post("/tools/cancel_order", tags=["writes"])
    async def t_cancel_order(body: CancelOrderReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True)
        return await client.cancel_order(body.category, body.symbol, body.order_id)

    @app.post("/tools/cancel_all_orders", tags=["writes"])
    async def t_cancel_all(body: CancelAllReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True)
        return await client.cancel_all_orders(body.category, body.symbol)

    @app.post("/tools/set_stop_loss", tags=["writes"])
    async def t_set_sl(body: SetStopLossReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True)
        return await client.set_stop_loss(body.category, body.symbol, body.stop_loss, body.position_idx)

    @app.post("/tools/set_take_profit", tags=["writes"])
    async def t_set_tp(body: SetTakeProfitReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True)
        return await client.set_take_profit(body.category, body.symbol, body.take_profit, body.position_idx)

    @app.post("/tools/close_position", tags=["writes"])
    async def t_close(body: ClosePositionReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True)
        return await client.close_position(body.category, body.symbol)

    @app.post("/tools/set_leverage", tags=["writes"])
    async def t_set_leverage(body: SetLeverageReq, principal: Principal = Depends(require_principal)):
        # Authorize first, like every other write route: a caller without
        # the core capability must get 403 regardless of the leverage value,
        # not the outcome of the leverage-cap validation.
        _check(principal, core=True)
        # Presumably rejects values above the configured cap — see
        # mcp_bybit.leverage_cap.enforce_leverage for the exact contract.
        _enforce_leverage(body.leverage, creds=creds, exchange="bybit")
        return await client.set_leverage(body.category, body.symbol, body.leverage)

    @app.post("/tools/switch_position_mode", tags=["writes"])
    async def t_switch_mode(body: SwitchModeReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True)
        return await client.switch_position_mode(body.category, body.symbol, body.mode)

    @app.post("/tools/transfer_asset", tags=["writes"])
    async def t_transfer(body: TransferReq, principal: Principal = Depends(require_principal)):
        _check(principal, core=True)
        return await client.transfer_asset(body.coin, body.amount, body.from_type, body.to_type)

    # ── MCP mount ──────────────────────────────────────────
    # The bridge is given a localhost base URL built from PORT (default
    # 9019); tool names below match the /tools/<name> routes above.
    port = int(os.environ.get("PORT", "9019"))
    mount_mcp_endpoint(
        app,
        name="cerbero-bybit",
        version="0.1.0",
        token_store=token_store,
        internal_base_url=f"http://localhost:{port}",
        tools=[
            {"name": "environment_info", "description": "Ambiente operativo (testnet/mainnet), source, base_url, max_leverage cap."},
            {"name": "get_ticker", "description": "Ticker Bybit (spot/linear/inverse/option)."},
            {"name": "get_ticker_batch", "description": "Ticker per più simboli."},
            {"name": "get_orderbook", "description": "Orderbook profondità N."},
            {"name": "get_historical", "description": "OHLCV candles Bybit."},
            {"name": "get_indicators", "description": "Indicatori tecnici (RSI, ATR, MACD, ADX)."},
            {"name": "get_funding_rate", "description": "Funding corrente perp."},
            {"name": "get_funding_history", "description": "Funding storico perp."},
            {"name": "get_open_interest", "description": "Open interest history perp."},
            {"name": "get_instruments", "description": "Specs contratti."},
            {"name": "get_option_chain", "description": "Option chain BTC/ETH/SOL."},
            {"name": "get_positions", "description": "Posizioni aperte."},
            {"name": "get_account_summary", "description": "Wallet balance e margine."},
            {"name": "get_trade_history", "description": "Fills recenti."},
            {"name": "get_open_orders", "description": "Ordini pending."},
            {"name": "get_basis_spot_perp", "description": "Basis spot vs linear perp."},
            {"name": "place_order", "description": "Invia ordine (CORE only)."},
            {"name": "amend_order", "description": "Modifica ordine esistente."},
            {"name": "cancel_order", "description": "Cancella ordine."},
            {"name": "cancel_all_orders", "description": "Cancella tutti ordini."},
            {"name": "set_stop_loss", "description": "Setta stop loss su posizione."},
            {"name": "set_take_profit", "description": "Setta take profit su posizione."},
            {"name": "close_position", "description": "Chiude posizione aperta."},
            {"name": "set_leverage", "description": "Leva buy+sell uniforme."},
            {"name": "switch_position_mode", "description": "Hedge vs one-way."},
            {"name": "transfer_asset", "description": "Trasferimento interno tra account types."},
        ],
    )
    return app