feat(gui): Phase A — read-only Streamlit dashboard (Status + Audit)

Implements the foundation of the local observation dashboard described
in docs/11-gui-streamlit.md:

* gui/data_layer.py — read-only wrappers over Repository (system_state,
  open positions) and audit_log (tail iteration, chain verify). The GUI
  never imports runtime/ nor calls MCP services.
* gui/main.py — Streamlit entry point with sidebar (engine health
  badge, kill switch banner, last health check age), home overview.
* gui/pages/1_📊_Status.py — engine status with colored health banner,
  kill switch detail, audit anchor, open positions table.
* gui/pages/2_🔍_Audit.py — live audit log stream (newest-first),
  event filters, hash-chain integrity verify button.
* cli.py gui — replaces the placeholder with os.execvpe to
  `python -m streamlit run` bound to 127.0.0.1, --browser.gatherUsageStats
  false; --db / --audit paths exported via env to the GUI process.
* pyproject.toml — N999 ignore for src/cerbero_bite/gui/pages/* (Streamlit
  auto-discovers pages whose filename contains numbers and emoji icons).

Smoke test: GUI launches, HTTP 200 on / and /_stcore/health, data layer
correctly reflects current testnet state (engine=running, kill_switch
disarmed, 0 open positions, audit chain integra 7 entries).

353/353 tests still pass; ruff clean; mypy strict src clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-30 12:07:23 +02:00
parent abf5a140e2
commit 1af983aff1
6 changed files with 684 additions and 3 deletions
+65 -3
View File
@@ -10,6 +10,7 @@ without changing the surface.
from __future__ import annotations
import asyncio
import os
import sys
from collections.abc import Callable
from datetime import UTC, datetime
@@ -580,9 +581,70 @@ async def _ping_all(
@main.command()
def gui() -> None:
"""Launch the Streamlit dashboard."""
_phase0_notice("gui command not yet implemented (will run streamlit on 127.0.0.1:8765).")
@click.option(
"--db",
type=click.Path(path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
help="SQLite state file the dashboard reads.",
)
@click.option(
"--audit",
type=click.Path(path_type=Path),
default=_DEFAULT_AUDIT_PATH,
show_default=True,
help="Audit log file the dashboard streams.",
)
@click.option(
"--port",
type=int,
default=8765,
show_default=True,
help="Local port to bind (always 127.0.0.1).",
)
@click.option(
"--headless/--no-headless",
default=True,
show_default=True,
help="When true, do not auto-open the browser.",
)
def gui(db: Path, audit: Path, port: int, headless: bool) -> None:
"""Launch the Streamlit dashboard (read-only, localhost only)."""
try:
import streamlit # noqa: F401, PLC0415
except ImportError:
click.echo(
"streamlit not installed. Run `uv sync --extra gui` first.",
err=True,
)
sys.exit(1)
main_path = Path(__file__).parent / "gui" / "main.py"
if not main_path.is_file():
click.echo(f"GUI entry point not found: {main_path}", err=True)
sys.exit(1)
env = os.environ.copy()
env["CERBERO_BITE_GUI_DB"] = str(db.resolve())
env["CERBERO_BITE_GUI_AUDIT"] = str(audit.resolve())
cmd = [
sys.executable,
"-m",
"streamlit",
"run",
str(main_path),
"--server.address",
"127.0.0.1",
"--server.port",
str(port),
"--server.headless",
"true" if headless else "false",
"--browser.gatherUsageStats",
"false",
]
click.echo(f"Launching GUI on http://127.0.0.1:{port}")
os.execvpe(cmd[0], cmd, env)
@main.command()
+241
View File
@@ -0,0 +1,241 @@
"""Read-only data access for the Streamlit GUI.
The GUI MUST NOT import ``runtime/`` modules nor make MCP calls. Every
piece of information shown on screen is derived from:
* SQLite (``data/state.sqlite``) via :class:`Repository`.
* The audit log (``data/audit.log``) via the parsing helpers in
:mod:`cerbero_bite.safety.audit_log`.
The module exposes small frozen dataclasses purpose-built for rendering
so each Streamlit page can grab a snapshot in one call instead of
poking at the repository directly.
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Literal
from cerbero_bite.safety.audit_log import (
AuditChainError,
AuditEntry,
iter_entries,
verify_chain,
)
from cerbero_bite.state import Repository, connect
from cerbero_bite.state.models import PositionRecord, SystemStateRecord
__all__ = [
"DEFAULT_AUDIT_PATH",
"DEFAULT_DB_PATH",
"AuditChainStatus",
"EngineHealth",
"EngineSnapshot",
"load_audit_chain_status",
"load_audit_tail",
"load_engine_snapshot",
"load_open_positions",
]
DEFAULT_DB_PATH = Path("data/state.sqlite")
DEFAULT_AUDIT_PATH = Path("data/audit.log")
EngineHealth = Literal["running", "degraded", "killed", "stopped", "unknown"]
@dataclass(frozen=True)
class EngineSnapshot:
"""One-shot snapshot used by the Status page."""
health: EngineHealth
kill_switch_armed: bool
kill_reason: str | None
kill_at: datetime | None
last_health_check: datetime | None
last_health_check_age_s: float | None
started_at: datetime | None
config_version: str | None
last_audit_hash: str | None
open_positions: int
@property
def health_label(self) -> str:
return {
"running": "RUNNING",
"degraded": "DEGRADED",
"killed": "KILL SWITCH ARMED",
"stopped": "STOPPED",
"unknown": "UNKNOWN",
}[self.health]
@dataclass(frozen=True)
class AuditChainStatus:
"""Result of calling ``verify_chain`` on the audit log."""
ok: bool
entries_verified: int
error: str | None
def load_engine_snapshot(
*,
db_path: Path | str = DEFAULT_DB_PATH,
now: datetime | None = None,
stale_after_s: float = 600.0,
) -> EngineSnapshot:
"""Read system_state + open positions count and derive engine health.
Health rules:
* kill switch armed → ``killed``
* no system_state row → ``unknown`` (engine never started)
* last health check older than ``stale_after_s`` → ``stopped``
* last health check older than 2× cycle (10 min) but younger than
``stale_after_s`` → ``degraded``
* fresh health check → ``running``
"""
db_path = Path(db_path)
if not db_path.exists():
return EngineSnapshot(
health="unknown",
kill_switch_armed=False,
kill_reason=None,
kill_at=None,
last_health_check=None,
last_health_check_age_s=None,
started_at=None,
config_version=None,
last_audit_hash=None,
open_positions=0,
)
repo = Repository()
conn = connect(db_path)
try:
state: SystemStateRecord | None = repo.get_system_state(conn)
open_pos = len(repo.list_open_positions(conn))
finally:
conn.close()
if state is None:
return EngineSnapshot(
health="unknown",
kill_switch_armed=False,
kill_reason=None,
kill_at=None,
last_health_check=None,
last_health_check_age_s=None,
started_at=None,
config_version=None,
last_audit_hash=None,
open_positions=open_pos,
)
reference = (now or datetime.now(UTC)).astimezone(UTC)
last_check = state.last_health_check
age = (reference - last_check).total_seconds() if last_check else None
if state.kill_switch:
health: EngineHealth = "killed"
elif age is None:
health = "unknown"
elif age > stale_after_s:
health = "stopped"
elif age > 600: # over 10 minutes since last health probe
health = "degraded"
else:
health = "running"
return EngineSnapshot(
health=health,
kill_switch_armed=bool(state.kill_switch),
kill_reason=state.kill_reason,
kill_at=state.kill_at,
last_health_check=last_check,
last_health_check_age_s=age,
started_at=state.started_at,
config_version=state.config_version,
last_audit_hash=state.last_audit_hash,
open_positions=open_pos,
)
def load_open_positions(
*, db_path: Path | str = DEFAULT_DB_PATH
) -> list[PositionRecord]:
db_path = Path(db_path)
if not db_path.exists():
return []
repo = Repository()
conn = connect(db_path)
try:
return repo.list_open_positions(conn)
finally:
conn.close()
def load_audit_tail(
*,
audit_path: Path | str = DEFAULT_AUDIT_PATH,
limit: int = 100,
event_filter: str | None = None,
) -> list[AuditEntry]:
"""Return the most recent audit entries (newest first).
For the GUI we walk the entire file (the audit log is append-only and
bounded by daily rotation; reading 100 lines stays cheap). The
optional ``event_filter`` matches by exact event name.
"""
audit_path = Path(audit_path)
entries: list[AuditEntry] = []
if not audit_path.exists():
return entries
for entry in iter_entries(audit_path):
if event_filter and entry.event != event_filter:
continue
entries.append(entry)
entries.reverse() # newest first
return entries[:limit]
def load_audit_chain_status(
*, audit_path: Path | str = DEFAULT_AUDIT_PATH
) -> AuditChainStatus:
audit_path = Path(audit_path)
try:
n = verify_chain(audit_path)
except AuditChainError as exc:
return AuditChainStatus(ok=False, entries_verified=0, error=str(exc))
except Exception as exc: # pragma: no cover — surface unexpected IO errors
return AuditChainStatus(ok=False, entries_verified=0, error=str(exc))
return AuditChainStatus(ok=True, entries_verified=n, error=None)
def humanize_age(seconds: float | None) -> str:
if seconds is None:
return ""
if seconds < 60:
return f"{int(seconds)}s ago"
if seconds < 3600:
return f"{int(seconds / 60)}m ago"
if seconds < 86400:
return f"{seconds / 3600:.1f}h ago"
return f"{seconds / 86400:.1f}d ago"
def humanize_dt(value: datetime | None) -> str:
if value is None:
return ""
return value.astimezone(UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
def humanize_timedelta(value: timedelta | None) -> str: # pragma: no cover
if value is None:
return ""
return f"{value.total_seconds() / 3600:.1f}h"
+138
View File
@@ -0,0 +1,138 @@
"""Streamlit entry point for the Cerbero Bite dashboard.
Launch with::
cerbero-bite gui
or directly::
uv run streamlit run src/cerbero_bite/gui/main.py \
--server.address 127.0.0.1 \
--server.port 8765 \
--server.headless true
The dashboard is **read-mostly**: it reads SQLite + the audit log and
never imports ``runtime/`` modules. Each Streamlit page is in
``gui/pages/`` and Streamlit auto-discovers them.
"""
from __future__ import annotations
import os
from pathlib import Path
import streamlit as st
from cerbero_bite.gui.data_layer import (
DEFAULT_AUDIT_PATH,
DEFAULT_DB_PATH,
humanize_age,
humanize_dt,
load_engine_snapshot,
)
PAGE_TITLE = "Cerbero Bite — Dashboard"
PAGE_ICON = "🐺"
# ---------------------------------------------------------------------------
# Path resolution
# ---------------------------------------------------------------------------
def _resolve_paths() -> tuple[Path, Path]:
"""Read DB / audit paths from env (settable by ``cerbero-bite gui``)."""
db_path = Path(os.environ.get("CERBERO_BITE_GUI_DB", DEFAULT_DB_PATH))
audit_path = Path(os.environ.get("CERBERO_BITE_GUI_AUDIT", DEFAULT_AUDIT_PATH))
return db_path, audit_path
# ---------------------------------------------------------------------------
# Sidebar
# ---------------------------------------------------------------------------
_HEALTH_BADGES: dict[str, tuple[str, str]] = {
"running": ("🟢", "RUNNING"),
"degraded": ("🟡", "DEGRADED"),
"killed": ("🔴", "KILL SWITCH"),
"stopped": ("", "STOPPED"),
"unknown": ("", "UNKNOWN"),
}
def _render_sidebar(db_path: Path, audit_path: Path) -> None:
snap = load_engine_snapshot(db_path=db_path)
icon, label = _HEALTH_BADGES.get(snap.health, ("", "UNKNOWN"))
st.sidebar.markdown(f"### {icon} {label}")
if snap.kill_switch_armed:
st.sidebar.error(
f"**Kill switch armed**\n\n"
f"reason: {snap.kill_reason or ''}\n\n"
f"since: {humanize_dt(snap.kill_at)}"
)
st.sidebar.metric(
"Last health check",
humanize_age(snap.last_health_check_age_s),
)
st.sidebar.metric("Open positions", snap.open_positions)
st.sidebar.caption(f"config: `{snap.config_version or ''}`")
st.sidebar.divider()
st.sidebar.caption("Read-only • localhost only")
st.sidebar.caption(f"db: `{db_path}`")
st.sidebar.caption(f"audit: `{audit_path}`")
# ---------------------------------------------------------------------------
# Home page
# ---------------------------------------------------------------------------
def main() -> None:
st.set_page_config(
page_title=PAGE_TITLE,
page_icon=PAGE_ICON,
layout="wide",
initial_sidebar_state="expanded",
)
db_path, audit_path = _resolve_paths()
_render_sidebar(db_path, audit_path)
st.title(f"{PAGE_ICON} Cerbero Bite")
st.caption(
"Rule-based ETH credit-spread engine — read-only dashboard"
)
st.markdown(
"""
Use the sidebar to navigate:
- **Status** — engine health, kill switch, open positions, audit anchor
- **Audit** — live audit log stream + chain integrity verification
The dashboard reads `data/state.sqlite` and `data/audit.log` directly;
it never calls MCP services or the broker. All write actions remain
on the CLI for now.
"""
)
snap = load_engine_snapshot(db_path=db_path)
cols = st.columns(4)
cols[0].metric("Health", _HEALTH_BADGES[snap.health][1])
cols[1].metric(
"Kill switch",
"ARMED" if snap.kill_switch_armed else "DISARMED",
)
cols[2].metric("Open positions", snap.open_positions)
cols[3].metric(
"Last health check",
humanize_age(snap.last_health_check_age_s),
)
if __name__ == "__main__":
main()
+116
View File
@@ -0,0 +1,116 @@
"""Status page — engine health at a glance."""
from __future__ import annotations
import os
from pathlib import Path
import streamlit as st
from cerbero_bite.gui.data_layer import (
DEFAULT_AUDIT_PATH,
DEFAULT_DB_PATH,
humanize_age,
humanize_dt,
load_engine_snapshot,
load_open_positions,
)
def _resolve_paths() -> tuple[Path, Path]:
db_path = Path(os.environ.get("CERBERO_BITE_GUI_DB", DEFAULT_DB_PATH))
audit_path = Path(os.environ.get("CERBERO_BITE_GUI_AUDIT", DEFAULT_AUDIT_PATH))
return db_path, audit_path
_HEALTH_COLORS = {
"running": ("🟢", "success"),
"degraded": ("🟡", "warning"),
"killed": ("🔴", "error"),
"stopped": ("", "warning"),
"unknown": ("", "info"),
}
def render() -> None:
st.title("📊 Status")
st.caption("Engine health, kill switch, open positions and audit anchor.")
db_path, _ = _resolve_paths()
snap = load_engine_snapshot(db_path=db_path)
icon, level = _HEALTH_COLORS.get(snap.health, ("", "info"))
banner = f"{icon} **{snap.health_label}**"
if level == "success":
st.success(banner)
elif level == "warning":
st.warning(banner)
elif level == "error":
st.error(banner)
else:
st.info(banner)
if snap.kill_switch_armed:
st.error(
f"**Kill switch armed** — engine will refuse new entries.\n\n"
f"- reason: `{snap.kill_reason or ''}`\n"
f"- since: `{humanize_dt(snap.kill_at)}`\n\n"
"Disarm via CLI: `cerbero-bite kill-switch disarm --reason '<your reason>'`"
)
# Top metrics
cols = st.columns(4)
cols[0].metric("Open positions", snap.open_positions)
cols[1].metric(
"Last health check", humanize_age(snap.last_health_check_age_s)
)
cols[2].metric("Started at", humanize_dt(snap.started_at))
cols[3].metric("Config version", snap.config_version or "")
st.divider()
# Audit anchor
st.subheader("Audit anchor")
if snap.last_audit_hash is None:
st.info("No anchor recorded yet.")
else:
short = (
f"{snap.last_audit_hash[:12]}{snap.last_audit_hash[-12:]}"
if len(snap.last_audit_hash) > 24
else snap.last_audit_hash
)
st.code(short, language="text")
st.caption(
"Last hash chain head persisted in `system_state.last_audit_hash`. "
"On boot the orchestrator compares this with the audit-log file tail; "
"a mismatch arms the kill switch (CRITICAL)."
)
st.divider()
# Open positions table
st.subheader("Open positions")
positions = load_open_positions(db_path=db_path)
if not positions:
st.info("No open positions.")
else:
rows = [
{
"proposal_id": str(p.proposal_id)[:8],
"spread": p.spread_type,
"asset": p.asset,
"n_contracts": p.n_contracts,
"credit_usd": f"{p.credit_usd:.2f}",
"max_loss_usd": f"{p.max_loss_usd:.2f}",
"short_strike": f"{p.short_strike}",
"long_strike": f"{p.long_strike}",
"status": p.status,
"opened_at": humanize_dt(p.opened_at),
"expiry": humanize_dt(p.expiry),
}
for p in positions
]
st.dataframe(rows, use_container_width=True)
render()
+121
View File
@@ -0,0 +1,121 @@
"""Audit page — live audit log stream + chain integrity verification."""
from __future__ import annotations
import json
import os
from collections import Counter
from pathlib import Path
import streamlit as st
from cerbero_bite.gui.data_layer import (
DEFAULT_AUDIT_PATH,
DEFAULT_DB_PATH,
humanize_dt,
load_audit_chain_status,
load_audit_tail,
)
def _resolve_paths() -> tuple[Path, Path]:
db_path = Path(os.environ.get("CERBERO_BITE_GUI_DB", DEFAULT_DB_PATH))
audit_path = Path(os.environ.get("CERBERO_BITE_GUI_AUDIT", DEFAULT_AUDIT_PATH))
return db_path, audit_path
def render() -> None:
st.title("🔍 Audit")
st.caption(
"Append-only hash-chained audit log "
"(`data/audit.log`). Reading is non-mutating."
)
_, audit_path = _resolve_paths()
col_l, col_r = st.columns([1, 2])
with col_l:
st.subheader("Chain integrity")
if st.button("Verify chain", type="primary"):
with st.spinner("Walking the chain…"):
status = load_audit_chain_status(audit_path=audit_path)
if status.ok:
st.success(
f"✅ chain integra fino a {status.entries_verified} eventi"
)
else:
st.error(
f"❌ tampering rilevato\n\n```\n{status.error}\n```"
)
else:
st.caption(
"Click to recompute every line's hash and verify the prev-hash "
"linkage. Mismatch → CRITICAL alert in production."
)
with col_r:
st.subheader("Filters")
limit = st.slider(
"Last N entries",
min_value=10,
max_value=500,
value=100,
step=10,
)
# Build event list from the available tail
all_recent = load_audit_tail(audit_path=audit_path, limit=limit)
events_present = sorted({e.event for e in all_recent})
event_filter = st.selectbox(
"Event filter",
options=["(all)", *events_present],
index=0,
)
st.divider()
# Statistics strip
counter: Counter[str] = Counter(e.event for e in all_recent)
if counter:
cols = st.columns(min(len(counter), 6))
for col, (event, count) in zip(cols, counter.most_common(6), strict=False):
col.metric(event, count)
st.divider()
# Filtered tail
filtered = (
all_recent
if event_filter == "(all)"
else [e for e in all_recent if e.event == event_filter]
)
st.subheader(f"Last entries ({len(filtered)} shown)")
if not filtered:
st.info("No matching audit entries.")
return
rows = []
for entry in filtered:
try:
payload_pretty = json.dumps(
entry.payload, ensure_ascii=False, sort_keys=True
)
except (TypeError, ValueError):
payload_pretty = str(entry.payload)
rows.append(
{
"timestamp": humanize_dt(entry.timestamp),
"event": entry.event,
"payload": payload_pretty,
"hash": (
f"{entry.hash[:8]}{entry.hash[-8:]}"
if len(entry.hash) > 16
else entry.hash
),
}
)
st.dataframe(rows, use_container_width=True, hide_index=True)
render()