feat(V2): bearer auth middleware con compare_digest

Implementa install_auth_middleware con whitelist /health /apidocs /openapi.json,
token timing-safe via secrets.compare_digest, request.state.environment injection.
Fix pyproject: --import-mode=prepend (importlib + PEP563 rompe FastAPI Request injection).
Rimosso from __future__ import annotations da test_auth.py per stesso motivo.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
AdrianoDev
2026-04-30 18:09:21 +02:00
parent 97d93a5139
commit 2934a2d26a
3 changed files with 161 additions and 1 deletions
+1 -1
View File
@@ -58,7 +58,7 @@ extend-immutable-calls = [
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
addopts = "--import-mode=importlib"
addopts = "--import-mode=prepend"
[tool.mypy]
python_version = "3.11"
+60
View File
@@ -0,0 +1,60 @@
"""Bearer auth middleware: bearer token → request.state.environment."""
from __future__ import annotations
import secrets
from typing import Literal
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
Environment = Literal["testnet", "mainnet"]
WHITELIST_PATHS = frozenset({"/health", "/apidocs", "/openapi.json"})
def _extract_bearer(auth_header: str) -> str | None:
if not auth_header.startswith("Bearer "):
return None
token = auth_header[len("Bearer "):].strip()
return token or None
def _check_token(
candidate: str, testnet_token: str, mainnet_token: str
) -> Environment | None:
if secrets.compare_digest(candidate, testnet_token):
return "testnet"
if secrets.compare_digest(candidate, mainnet_token):
return "mainnet"
return None
def install_auth_middleware(
app: FastAPI,
*,
testnet_token: str,
mainnet_token: str,
) -> None:
"""Registra middleware di auth bearer sull'app FastAPI."""
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
if request.url.path in WHITELIST_PATHS:
return await call_next(request)
token = _extract_bearer(request.headers.get("Authorization", ""))
if token is None:
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"error": {"code": "UNAUTHORIZED",
"message": "missing or malformed bearer token"}},
)
env = _check_token(token, testnet_token, mainnet_token)
if env is None:
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"error": {"code": "UNAUTHORIZED",
"message": "invalid token"}},
)
request.state.environment = env
return await call_next(request)
+100
View File
@@ -0,0 +1,100 @@
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
def test_health_no_auth_required():
from cerbero_mcp.auth import install_auth_middleware
fa = FastAPI()
install_auth_middleware(fa, testnet_token="t", mainnet_token="m")
@fa.get("/health")
def h():
return {"ok": True}
c = TestClient(fa)
r = c.get("/health")
assert r.status_code == 200
def test_apidocs_no_auth_required():
from cerbero_mcp.auth import install_auth_middleware
fa = FastAPI(docs_url="/apidocs")
install_auth_middleware(fa, testnet_token="t", mainnet_token="m")
c = TestClient(fa)
r = c.get("/apidocs")
assert r.status_code == 200
r = c.get("/openapi.json")
assert r.status_code == 200
def test_missing_authorization_header_401():
from cerbero_mcp.auth import install_auth_middleware
fa = FastAPI()
install_auth_middleware(fa, testnet_token="t", mainnet_token="m")
@fa.get("/mcp-deribit/health")
def h():
return {"ok": True}
c = TestClient(fa)
r = c.get("/mcp-deribit/health")
assert r.status_code == 401
def test_invalid_bearer_401():
from cerbero_mcp.auth import install_auth_middleware
fa = FastAPI()
install_auth_middleware(fa, testnet_token="t", mainnet_token="m")
@fa.get("/mcp-deribit/health")
def h():
return {"ok": True}
c = TestClient(fa)
r = c.get("/mcp-deribit/health", headers={"Authorization": "Bearer wrong"})
assert r.status_code == 401
def test_testnet_token_sets_env_testnet():
from cerbero_mcp.auth import install_auth_middleware
from fastapi import Request
fa = FastAPI()
install_auth_middleware(fa, testnet_token="tk_test", mainnet_token="tk_live")
@fa.get("/mcp-deribit/peek")
def peek(request: Request):
return {"env": request.state.environment}
c = TestClient(fa)
r = c.get("/mcp-deribit/peek", headers={"Authorization": "Bearer tk_test"})
assert r.status_code == 200
assert r.json() == {"env": "testnet"}
def test_mainnet_token_sets_env_mainnet():
from cerbero_mcp.auth import install_auth_middleware
from fastapi import Request
fa = FastAPI()
install_auth_middleware(fa, testnet_token="tk_test", mainnet_token="tk_live")
@fa.get("/mcp-deribit/peek")
def peek(request: Request):
return {"env": request.state.environment}
c = TestClient(fa)
r = c.get("/mcp-deribit/peek", headers={"Authorization": "Bearer tk_live"})
assert r.status_code == 200
assert r.json() == {"env": "mainnet"}
def test_uses_compare_digest():
"""Verifica che _check_token usi secrets.compare_digest (timing-safe)."""
import inspect
from cerbero_mcp import auth
src = inspect.getsource(auth)
assert "compare_digest" in src, "auth.py deve usare secrets.compare_digest"