feat(persistence): SQLite schema + repository for runs/genomes/evals/cost

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-09 20:18:08 +02:00
parent 65dda09aff
commit 34f88865da
4 changed files with 368 additions and 0 deletions
+235
View File
@@ -0,0 +1,235 @@
from __future__ import annotations
import json
import sqlite3
import uuid
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from ..genome.hypothesis import HypothesisAgentGenome
from .schema import SCHEMA_SQL
class Repository:
def __init__(self, db_path: Path | str):
self.db_path = Path(db_path)
def _conn(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path, isolation_level=None)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("PRAGMA journal_mode = WAL")
return conn
def init_schema(self) -> None:
self.db_path.parent.mkdir(parents=True, exist_ok=True)
with self._conn() as conn:
conn.executescript(SCHEMA_SQL)
@staticmethod
def _now() -> str:
return datetime.now(UTC).isoformat()
# runs
def create_run(self, name: str, config: dict[str, Any]) -> str:
rid = uuid.uuid4().hex
with self._conn() as conn:
conn.execute(
"INSERT INTO runs (id, name, started_at, status, config_json) "
"VALUES (?,?,?,?,?)",
(rid, name, self._now(), "running", json.dumps(config)),
)
return rid
def complete_run(
self, run_id: str, total_cost: float, status: str = "completed"
) -> None:
with self._conn() as conn:
conn.execute(
"UPDATE runs SET completed_at=?, status=?, total_cost_usd=? WHERE id=?",
(self._now(), status, total_cost, run_id),
)
def get_run(self, run_id: str) -> dict[str, Any]:
with self._conn() as conn:
row = conn.execute("SELECT * FROM runs WHERE id=?", (run_id,)).fetchone()
if row is None:
raise KeyError(run_id)
return dict(row)
def list_runs(self) -> list[dict[str, Any]]:
with self._conn() as conn:
rows = conn.execute(
"SELECT * FROM runs ORDER BY started_at DESC"
).fetchall()
return [dict(r) for r in rows]
# generations
def save_generation_summary(
self,
run_id: str,
generation_idx: int,
n_genomes: int,
fitness_median: float,
fitness_max: float,
fitness_p90: float,
entropy: float,
) -> None:
with self._conn() as conn:
conn.execute(
"""INSERT OR REPLACE INTO generations
(run_id, generation_idx, completed_at, n_genomes,
fitness_median, fitness_max, fitness_p90, entropy)
VALUES (?,?,?,?,?,?,?,?)""",
(
run_id,
generation_idx,
self._now(),
n_genomes,
fitness_median,
fitness_max,
fitness_p90,
entropy,
),
)
def list_generations(self, run_id: str) -> list[dict[str, Any]]:
with self._conn() as conn:
rows = conn.execute(
"SELECT * FROM generations WHERE run_id=? ORDER BY generation_idx",
(run_id,),
).fetchall()
return [dict(r) for r in rows]
# genomes
def save_genome(
self, run_id: str, generation_idx: int, genome: HypothesisAgentGenome
) -> None:
with self._conn() as conn:
conn.execute(
"INSERT OR REPLACE INTO genomes "
"(id, run_id, generation_idx, payload_json) VALUES (?,?,?,?)",
(genome.id, run_id, generation_idx, json.dumps(genome.to_dict())),
)
def list_genomes(
self, run_id: str, generation_idx: int | None = None
) -> list[dict[str, Any]]:
with self._conn() as conn:
if generation_idx is None:
rows = conn.execute(
"SELECT * FROM genomes WHERE run_id=? ORDER BY generation_idx, id",
(run_id,),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM genomes WHERE run_id=? AND generation_idx=? "
"ORDER BY id",
(run_id, generation_idx),
).fetchall()
return [dict(r) for r in rows]
# evaluations
def save_evaluation(
self,
run_id: str,
genome_id: str,
fitness: float,
dsr: float,
dsr_pvalue: float,
sharpe: float,
max_dd: float,
total_return: float,
n_trades: int,
parse_error: str | None,
raw_text: str | None,
) -> None:
with self._conn() as conn:
conn.execute(
"""INSERT OR REPLACE INTO evaluations
(run_id, genome_id, fitness, dsr, dsr_pvalue, sharpe, max_dd,
total_return, n_trades, parse_error, raw_text, eval_ts)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
(
run_id,
genome_id,
fitness,
dsr,
dsr_pvalue,
sharpe,
max_dd,
total_return,
n_trades,
parse_error,
raw_text,
self._now(),
),
)
def list_evaluations(self, run_id: str) -> list[dict[str, Any]]:
with self._conn() as conn:
rows = conn.execute(
"SELECT * FROM evaluations WHERE run_id=? ORDER BY fitness DESC",
(run_id,),
).fetchall()
return [dict(r) for r in rows]
# cost
def save_cost_record(
self,
run_id: str,
agent_id: str,
tier: str,
input_tokens: int,
output_tokens: int,
cost_usd: float,
) -> None:
with self._conn() as conn:
conn.execute(
"""INSERT INTO cost_records
(run_id, agent_id, ts, tier, input_tokens, output_tokens, cost_usd)
VALUES (?,?,?,?,?,?,?)""",
(
run_id,
agent_id,
self._now(),
tier,
input_tokens,
output_tokens,
cost_usd,
),
)
def total_cost(self, run_id: str) -> float:
with self._conn() as conn:
row = conn.execute(
"SELECT COALESCE(SUM(cost_usd), 0.0) AS c FROM cost_records "
"WHERE run_id=?",
(run_id,),
).fetchone()
return float(row["c"])
# adversarial
def save_adversarial_finding(
self,
run_id: str,
genome_id: str,
name: str,
severity: str,
detail: str,
) -> None:
with self._conn() as conn:
conn.execute(
"""INSERT INTO adversarial_findings
(run_id, genome_id, name, severity, detail) VALUES (?,?,?,?,?)""",
(run_id, genome_id, name, severity, detail),
)
def list_adversarial_findings(self, run_id: str) -> list[dict[str, Any]]:
with self._conn() as conn:
rows = conn.execute(
"SELECT * FROM adversarial_findings WHERE run_id=? ORDER BY id",
(run_id,),
).fetchall()
return [dict(r) for r in rows]
+77
View File
@@ -0,0 +1,77 @@
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS runs (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
started_at TEXT NOT NULL,
completed_at TEXT,
status TEXT NOT NULL DEFAULT 'running',
config_json TEXT NOT NULL,
total_cost_usd REAL NOT NULL DEFAULT 0.0
);
CREATE TABLE IF NOT EXISTS generations (
run_id TEXT NOT NULL,
generation_idx INTEGER NOT NULL,
started_at TEXT,
completed_at TEXT,
n_genomes INTEGER NOT NULL,
fitness_median REAL NOT NULL,
fitness_max REAL NOT NULL,
fitness_p90 REAL NOT NULL,
entropy REAL NOT NULL,
PRIMARY KEY (run_id, generation_idx),
FOREIGN KEY (run_id) REFERENCES runs(id)
);
CREATE TABLE IF NOT EXISTS genomes (
id TEXT NOT NULL,
run_id TEXT NOT NULL,
generation_idx INTEGER NOT NULL,
payload_json TEXT NOT NULL,
PRIMARY KEY (id, run_id, generation_idx),
FOREIGN KEY (run_id) REFERENCES runs(id)
);
CREATE TABLE IF NOT EXISTS evaluations (
run_id TEXT NOT NULL,
genome_id TEXT NOT NULL,
fitness REAL NOT NULL,
dsr REAL NOT NULL,
dsr_pvalue REAL NOT NULL,
sharpe REAL NOT NULL,
max_dd REAL NOT NULL,
total_return REAL NOT NULL,
n_trades INTEGER NOT NULL,
parse_error TEXT,
raw_text TEXT,
eval_ts TEXT NOT NULL,
PRIMARY KEY (run_id, genome_id),
FOREIGN KEY (run_id) REFERENCES runs(id)
);
CREATE TABLE IF NOT EXISTS cost_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
ts TEXT NOT NULL,
tier TEXT NOT NULL,
input_tokens INTEGER NOT NULL,
output_tokens INTEGER NOT NULL,
cost_usd REAL NOT NULL,
FOREIGN KEY (run_id) REFERENCES runs(id)
);
CREATE TABLE IF NOT EXISTS adversarial_findings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL,
genome_id TEXT NOT NULL,
name TEXT NOT NULL,
severity TEXT NOT NULL,
detail TEXT NOT NULL,
FOREIGN KEY (run_id) REFERENCES runs(id)
);
CREATE INDEX IF NOT EXISTS idx_evaluations_fitness ON evaluations(run_id, fitness DESC);
CREATE INDEX IF NOT EXISTS idx_genomes_generation ON genomes(run_id, generation_idx);
CREATE INDEX IF NOT EXISTS idx_cost_run ON cost_records(run_id);
"""