feat(safety+audit+deploy): consistency_check + audit log file sink + deploy script
ci / ruff lint (push) Failing after 12s
ci / mypy mcp_common (push) Successful in 25s
ci / pytest (push) Successful in 35s
ci / validate compose + Caddyfile (push) Successful in 2m3s
ci / build & push to registry (push) Has been skipped

#2 Env switch safety:
- mcp_common/environment.py: nuova consistency_check() che previene
  switch accidentali a mainnet. Solleva EnvironmentMismatchError se
  resolved=mainnet senza creds["environment"]="mainnet" esplicito,
  o se declared/resolved mismatch. Override via STRICT_MAINNET=false.
- Wirato in app_factory.run_exchange_main al boot.
- 6 nuovi test consistency.

#3 Audit log persistence:
- mcp_common/audit.py: TimedRotatingFileHandler aggiuntivo se env
  AUDIT_LOG_FILE settato. Rotation midnight UTC, retention 30gg
  default (AUDIT_LOG_BACKUP_DAYS). Format JSONL con SecretsFilter.
- docker-compose.prod.yml: bind mount /var/log/cerbero-mcp + env
  AUDIT_LOG_FILE per i 4 servizi exchange (write endpoints).
- 2 nuovi test file sink.

#1 Deploy script:
- scripts/deploy.sh: idempotente, fa docker login + clone/pull repo +
  copia secrets chmod 600 + crea .env + setup audit dir + pull image
  + up + smoke test pubblico HTTPS.
- DEPLOYMENT.md aggiornato: sezioni 2 (script), 3 (safety mainnet),
  4 (audit log query), renumber sezioni successive.

