"""Weekly entry decision loop (``docs/06-operational-flow.md`` §2).

Pure orchestration over the existing core/clients/state primitives. The
cycle is auto-execute: when every gate passes, the engine sends the
combo order without asking Adriano. Telegram is used only to notify the
outcome.
"""

from __future__ import annotations

import asyncio
import json
import logging
from contextlib import closing
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import Any
from uuid import uuid4

from cerbero_bite.clients.deribit import (
    ComboLegOrder,
    ComboOrderResult,
    DeribitClient,
    InstrumentMeta,
)
from cerbero_bite.clients.hyperliquid import HyperliquidClient
from cerbero_bite.clients.macro import MacroClient
from cerbero_bite.clients.portfolio import PortfolioClient
from cerbero_bite.clients.sentiment import SentimentClient
from cerbero_bite.config.schema import StrategyConfig
from cerbero_bite.core.combo_builder import ComboProposal, build, select_strikes
from cerbero_bite.core.entry_validator import (
    EntryContext,
    TrendContext,
    compute_bias,
    validate_entry,
)
from cerbero_bite.core.liquidity_gate import InstrumentSnapshot, check
from cerbero_bite.core.sizing_engine import SizingContext, compute_contracts
from cerbero_bite.core.types import OptionQuote
from cerbero_bite.runtime import auto_pause as auto_pause_module
from cerbero_bite.runtime.alert_manager import AlertManager
from cerbero_bite.runtime.dependencies import RuntimeContext
from cerbero_bite.state import (
    DecisionRecord,
    InstructionRecord,
    PositionRecord,
    transaction,
)
from cerbero_bite.state import connect as connect_state

__all__ = [
    "EntryCycleResult",
    "EntryDecisionStatus",
    "run_entry_cycle",
]

_log = logging.getLogger("cerbero_bite.runtime.entry")

EntryDecisionStatus = str  # one of the literals below

_STATUS_ENTRY_PLACED = "entry_placed"
_STATUS_NO_ENTRY = "no_entry"
_STATUS_BROKER_REJECT = "broker_reject"
_STATUS_KILL_SWITCH = "kill_switch_armed"
_STATUS_HAS_OPEN = "has_open_position"
_STATUS_AUTO_PAUSED = "auto_paused"

# Largest number of instruments ``_build_quotes`` accepts in one call.
_MAX_QUOTE_BATCH = 20


@dataclass(frozen=True)
class EntryCycleResult:
    """Outcome of one ``run_entry_cycle`` call (no exception path)."""

    # One of the ``_STATUS_*`` literals defined above.
    status: EntryDecisionStatus
    # Short machine-readable explanation; ``None`` when an entry was placed.
    reason: str | None
    # Populated once a proposal has been built (broker error/reject/placed).
    proposal: ComboProposal | None = None
    # Populated only when the broker returned an order result.
    order: ComboOrderResult | None = None


# ---------------------------------------------------------------------------
# Snapshot collection
# ---------------------------------------------------------------------------


@dataclass(frozen=True)
class _MarketSnapshot:
    """Market/portfolio readings collected in parallel by ``_gather_snapshot``."""

    spot_eth_usd: Decimal
    spot_eth_30d_ago: Decimal | None
    adx_14: Decimal | None
    dvol: Decimal
    funding_perp: Decimal
    funding_cross: Decimal
    macro_days_to_event: int | None
    eth_holdings_pct: Decimal
    portfolio_eur: Decimal
    dealer_net_gamma: Decimal | None
    liquidation_squeeze_risk_high: bool | None


