diff --git a/src/multi_swarm/orchestrator/__init__.py b/src/multi_swarm/orchestrator/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/multi_swarm/orchestrator/run.py b/src/multi_swarm/orchestrator/run.py new file mode 100644 index 0000000..9b7504b --- /dev/null +++ b/src/multi_swarm/orchestrator/run.py @@ -0,0 +1,184 @@ +"""End-to-end orchestrator per un run di Phase 1. + +Pipeline per ogni generazione: + + population -> hypothesis_agent.propose -> falsification + adversarial + -> compute_fitness -> persistenza -> next_generation + +Tutto e' loggato sulla repository SQLite (runs, generations, genomes, +evaluations, cost_records, adversarial_findings) cosi' che la GUI Streamlit +possa leggere lo stato a run terminato (o in corso). +""" + +from __future__ import annotations + +import random +from dataclasses import dataclass, field +from pathlib import Path + +import pandas as pd # type: ignore[import-untyped] + +from ..agents.adversarial import AdversarialAgent +from ..agents.falsification import FalsificationAgent +from ..agents.hypothesis import HypothesisAgent +from ..agents.market_summary import build_market_summary +from ..ga.fitness import compute_fitness +from ..ga.initial import build_initial_population +from ..ga.loop import GAConfig, next_generation +from ..ga.summary import generation_summary +from ..genome.hypothesis import ModelTier +from ..llm.client import LLMClient +from ..llm.cost_tracker import CostTracker +from ..persistence.repository import Repository + + +@dataclass +class RunConfig: + """Parametri di un run end-to-end della Phase 1.""" + + run_name: str + population_size: int = 20 + n_generations: int = 10 + elite_k: int = 2 + tournament_k: int = 3 + p_crossover: float = 0.5 + seed: int = 42 + model_tier: ModelTier = ModelTier.C + symbol: str = "BTC/USDT" + timeframe: str = "1h" + fees_bp: float = 5.0 + n_trials_dsr: int = 50 + db_path: Path = field(default_factory=lambda: Path("./runs.db")) + + +def run_phase1( + cfg: RunConfig, + ohlcv: pd.DataFrame, + llm: LLMClient, +) -> str: + """Esegue il loop GA end-to-end e ritorna l'``id`` del run. + + Su qualunque eccezione marca il run come ``failed`` e rilancia. + """ + rng = random.Random(cfg.seed) + + repo = Repository(cfg.db_path) + repo.init_schema() + config_dict = { + **cfg.__dict__, + "db_path": str(cfg.db_path), + "model_tier": cfg.model_tier.value, + } + run_id = repo.create_run(name=cfg.run_name, config=config_dict) + + market = build_market_summary(ohlcv, symbol=cfg.symbol, timeframe=cfg.timeframe) + + hypothesis_agent = HypothesisAgent(llm=llm) + falsification_agent = FalsificationAgent( + fees_bp=cfg.fees_bp, n_trials_dsr=cfg.n_trials_dsr + ) + adversarial_agent = AdversarialAgent(fees_bp=cfg.fees_bp) + cost_tracker = CostTracker() + + population = build_initial_population( + k=cfg.population_size, model_tier=cfg.model_tier, rng=rng + ) + fitnesses: dict[str, float] = {} + + ga_cfg = GAConfig( + population_size=cfg.population_size, + elite_k=cfg.elite_k, + tournament_k=cfg.tournament_k, + p_crossover=cfg.p_crossover, + ) + + try: + for gen in range(cfg.n_generations): + for genome in population: + if genome.id in fitnesses: + continue # elite gia' valutata in generazione precedente + repo.save_genome(run_id=run_id, generation_idx=gen, genome=genome) + proposal = hypothesis_agent.propose(genome, market) + cost_record = cost_tracker.record( + input_tokens=proposal.completion.input_tokens, + output_tokens=proposal.completion.output_tokens, + tier=proposal.completion.tier, + run_id=run_id, + agent_id=genome.id, + ) + repo.save_cost_record( + run_id=run_id, + agent_id=genome.id, + tier=cost_record.tier.value, + input_tokens=cost_record.input_tokens, + output_tokens=cost_record.output_tokens, + cost_usd=cost_record.cost_usd, + ) + + if proposal.strategy is None: + repo.save_evaluation( + run_id=run_id, + genome_id=genome.id, + fitness=0.0, + dsr=0.0, + dsr_pvalue=1.0, + sharpe=0.0, + max_dd=0.0, + total_return=0.0, + n_trades=0, + parse_error=proposal.parse_error, + raw_text=proposal.raw_text, + ) + fitnesses[genome.id] = 0.0 + continue + + fals = falsification_agent.evaluate(proposal.strategy, ohlcv) + adv = adversarial_agent.review(proposal.strategy, ohlcv) + for finding in adv.findings: + repo.save_adversarial_finding( + run_id=run_id, + genome_id=genome.id, + name=finding.name, + severity=finding.severity.value, + detail=finding.detail, + ) + fit = compute_fitness(fals, adv) + repo.save_evaluation( + run_id=run_id, + genome_id=genome.id, + fitness=fit, + dsr=fals.dsr, + dsr_pvalue=fals.dsr_pvalue, + sharpe=fals.sharpe, + max_dd=fals.max_drawdown, + total_return=fals.total_return, + n_trades=fals.n_trades, + parse_error=None, + raw_text=proposal.raw_text, + ) + fitnesses[genome.id] = fit + + gen_fitnesses = [fitnesses[g.id] for g in population] + summary = generation_summary(gen_fitnesses, n_bins=10) + repo.save_generation_summary( + run_id=run_id, + generation_idx=gen, + n_genomes=len(population), + fitness_median=summary["median"], + fitness_max=summary["max"], + fitness_p90=summary["p90"], + entropy=summary["entropy"], + ) + + if gen < cfg.n_generations - 1: + population = next_generation(population, fitnesses, ga_cfg, rng) + + repo.complete_run( + run_id, total_cost=repo.total_cost(run_id), status="completed" + ) + return run_id + except Exception: + repo.complete_run( + run_id, total_cost=repo.total_cost(run_id), status="failed" + ) + raise diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/test_e2e_minimal_run.py b/tests/integration/test_e2e_minimal_run.py new file mode 100644 index 0000000..b01aec0 --- /dev/null +++ b/tests/integration/test_e2e_minimal_run.py @@ -0,0 +1,77 @@ +from pathlib import Path + +import numpy as np +import pandas as pd +import pytest + +from multi_swarm.genome.hypothesis import ModelTier +from multi_swarm.llm.client import CompletionResult +from multi_swarm.orchestrator.run import RunConfig, run_phase1 +from multi_swarm.persistence.repository import Repository + + +@pytest.fixture +def synthetic_ohlcv(): + idx = pd.date_range("2024-01-01", periods=500, freq="1h", tz="UTC") + close = 100 + np.cumsum(np.random.RandomState(0).normal(0.01, 1.0, 500)) + return pd.DataFrame( + { + "open": close, + "high": close + 0.5, + "low": close - 0.5, + "close": close, + "volume": 1.0, + }, + index=idx, + ) + + +@pytest.fixture +def fake_llm(mocker): + """LLM mock che ritorna sempre una strategia valida.""" + fake = mocker.MagicMock() + fake.complete.return_value = CompletionResult( + text=( + "```lisp\n(strategy " + "(when (gt (indicator rsi 14) 70.0) (entry-short)) " + "(when (lt (indicator rsi 14) 30.0) (entry-long)))\n```" + ), + input_tokens=200, + output_tokens=80, + tier=ModelTier.C, + model="qwen", + ) + return fake + + +def test_e2e_minimal_run_completes( + tmp_path: Path, + synthetic_ohlcv, + fake_llm, + mocker, +): + cfg = RunConfig( + run_name="e2e-test", + population_size=5, + n_generations=2, + elite_k=1, + tournament_k=2, + p_crossover=0.5, + seed=42, + model_tier=ModelTier.C, + symbol="BTC/USDT", + timeframe="1h", + fees_bp=5.0, + n_trials_dsr=10, + db_path=tmp_path / "runs.db", + ) + + run_id = run_phase1(cfg, ohlcv=synthetic_ohlcv, llm=fake_llm) + + repo = Repository(db_path=tmp_path / "runs.db") + run = repo.get_run(run_id) + assert run["status"] == "completed" + gens = repo.list_generations(run_id) + assert len(gens) == 2 + evals = repo.list_evaluations(run_id) + assert len(evals) >= 5 # almeno una popolazione