from __future__ import annotations

import logging
import time
from datetime import UTC, datetime, timedelta
from typing import Any

import httpx
from mcp_common.http import async_client

from mcp_macro.cot import parse_disagg_row, parse_tff_row
from mcp_macro.cot_contracts import (
    ALL_DISAGG_SYMBOLS,
    ALL_TFF_SYMBOLS,
    CFTC_BASE_URL,
    DISAGG_DATASET_ID,
    SYMBOL_TO_CFTC_CODE_DISAGG,
    SYMBOL_TO_CFTC_CODE_TFF,
    TFF_DATASET_ID,
)

_logger = logging.getLogger(__name__)

FRED_BASE = "https://api.stlouisfed.org/fred/series/observations"
FINNHUB_CALENDAR = "https://finnhub.io/api/v1/calendar/economic"
FOREX_FACTORY_CALENDAR = "https://nfs.faireconomy.media/ff_calendar_thisweek.json"
COINGECKO_GLOBAL = "https://api.coingecko.com/api/v3/global"
COINGECKO_SIMPLE = "https://api.coingecko.com/api/v3/simple/price"
DERIBIT_DVOL = "https://www.deribit.com/api/v2/public/get_volatility_index_data"
YAHOO_CHART = "https://query1.finance.yahoo.com/v8/finance/chart/{symbol}"

# Public ticker -> (Yahoo chart symbol, human-readable name).
ASSET_TICKER_MAP: dict[str, tuple[str, str]] = {
    "WTI": ("CL=F", "WTI Crude Oil"),
    "BRENT": ("BZ=F", "Brent Crude Oil"),
    "GOLD": ("GC=F", "Gold Futures"),
    "SILVER": ("SI=F", "Silver Futures"),
    "COPPER": ("HG=F", "Copper Futures"),
    "NATGAS": ("NG=F", "Natural Gas"),
    "DXY": ("DX-Y.NYB", "US Dollar Index"),
    "SPX": ("^GSPC", "S&P 500"),
    "NDX": ("^NDX", "Nasdaq 100"),
    "DJI": ("^DJI", "Dow Jones"),
    "RUT": ("^RUT", "Russell 2000"),
    "VIX": ("^VIX", "CBOE Volatility Index"),
    "US5Y": ("^FVX", "US 5-Year Treasury"),
    "US10Y": ("^TNX", "US 10-Year Treasury"),
    "US30Y": ("^TYX", "US 30-Year Treasury"),
    "US2Y": ("^UST2YR", "US 2-Year Treasury"),
    "EURUSD": ("EURUSD=X", "EUR/USD"),
    "USDJPY": ("JPY=X", "USD/JPY"),
    "GBPUSD": ("GBPUSD=X", "GBP/USD"),
    "BTCUSD": ("BTC-USD", "Bitcoin/USD"),
    "ETHUSD": ("ETH-USD", "Ethereum/USD"),
    "ES": ("ES=F", "E-mini S&P 500 Futures"),
    "NQ": ("NQ=F", "E-mini Nasdaq 100 Futures"),
    "YM": ("YM=F", "E-mini Dow Futures"),
    "RTY": ("RTY=F", "E-mini Russell 2000 Futures"),
}

# Per-ticker cache: key -> {"data": <payload>, "ts": <time.monotonic()>}.
_ASSET_CACHE: dict[str, dict] = {}
_ASSET_CACHE_TTL = 60.0


def _pct_change(current: Any, base: Any) -> float | None:
    """Return the percent change from *base* to *current*, rounded to 3 places.

    Returns None when *current* is None, *base* is missing or zero, or either
    value cannot be converted to a float.
    """
    if current is None or not base:
        return None
    try:
        return round((float(current) - float(base)) / float(base) * 100, 3)
    except (TypeError, ValueError, ArithmeticError):
        return None


