Files
Shape_Model_2D/pm2d/line_matcher.py
Adriano 4b7271094b feat: refine_pose_joint - Nelder-Mead 3D su (cx, cy, angle)
Alternativa al refine angolare 1D + subpixel quadratico: ottimizza
simultaneamente posizione e angolo con Nelder-Mead 3D inline (no
scipy). Default off (refine_pose_joint=False) per backward compat.

Vantaggio Halcon-style: un singolo iter LM/simplex stila il match a
precisione sub-pixel + sub-step in modo congiunto invece di alternare
assi. Convergenza tipica ~24 valutazioni vs ~15 (golden+quadratico)
ma piu robusto su template asimmetrici dove pose e angolo sono
fortemente correlati.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 15:30:20 +02:00

939 lines
38 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Shape-based matcher stile linemod (line2Dup) - Python puro + numpy/OpenCV.
Porting algoritmico dell'idea di `meiqua/shape_based_matching` (no MIPP/SIMD —
equivalente usando vettorizzazione numpy).
Training (costoso, fatto una volta per ricetta):
- Per ogni variante (angolo, scala) del template:
1. Sobel → magnitude + orientation
2. Quantizzazione orientation in N_BINS bin (modulo π, edge simmetrici)
3. Estrazione feature sparse top-magnitude con spacing minimo
4. Salvataggio feature = liste (dx, dy, bin) relative al centro-modello
Matching (veloce):
- Scena processata una sola volta per livello di piramide:
Sobel → magnitude → quant orientation → spread (dilate per bin) →
response map (N_BINS, H, W) — bit b acceso dove orientamento b presente.
- Per ogni variante:
score_map[y,x] = Σ resp[b_i][y+dy_i, x+dx_i] / N_features
implementato con shift-add vettorizzato (numpy).
- Piramide: matching top-level (basso costo, soglia ridotta) +
refinement a risoluzione piena attorno ai candidati.
Il training supporta una `mask` binaria per modellare solo una regione parziale
della ROI (modello non-rettangolare).
"""
from __future__ import annotations
import math
import os
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
import cv2
import numpy as np
_GOLDEN = (math.sqrt(5.0) - 1.0) / 2.0 # ≈ 0.618
from pm2d._jit_kernels import (
score_by_shift as _jit_score_by_shift,
score_bitmap as _jit_score_bitmap,
score_bitmap_rescored as _jit_score_bitmap_rescored,
popcount_density as _jit_popcount,
HAS_NUMBA,
)
N_BINS = 8 # orientamenti quantizzati modulo π
def _oriented_bbox_polygon(
cx: float, cy: float, w: float, h: float, angle_deg: float,
) -> np.ndarray:
"""Ritorna 4 vertici (float32, shape (4,2)) del bbox orientato.
Convenzione coerente con cv2.getRotationMatrix2D usato nel train:
rotazione counter-clockwise (matematica) ma sistema immagine y-down,
quindi visivamente orario.
"""
w2, h2 = w / 2.0, h / 2.0
# Vertici template non-ruotato centrati al centro
corners = np.array([[-w2, -h2], [w2, -h2], [w2, h2], [-w2, h2]], np.float32)
a = np.deg2rad(angle_deg)
c, s = np.cos(a), np.sin(a)
# cv2.getRotationMatrix2D con angolo a positivo applica R = [[c,s],[-s,c]]
# (ruota counter-clockwise nel sistema matematico; y-down → orario)
R = np.array([[c, s], [-s, c]], np.float32)
rotated = corners @ R.T
rotated[:, 0] += cx
rotated[:, 1] += cy
return rotated
@dataclass
class Match:
cx: float
cy: float
angle_deg: float
scale: float
score: float
bbox_poly: np.ndarray # (4, 2) float32 - 4 vertici ordinati (ruotato)
@dataclass
class _LevelFeatures:
"""Feature piramidate (livello l = downsample /2^l)."""
dx: np.ndarray # int32
dy: np.ndarray # int32
bin: np.ndarray # int8
n: int
@dataclass
class _Variant:
"""Template precomputato (una pose)."""
angle_deg: float
scale: float
# Feature piramide: levels[0] = full-res, levels[l] = /2^l
levels: list[_LevelFeatures]
# Bbox kernel (per visualizzazione / limiti ricerca)
kh: int
kw: int
cx_local: float # centro-modello dentro al bbox kernel
cy_local: float
class LineShapeMatcher:
"""Shape-based matcher linemod-style - Python/numpy, no SIMD."""
def __init__(
self,
num_features: int = 96,
weak_grad: float = 30.0,
strong_grad: float = 60.0,
angle_range_deg: tuple[float, float] = (0.0, 360.0),
angle_step_deg: float = 5.0,
scale_range: tuple[float, float] = (1.0, 1.0),
scale_step: float = 0.1,
spread_radius: int = 4,
min_feature_spacing: int = 3,
pyramid_levels: int = 2,
top_score_factor: float = 0.5,
n_threads: int | None = None,
) -> None:
self.num_features = num_features
self.weak_grad = weak_grad
self.strong_grad = strong_grad
self.angle_range_deg = angle_range_deg
self.angle_step_deg = angle_step_deg
self.scale_range = scale_range
self.scale_step = scale_step
self.spread_radius = spread_radius
self.min_feature_spacing = min_feature_spacing
self.pyramid_levels = max(1, pyramid_levels)
self.top_score_factor = top_score_factor
self.n_threads = n_threads or max(1, (os.cpu_count() or 2) - 1)
self.variants: list[_Variant] = []
self.template_size: tuple[int, int] = (0, 0)
self.template_gray: np.ndarray | None = None
# Maschera usata in training (propagata al refine per coerenza).
self._train_mask: np.ndarray | None = None
# --- Helpers -------------------------------------------------------
@staticmethod
def _to_gray(img: np.ndarray) -> np.ndarray:
if img.ndim == 3:
return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
return img
@staticmethod
def _gradient(gray: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
gx = cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=3)
gy = cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=3)
mag = cv2.magnitude(gx, gy)
ang = np.arctan2(gy, gx)
ang_mod = np.where(ang < 0, ang + np.pi, ang)
bins = np.floor(ang_mod / np.pi * N_BINS).astype(np.int16)
bins = np.clip(bins, 0, N_BINS - 1)
return mag, bins
def _extract_features(
self, mag: np.ndarray, bins: np.ndarray, mask: np.ndarray | None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
if mask is not None:
mag = np.where(mask > 0, mag, 0)
strong = mag >= self.strong_grad
ys, xs = np.where(strong)
if len(xs) == 0:
return (np.zeros(0, np.int32),) * 3
vals = mag[ys, xs]
order = np.argsort(-vals)
spc = max(1, self.min_feature_spacing)
occupied = np.zeros(mag.shape, dtype=bool)
picked_x: list[int] = []
picked_y: list[int] = []
picked_b: list[int] = []
for idx in order:
y, x = int(ys[idx]), int(xs[idx])
if occupied[y, x]:
continue
picked_x.append(x); picked_y.append(y)
picked_b.append(int(bins[y, x]))
y0 = max(0, y - spc); y1 = min(mag.shape[0], y + spc + 1)
x0 = max(0, x - spc); x1 = min(mag.shape[1], x + spc + 1)
occupied[y0:y1, x0:x1] = True
if len(picked_x) >= self.num_features:
break
return (np.array(picked_x, np.int32),
np.array(picked_y, np.int32),
np.array(picked_b, np.int8))
def _scale_list(self) -> list[float]:
s0, s1 = self.scale_range
if s0 >= s1 or self.scale_step <= 0:
return [float(s0)]
n = int(np.floor((s1 - s0) / self.scale_step)) + 1
return [float(s0 + i * self.scale_step) for i in range(n)]
def _angle_list(self) -> list[float]:
a0, a1 = self.angle_range_deg
if self.angle_step_deg <= 0 or a0 >= a1:
return [float(a0)]
n = int(np.floor((a1 - a0) / self.angle_step_deg))
return [float(a0 + i * self.angle_step_deg) for i in range(n)]
# --- Training ------------------------------------------------------
def _build_pyramid_features(
self, dx: np.ndarray, dy: np.ndarray, bin_: np.ndarray,
) -> list[_LevelFeatures]:
"""Piramide feature precomputata: livello l = /2^l con dedup."""
levels = [_LevelFeatures(dx=dx.copy(), dy=dy.copy(), bin=bin_.copy(),
n=len(dx))]
for lvl in range(1, self.pyramid_levels):
sf = 2 ** lvl
dx_l = (dx // sf).astype(np.int32)
dy_l = (dy // sf).astype(np.int32)
# Dedup: rimuove feature collassate sullo stesso (dx, dy, bin)
key = ((dx_l.astype(np.int64) << 24)
| (dy_l.astype(np.int64) << 8)
| bin_.astype(np.int64))
_, uniq = np.unique(key, return_index=True)
levels.append(_LevelFeatures(
dx=dx_l[uniq], dy=dy_l[uniq], bin=bin_[uniq], n=len(uniq),
))
return levels
def train(self, template_bgr: np.ndarray, mask: np.ndarray | None = None) -> int:
"""Genera varianti rotate+scalate con feature sparse + piramide."""
gray = self._to_gray(template_bgr)
h, w = gray.shape
self.template_size = (w, h)
self.template_gray = gray.copy()
if mask is None:
mask_full = np.full((h, w), 255, dtype=np.uint8)
else:
mask_full = (mask > 0).astype(np.uint8) * 255
self._train_mask = mask_full.copy()
self.variants.clear()
for s in self._scale_list():
sw = max(16, int(round(w * s)))
sh = max(16, int(round(h * s)))
gray_s = cv2.resize(gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST)
diag = int(np.ceil(np.hypot(sh, sw))) + 6
py = (diag - sh) // 2
px = (diag - sw) // 2
gray_p = cv2.copyMakeBorder(
gray_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_REPLICATE,
)
mask_p = cv2.copyMakeBorder(
mask_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_CONSTANT, value=0,
)
center = (diag / 2.0, diag / 2.0)
for ang in self._angle_list():
M = cv2.getRotationMatrix2D(center, ang, 1.0)
gray_r = cv2.warpAffine(
gray_p, M, (diag, diag),
flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE,
)
mask_r = cv2.warpAffine(
mask_p, M, (diag, diag),
flags=cv2.INTER_NEAREST, borderValue=0,
)
mag, bins = self._gradient(gray_r)
fx, fy, fb = self._extract_features(mag, bins, mask_r)
if len(fx) < 8:
continue
cx_c = diag / 2.0
cy_c = diag / 2.0
dx = (fx - cx_c).astype(np.int32)
dy = (fy - cy_c).astype(np.int32)
x0 = int(dx.min()); x1 = int(dx.max())
y0 = int(dy.min()); y1 = int(dy.max())
kw = x1 - x0 + 1
kh = y1 - y0 + 1
cx_local = -x0
cy_local = -y0
levels = self._build_pyramid_features(dx, dy, fb)
self.variants.append(_Variant(
angle_deg=float(ang),
scale=float(s),
levels=levels,
kh=kh, kw=kw,
cx_local=float(cx_local), cy_local=float(cy_local),
))
return len(self.variants)
# --- Matching ------------------------------------------------------
def _response_map(self, gray: np.ndarray) -> np.ndarray:
"""Response map shape (N_BINS, H, W) float32 (legacy path)."""
mag, bins = self._gradient(gray)
valid = mag >= self.weak_grad
k = 2 * self.spread_radius + 1
kernel = np.ones((k, k), dtype=np.uint8)
H, W = gray.shape
raw = np.zeros((N_BINS, H, W), dtype=np.float32)
for b in range(N_BINS):
mask_b = ((bins == b) & valid).astype(np.uint8)
d = cv2.dilate(mask_b, kernel)
raw[b] = d.astype(np.float32)
return raw
def _spread_bitmap(self, gray: np.ndarray) -> np.ndarray:
"""Spread bitmap uint8: bit b acceso dove bin b è presente nel raggio.
Formato compatto 32× più denso della response map (N_BINS, H, W) float32.
"""
mag, bins = self._gradient(gray)
valid = mag >= self.weak_grad
k = 2 * self.spread_radius + 1
kernel = np.ones((k, k), dtype=np.uint8)
H, W = gray.shape
spread = np.zeros((H, W), dtype=np.uint8)
for b in range(N_BINS):
mask_b = ((bins == b) & valid).astype(np.uint8)
d = cv2.dilate(mask_b, kernel)
spread |= (d << b)
return spread
@staticmethod
def _score_by_shift(
resp: np.ndarray, dx: np.ndarray, dy: np.ndarray, bins: np.ndarray,
bin_has_data: np.ndarray | None = None,
) -> np.ndarray:
"""score[y,x] = Σ_i resp[bin_i][y+dy_i, x+dx_i] / len(dx).
Dispatch a kernel Numba JIT se disponibile, altrimenti fallback numpy.
"""
return _jit_score_by_shift(resp, dx, dy, bins, bin_has_data)
@staticmethod
def _subpixel_peak(
acc: np.ndarray, x: int, y: int, plateau_radius: int = 10,
) -> tuple[float, float]:
"""Posizione sub-pixel del picco.
1. Plateau saturo → centroide pesato del plateau (peso = score).
2. Altrimenti → fit quadratico 2D bivariato sui 9 vicini
(z = a + b·dx + c·dy + d·dx² + e·dy² + f·dx·dy), argmax risolto
analiticamente con clamping ±0.5 px.
"""
H, W = acc.shape
val = float(acc[y, x])
# Plateau detection: valori >= val - 0.01 entro raggio limitato
y0 = max(0, y - plateau_radius); y1 = min(H, y + plateau_radius + 1)
x0 = max(0, x - plateau_radius); x1 = min(W, x + plateau_radius + 1)
patch = acc[y0:y1, x0:x1]
plateau = patch >= val - 0.01
if plateau.sum() > 1:
# Centroide pesato per (score - (max-0.01))² per enfatizzare i top
weights = np.where(plateau, patch - (val - 0.01), 0.0).astype(np.float64)
weights = weights * weights
total = weights.sum()
if total > 1e-9:
ys_idx, xs_idx = np.indices(patch.shape)
cx_w = (xs_idx * weights).sum() / total
cy_w = (ys_idx * weights).sum() / total
return float(x0 + cx_w), float(y0 + cy_w)
ys_m, xs_m = np.where(plateau)
return float(x0 + xs_m.mean()), float(y0 + ys_m.mean())
# Fit quadratico 2D bivariato su 3x3 intorno
if x <= 0 or x >= W - 1 or y <= 0 or y >= H - 1:
return float(x), float(y)
# Stencil 3x3: Z[i, j] con i,j ∈ {-1, 0, +1}
Z = acc[y - 1:y + 2, x - 1:x + 2].astype(np.float64)
# Coefficienti da finite differences
b_c = (Z[1, 2] - Z[1, 0]) / 2.0
c_c = (Z[2, 1] - Z[0, 1]) / 2.0
d_c = (Z[1, 2] + Z[1, 0] - 2.0 * Z[1, 1]) / 2.0
e_c = (Z[2, 1] + Z[0, 1] - 2.0 * Z[1, 1]) / 2.0
f_c = (Z[2, 2] - Z[0, 2] - Z[2, 0] + Z[0, 0]) / 4.0
# Max: risolve [2d f; f 2e][dx;dy] = [-b;-c]
det = 4.0 * d_c * e_c - f_c * f_c
if abs(det) > 1e-9:
ox = (-2.0 * e_c * b_c + f_c * c_c) / det
oy = (-2.0 * d_c * c_c + f_c * b_c) / det
else:
# Fallback separabile
ox = -b_c / (2.0 * d_c) if abs(d_c) > 1e-6 else 0.0
oy = -c_c / (2.0 * e_c) if abs(e_c) > 1e-6 else 0.0
ox = float(np.clip(ox, -0.5, 0.5))
oy = float(np.clip(oy, -0.5, 0.5))
return x + ox, y + oy
def _refine_pose_joint(
self,
spread0: np.ndarray,
template_gray: np.ndarray,
cx: float, cy: float,
angle_deg: float, scale: float,
mask_full: np.ndarray,
max_iter: int = 24,
tol: float = 1e-3,
) -> tuple[float, float, float, float]:
"""Refine congiunto (cx, cy, angle) via Nelder-Mead 3D.
Ottimizza simultaneamente posizione e angolo (vs golden search 1D
sull'angolo poi quadratico 2D su xy che alterna assi). Halcon-style:
un singolo iter LM stila il match a precisione sub-pixel + sub-step.
Ritorna (angle, score, cx, cy) dove score e quello calcolato sulla
scena spread (no template gray).
"""
h, w = template_gray.shape
sw = max(16, int(round(w * scale)))
sh = max(16, int(round(h * scale)))
gray_s = cv2.resize(template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST)
diag = int(np.ceil(np.hypot(sh, sw))) + 6
py = (diag - sh) // 2; px = (diag - sw) // 2
gray_p = cv2.copyMakeBorder(gray_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_REPLICATE)
mask_p = cv2.copyMakeBorder(mask_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_CONSTANT, value=0)
center = (diag / 2.0, diag / 2.0)
H, W = spread0.shape
def _score(params: tuple[float, float, float]) -> float:
ddx, ddy, dang = params
ang = angle_deg + dang
M = cv2.getRotationMatrix2D(center, ang, 1.0)
gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_REPLICATE)
mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
flags=cv2.INTER_NEAREST, borderValue=0)
mag, bins = self._gradient(gray_r)
fx, fy, fb = self._extract_features(mag, bins, mask_r)
if len(fx) < 8:
return 0.0
cxe = cx + ddx; cye = cy + ddy
ix = int(round(cxe)); iy = int(round(cye))
tot = 0
valid = 0
for i in range(len(fx)):
xs = ix + int(fx[i] - center[0])
ys = iy + int(fy[i] - center[1])
if 0 <= xs < W and 0 <= ys < H:
bit = np.uint8(1 << int(fb[i]))
if spread0[ys, xs] & bit:
tot += 1
valid += 1
return -float(tot) / max(1, valid) # minimize -score
# Nelder-Mead 3D inline (no scipy). Simplex iniziale: vertice + offset
# dx=±0.5px, dy=±0.5px, dθ=±step/2.
step_a = self.angle_step_deg / 2.0 if self.angle_step_deg > 0 else 1.0
x0 = np.array([0.0, 0.0, 0.0])
simplex = np.array([
x0,
x0 + [0.5, 0.0, 0.0],
x0 + [0.0, 0.5, 0.0],
x0 + [0.0, 0.0, step_a],
])
fvals = np.array([_score(tuple(s)) for s in simplex])
for _ in range(max_iter):
order = np.argsort(fvals)
simplex = simplex[order]; fvals = fvals[order]
if abs(fvals[-1] - fvals[0]) < tol:
break
centroid = simplex[:-1].mean(axis=0)
xr = centroid + 1.0 * (centroid - simplex[-1])
fr = _score(tuple(xr))
if fvals[0] <= fr < fvals[-2]:
simplex[-1] = xr; fvals[-1] = fr
continue
if fr < fvals[0]:
xe = centroid + 2.0 * (centroid - simplex[-1])
fe = _score(tuple(xe))
if fe < fr:
simplex[-1] = xe; fvals[-1] = fe
else:
simplex[-1] = xr; fvals[-1] = fr
continue
xc = centroid + 0.5 * (simplex[-1] - centroid)
fc = _score(tuple(xc))
if fc < fvals[-1]:
simplex[-1] = xc; fvals[-1] = fc
continue
for k in range(1, 4):
simplex[k] = simplex[0] + 0.5 * (simplex[k] - simplex[0])
fvals[k] = _score(tuple(simplex[k]))
best_i = int(np.argmin(fvals))
ddx, ddy, dang = simplex[best_i]
return (angle_deg + float(dang), -float(fvals[best_i]),
cx + float(ddx), cy + float(ddy))
def _refine_angle(
self,
spread0: np.ndarray, # bitmap uint8 (H, W)
bit_active: int,
template_gray: np.ndarray,
cx: float, cy: float,
angle_deg: float, scale: float,
mask_full: np.ndarray,
angle_fine_step: float = 0.5,
search_radius: float | None = None,
original_score: float | None = None,
) -> tuple[float, float, float, float]:
"""Ricerca angolare fine (sub-step) attorno al match grezzo.
Genera 5 template temporanei a angle ± {0.5, 1.0} * step e sceglie
l'angolo con score massimo (parabolic fit sulle 3 score centrali).
Ritorna (angle_refined, score, cx_refined, cy_refined).
"""
# Se il match grezzo è già quasi perfetto, NON refinare
if original_score is not None and original_score >= 0.99:
return (angle_deg, original_score, cx, cy)
if search_radius is None:
search_radius = self.angle_step_deg / 2.0
h, w = template_gray.shape
sw = max(16, int(round(w * scale)))
sh = max(16, int(round(h * scale)))
gray_s = cv2.resize(template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST)
diag = int(np.ceil(np.hypot(sh, sw))) + 6
py = (diag - sh) // 2; px = (diag - sw) // 2
gray_p = cv2.copyMakeBorder(gray_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_REPLICATE)
mask_p = cv2.copyMakeBorder(mask_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_CONSTANT, value=0)
center = (diag / 2.0, diag / 2.0)
H, W = spread0.shape
margin = 3
def _score_at_angle(off: float) -> tuple[float, float, float]:
"""Ritorna (score, best_cx, best_cy) per angolo = angle_deg + off."""
ang = angle_deg + off
M = cv2.getRotationMatrix2D(center, ang, 1.0)
gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_REPLICATE)
mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
flags=cv2.INTER_NEAREST, borderValue=0)
mag, bins = self._gradient(gray_r)
fx, fy, fb = self._extract_features(mag, bins, mask_r)
if len(fx) < 8:
return (0.0, cx, cy)
dx = (fx - center[0]).astype(np.int32)
dy = (fy - center[1]).astype(np.int32)
y_lo = int(cy) - margin; y_hi = int(cy) + margin + 1
x_lo = int(cx) - margin; x_hi = int(cx) + margin + 1
sh_w = y_hi - y_lo; sw_w = x_hi - x_lo
acc = np.zeros((sh_w, sw_w), dtype=np.float32)
for i in range(len(dx)):
ddx = int(dx[i]); ddy = int(dy[i]); b = int(fb[i])
bit = np.uint8(1 << b)
sy0 = y_lo + ddy; sy1 = y_hi + ddy
sx0 = x_lo + ddx; sx1 = x_hi + ddx
a_y0 = max(0, -sy0); a_y1 = sh_w - max(0, sy1 - H)
a_x0 = max(0, -sx0); a_x1 = sw_w - max(0, sx1 - W)
s_y0 = max(0, sy0); s_y1 = min(H, sy1)
s_x0 = max(0, sx0); s_x1 = min(W, sx1)
if s_y1 > s_y0 and s_x1 > s_x0:
region = spread0[s_y0:s_y1, s_x0:s_x1]
acc[a_y0:a_y1, a_x0:a_x1] += (
(region & bit) != 0
).astype(np.float32)
acc /= len(dx)
_, max_val, _, max_loc = cv2.minMaxLoc(acc)
return (float(max_val),
float(x_lo + max_loc[0]), float(y_lo + max_loc[1]))
# Golden-section search su [-search_radius, +search_radius]:
# converge in log tempo a precisione ~0.1°, ~8 valutazioni vs 5
# ma centrate su picco reale (non sample equispaziati).
a_lo = -search_radius
a_hi = +search_radius
x1 = a_hi - _GOLDEN * (a_hi - a_lo)
x2 = a_lo + _GOLDEN * (a_hi - a_lo)
s1, cx1, cy1 = _score_at_angle(x1)
s2, cx2, cy2 = _score_at_angle(x2)
# Score all'origine come riferimento (ang offset 0)
s0, cx0_s, cy0_s = _score_at_angle(0.0)
best = (angle_deg, s0, cx0_s, cy0_s)
tol = 0.1 # gradi
for _ in range(8):
if s1 > best[1]:
best = (angle_deg + x1, s1, cx1, cy1)
if s2 > best[1]:
best = (angle_deg + x2, s2, cx2, cy2)
if abs(a_hi - a_lo) < tol:
break
if s1 > s2:
a_hi = x2
x2 = x1; s2 = s1; cx2 = cx1; cy2 = cy1
x1 = a_hi - _GOLDEN * (a_hi - a_lo)
s1, cx1, cy1 = _score_at_angle(x1)
else:
a_lo = x1
x1 = x2; s1 = s2; cx1 = cx2; cy1 = cy2
x2 = a_lo + _GOLDEN * (a_hi - a_lo)
s2, cx2, cy2 = _score_at_angle(x2)
return best
def _verify_ncc(
self, scene_gray: np.ndarray, cx: float, cy: float,
angle_deg: float, scale: float,
) -> float:
"""NCC tra template warpato alla pose e scena sottostante.
Lavora su un **crop locale** della scena di lato = diagonale del
template ruotato+scalato, non sull'intera scena. Su scene grandi
(1920×1080) taglia drasticamente il costo del warp per ogni match.
Ritorna score [-1, 1]. Usato come filtro anti-falso-positivo:
il matcher linemod può dare score alto su texture generiche ma
sovrapponendo il template gray i pixel non corrispondono.
"""
if self.template_gray is None:
return 1.0
t = self.template_gray
h, w = t.shape
cx_t = (w - 1) / 2.0
cy_t = (h - 1) / 2.0
# Bounding box del template ruotato/scalato attorno a (cx, cy)
diag = int(np.ceil(np.hypot(w, h) * scale)) + 8
H, W = scene_gray.shape
x0 = int(round(cx)) - diag // 2
y0 = int(round(cy)) - diag // 2
cx0 = max(0, x0); cy0 = max(0, y0)
cx1 = min(W, x0 + diag); cy1 = min(H, y0 + diag)
if cx1 - cx0 < 10 or cy1 - cy0 < 10:
return 0.0
scn_crop = scene_gray[cy0:cy1, cx0:cx1]
ch, cw = scn_crop.shape
M = cv2.getRotationMatrix2D((cx_t, cy_t), angle_deg, scale)
# Porta il centro del template a (cx - cx0, cy - cy0) del crop
M[0, 2] += (cx - cx0) - cx_t
M[1, 2] += (cy - cy0) - cy_t
warped = cv2.warpAffine(
t, M, (cw, ch),
flags=cv2.INTER_LINEAR, borderValue=0,
)
if self._train_mask is not None:
mask_src = self._train_mask
else:
mask_src = np.full_like(t, 255)
mask_w = cv2.warpAffine(
mask_src, M, (cw, ch),
flags=cv2.INTER_NEAREST, borderValue=0,
)
valid = mask_w > 0
if valid.sum() < 20:
return 0.0
tpl = warped[valid].astype(np.float32)
scn = scn_crop[valid].astype(np.float32)
tm = tpl - tpl.mean()
sm = scn - scn.mean()
denom = np.sqrt((tm * tm).sum() * (sm * sm).sum()) + 1e-9
return float((tm * sm).sum() / denom)
def find(
self,
scene_bgr: np.ndarray,
min_score: float = 0.6,
max_matches: int = 20,
nms_radius: int | None = None,
refine_angle: bool = True,
subpixel: bool = True,
verify_ncc: bool = True,
verify_threshold: float = 0.4,
coarse_angle_factor: int = 2,
scale_penalty: float = 0.0,
refine_pose_joint: bool = False,
) -> list[Match]:
"""
scale_penalty: se > 0, riduce lo score per match a scala diversa da 1.0:
score_final = score_shape * max(0, 1 - scale_penalty * |scale - 1|)
Utile se l'operatore vuole che match "identico al template anche per
dimensione" abbia score più alto di match "stessa forma, dimensione
diversa". scale_penalty=0 (default) = comportamento shape puro.
"""
if not self.variants:
raise RuntimeError("Matcher non addestrato: chiamare train() prima.")
gray0 = self._to_gray(scene_bgr)
grays = [gray0]
for _ in range(self.pyramid_levels - 1):
grays.append(cv2.pyrDown(grays[-1]))
top = len(grays) - 1
# Spread bitmap (uint8) al top level: 32× meno memoria della response
# map float32 → MOLTO più cache-friendly per _score_by_shift.
spread_top = self._spread_bitmap(grays[top])
bit_active_top = int(
sum(1 << b for b in range(N_BINS)
if (spread_top & np.uint8(1 << b)).any())
)
if nms_radius is None:
nms_radius = max(8, min(self.template_size) // 2)
top_thresh = min_score * self.top_score_factor
tw, th = self.template_size
density_top = _jit_popcount(spread_top)
sf_top = 2 ** top
bg_cache_top: dict[float, np.ndarray] = {}
bg_cache_full: dict[float, np.ndarray] = {}
unique_scales = sorted({var.scale for var in self.variants})
def _bg_for_scale(density: np.ndarray, scale: float,
divisor: int) -> np.ndarray:
bw = max(9, int(round(tw * scale / divisor)))
bh = max(9, int(round(th * scale / divisor)))
sm = cv2.boxFilter(density, cv2.CV_32F, (bw, bh))
return np.clip(sm / N_BINS, 0.0, 0.99)
for sc in unique_scales:
bg_cache_top[sc] = _bg_for_scale(density_top, sc, sf_top)
def _rescore(score: np.ndarray, bg: np.ndarray) -> np.ndarray:
return np.maximum(0.0, (score - bg) / (1.0 - bg + 1e-6))
# Coarse-to-fine angolare:
# 1) Raggruppa varianti per scala, ordina per angolo
# 2) Top-level: valuta solo 1 ogni coarse_angle_factor varianti
# 3) Espandi ai vicini nel full-res
variants_by_scale: dict[float, list[int]] = {}
for vi, var in enumerate(self.variants):
variants_by_scale.setdefault(var.scale, []).append(vi)
coarse_idx_list: list[int] = [] # varianti da valutare al top
neighbor_map: dict[int, list[int]] = {} # vi_coarse -> indici vicini
cf = max(1, coarse_angle_factor)
for scale_key, vi_list in variants_by_scale.items():
vi_sorted = sorted(vi_list, key=lambda i: self.variants[i].angle_deg)
n = len(vi_sorted)
for i in range(0, n, cf):
vi_c = vi_sorted[i]
coarse_idx_list.append(vi_c)
# Vicini: ±cf/2 attorno a i (stessa scala)
half = cf // 2
start = max(0, i - half)
end = min(n, i + half + 1)
neighbor_map[vi_c] = vi_sorted[start:end]
# Pruning varianti via top-level (parallelizzato) - solo coarse
def _top_score(vi: int) -> tuple[int, float]:
var = self.variants[vi]
lvl = var.levels[min(top, len(var.levels) - 1)]
score = _jit_score_bitmap_rescored(
spread_top, lvl.dx, lvl.dy, lvl.bin, bit_active_top,
bg_cache_top[var.scale],
)
return vi, float(score.max()) if score.size else -1.0
kept_coarse: list[tuple[int, float]] = []
all_top_scores: list[tuple[int, float]] = []
if self.n_threads > 1 and len(coarse_idx_list) > 1:
with ThreadPoolExecutor(max_workers=self.n_threads) as ex:
for vi, best in ex.map(_top_score, coarse_idx_list):
all_top_scores.append((vi, best))
if best >= top_thresh:
kept_coarse.append((vi, best))
else:
for vi in coarse_idx_list:
vi2, best = _top_score(vi)
all_top_scores.append((vi2, best))
if best >= top_thresh:
kept_coarse.append((vi2, best))
# Fallback adattivo: se il rescore background ha abbattuto tutti
# gli score sotto top_thresh (scene texturate pesanti), ripesca
# le varianti migliori al top level per dare comunque una chance
# alla fase full-res invece di ritornare 0 match.
if not kept_coarse and all_top_scores:
all_top_scores.sort(key=lambda t: -t[1])
n_keep = max(4, len(all_top_scores) // 10)
# Limita a varianti con score top > 0 (non completamente a zero)
kept_coarse = [(vi, s) for vi, s in all_top_scores[:n_keep] if s > 0]
# Espandi ogni coarse promosso con i suoi vicini (stessa scala,
# angoli intermedi non valutati al top)
expanded: set[int] = set()
score_by_vi: dict[int, float] = {}
for vi_c, s_top in kept_coarse:
for vi_n in neighbor_map.get(vi_c, [vi_c]):
expanded.add(vi_n)
# Usa lo score del coarse come stima per il sort successivo
score_by_vi[vi_n] = max(score_by_vi.get(vi_n, 0.0), s_top)
kept_variants: list[tuple[int, float]] = [
(vi, score_by_vi[vi]) for vi in expanded
]
if not kept_variants:
return []
# Cap adattivo: ordina per score top-level e mantieni le più promettenti.
# Min: max_matches*8 (margine per NMS cross-variant)
# Max: 50% delle varianti totali (protegge performance con molte scale)
kept_variants.sort(key=lambda t: -t[1])
max_vars_full = max(max_matches * 8, len(self.variants) // 2)
kept_variants = kept_variants[:max_vars_full]
# Full-res (parallelizzato) con bitmap
spread0 = self._spread_bitmap(gray0)
bit_active_full = int(
sum(1 << b for b in range(N_BINS)
if (spread0 & np.uint8(1 << b)).any())
)
density_full = _jit_popcount(spread0)
for sc in unique_scales:
bg_cache_full[sc] = _bg_for_scale(density_full, sc, 1)
def _full_score(vi: int) -> tuple[int, np.ndarray]:
var = self.variants[vi]
lvl0 = var.levels[0]
score = _jit_score_bitmap_rescored(
spread0, lvl0.dx, lvl0.dy, lvl0.bin, bit_active_full,
bg_cache_full[var.scale],
)
return vi, score
candidates_per_var: list[tuple[int, np.ndarray]] = []
raw: list[tuple[float, int, int, int]] = []
var_indices = [vi for vi, _ in kept_variants]
if self.n_threads > 1 and len(var_indices) > 1:
with ThreadPoolExecutor(max_workers=self.n_threads) as ex:
results = list(ex.map(_full_score, var_indices))
else:
results = [_full_score(vi) for vi in var_indices]
def _scale_factor(s: float) -> float:
"""Penalità moltiplicativa per scala diversa da 1.0."""
if scale_penalty > 0.0 and s != 1.0:
return max(0.0, 1.0 - scale_penalty * abs(s - 1.0))
return 1.0
for vi, score in results:
pen = _scale_factor(self.variants[vi].scale)
# Ordinare/sogliare su score penalizzato: un match a scala 1.5 con
# score 0.8 e penalty=0.3 effettivamente vale 0.56, non 0.8.
score_for_sort = score if pen == 1.0 else score * pen
ys, xs = np.where(score_for_sort >= min_score)
if len(ys) == 0:
continue
vals = score_for_sort[ys, xs]
K = min(len(vals), max_matches * 5)
ord_idx = np.argpartition(-vals, K - 1)[:K]
candidates_per_var.append((vi, score)) # score_map originale
for i in ord_idx:
raw.append((float(vals[i]), int(xs[i]), int(ys[i]), vi))
raw.sort(key=lambda c: -c[0])
# Mappa vi → score_map per subpixel/refinement
score_maps = dict(candidates_per_var)
# NMS + subpixel + refinement angolare
# Usa mask salvata in train() per coerenza (se ROI poligonale).
h, w = self.template_gray.shape if self.template_gray is not None else (0, 0)
mask_full = (
self._train_mask if self._train_mask is not None
else np.full((h, w), 255, dtype=np.uint8)
)
# Plateau radius adattivo al template (evita plateau troppo ampi su
# template piccoli: 8% del lato minimo, clampato [3, 10]).
plateau_r = max(3, min(10, int(min(self.template_size) * 0.08)))
# Pre-NMS rapido su raw con coordinate intere (nms_radius ≥ 8,
# la precisione sub-pixel non cambia la decisione di reject).
# Subpixel viene calcolato DOPO il pre-NMS solo sui ~pre_cap
# preliminary sopravvissuti: prima era chiamato su ogni raw (~58k
# chiamate su clip_preciso) anche se la maggior parte veniva poi
# scartata dalla NMS, sprecando la parte più costosa del loop.
r2 = nms_radius * nms_radius
pre_cap = max(max_matches * 3, max_matches + 10)
preliminary_int: list[tuple[float, int, int, int]] = []
for score, xi, yi, vi in raw:
if any((k[1] - xi) ** 2 + (k[2] - yi) ** 2 < r2
for k in preliminary_int):
continue
preliminary_int.append((score, xi, yi, vi))
if len(preliminary_int) >= pre_cap:
break
# Subpixel + refine + verify solo sui candidati pre-NMS (max pre_cap)
kept: list[Match] = []
tw, th = self.template_size
for score, xi, yi, vi in preliminary_int:
if subpixel and vi in score_maps:
cx_f, cy_f = self._subpixel_peak(
score_maps[vi], xi, yi, plateau_radius=plateau_r,
)
else:
cx_f, cy_f = float(xi), float(yi)
var = self.variants[vi]
ang_f = var.angle_deg
score_f = score
if refine_pose_joint and self.template_gray is not None:
ang_f, score_f, cx_f, cy_f = self._refine_pose_joint(
spread0, self.template_gray, cx_f, cy_f,
var.angle_deg, var.scale, mask_full,
)
elif refine_angle and self.template_gray is not None:
ang_f, score_f, cx_f, cy_f = self._refine_angle(
spread0, bit_active_full, self.template_gray, cx_f, cy_f,
var.angle_deg, var.scale, mask_full,
search_radius=self.angle_step_deg / 2.0,
original_score=score,
)
if verify_ncc:
ncc = self._verify_ncc(gray0, cx_f, cy_f, ang_f, var.scale)
if ncc < verify_threshold:
continue
poly = _oriented_bbox_polygon(
cx_f, cy_f, tw * var.scale, th * var.scale, ang_f,
)
# Penalità scala opzionale: score degrada con distanza da 1.0
if scale_penalty > 0.0 and var.scale != 1.0:
score_f = float(score_f) * max(
0.0, 1.0 - scale_penalty * abs(var.scale - 1.0)
)
kept.append(Match(
cx=cx_f, cy=cy_f,
angle_deg=ang_f,
scale=var.scale,
score=score_f,
bbox_poly=poly,
))
if len(kept) >= max_matches:
break
return kept