|
|
from __future__ import annotations |
|
|
from fastapi import FastAPI, UploadFile, File, Query, Form, BackgroundTasks, HTTPException
|
|
from fastapi import Body |
|
|
from fastapi.responses import JSONResponse, FileResponse |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from pathlib import Path |
|
|
import shutil |
|
|
import uvicorn |
|
|
import json |
|
|
import uuid |
|
|
from datetime import datetime |
|
|
from typing import Dict |
|
|
from enum import Enum |
|
|
import os |
|
|
import yaml |
|
|
import io |
|
|
|
|
|
from video_processing import process_video_pipeline |
|
|
from audio_tools import process_audio_for_video, extract_audio_ffmpeg, embed_voice_segments |
|
|
from casting_loader import ensure_chroma, build_faces_index, build_voices_index |
|
|
from narration_system import NarrationSystem |
|
|
from llm_router import load_yaml, LLMRouter |
|
|
from character_detection import detect_characters_from_video |
|
|
|
|
|
from pipelines.audiodescription import generate as ad_generate |
|
|
|
|
|
from storage.files.file_manager import FileManager |
|
|
from storage.media_routers import router as media_router |
|
|
|
|
|
app = FastAPI(title="Veureu Engine API", version="0.2.0") |
|
|
app.add_middleware( |
|
|
CORSMiddleware, |
|
|
allow_origins=["*"], |
|
|
allow_credentials=True, |
|
|
allow_methods=["*"], |
|
|
allow_headers=["*"], |
|
|
) |
|
|
|
|
|
ROOT = Path("/tmp/veureu") |
|
|
ROOT.mkdir(parents=True, exist_ok=True) |
|
|
TEMP_ROOT = Path("/tmp/temp") |
|
|
TEMP_ROOT.mkdir(parents=True, exist_ok=True) |
|
|
VIDEOS_ROOT = Path("/tmp/data/videos") |
|
|
VIDEOS_ROOT.mkdir(parents=True, exist_ok=True) |
|
|
IDENTITIES_ROOT = Path("/tmp/characters") |
|
|
IDENTITIES_ROOT.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
|
|
|
|
|
|
class JobStatus(str, Enum): |
|
|
QUEUED = "queued" |
|
|
PROCESSING = "processing" |
|
|
DONE = "done" |
|
|
FAILED = "failed" |
|
|
|
|
|
jobs: Dict[str, dict] = {} |
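# In-memory job registry keyed by job_id; job state is lost on process restart
# (no persistence layer is assumed here).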
|
|
|
|
|
app.include_router(media_router) |
|
|
|
|
|
|
|
|
def describe_image_with_svision(image_path: str, is_face: bool = True) -> tuple[str, str]: |
|
|
""" |
|
|
Llama al space svision para describir una imagen (usado en generación de AD). |
|
|
|
|
|
Args: |
|
|
image_path: Ruta absoluta a la imagen |
|
|
is_face: True si es una cara, False si es una escena |
|
|
|
|
|
Returns: |
|
|
tuple (descripción_completa, nombre_abreviado) |
|
|
""" |
|
|
try: |
|
|
from pathlib import Path as _P |
|
|
import yaml |
|
|
from llm_router import LLMRouter |
|
|
|
|
|
|
|
|
config_path = _P(__file__).parent / "config.yaml" |
|
|
if not config_path.exists(): |
|
|
print(f"[svision] Config no encontrado: {config_path}") |
|
|
return ("", "") |
|
|
|
|
|
with open(config_path, 'r', encoding='utf-8') as f: |
|
|
cfg = yaml.safe_load(f) or {} |
|
|
|
|
|
router = LLMRouter(cfg) |
|
|
|
|
|
|
|
|
if is_face: |
|
|
context = { |
|
|
"task": "describe_person", |
|
|
"instructions": "Descriu la persona en la imatge. Inclou: edat aproximada (jove/adult), gènere, característiques físiques notables (ulleres, barba, bigoti, etc.), expressió i vestimenta.", |
|
|
"max_tokens": 256 |
|
|
} |
|
|
else: |
|
|
context = { |
|
|
"task": "describe_scene", |
|
|
"instructions": "Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals.", |
|
|
"max_tokens": 128 |
|
|
} |
|
|
|
|
|
|
|
|
descriptions = router.vision_describe([str(image_path)], context=context, model="salamandra-vision") |
|
|
full_description = descriptions[0] if descriptions else "" |
|
|
|
|
|
if not full_description: |
|
|
return ("", "") |
|
|
|
|
|
print(f"[svision] Descripció generada: {full_description[:100]}...") |
|
|
|
|
|
return (full_description, "") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"[svision] Error al descriure imatge: {e}") |
|
|
import traceback |
|
|
traceback.print_exc() |
|
|
return ("", "") |
|
|
|
|
|
def normalize_face_lighting(image): |
|
|
""" |
|
|
Normaliza el brillo de una imagen de cara usando técnicas combinadas: |
|
|
1. CLAHE para ecualización adaptativa |
|
|
2. Normalización de rango para homogeneizar brillo general |
|
|
|
|
|
Esto reduce el impacto de diferentes condiciones de iluminación en los embeddings |
|
|
y en la visualización de las imágenes. |
|
|
|
|
|
Args: |
|
|
image: Imagen BGR (OpenCV format) |
|
|
|
|
|
Returns: |
|
|
Imagen normalizada en el mismo formato |
|
|
""" |
|
|
import cv2 |
|
|
import numpy as np |
|
|
|
|
|
|
|
|
lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB) |
|
|
l, a, b = cv2.split(lab) |
|
|
|
|
|
|
|
|
|
|
|
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) |
|
|
l_clahe = clahe.apply(l) |
|
|
|
|
|
|
|
|
|
|
|
l_min, l_max = l_clahe.min(), l_clahe.max() |
|
|
if l_max > l_min: |
|
|
|
|
|
l_normalized = ((l_clahe - l_min) * 255.0 / (l_max - l_min)).astype(np.uint8) |
|
|
else: |
|
|
l_normalized = l_clahe |
|
|
|
|
|
|
|
|
l_normalized = cv2.GaussianBlur(l_normalized, (3, 3), 0) |
|
|
|
|
|
|
|
|
lab_normalized = cv2.merge([l_normalized, a, b]) |
|
|
|
|
|
|
|
|
normalized = cv2.cvtColor(lab_normalized, cv2.COLOR_LAB2BGR) |
|
|
return normalized |
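# Usage sketch (assumes an image file on disk):
#   import cv2
#   img = cv2.imread("face.jpg")             # BGR image
#   img_norm = normalize_face_lighting(img)  # same shape/dtype, evened-out lighting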
|
|
|
|
|
def hierarchical_cluster_with_min_size(X, max_groups: int, min_cluster_size: int, sensitivity: float = 0.5) -> np.ndarray: |
|
|
""" |
|
|
Clustering jerárquico con silhouette score para encontrar automáticamente el mejor número de clusters. |
|
|
Selecciona automáticamente el mejor número de clusters (hasta max_groups) usando silhouette score. |
|
|
Filtra clusters con menos de min_cluster_size muestras (marcados como -1/ruido). |
|
|
|
|
|
Args: |
|
|
X: Array de embeddings (N, D) |
|
|
max_groups: Número máximo de clusters a formar |
|
|
min_cluster_size: Tamaño mínimo de cluster válido |
|
|
sensitivity: Sensibilidad del clustering (0.0-1.0) |
|
|
- 0.0 = muy agresivo (menos clusters) |
|
|
- 0.5 = balanceado (recomendado) |
|
|
- 1.0 = muy permisivo (más clusters) |
|
|
|
|
|
Returns: |
|
|
Array de labels (N,) donde -1 indica ruido |
|
|
""" |
|
|
import numpy as np |
|
|
from scipy.cluster.hierarchy import linkage, fcluster |
|
|
from sklearn.metrics import silhouette_score |
|
|
from collections import Counter |
|
|
|
|
|
if len(X) == 0: |
|
|
return np.array([]) |
|
|
|
|
|
if len(X) < min_cluster_size: |
|
|
|
|
|
return np.full(len(X), -1, dtype=int) |
|
|
|
|
|
|
|
|
|
|
|
Z = linkage(X, method='average', metric='cosine') |
|
|
|
|
|
|
|
|
best_n_clusters = 2 |
|
|
best_score = -1 |
|
|
|
|
|
|
|
|
max_to_try = min(max_groups, len(X) - 1) |
|
|
|
|
|
if max_to_try >= 2: |
|
|
for n_clusters in range(2, max_to_try + 1): |
|
|
trial_labels = fcluster(Z, t=n_clusters, criterion='maxclust') - 1 |
|
|
|
|
|
|
|
|
trial_counts = Counter(trial_labels) |
|
|
valid_clusters = sum(1 for count in trial_counts.values() if count >= min_cluster_size) |
|
|
|
|
|
|
|
|
if valid_clusters >= 2: |
|
|
try: |
|
|
score = silhouette_score(X, trial_labels, metric='cosine') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
penalty = 0.14 - (sensitivity * 0.13) |
|
|
adjusted_score = score - (n_clusters * penalty) |
|
|
|
|
|
if adjusted_score > best_score: |
|
|
best_score = adjusted_score |
|
|
best_n_clusters = n_clusters |
|
|
                except Exception:
                    # silhouette_score can fail on degenerate labelings; skip this candidate
                    pass
|
|
|
|
|
|
|
|
penalty = 0.14 - (sensitivity * 0.13) |
|
|
print(f"Clustering óptimo: {best_n_clusters} clusters (de máximo {max_groups}), sensitivity={sensitivity:.2f}, penalty={penalty:.3f}, silhouette={best_score:.3f}") |
|
|
labels = fcluster(Z, t=best_n_clusters, criterion='maxclust') |
|
|
|
|
|
|
|
|
labels = labels - 1 |
|
|
|
|
|
|
|
|
label_counts = Counter(labels) |
|
|
filtered_labels = [] |
|
|
for lbl in labels: |
|
|
if label_counts[lbl] >= min_cluster_size: |
|
|
filtered_labels.append(lbl) |
|
|
else: |
|
|
filtered_labels.append(-1) |
|
|
|
|
|
return np.array(filtered_labels, dtype=int) |
|
|
|
|
|
@app.get("/") |
|
|
def root(): |
|
|
return {"ok": True, "service": "veureu-engine"} |
|
|
|
|
|
@app.post("/process_video") |
|
|
async def process_video( |
|
|
video_file: UploadFile = File(...), |
|
|
config_path: str = Form("config.yaml"), |
|
|
out_root: str = Form("results"), |
|
|
db_dir: str = Form("chroma_db"), |
|
|
): |
|
|
tmp_video = ROOT / video_file.filename |
|
|
with tmp_video.open("wb") as f: |
|
|
shutil.copyfileobj(video_file.file, f) |
|
|
result = process_video_pipeline(str(tmp_video), config_path=config_path, out_root=out_root, db_dir=db_dir) |
|
|
return JSONResponse(result) |
|
|
|
|
|
@app.post("/create_initial_casting") |
|
|
async def create_initial_casting( |
|
|
background_tasks: BackgroundTasks, |
|
|
video: UploadFile = File(...), |
|
|
max_groups: int = Form(default=3), |
|
|
min_cluster_size: int = Form(default=3), |
|
|
face_sensitivity: float = Form(default=0.5), |
|
|
voice_max_groups: int = Form(default=3), |
|
|
voice_min_cluster_size: int = Form(default=3), |
|
|
voice_sensitivity: float = Form(default=0.5), |
|
|
max_frames: int = Form(default=100), |
|
|
): |
|
|
""" |
|
|
Crea un job para procesar el vídeo de forma asíncrona usando clustering jerárquico. |
|
|
Devuelve un job_id inmediatamente. |
|
|
""" |
|
|
|
|
|
video_name = Path(video.filename).stem |
|
|
dst_video = VIDEOS_ROOT / f"{video_name}.mp4" |
|
|
with dst_video.open("wb") as f: |
|
|
shutil.copyfileobj(video.file, f) |
|
|
|
|
|
|
|
|
job_id = str(uuid.uuid4()) |
|
|
|
|
|
|
|
|
jobs[job_id] = { |
|
|
"id": job_id, |
|
|
"status": JobStatus.QUEUED, |
|
|
"video_path": str(dst_video), |
|
|
"video_name": video_name, |
|
|
"max_groups": int(max_groups), |
|
|
"min_cluster_size": int(min_cluster_size), |
|
|
"face_sensitivity": float(face_sensitivity), |
|
|
"voice_max_groups": int(voice_max_groups), |
|
|
"voice_min_cluster_size": int(voice_min_cluster_size), |
|
|
"voice_sensitivity": float(voice_sensitivity), |
|
|
"max_frames": int(max_frames), |
|
|
"created_at": datetime.now().isoformat(), |
|
|
"results": None, |
|
|
"error": None |
|
|
} |
|
|
|
|
|
print(f"[{job_id}] Job creado para vídeo: {video_name}") |
|
|
|
|
|
|
|
|
background_tasks.add_task(process_video_job, job_id) |
|
|
|
|
|
|
|
|
return {"job_id": job_id} |
|
|
|
|
|
@app.get("/jobs/{job_id}/status") |
|
|
def get_job_status(job_id: str): |
|
|
""" |
|
|
Devuelve el estado actual de un job. |
|
|
El UI hace polling de este endpoint cada 5 segundos. |
|
|
""" |
|
|
if job_id not in jobs: |
|
|
raise HTTPException(status_code=404, detail="Job not found") |
|
|
|
|
|
job = jobs[job_id] |
|
|
|
|
|
|
|
|
status_value = job["status"].value if isinstance(job["status"], JobStatus) else str(job["status"]) |
|
|
response = {"status": status_value} |
|
|
|
|
|
|
|
|
if job.get("results") is not None: |
|
|
response["results"] = job["results"] |
|
|
|
|
|
|
|
|
if job.get("error"): |
|
|
response["error"] = job["error"] |
|
|
|
|
|
return response |
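# Polling sketch (hypothetical host/port; mirrors the UI's ~5 s polling loop):
#   import time, requests
#   while True:
#       status = requests.get(f"http://localhost:7860/jobs/{job_id}/status").json()
#       if status["status"] in ("done", "failed"):
#           break
#       time.sleep(5)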
|
|
|
|
|
@app.get("/files/{video_name}/{char_id}/{filename}") |
|
|
def serve_character_file(video_name: str, char_id: str, filename: str): |
|
|
""" |
|
|
Sirve archivos estáticos de personajes (imágenes). |
|
|
Ejemplo: /files/dif_catala_1/char1/representative.jpg |
|
|
""" |
|
|
|
|
|
file_path = TEMP_ROOT / video_name / "characters" / char_id / filename |
|
|
|
|
|
if not file_path.exists(): |
|
|
raise HTTPException(status_code=404, detail="File not found") |
|
|
|
|
|
return FileResponse(file_path) |
|
|
|
|
|
@app.get("/audio/{video_name}/{filename}") |
|
|
def serve_audio_file(video_name: str, filename: str): |
|
|
file_path = TEMP_ROOT / video_name / "clips" / filename |
|
|
if not file_path.exists(): |
|
|
raise HTTPException(status_code=404, detail="File not found") |
|
|
return FileResponse(file_path) |
|
|
|
|
|
def process_video_job(job_id: str): |
|
|
""" |
|
|
Procesa el vídeo de forma asíncrona. |
|
|
Esta función se ejecuta en background. |
|
|
""" |
|
|
try: |
|
|
job = jobs[job_id] |
|
|
print(f"[{job_id}] Iniciando procesamiento...") |
|
|
|
|
|
|
|
|
job["status"] = JobStatus.PROCESSING |
|
|
|
|
|
video_path = job["video_path"] |
|
|
video_name = job["video_name"] |
|
|
max_groups = int(job.get("max_groups", 5)) |
|
|
min_cluster_size = int(job.get("min_cluster_size", 3)) |
|
|
face_sensitivity = float(job.get("face_sensitivity", 0.5)) |
|
|
v_max_groups = int(job.get("voice_max_groups", 5)) |
|
|
v_min_cluster = int(job.get("voice_min_cluster_size", 3)) |
|
|
voice_sensitivity = float(job.get("voice_sensitivity", 0.5)) |
|
|
|
|
|
|
|
|
base = TEMP_ROOT / video_name |
|
|
base.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
print(f"[{job_id}] Directorio base: {base}") |
|
|
|
|
|
|
|
|
try: |
|
|
print(f"[{job_id}] Iniciando detección de personajes (CPU, originales)...") |
|
|
print(f"[{job_id}] *** Normalización de brillo ACTIVADA ***") |
|
|
print(f"[{job_id}] - CLAHE adaptativo (clipLimit=3.0)") |
|
|
print(f"[{job_id}] - Estiramiento de histograma") |
|
|
print(f"[{job_id}] - Suavizado Gaussiano") |
|
|
print(f"[{job_id}] Esto homogeneizará el brillo de todas las caras detectadas") |
|
|
import cv2 |
|
|
import numpy as np |
|
|
try: |
|
|
import face_recognition |
|
|
_use_fr = True |
|
|
print(f"[{job_id}] face_recognition disponible: CPU") |
|
|
except Exception: |
|
|
face_recognition = None |
|
|
_use_fr = False |
|
|
print(f"[{job_id}] face_recognition no disponible. Intentando DeepFace fallback.") |
|
|
try: |
|
|
from deepface import DeepFace |
|
|
except Exception: |
|
|
DeepFace = None |
|
|
|
|
|
cap = cv2.VideoCapture(video_path) |
|
|
if not cap.isOpened(): |
|
|
raise RuntimeError("No se pudo abrir el vídeo para extracción de caras") |
|
|
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 |
|
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0) |
|
|
max_samples = job.get("max_frames", 100) |
|
|
|
|
|
if total_frames > 0: |
|
|
frame_indices = sorted(set(np.linspace(0, max(0, total_frames - 1), num=min(max_samples, max(1, total_frames)), dtype=int).tolist())) |
|
|
else: |
|
|
frame_indices = [] |
|
|
print(f"[{job_id}] Total frames: {total_frames}, FPS: {fps:.2f}, Muestreando {len(frame_indices)} frames equiespaciados (máx {max_samples})") |
|
|
|
|
|
|
|
|
faces_root = base / "faces_raw" |
|
|
faces_root.mkdir(parents=True, exist_ok=True) |
|
|
embeddings: list[list[float]] = [] |
|
|
crops_meta: list[dict] = [] |
|
|
|
|
|
saved_count = 0 |
|
|
frames_processed = 0 |
|
|
frames_with_faces = 0 |
|
|
for frame_idx in frame_indices: |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_idx)) |
|
|
ret2, frame = cap.read() |
|
|
if not ret2: |
|
|
continue |
|
|
frames_processed += 1 |
|
|
|
|
|
frame_normalized = normalize_face_lighting(frame) |
|
|
rgb = cv2.cvtColor(frame_normalized, cv2.COLOR_BGR2RGB) |
|
|
|
|
|
if _use_fr and face_recognition is not None: |
|
|
boxes = face_recognition.face_locations(rgb, model="hog") |
|
|
encs = face_recognition.face_encodings(rgb, boxes) |
|
|
if boxes: |
|
|
frames_with_faces += 1 |
|
|
print(f"[{job_id}] Frame {frame_idx}: {len(boxes)} cara(s) detectada(s) con face_recognition") |
|
|
for (top, right, bottom, left), e in zip(boxes, encs): |
|
|
crop = frame_normalized[top:bottom, left:right] |
|
|
if crop.size == 0: |
|
|
continue |
|
|
fn = f"face_{frame_idx:06d}_{saved_count:03d}.jpg" |
|
|
cv2.imwrite(str(faces_root / fn), crop) |
|
|
|
|
|
e = np.array(e, dtype=float) |
|
|
e = e / (np.linalg.norm(e) + 1e-9) |
|
|
embeddings.append(e.astype(float).tolist()) |
|
|
crops_meta.append({ |
|
|
"file": fn, |
|
|
"frame": frame_idx, |
|
|
"box": [int(top), int(right), int(bottom), int(left)], |
|
|
}) |
|
|
saved_count += 1 |
|
|
else: |
|
|
|
|
|
if DeepFace is None: |
|
|
pass |
|
|
else: |
|
|
try: |
|
|
gray = cv2.cvtColor(frame_normalized, cv2.COLOR_BGR2GRAY) |
|
|
try: |
|
|
haar_path = getattr(cv2.data, 'haarcascades', None) or '' |
|
|
face_cascade = cv2.CascadeClassifier(os.path.join(haar_path, 'haarcascade_frontalface_default.xml')) |
|
|
except Exception: |
|
|
face_cascade = None |
|
|
boxes_haar = [] |
|
|
if face_cascade is not None and not face_cascade.empty(): |
|
|
|
|
|
faces_haar = face_cascade.detectMultiScale(gray, scaleFactor=1.08, minNeighbors=5, minSize=(50, 50)) |
|
|
for (x, y, w, h) in faces_haar: |
|
|
top, left, bottom, right = max(0, y), max(0, x), min(frame.shape[0], y+h), min(frame.shape[1], x+w) |
|
|
boxes_haar.append((top, right, bottom, left)) |
|
|
|
|
|
|
|
|
if not boxes_haar: |
|
|
try: |
|
|
tmp_detect = faces_root / f"detect_{frame_idx:06d}.jpg" |
|
|
cv2.imwrite(str(tmp_detect), frame_normalized) |
|
|
detect_result = DeepFace.extract_faces(img_path=str(tmp_detect), detector_backend='opencv', enforce_detection=False) |
|
|
for det in detect_result: |
|
|
facial_area = det.get('facial_area', {}) |
|
|
if facial_area: |
|
|
x, y, w, h = facial_area.get('x', 0), facial_area.get('y', 0), facial_area.get('w', 0), facial_area.get('h', 0) |
|
|
|
|
|
|
|
|
is_full_frame = (x <= 5 and y <= 5 and w >= frame.shape[1] - 10 and h >= frame.shape[0] - 10) |
|
|
|
|
|
if w > 50 and h > 50 and not is_full_frame: |
|
|
top, left, bottom, right = max(0, y), max(0, x), min(frame.shape[0], y+h), min(frame.shape[1], x+w) |
|
|
boxes_haar.append((top, right, bottom, left)) |
|
|
tmp_detect.unlink(missing_ok=True) |
|
|
except Exception as _e_detect: |
|
|
print(f"[{job_id}] Frame {frame_idx}: DeepFace extract_faces error: {_e_detect}") |
|
|
|
|
|
if boxes_haar: |
|
|
frames_with_faces += 1 |
|
|
print(f"[{job_id}] Frame {frame_idx}: {len(boxes_haar)} cara(s) detectada(s) con Haar/DeepFace") |
|
|
|
|
|
for (top, right, bottom, left) in boxes_haar: |
|
|
crop = frame_normalized[top:bottom, left:right] |
|
|
if crop.size == 0: |
|
|
continue |
|
|
fn = f"face_{frame_idx:06d}_{saved_count:03d}.jpg" |
|
|
crop_path = faces_root / fn |
|
|
cv2.imwrite(str(crop_path), crop) |
|
|
reps = DeepFace.represent(img_path=str(crop_path), model_name="Facenet512", enforce_detection=False) |
|
|
for r in (reps or []): |
|
|
emb = r.get("embedding") if isinstance(r, dict) else r |
|
|
if emb is None: |
|
|
continue |
|
|
emb = np.array(emb, dtype=float) |
|
|
emb = emb / (np.linalg.norm(emb) + 1e-9) |
|
|
embeddings.append(emb.astype(float).tolist()) |
|
|
crops_meta.append({ |
|
|
"file": fn, |
|
|
"frame": frame_idx, |
|
|
"box": [int(top), int(right), int(bottom), int(left)], |
|
|
}) |
|
|
saved_count += 1 |
|
|
except Exception as _e_df: |
|
|
print(f"[{job_id}] DeepFace fallback error: {_e_df}") |
|
|
cap.release() |
|
|
|
|
|
print(f"[{job_id}] ✓ Frames procesados: {frames_processed}/{len(frame_indices)}") |
|
|
print(f"[{job_id}] ✓ Frames con caras: {frames_with_faces}") |
|
|
print(f"[{job_id}] ✓ Caras detectadas (embeddings): {len(embeddings)}") |
|
|
|
|
|
|
|
|
if embeddings: |
|
|
Xf = np.array(embeddings) |
|
|
labels = hierarchical_cluster_with_min_size(Xf, max_groups, min_cluster_size, face_sensitivity).tolist() |
|
|
print(f"[{job_id}] Clustering jerárquico de caras: {len(set([l for l in labels if l >= 0]))} clusters") |
|
|
else: |
|
|
labels = [] |
|
|
|
|
|
|
|
|
from face_classifier import validate_and_classify_face, get_random_catalan_name_by_gender, FACE_CONFIDENCE_THRESHOLD |
|
|
|
|
|
characters_validated = [] |
|
|
cluster_map: dict[int, list[int]] = {} |
|
|
for i, lbl in enumerate(labels): |
|
|
if isinstance(lbl, int) and lbl >= 0: |
|
|
cluster_map.setdefault(lbl, []).append(i) |
|
|
|
|
|
chars_dir = base / "characters" |
|
|
chars_dir.mkdir(parents=True, exist_ok=True) |
|
|
import shutil as _sh |
|
|
|
|
|
original_cluster_count = len(cluster_map) |
|
|
print(f"[{job_id}] Procesando {original_cluster_count} clusters detectados...") |
|
|
|
|
|
for ci, idxs in sorted(cluster_map.items(), key=lambda x: x[0]): |
|
|
char_id = f"char_{ci:02d}" |
|
|
|
|
|
|
|
|
face_detections = [] |
|
|
for j in idxs: |
|
|
meta = crops_meta[j] |
|
|
box = meta.get("box", [0, 0, 0, 0]) |
|
|
if len(box) >= 4: |
|
|
top, right, bottom, left = box |
|
|
w = abs(right - left) |
|
|
h = abs(bottom - top) |
|
|
area_score = w * h |
|
|
else: |
|
|
area_score = 0 |
|
|
|
|
|
face_detections.append({ |
|
|
'index': j, |
|
|
'score': area_score, |
|
|
'file': meta['file'], |
|
|
'box': box |
|
|
}) |
|
|
|
|
|
|
|
|
face_detections_sorted = sorted( |
|
|
face_detections, |
|
|
key=lambda x: x['score'], |
|
|
reverse=True |
|
|
) |
|
|
|
|
|
if not face_detections_sorted: |
|
|
print(f"[{job_id}] [VALIDATION] ✗ Cluster {char_id}: sense deteccions, eliminant") |
|
|
continue |
|
|
|
|
|
|
|
|
best_face = face_detections_sorted[0] |
|
|
best_face_path = faces_root / best_face['file'] |
|
|
|
|
|
print(f"[{job_id}] [VALIDATION] Cluster {char_id}: validant millor cara (bbox_area={best_face['score']:.0f}px²)") |
|
|
print(f"[{job_id}] [VALIDATION] Cluster {char_id}: millor cara path={best_face_path}") |
|
|
print(f"[{job_id}] [VALIDATION] ▶▶▶ CRIDANT validate_and_classify_face() ◀◀◀") |
|
|
|
|
|
validation = validate_and_classify_face(str(best_face_path)) |
|
|
|
|
|
print(f"[{job_id}] [VALIDATION] ▶▶▶ validate_and_classify_face() RETORNAT ◀◀◀") |
|
|
|
|
|
if not validation: |
|
|
print(f"[{job_id}] [VALIDATION] ✗ Cluster {char_id}: error en validació DeepFace, eliminant cluster") |
|
|
continue |
|
|
|
|
|
|
|
|
print(f"[{job_id}] [DEEPFACE RESULT] Cluster {char_id}:") |
|
|
print(f"[{job_id}] - is_valid_face: {validation['is_valid_face']}") |
|
|
print(f"[{job_id}] - face_confidence: {validation['face_confidence']:.3f}") |
|
|
print(f"[{job_id}] - man_prob: {validation['man_prob']:.3f}") |
|
|
print(f"[{job_id}] - woman_prob: {validation['woman_prob']:.3f}") |
|
|
print(f"[{job_id}] - gender_diff: {abs(validation['man_prob'] - validation['woman_prob']):.3f}") |
|
|
print(f"[{job_id}] - gender_assigned: {validation['gender']}") |
|
|
print(f"[{job_id}] - gender_confidence: {validation['gender_confidence']:.3f}") |
|
|
|
|
|
|
|
|
if not validation['is_valid_face'] or validation['face_confidence'] < FACE_CONFIDENCE_THRESHOLD: |
|
|
print(f"[{job_id}] [VALIDATION] ✗ Cluster {char_id}: NO ES UNA CARA VÁLIDA (face_confidence={validation['face_confidence']:.3f} < threshold={FACE_CONFIDENCE_THRESHOLD}), eliminant tot el clúster") |
|
|
continue |
|
|
|
|
|
|
|
|
out_dir = chars_dir / char_id |
|
|
out_dir.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
|
|
|
total_faces = len(face_detections_sorted) |
|
|
max_faces_to_show = (total_faces // 2) + 1 |
|
|
face_detections_limited = face_detections_sorted[:max_faces_to_show] |
|
|
|
|
|
|
|
|
files = [] |
|
|
face_files_urls = [] |
|
|
for k, face_det in enumerate(face_detections_limited): |
|
|
fname = face_det['file'] |
|
|
src = faces_root / fname |
|
|
dst = out_dir / fname |
|
|
try: |
|
|
_sh.copy2(src, dst) |
|
|
files.append(fname) |
|
|
face_files_urls.append(f"/files/{video_name}/{char_id}/{fname}") |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
|
|
|
rep = files[0] if files else None |
|
|
if rep: |
|
|
rep_src = out_dir / rep |
|
|
rep_dst = out_dir / "representative.jpg" |
|
|
try: |
|
|
_sh.copy2(rep_src, rep_dst) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
|
|
|
gender = validation['gender'] |
|
|
character_name = get_random_catalan_name_by_gender(gender, char_id) |
|
|
|
|
|
print(f"[{job_id}] [NAME GENERATION] Cluster {char_id}:") |
|
|
print(f"[{job_id}] - Gender detectado: {gender}") |
|
|
print(f"[{job_id}] - Nombre asignado: {character_name}") |
|
|
print(f"[{job_id}] - Seed usado: {char_id}") |
|
|
|
|
|
character_data = { |
|
|
"id": char_id, |
|
|
"name": character_name, |
|
|
"gender": gender, |
|
|
"gender_confidence": validation['gender_confidence'], |
|
|
"face_confidence": validation['face_confidence'], |
|
|
"man_prob": validation['man_prob'], |
|
|
"woman_prob": validation['woman_prob'], |
|
|
"folder": str(out_dir), |
|
|
"num_faces": len(files), |
|
|
"total_faces_detected": total_faces, |
|
|
"image_url": f"/files/{video_name}/{char_id}/representative.jpg" if rep else "", |
|
|
"face_files": face_files_urls, |
|
|
} |
|
|
|
|
|
characters_validated.append(character_data) |
|
|
|
|
|
print(f"[{job_id}] [VALIDATION] ✓ Cluster {char_id}: CARA VÁLIDA!") |
|
|
print(f"[{job_id}] Nombre: {character_name}") |
|
|
print(f"[{job_id}] Género: {gender} (man={validation['man_prob']:.3f}, woman={validation['woman_prob']:.3f})") |
|
|
print(f"[{job_id}] Confianza género: {validation['gender_confidence']:.3f}") |
|
|
print(f"[{job_id}] Confianza cara: {validation['face_confidence']:.3f}") |
|
|
print(f"[{job_id}] Caras mostradas: {len(files)}/{total_faces}") |
|
|
print(f"[{job_id}] Imagen representativa: {best_face_path.name}") |
|
|
|
|
|
|
|
|
eliminated_count = original_cluster_count - len(characters_validated) |
|
|
print(f"[{job_id}] [VALIDATION] Total: {len(characters_validated)} clústers vàlids " |
|
|
f"(eliminats {eliminated_count} falsos positius)") |
|
|
|
|
|
characters = characters_validated |
|
|
|
|
|
|
|
|
analysis = { |
|
|
"caras": [{"embeddings": e} for e in embeddings], |
|
|
"voices": [], |
|
|
"escenas": [], |
|
|
} |
|
|
analysis_path = str(base / "analysis.json") |
|
|
with open(analysis_path, "w", encoding="utf-8") as f: |
|
|
json.dump(analysis, f, ensure_ascii=False) |
|
|
|
|
|
face_labels = labels |
|
|
num_face_embeddings = len(embeddings) |
|
|
|
|
|
print(f"[{job_id}] Personajes detectados: {len(characters)}") |
|
|
for char in characters: |
|
|
print(f"[{job_id}] - {char['name']}: {char['num_faces']} caras") |
|
|
|
|
|
|
|
|
try: |
|
|
import glob, os |
|
|
for ch in characters: |
|
|
folder = ch.get("folder") |
|
|
face_files = [] |
|
|
if folder and os.path.isdir(folder): |
|
|
|
|
|
patterns = ["face_*.jpg", "face_*.png"] |
|
|
files = [] |
|
|
for pat in patterns: |
|
|
files.extend(glob.glob(os.path.join(folder, pat))) |
|
|
|
|
|
if not files: |
|
|
files.extend(glob.glob(os.path.join(folder, "*.jpg"))) |
|
|
files.extend(glob.glob(os.path.join(folder, "*.png"))) |
|
|
|
|
|
face_files = sorted({os.path.basename(p) for p in files}) |
|
|
|
|
|
for rep_name in ("representative.jpg", "representative.png"): |
|
|
rep_path = os.path.join(folder, rep_name) |
|
|
if os.path.exists(rep_path): |
|
|
if rep_name in face_files: |
|
|
face_files.remove(rep_name) |
|
|
face_files.insert(0, rep_name) |
|
|
ch["face_files"] = face_files |
|
|
|
|
|
if face_files: |
|
|
ch["num_faces"] = len(face_files) |
|
|
except Exception as _e: |
|
|
print(f"[{job_id}] WARN - No se pudo enumerar face_files: {_e}") |
|
|
|
|
|
|
|
|
try: |
|
|
cfg = load_yaml("config.yaml") |
|
|
audio_segments, srt_unmod, full_txt, diar_info, connection_logs = process_audio_for_video(video_path, base, cfg, voice_collection=None) |
|
|
|
|
|
try: |
|
|
for ev in (connection_logs or []): |
|
|
msg = ev.get("message") if isinstance(ev, dict) else None |
|
|
if msg: |
|
|
print(f"[{job_id}] {msg}") |
|
|
except Exception: |
|
|
pass |
|
|
except Exception as e_audio: |
|
|
import traceback |
|
|
print(f"[{job_id}] WARN - Audio pipeline failed: {e_audio}\n{traceback.format_exc()}") |
|
|
audio_segments, srt_unmod, full_txt = [], None, "" |
|
|
diar_info = {"diarization_ok": False, "error": str(e_audio)} |
|
|
connection_logs = [] |
|
|
|
|
|
|
|
|
if not audio_segments: |
|
|
try: |
|
|
from pathlib import Path as _P |
|
|
from pydub import AudioSegment as _AS |
|
|
wav_out = extract_audio_ffmpeg(video_path, base / f"{_P(video_path).stem}.wav", sr=16000) |
|
|
audio = _AS.from_wav(wav_out) |
|
|
clips_dir = base / "clips" |
|
|
clips_dir.mkdir(parents=True, exist_ok=True) |
|
|
cp = clips_dir / "segment_000.wav" |
|
|
audio.export(cp, format="wav") |
|
|
emb_list = embed_voice_segments([str(cp)]) |
|
|
audio_segments = [{ |
|
|
"segment": 0, |
|
|
"start": 0.0, |
|
|
"end": float(len(audio) / 1000.0), |
|
|
"speaker": "SPEAKER_00", |
|
|
"text": "", |
|
|
"voice_embedding": emb_list[0] if emb_list else [], |
|
|
"clip_path": str(cp), |
|
|
"lang": "ca", |
|
|
"lang_prob": 1.0, |
|
|
}] |
|
|
except Exception as _efb: |
|
|
print(f"[{job_id}] WARN - Audio minimal fallback failed: {_efb}") |
|
|
|
|
|
|
|
|
import numpy as np |
|
|
voice_embeddings = [seg.get("voice_embedding") for seg in audio_segments if seg.get("voice_embedding")] |
|
|
if voice_embeddings: |
|
|
try: |
|
|
Xv = np.array(voice_embeddings) |
|
|
v_labels = hierarchical_cluster_with_min_size(Xv, v_max_groups, v_min_cluster, voice_sensitivity).tolist() |
|
|
print(f"[{job_id}] Clustering jerárquico de voz: {len(set([l for l in v_labels if l >= 0]))} clusters") |
|
|
except Exception as _e: |
|
|
print(f"[{job_id}] WARN - Voice clustering failed: {_e}") |
|
|
v_labels = [] |
|
|
else: |
|
|
v_labels = [] |
|
|
|
|
|
|
|
|
job["results"] = { |
|
|
"characters": characters, |
|
|
"num_characters": len(characters), |
|
|
"analysis_path": analysis_path, |
|
|
"base_dir": str(base), |
|
|
"face_labels": face_labels, |
|
|
"num_face_embeddings": num_face_embeddings, |
|
|
"audio_segments": audio_segments, |
|
|
"srt_unmodified": srt_unmod, |
|
|
"full_transcription": full_txt, |
|
|
"voice_labels": v_labels, |
|
|
"num_voice_embeddings": len(voice_embeddings), |
|
|
"diarization_info": diar_info, |
|
|
} |
|
|
job["status"] = JobStatus.DONE |
|
|
|
|
|
|
|
|
print(f"[{job_id}] ✓ Resultados guardados:") |
|
|
print(f"[{job_id}] - Personatges: {len(characters)}") |
|
|
print(f"[{job_id}] - Segments d'àudio: {len(audio_segments)}") |
|
|
print(f"[{job_id}] - Face embeddings: {num_face_embeddings}") |
|
|
print(f"[{job_id}] - Voice embeddings: {len(voice_embeddings)}") |
|
|
|
|
|
except Exception as e_detect: |
|
|
|
|
|
import traceback |
|
|
print(f"[{job_id}] ✗ Error en detección: {e_detect}") |
|
|
print(f"[{job_id}] Traceback: {traceback.format_exc()}") |
|
|
print(f"[{job_id}] Usando modo fallback (carpetas vacías)") |
|
|
|
|
|
|
|
|
for sub in ("sources", "faces", "voices", "backgrounds"): |
|
|
(base / sub).mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
|
|
|
job["results"] = { |
|
|
"characters": [], |
|
|
"num_characters": 0, |
|
|
"temp_dirs": { |
|
|
"sources": str(base / "sources"), |
|
|
"faces": str(base / "faces"), |
|
|
"voices": str(base / "voices"), |
|
|
"backgrounds": str(base / "backgrounds"), |
|
|
}, |
|
|
"warning": f"Detección falló, usando modo fallback: {str(e_detect)}" |
|
|
} |
|
|
job["status"] = JobStatus.DONE |
|
|
|
|
|
print(f"[{job_id}] ✓ Job completado exitosamente") |
|
|
|
|
|
except Exception as e: |
|
|
import traceback |
|
|
print(f"[{job_id}] ✗ Error inesperado: {e}") |
|
|
try: |
|
|
job = jobs.get(job_id) |
|
|
if job is not None: |
|
|
job["status"] = JobStatus.FAILED |
|
|
job["error"] = str(e) |
|
|
except Exception: |
|
|
pass |
|
|
print(f"[{job_id}] Traceback: {traceback.format_exc()}") |
|
|
|
|
|
@app.post("/generate_audiodescription") |
|
|
async def generate_audiodescription(video: UploadFile = File(...)): |
|
|
try: |
|
|
import uuid |
|
|
job_id = str(uuid.uuid4()) |
|
|
vid_name = video.filename or f"video_{job_id}.mp4" |
|
|
base = TEMP_ROOT / Path(vid_name).stem |
|
|
|
|
|
base.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
video_path = base / vid_name |
|
|
with open(video_path, "wb") as f: |
|
|
f.write(await video.read()) |
|
|
|
|
|
|
|
|
result = ad_generate(str(video_path), base) |
|
|
|
|
|
return { |
|
|
"status": "done", |
|
|
"results": { |
|
|
"une_srt": result.get("une_srt", ""), |
|
|
"free_text": result.get("free_text", ""), |
|
|
"artifacts": result.get("artifacts", {}), |
|
|
}, |
|
|
} |
|
|
except Exception as e: |
|
|
import traceback |
|
|
print(f"/generate_audiodescription error: {e}\n{traceback.format_exc()}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
@app.post("/load_casting") |
|
|
async def load_casting( |
|
|
faces_dir: str = Form("identities/faces"), |
|
|
voices_dir: str = Form("identities/voices"), |
|
|
db_dir: str = Form("chroma_db"), |
|
|
drop_collections: bool = Form(False), |
|
|
): |
|
|
client = ensure_chroma(Path(db_dir)) |
|
|
n_faces = build_faces_index(Path(faces_dir), client, collection_name="index_faces", drop=drop_collections) |
|
|
n_voices = build_voices_index(Path(voices_dir), client, collection_name="index_voices", drop=drop_collections) |
|
|
return {"ok": True, "faces": n_faces, "voices": n_voices} |
|
|
|
|
|
@app.post("/finalize_casting") |
|
|
async def finalize_casting( |
|
|
payload: dict = Body(...), |
|
|
): |
|
|
""" |
|
|
Consolidate selected face and voice clusters into identities directories and build indices. |
|
|
Expected payload: |
|
|
{ |
|
|
"video_name": str, |
|
|
"base_dir": str, # engine temp base for this video |
|
|
"characters": [ |
|
|
{"id": "char1", "name": "Nom", "folder": "/tmp/temp/<video>/char1", "kept_files": ["representative.jpg", ...], "description": "..."}, ... |
|
|
], |
|
|
"voice_clusters": [ |
|
|
{"label": 0, "name": "SPEAKER_00", "clips": ["segment_000.wav", ...]}, ... |
|
|
] |
|
|
} |
|
|
""" |
|
|
import os |
|
|
import shutil |
|
|
from pathlib import Path as _P |
|
|
|
|
|
video_name = payload.get("video_name") |
|
|
base_dir = payload.get("base_dir") |
|
|
characters = payload.get("characters", []) or [] |
|
|
voice_clusters = payload.get("voice_clusters", []) or [] |
|
|
|
|
|
if not video_name or not base_dir: |
|
|
raise HTTPException(status_code=400, detail="Missing video_name or base_dir") |
|
|
|
|
|
faces_out = IDENTITIES_ROOT / video_name / "faces" |
|
|
voices_out = IDENTITIES_ROOT / video_name / "voices" |
|
|
faces_out.mkdir(parents=True, exist_ok=True) |
|
|
voices_out.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
|
|
|
for ch in characters: |
|
|
ch_name = (ch.get("name") or "Unknown").strip() or "Unknown" |
|
|
ch_folder = ch.get("folder") |
|
|
kept = ch.get("kept_files") or [] |
|
|
if not ch_folder or not os.path.isdir(ch_folder): |
|
|
continue |
|
|
dst_dir = faces_out / ch_name |
|
|
dst_dir.mkdir(parents=True, exist_ok=True) |
|
|
for fname in kept: |
|
|
src = _P(ch_folder) / fname |
|
|
if src.exists() and src.is_file(): |
|
|
try: |
|
|
shutil.copy2(src, dst_dir / fname) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
|
|
|
clips_dir = _P(base_dir) / "clips" |
|
|
for vc in voice_clusters: |
|
|
v_name = (vc.get("name") or f"SPEAKER_{int(vc.get('label',0)):02d}").strip() |
|
|
dst_dir = voices_out / v_name |
|
|
dst_dir.mkdir(parents=True, exist_ok=True) |
|
|
for wav in (vc.get("clips") or []): |
|
|
src = clips_dir / wav |
|
|
if src.exists() and src.is_file(): |
|
|
try: |
|
|
shutil.copy2(src, dst_dir / wav) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
|
|
|
db_dir = IDENTITIES_ROOT / video_name / "chroma_db" |
|
|
client = ensure_chroma(db_dir) |
|
|
n_faces = build_faces_index(faces_out, client, collection_name="index_faces", deepface_model='Facenet512', drop=True) |
|
|
n_voices = build_voices_index(voices_out, client, collection_name="index_voices", drop=True) |
|
|
|
|
|
|
|
|
face_identities = sorted([p.name for p in faces_out.iterdir() if p.is_dir()]) if faces_out.exists() else [] |
|
|
voice_identities = sorted([p.name for p in voices_out.iterdir() if p.is_dir()]) if voices_out.exists() else [] |
|
|
|
|
|
return { |
|
|
"ok": True, |
|
|
"video_name": video_name, |
|
|
"faces_dir": str(faces_out), |
|
|
"voices_dir": str(voices_out), |
|
|
"db_dir": str(db_dir), |
|
|
"n_faces_embeddings": n_faces, |
|
|
"n_voices_embeddings": n_voices, |
|
|
"face_identities": face_identities, |
|
|
"voice_identities": voice_identities, |
|
|
} |
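# Client sketch (hypothetical values; the JSON body follows the payload documented in the docstring):
#   import requests
#   payload = {
#       "video_name": "my_video",
#       "base_dir": "/tmp/temp/my_video",
#       "characters": [{"id": "char_00", "name": "Anna",
#                       "folder": "/tmp/temp/my_video/characters/char_00",
#                       "kept_files": ["representative.jpg"]}],
#       "voice_clusters": [{"label": 0, "name": "SPEAKER_00", "clips": ["segment_000.wav"]}],
#   }
#   requests.post("http://localhost:7860/finalize_casting", json=payload)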
|
|
|
|
|
@app.get("/files_scene/{video_name}/{scene_id}/{filename}") |
|
|
def serve_scene_file(video_name: str, scene_id: str, filename: str): |
|
|
file_path = TEMP_ROOT / video_name / "scenes" / scene_id / filename |
|
|
if not file_path.exists(): |
|
|
raise HTTPException(status_code=404, detail="File not found") |
|
|
return FileResponse(file_path) |
|
|
|
|
|
@app.post("/detect_scenes") |
|
|
async def detect_scenes( |
|
|
video: UploadFile = File(...), |
|
|
max_groups: int = Form(default=3), |
|
|
min_cluster_size: int = Form(default=3), |
|
|
scene_sensitivity: float = Form(default=0.5), |
|
|
frame_interval_sec: float = Form(default=0.5), |
|
|
): |
|
|
""" |
|
|
Detecta clústers d'escenes mitjançant clustering jeràrquic d'histogrames de color. |
|
|
Retorna una llista de scene_clusters estructurada de forma similar a characters. |
|
|
""" |
|
|
import cv2 |
|
|
import numpy as np |
|
|
|
|
|
|
|
|
video_name = Path(video.filename).stem |
|
|
dst_video = VIDEOS_ROOT / f"{video_name}.mp4" |
|
|
with dst_video.open("wb") as f: |
|
|
shutil.copyfileobj(video.file, f) |
|
|
|
|
|
cap = cv2.VideoCapture(str(dst_video)) |
|
|
if not cap.isOpened(): |
|
|
raise HTTPException(status_code=400, detail="Cannot open video") |
|
|
|
|
|
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 |
|
|
step = max(1, int(frame_interval_sec * fps)) |
|
|
|
|
|
frames = [] |
|
|
metas = [] |
|
|
idx = 0 |
|
|
while True: |
|
|
ret = cap.grab() |
|
|
if not ret: |
|
|
break |
|
|
if idx % step == 0: |
|
|
ret2, frame = cap.retrieve() |
|
|
if not ret2: |
|
|
break |
|
|
|
|
|
small = cv2.resize(frame, (160, 90)) |
|
|
hsv = cv2.cvtColor(small, cv2.COLOR_BGR2HSV) |
|
|
|
|
|
h_hist = cv2.calcHist([hsv],[0],None,[32],[0,180]).flatten() |
|
|
s_hist = cv2.calcHist([hsv],[1],None,[32],[0,256]).flatten() |
|
|
v_hist = cv2.calcHist([hsv],[2],None,[32],[0,256]).flatten() |
|
|
hist = np.concatenate([h_hist, s_hist, v_hist]) |
|
|
hist = hist / (np.linalg.norm(hist) + 1e-8) |
|
|
frames.append(hist) |
|
|
metas.append({"index": idx, "time_sec": idx/float(fps)}) |
|
|
idx += 1 |
|
|
cap.release() |
|
|
|
|
|
if not frames: |
|
|
return {"scene_clusters": []} |
|
|
|
|
|
X = np.array(frames) |
|
|
labels = hierarchical_cluster_with_min_size(X, max_groups, min_cluster_size, scene_sensitivity).tolist() |
|
|
initial_clusters = len(set([l for l in labels if l >= 0])) |
|
|
print(f"Scene clustering jeràrquic inicial: {initial_clusters} clusters") |
|
|
|
|
|
|
|
|
clusters = {} |
|
|
for i, lbl in enumerate(labels): |
|
|
if lbl is None or lbl < 0: |
|
|
continue |
|
|
clusters.setdefault(int(lbl), []).append(i) |
|
|
|
|
|
|
|
|
|
|
|
centroids = {} |
|
|
for lbl, idxs in clusters.items(): |
|
|
cluster_histograms = X[idxs] |
|
|
centroids[lbl] = np.mean(cluster_histograms, axis=0) |
|
|
|
|
|
print(f"[SCENE VALIDATION] Validant similaritat entre {len(centroids)} clusters...") |
|
|
|
|
|
|
|
|
SIMILARITY_THRESHOLD = 0.25 |
|
|
CORRELATION_THRESHOLD = 0.85 |
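    # Two clusters are treated as the same scene when their centroid histograms are either
    # closer than SIMILARITY_THRESHOLD (L2 distance) or correlated above CORRELATION_THRESHOLD;
    # similar pairs are merged below with a small union-find (find/union).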
|
|
|
|
|
|
|
|
cluster_labels = sorted(centroids.keys()) |
|
|
similarities = {} |
|
|
|
|
|
for i, lbl1 in enumerate(cluster_labels): |
|
|
for lbl2 in cluster_labels[i+1:]: |
|
|
|
|
|
dist = np.linalg.norm(centroids[lbl1] - centroids[lbl2]) |
|
|
|
|
|
|
|
|
corr = np.corrcoef(centroids[lbl1], centroids[lbl2])[0, 1] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
are_similar = (dist < SIMILARITY_THRESHOLD) or (corr > CORRELATION_THRESHOLD) |
|
|
|
|
|
similarities[(lbl1, lbl2)] = { |
|
|
'distance': dist, |
|
|
'correlation': corr, |
|
|
'similar': are_similar |
|
|
} |
|
|
|
|
|
if are_similar: |
|
|
print(f"[SCENE VALIDATION] Clusters {lbl1} i {lbl2} són similars: " |
|
|
f"dist={dist:.3f} (threshold={SIMILARITY_THRESHOLD}), " |
|
|
f"corr={corr:.3f} (threshold={CORRELATION_THRESHOLD})") |
|
|
|
|
|
|
|
|
|
|
|
parent = {lbl: lbl for lbl in cluster_labels} |
|
|
|
|
|
def find(x): |
|
|
if parent[x] != x: |
|
|
parent[x] = find(parent[x]) |
|
|
return parent[x] |
|
|
|
|
|
def union(x, y): |
|
|
root_x = find(x) |
|
|
root_y = find(y) |
|
|
if root_x != root_y: |
|
|
parent[root_y] = root_x |
|
|
|
|
|
|
|
|
fusion_count = 0 |
|
|
for (lbl1, lbl2), sim in similarities.items(): |
|
|
if sim['similar']: |
|
|
union(lbl1, lbl2) |
|
|
fusion_count += 1 |
|
|
|
|
|
|
|
|
new_clusters = {} |
|
|
for lbl, idxs in clusters.items(): |
|
|
root = find(lbl) |
|
|
if root not in new_clusters: |
|
|
new_clusters[root] = [] |
|
|
new_clusters[root].extend(idxs) |
|
|
|
|
|
|
|
|
final_clusters_dict = {} |
|
|
for i, (root, idxs) in enumerate(sorted(new_clusters.items())): |
|
|
final_clusters_dict[i] = idxs |
|
|
|
|
|
clusters = final_clusters_dict |
|
|
final_clusters = len(clusters) |
|
|
eliminated = initial_clusters - final_clusters |
|
|
|
|
|
print(f"[SCENE VALIDATION] ===== RESULTADO =====") |
|
|
print(f"[SCENE VALIDATION] Clusters inicials: {initial_clusters}") |
|
|
print(f"[SCENE VALIDATION] Fusions realitzades: {fusion_count}") |
|
|
print(f"[SCENE VALIDATION] Clusters finals: {final_clusters}") |
|
|
print(f"[SCENE VALIDATION] Clusters eliminats (fusionats): {eliminated}") |
|
|
print(f"[SCENE VALIDATION] Reducció: {(eliminated/initial_clusters*100):.1f}%") |
|
|
print(f"[SCENE VALIDATION] =======================") |
|
|
|
|
|
|
|
|
base = TEMP_ROOT / video_name / "scenes" |
|
|
base.mkdir(parents=True, exist_ok=True) |
|
|
scene_list = [] |
|
|
cap = cv2.VideoCapture(str(dst_video)) |
|
|
for lbl, idxs in sorted(clusters.items(), key=lambda x: x[0]): |
|
|
scene_id = f"scene_{int(lbl):02d}" |
|
|
out_dir = base / scene_id |
|
|
out_dir.mkdir(parents=True, exist_ok=True) |
|
|
frame_files = [] |
|
|
|
|
|
for k, fi in enumerate(idxs[:12]): |
|
|
frame_num = metas[fi]["index"] |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num) |
|
|
ret2, frame = cap.read() |
|
|
if not ret2: |
|
|
continue |
|
|
fn = f"frame_{k:03d}.jpg" |
|
|
cv2.imwrite(str(out_dir / fn), frame) |
|
|
frame_files.append(fn) |
|
|
|
|
|
rep = frame_files[0] if frame_files else None |
|
|
image_url = f"/files_scene/{video_name}/{scene_id}/{rep}" if rep else "" |
|
|
|
|
|
|
|
|
scene_description = "" |
|
|
scene_name = f"Escena {lbl+1}" |
|
|
if rep: |
|
|
rep_full_path = out_dir / rep |
|
|
if rep_full_path.exists(): |
|
|
print(f"Llamando a svision para describir {scene_id}...") |
|
|
try: |
|
|
scene_description, scene_name = describe_image_with_svision(str(rep_full_path), is_face=False) |
|
|
if not scene_name: |
|
|
scene_name = f"Escena {lbl+1}" |
|
|
|
|
|
|
|
|
if scene_description: |
|
|
print(f"Llamando a schat para generar nombre corto de {scene_id}...") |
|
|
try: |
|
|
|
|
|
config_path = os.getenv("CONFIG_YAML", "config.yaml") |
|
|
if os.path.exists(config_path): |
|
|
with open(config_path, 'r', encoding='utf-8') as f: |
|
|
cfg = yaml.safe_load(f) or {} |
|
|
router = LLMRouter(cfg) |
|
|
|
|
|
prompt = f"Basant-te en aquesta descripció d'una escena, genera un nom curt de menys de 3 paraules que la resumeixi:\n\n{scene_description}\n\nNom de l'escena:" |
|
|
|
|
|
short_name = router.instruct( |
|
|
prompt=prompt, |
|
|
system="Ets un assistent que genera noms curts i descriptius per a escenes. Respon NOMÉS amb el nom, sense explicacions.", |
|
|
model="salamandra-instruct" |
|
|
).strip() |
|
|
|
|
|
|
|
|
short_name = short_name.strip('"\'.,!?').strip() |
|
|
|
|
|
if short_name and len(short_name) > 0: |
|
|
scene_name = short_name |
|
|
print(f"[schat] Nom generat: {scene_name}") |
|
|
else: |
|
|
print(f"[schat] No s'ha generat nom, usant fallback") |
|
|
except Exception as e_schat: |
|
|
print(f"Error generando nombre con schat: {e_schat}") |
|
|
|
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error describiendo {scene_id}: {e}") |
|
|
|
|
|
scene_list.append({ |
|
|
"id": scene_id, |
|
|
"name": scene_name, |
|
|
"description": scene_description, |
|
|
"folder": str(out_dir), |
|
|
"num_frames": len(frame_files), |
|
|
"image_url": image_url, |
|
|
"frame_files": frame_files, |
|
|
}) |
|
|
cap.release() |
|
|
|
|
|
return {"scene_clusters": scene_list, "base_dir": str(base)} |
|
|
|
|
|
@app.post("/refine_narration") |
|
|
async def refine_narration( |
|
|
dialogues_srt: str = Form(...), |
|
|
frame_descriptions_json: str = Form("[]"), |
|
|
config_path: str = Form("config.yaml"), |
|
|
): |
|
|
cfg = load_yaml(config_path) |
|
|
frames = json.loads(frame_descriptions_json) |
|
|
model_name = cfg.get("narration", {}).get("model", "salamandra-instruct") |
|
|
use_remote = model_name in (cfg.get("models", {}).get("routing", {}).get("use_remote_for", [])) |
|
|
|
|
|
if use_remote: |
|
|
router = LLMRouter(cfg) |
|
|
system_msg = ( |
|
|
"Eres un sistema de audiodescripción que cumple UNE-153010. " |
|
|
"Fusiona diálogos del SRT con descripciones concisas en los huecos, evitando redundancias. " |
|
|
"Devuelve JSON con {narrative_text, srt_text}." |
|
|
) |
|
|
prompt = json.dumps({"dialogues_srt": dialogues_srt, "frames": frames, "rules": cfg.get("narration", {})}, ensure_ascii=False) |
|
|
try: |
|
|
txt = router.instruct(prompt=prompt, system=system_msg, model=model_name) |
|
|
out = {} |
|
|
try: |
|
|
out = json.loads(txt) |
|
|
except Exception: |
|
|
out = {"narrative_text": txt, "srt_text": ""} |
|
|
return { |
|
|
"narrative_text": out.get("narrative_text", ""), |
|
|
"srt_text": out.get("srt_text", ""), |
|
|
"approved": True, |
|
|
"critic_feedback": "", |
|
|
} |
|
|
except Exception: |
|
|
ns = NarrationSystem(model_url=None, une_guidelines_path=cfg.get("narration", {}).get("narration_une_guidelines_path", "UNE_153010.txt")) |
|
|
res = ns.run(dialogues_srt, frames) |
|
|
return {"narrative_text": res.narrative_text, "srt_text": res.srt_text, "approved": res.approved, "critic_feedback": res.critic_feedback} |
|
|
|
|
|
ns = NarrationSystem(model_url=None, une_guidelines_path=cfg.get("narration", {}).get("une_guidelines_path", "UNE_153010.txt")) |
|
|
out = ns.run(dialogues_srt, frames) |
|
|
return {"narrative_text": out.narrative_text, "srt_text": out.srt_text, "approved": out.approved, "critic_feedback": out.critic_feedback} |
|
|
|
|
|
if __name__ == "__main__": |
|
|
uvicorn.run(app, host="0.0.0.0", port=7860) |
|
|
|