feat(V2): migrazione common/ (indicators, options, microstructure, stats, http, audit, logging, mcp_bridge + auth)
This commit is contained in:
@@ -0,0 +1,121 @@
|
||||
"""Audit log strutturato per write endpoint MCP (place_order, cancel,
|
||||
set_*, close_*, transfer_*). Usa un logger dedicato `mcp.audit` su stream
|
||||
JSON.
|
||||
|
||||
Sink:
|
||||
- stdout/stderr (sempre): tramite root JSON logger configurato da
|
||||
`mcp_common.logging.configure_root_logging`.
|
||||
- File JSONL persistente (opzionale): se env var `AUDIT_LOG_FILE` è
|
||||
settata, aggiunge un `TimedRotatingFileHandler` che ruota a mezzanotte
|
||||
con `AUDIT_LOG_BACKUP_DAYS` di retention (default 30). Una riga JSON
|
||||
per record (formato `.jsonl`).
|
||||
|
||||
Per VPS produzione: setta `AUDIT_LOG_FILE=/var/log/cerbero-mcp/<service>.audit.jsonl`
|
||||
con bind mount del volume `/var/log/cerbero-mcp` nel docker-compose.
|
||||
|
||||
Payload sensibile (api_key, secret) già filtrato dal SecretsFilter
|
||||
globale; qui non si include creds.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
from logging.handlers import TimedRotatingFileHandler
|
||||
from typing import Any
|
||||
|
||||
from cerbero_mcp.common.auth import Principal
|
||||
from cerbero_mcp.common.logging import SecretsFilter, get_json_logger
|
||||
|
||||
try:
|
||||
from pythonjsonlogger.json import JsonFormatter as _JsonFormatter # noqa: N813
|
||||
except ImportError:
|
||||
from pythonjsonlogger.jsonlogger import JsonFormatter as _JsonFormatter # noqa: N813
|
||||
|
||||
_logger = get_json_logger("mcp.audit", level=logging.INFO)
|
||||
_file_handler_attached = False
|
||||
|
||||
|
||||
def _configure_audit_sink() -> None:
|
||||
"""Aggiunge FileHandler al logger mcp.audit se AUDIT_LOG_FILE è settato.
|
||||
Idempotente: chiamato la prima volta da audit_write_op, poi no-op.
|
||||
"""
|
||||
global _file_handler_attached
|
||||
if _file_handler_attached:
|
||||
return
|
||||
|
||||
file_path = os.environ.get("AUDIT_LOG_FILE", "").strip()
|
||||
if not file_path:
|
||||
_file_handler_attached = True
|
||||
return
|
||||
|
||||
backup_days = int(os.environ.get("AUDIT_LOG_BACKUP_DAYS", "30"))
|
||||
|
||||
os.makedirs(os.path.dirname(file_path) or ".", exist_ok=True)
|
||||
handler = TimedRotatingFileHandler(
|
||||
file_path,
|
||||
when="midnight",
|
||||
interval=1,
|
||||
backupCount=backup_days,
|
||||
encoding="utf-8",
|
||||
utc=True,
|
||||
)
|
||||
handler.setFormatter(_JsonFormatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
|
||||
handler.addFilter(SecretsFilter())
|
||||
_logger.addHandler(handler)
|
||||
_file_handler_attached = True
|
||||
|
||||
|
||||
def audit_write_op(
|
||||
*,
|
||||
principal: Principal | None,
|
||||
action: str,
|
||||
exchange: str,
|
||||
target: str | None = None,
|
||||
payload: dict[str, Any] | None = None,
|
||||
result: dict[str, Any] | None = None,
|
||||
error: str | None = None,
|
||||
) -> None:
|
||||
"""Emit a structured audit log record per write operation.
|
||||
|
||||
principal: chi ha invocato (None se anonimo, ma normalmente _check
|
||||
impedisce di arrivare qui senza principal).
|
||||
action: nome del tool (es. "place_order", "cancel_order").
|
||||
exchange: identificatore servizio (deribit, bybit, alpaca, hyperliquid).
|
||||
target: instrument/symbol/order_id su cui si agisce.
|
||||
payload: input non-sensibile (qty, side, leverage, ecc.).
|
||||
result: output del client (order_id, status, ecc.).
|
||||
error: stringa errore se l'operazione ha fallito.
|
||||
"""
|
||||
_configure_audit_sink()
|
||||
record: dict[str, Any] = {
|
||||
"audit_event": "write_op",
|
||||
"action": action,
|
||||
"exchange": exchange,
|
||||
"principal": principal.name if principal else None,
|
||||
"target": target,
|
||||
"payload": payload or {},
|
||||
}
|
||||
if result is not None:
|
||||
record["result"] = _summarize_result(result)
|
||||
if error is not None:
|
||||
record["error"] = error
|
||||
_logger.error("audit", extra=record)
|
||||
else:
|
||||
_logger.info("audit", extra=record)
|
||||
|
||||
|
||||
def _summarize_result(result: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Estrae i campi rilevanti dal result (order_id, state, error code)
|
||||
per evitare di loggare payload enormi.
|
||||
"""
|
||||
keys = (
|
||||
"order_id", "order_link_id", "combo_instrument", "state", "status",
|
||||
"code", "error", "stop_price", "tp_price", "transfer_id",
|
||||
)
|
||||
out: dict[str, Any] = {}
|
||||
for k in keys:
|
||||
if k in result:
|
||||
out[k] = result[k]
|
||||
if "orders" in result:
|
||||
out["orders_count"] = len(result["orders"])
|
||||
return out
|
||||
Reference in New Issue
Block a user