feat: import 6 MCP services + common workspace
This commit is contained in:
@@ -0,0 +1,84 @@
|
||||
import pytest
from fastapi import Depends, FastAPI
from fastapi.testclient import TestClient
from option_mcp_common.auth import (
    Principal,
    TokenStore,
    acl_requires,
    require_principal,
)


@pytest.fixture
def token_store():
    """Token store with one 'core' and one 'observer' principal."""
    return TokenStore(tokens={
        "token-core-123": Principal(name="core", capabilities={"core"}),
        "token-obs-456": Principal(name="observer", capabilities={"observer"}),
    })


@pytest.fixture
def app(token_store):
    """Minimal FastAPI app with public, authenticated, and ACL-guarded routes."""
    app = FastAPI()
    # NOTE(review): require_principal presumably resolves tokens via
    # app.state.token_store — confirm against the auth module.
    app.state.token_store = token_store

    @app.get("/public")
    def public():
        return {"ok": True}

    @app.get("/private")
    def private(principal: Principal = Depends(require_principal)):
        return {"name": principal.name}

    # Decorators apply bottom-up: acl_requires wraps the handler first,
    # then the wrapped handler is registered on the route.
    @app.post("/core-only")
    @acl_requires(core=True, observer=False)
    def core_only(principal: Principal = Depends(require_principal)):
        return {"who": principal.name}

    @app.post("/observer-only")
    @acl_requires(core=False, observer=True)
    def observer_only(principal: Principal = Depends(require_principal)):
        return {"who": principal.name}

    return app


def test_public_endpoint_no_auth(app):
    """/public is reachable without any Authorization header."""
    client = TestClient(app)
    assert client.get("/public").status_code == 200


def test_private_without_header_401(app):
    """Missing Authorization header on a guarded route → 401."""
    client = TestClient(app)
    assert client.get("/private").status_code == 401


def test_private_bad_token_403(app):
    """An unknown bearer token → 403."""
    client = TestClient(app)
    r = client.get("/private", headers={"Authorization": "Bearer nope"})
    assert r.status_code == 403


def test_private_good_token_200(app):
    """A valid token authenticates and resolves the principal's name."""
    client = TestClient(app)
    r = client.get("/private", headers={"Authorization": "Bearer token-core-123"})
    assert r.status_code == 200
    assert r.json() == {"name": "core"}


def test_acl_core_token_on_core_only_endpoint(app):
    """A 'core' capability token may call the core-only route."""
    client = TestClient(app)
    r = client.post("/core-only", headers={"Authorization": "Bearer token-core-123"})
    assert r.status_code == 200


def test_acl_observer_on_core_only_rejected(app):
    """An 'observer' token is rejected (403) on the core-only route."""
    client = TestClient(app)
    r = client.post("/core-only", headers={"Authorization": "Bearer token-obs-456"})
    assert r.status_code == 403


def test_acl_observer_on_observer_only_ok(app):
    """An 'observer' token may call the observer-only route."""
    client = TestClient(app)
    r = client.post("/observer-only", headers={"Authorization": "Bearer token-obs-456"})
    assert r.status_code == 200
