4d9db750be
- pyproject.toml: ruff target-version py311 → py313 (auto-fix 42 lint warnings via UP rules); aggiunto consider_namespace_packages = true che risolve la collisione conftest tra servizi e permette di lanciare pytest sull'intera suite cross-servizio. - mcp_common.audit: nuovo helper audit_write_op() con logger dedicato mcp.audit. Wirato su tutti i write endpoint di deribit, bybit, alpaca e hyperliquid (place_order, place_combo_order, cancel_*, set_*, close_*, transfer_*, switch_*, amend_*) con principal + target + payload non-sensibile + result summarizzato. - mcp_common.app_factory: ExchangeAppSpec + run_exchange_main() centralizza il boilerplate dei __main__.py (configure_root_logging, fail_fast_if_missing, summarize, load creds, resolve_environment, load token store, uvicorn). I 4 __main__.py exchange ridotti da ~60 LOC ognuno a ~25 LOC dichiarativi. mcp_common.env_validation promosso da mcp_deribit (mantenuto re-export shim per back-compat test_env_validation). - 8 test nuovi (4 audit + 4 app_factory). Suite full: 450/450 verdi. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
98 lines
2.8 KiB
Python
98 lines
2.8 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
import pytest
|
|
from mcp_common import audit as audit_mod
|
|
from mcp_common.audit import audit_write_op
|
|
from mcp_common.auth import Principal
|
|
|
|
|
|
@pytest.fixture
|
|
def captured_records(monkeypatch):
|
|
"""Cattura i record emessi dal logger mcp.audit (propagate=False blocca caplog).
|
|
|
|
Sostituisce il logger del modulo con uno che ha caplog attaccato.
|
|
"""
|
|
records: list[logging.LogRecord] = []
|
|
|
|
class ListHandler(logging.Handler):
|
|
def emit(self, record: logging.LogRecord) -> None:
|
|
records.append(record)
|
|
|
|
test_logger = logging.getLogger("mcp.audit.test")
|
|
test_logger.handlers.clear()
|
|
test_logger.addHandler(ListHandler())
|
|
test_logger.setLevel(logging.DEBUG)
|
|
test_logger.propagate = False
|
|
monkeypatch.setattr(audit_mod, "_logger", test_logger)
|
|
return records
|
|
|
|
|
|
def test_audit_write_op_emits_structured_record(captured_records):
|
|
p = Principal("core", {"core"})
|
|
audit_write_op(
|
|
principal=p,
|
|
action="place_order",
|
|
exchange="deribit",
|
|
target="BTC-PERPETUAL",
|
|
payload={"side": "buy", "amount": 10, "leverage": 3},
|
|
result={"order_id": "abc", "state": "open"},
|
|
)
|
|
assert len(captured_records) == 1
|
|
rec = captured_records[0]
|
|
assert rec.action == "place_order"
|
|
assert rec.exchange == "deribit"
|
|
assert rec.target == "BTC-PERPETUAL"
|
|
assert rec.principal == "core"
|
|
assert rec.payload == {"side": "buy", "amount": 10, "leverage": 3}
|
|
assert rec.result == {"order_id": "abc", "state": "open"}
|
|
|
|
|
|
def test_audit_write_op_error_uses_error_level(captured_records):
|
|
p = Principal("core", {"core"})
|
|
audit_write_op(
|
|
principal=p,
|
|
action="cancel_order",
|
|
exchange="bybit",
|
|
target="ord-123",
|
|
payload={},
|
|
error="not_found",
|
|
)
|
|
assert len(captured_records) == 1
|
|
rec = captured_records[0]
|
|
assert rec.levelname == "ERROR"
|
|
assert rec.error == "not_found"
|
|
|
|
|
|
def test_audit_write_op_summarizes_result_fields(captured_records):
|
|
p = Principal("core", {"core"})
|
|
big_result = {
|
|
"order_id": "ord-1",
|
|
"state": "submitted",
|
|
"extra_huge_field": "x" * 10000,
|
|
"orders": [{"id": 1}, {"id": 2}, {"id": 3}],
|
|
}
|
|
audit_write_op(
|
|
principal=p,
|
|
action="place_combo_order",
|
|
exchange="bybit",
|
|
payload={},
|
|
result=big_result,
|
|
)
|
|
rec = captured_records[0]
|
|
assert "extra_huge_field" not in rec.result
|
|
assert rec.result["order_id"] == "ord-1"
|
|
assert rec.result["orders_count"] == 3
|
|
|
|
|
|
def test_audit_write_op_no_principal(captured_records):
|
|
audit_write_op(
|
|
principal=None,
|
|
action="place_order",
|
|
exchange="alpaca",
|
|
payload={},
|
|
)
|
|
rec = captured_records[0]
|
|
assert rec.principal is None
|