Files
Cerbero-Bite/src/cerbero_bite/cli.py
T
root f664ea1a15 feat(backtest): stylized engine over market_snapshots + CLI subcommand
Aggiunge `core/backtest.py`, motore di backtesting stilizzato che gira
sui dati raccolti in `market_snapshots`. Risponde alla domanda:
"se questa config fosse stata attiva nelle ultime N settimane, quanti
lunedì avrebbero superato i filtri e quale sarebbe stato il P/L stimato?"

**Architettura a due strati**:

1. **Filtri di entry — RIGOROSO**: per ogni Monday-14:00-UTC nei
   snapshot ricostruisce `EntryContext` e chiama lo stesso
   `validate_entry()` del live. Output esatto di "cosa avrebbe deciso
   il bot" per ogni settimana, con conteggio dei motivi di skip.

2. **P/L per trade accettato — STILIZZATO**: senza catena opzioni
   storica, stima credito/exit via Black-Scholes con skew premium
   (default 1.5×) per approssimare la vol smile dell'ETH. Re-prezza
   il combo ad ogni tick futuro per simulare i trigger §7
   (profit_take, stop_loss, vol_stop, time_stop, expiry).

**Aggregati nel `BacktestReport`**:
- n_picks / n_accepted / n_skipped_data / n_completed / n_winners
- win_rate, P/L cumulato (USD + % su capitale)
- max drawdown (USD + % di peak)
- Sharpe annualizzato (52 settimane)
- skip_reasons: dict{motivo → settimane bloccate}

**CLI**: nuovo `cerbero-bite backtest --strategy F --from D --to D
--capital N --asset ETH`. Stampa Rich-formatted summary + tabella
motivi di skip. Esempio:

    cerbero-bite backtest \
      --strategy strategy.aggressiva.yaml \
      --from 2026-04-01 --to 2026-05-01 \
      --capital 10000

**Limiti dichiarati**:
- BS + skew_premium ≠ catena reale: i numeri P/L sono **stime ex-post
  per ranking config**, non promesse operative. Buono per dire
  "config A batte config B sui dati reali", non per dimensionare
  capitale.
- skew_premium 1.5× è stato calibrato sui dati Deribit storici
  (smile slope ETH options); va rifinito quando avremo abbastanza
  chain history da farlo empiricamente.

**Tests**: 15 unit test (BS math, monday picks, filter sim,
position outcome simulation, full pipeline su sintetico).
Suite totale: 420 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:31:54 +00:00