|
||||
@@ -0,0 +1,71 @@
|
||||
"""CER-P5-010 env validation tests."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from option_mcp_common.env_validation import (
|
||||
MissingEnvError,
|
||||
fail_fast_if_missing,
|
||||
optional_env,
|
||||
require_env,
|
||||
summarize,
|
||||
)
|
||||
|
||||
|
||||
def test_require_env_present(monkeypatch):
|
||||
monkeypatch.setenv("FOO_KEY", "value1")
|
||||
assert require_env("FOO_KEY") == "value1"
|
||||
|
||||
|
||||
def test_require_env_missing_raises(monkeypatch):
|
||||
monkeypatch.delenv("MISSING_REQ", raising=False)
|
||||
with pytest.raises(MissingEnvError):
|
||||
require_env("MISSING_REQ", "critical path")
|
||||
|
||||
|
||||
def test_require_env_empty_raises(monkeypatch):
|
||||
monkeypatch.setenv("EMPTY_REQ", "")
|
||||
with pytest.raises(MissingEnvError):
|
||||
require_env("EMPTY_REQ")
|
||||
|
||||
|
||||
def test_require_env_whitespace_only_raises(monkeypatch):
|
||||
monkeypatch.setenv("WS_REQ", " ")
|
||||
with pytest.raises(MissingEnvError):
|
||||
require_env("WS_REQ")
|
||||
|
||||
|
||||
def test_optional_env_default(monkeypatch):
|
||||
monkeypatch.delenv("OPT_A", raising=False)
|
||||
assert optional_env("OPT_A", default="fallback") == "fallback"
|
||||
|
||||
|
||||
def test_optional_env_set(monkeypatch):
|
||||
monkeypatch.setenv("OPT_B", "xx")
|
||||
assert optional_env("OPT_B", default="fallback") == "xx"
|
||||
|
||||
|
||||
def test_fail_fast_all_present(monkeypatch):
|
||||
monkeypatch.setenv("AA", "1")
|
||||
monkeypatch.setenv("BB", "2")
|
||||
fail_fast_if_missing(["AA", "BB"]) # no exit
|
||||
|
||||
|
||||
def test_fail_fast_missing_exits(monkeypatch):
|
||||
monkeypatch.setenv("HAVE_IT", "1")
|
||||
monkeypatch.delenv("MISSING_X", raising=False)
|
||||
with pytest.raises(SystemExit) as exc:
|
||||
fail_fast_if_missing(["HAVE_IT", "MISSING_X"])
|
||||
assert exc.value.code == 2
|
||||
|
||||
|
||||
def test_summarize_does_not_leak_secrets(monkeypatch, caplog):
|
||||
import logging
|
||||
monkeypatch.setenv("API_KEY_FOO", "super-secret-token-123456")
|
||||
monkeypatch.setenv("PORT", "9000")
|
||||
with caplog.at_level(logging.INFO, logger="option_mcp_common.env_validation"):
|
||||
summarize(["API_KEY_FOO", "PORT", "NOT_SET_XYZ"])
|
||||
log_text = "\n".join(caplog.messages)
|
||||
assert "super-secret-token-123456" not in log_text
|
||||
assert "9000" in log_text
|
||||
assert "<unset>" in log_text
|
||||
@@ -0,0 +1,80 @@
|
||||
|
||||
from option_mcp_common.indicators import adx, atr, macd, rsi, sma


def test_rsi_simple():
    """RSI over the classic 15-close textbook series lands in the 65-75 band."""
    series = [
        44, 44.34, 44.09, 44.15, 43.61, 44.33, 44.83, 45.10, 45.42, 45.84,
        46.08, 45.89, 46.03, 45.61, 46.28,
    ]
    value = rsi(series, period=14)
    assert value is not None
    assert 65.0 < value < 75.0


def test_rsi_insufficient_data():
    """Fewer closes than the period yields None."""
    assert rsi([1, 2, 3], period=14) is None


def test_sma_simple():
    """SMA averages exactly `period` values; shorter input yields None."""
    assert sma([1, 2, 3, 4, 5], period=5) == 3.0
    assert sma([1, 2, 3], period=5) is None


def test_atr_simple():
    """A constant one-point daily range keeps ATR close to 1."""
    highs = list(range(10, 25))
    lows = list(range(9, 24))
    closes = [low + 0.5 for low in lows]
    value = atr(highs, lows, closes, period=14)
    assert value is not None
    assert 0.9 < value <= 1.5


def test_macd_trend_up():
    """A monotonic uptrend gives positive MACD and non-negative histogram."""
    m = macd([float(i) for i in range(1, 60)])
    for key in ("macd", "signal", "hist"):
        assert m[key] is not None
    assert m["macd"] > 0
    assert m["hist"] >= 0


def test_macd_insufficient_data():
    """Too few closes → all three outputs are None."""
    assert macd([1.0, 2.0, 3.0]) == {"macd": None, "signal": None, "hist": None}


def test_macd_trend_down():
    """A monotonic downtrend gives negative MACD and non-positive histogram."""
    m = macd([float(i) for i in range(60, 1, -1)])
    assert m["macd"] < 0
    assert m["hist"] <= 0


def test_adx_insufficient_data():
    """Too few bars → all ADX outputs are None."""
    result = adx([1.0] * 10, [0.5] * 10, [0.7] * 10, period=14)
    assert result == {"adx": None, "+di": None, "-di": None}


def test_adx_strong_uptrend():
    """A steady uptrend drives +DI above -DI and ADX above 50."""
    base = [float(i) for i in range(1, 40)]
    result = adx(
        [b + 1.0 for b in base],
        base,
        [b + 0.5 for b in base],
        period=14,
    )
    assert result["adx"] is not None
    assert result["+di"] is not None and result["-di"] is not None
    assert result["+di"] > result["-di"]
    assert result["adx"] > 50.0


def test_adx_flat_market():
    """Zero directional movement keeps ADX near zero."""
    result = adx([10.0] * 40, [9.0] * 40, [9.5] * 40, period=14)
    assert result["adx"] is not None
    assert result["adx"] < 5.0
|
||||
@@ -0,0 +1,77 @@
|
||||
import json
import logging

from option_mcp_common.logging import (
    SecretsFilter,
    configure_root_logging,
    get_json_logger,
)


def test_secrets_filter_masks_bearer():
    """SecretsFilter rewrites a record's msg so bearer tokens are masked."""
    f = SecretsFilter()
    rec = logging.LogRecord(
        name="t", level=logging.INFO, pathname="", lineno=0,
        msg="Got Bearer abcdef123456 from client",
        args=(), exc_info=None,
    )
    f.filter(rec)
    assert "abcdef" not in rec.msg
    assert "***" in rec.msg


