26e5b9343d
Add PDF report generation for Metrologist dashboard with SPC and measurement reports including SVG charts, capability indices, and company logo embedding. New files: - server/services/report_service.py (Jinja2 + Plotly/Kaleido + WeasyPrint) - server/routers/reports.py (2 GET endpoints with auth) - server/templates/reports/ (base, spc, measurement HTML templates) Modified: - server/main.py (register reports router) - client dashboard (download buttons + proxy routes) - i18n strings IT/EN Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
430 lines
15 KiB
Python
430 lines
15 KiB
Python
"""Report generation service — PDF reports via Jinja2 + Plotly/Kaleido + WeasyPrint."""
|
|
import base64
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import plotly.graph_objects as go
|
|
import plotly.io as pio
|
|
from jinja2 import Environment, FileSystemLoader
|
|
from sqlalchemy import and_, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from weasyprint import HTML
|
|
|
|
from config import settings
|
|
from models.measurement import Measurement
|
|
from models.recipe import Recipe, RecipeVersion
|
|
from models.setting import SystemSetting
|
|
from models.task import RecipeSubtask, RecipeTask
|
|
from models.user import User
|
|
from services.spc_service import (
|
|
compute_capability,
|
|
compute_control_chart,
|
|
compute_histogram,
|
|
compute_summary,
|
|
)
|
|
|
|
# Shared Jinja2 environment for the PDF report templates.
# Templates are looked up in "templates/reports", two directory levels above
# this file's own location.
_templates_dir = Path(__file__).parent.parent.joinpath("templates", "reports")
# Autoescaping is switched on for everything rendered through this environment.
# NOTE(review): the SVG chart markup is passed to templates as plain strings;
# presumably the templates mark it safe — confirm in the template files.
_jinja_env = Environment(
    autoescape=True,
    loader=FileSystemLoader(str(_templates_dir)),
)
|
|
|
|
|
|
async def _query_measurements(
    db: AsyncSession,
    recipe_id: int,
    version_id: int | None = None,
    subtask_id: int | None = None,
    date_from: datetime | None = None,
    date_to: datetime | None = None,
    operator_id: int | None = None,
    lot_number: str | None = None,
    serial_number: str | None = None,
) -> list[Measurement]:
    """Fetch measurements matching the given filters, ordered by ``measured_at`` ascending.

    This mirrors the filtering logic of the statistics router (duplicated there).

    When ``version_id`` is given, only that version is selected and
    ``recipe_id`` is not consulted. Otherwise every version whose
    ``recipe_id`` matches is included. Each remaining argument adds a
    condition only when it is not ``None``; the date bounds are inclusive.
    """
    # Version scope: exactly one of these two conditions is always present.
    if version_id is None:
        versions_of_recipe = select(RecipeVersion.id).where(
            RecipeVersion.recipe_id == recipe_id
        )
        conditions = [Measurement.version_id.in_(versions_of_recipe)]
    else:
        conditions = [Measurement.version_id == version_id]

    # Optional filters, kept in a fixed order; each builder runs only when
    # its value was actually supplied.
    optional_filters = (
        (subtask_id, lambda v: Measurement.subtask_id == v),
        (date_from, lambda v: Measurement.measured_at >= v),
        (date_to, lambda v: Measurement.measured_at <= v),
        (operator_id, lambda v: Measurement.measured_by == v),
        (lot_number, lambda v: Measurement.lot_number == v),
        (serial_number, lambda v: Measurement.serial_number == v),
    )
    conditions.extend(
        build(value) for value, build in optional_filters if value is not None
    )

    statement = (
        select(Measurement)
        .where(and_(*conditions))
        .order_by(Measurement.measured_at.asc())
    )
    outcome = await db.execute(statement)
    return list(outcome.scalars().all())
