feat: FASE 1 - Backend Core (modelli, auth, API)
Implementazione completa del backend FastAPI: - Modelli SQLAlchemy: User, Recipe, RecipeVersion, RecipeTask, RecipeSubtask, Measurement, AccessLog, SystemSetting, RecipeVersionAudit - Schemas Pydantic v2 per tutti i CRUD + statistiche SPC - Middleware: API Key auth (X-API-Key) con role checking + access logging - Router: auth, users, recipes, tasks, measurements, files, settings - Services: auth (bcrypt+secrets), recipe (copy-on-write versioning), measurement (auto pass/fail con UTL/UWL/LWL/LTL) - Alembic env.py con import modelli attivi - Fix architect review: no double-commit, recipe_id subquery filter, user_id in access logs, type annotations corrette Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,195 @@
|
||||
"""File upload/download/delete router for images and PDFs."""
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status
|
||||
from fastapi.responses import FileResponse
|
||||
from PIL import Image
|
||||
|
||||
from config import settings
|
||||
from middleware.api_key import get_current_user, require_maker
|
||||
from models.user import User
|
||||
|
||||
router = APIRouter(prefix="/api/files", tags=["files"])
|
||||
|
||||
# Allowed MIME types
|
||||
ALLOWED_IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}
|
||||
ALLOWED_PDF_TYPE = "application/pdf"
|
||||
ALLOWED_TYPES = ALLOWED_IMAGE_TYPES | {ALLOWED_PDF_TYPE}
|
||||
|
||||
|
||||
def validate_file_type(content_type: str) -> bool:
|
||||
"""Validate if file type is allowed."""
|
||||
return content_type in ALLOWED_TYPES
|
||||
|
||||
|
||||
def validate_file_size(file_size: int) -> bool:
|
||||
"""Validate if file size is within limits."""
|
||||
max_bytes = settings.max_upload_size_mb * 1024 * 1024
|
||||
return file_size <= max_bytes
|
||||
|
||||
|
||||
def generate_thumbnail(image_path: Path, thumbnail_path: Path, max_size: tuple[int, int] = (200, 200)):
|
||||
"""Generate a thumbnail for an image."""
|
||||
try:
|
||||
with Image.open(image_path) as img:
|
||||
img.thumbnail(max_size, Image.Resampling.LANCZOS)
|
||||
img.save(thumbnail_path, quality=85)
|
||||
except Exception as e:
|
||||
# If thumbnail generation fails, we continue without it
|
||||
print(f"Thumbnail generation failed: {e}")
|
||||
|
||||
|
||||
@router.post("/upload")
|
||||
async def upload_file(
|
||||
file: UploadFile = File(...),
|
||||
recipe_id: int | None = None,
|
||||
version_id: int | None = None,
|
||||
user: User = Depends(require_maker),
|
||||
):
|
||||
"""Upload an image or PDF file.
|
||||
|
||||
Files are stored in uploads/{recipe_id}/{version_id}/
|
||||
Thumbnails are generated for images.
|
||||
"""
|
||||
# Validate file type
|
||||
if not validate_file_type(file.content_type):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=f"File type {file.content_type} not allowed. Must be image or PDF.",
|
||||
)
|
||||
|
||||
# Read file content
|
||||
content = await file.read()
|
||||
file_size = len(content)
|
||||
|
||||
# Validate file size
|
||||
if not validate_file_size(file_size):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=f"File size {file_size} bytes exceeds maximum {settings.max_upload_size_mb}MB",
|
||||
)
|
||||
|
||||
# Determine storage path
|
||||
if recipe_id is not None and version_id is not None:
|
||||
storage_dir = settings.upload_path / str(recipe_id) / str(version_id)
|
||||
else:
|
||||
storage_dir = settings.upload_path / "general"
|
||||
|
||||
storage_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Generate unique filename if file already exists
|
||||
file_path = storage_dir / file.filename
|
||||
counter = 1
|
||||
while file_path.exists():
|
||||
name_parts = file.filename.rsplit(".", 1)
|
||||
if len(name_parts) == 2:
|
||||
file_path = storage_dir / f"{name_parts[0]}_{counter}.{name_parts[1]}"
|
||||
else:
|
||||
file_path = storage_dir / f"{file.filename}_{counter}"
|
||||
counter += 1
|
||||
|
||||
# Write file
|
||||
file_path.write_bytes(content)
|
||||
|
||||
# Generate thumbnail for images
|
||||
thumbnail_path = None
|
||||
if file.content_type in ALLOWED_IMAGE_TYPES:
|
||||
thumbnail_dir = storage_dir / "thumbnails"
|
||||
thumbnail_dir.mkdir(exist_ok=True)
|
||||
thumbnail_path = thumbnail_dir / file_path.name
|
||||
generate_thumbnail(file_path, thumbnail_path)
|
||||
|
||||
# Return relative path from uploads directory
|
||||
relative_path = file_path.relative_to(settings.upload_path)
|
||||
thumbnail_relative = (
|
||||
thumbnail_path.relative_to(settings.upload_path)
|
||||
if thumbnail_path
|
||||
else None
|
||||
)
|
||||
|
||||
return {
|
||||
"file_path": str(relative_path).replace("\\", "/"),
|
||||
"thumbnail_path": str(thumbnail_relative).replace("\\", "/") if thumbnail_relative else None,
|
||||
"file_type": file.content_type,
|
||||
"file_size": file_size,
|
||||
}
|
||||
|
||||
|
||||
@router.get("/{file_path:path}")
|
||||
async def get_file(
|
||||
file_path: str,
|
||||
user: User = Depends(get_current_user),
|
||||
):
|
||||
"""Serve a file from the uploads directory.
|
||||
|
||||
Requires authentication but no specific role.
|
||||
"""
|
||||
# Construct full path
|
||||
full_path = settings.upload_path / file_path
|
||||
|
||||
# Security: ensure path is within uploads directory
|
||||
try:
|
||||
full_path = full_path.resolve()
|
||||
if not str(full_path).startswith(str(settings.upload_path.resolve())):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Access denied",
|
||||
)
|
||||
except Exception:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="File not found",
|
||||
)
|
||||
|
||||
# Check if file exists
|
||||
if not full_path.exists() or not full_path.is_file():
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="File not found",
|
||||
)
|
||||
|
||||
return FileResponse(full_path)
|
||||
|
||||
|
||||
@router.delete("/{file_path:path}", status_code=status.HTTP_204_NO_CONTENT)
|
||||
async def delete_file(
|
||||
file_path: str,
|
||||
user: User = Depends(require_maker),
|
||||
):
|
||||
"""Delete a file from the uploads directory.
|
||||
|
||||
Also deletes associated thumbnail if it exists.
|
||||
"""
|
||||
# Construct full path
|
||||
full_path = settings.upload_path / file_path
|
||||
|
||||
# Security: ensure path is within uploads directory
|
||||
try:
|
||||
full_path = full_path.resolve()
|
||||
if not str(full_path).startswith(str(settings.upload_path.resolve())):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Access denied",
|
||||
)
|
||||
except Exception:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="File not found",
|
||||
)
|
||||
|
||||
# Check if file exists
|
||||
if not full_path.exists() or not full_path.is_file():
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="File not found",
|
||||
)
|
||||
|
||||
# Delete thumbnail if it exists
|
||||
if full_path.parent.name != "thumbnails":
|
||||
thumbnail_path = full_path.parent / "thumbnails" / full_path.name
|
||||
if thumbnail_path.exists():
|
||||
thumbnail_path.unlink()
|
||||
|
||||
# Delete file
|
||||
full_path.unlink()
|
||||
Reference in New Issue
Block a user