refactor(mcp_common): remove risk_guard, models, env_validation, storage

This commit is contained in:
AdrianoDev
2026-04-27 17:38:44 +02:00
parent e888fc373d
commit 888a3cde84
9 changed files with 1 additions and 571 deletions
@@ -1,71 +0,0 @@
"""CER-P5-010 env validation tests."""
from __future__ import annotations
import pytest
from mcp_common.env_validation import (
MissingEnvError,
fail_fast_if_missing,
optional_env,
require_env,
summarize,
)
def test_require_env_present(monkeypatch):
monkeypatch.setenv("FOO_KEY", "value1")
assert require_env("FOO_KEY") == "value1"
def test_require_env_missing_raises(monkeypatch):
monkeypatch.delenv("MISSING_REQ", raising=False)
with pytest.raises(MissingEnvError):
require_env("MISSING_REQ", "critical path")
def test_require_env_empty_raises(monkeypatch):
monkeypatch.setenv("EMPTY_REQ", "")
with pytest.raises(MissingEnvError):
require_env("EMPTY_REQ")
def test_require_env_whitespace_only_raises(monkeypatch):
monkeypatch.setenv("WS_REQ", " ")
with pytest.raises(MissingEnvError):
require_env("WS_REQ")
def test_optional_env_default(monkeypatch):
monkeypatch.delenv("OPT_A", raising=False)
assert optional_env("OPT_A", default="fallback") == "fallback"
def test_optional_env_set(monkeypatch):
monkeypatch.setenv("OPT_B", "xx")
assert optional_env("OPT_B", default="fallback") == "xx"
def test_fail_fast_all_present(monkeypatch):
monkeypatch.setenv("AA", "1")
monkeypatch.setenv("BB", "2")
fail_fast_if_missing(["AA", "BB"]) # no exit
def test_fail_fast_missing_exits(monkeypatch):
monkeypatch.setenv("HAVE_IT", "1")
monkeypatch.delenv("MISSING_X", raising=False)
with pytest.raises(SystemExit) as exc:
fail_fast_if_missing(["HAVE_IT", "MISSING_X"])
assert exc.value.code == 2
def test_summarize_does_not_leak_secrets(monkeypatch, caplog):
import logging
monkeypatch.setenv("API_KEY_FOO", "super-secret-token-123456")
monkeypatch.setenv("PORT", "9000")
with caplog.at_level(logging.INFO, logger="mcp_common.env_validation"):
summarize(["API_KEY_FOO", "PORT", "NOT_SET_XYZ"])
log_text = "\n".join(caplog.messages)
assert "super-secret-token-123456" not in log_text
assert "9000" in log_text
assert "<unset>" in log_text
-40
View File
@@ -1,40 +0,0 @@
from mcp_common.models import EventPriority, EventType, L2Entry
def test_l2_entry_minimal():
entry = L2Entry(
timestamp="2026-04-17T10:30:00Z",
setup="bull put spread ETH 1800/1750 14d",
tesi="IV alta post-CPI, attesa mean-reversion",
esito="aperto"
)
assert entry.scostamento_sigma is None
assert entry.tesi_check is None
def test_l2_entry_full():
entry = L2Entry(
timestamp="2026-04-17T10:30:00Z",
setup="bull put spread ETH 1800/1750 14d",
tesi="IV alta post-CPI",
tesi_check="ETH sopra 1820 per 24h con IV in calo",
invalidation="rottura 1800 con volume > 2x media",
esito="chiuso +12 USDC",
scostamento="nessuno",
scostamento_sigma=0.5,
lezione="supporto ha tenuto, timing ok",
sizing_note="size 80 USDC (ATR 1.3x media)",
)
assert entry.scostamento_sigma == 0.5
dump = entry.model_dump()
assert dump["lezione"] == "supporto ha tenuto, timing ok"
def test_event_priority_enum():
assert EventPriority.CRITICAL.value == "critical"
assert EventPriority.LOW < EventPriority.CRITICAL # ordering
def test_event_type_enum():
assert EventType.ALERT.value == "alert"
assert EventType.USER_INSTRUCTION.value == "user_instruction"
-80
View File
@@ -1,80 +0,0 @@
"""Tests for CER-016 risk guard."""
from __future__ import annotations
import pytest
from fastapi import HTTPException
from mcp_common.risk_guard import (
enforce_aggregate,
enforce_leverage,
enforce_single_notional,
max_aggregate,
max_leverage,
max_notional,
)
def test_defaults(monkeypatch):
for k in ("CERBERO_MAX_NOTIONAL", "CERBERO_MAX_AGGREGATE", "CERBERO_MAX_LEVERAGE"):
monkeypatch.delenv(k, raising=False)
assert max_notional() == 200.0
assert max_aggregate() == 1000.0
assert max_leverage() == 3
def test_env_override(monkeypatch):
monkeypatch.setenv("CERBERO_MAX_NOTIONAL", "50")
monkeypatch.setenv("CERBERO_MAX_AGGREGATE", "150")
monkeypatch.setenv("CERBERO_MAX_LEVERAGE", "2")
assert max_notional() == 50.0
assert max_aggregate() == 150.0
assert max_leverage() == 2
def test_leverage_default_when_none(monkeypatch):
monkeypatch.delenv("CERBERO_MAX_LEVERAGE", raising=False)
assert enforce_leverage(None) == 3
def test_leverage_accepts_within_cap(monkeypatch):
monkeypatch.delenv("CERBERO_MAX_LEVERAGE", raising=False)
assert enforce_leverage(2) == 2
assert enforce_leverage(3) == 3
def test_leverage_rejects_above_cap(monkeypatch):
monkeypatch.delenv("CERBERO_MAX_LEVERAGE", raising=False)
with pytest.raises(HTTPException) as exc:
enforce_leverage(50)
assert exc.value.status_code == 403
assert exc.value.detail["error"] == "HARD_PROHIBITION"
def test_leverage_rejects_below_one(monkeypatch):
with pytest.raises(HTTPException):
enforce_leverage(0)
def test_single_notional_ok(monkeypatch):
monkeypatch.delenv("CERBERO_MAX_NOTIONAL", raising=False)
enforce_single_notional(100.0, exchange="deribit", instrument="BTC-PERPETUAL")
def test_single_notional_rejects(monkeypatch):
monkeypatch.delenv("CERBERO_MAX_NOTIONAL", raising=False)
with pytest.raises(HTTPException) as exc:
enforce_single_notional(335.0, exchange="hyperliquid", instrument="ETH")
assert exc.value.status_code == 403
assert "335" in exc.value.detail["message"]
def test_aggregate_ok(monkeypatch):
monkeypatch.delenv("CERBERO_MAX_AGGREGATE", raising=False)
enforce_aggregate(current_total=500.0, new_notional=200.0)
def test_aggregate_rejects(monkeypatch):
monkeypatch.delenv("CERBERO_MAX_AGGREGATE", raising=False)
with pytest.raises(HTTPException) as exc:
enforce_aggregate(current_total=900.0, new_notional=200.0)
assert exc.value.status_code == 403
-48
View File
@@ -1,48 +0,0 @@
from pathlib import Path
from mcp_common.storage import Database, run_migrations
def test_database_creates_wal(tmp_path: Path):
db_path = tmp_path / "test.db"
db = Database(db_path)
db.connect()
# WAL mode attivo
mode = db.conn.execute("PRAGMA journal_mode").fetchone()[0]
assert mode.lower() == "wal"
db.close()
def test_database_migrations_run_once(tmp_path: Path):
db_path = tmp_path / "test.db"
db = Database(db_path)
db.connect()
migrations = {
1: "CREATE TABLE foo (id INTEGER PRIMARY KEY, name TEXT);",
2: "ALTER TABLE foo ADD COLUMN value INTEGER DEFAULT 0;",
}
run_migrations(db.conn, migrations)
# Second run: should be no-op
run_migrations(db.conn, migrations)
cols = [r[1] for r in db.conn.execute("PRAGMA table_info(foo)").fetchall()]
assert "name" in cols
assert "value" in cols
version = db.conn.execute("SELECT MAX(version) FROM _schema_version").fetchone()[0]
assert version == 2
db.close()
def test_database_partial_migration(tmp_path: Path):
db_path = tmp_path / "test.db"
db = Database(db_path)
db.connect()
migrations_v1 = {1: "CREATE TABLE foo (id INTEGER);"}
run_migrations(db.conn, migrations_v1)
migrations_v2 = {**migrations_v1, 2: "CREATE TABLE bar (id INTEGER);"}
run_migrations(db.conn, migrations_v2)
tables = {r[0] for r in db.conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()}
assert "foo" in tables
assert "bar" in tables
db.close()