"""Shape-based matcher stile linemod (line2Dup) - Python puro + numpy/OpenCV. Porting algoritmico dell'idea di `meiqua/shape_based_matching` (no MIPP/SIMD — equivalente usando vettorizzazione numpy). Training (costoso, fatto una volta per ricetta): - Per ogni variante (angolo, scala) del template: 1. Sobel → magnitude + orientation 2. Quantizzazione orientation in N_BINS bin (modulo π, edge simmetrici) 3. Estrazione feature sparse top-magnitude con spacing minimo 4. Salvataggio feature = liste (dx, dy, bin) relative al centro-modello Matching (veloce): - Scena processata una sola volta per livello di piramide: Sobel → magnitude → quant orientation → spread (dilate per bin) → response map (N_BINS, H, W) — bit b acceso dove orientamento b presente. - Per ogni variante: score_map[y,x] = Σ resp[b_i][y+dy_i, x+dx_i] / N_features implementato con shift-add vettorizzato (numpy). - Piramide: matching top-level (basso costo, soglia ridotta) + refinement a risoluzione piena attorno ai candidati. Il training supporta una `mask` binaria per modellare solo una regione parziale della ROI (modello non-rettangolare). """ from __future__ import annotations from dataclasses import dataclass import cv2 import numpy as np N_BINS = 8 # orientamenti quantizzati modulo π @dataclass class Match: cx: float cy: float angle_deg: float scale: float score: float bbox: tuple[int, int, int, int] @dataclass class _Variant: """Template precomputato (una pose).""" angle_deg: float scale: float # Feature come 3 array paralleli (dx, dy, bin) relativi al centro-modello dx: np.ndarray # int32, shape (N,) dy: np.ndarray # int32, shape (N,) bin: np.ndarray # int8, shape (N,) # Bbox kernel (per visualizzazione / limiti ricerca) kh: int kw: int cx_local: float # centro-modello dentro al bbox kernel (solo per bbox visivo) cy_local: float n_features: int class LineShapeMatcher: """Shape-based matcher linemod-style - Python/numpy, no SIMD.""" def __init__( self, num_features: int = 96, weak_grad: float = 30.0, strong_grad: float = 60.0, angle_range_deg: tuple[float, float] = (0.0, 360.0), angle_step_deg: float = 5.0, scale_range: tuple[float, float] = (1.0, 1.0), scale_step: float = 0.1, spread_radius: int = 4, min_feature_spacing: int = 3, pyramid_levels: int = 2, top_score_factor: float = 0.5, ) -> None: self.num_features = num_features self.weak_grad = weak_grad self.strong_grad = strong_grad self.angle_range_deg = angle_range_deg self.angle_step_deg = angle_step_deg self.scale_range = scale_range self.scale_step = scale_step self.spread_radius = spread_radius self.min_feature_spacing = min_feature_spacing self.pyramid_levels = max(1, pyramid_levels) self.top_score_factor = top_score_factor self.variants: list[_Variant] = [] self.template_size: tuple[int, int] = (0, 0) # --- Helpers ------------------------------------------------------- @staticmethod def _to_gray(img: np.ndarray) -> np.ndarray: if img.ndim == 3: return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) return img @staticmethod def _gradient(gray: np.ndarray) -> tuple[np.ndarray, np.ndarray]: gx = cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=3) gy = cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=3) mag = cv2.magnitude(gx, gy) ang = np.arctan2(gy, gx) ang_mod = np.where(ang < 0, ang + np.pi, ang) bins = np.floor(ang_mod / np.pi * N_BINS).astype(np.int16) bins = np.clip(bins, 0, N_BINS - 1) return mag, bins def _extract_features( self, mag: np.ndarray, bins: np.ndarray, mask: np.ndarray | None, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: if mask is not None: mag = np.where(mask > 0, mag, 0) strong = mag >= self.strong_grad ys, xs = np.where(strong) if len(xs) == 0: return (np.zeros(0, np.int32),) * 3 vals = mag[ys, xs] order = np.argsort(-vals) spc = max(1, self.min_feature_spacing) occupied = np.zeros(mag.shape, dtype=bool) picked_x: list[int] = [] picked_y: list[int] = [] picked_b: list[int] = [] for idx in order: y, x = int(ys[idx]), int(xs[idx]) if occupied[y, x]: continue picked_x.append(x); picked_y.append(y) picked_b.append(int(bins[y, x])) y0 = max(0, y - spc); y1 = min(mag.shape[0], y + spc + 1) x0 = max(0, x - spc); x1 = min(mag.shape[1], x + spc + 1) occupied[y0:y1, x0:x1] = True if len(picked_x) >= self.num_features: break return (np.array(picked_x, np.int32), np.array(picked_y, np.int32), np.array(picked_b, np.int8)) def _scale_list(self) -> list[float]: s0, s1 = self.scale_range if s0 >= s1 or self.scale_step <= 0: return [float(s0)] n = int(np.floor((s1 - s0) / self.scale_step)) + 1 return [float(s0 + i * self.scale_step) for i in range(n)] def _angle_list(self) -> list[float]: a0, a1 = self.angle_range_deg if self.angle_step_deg <= 0 or a0 >= a1: return [float(a0)] n = int(np.floor((a1 - a0) / self.angle_step_deg)) return [float(a0 + i * self.angle_step_deg) for i in range(n)] # --- Training ------------------------------------------------------ def train(self, template_bgr: np.ndarray, mask: np.ndarray | None = None) -> int: """Genera varianti rotate+scalate con feature sparse. mask: maschera binaria opzionale (stessa shape del template) per limitare il modello a una regione non rettangolare. """ gray = self._to_gray(template_bgr) h, w = gray.shape self.template_size = (w, h) if mask is None: mask_full = np.full((h, w), 255, dtype=np.uint8) else: mask_full = (mask > 0).astype(np.uint8) * 255 self.variants.clear() for s in self._scale_list(): sw = max(16, int(round(w * s))) sh = max(16, int(round(h * s))) gray_s = cv2.resize(gray, (sw, sh), interpolation=cv2.INTER_LINEAR) mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST) diag = int(np.ceil(np.hypot(sh, sw))) + 6 py = (diag - sh) // 2 px = (diag - sw) // 2 gray_p = cv2.copyMakeBorder( gray_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_REPLICATE, ) mask_p = cv2.copyMakeBorder( mask_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_CONSTANT, value=0, ) center = (diag / 2.0, diag / 2.0) for ang in self._angle_list(): M = cv2.getRotationMatrix2D(center, ang, 1.0) gray_r = cv2.warpAffine( gray_p, M, (diag, diag), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE, ) mask_r = cv2.warpAffine( mask_p, M, (diag, diag), flags=cv2.INTER_NEAREST, borderValue=0, ) mag, bins = self._gradient(gray_r) fx, fy, fb = self._extract_features(mag, bins, mask_r) if len(fx) < 8: continue # Feature relative al centro-modello (centro rotazione) cx_c = diag / 2.0 cy_c = diag / 2.0 dx = (fx - cx_c).astype(np.int32) dy = (fy - cy_c).astype(np.int32) # Dimensione bbox per visualizzazione x0 = int(dx.min()); x1 = int(dx.max()) y0 = int(dy.min()); y1 = int(dy.max()) kw = x1 - x0 + 1 kh = y1 - y0 + 1 cx_local = -x0 # posizione centro dentro al bbox cy_local = -y0 self.variants.append(_Variant( angle_deg=float(ang), scale=float(s), dx=dx, dy=dy, bin=fb, kh=kh, kw=kw, cx_local=float(cx_local), cy_local=float(cy_local), n_features=len(fx), )) return len(self.variants) # --- Matching ------------------------------------------------------ def _response_map(self, gray: np.ndarray) -> np.ndarray: """Costruisce response map shape (N_BINS, H, W) float32 0/1.""" mag, bins = self._gradient(gray) valid = mag >= self.weak_grad k = 2 * self.spread_radius + 1 kernel = np.ones((k, k), dtype=np.uint8) resp = np.zeros((N_BINS, gray.shape[0], gray.shape[1]), dtype=np.float32) for b in range(N_BINS): mask_b = ((bins == b) & valid).astype(np.uint8) d = cv2.dilate(mask_b, kernel) resp[b] = d.astype(np.float32) return resp @staticmethod def _score_by_shift( resp: np.ndarray, dx: np.ndarray, dy: np.ndarray, bins: np.ndarray, ) -> np.ndarray: """score[y,x] = Σ_i resp[bin_i][y+dy_i, x+dx_i] / len(dx). Implementazione vettorizzata con slicing. """ _, H, W = resp.shape acc = np.zeros((H, W), dtype=np.float32) for i in range(len(dx)): ddx = int(dx[i]); ddy = int(dy[i]); b = int(bins[i]) # dst[y, x] += resp[b][y+ddy, x+ddx] y0s = max(0, -ddy); y1s = min(H, H - ddy) x0s = max(0, -ddx); x1s = min(W, W - ddx) if y0s >= y1s or x0s >= x1s: continue y0r = y0s + ddy; y1r = y1s + ddy x0r = x0s + ddx; x1r = x1s + ddx acc[y0s:y1s, x0s:x1s] += resp[b, y0r:y1r, x0r:x1r] if len(dx) > 0: acc /= len(dx) return acc def find( self, scene_bgr: np.ndarray, min_score: float = 0.6, max_matches: int = 20, nms_radius: int | None = None, ) -> list[Match]: if not self.variants: raise RuntimeError("Matcher non addestrato: chiamare train() prima.") gray0 = self._to_gray(scene_bgr) grays = [gray0] for _ in range(self.pyramid_levels - 1): grays.append(cv2.pyrDown(grays[-1])) top = len(grays) - 1 sf = 2 ** top # Response map top-level (usata SOLO per pruning varianti) resp_top = self._response_map(grays[top]) if nms_radius is None: nms_radius = max(8, min(self.template_size) // 2) top_thresh = min_score * self.top_score_factor # Pruning varianti via top-level kept_variants: list[int] = [] for vi, var in enumerate(self.variants): dx_t = (var.dx // sf).astype(np.int32) dy_t = (var.dy // sf).astype(np.int32) key = ((dx_t.astype(np.int64) << 24) | (dy_t.astype(np.int64) << 8) | var.bin.astype(np.int64)) _, uniq_idx = np.unique(key, return_index=True) score = self._score_by_shift( resp_top, dx_t[uniq_idx], dy_t[uniq_idx], var.bin[uniq_idx], ) if score.size and score.max() >= top_thresh: kept_variants.append(vi) if not kept_variants: return [] # Full-res: score_by_shift solo per le varianti sopravvissute resp0 = self._response_map(gray0) refined: list[tuple[float, float, float, int]] = [] for vi in kept_variants: var = self.variants[vi] score = self._score_by_shift(resp0, var.dx, var.dy, var.bin) # Picchi sopra soglia ys, xs = np.where(score >= min_score) if len(ys) == 0: continue vals = score[ys, xs] # Ordine decrescente (solo i top-K per evitare liste enormi) K = min(len(vals), max_matches * 5) ord_idx = np.argpartition(-vals, K - 1)[:K] for i in ord_idx: refined.append((float(vals[i]), float(xs[i]), float(ys[i]), vi)) refined.sort(key=lambda c: -c[0]) kept: list[Match] = [] r2 = nms_radius * nms_radius for score, cx, cy, vi in refined: if any((k.cx - cx) ** 2 + (k.cy - cy) ** 2 < r2 for k in kept): continue var = self.variants[vi] bx = int(round(cx - var.cx_local)) by = int(round(cy - var.cy_local)) kept.append(Match( cx=cx, cy=cy, angle_deg=var.angle_deg, scale=var.scale, score=score, bbox=(bx, by, var.kw, var.kh), )) if len(kept) >= max_matches: break return kept