def test_secrets_filter_masks_api_key_json():
    """API-key values embedded in JSON-like text are masked as well."""
    f = SecretsFilter()
    rec = logging.LogRecord(
        name="t", level=logging.INFO, pathname="", lineno=0,
        msg='{"api_key": "sk-live-abc123xyz"}',
        args=(), exc_info=None,
    )
    f.filter(rec)
    assert "sk-live-abc123xyz" not in rec.msg


def test_json_logger_outputs_json(capsys):
    """get_json_logger emits one JSON object per record, including `extra` fields."""
    logger = get_json_logger("test")
    logger.info("hello", extra={"user_id": 42})
    captured = capsys.readouterr()
    # output is on stderr by default for json logger
    line = (captured.err or captured.out).strip().splitlines()[-1]
    data = json.loads(line)
    assert data["message"] == "hello"
    assert data["user_id"] == 42


def test_configure_root_json_format(monkeypatch, capsys):
    """LOG_FORMAT=json makes root logging emit JSON lines with level info."""
    monkeypatch.setenv("LOG_FORMAT", "json")
    monkeypatch.setenv("LOG_LEVEL", "INFO")
    configure_root_logging()
    logging.info("root json test")
    line = capsys.readouterr().err.strip().splitlines()[-1]
    data = json.loads(line)
    assert data["message"] == "root json test"
    assert data["levelname"] == "INFO"


def test_configure_root_text_format(monkeypatch, capsys):
    """LOG_FORMAT=text emits plain text output, not JSON."""
    monkeypatch.setenv("LOG_FORMAT", "text")
    configure_root_logging()
    logging.info("root text test")
    line = capsys.readouterr().err.strip().splitlines()[-1]
    # text format is not JSON-parseable
    try:
        json.loads(line)
        raise AssertionError("expected text format, got JSON")
    except json.JSONDecodeError:
        pass
    assert "root text test" in line


def test_configure_root_applies_secrets_filter(monkeypatch, capsys):
    """Root logging output is also masked — secrets never reach the log line."""
    monkeypatch.setenv("LOG_FORMAT", "json")
    configure_root_logging()
    logging.info("calling with Bearer sk-live-leak123456 token")
    line = capsys.readouterr().err.strip().splitlines()[-1]
    data = json.loads(line)
    assert "sk-live-leak123456" not in data["message"]
    assert "***" in data["message"]
|
||||
@@ -0,0 +1,112 @@
|
||||
from __future__ import annotations

from fastapi import Depends, FastAPI
from fastapi.testclient import TestClient
from option_mcp_common.auth import Principal, TokenStore, require_principal
from option_mcp_common.mcp_bridge import _derive_input_schemas, mount_mcp_endpoint
from option_mcp_common.server import build_app
from pydantic import BaseModel


class EchoBody(BaseModel):
    # Request body for /tools/echo: required msg, optional n (default 1).
    msg: str
    n: int = 1


def _make_app() -> tuple[FastAPI, TokenStore]:
    """Build a test app with one Pydantic-bodied tool (echo) and one body-less tool (ping)."""
    store = TokenStore(tokens={"t": Principal("obs", {"observer"})})
    app = build_app(name="t", version="v", token_store=store)

    @app.post("/tools/echo")
    def echo(body: EchoBody, principal: Principal = Depends(require_principal)):
        return {"echo": body.msg, "n": body.n}

    @app.post("/tools/ping")
    def ping(principal: Principal = Depends(require_principal)):
        return {"pong": True}

    return app, store


