a1110c8ecb
#2 Env switch safety: - mcp_common/environment.py: nuova consistency_check() che previene switch accidentali a mainnet. Solleva EnvironmentMismatchError se resolved=mainnet senza creds["environment"]="mainnet" esplicito, o se declared/resolved mismatch. Override via STRICT_MAINNET=false. - Wirato in app_factory.run_exchange_main al boot. - 6 nuovi test consistency. #3 Audit log persistence: - mcp_common/audit.py: TimedRotatingFileHandler aggiuntivo se env AUDIT_LOG_FILE settato. Rotation midnight UTC, retention 30gg default (AUDIT_LOG_BACKUP_DAYS). Format JSONL con SecretsFilter. - docker-compose.prod.yml: bind mount /var/log/cerbero-mcp + env AUDIT_LOG_FILE per i 4 servizi exchange (write endpoints). - 2 nuovi test file sink. #1 Deploy script: - scripts/deploy.sh: idempotente, fa docker login + clone/pull repo + copia secrets chmod 600 + crea .env + setup audit dir + pull image + up + smoke test pubblico HTTPS. - DEPLOYMENT.md aggiornato: sezioni 2 (script), 3 (safety mainnet), 4 (audit log query), renumber sezioni successive. Test: 488/488 verdi. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
155 lines
4.7 KiB
Python
155 lines
4.7 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
import pytest
|
|
from mcp_common import audit as audit_mod
|
|
from mcp_common.audit import audit_write_op
|
|
from mcp_common.auth import Principal
|
|
|
|
|
|
@pytest.fixture
def captured_records(monkeypatch):
    """Capture records emitted by the mcp.audit logger (propagate=False blocks caplog).

    Swaps the module's logger for a dedicated one whose only handler
    appends every record to a plain list the test can inspect.
    """
    collected: list[logging.LogRecord] = []

    class _CollectingHandler(logging.Handler):
        def emit(self, record: logging.LogRecord) -> None:
            collected.append(record)

    replacement = logging.getLogger("mcp.audit.test")
    replacement.handlers.clear()
    replacement.addHandler(_CollectingHandler())
    replacement.setLevel(logging.DEBUG)
    replacement.propagate = False
    monkeypatch.setattr(audit_mod, "_logger", replacement)
    return collected
|
|
|
|
|
|
def test_audit_write_op_emits_structured_record(captured_records):
    """A successful write op emits exactly one record carrying every audit field."""
    principal = Principal("core", {"core"})
    audit_write_op(
        principal=principal,
        action="place_order",
        exchange="deribit",
        target="BTC-PERPETUAL",
        payload={"side": "buy", "amount": 10, "leverage": 3},
        result={"order_id": "abc", "state": "open"},
    )

    assert len(captured_records) == 1
    record = captured_records[0]
    # Scalar attributes attached to the LogRecord.
    assert record.action == "place_order"
    assert record.exchange == "deribit"
    assert record.target == "BTC-PERPETUAL"
    assert record.principal == "core"
    # Structured payload/result survive intact.
    assert record.payload == {"side": "buy", "amount": 10, "leverage": 3}
    assert record.result == {"order_id": "abc", "state": "open"}
|
|
|
|
|
|
def test_audit_write_op_error_uses_error_level(captured_records):
    """Passing error= escalates the emitted record to ERROR level."""
    audit_write_op(
        principal=Principal("core", {"core"}),
        action="cancel_order",
        exchange="bybit",
        target="ord-123",
        payload={},
        error="not_found",
    )

    assert len(captured_records) == 1
    record = captured_records[0]
    assert record.levelname == "ERROR"
    assert record.error == "not_found"
|
|
|
|
|
|
def test_audit_write_op_summarizes_result_fields(captured_records):
    """Oversized result fields are dropped; list fields collapse to a count."""
    oversized = dict(
        order_id="ord-1",
        state="submitted",
        extra_huge_field="x" * 10000,
        orders=[{"id": 1}, {"id": 2}, {"id": 3}],
    )
    audit_write_op(
        principal=Principal("core", {"core"}),
        action="place_combo_order",
        exchange="bybit",
        payload={},
        result=oversized,
    )

    record = captured_records[0]
    # The huge field must not reach the audit record.
    assert "extra_huge_field" not in record.result
    assert record.result["order_id"] == "ord-1"
    # The list is summarized as a count, not stored verbatim.
    assert record.result["orders_count"] == 3
|
|
|
|
|
|
def test_audit_write_op_no_principal(captured_records):
    """principal=None is accepted and recorded as None."""
    audit_write_op(
        principal=None,
        action="place_order",
        exchange="alpaca",
        payload={},
    )

    record = captured_records[0]
    assert record.principal is None
|
|
|
|
|
|
def test_audit_write_op_writes_to_file_when_AUDIT_LOG_FILE_set(tmp_path, monkeypatch):
    """With AUDIT_LOG_FILE set, a JSON line appears in the file.

    Fixes over the previous version: the TimedRotatingFileHandler import was
    inside the cleanup loop (re-executed every iteration) and the module was
    re-imported even though it is already imported at file level.
    """
    import json
    from logging.handlers import TimedRotatingFileHandler

    audit_file = tmp_path / "audit.jsonl"
    monkeypatch.setenv("AUDIT_LOG_FILE", str(audit_file))
    # Reset the idempotency flag so this test re-runs the file-sink setup.
    audit_mod._file_handler_attached = False
    # Drop pre-existing file handlers (they may point at a stale file path).
    for h in list(audit_mod._logger.handlers):
        if isinstance(h, TimedRotatingFileHandler):
            audit_mod._logger.removeHandler(h)

    audit_write_op(
        principal=Principal("core", {"core"}),
        action="place_order",
        exchange="bybit",
        target="BTCUSDT",
        payload={"side": "Buy", "qty": 0.01},
        result={"order_id": "abc123", "status": "submitted"},
    )

    # Force a flush so the line is on disk before we read it back.
    for h in audit_mod._logger.handlers:
        h.flush()

    assert audit_file.exists()
    content = audit_file.read_text().strip()
    assert content, "audit file empty"
    record = json.loads(content.splitlines()[-1])
    assert record["audit_event"] == "write_op"
    assert record["action"] == "place_order"
    assert record["exchange"] == "bybit"
    assert record["target"] == "BTCUSDT"
    assert record["principal"] == "core"
|
|
|
|
|
|
def test_audit_no_file_when_env_unset(tmp_path, monkeypatch):
    """Without AUDIT_LOG_FILE, no audit file is created.

    Fixes over the previous version: removed the redundant re-import of
    audit_mod (already imported at file level) and added the same
    stale-handler cleanup the file-sink test performs, so a handler left
    over from an earlier test cannot affect this one.
    """
    from logging.handlers import TimedRotatingFileHandler

    monkeypatch.delenv("AUDIT_LOG_FILE", raising=False)
    # Reset the idempotency flag so setup would re-run if it were triggered.
    audit_mod._file_handler_attached = False
    # Remove any file handler attached by a previously-run test.
    for h in list(audit_mod._logger.handlers):
        if isinstance(h, TimedRotatingFileHandler):
            audit_mod._logger.removeHandler(h)

    audit_write_op(
        principal=Principal("core", {"core"}),
        action="cancel_order",
        exchange="bybit",
        target="ord-1",
        payload={},
    )

    # No file was created in tmp_path.
    assert list(tmp_path.iterdir()) == []
|