Files
Cerbero-Bite/tests/unit/test_alert_manager.py
T
Adriano abf5a140e2 refactor: telegram + portfolio in-process (drop shared MCP)
Each bot now manages its own notification + portfolio aggregation:

* TelegramClient calls the public Bot API directly via httpx, reading
  CERBERO_BITE_TELEGRAM_BOT_TOKEN / CERBERO_BITE_TELEGRAM_CHAT_ID from
  env. No credentials → silent disabled mode.
* PortfolioClient composes DeribitClient + HyperliquidClient + the new
  MacroClient.get_asset_price/eur_usd_rate to expose equity (EUR) and
  per-asset exposure as the bot's own slice (no cross-bot view).
* mcp-telegram and mcp-portfolio removed from MCP_SERVICES / McpEndpoints
  and the cerbero-bite ping CLI; health_check no longer probes portfolio.

Docs (02/04/06/07) and docker-compose updated to reflect the new
architecture.

353/353 tests pass; ruff clean; mypy src clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 00:31:20 +02:00

123 lines
4.3 KiB
Python

"""Tests for AlertManager."""
from __future__ import annotations
import json
from datetime import UTC, datetime
from pathlib import Path
import pytest
from pytest_httpx import HTTPXMock
from cerbero_bite.clients.telegram import TelegramClient
from cerbero_bite.runtime.alert_manager import AlertManager, Severity
from cerbero_bite.safety import AuditLog, iter_entries
from cerbero_bite.safety.kill_switch import KillSwitch
from cerbero_bite.state import Repository, connect, run_migrations, transaction
SEND_URL = "https://api.telegram.org/botTOK/sendMessage"
def _make_alert_manager(tmp_path: Path) -> tuple[AlertManager, Path, Path, KillSwitch]:
db_path = tmp_path / "state.sqlite"
audit_path = tmp_path / "audit.log"
conn = connect(db_path)
run_migrations(conn)
repo = Repository()
with transaction(conn):
repo.init_system_state(
conn, config_version="1.0.0", now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC)
)
conn.close()
audit = AuditLog(audit_path)
times = iter(
datetime(2026, 4, 27, 14, m, tzinfo=UTC) for m in range(0, 50)
)
ks = KillSwitch(
connection_factory=lambda: connect(db_path),
repository=Repository(),
audit_log=audit,
clock=lambda: next(times),
)
telegram = TelegramClient(bot_token="TOK", chat_id="42")
return AlertManager(telegram=telegram, audit_log=audit, kill_switch=ks), audit_path, db_path, ks
@pytest.mark.asyncio
async def test_low_emits_audit_only(tmp_path: Path, httpx_mock: HTTPXMock) -> None:
am, audit_path, _, ks = _make_alert_manager(tmp_path)
await am.low(source="test", message="just info")
entries = list(iter_entries(audit_path))
assert len(entries) == 1
assert entries[0].event == "ALERT"
assert entries[0].payload["severity"] == "low"
assert ks.is_armed() is False
# No telegram call expected
assert httpx_mock.get_requests() == []
@pytest.mark.asyncio
async def test_medium_calls_telegram_notify(tmp_path: Path, httpx_mock: HTTPXMock) -> None:
httpx_mock.add_response(url=SEND_URL, json={"ok": True})
am, audit_path, _, ks = _make_alert_manager(tmp_path)
await am.medium(source="entry_cycle", message="snapshot delayed")
requests = httpx_mock.get_requests()
assert len(requests) == 1
body = json.loads(requests[0].read())
assert body["text"] == "[HIGH][entry_cycle] snapshot delayed"
assert ks.is_armed() is False
assert any(e.payload["severity"] == "medium" for e in iter_entries(audit_path))
@pytest.mark.asyncio
async def test_high_arms_kill_switch_and_calls_notify_alert(
tmp_path: Path, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_response(url=SEND_URL, json={"ok": True})
am, _, _, ks = _make_alert_manager(tmp_path)
await am.high(source="health", message="3 consecutive MCP failures")
body = json.loads(httpx_mock.get_request().read())
text = body["text"]
assert "ALERT [HIGH]" in text
assert "health" in text and "3 consecutive MCP failures" in text
assert ks.is_armed() is True
@pytest.mark.asyncio
async def test_critical_arms_kill_switch_and_calls_notify_system_error(
tmp_path: Path, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_response(url=SEND_URL, json={"ok": True})
am, _, _, ks = _make_alert_manager(tmp_path)
await am.critical(
source="audit_chain",
message="hash chain mismatch on line 142",
component="safety.audit_log",
)
body = json.loads(httpx_mock.get_request().read())
text = body["text"]
assert "SYSTEM ERROR [CRITICAL]" in text
assert "safety.audit_log" in text
assert ks.is_armed() is True
@pytest.mark.asyncio
async def test_critical_when_already_armed_is_idempotent(
tmp_path: Path, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_response(url=SEND_URL, json={"ok": True})
am, _, _, ks = _make_alert_manager(tmp_path)
ks.arm(reason="prior", source="manual")
assert ks.is_armed() is True
await am.critical(source="x", message="anomaly")
assert ks.is_armed() is True
@pytest.mark.asyncio
async def test_emit_with_severity_enum(tmp_path: Path, httpx_mock: HTTPXMock) -> None:
am, audit_path, _, _ks = _make_alert_manager(tmp_path)
await am.emit(Severity.LOW, source="t", message="m")
entries = list(iter_entries(audit_path))
assert entries[0].payload["severity"] == "low"