"""Hourly SQLite backup utility (``docs/05-data-model.md``). Uses ``VACUUM INTO`` so the snapshot is a self-contained, defragmented SQLite file that can be inspected without locking the live database. Retention is enforced by deletion: any file matching ``state-YYYYMMDD-HH.sqlite`` older than ``retention_days`` is removed. Designed to be invoked from APScheduler in the orchestrator and from the CLI for ad-hoc backups. """ from __future__ import annotations import argparse import re import sqlite3 from collections.abc import Iterable from datetime import UTC, datetime, timedelta from pathlib import Path __all__ = ["BACKUP_FILENAME_RE", "backup_database", "prune_backups"] BACKUP_FILENAME_RE = re.compile(r"^state-(\d{8}-\d{2})\.sqlite$") _DEFAULT_RETENTION_DAYS = 30 def _format_backup_name(now: datetime) -> str: return f"state-{now.astimezone(UTC):%Y%m%d-%H}.sqlite" def backup_database( *, db_path: Path | str, backup_dir: Path | str, now: datetime | None = None, ) -> Path: """Create a snapshot via ``VACUUM INTO`` and return its path.""" src = Path(db_path) dst_dir = Path(backup_dir) dst_dir.mkdir(parents=True, exist_ok=True) timestamp = (now or datetime.now(UTC)).astimezone(UTC) dst = dst_dir / _format_backup_name(timestamp) if dst.exists(): # Idempotent at hour granularity: same hour = same target file. dst.unlink() conn = sqlite3.connect(str(src)) try: conn.execute(f"VACUUM INTO '{dst.as_posix()}'") finally: conn.close() return dst def _parse_backup_timestamp(name: str) -> datetime | None: match = BACKUP_FILENAME_RE.match(name) if match is None: return None try: return datetime.strptime(match.group(1), "%Y%m%d-%H").replace(tzinfo=UTC) except ValueError: return None def prune_backups( backup_dir: Path | str, *, retention_days: int = _DEFAULT_RETENTION_DAYS, now: datetime | None = None, ) -> list[Path]: """Remove backups older than ``retention_days``. Returns the deleted paths.""" cutoff = (now or datetime.now(UTC)).astimezone(UTC) - timedelta(days=retention_days) deleted: list[Path] = [] for entry in Path(backup_dir).iterdir(): if not entry.is_file(): continue ts = _parse_backup_timestamp(entry.name) if ts is None: continue if ts < cutoff: entry.unlink() deleted.append(entry) return deleted def list_backups(backup_dir: Path | str) -> Iterable[Path]: return sorted( (p for p in Path(backup_dir).iterdir() if BACKUP_FILENAME_RE.match(p.name)), key=lambda p: p.name, ) def _cli() -> None: parser = argparse.ArgumentParser(description=__doc__.splitlines()[0]) parser.add_argument("--db", default="data/state.sqlite") parser.add_argument("--out", default="data/backups") parser.add_argument("--retention-days", type=int, default=_DEFAULT_RETENTION_DAYS) args = parser.parse_args() out = backup_database(db_path=args.db, backup_dir=args.out) pruned = prune_backups(args.out, retention_days=args.retention_days) print(f"backup -> {out}") if pruned: print(f"pruned: {', '.join(p.name for p in pruned)}") if __name__ == "__main__": _cli()