def _as_utc(dt: datetime) -> datetime:
    """Return *dt* as an aware UTC datetime; naive values are taken as UTC."""
    if dt.tzinfo is None:
        return dt.replace(tzinfo=UTC)
    return dt.astimezone(UTC)


async def _fetch_yahoo_chart(
    client: httpx.AsyncClient, symbol: str, range_: str
) -> dict | None:
    """Fetch the first ``chart.result`` entry for *symbol* from Yahoo.

    Returns None on a non-200 response or an empty result list. Network and
    JSON errors propagate; the callers below wrap this in their own
    best-effort ``try``.
    """
    resp = await client.get(
        YAHOO_CHART.format(symbol=symbol),
        params={"interval": "1d", "range": range_},
        headers={"User-Agent": "Mozilla/5.0"},
    )
    if resp.status_code != 200:
        return None
    result = (resp.json().get("chart") or {}).get("result") or []
    return result[0] if result else None


async def _fetch_yahoo_meta(
    client: httpx.AsyncClient, symbol: str, range_: str = "10d"
) -> dict:
    """Return ``{"meta": ..., "closes": [...]}`` for *symbol*, or ``{}``.

    ``closes`` is the daily close series with null entries removed. Any
    failure (HTTP error, unexpected payload shape) yields an empty dict;
    this helper is deliberately best-effort.
    """
    try:
        r0 = await _fetch_yahoo_chart(client, symbol, range_)
        if r0 is None:
            return {}
        meta = r0.get("meta") or {}
        quotes = (r0.get("indicators") or {}).get("quote") or [{}]
        closes = [c for c in (quotes[0].get("close") or []) if c is not None]
        return {"meta": meta, "closes": closes}
    except Exception:
        _logger.debug("Yahoo chart fetch failed for %s", symbol, exc_info=True)
        return {}


async def fetch_asset_price(ticker: str) -> dict[str, Any]:
    """Return the latest price and 24h/7d percent changes for *ticker*.

    *ticker* is matched case-insensitively against ``ASSET_TICKER_MAP``.
    Unknown tickers return ``{"ticker": ..., "error": ...}``. Results are
    cached per ticker for ``_ASSET_CACHE_TTL`` seconds; the cached dict
    itself is returned, so callers should not mutate it.
    """
    key = ticker.upper()
    now = time.monotonic()
    cached = _ASSET_CACHE.get(key)
    if cached and (now - cached["ts"]) < _ASSET_CACHE_TTL:
        return cached["data"]

    mapping = ASSET_TICKER_MAP.get(key)
    if not mapping:
        return {"ticker": ticker, "error": f"unknown ticker {ticker}"}
    symbol, name = mapping

    async with async_client(timeout=10.0) as client:
        info = await _fetch_yahoo_meta(client, symbol, "10d")

    meta = info.get("meta") or {}
    closes = info.get("closes") or []
    price = meta.get("regularMarketPrice")

    change_24h_pct = _pct_change(price, meta.get("previousClose"))
    # The "7d" change compares against the 6th-most-recent daily close.
    change_7d_pct = _pct_change(price, closes[-6]) if len(closes) >= 6 else None

    out = {
        "ticker": key,
        "name": name,
        "price": float(price) if price is not None else None,
        "change_24h_pct": change_24h_pct,
        "change_7d_pct": change_7d_pct,
        "source": f"yfinance:{symbol}",
        "data_timestamp": datetime.now(UTC).isoformat(),
    }
    _ASSET_CACHE[key] = {"data": out, "ts": now}
    return out


_TREASURY_CACHE: dict[str, Any] = {"data": None, "ts": 0.0}
_TREASURY_TTL = 300.0