975 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Command-line interface for Cerbero Bite.
The CLI is the single user-facing entry point. Each subcommand maps to a
specific operational action documented in ``docs/06-operational-flow.md``
and ``docs/07-risk-controls.md``. All commands are placeholders in
Phase 0; subsequent phases will replace the bodies with real logic
without changing the surface.
"""
from __future__ import annotations
import asyncio
import os
import sys
from collections.abc import Callable
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from pathlib import Path
from typing import Any
import click
from rich.console import Console
from rich.table import Table
from cerbero_bite import __version__
from cerbero_bite.clients import HttpToolClient, McpError
from cerbero_bite.clients.deribit import DeribitClient
from cerbero_bite.clients.hyperliquid import HyperliquidClient
from cerbero_bite.clients.macro import MacroClient
from cerbero_bite.clients.sentiment import SentimentClient
from cerbero_bite.config.loader import compute_config_hash, load_strategy
from cerbero_bite.config.mcp_endpoints import (
DEFAULT_ENDPOINTS,
load_bot_tag,
load_endpoints,
load_token,
)
from cerbero_bite.config.runtime_flags import load_runtime_flags
from cerbero_bite.logging import configure as configure_logging
from cerbero_bite.logging import get_logger
from cerbero_bite.runtime.orchestrator import Orchestrator, make_orchestrator
from cerbero_bite.safety.audit_log import AuditChainError, AuditLog
from cerbero_bite.safety.audit_log import verify_chain as verify_audit_chain
from cerbero_bite.safety.kill_switch import KillSwitch
from cerbero_bite.state import Repository, run_migrations, transaction
from cerbero_bite.state import connect as connect_state
console = Console()
log = get_logger("cli")
_DEFAULT_DB_PATH = Path("data/state.sqlite")
_DEFAULT_AUDIT_PATH = Path("data/audit.log")
_DEFAULT_STRATEGY_PATH = Path("strategy.yaml")
def _phase0_notice(action: str) -> None:
console.print(f"[yellow]\\[phase 0 placeholder][/yellow] {action}")
@click.group(context_settings={"help_option_names": ["-h", "--help"]})
@click.version_option(__version__, prog_name="cerbero-bite")
@click.option(
"--log-dir",
type=click.Path(file_okay=False, path_type=Path),
default=Path("data/log"),
show_default=True,
help="Directory for JSONL log files.",
)
@click.option(
"--log-level",
type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR"], case_sensitive=False),
default="INFO",
show_default=True,
)
@click.pass_context
def main(ctx: click.Context, log_dir: Path, log_level: str) -> None:
"""Cerbero Bite — rule-based ETH credit spread engine."""
# Load `.env` once at CLI entry, unless we are running under
# pytest (which sets ``PYTEST_CURRENT_TEST`` for the duration of
# the test). Existing env vars win over the file (override=False).
if "PYTEST_CURRENT_TEST" not in os.environ:
from dotenv import load_dotenv # noqa: PLC0415
load_dotenv(Path.cwd() / ".env", override=False)
configure_logging(log_dir=log_dir, level=log_level.upper())
ctx.ensure_object(dict)
ctx.obj["log_dir"] = log_dir
@main.command()
@click.option(
"--db",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
def status(db: Path) -> None:
"""Print engine status snapshot."""
header = f"[bold cyan]Cerbero Bite[/bold cyan] v{__version__}"
if not db.exists():
console.print(
f"{header}\n"
"engine state: [yellow]never started[/yellow] "
"(state.sqlite missing)"
)
return
conn = connect_state(db)
try:
run_migrations(conn)
repo = Repository()
sys_state = repo.get_system_state(conn)
open_positions = repo.list_open_positions(conn)
finally:
conn.close()
if sys_state is None:
console.print(f"{header}\nengine state: [yellow]uninitialised[/yellow]")
return
armed = sys_state.kill_switch == 1
flag = "[red]ARMED[/red]" if armed else "[green]disarmed[/green]"
console.print(
f"{header}\n"
f"kill_switch: {flag}"
f"{' reason=' + (sys_state.kill_reason or '?') if armed else ''}\n"
f"open positions: {len(open_positions)}\n"
f"config_version: {sys_state.config_version}\n"
f"started_at: {sys_state.started_at.isoformat()}\n"
f"last_health_check: {sys_state.last_health_check.isoformat()}"
)
@main.command()
@click.option(
"--db",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
@click.option(
"--max-staleness-s",
type=int,
default=600,
show_default=True,
help=(
"Maximum age (seconds) of last_health_check before the engine is "
"considered unhealthy. Used by Docker HEALTHCHECK."
),
)
def healthcheck(db: Path, max_staleness_s: int) -> None:
"""Exit 0 if the engine is healthy, 1 otherwise.
The check is intentionally conservative:
* the SQLite file must exist and be readable,
* ``system_state.kill_switch`` must be 0,
* ``system_state.last_health_check`` must not be older than
``--max-staleness-s`` seconds.
Wired as the container HEALTHCHECK in ``Dockerfile``.
"""
if not db.exists():
console.print("[red]unhealthy[/red]: state.sqlite missing")
sys.exit(1)
try:
conn = connect_state(db)
try:
run_migrations(conn)
sys_state = Repository().get_system_state(conn)
finally:
conn.close()
except Exception as exc:
console.print(f"[red]unhealthy[/red]: {type(exc).__name__}: {exc}")
sys.exit(1)
if sys_state is None:
console.print("[red]unhealthy[/red]: system_state singleton missing")
sys.exit(1)
if sys_state.kill_switch == 1:
console.print(
f"[red]unhealthy[/red]: kill switch armed "
f"reason={sys_state.kill_reason!r}"
)
sys.exit(1)
age = (datetime.now(UTC) - sys_state.last_health_check).total_seconds()
if age > max_staleness_s:
console.print(
f"[red]unhealthy[/red]: last_health_check stale "
f"({age:.0f}s > {max_staleness_s}s)"
)
sys.exit(1)
console.print(f"[green]healthy[/green] last_check_age={age:.0f}s")
def _engine_options(func: Callable[..., Any]) -> Callable[..., Any]:
"""Common options for the engine commands."""
decorators = [
click.option(
"--strategy",
"strategy_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=_DEFAULT_STRATEGY_PATH,
show_default=True,
),
click.option(
"--token",
type=str,
default=None,
help=(
"MCP bearer token (overrides CERBERO_BITE_MCP_TOKEN). "
"The server uses the token to choose between testnet "
"and mainnet upstream environments."
),
),
click.option(
"--db",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
),
click.option(
"--audit",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_AUDIT_PATH,
show_default=True,
),
click.option(
"--environment",
type=click.Choice(["testnet", "mainnet"]),
default="testnet",
show_default=True,
),
click.option(
"--eur-to-usd",
type=float,
default=1.075,
show_default=True,
help="EUR→USD conversion rate used by the sizing engine.",
),
]
for d in reversed(decorators):
func = d(func)
return func
def _build_orchestrator(
*,
strategy_path: Path,
token: str | None,
db: Path,
audit: Path,
environment: str,
eur_to_usd: float,
enforce_hash: bool = True,
) -> Orchestrator:
loaded = load_strategy(strategy_path, enforce_hash=enforce_hash)
resolved_token = load_token(value=token)
# Strategy file values win over the CLI defaults; explicit overrides
# via env-style values (CLI flags) still apply when the user provides
# them — Click signals "default" via Click's resilient_parsing flag,
# but for now the CLI value is treated as authoritative when it
# differs from the documented default to keep the surface small.
cfg_env = loaded.config.execution.environment
cfg_fx = loaded.config.execution.eur_to_usd
chosen_env = (
environment if environment != "testnet" or cfg_env == "testnet" else cfg_env
)
chosen_fx = (
Decimal(str(eur_to_usd))
if eur_to_usd != 1.075
else cfg_fx
)
return make_orchestrator(
cfg=loaded.config,
endpoints=load_endpoints(),
token=resolved_token,
db_path=db,
audit_path=audit,
expected_environment=chosen_env, # type: ignore[arg-type]
eur_to_usd=chosen_fx,
bot_tag=load_bot_tag(),
flags=load_runtime_flags(),
)
@main.command()
@_engine_options
def start(
strategy_path: Path,
token: str | None,
db: Path,
audit: Path,
environment: str,
eur_to_usd: float,
) -> None:
"""Start the engine main loop (scheduler + monitoring)."""
try:
orch = _build_orchestrator(
strategy_path=strategy_path,
token=token,
db=db,
audit=audit,
environment=environment,
eur_to_usd=eur_to_usd,
enforce_hash=True,
)
except Exception as exc:
console.print(f"[red]boot aborted[/red]: {type(exc).__name__}: {exc}")
sys.exit(1)
console.print(
f"[bold cyan]Cerbero Bite[/bold cyan] starting "
f"(env={environment}, db={db}, audit={audit})"
)
try:
asyncio.run(orch.run_forever())
except KeyboardInterrupt:
console.print("[yellow]engine interrupted[/yellow]")
@main.command()
@_engine_options
@click.option(
"--cycle",
type=click.Choice(["entry", "monitor", "health"]),
default="entry",
show_default=True,
)
def dry_run(
strategy_path: Path,
token: str | None,
db: Path,
audit: Path,
environment: str,
eur_to_usd: float,
cycle: str,
) -> None:
"""Execute one cycle without starting the scheduler."""
orch = _build_orchestrator(
strategy_path=strategy_path,
token=token,
db=db,
audit=audit,
environment=environment,
eur_to_usd=eur_to_usd,
enforce_hash=False,
)
async def _go() -> None:
await orch.boot()
if cycle == "entry":
entry = await orch.run_entry()
console.print(
f"[green]entry[/green] {entry.status} reason={entry.reason}"
)
elif cycle == "monitor":
mon = await orch.run_monitor()
console.print(f"[green]monitor[/green] outcomes={len(mon.outcomes)}")
for outcome in mon.outcomes:
console.print(
f"{outcome.proposal_id[:8]} {outcome.action} "
f"closed={outcome.closed}"
)
else:
health = await orch.run_health()
console.print(f"[green]health[/green] state={health.state}")
asyncio.run(_go())
@main.command()
def stop() -> None:
"""Gracefully stop a running engine (manual: send SIGTERM)."""
console.print(
"Use [bold]docker compose stop cerbero-bite[/bold] (or send SIGTERM "
"to the engine PID). The container traps the signal and shuts down "
"the scheduler before exit."
)
@main.group(name="kill-switch")
def kill_switch() -> None:
"""Manage the engine kill switch."""
def _make_kill_switch(
db_path: Path, audit_path: Path, *, config_version: str
) -> KillSwitch:
"""Wire a :class:`KillSwitch` against the on-disk paths.
``init_system_state`` is called eagerly so the CLI can be used on
a fresh checkout before the engine ever ran.
"""
db_path.parent.mkdir(parents=True, exist_ok=True)
audit_path.parent.mkdir(parents=True, exist_ok=True)
conn = connect_state(db_path)
try:
run_migrations(conn)
repo = Repository()
with transaction(conn):
repo.init_system_state(
conn, config_version=config_version, now=datetime.now(UTC)
)
finally:
conn.close()
return KillSwitch(
connection_factory=lambda: connect_state(db_path),
repository=Repository(),
audit_log=AuditLog(audit_path),
)
@kill_switch.command(name="arm")
@click.option("--reason", required=True, help="Why you are arming the kill switch.")
@click.option(
"--source",
default="manual",
show_default=True,
help="Trigger label (manual, mcp_timeout, hash_chain, ...).",
)
@click.option(
"--db",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
@click.option(
"--audit",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_AUDIT_PATH,
show_default=True,
)
@click.option(
"--config-version",
default="unknown",
show_default=True,
help="Recorded next to the kill event when the singleton is initialised.",
)
def kill_switch_arm(
reason: str, source: str, db: Path, audit: Path, config_version: str
) -> None:
"""Arm the kill switch (engine refuses new entries)."""
ks = _make_kill_switch(db, audit, config_version=config_version)
ks.arm(reason=reason, source=source)
console.print(f"[red]kill switch ARMED[/red] reason={reason!r} source={source}")
@kill_switch.command(name="disarm")
@click.option("--reason", required=True, help="Why you are disarming.")
@click.option(
"--source",
default="manual",
show_default=True,
)
@click.option(
"--db",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
@click.option(
"--audit",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_AUDIT_PATH,
show_default=True,
)
@click.option(
"--config-version",
default="unknown",
show_default=True,
)
def kill_switch_disarm(
reason: str, source: str, db: Path, audit: Path, config_version: str
) -> None:
"""Disarm the kill switch."""
ks = _make_kill_switch(db, audit, config_version=config_version)
ks.disarm(reason=reason, source=source)
console.print(f"[green]kill switch DISARMED[/green] reason={reason!r}")
@kill_switch.command(name="status")
@click.option(
"--db",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
def kill_switch_status(db: Path) -> None:
"""Print the current kill switch state."""
if not db.exists():
console.print("[yellow]state.sqlite not found — engine never ran[/yellow]")
return
conn = connect_state(db)
try:
run_migrations(conn)
state = Repository().get_system_state(conn)
finally:
conn.close()
if state is None:
console.print("[yellow]system_state singleton missing[/yellow]")
return
armed = state.kill_switch == 1
flag = "[red]ARMED[/red]" if armed else "[green]disarmed[/green]"
console.print(
f"kill_switch: {flag}\n"
f"reason: {state.kill_reason or '-'}\n"
f"kill_at: {state.kill_at.isoformat() if state.kill_at else '-'}\n"
f"last_health_check: {state.last_health_check.isoformat()}"
)
@main.command()
@click.option(
"--token",
type=str,
default=None,
help=(
"MCP bearer token (overrides CERBERO_BITE_MCP_TOKEN). The "
"server uses the token to choose between testnet and mainnet."
),
)
@click.option(
"--timeout",
type=float,
default=4.0,
show_default=True,
help="Per-service timeout in seconds for the ping call.",
)
def ping(token: str | None, timeout: float) -> None:
"""Print health status for every MCP service Cerbero Bite uses."""
try:
resolved_token = load_token(value=token)
except ValueError as exc:
console.print(f"[red]token error[/red]: {exc}")
sys.exit(1)
endpoints = load_endpoints()
rows = asyncio.run(_ping_all(endpoints, token=resolved_token, timeout=timeout))
table = Table(title="MCP services")
table.add_column("service")
table.add_column("url")
table.add_column("status")
table.add_column("detail")
for service, url, status, detail in rows:
colour = {"ok": "green", "fail": "red", "skipped": "yellow"}.get(status, "white")
table.add_row(service, url, f"[{colour}]{status.upper()}[/{colour}]", detail)
console.print(table)
async def _ping_one(
*,
service: str,
url: str,
token: str,
timeout: float,
) -> tuple[str, str]:
"""Return ``(status, detail)`` for one service health check."""
http = HttpToolClient(
service=service,
base_url=url,
token=token,
retry_max=1,
timeout_s=timeout,
)
try:
if service == "deribit":
info = await DeribitClient(http).environment_info()
return "ok", f"environment={info.environment}"
if service == "macro":
await MacroClient(http).get_calendar(days=1, importance_min="high")
return "ok", "calendar reachable"
if service == "sentiment":
await SentimentClient(http).funding_cross_median_annualized("ETH")
return "ok", "funding reachable"
if service == "hyperliquid":
await HyperliquidClient(http).funding_rate_annualized("ETH")
return "ok", "ETH-PERP reachable"
return "skipped", "no probe defined" # pragma: no cover
except McpError as exc:
return "fail", f"{type(exc).__name__}: {exc}"
except Exception as exc: # surface any unexpected error for the operator
return "fail", f"{type(exc).__name__}: {exc}"
async def _ping_all(
endpoints: object, *, token: str, timeout: float
) -> list[tuple[str, str, str, str]]:
rows: list[tuple[str, str, str, str]] = []
for service in DEFAULT_ENDPOINTS:
url = endpoints.for_service(service) # type: ignore[attr-defined]
status, detail = await _ping_one(
service=service, url=url, token=token, timeout=timeout
)
rows.append((service, url, status, detail))
return rows
@main.command()
@click.option(
"--db",
type=click.Path(path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
help="SQLite state file the dashboard reads.",
)
@click.option(
"--audit",
type=click.Path(path_type=Path),
default=_DEFAULT_AUDIT_PATH,
show_default=True,
help="Audit log file the dashboard streams.",
)
@click.option(
"--port",
type=int,
default=8765,
show_default=True,
help="Local port to bind (always 127.0.0.1).",
)
@click.option(
"--headless/--no-headless",
default=True,
show_default=True,
help="When true, do not auto-open the browser.",
)
def gui(db: Path, audit: Path, port: int, headless: bool) -> None:
"""Launch the Streamlit dashboard (read-only, localhost only)."""
try:
import streamlit # noqa: F401, PLC0415
except ImportError:
click.echo(
"streamlit not installed. Run `uv sync --extra gui` first.",
err=True,
)
sys.exit(1)
main_path = Path(__file__).parent / "gui" / "main.py"
if not main_path.is_file():
click.echo(f"GUI entry point not found: {main_path}", err=True)
sys.exit(1)
env = os.environ.copy()
env["CERBERO_BITE_GUI_DB"] = str(db.resolve())
env["CERBERO_BITE_GUI_AUDIT"] = str(audit.resolve())
cmd = [
sys.executable,
"-m",
"streamlit",
"run",
str(main_path),
"--server.address",
"127.0.0.1",
"--server.port",
str(port),
"--server.headless",
"true" if headless else "false",
"--browser.gatherUsageStats",
"false",
]
click.echo(f"Launching GUI on http://127.0.0.1:{port}")
os.execvpe(cmd[0], cmd, env)
@main.command()
@click.option("--from", "date_from", required=True, help="ISO date YYYY-MM-DD.")
@click.option("--to", "date_to", required=True, help="ISO date YYYY-MM-DD.")
@click.option("--capital", type=float, default=1500.0, show_default=True)
@click.option("--dry-run/--no-dry-run", default=True, show_default=True)
def replay(date_from: str, date_to: str, capital: float, dry_run: bool) -> None:
"""Replay historical period through the decision engine."""
_phase0_notice(
f"replay {date_from}{date_to}, capital={capital}, dry_run={dry_run}"
)
@main.command()
@click.option(
"--strategy",
"strategy_path",
type=click.Path(path_type=Path),
default=Path("strategy.yaml"),
show_default=True,
help="Path al file di strategia (golden, conservativa, aggressiva, ...).",
)
@click.option(
"--db",
"db_path",
type=click.Path(path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
help="SQLite con `market_snapshots` storiche.",
)
@click.option(
"--from",
"date_from",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=None,
help="ISO date YYYY-MM-DD (default: 90 giorni fa).",
)
@click.option(
"--to",
"date_to",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=None,
help="ISO date YYYY-MM-DD (default: oggi).",
)
@click.option(
"--capital",
type=float,
default=1500.0,
show_default=True,
help="Capitale di partenza per il backtest, in USD.",
)
@click.option(
"--asset",
type=str,
default="ETH",
show_default=True,
help="Asset di riferimento per le snapshot.",
)
@click.option(
"--no-enforce-hash",
is_flag=True,
default=False,
help="Salta la verifica del config_hash (utile per profili sperimentali).",
)
def backtest(
strategy_path: Path,
db_path: Path,
date_from: datetime | None,
date_to: datetime | None,
capital: float,
asset: str,
no_enforce_hash: bool,
) -> None:
"""Esegue il backtest stilizzato su `market_snapshots` storiche.
Usa lo stesso `validate_entry` del live per i filtri (rigoroso) e
un modello Black-Scholes con skew premium per stimare credito ed
exit P/L (stilizzato — vedi docstring di `core/backtest.py`).
"""
from cerbero_bite.config.loader import load_strategy # noqa: PLC0415
from cerbero_bite.core.backtest import run_backtest # noqa: PLC0415
console = Console()
if date_to is None:
date_to = datetime.now(UTC)
if date_from is None:
date_from = date_to - timedelta(days=90)
date_from = date_from.replace(tzinfo=UTC) if date_from.tzinfo is None else date_from
date_to = date_to.replace(tzinfo=UTC) if date_to.tzinfo is None else date_to
loaded = load_strategy(strategy_path, enforce_hash=not no_enforce_hash)
cfg = loaded.config
conn = connect_state(db_path)
try:
repo = Repository()
snapshots = repo.list_market_snapshots(
conn,
asset=asset.upper(),
start=date_from,
end=date_to,
limit=10000,
)
finally:
conn.close()
if not snapshots:
console.print(
f"[yellow]Nessuno snapshot {asset} trovato fra {date_from.date()} "
f"e {date_to.date()}.[/yellow]"
)
sys.exit(1)
console.print(
f"[green]Caricate {len(snapshots)} snapshot {asset} "
f"({snapshots[-1].timestamp.date()}{snapshots[0].timestamp.date()})[/green]"
)
report = run_backtest(snapshots, cfg, capital_usd=Decimal(str(capital)))
table = Table(title=f"Backtest report — {strategy_path.name}")
table.add_column("Metrica", style="cyan")
table.add_column("Valore", style="bold")
table.add_row("Picks (lunedì 14:00)", str(report.n_picks))
table.add_row(
"Accettati dai filtri",
f"{report.n_accepted} ({report.n_accepted / max(1, report.n_picks):.0%})",
)
table.add_row("Saltati per dato mancante", str(report.n_skipped_data))
table.add_row("Trade completati (con P/L)", str(report.n_completed))
table.add_row("Vincenti", f"{report.n_winners} ({report.win_rate:.0%})")
table.add_row("P/L cumulato (USD)", f"{report.cumulative_pnl_usd:+.2f}")
table.add_row(
"P/L su capitale", f"{report.cumulative_pnl_pct_of_capital:+.2%}"
)
table.add_row(
"Max drawdown", f"{report.max_drawdown_usd:.0f} USD "
f"({report.max_drawdown_pct:.1%})",
)
table.add_row(
"Sharpe (annualized)",
f"{report.sharpe_annualized}" if report.sharpe_annualized is not None
else "",
)
console.print(table)
if report.skip_reasons:
skip_table = Table(title="Motivi di skip aggregati")
skip_table.add_column("Motivo")
skip_table.add_column("Settimane", justify="right")
for reason, count in sorted(
report.skip_reasons.items(), key=lambda kv: -kv[1]
):
skip_table.add_row(reason, str(count))
console.print(skip_table)
console.print(
"[dim]Il modello P/L è stilizzato: BS + skew premium 1.5×. "
"Numeri ottimi per ranking config, non per promesse operative.[/dim]"
)
@main.group()
def config() -> None:
"""Strategy configuration utilities."""
@config.command(name="hash")
@click.option(
"--file",
"yaml_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=_DEFAULT_STRATEGY_PATH,
show_default=True,
)
def config_hash(yaml_path: Path) -> None:
"""Compute and print the SHA-256 of *yaml_path* (config_hash field excluded)."""
text = yaml_path.read_text(encoding="utf-8")
digest = compute_config_hash(text)
console.print(digest)
@config.command(name="validate")
@click.option(
"--file",
"yaml_path",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=_DEFAULT_STRATEGY_PATH,
show_default=True,
)
@click.option(
"--enforce-hash/--no-enforce-hash",
default=True,
show_default=True,
help="When enabled, the recorded config_hash must match the file body.",
)
def config_validate(yaml_path: Path, enforce_hash: bool) -> None:
"""Load and validate ``strategy.yaml`` (and any local override)."""
loaded = load_strategy(yaml_path, enforce_hash=enforce_hash)
console.print(
f"[green]ok[/green] version={loaded.config.config_version} "
f"hash={loaded.computed_hash[:16]}"
f"sources={', '.join(p.name for p in loaded.sources)}"
)
@main.group()
def audit() -> None:
"""Audit log utilities."""
@audit.command(name="verify")
@click.option(
"--file",
"audit_path",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_AUDIT_PATH,
show_default=True,
)
def audit_verify(audit_path: Path) -> None:
"""Walk the hash chain in *audit_path* and report tampering."""
try:
count = verify_audit_chain(audit_path)
except AuditChainError as exc:
console.print(f"[red]TAMPERED[/red]: {exc}")
sys.exit(2)
if count == 0:
console.print("[yellow]audit log empty[/yellow]")
else:
console.print(f"[green]ok[/green] {count} entries verified")
@main.group()
def state() -> None:
"""State inspection utilities."""
@state.command(name="inspect")
@click.option(
"--db",
type=click.Path(dir_okay=False, path_type=Path),
default=_DEFAULT_DB_PATH,
show_default=True,
)
def state_inspect(db: Path) -> None:
"""Print a short snapshot of the SQLite state file."""
if not db.exists():
console.print("[yellow]state.sqlite not found[/yellow]")
return
conn = connect_state(db)
try:
run_migrations(conn)
repo = Repository()
sys_state = repo.get_system_state(conn)
positions = repo.list_open_positions(conn)
concurrent = repo.count_concurrent_positions(conn)
finally:
conn.close()
if sys_state is None:
console.print("[yellow]system_state singleton missing[/yellow]")
return
armed = "[red]ARMED[/red]" if sys_state.kill_switch == 1 else "[green]disarmed[/green]"
console.print(
f"engine state: kill_switch={armed}, "
f"open positions: {concurrent}, "
f"config_version: {sys_state.config_version}"
)
if not positions:
console.print("no open positions")
return
table = Table(title="open positions")
table.add_column("proposal_id")
table.add_column("status")
table.add_column("spread")
table.add_column("short")
table.add_column("long")
table.add_column("n")
table.add_column("expiry")
for pos in positions:
table.add_row(
str(pos.proposal_id)[:8],
pos.status,
pos.spread_type,
str(pos.short_strike),
str(pos.long_strike),
str(pos.n_contracts),
pos.expiry.isoformat(),
)
console.print(table)
def _entrypoint() -> None:
"""Wrapper used by ``cerbero-bite`` console script."""
try:
main(prog_name="cerbero-bite")
except KeyboardInterrupt:
console.print("[red]interrupted[/red]")
sys.exit(130)
if __name__ == "__main__":
_entrypoint()