d6508e0ae8
Implementazione completa del backend FastAPI: - Modelli SQLAlchemy: User, Recipe, RecipeVersion, RecipeTask, RecipeSubtask, Measurement, AccessLog, SystemSetting, RecipeVersionAudit - Schemas Pydantic v2 per tutti i CRUD + statistiche SPC - Middleware: API Key auth (X-API-Key) con role checking + access logging - Router: auth, users, recipes, tasks, measurements, files, settings - Services: auth (bcrypt+secrets), recipe (copy-on-write versioning), measurement (auto pass/fail con UTL/UWL/LWL/LTL) - Alembic env.py con import modelli attivi - Fix architect review: no double-commit, recipe_id subquery filter, user_id in access logs, type annotations corrette Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
72 lines
2.2 KiB
Python
72 lines
2.2 KiB
Python
"""API Key authentication dependency for FastAPI."""
|
|
from fastapi import Depends, HTTPException, Request, status
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from database import get_db
|
|
from models.user import User
|
|
|
|
|
|
async def get_current_user(
    request: Request,
    db: AsyncSession = Depends(get_db),
) -> User:
    """Authenticate the caller from the ``X-API-Key`` request header.

    The header value is matched against ``User.api_key`` among users whose
    ``active`` flag is set. On success the user's id is also stored on
    ``request.state.user_id``.

    Args:
        request: Incoming request whose headers carry the API key.
        db: Async database session, injected through ``get_db``.

    Returns:
        The active ``User`` that owns the supplied API key.

    Raises:
        HTTPException: 401 when the header is missing or empty, or when no
            active user matches the key.
    """
    supplied_key = request.headers.get("X-API-Key")
    if not supplied_key:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing API key in X-API-Key header",
        )

    # ``== True`` is intentional: on a SQLAlchemy column it builds a SQL
    # comparison instead of evaluating to a Python bool.
    active_user_with_key = select(User).where(
        User.api_key == supplied_key,
        User.active == True,  # noqa: E712
    )
    lookup = await db.execute(active_user_with_key)
    # NOTE(review): scalar_one_or_none() raises if more than one row matches,
    # so api_key is presumably unique per user -- confirm against the model.
    user = lookup.scalar_one_or_none()
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or inactive API key",
        )

    # Expose the id on the request so the access logging middleware can use it.
    request.state.user_id = user.id
    return user
|
|
|
|
|
|
def require_role(role: str):
    """Build a FastAPI dependency that admits only users holding ``role``.

    Users whose ``is_admin`` attribute is truthy are admitted regardless of
    the role they hold.

    Args:
        role: Role name forwarded to ``User.has_role``.

    Returns:
        An async dependency callable. It resolves to the authenticated
        ``User`` or raises ``HTTPException`` (403) when the user neither has
        the role nor is an admin.
    """

    async def check_role(user: User = Depends(get_current_user)) -> User:
        # Happy path first: holding the role or being an admin is enough.
        if user.has_role(role) or user.is_admin:
            return user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Role '{role}' required",
        )

    return check_role
|
|
|
|
|
|
def require_admin():
    """Build a FastAPI dependency that admits only admin users.

    This is a factory: call it and hand the *returned* callable to
    ``Depends``; the factory itself performs no check.

    Returns:
        An async dependency callable. It resolves to the authenticated
        ``User`` or raises ``HTTPException`` (403) when ``user.is_admin`` is
        falsy.
    """

    async def check_admin(user: User = Depends(get_current_user)) -> User:
        if user.is_admin:
            return user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required",
        )

    return check_admin
|
|
|
|
|
|
# Ready-made dependency callables, built once at import time so routers can
# share them instead of calling the factories themselves.

# Role-gated dependencies (admins are also admitted by require_role).
require_maker = require_role("Maker")
require_measurement_tec = require_role("MeasurementTec")
require_metrologist = require_role("Metrologist")

# Admin-only dependency.
require_admin_user = require_admin()
|