feat: FASE 5b/6.1+6.2 - SPC Backend + Dashboard Metrologist (Plotly.js)

Aggiunge servizio SPC con calcoli Cp/Cpk/Pp/Ppk, carta di controllo (UCL/LCL),
istogramma con curva normale. Router FastAPI con 5 endpoint statistics, blueprint
Flask con proxy AJAX, dashboard interattiva Alpine.js + Plotly.js con filtri per
ricetta/subtask/date, riepilogo pass/fail, gauge Cpk e i18n IT/EN completo.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Adriano
2026-02-07 15:00:05 +01:00
parent e1f4ee73d0
commit bcd807e57d
14 changed files with 1567 additions and 242 deletions
+236
View File
@@ -0,0 +1,236 @@
"""Statistics router - SPC endpoints for Metrologist dashboard."""
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import and_, select
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
from middleware.api_key import require_metrologist
from models.measurement import Measurement
from models.recipe import RecipeVersion
from models.task import RecipeSubtask, RecipeTask
from models.user import User
from schemas.statistics import (
CapabilityData,
ControlChartData,
HistogramData,
SummaryData,
)
from services.spc_service import (
compute_capability,
compute_control_chart,
compute_histogram,
compute_summary,
)
router = APIRouter(prefix="/api/statistics", tags=["statistics"])
async def _query_measurements(
db: AsyncSession,
recipe_id: int,
version_id: int | None = None,
subtask_id: int | None = None,
date_from: datetime | None = None,
date_to: datetime | None = None,
operator_id: int | None = None,
lot_number: str | None = None,
serial_number: str | None = None,
) -> list[Measurement]:
"""Query measurements with common filters.
Returns list of Measurement ORM objects ordered by measured_at.
"""
filters = []
# recipe_id filter via RecipeVersion subquery
if version_id is not None:
filters.append(Measurement.version_id == version_id)
else:
version_ids = select(RecipeVersion.id).where(
RecipeVersion.recipe_id == recipe_id
)
filters.append(Measurement.version_id.in_(version_ids))
if subtask_id is not None:
filters.append(Measurement.subtask_id == subtask_id)
if date_from is not None:
filters.append(Measurement.measured_at >= date_from)
if date_to is not None:
filters.append(Measurement.measured_at <= date_to)
if operator_id is not None:
filters.append(Measurement.measured_by == operator_id)
if lot_number is not None:
filters.append(Measurement.lot_number == lot_number)
if serial_number is not None:
filters.append(Measurement.serial_number == serial_number)
query = (
select(Measurement)
.where(and_(*filters) if filters else True)
.order_by(Measurement.measured_at.asc())
)
result = await db.execute(query)
return list(result.scalars().all())
async def _get_subtask_tolerances(
db: AsyncSession,
subtask_id: int,
) -> dict:
"""Get tolerance limits for a specific subtask."""
result = await db.execute(
select(RecipeSubtask).where(RecipeSubtask.id == subtask_id)
)
subtask = result.scalar_one_or_none()
if subtask is None:
return {"utl": None, "uwl": None, "lwl": None, "ltl": None, "nominal": None}
return {
"utl": float(subtask.utl) if subtask.utl is not None else None,
"uwl": float(subtask.uwl) if subtask.uwl is not None else None,
"lwl": float(subtask.lwl) if subtask.lwl is not None else None,
"ltl": float(subtask.ltl) if subtask.ltl is not None else None,
"nominal": float(subtask.nominal) if subtask.nominal is not None else None,
}
@router.get("/summary", response_model=SummaryData)
async def get_summary(
recipe_id: int = Query(...),
version_id: int | None = Query(None),
subtask_id: int | None = Query(None),
date_from: datetime | None = Query(None),
date_to: datetime | None = Query(None),
operator_id: int | None = Query(None),
lot_number: str | None = Query(None),
serial_number: str | None = Query(None),
user: User = Depends(require_metrologist),
db: AsyncSession = Depends(get_db),
) -> SummaryData:
"""Get pass/fail/warning summary for filtered measurements."""
measurements = await _query_measurements(
db, recipe_id, version_id, subtask_id,
date_from, date_to, operator_id, lot_number, serial_number,
)
pass_fail_values = [m.pass_fail for m in measurements]
return compute_summary(pass_fail_values)
@router.get("/capability", response_model=CapabilityData)
async def get_capability(
recipe_id: int = Query(...),
subtask_id: int = Query(...),
version_id: int | None = Query(None),
date_from: datetime | None = Query(None),
date_to: datetime | None = Query(None),
operator_id: int | None = Query(None),
lot_number: str | None = Query(None),
serial_number: str | None = Query(None),
user: User = Depends(require_metrologist),
db: AsyncSession = Depends(get_db),
) -> CapabilityData:
"""Get capability indices (Cp/Cpk/Pp/Ppk) for a specific subtask."""
measurements = await _query_measurements(
db, recipe_id, version_id, subtask_id,
date_from, date_to, operator_id, lot_number, serial_number,
)
values = [float(m.value) for m in measurements]
tol = await _get_subtask_tolerances(db, subtask_id)
return compute_capability(values, tol["utl"], tol["ltl"], tol["nominal"])
@router.get("/control-chart", response_model=ControlChartData)
async def get_control_chart(
recipe_id: int = Query(...),
subtask_id: int = Query(...),
version_id: int | None = Query(None),
date_from: datetime | None = Query(None),
date_to: datetime | None = Query(None),
operator_id: int | None = Query(None),
lot_number: str | None = Query(None),
serial_number: str | None = Query(None),
user: User = Depends(require_metrologist),
db: AsyncSession = Depends(get_db),
) -> ControlChartData:
"""Get control chart data with UCL/LCL and OOC detection."""
measurements = await _query_measurements(
db, recipe_id, version_id, subtask_id,
date_from, date_to, operator_id, lot_number, serial_number,
)
values = [float(m.value) for m in measurements]
timestamps = [m.measured_at for m in measurements]
tol = await _get_subtask_tolerances(db, subtask_id)
return compute_control_chart(
values, timestamps,
tol["utl"], tol["uwl"], tol["lwl"], tol["ltl"], tol["nominal"],
)
@router.get("/histogram", response_model=HistogramData)
async def get_histogram(
recipe_id: int = Query(...),
subtask_id: int = Query(...),
version_id: int | None = Query(None),
date_from: datetime | None = Query(None),
date_to: datetime | None = Query(None),
operator_id: int | None = Query(None),
lot_number: str | None = Query(None),
serial_number: str | None = Query(None),
n_bins: int = Query(20, ge=5, le=100),
user: User = Depends(require_metrologist),
db: AsyncSession = Depends(get_db),
) -> HistogramData:
"""Get histogram data with normal curve overlay."""
measurements = await _query_measurements(
db, recipe_id, version_id, subtask_id,
date_from, date_to, operator_id, lot_number, serial_number,
)
values = [float(m.value) for m in measurements]
return compute_histogram(values, n_bins)
@router.get("/subtasks")
async def get_recipe_subtasks(
recipe_id: int = Query(...),
version_id: int | None = Query(None),
user: User = Depends(require_metrologist),
db: AsyncSession = Depends(get_db),
) -> list[dict]:
"""Get subtasks for a recipe (for filter dropdown).
Returns subtask id, marker_number, description, and parent task title.
"""
# Get version_id: use provided or find current version
if version_id is None:
ver_result = await db.execute(
select(RecipeVersion.id).where(
RecipeVersion.recipe_id == recipe_id,
RecipeVersion.is_current == True,
)
)
version_id = ver_result.scalar_one_or_none()
if version_id is None:
return []
# Get tasks and subtasks for this version
tasks_result = await db.execute(
select(RecipeTask).where(RecipeTask.version_id == version_id)
.order_by(RecipeTask.order_index)
)
tasks = tasks_result.scalars().all()
subtasks_list = []
for task in tasks:
for st in sorted(task.subtasks, key=lambda s: s.marker_number):
subtasks_list.append({
"id": st.id,
"marker_number": st.marker_number,
"description": st.description,
"task_title": task.title,
"nominal": float(st.nominal) if st.nominal is not None else None,
"utl": float(st.utl) if st.utl is not None else None,
"ltl": float(st.ltl) if st.ltl is not None else None,
})
return subtasks_list