Files
Cerbero-mcp/src/cerbero_mcp/client_registry.py
T
AdrianoDev b71c66917c chore(V2): cleanup quality gate
- ruff: contextlib.suppress al posto di try/except/pass (client_registry, test_env_routing)
- rimozione services/ legacy (residuo da git rm)
- fix integration test fixture: rimosso sys.modules.pop che inquinava module references nei test successivi (test_audit, test_client_init_default_http)

254 test passano. Ruff: clean. Mypy: 68 warning preesistenti dal codice V1 migrato (strict=false).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 19:02:55 +02:00

41 lines
1.3 KiB
Python

"""Cache lazy di client exchange, una istanza per (exchange, env)."""
from __future__ import annotations
import asyncio
import contextlib
from collections import defaultdict
from collections.abc import Awaitable, Callable
from typing import Any, Literal
Environment = Literal["testnet", "mainnet"]
Builder = Callable[[str, Environment], Awaitable[Any]]
class ClientRegistry:
def __init__(self, *, builder: Builder) -> None:
self._builder = builder
self._clients: dict[tuple[str, Environment], Any] = {}
self._locks: dict[tuple[str, Environment], asyncio.Lock] = defaultdict(
asyncio.Lock
)
async def get(self, exchange: str, env: Environment) -> Any:
key = (exchange, env)
if key in self._clients:
return self._clients[key]
async with self._locks[key]:
if key in self._clients: # double-check
return self._clients[key]
client = await self._builder(exchange, env)
self._clients[key] = client
return client
async def aclose(self) -> None:
for client in self._clients.values():
close = getattr(client, "aclose", None)
if close is None:
continue
with contextlib.suppress(Exception):
await close()
self._clients.clear()