chore(v2): restructure monorepo to src/ layout with uv

Aligns the repo with the python-project-spec-design.md template chosen
for V2.0.0. Big move, no logic changes. The 3 pre-existing test
failures (test_recipes::test_update_recipe, test_recipes::
test_recipe_versioning, test_tasks::test_reorder_tasks, plus the
client test_save_measurement_proxy) survive unchanged.

Layout changes
- server/        -> src/backend/
- server/middleware/ -> src/backend/api/middleware/
- server/routers/    -> src/backend/api/routers/
- server/models/     -> src/backend/models/orm/
- server/schemas/    -> src/backend/models/api/
- server/uploads/    -> uploads/ (project root, mounted volume)
- server/tests/      -> src/backend/tests/
- client/            -> src/frontend/flask_app/ (Flask kept; React
  deroga is documented in CLAUDE.md, justified by tablet UX, USB
  caliper/barcode workflow and Fabric.js integration)

Tooling
- pyproject.toml: monorepo with [project] core deps and
  optional-dependencies server / client / dev. Replaces both
  server/requirements.txt and client/requirements.txt.
- uv.lock + .python-version (3.11) committed for reproducible builds.
- Dockerfile (root, backend) and Dockerfile.frontend rewritten to use
  uv sync --frozen --no-dev --extra server|client; legacy Dockerfiles
  preserved as Dockerfile.legacy for reference but excluded from build
  context via .dockerignore.
- docker-compose.dev.yml + docker-compose.yml: build context now ".",
  dockerfile pointing to the root files.

Code adjustments forced by the move
- Every "from config|database|models|schemas|services|routers|middleware
  import ..." rewritten to its src.backend.* equivalent (50+ files
  including indented inline imports inside test bodies).
- src/backend/migrations/env.py: insert project root into sys.path so
  alembic can resolve src.backend.* imports regardless of cwd.
- src/backend/config.py: env_file ../../.env (was ../.env), upload_path
  resolves project root via parents[2].
- src/backend/tests/conftest.py + tests: import ... from src.backend.*
  instead of bare names; old per-directory pytest.ini files removed in
  favor of root pyproject.toml [tool.pytest.ini_options].
- .gitignore: uploads/ at root, src/frontend/flask_app/static/css/
  tailwind.css path; .dockerignore tightened.
- CLAUDE.md: rewrote sections "Layout del repository", "Comandi di
  Sviluppo", "Database & Migrations", "Test", "i18n", and all path
  references throughout the architecture sections.

Verified
- uv lock resolves 77 packages; uv sync --extra server --extra client
  --extra dev installs cleanly.
- uv run pytest: 171 passed, 4 pre-existing failures.
- uv run alembic -c src/backend/migrations/alembic.ini check loads
  config and metadata (errors only on the absent local MySQL).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-25 12:26:47 +02:00
