from __future__ import annotations

import json
from pathlib import Path

import pytest
from fastapi.testclient import TestClient


@pytest.fixture
def tmp_audit_file(tmp_path, monkeypatch):
    """Point ``AUDIT_LOG_FILE`` at a per-test path and return that path.

    The file itself is not created here; tests that need content write it
    with ``_write_records``.
    """
    file_path = tmp_path / "audit.jsonl"
    monkeypatch.setenv("AUDIT_LOG_FILE", str(file_path))
    return file_path


@pytest.fixture
def app(monkeypatch, tmp_audit_file):
    """Build the application under test from a minimal environment.

    Every key/value returned by ``_minimal_env()`` is exported through
    ``monkeypatch`` before ``Settings()`` is instantiated.
    """
    from tests.unit.test_settings import _minimal_env

    for k, v in _minimal_env().items():
        monkeypatch.setenv(k, v)

    # NOTE(review): these imports are deliberately kept inside the fixture,
    # presumably so the project modules load only after the environment has
    # been patched -- confirm before hoisting them to module level.
    from cerbero_mcp.__main__ import _make_app
    from cerbero_mcp.settings import Settings

    return _make_app(Settings())


def _write_records(file_path: Path, records: list[dict]) -> None:
    """Write ``records`` to ``file_path`` as JSON Lines, replacing any content.

    An empty ``records`` list produces an existing but empty file.
    """
    # Explicit encoding keeps the fixture file independent of the platform
    # locale (json.dumps emits ASCII by default, so content is unchanged).
    with file_path.open("w", encoding="utf-8") as f:
        for r in records:
            f.write(json.dumps(r) + "\n")


def _bearer_test():
    """Return the Authorization header carrying the test bearer token."""
    return {"Authorization": "Bearer t_test_123"}


def test_admin_audit_no_file(app, monkeypatch):
    """Without AUDIT_LOG_FILE set, the endpoint returns empty + warning."""
    # monkeypatch.delenv (instead of touching os.environ directly) makes the
    # removal tracked and reverted by pytest at teardown.
    monkeypatch.delenv("AUDIT_LOG_FILE", raising=False)
    c = TestClient(app)
    r = c.get("/admin/audit", headers=_bearer_test())
    assert r.status_code == 200
    body = r.json()
    assert body["count"] == 0
    assert "warning" in body


def test_admin_audit_no_bearer_returns_401(app):
    """A request without an Authorization header is rejected with 401."""
    c = TestClient(app)
    r = c.get("/admin/audit")
    assert r.status_code == 401


def test_admin_audit_no_bot_tag_required(app, tmp_audit_file):
    """The admin endpoint does NOT require X-Bot-Tag (bearer only)."""
    _write_records(tmp_audit_file, [])
    c = TestClient(app)
    r = c.get("/admin/audit", headers=_bearer_test())
    assert r.status_code == 200


def test_admin_audit_returns_records(app, tmp_audit_file):
    """With no filters, both written records are counted."""
    records = [
        {
            "audit_event": "write_op",
            "asctime": "2026-05-01 10:00:00,000",
            "actor": "testnet",
            "bot_tag": "alpha",
            "exchange": "deribit",
            "action": "place_order",
            "target": "BTC-PERPETUAL",
            "payload": {"qty": 0.1},
            "result": {"order_id": "abc"},
        },
        {
            "audit_event": "write_op",
            "asctime": "2026-05-01 11:00:00,000",
            "actor": "mainnet",
            "bot_tag": "beta",
            "exchange": "bybit",
            "action": "cancel_order",
            "target": "ord-1",
            "payload": {},
        },
    ]
    _write_records(tmp_audit_file, records)
    c = TestClient(app)
    r = c.get("/admin/audit", headers=_bearer_test())
    assert r.status_code == 200
    body = r.json()
    assert body["count"] == 2


def test_admin_audit_filter_by_actor(app, tmp_audit_file):
    """``?actor=mainnet`` returns only the record whose actor is mainnet."""
    records = [
        {
            "audit_event": "write_op",
            "asctime": "2026-05-01 10:00:00,000",
            "actor": "testnet",
            "bot_tag": "a",
            "exchange": "deribit",
            "action": "place_order",
        },
        {
            "audit_event": "write_op",
            "asctime": "2026-05-01 11:00:00,000",
            "actor": "mainnet",
            "bot_tag": "b",
            "exchange": "bybit",
            "action": "place_order",
        },
    ]
    _write_records(tmp_audit_file, records)
    c = TestClient(app)
    r = c.get("/admin/audit?actor=mainnet", headers=_bearer_test())
    assert r.status_code == 200
    body = r.json()
    assert body["count"] == 1
    assert body["records"][0]["actor"] == "mainnet"


def test_admin_audit_filter_by_date_range(app, tmp_audit_file):
    """A from/to window covering 2026-05-01 matches one of three records."""
    records = [
        {
            "audit_event": "write_op",
            "asctime": "2026-04-30 10:00:00,000",
            "actor": "testnet",
            "exchange": "deribit",
            "action": "place_order",
        },
        {
            "audit_event": "write_op",
            "asctime": "2026-05-01 10:00:00,000",
            "actor": "testnet",
            "exchange": "deribit",
            "action": "place_order",
        },
        {
            "audit_event": "write_op",
            "asctime": "2026-05-02 10:00:00,000",
            "actor": "testnet",
            "exchange": "deribit",
            "action": "place_order",
        },
    ]
    _write_records(tmp_audit_file, records)
    c = TestClient(app)
    r = c.get(
        "/admin/audit?from=2026-05-01&to=2026-05-01T23:59:59",
        headers=_bearer_test(),
    )
    assert r.status_code == 200
    assert r.json()["count"] == 1


def test_admin_audit_filter_by_bot_tag(app, tmp_audit_file):
    """``?bot_tag=alpha`` returns only the record tagged alpha."""
    records = [
        {
            "audit_event": "write_op",
            "asctime": "2026-05-01 10:00:00,000",
            "actor": "testnet",
            "bot_tag": "alpha",
            "exchange": "deribit",
            "action": "place_order",
        },
        {
            "audit_event": "write_op",
            "asctime": "2026-05-01 11:00:00,000",
            "actor": "testnet",
            "bot_tag": "beta",
            "exchange": "deribit",
            "action": "place_order",
        },
    ]
    _write_records(tmp_audit_file, records)
    c = TestClient(app)
    r = c.get("/admin/audit?bot_tag=alpha", headers=_bearer_test())
    assert r.status_code == 200
    # Decode the response once and reuse it for both assertions.
    body = r.json()
    assert body["count"] == 1
    assert body["records"][0]["bot_tag"] == "alpha"


def test_admin_audit_invalid_date(app, tmp_audit_file):
    """An unparseable ``from`` value is rejected with 400."""
    _write_records(tmp_audit_file, [])
    c = TestClient(app)
    r = c.get("/admin/audit?from=not-a-date", headers=_bearer_test())
    assert r.status_code == 400


def test_admin_audit_limit(app, tmp_audit_file):
    """``?limit=10`` caps the result at 10 of the 50 written records."""
    records = [
        {
            "audit_event": "write_op",
            "asctime": f"2026-05-01 10:{i:02d}:00,000",
            "actor": "testnet",
            "exchange": "deribit",
            "action": "place_order",
        }
        for i in range(50)
    ]
    _write_records(tmp_audit_file, records)
    c = TestClient(app)
    r = c.get("/admin/audit?limit=10", headers=_bearer_test())
    assert r.status_code == 200
    assert r.json()["count"] == 10