perf: piramide al training, refinement sub-step, multithreading

LineShapeMatcher:
- Feature piramidate precomputate al training (_LevelFeatures per livello
  piramide, dedup risolto una volta)
- Refinement angolare: 5 offset ±step/2 + parabolic fit → precisione ~0.5°
  con angle_step=5° (10x fine rispetto a step training)
- Subpixel posizione: parabolic fit 2D sul picco → frazione pixel
- Multithreading: n_threads auto=CPU-1, parallelizza top-level pruning e
  full-res matching tramite ThreadPoolExecutor (numpy/cv2 rilasciano GIL)

GUI:
- Dialog edit_params con bottone Auto-tune
- Legenda numerata match con pallino colore (#i, coords, angle, scala, score)
- Hotkey finestra: r=params, o=nuovo ROI, m=nuovo modello, s=nuova scena
- Pannello con train/find time + HOTKEY in basso

auto_tune.py:
- Analisi template: soglie grad da percentili, num_features da densità
  edge, pyramid_levels da min_side, min_score da entropia orientation,
  rilevazione simmetria rotazionale (soglia 0.75 NCC su magnitude)

Benchmark clip.png (13 istanze, 72 varianti angolari):
  prima: 5.84s, precisione 5° (step training)
  ora:   1.67s, precisione ~0.5°, subpixel posizione
  speed-up: 3.5x, precisione angolare 10x

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-24 01:22:56 +02:00
parent b9a4d51fac
commit 075b014bd7
4 changed files with 918 additions and 105 deletions
+275 -55
View File
@@ -26,6 +26,8 @@ della ROI (modello non-rettangolare).
from __future__ import annotations
import os
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
import cv2
@@ -34,6 +36,29 @@ import numpy as np
N_BINS = 8 # orientamenti quantizzati modulo π
def _oriented_bbox_polygon(
cx: float, cy: float, w: float, h: float, angle_deg: float,
) -> np.ndarray:
"""Ritorna 4 vertici (float32, shape (4,2)) del bbox orientato.
Convenzione coerente con cv2.getRotationMatrix2D usato nel train:
rotazione counter-clockwise (matematica) ma sistema immagine y-down,
quindi visivamente orario.
"""
w2, h2 = w / 2.0, h / 2.0
# Vertici template non-ruotato centrati al centro
corners = np.array([[-w2, -h2], [w2, -h2], [w2, h2], [-w2, h2]], np.float32)
a = np.deg2rad(angle_deg)
c, s = np.cos(a), np.sin(a)
# cv2.getRotationMatrix2D con angolo a positivo applica R = [[c,s],[-s,c]]
# (ruota counter-clockwise nel sistema matematico; y-down → orario)
R = np.array([[c, s], [-s, c]], np.float32)
rotated = corners @ R.T
rotated[:, 0] += cx
rotated[:, 1] += cy
return rotated
@dataclass
class Match:
cx: float
@@ -41,7 +66,16 @@ class Match:
angle_deg: float
scale: float
score: float
bbox: tuple[int, int, int, int]
bbox_poly: np.ndarray # (4, 2) float32 - 4 vertici ordinati (ruotato)
@dataclass
class _LevelFeatures:
"""Feature piramidate (livello l = downsample /2^l)."""
dx: np.ndarray # int32
dy: np.ndarray # int32
bin: np.ndarray # int8
n: int
@dataclass
@@ -49,16 +83,13 @@ class _Variant:
"""Template precomputato (una pose)."""
angle_deg: float
scale: float
# Feature come 3 array paralleli (dx, dy, bin) relativi al centro-modello
dx: np.ndarray # int32, shape (N,)
dy: np.ndarray # int32, shape (N,)
bin: np.ndarray # int8, shape (N,)
# Feature piramide: levels[0] = full-res, levels[l] = /2^l
levels: list[_LevelFeatures]
# Bbox kernel (per visualizzazione / limiti ricerca)
kh: int
kw: int
cx_local: float # centro-modello dentro al bbox kernel (solo per bbox visivo)
cx_local: float # centro-modello dentro al bbox kernel
cy_local: float
n_features: int
class LineShapeMatcher:
@@ -77,6 +108,7 @@ class LineShapeMatcher:
min_feature_spacing: int = 3,
pyramid_levels: int = 2,
top_score_factor: float = 0.5,
n_threads: int | None = None,
) -> None:
self.num_features = num_features
self.weak_grad = weak_grad
@@ -89,9 +121,11 @@ class LineShapeMatcher:
self.min_feature_spacing = min_feature_spacing
self.pyramid_levels = max(1, pyramid_levels)
self.top_score_factor = top_score_factor
self.n_threads = n_threads or max(1, (os.cpu_count() or 2) - 1)
self.variants: list[_Variant] = []
self.template_size: tuple[int, int] = (0, 0)
self.template_gray: np.ndarray | None = None
# --- Helpers -------------------------------------------------------
@@ -159,15 +193,32 @@ class LineShapeMatcher:
# --- Training ------------------------------------------------------
def train(self, template_bgr: np.ndarray, mask: np.ndarray | None = None) -> int:
"""Genera varianti rotate+scalate con feature sparse.
def _build_pyramid_features(
self, dx: np.ndarray, dy: np.ndarray, bin_: np.ndarray,
) -> list[_LevelFeatures]:
"""Piramide feature precomputata: livello l = /2^l con dedup."""
levels = [_LevelFeatures(dx=dx.copy(), dy=dy.copy(), bin=bin_.copy(),
n=len(dx))]
for lvl in range(1, self.pyramid_levels):
sf = 2 ** lvl
dx_l = (dx // sf).astype(np.int32)
dy_l = (dy // sf).astype(np.int32)
# Dedup: rimuove feature collassate sullo stesso (dx, dy, bin)
key = ((dx_l.astype(np.int64) << 24)
| (dy_l.astype(np.int64) << 8)
| bin_.astype(np.int64))
_, uniq = np.unique(key, return_index=True)
levels.append(_LevelFeatures(
dx=dx_l[uniq], dy=dy_l[uniq], bin=bin_[uniq], n=len(uniq),
))
return levels
mask: maschera binaria opzionale (stessa shape del template) per
limitare il modello a una regione non rettangolare.
"""
def train(self, template_bgr: np.ndarray, mask: np.ndarray | None = None) -> int:
"""Genera varianti rotate+scalate con feature sparse + piramide."""
gray = self._to_gray(template_bgr)
h, w = gray.shape
self.template_size = (w, h)
self.template_gray = gray.copy()
if mask is None:
mask_full = np.full((h, w), 255, dtype=np.uint8)
else:
@@ -207,27 +258,26 @@ class LineShapeMatcher:
if len(fx) < 8:
continue
# Feature relative al centro-modello (centro rotazione)
cx_c = diag / 2.0
cy_c = diag / 2.0
dx = (fx - cx_c).astype(np.int32)
dy = (fy - cy_c).astype(np.int32)
# Dimensione bbox per visualizzazione
x0 = int(dx.min()); x1 = int(dx.max())
y0 = int(dy.min()); y1 = int(dy.max())
kw = x1 - x0 + 1
kh = y1 - y0 + 1
cx_local = -x0 # posizione centro dentro al bbox
cx_local = -x0
cy_local = -y0
levels = self._build_pyramid_features(dx, dy, fb)
self.variants.append(_Variant(
angle_deg=float(ang),
scale=float(s),
dx=dx, dy=dy, bin=fb,
levels=levels,
kh=kh, kw=kw,
cx_local=float(cx_local), cy_local=float(cy_local),
n_features=len(fx),
))
return len(self.variants)
@@ -249,16 +299,21 @@ class LineShapeMatcher:
@staticmethod
def _score_by_shift(
resp: np.ndarray, dx: np.ndarray, dy: np.ndarray, bins: np.ndarray,
bin_has_data: np.ndarray | None = None,
) -> np.ndarray:
"""score[y,x] = Σ_i resp[bin_i][y+dy_i, x+dx_i] / len(dx).
Implementazione vettorizzata con slicing.
Ottimizzazione: se `bin_has_data` è fornito, skippa feature il cui
bin non ha pixel attivi nella scena (contribuzione = 0).
"""
_, H, W = resp.shape
acc = np.zeros((H, W), dtype=np.float32)
for i in range(len(dx)):
ddx = int(dx[i]); ddy = int(dy[i]); b = int(bins[i])
# dst[y, x] += resp[b][y+ddy, x+ddx]
n = len(dx)
for i in range(n):
b = int(bins[i])
if bin_has_data is not None and not bin_has_data[b]:
continue
ddx = int(dx[i]); ddy = int(dy[i])
y0s = max(0, -ddy); y1s = min(H, H - ddy)
x0s = max(0, -ddx); x1s = min(W, W - ddx)
if y0s >= y1s or x0s >= x1s:
@@ -266,16 +321,133 @@ class LineShapeMatcher:
y0r = y0s + ddy; y1r = y1s + ddy
x0r = x0s + ddx; x1r = x1s + ddx
acc[y0s:y1s, x0s:x1s] += resp[b, y0r:y1r, x0r:x1r]
if len(dx) > 0:
acc /= len(dx)
if n > 0:
acc /= n
return acc
@staticmethod
def _subpixel_peak(acc: np.ndarray, x: int, y: int) -> tuple[float, float]:
"""Fit parabolico 2D attorno al picco per offset subpixel (±0.5 px)."""
H, W = acc.shape
if x <= 0 or x >= W - 1 or y <= 0 or y >= H - 1:
return float(x), float(y)
c = acc[y, x]
dx2 = acc[y, x + 1] - 2 * c + acc[y, x - 1]
dy2 = acc[y + 1, x] - 2 * c + acc[y - 1, x]
dx1 = (acc[y, x + 1] - acc[y, x - 1]) / 2.0
dy1 = (acc[y + 1, x] - acc[y - 1, x]) / 2.0
ox = -dx1 / dx2 if abs(dx2) > 1e-6 else 0.0
oy = -dy1 / dy2 if abs(dy2) > 1e-6 else 0.0
ox = float(np.clip(ox, -0.5, 0.5))
oy = float(np.clip(oy, -0.5, 0.5))
return x + ox, y + oy
def _refine_angle(
self,
resp0: np.ndarray,
template_gray: np.ndarray,
cx: float, cy: float,
angle_deg: float, scale: float,
mask_full: np.ndarray,
angle_fine_step: float = 0.5,
search_radius: float | None = None,
) -> tuple[float, float, float, float]:
"""Ricerca angolare fine (sub-step) attorno al match grezzo.
Genera 5 template temporanei a angle ± {0.5, 1.0} * step e sceglie
l'angolo con score massimo (parabolic fit sulle 3 score centrali).
Ritorna (angle_refined, score, cx_refined, cy_refined).
"""
if search_radius is None:
search_radius = self.angle_step_deg / 2.0
offsets = np.linspace(-search_radius, search_radius, 5)
best = (angle_deg, -1.0, cx, cy)
scores_by_off: dict[float, float] = {}
h, w = template_gray.shape
sw = max(16, int(round(w * scale)))
sh = max(16, int(round(h * scale)))
gray_s = cv2.resize(template_gray, (sw, sh), interpolation=cv2.INTER_LINEAR)
mask_s = cv2.resize(mask_full, (sw, sh), interpolation=cv2.INTER_NEAREST)
diag = int(np.ceil(np.hypot(sh, sw))) + 6
py = (diag - sh) // 2; px = (diag - sw) // 2
gray_p = cv2.copyMakeBorder(gray_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_REPLICATE)
mask_p = cv2.copyMakeBorder(mask_s, py, diag - sh - py, px, diag - sw - px,
cv2.BORDER_CONSTANT, value=0)
center = (diag / 2.0, diag / 2.0)
H, W = resp0.shape[1], resp0.shape[2]
# Ricerca locale posizione con margine ±2 px sulla (cx, cy)
margin = 3
for off in offsets:
ang = angle_deg + off
M = cv2.getRotationMatrix2D(center, ang, 1.0)
gray_r = cv2.warpAffine(gray_p, M, (diag, diag),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_REPLICATE)
mask_r = cv2.warpAffine(mask_p, M, (diag, diag),
flags=cv2.INTER_NEAREST, borderValue=0)
mag, bins = self._gradient(gray_r)
fx, fy, fb = self._extract_features(mag, bins, mask_r)
if len(fx) < 8:
scores_by_off[float(off)] = 0.0
continue
dx = (fx - center[0]).astype(np.int32)
dy = (fy - center[1]).astype(np.int32)
# Finestra locale ±margin attorno a (cx, cy) via slicing vettorizzato
y_lo = int(cy) - margin; y_hi = int(cy) + margin + 1
x_lo = int(cx) - margin; x_hi = int(cx) + margin + 1
sh = y_hi - y_lo; sw = x_hi - x_lo
acc = np.zeros((sh, sw), dtype=np.float32)
for i in range(len(dx)):
ddx = int(dx[i]); ddy = int(dy[i]); b = int(fb[i])
sy0 = y_lo + ddy; sy1 = y_hi + ddy
sx0 = x_lo + ddx; sx1 = x_hi + ddx
a_y0 = max(0, -sy0); a_y1 = sh - max(0, sy1 - H)
a_x0 = max(0, -sx0); a_x1 = sw - max(0, sx1 - W)
s_y0 = max(0, sy0); s_y1 = min(H, sy1)
s_x0 = max(0, sx0); s_x1 = min(W, sx1)
if s_y1 > s_y0 and s_x1 > s_x0:
acc[a_y0:a_y1, a_x0:a_x1] += resp0[b, s_y0:s_y1, s_x0:s_x1]
acc /= len(dx)
_, max_val, _, max_loc = cv2.minMaxLoc(acc)
scores_by_off[float(off)] = float(max_val)
if max_val > best[1]:
new_cx = x_lo + float(max_loc[0])
new_cy = y_lo + float(max_loc[1])
best = (ang, float(max_val), new_cx, new_cy)
# Parabolic fit su 3 angoli attorno al massimo
sorted_offs = sorted(scores_by_off.keys())
best_off = best[0] - angle_deg
try:
i = sorted_offs.index(
min(sorted_offs, key=lambda x: abs(x - best_off))
)
if 0 < i < len(sorted_offs) - 1:
s0 = scores_by_off[sorted_offs[i - 1]]
s1 = scores_by_off[sorted_offs[i]]
s2 = scores_by_off[sorted_offs[i + 1]]
denom = (s0 - 2 * s1 + s2)
if abs(denom) > 1e-6:
delta = 0.5 * (s0 - s2) / denom
step = sorted_offs[i + 1] - sorted_offs[i]
refined_off = sorted_offs[i] + delta * step
return (angle_deg + refined_off, best[1], best[2], best[3])
except ValueError:
pass
return best
def find(
self,
scene_bgr: np.ndarray,
min_score: float = 0.6,
max_matches: int = 20,
nms_radius: int | None = None,
refine_angle: bool = True,
subpixel: bool = True,
) -> list[Match]:
if not self.variants:
raise RuntimeError("Matcher non addestrato: chiamare train() prima.")
@@ -285,66 +457,114 @@ class LineShapeMatcher:
for _ in range(self.pyramid_levels - 1):
grays.append(cv2.pyrDown(grays[-1]))
top = len(grays) - 1
sf = 2 ** top
# Response map top-level (usata SOLO per pruning varianti)
# Response map top-level
resp_top = self._response_map(grays[top])
bin_has_top = np.array([resp_top[b].any() for b in range(N_BINS)])
if nms_radius is None:
nms_radius = max(8, min(self.template_size) // 2)
top_thresh = min_score * self.top_score_factor
# Pruning varianti via top-level
kept_variants: list[int] = []
for vi, var in enumerate(self.variants):
dx_t = (var.dx // sf).astype(np.int32)
dy_t = (var.dy // sf).astype(np.int32)
key = ((dx_t.astype(np.int64) << 24)
| (dy_t.astype(np.int64) << 8)
| var.bin.astype(np.int64))
_, uniq_idx = np.unique(key, return_index=True)
# Pruning varianti via top-level (parallelizzato)
def _top_score(vi: int) -> tuple[int, float]:
var = self.variants[vi]
lvl = var.levels[min(top, len(var.levels) - 1)]
score = self._score_by_shift(
resp_top, dx_t[uniq_idx], dy_t[uniq_idx], var.bin[uniq_idx],
resp_top, lvl.dx, lvl.dy, lvl.bin, bin_has_data=bin_has_top,
)
if score.size and score.max() >= top_thresh:
kept_variants.append(vi)
return vi, float(score.max()) if score.size else -1.0
kept_variants: list[tuple[int, float]] = []
if self.n_threads > 1:
with ThreadPoolExecutor(max_workers=self.n_threads) as ex:
for vi, best in ex.map(_top_score, range(len(self.variants))):
if best >= top_thresh:
kept_variants.append((vi, best))
else:
for vi in range(len(self.variants)):
vi2, best = _top_score(vi)
if best >= top_thresh:
kept_variants.append((vi2, best))
if not kept_variants:
return []
# Full-res: score_by_shift solo per le varianti sopravvissute
max_vars_full = max(8, max_matches * 4)
kept_variants.sort(key=lambda t: -t[1])
kept_variants = kept_variants[:max_vars_full]
# Full-res (parallelizzato per variante)
resp0 = self._response_map(gray0)
refined: list[tuple[float, float, float, int]] = []
for vi in kept_variants:
bin_has_full = np.array([resp0[b].any() for b in range(N_BINS)])
def _full_score(vi: int) -> tuple[int, np.ndarray]:
var = self.variants[vi]
score = self._score_by_shift(resp0, var.dx, var.dy, var.bin)
# Picchi sopra soglia
lvl0 = var.levels[0]
score = self._score_by_shift(
resp0, lvl0.dx, lvl0.dy, lvl0.bin, bin_has_data=bin_has_full,
)
return vi, score
candidates_per_var: list[tuple[int, np.ndarray]] = []
raw: list[tuple[float, int, int, int]] = []
var_indices = [vi for vi, _ in kept_variants]
if self.n_threads > 1 and len(var_indices) > 1:
with ThreadPoolExecutor(max_workers=self.n_threads) as ex:
results = list(ex.map(_full_score, var_indices))
else:
results = [_full_score(vi) for vi in var_indices]
for vi, score in results:
ys, xs = np.where(score >= min_score)
if len(ys) == 0:
continue
vals = score[ys, xs]
# Ordine decrescente (solo i top-K per evitare liste enormi)
K = min(len(vals), max_matches * 5)
ord_idx = np.argpartition(-vals, K - 1)[:K]
candidates_per_var.append((vi, score))
for i in ord_idx:
refined.append((float(vals[i]),
float(xs[i]), float(ys[i]), vi))
raw.append((float(vals[i]), int(xs[i]), int(ys[i]), vi))
refined.sort(key=lambda c: -c[0])
raw.sort(key=lambda c: -c[0])
# Mappa vi → score_map per subpixel/refinement
score_maps = dict(candidates_per_var)
# NMS + subpixel + refinement angolare
# Mask template per refinement (non disponibile qui: usa full)
h, w = self.template_gray.shape if self.template_gray is not None else (0, 0)
mask_full = np.full((h, w), 255, dtype=np.uint8)
kept: list[Match] = []
r2 = nms_radius * nms_radius
for score, cx, cy, vi in refined:
if any((k.cx - cx) ** 2 + (k.cy - cy) ** 2 < r2 for k in kept):
continue
tw, th = self.template_size
for score, xi, yi, vi in raw:
var = self.variants[vi]
bx = int(round(cx - var.cx_local))
by = int(round(cy - var.cy_local))
cx_f = float(xi); cy_f = float(yi)
if subpixel and vi in score_maps:
cx_f, cy_f = self._subpixel_peak(score_maps[vi], xi, yi)
if any((k.cx - cx_f) ** 2 + (k.cy - cy_f) ** 2 < r2 for k in kept):
continue
ang_f = var.angle_deg
score_f = score
if refine_angle and self.template_gray is not None:
ang_f, score_f, cx_f, cy_f = self._refine_angle(
resp0, self.template_gray, cx_f, cy_f,
var.angle_deg, var.scale, mask_full,
search_radius=self.angle_step_deg / 2.0,
)
poly = _oriented_bbox_polygon(
cx_f, cy_f, tw * var.scale, th * var.scale, ang_f,
)
kept.append(Match(
cx=cx, cy=cy,
angle_deg=var.angle_deg,
cx=cx_f, cy=cy_f,
angle_deg=ang_f,
scale=var.scale,
score=score,
bbox=(bx, by, var.kw, var.kh),
score=score_f,
bbox_poly=poly,
))
if len(kept) >= max_matches:
break