feat(cerbero): HTTP client with bearer + bot-tag + retry

Client minimale verso Cerbero MCP unified server con header
Authorization Bearer e X-Bot-Tag su ogni richiesta. Retry esponenziale
solo su ConnectionError (non su 4xx). 4xx/5xx -> RuntimeError.

Switch da httpx a requests per allineamento con libreria mock
`responses` (httpx richiederebbe respx). httpx resta in deps per usi
futuri. Aggiunto types-requests ai dev deps per mypy strict.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-09 19:25:00 +02:00
parent b61bbaf13c
commit 14b1b84a47
5 changed files with 115 additions and 0 deletions
+37
View File
@@ -0,0 +1,37 @@
import pytest
import responses
from multi_swarm.cerbero.client import CerberoClient
@responses.activate
def test_call_tool_passes_bearer_and_bot_tag() -> None:
responses.add(
responses.POST,
"http://test:9000/mcp-deribit/tools/get_iv_rank",
json={"iv_rank": 0.42},
status=200,
)
client = CerberoClient(
base_url="http://test:9000", token="tok-xyz", bot_tag="swarm-poc-phase1"
)
result = client.call_tool("deribit", "get_iv_rank", {"symbol": "BTC-PERPETUAL"})
assert result == {"iv_rank": 0.42}
req = responses.calls[0].request
assert req.headers["Authorization"] == "Bearer tok-xyz"
assert req.headers["X-Bot-Tag"] == "swarm-poc-phase1"
@responses.activate
def test_call_tool_raises_on_error() -> None:
responses.add(
responses.POST,
"http://test:9000/mcp-deribit/tools/get_iv_rank",
json={"error": "bad"},
status=400,
)
client = CerberoClient(
base_url="http://test:9000", token="tok-xyz", bot_tag="swarm-poc-phase1"
)
with pytest.raises(RuntimeError):
client.call_tool("deribit", "get_iv_rank", {})