Phase 1: core algorithms
Implementa i sette algoritmi puri di docs/03-algorithms.md con disciplina TDD: 112 test, copertura statement+branch al 100% su core/ e config/, mypy --strict pulito, ruff pulito. Moduli: - config/schema.py: StrategyConfig Pydantic v2 con validatori di consistenza (kelly, delta, OTM, spread width, profit/stop). - core/types.py: OptionQuote e OptionLeg condivisi. - core/entry_validator.py: validate_entry (accumula motivi) e compute_bias (bull_put/bear_call/iron_condor/None). - core/liquidity_gate.py: check OI/volume/spread/depth + slippage stimato in % del credito. - core/sizing_engine.py: Quarter Kelly con cap 200/1000 EUR e bande DVOL. - core/combo_builder.py: select_strikes (DTE/OTM/delta/width/credit) e build (ComboProposal con credit/max_loss/breakeven). - core/greeks_aggregator.py: somma firmata BUY/SELL, theta in USD. - core/exit_decision.py: 6 trigger ordinati con eccezione skip-time vicino a profit (mark in (50%,70%] credito). - core/kelly_recalibration.py: full/quarter Kelly, confidence per sample size, blend medio in fascia 30-99 trade. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,7 @@
|
||||
"""Pure rule-engine algorithms (no I/O, no LLM, no clock).
|
||||
|
||||
Every module here is deterministic: same input → same output. Inputs are
|
||||
typed via Pydantic models, outputs likewise. The orchestrator in
|
||||
:mod:`cerbero_bite.runtime` is the only layer allowed to bring in real
|
||||
data and call these functions.
|
||||
"""
|
||||
|
||||
@@ -0,0 +1,253 @@
|
||||
"""Strike selection and combo construction (``docs/03-algorithms.md §4``).
|
||||
|
||||
Two responsibilities:
|
||||
|
||||
* :func:`select_strikes` — given a full option chain and a directional
|
||||
bias, return the (short, long) option quotes that satisfy the
|
||||
documented selection rules, or ``None`` when no candidate exists.
|
||||
* :func:`build` — assemble a :class:`ComboProposal` ready to be sent to
|
||||
Cerbero core, with credit, max-loss and breakeven precomputed.
|
||||
|
||||
Iron condor selection is intentionally out of scope here; it is built by
|
||||
the orchestrator as two independent vertical spreads.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
from decimal import Decimal
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
from cerbero_bite.config import SpreadType, StrategyConfig
|
||||
from cerbero_bite.core.types import OptionLeg, OptionQuote, PutOrCall
|
||||
|
||||
__all__ = ["ComboProposal", "build", "select_strikes"]
|
||||
|
||||
|
||||
class ComboProposal(BaseModel):
|
||||
"""Trade proposal ready for Telegram pre-trade and Cerbero-core dispatch."""
|
||||
|
||||
model_config = ConfigDict(frozen=True, extra="forbid")
|
||||
|
||||
proposal_id: UUID = Field(default_factory=uuid4)
|
||||
spread_type: SpreadType
|
||||
legs: list[OptionLeg]
|
||||
|
||||
credit_target_eth: Decimal
|
||||
credit_target_usd: Decimal
|
||||
max_loss_eth: Decimal
|
||||
max_loss_usd: Decimal
|
||||
breakeven: Decimal
|
||||
|
||||
spot_at_proposal: Decimal
|
||||
dvol_at_proposal: Decimal
|
||||
expiry: datetime
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# select_strikes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _option_type_for_bias(bias: SpreadType) -> PutOrCall | None:
|
||||
if bias == "bull_put":
|
||||
return "P"
|
||||
if bias == "bear_call":
|
||||
return "C"
|
||||
return None # iron_condor handled at orchestrator level
|
||||
|
||||
|
||||
def _dte_days(now: datetime, expiry: datetime) -> int:
|
||||
"""Calendar days between *now* and *expiry*, floored to int (≥ 0)."""
|
||||
delta = expiry - now
|
||||
return delta.days
|
||||
|
||||
|
||||
def _pick_expiry(
|
||||
chain: list[OptionQuote],
|
||||
*,
|
||||
now: datetime,
|
||||
cfg: StrategyConfig,
|
||||
) -> datetime | None:
|
||||
"""Return the expiry whose DTE is in range and closest to ``dte_target``."""
|
||||
sc = cfg.structure
|
||||
candidates: dict[datetime, int] = {}
|
||||
for q in chain:
|
||||
dte = _dte_days(now, q.expiry)
|
||||
if sc.dte_min <= dte <= sc.dte_max:
|
||||
candidates.setdefault(q.expiry, dte)
|
||||
if not candidates:
|
||||
return None
|
||||
return min(candidates, key=lambda exp: abs(candidates[exp] - sc.dte_target))
|
||||
|
||||
|
||||
def _select_short(
|
||||
quotes: list[OptionQuote],
|
||||
*,
|
||||
spot: Decimal,
|
||||
cfg: StrategyConfig,
|
||||
) -> OptionQuote | None:
|
||||
"""Pick the short-leg quote with delta closest to target inside both bands."""
|
||||
sc = cfg.structure.short_strike
|
||||
eligible: list[OptionQuote] = []
|
||||
for q in quotes:
|
||||
dist = (q.strike - spot).copy_abs() / spot
|
||||
if not (sc.distance_otm_pct_min <= dist <= sc.distance_otm_pct_max):
|
||||
continue
|
||||
abs_delta = q.delta.copy_abs()
|
||||
if not (sc.delta_min <= abs_delta <= sc.delta_max):
|
||||
continue
|
||||
eligible.append(q)
|
||||
if not eligible:
|
||||
return None
|
||||
return min(eligible, key=lambda q: abs(q.delta.copy_abs() - sc.delta_target))
|
||||
|
||||
|
||||
def _select_long(
|
||||
quotes: list[OptionQuote],
|
||||
*,
|
||||
short: OptionQuote,
|
||||
spot: Decimal,
|
||||
bias: SpreadType,
|
||||
cfg: StrategyConfig,
|
||||
) -> OptionQuote | None:
|
||||
"""Pick the long-leg quote whose distance from short matches the target width."""
|
||||
sw = cfg.structure.spread_width
|
||||
width_target = spot * sw.target_pct_of_spot
|
||||
width_min = spot * sw.min_pct_of_spot
|
||||
width_max = spot * sw.max_pct_of_spot
|
||||
|
||||
if bias == "bull_put":
|
||||
target_strike = short.strike - width_target
|
||||
candidates = [q for q in quotes if q.strike < short.strike]
|
||||
else: # bear_call
|
||||
target_strike = short.strike + width_target
|
||||
candidates = [q for q in quotes if q.strike > short.strike]
|
||||
|
||||
if not candidates:
|
||||
return None
|
||||
|
||||
nearest = min(candidates, key=lambda q: (q.strike - target_strike).copy_abs())
|
||||
width = (short.strike - nearest.strike).copy_abs()
|
||||
if not (width_min <= width <= width_max):
|
||||
return None
|
||||
return nearest
|
||||
|
||||
|
||||
def select_strikes(
|
||||
*,
|
||||
chain: list[OptionQuote],
|
||||
bias: SpreadType,
|
||||
spot: Decimal,
|
||||
now: datetime,
|
||||
cfg: StrategyConfig,
|
||||
) -> tuple[OptionQuote, OptionQuote] | None:
|
||||
"""Return the (short, long) quotes for the requested vertical, or ``None``.
|
||||
|
||||
Iron condor is *not* built here: callers should request the two
|
||||
legs (bull_put + bear_call) separately when they need an IC.
|
||||
"""
|
||||
opt_type = _option_type_for_bias(bias)
|
||||
if opt_type is None:
|
||||
return None
|
||||
|
||||
expiry = _pick_expiry(chain, now=now, cfg=cfg)
|
||||
if expiry is None:
|
||||
return None
|
||||
|
||||
typed = [q for q in chain if q.expiry == expiry and q.option_type == opt_type]
|
||||
if not typed:
|
||||
return None
|
||||
|
||||
short = _select_short(typed, spot=spot, cfg=cfg)
|
||||
if short is None:
|
||||
return None
|
||||
|
||||
long_candidates = [q for q in typed if q.instrument != short.instrument]
|
||||
long_ = _select_long(long_candidates, short=short, spot=spot, bias=bias, cfg=cfg)
|
||||
if long_ is None:
|
||||
return None
|
||||
|
||||
width_usd = (short.strike - long_.strike).copy_abs()
|
||||
credit_eth = short.mid - long_.mid
|
||||
# credit ≤ 0 → ratio non-positive < min → falls through to None below.
|
||||
credit_usd = credit_eth * spot
|
||||
if (credit_usd / width_usd) < cfg.structure.credit_to_width_ratio_min:
|
||||
return None
|
||||
|
||||
return short, long_
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# build
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_leg(
|
||||
quote: OptionQuote,
|
||||
*,
|
||||
side: str,
|
||||
n_contracts: int,
|
||||
) -> OptionLeg:
|
||||
return OptionLeg(
|
||||
instrument=quote.instrument,
|
||||
side=side, # type: ignore[arg-type]
|
||||
strike=quote.strike,
|
||||
expiry=quote.expiry,
|
||||
type=quote.option_type,
|
||||
size=n_contracts,
|
||||
mid_price_eth=quote.mid,
|
||||
delta=quote.delta,
|
||||
gamma=quote.gamma,
|
||||
theta=quote.theta,
|
||||
vega=quote.vega,
|
||||
)
|
||||
|
||||
|
||||
def build(
|
||||
*,
|
||||
short: OptionQuote,
|
||||
long_: OptionQuote,
|
||||
n_contracts: int,
|
||||
spot: Decimal,
|
||||
dvol: Decimal,
|
||||
cfg: StrategyConfig, # noqa: ARG001 — kept for symmetry with select_strikes
|
||||
now: datetime, # noqa: ARG001 — opening time captured by orchestrator
|
||||
spread_type: SpreadType,
|
||||
) -> ComboProposal:
|
||||
"""Assemble a :class:`ComboProposal` from the two selected quotes."""
|
||||
width_per_contract_usd = (short.strike - long_.strike).copy_abs()
|
||||
credit_per_contract_eth = short.mid - long_.mid
|
||||
credit_per_contract_usd = credit_per_contract_eth * spot
|
||||
max_loss_per_contract_usd = width_per_contract_usd - credit_per_contract_usd
|
||||
|
||||
n_dec = Decimal(n_contracts)
|
||||
credit_target_eth = credit_per_contract_eth * n_dec
|
||||
credit_target_usd = credit_per_contract_usd * n_dec
|
||||
max_loss_usd = max_loss_per_contract_usd * n_dec
|
||||
max_loss_eth = max_loss_usd / spot if spot > 0 else Decimal("0")
|
||||
|
||||
if spread_type == "bull_put":
|
||||
breakeven = short.strike - credit_per_contract_usd
|
||||
else: # bear_call
|
||||
breakeven = short.strike + credit_per_contract_usd
|
||||
|
||||
legs = [
|
||||
_make_leg(short, side="SELL", n_contracts=n_contracts),
|
||||
_make_leg(long_, side="BUY", n_contracts=n_contracts),
|
||||
]
|
||||
|
||||
return ComboProposal(
|
||||
spread_type=spread_type,
|
||||
legs=legs,
|
||||
credit_target_eth=credit_target_eth,
|
||||
credit_target_usd=credit_target_usd,
|
||||
max_loss_eth=max_loss_eth,
|
||||
max_loss_usd=max_loss_usd,
|
||||
breakeven=breakeven,
|
||||
spot_at_proposal=spot,
|
||||
dvol_at_proposal=dvol,
|
||||
expiry=short.expiry,
|
||||
)
|
||||
@@ -0,0 +1,155 @@
|
||||
"""Pure functions for the *entry* phase of the decision loop.
|
||||
|
||||
Two responsibilities (both deterministic, no I/O):
|
||||
|
||||
* :func:`validate_entry` — accumulate all blocking reasons that come from
|
||||
the access filters in ``docs/01-strategy-rules.md §2``.
|
||||
* :func:`compute_bias` — translate market trend/funding/regime into a
|
||||
:data:`SpreadType` choice per ``docs/01-strategy-rules.md §3.1``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from decimal import Decimal
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
from cerbero_bite.config import SpreadType, StrategyConfig
|
||||
|
||||
__all__ = [
|
||||
"EntryContext",
|
||||
"EntryDecision",
|
||||
"TrendContext",
|
||||
"compute_bias",
|
||||
"validate_entry",
|
||||
]
|
||||
|
||||
|
||||
class EntryContext(BaseModel):
|
||||
"""Snapshot of the inputs needed to decide whether to open a trade."""
|
||||
|
||||
model_config = ConfigDict(frozen=True, extra="forbid")
|
||||
|
||||
capital_usd: Decimal
|
||||
dvol_now: Decimal
|
||||
funding_perp_annualized: Decimal
|
||||
eth_holdings_pct_of_portfolio: Decimal
|
||||
next_macro_event_in_days: int | None
|
||||
has_open_position: bool
|
||||
|
||||
|
||||
class EntryDecision(BaseModel):
|
||||
"""Result of :func:`validate_entry`. ``reasons`` holds *all* blocking reasons."""
|
||||
|
||||
model_config = ConfigDict(frozen=True, extra="forbid")
|
||||
|
||||
accepted: bool
|
||||
reasons: list[str]
|
||||
|
||||
|
||||
class TrendContext(BaseModel):
|
||||
"""Market regime inputs for :func:`compute_bias`."""
|
||||
|
||||
model_config = ConfigDict(frozen=True, extra="forbid")
|
||||
|
||||
eth_now: Decimal
|
||||
eth_30d_ago: Decimal
|
||||
funding_cross_annualized: Decimal
|
||||
dvol_now: Decimal
|
||||
adx_14: Decimal
|
||||
|
||||
|
||||
def validate_entry(ctx: EntryContext, cfg: StrategyConfig) -> EntryDecision:
|
||||
"""Return the entry decision, collecting *every* failing condition.
|
||||
|
||||
Order matches the documentation but does not short-circuit: callers
|
||||
receive the full list of reasons so the report message can explain
|
||||
why a trade was skipped without needing multiple passes.
|
||||
"""
|
||||
reasons: list[str] = []
|
||||
entry_cfg = cfg.entry
|
||||
structure_cfg = cfg.structure
|
||||
|
||||
if ctx.has_open_position:
|
||||
reasons.append("open position already exists")
|
||||
|
||||
if ctx.capital_usd < entry_cfg.capital_min_usd:
|
||||
reasons.append(
|
||||
f"capital below minimum ({ctx.capital_usd} < {entry_cfg.capital_min_usd})"
|
||||
)
|
||||
|
||||
if ctx.dvol_now < entry_cfg.dvol_min:
|
||||
reasons.append(f"dvol too low ({ctx.dvol_now} < {entry_cfg.dvol_min})")
|
||||
elif ctx.dvol_now > entry_cfg.dvol_max:
|
||||
reasons.append(f"dvol too high ({ctx.dvol_now} > {entry_cfg.dvol_max})")
|
||||
|
||||
if (
|
||||
ctx.next_macro_event_in_days is not None
|
||||
and ctx.next_macro_event_in_days <= structure_cfg.dte_target
|
||||
):
|
||||
reasons.append(
|
||||
f"macro event within DTE window ({ctx.next_macro_event_in_days} days)"
|
||||
)
|
||||
|
||||
if abs(ctx.funding_perp_annualized) > entry_cfg.funding_perp_abs_max_annualized:
|
||||
reasons.append(
|
||||
f"funding rate beyond cap ({ctx.funding_perp_annualized} vs "
|
||||
f"±{entry_cfg.funding_perp_abs_max_annualized})"
|
||||
)
|
||||
|
||||
if ctx.eth_holdings_pct_of_portfolio > entry_cfg.eth_holdings_pct_max:
|
||||
reasons.append(
|
||||
f"eth holdings above cap ({ctx.eth_holdings_pct_of_portfolio} > "
|
||||
f"{entry_cfg.eth_holdings_pct_max})"
|
||||
)
|
||||
|
||||
return EntryDecision(accepted=not reasons, reasons=reasons)
|
||||
|
||||
|
||||
def _trend_signal(
|
||||
pct_change: Decimal,
|
||||
bull_threshold: Decimal,
|
||||
bear_threshold: Decimal,
|
||||
) -> str:
|
||||
"""Classify a percentage change into ``bull`` / ``bear`` / ``neutral``."""
|
||||
if pct_change >= bull_threshold:
|
||||
return "bull"
|
||||
if pct_change <= bear_threshold:
|
||||
return "bear"
|
||||
return "neutral"
|
||||
|
||||
|
||||
def compute_bias(ctx: TrendContext, cfg: StrategyConfig) -> SpreadType | None:
|
||||
"""Return the spread type to attempt, or ``None`` to skip the week."""
|
||||
entry_cfg = cfg.entry
|
||||
|
||||
if ctx.eth_30d_ago <= 0:
|
||||
# Invalid market history: refuse to opine on bias.
|
||||
return None
|
||||
trend_pct = ctx.eth_now / ctx.eth_30d_ago - Decimal("1")
|
||||
trend = _trend_signal(
|
||||
trend_pct,
|
||||
entry_cfg.trend_bull_threshold_pct,
|
||||
entry_cfg.trend_bear_threshold_pct,
|
||||
)
|
||||
|
||||
funding = _trend_signal(
|
||||
ctx.funding_cross_annualized,
|
||||
entry_cfg.funding_bull_threshold_annualized,
|
||||
entry_cfg.funding_bear_threshold_annualized,
|
||||
)
|
||||
|
||||
if trend == "bull" and funding == "bull":
|
||||
return "bull_put"
|
||||
if trend == "bear" and funding == "bear":
|
||||
return "bear_call"
|
||||
if trend == "neutral" and funding == "neutral":
|
||||
if (
|
||||
ctx.dvol_now >= entry_cfg.iron_condor_dvol_min
|
||||
and ctx.adx_14 < entry_cfg.iron_condor_adx_max
|
||||
):
|
||||
return "iron_condor"
|
||||
return None
|
||||
# All remaining combinations (discordant or one-neutral-one-directional)
|
||||
# explicitly fall through to "no entry".
|
||||
return None
|
||||
@@ -0,0 +1,158 @@
|
||||
"""Exit-decision rule engine (``docs/03-algorithms.md §6``).
|
||||
|
||||
Pure function over a :class:`PositionSnapshot`. Triggers are evaluated
|
||||
in the documented order; the first match wins. ``HOLD`` is returned
|
||||
when no rule fires. The time-stop ``skip_if_close_to_profit`` exception
|
||||
is interpreted as ``mark ≤ 70% × credit_received`` (re-using the same
|
||||
units as the profit gate), which is the only interpretation that yields
|
||||
non-trivial behaviour: rule 1 takes everything below 50% credit; the
|
||||
exception covers the (50%, 70%] credit band where we wait for rule 1
|
||||
to fire next cycle.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
from decimal import Decimal
|
||||
from typing import Literal
|
||||
from uuid import UUID
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
from cerbero_bite.config import SpreadType, StrategyConfig
|
||||
from cerbero_bite.core.types import OptionLeg
|
||||
|
||||
__all__ = ["ExitAction", "ExitDecisionResult", "PositionSnapshot", "evaluate"]
|
||||
|
||||
|
||||
ExitAction = Literal[
|
||||
"HOLD",
|
||||
"CLOSE_PROFIT",
|
||||
"CLOSE_STOP",
|
||||
"CLOSE_VOL",
|
||||
"CLOSE_TIME",
|
||||
"CLOSE_DELTA",
|
||||
"CLOSE_AVERSE",
|
||||
]
|
||||
|
||||
|
||||
class PositionSnapshot(BaseModel):
|
||||
"""Inputs for :func:`evaluate`. All ETH amounts are *totals* for the position."""
|
||||
|
||||
model_config = ConfigDict(frozen=True, extra="forbid")
|
||||
|
||||
proposal_id: UUID
|
||||
spread_type: SpreadType
|
||||
legs: list[OptionLeg]
|
||||
|
||||
credit_received_eth: Decimal
|
||||
credit_received_usd: Decimal
|
||||
spot_at_entry: Decimal
|
||||
dvol_at_entry: Decimal
|
||||
expiry: datetime
|
||||
opened_at: datetime
|
||||
|
||||
eth_price_usd_now: Decimal
|
||||
spot_now: Decimal
|
||||
dvol_now: Decimal
|
||||
mark_combo_now_eth: Decimal
|
||||
delta_short_now: Decimal
|
||||
return_4h_now: Decimal
|
||||
now: datetime
|
||||
|
||||
|
||||
class ExitDecisionResult(BaseModel):
|
||||
"""Action + human-readable reason + PnL estimate at the moment of decision."""
|
||||
|
||||
model_config = ConfigDict(frozen=True, extra="forbid")
|
||||
|
||||
action: ExitAction
|
||||
reason: str
|
||||
pnl_estimate_eth: Decimal
|
||||
pnl_estimate_usd: Decimal
|
||||
|
||||
|
||||
def _days_to_expiry(snapshot: PositionSnapshot) -> Decimal:
|
||||
delta = snapshot.expiry - snapshot.now
|
||||
return Decimal(str(delta.total_seconds())) / Decimal("86400")
|
||||
|
||||
|
||||
def _adverse_move(spread_type: SpreadType, return_4h: Decimal, threshold: Decimal) -> bool:
|
||||
if spread_type == "bull_put":
|
||||
return return_4h <= -threshold
|
||||
if spread_type == "bear_call":
|
||||
return return_4h >= threshold
|
||||
# iron_condor: adverse on either side
|
||||
return return_4h.copy_abs() >= threshold
|
||||
|
||||
|
||||
def evaluate(snapshot: PositionSnapshot, cfg: StrategyConfig) -> ExitDecisionResult:
|
||||
"""Return the exit action for the given position snapshot."""
|
||||
ec = cfg.exit
|
||||
credit = snapshot.credit_received_eth
|
||||
debit = snapshot.mark_combo_now_eth
|
||||
|
||||
pnl_eth = credit - debit
|
||||
pnl_usd = pnl_eth * snapshot.eth_price_usd_now
|
||||
|
||||
profit_take_thresh = credit * ec.profit_take_pct_of_credit
|
||||
stop_thresh = credit * ec.stop_loss_mark_x_credit
|
||||
skip_time_thresh = credit * ec.time_stop_skip_if_close_to_profit_pct
|
||||
days_left = _days_to_expiry(snapshot)
|
||||
|
||||
def _result(action: ExitAction, reason: str) -> ExitDecisionResult:
|
||||
return ExitDecisionResult(
|
||||
action=action,
|
||||
reason=reason,
|
||||
pnl_estimate_eth=pnl_eth,
|
||||
pnl_estimate_usd=pnl_usd,
|
||||
)
|
||||
|
||||
# 1. Profit take
|
||||
if debit <= profit_take_thresh:
|
||||
return _result(
|
||||
"CLOSE_PROFIT",
|
||||
f"mark {debit} ≤ {ec.profit_take_pct_of_credit:.0%} of credit {credit}",
|
||||
)
|
||||
|
||||
# 2. Stop loss
|
||||
if debit >= stop_thresh:
|
||||
return _result(
|
||||
"CLOSE_STOP",
|
||||
f"mark {debit} ≥ {ec.stop_loss_mark_x_credit}× credit {credit}",
|
||||
)
|
||||
|
||||
# 3. Vol stop
|
||||
if snapshot.dvol_now >= snapshot.dvol_at_entry + ec.vol_stop_dvol_increase:
|
||||
return _result(
|
||||
"CLOSE_VOL",
|
||||
f"DVOL {snapshot.dvol_now} ≥ entry {snapshot.dvol_at_entry} "
|
||||
f"+ {ec.vol_stop_dvol_increase}",
|
||||
)
|
||||
|
||||
# 4. Time stop with "close to profit" exception
|
||||
if days_left <= ec.time_stop_dte_remaining and debit > skip_time_thresh:
|
||||
return _result(
|
||||
"CLOSE_TIME",
|
||||
f"DTE {days_left:.2f} ≤ {ec.time_stop_dte_remaining} and mark "
|
||||
f"{debit} above skip threshold {skip_time_thresh}",
|
||||
)
|
||||
# When DTE ≤ 7 but mark is in the (50%, 70%]-credit "close to profit"
|
||||
# zone, we deliberately fall through; rule 1 will fire next cycle.
|
||||
|
||||
# 5. Strike tested
|
||||
if snapshot.delta_short_now.copy_abs() >= ec.delta_breach_threshold:
|
||||
return _result(
|
||||
"CLOSE_DELTA",
|
||||
f"|delta_short| {snapshot.delta_short_now.copy_abs()} ≥ "
|
||||
f"{ec.delta_breach_threshold}",
|
||||
)
|
||||
|
||||
# 6. Explosive adverse move
|
||||
if _adverse_move(snapshot.spread_type, snapshot.return_4h_now, ec.adverse_move_4h_pct):
|
||||
return _result(
|
||||
"CLOSE_AVERSE",
|
||||
f"4h return {snapshot.return_4h_now} adverse for {snapshot.spread_type}",
|
||||
)
|
||||
|
||||
return _result("HOLD", "all triggers within tolerance")
|
||||
@@ -0,0 +1,63 @@
|
||||
"""Aggregate greeks for a multi-leg position (``docs/03-algorithms.md §5``).
|
||||
|
||||
Pure summation with the BUY/SELL sign and the leg size. Theta is
|
||||
converted to USD/day; the other greeks are returned in the same units
|
||||
they came in (per-contract, dimensionless or fractional).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from decimal import Decimal
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
from cerbero_bite.core.types import OptionLeg
|
||||
|
||||
__all__ = ["AggregateGreeks", "aggregate"]
|
||||
|
||||
|
||||
class AggregateGreeks(BaseModel):
|
||||
"""Net greeks for an option spread."""
|
||||
|
||||
model_config = ConfigDict(frozen=True, extra="forbid")
|
||||
|
||||
delta_net: Decimal
|
||||
gamma_net: Decimal
|
||||
theta_net: Decimal # USD per day
|
||||
vega_net: Decimal
|
||||
|
||||
|
||||
def _sign(side: str) -> Decimal:
|
||||
return Decimal("1") if side == "BUY" else Decimal("-1")
|
||||
|
||||
|
||||
def aggregate(
|
||||
*,
|
||||
legs: list[OptionLeg],
|
||||
eth_price_usd: Decimal,
|
||||
) -> AggregateGreeks:
|
||||
"""Return :class:`AggregateGreeks` summed with side and size weights.
|
||||
|
||||
``theta`` is multiplied by ``eth_price_usd`` to convert from
|
||||
ETH/day (Deribit native) to USD/day, which is what the report
|
||||
consumes.
|
||||
"""
|
||||
delta = Decimal("0")
|
||||
gamma = Decimal("0")
|
||||
theta_eth = Decimal("0")
|
||||
vega = Decimal("0")
|
||||
|
||||
for leg in legs:
|
||||
sign = _sign(leg.side)
|
||||
weight = sign * Decimal(leg.size)
|
||||
delta += weight * leg.delta
|
||||
gamma += weight * leg.gamma
|
||||
theta_eth += weight * leg.theta
|
||||
vega += weight * leg.vega
|
||||
|
||||
return AggregateGreeks(
|
||||
delta_net=delta,
|
||||
gamma_net=gamma,
|
||||
theta_net=theta_eth * eth_price_usd,
|
||||
vega_net=vega,
|
||||
)
|
||||
@@ -0,0 +1,152 @@
|
||||
"""Monthly Kelly recalibration (``docs/03-algorithms.md §7``).
|
||||
|
||||
Pure function over a list of closed-trade records. Returns a fresh
|
||||
:class:`KellyResult` summarising the empirical edge and a recommendation
|
||||
that is *never* applied automatically — the orchestrator surfaces it as
|
||||
a monthly report and Adriano decides whether to bump
|
||||
``sizing.kelly_fraction`` in ``strategy.yaml``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from decimal import Decimal
|
||||
from typing import Literal
|
||||
from uuid import UUID
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
from cerbero_bite.config import StrategyConfig
|
||||
|
||||
__all__ = ["Confidence", "KellyResult", "TradeRecord", "recalibrate"]
|
||||
|
||||
|
||||
Confidence = Literal["low", "medium", "high"]
|
||||
|
||||
|
||||
class TradeRecord(BaseModel):
|
||||
"""One closed trade fed to the recalibration."""
|
||||
|
||||
model_config = ConfigDict(frozen=True, extra="forbid")
|
||||
|
||||
proposal_id: UUID
|
||||
pnl_usd: Decimal
|
||||
risk_usd: Decimal
|
||||
closed_at: datetime
|
||||
outcome: str
|
||||
|
||||
|
||||
class KellyResult(BaseModel):
|
||||
"""Recommendation report; the operator may or may not apply it."""
|
||||
|
||||
model_config = ConfigDict(frozen=True, extra="forbid")
|
||||
|
||||
win_rate: Decimal
|
||||
avg_win_pct_risk: Decimal
|
||||
avg_loss_pct_risk: Decimal
|
||||
full_kelly_pct: Decimal
|
||||
quarter_kelly_pct: Decimal
|
||||
sample_size: int
|
||||
recommended_fraction: Decimal
|
||||
confidence: Confidence
|
||||
|
||||
|
||||
def _within_lookback(
|
||||
trades: list[TradeRecord],
|
||||
*,
|
||||
now: datetime,
|
||||
lookback_days: int,
|
||||
) -> list[TradeRecord]:
|
||||
cutoff = now - timedelta(days=lookback_days)
|
||||
return [t for t in trades if t.closed_at >= cutoff]
|
||||
|
||||
|
||||
def _confidence_for(
|
||||
n: int,
|
||||
*,
|
||||
low_threshold: int,
|
||||
high_threshold: int,
|
||||
) -> Confidence:
|
||||
if n < low_threshold:
|
||||
return "low"
|
||||
if n < high_threshold:
|
||||
return "medium"
|
||||
return "high"
|
||||
|
||||
|
||||
def recalibrate(
|
||||
*,
|
||||
trades: list[TradeRecord],
|
||||
now: datetime,
|
||||
cfg: StrategyConfig,
|
||||
) -> KellyResult:
|
||||
"""Return the empirical Kelly summary + recommended fraction."""
|
||||
krc = cfg.kelly_recalibration
|
||||
in_window = _within_lookback(trades, now=now, lookback_days=krc.lookback_days)
|
||||
n = len(in_window)
|
||||
|
||||
if n == 0:
|
||||
return KellyResult(
|
||||
win_rate=Decimal("0"),
|
||||
avg_win_pct_risk=Decimal("0"),
|
||||
avg_loss_pct_risk=Decimal("0"),
|
||||
full_kelly_pct=Decimal("0"),
|
||||
quarter_kelly_pct=Decimal("0"),
|
||||
sample_size=0,
|
||||
recommended_fraction=cfg.sizing.kelly_fraction,
|
||||
confidence="low",
|
||||
)
|
||||
|
||||
wins = [t for t in in_window if t.pnl_usd > 0]
|
||||
losses = [t for t in in_window if t.pnl_usd < 0]
|
||||
win_rate = Decimal(len(wins)) / Decimal(n)
|
||||
|
||||
avg_win = (
|
||||
sum((t.pnl_usd / t.risk_usd for t in wins), Decimal("0")) / Decimal(len(wins))
|
||||
if wins
|
||||
else Decimal("0")
|
||||
)
|
||||
avg_loss = (
|
||||
sum((-t.pnl_usd / t.risk_usd for t in losses), Decimal("0")) / Decimal(len(losses))
|
||||
if losses
|
||||
else Decimal("0")
|
||||
)
|
||||
|
||||
if avg_loss == 0:
|
||||
# No losses — fall back to win_rate as an upper bound on full Kelly.
|
||||
full_kelly = win_rate
|
||||
else:
|
||||
b = avg_win / avg_loss
|
||||
if b == 0:
|
||||
full_kelly = Decimal("0")
|
||||
else:
|
||||
full_kelly = (win_rate * b - (Decimal("1") - win_rate)) / b
|
||||
if full_kelly < 0:
|
||||
full_kelly = Decimal("0")
|
||||
|
||||
quarter_kelly = full_kelly * Decimal("0.25")
|
||||
|
||||
confidence = _confidence_for(
|
||||
n,
|
||||
low_threshold=krc.min_sample_low_confidence,
|
||||
high_threshold=krc.min_sample_high_confidence,
|
||||
)
|
||||
|
||||
if confidence == "low":
|
||||
recommended = cfg.sizing.kelly_fraction
|
||||
elif confidence == "medium":
|
||||
weight = krc.weight_when_medium_confidence
|
||||
recommended = weight * quarter_kelly + (Decimal("1") - weight) * cfg.sizing.kelly_fraction
|
||||
else:
|
||||
recommended = quarter_kelly
|
||||
|
||||
return KellyResult(
|
||||
win_rate=win_rate,
|
||||
avg_win_pct_risk=avg_win,
|
||||
avg_loss_pct_risk=avg_loss,
|
||||
full_kelly_pct=full_kelly,
|
||||
quarter_kelly_pct=quarter_kelly,
|
||||
sample_size=n,
|
||||
recommended_fraction=recommended,
|
||||
confidence=confidence,
|
||||
)
|
||||
@@ -0,0 +1,135 @@
|
||||
"""Pre-trade liquidity gate (``docs/03-algorithms.md §2``).
|
||||
|
||||
Validates each leg against open-interest, volume, bid-ask, depth
|
||||
thresholds and computes the expected slippage as a percentage of the
|
||||
credit. Returns the full list of failing reasons together with the
|
||||
slippage estimate so the report can render a precise rejection.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from decimal import Decimal
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
from cerbero_bite.config import StrategyConfig
|
||||
|
||||
__all__ = ["InstrumentSnapshot", "LiquidityCheck", "check"]
|
||||
|
||||
|
||||
class InstrumentSnapshot(BaseModel):
|
||||
"""Per-instrument market snapshot used by the liquidity check."""
|
||||
|
||||
model_config = ConfigDict(frozen=True, extra="forbid")
|
||||
|
||||
instrument: str
|
||||
bid: Decimal
|
||||
ask: Decimal
|
||||
mid: Decimal
|
||||
open_interest: int
|
||||
volume_24h: int
|
||||
book_depth_top3: int
|
||||
|
||||
|
||||
class LiquidityCheck(BaseModel):
|
||||
"""Outcome of :func:`check`: accepted flag, full reasons, slippage estimate."""
|
||||
|
||||
model_config = ConfigDict(frozen=True, extra="forbid")
|
||||
|
||||
accepted: bool
|
||||
reasons: list[str]
|
||||
estimated_slippage_pct_of_credit: Decimal
|
||||
|
||||
|
||||
def _check_leg(
|
||||
snap: InstrumentSnapshot,
|
||||
*,
|
||||
label: str,
|
||||
cfg: StrategyConfig,
|
||||
) -> list[str]:
|
||||
"""Return the list of failing reasons for a single leg."""
|
||||
reasons: list[str] = []
|
||||
liq = cfg.liquidity
|
||||
|
||||
if snap.open_interest < liq.open_interest_min:
|
||||
reasons.append(
|
||||
f"{label} open interest below threshold "
|
||||
f"({snap.open_interest} < {liq.open_interest_min})"
|
||||
)
|
||||
if snap.volume_24h < liq.volume_24h_min:
|
||||
reasons.append(
|
||||
f"{label} 24h volume below threshold "
|
||||
f"({snap.volume_24h} < {liq.volume_24h_min})"
|
||||
)
|
||||
if snap.book_depth_top3 < liq.book_depth_top3_min:
|
||||
reasons.append(
|
||||
f"{label} book depth top3 below threshold "
|
||||
f"({snap.book_depth_top3} < {liq.book_depth_top3_min})"
|
||||
)
|
||||
|
||||
if snap.mid > 0:
|
||||
spread_pct = (snap.ask - snap.bid) / snap.mid
|
||||
if spread_pct > liq.bid_ask_spread_pct_max:
|
||||
reasons.append(
|
||||
f"{label} bid-ask spread {spread_pct:.4f} above cap "
|
||||
f"{liq.bid_ask_spread_pct_max}"
|
||||
)
|
||||
else:
|
||||
reasons.append(f"{label} mid price not positive ({snap.mid})")
|
||||
|
||||
return reasons
|
||||
|
||||
|
||||
def check(
|
||||
*,
|
||||
short_leg: InstrumentSnapshot,
|
||||
long_leg: InstrumentSnapshot,
|
||||
credit: Decimal,
|
||||
n_contracts: int,
|
||||
cfg: StrategyConfig,
|
||||
) -> LiquidityCheck:
|
||||
"""Validate the liquidity of a 2-leg vertical spread.
|
||||
|
||||
Args:
|
||||
short_leg: the leg the engine intends to *sell*.
|
||||
long_leg: the leg the engine intends to *buy* as protection.
|
||||
credit: net mid-price credit of the combo, in ETH.
|
||||
n_contracts: number of combo contracts that will be sent.
|
||||
cfg: validated strategy configuration.
|
||||
|
||||
Returns:
|
||||
:class:`LiquidityCheck` with ``accepted`` set when every leg
|
||||
passes its thresholds *and* the estimated slippage is within
|
||||
the configured cap.
|
||||
"""
|
||||
reasons: list[str] = _check_leg(short_leg, label="short leg", cfg=cfg)
|
||||
reasons.extend(_check_leg(long_leg, label="long leg", cfg=cfg))
|
||||
|
||||
if n_contracts <= 0:
|
||||
reasons.append(f"non-positive number of contracts ({n_contracts})")
|
||||
|
||||
if credit <= 0:
|
||||
reasons.append(f"non-positive credit ({credit})")
|
||||
# Cannot compute slippage_pct without a positive credit.
|
||||
return LiquidityCheck(
|
||||
accepted=False,
|
||||
reasons=reasons,
|
||||
estimated_slippage_pct_of_credit=Decimal("0"),
|
||||
)
|
||||
|
||||
slippage_eth_per_contract = (short_leg.ask - short_leg.mid) + (long_leg.mid - long_leg.bid)
|
||||
n_for_slippage = max(n_contracts, 0)
|
||||
slippage_total_eth = slippage_eth_per_contract * Decimal(n_for_slippage)
|
||||
pct = slippage_total_eth / credit
|
||||
|
||||
if pct > cfg.liquidity.slippage_pct_of_credit_max:
|
||||
reasons.append(
|
||||
f"estimated slippage {pct:.4f} above cap "
|
||||
f"{cfg.liquidity.slippage_pct_of_credit_max}"
|
||||
)
|
||||
|
||||
return LiquidityCheck(
|
||||
accepted=not reasons,
|
||||
reasons=reasons,
|
||||
estimated_slippage_pct_of_credit=pct,
|
||||
)
|
||||
@@ -0,0 +1,92 @@
|
||||
"""Quarter-Kelly sizing with caps (``docs/03-algorithms.md §3``).
|
||||
|
||||
Pure function: given capital, max-loss-per-contract, DVOL and the
|
||||
aggregate state, returns the number of contracts to send. Returns
|
||||
``n_contracts == 0`` together with a human-readable ``reason_if_zero``
|
||||
when no entry is allowed; the orchestrator never opens a position when
|
||||
``n_contracts == 0``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from decimal import Decimal
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
from cerbero_bite.config import StrategyConfig
|
||||
|
||||
__all__ = ["SizingContext", "SizingResult", "compute_contracts"]
|
||||
|
||||
|
||||
class SizingContext(BaseModel):
|
||||
"""Snapshot of the inputs needed to size a single trade."""
|
||||
|
||||
model_config = ConfigDict(frozen=True, extra="forbid")
|
||||
|
||||
capital_usd: Decimal
|
||||
max_loss_per_contract_usd: Decimal
|
||||
dvol_now: Decimal
|
||||
open_engagement_usd: Decimal
|
||||
eur_to_usd: Decimal
|
||||
other_open_positions: int
|
||||
|
||||
|
||||
class SizingResult(BaseModel):
|
||||
"""Result of :func:`compute_contracts`. ``reason_if_zero`` is set iff n=0."""
|
||||
|
||||
model_config = ConfigDict(frozen=True, extra="forbid")
|
||||
|
||||
n_contracts: int
|
||||
risk_dollars: Decimal
|
||||
reason_if_zero: str | None
|
||||
|
||||
|
||||
def _zero(reason: str) -> SizingResult:
|
||||
return SizingResult(n_contracts=0, risk_dollars=Decimal("0"), reason_if_zero=reason)
|
||||
|
||||
|
||||
def _dvol_multiplier(dvol_now: Decimal, cfg: StrategyConfig) -> Decimal | None:
|
||||
"""Return the size multiplier for the current DVOL, or ``None`` if no entry."""
|
||||
for band in cfg.sizing.dvol_adjustment:
|
||||
if dvol_now < band.dvol_under:
|
||||
return band.multiplier
|
||||
return None
|
||||
|
||||
|
||||
def compute_contracts(ctx: SizingContext, cfg: StrategyConfig) -> SizingResult:
|
||||
"""Return the contract count after Kelly, caps, DVOL and engagement checks."""
|
||||
if ctx.max_loss_per_contract_usd <= 0:
|
||||
return _zero(f"non-positive max_loss_per_contract ({ctx.max_loss_per_contract_usd})")
|
||||
|
||||
risk_target = ctx.capital_usd * cfg.sizing.kelly_fraction
|
||||
cap_per_trade_usd = cfg.sizing.cap_per_trade_eur * ctx.eur_to_usd
|
||||
risk_target = min(risk_target, cap_per_trade_usd)
|
||||
|
||||
multiplier = _dvol_multiplier(ctx.dvol_now, cfg)
|
||||
if multiplier is None:
|
||||
return _zero(f"dvol {ctx.dvol_now} above no-entry threshold")
|
||||
risk_target *= multiplier
|
||||
|
||||
n = int(risk_target // ctx.max_loss_per_contract_usd)
|
||||
n = min(n, cfg.sizing.max_contracts_per_trade)
|
||||
|
||||
cap_aggregate_usd = cfg.sizing.cap_aggregate_open_eur * ctx.eur_to_usd
|
||||
while n > 0 and (
|
||||
Decimal(n) * ctx.max_loss_per_contract_usd + ctx.open_engagement_usd
|
||||
) > cap_aggregate_usd:
|
||||
n -= 1
|
||||
|
||||
if ctx.other_open_positions >= cfg.sizing.max_concurrent_positions:
|
||||
return _zero(
|
||||
f"already {ctx.other_open_positions} concurrent position(s); "
|
||||
f"cap is {cfg.sizing.max_concurrent_positions}"
|
||||
)
|
||||
|
||||
if n < 1:
|
||||
return _zero("undersize after caps")
|
||||
|
||||
return SizingResult(
|
||||
n_contracts=n,
|
||||
risk_dollars=Decimal(n) * ctx.max_loss_per_contract_usd,
|
||||
reason_if_zero=None,
|
||||
)
|
||||
@@ -0,0 +1,72 @@
|
||||
"""Shared option-data types used across multiple algorithms.
|
||||
|
||||
Kept in one place so that :mod:`liquidity_gate`, :mod:`combo_builder`,
|
||||
:mod:`greeks_aggregator` and :mod:`exit_decision` agree on the same
|
||||
record format. The orchestrator is responsible for assembling
|
||||
:class:`OptionQuote` instances from the raw MCP responses.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
from decimal import Decimal
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
__all__ = ["OptionLeg", "OptionQuote", "OrderSide", "PutOrCall"]
|
||||
|
||||
|
||||
PutOrCall = Literal["P", "C"]
|
||||
OrderSide = Literal["BUY", "SELL"]
|
||||
|
||||
|
||||
class OptionQuote(BaseModel):
|
||||
"""Full market + greeks snapshot for a single option instrument.
|
||||
|
||||
Used by combo selection (greeks + strike + expiry) and by the
|
||||
liquidity gate (bid/ask/depth). Time information is in UTC.
|
||||
"""
|
||||
|
||||
model_config = ConfigDict(frozen=True, extra="forbid")
|
||||
|
||||
instrument: str
|
||||
strike: Decimal
|
||||
expiry: datetime
|
||||
option_type: PutOrCall
|
||||
|
||||
bid: Decimal
|
||||
ask: Decimal
|
||||
mid: Decimal
|
||||
|
||||
delta: Decimal
|
||||
gamma: Decimal
|
||||
theta: Decimal
|
||||
vega: Decimal
|
||||
|
||||
open_interest: int
|
||||
volume_24h: int
|
||||
book_depth_top3: int
|
||||
|
||||
|
||||
class OptionLeg(BaseModel):
|
||||
"""Single leg of an option spread, ready for execution.
|
||||
|
||||
The signed greeks are stored leg-level (not net): aggregation is done
|
||||
by :func:`cerbero_bite.core.greeks_aggregator.aggregate`.
|
||||
"""
|
||||
|
||||
model_config = ConfigDict(frozen=True, extra="forbid")
|
||||
|
||||
instrument: str
|
||||
side: OrderSide
|
||||
strike: Decimal
|
||||
expiry: datetime
|
||||
type: PutOrCall
|
||||
size: int
|
||||
|
||||
mid_price_eth: Decimal
|
||||
delta: Decimal
|
||||
gamma: Decimal
|
||||
theta: Decimal
|
||||
vega: Decimal
|
||||
Reference in New Issue
Block a user