"""HTTP and MCP entry points for the sentiment service.

Declares the request-body models, a capability check helper, and the
``create_app`` factory that registers the ``/tools/*`` POST routes and
mounts the MCP bridge on top of them.
"""

from __future__ import annotations

import logging
import os

from fastapi import Depends, FastAPI, HTTPException
from mcp_common.auth import Principal, TokenStore, require_principal
from mcp_common.mcp_bridge import mount_mcp_endpoint
from mcp_common.server import build_app
from pydantic import BaseModel

from mcp_sentiment.fetchers import (
    fetch_cointegration_pairs,
    fetch_cross_exchange_funding,
    fetch_crypto_news,
    fetch_funding_arb_spread,
    fetch_funding_rates,
    fetch_liquidation_heatmap,
    fetch_oi_history,
    fetch_social_sentiment,
    fetch_world_news,
)

logger = logging.getLogger(__name__)


# --- Body models ---
# NOTE: these models are documented with `#` comments rather than docstrings
# so that the generated JSON/OpenAPI schema stays exactly as it was.


# Request body for /tools/get_crypto_news.
class GetCryptoNewsReq(BaseModel):
    limit: int = 20


# Request body for /tools/get_social_sentiment.
class GetSocialSentimentReq(BaseModel):
    symbol: str = "BTC"


# Request body for /tools/get_funding_rates.
class GetFundingRatesReq(BaseModel):
    asset: str = "BTC"


# Request body for /tools/get_world_news (no fields).
class GetWorldNewsReq(BaseModel):
    pass


# Request body for /tools/get_cross_exchange_funding.
class GetCrossExchangeFundingReq(BaseModel):
    assets: list[str] | None = None


# Request body for /tools/get_funding_arb_spread.
class GetFundingArbSpreadReq(BaseModel):
    assets: list[str] | None = None


# Request body for /tools/get_liquidation_heatmap.
class GetLiquidationHeatmapReq(BaseModel):
    asset: str = "BTC"


# Request body for /tools/get_cointegration_pairs.
class GetCointegrationPairsReq(BaseModel):
    pairs: list[list[str]] | None = None
    lookback_hours: int = 24


# Request body for /tools/get_oi_history.
class GetOiHistoryReq(BaseModel):
    asset: str = "BTC"
    period: str = "5m"
    limit: int = 288


# --- ACL helper ---


def _check(principal: Principal, *, core: bool = False, observer: bool = False) -> None:
    """Reject the request unless the principal has a selected capability.

    The capability names enabled through the keyword flags are collected
    into a set and intersected (``&``) with ``principal.capabilities``.

    Args:
        principal: Authenticated caller whose ``capabilities`` are tested.
        core: Include the ``"core"`` capability in the accepted set.
        observer: Include the ``"observer"`` capability in the accepted set.

    Raises:
        HTTPException: Status 403 when the intersection is falsy. With no
            flag enabled the accepted set is empty, so (assuming set-like
            capabilities) every principal is rejected.
    """
    allowed: set[str] = {
        capability
        for capability, enabled in (("core", core), ("observer", observer))
        if enabled
    }
    if principal.capabilities & allowed:
        return
    raise HTTPException(403, f"capability required: {allowed}")


# --- App factory ---


