"""Direct Telegram Bot API client (notify-only). Cerbero Bite is fully autonomous: Telegram is used solely to *notify* the operator of what the engine has done — there is no inbound queue and no confirmation logic. Credentials are read from the environment: * ``CERBERO_BITE_TELEGRAM_BOT_TOKEN`` — bot token from BotFather. * ``CERBERO_BITE_TELEGRAM_CHAT_ID`` — destination chat id. If either is missing the client runs in **disabled** mode: every ``notify_*`` becomes a no-op logged at DEBUG. This keeps unconfigured deployments and the test environment harmless. """ from __future__ import annotations import logging import os from decimal import Decimal from typing import Any import httpx __all__ = [ "TELEGRAM_BOT_TOKEN_ENV", "TELEGRAM_CHAT_ID_ENV", "TelegramClient", "TelegramError", "load_telegram_credentials", ] TELEGRAM_BOT_TOKEN_ENV = "CERBERO_BITE_TELEGRAM_BOT_TOKEN" TELEGRAM_CHAT_ID_ENV = "CERBERO_BITE_TELEGRAM_CHAT_ID" _log = logging.getLogger("cerbero_bite.clients.telegram") class TelegramError(RuntimeError): """Raised when the Telegram Bot API rejects a sendMessage call.""" def _to_float(value: Decimal | float | int) -> float: return float(value) def load_telegram_credentials( env: dict[str, str] | None = None, ) -> tuple[str | None, str | None]: """Return ``(bot_token, chat_id)`` from env. Empty strings → ``None``.""" e = env if env is not None else os.environ token = (e.get(TELEGRAM_BOT_TOKEN_ENV) or "").strip() or None chat = (e.get(TELEGRAM_CHAT_ID_ENV) or "").strip() or None return token, chat class TelegramClient: """Notify-only client over the public Telegram Bot API.""" BASE_URL = "https://api.telegram.org" def __init__( self, *, bot_token: str | None, chat_id: str | None, http_client: httpx.AsyncClient | None = None, timeout_s: float = 5.0, parse_mode: str = "HTML", ) -> None: self._token = (bot_token or "").strip() or None self._chat_id = (str(chat_id).strip() if chat_id is not None else "") or None self._client = http_client self._timeout = timeout_s self._parse_mode = parse_mode @property def enabled(self) -> bool: return self._token is not None and self._chat_id is not None async def _send(self, text: str) -> None: if not self.enabled: _log.debug("telegram disabled, dropping message: %s", text[:120]) return url = f"{self.BASE_URL}/bot{self._token}/sendMessage" payload: dict[str, Any] = { "chat_id": self._chat_id, "text": text, "parse_mode": self._parse_mode, "disable_web_page_preview": True, } client = self._client owns = client is None if client is None: client = httpx.AsyncClient(timeout=self._timeout) try: resp = await client.post(url, json=payload, timeout=self._timeout) finally: if owns: await client.aclose() if resp.status_code != 200: raise TelegramError( f"telegram HTTP {resp.status_code}: {resp.text[:200]}" ) data = resp.json() if not isinstance(data, dict) or not data.get("ok", False): desc = ( data.get("description", "?") if isinstance(data, dict) else str(data) ) raise TelegramError(f"telegram api error: {desc}") async def notify( self, message: str, *, priority: str = "normal", tag: str | None = None, ) -> None: prefix = f"[{priority.upper()}]" if tag: prefix = f"{prefix}[{tag}]" await self._send(f"{prefix} {message}") async def notify_position_opened( self, *, instrument: str, side: str, size: int, strategy: str, greeks: dict[str, Decimal | float] | None = None, expected_pnl_usd: Decimal | float | None = None, ) -> None: lines = [ "POSITION OPENED", f"instrument: {instrument}", f"side: {side} | size: {size} | strategy: {strategy}", ] if greeks: joined = ", ".join( f"{k}={_to_float(v):+.4f}" for k, v in greeks.items() ) lines.append(f"greeks: {joined}") if expected_pnl_usd is not None: lines.append(f"expected pnl: ${_to_float(expected_pnl_usd):+.2f}") await self._send("\n".join(lines)) async def notify_position_closed( self, *, instrument: str, realized_pnl_usd: Decimal | float, reason: str, ) -> None: pnl = _to_float(realized_pnl_usd) await self._send( "POSITION CLOSED\n" f"instrument: {instrument}\n" f"realized pnl: ${pnl:+.2f}\n" f"reason: {reason}" ) async def notify_alert( self, *, source: str, message: str, priority: str = "high", ) -> None: await self._send( f"ALERT [{priority.upper()}]\n" f"source: {source}\n" f"{message}" ) async def notify_system_error( self, *, message: str, component: str | None = None, priority: str = "critical", ) -> None: text = f"SYSTEM ERROR [{priority.upper()}]\n" if component: text += f"component: {component}\n" text += message await self._send(text)