diff --git a/src/multi_swarm/persistence/__init__.py b/src/multi_swarm/persistence/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/multi_swarm/persistence/repository.py b/src/multi_swarm/persistence/repository.py
new file mode 100644
index 0000000..4e46c38
--- /dev/null
+++ b/src/multi_swarm/persistence/repository.py
@@ -0,0 +1,262 @@
+from __future__ import annotations
+
+import json
+import sqlite3
+import uuid
+from contextlib import closing
+from datetime import UTC, datetime
+from pathlib import Path
+from typing import Any
+
+from ..genome.hypothesis import HypothesisAgentGenome
+from .schema import SCHEMA_SQL
+
+
+class Repository:
+    """SQLite-backed store for runs, generations, genomes, evaluations,
+    cost records and adversarial findings."""
+
+    def __init__(self, db_path: Path | str):
+        self.db_path = Path(db_path)
+
+    def _conn(self) -> sqlite3.Connection:
+        """Open a new autocommit connection with WAL and FK enforcement.
+
+        sqlite3's "with conn" only manages transactions and never closes
+        the handle, so every caller wraps the result in closing().
+        """
+        conn = sqlite3.connect(self.db_path, isolation_level=None)
+        conn.row_factory = sqlite3.Row
+        conn.execute("PRAGMA foreign_keys = ON")
+        conn.execute("PRAGMA journal_mode = WAL")
+        return conn
+
+    def init_schema(self) -> None:
+        """Create the database file and all tables/indexes if missing."""
+        self.db_path.parent.mkdir(parents=True, exist_ok=True)
+        with closing(self._conn()) as conn:
+            conn.executescript(SCHEMA_SQL)
+
+    @staticmethod
+    def _now() -> str:
+        """Current UTC time as an ISO-8601 string."""
+        return datetime.now(UTC).isoformat()
+
+    # runs
+    def create_run(self, name: str, config: dict[str, Any]) -> str:
+        """Insert a new run in 'running' state and return its generated id."""
+        rid = uuid.uuid4().hex
+        with closing(self._conn()) as conn:
+            conn.execute(
+                "INSERT INTO runs (id, name, started_at, status, config_json) "
+                "VALUES (?,?,?,?,?)",
+                (rid, name, self._now(), "running", json.dumps(config)),
+            )
+        return rid
+
+    def complete_run(
+        self, run_id: str, total_cost: float, status: str = "completed"
+    ) -> None:
+        """Mark a run finished with the given status and total cost in USD."""
+        with closing(self._conn()) as conn:
+            conn.execute(
+                "UPDATE runs SET completed_at=?, status=?, total_cost_usd=? "
+                "WHERE id=?",
+                (self._now(), status, total_cost, run_id),
+            )
+
+    def get_run(self, run_id: str) -> dict[str, Any]:
+        """Return a run row as a dict; raise KeyError for an unknown id."""
+        with closing(self._conn()) as conn:
+            row = conn.execute("SELECT * FROM runs WHERE id=?", (run_id,)).fetchone()
+            if row is None:
+                raise KeyError(run_id)
+            return dict(row)
+
+    def list_runs(self) -> list[dict[str, Any]]:
+        """All runs, most recently started first."""
+        with closing(self._conn()) as conn:
+            rows = conn.execute(
+                "SELECT * FROM runs ORDER BY started_at DESC"
+            ).fetchall()
+            return [dict(r) for r in rows]
+
+    # generations
+    def save_generation_summary(
+        self,
+        run_id: str,
+        generation_idx: int,
+        n_genomes: int,
+        fitness_median: float,
+        fitness_max: float,
+        fitness_p90: float,
+        entropy: float,
+    ) -> None:
+        """Upsert the per-generation fitness/entropy summary row."""
+        with closing(self._conn()) as conn:
+            conn.execute(
+                """INSERT OR REPLACE INTO generations
+                (run_id, generation_idx, completed_at, n_genomes,
+                 fitness_median, fitness_max, fitness_p90, entropy)
+                VALUES (?,?,?,?,?,?,?,?)""",
+                (
+                    run_id,
+                    generation_idx,
+                    self._now(),
+                    n_genomes,
+                    fitness_median,
+                    fitness_max,
+                    fitness_p90,
+                    entropy,
+                ),
+            )
+
+    def list_generations(self, run_id: str) -> list[dict[str, Any]]:
+        """Generation summaries for a run, ordered by generation index."""
+        with closing(self._conn()) as conn:
+            rows = conn.execute(
+                "SELECT * FROM generations WHERE run_id=? ORDER BY generation_idx",
+                (run_id,),
+            ).fetchall()
+            return [dict(r) for r in rows]
+
+    # genomes
+    def save_genome(
+        self, run_id: str, generation_idx: int, genome: HypothesisAgentGenome
+    ) -> None:
+        """Upsert one genome's JSON payload for a run/generation."""
+        with closing(self._conn()) as conn:
+            conn.execute(
+                "INSERT OR REPLACE INTO genomes "
+                "(id, run_id, generation_idx, payload_json) VALUES (?,?,?,?)",
+                (genome.id, run_id, generation_idx, json.dumps(genome.to_dict())),
+            )
+
+    def list_genomes(
+        self, run_id: str, generation_idx: int | None = None
+    ) -> list[dict[str, Any]]:
+        """Genomes for a run, optionally restricted to one generation."""
+        with closing(self._conn()) as conn:
+            if generation_idx is None:
+                rows = conn.execute(
+                    "SELECT * FROM genomes WHERE run_id=? ORDER BY generation_idx, id",
+                    (run_id,),
+                ).fetchall()
+            else:
+                rows = conn.execute(
+                    "SELECT * FROM genomes WHERE run_id=? AND generation_idx=? "
+                    "ORDER BY id",
+                    (run_id, generation_idx),
+                ).fetchall()
+            return [dict(r) for r in rows]
+
+    # evaluations
+    def save_evaluation(
+        self,
+        run_id: str,
+        genome_id: str,
+        fitness: float,
+        dsr: float,
+        dsr_pvalue: float,
+        sharpe: float,
+        max_dd: float,
+        total_return: float,
+        n_trades: int,
+        parse_error: str | None,
+        raw_text: str | None,
+    ) -> None:
+        """Upsert the evaluation metrics for one genome in a run."""
+        with closing(self._conn()) as conn:
+            conn.execute(
+                """INSERT OR REPLACE INTO evaluations
+                (run_id, genome_id, fitness, dsr, dsr_pvalue, sharpe, max_dd,
+                 total_return, n_trades, parse_error, raw_text, eval_ts)
+                VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
+                (
+                    run_id,
+                    genome_id,
+                    fitness,
+                    dsr,
+                    dsr_pvalue,
+                    sharpe,
+                    max_dd,
+                    total_return,
+                    n_trades,
+                    parse_error,
+                    raw_text,
+                    self._now(),
+                ),
+            )
+
+    def list_evaluations(self, run_id: str) -> list[dict[str, Any]]:
+        """Evaluations for a run, best fitness first."""
+        with closing(self._conn()) as conn:
+            rows = conn.execute(
+                "SELECT * FROM evaluations WHERE run_id=? ORDER BY fitness DESC",
+                (run_id,),
+            ).fetchall()
+            return [dict(r) for r in rows]
+
+    # cost
+    def save_cost_record(
+        self,
+        run_id: str,
+        agent_id: str,
+        tier: str,
+        input_tokens: int,
+        output_tokens: int,
+        cost_usd: float,
+    ) -> None:
+        """Append one LLM token-usage/cost record for an agent."""
+        with closing(self._conn()) as conn:
+            conn.execute(
+                """INSERT INTO cost_records
+                (run_id, agent_id, ts, tier, input_tokens, output_tokens, cost_usd)
+                VALUES (?,?,?,?,?,?,?)""",
+                (
+                    run_id,
+                    agent_id,
+                    self._now(),
+                    tier,
+                    input_tokens,
+                    output_tokens,
+                    cost_usd,
+                ),
+            )
+
+    def total_cost(self, run_id: str) -> float:
+        """Sum of all cost records for a run (0.0 when there are none)."""
+        with closing(self._conn()) as conn:
+            row = conn.execute(
+                "SELECT COALESCE(SUM(cost_usd), 0.0) AS c FROM cost_records "
+                "WHERE run_id=?",
+                (run_id,),
+            ).fetchone()
+            return float(row["c"])
+
+    # adversarial
+    def save_adversarial_finding(
+        self,
+        run_id: str,
+        genome_id: str,
+        name: str,
+        severity: str,
+        detail: str,
+    ) -> None:
+        """Append one adversarial-review finding for a genome."""
+        with closing(self._conn()) as conn:
+            conn.execute(
+                """INSERT INTO adversarial_findings
+                (run_id, genome_id, name, severity, detail)
+                VALUES (?,?,?,?,?)""",
+                (run_id, genome_id, name, severity, detail),
+            )
+
+    def list_adversarial_findings(self, run_id: str) -> list[dict[str, Any]]:
+        """Adversarial findings for a run in insertion (id) order."""
+        with closing(self._conn()) as conn:
+            rows = conn.execute(
+                "SELECT * FROM adversarial_findings WHERE run_id=? ORDER BY id",
+                (run_id,),
+            ).fetchall()
+            return [dict(r) for r in rows]
diff --git a/src/multi_swarm/persistence/schema.py b/src/multi_swarm/persistence/schema.py
new file mode 100644
index 0000000..5b15054
--- /dev/null
+++ b/src/multi_swarm/persistence/schema.py
@@ -0,0 +1,77 @@
+SCHEMA_SQL = """
+CREATE TABLE IF NOT EXISTS runs (
+    id TEXT PRIMARY KEY,
+    name TEXT NOT NULL,
+    started_at TEXT NOT NULL,
+    completed_at TEXT,
+    status TEXT NOT NULL DEFAULT 'running',
+    config_json TEXT NOT NULL,
+    total_cost_usd REAL NOT NULL DEFAULT 0.0
+);
+
+CREATE TABLE IF NOT EXISTS generations (
+    run_id TEXT NOT NULL,
+    generation_idx INTEGER NOT NULL,
+    started_at TEXT,
+    completed_at TEXT,
+    n_genomes INTEGER NOT NULL,
+    fitness_median REAL NOT NULL,
+    fitness_max REAL NOT NULL,
+    fitness_p90 REAL NOT NULL,
+    entropy REAL NOT NULL,
+    PRIMARY KEY (run_id, generation_idx),
+    FOREIGN KEY (run_id) REFERENCES runs(id)
+);
+
+CREATE TABLE IF NOT EXISTS genomes (
+    id TEXT NOT NULL,
+    run_id TEXT NOT NULL,
+    generation_idx INTEGER NOT NULL,
+    payload_json TEXT NOT NULL,
+    PRIMARY KEY (id, run_id, generation_idx),
+    FOREIGN KEY (run_id) REFERENCES runs(id)
+);
+
+CREATE TABLE IF NOT EXISTS evaluations (
+    run_id TEXT NOT NULL,
+    genome_id TEXT NOT NULL,
+    fitness REAL NOT NULL,
+    dsr REAL NOT NULL,
+    dsr_pvalue REAL NOT NULL,
+    sharpe REAL NOT NULL,
+    max_dd REAL NOT NULL,
+    total_return REAL NOT NULL,
+    n_trades INTEGER NOT NULL,
+    parse_error TEXT,
+    raw_text TEXT,
+    eval_ts TEXT NOT NULL,
+    PRIMARY KEY (run_id, genome_id),
+    FOREIGN KEY (run_id) REFERENCES runs(id)
+);
+
+CREATE TABLE IF NOT EXISTS cost_records (
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    run_id TEXT NOT NULL,
+    agent_id TEXT NOT NULL,
+    ts TEXT NOT NULL,
+    tier TEXT NOT NULL,
+    input_tokens INTEGER NOT NULL,
+    output_tokens INTEGER NOT NULL,
+    cost_usd REAL NOT NULL,
+    FOREIGN KEY (run_id) REFERENCES runs(id)
+);
+
+CREATE TABLE IF NOT EXISTS adversarial_findings (
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    run_id TEXT NOT NULL,
+    genome_id TEXT NOT NULL,
+    name TEXT NOT NULL,
+    severity TEXT NOT NULL,
+    detail TEXT NOT NULL,
+    FOREIGN KEY (run_id) REFERENCES runs(id)
+);
+
+CREATE INDEX IF NOT EXISTS idx_evaluations_fitness ON evaluations(run_id, fitness DESC);
+CREATE INDEX IF NOT EXISTS idx_genomes_generation ON genomes(run_id, generation_idx);
+CREATE INDEX IF NOT EXISTS idx_cost_run ON cost_records(run_id);
+"""
diff --git a/tests/unit/test_repository.py b/tests/unit/test_repository.py
new file mode 100644
index 0000000..e5e347c
--- /dev/null
+++ b/tests/unit/test_repository.py
@@ -0,0 +1,56 @@
+import json
+from pathlib import Path
+
+from multi_swarm.genome.hypothesis import HypothesisAgentGenome, ModelTier
+from multi_swarm.persistence.repository import Repository
+
+
+def make_genome(idx: int) -> HypothesisAgentGenome:
+    return HypothesisAgentGenome(
+        system_prompt=f"p-{idx}", feature_access=["close"], temperature=0.9,
+        top_p=0.95, model_tier=ModelTier.C, lookback_window=100, cognitive_style="x",
+    )
+
+
+def test_repository_creates_schema(tmp_path: Path):
+    repo = Repository(db_path=tmp_path / "runs.db")
+    repo.init_schema()
+    assert (tmp_path / "runs.db").exists()
+
+
+def test_repository_create_run_and_get(tmp_path: Path):
+    repo = Repository(db_path=tmp_path / "runs.db")
+    repo.init_schema()
+    run_id = repo.create_run(name="phase1-test", config={"k": 20})
+    run = repo.get_run(run_id)
+    assert run["name"] == "phase1-test"
+    assert json.loads(run["config_json"])["k"] == 20
+
+
+def test_repository_save_genome_and_evaluation(tmp_path: Path):
+    repo = Repository(db_path=tmp_path / "runs.db")
+    repo.init_schema()
+    run_id = repo.create_run(name="t", config={})
+    g = make_genome(0)
+    repo.save_genome(run_id=run_id, generation_idx=0, genome=g)
+    repo.save_evaluation(
+        run_id=run_id, genome_id=g.id, fitness=0.5, dsr=0.7, dsr_pvalue=0.05,
+        sharpe=1.5, max_dd=0.2, total_return=0.3, n_trades=30,
+        parse_error=None, raw_text="(strategy ...)",
+    )
+    evals = repo.list_evaluations(run_id)
+    assert len(evals) == 1
+    assert evals[0]["fitness"] == 0.5
+
+
+def test_repository_save_generation_summary(tmp_path: Path):
+    repo = Repository(db_path=tmp_path / "runs.db")
+    repo.init_schema()
+    run_id = repo.create_run(name="t", config={})
+    repo.save_generation_summary(
+        run_id=run_id, generation_idx=0, n_genomes=20,
+        fitness_median=0.3, fitness_max=0.8, fitness_p90=0.7, entropy=0.85,
+    )
+    gens = repo.list_generations(run_id)
+    assert len(gens) == 1
+    assert gens[0]["fitness_max"] == 0.8