74a332a2dd
LRU cache per scena: hash su prime 64KB bytes + parametri matcher (weak/strong_grad, spread_radius, n_bins, pyramid_levels). Quando hit, riusa: - piramide grays - spread_top + bit_active_top + density_top - spread0 + bit_active_full + density_full Tipico use case: UI tuning con slider min_score/verify_threshold/... produce 10+ find() consecutive su scena identica. Risparmia Sobel+dilate+popcount duplicati (~50ms su 1080p). Speedup misurato: ~15% find() su 1080p (54ms su 351ms). Vantaggio maggiore su template piccoli (kernel JIT veloce → scena precompute domina). Cache size 4, invalidata in train() (template cambiato). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1832 lines
79 KiB
Python
1832 lines
79 KiB
Python
"""Shape-based matcher stile linemod (line2Dup) - Python puro + numpy/OpenCV.
|
||
|
||
Porting algoritmico dell'idea di `meiqua/shape_based_matching` (no MIPP/SIMD —
|
||
equivalente usando vettorizzazione numpy).
|
||
|
||
Training (costoso, fatto una volta per ricetta):
|
||
- Per ogni variante (angolo, scala) del template:
|
||
1. Sobel → magnitude + orientation
|
||
2. Quantizzazione orientation in N_BINS bin (modulo π, edge simmetrici)
|
||
3. Estrazione feature sparse top-magnitude con spacing minimo
|
||
4. Salvataggio feature = liste (dx, dy, bin) relative al centro-modello
|
||
|
||
Matching (veloce):
|
||
- Scena processata una sola volta per livello di piramide:
|
||
Sobel → magnitude → quant orientation → spread (dilate per bin) →
|
||
response map (N_BINS, H, W) — bit b acceso dove orientamento b presente.
|
||
- Per ogni variante:
|
||
score_map[y,x] = Σ resp[b_i][y+dy_i, x+dx_i] / N_features
|
||
implementato con shift-add vettorizzato (numpy).
|
||
- Piramide: matching top-level (basso costo, soglia ridotta) +
|
||
refinement a risoluzione piena attorno ai candidati.
|
||
|
||
Il training supporta una `mask` binaria per modellare solo una regione parziale
|
||
della ROI (modello non-rettangolare).
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import math
|
||
import os
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
from dataclasses import dataclass
|
||
|
||
import cv2
|
||
import numpy as np
|
||
|
||
_GOLDEN = (math.sqrt(5.0) - 1.0) / 2.0 # ≈ 0.618
|
||
|
||
from pm2d._jit_kernels import (
|
||
score_by_shift as _jit_score_by_shift,
|
||
score_bitmap as _jit_score_bitmap,
|
||
score_bitmap_rescored as _jit_score_bitmap_rescored,
|
||
score_bitmap_greedy as _jit_score_bitmap_greedy,
|
||
top_max_per_variant as _jit_top_max_per_variant,
|
||
popcount_density as _jit_popcount,
|
||
HAS_NUMBA,
|
||
)
|
||
|
||
N_BINS = 8 # default: orientamento mod π (no polarity)
|
||
N_BINS_POL = 16 # use_polarity=True: orientamento mod 2π (con polarity)
|
||
|
||
|
||
def opencl_available() -> bool:
    """Return True when OpenCV exposes a usable OpenCL (GPU) backend."""
    try:
        has_ocl = cv2.ocl.haveOpenCL()
    except Exception:
        # Any failure (missing module attr, broken ICD) means "no GPU".
        return False
    return bool(has_ocl)
|
||
|
||
|
||
def set_gpu_enabled(enabled: bool) -> bool:
    """Globally enable/disable OpenCV's OpenCL backend.

    When enabled, Sobel/dilate/warpAffine use UMat with automatic
    dispatch to GPU kernels (Intel UHD, AMD, NVIDIA via the OpenCL ICD).
    Typical speedup: 1.5-3x on Sobel+dilate for 1920x1080 scenes;
    negligible benefit below ~640 px (CPU<->GPU transfer dominates).

    Halcon-equivalent: 'find_shape_model' with the built-in GPU backend.
    Returns True when activation actually took effect.
    """
    if opencl_available():
        cv2.ocl.setUseOpenCL(bool(enabled))
        return cv2.ocl.useOpenCL()
    return False
|
||
|
||
|
||
def _poly_iou(p1: np.ndarray, p2: np.ndarray) -> float:
    """IoU of two convex quads (4 vertices, float32) via cv2.intersectConvexConvex.

    Exact polygon intersection through OpenCV: returns
    intersection area / union area.  Robust to arbitrary winding
    (clockwise or counter-clockwise) — cv2 normalises orientation.
    Degenerate polygons or empty intersection yield 0.0.
    """
    area1 = float(cv2.contourArea(p1))
    area2 = float(cv2.contourArea(p2))
    if min(area1, area2) <= 0:
        return 0.0
    inter, _ = cv2.intersectConvexConvex(
        p1.astype(np.float32),
        p2.astype(np.float32),
    )
    inter = float(inter)
    if inter <= 0:
        return 0.0
    union = area1 + area2 - inter
    if union <= 0:
        return 0.0
    return inter / union
|
||
|
||
|
||
def _oriented_bbox_polygon(
|
||
cx: float, cy: float, w: float, h: float, angle_deg: float,
|
||
) -> np.ndarray:
|
||
"""Ritorna 4 vertici (float32, shape (4,2)) del bbox orientato.
|
||
|
||
Convenzione coerente con cv2.getRotationMatrix2D usato nel train:
|
||
rotazione counter-clockwise (matematica) ma sistema immagine y-down,
|
||
quindi visivamente orario.
|
||
"""
|
||
w2, h2 = w / 2.0, h / 2.0
|
||
# Vertici template non-ruotato centrati al centro
|
||
corners = np.array([[-w2, -h2], [w2, -h2], [w2, h2], [-w2, h2]], np.float32)
|
||
a = np.deg2rad(angle_deg)
|
||
c, s = np.cos(a), np.sin(a)
|
||
# cv2.getRotationMatrix2D con angolo a positivo applica R = [[c,s],[-s,c]]
|
||
# (ruota counter-clockwise nel sistema matematico; y-down → orario)
|
||
R = np.array([[c, s], [-s, c]], np.float32)
|
||
rotated = corners @ R.T
|
||
rotated[:, 0] += cx
|
||
rotated[:, 1] += cy
|
||
return rotated
|
||
|
||
|
||
@dataclass
class Match:
    """A single detected instance of the trained template in a scene."""
    cx: float  # match centre x in scene coordinates (px)
    cy: float  # match centre y in scene coordinates (px)
    angle_deg: float  # rotation of the matched variant (degrees)
    scale: float  # scale of the matched variant
    score: float  # matching score (module docstring: Σ resp / N_features)
    bbox_poly: np.ndarray  # (4, 2) float32 - 4 ordered vertices (rotated)
|
||
|
||
|
||
@dataclass
class _LevelFeatures:
    """Pyramid-level features (level l = image downsampled by 2^l)."""
    dx: np.ndarray  # int32 - x offsets relative to the model centre
    dy: np.ndarray  # int32 - y offsets relative to the model centre
    bin: np.ndarray  # int8 - quantized orientation bin per feature
    n: int  # number of features (== len(dx))
|
||
|
||
|
||
@dataclass
class _Variant:
    """One precomputed template pose (angle + scale)."""
    angle_deg: float
    scale: float
    # Feature pyramid: levels[0] = full-res, levels[l] = /2^l
    levels: list[_LevelFeatures]
    # Kernel bbox (for visualisation / search limits)
    kh: int
    kw: int
    cx_local: float  # model centre inside the kernel bbox
    cy_local: float
    # Template-view index (multi-template ensemble feature).
    # 0 = main template from train(); 1+ = views added via
    # add_template_view().  Used in _verify_ncc/_compute_recall to
    # pick the correct gray template for the match.
    view_idx: int = 0
|
||
|
||
|
||
class LineShapeMatcher:
|
||
"""Shape-based matcher linemod-style - Python/numpy, no SIMD."""
|
||
|
||
    def __init__(
        self,
        num_features: int = 96,
        weak_grad: float = 30.0,
        strong_grad: float = 60.0,
        angle_range_deg: tuple[float, float] = (0.0, 360.0),
        angle_step_deg: float = 5.0,
        scale_range: tuple[float, float] = (1.0, 1.0),
        scale_step: float = 0.1,
        spread_radius: int = 4,
        min_feature_spacing: int = 3,
        pyramid_levels: int = 2,
        top_score_factor: float = 0.5,
        n_threads: int | None = None,
        use_polarity: bool = False,
        use_gpu: bool = False,
    ) -> None:
        """Configure the matcher; no training work happens here.

        Args:
            num_features: max sparse edge features kept per variant.
            weak_grad / strong_grad: hysteresis thresholds on gradient
                magnitude (see _hysteresis_mask); weak >= strong falls
                back to a single strong threshold.
            angle_range_deg / angle_step_deg: rotation sweep used when
                generating variants in train(); step <= 0 means "auto"
                (see _auto_angle_step).
            scale_range / scale_step: scale sweep for the variants.
            spread_radius: dilation radius when spreading orientation
                bins over the scene (tolerance to small deformations).
            min_feature_spacing: min pixel distance between features.
            pyramid_levels: pyramid depth (clamped to >= 1).
            top_score_factor: score-threshold factor at the top level.
            n_threads: worker threads; defaults to cpu_count() - 1
                (at least 1).
            use_polarity: 16 orientation bins (mod 2π) instead of 8
                (mod π); distinguishes edge polarity.
            use_gpu: route Sobel/dilate/warpAffine through OpenCL UMat
                when available; NOTE this flips cv2's GLOBAL OpenCL
                state as a side effect.
        """
        self.num_features = num_features
        self.weak_grad = weak_grad
        self.strong_grad = strong_grad
        self.angle_range_deg = angle_range_deg
        self.angle_step_deg = angle_step_deg
        self.scale_range = scale_range
        self.scale_step = scale_step
        self.spread_radius = spread_radius
        self.min_feature_spacing = min_feature_spacing
        self.pyramid_levels = max(1, pyramid_levels)
        self.top_score_factor = top_score_factor
        self.n_threads = n_threads or max(1, (os.cpu_count() or 2) - 1)
        # Polarity-aware: 16 bins (orientation mod 2π) using a uint16
        # bitmap.  Distinguishes "light→dark" from "dark→light" edges →
        # 2x selectivity.  Use when the scene background varies
        # (light/dark) and the template orientation is directional.
        self.use_polarity = use_polarity
        self._n_bins = N_BINS_POL if use_polarity else N_BINS
        # GPU offload for Sobel/dilate/warpAffine via cv2.UMat (OpenCL).
        # Effective only if opencl_available(); silent CPU fallback otherwise.
        self.use_gpu = bool(use_gpu and opencl_available())
        if self.use_gpu:
            cv2.ocl.setUseOpenCL(True)

        self.variants: list[_Variant] = []
        self.template_size: tuple[int, int] = (0, 0)
        self.template_gray: np.ndarray | None = None
        # Mask used at training time (propagated to refine for coherence).
        self._train_mask: np.ndarray | None = None
        # Multi-template ensemble: N views of the same part (light/dark,
        # different conditions).  Main template is [0]; views added via
        # add_template_view() are [1+].  Matching returns the view that
        # matched best.
        self._view_templates: list[tuple[np.ndarray, np.ndarray | None]] = []
|
||
|
||
# --- Helpers -------------------------------------------------------
|
||
|
||
@staticmethod
|
||
def _to_gray(img: np.ndarray) -> np.ndarray:
|
||
if img.ndim == 3:
|
||
return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
|
||
return img
|
||
|
||
    def _gradient(self, gray) -> tuple[np.ndarray, np.ndarray]:
        """Sobel gradient magnitude + quantized orientation bins.

        Accepts np.ndarray or cv2.UMat (GPU OpenCL path).  Returns
        (mag, bins): float32 magnitude and int16 bin indices in
        [0, N_BINS) — or [0, N_BINS_POL) when use_polarity is set.
        """
        # Accepts np.ndarray or cv2.UMat (for the GPU OpenCL path).
        gx = cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=3)
        gy = cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=3)
        mag = cv2.magnitude(gx, gy)
        # Orientation quantization needs plain numpy ops: download from
        # the GPU if necessary.
        if isinstance(gx, cv2.UMat):
            gx = gx.get(); gy = gy.get(); mag = mag.get()
        ang = np.arctan2(gy, gx)  # [-π, π]
        if self.use_polarity:
            # Mod 2π: bins 0..15 encode direction AND edge polarity.
            ang_full = np.where(ang < 0, ang + 2.0 * np.pi, ang)
            bins = np.floor(ang_full / (2.0 * np.pi) * N_BINS_POL).astype(np.int16)
            bins = np.clip(bins, 0, N_BINS_POL - 1)
        else:
            # Mod π: symmetric edges share a bin (no polarity).
            ang_mod = np.where(ang < 0, ang + np.pi, ang)
            bins = np.floor(ang_mod / np.pi * N_BINS).astype(np.int16)
            bins = np.clip(bins, 0, N_BINS - 1)
        return mag, bins
|
||
|
||
    def _hysteresis_mask(self, mag: np.ndarray) -> np.ndarray:
        """Edge mask with hysteresis (Halcon Contrast='auto' two-threshold).

        Procedure:
        1. seed = pixels with mag >= strong_grad (sharp edges)
        2. weak = pixels with mag >= weak_grad (candidate edges)
        3. grow seeds inside weak via 8-connected components

        Result: a weak edge connected to a strong edge is PROMOTED to a
        valid feature; an isolated weak edge (noise) is DISCARDED.

        Reduces both false positives (pure noise) and false negatives
        (broken continuity on thin, low-contrast edges).

        Returns a boolean mask with the same shape as *mag*.
        """
        weak = (mag >= self.weak_grad).astype(np.uint8)
        strong = (mag >= self.strong_grad).astype(np.uint8)
        # connectedComponents over weak: every component containing at
        # least one strong pixel is accepted in full.
        n_lab, labels = cv2.connectedComponents(weak, connectivity=8)
        if n_lab <= 1:
            # No weak components at all: fall back to the strong mask.
            return strong.astype(bool)
        # Labels touched by strong pixels: markers for components to keep.
        strong_labels = np.unique(labels[strong > 0])
        strong_labels = strong_labels[strong_labels > 0]  # 0 = background
        if len(strong_labels) == 0:
            return strong.astype(bool)
        # Keep = pixel belongs to a "promoted" component.
        keep = np.isin(labels, strong_labels)
        return keep
|
||
|
||
def _extract_features(
|
||
self, mag: np.ndarray, bins: np.ndarray, mask: np.ndarray | None,
|
||
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
|
||
if mask is not None:
|
||
mag = np.where(mask > 0, mag, 0)
|
||
# Halcon-style edge selection: hysteresis tra weak_grad e strong_grad.
|
||
# Edge weak connessi a edge strong sono inclusi (continuita' bordi).
|
||
# Se weak_grad >= strong_grad → fallback a soglia singola strong.
|
||
if self.weak_grad < self.strong_grad:
|
||
edge = self._hysteresis_mask(mag)
|
||
else:
|
||
edge = mag >= self.strong_grad
|
||
ys, xs = np.where(edge)
|
||
if len(xs) == 0:
|
||
return (np.zeros(0, np.int32),) * 3
|
||
vals = mag[ys, xs]
|
||
order = np.argsort(-vals)
|
||
spc = max(1, self.min_feature_spacing)
|
||
occupied = np.zeros(mag.shape, dtype=bool)
|
||
picked_x: list[int] = []
|
||
picked_y: list[int] = []
|
||
picked_b: list[int] = []
|
||
for idx in order:
|
||
y, x = int(ys[idx]), int(xs[idx])
|
||
if occupied[y, x]:
|
||
continue
|
||
picked_x.append(x); picked_y.append(y)
|
||
picked_b.append(int(bins[y, x]))
|
||
y0 = max(0, y - spc); y1 = min(mag.shape[0], y + spc + 1)
|
||
x0 = max(0, x - spc); x1 = min(mag.shape[1], x + spc + 1)
|
||
occupied[y0:y1, x0:x1] = True
|
||
if len(picked_x) >= self.num_features:
|
||
break
|
||
return (np.array(picked_x, np.int32),
|
||
np.array(picked_y, np.int32),
|
||
np.array(picked_b, np.int8))
|
||
|
||
# --- Save / Load (Halcon-style write_shape_model / read_shape_model)
|
||
|
||
def save_model(self, path: str) -> None:
|
||
"""Salva matcher addestrato su disco (formato .npz).
|
||
|
||
Persiste: parametri, template_gray, mask, e tutte le varianti
|
||
pre-computate (con piramide). Halcon-equivalent write_shape_model.
|
||
Caso d'uso: training offline su workstation, deploy su macchina
|
||
di linea senza re-train (zero secondi di startup matching).
|
||
"""
|
||
if not self.variants:
|
||
raise RuntimeError("Modello non addestrato: chiamare train() prima.")
|
||
# Flatten varianti in array piatti (npz non ama dataclass nested)
|
||
n_vars = len(self.variants)
|
||
n_levels = len(self.variants[0].levels)
|
||
var_meta = np.zeros((n_vars, 6), dtype=np.float32) # ang, scale, kh, kw, cxl, cyl
|
||
all_dx, all_dy, all_bin, all_offsets = [], [], [], []
|
||
offset = 0
|
||
all_offsets_per_level = [[] for _ in range(n_levels)]
|
||
all_dx_per_level = [[] for _ in range(n_levels)]
|
||
all_dy_per_level = [[] for _ in range(n_levels)]
|
||
all_bin_per_level = [[] for _ in range(n_levels)]
|
||
for vi, var in enumerate(self.variants):
|
||
var_meta[vi] = (
|
||
var.angle_deg, var.scale, var.kh, var.kw,
|
||
var.cx_local, var.cy_local,
|
||
)
|
||
for li, lvl in enumerate(var.levels):
|
||
all_offsets_per_level[li].append(len(all_dx_per_level[li]))
|
||
all_dx_per_level[li].extend(lvl.dx.tolist())
|
||
all_dy_per_level[li].extend(lvl.dy.tolist())
|
||
all_bin_per_level[li].extend(lvl.bin.tolist())
|
||
for li in range(n_levels):
|
||
all_offsets_per_level[li].append(len(all_dx_per_level[li]))
|
||
|
||
out = {
|
||
"_format_version": np.array([1], dtype=np.int32),
|
||
"params": np.array([
|
||
self.num_features, self.weak_grad, self.strong_grad,
|
||
self.angle_range_deg[0], self.angle_range_deg[1],
|
||
self.angle_step_deg,
|
||
self.scale_range[0], self.scale_range[1], self.scale_step,
|
||
self.spread_radius, self.min_feature_spacing,
|
||
self.pyramid_levels, self.top_score_factor,
|
||
int(self.use_polarity),
|
||
], dtype=np.float64),
|
||
"template_gray": self.template_gray,
|
||
"train_mask": self._train_mask,
|
||
"var_meta": var_meta,
|
||
"n_levels": np.array([n_levels], dtype=np.int32),
|
||
}
|
||
for li in range(n_levels):
|
||
out[f"dx_l{li}"] = np.asarray(all_dx_per_level[li], dtype=np.int32)
|
||
out[f"dy_l{li}"] = np.asarray(all_dy_per_level[li], dtype=np.int32)
|
||
out[f"bin_l{li}"] = np.asarray(all_bin_per_level[li], dtype=np.int8)
|
||
out[f"offsets_l{li}"] = np.asarray(all_offsets_per_level[li], dtype=np.int32)
|
||
np.savez_compressed(path, **out)
|
||
|
||
@classmethod
|
||
def load_model(cls, path: str) -> "LineShapeMatcher":
|
||
"""Carica matcher pre-addestrato da .npz salvato con save_model.
|
||
|
||
Halcon-equivalent read_shape_model. Bypassa completamente train():
|
||
deploy production = istantaneo.
|
||
"""
|
||
data = np.load(path, allow_pickle=False)
|
||
params = data["params"]
|
||
m = cls(
|
||
num_features=int(params[0]),
|
||
weak_grad=float(params[1]),
|
||
strong_grad=float(params[2]),
|
||
angle_range_deg=(float(params[3]), float(params[4])),
|
||
angle_step_deg=float(params[5]),
|
||
scale_range=(float(params[6]), float(params[7])),
|
||
scale_step=float(params[8]),
|
||
spread_radius=int(params[9]),
|
||
min_feature_spacing=int(params[10]),
|
||
pyramid_levels=int(params[11]),
|
||
top_score_factor=float(params[12]),
|
||
use_polarity=bool(int(params[13])),
|
||
)
|
||
tpl = data["template_gray"]
|
||
if tpl.ndim > 0 and tpl.size > 0:
|
||
m.template_gray = tpl
|
||
m.template_size = (tpl.shape[1], tpl.shape[0])
|
||
mk = data["train_mask"]
|
||
m._train_mask = mk if mk.size > 0 else None
|
||
var_meta = data["var_meta"]
|
||
n_levels = int(data["n_levels"][0])
|
||
offsets_l = [data[f"offsets_l{li}"] for li in range(n_levels)]
|
||
dx_l = [data[f"dx_l{li}"] for li in range(n_levels)]
|
||
dy_l = [data[f"dy_l{li}"] for li in range(n_levels)]
|
||
bin_l = [data[f"bin_l{li}"] for li in range(n_levels)]
|
||
m.variants = []
|
||
n_vars = var_meta.shape[0]
|
||
for vi in range(n_vars):
|
||
ang, scale, kh, kw, cxl, cyl = var_meta[vi]
|
||
levels = []
|
||
for li in range(n_levels):
|
||
i0 = int(offsets_l[li][vi])
|
||
i1 = int(offsets_l[li][vi + 1])
|
||
levels.append(_LevelFeatures(
|
||
dx=dx_l[li][i0:i1].copy(),
|
||
dy=dy_l[li][i0:i1].copy(),
|
||
bin=bin_l[li][i0:i1].copy(),
|
||
n=i1 - i0,
|
||
))
|
||
m.variants.append(_Variant(
|
||
angle_deg=float(ang), scale=float(scale),
|
||
levels=levels, kh=int(kh), kw=int(kw),
|
||
cx_local=float(cxl), cy_local=float(cyl),
|
||
))
|
||
return m
|
||
|
||
def set_angle_range_around(
|
||
self, center_deg: float, tolerance_deg: float,
|
||
) -> None:
|
||
"""Restringe angle_range a (center - tol, center + tol).
|
||
|
||
Comodo helper per scenari in cui l'orientamento del pezzo e'
|
||
noto a priori entro ±tolerance_deg (es. feeder vibrante con
|
||
guida meccanica). Riduce drasticamente le varianti generate
|
||
in train(): es. ±15° vs 360° = 24x meno varianti, training
|
||
e matching molto piu veloci.
|
||
|
||
Esempio:
|
||
m.set_angle_range_around(0, 20) # cerca solo in [-20, +20]
|
||
m.train(template)
|
||
"""
|
||
self.angle_range_deg = (
|
||
float(center_deg - tolerance_deg),
|
||
float(center_deg + tolerance_deg),
|
||
)
|
||
|
||
def _scale_list(self) -> list[float]:
|
||
s0, s1 = self.scale_range
|
||
if s0 >= s1 or self.scale_step <= 0:
|
||
return [float(s0)]
|
||
n = int(np.floor((s1 - s0) / self.scale_step)) + 1
|
||
return [float(s0 + i * self.scale_step) for i in range(n)]
|
||
|
||
def _auto_angle_step(self) -> float:
|
||
"""Step angolare derivato da dimensione template (Halcon-style).
|
||
|
||
Formula: step ≈ atan(2 / max_side) gradi. Garantisce che la
|
||
rotazione minima produca uno spostamento di ≥2 px sul perimetro
|
||
del template (sotto sample il matching coarse perde candidati).
|
||
Clampato in [0.5°, 10°].
|
||
"""
|
||
max_side = max(self.template_size) if self.template_size != (0, 0) else 64
|
||
step = math.degrees(math.atan2(2.0, float(max_side)))
|
||
return float(np.clip(step, 0.5, 10.0))
|
||
|
||
def _effective_angle_step(self) -> float:
|
||
"""Risolve angle_step_deg gestendo modalità auto (<=0)."""
|
||
if self.angle_step_deg <= 0:
|
||
return self._auto_angle_step()
|
||
return self.angle_step_deg
|
||
|
||
def _angle_list(self) -> list[float]:
|
||
a0, a1 = self.angle_range_deg
|
||
step = self._effective_angle_step()
|
||
if step <= 0 or a0 >= a1:
|
||
return [float(a0)]
|
||
n = int(np.floor((a1 - a0) / step))
|
||
return [float(a0 + i * step) for i in range(n)]
|
||
|
||
# --- Training ------------------------------------------------------
|
||
|
||
def _build_pyramid_features(
|
||
self, dx: np.ndarray, dy: np.ndarray, bin_: np.ndarray,
|
||
) -> list[_LevelFeatures]:
|
||
"""Piramide feature precomputata: livello l = /2^l con dedup."""
|
||
levels = [_LevelFeatures(dx=dx.copy(), dy=dy.copy(), bin=bin_.copy(),
|
||
n=len(dx))]
|
||
for lvl in range(1, self.pyramid_levels):
|
||
sf = 2 ** lvl
|
||
dx_l = (dx // sf).astype(np.int32)
|
||
dy_l = (dy // sf).astype(np.int32)
|
||
# Dedup: rimuove feature collassate sullo stesso (dx, dy, bin)
|
||
key = ((dx_l.astype(np.int64) << 24)
|
||
| (dy_l.astype(np.int64) << 8)
|
||
| bin_.astype(np.int64))
|
||
_, uniq = np.unique(key, return_index=True)
|
||
levels.append(_LevelFeatures(
|
||
dx=dx_l[uniq], dy=dy_l[uniq], bin=bin_[uniq], n=len(uniq),
|
||
))
|
||
return levels
|
||
|
||
    def train(self, template_bgr: np.ndarray, mask: np.ndarray | None = None) -> int:
        """Generate rotated+scaled variants with sparse features + pyramid.

        Args:
            template_bgr: template image (BGR or already gray).
            mask: optional binary mask to model only part of the ROI
                (non-rectangular model); None = full template.

        Side effects: resets the multi-view ensemble (this template
        becomes view 0) and invalidates the refine-feature and scene
        caches.  Returns the number of variants kept after dedup.
        """
        gray = self._to_gray(template_bgr)
        h, w = gray.shape
        self.template_size = (w, h)
        self.template_gray = gray.copy()
        if mask is None:
            mask_full = np.full((h, w), 255, dtype=np.uint8)
        else:
            mask_full = (mask > 0).astype(np.uint8) * 255
        self._train_mask = mask_full.copy()

        self.variants.clear()
        # Reset view list: main template = view 0
        self._view_templates = [(gray.copy(), mask_full.copy())]
        # Invalidate caches: template/params changed → stale spread/features.
        self._refine_feat_cache = {}
        if hasattr(self, "_scene_cache"):
            self._scene_cache.clear()
        self._build_variants_for_view(gray, mask_full, view_idx=0)
        self._dedup_variants()
        return len(self.variants)
|
||
|
||
    def add_template_view(
        self, template_bgr: np.ndarray, mask: np.ndarray | None = None,
    ) -> int:
        """Add an extra template view to the ensemble (Halcon-style
        create_aniso_shape_model fusing N views).

        Generates variants of the new template with the same parameters
        (angle/scale ranges) and APPENDS them to self.variants.
        NCC/recall automatically use the template of the view that
        matched.

        Use case: a part whose appearance changes (light/dark,
        before/after treatment, different lighting) → one robust matcher.

        Views are indexed from 1 upward (0 is the train() template).
        Returns the number of variants ADDED by this view (i.e. the
        growth of self.variants after dedup).

        Raises RuntimeError when train() has not been called first.
        """
        if not self.variants:
            raise RuntimeError(
                "Chiamare train(template_principale) prima di add_template_view")
        gray = self._to_gray(template_bgr)
        h, w = gray.shape
        if (w, h) != self.template_size:
            # Resize for consistency with the main template's bbox/poly
            gray = cv2.resize(gray, self.template_size, interpolation=cv2.INTER_LINEAR)
            if mask is not None:
                mask = cv2.resize(mask, self.template_size, interpolation=cv2.INTER_NEAREST)
        if mask is None:
            mask_full = np.full(gray.shape, 255, dtype=np.uint8)
        else:
            mask_full = (mask > 0).astype(np.uint8) * 255
        view_idx = len(self._view_templates)
        self._view_templates.append((gray.copy(), mask_full.copy()))
        n_before = len(self.variants)
        self._build_variants_for_view(gray, mask_full, view_idx=view_idx)
        self._dedup_variants()
        return len(self.variants) - n_before
|
||
|
||
    def _build_variants_for_view(
        self, gray: np.ndarray, mask_full: np.ndarray, view_idx: int,
    ) -> None:
        """Extract rotated+scaled variants for ONE template view.

        Same extraction algorithm as the original train(); split out so
        add_template_view (multi-template ensemble) can reuse it.
        Appends the generated variants to self.variants.
        """
        h, w = gray.shape
        for s in self._scale_list():
            sw = max(16, int(round(w * s)))
            sh = max(16, int(round(h * s)))
            gray_s = cv2.resize(gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
            mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST)
            # Pad to a rotation-safe square (diagonal + margin) so every
            # rotation keeps the template fully inside the canvas.
            diag = int(np.ceil(np.hypot(sh, sw))) + 6
            py = (diag - sh) // 2
            px = (diag - sw) // 2
            gray_p = cv2.copyMakeBorder(
                gray_s, py, diag - sh - py, px, diag - sw - px,
                cv2.BORDER_REPLICATE,
            )
            mask_p = cv2.copyMakeBorder(
                mask_s, py, diag - sh - py, px, diag - sw - px,
                cv2.BORDER_CONSTANT, value=0,
            )
            center = (diag / 2.0, diag / 2.0)

            for ang in self._angle_list():
                M = cv2.getRotationMatrix2D(center, ang, 1.0)
                gray_r = cv2.warpAffine(
                    gray_p, M, (diag, diag),
                    flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE,
                )
                mask_r = cv2.warpAffine(
                    mask_p, M, (diag, diag),
                    flags=cv2.INTER_NEAREST, borderValue=0,
                )
                mag, bins = self._gradient(gray_r)
                fx, fy, fb = self._extract_features(mag, bins, mask_r)
                if len(fx) < 8:
                    # Too few features to score reliably: skip this pose.
                    continue

                # Feature offsets relative to the rotation centre.
                cx_c = diag / 2.0
                cy_c = diag / 2.0
                dx = (fx - cx_c).astype(np.int32)
                dy = (fy - cy_c).astype(np.int32)

                # Tight bbox of the features (visualisation / search limits).
                x0 = int(dx.min()); x1 = int(dx.max())
                y0 = int(dy.min()); y1 = int(dy.max())
                kw = x1 - x0 + 1
                kh = y1 - y0 + 1
                cx_local = -x0
                cy_local = -y0

                levels = self._build_pyramid_features(dx, dy, fb)

                self.variants.append(_Variant(
                    angle_deg=float(ang),
                    scale=float(s),
                    levels=levels,
                    kh=kh, kw=kw,
                    cx_local=float(cx_local), cy_local=float(cy_local),
                    view_idx=view_idx,
                ))
|
||
|
||
def _dedup_variants(self) -> int:
|
||
"""Rimuove varianti con feature-set identico (post-quantizzazione).
|
||
|
||
Halcon-style: con angle range = (0, 360) e simmetrie del template,
|
||
molte rotazioni producono lo stesso set quantizzato di feature.
|
||
Es: quadrato a 0/90/180/270 deg → stesse features (modulo permutazione).
|
||
Hash su feature ordinate (livello 0, full-res) elimina i duplicati.
|
||
|
||
Vantaggio: meno varianti = meno chiamate kernel JIT al top-level
|
||
senza perdere copertura angolare effettiva. Per template asimmetrici
|
||
non rimuove nulla.
|
||
"""
|
||
seen: dict[bytes, int] = {}
|
||
kept: list[_Variant] = []
|
||
removed = 0
|
||
for var in self.variants:
|
||
lvl0 = var.levels[0]
|
||
order = np.lexsort((lvl0.bin, lvl0.dy, lvl0.dx))
|
||
key = (
|
||
lvl0.dx[order].tobytes()
|
||
+ b"|" + lvl0.dy[order].tobytes()
|
||
+ b"|" + lvl0.bin[order].tobytes()
|
||
+ b"|" + str(round(var.scale, 4)).encode()
|
||
)
|
||
h = key # diretto, senza hash crypto (collision ok solo se identici)
|
||
if h in seen:
|
||
removed += 1
|
||
continue
|
||
seen[h] = len(kept)
|
||
kept.append(var)
|
||
self.variants = kept
|
||
return removed
|
||
|
||
# --- Matching ------------------------------------------------------
|
||
|
||
    def _response_map(self, gray: np.ndarray) -> np.ndarray:
        """Response map, shape (N_BINS, H, W) float32 (legacy path).

        For each of the 8 bins: binary map of pixels whose quantized
        orientation equals the bin AND whose magnitude >= weak_grad,
        dilated by spread_radius (orientation "spreading").

        NOTE(review): hard-coded to module-level N_BINS (8) — it ignores
        self._n_bins/use_polarity.  Presumably only reached on the
        legacy non-polarity path; confirm before using with
        use_polarity=True.
        """
        mag, bins = self._gradient(gray)
        valid = mag >= self.weak_grad
        k = 2 * self.spread_radius + 1
        kernel = np.ones((k, k), dtype=np.uint8)
        H, W = gray.shape
        raw = np.zeros((N_BINS, H, W), dtype=np.float32)
        for b in range(N_BINS):
            mask_b = ((bins == b) & valid).astype(np.uint8)
            d = cv2.dilate(mask_b, kernel)
            raw[b] = d.astype(np.float32)
        return raw
|
||
|
||
    # --- Scene precompute cache (LRU, Halcon-style) ----------------------
    # Max distinct scenes kept in the per-instance LRU (_scene_cache);
    # beyond this, _scene_cache_put evicts the least recently used entry.
    _SCENE_CACHE_SIZE = 4
|
||
|
||
def _scene_cache_key(self, gray: np.ndarray) -> str | None:
|
||
"""Hash compatto della scena + param che influenzano spread/density.
|
||
|
||
Hash su prime 64KB della scena (sufficiente discriminante per
|
||
scene fotografiche) + parametri matcher rilevanti. None se cache
|
||
disabilitata (es. scene troppo piccole).
|
||
"""
|
||
if gray.size < 100:
|
||
return None
|
||
try:
|
||
import hashlib
|
||
h = hashlib.md5()
|
||
sample = gray.tobytes()[:65536]
|
||
h.update(sample)
|
||
h.update(f"|{gray.shape}|{gray.dtype}".encode())
|
||
h.update(
|
||
f"|{self.weak_grad}|{self.strong_grad}"
|
||
f"|{self.spread_radius}|{self._n_bins}"
|
||
f"|{self.pyramid_levels}".encode()
|
||
)
|
||
return h.hexdigest()
|
||
except Exception:
|
||
return None
|
||
|
||
def _scene_cache_get(self, key: str) -> tuple | None:
|
||
cache = getattr(self, "_scene_cache", None)
|
||
if cache is None:
|
||
return None
|
||
v = cache.get(key)
|
||
if v is not None:
|
||
cache.move_to_end(key)
|
||
return v
|
||
|
||
def _scene_cache_put(self, key: str, value: tuple) -> None:
|
||
from collections import OrderedDict
|
||
if not hasattr(self, "_scene_cache"):
|
||
self._scene_cache = OrderedDict()
|
||
self._scene_cache[key] = value
|
||
self._scene_cache.move_to_end(key)
|
||
while len(self._scene_cache) > self._SCENE_CACHE_SIZE:
|
||
self._scene_cache.popitem(last=False)
|
||
|
||
    def _spread_bitmap(self, gray: np.ndarray) -> np.ndarray:
        """Spread bitmap: bit b set where orientation bin b occurs within radius.

        dtype: uint8 for N_BINS=8, uint16 for N_BINS_POL=16 (use_polarity).
        With use_gpu=True, Sobel + dilate run through cv2.UMat (OpenCL
        GPU kernels); the per-bin dilate result is downloaded for the
        CPU-side bit packing.
        """
        if self.use_gpu and not isinstance(gray, cv2.UMat):
            # Upload once; cv2 then dispatches Sobel/dilate to OpenCL.
            gray_in = cv2.UMat(np.ascontiguousarray(gray))
        else:
            gray_in = gray
        mag, bins = self._gradient(gray_in)
        valid = mag >= self.weak_grad
        k = 2 * self.spread_radius + 1
        kernel = np.ones((k, k), dtype=np.uint8)
        # Shape must come from a numpy array; UMat requires a download.
        H, W = (gray.shape if isinstance(gray, np.ndarray)
                else (gray.get().shape[0], gray.get().shape[1]))
        nb = self._n_bins
        dtype = np.uint16 if nb > 8 else np.uint8
        spread = np.zeros((H, W), dtype=dtype)
        for b in range(nb):
            # Binary map of bin b (above weak_grad), dilated = "spread".
            mask_b = ((bins == b) & valid).astype(np.uint8)
            if self.use_gpu:
                d = cv2.dilate(cv2.UMat(mask_b), kernel)
                d_np = d.get()
            else:
                d_np = cv2.dilate(mask_b, kernel)
            spread |= (d_np.astype(dtype) << b)
        return spread
|
||
|
||
    @staticmethod
    def _score_by_shift(
        resp: np.ndarray, dx: np.ndarray, dy: np.ndarray, bins: np.ndarray,
        bin_has_data: np.ndarray | None = None,
    ) -> np.ndarray:
        """score[y,x] = Σ_i resp[bin_i][y+dy_i, x+dx_i] / len(dx).

        Thin wrapper: dispatches to the Numba JIT kernel when available,
        with the numpy fallback handled inside _jit_score_by_shift.
        """
        return _jit_score_by_shift(resp, dx, dy, bins, bin_has_data)
|
||
|
||
    @staticmethod
    def _subpixel_peak(
        acc: np.ndarray, x: int, y: int, plateau_radius: int = 10,
    ) -> tuple[float, float]:
        """Sub-pixel position of the peak at (x, y) in accumulator *acc*.

        1. Saturated plateau → weighted centroid of the plateau
           (weight = score).
        2. Otherwise → bivariate 2D quadratic fit on the 9 neighbours
           (z = a + b·dx + c·dy + d·dx² + e·dy² + f·dx·dy), argmax
           solved analytically with ±0.5 px clamping.

        Returns (x_subpix, y_subpix) as floats.
        """
        H, W = acc.shape
        val = float(acc[y, x])
        # Plateau detection: values >= val - 0.01 within a bounded radius
        y0 = max(0, y - plateau_radius); y1 = min(H, y + plateau_radius + 1)
        x0 = max(0, x - plateau_radius); x1 = min(W, x + plateau_radius + 1)
        patch = acc[y0:y1, x0:x1]
        plateau = patch >= val - 0.01
        if plateau.sum() > 1:
            # Centroid weighted by (score - (max-0.01))² to emphasise the top
            weights = np.where(plateau, patch - (val - 0.01), 0.0).astype(np.float64)
            weights = weights * weights
            total = weights.sum()
            if total > 1e-9:
                ys_idx, xs_idx = np.indices(patch.shape)
                cx_w = (xs_idx * weights).sum() / total
                cy_w = (ys_idx * weights).sum() / total
                return float(x0 + cx_w), float(y0 + cy_w)
            # Degenerate weights: plain mean of the plateau pixels.
            ys_m, xs_m = np.where(plateau)
            return float(x0 + xs_m.mean()), float(y0 + ys_m.mean())
        # Bivariate 2D quadratic fit on the 3x3 neighbourhood
        if x <= 0 or x >= W - 1 or y <= 0 or y >= H - 1:
            # Border peak: no full 3x3 stencil, return integer position.
            return float(x), float(y)
        # 3x3 stencil: Z[i, j] with i,j ∈ {-1, 0, +1}
        Z = acc[y - 1:y + 2, x - 1:x + 2].astype(np.float64)
        # Coefficients from finite differences
        b_c = (Z[1, 2] - Z[1, 0]) / 2.0
        c_c = (Z[2, 1] - Z[0, 1]) / 2.0
        d_c = (Z[1, 2] + Z[1, 0] - 2.0 * Z[1, 1]) / 2.0
        e_c = (Z[2, 1] + Z[0, 1] - 2.0 * Z[1, 1]) / 2.0
        f_c = (Z[2, 2] - Z[0, 2] - Z[2, 0] + Z[0, 0]) / 4.0
        # Maximum: solve [2d f; f 2e][dx;dy] = [-b;-c]
        det = 4.0 * d_c * e_c - f_c * f_c
        if abs(det) > 1e-9:
            ox = (-2.0 * e_c * b_c + f_c * c_c) / det
            oy = (-2.0 * d_c * c_c + f_c * b_c) / det
        else:
            # Separable fallback (near-singular Hessian)
            ox = -b_c / (2.0 * d_c) if abs(d_c) > 1e-6 else 0.0
            oy = -c_c / (2.0 * e_c) if abs(e_c) > 1e-6 else 0.0
        ox = float(np.clip(ox, -0.5, 0.5))
        oy = float(np.clip(oy, -0.5, 0.5))
        return x + ox, y + oy
|
||
|
||
def _refine_pose_joint(
|
||
self,
|
||
spread0: np.ndarray,
|
||
template_gray: np.ndarray,
|
||
cx: float, cy: float,
|
||
angle_deg: float, scale: float,
|
||
mask_full: np.ndarray,
|
||
max_iter: int = 24,
|
||
tol: float = 1e-3,
|
||
) -> tuple[float, float, float, float]:
|
||
"""Refine congiunto (cx, cy, angle) via Nelder-Mead 3D.
|
||
|
||
Ottimizza simultaneamente posizione e angolo (vs golden search 1D
|
||
sull'angolo poi quadratico 2D su xy che alterna assi). Halcon-style:
|
||
un singolo iter LM stila il match a precisione sub-pixel + sub-step.
|
||
Ritorna (angle, score, cx, cy) dove score e quello calcolato sulla
|
||
scena spread (no template gray).
|
||
"""
|
||
h, w = template_gray.shape
|
||
sw = max(16, int(round(w * scale)))
|
||
sh = max(16, int(round(h * scale)))
|
||
gray_s = cv2.resize(template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
|
||
mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST)
|
||
diag = int(np.ceil(np.hypot(sh, sw))) + 6
|
||
py = (diag - sh) // 2; px = (diag - sw) // 2
|
||
gray_p = cv2.copyMakeBorder(gray_s, py, diag - sh - py, px, diag - sw - px,
|
||
cv2.BORDER_REPLICATE)
|
||
mask_p = cv2.copyMakeBorder(mask_s, py, diag - sh - py, px, diag - sw - px,
|
||
cv2.BORDER_CONSTANT, value=0)
|
||
center = (diag / 2.0, diag / 2.0)
|
||
H, W = spread0.shape
|
||
|
||
def _score(params: tuple[float, float, float]) -> float:
|
||
ddx, ddy, dang = params
|
||
ang = angle_deg + dang
|
||
M = cv2.getRotationMatrix2D(center, ang, 1.0)
|
||
gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
|
||
flags=cv2.INTER_LINEAR,
|
||
borderMode=cv2.BORDER_REPLICATE)
|
||
mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
|
||
flags=cv2.INTER_NEAREST, borderValue=0)
|
||
mag, bins = self._gradient(gray_r)
|
||
fx, fy, fb = self._extract_features(mag, bins, mask_r)
|
||
if len(fx) < 8:
|
||
return 0.0
|
||
cxe = cx + ddx; cye = cy + ddy
|
||
ix = int(round(cxe)); iy = int(round(cye))
|
||
tot = 0
|
||
valid = 0
|
||
for i in range(len(fx)):
|
||
xs = ix + int(fx[i] - center[0])
|
||
ys = iy + int(fy[i] - center[1])
|
||
if 0 <= xs < W and 0 <= ys < H:
|
||
bit = np.uint8(1 << int(fb[i]))
|
||
if spread0[ys, xs] & bit:
|
||
tot += 1
|
||
valid += 1
|
||
return -float(tot) / max(1, valid) # minimize -score
|
||
|
||
# Nelder-Mead 3D inline (no scipy). Simplex iniziale: vertice + offset
|
||
# dx=±0.5px, dy=±0.5px, dθ=±step/2.
|
||
step_a = self.angle_step_deg / 2.0 if self.angle_step_deg > 0 else 1.0
|
||
x0 = np.array([0.0, 0.0, 0.0])
|
||
simplex = np.array([
|
||
x0,
|
||
x0 + [0.5, 0.0, 0.0],
|
||
x0 + [0.0, 0.5, 0.0],
|
||
x0 + [0.0, 0.0, step_a],
|
||
])
|
||
fvals = np.array([_score(tuple(s)) for s in simplex])
|
||
for _ in range(max_iter):
|
||
order = np.argsort(fvals)
|
||
simplex = simplex[order]; fvals = fvals[order]
|
||
if abs(fvals[-1] - fvals[0]) < tol:
|
||
break
|
||
centroid = simplex[:-1].mean(axis=0)
|
||
xr = centroid + 1.0 * (centroid - simplex[-1])
|
||
fr = _score(tuple(xr))
|
||
if fvals[0] <= fr < fvals[-2]:
|
||
simplex[-1] = xr; fvals[-1] = fr
|
||
continue
|
||
if fr < fvals[0]:
|
||
xe = centroid + 2.0 * (centroid - simplex[-1])
|
||
fe = _score(tuple(xe))
|
||
if fe < fr:
|
||
simplex[-1] = xe; fvals[-1] = fe
|
||
else:
|
||
simplex[-1] = xr; fvals[-1] = fr
|
||
continue
|
||
xc = centroid + 0.5 * (simplex[-1] - centroid)
|
||
fc = _score(tuple(xc))
|
||
if fc < fvals[-1]:
|
||
simplex[-1] = xc; fvals[-1] = fc
|
||
continue
|
||
for k in range(1, 4):
|
||
simplex[k] = simplex[0] + 0.5 * (simplex[k] - simplex[0])
|
||
fvals[k] = _score(tuple(simplex[k]))
|
||
best_i = int(np.argmin(fvals))
|
||
ddx, ddy, dang = simplex[best_i]
|
||
return (angle_deg + float(dang), -float(fvals[best_i]),
|
||
cx + float(ddx), cy + float(ddy))
|
||
|
||
    def _refine_angle(
        self,
        spread0: np.ndarray,  # spread bitmap (H, W), one orientation bit per bin
        bit_active: int,  # NOTE(review): unused here; kept for interface compatibility
        template_gray: np.ndarray,
        cx: float, cy: float,
        angle_deg: float, scale: float,
        mask_full: np.ndarray,
        angle_fine_step: float = 0.5,  # NOTE(review): unused here; kept for interface compatibility
        search_radius: float | None = None,
        original_score: float | None = None,  # NOTE(review): unused here; kept for interface compatibility
    ) -> tuple[float, float, float, float]:
        """Fine (sub-step) angular search around the raw match.

        Runs a golden-section search on the angle offset within
        [-search_radius, +search_radius], re-extracting template features
        per candidate angle and scoring them against the scene spread
        bitmap over a small (±3 px) translation window.
        Returns (angle_refined, score, cx_refined, cy_refined).
        """
        # NB: the early-skip on score >= 0.99 was removed. The
        # linemod/shape score saturates to 1.0 easily (especially with
        # pyramid_propagate or a wide spread) but does NOT guarantee a
        # precise angle: the raw variant angle is quantized to multiples
        # of angle_step (5 deg default). Angular refinement is essential
        # for sub-step orientation.
        if search_radius is None:
            search_radius = self._effective_angle_step() / 2.0

        h, w = template_gray.shape
        # Scale the template (min 16 px per side to keep gradients usable).
        sw = max(16, int(round(w * scale)))
        sh = max(16, int(round(h * scale)))
        gray_s = cv2.resize(template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
        mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST)
        # Pad to the rotation-safe diagonal so warps never clip content.
        diag = int(np.ceil(np.hypot(sh, sw))) + 6
        py = (diag - sh) // 2; px = (diag - sw) // 2
        gray_p = cv2.copyMakeBorder(gray_s, py, diag - sh - py, px, diag - sw - px,
                                    cv2.BORDER_REPLICATE)
        mask_p = cv2.copyMakeBorder(mask_s, py, diag - sh - py, px, diag - sw - px,
                                    cv2.BORDER_CONSTANT, value=0)
        center = (diag / 2.0, diag / 2.0)

        H, W = spread0.shape
        # Translation search half-window (px) around (cx, cy) per angle.
        margin = 3

        # Cache of template features per angle (key: int(round(ang*20)) =
        # 0.05 deg buckets). A re-contracting golden search can request the
        # same bucket multiple times; avoids re-warp+gradient+extract
        # (expensive). Cached at matcher level to reuse across find()
        # calls on different scenes: template rotation does not depend on
        # the scene.
        if not hasattr(self, '_refine_feat_cache'):
            self._refine_feat_cache = {}
        feat_cache = self._refine_feat_cache
        cache_scale_key = round(scale * 1000)

        def _score_at_angle(off: float) -> tuple[float, float, float]:
            """Return (score, best_cx, best_cy) for angle = angle_deg + off."""
            ang = angle_deg + off
            ck = (round(ang * 20), cache_scale_key)
            cached = feat_cache.get(ck)
            if cached is not None:
                fx, fy, fb = cached
            else:
                M = cv2.getRotationMatrix2D(center, ang, 1.0)
                gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
                                        flags=cv2.INTER_LINEAR,
                                        borderMode=cv2.BORDER_REPLICATE)
                mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
                                        flags=cv2.INTER_NEAREST, borderValue=0)
                mag, bins = self._gradient(gray_r)
                fx, fy, fb = self._extract_features(mag, bins, mask_r)
                # Simple FIFO eviction: cap cache at ~256 angles
                # (8 angles * 32 candidates).
                if len(feat_cache) > 256:
                    feat_cache.pop(next(iter(feat_cache)))
                feat_cache[ck] = (fx, fy, fb)
            if len(fx) < 8:
                return (0.0, cx, cy)
            # Feature offsets relative to the padded-template center.
            dx = (fx - center[0]).astype(np.int32)
            dy = (fy - center[1]).astype(np.int32)
            y_lo = int(cy) - margin; y_hi = int(cy) + margin + 1
            x_lo = int(cx) - margin; x_hi = int(cx) + margin + 1
            sh_w = y_hi - y_lo; sw_w = x_hi - x_lo
            # Accumulator over the translation window: fraction of
            # features whose orientation bit is set in the scene spread.
            acc = np.zeros((sh_w, sw_w), dtype=np.float32)
            spread_dtype = spread0.dtype.type
            for i in range(len(dx)):
                ddx = int(dx[i]); ddy = int(dy[i]); b = int(fb[i])
                bit = spread_dtype(1 << b)
                # Scene span touched by this feature across the window.
                sy0 = y_lo + ddy; sy1 = y_hi + ddy
                sx0 = x_lo + ddx; sx1 = x_hi + ddx
                # Clip against scene bounds; a_* are the matching
                # accumulator sub-ranges.
                a_y0 = max(0, -sy0); a_y1 = sh_w - max(0, sy1 - H)
                a_x0 = max(0, -sx0); a_x1 = sw_w - max(0, sx1 - W)
                s_y0 = max(0, sy0); s_y1 = min(H, sy1)
                s_x0 = max(0, sx0); s_x1 = min(W, sx1)
                if s_y1 > s_y0 and s_x1 > s_x0:
                    region = spread0[s_y0:s_y1, s_x0:s_x1]
                    acc[a_y0:a_y1, a_x0:a_x1] += (
                        (region & bit) != 0
                    ).astype(np.float32)
            acc /= len(dx)
            _, max_val, _, max_loc = cv2.minMaxLoc(acc)
            return (float(max_val),
                    float(x_lo + max_loc[0]), float(y_lo + max_loc[1]))

        # Golden-section search on [-search_radius, +search_radius]:
        # converges in log time to ~0.1 deg precision, ~8 evaluations vs 5
        # but centered on the real peak (not equispaced samples).
        a_lo = -search_radius
        a_hi = +search_radius
        x1 = a_hi - _GOLDEN * (a_hi - a_lo)
        x2 = a_lo + _GOLDEN * (a_hi - a_lo)
        s1, cx1, cy1 = _score_at_angle(x1)
        s2, cx2, cy2 = _score_at_angle(x2)
        # Score at the origin as baseline (angle offset 0).
        s0, cx0_s, cy0_s = _score_at_angle(0.0)
        best = (angle_deg, s0, cx0_s, cy0_s)
        tol = 0.1  # degrees
        for _ in range(8):
            if s1 > best[1]:
                best = (angle_deg + x1, s1, cx1, cy1)
            if s2 > best[1]:
                best = (angle_deg + x2, s2, cx2, cy2)
            if abs(a_hi - a_lo) < tol:
                break
            if s1 > s2:
                # Peak lies in the left segment: drop the right tail.
                a_hi = x2
                x2 = x1; s2 = s1; cx2 = cx1; cy2 = cy1
                x1 = a_hi - _GOLDEN * (a_hi - a_lo)
                s1, cx1, cy1 = _score_at_angle(x1)
            else:
                a_lo = x1
                x1 = x2; s1 = s2; cx1 = cx2; cy1 = cy2
                x2 = a_lo + _GOLDEN * (a_hi - a_lo)
                s2, cx2, cy2 = _score_at_angle(x2)
        return best
|
||
|
||
def _get_view_template(
|
||
self, view_idx: int,
|
||
) -> tuple[np.ndarray | None, np.ndarray | None]:
|
||
"""Ritorna (template_gray, mask) per la view specificata.
|
||
|
||
view_idx 0 = template principale (train), 1+ = view extra
|
||
aggiunte via add_template_view. Usato per scegliere il template
|
||
corretto in NCC/recall verification quando il matcher e'
|
||
ensemble multi-template.
|
||
"""
|
||
if 0 <= view_idx < len(self._view_templates):
|
||
return self._view_templates[view_idx]
|
||
return self.template_gray, self._train_mask
|
||
|
||
def _compute_recall(
|
||
self, spread0: np.ndarray, variant: _Variant,
|
||
cx: float, cy: float, angle_deg: float,
|
||
) -> float:
|
||
"""Frazione di feature template che combaciano nello spread scena
|
||
alla pose. Halcon-equivalent: MinScore originale.
|
||
"""
|
||
if self.template_gray is None:
|
||
return 1.0
|
||
h, w = self.template_gray.shape
|
||
scale = variant.scale
|
||
sw = max(16, int(round(w * scale)))
|
||
sh = max(16, int(round(h * scale)))
|
||
gray_s = cv2.resize(self.template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
|
||
mask_src = (
|
||
self._train_mask if self._train_mask is not None
|
||
else np.full_like(self.template_gray, 255)
|
||
)
|
||
mask_s = cv2.resize(mask_src, (sw, sh), interpolation=cv2.INTER_NEAREST)
|
||
diag = int(np.ceil(np.hypot(sh, sw))) + 6
|
||
py = (diag - sh) // 2; px = (diag - sw) // 2
|
||
gray_p = cv2.copyMakeBorder(
|
||
gray_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_REPLICATE,
|
||
)
|
||
mask_p = cv2.copyMakeBorder(
|
||
mask_s, py, diag - sh - py, px, diag - sw - px,
|
||
cv2.BORDER_CONSTANT, value=0,
|
||
)
|
||
center = (diag / 2.0, diag / 2.0)
|
||
M = cv2.getRotationMatrix2D(center, angle_deg, 1.0)
|
||
gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
|
||
flags=cv2.INTER_LINEAR,
|
||
borderMode=cv2.BORDER_REPLICATE)
|
||
mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
|
||
flags=cv2.INTER_NEAREST, borderValue=0)
|
||
mag, bins = self._gradient(gray_r)
|
||
fx, fy, fb = self._extract_features(mag, bins, mask_r)
|
||
n_feat = len(fx)
|
||
if n_feat < 4:
|
||
return 0.0
|
||
H, W = spread0.shape
|
||
spread_dtype = spread0.dtype.type
|
||
ix = int(round(cx)); iy = int(round(cy))
|
||
hits = 0
|
||
for i in range(n_feat):
|
||
xs = ix + int(fx[i] - center[0])
|
||
ys = iy + int(fy[i] - center[1])
|
||
if 0 <= xs < W and 0 <= ys < H:
|
||
bit = spread_dtype(1 << int(fb[i]))
|
||
if spread0[ys, xs] & bit:
|
||
hits += 1
|
||
return hits / n_feat
|
||
|
||
    def _compute_soft_score(
        self, scene_gray: np.ndarray, variant: _Variant,
        cx: float, cy: float, angle_deg: float,
    ) -> float:
        """Soft-margin gradient similarity (Halcon Metric='use_polarity').

        For each template feature, compares the template gradient direction
        against the scene gradient at the posed location via cosine
        similarity, weighted by (clamped) scene gradient magnitude.
        Returns the weighted mean in [0, 1]; 0.0 if no template is trained
        or too few features/valid samples remain.
        """
        if self.template_gray is None:
            return 0.0
        h, w = self.template_gray.shape
        scale = variant.scale
        sw = max(16, int(round(w * scale)))
        sh = max(16, int(round(h * scale)))
        gray_s = cv2.resize(self.template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
        mask_src = (
            self._train_mask if self._train_mask is not None
            else np.full_like(self.template_gray, 255)
        )
        mask_s = cv2.resize(mask_src, (sw, sh), interpolation=cv2.INTER_NEAREST)
        # Pad to the rotation-safe diagonal before warping.
        diag = int(np.ceil(np.hypot(sh, sw))) + 6
        py = (diag - sh) // 2; px = (diag - sw) // 2
        gray_p = cv2.copyMakeBorder(
            gray_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_REPLICATE,
        )
        mask_p = cv2.copyMakeBorder(
            mask_s, py, diag - sh - py, px, diag - sw - px,
            cv2.BORDER_CONSTANT, value=0,
        )
        center = (diag / 2.0, diag / 2.0)
        M = cv2.getRotationMatrix2D(center, angle_deg, 1.0)
        gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
                                flags=cv2.INTER_LINEAR,
                                borderMode=cv2.BORDER_REPLICATE)
        mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
                                flags=cv2.INTER_NEAREST, borderValue=0)
        # Template gradient field at the posed orientation.
        gx_t = cv2.Sobel(gray_r, cv2.CV_32F, 1, 0, ksize=3)
        gy_t = cv2.Sobel(gray_r, cv2.CV_32F, 0, 1, ksize=3)
        mag_t = cv2.magnitude(gx_t, gy_t)
        _, bins_t = self._gradient(gray_r)
        fx, fy, _ = self._extract_features(mag_t, bins_t, mask_r)
        if len(fx) < 4:
            return 0.0
        # Scene gradient field (full image; sampled sparsely below).
        gx_s = cv2.Sobel(scene_gray, cv2.CV_32F, 1, 0, ksize=3)
        gy_s = cv2.Sobel(scene_gray, cv2.CV_32F, 0, 1, ksize=3)
        H, W = scene_gray.shape
        ix = int(round(cx)); iy = int(round(cy))
        sims = []; weights = []
        for i in range(len(fx)):
            # Feature position projected into the scene.
            xs = ix + int(fx[i] - center[0])
            ys = iy + int(fy[i] - center[1])
            if not (0 <= xs < W and 0 <= ys < H):
                continue
            tx = float(gx_t[int(fy[i]), int(fx[i])])
            ty = float(gy_t[int(fy[i]), int(fx[i])])
            sx = float(gx_s[ys, xs]); sy = float(gy_s[ys, xs])
            tm = math.hypot(tx, ty); sm = math.hypot(sx, sy)
            # Skip near-zero gradients: direction is undefined/noisy.
            if tm < 1e-3 or sm < 1e-3:
                continue
            cos_sim = (tx * sx + ty * sy) / (tm * sm)
            # With polarity: opposite directions score 0; without: |cos|
            # treats reversed contrast as a match (modulo-pi edges).
            cos_sim = max(0.0, cos_sim) if self.use_polarity else abs(cos_sim)
            sims.append(cos_sim); weights.append(min(sm, 255.0))
        if not sims:
            return 0.0
        sims_arr = np.asarray(sims, dtype=np.float32)
        w_arr = np.asarray(weights, dtype=np.float32)
        # Magnitude-weighted mean of the per-feature cosine similarities.
        return float((sims_arr * w_arr).sum() / (w_arr.sum() + 1e-9))
|
||
|
||
    def _subpixel_refine_lm(
        self, scene_gray: np.ndarray, variant: _Variant,
        cx: float, cy: float, angle_deg: float,
        n_iters: int = 2,
    ) -> tuple[float, float]:
        """Iterative sub-pixel refinement via gradient-field least-squares.

        Halcon-equivalent SubPixel='least_squares_high'. Expected precision
        0.05 px (vs 0.5 px of the 2D quadratic fit). Compares template
        gradient directions against bilinearly-sampled scene gradient
        directions at the feature positions and steps (cx, cy) to reduce
        the magnitude-weighted direction residual.

        Returns the refined (cx, cy); the input pose unchanged if no
        template is trained or too few features survive extraction.
        """
        if self.template_gray is None:
            return cx, cy
        h, w = self.template_gray.shape
        scale = variant.scale
        sw = max(16, int(round(w * scale)))
        sh = max(16, int(round(h * scale)))
        gray_s = cv2.resize(self.template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
        mask_src = (
            self._train_mask if self._train_mask is not None
            else np.full_like(self.template_gray, 255)
        )
        mask_s = cv2.resize(mask_src, (sw, sh), interpolation=cv2.INTER_NEAREST)
        # Pad to the rotation-safe diagonal before warping.
        diag = int(np.ceil(np.hypot(sh, sw))) + 6
        py = (diag - sh) // 2; px = (diag - sw) // 2
        gray_p = cv2.copyMakeBorder(
            gray_s, py, diag - sh - py, px, diag - sw - px, cv2.BORDER_REPLICATE,
        )
        mask_p = cv2.copyMakeBorder(
            mask_s, py, diag - sh - py, px, diag - sw - px,
            cv2.BORDER_CONSTANT, value=0,
        )
        center = (diag / 2.0, diag / 2.0)
        M = cv2.getRotationMatrix2D(center, angle_deg, 1.0)
        gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
                                flags=cv2.INTER_LINEAR,
                                borderMode=cv2.BORDER_REPLICATE)
        mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
                                flags=cv2.INTER_NEAREST, borderValue=0)
        # Template gradients at the posed orientation.
        gx_t = cv2.Sobel(gray_r, cv2.CV_32F, 1, 0, ksize=3)
        gy_t = cv2.Sobel(gray_r, cv2.CV_32F, 0, 1, ksize=3)
        mag_t = cv2.magnitude(gx_t, gy_t)
        _, bins_t = self._gradient(gray_r)
        fx, fy, _ = self._extract_features(mag_t, bins_t, mask_r)
        if len(fx) < 4:
            return cx, cy
        n = len(fx)
        # Feature offsets relative to the template center and the unit
        # template gradient direction at each feature.
        ddx_t = (fx - center[0]).astype(np.float32)
        ddy_t = (fy - center[1]).astype(np.float32)
        gx_tf = np.array([gx_t[int(fy[i]), int(fx[i])] for i in range(n)], dtype=np.float32)
        gy_tf = np.array([gy_t[int(fy[i]), int(fx[i])] for i in range(n)], dtype=np.float32)
        mag_tf = np.hypot(gx_tf, gy_tf) + 1e-6
        nx_t = gx_tf / mag_tf
        ny_t = gy_tf / mag_tf
        gx_s = cv2.Sobel(scene_gray, cv2.CV_32F, 1, 0, ksize=3)
        gy_s = cv2.Sobel(scene_gray, cv2.CV_32F, 0, 1, ksize=3)
        H, W = scene_gray.shape
        cur_cx, cur_cy = float(cx), float(cy)
        for _ in range(n_iters):
            xs = cur_cx + ddx_t
            ys = cur_cy + ddy_t
            # Clamp so x0+1 / y0+1 below stay in bounds for bilerp.
            xs_c = np.clip(xs, 0, W - 1.001)
            ys_c = np.clip(ys, 0, H - 1.001)
            x0 = xs_c.astype(np.int32); y0 = ys_c.astype(np.int32)
            ax = xs_c - x0; ay = ys_c - y0

            def _bilin(g):
                # Bilinear sample of gradient image g at (xs_c, ys_c).
                v00 = g[y0, x0]; v10 = g[y0, x0 + 1]
                v01 = g[y0 + 1, x0]; v11 = g[y0 + 1, x0 + 1]
                return ((1 - ax) * (1 - ay) * v00
                        + ax * (1 - ay) * v10
                        + (1 - ax) * ay * v01
                        + ax * ay * v11)

            sx_v = _bilin(gx_s)
            sy_v = _bilin(gy_s)
            mag_s = np.hypot(sx_v, sy_v) + 1e-6
            nx_s = sx_v / mag_s
            ny_s = sy_v / mag_s
            # Weight residuals by scene gradient magnitude (clamped).
            w = np.minimum(mag_s, 255.0).astype(np.float32)
            err_x = (nx_s - nx_t) * w
            err_y = (ny_s - ny_t) * w
            # Weighted-mean residual drives the step, clamped to ±1 px
            # per iteration to keep the update stable.
            step_x = -float(err_x.sum()) / (w.sum() + 1e-6)
            step_y = -float(err_y.sum()) / (w.sum() + 1e-6)
            step_x = max(-1.0, min(1.0, step_x))
            step_y = max(-1.0, min(1.0, step_y))
            cur_cx += step_x
            cur_cy += step_y
            # Converged: sub-0.02 px movement on both axes.
            if abs(step_x) < 0.02 and abs(step_y) < 0.02:
                break
        return cur_cx, cur_cy
|
||
|
||
def _verify_ncc(
|
||
self, scene_gray: np.ndarray, cx: float, cy: float,
|
||
angle_deg: float, scale: float, view_idx: int = 0,
|
||
) -> float:
|
||
"""NCC tra template warpato alla pose e scena sottostante.
|
||
|
||
Lavora su un **crop locale** della scena di lato = diagonale del
|
||
template ruotato+scalato, non sull'intera scena. Su scene grandi
|
||
(1920×1080) taglia drasticamente il costo del warp per ogni match.
|
||
|
||
Ritorna score [-1, 1]. Usato come filtro anti-falso-positivo:
|
||
il matcher linemod può dare score alto su texture generiche ma
|
||
sovrapponendo il template gray i pixel non corrispondono.
|
||
"""
|
||
t, train_mask = self._get_view_template(view_idx)
|
||
if t is None:
|
||
return 1.0
|
||
h, w = t.shape
|
||
cx_t = (w - 1) / 2.0
|
||
cy_t = (h - 1) / 2.0
|
||
|
||
# Bounding box del template ruotato/scalato attorno a (cx, cy)
|
||
diag = int(np.ceil(np.hypot(w, h) * scale)) + 8
|
||
H, W = scene_gray.shape
|
||
x0 = int(round(cx)) - diag // 2
|
||
y0 = int(round(cy)) - diag // 2
|
||
cx0 = max(0, x0); cy0 = max(0, y0)
|
||
cx1 = min(W, x0 + diag); cy1 = min(H, y0 + diag)
|
||
if cx1 - cx0 < 10 or cy1 - cy0 < 10:
|
||
return 0.0
|
||
scn_crop = scene_gray[cy0:cy1, cx0:cx1]
|
||
ch, cw = scn_crop.shape
|
||
|
||
M = cv2.getRotationMatrix2D((cx_t, cy_t), angle_deg, scale)
|
||
# Porta il centro del template a (cx - cx0, cy - cy0) del crop
|
||
M[0, 2] += (cx - cx0) - cx_t
|
||
M[1, 2] += (cy - cy0) - cy_t
|
||
warped = cv2.warpAffine(
|
||
t, M, (cw, ch),
|
||
flags=cv2.INTER_LINEAR, borderValue=0,
|
||
)
|
||
if train_mask is not None:
|
||
mask_src = train_mask
|
||
else:
|
||
mask_src = np.full_like(t, 255)
|
||
mask_w = cv2.warpAffine(
|
||
mask_src, M, (cw, ch),
|
||
flags=cv2.INTER_NEAREST, borderValue=0,
|
||
)
|
||
valid = mask_w > 0
|
||
if valid.sum() < 20:
|
||
return 0.0
|
||
tpl = warped[valid].astype(np.float32)
|
||
scn = scn_crop[valid].astype(np.float32)
|
||
tm = tpl - tpl.mean()
|
||
sm = scn - scn.mean()
|
||
# Std minimo: se template o scena patch sono quasi uniformi
|
||
# (es. zona di sfondo bianco/nero), NCC e instabile e da false
|
||
# high-correlation. Halcon-style: scarta match.
|
||
tpl_var = float((tm * tm).sum())
|
||
scn_var = float((sm * sm).sum())
|
||
n_pix = float(valid.sum())
|
||
if tpl_var < 1e-3 * n_pix or scn_var < 1e-3 * n_pix:
|
||
return 0.0
|
||
denom = np.sqrt(tpl_var * scn_var) + 1e-9
|
||
return float((tm * sm).sum() / denom)
|
||
|
||
def find(
|
||
self,
|
||
scene_bgr: np.ndarray,
|
||
min_score: float = 0.6,
|
||
max_matches: int = 20,
|
||
nms_radius: int | None = None,
|
||
refine_angle: bool = True,
|
||
subpixel: bool = True,
|
||
verify_ncc: bool = True,
|
||
verify_threshold: float = 0.4,
|
||
ncc_skip_above: float = 1.01, # disabilitato di default: NCC sempre
|
||
coarse_angle_factor: int = 2,
|
||
coarse_stride: int = 1,
|
||
scale_penalty: float = 0.0,
|
||
search_roi: tuple[int, int, int, int] | None = None,
|
||
pyramid_propagate: bool = False, # off di default: meno duplicati
|
||
propagate_topk: int = 4,
|
||
refine_pose_joint: bool = False,
|
||
greediness: float = 0.0,
|
||
batch_top: bool = False,
|
||
nms_iou_threshold: float = 0.3,
|
||
min_recall: float = 0.0,
|
||
use_soft_score: bool = False,
|
||
subpixel_lm: bool = False,
|
||
) -> list[Match]:
|
||
"""
|
||
scale_penalty: se > 0, riduce lo score per match a scala diversa da 1.0:
|
||
score_final = score_shape * max(0, 1 - scale_penalty * |scale - 1|)
|
||
Utile se l'operatore vuole che match "identico al template anche per
|
||
dimensione" abbia score più alto di match "stessa forma, dimensione
|
||
diversa". scale_penalty=0 (default) = comportamento shape puro.
|
||
|
||
search_roi: (x, y, w, h) limita la ricerca a una regione della scena.
|
||
Equivalente a Halcon set_aoi: il matching opera su crop locale e le
|
||
coordinate output sono ri-traslate al sistema scena originale. Usare
|
||
quando si conosce a priori l'area in cui il pezzo può apparire (es.
|
||
feeder a posizione fissa) → costo proporzionale a w·h invece di W·H.
|
||
"""
|
||
if not self.variants:
|
||
raise RuntimeError("Matcher non addestrato: chiamare train() prima.")
|
||
|
||
gray_full = self._to_gray(scene_bgr)
|
||
# Applica ROI di ricerca: restringe scena a crop, ricorda offset per
|
||
# ri-traslare le coordinate dei match a fine pipeline.
|
||
if search_roi is not None:
|
||
rx, ry, rw, rh = search_roi
|
||
H_s, W_s = gray_full.shape
|
||
rx = max(0, int(rx)); ry = max(0, int(ry))
|
||
rw = max(1, min(int(rw), W_s - rx))
|
||
rh = max(1, min(int(rh), H_s - ry))
|
||
gray0 = gray_full[ry:ry + rh, rx:rx + rw]
|
||
roi_offset = (rx, ry)
|
||
else:
|
||
gray0 = gray_full
|
||
roi_offset = (0, 0)
|
||
|
||
# Cache pre-compute scena (II Halcon-style): hash bytes scene + param
|
||
# gradient/spread → riusa spread piramide + density tra find()
|
||
# consecutive con stessa scena (typical UI tuning: slider produce
|
||
# 10+ find() su scena identica). Risparmia ~80% del costo non-kernel.
|
||
cache_key = self._scene_cache_key(gray0)
|
||
cached = self._scene_cache_get(cache_key) if cache_key else None
|
||
if cached is not None:
|
||
grays, spread_top, bit_active_top, density_top, spread0, \
|
||
bit_active_full, density_full, top = cached
|
||
else:
|
||
grays = [gray0]
|
||
for _ in range(self.pyramid_levels - 1):
|
||
grays.append(cv2.pyrDown(grays[-1]))
|
||
top = len(grays) - 1
|
||
spread_top = self._spread_bitmap(grays[top])
|
||
bit_active_top = int(
|
||
sum(1 << b for b in range(self._n_bins)
|
||
if (spread_top & (spread_top.dtype.type(1) << b)).any())
|
||
)
|
||
density_top = _jit_popcount(spread_top)
|
||
# spread0 + density_full computati piu sotto, quindi salvo dopo.
|
||
spread0 = None
|
||
bit_active_full = None
|
||
density_full = None
|
||
if nms_radius is None:
|
||
nms_radius = max(8, min(self.template_size) // 2)
|
||
# Pruning adattivo allo step angolare: con step piccolo (<= 3 deg)
|
||
# ci sono molte varianti vicine, gli score top-level sono ravvicinati
|
||
# e top_thresh*0.5 e' troppo aggressivo: scarta varianti valide che
|
||
# sarebbero state riprese al full-res. Stessa cosa per
|
||
# coarse_angle_factor (skip 1 ogni 2): con step fine non e' utile.
|
||
# Risultato osservato: precisione "veloce" 10° dava risultati
|
||
# migliori di "preciso" 2° proprio perche evitava il pruning.
|
||
eff_step = self._effective_angle_step()
|
||
top_factor = self.top_score_factor
|
||
cf_eff = max(1, coarse_angle_factor)
|
||
if eff_step <= 3.0:
|
||
top_factor = max(top_factor, 0.7)
|
||
cf_eff = 1
|
||
top_thresh = min_score * top_factor
|
||
|
||
tw, th = self.template_size
|
||
# density_top gia' computato sopra (cache o miss)
|
||
sf_top = 2 ** top
|
||
bg_cache_top: dict[float, np.ndarray] = {}
|
||
bg_cache_full: dict[float, np.ndarray] = {}
|
||
unique_scales = sorted({var.scale for var in self.variants})
|
||
|
||
def _bg_for_scale(density: np.ndarray, scale: float,
|
||
divisor: int) -> np.ndarray:
|
||
bw = max(9, int(round(tw * scale / divisor)))
|
||
bh = max(9, int(round(th * scale / divisor)))
|
||
sm = cv2.boxFilter(density, cv2.CV_32F, (bw, bh))
|
||
return np.clip(sm / N_BINS, 0.0, 0.99)
|
||
|
||
for sc in unique_scales:
|
||
bg_cache_top[sc] = _bg_for_scale(density_top, sc, sf_top)
|
||
|
||
def _rescore(score: np.ndarray, bg: np.ndarray) -> np.ndarray:
|
||
return np.maximum(0.0, (score - bg) / (1.0 - bg + 1e-6))
|
||
|
||
# Coarse-to-fine angolare:
|
||
# 1) Raggruppa varianti per scala, ordina per angolo
|
||
# 2) Top-level: valuta solo 1 ogni coarse_angle_factor varianti
|
||
# 3) Espandi ai vicini nel full-res
|
||
variants_by_scale: dict[float, list[int]] = {}
|
||
for vi, var in enumerate(self.variants):
|
||
variants_by_scale.setdefault(var.scale, []).append(vi)
|
||
|
||
coarse_idx_list: list[int] = [] # varianti da valutare al top
|
||
neighbor_map: dict[int, list[int]] = {} # vi_coarse -> indici vicini
|
||
cf = cf_eff
|
||
for scale_key, vi_list in variants_by_scale.items():
|
||
vi_sorted = sorted(vi_list, key=lambda i: self.variants[i].angle_deg)
|
||
n = len(vi_sorted)
|
||
for i in range(0, n, cf):
|
||
vi_c = vi_sorted[i]
|
||
coarse_idx_list.append(vi_c)
|
||
# Vicini: ±cf/2 attorno a i (stessa scala)
|
||
half = cf // 2
|
||
start = max(0, i - half)
|
||
end = min(n, i + half + 1)
|
||
neighbor_map[vi_c] = vi_sorted[start:end]
|
||
|
||
# Pruning varianti via top-level (parallelizzato).
|
||
# coarse_stride > 1: 1 pixel ogni stride (~stride^2 speed-up).
|
||
# pyramid_propagate=True: top-K picchi per restringere full-res.
|
||
# greediness > 0: kernel greedy con early-exit (alternativo a rescore).
|
||
cs = max(1, int(coarse_stride))
|
||
peaks_by_vi: dict[int, list[tuple[int, int, float]]] = {}
|
||
use_greedy_top = greediness > 0.0
|
||
|
||
def _top_score(vi: int) -> tuple[int, float]:
|
||
var = self.variants[vi]
|
||
lvl = var.levels[min(top, len(var.levels) - 1)]
|
||
if use_greedy_top:
|
||
# Greedy non supporta stride né rescore bg
|
||
score = _jit_score_bitmap_greedy(
|
||
spread_top, lvl.dx, lvl.dy, lvl.bin, bit_active_top,
|
||
top_thresh, greediness,
|
||
)
|
||
else:
|
||
score = _jit_score_bitmap_rescored(
|
||
spread_top, lvl.dx, lvl.dy, lvl.bin, bit_active_top,
|
||
bg_cache_top[var.scale], stride=cs,
|
||
)
|
||
if score.size == 0:
|
||
return vi, -1.0
|
||
best = float(score.max())
|
||
if pyramid_propagate and best > 0:
|
||
flat = score.ravel()
|
||
k = min(propagate_topk, flat.size)
|
||
idx = np.argpartition(-flat, k - 1)[:k]
|
||
peaks: list[tuple[int, int, float]] = []
|
||
for i in idx:
|
||
s = float(flat[i])
|
||
if s < top_thresh * 0.7:
|
||
continue
|
||
yt, xt = int(i // score.shape[1]), int(i % score.shape[1])
|
||
peaks.append((xt, yt, s))
|
||
peaks_by_vi[vi] = peaks
|
||
return vi, best
|
||
|
||
kept_coarse: list[tuple[int, float]] = []
|
||
all_top_scores: list[tuple[int, float]] = []
|
||
# batch_top: usa kernel batch single-call con prange-esterno su
|
||
# varianti. Vince su threadpool quando n_vars >> n_threads e quando
|
||
# H*W top e' piccolo (overhead chiamate JIT > costo kernel).
|
||
if (batch_top and HAS_NUMBA and len(coarse_idx_list) > 4):
|
||
dx_l = []; dy_l = []; bn_l = []; vs_l = []
|
||
for vi in coarse_idx_list:
|
||
var = self.variants[vi]
|
||
lvl = var.levels[min(top, len(var.levels) - 1)]
|
||
dx_l.append(lvl.dx); dy_l.append(lvl.dy); bn_l.append(lvl.bin)
|
||
vs_l.append(var.scale)
|
||
scores_arr = _jit_top_max_per_variant(
|
||
spread_top, dx_l, dy_l, bn_l, bg_cache_top, vs_l,
|
||
bit_active_top,
|
||
)
|
||
for vi, best in zip(coarse_idx_list, scores_arr.tolist()):
|
||
all_top_scores.append((vi, best))
|
||
if best >= top_thresh:
|
||
kept_coarse.append((vi, best))
|
||
elif self.n_threads > 1 and len(coarse_idx_list) > 1:
|
||
with ThreadPoolExecutor(max_workers=self.n_threads) as ex:
|
||
for vi, best in ex.map(_top_score, coarse_idx_list):
|
||
all_top_scores.append((vi, best))
|
||
if best >= top_thresh:
|
||
kept_coarse.append((vi, best))
|
||
else:
|
||
for vi in coarse_idx_list:
|
||
vi2, best = _top_score(vi)
|
||
all_top_scores.append((vi2, best))
|
||
if best >= top_thresh:
|
||
kept_coarse.append((vi2, best))
|
||
|
||
# Fallback adattivo: se il rescore background ha abbattuto tutti
|
||
# gli score sotto top_thresh (scene texturate pesanti), ripesca
|
||
# le varianti migliori al top level per dare comunque una chance
|
||
# alla fase full-res invece di ritornare 0 match.
|
||
if not kept_coarse and all_top_scores:
|
||
all_top_scores.sort(key=lambda t: -t[1])
|
||
n_keep = max(4, len(all_top_scores) // 10)
|
||
# Limita a varianti con score top > 0 (non completamente a zero)
|
||
kept_coarse = [(vi, s) for vi, s in all_top_scores[:n_keep] if s > 0]
|
||
|
||
# Espandi ogni coarse promosso con i suoi vicini (stessa scala,
|
||
# angoli intermedi non valutati al top)
|
||
expanded: set[int] = set()
|
||
score_by_vi: dict[int, float] = {}
|
||
for vi_c, s_top in kept_coarse:
|
||
for vi_n in neighbor_map.get(vi_c, [vi_c]):
|
||
expanded.add(vi_n)
|
||
# Usa lo score del coarse come stima per il sort successivo
|
||
score_by_vi[vi_n] = max(score_by_vi.get(vi_n, 0.0), s_top)
|
||
kept_variants: list[tuple[int, float]] = [
|
||
(vi, score_by_vi[vi]) for vi in expanded
|
||
]
|
||
|
||
if not kept_variants:
|
||
return []
|
||
|
||
# Cap adattivo: ordina per score top-level e mantieni le più promettenti.
|
||
# Min: max_matches*8 (margine per NMS cross-variant)
|
||
# Max: 50% delle varianti totali (protegge performance con molte scale)
|
||
kept_variants.sort(key=lambda t: -t[1])
|
||
max_vars_full = max(max_matches * 8, len(self.variants) // 2)
|
||
kept_variants = kept_variants[:max_vars_full]
|
||
|
||
# Full-res (parallelizzato) con bitmap.
|
||
# Riusa cache se disponibile, altrimenti computa e salva.
|
||
if spread0 is None:
|
||
spread0 = self._spread_bitmap(gray0)
|
||
bit_active_full = int(
|
||
sum(1 << b for b in range(self._n_bins)
|
||
if (spread0 & (spread0.dtype.type(1) << b)).any())
|
||
)
|
||
density_full = _jit_popcount(spread0)
|
||
# Salva cache scena complete
|
||
if cache_key is not None:
|
||
self._scene_cache_put(cache_key, (
|
||
grays, spread_top, bit_active_top, density_top,
|
||
spread0, bit_active_full, density_full, top,
|
||
))
|
||
for sc in unique_scales:
|
||
bg_cache_full[sc] = _bg_for_scale(density_full, sc, 1)
|
||
|
||
# Margine in full-res attorno ad ogni peak top: copre incertezza
|
||
# downsampling (sf_top px) + spread_radius + slack per NMS.
|
||
propagate_margin = sf_top + self.spread_radius + max(8, nms_radius // 2)
|
||
H_full, W_full = spread0.shape
|
||
|
||
def _full_score(vi: int) -> tuple[int, np.ndarray]:
    """Full-resolution score map for variant *vi*.

    Two paths:
    - legacy: no propagated top-level peaks for this variant → the
      bitmap kernel scans the entire scene;
    - pyramid-propagated: evaluate the kernel only on local windows
      centred on the top-level peaks reprojected to full resolution,
      merging overlapping windows with a per-pixel max.

    Returns the pair ``(vi, score_map)`` so results can be collected
    out of order from the thread pool.
    """
    variant = self.variants[vi]
    base = variant.levels[0]
    peaks = peaks_by_vi.get(vi) if pyramid_propagate else None
    if not peaks:
        # Legacy path: score the whole scene in one kernel call.
        whole = _jit_score_bitmap_rescored(
            spread0, base.dx, base.dy, base.bin, bit_active_full,
            bg_cache_full[variant.scale],
        )
        return vi, whole

    # Sparse path: accumulate crop scores around each propagated peak.
    acc = np.zeros((H_full, W_full), dtype=np.float32)
    covered = np.zeros((H_full, W_full), dtype=bool)
    bg_map = bg_cache_full[variant.scale]
    for xt, yt, _s in peaks:
        # Reproject the top-level peak to full-res coordinates.
        px = xt * sf_top
        py = yt * sf_top
        x0 = max(0, px - propagate_margin)
        x1 = min(W_full, px + propagate_margin + 1)
        y0 = max(0, py - propagate_margin)
        y1 = min(H_full, py + propagate_margin + 1)
        if x1 <= x0 or y1 <= y0:
            continue
        window = (slice(y0, y1), slice(x0, x1))
        if covered[window].all():
            # A previous overlapping peak already scored this region.
            continue
        # Contiguous crops: the JIT kernel expects C-ordered arrays.
        crop_score = _jit_score_bitmap_rescored(
            np.ascontiguousarray(spread0[window]),
            base.dx, base.dy, base.bin,
            bit_active_full,
            np.ascontiguousarray(bg_map[window]),
        )
        acc[window] = np.maximum(acc[window], crop_score)
        covered[window] = True
    return vi, acc
|
||
|
||
candidates_per_var: list[tuple[int, np.ndarray]] = []
|
||
raw: list[tuple[float, int, int, int]] = []
|
||
var_indices = [vi for vi, _ in kept_variants]
|
||
if self.n_threads > 1 and len(var_indices) > 1:
|
||
with ThreadPoolExecutor(max_workers=self.n_threads) as ex:
|
||
results = list(ex.map(_full_score, var_indices))
|
||
else:
|
||
results = [_full_score(vi) for vi in var_indices]
|
||
|
||
def _scale_factor(s: float) -> float:
    """Multiplicative score penalty for a scale differing from 1.0.

    Returns 1.0 when no penalty applies (``scale_penalty`` disabled or
    the variant is at unit scale); otherwise the penalty shrinks
    linearly with distance from 1.0, floored at 0.0.
    """
    if scale_penalty <= 0.0 or s == 1.0:
        return 1.0
    return max(0.0, 1.0 - scale_penalty * abs(s - 1.0))
|
||
|
||
for vi, score in results:
|
||
pen = _scale_factor(self.variants[vi].scale)
|
||
# Ordinare/sogliare su score penalizzato: un match a scala 1.5 con
|
||
# score 0.8 e penalty=0.3 effettivamente vale 0.56, non 0.8.
|
||
score_for_sort = score if pen == 1.0 else score * pen
|
||
ys, xs = np.where(score_for_sort >= min_score)
|
||
if len(ys) == 0:
|
||
continue
|
||
vals = score_for_sort[ys, xs]
|
||
K = min(len(vals), max_matches * 5)
|
||
ord_idx = np.argpartition(-vals, K - 1)[:K]
|
||
candidates_per_var.append((vi, score)) # score_map originale
|
||
for i in ord_idx:
|
||
raw.append((float(vals[i]), int(xs[i]), int(ys[i]), vi))
|
||
|
||
raw.sort(key=lambda c: -c[0])
|
||
|
||
# Mappa vi → score_map per subpixel/refinement
|
||
score_maps = dict(candidates_per_var)
|
||
|
||
# NMS + subpixel + refinement angolare
|
||
# Usa mask salvata in train() per coerenza (se ROI poligonale).
|
||
h, w = self.template_gray.shape if self.template_gray is not None else (0, 0)
|
||
mask_full = (
|
||
self._train_mask if self._train_mask is not None
|
||
else np.full((h, w), 255, dtype=np.uint8)
|
||
)
|
||
# Plateau radius adattivo al template (evita plateau troppo ampi su
|
||
# template piccoli: 8% del lato minimo, clampato [3, 10]).
|
||
plateau_r = max(3, min(10, int(min(self.template_size) * 0.08)))
|
||
|
||
# Pre-NMS rapido su raw con coordinate intere (nms_radius ≥ 8,
|
||
# la precisione sub-pixel non cambia la decisione di reject).
|
||
# Subpixel viene calcolato DOPO il pre-NMS solo sui ~pre_cap
|
||
# preliminary sopravvissuti: prima era chiamato su ogni raw (~58k
|
||
# chiamate su clip_preciso) anche se la maggior parte veniva poi
|
||
# scartata dalla NMS, sprecando la parte più costosa del loop.
|
||
r2 = nms_radius * nms_radius
|
||
pre_cap = max(max_matches * 3, max_matches + 10)
|
||
preliminary_int: list[tuple[float, int, int, int]] = []
|
||
for score, xi, yi, vi in raw:
|
||
if any((k[1] - xi) ** 2 + (k[2] - yi) ** 2 < r2
|
||
for k in preliminary_int):
|
||
continue
|
||
preliminary_int.append((score, xi, yi, vi))
|
||
if len(preliminary_int) >= pre_cap:
|
||
break
|
||
|
||
# Subpixel + refine + verify solo sui candidati pre-NMS (max pre_cap)
|
||
kept: list[Match] = []
|
||
tw, th = self.template_size
|
||
for score, xi, yi, vi in preliminary_int:
|
||
if subpixel and vi in score_maps:
|
||
cx_f, cy_f = self._subpixel_peak(
|
||
score_maps[vi], xi, yi, plateau_radius=plateau_r,
|
||
)
|
||
else:
|
||
cx_f, cy_f = float(xi), float(yi)
|
||
var = self.variants[vi]
|
||
ang_f = var.angle_deg
|
||
score_f = score
|
||
if refine_pose_joint and self.template_gray is not None:
|
||
ang_f, score_f, cx_f, cy_f = self._refine_pose_joint(
|
||
spread0, self.template_gray, cx_f, cy_f,
|
||
var.angle_deg, var.scale, mask_full,
|
||
)
|
||
elif refine_angle and self.template_gray is not None:
|
||
ang_f, score_f, cx_f, cy_f = self._refine_angle(
|
||
spread0, bit_active_full, self.template_gray, cx_f, cy_f,
|
||
var.angle_deg, var.scale, mask_full,
|
||
search_radius=self._effective_angle_step() / 2.0,
|
||
original_score=score,
|
||
)
|
||
# Halcon SubPixel='least_squares_high': refinement iterativo
|
||
# gradient-field per precisione 0.05 px (vs 0.5 quadratic 2D).
|
||
if subpixel_lm and self.template_gray is not None:
|
||
cx_lm, cy_lm = self._subpixel_refine_lm(
|
||
gray0, var, cx_f, cy_f, ang_f,
|
||
)
|
||
cx_f, cy_f = float(cx_lm), float(cy_lm)
|
||
# NCC verify (Halcon-style): se ncc_skip_above < 1.0 salta
|
||
# il calcolo per shape-score gia alti. Default 1.01 = NCC sempre,
|
||
# piu sicuro contro falsi positivi (lo shape-score satura facile).
|
||
# Quando NCC viene calcolato, lo score finale e' la MEDIA tra
|
||
# shape-score e NCC: rende lo score piu discriminante per
|
||
# ranking/visualizzazione (uno score 1.0 vero richiede sia
|
||
# match shape sia template gray identici).
|
||
if verify_ncc and float(score_f) < ncc_skip_above:
|
||
ncc = self._verify_ncc(
|
||
gray0, cx_f, cy_f, ang_f, var.scale,
|
||
view_idx=getattr(var, "view_idx", 0),
|
||
)
|
||
if ncc < verify_threshold:
|
||
continue
|
||
score_f = (float(score_f) + max(0.0, ncc)) * 0.5
|
||
# Soft-margin gradient similarity: sostituisce o integra lo
|
||
# score con metric continua (cos sim gradients) invece di
|
||
# bin discreto. Halcon-style: piu robusto a piccole rotazioni.
|
||
if use_soft_score:
|
||
soft = self._compute_soft_score(
|
||
gray0, var, cx_f, cy_f, ang_f,
|
||
)
|
||
score_f = (float(score_f) + soft) * 0.5
|
||
# Re-check min_score sullo score finale: NCC averaging puo
|
||
# abbattere lo shape-score sotto la soglia user. Senza questo
|
||
# check apparivano match con score < min_score (UI confusing).
|
||
if float(score_f) < min_score:
|
||
continue
|
||
|
||
# Feature recall (Halcon MinScore-style): conta quante feature
|
||
# template effettivamente combaciano nello spread scena alla
|
||
# pose finale. Scarta se sotto min_recall (default 0 = off).
|
||
# Util contro match parziali ad alto NCC ma poche feature reali.
|
||
if min_recall > 0.0:
|
||
recall = self._compute_recall(
|
||
spread0, var, cx_f, cy_f, ang_f,
|
||
)
|
||
if recall < min_recall:
|
||
continue
|
||
|
||
# Ri-traslo coord da spazio crop ROI a spazio scena originale.
|
||
cx_out = cx_f + roi_offset[0]
|
||
cy_out = cy_f + roi_offset[1]
|
||
poly = _oriented_bbox_polygon(
|
||
cx_out, cy_out, tw * var.scale, th * var.scale, ang_f,
|
||
)
|
||
# Reject match con bbox che sfora pesantemente la scena:
|
||
# spesso indica match spurio (centro derivato male o scala
|
||
# incoerente). Tollera 25% out-of-bounds, sopra rigetta.
|
||
H_scn, W_scn = gray_full.shape
|
||
poly_area = float(cv2.contourArea(poly))
|
||
if poly_area > 0:
|
||
# Clip poly alla scena: intersezione con rettangolo (0,0,W,H)
|
||
scene_rect = np.array([
|
||
[0, 0], [W_scn, 0], [W_scn, H_scn], [0, H_scn],
|
||
], dtype=np.float32)
|
||
inter, _ = cv2.intersectConvexConvex(
|
||
poly.astype(np.float32), scene_rect,
|
||
)
|
||
inside_ratio = float(inter) / poly_area
|
||
if inside_ratio < 0.75:
|
||
continue
|
||
# Penalità scala opzionale: score degrada con distanza da 1.0
|
||
if scale_penalty > 0.0 and var.scale != 1.0:
|
||
score_f = float(score_f) * max(
|
||
0.0, 1.0 - scale_penalty * abs(var.scale - 1.0)
|
||
)
|
||
# NMS post-refine cross-variant: usa IoU bbox-poligonale invece
|
||
# di sola distanza centro. Due match orientati diversi ma vicini
|
||
# (pezzi adiacenti) NON vengono fusi se l'overlap reale e basso;
|
||
# due match dello stesso pezzo (centri uguali, rotazione simile)
|
||
# hanno IoU alto e vengono droppati.
|
||
# Fallback distanza centro per match con bbox degenere.
|
||
dup = False
|
||
for k in kept:
|
||
iou = _poly_iou(k.bbox_poly, poly)
|
||
if iou > nms_iou_threshold:
|
||
dup = True
|
||
break
|
||
# Sicurezza: centri molto vicini (dentro nms_radius/2)
|
||
# sempre fusi, anche con orientamenti molto diversi.
|
||
if (k.cx - cx_out) ** 2 + (k.cy - cy_out) ** 2 < (r2 / 4.0):
|
||
dup = True
|
||
break
|
||
if dup:
|
||
continue
|
||
kept.append(Match(
|
||
cx=cx_out, cy=cy_out,
|
||
angle_deg=ang_f,
|
||
scale=var.scale,
|
||
score=score_f,
|
||
bbox_poly=poly,
|
||
))
|
||
if len(kept) >= max_matches:
|
||
break
|
||
return kept
|