diff --git a/docs/superpowers/plans/2026-04-27-cot-report.md b/docs/superpowers/plans/2026-04-27-cot-report.md new file mode 100644 index 0000000..b85692a --- /dev/null +++ b/docs/superpowers/plans/2026-04-27-cot-report.md @@ -0,0 +1,1044 @@ +# COT Report Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Aggiungere a `mcp-macro` 3 tool MCP per fetchare il Commitment of Traders (COT) report CFTC: TFF per equity/financial, Disaggregated per commodities, scanner di extreme positioning. + +**Architecture:** Pure-logic helpers (parsing + percentile + classify) testati separatamente; fetcher async che usano `mcp_common.http.async_client` (retry transport in place); cache in-memory TTL 1h riusando il pattern di `_TREASURY_CACHE`; tre endpoint server con Pydantic body models, ACL `core+observer`. + +**Tech Stack:** Python 3.13, FastAPI, httpx (via `mcp_common.http.async_client`), pytest + pytest-httpx, Pydantic v2. + +**Spec di riferimento:** `docs/superpowers/specs/2026-04-27-cot-report-design.md` + +--- + +## File Structure + +```text +services/mcp-macro/ + src/mcp_macro/ + cot_contracts.py # NUOVO — costanti SYMBOL_TO_CFTC_CODE_TFF, _DISAGG, FIELD_MAPPINGS + cot.py # NUOVO — pure-logic: parse_tff_row, parse_disagg_row, + # compute_percentile, classify_extreme + fetchers.py # MODIFY — aggiunge fetch_cot_tff, fetch_cot_disaggregated, + # fetch_cot_extreme_positioning + cache + server.py # MODIFY — aggiunge 3 endpoint + 3 body model + tests/ + test_cot.py # NUOVO — test pure-logic (parsing, percentile, classify) + test_fetchers.py # MODIFY — integration test cot fetchers via httpx_mock + test_server_acl.py # MODIFY — ACL test sui 3 nuovi endpoint +``` + +Decomposizione: `cot_contracts.py` ha solo costanti (zero logica). `cot.py` ha funzioni pure (testabili senza HTTP). `fetchers.py` orchestra HTTP + cache. `server.py` espone endpoint + ACL. Files in `tests/` rispecchiano la struttura. + +--- + +### Task 1: Costanti CFTC contract codes + field mappings + +**Files:** + +- Create: `services/mcp-macro/src/mcp_macro/cot_contracts.py` + +- [ ] **Step 1: Crea il modulo costanti** + +```python +# services/mcp-macro/src/mcp_macro/cot_contracts.py +"""Costanti CFTC: ticker → contract_market_code per TFF e Disaggregated. + +I codici CFTC (`cftc_contract_market_code`) sono pubblici e stabili nel tempo. +Riferimento: https://www.cftc.gov/MarketReports/CommitmentsofTraders/ +""" +from __future__ import annotations + +CFTC_BASE_URL = "https://publicreporting.cftc.gov/resource" +TFF_DATASET_ID = "gpe5-46if" +DISAGG_DATASET_ID = "72hh-3qpy" + +# TFF: equity/financial. Mapping ticker → cftc_contract_market_code. +SYMBOL_TO_CFTC_CODE_TFF: dict[str, str] = { + "ES": "13874A", # E-mini S&P 500 + "NQ": "209742", # E-mini Nasdaq-100 + "RTY": "239742", # E-mini Russell 2000 + "ZN": "043602", # 10-Year T-Note + "ZB": "020601", # 30-Year T-Bond + "6E": "099741", # Euro FX + "6J": "097741", # Japanese Yen + "DX": "098662", # US Dollar Index +} + +# Disaggregated: commodities. +SYMBOL_TO_CFTC_CODE_DISAGG: dict[str, str] = { + "CL": "067651", # Crude Oil WTI + "GC": "088691", # Gold + "SI": "084691", # Silver + "HG": "085692", # Copper + "ZW": "001602", # Wheat + "ZC": "002602", # Corn + "ZS": "005602", # Soybeans +} + +ALL_TFF_SYMBOLS: list[str] = list(SYMBOL_TO_CFTC_CODE_TFF.keys()) +ALL_DISAGG_SYMBOLS: list[str] = list(SYMBOL_TO_CFTC_CODE_DISAGG.keys()) +``` + +- [ ] **Step 2: Commit** + +```bash +git add services/mcp-macro/src/mcp_macro/cot_contracts.py +git commit -m "feat(mcp-macro): add CFTC contract codes constants for COT report" +``` + +--- + +### Task 2: Pure-logic — `compute_percentile` + `classify_extreme` + +**Files:** + +- Create: `services/mcp-macro/src/mcp_macro/cot.py` (parziale, solo helper percentile) +- Create: `services/mcp-macro/tests/test_cot.py` + +- [ ] **Step 1: Scrivi i test failing** + +```python +# services/mcp-macro/tests/test_cot.py +from __future__ import annotations + +from mcp_macro.cot import classify_extreme, compute_percentile + + +def test_compute_percentile_basic(): + history = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100] + assert compute_percentile(50, history) == 50.0 + assert compute_percentile(10, history) == 10.0 + assert compute_percentile(100, history) == 100.0 + + +def test_compute_percentile_value_below_min(): + history = [10, 20, 30] + assert compute_percentile(5, history) == 0.0 + + +def test_compute_percentile_value_above_max(): + history = [10, 20, 30] + assert compute_percentile(40, history) == 100.0 + + +def test_compute_percentile_empty_history(): + assert compute_percentile(50, []) is None + + +def test_classify_extreme_below_threshold(): + assert classify_extreme(3.0) == "extreme_short" + assert classify_extreme(5.0) == "extreme_short" # boundary inclusive + + +def test_classify_extreme_above_threshold(): + assert classify_extreme(96.0) == "extreme_long" + assert classify_extreme(95.0) == "extreme_long" # boundary inclusive + + +def test_classify_extreme_neutral(): + assert classify_extreme(50.0) == "neutral" + assert classify_extreme(94.99) == "neutral" + assert classify_extreme(5.01) == "neutral" + + +def test_classify_extreme_none_input(): + assert classify_extreme(None) == "neutral" +``` + +- [ ] **Step 2: Run, verifica fail** + +Run: `uv run pytest services/mcp-macro/tests/test_cot.py -v` +Expected: ImportError (modulo `mcp_macro.cot` non esiste ancora). + +- [ ] **Step 3: Implementa `cot.py` con i 2 helper** + +```python +# services/mcp-macro/src/mcp_macro/cot.py +"""Pure-logic helpers per COT report parsing e analytics. + +Niente HTTP qui — orchestrazione fetch sta in fetchers.py. Tutto testabile +in isolamento. +""" +from __future__ import annotations + +from typing import Literal + +ExtremeSignal = Literal["extreme_short", "extreme_long", "neutral"] + + +def compute_percentile(value: float, history: list[float]) -> float | None: + """Percentile di `value` rispetto ad `history` (0-100, inclusive). + + Restituisce None se history vuoto. Clipped a [0, 100] se value fuori range. + """ + if not history: + return None + n = len(history) + below_or_eq = sum(1 for h in history if h <= value) + pct = 100.0 * below_or_eq / n + return max(0.0, min(100.0, pct)) + + +def classify_extreme(percentile: float | None, threshold: float = 5.0) -> ExtremeSignal: + """Classifica un percentile come estremo short/long o neutral. + + threshold default 5 → flagga ≤ 5 come short, ≥ 100-5=95 come long. + """ + if percentile is None: + return "neutral" + if percentile <= threshold: + return "extreme_short" + if percentile >= 100.0 - threshold: + return "extreme_long" + return "neutral" +``` + +- [ ] **Step 4: Run, verifica pass** + +Run: `uv run pytest services/mcp-macro/tests/test_cot.py -v` +Expected: 8 passed. + +- [ ] **Step 5: Commit** + +```bash +git add services/mcp-macro/src/mcp_macro/cot.py services/mcp-macro/tests/test_cot.py +git commit -m "feat(mcp-macro): add compute_percentile + classify_extreme pure helpers" +``` + +--- + +### Task 3: Pure-logic — `parse_tff_row` + `parse_disagg_row` + +**Files:** + +- Modify: `services/mcp-macro/src/mcp_macro/cot.py` (aggiunge parser) +- Modify: `services/mcp-macro/tests/test_cot.py` + +- [ ] **Step 1: Aggiungi i test failing** + +Append a `services/mcp-macro/tests/test_cot.py`: + +```python +from mcp_macro.cot import parse_disagg_row, parse_tff_row + + +# Payload Socrata reale (subset campi rilevanti, valori arbitrari per test) +TFF_SOCRATA_ROW = { + "report_date_as_yyyy_mm_dd": "2026-04-22T00:00:00.000", + "dealer_positions_long_all": "12345", + "dealer_positions_short_all": "23456", + "asset_mgr_positions_long": "654321", + "asset_mgr_positions_short": "200000", + "lev_money_positions_long": "100000", + "lev_money_positions_short": "350000", + "other_rept_positions_long": "50000", + "other_rept_positions_short": "50000", + "open_interest_all": "2500000", +} + +DISAGG_SOCRATA_ROW = { + "report_date_as_yyyy_mm_dd": "2026-04-22T00:00:00.000", + "prod_merc_positions_long_all": "100000", + "prod_merc_positions_short_all": "300000", + "swap_positions_long_all": "50000", + "swap_positions_short_all": "60000", + "m_money_positions_long_all": "200000", + "m_money_positions_short_all": "80000", + "other_rept_positions_long_all": "10000", + "other_rept_positions_short_all": "10000", + "open_interest_all": "1500000", +} + + +def test_parse_tff_row_extracts_all_fields(): + row = parse_tff_row(TFF_SOCRATA_ROW) + assert row["report_date"] == "2026-04-22" + assert row["dealer_long"] == 12345 + assert row["dealer_short"] == 23456 + assert row["dealer_net"] == 12345 - 23456 + assert row["asset_mgr_long"] == 654321 + assert row["asset_mgr_net"] == 654321 - 200000 + assert row["lev_funds_long"] == 100000 + assert row["lev_funds_short"] == 350000 + assert row["lev_funds_net"] == 100000 - 350000 + assert row["other_long"] == 50000 + assert row["other_net"] == 0 + assert row["open_interest"] == 2500000 + + +def test_parse_tff_row_handles_missing_field(): + payload = {"report_date_as_yyyy_mm_dd": "2026-04-22T00:00:00.000"} + row = parse_tff_row(payload) + assert row["report_date"] == "2026-04-22" + assert row["dealer_long"] == 0 + assert row["dealer_net"] == 0 + + +def test_parse_disagg_row_extracts_all_fields(): + row = parse_disagg_row(DISAGG_SOCRATA_ROW) + assert row["report_date"] == "2026-04-22" + assert row["producer_long"] == 100000 + assert row["producer_short"] == 300000 + assert row["producer_net"] == -200000 + assert row["swap_long"] == 50000 + assert row["swap_net"] == -10000 + assert row["managed_money_long"] == 200000 + assert row["managed_money_short"] == 80000 + assert row["managed_money_net"] == 120000 + assert row["other_long"] == 10000 + assert row["other_net"] == 0 + assert row["open_interest"] == 1500000 +``` + +- [ ] **Step 2: Run, verifica fail** + +Run: `uv run pytest services/mcp-macro/tests/test_cot.py -v` +Expected: ImportError o AttributeError su `parse_tff_row`/`parse_disagg_row`. + +- [ ] **Step 3: Aggiungi i parser a `cot.py`** + +Append a `services/mcp-macro/src/mcp_macro/cot.py`: + +```python +def _to_int(v) -> int: + try: + return int(float(v)) + except (TypeError, ValueError): + return 0 + + +def _date_only(s: str) -> str: + """Estrae 'YYYY-MM-DD' da una data ISO con o senza timestamp.""" + if not s: + return "" + return s.split("T", 1)[0] + + +def parse_tff_row(raw: dict) -> dict: + """Mappa una row Socrata TFF al formato API output.""" + dl = _to_int(raw.get("dealer_positions_long_all")) + ds = _to_int(raw.get("dealer_positions_short_all")) + al = _to_int(raw.get("asset_mgr_positions_long")) + as_ = _to_int(raw.get("asset_mgr_positions_short")) + ll = _to_int(raw.get("lev_money_positions_long")) + ls = _to_int(raw.get("lev_money_positions_short")) + ol = _to_int(raw.get("other_rept_positions_long")) + os_ = _to_int(raw.get("other_rept_positions_short")) + return { + "report_date": _date_only(raw.get("report_date_as_yyyy_mm_dd", "")), + "dealer_long": dl, "dealer_short": ds, "dealer_net": dl - ds, + "asset_mgr_long": al, "asset_mgr_short": as_, "asset_mgr_net": al - as_, + "lev_funds_long": ll, "lev_funds_short": ls, "lev_funds_net": ll - ls, + "other_long": ol, "other_short": os_, "other_net": ol - os_, + "open_interest": _to_int(raw.get("open_interest_all")), + } + + +def parse_disagg_row(raw: dict) -> dict: + """Mappa una row Socrata Disaggregated F&O combined al formato API output.""" + pl = _to_int(raw.get("prod_merc_positions_long_all")) + ps = _to_int(raw.get("prod_merc_positions_short_all")) + sl = _to_int(raw.get("swap_positions_long_all")) + ss = _to_int(raw.get("swap_positions_short_all")) + ml = _to_int(raw.get("m_money_positions_long_all")) + ms = _to_int(raw.get("m_money_positions_short_all")) + ol = _to_int(raw.get("other_rept_positions_long_all")) + os_ = _to_int(raw.get("other_rept_positions_short_all")) + return { + "report_date": _date_only(raw.get("report_date_as_yyyy_mm_dd", "")), + "producer_long": pl, "producer_short": ps, "producer_net": pl - ps, + "swap_long": sl, "swap_short": ss, "swap_net": sl - ss, + "managed_money_long": ml, "managed_money_short": ms, "managed_money_net": ml - ms, + "other_long": ol, "other_short": os_, "other_net": ol - os_, + "open_interest": _to_int(raw.get("open_interest_all")), + } +``` + +- [ ] **Step 4: Run, verifica pass** + +Run: `uv run pytest services/mcp-macro/tests/test_cot.py -v` +Expected: 11 passed (8 + 3 nuovi). + +- [ ] **Step 5: Commit** + +```bash +git add services/mcp-macro/src/mcp_macro/cot.py services/mcp-macro/tests/test_cot.py +git commit -m "feat(mcp-macro): add parse_tff_row + parse_disagg_row Socrata mappers" +``` + +--- + +### Task 4: `fetch_cot_tff` async fetcher + +**Files:** + +- Modify: `services/mcp-macro/src/mcp_macro/fetchers.py` +- Modify: `services/mcp-macro/tests/test_fetchers.py` + +- [ ] **Step 1: Aggiungi test failing** + +Append a `services/mcp-macro/tests/test_fetchers.py`: + +```python +@pytest.mark.asyncio +async def test_fetch_cot_tff_happy_path(httpx_mock: pytest_httpx.HTTPXMock): + from mcp_macro.fetchers import fetch_cot_tff + httpx_mock.add_response( + url=httpx.URL( + "https://publicreporting.cftc.gov/resource/gpe5-46if.json", + params={ + "cftc_contract_market_code": "13874A", + "$order": "report_date_as_yyyy_mm_dd DESC", + "$limit": "52", + }, + ), + json=[ + { + "report_date_as_yyyy_mm_dd": "2026-04-22T00:00:00.000", + "dealer_positions_long_all": "12345", + "dealer_positions_short_all": "23456", + "asset_mgr_positions_long": "654321", + "asset_mgr_positions_short": "200000", + "lev_money_positions_long": "100000", + "lev_money_positions_short": "350000", + "other_rept_positions_long": "50000", + "other_rept_positions_short": "50000", + "open_interest_all": "2500000", + }, + { + "report_date_as_yyyy_mm_dd": "2026-04-15T00:00:00.000", + "dealer_positions_long_all": "11000", + "dealer_positions_short_all": "22000", + "asset_mgr_positions_long": "640000", + "asset_mgr_positions_short": "210000", + "lev_money_positions_long": "110000", + "lev_money_positions_short": "320000", + "other_rept_positions_long": "48000", + "other_rept_positions_short": "52000", + "open_interest_all": "2480000", + }, + ], + ) + out = await fetch_cot_tff("ES", lookback_weeks=52) + assert out["symbol"] == "ES" + assert out["report_type"] == "tff" + assert len(out["rows"]) == 2 + # Ordering ASC by date (oldest first) + assert out["rows"][0]["report_date"] == "2026-04-15" + assert out["rows"][1]["report_date"] == "2026-04-22" + assert out["rows"][1]["lev_funds_net"] == -250000 + assert "data_timestamp" in out + + +@pytest.mark.asyncio +async def test_fetch_cot_tff_unknown_symbol(): + from mcp_macro.fetchers import fetch_cot_tff + out = await fetch_cot_tff("INVALID", lookback_weeks=52) + assert out.get("error") == "unknown_symbol" + assert "ES" in out.get("available", []) +``` + +- [ ] **Step 2: Run, verifica fail** + +Run: `uv run pytest services/mcp-macro/tests/test_fetchers.py -v -k cot_tff` +Expected: ImportError o AttributeError su `fetch_cot_tff`. + +- [ ] **Step 3: Implementa `fetch_cot_tff`** + +Aggiungi a `services/mcp-macro/src/mcp_macro/fetchers.py` (in cima, dopo gli altri import): + +```python +from mcp_macro.cot import parse_disagg_row, parse_tff_row +from mcp_macro.cot_contracts import ( + ALL_DISAGG_SYMBOLS, + ALL_TFF_SYMBOLS, + CFTC_BASE_URL, + DISAGG_DATASET_ID, + SYMBOL_TO_CFTC_CODE_DISAGG, + SYMBOL_TO_CFTC_CODE_TFF, + TFF_DATASET_ID, +) +``` + +Quindi (in fondo al file, prima di altre funzioni nuove): + +```python +_COT_TTL = 3600.0 # 1h +_COT_CACHE: dict[tuple[str, str, int], dict[str, Any]] = {} +_COT_CACHE_TS: dict[tuple[str, str, int], float] = {} + + +async def fetch_cot_tff(symbol: str, lookback_weeks: int = 52) -> dict[str, Any]: + """Fetch COT TFF report per simbolo equity/financial. Returns ASC by date.""" + import time + + symbol = symbol.upper() + if symbol not in SYMBOL_TO_CFTC_CODE_TFF: + return {"error": "unknown_symbol", "available": ALL_TFF_SYMBOLS} + + key = (symbol, "tff", lookback_weeks) + now = time.monotonic() + if key in _COT_CACHE and (now - _COT_CACHE_TS[key]) < _COT_TTL: + return _COT_CACHE[key] + + code = SYMBOL_TO_CFTC_CODE_TFF[symbol] + url = f"{CFTC_BASE_URL}/{TFF_DATASET_ID}.json" + async with async_client(timeout=10.0) as client: + resp = await client.get( + url, + params={ + "cftc_contract_market_code": code, + "$order": "report_date_as_yyyy_mm_dd DESC", + "$limit": str(lookback_weeks), + }, + ) + if resp.status_code != 200: + return {"symbol": symbol, "report_type": "tff", "rows": [], "error": "cftc_unavailable"} + raw_rows = resp.json() or [] + parsed = [parse_tff_row(r) for r in raw_rows] + parsed.sort(key=lambda r: r["report_date"]) # ASC by date + out = { + "symbol": symbol, + "report_type": "tff", + "rows": parsed, + "data_timestamp": datetime.now(UTC).isoformat(), + } + _COT_CACHE[key] = out + _COT_CACHE_TS[key] = now + return out +``` + +- [ ] **Step 4: Run, verifica pass** + +Run: `uv run pytest services/mcp-macro/tests/test_fetchers.py -v -k cot_tff` +Expected: 2 passed. + +- [ ] **Step 5: Commit** + +```bash +git add services/mcp-macro/src/mcp_macro/fetchers.py services/mcp-macro/tests/test_fetchers.py +git commit -m "feat(mcp-macro): fetch_cot_tff async fetcher with cache" +``` + +--- + +### Task 5: `fetch_cot_disaggregated` async fetcher + +**Files:** + +- Modify: `services/mcp-macro/src/mcp_macro/fetchers.py` +- Modify: `services/mcp-macro/tests/test_fetchers.py` + +- [ ] **Step 1: Aggiungi test failing** + +Append a `services/mcp-macro/tests/test_fetchers.py`: + +```python +@pytest.mark.asyncio +async def test_fetch_cot_disagg_happy_path(httpx_mock: pytest_httpx.HTTPXMock): + from mcp_macro.fetchers import fetch_cot_disaggregated + httpx_mock.add_response( + url=httpx.URL( + "https://publicreporting.cftc.gov/resource/72hh-3qpy.json", + params={ + "cftc_contract_market_code": "067651", + "$order": "report_date_as_yyyy_mm_dd DESC", + "$limit": "52", + }, + ), + json=[ + { + "report_date_as_yyyy_mm_dd": "2026-04-22T00:00:00.000", + "prod_merc_positions_long_all": "100000", + "prod_merc_positions_short_all": "300000", + "swap_positions_long_all": "50000", + "swap_positions_short_all": "60000", + "m_money_positions_long_all": "200000", + "m_money_positions_short_all": "80000", + "other_rept_positions_long_all": "10000", + "other_rept_positions_short_all": "10000", + "open_interest_all": "1500000", + }, + ], + ) + out = await fetch_cot_disaggregated("CL", lookback_weeks=52) + assert out["symbol"] == "CL" + assert out["report_type"] == "disaggregated" + assert len(out["rows"]) == 1 + assert out["rows"][0]["managed_money_net"] == 120000 + assert out["rows"][0]["producer_net"] == -200000 + + +@pytest.mark.asyncio +async def test_fetch_cot_disagg_unknown_symbol(): + from mcp_macro.fetchers import fetch_cot_disaggregated + out = await fetch_cot_disaggregated("XYZ", lookback_weeks=52) + assert out.get("error") == "unknown_symbol" + assert "CL" in out.get("available", []) +``` + +- [ ] **Step 2: Run, verifica fail** + +Run: `uv run pytest services/mcp-macro/tests/test_fetchers.py -v -k cot_disagg` +Expected: AttributeError su `fetch_cot_disaggregated`. + +- [ ] **Step 3: Implementa `fetch_cot_disaggregated`** + +Append a `services/mcp-macro/src/mcp_macro/fetchers.py` (dopo `fetch_cot_tff`): + +```python +async def fetch_cot_disaggregated(symbol: str, lookback_weeks: int = 52) -> dict[str, Any]: + """Fetch COT Disaggregated report per simbolo commodity. Returns ASC by date.""" + import time + + symbol = symbol.upper() + if symbol not in SYMBOL_TO_CFTC_CODE_DISAGG: + return {"error": "unknown_symbol", "available": ALL_DISAGG_SYMBOLS} + + key = (symbol, "disaggregated", lookback_weeks) + now = time.monotonic() + if key in _COT_CACHE and (now - _COT_CACHE_TS[key]) < _COT_TTL: + return _COT_CACHE[key] + + code = SYMBOL_TO_CFTC_CODE_DISAGG[symbol] + url = f"{CFTC_BASE_URL}/{DISAGG_DATASET_ID}.json" + async with async_client(timeout=10.0) as client: + resp = await client.get( + url, + params={ + "cftc_contract_market_code": code, + "$order": "report_date_as_yyyy_mm_dd DESC", + "$limit": str(lookback_weeks), + }, + ) + if resp.status_code != 200: + return {"symbol": symbol, "report_type": "disaggregated", "rows": [], "error": "cftc_unavailable"} + raw_rows = resp.json() or [] + parsed = [parse_disagg_row(r) for r in raw_rows] + parsed.sort(key=lambda r: r["report_date"]) + out = { + "symbol": symbol, + "report_type": "disaggregated", + "rows": parsed, + "data_timestamp": datetime.now(UTC).isoformat(), + } + _COT_CACHE[key] = out + _COT_CACHE_TS[key] = now + return out +``` + +- [ ] **Step 4: Run, verifica pass** + +Run: `uv run pytest services/mcp-macro/tests/test_fetchers.py -v -k cot_disagg` +Expected: 2 passed. + +- [ ] **Step 5: Commit** + +```bash +git add services/mcp-macro/src/mcp_macro/fetchers.py services/mcp-macro/tests/test_fetchers.py +git commit -m "feat(mcp-macro): fetch_cot_disaggregated async fetcher with cache" +``` + +--- + +### Task 6: `fetch_cot_extreme_positioning` scanner + +**Files:** + +- Modify: `services/mcp-macro/src/mcp_macro/fetchers.py` +- Modify: `services/mcp-macro/tests/test_fetchers.py` + +- [ ] **Step 1: Aggiungi test failing** + +Append a `services/mcp-macro/tests/test_fetchers.py`: + +```python +@pytest.mark.asyncio +async def test_fetch_cot_extreme_positioning_flags_outliers(monkeypatch): + """Mock fetch_cot_tff e fetch_cot_disagg per simulare history e ultimo punto.""" + from unittest.mock import AsyncMock + from mcp_macro import fetchers as f + + # Simula una serie ES dove ultimo lev_funds_net è in basso (extreme_short) + es_rows = [ + {"report_date": f"2026-{m:02d}-01", "lev_funds_net": v} + for m, v in [(1, 0), (2, 50), (3, 100), (4, -500)] + ] + cl_rows = [ + {"report_date": f"2026-{m:02d}-01", "managed_money_net": v} + for m, v in [(1, 100), (2, 200), (3, 300), (4, 1000)] + ] + + async def fake_tff(symbol, lookback_weeks): + if symbol == "ES": + return {"symbol": "ES", "report_type": "tff", "rows": es_rows} + return {"symbol": symbol, "report_type": "tff", "rows": []} + + async def fake_disagg(symbol, lookback_weeks): + if symbol == "CL": + return {"symbol": "CL", "report_type": "disaggregated", "rows": cl_rows} + return {"symbol": symbol, "report_type": "disaggregated", "rows": []} + + monkeypatch.setattr(f, "fetch_cot_tff", AsyncMock(side_effect=fake_tff)) + monkeypatch.setattr(f, "fetch_cot_disaggregated", AsyncMock(side_effect=fake_disagg)) + + out = await f.fetch_cot_extreme_positioning(lookback_weeks=4) + assert "extremes" in out + by_sym = {e["symbol"]: e for e in out["extremes"]} + assert by_sym["ES"]["signal"] == "extreme_short" + assert by_sym["ES"]["key_role"] == "lev_funds" + assert by_sym["CL"]["signal"] == "extreme_long" + assert by_sym["CL"]["key_role"] == "managed_money" +``` + +- [ ] **Step 2: Run, verifica fail** + +Run: `uv run pytest services/mcp-macro/tests/test_fetchers.py -v -k extreme_positioning` +Expected: AttributeError su `fetch_cot_extreme_positioning`. + +- [ ] **Step 3: Implementa lo scanner** + +Append a `services/mcp-macro/src/mcp_macro/fetchers.py`: + +```python +from mcp_macro.cot import classify_extreme, compute_percentile + + +async def fetch_cot_extreme_positioning(lookback_weeks: int = 156) -> dict[str, Any]: + """Scanner posizionamento estremo (percentile ≤5 o ≥95) sui simboli watchlist. + + TFF → key_role = lev_funds (lev_funds_net). + Disaggregated → key_role = managed_money (managed_money_net). + """ + import asyncio + + tff_tasks = [fetch_cot_tff(s, lookback_weeks) for s in ALL_TFF_SYMBOLS] + disagg_tasks = [fetch_cot_disaggregated(s, lookback_weeks) for s in ALL_DISAGG_SYMBOLS] + tff_results, disagg_results = await asyncio.gather( + asyncio.gather(*tff_tasks, return_exceptions=True), + asyncio.gather(*disagg_tasks, return_exceptions=True), + ) + + extremes: list[dict[str, Any]] = [] + + for res in tff_results: + if isinstance(res, BaseException) or not isinstance(res, dict): + continue + rows = res.get("rows") or [] + if len(rows) < 4: + continue + history = [r["lev_funds_net"] for r in rows] + current = history[-1] + pct = compute_percentile(current, history) + extremes.append({ + "symbol": res["symbol"], + "report_type": "tff", + "key_role": "lev_funds", + "current_net": current, + "percentile": pct, + "signal": classify_extreme(pct), + "report_date": rows[-1]["report_date"], + }) + + for res in disagg_results: + if isinstance(res, BaseException) or not isinstance(res, dict): + continue + rows = res.get("rows") or [] + if len(rows) < 4: + continue + history = [r["managed_money_net"] for r in rows] + current = history[-1] + pct = compute_percentile(current, history) + extremes.append({ + "symbol": res["symbol"], + "report_type": "disaggregated", + "key_role": "managed_money", + "current_net": current, + "percentile": pct, + "signal": classify_extreme(pct), + "report_date": rows[-1]["report_date"], + }) + + return { + "lookback_weeks": lookback_weeks, + "extremes": extremes, + "data_timestamp": datetime.now(UTC).isoformat(), + } +``` + +- [ ] **Step 4: Run, verifica pass** + +Run: `uv run pytest services/mcp-macro/tests/test_fetchers.py -v -k extreme_positioning` +Expected: 1 passed. + +- [ ] **Step 5: Commit** + +```bash +git add services/mcp-macro/src/mcp_macro/fetchers.py services/mcp-macro/tests/test_fetchers.py +git commit -m "feat(mcp-macro): fetch_cot_extreme_positioning scanner" +``` + +--- + +### Task 7: Server endpoints + body models + +**Files:** + +- Modify: `services/mcp-macro/src/mcp_macro/server.py` +- Modify: `services/mcp-macro/tests/test_server_acl.py` + +- [ ] **Step 1: Aggiungi test ACL failing** + +Append a `services/mcp-macro/tests/test_server_acl.py` (assumendo `http` fixture esiste e patcha `fetch_treasury_yields`): + +```python +def test_get_cot_tff_core_ok(http): + from unittest.mock import AsyncMock, patch + with patch( + "mcp_macro.server.fetch_cot_tff", + new=AsyncMock(return_value={"symbol": "ES", "rows": []}), + ): + r = http.post( + "/tools/get_cot_tff", + headers={"Authorization": "Bearer ct"}, + json={"symbol": "ES"}, + ) + assert r.status_code == 200 + assert r.json()["symbol"] == "ES" + + +def test_get_cot_tff_observer_ok(http): + from unittest.mock import AsyncMock, patch + with patch( + "mcp_macro.server.fetch_cot_tff", + new=AsyncMock(return_value={"symbol": "ES", "rows": []}), + ): + r = http.post( + "/tools/get_cot_tff", + headers={"Authorization": "Bearer ot"}, + json={"symbol": "ES"}, + ) + assert r.status_code == 200 + + +def test_get_cot_tff_no_auth_401(http): + r = http.post("/tools/get_cot_tff", json={"symbol": "ES"}) + assert r.status_code == 401 + + +def test_get_cot_disagg_observer_ok(http): + from unittest.mock import AsyncMock, patch + with patch( + "mcp_macro.server.fetch_cot_disaggregated", + new=AsyncMock(return_value={"symbol": "CL", "rows": []}), + ): + r = http.post( + "/tools/get_cot_disaggregated", + headers={"Authorization": "Bearer ot"}, + json={"symbol": "CL"}, + ) + assert r.status_code == 200 + + +def test_get_cot_disagg_no_auth_401(http): + r = http.post("/tools/get_cot_disaggregated", json={"symbol": "CL"}) + assert r.status_code == 401 + + +def test_get_cot_extreme_positioning_ok(http): + from unittest.mock import AsyncMock, patch + with patch( + "mcp_macro.server.fetch_cot_extreme_positioning", + new=AsyncMock(return_value={"extremes": []}), + ): + r = http.post( + "/tools/get_cot_extreme_positioning", + headers={"Authorization": "Bearer ot"}, + json={}, + ) + assert r.status_code == 200 + + +def test_get_cot_extreme_positioning_lookback_too_short(http): + """Pydantic validation: lookback_weeks < 4 → 422.""" + r = http.post( + "/tools/get_cot_extreme_positioning", + headers={"Authorization": "Bearer ct"}, + json={"lookback_weeks": 2}, + ) + assert r.status_code == 422 +``` + +(Se nel `test_server_acl.py` esistente le auth header sono diverse, es. `Bearer core-tok`, adatta i token in queste 7 test per matchare la fixture esistente.) + +- [ ] **Step 2: Run, verifica fail** + +Run: `uv run pytest services/mcp-macro/tests/test_server_acl.py -v -k cot` +Expected: 404 sui POST (endpoint non esistono ancora). + +- [ ] **Step 3: Aggiungi import + body models a `server.py`** + +Modifica `services/mcp-macro/src/mcp_macro/server.py`: + +Sostituisci l'import block dei fetcher con: + +```python +from mcp_macro.fetchers import ( + fetch_asset_price, + fetch_breakeven_inflation, + fetch_cot_disaggregated, + fetch_cot_extreme_positioning, + fetch_cot_tff, + fetch_economic_indicators, + fetch_equity_futures, + fetch_macro_calendar, + fetch_market_overview, + fetch_treasury_yields, + fetch_yield_curve_slope, +) +``` + +Sostituisci l'import pydantic con: + +```python +from pydantic import BaseModel, Field +``` + +Aggiungi (dopo `class GetBreakevenInflationReq(BaseModel):` o equivalente, prima della funzione `_check`): + +```python +class GetCotTffReq(BaseModel): + symbol: str + lookback_weeks: int = Field(default=52, ge=4, le=520) + + +class GetCotDisaggregatedReq(BaseModel): + symbol: str + lookback_weeks: int = Field(default=52, ge=4, le=520) + + +class GetCotExtremeReq(BaseModel): + lookback_weeks: int = Field(default=156, ge=4, le=520) +``` + +- [ ] **Step 4: Aggiungi gli endpoint** + +Trova la sezione con `t_get_breakeven_inflation` o l'ultimo `@app.post(...)` `tags=["reads"]` e aggiungi subito dopo: + +```python + @app.post("/tools/get_cot_tff", tags=["reads"]) + async def t_get_cot_tff( + body: GetCotTffReq, principal: Principal = Depends(require_principal) + ): + _check(principal, core=True, observer=True) + return await fetch_cot_tff(body.symbol, body.lookback_weeks) + + @app.post("/tools/get_cot_disaggregated", tags=["reads"]) + async def t_get_cot_disaggregated( + body: GetCotDisaggregatedReq, principal: Principal = Depends(require_principal) + ): + _check(principal, core=True, observer=True) + return await fetch_cot_disaggregated(body.symbol, body.lookback_weeks) + + @app.post("/tools/get_cot_extreme_positioning", tags=["reads"]) + async def t_get_cot_extreme( + body: GetCotExtremeReq, principal: Principal = Depends(require_principal) + ): + _check(principal, core=True, observer=True) + return await fetch_cot_extreme_positioning(body.lookback_weeks) +``` + +- [ ] **Step 5: Aggiungi i 3 tool al registry MCP** + +Trova `mount_mcp_endpoint(...)` e dentro `tools=[...]` aggiungi (dopo `get_breakeven_inflation`): + +```python + {"name": "get_cot_tff", "description": "COT TFF report (CFTC) per equity/financial: ES/NQ/RTY/ZN/ZB/6E/6J/DX. Roles: dealer, asset manager, leveraged funds, other."}, + {"name": "get_cot_disaggregated", "description": "COT Disaggregated report (CFTC) per commodities: CL/GC/SI/HG/ZW/ZC/ZS. Roles: producer/merchant, swap dealer, managed money, other."}, + {"name": "get_cot_extreme_positioning", "description": "Scanner posizionamento estremo (percentile ≤5 o ≥95) sui simboli watchlist."}, +``` + +- [ ] **Step 6: Run, verifica pass** + +Run: `uv run pytest services/mcp-macro/tests/test_server_acl.py -v -k cot` +Expected: 7 passed. + +- [ ] **Step 7: Run full suite per regression** + +Run: `uv run pytest services/` +Expected: tutti verdi (esistenti + nuovi). + +- [ ] **Step 8: Commit** + +```bash +git add services/mcp-macro/src/mcp_macro/server.py services/mcp-macro/tests/test_server_acl.py +git commit -m "feat(mcp-macro): expose COT report tools via MCP endpoint" +``` + +--- + +### Task 8: Documentation update + +**Files:** + +- Modify: `README.md` + +- [ ] **Step 1: Aggiungi i 3 tool COT alla sezione Macro del README** + +In `README.md`, trova la riga che inizia con "**Macro**: Treasury yields..." e sostituisci la lista per includere i COT (la riga "**Nuovi**" già esiste; aggiungi i COT nella stessa enumerazione): + +```markdown +### Macro +Treasury yields, FRED indicators, equity futures, asset prices, calendar. +**Nuovi**: `get_yield_curve_slope` (slope 2y10y/5y30y + butterfly + regime), +`get_breakeven_inflation` (T5YIE/T10YIE/T5YIFR), `get_cot_tff` (TFF report +CFTC equity/financial), `get_cot_disaggregated` (Disaggregated report +CFTC commodities), `get_cot_extreme_positioning` (scanner percentile +≤5/≥95 su watchlist). +``` + +- [ ] **Step 2: Commit** + +```bash +git add README.md +git commit -m "docs: add COT report tools to README macro section" +``` + +--- + +## Self-Review + +**1. Spec coverage:** + +- §1 Motivazione → contesto, no task richiesto. +- §2 Decisione TFF + Disaggregated → riflessa nelle costanti del Task 1. +- §3 Sorgenti dati → `CFTC_BASE_URL`, `*_DATASET_ID` in Task 1. +- §4 Watchlist simboli → `SYMBOL_TO_CFTC_CODE_*` in Task 1. +- §5.1 `get_cot_tff` → Task 4 fetcher + Task 7 endpoint. +- §5.2 `get_cot_disaggregated` → Task 5 fetcher + Task 7 endpoint. +- §5.3 `get_cot_extreme_positioning` → Task 6 fetcher + Task 7 endpoint. +- §6 Architettura → File Structure section + Task 1/2/3. +- §7 Cache → `_COT_CACHE`, TTL 3600 in Task 4. +- §8 Edge cases → `unknown_symbol` (Task 4/5), 5xx → `cftc_unavailable` (Task 4/5), `lookback_weeks ≥ 4` (Task 7 Pydantic Field). +- §9 Test plan → coperto in Task 2 (pure-logic), Task 3 (parser), Task 4-5-6 (httpx_mock), Task 7 (ACL). +- §10 Out of scope → niente da implementare, OK. + +**2. Placeholder scan:** nessun TBD/TODO. Tutto codice concreto. + +**3. Type consistency:** + +- `compute_percentile` ritorna `float | None` in Task 2; usato in Task 6 con check `pct = compute_percentile(...)` e passato a `classify_extreme(pct)` che accetta `float | None`. ✓ +- `parse_tff_row` campo `lev_funds_net` → usato in Task 6 `r["lev_funds_net"]`. ✓ +- `parse_disagg_row` campo `managed_money_net` → usato in Task 6. ✓ +- `SYMBOL_TO_CFTC_CODE_TFF` usato in Task 4 + Task 6. ✓ +- Body models Pydantic Field constraint `ge=4` matcha edge case spec §8 (lookback ≥ 4). ✓ + +Plan complete and saved to `docs/superpowers/plans/2026-04-27-cot-report.md`. Two execution options: + +**1. Subagent-Driven (recommended)** — dispatch un subagent per task, review tra task, iterazione veloce. + +**2. Inline Execution** — eseguo i task in questa stessa sessione con checkpoint per review. + +Quale approccio?