"""FastAPI webapp standalone per PM2D. Endpoint: GET / → HTML UI POST /upload → upload immagine (multipart) POST /match → JSON params + ids → results GET /image/{id}/raw → PNG originale GET /image/{id}/annotated → PNG con overlay match """ from __future__ import annotations import os import tempfile import time import uuid from pathlib import Path import cv2 import numpy as np from fastapi import FastAPI, File, HTTPException, UploadFile from fastapi.responses import HTMLResponse, Response from fastapi.staticfiles import StaticFiles from pydantic import BaseModel def _load_env(root: Path) -> None: """Legge .env in root e popola os.environ (no override se già set).""" f = root / ".env" if not f.exists(): return for line in f.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue k, v = line.split("=", 1) k = k.strip(); v = v.strip().strip('"').strip("'") os.environ.setdefault(k, v) # Root progetto (parent di pm2d/) PROJECT_ROOT = Path(__file__).resolve().parents[2] _load_env(PROJECT_ROOT) _images_dir_raw = os.environ.get("IMAGES_DIR", "Test") IMAGES_DIR = Path(_images_dir_raw) if not IMAGES_DIR.is_absolute(): IMAGES_DIR = PROJECT_ROOT / IMAGES_DIR from pm2d.line_matcher import LineShapeMatcher, Match from pm2d.auto_tune import auto_tune WEB_DIR = Path(__file__).parent STATIC_DIR = WEB_DIR / "static" STATIC_DIR.mkdir(exist_ok=True) # Persistenza immagini su disco (sopravvive a restart server) CACHE_DIR = Path(tempfile.gettempdir()) / "pm2d_cache" CACHE_DIR.mkdir(exist_ok=True) # Cache in-memory (soft, ricaricata da disco se mancante) _IMG_CACHE: dict[str, np.ndarray] = {} def _store_image(img: np.ndarray) -> str: iid = uuid.uuid4().hex[:12] cv2.imwrite(str(CACHE_DIR / f"{iid}.png"), img) _IMG_CACHE[iid] = img return iid def _load_image(iid: str) -> np.ndarray | None: cached = _IMG_CACHE.get(iid) if cached is not None: return cached p = CACHE_DIR / f"{iid}.png" if not p.exists(): return None img = cv2.imread(str(p)) if img is not None: _IMG_CACHE[iid] = img return img app = FastAPI(title="PM2D Webapp", version="1.0.0") def _encode_png(img: np.ndarray) -> bytes: ok, buf = cv2.imencode(".png", img) if not ok: raise RuntimeError("PNG encode failed") return buf.tobytes() def _draw_matches(scene: np.ndarray, matches: list[Match], template_gray: np.ndarray | None) -> np.ndarray: out = scene.copy() H, W = scene.shape[:2] palette = [ (0, 255, 0), (0, 200, 255), (255, 100, 100), (255, 200, 0), (200, 0, 255), (100, 255, 200), (255, 0, 0), (0, 255, 255), ] for i, m in enumerate(matches): color = palette[i % len(palette)] if template_gray is not None: t = template_gray th, tw = t.shape edge = cv2.Canny(t, 50, 150) cx_t = (tw - 1) / 2.0; cy_t = (th - 1) / 2.0 M = cv2.getRotationMatrix2D((cx_t, cy_t), m.angle_deg, m.scale) M[0, 2] += m.cx - cx_t M[1, 2] += m.cy - cy_t warped = cv2.warpAffine(edge, M, (W, H), flags=cv2.INTER_NEAREST, borderValue=0) mask = warped > 0 if mask.any(): overlay = np.zeros_like(out) overlay[mask] = color out[mask] = (0.3 * out[mask] + 0.7 * overlay[mask]).astype(np.uint8) poly = m.bbox_poly.astype(np.int32).reshape(-1, 1, 2) cv2.polylines(out, [poly], True, color, 2, cv2.LINE_AA) p0 = tuple(m.bbox_poly[0].astype(int)) p1 = tuple(m.bbox_poly[1].astype(int)) cv2.line(out, p0, p1, color, 4, cv2.LINE_AA) cx, cy = int(round(m.cx)), int(round(m.cy)) cv2.drawMarker(out, (cx, cy), color, cv2.MARKER_CROSS, 22, 2, cv2.LINE_AA) L = int(np.linalg.norm(m.bbox_poly[1] - m.bbox_poly[0])) // 2 a = np.deg2rad(m.angle_deg) cv2.arrowedLine(out, (cx, cy), (int(cx + L * np.cos(a)), int(cy - L * np.sin(a))), color, 2, cv2.LINE_AA, tipLength=0.2) label = f"#{i+1} {m.angle_deg:.0f}d s={m.scale:.2f} {m.score:.2f}" cv2.putText(out, label, (cx + 8, cy - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2, cv2.LINE_AA) return out # ---------------- Models ---------------- class UploadResp(BaseModel): id: str width: int height: int class MatchParams(BaseModel): model_id: str scene_id: str roi: list[int] # [x, y, w, h] nell'immagine modello angle_min: float = 0.0 angle_max: float = 360.0 angle_step: float = 5.0 scale_min: float = 1.0 scale_max: float = 1.0 scale_step: float = 0.1 min_score: float = 0.55 max_matches: int = 25 nms_radius: int = 0 num_features: int = 96 weak_grad: float = 30.0 strong_grad: float = 60.0 spread_radius: int = 5 pyramid_levels: int = 3 verify_threshold: float = 0.4 class MatchResult(BaseModel): cx: float cy: float angle_deg: float scale: float score: float bbox_poly: list[list[float]] class MatchResp(BaseModel): matches: list[MatchResult] train_time: float find_time: float num_variants: int annotated_id: str class TuneParams(BaseModel): model_id: str roi: list[int] # ---------- User-facing (simple) params ---------- SYMMETRY_TO_ANGLE_MAX = { "invariante": 0.0, # oggetto simmetrico a rotazione totale (cerchi): 1 variante "nessuna": 360.0, "bilaterale": 180.0, "rot_3": 120.0, "rot_4": 90.0, "rot_6": 60.0, "rot_8": 45.0, } SCALE_PRESETS = { "fissa": (1.0, 1.0, 0.1), "mini": (0.9, 1.1, 0.05), # ±10% "medio": (0.75, 1.25, 0.05), # ±25% "max": (0.5, 1.5, 0.05), # ±50% } PRECISION_ANGLE_STEP = { "veloce": 10.0, "normale": 5.0, "preciso": 2.0, } # "Filtro falsi positivi" = mapping semantico del verify NCC threshold. # Un operatore sceglie il livello di rigore, non un numero astratto. FILTRO_FP_MAP = { "off": 0.0, # disabilitato: mantieni tutti i match shape-based "leggero": 0.20, # tollera variazioni intensità/illuminazione forti "medio": 0.35, # default bilanciato (consigliato) "forte": 0.50, # scarta match con intensità molto diversa dal template } class SimpleMatchParams(BaseModel): model_id: str scene_id: str roi: list[int] tipo: str = "intero" # "intero" | "parziale" simmetria: str = "nessuna" # chiave SYMMETRY_TO_ANGLE_MAX scala: str = "fissa" # chiave SCALE_PRESETS precisione: str = "normale" # chiave PRECISION_ANGLE_STEP filtro_fp: str = "medio" # chiave FILTRO_FP_MAP min_score: float = 0.65 max_matches: int = 25 def _simple_to_technical( p: SimpleMatchParams, roi_img: np.ndarray, ) -> dict: """Converti parametri user-facing → tecnici usando analisi della ROI.""" from pm2d.auto_tune import auto_tune as _auto tune = _auto(roi_img) h, w = roi_img.shape[:2] min_side = min(h, w) # Feature count: parziale = meno feature (area minore) nf = tune["num_features"] if p.tipo == "parziale": nf = max(32, int(nf * 0.6)) # Piramide derivata da dimensione ROI if min_side < 60: pyr = 1 elif min_side < 150: pyr = 2 elif min_side < 400: pyr = 3 else: pyr = 4 # Spread radius ~2-3% del lato minimo spread = max(3, min(10, int(round(min_side * 0.03)))) angle_max = SYMMETRY_TO_ANGLE_MAX.get(p.simmetria, 360.0) smin, smax, sstep = SCALE_PRESETS.get(p.scala, (1.0, 1.0, 0.1)) ang_step = PRECISION_ANGLE_STEP.get(p.precisione, 5.0) return { "num_features": nf, "weak_grad": tune["weak_grad"], "strong_grad": tune["strong_grad"], "spread_radius": spread, "pyramid_levels": pyr, "angle_min": 0.0, "angle_max": angle_max, "angle_step": ang_step, "scale_min": smin, "scale_max": smax, "scale_step": sstep, "min_score": p.min_score, "max_matches": p.max_matches, "nms_radius": 0, "verify_threshold": FILTRO_FP_MAP.get(p.filtro_fp, 0.35), } # ---------------- Endpoints ---------------- @app.get("/", response_class=HTMLResponse) def index(): html_path = STATIC_DIR / "index.html" return HTMLResponse(html_path.read_text(encoding="utf-8")) @app.get("/folder_image/{filename}") def folder_image(filename: str, w: int = 120): """Serve thumbnail PNG dell'immagine IMAGES_DIR (scalata a width w).""" if "/" in filename or ".." in filename: raise HTTPException(400, "nome non valido") path = IMAGES_DIR / filename if not path.is_file(): raise HTTPException(404, "non trovato") img = cv2.imread(str(path), cv2.IMREAD_COLOR) if img is None: raise HTTPException(400, "non leggibile") h0, w0 = img.shape[:2] if w0 > w: sc = w / w0 img = cv2.resize(img, (w, int(h0 * sc)), interpolation=cv2.INTER_AREA) return Response(_encode_png(img), media_type="image/png", headers={"Cache-Control": "public, max-age=3600"}) @app.get("/images") def list_images(): """Lista file immagine nella cartella configurata in IMAGES_DIR.""" if not IMAGES_DIR.is_dir(): return {"dir": str(IMAGES_DIR), "files": []} exts = {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"} files = sorted( p.name for p in IMAGES_DIR.iterdir() if p.is_file() and p.suffix.lower() in exts ) return {"dir": str(IMAGES_DIR), "files": files} class LoadFolderReq(BaseModel): filename: str @app.post("/load_from_folder", response_model=UploadResp) def load_from_folder(req: LoadFolderReq): """Carica immagine dalla cartella IMAGES_DIR per nome file.""" name = req.filename if "/" in name or ".." in name: raise HTTPException(400, "nome file non valido") path = IMAGES_DIR / name if not path.is_file(): raise HTTPException(404, f"file non trovato: {name}") img = cv2.imread(str(path), cv2.IMREAD_COLOR) if img is None: raise HTTPException(400, "immagine non leggibile") iid = _store_image(img) return UploadResp(id=iid, width=img.shape[1], height=img.shape[0]) @app.post("/upload", response_model=UploadResp) async def upload(file: UploadFile = File(...)): data = await file.read() arr = np.frombuffer(data, dtype=np.uint8) img = cv2.imdecode(arr, cv2.IMREAD_COLOR) if img is None: raise HTTPException(400, "Immagine non valida") iid = _store_image(img) return UploadResp(id=iid, width=img.shape[1], height=img.shape[0]) @app.get("/image/{iid}/raw") def image_raw(iid: str): img = _load_image(iid) if img is None: raise HTTPException(404, "Image not found") return Response(_encode_png(img), media_type="image/png") @app.post("/match", response_model=MatchResp) def match(p: MatchParams): model = _load_image(p.model_id) scene = _load_image(p.scene_id) if model is None or scene is None: raise HTTPException(404, "Immagini non trovate") x, y, w, h = p.roi x = max(0, x); y = max(0, y) w = max(1, min(w, model.shape[1] - x)) h = max(1, min(h, model.shape[0] - y)) roi_img = model[y:y + h, x:x + w] m = LineShapeMatcher( num_features=p.num_features, weak_grad=p.weak_grad, strong_grad=p.strong_grad, angle_range_deg=(p.angle_min, p.angle_max), angle_step_deg=p.angle_step, scale_range=(p.scale_min, p.scale_max), scale_step=p.scale_step, spread_radius=p.spread_radius, pyramid_levels=p.pyramid_levels, ) t0 = time.time(); n = m.train(roi_img); t_train = time.time() - t0 nms = p.nms_radius if p.nms_radius > 0 else None t0 = time.time() matches = m.find( scene, min_score=p.min_score, max_matches=p.max_matches, nms_radius=nms, verify_threshold=p.verify_threshold, ) t_find = time.time() - t0 # Render annotated image tg = cv2.cvtColor(roi_img, cv2.COLOR_BGR2GRAY) annotated = _draw_matches(scene, matches, tg) ann_id = _store_image(annotated) return MatchResp( matches=[MatchResult( cx=m_.cx, cy=m_.cy, angle_deg=m_.angle_deg, scale=m_.scale, score=m_.score, bbox_poly=m_.bbox_poly.tolist(), ) for m_ in matches], train_time=t_train, find_time=t_find, num_variants=n, annotated_id=ann_id, ) @app.post("/match_simple", response_model=MatchResp) def match_simple(p: SimpleMatchParams): """Match con parametri user-facing (tipo/simmetria/scala/precisione). Il server deriva i parametri tecnici (num_features, soglie gradiente, piramide, ecc.) dall'analisi automatica della ROI. """ model = _load_image(p.model_id) scene = _load_image(p.scene_id) if model is None or scene is None: raise HTTPException(404, "Immagini non trovate") x, y, w, h = p.roi x = max(0, x); y = max(0, y) w = max(1, min(w, model.shape[1] - x)) h = max(1, min(h, model.shape[0] - y)) roi_img = model[y:y + h, x:x + w] tech = _simple_to_technical(p, roi_img) m = LineShapeMatcher( num_features=tech["num_features"], weak_grad=tech["weak_grad"], strong_grad=tech["strong_grad"], angle_range_deg=(tech["angle_min"], tech["angle_max"]), angle_step_deg=tech["angle_step"], scale_range=(tech["scale_min"], tech["scale_max"]), scale_step=tech["scale_step"], spread_radius=tech["spread_radius"], pyramid_levels=tech["pyramid_levels"], ) t0 = time.time(); n = m.train(roi_img); t_train = time.time() - t0 nms = tech["nms_radius"] if tech["nms_radius"] > 0 else None t0 = time.time() matches = m.find( scene, min_score=tech["min_score"], max_matches=tech["max_matches"], nms_radius=nms, verify_threshold=tech["verify_threshold"], ) t_find = time.time() - t0 tg = cv2.cvtColor(roi_img, cv2.COLOR_BGR2GRAY) annotated = _draw_matches(scene, matches, tg) ann_id = _store_image(annotated) return MatchResp( matches=[MatchResult( cx=mt.cx, cy=mt.cy, angle_deg=mt.angle_deg, scale=mt.scale, score=mt.score, bbox_poly=mt.bbox_poly.tolist(), ) for mt in matches], train_time=t_train, find_time=t_find, num_variants=n, annotated_id=ann_id, ) @app.post("/auto_tune") def tune(p: TuneParams): model = _load_image(p.model_id) if model is None: raise HTTPException(404, "Immagine non trovata") x, y, w, h = p.roi roi_img = model[y:y + h, x:x + w] t = auto_tune(roi_img) return {k: v for k, v in t.items() if not k.startswith("_")} # Mount static app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") def serve(host: str = "127.0.0.1", port: int = 8080): import uvicorn uvicorn.run(app, host=host, port=port, log_level="info") if __name__ == "__main__": serve()