AdrianoDev 28e77cddee
docs(plan): COT report implementation plan (8 tasks TDD)
2026-04-28 23:54:05 +02:00

COT Report Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Add 3 MCP tools to mcp-macro for fetching the CFTC Commitment of Traders (COT) report: TFF for equities/financials, Disaggregated for commodities, and an extreme-positioning scanner.

Architecture: Pure-logic helpers (parsing + percentile + classify) tested in isolation; async fetchers built on mcp_common.http.async_client (retry transport already in place); in-memory cache with a 1h TTL reusing the _TREASURY_CACHE pattern; three server endpoints with Pydantic body models, ACL core+observer.

Tech Stack: Python 3.13, FastAPI, httpx (via mcp_common.http.async_client), pytest + pytest-httpx, Pydantic v2.

Reference spec: docs/superpowers/specs/2026-04-27-cot-report-design.md


File Structure

services/mcp-macro/
  src/mcp_macro/
    cot_contracts.py    # NEW — constants SYMBOL_TO_CFTC_CODE_TFF, SYMBOL_TO_CFTC_CODE_DISAGG
    cot.py              # NEW — pure logic: parse_tff_row, parse_disagg_row,
                        #       compute_percentile, classify_extreme
    fetchers.py         # MODIFY — adds fetch_cot_tff, fetch_cot_disaggregated,
                        #          fetch_cot_extreme_positioning + cache
    server.py           # MODIFY — adds 3 endpoints + 3 body models
  tests/
    test_cot.py         # NEW — pure-logic tests (parsing, percentile, classify)
    test_fetchers.py    # MODIFY — integration tests for the COT fetchers via httpx_mock
    test_server_acl.py  # MODIFY — ACL tests for the 3 new endpoints

Decomposition: cot_contracts.py holds constants only (zero logic). cot.py holds pure functions (testable without HTTP). fetchers.py orchestrates HTTP + cache. server.py exposes the endpoints + ACL. Files under tests/ mirror this structure.


Task 1: CFTC contract code constants

Files:

  • Create: services/mcp-macro/src/mcp_macro/cot_contracts.py

  • Step 1: Create the constants module

# services/mcp-macro/src/mcp_macro/cot_contracts.py
"""CFTC constants: ticker → contract_market_code for TFF and Disaggregated.

The CFTC codes (`cftc_contract_market_code`) are public and stable over time.
Reference: https://www.cftc.gov/MarketReports/CommitmentsofTraders/
"""
from __future__ import annotations

CFTC_BASE_URL = "https://publicreporting.cftc.gov/resource"
TFF_DATASET_ID = "gpe5-46if"
DISAGG_DATASET_ID = "72hh-3qpy"

# TFF: equity/financial. Mapping ticker → cftc_contract_market_code.
SYMBOL_TO_CFTC_CODE_TFF: dict[str, str] = {
    "ES": "13874A",   # E-mini S&P 500
    "NQ": "209742",   # E-mini Nasdaq-100
    "RTY": "239742",  # E-mini Russell 2000
    "ZN": "043602",   # 10-Year T-Note
    "ZB": "020601",   # 30-Year T-Bond
    "6E": "099741",   # Euro FX
    "6J": "097741",   # Japanese Yen
    "DX": "098662",   # US Dollar Index
}

# Disaggregated: commodities.
SYMBOL_TO_CFTC_CODE_DISAGG: dict[str, str] = {
    "CL": "067651",   # Crude Oil WTI
    "GC": "088691",   # Gold
    "SI": "084691",   # Silver
    "HG": "085692",   # Copper
    "ZW": "001602",   # Wheat
    "ZC": "002602",   # Corn
    "ZS": "005602",   # Soybeans
}

ALL_TFF_SYMBOLS: list[str] = list(SYMBOL_TO_CFTC_CODE_TFF.keys())
ALL_DISAGG_SYMBOLS: list[str] = list(SYMBOL_TO_CFTC_CODE_DISAGG.keys())
  • Step 2: Commit
git add services/mcp-macro/src/mcp_macro/cot_contracts.py
git commit -m "feat(mcp-macro): add CFTC contract codes constants for COT report"
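As a quick stand-alone sanity check on these constants (not one of the plan's tasks; the maps below are copies of the module above), the two watchlists can be asserted disjoint and the codes well-formed:

```python
# Copies of the Task 1 maps, so the check runs without the package installed.
SYMBOL_TO_CFTC_CODE_TFF = {
    "ES": "13874A", "NQ": "209742", "RTY": "239742", "ZN": "043602",
    "ZB": "020601", "6E": "099741", "6J": "097741", "DX": "098662",
}
SYMBOL_TO_CFTC_CODE_DISAGG = {
    "CL": "067651", "GC": "088691", "SI": "084691", "HG": "085692",
    "ZW": "001602", "ZC": "002602", "ZS": "005602",
}

# No ticker appears in both report types, and every contract market code
# in the watchlist is a 6-character alphanumeric string.
assert not set(SYMBOL_TO_CFTC_CODE_TFF) & set(SYMBOL_TO_CFTC_CODE_DISAGG)
assert all(
    len(c) == 6 and c.isalnum()
    for c in list(SYMBOL_TO_CFTC_CODE_TFF.values())
    + list(SYMBOL_TO_CFTC_CODE_DISAGG.values())
)
```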

Task 2: Pure-logic — compute_percentile + classify_extreme

Files:

  • Create: services/mcp-macro/src/mcp_macro/cot.py (partial: percentile helpers only)

  • Create: services/mcp-macro/tests/test_cot.py

  • Step 1: Write the failing tests

# services/mcp-macro/tests/test_cot.py
from __future__ import annotations

from mcp_macro.cot import classify_extreme, compute_percentile


def test_compute_percentile_basic():
    history = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
    assert compute_percentile(50, history) == 50.0
    assert compute_percentile(10, history) == 10.0
    assert compute_percentile(100, history) == 100.0


def test_compute_percentile_value_below_min():
    history = [10, 20, 30]
    assert compute_percentile(5, history) == 0.0


def test_compute_percentile_value_above_max():
    history = [10, 20, 30]
    assert compute_percentile(40, history) == 100.0


def test_compute_percentile_empty_history():
    assert compute_percentile(50, []) is None


def test_classify_extreme_below_threshold():
    assert classify_extreme(3.0) == "extreme_short"
    assert classify_extreme(5.0) == "extreme_short"  # boundary inclusive


def test_classify_extreme_above_threshold():
    assert classify_extreme(96.0) == "extreme_long"
    assert classify_extreme(95.0) == "extreme_long"  # boundary inclusive


def test_classify_extreme_neutral():
    assert classify_extreme(50.0) == "neutral"
    assert classify_extreme(94.99) == "neutral"
    assert classify_extreme(5.01) == "neutral"


def test_classify_extreme_none_input():
    assert classify_extreme(None) == "neutral"
  • Step 2: Run, verify failure

Run: uv run pytest services/mcp-macro/tests/test_cot.py -v Expected: ImportError (module mcp_macro.cot does not exist yet).

  • Step 3: Implement cot.py with the 2 helpers
# services/mcp-macro/src/mcp_macro/cot.py
"""Pure-logic helpers for COT report parsing and analytics.

No HTTP here — fetch orchestration lives in fetchers.py. Everything is
testable in isolation.
"""
from __future__ import annotations

from typing import Literal

ExtremeSignal = Literal["extreme_short", "extreme_long", "neutral"]


def compute_percentile(value: float, history: list[float]) -> float | None:
    """Percentile of `value` relative to `history` (0-100, inclusive).

    Returns None for an empty history. Clipped to [0, 100] if value is out of range.
    """
    if not history:
        return None
    n = len(history)
    below_or_eq = sum(1 for h in history if h <= value)
    pct = 100.0 * below_or_eq / n
    return max(0.0, min(100.0, pct))


def classify_extreme(percentile: float | None, threshold: float = 5.0) -> ExtremeSignal:
    """Classify a percentile as extreme short/long or neutral.

    threshold defaults to 5 → flags ≤ 5 as short, ≥ 100-5=95 as long.
    """
    if percentile is None:
        return "neutral"
    if percentile <= threshold:
        return "extreme_short"
    if percentile >= 100.0 - threshold:
        return "extreme_long"
    return "neutral"
  • Step 4: Run, verify pass

Run: uv run pytest services/mcp-macro/tests/test_cot.py -v Expected: 8 passed.

  • Step 5: Commit
git add services/mcp-macro/src/mcp_macro/cot.py services/mcp-macro/tests/test_cot.py
git commit -m "feat(mcp-macro): add compute_percentile + classify_extreme pure helpers"
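To see the two helpers combine on a synthetic net-positioning series (a self-contained copy of the logic above, for illustration only):

```python
def compute_percentile(value, history):
    # Fraction of history at or below `value`, scaled to 0-100.
    if not history:
        return None
    return 100.0 * sum(1 for h in history if h <= value) / len(history)


def classify_extreme(percentile, threshold=5.0):
    if percentile is None:
        return "neutral"
    if percentile <= threshold:
        return "extreme_short"
    if percentile >= 100.0 - threshold:
        return "extreme_long"
    return "neutral"


# 20 weekly net positions; the latest value is the series minimum, so it
# sits at percentile 100/20 = 5.0 → flagged extreme_short.
history = [50 * i for i in range(19)] + [-500]
pct = compute_percentile(history[-1], history)
print(pct, classify_extreme(pct))  # → 5.0 extreme_short
```

Note that with this `<=`-based definition the minimum of an n-point series sits at percentile 100/n, so a series needs at least 20 points before its minimum can clear the ≤5 threshold.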

Task 3: Pure-logic — parse_tff_row + parse_disagg_row

Files:

  • Modify: services/mcp-macro/src/mcp_macro/cot.py (adds the parsers)

  • Modify: services/mcp-macro/tests/test_cot.py

  • Step 1: Add the failing tests

Append to services/mcp-macro/tests/test_cot.py:

from mcp_macro.cot import parse_disagg_row, parse_tff_row


# Real Socrata payload shape (subset of relevant fields, arbitrary test values)
TFF_SOCRATA_ROW = {
    "report_date_as_yyyy_mm_dd": "2026-04-22T00:00:00.000",
    "dealer_positions_long_all": "12345",
    "dealer_positions_short_all": "23456",
    "asset_mgr_positions_long": "654321",
    "asset_mgr_positions_short": "200000",
    "lev_money_positions_long": "100000",
    "lev_money_positions_short": "350000",
    "other_rept_positions_long": "50000",
    "other_rept_positions_short": "50000",
    "open_interest_all": "2500000",
}

DISAGG_SOCRATA_ROW = {
    "report_date_as_yyyy_mm_dd": "2026-04-22T00:00:00.000",
    "prod_merc_positions_long_all": "100000",
    "prod_merc_positions_short_all": "300000",
    "swap_positions_long_all": "50000",
    "swap_positions_short_all": "60000",
    "m_money_positions_long_all": "200000",
    "m_money_positions_short_all": "80000",
    "other_rept_positions_long_all": "10000",
    "other_rept_positions_short_all": "10000",
    "open_interest_all": "1500000",
}


def test_parse_tff_row_extracts_all_fields():
    row = parse_tff_row(TFF_SOCRATA_ROW)
    assert row["report_date"] == "2026-04-22"
    assert row["dealer_long"] == 12345
    assert row["dealer_short"] == 23456
    assert row["dealer_net"] == 12345 - 23456
    assert row["asset_mgr_long"] == 654321
    assert row["asset_mgr_net"] == 654321 - 200000
    assert row["lev_funds_long"] == 100000
    assert row["lev_funds_short"] == 350000
    assert row["lev_funds_net"] == 100000 - 350000
    assert row["other_long"] == 50000
    assert row["other_net"] == 0
    assert row["open_interest"] == 2500000


def test_parse_tff_row_handles_missing_field():
    payload = {"report_date_as_yyyy_mm_dd": "2026-04-22T00:00:00.000"}
    row = parse_tff_row(payload)
    assert row["report_date"] == "2026-04-22"
    assert row["dealer_long"] == 0
    assert row["dealer_net"] == 0


def test_parse_disagg_row_extracts_all_fields():
    row = parse_disagg_row(DISAGG_SOCRATA_ROW)
    assert row["report_date"] == "2026-04-22"
    assert row["producer_long"] == 100000
    assert row["producer_short"] == 300000
    assert row["producer_net"] == -200000
    assert row["swap_long"] == 50000
    assert row["swap_net"] == -10000
    assert row["managed_money_long"] == 200000
    assert row["managed_money_short"] == 80000
    assert row["managed_money_net"] == 120000
    assert row["other_long"] == 10000
    assert row["other_net"] == 0
    assert row["open_interest"] == 1500000
  • Step 2: Run, verify failure

Run: uv run pytest services/mcp-macro/tests/test_cot.py -v Expected: ImportError or AttributeError on parse_tff_row/parse_disagg_row.

  • Step 3: Add the parsers to cot.py

Append to services/mcp-macro/src/mcp_macro/cot.py:

def _to_int(v) -> int:
    try:
        return int(float(v))
    except (TypeError, ValueError):
        return 0


def _date_only(s: str) -> str:
    """Extract 'YYYY-MM-DD' from an ISO date with or without a timestamp."""
    if not s:
        return ""
    return s.split("T", 1)[0]


def parse_tff_row(raw: dict) -> dict:
    """Map a Socrata TFF row to the API output format."""
    dl = _to_int(raw.get("dealer_positions_long_all"))
    ds = _to_int(raw.get("dealer_positions_short_all"))
    al = _to_int(raw.get("asset_mgr_positions_long"))
    as_ = _to_int(raw.get("asset_mgr_positions_short"))
    ll = _to_int(raw.get("lev_money_positions_long"))
    ls = _to_int(raw.get("lev_money_positions_short"))
    ol = _to_int(raw.get("other_rept_positions_long"))
    os_ = _to_int(raw.get("other_rept_positions_short"))
    return {
        "report_date": _date_only(raw.get("report_date_as_yyyy_mm_dd", "")),
        "dealer_long": dl, "dealer_short": ds, "dealer_net": dl - ds,
        "asset_mgr_long": al, "asset_mgr_short": as_, "asset_mgr_net": al - as_,
        "lev_funds_long": ll, "lev_funds_short": ls, "lev_funds_net": ll - ls,
        "other_long": ol, "other_short": os_, "other_net": ol - os_,
        "open_interest": _to_int(raw.get("open_interest_all")),
    }


def parse_disagg_row(raw: dict) -> dict:
    """Map a Socrata Disaggregated F&O combined row to the API output format."""
    pl = _to_int(raw.get("prod_merc_positions_long_all"))
    ps = _to_int(raw.get("prod_merc_positions_short_all"))
    sl = _to_int(raw.get("swap_positions_long_all"))
    ss = _to_int(raw.get("swap_positions_short_all"))
    ml = _to_int(raw.get("m_money_positions_long_all"))
    ms = _to_int(raw.get("m_money_positions_short_all"))
    ol = _to_int(raw.get("other_rept_positions_long_all"))
    os_ = _to_int(raw.get("other_rept_positions_short_all"))
    return {
        "report_date": _date_only(raw.get("report_date_as_yyyy_mm_dd", "")),
        "producer_long": pl, "producer_short": ps, "producer_net": pl - ps,
        "swap_long": sl, "swap_short": ss, "swap_net": sl - ss,
        "managed_money_long": ml, "managed_money_short": ms, "managed_money_net": ml - ms,
        "other_long": ol, "other_short": os_, "other_net": ol - os_,
        "open_interest": _to_int(raw.get("open_interest_all")),
    }
  • Step 4: Run, verify pass

Run: uv run pytest services/mcp-macro/tests/test_cot.py -v Expected: 11 passed (8 + 3 new).

  • Step 5: Commit
git add services/mcp-macro/src/mcp_macro/cot.py services/mcp-macro/tests/test_cot.py
git commit -m "feat(mcp-macro): add parse_tff_row + parse_disagg_row Socrata mappers"

Task 4: fetch_cot_tff async fetcher

Files:

  • Modify: services/mcp-macro/src/mcp_macro/fetchers.py

  • Modify: services/mcp-macro/tests/test_fetchers.py

  • Step 1: Add the failing tests

Append to services/mcp-macro/tests/test_fetchers.py:

@pytest.mark.asyncio
async def test_fetch_cot_tff_happy_path(httpx_mock: pytest_httpx.HTTPXMock):
    from mcp_macro.fetchers import fetch_cot_tff
    httpx_mock.add_response(
        url=httpx.URL(
            "https://publicreporting.cftc.gov/resource/gpe5-46if.json",
            params={
                "cftc_contract_market_code": "13874A",
                "$order": "report_date_as_yyyy_mm_dd DESC",
                "$limit": "52",
            },
        ),
        json=[
            {
                "report_date_as_yyyy_mm_dd": "2026-04-22T00:00:00.000",
                "dealer_positions_long_all": "12345",
                "dealer_positions_short_all": "23456",
                "asset_mgr_positions_long": "654321",
                "asset_mgr_positions_short": "200000",
                "lev_money_positions_long": "100000",
                "lev_money_positions_short": "350000",
                "other_rept_positions_long": "50000",
                "other_rept_positions_short": "50000",
                "open_interest_all": "2500000",
            },
            {
                "report_date_as_yyyy_mm_dd": "2026-04-15T00:00:00.000",
                "dealer_positions_long_all": "11000",
                "dealer_positions_short_all": "22000",
                "asset_mgr_positions_long": "640000",
                "asset_mgr_positions_short": "210000",
                "lev_money_positions_long": "110000",
                "lev_money_positions_short": "320000",
                "other_rept_positions_long": "48000",
                "other_rept_positions_short": "52000",
                "open_interest_all": "2480000",
            },
        ],
    )
    out = await fetch_cot_tff("ES", lookback_weeks=52)
    assert out["symbol"] == "ES"
    assert out["report_type"] == "tff"
    assert len(out["rows"]) == 2
    # Ordering ASC by date (oldest first)
    assert out["rows"][0]["report_date"] == "2026-04-15"
    assert out["rows"][1]["report_date"] == "2026-04-22"
    assert out["rows"][1]["lev_funds_net"] == -250000
    assert "data_timestamp" in out


@pytest.mark.asyncio
async def test_fetch_cot_tff_unknown_symbol():
    from mcp_macro.fetchers import fetch_cot_tff
    out = await fetch_cot_tff("INVALID", lookback_weeks=52)
    assert out.get("error") == "unknown_symbol"
    assert "ES" in out.get("available", [])
  • Step 2: Run, verify failure

Run: uv run pytest services/mcp-macro/tests/test_fetchers.py -v -k cot_tff Expected: ImportError or AttributeError on fetch_cot_tff.

  • Step 3: Implement fetch_cot_tff

Add to services/mcp-macro/src/mcp_macro/fetchers.py (at the top, after the other imports):

from mcp_macro.cot import parse_disagg_row, parse_tff_row
from mcp_macro.cot_contracts import (
    ALL_DISAGG_SYMBOLS,
    ALL_TFF_SYMBOLS,
    CFTC_BASE_URL,
    DISAGG_DATASET_ID,
    SYMBOL_TO_CFTC_CODE_DISAGG,
    SYMBOL_TO_CFTC_CODE_TFF,
    TFF_DATASET_ID,
)

Then (at the bottom of the file, before other new functions):

_COT_TTL = 3600.0  # 1h
_COT_CACHE: dict[tuple[str, str, int], dict[str, Any]] = {}
_COT_CACHE_TS: dict[tuple[str, str, int], float] = {}


async def fetch_cot_tff(symbol: str, lookback_weeks: int = 52) -> dict[str, Any]:
    """Fetch the COT TFF report for an equity/financial symbol. Rows returned ASC by date."""
    import time

    symbol = symbol.upper()
    if symbol not in SYMBOL_TO_CFTC_CODE_TFF:
        return {"error": "unknown_symbol", "available": ALL_TFF_SYMBOLS}

    key = (symbol, "tff", lookback_weeks)
    now = time.monotonic()
    if key in _COT_CACHE and (now - _COT_CACHE_TS[key]) < _COT_TTL:
        return _COT_CACHE[key]

    code = SYMBOL_TO_CFTC_CODE_TFF[symbol]
    url = f"{CFTC_BASE_URL}/{TFF_DATASET_ID}.json"
    async with async_client(timeout=10.0) as client:
        resp = await client.get(
            url,
            params={
                "cftc_contract_market_code": code,
                "$order": "report_date_as_yyyy_mm_dd DESC",
                "$limit": str(lookback_weeks),
            },
        )
    if resp.status_code != 200:
        return {"symbol": symbol, "report_type": "tff", "rows": [], "error": "cftc_unavailable"}
    raw_rows = resp.json() or []
    parsed = [parse_tff_row(r) for r in raw_rows]
    parsed.sort(key=lambda r: r["report_date"])  # ASC by date
    out = {
        "symbol": symbol,
        "report_type": "tff",
        "rows": parsed,
        "data_timestamp": datetime.now(UTC).isoformat(),
    }
    _COT_CACHE[key] = out
    _COT_CACHE_TS[key] = now
    return out
  • Step 4: Run, verify pass

Run: uv run pytest services/mcp-macro/tests/test_fetchers.py -v -k cot_tff Expected: 2 passed.

  • Step 5: Commit
git add services/mcp-macro/src/mcp_macro/fetchers.py services/mcp-macro/tests/test_fetchers.py
git commit -m "feat(mcp-macro): fetch_cot_tff async fetcher with cache"
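For reference, the SoQL parameters the fetcher sends serialize to a URL like the following (stdlib sketch only; the real request goes through mcp_common.http.async_client/httpx, which percent-encodes the $-prefixed Socrata parameters the same way):

```python
from urllib.parse import urlencode

base = "https://publicreporting.cftc.gov/resource/gpe5-46if.json"
params = {
    "cftc_contract_market_code": "13874A",   # ES
    "$order": "report_date_as_yyyy_mm_dd DESC",
    "$limit": "52",                          # one row per weekly report
}
url = f"{base}?{urlencode(params)}"
print(url)
```

`$limit` equals lookback_weeks because the CFTC publishes one row per weekly report; ordering DESC then re-sorting ASC in the fetcher guarantees the newest rows are the ones fetched.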

Task 5: fetch_cot_disaggregated async fetcher

Files:

  • Modify: services/mcp-macro/src/mcp_macro/fetchers.py

  • Modify: services/mcp-macro/tests/test_fetchers.py

  • Step 1: Add the failing tests

Append to services/mcp-macro/tests/test_fetchers.py:

@pytest.mark.asyncio
async def test_fetch_cot_disagg_happy_path(httpx_mock: pytest_httpx.HTTPXMock):
    from mcp_macro.fetchers import fetch_cot_disaggregated
    httpx_mock.add_response(
        url=httpx.URL(
            "https://publicreporting.cftc.gov/resource/72hh-3qpy.json",
            params={
                "cftc_contract_market_code": "067651",
                "$order": "report_date_as_yyyy_mm_dd DESC",
                "$limit": "52",
            },
        ),
        json=[
            {
                "report_date_as_yyyy_mm_dd": "2026-04-22T00:00:00.000",
                "prod_merc_positions_long_all": "100000",
                "prod_merc_positions_short_all": "300000",
                "swap_positions_long_all": "50000",
                "swap_positions_short_all": "60000",
                "m_money_positions_long_all": "200000",
                "m_money_positions_short_all": "80000",
                "other_rept_positions_long_all": "10000",
                "other_rept_positions_short_all": "10000",
                "open_interest_all": "1500000",
            },
        ],
    )
    out = await fetch_cot_disaggregated("CL", lookback_weeks=52)
    assert out["symbol"] == "CL"
    assert out["report_type"] == "disaggregated"
    assert len(out["rows"]) == 1
    assert out["rows"][0]["managed_money_net"] == 120000
    assert out["rows"][0]["producer_net"] == -200000


@pytest.mark.asyncio
async def test_fetch_cot_disagg_unknown_symbol():
    from mcp_macro.fetchers import fetch_cot_disaggregated
    out = await fetch_cot_disaggregated("XYZ", lookback_weeks=52)
    assert out.get("error") == "unknown_symbol"
    assert "CL" in out.get("available", [])
  • Step 2: Run, verify failure

Run: uv run pytest services/mcp-macro/tests/test_fetchers.py -v -k cot_disagg Expected: AttributeError on fetch_cot_disaggregated.

  • Step 3: Implement fetch_cot_disaggregated

Append to services/mcp-macro/src/mcp_macro/fetchers.py (after fetch_cot_tff):

async def fetch_cot_disaggregated(symbol: str, lookback_weeks: int = 52) -> dict[str, Any]:
    """Fetch the COT Disaggregated report for a commodity symbol. Rows returned ASC by date."""
    import time

    symbol = symbol.upper()
    if symbol not in SYMBOL_TO_CFTC_CODE_DISAGG:
        return {"error": "unknown_symbol", "available": ALL_DISAGG_SYMBOLS}

    key = (symbol, "disaggregated", lookback_weeks)
    now = time.monotonic()
    if key in _COT_CACHE and (now - _COT_CACHE_TS[key]) < _COT_TTL:
        return _COT_CACHE[key]

    code = SYMBOL_TO_CFTC_CODE_DISAGG[symbol]
    url = f"{CFTC_BASE_URL}/{DISAGG_DATASET_ID}.json"
    async with async_client(timeout=10.0) as client:
        resp = await client.get(
            url,
            params={
                "cftc_contract_market_code": code,
                "$order": "report_date_as_yyyy_mm_dd DESC",
                "$limit": str(lookback_weeks),
            },
        )
    if resp.status_code != 200:
        return {"symbol": symbol, "report_type": "disaggregated", "rows": [], "error": "cftc_unavailable"}
    raw_rows = resp.json() or []
    parsed = [parse_disagg_row(r) for r in raw_rows]
    parsed.sort(key=lambda r: r["report_date"])
    out = {
        "symbol": symbol,
        "report_type": "disaggregated",
        "rows": parsed,
        "data_timestamp": datetime.now(UTC).isoformat(),
    }
    _COT_CACHE[key] = out
    _COT_CACHE_TS[key] = now
    return out
  • Step 4: Run, verify pass

Run: uv run pytest services/mcp-macro/tests/test_fetchers.py -v -k cot_disagg Expected: 2 passed.

  • Step 5: Commit
git add services/mcp-macro/src/mcp_macro/fetchers.py services/mcp-macro/tests/test_fetchers.py
git commit -m "feat(mcp-macro): fetch_cot_disaggregated async fetcher with cache"

Task 6: fetch_cot_extreme_positioning scanner

Files:

  • Modify: services/mcp-macro/src/mcp_macro/fetchers.py

  • Modify: services/mcp-macro/tests/test_fetchers.py

  • Step 1: Add the failing tests

Append to services/mcp-macro/tests/test_fetchers.py:

@pytest.mark.asyncio
async def test_fetch_cot_extreme_positioning_flags_outliers(monkeypatch):
    """Mock fetch_cot_tff and fetch_cot_disaggregated to simulate history plus latest point."""
    from unittest.mock import AsyncMock
    from mcp_macro import fetchers as f

    # Simulate an ES series whose latest lev_funds_net is the series minimum.
    # With compute_percentile counting values <= current, 20 points are needed
    # so the minimum lands at percentile 100/20 = 5.0, inside the ≤5 threshold
    # (with only 4 points the minimum would sit at 25.0 → neutral).
    es_rows = [
        {"report_date": f"2026-01-{d:02d}", "lev_funds_net": v}
        for d, v in enumerate([50 * i for i in range(19)] + [-500], start=1)
    ]
    cl_rows = [
        {"report_date": f"2026-{m:02d}-01", "managed_money_net": v}
        for m, v in [(1, 100), (2, 200), (3, 300), (4, 1000)]
    ]

    async def fake_tff(symbol, lookback_weeks):
        if symbol == "ES":
            return {"symbol": "ES", "report_type": "tff", "rows": es_rows}
        return {"symbol": symbol, "report_type": "tff", "rows": []}

    async def fake_disagg(symbol, lookback_weeks):
        if symbol == "CL":
            return {"symbol": "CL", "report_type": "disaggregated", "rows": cl_rows}
        return {"symbol": symbol, "report_type": "disaggregated", "rows": []}

    monkeypatch.setattr(f, "fetch_cot_tff", AsyncMock(side_effect=fake_tff))
    monkeypatch.setattr(f, "fetch_cot_disaggregated", AsyncMock(side_effect=fake_disagg))

    out = await f.fetch_cot_extreme_positioning(lookback_weeks=4)
    assert "extremes" in out
    by_sym = {e["symbol"]: e for e in out["extremes"]}
    assert by_sym["ES"]["signal"] == "extreme_short"
    assert by_sym["ES"]["key_role"] == "lev_funds"
    assert by_sym["CL"]["signal"] == "extreme_long"
    assert by_sym["CL"]["key_role"] == "managed_money"
  • Step 2: Run, verify failure

Run: uv run pytest services/mcp-macro/tests/test_fetchers.py -v -k extreme_positioning Expected: AttributeError on fetch_cot_extreme_positioning.

  • Step 3: Implement the scanner

Append to services/mcp-macro/src/mcp_macro/fetchers.py. Also extend the Task 4 import at the top of the file to pull in the analytics helpers (appending another module-level import mid-file would trip ruff's E402):

from mcp_macro.cot import classify_extreme, compute_percentile, parse_disagg_row, parse_tff_row


async def fetch_cot_extreme_positioning(lookback_weeks: int = 156) -> dict[str, Any]:
    """Extreme-positioning scanner (percentile ≤5 or ≥95) over the watchlist symbols.

    TFF → key_role = lev_funds (lev_funds_net).
    Disaggregated → key_role = managed_money (managed_money_net).
    """
    import asyncio

    tff_tasks = [fetch_cot_tff(s, lookback_weeks) for s in ALL_TFF_SYMBOLS]
    disagg_tasks = [fetch_cot_disaggregated(s, lookback_weeks) for s in ALL_DISAGG_SYMBOLS]
    tff_results, disagg_results = await asyncio.gather(
        asyncio.gather(*tff_tasks, return_exceptions=True),
        asyncio.gather(*disagg_tasks, return_exceptions=True),
    )

    extremes: list[dict[str, Any]] = []

    for res in tff_results:
        if isinstance(res, BaseException) or not isinstance(res, dict):
            continue
        rows = res.get("rows") or []
        if len(rows) < 4:
            continue
        history = [r["lev_funds_net"] for r in rows]
        current = history[-1]
        pct = compute_percentile(current, history)
        extremes.append({
            "symbol": res["symbol"],
            "report_type": "tff",
            "key_role": "lev_funds",
            "current_net": current,
            "percentile": pct,
            "signal": classify_extreme(pct),
            "report_date": rows[-1]["report_date"],
        })

    for res in disagg_results:
        if isinstance(res, BaseException) or not isinstance(res, dict):
            continue
        rows = res.get("rows") or []
        if len(rows) < 4:
            continue
        history = [r["managed_money_net"] for r in rows]
        current = history[-1]
        pct = compute_percentile(current, history)
        extremes.append({
            "symbol": res["symbol"],
            "report_type": "disaggregated",
            "key_role": "managed_money",
            "current_net": current,
            "percentile": pct,
            "signal": classify_extreme(pct),
            "report_date": rows[-1]["report_date"],
        })

    return {
        "lookback_weeks": lookback_weeks,
        "extremes": extremes,
        "data_timestamp": datetime.now(UTC).isoformat(),
    }
  • Step 4: Run, verify pass

Run: uv run pytest services/mcp-macro/tests/test_fetchers.py -v -k extreme_positioning Expected: 1 passed.

  • Step 5: Commit
git add services/mcp-macro/src/mcp_macro/fetchers.py services/mcp-macro/tests/test_fetchers.py
git commit -m "feat(mcp-macro): fetch_cot_extreme_positioning scanner"
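The scanner's nested asyncio.gather shape (one inner gather per report type, with return_exceptions=True so one failing symbol cannot sink the whole scan) can be exercised with stub coroutines:

```python
import asyncio


async def fetch_stub(symbol: str) -> dict:
    # Stand-in for fetch_cot_tff / fetch_cot_disaggregated.
    if symbol == "BAD":
        raise RuntimeError("cftc down")
    return {"symbol": symbol, "rows": [1, 2, 3, 4]}


async def scan() -> list[str]:
    tff = [fetch_stub(s) for s in ("ES", "BAD")]
    disagg = [fetch_stub(s) for s in ("CL",)]
    # return_exceptions=True replaces a raised error with the exception
    # object in the result list instead of propagating it.
    tff_res, disagg_res = await asyncio.gather(
        asyncio.gather(*tff, return_exceptions=True),
        asyncio.gather(*disagg, return_exceptions=True),
    )
    return [
        r["symbol"]
        for r in (*tff_res, *disagg_res)
        if not isinstance(r, BaseException)
    ]


print(asyncio.run(scan()))  # → ['ES', 'CL']
```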

Task 7: Server endpoints + body models

Files:

  • Modify: services/mcp-macro/src/mcp_macro/server.py

  • Modify: services/mcp-macro/tests/test_server_acl.py

  • Step 1: Add the failing ACL tests

Append to services/mcp-macro/tests/test_server_acl.py (assuming the http fixture exists and patches fetch_treasury_yields):

def test_get_cot_tff_core_ok(http):
    from unittest.mock import AsyncMock, patch
    with patch(
        "mcp_macro.server.fetch_cot_tff",
        new=AsyncMock(return_value={"symbol": "ES", "rows": []}),
    ):
        r = http.post(
            "/tools/get_cot_tff",
            headers={"Authorization": "Bearer ct"},
            json={"symbol": "ES"},
        )
    assert r.status_code == 200
    assert r.json()["symbol"] == "ES"


def test_get_cot_tff_observer_ok(http):
    from unittest.mock import AsyncMock, patch
    with patch(
        "mcp_macro.server.fetch_cot_tff",
        new=AsyncMock(return_value={"symbol": "ES", "rows": []}),
    ):
        r = http.post(
            "/tools/get_cot_tff",
            headers={"Authorization": "Bearer ot"},
            json={"symbol": "ES"},
        )
    assert r.status_code == 200


def test_get_cot_tff_no_auth_401(http):
    r = http.post("/tools/get_cot_tff", json={"symbol": "ES"})
    assert r.status_code == 401


def test_get_cot_disagg_observer_ok(http):
    from unittest.mock import AsyncMock, patch
    with patch(
        "mcp_macro.server.fetch_cot_disaggregated",
        new=AsyncMock(return_value={"symbol": "CL", "rows": []}),
    ):
        r = http.post(
            "/tools/get_cot_disaggregated",
            headers={"Authorization": "Bearer ot"},
            json={"symbol": "CL"},
        )
    assert r.status_code == 200


def test_get_cot_disagg_no_auth_401(http):
    r = http.post("/tools/get_cot_disaggregated", json={"symbol": "CL"})
    assert r.status_code == 401


def test_get_cot_extreme_positioning_ok(http):
    from unittest.mock import AsyncMock, patch
    with patch(
        "mcp_macro.server.fetch_cot_extreme_positioning",
        new=AsyncMock(return_value={"extremes": []}),
    ):
        r = http.post(
            "/tools/get_cot_extreme_positioning",
            headers={"Authorization": "Bearer ot"},
            json={},
        )
    assert r.status_code == 200


def test_get_cot_extreme_positioning_lookback_too_short(http):
    """Pydantic validation: lookback_weeks < 4 → 422."""
    r = http.post(
        "/tools/get_cot_extreme_positioning",
        headers={"Authorization": "Bearer ct"},
        json={"lookback_weeks": 2},
    )
    assert r.status_code == 422

(If the existing test_server_acl.py uses different auth headers, e.g. Bearer core-tok, adapt the tokens in these 7 tests to match the existing fixture.)

  • Step 2: Run, verify failure

Run: uv run pytest services/mcp-macro/tests/test_server_acl.py -v -k cot Expected: 404 on the POSTs (the endpoints do not exist yet).

  • Step 3: Add imports + body models to server.py

Edit services/mcp-macro/src/mcp_macro/server.py:

Replace the fetcher import block with:

from mcp_macro.fetchers import (
    fetch_asset_price,
    fetch_breakeven_inflation,
    fetch_cot_disaggregated,
    fetch_cot_extreme_positioning,
    fetch_cot_tff,
    fetch_economic_indicators,
    fetch_equity_futures,
    fetch_macro_calendar,
    fetch_market_overview,
    fetch_treasury_yields,
    fetch_yield_curve_slope,
)

Replace the pydantic import with:

from pydantic import BaseModel, Field

Add (after class GetBreakevenInflationReq(BaseModel): or equivalent, before the _check function):

class GetCotTffReq(BaseModel):
    symbol: str
    lookback_weeks: int = Field(default=52, ge=4, le=520)


class GetCotDisaggregatedReq(BaseModel):
    symbol: str
    lookback_weeks: int = Field(default=52, ge=4, le=520)


class GetCotExtremeReq(BaseModel):
    lookback_weeks: int = Field(default=156, ge=4, le=520)
  • Step 4: Add the endpoints

Find the section with t_get_breakeven_inflation, or the last @app.post(...) with tags=["reads"], and add immediately after:

    @app.post("/tools/get_cot_tff", tags=["reads"])
    async def t_get_cot_tff(
        body: GetCotTffReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await fetch_cot_tff(body.symbol, body.lookback_weeks)

    @app.post("/tools/get_cot_disaggregated", tags=["reads"])
    async def t_get_cot_disaggregated(
        body: GetCotDisaggregatedReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await fetch_cot_disaggregated(body.symbol, body.lookback_weeks)

    @app.post("/tools/get_cot_extreme_positioning", tags=["reads"])
    async def t_get_cot_extreme(
        body: GetCotExtremeReq, principal: Principal = Depends(require_principal)
    ):
        _check(principal, core=True, observer=True)
        return await fetch_cot_extreme_positioning(body.lookback_weeks)
  • Step 5: Add the 3 tools to the MCP registry

Find mount_mcp_endpoint(...) and, inside tools=[...], add (after get_breakeven_inflation):

            {"name": "get_cot_tff", "description": "COT TFF report (CFTC) for equities/financials: ES/NQ/RTY/ZN/ZB/6E/6J/DX. Roles: dealer, asset manager, leveraged funds, other."},
            {"name": "get_cot_disaggregated", "description": "COT Disaggregated report (CFTC) for commodities: CL/GC/SI/HG/ZW/ZC/ZS. Roles: producer/merchant, swap dealer, managed money, other."},
            {"name": "get_cot_extreme_positioning", "description": "Extreme-positioning scanner (percentile ≤5 or ≥95) over the watchlist symbols."},
  • Step 6: Run, verify pass

Run: uv run pytest services/mcp-macro/tests/test_server_acl.py -v -k cot Expected: 7 passed.

  • Step 7: Run the full suite for regressions

Run: uv run pytest services/ Expected: all green (existing + new).

  • Step 8: Commit
git add services/mcp-macro/src/mcp_macro/server.py services/mcp-macro/tests/test_server_acl.py
git commit -m "feat(mcp-macro): expose COT report tools via MCP endpoint"

Task 8: Documentation update

Files:

  • Modify: README.md

  • Step 1: Add the 3 COT tools to the Macro section of the README

In README.md, find the line starting with "Macro: Treasury yields..." and replace the list to include the COT tools (the "Nuovi" line already exists; add the COT tools to the same enumeration):

### Macro
Treasury yields, FRED indicators, equity futures, asset prices, calendar.
**Nuovi**: `get_yield_curve_slope` (slope 2y10y/5y30y + butterfly + regime),
`get_breakeven_inflation` (T5YIE/T10YIE/T5YIFR), `get_cot_tff` (TFF report
CFTC equity/financial), `get_cot_disaggregated` (Disaggregated report
CFTC commodities), `get_cot_extreme_positioning` (scanner percentile
≤5/≥95 su watchlist).
  • Step 2: Commit
git add README.md
git commit -m "docs: add COT report tools to README macro section"

Self-Review

1. Spec coverage:

  • §1 Motivation → context, no task required.
  • §2 TFF + Disaggregated decision → reflected in the Task 1 constants.
  • §3 Data sources → CFTC_BASE_URL, *_DATASET_ID in Task 1.
  • §4 Symbol watchlist → SYMBOL_TO_CFTC_CODE_* in Task 1.
  • §5.1 get_cot_tff → Task 4 fetcher + Task 7 endpoint.
  • §5.2 get_cot_disaggregated → Task 5 fetcher + Task 7 endpoint.
  • §5.3 get_cot_extreme_positioning → Task 6 fetcher + Task 7 endpoint.
  • §6 Architecture → File Structure section + Tasks 1/2/3.
  • §7 Cache → _COT_CACHE, TTL 3600 in Task 4.
  • §8 Edge cases → unknown_symbol (Tasks 4/5), non-200 → cftc_unavailable (Tasks 4/5), lookback_weeks ≥ 4 (Task 7 Pydantic Field).
  • §9 Test plan → covered in Task 2 (pure logic), Task 3 (parsers), Tasks 4-6 (httpx_mock), Task 7 (ACL).
  • §10 Out of scope → nothing to implement, OK.

2. Placeholder scan: no TBD/TODO. All code is concrete.

3. Type consistency:

  • compute_percentile returns float | None in Task 2; used in Task 6 as pct = compute_percentile(...) and passed to classify_extreme(pct), which accepts float | None. ✓
  • parse_tff_row field lev_funds_net → used in Task 6 as r["lev_funds_net"]. ✓
  • parse_disagg_row field managed_money_net → used in Task 6. ✓
  • SYMBOL_TO_CFTC_CODE_TFF used in Task 4 + Task 6. ✓
  • Pydantic Field constraint ge=4 on the body models matches the spec §8 edge case (lookback ≥ 4). ✓

Plan complete and saved to docs/superpowers/plans/2026-04-27-cot-report.md. Two execution options:

1. Subagent-Driven (recommended) — dispatch one subagent per task, review between tasks, fast iteration.

2. Inline Execution — execute the tasks in this same session, with checkpoints for review.

Which approach?