b718e81ccf
Manca il path "load" della V feature: utente poteva salvare ricetta
ma non caricarla dalla UI. Aggiunto:
Server:
- POST /recipes/{name}/load: carica .npz in cache _RECIPE_MATCHERS
- POST /match_recipe: usa matcher caricato senza re-train (zero
training time, solo find params propagati)
UI:
- Dropdown ricette disponibili (auto-refreshed da GET /recipes)
- Bottone "Carica" attiva ricetta + popola state.active_recipe
- Bottone "Stacca" torna al flow normale (training da ROI)
- Status indicator mostra ricetta attiva e dimensioni
doMatch dispatcha automaticamente:
- ricetta attiva → /match_recipe (no model/ROI necessari)
- altrimenti → /match o /match_simple come prima
Use case: ricetta tarata offline, deploy a runtime production senza
ricaricare modello+ROI ogni volta.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
786 lines
26 KiB
Python
786 lines
26 KiB
Python
"""FastAPI webapp standalone per PM2D.
|
|
|
|
Endpoint:
|
|
GET / → HTML UI
|
|
POST /upload → upload immagine (multipart)
|
|
POST /match → JSON params + ids → results
|
|
GET /image/{id}/raw → PNG originale
|
|
GET /image/{id}/annotated → PNG con overlay match
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import os
|
|
import tempfile
|
|
import time
|
|
import uuid
|
|
from collections import OrderedDict
|
|
from pathlib import Path
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from fastapi import FastAPI, File, HTTPException, UploadFile
|
|
from fastapi.responses import HTMLResponse, Response
|
|
from fastapi.staticfiles import StaticFiles
|
|
from pydantic import BaseModel
|
|
|
|
|
|
def _load_env(root: Path) -> None:
|
|
"""Legge .env in root e popola os.environ (no override se già set)."""
|
|
f = root / ".env"
|
|
if not f.exists():
|
|
return
|
|
for line in f.read_text(encoding="utf-8").splitlines():
|
|
line = line.strip()
|
|
if not line or line.startswith("#") or "=" not in line:
|
|
continue
|
|
k, v = line.split("=", 1)
|
|
k = k.strip(); v = v.strip().strip('"').strip("'")
|
|
os.environ.setdefault(k, v)
|
|
|
|
|
|
# Root progetto (parent di pm2d/)
|
|
PROJECT_ROOT = Path(__file__).resolve().parents[2]
|
|
_load_env(PROJECT_ROOT)
|
|
|
|
_images_dir_raw = os.environ.get("IMAGES_DIR", "Test")
|
|
IMAGES_DIR = Path(_images_dir_raw)
|
|
if not IMAGES_DIR.is_absolute():
|
|
IMAGES_DIR = PROJECT_ROOT / IMAGES_DIR
|
|
|
|
# Cartella ricette pre-trained (V feature: save/load matcher)
|
|
RECIPES_DIR = PROJECT_ROOT / "recipes"
|
|
RECIPES_DIR.mkdir(exist_ok=True)
|
|
|
|
from pm2d.line_matcher import LineShapeMatcher, Match
|
|
from pm2d.auto_tune import auto_tune
|
|
|
|
|
|
WEB_DIR = Path(__file__).parent
|
|
STATIC_DIR = WEB_DIR / "static"
|
|
STATIC_DIR.mkdir(exist_ok=True)
|
|
|
|
# Persistenza immagini su disco (sopravvive a restart server)
|
|
CACHE_DIR = Path(tempfile.gettempdir()) / "pm2d_cache"
|
|
CACHE_DIR.mkdir(exist_ok=True)
|
|
|
|
# Cache in-memory (soft, ricaricata da disco se mancante)
|
|
_IMG_CACHE: dict[str, np.ndarray] = {}
|
|
|
|
# Cache matcher addestrati: (roi_hash, params_hash) -> LineShapeMatcher
|
|
# LRU con capacità limitata
|
|
_MATCHER_CACHE: OrderedDict = OrderedDict()
|
|
_MATCHER_CACHE_SIZE = 8
|
|
|
|
|
|
def _matcher_cache_key(roi: np.ndarray, tech: dict) -> str:
|
|
h = hashlib.md5()
|
|
h.update(roi.tobytes())
|
|
# Solo parametri che influenzano il training
|
|
relevant = ("num_features", "weak_grad", "strong_grad",
|
|
"angle_min", "angle_max", "angle_step",
|
|
"scale_min", "scale_max", "scale_step",
|
|
"spread_radius", "pyramid_levels")
|
|
for k in relevant:
|
|
h.update(f"{k}={tech.get(k)}".encode())
|
|
h.update(f"shape={roi.shape}".encode())
|
|
return h.hexdigest()
|
|
|
|
|
|
def _cache_get_matcher(key: str):
|
|
m = _MATCHER_CACHE.get(key)
|
|
if m is not None:
|
|
_MATCHER_CACHE.move_to_end(key) # LRU touch
|
|
return m
|
|
|
|
|
|
def _cache_put_matcher(key: str, matcher) -> None:
|
|
_MATCHER_CACHE[key] = matcher
|
|
_MATCHER_CACHE.move_to_end(key)
|
|
while len(_MATCHER_CACHE) > _MATCHER_CACHE_SIZE:
|
|
_MATCHER_CACHE.popitem(last=False)
|
|
|
|
|
|
def _store_image(img: np.ndarray) -> str:
|
|
iid = uuid.uuid4().hex[:12]
|
|
cv2.imwrite(str(CACHE_DIR / f"{iid}.png"), img)
|
|
_IMG_CACHE[iid] = img
|
|
return iid
|
|
|
|
|
|
def _load_image(iid: str) -> np.ndarray | None:
|
|
cached = _IMG_CACHE.get(iid)
|
|
if cached is not None:
|
|
return cached
|
|
p = CACHE_DIR / f"{iid}.png"
|
|
if not p.exists():
|
|
return None
|
|
img = cv2.imread(str(p))
|
|
if img is not None:
|
|
_IMG_CACHE[iid] = img
|
|
return img
|
|
|
|
app = FastAPI(title="PM2D Webapp", version="1.0.0")
|
|
|
|
|
|
def _encode_png(img: np.ndarray) -> bytes:
|
|
ok, buf = cv2.imencode(".png", img)
|
|
if not ok:
|
|
raise RuntimeError("PNG encode failed")
|
|
return buf.tobytes()
|
|
|
|
|
|
def _draw_matches(scene: np.ndarray, matches: list[Match],
|
|
template_gray: np.ndarray | None) -> np.ndarray:
|
|
out = scene.copy()
|
|
H, W = scene.shape[:2]
|
|
palette = [
|
|
(0, 255, 0), (0, 200, 255), (255, 100, 100), (255, 200, 0),
|
|
(200, 0, 255), (100, 255, 200), (255, 0, 0), (0, 255, 255),
|
|
]
|
|
for i, m in enumerate(matches):
|
|
color = palette[i % len(palette)]
|
|
if template_gray is not None:
|
|
t = template_gray
|
|
th, tw = t.shape
|
|
edge = cv2.Canny(t, 50, 150)
|
|
cx_t = (tw - 1) / 2.0; cy_t = (th - 1) / 2.0
|
|
M = cv2.getRotationMatrix2D((cx_t, cy_t), m.angle_deg, m.scale)
|
|
M[0, 2] += m.cx - cx_t
|
|
M[1, 2] += m.cy - cy_t
|
|
warped = cv2.warpAffine(edge, M, (W, H),
|
|
flags=cv2.INTER_NEAREST, borderValue=0)
|
|
mask = warped > 0
|
|
if mask.any():
|
|
overlay = np.zeros_like(out)
|
|
overlay[mask] = color
|
|
out[mask] = (0.3 * out[mask] + 0.7 * overlay[mask]).astype(np.uint8)
|
|
poly = m.bbox_poly.astype(np.int32).reshape(-1, 1, 2)
|
|
cv2.polylines(out, [poly], True, color, 2, cv2.LINE_AA)
|
|
p0 = tuple(m.bbox_poly[0].astype(int))
|
|
p1 = tuple(m.bbox_poly[1].astype(int))
|
|
cv2.line(out, p0, p1, color, 4, cv2.LINE_AA)
|
|
cx, cy = int(round(m.cx)), int(round(m.cy))
|
|
cv2.drawMarker(out, (cx, cy), color, cv2.MARKER_CROSS, 22, 2, cv2.LINE_AA)
|
|
L = int(np.linalg.norm(m.bbox_poly[1] - m.bbox_poly[0])) // 2
|
|
a = np.deg2rad(m.angle_deg)
|
|
cv2.arrowedLine(out, (cx, cy),
|
|
(int(cx + L * np.cos(a)), int(cy - L * np.sin(a))),
|
|
color, 2, cv2.LINE_AA, tipLength=0.2)
|
|
label = f"#{i+1} {m.angle_deg:.0f}d s={m.scale:.2f} {m.score:.2f}"
|
|
cv2.putText(out, label, (cx + 8, cy - 8),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2, cv2.LINE_AA)
|
|
return out
|
|
|
|
|
|
# ---------------- Models ----------------
|
|
|
|
class UploadResp(BaseModel):
|
|
id: str
|
|
width: int
|
|
height: int
|
|
|
|
|
|
class MatchParams(BaseModel):
|
|
model_id: str
|
|
scene_id: str
|
|
roi: list[int] # [x, y, w, h] nell'immagine modello
|
|
angle_min: float = 0.0
|
|
angle_max: float = 360.0
|
|
angle_step: float = 5.0
|
|
scale_min: float = 1.0
|
|
scale_max: float = 1.0
|
|
scale_step: float = 0.1
|
|
min_score: float = 0.55
|
|
max_matches: int = 25
|
|
nms_radius: int = 0
|
|
num_features: int = 96
|
|
weak_grad: float = 30.0
|
|
strong_grad: float = 60.0
|
|
spread_radius: int = 5
|
|
pyramid_levels: int = 3
|
|
verify_threshold: float = 0.4
|
|
|
|
|
|
class MatchResult(BaseModel):
|
|
cx: float
|
|
cy: float
|
|
angle_deg: float
|
|
scale: float
|
|
score: float
|
|
bbox_poly: list[list[float]]
|
|
|
|
|
|
class MatchResp(BaseModel):
|
|
matches: list[MatchResult]
|
|
train_time: float
|
|
find_time: float
|
|
num_variants: int
|
|
annotated_id: str
|
|
|
|
|
|
class TuneParams(BaseModel):
|
|
model_id: str
|
|
roi: list[int]
|
|
|
|
|
|
# ---------- User-facing (simple) params ----------
|
|
|
|
SYMMETRY_TO_ANGLE_MAX = {
|
|
"invariante": 0.0, # oggetto simmetrico a rotazione totale (cerchi): 1 variante
|
|
"nessuna": 360.0,
|
|
"bilaterale": 180.0,
|
|
"rot_3": 120.0,
|
|
"rot_4": 90.0,
|
|
"rot_6": 60.0,
|
|
"rot_8": 45.0,
|
|
}
|
|
|
|
SCALE_PRESETS = {
|
|
"fissa": (1.0, 1.0, 0.1),
|
|
"mini": (0.9, 1.1, 0.05), # ±10%
|
|
"medio": (0.75, 1.25, 0.05), # ±25%
|
|
"max": (0.5, 1.5, 0.05), # ±50%
|
|
}
|
|
|
|
PRECISION_ANGLE_STEP = {
|
|
"veloce": 10.0,
|
|
"normale": 5.0,
|
|
"preciso": 2.0,
|
|
}
|
|
|
|
# "Filtro falsi positivi" = mapping semantico del verify NCC threshold.
|
|
# Un operatore sceglie il livello di rigore, non un numero astratto.
|
|
FILTRO_FP_MAP = {
|
|
"off": 0.0, # disabilitato: mantieni tutti i match shape-based
|
|
"leggero": 0.30, # tollera variazioni intensità/illuminazione forti
|
|
"medio": 0.50, # default bilanciato (consigliato)
|
|
"forte": 0.70, # scarta match con intensità molto diversa dal template
|
|
}
|
|
|
|
|
|
class SimpleMatchParams(BaseModel):
|
|
model_id: str
|
|
scene_id: str
|
|
roi: list[int]
|
|
tipo: str = "intero" # "intero" | "parziale"
|
|
simmetria: str = "nessuna" # chiave SYMMETRY_TO_ANGLE_MAX
|
|
scala: str = "fissa" # chiave SCALE_PRESETS
|
|
precisione: str = "normale" # chiave PRECISION_ANGLE_STEP
|
|
filtro_fp: str = "medio" # chiave FILTRO_FP_MAP
|
|
penalita_scala: float = 0.0 # 0 = score shape invariante, >0 = penalizza scala != 1
|
|
min_score: float = 0.65
|
|
max_matches: int = 25
|
|
# --- Halcon-mode flags (default off = backward compat) ---
|
|
# Init-time (richiede ri-train se cambiato)
|
|
use_polarity: bool = False # F: 16 bin orientation mod 2pi
|
|
use_gpu: bool = False # R: OpenCL UMat (silent fallback)
|
|
# Find-time (no retrain)
|
|
min_recall: float = 0.0 # M: filtra match con poche feature combaciate
|
|
use_soft_score: bool = False # Y: cosine sim continua dei gradients
|
|
subpixel_lm: bool = False # Z: precisione 0.05 px
|
|
nms_iou_threshold: float = 0.3 # A: IoU bbox poligonale
|
|
coarse_stride: int = 1 # sub-sampling top-level (>=1)
|
|
pyramid_propagate: bool = False # propagazione candidati top->full
|
|
greediness: float = 0.0 # early-exit kernel (0..1)
|
|
refine_pose_joint: bool = False # Nelder-Mead 3D (cx, cy, angle)
|
|
search_roi: list[int] | None = None # [x, y, w, h] limita area
|
|
|
|
|
|
def _simple_to_technical(
|
|
p: SimpleMatchParams, roi_img: np.ndarray,
|
|
) -> dict:
|
|
"""Converti parametri user-facing → tecnici usando analisi della ROI."""
|
|
from pm2d.auto_tune import auto_tune as _auto
|
|
|
|
tune = _auto(roi_img)
|
|
h, w = roi_img.shape[:2]
|
|
min_side = min(h, w)
|
|
|
|
# Feature count: parziale = meno feature (area minore)
|
|
nf = tune["num_features"]
|
|
if p.tipo == "parziale":
|
|
nf = max(32, int(nf * 0.6))
|
|
|
|
# Piramide derivata da dimensione ROI
|
|
if min_side < 60:
|
|
pyr = 1
|
|
elif min_side < 150:
|
|
pyr = 2
|
|
elif min_side < 400:
|
|
pyr = 3
|
|
else:
|
|
pyr = 4
|
|
|
|
# Spread radius ~2-3% del lato minimo
|
|
spread = max(3, min(10, int(round(min_side * 0.03))))
|
|
|
|
angle_max = SYMMETRY_TO_ANGLE_MAX.get(p.simmetria, 360.0)
|
|
smin, smax, sstep = SCALE_PRESETS.get(p.scala, (1.0, 1.0, 0.1))
|
|
ang_step = PRECISION_ANGLE_STEP.get(p.precisione, 5.0)
|
|
|
|
return {
|
|
"num_features": nf,
|
|
"weak_grad": tune["weak_grad"],
|
|
"strong_grad": tune["strong_grad"],
|
|
"spread_radius": spread,
|
|
"pyramid_levels": pyr,
|
|
"angle_min": 0.0,
|
|
"angle_max": angle_max,
|
|
"angle_step": ang_step,
|
|
"scale_min": smin,
|
|
"scale_max": smax,
|
|
"scale_step": sstep,
|
|
"min_score": p.min_score,
|
|
"max_matches": p.max_matches,
|
|
"nms_radius": 0,
|
|
"verify_threshold": FILTRO_FP_MAP.get(p.filtro_fp, 0.35),
|
|
"scale_penalty": p.penalita_scala,
|
|
}
|
|
|
|
|
|
# ---------------- Endpoints ----------------
|
|
|
|
@app.get("/", response_class=HTMLResponse)
|
|
def index():
|
|
html_path = STATIC_DIR / "index.html"
|
|
return HTMLResponse(html_path.read_text(encoding="utf-8"))
|
|
|
|
|
|
@app.post("/upload_to_folder")
|
|
async def upload_to_folder(file: UploadFile = File(...)):
|
|
"""Salva file caricato nella cartella IMAGES_DIR. Ritorna lista aggiornata."""
|
|
if not IMAGES_DIR.is_dir():
|
|
raise HTTPException(500, f"IMAGES_DIR non esiste: {IMAGES_DIR}")
|
|
# Sanitizza nome file (no traversal)
|
|
name = Path(file.filename or "upload.png").name
|
|
if not name:
|
|
raise HTTPException(400, "nome file vuoto")
|
|
ext = Path(name).suffix.lower()
|
|
allowed = {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"}
|
|
if ext not in allowed:
|
|
raise HTTPException(400, f"estensione non supportata: {ext}")
|
|
# Leggi contenuto e valida come immagine
|
|
data = await file.read()
|
|
arr = np.frombuffer(data, dtype=np.uint8)
|
|
img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
|
|
if img is None:
|
|
raise HTTPException(400, "file non è un'immagine valida")
|
|
# Evita overwrite: se esiste, aggiungi suffisso numerico
|
|
target = IMAGES_DIR / name
|
|
if target.exists():
|
|
stem = target.stem; suffix = target.suffix
|
|
i = 1
|
|
while True:
|
|
alt = IMAGES_DIR / f"{stem}_{i}{suffix}"
|
|
if not alt.exists():
|
|
target = alt; break
|
|
i += 1
|
|
# Scrivi su disco
|
|
with open(target, "wb") as f:
|
|
f.write(data)
|
|
# Ritorna lista aggiornata
|
|
return {
|
|
"saved_as": target.name,
|
|
"dir": str(IMAGES_DIR),
|
|
"files": sorted(
|
|
p.name for p in IMAGES_DIR.iterdir()
|
|
if p.is_file() and p.suffix.lower() in allowed
|
|
),
|
|
}
|
|
|
|
|
|
@app.get("/folder_image/{filename}")
|
|
def folder_image(filename: str, w: int = 120):
|
|
"""Serve thumbnail PNG dell'immagine IMAGES_DIR (scalata a width w)."""
|
|
if "/" in filename or ".." in filename:
|
|
raise HTTPException(400, "nome non valido")
|
|
path = IMAGES_DIR / filename
|
|
if not path.is_file():
|
|
raise HTTPException(404, "non trovato")
|
|
img = cv2.imread(str(path), cv2.IMREAD_COLOR)
|
|
if img is None:
|
|
raise HTTPException(400, "non leggibile")
|
|
h0, w0 = img.shape[:2]
|
|
if w0 > w:
|
|
sc = w / w0
|
|
img = cv2.resize(img, (w, int(h0 * sc)), interpolation=cv2.INTER_AREA)
|
|
return Response(_encode_png(img), media_type="image/png",
|
|
headers={"Cache-Control": "public, max-age=3600"})
|
|
|
|
|
|
@app.get("/images")
|
|
def list_images():
|
|
"""Lista file immagine nella cartella configurata in IMAGES_DIR."""
|
|
if not IMAGES_DIR.is_dir():
|
|
return {"dir": str(IMAGES_DIR), "files": []}
|
|
exts = {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"}
|
|
files = sorted(
|
|
p.name for p in IMAGES_DIR.iterdir()
|
|
if p.is_file() and p.suffix.lower() in exts
|
|
)
|
|
return {"dir": str(IMAGES_DIR), "files": files}
|
|
|
|
|
|
class LoadFolderReq(BaseModel):
|
|
filename: str
|
|
|
|
|
|
@app.post("/load_from_folder", response_model=UploadResp)
|
|
def load_from_folder(req: LoadFolderReq):
|
|
"""Carica immagine dalla cartella IMAGES_DIR per nome file."""
|
|
name = req.filename
|
|
if "/" in name or ".." in name:
|
|
raise HTTPException(400, "nome file non valido")
|
|
path = IMAGES_DIR / name
|
|
if not path.is_file():
|
|
raise HTTPException(404, f"file non trovato: {name}")
|
|
img = cv2.imread(str(path), cv2.IMREAD_COLOR)
|
|
if img is None:
|
|
raise HTTPException(400, "immagine non leggibile")
|
|
iid = _store_image(img)
|
|
return UploadResp(id=iid, width=img.shape[1], height=img.shape[0])
|
|
|
|
|
|
@app.post("/upload", response_model=UploadResp)
|
|
async def upload(file: UploadFile = File(...)):
|
|
data = await file.read()
|
|
arr = np.frombuffer(data, dtype=np.uint8)
|
|
img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
|
|
if img is None:
|
|
raise HTTPException(400, "Immagine non valida")
|
|
iid = _store_image(img)
|
|
return UploadResp(id=iid, width=img.shape[1], height=img.shape[0])
|
|
|
|
|
|
@app.get("/image/{iid}/raw")
|
|
def image_raw(iid: str):
|
|
img = _load_image(iid)
|
|
if img is None:
|
|
raise HTTPException(404, "Image not found")
|
|
return Response(_encode_png(img), media_type="image/png")
|
|
|
|
|
|
@app.post("/match", response_model=MatchResp)
|
|
def match(p: MatchParams):
|
|
model = _load_image(p.model_id)
|
|
scene = _load_image(p.scene_id)
|
|
if model is None or scene is None:
|
|
raise HTTPException(404, "Immagini non trovate")
|
|
x, y, w, h = p.roi
|
|
x = max(0, x); y = max(0, y)
|
|
w = max(1, min(w, model.shape[1] - x))
|
|
h = max(1, min(h, model.shape[0] - y))
|
|
roi_img = model[y:y + h, x:x + w]
|
|
|
|
tech_for_cache = {
|
|
"num_features": p.num_features,
|
|
"weak_grad": p.weak_grad, "strong_grad": p.strong_grad,
|
|
"angle_min": p.angle_min, "angle_max": p.angle_max,
|
|
"angle_step": p.angle_step,
|
|
"scale_min": p.scale_min, "scale_max": p.scale_max,
|
|
"scale_step": p.scale_step,
|
|
"spread_radius": p.spread_radius,
|
|
"pyramid_levels": p.pyramid_levels,
|
|
}
|
|
key = _matcher_cache_key(roi_img, tech_for_cache)
|
|
m = _cache_get_matcher(key)
|
|
if m is None:
|
|
m = LineShapeMatcher(
|
|
num_features=p.num_features,
|
|
weak_grad=p.weak_grad, strong_grad=p.strong_grad,
|
|
angle_range_deg=(p.angle_min, p.angle_max),
|
|
angle_step_deg=p.angle_step,
|
|
scale_range=(p.scale_min, p.scale_max),
|
|
scale_step=p.scale_step,
|
|
spread_radius=p.spread_radius,
|
|
pyramid_levels=p.pyramid_levels,
|
|
)
|
|
t0 = time.time(); n = m.train(roi_img); t_train = time.time() - t0
|
|
_cache_put_matcher(key, m)
|
|
else:
|
|
n = len(m.variants); t_train = 0.0
|
|
nms = p.nms_radius if p.nms_radius > 0 else None
|
|
t0 = time.time()
|
|
matches = m.find(
|
|
scene, min_score=p.min_score, max_matches=p.max_matches,
|
|
nms_radius=nms, verify_threshold=p.verify_threshold,
|
|
)
|
|
t_find = time.time() - t0
|
|
|
|
# Render annotated image
|
|
tg = cv2.cvtColor(roi_img, cv2.COLOR_BGR2GRAY)
|
|
annotated = _draw_matches(scene, matches, tg)
|
|
ann_id = _store_image(annotated)
|
|
|
|
return MatchResp(
|
|
matches=[MatchResult(
|
|
cx=m_.cx, cy=m_.cy, angle_deg=m_.angle_deg, scale=m_.scale,
|
|
score=m_.score,
|
|
bbox_poly=m_.bbox_poly.tolist(),
|
|
) for m_ in matches],
|
|
train_time=t_train, find_time=t_find,
|
|
num_variants=n, annotated_id=ann_id,
|
|
)
|
|
|
|
|
|
@app.post("/match_simple", response_model=MatchResp)
|
|
def match_simple(p: SimpleMatchParams):
|
|
"""Match con parametri user-facing (tipo/simmetria/scala/precisione).
|
|
|
|
Il server deriva i parametri tecnici (num_features, soglie gradiente,
|
|
piramide, ecc.) dall'analisi automatica della ROI.
|
|
"""
|
|
model = _load_image(p.model_id)
|
|
scene = _load_image(p.scene_id)
|
|
if model is None or scene is None:
|
|
raise HTTPException(404, "Immagini non trovate")
|
|
x, y, w, h = p.roi
|
|
x = max(0, x); y = max(0, y)
|
|
w = max(1, min(w, model.shape[1] - x))
|
|
h = max(1, min(h, model.shape[0] - y))
|
|
roi_img = model[y:y + h, x:x + w]
|
|
|
|
tech = _simple_to_technical(p, roi_img)
|
|
|
|
key = _matcher_cache_key(roi_img, tech)
|
|
# Halcon-mode init params: incidono sul training, includere in cache key
|
|
halcon_init_key = f"|pol={p.use_polarity}|gpu={p.use_gpu}"
|
|
key = key + halcon_init_key
|
|
m = _cache_get_matcher(key)
|
|
if m is None:
|
|
m = LineShapeMatcher(
|
|
num_features=tech["num_features"],
|
|
weak_grad=tech["weak_grad"], strong_grad=tech["strong_grad"],
|
|
angle_range_deg=(tech["angle_min"], tech["angle_max"]),
|
|
angle_step_deg=tech["angle_step"],
|
|
scale_range=(tech["scale_min"], tech["scale_max"]),
|
|
scale_step=tech["scale_step"],
|
|
spread_radius=tech["spread_radius"],
|
|
pyramid_levels=tech["pyramid_levels"],
|
|
use_polarity=p.use_polarity,
|
|
use_gpu=p.use_gpu,
|
|
)
|
|
t0 = time.time(); n = m.train(roi_img); t_train = time.time() - t0
|
|
_cache_put_matcher(key, m)
|
|
else:
|
|
n = len(m.variants); t_train = 0.0
|
|
nms = tech["nms_radius"] if tech["nms_radius"] > 0 else None
|
|
search_roi_t = tuple(p.search_roi) if p.search_roi else None
|
|
t0 = time.time()
|
|
matches = m.find(
|
|
scene, min_score=tech["min_score"], max_matches=tech["max_matches"],
|
|
nms_radius=nms, verify_threshold=tech["verify_threshold"],
|
|
scale_penalty=tech.get("scale_penalty", 0.0),
|
|
# Halcon-mode flags
|
|
min_recall=p.min_recall,
|
|
use_soft_score=p.use_soft_score,
|
|
subpixel_lm=p.subpixel_lm,
|
|
nms_iou_threshold=p.nms_iou_threshold,
|
|
coarse_stride=p.coarse_stride,
|
|
pyramid_propagate=p.pyramid_propagate,
|
|
greediness=p.greediness,
|
|
refine_pose_joint=p.refine_pose_joint,
|
|
search_roi=search_roi_t,
|
|
)
|
|
t_find = time.time() - t0
|
|
|
|
tg = cv2.cvtColor(roi_img, cv2.COLOR_BGR2GRAY)
|
|
annotated = _draw_matches(scene, matches, tg)
|
|
ann_id = _store_image(annotated)
|
|
|
|
return MatchResp(
|
|
matches=[MatchResult(
|
|
cx=mt.cx, cy=mt.cy, angle_deg=mt.angle_deg, scale=mt.scale,
|
|
score=mt.score, bbox_poly=mt.bbox_poly.tolist(),
|
|
) for mt in matches],
|
|
train_time=t_train, find_time=t_find,
|
|
num_variants=n, annotated_id=ann_id,
|
|
)
|
|
|
|
|
|
@app.post("/auto_tune")
|
|
def tune(p: TuneParams):
|
|
model = _load_image(p.model_id)
|
|
if model is None:
|
|
raise HTTPException(404, "Immagine non trovata")
|
|
x, y, w, h = p.roi
|
|
roi_img = model[y:y + h, x:x + w]
|
|
t = auto_tune(roi_img)
|
|
# Esponi parametri tecnici + meta diagnostica (_self_score, _validation,
|
|
# _symmetry_order, _orient_entropy) per feedback UI.
|
|
return t
|
|
|
|
|
|
# --- V: Save/Load ricette pre-trained ---
|
|
|
|
class SaveRecipeParams(BaseModel):
|
|
model_id: str
|
|
scene_id: str | None = None
|
|
roi: list[int]
|
|
# Riusa stessi param simple per training equivalente
|
|
tipo: str = "intero"
|
|
simmetria: str = "nessuna"
|
|
scala: str = "fissa"
|
|
precisione: str = "normale"
|
|
use_polarity: bool = False
|
|
use_gpu: bool = False
|
|
name: str # nome file ricetta (no path)
|
|
|
|
|
|
@app.post("/recipes")
|
|
def save_recipe(p: SaveRecipeParams):
|
|
"""Allena matcher e salva su disco come ricetta riutilizzabile."""
|
|
model = _load_image(p.model_id)
|
|
if model is None:
|
|
raise HTTPException(404, "Modello non trovato")
|
|
x, y, w, h = p.roi
|
|
roi_img = model[y:y + h, x:x + w]
|
|
sp = SimpleMatchParams(
|
|
model_id=p.model_id, scene_id=p.scene_id or p.model_id, roi=p.roi,
|
|
tipo=p.tipo, simmetria=p.simmetria, scala=p.scala,
|
|
precisione=p.precisione,
|
|
use_polarity=p.use_polarity, use_gpu=p.use_gpu,
|
|
)
|
|
tech = _simple_to_technical(sp, roi_img)
|
|
m = LineShapeMatcher(
|
|
num_features=tech["num_features"],
|
|
weak_grad=tech["weak_grad"], strong_grad=tech["strong_grad"],
|
|
angle_range_deg=(tech["angle_min"], tech["angle_max"]),
|
|
angle_step_deg=tech["angle_step"],
|
|
scale_range=(tech["scale_min"], tech["scale_max"]),
|
|
scale_step=tech["scale_step"],
|
|
spread_radius=tech["spread_radius"],
|
|
pyramid_levels=tech["pyramid_levels"],
|
|
use_polarity=p.use_polarity,
|
|
use_gpu=p.use_gpu,
|
|
)
|
|
m.train(roi_img)
|
|
safe_name = "".join(c for c in p.name if c.isalnum() or c in "._-")
|
|
if not safe_name:
|
|
raise HTTPException(400, "Nome ricetta non valido")
|
|
if not safe_name.endswith(".npz"):
|
|
safe_name += ".npz"
|
|
target = RECIPES_DIR / safe_name
|
|
m.save_model(str(target))
|
|
return {"name": safe_name, "size": target.stat().st_size,
|
|
"n_variants": len(m.variants)}
|
|
|
|
|
|
@app.get("/recipes")
|
|
def list_recipes():
|
|
files = []
|
|
if RECIPES_DIR.is_dir():
|
|
for f in sorted(RECIPES_DIR.glob("*.npz")):
|
|
files.append({"name": f.name, "size": f.stat().st_size})
|
|
return {"files": files, "dir": str(RECIPES_DIR)}
|
|
|
|
|
|
# Cache di matcher caricati da .npz (V feature). Key: nome ricetta.
|
|
_RECIPE_MATCHERS: OrderedDict = OrderedDict()
|
|
_RECIPE_MATCHERS_SIZE = 4
|
|
|
|
|
|
@app.post("/recipes/{name}/load")
|
|
def load_recipe(name: str):
|
|
"""Carica ricetta .npz e popola cache matcher in memoria.
|
|
|
|
Una volta caricata, /match_recipe la usa direttamente senza
|
|
re-train. Halcon-equivalent read_shape_model + handle.
|
|
"""
|
|
safe_name = "".join(c for c in name if c.isalnum() or c in "._-")
|
|
if not safe_name.endswith(".npz"):
|
|
safe_name += ".npz"
|
|
path = RECIPES_DIR / safe_name
|
|
if not path.is_file():
|
|
raise HTTPException(404, f"Ricetta non trovata: {safe_name}")
|
|
m = LineShapeMatcher.load_model(str(path))
|
|
_RECIPE_MATCHERS[safe_name] = m
|
|
_RECIPE_MATCHERS.move_to_end(safe_name)
|
|
while len(_RECIPE_MATCHERS) > _RECIPE_MATCHERS_SIZE:
|
|
_RECIPE_MATCHERS.popitem(last=False)
|
|
return {
|
|
"name": safe_name,
|
|
"n_variants": len(m.variants),
|
|
"template_size": list(m.template_size),
|
|
"use_polarity": m.use_polarity,
|
|
}
|
|
|
|
|
|
class RecipeMatchParams(BaseModel):
|
|
recipe: str
|
|
scene_id: str
|
|
# Solo find-time params (training gia' fatto offline)
|
|
min_score: float = 0.65
|
|
max_matches: int = 25
|
|
min_recall: float = 0.0
|
|
use_soft_score: bool = False
|
|
subpixel_lm: bool = False
|
|
nms_iou_threshold: float = 0.3
|
|
coarse_stride: int = 1
|
|
pyramid_propagate: bool = False
|
|
greediness: float = 0.0
|
|
refine_pose_joint: bool = False
|
|
search_roi: list[int] | None = None
|
|
verify_threshold: float = 0.5
|
|
scale_penalty: float = 0.0
|
|
|
|
|
|
@app.post("/match_recipe", response_model=MatchResp)
|
|
def match_recipe(p: RecipeMatchParams):
|
|
"""Match con ricetta pre-trained: zero training, solo find."""
|
|
safe_name = p.recipe if p.recipe.endswith(".npz") else f"{p.recipe}.npz"
|
|
m = _RECIPE_MATCHERS.get(safe_name)
|
|
if m is None:
|
|
# Auto-load on demand
|
|
path = RECIPES_DIR / safe_name
|
|
if not path.is_file():
|
|
raise HTTPException(404, f"Ricetta non trovata: {safe_name}")
|
|
m = LineShapeMatcher.load_model(str(path))
|
|
_RECIPE_MATCHERS[safe_name] = m
|
|
scene = _load_image(p.scene_id)
|
|
if scene is None:
|
|
raise HTTPException(404, "Scena non trovata")
|
|
search_roi_t = tuple(p.search_roi) if p.search_roi else None
|
|
t0 = time.time()
|
|
matches = m.find(
|
|
scene,
|
|
min_score=p.min_score, max_matches=p.max_matches,
|
|
verify_threshold=p.verify_threshold,
|
|
scale_penalty=p.scale_penalty,
|
|
min_recall=p.min_recall,
|
|
use_soft_score=p.use_soft_score,
|
|
subpixel_lm=p.subpixel_lm,
|
|
nms_iou_threshold=p.nms_iou_threshold,
|
|
coarse_stride=p.coarse_stride,
|
|
pyramid_propagate=p.pyramid_propagate,
|
|
greediness=p.greediness,
|
|
refine_pose_joint=p.refine_pose_joint,
|
|
search_roi=search_roi_t,
|
|
)
|
|
t_find = time.time() - t0
|
|
tg = m.template_gray if m.template_gray is not None else np.zeros((1, 1), np.uint8)
|
|
annotated = _draw_matches(scene, matches, tg)
|
|
ann_id = _store_image(annotated)
|
|
return MatchResp(
|
|
matches=[MatchResult(
|
|
cx=mt.cx, cy=mt.cy, angle_deg=mt.angle_deg, scale=mt.scale,
|
|
score=mt.score, bbox_poly=mt.bbox_poly.tolist(),
|
|
) for mt in matches],
|
|
train_time=0.0, find_time=t_find,
|
|
num_variants=len(m.variants), annotated_id=ann_id,
|
|
)
|
|
|
|
|
|
# Mount static
|
|
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
|
|
|
|
|
|
def serve(host: str = "127.0.0.1", port: int = 8080):
|
|
import uvicorn
|
|
uvicorn.run(app, host=host, port=port, log_level="info")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
serve()
|