"""Audit page — live audit log stream + chain integrity verification.""" from __future__ import annotations import json import os from collections import Counter from pathlib import Path import streamlit as st from cerbero_bite.gui.data_layer import ( DEFAULT_AUDIT_PATH, DEFAULT_DB_PATH, humanize_dt, load_audit_chain_status, load_audit_tail, ) def _resolve_paths() -> tuple[Path, Path]: db_path = Path(os.environ.get("CERBERO_BITE_GUI_DB", DEFAULT_DB_PATH)) audit_path = Path(os.environ.get("CERBERO_BITE_GUI_AUDIT", DEFAULT_AUDIT_PATH)) return db_path, audit_path def render() -> None: st.title("🔍 Audit") st.caption( "Registro audit append-only con hash chain " "(`data/audit.log`). La lettura non modifica nulla." ) _, audit_path = _resolve_paths() col_l, col_r = st.columns([1, 2]) with col_l: st.subheader("Integrità catena") if st.button("Verifica catena", type="primary"): with st.spinner("Sto percorrendo la catena…"): status = load_audit_chain_status(audit_path=audit_path) if status.ok: st.success( f"✅ catena integra fino a {status.entries_verified} eventi" ) else: st.error( f"❌ tampering rilevato\n\n```\n{status.error}\n```" ) else: st.caption( "Premi per ricalcolare l'hash di ogni riga e verificare il " "collegamento prev-hash. Mismatch → alert CRITICAL in " "produzione." ) with col_r: st.subheader("Filtri") limit = st.slider( "Ultimi N eventi", min_value=10, max_value=500, value=100, step=10, ) # Build event list from the available tail all_recent = load_audit_tail(audit_path=audit_path, limit=limit) events_present = sorted({e.event for e in all_recent}) event_filter = st.selectbox( "Filtro per evento", options=["(tutti)", *events_present], index=0, ) st.divider() # Statistics strip counter: Counter[str] = Counter(e.event for e in all_recent) if counter: cols = st.columns(min(len(counter), 6)) for col, (event, count) in zip(cols, counter.most_common(6), strict=False): col.metric(event, count) st.divider() # Tail filtrata filtered = ( all_recent if event_filter == "(tutti)" else [e for e in all_recent if e.event == event_filter] ) st.subheader(f"Ultimi eventi ({len(filtered)} mostrati)") if not filtered: st.info("Nessun evento corrisponde ai filtri.") return rows = [] for entry in filtered: try: payload_pretty = json.dumps( entry.payload, ensure_ascii=False, sort_keys=True ) except (TypeError, ValueError): payload_pretty = str(entry.payload) rows.append( { "timestamp": humanize_dt(entry.timestamp), "evento": entry.event, "payload": payload_pretty, "hash": ( f"{entry.hash[:8]}…{entry.hash[-8:]}" if len(entry.hash) > 16 else entry.hash ), } ) st.dataframe(rows, use_container_width=True, hide_index=True) render()