from __future__ import annotations

import json
import os
import time
import uuid
from collections.abc import Callable
from contextlib import AbstractAsyncContextManager
from datetime import UTC, datetime
from typing import Any

from fastapi import FastAPI, HTTPException, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse, Response
from starlette.exceptions import HTTPException as StarletteHTTPException
from starlette.middleware.base import BaseHTTPMiddleware

from mcp_common.auth import TokenStore

Lifespan = Callable[[FastAPI], AbstractAsyncContextManager[None]]

# HTTP statuses for which the error envelope reports ``retryable: true``.
_RETRYABLE_STATUS = frozenset({408, 429, 502, 503, 504})

# Statuses (besides 1xx) whose responses must not carry a body.
_BODYLESS_STATUS = frozenset({204, 205, 304})

# Default envelope ``code`` per HTTP status; unknown statuses fall back to
# ``HTTP_<status>``.
_HTTP_ERROR_CODES: dict[int, str] = {
    400: "BAD_REQUEST",
    401: "UNAUTHORIZED",
    403: "FORBIDDEN",
    404: "NOT_FOUND",
    408: "TIMEOUT",
    409: "CONFLICT",
    422: "VALIDATION_ERROR",
    429: "RATE_LIMIT",
    500: "INTERNAL_ERROR",
    502: "UPSTREAM_ERROR",
    503: "UNAVAILABLE",
    504: "GATEWAY_TIMEOUT",
}


def _error_envelope(
    *,
    type_: str,
    code: str,
    message: str,
    retryable: bool,
    suggested_fix: str | None = None,
    details: dict | None = None,
    request_id: str | None = None,
) -> dict:
    """Build the JSON error envelope returned by the exception handlers.

    Args:
        type_: Error family, stored under ``error.type``.
        code: Machine-readable error code, stored under ``error.code``.
        message: Human-readable description.
        retryable: Whether the client may retry the request.
        suggested_fix: Optional hint; omitted from the envelope when falsy.
        details: Optional extra payload; omitted from the envelope when falsy.
        request_id: Correlation id; a random UUID4 hex string is generated
            when not supplied.

    Returns:
        A dict with ``error``, ``request_id`` and ``data_timestamp`` (current
        UTC time, ISO 8601) keys.
    """
    env: dict[str, Any] = {
        "error": {
            "type": type_,
            "code": code,
            "message": message,
            "retryable": retryable,
        },
        "request_id": request_id or uuid.uuid4().hex,
        "data_timestamp": datetime.now(UTC).isoformat(),
    }
    if suggested_fix:
        env["error"]["suggested_fix"] = suggested_fix
    if details:
        env["error"]["details"] = details
    return env


def _stamp_payload(data: Any, ts: str) -> bool:
    """Add ``data_timestamp`` to *data* in place; return True if it changed.

    A dict gains the key when missing. For a list, every dict item that lacks
    the key gains it. Any other payload is left untouched.
    """
    if isinstance(data, dict):
        if "data_timestamp" in data:
            return False
        data["data_timestamp"] = ts
        return True
    if not isinstance(data, list):
        return False
    changed = False
    for item in data:
        if isinstance(item, dict) and "data_timestamp" not in item:
            item["data_timestamp"] = ts
            changed = True
    return changed


class _TimestampInjectorMiddleware(BaseHTTPMiddleware):
    """CER-P5-001: inject ``data_timestamp`` into tool responses.

    Only responses for paths starting with ``/tools/`` whose content type
    contains ``application/json`` are touched; everything else (e.g. /health,
    /mcp, non-JSON responses) is returned unchanged.

    - Dict body: gains ``data_timestamp`` if missing.
    - List body: every dict item gains ``data_timestamp`` if missing.
    - The ``X-Data-Timestamp`` header is set on every processed response,
      which also covers lists of primitives and unparseable bodies.
    """

    async def dispatch(self, request: Request, call_next) -> Response:
        """Run the downstream app and stamp eligible JSON tool responses."""
        response = await call_next(request)
        if not request.url.path.startswith("/tools/"):
            return response
        if "application/json" not in response.headers.get("content-type", ""):
            return response

        # Drain the streamed body in one pass; join once instead of repeated
        # ``bytes +=`` which copies the accumulated buffer on every chunk.
        body = b"".join([chunk async for chunk in response.body_iterator])

        ts = datetime.now(UTC).isoformat()
        # NOTE(review): dict() keeps a single value per header name, so
        # repeated headers (e.g. several Set-Cookie) collapse to one here.
        headers = dict(response.headers)
        headers["X-Data-Timestamp"] = ts

        try:
            data = json.loads(body) if body else None
        except (ValueError, RecursionError):
            # Body claims to be JSON but cannot be decoded/parsed: pass it
            # through untouched, only adding the header.
            return Response(
                content=body,
                status_code=response.status_code,
                headers=headers,
                media_type=response.media_type,
            )

        if _stamp_payload(data, ts):
            # The re-serialized body has a different size; drop the stale
            # length so the new Response computes its own.
            headers.pop("content-length", None)
            return Response(
                content=json.dumps(data, default=str).encode(),
                status_code=response.status_code,
                headers=headers,
                media_type="application/json",
            )

        return Response(
            content=body,
            status_code=response.status_code,
            headers=headers,
            media_type=response.media_type,
        )


