feat: 15 nuovi indicatori quant (common + deribit + bybit + macro + sentiment)

Common (mcp_common):
- indicators.py: vol_cone, hurst_exponent, half_life_mean_reversion,
  garch11_forecast, autocorrelation, rolling_sharpe, var_cvar
- options.py (nuovo): oi_weighted_skew, smile_asymmetry, atm_vs_wings_vol,
  dealer_gamma_profile, vanna_charm_aggregate
- microstructure.py (nuovo): orderbook_imbalance (ratio + microprice + slope)
- stats.py (nuovo): cointegration_test Engle-Granger + ADF helper

Deribit (+6 tool MCP):
- get_dealer_gamma_profile (net dealer gamma + flip level)
- get_vanna_charm (vanna/charm aggregati pesati OI)
- get_oi_weighted_skew, get_smile_asymmetry, get_atm_vs_wings_vol
- get_orderbook_imbalance

Bybit (+2 tool MCP):
- get_orderbook_imbalance, get_basis_term_structure (futures dated curve)

Macro (+2 tool MCP):
- get_yield_curve_slope (2y10y/5y30y + butterfly + regime)
- get_breakeven_inflation (FRED T5YIE/T10YIE/T5YIFR)

Sentiment (+3 tool MCP):
- get_funding_arb_spread (opportunità arb compatte annualizzate)
- get_liquidation_heatmap (heuristic da OI delta + funding extreme,
  no feed paid Coinglass)
- get_cointegration_pairs (Engle-Granger su coppie crypto Binance hourly)

