From dc285daac80f60c9245f8e91d194692f091be567 Mon Sep 17 00:00:00 2001 From: AdrianoDev Date: Wed, 29 Apr 2026 00:06:52 +0200 Subject: [PATCH] feat(mcp-macro): fetch_cot_extreme_positioning scanner Co-Authored-By: Claude Opus 4.7 (1M context) --- services/mcp-macro/src/mcp_macro/fetchers.py | 66 +++++++++++++++++++- services/mcp-macro/tests/test_fetchers.py | 38 +++++++++++ 2 files changed, 103 insertions(+), 1 deletion(-) diff --git a/services/mcp-macro/src/mcp_macro/fetchers.py b/services/mcp-macro/src/mcp_macro/fetchers.py index 7790bd4..838210e 100644 --- a/services/mcp-macro/src/mcp_macro/fetchers.py +++ b/services/mcp-macro/src/mcp_macro/fetchers.py @@ -5,7 +5,7 @@ from typing import Any import httpx from mcp_common.http import async_client -from mcp_macro.cot import parse_disagg_row, parse_tff_row +from mcp_macro.cot import classify_extreme, compute_percentile, parse_disagg_row, parse_tff_row from mcp_macro.cot_contracts import ( ALL_DISAGG_SYMBOLS, ALL_TFF_SYMBOLS, @@ -704,3 +704,67 @@ async def fetch_cot_disaggregated(symbol: str, lookback_weeks: int = 52) -> dict _COT_CACHE[key] = out _COT_CACHE_TS[key] = now return out + + +async def fetch_cot_extreme_positioning(lookback_weeks: int = 156) -> dict[str, Any]: + """Scanner posizionamento estremo (percentile <=5 o >=95) sui simboli watchlist. + + TFF -> key_role = lev_funds (lev_funds_net). + Disaggregated -> key_role = managed_money (managed_money_net). + """ + import asyncio + + tff_tasks = [fetch_cot_tff(s, lookback_weeks) for s in ALL_TFF_SYMBOLS] + disagg_tasks = [fetch_cot_disaggregated(s, lookback_weeks) for s in ALL_DISAGG_SYMBOLS] + tff_results, disagg_results = await asyncio.gather( + asyncio.gather(*tff_tasks, return_exceptions=True), + asyncio.gather(*disagg_tasks, return_exceptions=True), + ) + + extremes: list[dict[str, Any]] = [] + + for res in tff_results: + if isinstance(res, BaseException) or not isinstance(res, dict): + continue + rows = res.get("rows") or [] + if len(rows) < 4: + continue + series = [r["lev_funds_net"] for r in rows] + current = series[-1] + history = series[:-1] + pct = compute_percentile(current, history) + extremes.append({ + "symbol": res["symbol"], + "report_type": "tff", + "key_role": "lev_funds", + "current_net": current, + "percentile": pct, + "signal": classify_extreme(pct), + "report_date": rows[-1]["report_date"], + }) + + for res in disagg_results: + if isinstance(res, BaseException) or not isinstance(res, dict): + continue + rows = res.get("rows") or [] + if len(rows) < 4: + continue + series = [r["managed_money_net"] for r in rows] + current = series[-1] + history = series[:-1] + pct = compute_percentile(current, history) + extremes.append({ + "symbol": res["symbol"], + "report_type": "disaggregated", + "key_role": "managed_money", + "current_net": current, + "percentile": pct, + "signal": classify_extreme(pct), + "report_date": rows[-1]["report_date"], + }) + + return { + "lookback_weeks": lookback_weeks, + "extremes": extremes, + "data_timestamp": datetime.now(UTC).isoformat(), + } diff --git a/services/mcp-macro/tests/test_fetchers.py b/services/mcp-macro/tests/test_fetchers.py index 919b94a..a891506 100644 --- a/services/mcp-macro/tests/test_fetchers.py +++ b/services/mcp-macro/tests/test_fetchers.py @@ -361,3 +361,41 @@ async def test_fetch_cot_disagg_unknown_symbol(): assert out.get("error") == "unknown_symbol" assert "CL" in out.get("available", []) + +@pytest.mark.asyncio +async def test_fetch_cot_extreme_positioning_flags_outliers(monkeypatch): + """Mock fetch_cot_tff e fetch_cot_disagg per simulare history e ultimo punto.""" + from unittest.mock import AsyncMock + from mcp_macro import fetchers as f + + # Simula una serie ES dove ultimo lev_funds_net รจ in basso (extreme_short) + es_rows = [ + {"report_date": f"2026-{m:02d}-01", "lev_funds_net": v} + for m, v in [(1, 0), (2, 50), (3, 100), (4, -500)] + ] + cl_rows = [ + {"report_date": f"2026-{m:02d}-01", "managed_money_net": v} + for m, v in [(1, 100), (2, 200), (3, 300), (4, 1000)] + ] + + async def fake_tff(symbol, lookback_weeks): + if symbol == "ES": + return {"symbol": "ES", "report_type": "tff", "rows": es_rows} + return {"symbol": symbol, "report_type": "tff", "rows": []} + + async def fake_disagg(symbol, lookback_weeks): + if symbol == "CL": + return {"symbol": "CL", "report_type": "disaggregated", "rows": cl_rows} + return {"symbol": symbol, "report_type": "disaggregated", "rows": []} + + monkeypatch.setattr(f, "fetch_cot_tff", AsyncMock(side_effect=fake_tff)) + monkeypatch.setattr(f, "fetch_cot_disaggregated", AsyncMock(side_effect=fake_disagg)) + + out = await f.fetch_cot_extreme_positioning(lookback_weeks=4) + assert "extremes" in out + by_sym = {e["symbol"]: e for e in out["extremes"]} + assert by_sym["ES"]["signal"] == "extreme_short" + assert by_sym["ES"]["key_role"] == "lev_funds" + assert by_sym["CL"]["signal"] == "extreme_long" + assert by_sym["CL"]["key_role"] == "managed_money" +