"""Pattern Matching 2D shape-based via edge template matching multi-rotazione/scala. Algoritmo equivalente a Fase Alpha del documento tecnico Vision Suite: - Estrazione edge Canny dal template (invarianza illuminazione) - Generazione varianti del template edge per ogni (angolo, scala) - matchTemplate NCC sulla scena edge per ogni variante - Picchi locali con NMS spaziale per multi-istanza Uso: vedi `EdgeShapeMatcher.train` e `EdgeShapeMatcher.find`. """ from __future__ import annotations from dataclasses import dataclass import cv2 import numpy as np from pm2d.line_matcher import _oriented_bbox_polygon @dataclass class Match: """Singola istanza trovata nella scena.""" cx: float # baricentro x [px] nella scena cy: float # baricentro y [px] nella scena angle_deg: float # rotazione [0, 360) scale: float # fattore scala (1.0 = template originale) score: float # similarità NCC [0, 1] bbox_poly: np.ndarray # (4, 2) float32 - vertici bbox orientato @dataclass class Template: """Variante precomputata del template a un dato (angolo, scala).""" angle_deg: float scale: float edge: np.ndarray # immagine edge ruotata+scalata (uint8 0/255) mask: np.ndarray # maschera supporto (uint8 0/255) cx_local: float # baricentro nel sistema locale variante cy_local: float class EdgeShapeMatcher: """Matcher shape-based su edge Canny con rotazione + scala precomputate.""" def __init__( self, canny_low: int = 50, canny_high: int = 150, angle_step_deg: float = 5.0, angle_range_deg: tuple[float, float] = (0.0, 360.0), scale_range: tuple[float, float] = (1.0, 1.0), scale_step: float = 0.1, match_method: int = cv2.TM_CCOEFF_NORMED, pyramid_levels: int = 3, top_score_factor: float = 0.6, ) -> None: self.canny_low = canny_low self.canny_high = canny_high self.angle_step_deg = angle_step_deg self.angle_range_deg = angle_range_deg self.scale_range = scale_range self.scale_step = scale_step self.match_method = match_method self.pyramid_levels = max(1, pyramid_levels) self.top_score_factor = top_score_factor self.templates: list[Template] = [] self.template_size: tuple[int, int] = (0, 0) # w, h originale self.template_gray: np.ndarray | None = None @staticmethod def _to_gray(img: np.ndarray) -> np.ndarray: if img.ndim == 3: return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) return img def _edges(self, gray: np.ndarray) -> np.ndarray: return cv2.Canny(gray, self.canny_low, self.canny_high) def _scale_list(self) -> list[float]: s0, s1 = self.scale_range if s0 >= s1 or self.scale_step <= 0: return [float(s0)] n = int(np.floor((s1 - s0) / self.scale_step)) + 1 return [float(s0 + i * self.scale_step) for i in range(n)] def _angle_list(self) -> list[float]: a0, a1 = self.angle_range_deg if self.angle_step_deg <= 0 or a0 >= a1: return [float(a0)] n = int(np.floor((a1 - a0) / self.angle_step_deg)) return [float(a0 + i * self.angle_step_deg) for i in range(n)] def train(self, template_bgr: np.ndarray) -> int: """Genera varianti per tutte le combinazioni (angolo, scala).""" gray = self._to_gray(template_bgr) h, w = gray.shape self.template_size = (w, h) self.template_gray = gray.copy() edge_orig = self._edges(gray) mask_orig = np.full((h, w), 255, dtype=np.uint8) self.templates.clear() scales = self._scale_list() angles = self._angle_list() for s in scales: sw = max(8, int(round(w * s))) sh = max(8, int(round(h * s))) edge_s = cv2.resize(edge_orig, (sw, sh), interpolation=cv2.INTER_LINEAR) mask_s = cv2.resize(mask_orig, (sw, sh), interpolation=cv2.INTER_NEAREST) # Re-thresh dopo resize _, edge_s = cv2.threshold(edge_s, 64, 255, cv2.THRESH_BINARY) # Padding diagonale per rotazione senza cropping diag = int(np.ceil(np.hypot(sh, sw))) + 4 pad_y = (diag - sh) // 2 pad_x = (diag - sw) // 2 edge_p = cv2.copyMakeBorder( edge_s, pad_y, diag - sh - pad_y, pad_x, diag - sw - pad_x, cv2.BORDER_CONSTANT, value=0, ) mask_p = cv2.copyMakeBorder( mask_s, pad_y, diag - sh - pad_y, pad_x, diag - sw - pad_x, cv2.BORDER_CONSTANT, value=0, ) center = (diag / 2.0, diag / 2.0) for ang in angles: M = cv2.getRotationMatrix2D(center, ang, 1.0) edge_r = cv2.warpAffine( edge_p, M, (diag, diag), flags=cv2.INTER_LINEAR, borderValue=0, ) mask_r = cv2.warpAffine( mask_p, M, (diag, diag), flags=cv2.INTER_NEAREST, borderValue=0, ) # Crop minimo bounding mask ys, xs = np.where(mask_r > 0) if len(xs) == 0: continue x0, x1 = xs.min(), xs.max() + 1 y0, y1 = ys.min(), ys.max() + 1 edge_c = edge_r[y0:y1, x0:x1] mask_c = mask_r[y0:y1, x0:x1] cx_local = (mask_c.shape[1] - 1) / 2.0 cy_local = (mask_c.shape[0] - 1) / 2.0 self.templates.append( Template( angle_deg=float(ang), scale=float(s), edge=edge_c, mask=mask_c, cx_local=cx_local, cy_local=cy_local, ) ) return len(self.templates) def _pyrdown_binary(self, img: np.ndarray) -> np.ndarray: """pyrDown + re-thresh per mantenere binario 0/255.""" d = cv2.pyrDown(img) _, d = cv2.threshold(d, 32, 255, cv2.THRESH_BINARY) return d def find( self, scene_bgr: np.ndarray, min_score: float = 0.5, max_matches: int = 10, nms_radius: int | None = None, ) -> list[Match]: """Cerca istanze del template nella scena con strategia piramidale. - Top-level: matching brute-force a bassa risoluzione (veloce, soglia ridotta) - Refinement: re-match locale a risoluzione piena per ogni candidato """ if not self.templates: raise RuntimeError("Matcher non addestrato: chiamare train() prima.") gray = self._to_gray(scene_bgr) scene_edge0 = self._edges(gray) # Piramide scena edge scene_pyr = [scene_edge0] for _ in range(self.pyramid_levels - 1): scene_pyr.append(self._pyrdown_binary(scene_pyr[-1])) top = len(scene_pyr) - 1 sf = 2 ** top # scale factor top→0 scene_top = scene_pyr[top] if nms_radius is None: nms_radius = max(8, min(self.template_size) // 2) top_thresh = min_score * self.top_score_factor # Top-level brute-force candidates: list[tuple[float, int, int, int]] = [] for ti, tpl in enumerate(self.templates): edge_top = tpl.edge.copy() mask_top = tpl.mask.copy() for _ in range(top): edge_top = self._pyrdown_binary(edge_top) mask_top = self._pyrdown_binary(mask_top) th, tw = edge_top.shape if th < 6 or tw < 6: continue if scene_top.shape[0] < th or scene_top.shape[1] < tw: continue res = cv2.matchTemplate( scene_top, edge_top, self.match_method, mask=mask_top, ) res = np.nan_to_num(res, nan=-1.0, posinf=-1.0, neginf=-1.0) ys, xs = np.where(res >= top_thresh) for y, x in zip(ys, xs): candidates.append((float(res[y, x]), int(x), int(y), ti)) # Refinement a risoluzione piena: per ogni candidato top, finestra locale refined: list[tuple[float, int, int, int]] = [] margin = sf + 4 for _, xt, yt, ti in candidates: tpl = self.templates[ti] th, tw = tpl.edge.shape x0 = xt * sf y0 = yt * sf sx0 = max(0, x0 - margin) sy0 = max(0, y0 - margin) sx1 = min(scene_edge0.shape[1], x0 + tw + margin) sy1 = min(scene_edge0.shape[0], y0 + th + margin) roi = scene_edge0[sy0:sy1, sx0:sx1] if roi.shape[0] < th or roi.shape[1] < tw: continue res = cv2.matchTemplate( roi, tpl.edge, self.match_method, mask=tpl.mask, ) res = np.nan_to_num(res, nan=-1.0, posinf=-1.0, neginf=-1.0) _, max_val, _, max_loc = cv2.minMaxLoc(res) if max_val < min_score: continue bx = sx0 + max_loc[0] by = sy0 + max_loc[1] refined.append((float(max_val), bx, by, ti)) refined.sort(key=lambda c: -c[0]) # NMS spaziale baricentri kept: list[Match] = [] r2 = nms_radius * nms_radius tw0, th0 = self.template_size for score, x, y, ti in refined: tpl = self.templates[ti] cx = x + tpl.cx_local cy = y + tpl.cy_local if any((k.cx - cx) ** 2 + (k.cy - cy) ** 2 < r2 for k in kept): continue poly = _oriented_bbox_polygon( cx, cy, tw0 * tpl.scale, th0 * tpl.scale, tpl.angle_deg, ) kept.append( Match( cx=cx, cy=cy, angle_deg=tpl.angle_deg, scale=tpl.scale, score=score, bbox_poly=poly, ) ) if len(kept) >= max_matches: break return kept # --- Persistenza modello --- def save(self, path: str) -> None: """Salva matcher su disco (.npz).""" meta = np.array( [(t.angle_deg, t.scale, t.cx_local, t.cy_local) for t in self.templates], dtype=np.float32, ) params = np.array( [self.canny_low, self.canny_high, self.angle_step_deg, self.angle_range_deg[0], self.angle_range_deg[1], self.scale_range[0], self.scale_range[1], self.scale_step, self.template_size[0], self.template_size[1], self.match_method, self.pyramid_levels, self.top_score_factor], dtype=np.float32, ) arrays = {f"edge_{i}": t.edge for i, t in enumerate(self.templates)} arrays.update({f"mask_{i}": t.mask for i, t in enumerate(self.templates)}) np.savez_compressed(path, params=params, meta=meta, **arrays) @classmethod def load(cls, path: str) -> "EdgeShapeMatcher": z = np.load(path) p = z["params"] m = cls( canny_low=int(p[0]), canny_high=int(p[1]), angle_step_deg=float(p[2]), angle_range_deg=(float(p[3]), float(p[4])), scale_range=(float(p[5]), float(p[6])), scale_step=float(p[7]), match_method=int(p[10]), pyramid_levels=int(p[11]) if len(p) > 11 else 3, top_score_factor=float(p[12]) if len(p) > 12 else 0.6, ) m.template_size = (int(p[8]), int(p[9])) meta = z["meta"] for i in range(len(meta)): m.templates.append( Template( angle_deg=float(meta[i, 0]), scale=float(meta[i, 1]), edge=z[f"edge_{i}"], mask=z[f"mask_{i}"], cx_local=float(meta[i, 2]), cy_local=float(meta[i, 3]), ) ) return m