8ecc1a24a9
- /health/ready: ping di tutti i client (exchange, env) cached con timeout 2s, status ready|degraded|not_ready, opt-in 503 via READY_FAILS_ON_DEGRADED. - Middleware mcp.request: 1 riga JSON per HTTP request con request_id, method, path, status_code, duration_ms, actor, bot_tag, exchange, tool, client_ip, user_agent. - request_id propagato in request.state, audit log e error envelope per correlazione cross-cutting. - Aggiunto async health() come probe minimo a bybit/alpaca/macro/ sentiment/deribit (hyperliquid lo aveva già). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
129 lines
4.3 KiB
Python
129 lines
4.3 KiB
Python
"""Audit log strutturato per write endpoint MCP (place_order, cancel,
set_*, close_*, transfer_*). Usa un logger dedicato `mcp.audit` su stream
JSON.

Sink:
- stdout/stderr (sempre): tramite root JSON logger configurato da
  `mcp_common.logging.configure_root_logging`.
- File JSONL persistente (opzionale): se env var `AUDIT_LOG_FILE` è
  settata, aggiunge un `TimedRotatingFileHandler` che ruota a mezzanotte
  con `AUDIT_LOG_BACKUP_DAYS` di retention (default 30). Una riga JSON
  per record (formato `.jsonl`).

Per VPS produzione: setta `AUDIT_LOG_FILE=/var/log/cerbero-mcp/<service>.audit.jsonl`
con bind mount del volume `/var/log/cerbero-mcp` nel docker-compose.

Payload sensibile (api_key, secret) già filtrato dal SecretsFilter
globale; qui non si include creds.
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from logging.handlers import TimedRotatingFileHandler
|
|
from typing import Any
|
|
|
|
from cerbero_mcp.common.logging import SecretsFilter, get_json_logger
|
|
|
|
# python-json-logger moved JsonFormatter to `pythonjsonlogger.json` in newer
# releases while older ones only expose `pythonjsonlogger.jsonlogger`; try the
# new location first and fall back so both are supported.
# NOTE(review): presumably the split happened in python-json-logger 3.x — confirm.
try:
    from pythonjsonlogger.json import JsonFormatter as _JsonFormatter  # noqa: N813
except ImportError:
    from pythonjsonlogger.jsonlogger import JsonFormatter as _JsonFormatter  # noqa: N813

# Dedicated audit logger, emitting on the root JSON stream configuration.
_logger = get_json_logger("mcp.audit", level=logging.INFO)
# One-shot guard: the optional file sink is attached at most once per process.
_file_handler_attached = False
|
|
|
|
|
|
def _configure_audit_sink() -> None:
    """Attach a rotating file handler to the `mcp.audit` logger when the
    env var `AUDIT_LOG_FILE` is set.

    Idempotent: called lazily by audit_write_op on first use; subsequent
    calls are no-ops. If `AUDIT_LOG_FILE` is unset/empty, no file sink is
    added and the decision is cached (the env is not re-read later).
    """
    global _file_handler_attached
    if _file_handler_attached:
        return

    file_path = os.environ.get("AUDIT_LOG_FILE", "").strip()
    if not file_path:
        # No file sink requested: cache the decision so every audit call
        # does not re-read the environment.
        _file_handler_attached = True
        return

    # Defensive parse: a malformed AUDIT_LOG_BACKUP_DAYS must not raise out
    # of the write-operation path that triggered the audit log; fall back to
    # the documented default of 30 days of retention.
    try:
        backup_days = int(os.environ.get("AUDIT_LOG_BACKUP_DAYS", "30"))
    except ValueError:
        backup_days = 30

    os.makedirs(os.path.dirname(file_path) or ".", exist_ok=True)
    handler = TimedRotatingFileHandler(
        file_path,
        when="midnight",
        interval=1,
        backupCount=backup_days,
        encoding="utf-8",
        utc=True,
    )
    # One JSON object per line (.jsonl); same field layout as the stream sink.
    handler.setFormatter(_JsonFormatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
    # Never persist credentials even if they slip into `extra`.
    handler.addFilter(SecretsFilter())
    _logger.addHandler(handler)
    _file_handler_attached = True
|
|
|
|
|
|
def audit_write_op(
    *,
    actor: str | None = None,
    bot_tag: str | None = None,
    action: str,
    exchange: str,
    target: str | None = None,
    payload: dict[str, Any] | None = None,
    result: dict[str, Any] | None = None,
    error: str | None = None,
    request_id: str | None = None,
) -> None:
    """Emit a structured audit log record for one write operation.

    actor: identifier of the caller (e.g. "testnet", "mainnet", or None
        for anonymous logging).
    bot_tag: identifier of the calling bot (X-Bot-Tag header).
    action: tool name (e.g. "place_order", "cancel_order").
    exchange: service identifier (deribit, bybit, alpaca, hyperliquid).
    target: instrument/symbol/order_id being acted upon.
    payload: non-sensitive input (qty, side, leverage, etc.).
    result: client output (order_id, status, etc.).
    error: error string when the operation failed.
    request_id: id propagated from the request-log middleware, used to
        correlate audit records with request records.
    """
    _configure_audit_sink()

    entry: dict[str, Any] = {
        "audit_event": "write_op",
        "action": action,
        "exchange": exchange,
        "actor": actor,
        "bot_tag": bot_tag,
        "target": target,
        "payload": payload or {},
    }
    if request_id is not None:
        entry["request_id"] = request_id
    if result is not None:
        # Summarized to avoid logging huge client payloads.
        entry["result"] = _summarize_result(result)

    # Failures go out at ERROR so they stand out in the stream; successes at INFO.
    if error is not None:
        entry["error"] = error
        _logger.error("audit", extra=entry)
    else:
        _logger.info("audit", extra=entry)
|
|
|
|
|
|
def _summarize_result(result: dict[str, Any]) -> dict[str, Any]:
|
|
"""Estrae i campi rilevanti dal result (order_id, state, error code)
|
|
per evitare di loggare payload enormi.
|
|
"""
|
|
keys = (
|
|
"order_id", "order_link_id", "combo_instrument", "state", "status",
|
|
"code", "error", "stop_price", "tp_price", "transfer_id",
|
|
)
|
|
out: dict[str, Any] = {}
|
|
for k in keys:
|
|
if k in result:
|
|
out[k] = result[k]
|
|
if "orders" in result:
|
|
out["orders_count"] = len(result["orders"])
|
|
return out
|