Files
Cerbero-Bite/src/cerbero_bite/runtime/alert_manager.py
T
Adriano abf5a140e2 refactor: telegram + portfolio in-process (drop shared MCP)
Each bot now manages its own notification + portfolio aggregation:

* TelegramClient calls the public Bot API directly via httpx, reading
  CERBERO_BITE_TELEGRAM_BOT_TOKEN / CERBERO_BITE_TELEGRAM_CHAT_ID from
  env. No credentials → silent disabled mode.
* PortfolioClient composes DeribitClient + HyperliquidClient + the new
  MacroClient.get_asset_price/eur_usd_rate to expose equity (EUR) and
  per-asset exposure as the bot's own slice (no cross-bot view).
* mcp-telegram and mcp-portfolio removed from MCP_SERVICES / McpEndpoints
  and the cerbero-bite ping CLI; health_check no longer probes portfolio.

Docs (02/04/06/07) and docker-compose updated to reflect the new
architecture.

353/353 tests pass; ruff clean; mypy src clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 00:31:20 +02:00

110 lines
3.3 KiB
Python

"""Severity-based alert dispatcher (``docs/07-risk-controls.md``).
Routes anomalies to the right notification channel and, for HIGH and
CRITICAL events, arms the kill switch.

* ``LOW`` — append to audit log (plus an INFO log line); no notification.
* ``MEDIUM`` — audit + ``telegram.notify`` (priority="high").
* ``HIGH`` — audit + ``telegram.notify_alert`` (priority="high")
  + arm kill switch.
* ``CRITICAL`` — audit + ``telegram.notify_system_error``
  (priority="critical") + arm kill switch (already armed → idempotent).
"""
from __future__ import annotations
import logging
from enum import StrEnum
from cerbero_bite.clients.telegram import TelegramClient
from cerbero_bite.safety.audit_log import AuditLog
from cerbero_bite.safety.kill_switch import KillSwitch
# Names exported by ``from ... import *``; also documents the public API.
__all__ = ["AlertManager", "Severity"]
# Module-level logger. The name is spelled out explicitly rather than
# derived from ``__name__``.
_log = logging.getLogger("cerbero_bite.runtime.alert_manager")
class Severity(StrEnum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class AlertManager:
    """Dispatcher used by every runtime module to surface anomalies.

    Every alert is appended to the audit log first; what happens next
    depends on the severity:

    * ``LOW`` — an INFO log line, no notification.
    * ``MEDIUM`` — ``telegram.notify`` with ``priority="high"``.
    * ``HIGH`` — arm the kill switch, then ``telegram.notify_alert``
      with ``priority="high"``.
    * ``CRITICAL`` — arm the kill switch, then
      ``telegram.notify_system_error`` with ``priority="critical"``.
    """

    def __init__(
        self,
        *,
        telegram: TelegramClient,
        audit_log: AuditLog,
        kill_switch: KillSwitch,
    ) -> None:
        """Store the collaborators used to notify, record and halt.

        Args:
            telegram: Client used to send notifications.
            audit_log: Log that receives one ``ALERT`` entry per alert.
            kill_switch: Switch armed on HIGH and CRITICAL alerts.
        """
        self._telegram = telegram
        self._audit = audit_log
        self._kill = kill_switch

    async def emit(
        self,
        severity: Severity,
        *,
        source: str,
        message: str,
        component: str | None = None,
    ) -> None:
        """Emit an alert at the given severity level.

        Args:
            severity: Level that selects the notification channel and
                whether the kill switch is armed.
            source: Origin of the alert; used as the Telegram tag /
                source and as the kill-switch ``source``.
            message: Human-readable description; also used as the
                kill-switch ``reason``.
            component: Optional component name. Always recorded in the
                audit payload; forwarded to Telegram only for CRITICAL.

        Raises:
            Exception: Nothing is caught here. Errors from the audit
                log, the kill switch or the Telegram client propagate.
                For HIGH/CRITICAL the notification is still attempted
                when arming fails; if both fail, the notification error
                propagates with the arming error as its context.
        """
        # The audit entry is written unconditionally and before any
        # side effect, so the alert is on record even if a later step fails.
        self._audit.append(
            event="ALERT",
            payload={
                "severity": severity.value,
                "source": source,
                "message": message,
                "component": component,
            },
        )

        if severity == Severity.LOW:
            _log.info("alert.low source=%s message=%s", source, message)
            return

        if severity == Severity.MEDIUM:
            # The TelegramClient already prefixes [PRIORITY][tag] in the
            # rendered text, so we pass the raw message and let the
            # client compose the final form.
            await self._telegram.notify(message, priority="high", tag=source)
            return

        # HIGH and CRITICAL both arm the kill switch. Arm *before*
        # awaiting Telegram: a failed or cancelled notification must not
        # leave the switch un-armed, and other tasks must not keep
        # running un-halted while the notification call is pending.
        # The ``finally`` keeps the notification attempt even when
        # arming itself raises.
        try:
            self._kill.arm(reason=message, source=source)
        finally:
            if severity == Severity.HIGH:
                await self._telegram.notify_alert(
                    source=source, message=message, priority="high"
                )
            else:
                # CRITICAL
                await self._telegram.notify_system_error(
                    message=message, component=component, priority="critical"
                )

    async def low(
        self, *, source: str, message: str, component: str | None = None
    ) -> None:
        """Shorthand for ``emit(Severity.LOW, ...)``."""
        await self.emit(
            Severity.LOW, source=source, message=message, component=component
        )

    async def medium(
        self, *, source: str, message: str, component: str | None = None
    ) -> None:
        """Shorthand for ``emit(Severity.MEDIUM, ...)``."""
        await self.emit(
            Severity.MEDIUM, source=source, message=message, component=component
        )

    async def high(
        self, *, source: str, message: str, component: str | None = None
    ) -> None:
        """Shorthand for ``emit(Severity.HIGH, ...)``."""
        await self.emit(
            Severity.HIGH, source=source, message=message, component=component
        )

    async def critical(
        self, *, source: str, message: str, component: str | None = None
    ) -> None:
        """Shorthand for ``emit(Severity.CRITICAL, ...)``."""
        await self.emit(
            Severity.CRITICAL, source=source, message=message, component=component
        )