test(V2): migrazione test common/

Copiati e aggiornati i test da services/common/tests/ a tests/unit/common/.
Import aggiornati da mcp_common a cerbero_mcp.common. Eliminati test di
funzionalità V1-only (app_factory, environment, auth/Principal, server_base).
Refactored test_audit.py (principal→actor str) e test_mcp_bridge.py
(TokenStore→valid_tokens set). 71/71 test passano.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
AdrianoDev
2026-04-30 18:16:26 +02:00
parent 1a1f9c43ba
commit 993326136b
9 changed files with 926 additions and 0 deletions
View File
+151
View File
@@ -0,0 +1,151 @@
from __future__ import annotations
import logging
import pytest
from cerbero_mcp.common import audit as audit_mod
from cerbero_mcp.common.audit import audit_write_op
@pytest.fixture
def captured_records(monkeypatch):
"""Cattura i record emessi dal logger mcp.audit (propagate=False blocca caplog).
Sostituisce il logger del modulo con uno che ha caplog attaccato.
"""
records: list[logging.LogRecord] = []
class ListHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
records.append(record)
test_logger = logging.getLogger("mcp.audit.test")
test_logger.handlers.clear()
test_logger.addHandler(ListHandler())
test_logger.setLevel(logging.DEBUG)
test_logger.propagate = False
monkeypatch.setattr(audit_mod, "_logger", test_logger)
return records
def test_audit_write_op_emits_structured_record(captured_records):
audit_write_op(
actor="core",
action="place_order",
exchange="deribit",
target="BTC-PERPETUAL",
payload={"side": "buy", "amount": 10, "leverage": 3},
result={"order_id": "abc", "state": "open"},
)
assert len(captured_records) == 1
rec = captured_records[0]
assert rec.action == "place_order"
assert rec.exchange == "deribit"
assert rec.target == "BTC-PERPETUAL"
assert rec.actor == "core"
assert rec.payload == {"side": "buy", "amount": 10, "leverage": 3}
assert rec.result == {"order_id": "abc", "state": "open"}
def test_audit_write_op_error_uses_error_level(captured_records):
audit_write_op(
actor="core",
action="cancel_order",
exchange="bybit",
target="ord-123",
payload={},
error="not_found",
)
assert len(captured_records) == 1
rec = captured_records[0]
assert rec.levelname == "ERROR"
assert rec.error == "not_found"
def test_audit_write_op_summarizes_result_fields(captured_records):
big_result = {
"order_id": "ord-1",
"state": "submitted",
"extra_huge_field": "x" * 10000,
"orders": [{"id": 1}, {"id": 2}, {"id": 3}],
}
audit_write_op(
actor="core",
action="place_combo_order",
exchange="bybit",
payload={},
result=big_result,
)
rec = captured_records[0]
assert "extra_huge_field" not in rec.result
assert rec.result["order_id"] == "ord-1"
assert rec.result["orders_count"] == 3
def test_audit_write_op_no_actor(captured_records):
audit_write_op(
actor=None,
action="place_order",
exchange="alpaca",
payload={},
)
rec = captured_records[0]
assert rec.actor is None
def test_audit_write_op_writes_to_file_when_AUDIT_LOG_FILE_set(tmp_path, monkeypatch):
"""Con env AUDIT_LOG_FILE settato, una riga JSON appare nel file."""
import json
from cerbero_mcp.common import audit as audit_mod
audit_file = tmp_path / "audit.jsonl"
monkeypatch.setenv("AUDIT_LOG_FILE", str(audit_file))
# Reset state idempotency flag così il test riesegue setup
audit_mod._file_handler_attached = False
# Pulisci handlers preesistenti dal logger (potrebbe avere file vecchio)
for h in list(audit_mod._logger.handlers):
from logging.handlers import TimedRotatingFileHandler
if isinstance(h, TimedRotatingFileHandler):
audit_mod._logger.removeHandler(h)
audit_write_op(
actor="core",
action="place_order",
exchange="bybit",
target="BTCUSDT",
payload={"side": "Buy", "qty": 0.01},
result={"order_id": "abc123", "status": "submitted"},
)
# Forza flush dei file handler
for h in audit_mod._logger.handlers:
h.flush()
assert audit_file.exists()
content = audit_file.read_text().strip()
assert content, "audit file empty"
record = json.loads(content.splitlines()[-1])
assert record["audit_event"] == "write_op"
assert record["action"] == "place_order"
assert record["exchange"] == "bybit"
assert record["target"] == "BTCUSDT"
assert record["actor"] == "core"
def test_audit_no_file_when_env_unset(tmp_path, monkeypatch):
"""Senza AUDIT_LOG_FILE, nessun file viene creato."""
from cerbero_mcp.common import audit as audit_mod
monkeypatch.delenv("AUDIT_LOG_FILE", raising=False)
audit_mod._file_handler_attached = False
audit_write_op(
actor="core",
action="cancel_order",
exchange="bybit",
target="ord-1",
payload={},
)
# Niente file creato in tmp_path
files = list(tmp_path.iterdir())
assert files == []
+72
View File
@@ -0,0 +1,72 @@
from __future__ import annotations
import asyncio
import httpx
import pytest
from cerbero_mcp.common.http import async_client, call_with_retry
def test_async_client_uses_retry_transport():
c = async_client(retries=5)
assert isinstance(c._transport, httpx.AsyncHTTPTransport)
# internal _retries on transport
assert c._transport._pool._retries == 5
@pytest.mark.asyncio
async def test_call_with_retry_succeeds_first_try():
calls = 0
async def fn():
nonlocal calls
calls += 1
return "ok"
result = await call_with_retry(fn)
assert result == "ok"
assert calls == 1
@pytest.mark.asyncio
async def test_call_with_retry_recovers_after_transient(monkeypatch):
monkeypatch.setattr(asyncio, "sleep", asyncio.coroutine(lambda *_: None) if False else _no_sleep)
calls = 0
async def fn():
nonlocal calls
calls += 1
if calls < 3:
raise httpx.ConnectError("boom")
return "ok"
result = await call_with_retry(fn, max_attempts=5, base_delay=0.0)
assert result == "ok"
assert calls == 3
async def _no_sleep(_):
return None
@pytest.mark.asyncio
async def test_call_with_retry_gives_up_after_max():
calls = 0
async def fn():
nonlocal calls
calls += 1
raise httpx.TimeoutException("slow")
with pytest.raises(httpx.TimeoutException):
await call_with_retry(fn, max_attempts=3, base_delay=0.0)
assert calls == 3
@pytest.mark.asyncio
async def test_call_with_retry_does_not_catch_unexpected():
async def fn():
raise ValueError("not transient")
with pytest.raises(ValueError):
await call_with_retry(fn, max_attempts=5, base_delay=0.0)
+260
View File
@@ -0,0 +1,260 @@
import math
from cerbero_mcp.common.indicators import (
adx,
atr,
autocorrelation,
garch11_forecast,
half_life_mean_reversion,
hurst_exponent,
macd,
rolling_sharpe,
rsi,
sma,
var_cvar,
vol_cone,
)
def test_rsi_simple():
closes = [44, 44.34, 44.09, 44.15, 43.61, 44.33, 44.83, 45.10, 45.42, 45.84,
46.08, 45.89, 46.03, 45.61, 46.28]
r = rsi(closes, period=14)
assert r is not None
# Known textbook RSI value ballpark
assert 65.0 < r < 75.0
def test_rsi_insufficient_data():
assert rsi([1, 2, 3], period=14) is None
def test_sma_simple():
assert sma([1, 2, 3, 4, 5], period=5) == 3.0
assert sma([1, 2, 3], period=5) is None
def test_atr_simple():
# highs, lows, closes
highs = [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24]
lows = [ 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23]
closes = [9.5,10.5,11.5,12.5,13.5,14.5,15.5,16.5,17.5,18.5,19.5,20.5,21.5,22.5,23.5]
a = atr(highs, lows, closes, period=14)
assert a is not None
assert 0.9 < a <= 1.5
def test_macd_trend_up():
# monotonic uptrend → MACD > 0, histogram > 0
closes = [float(i) for i in range(1, 60)]
m = macd(closes)
assert m["macd"] is not None
assert m["signal"] is not None
assert m["hist"] is not None
assert m["macd"] > 0
assert m["hist"] >= 0
def test_macd_insufficient_data():
m = macd([1.0, 2.0, 3.0])
assert m == {"macd": None, "signal": None, "hist": None}
def test_macd_trend_down():
closes = [float(i) for i in range(60, 1, -1)]
m = macd(closes)
assert m["macd"] < 0
assert m["hist"] <= 0
def test_adx_insufficient_data():
a = adx([1.0] * 10, [0.5] * 10, [0.7] * 10, period=14)
assert a == {"adx": None, "+di": None, "-di": None}
def test_adx_strong_uptrend():
highs = [float(i) + 1.0 for i in range(1, 40)]
lows = [float(i) for i in range(1, 40)]
closes = [float(i) + 0.5 for i in range(1, 40)]
a = adx(highs, lows, closes, period=14)
assert a["adx"] is not None
assert a["+di"] is not None and a["-di"] is not None
# strong uptrend → +DI >> -DI, ADX high
assert a["+di"] > a["-di"]
assert a["adx"] > 50.0
def test_adx_flat_market():
highs = [10.0] * 40
lows = [9.0] * 40
closes = [9.5] * 40
a = adx(highs, lows, closes, period=14)
# no directional movement → ADX near 0
assert a["adx"] is not None
assert a["adx"] < 5.0
# ---------- vol_cone ----------
def _gbm_series(mu: float, sigma: float, n: int, seed: int = 42) -> list[float]:
"""Mock GBM closes: deterministic for tests."""
import random
r = random.Random(seed)
p = [100.0]
for _ in range(n):
z = r.gauss(0.0, 1.0)
p.append(p[-1] * math.exp(mu / 252 + sigma / math.sqrt(252) * z))
return p
def test_vol_cone_returns_percentiles_per_window():
closes = _gbm_series(mu=0.0, sigma=0.5, n=400)
out = vol_cone(closes, windows=[10, 30, 60])
assert set(out.keys()) == {10, 30, 60}
for _w, stats in out.items():
assert "current" in stats
assert "p10" in stats and "p50" in stats and "p90" in stats
assert stats["p10"] <= stats["p50"] <= stats["p90"]
# annualized — sensible range for sigma=0.5
assert 0.1 < stats["p50"] < 1.5
def test_vol_cone_insufficient_data():
out = vol_cone([100.0, 101.0], windows=[10, 30])
assert out[10]["current"] is None
assert out[30]["current"] is None
# ---------- hurst_exponent ----------
def test_hurst_random_walk_near_half():
closes = _gbm_series(mu=0.0, sigma=0.3, n=500, seed=7)
h = hurst_exponent(closes)
assert h is not None
# Random walk → Hurst ≈ 0.5; R/S bias positivo ben noto su sample finiti.
# Bound largo: distinguere comunque random walk da trending forte (>0.85).
assert 0.35 < h < 0.85
def test_hurst_persistent_trend():
# Strong monotonic trend → H >> 0.5
closes = [100.0 + i * 0.5 + math.sin(i / 10) * 0.1 for i in range(400)]
h = hurst_exponent(closes)
assert h is not None
assert h > 0.85
def test_hurst_insufficient_data():
assert hurst_exponent([1.0, 2.0, 3.0]) is None
# ---------- half_life_mean_reversion ----------
def test_half_life_mean_reverting_series():
"""OU process with theta=0.1 → half-life ≈ ln(2)/0.1 ≈ 6.93."""
import random
r = random.Random(123)
theta = 0.1
mu = 100.0
sigma = 0.5
s = [mu]
for _ in range(500):
s.append(s[-1] + theta * (mu - s[-1]) + sigma * r.gauss(0, 1))
hl = half_life_mean_reversion(s)
assert hl is not None
# broad tolerance — finite-sample noise
assert 3.0 < hl < 20.0
def test_half_life_trending_returns_none():
closes = [100.0 + i for i in range(200)]
hl = half_life_mean_reversion(closes)
# No mean reversion → returns None or +inf
assert hl is None or hl > 1000
# ---------- garch11_forecast ----------
def test_garch11_forecast_returns_positive_sigma():
closes = _gbm_series(mu=0.0, sigma=0.4, n=500, seed=11)
out = garch11_forecast(closes)
assert out is not None
assert out["sigma_next"] > 0
assert 0 < out["alpha"] < 1
assert 0 < out["beta"] < 1
assert out["alpha"] + out["beta"] < 1.0 # stationarity
def test_garch11_insufficient_data():
assert garch11_forecast([100.0, 101.0]) is None
# ---------- autocorrelation ----------
def test_autocorrelation_white_noise_low():
import random
r = random.Random(1)
rets = [r.gauss(0, 0.01) for _ in range(500)]
out = autocorrelation(rets, max_lag=5)
assert len(out) == 5
# white noise → all autocorr ≈ 0 (within ±2/sqrt(N))
bound = 2.0 / math.sqrt(len(rets))
for _lag, val in out.items():
assert abs(val) < bound * 2 # generous
def test_autocorrelation_lag1_strong_for_ar1():
"""AR(1) with phi=0.7 → autocorr lag-1 ≈ 0.7."""
import random
r = random.Random(2)
s = [0.0]
for _ in range(500):
s.append(0.7 * s[-1] + r.gauss(0, 0.1))
out = autocorrelation(s, max_lag=3)
assert out[1] > 0.5
assert out[2] > 0.2 # geometric decay
def test_autocorrelation_insufficient_data():
assert autocorrelation([1.0], max_lag=5) == {}
# ---------- rolling_sharpe ----------
def test_rolling_sharpe_positive_for_uptrend():
closes = [100.0 * (1 + 0.001 * i) for i in range(252)]
s = rolling_sharpe(closes, window=60)
assert s is not None
assert s["sharpe"] > 0
assert s["sortino"] >= s["sharpe"] / 2 # sortino can be high if no downside
def test_rolling_sharpe_zero_volatility():
closes = [100.0] * 100
s = rolling_sharpe(closes, window=60)
assert s is not None
assert s["sharpe"] == 0.0 # no variance → 0 by convention
def test_rolling_sharpe_insufficient_data():
assert rolling_sharpe([100.0, 101.0], window=60) is None
# ---------- var_cvar ----------
def test_var_cvar_basic():
import random
r = random.Random(3)
rets = [r.gauss(0.0005, 0.02) for _ in range(1000)]
out = var_cvar(rets, confidences=[0.95, 0.99])
assert "var_95" in out and "cvar_95" in out
assert "var_99" in out and "cvar_99" in out
# VaR is loss → positive number representing percentile loss
assert out["var_95"] > 0
assert out["cvar_95"] >= out["var_95"] # CVaR worse than VaR
assert out["var_99"] >= out["var_95"]
def test_var_cvar_insufficient_data():
assert var_cvar([0.01], confidences=[0.95]) == {}
+77
View File
@@ -0,0 +1,77 @@
import json
import logging
from cerbero_mcp.common.logging import (
SecretsFilter,
configure_root_logging,
get_json_logger,
)
def test_secrets_filter_masks_bearer():
f = SecretsFilter()
rec = logging.LogRecord(
name="t", level=logging.INFO, pathname="", lineno=0,
msg="Got Bearer abcdef123456 from client",
args=(), exc_info=None,
)
f.filter(rec)
assert "abcdef" not in rec.msg
assert "***" in rec.msg
def test_secrets_filter_masks_api_key_json():
f = SecretsFilter()
rec = logging.LogRecord(
name="t", level=logging.INFO, pathname="", lineno=0,
msg='{"api_key": "sk-live-abc123xyz"}',
args=(), exc_info=None,
)
f.filter(rec)
assert "sk-live-abc123xyz" not in rec.msg
def test_json_logger_outputs_json(capsys):
logger = get_json_logger("test")
logger.info("hello", extra={"user_id": 42})
captured = capsys.readouterr()
# output is on stderr by default for json logger
line = (captured.err or captured.out).strip().splitlines()[-1]
data = json.loads(line)
assert data["message"] == "hello"
assert data["user_id"] == 42
def test_configure_root_json_format(monkeypatch, capsys):
monkeypatch.setenv("LOG_FORMAT", "json")
monkeypatch.setenv("LOG_LEVEL", "INFO")
configure_root_logging()
logging.info("root json test")
line = capsys.readouterr().err.strip().splitlines()[-1]
data = json.loads(line)
assert data["message"] == "root json test"
assert data["levelname"] == "INFO"
def test_configure_root_text_format(monkeypatch, capsys):
monkeypatch.setenv("LOG_FORMAT", "text")
configure_root_logging()
logging.info("root text test")
line = capsys.readouterr().err.strip().splitlines()[-1]
# text format non è JSON parseable
try:
json.loads(line)
raise AssertionError("expected text format, got JSON")
except json.JSONDecodeError:
pass
assert "root text test" in line
def test_configure_root_applies_secrets_filter(monkeypatch, capsys):
monkeypatch.setenv("LOG_FORMAT", "json")
configure_root_logging()
logging.info("calling with Bearer sk-live-leak123456 token")
line = capsys.readouterr().err.strip().splitlines()[-1]
data = json.loads(line)
assert "sk-live-leak123456" not in data["message"]
assert "***" in data["message"]
+112
View File
@@ -0,0 +1,112 @@
from __future__ import annotations
from fastapi import FastAPI
from fastapi.testclient import TestClient
from cerbero_mcp.common.mcp_bridge import _derive_input_schemas, mount_mcp_endpoint
from pydantic import BaseModel
VALID_TOKEN = "t"
VALID_TOKENS: set[str] = {VALID_TOKEN}
class EchoBody(BaseModel):
msg: str
n: int = 1
def _make_app() -> FastAPI:
app = FastAPI()
@app.post("/tools/echo")
def echo(body: EchoBody):
return {"echo": body.msg, "n": body.n}
@app.post("/tools/ping")
def ping():
return {"pong": True}
return app
def test_derive_input_schemas_resolves_lazy_annotations():
app = _make_app()
schemas = _derive_input_schemas(app, ["echo", "ping"])
assert "echo" in schemas
echo_schema = schemas["echo"]
assert echo_schema["type"] == "object"
assert "msg" in echo_schema["properties"]
assert "n" in echo_schema["properties"]
assert "msg" in echo_schema["required"]
# ping has no Pydantic body → not in map (fallback applied by caller)
assert "ping" not in schemas
def test_mount_mcp_endpoint_exposes_derived_schemas():
app = _make_app()
mount_mcp_endpoint(
app,
name="test",
version="1.0",
valid_tokens=VALID_TOKENS,
internal_base_url="http://localhost:0",
tools=[
{"name": "echo", "description": "Echo a message."},
{"name": "ping", "description": "Ping."},
],
)
c = TestClient(app)
r = c.post(
"/mcp",
headers={"Authorization": f"Bearer {VALID_TOKEN}"},
json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
)
assert r.status_code == 200
tools = r.json()["result"]["tools"]
by_name = {t["name"]: t for t in tools}
assert set(by_name["echo"]["inputSchema"]["required"]) == {"msg"}
# ping fallback su schema generico
assert by_name["ping"]["inputSchema"] == {
"type": "object",
"additionalProperties": True,
}
def test_mount_mcp_endpoint_requires_auth():
app = _make_app()
mount_mcp_endpoint(
app,
name="test",
version="1.0",
valid_tokens=VALID_TOKENS,
internal_base_url="http://localhost:0",
tools=[{"name": "echo"}],
)
c = TestClient(app)
r = c.post("/mcp", json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"})
assert r.status_code == 401
r = c.post(
"/mcp",
headers={"Authorization": "Bearer WRONG"},
json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
)
assert r.status_code == 403
def test_explicit_input_schema_overrides_derived():
app = _make_app()
custom = {"type": "object", "properties": {"custom": {"type": "string"}}, "required": ["custom"]}
mount_mcp_endpoint(
app,
name="test",
version="1.0",
valid_tokens=VALID_TOKENS,
internal_base_url="http://localhost:0",
tools=[{"name": "echo", "input_schema": custom}],
)
c = TestClient(app)
r = c.post(
"/mcp",
headers={"Authorization": f"Bearer {VALID_TOKEN}"},
json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
)
assert r.json()["result"]["tools"][0]["inputSchema"] == custom
+59
View File
@@ -0,0 +1,59 @@
from __future__ import annotations
from cerbero_mcp.common.microstructure import orderbook_imbalance
def test_orderbook_imbalance_balanced():
bids = [[100.0, 1.0], [99.5, 1.0], [99.0, 1.0]]
asks = [[100.5, 1.0], [101.0, 1.0], [101.5, 1.0]]
out = orderbook_imbalance(bids, asks, depth=3)
assert abs(out["imbalance_ratio"]) < 0.01 # bilanciato
assert out["bid_volume"] == 3.0
assert out["ask_volume"] == 3.0
assert out["microprice"] is not None
def test_orderbook_imbalance_bid_heavy():
bids = [[100.0, 5.0], [99.5, 5.0]]
asks = [[100.5, 1.0], [101.0, 1.0]]
out = orderbook_imbalance(bids, asks, depth=2)
assert out["imbalance_ratio"] > 0.5 # forte bid pressure
assert out["bid_volume"] == 10.0
assert out["ask_volume"] == 2.0
def test_orderbook_imbalance_ask_heavy():
bids = [[100.0, 1.0], [99.5, 1.0]]
asks = [[100.5, 5.0], [101.0, 5.0]]
out = orderbook_imbalance(bids, asks, depth=2)
assert out["imbalance_ratio"] < -0.5
def test_orderbook_imbalance_microprice_skew():
"""Microprice è weighted mid: pesato bid/ask depth opposto."""
bids = [[100.0, 9.0]]
asks = [[101.0, 1.0]]
out = orderbook_imbalance(bids, asks, depth=1)
# large bid → microprice closer to ask (paradox: weighted by *opposite* size)
assert out["microprice"] > 100.5
def test_orderbook_imbalance_empty():
out = orderbook_imbalance([], [], depth=5)
assert out["imbalance_ratio"] is None
assert out["microprice"] is None
def test_orderbook_imbalance_one_sided():
out = orderbook_imbalance([[100.0, 1.0]], [], depth=1)
assert out["imbalance_ratio"] == 1.0 # all bid
def test_orderbook_imbalance_slope():
"""Slope = velocity of liquidity dropoff: ripido = poca liquidità in profondità."""
bids_steep = [[100.0, 10.0], [99.0, 1.0]] # depth crolla → slope alto
asks_steep = [[101.0, 10.0], [102.0, 1.0]]
out = orderbook_imbalance(bids_steep, asks_steep, depth=2)
assert out["bid_slope"] is not None
# bid liquidity drops by 9 per 1 price unit → slope ~9
assert out["bid_slope"] > 5.0
+144
View File
@@ -0,0 +1,144 @@
"""Test puri per mcp_common.options (logiche option-flow indipendenti
dall'exchange).
"""
from __future__ import annotations
import pytest
from cerbero_mcp.common.options import (
atm_vs_wings_vol,
dealer_gamma_profile,
oi_weighted_skew,
smile_asymmetry,
vanna_charm_aggregate,
)
# ---------- oi_weighted_skew ----------
def test_oi_weighted_skew_balanced():
"""OI distribuito 50/50 calls/puts → skew vicino a 0."""
legs = [
{"iv": 0.5, "delta": 0.5, "oi": 100, "option_type": "call"},
{"iv": 0.5, "delta": -0.5, "oi": 100, "option_type": "put"},
]
out = oi_weighted_skew(legs)
assert abs(out["skew"]) < 0.01
def test_oi_weighted_skew_put_heavy():
"""Put heavy → IV media puts > IV media calls → skew positivo (put > call)."""
legs = [
{"iv": 0.4, "delta": 0.5, "oi": 50, "option_type": "call"},
{"iv": 0.7, "delta": -0.5, "oi": 500, "option_type": "put"},
]
out = oi_weighted_skew(legs)
assert out["skew"] > 0
assert out["call_iv_weighted"] > 0
assert out["put_iv_weighted"] > out["call_iv_weighted"]
def test_oi_weighted_skew_empty():
out = oi_weighted_skew([])
assert out == {"skew": None, "call_iv_weighted": None, "put_iv_weighted": None, "total_oi": 0}
# ---------- smile_asymmetry ----------
def test_smile_asymmetry_symmetric():
"""Smile simmetrico ATM → asymmetry ≈ 0."""
legs = [
{"strike": 80, "iv": 0.55, "option_type": "put"},
{"strike": 90, "iv": 0.50, "option_type": "put"},
{"strike": 100, "iv": 0.45, "option_type": "call"},
{"strike": 110, "iv": 0.50, "option_type": "call"},
{"strike": 120, "iv": 0.55, "option_type": "call"},
]
out = smile_asymmetry(legs, spot=100.0)
assert out["atm_iv"] is not None
assert abs(out["asymmetry"]) < 0.05
def test_smile_asymmetry_put_skew():
"""OTM puts (low strike) IV >> OTM calls (high strike) IV → asymmetry > 0."""
legs = [
{"strike": 80, "iv": 0.80, "option_type": "put"},
{"strike": 100, "iv": 0.50, "option_type": "call"},
{"strike": 120, "iv": 0.45, "option_type": "call"},
]
out = smile_asymmetry(legs, spot=100.0)
assert out["asymmetry"] > 0.1
def test_smile_asymmetry_no_atm():
legs = [{"strike": 200, "iv": 0.5, "option_type": "call"}]
out = smile_asymmetry(legs, spot=100.0)
assert out["atm_iv"] is None
# ---------- atm_vs_wings_vol ----------
def test_atm_vs_wings_vol_basic():
legs = [
{"strike": 90, "iv": 0.55, "delta": -0.25, "option_type": "put"},
{"strike": 100, "iv": 0.45, "delta": 0.5, "option_type": "call"},
{"strike": 110, "iv": 0.50, "delta": 0.25, "option_type": "call"},
]
out = atm_vs_wings_vol(legs, spot=100.0)
assert out["atm_iv"] == pytest.approx(0.45, rel=1e-3)
assert out["wing_25d_call_iv"] == pytest.approx(0.50, rel=1e-3)
assert out["wing_25d_put_iv"] == pytest.approx(0.55, rel=1e-3)
# ATM<wings → richness positiva
assert out["wing_richness"] > 0
def test_atm_vs_wings_vol_no_data():
out = atm_vs_wings_vol([], spot=100.0)
assert out["atm_iv"] is None
# ---------- dealer_gamma_profile ----------
def test_dealer_gamma_profile_assumes_dealer_short_calls():
"""Convention: dealer SHORT calls (sells calls to retail), LONG puts.
Calls oi → negative dealer gamma, puts oi → positive dealer gamma.
"""
legs = [
{"strike": 100, "gamma": 0.01, "oi": 1000, "option_type": "call"},
{"strike": 100, "gamma": 0.01, "oi": 500, "option_type": "put"},
]
out = dealer_gamma_profile(legs, spot=100.0)
# call gamma greater than put gamma at same strike → net dealer short gamma
assert len(out["by_strike"]) == 1
row = out["by_strike"][0]
assert row["call_dealer_gamma"] < 0
assert row["put_dealer_gamma"] > 0
assert row["net_dealer_gamma"] < 0 # calls dominate
assert out["total_net_dealer_gamma"] < 0
def test_dealer_gamma_profile_empty():
out = dealer_gamma_profile([], spot=100.0)
assert out["by_strike"] == []
assert out["total_net_dealer_gamma"] == 0.0
# ---------- vanna_charm_aggregate ----------
def test_vanna_charm_aggregate_basic():
legs = [
{"strike": 100, "vanna": 0.05, "charm": -0.001, "oi": 1000, "option_type": "call"},
{"strike": 100, "vanna": -0.05, "charm": 0.001, "oi": 500, "option_type": "put"},
]
out = vanna_charm_aggregate(legs, spot=100.0)
assert out["total_vanna"] != 0 # some net exposure
assert "total_charm" in out
assert out["legs_analyzed"] == 2
def test_vanna_charm_aggregate_skip_missing_greeks():
legs = [
{"strike": 100, "vanna": None, "charm": -0.001, "oi": 1000, "option_type": "call"},
{"strike": 100, "vanna": 0.05, "charm": None, "oi": 500, "option_type": "put"},
]
out = vanna_charm_aggregate(legs, spot=100.0)
# entrambe le legs hanno almeno una greca None → skippate
assert out["legs_analyzed"] == 0
+51
View File
@@ -0,0 +1,51 @@
from __future__ import annotations
import random
from cerbero_mcp.common.stats import cointegration_test
def test_cointegrated_synthetic_pair():
"""Costruisco coppia cointegrata: B random walk, A = 2*B + noise stazionario."""
r = random.Random(1)
b = [100.0]
for _ in range(300):
b.append(b[-1] + r.gauss(0, 1))
a = [2 * b[i] + r.gauss(0, 0.5) for i in range(len(b))]
out = cointegration_test(a, b)
assert out["cointegrated"] is True
assert out["beta"] == pytest_approx(2.0, rel=0.05)
assert out["adf_t_stat"] is not None
assert out["adf_t_stat"] < -2.86
def test_not_cointegrated_independent_walks():
"""Due random walk indipendenti → spread non stazionario → no cointegration."""
r = random.Random(2)
a = [100.0]
b = [100.0]
for _ in range(300):
a.append(a[-1] + r.gauss(0, 1))
b.append(b[-1] + r.gauss(0, 1))
out = cointegration_test(a, b)
# Per due RW indipendenti, t-stat ADF è solitamente > -2.86 → non cointegrate
assert out["cointegrated"] is False or out["adf_t_stat"] > -3.0
def test_cointegration_short_series():
out = cointegration_test([1.0, 2.0], [3.0, 4.0])
assert out["cointegrated"] is None
assert out["beta"] is None
def test_cointegration_mismatched_length():
out = cointegration_test([1.0, 2.0, 3.0], [1.0, 2.0])
assert out["cointegrated"] is None
def pytest_approx(value, rel):
"""Tiny helper to avoid importing pytest just for approx."""
class _Approx:
def __eq__(self, other):
return abs(other - value) <= abs(value) * rel
return _Approx()