Files
Cerbero-Bite/tests/unit/test_state_repository.py
Adriano 263470786d Phase 2: persistence + safety controls
Aggiunge la persistenza SQLite, l'audit log a hash chain, il kill
switch coordinato e i CLI di gestione documentati in
docs/05-data-model.md e docs/07-risk-controls.md. 197 test pass,
1 skipped (sqlite3 CLI mancante), copertura totale 97%.

State (`state/`):
- 0001_init.sql con positions, instructions, decisions, dvol_history,
  manual_actions, system_state.
- db.py: connect con WAL + foreign_keys + transaction ctx, runner
  forward-only basato su PRAGMA user_version.
- models.py: record Pydantic, Decimal preservato come TEXT.
- repository.py: CRUD typed con singola connessione passata, cache
  aware, posizioni concorrenti.

Safety (`safety/`):
- audit_log.py: AuditLog append-only con SHA-256 chain e fsync,
  verify_chain riconosce ogni manomissione (payload, prev_hash,
  hash, JSON, separatori).
- kill_switch.py: arm/disarm transazionali, idempotenti, accoppiati
  all'audit chain.

Config (`config/loader.py` + `strategy.yaml`):
- Loader YAML con deep-merge di strategy.local.yaml.
- Verifica config_hash SHA-256 (riga config_hash esclusa).
- File golden strategy.yaml + esempio override.

Scripts:
- dead_man.sh: watchdog shell indipendente da Python.
- backup.py: VACUUM INTO orario con retention 30 giorni.

CLI:
- audit verify (exit 2 su tampering).
- kill-switch arm/disarm/status su SQLite reale.
- state inspect con tabella posizioni aperte.
- config hash, config validate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 13:35:35 +02:00

451 lines
15 KiB
Python

"""CRUD tests for :mod:`cerbero_bite.state.repository`."""
from __future__ import annotations
import sqlite3
from collections.abc import Iterator
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from pathlib import Path
from uuid import uuid4
import pytest
from cerbero_bite.state import (
DecisionRecord,
DvolSnapshot,
InstructionRecord,
ManualAction,
PositionRecord,
Repository,
connect,
run_migrations,
transaction,
)
@pytest.fixture
def conn(tmp_path: Path) -> Iterator[sqlite3.Connection]:
    """Yield a freshly migrated SQLite connection backed by a per-test file.

    The database file lives under pytest's ``tmp_path``. The connection is
    closed on teardown, and also when ``run_migrations`` raises during setup,
    so a failing migration does not leave an open handle behind.
    """
    c = connect(tmp_path / "state.sqlite")
    try:
        run_migrations(c)
        yield c
    finally:
        c.close()
@pytest.fixture
def repo() -> Repository:
    """Provide a new ``Repository`` instance to each test."""
    repository = Repository()
    return repository
def _make_position(**overrides: object) -> PositionRecord:
    """Build a ``PositionRecord`` describing a sample ETH bull-put spread.

    Each call draws a new ``proposal_id`` via ``uuid4()``. Keyword arguments
    replace the matching default field before the record is constructed.
    """
    # The three bookkeeping timestamps share the same default instant.
    entry_time = datetime(2026, 4, 27, 14, 0, tzinfo=UTC)
    defaults: dict[str, object] = {
        "proposal_id": uuid4(),
        "spread_type": "bull_put",
        "expiry": datetime(2026, 5, 15, 8, 0, tzinfo=UTC),
        "short_strike": Decimal("2475"),
        "long_strike": Decimal("2350"),
        "short_instrument": "ETH-15MAY26-2475-P",
        "long_instrument": "ETH-15MAY26-2350-P",
        "n_contracts": 2,
        "spread_width_usd": Decimal("125"),
        "spread_width_pct": Decimal("0.0417"),
        "credit_eth": Decimal("0.030"),
        "credit_usd": Decimal("90"),
        "max_loss_usd": Decimal("160"),
        "spot_at_entry": Decimal("3000"),
        "dvol_at_entry": Decimal("50"),
        "delta_at_entry": Decimal("-0.12"),
        "eth_price_at_entry": Decimal("3000"),
        "proposed_at": entry_time,
        "status": "proposed",
        "created_at": entry_time,
        "updated_at": entry_time,
    }
    fields = {**defaults, **overrides}
    return PositionRecord(**fields)  # type: ignore[arg-type]
