Files
Cerbero-Bite/tests/unit/test_mcp_endpoints.py
T
Adriano abf5a140e2 refactor: telegram + portfolio in-process (drop shared MCP)
Each bot now manages its own notification + portfolio aggregation:

* TelegramClient calls the public Bot API directly via httpx, reading
  CERBERO_BITE_TELEGRAM_BOT_TOKEN / CERBERO_BITE_TELEGRAM_CHAT_ID from
  env. No credentials → silent disabled mode.
* PortfolioClient composes DeribitClient + HyperliquidClient + the new
  MacroClient.get_asset_price/eur_usd_rate to expose equity (EUR) and
  per-asset exposure as the bot's own slice (no cross-bot view).
* mcp-telegram and mcp-portfolio removed from MCP_SERVICES / McpEndpoints
  and the cerbero-bite ping CLI; health_check no longer probes portfolio.

Docs (02/04/06/07) and docker-compose updated to reflect the new
architecture.

353/353 tests pass; ruff clean; mypy src clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 00:31:20 +02:00

79 lines
2.4 KiB
Python

"""Tests for the MCP endpoint and token resolver."""
from __future__ import annotations
from pathlib import Path
import pytest
from cerbero_bite.config.mcp_endpoints import (
DEFAULT_ENDPOINTS,
MCP_SERVICES,
load_endpoints,
load_token,
)
def test_defaults_match_known_docker_dns() -> None:
assert DEFAULT_ENDPOINTS["deribit"] == "http://mcp-deribit:9011"
assert DEFAULT_ENDPOINTS["sentiment"] == "http://mcp-sentiment:9014"
def test_load_endpoints_uses_defaults_when_env_empty() -> None:
endpoints = load_endpoints(env={})
for name, url in DEFAULT_ENDPOINTS.items():
assert endpoints.for_service(name) == url
def test_per_service_env_var_override() -> None:
endpoints = load_endpoints(
env={"CERBERO_BITE_MCP_DERIBIT_URL": "http://localhost:9911"}
)
assert endpoints.deribit == "http://localhost:9911"
assert endpoints.macro == DEFAULT_ENDPOINTS["macro"]
def test_per_service_env_var_strips_trailing_slash() -> None:
endpoints = load_endpoints(
env={"CERBERO_BITE_MCP_DERIBIT_URL": "http://localhost:9911///"}
)
assert endpoints.deribit == "http://localhost:9911"
def test_for_service_unknown_raises_key_error() -> None:
endpoints = load_endpoints(env={})
with pytest.raises(KeyError, match="unknown MCP service"):
endpoints.for_service("nope")
def test_load_token_uses_explicit_path(tmp_path: Path) -> None:
target = tmp_path / "core.token"
target.write_text("abcdef\n", encoding="utf-8")
assert load_token(path=target) == "abcdef"
def test_load_token_uses_env_var(tmp_path: Path) -> None:
target = tmp_path / "core.token"
target.write_text("xyz", encoding="utf-8")
token = load_token(env={"CERBERO_BITE_CORE_TOKEN_FILE": str(target)})
assert token == "xyz"
def test_load_token_raises_when_file_missing(tmp_path: Path) -> None:
with pytest.raises(FileNotFoundError):
load_token(path=tmp_path / "missing")
def test_load_token_raises_when_file_empty(tmp_path: Path) -> None:
target = tmp_path / "empty"
target.write_text("", encoding="utf-8")
with pytest.raises(ValueError, match="empty"):
load_token(path=target)
def test_mcp_services_table_is_complete() -> None:
# Telegram and Portfolio are now in-process and must NOT be listed
# as shared MCP services.
expected = {"deribit", "hyperliquid", "macro", "sentiment"}
assert set(MCP_SERVICES) == expected