async def fetch_treasury_yields() -> dict[str, Any]:
    """Return US 2y/5y/10y/30y yields, the 2s10s spread and a curve label.

    ``yield_curve_shape`` is ``"normal"`` (spread > 0.25), ``"inverted"``
    (spread < -0.1), ``"flat"`` otherwise, or ``"unknown"`` when either leg
    is missing. The result is cached for ``_TREASURY_TTL`` seconds.
    """
    now = time.monotonic()
    if _TREASURY_CACHE["data"] and (now - _TREASURY_CACHE["ts"]) < _TREASURY_TTL:
        return _TREASURY_CACHE["data"]

    symbols = [
        ("us2y", "^UST2YR"),
        ("us5y", "^FVX"),
        ("us10y", "^TNX"),
        ("us30y", "^TYX"),
    ]
    yields: dict[str, float | None] = {}
    async with async_client(timeout=10.0) as client:
        for key, sym in symbols:
            info = await _fetch_yahoo_meta(client, sym, "5d")
            price = (info.get("meta") or {}).get("regularMarketPrice")
            yields[key] = float(price) if price is not None else None

    us10y, us2y = yields.get("us10y"), yields.get("us2y")
    spread = round(us10y - us2y, 3) if us10y is not None and us2y is not None else None

    if spread is None:
        shape = "unknown"
    elif spread > 0.25:
        shape = "normal"
    elif spread < -0.1:
        shape = "inverted"
    else:
        shape = "flat"

    out = {
        "yields": yields,
        "spread_2y10y": spread,
        "yield_curve_shape": shape,
        "data_timestamp": datetime.now(UTC).isoformat(),
    }
    _TREASURY_CACHE["data"] = out
    _TREASURY_CACHE["ts"] = now
    return out


def _round3(value: float | None) -> float | None:
    """Round *value* to 3 decimals, passing None through."""
    return round(value, 3) if value is not None else None


def yield_curve_metrics(yields: dict[str, float | None]) -> dict[str, Any]:
    """Compute slope and convexity of a yield curve (us2y, us5y, us10y, us30y).

    Convexity (butterfly) is ``2*us10y - us2y - us30y``; a value > 0 means a
    concave curve. ``regime`` is derived from the 2s10s slope: ``"steep"``
    (>= 0.5), ``"normal"`` (> 0.1), ``"flat"`` (> -0.1), else ``"inverted"``;
    ``"unknown"`` when the slope cannot be computed. Any metric whose inputs
    are missing is None.
    """
    y2 = yields.get("us2y")
    y5 = yields.get("us5y")
    y10 = yields.get("us10y")
    y30 = yields.get("us30y")

    slope_2y10y = (y10 - y2) if (y2 is not None and y10 is not None) else None
    slope_5y30y = (y30 - y5) if (y5 is not None and y30 is not None) else None
    butterfly_2_10_30 = (
        (2 * y10 - y2 - y30)
        if (y2 is not None and y10 is not None and y30 is not None)
        else None
    )

    if slope_2y10y is None:
        regime = "unknown"
    elif slope_2y10y >= 0.5:
        regime = "steep"
    elif slope_2y10y > 0.1:
        regime = "normal"
    elif slope_2y10y > -0.1:
        regime = "flat"
    else:
        regime = "inverted"

    return {
        "slope_2y10y": _round3(slope_2y10y),
        "slope_5y30y": _round3(slope_5y30y),
        "butterfly_2_10_30": _round3(butterfly_2_10_30),
        "regime": regime,
    }


async def fetch_yield_curve_slope() -> dict[str, Any]:
    """Return curve slope/convexity metrics for the current treasury yields."""
    base = await fetch_treasury_yields()
    metrics = yield_curve_metrics(base.get("yields") or {})
    return {
        "yields": base.get("yields"),
        **metrics,
        "data_timestamp": datetime.now(UTC).isoformat(),
    }