|
|
|
|
|
|
async def _get_company_info(db: AsyncSession) -> dict:
    """Read the company name and logo from ``system_settings``.

    Returns a dict with two keys:

    * ``company_name`` — the value of the ``company_name`` setting, or
      ``"TieMeasureFlow"`` when the setting is missing or empty.
    * ``logo_base64`` — a ``data:`` URI holding the base64-encoded logo file
      referenced by the ``company_logo_path`` setting, or ``None`` when the
      setting is missing/empty or does not point at a regular file.
    """
    info = {"logo_base64": None, "company_name": "TieMeasureFlow"}

    # Company name
    result = await db.execute(
        select(SystemSetting).where(SystemSetting.setting_key == "company_name")
    )
    setting = result.scalar_one_or_none()
    # Only override the default with a real value: a row whose value is NULL
    # or empty would otherwise put None/"" in the report header.
    if setting and setting.setting_value:
        info["company_name"] = setting.setting_value

    # Company logo
    result = await db.execute(
        select(SystemSetting).where(SystemSetting.setting_key == "company_logo_path")
    )
    setting = result.scalar_one_or_none()
    if setting and setting.setting_value:
        # NOTE(review): the stored value is joined to the upload directory
        # as-is; an absolute or "../" value would escape it — confirm the
        # setting is validated where it is written.
        logo_path = settings.upload_path / setting.setting_value
        # is_file() rather than exists(): a directory also "exists" but
        # cannot be read as a file and would abort the whole report.
        if logo_path.is_file():
            logo_bytes = logo_path.read_bytes()
            # Pick the MIME type from the file extension; unknown extensions
            # are labelled as PNG.
            mime_by_suffix = {
                ".png": "image/png",
                ".jpg": "image/jpeg",
                ".jpeg": "image/jpeg",
                ".svg": "image/svg+xml",
            }
            mime = mime_by_suffix.get(logo_path.suffix.lower(), "image/png")
            encoded = base64.b64encode(logo_bytes).decode()
            info["logo_base64"] = f"data:{mime};base64,{encoded}"

    return info
|
|
|
|
|
|
async def _get_recipe_info(db: AsyncSession, recipe_id: int) -> dict:
    """Return the ``code``, ``name`` and ``id`` of a recipe as a dict.

    When no recipe with ``recipe_id`` is found, placeholder values
    (``"???"`` / ``"Unknown"``) are returned together with the requested id.
    """
    lookup = await db.execute(select(Recipe).where(Recipe.id == recipe_id))
    found = lookup.scalar_one_or_none()
    if not found:
        return {"code": "???", "name": "Unknown", "id": recipe_id}
    return {"code": found.code, "name": found.name, "id": found.id}
|
|
|
|
|
|
async def _get_subtask_info(db: AsyncSession, subtask_id: int) -> dict | None:
    """Return a subtask's details and tolerance limits, or ``None`` if not found.

    The dict carries ``id``, ``marker_number``, ``description`` and ``unit``
    unchanged, plus ``nominal``, ``utl``, ``uwl``, ``lwl`` and ``ltl``
    converted to ``float`` (or left as ``None`` when unset).
    """
    lookup = await db.execute(
        select(RecipeSubtask).where(RecipeSubtask.id == subtask_id)
    )
    subtask = lookup.scalar_one_or_none()
    if subtask is None:
        return None

    details = {
        "id": subtask.id,
        "marker_number": subtask.marker_number,
        "description": subtask.description,
        "unit": subtask.unit,
    }
    # Numeric fields are converted to float; None is preserved for unset ones.
    for field in ("nominal", "utl", "uwl", "lwl", "ltl"):
        raw = getattr(subtask, field)
        details[field] = None if raw is None else float(raw)
    return details
|
|
|
|
|
|
async def _get_operator_map(db: AsyncSession, user_ids: set[int]) -> dict[int, str]:
    """Map each found user id to a name for display.

    The name is the user's ``display_name``, falling back to ``username``
    when the display name is empty. Ids with no matching user are simply
    absent from the result. An empty ``user_ids`` skips the query entirely.
    """
    names: dict[int, str] = {}
    if user_ids:
        outcome = await db.execute(select(User).where(User.id.in_(user_ids)))
        for user in outcome.scalars().all():
            names[user.id] = user.display_name or user.username
    return names
