"""Shape-based matcher stile linemod (line2Dup) - Python puro + numpy/OpenCV. Porting algoritmico dell'idea di `meiqua/shape_based_matching` (no MIPP/SIMD — equivalente usando vettorizzazione numpy). Training (costoso, fatto una volta per ricetta): - Per ogni variante (angolo, scala) del template: 1. Sobel → magnitude + orientation 2. Quantizzazione orientation in N_BINS bin (modulo π, edge simmetrici) 3. Estrazione feature sparse top-magnitude con spacing minimo 4. Salvataggio feature = liste (dx, dy, bin) relative al centro-modello Matching (veloce): - Scena processata una sola volta per livello di piramide: Sobel → magnitude → quant orientation → spread (dilate per bin) → response map (N_BINS, H, W) — bit b acceso dove orientamento b presente. - Per ogni variante: score_map[y,x] = Σ resp[b_i][y+dy_i, x+dx_i] / N_features implementato con shift-add vettorizzato (numpy). - Piramide: matching top-level (basso costo, soglia ridotta) + refinement a risoluzione piena attorno ai candidati. Il training supporta una `mask` binaria per modellare solo una regione parziale della ROI (modello non-rettangolare). """ from __future__ import annotations import math import os from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass import cv2 import numpy as np _GOLDEN = (math.sqrt(5.0) - 1.0) / 2.0 # ≈ 0.618 from pm2d._jit_kernels import ( score_by_shift as _jit_score_by_shift, score_bitmap as _jit_score_bitmap, score_bitmap_rescored as _jit_score_bitmap_rescored, score_bitmap_greedy as _jit_score_bitmap_greedy, top_max_per_variant as _jit_top_max_per_variant, popcount_density as _jit_popcount, HAS_NUMBA, ) N_BINS = 8 # default: orientamento mod π (no polarity) N_BINS_POL = 16 # use_polarity=True: orientamento mod 2π (con polarity) def opencl_available() -> bool: """Ritorna True se OpenCV ha backend OpenCL disponibile (GPU).""" try: return bool(cv2.ocl.haveOpenCL()) except Exception: return False def set_gpu_enabled(enabled: bool) -> bool: """Abilita/disabilita backend OpenCL globale di OpenCV. Quando attivato, Sobel/dilate/warpAffine usano UMat con dispatch automatico a kernel GPU (Intel UHD, AMD, NVIDIA via OpenCL ICD). Speedup tipico: 1.5-3x su Sobel+dilate per scene 1920x1080, overhead trascurabile per scene < 640px (transfer CPU<->GPU domina). Halcon-equivalent: 'find_shape_model' con backend GPU integrato. Ritorna True se l'attivazione e' riuscita. """ if not opencl_available(): return False cv2.ocl.setUseOpenCL(bool(enabled)) return cv2.ocl.useOpenCL() def _poly_iou(p1: np.ndarray, p2: np.ndarray) -> float: """IoU tra due poligoni convessi (4 vertici, float32) via cv2.intersectConvexConvex. Usa OpenCV (cv2.intersectConvexConvex) per intersezione esatta: ritorna area intersezione / area unione. Robusto a rotazioni qualsiasi (anti-orarie/orarie) - cv2 normalizza orientamento. """ a1 = float(cv2.contourArea(p1)) a2 = float(cv2.contourArea(p2)) if a1 <= 0 or a2 <= 0: return 0.0 inter_area, _ = cv2.intersectConvexConvex( p1.astype(np.float32), p2.astype(np.float32), ) inter_area = float(inter_area) if inter_area <= 0: return 0.0 union = a1 + a2 - inter_area return inter_area / union if union > 0 else 0.0 def _oriented_bbox_polygon( cx: float, cy: float, w: float, h: float, angle_deg: float, ) -> np.ndarray: """Ritorna 4 vertici (float32, shape (4,2)) del bbox orientato. Convenzione coerente con cv2.getRotationMatrix2D usato nel train: rotazione counter-clockwise (matematica) ma sistema immagine y-down, quindi visivamente orario. """ w2, h2 = w / 2.0, h / 2.0 # Vertici template non-ruotato centrati al centro corners = np.array([[-w2, -h2], [w2, -h2], [w2, h2], [-w2, h2]], np.float32) a = np.deg2rad(angle_deg) c, s = np.cos(a), np.sin(a) # cv2.getRotationMatrix2D con angolo a positivo applica R = [[c,s],[-s,c]] # (ruota counter-clockwise nel sistema matematico; y-down → orario) R = np.array([[c, s], [-s, c]], np.float32) rotated = corners @ R.T rotated[:, 0] += cx rotated[:, 1] += cy return rotated @dataclass class Match: cx: float cy: float angle_deg: float scale: float score: float bbox_poly: np.ndarray # (4, 2) float32 - 4 vertici ordinati (ruotato) variant_idx: int = -1 # indice variante usata (per overlay coerente) @dataclass class _LevelFeatures: """Feature piramidate (livello l = downsample /2^l).""" dx: np.ndarray # int32 dy: np.ndarray # int32 bin: np.ndarray # int8 n: int @dataclass class _Variant: """Template precomputato (una pose).""" angle_deg: float scale: float # Feature piramide: levels[0] = full-res, levels[l] = /2^l levels: list[_LevelFeatures] # Bbox kernel (per visualizzazione / limiti ricerca) kh: int kw: int cx_local: float # centro-modello dentro al bbox kernel cy_local: float # Indice template view (X feature - multi-template ensemble). # 0 = template principale del train(); 1+ = view aggiunte via # add_template_view(). Usato in _verify_ncc/_compute_recall per # scegliere il template gray corretto per match. view_idx: int = 0 class LineShapeMatcher: """Shape-based matcher linemod-style - Python/numpy, no SIMD.""" def __init__( self, num_features: int = 96, weak_grad: float = 30.0, strong_grad: float = 60.0, angle_range_deg: tuple[float, float] = (0.0, 360.0), angle_step_deg: float = 5.0, scale_range: tuple[float, float] = (1.0, 1.0), scale_step: float = 0.1, spread_radius: int = 4, min_feature_spacing: int = 3, pyramid_levels: int = 2, top_score_factor: float = 0.5, n_threads: int | None = None, use_polarity: bool = False, use_gpu: bool = False, ) -> None: self.num_features = num_features self.weak_grad = weak_grad self.strong_grad = strong_grad self.angle_range_deg = angle_range_deg self.angle_step_deg = angle_step_deg self.scale_range = scale_range self.scale_step = scale_step self.spread_radius = spread_radius self.min_feature_spacing = min_feature_spacing self.pyramid_levels = max(1, pyramid_levels) self.top_score_factor = top_score_factor self.n_threads = n_threads or max(1, (os.cpu_count() or 2) - 1) # Polarity-aware: 16 bin (orientamento mod 2π) usando bitmap uint16. # Distingue edge "chiaro→scuro" da "scuro→chiaro" → 2x selettività. # Usare quando background di scena varia (chiaro/scuro) e orientamento # template e' direzionale. self.use_polarity = use_polarity self._n_bins = N_BINS_POL if use_polarity else N_BINS # GPU offload per Sobel/dilate/warpAffine via cv2.UMat (OpenCL). # Effettivo solo se opencl_available(); altrimenti silent fallback CPU. self.use_gpu = bool(use_gpu and opencl_available()) if self.use_gpu: cv2.ocl.setUseOpenCL(True) self.variants: list[_Variant] = [] self.template_size: tuple[int, int] = (0, 0) self.template_gray: np.ndarray | None = None # Maschera usata in training (propagata al refine per coerenza). self._train_mask: np.ndarray | None = None # Multi-template ensemble (X feature): N view dello stesso pezzo # (chiari/scuri, condizioni diverse). Template principale e' [0], # view aggiunte via add_template_view() sono [1+]. Match restituisce # la view che ha matchato meglio. self._view_templates: list[tuple[np.ndarray, np.ndarray | None]] = [] # --- Helpers ------------------------------------------------------- @staticmethod def _to_gray(img: np.ndarray) -> np.ndarray: if img.ndim == 3: return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) return img def _gradient(self, gray) -> tuple[np.ndarray, np.ndarray]: # Accetta np.ndarray o cv2.UMat (per path GPU OpenCL). gx = cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=3) gy = cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=3) mag = cv2.magnitude(gx, gy) # Quantizzazione orientation richiede CPU array (np ops): scarica # da GPU se necessario. if isinstance(gx, cv2.UMat): gx = gx.get(); gy = gy.get(); mag = mag.get() ang = np.arctan2(gy, gx) # [-π, π] if self.use_polarity: # Mod 2π: bin 0..15 codifica direzione + polarity edge. ang_full = np.where(ang < 0, ang + 2.0 * np.pi, ang) bins = np.floor(ang_full / (2.0 * np.pi) * N_BINS_POL).astype(np.int16) bins = np.clip(bins, 0, N_BINS_POL - 1) else: ang_mod = np.where(ang < 0, ang + np.pi, ang) bins = np.floor(ang_mod / np.pi * N_BINS).astype(np.int16) bins = np.clip(bins, 0, N_BINS - 1) return mag, bins def _hysteresis_mask(self, mag: np.ndarray) -> np.ndarray: """Edge mask con hysteresis (Halcon Contrast='auto' two-threshold). Procedura: 1. seed = pixel con mag >= strong_grad (edge nitidi) 2. weak = pixel con mag >= weak_grad (edge candidati) 3. Espande seed dentro weak via componenti connesse 8-vicini Risultato: edge debole connesso a edge forte viene PROMOSSO a feature valida; edge debole isolato (rumore) viene SCARTATO. Riduce sia falsi-positivi (rumore puro) sia falsi-negativi (continuita' interrotta su edge sottili a basso contrasto). """ weak = (mag >= self.weak_grad).astype(np.uint8) strong = (mag >= self.strong_grad).astype(np.uint8) # connectedComponentsWithStats su weak: per ogni componente, # se contiene almeno un pixel strong → tutto componente accettato n_lab, labels = cv2.connectedComponents(weak, connectivity=8) if n_lab <= 1: return strong.astype(bool) # Label dei pixel strong: marker per componenti da accettare strong_labels = np.unique(labels[strong > 0]) strong_labels = strong_labels[strong_labels > 0] # 0 = bg if len(strong_labels) == 0: return strong.astype(bool) # Mask = appartiene a label di componente "promosso" keep = np.isin(labels, strong_labels) return keep def _extract_features( self, mag: np.ndarray, bins: np.ndarray, mask: np.ndarray | None, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: if mask is not None: mag = np.where(mask > 0, mag, 0) # Halcon-style edge selection: hysteresis tra weak_grad e strong_grad. # Edge weak connessi a edge strong sono inclusi (continuita' bordi). # Se weak_grad >= strong_grad → fallback a soglia singola strong. if self.weak_grad < self.strong_grad: edge = self._hysteresis_mask(mag) else: edge = mag >= self.strong_grad ys, xs = np.where(edge) if len(xs) == 0: return (np.zeros(0, np.int32),) * 3 vals = mag[ys, xs] order = np.argsort(-vals) spc = max(1, self.min_feature_spacing) occupied = np.zeros(mag.shape, dtype=bool) picked_x: list[int] = [] picked_y: list[int] = [] picked_b: list[int] = [] for idx in order: y, x = int(ys[idx]), int(xs[idx]) if occupied[y, x]: continue picked_x.append(x); picked_y.append(y) picked_b.append(int(bins[y, x])) y0 = max(0, y - spc); y1 = min(mag.shape[0], y + spc + 1) x0 = max(0, x - spc); x1 = min(mag.shape[1], x + spc + 1) occupied[y0:y1, x0:x1] = True if len(picked_x) >= self.num_features: break return (np.array(picked_x, np.int32), np.array(picked_y, np.int32), np.array(picked_b, np.int8)) # --- Save / Load (Halcon-style write_shape_model / read_shape_model) def save_model(self, path: str) -> None: """Salva matcher addestrato su disco (formato .npz). Persiste: parametri, template_gray, mask, e tutte le varianti pre-computate (con piramide). Halcon-equivalent write_shape_model. Caso d'uso: training offline su workstation, deploy su macchina di linea senza re-train (zero secondi di startup matching). """ if not self.variants: raise RuntimeError("Modello non addestrato: chiamare train() prima.") # Flatten varianti in array piatti (npz non ama dataclass nested) n_vars = len(self.variants) n_levels = len(self.variants[0].levels) var_meta = np.zeros((n_vars, 6), dtype=np.float32) # ang, scale, kh, kw, cxl, cyl all_dx, all_dy, all_bin, all_offsets = [], [], [], [] offset = 0 all_offsets_per_level = [[] for _ in range(n_levels)] all_dx_per_level = [[] for _ in range(n_levels)] all_dy_per_level = [[] for _ in range(n_levels)] all_bin_per_level = [[] for _ in range(n_levels)] for vi, var in enumerate(self.variants): var_meta[vi] = ( var.angle_deg, var.scale, var.kh, var.kw, var.cx_local, var.cy_local, ) for li, lvl in enumerate(var.levels): all_offsets_per_level[li].append(len(all_dx_per_level[li])) all_dx_per_level[li].extend(lvl.dx.tolist()) all_dy_per_level[li].extend(lvl.dy.tolist()) all_bin_per_level[li].extend(lvl.bin.tolist()) for li in range(n_levels): all_offsets_per_level[li].append(len(all_dx_per_level[li])) out = { "_format_version": np.array([1], dtype=np.int32), "params": np.array([ self.num_features, self.weak_grad, self.strong_grad, self.angle_range_deg[0], self.angle_range_deg[1], self.angle_step_deg, self.scale_range[0], self.scale_range[1], self.scale_step, self.spread_radius, self.min_feature_spacing, self.pyramid_levels, self.top_score_factor, int(self.use_polarity), ], dtype=np.float64), "template_gray": self.template_gray, "train_mask": self._train_mask, "var_meta": var_meta, "n_levels": np.array([n_levels], dtype=np.int32), } for li in range(n_levels): out[f"dx_l{li}"] = np.asarray(all_dx_per_level[li], dtype=np.int32) out[f"dy_l{li}"] = np.asarray(all_dy_per_level[li], dtype=np.int32) out[f"bin_l{li}"] = np.asarray(all_bin_per_level[li], dtype=np.int8) out[f"offsets_l{li}"] = np.asarray(all_offsets_per_level[li], dtype=np.int32) np.savez_compressed(path, **out) @classmethod def load_model(cls, path: str) -> "LineShapeMatcher": """Carica matcher pre-addestrato da .npz salvato con save_model. Halcon-equivalent read_shape_model. Bypassa completamente train(): deploy production = istantaneo. """ data = np.load(path, allow_pickle=False) params = data["params"] m = cls( num_features=int(params[0]), weak_grad=float(params[1]), strong_grad=float(params[2]), angle_range_deg=(float(params[3]), float(params[4])), angle_step_deg=float(params[5]), scale_range=(float(params[6]), float(params[7])), scale_step=float(params[8]), spread_radius=int(params[9]), min_feature_spacing=int(params[10]), pyramid_levels=int(params[11]), top_score_factor=float(params[12]), use_polarity=bool(int(params[13])), ) tpl = data["template_gray"] if tpl.ndim > 0 and tpl.size > 0: m.template_gray = tpl m.template_size = (tpl.shape[1], tpl.shape[0]) mk = data["train_mask"] m._train_mask = mk if mk.size > 0 else None var_meta = data["var_meta"] n_levels = int(data["n_levels"][0]) offsets_l = [data[f"offsets_l{li}"] for li in range(n_levels)] dx_l = [data[f"dx_l{li}"] for li in range(n_levels)] dy_l = [data[f"dy_l{li}"] for li in range(n_levels)] bin_l = [data[f"bin_l{li}"] for li in range(n_levels)] m.variants = [] n_vars = var_meta.shape[0] for vi in range(n_vars): ang, scale, kh, kw, cxl, cyl = var_meta[vi] levels = [] for li in range(n_levels): i0 = int(offsets_l[li][vi]) i1 = int(offsets_l[li][vi + 1]) levels.append(_LevelFeatures( dx=dx_l[li][i0:i1].copy(), dy=dy_l[li][i0:i1].copy(), bin=bin_l[li][i0:i1].copy(), n=i1 - i0, )) m.variants.append(_Variant( angle_deg=float(ang), scale=float(scale), levels=levels, kh=int(kh), kw=int(kw), cx_local=float(cxl), cy_local=float(cyl), )) return m def set_angle_range_around( self, center_deg: float, tolerance_deg: float, ) -> None: """Restringe angle_range a (center - tol, center + tol). Comodo helper per scenari in cui l'orientamento del pezzo e' noto a priori entro ±tolerance_deg (es. feeder vibrante con guida meccanica). Riduce drasticamente le varianti generate in train(): es. ±15° vs 360° = 24x meno varianti, training e matching molto piu veloci. Esempio: m.set_angle_range_around(0, 20) # cerca solo in [-20, +20] m.train(template) """ self.angle_range_deg = ( float(center_deg - tolerance_deg), float(center_deg + tolerance_deg), ) def _scale_list(self) -> list[float]: s0, s1 = self.scale_range if s0 >= s1 or self.scale_step <= 0: return [float(s0)] n = int(np.floor((s1 - s0) / self.scale_step)) + 1 return [float(s0 + i * self.scale_step) for i in range(n)] def _auto_angle_step(self) -> float: """Step angolare derivato da dimensione template (Halcon-style). Formula: step ≈ atan(2 / max_side) gradi. Garantisce che la rotazione minima produca uno spostamento di ≥2 px sul perimetro del template (sotto sample il matching coarse perde candidati). Clampato in [0.5°, 10°]. """ max_side = max(self.template_size) if self.template_size != (0, 0) else 64 step = math.degrees(math.atan2(2.0, float(max_side))) return float(np.clip(step, 0.5, 10.0)) def _effective_angle_step(self) -> float: """Risolve angle_step_deg gestendo modalità auto (<=0).""" if self.angle_step_deg <= 0: return self._auto_angle_step() return self.angle_step_deg def _angle_list(self) -> list[float]: a0, a1 = self.angle_range_deg step = self._effective_angle_step() if step <= 0 or a0 >= a1: return [float(a0)] n = int(np.floor((a1 - a0) / step)) return [float(a0 + i * step) for i in range(n)] # --- Training ------------------------------------------------------ def _build_pyramid_features( self, dx: np.ndarray, dy: np.ndarray, bin_: np.ndarray, ) -> list[_LevelFeatures]: """Piramide feature precomputata: livello l = /2^l con dedup.""" levels = [_LevelFeatures(dx=dx.copy(), dy=dy.copy(), bin=bin_.copy(), n=len(dx))] for lvl in range(1, self.pyramid_levels): sf = 2 ** lvl dx_l = (dx // sf).astype(np.int32) dy_l = (dy // sf).astype(np.int32) # Dedup: rimuove feature collassate sullo stesso (dx, dy, bin) key = ((dx_l.astype(np.int64) << 24) | (dy_l.astype(np.int64) << 8) | bin_.astype(np.int64)) _, uniq = np.unique(key, return_index=True) levels.append(_LevelFeatures( dx=dx_l[uniq], dy=dy_l[uniq], bin=bin_[uniq], n=len(uniq), )) return levels def train(self, template_bgr: np.ndarray, mask: np.ndarray | None = None) -> int: """Genera varianti rotate+scalate con feature sparse + piramide.""" gray = self._to_gray(template_bgr) h, w = gray.shape self.template_size = (w, h) self.template_gray = gray.copy() if mask is None: mask_full = np.full((h, w), 255, dtype=np.uint8) else: mask_full = (mask > 0).astype(np.uint8) * 255 self._train_mask = mask_full.copy() self.variants.clear() # Reset view list: template principale = view 0 self._view_templates = [(gray.copy(), mask_full.copy())] # Invalida cache: template/param cambiati → spread/feature obsoleti. self._refine_feat_cache = {} if hasattr(self, "_scene_cache"): self._scene_cache.clear() self._build_variants_for_view(gray, mask_full, view_idx=0) self._dedup_variants() return len(self.variants) def add_template_view( self, template_bgr: np.ndarray, mask: np.ndarray | None = None, ) -> int: """Aggiunge una view template extra all'ensemble (Halcon-style create_aniso_shape_model con fusione N viste). Genera varianti del nuovo template con stessi parametri (range angle/scale) e le APPENDE a self.variants. NCC/recall usano automaticamente il template della view che ha matchato. Use case: pezzo che cambia aspetto (chiaro/scuro, prima/dopo trattamento, illuminazioni diverse) → un solo matcher resistente. Ritorna numero TOTALE varianti dopo l'aggiunta. Le view sono indicizzate da 1 in poi (0 e' il template del train). """ if not self.variants: raise RuntimeError( "Chiamare train(template_principale) prima di add_template_view") gray = self._to_gray(template_bgr) h, w = gray.shape if (w, h) != self.template_size: # Resize per coerenza con bbox/poly gray = cv2.resize(gray, self.template_size, interpolation=cv2.INTER_LINEAR) if mask is not None: mask = cv2.resize(mask, self.template_size, interpolation=cv2.INTER_NEAREST) if mask is None: mask_full = np.full(gray.shape, 255, dtype=np.uint8) else: mask_full = (mask > 0).astype(np.uint8) * 255 view_idx = len(self._view_templates) self._view_templates.append((gray.copy(), mask_full.copy())) n_before = len(self.variants) self._build_variants_for_view(gray, mask_full, view_idx=view_idx) self._dedup_variants() return len(self.variants) - n_before def _build_variants_for_view( self, gray: np.ndarray, mask_full: np.ndarray, view_idx: int, ) -> None: """Estrae varianti rotate+scalate per UNA view template. Estrazione algorithm identica al train() originale, separato per riuso da add_template_view (multi-template ensemble). """ h, w = gray.shape for s in self._scale_list(): sw = max(16, int(round(w * s))) sh = max(16, int(round(h * s))) gray_s = cv2.resize(gray, (sw, sh), interpolation=cv2.INTER_LINEAR) mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST) diag = int(np.ceil(np.hypot(sh, sw))) + 6 py = (diag - sh) // 2 px = (diag - sw) // 2 gray_p = cv2.copyMakeBorder( gray_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_REPLICATE, ) mask_p = cv2.copyMakeBorder( mask_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_CONSTANT, value=0, ) center = (diag / 2.0, diag / 2.0) for ang in self._angle_list(): M = cv2.getRotationMatrix2D(center, ang, 1.0) gray_r = cv2.warpAffine( gray_p, M, (diag, diag), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE, ) mask_r = cv2.warpAffine( mask_p, M, (diag, diag), flags=cv2.INTER_NEAREST, borderValue=0, ) mag, bins = self._gradient(gray_r) fx, fy, fb = self._extract_features(mag, bins, mask_r) if len(fx) < 8: continue cx_c = diag / 2.0 cy_c = diag / 2.0 dx = (fx - cx_c).astype(np.int32) dy = (fy - cy_c).astype(np.int32) x0 = int(dx.min()); x1 = int(dx.max()) y0 = int(dy.min()); y1 = int(dy.max()) kw = x1 - x0 + 1 kh = y1 - y0 + 1 cx_local = -x0 cy_local = -y0 levels = self._build_pyramid_features(dx, dy, fb) self.variants.append(_Variant( angle_deg=float(ang), scale=float(s), levels=levels, kh=kh, kw=kw, cx_local=float(cx_local), cy_local=float(cy_local), view_idx=view_idx, )) def _dedup_variants(self) -> int: """Rimuove varianti con feature-set identico (post-quantizzazione). Halcon-style: con angle range = (0, 360) e simmetrie del template, molte rotazioni producono lo stesso set quantizzato di feature. Es: quadrato a 0/90/180/270 deg → stesse features (modulo permutazione). Hash su feature ordinate (livello 0, full-res) elimina i duplicati. Vantaggio: meno varianti = meno chiamate kernel JIT al top-level senza perdere copertura angolare effettiva. Per template asimmetrici non rimuove nulla. """ seen: dict[bytes, int] = {} kept: list[_Variant] = [] removed = 0 for var in self.variants: lvl0 = var.levels[0] order = np.lexsort((lvl0.bin, lvl0.dy, lvl0.dx)) key = ( lvl0.dx[order].tobytes() + b"|" + lvl0.dy[order].tobytes() + b"|" + lvl0.bin[order].tobytes() + b"|" + str(round(var.scale, 4)).encode() ) h = key # diretto, senza hash crypto (collision ok solo se identici) if h in seen: removed += 1 continue seen[h] = len(kept) kept.append(var) self.variants = kept return removed # --- Matching ------------------------------------------------------ def _response_map(self, gray: np.ndarray) -> np.ndarray: """Response map shape (N_BINS, H, W) float32 (legacy path).""" mag, bins = self._gradient(gray) valid = mag >= self.weak_grad k = 2 * self.spread_radius + 1 kernel = np.ones((k, k), dtype=np.uint8) H, W = gray.shape raw = np.zeros((N_BINS, H, W), dtype=np.float32) for b in range(N_BINS): mask_b = ((bins == b) & valid).astype(np.uint8) d = cv2.dilate(mask_b, kernel) raw[b] = d.astype(np.float32) return raw # --- Scene precompute cache (II Halcon-style) ----------------------- _SCENE_CACHE_SIZE = 4 def _scene_cache_key(self, gray: np.ndarray) -> str | None: """Hash compatto della scena + param che influenzano spread/density. Hash su prime 64KB della scena (sufficiente discriminante per scene fotografiche) + parametri matcher rilevanti. None se cache disabilitata (es. scene troppo piccole). """ if gray.size < 100: return None try: import hashlib h = hashlib.md5() sample = gray.tobytes()[:65536] h.update(sample) h.update(f"|{gray.shape}|{gray.dtype}".encode()) h.update( f"|{self.weak_grad}|{self.strong_grad}" f"|{self.spread_radius}|{self._n_bins}" f"|{self.pyramid_levels}".encode() ) return h.hexdigest() except Exception: return None def _scene_cache_get(self, key: str) -> tuple | None: cache = getattr(self, "_scene_cache", None) if cache is None: return None v = cache.get(key) if v is not None: cache.move_to_end(key) return v def _scene_cache_put(self, key: str, value: tuple) -> None: from collections import OrderedDict if not hasattr(self, "_scene_cache"): self._scene_cache = OrderedDict() self._scene_cache[key] = value self._scene_cache.move_to_end(key) while len(self._scene_cache) > self._SCENE_CACHE_SIZE: self._scene_cache.popitem(last=False) def _spread_bitmap(self, gray: np.ndarray) -> np.ndarray: """Spread bitmap: bit b acceso dove bin b è presente nel raggio. dtype: uint8 per N_BINS=8, uint16 per N_BINS_POL=16 (use_polarity). Se use_gpu=True: Sobel + dilate via cv2.UMat (OpenCL kernel GPU). """ if self.use_gpu and not isinstance(gray, cv2.UMat): gray_in = cv2.UMat(np.ascontiguousarray(gray)) else: gray_in = gray mag, bins = self._gradient(gray_in) valid = mag >= self.weak_grad k = 2 * self.spread_radius + 1 kernel = np.ones((k, k), dtype=np.uint8) H, W = (gray.shape if isinstance(gray, np.ndarray) else (gray.get().shape[0], gray.get().shape[1])) nb = self._n_bins dtype = np.uint16 if nb > 8 else np.uint8 spread = np.zeros((H, W), dtype=dtype) # XX optimization: skip dilate per bin senza pixel attivi. # Su scene a bassa varianza orientation (es. pezzi industriali con # poche direzioni dominanti) tipicamente 50-70% dei bin sono vuoti. # Pre-calcolo bin presenti via mask globale; per bin assenti niente # dilate (resta zero nel bitmap). if isinstance(bins, np.ndarray): valid_bins = bins[valid] if isinstance(valid, np.ndarray) else None if valid_bins is not None and valid_bins.size > 0: bin_present = np.zeros(nb, dtype=bool) unique_bins = np.unique(valid_bins) bin_present[unique_bins[unique_bins < nb]] = True else: bin_present = np.zeros(nb, dtype=bool) else: bin_present = np.ones(nb, dtype=bool) for b in range(nb): if not bin_present[b]: continue # XX: nessun pixel di questo bin sopra weak_grad mask_b = ((bins == b) & valid).astype(np.uint8) if self.use_gpu: d = cv2.dilate(cv2.UMat(mask_b), kernel) d_np = d.get() else: d_np = cv2.dilate(mask_b, kernel) spread |= (d_np.astype(dtype) << b) return spread @staticmethod def _score_by_shift( resp: np.ndarray, dx: np.ndarray, dy: np.ndarray, bins: np.ndarray, bin_has_data: np.ndarray | None = None, ) -> np.ndarray: """score[y,x] = Σ_i resp[bin_i][y+dy_i, x+dx_i] / len(dx). Dispatch a kernel Numba JIT se disponibile, altrimenti fallback numpy. """ return _jit_score_by_shift(resp, dx, dy, bins, bin_has_data) @staticmethod def _subpixel_peak( acc: np.ndarray, x: int, y: int, plateau_radius: int = 10, ) -> tuple[float, float]: """Posizione sub-pixel del picco. 1. Plateau saturo → centroide pesato del plateau (peso = score). 2. Altrimenti → fit quadratico 2D bivariato sui 9 vicini (z = a + b·dx + c·dy + d·dx² + e·dy² + f·dx·dy), argmax risolto analiticamente con clamping ±0.5 px. """ H, W = acc.shape val = float(acc[y, x]) # Plateau detection: valori >= val - 0.01 entro raggio limitato y0 = max(0, y - plateau_radius); y1 = min(H, y + plateau_radius + 1) x0 = max(0, x - plateau_radius); x1 = min(W, x + plateau_radius + 1) patch = acc[y0:y1, x0:x1] plateau = patch >= val - 0.01 if plateau.sum() > 1: # Centroide pesato per (score - (max-0.01))² per enfatizzare i top weights = np.where(plateau, patch - (val - 0.01), 0.0).astype(np.float64) weights = weights * weights total = weights.sum() if total > 1e-9: ys_idx, xs_idx = np.indices(patch.shape) cx_w = (xs_idx * weights).sum() / total cy_w = (ys_idx * weights).sum() / total return float(x0 + cx_w), float(y0 + cy_w) ys_m, xs_m = np.where(plateau) return float(x0 + xs_m.mean()), float(y0 + ys_m.mean()) # Fit quadratico 2D bivariato su 3x3 intorno if x <= 0 or x >= W - 1 or y <= 0 or y >= H - 1: return float(x), float(y) # Stencil 3x3: Z[i, j] con i,j ∈ {-1, 0, +1} Z = acc[y - 1:y + 2, x - 1:x + 2].astype(np.float64) # Coefficienti da finite differences b_c = (Z[1, 2] - Z[1, 0]) / 2.0 c_c = (Z[2, 1] - Z[0, 1]) / 2.0 d_c = (Z[1, 2] + Z[1, 0] - 2.0 * Z[1, 1]) / 2.0 e_c = (Z[2, 1] + Z[0, 1] - 2.0 * Z[1, 1]) / 2.0 f_c = (Z[2, 2] - Z[0, 2] - Z[2, 0] + Z[0, 0]) / 4.0 # Max: risolve [2d f; f 2e][dx;dy] = [-b;-c] det = 4.0 * d_c * e_c - f_c * f_c if abs(det) > 1e-9: ox = (-2.0 * e_c * b_c + f_c * c_c) / det oy = (-2.0 * d_c * c_c + f_c * b_c) / det else: # Fallback separabile ox = -b_c / (2.0 * d_c) if abs(d_c) > 1e-6 else 0.0 oy = -c_c / (2.0 * e_c) if abs(e_c) > 1e-6 else 0.0 ox = float(np.clip(ox, -0.5, 0.5)) oy = float(np.clip(oy, -0.5, 0.5)) return x + ox, y + oy def _refine_pose_joint( self, spread0: np.ndarray, template_gray: np.ndarray, cx: float, cy: float, angle_deg: float, scale: float, mask_full: np.ndarray, max_iter: int = 24, tol: float = 1e-3, ) -> tuple[float, float, float, float]: """Refine congiunto (cx, cy, angle) via Nelder-Mead 3D. Ottimizza simultaneamente posizione e angolo (vs golden search 1D sull'angolo poi quadratico 2D su xy che alterna assi). Halcon-style: un singolo iter LM stila il match a precisione sub-pixel + sub-step. Ritorna (angle, score, cx, cy) dove score e quello calcolato sulla scena spread (no template gray). """ h, w = template_gray.shape sw = max(16, int(round(w * scale))) sh = max(16, int(round(h * scale))) gray_s = cv2.resize(template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR) mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST) diag = int(np.ceil(np.hypot(sh, sw))) + 6 py = (diag - sh) // 2; px = (diag - sw) // 2 gray_p = cv2.copyMakeBorder(gray_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_REPLICATE) mask_p = cv2.copyMakeBorder(mask_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_CONSTANT, value=0) center = (diag / 2.0, diag / 2.0) H, W = spread0.shape def _score(params: tuple[float, float, float]) -> float: ddx, ddy, dang = params ang = angle_deg + dang M = cv2.getRotationMatrix2D(center, ang, 1.0) gray_r = cv2.warpAffine(gray_p, M, (diag, diag), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE) mask_r = cv2.warpAffine(mask_p, M, (diag, diag), flags=cv2.INTER_NEAREST, borderValue=0) mag, bins = self._gradient(gray_r) fx, fy, fb = self._extract_features(mag, bins, mask_r) if len(fx) < 8: return 0.0 cxe = cx + ddx; cye = cy + ddy ix = int(round(cxe)); iy = int(round(cye)) tot = 0 valid = 0 for i in range(len(fx)): xs = ix + int(fx[i] - center[0]) ys = iy + int(fy[i] - center[1]) if 0 <= xs < W and 0 <= ys < H: bit = np.uint8(1 << int(fb[i])) if spread0[ys, xs] & bit: tot += 1 valid += 1 return -float(tot) / max(1, valid) # minimize -score # Nelder-Mead 3D inline (no scipy). Simplex iniziale: vertice + offset # dx=±0.5px, dy=±0.5px, dθ=±step/2. step_a = self.angle_step_deg / 2.0 if self.angle_step_deg > 0 else 1.0 x0 = np.array([0.0, 0.0, 0.0]) simplex = np.array([ x0, x0 + [0.5, 0.0, 0.0], x0 + [0.0, 0.5, 0.0], x0 + [0.0, 0.0, step_a], ]) fvals = np.array([_score(tuple(s)) for s in simplex]) for _ in range(max_iter): order = np.argsort(fvals) simplex = simplex[order]; fvals = fvals[order] if abs(fvals[-1] - fvals[0]) < tol: break centroid = simplex[:-1].mean(axis=0) xr = centroid + 1.0 * (centroid - simplex[-1]) fr = _score(tuple(xr)) if fvals[0] <= fr < fvals[-2]: simplex[-1] = xr; fvals[-1] = fr continue if fr < fvals[0]: xe = centroid + 2.0 * (centroid - simplex[-1]) fe = _score(tuple(xe)) if fe < fr: simplex[-1] = xe; fvals[-1] = fe else: simplex[-1] = xr; fvals[-1] = fr continue xc = centroid + 0.5 * (simplex[-1] - centroid) fc = _score(tuple(xc)) if fc < fvals[-1]: simplex[-1] = xc; fvals[-1] = fc continue for k in range(1, 4): simplex[k] = simplex[0] + 0.5 * (simplex[k] - simplex[0]) fvals[k] = _score(tuple(simplex[k])) best_i = int(np.argmin(fvals)) ddx, ddy, dang = simplex[best_i] return (angle_deg + float(dang), -float(fvals[best_i]), cx + float(ddx), cy + float(ddy)) def _refine_angle( self, spread0: np.ndarray, # bitmap uint8 (H, W) bit_active: int, template_gray: np.ndarray, cx: float, cy: float, angle_deg: float, scale: float, mask_full: np.ndarray, angle_fine_step: float = 0.5, search_radius: float | None = None, original_score: float | None = None, ) -> tuple[float, float, float, float]: """Ricerca angolare fine (sub-step) attorno al match grezzo. Genera 5 template temporanei a angle ± {0.5, 1.0} * step e sceglie l'angolo con score massimo (parabolic fit sulle 3 score centrali). Ritorna (angle_refined, score, cx_refined, cy_refined). """ # NB: rimosso early-skip su score >= 0.99. Lo score linemod/shape # satura facilmente a 1.0 (specie con pyramid_propagate o spread # ampio) ma NON garantisce angolo preciso: l'angolo grezzo della # variante e' quantizzato a multipli di angle_step (5 deg default). # Refine angolare e' essenziale per orientamento sub-step. if search_radius is None: search_radius = self._effective_angle_step() / 2.0 h, w = template_gray.shape sw = max(16, int(round(w * scale))) sh = max(16, int(round(h * scale))) gray_s = cv2.resize(template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR) mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST) diag = int(np.ceil(np.hypot(sh, sw))) + 6 py = (diag - sh) // 2; px = (diag - sw) // 2 gray_p = cv2.copyMakeBorder(gray_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_REPLICATE) mask_p = cv2.copyMakeBorder(mask_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_CONSTANT, value=0) center = (diag / 2.0, diag / 2.0) H, W = spread0.shape margin = 3 # Cache template features per angolo (chiave: int(round(ang*20)) = # bucket di 0.05°). Golden-search ricontratto puo richiedere lo # stesso bucket piu volte; evita re-warp+gradient+extract (costoso). # Cache a livello matcher per riusare tra chiamate find() su scene # diverse: la rotazione del template non dipende dalla scena. if not hasattr(self, '_refine_feat_cache'): self._refine_feat_cache = {} feat_cache = self._refine_feat_cache cache_scale_key = round(scale * 1000) def _score_at_angle(off: float) -> tuple[float, float, float]: """Ritorna (score, best_cx, best_cy) per angolo = angle_deg + off.""" ang = angle_deg + off ck = (round(ang * 20), cache_scale_key) cached = feat_cache.get(ck) if cached is not None: fx, fy, fb = cached else: M = cv2.getRotationMatrix2D(center, ang, 1.0) gray_r = cv2.warpAffine(gray_p, M, (diag, diag), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE) mask_r = cv2.warpAffine(mask_p, M, (diag, diag), flags=cv2.INTER_NEAREST, borderValue=0) mag, bins = self._gradient(gray_r) fx, fy, fb = self._extract_features(mag, bins, mask_r) # LRU semplice: limita cache a ~256 angoli (8 angoli * 32 candidati) if len(feat_cache) > 256: feat_cache.pop(next(iter(feat_cache))) feat_cache[ck] = (fx, fy, fb) if len(fx) < 8: return (0.0, cx, cy) dx = (fx - center[0]).astype(np.int32) dy = (fy - center[1]).astype(np.int32) y_lo = int(cy) - margin; y_hi = int(cy) + margin + 1 x_lo = int(cx) - margin; x_hi = int(cx) + margin + 1 sh_w = y_hi - y_lo; sw_w = x_hi - x_lo acc = np.zeros((sh_w, sw_w), dtype=np.float32) spread_dtype = spread0.dtype.type for i in range(len(dx)): ddx = int(dx[i]); ddy = int(dy[i]); b = int(fb[i]) bit = spread_dtype(1 << b) sy0 = y_lo + ddy; sy1 = y_hi + ddy sx0 = x_lo + ddx; sx1 = x_hi + ddx a_y0 = max(0, -sy0); a_y1 = sh_w - max(0, sy1 - H) a_x0 = max(0, -sx0); a_x1 = sw_w - max(0, sx1 - W) s_y0 = max(0, sy0); s_y1 = min(H, sy1) s_x0 = max(0, sx0); s_x1 = min(W, sx1) if s_y1 > s_y0 and s_x1 > s_x0: region = spread0[s_y0:s_y1, s_x0:s_x1] acc[a_y0:a_y1, a_x0:a_x1] += ( (region & bit) != 0 ).astype(np.float32) acc /= len(dx) _, max_val, _, max_loc = cv2.minMaxLoc(acc) return (float(max_val), float(x_lo + max_loc[0]), float(y_lo + max_loc[1])) # Golden-section search su [-search_radius, +search_radius]: # converge in log tempo a precisione ~0.1°, ~8 valutazioni vs 5 # ma centrate su picco reale (non sample equispaziati). a_lo = -search_radius a_hi = +search_radius x1 = a_hi - _GOLDEN * (a_hi - a_lo) x2 = a_lo + _GOLDEN * (a_hi - a_lo) s1, cx1, cy1 = _score_at_angle(x1) s2, cx2, cy2 = _score_at_angle(x2) # Score all'origine come riferimento (ang offset 0) s0, cx0_s, cy0_s = _score_at_angle(0.0) best = (angle_deg, s0, cx0_s, cy0_s) tol = 0.1 # gradi for _ in range(8): if s1 > best[1]: best = (angle_deg + x1, s1, cx1, cy1) if s2 > best[1]: best = (angle_deg + x2, s2, cx2, cy2) if abs(a_hi - a_lo) < tol: break if s1 > s2: a_hi = x2 x2 = x1; s2 = s1; cx2 = cx1; cy2 = cy1 x1 = a_hi - _GOLDEN * (a_hi - a_lo) s1, cx1, cy1 = _score_at_angle(x1) else: a_lo = x1 x1 = x2; s1 = s2; cx1 = cx2; cy1 = cy2 x2 = a_lo + _GOLDEN * (a_hi - a_lo) s2, cx2, cy2 = _score_at_angle(x2) return best def _get_view_template( self, view_idx: int, ) -> tuple[np.ndarray | None, np.ndarray | None]: """Ritorna (template_gray, mask) per la view specificata. view_idx 0 = template principale (train), 1+ = view extra aggiunte via add_template_view. Usato per scegliere il template corretto in NCC/recall verification quando il matcher e' ensemble multi-template. """ if 0 <= view_idx < len(self._view_templates): return self._view_templates[view_idx] return self.template_gray, self._train_mask def _compute_recall( self, spread0: np.ndarray, variant: _Variant, cx: float, cy: float, angle_deg: float, ) -> float: """Frazione di feature template che combaciano nello spread scena alla pose. Halcon-equivalent: MinScore originale. """ if self.template_gray is None: return 1.0 h, w = self.template_gray.shape scale = variant.scale sw = max(16, int(round(w * scale))) sh = max(16, int(round(h * scale))) gray_s = cv2.resize(self.template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR) mask_src = ( self._train_mask if self._train_mask is not None else np.full_like(self.template_gray, 255) ) mask_s = cv2.resize(mask_src, (sw, sh), interpolation=cv2.INTER_NEAREST) diag = int(np.ceil(np.hypot(sh, sw))) + 6 py = (diag - sh) // 2; px = (diag - sw) // 2 gray_p = cv2.copyMakeBorder( gray_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_REPLICATE, ) mask_p = cv2.copyMakeBorder( mask_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_CONSTANT, value=0, ) center = (diag / 2.0, diag / 2.0) M = cv2.getRotationMatrix2D(center, angle_deg, 1.0) gray_r = cv2.warpAffine(gray_p, M, (diag, diag), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE) mask_r = cv2.warpAffine(mask_p, M, (diag, diag), flags=cv2.INTER_NEAREST, borderValue=0) mag, bins = self._gradient(gray_r) fx, fy, fb = self._extract_features(mag, bins, mask_r) n_feat = len(fx) if n_feat < 4: return 0.0 H, W = spread0.shape spread_dtype = spread0.dtype.type ix = int(round(cx)); iy = int(round(cy)) hits = 0 for i in range(n_feat): xs = ix + int(fx[i] - center[0]) ys = iy + int(fy[i] - center[1]) if 0 <= xs < W and 0 <= ys < H: bit = spread_dtype(1 << int(fb[i])) if spread0[ys, xs] & bit: hits += 1 return hits / n_feat def _compute_soft_score( self, scene_gray: np.ndarray, variant: _Variant, cx: float, cy: float, angle_deg: float, ) -> float: """Soft-margin gradient similarity (Halcon Metric='use_polarity').""" if self.template_gray is None: return 0.0 h, w = self.template_gray.shape scale = variant.scale sw = max(16, int(round(w * scale))) sh = max(16, int(round(h * scale))) gray_s = cv2.resize(self.template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR) mask_src = ( self._train_mask if self._train_mask is not None else np.full_like(self.template_gray, 255) ) mask_s = cv2.resize(mask_src, (sw, sh), interpolation=cv2.INTER_NEAREST) diag = int(np.ceil(np.hypot(sh, sw))) + 6 py = (diag - sh) // 2; px = (diag - sw) // 2 gray_p = cv2.copyMakeBorder( gray_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_REPLICATE, ) mask_p = cv2.copyMakeBorder( mask_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_CONSTANT, value=0, ) center = (diag / 2.0, diag / 2.0) M = cv2.getRotationMatrix2D(center, angle_deg, 1.0) gray_r = cv2.warpAffine(gray_p, M, (diag, diag), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE) mask_r = cv2.warpAffine(mask_p, M, (diag, diag), flags=cv2.INTER_NEAREST, borderValue=0) gx_t = cv2.Sobel(gray_r, cv2.CV_32F, 1, 0, ksize=3) gy_t = cv2.Sobel(gray_r, cv2.CV_32F, 0, 1, ksize=3) mag_t = cv2.magnitude(gx_t, gy_t) _, bins_t = self._gradient(gray_r) fx, fy, _ = self._extract_features(mag_t, bins_t, mask_r) if len(fx) < 4: return 0.0 gx_s = cv2.Sobel(scene_gray, cv2.CV_32F, 1, 0, ksize=3) gy_s = cv2.Sobel(scene_gray, cv2.CV_32F, 0, 1, ksize=3) H, W = scene_gray.shape ix = int(round(cx)); iy = int(round(cy)) sims = []; weights = [] for i in range(len(fx)): xs = ix + int(fx[i] - center[0]) ys = iy + int(fy[i] - center[1]) if not (0 <= xs < W and 0 <= ys < H): continue tx = float(gx_t[int(fy[i]), int(fx[i])]) ty = float(gy_t[int(fy[i]), int(fx[i])]) sx = float(gx_s[ys, xs]); sy = float(gy_s[ys, xs]) tm = math.hypot(tx, ty); sm = math.hypot(sx, sy) if tm < 1e-3 or sm < 1e-3: continue cos_sim = (tx * sx + ty * sy) / (tm * sm) cos_sim = max(0.0, cos_sim) if self.use_polarity else abs(cos_sim) sims.append(cos_sim); weights.append(min(sm, 255.0)) if not sims: return 0.0 sims_arr = np.asarray(sims, dtype=np.float32) w_arr = np.asarray(weights, dtype=np.float32) return float((sims_arr * w_arr).sum() / (w_arr.sum() + 1e-9)) def _subpixel_refine_lm( self, scene_gray: np.ndarray, variant: _Variant, cx: float, cy: float, angle_deg: float, n_iters: int = 2, ) -> tuple[float, float]: """Sub-pixel refinement iterativo via gradient-field least-squares. Halcon-equivalent SubPixel='least_squares_high'. Precisione attesa 0.05 px (vs 0.5 px del fit quadratic 2D). """ if self.template_gray is None: return cx, cy h, w = self.template_gray.shape scale = variant.scale sw = max(16, int(round(w * scale))) sh = max(16, int(round(h * scale))) gray_s = cv2.resize(self.template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR) mask_src = ( self._train_mask if self._train_mask is not None else np.full_like(self.template_gray, 255) ) mask_s = cv2.resize(mask_src, (sw, sh), interpolation=cv2.INTER_NEAREST) diag = int(np.ceil(np.hypot(sh, sw))) + 6 py = (diag - sh) // 2; px = (diag - sw) // 2 gray_p = cv2.copyMakeBorder( gray_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_REPLICATE, ) mask_p = cv2.copyMakeBorder( mask_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_CONSTANT, value=0, ) center = (diag / 2.0, diag / 2.0) M = cv2.getRotationMatrix2D(center, angle_deg, 1.0) gray_r = cv2.warpAffine(gray_p, M, (diag, diag), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE) mask_r = cv2.warpAffine(mask_p, M, (diag, diag), flags=cv2.INTER_NEAREST, borderValue=0) gx_t = cv2.Sobel(gray_r, cv2.CV_32F, 1, 0, ksize=3) gy_t = cv2.Sobel(gray_r, cv2.CV_32F, 0, 1, ksize=3) mag_t = cv2.magnitude(gx_t, gy_t) _, bins_t = self._gradient(gray_r) fx, fy, _ = self._extract_features(mag_t, bins_t, mask_r) if len(fx) < 4: return cx, cy n = len(fx) ddx_t = (fx - center[0]).astype(np.float32) ddy_t = (fy - center[1]).astype(np.float32) gx_tf = np.array([gx_t[int(fy[i]), int(fx[i])] for i in range(n)], dtype=np.float32) gy_tf = np.array([gy_t[int(fy[i]), int(fx[i])] for i in range(n)], dtype=np.float32) mag_tf = np.hypot(gx_tf, gy_tf) + 1e-6 nx_t = gx_tf / mag_tf ny_t = gy_tf / mag_tf gx_s = cv2.Sobel(scene_gray, cv2.CV_32F, 1, 0, ksize=3) gy_s = cv2.Sobel(scene_gray, cv2.CV_32F, 0, 1, ksize=3) H, W = scene_gray.shape cur_cx, cur_cy = float(cx), float(cy) for _ in range(n_iters): xs = cur_cx + ddx_t ys = cur_cy + ddy_t xs_c = np.clip(xs, 0, W - 1.001) ys_c = np.clip(ys, 0, H - 1.001) x0 = xs_c.astype(np.int32); y0 = ys_c.astype(np.int32) ax = xs_c - x0; ay = ys_c - y0 def _bilin(g): v00 = g[y0, x0]; v10 = g[y0, x0 + 1] v01 = g[y0 + 1, x0]; v11 = g[y0 + 1, x0 + 1] return ((1 - ax) * (1 - ay) * v00 + ax * (1 - ay) * v10 + (1 - ax) * ay * v01 + ax * ay * v11) sx_v = _bilin(gx_s) sy_v = _bilin(gy_s) mag_s = np.hypot(sx_v, sy_v) + 1e-6 nx_s = sx_v / mag_s ny_s = sy_v / mag_s w = np.minimum(mag_s, 255.0).astype(np.float32) err_x = (nx_s - nx_t) * w err_y = (ny_s - ny_t) * w step_x = -float(err_x.sum()) / (w.sum() + 1e-6) step_y = -float(err_y.sum()) / (w.sum() + 1e-6) step_x = max(-1.0, min(1.0, step_x)) step_y = max(-1.0, min(1.0, step_y)) cur_cx += step_x cur_cy += step_y if abs(step_x) < 0.02 and abs(step_y) < 0.02: break return cur_cx, cur_cy def _verify_ncc( self, scene_gray: np.ndarray, cx: float, cy: float, angle_deg: float, scale: float, view_idx: int = 0, ) -> float: """NCC tra template warpato alla pose e scena sottostante. Lavora su un **crop locale** della scena di lato = diagonale del template ruotato+scalato, non sull'intera scena. Su scene grandi (1920×1080) taglia drasticamente il costo del warp per ogni match. Ritorna score [-1, 1]. Usato come filtro anti-falso-positivo: il matcher linemod può dare score alto su texture generiche ma sovrapponendo il template gray i pixel non corrispondono. """ t, train_mask = self._get_view_template(view_idx) if t is None: return 1.0 h, w = t.shape cx_t = (w - 1) / 2.0 cy_t = (h - 1) / 2.0 # Bounding box del template ruotato/scalato attorno a (cx, cy) diag = int(np.ceil(np.hypot(w, h) * scale)) + 8 H, W = scene_gray.shape x0 = int(round(cx)) - diag // 2 y0 = int(round(cy)) - diag // 2 cx0 = max(0, x0); cy0 = max(0, y0) cx1 = min(W, x0 + diag); cy1 = min(H, y0 + diag) if cx1 - cx0 < 10 or cy1 - cy0 < 10: return 0.0 scn_crop = scene_gray[cy0:cy1, cx0:cx1] ch, cw = scn_crop.shape M = cv2.getRotationMatrix2D((cx_t, cy_t), angle_deg, scale) # Porta il centro del template a (cx - cx0, cy - cy0) del crop M[0, 2] += (cx - cx0) - cx_t M[1, 2] += (cy - cy0) - cy_t warped = cv2.warpAffine( t, M, (cw, ch), flags=cv2.INTER_LINEAR, borderValue=0, ) if train_mask is not None: mask_src = train_mask else: mask_src = np.full_like(t, 255) mask_w = cv2.warpAffine( mask_src, M, (cw, ch), flags=cv2.INTER_NEAREST, borderValue=0, ) valid = mask_w > 0 if valid.sum() < 20: return 0.0 tpl = warped[valid].astype(np.float32) scn = scn_crop[valid].astype(np.float32) tm = tpl - tpl.mean() sm = scn - scn.mean() # Std minimo: se template o scena patch sono quasi uniformi # (es. zona di sfondo bianco/nero), NCC e instabile e da false # high-correlation. Halcon-style: scarta match. tpl_var = float((tm * tm).sum()) scn_var = float((sm * sm).sum()) n_pix = float(valid.sum()) if tpl_var < 1e-3 * n_pix or scn_var < 1e-3 * n_pix: return 0.0 denom = np.sqrt(tpl_var * scn_var) + 1e-9 return float((tm * sm).sum() / denom) def find( self, scene_bgr: np.ndarray, min_score: float = 0.6, max_matches: int = 20, nms_radius: int | None = None, refine_angle: bool = True, subpixel: bool = True, verify_ncc: bool = True, verify_threshold: float = 0.4, ncc_skip_above: float = 1.01, # disabilitato di default: NCC sempre coarse_angle_factor: int = 2, coarse_stride: int = 1, scale_penalty: float = 0.0, search_roi: tuple[int, int, int, int] | None = None, pyramid_propagate: bool = False, # off di default: meno duplicati propagate_topk: int = 4, refine_pose_joint: bool = False, greediness: float = 0.0, batch_top: bool = False, nms_iou_threshold: float = 0.3, min_recall: float = 0.0, use_soft_score: bool = False, subpixel_lm: bool = False, debug: bool = False, profile: bool = False, ) -> list[Match]: """ scale_penalty: se > 0, riduce lo score per match a scala diversa da 1.0: score_final = score_shape * max(0, 1 - scale_penalty * |scale - 1|) Utile se l'operatore vuole che match "identico al template anche per dimensione" abbia score più alto di match "stessa forma, dimensione diversa". scale_penalty=0 (default) = comportamento shape puro. search_roi: (x, y, w, h) limita la ricerca a una regione della scena. Equivalente a Halcon set_aoi: il matching opera su crop locale e le coordinate output sono ri-traslate al sistema scena originale. Usare quando si conosce a priori l'area in cui il pezzo può apparire (es. feeder a posizione fissa) → costo proporzionale a w·h invece di W·H. """ if not self.variants: raise RuntimeError("Matcher non addestrato: chiamare train() prima.") # Diagnostic counter: traccia perche' candidati sono droppati lungo # la pipeline. Esposto via get_last_diag() o ritornato implicitamente # se debug=True (vedi sotto). diag = { "n_variants_total": len(self.variants), "n_variants_top_evaluated": 0, "n_variants_top_passed": 0, "n_variants_full_evaluated": 0, "n_raw_candidates": 0, "n_after_pre_nms": 0, "drop_ncc_low": 0, "drop_min_score_post_avg": 0, "drop_recall_low": 0, "drop_bbox_out_of_scene": 0, "drop_nms_iou": 0, "n_variants_pruned_histogram": 0, "n_final": 0, "top_thresh_used": 0.0, "verify_threshold_used": float(verify_threshold), "min_score_used": float(min_score), "min_recall_used": float(min_recall), "use_polarity": bool(self.use_polarity), "use_soft_score": bool(use_soft_score), "subpixel_lm": bool(subpixel_lm), } self._last_diag = diag # GGG: profile mode → timing per fase, esposto via get_last_profile() import time as _time prof = {} if profile else None _t_prev = _time.perf_counter() if profile else 0.0 def _checkpoint(name: str): nonlocal _t_prev if prof is None: return now = _time.perf_counter() prof[name] = (now - _t_prev) * 1000.0 # ms _t_prev = now self._last_profile = prof gray_full = self._to_gray(scene_bgr) _checkpoint("to_gray") # Applica ROI di ricerca: restringe scena a crop, ricorda offset per # ri-traslare le coordinate dei match a fine pipeline. if search_roi is not None: rx, ry, rw, rh = search_roi H_s, W_s = gray_full.shape rx = max(0, int(rx)); ry = max(0, int(ry)) rw = max(1, min(int(rw), W_s - rx)) rh = max(1, min(int(rh), H_s - ry)) gray0 = gray_full[ry:ry + rh, rx:rx + rw] roi_offset = (rx, ry) else: gray0 = gray_full roi_offset = (0, 0) # Cache pre-compute scena (II Halcon-style): hash bytes scene + param # gradient/spread → riusa spread piramide + density tra find() # consecutive con stessa scena (typical UI tuning: slider produce # 10+ find() su scena identica). Risparmia ~80% del costo non-kernel. cache_key = self._scene_cache_key(gray0) cached = self._scene_cache_get(cache_key) if cache_key else None if cached is not None: grays, spread_top, bit_active_top, density_top, spread0, \ bit_active_full, density_full, top = cached else: grays = [gray0] for _ in range(self.pyramid_levels - 1): grays.append(cv2.pyrDown(grays[-1])) top = len(grays) - 1 spread_top = self._spread_bitmap(grays[top]) bit_active_top = int( sum(1 << b for b in range(self._n_bins) if (spread_top & (spread_top.dtype.type(1) << b)).any()) ) density_top = _jit_popcount(spread_top) # spread0 + density_full computati piu sotto, quindi salvo dopo. spread0 = None bit_active_full = None density_full = None _checkpoint("spread_top") if nms_radius is None: nms_radius = max(8, min(self.template_size) // 2) # Pruning adattivo allo step angolare: con step piccolo (<= 3 deg) # ci sono molte varianti vicine, gli score top-level sono ravvicinati # e top_thresh*0.5 e' troppo aggressivo: scarta varianti valide che # sarebbero state riprese al full-res. Stessa cosa per # coarse_angle_factor (skip 1 ogni 2): con step fine non e' utile. # Risultato osservato: precisione "veloce" 10° dava risultati # migliori di "preciso" 2° proprio perche evitava il pruning. eff_step = self._effective_angle_step() top_factor = self.top_score_factor cf_eff = max(1, coarse_angle_factor) if eff_step <= 3.0: top_factor = max(top_factor, 0.7) cf_eff = 1 top_thresh = min_score * top_factor diag["top_thresh_used"] = float(top_thresh) tw, th = self.template_size # density_top gia' computato sopra (cache o miss) sf_top = 2 ** top bg_cache_top: dict[float, np.ndarray] = {} bg_cache_full: dict[float, np.ndarray] = {} unique_scales = sorted({var.scale for var in self.variants}) def _bg_for_scale(density: np.ndarray, scale: float, divisor: int) -> np.ndarray: bw = max(9, int(round(tw * scale / divisor))) bh = max(9, int(round(th * scale / divisor))) sm = cv2.boxFilter(density, cv2.CV_32F, (bw, bh)) return np.clip(sm / N_BINS, 0.0, 0.99) for sc in unique_scales: bg_cache_top[sc] = _bg_for_scale(density_top, sc, sf_top) def _rescore(score: np.ndarray, bg: np.ndarray) -> np.ndarray: return np.maximum(0.0, (score - bg) / (1.0 - bg + 1e-6)) # Coarse-to-fine angolare: # 1) Raggruppa varianti per scala, ordina per angolo # 2) Top-level: valuta solo 1 ogni coarse_angle_factor varianti # 3) Espandi ai vicini nel full-res variants_by_scale: dict[float, list[int]] = {} for vi, var in enumerate(self.variants): variants_by_scale.setdefault(var.scale, []).append(vi) coarse_idx_list: list[int] = [] # varianti da valutare al top neighbor_map: dict[int, list[int]] = {} # vi_coarse -> indici vicini cf = cf_eff for scale_key, vi_list in variants_by_scale.items(): vi_sorted = sorted(vi_list, key=lambda i: self.variants[i].angle_deg) n = len(vi_sorted) for i in range(0, n, cf): vi_c = vi_sorted[i] coarse_idx_list.append(vi_c) # Vicini: ±cf/2 attorno a i (stessa scala) half = cf // 2 start = max(0, i - half) end = min(n, i + half + 1) neighbor_map[vi_c] = vi_sorted[start:end] # VV: pruning preliminare via overlap istogramma orientation. # Scene-bins-attivi vs variant-feature-bins. Se la variante ha bin # dominanti che la scena non possiede → score impossibile, skip # senza chiamare il kernel. Costo: O(n_variants * 8 ops). scene_bins = np.array( [bool((bit_active_top >> b) & 1) for b in range(self._n_bins)], dtype=bool, ) if scene_bins.any(): n_scene_active = int(scene_bins.sum()) # Soglia: variante deve avere >= 50% delle sue feature in bin # presenti nella scena. Sotto = score certamente < 0.5. pruned_idx_list = [] n_pruned = 0 for vi in coarse_idx_list: lvl = self.variants[vi].levels[ min(top, len(self.variants[vi].levels) - 1) ] if len(lvl.bin) == 0: continue feat_in_scene = int(np.isin(lvl.bin, np.where(scene_bins)[0]).sum()) ratio = feat_in_scene / len(lvl.bin) if ratio < 0.5 * min_score: n_pruned += 1 continue pruned_idx_list.append(vi) if n_pruned > 0 and pruned_idx_list: coarse_idx_list = pruned_idx_list diag["n_variants_pruned_histogram"] = n_pruned else: diag["n_variants_pruned_histogram"] = 0 # Pruning varianti via top-level (parallelizzato). # coarse_stride > 1: 1 pixel ogni stride (~stride^2 speed-up). # pyramid_propagate=True: top-K picchi per restringere full-res. # greediness > 0: kernel greedy con early-exit (alternativo a rescore). cs = max(1, int(coarse_stride)) peaks_by_vi: dict[int, list[tuple[int, int, float]]] = {} use_greedy_top = greediness > 0.0 def _top_score(vi: int) -> tuple[int, float]: var = self.variants[vi] lvl = var.levels[min(top, len(var.levels) - 1)] if use_greedy_top: # Greedy non supporta stride né rescore bg score = _jit_score_bitmap_greedy( spread_top, lvl.dx, lvl.dy, lvl.bin, bit_active_top, top_thresh, greediness, ) else: score = _jit_score_bitmap_rescored( spread_top, lvl.dx, lvl.dy, lvl.bin, bit_active_top, bg_cache_top[var.scale], stride=cs, ) if score.size == 0: return vi, -1.0 best = float(score.max()) if pyramid_propagate and best > 0: flat = score.ravel() k = min(propagate_topk, flat.size) idx = np.argpartition(-flat, k - 1)[:k] peaks: list[tuple[int, int, float]] = [] for i in idx: s = float(flat[i]) if s < top_thresh * 0.7: continue yt, xt = int(i // score.shape[1]), int(i % score.shape[1]) peaks.append((xt, yt, s)) peaks_by_vi[vi] = peaks return vi, best kept_coarse: list[tuple[int, float]] = [] all_top_scores: list[tuple[int, float]] = [] diag["n_variants_top_evaluated"] = len(coarse_idx_list) # batch_top: usa kernel batch single-call con prange-esterno su # varianti. Vince su threadpool quando n_vars >> n_threads e quando # H*W top e' piccolo (overhead chiamate JIT > costo kernel). if (batch_top and HAS_NUMBA and len(coarse_idx_list) > 4): dx_l = []; dy_l = []; bn_l = []; vs_l = [] for vi in coarse_idx_list: var = self.variants[vi] lvl = var.levels[min(top, len(var.levels) - 1)] dx_l.append(lvl.dx); dy_l.append(lvl.dy); bn_l.append(lvl.bin) vs_l.append(var.scale) scores_arr = _jit_top_max_per_variant( spread_top, dx_l, dy_l, bn_l, bg_cache_top, vs_l, bit_active_top, ) for vi, best in zip(coarse_idx_list, scores_arr.tolist()): all_top_scores.append((vi, best)) if best >= top_thresh: kept_coarse.append((vi, best)) elif self.n_threads > 1 and len(coarse_idx_list) > 1: with ThreadPoolExecutor(max_workers=self.n_threads) as ex: for vi, best in ex.map(_top_score, coarse_idx_list): all_top_scores.append((vi, best)) if best >= top_thresh: kept_coarse.append((vi, best)) else: for vi in coarse_idx_list: vi2, best = _top_score(vi) all_top_scores.append((vi2, best)) if best >= top_thresh: kept_coarse.append((vi2, best)) # Fallback adattivo: se il rescore background ha abbattuto tutti # gli score sotto top_thresh (scene texturate pesanti), ripesca # le varianti migliori al top level per dare comunque una chance # alla fase full-res invece di ritornare 0 match. if not kept_coarse and all_top_scores: all_top_scores.sort(key=lambda t: -t[1]) n_keep = max(4, len(all_top_scores) // 10) # Limita a varianti con score top > 0 (non completamente a zero) kept_coarse = [(vi, s) for vi, s in all_top_scores[:n_keep] if s > 0] # Espandi ogni coarse promosso con i suoi vicini (stessa scala, # angoli intermedi non valutati al top) expanded: set[int] = set() score_by_vi: dict[int, float] = {} for vi_c, s_top in kept_coarse: for vi_n in neighbor_map.get(vi_c, [vi_c]): expanded.add(vi_n) # Usa lo score del coarse come stima per il sort successivo score_by_vi[vi_n] = max(score_by_vi.get(vi_n, 0.0), s_top) kept_variants: list[tuple[int, float]] = [ (vi, score_by_vi[vi]) for vi in expanded ] _checkpoint("top_pruning") if not kept_variants: return [] # Cap adattivo: ordina per score top-level e mantieni le più promettenti. # Min: max_matches*8 (margine per NMS cross-variant) # Max: 50% delle varianti totali (protegge performance con molte scale) kept_variants.sort(key=lambda t: -t[1]) max_vars_full = max(max_matches * 8, len(self.variants) // 2) kept_variants = kept_variants[:max_vars_full] diag["n_variants_top_passed"] = len(kept_coarse) diag["n_variants_full_evaluated"] = len(kept_variants) # Full-res (parallelizzato) con bitmap. # Riusa cache se disponibile, altrimenti computa e salva. if spread0 is None: spread0 = self._spread_bitmap(gray0) bit_active_full = int( sum(1 << b for b in range(self._n_bins) if (spread0 & (spread0.dtype.type(1) << b)).any()) ) density_full = _jit_popcount(spread0) # Salva cache scena complete if cache_key is not None: self._scene_cache_put(cache_key, ( grays, spread_top, bit_active_top, density_top, spread0, bit_active_full, density_full, top, )) for sc in unique_scales: bg_cache_full[sc] = _bg_for_scale(density_full, sc, 1) # Margine in full-res attorno ad ogni peak top: copre incertezza # downsampling (sf_top px) + spread_radius + slack per NMS. propagate_margin = sf_top + self.spread_radius + max(8, nms_radius // 2) H_full, W_full = spread0.shape def _full_score(vi: int) -> tuple[int, np.ndarray]: var = self.variants[vi] lvl0 = var.levels[0] if not pyramid_propagate or vi not in peaks_by_vi or not peaks_by_vi[vi]: # Path legacy: scansiona intera scena return vi, _jit_score_bitmap_rescored( spread0, lvl0.dx, lvl0.dy, lvl0.bin, bit_active_full, bg_cache_full[var.scale], ) # Path piramide propagata: valuta solo crop locali attorno # alle posizioni dei picchi top-level (riproiettati a full-res). score_full = np.zeros((H_full, W_full), dtype=np.float32) mark = np.zeros((H_full, W_full), dtype=bool) bg = bg_cache_full[var.scale] for xt, yt, _s in peaks_by_vi[vi]: cx0 = xt * sf_top cy0 = yt * sf_top x_lo = max(0, cx0 - propagate_margin) x_hi = min(W_full, cx0 + propagate_margin + 1) y_lo = max(0, cy0 - propagate_margin) y_hi = min(H_full, cy0 + propagate_margin + 1) if x_hi <= x_lo or y_hi <= y_lo: continue if mark[y_lo:y_hi, x_lo:x_hi].all(): continue # Crop spread + bg, valuta kernel sul crop spread_crop = np.ascontiguousarray(spread0[y_lo:y_hi, x_lo:x_hi]) bg_crop = np.ascontiguousarray(bg[y_lo:y_hi, x_lo:x_hi]) score_crop = _jit_score_bitmap_rescored( spread_crop, lvl0.dx, lvl0.dy, lvl0.bin, bit_active_full, bg_crop, ) score_full[y_lo:y_hi, x_lo:x_hi] = np.maximum( score_full[y_lo:y_hi, x_lo:x_hi], score_crop, ) mark[y_lo:y_hi, x_lo:x_hi] = True return vi, score_full candidates_per_var: list[tuple[int, np.ndarray]] = [] raw: list[tuple[float, int, int, int]] = [] var_indices = [vi for vi, _ in kept_variants] if self.n_threads > 1 and len(var_indices) > 1: with ThreadPoolExecutor(max_workers=self.n_threads) as ex: results = list(ex.map(_full_score, var_indices)) else: results = [_full_score(vi) for vi in var_indices] def _scale_factor(s: float) -> float: """Penalità moltiplicativa per scala diversa da 1.0.""" if scale_penalty > 0.0 and s != 1.0: return max(0.0, 1.0 - scale_penalty * abs(s - 1.0)) return 1.0 for vi, score in results: pen = _scale_factor(self.variants[vi].scale) # Ordinare/sogliare su score penalizzato: un match a scala 1.5 con # score 0.8 e penalty=0.3 effettivamente vale 0.56, non 0.8. score_for_sort = score if pen == 1.0 else score * pen ys, xs = np.where(score_for_sort >= min_score) if len(ys) == 0: continue vals = score_for_sort[ys, xs] K = min(len(vals), max_matches * 5) ord_idx = np.argpartition(-vals, K - 1)[:K] candidates_per_var.append((vi, score)) # score_map originale for i in ord_idx: raw.append((float(vals[i]), int(xs[i]), int(ys[i]), vi)) raw.sort(key=lambda c: -c[0]) diag["n_raw_candidates"] = len(raw) _checkpoint("full_kernel") # Mappa vi → score_map per subpixel/refinement score_maps = dict(candidates_per_var) # NMS + subpixel + refinement angolare # Usa mask salvata in train() per coerenza (se ROI poligonale). h, w = self.template_gray.shape if self.template_gray is not None else (0, 0) mask_full = ( self._train_mask if self._train_mask is not None else np.full((h, w), 255, dtype=np.uint8) ) # Plateau radius adattivo al template (evita plateau troppo ampi su # template piccoli: 8% del lato minimo, clampato [3, 10]). plateau_r = max(3, min(10, int(min(self.template_size) * 0.08))) # Pre-NMS rapido su raw con coordinate intere (nms_radius ≥ 8, # la precisione sub-pixel non cambia la decisione di reject). # Subpixel viene calcolato DOPO il pre-NMS solo sui ~pre_cap # preliminary sopravvissuti: prima era chiamato su ogni raw (~58k # chiamate su clip_preciso) anche se la maggior parte veniva poi # scartata dalla NMS, sprecando la parte più costosa del loop. r2 = nms_radius * nms_radius pre_cap = max(max_matches * 3, max_matches + 10) preliminary_int: list[tuple[float, int, int, int]] = [] for score, xi, yi, vi in raw: if any((k[1] - xi) ** 2 + (k[2] - yi) ** 2 < r2 for k in preliminary_int): continue preliminary_int.append((score, xi, yi, vi)) if len(preliminary_int) >= pre_cap: break diag["n_after_pre_nms"] = len(preliminary_int) # Subpixel + refine + verify solo sui candidati pre-NMS (max pre_cap) kept: list[Match] = [] tw, th = self.template_size for score, xi, yi, vi in preliminary_int: if subpixel and vi in score_maps: cx_f, cy_f = self._subpixel_peak( score_maps[vi], xi, yi, plateau_radius=plateau_r, ) else: cx_f, cy_f = float(xi), float(yi) var = self.variants[vi] ang_f = var.angle_deg score_f = score if refine_pose_joint and self.template_gray is not None: ang_f, score_f, cx_f, cy_f = self._refine_pose_joint( spread0, self.template_gray, cx_f, cy_f, var.angle_deg, var.scale, mask_full, ) elif refine_angle and self.template_gray is not None: ang_f, score_f, cx_f, cy_f = self._refine_angle( spread0, bit_active_full, self.template_gray, cx_f, cy_f, var.angle_deg, var.scale, mask_full, search_radius=self._effective_angle_step() / 2.0, original_score=score, ) # Halcon SubPixel='least_squares_high': refinement iterativo # gradient-field per precisione 0.05 px (vs 0.5 quadratic 2D). if subpixel_lm and self.template_gray is not None: cx_lm, cy_lm = self._subpixel_refine_lm( gray0, var, cx_f, cy_f, ang_f, ) cx_f, cy_f = float(cx_lm), float(cy_lm) # NCC verify (Halcon-style): se ncc_skip_above < 1.0 salta # il calcolo per shape-score gia alti. Default 1.01 = NCC sempre, # piu sicuro contro falsi positivi (lo shape-score satura facile). # Quando NCC viene calcolato, lo score finale e' la MEDIA tra # shape-score e NCC: rende lo score piu discriminante per # ranking/visualizzazione (uno score 1.0 vero richiede sia # match shape sia template gray identici). if verify_ncc and float(score_f) < ncc_skip_above: ncc = self._verify_ncc( gray0, cx_f, cy_f, ang_f, var.scale, view_idx=getattr(var, "view_idx", 0), ) if ncc < verify_threshold: diag["drop_ncc_low"] += 1 continue score_f = (float(score_f) + max(0.0, ncc)) * 0.5 # Soft-margin gradient similarity: sostituisce o integra lo # score con metric continua (cos sim gradients) invece di # bin discreto. Halcon-style: piu robusto a piccole rotazioni. if use_soft_score: soft = self._compute_soft_score( gray0, var, cx_f, cy_f, ang_f, ) score_f = (float(score_f) + soft) * 0.5 # Re-check min_score sullo score finale: NCC averaging puo # abbattere lo shape-score sotto la soglia user. Senza questo # check apparivano match con score < min_score (UI confusing). if float(score_f) < min_score: diag["drop_min_score_post_avg"] += 1 continue # Feature recall (Halcon MinScore-style): conta quante feature # template effettivamente combaciano nello spread scena alla # pose finale. Scarta se sotto min_recall (default 0 = off). # Util contro match parziali ad alto NCC ma poche feature reali. if min_recall > 0.0: recall = self._compute_recall( spread0, var, cx_f, cy_f, ang_f, ) if recall < min_recall: diag["drop_recall_low"] += 1 continue # Ri-traslo coord da spazio crop ROI a spazio scena originale. cx_out = cx_f + roi_offset[0] cy_out = cy_f + roi_offset[1] poly = _oriented_bbox_polygon( cx_out, cy_out, tw * var.scale, th * var.scale, ang_f, ) # Reject match con bbox che sfora pesantemente la scena: # spesso indica match spurio (centro derivato male o scala # incoerente). Tollera 25% out-of-bounds, sopra rigetta. H_scn, W_scn = gray_full.shape poly_area = float(cv2.contourArea(poly)) if poly_area > 0: # Clip poly alla scena: intersezione con rettangolo (0,0,W,H) scene_rect = np.array([ [0, 0], [W_scn, 0], [W_scn, H_scn], [0, H_scn], ], dtype=np.float32) inter, _ = cv2.intersectConvexConvex( poly.astype(np.float32), scene_rect, ) inside_ratio = float(inter) / poly_area if inside_ratio < 0.75: diag["drop_bbox_out_of_scene"] += 1 continue # Penalità scala opzionale: score degrada con distanza da 1.0 if scale_penalty > 0.0 and var.scale != 1.0: score_f = float(score_f) * max( 0.0, 1.0 - scale_penalty * abs(var.scale - 1.0) ) # NMS post-refine cross-variant: usa IoU bbox-poligonale invece # di sola distanza centro. Due match orientati diversi ma vicini # (pezzi adiacenti) NON vengono fusi se l'overlap reale e basso; # due match dello stesso pezzo (centri uguali, rotazione simile) # hanno IoU alto e vengono droppati. # Fallback distanza centro per match con bbox degenere. dup = False for k in kept: iou = _poly_iou(k.bbox_poly, poly) if iou > nms_iou_threshold: dup = True break # Sicurezza: centri molto vicini (dentro nms_radius/2) # sempre fusi, anche con orientamenti molto diversi. if (k.cx - cx_out) ** 2 + (k.cy - cy_out) ** 2 < (r2 / 4.0): dup = True break if dup: diag["drop_nms_iou"] += 1 continue kept.append(Match( cx=cx_out, cy=cy_out, angle_deg=ang_f, scale=var.scale, score=score_f, bbox_poly=poly, variant_idx=int(vi), )) if len(kept) >= max_matches: break diag["n_final"] = len(kept) _checkpoint("refine_verify_nms") if profile: self._last_profile = prof if debug: # Debug mode: stampa diagnostica su stderr per visibilita' immediata. import sys as _sys _sys.stderr.write(f"[pm2d.find debug] {self._format_diag(diag)}\n") return kept def _format_diag(self, diag: dict) -> str: """Formatta dict diagnostica in una linea leggibile.""" return ( f"vars: {diag['n_variants_total']} -> " f"top_eval={diag['n_variants_top_evaluated']} " f"top_pass={diag['n_variants_top_passed']} " f"full_eval={diag['n_variants_full_evaluated']} | " f"raw={diag['n_raw_candidates']} " f"pre_nms={diag['n_after_pre_nms']} -> " f"drop[ncc={diag['drop_ncc_low']}, " f"score={diag['drop_min_score_post_avg']}, " f"recall={diag['drop_recall_low']}, " f"bbox={diag['drop_bbox_out_of_scene']}, " f"nms={diag['drop_nms_iou']}] = " f"final={diag['n_final']} (top_thresh={diag['top_thresh_used']:.2f})" ) def get_last_profile(self) -> dict | None: """Ritorna timing per fase dell'ultimo find(profile=True). Chiavi: to_gray, spread_top, top_pruning, full_kernel, refine_verify_nms (millisecondi). Util per identificare bottleneck dove ottimizzare. """ return getattr(self, "_last_profile", None) def get_last_diag(self) -> dict | None: """Ritorna dict diagnostica dell'ultima chiamata find(). Halcon-equivalent: oggi inspect_shape_model espone parziali contatori. Util per debug 'perche' 0 match', tuning interattivo, validation. Vedi diag keys per significato (n_variants_top_evaluated, drop_*, ...). """ return getattr(self, "_last_diag", None)