async def _fetch_fred_latest(
    client: httpx.AsyncClient, series_id: str, api_key: str
) -> float | None:
    """Return the most recent observation of a FRED series, or None.

    None is returned when the response has no observations, when the latest
    value is FRED's ``"."`` missing-data marker, or when the value is not
    numeric. Network and JSON-decoding errors propagate to the caller.
    """
    resp = await client.get(
        FRED_BASE,
        params={
            "series_id": series_id,
            "api_key": api_key,
            "file_type": "json",
            "sort_order": "desc",
            "limit": 1,
        },
    )
    data = resp.json()
    obs = data.get("observations", []) if isinstance(data, dict) else []
    if not obs:
        return None
    try:
        raw = obs[0]["value"]
        return None if raw == "." else float(raw)
    except (ValueError, TypeError, IndexError, KeyError):
        return None


async def fetch_breakeven_inflation(fred_api_key: str = "") -> dict[str, Any]:
    """Return breakeven inflation rates from FRED.

    Series fetched:
    - T10YIE (10Y breakeven, market expectation of 10Y inflation)
    - T5YIE (5Y breakeven)
    - T5YIFR (5Y forward 5Y forward inflation expectation)

    ``interpretation`` is based on the 10Y breakeven: > 3.0 is
    ``"high_inflation_expected"``, < 1.5 is ``"low_inflation_expected"``,
    otherwise ``"anchored"``; ``"unknown"`` when it is unavailable. Without
    an API key an error payload with empty ``breakevens`` is returned.
    """
    if not fred_api_key:
        return {"error": "No FRED API key configured", "breakevens": {}}

    series_map = {
        "be_5y": "T5YIE",
        "be_10y": "T10YIE",
        "be_5y5y_forward": "T5YIFR",
    }
    out: dict[str, float | None] = {}
    async with async_client(timeout=10.0) as client:
        for name, series_id in series_map.items():
            out[name] = await _fetch_fred_latest(client, series_id, fred_api_key)

    be10 = out.get("be_10y")
    if be10 is None:
        interpretation = "unknown"
    elif be10 > 3.0:
        interpretation = "high_inflation_expected"
    elif be10 < 1.5:
        interpretation = "low_inflation_expected"
    else:
        interpretation = "anchored"

    return {
        "breakevens": out,
        "interpretation": interpretation,
        "data_timestamp": datetime.now(UTC).isoformat(),
    }


def _next_cash_open(now: datetime) -> datetime:
    """Return the next 13:30 UTC weekday open strictly relevant to *now*.

    Before 13:00 UTC on a weekday this is today's open; otherwise it is the
    next weekday's open, skipping Saturday and Sunday.
    NOTE(review): 13:30 UTC is used year-round as in the original code; US
    daylight-saving shifts are not modelled — confirm this is acceptable.
    """
    today_open = now.replace(hour=13, minute=30, second=0, microsecond=0)
    weekday = now.weekday()  # 0=Mon
    if weekday < 5 and now.hour < 13:
        return today_open
    if weekday == 4:
        days_ahead = 3  # Friday after the open -> Monday, not Saturday
    elif weekday == 5:
        days_ahead = 2  # Saturday -> Monday
    else:
        days_ahead = 1  # Mon-Thu after the open, or Sunday
    return today_open + timedelta(days=days_ahead)


async def fetch_equity_futures() -> dict[str, Any]:
    """Fetch ES/NQ/YM/RTY futures quotes with session detection.

    The session is derived from the current UTC time: ``"regular"`` on
    weekdays between 13:00 and 20:00 UTC, ``"weekend"`` on Saturday/Sunday,
    ``"pre-market"`` before 13:00 and ``"after-hours"`` otherwise.
    """
    tickers = [("es", "ES=F"), ("nq", "NQ=F"), ("ym", "YM=F"), ("rty", "RTY=F")]
    now = datetime.now(UTC)
    weekday = now.weekday()  # 0=Mon
    hour_utc = now.hour

    cash_open = (weekday < 5) and (13 <= hour_utc < 20)
    if cash_open:
        session = "regular"
    elif weekday >= 5:
        session = "weekend"
    elif hour_utc < 13:
        session = "pre-market"
    else:
        session = "after-hours"

    out: dict[str, Any] = {}
    async with async_client(timeout=10.0) as client:
        for key, sym in tickers:
            info = await _fetch_yahoo_meta(client, sym, "5d")
            meta = info.get("meta") or {}
            price = meta.get("regularMarketPrice")
            # NOTE(review): chartPreviousClose may refer to the close before
            # the requested chart range rather than the prior session —
            # confirm against the Yahoo payload.
            prev = meta.get("previousClose") or meta.get("chartPreviousClose")
            out[key] = {
                "price": float(price) if price is not None else None,
                "change_pct": _pct_change(price, prev),
                "session": session,
            }

    return {
        "futures": out,
        "session_status": {
            "cash_open": cash_open,
            "session": session,
            "next_open_utc": _next_cash_open(now).isoformat(),
        },
        "data_timestamp": datetime.now(UTC).isoformat(),
    }


