Files
Cerbero-mcp/src/cerbero_mcp/auth.py
T
AdrianoDev 2a268b3a33 feat(V2): build_app con swagger /apidocs + middleware + handlers
Aggiunge /docs e /redoc alla whitelist auth (path disabilitati, nessun rischio sicurezza).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 18:20:17 +02:00

61 lines
1.9 KiB
Python

"""Bearer auth middleware: bearer token → request.state.environment."""
from __future__ import annotations
import secrets
from typing import Literal
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
Environment = Literal["testnet", "mainnet"]
WHITELIST_PATHS = frozenset({"/health", "/apidocs", "/openapi.json", "/docs", "/redoc"})
def _extract_bearer(auth_header: str) -> str | None:
if not auth_header.startswith("Bearer "):
return None
token = auth_header[len("Bearer "):].strip()
return token or None
def _check_token(
candidate: str, testnet_token: str, mainnet_token: str
) -> Environment | None:
if secrets.compare_digest(candidate, testnet_token):
return "testnet"
if secrets.compare_digest(candidate, mainnet_token):
return "mainnet"
return None
def install_auth_middleware(
app: FastAPI,
*,
testnet_token: str,
mainnet_token: str,
) -> None:
"""Registra middleware di auth bearer sull'app FastAPI."""
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
if request.url.path in WHITELIST_PATHS:
return await call_next(request)
token = _extract_bearer(request.headers.get("Authorization", ""))
if token is None:
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"error": {"code": "UNAUTHORIZED",
"message": "missing or malformed bearer token"}},
)
env = _check_token(token, testnet_token, mainnet_token)
if env is None:
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"error": {"code": "UNAUTHORIZED",
"message": "invalid token"}},
)
request.state.environment = env
return await call_next(request)