"""In-process portfolio aggregator. Each Cerbero Suite bot now manages its own portfolio view: instead of calling a shared ``mcp-portfolio`` service, this client composes the account summaries and open positions from the exchanges the bot actually uses (Deribit options + Hyperliquid perps) and converts them to EUR via the macro service. Two values are exposed: * :py:meth:`total_equity_eur` — sum of USDC equity on Deribit and USD equity on Hyperliquid, converted to EUR using the live ``EURUSD`` rate from ``mcp-macro``. * :py:meth:`asset_pct_of_portfolio` — fraction (0..1) of total USD equity exposed to a specific ticker via open positions on the two exchanges. Used by entry filter §2.7 (``eth_holdings_pct_max``). **Scope note**: this is the bot's own slice. Holdings on other exchanges, in cold storage, or held by other bots in the suite are *not* counted. The §2.7 limit is therefore a per-bot cap, not a suite-wide one. """ from __future__ import annotations import asyncio from collections.abc import Iterable from decimal import Decimal from typing import Any, cast from cerbero_bite.clients._exceptions import McpDataAnomalyError from cerbero_bite.clients.deribit import DeribitClient from cerbero_bite.clients.hyperliquid import HyperliquidClient from cerbero_bite.clients.macro import MacroClient __all__ = ["PortfolioClient"] def _decimal_or_zero(value: Any) -> Decimal: if value is None: return Decimal(0) try: return Decimal(str(value)) except (ValueError, ArithmeticError): return Decimal(0) def _position_notional_usd(pos: dict[str, Any]) -> Decimal: """Best-effort USD notional of an open position. Prefers an explicit ``notional_usd`` / ``size_usd`` / ``value_usd`` field. Falls back to ``|size × mark_price|`` (or ``index_price`` if mark is missing). Returns 0 on malformed entries. """ for key in ("notional_usd", "size_usd", "value_usd", "position_value"): v = pos.get(key) if v is not None: return abs(_decimal_or_zero(v)) size = _decimal_or_zero(pos.get("size") or pos.get("szi")) mark = _decimal_or_zero( pos.get("mark_price") or pos.get("entry_price") or pos.get("index_price") ) return abs(size * mark) def _instrument_label(pos: dict[str, Any]) -> str: for key in ("instrument_name", "instrument", "symbol", "coin", "asset"): v = pos.get(key) if v is not None: return str(v).upper() return "" class PortfolioClient: """Aggregates equity + asset exposure across the bot's exchange accounts.""" def __init__( self, *, deribit: DeribitClient, hyperliquid: HyperliquidClient, macro: MacroClient, ) -> None: self._deribit = deribit self._hyperliquid = hyperliquid self._macro = macro async def _equity_usd_components(self) -> tuple[Decimal, Decimal]: """Concurrent fetch of (deribit_equity_usd, hyperliquid_equity_usd).""" deribit_summary, hl_summary = await asyncio.gather( self._deribit.get_account_summary(currency="USDC"), self._hyperliquid.get_account_summary(), ) deribit_eq = _decimal_or_zero(deribit_summary.get("equity")) hl_eq = _decimal_or_zero(hl_summary.get("equity")) return deribit_eq, hl_eq async def total_equity_usd(self) -> Decimal: """Sum equity USD across the bot's exchange accounts.""" deribit_eq, hl_eq = await self._equity_usd_components() return deribit_eq + hl_eq async def total_equity_eur(self) -> Decimal: """Return aggregate bot equity in EUR. Concurrent: account summaries × FX. Raises :class:`McpDataAnomalyError` if the FX rate is non-positive. """ components_t = asyncio.create_task(self._equity_usd_components()) fx_t = asyncio.create_task(self._macro.eur_usd_rate()) await asyncio.gather(components_t, fx_t) deribit_eq, hl_eq = components_t.result() fx = fx_t.result() if fx <= 0: raise McpDataAnomalyError( f"non-positive EURUSD rate: {fx}", service="macro", tool="get_asset_price", ) usd_total = deribit_eq + hl_eq return usd_total / fx async def asset_pct_of_portfolio(self, ticker: str) -> Decimal: """Fraction of bot equity (USD) exposed to ``ticker``. Sums absolute USD notional of open positions whose instrument label contains ``ticker`` (case-insensitive) on Deribit and Hyperliquid, divided by the bot's total USD equity. Returns 0 when there is no equity or no exposure. """ target = ticker.upper() deribit_pos_t = asyncio.create_task( self._deribit.get_positions(currency="USDC") ) hl_pos_t = asyncio.create_task(self._hyperliquid.get_positions()) equity_t = asyncio.create_task(self._equity_usd_components()) await asyncio.gather(deribit_pos_t, hl_pos_t, equity_t) exposure_usd = Decimal(0) for raw_pos in cast(Iterable[Any], deribit_pos_t.result()): if not isinstance(raw_pos, dict): continue if target in _instrument_label(raw_pos): exposure_usd += _position_notional_usd(raw_pos) for raw_pos in cast(Iterable[Any], hl_pos_t.result()): if not isinstance(raw_pos, dict): continue if target in _instrument_label(raw_pos): exposure_usd += _position_notional_usd(raw_pos) deribit_eq, hl_eq = equity_t.result() total_eq = deribit_eq + hl_eq if total_eq <= 0: return Decimal(0) return exposure_usd / total_eq