feat(mcp-docugen): Task 4-6 template_store, llm_client, generation_store

- TemplateStore: CRUD filesystem + asset dir, frontmatter YAML roundtrip,
  path traversal rejection
- OpenRouterClient: async httpx with exponential-backoff retry (5xx, 429,
  timeout), no retry on 4xx, usage/cost parsing
- GenerationStore: SQLite via aiosqlite with generations + ephemeral_assets
  schema, TTL cleanup, aggregate stats

Root pyproject updated with respx + pytest-cov dev deps.
19 + 11 + 9 + 6 = 45 test totali, tutti passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-21 12:21:43 +02:00
parent d5c645bf17
commit 9e80a20063
8 changed files with 963 additions and 0 deletions
@@ -0,0 +1,195 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
import aiosqlite
# Idempotent SQL schema applied by GenerationStore.init() via executescript().
# All timestamp columns (timestamp, created_at, expires_at) are integer
# milliseconds since the Unix epoch (see _now_ms); `success` is stored as 0/1.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS generations (
id TEXT PRIMARY KEY,
timestamp INTEGER NOT NULL,
template_name TEXT NOT NULL,
model TEXT NOT NULL,
tokens_in INTEGER NOT NULL,
tokens_out INTEGER NOT NULL,
cost_usd REAL NOT NULL,
success INTEGER NOT NULL,
error_msg TEXT
);
CREATE INDEX IF NOT EXISTS idx_generations_timestamp ON generations(timestamp);
CREATE TABLE IF NOT EXISTS ephemeral_assets (
generation_id TEXT NOT NULL,
var_name TEXT NOT NULL,
file_path TEXT NOT NULL,
mime TEXT NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL,
PRIMARY KEY (generation_id, var_name)
);
CREATE INDEX IF NOT EXISTS idx_assets_expires ON ephemeral_assets(expires_at);
"""
@dataclass
class GenerationRecord:
    """One completed (or failed) generation, as persisted to `generations`."""

    id: str  # primary key, supplied by the caller
    template_name: str
    model: str
    tokens_in: int
    tokens_out: int
    cost_usd: float
    success: bool  # stored as 0/1 in SQLite
    error_msg: str | None  # populated only on failure
@dataclass
class EphemeralAssetRecord:
    """Input payload for GenerationStore.register_ephemeral_asset()."""

    generation_id: str
    var_name: str
    file_path: str
    mime: str
    ttl_days: int  # converted to an absolute expires_at (ms epoch) on insert
@dataclass
class EphemeralAssetInfo:
    """Asset row as returned by get_ephemeral_asset(), timestamps decoded."""

    generation_id: str
    var_name: str
    file_path: str
    mime: str
    created_at: datetime  # UTC, decoded from the ms-epoch column
    expires_at: datetime  # UTC
    is_expired: bool  # computed against "now" at read time
def _now_ms() -> int:
return int(datetime.now(tz=timezone.utc).timestamp() * 1000)
class GenerationStore:
    """SQLite-backed store for generation audit records and ephemeral assets.

    Each operation opens a short-lived aiosqlite connection; ``init`` must
    be awaited once to create the schema before any other method is used.
    All timestamps are integer milliseconds since the Unix epoch (UTC).
    """

    def __init__(self, db_path: Path, generated_dir: Path) -> None:
        self.db_path = Path(db_path)
        self.generated_dir = Path(generated_dir)
        # Asset files live under generated_dir/<generation_id>/; create the
        # root up front so writers can rely on it.
        self.generated_dir.mkdir(parents=True, exist_ok=True)

    async def init(self) -> None:
        """Create tables and indexes if missing (idempotent)."""
        async with aiosqlite.connect(self.db_path) as db:
            await db.executescript(_SCHEMA)
            await db.commit()

    async def record_generation(self, record: GenerationRecord) -> None:
        """Insert one generation row, stamped with the current time."""
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                """
                INSERT INTO generations
                (id, timestamp, template_name, model, tokens_in, tokens_out,
                 cost_usd, success, error_msg)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    record.id,
                    _now_ms(),
                    record.template_name,
                    record.model,
                    record.tokens_in,
                    record.tokens_out,
                    record.cost_usd,
                    1 if record.success else 0,  # SQLite has no bool type
                    record.error_msg,
                ),
            )
            await db.commit()

    async def register_ephemeral_asset(
        self, record: EphemeralAssetRecord
    ) -> None:
        """Upsert an asset row, deriving expires_at from ttl_days."""
        now = _now_ms()
        expires = now + int(record.ttl_days * 24 * 3600 * 1000)
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                """
                INSERT OR REPLACE INTO ephemeral_assets
                (generation_id, var_name, file_path, mime, created_at, expires_at)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (
                    record.generation_id,
                    record.var_name,
                    record.file_path,
                    record.mime,
                    now,
                    expires,
                ),
            )
            await db.commit()

    async def get_ephemeral_asset(
        self, generation_id: str, filename: str
    ) -> EphemeralAssetInfo | None:
        """Look up an asset by generation id and file *basename*.

        The table keys on (generation_id, var_name) while callers address
        assets by filename, so the generation's rows are fetched and matched
        on ``Path(file_path).name`` in Python.  Returns None on no match.
        """
        async with aiosqlite.connect(self.db_path) as db:
            cursor = await db.execute(
                """
                SELECT generation_id, var_name, file_path, mime, created_at, expires_at
                FROM ephemeral_assets
                WHERE generation_id = ?
                """,
                (generation_id,),
            )
            rows = await cursor.fetchall()
        for row in rows:
            if Path(row[2]).name != filename:
                continue
            created_at = datetime.fromtimestamp(row[4] / 1000, tz=timezone.utc)
            expires_at = datetime.fromtimestamp(row[5] / 1000, tz=timezone.utc)
            return EphemeralAssetInfo(
                generation_id=row[0],
                var_name=row[1],
                file_path=row[2],
                mime=row[3],
                created_at=created_at,
                expires_at=expires_at,
                is_expired=expires_at < datetime.now(tz=timezone.utc),
            )
        return None

    async def cleanup_expired(self) -> int:
        """Delete expired asset files and their DB rows; return rows purged.

        File deletion is best-effort: a row is still purged (and counted)
        even when its file cannot be removed.  Fixes vs. the original:
        single pass instead of iterating the result set twice, and the
        per-generation ``rmdir`` is guarded against the race where another
        writer repopulates the directory between the emptiness check and
        the removal.
        """
        now = _now_ms()
        async with aiosqlite.connect(self.db_path) as db:
            cursor = await db.execute(
                "SELECT generation_id, file_path FROM ephemeral_assets WHERE expires_at < ?",
                (now,),
            )
            rows = await cursor.fetchall()
            count = 0
            for gen_id, fpath in rows:
                try:
                    Path(fpath).unlink(missing_ok=True)
                except OSError:
                    pass  # best-effort: row is purged regardless
                count += 1
                # Drop the per-generation directory once it is empty.
                gdir = self.generated_dir / gen_id
                try:
                    if gdir.exists() and not any(gdir.iterdir()):
                        gdir.rmdir()
                except OSError:
                    pass  # raced with a concurrent writer; leave it in place
            await db.execute(
                "DELETE FROM ephemeral_assets WHERE expires_at < ?",
                (now,),
            )
            await db.commit()
            return count

    async def get_stats(self) -> dict:
        """Aggregate counts and total cost over all generations."""
        async with aiosqlite.connect(self.db_path) as db:
            cur = await db.execute(
                "SELECT COUNT(*), SUM(success), SUM(cost_usd) FROM generations"
            )
            total, success, cost = await cur.fetchone()
        # SUM() yields NULL on an empty table; coalesce everything to zero.
        return {
            "total": total or 0,
            "success": success or 0,
            "failed": (total or 0) - (success or 0),
            "total_cost_usd": float(cost or 0),
        }
@@ -0,0 +1,146 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from time import perf_counter
import httpx
class LLMError(Exception):
    """Base class for all errors raised by OpenRouterClient."""

    pass
class LLMTimeout(LLMError):
    """Request timed out (raised after retries are exhausted)."""

    pass
class LLMUpstreamError(LLMError):
    """5xx after retries, or any unexpected HTTP status."""

    pass
class LLMAuthError(LLMError):
    """401/403 from the API; never retried."""

    pass
class LLMRateLimit(LLMError):
    """429 after retries are exhausted."""

    pass
class LLMInvalidResponse(LLMError):
    """200 response whose body could not be parsed as a completion."""

    pass
class LLMEmptyResponse(LLMError):
    """200 response whose message content was empty."""

    pass
@dataclass
class LLMResponse:
    """Parsed successful completion returned by OpenRouterClient.chat()."""

    text: str  # assistant message content (guaranteed non-empty)
    model: str  # as reported by the API ("unknown" when absent)
    tokens_in: int  # usage.prompt_tokens, 0 when absent
    tokens_out: int  # usage.completion_tokens, 0 when absent
    cost_usd: float  # usage.total_cost, 0.0 when absent
    latency_ms: int  # wall time of chat(), including retry attempts
class OpenRouterClient:
    """Async client for OpenRouter's OpenAI-compatible chat endpoint.

    Transient failures (timeouts, 429, 5xx) are retried up to
    ``max_retries`` attempts with exponential backoff; auth errors and
    other unexpected statuses fail immediately.
    """

    def __init__(
        self,
        api_key: str,
        base_url: str,
        timeout: float = 60,
        max_retries: int = 3,
        retry_base_delay: float = 1.0,
    ) -> None:
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")  # normalize: avoid "//" when joining
        self.timeout = timeout
        self.max_retries = max_retries
        self.retry_base_delay = retry_base_delay

    async def chat(self, model: str, system: str, user: str) -> LLMResponse:
        """POST one system+user exchange and return the parsed completion.

        Raises:
            LLMAuthError: on 401/403 (never retried).
            LLMRateLimit: on 429 after retries are exhausted.
            LLMTimeout: on a timeout after retries are exhausted.
            LLMUpstreamError: on 5xx (after retries) or any other status.
            LLMInvalidResponse / LLMEmptyResponse: on a malformed 200 body.
        """
        payload = {
            "model": model,
            "messages": [
                {"role": "system", "content": system},
                {"role": "user", "content": user},
            ],
        }
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        start = perf_counter()  # latency_ms spans ALL attempts, not just the last
        last_transient_error: Exception | None = None
        for attempt in range(self.max_retries):
            try:
                async with httpx.AsyncClient(timeout=self.timeout) as client:
                    response = await client.post(
                        f"{self.base_url}/chat/completions",
                        headers=headers,
                        json=payload,
                    )
            except httpx.TimeoutException as exc:
                # Fix: catch the whole timeout hierarchy.  The original pair
                # (ReadTimeout, ConnectTimeout) let WriteTimeout and
                # PoolTimeout escape as raw httpx exceptions.
                last_transient_error = exc
                if attempt == self.max_retries - 1:
                    raise LLMTimeout(str(exc)) from exc
                await self._sleep_backoff(attempt, rate_limit=False)
                continue
            status = response.status_code
            if status == 200:
                return self._parse_success(response, start)
            if status in (401, 403):
                raise LLMAuthError(f"status {status}: {response.text[:200]}")
            if status == 429:
                if attempt == self.max_retries - 1:
                    raise LLMRateLimit(response.text[:200])
                await self._sleep_backoff(attempt, rate_limit=True)
                continue
            if 500 <= status < 600:
                if attempt == self.max_retries - 1:
                    raise LLMUpstreamError(
                        f"status {status}: {response.text[:200]}"
                    )
                await self._sleep_backoff(attempt, rate_limit=False)
                continue
            # Any other status (remaining 4xx, redirects, ...) is not retryable.
            raise LLMUpstreamError(
                f"unexpected status {status}: {response.text[:200]}"
            )
        # Unreachable unless max_retries <= 0; kept as a defensive backstop.
        raise LLMUpstreamError(f"retries exhausted: {last_transient_error}")

    async def _sleep_backoff(self, attempt: int, rate_limit: bool) -> None:
        """Sleep base * 2**attempt seconds; 5x multiplier for rate limits."""
        multiplier = 5 if rate_limit else 1
        delay = self.retry_base_delay * multiplier * (2**attempt)
        if delay > 0:
            await asyncio.sleep(delay)

    def _parse_success(
        self, response: httpx.Response, start: float
    ) -> LLMResponse:
        """Extract text/model/usage from a 200 body into an LLMResponse."""
        try:
            data = response.json()
            choice = data["choices"][0]
            text = choice["message"]["content"]
            model = data.get("model", "unknown")
            usage = data.get("usage", {})
        except (KeyError, IndexError, TypeError, ValueError) as exc:
            # TypeError added: a non-dict body (e.g. JSON `null` or a list)
            # previously escaped as an unhandled exception.
            raise LLMInvalidResponse(str(exc)) from exc
        if not text:
            raise LLMEmptyResponse("LLM returned empty content")
        latency_ms = int((perf_counter() - start) * 1000)
        return LLMResponse(
            text=text,
            model=model,
            tokens_in=int(usage.get("prompt_tokens", 0)),
            tokens_out=int(usage.get("completion_tokens", 0)),
            cost_usd=float(usage.get("total_cost", 0.0)),
            latency_ms=latency_ms,
        )
@@ -0,0 +1,221 @@
from __future__ import annotations
import base64
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
import aiofiles
import yaml
from pydantic import ValidationError
from mcp_docugen.models import TemplateAsset, TemplateFrontmatter, TemplateSummary
# Template names: lowercase slug (letters/digits/hyphens, no leading hyphen).
_SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9-]*$")
# Asset filenames: one path component, no separators.
# NOTE(review): the bare names "." and ".." also match this pattern.
_ASSET_FILENAME_RE = re.compile(r"^[A-Za-z0-9._-]+$")
_FRONTMATTER_DELIM = "---"  # YAML frontmatter fence in template.md
class TemplateNotFound(Exception):
    """Requested template directory or template.md does not exist."""

    pass
class TemplateAlreadyExists(Exception):
    """create() was called for a name whose directory already exists."""

    pass
class InvalidFrontmatter(Exception):
    """template.md frontmatter is missing, malformed YAML, or fails validation."""

    pass
class InvalidTemplateName(ValueError):
    """Template name is not a valid lowercase slug."""

    pass
@dataclass
class LoadedTemplate:
    """Fully loaded template as returned by TemplateStore.get()."""

    frontmatter: TemplateFrontmatter
    body: str  # markdown body, leading newlines stripped
    assets: list[TemplateAsset]
    updated_at: datetime  # UTC mtime of template.md
class TemplateStore:
    """Filesystem CRUD store for markdown templates with binary assets.

    Layout: ``base_dir/<name>/template.md`` (YAML frontmatter between
    ``---`` fences, then the markdown body) plus ``base_dir/<name>/assets/``.
    Template names must be slugs and asset filenames single safe path
    components, so no operation can escape ``base_dir``.
    """

    def __init__(self, base_dir: Path) -> None:
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)

    async def create(
        self,
        name: str,
        frontmatter: TemplateFrontmatter,
        body: str,
        assets: list[dict] | None = None,
    ) -> None:
        """Create a new template.

        Assets are dicts carrying "filename" and base64 "data_b64" keys.
        Raises TemplateAlreadyExists when the directory is present and
        ValueError for a bad asset filename.
        """
        self._validate_name(name)
        tdir = self.base_dir / name
        if tdir.exists():
            raise TemplateAlreadyExists(name)
        # Validate asset filenames BEFORE creating any directories so a bad
        # asset does not leave an empty half-created template behind
        # (the original validated after mkdir).
        if assets:
            self._validate_assets(assets)
        tdir.mkdir(parents=True)
        (tdir / "assets").mkdir()
        await self._write_template_file(tdir, frontmatter, body)
        if assets:
            await self._write_assets(tdir, assets)

    async def update(
        self,
        name: str,
        frontmatter: TemplateFrontmatter,
        body: str,
        assets: list[dict] | None = None,
    ) -> None:
        """Overwrite template.md; if assets is not None, replace all assets.

        ``assets=None`` leaves existing assets untouched; ``assets=[]``
        clears them.
        """
        self._validate_name(name)
        tdir = self.base_dir / name
        if not tdir.exists():
            raise TemplateNotFound(name)
        # Validate replacements before destructively clearing the old ones.
        if assets:
            self._validate_assets(assets)
        await self._write_template_file(tdir, frontmatter, body)
        if assets is not None:
            assets_dir = tdir / "assets"
            assets_dir.mkdir(exist_ok=True)  # tolerate externally created templates
            for f in assets_dir.iterdir():
                f.unlink()
            await self._write_assets(tdir, assets)

    async def delete(self, name: str) -> None:
        """Remove a template and its assets; raise TemplateNotFound if absent."""
        self._validate_name(name)
        tdir = self.base_dir / name
        if not tdir.exists():
            raise TemplateNotFound(name)
        assets_dir = tdir / "assets"
        if assets_dir.exists():  # tolerate a missing assets dir
            for f in assets_dir.iterdir():
                f.unlink()
            assets_dir.rmdir()
        (tdir / "template.md").unlink(missing_ok=True)
        tdir.rmdir()

    async def get(self, name: str) -> LoadedTemplate:
        """Load frontmatter, body and asset metadata for one template."""
        self._validate_name(name)
        tdir = self.base_dir / name
        tpath = tdir / "template.md"
        if not tpath.exists():
            raise TemplateNotFound(name)
        frontmatter, body = await self._read_template_file(tpath)
        assets = self._list_assets(tdir)
        stat = tpath.stat()
        # mtime of template.md stands in for "last updated".
        updated_at = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
        return LoadedTemplate(
            frontmatter=frontmatter, body=body, assets=assets, updated_at=updated_at
        )

    async def list(self) -> list[TemplateSummary]:
        """Summaries of all valid templates, sorted by directory name.

        Directories without template.md or with invalid frontmatter are
        silently skipped (best-effort listing).
        """
        summaries = []
        for tdir in sorted(self.base_dir.iterdir()):
            if not tdir.is_dir():
                continue
            tpath = tdir / "template.md"
            if not tpath.exists():
                continue
            try:
                frontmatter, _ = await self._read_template_file(tpath)
            except InvalidFrontmatter:
                continue  # skip corrupt templates rather than failing the list
            stat = tpath.stat()
            summaries.append(
                TemplateSummary(
                    name=frontmatter.name,
                    description=frontmatter.description,
                    updated_at=datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
                )
            )
        return summaries

    async def read_asset(self, template_name: str, filename: str) -> bytes:
        """Return one asset's raw bytes; FileNotFoundError when absent."""
        self._validate_name(template_name)
        self._validate_asset_filename(filename)
        path = self.base_dir / template_name / "assets" / filename
        if not path.exists():
            raise FileNotFoundError(filename)
        async with aiofiles.open(path, "rb") as f:
            return await f.read()

    def asset_path(self, template_name: str, filename: str) -> Path:
        """Validated (but not existence-checked) path of one asset."""
        self._validate_name(template_name)
        self._validate_asset_filename(filename)
        return self.base_dir / template_name / "assets" / filename

    def _validate_name(self, name: str) -> None:
        """Template names must be lowercase slugs (also blocks traversal)."""
        if not _SLUG_RE.match(name):
            raise InvalidTemplateName(name)

    def _validate_asset_filename(self, filename: str) -> None:
        """Asset filenames must be a single safe path component.

        Fix: the dot-only names "." and ".." satisfy the character-class
        regex but resolve outside the assets directory (path traversal),
        so they are rejected explicitly.
        """
        if filename in (".", "..") or not _ASSET_FILENAME_RE.match(filename):
            raise ValueError(f"invalid asset filename: {filename!r}")

    def _validate_assets(self, assets: list[dict]) -> None:
        """Validate every asset's filename before any file is written."""
        for asset in assets:
            self._validate_asset_filename(asset["filename"])

    async def _write_template_file(
        self, tdir: Path, frontmatter: TemplateFrontmatter, body: str
    ) -> None:
        """Serialize frontmatter + body into template.md (UTF-8)."""
        yaml_text = yaml.safe_dump(
            frontmatter.model_dump(exclude_none=True), sort_keys=False
        )
        content = f"{_FRONTMATTER_DELIM}\n{yaml_text}{_FRONTMATTER_DELIM}\n{body}"
        # Pin UTF-8 instead of inheriting the locale's default encoding.
        async with aiofiles.open(tdir / "template.md", "w", encoding="utf-8") as f:
            await f.write(content)

    async def _read_template_file(
        self, path: Path
    ) -> tuple[TemplateFrontmatter, str]:
        """Parse template.md into (frontmatter, body); raise InvalidFrontmatter."""
        async with aiofiles.open(path, encoding="utf-8") as f:
            raw = await f.read()
        if not raw.startswith(_FRONTMATTER_DELIM):
            raise InvalidFrontmatter("missing opening '---'")
        # maxsplit=2 keeps any '---' occurring inside the body intact.
        parts = raw.split(_FRONTMATTER_DELIM, 2)
        if len(parts) < 3:
            raise InvalidFrontmatter("missing closing '---'")
        yaml_text = parts[1]
        body = parts[2].lstrip("\n")
        try:
            data = yaml.safe_load(yaml_text) or {}
            frontmatter = TemplateFrontmatter(**data)
        except (yaml.YAMLError, ValidationError) as exc:
            raise InvalidFrontmatter(str(exc)) from exc
        return frontmatter, body

    async def _write_assets(self, tdir: Path, assets: list[dict]) -> None:
        """Decode and write each asset into <tdir>/assets/."""
        assets_dir = tdir / "assets"
        assets_dir.mkdir(exist_ok=True)  # robust if called on a bare template dir
        for asset in assets:
            filename = asset["filename"]
            self._validate_asset_filename(filename)
            data = base64.b64decode(asset["data_b64"])
            async with aiofiles.open(assets_dir / filename, "wb") as f:
                await f.write(data)

    def _list_assets(self, tdir: Path) -> list[TemplateAsset]:
        """Metadata (name, mime, size) for every file in <tdir>/assets/."""
        assets_dir = tdir / "assets"
        if not assets_dir.exists():
            return []
        out = []
        for f in sorted(assets_dir.iterdir()):
            if not f.is_file():
                continue
            out.append(
                TemplateAsset(
                    filename=f.name,
                    mime=_guess_mime(f.name),
                    size_bytes=f.stat().st_size,
                )
            )
        return out
def _guess_mime(filename: str) -> str:
ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
return {
"png": "image/png",
"jpg": "image/jpeg",
"jpeg": "image/jpeg",
"webp": "image/webp",
}.get(ext, "application/octet-stream")