def test_derive_input_schemas_resolves_lazy_annotations():
    """Pydantic body models become JSON-schema objects even with lazy
    (string) annotations from `from __future__ import annotations`."""
    app, _ = _make_app()
    schemas = _derive_input_schemas(app, ["echo", "ping"])
    assert "echo" in schemas
    echo_schema = schemas["echo"]
    assert echo_schema["type"] == "object"
    assert "msg" in echo_schema["properties"]
    assert "n" in echo_schema["properties"]
    assert "msg" in echo_schema["required"]
    # ping has no Pydantic body → not in map (fallback applied by caller)
    assert "ping" not in schemas


def test_mount_mcp_endpoint_exposes_derived_schemas():
    """tools/list reports the derived schema for echo and the generic
    permissive fallback for the body-less ping tool."""
    app, store = _make_app()
    mount_mcp_endpoint(
        app,
        name="test",
        version="1.0",
        token_store=store,
        internal_base_url="http://localhost:0",
        tools=[
            {"name": "echo", "description": "Echo a message."},
            {"name": "ping", "description": "Ping."},
        ],
    )
    c = TestClient(app)
    r = c.post(
        "/mcp",
        headers={"Authorization": "Bearer t"},
        json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
    )
    assert r.status_code == 200
    tools = r.json()["result"]["tools"]
    by_name = {t["name"]: t for t in tools}
    assert set(by_name["echo"]["inputSchema"]["required"]) == {"msg"}
    # ping falls back to the generic permissive schema
    assert by_name["ping"]["inputSchema"] == {
        "type": "object",
        "additionalProperties": True,
    }


