"""Authentication service - password hashing, API key management."""

import secrets
from datetime import datetime, timezone

from passlib.context import CryptContext
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession

from models.user import User

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    """Return the bcrypt hash of *password* produced by ``pwd_context``."""
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether *plain_password* matches *hashed_password*."""
    return pwd_context.verify(plain_password, hashed_password)


def generate_api_key() -> str:
    """Generate a random, URL-safe API key.

    48 random bytes encode to exactly 64 base64 characters.
    """
    return secrets.token_urlsafe(48)


async def authenticate_user(
    db: AsyncSession, username: str, password: str
) -> User | None:
    """Authenticate a user by username and password.

    Args:
        db: Session used to look up the user.
        username: Username to match exactly.
        password: Plain-text password to verify.

    Returns:
        The matching active ``User`` if the password verifies, else ``None``.
    """
    result = await db.execute(
        select(User).where(
            User.username == username,
            User.active == True,  # noqa: E712 - SQL expression, not a Python bool test
        )
    )
    user = result.scalar_one_or_none()

    if user is None:
        # Spend about as long as a real verification so that an unknown or
        # inactive username cannot be told apart from a wrong password by
        # response time.
        pwd_context.dummy_verify()
        return None

    if not verify_password(password, user.password_hash):
        return None
    return user


async def login_user(db: AsyncSession, user: User) -> str:
    """Issue a fresh API key for *user* and record the login time.

    The session is flushed, not committed, here.

    Returns:
        The newly generated API key.
    """
    api_key = generate_api_key()
    # Naive UTC timestamp: the same value the deprecated datetime.utcnow()
    # produced, so what is written to last_login does not change.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    await db.execute(
        update(User)
        .where(User.id == user.id)
        .values(api_key=api_key, last_login=now_utc)
    )
    await db.flush()
    return api_key


async def logout_user(db: AsyncSession, user: User) -> None:
    """Invalidate the user's API key by setting it to ``None``.

    The session is flushed, not committed, here.
    """
    await db.execute(
        update(User).where(User.id == user.id).values(api_key=None)
    )
    await db.flush()


async def create_user(
    db: AsyncSession,
    username: str,
    password: str,
    display_name: str,
    email: str | None = None,
    roles: list[str] | None = None,
    is_admin: bool = False,
    language_pref: str = "it",
    theme_pref: str = "light",
) -> User:
    """Create a new user with a hashed password.

    The plain-text *password* is hashed before being stored. A missing or
    empty *roles* is stored as an empty list. The new row is flushed and
    refreshed, but not committed, here.

    Returns:
        The newly added ``User`` after refresh.
    """
    user = User(
        username=username,
        password_hash=hash_password(password),
        display_name=display_name,
        email=email,
        roles=roles or [],
        is_admin=is_admin,
        language_pref=language_pref,
        theme_pref=theme_pref,
    )
    db.add(user)
    await db.flush()
    await db.refresh(user)
    return user


async def regenerate_api_key(db: AsyncSession, user_id: int) -> str:
    """Replace the API key of the user with id *user_id*.

    The session is flushed, not committed, here.

    Returns:
        The newly generated API key.
    """
    # NOTE(review): there is no check that a row matched user_id; the new key
    # is returned either way. Confirm callers validate the id first.
    new_key = generate_api_key()
    await db.execute(
        update(User).where(User.id == user_id).values(api_key=new_key)
    )
    await db.flush()
    return new_key