|
|
|
|
|
|
def _generate_control_chart_svg(chart_data, subtask_info: dict | None) -> str:
|
|
"""Generate control chart SVG using Plotly + Kaleido."""
|
|
if not chart_data.values:
|
|
return ""
|
|
|
|
fig = go.Figure()
|
|
|
|
# Measurement values
|
|
x_vals = list(range(1, len(chart_data.values) + 1))
|
|
fig.add_trace(go.Scatter(
|
|
x=x_vals, y=chart_data.values,
|
|
mode="lines+markers",
|
|
name="Values",
|
|
line=dict(color="#2563EB", width=2),
|
|
marker=dict(size=5),
|
|
))
|
|
|
|
# Mean line
|
|
fig.add_hline(y=chart_data.mean, line=dict(color="#64748B", dash="dash", width=1.5),
|
|
annotation_text=f"Mean: {chart_data.mean:.4f}")
|
|
|
|
# UCL / LCL
|
|
fig.add_hline(y=chart_data.ucl, line=dict(color="#EF4444", dash="dot", width=1),
|
|
annotation_text=f"UCL: {chart_data.ucl:.4f}")
|
|
fig.add_hline(y=chart_data.lcl, line=dict(color="#EF4444", dash="dot", width=1),
|
|
annotation_text=f"LCL: {chart_data.lcl:.4f}")
|
|
|
|
# Tolerance limits
|
|
if chart_data.utl is not None:
|
|
fig.add_hline(y=chart_data.utl, line=dict(color="#DC2626", width=1.5),
|
|
annotation_text=f"UTL: {chart_data.utl}")
|
|
if chart_data.ltl is not None:
|
|
fig.add_hline(y=chart_data.ltl, line=dict(color="#DC2626", width=1.5),
|
|
annotation_text=f"LTL: {chart_data.ltl}")
|
|
|
|
# Out of control points
|
|
if chart_data.out_of_control:
|
|
ooc_x = [x_vals[i] for i in chart_data.out_of_control]
|
|
ooc_y = [chart_data.values[i] for i in chart_data.out_of_control]
|
|
fig.add_trace(go.Scatter(
|
|
x=ooc_x, y=ooc_y,
|
|
mode="markers",
|
|
name="Out of control",
|
|
marker=dict(color="#EF4444", size=10, symbol="x"),
|
|
))
|
|
|
|
fig.update_layout(
|
|
title="Control Chart",
|
|
xaxis_title="Measurement #",
|
|
yaxis_title="Value",
|
|
template="plotly_white",
|
|
width=700, height=350,
|
|
margin=dict(l=60, r=30, t=40, b=40),
|
|
showlegend=False,
|
|
)
|
|
|
|
svg_bytes = pio.to_image(fig, format="svg", engine="kaleido")
|
|
return svg_bytes.decode("utf-8")
|
|
|
|
|
|
def _generate_histogram_svg(hist_data, subtask_info: dict | None) -> str:
|
|
"""Generate histogram SVG using Plotly + Kaleido."""
|
|
if not hist_data.bins:
|
|
return ""
|
|
|
|
fig = go.Figure()
|
|
|
|
# Histogram bars
|
|
bin_centers = [(hist_data.bins[i] + hist_data.bins[i + 1]) / 2 for i in range(len(hist_data.counts))]
|
|
bin_width = hist_data.bins[1] - hist_data.bins[0] if len(hist_data.bins) > 1 else 1
|
|
|
|
fig.add_trace(go.Bar(
|
|
x=bin_centers, y=hist_data.counts,
|
|
width=bin_width * 0.9,
|
|
marker_color="#2563EB",
|
|
opacity=0.7,
|
|
name="Frequency",
|
|
))
|
|
|
|
# Normal curve overlay
|
|
if hist_data.normal_x and hist_data.normal_y:
|
|
fig.add_trace(go.Scatter(
|
|
x=hist_data.normal_x, y=hist_data.normal_y,
|
|
mode="lines",
|
|
name="Normal curve",
|
|
line=dict(color="#EF4444", width=2),
|
|
))
|
|
|
|
# Tolerance lines
|
|
if subtask_info:
|
|
if subtask_info.get("utl") is not None:
|
|
fig.add_vline(x=subtask_info["utl"], line=dict(color="#DC2626", width=1.5, dash="dash"),
|
|
annotation_text="UTL")
|
|
if subtask_info.get("ltl") is not None:
|
|
fig.add_vline(x=subtask_info["ltl"], line=dict(color="#DC2626", width=1.5, dash="dash"),
|
|
annotation_text="LTL")
|
|
if subtask_info.get("nominal") is not None:
|
|
fig.add_vline(x=subtask_info["nominal"], line=dict(color="#22C55E", width=1.5),
|
|
annotation_text="Nom")
|
|
|
|
fig.update_layout(
|
|
title="Histogram",
|
|
xaxis_title="Value",
|
|
yaxis_title="Frequency",
|
|
template="plotly_white",
|
|
width=700, height=350,
|
|
margin=dict(l=60, r=30, t=40, b=40),
|
|
bargap=0.05,
|
|
showlegend=False,
|
|
)
|
|
|
|
svg_bytes = pio.to_image(fig, format="svg", engine="kaleido")
|
|
return svg_bytes.decode("utf-8")
|
|
|
|
|
|
async def generate_spc_report(
    db: AsyncSession,
    recipe_id: int,
    subtask_id: int,
    version_id: int | None = None,
    date_from: datetime | None = None,
    date_to: datetime | None = None,
    operator_id: int | None = None,
    lot_number: str | None = None,
    serial_number: str | None = None,
) -> bytes:
    """Generate the SPC PDF report for one subtask and return its bytes.

    The report includes the pass/fail summary, capability indices, a control
    chart SVG and a histogram SVG, rendered through the ``spc_report.html``
    template and converted to PDF with WeasyPrint.

    Args:
        db: Async database session used for all lookups.
        recipe_id: Recipe whose measurements are reported.
        subtask_id: Measurement point the SPC analysis is computed for.
        version_id, date_from, date_to, operator_id, lot_number,
        serial_number: Optional filters forwarded to the measurement query.

    Returns:
        The rendered PDF document as bytes.
    """
    # 1. Query measurements
    measurements = await _query_measurements(
        db, recipe_id, version_id, subtask_id,
        date_from, date_to, operator_id, lot_number, serial_number,
    )

    # 2. Compute SPC data
    pass_fail_values = [m.pass_fail for m in measurements]
    values = [float(m.value) for m in measurements]
    timestamps = [m.measured_at for m in measurements]

    summary = compute_summary(pass_fail_values)

    # Tolerances come from the subtask definition; all are None when the
    # subtask cannot be found.
    subtask_info = await _get_subtask_info(db, subtask_id)
    tolerances = subtask_info or {}
    tol_utl = tolerances.get("utl")
    tol_ltl = tolerances.get("ltl")
    tol_uwl = tolerances.get("uwl")
    tol_lwl = tolerances.get("lwl")
    tol_nominal = tolerances.get("nominal")

    capability = compute_capability(values, tol_utl, tol_ltl, tol_nominal)
    control_chart = compute_control_chart(
        values, timestamps, tol_utl, tol_uwl, tol_lwl, tol_ltl, tol_nominal
    )
    histogram = compute_histogram(values)

    # 3. Generate SVG charts
    # NOTE(review): chart rendering and the PDF conversion below are plain
    # synchronous calls inside this coroutine — confirm that is acceptable
    # for the server's concurrency needs.
    control_chart_svg = _generate_control_chart_svg(control_chart, subtask_info)
    histogram_svg = _generate_histogram_svg(histogram, subtask_info)

    # 4. Get company info and recipe info
    company = await _get_company_info(db)
    recipe_info = await _get_recipe_info(db, recipe_id)

    # 5. Build filter description
    # Integer and date filters are tested with "is not None" so the
    # description lists them exactly when the query applied them.
    filters_desc = []
    if version_id is not None:
        filters_desc.append(f"Version: {version_id}")
    if date_from is not None:
        filters_desc.append(f"From: {date_from.strftime('%Y-%m-%d')}")
    if date_to is not None:
        filters_desc.append(f"To: {date_to.strftime('%Y-%m-%d')}")
    if operator_id is not None:
        # The operator filter restricts the data, so it must be listed too;
        # fall back to the raw id when the user cannot be resolved.
        operator_names = await _get_operator_map(db, {operator_id})
        operator_label = operator_names.get(operator_id, f"ID {operator_id}")
        filters_desc.append(f"Operator: {operator_label}")
    if lot_number:
        filters_desc.append(f"Lot: {lot_number}")
    if serial_number:
        filters_desc.append(f"Serial: {serial_number}")

    # 6. Render HTML template
    template = _jinja_env.get_template("spc_report.html")
    html_content = template.render(
        company=company,
        recipe=recipe_info,
        subtask=subtask_info,
        summary=summary,
        capability=capability,
        control_chart_svg=control_chart_svg,
        histogram_svg=histogram_svg,
        filters_desc=filters_desc,
        generated_at=datetime.now(),
        n_measurements=len(measurements),
    )

    # 7. Convert HTML to PDF
    pdf_bytes = HTML(string=html_content).write_pdf()
    return pdf_bytes
