"""Tests for scripts.backup.""" from __future__ import annotations import importlib.util import sys from datetime import UTC, datetime, timedelta from pathlib import Path import pytest from cerbero_bite.state import connect, run_migrations REPO_ROOT = Path(__file__).resolve().parents[2] def _load_backup_module() -> object: """Load scripts/backup.py as a module without polluting sys.path.""" spec = importlib.util.spec_from_file_location( "_cerbero_bite_backup", REPO_ROOT / "scripts" / "backup.py" ) assert spec is not None and spec.loader is not None module = importlib.util.module_from_spec(spec) sys.modules[spec.name] = module spec.loader.exec_module(module) return module @pytest.fixture def backup_mod() -> object: return _load_backup_module() def test_backup_database_creates_snapshot(tmp_path: Path, backup_mod: object) -> None: db = tmp_path / "state.sqlite" conn = connect(db) try: run_migrations(conn) conn.execute( "INSERT INTO system_state(id, last_health_check, config_version, started_at) " "VALUES (1, '2026-04-27', '1.0.0', '2026-04-27')" ) finally: conn.close() backup_dir = tmp_path / "backups" snapshot = backup_mod.backup_database( # type: ignore[attr-defined] db_path=db, backup_dir=backup_dir, now=datetime(2026, 4, 27, 14, 0, tzinfo=UTC), ) assert snapshot.exists() assert snapshot.name == "state-20260427-14.sqlite" # Snapshot is itself a valid SQLite db with the same row. snap = connect(snapshot) try: rows = snap.execute( "SELECT config_version FROM system_state WHERE id = 1" ).fetchone() finally: snap.close() assert rows[0] == "1.0.0" def test_backup_database_replaces_existing_hour_snapshot( tmp_path: Path, backup_mod: object ) -> None: db = tmp_path / "state.sqlite" conn = connect(db) try: run_migrations(conn) finally: conn.close() when = datetime(2026, 4, 27, 14, 0, tzinfo=UTC) first = backup_mod.backup_database(db_path=db, backup_dir=tmp_path / "b", now=when) # type: ignore[attr-defined] second = backup_mod.backup_database(db_path=db, backup_dir=tmp_path / "b", now=when) # type: ignore[attr-defined] assert first == second files = list((tmp_path / "b").iterdir()) assert len(files) == 1 def test_prune_backups_removes_old_files(tmp_path: Path, backup_mod: object) -> None: backup_dir = tmp_path / "b" backup_dir.mkdir() fresh = backup_dir / "state-20260420-10.sqlite" stale = backup_dir / "state-20260101-12.sqlite" other = backup_dir / "unrelated.txt" fresh.touch() stale.touch() other.touch() deleted = backup_mod.prune_backups( # type: ignore[attr-defined] backup_dir, retention_days=30, now=datetime(2026, 4, 27, tzinfo=UTC), ) assert deleted == [stale] assert fresh.exists() assert other.exists() def test_prune_backups_ignores_unparseable_filenames( tmp_path: Path, backup_mod: object ) -> None: backup_dir = tmp_path / "b" backup_dir.mkdir() (backup_dir / "state-bogus-XX.sqlite").touch() deleted = backup_mod.prune_backups( # type: ignore[attr-defined] backup_dir, retention_days=0, now=datetime(2026, 4, 27, tzinfo=UTC) + timedelta(days=10000), ) assert deleted == [] def test_list_backups_returns_sorted(tmp_path: Path, backup_mod: object) -> None: backup_dir = tmp_path / "b" backup_dir.mkdir() a = backup_dir / "state-20260103-08.sqlite" b = backup_dir / "state-20260101-08.sqlite" a.touch() b.touch() listed = list(backup_mod.list_backups(backup_dir)) # type: ignore[attr-defined] assert listed == [b, a]