Tutto in TDD pure-Python (no numpy/scipy in mcp_common). README
aggiornato con elenco completo. 442 test totali verdi.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
AdrianoDev
2026-04-27 23:58:07 +02:00
parent 867180f4bf
commit a13e3fe045
21 changed files with 1922 additions and 1 deletions
@@ -1,5 +1,7 @@
from __future__ import annotations
import math
def sma(values: list[float], period: int) -> float | None:
if len(values) < period:
@@ -137,3 +139,278 @@ def adx(
for i in range(period, len(dxs)):
adx_val = (adx_val * (period - 1) + dxs[i]) / period
return {"adx": adx_val, "+di": pdi, "-di": mdi}
# ───── Returns helper ─────
def _log_returns(closes: list[float]) -> list[float]:
out: list[float] = []
for i in range(1, len(closes)):
prev = closes[i - 1]
curr = closes[i]
if prev > 0 and curr > 0:
out.append(math.log(curr / prev))
return out
def _percentile(sorted_values: list[float], q: float) -> float:
if not sorted_values:
return 0.0
if len(sorted_values) == 1:
return sorted_values[0]
pos = q * (len(sorted_values) - 1)
lo = int(pos)
hi = min(lo + 1, len(sorted_values) - 1)
frac = pos - lo
return sorted_values[lo] + frac * (sorted_values[hi] - sorted_values[lo])
def _stddev(xs: list[float]) -> float:
if len(xs) < 2:
return 0.0
m = sum(xs) / len(xs)
var = sum((x - m) ** 2 for x in xs) / (len(xs) - 1)
return math.sqrt(var)
# ───── vol_cone ─────
def vol_cone(
    closes: list[float],
    windows: list[int] | None = None,
    annualization: int = 252,
) -> dict[int, dict[str, float | None]]:
    """Realized-volatility cone.

    For each window, return the current annualized realized vol plus the
    historical p10/p50/p90 percentiles over every rolling window in the
    sample. Annualized with sqrt(annualization) (default 252 trading days).
    Windows with too few returns report all-None stats.
    """
    windows = windows or [10, 20, 30, 60]
    returns = _log_returns(closes)
    ann = math.sqrt(annualization)
    cone: dict[int, dict[str, float | None]] = {}
    for win in windows:
        if len(returns) < win:
            cone[win] = {"current": None, "p10": None, "p50": None, "p90": None}
            continue
        vols = [
            _stddev(returns[start:start + win]) * ann
            for start in range(len(returns) - win + 1)
        ]
        ranked = sorted(vols)
        cone[win] = {
            "current": vols[-1],
            "p10": _percentile(ranked, 0.10),
            "p50": _percentile(ranked, 0.50),
            "p90": _percentile(ranked, 0.90),
        }
    return cone
# ───── hurst_exponent ─────
def hurst_exponent(closes: list[float], min_lag: int = 2, max_lag: int = 100) -> float | None:
    """Hurst exponent via rescaled-range (R/S) analysis on log-prices.

    H ~ 0.5 indicates a random walk, H > 0.5 a trending (persistent) series,
    H < 0.5 a mean-reverting (anti-persistent) one.

    Returns None when the sample is too short, no valid prices remain,
    or the final regression is degenerate.
    """
    if len(closes) < max(20, max_lag):
        return None
    # Non-positive prices cannot be log-transformed and are dropped.
    log_p = [math.log(c) for c in closes if c > 0]
    if len(log_p) < max(20, max_lag):
        return None
    # Cap the largest lag at half the sample so each lag yields >= 2 segments.
    upper = min(max_lag, len(log_p) // 2)
    if upper < min_lag + 1:
        return None
    lags = list(range(min_lag, upper))
    log_lags: list[float] = []
    log_rs: list[float] = []
    for lag in lags:
        # Build N/lag non-overlapping segments; for each compute R/S
        rs_vals: list[float] = []
        n_segs = len(log_p) // lag
        if n_segs < 1:
            continue
        for seg in range(n_segs):
            chunk = log_p[seg * lag:(seg + 1) * lag]
            # Work on the increments (log-returns) within the segment.
            diffs = [chunk[i] - chunk[i - 1] for i in range(1, len(chunk))]
            if len(diffs) < 2:
                continue
            mean = sum(diffs) / len(diffs)
            dev = [d - mean for d in diffs]
            # Range R = max - min of the cumulative sum of mean-adjusted increments.
            cum = []
            acc = 0.0
            for d in dev:
                acc += d
                cum.append(acc)
            r = max(cum) - min(cum)
            s = _stddev(diffs)
            if s > 0:
                rs_vals.append(r / s)
        # One (log lag, log mean-R/S) point per usable lag.
        if rs_vals:
            avg_rs = sum(rs_vals) / len(rs_vals)
            if avg_rs > 0:
                log_lags.append(math.log(lag))
                log_rs.append(math.log(avg_rs))
    # Need enough points for the log-log fit to be meaningful.
    if len(log_lags) < 4:
        return None
    # Linear regression slope = Hurst
    n = len(log_lags)
    mx = sum(log_lags) / n
    my = sum(log_rs) / n
    num = sum((log_lags[i] - mx) * (log_rs[i] - my) for i in range(n))
    den = sum((log_lags[i] - mx) ** 2 for i in range(n))
    if den == 0:
        return None
    return num / den
# ───── half_life_mean_reversion ─────
def half_life_mean_reversion(closes: list[float]) -> float | None:
"""Half-life via OU AR(1) fit: y_t - y_{t-1} = a + b*y_{t-1} + eps.
Half-life = -ln(2)/ln(1+b). Se b>=0 → no mean reversion → None.
"""
if len(closes) < 30:
return None
y_lag = closes[:-1]
delta = [closes[i] - closes[i - 1] for i in range(1, len(closes))]
n = len(y_lag)
mx = sum(y_lag) / n
my = sum(delta) / n
num = sum((y_lag[i] - mx) * (delta[i] - my) for i in range(n))
den = sum((y_lag[i] - mx) ** 2 for i in range(n))
if den == 0:
return None
b = num / den
if b >= 0:
return None
one_plus_b = 1.0 + b
if one_plus_b <= 0:
return None
return -math.log(2.0) / math.log(one_plus_b)
# ───── garch11_forecast ─────
def garch11_forecast(
    closes: list[float],
    max_iter: int = 50,
) -> dict[str, float] | None:
    """One-step-ahead GARCH(1,1) volatility forecast (pure Python, no MLE).

    omega is tied to the sample variance via variance targeting
    (omega = var * (1 - alpha - beta)); (alpha, beta) come from a small grid
    search minimizing the one-step-ahead MSE of sigma^2 against realized
    squared returns. Sufficient for ranking volatility regimes; not
    production-grade.

    `max_iter` is kept for interface compatibility (the grid search needs
    no iteration budget).

    Returns a dict with sigma_next, alpha, beta, omega and long_run_sigma,
    or None with fewer than 50 returns / a degenerate sample variance.
    """
    rets = _log_returns(closes)
    if len(rets) < 50:
        return None
    mean = sum(rets) / len(rets)
    centered = [r - mean for r in rets]
    sq = [r * r for r in centered]
    # Sample variance as the long-run (unconditional) variance target.
    var_lr = sum(sq) / len(sq)
    if var_lr <= 0:
        return None
    # Grid search over (alpha, beta) minimizing one-step forecast MSE.
    best = (1e18, 0.05, 0.90)
    for a in [0.02, 0.05, 0.08, 0.10, 0.15]:
        for b in [0.80, 0.85, 0.88, 0.90, 0.93]:
            if a + b >= 0.999:
                continue  # enforce stationarity
            omega = var_lr * (1 - a - b)
            if omega <= 0:
                continue
            sigma2 = var_lr
            mse = 0.0
            # BUGFIX: align forecast and target. sigma2 built from sq[i-1]
            # is the forecast FOR step i, so score it against sq[i]; the
            # original compared it against the same sq used to build it.
            for i in range(1, len(sq)):
                sigma2 = omega + a * sq[i - 1] + b * sigma2
                mse += (sigma2 - sq[i]) ** 2
            if mse < best[0]:
                best = (mse, a, b)
    _, alpha, beta = best
    omega = var_lr * (1 - alpha - beta)
    # Filter the conditional variance through the sample, then forecast.
    # BUGFIX: the original filtered over ALL of sq and then applied sq[-1]
    # again in the forecast line, double-counting the last squared return.
    sigma2 = var_lr
    for s in sq[:-1]:
        sigma2 = omega + alpha * s + beta * sigma2
    sigma2_next = omega + alpha * sq[-1] + beta * sigma2
    return {
        "sigma_next": math.sqrt(max(sigma2_next, 0.0)),
        "alpha": alpha,
        "beta": beta,
        "omega": omega,
        "long_run_sigma": math.sqrt(var_lr),
    }
# ───── autocorrelation ─────
def autocorrelation(values: list[float], max_lag: int = 10) -> dict[int, float]:
    """Autocorrelation function for lags 1..max_lag.

    White noise gives coefficients near 0; an AR(1) with parameter phi gives
    ~phi at lag 1 and ~phi**k at lag k. Returns {} when the sample is too
    short, and all-zero coefficients for a zero-variance series.
    """
    n = len(values)
    if n < max_lag + 2:
        return {}
    mean = sum(values) / n
    centered = [v - mean for v in values]
    variance = sum(c * c for c in centered) / n
    if variance == 0:
        return dict.fromkeys(range(1, max_lag + 1), 0.0)
    acf: dict[int, float] = {}
    for lag in range(1, max_lag + 1):
        cov = sum(a * b for a, b in zip(centered, centered[lag:])) / n
        acf[lag] = cov / variance
    return acf
# ───── rolling_sharpe ─────
def rolling_sharpe(
    closes: list[float],
    window: int = 60,
    annualization: int = 252,
    risk_free: float = 0.0,
) -> dict[str, float] | None:
    """Annualized Sharpe and Sortino over the last `window` log-returns.

    `risk_free` is an annualized rate, de-annualized per period before use.
    Returns None when fewer than `window` returns are available.
    """
    rets = _log_returns(closes)
    if len(rets) < window:
        return None
    per_period_rf = risk_free / annualization
    excess = [r - per_period_rf for r in rets[-window:]]
    avg = sum(excess) / len(excess)
    sd = _stddev(excess)
    ann = math.sqrt(annualization)
    sharpe = (avg / sd) * ann if sd > 0 else 0.0
    negatives = [e for e in excess if e < 0]
    if negatives:
        # Downside deviation keeps the FULL sample size in the denominator.
        ds_sd = math.sqrt(sum(e * e for e in negatives) / len(excess))
        sortino = (avg / ds_sd) * ann if ds_sd > 0 else 0.0
    else:
        # No downside observations: report a conventionally "very good" Sortino.
        sortino = sharpe * 2
    return {"sharpe": sharpe, "sortino": sortino, "mean_excess": avg, "stddev": sd}
# ───── var_cvar ─────
def var_cvar(returns: list[float], confidences: list[float] | None = None) -> dict[str, float]:
    """Historical VaR and CVaR (expected shortfall) at the given confidence levels.

    `returns` may be of any periodicity. Losses are reported as positive
    numbers (var_95=0.03 means -3% at 95%). Returns {} below 30 observations.
    """
    levels = confidences or [0.95, 0.99]
    if len(returns) < 30:
        return {}
    ordered = sorted(returns)
    result: dict[str, float] = {}
    for level in levels:
        label = int(round(level * 100))
        var_loss = -_percentile(ordered, 1.0 - level)
        threshold = -var_loss
        tail = [r for r in ordered if r <= threshold]
        # If the tail is empty, fall back to the VaR quantile itself.
        cvar_loss = -(sum(tail) / len(tail)) if tail else var_loss
        result[f"var_{label}"] = var_loss
        result[f"cvar_{label}"] = cvar_loss
    return result
@@ -0,0 +1,77 @@
"""Microstructure indicators: orderbook imbalance, slope, microprice.
Tutte le funzioni accettano bids/asks come list[list[price, qty]] (formato
standard dei ticker exchange) e ritornano metriche aggregate exchange-agnostic.
"""
from __future__ import annotations
def orderbook_imbalance(
    bids: list[list[float]],
    asks: list[list[float]],
    depth: int = 10,
) -> dict[str, float | None]:
    """Aggregate order-book pressure metrics over the top `depth` levels.

    imbalance_ratio = (bid_vol - ask_vol) / (bid_vol + ask_vol), in [-1, +1];
    positive means bid pressure, negative ask pressure.
    Microprice (best level only) is the size-weighted mid where each price is
    weighted by the OPPOSITE side's quantity:
    (P_bid * Q_ask + P_ask * Q_bid) / (Q_bid + Q_ask).
    Slopes proxy how quickly liquidity decays away from the touch.
    """
    if not bids and not asks:
        return {
            "imbalance_ratio": None,
            "bid_volume": 0.0,
            "ask_volume": 0.0,
            "microprice": None,
            "bid_slope": None,
            "ask_slope": None,
        }
    bid_levels, ask_levels = bids[:depth], asks[:depth]
    bid_vol = sum(level[1] for level in bid_levels)
    ask_vol = sum(level[1] for level in ask_levels)
    total = bid_vol + ask_vol
    ratio = (bid_vol - ask_vol) / total if total != 0 else None
    microprice = None
    if bid_levels and ask_levels:
        best_bid_px, best_bid_qty = bid_levels[0]
        best_ask_px, best_ask_qty = ask_levels[0]
        weight = best_bid_qty + best_ask_qty
        if weight > 0:
            microprice = (best_bid_px * best_ask_qty + best_ask_px * best_bid_qty) / weight
    return {
        "imbalance_ratio": ratio,
        "bid_volume": bid_vol,
        "ask_volume": ask_vol,
        "microprice": microprice,
        "bid_slope": _depth_slope(bid_levels, ascending_price=False),
        "ask_slope": _depth_slope(ask_levels, ascending_price=True),
    }
def _depth_slope(levels: list[list[float]], ascending_price: bool) -> float | None:
"""Calcola |Δq / Δp| sul primo vs penultimo livello.
Slope alto = liquidità che crolla rapidamente in profondità (book sottile).
"""
if len(levels) < 2:
return None
p_first, q_first = levels[0]
p_last, q_last = levels[-1]
dp = abs(p_last - p_first)
if dp == 0:
return None
return abs(q_first - q_last) / dp
+201
View File
@@ -0,0 +1,201 @@
"""Logiche option-flow exchange-agnostiche.
Ogni funzione accetta una lista di "legs" (dizionari) con i campi rilevanti
e ritorna metriche aggregate. La normalizzazione exchange-specific dei dati
spetta al chiamante (es. mcp-deribit costruisce le legs da chain + ticker).
"""
from __future__ import annotations
from typing import Any
# Convention dealer gamma: i dealer sono SHORT calls (le vendono al retail) e
# LONG puts. Quando spot sale e dealer sono short calls, comprano underlying
# (positive feedback → vol amplificata). Quando spot scende e dealer long puts,
# vendono underlying (positive feedback). Net dealer gamma negativo → mercato
# instabile (squeeze in entrambe le direzioni).
def oi_weighted_skew(legs: list[dict[str, Any]]) -> dict[str, float | int | None]:
"""Skew aggregato pesato per OI: IV media puts - IV media calls.
Positivo = puts richer (paura), negativo = calls richer (greed).
"""
call_num = call_den = 0.0
put_num = put_den = 0.0
for leg in legs:
iv = leg.get("iv")
oi = leg.get("oi") or 0
if iv is None or oi <= 0:
continue
if leg.get("option_type") == "call":
call_num += iv * oi
call_den += oi
elif leg.get("option_type") == "put":
put_num += iv * oi
put_den += oi
call_iv = call_num / call_den if call_den > 0 else None
put_iv = put_num / put_den if put_den > 0 else None
skew = (put_iv - call_iv) if (call_iv is not None and put_iv is not None) else None
return {
"skew": skew,
"call_iv_weighted": call_iv,
"put_iv_weighted": put_iv,
"total_oi": int(call_den + put_den),
}
def smile_asymmetry(legs: list[dict[str, Any]], spot: float) -> dict[str, float | None]:
"""Smile asymmetry: differenza media IV otm puts vs otm calls a parità
di moneyness. Positivo = put-side richer (skew negativo classico equity).
"""
if spot <= 0 or not legs:
return {"atm_iv": None, "asymmetry": None, "otm_put_iv": None, "otm_call_iv": None}
# ATM IV: media IV strike entro ±1% da spot
atm_ivs = [leg["iv"] for leg in legs if leg.get("iv") is not None and abs(leg.get("strike", 0) - spot) / spot < 0.01]
atm_iv = sum(atm_ivs) / len(atm_ivs) if atm_ivs else None
otm_put_ivs = [
leg["iv"] for leg in legs
if leg.get("iv") is not None and leg.get("option_type") == "put" and leg.get("strike", 0) < spot * 0.95
]
otm_call_ivs = [
leg["iv"] for leg in legs
if leg.get("iv") is not None and leg.get("option_type") == "call" and leg.get("strike", 0) > spot * 1.05
]
otm_put = sum(otm_put_ivs) / len(otm_put_ivs) if otm_put_ivs else None
otm_call = sum(otm_call_ivs) / len(otm_call_ivs) if otm_call_ivs else None
asym = (otm_put - otm_call) if (otm_put is not None and otm_call is not None) else None
return {
"atm_iv": atm_iv,
"asymmetry": asym,
"otm_put_iv": otm_put,
"otm_call_iv": otm_call,
}
def atm_vs_wings_vol(legs: list[dict[str, Any]], spot: float) -> dict[str, float | None]:
"""IV ATM vs IV alle ali 25-delta. Wing richness > 0 → smile (kurtosis vol).
"""
if not legs:
return {"atm_iv": None, "wing_25d_call_iv": None, "wing_25d_put_iv": None, "wing_richness": None}
def _closest(target_delta: float, opt_type: str, tol: float = 0.1) -> float | None:
best = None
best_dist = float("inf")
for leg in legs:
d = leg.get("delta")
iv = leg.get("iv")
if d is None or iv is None or leg.get("option_type") != opt_type:
continue
dist = abs(abs(d) - abs(target_delta))
if dist < best_dist:
best_dist = dist
best = iv
return best if best_dist <= tol else None
# ATM IV: leg con delta più vicino a 0.5 (call) o -0.5 (put)
atm_call_iv = _closest(0.5, "call")
atm_put_iv = _closest(-0.5, "put")
atm_ivs = [v for v in (atm_call_iv, atm_put_iv) if v is not None]
atm_iv = sum(atm_ivs) / len(atm_ivs) if atm_ivs else None
wing_call = _closest(0.25, "call")
wing_put = _closest(-0.25, "put")
wing_avg = None
if wing_call is not None and wing_put is not None:
wing_avg = (wing_call + wing_put) / 2
richness = (wing_avg - atm_iv) if (wing_avg is not None and atm_iv is not None) else None
return {
"atm_iv": atm_iv,
"wing_25d_call_iv": wing_call,
"wing_25d_put_iv": wing_put,
"wing_richness": richness,
}
def dealer_gamma_profile(
    legs: list[dict[str, Any]],
    spot: float,
) -> dict[str, Any]:
    """Net dealer gamma by strike (convention: dealers short calls, long puts).

    Per strike: call_dealer_gamma (negative), put_dealer_gamma (positive)
    and their net. Also returns the aggregate total and the interpolated
    strike where net dealer gamma crosses zero (gamma flip), if any.
    """
    buckets: dict[float, dict[str, float]] = {}
    for leg in legs:
        strike, gamma = leg.get("strike"), leg.get("gamma")
        oi = leg.get("oi") or 0
        if strike is None or gamma is None or oi <= 0 or spot <= 0:
            continue
        # Gamma notional per 1% spot move, scaled by open interest.
        exposure = float(gamma) * oi * (spot ** 2) * 0.01
        row = buckets.setdefault(
            float(strike),
            {"strike": float(strike), "call_dealer_gamma": 0.0, "put_dealer_gamma": 0.0},
        )
        side = leg.get("option_type")
        if side == "call":
            row["call_dealer_gamma"] -= exposure  # dealers are short calls
        elif side == "put":
            row["put_dealer_gamma"] += exposure  # dealers are long puts
    rows: list[dict[str, float]] = []
    for strike in sorted(buckets):
        row = buckets[strike]
        row["net_dealer_gamma"] = row["call_dealer_gamma"] + row["put_dealer_gamma"]
        rows.append(row)
    # Gamma flip: first sign change of net gamma, linearly interpolated.
    flip_level = None
    for prev, nxt in zip(rows, rows[1:]):
        crosses_up = prev["net_dealer_gamma"] < 0 <= nxt["net_dealer_gamma"]
        crosses_down = prev["net_dealer_gamma"] > 0 >= nxt["net_dealer_gamma"]
        if crosses_up or crosses_down:
            span = nxt["net_dealer_gamma"] - prev["net_dealer_gamma"]
            if span != 0:
                t = -prev["net_dealer_gamma"] / span
                flip_level = round(prev["strike"] + t * (nxt["strike"] - prev["strike"]), 2)
            break
    total = sum(r["net_dealer_gamma"] for r in rows)
    return {
        "by_strike": [
            {
                "strike": r["strike"],
                "call_dealer_gamma": round(r["call_dealer_gamma"], 2),
                "put_dealer_gamma": round(r["put_dealer_gamma"], 2),
                "net_dealer_gamma": round(r["net_dealer_gamma"], 2),
            }
            for r in rows
        ],
        "total_net_dealer_gamma": round(total, 2),
        "gamma_flip_level": flip_level,
    }
def vanna_charm_aggregate(
    legs: list[dict[str, Any]],
    spot: float,
) -> dict[str, Any]:
    """OI-weighted aggregate vanna (d delta / d IV) and charm (d delta / d t).

    Calls contribute with sign +1 and puts with -1. Legs missing either
    greek, or with non-positive OI, are skipped and not counted.
    """
    vanna_sum = 0.0
    charm_sum = 0.0
    counted = 0
    for leg in legs:
        vanna, charm = leg.get("vanna"), leg.get("charm")
        oi = leg.get("oi") or 0
        if vanna is None or charm is None or oi <= 0:
            continue
        direction = 1 if leg.get("option_type") == "call" else -1
        vanna_sum += float(vanna) * oi * direction
        charm_sum += float(charm) * oi * direction
        counted += 1
    return {
        "total_vanna": vanna_sum,
        "total_charm": charm_sum,
        "legs_analyzed": counted,
        "spot": spot,
    }
+96
View File
@@ -0,0 +1,96 @@
"""Test statistici puri (cointegration, ADF, half-life già in indicators).
Nessuna dipendenza esterna: pure-Python.
"""
from __future__ import annotations
import math
def _ols_slope_intercept(xs: list[float], ys: list[float]) -> tuple[float, float] | None:
if len(xs) != len(ys) or len(xs) < 3:
return None
n = len(xs)
mx = sum(xs) / n
my = sum(ys) / n
num = sum((xs[i] - mx) * (ys[i] - my) for i in range(n))
den = sum((xs[i] - mx) ** 2 for i in range(n))
if den == 0:
return None
slope = num / den
intercept = my - slope * mx
return slope, intercept
def _adf_t_stat(series: list[float]) -> float | None:
    """Simplified (lag-0) Dickey-Fuller t-statistic.

    Fits delta y_t = a + b * y_{t-1} + eps and returns t = b / se(b).
    More negative = more stationary; ~-2.86 is the approximate 5% critical
    value. None below 30 points or for degenerate regressions.
    """
    if len(series) < 30:
        return None
    lagged = series[:-1]
    diffs = [series[i] - series[i - 1] for i in range(1, len(series))]
    fit = _ols_slope_intercept(lagged, diffs)
    if fit is None:
        return None
    slope, intercept = fit
    n = len(lagged)
    lag_mean = sum(lagged) / n
    sxx = sum((x - lag_mean) ** 2 for x in lagged)
    if sxx == 0:
        return None
    residuals = [diffs[i] - (intercept + slope * lagged[i]) for i in range(n)]
    rss = sum(r * r for r in residuals)
    dof = n - 2
    if dof <= 0:
        return None
    sigma2 = rss / dof
    se_slope = math.sqrt(sigma2 / sxx)
    if se_slope == 0:
        return None
    return slope / se_slope
def cointegration_test(
    series_a: list[float],
    series_b: list[float],
    significance_t: float = -2.86,
) -> dict[str, float | bool | None]:
    """Engle-Granger two-step cointegration test.

    Step 1: OLS a_t = alpha + beta * b_t + eps.
    Step 2: ADF on the residual spread; a t-stat below `significance_t`
    (-2.86 ~ 5%) flags the pair as cointegrated.

    Returns all-None fields when lengths differ, the series are shorter
    than 50, or the OLS fit is degenerate.
    """
    null_result: dict[str, float | bool | None] = {
        "cointegrated": None,
        "beta": None,
        "alpha": None,
        "adf_t_stat": None,
        "spread_mean": None,
        "spread_std": None,
    }
    if len(series_a) != len(series_b) or len(series_a) < 50:
        return null_result
    fit = _ols_slope_intercept(series_b, series_a)
    if fit is None:
        return null_result
    beta, alpha = fit
    spread = [a - alpha - beta * b for a, b in zip(series_a, series_b)]
    t_stat = _adf_t_stat(spread)
    n = len(spread)
    mean = sum(spread) / n
    var = sum((s - mean) ** 2 for s in spread) / (n - 1) if n > 1 else 0.0
    return {
        "cointegrated": t_stat is not None and t_stat < significance_t,
        "beta": beta,
        "alpha": alpha,
        "adf_t_stat": t_stat,
        "spread_mean": mean,
        "spread_std": math.sqrt(var),
    }
+181 -1
View File
@@ -1,5 +1,20 @@
from mcp_common.indicators import adx, atr, macd, rsi, sma
import math
from mcp_common.indicators import (
adx,
atr,
autocorrelation,
garch11_forecast,
half_life_mean_reversion,
hurst_exponent,
macd,
rolling_sharpe,
rsi,
sma,
var_cvar,
vol_cone,
)
def test_rsi_simple():
@@ -78,3 +93,168 @@ def test_adx_flat_market():
# no directional movement → ADX near 0
assert a["adx"] is not None
assert a["adx"] < 5.0
# ---------- vol_cone ----------
def _gbm_series(mu: float, sigma: float, n: int, seed: int = 42) -> list[float]:
"""Mock GBM closes: deterministic for tests."""
import random
r = random.Random(seed)
p = [100.0]
for _ in range(n):
z = r.gauss(0.0, 1.0)
p.append(p[-1] * math.exp(mu / 252 + sigma / math.sqrt(252) * z))
return p
def test_vol_cone_returns_percentiles_per_window():
    series = _gbm_series(mu=0.0, sigma=0.5, n=400)
    cone = vol_cone(series, windows=[10, 30, 60])
    assert set(cone) == {10, 30, 60}
    for stats in cone.values():
        for key in ("current", "p10", "p50", "p90"):
            assert key in stats
        assert stats["p10"] <= stats["p50"] <= stats["p90"]
        # Annualized vol should land in a sensible band for sigma = 0.5.
        assert 0.1 < stats["p50"] < 1.5
def test_vol_cone_insufficient_data():
    # Two closes give a single return, below every requested window.
    cone = vol_cone([100.0, 101.0], windows=[10, 30])
    assert cone[10]["current"] is None and cone[30]["current"] is None
# ---------- hurst_exponent ----------
def test_hurst_random_walk_near_half():
    series = _gbm_series(mu=0.0, sigma=0.3, n=500, seed=7)
    h = hurst_exponent(series)
    assert h is not None
    # Random walk -> H ~ 0.5; finite-sample R/S has a well-known upward bias,
    # so use a wide band that still rules out strong trending (> 0.85).
    assert 0.35 < h < 0.85
def test_hurst_persistent_trend():
    # A strongly monotonic series should produce H well above 0.5.
    trend = [100.0 + i * 0.5 + math.sin(i / 10) * 0.1 for i in range(400)]
    h = hurst_exponent(trend)
    assert h is not None and h > 0.85
def test_hurst_insufficient_data():
    # Three points are far below the minimum sample requirement.
    assert hurst_exponent([1.0, 2.0, 3.0]) is None
# ---------- half_life_mean_reversion ----------
def test_half_life_mean_reverting_series():
    """OU process with theta=0.1 -> half-life ~ ln(2)/0.1 ~ 6.93."""
    import random
    rng = random.Random(123)
    theta, mu, sigma = 0.1, 100.0, 0.5
    path = [mu]
    for _ in range(500):
        path.append(path[-1] + theta * (mu - path[-1]) + sigma * rng.gauss(0, 1))
    hl = half_life_mean_reversion(path)
    assert hl is not None
    # Broad tolerance to absorb finite-sample noise.
    assert 3.0 < hl < 20.0
def test_half_life_trending_returns_none():
    ramp = [100.0 + i for i in range(200)]
    hl = half_life_mean_reversion(ramp)
    # A pure trend has no mean reversion: expect None (or an absurd half-life).
    assert hl is None or hl > 1000
# ---------- garch11_forecast ----------
def test_garch11_forecast_returns_positive_sigma():
    series = _gbm_series(mu=0.0, sigma=0.4, n=500, seed=11)
    fit = garch11_forecast(series)
    assert fit is not None
    assert fit["sigma_next"] > 0
    assert 0 < fit["alpha"] < 1 and 0 < fit["beta"] < 1
    assert fit["alpha"] + fit["beta"] < 1.0  # stationarity constraint
def test_garch11_insufficient_data():
    # Two closes -> one return, far below the 50-return minimum.
    assert garch11_forecast([100.0, 101.0]) is None
# ---------- autocorrelation ----------
def test_autocorrelation_white_noise_low():
    import random
    rng = random.Random(1)
    noise = [rng.gauss(0, 0.01) for _ in range(500)]
    acf = autocorrelation(noise, max_lag=5)
    assert len(acf) == 5
    # White noise: every coefficient ~ 0, within a generous 2 * (2/sqrt(N)) band.
    limit = 2.0 / math.sqrt(len(noise))
    assert all(abs(v) < limit * 2 for v in acf.values())
def test_autocorrelation_lag1_strong_for_ar1():
    """AR(1) with phi=0.7 -> lag-1 autocorrelation ~ 0.7, decaying geometrically."""
    import random
    rng = random.Random(2)
    path = [0.0]
    for _ in range(500):
        path.append(0.7 * path[-1] + rng.gauss(0, 0.1))
    acf = autocorrelation(path, max_lag=3)
    assert acf[1] > 0.5
    assert acf[2] > 0.2  # geometric decay
def test_autocorrelation_insufficient_data():
    # A single observation cannot support any lag.
    assert autocorrelation([1.0], max_lag=5) == {}
# ---------- rolling_sharpe ----------
def test_rolling_sharpe_positive_for_uptrend():
    rising = [100.0 * (1 + 0.001 * i) for i in range(252)]
    out = rolling_sharpe(rising, window=60)
    assert out is not None
    assert out["sharpe"] > 0
    # With little or no downside, Sortino can greatly exceed Sharpe.
    assert out["sortino"] >= out["sharpe"] / 2
def test_rolling_sharpe_zero_volatility():
    flat = [100.0] * 100
    out = rolling_sharpe(flat, window=60)
    assert out is not None
    assert out["sharpe"] == 0.0  # zero variance -> 0 by convention
def test_rolling_sharpe_insufficient_data():
    # Two closes give a single return, well short of the 60-return window.
    assert rolling_sharpe([100.0, 101.0], window=60) is None
# ---------- var_cvar ----------
def test_var_cvar_basic():
    import random
    rng = random.Random(3)
    rets = [rng.gauss(0.0005, 0.02) for _ in range(1000)]
    out = var_cvar(rets, confidences=[0.95, 0.99])
    for key in ("var_95", "cvar_95", "var_99", "cvar_99"):
        assert key in out
    assert out["var_95"] > 0  # losses reported as positive numbers
    assert out["cvar_95"] >= out["var_95"]  # tail mean is at least the quantile
    assert out["var_99"] >= out["var_95"]
def test_var_cvar_insufficient_data():
    # Below the 30-observation minimum -> empty result.
    assert var_cvar([0.01], confidences=[0.95]) == {}
@@ -0,0 +1,59 @@
from __future__ import annotations
from mcp_common.microstructure import orderbook_imbalance
def test_orderbook_imbalance_balanced():
    book_bids = [[100.0, 1.0], [99.5, 1.0], [99.0, 1.0]]
    book_asks = [[100.5, 1.0], [101.0, 1.0], [101.5, 1.0]]
    out = orderbook_imbalance(book_bids, book_asks, depth=3)
    assert abs(out["imbalance_ratio"]) < 0.01  # balanced book
    assert out["bid_volume"] == 3.0 and out["ask_volume"] == 3.0
    assert out["microprice"] is not None
def test_orderbook_imbalance_bid_heavy():
    heavy_bids = [[100.0, 5.0], [99.5, 5.0]]
    thin_asks = [[100.5, 1.0], [101.0, 1.0]]
    out = orderbook_imbalance(heavy_bids, thin_asks, depth=2)
    assert out["imbalance_ratio"] > 0.5  # strong bid pressure
    assert out["bid_volume"] == 10.0
    assert out["ask_volume"] == 2.0
def test_orderbook_imbalance_ask_heavy():
    thin_bids = [[100.0, 1.0], [99.5, 1.0]]
    heavy_asks = [[100.5, 5.0], [101.0, 5.0]]
    # Mirror case of the bid-heavy test: expect strong negative imbalance.
    assert orderbook_imbalance(thin_bids, heavy_asks, depth=2)["imbalance_ratio"] < -0.5
def test_orderbook_imbalance_microprice_skew():
    """Microprice is a weighted mid: each price is weighted by the OPPOSITE side's size."""
    out = orderbook_imbalance([[100.0, 9.0]], [[101.0, 1.0]], depth=1)
    # A large bid pushes the microprice toward the ask.
    assert out["microprice"] > 100.5
def test_orderbook_imbalance_empty():
    # Completely empty book -> no ratio, no microprice.
    out = orderbook_imbalance([], [], depth=5)
    assert out["imbalance_ratio"] is None and out["microprice"] is None
def test_orderbook_imbalance_one_sided():
    # Only bids present -> all volume sits on the bid side.
    assert orderbook_imbalance([[100.0, 1.0]], [], depth=1)["imbalance_ratio"] == 1.0
def test_orderbook_imbalance_slope():
    """Slope measures how fast liquidity decays: steep = thin book in depth."""
    steep_bids = [[100.0, 10.0], [99.0, 1.0]]  # size collapses -> high slope
    steep_asks = [[101.0, 10.0], [102.0, 1.0]]
    out = orderbook_imbalance(steep_bids, steep_asks, depth=2)
    assert out["bid_slope"] is not None
    # Bid size drops by 9 over 1 price unit -> slope ~ 9.
    assert out["bid_slope"] > 5.0
+146
View File
@@ -0,0 +1,146 @@
"""Test puri per mcp_common.options (logiche option-flow indipendenti
dall'exchange).
"""
from __future__ import annotations
import pytest
from mcp_common.options import (
atm_vs_wings_vol,
dealer_gamma_profile,
oi_weighted_skew,
smile_asymmetry,
vanna_charm_aggregate,
)
# ---------- oi_weighted_skew ----------
def test_oi_weighted_skew_balanced():
    """Equal OI and IV on both sides -> skew close to zero."""
    legs = [
        {"iv": 0.5, "delta": 0.5, "oi": 100, "option_type": "call"},
        {"iv": 0.5, "delta": -0.5, "oi": 100, "option_type": "put"},
    ]
    assert abs(oi_weighted_skew(legs)["skew"]) < 0.01
def test_oi_weighted_skew_put_heavy():
    """Dominant, richer puts -> positive skew (put IV above call IV)."""
    out = oi_weighted_skew([
        {"iv": 0.4, "delta": 0.5, "oi": 50, "option_type": "call"},
        {"iv": 0.7, "delta": -0.5, "oi": 500, "option_type": "put"},
    ])
    assert out["skew"] > 0
    assert out["call_iv_weighted"] > 0
    assert out["put_iv_weighted"] > out["call_iv_weighted"]
def test_oi_weighted_skew_empty():
    expected = {"skew": None, "call_iv_weighted": None, "put_iv_weighted": None, "total_oi": 0}
    assert oi_weighted_skew([]) == expected
# ---------- smile_asymmetry ----------
def test_smile_asymmetry_symmetric():
    """A smile symmetric around ATM -> asymmetry ~ 0."""
    legs = [
        {"strike": 80, "iv": 0.55, "option_type": "put"},
        {"strike": 90, "iv": 0.50, "option_type": "put"},
        {"strike": 100, "iv": 0.45, "option_type": "call"},
        {"strike": 110, "iv": 0.50, "option_type": "call"},
        {"strike": 120, "iv": 0.55, "option_type": "call"},
    ]
    out = smile_asymmetry(legs, spot=100.0)
    assert out["atm_iv"] is not None
    assert abs(out["asymmetry"]) < 0.05
def test_smile_asymmetry_put_skew():
    """OTM put IV well above OTM call IV -> positive asymmetry."""
    out = smile_asymmetry(
        [
            {"strike": 80, "iv": 0.80, "option_type": "put"},
            {"strike": 100, "iv": 0.50, "option_type": "call"},
            {"strike": 120, "iv": 0.45, "option_type": "call"},
        ],
        spot=100.0,
    )
    assert out["asymmetry"] > 0.1
def test_smile_asymmetry_no_atm():
    # No strike within 1% of spot -> no ATM IV.
    out = smile_asymmetry([{"strike": 200, "iv": 0.5, "option_type": "call"}], spot=100.0)
    assert out["atm_iv"] is None
# ---------- atm_vs_wings_vol ----------
def test_atm_vs_wings_vol_basic():
    legs = [
        {"strike": 90, "iv": 0.55, "delta": -0.25, "option_type": "put"},
        {"strike": 100, "iv": 0.45, "delta": 0.5, "option_type": "call"},
        {"strike": 110, "iv": 0.50, "delta": 0.25, "option_type": "call"},
    ]
    out = atm_vs_wings_vol(legs, spot=100.0)
    assert out["atm_iv"] == pytest.approx(0.45, rel=1e-3)
    assert out["wing_25d_call_iv"] == pytest.approx(0.50, rel=1e-3)
    assert out["wing_25d_put_iv"] == pytest.approx(0.55, rel=1e-3)
    assert out["wing_richness"] > 0  # wings richer than ATM
def test_atm_vs_wings_vol_no_data():
    # No legs at all -> ATM IV must be None.
    assert atm_vs_wings_vol([], spot=100.0)["atm_iv"] is None
# ---------- dealer_gamma_profile ----------
def test_dealer_gamma_profile_assumes_dealer_short_calls():
    """Convention: dealers are SHORT calls (sold to retail) and LONG puts, so
    call OI yields negative dealer gamma and put OI positive dealer gamma.
    """
    out = dealer_gamma_profile(
        [
            {"strike": 100, "gamma": 0.01, "oi": 1000, "option_type": "call"},
            {"strike": 100, "gamma": 0.01, "oi": 500, "option_type": "put"},
        ],
        spot=100.0,
    )
    assert len(out["by_strike"]) == 1
    row = out["by_strike"][0]
    assert row["call_dealer_gamma"] < 0
    assert row["put_dealer_gamma"] > 0
    # Call OI dominates put OI at the same strike -> net short gamma.
    assert row["net_dealer_gamma"] < 0
    assert out["total_net_dealer_gamma"] < 0
def test_dealer_gamma_profile_empty():
    out = dealer_gamma_profile([], spot=100.0)
    assert out["by_strike"] == []
    assert out["total_net_dealer_gamma"] == 0.0
# ---------- vanna_charm_aggregate ----------
def test_vanna_charm_aggregate_basic():
    out = vanna_charm_aggregate(
        [
            {"strike": 100, "vanna": 0.05, "charm": -0.001, "oi": 1000, "option_type": "call"},
            {"strike": 100, "vanna": -0.05, "charm": 0.001, "oi": 500, "option_type": "put"},
        ],
        spot=100.0,
    )
    assert out["total_vanna"] != 0  # some net exposure remains
    assert "total_charm" in out
    assert out["legs_analyzed"] == 2
def test_vanna_charm_aggregate_skip_missing_greeks():
    out = vanna_charm_aggregate(
        [
            {"strike": 100, "vanna": None, "charm": -0.001, "oi": 1000, "option_type": "call"},
            {"strike": 100, "vanna": 0.05, "charm": None, "oi": 500, "option_type": "put"},
        ],
        spot=100.0,
    )
    # Each leg is missing at least one greek -> both are skipped.
    assert out["legs_analyzed"] == 0
+52
View File
@@ -0,0 +1,52 @@
from __future__ import annotations
import math
import random
from mcp_common.stats import cointegration_test
def test_cointegrated_synthetic_pair():
    """Synthetic cointegrated pair: B is a random walk, A = 2*B + stationary noise."""
    r = random.Random(1)
    b = [100.0]
    for _ in range(300):
        b.append(b[-1] + r.gauss(0, 1))
    a = [2 * x + r.gauss(0, 0.5) for x in b]
    out = cointegration_test(a, b)
    assert out["cointegrated"] is True
    # Use math.isclose (math is already imported) instead of the ad-hoc
    # pytest_approx helper defined at the bottom of this file, which only
    # worked through float's reflected __eq__ — fragile and easy to break.
    assert math.isclose(out["beta"], 2.0, rel_tol=0.05)
    assert out["adf_t_stat"] is not None
    assert out["adf_t_stat"] < -2.86
def test_not_cointegrated_independent_walks():
    """Two independent random walks -> non-stationary spread -> no cointegration."""
    rng = random.Random(2)
    walk_a, walk_b = [100.0], [100.0]
    for _ in range(300):
        walk_a.append(walk_a[-1] + rng.gauss(0, 1))
        walk_b.append(walk_b[-1] + rng.gauss(0, 1))
    out = cointegration_test(walk_a, walk_b)
    # Independent walks usually yield an ADF t-stat above -2.86 -> not cointegrated.
    assert out["cointegrated"] is False or out["adf_t_stat"] > -3.0
def test_cointegration_short_series():
    # Far below the 50-point minimum -> all-None result.
    out = cointegration_test([1.0, 2.0], [3.0, 4.0])
    assert out["cointegrated"] is None and out["beta"] is None
def test_cointegration_mismatched_length():
    # Length mismatch -> all-None result.
    assert cointegration_test([1.0, 2.0, 3.0], [1.0, 2.0])["cointegrated"] is None
def pytest_approx(value, rel):
    """Minimal stand-in for pytest.approx: relative-tolerance equality via __eq__."""
    class _RelTol:
        def __eq__(self, other):
            return abs(other - value) <= abs(value) * rel
    return _RelTol()