1cc7881a51
Tool da CLI per misurare oggettivamente la qualita' del matcher
su dataset etichettato. Halcon ha questo solo nell'IDE (HDevelop),
qui esposto come modulo Python testabile in CI.
Format dataset JSON:
- template + mask
- params init matcher (override)
- find_params (override per find())
- scenes con ground_truth: lista pose attese (cx, cy, angle_deg, scale,
tolerance_px, tolerance_deg)
Metriche per scena: TP/FP/FN, precision, recall, IoU medio bbox,
tempo find. Aggregato: precision globale, recall, F1.
Criterio match-to-GT: distanza centro <= tolerance_px AND
|angle_deg - angle_deg_gt| <= tolerance_deg (differenza angolare modulo 360),
oppure IoU bbox >= 0.3.
Use case:
- regressione: confronto config A vs B oggettivo
- tuning: trovare param ottimi via grid-search guidato da F1
- validazione pre-deploy: report TP/FP/FN su dataset prod
Esposto come entry-point pm2d-eval (pyproject.toml).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
218 lines
7.3 KiB
Python
218 lines
7.3 KiB
Python
"""CLI validation harness per LineShapeMatcher.
|
|
|
|
Usage:
|
|
python -m pm2d.eval dataset.json [opzioni]
|
|
|
|
Formato dataset (JSON):
|
|
{
|
|
"template": "path/to/template.png",
|
|
"mask": "path/to/mask.png", # opzionale
|
|
"params": { # opzionali, override su matcher init
|
|
"use_polarity": true,
|
|
"angle_step_deg": 5,
|
|
...
|
|
},
|
|
"find_params": { # opzionali, passati a find()
|
|
"min_score": 0.6,
|
|
"use_soft_score": true,
|
|
...
|
|
},
|
|
"scenes": [
|
|
{
|
|
"image": "path/to/scene1.png",
|
|
"ground_truth": [
|
|
{"cx": 320.0, "cy": 240.0, "angle_deg": 12.0,
|
|
"scale": 1.0, "tolerance_px": 5.0,
|
|
"tolerance_deg": 3.0}
|
|
]
|
|
}
|
|
]
|
|
}
|
|
|
|
Output: report precision/recall/IoU/timing per ogni scena + aggregati.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
from pm2d.line_matcher import LineShapeMatcher, _poly_iou, _oriented_bbox_polygon
|
|
|
|
|
|
def _load_image(path: str | Path) -> np.ndarray:
    """Load an image from disk and normalize it to 3-channel BGR.

    The file is decoded with ``cv2.IMREAD_UNCHANGED``. Single-channel
    (grayscale) images are expanded to BGR and 4-channel (BGRA) images have
    their alpha plane dropped; any other layout is returned as decoded.

    Args:
        path: Location of the image file.

    Returns:
        The decoded image array.

    Raises:
        FileNotFoundError: If ``cv2.imread`` returns ``None`` (file missing
            or not decodable).
    """
    img = cv2.imread(str(path), cv2.IMREAD_UNCHANGED)
    if img is None:
        raise FileNotFoundError(f"Immagine non trovata: {path}")
    if img.ndim == 2:
        return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    if img.ndim == 3 and img.shape[2] == 4:
        # IMREAD_UNCHANGED preserves the alpha plane of RGBA files; strip it
        # so downstream code that expects BGR does not receive 4 channels.
        return cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
    return img
|
|
|
|
|
|
def _gt_to_poly(gt: dict, tw: int, th: int) -> np.ndarray:
    """Build the oriented bounding-box polygon for one ground-truth pose.

    The template size ``tw`` x ``th`` is multiplied by the entry's optional
    ``"scale"`` (default 1.0) and handed, together with the pose center
    (``"cx"``, ``"cy"``) and ``"angle_deg"``, to ``_oriented_bbox_polygon``.

    Args:
        gt: Ground-truth entry; must contain ``cx``, ``cy`` and ``angle_deg``.
        tw: Template width.
        th: Template height.

    Returns:
        Whatever ``_oriented_bbox_polygon`` produces for that pose.
    """
    scale = float(gt.get("scale", 1.0))
    center_x = float(gt["cx"])
    center_y = float(gt["cy"])
    box_w = tw * scale
    box_h = th * scale
    angle = float(gt["angle_deg"])
    return _oriented_bbox_polygon(center_x, center_y, box_w, box_h, angle)
|
|
|
|
|
|
def _match_to_gt(match, gt: dict, tw: int, th: int,
                 iou_thr: float = 0.3) -> bool:
    """Decide whether a detected match corresponds to a ground-truth pose.

    A match is accepted when either of these holds:

    * its center lies within ``tolerance_px`` (default 5.0) of the ground
      truth center AND its angle differs from the ground truth by at most
      ``tolerance_deg`` (default 3.0), the difference being wrapped into
      [-180, 180) first;
    * the IoU between ``match.bbox_poly`` and the ground-truth polygon is at
      least ``iou_thr`` (fallback for poses annotated with loose tolerances).

    Args:
        match: Object exposing ``cx``, ``cy``, ``angle_deg`` and ``bbox_poly``.
        gt: Ground-truth entry (``cx``, ``cy``, ``angle_deg`` and optional
            ``scale``, ``tolerance_px``, ``tolerance_deg``).
        tw: Template width, used to build the ground-truth polygon.
        th: Template height, used to build the ground-truth polygon.
        iou_thr: Minimum IoU for the fallback criterion.

    Returns:
        True if the match satisfies one of the two criteria.
    """
    max_dist = float(gt.get("tolerance_px", 5.0))
    max_angle = float(gt.get("tolerance_deg", 3.0))

    center_dist = math.hypot(match.cx - float(gt["cx"]),
                             match.cy - float(gt["cy"]))
    # Wrap the signed difference into [-180, 180) so 359 deg vs 1 deg counts as 2 deg.
    angle_delta = match.angle_deg - float(gt["angle_deg"])
    angle_err = abs((angle_delta + 180) % 360 - 180)

    pose_within_tolerance = center_dist <= max_dist and angle_err <= max_angle
    if pose_within_tolerance:
        return True

    # Pose check failed: fall back to polygon overlap.
    gt_polygon = _gt_to_poly(gt, tw, th)
    overlap = _poly_iou(match.bbox_poly, gt_polygon)
    return bool(overlap >= iou_thr)
|
|
|
|
|
|
def evaluate_scene(matcher: LineShapeMatcher, scene_bgr: np.ndarray,
                   gt_list: list[dict], find_params: dict,
                   tw: int, th: int) -> dict:
    """Run the matcher on one scene and score it against the ground truth.

    Matches are processed in the order returned by ``matcher.find``. Each
    match claims the first still-unclaimed ground-truth entry (in list order)
    accepted by ``_match_to_gt``; a ground-truth entry can be claimed at most
    once. Claimed matches are true positives, the remaining matches are false
    positives and unclaimed ground-truth entries are false negatives.

    Args:
        matcher: Trained matcher; ``find(scene_bgr, **find_params)`` is called.
        scene_bgr: Scene image passed to ``find``.
        gt_list: Ground-truth pose entries for this scene.
        find_params: Keyword arguments forwarded to ``matcher.find``.
        tw: Template width (for the ground-truth polygons).
        th: Template height (for the ground-truth polygons).

    Returns:
        Dict with ``n_matches``, ``n_gt``, ``tp``, ``fp``, ``fn``,
        ``find_time_s`` (duration of the ``find`` call in seconds),
        ``iou_mean`` (mean IoU over true positives, 0.0 when there are none)
        and ``diag`` (``matcher.get_last_diag()`` if the matcher has that
        method, else ``None``).
    """
    # perf_counter is monotonic and high resolution; time.time() follows the
    # wall clock and can jump, which would corrupt the timing metric.
    t0 = time.perf_counter()
    matches = matcher.find(scene_bgr, **find_params)
    elapsed = time.perf_counter() - t0

    gt_matched = [False] * len(gt_list)
    tp_ious: list[float] = []  # one IoU per true-positive match
    for m in matches:
        for j, gt in enumerate(gt_list):
            if gt_matched[j] or not _match_to_gt(m, gt, tw, th):
                continue
            gt_matched[j] = True
            # IoU is recorded for reporting even when the pose criterion,
            # not the IoU fallback, accepted the match.
            tp_ious.append(float(_poly_iou(m.bbox_poly, _gt_to_poly(gt, tw, th))))
            break

    tp = len(tp_ious)
    fp = len(matches) - tp
    fn = len(gt_list) - sum(gt_matched)
    diag = matcher.get_last_diag() if hasattr(matcher, "get_last_diag") else None
    return {
        "n_matches": len(matches),
        "n_gt": len(gt_list),
        "tp": tp, "fp": fp, "fn": fn,
        "find_time_s": elapsed,
        "iou_mean": float(np.mean(tp_ious)) if tp_ious else 0.0,
        "diag": diag,
    }
|
|
|
|
|
|
def run(dataset_path: str, scene_filter: str | None = None,
        verbose: bool = False) -> dict:
    """Evaluate a matcher configuration on a labelled dataset.

    The dataset JSON provides ``template``, optional ``mask``, optional
    ``params`` (matcher constructor overrides), optional ``find_params``
    (keyword arguments for ``find``) and ``scenes`` (each with ``image`` and
    optional ``ground_truth``). Image paths are resolved relative to the
    directory containing the dataset file. One line per scene plus aggregate
    figures are printed to stdout.

    Args:
        dataset_path: Path of the dataset JSON file.
        scene_filter: If given, only scenes whose ``image`` string contains
            this substring are evaluated.
        verbose: If true, also print the matcher diagnostics per scene when
            the matcher exposes ``_format_diag`` and diagnostics exist.

    Returns:
        Dict with aggregate ``precision``, ``recall``, ``f1``, ``tp``, ``fp``,
        ``fn``, ``total_time_s``, ``n_scenes`` and ``per_scene`` (the list of
        per-scene result dicts, each extended with a ``scene`` key).

    Raises:
        FileNotFoundError: If the template or a scene image cannot be read.
        KeyError: If the dataset lacks ``template`` or ``scenes``.
    """
    dataset_path = Path(dataset_path)
    base = dataset_path.parent
    # JSON is UTF-8 by specification; do not depend on the platform locale.
    with open(dataset_path, encoding="utf-8") as f:
        ds = json.load(f)

    template = _load_image(base / ds["template"])
    mask = None
    if ds.get("mask"):
        mask_path = base / ds["mask"]
        mask_img = cv2.imread(str(mask_path), cv2.IMREAD_GRAYSCALE)
        if mask_img is None:
            # Keep the best-effort behavior (train without mask) but make it
            # visible: silently ignoring the mask changes the metrics.
            print(f"ATTENZIONE: maschera non leggibile, ignorata: {mask_path}",
                  file=sys.stderr)
        else:
            # Binarize to a 0/255 uint8 mask.
            mask = (mask_img > 128).astype(np.uint8) * 255
    init_params = ds.get("params", {})
    find_params = ds.get("find_params", {})

    matcher = LineShapeMatcher(**init_params)
    n_var = matcher.train(template, mask=mask)
    tw, th = matcher.template_size
    print(f"Template: {ds['template']} ({tw}x{th}), {n_var} varianti")
    print(f"Param matcher: {init_params}")
    print(f"Param find: {find_params}")
    print()

    scenes = ds["scenes"]
    if scene_filter:
        scenes = [s for s in scenes if scene_filter in s["image"]]

    rows = []
    tot_tp = tot_fp = tot_fn = 0
    tot_time = 0.0
    for sc in scenes:
        scene = _load_image(base / sc["image"])
        gt = sc.get("ground_truth", [])
        result = evaluate_scene(matcher, scene, gt, find_params, tw, th)
        rows.append({"scene": sc["image"], **result})
        tot_tp += result["tp"]
        tot_fp += result["fp"]
        tot_fn += result["fn"]
        tot_time += result["find_time_s"]
        # max(1, ...) guards against division by zero on empty scenes.
        prec = result["tp"] / max(1, result["tp"] + result["fp"])
        rec = result["tp"] / max(1, result["tp"] + result["fn"])
        line = (f" {sc['image']:30s} "
                f"TP={result['tp']} FP={result['fp']} FN={result['fn']} "
                f"P={prec:.2f} R={rec:.2f} "
                f"IoU={result['iou_mean']:.2f} "
                f"t={result['find_time_s']*1000:.0f}ms")
        print(line)
        if verbose and result["diag"] and hasattr(matcher, "_format_diag"):
            print(f" diag: {matcher._format_diag(result['diag'])}")

    # Aggregates over all evaluated scenes (micro-averaged counts).
    precision = tot_tp / max(1, tot_tp + tot_fp)
    recall = tot_tp / max(1, tot_tp + tot_fn)
    f1 = 2 * precision * recall / max(1e-9, precision + recall)
    print()
    print(f"AGGREGATO: precision={precision:.3f} recall={recall:.3f} "
          f"F1={f1:.3f} TP={tot_tp} FP={tot_fp} FN={tot_fn}")
    print(f"TIME: total={tot_time:.2f}s avg={tot_time / max(1, len(scenes)) * 1000:.0f}ms/scene")

    return {
        "precision": precision, "recall": recall, "f1": f1,
        "tp": tot_tp, "fp": tot_fp, "fn": tot_fn,
        "total_time_s": tot_time, "n_scenes": len(scenes),
        "per_scene": rows,
    }
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point of the evaluation harness.

    Parses the arguments, runs the evaluation via ``run`` and optionally
    writes the report as JSON.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        Process exit status: 0 when the aggregate F1 is strictly greater than
        ``--f1-threshold`` (default 0.5), otherwise 1.
    """
    p = argparse.ArgumentParser(
        description="pm2d-eval: validation harness per LineShapeMatcher"
    )
    p.add_argument("dataset", help="JSON dataset (template + scenes + GT)")
    p.add_argument("--scene-filter", default=None,
                   help="Filtro substring sui nomi scena (debug)")
    p.add_argument("--verbose", "-v", action="store_true",
                   help="Stampa diag dict per ogni scena")
    p.add_argument("--out", default=None,
                   help="Salva report JSON su file")
    # Previously hard-coded to 0.5; the default keeps the old behavior.
    p.add_argument("--f1-threshold", type=float, default=0.5,
                   help="Exit code 0 solo se F1 aggregato > soglia (default: 0.5)")
    args = p.parse_args(argv)
    report = run(args.dataset, scene_filter=args.scene_filter,
                 verbose=args.verbose)
    if args.out:
        with open(args.out, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)
        print(f"Report salvato: {args.out}")
    return 0 if report["f1"] > args.f1_threshold else 1
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry: use main()'s return value as the process exit status.
    raise SystemExit(main())
|