|
|
|
|
|
|
def _format_subtask_label(info: dict) -> str:
    """Return ``"#<marker> — <description>"``, using ``?`` for missing or None parts."""
    marker = info.get("marker_number")
    description = info.get("description")
    return f"#{'?' if marker is None else marker} — {description or '?'}"


async def generate_measurement_report(
    db: AsyncSession,
    recipe_id: int,
    subtask_id: int | None = None,
    version_id: int | None = None,
    date_from: datetime | None = None,
    date_to: datetime | None = None,
    operator_id: int | None = None,
    lot_number: str | None = None,
    serial_number: str | None = None,
) -> bytes:
    """Generate the measurement table PDF report and return its bytes.

    The report lists every matching measurement (oldest first) with its
    subtask label, value, unit, pass/fail flag, timestamp, operator, lot and
    serial number, followed by a pass/fail summary. It is rendered through
    the ``measurement_report.html`` template and converted with WeasyPrint.

    Args:
        db: Async database session used for all lookups.
        recipe_id: Recipe whose measurements are reported.
        subtask_id, version_id, date_from, date_to, operator_id, lot_number,
        serial_number: Optional filters forwarded to the measurement query.

    Returns:
        The rendered PDF document as bytes.
    """
    # 1. Query measurements
    measurements = await _query_measurements(
        db, recipe_id, version_id, subtask_id,
        date_from, date_to, operator_id, lot_number, serial_number,
    )

    # 2. Get recipe info
    recipe_info = await _get_recipe_info(db, recipe_id)
    company = await _get_company_info(db)

    # 3. Get subtask map and operator map
    subtask_ids = {m.subtask_id for m in measurements}
    operator_ids = {m.measured_by for m in measurements}

    subtask_map = {}
    for sid in subtask_ids:
        info = await _get_subtask_info(db, sid)
        if info:
            subtask_map[sid] = info

    operator_map = await _get_operator_map(db, operator_ids)

    # 4. Build table rows
    rows = []
    for i, m in enumerate(measurements, 1):
        st_info = subtask_map.get(m.subtask_id, {})
        rows.append({
            "num": i,
            # The helper substitutes "?" for missing *and* None values, so
            # the literal text "None" never reaches the table.
            "subtask": _format_subtask_label(st_info),
            "value": f"{float(m.value):.4f}",
            "unit": st_info.get("unit", "mm"),
            "pass_fail": m.pass_fail,
            "measured_at": m.measured_at.strftime("%Y-%m-%d %H:%M"),
            "operator": operator_map.get(m.measured_by, f"ID {m.measured_by}"),
            "lot_number": m.lot_number or "—",
            "serial_number": m.serial_number or "—",
        })

    # 5. Summary
    pass_fail_values = [m.pass_fail for m in measurements]
    summary = compute_summary(pass_fail_values)

    # 6. Build filter description
    # Integer and date filters are tested with "is not None" so the
    # description lists them exactly when the query applied them.
    filters_desc = []
    if subtask_id is not None:
        si = subtask_map.get(subtask_id)
        if si is None:
            # No measurement matched, so the subtask was never loaded above;
            # look it up directly so the applied filter is still reported.
            si = await _get_subtask_info(db, subtask_id)
        if si:
            filters_desc.append(f"Point: {_format_subtask_label(si)}")
    if version_id is not None:
        filters_desc.append(f"Version: {version_id}")
    if date_from is not None:
        filters_desc.append(f"From: {date_from.strftime('%Y-%m-%d')}")
    if date_to is not None:
        filters_desc.append(f"To: {date_to.strftime('%Y-%m-%d')}")
    if operator_id is not None:
        # The operator filter restricts the data, so it must be listed too.
        operator_label = operator_map.get(operator_id)
        if operator_label is None:
            # Not among the operators of the returned rows (e.g. no rows).
            resolved = await _get_operator_map(db, {operator_id})
            operator_label = resolved.get(operator_id, f"ID {operator_id}")
        filters_desc.append(f"Operator: {operator_label}")
    if lot_number:
        filters_desc.append(f"Lot: {lot_number}")
    if serial_number:
        filters_desc.append(f"Serial: {serial_number}")

    # 7. Render template
    template = _jinja_env.get_template("measurement_report.html")
    html_content = template.render(
        company=company,
        recipe=recipe_info,
        rows=rows,
        summary=summary,
        filters_desc=filters_desc,
        generated_at=datetime.now(),
        n_measurements=len(measurements),
    )

    # 8. Convert to PDF
    pdf_bytes = HTML(string=html_content).write_pdf()
    return pdf_bytes
|