9fba46d7f7
Default precedenti scartavano match validi con variazione intensita attorno al foro (verify NCC 0.4 troppo stretto). Nuovo: 4/5 fori corona dentata trovati con default. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
387 lines
12 KiB
Python
387 lines
12 KiB
Python
"""FastAPI webapp standalone per PM2D.
|
|
|
|
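Endpoints:
    GET  /                → HTML UI
    POST /upload          → upload an image (multipart)
    POST /match           → JSON technical params + image ids → results
    POST /match_simple    → JSON user-facing params + image ids → results
    POST /auto_tune       → parameters suggested for a model ROI
    GET  /image/{id}/raw  → stored image as PNG (uploaded images and the
                            annotated results, via the returned annotated_id)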
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import io
|
|
import time
|
|
import uuid
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from fastapi import FastAPI, File, HTTPException, UploadFile
|
|
from fastapi.responses import FileResponse, HTMLResponse, Response, StreamingResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from pydantic import BaseModel
|
|
|
|
from pm2d.line_matcher import LineShapeMatcher, Match
|
|
from pm2d.auto_tune import auto_tune
|
|
|
|
|
|
# Directory containing this module; the static assets live in ./static.
WEB_DIR = Path(__file__).parent
STATIC_DIR = WEB_DIR / "static"
# Created at import time (no error if it already exists).
STATIC_DIR.mkdir(exist_ok=True)

# In-memory image store: id -> np.ndarray (BGR)
_IMAGES: dict[str, np.ndarray] = {}

# Cache of the last annotated image: (image_id, matches_hash) -> PNG bytes
_ANNOT_CACHE: dict[tuple[str, int], bytes] = {}

app = FastAPI(title="PM2D Webapp", version="1.0.0")
|
|
|
|
|
|
def _store_image(img: np.ndarray) -> str:
    """Register *img* in the in-memory store and return its new identifier.

    The identifier is the first 12 hexadecimal characters of a random UUID4.
    """
    image_id = uuid.uuid4().hex[:12]
    _IMAGES[image_id] = img
    return image_id
|
|
|
|
|
|
def _encode_png(img: np.ndarray) -> bytes:
    """Encode *img* as PNG and return the encoded bytes.

    Raises:
        RuntimeError: if ``cv2.imencode`` reports a failure.
    """
    success, encoded = cv2.imencode(".png", img)
    if success:
        return encoded.tobytes()
    raise RuntimeError("PNG encode failed")
|
|
|
|
|
|
def _draw_matches(scene: np.ndarray, matches: list[Match],
                  template_gray: np.ndarray | None) -> np.ndarray:
    """Return a copy of *scene* with every match drawn on top of it.

    For each match the following is drawn, using a colour picked cyclically
    from a fixed palette:

    * the Canny edges of the template warped to the match pose (only when
      *template_gray* is given), blended 70/30 over the scene pixels;
    * the bounding polygon, with its first side drawn thicker;
    * a cross marker at the match centre;
    * an arrow from the centre along the match angle;
    * a text label with index, angle, scale and score.

    Args:
        scene: image to annotate; it is not modified.
        matches: matches to draw; each must expose ``cx``, ``cy``,
            ``angle_deg``, ``scale``, ``score`` and ``bbox_poly``.
        template_gray: single-channel template image, or ``None`` to skip
            the edge overlay.

    Returns:
        The annotated copy of *scene*.
    """
    out = scene.copy()
    H, W = scene.shape[:2]
    palette = [
        (0, 255, 0), (0, 200, 255), (255, 100, 100), (255, 200, 0),
        (200, 0, 255), (100, 255, 200), (255, 0, 0), (0, 255, 255),
    ]

    # The template edges and centre are the same for every match, so they
    # are computed once here instead of once per loop iteration.
    edge = None
    cx_t = cy_t = 0.0
    if template_gray is not None and matches:
        th, tw = template_gray.shape
        edge = cv2.Canny(template_gray, 50, 150)
        cx_t = (tw - 1) / 2.0
        cy_t = (th - 1) / 2.0

    for i, m in enumerate(matches):
        color = palette[i % len(palette)]

        if edge is not None:
            # Rotate/scale around the template centre, then translate that
            # centre onto the match centre.
            M = cv2.getRotationMatrix2D((cx_t, cy_t), m.angle_deg, m.scale)
            M[0, 2] += m.cx - cx_t
            M[1, 2] += m.cy - cy_t
            warped = cv2.warpAffine(edge, M, (W, H),
                                    flags=cv2.INTER_NEAREST, borderValue=0)
            mask = warped > 0
            if mask.any():
                # Blend 30% scene + 70% flat colour on the edge pixels; the
                # colour term is constant, so no full-size overlay is needed.
                tint = 0.7 * np.array(color, dtype=np.uint8)
                out[mask] = (0.3 * out[mask] + tint).astype(np.uint8)

        poly = m.bbox_poly.astype(np.int32).reshape(-1, 1, 2)
        cv2.polylines(out, [poly], True, color, 2, cv2.LINE_AA)

        # Thicker line on the first polygon side.
        p0 = tuple(m.bbox_poly[0].astype(int))
        p1 = tuple(m.bbox_poly[1].astype(int))
        cv2.line(out, p0, p1, color, 4, cv2.LINE_AA)

        cx, cy = int(round(m.cx)), int(round(m.cy))
        cv2.drawMarker(out, (cx, cy), color, cv2.MARKER_CROSS, 22, 2, cv2.LINE_AA)

        # Arrow of half the first-side length; the y term is negated because
        # image rows grow downwards.
        half_len = int(np.linalg.norm(m.bbox_poly[1] - m.bbox_poly[0])) // 2
        a = np.deg2rad(m.angle_deg)
        cv2.arrowedLine(out, (cx, cy),
                        (int(cx + half_len * np.cos(a)),
                         int(cy - half_len * np.sin(a))),
                        color, 2, cv2.LINE_AA, tipLength=0.2)

        label = f"#{i+1} {m.angle_deg:.0f}d s={m.scale:.2f} {m.score:.2f}"
        cv2.putText(out, label, (cx + 8, cy - 8),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2, cv2.LINE_AA)
    return out
|
|
|
|
|
|
# ---------------- Models ----------------
|
|
|
|
class UploadResp(BaseModel):
    # Response of POST /upload.
    id: str       # identifier of the stored image
    width: int    # image width in pixels
    height: int   # image height in pixels
|
|
|
|
|
|
class MatchParams(BaseModel):
    # Request body of POST /match: image ids, model ROI and the full set of
    # technical matcher parameters.
    model_id: str
    scene_id: str
    roi: list[int]  # [x, y, w, h] in the model image

    # Search ranges, forwarded to LineShapeMatcher.
    angle_min: float = 0.0
    angle_max: float = 360.0
    angle_step: float = 5.0
    scale_min: float = 1.0
    scale_max: float = 1.0
    scale_step: float = 0.1

    # Arguments of the find() call; nms_radius <= 0 is passed on as None.
    min_score: float = 0.55
    max_matches: int = 25
    nms_radius: int = 0

    # Matcher construction parameters.
    num_features: int = 96
    weak_grad: float = 30.0
    strong_grad: float = 60.0
    spread_radius: int = 5
    pyramid_levels: int = 3

    # Passed to find() as verify_threshold.
    verify_threshold: float = 0.4
|
|
|
|
|
|
class MatchResult(BaseModel):
    # One match as returned to the client.
    cx: float                      # match centre, x
    cy: float                      # match centre, y
    angle_deg: float               # match angle in degrees
    scale: float                   # match scale
    score: float                   # match score
    bbox_poly: list[list[float]]   # bounding polygon as a list of points
|
|
|
|
|
|
class MatchResp(BaseModel):
    # Response of the match endpoints.
    matches: list[MatchResult]
    train_time: float    # elapsed seconds of the train() call
    find_time: float     # elapsed seconds of the find() call
    num_variants: int    # value returned by train()
    annotated_id: str    # store id of the annotated scene image
|
|
|
|
|
|
class TuneParams(BaseModel):
    # Request body of POST /auto_tune.
    model_id: str     # id of the stored model image
    roi: list[int]    # unpacked as x, y, w, h in the model image
|
|
|
|
|
|
# ---------- User-facing (simple) params ----------
|
|
|
|
# Symmetry choice -> upper bound of the searched angle range, in degrees.
SYMMETRY_TO_ANGLE_MAX = {
    "nessuna": 360.0,
    "bilaterale": 180.0,
    "rot_3": 120.0,
    "rot_4": 90.0,
    "rot_6": 60.0,
    "rot_8": 45.0,
}

# Scale choice -> (scale_min, scale_max, scale_step).
SCALE_PRESETS = {
    "fissa": (1.0, 1.0, 0.1),
    "mini": (0.9, 1.1, 0.05),       # ±10%
    "medio": (0.75, 1.25, 0.05),    # ±25%
    "max": (0.5, 1.5, 0.05),        # ±50%
}

# Precision choice -> angle step, in degrees.
PRECISION_ANGLE_STEP = {
    "veloce": 10.0,
    "normale": 5.0,
    "preciso": 2.0,
}
|
|
|
|
|
|
class SimpleMatchParams(BaseModel):
    # Request body of POST /match_simple: image ids, model ROI and a few
    # user-facing choices that the server turns into technical parameters.
    model_id: str
    scene_id: str
    roi: list[int]

    tipo: str = "intero"          # "intero" | "parziale"
    simmetria: str = "nessuna"    # key of SYMMETRY_TO_ANGLE_MAX
    scala: str = "fissa"          # key of SCALE_PRESETS
    precisione: str = "normale"   # key of PRECISION_ANGLE_STEP

    min_score: float = 0.70
    max_matches: int = 25
|
|
|
|
|
|
def _simple_to_technical(
    p: SimpleMatchParams, roi_img: np.ndarray,
) -> dict:
    """Convert user-facing parameters into technical ones using ROI analysis.

    Gradient thresholds and the base feature count come from ``auto_tune``;
    pyramid levels and spread radius are derived from the shorter ROI side;
    angle range, scale range and angle step come from the preset tables,
    falling back to their defaults when the key is unknown.
    """
    tuned = auto_tune(roi_img)
    roi_h, roi_w = roi_img.shape[:2]
    shortest = min(roi_h, roi_w)

    # Feature count: a partial object gets fewer features (smaller area).
    num_features = tuned["num_features"]
    if p.tipo == "parziale":
        num_features = max(32, int(num_features * 0.6))

    # Pyramid levels from ROI size: one extra level per threshold reached
    # (<60 -> 1, <150 -> 2, <400 -> 3, otherwise 4).
    levels = 1 + sum(shortest >= limit for limit in (60, 150, 400))

    # Spread radius: 3% of the shorter side, clamped to [3, 10].
    spread = min(10, max(3, int(round(shortest * 0.03))))

    scale_min, scale_max, scale_step = SCALE_PRESETS.get(
        p.scala, (1.0, 1.0, 0.1))

    return {
        "num_features": num_features,
        "weak_grad": tuned["weak_grad"],
        "strong_grad": tuned["strong_grad"],
        "spread_radius": spread,
        "pyramid_levels": levels,
        "angle_min": 0.0,
        "angle_max": SYMMETRY_TO_ANGLE_MAX.get(p.simmetria, 360.0),
        "angle_step": PRECISION_ANGLE_STEP.get(p.precisione, 5.0),
        "scale_min": scale_min,
        "scale_max": scale_max,
        "scale_step": scale_step,
        "min_score": p.min_score,
        "max_matches": p.max_matches,
        "nms_radius": 0,
        # More permissive NCC verification for matches with intensity
        # variation (hole on a variable background, object parts, etc.).
        "verify_threshold": 0.25,
    }
|
|
|
|
|
|
# ---------------- Endpoints ----------------
|
|
|
|
@app.get("/", response_class=HTMLResponse)
def index():
    """Serve the UI page read from ``index.html`` in the static directory."""
    page = (STATIC_DIR / "index.html").read_text(encoding="utf-8")
    return HTMLResponse(page)
|
|
|
|
|
|
@app.post("/upload", response_model=UploadResp)
async def upload(file: UploadFile = File(...)):
    """Decode an uploaded image, store it and return its id and size.

    Raises:
        HTTPException: 400 when the upload is empty or cannot be decoded
            as an image.
    """
    data = await file.read()
    # Reject an empty body up front: depending on the OpenCV version,
    # imdecode raises on an empty buffer instead of returning None, which
    # would surface as a 500 rather than a 400.
    if not data:
        raise HTTPException(400, "Immagine non valida")
    arr = np.frombuffer(data, dtype=np.uint8)
    img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if img is None:
        raise HTTPException(400, "Immagine non valida")
    iid = _store_image(img)
    return UploadResp(id=iid, width=img.shape[1], height=img.shape[0])
|
|
|
|
|
|
@app.get("/image/{iid}/raw")
def image_raw(iid: str):
    """Return the stored image *iid* encoded as PNG, or 404 if unknown."""
    if iid not in _IMAGES:
        raise HTTPException(404, "Image not found")
    return Response(_encode_png(_IMAGES[iid]), media_type="image/png")
|
|
|
|
|
|
# Names of the technical parameters read by _run_match. They are both the
# MatchParams field names and the keys of the dict built by
# _simple_to_technical.
_TECHNICAL_FIELDS = (
    "num_features", "weak_grad", "strong_grad", "spread_radius",
    "pyramid_levels", "angle_min", "angle_max", "angle_step",
    "scale_min", "scale_max", "scale_step", "min_score", "max_matches",
    "nms_radius", "verify_threshold",
)


def _crop_roi(model: np.ndarray, roi: list[int]) -> np.ndarray:
    """Return the ``[x, y, w, h]`` region of *model*, clamped to the image.

    Negative origins are moved to 0 and the size is limited to what fits in
    the image (at least 1 pixel per side).

    Raises:
        HTTPException: 400 when *roi* does not have four values or its
            origin lies outside the image (the crop would be empty).
    """
    if len(roi) != 4:
        raise HTTPException(400, "ROI non valida: attesi 4 valori [x, y, w, h]")
    x, y, w, h = roi
    img_h, img_w = model.shape[:2]
    x = max(0, x)
    y = max(0, y)
    if x >= img_w or y >= img_h:
        raise HTTPException(400, "ROI fuori dai limiti dell'immagine")
    w = max(1, min(w, img_w - x))
    h = max(1, min(h, img_h - y))
    return model[y:y + h, x:x + w]


def _run_match(roi_img: np.ndarray, scene: np.ndarray,
               tech: dict[str, Any]) -> MatchResp:
    """Train a matcher on *roi_img*, search *scene* and build the response.

    *tech* must provide every key listed in ``_TECHNICAL_FIELDS``. The
    annotated scene is stored in the image store and its id is returned in
    the response.
    """
    matcher = LineShapeMatcher(
        num_features=tech["num_features"],
        weak_grad=tech["weak_grad"], strong_grad=tech["strong_grad"],
        angle_range_deg=(tech["angle_min"], tech["angle_max"]),
        angle_step_deg=tech["angle_step"],
        scale_range=(tech["scale_min"], tech["scale_max"]),
        scale_step=tech["scale_step"],
        spread_radius=tech["spread_radius"],
        pyramid_levels=tech["pyramid_levels"],
    )

    # perf_counter is monotonic, unlike time.time, so the measured
    # durations cannot be skewed by wall-clock adjustments.
    start = time.perf_counter()
    num_variants = matcher.train(roi_img)
    train_time = time.perf_counter() - start

    # A non-positive radius means "not specified" and is passed as None.
    nms_radius = tech["nms_radius"] if tech["nms_radius"] > 0 else None
    start = time.perf_counter()
    found = matcher.find(
        scene, min_score=tech["min_score"], max_matches=tech["max_matches"],
        nms_radius=nms_radius, verify_threshold=tech["verify_threshold"],
    )
    find_time = time.perf_counter() - start

    # Render the annotated image and keep it in the store.
    template_gray = cv2.cvtColor(roi_img, cv2.COLOR_BGR2GRAY)
    annotated_id = _store_image(_draw_matches(scene, found, template_gray))

    return MatchResp(
        matches=[MatchResult(
            cx=mt.cx, cy=mt.cy, angle_deg=mt.angle_deg, scale=mt.scale,
            score=mt.score, bbox_poly=mt.bbox_poly.tolist(),
        ) for mt in found],
        train_time=train_time, find_time=find_time,
        num_variants=num_variants, annotated_id=annotated_id,
    )


@app.post("/match", response_model=MatchResp)
def match(p: MatchParams):
    """Match with explicit technical parameters.

    Raises:
        HTTPException: 404 when either image id is unknown, 400 when the
            ROI is invalid.
    """
    model = _IMAGES.get(p.model_id)
    scene = _IMAGES.get(p.scene_id)
    if model is None or scene is None:
        raise HTTPException(404, "Immagini non trovate")
    roi_img = _crop_roi(model, p.roi)
    tech = {name: getattr(p, name) for name in _TECHNICAL_FIELDS}
    return _run_match(roi_img, scene, tech)


@app.post("/match_simple", response_model=MatchResp)
def match_simple(p: SimpleMatchParams):
    """Match with user-facing parameters (tipo/simmetria/scala/precisione).

    The server derives the technical parameters (num_features, gradient
    thresholds, pyramid, etc.) from the automatic analysis of the ROI.

    Raises:
        HTTPException: 404 when either image id is unknown, 400 when the
            ROI is invalid.
    """
    model = _IMAGES.get(p.model_id)
    scene = _IMAGES.get(p.scene_id)
    if model is None or scene is None:
        raise HTTPException(404, "Immagini non trovate")
    roi_img = _crop_roi(model, p.roi)
    return _run_match(roi_img, scene, _simple_to_technical(p, roi_img))


@app.post("/auto_tune")
def tune(p: TuneParams):
    """Return the auto-tuned parameters for a ROI of the model image.

    Keys starting with an underscore are left out of the response.

    Raises:
        HTTPException: 404 when the image id is unknown, 400 when the ROI
            is invalid.
    """
    model = _IMAGES.get(p.model_id)
    if model is None:
        raise HTTPException(404, "Immagine non trovata")
    # Same clamped crop as the match endpoints, so a negative origin can no
    # longer wrap around through Python slicing.
    tuned = auto_tune(_crop_roi(model, p.roi))
    return {k: v for k, v in tuned.items() if not k.startswith("_")}
|
|
|
|
|
|
# Expose the static directory under /static.
app.mount(
    "/static",
    StaticFiles(directory=STATIC_DIR),
    name="static",
)
|
|
|
|
|
|
def serve(host: str = "127.0.0.1", port: int = 8080):
    """Run the web application with uvicorn on *host*:*port*."""
    # uvicorn is imported only when the server is actually started.
    import uvicorn

    uvicorn.run(app, host=host, port=port, log_level="info")


if __name__ == "__main__":
    serve()
|