993326136b
Copiati e aggiornati i test da services/common/tests/ a tests/unit/common/. Import aggiornati da mcp_common a cerbero_mcp.common. Eliminati test di funzionalità V1-only (app_factory, environment, auth/Principal, server_base). Refactored test_audit.py (principal→actor str) e test_mcp_bridge.py (TokenStore→valid_tokens set). 71/71 test passano. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
152 lines
4.6 KiB
Python
152 lines
4.6 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
import pytest
|
|
from cerbero_mcp.common import audit as audit_mod
|
|
from cerbero_mcp.common.audit import audit_write_op
|
|
|
|
|
|
@pytest.fixture
def captured_records(monkeypatch):
    """Collect the log records emitted through the audit module's logger.

    The fixture swaps ``audit_mod._logger`` for a dedicated, non-propagating
    logger whose only handler appends every record to a list, and returns
    that list.  Per the original note, the real ``mcp.audit`` logger has
    ``propagate=False``, which prevents ``caplog`` from seeing its records.
    """
    collected: list[logging.LogRecord] = []

    class _CollectingHandler(logging.Handler):
        """Handler that stores each emitted record in ``collected``."""

        def emit(self, record: logging.LogRecord) -> None:
            collected.append(record)

    stand_in = logging.getLogger("mcp.audit.test")
    stand_in.propagate = False
    stand_in.setLevel(logging.DEBUG)
    # Drop handlers left over from an earlier use of this named logger.
    stand_in.handlers.clear()
    stand_in.addHandler(_CollectingHandler())

    # monkeypatch restores the module's original logger on teardown.
    monkeypatch.setattr(audit_mod, "_logger", stand_in)
    return collected
|
|
|
|
|
|
def test_audit_write_op_emits_structured_record(captured_records):
    """A successful write op emits one record exposing every audit field."""
    audit_write_op(
        actor="core",
        action="place_order",
        exchange="deribit",
        target="BTC-PERPETUAL",
        payload={"side": "buy", "amount": 10, "leverage": 3},
        result={"order_id": "abc", "state": "open"},
    )

    assert len(captured_records) == 1
    rec = captured_records[0]

    # Attribute name -> value expected on the emitted record.
    expected_fields = {
        "action": "place_order",
        "exchange": "deribit",
        "target": "BTC-PERPETUAL",
        "actor": "core",
        "payload": {"side": "buy", "amount": 10, "leverage": 3},
        "result": {"order_id": "abc", "state": "open"},
    }
    for field_name, expected_value in expected_fields.items():
        assert getattr(rec, field_name) == expected_value
|
|
|
|
|
|
def test_audit_write_op_error_uses_error_level(captured_records):
    """Passing ``error`` yields a single ERROR-level record carrying it."""
    call_kwargs = {
        "actor": "core",
        "action": "cancel_order",
        "exchange": "bybit",
        "target": "ord-123",
        "payload": {},
        "error": "not_found",
    }
    audit_write_op(**call_kwargs)

    assert len(captured_records) == 1
    failure_record = captured_records[0]
    assert failure_record.levelname == "ERROR"
    assert failure_record.error == "not_found"
|
|
|
|
|
|
def test_audit_write_op_summarizes_result_fields(captured_records):
    """An oversized result is condensed before it is attached to the record.

    The logged result must omit the huge extra field, keep ``order_id`` and
    report the number of entries in ``orders`` as ``orders_count``.
    """
    submitted_orders = [{"id": order_id} for order_id in (1, 2, 3)]

    audit_write_op(
        actor="core",
        action="place_combo_order",
        exchange="bybit",
        payload={},
        result={
            "order_id": "ord-1",
            "state": "submitted",
            "extra_huge_field": "x" * 10000,
            "orders": submitted_orders,
        },
    )

    logged_result = captured_records[0].result
    assert "extra_huge_field" not in logged_result
    assert logged_result["order_id"] == "ord-1"
    assert logged_result["orders_count"] == 3
|
|
|
|
|
|
def test_audit_write_op_no_actor(captured_records):
    """A call with ``actor=None`` logs a record whose ``actor`` is ``None``."""
    call_kwargs = {
        "actor": None,
        "action": "place_order",
        "exchange": "alpaca",
        "payload": {},
    }
    audit_write_op(**call_kwargs)

    first_record = captured_records[0]
    assert first_record.actor is None
|
|
|
|
|
|
def test_audit_write_op_writes_to_file_when_AUDIT_LOG_FILE_set(tmp_path, monkeypatch):
    """With AUDIT_LOG_FILE set, a JSON line is written to that file.

    The test points AUDIT_LOG_FILE at a file under ``tmp_path``, performs one
    audited write op and checks the last JSON line of the file.  All module
    state it touches (the idempotency flag and the rotating file handlers on
    the audit logger) is put back afterwards so other tests are unaffected.
    """
    import json
    from logging.handlers import TimedRotatingFileHandler

    audit_file = tmp_path / "audit.jsonl"
    monkeypatch.setenv("AUDIT_LOG_FILE", str(audit_file))
    # Reset the idempotency flag so the call below re-runs the file setup;
    # going through monkeypatch restores the previous value on teardown.
    monkeypatch.setattr(audit_mod, "_file_handler_attached", False, raising=False)

    # Detach pre-existing rotating file handlers (they may point at an older
    # file); remember them so they can be re-attached afterwards.
    detached = [
        h for h in audit_mod._logger.handlers
        if isinstance(h, TimedRotatingFileHandler)
    ]
    for h in detached:
        audit_mod._logger.removeHandler(h)

    try:
        audit_write_op(
            actor="core",
            action="place_order",
            exchange="bybit",
            target="BTCUSDT",
            payload={"side": "Buy", "qty": 0.01},
            result={"order_id": "abc123", "status": "submitted"},
        )

        # Force the handlers to flush before reading the file back.
        for h in audit_mod._logger.handlers:
            h.flush()

        assert audit_file.exists()
        content = audit_file.read_text().strip()
        assert content, "audit file empty"
        record = json.loads(content.splitlines()[-1])
        assert record["audit_event"] == "write_op"
        assert record["action"] == "place_order"
        assert record["exchange"] == "bybit"
        assert record["target"] == "BTCUSDT"
        assert record["actor"] == "core"
    finally:
        # Remove and close the rotating handlers attached during this test so
        # the temp file is not kept open and later audit writes do not land
        # in it, then re-attach the handlers that were detached above.
        for h in list(audit_mod._logger.handlers):
            if isinstance(h, TimedRotatingFileHandler):
                audit_mod._logger.removeHandler(h)
                h.close()
        for h in detached:
            audit_mod._logger.addHandler(h)
|
|
|
|
|
|
def test_audit_no_file_when_env_unset(tmp_path, monkeypatch):
    """Without AUDIT_LOG_FILE, no file is created in ``tmp_path``."""
    monkeypatch.delenv("AUDIT_LOG_FILE", raising=False)
    # Reset the idempotency flag so the call below re-runs its setup; going
    # through monkeypatch restores the previous value on teardown instead of
    # leaking the change into other tests.
    monkeypatch.setattr(audit_mod, "_file_handler_attached", False, raising=False)

    audit_write_op(
        actor="core",
        action="cancel_order",
        exchange="bybit",
        target="ord-1",
        payload={},
    )

    # No file was created in tmp_path.
    files = list(tmp_path.iterdir())
    assert files == []
|