Adriano c46525805b docs(plans): Phase 1 lean spike implementation plan
TDD plan with 38 tasks for Phase 1 of the Multi-Swarm Coevolutive PoC:
project skeleton, data layer (OHLCV via ccxt + walk-forward),
backtest engine, metrics (Sharpe + Deflated Sharpe Ratio), Cerbero
wrapper, S-expression protocol (parser/validator/compiler),
genome + mutation/crossover, LLM client (OpenRouter Qwen + Anthropic
Sonnet), cost tracker, agents (LLM-driven hypothesis, hand-crafted
falsification and adversarial), GA (fitness/selection/loop/summary),
SQLite persistence, end-to-end orchestrator, Streamlit dashboard
(Overview/GA Convergence/Genomes), CLI scripts, smoke run, real run,
gate decision memo + technical report.

Plan generated via the writing-plans skill after strategic decision B3
(spec docs/superpowers/specs/2026-05-09-decisione-strategica-design.md).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-09 18:45:09 +02:00

# Phase 1 — Lean Spike Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Build the end-to-end loop of the Multi-Swarm Coevolutive PoC (Hypothesis swarm K=20 + Falsification + hand-crafted Adversarial, GA with tournament selection, event-driven backtest, DSR-based fitness v0) and validate the 5 hard gates of Phase 1 defined in the spec.
**Architecture:** Single Python package `multi_swarm` with submodules split by responsibility (data, backtest, metrics, cerbero, protocol, genome, llm, agents, ga, persistence, orchestrator, dashboard). Synchronous single-threaded execution, SQLite persistence, datasets cached in Parquet, multipage Streamlit GUI. No parallelism in Phase 1: performance is not a goal, validating the loop is.
**Tech Stack:** Python 3.13 + uv; pytest + pytest-mock + respx (httpx mocking) for testing; ccxt for OHLCV; pydantic v2 for config; sqlite3 + sqlmodel for persistence; sexpdata for S-expression parsing; pandas + numpy + scipy for analytics; anthropic + openai SDK (the OpenAI SDK pointed at OpenRouter for tier C); streamlit + plotly for the dashboard.
**Reference spec:** `docs/superpowers/specs/2026-05-09-decisione-strategica-design.md` (section 4).
**Conventions:**
- TDD for all logic code. Test first, minimal implementation, then refactor.
- Frequent commits, one per completed task (occasionally one per step where that makes sense).
- Branch: `main`. No feature branches in Phase 1; too much overhead for a single-author PoC.
- Commit message prefixes: `feat:` `test:` `chore:` `fix:` `docs:` `refactor:`.
- No Cerbero mocks in integration tests: use the local Docker instance (testnet token).
- No LLM mocks in e2e tests: real calls to Qwen via OpenRouter, but with population 5 and 2 generations to cap costs.
---
## Task 1: Project skeleton and tooling
**Files:**
- Create: `pyproject.toml`
- Create: `.env.example`
- Create: `README.md`
- Create: `src/multi_swarm/__init__.py`
- Create: `tests/__init__.py`
- [ ] **Step 1: Create `pyproject.toml`**
```toml
[project]
name = "multi-swarm"
version = "0.1.0"
description = "Multi-Swarm Coevolutive PoC trading swarm — Phase 1 lean spike"
authors = [{ name = "Adriano Dal Pastro", email = "adrianodalpastro@tielogic.com" }]
requires-python = ">=3.13"
dependencies = [
"ccxt>=4.4",
"pandas>=2.2",
"numpy>=2.1",
"scipy>=1.14",
"pydantic>=2.9",
"pydantic-settings>=2.6",
"sqlmodel>=0.0.22",
"sexpdata>=1.0.2",
"anthropic>=0.39",
"openai>=1.55",
"httpx>=0.28",
"tenacity>=9.0",
"pyyaml>=6.0",
"streamlit>=1.40",
"plotly>=5.24",
"pyarrow>=18.0",
]
[dependency-groups]
dev = [
"pytest>=8.3",
"pytest-mock>=3.14",
"pytest-asyncio>=0.24",
"responses>=0.25",
"respx>=0.21",
"ruff>=0.7",
"mypy>=1.13",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/multi_swarm"]
[tool.ruff]
line-length = 100
target-version = "py313"
[tool.ruff.lint]
select = ["E", "F", "W", "I", "N", "UP", "B", "RUF"]
[tool.mypy]
python_version = "3.13"
strict = true
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-v --tb=short"
markers = [
"integration: tests that require external services (Cerbero, LLM API)",
"slow: tests that take more than 5 seconds",
]
```
- [ ] **Step 2: Create `.env.example`**
```bash
# Cerbero MCP (local during Phase 1)
CERBERO_BASE_URL=http://localhost:9000
CERBERO_TESTNET_TOKEN=
CERBERO_MAINNET_TOKEN=
CERBERO_BOT_TAG=swarm-poc-phase1
# LLM providers
OPENROUTER_API_KEY=
ANTHROPIC_API_KEY=
# Run config
RUN_NAME=phase1-spike-001
DATA_DIR=./data
SERIES_DIR=./series
DB_PATH=./runs.db
```
- [ ] **Step 3: Create a minimal `README.md`**
````markdown
# Multi_Swarm_Coevolutive — Phase 1
Lean spike of the PoC. See `docs/superpowers/specs/2026-05-09-decisione-strategica-design.md`
for the rationale and `docs/superpowers/plans/2026-05-09-phase1-lean-spike.md` for the
implementation plan.
## Setup
```bash
uv sync
cp .env.example .env  # fill in tokens and API keys
uv run pytest         # verify everything installs
```
## Local Cerbero
The Phase 1 backtest reads cached OHLCV datasets, but some indicator features
are delegated to Cerbero. Start the local Cerbero instance before executing a run:
```bash
cd /home/adriano/Documenti/Git_XYZ/CerberoSuite/Cerbero_mcp
docker compose up -d
```
## Main commands
```bash
uv run pytest                                     # all tests
uv run pytest tests/unit -v                       # unit only
uv run pytest tests/integration -v -m integration # integration only
uv run python scripts/run_phase1.py               # full Phase 1 run
uv run streamlit run src/multi_swarm/dashboard/streamlit_app.py
```
````
- [ ] **Step 4: Create `src/multi_swarm/__init__.py` and `tests/__init__.py`**
```python
# src/multi_swarm/__init__.py
"""Multi_Swarm_Coevolutive — Phase 1 lean spike."""
__version__ = "0.1.0"
```
```python
# tests/__init__.py
```
- [ ] **Step 5: Sync dependencies and verify the install**
Run: `uv sync && uv run python -c "import multi_swarm; print(multi_swarm.__version__)"`
Expected: prints `0.1.0` with no errors.
- [ ] **Step 6: Commit**
```bash
git add pyproject.toml .env.example README.md src/multi_swarm/__init__.py tests/__init__.py uv.lock
git commit -m "chore: project skeleton with uv + pyproject + deps"
```
---
## Task 2: Config loader (Pydantic settings)
**Files:**
- Create: `src/multi_swarm/config.py`
- Test: `tests/unit/test_config.py`
- [ ] **Step 1: Write the failing test**
```python
# tests/unit/test_config.py
from multi_swarm.config import Settings
def test_settings_loads_from_env(monkeypatch):
monkeypatch.setenv("CERBERO_BASE_URL", "http://test:9000")
monkeypatch.setenv("CERBERO_TESTNET_TOKEN", "tok-test")
monkeypatch.setenv("CERBERO_MAINNET_TOKEN", "tok-main")
monkeypatch.setenv("CERBERO_BOT_TAG", "swarm-poc-phase1")
monkeypatch.setenv("OPENROUTER_API_KEY", "or-key")
monkeypatch.setenv("ANTHROPIC_API_KEY", "an-key")
monkeypatch.setenv("RUN_NAME", "test-run")
s = Settings()
assert s.cerbero_base_url == "http://test:9000"
assert s.cerbero_testnet_token == "tok-test"
assert s.run_name == "test-run"
assert s.data_dir.name == "data"
assert s.db_path.name == "runs.db"
def test_settings_requires_tokens(monkeypatch):
monkeypatch.delenv("CERBERO_TESTNET_TOKEN", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
import pytest
from pydantic import ValidationError
with pytest.raises(ValidationError):
Settings()
```
- [ ] **Step 2: Run the test (must fail)**
Run: `uv run pytest tests/unit/test_config.py -v`
Expected: FAIL — `ModuleNotFoundError: multi_swarm.config`.
- [ ] **Step 3: Implement `Settings`**
```python
# src/multi_swarm/config.py
from pathlib import Path
from pydantic import Field, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
case_sensitive=False,
)
cerbero_base_url: str = "http://localhost:9000"
cerbero_testnet_token: SecretStr
cerbero_mainnet_token: SecretStr | None = None
cerbero_bot_tag: str = "swarm-poc-phase1"
openrouter_api_key: SecretStr
anthropic_api_key: SecretStr | None = None
run_name: str = "phase1-spike-001"
data_dir: Path = Field(default=Path("./data"))
series_dir: Path = Field(default=Path("./series"))
db_path: Path = Field(default=Path("./runs.db"))
def load_settings() -> Settings:
return Settings()
```
- [ ] **Step 4: Run the test (must pass)**
Run: `uv run pytest tests/unit/test_config.py -v`
Expected: both tests PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/config.py tests/unit/test_config.py tests/unit/__init__.py
git commit -m "feat(config): pydantic settings loader from .env"
```
---
## Task 3: OHLCV loader (ccxt → parquet cache)
**Files:**
- Create: `src/multi_swarm/data/__init__.py`
- Create: `src/multi_swarm/data/ohlcv_loader.py`
- Test: `tests/unit/test_ohlcv_loader.py`
- [ ] **Step 1: Write the failing test with a mocked ccxt**
```python
# tests/unit/test_ohlcv_loader.py
from datetime import datetime, timezone
from pathlib import Path
import pandas as pd
import pytest
from multi_swarm.data.ohlcv_loader import OHLCVLoader, OHLCVRequest
@pytest.fixture
def sample_ohlcv_rows():
base_ts = int(datetime(2024, 1, 1, tzinfo=timezone.utc).timestamp() * 1000)
rows = []
for i in range(48):
rows.append([base_ts + i * 3600 * 1000, 40000 + i, 40100 + i, 39900 + i, 40050 + i, 100.0 + i])
return rows
def test_loader_fetches_and_caches(tmp_path: Path, mocker, sample_ohlcv_rows):
fake_exchange = mocker.MagicMock()
fake_exchange.fetch_ohlcv.return_value = sample_ohlcv_rows
mocker.patch("multi_swarm.data.ohlcv_loader.ccxt.binance", return_value=fake_exchange)
loader = OHLCVLoader(cache_dir=tmp_path)
req = OHLCVRequest(
symbol="BTC/USDT",
timeframe="1h",
start=datetime(2024, 1, 1, tzinfo=timezone.utc),
end=datetime(2024, 1, 3, tzinfo=timezone.utc),
)
df = loader.load(req)
assert isinstance(df, pd.DataFrame)
assert list(df.columns) == ["open", "high", "low", "close", "volume"]
assert len(df) == 48
assert df.index.is_monotonic_increasing
cache_files = list(tmp_path.glob("*.parquet"))
assert len(cache_files) == 1
def test_loader_uses_cache_on_second_call(tmp_path: Path, mocker, sample_ohlcv_rows):
fake_exchange = mocker.MagicMock()
fake_exchange.fetch_ohlcv.return_value = sample_ohlcv_rows
mocker.patch("multi_swarm.data.ohlcv_loader.ccxt.binance", return_value=fake_exchange)
loader = OHLCVLoader(cache_dir=tmp_path)
req = OHLCVRequest(
symbol="BTC/USDT",
timeframe="1h",
start=datetime(2024, 1, 1, tzinfo=timezone.utc),
end=datetime(2024, 1, 3, tzinfo=timezone.utc),
)
df1 = loader.load(req)
df2 = loader.load(req)
    assert fake_exchange.fetch_ohlcv.call_count == 1  # one page fetched on first load; second load hit the cache
    pd.testing.assert_frame_equal(df1, df2)
    # A further load must also come from cache, without calling the exchange
fake_exchange.fetch_ohlcv.reset_mock()
df3 = loader.load(req)
assert fake_exchange.fetch_ohlcv.call_count == 0
pd.testing.assert_frame_equal(df1, df3)
```
- [ ] **Step 2: Run the test (must fail)**
Run: `uv run pytest tests/unit/test_ohlcv_loader.py -v`
Expected: FAIL, the module does not exist yet.
- [ ] **Step 3: Implement `OHLCVLoader`**
```python
# src/multi_swarm/data/__init__.py
```
```python
# src/multi_swarm/data/ohlcv_loader.py
from __future__ import annotations
import hashlib
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
import ccxt
import pandas as pd
@dataclass(frozen=True)
class OHLCVRequest:
symbol: str
timeframe: str
start: datetime
end: datetime
def cache_key(self) -> str:
s = f"{self.symbol}|{self.timeframe}|{self.start.isoformat()}|{self.end.isoformat()}"
return hashlib.sha1(s.encode()).hexdigest()[:16]
class OHLCVLoader:
"""Carica OHLCV via ccxt (Binance) e cachea in parquet."""
def __init__(self, cache_dir: Path, exchange_name: str = "binance"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.exchange_name = exchange_name
def load(self, req: OHLCVRequest) -> pd.DataFrame:
cache_file = self.cache_dir / f"{req.cache_key()}.parquet"
if cache_file.exists():
return pd.read_parquet(cache_file)
df = self._fetch_paginated(req)
df.to_parquet(cache_file)
return df
def _fetch_paginated(self, req: OHLCVRequest) -> pd.DataFrame:
exchange = getattr(ccxt, self.exchange_name)({"enableRateLimit": True})
timeframe_ms = exchange.parse_timeframe(req.timeframe) * 1000
since = int(req.start.timestamp() * 1000)
end_ms = int(req.end.timestamp() * 1000)
all_rows: list[list[float]] = []
limit = 1000
while since < end_ms:
rows = exchange.fetch_ohlcv(req.symbol, req.timeframe, since=since, limit=limit)
if not rows:
break
all_rows.extend(rows)
last_ts = rows[-1][0]
if last_ts <= since:
break
since = last_ts + timeframe_ms
if len(rows) < limit:
break
df = pd.DataFrame(all_rows, columns=["ts", "open", "high", "low", "close", "volume"])
df = df.drop_duplicates(subset=["ts"]).sort_values("ts")
df["ts"] = pd.to_datetime(df["ts"], unit="ms", utc=True)
df = df.set_index("ts")
df = df[(df.index >= req.start) & (df.index < req.end)]
return df[["open", "high", "low", "close", "volume"]].astype("float64")
```
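The cache key is a pure function of the request fields, so identical requests always resolve to the same parquet file across runs. A standalone check of the same sha1 recipe (mirroring `OHLCVRequest.cache_key`, stdlib only):

```python
import hashlib
from datetime import datetime, timezone

def cache_key(symbol: str, timeframe: str, start: datetime, end: datetime) -> str:
    # Same recipe as OHLCVRequest.cache_key: sha1 over the pipe-joined fields,
    # truncated to 16 hex chars for a short, filesystem-friendly name.
    s = f"{symbol}|{timeframe}|{start.isoformat()}|{end.isoformat()}"
    return hashlib.sha1(s.encode()).hexdigest()[:16]

start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 1, 3, tzinfo=timezone.utc)
k1 = cache_key("BTC/USDT", "1h", start, end)
k2 = cache_key("BTC/USDT", "1h", start, end)  # same request, same key
k3 = cache_key("ETH/USDT", "1h", start, end)  # different symbol, different key
```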
- [ ] **Step 4: Run the test (must pass)**
Run: `uv run pytest tests/unit/test_ohlcv_loader.py -v`
Expected: both tests PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/data/ tests/unit/test_ohlcv_loader.py
git commit -m "feat(data): OHLCV loader via ccxt with parquet cache"
```
---
## Task 4: Walk-forward expanding splits
**Files:**
- Create: `src/multi_swarm/data/splits.py`
- Test: `tests/unit/test_splits.py`
- [ ] **Step 1: Write the failing test**
```python
# tests/unit/test_splits.py
from datetime import datetime, timezone, timedelta
import pandas as pd
import pytest
from multi_swarm.data.splits import expanding_walk_forward, Split
@pytest.fixture
def daily_index():
return pd.date_range("2024-01-01", "2024-12-31", freq="D", tz="UTC")
def test_expanding_split_count(daily_index: pd.DatetimeIndex):
splits = expanding_walk_forward(
daily_index, train_ratio=0.7, n_folds=4, min_train_days=30
)
assert len(splits) == 4
def test_expanding_split_train_grows(daily_index: pd.DatetimeIndex):
splits = expanding_walk_forward(
daily_index, train_ratio=0.7, n_folds=4, min_train_days=30
)
train_lengths = [len(s.train_idx) for s in splits]
assert train_lengths == sorted(train_lengths)
assert train_lengths[0] < train_lengths[-1]
def test_no_overlap_train_test(daily_index: pd.DatetimeIndex):
splits = expanding_walk_forward(
daily_index, train_ratio=0.7, n_folds=4, min_train_days=30
)
for s in splits:
assert s.train_idx[-1] < s.test_idx[0]
def test_min_train_days_respected():
idx = pd.date_range("2024-01-01", "2024-02-15", freq="D", tz="UTC")
splits = expanding_walk_forward(idx, train_ratio=0.7, n_folds=2, min_train_days=20)
for s in splits:
assert len(s.train_idx) >= 20
```
- [ ] **Step 2: Run the test (must fail)**
Run: `uv run pytest tests/unit/test_splits.py -v`
Expected: FAIL, the module does not exist yet.
- [ ] **Step 3: Implement the splits**
```python
# src/multi_swarm/data/splits.py
from __future__ import annotations
from dataclasses import dataclass
import pandas as pd
@dataclass(frozen=True)
class Split:
fold: int
train_idx: pd.DatetimeIndex
test_idx: pd.DatetimeIndex
def expanding_walk_forward(
index: pd.DatetimeIndex,
train_ratio: float = 0.7,
n_folds: int = 4,
min_train_days: int = 30,
) -> list[Split]:
"""Genera split walk-forward expanding: train cresce, test è la finestra successiva.
Esempio con n_folds=4, train_ratio=0.7:
fold 0: train [0..a0], test [a0..a0+(end-a0)/4]
fold 1: train [0..a1], test [a1..a1+(end-a1)/4]
...
Il train iniziale parte da train_ratio dell'intervallo totale.
"""
if n_folds < 1:
raise ValueError("n_folds must be >= 1")
if not 0 < train_ratio < 1:
raise ValueError("train_ratio must be in (0,1)")
total = len(index)
initial_train = int(total * train_ratio)
remaining = total - initial_train
fold_size = max(1, remaining // n_folds)
splits: list[Split] = []
for f in range(n_folds):
train_end = initial_train + f * fold_size
test_start = train_end
test_end = min(test_start + fold_size, total)
train_idx = index[:train_end]
test_idx = index[test_start:test_end]
if len(train_idx) < min_train_days or len(test_idx) == 0:
continue
splits.append(Split(fold=f, train_idx=train_idx, test_idx=test_idx))
return splits
```
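To make the fold arithmetic concrete, here is the same bookkeeping worked on one year of daily bars (2024 is a leap year, 366 days), pure arithmetic mirroring the function above:

```python
# One year of daily bars, train_ratio=0.7, n_folds=4
total = 366          # len(index) for daily bars over 2024
train_ratio = 0.7
n_folds = 4
initial_train = int(total * train_ratio)                 # 256 bars of initial train
fold_size = max(1, (total - initial_train) // n_folds)   # 110 // 4 = 27 bars per test fold
folds = []
for f in range(n_folds):
    train_end = initial_train + f * fold_size
    test_end = min(train_end + fold_size, total)
    folds.append((train_end, test_end - train_end))      # (train length, test length)
# folds == [(256, 27), (283, 27), (310, 27), (337, 27)]: train grows, test slides forward
```

Each test window holds out 27 fresh days while the train window expands, which is exactly the no-overlap property the tests assert.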
- [ ] **Step 4: Run the test (must pass)**
Run: `uv run pytest tests/unit/test_splits.py -v`
Expected: all 4 PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/data/splits.py tests/unit/test_splits.py
git commit -m "feat(data): expanding walk-forward splits"
```
---
## Task 5: Backtest core dataclasses (Order, Position, Trade)
**Files:**
- Create: `src/multi_swarm/backtest/__init__.py`
- Create: `src/multi_swarm/backtest/orders.py`
- Test: `tests/unit/test_backtest_orders.py`
- [ ] **Step 1: Write the failing test**
```python
# tests/unit/test_backtest_orders.py
from datetime import datetime, timezone
import pytest
from multi_swarm.backtest.orders import Order, Side, Position, Trade
def test_order_validates_side():
o = Order(ts=datetime(2024, 1, 1, tzinfo=timezone.utc), side=Side.LONG, size=1.0)
assert o.side == Side.LONG
def test_position_pnl_long():
pos = Position(side=Side.LONG, entry_price=100.0, size=2.0)
assert pos.unrealized_pnl(110.0) == pytest.approx(20.0)
assert pos.unrealized_pnl(90.0) == pytest.approx(-20.0)
def test_position_pnl_short():
pos = Position(side=Side.SHORT, entry_price=100.0, size=2.0)
assert pos.unrealized_pnl(110.0) == pytest.approx(-20.0)
assert pos.unrealized_pnl(90.0) == pytest.approx(20.0)
def test_trade_realized_pnl_with_fees():
t = Trade(
entry_ts=datetime(2024, 1, 1, tzinfo=timezone.utc),
exit_ts=datetime(2024, 1, 2, tzinfo=timezone.utc),
side=Side.LONG,
size=1.0,
entry_price=100.0,
exit_price=110.0,
fees_bp=5.0,
)
    # gross 10, fees = 5 bp on (100 + 110) notional = 0.0005 * 210 = 0.105
assert t.gross_pnl == pytest.approx(10.0)
assert t.fees == pytest.approx(0.105)
assert t.net_pnl == pytest.approx(9.895)
```
- [ ] **Step 2: Run the test (must fail)**
Run: `uv run pytest tests/unit/test_backtest_orders.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement the order dataclasses**
```python
# src/multi_swarm/backtest/__init__.py
```
```python
# src/multi_swarm/backtest/orders.py
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
class Side(str, Enum):
LONG = "long"
SHORT = "short"
FLAT = "flat"
@dataclass(frozen=True)
class Order:
ts: datetime
side: Side
size: float
@dataclass(frozen=True)
class Position:
side: Side
entry_price: float
size: float
def unrealized_pnl(self, current_price: float) -> float:
if self.side == Side.LONG:
return (current_price - self.entry_price) * self.size
if self.side == Side.SHORT:
return (self.entry_price - current_price) * self.size
return 0.0
@dataclass(frozen=True)
class Trade:
entry_ts: datetime
exit_ts: datetime
side: Side
size: float
entry_price: float
exit_price: float
fees_bp: float = 5.0
@property
def gross_pnl(self) -> float:
if self.side == Side.LONG:
return (self.exit_price - self.entry_price) * self.size
return (self.entry_price - self.exit_price) * self.size
@property
def fees(self) -> float:
notional_in = self.entry_price * self.size
notional_out = self.exit_price * self.size
return (self.fees_bp / 10000.0) * (notional_in + notional_out)
@property
def net_pnl(self) -> float:
return self.gross_pnl - self.fees
```
- [ ] **Step 4: Run the test (must pass)**
Run: `uv run pytest tests/unit/test_backtest_orders.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/backtest/ tests/unit/test_backtest_orders.py
git commit -m "feat(backtest): Order/Position/Trade dataclasses with fees"
```
---
## Task 6: Simplified event-driven backtest engine
**Files:**
- Create: `src/multi_swarm/backtest/engine.py`
- Test: `tests/unit/test_backtest_engine.py`
- [ ] **Step 1: Write the failing test**
```python
# tests/unit/test_backtest_engine.py
from datetime import datetime, timezone
import numpy as np
import pandas as pd
import pytest
from multi_swarm.backtest.engine import BacktestEngine, Signal
from multi_swarm.backtest.orders import Side
@pytest.fixture
def trending_ohlcv():
idx = pd.date_range("2024-01-01", periods=100, freq="1h", tz="UTC")
close = np.linspace(100, 120, 100)
df = pd.DataFrame(
{"open": close, "high": close + 0.5, "low": close - 0.5, "close": close, "volume": 1.0},
index=idx,
)
return df
def test_engine_no_signals_zero_pnl(trending_ohlcv):
signals = pd.Series([Side.FLAT] * len(trending_ohlcv), index=trending_ohlcv.index)
engine = BacktestEngine(fees_bp=5.0)
result = engine.run(trending_ohlcv, signals)
assert result.equity_curve.iloc[-1] == pytest.approx(0.0)
assert len(result.trades) == 0
def test_engine_long_in_uptrend_makes_profit(trending_ohlcv):
signals = pd.Series([Side.LONG] * len(trending_ohlcv), index=trending_ohlcv.index)
engine = BacktestEngine(fees_bp=5.0)
result = engine.run(trending_ohlcv, signals)
assert result.equity_curve.iloc[-1] > 0
assert len(result.trades) == 1
assert result.trades[0].side == Side.LONG
def test_engine_position_flips_on_side_change(trending_ohlcv):
half = len(trending_ohlcv) // 2
signals = pd.Series(
[Side.LONG] * half + [Side.SHORT] * (len(trending_ohlcv) - half),
index=trending_ohlcv.index,
)
engine = BacktestEngine(fees_bp=5.0)
result = engine.run(trending_ohlcv, signals)
assert len(result.trades) == 2
assert result.trades[0].side == Side.LONG
assert result.trades[1].side == Side.SHORT
def test_engine_fees_are_subtracted(trending_ohlcv):
signals = pd.Series([Side.LONG] * len(trending_ohlcv), index=trending_ohlcv.index)
engine_no_fees = BacktestEngine(fees_bp=0.0)
engine_fees = BacktestEngine(fees_bp=10.0)
r1 = engine_no_fees.run(trending_ohlcv, signals)
r2 = engine_fees.run(trending_ohlcv, signals)
assert r1.equity_curve.iloc[-1] > r2.equity_curve.iloc[-1]
```
- [ ] **Step 2: Run the test (must fail)**
Run: `uv run pytest tests/unit/test_backtest_engine.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement the engine**
```python
# src/multi_swarm/backtest/engine.py
from __future__ import annotations
from dataclasses import dataclass
import pandas as pd
from .orders import Position, Side, Trade
Signal = Side  # semantic alias
@dataclass(frozen=True)
class BacktestResult:
equity_curve: pd.Series
returns: pd.Series
trades: list[Trade]
class BacktestEngine:
"""Engine event-driven sincrono: itera bar per bar, applica segnali con
delay di 1 bar (segnale a t → eseguito a t+1 open) per evitare lookahead.
Position sizing: 1 unit per posizione. Fees applicati su entry+exit.
Niente leva, niente liquidation, niente funding (semplificazione Phase 1).
"""
def __init__(self, fees_bp: float = 5.0):
self.fees_bp = fees_bp
def run(self, ohlcv: pd.DataFrame, signals: pd.Series) -> BacktestResult:
signals = signals.reindex(ohlcv.index).ffill().fillna(Side.FLAT)
position: Position | None = None
trades: list[Trade] = []
equity = 0.0
equity_history: list[float] = []
returns_history: list[float] = []
prev_equity = 0.0
        # 1-bar delayed execution: the signal from t-1 fills at the open of t.
executed_side = pd.Series(Side.FLAT, index=ohlcv.index)
executed_side.iloc[1:] = signals.iloc[:-1].values
for ts, row in ohlcv.iterrows():
target_side = executed_side.loc[ts]
current_side = position.side if position else Side.FLAT
if target_side != current_side:
if position is not None:
trade = Trade(
entry_ts=position_entry_ts,
exit_ts=ts,
side=position.side,
size=position.size,
entry_price=position.entry_price,
exit_price=row["open"],
fees_bp=self.fees_bp,
)
trades.append(trade)
equity += trade.net_pnl
position = None
if target_side in (Side.LONG, Side.SHORT):
position = Position(side=target_side, entry_price=row["open"], size=1.0)
position_entry_ts = ts
mark = row["close"]
mtm = position.unrealized_pnl(mark) if position else 0.0
current_equity = equity + mtm
equity_history.append(current_equity)
returns_history.append(current_equity - prev_equity)
prev_equity = current_equity
if position is not None:
last_ts = ohlcv.index[-1]
last_close = ohlcv["close"].iloc[-1]
trade = Trade(
entry_ts=position_entry_ts,
exit_ts=last_ts,
side=position.side,
size=position.size,
entry_price=position.entry_price,
exit_price=last_close,
fees_bp=self.fees_bp,
)
trades.append(trade)
equity += trade.net_pnl
equity_history[-1] = equity
if len(returns_history) >= 2:
returns_history[-1] = equity - equity_history[-2]
return BacktestResult(
equity_curve=pd.Series(equity_history, index=ohlcv.index, name="equity"),
returns=pd.Series(returns_history, index=ohlcv.index, name="returns"),
trades=trades,
)
```
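The 1-bar execution delay in the engine is equivalent to shifting the signal series by one bar. A standalone illustration with pandas only (string labels stand in for `Side` values; this mirrors the `executed_side.iloc[1:] = signals.iloc[:-1]` assignment in `run`):

```python
import pandas as pd

idx = pd.date_range("2024-01-01", periods=5, freq="1h", tz="UTC")
signals = pd.Series(["flat", "long", "long", "flat", "flat"], index=idx)
# A signal emitted on bar t is only filled at the open of bar t+1,
# so the executed series is the signal series shifted forward by one bar:
executed = signals.shift(1, fill_value="flat")
# list(executed) == ["flat", "flat", "long", "long", "flat"]
```

Without this shift the backtest would trade on a close it could not have seen yet, which is the lookahead bug the delay prevents.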
- [ ] **Step 4: Run the test (must pass)**
Run: `uv run pytest tests/unit/test_backtest_engine.py -v`
Expected: all 4 PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/backtest/engine.py tests/unit/test_backtest_engine.py
git commit -m "feat(backtest): event-driven engine with 1-bar exec delay"
```
---
## Task 7: Basic metrics (Sharpe, drawdown, returns)
**Files:**
- Create: `src/multi_swarm/metrics/__init__.py`
- Create: `src/multi_swarm/metrics/basic.py`
- Test: `tests/unit/test_metrics_basic.py`
- [ ] **Step 1: Write the failing test**
```python
# tests/unit/test_metrics_basic.py
import numpy as np
import pandas as pd
import pytest
from multi_swarm.metrics.basic import sharpe_ratio, max_drawdown, total_return
def test_sharpe_zero_returns():
r = pd.Series([0.0] * 100)
assert sharpe_ratio(r, periods_per_year=8760) == 0.0
def test_sharpe_positive_returns():
np.random.seed(42)
r = pd.Series(np.random.normal(0.001, 0.01, 1000))
s = sharpe_ratio(r, periods_per_year=8760)
assert s > 0
def test_sharpe_negative_returns():
np.random.seed(42)
r = pd.Series(np.random.normal(-0.001, 0.01, 1000))
s = sharpe_ratio(r, periods_per_year=8760)
assert s < 0
def test_max_drawdown_monotonic_up():
eq = pd.Series([100.0, 105.0, 110.0, 115.0, 120.0])
assert max_drawdown(eq) == pytest.approx(0.0)
def test_max_drawdown_known_curve():
eq = pd.Series([100.0, 110.0, 90.0, 95.0, 105.0])
# peak 110, trough 90, drawdown = (110-90)/110 ≈ 0.1818
assert max_drawdown(eq) == pytest.approx(20.0 / 110.0)
def test_total_return():
eq = pd.Series([100.0, 110.0, 105.0, 120.0])
assert total_return(eq) == pytest.approx(0.20)
```
- [ ] **Step 2: Run the test (must fail)**
Run: `uv run pytest tests/unit/test_metrics_basic.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement the basic metrics**
```python
# src/multi_swarm/metrics/__init__.py
```
```python
# src/multi_swarm/metrics/basic.py
from __future__ import annotations
import numpy as np
import pandas as pd
def sharpe_ratio(returns: pd.Series, periods_per_year: int = 8760, rf: float = 0.0) -> float:
"""Sharpe annualizzato. periods_per_year=8760 per dati orari."""
excess = returns - rf / periods_per_year
std = excess.std(ddof=1)
if std == 0 or np.isnan(std):
return 0.0
return float(np.sqrt(periods_per_year) * excess.mean() / std)
def max_drawdown(equity: pd.Series) -> float:
"""Max drawdown percentuale (positivo)."""
peak = equity.cummax()
dd = (peak - equity) / peak.replace(0, np.nan)
dd = dd.fillna(0.0)
return float(dd.max())
def total_return(equity: pd.Series) -> float:
if equity.iloc[0] == 0:
return float(equity.iloc[-1])
return float(equity.iloc[-1] / equity.iloc[0] - 1.0)
```
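Since annualization only rescales the per-bar Sharpe by the square root of `periods_per_year`, choosing 8760 (hourly) versus 365 (daily) changes the reported value by a fixed factor, whatever the data. A quick numpy check of that scaling:

```python
import numpy as np

rng = np.random.default_rng(7)
r = rng.normal(0.001, 0.01, 5000)   # simulated per-bar returns
per_bar = r.mean() / r.std(ddof=1)  # per-bar Sharpe, no annualization
hourly = np.sqrt(8760) * per_bar    # hourly bars: 24 * 365 periods per year
daily = np.sqrt(365) * per_bar      # same per-bar Sharpe read as daily bars
ratio = hourly / daily              # sqrt(8760 / 365) = sqrt(24), independent of the data
```

This is why the plan pins `periods_per_year=8760` wherever hourly OHLCV is backtested: mixing annualization bases would make fitness values incomparable across runs.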
- [ ] **Step 4: Run the test (must pass)**
Run: `uv run pytest tests/unit/test_metrics_basic.py -v`
Expected: all 6 PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/metrics/ tests/unit/test_metrics_basic.py
git commit -m "feat(metrics): Sharpe + max drawdown + total return"
```
---
## Task 8: Deflated Sharpe Ratio (Bailey & López de Prado)
**Files:**
- Create: `src/multi_swarm/metrics/dsr.py`
- Test: `tests/unit/test_metrics_dsr.py`
- [ ] **Step 1: Write the failing test**
```python
# tests/unit/test_metrics_dsr.py
import numpy as np
import pandas as pd
import pytest
from multi_swarm.metrics.dsr import deflated_sharpe_ratio, expected_max_sharpe
def test_expected_max_sharpe_grows_with_n_trials():
e1 = expected_max_sharpe(n_trials=1, sharpe_var=1.0)
e10 = expected_max_sharpe(n_trials=10, sharpe_var=1.0)
e100 = expected_max_sharpe(n_trials=100, sharpe_var=1.0)
assert e1 < e10 < e100
def test_dsr_p_value_is_valid_probability():
np.random.seed(0)
returns = pd.Series(np.random.normal(0, 0.01, 500))
dsr, p = deflated_sharpe_ratio(
returns, n_trials=10, periods_per_year=8760, sharpe_var=0.0
)
    # With sharpe_var=0 and an estimated Sharpe near 0, p must still be a valid probability.
assert 0.0 <= p <= 1.0
def test_dsr_significant_for_strong_sharpe():
np.random.seed(42)
returns = pd.Series(np.random.normal(0.005, 0.005, 1000))
dsr, p = deflated_sharpe_ratio(
returns, n_trials=5, periods_per_year=8760, sharpe_var=1.0
)
    # Expected positive Sharpe and a low p-value
assert dsr > 0
assert p < 0.5
```
- [ ] **Step 2: Run the test (must fail)**
Run: `uv run pytest tests/unit/test_metrics_dsr.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement the DSR**
```python
# src/multi_swarm/metrics/dsr.py
from __future__ import annotations
import numpy as np
import pandas as pd
from scipy import stats
from .basic import sharpe_ratio
EULER_MASCHERONI = 0.5772156649015329
def expected_max_sharpe(n_trials: int, sharpe_var: float) -> float:
"""E[max SR] su n_trials con varianza sharpe_var (Bailey & Lopez de Prado).
Formula: sqrt(sharpe_var) * ((1-γ) * Φ⁻¹(1 - 1/N) + γ * Φ⁻¹(1 - 1/(N·e)))
dove γ è la costante di Eulero-Mascheroni.
"""
if n_trials < 2:
return 0.0
e = np.e
z1 = stats.norm.ppf(1 - 1.0 / n_trials)
z2 = stats.norm.ppf(1 - 1.0 / (n_trials * e))
return float(np.sqrt(sharpe_var) * ((1 - EULER_MASCHERONI) * z1 + EULER_MASCHERONI * z2))
def deflated_sharpe_ratio(
returns: pd.Series,
n_trials: int,
periods_per_year: int = 8760,
sharpe_var: float = 1.0,
skewness: float | None = None,
kurtosis: float | None = None,
) -> tuple[float, float]:
"""Deflated Sharpe Ratio (DSR) e p-value associato.
Restituisce (DSR, p_value). p_value è la prob. che lo SR osservato sia
superiore al massimo atteso sotto null. p_value bassi (es. < 0.05)
indicano significatività dopo correzione per multiple testing.
"""
n = len(returns)
if n < 30:
return 0.0, 1.0
sr = sharpe_ratio(returns, periods_per_year=periods_per_year)
sr_period = sr / np.sqrt(periods_per_year)
if skewness is None:
skewness = float(stats.skew(returns, bias=False))
if kurtosis is None:
        kurtosis = float(stats.kurtosis(returns, fisher=False, bias=False))  # Pearson (non-excess), as the DSR variance formula expects
sr_expected_max = expected_max_sharpe(n_trials, sharpe_var) / np.sqrt(periods_per_year)
denom = np.sqrt(
max(
(1 - skewness * sr_period + ((kurtosis - 1) / 4.0) * sr_period**2) / (n - 1),
1e-12,
)
)
z = (sr_period - sr_expected_max) / denom
p_value = float(1.0 - stats.norm.cdf(z))
dsr = float(stats.norm.cdf(z))
return dsr, p_value
```
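A quick numeric look at the multiple-testing penalty that `expected_max_sharpe` implements: under the null, the best of N random trials shows an inflated Sharpe that grows with N (same formula as above, scipy only):

```python
import numpy as np
from scipy import stats

GAMMA = 0.5772156649015329  # Euler-Mascheroni constant

def e_max_sr(n_trials: int, sharpe_var: float = 1.0) -> float:
    # Expected maximum Sharpe over n_trials skill-less strategies
    # (Bailey & Lopez de Prado).
    if n_trials < 2:
        return 0.0
    z1 = stats.norm.ppf(1 - 1.0 / n_trials)
    z2 = stats.norm.ppf(1 - 1.0 / (n_trials * np.e))
    return float(np.sqrt(sharpe_var) * ((1 - GAMMA) * z1 + GAMMA * z2))

penalties = [e_max_sr(n) for n in (10, 100, 1000)]  # strictly increasing in n
```

With the K=20 hypothesis swarm, a single generation already implies a penalty of roughly `e_max_sr(20)` ≈ 1.9 (with `sharpe_var=1`), which is why raw Sharpe alone cannot serve as a Phase 1 gate.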
- [ ] **Step 4: Run the test (must pass)**
Run: `uv run pytest tests/unit/test_metrics_dsr.py -v`
Expected: all 3 PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/metrics/dsr.py tests/unit/test_metrics_dsr.py
git commit -m "feat(metrics): Deflated Sharpe Ratio (Bailey & Lopez de Prado)"
```
---
## Task 9: Cerbero HTTP client
**Files:**
- Create: `src/multi_swarm/cerbero/__init__.py`
- Create: `src/multi_swarm/cerbero/client.py`
- Test: `tests/unit/test_cerbero_client.py`
- [ ] **Step 1: Write the failing test with `respx`** (the client uses httpx, which the `responses` library cannot intercept)
```python
# tests/unit/test_cerbero_client.py
import httpx
import pytest
import respx
from multi_swarm.cerbero.client import CerberoClient
@respx.mock
def test_call_tool_passes_bearer_and_bot_tag():
    route = respx.post("http://test:9000/mcp-deribit/tools/get_iv_rank").mock(
        return_value=httpx.Response(200, json={"iv_rank": 0.42})
    )
    client = CerberoClient(base_url="http://test:9000", token="tok-xyz", bot_tag="swarm-poc-phase1")
    result = client.call_tool("deribit", "get_iv_rank", {"symbol": "BTC-PERPETUAL"})
    assert result == {"iv_rank": 0.42}
    req = route.calls.last.request
    assert req.headers["Authorization"] == "Bearer tok-xyz"
    assert req.headers["X-Bot-Tag"] == "swarm-poc-phase1"
@respx.mock
def test_call_tool_raises_on_error():
    respx.post("http://test:9000/mcp-deribit/tools/get_iv_rank").mock(
        return_value=httpx.Response(400, json={"error": "bad"})
    )
    client = CerberoClient(base_url="http://test:9000", token="tok-xyz", bot_tag="swarm-poc-phase1")
    with pytest.raises(RuntimeError):
        client.call_tool("deribit", "get_iv_rank", {})
```
- [ ] **Step 2: Run test (must fail)**
Run: `uv run pytest tests/unit/test_cerbero_client.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement client**
```python
# src/multi_swarm/cerbero/__init__.py
```
```python
# src/multi_swarm/cerbero/client.py
from __future__ import annotations
from typing import Any
import httpx
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
class CerberoClient:
"""Client HTTP minimale verso Cerbero MCP unified server."""
def __init__(
self,
base_url: str,
token: str,
bot_tag: str,
timeout_seconds: float = 10.0,
):
self.base_url = base_url.rstrip("/")
self.token = token
self.bot_tag = bot_tag
self._client = httpx.Client(
timeout=timeout_seconds,
headers={
"Authorization": f"Bearer {token}",
"X-Bot-Tag": bot_tag,
"Content-Type": "application/json",
},
)
def close(self) -> None:
self._client.close()
def __enter__(self) -> CerberoClient:
return self
def __exit__(self, *exc: object) -> None:
self.close()
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=0.5, min=0.5, max=4.0),
retry=retry_if_exception_type(httpx.TransportError),
reraise=True,
)
def call_tool(self, exchange: str, tool: str, args: dict[str, Any]) -> Any:
url = f"{self.base_url}/mcp-{exchange}/tools/{tool}"
resp = self._client.post(url, json=args)
if resp.status_code >= 400:
raise RuntimeError(f"Cerbero {exchange}/{tool} returned {resp.status_code}: {resp.text}")
return resp.json()
```
- [ ] **Step 4: Run test (must pass)**
Run: `uv run pytest tests/unit/test_cerbero_client.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/cerbero/ tests/unit/test_cerbero_client.py
git commit -m "feat(cerbero): HTTP client with bearer + bot-tag + retry"
```
---
## Task 10: Cerbero tools wrapper (indicatori usati da Phase 1)
**Files:**
- Create: `src/multi_swarm/cerbero/tools.py`
- Test: `tests/unit/test_cerbero_tools.py`
In Phase 1, agents may request a limited subset of indicators: SMA, RSI, ATR, MACD (technical), realized_vol (volatility), funding_rate (microstructure). The wrapper exposes one Python function per indicator, hiding the HTTP details.
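The dispatch shape, sketched standalone with a stub in place of the real `CerberoClient` (illustrative only):

```python
from typing import Any

class StubClient:
    # stand-in for CerberoClient: echoes the call it would POST over HTTP
    def call_tool(self, exchange: str, tool: str, args: dict[str, Any]) -> Any:
        return {"exchange": exchange, "tool": tool, "args": args}

class Tools:
    # same shape as CerberoTools: one explicit method per whitelisted tool,
    # so agents never see URLs, headers, or retries
    def __init__(self, client: StubClient) -> None:
        self._client = client

    def sma(self, exchange: str, symbol: str, timeframe: str, length: int) -> Any:
        return self._client.call_tool(
            exchange, "sma", {"symbol": symbol, "timeframe": timeframe, "length": length}
        )

out = Tools(StubClient()).sma("bybit", "BTCUSDT", "1h", 20)
print(out["tool"], out["args"]["length"])  # sma 20
```

Explicit methods (rather than `__getattr__` magic) keep the whitelist auditable and make unknown tools fail loudly.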
- [ ] **Step 1: Write failing test**
```python
# tests/unit/test_cerbero_tools.py
import pytest
from multi_swarm.cerbero.tools import CerberoTools
def test_tools_dispatch_sma(mocker):
fake_client = mocker.MagicMock()
fake_client.call_tool.return_value = {"value": 100.0}
t = CerberoTools(fake_client)
out = t.sma(exchange="bybit", symbol="BTCUSDT", timeframe="1h", length=20)
fake_client.call_tool.assert_called_once_with(
"bybit", "sma", {"symbol": "BTCUSDT", "timeframe": "1h", "length": 20}
)
assert out == {"value": 100.0}
def test_tools_dispatch_rsi(mocker):
fake_client = mocker.MagicMock()
fake_client.call_tool.return_value = {"value": 55.0}
t = CerberoTools(fake_client)
out = t.rsi(exchange="bybit", symbol="BTCUSDT", timeframe="1h", length=14)
fake_client.call_tool.assert_called_once_with(
"bybit", "rsi", {"symbol": "BTCUSDT", "timeframe": "1h", "length": 14}
)
assert out == {"value": 55.0}
def test_tools_unknown_raises(mocker):
fake_client = mocker.MagicMock()
t = CerberoTools(fake_client)
with pytest.raises(AttributeError):
t.nonexistent_tool() # type: ignore[attr-defined]
```
- [ ] **Step 2: Run test (must fail)**
Run: `uv run pytest tests/unit/test_cerbero_tools.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement wrapper**
```python
# src/multi_swarm/cerbero/tools.py
from __future__ import annotations
from typing import Any
from .client import CerberoClient
class CerberoTools:
"""Sottoinsieme di tool MCP esposti agli agenti in Phase 1."""
def __init__(self, client: CerberoClient):
self._client = client
def sma(self, exchange: str, symbol: str, timeframe: str, length: int) -> Any:
return self._client.call_tool(
exchange, "sma", {"symbol": symbol, "timeframe": timeframe, "length": length}
)
def rsi(self, exchange: str, symbol: str, timeframe: str, length: int = 14) -> Any:
return self._client.call_tool(
exchange, "rsi", {"symbol": symbol, "timeframe": timeframe, "length": length}
)
def atr(self, exchange: str, symbol: str, timeframe: str, length: int = 14) -> Any:
return self._client.call_tool(
exchange, "atr", {"symbol": symbol, "timeframe": timeframe, "length": length}
)
def macd(self, exchange: str, symbol: str, timeframe: str, fast: int = 12, slow: int = 26, signal: int = 9) -> Any:
return self._client.call_tool(
exchange, "macd",
{"symbol": symbol, "timeframe": timeframe, "fast": fast, "slow": slow, "signal": signal},
)
def realized_vol(self, exchange: str, symbol: str, timeframe: str, window: int = 24) -> Any:
return self._client.call_tool(
exchange, "realized_vol",
{"symbol": symbol, "timeframe": timeframe, "window": window},
)
def funding_rate(self, exchange: str, symbol: str) -> Any:
return self._client.call_tool(exchange, "funding_rate", {"symbol": symbol})
```
- [ ] **Step 4: Run test (must pass)**
Run: `uv run pytest tests/unit/test_cerbero_tools.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/cerbero/tools.py tests/unit/test_cerbero_tools.py
git commit -m "feat(cerbero): tools wrapper for Phase 1 indicator subset"
```
---
## Task 11: S-expression protocol — grammar and parser
**Files:**
- Create: `src/multi_swarm/protocol/__init__.py`
- Create: `src/multi_swarm/protocol/grammar.py`
- Create: `src/multi_swarm/protocol/parser.py`
- Test: `tests/unit/test_protocol_parser.py`
**Phase 1 grammar (15 verbs)**: `entry-long`, `entry-short`, `exit`, `flat`, `when`, `and`, `or`, `not`, `gt`, `lt`, `eq`, `feature`, `indicator`, `crossover`, `crossunder`.
Example strategy:
```lisp
(strategy
(when (and (gt (indicator rsi 14) 70.0)
(crossunder (feature close) (indicator sma 20)))
(entry-short))
(when (lt (indicator rsi 14) 30.0)
(entry-long))
(when (eq (indicator rsi 14) 50.0)
(exit)))
```
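For intuition on what the parser receives, here is a stdlib-only s-expression reader that produces the same nesting `sexpdata.loads` gives (symbols as strings, numbers as floats); an illustrative sketch, not part of the task:

```python
def read_sexp(src: str):
    # tokenize on parentheses, then build nested lists recursively
    tokens = src.replace("(", " ( ").replace(")", " ) ").split()

    def parse(pos: int):
        if tokens[pos] == "(":
            out, pos = [], pos + 1
            while tokens[pos] != ")":
                node, pos = parse(pos)
                out.append(node)
            return out, pos + 1  # skip the closing paren
        try:
            return float(tokens[pos]), pos + 1
        except ValueError:
            return tokens[pos], pos + 1  # bare symbol

    node, _ = parse(0)
    return node

print(read_sexp("(when (gt (indicator rsi 14) 70.0) (entry-short))"))
# ['when', ['gt', ['indicator', 'rsi', 14.0], 70.0], ['entry-short']]
```

The real parser then walks this nesting, rejecting heads that are not in `VERBS`.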
- [ ] **Step 1: Write failing test**
```python
# tests/unit/test_protocol_parser.py
import pytest
from multi_swarm.protocol.parser import parse_strategy, ParseError
from multi_swarm.protocol.grammar import VERBS
def test_grammar_has_15_verbs():
assert len(VERBS) == 15
def test_parse_simple_strategy():
src = "(strategy (when (gt (indicator rsi 14) 70.0) (entry-short)))"
ast = parse_strategy(src)
assert ast.kind == "strategy"
assert len(ast.rules) == 1
rule = ast.rules[0]
assert rule.kind == "when"
assert rule.condition.kind == "gt"
assert rule.action.kind == "entry-short"
def test_parse_multiple_rules():
src = """
(strategy
(when (gt (indicator rsi 14) 70.0) (entry-short))
(when (lt (indicator rsi 14) 30.0) (entry-long)))
"""
ast = parse_strategy(src)
assert len(ast.rules) == 2
def test_parse_unknown_verb_raises():
src = "(strategy (when (frobnicate 1 2) (entry-long)))"
with pytest.raises(ParseError):
parse_strategy(src)
def test_parse_malformed_raises():
src = "(strategy (when"
with pytest.raises(ParseError):
parse_strategy(src)
def test_parse_empty_strategy_raises():
src = "(strategy)"
with pytest.raises(ParseError):
parse_strategy(src)
```
- [ ] **Step 2: Run test (must fail)**
Run: `uv run pytest tests/unit/test_protocol_parser.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement grammar and parser**
```python
# src/multi_swarm/protocol/__init__.py
```
```python
# src/multi_swarm/protocol/grammar.py
from __future__ import annotations
VERBS: frozenset[str] = frozenset(
{
"entry-long",
"entry-short",
"exit",
"flat",
"when",
"and",
"or",
"not",
"gt",
"lt",
"eq",
"feature",
"indicator",
"crossover",
"crossunder",
}
)
ACTION_VERBS: frozenset[str] = frozenset({"entry-long", "entry-short", "exit", "flat"})
LOGICAL_VERBS: frozenset[str] = frozenset({"and", "or", "not"})
COMPARATOR_VERBS: frozenset[str] = frozenset({"gt", "lt", "eq"})
DATA_VERBS: frozenset[str] = frozenset({"feature", "indicator", "crossover", "crossunder"})
```
```python
# src/multi_swarm/protocol/parser.py
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
import sexpdata
from .grammar import (
ACTION_VERBS,
COMPARATOR_VERBS,
DATA_VERBS,
LOGICAL_VERBS,
VERBS,
)
class ParseError(Exception):
pass
@dataclass
class Node:
kind: str
args: list[Any] = field(default_factory=list)
@dataclass
class Rule:
kind: str # "when"
condition: Node
action: Node
@dataclass
class Strategy:
kind: str # "strategy"
rules: list[Rule]
def _to_node(token: Any) -> Node | float | int | str:
if isinstance(token, sexpdata.Symbol):
name = token.value()
return Node(kind=name, args=[])
if isinstance(token, list):
if not token:
raise ParseError("Empty s-expression")
head = token[0]
if not isinstance(head, sexpdata.Symbol):
raise ParseError(f"Non-symbol head: {head!r}")
name = head.value()
if name not in VERBS and name != "strategy":
raise ParseError(f"Unknown verb: {name}")
return Node(kind=name, args=[_to_node(arg) for arg in token[1:]])
return token
def parse_strategy(src: str) -> Strategy:
try:
parsed = sexpdata.loads(src)
except Exception as e:
raise ParseError(f"sexp parse error: {e}") from e
if not isinstance(parsed, list) or not parsed:
raise ParseError("Top-level must be (strategy ...)")
head = parsed[0]
if not isinstance(head, sexpdata.Symbol) or head.value() != "strategy":
raise ParseError("Top-level must start with 'strategy'")
raw_rules = parsed[1:]
if not raw_rules:
raise ParseError("Strategy must contain at least one rule")
rules: list[Rule] = []
for raw in raw_rules:
if not isinstance(raw, list) or len(raw) != 3:
raise ParseError(f"Rule must be (when <cond> <action>): {raw!r}")
head_r = raw[0]
if not isinstance(head_r, sexpdata.Symbol) or head_r.value() != "when":
raise ParseError(f"Rule must start with 'when': {raw!r}")
cond = _to_node(raw[1])
action = _to_node(raw[2])
if not isinstance(cond, Node):
raise ParseError(f"Condition must be a node: {cond!r}")
if not isinstance(action, Node):
raise ParseError(f"Action must be a node: {action!r}")
if action.kind not in ACTION_VERBS:
raise ParseError(f"Action must be one of {ACTION_VERBS}, got {action.kind}")
rules.append(Rule(kind="when", condition=cond, action=action))
return Strategy(kind="strategy", rules=rules)
```
- [ ] **Step 4: Run test (must pass)**
Run: `uv run pytest tests/unit/test_protocol_parser.py -v`
Expected: all 6 PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/protocol/ tests/unit/test_protocol_parser.py
git commit -m "feat(protocol): S-expression grammar (15 verbs) + parser"
```
---
## Task 12: Protocol — semantic validator
**Files:**
- Create: `src/multi_swarm/protocol/validator.py`
- Test: `tests/unit/test_protocol_validator.py`
The validator checks that verb arguments have the correct types (e.g. `gt` requires 2 numeric expressions; `indicator` requires a valid name plus an integer length).
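The shape of those arity checks, sketched standalone on a toy list-based AST (illustrative only):

```python
# arity table for a toy subset of the grammar; AST nodes are plain lists
ARITY = {"not": 1, "gt": 2, "lt": 2, "eq": 2, "feature": 1, "crossover": 2}

def check(node: list) -> None:
    head, *args = node
    if head in ARITY and len(args) != ARITY[head]:
        raise ValueError(f"'{head}' needs {ARITY[head]} args, got {len(args)}")
    for a in args:
        if isinstance(a, list):  # recurse into sub-expressions only
            check(a)

check(["gt", ["feature", "close"], 70.0])  # passes silently
try:
    check(["gt", 1.0])
except ValueError as err:
    print("rejected:", err)
```

The real validator adds name whitelists (`KNOWN_INDICATORS`, `KNOWN_FEATURES`) and a boolean/numeric context distinction on top of this.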
- [ ] **Step 1: Write failing test**
```python
# tests/unit/test_protocol_validator.py
import pytest
from multi_swarm.protocol.parser import parse_strategy
from multi_swarm.protocol.validator import validate_strategy, ValidationError
def test_valid_strategy_passes():
src = "(strategy (when (gt (indicator rsi 14) 70.0) (entry-short)))"
ast = parse_strategy(src)
validate_strategy(ast) # no exception
def test_indicator_unknown_name_fails():
src = "(strategy (when (gt (indicator wibble 14) 70.0) (entry-short)))"
ast = parse_strategy(src)
with pytest.raises(ValidationError, match="unknown indicator"):
validate_strategy(ast)
def test_indicator_wrong_arity_fails():
src = "(strategy (when (gt (indicator rsi) 70.0) (entry-short)))"
ast = parse_strategy(src)
with pytest.raises(ValidationError):
validate_strategy(ast)
def test_comparator_wrong_arity_fails():
src = "(strategy (when (gt 1.0) (entry-long)))"
ast = parse_strategy(src)
with pytest.raises(ValidationError):
validate_strategy(ast)
def test_feature_unknown_column_fails():
src = "(strategy (when (gt (feature wibble) 100.0) (entry-long)))"
ast = parse_strategy(src)
with pytest.raises(ValidationError, match="unknown feature"):
validate_strategy(ast)
```
- [ ] **Step 2: Run test (must fail)**
Run: `uv run pytest tests/unit/test_protocol_validator.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement validator**
```python
# src/multi_swarm/protocol/validator.py
from __future__ import annotations
from .parser import Node, Rule, Strategy
from .grammar import COMPARATOR_VERBS, LOGICAL_VERBS
KNOWN_INDICATORS: frozenset[str] = frozenset({"sma", "rsi", "atr", "macd", "realized_vol"})
KNOWN_FEATURES: frozenset[str] = frozenset({"open", "high", "low", "close", "volume"})
class ValidationError(Exception):
pass
def validate_strategy(strategy: Strategy) -> None:
for rule in strategy.rules:
_validate_node(rule.condition, expect_bool=True)
def _validate_node(node: Node, expect_bool: bool) -> None:
if node.kind in LOGICAL_VERBS:
if node.kind == "not":
if len(node.args) != 1:
raise ValidationError(f"'not' needs 1 arg, got {len(node.args)}")
_validate_node(node.args[0], expect_bool=True)
else:
if len(node.args) < 2:
raise ValidationError(f"'{node.kind}' needs >=2 args")
for a in node.args:
_validate_node(a, expect_bool=True)
return
if node.kind in COMPARATOR_VERBS:
if len(node.args) != 2:
raise ValidationError(f"'{node.kind}' needs 2 args, got {len(node.args)}")
for a in node.args:
if isinstance(a, Node):
_validate_node(a, expect_bool=False)
return
if node.kind in {"crossover", "crossunder"}:
if len(node.args) != 2:
raise ValidationError(f"'{node.kind}' needs 2 args")
for a in node.args:
if isinstance(a, Node):
_validate_node(a, expect_bool=False)
return
if node.kind == "indicator":
if len(node.args) < 2:
raise ValidationError(f"'indicator' needs >=2 args (name, length)")
name_node = node.args[0]
if isinstance(name_node, Node):
ind_name = name_node.kind
else:
ind_name = str(name_node)
if ind_name not in KNOWN_INDICATORS:
raise ValidationError(f"unknown indicator: {ind_name}")
return
if node.kind == "feature":
if len(node.args) != 1:
raise ValidationError(f"'feature' needs 1 arg")
feat_node = node.args[0]
if isinstance(feat_node, Node):
feat_name = feat_node.kind
else:
feat_name = str(feat_node)
if feat_name not in KNOWN_FEATURES:
raise ValidationError(f"unknown feature: {feat_name}")
return
raise ValidationError(f"unexpected node kind in expression: {node.kind}")
```
- [ ] **Step 4: Run test (must pass)**
Run: `uv run pytest tests/unit/test_protocol_validator.py -v`
Expected: all 5 PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/protocol/validator.py tests/unit/test_protocol_validator.py
git commit -m "feat(protocol): semantic validator for AST"
```
---
## Task 13: Protocol — AST → callable-rules compiler
**Files:**
- Create: `src/multi_swarm/protocol/compiler.py`
- Test: `tests/unit/test_protocol_compiler.py`
The compiler turns the AST into a function `(ohlcv_window: pd.DataFrame) -> Side` that, given a market snapshot, returns the position decision. Indicators are computed by a built-in local library (no Cerbero in the compiler: Cerbero is called by the agents for inspection, not by the compiler, which must be fast and deterministic).
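The first-match-wins rule semantics can be sketched in plain Python; this is the scalar, per-bar analogue of the masked vectorized assignment the compiler performs (illustrative only):

```python
FLAT, LONG, SHORT = "flat", "long", "short"

def apply_rules(closes, rules):
    # rules: ordered list of (predicate, side); the first matching rule wins
    # per bar, default FLAT -- mirroring the compiler's `already_set` mask
    out = []
    for c in closes:
        side = FLAT
        for pred, target in rules:
            if pred(c):
                side = target
                break
        out.append(side)
    return out

rules = [
    (lambda c: c > 110.0, LONG),   # rule 1: checked first
    (lambda c: c < 105.0, SHORT),  # rule 2: only if rule 1 did not match
]
print(apply_rules([100.0, 107.0, 120.0], rules))  # ['short', 'flat', 'long']
```

The compiled function produces the same per-timestamp decisions, but as one boolean-masked pass per rule over the whole DataFrame.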
- [ ] **Step 1: Write failing test**
```python
# tests/unit/test_protocol_compiler.py
import numpy as np
import pandas as pd
import pytest
from multi_swarm.protocol.parser import parse_strategy
from multi_swarm.protocol.compiler import compile_strategy
from multi_swarm.backtest.orders import Side
@pytest.fixture
def ohlcv():
idx = pd.date_range("2024-01-01", periods=200, freq="1h", tz="UTC")
close = np.linspace(100, 120, 200)
return pd.DataFrame(
{"open": close, "high": close + 0.5, "low": close - 0.5, "close": close, "volume": 1.0},
index=idx,
)
def test_compile_simple_long(ohlcv):
    # RSI is bounded above by 100, so with threshold 101 the condition
    # is always true once the indicator is defined
    src = "(strategy (when (lt (indicator rsi 14) 101.0) (entry-long)))"
    ast = parse_strategy(src)
    fn = compile_strategy(ast)
    signals = fn(ohlcv)
    assert isinstance(signals, pd.Series)
    assert (signals == Side.LONG).all()
def test_compile_no_match_is_flat(ohlcv):
src = "(strategy (when (gt (indicator rsi 14) 1000.0) (entry-long)))"
ast = parse_strategy(src)
fn = compile_strategy(ast)
signals = fn(ohlcv)
assert (signals == Side.FLAT).any()
def test_compile_two_rules_priority(ohlcv):
src = """
(strategy
(when (gt (feature close) 110.0) (entry-long))
(when (lt (feature close) 105.0) (entry-short)))
"""
ast = parse_strategy(src)
fn = compile_strategy(ast)
signals = fn(ohlcv)
last = signals.iloc[-1]
    assert last == Side.LONG  # final close is 120, rule 1 matches
```
- [ ] **Step 2: Run test (must fail)**
Run: `uv run pytest tests/unit/test_protocol_compiler.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement compiler**
```python
# src/multi_swarm/protocol/compiler.py
from __future__ import annotations
from typing import Callable
import numpy as np
import pandas as pd
from ..backtest.orders import Side
from .parser import Node, Strategy
def _sma(s: pd.Series, length: int) -> pd.Series:
return s.rolling(length, min_periods=1).mean()
def _rsi(s: pd.Series, length: int) -> pd.Series:
    delta = s.diff()
    up = delta.clip(lower=0)
    down = -delta.clip(upper=0)
    roll_up = up.ewm(alpha=1.0 / length, adjust=False).mean()
    roll_down = down.ewm(alpha=1.0 / length, adjust=False).mean()
    rs = roll_up / roll_down.replace(0, np.nan)
    rsi = 100 - (100 / (1 + rs))
    # All-gain windows make roll_down 0 and rs undefined: by convention RSI
    # saturates at 100 there. The leading bar (no diff yet) is neutral 50.
    return rsi.where(roll_down != 0, 100.0).fillna(50.0)
def _atr(df: pd.DataFrame, length: int) -> pd.Series:
h_l = df["high"] - df["low"]
h_c = (df["high"] - df["close"].shift()).abs()
l_c = (df["low"] - df["close"].shift()).abs()
tr = pd.concat([h_l, h_c, l_c], axis=1).max(axis=1)
return tr.ewm(alpha=1.0 / length, adjust=False).mean()
def _realized_vol(s: pd.Series, window: int) -> pd.Series:
returns = s.pct_change()
return returns.rolling(window, min_periods=1).std() * np.sqrt(window)
INDICATOR_FNS: dict[str, Callable[..., pd.Series]] = {
"sma": lambda df, length: _sma(df["close"], length),
"rsi": lambda df, length: _rsi(df["close"], length),
"atr": lambda df, length: _atr(df, length),
"realized_vol": lambda df, length: _realized_vol(df["close"], length),
"macd": lambda df, fast=12, slow=26: (
_sma(df["close"], fast) - _sma(df["close"], slow)
),
}
def _eval_node(node: Node, df: pd.DataFrame) -> pd.Series:
if node.kind == "feature":
feat = node.args[0]
feat_name = feat.kind if isinstance(feat, Node) else str(feat)
return df[feat_name]
if node.kind == "indicator":
name_node = node.args[0]
ind_name = name_node.kind if isinstance(name_node, Node) else str(name_node)
params = [a for a in node.args[1:] if not isinstance(a, Node)]
return INDICATOR_FNS[ind_name](df, *params)
if node.kind == "gt":
a = _eval_node(node.args[0], df) if isinstance(node.args[0], Node) else _to_series(node.args[0], df)
b = _eval_node(node.args[1], df) if isinstance(node.args[1], Node) else _to_series(node.args[1], df)
return (a > b).astype(bool)
if node.kind == "lt":
a = _eval_node(node.args[0], df) if isinstance(node.args[0], Node) else _to_series(node.args[0], df)
b = _eval_node(node.args[1], df) if isinstance(node.args[1], Node) else _to_series(node.args[1], df)
return (a < b).astype(bool)
if node.kind == "eq":
a = _eval_node(node.args[0], df) if isinstance(node.args[0], Node) else _to_series(node.args[0], df)
b = _eval_node(node.args[1], df) if isinstance(node.args[1], Node) else _to_series(node.args[1], df)
return (a == b).astype(bool)
if node.kind == "and":
result = pd.Series(True, index=df.index)
for a in node.args:
s = _eval_node(a, df) if isinstance(a, Node) else pd.Series(bool(a), index=df.index)
result &= s.fillna(False).astype(bool)
return result
if node.kind == "or":
result = pd.Series(False, index=df.index)
for a in node.args:
s = _eval_node(a, df) if isinstance(a, Node) else pd.Series(bool(a), index=df.index)
result |= s.fillna(False).astype(bool)
return result
if node.kind == "not":
a = node.args[0]
s = _eval_node(a, df) if isinstance(a, Node) else pd.Series(bool(a), index=df.index)
return (~s.fillna(False).astype(bool))
if node.kind == "crossover":
a = _eval_node(node.args[0], df) if isinstance(node.args[0], Node) else _to_series(node.args[0], df)
b = _eval_node(node.args[1], df) if isinstance(node.args[1], Node) else _to_series(node.args[1], df)
return ((a > b) & (a.shift() <= b.shift())).fillna(False).astype(bool)
if node.kind == "crossunder":
a = _eval_node(node.args[0], df) if isinstance(node.args[0], Node) else _to_series(node.args[0], df)
b = _eval_node(node.args[1], df) if isinstance(node.args[1], Node) else _to_series(node.args[1], df)
return ((a < b) & (a.shift() >= b.shift())).fillna(False).astype(bool)
raise RuntimeError(f"unsupported node in compiler: {node.kind}")
def _to_series(value: object, df: pd.DataFrame) -> pd.Series:
return pd.Series(float(value), index=df.index) # type: ignore[arg-type]
def _action_to_side(action: Node) -> Side:
return {
"entry-long": Side.LONG,
"entry-short": Side.SHORT,
"exit": Side.FLAT,
"flat": Side.FLAT,
}[action.kind]
def compile_strategy(strategy: Strategy) -> Callable[[pd.DataFrame], pd.Series]:
"""Compila la strategy in una funzione df → Series[Side].
Le regole sono valutate in ordine; la prima che matcha vince per ogni timestamp.
Default Side.FLAT se nessuna regola matcha.
"""
def fn(df: pd.DataFrame) -> pd.Series:
result = pd.Series(Side.FLAT, index=df.index, dtype=object)
already_set = pd.Series(False, index=df.index)
for rule in strategy.rules:
match = _eval_node(rule.condition, df)
target = _action_to_side(rule.action)
apply_mask = match & ~already_set
result[apply_mask] = target
already_set |= apply_mask
return result
return fn
```
- [ ] **Step 4: Run test (must pass)**
Run: `uv run pytest tests/unit/test_protocol_compiler.py -v`
Expected: all 3 PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/protocol/compiler.py tests/unit/test_protocol_compiler.py
git commit -m "feat(protocol): AST compiler to (df -> Series[Side]) signal fn"
```
---
## Task 14: Genome dataclass + serialization
**Files:**
- Create: `src/multi_swarm/genome/__init__.py`
- Create: `src/multi_swarm/genome/hypothesis.py`
- Test: `tests/unit/test_genome_hypothesis.py`
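The genome id is content-addressed; the idea in isolation, with stdlib hashing and hypothetical field names (illustrative only):

```python
import hashlib
import json

def genome_id(payload: dict) -> str:
    # canonical JSON (sorted keys) -> first 16 hex chars of SHA-1,
    # so equal content always yields the same id, regardless of key order
    s = json.dumps(payload, sort_keys=True)
    return hashlib.sha1(s.encode()).hexdigest()[:16]

a = genome_id({"system_prompt": "X", "temperature": 0.5})
b = genome_id({"temperature": 0.5, "system_prompt": "X"})  # same content, other order
print(a == b, len(a))  # True 16
```

Content addressing makes deduplication and lineage tracking trivial: two genomes with identical fields collapse to one id.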
- [ ] **Step 1: Write failing test**
```python
# tests/unit/test_genome_hypothesis.py
from multi_swarm.genome.hypothesis import HypothesisAgentGenome, ModelTier
def test_genome_creation_defaults():
g = HypothesisAgentGenome(
system_prompt="Pensa come un fisico.",
feature_access=["close", "volume"],
temperature=0.9,
top_p=0.95,
model_tier=ModelTier.C,
lookback_window=200,
cognitive_style="physicist",
)
assert g.id is not None
assert g.parent_ids == []
assert g.generation == 0
def test_genome_serialization_roundtrip():
g = HypothesisAgentGenome(
system_prompt="Pensa come un biologo.",
feature_access=["close", "high", "low"],
temperature=1.1,
top_p=0.9,
model_tier=ModelTier.C,
lookback_window=300,
cognitive_style="biologist",
parent_ids=["abc"],
generation=5,
)
payload = g.to_dict()
g2 = HypothesisAgentGenome.from_dict(payload)
assert g2.system_prompt == g.system_prompt
assert g2.feature_access == g.feature_access
assert g2.temperature == g.temperature
assert g2.parent_ids == g.parent_ids
assert g2.generation == g.generation
assert g2.id == g.id
def test_genome_id_is_deterministic_on_content():
g1 = HypothesisAgentGenome(
system_prompt="X", feature_access=["close"], temperature=0.5,
top_p=0.9, model_tier=ModelTier.C, lookback_window=100, cognitive_style="x",
)
g2 = HypothesisAgentGenome(
system_prompt="X", feature_access=["close"], temperature=0.5,
top_p=0.9, model_tier=ModelTier.C, lookback_window=100, cognitive_style="x",
)
assert g1.id == g2.id
```
- [ ] **Step 2: Run test (must fail)**
Run: `uv run pytest tests/unit/test_genome_hypothesis.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement genome**
```python
# src/multi_swarm/genome/__init__.py
```
```python
# src/multi_swarm/genome/hypothesis.py
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
class ModelTier(str, Enum):
B = "B" # Sonnet 4.6 via Anthropic
C = "C" # Qwen 2.5 72B via OpenRouter
@dataclass
class HypothesisAgentGenome:
system_prompt: str
feature_access: list[str]
temperature: float
top_p: float
model_tier: ModelTier
lookback_window: int
cognitive_style: str
parent_ids: list[str] = field(default_factory=list)
generation: int = 0
id: str = ""
def __post_init__(self) -> None:
if not self.id:
self.id = self._compute_id()
def _compute_id(self) -> str:
payload = {
"system_prompt": self.system_prompt,
"feature_access": sorted(self.feature_access),
"temperature": round(self.temperature, 4),
"top_p": round(self.top_p, 4),
"model_tier": self.model_tier.value,
"lookback_window": self.lookback_window,
"cognitive_style": self.cognitive_style,
}
s = json.dumps(payload, sort_keys=True)
return hashlib.sha1(s.encode()).hexdigest()[:16]
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"system_prompt": self.system_prompt,
"feature_access": self.feature_access,
"temperature": self.temperature,
"top_p": self.top_p,
"model_tier": self.model_tier.value,
"lookback_window": self.lookback_window,
"cognitive_style": self.cognitive_style,
"parent_ids": self.parent_ids,
"generation": self.generation,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> HypothesisAgentGenome:
return cls(
system_prompt=data["system_prompt"],
feature_access=list(data["feature_access"]),
temperature=float(data["temperature"]),
top_p=float(data["top_p"]),
model_tier=ModelTier(data["model_tier"]),
lookback_window=int(data["lookback_window"]),
cognitive_style=data["cognitive_style"],
parent_ids=list(data.get("parent_ids", [])),
generation=int(data.get("generation", 0)),
id=data.get("id", ""),
)
```
- [ ] **Step 4: Run test (must pass)**
Run: `uv run pytest tests/unit/test_genome_hypothesis.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/genome/ tests/unit/test_genome_hypothesis.py
git commit -m "feat(genome): HypothesisAgentGenome with deterministic id and serde"
```
---
## Task 15: Genome — mutation operators
**Files:**
- Create: `src/multi_swarm/genome/mutation.py`
- Test: `tests/unit/test_genome_mutation.py`
Mutation operators (one chosen at random per mutation):
1. `mutate_temperature`: ±0.1, clipped to [0.6, 1.3].
2. `mutate_lookback`: ±50 bars, clipped to [50, 500].
3. `mutate_feature_access`: add/remove one feature from a fixed pool.
4. `mutate_cognitive_style`: switch within a fixed pool of 6 styles.
5. `mutate_prompt_chunk`: the LLM rewrites part of the system_prompt (handled elsewhere; skipped for now, placeholder only).
In Phase 1 we mutate only the numeric/discrete fields deterministically. LLM prompt mutations are delegated to the `agents` module when the "mutator agent" is invoked.
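The clip-and-step pattern shared by the numeric operators, in isolation (illustrative only):

```python
import random

def mutate_clipped(value: float, delta: float, lo: float, hi: float, rng: random.Random) -> float:
    # one mutation step: +/-delta, clipped into [lo, hi]
    return max(lo, min(hi, value + rng.choice([-delta, delta])))

# repeated mutation stays inside the bounds by construction
rng = random.Random(0)
temps = [0.9]
for _ in range(20):
    temps.append(mutate_clipped(temps[-1], 0.1, 0.6, 1.3, rng))
print(all(0.6 <= t <= 1.3 for t in temps))  # True
```

Passing the `random.Random` instance explicitly (instead of the module-level functions) is what makes the tests below seedable and deterministic.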
- [ ] **Step 1: Write failing test**
```python
# tests/unit/test_genome_mutation.py
import random
import pytest
from multi_swarm.genome.hypothesis import HypothesisAgentGenome, ModelTier
from multi_swarm.genome.mutation import (
mutate_temperature,
mutate_lookback,
mutate_feature_access,
mutate_cognitive_style,
FEATURE_POOL,
COGNITIVE_STYLES,
)
@pytest.fixture
def base_genome():
return HypothesisAgentGenome(
system_prompt="x",
feature_access=["close"],
temperature=0.9,
top_p=0.95,
model_tier=ModelTier.C,
lookback_window=200,
cognitive_style="physicist",
)
def test_mutate_temperature_within_bounds(base_genome):
rng = random.Random(0)
for _ in range(50):
new = mutate_temperature(base_genome, rng)
assert 0.6 <= new.temperature <= 1.3
def test_mutate_lookback_within_bounds(base_genome):
rng = random.Random(0)
for _ in range(50):
new = mutate_lookback(base_genome, rng)
assert 50 <= new.lookback_window <= 500
def test_mutate_feature_access_changes_set(base_genome):
rng = random.Random(0)
new = mutate_feature_access(base_genome, rng)
assert set(new.feature_access) != set(base_genome.feature_access) or len(FEATURE_POOL) == 1
assert all(f in FEATURE_POOL for f in new.feature_access)
assert len(new.feature_access) >= 1
def test_mutate_cognitive_style_uses_pool(base_genome):
rng = random.Random(0)
new = mutate_cognitive_style(base_genome, rng)
assert new.cognitive_style in COGNITIVE_STYLES
def test_mutation_preserves_lineage(base_genome):
rng = random.Random(0)
new = mutate_temperature(base_genome, rng)
assert base_genome.id in new.parent_ids
assert new.id != base_genome.id
```
- [ ] **Step 2: Run test (must fail)**
Run: `uv run pytest tests/unit/test_genome_mutation.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement mutations**
```python
# src/multi_swarm/genome/mutation.py
from __future__ import annotations
import random
from .hypothesis import HypothesisAgentGenome, ModelTier
FEATURE_POOL: tuple[str, ...] = ("open", "high", "low", "close", "volume")
COGNITIVE_STYLES: tuple[str, ...] = (
"physicist", "biologist", "historian", "meteorologist",
"ecologist", "engineer",
)
def _clone_with(g: HypothesisAgentGenome, **overrides: object) -> HypothesisAgentGenome:
payload = g.to_dict()
payload.update(overrides) # type: ignore[arg-type]
payload.pop("id", None)
payload["parent_ids"] = list(g.parent_ids) + [g.id]
payload["generation"] = g.generation + 1
return HypothesisAgentGenome.from_dict(payload)
def mutate_temperature(g: HypothesisAgentGenome, rng: random.Random) -> HypothesisAgentGenome:
delta = rng.choice([-0.1, 0.1])
new_t = max(0.6, min(1.3, g.temperature + delta))
return _clone_with(g, temperature=round(new_t, 4))
def mutate_lookback(g: HypothesisAgentGenome, rng: random.Random) -> HypothesisAgentGenome:
delta = rng.choice([-50, 50])
new_lb = max(50, min(500, g.lookback_window + delta))
return _clone_with(g, lookback_window=new_lb)
def mutate_feature_access(g: HypothesisAgentGenome, rng: random.Random) -> HypothesisAgentGenome:
    current = set(g.feature_access)
    if len(current) == len(FEATURE_POOL):
        op = "remove"
    elif len(current) <= 1:
        # removing from a single-feature set would be a no-op clone;
        # force an add so the mutation always changes the set
        op = "add"
    else:
        op = rng.choice(["add", "remove"])
    if op == "add":
        candidates = [f for f in FEATURE_POOL if f not in current]
        choice = rng.choice(candidates)
        new_set = current | {choice}
    else:
        choice = rng.choice(sorted(current))
        new_set = current - {choice}
    return _clone_with(g, feature_access=sorted(new_set))
def mutate_cognitive_style(g: HypothesisAgentGenome, rng: random.Random) -> HypothesisAgentGenome:
candidates = [s for s in COGNITIVE_STYLES if s != g.cognitive_style]
new_style = rng.choice(candidates)
return _clone_with(g, cognitive_style=new_style)
MUTATION_OPS = (mutate_temperature, mutate_lookback, mutate_feature_access, mutate_cognitive_style)
def random_mutate(g: HypothesisAgentGenome, rng: random.Random) -> HypothesisAgentGenome:
op = rng.choice(MUTATION_OPS)
return op(g, rng)
```
- [ ] **Step 4: Run test (must pass)**
Run: `uv run pytest tests/unit/test_genome_mutation.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/genome/mutation.py tests/unit/test_genome_mutation.py
git commit -m "feat(genome): deterministic mutation operators (numeric + categorical)"
```
---
## Task 16: Genome — crossover
**Files:**
- Create: `src/multi_swarm/genome/crossover.py`
- Test: `tests/unit/test_genome_crossover.py`
Uniform crossover: for each field, take the value from parent1 or parent2 with prob 0.5. system_prompt is inherited whole (no merging in Phase 1).
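The field-wise coin flip, sketched on plain dicts (illustrative only):

```python
import random

def uniform_crossover(p1: dict, p2: dict, rng: random.Random) -> dict:
    # each field inherited wholesale from one parent with prob 0.5
    return {k: (p1 if rng.random() < 0.5 else p2)[k] for k in p1}

a = {"temperature": 0.7, "lookback_window": 100, "cognitive_style": "physicist"}
b = {"temperature": 1.1, "lookback_window": 300, "cognitive_style": "biologist"}
child = uniform_crossover(a, b, random.Random(42))
print(all(child[k] in (a[k], b[k]) for k in a))  # True
```

Note that one rng draw per field keeps the operator deterministic under a fixed seed, which the tests below rely on.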
- [ ] **Step 1: Write failing test**
```python
# tests/unit/test_genome_crossover.py
import random
from multi_swarm.genome.hypothesis import HypothesisAgentGenome, ModelTier
from multi_swarm.genome.crossover import uniform_crossover
def make(name: str) -> HypothesisAgentGenome:
return HypothesisAgentGenome(
system_prompt=f"prompt-{name}",
feature_access=["close"] if name == "A" else ["close", "volume"],
temperature=0.7 if name == "A" else 1.1,
top_p=0.9,
model_tier=ModelTier.C,
lookback_window=100 if name == "A" else 300,
cognitive_style="physicist" if name == "A" else "biologist",
)
def test_crossover_lineage():
p1 = make("A")
p2 = make("B")
rng = random.Random(0)
child = uniform_crossover(p1, p2, rng)
assert sorted(child.parent_ids[-2:]) == sorted([p1.id, p2.id])
assert child.generation == max(p1.generation, p2.generation) + 1
def test_crossover_inherits_each_field_from_one_parent():
p1 = make("A")
p2 = make("B")
rng = random.Random(0)
child = uniform_crossover(p1, p2, rng)
assert child.system_prompt in (p1.system_prompt, p2.system_prompt)
assert child.temperature in (p1.temperature, p2.temperature)
assert child.lookback_window in (p1.lookback_window, p2.lookback_window)
assert child.cognitive_style in (p1.cognitive_style, p2.cognitive_style)
def test_crossover_deterministic_with_same_seed():
p1 = make("A")
p2 = make("B")
c1 = uniform_crossover(p1, p2, random.Random(42))
c2 = uniform_crossover(p1, p2, random.Random(42))
assert c1.to_dict() == c2.to_dict()
```
- [ ] **Step 2: Run test (must fail)**
Run: `uv run pytest tests/unit/test_genome_crossover.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement crossover**
```python
# src/multi_swarm/genome/crossover.py
from __future__ import annotations
import random
from .hypothesis import HypothesisAgentGenome
def uniform_crossover(
p1: HypothesisAgentGenome,
p2: HypothesisAgentGenome,
rng: random.Random,
) -> HypothesisAgentGenome:
"""Per ogni campo, eredita da p1 (prob 0.5) o p2."""
def pick(field: str) -> object:
return getattr(p1 if rng.random() < 0.5 else p2, field)
    tier = pick("model_tier")  # draw once so the tier comes consistently from one parent
    payload = {
        "system_prompt": pick("system_prompt"),
        "feature_access": list(pick("feature_access")),  # type: ignore[arg-type]
        "temperature": pick("temperature"),
        "top_p": pick("top_p"),
        "model_tier": tier.value if hasattr(tier, "value") else tier,
"lookback_window": pick("lookback_window"),
"cognitive_style": pick("cognitive_style"),
"parent_ids": [p1.id, p2.id],
"generation": max(p1.generation, p2.generation) + 1,
}
return HypothesisAgentGenome.from_dict(payload)
```
- [ ] **Step 4: Run test (must pass)**
Run: `uv run pytest tests/unit/test_genome_crossover.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/genome/crossover.py tests/unit/test_genome_crossover.py
git commit -m "feat(genome): uniform crossover for hypothesis genomes"
```
---
## Task 17: LLM client (OpenRouter Qwen + Anthropic Sonnet)
**Files:**
- Create: `src/multi_swarm/llm/__init__.py`
- Create: `src/multi_swarm/llm/client.py`
- Test: `tests/unit/test_llm_client.py`
Unified wrapper: `LLMClient.complete(genome, system, user) -> CompletionResult`. The tier is chosen from `genome.model_tier`. Tier C uses the OpenAI SDK with base_url pointed at OpenRouter; tier B uses the anthropic SDK.
- [ ] **Step 1: Write a failing test**
```python
# tests/unit/test_llm_client.py
from multi_swarm.genome.hypothesis import HypothesisAgentGenome, ModelTier
from multi_swarm.llm.client import LLMClient, CompletionResult
def make_genome(tier: ModelTier) -> HypothesisAgentGenome:
return HypothesisAgentGenome(
system_prompt="x", feature_access=["close"], temperature=0.9, top_p=0.95,
model_tier=tier, lookback_window=200, cognitive_style="physicist",
)
def test_completion_tier_c_uses_openrouter(mocker):
fake_openai = mocker.MagicMock()
fake_response = mocker.MagicMock()
fake_response.choices = [mocker.MagicMock(message=mocker.MagicMock(content="(strategy ...)"))]
fake_response.usage = mocker.MagicMock(prompt_tokens=100, completion_tokens=200)
fake_openai.chat.completions.create.return_value = fake_response
mocker.patch("multi_swarm.llm.client.OpenAI", return_value=fake_openai)
client = LLMClient(openrouter_api_key="or-x", anthropic_api_key=None)
g = make_genome(ModelTier.C)
out = client.complete(g, system="sys", user="usr")
assert isinstance(out, CompletionResult)
assert out.text == "(strategy ...)"
assert out.input_tokens == 100
assert out.output_tokens == 200
assert out.tier == ModelTier.C
fake_openai.chat.completions.create.assert_called_once()
def test_completion_tier_b_uses_anthropic(mocker):
fake_anthropic = mocker.MagicMock()
fake_msg = mocker.MagicMock()
fake_msg.content = [mocker.MagicMock(text="(strategy ...)")]
fake_msg.usage = mocker.MagicMock(input_tokens=80, output_tokens=150)
fake_anthropic.messages.create.return_value = fake_msg
mocker.patch("multi_swarm.llm.client.Anthropic", return_value=fake_anthropic)
client = LLMClient(openrouter_api_key="or-x", anthropic_api_key="an-x")
g = make_genome(ModelTier.B)
out = client.complete(g, system="sys", user="usr")
assert out.text == "(strategy ...)"
assert out.input_tokens == 80
assert out.output_tokens == 150
assert out.tier == ModelTier.B
```
- [ ] **Step 2: Run test (must fail)**
Run: `uv run pytest tests/unit/test_llm_client.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement the LLM client**
```python
# src/multi_swarm/llm/__init__.py
```
```python
# src/multi_swarm/llm/client.py
from __future__ import annotations
from dataclasses import dataclass
from anthropic import Anthropic
from openai import OpenAI
from ..genome.hypothesis import HypothesisAgentGenome, ModelTier
# Models configured for Phase 1
MODEL_TIER_C = "qwen/qwen-2.5-72b-instruct" # via OpenRouter
MODEL_TIER_B = "claude-sonnet-4-6" # via Anthropic
OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
@dataclass(frozen=True)
class CompletionResult:
text: str
input_tokens: int
output_tokens: int
tier: ModelTier
model: str
class LLMClient:
def __init__(
self,
openrouter_api_key: str,
anthropic_api_key: str | None = None,
):
self._openrouter = OpenAI(api_key=openrouter_api_key, base_url=OPENROUTER_BASE_URL)
self._anthropic = Anthropic(api_key=anthropic_api_key) if anthropic_api_key else None
def complete(
self,
genome: HypothesisAgentGenome,
system: str,
user: str,
max_tokens: int = 2000,
) -> CompletionResult:
if genome.model_tier == ModelTier.C:
resp = self._openrouter.chat.completions.create(
model=MODEL_TIER_C,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user},
],
temperature=genome.temperature,
top_p=genome.top_p,
max_tokens=max_tokens,
)
return CompletionResult(
text=resp.choices[0].message.content or "",
input_tokens=resp.usage.prompt_tokens,
output_tokens=resp.usage.completion_tokens,
tier=ModelTier.C,
model=MODEL_TIER_C,
)
if self._anthropic is None:
raise RuntimeError("ANTHROPIC_API_KEY required for tier B genomes")
msg = self._anthropic.messages.create(
model=MODEL_TIER_B,
system=system,
messages=[{"role": "user", "content": user}],
temperature=genome.temperature,
top_p=genome.top_p,
max_tokens=max_tokens,
)
text = "".join(block.text for block in msg.content if hasattr(block, "text"))
return CompletionResult(
text=text,
input_tokens=msg.usage.input_tokens,
output_tokens=msg.usage.output_tokens,
tier=ModelTier.B,
model=MODEL_TIER_B,
)
```
- [ ] **Step 4: Run test (must pass)**
Run: `uv run pytest tests/unit/test_llm_client.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/llm/ tests/unit/test_llm_client.py
git commit -m "feat(llm): unified client for OpenRouter (Qwen) + Anthropic (Sonnet)"
```
---
## Task 18: Cost tracker
**Files:**
- Create: `src/multi_swarm/llm/cost_tracker.py`
- Test: `tests/unit/test_cost_tracker.py`
Approximate Phase 1 pricing (per million tokens):
- tier C (Qwen 2.5 72B via OpenRouter): $0.40/M input, $0.40/M output
- tier B (Claude Sonnet 4.6): $3.00/M input, $15.00/M output
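As a back-of-envelope check on these rates (the per-call token counts below are illustrative assumptions, not measurements):

```python
# Approximate Phase 1 rates, USD per million tokens (from the table above).
PRICE = {"C": {"input": 0.40, "output": 0.40}, "B": {"input": 3.00, "output": 15.00}}

def generation_cost(n_calls: int, in_tok: int, out_tok: int, tier: str = "C") -> float:
    # Cost of one GA generation: n_calls LLM completions at the given tier.
    p = PRICE[tier]
    return n_calls * ((in_tok / 1e6) * p["input"] + (out_tok / 1e6) * p["output"])

# 20 hypothesis agents, ~500 input / ~300 output tokens each, tier C:
cost_c = generation_cost(20, 500, 300, "C")
assert abs(cost_c - 0.0064) < 1e-9  # well under a cent per generation
# The same generation on tier B costs roughly 19x more:
cost_b = generation_cost(20, 500, 300, "B")
assert abs(cost_b - 0.12) < 1e-9
```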
- [ ] **Step 1: Write a failing test**
```python
# tests/unit/test_cost_tracker.py
from multi_swarm.genome.hypothesis import ModelTier
from multi_swarm.llm.cost_tracker import CostTracker, estimate_cost
def test_estimate_cost_tier_c():
cost = estimate_cost(input_tokens=1_000_000, output_tokens=1_000_000, tier=ModelTier.C)
assert cost == 0.40 + 0.40
def test_estimate_cost_tier_b():
cost = estimate_cost(input_tokens=1_000_000, output_tokens=1_000_000, tier=ModelTier.B)
assert cost == 3.00 + 15.00
def test_tracker_accumulates():
t = CostTracker()
t.record(input_tokens=10_000, output_tokens=20_000, tier=ModelTier.C, run_id="r", agent_id="a")
t.record(input_tokens=5_000, output_tokens=15_000, tier=ModelTier.C, run_id="r", agent_id="b")
summary = t.summary()
assert summary["calls"] == 2
assert summary["input_tokens"] == 15_000
assert summary["output_tokens"] == 35_000
assert summary["cost_usd"] > 0
def test_tracker_per_tier_breakdown():
t = CostTracker()
t.record(input_tokens=10_000, output_tokens=10_000, tier=ModelTier.C, run_id="r", agent_id="a")
t.record(input_tokens=10_000, output_tokens=10_000, tier=ModelTier.B, run_id="r", agent_id="b")
summary = t.summary()
assert "C" in summary["by_tier"]
assert "B" in summary["by_tier"]
```
- [ ] **Step 2: Run test (must fail)**
Run: `uv run pytest tests/unit/test_cost_tracker.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement the cost tracker**
```python
# src/multi_swarm/llm/cost_tracker.py
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
from ..genome.hypothesis import ModelTier
PRICE_PER_M_TOKENS: dict[ModelTier, dict[str, float]] = {
ModelTier.C: {"input": 0.40, "output": 0.40},
ModelTier.B: {"input": 3.00, "output": 15.00},
}
def estimate_cost(input_tokens: int, output_tokens: int, tier: ModelTier) -> float:
p = PRICE_PER_M_TOKENS[tier]
return (input_tokens / 1_000_000) * p["input"] + (output_tokens / 1_000_000) * p["output"]
@dataclass
class CostRecord:
ts: datetime
run_id: str
agent_id: str
tier: ModelTier
input_tokens: int
output_tokens: int
cost_usd: float
@dataclass
class CostTracker:
records: list[CostRecord] = field(default_factory=list)
def record(
self,
input_tokens: int,
output_tokens: int,
tier: ModelTier,
run_id: str,
agent_id: str,
) -> CostRecord:
cost = estimate_cost(input_tokens, output_tokens, tier)
rec = CostRecord(
ts=datetime.now(timezone.utc),
run_id=run_id,
agent_id=agent_id,
tier=tier,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost_usd=cost,
)
self.records.append(rec)
return rec
def summary(self) -> dict[str, Any]:
by_tier: dict[str, dict[str, float]] = defaultdict(
lambda: {"calls": 0, "input_tokens": 0, "output_tokens": 0, "cost_usd": 0.0}
)
for r in self.records:
t = r.tier.value
by_tier[t]["calls"] += 1
by_tier[t]["input_tokens"] += r.input_tokens
by_tier[t]["output_tokens"] += r.output_tokens
by_tier[t]["cost_usd"] += r.cost_usd
return {
"calls": len(self.records),
"input_tokens": sum(r.input_tokens for r in self.records),
"output_tokens": sum(r.output_tokens for r in self.records),
"cost_usd": sum(r.cost_usd for r in self.records),
"by_tier": dict(by_tier),
}
```
- [ ] **Step 4: Run test (must pass)**
Run: `uv run pytest tests/unit/test_cost_tracker.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/llm/cost_tracker.py tests/unit/test_cost_tracker.py
git commit -m "feat(llm): cost tracker with per-tier pricing and breakdown"
```
---
## Task 19: Hypothesis agent (LLM call → S-expr)
**Files:**
- Create: `src/multi_swarm/agents/__init__.py`
- Create: `src/multi_swarm/agents/hypothesis.py`
- Test: `tests/unit/test_hypothesis_agent.py`
The Hypothesis agent takes a genome plus a market summary (basic statistics over the OHLCV training set) and produces an S-expression strategy. The prompt template is fixed; the genome's system_prompt is injected into the system message; the market summary and the accessible features are injected into the user message.
- [ ] **Step 1: Write a failing test**
```python
# tests/unit/test_hypothesis_agent.py
import pandas as pd
import numpy as np
from multi_swarm.genome.hypothesis import HypothesisAgentGenome, ModelTier
from multi_swarm.agents.hypothesis import HypothesisAgent, MarketSummary
from multi_swarm.llm.client import CompletionResult
def make_summary():
return MarketSummary(
symbol="BTC/USDT",
timeframe="1h",
n_bars=1000,
return_mean=0.0001,
return_std=0.01,
skew=0.1,
kurtosis=3.5,
volatility_regime="high",
)
def test_hypothesis_agent_calls_llm_and_parses(mocker):
fake_llm = mocker.MagicMock()
fake_llm.complete.return_value = CompletionResult(
text="(strategy (when (gt (indicator rsi 14) 70.0) (entry-short)))",
input_tokens=200, output_tokens=80, tier=ModelTier.C, model="qwen",
)
g = HypothesisAgentGenome(
system_prompt="Pensa come un fisico.", feature_access=["close"], temperature=0.9,
top_p=0.95, model_tier=ModelTier.C, lookback_window=200, cognitive_style="physicist",
)
agent = HypothesisAgent(llm=fake_llm)
proposal = agent.propose(g, make_summary())
assert proposal.strategy is not None
assert proposal.raw_text.startswith("(strategy")
assert proposal.completion.input_tokens == 200
fake_llm.complete.assert_called_once()
def test_hypothesis_agent_returns_none_on_parse_error(mocker):
fake_llm = mocker.MagicMock()
fake_llm.complete.return_value = CompletionResult(
text="this is not s-expression",
input_tokens=200, output_tokens=80, tier=ModelTier.C, model="qwen",
)
g = HypothesisAgentGenome(
system_prompt="x", feature_access=["close"], temperature=0.9,
top_p=0.95, model_tier=ModelTier.C, lookback_window=200, cognitive_style="physicist",
)
agent = HypothesisAgent(llm=fake_llm)
proposal = agent.propose(g, make_summary())
assert proposal.strategy is None
assert proposal.parse_error is not None
def test_hypothesis_agent_extracts_sexp_from_markdown_fence(mocker):
fake_llm = mocker.MagicMock()
fake_llm.complete.return_value = CompletionResult(
text="Ecco la strategia:\n```lisp\n(strategy (when (lt (indicator rsi 14) 30.0) (entry-long)))\n```\nFatta.",
input_tokens=200, output_tokens=80, tier=ModelTier.C, model="qwen",
)
g = HypothesisAgentGenome(
system_prompt="x", feature_access=["close"], temperature=0.9,
top_p=0.95, model_tier=ModelTier.C, lookback_window=200, cognitive_style="physicist",
)
agent = HypothesisAgent(llm=fake_llm)
proposal = agent.propose(g, make_summary())
assert proposal.strategy is not None
```
- [ ] **Step 2: Run test (must fail)**
Run: `uv run pytest tests/unit/test_hypothesis_agent.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement the agent**
```python
# src/multi_swarm/agents/__init__.py
```
```python
# src/multi_swarm/agents/hypothesis.py
from __future__ import annotations
import re
from dataclasses import dataclass
from ..genome.hypothesis import HypothesisAgentGenome
from ..llm.client import CompletionResult, LLMClient
from ..protocol.parser import ParseError, Strategy, parse_strategy
from ..protocol.validator import ValidationError, validate_strategy
@dataclass(frozen=True)
class MarketSummary:
symbol: str
timeframe: str
n_bars: int
return_mean: float
return_std: float
skew: float
kurtosis: float
volatility_regime: str
@dataclass(frozen=True)
class HypothesisProposal:
strategy: Strategy | None
raw_text: str
completion: CompletionResult
parse_error: str | None = None
SYSTEM_TEMPLATE = """\
You are a hypothesis-generating agent for quantitative trading in a swarm system.
Your cognitive style: {cognitive_style}
Personal directive: {system_prompt}
You must propose a trading strategy expressed in the S-expression language
with the following available verbs:
Actions: entry-long, entry-short, exit, flat
Logical: and, or, not
Comparators: gt, lt, eq
Data: feature, indicator, crossover, crossunder
Available indicators: sma <length>, rsi <length>, atr <length>, macd, realized_vol <window>.
Available features: open, high, low, close, volume.
Rules are evaluated in order; the first one that matches wins for each timestamp.
The default action when no rule matches is 'flat'.
Reply ONLY with the S-expression in a ```lisp ... ``` fence, with no prose
and no explanations. Example format:
```lisp
(strategy
  (when (gt (indicator rsi 14) 70.0) (entry-short))
  (when (lt (indicator rsi 14) 30.0) (entry-long)))
```
"""
USER_TEMPLATE = """\
Market: {symbol}, timeframe {timeframe}, {n_bars} bars observed.
Return statistics: mean={return_mean:.5f}, std={return_std:.5f}, skew={skew:.3f}, kurt={kurtosis:.3f}.
Volatility regime: {volatility_regime}.
Features accessible from your genome: {feature_access}.
Maximum lookback you may use in your reasoning: {lookback_window} bars.
Generate a strategy that hunts for exploitable anomalies in this regime.
"""
_SEXP_FENCE_RE = re.compile(r"```(?:lisp|scheme|sexp)?\s*(\(strategy[\s\S]*?\))\s*```", re.MULTILINE)
def _extract_sexp(text: str) -> str | None:
m = _SEXP_FENCE_RE.search(text)
if m:
return m.group(1)
if text.strip().startswith("(strategy"):
return text.strip()
return None
class HypothesisAgent:
def __init__(self, llm: LLMClient):
self._llm = llm
def propose(
self,
genome: HypothesisAgentGenome,
market: MarketSummary,
) -> HypothesisProposal:
system = SYSTEM_TEMPLATE.format(
cognitive_style=genome.cognitive_style,
system_prompt=genome.system_prompt,
)
user = USER_TEMPLATE.format(
symbol=market.symbol,
timeframe=market.timeframe,
n_bars=market.n_bars,
return_mean=market.return_mean,
return_std=market.return_std,
skew=market.skew,
kurtosis=market.kurtosis,
volatility_regime=market.volatility_regime,
feature_access=", ".join(genome.feature_access),
lookback_window=genome.lookback_window,
)
completion = self._llm.complete(genome, system=system, user=user)
sexp = _extract_sexp(completion.text)
if sexp is None:
return HypothesisProposal(
strategy=None, raw_text=completion.text, completion=completion,
parse_error="no s-expression found in output",
)
try:
ast = parse_strategy(sexp)
validate_strategy(ast)
return HypothesisProposal(
strategy=ast, raw_text=completion.text, completion=completion,
)
except (ParseError, ValidationError) as e:
return HypothesisProposal(
strategy=None, raw_text=completion.text, completion=completion,
parse_error=str(e),
)
```
- [ ] **Step 4: Run test (must pass)**
Run: `uv run pytest tests/unit/test_hypothesis_agent.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/agents/ tests/unit/test_hypothesis_agent.py
git commit -m "feat(agents): hypothesis agent with prompt template + s-expr extraction"
```
---
## Task 20: Falsification agent (hand-crafted)
**Files:**
- Create: `src/multi_swarm/agents/falsification.py`
- Test: `tests/unit/test_falsification.py`
In Phase 1 the Falsification agent is fully deterministic: it takes a strategy AST, compiles it, runs the backtest on the training set, computes DSR + drawdown + other metrics, and returns a `FalsificationReport`.
- [ ] **Step 1: Write a failing test**
```python
# tests/unit/test_falsification.py
from datetime import datetime, timezone
import numpy as np
import pandas as pd
import pytest
from multi_swarm.agents.falsification import FalsificationAgent, FalsificationReport
from multi_swarm.protocol.parser import parse_strategy
@pytest.fixture
def trending_ohlcv():
idx = pd.date_range("2024-01-01", periods=500, freq="1h", tz="UTC")
close = 100 + np.cumsum(np.random.RandomState(0).normal(0.01, 1.0, 500))
return pd.DataFrame(
{"open": close, "high": close + 0.5, "low": close - 0.5, "close": close, "volume": 1.0},
index=idx,
)
def test_falsification_returns_report(trending_ohlcv):
src = "(strategy (when (gt (indicator rsi 14) 70.0) (entry-short)) (when (lt (indicator rsi 14) 30.0) (entry-long)))"
ast = parse_strategy(src)
agent = FalsificationAgent(fees_bp=5.0, n_trials_dsr=20)
report = agent.evaluate(ast, trending_ohlcv)
assert isinstance(report, FalsificationReport)
assert isinstance(report.sharpe, float)
assert isinstance(report.dsr, float)
assert 0.0 <= report.dsr <= 1.0
assert isinstance(report.max_drawdown, float)
assert isinstance(report.n_trades, int)
def test_falsification_zero_trades_returns_zero_metrics(trending_ohlcv):
src = "(strategy (when (gt (feature close) 1e9) (entry-long)))"
ast = parse_strategy(src)
agent = FalsificationAgent(fees_bp=5.0, n_trials_dsr=20)
report = agent.evaluate(ast, trending_ohlcv)
assert report.n_trades == 0
assert report.sharpe == 0.0
```
- [ ] **Step 2: Run test (must fail)**
Run: `uv run pytest tests/unit/test_falsification.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement falsification**
```python
# src/multi_swarm/agents/falsification.py
from __future__ import annotations
from dataclasses import dataclass
import pandas as pd
from ..backtest.engine import BacktestEngine
from ..metrics.basic import max_drawdown, sharpe_ratio, total_return
from ..metrics.dsr import deflated_sharpe_ratio
from ..protocol.compiler import compile_strategy
from ..protocol.parser import Strategy
@dataclass(frozen=True)
class FalsificationReport:
sharpe: float
dsr: float
dsr_pvalue: float
max_drawdown: float
total_return: float
n_trades: int
n_bars: int
class FalsificationAgent:
def __init__(self, fees_bp: float = 5.0, n_trials_dsr: int = 50):
self._engine = BacktestEngine(fees_bp=fees_bp)
self._n_trials_dsr = n_trials_dsr
def evaluate(self, strategy: Strategy, ohlcv: pd.DataFrame) -> FalsificationReport:
signal_fn = compile_strategy(strategy)
signals = signal_fn(ohlcv)
result = self._engine.run(ohlcv, signals)
if len(result.trades) == 0:
return FalsificationReport(
sharpe=0.0, dsr=0.0, dsr_pvalue=1.0, max_drawdown=0.0,
total_return=0.0, n_trades=0, n_bars=len(ohlcv),
)
sr = sharpe_ratio(result.returns, periods_per_year=8760)
dsr, p = deflated_sharpe_ratio(
result.returns,
n_trials=self._n_trials_dsr,
periods_per_year=8760,
sharpe_var=1.0,
)
return FalsificationReport(
sharpe=sr,
dsr=dsr,
dsr_pvalue=p,
            max_drawdown=max_drawdown(result.equity_curve + 1.0),  # +1 avoids division by zero
total_return=total_return(result.equity_curve + 1.0),
n_trades=len(result.trades),
n_bars=len(ohlcv),
)
```
- [ ] **Step 4: Run test (must pass)**
Run: `uv run pytest tests/unit/test_falsification.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/agents/falsification.py tests/unit/test_falsification.py
git commit -m "feat(agents): hand-crafted falsification (compile→backtest→DSR)"
```
---
## Task 21: Adversarial agent (hand-crafted)
**Files:**
- Create: `src/multi_swarm/agents/adversarial.py`
- Test: `tests/unit/test_adversarial.py`
In Phase 1 the Adversarial agent is hand-crafted with deterministic heuristic checks, no LLM. It verifies:
- `lookahead_check`: the trade count is consistent with the signals (no trade on bar t without a signal at t-1).
- `degenerate_check`: the strategy is not trivial (e.g. always long, always flat).
- `trade_frequency_check`: too many trades (>1 per 5 bars) = noisy strategy, warning flag.
- `single_trade_check`: fewer than 5 trades on ~500 bars = likely lucky shot, warning flag.
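The frequency thresholds can be pinned down as a small pure function. A sketch mirroring the checks above; `frequency_flags` is a hypothetical helper for illustration, not part of the plan's API:

```python
def frequency_flags(n_trades: int, n_bars: int) -> list[str]:
    # Heuristic flags on trade frequency, matching the thresholds described above.
    flags = []
    if n_trades == 0:
        flags.append("no_trades")       # HIGH: never opens a position
    elif n_trades > n_bars / 5:
        flags.append("overtrading")     # MEDIUM: more than 1 trade per 5 bars
    elif n_trades < 5:
        flags.append("undertrading")    # MEDIUM: likely lucky shot
    return flags

assert frequency_flags(0, 500) == ["no_trades"]
assert frequency_flags(150, 500) == ["overtrading"]
assert frequency_flags(2, 500) == ["undertrading"]
assert frequency_flags(30, 500) == []
```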
- [ ] **Step 1: Write a failing test**
```python
# tests/unit/test_adversarial.py
import numpy as np
import pandas as pd
import pytest
from multi_swarm.agents.adversarial import AdversarialAgent, AdversarialReport, Severity
from multi_swarm.protocol.parser import parse_strategy
@pytest.fixture
def ohlcv():
idx = pd.date_range("2024-01-01", periods=500, freq="1h", tz="UTC")
close = 100 + np.cumsum(np.random.RandomState(0).normal(0.0, 1.0, 500))
return pd.DataFrame(
{"open": close, "high": close + 0.5, "low": close - 0.5, "close": close, "volume": 1.0},
index=idx,
)
def test_degenerate_always_long_flagged(ohlcv):
src = "(strategy (when (gt (feature close) -1e9) (entry-long)))"
ast = parse_strategy(src)
agent = AdversarialAgent()
report = agent.review(ast, ohlcv)
assert any(f.name == "degenerate" and f.severity == Severity.HIGH for f in report.findings)
def test_no_findings_on_reasonable_strategy(ohlcv):
src = "(strategy (when (gt (indicator rsi 14) 70.0) (entry-short)) (when (lt (indicator rsi 14) 30.0) (entry-long)))"
ast = parse_strategy(src)
agent = AdversarialAgent()
report = agent.review(ast, ohlcv)
high_findings = [f for f in report.findings if f.severity == Severity.HIGH]
assert len(high_findings) == 0
def test_zero_trade_strategy_flagged(ohlcv):
src = "(strategy (when (gt (feature close) 1e9) (entry-long)))"
ast = parse_strategy(src)
agent = AdversarialAgent()
report = agent.review(ast, ohlcv)
assert any(f.name == "no_trades" for f in report.findings)
```
- [ ] **Step 2: Run test (must fail)**
Run: `uv run pytest tests/unit/test_adversarial.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement adversarial**
```python
# src/multi_swarm/agents/adversarial.py
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
import pandas as pd
from ..backtest.engine import BacktestEngine
from ..backtest.orders import Side
from ..protocol.compiler import compile_strategy
from ..protocol.parser import Strategy
class Severity(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
@dataclass(frozen=True)
class Finding:
name: str
severity: Severity
detail: str
@dataclass
class AdversarialReport:
findings: list[Finding] = field(default_factory=list)
class AdversarialAgent:
def __init__(self, fees_bp: float = 5.0):
self._engine = BacktestEngine(fees_bp=fees_bp)
def review(self, strategy: Strategy, ohlcv: pd.DataFrame) -> AdversarialReport:
signal_fn = compile_strategy(strategy)
signals = signal_fn(ohlcv)
result = self._engine.run(ohlcv, signals)
report = AdversarialReport()
if len(result.trades) == 0:
report.findings.append(Finding(
name="no_trades", severity=Severity.HIGH,
detail="Strategy never opens a position on training data",
))
return report
unique_signals = signals.unique()
if len(unique_signals) == 1 and unique_signals[0] in (Side.LONG, Side.SHORT):
report.findings.append(Finding(
name="degenerate", severity=Severity.HIGH,
detail=f"Strategy is always {unique_signals[0].value}, no real decision",
))
n_bars = len(ohlcv)
n_trades = len(result.trades)
if n_trades > n_bars / 5:
report.findings.append(Finding(
name="overtrading", severity=Severity.MEDIUM,
detail=f"{n_trades} trades on {n_bars} bars (>1 per 5 bars)",
))
if n_trades < 5:
report.findings.append(Finding(
name="undertrading", severity=Severity.MEDIUM,
detail=f"only {n_trades} trades — likely lucky shot",
))
return report
```
- [ ] **Step 4: Run test (must pass)**
Run: `uv run pytest tests/unit/test_adversarial.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/agents/adversarial.py tests/unit/test_adversarial.py
git commit -m "feat(agents): hand-crafted adversarial with heuristic checks"
```
---
## Task 22: Fitness function v0
**Files:**
- Create: `src/multi_swarm/ga/__init__.py`
- Create: `src/multi_swarm/ga/fitness.py`
- Test: `tests/unit/test_fitness.py`
Fitness v0: `dsr - drawdown_penalty * max_drawdown`. Default `drawdown_penalty = 0.5`. A strategy with 0 trades gets fitness 0 (neutral, not penalized negatively).
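Worked numbers for the formula (a standalone sketch; `fitness_v0` here is illustrative shorthand for the real `compute_fitness`):

```python
def fitness_v0(dsr: float, max_dd: float, n_trades: int,
               high_finding: bool = False, dd_penalty: float = 0.5) -> float:
    # Zeroed on no trades or any HIGH adversarial finding; otherwise DSR minus
    # the drawdown penalty, clamped at 0.
    if n_trades == 0 or high_finding:
        return 0.0
    return max(0.0, dsr - dd_penalty * max_dd)

assert abs(fitness_v0(dsr=0.7, max_dd=0.2, n_trades=30) - 0.6) < 1e-9  # 0.7 - 0.5*0.2
assert fitness_v0(dsr=0.7, max_dd=0.2, n_trades=0) == 0.0              # no trades → neutral
assert fitness_v0(dsr=0.1, max_dd=0.4, n_trades=30) == 0.0             # clamped, never negative
assert fitness_v0(dsr=0.9, max_dd=0.1, n_trades=30, high_finding=True) == 0.0  # adversarial kill
```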
- [ ] **Step 1: Write a failing test**
```python
# tests/unit/test_fitness.py
from multi_swarm.agents.falsification import FalsificationReport
from multi_swarm.agents.adversarial import AdversarialReport, Finding, Severity
from multi_swarm.ga.fitness import compute_fitness
def make_falsification(dsr=0.7, max_dd=0.2, n_trades=30):
return FalsificationReport(
sharpe=1.5, dsr=dsr, dsr_pvalue=0.05, max_drawdown=max_dd,
total_return=0.3, n_trades=n_trades, n_bars=500,
)
def test_fitness_zero_trades_is_zero():
f = make_falsification(n_trades=0)
a = AdversarialReport()
assert compute_fitness(f, a) == 0.0
def test_fitness_increases_with_dsr():
a = AdversarialReport()
f1 = make_falsification(dsr=0.5)
f2 = make_falsification(dsr=0.9)
assert compute_fitness(f2, a) > compute_fitness(f1, a)
def test_fitness_decreases_with_drawdown():
a = AdversarialReport()
f1 = make_falsification(max_dd=0.1)
f2 = make_falsification(max_dd=0.4)
assert compute_fitness(f1, a) > compute_fitness(f2, a)
def test_fitness_zeroed_by_high_severity_finding():
f = make_falsification()
a = AdversarialReport(findings=[Finding(name="degenerate", severity=Severity.HIGH, detail="x")])
assert compute_fitness(f, a) == 0.0
```
- [ ] **Step 2: Run test (must fail)**
Run: `uv run pytest tests/unit/test_fitness.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement fitness**
```python
# src/multi_swarm/ga/__init__.py
```
```python
# src/multi_swarm/ga/fitness.py
from __future__ import annotations
from ..agents.adversarial import AdversarialReport, Severity
from ..agents.falsification import FalsificationReport
def compute_fitness(
falsification: FalsificationReport,
adversarial: AdversarialReport,
drawdown_penalty: float = 0.5,
) -> float:
"""Fitness v0 Phase 1.
Logica:
1. Se 0 trade → fitness 0.
2. Se almeno un finding HIGH adversarial → fitness 0 (kill).
3. Altrimenti: dsr - drawdown_penalty * max_drawdown, clamped a 0.
"""
if falsification.n_trades == 0:
return 0.0
if any(f.severity == Severity.HIGH for f in adversarial.findings):
return 0.0
raw = falsification.dsr - drawdown_penalty * falsification.max_drawdown
return max(0.0, float(raw))
```
- [ ] **Step 4: Run test (must pass)**
Run: `uv run pytest tests/unit/test_fitness.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/ga/ tests/unit/test_fitness.py
git commit -m "feat(ga): fitness v0 (DSR - dd_penalty * max_dd, kill on adversarial high)"
```
---
## Task 23: GA — tournament selection + elitism
**Files:**
- Create: `src/multi_swarm/ga/selection.py`
- Test: `tests/unit/test_selection.py`
- [ ] **Step 1: Write a failing test**
```python
# tests/unit/test_selection.py
import random
from multi_swarm.genome.hypothesis import HypothesisAgentGenome, ModelTier
from multi_swarm.ga.selection import tournament_select, elite_select
def make(idx: int) -> HypothesisAgentGenome:
return HypothesisAgentGenome(
system_prompt=f"p-{idx}", feature_access=["close"], temperature=0.9,
top_p=0.95, model_tier=ModelTier.C, lookback_window=100, cognitive_style="x",
)
def test_tournament_picks_best_in_sample():
    population = [make(i) for i in range(10)]
    fitnesses = {g.id: float(i) for i, g in enumerate(population)}
    rng = random.Random(0)
    winner = tournament_select(population, fitnesses, k=5, rng=rng)
    assert isinstance(winner, HypothesisAgentGenome)
    # With k == population size the tournament sees everyone and must return the global best.
    full = tournament_select(population, fitnesses, k=10, rng=rng)
    assert fitnesses[full.id] == 9.0
def test_tournament_size_one_is_random():
population = [make(i) for i in range(10)]
fitnesses = {g.id: float(i) for i, g in enumerate(population)}
rng = random.Random(0)
picks = [tournament_select(population, fitnesses, k=1, rng=rng) for _ in range(50)]
distinct = {p.id for p in picks}
assert len(distinct) > 1
def test_elite_select_returns_top_k():
population = [make(i) for i in range(10)]
fitnesses = {g.id: float(i) for i, g in enumerate(population)}
elites = elite_select(population, fitnesses, k=3)
elite_fitnesses = sorted([fitnesses[g.id] for g in elites], reverse=True)
assert elite_fitnesses == [9.0, 8.0, 7.0]
```
- [ ] **Step 2: Run test (must fail)**
Run: `uv run pytest tests/unit/test_selection.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement selection**
```python
# src/multi_swarm/ga/selection.py
from __future__ import annotations
import random
from ..genome.hypothesis import HypothesisAgentGenome
def tournament_select(
population: list[HypothesisAgentGenome],
fitnesses: dict[str, float],
k: int,
rng: random.Random,
) -> HypothesisAgentGenome:
"""Estrae k individui random e restituisce il migliore."""
if k < 1:
raise ValueError("k must be >= 1")
if not population:
raise ValueError("empty population")
candidates = rng.sample(population, k=min(k, len(population)))
return max(candidates, key=lambda g: fitnesses.get(g.id, 0.0))
def elite_select(
population: list[HypothesisAgentGenome],
fitnesses: dict[str, float],
k: int,
) -> list[HypothesisAgentGenome]:
"""Restituisce i k genomi con fitness più alta."""
sorted_pop = sorted(population, key=lambda g: fitnesses.get(g.id, 0.0), reverse=True)
return sorted_pop[:k]
```
- [ ] **Step 4: Run test (must pass)**
Run: `uv run pytest tests/unit/test_selection.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/ga/selection.py tests/unit/test_selection.py
git commit -m "feat(ga): tournament selection + elitism"
```
---
## Task 24: GA — generation step (loop di una generazione)
**Files:**
- Create: `src/multi_swarm/ga/loop.py`
- Test: `tests/unit/test_ga_loop.py`
`step()`: given (population, fitnesses, RNG, config), produces the next population via elitism + tournament selection + (mutation OR crossover) to fill the remaining slots.
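For intuition, the generation step can be sketched in a genome-agnostic way — toy dicts stand in for `HypothesisAgentGenome`, and the helper names here are illustrative, not the module's API:

```python
import random

def tournament(pop, fit, k, rng):
    # Sample k candidates and keep the fittest.
    return max(rng.sample(pop, k), key=lambda g: fit[g["id"]])

def toy_next_generation(pop, fit, size, elite_k, tour_k, p_cross, rng):
    ranked = sorted(pop, key=lambda g: fit[g["id"]], reverse=True)
    new_pop = ranked[:elite_k]  # elitism: copy the best individuals over unchanged
    while len(new_pop) < size:
        if rng.random() < p_cross:
            p1 = tournament(pop, fit, tour_k, rng)
            p2 = tournament(pop, fit, tour_k, rng)
            child = {"id": f"x{len(new_pop)}", "val": (p1["val"] + p2["val"]) / 2}  # crossover
        else:
            parent = tournament(pop, fit, tour_k, rng)
            child = {"id": f"m{len(new_pop)}", "val": parent["val"] + rng.gauss(0, 0.1)}  # mutation
        new_pop.append(child)
    return new_pop[:size]

rng = random.Random(0)
pop = [{"id": f"g{i}", "val": float(i)} for i in range(10)]
fit = {g["id"]: g["val"] for g in pop}
nxt = toy_next_generation(pop, fit, size=10, elite_k=2, tour_k=3, p_cross=0.5, rng=rng)
assert len(nxt) == 10 and nxt[0]["id"] == "g9" and nxt[1]["id"] == "g8"  # size preserved, elites first
```

The real implementation below follows the same shape, with `uniform_crossover` / `random_mutate` producing proper offspring genomes.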
- [ ] **Step 1: Write the failing test**
```python
# tests/unit/test_ga_loop.py
import random
from multi_swarm.genome.hypothesis import HypothesisAgentGenome, ModelTier
from multi_swarm.ga.loop import next_generation, GAConfig
def make(idx: int) -> HypothesisAgentGenome:
return HypothesisAgentGenome(
system_prompt=f"p-{idx}", feature_access=["close"], temperature=0.9,
top_p=0.95, model_tier=ModelTier.C, lookback_window=100, cognitive_style="x",
)
def test_next_generation_size_preserved():
population = [make(i) for i in range(20)]
fitnesses = {g.id: float(i) for i, g in enumerate(population)}
cfg = GAConfig(population_size=20, elite_k=2, tournament_k=3, p_crossover=0.5)
new_pop = next_generation(population, fitnesses, cfg, rng=random.Random(0))
assert len(new_pop) == 20
def test_next_generation_includes_elites():
population = [make(i) for i in range(20)]
fitnesses = {g.id: float(i) for i, g in enumerate(population)}
cfg = GAConfig(population_size=20, elite_k=2, tournament_k=3, p_crossover=0.5)
new_pop = next_generation(population, fitnesses, cfg, rng=random.Random(0))
elite_ids = {g.id for g in sorted(population, key=lambda g: fitnesses[g.id], reverse=True)[:2]}
new_ids = {g.id for g in new_pop}
assert elite_ids.issubset(new_ids)
def test_next_generation_increments_generation_for_offspring():
population = [make(i) for i in range(20)]
fitnesses = {g.id: float(i) for i, g in enumerate(population)}
cfg = GAConfig(population_size=20, elite_k=2, tournament_k=3, p_crossover=0.5)
new_pop = next_generation(population, fitnesses, cfg, rng=random.Random(0))
new_offspring = [g for g in new_pop if g.id not in {p.id for p in population}]
assert all(g.generation > 0 for g in new_offspring)
```
- [ ] **Step 2: Run the test (must fail)**
Run: `uv run pytest tests/unit/test_ga_loop.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement the loop**
```python
# src/multi_swarm/ga/loop.py
from __future__ import annotations
import random
from dataclasses import dataclass
from ..genome.crossover import uniform_crossover
from ..genome.hypothesis import HypothesisAgentGenome
from ..genome.mutation import random_mutate
from .selection import elite_select, tournament_select
@dataclass(frozen=True)
class GAConfig:
population_size: int
elite_k: int
tournament_k: int
p_crossover: float
def next_generation(
population: list[HypothesisAgentGenome],
fitnesses: dict[str, float],
cfg: GAConfig,
rng: random.Random,
) -> list[HypothesisAgentGenome]:
new_pop: list[HypothesisAgentGenome] = list(elite_select(population, fitnesses, cfg.elite_k))
while len(new_pop) < cfg.population_size:
if rng.random() < cfg.p_crossover and len(population) >= 2:
p1 = tournament_select(population, fitnesses, cfg.tournament_k, rng)
p2 = tournament_select(population, fitnesses, cfg.tournament_k, rng)
child = uniform_crossover(p1, p2, rng)
else:
parent = tournament_select(population, fitnesses, cfg.tournament_k, rng)
child = random_mutate(parent, rng)
new_pop.append(child)
return new_pop[: cfg.population_size]
```
- [ ] **Step 4: Run the test (must pass)**
Run: `uv run pytest tests/unit/test_ga_loop.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/ga/loop.py tests/unit/test_ga_loop.py
git commit -m "feat(ga): next_generation step (elitism + tournament + mutate/crossover)"
```
---
## Task 25: SQLite schema + repository
**Files:**
- Create: `src/multi_swarm/persistence/__init__.py`
- Create: `src/multi_swarm/persistence/schema.py`
- Create: `src/multi_swarm/persistence/repository.py`
- Test: `tests/unit/test_repository.py`
Essential Phase 1 schema:
- `runs(id, name, started_at, completed_at, status, config_json, total_cost_usd)`
- `generations(run_id, generation_idx, started_at, completed_at, n_genomes, fitness_median, fitness_max, fitness_p90, entropy)`
- `genomes(id, run_id, generation_idx, payload_json)`
- `evaluations(genome_id, run_id, fitness, dsr, dsr_pvalue, sharpe, max_dd, total_return, n_trades, parse_error, raw_text, eval_ts)`
- `cost_records(id, run_id, agent_id, ts, tier, input_tokens, output_tokens, cost_usd)`
- `adversarial_findings(genome_id, run_id, name, severity, detail)`
- [ ] **Step 1: Write the failing test**
```python
# tests/unit/test_repository.py
from pathlib import Path
import json
from multi_swarm.persistence.repository import Repository
from multi_swarm.genome.hypothesis import HypothesisAgentGenome, ModelTier
def make_genome(idx: int) -> HypothesisAgentGenome:
return HypothesisAgentGenome(
system_prompt=f"p-{idx}", feature_access=["close"], temperature=0.9,
top_p=0.95, model_tier=ModelTier.C, lookback_window=100, cognitive_style="x",
)
def test_repository_creates_schema(tmp_path: Path):
repo = Repository(db_path=tmp_path / "runs.db")
repo.init_schema()
assert (tmp_path / "runs.db").exists()
def test_repository_create_run_and_get(tmp_path: Path):
repo = Repository(db_path=tmp_path / "runs.db")
repo.init_schema()
run_id = repo.create_run(name="phase1-test", config={"k": 20})
run = repo.get_run(run_id)
assert run["name"] == "phase1-test"
assert json.loads(run["config_json"])["k"] == 20
def test_repository_save_genome_and_evaluation(tmp_path: Path):
repo = Repository(db_path=tmp_path / "runs.db")
repo.init_schema()
run_id = repo.create_run(name="t", config={})
g = make_genome(0)
repo.save_genome(run_id=run_id, generation_idx=0, genome=g)
repo.save_evaluation(
run_id=run_id, genome_id=g.id, fitness=0.5, dsr=0.7, dsr_pvalue=0.05,
sharpe=1.5, max_dd=0.2, total_return=0.3, n_trades=30,
parse_error=None, raw_text="(strategy ...)",
)
evals = repo.list_evaluations(run_id)
assert len(evals) == 1
assert evals[0]["fitness"] == 0.5
def test_repository_save_generation_summary(tmp_path: Path):
repo = Repository(db_path=tmp_path / "runs.db")
repo.init_schema()
run_id = repo.create_run(name="t", config={})
repo.save_generation_summary(
run_id=run_id, generation_idx=0, n_genomes=20,
fitness_median=0.3, fitness_max=0.8, fitness_p90=0.7, entropy=0.85,
)
gens = repo.list_generations(run_id)
assert len(gens) == 1
assert gens[0]["fitness_max"] == 0.8
```
- [ ] **Step 2: Run the test (must fail)**
Run: `uv run pytest tests/unit/test_repository.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement the schema + repository**
```python
# src/multi_swarm/persistence/__init__.py
```
```python
# src/multi_swarm/persistence/schema.py
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS runs (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
started_at TEXT NOT NULL,
completed_at TEXT,
status TEXT NOT NULL DEFAULT 'running',
config_json TEXT NOT NULL,
total_cost_usd REAL NOT NULL DEFAULT 0.0
);
CREATE TABLE IF NOT EXISTS generations (
run_id TEXT NOT NULL,
generation_idx INTEGER NOT NULL,
started_at TEXT,
completed_at TEXT,
n_genomes INTEGER NOT NULL,
fitness_median REAL NOT NULL,
fitness_max REAL NOT NULL,
fitness_p90 REAL NOT NULL,
entropy REAL NOT NULL,
PRIMARY KEY (run_id, generation_idx),
FOREIGN KEY (run_id) REFERENCES runs(id)
);
CREATE TABLE IF NOT EXISTS genomes (
id TEXT NOT NULL,
run_id TEXT NOT NULL,
generation_idx INTEGER NOT NULL,
payload_json TEXT NOT NULL,
PRIMARY KEY (id, run_id, generation_idx),
FOREIGN KEY (run_id) REFERENCES runs(id)
);
CREATE TABLE IF NOT EXISTS evaluations (
run_id TEXT NOT NULL,
genome_id TEXT NOT NULL,
fitness REAL NOT NULL,
dsr REAL NOT NULL,
dsr_pvalue REAL NOT NULL,
sharpe REAL NOT NULL,
max_dd REAL NOT NULL,
total_return REAL NOT NULL,
n_trades INTEGER NOT NULL,
parse_error TEXT,
raw_text TEXT,
eval_ts TEXT NOT NULL,
PRIMARY KEY (run_id, genome_id),
FOREIGN KEY (run_id) REFERENCES runs(id)
);
CREATE TABLE IF NOT EXISTS cost_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
ts TEXT NOT NULL,
tier TEXT NOT NULL,
input_tokens INTEGER NOT NULL,
output_tokens INTEGER NOT NULL,
cost_usd REAL NOT NULL,
FOREIGN KEY (run_id) REFERENCES runs(id)
);
CREATE TABLE IF NOT EXISTS adversarial_findings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL,
genome_id TEXT NOT NULL,
name TEXT NOT NULL,
severity TEXT NOT NULL,
detail TEXT NOT NULL,
FOREIGN KEY (run_id) REFERENCES runs(id)
);
CREATE INDEX IF NOT EXISTS idx_evaluations_fitness ON evaluations(run_id, fitness DESC);
CREATE INDEX IF NOT EXISTS idx_genomes_generation ON genomes(run_id, generation_idx);
CREATE INDEX IF NOT EXISTS idx_cost_run ON cost_records(run_id);
"""
```
```python
# src/multi_swarm/persistence/repository.py
from __future__ import annotations
import json
import sqlite3
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from ..genome.hypothesis import HypothesisAgentGenome
from .schema import SCHEMA_SQL
class Repository:
def __init__(self, db_path: Path | str):
self.db_path = Path(db_path)
def _conn(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path, isolation_level=None)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("PRAGMA journal_mode = WAL")
return conn
def init_schema(self) -> None:
self.db_path.parent.mkdir(parents=True, exist_ok=True)
with self._conn() as conn:
conn.executescript(SCHEMA_SQL)
@staticmethod
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
# runs
def create_run(self, name: str, config: dict[str, Any]) -> str:
rid = uuid.uuid4().hex
with self._conn() as conn:
conn.execute(
"INSERT INTO runs (id, name, started_at, status, config_json) VALUES (?,?,?,?,?)",
(rid, name, self._now(), "running", json.dumps(config)),
)
return rid
def complete_run(self, run_id: str, total_cost: float, status: str = "completed") -> None:
with self._conn() as conn:
conn.execute(
"UPDATE runs SET completed_at=?, status=?, total_cost_usd=? WHERE id=?",
(self._now(), status, total_cost, run_id),
)
def get_run(self, run_id: str) -> dict[str, Any]:
with self._conn() as conn:
row = conn.execute("SELECT * FROM runs WHERE id=?", (run_id,)).fetchone()
if row is None:
raise KeyError(run_id)
return dict(row)
def list_runs(self) -> list[dict[str, Any]]:
with self._conn() as conn:
rows = conn.execute("SELECT * FROM runs ORDER BY started_at DESC").fetchall()
return [dict(r) for r in rows]
# generations
def save_generation_summary(
self, run_id: str, generation_idx: int, n_genomes: int,
fitness_median: float, fitness_max: float, fitness_p90: float, entropy: float,
) -> None:
with self._conn() as conn:
conn.execute(
"""INSERT OR REPLACE INTO generations
(run_id, generation_idx, completed_at, n_genomes,
fitness_median, fitness_max, fitness_p90, entropy)
VALUES (?,?,?,?,?,?,?,?)""",
(run_id, generation_idx, self._now(), n_genomes,
fitness_median, fitness_max, fitness_p90, entropy),
)
def list_generations(self, run_id: str) -> list[dict[str, Any]]:
with self._conn() as conn:
rows = conn.execute(
"SELECT * FROM generations WHERE run_id=? ORDER BY generation_idx",
(run_id,),
).fetchall()
return [dict(r) for r in rows]
# genomes
def save_genome(self, run_id: str, generation_idx: int, genome: HypothesisAgentGenome) -> None:
with self._conn() as conn:
conn.execute(
"INSERT OR REPLACE INTO genomes (id, run_id, generation_idx, payload_json) VALUES (?,?,?,?)",
(genome.id, run_id, generation_idx, json.dumps(genome.to_dict())),
)
def list_genomes(self, run_id: str, generation_idx: int | None = None) -> list[dict[str, Any]]:
with self._conn() as conn:
if generation_idx is None:
rows = conn.execute(
"SELECT * FROM genomes WHERE run_id=? ORDER BY generation_idx, id", (run_id,),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM genomes WHERE run_id=? AND generation_idx=? ORDER BY id",
(run_id, generation_idx),
).fetchall()
return [dict(r) for r in rows]
# evaluations
def save_evaluation(
self, run_id: str, genome_id: str, fitness: float, dsr: float, dsr_pvalue: float,
sharpe: float, max_dd: float, total_return: float, n_trades: int,
parse_error: str | None, raw_text: str | None,
) -> None:
with self._conn() as conn:
conn.execute(
"""INSERT OR REPLACE INTO evaluations
(run_id, genome_id, fitness, dsr, dsr_pvalue, sharpe, max_dd,
total_return, n_trades, parse_error, raw_text, eval_ts)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
(run_id, genome_id, fitness, dsr, dsr_pvalue, sharpe, max_dd,
total_return, n_trades, parse_error, raw_text, self._now()),
)
def list_evaluations(self, run_id: str) -> list[dict[str, Any]]:
with self._conn() as conn:
rows = conn.execute(
"SELECT * FROM evaluations WHERE run_id=? ORDER BY fitness DESC",
(run_id,),
).fetchall()
return [dict(r) for r in rows]
# cost
def save_cost_record(
self, run_id: str, agent_id: str, tier: str,
input_tokens: int, output_tokens: int, cost_usd: float,
) -> None:
with self._conn() as conn:
conn.execute(
"""INSERT INTO cost_records
(run_id, agent_id, ts, tier, input_tokens, output_tokens, cost_usd)
VALUES (?,?,?,?,?,?,?)""",
(run_id, agent_id, self._now(), tier, input_tokens, output_tokens, cost_usd),
)
def total_cost(self, run_id: str) -> float:
with self._conn() as conn:
row = conn.execute(
"SELECT COALESCE(SUM(cost_usd), 0.0) AS c FROM cost_records WHERE run_id=?",
(run_id,),
).fetchone()
return float(row["c"])
# adversarial
def save_adversarial_finding(
self, run_id: str, genome_id: str, name: str, severity: str, detail: str,
) -> None:
with self._conn() as conn:
conn.execute(
"""INSERT INTO adversarial_findings
(run_id, genome_id, name, severity, detail) VALUES (?,?,?,?,?)""",
(run_id, genome_id, name, severity, detail),
)
def list_adversarial_findings(self, run_id: str) -> list[dict[str, Any]]:
with self._conn() as conn:
rows = conn.execute(
"SELECT * FROM adversarial_findings WHERE run_id=? ORDER BY id", (run_id,),
).fetchall()
return [dict(r) for r in rows]
```
- [ ] **Step 4: Run the test (must pass)**
Run: `uv run pytest tests/unit/test_repository.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/persistence/ tests/unit/test_repository.py
git commit -m "feat(persistence): SQLite schema + repository for runs/genomes/evals/cost"
```
---
## Task 26: Generation summary utilities (entropy, percentiles)
**Files:**
- Create: `src/multi_swarm/ga/summary.py`
- Test: `tests/unit/test_ga_summary.py`
Helpers to compute a generation's aggregate metrics: median, max, p90, and the entropy of the (binned) fitness distribution. The entropy serves as a Phase 1 gate (#4 in the spec).
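For intuition, the binned-entropy computation can be sketched standalone, mirroring the normalize-by-max and fixed [0, 1] binning used in the implementation in this task:

```python
import math
import numpy as np

def binned_entropy(fitnesses, n_bins=5):
    # Normalize by max fitness, histogram over [0, 1], Shannon entropy of bin probabilities.
    arr = np.asarray(fitnesses, dtype=float)
    fmax = arr.max()
    normalized = arr / fmax if fmax > 0 else arr
    hist, _ = np.histogram(normalized, bins=n_bins, range=(0.0, 1.0))
    probs = hist / hist.sum()
    return -sum(p * math.log(p) for p in probs if p > 0)

spread = binned_entropy([0.1 * i for i in range(1, 21)])  # fitness spread across all bins
collapsed = binned_entropy([0.5] * 20)                    # population collapsed into one bin
assert collapsed == 0.0
assert 0.0 < spread <= math.log(5) + 1e-9  # bounded by log(n_bins)
```

A collapsed population scores zero entropy while a well-spread one approaches `log(n_bins)`, which is what makes this usable as a diversity gate.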
- [ ] **Step 1: Write the failing test**
```python
# tests/unit/test_ga_summary.py
import math
import pytest
from multi_swarm.ga.summary import generation_summary
def test_summary_basic_stats():
fitnesses = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
s = generation_summary(fitnesses, n_bins=5)
assert s["median"] == pytest.approx(0.45, abs=0.05)
assert s["max"] == pytest.approx(0.9)
assert 0.0 <= s["entropy"] <= math.log(5) + 0.01
def test_summary_uniform_high_entropy():
fitnesses = [0.1 * i for i in range(20)]
s_uniform = generation_summary(fitnesses, n_bins=5)
s_concentrated = generation_summary([0.5] * 20, n_bins=5)
assert s_uniform["entropy"] > s_concentrated["entropy"]
def test_summary_p90():
fitnesses = list(range(100))
s = generation_summary([float(x) for x in fitnesses], n_bins=10)
assert 88.0 <= s["p90"] <= 91.0
```
- [ ] **Step 2: Run the test (must fail)**
Run: `uv run pytest tests/unit/test_ga_summary.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement the summary**
```python
# src/multi_swarm/ga/summary.py
from __future__ import annotations
import math
import numpy as np
def generation_summary(fitnesses: list[float], n_bins: int = 10) -> dict[str, float]:
arr = np.asarray(fitnesses, dtype=float)
if arr.size == 0:
return {"median": 0.0, "max": 0.0, "p90": 0.0, "entropy": 0.0}
median = float(np.median(arr))
fmax = float(np.max(arr))
p90 = float(np.percentile(arr, 90))
if fmax > 0:
normalized = arr / fmax
else:
normalized = arr
hist, _ = np.histogram(normalized, bins=n_bins, range=(0.0, 1.0))
probs = hist / hist.sum() if hist.sum() > 0 else hist
entropy = float(-sum(p * math.log(p) for p in probs if p > 0))
return {"median": median, "max": fmax, "p90": p90, "entropy": entropy}
```
- [ ] **Step 4: Run the test (must pass)**
Run: `uv run pytest tests/unit/test_ga_summary.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/ga/summary.py tests/unit/test_ga_summary.py
git commit -m "feat(ga): generation summary stats (median/max/p90/entropy)"
```
---
## Task 27: Initial population generator
**Files:**
- Create: `src/multi_swarm/ga/initial.py`
- Test: `tests/unit/test_ga_initial.py`
Generates the initial population K=20: cognitive styles distributed uniformly over the 6 styles, random temperature in [0.7, 1.2], random lookback in {100, 150, 200, 300}, prompts generated from fixed templates per cognitive style.
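The round-robin assignment `i % len(STYLES)` guarantees full style coverage whenever k >= 6; a quick standalone check (the six style names are assumed to match the `STYLE_PROMPTS` keys in the implementation below):

```python
import random

STYLES = ["physicist", "biologist", "historian", "meteorologist", "ecologist", "engineer"]
rng = random.Random(0)
agents = [
    {"style": STYLES[i % len(STYLES)], "temperature": round(rng.uniform(0.7, 1.2), 2)}
    for i in range(12)
]
assert {a["style"] for a in agents} == set(STYLES)          # all 6 styles covered
assert all(0.7 <= a["temperature"] <= 1.2 for a in agents)  # temperatures sampled in range
```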
- [ ] **Step 1: Write the failing test**
```python
# tests/unit/test_ga_initial.py
import random
from multi_swarm.ga.initial import build_initial_population
from multi_swarm.genome.hypothesis import ModelTier
def test_initial_population_size():
pop = build_initial_population(k=20, model_tier=ModelTier.C, rng=random.Random(0))
assert len(pop) == 20
def test_initial_population_unique_ids():
pop = build_initial_population(k=20, model_tier=ModelTier.C, rng=random.Random(0))
ids = {g.id for g in pop}
assert len(ids) == 20
def test_initial_population_covers_all_styles():
pop = build_initial_population(k=12, model_tier=ModelTier.C, rng=random.Random(0))
styles = {g.cognitive_style for g in pop}
assert len(styles) == 6
def test_initial_population_generation_zero():
pop = build_initial_population(k=20, model_tier=ModelTier.C, rng=random.Random(0))
assert all(g.generation == 0 for g in pop)
assert all(g.parent_ids == [] for g in pop)
```
- [ ] **Step 2: Run the test (must fail)**
Run: `uv run pytest tests/unit/test_ga_initial.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement the initial population**
```python
# src/multi_swarm/ga/initial.py
from __future__ import annotations
import random
from ..genome.hypothesis import HypothesisAgentGenome, ModelTier
from ..genome.mutation import COGNITIVE_STYLES
STYLE_PROMPTS: dict[str, str] = {
"physicist": "Cerca leggi conservative, simmetrie, regimi di scala. Pensa in termini di flussi e potenziali.",
"biologist": "Cerca pattern adattivi, nicchie ecologiche, predator-prey dynamics tra partecipanti del mercato.",
"historian": "Cerca pattern ricorrenti su scale temporali multiple, analogie con regimi storici, mean reversion strutturali.",
"meteorologist": "Cerca regimi di volatilità che si autoalimentano, transizioni di stato come fronti, persistenza locale.",
"ecologist": "Cerca interazioni multi-asset, correlazioni cluster, segnali di stress sistemico nelle dinamiche di flusso.",
"engineer": "Cerca segnali con rapporto S/N favorevole, filtri causali, robustezza a perturbazioni di calibrazione.",
}
def build_initial_population(
k: int,
model_tier: ModelTier,
rng: random.Random,
feature_pool: tuple[str, ...] = ("close", "high", "low", "volume"),
) -> list[HypothesisAgentGenome]:
"""Costruisce una popolazione iniziale K varia per stile cognitivo + parametri."""
population: list[HypothesisAgentGenome] = []
for i in range(k):
style = COGNITIVE_STYLES[i % len(COGNITIVE_STYLES)]
n_features = rng.randint(1, len(feature_pool))
feats = sorted(rng.sample(feature_pool, k=n_features))
g = HypothesisAgentGenome(
system_prompt=STYLE_PROMPTS[style],
feature_access=feats,
temperature=round(rng.uniform(0.7, 1.2), 2),
top_p=0.95,
model_tier=model_tier,
lookback_window=rng.choice([100, 150, 200, 300]),
cognitive_style=style,
)
        # Re-seed to guarantee a unique id if a duplicate occurs (rare but possible)
while any(g.id == p.id for p in population):
g = HypothesisAgentGenome(
system_prompt=g.system_prompt + f" [seed-{i}-{rng.randint(0, 1_000_000)}]",
feature_access=g.feature_access,
temperature=g.temperature,
top_p=g.top_p,
model_tier=g.model_tier,
lookback_window=g.lookback_window,
cognitive_style=g.cognitive_style,
)
population.append(g)
return population
```
- [ ] **Step 4: Run the test (must pass)**
Run: `uv run pytest tests/unit/test_ga_initial.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/ga/initial.py tests/unit/test_ga_initial.py
git commit -m "feat(ga): initial population generator with cognitive style coverage"
```
---
## Task 28: Market summary builder (statistics for the prompt)
**Files:**
- Create: `src/multi_swarm/agents/market_summary.py`
- Test: `tests/unit/test_market_summary.py`
Computes the training-set statistics that are injected into the Hypothesis agent's prompt.
- [ ] **Step 1: Write the failing test**
```python
# tests/unit/test_market_summary.py
import numpy as np
import pandas as pd
from multi_swarm.agents.market_summary import build_market_summary
def test_build_summary_basic():
idx = pd.date_range("2024-01-01", periods=200, freq="1h", tz="UTC")
np.random.seed(0)
close = 100 + np.cumsum(np.random.normal(0, 1, 200))
df = pd.DataFrame(
{"open": close, "high": close + 0.5, "low": close - 0.5, "close": close, "volume": 1.0},
index=idx,
)
s = build_market_summary(df, symbol="BTC/USDT", timeframe="1h")
assert s.symbol == "BTC/USDT"
assert s.timeframe == "1h"
assert s.n_bars == 200
assert isinstance(s.return_mean, float)
assert isinstance(s.return_std, float)
assert s.volatility_regime in {"low", "medium", "high"}
def test_volatility_regime_high_for_volatile():
idx = pd.date_range("2024-01-01", periods=200, freq="1h", tz="UTC")
np.random.seed(0)
    close = 100 + np.cumsum(np.random.normal(0, 5.0, 200))  # high volatility
df = pd.DataFrame(
{"open": close, "high": close + 0.5, "low": close - 0.5, "close": close, "volume": 1.0},
index=idx,
)
s = build_market_summary(df, symbol="BTC/USDT", timeframe="1h")
assert s.volatility_regime in {"medium", "high"}
```
- [ ] **Step 2: Run the test (must fail)**
Run: `uv run pytest tests/unit/test_market_summary.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement the market summary**
```python
# src/multi_swarm/agents/market_summary.py
from __future__ import annotations
import numpy as np
import pandas as pd
from scipy import stats
from .hypothesis import MarketSummary
def build_market_summary(
ohlcv: pd.DataFrame, symbol: str, timeframe: str,
) -> MarketSummary:
returns = ohlcv["close"].pct_change().dropna()
return_mean = float(returns.mean())
return_std = float(returns.std(ddof=1))
skew = float(stats.skew(returns, bias=False))
kurt = float(stats.kurtosis(returns, fisher=True, bias=False))
if return_std < 0.005:
regime = "low"
elif return_std < 0.02:
regime = "medium"
else:
regime = "high"
return MarketSummary(
symbol=symbol,
timeframe=timeframe,
n_bars=len(ohlcv),
return_mean=return_mean,
return_std=return_std,
skew=skew,
kurtosis=kurt,
volatility_regime=regime,
)
```
- [ ] **Step 4: Run the test (must pass)**
Run: `uv run pytest tests/unit/test_market_summary.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/agents/market_summary.py tests/unit/test_market_summary.py
git commit -m "feat(agents): market summary builder for hypothesis prompt"
```
---
## Task 29: Run orchestrator (end-to-end loop)
**Files:**
- Create: `src/multi_swarm/orchestrator/__init__.py`
- Create: `src/multi_swarm/orchestrator/run.py`
- Test: `tests/integration/test_e2e_minimal_run.py`
The orchestrator coordinates: load OHLCV → build summary → init population → for each generation: query the LLM, falsify, run the adversarial review, compute fitness → save to DB → next_generation. Configuration via the `RunConfig` dataclass.
- [ ] **Step 1: Write the integration test**
```python
# tests/integration/__init__.py
```
```python
# tests/integration/test_e2e_minimal_run.py
from pathlib import Path
import pytest
import numpy as np
import pandas as pd
from multi_swarm.orchestrator.run import RunConfig, run_phase1
from multi_swarm.genome.hypothesis import ModelTier
from multi_swarm.persistence.repository import Repository
from multi_swarm.llm.client import CompletionResult
@pytest.fixture
def synthetic_ohlcv():
idx = pd.date_range("2024-01-01", periods=500, freq="1h", tz="UTC")
close = 100 + np.cumsum(np.random.RandomState(0).normal(0.01, 1.0, 500))
return pd.DataFrame(
{"open": close, "high": close + 0.5, "low": close - 0.5, "close": close, "volume": 1.0},
index=idx,
)
@pytest.fixture
def fake_llm(mocker):
"""LLM mock che ritorna sempre una strategia valida."""
fake = mocker.MagicMock()
fake.complete.return_value = CompletionResult(
text="```lisp\n(strategy (when (gt (indicator rsi 14) 70.0) (entry-short)) (when (lt (indicator rsi 14) 30.0) (entry-long)))\n```",
input_tokens=200, output_tokens=80, tier=ModelTier.C, model="qwen",
)
return fake
def test_e2e_minimal_run_completes(tmp_path: Path, synthetic_ohlcv, fake_llm):
cfg = RunConfig(
run_name="e2e-test",
population_size=5,
n_generations=2,
elite_k=1,
tournament_k=2,
p_crossover=0.5,
seed=42,
model_tier=ModelTier.C,
symbol="BTC/USDT",
timeframe="1h",
fees_bp=5.0,
n_trials_dsr=10,
db_path=tmp_path / "runs.db",
)
run_id = run_phase1(cfg, ohlcv=synthetic_ohlcv, llm=fake_llm)
repo = Repository(db_path=tmp_path / "runs.db")
run = repo.get_run(run_id)
assert run["status"] == "completed"
gens = repo.list_generations(run_id)
assert len(gens) == 2
evals = repo.list_evaluations(run_id)
    assert len(evals) >= 5  # at least one full population
```
- [ ] **Step 2: Run the test (must fail)**
Run: `uv run pytest tests/integration/test_e2e_minimal_run.py -v`
Expected: FAIL.
- [ ] **Step 3: Implement the orchestrator**
```python
# src/multi_swarm/orchestrator/__init__.py
```
```python
# src/multi_swarm/orchestrator/run.py
from __future__ import annotations
import random
from dataclasses import dataclass, field
from pathlib import Path
import pandas as pd
from ..agents.adversarial import AdversarialAgent
from ..agents.falsification import FalsificationAgent
from ..agents.hypothesis import HypothesisAgent
from ..agents.market_summary import build_market_summary
from ..ga.fitness import compute_fitness
from ..ga.initial import build_initial_population
from ..ga.loop import GAConfig, next_generation
from ..ga.summary import generation_summary
from ..genome.hypothesis import ModelTier
from ..llm.client import LLMClient
from ..llm.cost_tracker import CostTracker
from ..persistence.repository import Repository
@dataclass
class RunConfig:
run_name: str
population_size: int = 20
n_generations: int = 10
elite_k: int = 2
tournament_k: int = 3
p_crossover: float = 0.5
seed: int = 42
model_tier: ModelTier = ModelTier.C
symbol: str = "BTC/USDT"
timeframe: str = "1h"
fees_bp: float = 5.0
n_trials_dsr: int = 50
db_path: Path = field(default_factory=lambda: Path("./runs.db"))
def run_phase1(
cfg: RunConfig,
ohlcv: pd.DataFrame,
llm: LLMClient,
) -> str:
rng = random.Random(cfg.seed)
repo = Repository(cfg.db_path)
repo.init_schema()
run_id = repo.create_run(name=cfg.run_name, config=cfg.__dict__ | {"db_path": str(cfg.db_path)})
market = build_market_summary(ohlcv, symbol=cfg.symbol, timeframe=cfg.timeframe)
hypothesis_agent = HypothesisAgent(llm=llm)
falsification_agent = FalsificationAgent(fees_bp=cfg.fees_bp, n_trials_dsr=cfg.n_trials_dsr)
adversarial_agent = AdversarialAgent(fees_bp=cfg.fees_bp)
cost_tracker = CostTracker()
population = build_initial_population(k=cfg.population_size, model_tier=cfg.model_tier, rng=rng)
fitnesses: dict[str, float] = {}
ga_cfg = GAConfig(
population_size=cfg.population_size,
elite_k=cfg.elite_k,
tournament_k=cfg.tournament_k,
p_crossover=cfg.p_crossover,
)
try:
for gen in range(cfg.n_generations):
            for genome in population:
                repo.save_genome(run_id=run_id, generation_idx=gen, genome=genome)
                if genome.id in fitnesses:
                    continue  # elite already evaluated; re-saved under the new generation_idx
proposal = hypothesis_agent.propose(genome, market)
cost_record = cost_tracker.record(
input_tokens=proposal.completion.input_tokens,
output_tokens=proposal.completion.output_tokens,
tier=proposal.completion.tier,
run_id=run_id,
agent_id=genome.id,
)
repo.save_cost_record(
run_id=run_id, agent_id=genome.id, tier=cost_record.tier.value,
input_tokens=cost_record.input_tokens, output_tokens=cost_record.output_tokens,
cost_usd=cost_record.cost_usd,
)
if proposal.strategy is None:
repo.save_evaluation(
run_id=run_id, genome_id=genome.id, fitness=0.0,
dsr=0.0, dsr_pvalue=1.0, sharpe=0.0, max_dd=0.0,
total_return=0.0, n_trades=0,
parse_error=proposal.parse_error, raw_text=proposal.raw_text,
)
fitnesses[genome.id] = 0.0
continue
fals = falsification_agent.evaluate(proposal.strategy, ohlcv)
adv = adversarial_agent.review(proposal.strategy, ohlcv)
for finding in adv.findings:
repo.save_adversarial_finding(
run_id=run_id, genome_id=genome.id,
name=finding.name, severity=finding.severity.value, detail=finding.detail,
)
fit = compute_fitness(fals, adv)
repo.save_evaluation(
run_id=run_id, genome_id=genome.id, fitness=fit,
dsr=fals.dsr, dsr_pvalue=fals.dsr_pvalue, sharpe=fals.sharpe,
max_dd=fals.max_drawdown, total_return=fals.total_return,
n_trades=fals.n_trades, parse_error=None, raw_text=proposal.raw_text,
)
fitnesses[genome.id] = fit
gen_fitnesses = [fitnesses[g.id] for g in population]
summary = generation_summary(gen_fitnesses, n_bins=10)
repo.save_generation_summary(
run_id=run_id, generation_idx=gen, n_genomes=len(population),
fitness_median=summary["median"], fitness_max=summary["max"],
fitness_p90=summary["p90"], entropy=summary["entropy"],
)
if gen < cfg.n_generations - 1:
population = next_generation(population, fitnesses, ga_cfg, rng)
repo.complete_run(run_id, total_cost=repo.total_cost(run_id), status="completed")
return run_id
except Exception:
repo.complete_run(run_id, total_cost=repo.total_cost(run_id), status="failed")
raise
```
- [ ] **Step 4: Run the test (must pass)**
Run: `uv run pytest tests/integration/test_e2e_minimal_run.py -v`
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add src/multi_swarm/orchestrator/ tests/integration/
git commit -m "feat(orchestrator): end-to-end Phase 1 runner with persistence"
```
---
## Task 30: Streamlit dashboard skeleton + Overview page
**Files:**
- Create: `src/multi_swarm/dashboard/__init__.py`
- Create: `src/multi_swarm/dashboard/streamlit_app.py`
- Create: `src/multi_swarm/dashboard/data.py`
- Create: `src/multi_swarm/dashboard/pages/01_overview.py`
- Test: `tests/integration/test_streamlit_smoke.py`
`data.py` exposes read helpers for the Streamlit pages; `streamlit_app.py` is the home page; `pages/01_overview.py` shows the latest run, its status, and spend.
- [ ] **Step 1: Implement the dashboard data layer**
```python
# src/multi_swarm/dashboard/__init__.py
```
```python
# src/multi_swarm/dashboard/data.py
from __future__ import annotations
import json
from pathlib import Path
import pandas as pd
from ..persistence.repository import Repository
def get_repo(db_path: str | Path) -> Repository:
return Repository(db_path=db_path)
def list_runs_df(repo: Repository) -> pd.DataFrame:
return pd.DataFrame(repo.list_runs())
def get_run_overview(repo: Repository, run_id: str) -> dict:
run = repo.get_run(run_id)
return {
"name": run["name"],
"started_at": run["started_at"],
"completed_at": run["completed_at"],
"status": run["status"],
"total_cost_usd": run["total_cost_usd"],
"config": json.loads(run["config_json"]),
}
def generations_df(repo: Repository, run_id: str) -> pd.DataFrame:
return pd.DataFrame(repo.list_generations(run_id))
def evaluations_df(repo: Repository, run_id: str) -> pd.DataFrame:
return pd.DataFrame(repo.list_evaluations(run_id))
def genomes_df(repo: Repository, run_id: str, generation_idx: int | None = None) -> pd.DataFrame:
rows = repo.list_genomes(run_id, generation_idx)
flat = []
for r in rows:
payload = json.loads(r["payload_json"])
flat.append({
"id": r["id"], "generation_idx": r["generation_idx"],
**payload,
})
return pd.DataFrame(flat)
```
- [ ] **Step 2: Streamlit home page**
```python
# src/multi_swarm/dashboard/streamlit_app.py
from __future__ import annotations
import os
from pathlib import Path
import streamlit as st
st.set_page_config(page_title="Multi-Swarm Phase 1", layout="wide")
st.title("Multi-Swarm Coevolutivo — Phase 1 dashboard")
st.markdown("""
Naviga le pagine nel menu a sinistra:
- **Overview**: ultima run e stato globale.
- **GA Convergence**: fitness per generazione.
- **Genomes**: top-K genomi e ispezione qualitativa.
""")
db_path = os.environ.get("DB_PATH", "./runs.db")
st.session_state["db_path"] = db_path
st.caption(f"DB path: `{Path(db_path).resolve()}`")
```
- [ ] **Step 3: Overview page**
```python
# src/multi_swarm/dashboard/pages/01_overview.py
from __future__ import annotations
import streamlit as st
from multi_swarm.dashboard.data import get_repo, get_run_overview, list_runs_df
st.title("Overview")
db_path = st.session_state.get("db_path", "./runs.db")
repo = get_repo(db_path)
runs = list_runs_df(repo)
if runs.empty:
st.info("Nessuna run nel database. Esegui `scripts/run_phase1.py` per generarne una.")
st.stop()
st.subheader("Tutte le run")
st.dataframe(runs[["id", "name", "started_at", "completed_at", "status", "total_cost_usd"]])
selected = st.selectbox("Seleziona run per dettaglio", runs["id"].tolist())
overview = get_run_overview(repo, selected)
col1, col2, col3, col4 = st.columns(4)
col1.metric("Status", overview["status"])
col2.metric("Cost (USD)", f"{overview['total_cost_usd']:.4f}")
col3.metric("Started", overview["started_at"])
col4.metric("Completed", overview["completed_at"] or "—")
st.subheader("Config")
st.json(overview["config"])
```
- [ ] **Step 4: Smoke test (importability)**
```python
# tests/integration/test_streamlit_smoke.py
import importlib
def test_streamlit_app_imports():
# Check the modules import without exec'ing Streamlit's runtime
importlib.import_module("multi_swarm.dashboard.data")
def test_dashboard_data_helpers_signatures():
from multi_swarm.dashboard import data
assert hasattr(data, "list_runs_df")
assert hasattr(data, "generations_df")
assert hasattr(data, "evaluations_df")
assert hasattr(data, "genomes_df")
```
- [ ] **Step 5: Run smoke test**
Run: `uv run pytest tests/integration/test_streamlit_smoke.py -v`
Expected: PASS.
- [ ] **Step 6: Commit**
```bash
git add src/multi_swarm/dashboard/ tests/integration/test_streamlit_smoke.py
git commit -m "feat(dashboard): streamlit skeleton + Overview page + data layer"
```
---
## Task 31: Streamlit page — GA Convergence
**Files:**
- Create: `src/multi_swarm/dashboard/pages/02_ga_convergence.py`
- [ ] **Step 1: Implement the page**
```python
# src/multi_swarm/dashboard/pages/02_ga_convergence.py
from __future__ import annotations
import plotly.graph_objects as go
import streamlit as st
from multi_swarm.dashboard.data import generations_df, get_repo, list_runs_df
st.title("GA Convergence")
db_path = st.session_state.get("db_path", "./runs.db")
repo = get_repo(db_path)
runs = list_runs_df(repo)
if runs.empty:
st.info("Nessuna run.")
st.stop()
selected = st.selectbox("Run", runs["id"].tolist())
gens = generations_df(repo, selected)
if gens.empty:
st.warning("Nessuna generazione registrata per questa run.")
st.stop()
fig = go.Figure()
fig.add_trace(go.Scatter(x=gens["generation_idx"], y=gens["fitness_median"], name="median", mode="lines+markers"))
fig.add_trace(go.Scatter(x=gens["generation_idx"], y=gens["fitness_max"], name="max", mode="lines+markers"))
fig.add_trace(go.Scatter(x=gens["generation_idx"], y=gens["fitness_p90"], name="p90", mode="lines+markers"))
fig.update_layout(xaxis_title="generation", yaxis_title="fitness", title="Fitness convergence")
st.plotly_chart(fig, use_container_width=True)
st.subheader("Entropy")
fig2 = go.Figure()
fig2.add_trace(go.Scatter(x=gens["generation_idx"], y=gens["entropy"], mode="lines+markers"))
fig2.add_hline(y=0.5, line_dash="dash", annotation_text="gate threshold (0.5)")
fig2.update_layout(xaxis_title="generation", yaxis_title="entropy", title="Diversity (fitness entropy)")
st.plotly_chart(fig2, use_container_width=True)
st.subheader("Tabella generazioni")
st.dataframe(gens)
```
- [ ] **Step 2: Smoke test (file presence)**
Note: Streamlit pages prefixed with digits (`02_ga_convergence`) are not valid Python module names, and the `pages/` directory has no `__init__.py`, so a direct `importlib.import_module` call would fail. For the smoke test, verify the page exists on the filesystem instead:
```bash
test -f src/multi_swarm/dashboard/pages/02_ga_convergence.py && echo OK
```
Expected: prints `OK`.
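If you do want to exercise a page's top-level code in a test despite the digit prefix, a file can be loaded by path with `importlib.util`. This is a hypothetical helper, not one of the plan's files; note that exec'ing a real Streamlit page requires `streamlit` to be installed and runs its `st.*` calls at import time.

```python
# Hypothetical helper: load a module from a file path, bypassing module-naming
# rules (digit-prefixed Streamlit pages are not valid import names).
import importlib.util
from pathlib import Path
from types import ModuleType

def load_module_from_path(path: Path) -> ModuleType:
    spec = importlib.util.spec_from_file_location(path.stem, path)
    assert spec is not None and spec.loader is not None
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)  # raises if the file has syntax/import errors
    return mod
```

Useful only if you accept that the page executes on load; the filesystem check above stays the cheaper default.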
- [ ] **Step 3: Commit**
```bash
git add src/multi_swarm/dashboard/pages/02_ga_convergence.py
git commit -m "feat(dashboard): GA convergence page (median/max/p90 + entropy)"
```
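For reference when reading the entropy plot: this section does not restate how the entropy series is computed. A plausible sketch, assuming a histogram-based Shannon entropy normalized to [0, 1] (which makes the 0.5 gate line scale-free), looks like:

```python
import numpy as np

def fitness_entropy(fitness: np.ndarray, n_bins: int = 10) -> float:
    """Shannon entropy of the fitness histogram, normalized to [0, 1]."""
    counts, _ = np.histogram(fitness, bins=n_bins)
    p = counts / counts.sum()
    p = p[p > 0]  # drop empty bins (0 * log 0 := 0)
    return float(-(p * np.log(p)).sum() / np.log(n_bins))

# A collapsed population scores 0.0; a spread-out one scores near 1.0.
rng = np.random.default_rng(0)
spread = fitness_entropy(rng.uniform(0.0, 1.0, 20))
collapsed = fitness_entropy(np.full(20, 0.42))
```

The actual definition lives in the GA summary task (Task 26); this sketch is only to make the 0.5 threshold interpretable on the chart.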
---
## Task 32: Streamlit page — Genomes (basic)
**Files:**
- Create: `src/multi_swarm/dashboard/pages/03_genomes.py`
- [ ] **Step 1: Implement the page**
```python
# src/multi_swarm/dashboard/pages/03_genomes.py
from __future__ import annotations
import streamlit as st
from multi_swarm.dashboard.data import (
evaluations_df, genomes_df, get_repo, list_runs_df,
)
st.title("Genomes")
db_path = st.session_state.get("db_path", "./runs.db")
repo = get_repo(db_path)
runs = list_runs_df(repo)
if runs.empty:
st.info("Nessuna run.")
st.stop()
selected = st.selectbox("Run", runs["id"].tolist())
evals = evaluations_df(repo, selected)
genomes = genomes_df(repo, selected)
if evals.empty:
st.warning("Nessuna evaluation.")
st.stop()
merged = evals.merge(genomes, left_on="genome_id", right_on="id", how="left", suffixes=("", "_g"))
top = merged.sort_values("fitness", ascending=False).head(10)
st.subheader("Top-10 genomi (per fitness)")
display_cols = [
"genome_id", "fitness", "dsr", "sharpe", "max_dd", "n_trades",
"cognitive_style", "temperature", "lookback_window", "feature_access",
]
existing = [c for c in display_cols if c in top.columns]
st.dataframe(top[existing])
st.subheader("Ispezione genoma")
gid = st.selectbox("Seleziona genome_id", top["genome_id"].tolist())
row = merged[merged["genome_id"] == gid].iloc[0]
col1, col2 = st.columns(2)
with col1:
st.metric("fitness", f"{row['fitness']:.3f}")
st.metric("DSR", f"{row['dsr']:.3f}")
st.metric("Sharpe", f"{row['sharpe']:.3f}")
with col2:
st.metric("max DD", f"{row['max_dd']:.3f}")
st.metric("trades", int(row["n_trades"]))
st.metric("style", str(row.get("cognitive_style", "—")))
st.subheader("System prompt")
st.code(row.get("system_prompt", "—"))
st.subheader("Raw LLM output")
st.code(row.get("raw_text", "—"))
if row.get("parse_error"):
st.error(f"Parse error: {row['parse_error']}")
```
- [ ] **Step 2: Filesystem smoke check**
Run: `test -f src/multi_swarm/dashboard/pages/03_genomes.py && echo OK`
Expected: prints `OK`.
- [ ] **Step 3: Commit**
```bash
git add src/multi_swarm/dashboard/pages/03_genomes.py
git commit -m "feat(dashboard): Genomes page (top-10 + inspection)"
```
---
## Task 33: Phase 1 entry-point script
**Files:**
- Create: `scripts/__init__.py`
- Create: `scripts/run_phase1.py`
The script orchestrates the real run: it loads OHLCV data, builds the LLMClient with API keys from `.env`, and calls `run_phase1`. Configurable via CLI args with argparse.
- [ ] **Step 1: Implement the script**
```python
# scripts/__init__.py
```
```python
# scripts/run_phase1.py
from __future__ import annotations
import argparse
from datetime import datetime, timezone
from pathlib import Path
from multi_swarm.config import load_settings
from multi_swarm.data.ohlcv_loader import OHLCVLoader, OHLCVRequest
from multi_swarm.genome.hypothesis import ModelTier
from multi_swarm.llm.client import LLMClient
from multi_swarm.orchestrator.run import RunConfig, run_phase1
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Multi-Swarm Phase 1 runner")
p.add_argument("--name", default="phase1-spike-001")
p.add_argument("--population-size", type=int, default=20)
p.add_argument("--n-generations", type=int, default=10)
p.add_argument("--elite-k", type=int, default=2)
p.add_argument("--tournament-k", type=int, default=3)
p.add_argument("--p-crossover", type=float, default=0.5)
p.add_argument("--seed", type=int, default=42)
p.add_argument("--symbol", default="BTC/USDT")
p.add_argument("--timeframe", default="1h")
p.add_argument("--start", default="2024-01-01T00:00:00+00:00")
p.add_argument("--end", default="2026-01-01T00:00:00+00:00")
p.add_argument("--fees-bp", type=float, default=5.0)
p.add_argument("--n-trials-dsr", type=int, default=50)
return p.parse_args()
def main() -> None:
args = parse_args()
settings = load_settings()
loader = OHLCVLoader(cache_dir=settings.series_dir)
req = OHLCVRequest(
symbol=args.symbol,
timeframe=args.timeframe,
start=datetime.fromisoformat(args.start),
end=datetime.fromisoformat(args.end),
)
ohlcv = loader.load(req)
print(f"OHLCV loaded: {len(ohlcv)} bars from {ohlcv.index[0]} to {ohlcv.index[-1]}")
llm = LLMClient(
openrouter_api_key=settings.openrouter_api_key.get_secret_value(),
anthropic_api_key=(
settings.anthropic_api_key.get_secret_value()
if settings.anthropic_api_key else None
),
)
cfg = RunConfig(
run_name=args.name,
population_size=args.population_size,
n_generations=args.n_generations,
elite_k=args.elite_k,
tournament_k=args.tournament_k,
p_crossover=args.p_crossover,
seed=args.seed,
model_tier=ModelTier.C,
symbol=args.symbol,
timeframe=args.timeframe,
fees_bp=args.fees_bp,
n_trials_dsr=args.n_trials_dsr,
db_path=settings.db_path,
)
run_id = run_phase1(cfg, ohlcv=ohlcv, llm=llm)
print(f"Run completed: {run_id}")
if __name__ == "__main__":
main()
```
- [ ] **Step 2: Verify importability**
Run: `uv run python -c "from scripts import run_phase1; print(run_phase1.__doc__ or 'ok')"`
Expected: prints `ok`.
- [ ] **Step 3: Commit**
```bash
git add scripts/
git commit -m "feat(scripts): Phase 1 runner CLI entry point"
```
---
## Task 34: Smoke run (minimal population, 1 generation, dry data)
**Files:**
- Create: `scripts/smoke_run.py`
The smoke run uses synthetic OHLCV generated in memory, a population of 3, and 1 generation. No real LLM API calls: it uses a `MockLLMClient` that returns a fixed strategy. Its purpose is to validate that the whole loop runs without errors before spending real tokens.
- [ ] **Step 1: Implement the smoke run**
```python
# scripts/smoke_run.py
from __future__ import annotations
from pathlib import Path
import numpy as np
import pandas as pd
from multi_swarm.genome.hypothesis import HypothesisAgentGenome, ModelTier
from multi_swarm.llm.client import CompletionResult
from multi_swarm.orchestrator.run import RunConfig, run_phase1
class MockLLMClient:
def complete(
self, genome: HypothesisAgentGenome, system: str, user: str,
max_tokens: int = 2000,
) -> CompletionResult:
text = (
"```lisp\n"
"(strategy"
" (when (gt (indicator rsi 14) 70.0) (entry-short))"
" (when (lt (indicator rsi 14) 30.0) (entry-long)))\n"
"```"
)
return CompletionResult(
text=text, input_tokens=120, output_tokens=60,
tier=genome.model_tier, model="mock",
)
def main() -> None:
idx = pd.date_range("2024-01-01", periods=1000, freq="1h", tz="UTC")
close = 100 + np.cumsum(np.random.RandomState(0).normal(0.01, 1.0, 1000))
ohlcv = pd.DataFrame(
{"open": close, "high": close + 0.5, "low": close - 0.5, "close": close, "volume": 1.0},
index=idx,
)
cfg = RunConfig(
run_name="smoke",
population_size=3,
n_generations=1,
elite_k=1,
tournament_k=2,
p_crossover=0.5,
seed=0,
model_tier=ModelTier.C,
db_path=Path("./runs.db"),
)
run_id = run_phase1(cfg, ohlcv=ohlcv, llm=MockLLMClient()) # type: ignore[arg-type]
print(f"Smoke run completed: {run_id}")
if __name__ == "__main__":
main()
```
- [ ] **Step 2: Run the smoke test**
Run: `uv run python scripts/smoke_run.py`
Expected: prints `Smoke run completed: <hex>`. The `runs.db` file exists with 3 genomes and 1 generation.
- [ ] **Step 3: Commit**
```bash
git add scripts/smoke_run.py
git commit -m "feat(scripts): smoke run with mock LLM and synthetic OHLCV"
```
---
## Task 35: Validate the Streamlit dashboard against the smoke run's real dataset
**Files:**
- (no new files)
- [ ] **Step 1: Start the dashboard on the smoke-run DB**
Run: `DB_PATH=./runs.db uv run streamlit run src/multi_swarm/dashboard/streamlit_app.py`
Expected: the browser opens `http://localhost:8501`. The 3 pages (Overview, GA Convergence, Genomes) show data without errors.
- [ ] **Step 2: Visual check (manual checklist)**
- [ ] Overview lists the "smoke" run with status `completed` and cost > 0.
- [ ] GA Convergence shows a single point for generation 0 (one point on the x axis).
- [ ] Genomes shows 3 genomes in the table.
- [ ] Clicking a genome_id shows system_prompt and raw_text.
If any check fails, fix it before closing the task. Document any bugs in `docs/runs/`.
- [ ] **Step 3: Stop the dashboard, commit any fixes**
```bash
# Only if fixes were made
git add -A
git commit -m "fix(dashboard): fix <specific bug>"
```
---
## Task 36: Full Phase 1 run with real LLM (K=20, 10 generations, 2 years of OHLCV)
**Files:**
- Modify: none (execution only)
- Create: `docs/runs/2026-MM-DD-phase1-run-001.md`
This is the **operational event** of Phase 1: the first real run. Prerequisites:
- A local Cerbero is **not** strictly required for Phase 1 (the compiler uses built-in indicators). Start Cerbero only if agents need to call MCP tools for inspection; the Phase 1 prompt does not explicitly call for that.
- OpenRouter API key configured in `.env`.
- Budget tracker enabled; monitor spend during the run.
- [ ] **Step 1: Pre-flight check**
```bash
uv run pytest                      # full suite green
uv run ruff check src/ tests/     # linter clean
uv run mypy src/multi_swarm       # type check clean (targeted, documented ignores allowed)
```
Expected: all green.
- [ ] **Step 2: Execute the real run**
```bash
uv run python scripts/run_phase1.py \
--name phase1-run-001 \
--population-size 20 \
--n-generations 10 \
--elite-k 2 \
--tournament-k 3 \
--p-crossover 0.5 \
--seed 42 \
--symbol BTC/USDT \
--timeframe 1h \
--start 2024-01-01T00:00:00+00:00 \
--end 2026-01-01T00:00:00+00:00
```
Expected: estimated duration 30-90 minutes, estimated spend $40-90 (single run, one of the 5-10 total runs expected through the end of Phase 1).
**Monitoring**: in another shell, check the cumulative cost every 5 minutes via the dashboard Overview page, or:
```bash
sqlite3 runs.db "SELECT total_cost_usd FROM runs WHERE name='phase1-run-001'"
```
Stop manually (`Ctrl+C`) if cumulative spend exceeds $120, a symptom of runaway output tokens.
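The manual polling above can also be wrapped in a tiny watcher. This is a hypothetical helper, not one of the plan's files; it assumes the `runs` table schema from the persistence task, with `name` and `total_cost_usd` columns.

```python
# Hypothetical cost watcher: poll runs.db and flag a budget overrun.
# Assumes the `runs` table has `name` and `total_cost_usd` columns.
import sqlite3

BUDGET_USD = 120.0  # manual-stop threshold from this step

def current_cost(db_path: str, run_name: str) -> float:
    with sqlite3.connect(db_path) as conn:
        row = conn.execute(
            "SELECT total_cost_usd FROM runs WHERE name = ?", (run_name,)
        ).fetchone()
    return float(row[0]) if row else 0.0

def over_budget(db_path: str, run_name: str) -> bool:
    return current_cost(db_path, run_name) > BUDGET_USD
```

Call it from a `watch`/cron loop every few minutes during the run instead of re-typing the `sqlite3` one-liner.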
- [ ] **Step 3: Open the dashboard and inspect**
Run: `DB_PATH=./runs.db uv run streamlit run src/multi_swarm/dashboard/streamlit_app.py`
Verify that:
- 10 generations are present.
- 20 genomes per generation, at least 16 with `parse_error IS NULL`.
- The top-5 genomes have a reasonable DSR (> 0).
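The three checks above can also be done with direct SQL instead of eyeballing the dashboard. A sketch, assuming the table and column names implied by the repository/dashboard layers (`generations` and `evaluations` tables with `run_id` and `dsr` columns); adjust to the actual schema from Task 25:

```python
# Hypothetical post-run sanity counts (schema assumed, see lead-in).
import sqlite3

def run_counts(db_path: str, run_id: str) -> dict:
    with sqlite3.connect(db_path) as conn:
        n_gens = conn.execute(
            "SELECT COUNT(*) FROM generations WHERE run_id = ?", (run_id,)
        ).fetchone()[0]
        n_evals = conn.execute(
            "SELECT COUNT(*) FROM evaluations WHERE run_id = ?", (run_id,)
        ).fetchone()[0]
        # Worst DSR among the top-5: should be > 0 per the checklist above.
        top5_dsr_min = conn.execute(
            "SELECT MIN(dsr) FROM (SELECT dsr FROM evaluations "
            "WHERE run_id = ? ORDER BY dsr DESC LIMIT 5)", (run_id,)
        ).fetchone()[0]
    return {"n_generations": n_gens, "n_evaluations": n_evals,
            "top5_dsr_min": top5_dsr_min}
```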
- [ ] **Step 4: Document the run**
Create `docs/runs/2026-MM-DD-phase1-run-001.md` (replace MM-DD with the actual date) containing:
```markdown
# Phase 1 — Run 001
**Data**: <YYYY-MM-DD>
**Config**: K=20, 10 gen, seed=42, symbol BTC/USDT 1h, dataset 2024-2026.
**Costo finale**: $<x.xx>
**Durata wall-clock**: <hh:mm>
## Risultati sintetici
- Top fitness: <x.xx>
- Median fitness gen finale: <x.xx>
- Entropia gen finale: <x.xx>
- % parse success: <xx>%
- # genomi con DSR > 0.5: <n>
## Anomalie
- (es. parse error frequenti su prompt cognitive_style "engineer", da investigare)
## Learning
- ...
## Action items
- ...
```
- [ ] **Step 5: Commit**
```bash
git add docs/runs/
git commit -m "docs(runs): Phase 1 run-001 report"
```
---
## Task 37: Decision memo Phase 1 (gate evaluation)
**Files:**
- Create: `docs/decisions/2026-MM-DD-gate-phase1.md`
Compile the Phase 1 gate decision memo based on the run-001 results (aggregating additional runs if needed).
- [ ] **Step 1: Author pass: write the memo**
```markdown
# Gate Phase 1 — Decision Memo
**Data**: <YYYY-MM-DD>
**Run analizzati**: phase1-run-001 [, phase1-run-002, ...]
**Spesa totale Phase 1**: $<x.xx> di $700 cap (=<xx>%)
**Tempo speso Phase 1**: <n> settimane
## Hard gate evaluation
| # | Gate | Soglia | Misura | Esito |
|---|------|--------|--------|-------|
| 1 | Loop converge (median ↑ ≥3 gen) | 3 gen consecutive crescita | <descrivi> | PASS/FAIL |
| 2 | Output formalizzabile | ≥80% parse success | <xx>% | PASS/FAIL |
| 3 | Tail superiore | top-5 DSR ≥ 1.5x median | <descrivi> | PASS/FAIL |
| 4 | Diversità non collassa | entropy > 0.5 a fine run | <xx> | PASS/FAIL |
| 5 | Cost predictability | spesa entro ±30% stima | <xx>% deviazione | PASS/FAIL |
## Conclusione (author)
PASS / FAIL con razionale numerico ancorato alla tabella sopra.
## Aggiustamenti raccomandati per Phase 2 (se PASS)
- ...
## Pivot/stop raccomandato (se FAIL)
- ...
```
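To keep the table honest, the gate measures can be computed mechanically rather than by inspection. A self-contained sketch with illustrative numbers (column names taken from the dashboard data layer; the exact gate-1 test in the spec may differ, e.g. strict vs non-strict growth):

```python
import pandas as pd

# Illustrative per-generation and per-genome records (not real results).
gens = pd.DataFrame({
    "generation_idx": range(5),
    "fitness_median": [0.10, 0.12, 0.15, 0.19, 0.24],
    "entropy": [0.90, 0.85, 0.80, 0.70, 0.65],
})
evals = pd.DataFrame({
    "dsr": [0.2] * 15 + [1.0] * 5,
    "parse_error": [None] * 18 + ["unbalanced parens"] * 2,
})

# Gate 1: median fitness rises for >= 3 consecutive generations.
rising = gens["fitness_median"].diff() > 0
gate1 = bool((rising.rolling(3).sum() == 3).any())
# Gate 2: >= 80% of genomes parsed successfully.
gate2 = bool(evals["parse_error"].isna().mean() >= 0.80)
# Gate 3: mean DSR of the top-5 >= 1.5x the median DSR.
gate3 = bool(evals["dsr"].nlargest(5).mean() >= 1.5 * evals["dsr"].median())
# Gate 4: entropy above 0.5 at the end of the run.
gate4 = bool(gens["entropy"].iloc[-1] > 0.5)
```

Gate 5 (cost predictability) is a single arithmetic check against the pre-run estimate and needs no code.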
- [ ] **Step 2: Review pass: adversarial review of the memo**
Choose one of the 3 options from spec sec. 9.2:
- a Claude red-team subagent with an explicit prompt
- a human colleague
- a 48-hour timer, then a fresh-eyes pass
Add a `## Review pass (red team)` section to the memo with the critique and the counter-evidence.
- [ ] **Step 3: Final synthesis and decision**
Add a `## Decisione finale` section with one of:
- GO Phase 2 (specify scope and any adjustments)
- ITERATE Phase 1 (specify what to change and re-run)
- PIVOT (specify a new domain or a new approach)
- STOP (specify rationale and learnings)
- [ ] **Step 4: Commit**
```bash
git add docs/decisions/
git commit -m "docs(decisions): Phase 1 gate decision memo with author + review pass"
```
---
## Task 38: Report tecnico Phase 1
**Files:**
- Create: `docs/reports/2026-MM-DD-phase1-technical-report.md`
A ~5-page report per spec sec. 4.5. Contents:
1. Experimental setup (config, dataset, period, seed).
2. Loop convergence (median/max/p90 fitness per generation, dashboard screenshots).
3. Top-5 genomes: qualitative inspection (system_prompt, parameters, generated strategy, performance).
4. Parser failure modes: taxonomy of observed parse errors, suggestions for Phase 2.
5. Actual costs vs estimate: breakdown per tier and per agent; identify optimizations.
6. Diversity metrics: entropy per generation, distinct surviving cognitive_style values.
- [ ] **Step 1: Generate charts from the dashboard**
Procedure: open the dashboard, take screenshots of the GA Convergence and Genomes pages, and save them to `docs/reports/figures/phase1/`.
- [ ] **Step 2: Write the report**
Produce the file with the structure above. Use full Italian prose (per the CLAUDE.md rule for public artifacts).
- [ ] **Step 3: Commit**
```bash
git add docs/reports/
git commit -m "docs(reports): Phase 1 technical report"
```
---
## Self-review
After completing the draft, re-read the plan with fresh eyes to verify:
**1. Spec coverage**
- Scope IN Phase 1 (spec sec. 4.1):
  - Event-driven backtest engine, 1h, walk-forward 70/30 → Task 6 (engine), Task 4 (splits) ✓
  - Cerbero wrapper as tool layer → Task 9-10 ✓
  - Fixed S-expr protocol, 12-15 verbs → Task 11-13 ✓
  - Hypothesis Swarm K=20 tier C → Task 27 (initial) + Task 19 (agent) + Task 33 (run script) ✓
  - Hand-crafted Falsification + Adversarial → Task 20-21 ✓
  - Fitness v0 (DSR + drawdown penalty) → Task 22 ✓
  - GA loop 8-12 generations, tournament + elitism → Task 23-24 + Task 33 (default 10 gen) ✓
- Hard gates (spec sec. 4.4):
  - 1 loop converges → Task 26 (summary helpers to measure it) + Task 37 (memo) ✓
  - 2 parser >80% → the repository stores parse_error; Task 37 measures it ✓
  - 3 upper tail → SQL query over evaluations ✓
  - 4 entropy > 0.5 → Task 26 + Task 31 (dashboard shows the threshold hline) ✓
- 5 cost predictability → Task 18 (tracker) + Task 25 (DB) + Task 37 (memo) ✓
- GUI Phase 1 (spec sec. 7.2):
  - Overview ✓ Task 30
  - GA Convergence ✓ Task 31
  - Genomes basic ✓ Task 32
- Phase 1 deliverables (spec sec. 4.5):
  - Tested code ✓ all tasks with TDD
  - Technical report ~5 pp ✓ Task 38
- Decision memo ✓ Task 37
**2. Placeholder scan**
- The YYYY-MM-DD dates left to fill in tasks 36/37/38 naturally depend on the execution date; they are not logic placeholders. Mark them as "fill in at run time".
- LLM pricing in Task 18 is approximate: update with real values if OpenRouter changes its rates (check at run start).
- No TBD/TODO in the code.
**3. Type consistency**
- `HypothesisAgentGenome` has a stable interface across all tasks (id, generation, parent_ids, model_tier).
- `Side` enum consistent: LONG/SHORT/FLAT in backtest, compiler, agents, dashboard.
- `Strategy`/`Rule`/`Node` AST consistent across parser → validator → compiler.
- `FalsificationReport` fields used identically in fitness (Task 22) and repository (Task 25): `dsr`, `dsr_pvalue`, `sharpe`, `max_drawdown`, `total_return`, `n_trades`. ✓
- `AdversarialReport.findings` used by fitness and repository: `name`, `severity`, `detail` consistent. ✓
- `CompletionResult` fields `text`, `input_tokens`, `output_tokens`, `tier`, `model`: identical across LLMClient (Task 17), CostTracker (Task 18), HypothesisAgent (Task 19), Orchestrator (Task 29). ✓
**4. Granularity**
- Small, atomic tasks (3-5 steps each), 38 tasks total → ~150-200 steps. Consistent with the 4-6 week full-time estimate.
- The integration tests in Task 29 and Tasks 35-36 require a larger setup, but they are single steps with an explicit sub-checklist.
No corrections needed. The plan is ready.
---
## Execution handoff
Full plan saved at `docs/superpowers/plans/2026-05-09-phase1-lean-spike.md`.
**Two execution options:**
1. **Subagent-Driven (recommended)**: a fresh subagent per task, review between tasks, fast iteration.
2. **Inline Execution**: tasks executed in this same session with review checkpoints.
Which approach?