_MARKET_CACHE: dict[str, Any] = {"data": None, "ts": 0.0}
_MARKET_CACHE_TTL = 120.0


async def _fetch_yahoo_price(client: httpx.AsyncClient, symbol: str) -> float | None:
    """Return ``regularMarketPrice`` for *symbol*, or None on any failure."""
    try:
        r0 = await _fetch_yahoo_chart(client, symbol, "5d")
        if r0 is None:
            return None
        price = (r0.get("meta") or {}).get("regularMarketPrice")
        return float(price) if price is not None else None
    except Exception:
        _logger.debug("Yahoo price fetch failed for %s", symbol, exc_info=True)
        return None


async def _fetch_dvol_latest(client: httpx.AsyncClient, currency: str) -> float | None:
    """Return the last value of Deribit's volatility index for *currency*.

    Requests daily rows for the past 7 days and returns element 4 of the
    final row (presumably the candle close — verify against the Deribit
    API), or None on any failure or empty result.
    """
    now_ms = int(datetime.now(UTC).timestamp() * 1000)
    start_ms = now_ms - 7 * 24 * 3600 * 1000
    try:
        resp = await client.get(
            DERIBIT_DVOL,
            params={
                "currency": currency,
                "start_timestamp": start_ms,
                "end_timestamp": now_ms,
                "resolution": "1D",
            },
        )
        rows = (resp.json().get("result") or {}).get("data") or []
        if not rows:
            return None
        return float(rows[-1][4])
    except Exception:
        _logger.debug("Deribit DVOL fetch failed for %s", currency, exc_info=True)
        return None


async def fetch_economic_indicators(
    fred_api_key: str = "",
    indicators: list[str] | None = None,
) -> dict[str, Any]:
    """Return the latest FRED value for each requested macro indicator.

    Supported names: ``fed_rate``, ``cpi``, ``unemployment``,
    ``us10y_yield``. When *indicators* is empty or None all are fetched.
    The result maps each name to a float (or None when FRED has no usable
    value) plus an ``updated_at`` timestamp. Without an API key an error
    payload with an empty ``indicators`` dict is returned instead.
    """
    series_map = {
        "fed_rate": "FEDFUNDS",
        "cpi": "CPIAUCSL",
        "unemployment": "UNRATE",
        "us10y_yield": "DGS10",
    }
    result: dict[str, Any] = {}
    if not fred_api_key:
        return {"indicators": result, "error": "No FRED API key configured"}

    async with async_client(timeout=10.0) as client:
        for name, series_id in series_map.items():
            if indicators and name not in indicators:
                continue
            result[name] = await _fetch_fred_latest(client, series_id, fred_api_key)

    result["updated_at"] = datetime.now(UTC).isoformat()
    return result


# Currency code -> (country code, country name).
CURRENCY_TO_COUNTRY = {
    "USD": ("US", "United States"),
    "EUR": ("EU", "Euro Area"),
    "JPY": ("JP", "Japan"),
    "GBP": ("UK", "United Kingdom"),
    "CAD": ("CA", "Canada"),
    "AUD": ("AU", "Australia"),
    "NZD": ("NZ", "New Zealand"),
    "CHF": ("CH", "Switzerland"),
    "CNY": ("CN", "China"),
}

