feat(V2): cabla audit logging nei write endpoint dei 4 router exchange

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
AdrianoDev
2026-05-01 08:44:28 +02:00
parent 43bf8fc461
commit bd6b03ce43
9 changed files with 442 additions and 37 deletions
+94
View File
@@ -0,0 +1,94 @@
"""Helper per cablare audit_write_op nei router.
Pattern uso nel router::
@r.post("/tools/place_order")
async def _place_order(
params: t.PlaceOrderReq,
request: Request,
client: DeribitClient = Depends(get_deribit_client),
):
return await audit_call(
request=request,
exchange="deribit",
action="place_order",
target_field="instrument_name",
params=params,
tool_fn=lambda: t.place_order(client, params, creds=...),
)
"""
from __future__ import annotations
from collections.abc import Awaitable, Callable
from typing import Any
from fastapi import Request
from pydantic import BaseModel
from cerbero_mcp.common.audit import audit_write_op
def _extract_target(params: BaseModel | None, target_field: str | None) -> str | None:
if params is None or target_field is None:
return None
val = getattr(params, target_field, None)
if val is None:
return None
return str(val)
def _safe_dump(params: BaseModel | None) -> dict[str, Any]:
if params is None:
return {}
try:
return params.model_dump(mode="json", exclude_none=True)
except Exception:
return {}
async def audit_call(
*,
request: Request,
exchange: str,
action: str,
tool_fn: Callable[[], Awaitable[Any]],
params: BaseModel | None = None,
target_field: str | None = None,
) -> Any:
"""Esegue tool_fn e logga audit (success o error). Riraisola eccezioni."""
actor = getattr(request.state, "environment", None)
target = _extract_target(params, target_field)
payload = _safe_dump(params)
try:
result = await tool_fn()
except Exception as e:
audit_write_op(
actor=actor,
action=action,
exchange=exchange,
target=target,
payload=payload,
error=f"{type(e).__name__}: {e}",
)
raise
# Se result è dict, passa raw; altrimenti tenta serializzazione
audit_result: dict[str, Any] | None = None
if isinstance(result, dict):
audit_result = result
elif hasattr(result, "model_dump"):
try:
audit_result = result.model_dump(mode="json")
except Exception:
audit_result = None
audit_write_op(
actor=actor,
action=action,
exchange=exchange,
target=target,
payload=payload,
result=audit_result,
)
return result