async def _gather_snapshot(
    *,
    deribit: DeribitClient,
    hyperliquid: HyperliquidClient,
    sentiment: SentimentClient,
    macro: MacroClient,
    portfolio: PortfolioClient,
    cfg: StrategyConfig,
    now: datetime,
) -> _MarketSnapshot:
    """Fetch every input of the entry decision concurrently.

    All client calls are started as tasks and awaited together. If any
    mandatory call raises, the exception propagates to the caller and
    the tasks still in flight are cancelled so they do not keep running
    after the cycle has been abandoned. The two quant filters are
    wrapped by ``_safe_*`` helpers and yield ``None`` instead of raising.
    """
    window_days = cfg.entry.trend_window_days
    # One-day margin on each side of the target day of the trend window.
    historical_start = now - timedelta(days=window_days + 1)
    historical_end = now - timedelta(days=window_days - 1)
    adx_start = now - timedelta(days=10)

    spot_t: asyncio.Task[Decimal] = asyncio.create_task(deribit.index_price_eth())
    spot_past_t: asyncio.Task[Decimal | None] = asyncio.create_task(
        deribit.historical_close(
            instrument="ETH-PERPETUAL",
            start=historical_start,
            end=historical_end,
            resolution="1D",
        )
    )
    adx_t: asyncio.Task[Decimal | None] = asyncio.create_task(
        deribit.adx_14(
            instrument="ETH-PERPETUAL",
            start=adx_start,
            end=now,
            resolution="1h",
        )
    )
    dvol_t: asyncio.Task[Decimal] = asyncio.create_task(
        deribit.latest_dvol(currency="ETH", now=now)
    )
    funding_perp_t: asyncio.Task[Decimal] = asyncio.create_task(
        hyperliquid.funding_rate_annualized("ETH")
    )
    funding_cross_t: asyncio.Task[Decimal] = asyncio.create_task(
        sentiment.funding_cross_median_annualized("ETH")
    )
    macro_t: asyncio.Task[int | None] = asyncio.create_task(
        macro.next_high_severity_within(
            days=cfg.structure.dte_target,
            countries=list(cfg.entry.exclude_macro_countries),
            now=now,
        )
    )
    holdings_t: asyncio.Task[Decimal] = asyncio.create_task(
        portfolio.asset_pct_of_portfolio("ETH")
    )
    portfolio_t: asyncio.Task[Decimal] = asyncio.create_task(
        portfolio.total_equity_eur()
    )
    # The two quant filters are best-effort: if the underlying tool
    # fails the orchestrator passes ``None`` and validate_entry skips
    # the gate (see core/entry_validator §2.8).
    dealer_t: asyncio.Task[Decimal | None] = asyncio.create_task(
        _safe_dealer_gamma(deribit)
    )
    liquidation_t: asyncio.Task[bool | None] = asyncio.create_task(
        _safe_liquidation_squeeze(sentiment)
    )

    tasks: tuple[asyncio.Task[Any], ...] = (
        spot_t,
        spot_past_t,
        adx_t,
        dvol_t,
        funding_perp_t,
        funding_cross_t,
        macro_t,
        holdings_t,
        portfolio_t,
        dealer_t,
        liquidation_t,
    )
    try:
        await asyncio.gather(*tasks)
    finally:
        # gather() does not cancel siblings when one task fails; do it
        # here and drain them so no request outlives the cycle. On the
        # success path every task is already done and this is a no-op.
        unfinished = [task for task in tasks if not task.done()]
        for task in unfinished:
            task.cancel()
        if unfinished:
            await asyncio.gather(*unfinished, return_exceptions=True)

    return _MarketSnapshot(
        spot_eth_usd=spot_t.result(),
        spot_eth_30d_ago=spot_past_t.result(),
        adx_14=adx_t.result(),
        dvol=dvol_t.result(),
        funding_perp=funding_perp_t.result(),
        funding_cross=funding_cross_t.result(),
        macro_days_to_event=macro_t.result(),
        eth_holdings_pct=holdings_t.result(),
        portfolio_eur=portfolio_t.result(),
        dealer_net_gamma=dealer_t.result(),
        liquidation_squeeze_risk_high=liquidation_t.result(),
    )


async def _safe_dealer_gamma(deribit: DeribitClient) -> Decimal | None:
    """Return the total net dealer gamma, or ``None`` if the call fails.

    Best-effort by design: the failure is logged (with traceback) so a
    silently disabled gate is visible, but never aborts the cycle.
    """
    try:
        snap = await deribit.dealer_gamma_profile_eth()
    except Exception:
        _log.warning("dealer gamma profile unavailable", exc_info=True)
        return None
    return snap.total_net_dealer_gamma


