"""FastAPI webapp standalone per PM2D. Endpoint: GET / → HTML UI POST /upload → upload immagine (multipart) POST /match → JSON params + ids → results GET /image/{id}/raw → PNG originale GET /image/{id}/annotated → PNG con overlay match """ from __future__ import annotations import hashlib import os import tempfile import time import uuid from collections import OrderedDict from pathlib import Path import cv2 import numpy as np from fastapi import FastAPI, File, HTTPException, UploadFile from fastapi.responses import HTMLResponse, Response from fastapi.staticfiles import StaticFiles from pydantic import BaseModel def _load_env(root: Path) -> None: """Legge .env in root e popola os.environ (no override se già set).""" f = root / ".env" if not f.exists(): return for line in f.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue k, v = line.split("=", 1) k = k.strip(); v = v.strip().strip('"').strip("'") os.environ.setdefault(k, v) # Root progetto (parent di pm2d/) PROJECT_ROOT = Path(__file__).resolve().parents[2] _load_env(PROJECT_ROOT) _images_dir_raw = os.environ.get("IMAGES_DIR", "Test") IMAGES_DIR = Path(_images_dir_raw) if not IMAGES_DIR.is_absolute(): IMAGES_DIR = PROJECT_ROOT / IMAGES_DIR # Cartella ricette pre-trained (V feature: save/load matcher) RECIPES_DIR = PROJECT_ROOT / "recipes" RECIPES_DIR.mkdir(exist_ok=True) from pm2d.line_matcher import LineShapeMatcher, Match from pm2d.auto_tune import auto_tune WEB_DIR = Path(__file__).parent STATIC_DIR = WEB_DIR / "static" STATIC_DIR.mkdir(exist_ok=True) # Persistenza immagini su disco (sopravvive a restart server) CACHE_DIR = Path(tempfile.gettempdir()) / "pm2d_cache" CACHE_DIR.mkdir(exist_ok=True) # Cache in-memory (soft, ricaricata da disco se mancante) _IMG_CACHE: dict[str, np.ndarray] = {} # Cache matcher addestrati: (roi_hash, params_hash) -> LineShapeMatcher # LRU con capacità limitata _MATCHER_CACHE: OrderedDict = OrderedDict() _MATCHER_CACHE_SIZE = 8 def _matcher_cache_key(roi: np.ndarray, tech: dict) -> str: h = hashlib.md5() h.update(roi.tobytes()) # Solo parametri che influenzano il training relevant = ("num_features", "weak_grad", "strong_grad", "angle_min", "angle_max", "angle_step", "scale_min", "scale_max", "scale_step", "spread_radius", "pyramid_levels") for k in relevant: h.update(f"{k}={tech.get(k)}".encode()) h.update(f"shape={roi.shape}".encode()) return h.hexdigest() def _cache_get_matcher(key: str): m = _MATCHER_CACHE.get(key) if m is not None: _MATCHER_CACHE.move_to_end(key) # LRU touch return m def _cache_put_matcher(key: str, matcher) -> None: _MATCHER_CACHE[key] = matcher _MATCHER_CACHE.move_to_end(key) while len(_MATCHER_CACHE) > _MATCHER_CACHE_SIZE: _MATCHER_CACHE.popitem(last=False) def _store_image(img: np.ndarray) -> str: iid = uuid.uuid4().hex[:12] cv2.imwrite(str(CACHE_DIR / f"{iid}.png"), img) _IMG_CACHE[iid] = img return iid def _load_image(iid: str) -> np.ndarray | None: cached = _IMG_CACHE.get(iid) if cached is not None: return cached p = CACHE_DIR / f"{iid}.png" if not p.exists(): return None img = cv2.imread(str(p)) if img is not None: _IMG_CACHE[iid] = img return img app = FastAPI(title="PM2D Webapp", version="1.0.0") def _encode_png(img: np.ndarray) -> bytes: ok, buf = cv2.imencode(".png", img) if not ok: raise RuntimeError("PNG encode failed") return buf.tobytes() def _draw_matches(scene: np.ndarray, matches: list[Match], template_gray: np.ndarray | None, matcher: "LineShapeMatcher | None" = None) -> np.ndarray: """Disegna match annotati sulla scena. Se matcher e' passato, usa la stessa pipeline di edge filtering (hysteresis weak/strong_grad) e selezione feature usata in training, cosi' l'overlay nel match riflette ESATTAMENTE quello che l'utente ha visto nel preview "Anteprima edge". Inoltre disegna UCS (asse X rosso, Y verde) sul centro pose del match. Senza matcher: fallback Canny (legacy). """ out = scene.copy() H, W = scene.shape[:2] palette = [ (0, 255, 0), (0, 200, 255), (255, 100, 100), (255, 200, 0), (200, 0, 255), (100, 255, 200), (255, 0, 0), (0, 255, 255), ] bin_colors = [ (255, 0, 0), (255, 128, 0), (255, 255, 0), (0, 255, 0), (0, 255, 255), (0, 128, 255), (0, 0, 255), (255, 0, 255), (255, 100, 100), (255, 180, 100), (255, 230, 100), (180, 255, 100), (100, 255, 200), (100, 180, 255), (180, 100, 255), (255, 100, 200), ] for i, m in enumerate(matches): color = palette[i % len(palette)] # Posizione UCS: baricentro feature warpate (default = cx, cy se non disponibile). # Mantiene coerenza con anteprima modello che mostra UCS sul baricentro. ucs_x, ucs_y = float(m.cx), float(m.cy) if template_gray is not None and matcher is not None: t = template_gray th, tw = t.shape cx_t = (tw - 1) / 2.0; cy_t = (th - 1) / 2.0 M = cv2.getRotationMatrix2D((cx_t, cy_t), m.angle_deg, m.scale) M[0, 2] += m.cx - cx_t M[1, 2] += m.cy - cy_t # Background edge filtrati: warpa template + hysteresis warped_gray = cv2.warpAffine( t, M, (W, H), flags=cv2.INTER_LINEAR, borderValue=0) mag, _ = matcher._gradient(warped_gray) if matcher.weak_grad < matcher.strong_grad: edge_mask = matcher._hysteresis_mask(mag) else: edge_mask = mag >= matcher.strong_grad if edge_mask.any(): bg_overlay = np.zeros_like(out) dark = tuple(int(c * 0.35) for c in color) bg_overlay[edge_mask] = dark out = cv2.addWeighted(out, 1.0, bg_overlay, 0.7, 0) # Feature reali del matcher: usa quelle pre-computate della # variante che ha generato il match. Stesse identiche feature # mostrate nell'anteprima modello (ruotate/scalate alla pose). vi = getattr(m, "variant_idx", -1) fx_scene = fy_scene = fb_arr = None if 0 <= vi < len(matcher.variants): lvl0 = matcher.variants[vi].levels[0] # dx/dy sono offsets relativi al CENTRO del template warpato # nelle coordinate del kernel template (gia' pre-ruotate # all'angolo della variante grezza). Per coerenza con la # pose finale m.angle_deg (post-refine), ri-rotazione del # delta angolare (m.angle_deg - var.angle_deg). var = matcher.variants[vi] dang = np.deg2rad(m.angle_deg - var.angle_deg) ca, sa = np.cos(dang), np.sin(dang) dxr = lvl0.dx * ca + lvl0.dy * sa dyr = -lvl0.dx * sa + lvl0.dy * ca fx_scene = m.cx + dxr fy_scene = m.cy + dyr fb_arr = lvl0.bin else: # Fallback: estrai feature dal warpato (perde precisione) _, bins_w = matcher._gradient(warped_gray) fx, fy, fb = matcher._extract_features(mag, bins_w, None) fx_scene = fx.astype(np.float32) fy_scene = fy.astype(np.float32) fb_arr = fb # Disegna feature for k in range(len(fx_scene)): px = int(round(float(fx_scene[k]))) py = int(round(float(fy_scene[k]))) if 0 <= px < W and 0 <= py < H: bcol = bin_colors[int(fb_arr[k]) % len(bin_colors)] cv2.circle(out, (px, py), 2, bcol, -1, cv2.LINE_AA) # UCS sul baricentro feature (in scene coords) if len(fx_scene) > 0: ucs_x = float(np.mean(fx_scene)) ucs_y = float(np.mean(fy_scene)) elif template_gray is not None: # Senza matcher: legacy Canny t = template_gray th, tw = t.shape cx_t = (tw - 1) / 2.0; cy_t = (th - 1) / 2.0 M = cv2.getRotationMatrix2D((cx_t, cy_t), m.angle_deg, m.scale) M[0, 2] += m.cx - cx_t M[1, 2] += m.cy - cy_t edge = cv2.Canny(t, 50, 150) warped = cv2.warpAffine(edge, M, (W, H), flags=cv2.INTER_NEAREST, borderValue=0) mask = warped > 0 if mask.any(): overlay = np.zeros_like(out) overlay[mask] = color out[mask] = (0.3 * out[mask] + 0.7 * overlay[mask]).astype(np.uint8) # bbox poly e linea-marker rimossi (richiesta utente "togli la ROI"): # UCS + edge filtrati gia' identificano pose e orientamento. cx, cy = int(round(ucs_x)), int(round(ucs_y)) # UCS sul centro pose match (richiesta utente: come nell'anteprima # modello). Asse X rosso destra, Y verde basso (image y-down). # Lunghezza derivata dalla diagonale bbox per scala-invariante. L = int(np.linalg.norm(m.bbox_poly[1] - m.bbox_poly[0])) // 2 if L < 10: L = 30 # fallback se bbox degenere ax = np.deg2rad(m.angle_deg) # X axis ruotato (rosso) x_end = (int(cx + L * np.cos(ax)), int(cy - L * np.sin(ax))) cv2.arrowedLine(out, (cx, cy), x_end, (0, 0, 255), 2, cv2.LINE_AA, tipLength=0.2) cv2.putText(out, "X", (x_end[0] + 4, x_end[1] + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA) # Y axis perpendicolare (verde, +90° in image coords = giu' visivo) y_end = (int(cx + L * np.cos(ax + np.pi / 2)), int(cy - L * np.sin(ax + np.pi / 2))) cv2.arrowedLine(out, (cx, cy), y_end, (0, 255, 0), 2, cv2.LINE_AA, tipLength=0.2) cv2.putText(out, "Y", (y_end[0] + 4, y_end[1] + 12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA) # Origine UCS: cerchio bianco con bordo nero cv2.circle(out, (cx, cy), 4, (0, 0, 0), -1, cv2.LINE_AA) cv2.circle(out, (cx, cy), 3, (255, 255, 255), -1, cv2.LINE_AA) label = f"#{i+1} {m.angle_deg:.0f}d s={m.scale:.2f} {m.score:.2f}" cv2.putText(out, label, (cx + 12, cy - 12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2, cv2.LINE_AA) return out # ---------------- Models ---------------- class UploadResp(BaseModel): id: str width: int height: int class MatchParams(BaseModel): model_id: str scene_id: str roi: list[int] # [x, y, w, h] nell'immagine modello angle_min: float = 0.0 angle_max: float = 360.0 angle_step: float = 5.0 scale_min: float = 1.0 scale_max: float = 1.0 scale_step: float = 0.1 min_score: float = 0.55 max_matches: int = 25 nms_radius: int = 0 num_features: int = 96 weak_grad: float = 30.0 strong_grad: float = 60.0 spread_radius: int = 5 pyramid_levels: int = 3 verify_threshold: float = 0.4 class MatchResult(BaseModel): cx: float cy: float angle_deg: float scale: float score: float bbox_poly: list[list[float]] class MatchResp(BaseModel): matches: list[MatchResult] train_time: float find_time: float num_variants: int annotated_id: str diag: dict | None = None # CC: diagnostica pipeline (drop reasons) class TuneParams(BaseModel): model_id: str roi: list[int] # ---------- User-facing (simple) params ---------- SYMMETRY_TO_ANGLE_MAX = { "invariante": 0.0, # oggetto simmetrico a rotazione totale (cerchi): 1 variante "nessuna": 360.0, "bilaterale": 180.0, "rot_3": 120.0, "rot_4": 90.0, "rot_6": 60.0, "rot_8": 45.0, } SCALE_PRESETS = { "fissa": (1.0, 1.0, 0.1), "mini": (0.9, 1.1, 0.05), # ±10% "medio": (0.75, 1.25, 0.05), # ±25% "max": (0.5, 1.5, 0.05), # ±50% } PRECISION_ANGLE_STEP = { "veloce": 10.0, "normale": 5.0, "preciso": 2.0, } # "Filtro falsi positivi" = mapping semantico del verify NCC threshold. # Un operatore sceglie il livello di rigore, non un numero astratto. FILTRO_FP_MAP = { "off": 0.0, # disabilitato: mantieni tutti i match shape-based "leggero": 0.30, # tollera variazioni intensità/illuminazione forti "medio": 0.50, # default bilanciato (consigliato) "forte": 0.70, # scarta match con intensità molto diversa dal template } class SimpleMatchParams(BaseModel): model_id: str scene_id: str roi: list[int] tipo: str = "intero" # "intero" | "parziale" simmetria: str = "nessuna" # chiave SYMMETRY_TO_ANGLE_MAX scala: str = "fissa" # chiave SCALE_PRESETS precisione: str = "normale" # chiave PRECISION_ANGLE_STEP filtro_fp: str = "medio" # chiave FILTRO_FP_MAP penalita_scala: float = 0.0 # 0 = score shape invariante, >0 = penalizza scala != 1 min_score: float = 0.65 max_matches: int = 25 # --- Halcon-mode flags (default off = backward compat) --- # Init-time (richiede ri-train se cambiato) use_polarity: bool = False # F: 16 bin orientation mod 2pi use_gpu: bool = False # R: OpenCL UMat (silent fallback) # Find-time (no retrain) min_recall: float = 0.0 # M: filtra match con poche feature combaciate use_soft_score: bool = False # Y: cosine sim continua dei gradients subpixel_lm: bool = False # Z: precisione 0.05 px nms_iou_threshold: float = 0.3 # A: IoU bbox poligonale coarse_stride: int = 1 # sub-sampling top-level (>=1) pyramid_propagate: bool = False # propagazione candidati top->full greediness: float = 0.0 # early-exit kernel (0..1) refine_pose_joint: bool = False # Nelder-Mead 3D (cx, cy, angle) search_roi: list[int] | None = None # [x, y, w, h] limita area def _simple_to_technical( p: SimpleMatchParams, roi_img: np.ndarray, ) -> dict: """Converti parametri user-facing → tecnici usando analisi della ROI.""" from pm2d.auto_tune import auto_tune as _auto tune = _auto(roi_img) h, w = roi_img.shape[:2] min_side = min(h, w) # Feature count: parziale = meno feature (area minore) nf = tune["num_features"] if p.tipo == "parziale": nf = max(32, int(nf * 0.6)) # Piramide derivata da dimensione ROI if min_side < 60: pyr = 1 elif min_side < 150: pyr = 2 elif min_side < 400: pyr = 3 else: pyr = 4 # Spread radius ~2-3% del lato minimo spread = max(3, min(10, int(round(min_side * 0.03)))) angle_max = SYMMETRY_TO_ANGLE_MAX.get(p.simmetria, 360.0) smin, smax, sstep = SCALE_PRESETS.get(p.scala, (1.0, 1.0, 0.1)) ang_step = PRECISION_ANGLE_STEP.get(p.precisione, 5.0) return { "num_features": nf, "weak_grad": tune["weak_grad"], "strong_grad": tune["strong_grad"], "spread_radius": spread, "pyramid_levels": pyr, "angle_min": 0.0, "angle_max": angle_max, "angle_step": ang_step, "scale_min": smin, "scale_max": smax, "scale_step": sstep, "min_score": p.min_score, "max_matches": p.max_matches, "nms_radius": 0, "verify_threshold": FILTRO_FP_MAP.get(p.filtro_fp, 0.35), "scale_penalty": p.penalita_scala, } # ---------------- Endpoints ---------------- @app.get("/", response_class=HTMLResponse) def index(): html_path = STATIC_DIR / "index.html" return HTMLResponse(html_path.read_text(encoding="utf-8")) @app.post("/upload_to_folder") async def upload_to_folder(file: UploadFile = File(...)): """Salva file caricato nella cartella IMAGES_DIR. Ritorna lista aggiornata.""" if not IMAGES_DIR.is_dir(): raise HTTPException(500, f"IMAGES_DIR non esiste: {IMAGES_DIR}") # Sanitizza nome file (no traversal) name = Path(file.filename or "upload.png").name if not name: raise HTTPException(400, "nome file vuoto") ext = Path(name).suffix.lower() allowed = {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"} if ext not in allowed: raise HTTPException(400, f"estensione non supportata: {ext}") # Leggi contenuto e valida come immagine data = await file.read() arr = np.frombuffer(data, dtype=np.uint8) img = cv2.imdecode(arr, cv2.IMREAD_COLOR) if img is None: raise HTTPException(400, "file non è un'immagine valida") # Evita overwrite: se esiste, aggiungi suffisso numerico target = IMAGES_DIR / name if target.exists(): stem = target.stem; suffix = target.suffix i = 1 while True: alt = IMAGES_DIR / f"{stem}_{i}{suffix}" if not alt.exists(): target = alt; break i += 1 # Scrivi su disco with open(target, "wb") as f: f.write(data) # Ritorna lista aggiornata return { "saved_as": target.name, "dir": str(IMAGES_DIR), "files": sorted( p.name for p in IMAGES_DIR.iterdir() if p.is_file() and p.suffix.lower() in allowed ), } @app.get("/folder_image/{filename}") def folder_image(filename: str, w: int = 120): """Serve thumbnail PNG dell'immagine IMAGES_DIR (scalata a width w).""" if "/" in filename or ".." in filename: raise HTTPException(400, "nome non valido") path = IMAGES_DIR / filename if not path.is_file(): raise HTTPException(404, "non trovato") img = cv2.imread(str(path), cv2.IMREAD_COLOR) if img is None: raise HTTPException(400, "non leggibile") h0, w0 = img.shape[:2] if w0 > w: sc = w / w0 img = cv2.resize(img, (w, int(h0 * sc)), interpolation=cv2.INTER_AREA) return Response(_encode_png(img), media_type="image/png", headers={"Cache-Control": "public, max-age=3600"}) @app.get("/images") def list_images(): """Lista file immagine nella cartella configurata in IMAGES_DIR.""" if not IMAGES_DIR.is_dir(): return {"dir": str(IMAGES_DIR), "files": []} exts = {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"} files = sorted( p.name for p in IMAGES_DIR.iterdir() if p.is_file() and p.suffix.lower() in exts ) return {"dir": str(IMAGES_DIR), "files": files} class LoadFolderReq(BaseModel): filename: str @app.post("/load_from_folder", response_model=UploadResp) def load_from_folder(req: LoadFolderReq): """Carica immagine dalla cartella IMAGES_DIR per nome file.""" name = req.filename if "/" in name or ".." in name: raise HTTPException(400, "nome file non valido") path = IMAGES_DIR / name if not path.is_file(): raise HTTPException(404, f"file non trovato: {name}") img = cv2.imread(str(path), cv2.IMREAD_COLOR) if img is None: raise HTTPException(400, "immagine non leggibile") iid = _store_image(img) return UploadResp(id=iid, width=img.shape[1], height=img.shape[0]) @app.post("/upload", response_model=UploadResp) async def upload(file: UploadFile = File(...)): data = await file.read() arr = np.frombuffer(data, dtype=np.uint8) img = cv2.imdecode(arr, cv2.IMREAD_COLOR) if img is None: raise HTTPException(400, "Immagine non valida") iid = _store_image(img) return UploadResp(id=iid, width=img.shape[1], height=img.shape[0]) @app.get("/image/{iid}/raw") def image_raw(iid: str): img = _load_image(iid) if img is None: raise HTTPException(404, "Image not found") return Response(_encode_png(img), media_type="image/png") @app.post("/match", response_model=MatchResp) def match(p: MatchParams): model = _load_image(p.model_id) scene = _load_image(p.scene_id) if model is None or scene is None: raise HTTPException(404, "Immagini non trovate") x, y, w, h = p.roi x = max(0, x); y = max(0, y) w = max(1, min(w, model.shape[1] - x)) h = max(1, min(h, model.shape[0] - y)) roi_img = model[y:y + h, x:x + w] tech_for_cache = { "num_features": p.num_features, "weak_grad": p.weak_grad, "strong_grad": p.strong_grad, "angle_min": p.angle_min, "angle_max": p.angle_max, "angle_step": p.angle_step, "scale_min": p.scale_min, "scale_max": p.scale_max, "scale_step": p.scale_step, "spread_radius": p.spread_radius, "pyramid_levels": p.pyramid_levels, } key = _matcher_cache_key(roi_img, tech_for_cache) m = _cache_get_matcher(key) if m is None: m = LineShapeMatcher( num_features=p.num_features, weak_grad=p.weak_grad, strong_grad=p.strong_grad, angle_range_deg=(p.angle_min, p.angle_max), angle_step_deg=p.angle_step, scale_range=(p.scale_min, p.scale_max), scale_step=p.scale_step, spread_radius=p.spread_radius, pyramid_levels=p.pyramid_levels, ) t0 = time.time(); n = m.train(roi_img); t_train = time.time() - t0 _cache_put_matcher(key, m) else: n = len(m.variants); t_train = 0.0 nms = p.nms_radius if p.nms_radius > 0 else None t0 = time.time() matches = m.find( scene, min_score=p.min_score, max_matches=p.max_matches, nms_radius=nms, verify_threshold=p.verify_threshold, ) t_find = time.time() - t0 # Render annotated image tg = cv2.cvtColor(roi_img, cv2.COLOR_BGR2GRAY) annotated = _draw_matches(scene, matches, tg, matcher=m) ann_id = _store_image(annotated) return MatchResp( matches=[MatchResult( cx=m_.cx, cy=m_.cy, angle_deg=m_.angle_deg, scale=m_.scale, score=m_.score, bbox_poly=m_.bbox_poly.tolist(), ) for m_ in matches], train_time=t_train, find_time=t_find, num_variants=n, annotated_id=ann_id, diag=m.get_last_diag() if hasattr(m, "get_last_diag") else None, ) @app.post("/match_simple", response_model=MatchResp) def match_simple(p: SimpleMatchParams): """Match con parametri user-facing (tipo/simmetria/scala/precisione). Il server deriva i parametri tecnici (num_features, soglie gradiente, piramide, ecc.) dall'analisi automatica della ROI. """ model = _load_image(p.model_id) scene = _load_image(p.scene_id) if model is None or scene is None: raise HTTPException(404, "Immagini non trovate") x, y, w, h = p.roi x = max(0, x); y = max(0, y) w = max(1, min(w, model.shape[1] - x)) h = max(1, min(h, model.shape[0] - y)) roi_img = model[y:y + h, x:x + w] tech = _simple_to_technical(p, roi_img) key = _matcher_cache_key(roi_img, tech) # Halcon-mode init params: incidono sul training, includere in cache key halcon_init_key = f"|pol={p.use_polarity}|gpu={p.use_gpu}" key = key + halcon_init_key m = _cache_get_matcher(key) if m is None: m = LineShapeMatcher( num_features=tech["num_features"], weak_grad=tech["weak_grad"], strong_grad=tech["strong_grad"], angle_range_deg=(tech["angle_min"], tech["angle_max"]), angle_step_deg=tech["angle_step"], scale_range=(tech["scale_min"], tech["scale_max"]), scale_step=tech["scale_step"], spread_radius=tech["spread_radius"], pyramid_levels=tech["pyramid_levels"], use_polarity=p.use_polarity, use_gpu=p.use_gpu, ) t0 = time.time(); n = m.train(roi_img); t_train = time.time() - t0 _cache_put_matcher(key, m) else: n = len(m.variants); t_train = 0.0 nms = tech["nms_radius"] if tech["nms_radius"] > 0 else None search_roi_t = tuple(p.search_roi) if p.search_roi else None t0 = time.time() matches = m.find( scene, min_score=tech["min_score"], max_matches=tech["max_matches"], nms_radius=nms, verify_threshold=tech["verify_threshold"], scale_penalty=tech.get("scale_penalty", 0.0), # Halcon-mode flags min_recall=p.min_recall, use_soft_score=p.use_soft_score, subpixel_lm=p.subpixel_lm, nms_iou_threshold=p.nms_iou_threshold, coarse_stride=p.coarse_stride, pyramid_propagate=p.pyramid_propagate, greediness=p.greediness, refine_pose_joint=p.refine_pose_joint, search_roi=search_roi_t, ) t_find = time.time() - t0 tg = cv2.cvtColor(roi_img, cv2.COLOR_BGR2GRAY) annotated = _draw_matches(scene, matches, tg, matcher=m) ann_id = _store_image(annotated) return MatchResp( matches=[MatchResult( cx=mt.cx, cy=mt.cy, angle_deg=mt.angle_deg, scale=mt.scale, score=mt.score, bbox_poly=mt.bbox_poly.tolist(), ) for mt in matches], train_time=t_train, find_time=t_find, num_variants=n, annotated_id=ann_id, diag=m.get_last_diag() if hasattr(m, "get_last_diag") else None, ) @app.post("/auto_tune") def tune(p: TuneParams): model = _load_image(p.model_id) if model is None: raise HTTPException(404, "Immagine non trovata") x, y, w, h = p.roi roi_img = model[y:y + h, x:x + w] t = auto_tune(roi_img) # Esponi parametri tecnici + meta diagnostica (_self_score, _validation, # _symmetry_order, _orient_entropy) per feedback UI. return t # --- V: Save/Load ricette pre-trained --- class SaveRecipeParams(BaseModel): model_id: str scene_id: str | None = None roi: list[int] # Riusa stessi param simple per training equivalente tipo: str = "intero" simmetria: str = "nessuna" scala: str = "fissa" precisione: str = "normale" use_polarity: bool = False use_gpu: bool = False name: str # nome file ricetta (no path) class EdgePreviewParams(BaseModel): model_id: str roi: list[int] weak_grad: float = 30.0 strong_grad: float = 60.0 num_features: int = 96 min_feature_spacing: int = 3 use_polarity: bool = False @app.post("/preview_edges") def preview_edges(p: EdgePreviewParams): """Estrae edge feature dalla ROI con i parametri dati e ritorna immagine annotata con i pixel selezionati come overlay. Permette tuning interattivo delle soglie weak/strong_grad e num_features per "togliere le sporcizie" (rumore di sfondo, edge spuri) prima di trainare il matcher vero. """ model = _load_image(p.model_id) if model is None: raise HTTPException(404, "Modello non trovato") x, y, w, h = p.roi H_m, W_m = model.shape[:2] x = max(0, min(int(x), W_m - 1)); y = max(0, min(int(y), H_m - 1)) w = max(1, min(int(w), W_m - x)); h = max(1, min(int(h), H_m - y)) roi_img = model[y:y + h, x:x + w] # Matcher temporaneo solo per estrazione feature (no train completo) m = LineShapeMatcher( weak_grad=p.weak_grad, strong_grad=p.strong_grad, num_features=p.num_features, min_feature_spacing=p.min_feature_spacing, use_polarity=p.use_polarity, ) gray = cv2.cvtColor(roi_img, cv2.COLOR_BGR2GRAY) if roi_img.ndim == 3 else roi_img mag, bins = m._gradient(gray) fx, fy, fb = m._extract_features(mag, bins, None) # Mostra anche i pixel "weak/strong" come heatmap di sfondo out = roi_img.copy() if roi_img.ndim == 3 else cv2.cvtColor(roi_img, cv2.COLOR_GRAY2BGR) # Overlay magnitude leggera mag_norm = np.clip(mag / max(1.0, mag.max()) * 255, 0, 255).astype(np.uint8) mag_color = cv2.applyColorMap(mag_norm, cv2.COLORMAP_BONE) out = cv2.addWeighted(out, 0.6, mag_color, 0.4, 0) # Pixel "strong" con hysteresis: contorno verde scuro tenue if m.weak_grad < m.strong_grad: edge_mask = m._hysteresis_mask(mag).astype(np.uint8) * 255 else: edge_mask = (mag >= m.strong_grad).astype(np.uint8) * 255 edge_overlay = np.zeros_like(out) edge_overlay[edge_mask > 0] = (0, 80, 0) # verde scuro out = cv2.addWeighted(out, 1.0, edge_overlay, 0.5, 0) # Feature scelte: cerchietti colorati per bin bin_colors = [ (255, 0, 0), (255, 128, 0), (255, 255, 0), (0, 255, 0), (0, 255, 255), (0, 128, 255), (0, 0, 255), (255, 0, 255), (255, 100, 100), (255, 180, 100), (255, 230, 100), (180, 255, 100), (100, 255, 200), (100, 180, 255), (180, 100, 255), (255, 100, 200), ] for i in range(len(fx)): b = int(fb[i]) col = bin_colors[b % len(bin_colors)] cv2.circle(out, (int(fx[i]), int(fy[i])), 2, col, -1, cv2.LINE_AA) # UCS sul baricentro feature (richiesta utente): assi X rosso, Y verde bary_cx = bary_cy = None if len(fx) > 0: bary_cx = float(np.mean(fx)) bary_cy = float(np.mean(fy)) bx, by = int(round(bary_cx)), int(round(bary_cy)) axis_len = max(20, int(0.15 * max(out.shape[:2]))) # X axis (rosso, verso destra) cv2.arrowedLine(out, (bx, by), (bx + axis_len, by), (0, 0, 255), 2, cv2.LINE_AA, tipLength=0.2) cv2.putText(out, "X", (bx + axis_len + 4, by + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA) # Y axis (verde, verso il basso = convenzione image y-down) cv2.arrowedLine(out, (bx, by), (bx, by + axis_len), (0, 255, 0), 2, cv2.LINE_AA, tipLength=0.2) cv2.putText(out, "Y", (bx + 4, by + axis_len + 12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA) # Origine: cerchio bianco con bordo nero cv2.circle(out, (bx, by), 4, (0, 0, 0), -1, cv2.LINE_AA) cv2.circle(out, (bx, by), 3, (255, 255, 255), -1, cv2.LINE_AA) img_id = _store_image(out) n_edge_strong = int((mag >= m.strong_grad).sum()) n_edge_total = int(edge_mask.sum() / 255) return { "preview_id": img_id, "n_features": len(fx), "n_edge_strong": n_edge_strong, "n_edge_after_hysteresis": n_edge_total, "mag_max": float(mag.max()), "mag_p50": float(np.percentile(mag, 50)), "mag_p85": float(np.percentile(mag, 85)), "ucs_baricentro": ( {"cx": round(bary_cx, 2), "cy": round(bary_cy, 2)} if bary_cx is not None else None ), } @app.post("/recipes") def save_recipe(p: SaveRecipeParams): """Allena matcher e salva su disco come ricetta riutilizzabile.""" model = _load_image(p.model_id) if model is None: raise HTTPException(404, "Modello non trovato") x, y, w, h = p.roi roi_img = model[y:y + h, x:x + w] sp = SimpleMatchParams( model_id=p.model_id, scene_id=p.scene_id or p.model_id, roi=p.roi, tipo=p.tipo, simmetria=p.simmetria, scala=p.scala, precisione=p.precisione, use_polarity=p.use_polarity, use_gpu=p.use_gpu, ) tech = _simple_to_technical(sp, roi_img) m = LineShapeMatcher( num_features=tech["num_features"], weak_grad=tech["weak_grad"], strong_grad=tech["strong_grad"], angle_range_deg=(tech["angle_min"], tech["angle_max"]), angle_step_deg=tech["angle_step"], scale_range=(tech["scale_min"], tech["scale_max"]), scale_step=tech["scale_step"], spread_radius=tech["spread_radius"], pyramid_levels=tech["pyramid_levels"], use_polarity=p.use_polarity, use_gpu=p.use_gpu, ) m.train(roi_img) safe_name = "".join(c for c in p.name if c.isalnum() or c in "._-") if not safe_name: raise HTTPException(400, "Nome ricetta non valido") if not safe_name.endswith(".npz"): safe_name += ".npz" target = RECIPES_DIR / safe_name m.save_model(str(target)) return {"name": safe_name, "size": target.stat().st_size, "n_variants": len(m.variants)} @app.get("/recipes") def list_recipes(): files = [] if RECIPES_DIR.is_dir(): for f in sorted(RECIPES_DIR.glob("*.npz")): files.append({"name": f.name, "size": f.stat().st_size}) return {"files": files, "dir": str(RECIPES_DIR)} # Cache di matcher caricati da .npz (V feature). Key: nome ricetta. _RECIPE_MATCHERS: OrderedDict = OrderedDict() _RECIPE_MATCHERS_SIZE = 4 @app.post("/recipes/{name}/load") def load_recipe(name: str): """Carica ricetta .npz e popola cache matcher in memoria. Una volta caricata, /match_recipe la usa direttamente senza re-train. Halcon-equivalent read_shape_model + handle. """ safe_name = "".join(c for c in name if c.isalnum() or c in "._-") if not safe_name.endswith(".npz"): safe_name += ".npz" path = RECIPES_DIR / safe_name if not path.is_file(): raise HTTPException(404, f"Ricetta non trovata: {safe_name}") m = LineShapeMatcher.load_model(str(path)) _RECIPE_MATCHERS[safe_name] = m _RECIPE_MATCHERS.move_to_end(safe_name) while len(_RECIPE_MATCHERS) > _RECIPE_MATCHERS_SIZE: _RECIPE_MATCHERS.popitem(last=False) return { "name": safe_name, "n_variants": len(m.variants), "template_size": list(m.template_size), "use_polarity": m.use_polarity, } class RecipeMatchParams(BaseModel): recipe: str scene_id: str # Solo find-time params (training gia' fatto offline) min_score: float = 0.65 max_matches: int = 25 min_recall: float = 0.0 use_soft_score: bool = False subpixel_lm: bool = False nms_iou_threshold: float = 0.3 coarse_stride: int = 1 pyramid_propagate: bool = False greediness: float = 0.0 refine_pose_joint: bool = False search_roi: list[int] | None = None verify_threshold: float = 0.5 scale_penalty: float = 0.0 @app.post("/match_recipe", response_model=MatchResp) def match_recipe(p: RecipeMatchParams): """Match con ricetta pre-trained: zero training, solo find.""" safe_name = p.recipe if p.recipe.endswith(".npz") else f"{p.recipe}.npz" m = _RECIPE_MATCHERS.get(safe_name) if m is None: # Auto-load on demand path = RECIPES_DIR / safe_name if not path.is_file(): raise HTTPException(404, f"Ricetta non trovata: {safe_name}") m = LineShapeMatcher.load_model(str(path)) _RECIPE_MATCHERS[safe_name] = m scene = _load_image(p.scene_id) if scene is None: raise HTTPException(404, "Scena non trovata") search_roi_t = tuple(p.search_roi) if p.search_roi else None t0 = time.time() matches = m.find( scene, min_score=p.min_score, max_matches=p.max_matches, verify_threshold=p.verify_threshold, scale_penalty=p.scale_penalty, min_recall=p.min_recall, use_soft_score=p.use_soft_score, subpixel_lm=p.subpixel_lm, nms_iou_threshold=p.nms_iou_threshold, coarse_stride=p.coarse_stride, pyramid_propagate=p.pyramid_propagate, greediness=p.greediness, refine_pose_joint=p.refine_pose_joint, search_roi=search_roi_t, ) t_find = time.time() - t0 tg = m.template_gray if m.template_gray is not None else np.zeros((1, 1), np.uint8) annotated = _draw_matches(scene, matches, tg, matcher=m) ann_id = _store_image(annotated) return MatchResp( matches=[MatchResult( cx=mt.cx, cy=mt.cy, angle_deg=mt.angle_deg, scale=mt.scale, score=mt.score, bbox_poly=mt.bbox_poly.tolist(), ) for mt in matches], train_time=0.0, find_time=t_find, num_variants=len(m.variants), annotated_id=ann_id, diag=m.get_last_diag() if hasattr(m, "get_last_diag") else None, ) # Mount static app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") def serve(host: str = "127.0.0.1", port: int = 8080): import uvicorn uvicorn.run(app, host=host, port=port, log_level="info") if __name__ == "__main__": serve()