"""Report generation service — PDF reports via Jinja2 + Plotly/Kaleido + WeasyPrint.""" import base64 from datetime import datetime from pathlib import Path import plotly.graph_objects as go import plotly.io as pio from jinja2 import Environment, FileSystemLoader from sqlalchemy import and_, select from sqlalchemy.ext.asyncio import AsyncSession from weasyprint import HTML from config import settings from models.measurement import Measurement from models.recipe import Recipe, RecipeVersion from models.setting import SystemSetting from models.task import RecipeSubtask, RecipeTask from models.user import User from services.spc_service import ( compute_capability, compute_control_chart, compute_histogram, compute_summary, ) # Jinja2 environment for report templates _templates_dir = Path(__file__).parent.parent / "templates" / "reports" _jinja_env = Environment( loader=FileSystemLoader(str(_templates_dir)), autoescape=True, ) async def _query_measurements( db: AsyncSession, recipe_id: int, version_id: int | None = None, subtask_id: int | None = None, date_from: datetime | None = None, date_to: datetime | None = None, operator_id: int | None = None, lot_number: str | None = None, serial_number: str | None = None, ) -> list[Measurement]: """Query measurements with common filters (duplicated from statistics router).""" filters = [] if version_id is not None: filters.append(Measurement.version_id == version_id) else: version_ids = select(RecipeVersion.id).where( RecipeVersion.recipe_id == recipe_id ) filters.append(Measurement.version_id.in_(version_ids)) if subtask_id is not None: filters.append(Measurement.subtask_id == subtask_id) if date_from is not None: filters.append(Measurement.measured_at >= date_from) if date_to is not None: filters.append(Measurement.measured_at <= date_to) if operator_id is not None: filters.append(Measurement.measured_by == operator_id) if lot_number is not None: filters.append(Measurement.lot_number == lot_number) if serial_number is not None: filters.append(Measurement.serial_number == serial_number) query = ( select(Measurement) .where(and_(*filters) if filters else True) .order_by(Measurement.measured_at.asc()) ) result = await db.execute(query) return list(result.scalars().all()) async def _get_company_info(db: AsyncSession) -> dict: """Read company logo path and company name from system_settings.""" info = {"logo_base64": None, "company_name": "TieMeasureFlow"} # Company name result = await db.execute( select(SystemSetting).where(SystemSetting.setting_key == "company_name") ) setting = result.scalar_one_or_none() if setting: info["company_name"] = setting.setting_value # Company logo result = await db.execute( select(SystemSetting).where(SystemSetting.setting_key == "company_logo_path") ) setting = result.scalar_one_or_none() if setting and setting.setting_value: logo_path = settings.upload_path / setting.setting_value if logo_path.exists(): with open(logo_path, "rb") as f: logo_bytes = f.read() # Detect mime type suffix = logo_path.suffix.lower() mime = {".png": "image/png", ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".svg": "image/svg+xml"}.get(suffix, "image/png") info["logo_base64"] = f"data:{mime};base64,{base64.b64encode(logo_bytes).decode()}" return info async def _get_recipe_info(db: AsyncSession, recipe_id: int) -> dict: """Get recipe code and name.""" result = await db.execute(select(Recipe).where(Recipe.id == recipe_id)) recipe = result.scalar_one_or_none() if recipe: return {"code": recipe.code, "name": recipe.name, "id": recipe.id} return {"code": "???", "name": "Unknown", "id": recipe_id} async def _get_subtask_info(db: AsyncSession, subtask_id: int) -> dict | None: """Get subtask details including tolerances.""" result = await db.execute( select(RecipeSubtask).where(RecipeSubtask.id == subtask_id) ) st = result.scalar_one_or_none() if st is None: return None return { "id": st.id, "marker_number": st.marker_number, "description": st.description, "unit": st.unit, "nominal": float(st.nominal) if st.nominal is not None else None, "utl": float(st.utl) if st.utl is not None else None, "uwl": float(st.uwl) if st.uwl is not None else None, "lwl": float(st.lwl) if st.lwl is not None else None, "ltl": float(st.ltl) if st.ltl is not None else None, } async def _get_operator_map(db: AsyncSession, user_ids: set[int]) -> dict[int, str]: """Build a map of user_id -> display name.""" if not user_ids: return {} result = await db.execute(select(User).where(User.id.in_(user_ids))) users = result.scalars().all() return {u.id: u.display_name or u.username for u in users} def _generate_control_chart_svg(chart_data, subtask_info: dict | None) -> str: """Generate control chart SVG using Plotly + Kaleido.""" if not chart_data.values: return "" fig = go.Figure() # Measurement values x_vals = list(range(1, len(chart_data.values) + 1)) fig.add_trace(go.Scatter( x=x_vals, y=chart_data.values, mode="lines+markers", name="Values", line=dict(color="#2563EB", width=2), marker=dict(size=5), )) # Mean line fig.add_hline(y=chart_data.mean, line=dict(color="#64748B", dash="dash", width=1.5), annotation_text=f"Mean: {chart_data.mean:.4f}") # UCL / LCL fig.add_hline(y=chart_data.ucl, line=dict(color="#EF4444", dash="dot", width=1), annotation_text=f"UCL: {chart_data.ucl:.4f}") fig.add_hline(y=chart_data.lcl, line=dict(color="#EF4444", dash="dot", width=1), annotation_text=f"LCL: {chart_data.lcl:.4f}") # Tolerance limits if chart_data.utl is not None: fig.add_hline(y=chart_data.utl, line=dict(color="#DC2626", width=1.5), annotation_text=f"UTL: {chart_data.utl}") if chart_data.ltl is not None: fig.add_hline(y=chart_data.ltl, line=dict(color="#DC2626", width=1.5), annotation_text=f"LTL: {chart_data.ltl}") # Out of control points if chart_data.out_of_control: ooc_x = [x_vals[i] for i in chart_data.out_of_control] ooc_y = [chart_data.values[i] for i in chart_data.out_of_control] fig.add_trace(go.Scatter( x=ooc_x, y=ooc_y, mode="markers", name="Out of control", marker=dict(color="#EF4444", size=10, symbol="x"), )) fig.update_layout( title="Control Chart", xaxis_title="Measurement #", yaxis_title="Value", template="plotly_white", width=700, height=350, margin=dict(l=60, r=30, t=40, b=40), showlegend=False, ) svg_bytes = pio.to_image(fig, format="svg", engine="kaleido") return svg_bytes.decode("utf-8") def _generate_histogram_svg(hist_data, subtask_info: dict | None) -> str: """Generate histogram SVG using Plotly + Kaleido.""" if not hist_data.bins: return "" fig = go.Figure() # Histogram bars bin_centers = [(hist_data.bins[i] + hist_data.bins[i + 1]) / 2 for i in range(len(hist_data.counts))] bin_width = hist_data.bins[1] - hist_data.bins[0] if len(hist_data.bins) > 1 else 1 fig.add_trace(go.Bar( x=bin_centers, y=hist_data.counts, width=bin_width * 0.9, marker_color="#2563EB", opacity=0.7, name="Frequency", )) # Normal curve overlay if hist_data.normal_x and hist_data.normal_y: fig.add_trace(go.Scatter( x=hist_data.normal_x, y=hist_data.normal_y, mode="lines", name="Normal curve", line=dict(color="#EF4444", width=2), )) # Tolerance lines if subtask_info: if subtask_info.get("utl") is not None: fig.add_vline(x=subtask_info["utl"], line=dict(color="#DC2626", width=1.5, dash="dash"), annotation_text="UTL") if subtask_info.get("ltl") is not None: fig.add_vline(x=subtask_info["ltl"], line=dict(color="#DC2626", width=1.5, dash="dash"), annotation_text="LTL") if subtask_info.get("nominal") is not None: fig.add_vline(x=subtask_info["nominal"], line=dict(color="#22C55E", width=1.5), annotation_text="Nom") fig.update_layout( title="Histogram", xaxis_title="Value", yaxis_title="Frequency", template="plotly_white", width=700, height=350, margin=dict(l=60, r=30, t=40, b=40), bargap=0.05, showlegend=False, ) svg_bytes = pio.to_image(fig, format="svg", engine="kaleido") return svg_bytes.decode("utf-8") async def generate_spc_report( db: AsyncSession, recipe_id: int, subtask_id: int, version_id: int | None = None, date_from: datetime | None = None, date_to: datetime | None = None, operator_id: int | None = None, lot_number: str | None = None, serial_number: str | None = None, ) -> bytes: """Generate SPC PDF report and return bytes. Includes: summary, capability indices, control chart SVG, histogram SVG. """ # 1. Query measurements measurements = await _query_measurements( db, recipe_id, version_id, subtask_id, date_from, date_to, operator_id, lot_number, serial_number, ) # 2. Compute SPC data pass_fail_values = [m.pass_fail for m in measurements] values = [float(m.value) for m in measurements] timestamps = [m.measured_at for m in measurements] summary = compute_summary(pass_fail_values) subtask_info = await _get_subtask_info(db, subtask_id) tol_utl = subtask_info["utl"] if subtask_info else None tol_ltl = subtask_info["ltl"] if subtask_info else None tol_uwl = subtask_info["uwl"] if subtask_info else None tol_lwl = subtask_info["lwl"] if subtask_info else None tol_nominal = subtask_info["nominal"] if subtask_info else None capability = compute_capability(values, tol_utl, tol_ltl, tol_nominal) control_chart = compute_control_chart( values, timestamps, tol_utl, tol_uwl, tol_lwl, tol_ltl, tol_nominal ) histogram = compute_histogram(values) # 3. Generate SVG charts control_chart_svg = _generate_control_chart_svg(control_chart, subtask_info) histogram_svg = _generate_histogram_svg(histogram, subtask_info) # 4. Get company info and recipe info company = await _get_company_info(db) recipe_info = await _get_recipe_info(db, recipe_id) # 5. Build filter description filters_desc = [] if version_id: filters_desc.append(f"Version: {version_id}") if date_from: filters_desc.append(f"From: {date_from.strftime('%Y-%m-%d')}") if date_to: filters_desc.append(f"To: {date_to.strftime('%Y-%m-%d')}") if lot_number: filters_desc.append(f"Lot: {lot_number}") if serial_number: filters_desc.append(f"Serial: {serial_number}") # 6. Render HTML template template = _jinja_env.get_template("spc_report.html") html_content = template.render( company=company, recipe=recipe_info, subtask=subtask_info, summary=summary, capability=capability, control_chart_svg=control_chart_svg, histogram_svg=histogram_svg, filters_desc=filters_desc, generated_at=datetime.now(), n_measurements=len(measurements), ) # 7. Convert HTML to PDF pdf_bytes = HTML(string=html_content).write_pdf() return pdf_bytes async def generate_measurement_report( db: AsyncSession, recipe_id: int, subtask_id: int | None = None, version_id: int | None = None, date_from: datetime | None = None, date_to: datetime | None = None, operator_id: int | None = None, lot_number: str | None = None, serial_number: str | None = None, ) -> bytes: """Generate measurement table PDF report and return bytes.""" # 1. Query measurements measurements = await _query_measurements( db, recipe_id, version_id, subtask_id, date_from, date_to, operator_id, lot_number, serial_number, ) # 2. Get recipe info recipe_info = await _get_recipe_info(db, recipe_id) company = await _get_company_info(db) # 3. Get subtask map and operator map subtask_ids = {m.subtask_id for m in measurements} operator_ids = {m.measured_by for m in measurements} subtask_map = {} for sid in subtask_ids: info = await _get_subtask_info(db, sid) if info: subtask_map[sid] = info operator_map = await _get_operator_map(db, operator_ids) # 4. Build table rows rows = [] for i, m in enumerate(measurements, 1): st_info = subtask_map.get(m.subtask_id, {}) rows.append({ "num": i, "subtask": f"#{st_info.get('marker_number', '?')} — {st_info.get('description', '?')}", "value": f"{float(m.value):.4f}", "unit": st_info.get("unit", "mm"), "pass_fail": m.pass_fail, "measured_at": m.measured_at.strftime("%Y-%m-%d %H:%M"), "operator": operator_map.get(m.measured_by, f"ID {m.measured_by}"), "lot_number": m.lot_number or "—", "serial_number": m.serial_number or "—", }) # 5. Summary pass_fail_values = [m.pass_fail for m in measurements] summary = compute_summary(pass_fail_values) # 6. Build filter description filters_desc = [] if subtask_id: si = subtask_map.get(subtask_id) if si: filters_desc.append(f"Point: #{si['marker_number']} — {si['description']}") if version_id: filters_desc.append(f"Version: {version_id}") if date_from: filters_desc.append(f"From: {date_from.strftime('%Y-%m-%d')}") if date_to: filters_desc.append(f"To: {date_to.strftime('%Y-%m-%d')}") if lot_number: filters_desc.append(f"Lot: {lot_number}") if serial_number: filters_desc.append(f"Serial: {serial_number}") # 7. Render template template = _jinja_env.get_template("measurement_report.html") html_content = template.render( company=company, recipe=recipe_info, rows=rows, summary=summary, filters_desc=filters_desc, generated_at=datetime.now(), n_measurements=len(measurements), ) # 8. Convert to PDF pdf_bytes = HTML(string=html_content).write_pdf() return pdf_bytes