Files
Shape_Model_2D/pm2d/line_matcher.py
Adriano 7f6571bdd1 feat: hysteresis edge linking (Halcon Contrast='auto' two-threshold)
_hysteresis_mask: edge linking via componenti connesse.
- seed = mag >= strong_grad
- weak = mag >= weak_grad
- Promuove a feature ogni componente weak che contiene almeno un
  pixel strong (connettivita' 8-vicini)

Riduce simultaneamente:
- Falsi positivi: edge debole isolato (rumore puro) escluso
- Falsi negativi: edge debole connesso a edge forte incluso
  (continuita' bordi sottili a basso contrasto)

Attivo automaticamente quando weak_grad < strong_grad. Se uguali,
fallback a sogliatura singola standard. Backward compat completo
dato che default weak=30, strong=60.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 23:01:54 +02:00

1764 lines
76 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Shape-based matcher stile linemod (line2Dup) - Python puro + numpy/OpenCV.
Porting algoritmico dell'idea di `meiqua/shape_based_matching` (no MIPP/SIMD —
equivalente usando vettorizzazione numpy).
Training (costoso, fatto una volta per ricetta):
- Per ogni variante (angolo, scala) del template:
1. Sobel → magnitude + orientation
2. Quantizzazione orientation in N_BINS bin (modulo π, edge simmetrici)
3. Estrazione feature sparse top-magnitude con spacing minimo
4. Salvataggio feature = liste (dx, dy, bin) relative al centro-modello
Matching (veloce):
- Scena processata una sola volta per livello di piramide:
Sobel → magnitude → quant orientation → spread (dilate per bin) →
response map (N_BINS, H, W) — bit b acceso dove orientamento b presente.
- Per ogni variante:
score_map[y,x] = Σ resp[b_i][y+dy_i, x+dx_i] / N_features
implementato con shift-add vettorizzato (numpy).
- Piramide: matching top-level (basso costo, soglia ridotta) +
refinement a risoluzione piena attorno ai candidati.
Il training supporta una `mask` binaria per modellare solo una regione parziale
della ROI (modello non-rettangolare).
"""
from __future__ import annotations
import math
import os
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
import cv2
import numpy as np
_GOLDEN = (math.sqrt(5.0) - 1.0) / 2.0 # ≈ 0.618
from pm2d._jit_kernels import (
score_by_shift as _jit_score_by_shift,
score_bitmap as _jit_score_bitmap,
score_bitmap_rescored as _jit_score_bitmap_rescored,
score_bitmap_greedy as _jit_score_bitmap_greedy,
top_max_per_variant as _jit_top_max_per_variant,
popcount_density as _jit_popcount,
HAS_NUMBA,
)
N_BINS = 8 # default: orientamento mod π (no polarity)
N_BINS_POL = 16 # use_polarity=True: orientamento mod 2π (con polarity)
def opencl_available() -> bool:
"""Ritorna True se OpenCV ha backend OpenCL disponibile (GPU)."""
try:
return bool(cv2.ocl.haveOpenCL())
except Exception:
return False
def set_gpu_enabled(enabled: bool) -> bool:
"""Abilita/disabilita backend OpenCL globale di OpenCV.
Quando attivato, Sobel/dilate/warpAffine usano UMat con dispatch
automatico a kernel GPU (Intel UHD, AMD, NVIDIA via OpenCL ICD).
Speedup tipico: 1.5-3x su Sobel+dilate per scene 1920x1080,
overhead trascurabile per scene < 640px (transfer CPU<->GPU domina).
Halcon-equivalent: 'find_shape_model' con backend GPU integrato.
Ritorna True se l'attivazione e' riuscita.
"""
if not opencl_available():
return False
cv2.ocl.setUseOpenCL(bool(enabled))
return cv2.ocl.useOpenCL()
def _poly_iou(p1: np.ndarray, p2: np.ndarray) -> float:
"""IoU tra due poligoni convessi (4 vertici, float32) via cv2.intersectConvexConvex.
Usa OpenCV (cv2.intersectConvexConvex) per intersezione esatta:
ritorna area intersezione / area unione. Robusto a rotazioni
qualsiasi (anti-orarie/orarie) - cv2 normalizza orientamento.
"""
a1 = float(cv2.contourArea(p1))
a2 = float(cv2.contourArea(p2))
if a1 <= 0 or a2 <= 0:
return 0.0
inter_area, _ = cv2.intersectConvexConvex(
p1.astype(np.float32), p2.astype(np.float32),
)
inter_area = float(inter_area)
if inter_area <= 0:
return 0.0
union = a1 + a2 - inter_area
return inter_area / union if union > 0 else 0.0
def _oriented_bbox_polygon(
cx: float, cy: float, w: float, h: float, angle_deg: float,
) -> np.ndarray:
"""Ritorna 4 vertici (float32, shape (4,2)) del bbox orientato.
Convenzione coerente con cv2.getRotationMatrix2D usato nel train:
rotazione counter-clockwise (matematica) ma sistema immagine y-down,
quindi visivamente orario.
"""
w2, h2 = w / 2.0, h / 2.0
# Vertici template non-ruotato centrati al centro
corners = np.array([[-w2, -h2], [w2, -h2], [w2, h2], [-w2, h2]], np.float32)
a = np.deg2rad(angle_deg)
c, s = np.cos(a), np.sin(a)
# cv2.getRotationMatrix2D con angolo a positivo applica R = [[c,s],[-s,c]]
# (ruota counter-clockwise nel sistema matematico; y-down → orario)
R = np.array([[c, s], [-s, c]], np.float32)
rotated = corners @ R.T
rotated[:, 0] += cx
rotated[:, 1] += cy
return rotated
@dataclass
class Match:
cx: float
cy: float
angle_deg: float
scale: float
score: float
bbox_poly: np.ndarray # (4, 2) float32 - 4 vertici ordinati (ruotato)
@dataclass
class _LevelFeatures:
"""Feature piramidate (livello l = downsample /2^l)."""
dx: np.ndarray # int32
dy: np.ndarray # int32
bin: np.ndarray # int8
n: int
@dataclass
class _Variant:
"""Template precomputato (una pose)."""
angle_deg: float
scale: float
# Feature piramide: levels[0] = full-res, levels[l] = /2^l
levels: list[_LevelFeatures]
# Bbox kernel (per visualizzazione / limiti ricerca)
kh: int
kw: int
cx_local: float # centro-modello dentro al bbox kernel
cy_local: float
# Indice template view (X feature - multi-template ensemble).
# 0 = template principale del train(); 1+ = view aggiunte via
# add_template_view(). Usato in _verify_ncc/_compute_recall per
# scegliere il template gray corretto per match.
view_idx: int = 0
class LineShapeMatcher:
"""Shape-based matcher linemod-style - Python/numpy, no SIMD."""
def __init__(
self,
num_features: int = 96,
weak_grad: float = 30.0,
strong_grad: float = 60.0,
angle_range_deg: tuple[float, float] = (0.0, 360.0),
angle_step_deg: float = 5.0,
scale_range: tuple[float, float] = (1.0, 1.0),
scale_step: float = 0.1,
spread_radius: int = 4,
min_feature_spacing: int = 3,
pyramid_levels: int = 2,
top_score_factor: float = 0.5,
n_threads: int | None = None,
use_polarity: bool = False,
use_gpu: bool = False,
) -> None:
self.num_features = num_features
self.weak_grad = weak_grad
self.strong_grad = strong_grad
self.angle_range_deg = angle_range_deg
self.angle_step_deg = angle_step_deg
self.scale_range = scale_range
self.scale_step = scale_step
self.spread_radius = spread_radius
self.min_feature_spacing = min_feature_spacing
self.pyramid_levels = max(1, pyramid_levels)
self.top_score_factor = top_score_factor
self.n_threads = n_threads or max(1, (os.cpu_count() or 2) - 1)
# Polarity-aware: 16 bin (orientamento mod 2π) usando bitmap uint16.
# Distingue edge "chiaro→scuro" da "scuro→chiaro" → 2x selettività.
# Usare quando background di scena varia (chiaro/scuro) e orientamento
# template e' direzionale.
self.use_polarity = use_polarity
self._n_bins = N_BINS_POL if use_polarity else N_BINS
# GPU offload per Sobel/dilate/warpAffine via cv2.UMat (OpenCL).
# Effettivo solo se opencl_available(); altrimenti silent fallback CPU.
self.use_gpu = bool(use_gpu and opencl_available())
if self.use_gpu:
cv2.ocl.setUseOpenCL(True)
self.variants: list[_Variant] = []
self.template_size: tuple[int, int] = (0, 0)
self.template_gray: np.ndarray | None = None
# Maschera usata in training (propagata al refine per coerenza).
self._train_mask: np.ndarray | None = None
# Multi-template ensemble (X feature): N view dello stesso pezzo
# (chiari/scuri, condizioni diverse). Template principale e' [0],
# view aggiunte via add_template_view() sono [1+]. Match restituisce
# la view che ha matchato meglio.
self._view_templates: list[tuple[np.ndarray, np.ndarray | None]] = []
# --- Helpers -------------------------------------------------------
@staticmethod
def _to_gray(img: np.ndarray) -> np.ndarray:
if img.ndim == 3:
return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
return img
def _gradient(self, gray) -> tuple[np.ndarray, np.ndarray]:
# Accetta np.ndarray o cv2.UMat (per path GPU OpenCL).
gx = cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=3)
gy = cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=3)
mag = cv2.magnitude(gx, gy)
# Quantizzazione orientation richiede CPU array (np ops): scarica
# da GPU se necessario.
if isinstance(gx, cv2.UMat):
gx = gx.get(); gy = gy.get(); mag = mag.get()
ang = np.arctan2(gy, gx) # [-π, π]
if self.use_polarity:
# Mod 2π: bin 0..15 codifica direzione + polarity edge.
ang_full = np.where(ang < 0, ang + 2.0 * np.pi, ang)
bins = np.floor(ang_full / (2.0 * np.pi) * N_BINS_POL).astype(np.int16)
bins = np.clip(bins, 0, N_BINS_POL - 1)
else:
ang_mod = np.where(ang < 0, ang + np.pi, ang)
bins = np.floor(ang_mod / np.pi * N_BINS).astype(np.int16)
bins = np.clip(bins, 0, N_BINS - 1)
return mag, bins
def _hysteresis_mask(self, mag: np.ndarray) -> np.ndarray:
"""Edge mask con hysteresis (Halcon Contrast='auto' two-threshold).
Procedura:
1. seed = pixel con mag >= strong_grad (edge nitidi)
2. weak = pixel con mag >= weak_grad (edge candidati)
3. Espande seed dentro weak via componenti connesse 8-vicini
Risultato: edge debole connesso a edge forte viene PROMOSSO a
feature valida; edge debole isolato (rumore) viene SCARTATO.
Riduce sia falsi-positivi (rumore puro) sia falsi-negativi
(continuita' interrotta su edge sottili a basso contrasto).
"""
weak = (mag >= self.weak_grad).astype(np.uint8)
strong = (mag >= self.strong_grad).astype(np.uint8)
# connectedComponentsWithStats su weak: per ogni componente,
# se contiene almeno un pixel strong → tutto componente accettato
n_lab, labels = cv2.connectedComponents(weak, connectivity=8)
if n_lab <= 1:
return strong.astype(bool)
# Label dei pixel strong: marker per componenti da accettare
strong_labels = np.unique(labels[strong > 0])
strong_labels = strong_labels[strong_labels > 0] # 0 = bg
if len(strong_labels) == 0:
return strong.astype(bool)
# Mask = appartiene a label di componente "promosso"
keep = np.isin(labels, strong_labels)
return keep
def _extract_features(
self, mag: np.ndarray, bins: np.ndarray, mask: np.ndarray | None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
if mask is not None:
mag = np.where(mask > 0, mag, 0)
# Halcon-style edge selection: hysteresis tra weak_grad e strong_grad.
# Edge weak connessi a edge strong sono inclusi (continuita' bordi).
# Se weak_grad >= strong_grad → fallback a soglia singola strong.
if self.weak_grad < self.strong_grad:
edge = self._hysteresis_mask(mag)
else:
edge = mag >= self.strong_grad
ys, xs = np.where(edge)
if len(xs) == 0:
return (np.zeros(0, np.int32),) * 3
vals = mag[ys, xs]
order = np.argsort(-vals)
spc = max(1, self.min_feature_spacing)
occupied = np.zeros(mag.shape, dtype=bool)
picked_x: list[int] = []
picked_y: list[int] = []
picked_b: list[int] = []
for idx in order:
y, x = int(ys[idx]), int(xs[idx])
if occupied[y, x]:
continue
picked_x.append(x); picked_y.append(y)
picked_b.append(int(bins[y, x]))
y0 = max(0, y - spc); y1 = min(mag.shape[0], y + spc + 1)
x0 = max(0, x - spc); x1 = min(mag.shape[1], x + spc + 1)
occupied[y0:y1, x0:x1] = True
if len(picked_x) >= self.num_features:
break
return (np.array(picked_x, np.int32),
np.array(picked_y, np.int32),
np.array(picked_b, np.int8))
# --- Save / Load (Halcon-style write_shape_model / read_shape_model)
def save_model(self, path: str) -> None:
"""Salva matcher addestrato su disco (formato .npz).
Persiste: parametri, template_gray, mask, e tutte le varianti
pre-computate (con piramide). Halcon-equivalent write_shape_model.
Caso d'uso: training offline su workstation, deploy su macchina
di linea senza re-train (zero secondi di startup matching).
"""
if not self.variants:
raise RuntimeError("Modello non addestrato: chiamare train() prima.")
# Flatten varianti in array piatti (npz non ama dataclass nested)
n_vars = len(self.variants)
n_levels = len(self.variants[0].levels)
var_meta = np.zeros((n_vars, 6), dtype=np.float32) # ang, scale, kh, kw, cxl, cyl
all_dx, all_dy, all_bin, all_offsets = [], [], [], []
offset = 0
all_offsets_per_level = [[] for _ in range(n_levels)]
all_dx_per_level = [[] for _ in range(n_levels)]
all_dy_per_level = [[] for _ in range(n_levels)]
all_bin_per_level = [[] for _ in range(n_levels)]
for vi, var in enumerate(self.variants):
var_meta[vi] = (
var.angle_deg, var.scale, var.kh, var.kw,
var.cx_local, var.cy_local,
)
for li, lvl in enumerate(var.levels):
all_offsets_per_level[li].append(len(all_dx_per_level[li]))
all_dx_per_level[li].extend(lvl.dx.tolist())
all_dy_per_level[li].extend(lvl.dy.tolist())
all_bin_per_level[li].extend(lvl.bin.tolist())
for li in range(n_levels):
all_offsets_per_level[li].append(len(all_dx_per_level[li]))
out = {
"_format_version": np.array([1], dtype=np.int32),
"params": np.array([
self.num_features, self.weak_grad, self.strong_grad,
self.angle_range_deg[0], self.angle_range_deg[1],
self.angle_step_deg,
self.scale_range[0], self.scale_range[1], self.scale_step,
self.spread_radius, self.min_feature_spacing,
self.pyramid_levels, self.top_score_factor,
int(self.use_polarity),
], dtype=np.float64),
"template_gray": self.template_gray,
"train_mask": self._train_mask,
"var_meta": var_meta,
"n_levels": np.array([n_levels], dtype=np.int32),
}
for li in range(n_levels):
out[f"dx_l{li}"] = np.asarray(all_dx_per_level[li], dtype=np.int32)
out[f"dy_l{li}"] = np.asarray(all_dy_per_level[li], dtype=np.int32)
out[f"bin_l{li}"] = np.asarray(all_bin_per_level[li], dtype=np.int8)
out[f"offsets_l{li}"] = np.asarray(all_offsets_per_level[li], dtype=np.int32)
np.savez_compressed(path, **out)
@classmethod
def load_model(cls, path: str) -> "LineShapeMatcher":
"""Carica matcher pre-addestrato da .npz salvato con save_model.
Halcon-equivalent read_shape_model. Bypassa completamente train():
deploy production = istantaneo.
"""
data = np.load(path, allow_pickle=False)
params = data["params"]
m = cls(
num_features=int(params[0]),
weak_grad=float(params[1]),
strong_grad=float(params[2]),
angle_range_deg=(float(params[3]), float(params[4])),
angle_step_deg=float(params[5]),
scale_range=(float(params[6]), float(params[7])),
scale_step=float(params[8]),
spread_radius=int(params[9]),
min_feature_spacing=int(params[10]),
pyramid_levels=int(params[11]),
top_score_factor=float(params[12]),
use_polarity=bool(int(params[13])),
)
tpl = data["template_gray"]
if tpl.ndim > 0 and tpl.size > 0:
m.template_gray = tpl
m.template_size = (tpl.shape[1], tpl.shape[0])
mk = data["train_mask"]
m._train_mask = mk if mk.size > 0 else None
var_meta = data["var_meta"]
n_levels = int(data["n_levels"][0])
offsets_l = [data[f"offsets_l{li}"] for li in range(n_levels)]
dx_l = [data[f"dx_l{li}"] for li in range(n_levels)]
dy_l = [data[f"dy_l{li}"] for li in range(n_levels)]
bin_l = [data[f"bin_l{li}"] for li in range(n_levels)]
m.variants = []
n_vars = var_meta.shape[0]
for vi in range(n_vars):
ang, scale, kh, kw, cxl, cyl = var_meta[vi]
levels = []
for li in range(n_levels):
i0 = int(offsets_l[li][vi])
i1 = int(offsets_l[li][vi + 1])
levels.append(_LevelFeatures(
dx=dx_l[li][i0:i1].copy(),
dy=dy_l[li][i0:i1].copy(),
bin=bin_l[li][i0:i1].copy(),
n=i1 - i0,
))
m.variants.append(_Variant(
angle_deg=float(ang), scale=float(scale),
levels=levels, kh=int(kh), kw=int(kw),
cx_local=float(cxl), cy_local=float(cyl),
))
return m
def set_angle_range_around(
self, center_deg: float, tolerance_deg: float,
) -> None:
"""Restringe angle_range a (center - tol, center + tol).
Comodo helper per scenari in cui l'orientamento del pezzo e'
noto a priori entro ±tolerance_deg (es. feeder vibrante con
guida meccanica). Riduce drasticamente le varianti generate
in train(): es. ±15° vs 360° = 24x meno varianti, training
e matching molto piu veloci.
Esempio:
m.set_angle_range_around(0, 20) # cerca solo in [-20, +20]
m.train(template)
"""
self.angle_range_deg = (
float(center_deg - tolerance_deg),
float(center_deg + tolerance_deg),
)
def _scale_list(self) -> list[float]:
s0, s1 = self.scale_range
if s0 >= s1 or self.scale_step <= 0:
return [float(s0)]
n = int(np.floor((s1 - s0) / self.scale_step)) + 1
return [float(s0 + i * self.scale_step) for i in range(n)]
def _auto_angle_step(self) -> float:
"""Step angolare derivato da dimensione template (Halcon-style).
Formula: step ≈ atan(2 / max_side) gradi. Garantisce che la
rotazione minima produca uno spostamento di ≥2 px sul perimetro
del template (sotto sample il matching coarse perde candidati).
Clampato in [0.5°, 10°].
"""
max_side = max(self.template_size) if self.template_size != (0, 0) else 64
step = math.degrees(math.atan2(2.0, float(max_side)))
return float(np.clip(step, 0.5, 10.0))
def _effective_angle_step(self) -> float:
"""Risolve angle_step_deg gestendo modalità auto (<=0)."""
if self.angle_step_deg <= 0:
return self._auto_angle_step()
return self.angle_step_deg
def _angle_list(self) -> list[float]:
a0, a1 = self.angle_range_deg
step = self._effective_angle_step()
if step <= 0 or a0 >= a1:
return [float(a0)]
n = int(np.floor((a1 - a0) / step))
return [float(a0 + i * step) for i in range(n)]
# --- Training ------------------------------------------------------
def _build_pyramid_features(
self, dx: np.ndarray, dy: np.ndarray, bin_: np.ndarray,
) -> list[_LevelFeatures]:
"""Piramide feature precomputata: livello l = /2^l con dedup."""
levels = [_LevelFeatures(dx=dx.copy(), dy=dy.copy(), bin=bin_.copy(),
n=len(dx))]
for lvl in range(1, self.pyramid_levels):
sf = 2 ** lvl
dx_l = (dx // sf).astype(np.int32)
dy_l = (dy // sf).astype(np.int32)
# Dedup: rimuove feature collassate sullo stesso (dx, dy, bin)
key = ((dx_l.astype(np.int64) << 24)
| (dy_l.astype(np.int64) << 8)
| bin_.astype(np.int64))
_, uniq = np.unique(key, return_index=True)
levels.append(_LevelFeatures(
dx=dx_l[uniq], dy=dy_l[uniq], bin=bin_[uniq], n=len(uniq),
))
return levels
def train(self, template_bgr: np.ndarray, mask: np.ndarray | None = None) -> int:
"""Genera varianti rotate+scalate con feature sparse + piramide."""
gray = self._to_gray(template_bgr)
h, w = gray.shape
self.template_size = (w, h)
self.template_gray = gray.copy()
if mask is None:
mask_full = np.full((h, w), 255, dtype=np.uint8)
else:
mask_full = (mask > 0).astype(np.uint8) * 255
self._train_mask = mask_full.copy()
self.variants.clear()
# Reset view list: template principale = view 0
self._view_templates = [(gray.copy(), mask_full.copy())]
# Invalida cache feature di refine: il template e cambiato.
self._refine_feat_cache = {}
self._build_variants_for_view(gray, mask_full, view_idx=0)
self._dedup_variants()
return len(self.variants)
def add_template_view(
self, template_bgr: np.ndarray, mask: np.ndarray | None = None,
) -> int:
"""Aggiunge una view template extra all'ensemble (Halcon-style
create_aniso_shape_model con fusione N viste).
Genera varianti del nuovo template con stessi parametri (range
angle/scale) e le APPENDE a self.variants. NCC/recall usano
automaticamente il template della view che ha matchato.
Use case: pezzo che cambia aspetto (chiaro/scuro, prima/dopo
trattamento, illuminazioni diverse) → un solo matcher resistente.
Ritorna numero TOTALE varianti dopo l'aggiunta. Le view sono
indicizzate da 1 in poi (0 e' il template del train).
"""
if not self.variants:
raise RuntimeError(
"Chiamare train(template_principale) prima di add_template_view")
gray = self._to_gray(template_bgr)
h, w = gray.shape
if (w, h) != self.template_size:
# Resize per coerenza con bbox/poly
gray = cv2.resize(gray, self.template_size, interpolation=cv2.INTER_LINEAR)
if mask is not None:
mask = cv2.resize(mask, self.template_size, interpolation=cv2.INTER_NEAREST)
if mask is None:
mask_full = np.full(gray.shape, 255, dtype=np.uint8)
else:
mask_full = (mask > 0).astype(np.uint8) * 255
view_idx = len(self._view_templates)
self._view_templates.append((gray.copy(), mask_full.copy()))
n_before = len(self.variants)
self._build_variants_for_view(gray, mask_full, view_idx=view_idx)
self._dedup_variants()
return len(self.variants) - n_before
def _build_variants_for_view(
self, gray: np.ndarray, mask_full: np.ndarray, view_idx: int,
) -> None:
"""Estrae varianti rotate+scalate per UNA view template.
Estrazione algorithm identica al train() originale, separato per
riuso da add_template_view (multi-template ensemble).
"""
h, w = gray.shape
for s in self._scale_list():
sw = max(16, int(round(w * s)))
sh = max(16, int(round(h * s)))
gray_s = cv2.resize(gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST)
diag = int(np.ceil(np.hypot(sh, sw))) + 6
py = (diag - sh) // 2
px = (diag - sw) // 2
gray_p = cv2.copyMakeBorder(
gray_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_REPLICATE,
)
mask_p = cv2.copyMakeBorder(
mask_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_CONSTANT, value=0,
)
center = (diag / 2.0, diag / 2.0)
for ang in self._angle_list():
M = cv2.getRotationMatrix2D(center, ang, 1.0)
gray_r = cv2.warpAffine(
gray_p, M, (diag, diag),
flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE,
)
mask_r = cv2.warpAffine(
mask_p, M, (diag, diag),
flags=cv2.INTER_NEAREST, borderValue=0,
)
mag, bins = self._gradient(gray_r)
fx, fy, fb = self._extract_features(mag, bins, mask_r)
if len(fx) < 8:
continue
cx_c = diag / 2.0
cy_c = diag / 2.0
dx = (fx - cx_c).astype(np.int32)
dy = (fy - cy_c).astype(np.int32)
x0 = int(dx.min()); x1 = int(dx.max())
y0 = int(dy.min()); y1 = int(dy.max())
kw = x1 - x0 + 1
kh = y1 - y0 + 1
cx_local = -x0
cy_local = -y0
levels = self._build_pyramid_features(dx, dy, fb)
self.variants.append(_Variant(
angle_deg=float(ang),
scale=float(s),
levels=levels,
kh=kh, kw=kw,
cx_local=float(cx_local), cy_local=float(cy_local),
view_idx=view_idx,
))
def _dedup_variants(self) -> int:
"""Rimuove varianti con feature-set identico (post-quantizzazione).
Halcon-style: con angle range = (0, 360) e simmetrie del template,
molte rotazioni producono lo stesso set quantizzato di feature.
Es: quadrato a 0/90/180/270 deg → stesse features (modulo permutazione).
Hash su feature ordinate (livello 0, full-res) elimina i duplicati.
Vantaggio: meno varianti = meno chiamate kernel JIT al top-level
senza perdere copertura angolare effettiva. Per template asimmetrici
non rimuove nulla.
"""
seen: dict[bytes, int] = {}
kept: list[_Variant] = []
removed = 0
for var in self.variants:
lvl0 = var.levels[0]
order = np.lexsort((lvl0.bin, lvl0.dy, lvl0.dx))
key = (
lvl0.dx[order].tobytes()
+ b"|" + lvl0.dy[order].tobytes()
+ b"|" + lvl0.bin[order].tobytes()
+ b"|" + str(round(var.scale, 4)).encode()
)
h = key # diretto, senza hash crypto (collision ok solo se identici)
if h in seen:
removed += 1
continue
seen[h] = len(kept)
kept.append(var)
self.variants = kept
return removed
# --- Matching ------------------------------------------------------
def _response_map(self, gray: np.ndarray) -> np.ndarray:
"""Response map shape (N_BINS, H, W) float32 (legacy path)."""
mag, bins = self._gradient(gray)
valid = mag >= self.weak_grad
k = 2 * self.spread_radius + 1
kernel = np.ones((k, k), dtype=np.uint8)
H, W = gray.shape
raw = np.zeros((N_BINS, H, W), dtype=np.float32)
for b in range(N_BINS):
mask_b = ((bins == b) & valid).astype(np.uint8)
d = cv2.dilate(mask_b, kernel)
raw[b] = d.astype(np.float32)
return raw
def _spread_bitmap(self, gray: np.ndarray) -> np.ndarray:
"""Spread bitmap: bit b acceso dove bin b è presente nel raggio.
dtype: uint8 per N_BINS=8, uint16 per N_BINS_POL=16 (use_polarity).
Se use_gpu=True: Sobel + dilate via cv2.UMat (OpenCL kernel GPU).
"""
if self.use_gpu and not isinstance(gray, cv2.UMat):
gray_in = cv2.UMat(np.ascontiguousarray(gray))
else:
gray_in = gray
mag, bins = self._gradient(gray_in)
valid = mag >= self.weak_grad
k = 2 * self.spread_radius + 1
kernel = np.ones((k, k), dtype=np.uint8)
H, W = (gray.shape if isinstance(gray, np.ndarray)
else (gray.get().shape[0], gray.get().shape[1]))
nb = self._n_bins
dtype = np.uint16 if nb > 8 else np.uint8
spread = np.zeros((H, W), dtype=dtype)
for b in range(nb):
mask_b = ((bins == b) & valid).astype(np.uint8)
if self.use_gpu:
d = cv2.dilate(cv2.UMat(mask_b), kernel)
d_np = d.get()
else:
d_np = cv2.dilate(mask_b, kernel)
spread |= (d_np.astype(dtype) << b)
return spread
@staticmethod
def _score_by_shift(
resp: np.ndarray, dx: np.ndarray, dy: np.ndarray, bins: np.ndarray,
bin_has_data: np.ndarray | None = None,
) -> np.ndarray:
"""score[y,x] = Σ_i resp[bin_i][y+dy_i, x+dx_i] / len(dx).
Dispatch a kernel Numba JIT se disponibile, altrimenti fallback numpy.
"""
return _jit_score_by_shift(resp, dx, dy, bins, bin_has_data)
@staticmethod
def _subpixel_peak(
acc: np.ndarray, x: int, y: int, plateau_radius: int = 10,
) -> tuple[float, float]:
"""Posizione sub-pixel del picco.
1. Plateau saturo → centroide pesato del plateau (peso = score).
2. Altrimenti → fit quadratico 2D bivariato sui 9 vicini
(z = a + b·dx + c·dy + d·dx² + e·dy² + f·dx·dy), argmax risolto
analiticamente con clamping ±0.5 px.
"""
H, W = acc.shape
val = float(acc[y, x])
# Plateau detection: valori >= val - 0.01 entro raggio limitato
y0 = max(0, y - plateau_radius); y1 = min(H, y + plateau_radius + 1)
x0 = max(0, x - plateau_radius); x1 = min(W, x + plateau_radius + 1)
patch = acc[y0:y1, x0:x1]
plateau = patch >= val - 0.01
if plateau.sum() > 1:
# Centroide pesato per (score - (max-0.01))² per enfatizzare i top
weights = np.where(plateau, patch - (val - 0.01), 0.0).astype(np.float64)
weights = weights * weights
total = weights.sum()
if total > 1e-9:
ys_idx, xs_idx = np.indices(patch.shape)
cx_w = (xs_idx * weights).sum() / total
cy_w = (ys_idx * weights).sum() / total
return float(x0 + cx_w), float(y0 + cy_w)
ys_m, xs_m = np.where(plateau)
return float(x0 + xs_m.mean()), float(y0 + ys_m.mean())
# Fit quadratico 2D bivariato su 3x3 intorno
if x <= 0 or x >= W - 1 or y <= 0 or y >= H - 1:
return float(x), float(y)
# Stencil 3x3: Z[i, j] con i,j ∈ {-1, 0, +1}
Z = acc[y - 1:y + 2, x - 1:x + 2].astype(np.float64)
# Coefficienti da finite differences
b_c = (Z[1, 2] - Z[1, 0]) / 2.0
c_c = (Z[2, 1] - Z[0, 1]) / 2.0
d_c = (Z[1, 2] + Z[1, 0] - 2.0 * Z[1, 1]) / 2.0
e_c = (Z[2, 1] + Z[0, 1] - 2.0 * Z[1, 1]) / 2.0
f_c = (Z[2, 2] - Z[0, 2] - Z[2, 0] + Z[0, 0]) / 4.0
# Max: risolve [2d f; f 2e][dx;dy] = [-b;-c]
det = 4.0 * d_c * e_c - f_c * f_c
if abs(det) > 1e-9:
ox = (-2.0 * e_c * b_c + f_c * c_c) / det
oy = (-2.0 * d_c * c_c + f_c * b_c) / det
else:
# Fallback separabile
ox = -b_c / (2.0 * d_c) if abs(d_c) > 1e-6 else 0.0
oy = -c_c / (2.0 * e_c) if abs(e_c) > 1e-6 else 0.0
ox = float(np.clip(ox, -0.5, 0.5))
oy = float(np.clip(oy, -0.5, 0.5))
return x + ox, y + oy
def _refine_pose_joint(
self,
spread0: np.ndarray,
template_gray: np.ndarray,
cx: float, cy: float,
angle_deg: float, scale: float,
mask_full: np.ndarray,
max_iter: int = 24,
tol: float = 1e-3,
) -> tuple[float, float, float, float]:
"""Refine congiunto (cx, cy, angle) via Nelder-Mead 3D.
Ottimizza simultaneamente posizione e angolo (vs golden search 1D
sull'angolo poi quadratico 2D su xy che alterna assi). Halcon-style:
un singolo iter LM stila il match a precisione sub-pixel + sub-step.
Ritorna (angle, score, cx, cy) dove score e quello calcolato sulla
scena spread (no template gray).
"""
h, w = template_gray.shape
sw = max(16, int(round(w * scale)))
sh = max(16, int(round(h * scale)))
gray_s = cv2.resize(template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST)
diag = int(np.ceil(np.hypot(sh, sw))) + 6
py = (diag - sh) // 2; px = (diag - sw) // 2
gray_p = cv2.copyMakeBorder(gray_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_REPLICATE)
mask_p = cv2.copyMakeBorder(mask_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_CONSTANT, value=0)
center = (diag / 2.0, diag / 2.0)
H, W = spread0.shape
def _score(params: tuple[float, float, float]) -> float:
ddx, ddy, dang = params
ang = angle_deg + dang
M = cv2.getRotationMatrix2D(center, ang, 1.0)
gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_REPLICATE)
mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
flags=cv2.INTER_NEAREST, borderValue=0)
mag, bins = self._gradient(gray_r)
fx, fy, fb = self._extract_features(mag, bins, mask_r)
if len(fx) < 8:
return 0.0
cxe = cx + ddx; cye = cy + ddy
ix = int(round(cxe)); iy = int(round(cye))
tot = 0
valid = 0
for i in range(len(fx)):
xs = ix + int(fx[i] - center[0])
ys = iy + int(fy[i] - center[1])
if 0 <= xs < W and 0 <= ys < H:
bit = np.uint8(1 << int(fb[i]))
if spread0[ys, xs] & bit:
tot += 1
valid += 1
return -float(tot) / max(1, valid) # minimize -score
# Nelder-Mead 3D inline (no scipy). Simplex iniziale: vertice + offset
# dx=±0.5px, dy=±0.5px, dθ=±step/2.
step_a = self.angle_step_deg / 2.0 if self.angle_step_deg > 0 else 1.0
x0 = np.array([0.0, 0.0, 0.0])
simplex = np.array([
x0,
x0 + [0.5, 0.0, 0.0],
x0 + [0.0, 0.5, 0.0],
x0 + [0.0, 0.0, step_a],
])
fvals = np.array([_score(tuple(s)) for s in simplex])
for _ in range(max_iter):
order = np.argsort(fvals)
simplex = simplex[order]; fvals = fvals[order]
if abs(fvals[-1] - fvals[0]) < tol:
break
centroid = simplex[:-1].mean(axis=0)
xr = centroid + 1.0 * (centroid - simplex[-1])
fr = _score(tuple(xr))
if fvals[0] <= fr < fvals[-2]:
simplex[-1] = xr; fvals[-1] = fr
continue
if fr < fvals[0]:
xe = centroid + 2.0 * (centroid - simplex[-1])
fe = _score(tuple(xe))
if fe < fr:
simplex[-1] = xe; fvals[-1] = fe
else:
simplex[-1] = xr; fvals[-1] = fr
continue
xc = centroid + 0.5 * (simplex[-1] - centroid)
fc = _score(tuple(xc))
if fc < fvals[-1]:
simplex[-1] = xc; fvals[-1] = fc
continue
for k in range(1, 4):
simplex[k] = simplex[0] + 0.5 * (simplex[k] - simplex[0])
fvals[k] = _score(tuple(simplex[k]))
best_i = int(np.argmin(fvals))
ddx, ddy, dang = simplex[best_i]
return (angle_deg + float(dang), -float(fvals[best_i]),
cx + float(ddx), cy + float(ddy))
def _refine_angle(
self,
spread0: np.ndarray, # bitmap uint8 (H, W)
bit_active: int,
template_gray: np.ndarray,
cx: float, cy: float,
angle_deg: float, scale: float,
mask_full: np.ndarray,
angle_fine_step: float = 0.5,
search_radius: float | None = None,
original_score: float | None = None,
) -> tuple[float, float, float, float]:
"""Ricerca angolare fine (sub-step) attorno al match grezzo.
Genera 5 template temporanei a angle ± {0.5, 1.0} * step e sceglie
l'angolo con score massimo (parabolic fit sulle 3 score centrali).
Ritorna (angle_refined, score, cx_refined, cy_refined).
"""
# NB: rimosso early-skip su score >= 0.99. Lo score linemod/shape
# satura facilmente a 1.0 (specie con pyramid_propagate o spread
# ampio) ma NON garantisce angolo preciso: l'angolo grezzo della
# variante e' quantizzato a multipli di angle_step (5 deg default).
# Refine angolare e' essenziale per orientamento sub-step.
if search_radius is None:
search_radius = self._effective_angle_step() / 2.0
h, w = template_gray.shape
sw = max(16, int(round(w * scale)))
sh = max(16, int(round(h * scale)))
gray_s = cv2.resize(template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST)
diag = int(np.ceil(np.hypot(sh, sw))) + 6
py = (diag - sh) // 2; px = (diag - sw) // 2
gray_p = cv2.copyMakeBorder(gray_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_REPLICATE)
mask_p = cv2.copyMakeBorder(mask_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_CONSTANT, value=0)
center = (diag / 2.0, diag / 2.0)
H, W = spread0.shape
margin = 3
# Cache template features per angolo (chiave: int(round(ang*20)) =
# bucket di 0.05°). Golden-search ricontratto puo richiedere lo
# stesso bucket piu volte; evita re-warp+gradient+extract (costoso).
# Cache a livello matcher per riusare tra chiamate find() su scene
# diverse: la rotazione del template non dipende dalla scena.
if not hasattr(self, '_refine_feat_cache'):
self._refine_feat_cache = {}
feat_cache = self._refine_feat_cache
cache_scale_key = round(scale * 1000)
def _score_at_angle(off: float) -> tuple[float, float, float]:
"""Ritorna (score, best_cx, best_cy) per angolo = angle_deg + off."""
ang = angle_deg + off
ck = (round(ang * 20), cache_scale_key)
cached = feat_cache.get(ck)
if cached is not None:
fx, fy, fb = cached
else:
M = cv2.getRotationMatrix2D(center, ang, 1.0)
gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_REPLICATE)
mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
flags=cv2.INTER_NEAREST, borderValue=0)
mag, bins = self._gradient(gray_r)
fx, fy, fb = self._extract_features(mag, bins, mask_r)
# LRU semplice: limita cache a ~256 angoli (8 angoli * 32 candidati)
if len(feat_cache) > 256:
feat_cache.pop(next(iter(feat_cache)))
feat_cache[ck] = (fx, fy, fb)
if len(fx) < 8:
return (0.0, cx, cy)
dx = (fx - center[0]).astype(np.int32)
dy = (fy - center[1]).astype(np.int32)
y_lo = int(cy) - margin; y_hi = int(cy) + margin + 1
x_lo = int(cx) - margin; x_hi = int(cx) + margin + 1
sh_w = y_hi - y_lo; sw_w = x_hi - x_lo
acc = np.zeros((sh_w, sw_w), dtype=np.float32)
spread_dtype = spread0.dtype.type
for i in range(len(dx)):
ddx = int(dx[i]); ddy = int(dy[i]); b = int(fb[i])
bit = spread_dtype(1 << b)
sy0 = y_lo + ddy; sy1 = y_hi + ddy
sx0 = x_lo + ddx; sx1 = x_hi + ddx
a_y0 = max(0, -sy0); a_y1 = sh_w - max(0, sy1 - H)
a_x0 = max(0, -sx0); a_x1 = sw_w - max(0, sx1 - W)
s_y0 = max(0, sy0); s_y1 = min(H, sy1)
s_x0 = max(0, sx0); s_x1 = min(W, sx1)
if s_y1 > s_y0 and s_x1 > s_x0:
region = spread0[s_y0:s_y1, s_x0:s_x1]
acc[a_y0:a_y1, a_x0:a_x1] += (
(region & bit) != 0
).astype(np.float32)
acc /= len(dx)
_, max_val, _, max_loc = cv2.minMaxLoc(acc)
return (float(max_val),
float(x_lo + max_loc[0]), float(y_lo + max_loc[1]))
# Golden-section search su [-search_radius, +search_radius]:
# converge in log tempo a precisione ~0.1°, ~8 valutazioni vs 5
# ma centrate su picco reale (non sample equispaziati).
a_lo = -search_radius
a_hi = +search_radius
x1 = a_hi - _GOLDEN * (a_hi - a_lo)
x2 = a_lo + _GOLDEN * (a_hi - a_lo)
s1, cx1, cy1 = _score_at_angle(x1)
s2, cx2, cy2 = _score_at_angle(x2)
# Score all'origine come riferimento (ang offset 0)
s0, cx0_s, cy0_s = _score_at_angle(0.0)
best = (angle_deg, s0, cx0_s, cy0_s)
tol = 0.1 # gradi
for _ in range(8):
if s1 > best[1]:
best = (angle_deg + x1, s1, cx1, cy1)
if s2 > best[1]:
best = (angle_deg + x2, s2, cx2, cy2)
if abs(a_hi - a_lo) < tol:
break
if s1 > s2:
a_hi = x2
x2 = x1; s2 = s1; cx2 = cx1; cy2 = cy1
x1 = a_hi - _GOLDEN * (a_hi - a_lo)
s1, cx1, cy1 = _score_at_angle(x1)
else:
a_lo = x1
x1 = x2; s1 = s2; cx1 = cx2; cy1 = cy2
x2 = a_lo + _GOLDEN * (a_hi - a_lo)
s2, cx2, cy2 = _score_at_angle(x2)
return best
def _get_view_template(
self, view_idx: int,
) -> tuple[np.ndarray | None, np.ndarray | None]:
"""Ritorna (template_gray, mask) per la view specificata.
view_idx 0 = template principale (train), 1+ = view extra
aggiunte via add_template_view. Usato per scegliere il template
corretto in NCC/recall verification quando il matcher e'
ensemble multi-template.
"""
if 0 <= view_idx < len(self._view_templates):
return self._view_templates[view_idx]
return self.template_gray, self._train_mask
def _compute_recall(
self, spread0: np.ndarray, variant: _Variant,
cx: float, cy: float, angle_deg: float,
) -> float:
"""Frazione di feature template che combaciano nello spread scena
alla pose. Halcon-equivalent: MinScore originale.
"""
if self.template_gray is None:
return 1.0
h, w = self.template_gray.shape
scale = variant.scale
sw = max(16, int(round(w * scale)))
sh = max(16, int(round(h * scale)))
gray_s = cv2.resize(self.template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
mask_src = (
self._train_mask if self._train_mask is not None
else np.full_like(self.template_gray, 255)
)
mask_s = cv2.resize(mask_src, (sw, sh), interpolation=cv2.INTER_NEAREST)
diag = int(np.ceil(np.hypot(sh, sw))) + 6
py = (diag - sh) // 2; px = (diag - sw) // 2
gray_p = cv2.copyMakeBorder(
gray_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_REPLICATE,
)
mask_p = cv2.copyMakeBorder(
mask_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_CONSTANT, value=0,
)
center = (diag / 2.0, diag / 2.0)
M = cv2.getRotationMatrix2D(center, angle_deg, 1.0)
gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_REPLICATE)
mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
flags=cv2.INTER_NEAREST, borderValue=0)
mag, bins = self._gradient(gray_r)
fx, fy, fb = self._extract_features(mag, bins, mask_r)
n_feat = len(fx)
if n_feat < 4:
return 0.0
H, W = spread0.shape
spread_dtype = spread0.dtype.type
ix = int(round(cx)); iy = int(round(cy))
hits = 0
for i in range(n_feat):
xs = ix + int(fx[i] - center[0])
ys = iy + int(fy[i] - center[1])
if 0 <= xs < W and 0 <= ys < H:
bit = spread_dtype(1 << int(fb[i]))
if spread0[ys, xs] & bit:
hits += 1
return hits / n_feat
def _compute_soft_score(
self, scene_gray: np.ndarray, variant: _Variant,
cx: float, cy: float, angle_deg: float,
) -> float:
"""Soft-margin gradient similarity (Halcon Metric='use_polarity')."""
if self.template_gray is None:
return 0.0
h, w = self.template_gray.shape
scale = variant.scale
sw = max(16, int(round(w * scale)))
sh = max(16, int(round(h * scale)))
gray_s = cv2.resize(self.template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
mask_src = (
self._train_mask if self._train_mask is not None
else np.full_like(self.template_gray, 255)
)
mask_s = cv2.resize(mask_src, (sw, sh), interpolation=cv2.INTER_NEAREST)
diag = int(np.ceil(np.hypot(sh, sw))) + 6
py = (diag - sh) // 2; px = (diag - sw) // 2
gray_p = cv2.copyMakeBorder(
gray_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_REPLICATE,
)
mask_p = cv2.copyMakeBorder(
mask_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_CONSTANT, value=0,
)
center = (diag / 2.0, diag / 2.0)
M = cv2.getRotationMatrix2D(center, angle_deg, 1.0)
gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_REPLICATE)
mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
flags=cv2.INTER_NEAREST, borderValue=0)
gx_t = cv2.Sobel(gray_r, cv2.CV_32F, 1, 0, ksize=3)
gy_t = cv2.Sobel(gray_r, cv2.CV_32F, 0, 1, ksize=3)
mag_t = cv2.magnitude(gx_t, gy_t)
_, bins_t = self._gradient(gray_r)
fx, fy, _ = self._extract_features(mag_t, bins_t, mask_r)
if len(fx) < 4:
return 0.0
gx_s = cv2.Sobel(scene_gray, cv2.CV_32F, 1, 0, ksize=3)
gy_s = cv2.Sobel(scene_gray, cv2.CV_32F, 0, 1, ksize=3)
H, W = scene_gray.shape
ix = int(round(cx)); iy = int(round(cy))
sims = []; weights = []
for i in range(len(fx)):
xs = ix + int(fx[i] - center[0])
ys = iy + int(fy[i] - center[1])
if not (0 <= xs < W and 0 <= ys < H):
continue
tx = float(gx_t[int(fy[i]), int(fx[i])])
ty = float(gy_t[int(fy[i]), int(fx[i])])
sx = float(gx_s[ys, xs]); sy = float(gy_s[ys, xs])
tm = math.hypot(tx, ty); sm = math.hypot(sx, sy)
if tm < 1e-3 or sm < 1e-3:
continue
cos_sim = (tx * sx + ty * sy) / (tm * sm)
cos_sim = max(0.0, cos_sim) if self.use_polarity else abs(cos_sim)
sims.append(cos_sim); weights.append(min(sm, 255.0))
if not sims:
return 0.0
sims_arr = np.asarray(sims, dtype=np.float32)
w_arr = np.asarray(weights, dtype=np.float32)
return float((sims_arr * w_arr).sum() / (w_arr.sum() + 1e-9))
def _subpixel_refine_lm(
self, scene_gray: np.ndarray, variant: _Variant,
cx: float, cy: float, angle_deg: float,
n_iters: int = 2,
) -> tuple[float, float]:
"""Sub-pixel refinement iterativo via gradient-field least-squares.
Halcon-equivalent SubPixel='least_squares_high'. Precisione attesa
0.05 px (vs 0.5 px del fit quadratic 2D).
"""
if self.template_gray is None:
return cx, cy
h, w = self.template_gray.shape
scale = variant.scale
sw = max(16, int(round(w * scale)))
sh = max(16, int(round(h * scale)))
gray_s = cv2.resize(self.template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
mask_src = (
self._train_mask if self._train_mask is not None
else np.full_like(self.template_gray, 255)
)
mask_s = cv2.resize(mask_src, (sw, sh), interpolation=cv2.INTER_NEAREST)
diag = int(np.ceil(np.hypot(sh, sw))) + 6
py = (diag - sh) // 2; px = (diag - sw) // 2
gray_p = cv2.copyMakeBorder(
gray_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_REPLICATE,
)
mask_p = cv2.copyMakeBorder(
mask_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_CONSTANT, value=0,
)
center = (diag / 2.0, diag / 2.0)
M = cv2.getRotationMatrix2D(center, angle_deg, 1.0)
gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_REPLICATE)
mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
flags=cv2.INTER_NEAREST, borderValue=0)
gx_t = cv2.Sobel(gray_r, cv2.CV_32F, 1, 0, ksize=3)
gy_t = cv2.Sobel(gray_r, cv2.CV_32F, 0, 1, ksize=3)
mag_t = cv2.magnitude(gx_t, gy_t)
_, bins_t = self._gradient(gray_r)
fx, fy, _ = self._extract_features(mag_t, bins_t, mask_r)
if len(fx) < 4:
return cx, cy
n = len(fx)
ddx_t = (fx - center[0]).astype(np.float32)
ddy_t = (fy - center[1]).astype(np.float32)
gx_tf = np.array([gx_t[int(fy[i]), int(fx[i])] for i in range(n)], dtype=np.float32)
gy_tf = np.array([gy_t[int(fy[i]), int(fx[i])] for i in range(n)], dtype=np.float32)
mag_tf = np.hypot(gx_tf, gy_tf) + 1e-6
nx_t = gx_tf / mag_tf
ny_t = gy_tf / mag_tf
gx_s = cv2.Sobel(scene_gray, cv2.CV_32F, 1, 0, ksize=3)
gy_s = cv2.Sobel(scene_gray, cv2.CV_32F, 0, 1, ksize=3)
H, W = scene_gray.shape
cur_cx, cur_cy = float(cx), float(cy)
for _ in range(n_iters):
xs = cur_cx + ddx_t
ys = cur_cy + ddy_t
xs_c = np.clip(xs, 0, W - 1.001)
ys_c = np.clip(ys, 0, H - 1.001)
x0 = xs_c.astype(np.int32); y0 = ys_c.astype(np.int32)
ax = xs_c - x0; ay = ys_c - y0
def _bilin(g):
v00 = g[y0, x0]; v10 = g[y0, x0 + 1]
v01 = g[y0 + 1, x0]; v11 = g[y0 + 1, x0 + 1]
return ((1 - ax) * (1 - ay) * v00
+ ax * (1 - ay) * v10
+ (1 - ax) * ay * v01
+ ax * ay * v11)
sx_v = _bilin(gx_s)
sy_v = _bilin(gy_s)
mag_s = np.hypot(sx_v, sy_v) + 1e-6
nx_s = sx_v / mag_s
ny_s = sy_v / mag_s
w = np.minimum(mag_s, 255.0).astype(np.float32)
err_x = (nx_s - nx_t) * w
err_y = (ny_s - ny_t) * w
step_x = -float(err_x.sum()) / (w.sum() + 1e-6)
step_y = -float(err_y.sum()) / (w.sum() + 1e-6)
step_x = max(-1.0, min(1.0, step_x))
step_y = max(-1.0, min(1.0, step_y))
cur_cx += step_x
cur_cy += step_y
if abs(step_x) < 0.02 and abs(step_y) < 0.02:
break
return cur_cx, cur_cy
def _verify_ncc(
self, scene_gray: np.ndarray, cx: float, cy: float,
angle_deg: float, scale: float, view_idx: int = 0,
) -> float:
"""NCC tra template warpato alla pose e scena sottostante.
Lavora su un **crop locale** della scena di lato = diagonale del
template ruotato+scalato, non sull'intera scena. Su scene grandi
(1920×1080) taglia drasticamente il costo del warp per ogni match.
Ritorna score [-1, 1]. Usato come filtro anti-falso-positivo:
il matcher linemod può dare score alto su texture generiche ma
sovrapponendo il template gray i pixel non corrispondono.
"""
t, train_mask = self._get_view_template(view_idx)
if t is None:
return 1.0
h, w = t.shape
cx_t = (w - 1) / 2.0
cy_t = (h - 1) / 2.0
# Bounding box del template ruotato/scalato attorno a (cx, cy)
diag = int(np.ceil(np.hypot(w, h) * scale)) + 8
H, W = scene_gray.shape
x0 = int(round(cx)) - diag // 2
y0 = int(round(cy)) - diag // 2
cx0 = max(0, x0); cy0 = max(0, y0)
cx1 = min(W, x0 + diag); cy1 = min(H, y0 + diag)
if cx1 - cx0 < 10 or cy1 - cy0 < 10:
return 0.0
scn_crop = scene_gray[cy0:cy1, cx0:cx1]
ch, cw = scn_crop.shape
M = cv2.getRotationMatrix2D((cx_t, cy_t), angle_deg, scale)
# Porta il centro del template a (cx - cx0, cy - cy0) del crop
M[0, 2] += (cx - cx0) - cx_t
M[1, 2] += (cy - cy0) - cy_t
warped = cv2.warpAffine(
t, M, (cw, ch),
flags=cv2.INTER_LINEAR, borderValue=0,
)
if train_mask is not None:
mask_src = train_mask
else:
mask_src = np.full_like(t, 255)
mask_w = cv2.warpAffine(
mask_src, M, (cw, ch),
flags=cv2.INTER_NEAREST, borderValue=0,
)
valid = mask_w > 0
if valid.sum() < 20:
return 0.0
tpl = warped[valid].astype(np.float32)
scn = scn_crop[valid].astype(np.float32)
tm = tpl - tpl.mean()
sm = scn - scn.mean()
# Std minimo: se template o scena patch sono quasi uniformi
# (es. zona di sfondo bianco/nero), NCC e instabile e da false
# high-correlation. Halcon-style: scarta match.
tpl_var = float((tm * tm).sum())
scn_var = float((sm * sm).sum())
n_pix = float(valid.sum())
if tpl_var < 1e-3 * n_pix or scn_var < 1e-3 * n_pix:
return 0.0
denom = np.sqrt(tpl_var * scn_var) + 1e-9
return float((tm * sm).sum() / denom)
def find(
self,
scene_bgr: np.ndarray,
min_score: float = 0.6,
max_matches: int = 20,
nms_radius: int | None = None,
refine_angle: bool = True,
subpixel: bool = True,
verify_ncc: bool = True,
verify_threshold: float = 0.4,
ncc_skip_above: float = 1.01, # disabilitato di default: NCC sempre
coarse_angle_factor: int = 2,
coarse_stride: int = 1,
scale_penalty: float = 0.0,
search_roi: tuple[int, int, int, int] | None = None,
pyramid_propagate: bool = False, # off di default: meno duplicati
propagate_topk: int = 4,
refine_pose_joint: bool = False,
greediness: float = 0.0,
batch_top: bool = False,
nms_iou_threshold: float = 0.3,
min_recall: float = 0.0,
use_soft_score: bool = False,
subpixel_lm: bool = False,
) -> list[Match]:
"""
scale_penalty: se > 0, riduce lo score per match a scala diversa da 1.0:
score_final = score_shape * max(0, 1 - scale_penalty * |scale - 1|)
Utile se l'operatore vuole che match "identico al template anche per
dimensione" abbia score più alto di match "stessa forma, dimensione
diversa". scale_penalty=0 (default) = comportamento shape puro.
search_roi: (x, y, w, h) limita la ricerca a una regione della scena.
Equivalente a Halcon set_aoi: il matching opera su crop locale e le
coordinate output sono ri-traslate al sistema scena originale. Usare
quando si conosce a priori l'area in cui il pezzo può apparire (es.
feeder a posizione fissa) → costo proporzionale a w·h invece di W·H.
"""
if not self.variants:
raise RuntimeError("Matcher non addestrato: chiamare train() prima.")
gray_full = self._to_gray(scene_bgr)
# Applica ROI di ricerca: restringe scena a crop, ricorda offset per
# ri-traslare le coordinate dei match a fine pipeline.
if search_roi is not None:
rx, ry, rw, rh = search_roi
H_s, W_s = gray_full.shape
rx = max(0, int(rx)); ry = max(0, int(ry))
rw = max(1, min(int(rw), W_s - rx))
rh = max(1, min(int(rh), H_s - ry))
gray0 = gray_full[ry:ry + rh, rx:rx + rw]
roi_offset = (rx, ry)
else:
gray0 = gray_full
roi_offset = (0, 0)
grays = [gray0]
for _ in range(self.pyramid_levels - 1):
grays.append(cv2.pyrDown(grays[-1]))
top = len(grays) - 1
# Spread bitmap (uint8) al top level: 32× meno memoria della response
# map float32 → MOLTO più cache-friendly per _score_by_shift.
spread_top = self._spread_bitmap(grays[top])
bit_active_top = int(
sum(1 << b for b in range(self._n_bins)
if (spread_top & (spread_top.dtype.type(1) << b)).any())
)
if nms_radius is None:
nms_radius = max(8, min(self.template_size) // 2)
# Pruning adattivo allo step angolare: con step piccolo (<= 3 deg)
# ci sono molte varianti vicine, gli score top-level sono ravvicinati
# e top_thresh*0.5 e' troppo aggressivo: scarta varianti valide che
# sarebbero state riprese al full-res. Stessa cosa per
# coarse_angle_factor (skip 1 ogni 2): con step fine non e' utile.
# Risultato osservato: precisione "veloce" 10° dava risultati
# migliori di "preciso" 2° proprio perche evitava il pruning.
eff_step = self._effective_angle_step()
top_factor = self.top_score_factor
cf_eff = max(1, coarse_angle_factor)
if eff_step <= 3.0:
top_factor = max(top_factor, 0.7)
cf_eff = 1
top_thresh = min_score * top_factor
tw, th = self.template_size
density_top = _jit_popcount(spread_top)
sf_top = 2 ** top
bg_cache_top: dict[float, np.ndarray] = {}
bg_cache_full: dict[float, np.ndarray] = {}
unique_scales = sorted({var.scale for var in self.variants})
def _bg_for_scale(density: np.ndarray, scale: float,
divisor: int) -> np.ndarray:
bw = max(9, int(round(tw * scale / divisor)))
bh = max(9, int(round(th * scale / divisor)))
sm = cv2.boxFilter(density, cv2.CV_32F, (bw, bh))
return np.clip(sm / N_BINS, 0.0, 0.99)
for sc in unique_scales:
bg_cache_top[sc] = _bg_for_scale(density_top, sc, sf_top)
def _rescore(score: np.ndarray, bg: np.ndarray) -> np.ndarray:
return np.maximum(0.0, (score - bg) / (1.0 - bg + 1e-6))
# Coarse-to-fine angolare:
# 1) Raggruppa varianti per scala, ordina per angolo
# 2) Top-level: valuta solo 1 ogni coarse_angle_factor varianti
# 3) Espandi ai vicini nel full-res
variants_by_scale: dict[float, list[int]] = {}
for vi, var in enumerate(self.variants):
variants_by_scale.setdefault(var.scale, []).append(vi)
coarse_idx_list: list[int] = [] # varianti da valutare al top
neighbor_map: dict[int, list[int]] = {} # vi_coarse -> indici vicini
cf = cf_eff
for scale_key, vi_list in variants_by_scale.items():
vi_sorted = sorted(vi_list, key=lambda i: self.variants[i].angle_deg)
n = len(vi_sorted)
for i in range(0, n, cf):
vi_c = vi_sorted[i]
coarse_idx_list.append(vi_c)
# Vicini: ±cf/2 attorno a i (stessa scala)
half = cf // 2
start = max(0, i - half)
end = min(n, i + half + 1)
neighbor_map[vi_c] = vi_sorted[start:end]
# Pruning varianti via top-level (parallelizzato).
# coarse_stride > 1: 1 pixel ogni stride (~stride^2 speed-up).
# pyramid_propagate=True: top-K picchi per restringere full-res.
# greediness > 0: kernel greedy con early-exit (alternativo a rescore).
cs = max(1, int(coarse_stride))
peaks_by_vi: dict[int, list[tuple[int, int, float]]] = {}
use_greedy_top = greediness > 0.0
def _top_score(vi: int) -> tuple[int, float]:
var = self.variants[vi]
lvl = var.levels[min(top, len(var.levels) - 1)]
if use_greedy_top:
# Greedy non supporta stride né rescore bg
score = _jit_score_bitmap_greedy(
spread_top, lvl.dx, lvl.dy, lvl.bin, bit_active_top,
top_thresh, greediness,
)
else:
score = _jit_score_bitmap_rescored(
spread_top, lvl.dx, lvl.dy, lvl.bin, bit_active_top,
bg_cache_top[var.scale], stride=cs,
)
if score.size == 0:
return vi, -1.0
best = float(score.max())
if pyramid_propagate and best > 0:
flat = score.ravel()
k = min(propagate_topk, flat.size)
idx = np.argpartition(-flat, k - 1)[:k]
peaks: list[tuple[int, int, float]] = []
for i in idx:
s = float(flat[i])
if s < top_thresh * 0.7:
continue
yt, xt = int(i // score.shape[1]), int(i % score.shape[1])
peaks.append((xt, yt, s))
peaks_by_vi[vi] = peaks
return vi, best
kept_coarse: list[tuple[int, float]] = []
all_top_scores: list[tuple[int, float]] = []
# batch_top: usa kernel batch single-call con prange-esterno su
# varianti. Vince su threadpool quando n_vars >> n_threads e quando
# H*W top e' piccolo (overhead chiamate JIT > costo kernel).
if (batch_top and HAS_NUMBA and len(coarse_idx_list) > 4):
dx_l = []; dy_l = []; bn_l = []; vs_l = []
for vi in coarse_idx_list:
var = self.variants[vi]
lvl = var.levels[min(top, len(var.levels) - 1)]
dx_l.append(lvl.dx); dy_l.append(lvl.dy); bn_l.append(lvl.bin)
vs_l.append(var.scale)
scores_arr = _jit_top_max_per_variant(
spread_top, dx_l, dy_l, bn_l, bg_cache_top, vs_l,
bit_active_top,
)
for vi, best in zip(coarse_idx_list, scores_arr.tolist()):
all_top_scores.append((vi, best))
if best >= top_thresh:
kept_coarse.append((vi, best))
elif self.n_threads > 1 and len(coarse_idx_list) > 1:
with ThreadPoolExecutor(max_workers=self.n_threads) as ex:
for vi, best in ex.map(_top_score, coarse_idx_list):
all_top_scores.append((vi, best))
if best >= top_thresh:
kept_coarse.append((vi, best))
else:
for vi in coarse_idx_list:
vi2, best = _top_score(vi)
all_top_scores.append((vi2, best))
if best >= top_thresh:
kept_coarse.append((vi2, best))
# Fallback adattivo: se il rescore background ha abbattuto tutti
# gli score sotto top_thresh (scene texturate pesanti), ripesca
# le varianti migliori al top level per dare comunque una chance
# alla fase full-res invece di ritornare 0 match.
if not kept_coarse and all_top_scores:
all_top_scores.sort(key=lambda t: -t[1])
n_keep = max(4, len(all_top_scores) // 10)
# Limita a varianti con score top > 0 (non completamente a zero)
kept_coarse = [(vi, s) for vi, s in all_top_scores[:n_keep] if s > 0]
# Espandi ogni coarse promosso con i suoi vicini (stessa scala,
# angoli intermedi non valutati al top)
expanded: set[int] = set()
score_by_vi: dict[int, float] = {}
for vi_c, s_top in kept_coarse:
for vi_n in neighbor_map.get(vi_c, [vi_c]):
expanded.add(vi_n)
# Usa lo score del coarse come stima per il sort successivo
score_by_vi[vi_n] = max(score_by_vi.get(vi_n, 0.0), s_top)
kept_variants: list[tuple[int, float]] = [
(vi, score_by_vi[vi]) for vi in expanded
]
if not kept_variants:
return []
# Cap adattivo: ordina per score top-level e mantieni le più promettenti.
# Min: max_matches*8 (margine per NMS cross-variant)
# Max: 50% delle varianti totali (protegge performance con molte scale)
kept_variants.sort(key=lambda t: -t[1])
max_vars_full = max(max_matches * 8, len(self.variants) // 2)
kept_variants = kept_variants[:max_vars_full]
# Full-res (parallelizzato) con bitmap
spread0 = self._spread_bitmap(gray0)
bit_active_full = int(
sum(1 << b for b in range(self._n_bins)
if (spread0 & (spread0.dtype.type(1) << b)).any())
)
density_full = _jit_popcount(spread0)
for sc in unique_scales:
bg_cache_full[sc] = _bg_for_scale(density_full, sc, 1)
# Margine in full-res attorno ad ogni peak top: copre incertezza
# downsampling (sf_top px) + spread_radius + slack per NMS.
propagate_margin = sf_top + self.spread_radius + max(8, nms_radius // 2)
H_full, W_full = spread0.shape
def _full_score(vi: int) -> tuple[int, np.ndarray]:
var = self.variants[vi]
lvl0 = var.levels[0]
if not pyramid_propagate or vi not in peaks_by_vi or not peaks_by_vi[vi]:
# Path legacy: scansiona intera scena
return vi, _jit_score_bitmap_rescored(
spread0, lvl0.dx, lvl0.dy, lvl0.bin, bit_active_full,
bg_cache_full[var.scale],
)
# Path piramide propagata: valuta solo crop locali attorno
# alle posizioni dei picchi top-level (riproiettati a full-res).
score_full = np.zeros((H_full, W_full), dtype=np.float32)
mark = np.zeros((H_full, W_full), dtype=bool)
bg = bg_cache_full[var.scale]
for xt, yt, _s in peaks_by_vi[vi]:
cx0 = xt * sf_top
cy0 = yt * sf_top
x_lo = max(0, cx0 - propagate_margin)
x_hi = min(W_full, cx0 + propagate_margin + 1)
y_lo = max(0, cy0 - propagate_margin)
y_hi = min(H_full, cy0 + propagate_margin + 1)
if x_hi <= x_lo or y_hi <= y_lo:
continue
if mark[y_lo:y_hi, x_lo:x_hi].all():
continue
# Crop spread + bg, valuta kernel sul crop
spread_crop = np.ascontiguousarray(spread0[y_lo:y_hi, x_lo:x_hi])
bg_crop = np.ascontiguousarray(bg[y_lo:y_hi, x_lo:x_hi])
score_crop = _jit_score_bitmap_rescored(
spread_crop, lvl0.dx, lvl0.dy, lvl0.bin,
bit_active_full, bg_crop,
)
score_full[y_lo:y_hi, x_lo:x_hi] = np.maximum(
score_full[y_lo:y_hi, x_lo:x_hi], score_crop,
)
mark[y_lo:y_hi, x_lo:x_hi] = True
return vi, score_full
candidates_per_var: list[tuple[int, np.ndarray]] = []
raw: list[tuple[float, int, int, int]] = []
var_indices = [vi for vi, _ in kept_variants]
if self.n_threads > 1 and len(var_indices) > 1:
with ThreadPoolExecutor(max_workers=self.n_threads) as ex:
results = list(ex.map(_full_score, var_indices))
else:
results = [_full_score(vi) for vi in var_indices]
def _scale_factor(s: float) -> float:
"""Penalità moltiplicativa per scala diversa da 1.0."""
if scale_penalty > 0.0 and s != 1.0:
return max(0.0, 1.0 - scale_penalty * abs(s - 1.0))
return 1.0
for vi, score in results:
pen = _scale_factor(self.variants[vi].scale)
# Ordinare/sogliare su score penalizzato: un match a scala 1.5 con
# score 0.8 e penalty=0.3 effettivamente vale 0.56, non 0.8.
score_for_sort = score if pen == 1.0 else score * pen
ys, xs = np.where(score_for_sort >= min_score)
if len(ys) == 0:
continue
vals = score_for_sort[ys, xs]
K = min(len(vals), max_matches * 5)
ord_idx = np.argpartition(-vals, K - 1)[:K]
candidates_per_var.append((vi, score)) # score_map originale
for i in ord_idx:
raw.append((float(vals[i]), int(xs[i]), int(ys[i]), vi))
raw.sort(key=lambda c: -c[0])
# Mappa vi → score_map per subpixel/refinement
score_maps = dict(candidates_per_var)
# NMS + subpixel + refinement angolare
# Usa mask salvata in train() per coerenza (se ROI poligonale).
h, w = self.template_gray.shape if self.template_gray is not None else (0, 0)
mask_full = (
self._train_mask if self._train_mask is not None
else np.full((h, w), 255, dtype=np.uint8)
)
# Plateau radius adattivo al template (evita plateau troppo ampi su
# template piccoli: 8% del lato minimo, clampato [3, 10]).
plateau_r = max(3, min(10, int(min(self.template_size) * 0.08)))
# Pre-NMS rapido su raw con coordinate intere (nms_radius ≥ 8,
# la precisione sub-pixel non cambia la decisione di reject).
# Subpixel viene calcolato DOPO il pre-NMS solo sui ~pre_cap
# preliminary sopravvissuti: prima era chiamato su ogni raw (~58k
# chiamate su clip_preciso) anche se la maggior parte veniva poi
# scartata dalla NMS, sprecando la parte più costosa del loop.
r2 = nms_radius * nms_radius
pre_cap = max(max_matches * 3, max_matches + 10)
preliminary_int: list[tuple[float, int, int, int]] = []
for score, xi, yi, vi in raw:
if any((k[1] - xi) ** 2 + (k[2] - yi) ** 2 < r2
for k in preliminary_int):
continue
preliminary_int.append((score, xi, yi, vi))
if len(preliminary_int) >= pre_cap:
break
# Subpixel + refine + verify solo sui candidati pre-NMS (max pre_cap)
kept: list[Match] = []
tw, th = self.template_size
for score, xi, yi, vi in preliminary_int:
if subpixel and vi in score_maps:
cx_f, cy_f = self._subpixel_peak(
score_maps[vi], xi, yi, plateau_radius=plateau_r,
)
else:
cx_f, cy_f = float(xi), float(yi)
var = self.variants[vi]
ang_f = var.angle_deg
score_f = score
if refine_pose_joint and self.template_gray is not None:
ang_f, score_f, cx_f, cy_f = self._refine_pose_joint(
spread0, self.template_gray, cx_f, cy_f,
var.angle_deg, var.scale, mask_full,
)
elif refine_angle and self.template_gray is not None:
ang_f, score_f, cx_f, cy_f = self._refine_angle(
spread0, bit_active_full, self.template_gray, cx_f, cy_f,
var.angle_deg, var.scale, mask_full,
search_radius=self._effective_angle_step() / 2.0,
original_score=score,
)
# Halcon SubPixel='least_squares_high': refinement iterativo
# gradient-field per precisione 0.05 px (vs 0.5 quadratic 2D).
if subpixel_lm and self.template_gray is not None:
cx_lm, cy_lm = self._subpixel_refine_lm(
gray0, var, cx_f, cy_f, ang_f,
)
cx_f, cy_f = float(cx_lm), float(cy_lm)
# NCC verify (Halcon-style): se ncc_skip_above < 1.0 salta
# il calcolo per shape-score gia alti. Default 1.01 = NCC sempre,
# piu sicuro contro falsi positivi (lo shape-score satura facile).
# Quando NCC viene calcolato, lo score finale e' la MEDIA tra
# shape-score e NCC: rende lo score piu discriminante per
# ranking/visualizzazione (uno score 1.0 vero richiede sia
# match shape sia template gray identici).
if verify_ncc and float(score_f) < ncc_skip_above:
ncc = self._verify_ncc(
gray0, cx_f, cy_f, ang_f, var.scale,
view_idx=getattr(var, "view_idx", 0),
)
if ncc < verify_threshold:
continue
score_f = (float(score_f) + max(0.0, ncc)) * 0.5
# Soft-margin gradient similarity: sostituisce o integra lo
# score con metric continua (cos sim gradients) invece di
# bin discreto. Halcon-style: piu robusto a piccole rotazioni.
if use_soft_score:
soft = self._compute_soft_score(
gray0, var, cx_f, cy_f, ang_f,
)
score_f = (float(score_f) + soft) * 0.5
# Re-check min_score sullo score finale: NCC averaging puo
# abbattere lo shape-score sotto la soglia user. Senza questo
# check apparivano match con score < min_score (UI confusing).
if float(score_f) < min_score:
continue
# Feature recall (Halcon MinScore-style): conta quante feature
# template effettivamente combaciano nello spread scena alla
# pose finale. Scarta se sotto min_recall (default 0 = off).
# Util contro match parziali ad alto NCC ma poche feature reali.
if min_recall > 0.0:
recall = self._compute_recall(
spread0, var, cx_f, cy_f, ang_f,
)
if recall < min_recall:
continue
# Ri-traslo coord da spazio crop ROI a spazio scena originale.
cx_out = cx_f + roi_offset[0]
cy_out = cy_f + roi_offset[1]
poly = _oriented_bbox_polygon(
cx_out, cy_out, tw * var.scale, th * var.scale, ang_f,
)
# Reject match con bbox che sfora pesantemente la scena:
# spesso indica match spurio (centro derivato male o scala
# incoerente). Tollera 25% out-of-bounds, sopra rigetta.
H_scn, W_scn = gray_full.shape
poly_area = float(cv2.contourArea(poly))
if poly_area > 0:
# Clip poly alla scena: intersezione con rettangolo (0,0,W,H)
scene_rect = np.array([
[0, 0], [W_scn, 0], [W_scn, H_scn], [0, H_scn],
], dtype=np.float32)
inter, _ = cv2.intersectConvexConvex(
poly.astype(np.float32), scene_rect,
)
inside_ratio = float(inter) / poly_area
if inside_ratio < 0.75:
continue
# Penalità scala opzionale: score degrada con distanza da 1.0
if scale_penalty > 0.0 and var.scale != 1.0:
score_f = float(score_f) * max(
0.0, 1.0 - scale_penalty * abs(var.scale - 1.0)
)
# NMS post-refine cross-variant: usa IoU bbox-poligonale invece
# di sola distanza centro. Due match orientati diversi ma vicini
# (pezzi adiacenti) NON vengono fusi se l'overlap reale e basso;
# due match dello stesso pezzo (centri uguali, rotazione simile)
# hanno IoU alto e vengono droppati.
# Fallback distanza centro per match con bbox degenere.
dup = False
for k in kept:
iou = _poly_iou(k.bbox_poly, poly)
if iou > nms_iou_threshold:
dup = True
break
# Sicurezza: centri molto vicini (dentro nms_radius/2)
# sempre fusi, anche con orientamenti molto diversi.
if (k.cx - cx_out) ** 2 + (k.cy - cy_out) ** 2 < (r2 / 4.0):
dup = True
break
if dup:
continue
kept.append(Match(
cx=cx_out, cy=cy_out,
angle_deg=ang_f,
scale=var.scale,
score=score_f,
bbox_poly=poly,
))
if len(kept) >= max_matches:
break
return kept