"""Tests for AlertManager.""" from __future__ import annotations import json from datetime import UTC, datetime from pathlib import Path import pytest from pytest_httpx import HTTPXMock from cerbero_bite.clients.telegram import TelegramClient from cerbero_bite.runtime.alert_manager import AlertManager, Severity from cerbero_bite.safety import AuditLog, iter_entries from cerbero_bite.safety.kill_switch import KillSwitch from cerbero_bite.state import Repository, connect, run_migrations, transaction SEND_URL = "https://api.telegram.org/botTOK/sendMessage" def _make_alert_manager(tmp_path: Path) -> tuple[AlertManager, Path, Path, KillSwitch]: db_path = tmp_path / "state.sqlite" audit_path = tmp_path / "audit.log" conn = connect(db_path) run_migrations(conn) repo = Repository() with transaction(conn): repo.init_system_state( conn, config_version="1.0.0", now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC) ) conn.close() audit = AuditLog(audit_path) times = iter( datetime(2026, 4, 27, 14, m, tzinfo=UTC) for m in range(0, 50) ) ks = KillSwitch( connection_factory=lambda: connect(db_path), repository=Repository(), audit_log=audit, clock=lambda: next(times), ) telegram = TelegramClient(bot_token="TOK", chat_id="42") return AlertManager(telegram=telegram, audit_log=audit, kill_switch=ks), audit_path, db_path, ks @pytest.mark.asyncio async def test_low_emits_audit_only(tmp_path: Path, httpx_mock: HTTPXMock) -> None: am, audit_path, _, ks = _make_alert_manager(tmp_path) await am.low(source="test", message="just info") entries = list(iter_entries(audit_path)) assert len(entries) == 1 assert entries[0].event == "ALERT" assert entries[0].payload["severity"] == "low" assert ks.is_armed() is False # No telegram call expected assert httpx_mock.get_requests() == [] @pytest.mark.asyncio async def test_medium_calls_telegram_notify(tmp_path: Path, httpx_mock: HTTPXMock) -> None: httpx_mock.add_response(url=SEND_URL, json={"ok": True}) am, audit_path, _, ks = _make_alert_manager(tmp_path) await am.medium(source="entry_cycle", message="snapshot delayed") requests = httpx_mock.get_requests() assert len(requests) == 1 body = json.loads(requests[0].read()) assert body["text"] == "[HIGH][entry_cycle] snapshot delayed" assert ks.is_armed() is False assert any(e.payload["severity"] == "medium" for e in iter_entries(audit_path)) @pytest.mark.asyncio async def test_high_arms_kill_switch_and_calls_notify_alert( tmp_path: Path, httpx_mock: HTTPXMock ) -> None: httpx_mock.add_response(url=SEND_URL, json={"ok": True}) am, _, _, ks = _make_alert_manager(tmp_path) await am.high(source="health", message="3 consecutive MCP failures") body = json.loads(httpx_mock.get_request().read()) text = body["text"] assert "ALERT [HIGH]" in text assert "health" in text and "3 consecutive MCP failures" in text assert ks.is_armed() is True @pytest.mark.asyncio async def test_critical_arms_kill_switch_and_calls_notify_system_error( tmp_path: Path, httpx_mock: HTTPXMock ) -> None: httpx_mock.add_response(url=SEND_URL, json={"ok": True}) am, _, _, ks = _make_alert_manager(tmp_path) await am.critical( source="audit_chain", message="hash chain mismatch on line 142", component="safety.audit_log", ) body = json.loads(httpx_mock.get_request().read()) text = body["text"] assert "SYSTEM ERROR [CRITICAL]" in text assert "safety.audit_log" in text assert ks.is_armed() is True @pytest.mark.asyncio async def test_critical_when_already_armed_is_idempotent( tmp_path: Path, httpx_mock: HTTPXMock ) -> None: httpx_mock.add_response(url=SEND_URL, json={"ok": True}) am, _, _, ks = _make_alert_manager(tmp_path) ks.arm(reason="prior", source="manual") assert ks.is_armed() is True await am.critical(source="x", message="anomaly") assert ks.is_armed() is True @pytest.mark.asyncio async def test_emit_with_severity_enum(tmp_path: Path, httpx_mock: HTTPXMock) -> None: am, audit_path, _, _ks = _make_alert_manager(tmp_path) await am.emit(Severity.LOW, source="t", message="m") entries = list(iter_entries(audit_path)) assert entries[0].payload["severity"] == "low"