dd2ebf863a
Security hardening: CORS lockdown, rate limiting middleware con sliding window e eviction IP stale, security headers (CSP, HSTS, X-Frame-Options), session cookie hardening, filename sanitization upload. i18n completion: internazionalizzati barcode.js e csv-export.js con bridge window.BARCODE_I18N/CSV_I18N, aggiornati .po IT/EN con 27 nuove stringhe. Tablet UX: touch target 44px per dispositivi coarse pointer. Test suite: 101 test totali (76 server + 25 client), copertura completa di tutti i router API, autenticazione, ruoli, CRUD, SPC, file upload, security integration. Infrastruttura SQLite async in-memory con fixtures. Fix critici: MissingGreenlet in recipe_service (selectinload eager), route ordering tasks.py, auth_service bcrypt diretto, Measurement.id Integer per SQLite. Documentazione: API.md (riferimento completo 40+ endpoint), DEPLOYMENT.md (guida produzione con Docker/Nginx/SSL), USER_GUIDE.md (manuale utente per ruolo). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
98 lines
2.7 KiB
Python
98 lines
2.7 KiB
Python
"""Authentication service - password hashing, API key management."""
|
|
import secrets
|
|
from datetime import datetime
|
|
|
|
import bcrypt
|
|
from sqlalchemy import select, update
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from models.user import User
|
|
|
|
|
|
def hash_password(password: str) -> str:
    """Return the bcrypt hash of ``password`` as a text string.

    The password is UTF-8 encoded, hashed with a salt generated for this
    call, and the resulting bytes are decoded back to ``str``.
    """
    raw_password = password.encode("utf-8")
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(raw_password, salt)
    return digest.decode("utf-8")
|
|
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        plain_password: Candidate password as supplied by the caller.
        hashed_password: Stored bcrypt hash in its text form.

    Returns:
        True when the password matches the hash. False when it does not
        match, when no hash is stored, or when bcrypt rejects the inputs
        (for example a stored value that is not a valid bcrypt hash).
    """
    # Nothing to compare against: fail verification instead of raising on
    # ``.encode`` (None) or inside bcrypt (empty string).
    if not hashed_password:
        return False
    try:
        return bcrypt.checkpw(
            plain_password.encode("utf-8"), hashed_password.encode("utf-8")
        )
    except ValueError:
        # bcrypt signals a malformed hash with ValueError ("Invalid salt");
        # UnicodeEncodeError is a ValueError subclass as well. Neither can
        # be a successful match, so report a failed verification rather
        # than letting the error escape from the login path.
        return False
|
|
|
|
|
|
def generate_api_key() -> str:
    """Return a random, URL-safe API key that is 64 characters long."""
    # 48 random bytes encode to exactly 64 base64 characters (no padding).
    entropy_bytes = 48
    return secrets.token_urlsafe(entropy_bytes)
|
|
|
|
|
|
async def authenticate_user(
    db: AsyncSession, username: str, password: str
) -> User | None:
    """Look up an active user by username and check the given password.

    Returns the matching ``User`` when the credentials verify, or ``None``
    when no active user has that username or the password does not match.
    """
    # ``== True`` is intentional: the comparison builds a SQL expression.
    lookup = select(User).where(User.username == username, User.active == True)  # noqa: E712
    rows = await db.execute(lookup)
    candidate = rows.scalar_one_or_none()
    if candidate is not None and verify_password(password, candidate.password_hash):
        return candidate
    return None
|
|
|
|
|
|
async def login_user(db: AsyncSession, user: User) -> str:
    """Issue a fresh API key for ``user`` and record the login time.

    Args:
        db: Async session used to run the UPDATE and flush it.
        user: The user logging in; only ``user.id`` is read.

    Returns:
        The newly generated API key that was written to the user's row.
    """
    # Imported locally because the module only imports ``datetime`` itself.
    from datetime import timezone

    api_key = generate_api_key()
    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware
    # UTC "now" and drop the tzinfo so the stored value remains the same
    # naive UTC timestamp the previous implementation produced.
    login_time = datetime.now(timezone.utc).replace(tzinfo=None)
    await db.execute(
        update(User)
        .where(User.id == user.id)
        .values(api_key=api_key, last_login=login_time)
    )
    await db.flush()
    return api_key
|
|
|
|
|
|
async def logout_user(db: AsyncSession, user: User) -> None:
    """Clear the stored API key of ``user`` by setting it to ``None``."""
    revoke_stmt = update(User).where(User.id == user.id).values(api_key=None)
    await db.execute(revoke_stmt)
    await db.flush()
|
|
|
|
|
|
async def create_user(
    db: AsyncSession,
    username: str,
    password: str,
    display_name: str,
    email: str | None = None,
    roles: list[str] | None = None,
    is_admin: bool = False,
    language_pref: str = "it",
    theme_pref: str = "light",
) -> User:
    """Create a user record, storing a hash of the password.

    The plaintext ``password`` is passed through ``hash_password`` and only
    the resulting hash is set on the model. A missing or empty ``roles``
    becomes an empty list. The new row is added to the session, flushed,
    and refreshed before it is returned.
    """
    password_hash = hash_password(password)
    assigned_roles = roles if roles else []
    new_user = User(
        username=username,
        password_hash=password_hash,
        display_name=display_name,
        email=email,
        roles=assigned_roles,
        is_admin=is_admin,
        language_pref=language_pref,
        theme_pref=theme_pref,
    )
    db.add(new_user)
    await db.flush()
    await db.refresh(new_user)
    return new_user
|
|
|
|
|
|
async def regenerate_api_key(db: AsyncSession, user_id: int) -> str:
    """Replace the API key of the user with id ``user_id`` and return it.

    The number of updated rows is not checked, so the new key is returned
    whether or not a user with that id exists.
    """
    fresh_key = generate_api_key()
    rotate_stmt = update(User).where(User.id == user_id).values(api_key=fresh_key)
    await db.execute(rotate_stmt)
    await db.flush()
    return fresh_key
|