async def _safe_liquidation_squeeze(sentiment: SentimentClient) -> bool | None:
    """Return the high-squeeze-risk flag, or ``None`` if the call fails.

    Best-effort by design: the failure is logged (with traceback) so a
    silently disabled gate is visible, but never aborts the cycle.
    """
    try:
        heatmap = await sentiment.liquidation_heatmap("ETH")
    except Exception:
        _log.warning("liquidation heatmap unavailable", exc_info=True)
        return None
    return heatmap.has_high_squeeze_risk


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


async def _record_decision(
    ctx: RuntimeContext,
    *,
    inputs: dict[str, Any],
    outputs: dict[str, Any],
    action_taken: str,
    notes: str | None,
    proposal_id: str | None,
    now: datetime,
) -> None:
    """Persist one ``entry_check`` row in the decisions table.

    ``inputs`` and ``outputs`` are serialised to JSON with sorted keys;
    values JSON cannot encode natively are stringified (``default=str``).
    """
    record = DecisionRecord(
        decision_type="entry_check",
        timestamp=now,
        inputs_json=json.dumps(inputs, default=str, sort_keys=True),
        outputs_json=json.dumps(outputs, default=str, sort_keys=True),
        action_taken=action_taken,
        notes=notes,
        proposal_id=proposal_id,  # type: ignore[arg-type]
    )
    # The transaction exits first, then the connection is closed.
    with closing(connect_state(ctx.db_path)) as conn, transaction(conn):
        ctx.repository.record_decision(conn, record)


async def _skip_entry(
    ctx: RuntimeContext,
    *,
    inputs: dict[str, Any],
    outputs: dict[str, Any],
    notes: str,
    message: str,
    reason: str,
    now: datetime,
) -> EntryCycleResult:
    """Record a ``no_entry`` decision, send a low alert, build the result."""
    await _record_decision(
        ctx,
        inputs=inputs,
        outputs=outputs,
        action_taken="no_entry",
        notes=notes,
        proposal_id=None,
        now=now,
    )
    await ctx.alert_manager.low(source="entry_cycle", message=message)
    return EntryCycleResult(status=_STATUS_NO_ENTRY, reason=reason)


async def _build_quotes(
    deribit: DeribitClient,
    chain: list[InstrumentMeta],
) -> list[OptionQuote]:
    """Fetch tickers + orderbook depth for the given metas, return OptionQuotes.

    Instruments without a ticker, or whose ticker lacks bid, ask or mark
    price, are dropped. Missing greeks and volume default to zero.

    Raises:
        ValueError: if ``chain`` holds more than ``_MAX_QUOTE_BATCH`` items.
    """
    if not chain:
        return []
    names = [m.name for m in chain]
    if len(names) > _MAX_QUOTE_BATCH:
        # Bite consumes a narrow window of strikes; if it ever overflows
        # the batch limit, the caller is expected to pre-filter.
        raise ValueError("entry_cycle: too many instruments to quote in one batch")

    tickers = await deribit.get_tickers(names)
    depths = await asyncio.gather(
        *[deribit.orderbook_depth_top3(m.name) for m in chain]
    )
    by_name: dict[str, dict[str, Any]] = {
        str(t.get("instrument_name")): t for t in tickers if isinstance(t, dict)
    }

    out: list[OptionQuote] = []
    for meta, depth in zip(chain, depths, strict=True):
        ticker = by_name.get(meta.name)
        if not ticker:
            continue
        bid = ticker.get("bid")
        ask = ticker.get("ask")
        mark = ticker.get("mark_price")
        greeks = ticker.get("greeks") or {}
        if bid is None or ask is None or mark is None:
            continue
        out.append(
            OptionQuote(
                instrument=meta.name,
                strike=meta.strike,
                expiry=meta.expiry,
                option_type=meta.option_type,
                bid=Decimal(str(bid)),
                ask=Decimal(str(ask)),
                # The mark price is used as the mid.
                mid=Decimal(str(mark)),
                delta=Decimal(str(greeks.get("delta") or 0)),
                gamma=Decimal(str(greeks.get("gamma") or 0)),
                theta=Decimal(str(greeks.get("theta") or 0)),
                vega=Decimal(str(greeks.get("vega") or 0)),
                open_interest=int(meta.open_interest or 0),
                volume_24h=int(ticker.get("volume_24h") or 0),
                book_depth_top3=int(depth),
            )
        )
    return out


