"""Pure statistical tests (cointegration, ADF; half-life already lives in indicators).

No external dependencies: pure Python.
"""

from __future__ import annotations

import math


def _ols_slope_intercept(
    xs: list[float], ys: list[float]
) -> tuple[float, float] | None:
    """Fit ``y = intercept + slope * x`` by ordinary least squares.

    Args:
        xs: Regressor values.
        ys: Response values, paired position-by-position with ``xs``.

    Returns:
        ``(slope, intercept)``, or ``None`` when the inputs differ in length,
        contain fewer than 3 points, or ``xs`` has zero variance (the slope
        would be undefined).
    """
    n = len(xs)
    if n != len(ys) or n < 3:
        return None

    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    num = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    den = sum((x - mean_x) ** 2 for x in xs)
    if den == 0:
        return None

    slope = num / den
    return slope, mean_y - slope * mean_x


def _adf_t_stat(series: list[float]) -> float | None:
    """Return a simplified Dickey-Fuller t-statistic for ``series``.

    This is the lag-0 form of the Augmented Dickey-Fuller regression
    ``dy_t = a + b * y_{t-1} + eps``; the value returned is the t-statistic
    of ``b`` against zero. A more negative value indicates a more stationary
    series. As an approximation, the 5% critical value is about -2.86.

    Args:
        series: Observations in time order.

    Returns:
        The t-statistic, or ``None`` when the series has fewer than 30
        points, the lagged series has zero variance, or the standard error
        of ``b`` is exactly zero (the regression fits perfectly).
    """
    if len(series) < 30:
        return None

    y_lag = series[:-1]
    delta = [cur - prev for prev, cur in zip(series, series[1:])]

    fit = _ols_slope_intercept(y_lag, delta)
    if fit is None:
        return None
    b, a = fit

    # A successful fit guarantees that the sum of squares below is non-zero
    # (it is the same quantity the OLS helper checks), and the length guard
    # above guarantees n >= 29, so n - 2 is always positive here.
    n = len(y_lag)
    mean_lag = sum(y_lag) / n
    den = sum((y - mean_lag) ** 2 for y in y_lag)

    resid = [d - (a + b * y) for y, d in zip(y_lag, delta)]
    rss = sum(r * r for r in resid)
    sigma2 = rss / (n - 2)

    se_b = math.sqrt(sigma2 / den)
    if se_b == 0:
        return None
    return b / se_b


def _empty_cointegration_result() -> dict[str, float | bool | None]:
    """Build the all-``None`` result used when the test cannot be run."""
    return {
        "cointegrated": None,
        "beta": None,
        "alpha": None,
        "adf_t_stat": None,
        "spread_mean": None,
        "spread_std": None,
    }


def cointegration_test(
    series_a: list[float],
    series_b: list[float],
    significance_t: float = -2.86,
) -> dict[str, float | bool | None]:
    """Run a two-step Engle-Granger cointegration test.

    1. OLS: ``series_a_t = alpha + beta * series_b_t + eps``.
    2. Dickey-Fuller test on the residuals (the spread): the pair is reported
       as cointegrated when the t-statistic is below ``significance_t``.

    NOTE(review): -2.86 is the 5% Dickey-Fuller critical value for a directly
    observed series. Residual-based Engle-Granger tests conventionally use a
    more negative value (about -3.34 at 5% for two variables); the default is
    kept for backward compatibility -- confirm the intended threshold.

    Args:
        series_a: Dependent series.
        series_b: Regressor series; must have the same length as ``series_a``.
        significance_t: Critical value the t-statistic must fall below.

    Returns:
        A dict with keys ``cointegrated``, ``beta``, ``alpha``,
        ``adf_t_stat``, ``spread_mean`` and ``spread_std``.

        * Every value is ``None`` when the lengths differ, fewer than 50
          points are supplied, or the OLS fit is undefined.
        * ``cointegrated`` and ``adf_t_stat`` are both ``None`` when the fit
          succeeds but the t-statistic cannot be computed (for example a
          perfectly constant spread): the test is inconclusive, so no
          verdict is reported.
    """
    n = len(series_a)
    if n != len(series_b) or n < 50:
        return _empty_cointegration_result()

    fit = _ols_slope_intercept(series_b, series_a)
    if fit is None:
        return _empty_cointegration_result()
    beta, alpha = fit

    spread = [a - alpha - beta * b for a, b in zip(series_a, series_b)]
    t_stat = _adf_t_stat(spread)

    # An uncomputable statistic means "unknown", not "not cointegrated".
    cointegrated = None if t_stat is None else t_stat < significance_t

    mean = sum(spread) / n
    # n >= 50 here, so the sample variance denominator is always positive.
    var = sum((s - mean) ** 2 for s in spread) / (n - 1)

    return {
        "cointegrated": cointegrated,
        "beta": beta,
        "alpha": alpha,
        "adf_t_stat": t_stat,
        "spread_mean": mean,
        "spread_std": math.sqrt(var),
    }