From f56df197e15088036767ce1bb6e5838edac72a40 Mon Sep 17 00:00:00 2001 From: AdrianoDev Date: Thu, 30 Apr 2026 18:46:48 +0200 Subject: [PATCH] feat(V2): migrazione sentiment completa (read-only, env ignored) - exchanges/sentiment/{client,fetchers,tools}.py: SentimentClient wrapper stateless (cryptopanic_key, lunarcrush_key) - routers/sentiment.py: 9 tool POST sotto /mcp-sentiment (news, social, funding, OI, liquidations, cointegration) - exchanges/__init__.py: branch builder per sentiment (env ignored) - tests/unit/exchanges/sentiment: migrato test_fetchers, scartato test_server_acl V1-only - tests/unit/test_exchanges_builder.py: aggiunto test_build_client_sentiment_no_env_distinction - fetchers.py: env var lookup allineato a LUNARCRUSH_KEY (con fallback LUNARCRUSH_API_KEY) 241 test passano. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/cerbero_mcp/exchanges/__init__.py | 9 + .../exchanges/sentiment/__init__.py | 0 src/cerbero_mcp/exchanges/sentiment/client.py | 21 + .../exchanges/sentiment/fetchers.py | 660 ++++++++++++++++++ src/cerbero_mcp/exchanges/sentiment/tools.py | 122 ++++ src/cerbero_mcp/routers/sentiment.py | 98 +++ tests/unit/exchanges/sentiment/__init__.py | 0 .../unit/exchanges/sentiment/test_fetchers.py | 320 +++++++++ tests/unit/test_exchanges_builder.py | 23 + 9 files changed, 1253 insertions(+) create mode 100644 src/cerbero_mcp/exchanges/sentiment/__init__.py create mode 100644 src/cerbero_mcp/exchanges/sentiment/client.py create mode 100644 src/cerbero_mcp/exchanges/sentiment/fetchers.py create mode 100644 src/cerbero_mcp/exchanges/sentiment/tools.py create mode 100644 src/cerbero_mcp/routers/sentiment.py create mode 100644 tests/unit/exchanges/sentiment/__init__.py create mode 100644 tests/unit/exchanges/sentiment/test_fetchers.py diff --git a/src/cerbero_mcp/exchanges/__init__.py b/src/cerbero_mcp/exchanges/__init__.py index 910b1f7..0b5c5c6 100644 --- a/src/cerbero_mcp/exchanges/__init__.py +++ b/src/cerbero_mcp/exchanges/__init__.py @@ -54,4 +54,13 @@ async def build_client( fred_api_key=settings.macro.fred_api_key.get_secret_value(), finnhub_api_key=settings.macro.finnhub_api_key.get_secret_value(), ) + if exchange == "sentiment": + # Read-only data provider — env ignored (CryptoPanic, LunarCrush e + # endpoint pubblici di funding/OI multi-exchange sono unici). + from cerbero_mcp.exchanges.sentiment.client import SentimentClient + + return SentimentClient( + cryptopanic_key=settings.sentiment.cryptopanic_key.get_secret_value(), + lunarcrush_key=settings.sentiment.lunarcrush_key.get_secret_value(), + ) raise ValueError(f"unsupported exchange: {exchange}") diff --git a/src/cerbero_mcp/exchanges/sentiment/__init__.py b/src/cerbero_mcp/exchanges/sentiment/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/cerbero_mcp/exchanges/sentiment/client.py b/src/cerbero_mcp/exchanges/sentiment/client.py new file mode 100644 index 0000000..eee2c4e --- /dev/null +++ b/src/cerbero_mcp/exchanges/sentiment/client.py @@ -0,0 +1,21 @@ +"""SentimentClient: contenitore credenziali per data provider sentiment. + +Sentiment è un read-only data provider (CryptoPanic, LunarCrush, aggregazioni +multi-exchange di funding/OI da endpoint pubblici). Nessun testnet/mainnet, +nessuna sessione HTTP persistente — i `fetchers.py` aprono client httpx ad-hoc +tramite `cerbero_mcp.common.http.async_client`. Questo wrapper esiste solo per +uniformità con il pattern degli altri exchange (DeribitClient/BybitClient/...) +e per essere istanziato dal `ClientRegistry`. +""" +from __future__ import annotations + + +class SentimentClient: + """Wrapper credenziali CryptoPanic/LunarCrush. Stateless, no HTTP session.""" + + def __init__(self, *, cryptopanic_key: str = "", lunarcrush_key: str = "") -> None: + self.cryptopanic_key = cryptopanic_key + self.lunarcrush_key = lunarcrush_key + + async def aclose(self) -> None: # pragma: no cover - no-op, no resources + return None diff --git a/src/cerbero_mcp/exchanges/sentiment/fetchers.py b/src/cerbero_mcp/exchanges/sentiment/fetchers.py new file mode 100644 index 0000000..398ec69 --- /dev/null +++ b/src/cerbero_mcp/exchanges/sentiment/fetchers.py @@ -0,0 +1,660 @@ +from __future__ import annotations + +import os +import re +import xml.etree.ElementTree as ET +from typing import Any + +from cerbero_mcp.common.http import async_client + +CRYPTOPANIC_URL = "https://cryptopanic.com/api/v1/posts/" +ALTERNATIVE_ME_URL = "https://api.alternative.me/fng/" +COINDESK_RSS = "https://www.coindesk.com/arc/outboundfeeds/rss/" +LUNARCRUSH_COIN_URL = "https://lunarcrush.com/api4/public/coins/{symbol}/v1" +CRYPTOCOMPARE_NEWS_URL = "https://min-api.cryptocompare.com/data/v2/news/" +MESSARI_NEWS_URL = "https://data.messari.io/api/v1/news" + + +async def _fetch_coindesk_headlines(limit: int = 20) -> list[dict[str, Any]]: + items: list[dict[str, Any]] = [] + try: + async with async_client(timeout=10.0, follow_redirects=True) as client: + resp = await client.get(COINDESK_RSS) + if resp.status_code != 200: + return items + root = ET.fromstring(resp.text) + for node in root.findall(".//item")[:limit]: + items.append( + { + "title": node.findtext("title", ""), + "source": "CoinDesk", + "published_at": node.findtext("pubDate", ""), + "url": node.findtext("link", ""), + } + ) + except Exception: + pass + return items + +WORLD_NEWS_FEEDS = [ + ("Reuters Business", "https://feeds.reuters.com/reuters/businessNews"), + ("CNBC Top News", "https://search.cnbc.com/rs/search/combinedcms/view.xml?partnerId=wrss01&id=100003114"), + ("Bloomberg Markets", "https://feeds.bloomberg.com/markets/news.rss"), + ("CoinDesk", "https://www.coindesk.com/arc/outboundfeeds/rss/"), +] + +# Public funding rate endpoints (no auth required) +BINANCE_FUNDING_URL = "https://fapi.binance.com/fapi/v1/premiumIndex" +BYBIT_FUNDING_URL = "https://api.bybit.com/v5/market/tickers" +OKX_FUNDING_URL = "https://www.okx.com/api/v5/public/funding-rate" +BINANCE_OI_HIST_URL = "https://fapi.binance.com/futures/data/openInterestHist" + + +async def _fetch_cryptocompare_news(limit: int = 20) -> list[dict[str, Any]]: + """CER-017: CryptoCompare news free (no key needed).""" + items: list[dict[str, Any]] = [] + try: + async with async_client(timeout=10.0) as client: + resp = await client.get(CRYPTOCOMPARE_NEWS_URL, params={"lang": "EN"}) + if resp.status_code != 200: + return items + data = resp.json() + for r in (data.get("Data") or [])[:limit]: + ts = r.get("published_on") + try: + import datetime as _dt + pub = _dt.datetime.fromtimestamp(int(ts), _dt.UTC).isoformat() if ts else "" + except (TypeError, ValueError): + pub = "" + items.append({ + "title": r.get("title", ""), + "source": r.get("source", "CryptoCompare"), + "published_at": pub, + "url": r.get("url", ""), + "provider": "cryptocompare", + }) + except Exception: + pass + return items + + +async def _fetch_messari_news(limit: int = 20) -> list[dict[str, Any]]: + """CER-017: Messari news free (no key needed for basic feed).""" + items: list[dict[str, Any]] = [] + try: + async with async_client(timeout=10.0) as client: + resp = await client.get(MESSARI_NEWS_URL) + if resp.status_code != 200: + return items + data = resp.json() + for r in (data.get("data") or [])[:limit]: + items.append({ + "title": r.get("title", ""), + "source": (r.get("author") or {}).get("name") or "Messari", + "published_at": r.get("published_at", ""), + "url": r.get("url", ""), + "provider": "messari", + }) + except Exception: + pass + return items + + +def _normalize_title(t: str) -> str: + """Lowercase + strip non-alnum per dedup tra provider.""" + return "".join(ch for ch in t.lower() if ch.isalnum() or ch.isspace()).strip() + + +async def fetch_crypto_news(api_key: str = "", limit: int = 20) -> dict[str, Any]: + """CER-017: multi-source aggregator (CoinDesk + CryptoCompare + Messari) + dedup. + + Se `api_key` Cryptopanic presente, include anche quella come 4° source. + """ + import asyncio + + # CoinDesk + CryptoCompare + Messari sempre (free, no key) + tasks = [ + _fetch_coindesk_headlines(limit), + _fetch_cryptocompare_news(limit), + _fetch_messari_news(limit), + ] + include_cp = bool(api_key) and api_key.lower() not in ("placeholder", "none", "changeme") + if include_cp: + tasks.append(_fetch_cryptopanic_news(api_key, limit)) + + results = await asyncio.gather(*tasks, return_exceptions=True) + all_items: list[dict[str, Any]] = [] + providers_ok: list[str] = [] + providers_failed: list[str] = [] + provider_names = ["coindesk", "cryptocompare", "messari"] + if include_cp: + provider_names.append("cryptopanic") + for name, res in zip(provider_names, results, strict=True): + if isinstance(res, Exception) or not res: + providers_failed.append(name) + continue + providers_ok.append(name) + for item in res: + if "provider" not in item: + item["provider"] = name + all_items.append(item) + + # Dedup per normalized title — preserva primo match + seen: set[str] = set() + deduped: list[dict[str, Any]] = [] + for h in all_items: + key = _normalize_title(h.get("title", "")) + if not key or key in seen: + continue + seen.add(key) + deduped.append(h) + + # Sort per published_at DESC (stringhe ISO confrontabili; quelle vuote in fondo) + deduped.sort(key=lambda x: x.get("published_at") or "", reverse=True) + + return { + "headlines": deduped[:limit], + "sources": providers_ok, + "sources_failed": providers_failed, + "total_before_dedup": len(all_items), + "total_after_dedup": len(deduped), + } + + +async def _fetch_cryptopanic_news(api_key: str, limit: int) -> list[dict[str, Any]]: + """Cryptopanic as one of the sources. Failure → [].""" + try: + async with async_client(timeout=10.0) as client: + resp = await client.get( + CRYPTOPANIC_URL, + params={"auth_token": api_key, "public": "true"}, + ) + if resp.status_code >= 400: + return [] + data = resp.json() + except Exception: + return [] + return [ + { + "title": r.get("title", ""), + "source": (r.get("source") or {}).get("title", "Cryptopanic"), + "published_at": r.get("published_at", ""), + "url": r.get("url", ""), + "provider": "cryptopanic", + } + for r in (data.get("results") or [])[:limit] + ] + + +async def _fetch_lunarcrush(symbol: str, api_key: str) -> dict | None: + """CER-P2-005: LunarCrush v4 social metrics. Ritorna None se fail.""" + try: + async with async_client(timeout=10.0) as client: + resp = await client.get( + LUNARCRUSH_COIN_URL.format(symbol=symbol.upper()), + headers={"Authorization": f"Bearer {api_key}"}, + ) + if resp.status_code != 200: + return None + data = (resp.json() or {}).get("data") or {} + return { + "galaxy_score": data.get("galaxy_score"), + "alt_rank": data.get("alt_rank"), + "sentiment": data.get("sentiment"), # 0-100 scale + "social_volume_24h": data.get("social_volume_24h"), + "social_dominance": data.get("social_dominance"), + } + except Exception: + return None + + +def _fng_to_sentiment(value: int) -> float: + """Normalize fear&greed 0-100 to [-1, 1] proxy sentiment.""" + return round((value - 50) / 50.0, 3) + + +async def fetch_social_sentiment(symbol: str = "BTC") -> dict[str, Any]: + """CER-P2-005: provider chain LunarCrush + fear_greed proxy. + + Se LUNARCRUSH_API_KEY env è presente e risponde, usa valori reali. + Altrimenti deriva proxy da fear_greed per popolare twitter/reddit sentiment + (marcato come derived=True così l'agent sa che è proxy). + """ + async with async_client(timeout=10.0) as client: + fng_resp = await client.get(ALTERNATIVE_ME_URL, params={"limit": 1}) + fng_data = fng_resp.json() + fng_list = fng_data.get("data", []) + fng = fng_list[0] if fng_list else {} + fng_value = int(fng.get("value", 0)) + proxy = _fng_to_sentiment(fng_value) if fng_value else 0.0 + + result: dict[str, Any] = { + "fear_greed_index": fng_value, + "fear_greed_label": fng.get("value_classification", ""), + "symbol": symbol.upper(), + "social_volume": 0, + "twitter_sentiment": 0.0, + "reddit_sentiment": 0.0, + "source": "fear_greed_only", + "derived": True, + } + + lc_key = ( + os.environ.get("LUNARCRUSH_KEY") + or os.environ.get("LUNARCRUSH_API_KEY") + or "" + ).strip() + if lc_key: + lc = await _fetch_lunarcrush(symbol, lc_key) + if lc is not None: + # LunarCrush sentiment 0-100 → normalize to [-1, 1] + lc_sent = lc.get("sentiment") + norm = round((float(lc_sent) - 50) / 50.0, 3) if lc_sent is not None else None + result.update({ + "twitter_sentiment": norm if norm is not None else proxy, + "reddit_sentiment": norm if norm is not None else proxy, + "social_volume": int(lc.get("social_volume_24h") or 0), + "galaxy_score": lc.get("galaxy_score"), + "alt_rank": lc.get("alt_rank"), + "social_dominance": lc.get("social_dominance"), + "source": "lunarcrush+fear_greed", + "derived": False, + }) + return result + + # Proxy-only path + result["twitter_sentiment"] = proxy + result["reddit_sentiment"] = proxy + result["note"] = ( + "twitter/reddit derived from fear_greed_index; configure LUNARCRUSH_KEY " + "for real social metrics" + ) + return result + + +async def fetch_funding_rates(asset: str = "BTC") -> dict[str, Any]: + """Fetch perpetual funding rates from Binance, Bybit and OKX public APIs.""" + asset = asset.upper() + usdt_symbol = f"{asset}USDT" + okx_inst = f"{asset}-USDT-SWAP" + rates: list[dict[str, Any]] = [] + + async with async_client(timeout=10.0) as client: + # Binance + try: + resp = await client.get(BINANCE_FUNDING_URL, params={"symbol": usdt_symbol}) + if resp.status_code == 200: + d = resp.json() + rates.append( + { + "exchange": "binance", + "asset": asset, + "rate": float(d.get("lastFundingRate", 0)), + "next_funding_time": d.get("nextFundingTime", ""), + } + ) + except Exception: + pass + + # Bybit + try: + resp = await client.get( + BYBIT_FUNDING_URL, + params={"category": "linear", "symbol": usdt_symbol}, + ) + if resp.status_code == 200: + items = resp.json().get("result", {}).get("list", []) + if items: + d = items[0] + rates.append( + { + "exchange": "bybit", + "asset": asset, + "rate": float(d.get("fundingRate", 0)), + "next_funding_time": d.get("nextFundingTime", ""), + } + ) + except Exception: + pass + + # OKX + try: + resp = await client.get(OKX_FUNDING_URL, params={"instId": okx_inst}) + if resp.status_code == 200: + items = resp.json().get("data", []) + if items: + d = items[0] + rates.append( + { + "exchange": "okx", + "asset": asset, + "rate": float(d.get("fundingRate", 0)), + "next_funding_time": d.get("nextFundingTime", ""), + } + ) + except Exception: + pass + + return {"asset": asset, "rates": rates} + + +async def fetch_cross_exchange_funding(assets: list[str] | None = None) -> dict[str, Any]: + """Snapshot multi-asset funding rates con spread e arbitrage detection.""" + from datetime import UTC + from datetime import datetime as _dt + + assets = [a.upper() for a in (assets or ["BTC", "ETH", "SOL"])] + snapshot: dict[str, dict[str, Any]] = {} + async with async_client(timeout=10.0) as client: + for asset in assets: + rates: dict[str, float | None] = { + "binance": None, + "bybit": None, + "okx": None, + "hyperliquid": None, + } + try: + resp = await client.get( + BINANCE_FUNDING_URL, params={"symbol": f"{asset}USDT"} + ) + if resp.status_code == 200: + rates["binance"] = float(resp.json().get("lastFundingRate", 0)) + except Exception: + pass + try: + resp = await client.get( + BYBIT_FUNDING_URL, + params={"category": "linear", "symbol": f"{asset}USDT"}, + ) + if resp.status_code == 200: + items = resp.json().get("result", {}).get("list", []) + if items: + rates["bybit"] = float(items[0].get("fundingRate", 0)) + except Exception: + pass + try: + resp = await client.get( + OKX_FUNDING_URL, params={"instId": f"{asset}-USDT-SWAP"} + ) + if resp.status_code == 200: + items = resp.json().get("data", []) + if items: + rates["okx"] = float(items[0].get("fundingRate", 0)) + except Exception: + pass + try: + resp = await client.post( + "https://api.hyperliquid.xyz/info", + json={"type": "metaAndAssetCtxs"}, + ) + if resp.status_code == 200: + data = resp.json() + universe = data[0].get("universe") or [] + ctx_list = data[1] if len(data) > 1 else [] + for meta, ctx in zip(universe, ctx_list, strict=False): + if meta.get("name", "").upper() == asset: + rates["hyperliquid"] = float(ctx.get("funding", 0)) + break + except Exception: + pass + + present = [v for v in rates.values() if v is not None] + spread_max_min = max(present) - min(present) if present else None + anomaly = None + if present and spread_max_min is not None: + mean_r = sum(present) / len(present) + for name, v in rates.items(): + if v is None: + continue + if abs(v - mean_r) > 2 * (spread_max_min / 2 or 1e-9): + anomaly = f"{name}_outlier" + break + + snapshot[asset] = { + **rates, + "spread_max_min": spread_max_min, + "anomaly": anomaly, + } + + # Arbitrage opportunities + arbs = [] + for asset, data in snapshot.items(): + values = [(k, v) for k, v in data.items() if k in ("binance", "bybit", "okx", "hyperliquid") and v is not None] + if len(values) < 2: + continue + values.sort(key=lambda x: x[1]) + low_ex, low_v = values[0] + high_ex, high_v = values[-1] + diff = high_v - low_v + ann_pct = diff * 24 * 365 * 100 # hourly funding → annual pct + if ann_pct > 50: + arbs.append({ + "asset": asset, + "pair": f"long_{low_ex}_short_{high_ex}", + "funding_differential_ann": round(ann_pct, 2), + "risk_adjusted": "acceptable" if ann_pct > 100 else "marginal", + }) + + return { + "assets": assets, + "snapshot": snapshot, + "arbitrage_opportunities": arbs, + "data_timestamp": _dt.now(UTC).isoformat(), + } + + +async def fetch_funding_arb_spread(assets: list[str] | None = None) -> dict[str, Any]: + """Riassume opportunità di arbitrage funding su cross-exchange in un + formato compatto: per ogni asset, rate min/max + spread + annualized %. + Wrapper su fetch_cross_exchange_funding focalizzato su action items. + """ + base = await fetch_cross_exchange_funding(assets) + snapshot = base.get("snapshot") or {} + rows: list[dict[str, Any]] = [] + for asset, data in snapshot.items(): + rates = {k: v for k, v in data.items() if k in ("binance", "bybit", "okx", "hyperliquid") and v is not None} + if len(rates) < 2: + continue + sorted_rates = sorted(rates.items(), key=lambda x: x[1]) + low_ex, low_v = sorted_rates[0] + high_ex, high_v = sorted_rates[-1] + spread = high_v - low_v + # Funding cycle: 8h on most, 1h on hyperliquid → assume 8h => 3x/day + ann_pct = spread * 3 * 365 * 100 + actionable = ann_pct > 50 + rows.append({ + "asset": asset, + "long_venue": low_ex, + "short_venue": high_ex, + "long_funding": low_v, + "short_funding": high_v, + "spread": spread, + "annualized_pct": round(ann_pct, 2), + "actionable": actionable, + }) + rows.sort(key=lambda r: -r["annualized_pct"]) + return { + "opportunities": rows, + "data_timestamp": base.get("data_timestamp"), + } + + +async def fetch_liquidation_heatmap(asset: str = "BTC") -> dict[str, Any]: + """Heuristic liquidation pressure: combina OI delta + funding extreme su + asset. NON usa feed liq paid (Coinglass): stima dove si concentra + leveraged exposure pronta a essere liquidata. + + long_squeeze_risk: high se OI cresce + funding positivo (long crowded). + short_squeeze_risk: high se OI cresce + funding negativo (short crowded). + """ + asset = asset.upper() + oi = await fetch_oi_history(asset=asset, period="5m", limit=288) + funding = await fetch_cross_exchange_funding(assets=[asset]) + snap = (funding.get("snapshot") or {}).get(asset) or {} + rates = [v for k, v in snap.items() if k in ("binance", "bybit", "okx", "hyperliquid") and v is not None] + avg_funding = sum(rates) / len(rates) if rates else None + + delta_4h = oi.get("delta_pct_4h") + delta_24h = oi.get("delta_pct_24h") + + long_risk = "low" + short_risk = "low" + if avg_funding is not None and delta_24h is not None: + if avg_funding > 0.0001 and delta_24h > 5: + long_risk = "high" + elif avg_funding > 0.00005 and delta_24h > 2: + long_risk = "medium" + if avg_funding < -0.0001 and delta_24h > 5: + short_risk = "high" + elif avg_funding < -0.00005 and delta_24h > 2: + short_risk = "medium" + + return { + "asset": asset, + "avg_funding_rate": avg_funding, + "oi_delta_pct_4h": delta_4h, + "oi_delta_pct_24h": delta_24h, + "long_squeeze_risk": long_risk, + "short_squeeze_risk": short_risk, + "note": "heuristic — non sostituisce feed liq dedicati (Coinglass).", + } + + +async def fetch_cointegration_pairs( + pairs: list[list[str]] | None = None, + lookback_hours: int = 24, +) -> dict[str, Any]: + """Test Engle-Granger su coppie crypto su Binance hourly. + pairs: lista di [base, quote] (es. [["BTC", "ETH"]]). Default top-3. + """ + from cerbero_mcp.common.stats import cointegration_test + + pairs = pairs or [["BTC", "ETH"], ["BTC", "SOL"], ["ETH", "SOL"]] + out: list[dict[str, Any]] = [] + interval = "1h" + limit = max(50, lookback_hours) + + async with async_client(timeout=15.0) as client: + for pair in pairs: + if len(pair) != 2: + continue + a, b = pair[0].upper(), pair[1].upper() + sym_a = f"{a}USDT" + sym_b = f"{b}USDT" + try: + resp_a = await client.get( + "https://api.binance.com/api/v3/klines", + params={"symbol": sym_a, "interval": interval, "limit": limit}, + ) + resp_b = await client.get( + "https://api.binance.com/api/v3/klines", + params={"symbol": sym_b, "interval": interval, "limit": limit}, + ) + if resp_a.status_code != 200 or resp_b.status_code != 200: + continue + closes_a = [float(k[4]) for k in resp_a.json()] + closes_b = [float(k[4]) for k in resp_b.json()] + if len(closes_a) != len(closes_b): + n = min(len(closes_a), len(closes_b)) + closes_a = closes_a[-n:] + closes_b = closes_b[-n:] + result = cointegration_test(closes_a, closes_b) + out.append({ + "pair": [a, b], + "samples": len(closes_a), + **result, + }) + except Exception as e: + out.append({"pair": [a, b], "error": str(e)}) + + out.sort(key=lambda r: r.get("adf_t_stat") or 0) + return { + "results": out, + "lookback_hours": lookback_hours, + } + + +async def fetch_world_news() -> dict[str, Any]: + """Fetch world financial news from free RSS feeds.""" + articles: list[dict[str, Any]] = [] + + async with async_client(timeout=10.0, follow_redirects=True) as client: + for source_name, url in WORLD_NEWS_FEEDS: + try: + resp = await client.get(url) + if resp.status_code != 200: + continue + root = ET.fromstring(resp.text) + for item in root.findall(".//item")[:5]: + title = item.findtext("title", "") + link = item.findtext("link", "") + pub_date = item.findtext("pubDate", "") + description = item.findtext("description", "") + if "<" in description: + description = re.sub(r"<[^>]+>", "", description).strip() + articles.append( + { + "source": source_name, + "title": title, + "url": link, + "published": pub_date, + "summary": description[:200] if description else "", + } + ) + except Exception: + continue + + return {"articles": articles, "count": len(articles)} + + +async def fetch_oi_history(asset: str = "BTC", period: str = "5m", limit: int = 288) -> dict[str, Any]: + """Open interest history perpetual da Binance futures (public). + + period: 5m|15m|30m|1h|2h|4h|6h|12h|1d (Binance API). + limit: 1..500, default 288 = 24h a 5min. + """ + asset = asset.upper() + symbol = f"{asset}USDT" + limit = max(1, min(int(limit), 500)) + points: list[dict[str, Any]] = [] + try: + async with async_client(timeout=10.0) as client: + resp = await client.get( + BINANCE_OI_HIST_URL, + params={"symbol": symbol, "period": period, "limit": limit}, + ) + if resp.status_code == 200: + for row in resp.json() or []: + points.append( + { + "timestamp": int(row.get("timestamp", 0)), + "oi": float(row.get("sumOpenInterest", 0)), + "oi_value_usd": float(row.get("sumOpenInterestValue", 0)), + } + ) + except Exception: + pass + + def _delta_pct(points: list[dict[str, Any]], minutes_back: int) -> float | None: + if len(points) < 2: + return None + current = points[-1] + cutoff_ts = current["timestamp"] - minutes_back * 60 * 1000 + past = next((p for p in reversed(points) if p["timestamp"] <= cutoff_ts), None) + if past is None or past["oi"] == 0: + return None + return round(100.0 * (current["oi"] - past["oi"]) / past["oi"], 3) + + return { + "asset": asset, + "exchange": "binance", + "symbol": symbol, + "period": period, + "points": points, + "current_oi": points[-1]["oi"] if points else None, + "current_oi_value_usd": points[-1]["oi_value_usd"] if points else None, + "delta_pct_1h": _delta_pct(points, 60), + "delta_pct_4h": _delta_pct(points, 240), + "delta_pct_24h": _delta_pct(points, 1440), + "data_points": len(points), + } diff --git a/src/cerbero_mcp/exchanges/sentiment/tools.py b/src/cerbero_mcp/exchanges/sentiment/tools.py new file mode 100644 index 0000000..d69a42b --- /dev/null +++ b/src/cerbero_mcp/exchanges/sentiment/tools.py @@ -0,0 +1,122 @@ +"""Tool sentiment V2: pydantic schemas + async functions. + +Ogni funzione prende (client: SentimentClient, params: ) e ritorna un dict. +Pure logica, no FastAPI dependency, no ACL. Sentiment è read-only data provider: +nessun write tool, nessun leverage_cap. +""" +from __future__ import annotations + +from typing import Any + +from pydantic import BaseModel + +from cerbero_mcp.exchanges.sentiment.client import SentimentClient +from cerbero_mcp.exchanges.sentiment.fetchers import ( + fetch_cointegration_pairs, + fetch_cross_exchange_funding, + fetch_crypto_news, + fetch_funding_arb_spread, + fetch_funding_rates, + fetch_liquidation_heatmap, + fetch_oi_history, + fetch_social_sentiment, + fetch_world_news, +) + +# === Schemas === + + +class GetCryptoNewsReq(BaseModel): + limit: int = 20 + + +class GetSocialSentimentReq(BaseModel): + symbol: str = "BTC" + + +class GetFundingRatesReq(BaseModel): + asset: str = "BTC" + + +class GetWorldNewsReq(BaseModel): + pass + + +class GetCrossExchangeFundingReq(BaseModel): + assets: list[str] | None = None + + +class GetFundingArbSpreadReq(BaseModel): + assets: list[str] | None = None + + +class GetLiquidationHeatmapReq(BaseModel): + asset: str = "BTC" + + +class GetCointegrationPairsReq(BaseModel): + pairs: list[list[str]] | None = None + lookback_hours: int = 24 + + +class GetOiHistoryReq(BaseModel): + asset: str = "BTC" + period: str = "5m" + limit: int = 288 + + +# === Tool functions === + + +async def get_crypto_news( + client: SentimentClient, params: GetCryptoNewsReq +) -> dict[str, Any]: + return await fetch_crypto_news(api_key=client.cryptopanic_key, limit=params.limit) + + +async def get_social_sentiment( + client: SentimentClient, params: GetSocialSentimentReq +) -> dict[str, Any]: + return await fetch_social_sentiment(params.symbol) + + +async def get_funding_rates( + client: SentimentClient, params: GetFundingRatesReq +) -> dict[str, Any]: + return await fetch_funding_rates(params.asset) + + +async def get_world_news( + client: SentimentClient, params: GetWorldNewsReq +) -> dict[str, Any]: + return await fetch_world_news() + + +async def get_cross_exchange_funding( + client: SentimentClient, params: GetCrossExchangeFundingReq +) -> dict[str, Any]: + return await fetch_cross_exchange_funding(params.assets) + + +async def get_funding_arb_spread( + client: SentimentClient, params: GetFundingArbSpreadReq +) -> dict[str, Any]: + return await fetch_funding_arb_spread(params.assets) + + +async def get_liquidation_heatmap( + client: SentimentClient, params: GetLiquidationHeatmapReq +) -> dict[str, Any]: + return await fetch_liquidation_heatmap(params.asset) + + +async def get_cointegration_pairs( + client: SentimentClient, params: GetCointegrationPairsReq +) -> dict[str, Any]: + return await fetch_cointegration_pairs(params.pairs, params.lookback_hours) + + +async def get_oi_history( + client: SentimentClient, params: GetOiHistoryReq +) -> dict[str, Any]: + return await fetch_oi_history(params.asset, params.period, params.limit) diff --git a/src/cerbero_mcp/routers/sentiment.py b/src/cerbero_mcp/routers/sentiment.py new file mode 100644 index 0000000..5a02e49 --- /dev/null +++ b/src/cerbero_mcp/routers/sentiment.py @@ -0,0 +1,98 @@ +"""Router /mcp-sentiment/* — read-only data provider. + +Sentiment non distingue testnet/mainnet (CryptoPanic, LunarCrush e gli endpoint +pubblici di funding/OI multi-exchange sono unici), ma manteniamo la signature +`Environment` per uniformità con gli altri router. Tutti i tool sono READ — +niente write, niente leverage_cap. +""" +from __future__ import annotations + +from typing import Literal + +from fastapi import APIRouter, Depends, Request + +from cerbero_mcp.client_registry import ClientRegistry +from cerbero_mcp.exchanges.sentiment import tools as t +from cerbero_mcp.exchanges.sentiment.client import SentimentClient + +Environment = Literal["testnet", "mainnet"] + + +def get_environment(request: Request) -> Environment: + return request.state.environment + + +async def get_sentiment_client( + request: Request, env: Environment = Depends(get_environment) +) -> SentimentClient: + registry: ClientRegistry = request.app.state.registry + return await registry.get("sentiment", env) + + +def make_router() -> APIRouter: + r = APIRouter(prefix="/mcp-sentiment", tags=["sentiment"]) + + @r.post("/tools/get_crypto_news") + async def _get_crypto_news( + params: t.GetCryptoNewsReq, + client: SentimentClient = Depends(get_sentiment_client), + ): + return await t.get_crypto_news(client, params) + + @r.post("/tools/get_world_news") + async def _get_world_news( + params: t.GetWorldNewsReq, + client: SentimentClient = Depends(get_sentiment_client), + ): + return await t.get_world_news(client, params) + + @r.post("/tools/get_social_sentiment") + async def _get_social_sentiment( + params: t.GetSocialSentimentReq, + client: SentimentClient = Depends(get_sentiment_client), + ): + return await t.get_social_sentiment(client, params) + + @r.post("/tools/get_funding_rates") + async def _get_funding_rates( + params: t.GetFundingRatesReq, + client: SentimentClient = Depends(get_sentiment_client), + ): + return await t.get_funding_rates(client, params) + + @r.post("/tools/get_funding_arb_spread") + async def _get_funding_arb_spread( + params: t.GetFundingArbSpreadReq, + client: SentimentClient = Depends(get_sentiment_client), + ): + return await t.get_funding_arb_spread(client, params) + + @r.post("/tools/get_cross_exchange_funding") + async def _get_cross_exchange_funding( + params: t.GetCrossExchangeFundingReq, + client: SentimentClient = Depends(get_sentiment_client), + ): + return await t.get_cross_exchange_funding(client, params) + + @r.post("/tools/get_oi_history") + async def _get_oi_history( + params: t.GetOiHistoryReq, + client: SentimentClient = Depends(get_sentiment_client), + ): + return await t.get_oi_history(client, params) + + @r.post("/tools/get_liquidation_heatmap") + async def _get_liquidation_heatmap( + params: t.GetLiquidationHeatmapReq, + client: SentimentClient = Depends(get_sentiment_client), + ): + return await t.get_liquidation_heatmap(client, params) + + @r.post("/tools/get_cointegration_pairs") + async def _get_cointegration_pairs( + params: t.GetCointegrationPairsReq, + client: SentimentClient = Depends(get_sentiment_client), + ): + return await t.get_cointegration_pairs(client, params) + + return r diff --git a/tests/unit/exchanges/sentiment/__init__.py b/tests/unit/exchanges/sentiment/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/exchanges/sentiment/test_fetchers.py b/tests/unit/exchanges/sentiment/test_fetchers.py new file mode 100644 index 0000000..51c225c --- /dev/null +++ b/tests/unit/exchanges/sentiment/test_fetchers.py @@ -0,0 +1,320 @@ +from __future__ import annotations + +import httpx +import pytest +import pytest_httpx +from cerbero_mcp.exchanges.sentiment.fetchers import ( + fetch_crypto_news, + fetch_funding_rates, + fetch_social_sentiment, + fetch_world_news, +) + +# --- CER-017 multi-source news aggregator --- + +_COINDESK_RSS = ( + '' + "ETH rallyhttps://coindesk.com/eth" + "2026-04-19" + "Common headlinehttps://coindesk.com/x" + "2026-04-18" + "" +) + + +def _mock_three_providers(httpx_mock: pytest_httpx.HTTPXMock, *, cc_items=None, messari_items=None): + httpx_mock.add_response(url="https://www.coindesk.com/arc/outboundfeeds/rss/", text=_COINDESK_RSS) + httpx_mock.add_response( + url="https://min-api.cryptocompare.com/data/v2/news/?lang=EN", + json={"Data": cc_items if cc_items is not None else [ + {"title": "BTC ATH", "source": "CryptoCompare", "published_on": 1761868800, "url": "https://x/1"}, + {"title": "Common headline", "source": "Reuters", "published_on": 1761782400, "url": "https://x/2"}, + ]}, + ) + httpx_mock.add_response( + url="https://data.messari.io/api/v1/news", + json={"data": messari_items if messari_items is not None else [ + {"title": "SOL rally", "author": {"name": "Messari"}, "published_at": "2026-04-19T10:00:00Z", "url": "https://x/3"}, + ]}, + ) + + +@pytest.mark.asyncio +async def test_crypto_news_aggregates_three_sources(httpx_mock: pytest_httpx.HTTPXMock): + """CER-017: CoinDesk + CryptoCompare + Messari in parallelo.""" + _mock_three_providers(httpx_mock) + result = await fetch_crypto_news(limit=20) + titles = {h["title"] for h in result["headlines"]} + assert "ETH rally" in titles + assert "BTC ATH" in titles + assert "SOL rally" in titles + assert set(result["sources"]) == {"coindesk", "cryptocompare", "messari"} + assert result["sources_failed"] == [] + + +@pytest.mark.asyncio +async def test_crypto_news_dedup_by_title(httpx_mock: pytest_httpx.HTTPXMock): + """CER-017: stesso titolo su 2 provider → 1 sola entry.""" + _mock_three_providers(httpx_mock) + result = await fetch_crypto_news(limit=20) + common_count = sum(1 for h in result["headlines"] if h["title"].lower() == "common headline") + assert common_count == 1 + assert result["total_before_dedup"] > result["total_after_dedup"] + + +@pytest.mark.asyncio +async def test_crypto_news_partial_failure(httpx_mock: pytest_httpx.HTTPXMock): + """CER-017: 1 provider 500 → altri proseguono, sources_failed riporta.""" + httpx_mock.add_response(url="https://www.coindesk.com/arc/outboundfeeds/rss/", text=_COINDESK_RSS) + httpx_mock.add_response( + url="https://min-api.cryptocompare.com/data/v2/news/?lang=EN", + status_code=500, + ) + httpx_mock.add_response( + url="https://data.messari.io/api/v1/news", + json={"data": [{"title": "OK Messari", "author": {"name": "M"}, "published_at": "2026-04-19T10:00:00Z", "url": "https://x"}]}, + ) + result = await fetch_crypto_news(limit=20) + assert "cryptocompare" in result["sources_failed"] + assert "coindesk" in result["sources"] + assert "messari" in result["sources"] + + +@pytest.mark.asyncio +async def test_crypto_news_sorted_desc_by_date(httpx_mock: pytest_httpx.HTTPXMock): + """CER-017: ordine published_at DESC.""" + _mock_three_providers(httpx_mock) + result = await fetch_crypto_news(limit=20) + dates = [h.get("published_at") or "" for h in result["headlines"] if h.get("published_at")] + assert dates == sorted(dates, reverse=True) + + +@pytest.mark.asyncio +async def test_crypto_news_with_cryptopanic_key(httpx_mock: pytest_httpx.HTTPXMock): + """CER-017: se api_key presente, include Cryptopanic come 4° source.""" + _mock_three_providers(httpx_mock) + httpx_mock.add_response( + url=httpx.URL("https://cryptopanic.com/api/v1/posts/", params={"auth_token": "k", "public": "true"}), + json={"results": [{ + "title": "Cryptopanic exclusive", + "source": {"title": "CP"}, + "published_at": "2026-04-20T00:00:00Z", + "url": "https://x/cp", + }]}, + ) + result = await fetch_crypto_news(api_key="k", limit=20) + titles = {h["title"] for h in result["headlines"]} + assert "Cryptopanic exclusive" in titles + assert "cryptopanic" in result["sources"] + + +@pytest.mark.asyncio +async def test_crypto_news_placeholder_key_skips_cryptopanic(httpx_mock: pytest_httpx.HTTPXMock): + """CER-017: api_key placeholder → no Cryptopanic call.""" + _mock_three_providers(httpx_mock) + result = await fetch_crypto_news(api_key="placeholder", limit=20) + assert "cryptopanic" not in result["sources"] + assert "cryptopanic" not in result["sources_failed"] + + +@pytest.mark.asyncio +async def test_crypto_news_provider_tracing(httpx_mock: pytest_httpx.HTTPXMock): + """CER-017: ogni headline ha campo provider.""" + _mock_three_providers(httpx_mock) + result = await fetch_crypto_news(limit=20) + for h in result["headlines"]: + assert h.get("provider") in {"coindesk", "cryptocompare", "messari"} + + +# --- fetch_social_sentiment --- + +@pytest.mark.asyncio +async def test_social_sentiment_happy(httpx_mock: pytest_httpx.HTTPXMock): + httpx_mock.add_response( + url=httpx.URL( + "https://api.alternative.me/fng/", + params={"limit": "1"}, + ), + json={"data": [{"value": "72", "value_classification": "Greed"}]}, + ) + result = await fetch_social_sentiment() + assert result["fear_greed_index"] == 72 + assert result["fear_greed_label"] == "Greed" + assert "social_volume" in result + + +@pytest.mark.asyncio +async def test_social_sentiment_empty_data(httpx_mock: pytest_httpx.HTTPXMock): + httpx_mock.add_response( + url=httpx.URL( + "https://api.alternative.me/fng/", + params={"limit": "1"}, + ), + json={"data": []}, + ) + result = await fetch_social_sentiment() + assert result["fear_greed_index"] == 0 + assert result["fear_greed_label"] == "" + + +@pytest.mark.asyncio +async def test_social_sentiment_derives_proxy_from_fng( + httpx_mock: pytest_httpx.HTTPXMock, monkeypatch +): + """CER-P2-005: senza LUNARCRUSH_API_KEY, twitter/reddit derivano da F&G.""" + monkeypatch.delenv("LUNARCRUSH_API_KEY", raising=False) + httpx_mock.add_response( + url=httpx.URL("https://api.alternative.me/fng/", params={"limit": "1"}), + json={"data": [{"value": "25", "value_classification": "Extreme Fear"}]}, + ) + result = await fetch_social_sentiment() + assert result["twitter_sentiment"] == pytest.approx(-0.5) + assert result["reddit_sentiment"] == pytest.approx(-0.5) + assert result["derived"] is True + assert result["source"] == "fear_greed_only" + + +@pytest.mark.asyncio +async def test_social_sentiment_uses_lunarcrush_when_key_present( + httpx_mock: pytest_httpx.HTTPXMock, monkeypatch +): + """CER-P2-005: con LUNARCRUSH_API_KEY, valori reali.""" + monkeypatch.setenv("LUNARCRUSH_API_KEY", "test-key") + httpx_mock.add_response( + url=httpx.URL("https://api.alternative.me/fng/", params={"limit": "1"}), + json={"data": [{"value": "50", "value_classification": "Neutral"}]}, + ) + httpx_mock.add_response( + url="https://lunarcrush.com/api4/public/coins/BTC/v1", + json={"data": { + "sentiment": 80, + "galaxy_score": 75, + "alt_rank": 3, + "social_volume_24h": 12345, + "social_dominance": 25.5, + }}, + ) + result = await fetch_social_sentiment("BTC") + assert result["twitter_sentiment"] == pytest.approx(0.6) + assert result["reddit_sentiment"] == pytest.approx(0.6) + assert result["social_volume"] == 12345 + assert result["galaxy_score"] == 75 + assert result["derived"] is False + assert "lunarcrush" in result["source"] + + +@pytest.mark.asyncio +async def test_social_sentiment_lunarcrush_failure_fallback_to_proxy( + httpx_mock: pytest_httpx.HTTPXMock, monkeypatch +): + """CER-P2-005: se LC fallisce, fallback a proxy F&G — no crash.""" + monkeypatch.setenv("LUNARCRUSH_API_KEY", "broken-key") + httpx_mock.add_response( + url=httpx.URL("https://api.alternative.me/fng/", params={"limit": "1"}), + json={"data": [{"value": "75", "value_classification": "Greed"}]}, + ) + httpx_mock.add_response( + url="https://lunarcrush.com/api4/public/coins/BTC/v1", + status_code=401, + json={"error": "unauthorized"}, + ) + result = await fetch_social_sentiment("BTC") + assert result["twitter_sentiment"] == pytest.approx(0.5) + assert result["derived"] is True + assert result["source"] == "fear_greed_only" + + +# --- fetch_funding_rates --- + +@pytest.mark.asyncio +async def test_funding_rates_all_exchanges(httpx_mock: pytest_httpx.HTTPXMock): + httpx_mock.add_response( + url=httpx.URL( + "https://fapi.binance.com/fapi/v1/premiumIndex", + params={"symbol": "BTCUSDT"}, + ), + json={"lastFundingRate": "0.0001", "nextFundingTime": 1700000000000}, + ) + httpx_mock.add_response( + url=httpx.URL( + "https://api.bybit.com/v5/market/tickers", + params={"category": "linear", "symbol": "BTCUSDT"}, + ), + json={"result": {"list": [{"fundingRate": "0.0002", "nextFundingTime": "1700000000000"}]}}, + ) + httpx_mock.add_response( + url=httpx.URL( + "https://www.okx.com/api/v5/public/funding-rate", + params={"instId": "BTC-USDT-SWAP"}, + ), + json={"data": [{"fundingRate": "0.00015", "nextFundingTime": "1700000000000"}]}, + ) + result = await fetch_funding_rates() + assert "rates" in result + exchanges = {r["exchange"] for r in result["rates"]} + assert "binance" in exchanges + assert "bybit" in exchanges + assert "okx" in exchanges + + +@pytest.mark.asyncio +async def test_funding_rates_partial_failure(httpx_mock: pytest_httpx.HTTPXMock): + """If some exchanges fail, we still get results from others.""" + httpx_mock.add_response( + url=httpx.URL( + "https://fapi.binance.com/fapi/v1/premiumIndex", + params={"symbol": "BTCUSDT"}, + ), + json={"lastFundingRate": "0.0001", "nextFundingTime": 1700000000000}, + ) + httpx_mock.add_response( + url=httpx.URL( + "https://api.bybit.com/v5/market/tickers", + params={"category": "linear", "symbol": "BTCUSDT"}, + ), + status_code=500, + ) + httpx_mock.add_response( + url=httpx.URL( + "https://www.okx.com/api/v5/public/funding-rate", + params={"instId": "BTC-USDT-SWAP"}, + ), + status_code=500, + ) + result = await fetch_funding_rates() + assert len(result["rates"]) == 1 + assert result["rates"][0]["exchange"] == "binance" + + +# --- fetch_world_news --- + +@pytest.mark.asyncio +async def test_world_news_happy(httpx_mock: pytest_httpx.HTTPXMock): + rss_xml = """ + +Markets rallyhttp://example.com/1Mon, 15 Jan 2024 10:00:00 +0000Stocks up +""" + for _, url in [ + ("Reuters Business", "https://feeds.reuters.com/reuters/businessNews"), + ("CNBC Top News", "https://search.cnbc.com/rs/search/combinedcms/view.xml?partnerId=wrss01&id=100003114"), + ("Bloomberg Markets", "https://feeds.bloomberg.com/markets/news.rss"), + ("CoinDesk", "https://www.coindesk.com/arc/outboundfeeds/rss/"), + ]: + httpx_mock.add_response(url=url, text=rss_xml) + result = await fetch_world_news() + assert result["count"] == 4 + assert result["articles"][0]["title"] == "Markets rally" + + +@pytest.mark.asyncio +async def test_world_news_all_fail(httpx_mock: pytest_httpx.HTTPXMock): + for _, url in [ + ("Reuters Business", "https://feeds.reuters.com/reuters/businessNews"), + ("CNBC Top News", "https://search.cnbc.com/rs/search/combinedcms/view.xml?partnerId=wrss01&id=100003114"), + ("Bloomberg Markets", "https://feeds.bloomberg.com/markets/news.rss"), + ("CoinDesk", "https://www.coindesk.com/arc/outboundfeeds/rss/"), + ]: + httpx_mock.add_response(url=url, status_code=503) + result = await fetch_world_news() + assert result["articles"] == [] + assert result["count"] == 0 diff --git a/tests/unit/test_exchanges_builder.py b/tests/unit/test_exchanges_builder.py index 127e945..db568cc 100644 --- a/tests/unit/test_exchanges_builder.py +++ b/tests/unit/test_exchanges_builder.py @@ -125,6 +125,29 @@ async def test_build_client_macro_no_env_distinction(monkeypatch): assert c_test.finnhub_api_key == c_live.finnhub_api_key +@pytest.mark.asyncio +async def test_build_client_sentiment_no_env_distinction(monkeypatch): + from tests.unit.test_settings import _minimal_env + + for k, v in _minimal_env().items(): + monkeypatch.setenv(k, v) + + from cerbero_mcp.settings import Settings + from cerbero_mcp.exchanges import build_client + from cerbero_mcp.exchanges.sentiment.client import SentimentClient + + s = Settings() + c_test = await build_client(s, "sentiment", "testnet") + c_live = await build_client(s, "sentiment", "mainnet") + + # entrambi sono SentimentClient validi (env ignorato) + assert isinstance(c_test, SentimentClient) + assert isinstance(c_live, SentimentClient) + # Stesse credenziali (env ignorato) + assert c_test.cryptopanic_key == c_live.cryptopanic_key + assert c_test.lunarcrush_key == c_live.lunarcrush_key + + @pytest.mark.asyncio async def test_build_client_unknown_exchange_raises(monkeypatch): from tests.unit.test_settings import _minimal_env