diff --git a/src/cerbero_bite/gui/data_layer.py b/src/cerbero_bite/gui/data_layer.py index 3bd5955..dd48ee6 100644 --- a/src/cerbero_bite/gui/data_layer.py +++ b/src/cerbero_bite/gui/data_layer.py @@ -19,6 +19,7 @@ from datetime import UTC, datetime, timedelta from decimal import Decimal from pathlib import Path from typing import Literal +from uuid import UUID from cerbero_bite.safety.audit_log import ( AuditChainError, @@ -27,7 +28,11 @@ from cerbero_bite.safety.audit_log import ( verify_chain, ) from cerbero_bite.state import Repository, connect -from cerbero_bite.state.models import PositionRecord, SystemStateRecord +from cerbero_bite.state.models import ( + DecisionRecord, + PositionRecord, + SystemStateRecord, +) __all__ = [ "DEFAULT_AUDIT_PATH", @@ -37,15 +42,21 @@ __all__ = [ "EngineSnapshot", "EquityPoint", "MonthlyStats", + "PayoffCurve", "PortfolioKpis", + "PositionDistanceMetrics", + "compute_distance_metrics", "compute_equity_curve", "compute_kpis", "compute_monthly_stats", + "compute_payoff_curve", "load_audit_chain_status", "load_audit_tail", "load_closed_positions", + "load_decisions_for_position", "load_engine_snapshot", "load_open_positions", + "load_position_by_id", ] @@ -285,10 +296,7 @@ def compute_equity_curve(positions: list[PositionRecord]) -> list[EquityPoint]: cumulative += pos.pnl_usd peak = max(peak, cumulative) dd_usd = peak - cumulative - if peak > 0: - dd_pct = float(dd_usd / peak) - else: - dd_pct = 0.0 + dd_pct = float(dd_usd / peak) if peak > 0 else 0.0 points.append( EquityPoint( timestamp=pos.closed_at, @@ -374,6 +382,183 @@ def compute_monthly_stats(positions: list[PositionRecord]) -> list[MonthlyStats] return out +def load_position_by_id( + proposal_id: UUID, + *, + db_path: Path | str = DEFAULT_DB_PATH, +) -> PositionRecord | None: + db_path = Path(db_path) + if not db_path.exists(): + return None + repo = Repository() + conn = connect(db_path) + try: + return repo.get_position(conn, proposal_id) + finally: + conn.close() + + +def load_decisions_for_position( + proposal_id: UUID, + *, + db_path: Path | str = DEFAULT_DB_PATH, + limit: int = 200, +) -> list[DecisionRecord]: + """Decisions for ``proposal_id`` newest-first.""" + db_path = Path(db_path) + if not db_path.exists(): + return [] + repo = Repository() + conn = connect(db_path) + try: + return repo.list_decisions(conn, proposal_id=proposal_id, limit=limit) + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# Payoff math (pure, no live data) +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class PayoffCurve: + """At-expiry P&L curve for a credit spread.""" + + spreads_type: str # "bull_put" / "bear_call" / "iron_condor" + spot_grid: list[float] + pnl_grid_usd: list[float] + breakeven: float | None + max_profit_usd: float + max_loss_usd: float + short_strike: float + long_strike: float + spot_at_entry: float + + +def compute_payoff_curve( + position: PositionRecord, + *, + grid_points: int = 60, + margin_pct: float = 0.15, +) -> PayoffCurve: + """Build the at-expiry payoff for a credit spread. + + Supported spreads (Cerbero Bite scope): + + * ``bull_put``: short put @ ``short_strike``, long put @ + ``long_strike`` (lower). Max profit = credit. Max loss = width โˆ’ + credit. Breakeven = short_strike โˆ’ credit_per_contract. + * ``bear_call``: short call @ ``short_strike``, long call @ + ``long_strike`` (higher). Symmetric to bull_put around the strikes. + * Other types fall back to a flat zero curve to avoid breaking the + page if/when iron condors are implemented later. + """ + short = float(position.short_strike) + long_ = float(position.long_strike) + n = position.n_contracts + width_usd = float(position.spread_width_usd) + credit_total_usd = float(position.credit_usd) + credit_per_contract = credit_total_usd / n if n > 0 else 0.0 + spot = float(position.eth_price_at_entry) + + lo = min(short, long_, spot) * (1 - margin_pct) + hi = max(short, long_, spot) * (1 + margin_pct) + step = (hi - lo) / max(grid_points - 1, 1) + grid = [lo + i * step for i in range(grid_points)] + + if position.spread_type == "bull_put": + # short put at higher strike, long put at lower strike + max_profit = credit_total_usd + max_loss = -(width_usd - credit_total_usd) * n # signed (negative) + breakeven = short - credit_per_contract + pnl = [] + for s in grid: + if s >= short: + pnl.append(max_profit) + elif s <= long_: + pnl.append(max_loss) + else: + frac = (s - long_) / (short - long_) + pnl.append(max_loss + frac * (max_profit - max_loss)) + elif position.spread_type == "bear_call": + # short call at lower strike, long call at higher strike + max_profit = credit_total_usd + max_loss = -(width_usd - credit_total_usd) * n + breakeven = short + credit_per_contract + pnl = [] + for s in grid: + if s <= short: + pnl.append(max_profit) + elif s >= long_: + pnl.append(max_loss) + else: + frac = (s - short) / (long_ - short) + pnl.append(max_profit + frac * (max_loss - max_profit)) + else: + max_profit = credit_total_usd + max_loss = -(width_usd - credit_total_usd) * n + breakeven = None + pnl = [0.0 for _ in grid] + + return PayoffCurve( + spreads_type=position.spread_type, + spot_grid=grid, + pnl_grid_usd=pnl, + breakeven=breakeven, + max_profit_usd=max_profit, + max_loss_usd=max_loss, + short_strike=short, + long_strike=long_, + spot_at_entry=spot, + ) + + +@dataclass(frozen=True) +class PositionDistanceMetrics: + """Quick distance summary for the position drilldown.""" + + short_strike_otm_pct: float | None + days_to_expiry: int | None + days_held: int | None + delta_at_entry: float + width_pct_of_spot: float + + +def compute_distance_metrics( + position: PositionRecord, + *, + now: datetime | None = None, +) -> PositionDistanceMetrics: + spot = float(position.spot_at_entry) + short = float(position.short_strike) + if spot > 0: + if position.spread_type == "bull_put": + otm_pct = (spot - short) / spot + elif position.spread_type == "bear_call": + otm_pct = (short - spot) / spot + else: + otm_pct = None + else: + otm_pct = None + + reference = (now or datetime.now(UTC)).astimezone(UTC) + days_to_expiry = ( + (position.expiry - reference).days if position.expiry else None + ) + days_held = ( + (reference - position.opened_at).days if position.opened_at else None + ) + + return PositionDistanceMetrics( + short_strike_otm_pct=otm_pct, + days_to_expiry=days_to_expiry, + days_held=days_held, + delta_at_entry=float(position.delta_at_entry), + width_pct_of_spot=float(position.spread_width_pct), + ) + + def load_audit_tail( *, audit_path: Path | str = DEFAULT_AUDIT_PATH, diff --git a/src/cerbero_bite/gui/pages/5_๐Ÿ’ผ_Position.py b/src/cerbero_bite/gui/pages/5_๐Ÿ’ผ_Position.py new file mode 100644 index 0000000..f933503 --- /dev/null +++ b/src/cerbero_bite/gui/pages/5_๐Ÿ’ผ_Position.py @@ -0,0 +1,245 @@ +"""Position page โ€” drilldown on a single open or recently-closed trade.""" + +from __future__ import annotations + +import json +import os +from pathlib import Path +from uuid import UUID + +import plotly.graph_objects as go +import streamlit as st + +from cerbero_bite.gui.data_layer import ( + DEFAULT_DB_PATH, + compute_distance_metrics, + compute_payoff_curve, + humanize_dt, + load_closed_positions, + load_decisions_for_position, + load_open_positions, + load_position_by_id, +) +from cerbero_bite.state.models import PositionRecord + + +def _resolve_db() -> Path: + return Path(os.environ.get("CERBERO_BITE_GUI_DB", DEFAULT_DB_PATH)) + + +def _position_label(p: PositionRecord) -> str: + short = ( + f"{int(p.short_strike)}/{int(p.long_strike)}" + if p.short_strike and p.long_strike + else "โ€”" + ) + return f"{str(p.proposal_id)[:8]} ยท {p.spread_type} ยท {short} ยท {p.status}" + + +def _render_header(position: PositionRecord) -> None: + cols = st.columns(4) + cols[0].metric("status", position.status) + cols[1].metric("spread", position.spread_type) + cols[2].metric("contracts", position.n_contracts) + cols[3].metric("credit (USD)", f"${float(position.credit_usd):+.2f}") + st.caption( + f"`{position.proposal_id}` ยท opened {humanize_dt(position.opened_at)} ยท " + f"expiry {humanize_dt(position.expiry)}" + ) + + +def _render_legs(position: PositionRecord) -> None: + st.subheader("Legs (entry snapshot)") + rows = [ + { + "leg": "short", + "instrument": position.short_instrument, + "strike": float(position.short_strike), + "side": "SELL", + "size": position.n_contracts, + "delta_at_entry": float(position.delta_at_entry), + }, + { + "leg": "long", + "instrument": position.long_instrument, + "strike": float(position.long_strike), + "side": "BUY", + "size": position.n_contracts, + "delta_at_entry": "โ€”", # only short delta is persisted + }, + ] + st.dataframe(rows, use_container_width=True, hide_index=True) + st.caption( + "Live mid/greeks are not pulled from MCP by the GUI. " + "Refresh shown by the engine via the Audit page." + ) + + +def _render_distance(position: PositionRecord) -> None: + metrics = compute_distance_metrics(position) + cols = st.columns(5) + cols[0].metric( + "Short strike OTM", + f"{metrics.short_strike_otm_pct:.1%}" + if metrics.short_strike_otm_pct is not None + else "โ€”", + ) + cols[1].metric( + "Days to expiry", + metrics.days_to_expiry if metrics.days_to_expiry is not None else "โ€”", + ) + cols[2].metric( + "Days held", + metrics.days_held if metrics.days_held is not None else "โ€”", + ) + cols[3].metric("ฮ” at entry", f"{metrics.delta_at_entry:+.3f}") + cols[4].metric("Width % of spot", f"{metrics.width_pct_of_spot:.1%}") + + +def _render_payoff(position: PositionRecord) -> None: + st.subheader("Payoff at expiry") + curve = compute_payoff_curve(position) + + fig = go.Figure() + fig.add_trace( + go.Scatter( + x=curve.spot_grid, + y=curve.pnl_grid_usd, + mode="lines", + line={"color": "#3498db", "width": 2.5}, + name="P&L at expiry", + fill="tozeroy", + fillcolor="rgba(52,152,219,0.10)", + ) + ) + fig.add_hline(y=0, line_dash="dot", line_color="grey", opacity=0.5) + fig.add_vline( + x=curve.short_strike, + line_dash="dash", + line_color="#27ae60", + opacity=0.7, + annotation_text=f"short {curve.short_strike:.0f}", + annotation_position="top", + ) + fig.add_vline( + x=curve.long_strike, + line_dash="dash", + line_color="#c0392b", + opacity=0.7, + annotation_text=f"long {curve.long_strike:.0f}", + annotation_position="top", + ) + if curve.breakeven is not None: + fig.add_vline( + x=curve.breakeven, + line_dash="dot", + line_color="orange", + opacity=0.7, + annotation_text=f"BE {curve.breakeven:.2f}", + annotation_position="bottom", + ) + fig.add_vline( + x=curve.spot_at_entry, + line_dash="solid", + line_color="#7f8c8d", + opacity=0.4, + annotation_text=f"entry spot {curve.spot_at_entry:.0f}", + annotation_position="bottom", + ) + fig.update_layout( + height=380, + margin={"l": 10, "r": 10, "t": 30, "b": 10}, + xaxis_title="ETH spot at expiry (USD)", + yaxis_title="P&L (USD)", + legend={"orientation": "h", "y": 1.1}, + ) + st.plotly_chart(fig, use_container_width=True) + + cols = st.columns(3) + cols[0].metric("Max profit", f"${curve.max_profit_usd:+.2f}") + cols[1].metric("Max loss", f"${curve.max_loss_usd:+.2f}") + cols[2].metric( + "Breakeven", + f"{curve.breakeven:.2f}" if curve.breakeven is not None else "โ€”", + ) + + +def _render_decisions(position: PositionRecord) -> None: + st.subheader("Decision history") + decisions = load_decisions_for_position(position.proposal_id) + if not decisions: + st.info("No decisions recorded for this position yet.") + return + + rows = [] + for d in decisions: + try: + outputs = json.loads(d.outputs_json) + except (TypeError, ValueError): + outputs = {} + rows.append( + { + "timestamp": humanize_dt(d.timestamp), + "decision_type": d.decision_type, + "action": d.action_taken or "โ€”", + "notes": d.notes or "", + "outputs": json.dumps(outputs, sort_keys=True) + if outputs + else "", + } + ) + st.dataframe(rows, use_container_width=True, hide_index=True) + + +def render() -> None: + st.title("๐Ÿ’ผ Position") + st.caption( + "Drilldown on the trade: legs, payoff at expiry, decision history. " + "All data is read from SQLite โ€” no live MCP calls." + ) + + db_path = _resolve_db() + + open_pos = load_open_positions(db_path=db_path) + closed_recent = load_closed_positions(db_path=db_path)[-10:] # last 10 + candidates: list[PositionRecord] = list(open_pos) + list(reversed(closed_recent)) + + if not candidates: + st.info( + "No positions to display. The page will populate once the " + "engine opens its first trade." + ) + return + + labels = {_position_label(p): p for p in candidates} + pick = st.selectbox( + "Position", + options=list(labels.keys()), + index=0, + ) + position = labels[pick] + + # Allow deep-linking via ?proposal_id=... + qp = st.query_params.get("proposal_id") + if qp: + try: + qp_uuid = UUID(qp) + override = load_position_by_id(qp_uuid, db_path=db_path) + if override is not None: + position = override + except ValueError: + st.warning(f"Invalid proposal_id query parameter: {qp}") + + st.divider() + _render_header(position) + st.divider() + _render_distance(position) + st.divider() + _render_legs(position) + st.divider() + _render_payoff(position) + st.divider() + _render_decisions(position) + + +render()