# Lower-case substrings that mark an event name as high-impact.
_HIGH_IMPACT_EVENTS = (
    "fomc",
    "fed",
    "cpi",
    "nfp",
    "non-farm",
    "nonfarm",
    "ppi",
    "ecb",
    "boj",
    "boe",
    "gdp",
    "unemployment rate",
)


def _market_impact_historical(name: str) -> str:
    """Return ``"high_vol_spike"`` if *name* contains a high-impact keyword.

    Matching is a case-insensitive substring test against
    ``_HIGH_IMPACT_EVENTS``; anything else (including empty/None) is
    ``"normal"``.
    """
    lowered = (name or "").lower()
    if any(kw in lowered for kw in _HIGH_IMPACT_EVENTS):
        return "high_vol_spike"
    return "normal"


def _parse_utc_bound(value: str | None) -> datetime | None:
    """Parse an ISO-8601 or ``YYYY-MM-DD`` string into an aware UTC datetime.

    Offset-aware inputs are converted to UTC; naive inputs are taken to
    already be UTC. Returns None for empty input.

    Raises:
        ValueError: if *value* matches neither format.
    """
    if not value:
        return None
    try:
        parsed = datetime.fromisoformat(value)
    except ValueError:
        parsed = datetime.strptime(value, "%Y-%m-%d")
    return _as_utc(parsed)


