Files
Shape_Model_2D/pm2d/web/server.py
Adriano 3e4c20ecf5 feat: upload file nella cartella IMAGES_DIR
POST /upload_to_folder: sanitizza nome, valida estensione e contenuto
via cv2.imdecode, auto-rename su collisione.
Toolbar UI: bottone 'Carica file', dopo upload ricarica picker.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 14:45:16 +02:00

590 lines
19 KiB
Python

"""FastAPI webapp standalone per PM2D.
Endpoint:
GET / → HTML UI
POST /upload → upload immagine (multipart)
POST /match → JSON params + ids → results
GET /image/{id}/raw → PNG originale
GET /image/{id}/annotated → PNG con overlay match
"""
from __future__ import annotations
import hashlib
import os
import tempfile
import time
import uuid
from collections import OrderedDict
from pathlib import Path
import cv2
import numpy as np
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.responses import HTMLResponse, Response
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
def _load_env(root: Path) -> None:
"""Legge .env in root e popola os.environ (no override se già set)."""
f = root / ".env"
if not f.exists():
return
for line in f.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, v = line.split("=", 1)
k = k.strip(); v = v.strip().strip('"').strip("'")
os.environ.setdefault(k, v)
# Root progetto (parent di pm2d/)
PROJECT_ROOT = Path(__file__).resolve().parents[2]
_load_env(PROJECT_ROOT)
_images_dir_raw = os.environ.get("IMAGES_DIR", "Test")
IMAGES_DIR = Path(_images_dir_raw)
if not IMAGES_DIR.is_absolute():
IMAGES_DIR = PROJECT_ROOT / IMAGES_DIR
from pm2d.line_matcher import LineShapeMatcher, Match
from pm2d.auto_tune import auto_tune
WEB_DIR = Path(__file__).parent
STATIC_DIR = WEB_DIR / "static"
STATIC_DIR.mkdir(exist_ok=True)
# Persistenza immagini su disco (sopravvive a restart server)
CACHE_DIR = Path(tempfile.gettempdir()) / "pm2d_cache"
CACHE_DIR.mkdir(exist_ok=True)
# Cache in-memory (soft, ricaricata da disco se mancante)
_IMG_CACHE: dict[str, np.ndarray] = {}
# Cache matcher addestrati: (roi_hash, params_hash) -> LineShapeMatcher
# LRU con capacità limitata
_MATCHER_CACHE: OrderedDict = OrderedDict()
_MATCHER_CACHE_SIZE = 8
def _matcher_cache_key(roi: np.ndarray, tech: dict) -> str:
h = hashlib.md5()
h.update(roi.tobytes())
# Solo parametri che influenzano il training
relevant = ("num_features", "weak_grad", "strong_grad",
"angle_min", "angle_max", "angle_step",
"scale_min", "scale_max", "scale_step",
"spread_radius", "pyramid_levels")
for k in relevant:
h.update(f"{k}={tech.get(k)}".encode())
h.update(f"shape={roi.shape}".encode())
return h.hexdigest()
def _cache_get_matcher(key: str):
m = _MATCHER_CACHE.get(key)
if m is not None:
_MATCHER_CACHE.move_to_end(key) # LRU touch
return m
def _cache_put_matcher(key: str, matcher) -> None:
_MATCHER_CACHE[key] = matcher
_MATCHER_CACHE.move_to_end(key)
while len(_MATCHER_CACHE) > _MATCHER_CACHE_SIZE:
_MATCHER_CACHE.popitem(last=False)
def _store_image(img: np.ndarray) -> str:
iid = uuid.uuid4().hex[:12]
cv2.imwrite(str(CACHE_DIR / f"{iid}.png"), img)
_IMG_CACHE[iid] = img
return iid
def _load_image(iid: str) -> np.ndarray | None:
cached = _IMG_CACHE.get(iid)
if cached is not None:
return cached
p = CACHE_DIR / f"{iid}.png"
if not p.exists():
return None
img = cv2.imread(str(p))
if img is not None:
_IMG_CACHE[iid] = img
return img
app = FastAPI(title="PM2D Webapp", version="1.0.0")
def _encode_png(img: np.ndarray) -> bytes:
ok, buf = cv2.imencode(".png", img)
if not ok:
raise RuntimeError("PNG encode failed")
return buf.tobytes()
def _draw_matches(scene: np.ndarray, matches: list[Match],
template_gray: np.ndarray | None) -> np.ndarray:
out = scene.copy()
H, W = scene.shape[:2]
palette = [
(0, 255, 0), (0, 200, 255), (255, 100, 100), (255, 200, 0),
(200, 0, 255), (100, 255, 200), (255, 0, 0), (0, 255, 255),
]
for i, m in enumerate(matches):
color = palette[i % len(palette)]
if template_gray is not None:
t = template_gray
th, tw = t.shape
edge = cv2.Canny(t, 50, 150)
cx_t = (tw - 1) / 2.0; cy_t = (th - 1) / 2.0
M = cv2.getRotationMatrix2D((cx_t, cy_t), m.angle_deg, m.scale)
M[0, 2] += m.cx - cx_t
M[1, 2] += m.cy - cy_t
warped = cv2.warpAffine(edge, M, (W, H),
flags=cv2.INTER_NEAREST, borderValue=0)
mask = warped > 0
if mask.any():
overlay = np.zeros_like(out)
overlay[mask] = color
out[mask] = (0.3 * out[mask] + 0.7 * overlay[mask]).astype(np.uint8)
poly = m.bbox_poly.astype(np.int32).reshape(-1, 1, 2)
cv2.polylines(out, [poly], True, color, 2, cv2.LINE_AA)
p0 = tuple(m.bbox_poly[0].astype(int))
p1 = tuple(m.bbox_poly[1].astype(int))
cv2.line(out, p0, p1, color, 4, cv2.LINE_AA)
cx, cy = int(round(m.cx)), int(round(m.cy))
cv2.drawMarker(out, (cx, cy), color, cv2.MARKER_CROSS, 22, 2, cv2.LINE_AA)
L = int(np.linalg.norm(m.bbox_poly[1] - m.bbox_poly[0])) // 2
a = np.deg2rad(m.angle_deg)
cv2.arrowedLine(out, (cx, cy),
(int(cx + L * np.cos(a)), int(cy - L * np.sin(a))),
color, 2, cv2.LINE_AA, tipLength=0.2)
label = f"#{i+1} {m.angle_deg:.0f}d s={m.scale:.2f} {m.score:.2f}"
cv2.putText(out, label, (cx + 8, cy - 8),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2, cv2.LINE_AA)
return out
# ---------------- Models ----------------
class UploadResp(BaseModel):
id: str
width: int
height: int
class MatchParams(BaseModel):
model_id: str
scene_id: str
roi: list[int] # [x, y, w, h] nell'immagine modello
angle_min: float = 0.0
angle_max: float = 360.0
angle_step: float = 5.0
scale_min: float = 1.0
scale_max: float = 1.0
scale_step: float = 0.1
min_score: float = 0.55
max_matches: int = 25
nms_radius: int = 0
num_features: int = 96
weak_grad: float = 30.0
strong_grad: float = 60.0
spread_radius: int = 5
pyramid_levels: int = 3
verify_threshold: float = 0.4
class MatchResult(BaseModel):
cx: float
cy: float
angle_deg: float
scale: float
score: float
bbox_poly: list[list[float]]
class MatchResp(BaseModel):
matches: list[MatchResult]
train_time: float
find_time: float
num_variants: int
annotated_id: str
class TuneParams(BaseModel):
model_id: str
roi: list[int]
# ---------- User-facing (simple) params ----------
SYMMETRY_TO_ANGLE_MAX = {
"invariante": 0.0, # oggetto simmetrico a rotazione totale (cerchi): 1 variante
"nessuna": 360.0,
"bilaterale": 180.0,
"rot_3": 120.0,
"rot_4": 90.0,
"rot_6": 60.0,
"rot_8": 45.0,
}
SCALE_PRESETS = {
"fissa": (1.0, 1.0, 0.1),
"mini": (0.9, 1.1, 0.05), # ±10%
"medio": (0.75, 1.25, 0.05), # ±25%
"max": (0.5, 1.5, 0.05), # ±50%
}
PRECISION_ANGLE_STEP = {
"veloce": 10.0,
"normale": 5.0,
"preciso": 2.0,
}
# "Filtro falsi positivi" = mapping semantico del verify NCC threshold.
# Un operatore sceglie il livello di rigore, non un numero astratto.
FILTRO_FP_MAP = {
"off": 0.0, # disabilitato: mantieni tutti i match shape-based
"leggero": 0.20, # tollera variazioni intensità/illuminazione forti
"medio": 0.35, # default bilanciato (consigliato)
"forte": 0.50, # scarta match con intensità molto diversa dal template
}
class SimpleMatchParams(BaseModel):
model_id: str
scene_id: str
roi: list[int]
tipo: str = "intero" # "intero" | "parziale"
simmetria: str = "nessuna" # chiave SYMMETRY_TO_ANGLE_MAX
scala: str = "fissa" # chiave SCALE_PRESETS
precisione: str = "normale" # chiave PRECISION_ANGLE_STEP
filtro_fp: str = "medio" # chiave FILTRO_FP_MAP
penalita_scala: float = 0.0 # 0 = score shape invariante, >0 = penalizza scala != 1
min_score: float = 0.65
max_matches: int = 25
def _simple_to_technical(
p: SimpleMatchParams, roi_img: np.ndarray,
) -> dict:
"""Converti parametri user-facing → tecnici usando analisi della ROI."""
from pm2d.auto_tune import auto_tune as _auto
tune = _auto(roi_img)
h, w = roi_img.shape[:2]
min_side = min(h, w)
# Feature count: parziale = meno feature (area minore)
nf = tune["num_features"]
if p.tipo == "parziale":
nf = max(32, int(nf * 0.6))
# Piramide derivata da dimensione ROI
if min_side < 60:
pyr = 1
elif min_side < 150:
pyr = 2
elif min_side < 400:
pyr = 3
else:
pyr = 4
# Spread radius ~2-3% del lato minimo
spread = max(3, min(10, int(round(min_side * 0.03))))
angle_max = SYMMETRY_TO_ANGLE_MAX.get(p.simmetria, 360.0)
smin, smax, sstep = SCALE_PRESETS.get(p.scala, (1.0, 1.0, 0.1))
ang_step = PRECISION_ANGLE_STEP.get(p.precisione, 5.0)
return {
"num_features": nf,
"weak_grad": tune["weak_grad"],
"strong_grad": tune["strong_grad"],
"spread_radius": spread,
"pyramid_levels": pyr,
"angle_min": 0.0,
"angle_max": angle_max,
"angle_step": ang_step,
"scale_min": smin,
"scale_max": smax,
"scale_step": sstep,
"min_score": p.min_score,
"max_matches": p.max_matches,
"nms_radius": 0,
"verify_threshold": FILTRO_FP_MAP.get(p.filtro_fp, 0.35),
"scale_penalty": p.penalita_scala,
}
# ---------------- Endpoints ----------------
@app.get("/", response_class=HTMLResponse)
def index():
html_path = STATIC_DIR / "index.html"
return HTMLResponse(html_path.read_text(encoding="utf-8"))
@app.post("/upload_to_folder")
async def upload_to_folder(file: UploadFile = File(...)):
"""Salva file caricato nella cartella IMAGES_DIR. Ritorna lista aggiornata."""
if not IMAGES_DIR.is_dir():
raise HTTPException(500, f"IMAGES_DIR non esiste: {IMAGES_DIR}")
# Sanitizza nome file (no traversal)
name = Path(file.filename or "upload.png").name
if not name:
raise HTTPException(400, "nome file vuoto")
ext = Path(name).suffix.lower()
allowed = {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"}
if ext not in allowed:
raise HTTPException(400, f"estensione non supportata: {ext}")
# Leggi contenuto e valida come immagine
data = await file.read()
arr = np.frombuffer(data, dtype=np.uint8)
img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
if img is None:
raise HTTPException(400, "file non è un'immagine valida")
# Evita overwrite: se esiste, aggiungi suffisso numerico
target = IMAGES_DIR / name
if target.exists():
stem = target.stem; suffix = target.suffix
i = 1
while True:
alt = IMAGES_DIR / f"{stem}_{i}{suffix}"
if not alt.exists():
target = alt; break
i += 1
# Scrivi su disco
with open(target, "wb") as f:
f.write(data)
# Ritorna lista aggiornata
return {
"saved_as": target.name,
"dir": str(IMAGES_DIR),
"files": sorted(
p.name for p in IMAGES_DIR.iterdir()
if p.is_file() and p.suffix.lower() in allowed
),
}
@app.get("/folder_image/{filename}")
def folder_image(filename: str, w: int = 120):
"""Serve thumbnail PNG dell'immagine IMAGES_DIR (scalata a width w)."""
if "/" in filename or ".." in filename:
raise HTTPException(400, "nome non valido")
path = IMAGES_DIR / filename
if not path.is_file():
raise HTTPException(404, "non trovato")
img = cv2.imread(str(path), cv2.IMREAD_COLOR)
if img is None:
raise HTTPException(400, "non leggibile")
h0, w0 = img.shape[:2]
if w0 > w:
sc = w / w0
img = cv2.resize(img, (w, int(h0 * sc)), interpolation=cv2.INTER_AREA)
return Response(_encode_png(img), media_type="image/png",
headers={"Cache-Control": "public, max-age=3600"})
@app.get("/images")
def list_images():
"""Lista file immagine nella cartella configurata in IMAGES_DIR."""
if not IMAGES_DIR.is_dir():
return {"dir": str(IMAGES_DIR), "files": []}
exts = {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"}
files = sorted(
p.name for p in IMAGES_DIR.iterdir()
if p.is_file() and p.suffix.lower() in exts
)
return {"dir": str(IMAGES_DIR), "files": files}
class LoadFolderReq(BaseModel):
filename: str
@app.post("/load_from_folder", response_model=UploadResp)
def load_from_folder(req: LoadFolderReq):
"""Carica immagine dalla cartella IMAGES_DIR per nome file."""
name = req.filename
if "/" in name or ".." in name:
raise HTTPException(400, "nome file non valido")
path = IMAGES_DIR / name
if not path.is_file():
raise HTTPException(404, f"file non trovato: {name}")
img = cv2.imread(str(path), cv2.IMREAD_COLOR)
if img is None:
raise HTTPException(400, "immagine non leggibile")
iid = _store_image(img)
return UploadResp(id=iid, width=img.shape[1], height=img.shape[0])
@app.post("/upload", response_model=UploadResp)
async def upload(file: UploadFile = File(...)):
data = await file.read()
arr = np.frombuffer(data, dtype=np.uint8)
img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
if img is None:
raise HTTPException(400, "Immagine non valida")
iid = _store_image(img)
return UploadResp(id=iid, width=img.shape[1], height=img.shape[0])
@app.get("/image/{iid}/raw")
def image_raw(iid: str):
img = _load_image(iid)
if img is None:
raise HTTPException(404, "Image not found")
return Response(_encode_png(img), media_type="image/png")
@app.post("/match", response_model=MatchResp)
def match(p: MatchParams):
model = _load_image(p.model_id)
scene = _load_image(p.scene_id)
if model is None or scene is None:
raise HTTPException(404, "Immagini non trovate")
x, y, w, h = p.roi
x = max(0, x); y = max(0, y)
w = max(1, min(w, model.shape[1] - x))
h = max(1, min(h, model.shape[0] - y))
roi_img = model[y:y + h, x:x + w]
tech_for_cache = {
"num_features": p.num_features,
"weak_grad": p.weak_grad, "strong_grad": p.strong_grad,
"angle_min": p.angle_min, "angle_max": p.angle_max,
"angle_step": p.angle_step,
"scale_min": p.scale_min, "scale_max": p.scale_max,
"scale_step": p.scale_step,
"spread_radius": p.spread_radius,
"pyramid_levels": p.pyramid_levels,
}
key = _matcher_cache_key(roi_img, tech_for_cache)
m = _cache_get_matcher(key)
if m is None:
m = LineShapeMatcher(
num_features=p.num_features,
weak_grad=p.weak_grad, strong_grad=p.strong_grad,
angle_range_deg=(p.angle_min, p.angle_max),
angle_step_deg=p.angle_step,
scale_range=(p.scale_min, p.scale_max),
scale_step=p.scale_step,
spread_radius=p.spread_radius,
pyramid_levels=p.pyramid_levels,
)
t0 = time.time(); n = m.train(roi_img); t_train = time.time() - t0
_cache_put_matcher(key, m)
else:
n = len(m.variants); t_train = 0.0
nms = p.nms_radius if p.nms_radius > 0 else None
t0 = time.time()
matches = m.find(
scene, min_score=p.min_score, max_matches=p.max_matches,
nms_radius=nms, verify_threshold=p.verify_threshold,
)
t_find = time.time() - t0
# Render annotated image
tg = cv2.cvtColor(roi_img, cv2.COLOR_BGR2GRAY)
annotated = _draw_matches(scene, matches, tg)
ann_id = _store_image(annotated)
return MatchResp(
matches=[MatchResult(
cx=m_.cx, cy=m_.cy, angle_deg=m_.angle_deg, scale=m_.scale,
score=m_.score,
bbox_poly=m_.bbox_poly.tolist(),
) for m_ in matches],
train_time=t_train, find_time=t_find,
num_variants=n, annotated_id=ann_id,
)
@app.post("/match_simple", response_model=MatchResp)
def match_simple(p: SimpleMatchParams):
"""Match con parametri user-facing (tipo/simmetria/scala/precisione).
Il server deriva i parametri tecnici (num_features, soglie gradiente,
piramide, ecc.) dall'analisi automatica della ROI.
"""
model = _load_image(p.model_id)
scene = _load_image(p.scene_id)
if model is None or scene is None:
raise HTTPException(404, "Immagini non trovate")
x, y, w, h = p.roi
x = max(0, x); y = max(0, y)
w = max(1, min(w, model.shape[1] - x))
h = max(1, min(h, model.shape[0] - y))
roi_img = model[y:y + h, x:x + w]
tech = _simple_to_technical(p, roi_img)
key = _matcher_cache_key(roi_img, tech)
m = _cache_get_matcher(key)
if m is None:
m = LineShapeMatcher(
num_features=tech["num_features"],
weak_grad=tech["weak_grad"], strong_grad=tech["strong_grad"],
angle_range_deg=(tech["angle_min"], tech["angle_max"]),
angle_step_deg=tech["angle_step"],
scale_range=(tech["scale_min"], tech["scale_max"]),
scale_step=tech["scale_step"],
spread_radius=tech["spread_radius"],
pyramid_levels=tech["pyramid_levels"],
)
t0 = time.time(); n = m.train(roi_img); t_train = time.time() - t0
_cache_put_matcher(key, m)
else:
n = len(m.variants); t_train = 0.0
nms = tech["nms_radius"] if tech["nms_radius"] > 0 else None
t0 = time.time()
matches = m.find(
scene, min_score=tech["min_score"], max_matches=tech["max_matches"],
nms_radius=nms, verify_threshold=tech["verify_threshold"],
scale_penalty=tech.get("scale_penalty", 0.0),
)
t_find = time.time() - t0
tg = cv2.cvtColor(roi_img, cv2.COLOR_BGR2GRAY)
annotated = _draw_matches(scene, matches, tg)
ann_id = _store_image(annotated)
return MatchResp(
matches=[MatchResult(
cx=mt.cx, cy=mt.cy, angle_deg=mt.angle_deg, scale=mt.scale,
score=mt.score, bbox_poly=mt.bbox_poly.tolist(),
) for mt in matches],
train_time=t_train, find_time=t_find,
num_variants=n, annotated_id=ann_id,
)
@app.post("/auto_tune")
def tune(p: TuneParams):
model = _load_image(p.model_id)
if model is None:
raise HTTPException(404, "Immagine non trovata")
x, y, w, h = p.roi
roi_img = model[y:y + h, x:x + w]
t = auto_tune(roi_img)
return {k: v for k, v in t.items() if not k.startswith("_")}
# Mount static
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
def serve(host: str = "127.0.0.1", port: int = 8080):
import uvicorn
uvicorn.run(app, host=host, port=port, log_level="info")
if __name__ == "__main__":
serve()