Files
TieMeasureFlow/server/routers/users.py
T
Adriano d6508e0ae8 feat: FASE 1 - Backend Core (modelli, auth, API)
Implementazione completa del backend FastAPI:
- Modelli SQLAlchemy: User, Recipe, RecipeVersion, RecipeTask,
  RecipeSubtask, Measurement, AccessLog, SystemSetting, RecipeVersionAudit
- Schemas Pydantic v2 per tutti i CRUD + statistiche SPC
- Middleware: API Key auth (X-API-Key) con role checking + access logging
- Router: auth, users, recipes, tasks, measurements, files, settings
- Services: auth (bcrypt+secrets), recipe (copy-on-write versioning),
  measurement (auto pass/fail con UTL/UWL/LWL/LTL)
- Alembic env.py con import modelli attivi
- Fix architect review: no double-commit, recipe_id subquery filter,
  user_id in access logs, type annotations corrette

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 00:40:50 +01:00

128 lines
4.5 KiB
Python

"""Users router - CRUD operations (admin only)."""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
from middleware.api_key import require_admin_user
from models.user import User
from schemas.user import UserCreate, UserResponse, UserUpdate
from services.auth_service import create_user, hash_password, regenerate_api_key
# Router collecting the user-management endpoints; routes registered on it
# are served under the /api/users prefix and tagged "users".
router = APIRouter(prefix="/api/users", tags=["users"])
@router.get("", response_model=list[UserResponse])
async def list_users(
    admin: User = Depends(require_admin_user),
    db: AsyncSession = Depends(get_db),
):
    """Return all users sorted by username (admin only).

    Args:
        admin: Caller resolved by the ``require_admin_user`` dependency; the
            value itself is not used in the body.
        db: Database session supplied by ``get_db``.

    Returns:
        A list with one ``UserResponse`` per stored user.
    """
    query = select(User).order_by(User.username)
    rows = (await db.execute(query)).scalars().all()
    return list(map(UserResponse.model_validate, rows))
@router.post("", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_new_user(
    data: UserCreate,
    admin: User = Depends(require_admin_user),
    db: AsyncSession = Depends(get_db),
):
    """Create a new user (admin only).

    Args:
        data: Payload describing the user to create.
        admin: Caller resolved by the ``require_admin_user`` dependency; the
            value itself is not used in the body.
        db: Database session supplied by ``get_db``.

    Returns:
        The user returned by ``create_user``, validated into ``UserResponse``.

    Raises:
        HTTPException: 409 when a user with the same username already exists;
            422 when any requested role is not one of the accepted roles.
    """
    # Check username uniqueness
    lookup = await db.execute(select(User).where(User.username == data.username))
    if lookup.scalar_one_or_none() is not None:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Username '{data.username}' already exists",
        )

    # Validate roles
    valid_roles = {"Maker", "MeasurementTec", "Metrologist"}
    for role in data.roles:
        if role not in valid_roles:
            # Interpolating the set directly yields an element order that can
            # differ between interpreter runs; render it sorted so the error
            # text is stable while keeping the same brace/quote shape.
            allowed_text = "{" + ", ".join(repr(r) for r in sorted(valid_roles)) + "}"
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Invalid role '{role}'. Valid roles: {allowed_text}",
            )

    user = await create_user(
        db,
        username=data.username,
        password=data.password,
        display_name=data.display_name,
        email=data.email,
        roles=data.roles,
        is_admin=data.is_admin,
        language_pref=data.language_pref,
        theme_pref=data.theme_pref,
    )
    return UserResponse.model_validate(user)
@router.put("/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int,
    data: UserUpdate,
    admin: User = Depends(require_admin_user),
    db: AsyncSession = Depends(get_db),
):
    """Update an existing user (admin only).

    Only the fields the client actually sent are applied
    (``model_dump(exclude_unset=True)``).

    Args:
        user_id: Primary key of the user to update.
        data: Partial update payload.
        admin: Caller resolved by the ``require_admin_user`` dependency; the
            value itself is not used in the body.
        db: Database session supplied by ``get_db``.

    Returns:
        The refreshed user, validated into ``UserResponse``.

    Raises:
        HTTPException: 404 when no user has ``user_id``; 422 when ``roles`` is
            sent as null or contains a role that is not accepted.
    """
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    update_data = data.model_dump(exclude_unset=True)

    # Validate roles if provided
    if "roles" in update_data:
        requested_roles = update_data["roles"]
        # An explicitly sent null survives exclude_unset; iterating it would
        # raise TypeError and surface as a 500, so reject it up front.
        if requested_roles is None:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="Roles must be a list, not null",
            )
        valid_roles = {"Maker", "MeasurementTec", "Metrologist"}
        for role in requested_roles:
            if role not in valid_roles:
                # Interpolating the set directly yields an element order that
                # can differ between interpreter runs; render it sorted so the
                # error text is stable while keeping the same shape.
                allowed_text = "{" + ", ".join(repr(r) for r in sorted(valid_roles)) + "}"
                raise HTTPException(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    detail=f"Invalid role '{role}'. Valid roles: {allowed_text}",
                )

    # NOTE(review): every submitted field is assigned to the model verbatim.
    # If UserUpdate can carry a plaintext password or a changed username, those
    # would need hashing / a uniqueness check first — confirm against the schema.
    for field, value in update_data.items():
        setattr(user, field, value)

    await db.flush()
    await db.refresh(user)
    return UserResponse.model_validate(user)
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def deactivate_user(
    user_id: int,
    admin: User = Depends(require_admin_user),
    db: AsyncSession = Depends(get_db),
):
    """Soft-delete a user by marking it inactive and clearing its API key.

    Args:
        user_id: Primary key of the user to deactivate.
        admin: Caller resolved by the ``require_admin_user`` dependency; used
            to refuse self-deactivation.
        db: Database session supplied by ``get_db``.

    Raises:
        HTTPException: 404 when no user has ``user_id``; 400 when the target
            is the calling admin.
    """
    stmt = select(User).where(User.id == user_id)
    target = (await db.execute(stmt)).scalar_one_or_none()

    if target is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    # An admin may not deactivate their own account.
    targets_self = target.id == admin.id
    if targets_self:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot deactivate yourself",
        )

    # The row is kept; only the active flag and the API key change.
    target.active, target.api_key = False, None
    await db.flush()
@router.post("/{user_id}/regenerate-key")
async def regenerate_user_key(
    user_id: int,
    admin: User = Depends(require_admin_user),
    db: AsyncSession = Depends(get_db),
):
    """Issue a new API key for the given user (admin only).

    Args:
        user_id: Primary key of the user whose key is regenerated.
        admin: Caller resolved by the ``require_admin_user`` dependency; the
            value itself is not used in the body.
        db: Database session supplied by ``get_db``.

    Returns:
        A dict with the new key under ``"api_key"`` and a confirmation text
        under ``"message"``.

    Raises:
        HTTPException: 404 when no user has ``user_id``.
    """
    stmt = select(User).where(User.id == user_id)
    target = (await db.execute(stmt)).scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    fresh_key = await regenerate_api_key(db, user_id)
    return {
        "api_key": fresh_key,
        "message": f"API key regenerated for user {target.username}",
    }