Files
TieMeasureFlow/server/middleware/rate_limit.py
Adriano dd2ebf863a feat: FASE 7 - Polish & Testing (security, i18n, test suite, docs)
Security hardening: CORS lockdown, rate limiting middleware con sliding
window e eviction IP stale, security headers (CSP, HSTS, X-Frame-Options),
session cookie hardening, filename sanitization upload.

i18n completion: internazionalizzati barcode.js e csv-export.js con bridge
window.BARCODE_I18N/CSV_I18N, aggiornati .po IT/EN con 27 nuove stringhe.

Tablet UX: touch target 44px per dispositivi coarse pointer.

Test suite: 101 test totali (76 server + 25 client), copertura completa
di tutti i router API, autenticazione, ruoli, CRUD, SPC, file upload,
security integration. Infrastruttura SQLite async in-memory con fixtures.

Fix critici: MissingGreenlet in recipe_service (selectinload eager),
route ordering tasks.py, auth_service bcrypt diretto, Measurement.id
Integer per SQLite.

Documentazione: API.md (riferimento completo 40+ endpoint),
DEPLOYMENT.md (guida produzione con Docker/Nginx/SSL),
USER_GUIDE.md (manuale utente per ruolo).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 17:10:24 +01:00

105 lines
3.9 KiB
Python

"""Rate limiting middleware for FastAPI.
Implements in-memory sliding window rate limiting per client IP.
Configurable limits for login and general endpoints.
"""
import time
from collections import defaultdict
from typing import Callable
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from config import settings
class RateLimitMiddleware(BaseHTTPMiddleware):
    """Middleware that enforces per-IP rate limits using a sliding window.

    - ``POST /api/auth/login`` is limited to ``settings.rate_limit_login``
      requests per window. A login request that passes this check is also
      counted against the general limit below.
    - Every request is limited to ``settings.rate_limit_general`` requests
      per window.

    A request over the limit is answered with HTTP 429 and a ``Retry-After``
    header; it is not forwarded to the application and is not recorded.

    All state lives in plain dictionaries on this instance, so the limits
    apply per process and are reset when the process restarts.

    NOTE(review): clients are keyed by ``request.client.host``. Behind a
    reverse proxy this is presumably the proxy's address unless forwarded
    headers are resolved before this middleware runs -- confirm against the
    deployment setup.
    """

    LOGIN_PATH = "/api/auth/login"
    WINDOW_SECONDS = 60
    # Stale IP entries are purged once every this many requests.
    EVICTION_INTERVAL = 100

    def __init__(self, app) -> None:
        """Wrap *app* and start with empty rate-limit buckets."""
        super().__init__(app)
        # {ip: [timestamp, ...]} per bucket
        self._login_requests: dict[str, list[float]] = defaultdict(list)
        self._general_requests: dict[str, list[float]] = defaultdict(list)
        self._request_count = 0  # Counter for triggering eviction

    def _clean_window(self, timestamps: list[float], now: float) -> list[float]:
        """Return the timestamps that are still inside the sliding window.

        A timestamp is kept only if it is strictly newer than
        ``now - WINDOW_SECONDS``. The input list is not modified.
        """
        cutoff = now - self.WINDOW_SECONDS
        return [t for t in timestamps if t > cutoff]

    def _evict_stale_ips(self, bucket: dict[str, list[float]], now: float) -> None:
        """Delete IP entries with no timestamp inside the current window.

        Without this, every IP ever seen would keep a key in *bucket*
        forever (memory leak prevention).
        """
        cutoff = now - self.WINDOW_SECONDS
        # Collect first, delete afterwards: a dict must not change size
        # while it is being iterated.
        stale_ips = [
            ip
            for ip, timestamps in bucket.items()
            if not timestamps or max(timestamps) <= cutoff
        ]
        for ip in stale_ips:
            del bucket[ip]

    def _check_rate_limit(
        self,
        bucket: dict[str, list[float]],
        client_ip: str,
        limit: int,
        now: float,
    ) -> tuple[bool, int]:
        """Check whether a request is within the rate limit and record it.

        Expired timestamps for *client_ip* are dropped first. If the request
        is allowed, *now* is appended to the client's window; a rejected
        request is not recorded.

        Returns:
            Tuple of (allowed, retry_after_seconds). ``retry_after_seconds``
            is 0 when allowed and at least 1 when rejected.
        """
        recent = self._clean_window(bucket[client_ip], now)
        bucket[client_ip] = recent

        if len(recent) >= limit:
            if not recent:
                # Only reachable when limit <= 0: there is no recorded
                # request to wait for, so advertise a full window instead
                # of indexing into an empty list.
                return False, self.WINDOW_SECONDS
            # Seconds until the oldest request falls out of the window.
            oldest = recent[0]
            retry_after = int(oldest + self.WINDOW_SECONDS - now) + 1
            return False, max(retry_after, 1)

        recent.append(now)
        return True, 0

    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        """Apply the login and general rate limits, then forward the request.

        Returns a 429 JSON response with a ``Retry-After`` header when either
        limit is exceeded; otherwise returns the downstream response.
        """
        client_ip = request.client.host if request.client else "unknown"
        # Monotonic clock: the timestamps are only compared with each other,
        # and must not be disturbed by wall-clock adjustments (NTP, manual).
        now = time.monotonic()
        path = request.url.path

        # Periodic eviction: every EVICTION_INTERVAL requests, remove stale
        # IP buckets.
        self._request_count += 1
        if self._request_count % self.EVICTION_INTERVAL == 0:
            self._evict_stale_ips(self._login_requests, now)
            self._evict_stale_ips(self._general_requests, now)

        # Check login-specific rate limit
        if path == self.LOGIN_PATH and request.method == "POST":
            allowed, retry_after = self._check_rate_limit(
                self._login_requests, client_ip, settings.rate_limit_login, now
            )
            if not allowed:
                return JSONResponse(
                    status_code=429,
                    content={"detail": "Too many login attempts. Please try again later."},
                    headers={"Retry-After": str(retry_after)},
                )

        # Check general rate limit
        allowed, retry_after = self._check_rate_limit(
            self._general_requests, client_ip, settings.rate_limit_general, now
        )
        if not allowed:
            return JSONResponse(
                status_code=429,
                content={"detail": "Too many requests. Please try again later."},
                headers={"Retry-After": str(retry_after)},
            )

        return await call_next(request)