feat(V2): build_app con swagger /apidocs + middleware + handlers
Aggiunge /docs e /redoc alla whitelist auth (path disabilitati, nessun rischio sicurezza). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -9,7 +9,7 @@ from fastapi.responses import JSONResponse
|
||||
|
||||
Environment = Literal["testnet", "mainnet"]
|
||||
|
||||
WHITELIST_PATHS = frozenset({"/health", "/apidocs", "/openapi.json"})
|
||||
WHITELIST_PATHS = frozenset({"/health", "/apidocs", "/openapi.json", "/docs", "/redoc"})
|
||||
|
||||
|
||||
def _extract_bearer(auth_header: str) -> str | None:
|
||||
|
||||
@@ -0,0 +1,205 @@
|
||||
"""Factory FastAPI app con middleware, swagger, exception handlers."""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import time
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
from fastapi import FastAPI, HTTPException, Request
|
||||
from fastapi.exceptions import RequestValidationError
|
||||
from fastapi.openapi.utils import get_openapi
|
||||
from fastapi.responses import JSONResponse, Response
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
|
||||
from cerbero_mcp.auth import install_auth_middleware
|
||||
from cerbero_mcp.common.errors import (
|
||||
HTTP_CODE_MAP,
|
||||
RETRYABLE_STATUSES,
|
||||
error_envelope,
|
||||
)
|
||||
|
||||
|
||||
class _TimestampInjectorMiddleware(BaseHTTPMiddleware):
|
||||
async def dispatch(self, request: Request, call_next):
|
||||
response = await call_next(request)
|
||||
path = request.url.path
|
||||
if "/tools/" not in path:
|
||||
return response
|
||||
ctype = response.headers.get("content-type", "")
|
||||
if "application/json" not in ctype:
|
||||
return response
|
||||
body = b""
|
||||
async for chunk in response.body_iterator:
|
||||
body += chunk
|
||||
ts = datetime.now(UTC).isoformat()
|
||||
try:
|
||||
data = json.loads(body) if body else None
|
||||
except Exception:
|
||||
headers = dict(response.headers)
|
||||
headers["X-Data-Timestamp"] = ts
|
||||
return Response(
|
||||
content=body, status_code=response.status_code,
|
||||
headers=headers, media_type=response.media_type,
|
||||
)
|
||||
modified = False
|
||||
if isinstance(data, dict) and "data_timestamp" not in data:
|
||||
data["data_timestamp"] = ts
|
||||
modified = True
|
||||
elif isinstance(data, list):
|
||||
for item in data:
|
||||
if isinstance(item, dict) and "data_timestamp" not in item:
|
||||
item["data_timestamp"] = ts
|
||||
modified = True
|
||||
headers = dict(response.headers)
|
||||
headers["X-Data-Timestamp"] = ts
|
||||
if modified:
|
||||
new_body = json.dumps(data, default=str).encode()
|
||||
headers.pop("content-length", None)
|
||||
return Response(
|
||||
content=new_body, status_code=response.status_code,
|
||||
headers=headers, media_type="application/json",
|
||||
)
|
||||
return Response(
|
||||
content=body, status_code=response.status_code,
|
||||
headers=headers, media_type=response.media_type,
|
||||
)
|
||||
|
||||
|
||||
def build_app(
|
||||
*,
|
||||
testnet_token: str,
|
||||
mainnet_token: str,
|
||||
title: str = "Cerbero MCP",
|
||||
version: str = "2.0.0",
|
||||
description: str = (
|
||||
"Multi-exchange MCP server. "
|
||||
"Bearer token decides environment (testnet/mainnet)."
|
||||
),
|
||||
) -> FastAPI:
|
||||
app = FastAPI(
|
||||
title=title,
|
||||
version=version,
|
||||
description=description,
|
||||
docs_url="/apidocs",
|
||||
redoc_url=None,
|
||||
openapi_url="/openapi.json",
|
||||
swagger_ui_parameters={
|
||||
"persistAuthorization": True,
|
||||
"displayRequestDuration": True,
|
||||
"filter": True,
|
||||
"tryItOutEnabled": True,
|
||||
"tagsSorter": "alpha",
|
||||
"operationsSorter": "alpha",
|
||||
},
|
||||
)
|
||||
app.state.boot_at = time.time()
|
||||
|
||||
install_auth_middleware(
|
||||
app, testnet_token=testnet_token, mainnet_token=mainnet_token
|
||||
)
|
||||
|
||||
app.add_middleware(_TimestampInjectorMiddleware)
|
||||
|
||||
@app.middleware("http")
|
||||
async def latency_header(request: Request, call_next):
|
||||
t0 = time.perf_counter()
|
||||
response = await call_next(request)
|
||||
dur_ms = (time.perf_counter() - t0) * 1000
|
||||
response.headers["X-Duration-Ms"] = f"{dur_ms:.2f}"
|
||||
return response
|
||||
|
||||
@app.exception_handler(HTTPException)
|
||||
async def _http_exc(request: Request, exc: HTTPException):
|
||||
retryable = exc.status_code in RETRYABLE_STATUSES
|
||||
code = HTTP_CODE_MAP.get(exc.status_code, f"HTTP_{exc.status_code}")
|
||||
message = "HTTP error"
|
||||
details: dict | None = None
|
||||
detail = exc.detail
|
||||
if isinstance(detail, dict):
|
||||
if isinstance(detail.get("error"), str):
|
||||
code = detail["error"].upper()
|
||||
message = str(detail.get("message") or detail.get("error") or message)
|
||||
details = detail
|
||||
elif isinstance(detail, str):
|
||||
message = detail
|
||||
return JSONResponse(
|
||||
status_code=exc.status_code,
|
||||
content=error_envelope(
|
||||
type_="http_error", code=code, message=message,
|
||||
retryable=retryable, details=details,
|
||||
),
|
||||
)
|
||||
|
||||
@app.exception_handler(RequestValidationError)
|
||||
async def _val_exc(request: Request, exc: RequestValidationError):
|
||||
errs = exc.errors()
|
||||
first_loc = ".".join(str(x) for x in errs[0]["loc"]) if errs else "body"
|
||||
suggestion = (
|
||||
f"check field '{first_loc}': "
|
||||
+ (errs[0]["msg"] if errs else "invalid input")
|
||||
)
|
||||
safe_errs: list[dict] = []
|
||||
for e in errs[:5]:
|
||||
ne: dict = {}
|
||||
for k, v in e.items():
|
||||
if k == "ctx" and isinstance(v, dict):
|
||||
ne[k] = {ck: str(cv) for ck, cv in v.items()}
|
||||
else:
|
||||
ne[k] = v
|
||||
safe_errs.append(ne)
|
||||
return JSONResponse(
|
||||
status_code=422,
|
||||
content=error_envelope(
|
||||
type_="validation_error", code="INVALID_INPUT",
|
||||
message=f"request body validation failed on {first_loc}",
|
||||
retryable=False, suggested_fix=suggestion,
|
||||
details={"errors": safe_errs},
|
||||
),
|
||||
)
|
||||
|
||||
@app.exception_handler(Exception)
|
||||
async def _unhandled(request: Request, exc: Exception):
|
||||
return JSONResponse(
|
||||
status_code=500,
|
||||
content=error_envelope(
|
||||
type_="internal_error", code="UNHANDLED_EXCEPTION",
|
||||
message=f"{type(exc).__name__}: {str(exc)[:300]}",
|
||||
retryable=True,
|
||||
),
|
||||
)
|
||||
|
||||
@app.get("/health", tags=["system"])
|
||||
def health():
|
||||
return {
|
||||
"status": "healthy",
|
||||
"name": title,
|
||||
"version": version,
|
||||
"uptime_seconds": int(time.time() - app.state.boot_at),
|
||||
"data_timestamp": datetime.now(UTC).isoformat(),
|
||||
}
|
||||
|
||||
def _custom_openapi() -> dict[str, Any]:
|
||||
if app.openapi_schema:
|
||||
return app.openapi_schema
|
||||
schema = get_openapi(
|
||||
title=app.title, version=app.version,
|
||||
description=app.description, routes=app.routes,
|
||||
)
|
||||
schema.setdefault("components", {})
|
||||
schema["components"]["securitySchemes"] = {
|
||||
"BearerAuth": {
|
||||
"type": "http",
|
||||
"scheme": "bearer",
|
||||
"description": (
|
||||
"Use TESTNET_TOKEN for testnet routing, "
|
||||
"MAINNET_TOKEN for mainnet."
|
||||
),
|
||||
}
|
||||
}
|
||||
schema["security"] = [{"BearerAuth": []}]
|
||||
app.openapi_schema = schema
|
||||
return schema
|
||||
|
||||
app.openapi = _custom_openapi
|
||||
return app
|
||||
@@ -0,0 +1,62 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app():
|
||||
from cerbero_mcp.server import build_app
|
||||
return build_app(
|
||||
testnet_token="tk_test",
|
||||
mainnet_token="tk_live",
|
||||
title="Test",
|
||||
version="2.0.0",
|
||||
)
|
||||
|
||||
|
||||
def test_apidocs_served(app):
|
||||
c = TestClient(app)
|
||||
r = c.get("/apidocs")
|
||||
assert r.status_code == 200
|
||||
assert "swagger" in r.text.lower()
|
||||
|
||||
|
||||
def test_openapi_json_served(app):
|
||||
c = TestClient(app)
|
||||
r = c.get("/openapi.json")
|
||||
assert r.status_code == 200
|
||||
spec = r.json()
|
||||
assert spec["info"]["title"] == "Test"
|
||||
# securityScheme BearerAuth presente
|
||||
assert "BearerAuth" in spec["components"]["securitySchemes"]
|
||||
assert spec["components"]["securitySchemes"]["BearerAuth"]["scheme"] == "bearer"
|
||||
|
||||
|
||||
def test_redoc_disabled(app):
|
||||
c = TestClient(app)
|
||||
r = c.get("/redoc")
|
||||
assert r.status_code == 404
|
||||
|
||||
|
||||
def test_default_docs_path_disabled(app):
|
||||
c = TestClient(app)
|
||||
r = c.get("/docs")
|
||||
assert r.status_code == 404
|
||||
|
||||
|
||||
def test_health_endpoint(app):
|
||||
c = TestClient(app)
|
||||
r = c.get("/health")
|
||||
assert r.status_code == 200
|
||||
j = r.json()
|
||||
assert j["status"] == "healthy"
|
||||
assert j["version"] == "2.0.0"
|
||||
assert "uptime_seconds" in j
|
||||
assert "data_timestamp" in j
|
||||
|
||||
|
||||
def test_x_duration_ms_header(app):
|
||||
c = TestClient(app)
|
||||
r = c.get("/health")
|
||||
assert "X-Duration-Ms" in r.headers
|
||||
Reference in New Issue
Block a user