Files
Cerbero-Bite/tests/unit/test_clients_telegram.py
Adriano abf5a140e2 refactor: telegram + portfolio in-process (drop shared MCP)
Each bot now manages its own notification + portfolio aggregation:

* TelegramClient calls the public Bot API directly via httpx, reading
  CERBERO_BITE_TELEGRAM_BOT_TOKEN / CERBERO_BITE_TELEGRAM_CHAT_ID from
  env. No credentials → silent disabled mode.
* PortfolioClient composes DeribitClient + HyperliquidClient + the new
  MacroClient.get_asset_price/eur_usd_rate to expose equity (EUR) and
  per-asset exposure as the bot's own slice (no cross-bot view).
* mcp-telegram and mcp-portfolio removed from MCP_SERVICES / McpEndpoints
  and the cerbero-bite ping CLI; health_check no longer probes portfolio.

Docs (02/04/06/07) and docker-compose updated to reflect the new
architecture.

353/353 tests pass; ruff clean; mypy src clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 00:31:20 +02:00

237 lines
7.5 KiB
Python

"""Tests for in-process TelegramClient (Bot API, notify-only)."""
from __future__ import annotations
import json
from decimal import Decimal
import httpx
import pytest
from pytest_httpx import HTTPXMock
from cerbero_bite.clients.telegram import (
TelegramClient,
TelegramError,
load_telegram_credentials,
)
# sendMessage endpoint the tests register responses for; the "TOK" path
# segment matches the bot_token that _client() passes by default.
SEND_URL = "https://api.telegram.org/botTOK/sendMessage"
def _client(**kw) -> TelegramClient:
    """Build a TelegramClient with test credentials.

    Any keyword argument supplied by the caller overrides the default
    ``bot_token="TOK"`` / ``chat_id="42"`` pair or is passed through as-is.
    """
    # Later keys win, so caller-supplied values replace the defaults.
    merged = {"bot_token": "TOK", "chat_id": "42", **kw}
    return TelegramClient(**merged)
def _request_body(httpx_mock: HTTPXMock) -> dict:
request = httpx_mock.get_request()
assert request is not None
return json.loads(request.read())
# ---------------------------------------------------------------------------
# enabled / disabled
# ---------------------------------------------------------------------------
def test_enabled_when_both_token_and_chat_id_present() -> None:
    """A client built with both token and chat id reports itself enabled."""
    client = _client()
    assert client.enabled is True
def test_disabled_when_token_missing() -> None:
    """A ``None`` bot token leaves the client disabled."""
    client = TelegramClient(bot_token=None, chat_id="42")
    assert client.enabled is False
def test_disabled_when_chat_id_missing() -> None:
    """A ``None`` chat id leaves the client disabled."""
    client = TelegramClient(bot_token="TOK", chat_id=None)
    assert client.enabled is False
def test_disabled_when_token_blank() -> None:
    """A whitespace-only bot token leaves the client disabled."""
    client = TelegramClient(bot_token=" ", chat_id="42")
    assert client.enabled is False
@pytest.mark.asyncio
async def test_disabled_notify_is_noop(httpx_mock: HTTPXMock) -> None:
    """notify() on a client without credentials issues no HTTP request."""
    disabled = TelegramClient(bot_token=None, chat_id=None)
    await disabled.notify("hello")
    recorded = httpx_mock.get_requests()
    assert recorded == []
# ---------------------------------------------------------------------------
# notify formatting
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_notify_sends_with_priority_and_tag(httpx_mock: HTTPXMock) -> None:
    """notify() posts chat id, HTML parse mode and a prefixed text."""
    httpx_mock.add_response(url=SEND_URL, json={"ok": True, "result": {}})
    client = _client()
    await client.notify("hello", priority="high", tag="entry")
    payload = _request_body(httpx_mock)
    assert payload["chat_id"] == "42"
    assert payload["parse_mode"] == "HTML"
    # Priority is upper-cased and the tag follows it, both in brackets.
    assert payload["text"] == "[HIGH][entry] hello"
    assert payload["disable_web_page_preview"] is True
@pytest.mark.asyncio
async def test_notify_default_priority_normal(httpx_mock: HTTPXMock) -> None:
    """Without an explicit priority the text is prefixed with [NORMAL]."""
    httpx_mock.add_response(url=SEND_URL, json={"ok": True})
    client = _client()
    await client.notify("plain")
    sent_text = _request_body(httpx_mock)["text"]
    assert sent_text == "[NORMAL] plain"
@pytest.mark.asyncio
async def test_notify_position_opened_formats_decimals(
    httpx_mock: HTTPXMock,
) -> None:
    """The position-opened message renders greeks and expected PnL as signed decimals."""
    httpx_mock.add_response(url=SEND_URL, json={"ok": True})
    greeks = {"delta": Decimal("-0.04"), "vega": Decimal("0.20")}
    await _client().notify_position_opened(
        instrument="ETH-15MAY26-2475-P",
        side="SELL",
        size=2,
        strategy="bull_put",
        greeks=greeks,
        expected_pnl_usd=Decimal("45.00"),
    )
    text = _request_body(httpx_mock)["text"]
    # Every fragment must appear somewhere in the outgoing message.
    expected_fragments = (
        "POSITION OPENED",
        "ETH-15MAY26-2475-P",
        "SELL",
        "size: 2",
        "bull_put",
        "delta=-0.0400",
        "vega=+0.2000",
        "$+45.00",
    )
    for fragment in expected_fragments:
        assert fragment in text
