Files
TieMeasureFlow/server/tests/test_security.py
Adriano dd2ebf863a feat: FASE 7 - Polish & Testing (security, i18n, test suite, docs)
Security hardening: CORS lockdown, rate limiting middleware con sliding
window e eviction IP stale, security headers (CSP, HSTS, X-Frame-Options),
session cookie hardening, filename sanitization upload.

i18n completion: internazionalizzati barcode.js e csv-export.js con bridge
window.BARCODE_I18N/CSV_I18N, aggiornati .po IT/EN con 27 nuove stringhe.

Tablet UX: touch target 44px per dispositivi coarse pointer.

Test suite: 101 test totali (76 server + 25 client), copertura completa
di tutti i router API, autenticazione, ruoli, CRUD, SPC, file upload,
security integration. Infrastruttura SQLite async in-memory con fixtures.

Fix critici: MissingGreenlet in recipe_service (selectinload eager),
route ordering tasks.py, auth_service bcrypt diretto, Measurement.id
Integer per SQLite.

Documentazione: API.md (riferimento completo 40+ endpoint),
DEPLOYMENT.md (guida produzione con Docker/Nginx/SSL),
USER_GUIDE.md (manuale utente per ruolo).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 17:10:24 +01:00

162 lines
5.6 KiB
Python

"""Security integration tests for FastAPI server.
Covers CORS, rate limiting, security headers, API key auth,
path traversal protection, filename sanitization, and OPTIONS preflight.
"""
import time
from unittest.mock import patch
import pytest
from httpx import AsyncClient
from tests.conftest import _create_user, auth_headers
class TestCORSHeaders:
"""CORS header tests."""
async def test_cors_headers_present(self, client: AsyncClient):
"""Response includes CORS headers for allowed origin."""
resp = await client.get(
"/api/health",
headers={"Origin": "http://localhost:5000"},
)
assert resp.status_code == 200
assert "access-control-allow-origin" in resp.headers
class TestSecurityHeaders:
"""Security header tests."""
async def test_security_headers_present(self, client: AsyncClient):
"""Response includes all standard security headers."""
resp = await client.get("/api/health")
assert resp.status_code == 200
assert resp.headers.get("x-content-type-options") == "nosniff"
assert resp.headers.get("x-frame-options") == "DENY"
assert resp.headers.get("x-xss-protection") == "1; mode=block"
assert "strict-origin" in resp.headers.get("referrer-policy", "")
assert "default-src" in resp.headers.get("content-security-policy", "")
class TestAPIKeyAuth:
"""API key authentication tests."""
async def test_protected_endpoint_requires_api_key(self, client: AsyncClient):
"""Protected endpoint returns 401 without API key."""
resp = await client.get("/api/users")
assert resp.status_code == 401
detail = resp.json().get("detail", "")
assert "API key" in detail or "Missing" in detail
async def test_invalid_api_key_returns_401(self, client: AsyncClient):
"""Invalid API key returns 401."""
resp = await client.get(
"/api/users",
headers={"X-API-Key": "totally-invalid-key-12345"},
)
assert resp.status_code == 401
detail = resp.json().get("detail", "")
assert "Invalid" in detail or "inactive" in detail
class TestRateLimiting:
"""Rate limiting tests."""
async def test_login_rate_limit_returns_429(self, client: AsyncClient, db_session):
"""Exceeding login rate limit returns 429."""
# Default rate_limit_login = 5 per minute
# Send 6 requests to exceed the limit
for i in range(6):
resp = await client.post(
"/api/auth/login",
json={"username": f"user{i}", "password": "pass"},
)
# The 6th request (or beyond) should be rate limited
# Send one more to be sure
resp = await client.post(
"/api/auth/login",
json={"username": "overflow", "password": "pass"},
)
assert resp.status_code == 429
assert "Too many" in resp.json().get("detail", "")
async def test_rate_limit_retry_after_header(self, client: AsyncClient):
"""429 response includes Retry-After header."""
# Exhaust login rate limit
for i in range(7):
resp = await client.post(
"/api/auth/login",
json={"username": f"retry{i}", "password": "pass"},
)
# Final request should be 429 with Retry-After
resp = await client.post(
"/api/auth/login",
json={"username": "retrycheck", "password": "pass"},
)
assert resp.status_code == 429
assert "retry-after" in resp.headers
retry_val = int(resp.headers["retry-after"])
assert retry_val >= 1
class TestPathTraversal:
"""Path traversal protection in file endpoints."""
async def test_path_traversal_blocked(self, client: AsyncClient, db_session):
"""Path traversal in file endpoint is blocked (403 or 404)."""
user = await _create_user(
db_session,
username="fileuser",
roles=["Maker", "MeasurementTec"],
)
resp = await client.get(
"/api/files/../../etc/passwd",
headers=auth_headers(user),
)
# Should be 403 (access denied) or 404 (not found), never 200
assert resp.status_code in (403, 404)
class TestFilenameSanitization:
"""Filename sanitization tests."""
async def test_dotfile_rejected(self, client: AsyncClient, db_session):
"""Filenames starting with '.' are rejected."""
from routers.files import sanitize_filename
from fastapi import HTTPException
with pytest.raises(HTTPException) as exc_info:
sanitize_filename(".htaccess")
assert exc_info.value.status_code == 400
assert "." in exc_info.value.detail
async def test_empty_filename_rejected(self, client: AsyncClient, db_session):
"""Empty filenames are rejected."""
from routers.files import sanitize_filename
from fastapi import HTTPException
with pytest.raises(HTTPException) as exc_info:
sanitize_filename("///")
assert exc_info.value.status_code == 400
class TestOptionsPreflight:
"""OPTIONS preflight request tests."""
async def test_options_preflight(self, client: AsyncClient):
"""OPTIONS request returns CORS preflight headers."""
resp = await client.options(
"/api/health",
headers={
"Origin": "http://localhost:5000",
"Access-Control-Request-Method": "GET",
"Access-Control-Request-Headers": "X-API-Key",
},
)
assert resp.status_code == 200
assert "access-control-allow-methods" in resp.headers