def create_app(*, cryptopanic_key: str = "", token_store: TokenStore) -> FastAPI:
    """Build the FastAPI application with all tool routes and the MCP bridge.

    Args:
        cryptopanic_key: API key handed to ``fetch_crypto_news``. A warning
            is logged when it is empty or one of the known placeholder
            values (compared case-insensitively).
        token_store: Forwarded to both ``build_app`` and
            ``mount_mcp_endpoint``.

    Returns:
        The configured application.

    The ``PORT`` environment variable (default ``"9014"``) is read to build
    the ``internal_base_url`` passed to the MCP bridge.
    """
    app = build_app(name="mcp-sentiment", version="0.1.0", token_store=token_store)

    placeholder_values = ("placeholder", "none", "changeme")
    key_missing = not cryptopanic_key or cryptopanic_key.lower() in placeholder_values
    if key_missing:
        logger.warning(
            "mcp-sentiment: cryptopanic_key mancante o placeholder — get_crypto_news "
            "ritornerà headlines=[] con note diagnostica"
        )

    def _authorize_read(caller: Principal) -> None:
        # Every tool below accepts callers holding "core" or "observer".
        _check(caller, core=True, observer=True)

    # Handlers carry `#` comments instead of docstrings so the generated
    # OpenAPI descriptions are left untouched.

    # News: forwards the configured API key and the requested limit.
    @app.post("/tools/get_crypto_news", tags=["reads"])
    async def t_get_crypto_news(
        body: GetCryptoNewsReq,
        principal: Principal = Depends(require_principal),
    ):
        _authorize_read(principal)
        return await fetch_crypto_news(api_key=cryptopanic_key, limit=body.limit)

    # Social sentiment for one symbol.
    @app.post("/tools/get_social_sentiment", tags=["reads"])
    async def t_get_social_sentiment(
        body: GetSocialSentimentReq,
        principal: Principal = Depends(require_principal),
    ):
        _authorize_read(principal)
        return await fetch_social_sentiment(body.symbol)

    # Funding rates for one asset.
    @app.post("/tools/get_funding_rates", tags=["reads"])
    async def t_get_funding_rates(
        body: GetFundingRatesReq,
        principal: Principal = Depends(require_principal),
    ):
        _authorize_read(principal)
        return await fetch_funding_rates(body.asset)

    # World news: the body has no fields and the fetcher takes no arguments.
    @app.post("/tools/get_world_news", tags=["reads"])
    async def t_get_world_news(
        body: GetWorldNewsReq,
        principal: Principal = Depends(require_principal),
    ):
        _authorize_read(principal)
        return await fetch_world_news()

    # Cross-exchange funding for an optional list of assets.
    @app.post("/tools/get_cross_exchange_funding", tags=["reads"])
    async def t_get_cross_exchange_funding(
        body: GetCrossExchangeFundingReq,
        principal: Principal = Depends(require_principal),
    ):
        _authorize_read(principal)
        return await fetch_cross_exchange_funding(body.assets)

    # Funding arbitrage spread for an optional list of assets.
    @app.post("/tools/get_funding_arb_spread", tags=["reads"])
    async def t_get_funding_arb_spread(
        body: GetFundingArbSpreadReq,
        principal: Principal = Depends(require_principal),
    ):
        _authorize_read(principal)
        return await fetch_funding_arb_spread(body.assets)

    # Liquidation heatmap for one asset.
    @app.post("/tools/get_liquidation_heatmap", tags=["reads"])
    async def t_get_liquidation_heatmap(
        body: GetLiquidationHeatmapReq,
        principal: Principal = Depends(require_principal),
    ):
        _authorize_read(principal)
        return await fetch_liquidation_heatmap(body.asset)

    # Cointegration pairs: forwards the pairs and the lookback window.
    @app.post("/tools/get_cointegration_pairs", tags=["reads"])
    async def t_get_cointegration_pairs(
        body: GetCointegrationPairsReq,
        principal: Principal = Depends(require_principal),
    ):
        _authorize_read(principal)
        return await fetch_cointegration_pairs(body.pairs, body.lookback_hours)

    # Open-interest history: forwards asset, period and limit.
    @app.post("/tools/get_oi_history", tags=["reads"])
    async def t_get_oi_history(
        body: GetOiHistoryReq,
        principal: Principal = Depends(require_principal),
    ):
        _authorize_read(principal)
        return await fetch_oi_history(body.asset, body.period, body.limit)

    # ───── MCP endpoint (/mcp) — bridge to /tools/* ─────
    port = int(os.environ.get("PORT", "9014"))
    tool_catalog = [
        {"name": "get_crypto_news", "description": "News crypto da CryptoPanic."},
        {"name": "get_social_sentiment", "description": "Sentiment aggregato social."},
        {"name": "get_funding_rates", "description": "Funding rates aggregati."},
        {"name": "get_world_news", "description": "News macro/world."},
        {
            "name": "get_cross_exchange_funding",
            "description": "Funding multi-asset multi-exchange + arbitrage opportunities.",
        },
        {
            "name": "get_oi_history",
            "description": "Open interest history perp (Binance) + delta_pct 1h/4h/24h.",
        },
        {
            "name": "get_funding_arb_spread",
            "description": "Opportunità arbitrage funding cross-exchange in formato compatto + annualized %.",
        },
        {
            "name": "get_liquidation_heatmap",
            "description": "Pressione liquidazioni heuristica da OI delta + funding (long/short squeeze risk).",
        },
        {
            "name": "get_cointegration_pairs",
            "description": "Engle-Granger cointegration test su coppie crypto Binance hourly.",
        },
    ]
    mount_mcp_endpoint(
        app,
        name="cerbero-sentiment",
        version="0.1.0",
        token_store=token_store,
        internal_base_url=f"http://localhost:{port}",
        tools=tool_catalog,
    )
    return app