"""SPC (Statistical Process Control) computation service. Pure functions — no DB dependencies. Receives lists of floats and tolerance limits, returns structured data matching schemas in schemas/statistics.py. Uses only Python stdlib (math, statistics). No numpy/scipy needed. """ import math import statistics as stats from datetime import datetime from schemas.statistics import ( CapabilityData, ControlChartData, HistogramData, SummaryData, ) def compute_summary( pass_fail_values: list[str], ) -> SummaryData: """Compute pass/fail/warning summary from a list of pass_fail strings. Args: pass_fail_values: List of "pass", "warning", or "fail" strings. Returns: SummaryData with counts and rates. """ total = len(pass_fail_values) if total == 0: return SummaryData( total=0, pass_count=0, warning_count=0, fail_count=0, pass_rate=0.0, warning_rate=0.0, fail_rate=0.0, ) pass_count = pass_fail_values.count("pass") warning_count = pass_fail_values.count("warning") fail_count = pass_fail_values.count("fail") return SummaryData( total=total, pass_count=pass_count, warning_count=warning_count, fail_count=fail_count, pass_rate=round(pass_count / total * 100, 2), warning_rate=round(warning_count / total * 100, 2), fail_rate=round(fail_count / total * 100, 2), ) def compute_capability( values: list[float], utl: float | None, ltl: float | None, nominal: float | None, ) -> CapabilityData: """Compute capability indices Cp, Cpk, Pp, Ppk. - Cp = (UTL - LTL) / (6 * sigma_within) - Cpk = min((UTL - mean) / (3 * sigma), (mean - LTL) / (3 * sigma)) - Pp/Ppk: same formulas using population std dev (same data, no subgrouping) Args: values: Measured values. utl: Upper Tolerance Limit (None if not defined). ltl: Lower Tolerance Limit (None if not defined). nominal: Nominal value (None if not defined). Returns: CapabilityData with indices and statistics. """ n = len(values) if n < 2: return CapabilityData( cp=None, cpk=None, pp=None, ppk=None, mean=values[0] if n == 1 else 0.0, std_dev=0.0, n=n, utl=utl, ltl=ltl, nominal=nominal, ) mean = stats.mean(values) # Population std dev for Pp/Ppk std_dev_pop = stats.pstdev(values) # Sample std dev for Cp/Cpk std_dev_sample = stats.stdev(values) cp = cpk = pp = ppk = None if utl is not None and ltl is not None and std_dev_sample > 0: cp = round((utl - ltl) / (6 * std_dev_sample), 4) if std_dev_sample > 0: cpk_values = [] if utl is not None: cpk_values.append((utl - mean) / (3 * std_dev_sample)) if ltl is not None: cpk_values.append((mean - ltl) / (3 * std_dev_sample)) if cpk_values: cpk = round(min(cpk_values), 4) if utl is not None and ltl is not None and std_dev_pop > 0: pp = round((utl - ltl) / (6 * std_dev_pop), 4) if std_dev_pop > 0: ppk_values = [] if utl is not None: ppk_values.append((utl - mean) / (3 * std_dev_pop)) if ltl is not None: ppk_values.append((mean - ltl) / (3 * std_dev_pop)) if ppk_values: ppk = round(min(ppk_values), 4) return CapabilityData( cp=cp, cpk=cpk, pp=pp, ppk=ppk, mean=round(mean, 6), std_dev=round(std_dev_sample, 6), n=n, utl=utl, ltl=ltl, nominal=nominal, ) def compute_control_chart( values: list[float], timestamps: list[datetime], utl: float | None, uwl: float | None, lwl: float | None, ltl: float | None, nominal: float | None, ) -> ControlChartData: """Compute control chart data with UCL/LCL and out-of-control detection. UCL = mean + 3*sigma LCL = mean - 3*sigma Out-of-control: points outside UCL/LCL. Args: values: Measured values in chronological order. timestamps: Corresponding timestamps. utl/uwl/lwl/ltl: Tolerance/warning limits. nominal: Nominal value. Returns: ControlChartData with values, limits, and OOC indices. """ n = len(values) if n == 0: return ControlChartData( values=[], timestamps=[], mean=0.0, ucl=0.0, lcl=0.0, utl=utl, uwl=uwl, lwl=lwl, ltl=ltl, nominal=nominal, out_of_control=[], ) mean = stats.mean(values) if n >= 2: sigma = stats.stdev(values) else: sigma = 0.0 ucl = mean + 3 * sigma lcl = mean - 3 * sigma # Detect out-of-control points (outside UCL/LCL) out_of_control = [] for i, v in enumerate(values): if v > ucl or v < lcl: out_of_control.append(i) return ControlChartData( values=values, timestamps=timestamps, mean=round(mean, 6), ucl=round(ucl, 6), lcl=round(lcl, 6), utl=utl, uwl=uwl, lwl=lwl, ltl=ltl, nominal=nominal, out_of_control=out_of_control, ) def compute_histogram( values: list[float], n_bins: int = 20, ) -> HistogramData: """Compute histogram bin data and normal curve overlay. Args: values: Measured values. n_bins: Number of histogram bins (default 20). Returns: HistogramData with bins, counts, and normal curve points. """ n = len(values) if n == 0: return HistogramData( bins=[], counts=[], normal_x=[], normal_y=[], mean=0.0, std_dev=0.0, n=0, ) mean = stats.mean(values) std_dev = stats.pstdev(values) if n >= 2 else 0.0 min_val = min(values) max_val = max(values) # Avoid zero-width range if max_val == min_val: max_val = min_val + 1.0 bin_width = (max_val - min_val) / n_bins bins = [round(min_val + i * bin_width, 6) for i in range(n_bins + 1)] # Count values per bin counts = [0] * n_bins for v in values: idx = int((v - min_val) / bin_width) if idx >= n_bins: idx = n_bins - 1 counts[idx] += 1 # Normal curve overlay (100 points) normal_x: list[float] = [] normal_y: list[float] = [] if std_dev > 0: n_curve_points = 100 x_min = mean - 4 * std_dev x_max = mean + 4 * std_dev x_step = (x_max - x_min) / (n_curve_points - 1) for i in range(n_curve_points): x = x_min + i * x_step # Normal PDF: (1 / (sigma * sqrt(2*pi))) * exp(-0.5 * ((x-mu)/sigma)^2) y = (1.0 / (std_dev * math.sqrt(2 * math.pi))) * math.exp( -0.5 * ((x - mean) / std_dev) ** 2 ) # Scale to match histogram: y * n * bin_width y_scaled = y * n * bin_width normal_x.append(round(x, 6)) normal_y.append(round(y_scaled, 4)) return HistogramData( bins=bins, counts=counts, normal_x=normal_x, normal_y=normal_y, mean=round(mean, 6), std_dev=round(std_dev, 6), n=n, )