Files
TieMeasureFlow/server/services/spc_service.py
T
Adriano bcd807e57d feat: FASE 5b/6.1+6.2 - SPC Backend + Dashboard Metrologist (Plotly.js)
Aggiunge servizio SPC con calcoli Cp/Cpk/Pp/Ppk, carta di controllo (UCL/LCL),
istogramma con curva normale. Router FastAPI con 5 endpoint statistics, blueprint
Flask con proxy AJAX, dashboard interattiva Alpine.js + Plotly.js con filtri per
ricetta/subtask/date, riepilogo pass/fail, gauge Cpk e i18n IT/EN completo.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 15:00:05 +01:00

262 lines
7.1 KiB
Python

"""SPC (Statistical Process Control) computation service.
Pure functions — no DB dependencies. Receives lists of floats and tolerance limits,
returns structured data matching schemas in schemas/statistics.py.
Uses only Python stdlib (math, statistics). No numpy/scipy needed.
"""
import math
import statistics as stats
from datetime import datetime
from schemas.statistics import (
CapabilityData,
ControlChartData,
HistogramData,
SummaryData,
)
def compute_summary(
    pass_fail_values: list[str],
) -> SummaryData:
    """Summarise a batch of inspection outcomes.

    Counts exact occurrences of the strings "pass", "warning" and "fail"
    and expresses each count as a percentage of the number of entries.

    Args:
        pass_fail_values: Outcome labels, one per measurement. Entries that
            match none of the three labels contribute to ``total`` only.

    Returns:
        SummaryData with the total, the three counts and the three rates
        (percentages rounded to 2 decimals). Every field is zero when the
        input is empty.
    """
    total = len(pass_fail_values)
    tallies = {
        label: pass_fail_values.count(label)
        for label in ("pass", "warning", "fail")
    }

    def percent(count: int) -> float:
        # An empty batch has no meaningful rate; report 0.0 instead of
        # dividing by zero.
        return round(count / total * 100, 2) if total else 0.0

    return SummaryData(
        total=total,
        pass_count=tallies["pass"],
        warning_count=tallies["warning"],
        fail_count=tallies["fail"],
        pass_rate=percent(tallies["pass"]),
        warning_rate=percent(tallies["warning"]),
        fail_rate=percent(tallies["fail"]),
    )
def _capability_pair(
    mean: float,
    sigma: float,
    utl: float | None,
    ltl: float | None,
) -> tuple[float | None, float | None]:
    """Return the (two-sided, one-sided-minimum) indices for one sigma.

    The first element is (UTL - LTL) / (6 * sigma) and needs both limits;
    the second is the smaller of the distances from the mean to each
    available limit, divided by 3 * sigma. Either element is None when it
    cannot be computed; both are None when sigma is not positive.
    """
    if not sigma > 0:
        return None, None

    spread_index = None
    if utl is not None and ltl is not None:
        spread_index = round((utl - ltl) / (6 * sigma), 4)

    # Collect whichever one-sided ratios the defined limits allow.
    one_sided = []
    if utl is not None:
        one_sided.append((utl - mean) / (3 * sigma))
    if ltl is not None:
        one_sided.append((mean - ltl) / (3 * sigma))
    centred_index = round(min(one_sided), 4) if one_sided else None

    return spread_index, centred_index


def compute_capability(
    values: list[float],
    utl: float | None,
    ltl: float | None,
    nominal: float | None,
) -> CapabilityData:
    """Compute the capability indices Cp, Cpk, Pp and Ppk.

    - Cp  = (UTL - LTL) / (6 * sigma)
    - Cpk = min((UTL - mean) / (3 * sigma), (mean - LTL) / (3 * sigma))
    - Pp / Ppk use the same formulas with a different sigma.

    Cp/Cpk use the sample standard deviation (n - 1 divisor) and Pp/Ppk the
    population standard deviation (n divisor); both are taken over the
    whole data set, since no subgrouping is applied.

    Args:
        values: Measured values.
        utl: Upper Tolerance Limit (None if not defined).
        ltl: Lower Tolerance Limit (None if not defined).
        nominal: Nominal value (None if not defined); passed through as is.

    Returns:
        CapabilityData with the four indices (rounded to 4 decimals, None
        where a required limit is missing or sigma is zero), the mean and
        sample standard deviation (rounded to 6 decimals), the sample
        size and the limits echoed back. With fewer than two values all
        indices are None and std_dev is 0.0.
    """
    n = len(values)
    if n < 2:
        # A spread cannot be estimated from zero or one reading.
        lone_mean = values[0] if n == 1 else 0.0
        return CapabilityData(
            cp=None,
            cpk=None,
            pp=None,
            ppk=None,
            mean=lone_mean,
            std_dev=0.0,
            n=n,
            utl=utl,
            ltl=ltl,
            nominal=nominal,
        )

    mean = stats.mean(values)
    sigma_population = stats.pstdev(values)
    sigma_sample = stats.stdev(values)

    cp, cpk = _capability_pair(mean, sigma_sample, utl, ltl)
    pp, ppk = _capability_pair(mean, sigma_population, utl, ltl)

    return CapabilityData(
        cp=cp,
        cpk=cpk,
        pp=pp,
        ppk=ppk,
        mean=round(mean, 6),
        std_dev=round(sigma_sample, 6),
        n=n,
        utl=utl,
        ltl=ltl,
        nominal=nominal,
    )
