"""Statistics router - SPC endpoints for Metrologist dashboard.""" from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy import and_, select from sqlalchemy.ext.asyncio import AsyncSession from database import get_db from middleware.api_key import require_metrologist from models.measurement import Measurement from models.recipe import RecipeVersion from models.task import RecipeSubtask, RecipeTask from models.user import User from schemas.statistics import ( CapabilityData, ControlChartData, HistogramData, SummaryData, ) from services.spc_service import ( compute_capability, compute_control_chart, compute_histogram, compute_summary, ) router = APIRouter(prefix="/api/statistics", tags=["statistics"]) async def _query_measurements( db: AsyncSession, recipe_id: int, version_id: int | None = None, subtask_id: int | None = None, date_from: datetime | None = None, date_to: datetime | None = None, operator_id: int | None = None, lot_number: str | None = None, serial_number: str | None = None, ) -> list[Measurement]: """Query measurements with common filters. Returns list of Measurement ORM objects ordered by measured_at. """ filters = [] # recipe_id filter via RecipeVersion subquery if version_id is not None: filters.append(Measurement.version_id == version_id) else: version_ids = select(RecipeVersion.id).where( RecipeVersion.recipe_id == recipe_id ) filters.append(Measurement.version_id.in_(version_ids)) if subtask_id is not None: filters.append(Measurement.subtask_id == subtask_id) if date_from is not None: filters.append(Measurement.measured_at >= date_from) if date_to is not None: filters.append(Measurement.measured_at <= date_to) if operator_id is not None: filters.append(Measurement.measured_by == operator_id) if lot_number is not None: filters.append(Measurement.lot_number == lot_number) if serial_number is not None: filters.append(Measurement.serial_number == serial_number) query = ( select(Measurement) .where(and_(*filters) if filters else True) .order_by(Measurement.measured_at.asc()) ) result = await db.execute(query) return list(result.scalars().all()) async def _get_subtask_tolerances( db: AsyncSession, subtask_id: int, ) -> dict: """Get tolerance limits for a specific subtask.""" result = await db.execute( select(RecipeSubtask).where(RecipeSubtask.id == subtask_id) ) subtask = result.scalar_one_or_none() if subtask is None: return {"utl": None, "uwl": None, "lwl": None, "ltl": None, "nominal": None} return { "utl": float(subtask.utl) if subtask.utl is not None else None, "uwl": float(subtask.uwl) if subtask.uwl is not None else None, "lwl": float(subtask.lwl) if subtask.lwl is not None else None, "ltl": float(subtask.ltl) if subtask.ltl is not None else None, "nominal": float(subtask.nominal) if subtask.nominal is not None else None, } @router.get("/summary", response_model=SummaryData) async def get_summary( recipe_id: int = Query(...), version_id: int | None = Query(None), subtask_id: int | None = Query(None), date_from: datetime | None = Query(None), date_to: datetime | None = Query(None), operator_id: int | None = Query(None), lot_number: str | None = Query(None), serial_number: str | None = Query(None), user: User = Depends(require_metrologist), db: AsyncSession = Depends(get_db), ) -> SummaryData: """Get pass/fail/warning summary for filtered measurements.""" measurements = await _query_measurements( db, recipe_id, version_id, subtask_id, date_from, date_to, operator_id, lot_number, serial_number, ) pass_fail_values = [m.pass_fail for m in measurements] return compute_summary(pass_fail_values) @router.get("/capability", response_model=CapabilityData) async def get_capability( recipe_id: int = Query(...), subtask_id: int = Query(...), version_id: int | None = Query(None), date_from: datetime | None = Query(None), date_to: datetime | None = Query(None), operator_id: int | None = Query(None), lot_number: str | None = Query(None), serial_number: str | None = Query(None), user: User = Depends(require_metrologist), db: AsyncSession = Depends(get_db), ) -> CapabilityData: """Get capability indices (Cp/Cpk/Pp/Ppk) for a specific subtask.""" measurements = await _query_measurements( db, recipe_id, version_id, subtask_id, date_from, date_to, operator_id, lot_number, serial_number, ) values = [float(m.value) for m in measurements] tol = await _get_subtask_tolerances(db, subtask_id) return compute_capability(values, tol["utl"], tol["ltl"], tol["nominal"]) @router.get("/control-chart", response_model=ControlChartData) async def get_control_chart( recipe_id: int = Query(...), subtask_id: int = Query(...), version_id: int | None = Query(None), date_from: datetime | None = Query(None), date_to: datetime | None = Query(None), operator_id: int | None = Query(None), lot_number: str | None = Query(None), serial_number: str | None = Query(None), user: User = Depends(require_metrologist), db: AsyncSession = Depends(get_db), ) -> ControlChartData: """Get control chart data with UCL/LCL and OOC detection.""" measurements = await _query_measurements( db, recipe_id, version_id, subtask_id, date_from, date_to, operator_id, lot_number, serial_number, ) values = [float(m.value) for m in measurements] timestamps = [m.measured_at for m in measurements] tol = await _get_subtask_tolerances(db, subtask_id) return compute_control_chart( values, timestamps, tol["utl"], tol["uwl"], tol["lwl"], tol["ltl"], tol["nominal"], ) @router.get("/histogram", response_model=HistogramData) async def get_histogram( recipe_id: int = Query(...), subtask_id: int = Query(...), version_id: int | None = Query(None), date_from: datetime | None = Query(None), date_to: datetime | None = Query(None), operator_id: int | None = Query(None), lot_number: str | None = Query(None), serial_number: str | None = Query(None), n_bins: int = Query(20, ge=5, le=100), user: User = Depends(require_metrologist), db: AsyncSession = Depends(get_db), ) -> HistogramData: """Get histogram data with normal curve overlay.""" measurements = await _query_measurements( db, recipe_id, version_id, subtask_id, date_from, date_to, operator_id, lot_number, serial_number, ) values = [float(m.value) for m in measurements] return compute_histogram(values, n_bins) @router.get("/subtasks") async def get_recipe_subtasks( recipe_id: int = Query(...), version_id: int | None = Query(None), user: User = Depends(require_metrologist), db: AsyncSession = Depends(get_db), ) -> list[dict]: """Get subtasks for a recipe (for filter dropdown). Returns subtask id, marker_number, description, and parent task title. """ # Get version_id: use provided or find current version if version_id is None: ver_result = await db.execute( select(RecipeVersion.id).where( RecipeVersion.recipe_id == recipe_id, RecipeVersion.is_current == True, ) ) version_id = ver_result.scalar_one_or_none() if version_id is None: return [] # Get tasks and subtasks for this version tasks_result = await db.execute( select(RecipeTask).where(RecipeTask.version_id == version_id) .order_by(RecipeTask.order_index) ) tasks = tasks_result.scalars().all() subtasks_list = [] for task in tasks: for st in sorted(task.subtasks, key=lambda s: s.marker_number): subtasks_list.append({ "id": st.id, "marker_number": st.marker_number, "description": st.description, "task_title": task.title, "nominal": float(st.nominal) if st.nominal is not None else None, "utl": float(st.utl) if st.utl is not None else None, "ltl": float(st.ltl) if st.ltl is not None else None, }) return subtasks_list