refactor(protocol): swap S-expression grammar for strict JSON Schema

Sostituisce la grammatica S-expression con uno schema JSON stretto. La
grammatica S-expression falliva il parsing nel 64% delle generazioni del
modello Qwen3-235B sul run reale; JSON e' nativo per gli LLM moderni e
si parsa con json.loads.

Cambiamenti principali:
- grammar.py: costanti rinominate LOGICAL_OPS / COMPARATOR_OPS /
  CROSSOVER_OPS / ACTION_VALUES / KIND_VALUES.
- parser.py: nuovo AST a dataclass tipizzato (OpNode, IndicatorNode,
  FeatureNode, LiteralNode, Rule, Strategy); parse_strategy ora consuma
  JSON tramite json.loads.
- validator.py: walk dispatchato per tipo (isinstance) invece di
  pattern-matching su 'kind'; arity check su operatori e indicator.
- compiler.py: traversal del nuovo AST tipizzato, dispatch per
  isinstance; logica indicator/feature/literal invariata.
- hypothesis.py: prompt SYSTEM riscritto con esempi JSON e vincoli
  espliciti su no-nesting; estrazione via fence ```json``` + fallback
  brace-balanced.
- __init__.py: re-export pubblico delle entita' del protocollo.
- Tutti i test (parser, validator, compiler, hypothesis_agent,
  falsification, adversarial, e2e, smoke_run) migrati a JSON.
- Rimossa dipendenza sexpdata da pyproject.toml + uv.lock.

