feat(orchestrator): end-to-end Phase 1 runner with persistence

Loop GA completo: build_initial_population -> hypothesis.propose ->
falsification + adversarial -> compute_fitness -> persistenza ->
next_generation. Stato run/gen/genomes/evals/cost/findings su SQLite,
elite skip-eval, run marcato failed su eccezione.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-09 20:28:35 +02:00
parent 430b874b26
commit 91d160be6f
4 changed files with 261 additions and 0 deletions
+184
View File
@@ -0,0 +1,184 @@
"""End-to-end orchestrator per un run di Phase 1.
Pipeline per ogni generazione:
population -> hypothesis_agent.propose -> falsification + adversarial
-> compute_fitness -> persistenza -> next_generation
Tutto e' loggato sulla repository SQLite (runs, generations, genomes,
evaluations, cost_records, adversarial_findings) cosi' che la GUI Streamlit
possa leggere lo stato a run terminato (o in corso).
"""
from __future__ import annotations
import random
from dataclasses import dataclass, field
from pathlib import Path
import pandas as pd # type: ignore[import-untyped]
from ..agents.adversarial import AdversarialAgent
from ..agents.falsification import FalsificationAgent
from ..agents.hypothesis import HypothesisAgent
from ..agents.market_summary import build_market_summary
from ..ga.fitness import compute_fitness
from ..ga.initial import build_initial_population
from ..ga.loop import GAConfig, next_generation
from ..ga.summary import generation_summary
from ..genome.hypothesis import ModelTier
from ..llm.client import LLMClient
from ..llm.cost_tracker import CostTracker
from ..persistence.repository import Repository
@dataclass
class RunConfig:
"""Parametri di un run end-to-end della Phase 1."""
run_name: str
population_size: int = 20
n_generations: int = 10
elite_k: int = 2
tournament_k: int = 3
p_crossover: float = 0.5
seed: int = 42
model_tier: ModelTier = ModelTier.C
symbol: str = "BTC/USDT"
timeframe: str = "1h"
fees_bp: float = 5.0
n_trials_dsr: int = 50
db_path: Path = field(default_factory=lambda: Path("./runs.db"))
def run_phase1(
cfg: RunConfig,
ohlcv: pd.DataFrame,
llm: LLMClient,
) -> str:
"""Esegue il loop GA end-to-end e ritorna l'``id`` del run.
Su qualunque eccezione marca il run come ``failed`` e rilancia.
"""
rng = random.Random(cfg.seed)
repo = Repository(cfg.db_path)
repo.init_schema()
config_dict = {
**cfg.__dict__,
"db_path": str(cfg.db_path),
"model_tier": cfg.model_tier.value,
}
run_id = repo.create_run(name=cfg.run_name, config=config_dict)
market = build_market_summary(ohlcv, symbol=cfg.symbol, timeframe=cfg.timeframe)
hypothesis_agent = HypothesisAgent(llm=llm)
falsification_agent = FalsificationAgent(
fees_bp=cfg.fees_bp, n_trials_dsr=cfg.n_trials_dsr
)
adversarial_agent = AdversarialAgent(fees_bp=cfg.fees_bp)
cost_tracker = CostTracker()
population = build_initial_population(
k=cfg.population_size, model_tier=cfg.model_tier, rng=rng
)
fitnesses: dict[str, float] = {}
ga_cfg = GAConfig(
population_size=cfg.population_size,
elite_k=cfg.elite_k,
tournament_k=cfg.tournament_k,
p_crossover=cfg.p_crossover,
)
try:
for gen in range(cfg.n_generations):
for genome in population:
if genome.id in fitnesses:
continue # elite gia' valutata in generazione precedente
repo.save_genome(run_id=run_id, generation_idx=gen, genome=genome)
proposal = hypothesis_agent.propose(genome, market)
cost_record = cost_tracker.record(
input_tokens=proposal.completion.input_tokens,
output_tokens=proposal.completion.output_tokens,
tier=proposal.completion.tier,
run_id=run_id,
agent_id=genome.id,
)
repo.save_cost_record(
run_id=run_id,
agent_id=genome.id,
tier=cost_record.tier.value,
input_tokens=cost_record.input_tokens,
output_tokens=cost_record.output_tokens,
cost_usd=cost_record.cost_usd,
)
if proposal.strategy is None:
repo.save_evaluation(
run_id=run_id,
genome_id=genome.id,
fitness=0.0,
dsr=0.0,
dsr_pvalue=1.0,
sharpe=0.0,
max_dd=0.0,
total_return=0.0,
n_trades=0,
parse_error=proposal.parse_error,
raw_text=proposal.raw_text,
)
fitnesses[genome.id] = 0.0
continue
fals = falsification_agent.evaluate(proposal.strategy, ohlcv)
adv = adversarial_agent.review(proposal.strategy, ohlcv)
for finding in adv.findings:
repo.save_adversarial_finding(
run_id=run_id,
genome_id=genome.id,
name=finding.name,
severity=finding.severity.value,
detail=finding.detail,
)
fit = compute_fitness(fals, adv)
repo.save_evaluation(
run_id=run_id,
genome_id=genome.id,
fitness=fit,
dsr=fals.dsr,
dsr_pvalue=fals.dsr_pvalue,
sharpe=fals.sharpe,
max_dd=fals.max_drawdown,
total_return=fals.total_return,
n_trades=fals.n_trades,
parse_error=None,
raw_text=proposal.raw_text,
)
fitnesses[genome.id] = fit
gen_fitnesses = [fitnesses[g.id] for g in population]
summary = generation_summary(gen_fitnesses, n_bins=10)
repo.save_generation_summary(
run_id=run_id,
generation_idx=gen,
n_genomes=len(population),
fitness_median=summary["median"],
fitness_max=summary["max"],
fitness_p90=summary["p90"],
entropy=summary["entropy"],
)
if gen < cfg.n_generations - 1:
population = next_generation(population, fitnesses, ga_cfg, rng)
repo.complete_run(
run_id, total_cost=repo.total_cost(run_id), status="completed"
)
return run_id
except Exception:
repo.complete_run(
run_id, total_cost=repo.total_cost(run_id), status="failed"
)
raise
View File
+77
View File
@@ -0,0 +1,77 @@
from pathlib import Path
import numpy as np
import pandas as pd
import pytest
from multi_swarm.genome.hypothesis import ModelTier
from multi_swarm.llm.client import CompletionResult
from multi_swarm.orchestrator.run import RunConfig, run_phase1
from multi_swarm.persistence.repository import Repository
@pytest.fixture
def synthetic_ohlcv():
idx = pd.date_range("2024-01-01", periods=500, freq="1h", tz="UTC")
close = 100 + np.cumsum(np.random.RandomState(0).normal(0.01, 1.0, 500))
return pd.DataFrame(
{
"open": close,
"high": close + 0.5,
"low": close - 0.5,
"close": close,
"volume": 1.0,
},
index=idx,
)
@pytest.fixture
def fake_llm(mocker):
"""LLM mock che ritorna sempre una strategia valida."""
fake = mocker.MagicMock()
fake.complete.return_value = CompletionResult(
text=(
"```lisp\n(strategy "
"(when (gt (indicator rsi 14) 70.0) (entry-short)) "
"(when (lt (indicator rsi 14) 30.0) (entry-long)))\n```"
),
input_tokens=200,
output_tokens=80,
tier=ModelTier.C,
model="qwen",
)
return fake
def test_e2e_minimal_run_completes(
tmp_path: Path,
synthetic_ohlcv,
fake_llm,
mocker,
):
cfg = RunConfig(
run_name="e2e-test",
population_size=5,
n_generations=2,
elite_k=1,
tournament_k=2,
p_crossover=0.5,
seed=42,
model_tier=ModelTier.C,
symbol="BTC/USDT",
timeframe="1h",
fees_bp=5.0,
n_trials_dsr=10,
db_path=tmp_path / "runs.db",
)
run_id = run_phase1(cfg, ohlcv=synthetic_ohlcv, llm=fake_llm)
repo = Repository(db_path=tmp_path / "runs.db")
run = repo.get_run(run_id)
assert run["status"] == "completed"
gens = repo.list_generations(run_id)
assert len(gens) == 2
evals = repo.list_evaluations(run_id)
assert len(evals) >= 5 # almeno una popolazione