feat: add demo measurements to seed and user management to setup page

Seed now generates 180 realistic measurements (30 per subtask) with
Gaussian distribution for SPC testing. Setup page gains full user CRUD
with list, create/edit, password change, and activate/deactivate.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Adriano
2026-02-24 14:49:58 +01:00
parent 272685e554
commit e07a4e4f89
3 changed files with 953 additions and 7 deletions
+271 -2
View File
@@ -3,21 +3,24 @@
Protected by SETUP_PASSWORD env var. When empty/None, all endpoints return 404.
NOT protected by API Key auth (standalone access).
"""
import random
import secrets
from datetime import datetime, timedelta
from pathlib import Path
from fastapi import APIRouter, HTTPException, Request, status
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from sqlalchemy import inspect as sa_inspect
from sqlalchemy import inspect as sa_inspect, select
from config import settings
from database import Base, engine, async_session_factory
from models import (
User, Recipe, RecipeVersion, RecipeTask, RecipeSubtask,
User, Recipe, RecipeVersion, RecipeTask, RecipeSubtask, Measurement,
)
from services.auth_service import hash_password
from services.measurement_service import calculate_pass_fail
router = APIRouter(prefix="/api/setup", tags=["setup"])
@@ -46,6 +49,28 @@ class ResetDbBody(BaseModel):
confirm: bool
class ManageUserBody(BaseModel):
password: str # setup password
user_id: int | None = None # None = create, int = update
username: str
user_password: str | None = None # Required for create, optional for update
display_name: str
email: str | None = None
roles: list[str] = []
is_admin: bool = False
class ChangeUserPasswordBody(BaseModel):
password: str # setup password
user_id: int
new_password: str
class ToggleUserActiveBody(BaseModel):
password: str # setup password
user_id: int
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
@@ -66,6 +91,76 @@ def _check_password(password: str) -> None:
)
def _generate_measurements_for_subtask(
subtask: RecipeSubtask,
version_id: int,
measured_by: int,
lot_numbers: list[str],
serial_numbers: list[str],
base_date: datetime,
count: int = 30,
) -> list[Measurement]:
"""Generate realistic demo measurements with Gaussian distribution."""
nominal = float(subtask.nominal)
uwl = float(subtask.uwl)
ltl = float(subtask.ltl)
utl = float(subtask.utl)
lwl = float(subtask.lwl)
sigma = (uwl - nominal) / 2.5
measurements = []
for i in range(count):
# ~80% pass, ~13% warning, ~7% fail
r = random.random()
if r < 0.07:
# Fail: outside tolerance limits
if random.random() < 0.5:
value = random.uniform(utl, utl + (utl - nominal))
else:
value = random.uniform(ltl - (nominal - ltl), ltl)
elif r < 0.20:
# Warning: between warning and tolerance
if random.random() < 0.5:
value = random.uniform(uwl, utl)
else:
value = random.uniform(ltl, lwl)
else:
# Pass: Gaussian around nominal
value = random.gauss(nominal, sigma * 0.7)
# Clamp to within warning limits
value = max(lwl, min(uwl, value))
value = round(value, 6)
pass_fail, deviation = calculate_pass_fail(value, subtask)
# Distribute across time, lots, serials
days_offset = random.uniform(0, 30)
hours_offset = random.uniform(8, 17) # working hours
measured_at = base_date - timedelta(days=days_offset) + timedelta(hours=hours_offset)
lot = lot_numbers[i % len(lot_numbers)]
serial = serial_numbers[i % len(serial_numbers)]
input_method = "usb_caliper" if random.random() < 0.7 else "manual"
m = Measurement(
subtask_id=subtask.id,
version_id=version_id,
measured_by=measured_by,
value=value,
pass_fail=pass_fail,
deviation=deviation,
lot_number=lot,
serial_number=serial,
input_method=input_method,
measured_at=measured_at,
)
measurements.append(m)
return measurements
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@@ -312,14 +407,188 @@ async def seed_demo_data(body: PasswordBody):
]
session.add_all(subtasks_2)
# ---- Measurements -------------------------------------------------
await session.flush() # Get subtask IDs
random.seed(42) # Reproducible demo data
tec_user = created_users[2] # tec1
lot_numbers = ["LOT-2025-001", "LOT-2025-002", "LOT-2025-003", "LOT-2025-004"]
serial_numbers = [f"SN-{i:04d}" for i in range(1, 13)]
base_date = datetime.now()
all_subtasks = subtasks_1 + subtasks_2
measurements_created = 0
for st in all_subtasks:
ms = _generate_measurements_for_subtask(
subtask=st,
version_id=version.id,
measured_by=tec_user.id,
lot_numbers=lot_numbers,
serial_numbers=serial_numbers,
base_date=base_date,
count=30,
)
session.add_all(ms)
measurements_created += len(ms)
return {
"status": "ok",
"message": "Demo data seeded successfully",
"users_created": [u["username"] for u in users_data],
"recipe_created": "DEMO-001",
"measurements_created": measurements_created,
}
@router.post("/list-users")
async def list_users(body: PasswordBody):
"""List all users."""
_check_password(body.password)
async with async_session_factory() as session:
result = await session.execute(select(User).order_by(User.id))
users = result.scalars().all()
return {
"status": "ok",
"users": [
{
"id": u.id,
"username": u.username,
"display_name": u.display_name,
"email": u.email,
"roles": u.roles or [],
"is_admin": u.is_admin,
"active": u.active,
"created_at": u.created_at.isoformat() if u.created_at else None,
"last_login": u.last_login.isoformat() if u.last_login else None,
}
for u in users
],
}
@router.post("/manage-user")
async def manage_user(body: ManageUserBody):
"""Create or update a user. user_id=null creates new, user_id=int updates."""
_check_password(body.password)
VALID_ROLES = {"Maker", "MeasurementTec", "Metrologist"}
invalid = set(body.roles) - VALID_ROLES
if invalid:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid roles: {', '.join(invalid)}",
)
async with async_session_factory() as session:
async with session.begin():
if body.user_id is None:
# Create
if not body.user_password:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Password is required when creating a new user",
)
# Check username uniqueness
existing = await session.execute(
select(User).where(User.username == body.username)
)
if existing.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Username '{body.username}' already exists",
)
user = User(
username=body.username,
password_hash=hash_password(body.user_password),
display_name=body.display_name,
email=body.email,
roles=body.roles,
is_admin=body.is_admin,
api_key=secrets.token_hex(32),
)
session.add(user)
await session.flush()
return {"status": "ok", "message": f"User '{body.username}' created", "user_id": user.id}
else:
# Update
result = await session.execute(
select(User).where(User.id == body.user_id)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User ID {body.user_id} not found",
)
# Check username uniqueness (exclude self)
existing = await session.execute(
select(User).where(
User.username == body.username, User.id != body.user_id
)
)
if existing.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Username '{body.username}' already taken",
)
user.username = body.username
user.display_name = body.display_name
user.email = body.email
user.roles = body.roles
user.is_admin = body.is_admin
if body.user_password:
user.password_hash = hash_password(body.user_password)
return {"status": "ok", "message": f"User '{body.username}' updated", "user_id": user.id}
@router.post("/change-user-password")
async def change_user_password(body: ChangeUserPasswordBody):
"""Change a user's password and invalidate their API key."""
_check_password(body.password)
async with async_session_factory() as session:
async with session.begin():
result = await session.execute(
select(User).where(User.id == body.user_id)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User ID {body.user_id} not found",
)
user.password_hash = hash_password(body.new_password)
user.api_key = None # Invalidate sessions
return {"status": "ok", "message": f"Password changed for '{user.username}'"}
@router.post("/toggle-user-active")
async def toggle_user_active(body: ToggleUserActiveBody):
"""Toggle a user's active status. Invalidates API key when deactivated."""
_check_password(body.password)
async with async_session_factory() as session:
async with session.begin():
result = await session.execute(
select(User).where(User.id == body.user_id)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User ID {body.user_id} not found",
)
user.active = not user.active
if not user.active:
user.api_key = None # Invalidate sessions on deactivation
new_status = "active" if user.active else "inactive"
return {"status": "ok", "message": f"User '{user.username}' is now {new_status}", "active": user.active}
@router.post("/reset-db")
async def reset_db(body: ResetDbBody):
"""Drop all tables and recreate them. Requires confirm=true."""