from __future__ import annotations import math def sma(values: list[float], period: int) -> float | None: if len(values) < period: return None return sum(values[-period:]) / period def rsi(closes: list[float], period: int = 14) -> float | None: if len(closes) < period + 1: return None gains: list[float] = [] losses: list[float] = [] for i in range(1, len(closes)): delta = closes[i] - closes[i - 1] gains.append(max(delta, 0.0)) losses.append(-min(delta, 0.0)) avg_gain = sum(gains[:period]) / period avg_loss = sum(losses[:period]) / period for i in range(period, len(gains)): avg_gain = (avg_gain * (period - 1) + gains[i]) / period avg_loss = (avg_loss * (period - 1) + losses[i]) / period if avg_loss == 0: return 100.0 rs = avg_gain / avg_loss return 100.0 - (100.0 / (1.0 + rs)) def _ema_series(values: list[float], period: int) -> list[float]: if len(values) < period: return [] k = 2.0 / (period + 1) seed = sum(values[:period]) / period out = [seed] for v in values[period:]: out.append(out[-1] + k * (v - out[-1])) return out def macd( closes: list[float], fast: int = 12, slow: int = 26, signal: int = 9, ) -> dict[str, float | None]: nothing: dict[str, float | None] = {"macd": None, "signal": None, "hist": None} if len(closes) < slow + signal: return nothing ema_fast = _ema_series(closes, fast) ema_slow = _ema_series(closes, slow) offset = slow - fast aligned_fast = ema_fast[offset:] macd_line = [f - s for f, s in zip(aligned_fast, ema_slow, strict=False)] if len(macd_line) < signal: return nothing signal_line = _ema_series(macd_line, signal) if not signal_line: return nothing last_macd = macd_line[-1] last_sig = signal_line[-1] return { "macd": last_macd, "signal": last_sig, "hist": last_macd - last_sig, } def atr( highs: list[float], lows: list[float], closes: list[float], period: int = 14, ) -> float | None: if len(closes) < period + 1: return None trs: list[float] = [] for i in range(1, len(closes)): tr = max( highs[i] - lows[i], abs(highs[i] - closes[i - 1]), abs(lows[i] - closes[i - 1]), ) trs.append(tr) if len(trs) < period: return None avg = sum(trs[:period]) / period for i in range(period, len(trs)): avg = (avg * (period - 1) + trs[i]) / period return avg def adx( highs: list[float], lows: list[float], closes: list[float], period: int = 14, ) -> dict[str, float | None]: nothing: dict[str, float | None] = {"adx": None, "+di": None, "-di": None} if len(closes) < 2 * period + 1: return nothing trs: list[float] = [] plus_dms: list[float] = [] minus_dms: list[float] = [] for i in range(1, len(closes)): tr = max( highs[i] - lows[i], abs(highs[i] - closes[i - 1]), abs(lows[i] - closes[i - 1]), ) up = highs[i] - highs[i - 1] dn = lows[i - 1] - lows[i] plus_dm = up if (up > dn and up > 0) else 0.0 minus_dm = dn if (dn > up and dn > 0) else 0.0 trs.append(tr) plus_dms.append(plus_dm) minus_dms.append(minus_dm) atr_s = sum(trs[:period]) pdm_s = sum(plus_dms[:period]) mdm_s = sum(minus_dms[:period]) dxs: list[float] = [] pdi = mdi = 0.0 for i in range(period, len(trs)): atr_s = atr_s - atr_s / period + trs[i] pdm_s = pdm_s - pdm_s / period + plus_dms[i] mdm_s = mdm_s - mdm_s / period + minus_dms[i] pdi = 100.0 * pdm_s / atr_s if atr_s else 0.0 mdi = 100.0 * mdm_s / atr_s if atr_s else 0.0 s = pdi + mdi dx = 100.0 * abs(pdi - mdi) / s if s else 0.0 dxs.append(dx) if len(dxs) < period: return nothing adx_val = sum(dxs[:period]) / period for i in range(period, len(dxs)): adx_val = (adx_val * (period - 1) + dxs[i]) / period return {"adx": adx_val, "+di": pdi, "-di": mdi} # ───── Returns helper ───── def _log_returns(closes: list[float]) -> list[float]: out: list[float] = [] for i in range(1, len(closes)): prev = closes[i - 1] curr = closes[i] if prev > 0 and curr > 0: out.append(math.log(curr / prev)) return out def _percentile(sorted_values: list[float], q: float) -> float: if not sorted_values: return 0.0 if len(sorted_values) == 1: return sorted_values[0] pos = q * (len(sorted_values) - 1) lo = int(pos) hi = min(lo + 1, len(sorted_values) - 1) frac = pos - lo return sorted_values[lo] + frac * (sorted_values[hi] - sorted_values[lo]) def _stddev(xs: list[float]) -> float: if len(xs) < 2: return 0.0 m = sum(xs) / len(xs) var = sum((x - m) ** 2 for x in xs) / (len(xs) - 1) return math.sqrt(var) # ───── vol_cone ───── def vol_cone( closes: list[float], windows: list[int] | None = None, annualization: int = 252, ) -> dict[int, dict[str, float | None]]: """Realized vol cone: per ogni window restituisce vol corrente e percentili storici (p10/p50/p90) di tutte le rolling windows del campione. Annualizzata (default 252 trading days). """ windows = windows or [10, 20, 30, 60] rets = _log_returns(closes) out: dict[int, dict[str, float | None]] = {} factor = math.sqrt(annualization) for w in windows: if len(rets) < w: out[w] = {"current": None, "p10": None, "p50": None, "p90": None} continue rolling: list[float] = [] for i in range(w, len(rets) + 1): window_rets = rets[i - w:i] rolling.append(_stddev(window_rets) * factor) rolling_sorted = sorted(rolling) out[w] = { "current": rolling[-1], "p10": _percentile(rolling_sorted, 0.10), "p50": _percentile(rolling_sorted, 0.50), "p90": _percentile(rolling_sorted, 0.90), } return out # ───── hurst_exponent ───── def hurst_exponent(closes: list[float], min_lag: int = 2, max_lag: int = 100) -> float | None: """Hurst via R/S analysis su log-prices. H≈0.5 random walk, >0.5 trending, <0.5 mean-reverting. """ if len(closes) < max(20, max_lag): return None log_p = [math.log(c) for c in closes if c > 0] if len(log_p) < max(20, max_lag): return None upper = min(max_lag, len(log_p) // 2) if upper < min_lag + 1: return None lags = list(range(min_lag, upper)) log_lags: list[float] = [] log_rs: list[float] = [] for lag in lags: # Build N/lag non-overlapping segments; for each compute R/S rs_vals: list[float] = [] n_segs = len(log_p) // lag if n_segs < 1: continue for seg in range(n_segs): chunk = log_p[seg * lag:(seg + 1) * lag] diffs = [chunk[i] - chunk[i - 1] for i in range(1, len(chunk))] if len(diffs) < 2: continue mean = sum(diffs) / len(diffs) dev = [d - mean for d in diffs] cum = [] acc = 0.0 for d in dev: acc += d cum.append(acc) r = max(cum) - min(cum) s = _stddev(diffs) if s > 0: rs_vals.append(r / s) if rs_vals: avg_rs = sum(rs_vals) / len(rs_vals) if avg_rs > 0: log_lags.append(math.log(lag)) log_rs.append(math.log(avg_rs)) if len(log_lags) < 4: return None # Linear regression slope = Hurst n = len(log_lags) mx = sum(log_lags) / n my = sum(log_rs) / n num = sum((log_lags[i] - mx) * (log_rs[i] - my) for i in range(n)) den = sum((log_lags[i] - mx) ** 2 for i in range(n)) if den == 0: return None return num / den # ───── half_life_mean_reversion ───── def half_life_mean_reversion(closes: list[float]) -> float | None: """Half-life via OU AR(1) fit: y_t - y_{t-1} = a + b*y_{t-1} + eps. Half-life = -ln(2)/ln(1+b). Se b>=0 → no mean reversion → None. """ if len(closes) < 30: return None y_lag = closes[:-1] delta = [closes[i] - closes[i - 1] for i in range(1, len(closes))] n = len(y_lag) mx = sum(y_lag) / n my = sum(delta) / n num = sum((y_lag[i] - mx) * (delta[i] - my) for i in range(n)) den = sum((y_lag[i] - mx) ** 2 for i in range(n)) if den == 0: return None b = num / den if b >= 0: return None one_plus_b = 1.0 + b if one_plus_b <= 0: return None return -math.log(2.0) / math.log(one_plus_b) # ───── garch11_forecast ───── def garch11_forecast( closes: list[float], max_iter: int = 50, ) -> dict[str, float] | None: """Forecast GARCH(1,1) one-step-ahead sigma via metodo dei momenti semplificato (no MLE). Pure-Python: stima omega, alpha, beta tramite iterazione di punto fisso minimizzando MSE sul squared-return tracking. Sufficiente per ranking volatility regimes; non production-grade. """ rets = _log_returns(closes) if len(rets) < 50: return None mean = sum(rets) / len(rets) centered = [r - mean for r in rets] sq = [r * r for r in centered] # Sample variance as long-run mean var_lr = sum(sq) / len(sq) if var_lr <= 0: return None # Simple grid for (alpha, beta) minimizing MSE of sigma2 vs realized sq best = (1e18, 0.05, 0.90) for a in [0.02, 0.05, 0.08, 0.10, 0.15]: for b in [0.80, 0.85, 0.88, 0.90, 0.93]: if a + b >= 0.999: continue omega = var_lr * (1 - a - b) if omega <= 0: continue sigma2 = var_lr mse = 0.0 for s in sq[:-1]: sigma2 = omega + a * s + b * sigma2 mse += (sigma2 - s) ** 2 if mse < best[0]: best = (mse, a, b) _, alpha, beta = best omega = var_lr * (1 - alpha - beta) sigma2 = var_lr for s in sq: sigma2 = omega + alpha * s + beta * sigma2 sigma2_next = omega + alpha * sq[-1] + beta * sigma2 return { "sigma_next": math.sqrt(max(sigma2_next, 0.0)), "alpha": alpha, "beta": beta, "omega": omega, "long_run_sigma": math.sqrt(var_lr), } # ───── autocorrelation ───── def autocorrelation(values: list[float], max_lag: int = 10) -> dict[int, float]: """Autocorrelation function (ACF) lag 1..max_lag. White noise → ≈ 0. AR(1) phi → lag1 ≈ phi, lag-k ≈ phi^k. """ if len(values) < max_lag + 2: return {} n = len(values) mean = sum(values) / n dev = [v - mean for v in values] var = sum(d * d for d in dev) / n if var == 0: return {lag: 0.0 for lag in range(1, max_lag + 1)} out: dict[int, float] = {} for lag in range(1, max_lag + 1): cov = sum(dev[i] * dev[i + lag] for i in range(n - lag)) / n out[lag] = cov / var return out # ───── rolling_sharpe ───── def rolling_sharpe( closes: list[float], window: int = 60, annualization: int = 252, risk_free: float = 0.0, ) -> dict[str, float] | None: """Sharpe e Sortino rolling sull'ultimo `window` di log-returns. Annualizzati. risk_free in tasso annualizzato. """ rets = _log_returns(closes) if len(rets) < window: return None sample = rets[-window:] daily_rf = risk_free / annualization excess = [r - daily_rf for r in sample] mean = sum(excess) / len(excess) sd = _stddev(excess) sharpe = (mean / sd) * math.sqrt(annualization) if sd > 0 else 0.0 downside = [e for e in excess if e < 0] if downside: ds_var = sum(d * d for d in downside) / len(excess) ds_sd = math.sqrt(ds_var) sortino = (mean / ds_sd) * math.sqrt(annualization) if ds_sd > 0 else 0.0 else: sortino = sharpe * 2 # nessun downside → sortino "molto buono" return {"sharpe": sharpe, "sortino": sortino, "mean_excess": mean, "stddev": sd} # ───── var_cvar ───── def var_cvar(returns: list[float], confidences: list[float] | None = None) -> dict[str, float]: """Historical VaR e CVaR (Expected Shortfall) ai livelli di confidenza. returns: serie di rendimenti (qualsiasi periodicità). VaR/CVaR restituiti come perdite positive (es. var_95=0.03 → -3% al 95%). """ confidences = confidences or [0.95, 0.99] if len(returns) < 30: return {} sorted_rets = sorted(returns) out: dict[str, float] = {} for c in confidences: tag = int(round(c * 100)) q = 1.0 - c var = -_percentile(sorted_rets, q) cutoff = -var tail = [r for r in sorted_rets if r <= cutoff] cvar = -(sum(tail) / len(tail)) if tail else var out[f"var_{tag}"] = var out[f"cvar_{tag}"] = cvar return out