# ---------------------------------------------------------------------------
# positions
# ---------------------------------------------------------------------------
def test_create_and_get_position_roundtrip(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """A stored position can be read back with its Decimal fields intact."""
    original = _make_position()
    with transaction(conn):
        repo.create_position(conn, original)

    loaded = repo.get_position(conn, original.proposal_id)

    assert loaded is not None
    assert loaded.proposal_id == original.proposal_id
    assert loaded.short_strike == Decimal("2475")
    # The fractional width must come back exactly, not via a float round trip.
    assert loaded.spread_width_pct == Decimal("0.0417")
def test_get_unknown_position_returns_none(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """Looking up a proposal id that was never stored yields ``None``."""
    never_stored = uuid4()
    assert repo.get_position(conn, never_stored) is None
def test_list_open_positions_filters_by_status(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """``list_open_positions`` returns the open position and not the closed one."""
    live = _make_position(status="open")
    finished = _make_position(status="closed")
    with transaction(conn):
        for position in (live, finished):
            repo.create_position(conn, position)

    listed_ids = {p.proposal_id for p in repo.list_open_positions(conn)}

    assert listed_ids == {live.proposal_id}
def test_count_concurrent_positions_excludes_closed(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """Of four positions in four different statuses, exactly two are counted."""
    statuses = ("open", "awaiting_fill", "closed", "cancelled")
    with transaction(conn):
        for status in statuses:
            repo.create_position(conn, _make_position(status=status))

    assert repo.count_concurrent_positions(conn) == 2
def test_update_position_status_with_only_status_skips_optional_fields(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """Update only ``status`` and verify the optional fields remain ``None``.

    Intended to exercise the "argument not supplied" side of each
    optional-field branch in ``update_position_status``.
    """
    position = _make_position(status="proposed")
    updated_at = datetime(2026, 4, 27, 14, 5, tzinfo=UTC)
    with transaction(conn):
        repo.create_position(conn, position)
        repo.update_position_status(
            conn, position.proposal_id, status="awaiting_fill", now=updated_at
        )

    reloaded = repo.get_position(conn, position.proposal_id)

    assert reloaded is not None
    assert reloaded.status == "awaiting_fill"
    assert reloaded.opened_at is None
    assert reloaded.closed_at is None
    assert reloaded.close_reason is None
def test_update_position_status_persists_open_then_close(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """Walk a position from ``awaiting_fill`` to ``open`` to ``closed``.

    After each transition the position is reloaded and every value written by
    that transition is checked: the status and ``opened_at`` after opening,
    then the status, ``closed_at``, close reason, debit and PnL after closing.
    """
    record = _make_position(status="awaiting_fill")
    opened_at = datetime(2026, 4, 27, 14, 10, tzinfo=UTC)
    with transaction(conn):
        repo.create_position(conn, record)
        repo.update_position_status(
            conn,
            record.proposal_id,
            status="open",
            opened_at=opened_at,
            now=datetime(2026, 4, 27, 14, 11, tzinfo=UTC),
        )

    after_open = repo.get_position(conn, record.proposal_id)
    assert after_open is not None
    # Verify the status transition itself, not only the timestamp.
    assert after_open.status == "open"
    assert after_open.opened_at == opened_at

    closed_at = datetime(2026, 5, 12, 14, 0, tzinfo=UTC)
    with transaction(conn):
        repo.update_position_status(
            conn,
            record.proposal_id,
            status="closed",
            closed_at=closed_at,
            close_reason="CLOSE_PROFIT",
            debit_paid_eth=Decimal("0.012"),
            pnl_eth=Decimal("0.018"),
            pnl_usd=Decimal("54"),
            now=closed_at,
        )

    fetched = repo.get_position(conn, record.proposal_id)
    assert fetched is not None
    assert fetched.status == "closed"
    # The close timestamp was passed in, so it must be persisted as well.
    assert fetched.closed_at == closed_at
    assert fetched.close_reason == "CLOSE_PROFIT"
    assert fetched.debit_paid_eth == Decimal("0.012")
    assert fetched.pnl_eth == Decimal("0.018")
def test_naive_datetime_is_normalised_to_utc(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """A position created with naive timestamps reads back with a zero UTC offset."""
    naive = datetime(2026, 4, 27, 14, 0)
    timestamps = dict.fromkeys(("proposed_at", "created_at", "updated_at"), naive)
    position = _make_position(**timestamps)
    with transaction(conn):
        repo.create_position(conn, position)

    reloaded = repo.get_position(conn, position.proposal_id)

    assert reloaded is not None
    assert reloaded.proposed_at.tzinfo is not None
    assert reloaded.proposed_at.utcoffset() == timedelta(0)
# ---------------------------------------------------------------------------
# instructions
# ---------------------------------------------------------------------------
def test_instruction_lifecycle_ack_and_fill(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """An instruction is created, acknowledged, then filled, in one transaction."""
    position = _make_position(status="awaiting_fill")
    instruction = InstructionRecord(
        instruction_id=uuid4(),
        proposal_id=position.proposal_id,
        kind="open_combo",
        payload_json='{"action":"open"}',
        sent_at=datetime(2026, 4, 27, 14, 5, tzinfo=UTC),
    )
    acked_at = datetime(2026, 4, 27, 14, 6, tzinfo=UTC)
    filled_at = datetime(2026, 4, 27, 14, 8, tzinfo=UTC)

    with transaction(conn):
        repo.create_position(conn, position)
        repo.create_instruction(conn, instruction)
        # First update: acknowledgement only.
        repo.update_instruction(
            conn, instruction.instruction_id, acknowledged_at=acked_at
        )
        # Second update: fill details.
        repo.update_instruction(
            conn,
            instruction.instruction_id,
            filled_at=filled_at,
            actual_fill_eth=Decimal("0.0298"),
            actual_fees_eth=Decimal("0.0001"),
        )

    stored = repo.list_instructions(conn, position.proposal_id)

    assert len(stored) == 1
    only = stored[0]
    assert only.acknowledged_at is not None
    assert only.filled_at is not None
    assert only.actual_fill_eth == Decimal("0.0298")
def test_update_instruction_no_op_when_no_fields(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """Calling ``update_instruction`` with no field arguments changes nothing.

    The instruction is created without any lifecycle timestamps, so after the
    empty update exactly one instruction must exist and its acknowledged,
    filled and cancelled timestamps must all still be ``None``.
    """
    pos = _make_position()
    instr = InstructionRecord(
        instruction_id=uuid4(),
        proposal_id=pos.proposal_id,
        kind="open_combo",
        payload_json="{}",
        sent_at=datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
    )
    with transaction(conn):
        repo.create_position(conn, pos)
        repo.create_instruction(conn, instr)
        repo.update_instruction(conn, instr.instruction_id)  # no fields

    fetched = repo.list_instructions(conn, pos.proposal_id)

    # Fail with a clear assertion rather than an IndexError on an empty list.
    assert len(fetched) == 1
    assert fetched[0].acknowledged_at is None
    assert fetched[0].filled_at is None
    assert fetched[0].cancelled_at is None
# ---------------------------------------------------------------------------
# decisions / dvol / manual_actions
# ---------------------------------------------------------------------------
def test_record_and_list_decisions(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """A recorded decision gets a positive id and is returned by ``list_decisions``."""
    entry_check = DecisionRecord(
        decision_type="entry_check",
        timestamp=datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
        inputs_json='{"capital":1500}',
        outputs_json='{"accepted":true}',
        action_taken="propose_open",
    )
    with transaction(conn):
        new_id = repo.record_decision(conn, entry_check)
    assert new_id > 0

    listed = repo.list_decisions(conn)

    assert len(listed) == 1
    stored = listed[0]
    assert stored.id == new_id
    assert stored.action_taken == "propose_open"
def test_record_dvol_snapshot_replaces_on_duplicate_timestamp(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """Two snapshots sharing one timestamp leave a single ``dvol_history`` row."""
    ts = datetime(2026, 4, 27, 14, 0, tzinfo=UTC)
    readings = (("50", "3000"), ("55", "3050"))
    with transaction(conn):
        for dvol, spot in readings:
            snapshot = DvolSnapshot(
                timestamp=ts, dvol=Decimal(dvol), eth_spot=Decimal(spot)
            )
            repo.record_dvol_snapshot(conn, snapshot)

    # Only the row count is checked here, not which reading was kept.
    row_count = conn.execute("SELECT COUNT(*) FROM dvol_history").fetchone()[0]
    assert row_count == 1
def test_manual_action_enqueue_consume_cycle(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """An enqueued action is the next unconsumed one until it is marked consumed."""
    position = _make_position()
    approval = ManualAction(
        kind="approve_proposal",
        proposal_id=position.proposal_id,
        payload_json='{"reason":"go"}',
        created_at=datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
    )
    with transaction(conn):
        repo.create_position(conn, position)
        queued_id = repo.enqueue_manual_action(conn, approval)

    pending = repo.next_unconsumed_action(conn)
    assert pending is not None
    assert pending.id == queued_id

    consumed_at = datetime(2026, 4, 27, 14, 1, tzinfo=UTC)
    with transaction(conn):
        repo.mark_action_consumed(
            conn,
            queued_id,
            consumed_by="orchestrator",
            result="ok",
            now=consumed_at,
        )

    # Nothing is left in the queue once the only action has been consumed.
    assert repo.next_unconsumed_action(conn) is None
# ---------------------------------------------------------------------------
# system_state
# ---------------------------------------------------------------------------
def test_init_system_state_is_idempotent(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """Initialising system state twice with the same arguments still succeeds."""
    started = datetime(2026, 4, 27, 14, 0, tzinfo=UTC)
    with transaction(conn):
        for _ in range(2):
            repo.init_system_state(conn, config_version="1.0.0", now=started)

    current = repo.get_system_state(conn)

    assert current is not None
    assert current.kill_switch == 0
    assert current.config_version == "1.0.0"
def test_kill_switch_arm_and_disarm(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """Arm the kill switch, check its state, then disarm it and check again."""
    armed_at = datetime(2026, 4, 27, 14, 0, tzinfo=UTC)
    with transaction(conn):
        repo.init_system_state(conn, config_version="1.0.0", now=armed_at)
        repo.set_kill_switch(conn, armed=True, reason="manual", now=armed_at)

    while_armed = repo.get_system_state(conn)
    assert while_armed is not None
    assert while_armed.kill_switch == 1
    assert while_armed.kill_reason == "manual"
    assert while_armed.kill_at is not None

    disarmed_at = armed_at + timedelta(minutes=5)
    with transaction(conn):
        repo.set_kill_switch(conn, armed=False, reason=None, now=disarmed_at)

    after_disarm = repo.get_system_state(conn)
    assert after_disarm is not None
    assert after_disarm.kill_switch == 0
    assert after_disarm.kill_at is None
    # The disarm call is expected to stamp the health-check time with ``now``.
    assert after_disarm.last_health_check == disarmed_at
def test_get_system_state_returns_none_when_uninitialised(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """Without a prior ``init_system_state`` call there is no state to return."""
    current = repo.get_system_state(conn)
    assert current is None
def test_list_positions_filters_by_status(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """``list_positions(status="closed")`` returns only the closed position."""
    live = _make_position(status="open")
    finished = _make_position(status="closed")
    with transaction(conn):
        for position in (live, finished):
            repo.create_position(conn, position)

    matches = repo.list_positions(conn, status="closed")

    assert len(matches) == 1
    assert matches[0].proposal_id == finished.proposal_id
def test_list_positions_without_filter_returns_all(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """With no status filter, both stored positions are listed."""
    with transaction(conn):
        for status in ("open", "closed"):
            repo.create_position(conn, _make_position(status=status))

    everything = repo.list_positions(conn)

    assert len(everything) == 2
def test_list_decisions_filters_by_proposal(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """Filtering by ``proposal_id`` returns only the decision tied to that proposal."""
    position = _make_position()
    linked = DecisionRecord(
        decision_type="entry_check",
        proposal_id=position.proposal_id,
        timestamp=datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
        inputs_json="{}",
        outputs_json="{}",
        action_taken="propose_open",
    )
    # Built without a proposal_id, so the filter should leave it out.
    unlinked = DecisionRecord(
        decision_type="exit_check",
        timestamp=datetime(2026, 4, 27, 15, 0, tzinfo=UTC),
        inputs_json="{}",
        outputs_json="{}",
        action_taken="HOLD",
    )
    with transaction(conn):
        repo.create_position(conn, position)
        for decision in (linked, unlinked):
            repo.record_decision(conn, decision)

    matches = repo.list_decisions(conn, proposal_id=position.proposal_id)

    assert len(matches) == 1
    assert matches[0].action_taken == "propose_open"
def test_update_instruction_sets_cancelled(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """Passing ``cancelled_at`` to ``update_instruction`` persists a cancellation time.

    Exactly one instruction must be listed for the proposal before its
    ``cancelled_at`` is inspected, so a missing or duplicated row fails with
    an explicit assertion instead of an ``IndexError`` or a silent pass.
    """
    pos = _make_position()
    instr = InstructionRecord(
        instruction_id=uuid4(),
        proposal_id=pos.proposal_id,
        kind="open_combo",
        payload_json="{}",
        sent_at=datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
    )
    with transaction(conn):
        repo.create_position(conn, pos)
        repo.create_instruction(conn, instr)
        repo.update_instruction(
            conn,
            instr.instruction_id,
            cancelled_at=datetime(2026, 4, 27, 14, 30, tzinfo=UTC),
        )

    fetched = repo.list_instructions(conn, pos.proposal_id)

    assert len(fetched) == 1
    assert fetched[0].cancelled_at is not None
def test_touch_health_check_updates_timestamp(
    conn: sqlite3.Connection, repo: Repository
) -> None:
    """``touch_health_check`` moves ``last_health_check`` to the supplied time."""
    started = datetime(2026, 4, 27, 14, 0, tzinfo=UTC)
    touched_at = started + timedelta(minutes=10)
    with transaction(conn):
        repo.init_system_state(conn, config_version="1.0.0", now=started)
        repo.touch_health_check(conn, now=touched_at)

    current = repo.get_system_state(conn)

    assert current is not None
    assert current.last_health_check == touched_at