def build_app(
    *,
    name: str,
    version: str,
    token_store: TokenStore,
    lifespan: Lifespan | None = None,
) -> FastAPI:
    """Create the FastAPI application with shared middleware and handlers.

    The app gets: the timestamp-injector middleware, an ``X-Duration-Ms``
    latency header, error-envelope exception handlers, and a ``/health``
    endpoint. ``root_path`` is read from the ``ROOT_PATH`` environment
    variable (empty string when unset).

    Args:
        name: Application title, also reported by ``/health``.
        version: Application version, also reported by ``/health``.
        token_store: Stored on ``app.state.token_store``.
        lifespan: Optional lifespan context manager factory passed to FastAPI.

    Returns:
        The configured ``FastAPI`` instance.
    """
    root_path = os.getenv("ROOT_PATH", "")
    app = FastAPI(title=name, version=version, root_path=root_path, lifespan=lifespan)
    app.state.token_store = token_store
    app.state.boot_at = time.time()

    app.add_middleware(_TimestampInjectorMiddleware)

    @app.middleware("http")
    async def _latency_header(request: Request, call_next):
        """Report the downstream handling time in ``X-Duration-Ms``."""
        t0 = time.perf_counter()
        response = await call_next(request)
        dur_ms = (time.perf_counter() - t0) * 1000
        response.headers["X-Duration-Ms"] = f"{dur_ms:.2f}"
        return response

    # CER-P5-002 error envelope: global exception handlers.
    #
    # Registered on Starlette's HTTPException (the base class of FastAPI's)
    # so that errors raised by Starlette itself, such as 404 for an unknown
    # route or 405, are wrapped in the same envelope.
    @app.exception_handler(StarletteHTTPException)
    async def _http_exc(request: Request, exc: StarletteHTTPException):
        """Convert an HTTPException into the standard error envelope."""
        # Keep headers attached to the exception (e.g. WWW-Authenticate on
        # 401, Retry-After on 429/503) instead of silently dropping them.
        exc_headers = getattr(exc, "headers", None)

        # These statuses must not carry a body; send headers only.
        if exc.status_code < 200 or exc.status_code in _BODYLESS_STATUS:
            return Response(status_code=exc.status_code, headers=exc_headers)

        code = _HTTP_ERROR_CODES.get(exc.status_code, f"HTTP_{exc.status_code}")
        message = "HTTP error"
        details: dict | None = None

        detail = exc.detail
        # Preserve rail-style detail {"error": "..", "message": ".."}: the
        # "error" string becomes the envelope code.
        if isinstance(detail, dict):
            if isinstance(detail.get("error"), str):
                code = detail["error"].upper()
            message = str(detail.get("message") or detail.get("error") or message)
            details = detail
        elif isinstance(detail, str):
            message = detail

        return JSONResponse(
            status_code=exc.status_code,
            content=_error_envelope(
                type_="http_error",
                code=code,
                message=message,
                retryable=exc.status_code in _RETRYABLE_STATUS,
                details=details,
            ),
            headers=exc_headers,
        )

    @app.exception_handler(RequestValidationError)
    async def _validation_exc(request: Request, exc: RequestValidationError):
        """Convert a request validation failure into a 422 error envelope."""
        errs = exc.errors()
        first_loc = ".".join(str(x) for x in errs[0]["loc"]) if errs else "body"
        suggestion = (
            f"check field '{first_loc}': "
            + (errs[0]["msg"] if errs else "invalid input")
        )

        # Sanitize ctx values: pydantic v2 may put a ValueError in
        # ctx['error'], which is not JSON serializable. Reduce them to
        # strings. Only the first five errors are reported.
        safe_errs: list[dict] = []
        for err in errs[:5]:
            cleaned: dict = {}
            for key, value in err.items():
                if key == "ctx" and isinstance(value, dict):
                    cleaned[key] = {ck: str(cv) for ck, cv in value.items()}
                else:
                    cleaned[key] = value
            safe_errs.append(cleaned)

        return JSONResponse(
            status_code=422,
            content=_error_envelope(
                type_="validation_error",
                code="INVALID_INPUT",
                message=f"request body validation failed on {first_loc}",
                retryable=False,
                suggested_fix=suggestion,
                details={"errors": safe_errs},
            ),
        )

    @app.exception_handler(Exception)
    async def _unhandled(request: Request, exc: Exception):
        """Convert any other exception into a 500 error envelope."""
        # NOTE(review): the exception text (truncated to 300 chars) is sent
        # to the client; confirm this cannot expose sensitive internals.
        return JSONResponse(
            status_code=500,
            content=_error_envelope(
                type_="internal_error",
                code="UNHANDLED_EXCEPTION",
                message=f"{type(exc).__name__}: {str(exc)[:300]}",
                retryable=True,
            ),
        )

    @app.get("/health")
    def health():
        """Return liveness info: name, version, uptime and a timestamp."""
        return {
            "status": "healthy",
            "name": name,
            "version": version,
            "uptime_seconds": int(time.time() - app.state.boot_at),
            "data_timestamp": datetime.now(UTC).isoformat(),
        }

    return app