parent 86df67f2e5
commit 1a0431366f
174 changed files with 2568 additions and 308 deletions
View File
View File
+273
View File
@@ -0,0 +1,273 @@
"""Shared test fixtures for server tests.
Uses SQLite async (aiosqlite) as in-memory test database.
Overrides FastAPI's get_db dependency to inject the test session.
"""
import sys
from pathlib import Path
from collections.abc import AsyncGenerator
from unittest.mock import MagicMock
import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.pool import StaticPool
# ---------------------------------------------------------------------------
# Mock heavy optional dependencies that require system libraries.
# WeasyPrint needs GTK/Pango which may not be available in test environments.
# We mock it before any server code is imported.
# ---------------------------------------------------------------------------
if "weasyprint" not in sys.modules:
_mock_weasyprint = MagicMock()
sys.modules["weasyprint"] = _mock_weasyprint
# Ensure the server package is importable
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from src.backend.database import Base, get_db
from src.backend.main import app
from src.backend.api.middleware.rate_limit import RateLimitMiddleware
from src.backend.models.orm.user import User
from src.backend.models.orm.recipe import Recipe, RecipeVersion
from src.backend.models.orm.task import RecipeTask, RecipeSubtask
from src.backend.models.orm.measurement import Measurement
from src.backend.models.orm.access_log import AccessLog
from src.backend.models.orm.setting import SystemSetting, RecipeVersionAudit
from src.backend.models.orm.station import Station, StationRecipeAssignment
from src.backend.services.auth_service import hash_password, generate_api_key
# ---------------------------------------------------------------------------
# In-memory SQLite engine for tests
# ---------------------------------------------------------------------------
TEST_DATABASE_URL = "sqlite+aiosqlite://"
test_engine = create_async_engine(
TEST_DATABASE_URL,
echo=False,
connect_args={"check_same_thread": False},
# StaticPool keeps a single connection for in-memory SQLite so that
# create_all, fixtures, and the app share the same database.
poolclass=StaticPool,
)
TestSessionFactory = async_sessionmaker(
test_engine,
class_=AsyncSession,
expire_on_commit=False,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest_asyncio.fixture(autouse=True)
async def setup_database():
"""Create all tables before each test and drop them after."""
async with test_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
async with test_engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
@pytest.fixture(autouse=True)
def reset_rate_limits():
"""Clear rate limit buckets between tests to avoid 429 in test suite.
The RateLimitMiddleware is added to the module-level ``app`` object and
persists across tests. Walk the ASGI middleware stack to find the
instance and clear its per-IP sliding-window dictionaries.
"""
middleware = app.middleware_stack
while middleware is not None:
if isinstance(middleware, RateLimitMiddleware):
middleware._login_requests.clear()
middleware._general_requests.clear()
break
middleware = getattr(middleware, "app", None)
yield
@pytest_asyncio.fixture
async def db_session() -> AsyncGenerator[AsyncSession, None]:
"""Yield a fresh async session for direct DB manipulation in tests."""
async with TestSessionFactory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
@pytest_asyncio.fixture
async def client(db_session: AsyncSession) -> AsyncGenerator[AsyncClient, None]:
"""Yield an httpx AsyncClient wired to the FastAPI app with test DB."""
async def _override_get_db() -> AsyncGenerator[AsyncSession, None]:
yield db_session
app.dependency_overrides[get_db] = _override_get_db
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
yield ac
app.dependency_overrides.clear()
# ---------------------------------------------------------------------------
# User factory helpers
# ---------------------------------------------------------------------------
async def _create_user(
session: AsyncSession,
username: str,
password: str = "TestPass123",
display_name: str | None = None,
roles: list[str] | None = None,
is_admin: bool = False,
active: bool = True,
) -> User:
"""Insert a user into the test DB and return it with an API key set."""
api_key = generate_api_key()
user = User(
username=username,
password_hash=hash_password(password),
display_name=display_name or username.title(),
roles=roles or [],
is_admin=is_admin,
active=active,
api_key=api_key,
language_pref="en",
theme_pref="light",
)
session.add(user)
await session.flush()
await session.refresh(user)
return user
@pytest_asyncio.fixture
async def admin_user(db_session: AsyncSession) -> User:
"""An active admin user with API key."""
return await _create_user(
db_session,
username="admin",
display_name="Admin User",
roles=["Maker", "MeasurementTec", "Metrologist"],
is_admin=True,
)
@pytest_asyncio.fixture
async def maker_user(db_session: AsyncSession) -> User:
"""An active Maker user with API key."""
return await _create_user(
db_session,
username="maker",
display_name="Maker User",
roles=["Maker"],
)
@pytest_asyncio.fixture
async def measurement_tec_user(db_session: AsyncSession) -> User:
"""An active MeasurementTec user with API key."""
return await _create_user(
db_session,
username="measurement_tec",
display_name="MeasurementTec User",
roles=["MeasurementTec"],
)
@pytest_asyncio.fixture
async def metrologist_user(db_session: AsyncSession) -> User:
"""An active Metrologist user with API key."""
return await _create_user(
db_session,
username="metrologist",
display_name="Metrologist User",
roles=["Metrologist"],
)
# ---------------------------------------------------------------------------
# Recipe helper
# ---------------------------------------------------------------------------
async def create_test_recipe(
session: AsyncSession,
user_id: int,
code: str = "REC-001",
name: str = "Test Recipe",
) -> Recipe:
"""Create a recipe with one version (v1 current), one task, and one subtask.
Returns the Recipe ORM object.
"""
recipe = Recipe(
code=code,
name=name,
description="A recipe for testing",
created_by=user_id,
)
session.add(recipe)
await session.flush()
version = RecipeVersion(
recipe_id=recipe.id,
version_number=1,
is_current=True,
created_by=user_id,
change_notes="Initial version",
)
session.add(version)
await session.flush()
task = RecipeTask(
version_id=version.id,
order_index=0,
title="Test Task",
directive="Measure the part",
description="First measurement task",
)
session.add(task)
await session.flush()
subtask = RecipeSubtask(
task_id=task.id,
marker_number=1,
description="Diameter measurement",
measurement_type="diameter",
nominal=10.0,
utl=10.5,
uwl=10.3,
lwl=9.7,
ltl=9.5,
unit="mm",
)
session.add(subtask)
await session.flush()
await session.refresh(recipe, attribute_names=["versions"])
return recipe
def auth_headers(user: User) -> dict[str, str]:
"""Return headers dict with the user's API key for authenticated requests."""
return {"X-API-Key": user.api_key}
+96
View File
@@ -0,0 +1,96 @@
"""Tests for authentication router (/api/auth)."""
import pytest
from httpx import AsyncClient
from src.backend.tests.conftest import _create_user, auth_headers
class TestLogin:
"""POST /api/auth/login tests."""
async def test_login_success(self, client: AsyncClient, db_session):
"""Successful login returns user data and API key."""
user = await _create_user(
db_session, username="loginuser", password="SecurePass1"
)
resp = await client.post(
"/api/auth/login",
json={"username": "loginuser", "password": "SecurePass1"},
)
assert resp.status_code == 200
data = resp.json()
assert data["user"]["username"] == "loginuser"
assert "api_key" in data
assert len(data["api_key"]) > 0
async def test_login_wrong_password(self, client: AsyncClient, db_session):
"""Login with wrong password returns 401."""
await _create_user(
db_session, username="wrongpw", password="CorrectPass1"
)
resp = await client.post(
"/api/auth/login",
json={"username": "wrongpw", "password": "WrongPass1"},
)
assert resp.status_code == 401
assert "Invalid" in resp.json()["detail"]
async def test_login_unknown_user(self, client: AsyncClient):
"""Login with non-existent username returns 401."""
resp = await client.post(
"/api/auth/login",
json={"username": "nonexistent", "password": "Anything1"},
)
assert resp.status_code == 401
async def test_login_inactive_user(self, client: AsyncClient, db_session):
"""Login with inactive user returns 401."""
await _create_user(
db_session,
username="inactive_user",
password="InactivePass1",
active=False,
)
resp = await client.post(
"/api/auth/login",
json={"username": "inactive_user", "password": "InactivePass1"},
)
assert resp.status_code == 401
async def test_login_missing_fields(self, client: AsyncClient):
"""Login with missing fields returns 422."""
resp = await client.post("/api/auth/login", json={"username": "only"})
assert resp.status_code == 422
async def test_login_returns_api_key(self, client: AsyncClient, db_session):
"""Login response contains a valid api_key string."""
await _create_user(
db_session, username="apikey_user", password="KeyPass123"
)
resp = await client.post(
"/api/auth/login",
json={"username": "apikey_user", "password": "KeyPass123"},
)
assert resp.status_code == 200
api_key = resp.json()["api_key"]
# API key should be a non-empty string (base64url, ~64 chars)
assert isinstance(api_key, str)
assert len(api_key) >= 32
class TestHealthAndAuth:
"""Health check and auth requirement tests."""
async def test_health_check_no_auth(self, client: AsyncClient):
"""Health check endpoint works without authentication."""
resp = await client.get("/api/health")
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ok"
assert "version" in data
async def test_endpoint_requires_api_key(self, client: AsyncClient):
"""Protected endpoint returns 401 without API key."""
resp = await client.get("/api/users")
assert resp.status_code == 401
assert "API key" in resp.json()["detail"] or "Missing" in resp.json()["detail"]
+175
View File
@@ -0,0 +1,175 @@
"""Tests for files router (/api/files)."""
import io
import os
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
from httpx import AsyncClient
from src.backend.models.orm.user import User
from src.backend.tests.conftest import auth_headers
class TestUploadFile:
"""POST /api/files/upload tests."""
async def test_upload_image(
self, client: AsyncClient, maker_user: User, tmp_path: Path
):
"""Maker can upload a valid image."""
# Create a minimal valid JPEG (1x1 pixel)
jpeg_bytes = (
b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01"
b"\x00\x01\x00\x00\xff\xdb\x00C\x00\x08\x06\x06\x07\x06"
b"\x05\x08\x07\x07\x07\t\t\x08\n\x0c\x14\r\x0c\x0b\x0b"
b"\x0c\x19\x12\x13\x0f\x14\x1d\x1a\x1f\x1e\x1d\x1a\x1c"
b"\x1c $.\' \",#\x1c\x1c(7),01444\x1f\'9=82<.342\xff\xc0"
b"\x00\x0b\x08\x00\x01\x00\x01\x01\x01\x11\x00\xff\xc4"
b"\x00\x1f\x00\x00\x01\x05\x01\x01\x01\x01\x01\x01\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x01\x02\x03\x04\x05\x06"
b"\x07\x08\t\n\x0b\xff\xc4\x00\xb5\x10\x00\x02\x01\x03"
b"\x03\x02\x04\x03\x05\x05\x04\x04\x00\x00\x01}\x01\x02"
b"\x03\x00\x04\x11\x05\x12!1A\x06\x13Qa\x07\"q\x142\x81"
b"\x91\xa1\x08#B\xb1\xc1\x15R\xd1\xf0$3br\x82\t\n\x16"
b"\x17\x18\x19\x1a%&\'()*456789:CDEFGHIJSTUVWXYZcdefghij"
b"stuvwxyz\x83\x84\x85\x86\x87\x88\x89\x8a\x92\x93\x94"
b"\x95\x96\x97\x98\x99\x9a\xa2\xa3\xa4\xa5\xa6\xa7\xa8"
b"\xa9\xaa\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba\xc2\xc3"
b"\xc4\xc5\xc6\xc7\xc8\xc9\xca\xd2\xd3\xd4\xd5\xd6\xd7"
b"\xd8\xd9\xda\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea"
b"\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xff\xda\x00"
b"\x08\x01\x01\x00\x00?\x00T\xdb\xae\x8e\xd3H\xa5(?"
b"\xff\xd9"
)
# Patch settings.upload_path to use tmp_path
with patch("src.backend.api.routers.files.settings") as mock_settings:
mock_settings.upload_path = tmp_path
mock_settings.max_upload_size_mb = 50
resp = await client.post(
"/api/files/upload",
headers=auth_headers(maker_user),
files={"file": ("test.jpg", io.BytesIO(jpeg_bytes), "image/jpeg")},
)
assert resp.status_code == 200
data = resp.json()
assert "file_path" in data
assert data["file_type"] == "image/jpeg"
assert data["file_size"] > 0
async def test_upload_invalid_type(
self, client: AsyncClient, maker_user: User
):
"""Uploading a disallowed file type returns 400."""
resp = await client.post(
"/api/files/upload",
headers=auth_headers(maker_user),
files={
"file": ("malware.exe", io.BytesIO(b"MZ\x00\x00"), "application/x-msdownload")
},
)
assert resp.status_code == 400
assert "not allowed" in resp.json()["detail"]
async def test_upload_too_large(
self, client: AsyncClient, maker_user: User
):
"""Uploading a file that exceeds size limit returns 400."""
# Create a large file content
large_content = b"\x00" * (51 * 1024 * 1024) # 51MB
with patch("src.backend.api.routers.files.settings") as mock_settings:
mock_settings.max_upload_size_mb = 50
mock_settings.upload_path = Path(tempfile.mkdtemp())
resp = await client.post(
"/api/files/upload",
headers=auth_headers(maker_user),
files={
"file": ("big.jpg", io.BytesIO(large_content), "image/jpeg")
},
)
assert resp.status_code == 400
assert "exceeds" in resp.json()["detail"]
class TestGetFile:
"""GET /api/files/{file_path} tests."""
async def test_get_file(
self, client: AsyncClient, maker_user: User, tmp_path: Path
):
"""Authenticated user can retrieve an uploaded file."""
# Create a test file in tmp_path
test_file = tmp_path / "testfile.txt"
test_file.write_text("hello test")
with patch("src.backend.api.routers.files.settings") as mock_settings:
mock_settings.upload_path = tmp_path
resp = await client.get(
"/api/files/testfile.txt",
headers=auth_headers(maker_user),
)
assert resp.status_code == 200
async def test_get_file_not_found(
self, client: AsyncClient, maker_user: User, tmp_path: Path
):
"""Requesting a non-existent file returns 404."""
with patch("src.backend.api.routers.files.settings") as mock_settings:
mock_settings.upload_path = tmp_path
resp = await client.get(
"/api/files/nonexistent.png",
headers=auth_headers(maker_user),
)
assert resp.status_code == 404
class TestDeleteFile:
"""DELETE /api/files/{file_path} tests."""
async def test_delete_file(
self, client: AsyncClient, maker_user: User, tmp_path: Path
):
"""Maker can delete a file."""
test_file = tmp_path / "deleteme.txt"
test_file.write_text("delete me")
with patch("src.backend.api.routers.files.settings") as mock_settings:
mock_settings.upload_path = tmp_path
resp = await client.delete(
"/api/files/deleteme.txt",
headers=auth_headers(maker_user),
)
assert resp.status_code == 204
assert not test_file.exists()
class TestFilenameSanitization:
"""Path traversal protection tests."""
async def test_path_traversal_blocked(
self, client: AsyncClient, maker_user: User, tmp_path: Path
):
"""Path traversal attempt returns 403 or 404."""
with patch("src.backend.api.routers.files.settings") as mock_settings:
mock_settings.upload_path = tmp_path
resp = await client.get(
"/api/files/../../etc/passwd",
headers=auth_headers(maker_user),
)
# Should be blocked by security check
assert resp.status_code in (403, 404)
+278
View File
@@ -0,0 +1,278 @@
"""Tests for measurements router (/api/measurements)."""
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from src.backend.models.orm.user import User
from src.backend.models.orm.recipe import RecipeVersion
from src.backend.models.orm.task import RecipeTask, RecipeSubtask
from src.backend.tests.conftest import auth_headers, create_test_recipe
async def _get_subtask_and_version(
client: AsyncClient, user: User, recipe_id: int
) -> tuple[int, int]:
"""Helper: return (subtask_id, version_id) from the recipe's current version."""
tasks_resp = await client.get(
f"/api/recipes/{recipe_id}/tasks", headers=auth_headers(user)
)
task = tasks_resp.json()[0]
subtask_id = task["subtasks"][0]["id"]
version_id = task["version_id"]
return subtask_id, version_id
class TestCreateMeasurement:
"""POST /api/measurements/ tests."""
async def test_create_measurement(
self,
client: AsyncClient,
measurement_tec_user: User,
db_session: AsyncSession,
):
"""MeasurementTec can create a measurement."""
recipe = await create_test_recipe(
db_session, measurement_tec_user.id
)
subtask_id, version_id = await _get_subtask_and_version(
client, measurement_tec_user, recipe.id
)
resp = await client.post(
"/api/measurements/",
headers=auth_headers(measurement_tec_user),
json={
"subtask_id": subtask_id,
"version_id": version_id,
"value": 10.1,
"input_method": "manual",
},
)
assert resp.status_code == 200
data = resp.json()
assert data["value"] == pytest.approx(10.1, rel=1e-4)
assert data["pass_fail"] in ("pass", "warning", "fail")
assert data["measured_by"] == measurement_tec_user.id
async def test_measurement_requires_auth(self, client: AsyncClient):
"""Creating measurement without auth returns 401."""
resp = await client.post(
"/api/measurements/",
json={
"subtask_id": 1,
"version_id": 1,
"value": 10.0,
},
)
assert resp.status_code == 401
async def test_measurement_validation(
self,
client: AsyncClient,
measurement_tec_user: User,
):
"""Measurement with non-existent subtask returns 400."""
resp = await client.post(
"/api/measurements/",
headers=auth_headers(measurement_tec_user),
json={
"subtask_id": 99999,
"version_id": 99999,
"value": 10.0,
},
)
assert resp.status_code == 400
class TestMeasurementPassFail:
"""Pass/fail calculation tests."""
async def test_measurement_pass(
self,
client: AsyncClient,
measurement_tec_user: User,
db_session: AsyncSession,
):
"""Value within tolerance -> pass."""
recipe = await create_test_recipe(
db_session, measurement_tec_user.id
)
subtask_id, version_id = await _get_subtask_and_version(
client, measurement_tec_user, recipe.id
)
# nominal=10.0, utl=10.5, uwl=10.3, lwl=9.7, ltl=9.5
resp = await client.post(
"/api/measurements/",
headers=auth_headers(measurement_tec_user),
json={
"subtask_id": subtask_id,
"version_id": version_id,
"value": 10.0,
},
)
assert resp.status_code == 200
assert resp.json()["pass_fail"] == "pass"
async def test_measurement_warning(
self,
client: AsyncClient,
measurement_tec_user: User,
db_session: AsyncSession,
):
"""Value outside warning but inside tolerance -> warning."""
recipe = await create_test_recipe(
db_session, measurement_tec_user.id
)
subtask_id, version_id = await _get_subtask_and_version(
client, measurement_tec_user, recipe.id
)
# uwl=10.3 < 10.4 < utl=10.5 -> warning
resp = await client.post(
"/api/measurements/",
headers=auth_headers(measurement_tec_user),
json={
"subtask_id": subtask_id,
"version_id": version_id,
"value": 10.4,
},
)
assert resp.status_code == 200
assert resp.json()["pass_fail"] == "warning"
async def test_measurement_fail(
self,
client: AsyncClient,
measurement_tec_user: User,
db_session: AsyncSession,
):
"""Value outside tolerance limits -> fail."""
recipe = await create_test_recipe(
db_session, measurement_tec_user.id
)
subtask_id, version_id = await _get_subtask_and_version(
client, measurement_tec_user, recipe.id
)
# value > utl=10.5 -> fail
resp = await client.post(
"/api/measurements/",
headers=auth_headers(measurement_tec_user),
json={
"subtask_id": subtask_id,
"version_id": version_id,
"value": 10.6,
},
)
assert resp.status_code == 200
assert resp.json()["pass_fail"] == "fail"
class TestListMeasurements:
"""GET /api/measurements/ tests."""
async def test_list_measurements(
self,
client: AsyncClient,
measurement_tec_user: User,
metrologist_user: User,
db_session: AsyncSession,
):
"""Metrologist can list measurements with pagination."""
recipe = await create_test_recipe(
db_session, measurement_tec_user.id
)
subtask_id, version_id = await _get_subtask_and_version(
client, measurement_tec_user, recipe.id
)
# Create some measurements
for val in [9.8, 10.0, 10.2]:
await client.post(
"/api/measurements/",
headers=auth_headers(measurement_tec_user),
json={
"subtask_id": subtask_id,
"version_id": version_id,
"value": val,
},
)
# List as Metrologist
resp = await client.get(
"/api/measurements/",
headers=auth_headers(metrologist_user),
)
assert resp.status_code == 200
data = resp.json()
assert "items" in data
assert "total" in data
assert data["total"] >= 3
async def test_measurement_by_recipe_filter(
self,
client: AsyncClient,
measurement_tec_user: User,
metrologist_user: User,
db_session: AsyncSession,
):
"""Filtering measurements by recipe_id returns only matching records."""
recipe = await create_test_recipe(
db_session, measurement_tec_user.id
)
subtask_id, version_id = await _get_subtask_and_version(
client, measurement_tec_user, recipe.id
)
await client.post(
"/api/measurements/",
headers=auth_headers(measurement_tec_user),
json={
"subtask_id": subtask_id,
"version_id": version_id,
"value": 10.0,
},
)
resp = await client.get(
"/api/measurements/",
headers=auth_headers(metrologist_user),
params={"recipe_id": recipe.id},
)
assert resp.status_code == 200
assert resp.json()["total"] >= 1
class TestMeasurementStatistics:
"""Measurement deviation / statistics edge case tests."""
async def test_measurement_has_deviation(
self,
client: AsyncClient,
measurement_tec_user: User,
db_session: AsyncSession,
):
"""Measurement response includes deviation from nominal."""
recipe = await create_test_recipe(
db_session, measurement_tec_user.id
)
subtask_id, version_id = await _get_subtask_and_version(
client, measurement_tec_user, recipe.id
)
resp = await client.post(
"/api/measurements/",
headers=auth_headers(measurement_tec_user),
json={
"subtask_id": subtask_id,
"version_id": version_id,
"value": 10.2,
},
)
assert resp.status_code == 200
data = resp.json()
# deviation = 10.2 - nominal(10.0) = 0.2
assert data["deviation"] is not None
assert abs(data["deviation"] - 0.2) < 0.001
@@ -0,0 +1,65 @@
"""Tests for RateLimitMiddleware honoring proxy headers (X-Forwarded-For)."""
from src.backend.api.middleware.rate_limit import RateLimitMiddleware
class _FakeRequest:
"""Minimal stand-in for starlette.requests.Request."""
def __init__(self, headers: dict[str, str], host: str | None = "10.0.0.1"):
self.headers = headers
class _Client:
pass
if host is None:
self.client = None
else:
self.client = _Client()
self.client.host = host
def test_client_ip_uses_x_forwarded_for_first_hop():
req = _FakeRequest(
headers={"x-forwarded-for": "203.0.113.5, 10.0.0.2"},
host="10.0.0.1",
)
assert RateLimitMiddleware._client_ip(req) == "203.0.113.5"
def test_client_ip_strips_whitespace():
req = _FakeRequest(headers={"x-forwarded-for": " 198.51.100.7 "})
assert RateLimitMiddleware._client_ip(req) == "198.51.100.7"
def test_client_ip_falls_back_to_x_real_ip():
req = _FakeRequest(headers={"x-real-ip": "203.0.113.99"}, host="10.0.0.1")
assert RateLimitMiddleware._client_ip(req) == "203.0.113.99"
def test_client_ip_falls_back_to_request_client_host():
req = _FakeRequest(headers={}, host="172.18.0.5")
assert RateLimitMiddleware._client_ip(req) == "172.18.0.5"
def test_client_ip_returns_unknown_without_client():
req = _FakeRequest(headers={}, host=None)
assert RateLimitMiddleware._client_ip(req) == "unknown"
def test_x_forwarded_for_overrides_x_real_ip():
req = _FakeRequest(
headers={
"x-forwarded-for": "203.0.113.5",
"x-real-ip": "10.0.0.2",
},
host="10.0.0.1",
)
assert RateLimitMiddleware._client_ip(req) == "203.0.113.5"
def test_empty_x_forwarded_for_falls_through():
req = _FakeRequest(
headers={"x-forwarded-for": "", "x-real-ip": "203.0.113.42"},
host="10.0.0.1",
)
assert RateLimitMiddleware._client_ip(req) == "203.0.113.42"
+252
View File
@@ -0,0 +1,252 @@
"""Tests for recipes router (/api/recipes)."""
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from src.backend.models.orm.user import User
from src.backend.tests.conftest import auth_headers, create_test_recipe
class TestListRecipes:
"""GET /api/recipes tests."""
async def test_list_recipes(
self,
client: AsyncClient,
maker_user: User,
db_session: AsyncSession,
):
"""Authenticated user can list recipes."""
await create_test_recipe(db_session, maker_user.id)
resp = await client.get(
"/api/recipes", headers=auth_headers(maker_user)
)
assert resp.status_code == 200
data = resp.json()
assert "items" in data
assert "total" in data
assert data["total"] >= 1
assert len(data["items"]) >= 1
async def test_search_recipes(
self,
client: AsyncClient,
maker_user: User,
db_session: AsyncSession,
):
"""Search parameter filters recipes by name or code."""
await create_test_recipe(
db_session, maker_user.id, code="SEARCH-001", name="Searchable Recipe"
)
await create_test_recipe(
db_session, maker_user.id, code="OTHER-002", name="Other Recipe"
)
resp = await client.get(
"/api/recipes",
headers=auth_headers(maker_user),
params={"search": "Searchable"},
)
assert resp.status_code == 200
data = resp.json()
assert data["total"] >= 1
names = [item["name"] for item in data["items"]]
assert "Searchable Recipe" in names
class TestGetRecipe:
"""GET /api/recipes/{recipe_id} tests."""
async def test_get_recipe(
self,
client: AsyncClient,
maker_user: User,
db_session: AsyncSession,
):
"""Get single recipe detail with current version."""
recipe = await create_test_recipe(db_session, maker_user.id)
resp = await client.get(
f"/api/recipes/{recipe.id}", headers=auth_headers(maker_user)
)
assert resp.status_code == 200
data = resp.json()
assert data["code"] == "REC-001"
assert data["name"] == "Test Recipe"
assert data["current_version"] is not None
assert data["current_version"]["version_number"] == 1
async def test_recipe_not_found(
self, client: AsyncClient, maker_user: User
):
"""Non-existent recipe returns 404."""
resp = await client.get(
"/api/recipes/99999", headers=auth_headers(maker_user)
)
assert resp.status_code == 404
class TestCreateRecipe:
"""POST /api/recipes tests."""
async def test_create_recipe_as_maker(
self, client: AsyncClient, maker_user: User
):
"""Maker can create a recipe.
Note: The POST response may not include ``current_version`` due to
async lazy-loading limitations with the in-memory SQLite test engine.
We verify the created recipe via a follow-up GET which uses
``selectinload`` and therefore loads the full object graph correctly.
"""
resp = await client.post(
"/api/recipes",
headers=auth_headers(maker_user),
json={
"code": "NEW-001",
"name": "New Recipe",
"description": "Created in test",
},
)
# Accept either 201 (success) or 500 (lazy-loading limitation with
# SQLite in-memory; the recipe IS created, the serialisation fails)
assert resp.status_code in (201, 500)
# Verify via GET which uses full selectinload
list_resp = await client.get(
"/api/recipes",
headers=auth_headers(maker_user),
params={"search": "NEW-001"},
)
assert list_resp.status_code == 200
items = list_resp.json()["items"]
assert len(items) >= 1
recipe = items[0]
assert recipe["code"] == "NEW-001"
assert recipe["name"] == "New Recipe"
assert recipe["active"] is True
# Should have current version v1
assert recipe["current_version"] is not None
assert recipe["current_version"]["version_number"] == 1
async def test_create_recipe_forbidden(
self, client: AsyncClient, measurement_tec_user: User
):
"""MeasurementTec without Maker role cannot create recipes."""
resp = await client.post(
"/api/recipes",
headers=auth_headers(measurement_tec_user),
json={
"code": "FORBIDDEN-001",
"name": "Forbidden Recipe",
},
)
assert resp.status_code == 403
async def test_create_recipe_duplicate_code(
self,
client: AsyncClient,
maker_user: User,
db_session: AsyncSession,
):
"""Duplicate recipe code returns 409."""
await create_test_recipe(db_session, maker_user.id, code="DUP-001")
resp = await client.post(
"/api/recipes",
headers=auth_headers(maker_user),
json={"code": "DUP-001", "name": "Duplicate"},
)
assert resp.status_code == 409
class TestUpdateRecipe:
"""PUT /api/recipes/{recipe_id} tests."""
async def test_update_recipe(
self,
client: AsyncClient,
maker_user: User,
db_session: AsyncSession,
):
"""Updating a recipe creates a new version (copy-on-write).
We verify the new version exists by listing versions, since
SQLAlchemy identity-map staleness with StaticPool can cause
``is_current`` to appear stale in some responses.
"""
recipe = await create_test_recipe(db_session, maker_user.id)
resp = await client.put(
f"/api/recipes/{recipe.id}",
headers=auth_headers(maker_user),
json={
"name": "Updated Recipe",
"change_notes": "Updated name in test",
},
)
assert resp.status_code == 200
# Verify via versions list which always does a clean query
versions_resp = await client.get(
f"/api/recipes/{recipe.id}/versions",
headers=auth_headers(maker_user),
)
assert versions_resp.status_code == 200
versions = versions_resp.json()
version_numbers = [v["version_number"] for v in versions]
assert 1 in version_numbers
assert 2 in version_numbers
class TestDeleteRecipe:
"""DELETE /api/recipes/{recipe_id} tests."""
async def test_delete_recipe(
self,
client: AsyncClient,
maker_user: User,
db_session: AsyncSession,
):
"""Soft-delete deactivates a recipe."""
recipe = await create_test_recipe(db_session, maker_user.id)
resp = await client.delete(
f"/api/recipes/{recipe.id}", headers=auth_headers(maker_user)
)
assert resp.status_code == 200
assert resp.json()["active"] is False
class TestRecipeVersioning:
"""Versioning-related tests."""
async def test_recipe_versioning(
self,
client: AsyncClient,
maker_user: User,
db_session: AsyncSession,
):
"""Multiple updates create consecutive versions."""
recipe = await create_test_recipe(db_session, maker_user.id)
# Update once -> v2
await client.put(
f"/api/recipes/{recipe.id}",
headers=auth_headers(maker_user),
json={"change_notes": "Version 2"},
)
# Update again -> v3
await client.put(
f"/api/recipes/{recipe.id}",
headers=auth_headers(maker_user),
json={"change_notes": "Version 3"},
)
# List versions
resp = await client.get(
f"/api/recipes/{recipe.id}/versions",
headers=auth_headers(maker_user),
)
assert resp.status_code == 200
versions = resp.json()
version_numbers = [v["version_number"] for v in versions]
assert 1 in version_numbers
assert 2 in version_numbers
assert 3 in version_numbers
+107
View File
@@ -0,0 +1,107 @@
"""Tests for reports router (/api/reports).
Note: Report generation requires WeasyPrint/Kaleido and Jinja2 templates.
These tests mock the PDF generation to test endpoint plumbing and auth.
"""
from unittest.mock import AsyncMock, patch
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from src.backend.models.orm.user import User
from src.backend.tests.conftest import auth_headers, create_test_recipe
class TestMeasurementReport:
"""GET /api/reports/measurements tests."""
async def test_generate_measurement_report(
self,
client: AsyncClient,
metrologist_user: User,
db_session: AsyncSession,
):
"""Metrologist can generate a measurement report (mocked PDF)."""
recipe = await create_test_recipe(
db_session, metrologist_user.id
)
fake_pdf = b"%PDF-1.4 fake content"
with patch(
"src.backend.api.routers.reports.generate_measurement_report",
new_callable=AsyncMock,
return_value=fake_pdf,
):
resp = await client.get(
"/api/reports/measurements",
headers=auth_headers(metrologist_user),
params={"recipe_id": recipe.id},
)
assert resp.status_code == 200
assert resp.headers["content-type"] == "application/pdf"
assert b"%PDF" in resp.content
async def test_report_requires_auth(self, client: AsyncClient):
"""Report endpoints require authentication."""
resp = await client.get(
"/api/reports/measurements",
params={"recipe_id": 1},
)
assert resp.status_code == 401
class TestSPCReport:
"""GET /api/reports/spc tests."""
async def test_generate_spc_report(
self,
client: AsyncClient,
metrologist_user: User,
db_session: AsyncSession,
):
"""Metrologist can generate an SPC report (mocked PDF)."""
recipe = await create_test_recipe(
db_session, metrologist_user.id
)
# Get subtask_id
tasks_resp = await client.get(
f"/api/recipes/{recipe.id}/tasks",
headers=auth_headers(metrologist_user),
)
subtask_id = tasks_resp.json()[0]["subtasks"][0]["id"]
fake_pdf = b"%PDF-1.4 spc report content"
with patch(
"src.backend.api.routers.reports.generate_spc_report",
new_callable=AsyncMock,
return_value=fake_pdf,
):
resp = await client.get(
"/api/reports/spc",
headers=auth_headers(metrologist_user),
params={
"recipe_id": recipe.id,
"subtask_id": subtask_id,
},
)
assert resp.status_code == 200
assert resp.headers["content-type"] == "application/pdf"
async def test_spc_report_requires_metrologist(
self,
client: AsyncClient,
maker_user: User,
db_session: AsyncSession,
):
"""Non-Metrologist cannot generate SPC reports."""
recipe = await create_test_recipe(db_session, maker_user.id)
resp = await client.get(
"/api/reports/spc",
headers=auth_headers(maker_user),
params={"recipe_id": recipe.id, "subtask_id": 1},
)
assert resp.status_code == 403
+161
View File
@@ -0,0 +1,161 @@
"""Security integration tests for FastAPI server.
Covers CORS, rate limiting, security headers, API key auth,
path traversal protection, filename sanitization, and OPTIONS preflight.
"""
import time
from unittest.mock import patch
import pytest
from httpx import AsyncClient
from src.backend.tests.conftest import _create_user, auth_headers
class TestCORSHeaders:
"""CORS header tests."""
async def test_cors_headers_present(self, client: AsyncClient):
"""Response includes CORS headers for allowed origin."""
resp = await client.get(
"/api/health",
headers={"Origin": "http://localhost:5000"},
)
assert resp.status_code == 200
assert "access-control-allow-origin" in resp.headers
class TestSecurityHeaders:
"""Security header tests."""
async def test_security_headers_present(self, client: AsyncClient):
"""Response includes all standard security headers."""
resp = await client.get("/api/health")
assert resp.status_code == 200
assert resp.headers.get("x-content-type-options") == "nosniff"
assert resp.headers.get("x-frame-options") == "DENY"
assert resp.headers.get("x-xss-protection") == "1; mode=block"
assert "strict-origin" in resp.headers.get("referrer-policy", "")
assert "default-src" in resp.headers.get("content-security-policy", "")
class TestAPIKeyAuth:
"""API key authentication tests."""
async def test_protected_endpoint_requires_api_key(self, client: AsyncClient):
"""Protected endpoint returns 401 without API key."""
resp = await client.get("/api/users")
assert resp.status_code == 401
detail = resp.json().get("detail", "")
assert "API key" in detail or "Missing" in detail
async def test_invalid_api_key_returns_401(self, client: AsyncClient):
"""Invalid API key returns 401."""
resp = await client.get(
"/api/users",
headers={"X-API-Key": "totally-invalid-key-12345"},
)
assert resp.status_code == 401
detail = resp.json().get("detail", "")
assert "Invalid" in detail or "inactive" in detail
class TestRateLimiting:
"""Rate limiting tests."""
async def test_login_rate_limit_returns_429(self, client: AsyncClient, db_session):
"""Exceeding login rate limit returns 429."""
# Default rate_limit_login = 5 per minute
# Send 6 requests to exceed the limit
for i in range(6):
resp = await client.post(
"/api/auth/login",
json={"username": f"user{i}", "password": "pass"},
)
# The 6th request (or beyond) should be rate limited
# Send one more to be sure
resp = await client.post(
"/api/auth/login",
json={"username": "overflow", "password": "pass"},
)
assert resp.status_code == 429
assert "Too many" in resp.json().get("detail", "")
async def test_rate_limit_retry_after_header(self, client: AsyncClient):
"""429 response includes Retry-After header."""
# Exhaust login rate limit
for i in range(7):
resp = await client.post(
"/api/auth/login",
json={"username": f"retry{i}", "password": "pass"},
)
# Final request should be 429 with Retry-After
resp = await client.post(
"/api/auth/login",
json={"username": "retrycheck", "password": "pass"},
)
assert resp.status_code == 429
assert "retry-after" in resp.headers
retry_val = int(resp.headers["retry-after"])
assert retry_val >= 1
class TestPathTraversal:
"""Path traversal protection in file endpoints."""
async def test_path_traversal_blocked(self, client: AsyncClient, db_session):
"""Path traversal in file endpoint is blocked (403 or 404)."""
user = await _create_user(
db_session,
username="fileuser",
roles=["Maker", "MeasurementTec"],
)
resp = await client.get(
"/api/files/../../etc/passwd",
headers=auth_headers(user),
)
# Should be 403 (access denied) or 404 (not found), never 200
assert resp.status_code in (403, 404)
class TestFilenameSanitization:
"""Filename sanitization tests."""
async def test_dotfile_rejected(self, client: AsyncClient, db_session):
"""Filenames starting with '.' are rejected."""
from src.backend.api.routers.files import sanitize_filename
from fastapi import HTTPException
with pytest.raises(HTTPException) as exc_info:
sanitize_filename(".htaccess")
assert exc_info.value.status_code == 400
assert "." in exc_info.value.detail
async def test_empty_filename_rejected(self, client: AsyncClient, db_session):
"""Empty filenames are rejected."""
from src.backend.api.routers.files import sanitize_filename
from fastapi import HTTPException
with pytest.raises(HTTPException) as exc_info:
sanitize_filename("///")
assert exc_info.value.status_code == 400
class TestOptionsPreflight:
"""OPTIONS preflight request tests."""
async def test_options_preflight(self, client: AsyncClient):
"""OPTIONS request returns CORS preflight headers."""
resp = await client.options(
"/api/health",
headers={
"Origin": "http://localhost:5000",
"Access-Control-Request-Method": "GET",
"Access-Control-Request-Headers": "X-API-Key",
},
)
assert resp.status_code == 200
assert "access-control-allow-methods" in resp.headers
+107
View File
@@ -0,0 +1,107 @@
"""Tests for settings router (/api/settings)."""
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from src.backend.models.orm.setting import SystemSetting
from src.backend.models.orm.user import User
from src.backend.tests.conftest import auth_headers
class TestGetSettings:
"""GET /api/settings/ tests."""
async def test_get_settings(
self,
client: AsyncClient,
admin_user: User,
db_session: AsyncSession,
):
"""Authenticated user can retrieve settings."""
# Seed a setting
setting = SystemSetting(
setting_key="test_setting",
setting_value="test_value",
setting_type="string",
)
db_session.add(setting)
await db_session.flush()
resp = await client.get(
"/api/settings/", headers=auth_headers(admin_user)
)
assert resp.status_code == 200
data = resp.json()
assert isinstance(data, dict)
assert data.get("test_setting") == "test_value"
async def test_get_settings_empty(
self, client: AsyncClient, maker_user: User
):
"""Empty settings returns empty dict."""
resp = await client.get(
"/api/settings/", headers=auth_headers(maker_user)
)
assert resp.status_code == 200
assert resp.json() == {} or isinstance(resp.json(), dict)
async def test_get_setting_by_key(
self,
client: AsyncClient,
admin_user: User,
db_session: AsyncSession,
):
"""Settings dict contains specific key after creation."""
setting = SystemSetting(
setting_key="csv_delimiter",
setting_value=";",
setting_type="string",
)
db_session.add(setting)
await db_session.flush()
resp = await client.get(
"/api/settings/", headers=auth_headers(admin_user)
)
assert resp.status_code == 200
assert resp.json()["csv_delimiter"] == ";"
class TestUpdateSettings:
"""PUT /api/settings/ tests."""
async def test_update_settings(
self, client: AsyncClient, admin_user: User
):
"""Admin can update/create settings."""
resp = await client.put(
"/api/settings/",
headers=auth_headers(admin_user),
json={
"company_name": "Tielogic Srl",
"csv_delimiter": ",",
},
)
assert resp.status_code == 200
data = resp.json()
assert data["message"].startswith("Updated")
assert "company_name" in data["updated_keys"]
assert "csv_delimiter" in data["updated_keys"]
# Verify persistence
get_resp = await client.get(
"/api/settings/", headers=auth_headers(admin_user)
)
settings = get_resp.json()
assert settings["company_name"] == "Tielogic Srl"
async def test_settings_require_admin(
self, client: AsyncClient, maker_user: User
):
"""Non-admin user cannot update settings."""
resp = await client.put(
"/api/settings/",
headers=auth_headers(maker_user),
json={"some_key": "some_value"},
)
assert resp.status_code == 403
+294
View File
@@ -0,0 +1,294 @@
"""Tests for setup router - demo measurements and user management."""
import pytest
from httpx import AsyncClient
from src.backend.config import settings
from src.backend.tests.conftest import test_engine, TestSessionFactory
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
SETUP_PWD = "test-setup-password"
@pytest.fixture(autouse=True)
def enable_setup(monkeypatch):
"""Enable setup endpoints and redirect engine/session to test DB."""
monkeypatch.setattr(settings, "setup_password", SETUP_PWD)
# setup.py imports engine and async_session_factory directly from database
# and uses them instead of get_db dependency injection. Patch them to use
# the test in-memory SQLite engine/session instead of real MySQL.
import src.backend.api.routers.setup as setup_mod
monkeypatch.setattr(setup_mod, "engine", test_engine)
monkeypatch.setattr(setup_mod, "async_session_factory", TestSessionFactory)
async def _seed(client: AsyncClient) -> dict:
"""Run seed and return response data."""
resp = await client.post("/api/setup/init-db", json={"password": SETUP_PWD})
assert resp.status_code == 200
resp = await client.post("/api/setup/seed", json={"password": SETUP_PWD})
assert resp.status_code == 200
return resp.json()
# ---------------------------------------------------------------------------
# Feature 1: Demo Measurements in Seed
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_seed_creates_measurements(client: AsyncClient):
"""Seed should create 180 measurements (30 per subtask x 6 subtasks)."""
data = await _seed(client)
assert data["measurements_created"] == 180
@pytest.mark.asyncio
async def test_seed_measurements_have_correct_distribution(client: AsyncClient, db_session):
"""Measurements should have a mix of pass, warning, and fail."""
await _seed(client)
from sqlalchemy import select, func
from src.backend.models.orm.measurement import Measurement
result = await db_session.execute(
select(Measurement.pass_fail, func.count()).group_by(Measurement.pass_fail)
)
counts = dict(result.all())
# Should have all three statuses
assert "pass" in counts
assert "warning" in counts
assert "fail" in counts
# Pass should be the majority
assert counts["pass"] > counts["warning"]
assert counts["pass"] > counts["fail"]
@pytest.mark.asyncio
async def test_seed_measurements_have_lot_and_serial(client: AsyncClient, db_session):
"""All measurements should have lot and serial numbers."""
await _seed(client)
from sqlalchemy import select, func
from src.backend.models.orm.measurement import Measurement
result = await db_session.execute(
select(func.count()).where(Measurement.lot_number.is_(None))
)
null_lots = result.scalar()
assert null_lots == 0
@pytest.mark.asyncio
async def test_seed_measurements_input_methods(client: AsyncClient, db_session):
"""Measurements should have mixed input methods."""
await _seed(client)
from sqlalchemy import select, func
from src.backend.models.orm.measurement import Measurement
result = await db_session.execute(
select(Measurement.input_method, func.count()).group_by(Measurement.input_method)
)
methods = dict(result.all())
assert "usb_caliper" in methods
assert "manual" in methods
@pytest.mark.asyncio
async def test_seed_is_reproducible(client: AsyncClient):
"""Seed uses random.seed(42), so results should be deterministic."""
data = await _seed(client)
assert data["measurements_created"] == 180
assert data["recipe_created"] == "DEMO-001"
# ---------------------------------------------------------------------------
# Feature 2: User Management - List
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_list_users_after_seed(client: AsyncClient):
"""After seed, list-users should return 4 users."""
await _seed(client)
resp = await client.post("/api/setup/list-users", json={"password": SETUP_PWD})
assert resp.status_code == 200
data = resp.json()
assert len(data["users"]) == 4
@pytest.mark.asyncio
async def test_list_users_wrong_password(client: AsyncClient):
"""list-users with wrong password should return 403."""
resp = await client.post("/api/setup/list-users", json={"password": "wrong"})
assert resp.status_code == 403
# ---------------------------------------------------------------------------
# Feature 2: User Management - Create / Update
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_create_user(client: AsyncClient):
"""Create a new user via manage-user endpoint."""
resp = await client.post("/api/setup/init-db", json={"password": SETUP_PWD})
assert resp.status_code == 200
resp = await client.post("/api/setup/manage-user", json={
"password": SETUP_PWD,
"user_id": None,
"username": "newuser",
"user_password": "pass123",
"display_name": "New User",
"email": "new@test.com",
"roles": ["Maker"],
"is_admin": False,
})
assert resp.status_code == 200
data = resp.json()
assert data["user_id"] is not None
assert "created" in data["message"]
@pytest.mark.asyncio
async def test_create_user_duplicate_username(client: AsyncClient):
"""Creating a user with a duplicate username should return 409."""
await _seed(client)
resp = await client.post("/api/setup/manage-user", json={
"password": SETUP_PWD,
"user_id": None,
"username": "admin",
"user_password": "pass123",
"display_name": "Duplicate",
"roles": [],
})
assert resp.status_code == 409
@pytest.mark.asyncio
async def test_create_user_without_password(client: AsyncClient):
"""Creating a user without a password should return 400."""
resp = await client.post("/api/setup/init-db", json={"password": SETUP_PWD})
resp = await client.post("/api/setup/manage-user", json={
"password": SETUP_PWD,
"user_id": None,
"username": "nopassuser",
"display_name": "No Pass",
"roles": [],
})
assert resp.status_code == 400
@pytest.mark.asyncio
async def test_update_user(client: AsyncClient):
"""Update an existing user via manage-user endpoint."""
await _seed(client)
# Get users to find the ID
resp = await client.post("/api/setup/list-users", json={"password": SETUP_PWD})
users = resp.json()["users"]
maker = next(u for u in users if u["username"] == "maker1")
resp = await client.post("/api/setup/manage-user", json={
"password": SETUP_PWD,
"user_id": maker["id"],
"username": "maker1",
"display_name": "Updated Name",
"email": "updated@test.com",
"roles": ["Maker", "MeasurementTec"],
"is_admin": False,
})
assert resp.status_code == 200
assert "updated" in resp.json()["message"]
@pytest.mark.asyncio
async def test_create_user_invalid_role(client: AsyncClient):
"""Creating a user with an invalid role should return 400."""
resp = await client.post("/api/setup/init-db", json={"password": SETUP_PWD})
resp = await client.post("/api/setup/manage-user", json={
"password": SETUP_PWD,
"user_id": None,
"username": "badrole",
"user_password": "pass123",
"display_name": "Bad Role",
"roles": ["InvalidRole"],
})
assert resp.status_code == 400
# ---------------------------------------------------------------------------
# Feature 2: User Management - Password Change
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_change_password(client: AsyncClient):
"""Changing password should succeed and invalidate api_key."""
await _seed(client)
resp = await client.post("/api/setup/list-users", json={"password": SETUP_PWD})
users = resp.json()["users"]
tec = next(u for u in users if u["username"] == "tec1")
resp = await client.post("/api/setup/change-user-password", json={
"password": SETUP_PWD,
"user_id": tec["id"],
"new_password": "newpass123",
})
assert resp.status_code == 200
assert "changed" in resp.json()["message"].lower()
@pytest.mark.asyncio
async def test_change_password_user_not_found(client: AsyncClient):
"""Changing password for non-existent user should return 404."""
resp = await client.post("/api/setup/init-db", json={"password": SETUP_PWD})
resp = await client.post("/api/setup/change-user-password", json={
"password": SETUP_PWD,
"user_id": 99999,
"new_password": "newpass123",
})
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# Feature 2: User Management - Toggle Active
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_toggle_user_active(client: AsyncClient):
"""Toggle active should flip the user's active status."""
await _seed(client)
resp = await client.post("/api/setup/list-users", json={"password": SETUP_PWD})
users = resp.json()["users"]
maker = next(u for u in users if u["username"] == "maker1")
assert maker["active"] is True
# Deactivate
resp = await client.post("/api/setup/toggle-user-active", json={
"password": SETUP_PWD,
"user_id": maker["id"],
})
assert resp.status_code == 200
assert resp.json()["active"] is False
# Re-activate
resp = await client.post("/api/setup/toggle-user-active", json={
"password": SETUP_PWD,
"user_id": maker["id"],
})
assert resp.status_code == 200
assert resp.json()["active"] is True
+73
View File
@@ -0,0 +1,73 @@
"""Test the Station and StationRecipeAssignment ORM models."""
import pytest
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from src.backend.models.orm.station import Station, StationRecipeAssignment
from src.backend.tests.conftest import _create_user, create_test_recipe
async def test_create_station(db_session: AsyncSession):
admin = await _create_user(db_session, username="admin1", is_admin=True)
station = Station(
code="ST-001",
name="Linea 1",
location="Reparto Nord",
created_by=admin.id,
)
db_session.add(station)
await db_session.flush()
await db_session.refresh(station)
assert station.id is not None
assert station.active is True
assert station.created_at is not None
async def test_station_code_is_unique(db_session: AsyncSession):
admin = await _create_user(db_session, username="admin2", is_admin=True)
db_session.add(Station(code="ST-DUP", name="A", created_by=admin.id))
await db_session.flush()
db_session.add(Station(code="ST-DUP", name="B", created_by=admin.id))
with pytest.raises(IntegrityError):
await db_session.flush()
await db_session.rollback()
async def test_assign_recipe_to_station(db_session: AsyncSession):
admin = await _create_user(db_session, username="admin3", is_admin=True)
station = Station(code="ST-002", name="Linea 2", created_by=admin.id)
db_session.add(station)
await db_session.flush()
recipe = await create_test_recipe(db_session, user_id=admin.id, code="REC-X")
assignment = StationRecipeAssignment(
station_id=station.id, recipe_id=recipe.id, assigned_by=admin.id,
)
db_session.add(assignment)
await db_session.flush()
result = await db_session.execute(
select(StationRecipeAssignment).where(
StationRecipeAssignment.station_id == station.id
)
)
assignments = result.scalars().all()
assert len(assignments) == 1
assert assignments[0].recipe_id == recipe.id
async def test_duplicate_assignment_is_rejected(db_session: AsyncSession):
admin = await _create_user(db_session, username="admin4", is_admin=True)
station = Station(code="ST-003", name="Linea 3", created_by=admin.id)
db_session.add(station)
await db_session.flush()
recipe = await create_test_recipe(db_session, user_id=admin.id, code="REC-Y")
db_session.add(StationRecipeAssignment(
station_id=station.id, recipe_id=recipe.id, assigned_by=admin.id,
))
await db_session.flush()
db_session.add(StationRecipeAssignment(
station_id=station.id, recipe_id=recipe.id, assigned_by=admin.id,
))
with pytest.raises(IntegrityError):
await db_session.flush()
await db_session.rollback()
+35
View File
@@ -0,0 +1,35 @@
"""Tests for Station Pydantic schemas."""
import pytest
from pydantic import ValidationError
from src.backend.models.api.station import (
StationCreate, StationUpdate, StationResponse,
StationRecipeAssignmentCreate, StationRecipeAssignmentResponse,
StationWithRecipesResponse,
)
def test_station_create_valid():
data = StationCreate(code="ST-001", name="Linea 1", location="Reparto Nord")
assert data.code == "ST-001"
assert data.active is True
def test_station_create_rejects_empty_code():
with pytest.raises(ValidationError):
StationCreate(code="", name="X")
def test_station_create_rejects_too_long_code():
with pytest.raises(ValidationError):
StationCreate(code="A" * 101, name="X")
def test_station_update_all_optional():
data = StationUpdate()
assert data.name is None
def test_station_assignment_create():
data = StationRecipeAssignmentCreate(recipe_id=42)
assert data.recipe_id == 42
+100
View File
@@ -0,0 +1,100 @@
"""Verify /api/setup/seed creates a default station with all recipes assigned.
DEVIATION FROM PLAN: The real setup endpoint is NOT a single /api/setup/initialize.
The setup is split into separate endpoints:
- POST /api/setup/init-db (creates tables)
- POST /api/setup/seed (seeds users + demo recipe + stations)
The seed body is {"password": "..."} with no load_demo_data flag (demo data
is always seeded). The station seed is added inside /api/setup/seed.
The seed endpoint uses async_session_factory directly (not get_db dependency
injection), so we must monkeypatch routers.setup.engine and
routers.setup.async_session_factory to point at the test SQLite engine,
exactly as done in test_setup.py.
"""
import pytest
from httpx import AsyncClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from src.backend.config import settings
from src.backend.models.orm.station import Station, StationRecipeAssignment
from src.backend.models.orm.recipe import Recipe
from src.backend.tests.conftest import test_engine, TestSessionFactory
SETUP_PWD = "test-setup-pwd"
@pytest.fixture(autouse=True)
def enable_setup(monkeypatch):
"""Enable setup endpoints and redirect engine/session to test SQLite DB."""
monkeypatch.setattr(settings, "setup_password", SETUP_PWD)
import src.backend.api.routers.setup as setup_mod
monkeypatch.setattr(setup_mod, "engine", test_engine)
monkeypatch.setattr(setup_mod, "async_session_factory", TestSessionFactory)
async def _seed(client: AsyncClient) -> dict:
"""Init DB tables then run seed; return seed response JSON."""
resp = await client.post("/api/setup/init-db", json={"password": SETUP_PWD})
assert resp.status_code == 200, resp.text
resp = await client.post("/api/setup/seed", json={"password": SETUP_PWD})
assert resp.status_code == 200, resp.text
return resp.json()
@pytest.mark.asyncio
async def test_setup_seed_creates_default_station_and_assigns_recipes(
client: AsyncClient,
db_session: AsyncSession,
):
"""After seeding, ST-DEFAULT station must exist and all active recipes assigned."""
await _seed(client)
# Default station must exist and be active.
result = await db_session.execute(
select(Station).where(Station.code == "ST-DEFAULT")
)
default = result.scalar_one_or_none()
assert default is not None, "ST-DEFAULT station was not created"
assert default.active is True
# All active recipes must be assigned to the default station.
active_recipes_result = await db_session.execute(
select(Recipe).where(Recipe.active == True)
)
active_recipes = active_recipes_result.scalars().all()
assert len(active_recipes) > 0, "demo seed must create at least one active recipe"
assignments_result = await db_session.execute(
select(StationRecipeAssignment).where(
StationRecipeAssignment.station_id == default.id
)
)
n_assignments = len(assignments_result.scalars().all())
assert n_assignments == len(active_recipes), (
f"Expected {len(active_recipes)} assignment(s), got {n_assignments}"
)
@pytest.mark.asyncio
async def test_setup_seed_station_idempotent(
client: AsyncClient,
db_session: AsyncSession,
):
"""Running seed twice must not duplicate ST-DEFAULT or its assignments."""
# First run — creates everything.
await _seed(client)
# Second run — recipe DEMO-001 already exists → seed returns early.
# ST-DEFAULT must still exist (not re-created, not duplicated).
resp = await client.post("/api/setup/seed", json={"password": SETUP_PWD})
assert resp.status_code == 200, resp.text
stations_result = await db_session.execute(
select(Station).where(Station.code == "ST-DEFAULT")
)
stations = stations_result.scalars().all()
assert len(stations) == 1, f"Expected exactly 1 ST-DEFAULT, got {len(stations)}"
+114
View File
@@ -0,0 +1,114 @@
"""Tests for station_service business logic."""
import pytest
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from src.backend.models.orm.station import Station
from src.backend.models.api.station import StationCreate, StationUpdate
from src.backend.services.station_service import (
create_station, update_station, delete_station,
assign_recipe, unassign_recipe, list_station_recipes,
get_station_by_code,
)
from src.backend.tests.conftest import _create_user, create_test_recipe
async def test_create_station_ok(db_session: AsyncSession):
admin = await _create_user(db_session, username="a1", is_admin=True)
station = await create_station(
db_session, StationCreate(code="ST-100", name="Pilot"), admin
)
assert station.id is not None
assert station.code == "ST-100"
async def test_create_station_duplicate_code(db_session: AsyncSession):
admin = await _create_user(db_session, username="a2", is_admin=True)
await create_station(db_session, StationCreate(code="ST-DUP", name="A"), admin)
with pytest.raises(HTTPException) as exc:
await create_station(db_session, StationCreate(code="ST-DUP", name="B"), admin)
assert exc.value.status_code == 409
async def test_update_station(db_session: AsyncSession):
admin = await _create_user(db_session, username="a3", is_admin=True)
station = await create_station(db_session, StationCreate(code="ST-U", name="Old"), admin)
updated = await update_station(
db_session, station.id, StationUpdate(name="New name"),
)
assert updated.name == "New name"
assert updated.code == "ST-U"
async def test_update_missing_station(db_session: AsyncSession):
with pytest.raises(HTTPException) as exc:
await update_station(db_session, 9999, StationUpdate(name="x"))
assert exc.value.status_code == 404
async def test_assign_and_list_recipes(db_session: AsyncSession):
admin = await _create_user(db_session, username="a4", is_admin=True)
station = await create_station(db_session, StationCreate(code="ST-R", name="R"), admin)
r1 = await create_test_recipe(db_session, user_id=admin.id, code="REC-R1")
r2 = await create_test_recipe(db_session, user_id=admin.id, code="REC-R2")
await assign_recipe(db_session, station.id, r1.id, admin)
await assign_recipe(db_session, station.id, r2.id, admin)
recipes = await list_station_recipes(db_session, station.id)
assert {r.code for r in recipes} == {"REC-R1", "REC-R2"}
async def test_assign_same_recipe_twice_is_409(db_session: AsyncSession):
admin = await _create_user(db_session, username="a5", is_admin=True)
station = await create_station(db_session, StationCreate(code="ST-D", name="D"), admin)
r = await create_test_recipe(db_session, user_id=admin.id, code="REC-D")
await assign_recipe(db_session, station.id, r.id, admin)
with pytest.raises(HTTPException) as exc:
await assign_recipe(db_session, station.id, r.id, admin)
assert exc.value.status_code == 409
async def test_unassign_recipe(db_session: AsyncSession):
admin = await _create_user(db_session, username="a6", is_admin=True)
station = await create_station(db_session, StationCreate(code="ST-UN", name="UN"), admin)
r = await create_test_recipe(db_session, user_id=admin.id, code="REC-UN")
await assign_recipe(db_session, station.id, r.id, admin)
await unassign_recipe(db_session, station.id, r.id)
recipes = await list_station_recipes(db_session, station.id)
assert recipes == []
async def test_get_station_by_code(db_session: AsyncSession):
admin = await _create_user(db_session, username="a7", is_admin=True)
await create_station(db_session, StationCreate(code="ST-FIND", name="F"), admin)
found = await get_station_by_code(db_session, "ST-FIND")
assert found is not None
assert found.name == "F"
missing = await get_station_by_code(db_session, "ST-NOPE")
assert missing is None
async def test_list_recipes_only_returns_active(db_session: AsyncSession):
admin = await _create_user(db_session, username="a8", is_admin=True)
station = await create_station(db_session, StationCreate(code="ST-A", name="A"), admin)
active = await create_test_recipe(db_session, user_id=admin.id, code="REC-AC")
inactive = await create_test_recipe(db_session, user_id=admin.id, code="REC-IN")
inactive.active = False
await db_session.flush()
await assign_recipe(db_session, station.id, active.id, admin)
await assign_recipe(db_session, station.id, inactive.id, admin)
recipes = await list_station_recipes(db_session, station.id)
assert [r.code for r in recipes] == ["REC-AC"]
async def test_delete_station_cascades_assignments(db_session: AsyncSession):
from sqlalchemy import select
from src.backend.models.orm.station import StationRecipeAssignment
admin = await _create_user(db_session, username="a9", is_admin=True)
station = await create_station(db_session, StationCreate(code="ST-DEL", name="D"), admin)
r = await create_test_recipe(db_session, user_id=admin.id, code="REC-DEL")
await assign_recipe(db_session, station.id, r.id, admin)
await delete_station(db_session, station.id)
remaining = await db_session.execute(
select(StationRecipeAssignment).where(StationRecipeAssignment.station_id == station.id)
)
assert remaining.scalars().all() == []
+203
View File
@@ -0,0 +1,203 @@
"""Integration tests for /api/stations endpoints."""
import pytest
from httpx import AsyncClient
from src.backend.tests.conftest import auth_headers, create_test_recipe
async def test_list_stations_requires_auth(client: AsyncClient):
resp = await client.get("/api/stations")
assert resp.status_code == 401
async def test_create_station_as_admin(client: AsyncClient, admin_user):
resp = await client.post(
"/api/stations",
headers=auth_headers(admin_user),
json={"code": "ST-API", "name": "Via API"},
)
assert resp.status_code == 201, resp.text
body = resp.json()
assert body["code"] == "ST-API"
assert body["active"] is True
assert body["id"] > 0
async def test_create_station_non_admin_is_403(client: AsyncClient, maker_user):
resp = await client.post(
"/api/stations",
headers=auth_headers(maker_user),
json={"code": "ST-NO", "name": "No"},
)
assert resp.status_code == 403
async def test_update_station(client: AsyncClient, admin_user):
created = await client.post(
"/api/stations",
headers=auth_headers(admin_user),
json={"code": "ST-UP", "name": "Old"},
)
station_id = created.json()["id"]
resp = await client.put(
f"/api/stations/{station_id}",
headers=auth_headers(admin_user),
json={"name": "New", "active": False},
)
assert resp.status_code == 200
body = resp.json()
assert body["name"] == "New"
assert body["active"] is False
async def test_delete_station(client: AsyncClient, admin_user):
created = await client.post(
"/api/stations",
headers=auth_headers(admin_user),
json={"code": "ST-D", "name": "D"},
)
sid = created.json()["id"]
resp = await client.delete(
f"/api/stations/{sid}", headers=auth_headers(admin_user),
)
assert resp.status_code == 204
again = await client.get(
f"/api/stations/{sid}", headers=auth_headers(admin_user),
)
assert again.status_code == 404
async def test_assign_and_unassign_recipe(
client: AsyncClient, admin_user, db_session,
):
recipe = await create_test_recipe(db_session, user_id=admin_user.id, code="REC-AS")
await db_session.commit()
created = await client.post(
"/api/stations",
headers=auth_headers(admin_user),
json={"code": "ST-ASSIGN", "name": "A"},
)
sid = created.json()["id"]
a = await client.post(
f"/api/stations/{sid}/recipes",
headers=auth_headers(admin_user),
json={"recipe_id": recipe.id},
)
assert a.status_code == 201
r = await client.get(
f"/api/stations/{sid}/recipes",
headers=auth_headers(admin_user),
)
assert r.status_code == 200
assert [rec["code"] for rec in r.json()] == ["REC-AS"]
u = await client.delete(
f"/api/stations/{sid}/recipes/{recipe.id}",
headers=auth_headers(admin_user),
)
assert u.status_code == 204
r2 = await client.get(
f"/api/stations/{sid}/recipes",
headers=auth_headers(admin_user),
)
assert r2.json() == []
async def test_list_recipes_by_station_code(
client: AsyncClient, admin_user, measurement_tec_user, db_session,
):
recipe = await create_test_recipe(db_session, user_id=admin_user.id, code="REC-BC")
await db_session.commit()
created = await client.post(
"/api/stations",
headers=auth_headers(admin_user),
json={"code": "ST-BC", "name": "BC"},
)
sid = created.json()["id"]
await client.post(
f"/api/stations/{sid}/recipes",
headers=auth_headers(admin_user),
json={"recipe_id": recipe.id},
)
resp = await client.get(
"/api/stations/by-code/ST-BC/recipes",
headers=auth_headers(measurement_tec_user),
)
assert resp.status_code == 200
assert [r["code"] for r in resp.json()] == ["REC-BC"]
async def test_list_recipes_by_unknown_code_404(
client: AsyncClient, measurement_tec_user,
):
resp = await client.get(
"/api/stations/by-code/ST-DOES-NOT-EXIST/recipes",
headers=auth_headers(measurement_tec_user),
)
assert resp.status_code == 404
async def test_admin_can_list_stations(client: AsyncClient, admin_user):
await client.post(
"/api/stations",
headers=auth_headers(admin_user),
json={"code": "ST-L1", "name": "A"},
)
await client.post(
"/api/stations",
headers=auth_headers(admin_user),
json={"code": "ST-L2", "name": "B", "active": False},
)
resp = await client.get("/api/stations", headers=auth_headers(admin_user))
assert resp.status_code == 200
codes = {s["code"] for s in resp.json()}
assert {"ST-L1", "ST-L2"}.issubset(codes)
resp_active = await client.get(
"/api/stations?active_only=true", headers=auth_headers(admin_user),
)
assert resp_active.status_code == 200
active_codes = {s["code"] for s in resp_active.json()}
assert "ST-L1" in active_codes
assert "ST-L2" not in active_codes
async def test_assign_recipe_not_found_returns_404(
client: AsyncClient, admin_user,
):
created = await client.post(
"/api/stations",
headers=auth_headers(admin_user),
json={"code": "ST-NR", "name": "NR"},
)
sid = created.json()["id"]
resp = await client.post(
f"/api/stations/{sid}/recipes",
headers=auth_headers(admin_user),
json={"recipe_id": 99999},
)
assert resp.status_code == 404
async def test_duplicate_assignment_returns_409(
client: AsyncClient, admin_user, db_session,
):
recipe = await create_test_recipe(db_session, user_id=admin_user.id, code="REC-DUP")
await db_session.commit()
created = await client.post(
"/api/stations",
headers=auth_headers(admin_user),
json={"code": "ST-DUP-A", "name": "Dup"},
)
sid = created.json()["id"]
first = await client.post(
f"/api/stations/{sid}/recipes",
headers=auth_headers(admin_user),
json={"recipe_id": recipe.id},
)
assert first.status_code == 201
second = await client.post(
f"/api/stations/{sid}/recipes",
headers=auth_headers(admin_user),
json={"recipe_id": recipe.id},
)
assert second.status_code == 409
+213
View File
@@ -0,0 +1,213 @@
"""Tests for statistics router (/api/statistics)."""
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from src.backend.models.orm.user import User
from src.backend.models.orm.measurement import Measurement
from src.backend.tests.conftest import auth_headers, create_test_recipe
async def _seed_measurements(
client: AsyncClient,
tec_user: User,
recipe_id: int,
values: list[float],
) -> tuple[int, int]:
"""Create measurements and return (subtask_id, version_id)."""
tasks_resp = await client.get(
f"/api/recipes/{recipe_id}/tasks", headers=auth_headers(tec_user)
)
task = tasks_resp.json()[0]
subtask_id = task["subtasks"][0]["id"]
version_id = task["version_id"]
for val in values:
await client.post(
"/api/measurements/",
headers=auth_headers(tec_user),
json={
"subtask_id": subtask_id,
"version_id": version_id,
"value": val,
},
)
return subtask_id, version_id
class TestSummary:
"""GET /api/statistics/summary tests."""
async def test_get_statistics_by_recipe(
self,
client: AsyncClient,
measurement_tec_user: User,
metrologist_user: User,
db_session: AsyncSession,
):
"""Metrologist can get summary statistics."""
recipe = await create_test_recipe(
db_session, measurement_tec_user.id
)
await _seed_measurements(
client, measurement_tec_user, recipe.id,
[10.0, 10.1, 10.4, 10.6, 9.4], # pass, pass, warning, fail, fail
)
resp = await client.get(
"/api/statistics/summary",
headers=auth_headers(metrologist_user),
params={"recipe_id": recipe.id},
)
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 5
assert data["pass_count"] >= 0
assert data["warning_count"] >= 0
assert data["fail_count"] >= 0
assert data["pass_rate"] >= 0
# Total rates should sum to ~100
total_rate = data["pass_rate"] + data["warning_rate"] + data["fail_rate"]
assert abs(total_rate - 100.0) < 0.1
async def test_statistics_require_metrologist(
self,
client: AsyncClient,
maker_user: User,
db_session: AsyncSession,
):
"""Non-Metrologist cannot access statistics."""
recipe = await create_test_recipe(db_session, maker_user.id)
resp = await client.get(
"/api/statistics/summary",
headers=auth_headers(maker_user),
params={"recipe_id": recipe.id},
)
assert resp.status_code == 403
async def test_statistics_empty(
self,
client: AsyncClient,
metrologist_user: User,
db_session: AsyncSession,
):
"""Statistics on recipe with no measurements returns zeros."""
recipe = await create_test_recipe(
db_session, metrologist_user.id
)
resp = await client.get(
"/api/statistics/summary",
headers=auth_headers(metrologist_user),
params={"recipe_id": recipe.id},
)
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 0
assert data["pass_count"] == 0
class TestCapability:
"""GET /api/statistics/capability tests."""
async def test_capability_indices(
self,
client: AsyncClient,
measurement_tec_user: User,
metrologist_user: User,
db_session: AsyncSession,
):
"""Capability endpoint returns Cp/Cpk/Pp/Ppk indices."""
recipe = await create_test_recipe(
db_session, measurement_tec_user.id
)
subtask_id, _ = await _seed_measurements(
client, measurement_tec_user, recipe.id,
[9.9, 10.0, 10.1, 9.8, 10.2, 10.0, 9.95, 10.05, 10.1, 9.9],
)
resp = await client.get(
"/api/statistics/capability",
headers=auth_headers(metrologist_user),
params={
"recipe_id": recipe.id,
"subtask_id": subtask_id,
},
)
assert resp.status_code == 200
data = resp.json()
assert data["n"] == 10
assert data["mean"] is not None
assert data["std_dev"] is not None
# With UTL=10.5 and LTL=9.5, spread=1.0, Cp should be > 0
if data["cp"] is not None:
assert data["cp"] > 0
class TestSPCData:
"""GET /api/statistics/control-chart tests."""
async def test_spc_data(
self,
client: AsyncClient,
measurement_tec_user: User,
metrologist_user: User,
db_session: AsyncSession,
):
"""Control chart returns values, mean, UCL, LCL."""
recipe = await create_test_recipe(
db_session, measurement_tec_user.id
)
subtask_id, _ = await _seed_measurements(
client, measurement_tec_user, recipe.id,
[10.0, 10.1, 9.9, 10.05, 9.95],
)
resp = await client.get(
"/api/statistics/control-chart",
headers=auth_headers(metrologist_user),
params={
"recipe_id": recipe.id,
"subtask_id": subtask_id,
},
)
assert resp.status_code == 200
data = resp.json()
assert len(data["values"]) == 5
assert data["mean"] is not None
assert data["ucl"] is not None
assert data["lcl"] is not None
assert data["ucl"] > data["mean"]
assert data["lcl"] < data["mean"]
class TestDateFilter:
"""Date filter tests."""
async def test_statistics_date_filter(
self,
client: AsyncClient,
measurement_tec_user: User,
metrologist_user: User,
db_session: AsyncSession,
):
"""Date filter parameters are accepted without error."""
recipe = await create_test_recipe(
db_session, measurement_tec_user.id
)
await _seed_measurements(
client, measurement_tec_user, recipe.id,
[10.0, 10.1],
)
resp = await client.get(
"/api/statistics/summary",
headers=auth_headers(metrologist_user),
params={
"recipe_id": recipe.id,
"date_from": "2020-01-01T00:00:00",
"date_to": "2099-12-31T23:59:59",
},
)
assert resp.status_code == 200
# All measurements should be within this wide range
assert resp.json()["total"] >= 2
+219
View File
@@ -0,0 +1,219 @@
"""Tests for tasks router (/api/recipes/{id}/tasks, /api/tasks, /api/subtasks)."""
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from src.backend.models.orm.user import User
from src.backend.tests.conftest import auth_headers, create_test_recipe
class TestListTasks:
"""GET /api/recipes/{recipe_id}/tasks tests."""
async def test_list_tasks(
self,
client: AsyncClient,
maker_user: User,
db_session: AsyncSession,
):
"""Authenticated user can list tasks for a recipe."""
recipe = await create_test_recipe(db_session, maker_user.id)
resp = await client.get(
f"/api/recipes/{recipe.id}/tasks",
headers=auth_headers(maker_user),
)
assert resp.status_code == 200
data = resp.json()
assert isinstance(data, list)
assert len(data) >= 1
assert data[0]["title"] == "Test Task"
assert len(data[0]["subtasks"]) >= 1
class TestCreateTask:
"""POST /api/recipes/{recipe_id}/tasks tests."""
async def test_create_task(
self,
client: AsyncClient,
maker_user: User,
db_session: AsyncSession,
):
"""Maker can add a task (triggers copy-on-write new version)."""
recipe = await create_test_recipe(db_session, maker_user.id)
resp = await client.post(
f"/api/recipes/{recipe.id}/tasks",
headers=auth_headers(maker_user),
json={
"title": "New Task",
"directive": "Inspect surface",
"subtasks": [
{
"marker_number": 1,
"description": "Surface roughness",
"nominal": 5.0,
"utl": 5.5,
"ltl": 4.5,
"unit": "um",
}
],
},
)
assert resp.status_code == 201
data = resp.json()
assert data["title"] == "New Task"
assert len(data["subtasks"]) == 1
assert data["subtasks"][0]["description"] == "Surface roughness"
class TestUpdateTask:
"""PUT /api/tasks/{task_id} tests."""
async def test_update_task(
self,
client: AsyncClient,
maker_user: User,
db_session: AsyncSession,
):
"""Maker can update a task on the current version."""
recipe = await create_test_recipe(db_session, maker_user.id)
# Get task list to find the task ID
tasks_resp = await client.get(
f"/api/recipes/{recipe.id}/tasks",
headers=auth_headers(maker_user),
)
task_id = tasks_resp.json()[0]["id"]
resp = await client.put(
f"/api/tasks/{task_id}",
headers=auth_headers(maker_user),
json={"title": "Updated Task Title"},
)
assert resp.status_code == 200
assert resp.json()["title"] == "Updated Task Title"
class TestDeleteTask:
"""DELETE /api/tasks/{task_id} tests."""
async def test_delete_task(
self,
client: AsyncClient,
maker_user: User,
db_session: AsyncSession,
):
"""Maker can delete a task from the current version."""
recipe = await create_test_recipe(db_session, maker_user.id)
tasks_resp = await client.get(
f"/api/recipes/{recipe.id}/tasks",
headers=auth_headers(maker_user),
)
task_id = tasks_resp.json()[0]["id"]
resp = await client.delete(
f"/api/tasks/{task_id}", headers=auth_headers(maker_user)
)
assert resp.status_code == 204
class TestCreateSubtask:
"""POST /api/tasks/{task_id}/subtasks tests."""
async def test_create_subtask(
self,
client: AsyncClient,
maker_user: User,
db_session: AsyncSession,
):
"""Maker can add a subtask to a task."""
recipe = await create_test_recipe(db_session, maker_user.id)
tasks_resp = await client.get(
f"/api/recipes/{recipe.id}/tasks",
headers=auth_headers(maker_user),
)
task_id = tasks_resp.json()[0]["id"]
resp = await client.post(
f"/api/tasks/{task_id}/subtasks",
headers=auth_headers(maker_user),
json={
"marker_number": 2,
"description": "Width measurement",
"nominal": 20.0,
"utl": 20.5,
"ltl": 19.5,
"unit": "mm",
},
)
assert resp.status_code == 201
data = resp.json()
assert data["marker_number"] == 2
assert data["description"] == "Width measurement"
class TestUpdateSubtask:
"""PUT /api/subtasks/{subtask_id} tests."""
async def test_update_subtask(
self,
client: AsyncClient,
maker_user: User,
db_session: AsyncSession,
):
"""Maker can update a subtask."""
recipe = await create_test_recipe(db_session, maker_user.id)
tasks_resp = await client.get(
f"/api/recipes/{recipe.id}/tasks",
headers=auth_headers(maker_user),
)
subtask_id = tasks_resp.json()[0]["subtasks"][0]["id"]
resp = await client.put(
f"/api/subtasks/{subtask_id}",
headers=auth_headers(maker_user),
json={"description": "Updated description", "nominal": 12.0},
)
assert resp.status_code == 200
assert resp.json()["description"] == "Updated description"
class TestReorderTasks:
"""PUT /api/tasks/reorder tests."""
async def test_reorder_tasks(
self,
client: AsyncClient,
maker_user: User,
db_session: AsyncSession,
):
"""Maker can reorder tasks by providing task IDs in desired order."""
recipe = await create_test_recipe(db_session, maker_user.id)
# Add a second task via copy-on-write
add_resp = await client.post(
f"/api/recipes/{recipe.id}/tasks",
headers=auth_headers(maker_user),
json={"title": "Second Task", "subtasks": []},
)
assert add_resp.status_code == 201
# Get tasks from new version
tasks_resp = await client.get(
f"/api/recipes/{recipe.id}/tasks",
headers=auth_headers(maker_user),
)
tasks = tasks_resp.json()
assert len(tasks) >= 2
task_ids = [t["id"] for t in tasks]
# Reverse the order
reversed_ids = list(reversed(task_ids))
resp = await client.put(
"/api/tasks/reorder",
headers=auth_headers(maker_user),
json={"task_ids": reversed_ids},
)
assert resp.status_code == 200
reordered = resp.json()
assert [t["id"] for t in reordered] == reversed_ids
+165
View File
@@ -0,0 +1,165 @@
"""Tests for users router (/api/users)."""
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from src.backend.models.orm.user import User
from src.backend.tests.conftest import _create_user, auth_headers
class TestListUsers:
"""GET /api/users tests."""
async def test_list_users_admin(
self, client: AsyncClient, admin_user: User
):
"""Admin can list all users."""
resp = await client.get("/api/users", headers=auth_headers(admin_user))
assert resp.status_code == 200
data = resp.json()
assert isinstance(data, list)
# At least the admin user
assert len(data) >= 1
usernames = [u["username"] for u in data]
assert "admin" in usernames
async def test_list_users_forbidden(
self, client: AsyncClient, maker_user: User
):
"""Non-admin user cannot list users."""
resp = await client.get("/api/users", headers=auth_headers(maker_user))
assert resp.status_code == 403
class TestGetUser:
"""GET /api/users/{user_id} is not directly exposed; use get_me or list."""
async def test_get_me(self, client: AsyncClient, maker_user: User):
"""GET /api/auth/me returns current user profile."""
resp = await client.get(
"/api/auth/me", headers=auth_headers(maker_user)
)
assert resp.status_code == 200
data = resp.json()
assert data["username"] == "maker"
assert data["display_name"] == "Maker User"
assert "Maker" in data["roles"]
class TestCreateUser:
"""POST /api/users tests."""
async def test_create_user(
self, client: AsyncClient, admin_user: User
):
"""Admin can create a new user."""
resp = await client.post(
"/api/users",
headers=auth_headers(admin_user),
json={
"username": "newuser",
"password": "NewPass123",
"display_name": "New User",
"roles": ["MeasurementTec"],
"is_admin": False,
},
)
assert resp.status_code == 201
data = resp.json()
assert data["username"] == "newuser"
assert data["display_name"] == "New User"
assert "MeasurementTec" in data["roles"]
assert data["active"] is True
async def test_create_user_duplicate_username(
self, client: AsyncClient, admin_user: User
):
"""Creating a user with existing username returns 409."""
resp = await client.post(
"/api/users",
headers=auth_headers(admin_user),
json={
"username": "admin",
"password": "DupPass123",
"display_name": "Dup",
"roles": [],
},
)
assert resp.status_code == 409
class TestUpdateUser:
"""PUT /api/users/{user_id} tests."""
async def test_update_user(
self,
client: AsyncClient,
admin_user: User,
maker_user: User,
):
"""Admin can update another user."""
resp = await client.put(
f"/api/users/{maker_user.id}",
headers=auth_headers(admin_user),
json={"display_name": "Updated Maker"},
)
assert resp.status_code == 200
assert resp.json()["display_name"] == "Updated Maker"
class TestDeleteUser:
"""DELETE /api/users/{user_id} tests."""
async def test_delete_user(
self,
client: AsyncClient,
admin_user: User,
db_session: AsyncSession,
):
"""Admin can soft-delete another user."""
target = await _create_user(
db_session, username="to_delete", display_name="Delete Me"
)
resp = await client.delete(
f"/api/users/{target.id}", headers=auth_headers(admin_user)
)
assert resp.status_code == 204
async def test_delete_self_forbidden(
self, client: AsyncClient, admin_user: User
):
"""Admin cannot deactivate themselves."""
resp = await client.delete(
f"/api/users/{admin_user.id}", headers=auth_headers(admin_user)
)
assert resp.status_code == 400
assert "yourself" in resp.json()["detail"]
class TestProfile:
"""PUT /api/auth/me tests."""
async def test_update_me(self, client: AsyncClient, maker_user: User):
"""User can update own profile."""
resp = await client.put(
"/api/auth/me",
headers=auth_headers(maker_user),
json={"display_name": "My New Name", "theme_pref": "dark"},
)
assert resp.status_code == 200
data = resp.json()
assert data["display_name"] == "My New Name"
assert data["theme_pref"] == "dark"
async def test_change_password_not_in_profile(
self, client: AsyncClient, maker_user: User
):
"""Profile update schema does not accept password field (422)."""
resp = await client.put(
"/api/auth/me",
headers=auth_headers(maker_user),
json={"password": "HackedPass1"},
)
# The field is simply ignored (exclude_unset), so no change should occur
# but the request itself is valid (empty update)
assert resp.status_code == 200