diff --git a/pyproject.toml b/pyproject.toml index f307c1b..94475a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,6 +96,9 @@ ignore = [ [tool.ruff.lint.per-file-ignores] "tests/**" = ["PLR2004", "ARG", "S101", "ERA001", "B017"] +# Streamlit auto-discovers pages whose file names start with a number and +# may contain icons; the convention conflicts with N999. +"src/cerbero_bite/gui/pages/*" = ["N999"] [tool.ruff.format] quote-style = "double" diff --git a/src/cerbero_bite/cli.py b/src/cerbero_bite/cli.py index 7c12a18..02132cb 100644 --- a/src/cerbero_bite/cli.py +++ b/src/cerbero_bite/cli.py @@ -10,6 +10,7 @@ without changing the surface. from __future__ import annotations import asyncio +import os import sys from collections.abc import Callable from datetime import UTC, datetime @@ -580,9 +581,70 @@ async def _ping_all( @main.command() -def gui() -> None: - """Launch the Streamlit dashboard.""" - _phase0_notice("gui command not yet implemented (will run streamlit on 127.0.0.1:8765).") +@click.option( + "--db", + type=click.Path(path_type=Path), + default=_DEFAULT_DB_PATH, + show_default=True, + help="SQLite state file the dashboard reads.", +) +@click.option( + "--audit", + type=click.Path(path_type=Path), + default=_DEFAULT_AUDIT_PATH, + show_default=True, + help="Audit log file the dashboard streams.", +) +@click.option( + "--port", + type=int, + default=8765, + show_default=True, + help="Local port to bind (always 127.0.0.1).", +) +@click.option( + "--headless/--no-headless", + default=True, + show_default=True, + help="When true, do not auto-open the browser.", +) +def gui(db: Path, audit: Path, port: int, headless: bool) -> None: + """Launch the Streamlit dashboard (read-only, localhost only).""" + try: + import streamlit # noqa: F401, PLC0415 + except ImportError: + click.echo( + "streamlit not installed. Run `uv sync --extra gui` first.", + err=True, + ) + sys.exit(1) + + main_path = Path(__file__).parent / "gui" / "main.py" + if not main_path.is_file(): + click.echo(f"GUI entry point not found: {main_path}", err=True) + sys.exit(1) + + env = os.environ.copy() + env["CERBERO_BITE_GUI_DB"] = str(db.resolve()) + env["CERBERO_BITE_GUI_AUDIT"] = str(audit.resolve()) + + cmd = [ + sys.executable, + "-m", + "streamlit", + "run", + str(main_path), + "--server.address", + "127.0.0.1", + "--server.port", + str(port), + "--server.headless", + "true" if headless else "false", + "--browser.gatherUsageStats", + "false", + ] + click.echo(f"Launching GUI on http://127.0.0.1:{port} …") + os.execvpe(cmd[0], cmd, env) @main.command() diff --git a/src/cerbero_bite/gui/data_layer.py b/src/cerbero_bite/gui/data_layer.py new file mode 100644 index 0000000..b91221f --- /dev/null +++ b/src/cerbero_bite/gui/data_layer.py @@ -0,0 +1,241 @@ +"""Read-only data access for the Streamlit GUI. + +The GUI MUST NOT import ``runtime/`` modules nor make MCP calls. Every +piece of information shown on screen is derived from: + +* SQLite (``data/state.sqlite``) via :class:`Repository`. +* The audit log (``data/audit.log``) via the parsing helpers in + :mod:`cerbero_bite.safety.audit_log`. + +The module exposes small frozen dataclasses purpose-built for rendering +so each Streamlit page can grab a snapshot in one call instead of +poking at the repository directly. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import UTC, datetime, timedelta +from pathlib import Path +from typing import Literal + +from cerbero_bite.safety.audit_log import ( + AuditChainError, + AuditEntry, + iter_entries, + verify_chain, +) +from cerbero_bite.state import Repository, connect +from cerbero_bite.state.models import PositionRecord, SystemStateRecord + +__all__ = [ + "DEFAULT_AUDIT_PATH", + "DEFAULT_DB_PATH", + "AuditChainStatus", + "EngineHealth", + "EngineSnapshot", + "load_audit_chain_status", + "load_audit_tail", + "load_engine_snapshot", + "load_open_positions", +] + + +DEFAULT_DB_PATH = Path("data/state.sqlite") +DEFAULT_AUDIT_PATH = Path("data/audit.log") + + +EngineHealth = Literal["running", "degraded", "killed", "stopped", "unknown"] + + +@dataclass(frozen=True) +class EngineSnapshot: + """One-shot snapshot used by the Status page.""" + + health: EngineHealth + kill_switch_armed: bool + kill_reason: str | None + kill_at: datetime | None + last_health_check: datetime | None + last_health_check_age_s: float | None + started_at: datetime | None + config_version: str | None + last_audit_hash: str | None + open_positions: int + + @property + def health_label(self) -> str: + return { + "running": "RUNNING", + "degraded": "DEGRADED", + "killed": "KILL SWITCH ARMED", + "stopped": "STOPPED", + "unknown": "UNKNOWN", + }[self.health] + + +@dataclass(frozen=True) +class AuditChainStatus: + """Result of calling ``verify_chain`` on the audit log.""" + + ok: bool + entries_verified: int + error: str | None + + +def load_engine_snapshot( + *, + db_path: Path | str = DEFAULT_DB_PATH, + now: datetime | None = None, + stale_after_s: float = 600.0, +) -> EngineSnapshot: + """Read system_state + open positions count and derive engine health. + + Health rules: + + * kill switch armed → ``killed`` + * no system_state row → ``unknown`` (engine never started) + * last health check older than ``stale_after_s`` → ``stopped`` + * last health check older than 2× cycle (10 min) but younger than + ``stale_after_s`` → ``degraded`` + * fresh health check → ``running`` + """ + db_path = Path(db_path) + if not db_path.exists(): + return EngineSnapshot( + health="unknown", + kill_switch_armed=False, + kill_reason=None, + kill_at=None, + last_health_check=None, + last_health_check_age_s=None, + started_at=None, + config_version=None, + last_audit_hash=None, + open_positions=0, + ) + + repo = Repository() + conn = connect(db_path) + try: + state: SystemStateRecord | None = repo.get_system_state(conn) + open_pos = len(repo.list_open_positions(conn)) + finally: + conn.close() + + if state is None: + return EngineSnapshot( + health="unknown", + kill_switch_armed=False, + kill_reason=None, + kill_at=None, + last_health_check=None, + last_health_check_age_s=None, + started_at=None, + config_version=None, + last_audit_hash=None, + open_positions=open_pos, + ) + + reference = (now or datetime.now(UTC)).astimezone(UTC) + last_check = state.last_health_check + age = (reference - last_check).total_seconds() if last_check else None + + if state.kill_switch: + health: EngineHealth = "killed" + elif age is None: + health = "unknown" + elif age > stale_after_s: + health = "stopped" + elif age > 600: # over 10 minutes since last health probe + health = "degraded" + else: + health = "running" + + return EngineSnapshot( + health=health, + kill_switch_armed=bool(state.kill_switch), + kill_reason=state.kill_reason, + kill_at=state.kill_at, + last_health_check=last_check, + last_health_check_age_s=age, + started_at=state.started_at, + config_version=state.config_version, + last_audit_hash=state.last_audit_hash, + open_positions=open_pos, + ) + + +def load_open_positions( + *, db_path: Path | str = DEFAULT_DB_PATH +) -> list[PositionRecord]: + db_path = Path(db_path) + if not db_path.exists(): + return [] + repo = Repository() + conn = connect(db_path) + try: + return repo.list_open_positions(conn) + finally: + conn.close() + + +def load_audit_tail( + *, + audit_path: Path | str = DEFAULT_AUDIT_PATH, + limit: int = 100, + event_filter: str | None = None, +) -> list[AuditEntry]: + """Return the most recent audit entries (newest first). + + For the GUI we walk the entire file (the audit log is append-only and + bounded by daily rotation; reading 100 lines stays cheap). The + optional ``event_filter`` matches by exact event name. + """ + audit_path = Path(audit_path) + entries: list[AuditEntry] = [] + if not audit_path.exists(): + return entries + for entry in iter_entries(audit_path): + if event_filter and entry.event != event_filter: + continue + entries.append(entry) + entries.reverse() # newest first + return entries[:limit] + + +def load_audit_chain_status( + *, audit_path: Path | str = DEFAULT_AUDIT_PATH +) -> AuditChainStatus: + audit_path = Path(audit_path) + try: + n = verify_chain(audit_path) + except AuditChainError as exc: + return AuditChainStatus(ok=False, entries_verified=0, error=str(exc)) + except Exception as exc: # pragma: no cover — surface unexpected IO errors + return AuditChainStatus(ok=False, entries_verified=0, error=str(exc)) + return AuditChainStatus(ok=True, entries_verified=n, error=None) + + +def humanize_age(seconds: float | None) -> str: + if seconds is None: + return "—" + if seconds < 60: + return f"{int(seconds)}s ago" + if seconds < 3600: + return f"{int(seconds / 60)}m ago" + if seconds < 86400: + return f"{seconds / 3600:.1f}h ago" + return f"{seconds / 86400:.1f}d ago" + + +def humanize_dt(value: datetime | None) -> str: + if value is None: + return "—" + return value.astimezone(UTC).strftime("%Y-%m-%d %H:%M:%S UTC") + + +def humanize_timedelta(value: timedelta | None) -> str: # pragma: no cover + if value is None: + return "—" + return f"{value.total_seconds() / 3600:.1f}h" diff --git a/src/cerbero_bite/gui/main.py b/src/cerbero_bite/gui/main.py new file mode 100644 index 0000000..26b69f6 --- /dev/null +++ b/src/cerbero_bite/gui/main.py @@ -0,0 +1,138 @@ +"""Streamlit entry point for the Cerbero Bite dashboard. + +Launch with:: + + cerbero-bite gui + +or directly:: + + uv run streamlit run src/cerbero_bite/gui/main.py \ + --server.address 127.0.0.1 \ + --server.port 8765 \ + --server.headless true + +The dashboard is **read-mostly**: it reads SQLite + the audit log and +never imports ``runtime/`` modules. Each Streamlit page is in +``gui/pages/`` and Streamlit auto-discovers them. +""" + +from __future__ import annotations + +import os +from pathlib import Path + +import streamlit as st + +from cerbero_bite.gui.data_layer import ( + DEFAULT_AUDIT_PATH, + DEFAULT_DB_PATH, + humanize_age, + humanize_dt, + load_engine_snapshot, +) + +PAGE_TITLE = "Cerbero Bite — Dashboard" +PAGE_ICON = "🐺" + + +# --------------------------------------------------------------------------- +# Path resolution +# --------------------------------------------------------------------------- + + +def _resolve_paths() -> tuple[Path, Path]: + """Read DB / audit paths from env (settable by ``cerbero-bite gui``).""" + db_path = Path(os.environ.get("CERBERO_BITE_GUI_DB", DEFAULT_DB_PATH)) + audit_path = Path(os.environ.get("CERBERO_BITE_GUI_AUDIT", DEFAULT_AUDIT_PATH)) + return db_path, audit_path + + +# --------------------------------------------------------------------------- +# Sidebar +# --------------------------------------------------------------------------- + + +_HEALTH_BADGES: dict[str, tuple[str, str]] = { + "running": ("🟢", "RUNNING"), + "degraded": ("🟡", "DEGRADED"), + "killed": ("🔴", "KILL SWITCH"), + "stopped": ("⚫", "STOPPED"), + "unknown": ("⚪", "UNKNOWN"), +} + + +def _render_sidebar(db_path: Path, audit_path: Path) -> None: + snap = load_engine_snapshot(db_path=db_path) + icon, label = _HEALTH_BADGES.get(snap.health, ("⚪", "UNKNOWN")) + + st.sidebar.markdown(f"### {icon} {label}") + if snap.kill_switch_armed: + st.sidebar.error( + f"**Kill switch armed**\n\n" + f"reason: {snap.kill_reason or '—'}\n\n" + f"since: {humanize_dt(snap.kill_at)}" + ) + + st.sidebar.metric( + "Last health check", + humanize_age(snap.last_health_check_age_s), + ) + st.sidebar.metric("Open positions", snap.open_positions) + st.sidebar.caption(f"config: `{snap.config_version or '—'}`") + + st.sidebar.divider() + st.sidebar.caption("Read-only • localhost only") + st.sidebar.caption(f"db: `{db_path}`") + st.sidebar.caption(f"audit: `{audit_path}`") + + +# --------------------------------------------------------------------------- +# Home page +# --------------------------------------------------------------------------- + + +def main() -> None: + st.set_page_config( + page_title=PAGE_TITLE, + page_icon=PAGE_ICON, + layout="wide", + initial_sidebar_state="expanded", + ) + + db_path, audit_path = _resolve_paths() + _render_sidebar(db_path, audit_path) + + st.title(f"{PAGE_ICON} Cerbero Bite") + st.caption( + "Rule-based ETH credit-spread engine — read-only dashboard" + ) + + st.markdown( + """ + Use the sidebar to navigate: + + - **Status** — engine health, kill switch, open positions, audit anchor + - **Audit** — live audit log stream + chain integrity verification + + The dashboard reads `data/state.sqlite` and `data/audit.log` directly; + it never calls MCP services or the broker. All write actions remain + on the CLI for now. + """ + ) + + snap = load_engine_snapshot(db_path=db_path) + cols = st.columns(4) + cols[0].metric("Health", _HEALTH_BADGES[snap.health][1]) + cols[1].metric( + "Kill switch", + "ARMED" if snap.kill_switch_armed else "DISARMED", + ) + cols[2].metric("Open positions", snap.open_positions) + cols[3].metric( + "Last health check", + humanize_age(snap.last_health_check_age_s), + ) + + +if __name__ == "__main__": + main() diff --git a/src/cerbero_bite/gui/pages/1_📊_Status.py b/src/cerbero_bite/gui/pages/1_📊_Status.py new file mode 100644 index 0000000..d4d16df --- /dev/null +++ b/src/cerbero_bite/gui/pages/1_📊_Status.py @@ -0,0 +1,116 @@ +"""Status page — engine health at a glance.""" + +from __future__ import annotations + +import os +from pathlib import Path + +import streamlit as st + +from cerbero_bite.gui.data_layer import ( + DEFAULT_AUDIT_PATH, + DEFAULT_DB_PATH, + humanize_age, + humanize_dt, + load_engine_snapshot, + load_open_positions, +) + + +def _resolve_paths() -> tuple[Path, Path]: + db_path = Path(os.environ.get("CERBERO_BITE_GUI_DB", DEFAULT_DB_PATH)) + audit_path = Path(os.environ.get("CERBERO_BITE_GUI_AUDIT", DEFAULT_AUDIT_PATH)) + return db_path, audit_path + + +_HEALTH_COLORS = { + "running": ("🟢", "success"), + "degraded": ("🟡", "warning"), + "killed": ("🔴", "error"), + "stopped": ("⚫", "warning"), + "unknown": ("⚪", "info"), +} + + +def render() -> None: + st.title("📊 Status") + st.caption("Engine health, kill switch, open positions and audit anchor.") + + db_path, _ = _resolve_paths() + snap = load_engine_snapshot(db_path=db_path) + + icon, level = _HEALTH_COLORS.get(snap.health, ("⚪", "info")) + banner = f"{icon} **{snap.health_label}**" + if level == "success": + st.success(banner) + elif level == "warning": + st.warning(banner) + elif level == "error": + st.error(banner) + else: + st.info(banner) + + if snap.kill_switch_armed: + st.error( + f"**Kill switch armed** — engine will refuse new entries.\n\n" + f"- reason: `{snap.kill_reason or '—'}`\n" + f"- since: `{humanize_dt(snap.kill_at)}`\n\n" + "Disarm via CLI: `cerbero-bite kill-switch disarm --reason ''`" + ) + + # Top metrics + cols = st.columns(4) + cols[0].metric("Open positions", snap.open_positions) + cols[1].metric( + "Last health check", humanize_age(snap.last_health_check_age_s) + ) + cols[2].metric("Started at", humanize_dt(snap.started_at)) + cols[3].metric("Config version", snap.config_version or "—") + + st.divider() + + # Audit anchor + st.subheader("Audit anchor") + if snap.last_audit_hash is None: + st.info("No anchor recorded yet.") + else: + short = ( + f"{snap.last_audit_hash[:12]}…{snap.last_audit_hash[-12:]}" + if len(snap.last_audit_hash) > 24 + else snap.last_audit_hash + ) + st.code(short, language="text") + st.caption( + "Last hash chain head persisted in `system_state.last_audit_hash`. " + "On boot the orchestrator compares this with the audit-log file tail; " + "a mismatch arms the kill switch (CRITICAL)." + ) + + st.divider() + + # Open positions table + st.subheader("Open positions") + positions = load_open_positions(db_path=db_path) + if not positions: + st.info("No open positions.") + else: + rows = [ + { + "proposal_id": str(p.proposal_id)[:8], + "spread": p.spread_type, + "asset": p.asset, + "n_contracts": p.n_contracts, + "credit_usd": f"{p.credit_usd:.2f}", + "max_loss_usd": f"{p.max_loss_usd:.2f}", + "short_strike": f"{p.short_strike}", + "long_strike": f"{p.long_strike}", + "status": p.status, + "opened_at": humanize_dt(p.opened_at), + "expiry": humanize_dt(p.expiry), + } + for p in positions + ] + st.dataframe(rows, use_container_width=True) + + +render() diff --git a/src/cerbero_bite/gui/pages/2_🔍_Audit.py b/src/cerbero_bite/gui/pages/2_🔍_Audit.py new file mode 100644 index 0000000..5de04f5 --- /dev/null +++ b/src/cerbero_bite/gui/pages/2_🔍_Audit.py @@ -0,0 +1,121 @@ +"""Audit page — live audit log stream + chain integrity verification.""" + +from __future__ import annotations + +import json +import os +from collections import Counter +from pathlib import Path + +import streamlit as st + +from cerbero_bite.gui.data_layer import ( + DEFAULT_AUDIT_PATH, + DEFAULT_DB_PATH, + humanize_dt, + load_audit_chain_status, + load_audit_tail, +) + + +def _resolve_paths() -> tuple[Path, Path]: + db_path = Path(os.environ.get("CERBERO_BITE_GUI_DB", DEFAULT_DB_PATH)) + audit_path = Path(os.environ.get("CERBERO_BITE_GUI_AUDIT", DEFAULT_AUDIT_PATH)) + return db_path, audit_path + + +def render() -> None: + st.title("🔍 Audit") + st.caption( + "Append-only hash-chained audit log " + "(`data/audit.log`). Reading is non-mutating." + ) + + _, audit_path = _resolve_paths() + + col_l, col_r = st.columns([1, 2]) + + with col_l: + st.subheader("Chain integrity") + if st.button("Verify chain", type="primary"): + with st.spinner("Walking the chain…"): + status = load_audit_chain_status(audit_path=audit_path) + if status.ok: + st.success( + f"✅ chain integra fino a {status.entries_verified} eventi" + ) + else: + st.error( + f"❌ tampering rilevato\n\n```\n{status.error}\n```" + ) + else: + st.caption( + "Click to recompute every line's hash and verify the prev-hash " + "linkage. Mismatch → CRITICAL alert in production." + ) + + with col_r: + st.subheader("Filters") + limit = st.slider( + "Last N entries", + min_value=10, + max_value=500, + value=100, + step=10, + ) + # Build event list from the available tail + all_recent = load_audit_tail(audit_path=audit_path, limit=limit) + events_present = sorted({e.event for e in all_recent}) + event_filter = st.selectbox( + "Event filter", + options=["(all)", *events_present], + index=0, + ) + + st.divider() + + # Statistics strip + counter: Counter[str] = Counter(e.event for e in all_recent) + if counter: + cols = st.columns(min(len(counter), 6)) + for col, (event, count) in zip(cols, counter.most_common(6), strict=False): + col.metric(event, count) + + st.divider() + + # Filtered tail + filtered = ( + all_recent + if event_filter == "(all)" + else [e for e in all_recent if e.event == event_filter] + ) + + st.subheader(f"Last entries ({len(filtered)} shown)") + if not filtered: + st.info("No matching audit entries.") + return + + rows = [] + for entry in filtered: + try: + payload_pretty = json.dumps( + entry.payload, ensure_ascii=False, sort_keys=True + ) + except (TypeError, ValueError): + payload_pretty = str(entry.payload) + rows.append( + { + "timestamp": humanize_dt(entry.timestamp), + "event": entry.event, + "payload": payload_pretty, + "hash": ( + f"{entry.hash[:8]}…{entry.hash[-8:]}" + if len(entry.hash) > 16 + else entry.hash + ), + } + ) + st.dataframe(rows, use_container_width=True, hide_index=True) + + +render()