feat(gui): Phase C — Position drilldown with payoff diagram

* gui/data_layer.py — adds load_position_by_id, load_decisions_for_position,
  compute_payoff_curve (pure math: bull_put / bear_call piecewise linear
  P&L at expiry, with breakeven), compute_distance_metrics (OTM%,
  days-to-expiry, days-held, width%).
* gui/pages/5_💼_Position.py — selector across open + 10 most-recent
  closed positions (with deep-link support via ?proposal_id=…), header
  metrics, distance summary, leg snapshot table (entry-time only —
  the GUI never calls MCP), plotly payoff diagram with strike/breakeven/
  entry-spot annotations and max profit/max loss tiles, decision
  history table from the decisions table.

Live greeks/mid are deliberately not pulled: per docs/11-gui-streamlit.md
the GUI reads SQLite + audit log only and lets the engine refresh data.

Validated math against a synthetic bull_put 2475/2350 × 2 contracts:
breakeven 2452.50, max profit $45, max loss $-160 — all matching the
expected formulas (credit, width × n − credit).

353/353 tests still pass; ruff clean; mypy strict src clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-30 12:28:26 +02:00
parent db888ce0e8
commit 6f6dd4c8dd
2 changed files with 435 additions and 5 deletions
+190 -5
View File
@@ -19,6 +19,7 @@ from datetime import UTC, datetime, timedelta
from decimal import Decimal from decimal import Decimal
from pathlib import Path from pathlib import Path
from typing import Literal from typing import Literal
from uuid import UUID
from cerbero_bite.safety.audit_log import ( from cerbero_bite.safety.audit_log import (
AuditChainError, AuditChainError,
@@ -27,7 +28,11 @@ from cerbero_bite.safety.audit_log import (
verify_chain, verify_chain,
) )
from cerbero_bite.state import Repository, connect from cerbero_bite.state import Repository, connect
from cerbero_bite.state.models import PositionRecord, SystemStateRecord from cerbero_bite.state.models import (
DecisionRecord,
PositionRecord,
SystemStateRecord,
)
__all__ = [ __all__ = [
"DEFAULT_AUDIT_PATH", "DEFAULT_AUDIT_PATH",
@@ -37,15 +42,21 @@ __all__ = [
"EngineSnapshot", "EngineSnapshot",
"EquityPoint", "EquityPoint",
"MonthlyStats", "MonthlyStats",
"PayoffCurve",
"PortfolioKpis", "PortfolioKpis",
"PositionDistanceMetrics",
"compute_distance_metrics",
"compute_equity_curve", "compute_equity_curve",
"compute_kpis", "compute_kpis",
"compute_monthly_stats", "compute_monthly_stats",
"compute_payoff_curve",
"load_audit_chain_status", "load_audit_chain_status",
"load_audit_tail", "load_audit_tail",
"load_closed_positions", "load_closed_positions",
"load_decisions_for_position",
"load_engine_snapshot", "load_engine_snapshot",
"load_open_positions", "load_open_positions",
"load_position_by_id",
] ]
@@ -285,10 +296,7 @@ def compute_equity_curve(positions: list[PositionRecord]) -> list[EquityPoint]:
cumulative += pos.pnl_usd cumulative += pos.pnl_usd
peak = max(peak, cumulative) peak = max(peak, cumulative)
dd_usd = peak - cumulative dd_usd = peak - cumulative
if peak > 0: dd_pct = float(dd_usd / peak) if peak > 0 else 0.0
dd_pct = float(dd_usd / peak)
else:
dd_pct = 0.0
points.append( points.append(
EquityPoint( EquityPoint(
timestamp=pos.closed_at, timestamp=pos.closed_at,
@@ -374,6 +382,183 @@ def compute_monthly_stats(positions: list[PositionRecord]) -> list[MonthlyStats]
return out return out
def load_position_by_id(
proposal_id: UUID,
*,
db_path: Path | str = DEFAULT_DB_PATH,
) -> PositionRecord | None:
db_path = Path(db_path)
if not db_path.exists():
return None
repo = Repository()
conn = connect(db_path)
try:
return repo.get_position(conn, proposal_id)
finally:
conn.close()
def load_decisions_for_position(
proposal_id: UUID,
*,
db_path: Path | str = DEFAULT_DB_PATH,
limit: int = 200,
) -> list[DecisionRecord]:
"""Decisions for ``proposal_id`` newest-first."""
db_path = Path(db_path)
if not db_path.exists():
return []
repo = Repository()
conn = connect(db_path)
try:
return repo.list_decisions(conn, proposal_id=proposal_id, limit=limit)
finally:
conn.close()
# ---------------------------------------------------------------------------
# Payoff math (pure, no live data)
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class PayoffCurve:
"""At-expiry P&L curve for a credit spread."""
spreads_type: str # "bull_put" / "bear_call" / "iron_condor"
spot_grid: list[float]
pnl_grid_usd: list[float]
breakeven: float | None
max_profit_usd: float
max_loss_usd: float
short_strike: float
long_strike: float
spot_at_entry: float
def compute_payoff_curve(
position: PositionRecord,
*,
grid_points: int = 60,
margin_pct: float = 0.15,
) -> PayoffCurve:
"""Build the at-expiry payoff for a credit spread.
Supported spreads (Cerbero Bite scope):
* ``bull_put``: short put @ ``short_strike``, long put @
``long_strike`` (lower). Max profit = credit. Max loss = width
credit. Breakeven = short_strike credit_per_contract.
* ``bear_call``: short call @ ``short_strike``, long call @
``long_strike`` (higher). Symmetric to bull_put around the strikes.
* Other types fall back to a flat zero curve to avoid breaking the
page if/when iron condors are implemented later.
"""
short = float(position.short_strike)
long_ = float(position.long_strike)
n = position.n_contracts
width_usd = float(position.spread_width_usd)
credit_total_usd = float(position.credit_usd)
credit_per_contract = credit_total_usd / n if n > 0 else 0.0
spot = float(position.eth_price_at_entry)
lo = min(short, long_, spot) * (1 - margin_pct)
hi = max(short, long_, spot) * (1 + margin_pct)
step = (hi - lo) / max(grid_points - 1, 1)
grid = [lo + i * step for i in range(grid_points)]
if position.spread_type == "bull_put":
# short put at higher strike, long put at lower strike
max_profit = credit_total_usd
max_loss = -(width_usd - credit_total_usd) * n # signed (negative)
breakeven = short - credit_per_contract
pnl = []
for s in grid:
if s >= short:
pnl.append(max_profit)
elif s <= long_:
pnl.append(max_loss)
else:
frac = (s - long_) / (short - long_)
pnl.append(max_loss + frac * (max_profit - max_loss))
elif position.spread_type == "bear_call":
# short call at lower strike, long call at higher strike
max_profit = credit_total_usd
max_loss = -(width_usd - credit_total_usd) * n
breakeven = short + credit_per_contract
pnl = []
for s in grid:
if s <= short:
pnl.append(max_profit)
elif s >= long_:
pnl.append(max_loss)
else:
frac = (s - short) / (long_ - short)
pnl.append(max_profit + frac * (max_loss - max_profit))
else:
max_profit = credit_total_usd
max_loss = -(width_usd - credit_total_usd) * n
breakeven = None
pnl = [0.0 for _ in grid]
return PayoffCurve(
spreads_type=position.spread_type,
spot_grid=grid,
pnl_grid_usd=pnl,
breakeven=breakeven,
max_profit_usd=max_profit,
max_loss_usd=max_loss,
short_strike=short,
long_strike=long_,
spot_at_entry=spot,
)
@dataclass(frozen=True)
class PositionDistanceMetrics:
"""Quick distance summary for the position drilldown."""
short_strike_otm_pct: float | None
days_to_expiry: int | None
days_held: int | None
delta_at_entry: float
width_pct_of_spot: float
def compute_distance_metrics(
position: PositionRecord,
*,
now: datetime | None = None,
) -> PositionDistanceMetrics:
spot = float(position.spot_at_entry)
short = float(position.short_strike)
if spot > 0:
if position.spread_type == "bull_put":
otm_pct = (spot - short) / spot
elif position.spread_type == "bear_call":
otm_pct = (short - spot) / spot
else:
otm_pct = None
else:
otm_pct = None
reference = (now or datetime.now(UTC)).astimezone(UTC)
days_to_expiry = (
(position.expiry - reference).days if position.expiry else None
)
days_held = (
(reference - position.opened_at).days if position.opened_at else None
)
return PositionDistanceMetrics(
short_strike_otm_pct=otm_pct,
days_to_expiry=days_to_expiry,
days_held=days_held,
delta_at_entry=float(position.delta_at_entry),
width_pct_of_spot=float(position.spread_width_pct),
)
def load_audit_tail( def load_audit_tail(
*, *,
audit_path: Path | str = DEFAULT_AUDIT_PATH, audit_path: Path | str = DEFAULT_AUDIT_PATH,
@@ -0,0 +1,245 @@
"""Position page — drilldown on a single open or recently-closed trade."""
from __future__ import annotations
import json
import os
from pathlib import Path
from uuid import UUID
import plotly.graph_objects as go
import streamlit as st
from cerbero_bite.gui.data_layer import (
DEFAULT_DB_PATH,
compute_distance_metrics,
compute_payoff_curve,
humanize_dt,
load_closed_positions,
load_decisions_for_position,
load_open_positions,
load_position_by_id,
)
from cerbero_bite.state.models import PositionRecord
def _resolve_db() -> Path:
return Path(os.environ.get("CERBERO_BITE_GUI_DB", DEFAULT_DB_PATH))
def _position_label(p: PositionRecord) -> str:
short = (
f"{int(p.short_strike)}/{int(p.long_strike)}"
if p.short_strike and p.long_strike
else ""
)
return f"{str(p.proposal_id)[:8]} · {p.spread_type} · {short} · {p.status}"
def _render_header(position: PositionRecord) -> None:
cols = st.columns(4)
cols[0].metric("status", position.status)
cols[1].metric("spread", position.spread_type)
cols[2].metric("contracts", position.n_contracts)
cols[3].metric("credit (USD)", f"${float(position.credit_usd):+.2f}")
st.caption(
f"`{position.proposal_id}` · opened {humanize_dt(position.opened_at)} · "
f"expiry {humanize_dt(position.expiry)}"
)
def _render_legs(position: PositionRecord) -> None:
st.subheader("Legs (entry snapshot)")
rows = [
{
"leg": "short",
"instrument": position.short_instrument,
"strike": float(position.short_strike),
"side": "SELL",
"size": position.n_contracts,
"delta_at_entry": float(position.delta_at_entry),
},
{
"leg": "long",
"instrument": position.long_instrument,
"strike": float(position.long_strike),
"side": "BUY",
"size": position.n_contracts,
"delta_at_entry": "", # only short delta is persisted
},
]
st.dataframe(rows, use_container_width=True, hide_index=True)
st.caption(
"Live mid/greeks are not pulled from MCP by the GUI. "
"Refresh shown by the engine via the Audit page."
)
def _render_distance(position: PositionRecord) -> None:
metrics = compute_distance_metrics(position)
cols = st.columns(5)
cols[0].metric(
"Short strike OTM",
f"{metrics.short_strike_otm_pct:.1%}"
if metrics.short_strike_otm_pct is not None
else "",
)
cols[1].metric(
"Days to expiry",
metrics.days_to_expiry if metrics.days_to_expiry is not None else "",
)
cols[2].metric(
"Days held",
metrics.days_held if metrics.days_held is not None else "",
)
cols[3].metric("Δ at entry", f"{metrics.delta_at_entry:+.3f}")
cols[4].metric("Width % of spot", f"{metrics.width_pct_of_spot:.1%}")
def _render_payoff(position: PositionRecord) -> None:
st.subheader("Payoff at expiry")
curve = compute_payoff_curve(position)
fig = go.Figure()
fig.add_trace(
go.Scatter(
x=curve.spot_grid,
y=curve.pnl_grid_usd,
mode="lines",
line={"color": "#3498db", "width": 2.5},
name="P&L at expiry",
fill="tozeroy",
fillcolor="rgba(52,152,219,0.10)",
)
)
fig.add_hline(y=0, line_dash="dot", line_color="grey", opacity=0.5)
fig.add_vline(
x=curve.short_strike,
line_dash="dash",
line_color="#27ae60",
opacity=0.7,
annotation_text=f"short {curve.short_strike:.0f}",
annotation_position="top",
)
fig.add_vline(
x=curve.long_strike,
line_dash="dash",
line_color="#c0392b",
opacity=0.7,
annotation_text=f"long {curve.long_strike:.0f}",
annotation_position="top",
)
if curve.breakeven is not None:
fig.add_vline(
x=curve.breakeven,
line_dash="dot",
line_color="orange",
opacity=0.7,
annotation_text=f"BE {curve.breakeven:.2f}",
annotation_position="bottom",
)
fig.add_vline(
x=curve.spot_at_entry,
line_dash="solid",
line_color="#7f8c8d",
opacity=0.4,
annotation_text=f"entry spot {curve.spot_at_entry:.0f}",
annotation_position="bottom",
)
fig.update_layout(
height=380,
margin={"l": 10, "r": 10, "t": 30, "b": 10},
xaxis_title="ETH spot at expiry (USD)",
yaxis_title="P&L (USD)",
legend={"orientation": "h", "y": 1.1},
)
st.plotly_chart(fig, use_container_width=True)
cols = st.columns(3)
cols[0].metric("Max profit", f"${curve.max_profit_usd:+.2f}")
cols[1].metric("Max loss", f"${curve.max_loss_usd:+.2f}")
cols[2].metric(
"Breakeven",
f"{curve.breakeven:.2f}" if curve.breakeven is not None else "",
)
def _render_decisions(position: PositionRecord) -> None:
st.subheader("Decision history")
decisions = load_decisions_for_position(position.proposal_id)
if not decisions:
st.info("No decisions recorded for this position yet.")
return
rows = []
for d in decisions:
try:
outputs = json.loads(d.outputs_json)
except (TypeError, ValueError):
outputs = {}
rows.append(
{
"timestamp": humanize_dt(d.timestamp),
"decision_type": d.decision_type,
"action": d.action_taken or "",
"notes": d.notes or "",
"outputs": json.dumps(outputs, sort_keys=True)
if outputs
else "",
}
)
st.dataframe(rows, use_container_width=True, hide_index=True)
def render() -> None:
st.title("💼 Position")
st.caption(
"Drilldown on the trade: legs, payoff at expiry, decision history. "
"All data is read from SQLite — no live MCP calls."
)
db_path = _resolve_db()
open_pos = load_open_positions(db_path=db_path)
closed_recent = load_closed_positions(db_path=db_path)[-10:] # last 10
candidates: list[PositionRecord] = list(open_pos) + list(reversed(closed_recent))
if not candidates:
st.info(
"No positions to display. The page will populate once the "
"engine opens its first trade."
)
return
labels = {_position_label(p): p for p in candidates}
pick = st.selectbox(
"Position",
options=list(labels.keys()),
index=0,
)
position = labels[pick]
# Allow deep-linking via ?proposal_id=...
qp = st.query_params.get("proposal_id")
if qp:
try:
qp_uuid = UUID(qp)
override = load_position_by_id(qp_uuid, db_path=db_path)
if override is not None:
position = override
except ValueError:
st.warning(f"Invalid proposal_id query parameter: {qp}")
st.divider()
_render_header(position)
st.divider()
_render_distance(position)
st.divider()
_render_legs(position)
st.divider()
_render_payoff(position)
st.divider()
_render_decisions(position)
render()