feat(mcp-macro): fetch_cot_extreme_positioning scanner

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
AdrianoDev
2026-04-29 00:06:52 +02:00
parent 2474445b4c
commit dc285daac8
2 changed files with 103 additions and 1 deletions
+38
View File
@@ -361,3 +361,41 @@ async def test_fetch_cot_disagg_unknown_symbol():
assert out.get("error") == "unknown_symbol"
assert "CL" in out.get("available", [])
@pytest.mark.asyncio
async def test_fetch_cot_extreme_positioning_flags_outliers(monkeypatch):
"""Mock fetch_cot_tff e fetch_cot_disagg per simulare history e ultimo punto."""
from unittest.mock import AsyncMock
from mcp_macro import fetchers as f
# Simula una serie ES dove ultimo lev_funds_net è in basso (extreme_short)
es_rows = [
{"report_date": f"2026-{m:02d}-01", "lev_funds_net": v}
for m, v in [(1, 0), (2, 50), (3, 100), (4, -500)]
]
cl_rows = [
{"report_date": f"2026-{m:02d}-01", "managed_money_net": v}
for m, v in [(1, 100), (2, 200), (3, 300), (4, 1000)]
]
async def fake_tff(symbol, lookback_weeks):
if symbol == "ES":
return {"symbol": "ES", "report_type": "tff", "rows": es_rows}
return {"symbol": symbol, "report_type": "tff", "rows": []}
async def fake_disagg(symbol, lookback_weeks):
if symbol == "CL":
return {"symbol": "CL", "report_type": "disaggregated", "rows": cl_rows}
return {"symbol": symbol, "report_type": "disaggregated", "rows": []}
monkeypatch.setattr(f, "fetch_cot_tff", AsyncMock(side_effect=fake_tff))
monkeypatch.setattr(f, "fetch_cot_disaggregated", AsyncMock(side_effect=fake_disagg))
out = await f.fetch_cot_extreme_positioning(lookback_weeks=4)
assert "extremes" in out
by_sym = {e["symbol"]: e for e in out["extremes"]}
assert by_sym["ES"]["signal"] == "extreme_short"
assert by_sym["ES"]["key_role"] == "lev_funds"
assert by_sym["CL"]["signal"] == "extreme_long"
assert by_sym["CL"]["key_role"] == "managed_money"