Test: 488/488 verdi.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
AdrianoDev
2026-04-29 09:29:04 +02:00
parent 019b7e3298
commit a1110c8ecb
9 changed files with 573 additions and 17 deletions
+10 -1
View File
@@ -24,7 +24,11 @@ import uvicorn
from mcp_common.auth import load_token_store_from_files
from mcp_common.env_validation import fail_fast_if_missing, require_env, summarize
from mcp_common.environment import EnvironmentInfo, resolve_environment
from mcp_common.environment import (
EnvironmentInfo,
consistency_check,
resolve_environment,
)
from mcp_common.logging import configure_root_logging
@@ -69,6 +73,11 @@ def run_exchange_main(spec: ExchangeAppSpec) -> None:
default_base_url_testnet=spec.default_base_url_testnet,
)
# Safety: previene switch accidentali a mainnet senza conferma esplicita
# nel secret. Solleva EnvironmentMismatchError → boot abort se mismatch.
strict_mainnet = os.environ.get("STRICT_MAINNET", "true").lower() not in ("0", "false", "no")
consistency_check(env_info, creds, strict_mainnet=strict_mainnet)
client = spec.build_client(creds, env_info)
token_store = load_token_store_from_files(
+54 -7
View File
@@ -1,22 +1,68 @@
"""Audit log strutturato per write endpoint MCP (place_order, cancel,
set_*, close_*, transfer_*). Usa un logger dedicato `mcp.audit` su stream
JSON: in deployment può essere redirezionato a file/syslog/SIEM separato.
JSON.
Logica:
- `audit_write_op(principal, action, exchange, target, payload, result)`
emette UN record JSON per ogni operazione con esito (ok/error).
- Payload sensibile (api_key, secret) già filtrato dal SecretsFilter
globale; qui non si include creds.
Sink:
- stdout/stderr (sempre): tramite root JSON logger configurato da
`mcp_common.logging.configure_root_logging`.
- File JSONL persistente (opzionale): se env var `AUDIT_LOG_FILE` è
settata, aggiunge un `TimedRotatingFileHandler` che ruota a mezzanotte
con `AUDIT_LOG_BACKUP_DAYS` di retention (default 30). Una riga JSON
per record (formato `.jsonl`).
Per VPS produzione: setta `AUDIT_LOG_FILE=/var/log/cerbero-mcp/<service>.audit.jsonl`
con bind mount del volume `/var/log/cerbero-mcp` nel docker-compose.
Payload sensibile (api_key, secret) già filtrato dal SecretsFilter
globale; qui non si include creds.
"""
from __future__ import annotations
import logging
import os
from logging.handlers import TimedRotatingFileHandler
from typing import Any
from mcp_common.auth import Principal
from mcp_common.logging import get_json_logger
from mcp_common.logging import SecretsFilter, get_json_logger
try:
from pythonjsonlogger.json import JsonFormatter as _JsonFormatter # noqa: N813
except ImportError:
from pythonjsonlogger.jsonlogger import JsonFormatter as _JsonFormatter # noqa: N813
_logger = get_json_logger("mcp.audit", level=logging.INFO)
_file_handler_attached = False
def _configure_audit_sink() -> None:
"""Aggiunge FileHandler al logger mcp.audit se AUDIT_LOG_FILE è settato.
Idempotente: chiamato la prima volta da audit_write_op, poi no-op.
"""
global _file_handler_attached
if _file_handler_attached:
return
file_path = os.environ.get("AUDIT_LOG_FILE", "").strip()
if not file_path:
_file_handler_attached = True
return
backup_days = int(os.environ.get("AUDIT_LOG_BACKUP_DAYS", "30"))
os.makedirs(os.path.dirname(file_path) or ".", exist_ok=True)
handler = TimedRotatingFileHandler(
file_path,
when="midnight",
interval=1,
backupCount=backup_days,
encoding="utf-8",
utc=True,
)
handler.setFormatter(_JsonFormatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
handler.addFilter(SecretsFilter())
_logger.addHandler(handler)
_file_handler_attached = True
def audit_write_op(
@@ -40,6 +86,7 @@ def audit_write_op(
result: output del client (order_id, status, ecc.).
error: stringa errore se l'operazione ha fallito.
"""
_configure_audit_sink()
record: dict[str, Any] = {
"audit_event": "write_op",
"action": action,
@@ -1,18 +1,31 @@
"""Resolver di ambiente (testnet/mainnet) per MCP exchange.
Precedenza: env var > campo secret > default True (testnet).
Safety: `consistency_check` previene switch accidentali a mainnet senza
conferma esplicita nel secret JSON.
"""
from __future__ import annotations
import logging
import os
from dataclasses import dataclass
from typing import Literal
logger = logging.getLogger(__name__)
Environment = Literal["testnet", "mainnet"]
Source = Literal["env", "credentials", "default"]
TRUTHY = {"1", "true", "yes", "on"}
# Tokens nel base_url che indicano endpoint testnet (case-insensitive).
TESTNET_URL_HINTS = ("test", "testnet", "paper")
class EnvironmentMismatchError(RuntimeError):
"""Boot abort: ambiente risolto non matcha conferma esplicita nel secret."""
@dataclass(frozen=True)
class EnvironmentInfo:
@@ -67,3 +80,59 @@ def resolve_environment(
env_value=env_value,
base_url=base_url,
)
def consistency_check(
env_info: EnvironmentInfo,
creds: dict,
*,
strict_mainnet: bool = True,
) -> list[str]:
"""Verifica coerenza environment risolto vs secret JSON. Restituisce
lista di warning string. Solleva EnvironmentMismatchError per mismatch
bloccanti.
Regole:
- Se `creds["environment"]` è presente e DIVERSO da `env_info.environment`:
→ raise EnvironmentMismatchError (declared vs resolved mismatch).
- Se `env_info.environment == "mainnet"` e `creds.get("environment") !=
"mainnet"`: con `strict_mainnet=True` → raise (richiede conferma
esplicita). Con `strict_mainnet=False` → warning.
- Se `env_info.base_url` contiene token testnet ("test", "testnet",
"paper") ma `env_info.environment == "mainnet"` (o viceversa): warning
(URL/environment incoerenti).
"""
warnings: list[str] = []
declared = creds.get("environment")
if declared and declared != env_info.environment:
raise EnvironmentMismatchError(
f"{env_info.exchange}: secret declared environment={declared!r} "
f"but resolver resolved environment={env_info.environment!r}"
)
if env_info.environment == "mainnet" and declared != "mainnet":
msg = (
f"{env_info.exchange}: resolved mainnet without explicit confirmation "
"in secret. Add `\"environment\": \"mainnet\"` to the credentials JSON."
)
if strict_mainnet:
raise EnvironmentMismatchError(msg)
warnings.append(msg)
url_lower = (env_info.base_url or "").lower()
has_test_hint = any(token in url_lower for token in TESTNET_URL_HINTS)
if env_info.environment == "mainnet" and has_test_hint:
warnings.append(
f"{env_info.exchange}: environment=mainnet but base_url contains "
f"testnet hint ({env_info.base_url!r})"
)
if env_info.environment == "testnet" and not has_test_hint and url_lower:
warnings.append(
f"{env_info.exchange}: environment=testnet but base_url does not "
f"appear to be a testnet endpoint ({env_info.base_url!r})"
)
for w in warnings:
logger.warning("environment consistency: %s", w)
return warnings
+32 -1
View File
@@ -3,6 +3,8 @@ from __future__ import annotations
import json
from unittest.mock import MagicMock, patch
import pytest
from mcp_common.app_factory import ExchangeAppSpec, run_exchange_main
from mcp_common.environment import EnvironmentInfo
@@ -75,7 +77,9 @@ def test_run_exchange_main_uses_default_port(tmp_path, monkeypatch):
def test_run_exchange_main_env_var_overrides_creds(tmp_path, monkeypatch):
creds_file = tmp_path / "creds.json"
creds_file.write_text(json.dumps({"testnet": True}))
# `environment: mainnet` esplicito perché env var override → mainnet
# e consistency_check richiede conferma per evitare switch accidentale.
creds_file.write_text(json.dumps({"testnet": True, "environment": "mainnet"}))
monkeypatch.setenv("TESTEX_CREDENTIALS_FILE", str(creds_file))
monkeypatch.setenv("TESTEX_TESTNET", "false")
@@ -95,6 +99,33 @@ def test_run_exchange_main_env_var_overrides_creds(tmp_path, monkeypatch):
assert captured["env_info"].source == "env"
def test_run_exchange_main_aborts_on_mainnet_without_confirmation(tmp_path, monkeypatch):
"""Mainnet senza creds['environment']='mainnet' → boot abort fail-fast."""
from mcp_common.environment import EnvironmentMismatchError
creds_file = tmp_path / "creds.json"
creds_file.write_text(json.dumps({"testnet": False}))
monkeypatch.setenv("TESTEX_CREDENTIALS_FILE", str(creds_file))
monkeypatch.delenv("TESTEX_TESTNET", raising=False)
monkeypatch.delenv("STRICT_MAINNET", raising=False)
spec = _make_spec()
with pytest.raises(EnvironmentMismatchError):
with patch("mcp_common.app_factory.uvicorn.run"):
run_exchange_main(spec)
def test_run_exchange_main_strict_mainnet_disabled_via_env(tmp_path, monkeypatch):
"""STRICT_MAINNET=false permette mainnet senza conferma (warning soltanto)."""
creds_file = tmp_path / "creds.json"
creds_file.write_text(json.dumps({"testnet": False}))
monkeypatch.setenv("TESTEX_CREDENTIALS_FILE", str(creds_file))
monkeypatch.setenv("STRICT_MAINNET", "false")
spec = _make_spec()
with patch("mcp_common.app_factory.uvicorn.run"):
run_exchange_main(spec) # non solleva
def test_run_exchange_main_missing_creds_file_exits(monkeypatch):
monkeypatch.delenv("TESTEX_CREDENTIALS_FILE", raising=False)
+57
View File
@@ -95,3 +95,60 @@ def test_audit_write_op_no_principal(captured_records):
)
rec = captured_records[0]
assert rec.principal is None
def test_audit_write_op_writes_to_file_when_AUDIT_LOG_FILE_set(tmp_path, monkeypatch):
"""Con env AUDIT_LOG_FILE settato, una riga JSON appare nel file."""
import json
from mcp_common import audit as audit_mod
audit_file = tmp_path / "audit.jsonl"
monkeypatch.setenv("AUDIT_LOG_FILE", str(audit_file))
# Reset state idempotency flag così il test riesegue setup
audit_mod._file_handler_attached = False
# Pulisci handlers preesistenti dal logger (potrebbe avere file vecchio)
for h in list(audit_mod._logger.handlers):
from logging.handlers import TimedRotatingFileHandler
if isinstance(h, TimedRotatingFileHandler):
audit_mod._logger.removeHandler(h)
audit_write_op(
principal=Principal("core", {"core"}),
action="place_order",
exchange="bybit",
target="BTCUSDT",
payload={"side": "Buy", "qty": 0.01},
result={"order_id": "abc123", "status": "submitted"},
)
# Forza flush dei file handler
for h in audit_mod._logger.handlers:
h.flush()
assert audit_file.exists()
content = audit_file.read_text().strip()
assert content, "audit file empty"
record = json.loads(content.splitlines()[-1])
assert record["audit_event"] == "write_op"
assert record["action"] == "place_order"
assert record["exchange"] == "bybit"
assert record["target"] == "BTCUSDT"
assert record["principal"] == "core"
def test_audit_no_file_when_env_unset(tmp_path, monkeypatch):
"""Senza AUDIT_LOG_FILE, nessun file viene creato."""
from mcp_common import audit as audit_mod
monkeypatch.delenv("AUDIT_LOG_FILE", raising=False)
audit_mod._file_handler_attached = False
audit_write_op(
principal=Principal("core", {"core"}),
action="cancel_order",
exchange="bybit",
target="ord-1",
payload={},
)
# Niente file creato in tmp_path
files = list(tmp_path.iterdir())
assert files == []
+74 -1
View File
@@ -1,7 +1,11 @@
from __future__ import annotations
import pytest
from mcp_common.environment import resolve_environment
from mcp_common.environment import (
EnvironmentMismatchError,
consistency_check,
resolve_environment,
)
def test_env_var_overrides_secret(monkeypatch):
@@ -114,3 +118,72 @@ def test_alpaca_paper_flag_key(monkeypatch):
)
assert info.environment == "mainnet"
assert info.source == "credentials"
# ───────── consistency_check ─────────
def _info(env: str, exchange: str = "deribit") -> "EnvironmentInfo":
"""Helper costruisce EnvironmentInfo per test."""
from mcp_common.environment import EnvironmentInfo
return EnvironmentInfo(
exchange=exchange,
environment=env,
source="env",
env_value="false" if env == "mainnet" else "true",
base_url=f"https://api.{exchange}.com" if env == "mainnet" else f"https://test.{exchange}.com",
)
def test_consistency_check_testnet_no_confirmation_ok():
"""Testnet senza conferma esplicita → ok, ritorna []. Default safe."""
info = _info("testnet")
creds = {"api_key": "k", "api_secret": "s"}
warnings = consistency_check(info, creds)
assert warnings == []
def test_consistency_check_mainnet_no_confirmation_raises():
"""Mainnet senza creds['environment']='mainnet' esplicito → fail-fast."""
info = _info("mainnet")
creds = {"api_key": "k", "api_secret": "s"}
with pytest.raises(EnvironmentMismatchError, match="mainnet.*explicit confirmation"):
consistency_check(info, creds)
def test_consistency_check_mainnet_with_confirmation_ok():
info = _info("mainnet")
creds = {"api_key": "k", "api_secret": "s", "environment": "mainnet"}
warnings = consistency_check(info, creds)
assert warnings == []
def test_consistency_check_explicit_mismatch_raises():
"""Secret dichiara mainnet ma resolver risolve testnet → fail-fast."""
info = _info("testnet")
creds = {"environment": "mainnet"}
with pytest.raises(EnvironmentMismatchError, match="declared.*resolved"):
consistency_check(info, creds)
def test_consistency_check_strict_mainnet_disabled():
"""Con strict_mainnet=False mainnet senza conferma logga warning ma non raise."""
info = _info("mainnet")
creds = {"api_key": "k", "api_secret": "s"}
warnings = consistency_check(info, creds, strict_mainnet=False)
assert any("mainnet" in w for w in warnings)
def test_consistency_check_url_does_not_match_environment_warns():
"""Base URL contiene 'test' ma environment='mainnet' → warning."""
from mcp_common.environment import EnvironmentInfo
info = EnvironmentInfo(
exchange="bybit",
environment="mainnet",
source="env",
env_value="false",
base_url="https://api-testnet.bybit.com", # url DICE testnet ma resolver MAINNET
)
creds = {"environment": "mainnet"}
warnings = consistency_check(info, creds)
assert any("base_url" in w.lower() for w in warnings)