feat(V2): X-Bot-Tag header obbligatorio + endpoint /admin/audit con filtri

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
AdrianoDev
2026-05-01 08:51:40 +02:00
parent bd6b03ce43
commit 69ac878893
10 changed files with 549 additions and 8 deletions
+155
View File
@@ -0,0 +1,155 @@
from __future__ import annotations
import json
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def tmp_audit_file(tmp_path, monkeypatch):
file_path = tmp_path / "audit.jsonl"
monkeypatch.setenv("AUDIT_LOG_FILE", str(file_path))
return file_path
@pytest.fixture
def app(monkeypatch, tmp_audit_file):
from tests.unit.test_settings import _minimal_env
for k, v in _minimal_env().items():
monkeypatch.setenv(k, v)
from cerbero_mcp.__main__ import _make_app
from cerbero_mcp.settings import Settings
return _make_app(Settings())
def _write_records(file_path: Path, records: list[dict]) -> None:
with file_path.open("w") as f:
for r in records:
f.write(json.dumps(r) + "\n")
def _bearer_test():
return {"Authorization": "Bearer t_test_123"}
def test_admin_audit_no_file(app):
"""Senza AUDIT_LOG_FILE settato, ritorna empty + warning."""
import os
os.environ.pop("AUDIT_LOG_FILE", None)
c = TestClient(app)
r = c.get("/admin/audit", headers=_bearer_test())
assert r.status_code == 200
body = r.json()
assert body["count"] == 0
assert "warning" in body
def test_admin_audit_no_bearer_returns_401(app):
c = TestClient(app)
r = c.get("/admin/audit")
assert r.status_code == 401
def test_admin_audit_no_bot_tag_required(app, tmp_audit_file):
"""Endpoint admin NON richiede X-Bot-Tag (solo bearer)."""
_write_records(tmp_audit_file, [])
c = TestClient(app)
r = c.get("/admin/audit", headers=_bearer_test())
assert r.status_code == 200
def test_admin_audit_returns_records(app, tmp_audit_file):
records = [
{
"audit_event": "write_op",
"asctime": "2026-05-01 10:00:00,000",
"actor": "testnet", "bot_tag": "alpha",
"exchange": "deribit", "action": "place_order",
"target": "BTC-PERPETUAL",
"payload": {"qty": 0.1},
"result": {"order_id": "abc"},
},
{
"audit_event": "write_op",
"asctime": "2026-05-01 11:00:00,000",
"actor": "mainnet", "bot_tag": "beta",
"exchange": "bybit", "action": "cancel_order",
"target": "ord-1",
"payload": {},
},
]
_write_records(tmp_audit_file, records)
c = TestClient(app)
r = c.get("/admin/audit", headers=_bearer_test())
assert r.status_code == 200
body = r.json()
assert body["count"] == 2
def test_admin_audit_filter_by_actor(app, tmp_audit_file):
records = [
{"audit_event": "write_op", "asctime": "2026-05-01 10:00:00,000",
"actor": "testnet", "bot_tag": "a", "exchange": "deribit", "action": "place_order"},
{"audit_event": "write_op", "asctime": "2026-05-01 11:00:00,000",
"actor": "mainnet", "bot_tag": "b", "exchange": "bybit", "action": "place_order"},
]
_write_records(tmp_audit_file, records)
c = TestClient(app)
r = c.get("/admin/audit?actor=mainnet", headers=_bearer_test())
assert r.status_code == 200
body = r.json()
assert body["count"] == 1
assert body["records"][0]["actor"] == "mainnet"
def test_admin_audit_filter_by_date_range(app, tmp_audit_file):
records = [
{"audit_event": "write_op", "asctime": "2026-04-30 10:00:00,000",
"actor": "testnet", "exchange": "deribit", "action": "place_order"},
{"audit_event": "write_op", "asctime": "2026-05-01 10:00:00,000",
"actor": "testnet", "exchange": "deribit", "action": "place_order"},
{"audit_event": "write_op", "asctime": "2026-05-02 10:00:00,000",
"actor": "testnet", "exchange": "deribit", "action": "place_order"},
]
_write_records(tmp_audit_file, records)
c = TestClient(app)
r = c.get("/admin/audit?from=2026-05-01&to=2026-05-01T23:59:59", headers=_bearer_test())
assert r.status_code == 200
assert r.json()["count"] == 1
def test_admin_audit_filter_by_bot_tag(app, tmp_audit_file):
records = [
{"audit_event": "write_op", "asctime": "2026-05-01 10:00:00,000",
"actor": "testnet", "bot_tag": "alpha", "exchange": "deribit", "action": "place_order"},
{"audit_event": "write_op", "asctime": "2026-05-01 11:00:00,000",
"actor": "testnet", "bot_tag": "beta", "exchange": "deribit", "action": "place_order"},
]
_write_records(tmp_audit_file, records)
c = TestClient(app)
r = c.get("/admin/audit?bot_tag=alpha", headers=_bearer_test())
assert r.status_code == 200
assert r.json()["count"] == 1
assert r.json()["records"][0]["bot_tag"] == "alpha"
def test_admin_audit_invalid_date(app, tmp_audit_file):
_write_records(tmp_audit_file, [])
c = TestClient(app)
r = c.get("/admin/audit?from=not-a-date", headers=_bearer_test())
assert r.status_code == 400
def test_admin_audit_limit(app, tmp_audit_file):
records = [
{"audit_event": "write_op", "asctime": f"2026-05-01 10:{i:02d}:00,000",
"actor": "testnet", "exchange": "deribit", "action": "place_order"}
for i in range(50)
]
_write_records(tmp_audit_file, records)
c = TestClient(app)
r = c.get("/admin/audit?limit=10", headers=_bearer_test())
assert r.status_code == 200
assert r.json()["count"] == 10