async def fetch_macro_calendar(
    finnhub_api_key: str = "",
    days_ahead: int = 7,
    country_filter: list[str] | None = None,
    importance_min: str | None = None,
    start: str | None = None,
    end: str | None = None,
) -> dict[str, Any]:
    """Fetch the economic calendar with country/importance/date-range filters.

    The free Forex Factory weekly feed is tried first; Finnhub is used only
    when that yields no events and *finnhub_api_key* is set. Both sources
    are best-effort: a failure is logged and treated as "no events".

    Args:
        finnhub_api_key: Finnhub token for the fallback source.
        days_ahead: Finnhub window length when *end* is not given.
        country_filter: Country codes to keep (case-insensitive).
        importance_min: ``"low"``, ``"medium"`` or ``"high"``; unknown
            values behave like ``"low"``.
        start: Inclusive lower bound, ISO-8601 or ``YYYY-MM-DD``.
        end: Inclusive upper bound, same formats.

    Returns:
        ``{"events": [...]}``; with a ``note`` when no source produced
        events, or an ``error`` when Finnhub reported one.

    Raises:
        ValueError: if *start* or *end* cannot be parsed.
    """
    events: list[dict[str, Any]] = []
    importance_order = {"low": 0, "medium": 1, "high": 2}
    min_level = importance_order.get((importance_min or "").lower(), 0)

    start_dt = _parse_utc_bound(start)
    end_dt = _parse_utc_bound(end)

    country_filter_set = (
        {c.upper() for c in country_filter} if country_filter else None
    )

    # Try Forex Factory free feed first
    try:
        async with async_client(timeout=10.0) as client:
            resp = await client.get(FOREX_FACTORY_CALENDAR)
            if resp.status_code == 200:
                raw = resp.json()
                now = datetime.now(UTC)
                for e in raw:
                    date_str = e.get("date", "")
                    event_dt: datetime | None = None
                    if isinstance(date_str, str):
                        try:
                            # Normalise to aware UTC so the comparisons below
                            # can never mix naive and aware datetimes.
                            event_dt = _as_utc(
                                datetime.fromisoformat(date_str.replace("Z", "+00:00"))
                            )
                        except ValueError:
                            event_dt = None
                    # Events with an unparseable date are kept unfiltered.
                    if event_dt is not None and event_dt < now:
                        continue

                    # The feed's "country" field holds a currency code.
                    currency = (e.get("country", "") or "").upper()
                    country_code, country_name = CURRENCY_TO_COUNTRY.get(
                        currency, (currency or "", e.get("country", "") or "")
                    )
                    if country_filter_set and country_code not in country_filter_set:
                        continue

                    impact = (e.get("impact", "") or "").lower()
                    importance = impact if impact in ("high", "medium") else "low"
                    if importance_order[importance] < min_level:
                        continue
                    if start_dt and event_dt and event_dt < start_dt:
                        continue
                    if end_dt and event_dt and event_dt > end_dt:
                        continue

                    name = e.get("title", "")
                    events.append(
                        {
                            "date": date_str,
                            "datetime_utc": event_dt.isoformat() if event_dt else date_str,
                            "name": name,
                            "event": name,
                            "country": country_name,
                            "country_code": country_code,
                            "importance": importance,
                            "forecast": e.get("forecast", ""),
                            "previous": e.get("previous", ""),
                            "actual": e.get("actual"),
                            "market_impact_historical": _market_impact_historical(name),
                        }
                    )
    except Exception:
        # Best-effort source: keep whatever was collected and fall through.
        _logger.warning("Forex Factory calendar fetch failed", exc_info=True)

    # Fallback to Finnhub if we have a key and no events
    if not events and finnhub_api_key:
        try:
            now = datetime.now(UTC)
            end_default = now + timedelta(days=days_ahead)
            async with async_client(timeout=10.0) as client:
                resp = await client.get(
                    FINNHUB_CALENDAR,
                    params={
                        "from": (start_dt or now).strftime("%Y-%m-%d"),
                        "to": (end_dt or end_default).strftime("%Y-%m-%d"),
                        "token": finnhub_api_key,
                    },
                )
                data = resp.json()
                if isinstance(data, dict) and "error" in data:
                    return {"events": [], "error": data["error"]}
                raw = data if isinstance(data, list) else data.get("economicCalendar", [])
                for e in raw:
                    importance_raw = e.get("importance") or e.get("impact") or "medium"
                    if isinstance(importance_raw, int):
                        if importance_raw >= 3:
                            importance = "high"
                        elif importance_raw >= 2:
                            importance = "medium"
                        else:
                            importance = "low"
                    else:
                        importance = str(importance_raw).lower()
                        if importance not in ("low", "medium", "high"):
                            importance = "medium"
                    if importance_order[importance] < min_level:
                        continue

                    country_code = (e.get("country", "") or "").upper()
                    country_name = CURRENCY_TO_COUNTRY.get(
                        country_code, (country_code, country_code)
                    )[1]
                    if country_filter_set and country_code not in country_filter_set:
                        continue

                    name = e.get("event", "")
                    date_str = e.get("date", e.get("time", ""))
                    events.append(
                        {
                            "date": date_str,
                            "datetime_utc": date_str,
                            "name": name,
                            "event": name,
                            "country": country_name,
                            "country_code": country_code,
                            "importance": importance,
                            "forecast": e.get("forecast", ""),
                            "previous": e.get("previous", e.get("prev", "")),
                            "actual": e.get("actual"),
                            "market_impact_historical": _market_impact_historical(name),
                        }
                    )
        except Exception:
            _logger.warning("Finnhub calendar fetch failed", exc_info=True)

    if not events:
        return {"events": [], "note": "No calendar source available"}
    return {"events": events}


