"""HTTP tool client common to every MCP wrapper. Each MCP service exposes ``POST /tools/`` with a JSON body and a ``Bearer `` header. ``HttpToolClient`` is a thin wrapper around :class:`httpx.AsyncClient` that: * Adds the auth header. * Applies the project-wide timeout (default 8 s, see ``docs/10-config-spec.md`` ``mcp.call_timeout_s``). * Retries the call on transient failures with exponential backoff (1 s, 5 s, 30 s) — at most 3 attempts in total. * Maps HTTP errors and ``state == "error"`` envelopes into the typed exceptions in :mod:`cerbero_bite.clients._exceptions`. The wrapper does *not* hold a long-lived ``AsyncClient`` by default: each call opens and closes its own connection so a transient DNS issue on one MCP server does not corrupt connection pooling for the others. A shared pool can still be passed in via ``transport`` / ``client`` when the orchestrator wants connection reuse. """ from __future__ import annotations import json import logging from collections.abc import Awaitable, Callable from typing import Any import httpx from tenacity import ( AsyncRetrying, RetryError, retry_if_exception_type, stop_after_attempt, wait_exponential, ) from cerbero_bite.clients._exceptions import ( McpAuthError, McpError, McpNotFoundError, McpServerError, McpTimeoutError, McpToolError, ) __all__ = ["HttpToolClient"] _log = logging.getLogger("cerbero_bite.clients") _RETRYABLE: tuple[type[BaseException], ...] = ( McpTimeoutError, McpServerError, ) class HttpToolClient: """Async client for ``POST /tools/`` style MCP services. Args: service: short service identifier (``"deribit"``, ``"macro"`` …). base_url: e.g. ``"http://mcp-deribit:9011"``. Trailing slash is stripped. token: bearer token for the ``Authorization`` header. timeout_s: per-request timeout, default 8 seconds. retry_max: max number of attempts (1 = no retry). retry_base_delay: base delay for exponential backoff. sleep: hook for tests to skip real waits. """ def __init__( self, *, service: str, base_url: str, token: str, timeout_s: float = 8.0, retry_max: int = 3, retry_base_delay: float = 1.0, sleep: Callable[[int | float], Awaitable[None] | None] | None = None, ) -> None: self._service = service self._base_url = base_url.rstrip("/") self._token = token self._timeout = httpx.Timeout(timeout_s) self._retry_max = max(1, retry_max) self._retry_base_delay = retry_base_delay self._sleep = sleep @property def service(self) -> str: return self._service @property def base_url(self) -> str: return self._base_url async def call( self, tool: str, body: dict[str, Any] | None = None, *, client: httpx.AsyncClient | None = None, ) -> Any: """Invoke ``tool`` with ``body`` and return the parsed JSON response. Returns whatever shape the server replies with (typically ``dict``, sometimes ``list``). The wrapper checks ``state == "error"`` only on ``dict`` responses; list/scalar responses are passed through unchanged. """ url = f"{self._base_url}/tools/{tool}" headers = { "Authorization": f"Bearer {self._token}", "Content-Type": "application/json", } payload = body or {} async def _attempt() -> Any: return await self._do_request( url=url, headers=headers, payload=payload, tool=tool, client=client, ) if self._retry_max <= 1: return await _attempt() retry_kwargs: dict[str, Any] = { "stop": stop_after_attempt(self._retry_max), "wait": wait_exponential(multiplier=self._retry_base_delay, min=1, max=30), "retry": retry_if_exception_type(_RETRYABLE), "reraise": True, } if self._sleep is not None: retry_kwargs["sleep"] = self._sleep retrier = AsyncRetrying(**retry_kwargs) try: async for attempt in retrier: with attempt: return await _attempt() except RetryError as exc: # pragma: no cover — reraise=True covers it raise exc.last_attempt.exception() or McpError( "retry exhausted", service=self._service, tool=tool ) from exc # mypy needs an explicit fall-through — retry never falls out of the loop raise McpError( "unreachable retry loop exit", service=self._service, tool=tool ) # pragma: no cover async def _do_request( self, *, url: str, headers: dict[str, str], payload: dict[str, Any], tool: str, client: httpx.AsyncClient | None, ) -> Any: request_client = client or httpx.AsyncClient(timeout=self._timeout) owned = client is None try: try: response = await request_client.post(url, json=payload, headers=headers) except httpx.TimeoutException as exc: raise McpTimeoutError( f"timeout calling {self._service}.{tool}", service=self._service, tool=tool, ) from exc except httpx.HTTPError as exc: raise McpServerError( f"HTTP error calling {self._service}.{tool}: {exc}", service=self._service, tool=tool, ) from exc self._raise_for_status(response, tool=tool) try: data: Any = response.json() except json.JSONDecodeError as exc: raise McpServerError( f"{self._service}.{tool}: response is not JSON", service=self._service, tool=tool, ) from exc if isinstance(data, dict) and data.get("state") == "error": raise McpToolError( f"{self._service}.{tool} returned error: " f"{data.get('error', 'unknown')}", service=self._service, tool=tool, payload=data, ) return data finally: if owned: await request_client.aclose() def _raise_for_status(self, response: httpx.Response, *, tool: str) -> None: status = response.status_code if 200 <= status < 300: return if status in (401, 403): raise McpAuthError( f"{self._service}.{tool} authentication failed (HTTP {status})", service=self._service, tool=tool, ) if status == 404: raise McpNotFoundError( f"{self._service}.{tool} not found (HTTP 404)", service=self._service, tool=tool, ) # 4xx other than auth/404 → tool error from server side; do not retry. # 5xx → server fault, retry-eligible. message = ( f"{self._service}.{tool} HTTP {status}: " f"{(response.text or '')[:200]!r}" ) if 500 <= status < 600: raise McpServerError(message, service=self._service, tool=tool) raise McpToolError(message, service=self._service, tool=tool)