"""Command-line interface for Cerbero Bite. The CLI is the single user-facing entry point. Each subcommand maps to a specific operational action documented in ``docs/06-operational-flow.md`` and ``docs/07-risk-controls.md``. All commands are placeholders in Phase 0; subsequent phases will replace the bodies with real logic without changing the surface. """ from __future__ import annotations import asyncio import os import sys from collections.abc import Callable from datetime import UTC, datetime, timedelta from decimal import Decimal from pathlib import Path from typing import Any import click from rich.console import Console from rich.table import Table from cerbero_bite import __version__ from cerbero_bite.clients import HttpToolClient, McpError from cerbero_bite.clients.deribit import DeribitClient from cerbero_bite.clients.hyperliquid import HyperliquidClient from cerbero_bite.clients.macro import MacroClient from cerbero_bite.clients.sentiment import SentimentClient from cerbero_bite.config.loader import compute_config_hash, load_strategy from cerbero_bite.config.mcp_endpoints import ( DEFAULT_ENDPOINTS, load_bot_tag, load_endpoints, load_token, ) from cerbero_bite.config.runtime_flags import load_runtime_flags from cerbero_bite.logging import configure as configure_logging from cerbero_bite.logging import get_logger from cerbero_bite.runtime.orchestrator import Orchestrator, make_orchestrator from cerbero_bite.safety.audit_log import AuditChainError, AuditLog from cerbero_bite.safety.audit_log import verify_chain as verify_audit_chain from cerbero_bite.safety.kill_switch import KillSwitch from cerbero_bite.state import Repository, run_migrations, transaction from cerbero_bite.state import connect as connect_state console = Console() log = get_logger("cli") _DEFAULT_DB_PATH = Path("data/state.sqlite") _DEFAULT_AUDIT_PATH = Path("data/audit.log") _DEFAULT_STRATEGY_PATH = Path("strategy.yaml") def _phase0_notice(action: str) -> None: console.print(f"[yellow]\\[phase 0 placeholder][/yellow] {action}") @click.group(context_settings={"help_option_names": ["-h", "--help"]}) @click.version_option(__version__, prog_name="cerbero-bite") @click.option( "--log-dir", type=click.Path(file_okay=False, path_type=Path), default=Path("data/log"), show_default=True, help="Directory for JSONL log files.", ) @click.option( "--log-level", type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR"], case_sensitive=False), default="INFO", show_default=True, ) @click.pass_context def main(ctx: click.Context, log_dir: Path, log_level: str) -> None: """Cerbero Bite — rule-based ETH credit spread engine.""" # Load `.env` once at CLI entry, unless we are running under # pytest (which sets ``PYTEST_CURRENT_TEST`` for the duration of # the test). Existing env vars win over the file (override=False). if "PYTEST_CURRENT_TEST" not in os.environ: from dotenv import load_dotenv # noqa: PLC0415 load_dotenv(Path.cwd() / ".env", override=False) configure_logging(log_dir=log_dir, level=log_level.upper()) ctx.ensure_object(dict) ctx.obj["log_dir"] = log_dir @main.command() @click.option( "--db", type=click.Path(dir_okay=False, path_type=Path), default=_DEFAULT_DB_PATH, show_default=True, ) def status(db: Path) -> None: """Print engine status snapshot.""" header = f"[bold cyan]Cerbero Bite[/bold cyan] v{__version__}" if not db.exists(): console.print( f"{header}\n" "engine state: [yellow]never started[/yellow] " "(state.sqlite missing)" ) return conn = connect_state(db) try: run_migrations(conn) repo = Repository() sys_state = repo.get_system_state(conn) open_positions = repo.list_open_positions(conn) finally: conn.close() if sys_state is None: console.print(f"{header}\nengine state: [yellow]uninitialised[/yellow]") return armed = sys_state.kill_switch == 1 flag = "[red]ARMED[/red]" if armed else "[green]disarmed[/green]" console.print( f"{header}\n" f"kill_switch: {flag}" f"{' reason=' + (sys_state.kill_reason or '?') if armed else ''}\n" f"open positions: {len(open_positions)}\n" f"config_version: {sys_state.config_version}\n" f"started_at: {sys_state.started_at.isoformat()}\n" f"last_health_check: {sys_state.last_health_check.isoformat()}" ) @main.command() @click.option( "--db", type=click.Path(dir_okay=False, path_type=Path), default=_DEFAULT_DB_PATH, show_default=True, ) @click.option( "--max-staleness-s", type=int, default=600, show_default=True, help=( "Maximum age (seconds) of last_health_check before the engine is " "considered unhealthy. Used by Docker HEALTHCHECK." ), ) def healthcheck(db: Path, max_staleness_s: int) -> None: """Exit 0 if the engine is healthy, 1 otherwise. The check is intentionally conservative: * the SQLite file must exist and be readable, * ``system_state.kill_switch`` must be 0, * ``system_state.last_health_check`` must not be older than ``--max-staleness-s`` seconds. Wired as the container HEALTHCHECK in ``Dockerfile``. """ if not db.exists(): console.print("[red]unhealthy[/red]: state.sqlite missing") sys.exit(1) try: conn = connect_state(db) try: run_migrations(conn) sys_state = Repository().get_system_state(conn) finally: conn.close() except Exception as exc: console.print(f"[red]unhealthy[/red]: {type(exc).__name__}: {exc}") sys.exit(1) if sys_state is None: console.print("[red]unhealthy[/red]: system_state singleton missing") sys.exit(1) if sys_state.kill_switch == 1: console.print( f"[red]unhealthy[/red]: kill switch armed " f"reason={sys_state.kill_reason!r}" ) sys.exit(1) age = (datetime.now(UTC) - sys_state.last_health_check).total_seconds() if age > max_staleness_s: console.print( f"[red]unhealthy[/red]: last_health_check stale " f"({age:.0f}s > {max_staleness_s}s)" ) sys.exit(1) console.print(f"[green]healthy[/green] last_check_age={age:.0f}s") def _engine_options(func: Callable[..., Any]) -> Callable[..., Any]: """Common options for the engine commands.""" decorators = [ click.option( "--strategy", "strategy_path", type=click.Path(exists=True, dir_okay=False, path_type=Path), default=_DEFAULT_STRATEGY_PATH, show_default=True, ), click.option( "--token", type=str, default=None, help=( "MCP bearer token (overrides CERBERO_BITE_MCP_TOKEN). " "The server uses the token to choose between testnet " "and mainnet upstream environments." ), ), click.option( "--db", type=click.Path(dir_okay=False, path_type=Path), default=_DEFAULT_DB_PATH, show_default=True, ), click.option( "--audit", type=click.Path(dir_okay=False, path_type=Path), default=_DEFAULT_AUDIT_PATH, show_default=True, ), click.option( "--environment", type=click.Choice(["testnet", "mainnet"]), default="testnet", show_default=True, ), click.option( "--eur-to-usd", type=float, default=1.075, show_default=True, help="EUR→USD conversion rate used by the sizing engine.", ), ] for d in reversed(decorators): func = d(func) return func def _build_orchestrator( *, strategy_path: Path, token: str | None, db: Path, audit: Path, environment: str, eur_to_usd: float, enforce_hash: bool = True, ) -> Orchestrator: loaded = load_strategy(strategy_path, enforce_hash=enforce_hash) resolved_token = load_token(value=token) # Strategy file values win over the CLI defaults; explicit overrides # via env-style values (CLI flags) still apply when the user provides # them — Click signals "default" via Click's resilient_parsing flag, # but for now the CLI value is treated as authoritative when it # differs from the documented default to keep the surface small. cfg_env = loaded.config.execution.environment cfg_fx = loaded.config.execution.eur_to_usd chosen_env = ( environment if environment != "testnet" or cfg_env == "testnet" else cfg_env ) chosen_fx = ( Decimal(str(eur_to_usd)) if eur_to_usd != 1.075 else cfg_fx ) return make_orchestrator( cfg=loaded.config, endpoints=load_endpoints(), token=resolved_token, db_path=db, audit_path=audit, expected_environment=chosen_env, # type: ignore[arg-type] eur_to_usd=chosen_fx, bot_tag=load_bot_tag(), flags=load_runtime_flags(), ) @main.command() @_engine_options def start( strategy_path: Path, token: str | None, db: Path, audit: Path, environment: str, eur_to_usd: float, ) -> None: """Start the engine main loop (scheduler + monitoring).""" try: orch = _build_orchestrator( strategy_path=strategy_path, token=token, db=db, audit=audit, environment=environment, eur_to_usd=eur_to_usd, enforce_hash=True, ) except Exception as exc: console.print(f"[red]boot aborted[/red]: {type(exc).__name__}: {exc}") sys.exit(1) console.print( f"[bold cyan]Cerbero Bite[/bold cyan] starting " f"(env={environment}, db={db}, audit={audit})" ) try: asyncio.run(orch.run_forever()) except KeyboardInterrupt: console.print("[yellow]engine interrupted[/yellow]") @main.command() @_engine_options @click.option( "--cycle", type=click.Choice(["entry", "monitor", "health"]), default="entry", show_default=True, ) def dry_run( strategy_path: Path, token: str | None, db: Path, audit: Path, environment: str, eur_to_usd: float, cycle: str, ) -> None: """Execute one cycle without starting the scheduler.""" orch = _build_orchestrator( strategy_path=strategy_path, token=token, db=db, audit=audit, environment=environment, eur_to_usd=eur_to_usd, enforce_hash=False, ) async def _go() -> None: await orch.boot() if cycle == "entry": entry = await orch.run_entry() console.print( f"[green]entry[/green] {entry.status} reason={entry.reason}" ) elif cycle == "monitor": mon = await orch.run_monitor() console.print(f"[green]monitor[/green] outcomes={len(mon.outcomes)}") for outcome in mon.outcomes: console.print( f" • {outcome.proposal_id[:8]} {outcome.action} " f"closed={outcome.closed}" ) else: health = await orch.run_health() console.print(f"[green]health[/green] state={health.state}") asyncio.run(_go()) @main.command() def stop() -> None: """Gracefully stop a running engine (manual: send SIGTERM).""" console.print( "Use [bold]docker compose stop cerbero-bite[/bold] (or send SIGTERM " "to the engine PID). The container traps the signal and shuts down " "the scheduler before exit." ) @main.group(name="kill-switch") def kill_switch() -> None: """Manage the engine kill switch.""" def _make_kill_switch( db_path: Path, audit_path: Path, *, config_version: str ) -> KillSwitch: """Wire a :class:`KillSwitch` against the on-disk paths. ``init_system_state`` is called eagerly so the CLI can be used on a fresh checkout before the engine ever ran. """ db_path.parent.mkdir(parents=True, exist_ok=True) audit_path.parent.mkdir(parents=True, exist_ok=True) conn = connect_state(db_path) try: run_migrations(conn) repo = Repository() with transaction(conn): repo.init_system_state( conn, config_version=config_version, now=datetime.now(UTC) ) finally: conn.close() return KillSwitch( connection_factory=lambda: connect_state(db_path), repository=Repository(), audit_log=AuditLog(audit_path), ) @kill_switch.command(name="arm") @click.option("--reason", required=True, help="Why you are arming the kill switch.") @click.option( "--source", default="manual", show_default=True, help="Trigger label (manual, mcp_timeout, hash_chain, ...).", ) @click.option( "--db", type=click.Path(dir_okay=False, path_type=Path), default=_DEFAULT_DB_PATH, show_default=True, ) @click.option( "--audit", type=click.Path(dir_okay=False, path_type=Path), default=_DEFAULT_AUDIT_PATH, show_default=True, ) @click.option( "--config-version", default="unknown", show_default=True, help="Recorded next to the kill event when the singleton is initialised.", ) def kill_switch_arm( reason: str, source: str, db: Path, audit: Path, config_version: str ) -> None: """Arm the kill switch (engine refuses new entries).""" ks = _make_kill_switch(db, audit, config_version=config_version) ks.arm(reason=reason, source=source) console.print(f"[red]kill switch ARMED[/red] reason={reason!r} source={source}") @kill_switch.command(name="disarm") @click.option("--reason", required=True, help="Why you are disarming.") @click.option( "--source", default="manual", show_default=True, ) @click.option( "--db", type=click.Path(dir_okay=False, path_type=Path), default=_DEFAULT_DB_PATH, show_default=True, ) @click.option( "--audit", type=click.Path(dir_okay=False, path_type=Path), default=_DEFAULT_AUDIT_PATH, show_default=True, ) @click.option( "--config-version", default="unknown", show_default=True, ) def kill_switch_disarm( reason: str, source: str, db: Path, audit: Path, config_version: str ) -> None: """Disarm the kill switch.""" ks = _make_kill_switch(db, audit, config_version=config_version) ks.disarm(reason=reason, source=source) console.print(f"[green]kill switch DISARMED[/green] reason={reason!r}") @kill_switch.command(name="status") @click.option( "--db", type=click.Path(dir_okay=False, path_type=Path), default=_DEFAULT_DB_PATH, show_default=True, ) def kill_switch_status(db: Path) -> None: """Print the current kill switch state.""" if not db.exists(): console.print("[yellow]state.sqlite not found — engine never ran[/yellow]") return conn = connect_state(db) try: run_migrations(conn) state = Repository().get_system_state(conn) finally: conn.close() if state is None: console.print("[yellow]system_state singleton missing[/yellow]") return armed = state.kill_switch == 1 flag = "[red]ARMED[/red]" if armed else "[green]disarmed[/green]" console.print( f"kill_switch: {flag}\n" f"reason: {state.kill_reason or '-'}\n" f"kill_at: {state.kill_at.isoformat() if state.kill_at else '-'}\n" f"last_health_check: {state.last_health_check.isoformat()}" ) @main.command() @click.option( "--token", type=str, default=None, help=( "MCP bearer token (overrides CERBERO_BITE_MCP_TOKEN). The " "server uses the token to choose between testnet and mainnet." ), ) @click.option( "--timeout", type=float, default=4.0, show_default=True, help="Per-service timeout in seconds for the ping call.", ) def ping(token: str | None, timeout: float) -> None: """Print health status for every MCP service Cerbero Bite uses.""" try: resolved_token = load_token(value=token) except ValueError as exc: console.print(f"[red]token error[/red]: {exc}") sys.exit(1) endpoints = load_endpoints() rows = asyncio.run(_ping_all(endpoints, token=resolved_token, timeout=timeout)) table = Table(title="MCP services") table.add_column("service") table.add_column("url") table.add_column("status") table.add_column("detail") for service, url, status, detail in rows: colour = {"ok": "green", "fail": "red", "skipped": "yellow"}.get(status, "white") table.add_row(service, url, f"[{colour}]{status.upper()}[/{colour}]", detail) console.print(table) async def _ping_one( *, service: str, url: str, token: str, timeout: float, ) -> tuple[str, str]: """Return ``(status, detail)`` for one service health check.""" http = HttpToolClient( service=service, base_url=url, token=token, retry_max=1, timeout_s=timeout, ) try: if service == "deribit": info = await DeribitClient(http).environment_info() return "ok", f"environment={info.environment}" if service == "macro": await MacroClient(http).get_calendar(days=1, importance_min="high") return "ok", "calendar reachable" if service == "sentiment": await SentimentClient(http).funding_cross_median_annualized("ETH") return "ok", "funding reachable" if service == "hyperliquid": await HyperliquidClient(http).funding_rate_annualized("ETH") return "ok", "ETH-PERP reachable" return "skipped", "no probe defined" # pragma: no cover except McpError as exc: return "fail", f"{type(exc).__name__}: {exc}" except Exception as exc: # surface any unexpected error for the operator return "fail", f"{type(exc).__name__}: {exc}" async def _ping_all( endpoints: object, *, token: str, timeout: float ) -> list[tuple[str, str, str, str]]: rows: list[tuple[str, str, str, str]] = [] for service in DEFAULT_ENDPOINTS: url = endpoints.for_service(service) # type: ignore[attr-defined] status, detail = await _ping_one( service=service, url=url, token=token, timeout=timeout ) rows.append((service, url, status, detail)) return rows @main.command() @click.option( "--db", type=click.Path(path_type=Path), default=_DEFAULT_DB_PATH, show_default=True, help="SQLite state file the dashboard reads.", ) @click.option( "--audit", type=click.Path(path_type=Path), default=_DEFAULT_AUDIT_PATH, show_default=True, help="Audit log file the dashboard streams.", ) @click.option( "--port", type=int, default=8765, show_default=True, help="Local port to bind (always 127.0.0.1).", ) @click.option( "--headless/--no-headless", default=True, show_default=True, help="When true, do not auto-open the browser.", ) def gui(db: Path, audit: Path, port: int, headless: bool) -> None: """Launch the Streamlit dashboard (read-only, localhost only).""" try: import streamlit # noqa: F401, PLC0415 except ImportError: click.echo( "streamlit not installed. Run `uv sync --extra gui` first.", err=True, ) sys.exit(1) main_path = Path(__file__).parent / "gui" / "main.py" if not main_path.is_file(): click.echo(f"GUI entry point not found: {main_path}", err=True) sys.exit(1) env = os.environ.copy() env["CERBERO_BITE_GUI_DB"] = str(db.resolve()) env["CERBERO_BITE_GUI_AUDIT"] = str(audit.resolve()) cmd = [ sys.executable, "-m", "streamlit", "run", str(main_path), "--server.address", "127.0.0.1", "--server.port", str(port), "--server.headless", "true" if headless else "false", "--browser.gatherUsageStats", "false", ] click.echo(f"Launching GUI on http://127.0.0.1:{port} …") os.execvpe(cmd[0], cmd, env) @main.command() @click.option("--from", "date_from", required=True, help="ISO date YYYY-MM-DD.") @click.option("--to", "date_to", required=True, help="ISO date YYYY-MM-DD.") @click.option("--capital", type=float, default=1500.0, show_default=True) @click.option("--dry-run/--no-dry-run", default=True, show_default=True) def replay(date_from: str, date_to: str, capital: float, dry_run: bool) -> None: """Replay historical period through the decision engine.""" _phase0_notice( f"replay {date_from} → {date_to}, capital={capital}, dry_run={dry_run}" ) @main.command() @click.option( "--strategy", "strategy_path", type=click.Path(path_type=Path), default=Path("strategy.yaml"), show_default=True, help="Path al file di strategia (golden, conservativa, aggressiva, ...).", ) @click.option( "--db", "db_path", type=click.Path(path_type=Path), default=_DEFAULT_DB_PATH, show_default=True, help="SQLite con `market_snapshots` storiche.", ) @click.option( "--from", "date_from", type=click.DateTime(formats=["%Y-%m-%d"]), default=None, help="ISO date YYYY-MM-DD (default: 90 giorni fa).", ) @click.option( "--to", "date_to", type=click.DateTime(formats=["%Y-%m-%d"]), default=None, help="ISO date YYYY-MM-DD (default: oggi).", ) @click.option( "--capital", type=float, default=1500.0, show_default=True, help="Capitale di partenza per il backtest, in USD.", ) @click.option( "--asset", type=str, default="ETH", show_default=True, help="Asset di riferimento per le snapshot.", ) @click.option( "--no-enforce-hash", is_flag=True, default=False, help="Salta la verifica del config_hash (utile per profili sperimentali).", ) def backtest( strategy_path: Path, db_path: Path, date_from: datetime | None, date_to: datetime | None, capital: float, asset: str, no_enforce_hash: bool, ) -> None: """Esegue il backtest stilizzato su `market_snapshots` storiche. Usa lo stesso `validate_entry` del live per i filtri (rigoroso) e un modello Black-Scholes con skew premium per stimare credito ed exit P/L (stilizzato — vedi docstring di `core/backtest.py`). """ from cerbero_bite.config.loader import load_strategy # noqa: PLC0415 from cerbero_bite.core.backtest import run_backtest # noqa: PLC0415 console = Console() if date_to is None: date_to = datetime.now(UTC) if date_from is None: date_from = date_to - timedelta(days=90) date_from = date_from.replace(tzinfo=UTC) if date_from.tzinfo is None else date_from date_to = date_to.replace(tzinfo=UTC) if date_to.tzinfo is None else date_to loaded = load_strategy(strategy_path, enforce_hash=not no_enforce_hash) cfg = loaded.config conn = connect_state(db_path) try: repo = Repository() snapshots = repo.list_market_snapshots( conn, asset=asset.upper(), start=date_from, end=date_to, limit=10000, ) finally: conn.close() if not snapshots: console.print( f"[yellow]Nessuno snapshot {asset} trovato fra {date_from.date()} " f"e {date_to.date()}.[/yellow]" ) sys.exit(1) console.print( f"[green]Caricate {len(snapshots)} snapshot {asset} " f"({snapshots[-1].timestamp.date()} → {snapshots[0].timestamp.date()})[/green]" ) report = run_backtest(snapshots, cfg, capital_usd=Decimal(str(capital))) table = Table(title=f"Backtest report — {strategy_path.name}") table.add_column("Metrica", style="cyan") table.add_column("Valore", style="bold") table.add_row("Picks (daily 14:00)", str(report.n_picks)) table.add_row( "Accettati dai filtri", f"{report.n_accepted} ({report.n_accepted / max(1, report.n_picks):.0%})", ) table.add_row("Saltati per dato mancante", str(report.n_skipped_data)) table.add_row("Trade completati (con P/L)", str(report.n_completed)) table.add_row("Vincenti", f"{report.n_winners} ({report.win_rate:.0%})") table.add_row("P/L cumulato (USD)", f"{report.cumulative_pnl_usd:+.2f}") table.add_row( "P/L su capitale", f"{report.cumulative_pnl_pct_of_capital:+.2%}" ) table.add_row( "Max drawdown", f"−{report.max_drawdown_usd:.0f} USD " f"({report.max_drawdown_pct:.1%})", ) table.add_row( "Sharpe (annualized)", f"{report.sharpe_annualized}" if report.sharpe_annualized is not None else "—", ) console.print(table) if report.skip_reasons: skip_table = Table(title="Motivi di skip aggregati") skip_table.add_column("Motivo") skip_table.add_column("Giorni", justify="right") for reason, count in sorted( report.skip_reasons.items(), key=lambda kv: -kv[1] ): skip_table.add_row(reason, str(count)) console.print(skip_table) console.print( "[dim]Il modello P/L è stilizzato: BS + skew premium 1.5×. " "Numeri ottimi per ranking config, non per promesse operative.[/dim]" ) @main.group() def config() -> None: """Strategy configuration utilities.""" @config.command(name="hash") @click.option( "--file", "yaml_path", type=click.Path(exists=True, dir_okay=False, path_type=Path), default=_DEFAULT_STRATEGY_PATH, show_default=True, ) def config_hash(yaml_path: Path) -> None: """Compute and print the SHA-256 of *yaml_path* (config_hash field excluded).""" text = yaml_path.read_text(encoding="utf-8") digest = compute_config_hash(text) console.print(digest) @config.command(name="validate") @click.option( "--file", "yaml_path", type=click.Path(exists=True, dir_okay=False, path_type=Path), default=_DEFAULT_STRATEGY_PATH, show_default=True, ) @click.option( "--enforce-hash/--no-enforce-hash", default=True, show_default=True, help="When enabled, the recorded config_hash must match the file body.", ) def config_validate(yaml_path: Path, enforce_hash: bool) -> None: """Load and validate ``strategy.yaml`` (and any local override).""" loaded = load_strategy(yaml_path, enforce_hash=enforce_hash) console.print( f"[green]ok[/green] version={loaded.config.config_version} " f"hash={loaded.computed_hash[:16]}… " f"sources={', '.join(p.name for p in loaded.sources)}" ) @main.group() def audit() -> None: """Audit log utilities.""" @audit.command(name="verify") @click.option( "--file", "audit_path", type=click.Path(dir_okay=False, path_type=Path), default=_DEFAULT_AUDIT_PATH, show_default=True, ) def audit_verify(audit_path: Path) -> None: """Walk the hash chain in *audit_path* and report tampering.""" try: count = verify_audit_chain(audit_path) except AuditChainError as exc: console.print(f"[red]TAMPERED[/red]: {exc}") sys.exit(2) if count == 0: console.print("[yellow]audit log empty[/yellow]") else: console.print(f"[green]ok[/green] {count} entries verified") @main.group() def state() -> None: """State inspection utilities.""" @state.command(name="inspect") @click.option( "--db", type=click.Path(dir_okay=False, path_type=Path), default=_DEFAULT_DB_PATH, show_default=True, ) def state_inspect(db: Path) -> None: """Print a short snapshot of the SQLite state file.""" if not db.exists(): console.print("[yellow]state.sqlite not found[/yellow]") return conn = connect_state(db) try: run_migrations(conn) repo = Repository() sys_state = repo.get_system_state(conn) positions = repo.list_open_positions(conn) concurrent = repo.count_concurrent_positions(conn) finally: conn.close() if sys_state is None: console.print("[yellow]system_state singleton missing[/yellow]") return armed = "[red]ARMED[/red]" if sys_state.kill_switch == 1 else "[green]disarmed[/green]" console.print( f"engine state: kill_switch={armed}, " f"open positions: {concurrent}, " f"config_version: {sys_state.config_version}" ) if not positions: console.print("no open positions") return table = Table(title="open positions") table.add_column("proposal_id") table.add_column("status") table.add_column("spread") table.add_column("short") table.add_column("long") table.add_column("n") table.add_column("expiry") for pos in positions: table.add_row( str(pos.proposal_id)[:8], pos.status, pos.spread_type, str(pos.short_strike), str(pos.long_strike), str(pos.n_contracts), pos.expiry.isoformat(), ) console.print(table) @main.group(name="option-chain") def option_chain() -> None: """Strumenti per la catena opzioni storica (`option_chain_snapshots`).""" @option_chain.command(name="trigger") @click.option( "--strategy", "strategy_path", type=click.Path(exists=True, dir_okay=False, path_type=Path), default=_DEFAULT_STRATEGY_PATH, show_default=True, ) @click.option( "--db", "db_path", type=click.Path(dir_okay=False, path_type=Path), default=_DEFAULT_DB_PATH, show_default=True, ) @click.option( "--audit", "audit_path", type=click.Path(dir_okay=False, path_type=Path), default=_DEFAULT_AUDIT_PATH, show_default=True, ) @click.option( "--token", type=str, default=None, help="MCP bearer token (override su CERBERO_BITE_MCP_TOKEN).", ) @click.option("--asset", default="ETH", show_default=True) def option_chain_trigger( strategy_path: Path, db_path: Path, audit_path: Path, token: str | None, asset: str, ) -> None: """Esegue UNA volta il collector della catena opzioni e persiste in DB. Utile per popolare i dati senza aspettare il cron del job ``option_chain_snapshot``. Riusa esattamente la stessa pipeline schedulata. """ from cerbero_bite.runtime.dependencies import build_runtime # noqa: PLC0415 from cerbero_bite.runtime.option_chain_snapshot_cycle import ( # noqa: PLC0415 collect_option_chain_snapshot, ) cfg = load_strategy(strategy_path).config ctx = build_runtime( cfg=cfg, endpoints=load_endpoints(), token=load_token(value=token), db_path=db_path, audit_path=audit_path, bot_tag=load_bot_tag(), ) n = asyncio.run(collect_option_chain_snapshot(ctx, asset=asset)) console.print( f"[green]Persisted {n} option chain quote(s) for {asset}[/green]" if n > 0 else f"[yellow]No quotes persisted (chain empty or fetch failed)[/yellow]" ) @option_chain.command(name="analyze") @click.option( "--strategy", "strategy_path", type=click.Path(exists=True, dir_okay=False, path_type=Path), default=_DEFAULT_STRATEGY_PATH, show_default=True, ) @click.option( "--db", "db_path", type=click.Path(dir_okay=False, path_type=Path), default=_DEFAULT_DB_PATH, show_default=True, ) @click.option("--asset", default="ETH", show_default=True) @click.option( "--bias", type=click.Choice(["bull_put", "bear_call"], case_sensitive=False), default="bull_put", show_default=True, help="Direzione da simulare (il rule engine lo deciderebbe da trend×funding).", ) def option_chain_analyze( strategy_path: Path, db_path: Path, asset: str, bias: str, ) -> None: """Analizza l'ultimo snapshot di catena salvato. Per la strategia indicata, simula la selezione strike (delta target, OTM range, width 4%, credit/width ratio min) e mostra: * lo strike che il rule engine sceglierebbe come short e long, * credito atteso, larghezza, rapporto credit/width, * pass/fail del gate `credit_to_width_ratio_min`. """ from cerbero_bite.core.combo_builder import select_strikes # noqa: PLC0415 from cerbero_bite.core.types import OptionQuote # noqa: PLC0415 cfg = load_strategy(strategy_path).config conn = connect_state(db_path) try: repo = Repository() latest_ts = repo.latest_option_chain_timestamp(conn, asset=asset.upper()) if latest_ts is None: console.print( "[red]Nessuno snapshot di catena trovato. Lancia prima " "`cerbero-bite option-chain trigger`.[/red]" ) sys.exit(1) quotes_records = repo.list_option_chain_snapshots( conn, asset=asset.upper(), start=latest_ts, end=latest_ts, ) finally: conn.close() console.print( f"[cyan]Snapshot del {latest_ts.isoformat()} — {len(quotes_records)} " f"quote totali[/cyan]" ) # Costruzione OptionQuote da OptionChainQuoteRecord per riusare select_strikes. quotes: list[OptionQuote] = [] for q in quotes_records: if q.bid is None or q.ask is None or q.mid is None or q.delta is None: continue quotes.append( OptionQuote( instrument=q.instrument_name, strike=q.strike, expiry=q.expiry, option_type=q.option_type, bid=q.bid, ask=q.ask, mid=q.mid, delta=q.delta, gamma=q.gamma or Decimal("0"), theta=q.theta or Decimal("0"), vega=q.vega or Decimal("0"), open_interest=q.open_interest or 0, volume_24h=q.volume_24h or 0, book_depth_top3=q.book_depth_top3 or 0, ) ) if not quotes: console.print("[red]Nessun quote completo per la simulazione.[/red]") sys.exit(1) # Lo spot al momento dello snapshot: estraiamo dall'ultimo # `market_snapshot` ETH a quel timestamp (tolleranza ±15 min). spot = _resolve_spot_at(db_path, asset=asset.upper(), at=latest_ts) if spot is None: console.print( "[yellow]Spot non recuperabile dai market_snapshots; " "stimato dal mid ATM.[/yellow]" ) spot = _atm_spot_proxy(quotes) selection = select_strikes( chain=quotes, bias=bias, # type: ignore[arg-type] spot=spot, now=latest_ts, cfg=cfg, ) if selection is None: console.print( "[red]Il rule engine NON aprirebbe trade con questa catena[/red] " "(no strike compatibile coi gate delta/distance/width/credit-ratio)." ) sys.exit(0) short, long_ = selection width_usd = (short.strike - long_.strike).copy_abs() credit_eth = short.mid - long_.mid credit_usd = credit_eth * spot ratio = credit_usd / width_usd if width_usd > 0 else Decimal("0") ratio_target = cfg.structure.credit_to_width_ratio_min table = Table(title=f"Simulazione picker — bias={bias}, spot={spot:.0f}") table.add_column("Campo", style="cyan") table.add_column("Valore", style="bold") table.add_row("Short strike", f"{short.strike} ({short.delta:+.3f}δ)") table.add_row("Long strike", f"{long_.strike} ({long_.delta:+.3f}δ)") table.add_row("Width", f"{width_usd:.0f} USD") table.add_row("Credit", f"{credit_eth:.4f} ETH ≈ {credit_usd:.2f} USD") table.add_row( "Credit/width ratio", f"{ratio:.2%} (gate ≥ {float(ratio_target):.0%})", ) pass_str = ( "[green]PASS — entry possibile[/green]" if ratio >= ratio_target else "[red]FAIL — premio troppo magro[/red]" ) table.add_row("Verdetto gate ratio", pass_str) console.print(table) def _resolve_spot_at(db_path: Path, *, asset: str, at: datetime) -> Decimal | None: """Best-effort lookup dello spot al timestamp ``at`` ± 15 min.""" conn = connect_state(db_path) try: rows = Repository().list_market_snapshots( conn, asset=asset, start=at - timedelta(minutes=15), end=at + timedelta(minutes=15), limit=1, ) finally: conn.close() if not rows: return None return rows[0].spot def _atm_spot_proxy(quotes: list[Any]) -> Decimal: """Stima dello spot prendendo lo strike il cui delta è più vicino a 0.5.""" quote = min(quotes, key=lambda q: abs(abs(q.delta) - Decimal("0.5"))) return quote.strike def _entrypoint() -> None: """Wrapper used by ``cerbero-bite`` console script.""" try: main(prog_name="cerbero-bite") except KeyboardInterrupt: console.print("[red]interrupted[/red]") sys.exit(130) if __name__ == "__main__": _entrypoint()