From 993326136b86a86b041935909f019fb62bfa478c Mon Sep 17 00:00:00 2001 From: AdrianoDev Date: Thu, 30 Apr 2026 18:16:26 +0200 Subject: [PATCH] test(V2): migrazione test common/ MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Copiati e aggiornati i test da services/common/tests/ a tests/unit/common/. Import aggiornati da mcp_common a cerbero_mcp.common. Eliminati test di funzionalità V1-only (app_factory, environment, auth/Principal, server_base). Refactored test_audit.py (principal→actor str) e test_mcp_bridge.py (TokenStore→valid_tokens set). 71/71 test passano. Co-Authored-By: Claude Sonnet 4.6 --- tests/unit/common/__init__.py | 0 tests/unit/common/test_audit.py | 151 +++++++++++++ tests/unit/common/test_http.py | 72 +++++++ tests/unit/common/test_indicators.py | 260 +++++++++++++++++++++++ tests/unit/common/test_logging.py | 77 +++++++ tests/unit/common/test_mcp_bridge.py | 112 ++++++++++ tests/unit/common/test_microstructure.py | 59 +++++ tests/unit/common/test_options.py | 144 +++++++++++++ tests/unit/common/test_stats.py | 51 +++++ 9 files changed, 926 insertions(+) create mode 100644 tests/unit/common/__init__.py create mode 100644 tests/unit/common/test_audit.py create mode 100644 tests/unit/common/test_http.py create mode 100644 tests/unit/common/test_indicators.py create mode 100644 tests/unit/common/test_logging.py create mode 100644 tests/unit/common/test_mcp_bridge.py create mode 100644 tests/unit/common/test_microstructure.py create mode 100644 tests/unit/common/test_options.py create mode 100644 tests/unit/common/test_stats.py diff --git a/tests/unit/common/__init__.py b/tests/unit/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/common/test_audit.py b/tests/unit/common/test_audit.py new file mode 100644 index 0000000..948a6db --- /dev/null +++ b/tests/unit/common/test_audit.py @@ -0,0 +1,151 @@ +from __future__ import annotations + +import logging + +import pytest +from cerbero_mcp.common import audit as audit_mod +from cerbero_mcp.common.audit import audit_write_op + + +@pytest.fixture +def captured_records(monkeypatch): + """Cattura i record emessi dal logger mcp.audit (propagate=False blocca caplog). + + Sostituisce il logger del modulo con uno che ha caplog attaccato. + """ + records: list[logging.LogRecord] = [] + + class ListHandler(logging.Handler): + def emit(self, record: logging.LogRecord) -> None: + records.append(record) + + test_logger = logging.getLogger("mcp.audit.test") + test_logger.handlers.clear() + test_logger.addHandler(ListHandler()) + test_logger.setLevel(logging.DEBUG) + test_logger.propagate = False + monkeypatch.setattr(audit_mod, "_logger", test_logger) + return records + + +def test_audit_write_op_emits_structured_record(captured_records): + audit_write_op( + actor="core", + action="place_order", + exchange="deribit", + target="BTC-PERPETUAL", + payload={"side": "buy", "amount": 10, "leverage": 3}, + result={"order_id": "abc", "state": "open"}, + ) + assert len(captured_records) == 1 + rec = captured_records[0] + assert rec.action == "place_order" + assert rec.exchange == "deribit" + assert rec.target == "BTC-PERPETUAL" + assert rec.actor == "core" + assert rec.payload == {"side": "buy", "amount": 10, "leverage": 3} + assert rec.result == {"order_id": "abc", "state": "open"} + + +def test_audit_write_op_error_uses_error_level(captured_records): + audit_write_op( + actor="core", + action="cancel_order", + exchange="bybit", + target="ord-123", + payload={}, + error="not_found", + ) + assert len(captured_records) == 1 + rec = captured_records[0] + assert rec.levelname == "ERROR" + assert rec.error == "not_found" + + +def test_audit_write_op_summarizes_result_fields(captured_records): + big_result = { + "order_id": "ord-1", + "state": "submitted", + "extra_huge_field": "x" * 10000, + "orders": [{"id": 1}, {"id": 2}, {"id": 3}], + } + audit_write_op( + actor="core", + action="place_combo_order", + exchange="bybit", + payload={}, + result=big_result, + ) + rec = captured_records[0] + assert "extra_huge_field" not in rec.result + assert rec.result["order_id"] == "ord-1" + assert rec.result["orders_count"] == 3 + + +def test_audit_write_op_no_actor(captured_records): + audit_write_op( + actor=None, + action="place_order", + exchange="alpaca", + payload={}, + ) + rec = captured_records[0] + assert rec.actor is None + + +def test_audit_write_op_writes_to_file_when_AUDIT_LOG_FILE_set(tmp_path, monkeypatch): + """Con env AUDIT_LOG_FILE settato, una riga JSON appare nel file.""" + import json + + from cerbero_mcp.common import audit as audit_mod + + audit_file = tmp_path / "audit.jsonl" + monkeypatch.setenv("AUDIT_LOG_FILE", str(audit_file)) + # Reset state idempotency flag così il test riesegue setup + audit_mod._file_handler_attached = False + # Pulisci handlers preesistenti dal logger (potrebbe avere file vecchio) + for h in list(audit_mod._logger.handlers): + from logging.handlers import TimedRotatingFileHandler + if isinstance(h, TimedRotatingFileHandler): + audit_mod._logger.removeHandler(h) + + audit_write_op( + actor="core", + action="place_order", + exchange="bybit", + target="BTCUSDT", + payload={"side": "Buy", "qty": 0.01}, + result={"order_id": "abc123", "status": "submitted"}, + ) + + # Forza flush dei file handler + for h in audit_mod._logger.handlers: + h.flush() + + assert audit_file.exists() + content = audit_file.read_text().strip() + assert content, "audit file empty" + record = json.loads(content.splitlines()[-1]) + assert record["audit_event"] == "write_op" + assert record["action"] == "place_order" + assert record["exchange"] == "bybit" + assert record["target"] == "BTCUSDT" + assert record["actor"] == "core" + + +def test_audit_no_file_when_env_unset(tmp_path, monkeypatch): + """Senza AUDIT_LOG_FILE, nessun file viene creato.""" + from cerbero_mcp.common import audit as audit_mod + monkeypatch.delenv("AUDIT_LOG_FILE", raising=False) + audit_mod._file_handler_attached = False + + audit_write_op( + actor="core", + action="cancel_order", + exchange="bybit", + target="ord-1", + payload={}, + ) + # Niente file creato in tmp_path + files = list(tmp_path.iterdir()) + assert files == [] diff --git a/tests/unit/common/test_http.py b/tests/unit/common/test_http.py new file mode 100644 index 0000000..83e71b3 --- /dev/null +++ b/tests/unit/common/test_http.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +import asyncio + +import httpx +import pytest +from cerbero_mcp.common.http import async_client, call_with_retry + + +def test_async_client_uses_retry_transport(): + c = async_client(retries=5) + assert isinstance(c._transport, httpx.AsyncHTTPTransport) + # internal _retries on transport + assert c._transport._pool._retries == 5 + + +@pytest.mark.asyncio +async def test_call_with_retry_succeeds_first_try(): + calls = 0 + + async def fn(): + nonlocal calls + calls += 1 + return "ok" + + result = await call_with_retry(fn) + assert result == "ok" + assert calls == 1 + + +@pytest.mark.asyncio +async def test_call_with_retry_recovers_after_transient(monkeypatch): + monkeypatch.setattr(asyncio, "sleep", asyncio.coroutine(lambda *_: None) if False else _no_sleep) + calls = 0 + + async def fn(): + nonlocal calls + calls += 1 + if calls < 3: + raise httpx.ConnectError("boom") + return "ok" + + result = await call_with_retry(fn, max_attempts=5, base_delay=0.0) + assert result == "ok" + assert calls == 3 + + +async def _no_sleep(_): + return None + + +@pytest.mark.asyncio +async def test_call_with_retry_gives_up_after_max(): + calls = 0 + + async def fn(): + nonlocal calls + calls += 1 + raise httpx.TimeoutException("slow") + + with pytest.raises(httpx.TimeoutException): + await call_with_retry(fn, max_attempts=3, base_delay=0.0) + assert calls == 3 + + +@pytest.mark.asyncio +async def test_call_with_retry_does_not_catch_unexpected(): + async def fn(): + raise ValueError("not transient") + + with pytest.raises(ValueError): + await call_with_retry(fn, max_attempts=5, base_delay=0.0) diff --git a/tests/unit/common/test_indicators.py b/tests/unit/common/test_indicators.py new file mode 100644 index 0000000..4a2186d --- /dev/null +++ b/tests/unit/common/test_indicators.py @@ -0,0 +1,260 @@ + +import math + +from cerbero_mcp.common.indicators import ( + adx, + atr, + autocorrelation, + garch11_forecast, + half_life_mean_reversion, + hurst_exponent, + macd, + rolling_sharpe, + rsi, + sma, + var_cvar, + vol_cone, +) + + +def test_rsi_simple(): + closes = [44, 44.34, 44.09, 44.15, 43.61, 44.33, 44.83, 45.10, 45.42, 45.84, + 46.08, 45.89, 46.03, 45.61, 46.28] + r = rsi(closes, period=14) + assert r is not None + # Known textbook RSI value ballpark + assert 65.0 < r < 75.0 + + +def test_rsi_insufficient_data(): + assert rsi([1, 2, 3], period=14) is None + + +def test_sma_simple(): + assert sma([1, 2, 3, 4, 5], period=5) == 3.0 + assert sma([1, 2, 3], period=5) is None + + +def test_atr_simple(): + # highs, lows, closes + highs = [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24] + lows = [ 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23] + closes = [9.5,10.5,11.5,12.5,13.5,14.5,15.5,16.5,17.5,18.5,19.5,20.5,21.5,22.5,23.5] + a = atr(highs, lows, closes, period=14) + assert a is not None + assert 0.9 < a <= 1.5 + + +def test_macd_trend_up(): + # monotonic uptrend → MACD > 0, histogram > 0 + closes = [float(i) for i in range(1, 60)] + m = macd(closes) + assert m["macd"] is not None + assert m["signal"] is not None + assert m["hist"] is not None + assert m["macd"] > 0 + assert m["hist"] >= 0 + + +def test_macd_insufficient_data(): + m = macd([1.0, 2.0, 3.0]) + assert m == {"macd": None, "signal": None, "hist": None} + + +def test_macd_trend_down(): + closes = [float(i) for i in range(60, 1, -1)] + m = macd(closes) + assert m["macd"] < 0 + assert m["hist"] <= 0 + + +def test_adx_insufficient_data(): + a = adx([1.0] * 10, [0.5] * 10, [0.7] * 10, period=14) + assert a == {"adx": None, "+di": None, "-di": None} + + +def test_adx_strong_uptrend(): + highs = [float(i) + 1.0 for i in range(1, 40)] + lows = [float(i) for i in range(1, 40)] + closes = [float(i) + 0.5 for i in range(1, 40)] + a = adx(highs, lows, closes, period=14) + assert a["adx"] is not None + assert a["+di"] is not None and a["-di"] is not None + # strong uptrend → +DI >> -DI, ADX high + assert a["+di"] > a["-di"] + assert a["adx"] > 50.0 + + +def test_adx_flat_market(): + highs = [10.0] * 40 + lows = [9.0] * 40 + closes = [9.5] * 40 + a = adx(highs, lows, closes, period=14) + # no directional movement → ADX near 0 + assert a["adx"] is not None + assert a["adx"] < 5.0 + + +# ---------- vol_cone ---------- + +def _gbm_series(mu: float, sigma: float, n: int, seed: int = 42) -> list[float]: + """Mock GBM closes: deterministic for tests.""" + import random + r = random.Random(seed) + p = [100.0] + for _ in range(n): + z = r.gauss(0.0, 1.0) + p.append(p[-1] * math.exp(mu / 252 + sigma / math.sqrt(252) * z)) + return p + + +def test_vol_cone_returns_percentiles_per_window(): + closes = _gbm_series(mu=0.0, sigma=0.5, n=400) + out = vol_cone(closes, windows=[10, 30, 60]) + assert set(out.keys()) == {10, 30, 60} + for _w, stats in out.items(): + assert "current" in stats + assert "p10" in stats and "p50" in stats and "p90" in stats + assert stats["p10"] <= stats["p50"] <= stats["p90"] + # annualized — sensible range for sigma=0.5 + assert 0.1 < stats["p50"] < 1.5 + + +def test_vol_cone_insufficient_data(): + out = vol_cone([100.0, 101.0], windows=[10, 30]) + assert out[10]["current"] is None + assert out[30]["current"] is None + + +# ---------- hurst_exponent ---------- + +def test_hurst_random_walk_near_half(): + closes = _gbm_series(mu=0.0, sigma=0.3, n=500, seed=7) + h = hurst_exponent(closes) + assert h is not None + # Random walk → Hurst ≈ 0.5; R/S bias positivo ben noto su sample finiti. + # Bound largo: distinguere comunque random walk da trending forte (>0.85). + assert 0.35 < h < 0.85 + + +def test_hurst_persistent_trend(): + # Strong monotonic trend → H >> 0.5 + closes = [100.0 + i * 0.5 + math.sin(i / 10) * 0.1 for i in range(400)] + h = hurst_exponent(closes) + assert h is not None + assert h > 0.85 + + +def test_hurst_insufficient_data(): + assert hurst_exponent([1.0, 2.0, 3.0]) is None + + +# ---------- half_life_mean_reversion ---------- + +def test_half_life_mean_reverting_series(): + """OU process with theta=0.1 → half-life ≈ ln(2)/0.1 ≈ 6.93.""" + import random + r = random.Random(123) + theta = 0.1 + mu = 100.0 + sigma = 0.5 + s = [mu] + for _ in range(500): + s.append(s[-1] + theta * (mu - s[-1]) + sigma * r.gauss(0, 1)) + hl = half_life_mean_reversion(s) + assert hl is not None + # broad tolerance — finite-sample noise + assert 3.0 < hl < 20.0 + + +def test_half_life_trending_returns_none(): + closes = [100.0 + i for i in range(200)] + hl = half_life_mean_reversion(closes) + # No mean reversion → returns None or +inf + assert hl is None or hl > 1000 + + +# ---------- garch11_forecast ---------- + +def test_garch11_forecast_returns_positive_sigma(): + closes = _gbm_series(mu=0.0, sigma=0.4, n=500, seed=11) + out = garch11_forecast(closes) + assert out is not None + assert out["sigma_next"] > 0 + assert 0 < out["alpha"] < 1 + assert 0 < out["beta"] < 1 + assert out["alpha"] + out["beta"] < 1.0 # stationarity + + +def test_garch11_insufficient_data(): + assert garch11_forecast([100.0, 101.0]) is None + + +# ---------- autocorrelation ---------- + +def test_autocorrelation_white_noise_low(): + import random + r = random.Random(1) + rets = [r.gauss(0, 0.01) for _ in range(500)] + out = autocorrelation(rets, max_lag=5) + assert len(out) == 5 + # white noise → all autocorr ≈ 0 (within ±2/sqrt(N)) + bound = 2.0 / math.sqrt(len(rets)) + for _lag, val in out.items(): + assert abs(val) < bound * 2 # generous + + +def test_autocorrelation_lag1_strong_for_ar1(): + """AR(1) with phi=0.7 → autocorr lag-1 ≈ 0.7.""" + import random + r = random.Random(2) + s = [0.0] + for _ in range(500): + s.append(0.7 * s[-1] + r.gauss(0, 0.1)) + out = autocorrelation(s, max_lag=3) + assert out[1] > 0.5 + assert out[2] > 0.2 # geometric decay + + +def test_autocorrelation_insufficient_data(): + assert autocorrelation([1.0], max_lag=5) == {} + + +# ---------- rolling_sharpe ---------- + +def test_rolling_sharpe_positive_for_uptrend(): + closes = [100.0 * (1 + 0.001 * i) for i in range(252)] + s = rolling_sharpe(closes, window=60) + assert s is not None + assert s["sharpe"] > 0 + assert s["sortino"] >= s["sharpe"] / 2 # sortino can be high if no downside + + +def test_rolling_sharpe_zero_volatility(): + closes = [100.0] * 100 + s = rolling_sharpe(closes, window=60) + assert s is not None + assert s["sharpe"] == 0.0 # no variance → 0 by convention + + +def test_rolling_sharpe_insufficient_data(): + assert rolling_sharpe([100.0, 101.0], window=60) is None + + +# ---------- var_cvar ---------- + +def test_var_cvar_basic(): + import random + r = random.Random(3) + rets = [r.gauss(0.0005, 0.02) for _ in range(1000)] + out = var_cvar(rets, confidences=[0.95, 0.99]) + assert "var_95" in out and "cvar_95" in out + assert "var_99" in out and "cvar_99" in out + # VaR is loss → positive number representing percentile loss + assert out["var_95"] > 0 + assert out["cvar_95"] >= out["var_95"] # CVaR worse than VaR + assert out["var_99"] >= out["var_95"] + + +def test_var_cvar_insufficient_data(): + assert var_cvar([0.01], confidences=[0.95]) == {} diff --git a/tests/unit/common/test_logging.py b/tests/unit/common/test_logging.py new file mode 100644 index 0000000..23ca8c8 --- /dev/null +++ b/tests/unit/common/test_logging.py @@ -0,0 +1,77 @@ +import json +import logging + +from cerbero_mcp.common.logging import ( + SecretsFilter, + configure_root_logging, + get_json_logger, +) + + +def test_secrets_filter_masks_bearer(): + f = SecretsFilter() + rec = logging.LogRecord( + name="t", level=logging.INFO, pathname="", lineno=0, + msg="Got Bearer abcdef123456 from client", + args=(), exc_info=None, + ) + f.filter(rec) + assert "abcdef" not in rec.msg + assert "***" in rec.msg + + +def test_secrets_filter_masks_api_key_json(): + f = SecretsFilter() + rec = logging.LogRecord( + name="t", level=logging.INFO, pathname="", lineno=0, + msg='{"api_key": "sk-live-abc123xyz"}', + args=(), exc_info=None, + ) + f.filter(rec) + assert "sk-live-abc123xyz" not in rec.msg + + +def test_json_logger_outputs_json(capsys): + logger = get_json_logger("test") + logger.info("hello", extra={"user_id": 42}) + captured = capsys.readouterr() + # output is on stderr by default for json logger + line = (captured.err or captured.out).strip().splitlines()[-1] + data = json.loads(line) + assert data["message"] == "hello" + assert data["user_id"] == 42 + + +def test_configure_root_json_format(monkeypatch, capsys): + monkeypatch.setenv("LOG_FORMAT", "json") + monkeypatch.setenv("LOG_LEVEL", "INFO") + configure_root_logging() + logging.info("root json test") + line = capsys.readouterr().err.strip().splitlines()[-1] + data = json.loads(line) + assert data["message"] == "root json test" + assert data["levelname"] == "INFO" + + +def test_configure_root_text_format(monkeypatch, capsys): + monkeypatch.setenv("LOG_FORMAT", "text") + configure_root_logging() + logging.info("root text test") + line = capsys.readouterr().err.strip().splitlines()[-1] + # text format non è JSON parseable + try: + json.loads(line) + raise AssertionError("expected text format, got JSON") + except json.JSONDecodeError: + pass + assert "root text test" in line + + +def test_configure_root_applies_secrets_filter(monkeypatch, capsys): + monkeypatch.setenv("LOG_FORMAT", "json") + configure_root_logging() + logging.info("calling with Bearer sk-live-leak123456 token") + line = capsys.readouterr().err.strip().splitlines()[-1] + data = json.loads(line) + assert "sk-live-leak123456" not in data["message"] + assert "***" in data["message"] diff --git a/tests/unit/common/test_mcp_bridge.py b/tests/unit/common/test_mcp_bridge.py new file mode 100644 index 0000000..faaf05e --- /dev/null +++ b/tests/unit/common/test_mcp_bridge.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +from fastapi import FastAPI +from fastapi.testclient import TestClient +from cerbero_mcp.common.mcp_bridge import _derive_input_schemas, mount_mcp_endpoint +from pydantic import BaseModel + +VALID_TOKEN = "t" +VALID_TOKENS: set[str] = {VALID_TOKEN} + + +class EchoBody(BaseModel): + msg: str + n: int = 1 + + +def _make_app() -> FastAPI: + app = FastAPI() + + @app.post("/tools/echo") + def echo(body: EchoBody): + return {"echo": body.msg, "n": body.n} + + @app.post("/tools/ping") + def ping(): + return {"pong": True} + + return app + + +def test_derive_input_schemas_resolves_lazy_annotations(): + app = _make_app() + schemas = _derive_input_schemas(app, ["echo", "ping"]) + assert "echo" in schemas + echo_schema = schemas["echo"] + assert echo_schema["type"] == "object" + assert "msg" in echo_schema["properties"] + assert "n" in echo_schema["properties"] + assert "msg" in echo_schema["required"] + # ping has no Pydantic body → not in map (fallback applied by caller) + assert "ping" not in schemas + + +def test_mount_mcp_endpoint_exposes_derived_schemas(): + app = _make_app() + mount_mcp_endpoint( + app, + name="test", + version="1.0", + valid_tokens=VALID_TOKENS, + internal_base_url="http://localhost:0", + tools=[ + {"name": "echo", "description": "Echo a message."}, + {"name": "ping", "description": "Ping."}, + ], + ) + c = TestClient(app) + r = c.post( + "/mcp", + headers={"Authorization": f"Bearer {VALID_TOKEN}"}, + json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"}, + ) + assert r.status_code == 200 + tools = r.json()["result"]["tools"] + by_name = {t["name"]: t for t in tools} + assert set(by_name["echo"]["inputSchema"]["required"]) == {"msg"} + # ping fallback su schema generico + assert by_name["ping"]["inputSchema"] == { + "type": "object", + "additionalProperties": True, + } + + +def test_mount_mcp_endpoint_requires_auth(): + app = _make_app() + mount_mcp_endpoint( + app, + name="test", + version="1.0", + valid_tokens=VALID_TOKENS, + internal_base_url="http://localhost:0", + tools=[{"name": "echo"}], + ) + c = TestClient(app) + r = c.post("/mcp", json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"}) + assert r.status_code == 401 + r = c.post( + "/mcp", + headers={"Authorization": "Bearer WRONG"}, + json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"}, + ) + assert r.status_code == 403 + + +def test_explicit_input_schema_overrides_derived(): + app = _make_app() + custom = {"type": "object", "properties": {"custom": {"type": "string"}}, "required": ["custom"]} + mount_mcp_endpoint( + app, + name="test", + version="1.0", + valid_tokens=VALID_TOKENS, + internal_base_url="http://localhost:0", + tools=[{"name": "echo", "input_schema": custom}], + ) + c = TestClient(app) + r = c.post( + "/mcp", + headers={"Authorization": f"Bearer {VALID_TOKEN}"}, + json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"}, + ) + assert r.json()["result"]["tools"][0]["inputSchema"] == custom diff --git a/tests/unit/common/test_microstructure.py b/tests/unit/common/test_microstructure.py new file mode 100644 index 0000000..7530933 --- /dev/null +++ b/tests/unit/common/test_microstructure.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from cerbero_mcp.common.microstructure import orderbook_imbalance + + +def test_orderbook_imbalance_balanced(): + bids = [[100.0, 1.0], [99.5, 1.0], [99.0, 1.0]] + asks = [[100.5, 1.0], [101.0, 1.0], [101.5, 1.0]] + out = orderbook_imbalance(bids, asks, depth=3) + assert abs(out["imbalance_ratio"]) < 0.01 # bilanciato + assert out["bid_volume"] == 3.0 + assert out["ask_volume"] == 3.0 + assert out["microprice"] is not None + + +def test_orderbook_imbalance_bid_heavy(): + bids = [[100.0, 5.0], [99.5, 5.0]] + asks = [[100.5, 1.0], [101.0, 1.0]] + out = orderbook_imbalance(bids, asks, depth=2) + assert out["imbalance_ratio"] > 0.5 # forte bid pressure + assert out["bid_volume"] == 10.0 + assert out["ask_volume"] == 2.0 + + +def test_orderbook_imbalance_ask_heavy(): + bids = [[100.0, 1.0], [99.5, 1.0]] + asks = [[100.5, 5.0], [101.0, 5.0]] + out = orderbook_imbalance(bids, asks, depth=2) + assert out["imbalance_ratio"] < -0.5 + + +def test_orderbook_imbalance_microprice_skew(): + """Microprice è weighted mid: pesato bid/ask depth opposto.""" + bids = [[100.0, 9.0]] + asks = [[101.0, 1.0]] + out = orderbook_imbalance(bids, asks, depth=1) + # large bid → microprice closer to ask (paradox: weighted by *opposite* size) + assert out["microprice"] > 100.5 + + +def test_orderbook_imbalance_empty(): + out = orderbook_imbalance([], [], depth=5) + assert out["imbalance_ratio"] is None + assert out["microprice"] is None + + +def test_orderbook_imbalance_one_sided(): + out = orderbook_imbalance([[100.0, 1.0]], [], depth=1) + assert out["imbalance_ratio"] == 1.0 # all bid + + +def test_orderbook_imbalance_slope(): + """Slope = velocity of liquidity dropoff: ripido = poca liquidità in profondità.""" + bids_steep = [[100.0, 10.0], [99.0, 1.0]] # depth crolla → slope alto + asks_steep = [[101.0, 10.0], [102.0, 1.0]] + out = orderbook_imbalance(bids_steep, asks_steep, depth=2) + assert out["bid_slope"] is not None + # bid liquidity drops by 9 per 1 price unit → slope ~9 + assert out["bid_slope"] > 5.0 diff --git a/tests/unit/common/test_options.py b/tests/unit/common/test_options.py new file mode 100644 index 0000000..bfa5076 --- /dev/null +++ b/tests/unit/common/test_options.py @@ -0,0 +1,144 @@ +"""Test puri per mcp_common.options (logiche option-flow indipendenti +dall'exchange). +""" +from __future__ import annotations + +import pytest +from cerbero_mcp.common.options import ( + atm_vs_wings_vol, + dealer_gamma_profile, + oi_weighted_skew, + smile_asymmetry, + vanna_charm_aggregate, +) + +# ---------- oi_weighted_skew ---------- + +def test_oi_weighted_skew_balanced(): + """OI distribuito 50/50 calls/puts → skew vicino a 0.""" + legs = [ + {"iv": 0.5, "delta": 0.5, "oi": 100, "option_type": "call"}, + {"iv": 0.5, "delta": -0.5, "oi": 100, "option_type": "put"}, + ] + out = oi_weighted_skew(legs) + assert abs(out["skew"]) < 0.01 + + +def test_oi_weighted_skew_put_heavy(): + """Put heavy → IV media puts > IV media calls → skew positivo (put > call).""" + legs = [ + {"iv": 0.4, "delta": 0.5, "oi": 50, "option_type": "call"}, + {"iv": 0.7, "delta": -0.5, "oi": 500, "option_type": "put"}, + ] + out = oi_weighted_skew(legs) + assert out["skew"] > 0 + assert out["call_iv_weighted"] > 0 + assert out["put_iv_weighted"] > out["call_iv_weighted"] + + +def test_oi_weighted_skew_empty(): + out = oi_weighted_skew([]) + assert out == {"skew": None, "call_iv_weighted": None, "put_iv_weighted": None, "total_oi": 0} + + +# ---------- smile_asymmetry ---------- + +def test_smile_asymmetry_symmetric(): + """Smile simmetrico ATM → asymmetry ≈ 0.""" + legs = [ + {"strike": 80, "iv": 0.55, "option_type": "put"}, + {"strike": 90, "iv": 0.50, "option_type": "put"}, + {"strike": 100, "iv": 0.45, "option_type": "call"}, + {"strike": 110, "iv": 0.50, "option_type": "call"}, + {"strike": 120, "iv": 0.55, "option_type": "call"}, + ] + out = smile_asymmetry(legs, spot=100.0) + assert out["atm_iv"] is not None + assert abs(out["asymmetry"]) < 0.05 + + +def test_smile_asymmetry_put_skew(): + """OTM puts (low strike) IV >> OTM calls (high strike) IV → asymmetry > 0.""" + legs = [ + {"strike": 80, "iv": 0.80, "option_type": "put"}, + {"strike": 100, "iv": 0.50, "option_type": "call"}, + {"strike": 120, "iv": 0.45, "option_type": "call"}, + ] + out = smile_asymmetry(legs, spot=100.0) + assert out["asymmetry"] > 0.1 + + +def test_smile_asymmetry_no_atm(): + legs = [{"strike": 200, "iv": 0.5, "option_type": "call"}] + out = smile_asymmetry(legs, spot=100.0) + assert out["atm_iv"] is None + + +# ---------- atm_vs_wings_vol ---------- + +def test_atm_vs_wings_vol_basic(): + legs = [ + {"strike": 90, "iv": 0.55, "delta": -0.25, "option_type": "put"}, + {"strike": 100, "iv": 0.45, "delta": 0.5, "option_type": "call"}, + {"strike": 110, "iv": 0.50, "delta": 0.25, "option_type": "call"}, + ] + out = atm_vs_wings_vol(legs, spot=100.0) + assert out["atm_iv"] == pytest.approx(0.45, rel=1e-3) + assert out["wing_25d_call_iv"] == pytest.approx(0.50, rel=1e-3) + assert out["wing_25d_put_iv"] == pytest.approx(0.55, rel=1e-3) + # ATM 0 + + +def test_atm_vs_wings_vol_no_data(): + out = atm_vs_wings_vol([], spot=100.0) + assert out["atm_iv"] is None + + +# ---------- dealer_gamma_profile ---------- + +def test_dealer_gamma_profile_assumes_dealer_short_calls(): + """Convention: dealer SHORT calls (sells calls to retail), LONG puts. + Calls oi → negative dealer gamma, puts oi → positive dealer gamma. + """ + legs = [ + {"strike": 100, "gamma": 0.01, "oi": 1000, "option_type": "call"}, + {"strike": 100, "gamma": 0.01, "oi": 500, "option_type": "put"}, + ] + out = dealer_gamma_profile(legs, spot=100.0) + # call gamma greater than put gamma at same strike → net dealer short gamma + assert len(out["by_strike"]) == 1 + row = out["by_strike"][0] + assert row["call_dealer_gamma"] < 0 + assert row["put_dealer_gamma"] > 0 + assert row["net_dealer_gamma"] < 0 # calls dominate + assert out["total_net_dealer_gamma"] < 0 + + +def test_dealer_gamma_profile_empty(): + out = dealer_gamma_profile([], spot=100.0) + assert out["by_strike"] == [] + assert out["total_net_dealer_gamma"] == 0.0 + + +# ---------- vanna_charm_aggregate ---------- + +def test_vanna_charm_aggregate_basic(): + legs = [ + {"strike": 100, "vanna": 0.05, "charm": -0.001, "oi": 1000, "option_type": "call"}, + {"strike": 100, "vanna": -0.05, "charm": 0.001, "oi": 500, "option_type": "put"}, + ] + out = vanna_charm_aggregate(legs, spot=100.0) + assert out["total_vanna"] != 0 # some net exposure + assert "total_charm" in out + assert out["legs_analyzed"] == 2 + + +def test_vanna_charm_aggregate_skip_missing_greeks(): + legs = [ + {"strike": 100, "vanna": None, "charm": -0.001, "oi": 1000, "option_type": "call"}, + {"strike": 100, "vanna": 0.05, "charm": None, "oi": 500, "option_type": "put"}, + ] + out = vanna_charm_aggregate(legs, spot=100.0) + # entrambe le legs hanno almeno una greca None → skippate + assert out["legs_analyzed"] == 0 diff --git a/tests/unit/common/test_stats.py b/tests/unit/common/test_stats.py new file mode 100644 index 0000000..3c62f69 --- /dev/null +++ b/tests/unit/common/test_stats.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +import random + +from cerbero_mcp.common.stats import cointegration_test + + +def test_cointegrated_synthetic_pair(): + """Costruisco coppia cointegrata: B random walk, A = 2*B + noise stazionario.""" + r = random.Random(1) + b = [100.0] + for _ in range(300): + b.append(b[-1] + r.gauss(0, 1)) + a = [2 * b[i] + r.gauss(0, 0.5) for i in range(len(b))] + out = cointegration_test(a, b) + assert out["cointegrated"] is True + assert out["beta"] == pytest_approx(2.0, rel=0.05) + assert out["adf_t_stat"] is not None + assert out["adf_t_stat"] < -2.86 + + +def test_not_cointegrated_independent_walks(): + """Due random walk indipendenti → spread non stazionario → no cointegration.""" + r = random.Random(2) + a = [100.0] + b = [100.0] + for _ in range(300): + a.append(a[-1] + r.gauss(0, 1)) + b.append(b[-1] + r.gauss(0, 1)) + out = cointegration_test(a, b) + # Per due RW indipendenti, t-stat ADF è solitamente > -2.86 → non cointegrate + assert out["cointegrated"] is False or out["adf_t_stat"] > -3.0 + + +def test_cointegration_short_series(): + out = cointegration_test([1.0, 2.0], [3.0, 4.0]) + assert out["cointegrated"] is None + assert out["beta"] is None + + +def test_cointegration_mismatched_length(): + out = cointegration_test([1.0, 2.0, 3.0], [1.0, 2.0]) + assert out["cointegrated"] is None + + +def pytest_approx(value, rel): + """Tiny helper to avoid importing pytest just for approx.""" + class _Approx: + def __eq__(self, other): + return abs(other - value) <= abs(value) * rel + return _Approx()