Shape_Model_2D/pm2d/line_matcher.py
Adriano 5059ce1d89 feat: use_soft_score - Halcon Metric soft-margin gradient similarity
_compute_soft_score: continuous cos(theta_template - theta_scene)
(not quantized to bins), weighted by magnitude. Polarity-aware if
use_polarity=True (mod 2pi), else |cos| (mod pi).

When use_soft_score=True (default off, backward compatible), the final
score is fused with the shape score: more discriminative for matches
at small rotations (gradual penalty instead of a binary on/off).

Equivalent to Halcon Metric='use_polarity' / 'ignore_global_polarity'
in find_shape_model.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 22:32:17 +02:00

"""Shape-based matcher stile linemod (line2Dup) - Python puro + numpy/OpenCV.
Porting algoritmico dell'idea di `meiqua/shape_based_matching` (no MIPP/SIMD —
equivalente usando vettorizzazione numpy).
Training (costoso, fatto una volta per ricetta):
- Per ogni variante (angolo, scala) del template:
1. Sobel → magnitude + orientation
2. Quantizzazione orientation in N_BINS bin (modulo π, edge simmetrici)
3. Estrazione feature sparse top-magnitude con spacing minimo
4. Salvataggio feature = liste (dx, dy, bin) relative al centro-modello
Matching (veloce):
- Scena processata una sola volta per livello di piramide:
Sobel → magnitude → quant orientation → spread (dilate per bin) →
response map (N_BINS, H, W) — bit b acceso dove orientamento b presente.
- Per ogni variante:
score_map[y,x] = Σ resp[b_i][y+dy_i, x+dx_i] / N_features
implementato con shift-add vettorizzato (numpy).
- Piramide: matching top-level (basso costo, soglia ridotta) +
refinement a risoluzione piena attorno ai candidati.
Il training supporta una `mask` binaria per modellare solo una regione parziale
della ROI (modello non-rettangolare).
"""
from __future__ import annotations
import math
import os
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
import cv2
import numpy as np
_GOLDEN = (math.sqrt(5.0) - 1.0) / 2.0 # ≈ 0.618
from pm2d._jit_kernels import (
score_by_shift as _jit_score_by_shift,
score_bitmap as _jit_score_bitmap,
score_bitmap_rescored as _jit_score_bitmap_rescored,
score_bitmap_greedy as _jit_score_bitmap_greedy,
top_max_per_variant as _jit_top_max_per_variant,
popcount_density as _jit_popcount,
HAS_NUMBA,
)
N_BINS = 8 # default: orientation mod π (no polarity)
N_BINS_POL = 16 # use_polarity=True: orientation mod 2π (with polarity)
def _poly_iou(p1: np.ndarray, p2: np.ndarray) -> float:
"""IoU tra due poligoni convessi (4 vertici, float32) via cv2.intersectConvexConvex.
Usa OpenCV (cv2.intersectConvexConvex) per intersezione esatta:
ritorna area intersezione / area unione. Robusto a rotazioni
qualsiasi (anti-orarie/orarie) - cv2 normalizza orientamento.
"""
a1 = float(cv2.contourArea(p1))
a2 = float(cv2.contourArea(p2))
if a1 <= 0 or a2 <= 0:
return 0.0
inter_area, _ = cv2.intersectConvexConvex(
p1.astype(np.float32), p2.astype(np.float32),
)
inter_area = float(inter_area)
if inter_area <= 0:
return 0.0
union = a1 + a2 - inter_area
return inter_area / union if union > 0 else 0.0
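# Worked example (reference values): two axis-aligned unit squares offset by
# half a side intersect over area 0.5, with union 1.5 → IoU = 1/3.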
def _oriented_bbox_polygon(
cx: float, cy: float, w: float, h: float, angle_deg: float,
) -> np.ndarray:
"""Ritorna 4 vertici (float32, shape (4,2)) del bbox orientato.
Convenzione coerente con cv2.getRotationMatrix2D usato nel train:
rotazione counter-clockwise (matematica) ma sistema immagine y-down,
quindi visivamente orario.
"""
w2, h2 = w / 2.0, h / 2.0
    # Unrotated template vertices, centered on the model center
corners = np.array([[-w2, -h2], [w2, -h2], [w2, h2], [-w2, h2]], np.float32)
a = np.deg2rad(angle_deg)
c, s = np.cos(a), np.sin(a)
    # cv2.getRotationMatrix2D with positive angle a applies R = [[c,s],[-s,c]]
    # (counter-clockwise in the mathematical system; y-down → clockwise)
R = np.array([[c, s], [-s, c]], np.float32)
rotated = corners @ R.T
rotated[:, 0] += cx
rotated[:, 1] += cy
return rotated
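# Worked example (hypothetical numbers): a 40×20 box at (100, 100) rotated by
# angle_deg=90 maps corner (-20, -10) through R.T = [[0, -1], [1, 0]] to
# (-10, 20), i.e. scene point (90, 120) — clockwise in y-down image coords.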
@dataclass
class Match:
cx: float
cy: float
angle_deg: float
scale: float
score: float
    bbox_poly: np.ndarray # (4, 2) float32 - 4 ordered vertices (rotated)
@dataclass
class _LevelFeatures:
"""Feature piramidate (livello l = downsample /2^l)."""
dx: np.ndarray # int32
dy: np.ndarray # int32
bin: np.ndarray # int8
n: int
@dataclass
class _Variant:
"""Template precomputato (una pose)."""
angle_deg: float
scale: float
    # Feature pyramid: levels[0] = full-res, levels[l] = /2^l
levels: list[_LevelFeatures]
    # Kernel bbox (for visualization / search limits)
kh: int
kw: int
    cx_local: float # model center inside the kernel bbox
cy_local: float
class LineShapeMatcher:
"""Shape-based matcher linemod-style - Python/numpy, no SIMD."""
def __init__(
self,
num_features: int = 96,
weak_grad: float = 30.0,
strong_grad: float = 60.0,
angle_range_deg: tuple[float, float] = (0.0, 360.0),
angle_step_deg: float = 5.0,
scale_range: tuple[float, float] = (1.0, 1.0),
scale_step: float = 0.1,
spread_radius: int = 4,
min_feature_spacing: int = 3,
pyramid_levels: int = 2,
top_score_factor: float = 0.5,
n_threads: int | None = None,
use_polarity: bool = False,
) -> None:
self.num_features = num_features
self.weak_grad = weak_grad
self.strong_grad = strong_grad
self.angle_range_deg = angle_range_deg
self.angle_step_deg = angle_step_deg
self.scale_range = scale_range
self.scale_step = scale_step
self.spread_radius = spread_radius
self.min_feature_spacing = min_feature_spacing
self.pyramid_levels = max(1, pyramid_levels)
self.top_score_factor = top_score_factor
self.n_threads = n_threads or max(1, (os.cpu_count() or 2) - 1)
        # Polarity-aware: 16 bins (orientation mod 2π) using a uint16 bitmap.
        # Distinguishes "light→dark" from "dark→light" edges → 2x selectivity.
        # Use when the scene background varies (light/dark) and the template
        # orientation is directional.
self.use_polarity = use_polarity
self._n_bins = N_BINS_POL if use_polarity else N_BINS
self.variants: list[_Variant] = []
self.template_size: tuple[int, int] = (0, 0)
self.template_gray: np.ndarray | None = None
        # Mask used during training (propagated to refine for consistency).
self._train_mask: np.ndarray | None = None
# --- Helpers -------------------------------------------------------
@staticmethod
def _to_gray(img: np.ndarray) -> np.ndarray:
if img.ndim == 3:
return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
return img
def _gradient(self, gray: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
gx = cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=3)
gy = cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=3)
mag = cv2.magnitude(gx, gy)
ang = np.arctan2(gy, gx) # [-π, π]
if self.use_polarity:
# Mod 2π: bin 0..15 codifica direzione + polarity edge.
ang_full = np.where(ang < 0, ang + 2.0 * np.pi, ang)
bins = np.floor(ang_full / (2.0 * np.pi) * N_BINS_POL).astype(np.int16)
bins = np.clip(bins, 0, N_BINS_POL - 1)
else:
ang_mod = np.where(ang < 0, ang + np.pi, ang)
bins = np.floor(ang_mod / np.pi * N_BINS).astype(np.int16)
bins = np.clip(bins, 0, N_BINS - 1)
return mag, bins
def _extract_features(
self, mag: np.ndarray, bins: np.ndarray, mask: np.ndarray | None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
if mask is not None:
mag = np.where(mask > 0, mag, 0)
strong = mag >= self.strong_grad
ys, xs = np.where(strong)
if len(xs) == 0:
return (np.zeros(0, np.int32),) * 3
vals = mag[ys, xs]
order = np.argsort(-vals)
spc = max(1, self.min_feature_spacing)
occupied = np.zeros(mag.shape, dtype=bool)
picked_x: list[int] = []
picked_y: list[int] = []
picked_b: list[int] = []
for idx in order:
y, x = int(ys[idx]), int(xs[idx])
if occupied[y, x]:
continue
picked_x.append(x); picked_y.append(y)
picked_b.append(int(bins[y, x]))
y0 = max(0, y - spc); y1 = min(mag.shape[0], y + spc + 1)
x0 = max(0, x - spc); x1 = min(mag.shape[1], x + spc + 1)
occupied[y0:y1, x0:x1] = True
if len(picked_x) >= self.num_features:
break
return (np.array(picked_x, np.int32),
np.array(picked_y, np.int32),
np.array(picked_b, np.int8))
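    # Spacing example: with min_feature_spacing=3 each picked feature blocks a
    # 7×7 window around itself, so the strongest edges end up spread along the
    # whole contour instead of clustering on one high-contrast corner.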
def set_angle_range_around(
self, center_deg: float, tolerance_deg: float,
) -> None:
"""Restringe angle_range a (center - tol, center + tol).
Comodo helper per scenari in cui l'orientamento del pezzo e'
noto a priori entro ±tolerance_deg (es. feeder vibrante con
guida meccanica). Riduce drasticamente le varianti generate
in train(): es. ±15° vs 360° = 24x meno varianti, training
e matching molto piu veloci.
Esempio:
m.set_angle_range_around(0, 20) # cerca solo in [-20, +20]
m.train(template)
"""
self.angle_range_deg = (
float(center_deg - tolerance_deg),
float(center_deg + tolerance_deg),
)
def _scale_list(self) -> list[float]:
s0, s1 = self.scale_range
if s0 >= s1 or self.scale_step <= 0:
return [float(s0)]
n = int(np.floor((s1 - s0) / self.scale_step)) + 1
return [float(s0 + i * self.scale_step) for i in range(n)]
def _auto_angle_step(self) -> float:
"""Step angolare derivato da dimensione template (Halcon-style).
Formula: step ≈ atan(2 / max_side) gradi. Garantisce che la
rotazione minima produca uno spostamento di ≥2 px sul perimetro
del template (sotto sample il matching coarse perde candidati).
Clampato in [0.5°, 10°].
"""
max_side = max(self.template_size) if self.template_size != (0, 0) else 64
step = math.degrees(math.atan2(2.0, float(max_side)))
return float(np.clip(step, 0.5, 10.0))
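    # Reference arithmetic: max_side=200 → atan2(2, 200) ≈ 0.57°, i.e. one
    # step shifts perimeter pixels by ~2 px; max_side=16 → ≈ 7.1°, still
    # inside the [0.5°, 10°] clamp.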
def _effective_angle_step(self) -> float:
"""Risolve angle_step_deg gestendo modalità auto (<=0)."""
if self.angle_step_deg <= 0:
return self._auto_angle_step()
return self.angle_step_deg
def _angle_list(self) -> list[float]:
a0, a1 = self.angle_range_deg
step = self._effective_angle_step()
if step <= 0 or a0 >= a1:
return [float(a0)]
n = int(np.floor((a1 - a0) / step))
return [float(a0 + i * step) for i in range(n)]
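    # Example: angle_range (0, 360) with step 5 yields n = 72 angles
    # 0, 5, ..., 355; the endpoint is excluded so 360° never duplicates 0°.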
# --- Training ------------------------------------------------------
def _build_pyramid_features(
self, dx: np.ndarray, dy: np.ndarray, bin_: np.ndarray,
) -> list[_LevelFeatures]:
"""Piramide feature precomputata: livello l = /2^l con dedup."""
levels = [_LevelFeatures(dx=dx.copy(), dy=dy.copy(), bin=bin_.copy(),
n=len(dx))]
for lvl in range(1, self.pyramid_levels):
sf = 2 ** lvl
dx_l = (dx // sf).astype(np.int32)
dy_l = (dy // sf).astype(np.int32)
            # Dedup: removes features collapsed onto the same (dx, dy, bin)
key = ((dx_l.astype(np.int64) << 24)
| (dy_l.astype(np.int64) << 8)
| bin_.astype(np.int64))
_, uniq = np.unique(key, return_index=True)
levels.append(_LevelFeatures(
dx=dx_l[uniq], dy=dy_l[uniq], bin=bin_[uniq], n=len(uniq),
))
return levels
def train(self, template_bgr: np.ndarray, mask: np.ndarray | None = None) -> int:
"""Genera varianti rotate+scalate con feature sparse + piramide."""
gray = self._to_gray(template_bgr)
h, w = gray.shape
self.template_size = (w, h)
self.template_gray = gray.copy()
if mask is None:
mask_full = np.full((h, w), 255, dtype=np.uint8)
else:
mask_full = (mask > 0).astype(np.uint8) * 255
self._train_mask = mask_full.copy()
self.variants.clear()
        # Invalidate the refine feature cache: the template has changed.
self._refine_feat_cache = {}
for s in self._scale_list():
sw = max(16, int(round(w * s)))
sh = max(16, int(round(h * s)))
gray_s = cv2.resize(gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST)
diag = int(np.ceil(np.hypot(sh, sw))) + 6
py = (diag - sh) // 2
px = (diag - sw) // 2
gray_p = cv2.copyMakeBorder(
gray_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_REPLICATE,
)
mask_p = cv2.copyMakeBorder(
mask_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_CONSTANT, value=0,
)
center = (diag / 2.0, diag / 2.0)
for ang in self._angle_list():
M = cv2.getRotationMatrix2D(center, ang, 1.0)
gray_r = cv2.warpAffine(
gray_p, M, (diag, diag),
flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE,
)
mask_r = cv2.warpAffine(
mask_p, M, (diag, diag),
flags=cv2.INTER_NEAREST, borderValue=0,
)
mag, bins = self._gradient(gray_r)
fx, fy, fb = self._extract_features(mag, bins, mask_r)
if len(fx) < 8:
continue
cx_c = diag / 2.0
cy_c = diag / 2.0
dx = (fx - cx_c).astype(np.int32)
dy = (fy - cy_c).astype(np.int32)
x0 = int(dx.min()); x1 = int(dx.max())
y0 = int(dy.min()); y1 = int(dy.max())
kw = x1 - x0 + 1
kh = y1 - y0 + 1
cx_local = -x0
cy_local = -y0
levels = self._build_pyramid_features(dx, dy, fb)
self.variants.append(_Variant(
angle_deg=float(ang),
scale=float(s),
levels=levels,
kh=kh, kw=kw,
cx_local=float(cx_local), cy_local=float(cy_local),
))
self._dedup_variants()
return len(self.variants)
def _dedup_variants(self) -> int:
"""Rimuove varianti con feature-set identico (post-quantizzazione).
Halcon-style: con angle range = (0, 360) e simmetrie del template,
molte rotazioni producono lo stesso set quantizzato di feature.
Es: quadrato a 0/90/180/270 deg → stesse features (modulo permutazione).
Hash su feature ordinate (livello 0, full-res) elimina i duplicati.
Vantaggio: meno varianti = meno chiamate kernel JIT al top-level
senza perdere copertura angolare effettiva. Per template asimmetrici
non rimuove nulla.
"""
seen: dict[bytes, int] = {}
kept: list[_Variant] = []
removed = 0
for var in self.variants:
lvl0 = var.levels[0]
order = np.lexsort((lvl0.bin, lvl0.dy, lvl0.dx))
key = (
lvl0.dx[order].tobytes()
+ b"|" + lvl0.dy[order].tobytes()
+ b"|" + lvl0.bin[order].tobytes()
+ b"|" + str(round(var.scale, 4)).encode()
)
            h = key # raw bytes key, no crypto hash (keys collide only if identical)
if h in seen:
removed += 1
continue
seen[h] = len(kept)
kept.append(var)
self.variants = kept
return removed
# --- Matching ------------------------------------------------------
def _response_map(self, gray: np.ndarray) -> np.ndarray:
"""Response map shape (N_BINS, H, W) float32 (legacy path)."""
mag, bins = self._gradient(gray)
valid = mag >= self.weak_grad
k = 2 * self.spread_radius + 1
kernel = np.ones((k, k), dtype=np.uint8)
H, W = gray.shape
raw = np.zeros((N_BINS, H, W), dtype=np.float32)
for b in range(N_BINS):
mask_b = ((bins == b) & valid).astype(np.uint8)
d = cv2.dilate(mask_b, kernel)
raw[b] = d.astype(np.float32)
return raw
def _spread_bitmap(self, gray: np.ndarray) -> np.ndarray:
"""Spread bitmap: bit b acceso dove bin b è presente nel raggio.
dtype: uint8 per N_BINS=8, uint16 per N_BINS_POL=16 (use_polarity).
"""
mag, bins = self._gradient(gray)
valid = mag >= self.weak_grad
k = 2 * self.spread_radius + 1
kernel = np.ones((k, k), dtype=np.uint8)
H, W = gray.shape
nb = self._n_bins
dtype = np.uint16 if nb > 8 else np.uint8
spread = np.zeros((H, W), dtype=dtype)
for b in range(nb):
mask_b = ((bins == b) & valid).astype(np.uint8)
d = cv2.dilate(mask_b, kernel)
spread |= (d.astype(dtype) << b)
return spread
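    # Encoding example: if bins {1, 4} occur within the spread radius of a
    # pixel, its bitmap value is (1 << 1) | (1 << 4) = 18; a feature with
    # bin 4 hits there because 18 & (1 << 4) != 0.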
@staticmethod
def _score_by_shift(
resp: np.ndarray, dx: np.ndarray, dy: np.ndarray, bins: np.ndarray,
bin_has_data: np.ndarray | None = None,
) -> np.ndarray:
"""score[y,x] = Σ_i resp[bin_i][y+dy_i, x+dx_i] / len(dx).
Dispatch a kernel Numba JIT se disponibile, altrimenti fallback numpy.
"""
return _jit_score_by_shift(resp, dx, dy, bins, bin_has_data)
@staticmethod
def _subpixel_peak(
acc: np.ndarray, x: int, y: int, plateau_radius: int = 10,
) -> tuple[float, float]:
"""Posizione sub-pixel del picco.
1. Plateau saturo → centroide pesato del plateau (peso = score).
2. Altrimenti → fit quadratico 2D bivariato sui 9 vicini
(z = a + b·dx + c·dy + d·dx² + e·dy² + f·dx·dy), argmax risolto
analiticamente con clamping ±0.5 px.
"""
H, W = acc.shape
val = float(acc[y, x])
        # Plateau detection: values >= val - 0.01 within a limited radius
y0 = max(0, y - plateau_radius); y1 = min(H, y + plateau_radius + 1)
x0 = max(0, x - plateau_radius); x1 = min(W, x + plateau_radius + 1)
patch = acc[y0:y1, x0:x1]
plateau = patch >= val - 0.01
if plateau.sum() > 1:
            # Centroid weighted by (score - (max-0.01))² to emphasize the top
weights = np.where(plateau, patch - (val - 0.01), 0.0).astype(np.float64)
weights = weights * weights
total = weights.sum()
if total > 1e-9:
ys_idx, xs_idx = np.indices(patch.shape)
cx_w = (xs_idx * weights).sum() / total
cy_w = (ys_idx * weights).sum() / total
return float(x0 + cx_w), float(y0 + cy_w)
ys_m, xs_m = np.where(plateau)
return float(x0 + xs_m.mean()), float(y0 + ys_m.mean())
        # Bivariate 2D quadratic fit over the 3x3 neighborhood
if x <= 0 or x >= W - 1 or y <= 0 or y >= H - 1:
return float(x), float(y)
        # 3x3 stencil: Z[i, j] with i,j ∈ {-1, 0, +1}
Z = acc[y - 1:y + 2, x - 1:x + 2].astype(np.float64)
        # Coefficients from finite differences
b_c = (Z[1, 2] - Z[1, 0]) / 2.0
c_c = (Z[2, 1] - Z[0, 1]) / 2.0
d_c = (Z[1, 2] + Z[1, 0] - 2.0 * Z[1, 1]) / 2.0
e_c = (Z[2, 1] + Z[0, 1] - 2.0 * Z[1, 1]) / 2.0
f_c = (Z[2, 2] - Z[0, 2] - Z[2, 0] + Z[0, 0]) / 4.0
        # Max: solves [2d f; f 2e][dx;dy] = [-b;-c]
det = 4.0 * d_c * e_c - f_c * f_c
if abs(det) > 1e-9:
ox = (-2.0 * e_c * b_c + f_c * c_c) / det
oy = (-2.0 * d_c * c_c + f_c * b_c) / det
else:
            # Separable fallback
ox = -b_c / (2.0 * d_c) if abs(d_c) > 1e-6 else 0.0
oy = -c_c / (2.0 * e_c) if abs(e_c) > 1e-6 else 0.0
ox = float(np.clip(ox, -0.5, 0.5))
oy = float(np.clip(oy, -0.5, 0.5))
return x + ox, y + oy
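    # 1D sanity check of the quadratic step: samples (0.6, 1.0, 0.9) around a
    # peak give b = (0.9 - 0.6) / 2 = 0.15 and d = (0.9 + 0.6 - 2.0) / 2 = -0.25,
    # so the offset -b / (2d) = 0.3 px toward the larger neighbor, clamped to ±0.5.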
def _refine_pose_joint(
self,
spread0: np.ndarray,
template_gray: np.ndarray,
cx: float, cy: float,
angle_deg: float, scale: float,
mask_full: np.ndarray,
max_iter: int = 24,
tol: float = 1e-3,
) -> tuple[float, float, float, float]:
"""Refine congiunto (cx, cy, angle) via Nelder-Mead 3D.
Ottimizza simultaneamente posizione e angolo (vs golden search 1D
sull'angolo poi quadratico 2D su xy che alterna assi). Halcon-style:
un singolo iter LM stila il match a precisione sub-pixel + sub-step.
Ritorna (angle, score, cx, cy) dove score e quello calcolato sulla
scena spread (no template gray).
"""
h, w = template_gray.shape
sw = max(16, int(round(w * scale)))
sh = max(16, int(round(h * scale)))
gray_s = cv2.resize(template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST)
diag = int(np.ceil(np.hypot(sh, sw))) + 6
py = (diag - sh) // 2; px = (diag - sw) // 2
gray_p = cv2.copyMakeBorder(gray_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_REPLICATE)
mask_p = cv2.copyMakeBorder(mask_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_CONSTANT, value=0)
center = (diag / 2.0, diag / 2.0)
H, W = spread0.shape
def _score(params: tuple[float, float, float]) -> float:
ddx, ddy, dang = params
ang = angle_deg + dang
M = cv2.getRotationMatrix2D(center, ang, 1.0)
gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_REPLICATE)
mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
flags=cv2.INTER_NEAREST, borderValue=0)
mag, bins = self._gradient(gray_r)
fx, fy, fb = self._extract_features(mag, bins, mask_r)
if len(fx) < 8:
return 0.0
cxe = cx + ddx; cye = cy + ddy
ix = int(round(cxe)); iy = int(round(cye))
tot = 0
valid = 0
for i in range(len(fx)):
xs = ix + int(fx[i] - center[0])
ys = iy + int(fy[i] - center[1])
if 0 <= xs < W and 0 <= ys < H:
                    # Use the spread dtype: with use_polarity the bitmap is
                    # uint16 and bins 8..15 would overflow np.uint8(1 << b).
                    bit = spread0.dtype.type(1 << int(fb[i]))
if spread0[ys, xs] & bit:
tot += 1
valid += 1
return -float(tot) / max(1, valid) # minimize -score
        # Inline 3D Nelder-Mead (no scipy). Initial simplex: base vertex plus
        # offsets of 0.5 px in x, 0.5 px in y, and step/2 in θ.
step_a = self.angle_step_deg / 2.0 if self.angle_step_deg > 0 else 1.0
x0 = np.array([0.0, 0.0, 0.0])
simplex = np.array([
x0,
x0 + [0.5, 0.0, 0.0],
x0 + [0.0, 0.5, 0.0],
x0 + [0.0, 0.0, step_a],
])
fvals = np.array([_score(tuple(s)) for s in simplex])
for _ in range(max_iter):
order = np.argsort(fvals)
simplex = simplex[order]; fvals = fvals[order]
if abs(fvals[-1] - fvals[0]) < tol:
break
centroid = simplex[:-1].mean(axis=0)
xr = centroid + 1.0 * (centroid - simplex[-1])
fr = _score(tuple(xr))
if fvals[0] <= fr < fvals[-2]:
simplex[-1] = xr; fvals[-1] = fr
continue
if fr < fvals[0]:
xe = centroid + 2.0 * (centroid - simplex[-1])
fe = _score(tuple(xe))
if fe < fr:
simplex[-1] = xe; fvals[-1] = fe
else:
simplex[-1] = xr; fvals[-1] = fr
continue
xc = centroid + 0.5 * (simplex[-1] - centroid)
fc = _score(tuple(xc))
if fc < fvals[-1]:
simplex[-1] = xc; fvals[-1] = fc
continue
for k in range(1, 4):
simplex[k] = simplex[0] + 0.5 * (simplex[k] - simplex[0])
fvals[k] = _score(tuple(simplex[k]))
best_i = int(np.argmin(fvals))
ddx, ddy, dang = simplex[best_i]
return (angle_deg + float(dang), -float(fvals[best_i]),
cx + float(ddx), cy + float(ddy))
def _refine_angle(
self,
spread0: np.ndarray, # bitmap uint8 (H, W)
bit_active: int,
template_gray: np.ndarray,
cx: float, cy: float,
angle_deg: float, scale: float,
mask_full: np.ndarray,
angle_fine_step: float = 0.5,
search_radius: float | None = None,
original_score: float | None = None,
) -> tuple[float, float, float, float]:
"""Ricerca angolare fine (sub-step) attorno al match grezzo.
Genera 5 template temporanei a angle ± {0.5, 1.0} * step e sceglie
l'angolo con score massimo (parabolic fit sulle 3 score centrali).
Ritorna (angle_refined, score, cx_refined, cy_refined).
"""
# NB: rimosso early-skip su score >= 0.99. Lo score linemod/shape
# satura facilmente a 1.0 (specie con pyramid_propagate o spread
# ampio) ma NON garantisce angolo preciso: l'angolo grezzo della
# variante e' quantizzato a multipli di angle_step (5 deg default).
# Refine angolare e' essenziale per orientamento sub-step.
if search_radius is None:
search_radius = self._effective_angle_step() / 2.0
h, w = template_gray.shape
sw = max(16, int(round(w * scale)))
sh = max(16, int(round(h * scale)))
gray_s = cv2.resize(template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST)
diag = int(np.ceil(np.hypot(sh, sw))) + 6
py = (diag - sh) // 2; px = (diag - sw) // 2
gray_p = cv2.copyMakeBorder(gray_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_REPLICATE)
mask_p = cv2.copyMakeBorder(mask_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_CONSTANT, value=0)
center = (diag / 2.0, diag / 2.0)
H, W = spread0.shape
margin = 3
        # Cache of template features per angle (key: int(round(ang*20)) =
        # 0.05° buckets). A re-contracted golden search may request the same
        # bucket several times; avoids re-warp+gradient+extract (expensive).
        # Cached at matcher level so it is reused across find() calls on
        # different scenes: the template rotation does not depend on the scene.
if not hasattr(self, '_refine_feat_cache'):
self._refine_feat_cache = {}
feat_cache = self._refine_feat_cache
cache_scale_key = round(scale * 1000)
def _score_at_angle(off: float) -> tuple[float, float, float]:
"""Ritorna (score, best_cx, best_cy) per angolo = angle_deg + off."""
ang = angle_deg + off
ck = (round(ang * 20), cache_scale_key)
cached = feat_cache.get(ck)
if cached is not None:
fx, fy, fb = cached
else:
M = cv2.getRotationMatrix2D(center, ang, 1.0)
gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_REPLICATE)
mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
flags=cv2.INTER_NEAREST, borderValue=0)
mag, bins = self._gradient(gray_r)
fx, fy, fb = self._extract_features(mag, bins, mask_r)
                # Simple LRU: caps the cache at ~256 angles (8 angles * 32 candidates)
if len(feat_cache) > 256:
feat_cache.pop(next(iter(feat_cache)))
feat_cache[ck] = (fx, fy, fb)
if len(fx) < 8:
return (0.0, cx, cy)
dx = (fx - center[0]).astype(np.int32)
dy = (fy - center[1]).astype(np.int32)
y_lo = int(cy) - margin; y_hi = int(cy) + margin + 1
x_lo = int(cx) - margin; x_hi = int(cx) + margin + 1
sh_w = y_hi - y_lo; sw_w = x_hi - x_lo
acc = np.zeros((sh_w, sw_w), dtype=np.float32)
spread_dtype = spread0.dtype.type
for i in range(len(dx)):
ddx = int(dx[i]); ddy = int(dy[i]); b = int(fb[i])
bit = spread_dtype(1 << b)
sy0 = y_lo + ddy; sy1 = y_hi + ddy
sx0 = x_lo + ddx; sx1 = x_hi + ddx
a_y0 = max(0, -sy0); a_y1 = sh_w - max(0, sy1 - H)
a_x0 = max(0, -sx0); a_x1 = sw_w - max(0, sx1 - W)
s_y0 = max(0, sy0); s_y1 = min(H, sy1)
s_x0 = max(0, sx0); s_x1 = min(W, sx1)
if s_y1 > s_y0 and s_x1 > s_x0:
region = spread0[s_y0:s_y1, s_x0:s_x1]
acc[a_y0:a_y1, a_x0:a_x1] += (
(region & bit) != 0
).astype(np.float32)
acc /= len(dx)
_, max_val, _, max_loc = cv2.minMaxLoc(acc)
return (float(max_val),
float(x_lo + max_loc[0]), float(y_lo + max_loc[1]))
        # Golden-section search over [-search_radius, +search_radius]:
        # converges in log time to ~0.1° precision; ~8 evaluations instead
        # of 5, but centered on the real peak (not equally spaced samples).
a_lo = -search_radius
a_hi = +search_radius
x1 = a_hi - _GOLDEN * (a_hi - a_lo)
x2 = a_lo + _GOLDEN * (a_hi - a_lo)
s1, cx1, cy1 = _score_at_angle(x1)
s2, cx2, cy2 = _score_at_angle(x2)
        # Score at the origin as reference (angle offset 0)
s0, cx0_s, cy0_s = _score_at_angle(0.0)
best = (angle_deg, s0, cx0_s, cy0_s)
        tol = 0.1 # degrees
for _ in range(8):
if s1 > best[1]:
best = (angle_deg + x1, s1, cx1, cy1)
if s2 > best[1]:
best = (angle_deg + x2, s2, cx2, cy2)
if abs(a_hi - a_lo) < tol:
break
if s1 > s2:
a_hi = x2
x2 = x1; s2 = s1; cx2 = cx1; cy2 = cy1
x1 = a_hi - _GOLDEN * (a_hi - a_lo)
s1, cx1, cy1 = _score_at_angle(x1)
else:
a_lo = x1
x1 = x2; s1 = s2; cx1 = cx2; cy1 = cy2
x2 = a_lo + _GOLDEN * (a_hi - a_lo)
s2, cx2, cy2 = _score_at_angle(x2)
return best
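    # Bracket-shrink arithmetic for the golden search above: with the default
    # 5° step the initial bracket is ±2.5°; each iteration keeps a 0.618
    # fraction, so after 8 iterations the width is 5 * 0.618**8 ≈ 0.11°,
    # matching tol.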
def _compute_soft_score(
self, scene_gray: np.ndarray, variant: _Variant,
cx: float, cy: float, angle_deg: float,
) -> float:
"""Soft-margin gradient similarity (Halcon Metric='use_polarity').
Score = mean(max(0, cos(theta_template - theta_scene))) sulle
feature template alla pose, pesato per magnitude scena. Continuo
in [0, 1], piu discriminante della metric a bin (Y di "Halcon
improvements"): match a leggera rotazione = penalita' graduale
invece di on/off bin.
Polarity:
- use_polarity=True: cos(theta_t - theta_s) considera direzione
completa (mod 2pi)
- use_polarity=False: |cos(theta_t - theta_s)| considera solo
orientazione (mod pi)
"""
if self.template_gray is None:
return 0.0
h, w = self.template_gray.shape
scale = variant.scale
sw = max(16, int(round(w * scale)))
sh = max(16, int(round(h * scale)))
gray_s = cv2.resize(self.template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
mask_src = (
self._train_mask if self._train_mask is not None
else np.full_like(self.template_gray, 255)
)
mask_s = cv2.resize(mask_src, (sw, sh), interpolation=cv2.INTER_NEAREST)
diag = int(np.ceil(np.hypot(sh, sw))) + 6
py = (diag - sh) // 2; px = (diag - sw) // 2
gray_p = cv2.copyMakeBorder(
gray_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_REPLICATE,
)
mask_p = cv2.copyMakeBorder(
mask_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_CONSTANT, value=0,
)
center = (diag / 2.0, diag / 2.0)
M = cv2.getRotationMatrix2D(center, angle_deg, 1.0)
gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_REPLICATE)
mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
flags=cv2.INTER_NEAREST, borderValue=0)
        # Template gradient (continuous, not quantized)
gx_t = cv2.Sobel(gray_r, cv2.CV_32F, 1, 0, ksize=3)
gy_t = cv2.Sobel(gray_r, cv2.CV_32F, 0, 1, ksize=3)
mag_t = cv2.magnitude(gx_t, gy_t)
        # Extract feature positions at this pose
_, bins_t = self._gradient(gray_r)
fx, fy, _ = self._extract_features(mag_t, bins_t, mask_r)
if len(fx) < 4:
return 0.0
        # Scene gradient (continuous)
gx_s = cv2.Sobel(scene_gray, cv2.CV_32F, 1, 0, ksize=3)
gy_s = cv2.Sobel(scene_gray, cv2.CV_32F, 0, 1, ksize=3)
H, W = scene_gray.shape
ix = int(round(cx)); iy = int(round(cy))
sims = []
weights = []
for i in range(len(fx)):
xs = ix + int(fx[i] - center[0])
ys = iy + int(fy[i] - center[1])
if not (0 <= xs < W and 0 <= ys < H):
continue
tx = float(gx_t[int(fy[i]), int(fx[i])])
ty = float(gy_t[int(fy[i]), int(fx[i])])
sx = float(gx_s[ys, xs]); sy = float(gy_s[ys, xs])
tm = math.hypot(tx, ty); sm = math.hypot(sx, sy)
if tm < 1e-3 or sm < 1e-3:
continue
# cos(theta_t - theta_s) = (tx*sx + ty*sy) / (tm*sm)
cos_sim = (tx * sx + ty * sy) / (tm * sm)
if not self.use_polarity:
                # Mod pi: |cos| considers orientation only (no polarity)
cos_sim = abs(cos_sim)
else:
cos_sim = max(0.0, cos_sim)
sims.append(cos_sim)
weights.append(min(sm, 255.0))
if not sims:
return 0.0
sims_arr = np.asarray(sims, dtype=np.float32)
w_arr = np.asarray(weights, dtype=np.float32)
return float((sims_arr * w_arr).sum() / (w_arr.sum() + 1e-9))
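    # Reference values: a feature misaligned by 10° contributes cos(10°) ≈ 0.985,
    # by 30° cos(30°) ≈ 0.866 — the binned score instead flips 1 → 0 as soon as
    # the orientation crosses a bin edge (22.5° wide bins for N_BINS=8).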
def _verify_ncc(
self, scene_gray: np.ndarray, cx: float, cy: float,
angle_deg: float, scale: float,
) -> float:
"""NCC tra template warpato alla pose e scena sottostante.
Lavora su un **crop locale** della scena di lato = diagonale del
template ruotato+scalato, non sull'intera scena. Su scene grandi
(1920×1080) taglia drasticamente il costo del warp per ogni match.
Ritorna score [-1, 1]. Usato come filtro anti-falso-positivo:
il matcher linemod può dare score alto su texture generiche ma
sovrapponendo il template gray i pixel non corrispondono.
"""
if self.template_gray is None:
return 1.0
t = self.template_gray
h, w = t.shape
cx_t = (w - 1) / 2.0
cy_t = (h - 1) / 2.0
        # Bounding box of the rotated/scaled template around (cx, cy)
diag = int(np.ceil(np.hypot(w, h) * scale)) + 8
H, W = scene_gray.shape
x0 = int(round(cx)) - diag // 2
y0 = int(round(cy)) - diag // 2
cx0 = max(0, x0); cy0 = max(0, y0)
cx1 = min(W, x0 + diag); cy1 = min(H, y0 + diag)
if cx1 - cx0 < 10 or cy1 - cy0 < 10:
return 0.0
scn_crop = scene_gray[cy0:cy1, cx0:cx1]
ch, cw = scn_crop.shape
M = cv2.getRotationMatrix2D((cx_t, cy_t), angle_deg, scale)
        # Bring the template center to (cx - cx0, cy - cy0) in the crop
M[0, 2] += (cx - cx0) - cx_t
M[1, 2] += (cy - cy0) - cy_t
warped = cv2.warpAffine(
t, M, (cw, ch),
flags=cv2.INTER_LINEAR, borderValue=0,
)
if self._train_mask is not None:
mask_src = self._train_mask
else:
mask_src = np.full_like(t, 255)
mask_w = cv2.warpAffine(
mask_src, M, (cw, ch),
flags=cv2.INTER_NEAREST, borderValue=0,
)
valid = mask_w > 0
if valid.sum() < 20:
return 0.0
tpl = warped[valid].astype(np.float32)
scn = scn_crop[valid].astype(np.float32)
tm = tpl - tpl.mean()
sm = scn - scn.mean()
        # Minimum std: if the template or scene patch is nearly uniform
        # (e.g. plain white/black background), NCC is unstable and yields
        # false high correlation. Halcon-style: reject the match.
tpl_var = float((tm * tm).sum())
scn_var = float((sm * sm).sum())
n_pix = float(valid.sum())
if tpl_var < 1e-3 * n_pix or scn_var < 1e-3 * n_pix:
return 0.0
denom = np.sqrt(tpl_var * scn_var) + 1e-9
return float((tm * sm).sum() / denom)
def find(
self,
scene_bgr: np.ndarray,
min_score: float = 0.6,
max_matches: int = 20,
nms_radius: int | None = None,
refine_angle: bool = True,
subpixel: bool = True,
verify_ncc: bool = True,
verify_threshold: float = 0.4,
        ncc_skip_above: float = 1.01, # disabled by default: NCC always runs
coarse_angle_factor: int = 2,
coarse_stride: int = 1,
scale_penalty: float = 0.0,
search_roi: tuple[int, int, int, int] | None = None,
        pyramid_propagate: bool = False, # off by default: fewer duplicates
propagate_topk: int = 4,
refine_pose_joint: bool = False,
greediness: float = 0.0,
batch_top: bool = False,
nms_iou_threshold: float = 0.3,
use_soft_score: bool = False,
) -> list[Match]:
"""
scale_penalty: se > 0, riduce lo score per match a scala diversa da 1.0:
score_final = score_shape * max(0, 1 - scale_penalty * |scale - 1|)
Utile se l'operatore vuole che match "identico al template anche per
dimensione" abbia score più alto di match "stessa forma, dimensione
diversa". scale_penalty=0 (default) = comportamento shape puro.
search_roi: (x, y, w, h) limita la ricerca a una regione della scena.
Equivalente a Halcon set_aoi: il matching opera su crop locale e le
coordinate output sono ri-traslate al sistema scena originale. Usare
quando si conosce a priori l'area in cui il pezzo può apparire (es.
feeder a posizione fissa) → costo proporzionale a w·h invece di W·H.
"""
if not self.variants:
            raise RuntimeError("Matcher not trained: call train() first.")
gray_full = self._to_gray(scene_bgr)
        # Apply the search ROI: restrict the scene to a crop and remember the
        # offset to translate match coordinates back at the end of the pipeline.
if search_roi is not None:
rx, ry, rw, rh = search_roi
H_s, W_s = gray_full.shape
rx = max(0, int(rx)); ry = max(0, int(ry))
rw = max(1, min(int(rw), W_s - rx))
rh = max(1, min(int(rh), H_s - ry))
gray0 = gray_full[ry:ry + rh, rx:rx + rw]
roi_offset = (rx, ry)
else:
gray0 = gray_full
roi_offset = (0, 0)
grays = [gray0]
for _ in range(self.pyramid_levels - 1):
grays.append(cv2.pyrDown(grays[-1]))
top = len(grays) - 1
        # Spread bitmap (uint8) at the top level: 32× less memory than the
        # float32 response map → MUCH more cache-friendly for _score_by_shift.
spread_top = self._spread_bitmap(grays[top])
bit_active_top = int(
sum(1 << b for b in range(self._n_bins)
if (spread_top & (spread_top.dtype.type(1) << b)).any())
)
if nms_radius is None:
nms_radius = max(8, min(self.template_size) // 2)
        # Pruning adaptive to the angular step: with a small step (<= 3 deg)
        # there are many nearby variants, top-level scores sit close together,
        # and top_thresh*0.5 is too aggressive: it discards valid variants
        # that would have been recovered at full-res. Same for
        # coarse_angle_factor (skip 1 in 2): with a fine step it is not useful.
        # Observed result: the "fast" 10° preset beat the "precise" 2° preset
        # precisely because it avoided this pruning.
eff_step = self._effective_angle_step()
top_factor = self.top_score_factor
cf_eff = max(1, coarse_angle_factor)
if eff_step <= 3.0:
top_factor = max(top_factor, 0.7)
cf_eff = 1
top_thresh = min_score * top_factor
tw, th = self.template_size
density_top = _jit_popcount(spread_top)
sf_top = 2 ** top
bg_cache_top: dict[float, np.ndarray] = {}
bg_cache_full: dict[float, np.ndarray] = {}
unique_scales = sorted({var.scale for var in self.variants})
def _bg_for_scale(density: np.ndarray, scale: float,
divisor: int) -> np.ndarray:
bw = max(9, int(round(tw * scale / divisor)))
bh = max(9, int(round(th * scale / divisor)))
sm = cv2.boxFilter(density, cv2.CV_32F, (bw, bh))
return np.clip(sm / N_BINS, 0.0, 0.99)
for sc in unique_scales:
bg_cache_top[sc] = _bg_for_scale(density_top, sc, sf_top)
def _rescore(score: np.ndarray, bg: np.ndarray) -> np.ndarray:
return np.maximum(0.0, (score - bg) / (1.0 - bg + 1e-6))
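        # Rescore example (reference arithmetic): a raw score of 0.8 over a
        # local background density of 0.5 normalizes to
        # (0.8 - 0.5) / (1 - 0.5) = 0.6, discounting clutter-driven response
        # before thresholding against top_thresh.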
        # Angular coarse-to-fine:
        # 1) Group variants by scale, sort by angle
        # 2) Top level: evaluate only 1 in every coarse_angle_factor variants
        # 3) Expand to the neighbors at full-res
variants_by_scale: dict[float, list[int]] = {}
for vi, var in enumerate(self.variants):
variants_by_scale.setdefault(var.scale, []).append(vi)
        coarse_idx_list: list[int] = [] # variants to evaluate at the top level
        neighbor_map: dict[int, list[int]] = {} # vi_coarse -> neighbor indices
cf = cf_eff
for scale_key, vi_list in variants_by_scale.items():
vi_sorted = sorted(vi_list, key=lambda i: self.variants[i].angle_deg)
n = len(vi_sorted)
for i in range(0, n, cf):
vi_c = vi_sorted[i]
coarse_idx_list.append(vi_c)
                # Neighbors: ±cf/2 around i (same scale)
half = cf // 2
start = max(0, i - half)
end = min(n, i + half + 1)
neighbor_map[vi_c] = vi_sorted[start:end]
        # Variant pruning via the top level (parallelized).
        # coarse_stride > 1: 1 pixel per stride (~stride^2 speed-up).
        # pyramid_propagate=True: top-K peaks narrow the full-res pass.
        # greediness > 0: greedy kernel with early-exit (alternative to rescore).
cs = max(1, int(coarse_stride))
peaks_by_vi: dict[int, list[tuple[int, int, float]]] = {}
use_greedy_top = greediness > 0.0
def _top_score(vi: int) -> tuple[int, float]:
var = self.variants[vi]
lvl = var.levels[min(top, len(var.levels) - 1)]
if use_greedy_top:
                # Greedy supports neither stride nor background rescore
score = _jit_score_bitmap_greedy(
spread_top, lvl.dx, lvl.dy, lvl.bin, bit_active_top,
top_thresh, greediness,
)
else:
score = _jit_score_bitmap_rescored(
spread_top, lvl.dx, lvl.dy, lvl.bin, bit_active_top,
bg_cache_top[var.scale], stride=cs,
)
if score.size == 0:
return vi, -1.0
best = float(score.max())
if pyramid_propagate and best > 0:
flat = score.ravel()
k = min(propagate_topk, flat.size)
idx = np.argpartition(-flat, k - 1)[:k]
peaks: list[tuple[int, int, float]] = []
for i in idx:
s = float(flat[i])
if s < top_thresh * 0.7:
continue
yt, xt = int(i // score.shape[1]), int(i % score.shape[1])
peaks.append((xt, yt, s))
peaks_by_vi[vi] = peaks
return vi, best
kept_coarse: list[tuple[int, float]] = []
all_top_scores: list[tuple[int, float]] = []
        # batch_top: single-call batch kernel with an outer prange over the
        # variants. Wins over the thread pool when n_vars >> n_threads and
        # the top-level H*W is small (JIT call overhead > kernel cost).
if (batch_top and HAS_NUMBA and len(coarse_idx_list) > 4):
dx_l = []; dy_l = []; bn_l = []; vs_l = []
for vi in coarse_idx_list:
var = self.variants[vi]
lvl = var.levels[min(top, len(var.levels) - 1)]
dx_l.append(lvl.dx); dy_l.append(lvl.dy); bn_l.append(lvl.bin)
vs_l.append(var.scale)
scores_arr = _jit_top_max_per_variant(
spread_top, dx_l, dy_l, bn_l, bg_cache_top, vs_l,
bit_active_top,
)
for vi, best in zip(coarse_idx_list, scores_arr.tolist()):
all_top_scores.append((vi, best))
if best >= top_thresh:
kept_coarse.append((vi, best))
elif self.n_threads > 1 and len(coarse_idx_list) > 1:
with ThreadPoolExecutor(max_workers=self.n_threads) as ex:
for vi, best in ex.map(_top_score, coarse_idx_list):
all_top_scores.append((vi, best))
if best >= top_thresh:
kept_coarse.append((vi, best))
else:
for vi in coarse_idx_list:
vi2, best = _top_score(vi)
all_top_scores.append((vi2, best))
if best >= top_thresh:
kept_coarse.append((vi2, best))
        # Adaptive fallback: if the background rescore pushed every score
        # below top_thresh (heavily textured scenes), re-admit the best
        # top-level variants so the full-res stage still gets a chance
        # instead of returning 0 matches.
if not kept_coarse and all_top_scores:
all_top_scores.sort(key=lambda t: -t[1])
n_keep = max(4, len(all_top_scores) // 10)
            # Keep only variants with top score > 0 (not completely zeroed)
kept_coarse = [(vi, s) for vi, s in all_top_scores[:n_keep] if s > 0]
        # Expand each promoted coarse variant with its neighbors (same scale,
        # intermediate angles not evaluated at the top level)
expanded: set[int] = set()
score_by_vi: dict[int, float] = {}
for vi_c, s_top in kept_coarse:
for vi_n in neighbor_map.get(vi_c, [vi_c]):
expanded.add(vi_n)
                # Use the coarse score as an estimate for the later sort
score_by_vi[vi_n] = max(score_by_vi.get(vi_n, 0.0), s_top)
kept_variants: list[tuple[int, float]] = [
(vi, score_by_vi[vi]) for vi in expanded
]
if not kept_variants:
return []
        # Adaptive cap: sort by top-level score and keep the most promising.
        # Min: max_matches*8 (margin for cross-variant NMS)
        # Max: 50% of all variants (protects performance with many scales)
kept_variants.sort(key=lambda t: -t[1])
max_vars_full = max(max_matches * 8, len(self.variants) // 2)
kept_variants = kept_variants[:max_vars_full]
        # Full-res (parallelized) with the bitmap
spread0 = self._spread_bitmap(gray0)
bit_active_full = int(
sum(1 << b for b in range(self._n_bins)
if (spread0 & (spread0.dtype.type(1) << b)).any())
)
density_full = _jit_popcount(spread0)
for sc in unique_scales:
bg_cache_full[sc] = _bg_for_scale(density_full, sc, 1)
        # Full-res margin around each top peak: covers the downsampling
        # uncertainty (sf_top px) + spread_radius + slack for the NMS.
propagate_margin = sf_top + self.spread_radius + max(8, nms_radius // 2)
H_full, W_full = spread0.shape
def _full_score(vi: int) -> tuple[int, np.ndarray]:
var = self.variants[vi]
lvl0 = var.levels[0]
if not pyramid_propagate or vi not in peaks_by_vi or not peaks_by_vi[vi]:
                # Legacy path: scan the whole scene
return vi, _jit_score_bitmap_rescored(
spread0, lvl0.dx, lvl0.dy, lvl0.bin, bit_active_full,
bg_cache_full[var.scale],
)
            # Propagated-pyramid path: evaluate only local crops around the
            # top-level peak positions (reprojected to full-res).
score_full = np.zeros((H_full, W_full), dtype=np.float32)
mark = np.zeros((H_full, W_full), dtype=bool)
bg = bg_cache_full[var.scale]
for xt, yt, _s in peaks_by_vi[vi]:
cx0 = xt * sf_top
cy0 = yt * sf_top
x_lo = max(0, cx0 - propagate_margin)
x_hi = min(W_full, cx0 + propagate_margin + 1)
y_lo = max(0, cy0 - propagate_margin)
y_hi = min(H_full, cy0 + propagate_margin + 1)
if x_hi <= x_lo or y_hi <= y_lo:
continue
if mark[y_lo:y_hi, x_lo:x_hi].all():
continue
                # Crop spread + bg, run the kernel on the crop
spread_crop = np.ascontiguousarray(spread0[y_lo:y_hi, x_lo:x_hi])
bg_crop = np.ascontiguousarray(bg[y_lo:y_hi, x_lo:x_hi])
score_crop = _jit_score_bitmap_rescored(
spread_crop, lvl0.dx, lvl0.dy, lvl0.bin,
bit_active_full, bg_crop,
)
score_full[y_lo:y_hi, x_lo:x_hi] = np.maximum(
score_full[y_lo:y_hi, x_lo:x_hi], score_crop,
)
mark[y_lo:y_hi, x_lo:x_hi] = True
return vi, score_full
candidates_per_var: list[tuple[int, np.ndarray]] = []
raw: list[tuple[float, int, int, int]] = []
var_indices = [vi for vi, _ in kept_variants]
if self.n_threads > 1 and len(var_indices) > 1:
with ThreadPoolExecutor(max_workers=self.n_threads) as ex:
results = list(ex.map(_full_score, var_indices))
else:
results = [_full_score(vi) for vi in var_indices]
def _scale_factor(s: float) -> float:
"""Penalità moltiplicativa per scala diversa da 1.0."""
if scale_penalty > 0.0 and s != 1.0:
return max(0.0, 1.0 - scale_penalty * abs(s - 1.0))
return 1.0
for vi, score in results:
pen = _scale_factor(self.variants[vi].scale)
            # Sort/threshold on the penalized score: a match at scale 1.5 with
            # score 0.8 and penalty=0.3 is effectively worth 0.8*0.85 = 0.68, not 0.8.
score_for_sort = score if pen == 1.0 else score * pen
ys, xs = np.where(score_for_sort >= min_score)
if len(ys) == 0:
continue
vals = score_for_sort[ys, xs]
K = min(len(vals), max_matches * 5)
ord_idx = np.argpartition(-vals, K - 1)[:K]
            candidates_per_var.append((vi, score)) # original score_map
for i in ord_idx:
raw.append((float(vals[i]), int(xs[i]), int(ys[i]), vi))
raw.sort(key=lambda c: -c[0])
        # Map vi → score_map for subpixel/refinement
score_maps = dict(candidates_per_var)
        # NMS + subpixel + angular refinement
        # Uses the mask saved in train() for consistency (if polygonal ROI).
h, w = self.template_gray.shape if self.template_gray is not None else (0, 0)
mask_full = (
self._train_mask if self._train_mask is not None
else np.full((h, w), 255, dtype=np.uint8)
)
        # Plateau radius adaptive to the template (avoids overly wide plateaus
        # on small templates: 8% of the min side, clamped to [3, 10]).
plateau_r = max(3, min(10, int(min(self.template_size) * 0.08)))
        # Fast pre-NMS on raw candidates at integer coordinates (nms_radius ≥ 8,
        # sub-pixel precision does not change the reject decision).
        # Subpixel is computed AFTER the pre-NMS, only on the ~pre_cap
        # surviving candidates: it used to be called on every raw candidate
        # (~58k calls on clip_preciso) even though most were then discarded
        # by the NMS, wasting the most expensive part of the loop.
r2 = nms_radius * nms_radius
pre_cap = max(max_matches * 3, max_matches + 10)
preliminary_int: list[tuple[float, int, int, int]] = []
for score, xi, yi, vi in raw:
if any((k[1] - xi) ** 2 + (k[2] - yi) ** 2 < r2
for k in preliminary_int):
continue
preliminary_int.append((score, xi, yi, vi))
if len(preliminary_int) >= pre_cap:
break
        # Subpixel + refine + verify only on pre-NMS candidates (max pre_cap)
kept: list[Match] = []
tw, th = self.template_size
for score, xi, yi, vi in preliminary_int:
if subpixel and vi in score_maps:
cx_f, cy_f = self._subpixel_peak(
score_maps[vi], xi, yi, plateau_radius=plateau_r,
)
else:
cx_f, cy_f = float(xi), float(yi)
var = self.variants[vi]
ang_f = var.angle_deg
score_f = score
if refine_pose_joint and self.template_gray is not None:
ang_f, score_f, cx_f, cy_f = self._refine_pose_joint(
spread0, self.template_gray, cx_f, cy_f,
var.angle_deg, var.scale, mask_full,
)
elif refine_angle and self.template_gray is not None:
ang_f, score_f, cx_f, cy_f = self._refine_angle(
spread0, bit_active_full, self.template_gray, cx_f, cy_f,
var.angle_deg, var.scale, mask_full,
search_radius=self._effective_angle_step() / 2.0,
original_score=score,
)
            # NCC verify (Halcon-style): if ncc_skip_above < 1.0, skip the
            # computation for already-high shape scores. Default 1.01 = NCC
            # always, safer against false positives (the shape score saturates
            # easily). When NCC is computed, the final score is the MEAN of
            # shape score and NCC: this makes the score more discriminative
            # for ranking/visualization (a true 1.0 requires both a shape
            # match and an identical gray template).
if verify_ncc and float(score_f) < ncc_skip_above:
ncc = self._verify_ncc(gray0, cx_f, cy_f, ang_f, var.scale)
if ncc < verify_threshold:
continue
score_f = (float(score_f) + max(0.0, ncc)) * 0.5
            # Soft-margin gradient similarity: replaces or complements the
            # score with a continuous metric (gradient cosine similarity)
            # instead of discrete bins. Halcon-style: more robust to small rotations.
if use_soft_score:
soft = self._compute_soft_score(
gray0, var, cx_f, cy_f, ang_f,
)
score_f = (float(score_f) + soft) * 0.5
            # Re-check min_score on the final score: NCC averaging can push
            # the shape score below the user threshold. Without this check,
            # matches with score < min_score showed up (confusing in the UI).
if float(score_f) < min_score:
continue
            # Translate coords from ROI-crop space back to original scene space.
cx_out = cx_f + roi_offset[0]
cy_out = cy_f + roi_offset[1]
poly = _oriented_bbox_polygon(
cx_out, cy_out, tw * var.scale, th * var.scale, ang_f,
)
            # Reject matches whose bbox spills heavily outside the scene:
            # this usually indicates a spurious match (badly derived center
            # or inconsistent scale). Tolerates 25% out-of-bounds, rejects above.
H_scn, W_scn = gray_full.shape
poly_area = float(cv2.contourArea(poly))
if poly_area > 0:
                # Clip the poly to the scene: intersection with rectangle (0,0,W,H)
scene_rect = np.array([
[0, 0], [W_scn, 0], [W_scn, H_scn], [0, H_scn],
], dtype=np.float32)
inter, _ = cv2.intersectConvexConvex(
poly.astype(np.float32), scene_rect,
)
inside_ratio = float(inter) / poly_area
if inside_ratio < 0.75:
continue
            # Optional scale penalty: score degrades with distance from 1.0
if scale_penalty > 0.0 and var.scale != 1.0:
score_f = float(score_f) * max(
0.0, 1.0 - scale_penalty * abs(var.scale - 1.0)
)
            # Post-refine cross-variant NMS: uses polygonal-bbox IoU instead
            # of center distance alone. Two nearby matches with different
            # orientations (adjacent parts) are NOT merged if their real
            # overlap is low; two matches of the same part (equal centers,
            # similar rotation) have high IoU and get dropped.
            # Center-distance fallback for matches with a degenerate bbox.
dup = False
for k in kept:
iou = _poly_iou(k.bbox_poly, poly)
if iou > nms_iou_threshold:
dup = True
break
                # Safety: very close centers (within nms_radius/2) are
                # always merged, even with very different orientations.
if (k.cx - cx_out) ** 2 + (k.cy - cy_out) ** 2 < (r2 / 4.0):
dup = True
break
if dup:
continue
kept.append(Match(
cx=cx_out, cy=cy_out,
angle_deg=ang_f,
scale=var.scale,
score=score_f,
bbox_poly=poly,
))
if len(kept) >= max_matches:
break
return kept
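# Smoke-test sketch (illustrative addition, not part of the original module):
# builds a synthetic asymmetric template, pastes a rotated copy into a larger
# scene, then trains and searches. Exact scores depend on whether the Numba
# kernels in pm2d._jit_kernels are available.
if __name__ == "__main__":
    tmpl = np.zeros((64, 64), np.uint8)
    cv2.rectangle(tmpl, (8, 8), (56, 56), 255, -1)
    cv2.rectangle(tmpl, (32, 8), (56, 32), 0, -1)  # notch breaks rotational symmetry
    scene = np.zeros((256, 256), np.uint8)
    M = cv2.getRotationMatrix2D((32.0, 32.0), 20.0, 1.0)
    M[0, 2] += 100.0 - 32.0  # place the instance center at (100, 140)
    M[1, 2] += 140.0 - 32.0
    cv2.warpAffine(tmpl, M, (256, 256), dst=scene,
                   flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_TRANSPARENT)
    m = LineShapeMatcher(angle_step_deg=5.0)
    print("variants:", m.train(tmpl))
    for mt in m.find(scene, min_score=0.5, max_matches=3):
        print(f"({mt.cx:.1f}, {mt.cy:.1f}) angle={mt.angle_deg:.1f}"
              f" scale={mt.scale:.2f} score={mt.score:.2f}")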