"""Shape-based matcher stile linemod (line2Dup) - Python puro + numpy/OpenCV. Porting algoritmico dell'idea di `meiqua/shape_based_matching` (no MIPP/SIMD — equivalente usando vettorizzazione numpy). Training (costoso, fatto una volta per ricetta): - Per ogni variante (angolo, scala) del template: 1. Sobel → magnitude + orientation 2. Quantizzazione orientation in N_BINS bin (modulo π, edge simmetrici) 3. Estrazione feature sparse top-magnitude con spacing minimo 4. Salvataggio feature = liste (dx, dy, bin) relative al centro-modello Matching (veloce): - Scena processata una sola volta per livello di piramide: Sobel → magnitude → quant orientation → spread (dilate per bin) → response map (N_BINS, H, W) — bit b acceso dove orientamento b presente. - Per ogni variante: score_map[y,x] = Σ resp[b_i][y+dy_i, x+dx_i] / N_features implementato con shift-add vettorizzato (numpy). - Piramide: matching top-level (basso costo, soglia ridotta) + refinement a risoluzione piena attorno ai candidati. Il training supporta una `mask` binaria per modellare solo una regione parziale della ROI (modello non-rettangolare). """ from __future__ import annotations import os from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass import cv2 import numpy as np from pm2d._jit_kernels import score_by_shift as _jit_score_by_shift, HAS_NUMBA N_BINS = 8 # orientamenti quantizzati modulo π def _oriented_bbox_polygon( cx: float, cy: float, w: float, h: float, angle_deg: float, ) -> np.ndarray: """Ritorna 4 vertici (float32, shape (4,2)) del bbox orientato. Convenzione coerente con cv2.getRotationMatrix2D usato nel train: rotazione counter-clockwise (matematica) ma sistema immagine y-down, quindi visivamente orario. """ w2, h2 = w / 2.0, h / 2.0 # Vertici template non-ruotato centrati al centro corners = np.array([[-w2, -h2], [w2, -h2], [w2, h2], [-w2, h2]], np.float32) a = np.deg2rad(angle_deg) c, s = np.cos(a), np.sin(a) # cv2.getRotationMatrix2D con angolo a positivo applica R = [[c,s],[-s,c]] # (ruota counter-clockwise nel sistema matematico; y-down → orario) R = np.array([[c, s], [-s, c]], np.float32) rotated = corners @ R.T rotated[:, 0] += cx rotated[:, 1] += cy return rotated @dataclass class Match: cx: float cy: float angle_deg: float scale: float score: float bbox_poly: np.ndarray # (4, 2) float32 - 4 vertici ordinati (ruotato) @dataclass class _LevelFeatures: """Feature piramidate (livello l = downsample /2^l).""" dx: np.ndarray # int32 dy: np.ndarray # int32 bin: np.ndarray # int8 n: int @dataclass class _Variant: """Template precomputato (una pose).""" angle_deg: float scale: float # Feature piramide: levels[0] = full-res, levels[l] = /2^l levels: list[_LevelFeatures] # Bbox kernel (per visualizzazione / limiti ricerca) kh: int kw: int cx_local: float # centro-modello dentro al bbox kernel cy_local: float class LineShapeMatcher: """Shape-based matcher linemod-style - Python/numpy, no SIMD.""" def __init__( self, num_features: int = 96, weak_grad: float = 30.0, strong_grad: float = 60.0, angle_range_deg: tuple[float, float] = (0.0, 360.0), angle_step_deg: float = 5.0, scale_range: tuple[float, float] = (1.0, 1.0), scale_step: float = 0.1, spread_radius: int = 4, min_feature_spacing: int = 3, pyramid_levels: int = 2, top_score_factor: float = 0.5, n_threads: int | None = None, ) -> None: self.num_features = num_features self.weak_grad = weak_grad self.strong_grad = strong_grad self.angle_range_deg = angle_range_deg self.angle_step_deg = angle_step_deg self.scale_range = scale_range self.scale_step = scale_step self.spread_radius = spread_radius self.min_feature_spacing = min_feature_spacing self.pyramid_levels = max(1, pyramid_levels) self.top_score_factor = top_score_factor self.n_threads = n_threads or max(1, (os.cpu_count() or 2) - 1) self.variants: list[_Variant] = [] self.template_size: tuple[int, int] = (0, 0) self.template_gray: np.ndarray | None = None # --- Helpers ------------------------------------------------------- @staticmethod def _to_gray(img: np.ndarray) -> np.ndarray: if img.ndim == 3: return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) return img @staticmethod def _gradient(gray: np.ndarray) -> tuple[np.ndarray, np.ndarray]: gx = cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=3) gy = cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=3) mag = cv2.magnitude(gx, gy) ang = np.arctan2(gy, gx) ang_mod = np.where(ang < 0, ang + np.pi, ang) bins = np.floor(ang_mod / np.pi * N_BINS).astype(np.int16) bins = np.clip(bins, 0, N_BINS - 1) return mag, bins def _extract_features( self, mag: np.ndarray, bins: np.ndarray, mask: np.ndarray | None, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: if mask is not None: mag = np.where(mask > 0, mag, 0) strong = mag >= self.strong_grad ys, xs = np.where(strong) if len(xs) == 0: return (np.zeros(0, np.int32),) * 3 vals = mag[ys, xs] order = np.argsort(-vals) spc = max(1, self.min_feature_spacing) occupied = np.zeros(mag.shape, dtype=bool) picked_x: list[int] = [] picked_y: list[int] = [] picked_b: list[int] = [] for idx in order: y, x = int(ys[idx]), int(xs[idx]) if occupied[y, x]: continue picked_x.append(x); picked_y.append(y) picked_b.append(int(bins[y, x])) y0 = max(0, y - spc); y1 = min(mag.shape[0], y + spc + 1) x0 = max(0, x - spc); x1 = min(mag.shape[1], x + spc + 1) occupied[y0:y1, x0:x1] = True if len(picked_x) >= self.num_features: break return (np.array(picked_x, np.int32), np.array(picked_y, np.int32), np.array(picked_b, np.int8)) def _scale_list(self) -> list[float]: s0, s1 = self.scale_range if s0 >= s1 or self.scale_step <= 0: return [float(s0)] n = int(np.floor((s1 - s0) / self.scale_step)) + 1 return [float(s0 + i * self.scale_step) for i in range(n)] def _angle_list(self) -> list[float]: a0, a1 = self.angle_range_deg if self.angle_step_deg <= 0 or a0 >= a1: return [float(a0)] n = int(np.floor((a1 - a0) / self.angle_step_deg)) return [float(a0 + i * self.angle_step_deg) for i in range(n)] # --- Training ------------------------------------------------------ def _build_pyramid_features( self, dx: np.ndarray, dy: np.ndarray, bin_: np.ndarray, ) -> list[_LevelFeatures]: """Piramide feature precomputata: livello l = /2^l con dedup.""" levels = [_LevelFeatures(dx=dx.copy(), dy=dy.copy(), bin=bin_.copy(), n=len(dx))] for lvl in range(1, self.pyramid_levels): sf = 2 ** lvl dx_l = (dx // sf).astype(np.int32) dy_l = (dy // sf).astype(np.int32) # Dedup: rimuove feature collassate sullo stesso (dx, dy, bin) key = ((dx_l.astype(np.int64) << 24) | (dy_l.astype(np.int64) << 8) | bin_.astype(np.int64)) _, uniq = np.unique(key, return_index=True) levels.append(_LevelFeatures( dx=dx_l[uniq], dy=dy_l[uniq], bin=bin_[uniq], n=len(uniq), )) return levels def train(self, template_bgr: np.ndarray, mask: np.ndarray | None = None) -> int: """Genera varianti rotate+scalate con feature sparse + piramide.""" gray = self._to_gray(template_bgr) h, w = gray.shape self.template_size = (w, h) self.template_gray = gray.copy() if mask is None: mask_full = np.full((h, w), 255, dtype=np.uint8) else: mask_full = (mask > 0).astype(np.uint8) * 255 self.variants.clear() for s in self._scale_list(): sw = max(16, int(round(w * s))) sh = max(16, int(round(h * s))) gray_s = cv2.resize(gray, (sw, sh), interpolation=cv2.INTER_LINEAR) mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST) diag = int(np.ceil(np.hypot(sh, sw))) + 6 py = (diag - sh) // 2 px = (diag - sw) // 2 gray_p = cv2.copyMakeBorder( gray_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_REPLICATE, ) mask_p = cv2.copyMakeBorder( mask_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_CONSTANT, value=0, ) center = (diag / 2.0, diag / 2.0) for ang in self._angle_list(): M = cv2.getRotationMatrix2D(center, ang, 1.0) gray_r = cv2.warpAffine( gray_p, M, (diag, diag), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE, ) mask_r = cv2.warpAffine( mask_p, M, (diag, diag), flags=cv2.INTER_NEAREST, borderValue=0, ) mag, bins = self._gradient(gray_r) fx, fy, fb = self._extract_features(mag, bins, mask_r) if len(fx) < 8: continue cx_c = diag / 2.0 cy_c = diag / 2.0 dx = (fx - cx_c).astype(np.int32) dy = (fy - cy_c).astype(np.int32) x0 = int(dx.min()); x1 = int(dx.max()) y0 = int(dy.min()); y1 = int(dy.max()) kw = x1 - x0 + 1 kh = y1 - y0 + 1 cx_local = -x0 cy_local = -y0 levels = self._build_pyramid_features(dx, dy, fb) self.variants.append(_Variant( angle_deg=float(ang), scale=float(s), levels=levels, kh=kh, kw=kw, cx_local=float(cx_local), cy_local=float(cy_local), )) return len(self.variants) # --- Matching ------------------------------------------------------ def _response_map(self, gray: np.ndarray) -> np.ndarray: """Response map shape (N_BINS, H, W) float32 0/1. Rinormalizzazione anti-background (match vs texture densa) è applicata a valle nel `find()` via `_bg_map` locale. """ mag, bins = self._gradient(gray) valid = mag >= self.weak_grad k = 2 * self.spread_radius + 1 kernel = np.ones((k, k), dtype=np.uint8) H, W = gray.shape raw = np.zeros((N_BINS, H, W), dtype=np.float32) for b in range(N_BINS): mask_b = ((bins == b) & valid).astype(np.uint8) d = cv2.dilate(mask_b, kernel) raw[b] = d.astype(np.float32) return raw @staticmethod def _score_by_shift( resp: np.ndarray, dx: np.ndarray, dy: np.ndarray, bins: np.ndarray, bin_has_data: np.ndarray | None = None, ) -> np.ndarray: """score[y,x] = Σ_i resp[bin_i][y+dy_i, x+dx_i] / len(dx). Dispatch a kernel Numba JIT se disponibile, altrimenti fallback numpy. """ return _jit_score_by_shift(resp, dx, dy, bins, bin_has_data) @staticmethod def _subpixel_peak(acc: np.ndarray, x: int, y: int) -> tuple[float, float]: """Fit parabolico 2D attorno al picco per offset subpixel (±0.5 px).""" H, W = acc.shape if x <= 0 or x >= W - 1 or y <= 0 or y >= H - 1: return float(x), float(y) c = acc[y, x] dx2 = acc[y, x + 1] - 2 * c + acc[y, x - 1] dy2 = acc[y + 1, x] - 2 * c + acc[y - 1, x] dx1 = (acc[y, x + 1] - acc[y, x - 1]) / 2.0 dy1 = (acc[y + 1, x] - acc[y - 1, x]) / 2.0 ox = -dx1 / dx2 if abs(dx2) > 1e-6 else 0.0 oy = -dy1 / dy2 if abs(dy2) > 1e-6 else 0.0 ox = float(np.clip(ox, -0.5, 0.5)) oy = float(np.clip(oy, -0.5, 0.5)) return x + ox, y + oy def _refine_angle( self, resp0: np.ndarray, template_gray: np.ndarray, cx: float, cy: float, angle_deg: float, scale: float, mask_full: np.ndarray, angle_fine_step: float = 0.5, search_radius: float | None = None, ) -> tuple[float, float, float, float]: """Ricerca angolare fine (sub-step) attorno al match grezzo. Genera 5 template temporanei a angle ± {0.5, 1.0} * step e sceglie l'angolo con score massimo (parabolic fit sulle 3 score centrali). Ritorna (angle_refined, score, cx_refined, cy_refined). """ if search_radius is None: search_radius = self.angle_step_deg / 2.0 offsets = np.linspace(-search_radius, search_radius, 5) best = (angle_deg, -1.0, cx, cy) scores_by_off: dict[float, float] = {} h, w = template_gray.shape sw = max(16, int(round(w * scale))) sh = max(16, int(round(h * scale))) gray_s = cv2.resize(template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR) mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST) diag = int(np.ceil(np.hypot(sh, sw))) + 6 py = (diag - sh) // 2; px = (diag - sw) // 2 gray_p = cv2.copyMakeBorder(gray_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_REPLICATE) mask_p = cv2.copyMakeBorder(mask_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_CONSTANT, value=0) center = (diag / 2.0, diag / 2.0) H, W = resp0.shape[1], resp0.shape[2] # Ricerca locale posizione con margine ±2 px sulla (cx, cy) margin = 3 for off in offsets: ang = angle_deg + off M = cv2.getRotationMatrix2D(center, ang, 1.0) gray_r = cv2.warpAffine(gray_p, M, (diag, diag), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE) mask_r = cv2.warpAffine(mask_p, M, (diag, diag), flags=cv2.INTER_NEAREST, borderValue=0) mag, bins = self._gradient(gray_r) fx, fy, fb = self._extract_features(mag, bins, mask_r) if len(fx) < 8: scores_by_off[float(off)] = 0.0 continue dx = (fx - center[0]).astype(np.int32) dy = (fy - center[1]).astype(np.int32) # Finestra locale ±margin attorno a (cx, cy) via slicing vettorizzato y_lo = int(cy) - margin; y_hi = int(cy) + margin + 1 x_lo = int(cx) - margin; x_hi = int(cx) + margin + 1 sh = y_hi - y_lo; sw = x_hi - x_lo acc = np.zeros((sh, sw), dtype=np.float32) for i in range(len(dx)): ddx = int(dx[i]); ddy = int(dy[i]); b = int(fb[i]) sy0 = y_lo + ddy; sy1 = y_hi + ddy sx0 = x_lo + ddx; sx1 = x_hi + ddx a_y0 = max(0, -sy0); a_y1 = sh - max(0, sy1 - H) a_x0 = max(0, -sx0); a_x1 = sw - max(0, sx1 - W) s_y0 = max(0, sy0); s_y1 = min(H, sy1) s_x0 = max(0, sx0); s_x1 = min(W, sx1) if s_y1 > s_y0 and s_x1 > s_x0: acc[a_y0:a_y1, a_x0:a_x1] += resp0[b, s_y0:s_y1, s_x0:s_x1] acc /= len(dx) _, max_val, _, max_loc = cv2.minMaxLoc(acc) scores_by_off[float(off)] = float(max_val) if max_val > best[1]: new_cx = x_lo + float(max_loc[0]) new_cy = y_lo + float(max_loc[1]) best = (ang, float(max_val), new_cx, new_cy) # Parabolic fit su 3 angoli attorno al massimo sorted_offs = sorted(scores_by_off.keys()) best_off = best[0] - angle_deg try: i = sorted_offs.index( min(sorted_offs, key=lambda x: abs(x - best_off)) ) if 0 < i < len(sorted_offs) - 1: s0 = scores_by_off[sorted_offs[i - 1]] s1 = scores_by_off[sorted_offs[i]] s2 = scores_by_off[sorted_offs[i + 1]] denom = (s0 - 2 * s1 + s2) if abs(denom) > 1e-6: delta = 0.5 * (s0 - s2) / denom step = sorted_offs[i + 1] - sorted_offs[i] refined_off = sorted_offs[i] + delta * step return (angle_deg + refined_off, best[1], best[2], best[3]) except ValueError: pass return best def _verify_ncc( self, scene_gray: np.ndarray, cx: float, cy: float, angle_deg: float, scale: float, ) -> float: """NCC tra template warpato alla pose e scena sottostante. Ritorna score [-1, 1]. Usato come filtro anti-falso-positivo: il matcher linemod può dare score alto su texture generiche ma sovrapponendo il template gray i pixel non corrispondono. """ if self.template_gray is None: return 1.0 t = self.template_gray h, w = t.shape cx_t = (w - 1) / 2.0 cy_t = (h - 1) / 2.0 M = cv2.getRotationMatrix2D((cx_t, cy_t), angle_deg, scale) M[0, 2] += cx - cx_t M[1, 2] += cy - cy_t H, W = scene_gray.shape warped = cv2.warpAffine( t, M, (W, H), flags=cv2.INTER_LINEAR, borderValue=0, ) mask = cv2.warpAffine( np.full_like(t, 255), M, (W, H), flags=cv2.INTER_NEAREST, borderValue=0, ) valid = mask > 0 if valid.sum() < 20: return 0.0 tpl = warped[valid].astype(np.float32) scn = scene_gray[valid].astype(np.float32) tm = tpl - tpl.mean() sm = scn - scn.mean() denom = np.sqrt((tm * tm).sum() * (sm * sm).sum()) + 1e-9 return float((tm * sm).sum() / denom) def find( self, scene_bgr: np.ndarray, min_score: float = 0.6, max_matches: int = 20, nms_radius: int | None = None, refine_angle: bool = True, subpixel: bool = True, verify_ncc: bool = True, verify_threshold: float = 0.4, ) -> list[Match]: if not self.variants: raise RuntimeError("Matcher non addestrato: chiamare train() prima.") gray0 = self._to_gray(scene_bgr) grays = [gray0] for _ in range(self.pyramid_levels - 1): grays.append(cv2.pyrDown(grays[-1])) top = len(grays) - 1 # Response map top-level resp_top = self._response_map(grays[top]) bin_has_top = np.array([resp_top[b].any() for b in range(N_BINS)]) if nms_radius is None: nms_radius = max(8, min(self.template_size) // 2) top_thresh = min_score * self.top_score_factor # Background score LOCALE: densità media bin-attivi normalizzata su # bbox template. Rinormalizzazione rimuove match dove la zona ha # attivazioni dense in tutti gli orientamenti (texture/rumore). tw, th = self.template_size def _bg_map(resp: np.ndarray, scale_div: int = 1) -> np.ndarray: """bg_map[y,x] = frazione bin attivi media in bbox template.""" density = resp.sum(axis=0) # (H, W) bw = max(9, tw // scale_div); bh = max(9, th // scale_div) smooth = cv2.boxFilter(density, cv2.CV_32F, (bw, bh)) return np.clip(smooth / N_BINS, 0.0, 0.99) bg_top = _bg_map(resp_top, scale_div=2 ** top) def _rescore(score: np.ndarray, bg: np.ndarray) -> np.ndarray: return np.maximum(0.0, (score - bg) / (1.0 - bg + 1e-6)) # Pruning varianti via top-level (parallelizzato) def _top_score(vi: int) -> tuple[int, float]: var = self.variants[vi] lvl = var.levels[min(top, len(var.levels) - 1)] score = self._score_by_shift( resp_top, lvl.dx, lvl.dy, lvl.bin, bin_has_data=bin_has_top, ) score = _rescore(score, bg_top) return vi, float(score.max()) if score.size else -1.0 kept_variants: list[tuple[int, float]] = [] if self.n_threads > 1: with ThreadPoolExecutor(max_workers=self.n_threads) as ex: for vi, best in ex.map(_top_score, range(len(self.variants))): if best >= top_thresh: kept_variants.append((vi, best)) else: for vi in range(len(self.variants)): vi2, best = _top_score(vi) if best >= top_thresh: kept_variants.append((vi2, best)) if not kept_variants: return [] max_vars_full = max(8, max_matches * 4) kept_variants.sort(key=lambda t: -t[1]) kept_variants = kept_variants[:max_vars_full] # Full-res (parallelizzato per variante) resp0 = self._response_map(gray0) bin_has_full = np.array([resp0[b].any() for b in range(N_BINS)]) bg_full = _bg_map(resp0, scale_div=1) def _full_score(vi: int) -> tuple[int, np.ndarray]: var = self.variants[vi] lvl0 = var.levels[0] score = self._score_by_shift( resp0, lvl0.dx, lvl0.dy, lvl0.bin, bin_has_data=bin_has_full, ) score = _rescore(score, bg_full) return vi, score candidates_per_var: list[tuple[int, np.ndarray]] = [] raw: list[tuple[float, int, int, int]] = [] var_indices = [vi for vi, _ in kept_variants] if self.n_threads > 1 and len(var_indices) > 1: with ThreadPoolExecutor(max_workers=self.n_threads) as ex: results = list(ex.map(_full_score, var_indices)) else: results = [_full_score(vi) for vi in var_indices] for vi, score in results: ys, xs = np.where(score >= min_score) if len(ys) == 0: continue vals = score[ys, xs] K = min(len(vals), max_matches * 5) ord_idx = np.argpartition(-vals, K - 1)[:K] candidates_per_var.append((vi, score)) for i in ord_idx: raw.append((float(vals[i]), int(xs[i]), int(ys[i]), vi)) raw.sort(key=lambda c: -c[0]) # Mappa vi → score_map per subpixel/refinement score_maps = dict(candidates_per_var) # NMS + subpixel + refinement angolare # Mask template per refinement (non disponibile qui: usa full) h, w = self.template_gray.shape if self.template_gray is not None else (0, 0) mask_full = np.full((h, w), 255, dtype=np.uint8) kept: list[Match] = [] r2 = nms_radius * nms_radius tw, th = self.template_size for score, xi, yi, vi in raw: var = self.variants[vi] cx_f = float(xi); cy_f = float(yi) if subpixel and vi in score_maps: cx_f, cy_f = self._subpixel_peak(score_maps[vi], xi, yi) if any((k.cx - cx_f) ** 2 + (k.cy - cy_f) ** 2 < r2 for k in kept): continue ang_f = var.angle_deg score_f = score if refine_angle and self.template_gray is not None: ang_f, score_f, cx_f, cy_f = self._refine_angle( resp0, self.template_gray, cx_f, cy_f, var.angle_deg, var.scale, mask_full, search_radius=self.angle_step_deg / 2.0, ) # Verify NCC: filtra falsi positivi con mismatch pixel-level if verify_ncc: ncc = self._verify_ncc(gray0, cx_f, cy_f, ang_f, var.scale) if ncc < verify_threshold: continue poly = _oriented_bbox_polygon( cx_f, cy_f, tw * var.scale, th * var.scale, ang_f, ) kept.append(Match( cx=cx_f, cy=cy_f, angle_deg=ang_f, scale=var.scale, score=score_f, bbox_poly=poly, )) if len(kept) >= max_matches: break return kept