Compare commits

..

1 Commits

Author SHA1 Message Date
Adriano 1cc7881a51 feat: pm2d.eval - validation harness CLI per LineShapeMatcher
Tool da CLI per misurare oggettivamente la qualita' del matcher
su dataset etichettato. Halcon ha questo solo nell'IDE (HDevelop),
qui esposto come modulo Python testabile in CI.

Format dataset JSON:
  - template + mask
  - params init matcher (override)
  - find_params (override per find())
  - scenes con ground_truth: lista pose attese (cx, cy, angle, scale,
    tolerance_px, tolerance_deg)

Metriche per scena: TP/FP/FN, precision, recall, IoU medio bbox,
tempo find. Aggregato: precision globale, recall, F1.

Match-to-GT criterio: distanza centro <= tolerance_px AND
|angle| <= tolerance_deg, oppure IoU bbox >= 0.3.

Use case:
- regressione: confronto config A vs B oggettivo
- tuning: trovare param ottimi via grid-search guidato da F1
- validazione pre-deploy: report TP/FP/FN su dataset prod

Esposto come entry-point pm2d-eval (pyproject.toml).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 10:09:45 +02:00
3 changed files with 220 additions and 69 deletions
+217
View File
@@ -0,0 +1,217 @@
"""CLI validation harness per LineShapeMatcher.
Usage:
python -m pm2d.eval dataset.json [opzioni]
Formato dataset (JSON):
{
"template": "path/to/template.png",
"mask": "path/to/mask.png", # opzionale
"params": { # opzionali, override su matcher init
"use_polarity": true,
"angle_step_deg": 5,
...
},
"find_params": { # opzionali, passati a find()
"min_score": 0.6,
"use_soft_score": true,
...
},
"scenes": [
{
"image": "path/to/scene1.png",
"ground_truth": [
{"cx": 320.0, "cy": 240.0, "angle_deg": 12.0,
"scale": 1.0, "tolerance_px": 5.0,
"tolerance_deg": 3.0}
]
}
]
}
Output: report precision/recall/IoU/timing per ogni scena + aggregati.
"""
from __future__ import annotations
import argparse
import json
import math
import sys
import time
from pathlib import Path
import cv2
import numpy as np
from pm2d.line_matcher import LineShapeMatcher, _poly_iou, _oriented_bbox_polygon
def _load_image(path: str | Path) -> np.ndarray:
img = cv2.imread(str(path), cv2.IMREAD_UNCHANGED)
if img is None:
raise FileNotFoundError(f"Immagine non trovata: {path}")
if img.ndim == 2:
img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
return img
def _gt_to_poly(gt: dict, tw: int, th: int) -> np.ndarray:
"""Costruisce bbox poligonale per un ground truth."""
s = float(gt.get("scale", 1.0))
return _oriented_bbox_polygon(
float(gt["cx"]), float(gt["cy"]),
tw * s, th * s, float(gt["angle_deg"]),
)
def _match_to_gt(match, gt: dict, tw: int, th: int,
iou_thr: float = 0.3) -> bool:
"""True se il match corrisponde al ground truth.
Criterio: distanza centro <= tolerance_px AND |angle_deg - gt| <= tolerance_deg
OR IoU bbox >= iou_thr (fallback per pose con tolerance ampie).
"""
tol_px = float(gt.get("tolerance_px", 5.0))
tol_deg = float(gt.get("tolerance_deg", 3.0))
dx = match.cx - float(gt["cx"])
dy = match.cy - float(gt["cy"])
dist = math.hypot(dx, dy)
da = abs((match.angle_deg - float(gt["angle_deg"]) + 180) % 360 - 180)
if dist <= tol_px and da <= tol_deg:
return True
# Fallback IoU
poly_gt = _gt_to_poly(gt, tw, th)
poly_m = match.bbox_poly
if _poly_iou(poly_m, poly_gt) >= iou_thr:
return True
return False
def evaluate_scene(matcher: LineShapeMatcher, scene_bgr: np.ndarray,
gt_list: list[dict], find_params: dict,
tw: int, th: int) -> dict:
"""Esegue match e calcola TP/FP/FN per una scena."""
t0 = time.time()
matches = matcher.find(scene_bgr, **find_params)
elapsed = time.time() - t0
gt_matched = [False] * len(gt_list)
match_is_tp = [False] * len(matches)
iou_per_match = [0.0] * len(matches)
for i, m in enumerate(matches):
for j, gt in enumerate(gt_list):
if gt_matched[j]:
continue
if _match_to_gt(m, gt, tw, th):
gt_matched[j] = True
match_is_tp[i] = True
# Calcolo IoU per metrica
poly_gt = _gt_to_poly(gt, tw, th)
iou_per_match[i] = _poly_iou(m.bbox_poly, poly_gt)
break
tp = sum(match_is_tp)
fp = len(matches) - tp
fn = len(gt_list) - sum(gt_matched)
return {
"n_matches": len(matches),
"n_gt": len(gt_list),
"tp": tp, "fp": fp, "fn": fn,
"find_time_s": elapsed,
"iou_mean": float(np.mean([i for i, t in zip(iou_per_match, match_is_tp) if t])
if tp > 0 else 0.0),
"diag": (matcher.get_last_diag()
if hasattr(matcher, "get_last_diag") else None),
}
def run(dataset_path: str, scene_filter: str | None = None,
verbose: bool = False) -> dict:
"""Esegue eval su dataset, ritorna report aggregato."""
dataset_path = Path(dataset_path)
base = dataset_path.parent
with open(dataset_path) as f:
ds = json.load(f)
template = _load_image(base / ds["template"])
mask = None
if ds.get("mask"):
mask_img = cv2.imread(str(base / ds["mask"]), cv2.IMREAD_GRAYSCALE)
if mask_img is not None:
mask = (mask_img > 128).astype(np.uint8) * 255
init_params = ds.get("params", {})
find_params = ds.get("find_params", {})
matcher = LineShapeMatcher(**init_params)
n_var = matcher.train(template, mask=mask)
tw, th = matcher.template_size
print(f"Template: {ds['template']} ({tw}x{th}), {n_var} varianti")
print(f"Param matcher: {init_params}")
print(f"Param find: {find_params}")
print()
scenes = ds["scenes"]
if scene_filter:
scenes = [s for s in scenes if scene_filter in s["image"]]
rows = []
tot_tp = tot_fp = tot_fn = 0
tot_time = 0.0
for sc in scenes:
scene = _load_image(base / sc["image"])
gt = sc.get("ground_truth", [])
result = evaluate_scene(matcher, scene, gt, find_params, tw, th)
rows.append({"scene": sc["image"], **result})
tot_tp += result["tp"]; tot_fp += result["fp"]; tot_fn += result["fn"]
tot_time += result["find_time_s"]
prec = result["tp"] / max(1, result["tp"] + result["fp"])
rec = result["tp"] / max(1, result["tp"] + result["fn"])
line = (f" {sc['image']:30s} "
f"TP={result['tp']} FP={result['fp']} FN={result['fn']} "
f"P={prec:.2f} R={rec:.2f} "
f"IoU={result['iou_mean']:.2f} "
f"t={result['find_time_s']*1000:.0f}ms")
print(line)
if verbose and result["diag"] and hasattr(matcher, "_format_diag"):
print(f" diag: {matcher._format_diag(result['diag'])}")
# Aggregati
precision = tot_tp / max(1, tot_tp + tot_fp)
recall = tot_tp / max(1, tot_tp + tot_fn)
f1 = 2 * precision * recall / max(1e-9, precision + recall)
print()
print(f"AGGREGATO: precision={precision:.3f} recall={recall:.3f} "
f"F1={f1:.3f} TP={tot_tp} FP={tot_fp} FN={tot_fn}")
print(f"TIME: total={tot_time:.2f}s avg={tot_time / max(1, len(scenes)) * 1000:.0f}ms/scene")
return {
"precision": precision, "recall": recall, "f1": f1,
"tp": tot_tp, "fp": tot_fp, "fn": tot_fn,
"total_time_s": tot_time, "n_scenes": len(scenes),
"per_scene": rows,
}
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser(
description="pm2d-eval: validation harness per LineShapeMatcher"
)
p.add_argument("dataset", help="JSON dataset (template + scenes + GT)")
p.add_argument("--scene-filter", default=None,
help="Filtro substring sui nomi scena (debug)")
p.add_argument("--verbose", "-v", action="store_true",
help="Stampa diag dict per ogni scena")
p.add_argument("--out", default=None,
help="Salva report JSON su file")
args = p.parse_args(argv)
report = run(args.dataset, scene_filter=args.scene_filter,
verbose=args.verbose)
if args.out:
with open(args.out, "w") as f:
json.dump(report, f, indent=2)
print(f"Report salvato: {args.out}")
return 0 if report["f1"] > 0.5 else 1
if __name__ == "__main__":
sys.exit(main())
-69
View File
@@ -1309,7 +1309,6 @@ class LineShapeMatcher:
min_recall: float = 0.0,
use_soft_score: bool = False,
subpixel_lm: bool = False,
debug: bool = False,
) -> list[Match]:
"""
scale_penalty: se > 0, riduce lo score per match a scala diversa da 1.0:
@@ -1327,32 +1326,6 @@ class LineShapeMatcher:
if not self.variants:
raise RuntimeError("Matcher non addestrato: chiamare train() prima.")
# Diagnostic counter: traccia perche' candidati sono droppati lungo
# la pipeline. Esposto via get_last_diag() o ritornato implicitamente
# se debug=True (vedi sotto).
diag = {
"n_variants_total": len(self.variants),
"n_variants_top_evaluated": 0,
"n_variants_top_passed": 0,
"n_variants_full_evaluated": 0,
"n_raw_candidates": 0,
"n_after_pre_nms": 0,
"drop_ncc_low": 0,
"drop_min_score_post_avg": 0,
"drop_recall_low": 0,
"drop_bbox_out_of_scene": 0,
"drop_nms_iou": 0,
"n_final": 0,
"top_thresh_used": 0.0,
"verify_threshold_used": float(verify_threshold),
"min_score_used": float(min_score),
"min_recall_used": float(min_recall),
"use_polarity": bool(self.use_polarity),
"use_soft_score": bool(use_soft_score),
"subpixel_lm": bool(subpixel_lm),
}
self._last_diag = diag
gray_full = self._to_gray(scene_bgr)
# Applica ROI di ricerca: restringe scena a crop, ricorda offset per
# ri-traslare le coordinate dei match a fine pipeline.
@@ -1395,7 +1368,6 @@ class LineShapeMatcher:
top_factor = max(top_factor, 0.7)
cf_eff = 1
top_thresh = min_score * top_factor
diag["top_thresh_used"] = float(top_thresh)
tw, th = self.template_size
density_top = _jit_popcount(spread_top)
@@ -1481,7 +1453,6 @@ class LineShapeMatcher:
kept_coarse: list[tuple[int, float]] = []
all_top_scores: list[tuple[int, float]] = []
diag["n_variants_top_evaluated"] = len(coarse_idx_list)
# batch_top: usa kernel batch single-call con prange-esterno su
# varianti. Vince su threadpool quando n_vars >> n_threads e quando
# H*W top e' piccolo (overhead chiamate JIT > costo kernel).
@@ -1545,8 +1516,6 @@ class LineShapeMatcher:
kept_variants.sort(key=lambda t: -t[1])
max_vars_full = max(max_matches * 8, len(self.variants) // 2)
kept_variants = kept_variants[:max_vars_full]
diag["n_variants_top_passed"] = len(kept_coarse)
diag["n_variants_full_evaluated"] = len(kept_variants)
# Full-res (parallelizzato) con bitmap
spread0 = self._spread_bitmap(gray0)
@@ -1632,7 +1601,6 @@ class LineShapeMatcher:
raw.append((float(vals[i]), int(xs[i]), int(ys[i]), vi))
raw.sort(key=lambda c: -c[0])
diag["n_raw_candidates"] = len(raw)
# Mappa vi → score_map per subpixel/refinement
score_maps = dict(candidates_per_var)
@@ -1664,7 +1632,6 @@ class LineShapeMatcher:
preliminary_int.append((score, xi, yi, vi))
if len(preliminary_int) >= pre_cap:
break
diag["n_after_pre_nms"] = len(preliminary_int)
# Subpixel + refine + verify solo sui candidati pre-NMS (max pre_cap)
kept: list[Match] = []
@@ -1711,7 +1678,6 @@ class LineShapeMatcher:
view_idx=getattr(var, "view_idx", 0),
)
if ncc < verify_threshold:
diag["drop_ncc_low"] += 1
continue
score_f = (float(score_f) + max(0.0, ncc)) * 0.5
# Soft-margin gradient similarity: sostituisce o integra lo
@@ -1726,7 +1692,6 @@ class LineShapeMatcher:
# abbattere lo shape-score sotto la soglia user. Senza questo
# check apparivano match con score < min_score (UI confusing).
if float(score_f) < min_score:
diag["drop_min_score_post_avg"] += 1
continue
# Feature recall (Halcon MinScore-style): conta quante feature
@@ -1738,7 +1703,6 @@ class LineShapeMatcher:
spread0, var, cx_f, cy_f, ang_f,
)
if recall < min_recall:
diag["drop_recall_low"] += 1
continue
# Ri-traslo coord da spazio crop ROI a spazio scena originale.
@@ -1762,7 +1726,6 @@ class LineShapeMatcher:
)
inside_ratio = float(inter) / poly_area
if inside_ratio < 0.75:
diag["drop_bbox_out_of_scene"] += 1
continue
# Penalità scala opzionale: score degrada con distanza da 1.0
if scale_penalty > 0.0 and var.scale != 1.0:
@@ -1787,7 +1750,6 @@ class LineShapeMatcher:
dup = True
break
if dup:
diag["drop_nms_iou"] += 1
continue
kept.append(Match(
cx=cx_out, cy=cy_out,
@@ -1798,35 +1760,4 @@ class LineShapeMatcher:
))
if len(kept) >= max_matches:
break
diag["n_final"] = len(kept)
if debug:
# Debug mode: stampa diagnostica su stderr per visibilita' immediata.
import sys as _sys
_sys.stderr.write(f"[pm2d.find debug] {self._format_diag(diag)}\n")
return kept
def _format_diag(self, diag: dict) -> str:
"""Formatta dict diagnostica in una linea leggibile."""
return (
f"vars: {diag['n_variants_total']} -> "
f"top_eval={diag['n_variants_top_evaluated']} "
f"top_pass={diag['n_variants_top_passed']} "
f"full_eval={diag['n_variants_full_evaluated']} | "
f"raw={diag['n_raw_candidates']} "
f"pre_nms={diag['n_after_pre_nms']} -> "
f"drop[ncc={diag['drop_ncc_low']}, "
f"score={diag['drop_min_score_post_avg']}, "
f"recall={diag['drop_recall_low']}, "
f"bbox={diag['drop_bbox_out_of_scene']}, "
f"nms={diag['drop_nms_iou']}] = "
f"final={diag['n_final']} (top_thresh={diag['top_thresh_used']:.2f})"
)
def get_last_diag(self) -> dict | None:
"""Ritorna dict diagnostica dell'ultima chiamata find().
Halcon-equivalent: oggi inspect_shape_model espone parziali contatori.
Util per debug 'perche' 0 match', tuning interattivo, validation.
Vedi diag keys per significato (n_variants_top_evaluated, drop_*, ...).
"""
return getattr(self, "_last_diag", None)
+3
View File
@@ -12,6 +12,9 @@ dependencies = [
"uvicorn[standard]>=0.34",
]
[project.scripts]
pm2d-eval = "pm2d.eval:main"
[dependency-groups]
dev = [
"httpx>=0.28.1",