"""Bearer auth middleware: bearer token → request.state.environment. Inoltre richiede header `X-Bot-Tag` su tutte le chiamate non whitelisted, così che l'audit log identifichi il bot chiamante. """ from __future__ import annotations import secrets from typing import Literal from fastapi import FastAPI, Request, status from fastapi.responses import JSONResponse Environment = Literal["testnet", "mainnet"] # Path che bypassano sia bearer auth sia bot_tag check. PATH_WHITELIST_FULL = frozenset( { "/health", "/health/ready", "/apidocs", "/openapi.json", "/docs", "/redoc", } ) # Path che richiedono bearer ma NON il bot_tag (admin endpoint). PATH_WHITELIST_BOT_TAG_ONLY = frozenset({"/admin/audit"}) # Backward-compat alias (vecchi import). WHITELIST_PATHS = PATH_WHITELIST_FULL MAX_BOT_TAG_LEN = 64 def _extract_bearer(auth_header: str) -> str | None: if not auth_header.startswith("Bearer "): return None token = auth_header[len("Bearer "):].strip() return token or None def _check_token( candidate: str, testnet_token: str, mainnet_token: str ) -> Environment | None: if secrets.compare_digest(candidate, testnet_token): return "testnet" if secrets.compare_digest(candidate, mainnet_token): return "mainnet" return None def install_auth_middleware( app: FastAPI, *, testnet_token: str, mainnet_token: str, ) -> None: """Registra middleware di auth bearer + bot_tag sull'app FastAPI.""" @app.middleware("http") async def auth_middleware(request: Request, call_next): path = request.url.path # 1. Whitelist totale: nessun check. if path in PATH_WHITELIST_FULL: return await call_next(request) # 2. Bearer auth (sempre richiesto). token = _extract_bearer(request.headers.get("Authorization", "")) if token is None: return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"error": {"code": "UNAUTHORIZED", "message": "missing or malformed bearer token"}}, ) env = _check_token(token, testnet_token, mainnet_token) if env is None: return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"error": {"code": "UNAUTHORIZED", "message": "invalid token"}}, ) request.state.environment = env # 3. Whitelist parziale (admin): bearer ok, no bot_tag check. if path in PATH_WHITELIST_BOT_TAG_ONLY: return await call_next(request) # 4. X-Bot-Tag obbligatorio. raw_tag = request.headers.get("X-Bot-Tag", "") tag = raw_tag.strip() if raw_tag else "" if not tag: return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={"error": {"code": "BAD_REQUEST", "message": "missing X-Bot-Tag header"}}, ) if len(tag) > MAX_BOT_TAG_LEN: return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={"error": {"code": "BAD_REQUEST", "message": "X-Bot-Tag too long"}}, ) request.state.bot_tag = tag return await call_next(request)