85 lines
2.3 KiB
Python
85 lines
2.3 KiB
Python
import pytest
|
|
from fastapi import Depends, FastAPI
|
|
from fastapi.testclient import TestClient
|
|
from option_mcp_common.auth import (
|
|
Principal,
|
|
TokenStore,
|
|
acl_requires,
|
|
require_principal,
|
|
)
|
|
|
|
|
|
@pytest.fixture
def token_store():
    """Provide a TokenStore that knows one "core" and one "observer" token."""
    # Maps each bearer-token string to the Principal it is registered for.
    known_tokens = {
        "token-core-123": Principal(name="core", capabilities={"core"}),
        "token-obs-456": Principal(name="observer", capabilities={"observer"}),
    }
    return TokenStore(tokens=known_tokens)
|
|
|
|
|
|
@pytest.fixture
def app(token_store):
    """Build a FastAPI application exposing the routes used by these tests.

    Routes registered, in order:
      * GET  /public        -- takes no principal.
      * GET  /private       -- resolves a Principal via ``require_principal``.
      * POST /core-only     -- wrapped in ``acl_requires(core=True, observer=False)``.
      * POST /observer-only -- wrapped in ``acl_requires(core=False, observer=True)``.
    """
    api = FastAPI()
    # NOTE(review): presumably require_principal reads the store from
    # app.state -- confirm against option_mcp_common.auth.
    api.state.token_store = token_store

    # One dependency marker reused as the default of every protected handler.
    authenticated = Depends(require_principal)

    @api.get("/public")
    def public():
        return {"ok": True}

    @api.get("/private")
    def private(principal: Principal = authenticated):
        return {"name": principal.name}

    @api.post("/core-only")
    @acl_requires(core=True, observer=False)
    def core_only(principal: Principal = authenticated):
        return {"who": principal.name}

    @api.post("/observer-only")
    @acl_requires(core=False, observer=True)
    def observer_only(principal: Principal = authenticated):
        return {"who": principal.name}

    return api
|
|
|
|
|
|
def test_public_endpoint_no_auth(app):
    """GET /public answers 200 when no Authorization header is sent."""
    response = TestClient(app).get("/public")
    assert response.status_code == 200
|
|
|
|
|
|
def test_private_without_header_401(app):
    """GET /private answers 401 when no Authorization header is sent."""
    response = TestClient(app).get("/private")
    assert response.status_code == 401
|
|
|
|
|
|
def test_private_bad_token_403(app):
    """GET /private answers 403 for a bearer token absent from the store."""
    unknown_auth = {"Authorization": "Bearer nope"}
    response = TestClient(app).get("/private", headers=unknown_auth)
    assert response.status_code == 403
|
|
|
|
|
|
def test_private_good_token_200(app):
    """GET /private with the core token answers 200 and echoes the name."""
    core_auth = {"Authorization": "Bearer token-core-123"}
    response = TestClient(app).get("/private", headers=core_auth)
    assert response.status_code == 200
    assert response.json() == {"name": "core"}
|
|
|
|
|
|
def test_acl_core_token_on_core_only_endpoint(app):
    """POST /core-only with the core token answers 200."""
    core_auth = {"Authorization": "Bearer token-core-123"}
    response = TestClient(app).post("/core-only", headers=core_auth)
    assert response.status_code == 200
|
|
|
|
|
|
def test_acl_observer_on_core_only_rejected(app):
    """POST /core-only with the observer token answers 403."""
    observer_auth = {"Authorization": "Bearer token-obs-456"}
    response = TestClient(app).post("/core-only", headers=observer_auth)
    assert response.status_code == 403
|
|
|
|
|
|
def test_acl_observer_on_observer_only_ok(app):
    """POST /observer-only with the observer token answers 200."""
    observer_auth = {"Authorization": "Bearer token-obs-456"}
    response = TestClient(app).post("/observer-only", headers=observer_auth)
    assert response.status_code == 200
|