@pytest.mark.asyncio
async def test_notify_position_opened_without_greeks(httpx_mock: HTTPXMock) -> None:
    """Omitting greeks and expected PnL keeps those sections out of the message."""
    httpx_mock.add_response(url=SEND_URL, json={"ok": True})
    client = _client()
    await client.notify_position_opened(
        instrument="BTC-PERPETUAL",
        side="BUY",
        size=1,
        strategy="hedge",
    )
    message = _request_body(httpx_mock)["text"]
    assert "greeks" not in message
    assert "expected pnl" not in message
@pytest.mark.asyncio
async def test_notify_position_closed(httpx_mock: HTTPXMock) -> None:
    """The position-closed message carries instrument, signed PnL and reason."""
    httpx_mock.add_response(url=SEND_URL, json={"ok": True})
    instrument = "ETH-15MAY26-2475-P_2350-P"
    await _client().notify_position_closed(
        instrument=instrument,
        realized_pnl_usd=Decimal("32.50"),
        reason="CLOSE_PROFIT",
    )
    message = _request_body(httpx_mock)["text"]
    assert "POSITION CLOSED" in message
    assert instrument in message
    assert "$+32.50" in message
    assert "CLOSE_PROFIT" in message
@pytest.mark.asyncio
async def test_notify_position_closed_negative_pnl(httpx_mock: HTTPXMock) -> None:
    """A negative realized PnL is rendered with a minus sign and two decimals."""
    httpx_mock.add_response(url=SEND_URL, json={"ok": True})
    loss = Decimal("-12.5")
    await _client().notify_position_closed(
        instrument="X",
        realized_pnl_usd=loss,
        reason="STOP",
    )
    message = _request_body(httpx_mock)["text"]
    assert "$-12.50" in message
@pytest.mark.asyncio
async def test_notify_alert(httpx_mock: HTTPXMock) -> None:
    """The alert message shows the upper-cased priority, the source and the text."""
    httpx_mock.add_response(url=SEND_URL, json={"ok": True})
    client = _client()
    await client.notify_alert(
        source="kill_switch",
        message="armed manually",
        priority="critical",
    )
    rendered = _request_body(httpx_mock)["text"]
    assert "ALERT [CRITICAL]" in rendered
    assert "kill_switch" in rendered
    assert "armed manually" in rendered
@pytest.mark.asyncio
async def test_notify_system_error(httpx_mock: HTTPXMock) -> None:
    """The system-error message is marked critical and names the component."""
    httpx_mock.add_response(url=SEND_URL, json={"ok": True})
    client = _client()
    await client.notify_system_error(
        message="deribit feed anomaly",
        component="clients.deribit",
    )
    rendered = _request_body(httpx_mock)["text"]
    for fragment in (
        "SYSTEM ERROR [CRITICAL]",
        "deribit feed anomaly",
        "clients.deribit",
    ):
        assert fragment in rendered
@pytest.mark.asyncio
async def test_notify_system_error_without_component(httpx_mock: HTTPXMock) -> None:
    """With no component given, the message contains no "component" section."""
    httpx_mock.add_response(url=SEND_URL, json={"ok": True})
    client = _client()
    await client.notify_system_error(message="boom")
    rendered = _request_body(httpx_mock)["text"]
    assert "component" not in rendered
# ---------------------------------------------------------------------------
# error paths
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_http_non_200_raises(httpx_mock: HTTPXMock) -> None:
    """An HTTP 500 reply surfaces as TelegramError mentioning the status."""
    httpx_mock.add_response(url=SEND_URL, status_code=500, text="upstream")
    client = _client()
    with pytest.raises(TelegramError, match="HTTP 500"):
        await client.notify("x")
@pytest.mark.asyncio
async def test_api_ok_false_raises(httpx_mock: HTTPXMock) -> None:
    """An ``ok: false`` API reply raises TelegramError with the description."""
    failure_reply = {"ok": False, "description": "chat not found"}
    httpx_mock.add_response(url=SEND_URL, json=failure_reply)
    client = _client()
    with pytest.raises(TelegramError, match="chat not found"):
        await client.notify("x")
# ---------------------------------------------------------------------------
# shared httpx client
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_uses_shared_http_client(httpx_mock: HTTPXMock) -> None:
    """A caller-supplied httpx.AsyncClient is accepted and one request is sent."""
    httpx_mock.add_response(url=SEND_URL, json={"ok": True})
    shared_http = httpx.AsyncClient()
    try:
        client = _client(http_client=shared_http)
        await client.notify("x")
    finally:
        # The test owns the shared client, so it closes it itself.
        await shared_http.aclose()
    recorded = httpx_mock.get_requests()
    assert len(recorded) == 1
# ---------------------------------------------------------------------------
# env-var loader
# ---------------------------------------------------------------------------
def test_load_credentials_returns_none_when_unset() -> None:
    """An empty environment yields ``(None, None)``."""
    credentials = load_telegram_credentials(env={})
    assert credentials == (None, None)
def test_load_credentials_strips_whitespace() -> None:
    """Surrounding whitespace is removed from both environment values."""
    padded_env = {
        "CERBERO_BITE_TELEGRAM_BOT_TOKEN": " abc ",
        "CERBERO_BITE_TELEGRAM_CHAT_ID": " -100 ",
    }
    credentials = load_telegram_credentials(env=padded_env)
    assert credentials == ("abc", "-100")
def test_load_credentials_treats_empty_as_none() -> None:
    """Empty and whitespace-only environment values are reported as ``None``."""
    blank_env = {
        "CERBERO_BITE_TELEGRAM_BOT_TOKEN": "",
        "CERBERO_BITE_TELEGRAM_CHAT_ID": " ",
    }
    credentials = load_telegram_credentials(env=blank_env)
    assert credentials == (None, None)