async def fetch_market_overview() -> dict[str, Any]:
    """Return a cross-asset snapshot (crypto, S&P 500, gold, VIX, DVOL).

    Every source is best-effort: a failing provider leaves its fields as
    None instead of raising. The result is cached for
    ``_MARKET_CACHE_TTL`` seconds.
    """
    now = time.monotonic()
    if _MARKET_CACHE["data"] is not None and (now - _MARKET_CACHE["ts"]) < _MARKET_CACHE_TTL:
        return _MARKET_CACHE["data"]

    async with async_client(timeout=10.0) as client:
        global_data: dict[str, Any] = {}
        prices: dict[str, Any] = {}
        try:
            global_resp = await client.get(COINGECKO_GLOBAL)
            global_data = global_resp.json().get("data", {}) or {}
        except Exception:
            global_data = {}
        try:
            price_resp = await client.get(
                COINGECKO_SIMPLE,
                params={"ids": "bitcoin,ethereum", "vs_currencies": "usd"},
            )
            prices = price_resp.json() or {}
        except Exception:
            prices = {}

        dvol_btc = await _fetch_dvol_latest(client, "BTC")
        dvol_eth = await _fetch_dvol_latest(client, "ETH")
        sp500 = await _fetch_yahoo_price(client, "^GSPC")
        gold = await _fetch_yahoo_price(client, "GC=F")
        vix = await _fetch_yahoo_price(client, "^VIX")

    # Error payloads may be non-dict or carry null sub-objects; degrade to
    # None fields rather than raising AttributeError.
    if not isinstance(global_data, dict):
        global_data = {}
    if not isinstance(prices, dict):
        prices = {}

    out = {
        "btc_dominance": (global_data.get("market_cap_percentage") or {}).get("btc"),
        "total_market_cap": (global_data.get("total_market_cap") or {}).get("usd"),
        "btc_price": (prices.get("bitcoin") or {}).get("usd"),
        "eth_price": (prices.get("ethereum") or {}).get("usd"),
        "sp500": sp500,
        "gold": gold,
        "vix": vix,
        "dvol_btc": dvol_btc,
        "dvol_eth": dvol_eth,
        "data_timestamp": datetime.now(UTC).isoformat(),
    }
    _MARKET_CACHE["data"] = out
    _MARKET_CACHE["ts"] = now
    return out


_COT_TTL = 3600.0  # 1h
# Both caches are keyed by (symbol, report type, lookback_weeks).
_COT_CACHE: dict[tuple[str, str, int], dict[str, Any]] = {}
_COT_CACHE_TS: dict[tuple[str, str, int], float] = {}


async def fetch_cot_tff(symbol: str, lookback_weeks: int = 52) -> dict[str, Any]:
    """Fetch the COT TFF report for an equity/financial symbol.

    Rows are returned in ascending date order. Unknown symbols return
    ``{"error": "unknown_symbol", "available": ...}``; a non-200 CFTC
    response returns an ``error`` payload with empty ``rows`` and is not
    cached. Successful results are cached for ``_COT_TTL`` seconds per
    (symbol, lookback) pair.
    """
    symbol = symbol.upper()
    if symbol not in SYMBOL_TO_CFTC_CODE_TFF:
        return {"error": "unknown_symbol", "available": ALL_TFF_SYMBOLS}

    key = (symbol, "tff", lookback_weeks)
    now = time.monotonic()
    if key in _COT_CACHE and (now - _COT_CACHE_TS[key]) < _COT_TTL:
        return _COT_CACHE[key]

    code = SYMBOL_TO_CFTC_CODE_TFF[symbol]
    url = f"{CFTC_BASE_URL}/{TFF_DATASET_ID}.json"
    async with async_client(timeout=10.0) as client:
        resp = await client.get(
            url,
            params={
                "cftc_contract_market_code": code,
                # Newest first so that $limit keeps the most recent weeks.
                "$order": "report_date_as_yyyy_mm_dd DESC",
                "$limit": str(lookback_weeks),
            },
        )
        if resp.status_code != 200:
            return {"symbol": symbol, "report_type": "tff", "rows": [], "error": "cftc_unavailable"}
        raw_rows = resp.json() or []

    parsed = [parse_tff_row(r) for r in raw_rows]
    parsed.sort(key=lambda r: r["report_date"])  # ASC by date
    out = {
        "symbol": symbol,
        "report_type": "tff",
        "rows": parsed,
        "data_timestamp": datetime.now(UTC).isoformat(),
    }
    _COT_CACHE[key] = out
    _COT_CACHE_TS[key] = now
    return out