Phase 4 hardening: dealer-gamma + liquidation-heatmap entry filters

Integra due nuovi filtri dal pacchetto quant indicators rilasciato in
Cerbero_mcp (commit a13e3fe). 335 test pass, mypy strict pulito,
ruff clean.

Filtri (§2.8 — nuovo):
- dealer-gamma: blocca entry quando total_net_dealer_gamma <
  dealer_gamma_min (default 0). Long-gamma regime favorisce credit
  spread (vol-suppressing dealer flow); short-gamma flow lo amplifica
  ed è da evitare.
- liquidation-heatmap: blocca entry quando il segnale euristico di
  cerbero-sentiment riporta long o short squeeze risk = "high"
  (cluster di liquidations imminenti entro 24h).

Entrambi sono best-effort: se il tool MCP fallisce o restituisce
dati anomali l'entry_cycle popola EntryContext con None e
validate_entry salta il gate per non bloccare entry su problemi
infrastrutturali.

Wrapper:
- DeribitClient.dealer_gamma_profile_eth → DealerGammaSnapshot.
- SentimentClient.liquidation_heatmap → LiquidationHeatmap con
  property has_high_squeeze_risk.

Schema:
- EntryConfig.dealer_gamma_min, dealer_gamma_filter_enabled,
  liquidation_filter_enabled.
- EntryContext.dealer_net_gamma, liquidation_squeeze_risk_high
  opzionali.
- strategy.yaml: nuovi campi documentati con commento + hash
  ricalcolato (4c2be4c5...).

Documentazione:
- docs/04-mcp-integration.md riscritto al modello attuale (HTTP
  REST, no mcp SDK, no memory/brain-bridge, place_combo_order
  documentato, environment_info al boot).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-28 07:26:33 +02:00