Test: 135 passed (era 122; aggiunti casi parser/validator).
ruff + mypy strict clean. Smoke run end-to-end OK.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-10 21:17:26 +02:00
parent df76906505
commit 44eb6436c1
16 changed files with 1082 additions and 392 deletions
+114 -42
View File
@@ -35,42 +35,76 @@ Sei un agente generatore di ipotesi di trading quantitativo per un sistema swarm
Il tuo stile cognitivo: {cognitive_style}
Direttiva personale: {system_prompt}
Devi proporre una strategia di trading espressa nel linguaggio S-expression
con i seguenti verbi disponibili:
Devi proporre una strategia di trading espressa in JSON STRETTO.
La risposta deve essere un singolo oggetto JSON dentro fence ```json...```
con questa shape:
Azioni: entry-long, entry-short, exit, flat
Logici: and, or, not
Comparatori: gt, lt, eq
Dati: feature, indicator, crossover, crossunder
```json
{{
"rules": [
{{"condition": <nodo>, "action": "entry-long|entry-short|exit|flat"}}
]
}}
```
Indicatori disponibili (calcolati implicitamente sul prezzo close):
sma <length>, rsi <length>, atr <length>, macd, realized_vol <window>.
Feature disponibili: open, high, low, close, volume.
NODI DISPONIBILI
REGOLE STRETTE DI SINTASSI:
- (indicator <name> <args...>) restituisce una serie numerica. Es.
(indicator rsi 14), (indicator sma 50), (indicator macd 12 26 9).
- (feature <name>) restituisce la colonna OHLCV. Es. (feature close).
- Gli indicatori NON sono annidabili: NON puoi scrivere
(sma (indicator realized_vol 30) 150) o (indicator rsi (feature high) 14).
Le funzioni sma/rsi/etc. ESISTONO SOLO come argomenti di indicator,
non sono verbi indipendenti.
- Costanti numeriche (es. 70.0, 30, 0.02) sono valide come 2° operando di gt/lt/eq.
- crossover/crossunder accettano due espressioni-serie:
(crossover (feature close) (indicator sma 20)) — corretto.
(crossover (sma close 20) (sma close 50)) — ERRATO (sma non è verbo).
Operatori logici:
{{"op": "and", "args": [<nodo>, <nodo>, ...]}} // >=2 nodi
{{"op": "or", "args": [<nodo>, <nodo>, ...]}} // >=2 nodi
{{"op": "not", "args": [<nodo>]}} // 1 nodo
Le regole sono valutate in ordine; la prima che matcha vince per ogni timestamp.
La default action se nessuna regola matcha è 'flat'.
Comparatori (ritornano boolean series):
{{"op": "gt", "args": [<a>, <b>]}} // a > b
{{"op": "lt", "args": [<a>, <b>]}} // a < b
{{"op": "eq", "args": [<a>, <b>]}} // a == b
Rispondi SOLO con la S-expression in un fence ```lisp ... ```, senza prosa,
senza spiegazioni. Esempio formato:
Crossover (eventi su 2 serie):
{{"op": "crossover", "args": [<serie_a>, <serie_b>]}}
{{"op": "crossunder", "args": [<serie_a>, <serie_b>]}}
```lisp
(strategy
(when (gt (indicator rsi 14) 70.0) (entry-short))
(when (lt (indicator rsi 14) 30.0) (entry-long))
(when (crossover (feature close) (indicator sma 50)) (entry-long)))
Leaf - indicatori (calcolati su close):
{{"kind": "indicator", "name": "sma", "params": [<length>]}}
{{"kind": "indicator", "name": "rsi", "params": [<length>]}}
{{"kind": "indicator", "name": "atr", "params": [<length>]}}
{{"kind": "indicator", "name": "realized_vol", "params": [<window>]}}
{{"kind": "indicator", "name": "macd", "params": [<fast>, <slow>, <signal>]}}
// 0-3 numeri (tutti opzionali con default 12, 26, 9)
Leaf - feature OHLCV:
{{"kind": "feature", "name": "open|high|low|close|volume"}}
Leaf - letterale numerico:
{{"kind": "literal", "value": 70.0}}
VINCOLI
- Gli indicator NON sono annidabili: 'params' accetta solo numeri, mai altri nodi.
- Le regole sono valutate in ordine; la prima che matcha vince per ogni timestamp.
- Default action se nessuna regola matcha = flat.
- 'op' e 'kind' sono mutuamente esclusivi sullo stesso nodo.
Rispondi SOLO con il fence ```json...``` contenente l'oggetto strategy.
Esempio:
```json
{{
"rules": [
{{
"condition": {{"op": "gt", "args": [
{{"kind": "indicator", "name": "rsi", "params": [14]}},
{{"kind": "literal", "value": 70.0}}
]}},
"action": "entry-short"
}},
{{
"condition": {{"op": "lt", "args": [
{{"kind": "indicator", "name": "rsi", "params": [14]}},
{{"kind": "literal", "value": 30.0}}
]}},
"action": "entry-long"
}}
]
}}
```
"""
@@ -79,7 +113,7 @@ USER_TEMPLATE = """\
Mercato: {symbol} timeframe {timeframe}, {n_bars} barre osservate.
Statistiche return: mean={return_mean:.5f}, std={return_std:.5f}, \
skew={skew:.3f}, kurt={kurtosis:.3f}.
Regime volatilità: {volatility_regime}.
Regime volatilità : {volatility_regime}.
Feature accessibili dal tuo genoma: {feature_access}.
Lookback massimo che puoi usare nel ragionamento: {lookback_window} barre.
@@ -88,19 +122,57 @@ Genera una strategia che cerchi anomalie sfruttabili in questo regime.
"""
_SEXP_FENCE_RE = re.compile(
r"```(?:lisp|scheme|sexp)?\s*(\(strategy[\s\S]*?\))\s*```",
_JSON_FENCE_RE = re.compile(
r"```(?:json)?\s*(\{[\s\S]*\})\s*```",
re.MULTILINE,
)
def _extract_sexp(text: str) -> str | None:
m = _SEXP_FENCE_RE.search(text)
def _balance_braces(s: str) -> str | None:
"""Ritorna il prefix di ``s`` che chiude la prima ``{`` con bilanciamento.
Usato come fallback quando l'LLM ritorna JSON top-level senza fence ma
seguito da prosa: troviamo dove finisce il primo oggetto e tagliamo.
"""
if not s.startswith("{"):
return None
depth = 0
in_string = False
escape = False
for i, ch in enumerate(s):
if in_string:
if escape:
escape = False
elif ch == "\\":
escape = True
elif ch == '"':
in_string = False
continue
if ch == '"':
in_string = True
elif ch == "{":
depth += 1
elif ch == "}":
depth -= 1
if depth == 0:
return s[: i + 1]
return None
def _extract_json(text: str) -> str | None:
"""Estrai un oggetto JSON dal testo del completion.
Strategie di estrazione, in ordine:
1. Fence ```json...``` (greedy: cattura fino all'ultimo ``}`` prima della
chiusura del fence).
2. Testo che inizia direttamente con ``{`` (dopo strip), bilanciato a
livello di parentesi graffe.
"""
m = _JSON_FENCE_RE.search(text)
if m:
return m.group(1)
if text.strip().startswith("(strategy"):
return text.strip()
return None
stripped = text.strip()
return _balance_braces(stripped)
class HypothesisAgent:
@@ -131,16 +203,16 @@ class HypothesisAgent:
completion = self._llm.complete(genome, system=system, user=user)
sexp = _extract_sexp(completion.text)
if sexp is None:
payload = _extract_json(completion.text)
if payload is None:
return HypothesisProposal(
strategy=None,
raw_text=completion.text,
completion=completion,
parse_error="no s-expression found in output",
parse_error="no JSON object found in output",
)
try:
ast = parse_strategy(sexp)
ast = parse_strategy(payload)
validate_strategy(ast)
return HypothesisProposal(
strategy=ast,
+30
View File
@@ -0,0 +1,30 @@
"""Protocol layer: JSON-based strategy grammar + parser + validator + compiler."""
from .compiler import compile_strategy
from .parser import (
FeatureNode,
IndicatorNode,
LiteralNode,
Node,
OpNode,
ParseError,
Rule,
Strategy,
parse_strategy,
)
from .validator import ValidationError, validate_strategy
__all__ = [
"FeatureNode",
"IndicatorNode",
"LiteralNode",
"Node",
"OpNode",
"ParseError",
"Rule",
"Strategy",
"ValidationError",
"compile_strategy",
"parse_strategy",
"validate_strategy",
]
+74 -86
View File
@@ -12,9 +12,9 @@ Design notes
a different concrete signature (``(df, length)`` vs ``(df, fast, slow)``);
modelling that under ``mypy --strict`` would require a ``Protocol`` per
arity, which is overkill for the Phase 1 indicator subset.
* Numeric leaves coming out of :mod:`sexpdata` arrive as ``int`` / ``float``
/ ``str``; we widen via :func:`_to_series` to broadcast them along the
DataFrame index for arithmetic comparisons.
* I parametri di un :class:`IndicatorNode` sono sempre ``float``; cast a
``int`` per indicatori con argomenti tipo "length" è deferito alle helper
(``_ind_sma``, ecc.) attraverso ``int(...)``.
"""
from __future__ import annotations
@@ -26,7 +26,14 @@ import numpy as np
import pandas as pd # type: ignore[import-untyped]
from ..backtest.orders import Side
from .parser import Node, Strategy
from .parser import (
FeatureNode,
IndicatorNode,
LiteralNode,
Node,
OpNode,
Strategy,
)
def _sma(s: pd.Series, length: int) -> pd.Series:
@@ -61,27 +68,30 @@ def _realized_vol(s: pd.Series, window: int) -> pd.Series:
return returns.rolling(window, min_periods=1).std() * np.sqrt(window)
def _ind_sma(df: pd.DataFrame, length: int) -> pd.Series:
return _sma(df["close"], length)
def _ind_sma(df: pd.DataFrame, length: float) -> pd.Series:
return _sma(df["close"], int(length))
def _ind_rsi(df: pd.DataFrame, length: int) -> pd.Series:
return _rsi(df["close"], length)
def _ind_rsi(df: pd.DataFrame, length: float) -> pd.Series:
return _rsi(df["close"], int(length))
def _ind_atr(df: pd.DataFrame, length: int) -> pd.Series:
return _atr(df, length)
def _ind_atr(df: pd.DataFrame, length: float) -> pd.Series:
return _atr(df, int(length))
def _ind_realized_vol(df: pd.DataFrame, window: int) -> pd.Series:
return _realized_vol(df["close"], window)
def _ind_realized_vol(df: pd.DataFrame, window: float) -> pd.Series:
return _realized_vol(df["close"], int(window))
def _ind_macd(
df: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9,
df: pd.DataFrame,
fast: float = 12,
slow: float = 26,
signal: float = 9,
) -> pd.Series:
macd_line = _sma(df["close"], fast) - _sma(df["close"], slow)
signal_line = _sma(macd_line, signal)
macd_line = _sma(df["close"], int(fast)) - _sma(df["close"], int(slow))
signal_line = _sma(macd_line, int(signal))
return macd_line - signal_line
@@ -98,16 +108,9 @@ INDICATOR_FNS: dict[str, Any] = {
}
def _to_series(value: object, df: pd.DataFrame) -> pd.Series:
def _to_series(value: float, df: pd.DataFrame) -> pd.Series:
"""Broadcast a numeric literal across the DataFrame index."""
return pd.Series(float(value), index=df.index) # type: ignore[arg-type]
def _eval_arg(arg: Any, df: pd.DataFrame) -> pd.Series:
"""Evaluate either a child Node or a scalar literal into a Series."""
if isinstance(arg, Node):
return _eval_node(arg, df)
return _to_series(arg, df)
return pd.Series(float(value), index=df.index)
def _compare_with_nan(result: pd.Series, a: pd.Series, b: pd.Series) -> pd.Series:
@@ -124,71 +127,60 @@ def _compare_with_nan(result: pd.Series, a: pd.Series, b: pd.Series) -> pd.Serie
return out
def _eval_bool_arg(arg: Any, df: pd.DataFrame) -> pd.Series:
"""Evaluate either a child Node (bool series) or a literal into a bool Series."""
if isinstance(arg, Node):
return _eval_node(arg, df).fillna(False).astype(bool)
return pd.Series(bool(arg), index=df.index)
def _eval_bool_arg(node: Node, df: pd.DataFrame) -> pd.Series:
"""Evaluate a child Node into a boolean Series (NaN -> False)."""
return _eval_node(node, df).fillna(False).astype(bool)
def _eval_node(node: Node, df: pd.DataFrame) -> pd.Series:
kind = node.kind
if isinstance(node, FeatureNode):
return df[node.name]
if kind == "feature":
feat = node.args[0]
feat_name = feat.kind if isinstance(feat, Node) else str(feat)
return df[feat_name]
if kind == "indicator":
name_node = node.args[0]
ind_name = name_node.kind if isinstance(name_node, Node) else str(name_node)
params = [a for a in node.args[1:] if not isinstance(a, Node)]
fn = INDICATOR_FNS[ind_name]
result: pd.Series = fn(df, *params)
if isinstance(node, IndicatorNode):
fn = INDICATOR_FNS[node.name]
result: pd.Series = fn(df, *node.params)
return result
if kind == "gt":
a = _eval_arg(node.args[0], df)
b = _eval_arg(node.args[1], df)
return _compare_with_nan(a > b, a, b)
if isinstance(node, LiteralNode):
return _to_series(node.value, df)
if kind == "lt":
a = _eval_arg(node.args[0], df)
b = _eval_arg(node.args[1], df)
return _compare_with_nan(a < b, a, b)
if isinstance(node, OpNode):
op = node.op
if op == "gt":
a = _eval_node(node.args[0], df)
b = _eval_node(node.args[1], df)
return _compare_with_nan(a > b, a, b)
if op == "lt":
a = _eval_node(node.args[0], df)
b = _eval_node(node.args[1], df)
return _compare_with_nan(a < b, a, b)
if op == "eq":
a = _eval_node(node.args[0], df)
b = _eval_node(node.args[1], df)
return _compare_with_nan(a == b, a, b)
if op == "and":
result = pd.Series(True, index=df.index)
for a in node.args:
result &= _eval_bool_arg(a, df)
return result
if op == "or":
result = pd.Series(False, index=df.index)
for a in node.args:
result |= _eval_bool_arg(a, df)
return result
if op == "not":
return ~_eval_bool_arg(node.args[0], df)
if op == "crossover":
a = _eval_node(node.args[0], df)
b = _eval_node(node.args[1], df)
return ((a > b) & (a.shift() <= b.shift())).fillna(False).astype(bool)
if op == "crossunder":
a = _eval_node(node.args[0], df)
b = _eval_node(node.args[1], df)
return ((a < b) & (a.shift() >= b.shift())).fillna(False).astype(bool)
raise RuntimeError(f"unsupported op in compiler: {op}")
if kind == "eq":
a = _eval_arg(node.args[0], df)
b = _eval_arg(node.args[1], df)
return _compare_with_nan(a == b, a, b)
if kind == "and":
result = pd.Series(True, index=df.index)
for a in node.args:
result &= _eval_bool_arg(a, df)
return result
if kind == "or":
result = pd.Series(False, index=df.index)
for a in node.args:
result |= _eval_bool_arg(a, df)
return result
if kind == "not":
s = _eval_bool_arg(node.args[0], df)
return ~s
if kind == "crossover":
a = _eval_arg(node.args[0], df)
b = _eval_arg(node.args[1], df)
return ((a > b) & (a.shift() <= b.shift())).fillna(False).astype(bool)
if kind == "crossunder":
a = _eval_arg(node.args[0], df)
b = _eval_arg(node.args[1], df)
return ((a < b) & (a.shift() >= b.shift())).fillna(False).astype(bool)
raise RuntimeError(f"unsupported node in compiler: {kind}")
raise RuntimeError(f"unsupported node type in compiler: {type(node).__name__}")
_ACTION_TO_SIDE: dict[str, Side] = {
@@ -199,10 +191,6 @@ _ACTION_TO_SIDE: dict[str, Side] = {
}
def _action_to_side(action: Node) -> Side:
return _ACTION_TO_SIDE[action.kind]
def compile_strategy(strategy: Strategy) -> Callable[[pd.DataFrame], pd.Series]:
"""Compile a :class:`Strategy` AST into a ``df -> Series[Side]`` callable.
@@ -218,7 +206,7 @@ def compile_strategy(strategy: Strategy) -> Callable[[pd.DataFrame], pd.Series]:
any_rule_seen = pd.Series(False, index=df.index)
for rule in strategy.rules:
match = _eval_node(rule.condition, df)
target = _action_to_side(rule.action)
target = _ACTION_TO_SIDE[rule.action]
valid = ~_isna_series(match)
any_rule_seen |= valid
match_bool = match.where(valid, False).astype(bool)
+23 -22
View File
@@ -1,26 +1,27 @@
from __future__ import annotations
VERBS: frozenset[str] = frozenset(
{
"entry-long",
"entry-short",
"exit",
"flat",
"when",
"and",
"or",
"not",
"gt",
"lt",
"eq",
"feature",
"indicator",
"crossover",
"crossunder",
}
# Grammatica JSON Schema (Phase 1, post S-expression refactor).
#
# Distinzione strutturale:
# * Nodi OPERATORE -> dict con chiave ``"op"`` (logici, comparatori, crossover)
# * Nodi LEAF -> dict con chiave ``"kind"`` (indicator, feature, literal)
# ``op`` e ``kind`` sono mutuamente esclusivi sullo stesso nodo.
LOGICAL_OPS: frozenset[str] = frozenset({"and", "or", "not"})
COMPARATOR_OPS: frozenset[str] = frozenset({"gt", "lt", "eq"})
CROSSOVER_OPS: frozenset[str] = frozenset({"crossover", "crossunder"})
ACTION_VALUES: frozenset[str] = frozenset(
{"entry-long", "entry-short", "exit", "flat"}
)
KIND_VALUES: frozenset[str] = frozenset({"indicator", "feature", "literal"})
KNOWN_INDICATORS: frozenset[str] = frozenset(
{"sma", "rsi", "atr", "macd", "realized_vol"}
)
KNOWN_FEATURES: frozenset[str] = frozenset(
{"open", "high", "low", "close", "volume"}
)
ACTION_VERBS: frozenset[str] = frozenset({"entry-long", "entry-short", "exit", "flat"})
LOGICAL_VERBS: frozenset[str] = frozenset({"and", "or", "not"})
COMPARATOR_VERBS: frozenset[str] = frozenset({"gt", "lt", "eq"})
DATA_VERBS: frozenset[str] = frozenset({"feature", "indicator", "crossover", "crossunder"})
# Convenience union (utile a validator / parser).
ALL_OPS: frozenset[str] = LOGICAL_OPS | COMPARATOR_OPS | CROSSOVER_OPS
+165 -58
View File
@@ -1,96 +1,203 @@
"""JSON-based parser per la strategia di trading (Phase 1).
L'AST è una piccola gerarchia di dataclass:
* :class:`Strategy` è il top-level (lista di :class:`Rule`).
* :class:`Rule` accoppia una condizione (Node) ad un'azione (str).
* :class:`Node` è un'unione: nodi operatore (:class:`OpNode`) e nodi leaf
(:class:`IndicatorNode`, :class:`FeatureNode`, :class:`LiteralNode`).
Convenzione di shape sui dict in input:
* Nodi operatore: ``{"op": "<name>", "args": [<node>, ...]}``.
* Nodi indicator: ``{"kind": "indicator", "name": "<name>", "params": [<num>, ...]}``.
* Nodi feature: ``{"kind": "feature", "name": "<name>"}``.
* Nodi literal: ``{"kind": "literal", "value": <number>}``.
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Any
import sexpdata # type: ignore[import-untyped]
from .grammar import ACTION_VERBS, VERBS
from .grammar import (
ACTION_VALUES,
ALL_OPS,
)
class ParseError(Exception):
"""Raised when an S-expression strategy cannot be parsed."""
"""Raised when a JSON strategy cannot be parsed into a valid AST."""
# ---------------------------------------------------------------------------
# Dataclass AST
# ---------------------------------------------------------------------------
@dataclass
class Node:
kind: str
args: list[Any] = field(default_factory=list)
class OpNode:
"""Operator node: logical / comparator / crossover."""
op: str
args: list[Node] = field(default_factory=list)
@dataclass
class IndicatorNode:
"""Leaf: indicatore tecnico calcolato sul dataframe OHLCV."""
name: str
params: list[float] = field(default_factory=list)
@dataclass
class FeatureNode:
"""Leaf: colonna OHLCV (open/high/low/close/volume)."""
name: str
@dataclass
class LiteralNode:
"""Leaf: costante numerica."""
value: float
Node = OpNode | IndicatorNode | FeatureNode | LiteralNode
@dataclass
class Rule:
kind: str # always "when"
condition: Node
action: Node
action: str
@dataclass
class Strategy:
kind: str # always "strategy"
rules: list[Rule]
def _to_node(token: Any) -> Node | float | int | str:
"""Convert a sexpdata token tree into a Node (or scalar leaf)."""
if isinstance(token, sexpdata.Symbol):
name = str(token.value())
# Bare symbols inside expressions (e.g. `rsi` in (indicator rsi 14))
# are kept as Node-with-no-args so callers can introspect uniformly.
return Node(kind=name, args=[])
if isinstance(token, list):
if not token:
raise ParseError("Empty s-expression")
head = token[0]
if not isinstance(head, sexpdata.Symbol):
raise ParseError(f"Non-symbol head: {head!r}")
name = str(head.value())
if name not in VERBS:
raise ParseError(f"Unknown verb: {name}")
return Node(kind=name, args=[_to_node(arg) for arg in token[1:]])
# numeric / string literals pass through unchanged
return token # type: ignore[no-any-return]
# ---------------------------------------------------------------------------
# Conversione dict -> Node
# ---------------------------------------------------------------------------
def _to_node(obj: Any) -> Node:
if not isinstance(obj, dict):
raise ParseError(f"Node must be a JSON object, got {type(obj).__name__}")
has_op = "op" in obj
has_kind = "kind" in obj
if has_op and has_kind:
raise ParseError(
"Node cannot define both 'op' and 'kind' (mutually exclusive)"
)
if not has_op and not has_kind:
raise ParseError("Node must define either 'op' or 'kind'")
if has_op:
op = obj["op"]
if not isinstance(op, str):
raise ParseError(f"'op' must be a string, got {type(op).__name__}")
if op not in ALL_OPS:
raise ParseError(f"Unknown op: {op!r}")
raw_args = obj.get("args")
if not isinstance(raw_args, list):
raise ParseError(f"Operator '{op}' missing 'args' list")
args = [_to_node(a) for a in raw_args]
return OpNode(op=op, args=args)
# leaf node
kind = obj["kind"]
if not isinstance(kind, str):
raise ParseError(f"'kind' must be a string, got {type(kind).__name__}")
if kind == "indicator":
name = obj.get("name")
if not isinstance(name, str):
raise ParseError("indicator node requires string 'name'")
raw_params = obj.get("params", [])
if not isinstance(raw_params, list):
raise ParseError("indicator 'params' must be a list")
params: list[float] = []
for p in raw_params:
if isinstance(p, bool) or not isinstance(p, (int, float)):
raise ParseError(
f"indicator '{name}' params accept only numbers, got {p!r}"
)
params.append(float(p))
return IndicatorNode(name=name, params=params)
if kind == "feature":
name = obj.get("name")
if not isinstance(name, str):
raise ParseError("feature node requires string 'name'")
return FeatureNode(name=name)
if kind == "literal":
if "value" not in obj:
raise ParseError("literal node requires 'value'")
value = obj["value"]
if isinstance(value, bool) or not isinstance(value, (int, float)):
raise ParseError(f"literal value must be numeric, got {value!r}")
return LiteralNode(value=float(value))
raise ParseError(f"Unknown leaf kind: {kind!r}")
# ---------------------------------------------------------------------------
# Top-level parser
# ---------------------------------------------------------------------------
def parse_strategy(src: str) -> Strategy:
"""Parse an S-expression strategy string into a Strategy AST.
"""Parse a JSON strategy string into a :class:`Strategy` AST.
The grammar is documented in :mod:`multi_swarm.protocol.grammar` and is
intentionally tiny (15 verbs). We delegate raw S-expr lexing to
:mod:`sexpdata`, then validate the verb set ourselves.
Lo schema atteso è::
{
"rules": [
{"condition": <node>, "action": "<action-string>"},
...
]
}
Raise :class:`ParseError` su JSON malformato o struttura inattesa.
"""
try:
parsed = sexpdata.loads(src)
except Exception as e: # sexpdata raises various exception types
raise ParseError(f"sexp parse error: {e}") from e
parsed = json.loads(src)
except json.JSONDecodeError as e:
raise ParseError(f"invalid JSON: {e}") from e
if not isinstance(parsed, list) or not parsed:
raise ParseError("Top-level must be (strategy ...)")
head = parsed[0]
if not isinstance(head, sexpdata.Symbol) or str(head.value()) != "strategy":
raise ParseError("Top-level must start with 'strategy'")
raw_rules = parsed[1:]
if not isinstance(parsed, dict):
raise ParseError("Top-level must be a JSON object with 'rules'")
if "rules" not in parsed:
raise ParseError("Top-level object must contain 'rules' key")
raw_rules = parsed["rules"]
if not isinstance(raw_rules, list):
raise ParseError("'rules' must be a list")
if not raw_rules:
raise ParseError("Strategy must contain at least one rule")
rules: list[Rule] = []
for raw in raw_rules:
if not isinstance(raw, list) or len(raw) != 3:
raise ParseError(f"Rule must be (when <cond> <action>): {raw!r}")
head_r = raw[0]
if not isinstance(head_r, sexpdata.Symbol) or str(head_r.value()) != "when":
raise ParseError(f"Rule must start with 'when': {raw!r}")
cond = _to_node(raw[1])
action = _to_node(raw[2])
if not isinstance(cond, Node):
raise ParseError(f"Condition must be a node: {cond!r}")
if not isinstance(action, Node):
raise ParseError(f"Action must be a node: {action!r}")
if action.kind not in ACTION_VERBS:
if not isinstance(raw, dict):
raise ParseError(f"Rule must be a JSON object, got {raw!r}")
if "condition" not in raw or "action" not in raw:
raise ParseError(
f"Action must be one of {sorted(ACTION_VERBS)}, got {action.kind!r}"
f"Rule must contain 'condition' and 'action' keys: {raw!r}"
)
rules.append(Rule(kind="when", condition=cond, action=action))
action = raw["action"]
if not isinstance(action, str):
raise ParseError(f"action must be a string, got {action!r}")
if action not in ACTION_VALUES:
raise ParseError(
f"action must be one of {sorted(ACTION_VALUES)}, got {action!r}"
)
cond = _to_node(raw["condition"])
rules.append(Rule(condition=cond, action=action))
return Strategy(kind="strategy", rules=rules)
return Strategy(rules=rules)
+79 -69
View File
@@ -1,20 +1,41 @@
"""Semantic validation for the JSON-based strategy AST.
Il parser garantisce già shape sintattica (op vs kind, struttura args/params,
tipi base). Qui si controllano vincoli semantici di Phase 1:
* Arity di operatori logici / comparatori / crossover.
* Whitelist indicator + arity dei params.
* Whitelist feature.
* Niente nesting di indicator (params puramente numerici, garantito già dal
parser ma ricontrollato esplicitamente per chiarezza).
"""
from __future__ import annotations
from .grammar import COMPARATOR_VERBS, LOGICAL_VERBS
from .parser import Node, Strategy
KNOWN_INDICATORS: frozenset[str] = frozenset({"sma", "rsi", "atr", "macd", "realized_vol"})
KNOWN_FEATURES: frozenset[str] = frozenset({"open", "high", "low", "close", "volume"})
from .grammar import (
COMPARATOR_OPS,
CROSSOVER_OPS,
KNOWN_FEATURES,
KNOWN_INDICATORS,
LOGICAL_OPS,
)
from .parser import (
FeatureNode,
IndicatorNode,
LiteralNode,
Node,
OpNode,
Strategy,
)
# Numero di parametri numerici accettati dopo il nome dell'indicatore.
# La tupla (min, max) include solo i numeri (gli argomenti di tipo Node sono
# proibiti dal compiler - gli indicatori non sono annidabili in Phase 1).
# (min, max) sui soli numeri. Indicatori non sono annidabili in Phase 1.
INDICATOR_ARITY: dict[str, tuple[int, int]] = {
"sma": (1, 1), # length
"rsi": (1, 1), # length
"atr": (1, 1), # length
"realized_vol": (1, 1), # window
"macd": (0, 3), # fast, slow, signal (tutti opzionali con default)
"macd": (0, 3), # fast, slow, signal (tutti opzionali)
}
@@ -23,77 +44,66 @@ class ValidationError(Exception):
def validate_strategy(strategy: Strategy) -> None:
"""Check semantic constraints on a parsed Strategy AST.
The parser already enforces verb-set membership; this pass adds:
* arity checks for logical/comparator/data verbs,
* known-indicator / known-feature whitelists.
"""
"""Walk every rule of the strategy and assert semantic constraints."""
for rule in strategy.rules:
_validate_node(rule.condition, _expect_bool=True)
_validate_node(rule.condition)
def _validate_node(node: Node, _expect_bool: bool) -> None:
if node.kind in LOGICAL_VERBS:
if node.kind == "not":
if len(node.args) != 1:
raise ValidationError(f"'not' needs 1 arg, got {len(node.args)}")
arg = node.args[0]
if isinstance(arg, Node):
_validate_node(arg, _expect_bool=True)
def _validate_node(node: Node) -> None:
if isinstance(node, OpNode):
_validate_op(node)
return
if isinstance(node, IndicatorNode):
_validate_indicator(node)
return
if isinstance(node, FeatureNode):
if node.name not in KNOWN_FEATURES:
raise ValidationError(f"unknown feature: {node.name}")
return
if isinstance(node, LiteralNode):
# parser ha già validato il tipo numerico
return
raise ValidationError(f"unexpected node type: {type(node).__name__}")
def _validate_op(node: OpNode) -> None:
op = node.op
n = len(node.args)
if op in LOGICAL_OPS:
if op == "not":
if n != 1:
raise ValidationError(f"'not' needs 1 arg, got {n}")
else:
if len(node.args) < 2:
raise ValidationError(f"'{node.kind}' needs >=2 args")
for a in node.args:
if isinstance(a, Node):
_validate_node(a, _expect_bool=True)
return
if node.kind in COMPARATOR_VERBS:
if len(node.args) != 2:
raise ValidationError(f"'{node.kind}' needs 2 args, got {len(node.args)}")
if n < 2:
raise ValidationError(f"'{op}' needs >=2 args, got {n}")
for a in node.args:
if isinstance(a, Node):
_validate_node(a, _expect_bool=False)
_validate_node(a)
return
if node.kind in {"crossover", "crossunder"}:
if len(node.args) != 2:
raise ValidationError(f"'{node.kind}' needs 2 args")
if op in COMPARATOR_OPS:
if n != 2:
raise ValidationError(f"'{op}' needs 2 args, got {n}")
for a in node.args:
if isinstance(a, Node):
_validate_node(a, _expect_bool=False)
_validate_node(a)
return
if node.kind == "indicator":
if len(node.args) < 1:
raise ValidationError("'indicator' needs >=1 args (name [, params...])")
name_node = node.args[0]
ind_name = name_node.kind if isinstance(name_node, Node) else str(name_node)
if ind_name not in KNOWN_INDICATORS:
raise ValidationError(f"unknown indicator: {ind_name}")
# Gli indicatori non accettano Node come params (no-nesting in Phase 1).
for a in node.args[1:]:
if isinstance(a, Node):
raise ValidationError(
f"indicator '{ind_name}' does not accept nested expressions; "
f"only numeric literals (got node {a.kind})"
)
n_params = len(node.args) - 1
min_p, max_p = INDICATOR_ARITY[ind_name]
if not (min_p <= n_params <= max_p):
raise ValidationError(
f"indicator '{ind_name}' arity {n_params} out of [{min_p},{max_p}]"
)
if op in CROSSOVER_OPS:
if n != 2:
raise ValidationError(f"'{op}' needs 2 args, got {n}")
for a in node.args:
_validate_node(a)
return
if node.kind == "feature":
if len(node.args) != 1:
raise ValidationError("'feature' needs 1 arg")
feat_node = node.args[0]
feat_name = feat_node.kind if isinstance(feat_node, Node) else str(feat_node)
if feat_name not in KNOWN_FEATURES:
raise ValidationError(f"unknown feature: {feat_name}")
return
raise ValidationError(f"unexpected op in expression: {op}")
raise ValidationError(f"unexpected node kind in expression: {node.kind}")
def _validate_indicator(node: IndicatorNode) -> None:
if node.name not in KNOWN_INDICATORS:
raise ValidationError(f"unknown indicator: {node.name}")
n_params = len(node.params)
min_p, max_p = INDICATOR_ARITY[node.name]
if not (min_p <= n_params <= max_p):
raise ValidationError(
f"indicator '{node.name}' arity {n_params} out of [{min_p},{max_p}]"
)