From 411b747e93669a5e65b5ac76ee2fb2f8cbbf5a5e Mon Sep 17 00:00:00 2001 From: AdrianoDev Date: Tue, 28 Apr 2026 00:15:28 +0200 Subject: [PATCH] Phase 4 hardening: status CLI, lock file, backup job, hash enforce, pooling, real bias MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sei interventi mirati sui rischi operativi rilevati nell'audit post-Fase 4. 317 test pass, mypy strict pulito, ruff clean. 1. status CLI: legge SQLite reale e mostra kill_switch, posizioni aperte, environment, config_version, last_health_check, started_at. Sostituisce il placeholder "phase 0 skeleton". 2. Lock file single-instance: runtime/lockfile.py acquisisce data/.lockfile via fcntl.flock al boot di run_forever; un secondo container fallisce subito con LockError. 3. Backup orario nello scheduler: nuovo job APScheduler 0 * * * * chiama scripts.backup.backup_database + prune_backups. 4. config_hash enforce su start: il CLI start verifica l'integrità del file (enforce_hash=True). Mismatch → exit 1 prima di toccare stato. dry-run resta enforce_hash=False per debug. 5. Connection pooling MCP: RuntimeContext espone un httpx.AsyncClient long-lived condiviso da tutti i wrapper (limits 20/10 connections/keepalive). aclose() chiamato in run_forever finale. 6. Bias direzionale reale: deribit.historical_close + deribit.adx_14 popolano TrendContext con spot a 30 giorni e ADX(14) effettivi. Sblocca bull_put e bear_call. Quando i dati storici mancano l'engine emette alert MEDIUM e cade su no_entry in modo deterministico. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/cerbero_bite/cli.py | 69 ++++++++++++++---- src/cerbero_bite/clients/_base.py | 6 +- src/cerbero_bite/clients/deribit.py | 62 ++++++++++++++++ src/cerbero_bite/runtime/dependencies.py | 18 +++++ src/cerbero_bite/runtime/entry_cycle.py | 54 ++++++++++++-- src/cerbero_bite/runtime/lockfile.py | 91 ++++++++++++++++++++++++ src/cerbero_bite/runtime/orchestrator.py | 77 +++++++++++++++++--- tests/integration/test_entry_cycle.py | 10 +++ tests/integration/test_orchestrator.py | 2 +- tests/unit/test_lockfile.py | 54 ++++++++++++++ tests/unit/test_smoke.py | 32 ++++++++- 11 files changed, 439 insertions(+), 36 deletions(-) create mode 100644 src/cerbero_bite/runtime/lockfile.py create mode 100644 tests/unit/test_lockfile.py diff --git a/src/cerbero_bite/cli.py b/src/cerbero_bite/cli.py index 37d77cd..ab46bf2 100644 --- a/src/cerbero_bite/cli.py +++ b/src/cerbero_bite/cli.py @@ -80,14 +80,46 @@ def main(ctx: click.Context, log_dir: Path, log_level: str) -> None: @main.command() -def status() -> None: +@click.option( + "--db", + type=click.Path(dir_okay=False, path_type=Path), + default=_DEFAULT_DB_PATH, + show_default=True, +) +def status(db: Path) -> None: """Print engine status snapshot.""" + header = f"[bold cyan]Cerbero Bite[/bold cyan] v{__version__}" + if not db.exists(): + console.print( + f"{header}\n" + "engine state: [yellow]never started[/yellow] " + "(state.sqlite missing)" + ) + return + + conn = connect_state(db) + try: + run_migrations(conn) + repo = Repository() + sys_state = repo.get_system_state(conn) + open_positions = repo.list_open_positions(conn) + finally: + conn.close() + + if sys_state is None: + console.print(f"{header}\nengine state: [yellow]uninitialised[/yellow]") + return + + armed = sys_state.kill_switch == 1 + flag = "[red]ARMED[/red]" if armed else "[green]disarmed[/green]" console.print( - f"[bold cyan]Cerbero Bite[/bold cyan] v{__version__}\n" - f"engine state: [yellow]idle[/yellow]\n" - f"kill_switch: [green]0 (disarmed)[/green]\n" - f"open positions: 0\n" - f"phase: 0 (skeleton)" + f"{header}\n" + f"kill_switch: {flag}" + f"{' reason=' + (sys_state.kill_reason or '?') if armed else ''}\n" + f"open positions: {len(open_positions)}\n" + f"config_version: {sys_state.config_version}\n" + f"started_at: {sys_state.started_at.isoformat()}\n" + f"last_health_check: {sys_state.last_health_check.isoformat()}" ) @@ -145,8 +177,9 @@ def _build_orchestrator( audit: Path, environment: str, eur_to_usd: float, + enforce_hash: bool = True, ) -> Orchestrator: - loaded = load_strategy(strategy_path, enforce_hash=False) + loaded = load_strategy(strategy_path, enforce_hash=enforce_hash) token = load_token(path=token_file) return make_orchestrator( cfg=loaded.config, @@ -170,14 +203,19 @@ def start( eur_to_usd: float, ) -> None: """Start the engine main loop (scheduler + monitoring).""" - orch = _build_orchestrator( - strategy_path=strategy_path, - token_file=token_file, - db=db, - audit=audit, - environment=environment, - eur_to_usd=eur_to_usd, - ) + try: + orch = _build_orchestrator( + strategy_path=strategy_path, + token_file=token_file, + db=db, + audit=audit, + environment=environment, + eur_to_usd=eur_to_usd, + enforce_hash=True, + ) + except Exception as exc: + console.print(f"[red]boot aborted[/red]: {type(exc).__name__}: {exc}") + sys.exit(1) console.print( f"[bold cyan]Cerbero Bite[/bold cyan] starting " f"(env={environment}, db={db}, audit={audit})" @@ -213,6 +251,7 @@ def dry_run( audit=audit, environment=environment, eur_to_usd=eur_to_usd, + enforce_hash=False, ) async def _go() -> None: diff --git a/src/cerbero_bite/clients/_base.py b/src/cerbero_bite/clients/_base.py index d429eed..15ed78e 100644 --- a/src/cerbero_bite/clients/_base.py +++ b/src/cerbero_bite/clients/_base.py @@ -78,6 +78,7 @@ class HttpToolClient: retry_max: int = 3, retry_base_delay: float = 1.0, sleep: Callable[[int | float], Awaitable[None] | None] | None = None, + client: httpx.AsyncClient | None = None, ) -> None: self._service = service self._base_url = base_url.rstrip("/") @@ -86,6 +87,7 @@ class HttpToolClient: self._retry_max = max(1, retry_max) self._retry_base_delay = retry_base_delay self._sleep = sleep + self._shared_client = client @property def service(self) -> str: @@ -115,13 +117,15 @@ class HttpToolClient: } payload = body or {} + effective_client = client if client is not None else self._shared_client + async def _attempt() -> Any: return await self._do_request( url=url, headers=headers, payload=payload, tool=tool, - client=client, + client=effective_client, ) if self._retry_max <= 1: diff --git a/src/cerbero_bite/clients/deribit.py b/src/cerbero_bite/clients/deribit.py index 97daae3..9a38393 100644 --- a/src/cerbero_bite/clients/deribit.py +++ b/src/cerbero_bite/clients/deribit.py @@ -262,6 +262,68 @@ class DeribitClient: return int(_sum(bids) + _sum(asks)) + async def historical_close( + self, + *, + instrument: str, + start: datetime, + end: datetime, + resolution: str = "1D", + ) -> Decimal | None: + """Return the close of the first OHLC candle in [start, end]. + + Used to fetch the 30-day-ago ETH spot for the directional bias + in :func:`compute_bias`. Returns ``None`` when the chain has no + data in the window. + """ + raw = await self._http.call( + "get_historical", + { + "instrument": instrument, + "start_date": start.date().isoformat(), + "end_date": end.date().isoformat(), + "resolution": resolution, + }, + ) + candles = (raw or {}).get("candles") or [] + for entry in candles: + if isinstance(entry, dict) and entry.get("close") is not None: + return Decimal(str(entry["close"])) + return None + + async def adx_14( + self, + *, + instrument: str = "ETH-PERPETUAL", + start: datetime, + end: datetime, + resolution: str = "1h", + ) -> Decimal | None: + """Return the most recent ADX(14) value, or ``None`` when missing.""" + raw = await self._http.call( + "get_technical_indicators", + { + "instrument": instrument, + "indicators": ["adx"], + "start_date": start.date().isoformat(), + "end_date": end.date().isoformat(), + "resolution": resolution, + }, + ) + if not isinstance(raw, dict): + return None + # The MCP server returns either a top-level dict with the + # indicator keyed by name, or a list of points. Be tolerant. + adx_payload = raw.get("adx") or raw.get("ADX") or raw.get("indicators", {}) + if isinstance(adx_payload, list) and adx_payload: + tail = adx_payload[-1] + value = tail.get("value") if isinstance(tail, dict) else tail + return None if value is None else Decimal(str(value)) + if isinstance(adx_payload, dict): + value = adx_payload.get("latest") or adx_payload.get("value") + return None if value is None else Decimal(str(value)) + return None + async def get_account_summary(self, currency: str = "USDC") -> dict[str, Any]: result: Any = await self._http.call( "get_account_summary", {"currency": currency} diff --git a/src/cerbero_bite/runtime/dependencies.py b/src/cerbero_bite/runtime/dependencies.py index b446ae3..1020154 100644 --- a/src/cerbero_bite/runtime/dependencies.py +++ b/src/cerbero_bite/runtime/dependencies.py @@ -14,6 +14,8 @@ from dataclasses import dataclass from datetime import UTC, datetime from pathlib import Path +import httpx + from cerbero_bite.clients._base import HttpToolClient from cerbero_bite.clients.deribit import DeribitClient from cerbero_bite.clients.hyperliquid import HyperliquidClient @@ -56,8 +58,14 @@ class RuntimeContext: portfolio: PortfolioClient telegram: TelegramClient + http_client: httpx.AsyncClient + clock: Callable[[], datetime] + async def aclose(self) -> None: + """Close the shared HTTP client. Idempotent.""" + await self.http_client.aclose() + def _utc_now() -> datetime: return datetime.now(UTC) @@ -103,6 +111,14 @@ def build_runtime( clock=clk, ) + # Single long-lived AsyncClient shared by every wrapper. httpx pools + # connections per host so the snapshot stage of the entry cycle + # avoids paying TLS/TCP handshakes on each call. + http_client = httpx.AsyncClient( + timeout=httpx.Timeout(timeout_s), + limits=httpx.Limits(max_connections=20, max_keepalive_connections=10), + ) + def _client(service: str) -> HttpToolClient: return HttpToolClient( service=service, @@ -110,6 +126,7 @@ def build_runtime( token=token, timeout_s=timeout_s, retry_max=retry_max, + client=http_client, ) telegram = TelegramClient(_client("telegram")) @@ -131,5 +148,6 @@ def build_runtime( hyperliquid=HyperliquidClient(_client("hyperliquid")), portfolio=PortfolioClient(_client("portfolio")), telegram=telegram, + http_client=http_client, clock=clk, ) diff --git a/src/cerbero_bite/runtime/entry_cycle.py b/src/cerbero_bite/runtime/entry_cycle.py index 3d74378..6bdffe6 100644 --- a/src/cerbero_bite/runtime/entry_cycle.py +++ b/src/cerbero_bite/runtime/entry_cycle.py @@ -84,6 +84,8 @@ class EntryCycleResult: @dataclass(frozen=True) class _MarketSnapshot: spot_eth_usd: Decimal + spot_eth_30d_ago: Decimal | None + adx_14: Decimal | None dvol: Decimal funding_perp: Decimal funding_cross: Decimal @@ -102,7 +104,28 @@ async def _gather_snapshot( cfg: StrategyConfig, now: datetime, ) -> _MarketSnapshot: + window_days = cfg.entry.trend_window_days + historical_start = now - timedelta(days=window_days + 1) + historical_end = now - timedelta(days=window_days - 1) + adx_start = now - timedelta(days=10) + spot_t: asyncio.Task[Decimal] = asyncio.create_task(deribit.index_price_eth()) + spot_past_t: asyncio.Task[Decimal | None] = asyncio.create_task( + deribit.historical_close( + instrument="ETH-PERPETUAL", + start=historical_start, + end=historical_end, + resolution="1D", + ) + ) + adx_t: asyncio.Task[Decimal | None] = asyncio.create_task( + deribit.adx_14( + instrument="ETH-PERPETUAL", + start=adx_start, + end=now, + resolution="1h", + ) + ) dvol_t: asyncio.Task[Decimal] = asyncio.create_task( deribit.latest_dvol(currency="ETH", now=now) ) @@ -128,6 +151,8 @@ async def _gather_snapshot( await asyncio.gather( spot_t, + spot_past_t, + adx_t, dvol_t, funding_perp_t, funding_cross_t, @@ -137,6 +162,8 @@ async def _gather_snapshot( ) return _MarketSnapshot( spot_eth_usd=spot_t.result(), + spot_eth_30d_ago=spot_past_t.result(), + adx_14=adx_t.result(), dvol=dvol_t.result(), funding_perp=funding_perp_t.result(), funding_cross=funding_cross_t.result(), @@ -299,6 +326,10 @@ async def run_entry_cycle( inputs = { "snapshot": { "spot_eth_usd": str(snap.spot_eth_usd), + "spot_eth_30d_ago": ( + str(snap.spot_eth_30d_ago) if snap.spot_eth_30d_ago else None + ), + "adx_14": str(snap.adx_14) if snap.adx_14 is not None else None, "dvol": str(snap.dvol), "funding_perp": str(snap.funding_perp), "funding_cross": str(snap.funding_cross), @@ -326,17 +357,26 @@ async def run_entry_cycle( status=_STATUS_NO_ENTRY, reason=";".join(decision.reasons) ) - # 3. Bias (need a 30-day prior spot — orchestrator passes it in) - # We approximate by reusing the current spot until the historical - # snapshot store ships in Phase 5; for now no historical → bias - # cannot fire bull/bear, only iron_condor when DVOL/ADX align. The - # caller is responsible for plugging in real data via overrides. + # 3. Bias — eth_30d_ago and adx_14 come from the historical snapshot + # collected during the parallel snapshot stage. When either signal + # is missing the bias function falls back to "no entry" (defensive + # behaviour: never trade without confirmed regime data). + if snap.spot_eth_30d_ago is None: + await alert.medium( + source="entry_cycle", + message="historical spot unavailable — bias falls back to neutral", + ) + if snap.adx_14 is None: + await alert.medium( + source="entry_cycle", + message="ADX unavailable — bias may reject iron_condor", + ) trend_ctx = TrendContext( eth_now=snap.spot_eth_usd, - eth_30d_ago=snap.spot_eth_usd, + eth_30d_ago=snap.spot_eth_30d_ago or snap.spot_eth_usd, funding_cross_annualized=snap.funding_cross, dvol_now=snap.dvol, - adx_14=Decimal("25"), # placeholder until ADX lands in market data + adx_14=snap.adx_14 if snap.adx_14 is not None else Decimal("25"), ) bias = compute_bias(trend_ctx, cfg) if bias is None: diff --git a/src/cerbero_bite/runtime/lockfile.py b/src/cerbero_bite/runtime/lockfile.py new file mode 100644 index 0000000..47fef44 --- /dev/null +++ b/src/cerbero_bite/runtime/lockfile.py @@ -0,0 +1,91 @@ +"""Single-instance file lock for the engine (``docs/02-architecture.md``). + +Acquires an exclusive ``fcntl.flock`` on a sentinel file at boot. A +second container/process trying to start while another holds the lock +gets :class:`LockError` immediately and exits before doing any I/O. + +The lock is released either on context exit or when the process dies +(the OS releases ``flock`` automatically), so a hard crash cannot +permanently wedge the system. +""" + +from __future__ import annotations + +import fcntl +import logging +import os +from pathlib import Path +from types import TracebackType + +__all__ = ["EngineLock", "LockError"] + + +_log = logging.getLogger("cerbero_bite.runtime.lockfile") + + +class LockError(RuntimeError): + """Raised when another instance already holds the engine lock.""" + + +class EngineLock: + """Context manager around a per-instance file lock. + + Usage:: + + with EngineLock(Path("data/.lockfile")) as lock: + ... + + On enter, writes the current PID to the lock file so an operator + can identify the running instance. On exit, releases the lock. + """ + + def __init__(self, path: Path | str) -> None: + self._path = Path(path) + self._fh: object | None = None # actual type: io.TextIOWrapper + + @property + def path(self) -> Path: + return self._path + + def acquire(self) -> None: + self._path.parent.mkdir(parents=True, exist_ok=True) + # ``r+`` would fail if the file does not exist yet; use ``a+`` + # then seek to overwrite the contents on each acquire. + fh = self._path.open("a+", encoding="utf-8") + try: + fcntl.flock(fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + except BlockingIOError as exc: + fh.close() + raise LockError( + f"another Cerbero Bite instance holds {self._path}; " + f"check the running container or remove the file if stale" + ) from exc + + fh.seek(0) + fh.truncate() + fh.write(f"{os.getpid()}\n") + fh.flush() + self._fh = fh + _log.info("engine lock acquired: pid=%d path=%s", os.getpid(), self._path) + + def release(self) -> None: + if self._fh is None: + return + try: + fcntl.flock(self._fh.fileno(), fcntl.LOCK_UN) # type: ignore[attr-defined] + finally: + self._fh.close() # type: ignore[attr-defined] + self._fh = None + _log.info("engine lock released: %s", self._path) + + def __enter__(self) -> EngineLock: + self.acquire() + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + tb: TracebackType | None, + ) -> None: + self.release() diff --git a/src/cerbero_bite/runtime/orchestrator.py b/src/cerbero_bite/runtime/orchestrator.py index 94b4cb3..03fb9d7 100644 --- a/src/cerbero_bite/runtime/orchestrator.py +++ b/src/cerbero_bite/runtime/orchestrator.py @@ -25,6 +25,7 @@ from cerbero_bite.config.schema import StrategyConfig from cerbero_bite.runtime.dependencies import RuntimeContext, build_runtime from cerbero_bite.runtime.entry_cycle import EntryCycleResult, run_entry_cycle from cerbero_bite.runtime.health_check import HealthCheck, HealthCheckResult +from cerbero_bite.runtime.lockfile import EngineLock from cerbero_bite.runtime.monitor_cycle import MonitorCycleResult, run_monitor_cycle from cerbero_bite.runtime.recovery import recover_state from cerbero_bite.runtime.scheduler import JobSpec, build_scheduler @@ -40,6 +41,8 @@ Environment = Literal["testnet", "mainnet"] _CRON_ENTRY = "0 14 * * MON" _CRON_MONITOR = "0 2,14 * * *" _CRON_HEALTH = "*/5 * * * *" +_CRON_BACKUP = "0 * * * *" +_BACKUP_RETENTION_DAYS = 30 @dataclass(frozen=True) @@ -135,6 +138,9 @@ class Orchestrator: entry_cron: str = _CRON_ENTRY, monitor_cron: str = _CRON_MONITOR, health_cron: str = _CRON_HEALTH, + backup_cron: str = _CRON_BACKUP, + backup_dir: Path | None = None, + backup_retention_days: int = _BACKUP_RETENTION_DAYS, ) -> AsyncIOScheduler: """Build the scheduler with the canonical job set, ready to start.""" @@ -158,24 +164,77 @@ class Orchestrator: async def _health() -> None: await _safe("health", self.run_health) + backups_target = backup_dir or self._ctx.db_path.parent / "backups" + + async def _backup() -> None: + async def _do() -> None: + await asyncio.to_thread( + _run_backup, + db_path=self._ctx.db_path, + backup_dir=backups_target, + retention_days=backup_retention_days, + ) + + await _safe("backup", _do) + self._scheduler = build_scheduler( [ JobSpec(name="entry", cron=entry_cron, coro_factory=_entry), JobSpec(name="monitor", cron=monitor_cron, coro_factory=_monitor), JobSpec(name="health", cron=health_cron, coro_factory=_health), + JobSpec(name="backup", cron=backup_cron, coro_factory=_backup), ] ) return self._scheduler - async def run_forever(self) -> None: - """Boot, install the scheduler, and block forever (until cancelled).""" - await self.boot() - scheduler = self.install_scheduler() - scheduler.start() - try: - await asyncio.Event().wait() - finally: - scheduler.shutdown(wait=False) + async def run_forever(self, *, lock_path: Path | None = None) -> None: + """Boot, acquire the single-instance lock, install the scheduler. + + ``lock_path`` defaults to ``/.lockfile`` so two + containers cannot trade against the same SQLite file. + """ + lock = EngineLock( + lock_path or self._ctx.db_path.parent / ".lockfile" + ) + with lock: + try: + await self.boot() + scheduler = self.install_scheduler() + scheduler.start() + try: + await asyncio.Event().wait() + finally: + scheduler.shutdown(wait=False) + finally: + await self._ctx.aclose() + + +def _run_backup( + *, db_path: Path, backup_dir: Path, retention_days: int +) -> None: + """Synchronous helper invoked from the scheduler via ``asyncio.to_thread``. + + Keeps the import of ``scripts.backup`` lazy: importing the module + eagerly at orchestrator load time would mean the scheduler depends + on a script that lives outside the ``cerbero_bite`` package, which + breaks ``importlib.util.spec_from_file_location`` if the cwd shifts + at runtime. + """ + import sys # noqa: PLC0415 — kept lazy to keep module load cheap + from importlib.util import ( # noqa: PLC0415 + module_from_spec, + spec_from_file_location, + ) + + backup_py = Path(__file__).resolve().parents[3] / "scripts" / "backup.py" + spec = spec_from_file_location("_cerbero_bite_backup", backup_py) + if spec is None or spec.loader is None: # pragma: no cover — only on broken installs + raise RuntimeError(f"cannot load scripts/backup.py from {backup_py}") + module = module_from_spec(spec) + sys.modules.setdefault(spec.name, module) + spec.loader.exec_module(module) + module.backup_database(db_path=db_path, backup_dir=backup_dir) + module.prune_backups(backup_dir, retention_days=retention_days) # --------------------------------------------------------------------------- diff --git a/tests/integration/test_entry_cycle.py b/tests/integration/test_entry_cycle.py index 7e02e66..4f08200 100644 --- a/tests/integration/test_entry_cycle.py +++ b/tests/integration/test_entry_cycle.py @@ -94,6 +94,16 @@ def _wire_market_snapshot( json={"currency": "ETH", "latest": dvol, "candles": []}, is_reusable=True, ) + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_historical", + json={"candles": [{"close": spot * 0.95}]}, + is_reusable=True, + ) + httpx_mock.add_response( + url="http://mcp-deribit:9011/tools/get_technical_indicators", + json={"adx": [{"value": 22.0}]}, + is_reusable=True, + ) httpx_mock.add_response( url="http://mcp-hyperliquid:9012/tools/get_funding_rate", json={"asset": "ETH", "current_funding_rate": funding_perp_hourly}, diff --git a/tests/integration/test_orchestrator.py b/tests/integration/test_orchestrator.py index d63eba3..e82b97f 100644 --- a/tests/integration/test_orchestrator.py +++ b/tests/integration/test_orchestrator.py @@ -125,4 +125,4 @@ def test_install_scheduler_registers_canonical_jobs(tmp_path: Path) -> None: orch = _build_orch(tmp_path) sched = orch.install_scheduler() job_ids = {j.id for j in sched.get_jobs()} - assert job_ids == {"entry", "monitor", "health"} + assert job_ids == {"entry", "monitor", "health", "backup"} diff --git a/tests/unit/test_lockfile.py b/tests/unit/test_lockfile.py new file mode 100644 index 0000000..6e6ff41 --- /dev/null +++ b/tests/unit/test_lockfile.py @@ -0,0 +1,54 @@ +"""Tests for the single-instance EngineLock.""" + +from __future__ import annotations + +import os +from pathlib import Path + +import pytest + +from cerbero_bite.runtime.lockfile import EngineLock, LockError + + +def test_acquire_writes_pid(tmp_path: Path) -> None: + target = tmp_path / "lockfile" + with EngineLock(target): + assert target.exists() + content = target.read_text(encoding="utf-8").strip() + assert int(content) == os.getpid() + + +def test_release_after_with_block(tmp_path: Path) -> None: + target = tmp_path / "lockfile" + lock = EngineLock(target) + with lock: + pass + # second acquire must succeed because the previous one was released + with EngineLock(target): + pass + + +def test_second_acquire_blocks(tmp_path: Path) -> None: + target = tmp_path / "lockfile" + first = EngineLock(target) + first.acquire() + try: + second = EngineLock(target) + with pytest.raises(LockError, match="another Cerbero Bite instance"): + second.acquire() + finally: + first.release() + + +def test_lockfile_directory_is_created(tmp_path: Path) -> None: + nested = tmp_path / "data" / "nested" / "lockfile" + with EngineLock(nested): + assert nested.exists() + + +def test_release_is_idempotent(tmp_path: Path) -> None: + target = tmp_path / "lockfile" + lock = EngineLock(target) + lock.acquire() + lock.release() + lock.release() # must be a no-op diff --git a/tests/unit/test_smoke.py b/tests/unit/test_smoke.py index 43fbb9d..9252564 100644 --- a/tests/unit/test_smoke.py +++ b/tests/unit/test_smoke.py @@ -27,15 +27,41 @@ def test_cli_help_lists_status_command() -> None: assert "status" in result.output -def test_cli_status_runs(tmp_data_dir: Path) -> None: +def test_cli_status_when_state_missing(tmp_data_dir: Path) -> None: runner = CliRunner() result = runner.invoke( cli_main, - ["--log-dir", str(tmp_data_dir / "log"), "status"], + [ + "--log-dir", + str(tmp_data_dir / "log"), + "status", + "--db", + str(tmp_data_dir / "missing.sqlite"), + ], ) assert result.exit_code == 0 assert "Cerbero Bite" in result.output - assert "phase: 0" in result.output + assert "never started" in result.output + + +def test_cli_status_after_kill_switch_arm(tmp_data_dir: Path) -> None: + runner = CliRunner() + db_path = tmp_data_dir / "state.sqlite" + audit_path = tmp_data_dir / "audit.log" + runner.invoke( + cli_main, + [ + "--log-dir", str(tmp_data_dir / "log"), + "kill-switch", "arm", + "--reason", "smoke", + "--db", str(db_path), + "--audit", str(audit_path), + ], + ) + result = runner.invoke(cli_main, ["status", "--db", str(db_path)]) + assert result.exit_code == 0 + assert "ARMED" in result.output + assert "open positions: 0" in result.output def test_cli_kill_switch_arm_persists_state(tmp_data_dir: Path) -> None: