7b169fb8db
Layout stile CerberoSuite/Cerbero: - services/ per MCP workspace members (futuri: mcp-docugen, mcp-convert, mcp-inbox) - docs/ flat con design+implementation per componente - scripts/, secrets/ placeholder - pyproject.toml root (uv workspace vuoto + ruff + pytest) - .gitignore, .env.example, .mcp.json.example, README.md Rinominate le doc: Docs/<comp>/2026-04-21-*.md -> docs/<comp>-*.md. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
3074 lines
90 KiB
Markdown
3074 lines
90 KiB
Markdown
# DocuGen MCP — Implementation Plan
|
|
|
|
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
|
|
|
**Goal:** Build a standalone MCP server (`docugen-mcp`) that exposes 6 tools for template CRUD + document generation, using OpenRouter as LLM provider, deployed on Docker on an external VPS.
|
|
|
|
**Architecture:** FastMCP as ASGI root (Streamable HTTP on `/mcp`) with a mounted FastAPI sub-app for static asset serving (`/assets/`, `/generated/`) and health. Filesystem storage for templates; SQLite for generation metrics and ephemeral asset TTL index. Single process, single container.
|
|
|
|
**Tech Stack:** Python 3.11+, uv, Pydantic v2, Pydantic Settings, `mcp` SDK (FastMCP), FastAPI, httpx, aiosqlite, PyYAML, Pillow, pytest + pytest-asyncio + respx, Docker + docker-compose.
|
|
|
|
**Reference spec:** `2026-04-21-docugen-mcp-design.md` (same directory).
|
|
|
|
---
|
|
|
|
## File Structure
|
|
|
|
```
|
|
docugen-mcp/
|
|
├── src/
|
|
│ └── docugen_mcp/
|
|
│ ├── __init__.py
|
|
│ ├── main.py # ASGI bootstrap (FastMCP + FastAPI mount)
|
|
│ ├── config.py # Pydantic Settings
|
|
│ ├── models.py # Shared Pydantic models
|
|
│ ├── auth.py # API key middleware
|
|
│ ├── template_store.py # Filesystem CRUD templates
|
|
│ ├── llm_client.py # OpenRouter wrapper (async, retry)
|
|
│ ├── generation_store.py # SQLite: generations + ephemeral_assets
|
|
│ ├── renderer.py # Orchestrator: validate, prompt, call LLM
|
|
│ ├── mcp_tools.py # FastMCP tool definitions
|
|
│ └── http_routes.py # FastAPI sub-app (assets, generated, health)
|
|
├── tests/
|
|
│ ├── __init__.py
|
|
│ ├── conftest.py # Fixtures
|
|
│ ├── unit/
|
|
│ │ ├── __init__.py
|
|
│ │ ├── test_config.py
|
|
│ │ ├── test_models.py
|
|
│ │ ├── test_auth.py
|
|
│ │ ├── test_template_store.py
|
|
│ │ ├── test_llm_client.py
|
|
│ │ ├── test_generation_store.py
|
|
│ │ └── test_renderer.py
|
|
│ └── integration/
|
|
│ ├── __init__.py
|
|
│ ├── test_mcp_tools.py
|
|
│ ├── test_http_routes.py
|
|
│ └── test_generate_flow.py
|
|
├── .env.example
|
|
├── .gitignore
|
|
├── .python-version
|
|
├── pyproject.toml
|
|
├── Dockerfile
|
|
├── docker-compose.yml
|
|
└── README.md
|
|
```
|
|
|
|
---
|
|
|
|
## Task 0: Bootstrap project
|
|
|
|
**Files:**
|
|
- Create: `docugen-mcp/.gitignore`
|
|
- Create: `docugen-mcp/.python-version`
|
|
- Create: `docugen-mcp/pyproject.toml`
|
|
- Create: `docugen-mcp/README.md` (stub)
|
|
- Create: `docugen-mcp/src/docugen_mcp/__init__.py`
|
|
- Create: `docugen-mcp/tests/__init__.py`
|
|
- Create: `docugen-mcp/tests/unit/__init__.py`
|
|
- Create: `docugen-mcp/tests/integration/__init__.py`
|
|
|
|
- [ ] **Step 1: Create directory structure**
|
|
|
|
```bash
|
|
mkdir -p docugen-mcp/src/docugen_mcp
|
|
mkdir -p docugen-mcp/tests/unit
|
|
mkdir -p docugen-mcp/tests/integration
|
|
cd docugen-mcp
|
|
git init
|
|
```
|
|
|
|
- [ ] **Step 2: Create `.python-version`**
|
|
|
|
```
|
|
3.11
|
|
```
|
|
|
|
- [ ] **Step 3: Create `.gitignore`**
|
|
|
|
```
|
|
__pycache__/
|
|
*.py[cod]
|
|
*.egg-info/
|
|
.venv/
|
|
.env
|
|
data/
|
|
!data/.gitkeep
|
|
.pytest_cache/
|
|
.coverage
|
|
htmlcov/
|
|
dist/
|
|
build/
|
|
*.db
|
|
*.db-journal
|
|
.DS_Store
|
|
```
|
|
|
|
- [ ] **Step 4: Create `pyproject.toml`**
|
|
|
|
```toml
|
|
[project]
|
|
name = "docugen-mcp"
|
|
version = "0.1.0"
|
|
description = "MCP server for document generation from Markdown templates via OpenRouter"
|
|
requires-python = ">=3.11"
|
|
dependencies = [
|
|
"mcp>=1.2",
|
|
"fastapi>=0.115",
|
|
"uvicorn[standard]>=0.34",
|
|
"pydantic>=2.0",
|
|
"pydantic-settings>=2.0",
|
|
"httpx>=0.27",
|
|
"aiosqlite>=0.20",
|
|
"pyyaml>=6.0",
|
|
"pillow>=11.0",
|
|
"python-multipart>=0.0.9",
|
|
]
|
|
|
|
[project.optional-dependencies]
|
|
dev = [
|
|
"pytest>=8.0",
|
|
"pytest-asyncio>=0.24",
|
|
"respx>=0.21",
|
|
"pytest-cov>=5.0",
|
|
]
|
|
|
|
[project.scripts]
|
|
docugen-mcp = "docugen_mcp.main:run"
|
|
|
|
[tool.pytest.ini_options]
|
|
asyncio_mode = "auto"
|
|
testpaths = ["tests"]
|
|
|
|
[tool.coverage.run]
|
|
source = ["src/docugen_mcp"]
|
|
omit = ["src/docugen_mcp/main.py"]
|
|
|
|
[build-system]
|
|
requires = ["hatchling"]
|
|
build-backend = "hatchling.build"
|
|
|
|
[tool.hatch.build.targets.wheel]
|
|
packages = ["src/docugen_mcp"]
|
|
```
|
|
|
|
- [ ] **Step 5: Create stub files**
|
|
|
|
`src/docugen_mcp/__init__.py`:
|
|
```python
|
|
__version__ = "0.1.0"
|
|
```
|
|
|
|
`tests/__init__.py`, `tests/unit/__init__.py`, `tests/integration/__init__.py`: empty files.
|
|
|
|
`README.md`:
|
|
```markdown
|
|
# docugen-mcp
|
|
|
|
MCP server for document generation from Markdown templates via OpenRouter.
|
|
|
|
See design spec: `../2026-04-21-docugen-mcp-design.md`.
|
|
|
|
## Setup
|
|
|
|
```bash
|
|
uv sync --all-extras
|
|
cp .env.example .env # fill in API_KEY, OPENROUTER_API_KEY, PUBLIC_BASE_URL
|
|
uv run docugen-mcp
|
|
```
|
|
|
|
## Test
|
|
|
|
```bash
|
|
uv run pytest
|
|
```
|
|
```
|
|
|
|
- [ ] **Step 6: Install dependencies and verify**
|
|
|
|
```bash
|
|
uv sync --all-extras
|
|
```
|
|
|
|
Expected: `.venv` created, `uv.lock` generated. No errors.
|
|
|
|
- [ ] **Step 7: Initial commit**
|
|
|
|
```bash
|
|
git add .gitignore .python-version pyproject.toml uv.lock README.md src/ tests/
|
|
git commit -m "chore: bootstrap docugen-mcp project"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 1: Config module (Pydantic Settings)
|
|
|
|
**Files:**
|
|
- Create: `src/docugen_mcp/config.py`
|
|
- Create: `tests/unit/test_config.py`
|
|
- Create: `.env.example`
|
|
|
|
- [ ] **Step 1: Write failing test**
|
|
|
|
`tests/unit/test_config.py`:
|
|
```python
|
|
import pytest
|
|
from pydantic import ValidationError
|
|
|
|
from docugen_mcp.config import Settings
|
|
|
|
|
|
def test_settings_loads_from_env(monkeypatch, tmp_path):
|
|
monkeypatch.setenv("API_KEY", "test-api-key")
|
|
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-test")
|
|
monkeypatch.setenv("PUBLIC_BASE_URL", "https://mcp.example.com")
|
|
monkeypatch.setenv("DATA_DIR", str(tmp_path))
|
|
|
|
settings = Settings()
|
|
|
|
assert settings.api_key == "test-api-key"
|
|
assert settings.openrouter_api_key == "sk-or-test"
|
|
assert settings.public_base_url == "https://mcp.example.com"
|
|
assert settings.data_dir == tmp_path
|
|
assert settings.llm_model_default == "anthropic/claude-sonnet-4"
|
|
assert settings.asset_ttl_days == 30
|
|
assert settings.max_image_size_mb == 10
|
|
assert settings.llm_timeout_seconds == 60
|
|
assert settings.openrouter_base_url == "https://openrouter.ai/api/v1"
|
|
|
|
|
|
def test_settings_missing_required_fails(monkeypatch, tmp_path):
|
|
monkeypatch.delenv("API_KEY", raising=False)
|
|
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
|
|
monkeypatch.delenv("PUBLIC_BASE_URL", raising=False)
|
|
monkeypatch.setenv("DATA_DIR", str(tmp_path))
|
|
|
|
with pytest.raises(ValidationError):
|
|
Settings()
|
|
|
|
|
|
def test_public_base_url_strips_trailing_slash(monkeypatch, tmp_path):
|
|
monkeypatch.setenv("API_KEY", "k")
|
|
monkeypatch.setenv("OPENROUTER_API_KEY", "sk")
|
|
monkeypatch.setenv("PUBLIC_BASE_URL", "https://mcp.example.com/")
|
|
monkeypatch.setenv("DATA_DIR", str(tmp_path))
|
|
|
|
settings = Settings()
|
|
|
|
assert settings.public_base_url == "https://mcp.example.com"
|
|
```
|
|
|
|
- [ ] **Step 2: Run to verify failure**
|
|
|
|
```bash
|
|
uv run pytest tests/unit/test_config.py -v
|
|
```
|
|
Expected: FAIL — `docugen_mcp.config` does not exist.
|
|
|
|
- [ ] **Step 3: Implement config**
|
|
|
|
`src/docugen_mcp/config.py`:
|
|
```python
|
|
from pathlib import Path
|
|
|
|
from pydantic import Field, field_validator
|
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
|
|
|
|
|
class Settings(BaseSettings):
|
|
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")
|
|
|
|
api_key: str = Field(..., min_length=8)
|
|
openrouter_api_key: str = Field(..., min_length=8)
|
|
openrouter_base_url: str = "https://openrouter.ai/api/v1"
|
|
llm_model_default: str = "anthropic/claude-sonnet-4"
|
|
public_base_url: str = Field(...)
|
|
data_dir: Path = Path("/data")
|
|
asset_ttl_days: int = 30
|
|
max_image_size_mb: int = 10
|
|
llm_timeout_seconds: int = 60
|
|
|
|
@field_validator("public_base_url")
|
|
@classmethod
|
|
def strip_trailing_slash(cls, v: str) -> str:
|
|
return v.rstrip("/")
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify pass**
|
|
|
|
```bash
|
|
uv run pytest tests/unit/test_config.py -v
|
|
```
|
|
Expected: 3 PASSED.
|
|
|
|
- [ ] **Step 5: Create `.env.example`**
|
|
|
|
```
|
|
# Authentication
|
|
API_KEY=
|
|
|
|
# OpenRouter
|
|
OPENROUTER_API_KEY=
|
|
OPENROUTER_BASE_URL=https://openrouter.ai/api/v1
|
|
LLM_MODEL_DEFAULT=anthropic/claude-sonnet-4
|
|
|
|
# Public URL (used to build asset links in generated Markdown)
|
|
PUBLIC_BASE_URL=https://mcp.example.com
|
|
|
|
# Storage
|
|
DATA_DIR=/data
|
|
|
|
# Limits
|
|
ASSET_TTL_DAYS=30
|
|
MAX_IMAGE_SIZE_MB=10
|
|
LLM_TIMEOUT_SECONDS=60
|
|
```
|
|
|
|
- [ ] **Step 6: Commit**
|
|
|
|
```bash
|
|
git add src/docugen_mcp/config.py tests/unit/test_config.py .env.example
|
|
git commit -m "feat: add Settings config with env validation"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 2: Shared Pydantic models
|
|
|
|
**Files:**
|
|
- Create: `src/docugen_mcp/models.py`
|
|
- Create: `tests/unit/test_models.py`
|
|
|
|
- [ ] **Step 1: Write failing test**
|
|
|
|
`tests/unit/test_models.py`:
|
|
```python
|
|
import pytest
|
|
from pydantic import ValidationError
|
|
|
|
from docugen_mcp.models import (
|
|
TemplateVariable,
|
|
TemplateFrontmatter,
|
|
ImageVariable,
|
|
GenerationResult,
|
|
TokenUsage,
|
|
)
|
|
|
|
|
|
def test_template_variable_string():
|
|
v = TemplateVariable(name="cliente", type="string")
|
|
assert v.name == "cliente"
|
|
assert v.type == "string"
|
|
|
|
|
|
def test_template_variable_image():
|
|
v = TemplateVariable(name="foto", type="image")
|
|
assert v.type == "image"
|
|
|
|
|
|
def test_template_variable_rejects_unknown_type():
|
|
with pytest.raises(ValidationError):
|
|
TemplateVariable(name="x", type="number")
|
|
|
|
|
|
def test_template_variable_rejects_empty_name():
|
|
with pytest.raises(ValidationError):
|
|
TemplateVariable(name="", type="string")
|
|
|
|
|
|
def test_frontmatter_minimal():
|
|
fm = TemplateFrontmatter(name="fattura", description="Fattura commerciale")
|
|
assert fm.name == "fattura"
|
|
assert fm.required_variables == []
|
|
assert fm.model is None
|
|
assert fm.instructions_hint is None
|
|
|
|
|
|
def test_frontmatter_with_variables():
|
|
fm = TemplateFrontmatter(
|
|
name="fattura",
|
|
description="...",
|
|
model="anthropic/claude-sonnet-4",
|
|
required_variables=[
|
|
{"name": "cliente", "type": "string"},
|
|
{"name": "foto", "type": "image"},
|
|
],
|
|
instructions_hint="tono formale",
|
|
)
|
|
assert len(fm.required_variables) == 2
|
|
assert fm.required_variables[1].type == "image"
|
|
|
|
|
|
def test_frontmatter_name_slug_rules():
|
|
with pytest.raises(ValidationError):
|
|
TemplateFrontmatter(name="Fattura Commerciale", description="x")
|
|
with pytest.raises(ValidationError):
|
|
TemplateFrontmatter(name="../escape", description="x")
|
|
with pytest.raises(ValidationError):
|
|
TemplateFrontmatter(name="-starts-with-dash", description="x")
|
|
|
|
|
|
def test_image_variable():
|
|
img = ImageVariable(kind="image", data_b64="aGVsbG8=", mime="image/png")
|
|
assert img.mime == "image/png"
|
|
|
|
|
|
def test_image_variable_rejects_unsupported_mime():
|
|
with pytest.raises(ValidationError):
|
|
ImageVariable(kind="image", data_b64="aGVsbG8=", mime="image/bmp")
|
|
|
|
|
|
def test_image_variable_rejects_wrong_kind():
|
|
with pytest.raises(ValidationError):
|
|
ImageVariable(kind="file", data_b64="aGVsbG8=", mime="image/png")
|
|
|
|
|
|
def test_generation_result_shape():
|
|
result = GenerationResult(
|
|
generation_id="abc-123",
|
|
markdown="# hello",
|
|
model="anthropic/claude-sonnet-4",
|
|
tokens=TokenUsage(input=100, output=200),
|
|
cost_usd=0.01,
|
|
ephemeral_assets_urls=[],
|
|
ephemeral_expires_at=None,
|
|
)
|
|
assert result.generation_id == "abc-123"
|
|
assert result.tokens.input == 100
|
|
```
|
|
|
|
- [ ] **Step 2: Run to verify failure**
|
|
|
|
```bash
|
|
uv run pytest tests/unit/test_models.py -v
|
|
```
|
|
Expected: FAIL — module does not exist.
|
|
|
|
- [ ] **Step 3: Implement models**
|
|
|
|
`src/docugen_mcp/models.py`:
|
|
```python
|
|
import re
|
|
from datetime import datetime
|
|
from typing import Literal
|
|
|
|
from pydantic import BaseModel, Field, field_validator
|
|
|
|
|
|
_SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9-]*$")
|
|
_SUPPORTED_MIMES = {"image/png", "image/jpeg", "image/webp"}
|
|
|
|
|
|
class TemplateVariable(BaseModel):
|
|
name: str = Field(..., min_length=1)
|
|
type: Literal["string", "image"]
|
|
|
|
|
|
class TemplateFrontmatter(BaseModel):
|
|
name: str
|
|
description: str = ""
|
|
model: str | None = None
|
|
required_variables: list[TemplateVariable] = Field(default_factory=list)
|
|
instructions_hint: str | None = None
|
|
|
|
@field_validator("name")
|
|
@classmethod
|
|
def validate_slug(cls, v: str) -> str:
|
|
if not _SLUG_RE.match(v):
|
|
raise ValueError(
|
|
f"invalid template name '{v}': must match {_SLUG_RE.pattern}"
|
|
)
|
|
return v
|
|
|
|
|
|
class ImageVariable(BaseModel):
|
|
kind: Literal["image"]
|
|
data_b64: str
|
|
mime: str
|
|
|
|
@field_validator("mime")
|
|
@classmethod
|
|
def validate_mime(cls, v: str) -> str:
|
|
if v not in _SUPPORTED_MIMES:
|
|
raise ValueError(f"unsupported mime '{v}'; allowed: {sorted(_SUPPORTED_MIMES)}")
|
|
return v
|
|
|
|
|
|
class TokenUsage(BaseModel):
|
|
input: int = 0
|
|
output: int = 0
|
|
|
|
|
|
class GenerationResult(BaseModel):
|
|
generation_id: str
|
|
markdown: str
|
|
model: str
|
|
tokens: TokenUsage
|
|
cost_usd: float
|
|
ephemeral_assets_urls: list[str] = Field(default_factory=list)
|
|
ephemeral_expires_at: datetime | None = None
|
|
|
|
|
|
class TemplateSummary(BaseModel):
|
|
name: str
|
|
description: str
|
|
updated_at: datetime
|
|
|
|
|
|
class TemplateAsset(BaseModel):
|
|
filename: str
|
|
mime: str
|
|
size_bytes: int
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify pass**
|
|
|
|
```bash
|
|
uv run pytest tests/unit/test_models.py -v
|
|
```
|
|
Expected: 10 PASSED.
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/docugen_mcp/models.py tests/unit/test_models.py
|
|
git commit -m "feat: add shared Pydantic models"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 3: Auth middleware
|
|
|
|
**Files:**
|
|
- Create: `src/docugen_mcp/auth.py`
|
|
- Create: `tests/unit/test_auth.py`
|
|
|
|
- [ ] **Step 1: Write failing test**
|
|
|
|
`tests/unit/test_auth.py`:
|
|
```python
|
|
import pytest
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
|
|
from docugen_mcp.auth import ApiKeyAuthMiddleware
|
|
|
|
|
|
@pytest.fixture
|
|
def app_with_auth():
|
|
app = FastAPI()
|
|
app.add_middleware(ApiKeyAuthMiddleware, api_key="secret", exempt_paths={"/health"})
|
|
|
|
@app.get("/protected")
|
|
async def protected():
|
|
return {"ok": True}
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
return {"status": "ok"}
|
|
|
|
return app
|
|
|
|
|
|
def test_protected_without_header_returns_401(app_with_auth):
|
|
client = TestClient(app_with_auth)
|
|
response = client.get("/protected")
|
|
assert response.status_code == 401
|
|
assert response.json() == {"error": "invalid_api_key"}
|
|
|
|
|
|
def test_protected_with_wrong_key_returns_401(app_with_auth):
|
|
client = TestClient(app_with_auth)
|
|
response = client.get("/protected", headers={"Authorization": "Bearer wrong"})
|
|
assert response.status_code == 401
|
|
|
|
|
|
def test_protected_with_correct_key_passes(app_with_auth):
|
|
client = TestClient(app_with_auth)
|
|
response = client.get("/protected", headers={"Authorization": "Bearer secret"})
|
|
assert response.status_code == 200
|
|
assert response.json() == {"ok": True}
|
|
|
|
|
|
def test_health_bypasses_auth(app_with_auth):
|
|
client = TestClient(app_with_auth)
|
|
response = client.get("/health")
|
|
assert response.status_code == 200
|
|
|
|
|
|
def test_malformed_auth_header_returns_401(app_with_auth):
|
|
client = TestClient(app_with_auth)
|
|
response = client.get("/protected", headers={"Authorization": "secret"})
|
|
assert response.status_code == 401
|
|
response = client.get("/protected", headers={"Authorization": "Basic secret"})
|
|
assert response.status_code == 401
|
|
```
|
|
|
|
- [ ] **Step 2: Run to verify failure**
|
|
|
|
```bash
|
|
uv run pytest tests/unit/test_auth.py -v
|
|
```
|
|
Expected: FAIL — `docugen_mcp.auth` does not exist.
|
|
|
|
- [ ] **Step 3: Implement auth**
|
|
|
|
`src/docugen_mcp/auth.py`:
|
|
```python
|
|
from starlette.middleware.base import BaseHTTPMiddleware
|
|
from starlette.requests import Request
|
|
from starlette.responses import JSONResponse
|
|
from starlette.types import ASGIApp
|
|
|
|
|
|
class ApiKeyAuthMiddleware(BaseHTTPMiddleware):
|
|
def __init__(
|
|
self,
|
|
app: ASGIApp,
|
|
api_key: str,
|
|
exempt_paths: set[str] | None = None,
|
|
) -> None:
|
|
super().__init__(app)
|
|
self.api_key = api_key
|
|
self.exempt_paths = exempt_paths or set()
|
|
|
|
async def dispatch(self, request: Request, call_next):
|
|
if request.url.path in self.exempt_paths:
|
|
return await call_next(request)
|
|
|
|
header = request.headers.get("Authorization", "")
|
|
if not header.startswith("Bearer "):
|
|
return JSONResponse(
|
|
status_code=401, content={"error": "invalid_api_key"}
|
|
)
|
|
|
|
provided = header.removeprefix("Bearer ").strip()
|
|
if provided != self.api_key:
|
|
return JSONResponse(
|
|
status_code=401, content={"error": "invalid_api_key"}
|
|
)
|
|
|
|
return await call_next(request)
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify pass**
|
|
|
|
```bash
|
|
uv run pytest tests/unit/test_auth.py -v
|
|
```
|
|
Expected: 5 PASSED.
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/docugen_mcp/auth.py tests/unit/test_auth.py
|
|
git commit -m "feat: add API key bearer auth middleware"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 4: Template store (filesystem CRUD)
|
|
|
|
**Files:**
|
|
- Create: `src/docugen_mcp/template_store.py`
|
|
- Create: `tests/unit/test_template_store.py`
|
|
|
|
- [ ] **Step 1: Write failing test**
|
|
|
|
`tests/unit/test_template_store.py`:
|
|
```python
|
|
import base64
|
|
import pytest
|
|
|
|
from docugen_mcp.template_store import (
|
|
TemplateStore,
|
|
TemplateNotFound,
|
|
TemplateAlreadyExists,
|
|
InvalidFrontmatter,
|
|
)
|
|
from docugen_mcp.models import TemplateFrontmatter
|
|
|
|
|
|
@pytest.fixture
|
|
def store(tmp_path):
|
|
return TemplateStore(base_dir=tmp_path)
|
|
|
|
|
|
def _fm(name="fattura"):
|
|
return TemplateFrontmatter(
|
|
name=name,
|
|
description="Fattura commerciale",
|
|
required_variables=[{"name": "cliente", "type": "string"}],
|
|
)
|
|
|
|
|
|
async def test_create_and_get(store):
|
|
fm = _fm()
|
|
await store.create(name="fattura", frontmatter=fm, body="# Body {{cliente}}")
|
|
got = await store.get("fattura")
|
|
assert got.frontmatter.name == "fattura"
|
|
assert "# Body {{cliente}}" in got.body
|
|
|
|
|
|
async def test_create_duplicate_raises(store):
|
|
fm = _fm()
|
|
await store.create(name="fattura", frontmatter=fm, body="body")
|
|
with pytest.raises(TemplateAlreadyExists):
|
|
await store.create(name="fattura", frontmatter=fm, body="body")
|
|
|
|
|
|
async def test_get_missing_raises(store):
|
|
with pytest.raises(TemplateNotFound):
|
|
await store.get("nope")
|
|
|
|
|
|
async def test_list_returns_summaries(store):
|
|
await store.create(name="a", frontmatter=_fm("a"), body="b")
|
|
await store.create(name="b", frontmatter=_fm("b"), body="b")
|
|
result = await store.list()
|
|
names = sorted(t.name for t in result)
|
|
assert names == ["a", "b"]
|
|
|
|
|
|
async def test_update_overwrites(store):
|
|
await store.create(name="f", frontmatter=_fm("f"), body="old")
|
|
new_fm = TemplateFrontmatter(name="f", description="new description")
|
|
await store.update(name="f", frontmatter=new_fm, body="new body")
|
|
got = await store.get("f")
|
|
assert got.body == "new body"
|
|
assert got.frontmatter.description == "new description"
|
|
|
|
|
|
async def test_update_missing_raises(store):
|
|
with pytest.raises(TemplateNotFound):
|
|
await store.update(name="nope", frontmatter=_fm("nope"), body="x")
|
|
|
|
|
|
async def test_delete_removes(store):
|
|
await store.create(name="f", frontmatter=_fm("f"), body="b")
|
|
await store.delete("f")
|
|
with pytest.raises(TemplateNotFound):
|
|
await store.get("f")
|
|
|
|
|
|
async def test_delete_missing_raises(store):
|
|
with pytest.raises(TemplateNotFound):
|
|
await store.delete("nope")
|
|
|
|
|
|
async def test_assets_are_saved_and_listed(store):
|
|
png_bytes = b"\x89PNG\r\n\x1a\n"
|
|
assets = [
|
|
{"filename": "logo.png", "data_b64": base64.b64encode(png_bytes).decode(), "mime": "image/png"}
|
|
]
|
|
await store.create(name="f", frontmatter=_fm("f"), body="b", assets=assets)
|
|
got = await store.get("f")
|
|
asset_names = [a.filename for a in got.assets]
|
|
assert "logo.png" in asset_names
|
|
content = await store.read_asset("f", "logo.png")
|
|
assert content == png_bytes
|
|
|
|
|
|
async def test_asset_filename_rejects_path_traversal(store):
|
|
assets = [{"filename": "../evil.png", "data_b64": "aGk=", "mime": "image/png"}]
|
|
with pytest.raises(ValueError):
|
|
await store.create(name="f", frontmatter=_fm("f"), body="b", assets=assets)
|
|
|
|
|
|
async def test_frontmatter_parsing_rejects_malformed_yaml(store, tmp_path):
|
|
template_dir = tmp_path / "broken"
|
|
template_dir.mkdir()
|
|
(template_dir / "template.md").write_text("---\nname: :::broken\n---\nbody")
|
|
with pytest.raises(InvalidFrontmatter):
|
|
await store.get("broken")
|
|
```
|
|
|
|
- [ ] **Step 2: Run to verify failure**
|
|
|
|
```bash
|
|
uv run pytest tests/unit/test_template_store.py -v
|
|
```
|
|
Expected: FAIL — module does not exist.
|
|
|
|
- [ ] **Step 3: Implement template store**
|
|
|
|
`src/docugen_mcp/template_store.py`:
|
|
```python
|
|
import base64
|
|
import re
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import aiofiles
|
|
import yaml
|
|
from pydantic import ValidationError
|
|
|
|
from docugen_mcp.models import TemplateFrontmatter, TemplateSummary, TemplateAsset
|
|
|
|
|
|
_SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9-]*$")
|
|
_ASSET_FILENAME_RE = re.compile(r"^[A-Za-z0-9._-]+$")
|
|
_FRONTMATTER_DELIM = "---"
|
|
|
|
|
|
class TemplateNotFound(Exception):
|
|
pass
|
|
|
|
|
|
class TemplateAlreadyExists(Exception):
|
|
pass
|
|
|
|
|
|
class InvalidFrontmatter(Exception):
|
|
pass
|
|
|
|
|
|
class InvalidTemplateName(ValueError):
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class LoadedTemplate:
|
|
frontmatter: TemplateFrontmatter
|
|
body: str
|
|
assets: list[TemplateAsset]
|
|
updated_at: datetime
|
|
|
|
|
|
class TemplateStore:
|
|
def __init__(self, base_dir: Path) -> None:
|
|
self.base_dir = Path(base_dir)
|
|
self.base_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# ---- public API ----
|
|
|
|
async def create(
|
|
self,
|
|
name: str,
|
|
frontmatter: TemplateFrontmatter,
|
|
body: str,
|
|
assets: list[dict] | None = None,
|
|
) -> None:
|
|
self._validate_name(name)
|
|
tdir = self.base_dir / name
|
|
if tdir.exists():
|
|
raise TemplateAlreadyExists(name)
|
|
tdir.mkdir(parents=True)
|
|
(tdir / "assets").mkdir()
|
|
await self._write_template_file(tdir, frontmatter, body)
|
|
if assets:
|
|
await self._write_assets(tdir, assets)
|
|
|
|
async def update(
|
|
self,
|
|
name: str,
|
|
frontmatter: TemplateFrontmatter,
|
|
body: str,
|
|
assets: list[dict] | None = None,
|
|
) -> None:
|
|
self._validate_name(name)
|
|
tdir = self.base_dir / name
|
|
if not tdir.exists():
|
|
raise TemplateNotFound(name)
|
|
await self._write_template_file(tdir, frontmatter, body)
|
|
if assets is not None:
|
|
# Replace the assets dir entirely
|
|
assets_dir = tdir / "assets"
|
|
for f in assets_dir.iterdir():
|
|
f.unlink()
|
|
await self._write_assets(tdir, assets)
|
|
|
|
async def delete(self, name: str) -> None:
|
|
self._validate_name(name)
|
|
tdir = self.base_dir / name
|
|
if not tdir.exists():
|
|
raise TemplateNotFound(name)
|
|
for f in (tdir / "assets").iterdir():
|
|
f.unlink()
|
|
(tdir / "assets").rmdir()
|
|
(tdir / "template.md").unlink()
|
|
tdir.rmdir()
|
|
|
|
async def get(self, name: str) -> LoadedTemplate:
|
|
self._validate_name(name)
|
|
tdir = self.base_dir / name
|
|
tpath = tdir / "template.md"
|
|
if not tpath.exists():
|
|
raise TemplateNotFound(name)
|
|
frontmatter, body = await self._read_template_file(tpath)
|
|
assets = self._list_assets(tdir)
|
|
stat = tpath.stat()
|
|
updated_at = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
|
|
return LoadedTemplate(frontmatter=frontmatter, body=body, assets=assets, updated_at=updated_at)
|
|
|
|
async def list(self) -> list[TemplateSummary]:
|
|
summaries = []
|
|
for tdir in sorted(self.base_dir.iterdir()):
|
|
if not tdir.is_dir():
|
|
continue
|
|
tpath = tdir / "template.md"
|
|
if not tpath.exists():
|
|
continue
|
|
try:
|
|
frontmatter, _ = await self._read_template_file(tpath)
|
|
except InvalidFrontmatter:
|
|
continue
|
|
stat = tpath.stat()
|
|
summaries.append(
|
|
TemplateSummary(
|
|
name=frontmatter.name,
|
|
description=frontmatter.description,
|
|
updated_at=datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
|
|
)
|
|
)
|
|
return summaries
|
|
|
|
async def read_asset(self, template_name: str, filename: str) -> bytes:
|
|
self._validate_name(template_name)
|
|
self._validate_asset_filename(filename)
|
|
path = self.base_dir / template_name / "assets" / filename
|
|
if not path.exists():
|
|
raise FileNotFoundError(filename)
|
|
async with aiofiles.open(path, "rb") as f:
|
|
return await f.read()
|
|
|
|
def asset_path(self, template_name: str, filename: str) -> Path:
|
|
self._validate_name(template_name)
|
|
self._validate_asset_filename(filename)
|
|
return self.base_dir / template_name / "assets" / filename
|
|
|
|
# ---- helpers ----
|
|
|
|
def _validate_name(self, name: str) -> None:
|
|
if not _SLUG_RE.match(name):
|
|
raise InvalidTemplateName(name)
|
|
|
|
def _validate_asset_filename(self, filename: str) -> None:
|
|
if not _ASSET_FILENAME_RE.match(filename):
|
|
raise ValueError(f"invalid asset filename: {filename!r}")
|
|
|
|
async def _write_template_file(self, tdir: Path, frontmatter: TemplateFrontmatter, body: str) -> None:
|
|
yaml_text = yaml.safe_dump(frontmatter.model_dump(exclude_none=True), sort_keys=False)
|
|
content = f"{_FRONTMATTER_DELIM}\n{yaml_text}{_FRONTMATTER_DELIM}\n{body}"
|
|
async with aiofiles.open(tdir / "template.md", "w") as f:
|
|
await f.write(content)
|
|
|
|
async def _read_template_file(self, path: Path) -> tuple[TemplateFrontmatter, str]:
|
|
async with aiofiles.open(path, "r") as f:
|
|
raw = await f.read()
|
|
if not raw.startswith(_FRONTMATTER_DELIM):
|
|
raise InvalidFrontmatter("missing opening '---'")
|
|
parts = raw.split(_FRONTMATTER_DELIM, 2)
|
|
if len(parts) < 3:
|
|
raise InvalidFrontmatter("missing closing '---'")
|
|
yaml_text = parts[1]
|
|
body = parts[2].lstrip("\n")
|
|
try:
|
|
data = yaml.safe_load(yaml_text) or {}
|
|
frontmatter = TemplateFrontmatter(**data)
|
|
except (yaml.YAMLError, ValidationError) as exc:
|
|
raise InvalidFrontmatter(str(exc)) from exc
|
|
return frontmatter, body
|
|
|
|
async def _write_assets(self, tdir: Path, assets: list[dict]) -> None:
|
|
for asset in assets:
|
|
filename = asset["filename"]
|
|
self._validate_asset_filename(filename)
|
|
data = base64.b64decode(asset["data_b64"])
|
|
async with aiofiles.open(tdir / "assets" / filename, "wb") as f:
|
|
await f.write(data)
|
|
|
|
def _list_assets(self, tdir: Path) -> list[TemplateAsset]:
|
|
assets_dir = tdir / "assets"
|
|
if not assets_dir.exists():
|
|
return []
|
|
out = []
|
|
for f in sorted(assets_dir.iterdir()):
|
|
if not f.is_file():
|
|
continue
|
|
out.append(
|
|
TemplateAsset(
|
|
filename=f.name,
|
|
mime=_guess_mime(f.name),
|
|
size_bytes=f.stat().st_size,
|
|
)
|
|
)
|
|
return out
|
|
|
|
|
|
def _guess_mime(filename: str) -> str:
|
|
ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
|
|
return {
|
|
"png": "image/png",
|
|
"jpg": "image/jpeg",
|
|
"jpeg": "image/jpeg",
|
|
"webp": "image/webp",
|
|
}.get(ext, "application/octet-stream")
|
|
```
|
|
|
|
- [ ] **Step 4: Add `aiofiles` to dependencies**
|
|
|
|
```bash
|
|
uv add aiofiles
|
|
```
|
|
|
|
- [ ] **Step 5: Run tests to verify pass**
|
|
|
|
```bash
|
|
uv run pytest tests/unit/test_template_store.py -v
|
|
```
|
|
Expected: 10 PASSED.
|
|
|
|
- [ ] **Step 6: Commit**
|
|
|
|
```bash
|
|
git add src/docugen_mcp/template_store.py tests/unit/test_template_store.py pyproject.toml uv.lock
|
|
git commit -m "feat: add TemplateStore with filesystem CRUD and asset handling"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 5: LLM client (OpenRouter wrapper)
|
|
|
|
**Files:**
|
|
- Create: `src/docugen_mcp/llm_client.py`
|
|
- Create: `tests/unit/test_llm_client.py`
|
|
|
|
- [ ] **Step 1: Write failing test**
|
|
|
|
`tests/unit/test_llm_client.py`:
|
|
```python
|
|
import httpx
|
|
import pytest
|
|
import respx
|
|
|
|
from docugen_mcp.llm_client import (
|
|
OpenRouterClient,
|
|
LLMTimeout,
|
|
LLMUpstreamError,
|
|
LLMAuthError,
|
|
LLMRateLimit,
|
|
LLMInvalidResponse,
|
|
LLMEmptyResponse,
|
|
)
|
|
|
|
|
|
def _success_body(text: str = "output text") -> dict:
|
|
return {
|
|
"id": "gen-1",
|
|
"choices": [{"message": {"role": "assistant", "content": text}}],
|
|
"model": "anthropic/claude-sonnet-4",
|
|
"usage": {"prompt_tokens": 100, "completion_tokens": 200, "total_cost": 0.01},
|
|
}
|
|
|
|
|
|
@respx.mock
|
|
async def test_chat_success():
|
|
respx.post("https://openrouter.ai/api/v1/chat/completions").mock(
|
|
return_value=httpx.Response(200, json=_success_body("hello"))
|
|
)
|
|
client = OpenRouterClient(api_key="sk", base_url="https://openrouter.ai/api/v1", timeout=5)
|
|
resp = await client.chat(
|
|
model="anthropic/claude-sonnet-4",
|
|
system="sys",
|
|
user="user",
|
|
)
|
|
assert resp.text == "hello"
|
|
assert resp.tokens_in == 100
|
|
assert resp.tokens_out == 200
|
|
assert resp.cost_usd == 0.01
|
|
assert resp.model == "anthropic/claude-sonnet-4"
|
|
|
|
|
|
@respx.mock
|
|
async def test_chat_retries_on_5xx():
|
|
route = respx.post("https://openrouter.ai/api/v1/chat/completions").mock(
|
|
side_effect=[
|
|
httpx.Response(503),
|
|
httpx.Response(502),
|
|
httpx.Response(200, json=_success_body()),
|
|
]
|
|
)
|
|
client = OpenRouterClient(api_key="sk", base_url="https://openrouter.ai/api/v1", timeout=5, retry_base_delay=0)
|
|
resp = await client.chat(model="m", system="s", user="u")
|
|
assert resp.text == "output text"
|
|
assert route.call_count == 3
|
|
|
|
|
|
@respx.mock
|
|
async def test_chat_exhausts_retries_5xx():
|
|
respx.post("https://openrouter.ai/api/v1/chat/completions").mock(
|
|
return_value=httpx.Response(500)
|
|
)
|
|
client = OpenRouterClient(api_key="sk", base_url="https://openrouter.ai/api/v1", timeout=5, retry_base_delay=0)
|
|
with pytest.raises(LLMUpstreamError):
|
|
await client.chat(model="m", system="s", user="u")
|
|
|
|
|
|
@respx.mock
|
|
async def test_chat_retries_on_429():
|
|
route = respx.post("https://openrouter.ai/api/v1/chat/completions").mock(
|
|
side_effect=[
|
|
httpx.Response(429),
|
|
httpx.Response(200, json=_success_body()),
|
|
]
|
|
)
|
|
client = OpenRouterClient(api_key="sk", base_url="https://openrouter.ai/api/v1", timeout=5, retry_base_delay=0)
|
|
resp = await client.chat(model="m", system="s", user="u")
|
|
assert route.call_count == 2
|
|
|
|
|
|
@respx.mock
|
|
async def test_chat_exhausts_retries_429():
|
|
respx.post("https://openrouter.ai/api/v1/chat/completions").mock(
|
|
return_value=httpx.Response(429)
|
|
)
|
|
client = OpenRouterClient(api_key="sk", base_url="https://openrouter.ai/api/v1", timeout=5, retry_base_delay=0)
|
|
with pytest.raises(LLMRateLimit):
|
|
await client.chat(model="m", system="s", user="u")
|
|
|
|
|
|
@respx.mock
|
|
async def test_chat_no_retry_on_401():
|
|
respx.post("https://openrouter.ai/api/v1/chat/completions").mock(
|
|
return_value=httpx.Response(401)
|
|
)
|
|
client = OpenRouterClient(api_key="sk", base_url="https://openrouter.ai/api/v1", timeout=5, retry_base_delay=0)
|
|
with pytest.raises(LLMAuthError):
|
|
await client.chat(model="m", system="s", user="u")
|
|
|
|
|
|
@respx.mock
|
|
async def test_chat_timeout():
|
|
respx.post("https://openrouter.ai/api/v1/chat/completions").mock(
|
|
side_effect=httpx.ReadTimeout("timeout")
|
|
)
|
|
client = OpenRouterClient(api_key="sk", base_url="https://openrouter.ai/api/v1", timeout=1, retry_base_delay=0)
|
|
with pytest.raises(LLMTimeout):
|
|
await client.chat(model="m", system="s", user="u")
|
|
|
|
|
|
@respx.mock
|
|
async def test_chat_invalid_response_shape():
|
|
respx.post("https://openrouter.ai/api/v1/chat/completions").mock(
|
|
return_value=httpx.Response(200, json={"no": "choices"})
|
|
)
|
|
client = OpenRouterClient(api_key="sk", base_url="https://openrouter.ai/api/v1", timeout=5, retry_base_delay=0)
|
|
with pytest.raises(LLMInvalidResponse):
|
|
await client.chat(model="m", system="s", user="u")
|
|
|
|
|
|
@respx.mock
|
|
async def test_chat_empty_content():
|
|
respx.post("https://openrouter.ai/api/v1/chat/completions").mock(
|
|
return_value=httpx.Response(200, json=_success_body(text=""))
|
|
)
|
|
client = OpenRouterClient(api_key="sk", base_url="https://openrouter.ai/api/v1", timeout=5, retry_base_delay=0)
|
|
with pytest.raises(LLMEmptyResponse):
|
|
await client.chat(model="m", system="s", user="u")
|
|
```
|
|
|
|
- [ ] **Step 2: Run to verify failure**
|
|
|
|
```bash
|
|
uv run pytest tests/unit/test_llm_client.py -v
|
|
```
|
|
Expected: FAIL — module does not exist.
|
|
|
|
- [ ] **Step 3: Implement LLM client**
|
|
|
|
`src/docugen_mcp/llm_client.py`:
|
|
```python
|
|
import asyncio
|
|
from dataclasses import dataclass
|
|
from time import perf_counter
|
|
|
|
import httpx
|
|
|
|
|
|
class LLMError(Exception):
|
|
pass
|
|
|
|
|
|
class LLMTimeout(LLMError):
|
|
pass
|
|
|
|
|
|
class LLMUpstreamError(LLMError):
|
|
pass
|
|
|
|
|
|
class LLMAuthError(LLMError):
|
|
pass
|
|
|
|
|
|
class LLMRateLimit(LLMError):
|
|
pass
|
|
|
|
|
|
class LLMInvalidResponse(LLMError):
|
|
pass
|
|
|
|
|
|
class LLMEmptyResponse(LLMError):
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class LLMResponse:
|
|
text: str
|
|
model: str
|
|
tokens_in: int
|
|
tokens_out: int
|
|
cost_usd: float
|
|
latency_ms: int
|
|
|
|
|
|
class OpenRouterClient:
|
|
def __init__(
|
|
self,
|
|
api_key: str,
|
|
base_url: str,
|
|
timeout: float = 60,
|
|
max_retries: int = 3,
|
|
retry_base_delay: float = 1.0,
|
|
) -> None:
|
|
self.api_key = api_key
|
|
self.base_url = base_url.rstrip("/")
|
|
self.timeout = timeout
|
|
self.max_retries = max_retries
|
|
self.retry_base_delay = retry_base_delay
|
|
|
|
async def chat(self, model: str, system: str, user: str) -> LLMResponse:
|
|
payload = {
|
|
"model": model,
|
|
"messages": [
|
|
{"role": "system", "content": system},
|
|
{"role": "user", "content": user},
|
|
],
|
|
}
|
|
headers = {
|
|
"Authorization": f"Bearer {self.api_key}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
|
|
start = perf_counter()
|
|
last_transient_error: Exception | None = None
|
|
|
|
for attempt in range(self.max_retries):
|
|
try:
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.post(
|
|
f"{self.base_url}/chat/completions",
|
|
headers=headers,
|
|
json=payload,
|
|
)
|
|
except (httpx.ReadTimeout, httpx.ConnectTimeout) as exc:
|
|
last_transient_error = exc
|
|
if attempt == self.max_retries - 1:
|
|
raise LLMTimeout(str(exc)) from exc
|
|
await self._sleep_backoff(attempt, rate_limit=False)
|
|
continue
|
|
|
|
status = response.status_code
|
|
if status == 200:
|
|
return self._parse_success(response, start)
|
|
if status in (401, 403):
|
|
raise LLMAuthError(f"status {status}: {response.text[:200]}")
|
|
if status == 429:
|
|
if attempt == self.max_retries - 1:
|
|
raise LLMRateLimit(response.text[:200])
|
|
await self._sleep_backoff(attempt, rate_limit=True)
|
|
continue
|
|
if 500 <= status < 600:
|
|
if attempt == self.max_retries - 1:
|
|
raise LLMUpstreamError(f"status {status}: {response.text[:200]}")
|
|
await self._sleep_backoff(attempt, rate_limit=False)
|
|
continue
|
|
raise LLMUpstreamError(f"unexpected status {status}: {response.text[:200]}")
|
|
|
|
raise LLMUpstreamError(f"retries exhausted: {last_transient_error}")
|
|
|
|
async def _sleep_backoff(self, attempt: int, rate_limit: bool) -> None:
|
|
multiplier = 5 if rate_limit else 1
|
|
delay = self.retry_base_delay * multiplier * (2**attempt)
|
|
if delay > 0:
|
|
await asyncio.sleep(delay)
|
|
|
|
def _parse_success(self, response: httpx.Response, start: float) -> LLMResponse:
|
|
try:
|
|
data = response.json()
|
|
choice = data["choices"][0]
|
|
text = choice["message"]["content"]
|
|
model = data.get("model", "unknown")
|
|
usage = data.get("usage", {})
|
|
except (KeyError, IndexError, ValueError) as exc:
|
|
raise LLMInvalidResponse(str(exc)) from exc
|
|
|
|
if not text:
|
|
raise LLMEmptyResponse("LLM returned empty content")
|
|
|
|
latency_ms = int((perf_counter() - start) * 1000)
|
|
return LLMResponse(
|
|
text=text,
|
|
model=model,
|
|
tokens_in=int(usage.get("prompt_tokens", 0)),
|
|
tokens_out=int(usage.get("completion_tokens", 0)),
|
|
cost_usd=float(usage.get("total_cost", 0.0)),
|
|
latency_ms=latency_ms,
|
|
)
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify pass**
|
|
|
|
```bash
|
|
uv run pytest tests/unit/test_llm_client.py -v
|
|
```
|
|
Expected: 9 PASSED.
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/docugen_mcp/llm_client.py tests/unit/test_llm_client.py
|
|
git commit -m "feat: add OpenRouter async client with retry/backoff"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 6: Generation store (SQLite)
|
|
|
|
**Files:**
|
|
- Create: `src/docugen_mcp/generation_store.py`
|
|
- Create: `tests/unit/test_generation_store.py`
|
|
|
|
- [ ] **Step 1: Write failing test**
|
|
|
|
`tests/unit/test_generation_store.py`:
|
|
```python
|
|
import pytest
|
|
|
|
from docugen_mcp.generation_store import GenerationStore, GenerationRecord, EphemeralAssetRecord
|
|
|
|
|
|
@pytest.fixture
|
|
async def store(tmp_path):
|
|
s = GenerationStore(db_path=tmp_path / "gen.db", generated_dir=tmp_path / "generated")
|
|
await s.init()
|
|
return s
|
|
|
|
|
|
async def test_record_success_generation(store):
|
|
await store.record_generation(
|
|
GenerationRecord(
|
|
id="g-1",
|
|
template_name="fattura",
|
|
model="m",
|
|
tokens_in=100,
|
|
tokens_out=200,
|
|
cost_usd=0.01,
|
|
success=True,
|
|
error_msg=None,
|
|
)
|
|
)
|
|
stats = await store.get_stats()
|
|
assert stats["total"] == 1
|
|
assert stats["success"] == 1
|
|
assert stats["failed"] == 0
|
|
|
|
|
|
async def test_record_failed_generation(store):
|
|
await store.record_generation(
|
|
GenerationRecord(
|
|
id="g-2",
|
|
template_name="fattura",
|
|
model="m",
|
|
tokens_in=0,
|
|
tokens_out=0,
|
|
cost_usd=0.0,
|
|
success=False,
|
|
error_msg="LLMTimeout",
|
|
)
|
|
)
|
|
stats = await store.get_stats()
|
|
assert stats["failed"] == 1
|
|
|
|
|
|
async def test_register_ephemeral_asset(store, tmp_path):
|
|
asset_file = tmp_path / "generated" / "g-1" / "foto.png"
|
|
asset_file.parent.mkdir(parents=True)
|
|
asset_file.write_bytes(b"png-bytes")
|
|
|
|
await store.register_ephemeral_asset(
|
|
EphemeralAssetRecord(
|
|
generation_id="g-1",
|
|
var_name="foto",
|
|
file_path=str(asset_file),
|
|
mime="image/png",
|
|
ttl_days=30,
|
|
)
|
|
)
|
|
asset = await store.get_ephemeral_asset("g-1", "foto.png")
|
|
assert asset is not None
|
|
assert asset.mime == "image/png"
|
|
|
|
|
|
async def test_get_ephemeral_asset_returns_none_if_missing(store):
|
|
asset = await store.get_ephemeral_asset("nope", "foo.png")
|
|
assert asset is None
|
|
|
|
|
|
async def test_cleanup_expired_removes_records_and_files(store, tmp_path):
|
|
asset_file = tmp_path / "generated" / "g-old" / "foto.png"
|
|
asset_file.parent.mkdir(parents=True)
|
|
asset_file.write_bytes(b"bytes")
|
|
|
|
await store.register_ephemeral_asset(
|
|
EphemeralAssetRecord(
|
|
generation_id="g-old",
|
|
var_name="foto",
|
|
file_path=str(asset_file),
|
|
mime="image/png",
|
|
ttl_days=-1, # already expired
|
|
)
|
|
)
|
|
removed = await store.cleanup_expired()
|
|
assert removed == 1
|
|
assert not asset_file.exists()
|
|
assert await store.get_ephemeral_asset("g-old", "foto.png") is None
|
|
|
|
|
|
async def test_ephemeral_asset_expired_flag(store, tmp_path):
|
|
f = tmp_path / "generated" / "g-e" / "img.png"
|
|
f.parent.mkdir(parents=True)
|
|
f.write_bytes(b"x")
|
|
|
|
await store.register_ephemeral_asset(
|
|
EphemeralAssetRecord(
|
|
generation_id="g-e",
|
|
var_name="img",
|
|
file_path=str(f),
|
|
mime="image/png",
|
|
ttl_days=-1,
|
|
)
|
|
)
|
|
# File still on disk but record expired
|
|
asset = await store.get_ephemeral_asset("g-e", "img.png")
|
|
assert asset is not None
|
|
assert asset.is_expired is True
|
|
```
|
|
|
|
- [ ] **Step 2: Run to verify failure**
|
|
|
|
```bash
|
|
uv run pytest tests/unit/test_generation_store.py -v
|
|
```
|
|
Expected: FAIL.
|
|
|
|
- [ ] **Step 3: Implement generation store**
|
|
|
|
`src/docugen_mcp/generation_store.py`:
|
|
```python
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
|
|
import aiosqlite
|
|
|
|
|
|
_SCHEMA = """
|
|
CREATE TABLE IF NOT EXISTS generations (
|
|
id TEXT PRIMARY KEY,
|
|
timestamp INTEGER NOT NULL,
|
|
template_name TEXT NOT NULL,
|
|
model TEXT NOT NULL,
|
|
tokens_in INTEGER NOT NULL,
|
|
tokens_out INTEGER NOT NULL,
|
|
cost_usd REAL NOT NULL,
|
|
success INTEGER NOT NULL,
|
|
error_msg TEXT
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_generations_timestamp ON generations(timestamp);
|
|
|
|
CREATE TABLE IF NOT EXISTS ephemeral_assets (
|
|
generation_id TEXT NOT NULL,
|
|
var_name TEXT NOT NULL,
|
|
file_path TEXT NOT NULL,
|
|
mime TEXT NOT NULL,
|
|
created_at INTEGER NOT NULL,
|
|
expires_at INTEGER NOT NULL,
|
|
PRIMARY KEY (generation_id, var_name)
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_assets_expires ON ephemeral_assets(expires_at);
|
|
"""
|
|
|
|
|
|
@dataclass
|
|
class GenerationRecord:
|
|
id: str
|
|
template_name: str
|
|
model: str
|
|
tokens_in: int
|
|
tokens_out: int
|
|
cost_usd: float
|
|
success: bool
|
|
error_msg: str | None
|
|
|
|
|
|
@dataclass
|
|
class EphemeralAssetRecord:
|
|
generation_id: str
|
|
var_name: str
|
|
file_path: str
|
|
mime: str
|
|
ttl_days: int
|
|
|
|
|
|
@dataclass
|
|
class EphemeralAssetInfo:
|
|
generation_id: str
|
|
var_name: str
|
|
file_path: str
|
|
mime: str
|
|
created_at: datetime
|
|
expires_at: datetime
|
|
is_expired: bool
|
|
|
|
|
|
def _now_ms() -> int:
|
|
return int(datetime.now(tz=timezone.utc).timestamp() * 1000)
|
|
|
|
|
|
class GenerationStore:
|
|
def __init__(self, db_path: Path, generated_dir: Path) -> None:
|
|
self.db_path = Path(db_path)
|
|
self.generated_dir = Path(generated_dir)
|
|
self.generated_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
async def init(self) -> None:
|
|
async with aiosqlite.connect(self.db_path) as db:
|
|
await db.executescript(_SCHEMA)
|
|
await db.commit()
|
|
|
|
async def record_generation(self, record: GenerationRecord) -> None:
|
|
async with aiosqlite.connect(self.db_path) as db:
|
|
await db.execute(
|
|
"""
|
|
INSERT INTO generations
|
|
(id, timestamp, template_name, model, tokens_in, tokens_out, cost_usd, success, error_msg)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
record.id,
|
|
_now_ms(),
|
|
record.template_name,
|
|
record.model,
|
|
record.tokens_in,
|
|
record.tokens_out,
|
|
record.cost_usd,
|
|
1 if record.success else 0,
|
|
record.error_msg,
|
|
),
|
|
)
|
|
await db.commit()
|
|
|
|
async def register_ephemeral_asset(self, record: EphemeralAssetRecord) -> None:
|
|
now = _now_ms()
|
|
expires = now + int(record.ttl_days * 24 * 3600 * 1000)
|
|
async with aiosqlite.connect(self.db_path) as db:
|
|
await db.execute(
|
|
"""
|
|
INSERT OR REPLACE INTO ephemeral_assets
|
|
(generation_id, var_name, file_path, mime, created_at, expires_at)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
record.generation_id,
|
|
record.var_name,
|
|
record.file_path,
|
|
record.mime,
|
|
now,
|
|
expires,
|
|
),
|
|
)
|
|
await db.commit()
|
|
|
|
async def get_ephemeral_asset(self, generation_id: str, filename: str) -> EphemeralAssetInfo | None:
|
|
async with aiosqlite.connect(self.db_path) as db:
|
|
cursor = await db.execute(
|
|
"""
|
|
SELECT generation_id, var_name, file_path, mime, created_at, expires_at
|
|
FROM ephemeral_assets
|
|
WHERE generation_id = ?
|
|
""",
|
|
(generation_id,),
|
|
)
|
|
rows = await cursor.fetchall()
|
|
for row in rows:
|
|
path = Path(row[2])
|
|
if path.name == filename:
|
|
created_at = datetime.fromtimestamp(row[4] / 1000, tz=timezone.utc)
|
|
expires_at = datetime.fromtimestamp(row[5] / 1000, tz=timezone.utc)
|
|
return EphemeralAssetInfo(
|
|
generation_id=row[0],
|
|
var_name=row[1],
|
|
file_path=row[2],
|
|
mime=row[3],
|
|
created_at=created_at,
|
|
expires_at=expires_at,
|
|
is_expired=expires_at < datetime.now(tz=timezone.utc),
|
|
)
|
|
return None
|
|
|
|
async def cleanup_expired(self) -> int:
|
|
now = _now_ms()
|
|
async with aiosqlite.connect(self.db_path) as db:
|
|
cursor = await db.execute(
|
|
"SELECT generation_id, file_path FROM ephemeral_assets WHERE expires_at < ?",
|
|
(now,),
|
|
)
|
|
rows = await cursor.fetchall()
|
|
count = 0
|
|
for _, fpath in rows:
|
|
try:
|
|
Path(fpath).unlink(missing_ok=True)
|
|
except OSError:
|
|
pass
|
|
count += 1
|
|
# Remove now-empty generation dirs
|
|
for gen_id, _ in rows:
|
|
gdir = self.generated_dir / gen_id
|
|
if gdir.exists() and not any(gdir.iterdir()):
|
|
gdir.rmdir()
|
|
await db.execute(
|
|
"DELETE FROM ephemeral_assets WHERE expires_at < ?",
|
|
(now,),
|
|
)
|
|
await db.commit()
|
|
return count
|
|
|
|
async def get_stats(self) -> dict:
|
|
async with aiosqlite.connect(self.db_path) as db:
|
|
cur = await db.execute(
|
|
"SELECT COUNT(*), SUM(success), SUM(cost_usd) FROM generations"
|
|
)
|
|
total, success, cost = await cur.fetchone()
|
|
return {
|
|
"total": total or 0,
|
|
"success": success or 0,
|
|
"failed": (total or 0) - (success or 0),
|
|
"total_cost_usd": float(cost or 0),
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify pass**
|
|
|
|
```bash
|
|
uv run pytest tests/unit/test_generation_store.py -v
|
|
```
|
|
Expected: 6 PASSED.
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/docugen_mcp/generation_store.py tests/unit/test_generation_store.py
|
|
git commit -m "feat: add GenerationStore (SQLite) with TTL cleanup"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 7: Renderer (orchestrator)
|
|
|
|
**Files:**
|
|
- Create: `src/docugen_mcp/renderer.py`
|
|
- Create: `tests/unit/test_renderer.py`
|
|
|
|
- [ ] **Step 1: Write failing test**
|
|
|
|
`tests/unit/test_renderer.py`:
|
|
```python
|
|
import base64
|
|
from unittest.mock import AsyncMock
|
|
|
|
import pytest
|
|
|
|
from docugen_mcp.models import TemplateFrontmatter, TemplateVariable
|
|
from docugen_mcp.template_store import TemplateStore, LoadedTemplate
|
|
from docugen_mcp.generation_store import GenerationStore
|
|
from docugen_mcp.llm_client import OpenRouterClient, LLMResponse
|
|
from docugen_mcp.renderer import (
|
|
Renderer,
|
|
MissingVariables,
|
|
InvalidVariableType,
|
|
ImageTooLarge,
|
|
InvalidImageEncoding,
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
async def env(tmp_path):
|
|
template_store = TemplateStore(base_dir=tmp_path / "templates")
|
|
generation_store = GenerationStore(
|
|
db_path=tmp_path / "gen.db",
|
|
generated_dir=tmp_path / "generated",
|
|
)
|
|
await generation_store.init()
|
|
llm = AsyncMock(spec=OpenRouterClient)
|
|
llm.chat.return_value = LLMResponse(
|
|
text="# Output", model="anthropic/claude-sonnet-4",
|
|
tokens_in=10, tokens_out=20, cost_usd=0.001, latency_ms=100,
|
|
)
|
|
renderer = Renderer(
|
|
template_store=template_store,
|
|
generation_store=generation_store,
|
|
llm=llm,
|
|
public_base_url="https://mcp.example.com",
|
|
default_model="anthropic/claude-sonnet-4",
|
|
asset_ttl_days=30,
|
|
max_image_size_mb=10,
|
|
)
|
|
fm = TemplateFrontmatter(
|
|
name="fattura",
|
|
description="x",
|
|
required_variables=[
|
|
TemplateVariable(name="cliente", type="string"),
|
|
],
|
|
)
|
|
await template_store.create(name="fattura", frontmatter=fm, body="Cliente: {{cliente}}")
|
|
return renderer, llm
|
|
|
|
|
|
async def test_generate_happy_path_string_var(env):
|
|
renderer, llm = env
|
|
result = await renderer.generate(
|
|
template_name="fattura",
|
|
content_md="# Ordine",
|
|
variables={"cliente": "ACME"},
|
|
instructions=None,
|
|
)
|
|
assert result.markdown == "# Output"
|
|
assert result.model == "anthropic/claude-sonnet-4"
|
|
assert result.tokens.input == 10
|
|
call_kwargs = llm.chat.await_args.kwargs
|
|
assert "ACME" in call_kwargs["user"]
|
|
assert "Cliente: ACME" in call_kwargs["system"]
|
|
|
|
|
|
async def test_generate_missing_required_variable_raises(env):
|
|
renderer, _ = env
|
|
with pytest.raises(MissingVariables) as exc:
|
|
await renderer.generate(
|
|
template_name="fattura",
|
|
content_md="x",
|
|
variables={},
|
|
instructions=None,
|
|
)
|
|
assert "cliente" in str(exc.value)
|
|
|
|
|
|
async def test_generate_wrong_type_raises(env):
|
|
renderer, _ = env
|
|
with pytest.raises(InvalidVariableType):
|
|
await renderer.generate(
|
|
template_name="fattura",
|
|
content_md="x",
|
|
variables={"cliente": {"kind": "image", "data_b64": "x", "mime": "image/png"}},
|
|
instructions=None,
|
|
)
|
|
|
|
|
|
async def test_generate_image_variable_is_saved_and_rewritten(tmp_path):
|
|
template_store = TemplateStore(base_dir=tmp_path / "templates")
|
|
generation_store = GenerationStore(
|
|
db_path=tmp_path / "gen.db",
|
|
generated_dir=tmp_path / "generated",
|
|
)
|
|
await generation_store.init()
|
|
llm = AsyncMock(spec=OpenRouterClient)
|
|
llm.chat.return_value = LLMResponse(
|
|
text="# OK", model="m", tokens_in=1, tokens_out=1, cost_usd=0, latency_ms=10,
|
|
)
|
|
renderer = Renderer(
|
|
template_store=template_store,
|
|
generation_store=generation_store,
|
|
llm=llm,
|
|
public_base_url="https://mcp.example.com",
|
|
default_model="m",
|
|
asset_ttl_days=30,
|
|
max_image_size_mb=10,
|
|
)
|
|
fm = TemplateFrontmatter(
|
|
name="report",
|
|
description="x",
|
|
required_variables=[TemplateVariable(name="foto", type="image")],
|
|
)
|
|
await template_store.create(name="report", frontmatter=fm, body="")
|
|
|
|
png = b"\x89PNG\r\n\x1a\n" + b"\x00" * 100
|
|
img_var = {"kind": "image", "data_b64": base64.b64encode(png).decode(), "mime": "image/png"}
|
|
result = await renderer.generate(
|
|
template_name="report",
|
|
content_md="content",
|
|
variables={"foto": img_var},
|
|
instructions=None,
|
|
)
|
|
|
|
assert result.ephemeral_assets_urls
|
|
url = result.ephemeral_assets_urls[0]
|
|
assert url.startswith("https://mcp.example.com/generated/")
|
|
assert url.endswith(".png")
|
|
gen_dir = tmp_path / "generated" / result.generation_id
|
|
assert gen_dir.exists()
|
|
assert any(gen_dir.iterdir())
|
|
|
|
call_kwargs = llm.chat.await_args.kwargs
|
|
assert "https://mcp.example.com/generated/" in call_kwargs["system"]
|
|
|
|
|
|
async def test_image_too_large_raises(env):
|
|
renderer, _ = env
|
|
big = b"x" * (11 * 1024 * 1024) # 11 MB raw
|
|
img_var = {"kind": "image", "data_b64": base64.b64encode(big).decode(), "mime": "image/png"}
|
|
fm = TemplateFrontmatter(
|
|
name="big",
|
|
description="x",
|
|
required_variables=[TemplateVariable(name="foto", type="image")],
|
|
)
|
|
await renderer.template_store.create(name="big", frontmatter=fm, body="{{foto}}")
|
|
with pytest.raises(ImageTooLarge):
|
|
await renderer.generate(
|
|
template_name="big",
|
|
content_md="x",
|
|
variables={"foto": img_var},
|
|
instructions=None,
|
|
)
|
|
|
|
|
|
async def test_invalid_base64_raises(env):
|
|
renderer, _ = env
|
|
fm = TemplateFrontmatter(
|
|
name="bad",
|
|
description="x",
|
|
required_variables=[TemplateVariable(name="foto", type="image")],
|
|
)
|
|
await renderer.template_store.create(name="bad", frontmatter=fm, body="{{foto}}")
|
|
with pytest.raises(InvalidImageEncoding):
|
|
await renderer.generate(
|
|
template_name="bad",
|
|
content_md="x",
|
|
variables={"foto": {"kind": "image", "data_b64": "!!!not-base64!!!", "mime": "image/png"}},
|
|
instructions=None,
|
|
)
|
|
|
|
|
|
async def test_template_asset_paths_are_rewritten(tmp_path):
|
|
template_store = TemplateStore(base_dir=tmp_path / "templates")
|
|
generation_store = GenerationStore(
|
|
db_path=tmp_path / "gen.db",
|
|
generated_dir=tmp_path / "generated",
|
|
)
|
|
await generation_store.init()
|
|
llm = AsyncMock(spec=OpenRouterClient)
|
|
llm.chat.return_value = LLMResponse(
|
|
text="out", model="m", tokens_in=1, tokens_out=1, cost_usd=0, latency_ms=10,
|
|
)
|
|
renderer = Renderer(
|
|
template_store=template_store,
|
|
generation_store=generation_store,
|
|
llm=llm,
|
|
public_base_url="https://mcp.example.com",
|
|
default_model="m",
|
|
asset_ttl_days=30,
|
|
max_image_size_mb=10,
|
|
)
|
|
fm = TemplateFrontmatter(name="brand", description="x")
|
|
assets = [
|
|
{"filename": "logo.png", "data_b64": base64.b64encode(b"\x89PNG").decode(), "mime": "image/png"}
|
|
]
|
|
await template_store.create(
|
|
name="brand",
|
|
frontmatter=fm,
|
|
body="Header  footer",
|
|
assets=assets,
|
|
)
|
|
await renderer.generate(
|
|
template_name="brand",
|
|
content_md="x",
|
|
variables={},
|
|
instructions=None,
|
|
)
|
|
system_prompt = llm.chat.await_args.kwargs["system"]
|
|
assert "https://mcp.example.com/assets/brand/logo.png" in system_prompt
|
|
assert "./assets/logo.png" not in system_prompt
|
|
|
|
|
|
async def test_generate_uses_frontmatter_model_override(tmp_path):
|
|
template_store = TemplateStore(base_dir=tmp_path / "templates")
|
|
generation_store = GenerationStore(
|
|
db_path=tmp_path / "gen.db",
|
|
generated_dir=tmp_path / "generated",
|
|
)
|
|
await generation_store.init()
|
|
llm = AsyncMock(spec=OpenRouterClient)
|
|
llm.chat.return_value = LLMResponse(
|
|
text="out", model="openai/gpt-4o", tokens_in=1, tokens_out=1, cost_usd=0, latency_ms=10,
|
|
)
|
|
renderer = Renderer(
|
|
template_store=template_store,
|
|
generation_store=generation_store,
|
|
llm=llm,
|
|
public_base_url="https://mcp.example.com",
|
|
default_model="anthropic/claude-sonnet-4",
|
|
asset_ttl_days=30,
|
|
max_image_size_mb=10,
|
|
)
|
|
fm = TemplateFrontmatter(name="x", description="y", model="openai/gpt-4o")
|
|
await template_store.create(name="x", frontmatter=fm, body="b")
|
|
await renderer.generate(template_name="x", content_md="c", variables={}, instructions=None)
|
|
assert llm.chat.await_args.kwargs["model"] == "openai/gpt-4o"
|
|
```
|
|
|
|
- [ ] **Step 2: Run to verify failure**
|
|
|
|
```bash
|
|
uv run pytest tests/unit/test_renderer.py -v
|
|
```
|
|
Expected: FAIL.
|
|
|
|
- [ ] **Step 3: Implement renderer**
|
|
|
|
`src/docugen_mcp/renderer.py`:
|
|
```python
|
|
import base64
|
|
import binascii
|
|
import re
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
import aiofiles
|
|
from pydantic import ValidationError
|
|
|
|
from docugen_mcp.generation_store import (
|
|
EphemeralAssetRecord,
|
|
GenerationRecord,
|
|
GenerationStore,
|
|
)
|
|
from docugen_mcp.llm_client import LLMError, OpenRouterClient
|
|
from docugen_mcp.models import (
|
|
GenerationResult,
|
|
ImageVariable,
|
|
TemplateFrontmatter,
|
|
TokenUsage,
|
|
)
|
|
from docugen_mcp.template_store import TemplateStore
|
|
|
|
|
|
class RendererError(Exception):
|
|
pass
|
|
|
|
|
|
class MissingVariables(RendererError):
|
|
def __init__(self, missing: list[str]) -> None:
|
|
super().__init__(f"missing required variables: {missing}")
|
|
self.missing = missing
|
|
|
|
|
|
class InvalidVariableType(RendererError):
|
|
pass
|
|
|
|
|
|
class InvalidImageEncoding(RendererError):
|
|
pass
|
|
|
|
|
|
class ImageTooLarge(RendererError):
|
|
pass
|
|
|
|
|
|
class AssetStorageError(RendererError):
|
|
pass
|
|
|
|
|
|
_ASSET_TEMPLATE_REF_RE = re.compile(r"\.\/assets\/([A-Za-z0-9._-]+)")
|
|
|
|
|
|
class Renderer:
|
|
def __init__(
|
|
self,
|
|
template_store: TemplateStore,
|
|
generation_store: GenerationStore,
|
|
llm: OpenRouterClient,
|
|
public_base_url: str,
|
|
default_model: str,
|
|
asset_ttl_days: int,
|
|
max_image_size_mb: int,
|
|
) -> None:
|
|
self.template_store = template_store
|
|
self.generation_store = generation_store
|
|
self.llm = llm
|
|
self.public_base_url = public_base_url.rstrip("/")
|
|
self.default_model = default_model
|
|
self.asset_ttl_days = asset_ttl_days
|
|
self.max_image_size_bytes = max_image_size_mb * 1024 * 1024
|
|
|
|
async def generate(
|
|
self,
|
|
template_name: str,
|
|
content_md: str,
|
|
variables: dict,
|
|
instructions: str | None,
|
|
) -> GenerationResult:
|
|
loaded = await self.template_store.get(template_name)
|
|
gen_id = str(uuid.uuid4())
|
|
|
|
validated = self._validate_variables(loaded.frontmatter, variables)
|
|
ephemeral_urls, var_strings = await self._materialize_image_variables(
|
|
gen_id, validated
|
|
)
|
|
|
|
system_prompt = self._build_system_prompt(
|
|
loaded.frontmatter, loaded.body, var_strings
|
|
)
|
|
user_prompt = self._build_user_prompt(content_md, var_strings, instructions)
|
|
model = loaded.frontmatter.model or self.default_model
|
|
|
|
try:
|
|
response = await self.llm.chat(model=model, system=system_prompt, user=user_prompt)
|
|
except LLMError as exc:
|
|
await self.generation_store.record_generation(
|
|
GenerationRecord(
|
|
id=gen_id,
|
|
template_name=template_name,
|
|
model=model,
|
|
tokens_in=0,
|
|
tokens_out=0,
|
|
cost_usd=0.0,
|
|
success=False,
|
|
error_msg=f"{type(exc).__name__}: {exc}",
|
|
)
|
|
)
|
|
raise
|
|
|
|
await self.generation_store.record_generation(
|
|
GenerationRecord(
|
|
id=gen_id,
|
|
template_name=template_name,
|
|
model=response.model,
|
|
tokens_in=response.tokens_in,
|
|
tokens_out=response.tokens_out,
|
|
cost_usd=response.cost_usd,
|
|
success=True,
|
|
error_msg=None,
|
|
)
|
|
)
|
|
|
|
expires_at = None
|
|
if ephemeral_urls:
|
|
info = await self.generation_store.get_ephemeral_asset(
|
|
gen_id, Path(ephemeral_urls[0]).name
|
|
)
|
|
expires_at = info.expires_at if info else None
|
|
|
|
return GenerationResult(
|
|
generation_id=gen_id,
|
|
markdown=response.text,
|
|
model=response.model,
|
|
tokens=TokenUsage(input=response.tokens_in, output=response.tokens_out),
|
|
cost_usd=response.cost_usd,
|
|
ephemeral_assets_urls=ephemeral_urls,
|
|
ephemeral_expires_at=expires_at,
|
|
)
|
|
|
|
def _validate_variables(self, fm: TemplateFrontmatter, variables: dict) -> dict:
|
|
missing = [v.name for v in fm.required_variables if v.name not in variables]
|
|
if missing:
|
|
raise MissingVariables(missing)
|
|
|
|
validated: dict = {}
|
|
for var_def in fm.required_variables:
|
|
raw = variables[var_def.name]
|
|
if var_def.type == "string":
|
|
if not isinstance(raw, str):
|
|
raise InvalidVariableType(
|
|
f"{var_def.name}: expected string, got {type(raw).__name__}"
|
|
)
|
|
validated[var_def.name] = raw
|
|
elif var_def.type == "image":
|
|
try:
|
|
img = ImageVariable(**raw) if isinstance(raw, dict) else None
|
|
except ValidationError as exc:
|
|
raise InvalidVariableType(f"{var_def.name}: {exc}") from exc
|
|
if img is None:
|
|
raise InvalidVariableType(
|
|
f"{var_def.name}: expected image object, got {type(raw).__name__}"
|
|
)
|
|
validated[var_def.name] = img
|
|
return validated
|
|
|
|
async def _materialize_image_variables(
|
|
self, gen_id: str, validated: dict
|
|
) -> tuple[list[str], dict[str, str]]:
|
|
urls: list[str] = []
|
|
var_strings: dict[str, str] = {}
|
|
gen_dir = self.generation_store.generated_dir / gen_id
|
|
|
|
for name, value in validated.items():
|
|
if isinstance(value, ImageVariable):
|
|
try:
|
|
raw_bytes = base64.b64decode(value.data_b64, validate=True)
|
|
except (binascii.Error, ValueError) as exc:
|
|
raise InvalidImageEncoding(f"{name}: {exc}") from exc
|
|
if len(raw_bytes) > self.max_image_size_bytes:
|
|
raise ImageTooLarge(
|
|
f"{name}: {len(raw_bytes)} bytes > {self.max_image_size_bytes}"
|
|
)
|
|
ext = {"image/png": "png", "image/jpeg": "jpg", "image/webp": "webp"}[value.mime]
|
|
filename = f"{name}.{ext}"
|
|
file_path = gen_dir / filename
|
|
gen_dir.mkdir(parents=True, exist_ok=True)
|
|
try:
|
|
async with aiofiles.open(file_path, "wb") as f:
|
|
await f.write(raw_bytes)
|
|
except OSError as exc:
|
|
raise AssetStorageError(str(exc)) from exc
|
|
await self.generation_store.register_ephemeral_asset(
|
|
EphemeralAssetRecord(
|
|
generation_id=gen_id,
|
|
var_name=name,
|
|
file_path=str(file_path),
|
|
mime=value.mime,
|
|
ttl_days=self.asset_ttl_days,
|
|
)
|
|
)
|
|
url = f"{self.public_base_url}/generated/{gen_id}/{filename}"
|
|
urls.append(url)
|
|
var_strings[name] = url
|
|
else:
|
|
var_strings[name] = str(value)
|
|
return urls, var_strings
|
|
|
|
def _build_system_prompt(
|
|
self, fm: TemplateFrontmatter, body: str, var_strings: dict[str, str]
|
|
) -> str:
|
|
# Rewrite template-scoped asset paths to absolute URLs
|
|
template_name = fm.name
|
|
|
|
def _replace_asset(match: re.Match) -> str:
|
|
filename = match.group(1)
|
|
return f"{self.public_base_url}/assets/{template_name}/{filename}"
|
|
|
|
resolved = _ASSET_TEMPLATE_REF_RE.sub(_replace_asset, body)
|
|
# Substitute {{var}} placeholders with current values
|
|
for name, value in var_strings.items():
|
|
resolved = resolved.replace(f"{{{{{name}}}}}", value)
|
|
return resolved
|
|
|
|
def _build_user_prompt(
|
|
self, content_md: str, var_strings: dict[str, str], instructions: str | None
|
|
) -> str:
|
|
parts = ["## Raw content", content_md]
|
|
if var_strings:
|
|
parts.append("## Variables")
|
|
for name, value in var_strings.items():
|
|
parts.append(f"- {name}: {value}")
|
|
if instructions:
|
|
parts.append("## Additional instructions")
|
|
parts.append(instructions)
|
|
return "\n\n".join(parts)
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify pass**
|
|
|
|
```bash
|
|
uv run pytest tests/unit/test_renderer.py -v
|
|
```
|
|
Expected: 8 PASSED.
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/docugen_mcp/renderer.py tests/unit/test_renderer.py
|
|
git commit -m "feat: add Renderer orchestrator with var validation and image handling"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 8: HTTP routes (FastAPI sub-app)
|
|
|
|
**Files:**
|
|
- Create: `src/docugen_mcp/http_routes.py`
|
|
- Create: `tests/integration/test_http_routes.py`
|
|
|
|
- [ ] **Step 1: Write failing test**
|
|
|
|
`tests/integration/test_http_routes.py`:
|
|
```python
|
|
import pytest
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
|
|
from docugen_mcp.auth import ApiKeyAuthMiddleware
|
|
from docugen_mcp.generation_store import EphemeralAssetRecord, GenerationStore
|
|
from docugen_mcp.http_routes import build_http_app
|
|
from docugen_mcp.template_store import TemplateStore
|
|
|
|
|
|
@pytest.fixture
|
|
async def client(tmp_path):
|
|
template_store = TemplateStore(base_dir=tmp_path / "templates")
|
|
generation_store = GenerationStore(
|
|
db_path=tmp_path / "gen.db",
|
|
generated_dir=tmp_path / "generated",
|
|
)
|
|
await generation_store.init()
|
|
|
|
app = FastAPI()
|
|
app.add_middleware(ApiKeyAuthMiddleware, api_key="test-key", exempt_paths={"/health"})
|
|
app.mount("/", build_http_app(template_store, generation_store))
|
|
return TestClient(app), template_store, generation_store, tmp_path
|
|
|
|
|
|
def _headers():
|
|
return {"Authorization": "Bearer test-key"}
|
|
|
|
|
|
async def test_health_no_auth(client):
|
|
tc, *_ = client
|
|
r = tc.get("/health")
|
|
assert r.status_code == 200
|
|
assert r.json() == {"status": "ok"}
|
|
|
|
|
|
async def test_assets_serve_existing_file(client):
|
|
tc, template_store, _, tmp_path = client
|
|
from docugen_mcp.models import TemplateFrontmatter
|
|
fm = TemplateFrontmatter(name="brand", description="x")
|
|
import base64
|
|
assets = [{"filename": "logo.png", "data_b64": base64.b64encode(b"\x89PNG").decode(), "mime": "image/png"}]
|
|
await template_store.create(name="brand", frontmatter=fm, body="b", assets=assets)
|
|
|
|
r = tc.get("/assets/brand/logo.png", headers=_headers())
|
|
assert r.status_code == 200
|
|
assert r.content == b"\x89PNG"
|
|
|
|
|
|
async def test_assets_require_auth(client):
|
|
tc, *_ = client
|
|
r = tc.get("/assets/brand/logo.png")
|
|
assert r.status_code == 401
|
|
|
|
|
|
async def test_assets_path_traversal_rejected(client):
|
|
tc, *_ = client
|
|
r = tc.get("/assets/brand/..%2Fevil.png", headers=_headers())
|
|
assert r.status_code == 400
|
|
|
|
|
|
async def test_assets_missing_returns_404(client):
|
|
tc, *_ = client
|
|
r = tc.get("/assets/brand/missing.png", headers=_headers())
|
|
assert r.status_code == 404
|
|
|
|
|
|
async def test_generated_fresh_returns_file(client):
|
|
tc, _, generation_store, tmp_path = client
|
|
gen_dir = tmp_path / "generated" / "g-1"
|
|
gen_dir.mkdir(parents=True)
|
|
(gen_dir / "foto.png").write_bytes(b"image-bytes")
|
|
await generation_store.register_ephemeral_asset(
|
|
EphemeralAssetRecord(
|
|
generation_id="g-1",
|
|
var_name="foto",
|
|
file_path=str(gen_dir / "foto.png"),
|
|
mime="image/png",
|
|
ttl_days=30,
|
|
)
|
|
)
|
|
r = tc.get("/generated/g-1/foto.png", headers=_headers())
|
|
assert r.status_code == 200
|
|
assert r.content == b"image-bytes"
|
|
|
|
|
|
async def test_generated_expired_returns_410(client):
|
|
tc, _, generation_store, tmp_path = client
|
|
gen_dir = tmp_path / "generated" / "g-old"
|
|
gen_dir.mkdir(parents=True)
|
|
(gen_dir / "foto.png").write_bytes(b"x")
|
|
await generation_store.register_ephemeral_asset(
|
|
EphemeralAssetRecord(
|
|
generation_id="g-old",
|
|
var_name="foto",
|
|
file_path=str(gen_dir / "foto.png"),
|
|
mime="image/png",
|
|
ttl_days=-1,
|
|
)
|
|
)
|
|
r = tc.get("/generated/g-old/foto.png", headers=_headers())
|
|
assert r.status_code == 410
|
|
|
|
|
|
async def test_generated_unknown_returns_410(client):
|
|
tc, *_ = client
|
|
r = tc.get("/generated/unknown-id/file.png", headers=_headers())
|
|
assert r.status_code == 410
|
|
```
|
|
|
|
- [ ] **Step 2: Run to verify failure**
|
|
|
|
```bash
|
|
uv run pytest tests/integration/test_http_routes.py -v
|
|
```
|
|
Expected: FAIL.
|
|
|
|
- [ ] **Step 3: Implement HTTP routes**
|
|
|
|
`src/docugen_mcp/http_routes.py`:
|
|
```python
|
|
import re
|
|
from pathlib import Path
|
|
|
|
from fastapi import FastAPI, HTTPException
|
|
from fastapi.responses import FileResponse, JSONResponse
|
|
|
|
from docugen_mcp.generation_store import GenerationStore
|
|
from docugen_mcp.template_store import InvalidTemplateName, TemplateStore
|
|
|
|
|
|
_SAFE_SEGMENT_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*$")
|
|
|
|
|
|
def build_http_app(
|
|
template_store: TemplateStore,
|
|
generation_store: GenerationStore,
|
|
) -> FastAPI:
|
|
app = FastAPI()
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
return {"status": "ok"}
|
|
|
|
@app.get("/assets/{template_name}/{filename}")
|
|
async def get_template_asset(template_name: str, filename: str):
|
|
if not _SAFE_SEGMENT_RE.match(template_name) or not _SAFE_SEGMENT_RE.match(filename):
|
|
raise HTTPException(status_code=400, detail="invalid path")
|
|
try:
|
|
path = template_store.asset_path(template_name, filename)
|
|
except (ValueError, InvalidTemplateName):
|
|
raise HTTPException(status_code=400, detail="invalid path")
|
|
if not path.exists():
|
|
raise HTTPException(status_code=404, detail="not found")
|
|
return FileResponse(path)
|
|
|
|
@app.get("/generated/{gen_id}/{filename}")
|
|
async def get_generated_asset(gen_id: str, filename: str):
|
|
if not _SAFE_SEGMENT_RE.match(gen_id) or not _SAFE_SEGMENT_RE.match(filename):
|
|
raise HTTPException(status_code=400, detail="invalid path")
|
|
info = await generation_store.get_ephemeral_asset(gen_id, filename)
|
|
if info is None or info.is_expired or not Path(info.file_path).exists():
|
|
return JSONResponse(
|
|
status_code=410,
|
|
content={"error": "gone", "reason": "ephemeral asset expired or not found"},
|
|
)
|
|
return FileResponse(info.file_path, media_type=info.mime)
|
|
|
|
return app
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify pass**
|
|
|
|
```bash
|
|
uv run pytest tests/integration/test_http_routes.py -v
|
|
```
|
|
Expected: 8 PASSED.
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/docugen_mcp/http_routes.py tests/integration/test_http_routes.py
|
|
git commit -m "feat: add FastAPI sub-app for asset serving and health"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 9: MCP tools (FastMCP)
|
|
|
|
**Files:**
|
|
- Create: `src/docugen_mcp/mcp_tools.py`
|
|
- Create: `tests/integration/test_mcp_tools.py`
|
|
|
|
- [ ] **Step 1: Write failing test**
|
|
|
|
`tests/integration/test_mcp_tools.py`:
|
|
```python
|
|
from unittest.mock import AsyncMock
|
|
|
|
import pytest
|
|
|
|
from docugen_mcp.llm_client import LLMResponse
|
|
from docugen_mcp.mcp_tools import build_mcp_server
|
|
from docugen_mcp.template_store import TemplateStore
|
|
from docugen_mcp.generation_store import GenerationStore
|
|
|
|
|
|
@pytest.fixture
|
|
async def mcp_env(tmp_path):
|
|
template_store = TemplateStore(base_dir=tmp_path / "templates")
|
|
generation_store = GenerationStore(
|
|
db_path=tmp_path / "gen.db",
|
|
generated_dir=tmp_path / "generated",
|
|
)
|
|
await generation_store.init()
|
|
llm = AsyncMock()
|
|
llm.chat.return_value = LLMResponse(
|
|
text="# Out", model="m", tokens_in=5, tokens_out=10, cost_usd=0.01, latency_ms=50,
|
|
)
|
|
from docugen_mcp.renderer import Renderer
|
|
renderer = Renderer(
|
|
template_store=template_store,
|
|
generation_store=generation_store,
|
|
llm=llm,
|
|
public_base_url="https://mcp.example.com",
|
|
default_model="m",
|
|
asset_ttl_days=30,
|
|
max_image_size_mb=10,
|
|
)
|
|
mcp = build_mcp_server(template_store, renderer)
|
|
return mcp, template_store
|
|
|
|
|
|
async def test_template_create_and_get(mcp_env):
|
|
mcp, _ = mcp_env
|
|
await mcp.tools["template_create"](
|
|
name="demo",
|
|
frontmatter={"name": "demo", "description": "d"},
|
|
body="body",
|
|
assets=None,
|
|
)
|
|
got = await mcp.tools["template_get"](name="demo")
|
|
assert got["name"] == "demo"
|
|
assert got["body"] == "body"
|
|
|
|
|
|
async def test_template_create_duplicate_errors(mcp_env):
|
|
mcp, _ = mcp_env
|
|
await mcp.tools["template_create"](
|
|
name="demo", frontmatter={"name": "demo", "description": "x"}, body="b", assets=None,
|
|
)
|
|
with pytest.raises(Exception):
|
|
await mcp.tools["template_create"](
|
|
name="demo", frontmatter={"name": "demo", "description": "x"}, body="b", assets=None,
|
|
)
|
|
|
|
|
|
async def test_template_list_returns_summaries(mcp_env):
|
|
mcp, _ = mcp_env
|
|
await mcp.tools["template_create"](
|
|
name="a", frontmatter={"name": "a", "description": "A"}, body="b", assets=None,
|
|
)
|
|
await mcp.tools["template_create"](
|
|
name="b", frontmatter={"name": "b", "description": "B"}, body="b", assets=None,
|
|
)
|
|
result = await mcp.tools["template_list"]()
|
|
names = sorted(t["name"] for t in result)
|
|
assert names == ["a", "b"]
|
|
|
|
|
|
async def test_template_update(mcp_env):
|
|
mcp, _ = mcp_env
|
|
await mcp.tools["template_create"](
|
|
name="u", frontmatter={"name": "u", "description": "d"}, body="old", assets=None,
|
|
)
|
|
await mcp.tools["template_update"](
|
|
name="u", frontmatter={"name": "u", "description": "new"}, body="new", assets=None,
|
|
)
|
|
got = await mcp.tools["template_get"](name="u")
|
|
assert got["body"] == "new"
|
|
assert got["frontmatter"]["description"] == "new"
|
|
|
|
|
|
async def test_template_delete(mcp_env):
|
|
mcp, _ = mcp_env
|
|
await mcp.tools["template_create"](
|
|
name="d", frontmatter={"name": "d", "description": "x"}, body="b", assets=None,
|
|
)
|
|
await mcp.tools["template_delete"](name="d")
|
|
with pytest.raises(Exception):
|
|
await mcp.tools["template_get"](name="d")
|
|
|
|
|
|
async def test_document_generate_happy_path(mcp_env):
|
|
mcp, _ = mcp_env
|
|
await mcp.tools["template_create"](
|
|
name="g",
|
|
frontmatter={
|
|
"name": "g",
|
|
"description": "x",
|
|
"required_variables": [{"name": "cliente", "type": "string"}],
|
|
},
|
|
body="Cliente: {{cliente}}",
|
|
assets=None,
|
|
)
|
|
result = await mcp.tools["document_generate"](
|
|
template_name="g",
|
|
content_md="# Ordine",
|
|
variables={"cliente": "ACME"},
|
|
instructions=None,
|
|
)
|
|
assert result["markdown"] == "# Out"
|
|
assert result["model"] == "m"
|
|
assert result["tokens"]["input"] == 5
|
|
assert "generation_id" in result
|
|
```
|
|
|
|
- [ ] **Step 2: Run to verify failure**
|
|
|
|
```bash
|
|
uv run pytest tests/integration/test_mcp_tools.py -v
|
|
```
|
|
Expected: FAIL.
|
|
|
|
- [ ] **Step 3: Implement MCP tools**
|
|
|
|
`src/docugen_mcp/mcp_tools.py`:
|
|
```python
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
from docugen_mcp.models import TemplateFrontmatter
|
|
from docugen_mcp.renderer import Renderer
|
|
from docugen_mcp.template_store import TemplateStore
|
|
|
|
|
|
def build_mcp_server(template_store: TemplateStore, renderer: Renderer) -> FastMCP:
|
|
mcp = FastMCP("docugen-mcp")
|
|
|
|
@mcp.tool()
|
|
async def template_create(
|
|
name: str,
|
|
frontmatter: dict,
|
|
body: str,
|
|
assets: list[dict] | None = None,
|
|
) -> dict:
|
|
"""Create a new template. Fails if the name already exists."""
|
|
fm = TemplateFrontmatter(**frontmatter)
|
|
await template_store.create(name=name, frontmatter=fm, body=body, assets=assets)
|
|
return {"name": name, "status": "created"}
|
|
|
|
@mcp.tool()
|
|
async def template_update(
|
|
name: str,
|
|
frontmatter: dict,
|
|
body: str,
|
|
assets: list[dict] | None = None,
|
|
) -> dict:
|
|
"""Update an existing template. Replaces frontmatter, body, and (if given) assets."""
|
|
fm = TemplateFrontmatter(**frontmatter)
|
|
await template_store.update(name=name, frontmatter=fm, body=body, assets=assets)
|
|
return {"name": name, "status": "updated"}
|
|
|
|
@mcp.tool()
|
|
async def template_delete(name: str) -> dict:
|
|
"""Delete a template and all its assets."""
|
|
await template_store.delete(name)
|
|
return {"name": name, "status": "deleted"}
|
|
|
|
@mcp.tool()
|
|
async def template_get(name: str) -> dict:
|
|
"""Return a template's frontmatter, body, and asset list."""
|
|
loaded = await template_store.get(name)
|
|
return {
|
|
"name": loaded.frontmatter.name,
|
|
"frontmatter": loaded.frontmatter.model_dump(exclude_none=True),
|
|
"body": loaded.body,
|
|
"assets": [a.model_dump() for a in loaded.assets],
|
|
"updated_at": loaded.updated_at.isoformat(),
|
|
}
|
|
|
|
@mcp.tool()
|
|
async def template_list() -> list[dict]:
|
|
"""List all templates with name, description, and updated_at."""
|
|
summaries = await template_store.list()
|
|
return [s.model_dump(mode="json") for s in summaries]
|
|
|
|
@mcp.tool()
|
|
async def document_generate(
|
|
template_name: str,
|
|
content_md: str,
|
|
variables: dict,
|
|
instructions: str | None = None,
|
|
) -> dict:
|
|
"""Generate a Markdown document from a template, content, and variables."""
|
|
result = await renderer.generate(
|
|
template_name=template_name,
|
|
content_md=content_md,
|
|
variables=variables,
|
|
instructions=instructions,
|
|
)
|
|
return result.model_dump(mode="json")
|
|
|
|
# Expose callables directly for test access (FastMCP test helper)
|
|
mcp.tools = {
|
|
"template_create": template_create,
|
|
"template_update": template_update,
|
|
"template_delete": template_delete,
|
|
"template_get": template_get,
|
|
"template_list": template_list,
|
|
"document_generate": document_generate,
|
|
}
|
|
|
|
return mcp
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify pass**
|
|
|
|
```bash
|
|
uv run pytest tests/integration/test_mcp_tools.py -v
|
|
```
|
|
Expected: 6 PASSED.
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/docugen_mcp/mcp_tools.py tests/integration/test_mcp_tools.py
|
|
git commit -m "feat: add FastMCP tool definitions (6 tools)"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 10: Main bootstrap + end-to-end flow test
|
|
|
|
**Files:**
|
|
- Create: `src/docugen_mcp/main.py`
|
|
- Create: `tests/integration/test_generate_flow.py`
|
|
|
|
- [ ] **Step 1: Write failing integration test**
|
|
|
|
`tests/integration/test_generate_flow.py`:
|
|
```python
|
|
import base64
|
|
import pytest
|
|
import respx
|
|
import httpx
|
|
|
|
from docugen_mcp.main import build_app
|
|
|
|
|
|
def _openrouter_success(text: str = "# Generated"):
|
|
return httpx.Response(200, json={
|
|
"id": "g",
|
|
"choices": [{"message": {"role": "assistant", "content": text}}],
|
|
"model": "anthropic/claude-sonnet-4",
|
|
"usage": {"prompt_tokens": 50, "completion_tokens": 100, "total_cost": 0.02},
|
|
})
|
|
|
|
|
|
@pytest.fixture
|
|
async def app_env(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("API_KEY", "test-api-key")
|
|
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-test")
|
|
monkeypatch.setenv("PUBLIC_BASE_URL", "https://mcp.test.com")
|
|
monkeypatch.setenv("DATA_DIR", str(tmp_path))
|
|
app = await build_app()
|
|
return app, tmp_path
|
|
|
|
|
|
@respx.mock
|
|
async def test_full_generation_flow_records_and_serves_asset(app_env):
|
|
app, tmp_path = app_env
|
|
respx.post("https://openrouter.ai/api/v1/chat/completions").mock(
|
|
return_value=_openrouter_success("# Done")
|
|
)
|
|
|
|
# Find the renderer and template store via app state
|
|
template_store = app.state.template_store
|
|
renderer = app.state.renderer
|
|
generation_store = app.state.generation_store
|
|
|
|
from docugen_mcp.models import TemplateFrontmatter, TemplateVariable
|
|
fm = TemplateFrontmatter(
|
|
name="report",
|
|
description="x",
|
|
required_variables=[TemplateVariable(name="foto", type="image")],
|
|
)
|
|
await template_store.create(name="report", frontmatter=fm, body="")
|
|
|
|
png = b"\x89PNG\r\n\x1a\n" + b"\x00" * 50
|
|
img_var = {"kind": "image", "data_b64": base64.b64encode(png).decode(), "mime": "image/png"}
|
|
|
|
result = await renderer.generate(
|
|
template_name="report",
|
|
content_md="content",
|
|
variables={"foto": img_var},
|
|
instructions=None,
|
|
)
|
|
|
|
assert result.markdown == "# Done"
|
|
stats = await generation_store.get_stats()
|
|
assert stats["success"] == 1
|
|
|
|
# Asset exists on disk
|
|
gen_files = list((tmp_path / "generated" / result.generation_id).iterdir())
|
|
assert len(gen_files) == 1
|
|
```
|
|
|
|
- [ ] **Step 2: Run to verify failure**
|
|
|
|
```bash
|
|
uv run pytest tests/integration/test_generate_flow.py -v
|
|
```
|
|
Expected: FAIL — `docugen_mcp.main` does not exist.
|
|
|
|
- [ ] **Step 3: Implement main bootstrap**
|
|
|
|
Architecturally, FastMCP exposes a Starlette/ASGI app via `streamable_http_app()` (check exact name in installed SDK — see Notes). We mount it at `/mcp` inside a root FastAPI app that also hosts the health + asset routes. This avoids root-path conflicts and gives Claude Code the URL `https://host/mcp`.
|
|
|
|
`src/docugen_mcp/main.py`:
|
|
```python
|
|
import asyncio
|
|
import logging
|
|
|
|
import uvicorn
|
|
from fastapi import FastAPI
|
|
|
|
from docugen_mcp.auth import ApiKeyAuthMiddleware
|
|
from docugen_mcp.config import Settings
|
|
from docugen_mcp.generation_store import GenerationStore
|
|
from docugen_mcp.http_routes import build_http_app
|
|
from docugen_mcp.llm_client import OpenRouterClient
|
|
from docugen_mcp.mcp_tools import build_mcp_server
|
|
from docugen_mcp.renderer import Renderer
|
|
from docugen_mcp.template_store import TemplateStore
|
|
|
|
|
|
logger = logging.getLogger("docugen_mcp")
|
|
|
|
|
|
async def build_app(settings: Settings | None = None) -> FastAPI:
|
|
settings = settings or Settings()
|
|
settings.data_dir.mkdir(parents=True, exist_ok=True)
|
|
templates_dir = settings.data_dir / "templates"
|
|
generated_dir = settings.data_dir / "generated"
|
|
db_path = settings.data_dir / "docugen_mcp.db"
|
|
|
|
template_store = TemplateStore(base_dir=templates_dir)
|
|
generation_store = GenerationStore(db_path=db_path, generated_dir=generated_dir)
|
|
await generation_store.init()
|
|
|
|
llm = OpenRouterClient(
|
|
api_key=settings.openrouter_api_key,
|
|
base_url=settings.openrouter_base_url,
|
|
timeout=settings.llm_timeout_seconds,
|
|
)
|
|
renderer = Renderer(
|
|
template_store=template_store,
|
|
generation_store=generation_store,
|
|
llm=llm,
|
|
public_base_url=settings.public_base_url,
|
|
default_model=settings.llm_model_default,
|
|
asset_ttl_days=settings.asset_ttl_days,
|
|
max_image_size_mb=settings.max_image_size_mb,
|
|
)
|
|
|
|
mcp = build_mcp_server(template_store, renderer)
|
|
|
|
# Root FastAPI = HTTP routes (health, assets, generated) + MCP mounted on /mcp
|
|
app = build_http_app(template_store, generation_store)
|
|
app.mount("/mcp", mcp.streamable_http_app())
|
|
|
|
app.add_middleware(
|
|
ApiKeyAuthMiddleware,
|
|
api_key=settings.api_key,
|
|
exempt_paths={"/health"},
|
|
)
|
|
|
|
# Stash for tests and cleanup task
|
|
app.state.settings = settings
|
|
app.state.template_store = template_store
|
|
app.state.generation_store = generation_store
|
|
app.state.renderer = renderer
|
|
app.state.llm = llm
|
|
|
|
@app.on_event("startup")
|
|
async def start_cleanup_task():
|
|
app.state._cleanup_task = asyncio.create_task(
|
|
_periodic_cleanup(generation_store, interval_seconds=24 * 3600)
|
|
)
|
|
|
|
@app.on_event("shutdown")
|
|
async def stop_cleanup_task():
|
|
task = app.state.__dict__.get("_cleanup_task")
|
|
if task:
|
|
task.cancel()
|
|
|
|
return app
|
|
|
|
|
|
async def _periodic_cleanup(generation_store: GenerationStore, interval_seconds: int) -> None:
|
|
while True:
|
|
try:
|
|
removed = await generation_store.cleanup_expired()
|
|
if removed:
|
|
logger.info("cleanup_expired removed %d assets", removed)
|
|
except Exception as exc: # noqa: BLE001
|
|
logger.error("cleanup task error: %s", exc)
|
|
await asyncio.sleep(interval_seconds)
|
|
|
|
|
|
def run() -> None:
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
|
|
uvicorn.run(
|
|
"docugen_mcp.main:asgi_app",
|
|
host="0.0.0.0",
|
|
port=8000,
|
|
factory=True,
|
|
)
|
|
|
|
|
|
async def asgi_app() -> FastAPI:
|
|
return await build_app()
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify pass**
|
|
|
|
```bash
|
|
uv run pytest tests/integration/test_generate_flow.py -v
|
|
```
|
|
Expected: 1 PASSED.
|
|
|
|
Note: if `mcp.streamable_http_app()` method name differs in the installed `mcp` SDK version, replace with the SDK-specific factory (check `FastMCP` class: likely `http_app()`, `as_asgi()`, or `create_app()`). Adjust accordingly, keeping the mount pattern intact.
|
|
|
|
- [ ] **Step 5: Run full test suite**
|
|
|
|
```bash
|
|
uv run pytest -v
|
|
```
|
|
Expected: all tests pass. If any `mcp.streamable_http_app()`-related failure, fix per note above and rerun.
|
|
|
|
- [ ] **Step 6: Commit**
|
|
|
|
```bash
|
|
git add src/docugen_mcp/main.py tests/integration/test_generate_flow.py
|
|
git commit -m "feat: add bootstrap (FastMCP + FastAPI mount + cleanup task)"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 11: Docker packaging
|
|
|
|
**Files:**
|
|
- Create: `Dockerfile`
|
|
- Create: `docker-compose.yml`
|
|
- Modify: `README.md` (expand with Docker + client config sections)
|
|
|
|
- [ ] **Step 1: Create Dockerfile**
|
|
|
|
```dockerfile
|
|
FROM python:3.11-slim AS base
|
|
|
|
ENV PYTHONDONTWRITEBYTECODE=1 \
|
|
PYTHONUNBUFFERED=1 \
|
|
PIP_DISABLE_PIP_VERSION_CHECK=1
|
|
|
|
RUN apt-get update && apt-get install -y --no-install-recommends curl \
|
|
&& rm -rf /var/lib/apt/lists/* \
|
|
&& curl -LsSf https://astral.sh/uv/install.sh | sh \
|
|
&& mv /root/.local/bin/uv /usr/local/bin/uv
|
|
|
|
WORKDIR /app
|
|
|
|
COPY pyproject.toml uv.lock README.md ./
|
|
COPY src/ ./src/
|
|
|
|
RUN uv sync --frozen --no-dev
|
|
|
|
RUN useradd --system --no-create-home --uid 1000 mcpuser \
|
|
&& mkdir -p /data \
|
|
&& chown -R mcpuser:mcpuser /app /data
|
|
|
|
USER mcpuser
|
|
|
|
ENV DATA_DIR=/data
|
|
|
|
EXPOSE 8000
|
|
|
|
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
|
|
CMD curl -f http://localhost:8000/health || exit 1
|
|
|
|
CMD ["uv", "run", "docugen-mcp"]
|
|
```
|
|
|
|
- [ ] **Step 2: Create `docker-compose.yml`**
|
|
|
|
```yaml
|
|
services:
|
|
docugen-mcp:
|
|
build: .
|
|
container_name: docugen-mcp
|
|
restart: unless-stopped
|
|
ports:
|
|
- "8000:8000"
|
|
env_file:
|
|
- .env
|
|
volumes:
|
|
- ./data:/data
|
|
healthcheck:
|
|
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
|
|
interval: 30s
|
|
timeout: 5s
|
|
retries: 3
|
|
```
|
|
|
|
- [ ] **Step 3: Expand `README.md`**
|
|
|
|
```markdown
|
|
# docugen-mcp
|
|
|
|
MCP server for generating Markdown documents from templates via OpenRouter.
|
|
|
|
See design spec: `../2026-04-21-docugen-mcp-design.md`.
|
|
|
|
## Setup (local dev)
|
|
|
|
```bash
|
|
uv sync --all-extras
|
|
cp .env.example .env # fill API_KEY, OPENROUTER_API_KEY, PUBLIC_BASE_URL
|
|
uv run docugen-mcp
|
|
```
|
|
|
|
Server listens on `http://localhost:8000`. MCP endpoint: `/mcp`.
|
|
|
|
## Test
|
|
|
|
```bash
|
|
uv run pytest
|
|
```
|
|
|
|
## Docker
|
|
|
|
Build and run:
|
|
|
|
```bash
|
|
docker compose up --build -d
|
|
```
|
|
|
|
Mount `./data` on the host so templates, SQLite DB, and generated assets persist across restarts.
|
|
|
|
## Reverse proxy / TLS
|
|
|
|
The container listens on plain HTTP. Put it behind Nginx/Traefik/Caddy for TLS termination and public exposure.
|
|
|
|
Set `PUBLIC_BASE_URL` in `.env` to the external URL (e.g. `https://mcp.example.com`). This URL is used to build asset links in generated Markdown.
|
|
|
|
## Client configuration
|
|
|
|
### Claude Code
|
|
|
|
```bash
|
|
claude mcp add --transport http docugen-mcp https://mcp.example.com/mcp \
|
|
--header "Authorization: Bearer <API_KEY>"
|
|
```
|
|
|
|
### Claude Desktop
|
|
|
|
Add to `claude_desktop_config.json`:
|
|
|
|
```json
|
|
{
|
|
"mcpServers": {
|
|
"docugen-mcp": {
|
|
"transport": "http",
|
|
"url": "https://mcp.example.com/mcp",
|
|
"headers": {
|
|
"Authorization": "Bearer <API_KEY>"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
## MCP tools exposed
|
|
|
|
| Tool | Purpose |
|
|
|------|---------|
|
|
| `template_create` | Create template (frontmatter + body + optional assets) |
|
|
| `template_update` | Update an existing template |
|
|
| `template_delete` | Remove a template |
|
|
| `template_list` | List templates (name, description, updated_at) |
|
|
| `template_get` | Load a template's frontmatter, body, asset list |
|
|
| `document_generate` | Generate Markdown from a template + content + variables |
|
|
|
|
## Environment variables
|
|
|
|
| Variable | Default | Description |
|
|
|----------|---------|-------------|
|
|
| `API_KEY` | *(required)* | Bearer token for MCP clients |
|
|
| `OPENROUTER_API_KEY` | *(required)* | OpenRouter API key |
|
|
| `OPENROUTER_BASE_URL` | `https://openrouter.ai/api/v1` | OpenRouter endpoint |
|
|
| `LLM_MODEL_DEFAULT` | `anthropic/claude-sonnet-4` | Model used when template has no override |
|
|
| `PUBLIC_BASE_URL` | *(required)* | Public URL of this server (for asset links) |
|
|
| `DATA_DIR` | `/data` | Persistence directory |
|
|
| `ASSET_TTL_DAYS` | `30` | Retention for ephemeral generated assets |
|
|
| `MAX_IMAGE_SIZE_MB` | `10` | Maximum size for inline images passed as variables |
|
|
| `LLM_TIMEOUT_SECONDS` | `60` | OpenRouter request timeout |
|
|
```
|
|
|
|
- [ ] **Step 4: Verify Docker build**
|
|
|
|
```bash
|
|
docker compose build
|
|
```
|
|
Expected: build completes without errors.
|
|
|
|
- [ ] **Step 5: Smoke-test the container**
|
|
|
|
Create a minimal `.env` for testing:
|
|
```bash
|
|
cat > .env <<EOF
|
|
API_KEY=local-test-key
|
|
OPENROUTER_API_KEY=sk-or-dummy
|
|
PUBLIC_BASE_URL=http://localhost:8000
|
|
DATA_DIR=/data
|
|
EOF
|
|
|
|
docker compose up -d
|
|
sleep 3
|
|
curl -f http://localhost:8000/health
|
|
# Expected: {"status":"ok"}
|
|
docker compose down
|
|
```
|
|
|
|
- [ ] **Step 6: Commit**
|
|
|
|
```bash
|
|
git add Dockerfile docker-compose.yml README.md
|
|
git commit -m "feat: add Dockerfile, docker-compose, and expanded README"
|
|
```
|
|
|
|
---
|
|
|
|
## Final verification
|
|
|
|
- [ ] Run the full suite one more time:
|
|
|
|
```bash
|
|
uv run pytest --cov=src/docugen_mcp --cov-report=term-missing
|
|
```
|
|
Expected: all tests pass; coverage ≥ 85% on logic modules (template_store, renderer, generation_store, llm_client, auth).
|
|
|
|
- [ ] Tag release candidate:
|
|
|
|
```bash
|
|
git tag v0.1.0
|
|
```
|
|
|
|
---
|
|
|
|
## Notes for the implementer
|
|
|
|
- **FastMCP API check:** the exact method to obtain the ASGI app from `FastMCP` depends on the installed `mcp` SDK version. In Task 10 Step 3 the code uses `mcp.streamable_http_app()`. If that method name is wrong on the installed version, check the `FastMCP` class: common alternatives are `http_app()`, `sse_app()`, `create_app()`. Adjust and re-run `test_generate_flow.py` until it passes. The structural pattern (FastMCP ASGI root + FastAPI mount on `/`) stays valid.
|
|
- **Clock-dependent tests:** TTL tests use `ttl_days=-1` to force expiration immediately; avoid sleeping in tests.
|
|
- **Do not commit `.env`:** it's in `.gitignore`. `.env.example` is the template.
|
|
- **Image max size limit is pre-decode:** the 10 MB threshold checks the decoded bytes, not the base64 string. Test in Task 7 uses `11 * 1024 * 1024` raw bytes to trigger it.
|
|
- **SQLite cleanup:** the TTL task runs on the same event loop as the server. For heavy traffic, consider a separate worker.
|
|
- **`template_update` asset semantics:** when `assets` is passed (even empty list `[]`), the directory is wiped and replaced. Pass `None` (or omit) to leave assets untouched. The test suite covers the explicit replacement case but not the implicit preservation — add a follow-up test if this becomes a concern.
|