def test_mount_mcp_endpoint_requires_auth():
    """/mcp returns 401 without a token and 403 with an unknown one."""
    app, store = _make_app()
    mount_mcp_endpoint(
        app,
        name="test",
        version="1.0",
        token_store=store,
        internal_base_url="http://localhost:0",
        tools=[{"name": "echo"}],
    )
    c = TestClient(app)
    r = c.post("/mcp", json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"})
    assert r.status_code == 401
    r = c.post(
        "/mcp",
        headers={"Authorization": "Bearer WRONG"},
        json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
    )
    assert r.status_code == 403


def test_explicit_input_schema_overrides_derived():
    """An explicit input_schema in the tool spec wins over the derived one."""
    app, store = _make_app()
    custom = {"type": "object", "properties": {"custom": {"type": "string"}}, "required": ["custom"]}
    mount_mcp_endpoint(
        app,
        name="test",
        version="1.0",
        token_store=store,
        internal_base_url="http://localhost:0",
        tools=[{"name": "echo", "input_schema": custom}],
    )
    c = TestClient(app)
    r = c.post(
        "/mcp",
        headers={"Authorization": "Bearer t"},
        json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
    )
    assert r.json()["result"]["tools"][0]["inputSchema"] == custom
|
||||
@@ -0,0 +1,40 @@
|
||||
from option_mcp_common.models import EventPriority, EventType, L2Entry


def test_l2_entry_minimal():
    """Only required fields supplied; optional ones default to None."""
    minimal = L2Entry(
        timestamp="2026-04-17T10:30:00Z",
        setup="bull put spread ETH 1800/1750 14d",
        tesi="IV alta post-CPI, attesa mean-reversion",
        esito="aperto",
    )
    assert minimal.scostamento_sigma is None
    assert minimal.tesi_check is None


def test_l2_entry_full():
    """All fields populated, round-tripped through model_dump()."""
    payload = dict(
        timestamp="2026-04-17T10:30:00Z",
        setup="bull put spread ETH 1800/1750 14d",
        tesi="IV alta post-CPI",
        tesi_check="ETH sopra 1820 per 24h con IV in calo",
        invalidation="rottura 1800 con volume > 2x media",
        esito="chiuso +12 USDC",
        scostamento="nessuno",
        scostamento_sigma=0.5,
        lezione="supporto ha tenuto, timing ok",
        sizing_note="size 80 USDC (ATR 1.3x media)",
    )
    full = L2Entry(**payload)
    assert full.scostamento_sigma == 0.5
    assert full.model_dump()["lezione"] == "supporto ha tenuto, timing ok"


def test_event_priority_enum():
    """Priority string values and relative ordering."""
    assert EventPriority.CRITICAL.value == "critical"
    # NOTE(review): `<` on enum members requires ordering support (IntEnum or
    # a custom __lt__); with plain string values "low" < "critical" would be
    # False — confirm EventPriority defines its own ordering.
    assert EventPriority.LOW < EventPriority.CRITICAL


def test_event_type_enum():
    """Event type string values."""
    assert EventType.ALERT.value == "alert"
    assert EventType.USER_INSTRUCTION.value == "user_instruction"
|
||||
@@ -0,0 +1,80 @@
|
||||
"""Tests for CER-016 risk guard."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from fastapi import HTTPException
|
||||
from option_mcp_common.risk_guard import (
|
||||
enforce_aggregate,
|
||||
enforce_leverage,
|
||||
enforce_single_notional,
|
||||
max_aggregate,
|
||||
max_leverage,
|
||||
max_notional,
|
||||
)
|
||||
|
||||
|
||||
def test_defaults(monkeypatch):
|
||||
for k in ("CERBERO_MAX_NOTIONAL", "CERBERO_MAX_AGGREGATE", "CERBERO_MAX_LEVERAGE"):
|
||||
monkeypatch.delenv(k, raising=False)
|
||||
assert max_notional() == 200.0
|
||||
assert max_aggregate() == 1000.0
|
||||
assert max_leverage() == 3
|
||||
|
||||
|
||||
def test_env_override(monkeypatch):
|
||||
monkeypatch.setenv("CERBERO_MAX_NOTIONAL", "50")
|
||||
monkeypatch.setenv("CERBERO_MAX_AGGREGATE", "150")
|
||||
monkeypatch.setenv("CERBERO_MAX_LEVERAGE", "2")
|
||||
assert max_notional() == 50.0
|
||||
assert max_aggregate() == 150.0
|
||||
assert max_leverage() == 2
|
||||
|
||||
|
||||
def test_leverage_default_when_none(monkeypatch):
|
||||
monkeypatch.delenv("CERBERO_MAX_LEVERAGE", raising=False)
|
||||
assert enforce_leverage(None) == 3
|
||||
|
||||
|
||||
def test_leverage_accepts_within_cap(monkeypatch):
|
||||
monkeypatch.delenv("CERBERO_MAX_LEVERAGE", raising=False)
|
||||
assert enforce_leverage(2) == 2
|
||||
assert enforce_leverage(3) == 3
|
||||
|
||||
|
||||
def test_leverage_rejects_above_cap(monkeypatch):
|
||||
monkeypatch.delenv("CERBERO_MAX_LEVERAGE", raising=False)
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
enforce_leverage(50)
|
||||
assert exc.value.status_code == 403
|
||||
assert exc.value.detail["error"] == "HARD_PROHIBITION"
|
||||
|
||||
|
||||
def test_leverage_rejects_below_one(monkeypatch):
|
||||
with pytest.raises(HTTPException):
|
||||
enforce_leverage(0)
|
||||
|
||||
|
||||
def test_single_notional_ok(monkeypatch):
|
||||
monkeypatch.delenv("CERBERO_MAX_NOTIONAL", raising=False)
|
||||
enforce_single_notional(100.0, exchange="deribit", instrument="BTC-PERPETUAL")
|
||||
|
||||
|
||||
def test_single_notional_rejects(monkeypatch):
|
||||
monkeypatch.delenv("CERBERO_MAX_NOTIONAL", raising=False)
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
enforce_single_notional(335.0, exchange="hyperliquid", instrument="ETH")
|
||||
assert exc.value.status_code == 403
|
||||
assert "335" in exc.value.detail["message"]
|
||||
|
||||
|
||||
def test_aggregate_ok(monkeypatch):
|
||||
monkeypatch.delenv("CERBERO_MAX_AGGREGATE", raising=False)
|
||||
enforce_aggregate(current_total=500.0, new_notional=200.0)
|
||||
|
||||
|
||||
def test_aggregate_rejects(monkeypatch):
|
||||
monkeypatch.delenv("CERBERO_MAX_AGGREGATE", raising=False)
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
enforce_aggregate(current_total=900.0, new_notional=200.0)
|
||||
assert exc.value.status_code == 403
|
||||
@@ -0,0 +1,90 @@
|
||||
from fastapi.testclient import TestClient
from option_mcp_common.auth import Principal, TokenStore
from option_mcp_common.server import build_app


def test_build_app_health():
    """/health reports status, identity, uptime and timing headers."""
    store = TokenStore(tokens={})
    app = build_app(name="test-mcp", version="0.0.1", token_store=store)
    client = TestClient(app)
    r = client.get("/health")
    assert r.status_code == 200
    body = r.json()
    assert body["status"] == "healthy"
    assert body["name"] == "test-mcp"
    assert body["version"] == "0.0.1"
    assert "uptime_seconds" in body
    assert "data_timestamp" in body
    assert r.headers.get("X-Duration-Ms") is not None


def test_build_app_adds_token_store():
    """build_app attaches the TokenStore to app.state for auth lookups."""
    store = TokenStore(tokens={"t1": Principal("x", {"core"})})
    app = build_app(name="t", version="v", token_store=store)
    assert app.state.token_store is store


def test_timestamp_injector_dict_response():
    """CER-P5-001: dict response gets data_timestamp + X-Data-Timestamp header."""
    store = TokenStore(tokens={})
    app = build_app(name="t", version="v", token_store=store)

    @app.post("/tools/foo")
    def foo():
        return {"ok": True}

    client = TestClient(app)
    r = client.post("/tools/foo")
    assert r.status_code == 200
    body = r.json()
    assert body["ok"] is True
    assert "data_timestamp" in body
    assert r.headers.get("X-Data-Timestamp") is not None


def test_timestamp_injector_list_of_dicts():
    """CER-P5-001: list of dicts → each item gets data_timestamp."""
    store = TokenStore(tokens={})
    app = build_app(name="t", version="v", token_store=store)

    @app.post("/tools/list_items")
    def list_items():
        return [{"x": 1}, {"x": 2}]

    client = TestClient(app)
    r = client.post("/tools/list_items")
    body = r.json()
    assert isinstance(body, list)
    assert len(body) == 2
    for item in body:
        assert "data_timestamp" in item
    assert r.headers.get("X-Data-Timestamp") is not None


def test_timestamp_injector_preserves_existing():
    """CER-P5-001: an already-present data_timestamp is not overridden."""
    store = TokenStore(tokens={})
    app = build_app(name="t", version="v", token_store=store)

    @app.post("/tools/already")
    def already():
        return {"data_timestamp": "2020-01-01T00:00:00Z", "x": 1}

    client = TestClient(app)
    body = client.post("/tools/already").json()
    assert body["data_timestamp"] == "2020-01-01T00:00:00Z"


def test_timestamp_injector_empty_list_gets_header_only():
    """CER-P5-001: empty list — body left untouched, but header still set."""
    store = TokenStore(tokens={})
    app = build_app(name="t", version="v", token_store=store)

    @app.post("/tools/empty_list")
    def empty_list():
        return []

    client = TestClient(app)
    r = client.post("/tools/empty_list")
    assert r.json() == []
    assert r.headers.get("X-Data-Timestamp") is not None
|
||||
@@ -0,0 +1,48 @@
|
||||
from pathlib import Path

from option_mcp_common.storage import Database, run_migrations


def test_database_creates_wal(tmp_path: Path):
    """Connecting switches the SQLite journal mode to WAL."""
    database = Database(tmp_path / "test.db")
    database.connect()
    journal_mode = database.conn.execute("PRAGMA journal_mode").fetchone()[0]
    assert journal_mode.lower() == "wal"
    database.close()


def test_database_migrations_run_once(tmp_path: Path):
    """Re-running identical migrations is a no-op; schema version ends at 2."""
    database = Database(tmp_path / "test.db")
    database.connect()
    migrations = {
        1: "CREATE TABLE foo (id INTEGER PRIMARY KEY, name TEXT);",
        2: "ALTER TABLE foo ADD COLUMN value INTEGER DEFAULT 0;",
    }
    run_migrations(database.conn, migrations)
    run_migrations(database.conn, migrations)  # second run must be a no-op
    columns = [row[1] for row in database.conn.execute("PRAGMA table_info(foo)").fetchall()]
    assert "name" in columns
    assert "value" in columns
    applied = database.conn.execute("SELECT MAX(version) FROM _schema_version").fetchone()[0]
    assert applied == 2
    database.close()


def test_database_partial_migration(tmp_path: Path):
    """Migrations added after an earlier run are applied on the next run."""
    database = Database(tmp_path / "test.db")
    database.connect()
    first = {1: "CREATE TABLE foo (id INTEGER);"}
    run_migrations(database.conn, first)
    second = {**first, 2: "CREATE TABLE bar (id INTEGER);"}
    run_migrations(database.conn, second)
    names = {row[0] for row in database.conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table'"
    ).fetchall()}
    assert "foo" in names
    assert "bar" in names
    database.close()
|
||||
Reference in New Issue
Block a user