feat(gui): Phase B — Equity + History pages

Adds the analytics surface of the dashboard:

* gui/data_layer.py — extended with load_closed_positions (windowed
  filter on closed_at) and three pure-function aggregators:
  compute_equity_curve, compute_kpis, compute_monthly_stats. Drawdown
  is measured against the running peak of cumulative realised P&L.
* gui/pages/3_📈_Equity.py — KPI strip, plotly cumulative-PnL line,
  drawdown area below, P&L histogram by close_reason, per-month table
  with win-rate.
* gui/pages/4_📜_History.py — windowed table of closed trades with
  multiselect close-reason and winners/losers radio filters, six-tile
  KPI strip, CSV export button.
* pyproject.toml — relax mypy on plotly + pandas (no shipped stubs).

Validated with synthetic data: 3 trades, 67% win rate, $50 total,
max drawdown $30 — all matching expected math. GUI launches, HTTP 200
on / and /_stcore/health.

353/353 tests still pass; ruff clean; mypy strict src clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-30 12:11:02 +02:00
parent 1af983aff1
commit db888ce0e8
4 changed files with 491 additions and 1 deletions
+1 -1
View File
@@ -116,7 +116,7 @@ no_implicit_reexport = true
files = ["src/cerbero_bite"] files = ["src/cerbero_bite"]
[[tool.mypy.overrides]] [[tool.mypy.overrides]]
module = ["apscheduler.*"] module = ["apscheduler.*", "plotly.*", "pandas.*"]
ignore_missing_imports = true ignore_missing_imports = true
[tool.pytest.ini_options] [tool.pytest.ini_options]
+194
View File
@@ -16,6 +16,7 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from datetime import UTC, datetime, timedelta from datetime import UTC, datetime, timedelta
from decimal import Decimal
from pathlib import Path from pathlib import Path
from typing import Literal from typing import Literal
@@ -34,8 +35,15 @@ __all__ = [
"AuditChainStatus", "AuditChainStatus",
"EngineHealth", "EngineHealth",
"EngineSnapshot", "EngineSnapshot",
"EquityPoint",
"MonthlyStats",
"PortfolioKpis",
"compute_equity_curve",
"compute_kpis",
"compute_monthly_stats",
"load_audit_chain_status", "load_audit_chain_status",
"load_audit_tail", "load_audit_tail",
"load_closed_positions",
"load_engine_snapshot", "load_engine_snapshot",
"load_open_positions", "load_open_positions",
] ]
@@ -180,6 +188,192 @@ def load_open_positions(
conn.close() conn.close()
def load_closed_positions(
*,
db_path: Path | str = DEFAULT_DB_PATH,
start: datetime | None = None,
end: datetime | None = None,
) -> list[PositionRecord]:
"""Return positions with status ``closed`` (sorted oldest → newest).
The optional ``start`` / ``end`` window filters by ``closed_at``.
Positions still in flight (open / awaiting_fill / closing /
cancelled) are excluded. ``cancelled`` positions are also excluded
since they never had P&L impact.
"""
db_path = Path(db_path)
if not db_path.exists():
return []
repo = Repository()
conn = connect(db_path)
try:
rows = repo.list_positions(conn, status="closed")
finally:
conn.close()
out: list[PositionRecord] = []
for r in rows:
if r.closed_at is None:
continue
if start is not None and r.closed_at < start:
continue
if end is not None and r.closed_at > end:
continue
out.append(r)
out.sort(key=lambda p: p.closed_at) # type: ignore[arg-type, return-value]
return out
# ---------------------------------------------------------------------------
# Analytics
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class EquityPoint:
"""One point on the cumulative-PnL curve."""
timestamp: datetime
realized_pnl_usd: Decimal
cumulative_pnl_usd: Decimal
drawdown_usd: Decimal
drawdown_pct: float
@dataclass(frozen=True)
class MonthlyStats:
"""Aggregated stats for a calendar month."""
year_month: str # "2026-04"
n_trades: int
n_wins: int
win_rate: float
pnl_usd: Decimal
avg_pnl_usd: Decimal
@dataclass(frozen=True)
class PortfolioKpis:
"""High-level KPI strip for the History/Equity pages."""
n_trades: int
n_wins: int
win_rate: float
total_pnl_usd: Decimal
avg_win_usd: Decimal
avg_loss_usd: Decimal
edge_per_trade_usd: Decimal
max_drawdown_usd: Decimal
max_drawdown_pct: float
def compute_equity_curve(positions: list[PositionRecord]) -> list[EquityPoint]:
"""Build a cumulative PnL series from closed positions.
Drawdown is measured against the running peak of cumulative PnL
(so it accounts for past wins). ``drawdown_pct`` is expressed
relative to the peak — undefined when peak ≤ 0 (returns 0.0).
"""
if not positions:
return []
points: list[EquityPoint] = []
cumulative = Decimal(0)
peak = Decimal(0)
for pos in positions:
if pos.pnl_usd is None or pos.closed_at is None:
continue
cumulative += pos.pnl_usd
peak = max(peak, cumulative)
dd_usd = peak - cumulative
if peak > 0:
dd_pct = float(dd_usd / peak)
else:
dd_pct = 0.0
points.append(
EquityPoint(
timestamp=pos.closed_at,
realized_pnl_usd=pos.pnl_usd,
cumulative_pnl_usd=cumulative,
drawdown_usd=dd_usd,
drawdown_pct=dd_pct,
)
)
return points
def compute_kpis(positions: list[PositionRecord]) -> PortfolioKpis:
"""Aggregate KPI strip across the supplied closed positions."""
pnls = [p.pnl_usd for p in positions if p.pnl_usd is not None]
n = len(pnls)
if n == 0:
zero = Decimal(0)
return PortfolioKpis(
n_trades=0,
n_wins=0,
win_rate=0.0,
total_pnl_usd=zero,
avg_win_usd=zero,
avg_loss_usd=zero,
edge_per_trade_usd=zero,
max_drawdown_usd=zero,
max_drawdown_pct=0.0,
)
wins = [p for p in pnls if p > 0]
losses = [p for p in pnls if p < 0]
total = sum(pnls, Decimal(0))
avg_win = sum(wins, Decimal(0)) / Decimal(len(wins)) if wins else Decimal(0)
avg_loss = sum(losses, Decimal(0)) / Decimal(len(losses)) if losses else Decimal(0)
curve = compute_equity_curve(positions)
if curve:
max_dd = max((p.drawdown_usd for p in curve), default=Decimal(0))
max_dd_pct = max((p.drawdown_pct for p in curve), default=0.0)
else: # pragma: no cover — defensive, curve is empty iff pnls empty
max_dd = Decimal(0)
max_dd_pct = 0.0
return PortfolioKpis(
n_trades=n,
n_wins=len(wins),
win_rate=len(wins) / n,
total_pnl_usd=total,
avg_win_usd=avg_win,
avg_loss_usd=avg_loss,
edge_per_trade_usd=total / Decimal(n),
max_drawdown_usd=max_dd,
max_drawdown_pct=max_dd_pct,
)
def compute_monthly_stats(positions: list[PositionRecord]) -> list[MonthlyStats]:
"""Aggregate per calendar month (UTC), oldest → newest."""
buckets: dict[str, list[Decimal]] = {}
for pos in positions:
if pos.pnl_usd is None or pos.closed_at is None:
continue
key = pos.closed_at.astimezone(UTC).strftime("%Y-%m")
buckets.setdefault(key, []).append(pos.pnl_usd)
out: list[MonthlyStats] = []
for key in sorted(buckets):
pnls = buckets[key]
n = len(pnls)
wins = sum(1 for p in pnls if p > 0)
total = sum(pnls, Decimal(0))
out.append(
MonthlyStats(
year_month=key,
n_trades=n,
n_wins=wins,
win_rate=wins / n if n else 0.0,
pnl_usd=total,
avg_pnl_usd=total / Decimal(n) if n else Decimal(0),
)
)
return out
def load_audit_tail( def load_audit_tail(
*, *,
audit_path: Path | str = DEFAULT_AUDIT_PATH, audit_path: Path | str = DEFAULT_AUDIT_PATH,
+172
View File
@@ -0,0 +1,172 @@
"""Equity page — cumulative PnL, drawdown, distributions."""
from __future__ import annotations
import os
from collections import Counter
from datetime import UTC, datetime, timedelta
from pathlib import Path
import pandas as pd
import plotly.graph_objects as go
import streamlit as st
from cerbero_bite.gui.data_layer import (
DEFAULT_DB_PATH,
compute_equity_curve,
compute_kpis,
compute_monthly_stats,
load_closed_positions,
)
def _resolve_db() -> Path:
return Path(os.environ.get("CERBERO_BITE_GUI_DB", DEFAULT_DB_PATH))
def _date_window(label: str) -> tuple[datetime | None, datetime | None]:
"""UI control for picking the analytics window."""
options = {
"All time": (None, None),
"Last 30 days": (datetime.now(UTC) - timedelta(days=30), None),
"Last 90 days": (datetime.now(UTC) - timedelta(days=90), None),
"Year to date": (datetime(datetime.now(UTC).year, 1, 1, tzinfo=UTC), None),
}
pick = st.selectbox(label, list(options.keys()), index=0)
return options[pick]
def render() -> None:
st.title("📈 Equity")
st.caption(
"Cumulative realised P&L, drawdown, and per-trade distribution. "
"Computed from closed positions in `data/state.sqlite`."
)
start, end = _date_window("Window")
db_path = _resolve_db()
positions = load_closed_positions(db_path=db_path, start=start, end=end)
if not positions:
st.info(
"No closed positions in the selected window yet. "
"The equity curve will populate as soon as the engine closes its first trade."
)
return
# KPI strip
kpis = compute_kpis(positions)
cols = st.columns(5)
cols[0].metric("Closed trades", kpis.n_trades)
cols[1].metric("Win rate", f"{kpis.win_rate:.0%}")
cols[2].metric("Total P&L", f"${float(kpis.total_pnl_usd):+.2f}")
cols[3].metric("Edge / trade", f"${float(kpis.edge_per_trade_usd):+.2f}")
cols[4].metric(
"Max drawdown",
f"${float(kpis.max_drawdown_usd):.2f}",
delta=f"{kpis.max_drawdown_pct:.1%}",
delta_color="inverse",
)
st.divider()
# Equity curve + drawdown
curve = compute_equity_curve(positions)
df = pd.DataFrame(
{
"timestamp": [p.timestamp for p in curve],
"cumulative_pnl_usd": [float(p.cumulative_pnl_usd) for p in curve],
"drawdown_usd": [float(p.drawdown_usd) for p in curve],
"realized_pnl_usd": [float(p.realized_pnl_usd) for p in curve],
}
)
st.subheader("Cumulative P&L (USD)")
fig = go.Figure()
fig.add_trace(
go.Scatter(
x=df["timestamp"],
y=df["cumulative_pnl_usd"],
mode="lines+markers",
name="cumulative P&L",
line={"color": "#2ecc71", "width": 2},
)
)
fig.add_hline(y=0, line_dash="dot", line_color="grey", opacity=0.5)
fig.update_layout(
height=320,
margin={"l": 10, "r": 10, "t": 30, "b": 10},
xaxis_title=None,
yaxis_title="USD",
)
st.plotly_chart(fig, use_container_width=True)
st.subheader("Drawdown (USD)")
dd_fig = go.Figure()
dd_fig.add_trace(
go.Scatter(
x=df["timestamp"],
y=-df["drawdown_usd"],
mode="lines",
fill="tozeroy",
name="drawdown",
line={"color": "#e74c3c", "width": 1.5},
)
)
dd_fig.update_layout(
height=220,
margin={"l": 10, "r": 10, "t": 30, "b": 10},
xaxis_title=None,
yaxis_title="USD",
)
st.plotly_chart(dd_fig, use_container_width=True)
# PnL distribution
st.subheader("P&L distribution by close reason")
by_reason: dict[str, list[float]] = {}
for pos in positions:
if pos.pnl_usd is None:
continue
by_reason.setdefault(pos.close_reason or "(unknown)", []).append(
float(pos.pnl_usd)
)
counts = Counter(
(pos.close_reason or "(unknown)") for pos in positions
)
cols = st.columns(min(len(counts), 6) or 1)
for col, (reason, count) in zip(cols, counts.most_common(6), strict=False):
col.metric(reason, count)
hist_fig = go.Figure()
for reason, pnls in by_reason.items():
hist_fig.add_trace(go.Histogram(x=pnls, name=reason, opacity=0.6, nbinsx=30))
hist_fig.update_layout(
barmode="overlay",
height=320,
margin={"l": 10, "r": 10, "t": 30, "b": 10},
xaxis_title="P&L (USD)",
yaxis_title="trades",
legend={"orientation": "h", "y": 1.1},
)
st.plotly_chart(hist_fig, use_container_width=True)
# Monthly table
st.subheader("Per-month stats")
months = compute_monthly_stats(positions)
rows = [
{
"month": m.year_month,
"trades": m.n_trades,
"wins": m.n_wins,
"win_rate": f"{m.win_rate:.0%}",
"P&L (USD)": f"{float(m.pnl_usd):+.2f}",
"avg / trade": f"{float(m.avg_pnl_usd):+.2f}",
}
for m in months
]
st.dataframe(rows, use_container_width=True, hide_index=True)
render()
@@ -0,0 +1,124 @@
"""History page — closed-trade table with filters and CSV export."""
from __future__ import annotations
import io
import os
from datetime import UTC, datetime, timedelta
from pathlib import Path
import pandas as pd
import streamlit as st
from cerbero_bite.gui.data_layer import (
DEFAULT_DB_PATH,
compute_kpis,
humanize_dt,
load_closed_positions,
)
def _resolve_db() -> Path:
return Path(os.environ.get("CERBERO_BITE_GUI_DB", DEFAULT_DB_PATH))
def _date_window() -> tuple[datetime | None, datetime | None]:
presets = {
"All time": (None, None),
"Last 7 days": (datetime.now(UTC) - timedelta(days=7), None),
"Last 30 days": (datetime.now(UTC) - timedelta(days=30), None),
"Last 90 days": (datetime.now(UTC) - timedelta(days=90), None),
"Year to date": (datetime(datetime.now(UTC).year, 1, 1, tzinfo=UTC), None),
}
pick = st.selectbox("Window", list(presets.keys()), index=0)
return presets[pick]
def render() -> None:
st.title("📜 History")
st.caption("Closed trades with filters, KPI strip, and CSV export.")
db_path = _resolve_db()
start, end = _date_window()
positions = load_closed_positions(db_path=db_path, start=start, end=end)
# Sub-filter by close reason and PnL sign.
reason_options = sorted({p.close_reason or "(unknown)" for p in positions})
chosen_reasons = st.multiselect(
"Close reasons", options=reason_options, default=reason_options
)
pnl_filter = st.radio(
"P&L filter",
options=["all", "winners", "losers"],
horizontal=True,
index=0,
)
filtered = []
for p in positions:
reason = p.close_reason or "(unknown)"
if reason not in chosen_reasons:
continue
if pnl_filter == "winners" and (p.pnl_usd is None or p.pnl_usd <= 0):
continue
if pnl_filter == "losers" and (p.pnl_usd is None or p.pnl_usd >= 0):
continue
filtered.append(p)
# KPI strip
kpis = compute_kpis(filtered)
cols = st.columns(6)
cols[0].metric("Trades", kpis.n_trades)
cols[1].metric("Win rate", f"{kpis.win_rate:.0%}")
cols[2].metric("Total P&L", f"${float(kpis.total_pnl_usd):+.2f}")
cols[3].metric("Avg win", f"${float(kpis.avg_win_usd):+.2f}")
cols[4].metric("Avg loss", f"${float(kpis.avg_loss_usd):+.2f}")
cols[5].metric("Edge / trade", f"${float(kpis.edge_per_trade_usd):+.2f}")
st.divider()
if not filtered:
st.info("No trades match the current filters.")
return
# Build DataFrame for display + export
rows = []
for p in filtered:
days_held = (
(p.closed_at - p.opened_at).days
if p.opened_at and p.closed_at
else None
)
rows.append(
{
"proposal_id": str(p.proposal_id)[:8],
"spread_type": p.spread_type,
"asset": p.asset,
"n_contracts": p.n_contracts,
"short_strike": float(p.short_strike),
"long_strike": float(p.long_strike),
"credit_usd": float(p.credit_usd),
"max_loss_usd": float(p.max_loss_usd),
"pnl_usd": float(p.pnl_usd) if p.pnl_usd is not None else None,
"close_reason": p.close_reason or "(unknown)",
"days_held": days_held,
"opened_at": humanize_dt(p.opened_at),
"closed_at": humanize_dt(p.closed_at),
"expiry": humanize_dt(p.expiry),
}
)
df = pd.DataFrame(rows)
st.dataframe(df, use_container_width=True, hide_index=True)
# CSV export
buf = io.StringIO()
df.to_csv(buf, index=False)
st.download_button(
"⬇ Download CSV",
data=buf.getvalue(),
file_name=f"cerbero_bite_history_{datetime.now(UTC).date()}.csv",
mime="text/csv",
)
render()