def compute_control_chart(
    values: list[float],
    timestamps: list[datetime],
    utl: float | None,
    uwl: float | None,
    lwl: float | None,
    ltl: float | None,
    nominal: float | None,
) -> ControlChartData:
    """Build control chart data with control limits and flagged points.

    The centre line is the arithmetic mean; the control limits sit three
    sample standard deviations either side of it (UCL = mean + 3*sigma,
    LCL = mean - 3*sigma). A point is out of control when it lies strictly
    above UCL or strictly below LCL.

    Args:
        values: Measured values in chronological order.
        timestamps: Corresponding timestamps; returned unchanged.
        utl/uwl/lwl/ltl: Tolerance/warning limits; passed through as is.
        nominal: Nominal value; passed through as is.

    Returns:
        ControlChartData with the values, timestamps, mean, UCL and LCL
        (rounded to 6 decimals), the limits echoed back, and the indices
        of the out-of-control points. An empty input yields empty lists
        and zeros for mean/UCL/LCL.
    """
    n = len(values)
    if not n:
        return ControlChartData(
            values=[],
            timestamps=[],
            mean=0.0,
            ucl=0.0,
            lcl=0.0,
            utl=utl,
            uwl=uwl,
            lwl=lwl,
            ltl=ltl,
            nominal=nominal,
            out_of_control=[],
        )

    centre = stats.mean(values)
    # The sample standard deviation needs at least two readings; a single
    # reading collapses both control limits onto the centre line.
    sigma = stats.stdev(values) if n >= 2 else 0.0
    upper = centre + 3 * sigma
    lower = centre - 3 * sigma

    # Compare against the unrounded limits so rounding never flips a point.
    flagged = [
        position
        for position, reading in enumerate(values)
        if reading > upper or reading < lower
    ]

    return ControlChartData(
        values=values,
        timestamps=timestamps,
        mean=round(centre, 6),
        ucl=round(upper, 6),
        lcl=round(lower, 6),
        utl=utl,
        uwl=uwl,
        lwl=lwl,
        ltl=ltl,
        nominal=nominal,
        out_of_control=flagged,
    )
def compute_histogram(
    values: list[float],
    n_bins: int = 20,
) -> HistogramData:
    """Compute histogram bin data and a normal curve overlay.

    The value range [min, max] is split into ``n_bins`` equal-width bins.
    The overlay is the normal PDF for the data's mean and population
    standard deviation, sampled at 100 points across mean +/- 4 sigma and
    scaled by ``n * bin_width`` so it is comparable with the bin counts.

    Args:
        values: Measured values.
        n_bins: Number of histogram bins (default 20). Must be at least 1.

    Returns:
        HistogramData with the ``n_bins + 1`` bin edges, the per-bin
        counts, the overlay points, the mean, the population standard
        deviation and the sample size. The overlay lists are empty when
        the standard deviation is zero; every list is empty when
        ``values`` is empty.

    Raises:
        ValueError: If ``values`` is non-empty and ``n_bins`` is below 1.
    """
    n = len(values)
    if n == 0:
        return HistogramData(
            bins=[], counts=[], normal_x=[], normal_y=[],
            mean=0.0, std_dev=0.0, n=0,
        )

    # Checked after the empty-input return so calls that used to succeed
    # still do. Without this, n_bins == 0 divides by zero below and a
    # negative n_bins ends in an IndexError on the counts list.
    if n_bins < 1:
        raise ValueError(f"n_bins must be at least 1, got {n_bins}")

    mean = stats.mean(values)
    std_dev = stats.pstdev(values) if n >= 2 else 0.0

    min_val = min(values)
    max_val = max(values)
    # Identical readings would give zero-width bins; widen the range to 1.
    if max_val == min_val:
        max_val = min_val + 1.0

    bin_width = (max_val - min_val) / n_bins
    bins = [round(min_val + i * bin_width, 6) for i in range(n_bins + 1)]

    counts = [0] * n_bins
    for v in values:
        # The maximum value lands exactly on the last edge; fold it into
        # the final bin instead of indexing one past the end.
        idx = min(int((v - min_val) / bin_width), n_bins - 1)
        counts[idx] += 1

    # Normal curve overlay (100 points); skipped when there is no spread.
    normal_x: list[float] = []
    normal_y: list[float] = []
    if std_dev > 0:
        n_curve_points = 100
        x_min = mean - 4 * std_dev
        x_max = mean + 4 * std_dev
        x_step = (x_max - x_min) / (n_curve_points - 1)
        # Constant factor of the normal PDF: 1 / (sigma * sqrt(2*pi)).
        pdf_coefficient = 1.0 / (std_dev * math.sqrt(2 * math.pi))
        for i in range(n_curve_points):
            x = x_min + i * x_step
            y = pdf_coefficient * math.exp(-0.5 * ((x - mean) / std_dev) ** 2)
            # Scale the density to expected counts per bin.
            y_scaled = y * n * bin_width
            normal_x.append(round(x, 6))
            normal_y.append(round(y_scaled, 4))

    return HistogramData(
        bins=bins,
        counts=counts,
        normal_x=normal_x,
        normal_y=normal_y,
        mean=round(mean, 6),
        std_dev=round(std_dev, 6),
        n=n,
    )