263470786d
Aggiunge la persistenza SQLite, l'audit log a hash chain, il kill switch coordinato e i CLI di gestione documentati in docs/05-data-model.md e docs/07-risk-controls.md. 197 test pass, 1 skipped (sqlite3 CLI mancante), copertura totale 97%. State (`state/`): - 0001_init.sql con positions, instructions, decisions, dvol_history, manual_actions, system_state. - db.py: connect con WAL + foreign_keys + transaction ctx, runner forward-only basato su PRAGMA user_version. - models.py: record Pydantic, Decimal preservato come TEXT. - repository.py: CRUD typed con singola connessione passata, cache aware, posizioni concorrenti. Safety (`safety/`): - audit_log.py: AuditLog append-only con SHA-256 chain e fsync, verify_chain riconosce ogni manomissione (payload, prev_hash, hash, JSON, separatori). - kill_switch.py: arm/disarm transazionali, idempotenti, accoppiati all'audit chain. Config (`config/loader.py` + `strategy.yaml`): - Loader YAML con deep-merge di strategy.local.yaml. - Verifica config_hash SHA-256 (riga config_hash esclusa). - File golden strategy.yaml + esempio override. Scripts: - dead_man.sh: watchdog shell indipendente da Python. - backup.py: VACUUM INTO orario con retention 30 giorni. CLI: - audit verify (exit 2 su tampering). - kill-switch arm/disarm/status su SQLite reale. - state inspect con tabella posizioni aperte. - config hash, config validate. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
127 lines
3.7 KiB
Python
127 lines
3.7 KiB
Python
"""Tests for scripts.backup."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import importlib.util
|
|
import sys
|
|
from datetime import UTC, datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from cerbero_bite.state import connect, run_migrations
|
|
|
|
# Repository root, taken as the third ancestor (parents[2]) of this file's
# resolved path, i.e. this module is assumed to sit two directories below it.
REPO_ROOT = Path(__file__).resolve().parents[2]
|
|
|
|
|
|
def _load_backup_module() -> object:
    """Load ``scripts/backup.py`` as a module without polluting ``sys.path``.

    The script is loaded directly from its file location under ``REPO_ROOT``
    and registered in ``sys.modules`` under the private name
    ``_cerbero_bite_backup``.

    Returns:
        The freshly executed module object.

    Raises:
        BaseException: whatever the script raises while it executes. The
            ``sys.modules`` entry is rolled back before the error propagates,
            so a half-initialised module is never left registered.
    """
    spec = importlib.util.spec_from_file_location(
        "_cerbero_bite_backup", REPO_ROOT / "scripts" / "backup.py"
    )
    assert spec is not None and spec.loader is not None
    module = importlib.util.module_from_spec(spec)

    # Register before executing (as the importlib recipe does) so the module
    # can be found by name while its own body is running.
    previous = sys.modules.get(spec.name)
    sys.modules[spec.name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Roll back the registration: either restore the module from an
        # earlier successful load or drop the broken entry entirely.
        if previous is None:
            sys.modules.pop(spec.name, None)
        else:
            sys.modules[spec.name] = previous
        raise
    return module
|
|
|
|
|
|
@pytest.fixture
def backup_mod() -> object:
    """Provide the ``scripts/backup.py`` module to a test."""
    module = _load_backup_module()
    return module
|
|
|
|
|
|
def test_backup_database_creates_snapshot(tmp_path: Path, backup_mod: object) -> None:
    """``backup_database`` writes an hour-stamped snapshot that is a usable db.

    Seeds a migrated database with one ``system_state`` row, backs it up with
    a fixed ``now``, then checks the snapshot's file name and that the seeded
    row can be read back from the snapshot.
    """
    db = tmp_path / "state.sqlite"
    conn = connect(db)
    try:
        run_migrations(conn)
        # NOTE(review): there is no explicit commit before close(); presumably
        # `connect` returns a connection that persists this write — confirm.
        conn.execute(
            "INSERT INTO system_state(id, last_health_check, config_version, started_at) "
            "VALUES (1, '2026-04-27', '1.0.0', '2026-04-27')"
        )
    finally:
        conn.close()

    backup_dir = tmp_path / "backups"
    snapshot = backup_mod.backup_database(  # type: ignore[attr-defined]
        db_path=db,
        backup_dir=backup_dir,
        now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC),
    )
    assert snapshot.exists()
    assert snapshot.name == "state-20260427-14.sqlite"

    # Snapshot is itself a valid SQLite db with the same row.
    snap = connect(snapshot)
    try:
        row = snap.execute(
            "SELECT config_version FROM system_state WHERE id = 1"
        ).fetchone()
    finally:
        snap.close()
    # fetchone() yields None when the row is absent; fail with a clear message
    # instead of a TypeError from indexing None.
    assert row is not None, "system_state row missing from snapshot"
    assert row[0] == "1.0.0"
|
|
|
|
|
|
def test_backup_database_replaces_existing_hour_snapshot(
    tmp_path: Path, backup_mod: object
) -> None:
    """Two backups taken with the same ``now`` leave exactly one snapshot file."""
    db = tmp_path / "state.sqlite"
    conn = connect(db)
    try:
        run_migrations(conn)
    finally:
        conn.close()

    target_dir = tmp_path / "b"
    stamp = datetime(2026, 4, 27, 14, 0, tzinfo=UTC)
    first = backup_mod.backup_database(db_path=db, backup_dir=target_dir, now=stamp)  # type: ignore[attr-defined]
    second = backup_mod.backup_database(db_path=db, backup_dir=target_dir, now=stamp)  # type: ignore[attr-defined]

    # Both calls must resolve to the same path, and only that file may exist.
    assert first == second
    entries = list(target_dir.iterdir())
    assert len(entries) == 1
|
|
|
|
|
|
def test_prune_backups_removes_old_files(tmp_path: Path, backup_mod: object) -> None:
    """``prune_backups`` reports only the stale snapshot and keeps the rest."""
    backup_dir = tmp_path / "b"
    backup_dir.mkdir()

    # One name stamped 7 days before `now`, one stamped months before it, and
    # one file that does not follow the snapshot naming scheme at all.
    fresh, stale, other = (
        backup_dir / name
        for name in (
            "state-20260420-10.sqlite",
            "state-20260101-12.sqlite",
            "unrelated.txt",
        )
    )
    for path in (fresh, stale, other):
        path.touch()

    reference = datetime(2026, 4, 27, tzinfo=UTC)
    deleted = backup_mod.prune_backups(  # type: ignore[attr-defined]
        backup_dir, retention_days=30, now=reference
    )

    assert deleted == [stale]
    assert fresh.exists()
    assert other.exists()
|
|
|
|
|
|
def test_prune_backups_ignores_unparseable_filenames(
    tmp_path: Path, backup_mod: object
) -> None:
    """A snapshot-like name with an unparseable stamp is neither reported nor removed.

    ``retention_days=0`` combined with a ``now`` far in the future means any
    file whose stamp could be parsed would be eligible for pruning.
    """
    backup_dir = tmp_path / "b"
    backup_dir.mkdir()
    bogus = backup_dir / "state-bogus-XX.sqlite"
    bogus.touch()

    deleted = backup_mod.prune_backups(  # type: ignore[attr-defined]
        backup_dir,
        retention_days=0,
        now=datetime(2026, 4, 27, tzinfo=UTC) + timedelta(days=10000),
    )

    assert deleted == []
    # Check the filesystem too, not just the returned list: "ignored" must
    # mean the file is still there.
    assert bogus.exists()
|
|
|
|
|
|
def test_list_backups_returns_sorted(tmp_path: Path, backup_mod: object) -> None:
    """``list_backups`` yields the earlier-stamped snapshot before the later one."""
    backup_dir = tmp_path / "b"
    backup_dir.mkdir()
    later = backup_dir / "state-20260103-08.sqlite"
    earlier = backup_dir / "state-20260101-08.sqlite"

    # The later-stamped file is created first, so the expected order differs
    # from creation order.
    later.touch()
    earlier.touch()

    result = list(backup_mod.list_backups(backup_dir))  # type: ignore[attr-defined]
    assert result == [earlier, later]
|