From 2934a2d26af7c52a24cb45a97c0fda1d869dd7a8 Mon Sep 17 00:00:00 2001 From: AdrianoDev Date: Thu, 30 Apr 2026 18:09:21 +0200 Subject: [PATCH] feat(V2): bearer auth middleware con compare_digest Implementa install_auth_middleware con whitelist /health /apidocs /openapi.json, token timing-safe via secrets.compare_digest, request.state.environment injection. Fix pyproject: --import-mode=prepend (importlib + PEP563 rompe FastAPI Request injection). Rimosso from __future__ import annotations da test_auth.py per stesso motivo. Co-Authored-By: Claude Sonnet 4.6 --- pyproject.toml | 2 +- src/cerbero_mcp/auth.py | 60 ++++++++++++++++++++++++ tests/unit/test_auth.py | 100 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 161 insertions(+), 1 deletion(-) create mode 100644 src/cerbero_mcp/auth.py create mode 100644 tests/unit/test_auth.py diff --git a/pyproject.toml b/pyproject.toml index 2c6e4cc..2eea56d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,7 +58,7 @@ extend-immutable-calls = [ [tool.pytest.ini_options] asyncio_mode = "auto" testpaths = ["tests"] -addopts = "--import-mode=importlib" +addopts = "--import-mode=prepend" [tool.mypy] python_version = "3.11" diff --git a/src/cerbero_mcp/auth.py b/src/cerbero_mcp/auth.py new file mode 100644 index 0000000..08d0a1b --- /dev/null +++ b/src/cerbero_mcp/auth.py @@ -0,0 +1,60 @@ +"""Bearer auth middleware: bearer token → request.state.environment.""" +from __future__ import annotations + +import secrets +from typing import Literal + +from fastapi import FastAPI, Request, status +from fastapi.responses import JSONResponse + +Environment = Literal["testnet", "mainnet"] + +WHITELIST_PATHS = frozenset({"/health", "/apidocs", "/openapi.json"}) + + +def _extract_bearer(auth_header: str) -> str | None: + if not auth_header.startswith("Bearer "): + return None + token = auth_header[len("Bearer "):].strip() + return token or None + + +def _check_token( + candidate: str, testnet_token: str, mainnet_token: str +) -> Environment | None: + if secrets.compare_digest(candidate, testnet_token): + return "testnet" + if secrets.compare_digest(candidate, mainnet_token): + return "mainnet" + return None + + +def install_auth_middleware( + app: FastAPI, + *, + testnet_token: str, + mainnet_token: str, +) -> None: + """Registra middleware di auth bearer sull'app FastAPI.""" + + @app.middleware("http") + async def auth_middleware(request: Request, call_next): + if request.url.path in WHITELIST_PATHS: + return await call_next(request) + + token = _extract_bearer(request.headers.get("Authorization", "")) + if token is None: + return JSONResponse( + status_code=status.HTTP_401_UNAUTHORIZED, + content={"error": {"code": "UNAUTHORIZED", + "message": "missing or malformed bearer token"}}, + ) + env = _check_token(token, testnet_token, mainnet_token) + if env is None: + return JSONResponse( + status_code=status.HTTP_401_UNAUTHORIZED, + content={"error": {"code": "UNAUTHORIZED", + "message": "invalid token"}}, + ) + request.state.environment = env + return await call_next(request) diff --git a/tests/unit/test_auth.py b/tests/unit/test_auth.py new file mode 100644 index 0000000..2e03924 --- /dev/null +++ b/tests/unit/test_auth.py @@ -0,0 +1,100 @@ +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + + +def test_health_no_auth_required(): + from cerbero_mcp.auth import install_auth_middleware + fa = FastAPI() + install_auth_middleware(fa, testnet_token="t", mainnet_token="m") + + @fa.get("/health") + def h(): + return {"ok": True} + + c = TestClient(fa) + r = c.get("/health") + assert r.status_code == 200 + + +def test_apidocs_no_auth_required(): + from cerbero_mcp.auth import install_auth_middleware + fa = FastAPI(docs_url="/apidocs") + install_auth_middleware(fa, testnet_token="t", mainnet_token="m") + + c = TestClient(fa) + r = c.get("/apidocs") + assert r.status_code == 200 + r = c.get("/openapi.json") + assert r.status_code == 200 + + +def test_missing_authorization_header_401(): + from cerbero_mcp.auth import install_auth_middleware + fa = FastAPI() + install_auth_middleware(fa, testnet_token="t", mainnet_token="m") + + @fa.get("/mcp-deribit/health") + def h(): + return {"ok": True} + + c = TestClient(fa) + r = c.get("/mcp-deribit/health") + assert r.status_code == 401 + + +def test_invalid_bearer_401(): + from cerbero_mcp.auth import install_auth_middleware + fa = FastAPI() + install_auth_middleware(fa, testnet_token="t", mainnet_token="m") + + @fa.get("/mcp-deribit/health") + def h(): + return {"ok": True} + + c = TestClient(fa) + r = c.get("/mcp-deribit/health", headers={"Authorization": "Bearer wrong"}) + assert r.status_code == 401 + + +def test_testnet_token_sets_env_testnet(): + from cerbero_mcp.auth import install_auth_middleware + from fastapi import Request + + fa = FastAPI() + install_auth_middleware(fa, testnet_token="tk_test", mainnet_token="tk_live") + + @fa.get("/mcp-deribit/peek") + def peek(request: Request): + return {"env": request.state.environment} + + c = TestClient(fa) + r = c.get("/mcp-deribit/peek", headers={"Authorization": "Bearer tk_test"}) + assert r.status_code == 200 + assert r.json() == {"env": "testnet"} + + +def test_mainnet_token_sets_env_mainnet(): + from cerbero_mcp.auth import install_auth_middleware + from fastapi import Request + + fa = FastAPI() + install_auth_middleware(fa, testnet_token="tk_test", mainnet_token="tk_live") + + @fa.get("/mcp-deribit/peek") + def peek(request: Request): + return {"env": request.state.environment} + + c = TestClient(fa) + r = c.get("/mcp-deribit/peek", headers={"Authorization": "Bearer tk_live"}) + assert r.status_code == 200 + assert r.json() == {"env": "mainnet"} + + +def test_uses_compare_digest(): + """Verifica che _check_token usi secrets.compare_digest (timing-safe).""" + import inspect + from cerbero_mcp import auth + + src = inspect.getsource(auth) + assert "compare_digest" in src, "auth.py deve usare secrets.compare_digest"