ba54b42fdc
Due ottimizzazioni chiave:
1. Spread bitmap uint8 invece di response map (N_BINS, H, W) float32
- 32x meno memoria, cache-friendly
- Nuovi kernel Numba: _jit_score_bitmap, _jit_popcount_density
- Formato: spread[y,x] bit b = bin b attivo nel raggio di spread
- _refine_angle usa slicing su bitmap con mask & bit
2. Pre-NMS prima di refine_angle/verify_ncc
- Problema: loop 'for raw in candidati' applicava refine+verify A OGNI
candidato prima del check NMS → 2000+ refine chiamati per ~25 match
- Fix: pre-NMS su (cx, cy) subpixel, limita a max_matches*3 candidati,
poi refine + verify solo su quelli
- Esempio worst case: lama_full_fast 55.9s → 1.13s (49x)
Benchmark suite 16 scenari (4 immagini x full/part x fast/preciso):
prima: totale find 94.6s
dopo: totale find 27.3s (3.5x globale)
casi peggiori <5s (prima erano >50s)
ROI parziali (solo metà oggetto) funzionano in tutti i casi.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
674 lines
26 KiB
Python
674 lines
26 KiB
Python
"""Shape-based matcher stile linemod (line2Dup) - Python puro + numpy/OpenCV.
|
||
|
||
Porting algoritmico dell'idea di `meiqua/shape_based_matching` (no MIPP/SIMD —
|
||
equivalente usando vettorizzazione numpy).
|
||
|
||
Training (costoso, fatto una volta per ricetta):
|
||
- Per ogni variante (angolo, scala) del template:
|
||
1. Sobel → magnitude + orientation
|
||
2. Quantizzazione orientation in N_BINS bin (modulo π, edge simmetrici)
|
||
3. Estrazione feature sparse top-magnitude con spacing minimo
|
||
4. Salvataggio feature = liste (dx, dy, bin) relative al centro-modello
|
||
|
||
Matching (veloce):
|
||
- Scena processata una sola volta per livello di piramide:
|
||
Sobel → magnitude → quant orientation → spread (dilate per bin) →
|
||
response map (N_BINS, H, W) — bit b acceso dove orientamento b presente.
|
||
- Per ogni variante:
|
||
score_map[y,x] = Σ resp[b_i][y+dy_i, x+dx_i] / N_features
|
||
implementato con shift-add vettorizzato (numpy).
|
||
- Piramide: matching top-level (basso costo, soglia ridotta) +
|
||
refinement a risoluzione piena attorno ai candidati.
|
||
|
||
Il training supporta una `mask` binaria per modellare solo una regione parziale
|
||
della ROI (modello non-rettangolare).
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
from dataclasses import dataclass
|
||
|
||
import cv2
|
||
import numpy as np
|
||
|
||
from pm2d._jit_kernels import (
|
||
score_by_shift as _jit_score_by_shift,
|
||
score_bitmap as _jit_score_bitmap,
|
||
popcount_density as _jit_popcount,
|
||
HAS_NUMBA,
|
||
)
|
||
|
||
N_BINS = 8 # orientamenti quantizzati modulo π
|
||
|
||
|
||
def _oriented_bbox_polygon(
|
||
cx: float, cy: float, w: float, h: float, angle_deg: float,
|
||
) -> np.ndarray:
|
||
"""Ritorna 4 vertici (float32, shape (4,2)) del bbox orientato.
|
||
|
||
Convenzione coerente con cv2.getRotationMatrix2D usato nel train:
|
||
rotazione counter-clockwise (matematica) ma sistema immagine y-down,
|
||
quindi visivamente orario.
|
||
"""
|
||
w2, h2 = w / 2.0, h / 2.0
|
||
# Vertici template non-ruotato centrati al centro
|
||
corners = np.array([[-w2, -h2], [w2, -h2], [w2, h2], [-w2, h2]], np.float32)
|
||
a = np.deg2rad(angle_deg)
|
||
c, s = np.cos(a), np.sin(a)
|
||
# cv2.getRotationMatrix2D con angolo a positivo applica R = [[c,s],[-s,c]]
|
||
# (ruota counter-clockwise nel sistema matematico; y-down → orario)
|
||
R = np.array([[c, s], [-s, c]], np.float32)
|
||
rotated = corners @ R.T
|
||
rotated[:, 0] += cx
|
||
rotated[:, 1] += cy
|
||
return rotated
|
||
|
||
|
||
@dataclass
|
||
class Match:
|
||
cx: float
|
||
cy: float
|
||
angle_deg: float
|
||
scale: float
|
||
score: float
|
||
bbox_poly: np.ndarray # (4, 2) float32 - 4 vertici ordinati (ruotato)
|
||
|
||
|
||
@dataclass
|
||
class _LevelFeatures:
|
||
"""Feature piramidate (livello l = downsample /2^l)."""
|
||
dx: np.ndarray # int32
|
||
dy: np.ndarray # int32
|
||
bin: np.ndarray # int8
|
||
n: int
|
||
|
||
|
||
@dataclass
|
||
class _Variant:
|
||
"""Template precomputato (una pose)."""
|
||
angle_deg: float
|
||
scale: float
|
||
# Feature piramide: levels[0] = full-res, levels[l] = /2^l
|
||
levels: list[_LevelFeatures]
|
||
# Bbox kernel (per visualizzazione / limiti ricerca)
|
||
kh: int
|
||
kw: int
|
||
cx_local: float # centro-modello dentro al bbox kernel
|
||
cy_local: float
|
||
|
||
|
||
class LineShapeMatcher:
|
||
"""Shape-based matcher linemod-style - Python/numpy, no SIMD."""
|
||
|
||
def __init__(
|
||
self,
|
||
num_features: int = 96,
|
||
weak_grad: float = 30.0,
|
||
strong_grad: float = 60.0,
|
||
angle_range_deg: tuple[float, float] = (0.0, 360.0),
|
||
angle_step_deg: float = 5.0,
|
||
scale_range: tuple[float, float] = (1.0, 1.0),
|
||
scale_step: float = 0.1,
|
||
spread_radius: int = 4,
|
||
min_feature_spacing: int = 3,
|
||
pyramid_levels: int = 2,
|
||
top_score_factor: float = 0.5,
|
||
n_threads: int | None = None,
|
||
) -> None:
|
||
self.num_features = num_features
|
||
self.weak_grad = weak_grad
|
||
self.strong_grad = strong_grad
|
||
self.angle_range_deg = angle_range_deg
|
||
self.angle_step_deg = angle_step_deg
|
||
self.scale_range = scale_range
|
||
self.scale_step = scale_step
|
||
self.spread_radius = spread_radius
|
||
self.min_feature_spacing = min_feature_spacing
|
||
self.pyramid_levels = max(1, pyramid_levels)
|
||
self.top_score_factor = top_score_factor
|
||
self.n_threads = n_threads or max(1, (os.cpu_count() or 2) - 1)
|
||
|
||
self.variants: list[_Variant] = []
|
||
self.template_size: tuple[int, int] = (0, 0)
|
||
self.template_gray: np.ndarray | None = None
|
||
|
||
# --- Helpers -------------------------------------------------------
|
||
|
||
@staticmethod
|
||
def _to_gray(img: np.ndarray) -> np.ndarray:
|
||
if img.ndim == 3:
|
||
return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
|
||
return img
|
||
|
||
@staticmethod
|
||
def _gradient(gray: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
|
||
gx = cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=3)
|
||
gy = cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=3)
|
||
mag = cv2.magnitude(gx, gy)
|
||
ang = np.arctan2(gy, gx)
|
||
ang_mod = np.where(ang < 0, ang + np.pi, ang)
|
||
bins = np.floor(ang_mod / np.pi * N_BINS).astype(np.int16)
|
||
bins = np.clip(bins, 0, N_BINS - 1)
|
||
return mag, bins
|
||
|
||
def _extract_features(
|
||
self, mag: np.ndarray, bins: np.ndarray, mask: np.ndarray | None,
|
||
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
|
||
if mask is not None:
|
||
mag = np.where(mask > 0, mag, 0)
|
||
strong = mag >= self.strong_grad
|
||
ys, xs = np.where(strong)
|
||
if len(xs) == 0:
|
||
return (np.zeros(0, np.int32),) * 3
|
||
vals = mag[ys, xs]
|
||
order = np.argsort(-vals)
|
||
spc = max(1, self.min_feature_spacing)
|
||
occupied = np.zeros(mag.shape, dtype=bool)
|
||
picked_x: list[int] = []
|
||
picked_y: list[int] = []
|
||
picked_b: list[int] = []
|
||
for idx in order:
|
||
y, x = int(ys[idx]), int(xs[idx])
|
||
if occupied[y, x]:
|
||
continue
|
||
picked_x.append(x); picked_y.append(y)
|
||
picked_b.append(int(bins[y, x]))
|
||
y0 = max(0, y - spc); y1 = min(mag.shape[0], y + spc + 1)
|
||
x0 = max(0, x - spc); x1 = min(mag.shape[1], x + spc + 1)
|
||
occupied[y0:y1, x0:x1] = True
|
||
if len(picked_x) >= self.num_features:
|
||
break
|
||
return (np.array(picked_x, np.int32),
|
||
np.array(picked_y, np.int32),
|
||
np.array(picked_b, np.int8))
|
||
|
||
def _scale_list(self) -> list[float]:
|
||
s0, s1 = self.scale_range
|
||
if s0 >= s1 or self.scale_step <= 0:
|
||
return [float(s0)]
|
||
n = int(np.floor((s1 - s0) / self.scale_step)) + 1
|
||
return [float(s0 + i * self.scale_step) for i in range(n)]
|
||
|
||
def _angle_list(self) -> list[float]:
|
||
a0, a1 = self.angle_range_deg
|
||
if self.angle_step_deg <= 0 or a0 >= a1:
|
||
return [float(a0)]
|
||
n = int(np.floor((a1 - a0) / self.angle_step_deg))
|
||
return [float(a0 + i * self.angle_step_deg) for i in range(n)]
|
||
|
||
# --- Training ------------------------------------------------------
|
||
|
||
def _build_pyramid_features(
|
||
self, dx: np.ndarray, dy: np.ndarray, bin_: np.ndarray,
|
||
) -> list[_LevelFeatures]:
|
||
"""Piramide feature precomputata: livello l = /2^l con dedup."""
|
||
levels = [_LevelFeatures(dx=dx.copy(), dy=dy.copy(), bin=bin_.copy(),
|
||
n=len(dx))]
|
||
for lvl in range(1, self.pyramid_levels):
|
||
sf = 2 ** lvl
|
||
dx_l = (dx // sf).astype(np.int32)
|
||
dy_l = (dy // sf).astype(np.int32)
|
||
# Dedup: rimuove feature collassate sullo stesso (dx, dy, bin)
|
||
key = ((dx_l.astype(np.int64) << 24)
|
||
| (dy_l.astype(np.int64) << 8)
|
||
| bin_.astype(np.int64))
|
||
_, uniq = np.unique(key, return_index=True)
|
||
levels.append(_LevelFeatures(
|
||
dx=dx_l[uniq], dy=dy_l[uniq], bin=bin_[uniq], n=len(uniq),
|
||
))
|
||
return levels
|
||
|
||
def train(self, template_bgr: np.ndarray, mask: np.ndarray | None = None) -> int:
|
||
"""Genera varianti rotate+scalate con feature sparse + piramide."""
|
||
gray = self._to_gray(template_bgr)
|
||
h, w = gray.shape
|
||
self.template_size = (w, h)
|
||
self.template_gray = gray.copy()
|
||
if mask is None:
|
||
mask_full = np.full((h, w), 255, dtype=np.uint8)
|
||
else:
|
||
mask_full = (mask > 0).astype(np.uint8) * 255
|
||
|
||
self.variants.clear()
|
||
for s in self._scale_list():
|
||
sw = max(16, int(round(w * s)))
|
||
sh = max(16, int(round(h * s)))
|
||
gray_s = cv2.resize(gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
|
||
mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST)
|
||
diag = int(np.ceil(np.hypot(sh, sw))) + 6
|
||
py = (diag - sh) // 2
|
||
px = (diag - sw) // 2
|
||
gray_p = cv2.copyMakeBorder(
|
||
gray_s, py, diag - sh - py, px, diag - sw - px,
|
||
cv2.BORDER_REPLICATE,
|
||
)
|
||
mask_p = cv2.copyMakeBorder(
|
||
mask_s, py, diag - sh - py, px, diag - sw - px,
|
||
cv2.BORDER_CONSTANT, value=0,
|
||
)
|
||
center = (diag / 2.0, diag / 2.0)
|
||
|
||
for ang in self._angle_list():
|
||
M = cv2.getRotationMatrix2D(center, ang, 1.0)
|
||
gray_r = cv2.warpAffine(
|
||
gray_p, M, (diag, diag),
|
||
flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE,
|
||
)
|
||
mask_r = cv2.warpAffine(
|
||
mask_p, M, (diag, diag),
|
||
flags=cv2.INTER_NEAREST, borderValue=0,
|
||
)
|
||
mag, bins = self._gradient(gray_r)
|
||
fx, fy, fb = self._extract_features(mag, bins, mask_r)
|
||
if len(fx) < 8:
|
||
continue
|
||
|
||
cx_c = diag / 2.0
|
||
cy_c = diag / 2.0
|
||
dx = (fx - cx_c).astype(np.int32)
|
||
dy = (fy - cy_c).astype(np.int32)
|
||
|
||
x0 = int(dx.min()); x1 = int(dx.max())
|
||
y0 = int(dy.min()); y1 = int(dy.max())
|
||
kw = x1 - x0 + 1
|
||
kh = y1 - y0 + 1
|
||
cx_local = -x0
|
||
cy_local = -y0
|
||
|
||
levels = self._build_pyramid_features(dx, dy, fb)
|
||
|
||
self.variants.append(_Variant(
|
||
angle_deg=float(ang),
|
||
scale=float(s),
|
||
levels=levels,
|
||
kh=kh, kw=kw,
|
||
cx_local=float(cx_local), cy_local=float(cy_local),
|
||
))
|
||
return len(self.variants)
|
||
|
||
# --- Matching ------------------------------------------------------
|
||
|
||
def _response_map(self, gray: np.ndarray) -> np.ndarray:
|
||
"""Response map shape (N_BINS, H, W) float32 (legacy path)."""
|
||
mag, bins = self._gradient(gray)
|
||
valid = mag >= self.weak_grad
|
||
k = 2 * self.spread_radius + 1
|
||
kernel = np.ones((k, k), dtype=np.uint8)
|
||
H, W = gray.shape
|
||
raw = np.zeros((N_BINS, H, W), dtype=np.float32)
|
||
for b in range(N_BINS):
|
||
mask_b = ((bins == b) & valid).astype(np.uint8)
|
||
d = cv2.dilate(mask_b, kernel)
|
||
raw[b] = d.astype(np.float32)
|
||
return raw
|
||
|
||
def _spread_bitmap(self, gray: np.ndarray) -> np.ndarray:
|
||
"""Spread bitmap uint8: bit b acceso dove bin b è presente nel raggio.
|
||
|
||
Formato compatto 32× più denso della response map (N_BINS, H, W) float32.
|
||
"""
|
||
mag, bins = self._gradient(gray)
|
||
valid = mag >= self.weak_grad
|
||
k = 2 * self.spread_radius + 1
|
||
kernel = np.ones((k, k), dtype=np.uint8)
|
||
H, W = gray.shape
|
||
spread = np.zeros((H, W), dtype=np.uint8)
|
||
for b in range(N_BINS):
|
||
mask_b = ((bins == b) & valid).astype(np.uint8)
|
||
d = cv2.dilate(mask_b, kernel)
|
||
spread |= (d << b)
|
||
return spread
|
||
|
||
@staticmethod
|
||
def _score_by_shift(
|
||
resp: np.ndarray, dx: np.ndarray, dy: np.ndarray, bins: np.ndarray,
|
||
bin_has_data: np.ndarray | None = None,
|
||
) -> np.ndarray:
|
||
"""score[y,x] = Σ_i resp[bin_i][y+dy_i, x+dx_i] / len(dx).
|
||
|
||
Dispatch a kernel Numba JIT se disponibile, altrimenti fallback numpy.
|
||
"""
|
||
return _jit_score_by_shift(resp, dx, dy, bins, bin_has_data)
|
||
|
||
@staticmethod
|
||
def _subpixel_peak(acc: np.ndarray, x: int, y: int) -> tuple[float, float]:
|
||
"""Fit parabolico 2D attorno al picco per offset subpixel (±0.5 px)."""
|
||
H, W = acc.shape
|
||
if x <= 0 or x >= W - 1 or y <= 0 or y >= H - 1:
|
||
return float(x), float(y)
|
||
c = acc[y, x]
|
||
dx2 = acc[y, x + 1] - 2 * c + acc[y, x - 1]
|
||
dy2 = acc[y + 1, x] - 2 * c + acc[y - 1, x]
|
||
dx1 = (acc[y, x + 1] - acc[y, x - 1]) / 2.0
|
||
dy1 = (acc[y + 1, x] - acc[y - 1, x]) / 2.0
|
||
ox = -dx1 / dx2 if abs(dx2) > 1e-6 else 0.0
|
||
oy = -dy1 / dy2 if abs(dy2) > 1e-6 else 0.0
|
||
ox = float(np.clip(ox, -0.5, 0.5))
|
||
oy = float(np.clip(oy, -0.5, 0.5))
|
||
return x + ox, y + oy
|
||
|
||
def _refine_angle(
|
||
self,
|
||
spread0: np.ndarray, # bitmap uint8 (H, W)
|
||
bit_active: int,
|
||
template_gray: np.ndarray,
|
||
cx: float, cy: float,
|
||
angle_deg: float, scale: float,
|
||
mask_full: np.ndarray,
|
||
angle_fine_step: float = 0.5,
|
||
search_radius: float | None = None,
|
||
) -> tuple[float, float, float, float]:
|
||
"""Ricerca angolare fine (sub-step) attorno al match grezzo.
|
||
|
||
Genera 5 template temporanei a angle ± {0.5, 1.0} * step e sceglie
|
||
l'angolo con score massimo (parabolic fit sulle 3 score centrali).
|
||
Ritorna (angle_refined, score, cx_refined, cy_refined).
|
||
"""
|
||
if search_radius is None:
|
||
search_radius = self.angle_step_deg / 2.0
|
||
offsets = np.linspace(-search_radius, search_radius, 5)
|
||
best = (angle_deg, -1.0, cx, cy)
|
||
scores_by_off: dict[float, float] = {}
|
||
|
||
h, w = template_gray.shape
|
||
sw = max(16, int(round(w * scale)))
|
||
sh = max(16, int(round(h * scale)))
|
||
gray_s = cv2.resize(template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
|
||
mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST)
|
||
diag = int(np.ceil(np.hypot(sh, sw))) + 6
|
||
py = (diag - sh) // 2; px = (diag - sw) // 2
|
||
gray_p = cv2.copyMakeBorder(gray_s, py, diag - sh - py, px, diag - sw - px,
|
||
cv2.BORDER_REPLICATE)
|
||
mask_p = cv2.copyMakeBorder(mask_s, py, diag - sh - py, px, diag - sw - px,
|
||
cv2.BORDER_CONSTANT, value=0)
|
||
center = (diag / 2.0, diag / 2.0)
|
||
|
||
H, W = spread0.shape
|
||
# Ricerca locale posizione con margine ±2 px sulla (cx, cy)
|
||
margin = 3
|
||
|
||
for off in offsets:
|
||
ang = angle_deg + off
|
||
M = cv2.getRotationMatrix2D(center, ang, 1.0)
|
||
gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
|
||
flags=cv2.INTER_LINEAR,
|
||
borderMode=cv2.BORDER_REPLICATE)
|
||
mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
|
||
flags=cv2.INTER_NEAREST, borderValue=0)
|
||
mag, bins = self._gradient(gray_r)
|
||
fx, fy, fb = self._extract_features(mag, bins, mask_r)
|
||
if len(fx) < 8:
|
||
scores_by_off[float(off)] = 0.0
|
||
continue
|
||
dx = (fx - center[0]).astype(np.int32)
|
||
dy = (fy - center[1]).astype(np.int32)
|
||
# Finestra locale ±margin attorno a (cx, cy) via slicing su bitmap
|
||
y_lo = int(cy) - margin; y_hi = int(cy) + margin + 1
|
||
x_lo = int(cx) - margin; x_hi = int(cx) + margin + 1
|
||
sh = y_hi - y_lo; sw = x_hi - x_lo
|
||
acc = np.zeros((sh, sw), dtype=np.float32)
|
||
for i in range(len(dx)):
|
||
ddx = int(dx[i]); ddy = int(dy[i]); b = int(fb[i])
|
||
bit = np.uint8(1 << b)
|
||
sy0 = y_lo + ddy; sy1 = y_hi + ddy
|
||
sx0 = x_lo + ddx; sx1 = x_hi + ddx
|
||
a_y0 = max(0, -sy0); a_y1 = sh - max(0, sy1 - H)
|
||
a_x0 = max(0, -sx0); a_x1 = sw - max(0, sx1 - W)
|
||
s_y0 = max(0, sy0); s_y1 = min(H, sy1)
|
||
s_x0 = max(0, sx0); s_x1 = min(W, sx1)
|
||
if s_y1 > s_y0 and s_x1 > s_x0:
|
||
region = spread0[s_y0:s_y1, s_x0:s_x1]
|
||
acc[a_y0:a_y1, a_x0:a_x1] += (
|
||
(region & bit) != 0
|
||
).astype(np.float32)
|
||
acc /= len(dx)
|
||
_, max_val, _, max_loc = cv2.minMaxLoc(acc)
|
||
scores_by_off[float(off)] = float(max_val)
|
||
if max_val > best[1]:
|
||
new_cx = x_lo + float(max_loc[0])
|
||
new_cy = y_lo + float(max_loc[1])
|
||
best = (ang, float(max_val), new_cx, new_cy)
|
||
|
||
# Parabolic fit su 3 angoli attorno al massimo
|
||
sorted_offs = sorted(scores_by_off.keys())
|
||
best_off = best[0] - angle_deg
|
||
try:
|
||
i = sorted_offs.index(
|
||
min(sorted_offs, key=lambda x: abs(x - best_off))
|
||
)
|
||
if 0 < i < len(sorted_offs) - 1:
|
||
s0 = scores_by_off[sorted_offs[i - 1]]
|
||
s1 = scores_by_off[sorted_offs[i]]
|
||
s2 = scores_by_off[sorted_offs[i + 1]]
|
||
denom = (s0 - 2 * s1 + s2)
|
||
if abs(denom) > 1e-6:
|
||
delta = 0.5 * (s0 - s2) / denom
|
||
step = sorted_offs[i + 1] - sorted_offs[i]
|
||
refined_off = sorted_offs[i] + delta * step
|
||
return (angle_deg + refined_off, best[1], best[2], best[3])
|
||
except ValueError:
|
||
pass
|
||
return best
|
||
|
||
def _verify_ncc(
|
||
self, scene_gray: np.ndarray, cx: float, cy: float,
|
||
angle_deg: float, scale: float,
|
||
) -> float:
|
||
"""NCC tra template warpato alla pose e scena sottostante.
|
||
|
||
Ritorna score [-1, 1]. Usato come filtro anti-falso-positivo:
|
||
il matcher linemod può dare score alto su texture generiche ma
|
||
sovrapponendo il template gray i pixel non corrispondono.
|
||
"""
|
||
if self.template_gray is None:
|
||
return 1.0
|
||
t = self.template_gray
|
||
h, w = t.shape
|
||
cx_t = (w - 1) / 2.0
|
||
cy_t = (h - 1) / 2.0
|
||
M = cv2.getRotationMatrix2D((cx_t, cy_t), angle_deg, scale)
|
||
M[0, 2] += cx - cx_t
|
||
M[1, 2] += cy - cy_t
|
||
H, W = scene_gray.shape
|
||
warped = cv2.warpAffine(
|
||
t, M, (W, H),
|
||
flags=cv2.INTER_LINEAR, borderValue=0,
|
||
)
|
||
mask = cv2.warpAffine(
|
||
np.full_like(t, 255), M, (W, H),
|
||
flags=cv2.INTER_NEAREST, borderValue=0,
|
||
)
|
||
valid = mask > 0
|
||
if valid.sum() < 20:
|
||
return 0.0
|
||
tpl = warped[valid].astype(np.float32)
|
||
scn = scene_gray[valid].astype(np.float32)
|
||
tm = tpl - tpl.mean()
|
||
sm = scn - scn.mean()
|
||
denom = np.sqrt((tm * tm).sum() * (sm * sm).sum()) + 1e-9
|
||
return float((tm * sm).sum() / denom)
|
||
|
||
def find(
|
||
self,
|
||
scene_bgr: np.ndarray,
|
||
min_score: float = 0.6,
|
||
max_matches: int = 20,
|
||
nms_radius: int | None = None,
|
||
refine_angle: bool = True,
|
||
subpixel: bool = True,
|
||
verify_ncc: bool = True,
|
||
verify_threshold: float = 0.4,
|
||
) -> list[Match]:
|
||
if not self.variants:
|
||
raise RuntimeError("Matcher non addestrato: chiamare train() prima.")
|
||
|
||
gray0 = self._to_gray(scene_bgr)
|
||
grays = [gray0]
|
||
for _ in range(self.pyramid_levels - 1):
|
||
grays.append(cv2.pyrDown(grays[-1]))
|
||
top = len(grays) - 1
|
||
|
||
# Spread bitmap (uint8) al top level: 32× meno memoria della response
|
||
# map float32 → MOLTO più cache-friendly per _score_by_shift.
|
||
spread_top = self._spread_bitmap(grays[top])
|
||
bit_active_top = int(
|
||
sum(1 << b for b in range(N_BINS)
|
||
if (spread_top & np.uint8(1 << b)).any())
|
||
)
|
||
if nms_radius is None:
|
||
nms_radius = max(8, min(self.template_size) // 2)
|
||
top_thresh = min_score * self.top_score_factor
|
||
|
||
tw, th = self.template_size
|
||
density_top = _jit_popcount(spread_top)
|
||
sf_top = 2 ** top
|
||
bg_cache_top: dict[float, np.ndarray] = {}
|
||
bg_cache_full: dict[float, np.ndarray] = {}
|
||
unique_scales = sorted({var.scale for var in self.variants})
|
||
|
||
def _bg_for_scale(density: np.ndarray, scale: float,
|
||
divisor: int) -> np.ndarray:
|
||
bw = max(9, int(round(tw * scale / divisor)))
|
||
bh = max(9, int(round(th * scale / divisor)))
|
||
sm = cv2.boxFilter(density, cv2.CV_32F, (bw, bh))
|
||
return np.clip(sm / N_BINS, 0.0, 0.99)
|
||
|
||
for sc in unique_scales:
|
||
bg_cache_top[sc] = _bg_for_scale(density_top, sc, sf_top)
|
||
|
||
def _rescore(score: np.ndarray, bg: np.ndarray) -> np.ndarray:
|
||
return np.maximum(0.0, (score - bg) / (1.0 - bg + 1e-6))
|
||
|
||
# Pruning varianti via top-level (parallelizzato)
|
||
def _top_score(vi: int) -> tuple[int, float]:
|
||
var = self.variants[vi]
|
||
lvl = var.levels[min(top, len(var.levels) - 1)]
|
||
score = _jit_score_bitmap(
|
||
spread_top, lvl.dx, lvl.dy, lvl.bin, bit_active_top,
|
||
)
|
||
score = _rescore(score, bg_cache_top[var.scale])
|
||
return vi, float(score.max()) if score.size else -1.0
|
||
|
||
kept_variants: list[tuple[int, float]] = []
|
||
if self.n_threads > 1:
|
||
with ThreadPoolExecutor(max_workers=self.n_threads) as ex:
|
||
for vi, best in ex.map(_top_score, range(len(self.variants))):
|
||
if best >= top_thresh:
|
||
kept_variants.append((vi, best))
|
||
else:
|
||
for vi in range(len(self.variants)):
|
||
vi2, best = _top_score(vi)
|
||
if best >= top_thresh:
|
||
kept_variants.append((vi2, best))
|
||
|
||
if not kept_variants:
|
||
return []
|
||
|
||
# Cap adattivo: ordina per score top-level e mantieni le più promettenti.
|
||
# Min: max_matches*8 (margine per NMS cross-variant)
|
||
# Max: 50% delle varianti totali (protegge performance con molte scale)
|
||
kept_variants.sort(key=lambda t: -t[1])
|
||
max_vars_full = max(max_matches * 8, len(self.variants) // 2)
|
||
kept_variants = kept_variants[:max_vars_full]
|
||
|
||
# Full-res (parallelizzato) con bitmap
|
||
spread0 = self._spread_bitmap(gray0)
|
||
bit_active_full = int(
|
||
sum(1 << b for b in range(N_BINS)
|
||
if (spread0 & np.uint8(1 << b)).any())
|
||
)
|
||
density_full = _jit_popcount(spread0)
|
||
for sc in unique_scales:
|
||
bg_cache_full[sc] = _bg_for_scale(density_full, sc, 1)
|
||
|
||
def _full_score(vi: int) -> tuple[int, np.ndarray]:
|
||
var = self.variants[vi]
|
||
lvl0 = var.levels[0]
|
||
score = _jit_score_bitmap(
|
||
spread0, lvl0.dx, lvl0.dy, lvl0.bin, bit_active_full,
|
||
)
|
||
score = _rescore(score, bg_cache_full[var.scale])
|
||
return vi, score
|
||
|
||
candidates_per_var: list[tuple[int, np.ndarray]] = []
|
||
raw: list[tuple[float, int, int, int]] = []
|
||
var_indices = [vi for vi, _ in kept_variants]
|
||
if self.n_threads > 1 and len(var_indices) > 1:
|
||
with ThreadPoolExecutor(max_workers=self.n_threads) as ex:
|
||
results = list(ex.map(_full_score, var_indices))
|
||
else:
|
||
results = [_full_score(vi) for vi in var_indices]
|
||
|
||
for vi, score in results:
|
||
ys, xs = np.where(score >= min_score)
|
||
if len(ys) == 0:
|
||
continue
|
||
vals = score[ys, xs]
|
||
K = min(len(vals), max_matches * 5)
|
||
ord_idx = np.argpartition(-vals, K - 1)[:K]
|
||
candidates_per_var.append((vi, score))
|
||
for i in ord_idx:
|
||
raw.append((float(vals[i]), int(xs[i]), int(ys[i]), vi))
|
||
|
||
raw.sort(key=lambda c: -c[0])
|
||
|
||
# Mappa vi → score_map per subpixel/refinement
|
||
score_maps = dict(candidates_per_var)
|
||
|
||
# NMS + subpixel + refinement angolare
|
||
# Mask template per refinement (non disponibile qui: usa full)
|
||
h, w = self.template_gray.shape if self.template_gray is not None else (0, 0)
|
||
mask_full = np.full((h, w), 255, dtype=np.uint8)
|
||
|
||
# Pre-NMS rapido su raw (solo subpixel, no refine/verify): riduce
|
||
# i candidati a ~max_matches*3 prima di operazioni costose (refine,
|
||
# verify) che erano chiamate per ogni raw causando lentezze 100x.
|
||
r2 = nms_radius * nms_radius
|
||
preliminary: list[tuple[float, float, float, int]] = []
|
||
pre_cap = max(max_matches * 3, max_matches + 10)
|
||
for score, xi, yi, vi in raw:
|
||
if subpixel and vi in score_maps:
|
||
cx_f, cy_f = self._subpixel_peak(score_maps[vi], xi, yi)
|
||
else:
|
||
cx_f, cy_f = float(xi), float(yi)
|
||
if any((k[1] - cx_f) ** 2 + (k[2] - cy_f) ** 2 < r2
|
||
for k in preliminary):
|
||
continue
|
||
preliminary.append((score, cx_f, cy_f, vi))
|
||
if len(preliminary) >= pre_cap:
|
||
break
|
||
|
||
# Ora refine + verify solo sui candidati pre-NMS
|
||
kept: list[Match] = []
|
||
tw, th = self.template_size
|
||
for score, cx_f, cy_f, vi in preliminary:
|
||
var = self.variants[vi]
|
||
ang_f = var.angle_deg
|
||
score_f = score
|
||
if refine_angle and self.template_gray is not None:
|
||
ang_f, score_f, cx_f, cy_f = self._refine_angle(
|
||
spread0, bit_active_full, self.template_gray, cx_f, cy_f,
|
||
var.angle_deg, var.scale, mask_full,
|
||
search_radius=self.angle_step_deg / 2.0,
|
||
)
|
||
if verify_ncc:
|
||
ncc = self._verify_ncc(gray0, cx_f, cy_f, ang_f, var.scale)
|
||
if ncc < verify_threshold:
|
||
continue
|
||
|
||
poly = _oriented_bbox_polygon(
|
||
cx_f, cy_f, tw * var.scale, th * var.scale, ang_f,
|
||
)
|
||
kept.append(Match(
|
||
cx=cx_f, cy=cy_f,
|
||
angle_deg=ang_f,
|
||
scale=var.scale,
|
||
score=score_f,
|
||
bbox_poly=poly,
|
||
))
|
||
if len(kept) >= max_matches:
|
||
break
|
||
return kept
|