parent b5b96f959c
commit f4faef6fd1
11 changed files with 489 additions and 190 deletions
+102 -187
View File
@@ -1,243 +1,158 @@
# 04 — MCP Integration # 04 — MCP Integration
Tutti i server MCP sono già configurati a livello di `CerberoSuite` (vedi Cerbero Bite consuma sei servizi MCP HTTP della suite (`Cerbero_mcp`).
`Cerbero_Office/.mcp.json`). Cerbero Bite vi si connette come **client Non utilizza l'SDK Python `mcp`: ogni server espone gli endpoint REST
MCP** usando l'SDK ufficiale `mcp` per Python. `POST <base_url>/tools/<tool_name>` con autenticazione Bearer, e Cerbero
Bite vi si collega tramite `httpx.AsyncClient` long-lived
(`clients/_base.py`).
## Configurazione di connessione ## Configurazione di connessione
Cerbero Bite legge `~/.config/cerbero-suite/mcp.json` (o, in dev, da Le URL sono risolte da `cerbero_bite.config.mcp_endpoints.load_endpoints`,
`.mcp.json` locale puntato via env var `CERBERO_BITE_MCP_CONFIG`). I con default che corrispondono al DNS della rete Docker
server vengono risolti **per nome** dichiarato nel file di config. `cerbero-suite` (`http://mcp-deribit:9011`, `http://mcp-macro:9013`,
ecc.). Ogni servizio può essere sovrascritto da una variabile
d'ambiente dedicata, utile in sviluppo:
| Servizio | Variabile d'ambiente | Default Docker DNS |
|---|---|---|
| Deribit | `CERBERO_BITE_MCP_DERIBIT_URL` | `http://mcp-deribit:9011` |
| Hyperliquid | `CERBERO_BITE_MCP_HYPERLIQUID_URL` | `http://mcp-hyperliquid:9012` |
| Macro | `CERBERO_BITE_MCP_MACRO_URL` | `http://mcp-macro:9013` |
| Sentiment | `CERBERO_BITE_MCP_SENTIMENT_URL` | `http://mcp-sentiment:9014` |
| Telegram | `CERBERO_BITE_MCP_TELEGRAM_URL` | `http://mcp-telegram:9017` |
| Portfolio | `CERBERO_BITE_MCP_PORTFOLIO_URL` | `http://mcp-portfolio:9018` |
Il bearer token per le chiamate è il token con capability `core` letto
da `secrets/core.token` (path configurabile via
`CERBERO_BITE_CORE_TOKEN_FILE`, default `/run/secrets/core_token` nel
container). Non è loggato.
```python ```python
# clients/_base.py — abstract # clients/_base.py — sintesi
class McpClient: class HttpToolClient:
name: str # "cerbero-deribit", ecc. service: str # "deribit", "macro", ...
base_url: str # "http://mcp-deribit:9011"
token: str # bearer
timeout_s: float = 8.0 timeout_s: float = 8.0
retry_max: int = 3 retry_max: int = 3 # esponenziale 1s/5s/30s
retry_base_delay: float = 1.0 # esponenziale client: httpx.AsyncClient | None # condiviso dal RuntimeContext
async def call(self, tool: str, **params) -> dict: ... async def call(self, tool: str, body: dict | None = None) -> Any: ...
``` ```
Ogni wrapper concreto eredita `McpClient` ed espone metodi tipizzati. Ogni wrapper concreto compone un `HttpToolClient` e ritorna i record
La logica di retry e timeout è centralizzata. Pydantic consumati direttamente dagli algoritmi `core/`.
## Server MCP usati ## Server MCP usati
### `cerbero-deribit` ### `cerbero-deribit`
Tool consumati: Sorgente di tutti i dati di mercato sulle opzioni e canale di
esecuzione: Cerbero Bite invia gli ordini combo direttamente al broker
attraverso questo MCP, senza intermediazioni.
| Tool | Uso | Frequenza | | Tool | Uso | Frequenza |
|---|---|---| |---|---|---|
| `get_index_price(asset="ETH")` | Spot ETH per calcolo strike | Ogni ciclo entry + monitor | | `environment_info` | Verifica al boot: testnet/mainnet, base_url, max_leverage | Boot + ogni ciclo health |
| `get_dvol()` | Volatilità implicita aggregata ETH | Ogni ciclo entry + monitor | | `get_ticker(instrument_name)` | Spot proxy via `ETH-PERPETUAL.mark_price`, mid/bid/ask + greche per le leg | Ogni ciclo entry + monitor |
| `get_options_chain(asset, expiry_window)` | Lista strumenti per dato DTE | Solo entry | | `get_ticker_batch(instrument_names)` | Quotes in batch per la chain candidata (max 20) | Solo entry |
| `get_instrument(instrument_name)` | Mid, bid, ask, greche su singolo strumento | Entry + monitor | | `get_dvol(currency="ETH", start_date, end_date)` | Latest DVOL per filtro §2.3 e bias §3.1 | Ogni ciclo entry + monitor |
| `get_orderbook(instrument_name, depth=5)` | Profondità per liquidity_gate e slippage | Solo entry | | `get_instruments(currency, kind="option", expiry_from, expiry_to, min_open_interest)` | Lista strike per il DTE window | Solo entry |
| `get_combo_mark(legs)` | Mark price del combo (debito di chiusura) | Solo monitor | | `get_orderbook(instrument_name, depth=3)` | `book_depth_top3` per liquidity gate | Solo entry |
| `get_account_summary(currency="USDC")` | Equity Deribit, margin libero | Periodico | | `get_historical(instrument, start_date, end_date, resolution)` | Spot 30g fa per bias direzionale + bootstrap return_4h | Entry + monitor (fallback) |
| `get_technical_indicators(instrument, indicators=["adx"], ...)` | ADX(14) per il filtro Iron Condor §3.1 | Solo entry |
| `get_account_summary(currency="USDC")` | Equity Deribit, margin libero (informativo) | Boot + monitor |
| `get_positions(currency="USDC")` | Riconciliazione stato dopo crash | Boot |
| `place_combo_order(legs, side, amount, type, price, label)` | **Esecuzione**: combo atomico via `private/create_combo` + `private/buy/sell` sul combo creato | Entry + monitor (close) |
| `cancel_order(order_id)` | Repricing e annullamenti | Solo monitor |
Wrapper: Note operative:
```python - Tutti i prezzi e le greche sono restituiti come `float` dal server e
# clients/deribit.py convertiti in `Decimal` ad alta precisione nel wrapper, mai usati
class DeribitClient(McpClient): come `float` nel motore decisionale.
name = "cerbero-deribit" - Se la chain risponde con `mark_iv` palesemente fuori range
(es. 7% o 300%) o tutti i `bid == 0` la chiamata viene segnalata come
async def index_price(self, asset: str) -> Decimal: ... `McpDataAnomalyError`; l'orchestrator emette un alert e salta il
async def dvol(self) -> Decimal: ... ciclo.
async def options_chain(self, asset: str, dte_min: int, dte_max: int) -> list[InstrumentSnapshot]: ... - L'invio di `place_combo_order` è atomico: la creazione del combo e
async def instrument(self, name: str) -> InstrumentSnapshot: ... l'ordine eseguito sul combo viaggiano in sequenza ma all'interno di
async def orderbook(self, name: str, depth: int = 5) -> OrderbookSnapshot: ... un'unica chiamata MCP, senza esposizione a leg risk.
async def combo_mark(self, legs: list[OptionLeg]) -> Decimal: ...
async def account_summary(self) -> AccountSummary: ...
```
**Note:**
- Tutti i prezzi sono ricevuti come float dal MCP, convertiti in
`Decimal` con `quantize` a 6 cifre nel wrapper.
- Greche convertite con la stessa quantizzazione.
- Se `mark_iv = 7%` o `300%` o `bid = 0` su orderbook ATM su tutti gli
strumenti → wrapper solleva `DeribitDataAnomalyError` (probabile
testnet o feed rotto). Il decision orchestrator cattura, alert,
skippa il ciclo.
### `cerbero-hyperliquid` ### `cerbero-hyperliquid`
| Tool | Uso | | Tool | Uso |
|---|---| |---|---|
| `get_perp_funding_rate(asset="ETH")` | Filtro entry §2.6 | | `get_funding_rate(instrument="ETH")` | Funding rate ETH-PERP (annualizzato × 8760) per il filtro entry §2.6 |
| `get_perp_summary(asset="ETH")` | Volume 24h, conferma liquidità correlata |
| `get_account_summary()` | Solo per coerenza, non usato in decision loop |
```python
class HyperliquidClient(McpClient):
async def funding_rate_annualized(self, asset: str) -> Decimal: ...
async def perp_summary(self, asset: str) -> PerpSummary: ...
```
### `cerbero-sentiment` ### `cerbero-sentiment`
| Tool | Uso | | Tool | Uso |
|---|---| |---|---|
| `get_funding_cross_exchange(asset="ETH")` | Bias direzionale §3.1 (mediana 4 maggiori) | | `get_cross_exchange_funding(assets=["ETH"])` | Mediana funding annualizzato (Binance/Bybit/OKX 1095, Hyperliquid 8760) per bias direzionale §3.1 |
Le news qualitative **non sono usate** nel decision loop (no LLM). Le news qualitative non vengono consumate dal decision loop:
Vengono eventualmente lette da Adriano in occasione del report Cerbero Bite è deterministico e non interpreta testi liberi.
settimanale.
```python
class SentimentClient(McpClient):
async def funding_cross_median(self, asset: str) -> Decimal: ...
```
### `cerbero-macro` ### `cerbero-macro`
| Tool | Uso | | Tool | Uso |
|---|---| |---|---|
| `get_calendar(days_ahead=18)` | Filtro eventi macro pre-entry | | `get_macro_calendar(days, country_filter, importance_min)` | Filtro entry §2.5: zero eventi `high` in `country_filter` (default `["US","EU"]`) entro la finestra DTE |
Eventi rilevanti (filtra per `severity = high` e `country in {US, EU}`):
FOMC, FED minutes, CPI, NFP, ECB, GDP, Powell speech, Lagarde speech.
```python
class MacroClient(McpClient):
async def upcoming_events(self, days_ahead: int) -> list[MacroEvent]: ...
async def first_high_severity_within(self, days: int) -> int | None:
"""Days until first high-severity event, None if none in window."""
```
### `cerbero-portfolio` ### `cerbero-portfolio`
| Tool | Uso | | Tool | Uso |
|---|---| |---|---|
| `get_holdings()` | Capitale corrente complessivo | | `get_total_portfolio_value(currency="EUR")` | Capitale di base per il sizing engine, dopo conversione in USD |
| `get_holdings_by_asset()` | Filtro entry §2.7 (ETH < 30% portfolio) | | `get_holdings()` | Aggregazione manuale di `current_value_eur` per i ticker che contengono `"ETH"`, usata dal filtro §2.7 (`eth_holdings_pct_max`) |
| `get_correlation()` | Sanity check, non bloccante |
```python
class PortfolioClient(McpClient):
async def total_equity_usd(self) -> Decimal: ...
async def asset_pct(self, asset: str) -> Decimal: ...
```
### `cerbero-memory`
| Tool | Uso |
|---|---|
| `push_user_instruction(payload, source="cerbero-bite")` | Invio istruzione apertura/chiusura a Cerbero core |
| `get_pending(source="cerbero-bite")` | Verifica ack di Cerbero core |
```python
class MemoryClient(McpClient):
async def push_instruction(self, instruction: CerberoInstruction) -> str:
"""Returns instruction_id."""
async def is_acknowledged(self, instruction_id: str) -> bool: ...
```
**Payload `CerberoInstruction`** (schema condiviso con Cerbero core,
documentato in `Cerbero/prompt.base v4`):
```json
{
"source": "cerbero-bite",
"kind": "open_combo" | "close_combo",
"exchange": "deribit",
"asset": "ETH",
"proposal_id": "uuid-...",
"legs": [
{"instrument": "ETH-13MAY26-1900-P", "side": "SELL", "size": 2,
"limit_price_eth": "0.0048"},
{"instrument": "ETH-13MAY26-1810-P", "side": "BUY", "size": 2,
"limit_price_eth": "0.0021"}
],
"limit_combo_eth": "0.0027",
"tif": "GTC",
"expires_at": "2026-04-27T16:00:00Z",
"max_slippage_eth": "0.0005",
"reason": "weekly_open" | "profit_take" | "stop_loss" | "vol_stop" |
"time_stop" | "delta_breach" | "adverse_move",
"milestone": "advisory_only" | "approved_by_user"
}
```
Cerbero core deduplica per `proposal_id` (idempotenza in caso di retry).
### `cerbero-telegram` ### `cerbero-telegram`
| Tool | Uso | Cerbero Bite usa Telegram in modalità **notify-only**: nessuna conferma
|---|---| manuale, nessun callback. L'engine apre e chiude le posizioni
| `send_message(text, parse_mode="MarkdownV2")` | Report pre/post trade, alert | automaticamente quando le regole sono soddisfatte; Telegram viene
| `send_with_buttons(text, buttons)` | Conferma ad Adriano (yes/no) | informato post-fact.
Le conferme devono ritornare entro 60 minuti (entry) o 30 minuti (exit).
Implementazione: l'engine si mette in `await` su una coda interna
alimentata dal callback Telegram via webhook locale.
```python
class TelegramClient(McpClient):
async def send(self, text: str, parse_mode: str = "MarkdownV2") -> int: ...
async def request_confirmation(self, text: str, timeout_s: int) -> bool: ...
```
### `cerbero-brain-bridge`
| Tool | Uso | | Tool | Uso |
|---|---| |---|---|
| `kb_search(query)` | Lookup pre-trade su pattern simili (consultivo) | | `notify(message, priority, tag)` | Alert MEDIUM o messaggi informativi |
| `kb_read(path)` | Lettura nota wiki specifica | | `notify_position_opened(instrument, side, size, strategy, greeks, expected_pnl)` | Notifica di entry placed |
| `kb_write(path, content)` | Salvataggio learning post-trade | | `notify_position_closed(instrument, realized_pnl, reason)` | Notifica di exit filled |
| `notify_alert(source, message, priority)` | Alert HIGH (kill switch) |
**Importante:** il brain-bridge non partecipa al decision loop. Le | `notify_system_error(message, component, priority)` | Alert CRITICAL |
chiamate `kb_search` sono **consultive** e i risultati allegati al
report di Adriano per contesto, mai consumati come input ai filtri
deterministici.
```python
class BrainBridgeClient(McpClient):
async def search(self, query: str, limit: int = 5) -> list[KbHit]: ...
async def write_note(self, path: str, content: str) -> None: ...
```
### `cerbero-scheduler`
**Non usato** dal decision loop. Cerbero Bite ha il proprio scheduler
APScheduler interno. Il MCP scheduler resta a disposizione del core
Cerbero per altre routine.
## Errori e degradation ## Errori e degradation
| Server down | Comportamento | | Server fuori uso | Comportamento |
|---|---| |---|---|
| `cerbero-deribit` | Skip ciclo entry; per monitor → alert e marca posizione come `unknown_state` (non chiude alla cieca) | | `cerbero-deribit` | **Hard fail**: senza dati di mercato e canale di esecuzione il ciclo viene saltato; in monitor le posizioni esistenti restano nello stato corrente, alert HIGH e kill switch |
| `cerbero-hyperliquid` | Skip filtro funding §2.6 con warning; entry può proseguire se altre condizioni soddisfatte | | `cerbero-hyperliquid` | Skip del filtro funding §2.6 con warning; il ciclo prosegue se le altre condizioni sono soddisfatte |
| `cerbero-sentiment` | Bias §3.1 cade in `no_entry` per default (no funding cross → niente direzione) | | `cerbero-sentiment` | Bias §3.1 cade su `no_entry` per default (senza funding cross il bias non può fissare la direzione) |
| `cerbero-macro` | **Hard fail**: senza calendar non si apre. È un filtro irrinunciabile | | `cerbero-macro` | Hard fail per il filtro §2.5; senza calendar non si apre |
| `cerbero-portfolio` | Skip filtro §2.7 con warning; sizing usa ultimo capitale noto da SQLite con warning | | `cerbero-portfolio` | Skip dei filtri §2.7 con warning; il sizing usa l'ultimo capitale noto da SQLite |
| `cerbero-memory` | Hard fail per esecuzione: senza push_user_instruction non si può aprire/chiudere | | `cerbero-telegram` | Skip notifiche post-fact; il ciclo decisionale non viene bloccato (l'engine non aspetta risposte) |
| `cerbero-telegram` | Skip ciclo: senza canale di conferma niente proposta |
| `cerbero-brain-bridge` | Skip lookup, log warning. Mai bloccante |
Ogni "hard fail" → alert sonoro su Telegram via canale di backup I trigger HIGH e CRITICAL armano il kill switch e propagano un alert
(BotPapà), kill switch armato fino al ripristino. in audit chain.
## Verifica ambiente al boot
All'avvio l'orchestrator (`runtime/orchestrator.boot`) chiama
`cerbero-deribit.environment_info` e confronta il campo `environment`
con `strategy.execution.environment`. Un `mismatch` (per esempio engine
configurato per `testnet` ma server agganciato a `mainnet`) produce un
alert CRITICAL e arma il kill switch prima che qualsiasi ciclo
trading parta. La stessa verifica viene ripetuta dal probe periodico
(ogni 5 minuti) di `runtime/health_check.HealthCheck`.
## Versioning ## Versioning
Cerbero Bite verifica all'avvio la versione di ciascun MCP via I server MCP non espongono attualmente un endpoint `get_version()`
`get_version()` (tool standard). Schema di versioning attesa: formale; il check di compatibilità si limita a `environment_info` per
Deribit e a un round-trip lightweight sui tool read-only degli altri
```python servizi nel job di health check. Quando i server pubblicheranno il
EXPECTED_MCP_VERSIONS = { versionamento esplicito, l'orchestrator confronterà al boot le
"cerbero-deribit": "^2.0.0", versioni con la tabella `EXPECTED_MCP_VERSIONS` e armerà il kill
"cerbero-hyperliquid": "^1.5.0", switch su mismatch.
"cerbero-memory": "^4.0.0",
"cerbero-portfolio": "^1.2.0",
...
}
```
Mismatch → kill switch e alert manuale. Mai partire con MCP a versione
incompatibile.
+56
View File
@@ -22,6 +22,7 @@ from cerbero_bite.core.types import PutOrCall
__all__ = [ __all__ = [
"ComboLegOrder", "ComboLegOrder",
"ComboOrderResult", "ComboOrderResult",
"DealerGammaSnapshot",
"DeribitClient", "DeribitClient",
"DeribitEnvironment", "DeribitEnvironment",
"InstrumentMeta", "InstrumentMeta",
@@ -86,6 +87,17 @@ class ComboOrderResult(BaseModel):
raw: dict[str, Any] raw: dict[str, Any]
class DealerGammaSnapshot(BaseModel):
"""Result of ``get_dealer_gamma_profile`` flattened to what Bite consumes."""
model_config = ConfigDict(frozen=True, extra="ignore")
spot_price: Decimal
total_net_dealer_gamma: Decimal
gamma_flip_level: Decimal | None
strikes_analyzed: int
def _parse_instrument(name: str) -> tuple[Decimal, datetime, PutOrCall]: def _parse_instrument(name: str) -> tuple[Decimal, datetime, PutOrCall]:
"""Return ``(strike, expiry, option_type)`` parsed from a Deribit instrument.""" """Return ``(strike, expiry, option_type)`` parsed from a Deribit instrument."""
match = _INSTRUMENT_RE.match(name) match = _INSTRUMENT_RE.match(name)
@@ -291,6 +303,50 @@ class DeribitClient:
return Decimal(str(entry["close"])) return Decimal(str(entry["close"]))
return None return None
async def dealer_gamma_profile_eth(
self,
*,
expiry_from: datetime | None = None,
expiry_to: datetime | None = None,
top_n_strikes: int = 50,
) -> DealerGammaSnapshot:
"""Return the aggregated dealer net gamma snapshot for ETH options.
Long-gamma regime (``total_net_dealer_gamma > 0``) is associated
with vol-suppressing dealer hedging — the entry filter §2.8 uses
this signal to avoid selling premium during short-gamma regimes
(vol-amplifying dealer flow).
"""
body: dict[str, Any] = {
"currency": "ETH",
"top_n_strikes": top_n_strikes,
}
if expiry_from is not None:
body["expiry_from"] = expiry_from.date().isoformat()
if expiry_to is not None:
body["expiry_to"] = expiry_to.date().isoformat()
raw = await self._http.call("get_dealer_gamma_profile", body)
if not isinstance(raw, dict):
raise McpDataAnomalyError(
"dealer_gamma_profile: unexpected response shape",
service=self.SERVICE,
tool="get_dealer_gamma_profile",
)
spot = raw.get("spot_price")
total = raw.get("total_net_dealer_gamma")
if spot is None or total is None:
raise McpDataAnomalyError(
"dealer_gamma_profile: missing spot_price or total",
service=self.SERVICE,
tool="get_dealer_gamma_profile",
)
return DealerGammaSnapshot(
spot_price=Decimal(str(spot)),
total_net_dealer_gamma=Decimal(str(total)),
gamma_flip_level=_to_decimal(raw.get("gamma_flip_level")),
strikes_analyzed=int(raw.get("strikes_analyzed") or 0),
)
async def adx_14( async def adx_14(
self, self,
*, *,
+60 -1
View File
@@ -16,11 +16,14 @@ from __future__ import annotations
import statistics import statistics
from decimal import Decimal from decimal import Decimal
from typing import Literal
from pydantic import BaseModel, ConfigDict
from cerbero_bite.clients._base import HttpToolClient from cerbero_bite.clients._base import HttpToolClient
from cerbero_bite.clients._exceptions import McpDataAnomalyError from cerbero_bite.clients._exceptions import McpDataAnomalyError
__all__ = ["EXCHANGE_PERIODS_PER_YEAR", "SentimentClient"] __all__ = ["EXCHANGE_PERIODS_PER_YEAR", "LiquidationHeatmap", "SentimentClient"]
# Funding settlement frequency per year. 1095 = 365 × 3 (8-hour funding). # Funding settlement frequency per year. 1095 = 365 × 3 (8-hour funding).
@@ -32,6 +35,26 @@ EXCHANGE_PERIODS_PER_YEAR: dict[str, int] = {
} }
SqueezeRiskLevel = Literal["low", "medium", "high"]
class LiquidationHeatmap(BaseModel):
"""Heuristic liquidation pressure snapshot for a single asset."""
model_config = ConfigDict(frozen=True, extra="ignore")
asset: str
avg_funding_rate: Decimal | None
oi_delta_pct_4h: Decimal | None
oi_delta_pct_24h: Decimal | None
long_squeeze_risk: SqueezeRiskLevel
short_squeeze_risk: SqueezeRiskLevel
@property
def has_high_squeeze_risk(self) -> bool:
return self.long_squeeze_risk == "high" or self.short_squeeze_risk == "high"
class SentimentClient: class SentimentClient:
SERVICE = "sentiment" SERVICE = "sentiment"
@@ -77,3 +100,39 @@ class SentimentClient:
# statistics.median works on Decimal: it returns an averaged # statistics.median works on Decimal: it returns an averaged
# Decimal for even counts, which is exactly what we want. # Decimal for even counts, which is exactly what we want.
return Decimal(str(statistics.median(annualized))) return Decimal(str(statistics.median(annualized)))
async def liquidation_heatmap(self, asset: str) -> LiquidationHeatmap:
"""Return the heuristic liquidation pressure snapshot for ``asset``.
Cerbero Bite uses ``has_high_squeeze_risk`` as an entry-time
filter (§2.8): when either side is flagged ``high`` we skip the
cycle to avoid selling premium right before a likely shock.
"""
raw = await self._http.call(
"get_liquidation_heatmap", {"asset": asset.upper()}
)
if not isinstance(raw, dict):
raise McpDataAnomalyError(
"liquidation_heatmap: unexpected response shape",
service=self.SERVICE,
tool="get_liquidation_heatmap",
)
def _maybe_dec(value: object) -> Decimal | None:
return None if value is None else Decimal(str(value))
long_risk = str(raw.get("long_squeeze_risk") or "low")
short_risk = str(raw.get("short_squeeze_risk") or "low")
if long_risk not in ("low", "medium", "high"):
long_risk = "low"
if short_risk not in ("low", "medium", "high"):
short_risk = "low"
return LiquidationHeatmap(
asset=str(raw.get("asset") or asset).upper(),
avg_funding_rate=_maybe_dec(raw.get("avg_funding_rate")),
oi_delta_pct_4h=_maybe_dec(raw.get("oi_delta_pct_4h")),
oi_delta_pct_24h=_maybe_dec(raw.get("oi_delta_pct_24h")),
long_squeeze_risk=long_risk, # type: ignore[arg-type]
short_squeeze_risk=short_risk, # type: ignore[arg-type]
)
+5
View File
@@ -70,6 +70,11 @@ class EntryConfig(BaseModel):
iron_condor_adx_max: Decimal = Field(default=Decimal("20")) iron_condor_adx_max: Decimal = Field(default=Decimal("20"))
iron_condor_trend_neutral_band_pct: Decimal = Field(default=Decimal("0.05")) iron_condor_trend_neutral_band_pct: Decimal = Field(default=Decimal("0.05"))
# quant filters (§2.8 — added in Phase 4 hardening)
dealer_gamma_min: Decimal = Field(default=Decimal("0"))
dealer_gamma_filter_enabled: bool = True
liquidation_filter_enabled: bool = True
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Structure # Structure
+28
View File
@@ -37,6 +37,13 @@ class EntryContext(BaseModel):
next_macro_event_in_days: int | None next_macro_event_in_days: int | None
has_open_position: bool has_open_position: bool
# Quant filters (§2.8). Both are optional: when the snapshot
# collector cannot reach the underlying MCP tool the orchestrator
# passes ``None``, and ``validate_entry`` skips the gate to avoid
# blocking entries on infrastructure issues.
dealer_net_gamma: Decimal | None = None
liquidation_squeeze_risk_high: bool | None = None
class EntryDecision(BaseModel): class EntryDecision(BaseModel):
"""Result of :func:`validate_entry`. ``reasons`` holds *all* blocking reasons.""" """Result of :func:`validate_entry`. ``reasons`` holds *all* blocking reasons."""
@@ -103,6 +110,27 @@ def validate_entry(ctx: EntryContext, cfg: StrategyConfig) -> EntryDecision:
f"{entry_cfg.eth_holdings_pct_max})" f"{entry_cfg.eth_holdings_pct_max})"
) )
# §2.8: dealer-gamma regime gate. Skip the cycle when dealers are
# net short gamma (vol-amplifying flow) — selling premium during a
# short-gamma regime maximises path-dependent loss.
if (
entry_cfg.dealer_gamma_filter_enabled
and ctx.dealer_net_gamma is not None
and ctx.dealer_net_gamma < entry_cfg.dealer_gamma_min
):
reasons.append(
f"dealer short-gamma regime "
f"({ctx.dealer_net_gamma} < {entry_cfg.dealer_gamma_min})"
)
# §2.8: liquidation-pressure gate. Skip when the heuristic flags an
# imminent squeeze on either side.
if (
entry_cfg.liquidation_filter_enabled
and ctx.liquidation_squeeze_risk_high is True
):
reasons.append("imminent liquidation squeeze risk")
return EntryDecision(accepted=not reasons, reasons=reasons) return EntryDecision(accepted=not reasons, reasons=reasons)
+33
View File
@@ -92,6 +92,8 @@ class _MarketSnapshot:
macro_days_to_event: int | None macro_days_to_event: int | None
eth_holdings_pct: Decimal eth_holdings_pct: Decimal
portfolio_eur: Decimal portfolio_eur: Decimal
dealer_net_gamma: Decimal | None
liquidation_squeeze_risk_high: bool | None
async def _gather_snapshot( async def _gather_snapshot(
@@ -148,6 +150,15 @@ async def _gather_snapshot(
portfolio_t: asyncio.Task[Decimal] = asyncio.create_task( portfolio_t: asyncio.Task[Decimal] = asyncio.create_task(
portfolio.total_equity_eur() portfolio.total_equity_eur()
) )
# The two quant filters are best-effort: if the underlying tool
# fails the orchestrator passes ``None`` and validate_entry skips
# the gate (see core/entry_validator §2.8).
dealer_t: asyncio.Task[Decimal | None] = asyncio.create_task(
_safe_dealer_gamma(deribit)
)
liquidation_t: asyncio.Task[bool | None] = asyncio.create_task(
_safe_liquidation_squeeze(sentiment)
)
await asyncio.gather( await asyncio.gather(
spot_t, spot_t,
@@ -159,6 +170,8 @@ async def _gather_snapshot(
macro_t, macro_t,
holdings_t, holdings_t,
portfolio_t, portfolio_t,
dealer_t,
liquidation_t,
) )
return _MarketSnapshot( return _MarketSnapshot(
spot_eth_usd=spot_t.result(), spot_eth_usd=spot_t.result(),
@@ -170,9 +183,27 @@ async def _gather_snapshot(
macro_days_to_event=macro_t.result(), macro_days_to_event=macro_t.result(),
eth_holdings_pct=holdings_t.result(), eth_holdings_pct=holdings_t.result(),
portfolio_eur=portfolio_t.result(), portfolio_eur=portfolio_t.result(),
dealer_net_gamma=dealer_t.result(),
liquidation_squeeze_risk_high=liquidation_t.result(),
) )
async def _safe_dealer_gamma(deribit: DeribitClient) -> Decimal | None:
try:
snap = await deribit.dealer_gamma_profile_eth()
except Exception:
return None
return snap.total_net_dealer_gamma
async def _safe_liquidation_squeeze(sentiment: SentimentClient) -> bool | None:
try:
heatmap = await sentiment.liquidation_heatmap("ETH")
except Exception:
return None
return heatmap.has_high_squeeze_risk
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Helpers # Helpers
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -321,6 +352,8 @@ async def run_entry_cycle(
eth_holdings_pct_of_portfolio=snap.eth_holdings_pct, eth_holdings_pct_of_portfolio=snap.eth_holdings_pct,
next_macro_event_in_days=snap.macro_days_to_event, next_macro_event_in_days=snap.macro_days_to_event,
has_open_position=False, has_open_position=False,
dealer_net_gamma=snap.dealer_net_gamma,
liquidation_squeeze_risk_high=snap.liquidation_squeeze_risk_high,
) )
decision = validate_entry(entry_ctx, cfg) decision = validate_entry(entry_ctx, cfg)
inputs = { inputs = {
+10 -1
View File
@@ -7,7 +7,7 @@
# the commit message. # the commit message.
config_version: "1.0.0" config_version: "1.0.0"
config_hash: "f4bfebbb048bed7efa5c0fb71dc188619264edbe8dd09bb195bba8350e609d9c" config_hash: "4c2be4c51c849ed58fa22ec2b302016c453894dd0964b6d05445ab1b723e2d10"
last_review: "2026-04-26" last_review: "2026-04-26"
last_reviewer: "Adriano" last_reviewer: "Adriano"
@@ -37,6 +37,15 @@ entry:
iron_condor_adx_max: "20" iron_condor_adx_max: "20"
iron_condor_trend_neutral_band_pct: "0.05" iron_condor_trend_neutral_band_pct: "0.05"
# Quant filters (§2.8) — gates aggiuntivi via i nuovi tool MCP.
# dealer_gamma_min: scarta entry se dealer net gamma < soglia.
# Long-gamma regime (>0) = dealer hedge vol-suppressing, ideale
# per vendere credit spread. Soglia conservativa, da rifinire dopo
# paper trading.
dealer_gamma_min: "0"
dealer_gamma_filter_enabled: true
liquidation_filter_enabled: true
structure: structure:
dte_target: 18 dte_target: 18
dte_min: 14 dte_min: 14
+78
View File
@@ -82,6 +82,9 @@ def _wire_market_snapshot(
macro_events: list[dict[str, Any]] | None = None, macro_events: list[dict[str, Any]] | None = None,
eth_pct: float = 0.10, eth_pct: float = 0.10,
portfolio_eur: float | Decimal = 5000.0, portfolio_eur: float | Decimal = 5000.0,
dealer_total_net_gamma: float = 12345.6,
liquidation_long_risk: str = "low",
liquidation_short_risk: str = "low",
) -> None: ) -> None:
"""Stub every MCP endpoint queried during the snapshot stage.""" """Stub every MCP endpoint queried during the snapshot stage."""
httpx_mock.add_response( httpx_mock.add_response(
@@ -104,6 +107,29 @@ def _wire_market_snapshot(
json={"adx": [{"value": 22.0}]}, json={"adx": [{"value": 22.0}]},
is_reusable=True, is_reusable=True,
) )
httpx_mock.add_response(
url="http://mcp-deribit:9011/tools/get_dealer_gamma_profile",
json={
"spot_price": spot,
"total_net_dealer_gamma": dealer_total_net_gamma,
"gamma_flip_level": spot * 0.99,
"strikes_analyzed": 18,
"by_strike": [],
},
is_reusable=True,
)
httpx_mock.add_response(
url="http://mcp-sentiment:9014/tools/get_liquidation_heatmap",
json={
"asset": "ETH",
"avg_funding_rate": funding_cross_period,
"oi_delta_pct_4h": 1.0,
"oi_delta_pct_24h": 1.0,
"long_squeeze_risk": liquidation_long_risk,
"short_squeeze_risk": liquidation_short_risk,
},
is_reusable=True,
)
httpx_mock.add_response( httpx_mock.add_response(
url="http://mcp-hyperliquid:9012/tools/get_funding_rate", url="http://mcp-hyperliquid:9012/tools/get_funding_rate",
json={"asset": "ETH", "current_funding_rate": funding_perp_hourly}, json={"asset": "ETH", "current_funding_rate": funding_perp_hourly},
@@ -504,6 +530,58 @@ async def test_broker_reject_marks_position_cancelled(
assert ctx.kill_switch.is_armed() is True assert ctx.kill_switch.is_armed() is True
@pytest.mark.asyncio
async def test_dealer_short_gamma_blocks_entry(
cfg: StrategyConfig,
runtime_paths: tuple[Path, Path],
now: datetime,
httpx_mock: HTTPXMock,
) -> None:
_wire_market_snapshot(
httpx_mock,
portfolio_eur=3500,
funding_cross_period=0.0002,
dealer_total_net_gamma=-42000.0,
)
bull_cfg = golden_config(
entry=type(cfg.entry)(
**{**cfg.entry.model_dump(), "trend_bull_threshold_pct": Decimal("0")}
)
)
ctx = _ctx(bull_cfg, runtime_paths, now)
res = await run_entry_cycle(
ctx, eur_to_usd_rate=Decimal("1.075"), now=now
)
assert res.status == "no_entry"
assert "dealer short-gamma" in (res.reason or "")
@pytest.mark.asyncio
async def test_liquidation_high_risk_blocks_entry(
cfg: StrategyConfig,
runtime_paths: tuple[Path, Path],
now: datetime,
httpx_mock: HTTPXMock,
) -> None:
_wire_market_snapshot(
httpx_mock,
portfolio_eur=3500,
funding_cross_period=0.0002,
liquidation_long_risk="high",
)
bull_cfg = golden_config(
entry=type(cfg.entry)(
**{**cfg.entry.model_dump(), "trend_bull_threshold_pct": Decimal("0")}
)
)
ctx = _ctx(bull_cfg, runtime_paths, now)
res = await run_entry_cycle(
ctx, eur_to_usd_rate=Decimal("1.075"), now=now
)
assert res.status == "no_entry"
assert "liquidation squeeze" in (res.reason or "")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_already_open_position_skips_cycle( async def test_already_open_position_skips_cycle(
cfg: StrategyConfig, cfg: StrategyConfig,
+29
View File
@@ -333,6 +333,35 @@ async def test_get_positions_returns_list(httpx_mock: HTTPXMock) -> None:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_dealer_gamma_profile_eth_parses_payload(httpx_mock: HTTPXMock) -> None:
httpx_mock.add_response(
url="http://mcp-deribit:9011/tools/get_dealer_gamma_profile",
json={
"currency": "ETH",
"spot_price": 3000.0,
"by_strike": [],
"total_net_dealer_gamma": 12345.6,
"gamma_flip_level": 2950.5,
"strikes_analyzed": 18,
},
)
snap = await _client().dealer_gamma_profile_eth()
assert snap.spot_price == Decimal("3000.0")
assert snap.total_net_dealer_gamma == Decimal("12345.6")
assert snap.gamma_flip_level == Decimal("2950.5")
assert snap.strikes_analyzed == 18
@pytest.mark.asyncio
async def test_dealer_gamma_profile_anomaly_when_total_missing(
httpx_mock: HTTPXMock,
) -> None:
httpx_mock.add_response(json={"spot_price": 3000.0})
with pytest.raises(McpDataAnomalyError, match="missing spot_price or total"):
await _client().dealer_gamma_profile_eth()
def test_deribit_client_rejects_wrong_service() -> None: def test_deribit_client_rejects_wrong_service() -> None:
bad = HttpToolClient( bad = HttpToolClient(
service="macro", base_url="http://x:1", token="t", retry_max=1 service="macro", base_url="http://x:1", token="t", retry_max=1
+37
View File
@@ -99,6 +99,43 @@ def test_periods_table_covers_documented_venues() -> None:
} }
@pytest.mark.asyncio
async def test_liquidation_heatmap_parses_high_risk(httpx_mock: HTTPXMock) -> None:
httpx_mock.add_response(
url="http://mcp-sentiment:9014/tools/get_liquidation_heatmap",
json={
"asset": "ETH",
"avg_funding_rate": 0.00012,
"oi_delta_pct_4h": 6.5,
"oi_delta_pct_24h": 8.2,
"long_squeeze_risk": "high",
"short_squeeze_risk": "low",
},
)
out = await _client().liquidation_heatmap("eth")
assert out.asset == "ETH"
assert out.avg_funding_rate == Decimal("0.00012")
assert out.long_squeeze_risk == "high"
assert out.has_high_squeeze_risk is True
@pytest.mark.asyncio
async def test_liquidation_heatmap_unknown_risk_levels_default_to_low(
httpx_mock: HTTPXMock,
) -> None:
httpx_mock.add_response(
json={
"asset": "ETH",
"long_squeeze_risk": "extreme",
"short_squeeze_risk": None,
}
)
out = await _client().liquidation_heatmap("ETH")
assert out.long_squeeze_risk == "low"
assert out.short_squeeze_risk == "low"
assert out.has_high_squeeze_risk is False
def test_sentiment_client_rejects_wrong_service() -> None: def test_sentiment_client_rejects_wrong_service() -> None:
bad = HttpToolClient( bad = HttpToolClient(
service="macro", service="macro",
+51 -1
View File
@@ -9,7 +9,7 @@ from decimal import Decimal
import pytest import pytest
from cerbero_bite.config import StrategyConfig, golden_config from cerbero_bite.config import EntryConfig, StrategyConfig, golden_config
from cerbero_bite.core.entry_validator import ( from cerbero_bite.core.entry_validator import (
EntryContext, EntryContext,
TrendContext, TrendContext,
@@ -144,6 +144,56 @@ def test_eth_holdings_at_cap_is_accepted(cfg: StrategyConfig) -> None:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def test_dealer_short_gamma_blocks_entry(cfg: StrategyConfig) -> None:
decision = validate_entry(_good_ctx(dealer_net_gamma=Decimal("-5")), cfg)
assert decision.accepted is False
assert any("dealer short-gamma" in r for r in decision.reasons)
def test_dealer_long_gamma_passes(cfg: StrategyConfig) -> None:
decision = validate_entry(_good_ctx(dealer_net_gamma=Decimal("100")), cfg)
assert decision.accepted is True
def test_dealer_gamma_none_skips_filter(cfg: StrategyConfig) -> None:
decision = validate_entry(_good_ctx(dealer_net_gamma=None), cfg)
assert decision.accepted is True
def test_liquidation_squeeze_high_blocks_entry(cfg: StrategyConfig) -> None:
decision = validate_entry(
_good_ctx(liquidation_squeeze_risk_high=True), cfg
)
assert decision.accepted is False
assert any("liquidation squeeze" in r for r in decision.reasons)
def test_liquidation_squeeze_filter_disabled_in_config(
cfg: StrategyConfig,
) -> None:
permissive = golden_config(
entry=EntryConfig(
**{**cfg.entry.model_dump(), "liquidation_filter_enabled": False}
)
)
decision = validate_entry(
_good_ctx(liquidation_squeeze_risk_high=True), permissive
)
assert decision.accepted is True
def test_dealer_gamma_filter_disabled_in_config(cfg: StrategyConfig) -> None:
permissive = golden_config(
entry=EntryConfig(
**{**cfg.entry.model_dump(), "dealer_gamma_filter_enabled": False}
)
)
decision = validate_entry(
_good_ctx(dealer_net_gamma=Decimal("-1000")), permissive
)
assert decision.accepted is True
def test_validate_entry_accumulates_all_reasons(cfg: StrategyConfig) -> None: def test_validate_entry_accumulates_all_reasons(cfg: StrategyConfig) -> None:
decision = validate_entry( decision = validate_entry(
_good_ctx( _good_ctx(