def _max_loss_per_contract_usd(short_strike: Decimal, long_strike: Decimal) -> Decimal:
    """Return the absolute distance between the two strikes."""
    return (short_strike - long_strike).copy_abs()


# ---------------------------------------------------------------------------
# Cycle entry point
# ---------------------------------------------------------------------------


async def run_entry_cycle(
    ctx: RuntimeContext,
    *,
    eur_to_usd_rate: Decimal,
    now: datetime | None = None,
) -> EntryCycleResult:
    """Run one weekly entry evaluation cycle.

    Args:
        ctx: Runtime wiring (clients, repository, alerting, config).
        eur_to_usd_rate: Multiplier applied to the EUR portfolio equity
            to obtain the USD capital used by the gates and the sizing.
        now: Evaluation instant; defaults to ``ctx.clock()``. It is
            converted to UTC before use.

    Returns:
        An ``EntryCycleResult`` describing the outcome.

    The pre-flight exits (kill switch armed, auto-pause active, position
    already open) and a freshly armed auto-pause only raise an alert and
    are *not* written to the ``decisions`` table. Every later outcome is
    persisted through ``_record_decision``. A position row is created in
    ``proposed`` state before the order is sent and is moved to ``open``,
    ``awaiting_fill`` or ``cancelled`` according to the broker response.
    """
    when = (now or ctx.clock()).astimezone(UTC)
    cfg = ctx.cfg
    alert: AlertManager = ctx.alert_manager

    if ctx.kill_switch.is_armed():
        await alert.low(source="entry_cycle", message="kill switch armed — skipping")
        return EntryCycleResult(status=_STATUS_KILL_SWITCH, reason="kill_switch")

    # §7-bis (F): auto-pause circuit breaker. Read-only consultation
    # of the persisted state — the breach evaluation runs later, after
    # capital is known.
    with closing(connect_state(ctx.db_path)) as conn:
        sys_state = ctx.repository.get_system_state(conn)
    pause_status = auto_pause_module.is_paused(sys_state, now=when)
    if pause_status.paused:
        await alert.low(
            source="entry_cycle",
            message=(
                f"auto-paused until {pause_status.until} "
                f"({pause_status.reason or 'no reason'}) — skipping"
            ),
        )
        return EntryCycleResult(
            status=_STATUS_AUTO_PAUSED,
            reason=pause_status.reason or "auto_paused",
        )

    # Has open position?
    with closing(connect_state(ctx.db_path)) as conn:
        concurrent = ctx.repository.count_concurrent_positions(conn)
    if concurrent > 0:
        await alert.low(source="entry_cycle", message="position already open")
        return EntryCycleResult(status=_STATUS_HAS_OPEN, reason="has_open_position")

    # 1. Snapshot
    snap = await _gather_snapshot(
        deribit=ctx.deribit,
        hyperliquid=ctx.hyperliquid,
        sentiment=ctx.sentiment,
        macro=ctx.macro,
        portfolio=ctx.portfolio,
        cfg=cfg,
        now=when,
    )
    capital_usd = snap.portfolio_eur * eur_to_usd_rate

    # §7-bis (F): rolling drawdown breach evaluation. If the last N
    # closed positions have accumulated losses beyond the threshold, arm
    # the pause and exit immediately (this cycle's entry is skipped).
    auto_cfg = cfg.auto_pause
    if auto_cfg.enabled:
        with closing(connect_state(ctx.db_path)) as conn:
            recent_pnls = ctx.repository.recent_closed_position_pnls_usd(
                conn, limit=auto_cfg.lookback_trades
            )
        breach = auto_pause_module.evaluate_drawdown_breach(
            cfg=auto_cfg,
            recent_pnl_usd=recent_pnls,
            capital_usd=capital_usd,
        )
        if breach.should_pause:
            until = auto_pause_module.pause_until(when, auto_cfg.pause_weeks)
            with closing(connect_state(ctx.db_path)) as conn, transaction(conn):
                ctx.repository.set_auto_pause(conn, until=until, reason=breach.reason)
            await alert.high(
                source="entry_cycle",
                message=(f"auto-pause armed: {breach.reason} — paused until {until}"),
            )
            return EntryCycleResult(
                status=_STATUS_AUTO_PAUSED,
                reason=breach.reason or "auto_paused",
            )

    # 2. Entry filters
    entry_ctx = EntryContext(
        capital_usd=capital_usd,
        dvol_now=snap.dvol,
        funding_perp_annualized=snap.funding_perp,
        eth_holdings_pct_of_portfolio=snap.eth_holdings_pct,
        next_macro_event_in_days=snap.macro_days_to_event,
        has_open_position=False,
        dealer_net_gamma=snap.dealer_net_gamma,
        liquidation_squeeze_risk_high=snap.liquidation_squeeze_risk_high,
    )
    decision = validate_entry(entry_ctx, cfg)

    # Everything that fed the gates, so a stored decision can be replayed.
    inputs: dict[str, Any] = {
        "snapshot": {
            "spot_eth_usd": str(snap.spot_eth_usd),
            "spot_eth_30d_ago": (
                str(snap.spot_eth_30d_ago)
                if snap.spot_eth_30d_ago is not None
                else None
            ),
            "adx_14": str(snap.adx_14) if snap.adx_14 is not None else None,
            "dvol": str(snap.dvol),
            "funding_perp": str(snap.funding_perp),
            "funding_cross": str(snap.funding_cross),
            "macro_days_to_event": snap.macro_days_to_event,
            "eth_holdings_pct": str(snap.eth_holdings_pct),
            "portfolio_eur": str(snap.portfolio_eur),
            "capital_usd": str(capital_usd),
            "dealer_net_gamma": (
                str(snap.dealer_net_gamma)
                if snap.dealer_net_gamma is not None
                else None
            ),
            "liquidation_squeeze_risk_high": snap.liquidation_squeeze_risk_high,
        }
    }

    if not decision.accepted:
        return await _skip_entry(
            ctx,
            inputs=inputs,
            outputs={"accepted": False, "reasons": decision.reasons},
            notes="entry_validator",
            message=f"entry rejected: {'; '.join(decision.reasons)}",
            reason=";".join(decision.reasons),
            now=when,
        )

    # 3. Bias — eth_30d_ago and adx_14 come from the historical snapshot
    # collected during the parallel snapshot stage. When a signal is
    # missing a medium alert is raised and a stand-in value is used:
    # the current spot for the historical price, 25 for the ADX.
    if snap.spot_eth_30d_ago is None:
        await alert.medium(
            source="entry_cycle",
            message="historical spot unavailable — bias falls back to neutral",
        )
    if snap.adx_14 is None:
        await alert.medium(
            source="entry_cycle",
            message="ADX unavailable — bias may reject iron_condor",
        )
    trend_ctx = TrendContext(
        eth_now=snap.spot_eth_usd,
        # ``or`` on purpose: a zero historical price is also replaced.
        eth_30d_ago=snap.spot_eth_30d_ago or snap.spot_eth_usd,
        funding_cross_annualized=snap.funding_cross,
        dvol_now=snap.dvol,
        adx_14=snap.adx_14 if snap.adx_14 is not None else Decimal("25"),
    )
    bias = compute_bias(trend_ctx, cfg)
    if bias is None:
        return await _skip_entry(
            ctx,
            inputs=inputs,
            outputs={"bias": None},
            notes="no_bias",
            message="no directional bias",
            reason="no_bias",
            now=when,
        )

    # 4. Chain → strikes
    expiry_from = when
    expiry_to = when + timedelta(days=cfg.structure.dte_max + 1)
    chain_meta = await ctx.deribit.options_chain(
        currency="ETH",
        expiry_from=expiry_from,
        expiry_to=expiry_to,
        min_open_interest=int(cfg.liquidity.open_interest_min),
    )
    quotes = await _build_quotes(ctx.deribit, chain_meta)
    selection = select_strikes(
        chain=quotes,
        bias=bias,
        spot=snap.spot_eth_usd,
        now=when,
        cfg=cfg,
        dvol_now=snap.dvol,  # §3.2 (A) — strike picker depends on the DVOL regime
    )
    if selection is None:
        return await _skip_entry(
            ctx,
            inputs=inputs,
            outputs={"bias": bias, "n_quotes": len(quotes)},
            notes="no_strike",
            message="no strike candidate",
            reason="no_strike",
            now=when,
        )
    short, long_ = selection

    # 5. Liquidity gate (uses raw bid/ask/depth from the same quotes)
    short_snap = InstrumentSnapshot(
        instrument=short.instrument,
        bid=short.bid,
        ask=short.ask,
        mid=short.mid,
        open_interest=short.open_interest,
        volume_24h=short.volume_24h,
        book_depth_top3=short.book_depth_top3,
    )
    long_snap = InstrumentSnapshot(
        instrument=long_.instrument,
        bid=long_.bid,
        ask=long_.ask,
        mid=long_.mid,
        open_interest=long_.open_interest,
        volume_24h=long_.volume_24h,
        book_depth_top3=long_.book_depth_top3,
    )
    credit_eth_per_contract = short.mid - long_.mid

    # 6. Sizing — the spread width doubles as the per-contract max loss.
    width_usd = _max_loss_per_contract_usd(short.strike, long_.strike)
    sizing_ctx = SizingContext(
        capital_usd=capital_usd,
        max_loss_per_contract_usd=width_usd,
        dvol_now=snap.dvol,
        open_engagement_usd=Decimal("0"),
        eur_to_usd=eur_to_usd_rate,
        other_open_positions=0,
    )
    sizing = compute_contracts(sizing_ctx, cfg)
    if sizing.n_contracts < 1:
        return await _skip_entry(
            ctx,
            inputs=inputs,
            outputs={"sizing_reason": sizing.reason_if_zero},
            notes="undersize",
            message=f"undersize: {sizing.reason_if_zero}",
            reason="undersize",
            now=when,
        )

    # 7. Liquidity check now that we know n_contracts
    liq = check(
        short_leg=short_snap,
        long_leg=long_snap,
        credit=credit_eth_per_contract * Decimal(sizing.n_contracts),
        n_contracts=sizing.n_contracts,
        cfg=cfg,
    )
    if not liq.accepted:
        return await _skip_entry(
            ctx,
            inputs=inputs,
            outputs={"liquidity_reasons": liq.reasons},
            notes="illiquid",
            message=f"illiquid: {'; '.join(liq.reasons)}",
            reason="illiquid",
            now=when,
        )

    # 8. Build proposal + persist + place order
    proposal = build(
        short=short,
        long_=long_,
        n_contracts=sizing.n_contracts,
        spot=snap.spot_eth_usd,
        dvol=snap.dvol,
        cfg=cfg,
        now=when,
        spread_type=bias,
    )
    pct_of_spot = (
        width_usd / snap.spot_eth_usd if snap.spot_eth_usd > 0 else Decimal("0")
    )
    record = PositionRecord(
        proposal_id=proposal.proposal_id,
        spread_type=bias,
        expiry=proposal.expiry,
        short_strike=short.strike,
        long_strike=long_.strike,
        short_instrument=short.instrument,
        long_instrument=long_.instrument,
        n_contracts=sizing.n_contracts,
        spread_width_usd=width_usd,
        spread_width_pct=pct_of_spot,
        credit_eth=proposal.credit_target_eth,
        credit_usd=proposal.credit_target_usd,
        max_loss_usd=proposal.max_loss_usd,
        spot_at_entry=snap.spot_eth_usd,
        dvol_at_entry=snap.dvol,
        delta_at_entry=short.delta,
        eth_price_at_entry=snap.spot_eth_usd,
        proposed_at=when,
        status="proposed",
        created_at=when,
        updated_at=when,
    )
    # The row is written before the order goes out, so a failure while
    # placing the order still leaves a record to mark as cancelled.
    with closing(connect_state(ctx.db_path)) as conn, transaction(conn):
        ctx.repository.create_position(conn, record)

    legs = [
        ComboLegOrder(instrument_name=short.instrument, direction="sell"),
        ComboLegOrder(instrument_name=long_.instrument, direction="buy"),
    ]
    try:
        order = await ctx.deribit.place_combo_order(
            legs=legs,
            side="sell",
            n_contracts=sizing.n_contracts,
            limit_price_eth=credit_eth_per_contract,
            label=f"bite-{proposal.proposal_id}",
        )
    except Exception as exc:
        with closing(connect_state(ctx.db_path)) as conn, transaction(conn):
            ctx.repository.update_position_status(
                conn,
                proposal.proposal_id,
                status="cancelled",
                closed_at=when,
                close_reason="broker_error",
                now=when,
            )
        await alert.high(
            source="entry_cycle",
            message=f"place_combo_order failed: {type(exc).__name__}: {exc}",
        )
        await _record_decision(
            ctx,
            inputs=inputs,
            outputs={"error": str(exc)},
            action_taken="broker_error",
            notes=type(exc).__name__,
            proposal_id=str(proposal.proposal_id),
            now=when,
        )
        return EntryCycleResult(
            status=_STATUS_BROKER_REJECT,
            reason=f"{type(exc).__name__}: {exc}",
            proposal=proposal,
        )

    # 9. Persist instruction + update status
    next_status = "open" if order.state in {"filled", "open"} else "awaiting_fill"
    if order.state == "rejected":
        next_status = "cancelled"

    instruction_id = uuid4()
    # Instruction row and status change share one transaction.
    with closing(connect_state(ctx.db_path)) as conn, transaction(conn):
        ctx.repository.create_instruction(
            conn,
            InstructionRecord(
                instruction_id=instruction_id,
                proposal_id=proposal.proposal_id,
                kind="open_combo",
                payload_json=json.dumps(order.raw, default=str, sort_keys=True),
                sent_at=when,
                actual_fill_eth=order.average_price_eth,
            ),
        )
        ctx.repository.update_position_status(
            conn,
            proposal.proposal_id,
            status=next_status,  # type: ignore[arg-type]
            opened_at=when if next_status == "open" else None,
            closed_at=when if next_status == "cancelled" else None,
            close_reason="broker_reject" if next_status == "cancelled" else None,
            now=when,
        )

    await _record_decision(
        ctx,
        inputs=inputs,
        outputs={
            "n_contracts": sizing.n_contracts,
            "credit_eth": str(proposal.credit_target_eth),
            "max_loss_usd": str(proposal.max_loss_usd),
            "broker_state": order.state,
        },
        action_taken="propose_open",
        notes=None,
        proposal_id=str(proposal.proposal_id),
        now=when,
    )

    if next_status == "cancelled":
        await alert.high(
            source="entry_cycle",
            message=f"broker rejected combo order: state={order.state}",
        )
        return EntryCycleResult(
            status=_STATUS_BROKER_REJECT,
            reason="broker_reject",
            proposal=proposal,
            order=order,
        )

    # Write the audit entry before notifying, so a failing notification
    # channel cannot leave a placed order without its audit record.
    ctx.audit_log.append(
        event="ENTRY_PLACED",
        payload={
            "proposal_id": str(proposal.proposal_id),
            "spread_type": bias,
            "n_contracts": sizing.n_contracts,
            "combo_instrument": order.combo_instrument,
            "broker_state": order.state,
        },
        now=when,
    )
    try:
        await ctx.telegram.notify_position_opened(
            instrument=order.combo_instrument,
            side="SELL",
            size=sizing.n_contracts,
            strategy=bias,
            greeks={
                "delta_short": short.delta,
                "credit_eth": proposal.credit_target_eth,
                "max_loss_usd": proposal.max_loss_usd,
            },
            expected_pnl_usd=proposal.credit_target_usd,
        )
    except Exception:
        # Notification only: the order is already placed and persisted,
        # so report the failure in the log instead of failing the cycle.
        _log.exception(
            "position-opened notification failed: proposal=%s",
            proposal.proposal_id,
        )
    _log.info(
        "entry placed: proposal=%s combo=%s contracts=%d state=%s",
        proposal.proposal_id,
        order.combo_instrument,
        sizing.n_contracts,
        order.state,
    )
    return EntryCycleResult(
        status=_STATUS_ENTRY_PLACED,
        reason=None,
        proposal=proposal,
        order=order,
    )