"""API Key authentication dependency for FastAPI.""" from fastapi import Depends, HTTPException, Request, status from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from database import get_db from models.user import User async def get_current_user( request: Request, db: AsyncSession = Depends(get_db), ) -> User: """Extract API key from header and return the authenticated user. The API key is sent in the X-API-Key header on every request. Login endpoint is excluded from this check. """ api_key = request.headers.get("X-API-Key") if not api_key: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing API key in X-API-Key header", ) result = await db.execute( select(User).where(User.api_key == api_key, User.active == True) ) user = result.scalar_one_or_none() if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or inactive API key", ) # Store user_id in request state for access logging middleware request.state.user_id = user.id return user def require_role(role: str): """Dependency factory that checks if user has a specific role.""" async def check_role(user: User = Depends(get_current_user)) -> User: if not user.has_role(role) and not user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Role '{role}' required", ) return user return check_role def require_admin(): """Dependency that checks if user is admin.""" async def check_admin(user: User = Depends(get_current_user)) -> User: if not user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required", ) return user return check_admin # Pre-built role dependencies for convenience require_maker = require_role("Maker") require_measurement_tec = require_role("MeasurementTec") require_metrologist = require_role("Metrologist") require_admin_user = require_admin()