|
|
from __future__ import annotations
|
|
|
|
|
|
from fastapi import APIRouter, UploadFile, File, Form, BackgroundTasks, HTTPException, Body
|
|
|
from fastapi.responses import FileResponse
|
|
|
from pathlib import Path
|
|
|
from datetime import datetime
|
|
|
from enum import Enum
|
|
|
from typing import Dict, Any, List
|
|
|
import shutil
|
|
|
import os
|
|
|
import uuid
|
|
|
import numpy as np
|
|
|
import cv2
|
|
|
import tempfile
|
|
|
|
|
|
from casting_loader import ensure_chroma, build_faces_index, build_voices_index
|
|
|
from llm_router import load_yaml, LLMRouter
|
|
|
|
|
|
|
|
|
import svision_client
|
|
|
import asr_client
|
|
|
|
|
|
|
|
|
# Working directories used by this module (all under /tmp). They are created
# eagerly at import time so the endpoints can write into them directly.
ROOT = Path("/tmp/veureu")
TEMP_ROOT = Path("/tmp/temp")              # per-video working data (frames, crops, clips, scenes)
VIDEOS_ROOT = Path("/tmp/data/videos")     # uploaded videos, stored as <video_name>.mp4
IDENTITIES_ROOT = Path("/tmp/characters")  # finalized face/voice identities and their Chroma DBs

for _work_dir in (ROOT, TEMP_ROOT, VIDEOS_ROOT, IDENTITIES_ROOT):
    _work_dir.mkdir(parents=True, exist_ok=True)
del _work_dir
|
|
|
|
|
|
|
|
|
class JobStatus(str, Enum):
    """Lifecycle states of a background casting job.

    Mixing in ``str`` makes members compare equal to their plain string
    values (e.g. ``JobStatus.DONE == "done"``).
    """

    QUEUED = "queued"          # job registered, background task not started yet
    PROCESSING = "processing"  # process_video_job is running
    DONE = "done"              # finished (results may be empty if processing failed)
    FAILED = "failed"          # unexpected error outside the main processing step
|
|
|
|
|
|
|
|
|
# In-memory job registry keyed by job_id (a UUID string). Each value is the job
# dict created by create_initial_casting and updated by process_video_job.
# It lives only in this process: nothing is persisted across restarts.
jobs: Dict[str, dict] = dict()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def hierarchical_cluster_with_min_size(X, max_groups: int, min_cluster_size: int, sensitivity: float = 0.5) -> np.ndarray:
    """Hierarchical clustering driven only by ``min_cluster_size`` and ``max_groups``.

    The data are clustered with average linkage on cosine distance. The tree is
    then cut into the largest possible number of clusters
    (``len(X) // min_cluster_size``), lowering that number until the count of
    *valid* clusters (those with at least ``min_cluster_size`` members) is
    between 1 and ``max_groups``.

    Members of clusters that are too small are labelled ``-1`` (noise). Valid
    clusters are relabelled ``0 .. k-1`` in the order of scipy's cluster ids,
    so labels are always contiguous.

    Args:
        X: 2-D array-like of feature vectors, one row per sample.
        max_groups: Maximum number of clusters to return; ``<= 0`` yields all noise.
        min_cluster_size: Minimum members for a cluster to be kept. Values
            below 1 are treated as 1.
        sensitivity: Kept for signature compatibility; unused.

    Returns:
        Integer numpy array of length ``len(X)`` with a cluster id or ``-1``.
    """
    from scipy.cluster.hierarchy import linkage, fcluster
    from collections import Counter

    n_samples = len(X)
    if n_samples == 0:
        return np.array([], dtype=int)

    # A non-positive minimum would make every singleton "valid" and the integer
    # division below would divide by zero; clamp it to 1.
    min_size = max(1, int(min_cluster_size))
    k_target = max(0, int(max_groups))
    noise = np.full(n_samples, -1, dtype=int)

    if n_samples < min_size or k_target == 0:
        return noise

    if n_samples == 1:
        # linkage() needs at least two observations; here min_size == 1, so the
        # lone sample forms one valid cluster.
        return np.zeros(1, dtype=int)

    Z = linkage(X, method="average", metric="cosine")

    # n_samples >= min_size here, so this is at least 1.
    max_possible = n_samples // min_size

    for n_clusters in range(max_possible, 0, -1):
        trial_labels = fcluster(Z, t=n_clusters, criterion="maxclust") - 1
        counts = Counter(trial_labels.tolist())
        valid = sorted(lbl for lbl, cnt in counts.items() if cnt >= min_size)

        # Keep merging (fewer clusters) until 1..k_target valid clusters remain.
        if not valid or len(valid) > k_target:
            continue

        remap = {old: new for new, old in enumerate(valid)}
        return np.array([remap.get(int(lbl), -1) for lbl in trial_labels], dtype=int)

    return noise
|
|
|
|
|
|
|
|
|
# Router exposing this module's endpoints (casting jobs, job status, scene
# detection and static file serving); all routes below are registered on it.
router = APIRouter(tags=["Preprocessing Manager"])
|
|
|
|
|
|
|
|
|
@router.post("/create_initial_casting")
async def create_initial_casting(
    background_tasks: BackgroundTasks,
    video: UploadFile = File(...),
    max_groups: int = Form(default=3),
    min_cluster_size: int = Form(default=3),
    face_sensitivity: float = Form(default=0.5),
    voice_max_groups: int = Form(default=3),
    voice_min_cluster_size: int = Form(default=3),
    voice_sensitivity: float = Form(default=0.5),
    max_frames: int = Form(default=100),
):
    """Store an uploaded video and queue a background casting job.

    The video is saved as ``VIDEOS_ROOT/<stem>.mp4`` and a job entry holding
    the clustering parameters is registered in ``jobs``.
    ``process_video_job`` is then scheduled as a background task.

    Returns:
        ``{"job_id": <uuid4 string>}``.

    Raises:
        HTTPException(400): the upload has no usable file name.
    """
    # The stem becomes a directory name under TEMP_ROOT (see process_video_job).
    # An empty stem, "." or ".." would map the job onto TEMP_ROOT itself or its
    # parent (/tmp), so reject them.
    video_name = Path(video.filename or "").stem
    if video_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid video filename")

    dst_video = VIDEOS_ROOT / f"{video_name}.mp4"
    with dst_video.open("wb") as f:
        shutil.copyfileobj(video.file, f)

    job_id = str(uuid.uuid4())

    jobs[job_id] = {
        "id": job_id,
        "status": JobStatus.QUEUED,
        "video_path": str(dst_video),
        "video_name": video_name,
        "max_groups": int(max_groups),
        "min_cluster_size": int(min_cluster_size),
        "face_sensitivity": float(face_sensitivity),
        "voice_max_groups": int(voice_max_groups),
        "voice_min_cluster_size": int(voice_min_cluster_size),
        "voice_sensitivity": float(voice_sensitivity),
        "max_frames": int(max_frames),
        "created_at": datetime.now().isoformat(),
        "results": None,
        "error": None,
    }

    print(f"[{job_id}] Job creado para vídeo: {video_name}")
    background_tasks.add_task(process_video_job, job_id)
    return {"job_id": job_id}
|
|
|
|
|
|
|
|
|
@router.get("/jobs/{job_id}/status")
def get_job_status(job_id: str):
    """Report the status of a casting job.

    The response always has ``"status"`` (the enum's string value). It adds
    ``"results"`` when the job has non-None results, and ``"error"`` when the
    job recorded a truthy error.

    Raises:
        HTTPException(404): ``job_id`` is not registered.
    """
    try:
        job = jobs[job_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Job not found") from None

    raw_status = job["status"]
    response = {
        "status": raw_status.value if isinstance(raw_status, JobStatus) else str(raw_status),
    }

    results = job.get("results")
    if results is not None:
        response["results"] = results

    error = job.get("error")
    if error:
        response["error"] = error

    return response
|
|
|
|
|
|
|
|
|
@router.get("/files/{video_name}/{char_id}/{filename}")
def serve_character_file(video_name: str, char_id: str, filename: str):
    """Serve a face image from ``TEMP_ROOT/<video_name>/characters/<char_id>/``.

    The three path parameters come from the URL and are untrusted. The joined
    path is resolved, and requests that escape ``TEMP_ROOT`` (e.g. via ``..``)
    are refused.

    Raises:
        HTTPException(404): the path escapes ``TEMP_ROOT`` or is not a regular file.
    """
    root = TEMP_ROOT.resolve()
    file_path = (TEMP_ROOT / video_name / "characters" / char_id / filename).resolve()
    try:
        file_path.relative_to(root)
    except ValueError:
        # Report traversal attempts exactly like a missing file.
        raise HTTPException(status_code=404, detail="File not found") from None
    # is_file() (not exists()) so directories are not handed to FileResponse.
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path)
|
|
|
|
|
|
|
|
|
@router.get("/audio/{video_name}/{filename}")
def serve_audio_file(video_name: str, filename: str):
    """Serve an audio clip from ``TEMP_ROOT/<video_name>/clips/``.

    The path parameters come from the URL and are untrusted. The joined path is
    resolved, and requests that escape ``TEMP_ROOT`` (e.g. via ``..``) are
    refused.

    Raises:
        HTTPException(404): the path escapes ``TEMP_ROOT`` or is not a regular file.
    """
    root = TEMP_ROOT.resolve()
    file_path = (TEMP_ROOT / video_name / "clips" / filename).resolve()
    try:
        file_path.relative_to(root)
    except ValueError:
        # Report traversal attempts exactly like a missing file.
        raise HTTPException(status_code=404, detail="File not found") from None
    # is_file() (not exists()) so directories are not handed to FileResponse.
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path)
|
|
|
|
|
|
|
|
|
@router.post("/load_casting")
async def load_casting(
    faces_dir: str = Form("identities/faces"),
    voices_dir: str = Form("identities/voices"),
    db_dir: str = Form("chroma_db"),
    drop_collections: bool = Form(False),
):
    """(Re)build the ``index_faces`` and ``index_voices`` Chroma collections.

    Opens the Chroma store at ``db_dir`` via ``ensure_chroma``, then indexes
    ``faces_dir`` and ``voices_dir``. The collections are dropped first when
    ``drop_collections`` is true.

    Returns:
        ``{"ok": True, "faces": <count>, "voices": <count>}`` with the values
        returned by the index builders.

    NOTE(review): all three directories are taken verbatim from the form and
    are not confined to any root directory; confirm only trusted callers can
    reach this endpoint.
    """
    chroma_client = ensure_chroma(Path(db_dir))

    indexed = {}
    indexed["faces"] = build_faces_index(
        Path(faces_dir), chroma_client, collection_name="index_faces", drop=drop_collections
    )
    indexed["voices"] = build_voices_index(
        Path(voices_dir), chroma_client, collection_name="index_voices", drop=drop_collections
    )
    return {"ok": True, **indexed}
|
|
|
|
|
|
|
|
|
def _finalize_is_plain_name(name) -> bool:
    """Return True when *name* is a string naming a single, non-special path component."""
    return (
        isinstance(name, str)
        and name not in ("", ".", "..")
        and "\\" not in name
        and "\x00" not in name
        and Path(name).name == name
    )


def _finalize_safe_dirname(raw, fallback: str) -> str:
    """Turn a user-supplied identity name into a single safe directory name.

    Separators and NUL are replaced by ``_``; empty, ``.`` and ``..`` names
    (after stripping) fall back to *fallback*.
    """
    name = str(raw).strip() if raw else ""
    for bad in ("/", "\\", "\x00"):
        name = name.replace(bad, "_")
    return fallback if name in ("", ".", "..") else name


def _finalize_copy_files(src_dir: Path, names, dst_dir: Path) -> None:
    """Copy each plain file name in *names* from *src_dir* into *dst_dir*.

    Names that are not a single path component (e.g. ``../x`` or absolute
    paths, which would escape both directories) are skipped. Missing sources
    are ignored, and copy failures are logged without stopping the loop.
    """
    for fname in names:
        if not _finalize_is_plain_name(fname):
            print(f"[finalize_casting] WARN - Nombre de fichero ignorado: {fname!r}")
            continue
        src = src_dir / fname
        if not src.is_file():
            continue
        try:
            shutil.copy2(src, dst_dir / fname)
        except OSError as err:
            print(f"[finalize_casting] WARN - No se pudo copiar {src}: {err}")


def _finalize_load_llm_router():
    """Build an LLMRouter from ``config.yaml``, or return None if that fails."""
    try:
        return LLMRouter(load_yaml("config.yaml"))
    except Exception as err:
        print(f"[finalize_casting] WARN - LLMRouter no disponible: {err}")
        return None


def _finalize_first_face_image(id_dir: Path):
    """Return the first image found in *id_dir*, trying extensions in a fixed order."""
    for ext in (".jpg", ".jpeg", ".png", ".bmp", ".webp"):
        candidates = list(id_dir.glob(f"*{ext}"))
        if candidates:
            return candidates[0]
    return None


def _finalize_first_voice_clip(id_dir: Path):
    """Return the alphabetically first .wav/.flac/.mp3 file in *id_dir*, or None."""
    audio_files = sorted(
        p for p in id_dir.iterdir()
        if p.is_file() and p.suffix.lower() in (".wav", ".flac", ".mp3")
    )
    return audio_files[0] if audio_files else None


def _finalize_parse_face_embedding(out):
    """Extract the embedding from a ``/face_image_embedding`` response.

    Accepts a list whose first item is the embedding (list/tuple), a flat
    list of numbers, or a dict with an ``"embedding"`` key; otherwise None.
    """
    if isinstance(out, list):
        if out and isinstance(out[0], (list, tuple)):
            return list(out[0])
        if out and isinstance(out[0], (float, int)):
            return list(out)
        return None
    if isinstance(out, dict) and "embedding" in out:
        return out.get("embedding")
    return None


def _finalize_parse_voice_embedding(out):
    """Extract the embedding from a ``/voice_embedding`` response (list or dict)."""
    if isinstance(out, list):
        return list(out)
    if isinstance(out, dict) and "embedding" in out:
        return out.get("embedding")
    return None


def _finalize_embed_identities(router_llm, factory_name, root, identities, pick_sample, api_name, parse):
    """Compute one remote embedding per identity folder under *root*.

    The client comes from ``router_llm.client_factories[factory_name]``. Its
    ``_client`` attribute is called with ``predict(<sample path>, api_name=...)``.
    Identities without a sample file, or whose request or parse fails, are
    skipped. Returns a list of ``{"nombre": identity, "embedding": emb}`` dicts.
    """
    entries = []
    if not identities or router_llm is None:
        return entries
    factory = router_llm.client_factories.get(factory_name)
    if factory is None:
        return entries
    gclient = getattr(factory(), "_client", None)
    if gclient is None:
        return entries

    for identity in identities:
        id_dir = root / identity
        if not id_dir.is_dir():
            continue
        sample = pick_sample(id_dir)
        if sample is None:
            continue
        try:
            emb = parse(gclient.predict(str(sample), api_name=api_name))
            if not emb:
                continue
        except Exception as err:
            print(f"[finalize_casting] WARN - Embedding fallido para {identity}: {err}")
            continue
        entries.append({"nombre": identity, "embedding": emb})
    return entries


@router.post("/finalize_casting")
async def finalize_casting(
    payload: dict = Body(...),
):
    """Persist the user-curated casting and build its indexes.

    Expected payload keys:
      - ``video_name`` (required): single path component naming the video.
      - ``base_dir`` (required): job working dir; clips are read from ``<base_dir>/clips``.
      - ``characters``: list of ``{"name", "folder", "kept_files"}``. The kept
        files are copied from ``folder`` into ``IDENTITIES_ROOT/<video>/faces/<name>/``.
      - ``voice_clusters``: list of ``{"name", "label", "clips"}``. The clips
        are copied into ``IDENTITIES_ROOT/<video>/voices/<name>/``.

    The method then (re)builds the ``index_faces``/``index_voices`` Chroma
    collections. When an LLMRouter is configured, it also computes one remote
    embedding per identity for ``casting_json``.

    Raises:
        HTTPException(400): ``video_name``/``base_dir`` missing, or
        ``video_name`` is not a single safe path component.
    """
    video_name = payload.get("video_name")
    base_dir = payload.get("base_dir")
    characters = payload.get("characters", []) or []
    voice_clusters = payload.get("voice_clusters", []) or []

    if not video_name or not base_dir:
        raise HTTPException(status_code=400, detail="Missing video_name or base_dir")
    # video_name becomes a directory under IDENTITIES_ROOT; refuse separators/"..".
    if not _finalize_is_plain_name(video_name):
        raise HTTPException(status_code=400, detail="Invalid video_name")

    faces_out = IDENTITIES_ROOT / video_name / "faces"
    voices_out = IDENTITIES_ROOT / video_name / "voices"
    faces_out.mkdir(parents=True, exist_ok=True)
    voices_out.mkdir(parents=True, exist_ok=True)

    # NOTE(review): ch["folder"] and base_dir are request-supplied source paths
    # and are not confined to TEMP_ROOT; confirm callers are trusted.
    for ch in characters:
        if not isinstance(ch, dict):
            continue
        ch_folder = ch.get("folder")
        if not ch_folder or not os.path.isdir(ch_folder):
            continue
        dst_dir = faces_out / _finalize_safe_dirname(ch.get("name"), "Unknown")
        dst_dir.mkdir(parents=True, exist_ok=True)
        _finalize_copy_files(Path(ch_folder), ch.get("kept_files") or [], dst_dir)

    clips_dir = Path(base_dir) / "clips"
    for vc in voice_clusters:
        if not isinstance(vc, dict):
            continue
        try:
            label_num = int(vc.get("label", 0))
        except (TypeError, ValueError):
            label_num = 0
        dst_dir = voices_out / _finalize_safe_dirname(vc.get("name"), f"SPEAKER_{label_num:02d}")
        dst_dir.mkdir(parents=True, exist_ok=True)
        _finalize_copy_files(clips_dir, vc.get("clips") or [], dst_dir)

    db_dir = IDENTITIES_ROOT / video_name / "chroma_db"
    try:
        client = ensure_chroma(db_dir)
        n_faces = build_faces_index(
            faces_out,
            client,
            collection_name="index_faces",
            deepface_model="Facenet512",
            drop=True,
        )
        n_voices = build_voices_index(
            voices_out,
            client,
            collection_name="index_voices",
            drop=True,
        )
    except Exception as e:
        print(f"[finalize_casting] WARN - No se pudieron construir índices ChromaDB: {e}")
        n_faces = 0
        n_voices = 0

    face_identities = sorted(p.name for p in faces_out.iterdir() if p.is_dir()) if faces_out.exists() else []
    voice_identities = sorted(p.name for p in voices_out.iterdir() if p.is_dir()) if voices_out.exists() else []

    casting_json = {"face_col": [], "voice_col": []}
    router_llm = _finalize_load_llm_router()

    try:
        casting_json["face_col"] = _finalize_embed_identities(
            router_llm, "salamandra-vision", faces_out, face_identities,
            _finalize_first_face_image, "/face_image_embedding", _finalize_parse_face_embedding,
        )
    except Exception as err:
        print(f"[finalize_casting] WARN - Embeddings de caras no disponibles: {err}")
        casting_json["face_col"] = []

    try:
        casting_json["voice_col"] = _finalize_embed_identities(
            router_llm, "whisper-catalan", voices_out, voice_identities,
            _finalize_first_voice_clip, "/voice_embedding", _finalize_parse_voice_embedding,
        )
    except Exception as err:
        print(f"[finalize_casting] WARN - Embeddings de voz no disponibles: {err}")
        casting_json["voice_col"] = []

    return {
        "ok": True,
        "video_name": video_name,
        "faces_dir": str(faces_out),
        "voices_dir": str(voices_out),
        "db_dir": str(db_dir),
        "n_faces_embeddings": n_faces,
        "n_voices_embeddings": n_voices,
        "face_identities": face_identities,
        "voice_identities": voice_identities,
        "casting_json": casting_json,
    }
|
|
|
|
|
|
|
|
|
@router.get("/files_scene/{video_name}/{scene_id}/{filename}")
def serve_scene_file(video_name: str, scene_id: str, filename: str):
    """Serve a scene keyframe from ``TEMP_ROOT/<video_name>/scenes/<scene_id>/``.

    The path parameters come from the URL and are untrusted. The joined path is
    resolved, and requests that escape ``TEMP_ROOT`` (e.g. via ``..``) are
    refused.

    Raises:
        HTTPException(404): the path escapes ``TEMP_ROOT`` or is not a regular file.
    """
    root = TEMP_ROOT.resolve()
    file_path = (TEMP_ROOT / video_name / "scenes" / scene_id / filename).resolve()
    try:
        file_path.relative_to(root)
    except ValueError:
        # Report traversal attempts exactly like a missing file.
        raise HTTPException(status_code=404, detail="File not found") from None
    # is_file() (not exists()) so directories are not handed to FileResponse.
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path)
|
|
|
|
|
|
|
|
|
def _scene_histogram(frame):
    """Return the 8x8x8 RGB colour histogram of a BGR *frame*, divided by its mean.

    Returns None when the histogram is all zeros or its mean is not positive,
    so that the caller can discard the frame.
    """
    img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    hist = cv2.calcHist(
        [img_rgb], [0, 1, 2], None,
        [8, 8, 8], [0, 256, 0, 256, 0, 256]
    ).astype("float32").flatten()
    if not np.any(hist):
        return None
    mean_val = float(hist.mean())
    if mean_val <= 0.0:
        return None
    hist /= mean_val
    return hist


@router.post("/detect_scenes")
async def detect_scenes(
    video: UploadFile = File(...),
    max_groups: int = Form(default=3),
    min_cluster_size: int = Form(default=3),
    scene_sensitivity: float = Form(default=0.5),
    frame_interval_sec: float = Form(default=0.5),
    max_frames: int = Form(default=100),
):
    """Detect scenes by clustering evenly spaced frames of the uploaded video.

    - Samples up to ``max_frames`` evenly spaced frames.
    - Drops black / very dark frames (mean grey level < 5).
    - Describes each frame by a mean-normalised 8x8x8 colour histogram. Frames
      whose histogram is empty or has zero mean are dropped.
    - Clusters the histograms with ``hierarchical_cluster_with_min_size``, the
      same routine used for faces and voices.

    Each returned scene reports ``start_time``/``end_time`` in seconds, using
    the video's FPS (25 when unknown). It also reports the underlying frame
    range as ``start_frame``/``end_frame`` (end exclusive).

    ``scene_sensitivity`` and ``frame_interval_sec`` are accepted for API
    compatibility but are not used.

    Returns:
        ``{"scene_clusters": [...]}``. On an internal error the dict also has
        an ``"error"`` key with the message.

    Raises:
        HTTPException(400): the upload has no usable file name.
    """
    # The stem becomes a directory name under TEMP_ROOT; refuse "", "." and "..".
    video_name = Path(video.filename or "").stem
    if video_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid video filename")

    dst_video = VIDEOS_ROOT / f"{video_name}.mp4"
    with dst_video.open("wb") as f:
        shutil.copyfileobj(video.file, f)

    try:
        print(f"[detect_scenes] Extrayendo frames equiespaciados de {video_name}...")

        keyframe_paths: List[Path] = []
        keyframe_frames: List[int] = []
        features: List[np.ndarray] = []

        cap = cv2.VideoCapture(str(dst_video))
        try:
            if not cap.isOpened():
                raise RuntimeError("No se pudo abrir el vídeo para detectar escenas")

            fps = float(cap.get(cv2.CAP_PROP_FPS) or 0.0)
            if not fps > 0.0:  # also rejects NaN
                fps = 25.0

            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
            if total_frames <= 0:
                print("[detect_scenes] total_frames <= 0")
                return {"scene_clusters": []}

            n_samples = max(1, min(int(max_frames), total_frames))
            frame_indices = sorted(set(
                np.linspace(0, total_frames - 1, num=n_samples, dtype=int).tolist()
            ))
            print(f"[detect_scenes] Total frames: {total_frames}, muestreando {len(frame_indices)} frames")

            scenes_dir = TEMP_ROOT / video_name / "scenes"
            scenes_dir.mkdir(parents=True, exist_ok=True)

            for frame_idx in frame_indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_idx))
                ret, frame = cap.read()
                if not ret:
                    continue

                # Black / near-black frames carry no scene information.
                if float(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).mean()) < 5.0:
                    continue

                # Compute the feature before writing, so rejected frames leave
                # no orphan keyframe files behind.
                try:
                    hist = _scene_histogram(frame)
                except Exception as fe_err:
                    print(f"[detect_scenes] Error calculando embedding para frame {frame_idx}: {fe_err}")
                    continue
                if hist is None:
                    continue

                local_keyframe = scenes_dir / f"keyframe_{frame_idx:06d}.jpg"
                try:
                    # imwrite signals most failures via its return value.
                    written = bool(cv2.imwrite(str(local_keyframe), frame))
                    werr = "cv2.imwrite devolvió False"
                except Exception as exc:
                    written = False
                    werr = exc
                if not written:
                    print(f"[detect_scenes] Error guardando frame {frame_idx}: {werr}")
                    continue

                features.append(hist)
                keyframe_paths.append(local_keyframe)
                keyframe_frames.append(int(frame_idx))
        finally:
            cap.release()

        if not features or len(features) < min_cluster_size:
            print(
                f"[detect_scenes] No hay suficientes frames válidos para clusterizar escenas: "
                f"validos={len(features)}, min_cluster_size={min_cluster_size}"
            )
            return {"scene_clusters": []}

        Xs = np.vstack(features)

        print("[detect_scenes] Clustering jerárquico de escenas...")
        scene_labels = hierarchical_cluster_with_min_size(Xs, max_groups, min_cluster_size, 0.5)
        unique_labels = sorted({int(lbl) for lbl in scene_labels if int(lbl) >= 0})
        print(f"[detect_scenes] Etiquetas de escena válidas: {unique_labels}")

        cluster_map: Dict[int, List[int]] = {}
        for idx, lbl in enumerate(scene_labels):
            lbl = int(lbl)
            if lbl >= 0:
                cluster_map.setdefault(lbl, []).append(idx)

        scene_clusters: List[Dict[str, Any]] = []
        for ci, idxs in sorted(cluster_map.items()):
            scene_id = f"scene_{ci:02d}"
            scene_out_dir = scenes_dir / scene_id
            scene_out_dir.mkdir(parents=True, exist_ok=True)

            copied_frames: List[int] = []
            representative_file = None
            for k_idx in idxs:
                src = keyframe_paths[k_idx]
                dst = scene_out_dir / src.name
                try:
                    shutil.copy2(src, dst)
                except Exception as cp_err:
                    print(f"[detect_scenes] Error copiando keyframe {src} a cluster {scene_id}: {cp_err}")
                    continue
                if representative_file is None:
                    representative_file = dst
                copied_frames.append(keyframe_frames[k_idx])

            if representative_file is None:
                continue

            start_frame = min(copied_frames)
            end_frame = max(copied_frames) + 1  # exclusive end
            scene_clusters.append({
                "id": scene_id,
                "name": f"Escena {len(scene_clusters)+1}",
                "folder": str(scene_out_dir),
                "image_url": f"/files_scene/{video_name}/{scene_id}/{representative_file.name}",
                "start_time": start_frame / fps,
                "end_time": end_frame / fps,
                "start_frame": start_frame,
                "end_frame": end_frame,
            })

        print(f"[detect_scenes] {len(scene_clusters)} escenes clusteritzades")
        return {"scene_clusters": scene_clusters}

    except Exception as e:
        print(f"[detect_scenes] Error: {e}")
        import traceback
        traceback.print_exc()
        return {"scene_clusters": [], "error": str(e)}
|
|
|
|
|
|
|
|
|
def _extract_sampled_frames(job_id: str, video_path: str, max_samples, frames_dir: Path):
    """Save up to *max_samples* evenly spaced frames of the video as JPEGs.

    Returns a list of ``(frame_index, jpeg_path)`` pairs, one per frame that
    was both read and written, so indices always match their files. The
    capture is released even if an error occurs.

    Raises:
        RuntimeError: the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise RuntimeError("No se pudo abrir el vídeo")

        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)

        frame_indices: List[int] = []
        if total_frames > 0:
            # np.linspace rejects a negative count; treat negative max_frames as 0.
            num = max(0, min(int(max_samples), total_frames))
            frame_indices = sorted(set(
                np.linspace(0, total_frames - 1, num=num, dtype=int).tolist()
            ))
        print(f"[{job_id}] Total frames: {total_frames}, FPS: {fps:.2f}, Muestreando {len(frame_indices)} frames")

        frames_dir.mkdir(parents=True, exist_ok=True)
        frames = []
        for frame_idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_idx))
            ret, frame = cap.read()
            if not ret:
                continue
            frame_path = frames_dir / f"frame_{frame_idx:06d}.jpg"
            # cv2.imwrite reports failure through its return value.
            if not cv2.imwrite(str(frame_path), frame):
                print(f"[{job_id}] No se pudo guardar el frame {frame_idx}")
                continue
            frames.append((int(frame_idx), str(frame_path)))
        return frames
    finally:
        cap.release()


def _save_face_crop(job_id: str, crop_path, frame_path: str, local_crop_path: Path) -> None:
    """Store a face crop at *local_crop_path*, falling back to the whole frame.

    *crop_path* is the ``face_crop_path`` value returned by svision. It may be
    an ``http...`` URL (downloaded) or an existing local file (copied);
    anything else uses the full frame instead.
    """
    if isinstance(crop_path, str) and crop_path.startswith("http"):
        try:
            import requests
            resp = requests.get(crop_path, timeout=30)
            if resp.status_code == 200:
                with open(local_crop_path, "wb") as f:
                    f.write(resp.content)
                return
        except Exception as dl_err:
            print(f"[{job_id}] Error descargando crop: {dl_err}")
    elif isinstance(crop_path, str) and os.path.exists(crop_path):
        shutil.copy2(crop_path, local_crop_path)
        return
    # No usable crop: keep the full frame so the face still has a thumbnail.
    shutil.copy2(frame_path, local_crop_path)


def _detect_faces_via_svision(job_id: str, frames, faces_root: Path):
    """Send each frame to svision and store one crop per detected face.

    Returns ``(embeddings, crops_meta, frames_with_faces)``. A face is recorded
    only once its embedding is usable and its crop file was saved, so
    ``embeddings[k]`` and ``crops_meta[k]`` always describe the same face.
    Embeddings are L2-normalised.
    """
    embeddings: List[List[float]] = []
    crops_meta: List[dict] = []
    frames_with_faces = 0

    for frame_idx, frame_path in frames:
        try:
            faces = svision_client.get_face_embeddings_from_image(frame_path)
        except Exception as e:
            print(f"[{job_id}] Error procesando frame {frame_idx}: {e}")
            continue
        if not faces:
            continue
        frames_with_faces += 1

        for face_data in faces:
            try:
                emb = face_data.get("embedding", [])
                if emb is None or len(emb) == 0:
                    continue
                vec = np.asarray(emb, dtype=float).ravel()
                norm = float(np.linalg.norm(vec))
                if not np.isfinite(norm) or norm == 0.0:
                    # Zero/NaN vectors have no cosine direction and would make
                    # scipy's linkage fail for the whole job.
                    print(f"[{job_id}] Embedding inválido descartado (frame {frame_idx})")
                    continue
                fn = f"face_{frame_idx:06d}_{len(crops_meta):03d}.jpg"
                _save_face_crop(job_id, face_data.get("face_crop_path"), frame_path, faces_root / fn)
            except Exception as e:
                print(f"[{job_id}] Error procesando frame {frame_idx}: {e}")
                continue

            embeddings.append((vec / (norm + 1e-9)).tolist())
            crops_meta.append({
                "file": fn,
                "frame": frame_idx,
                "index": face_data.get("index", len(crops_meta)),
            })

    return embeddings, crops_meta, frames_with_faces


def _build_face_characters(job_id: str, labels, crops_meta, faces_root: Path, chars_dir: Path, video_name: str):
    """Copy clustered face crops into ``chars_dir/char_XX`` folders.

    Only the first ``len(cluster) // 2 + 1`` faces of each cluster are copied.
    The first copied crop is duplicated as ``representative.jpg``. Returns the
    character dicts reported to the client.
    """
    cluster_map: Dict[int, List[int]] = {}
    for idx, lbl in enumerate(labels):
        if isinstance(lbl, int) and lbl >= 0:
            cluster_map.setdefault(lbl, []).append(idx)

    chars_dir.mkdir(parents=True, exist_ok=True)

    print(f"[{job_id}] cluster_map: {cluster_map}")
    print(f"[{job_id}] crops_meta count: {len(crops_meta)}")
    print(f"[{job_id}] faces_root: {faces_root}, exists: {faces_root.exists()}")
    if faces_root.exists():
        existing_files = list(faces_root.glob("*"))
        print(f"[{job_id}] Files in faces_root: {len(existing_files)}")
        for ef in existing_files[:5]:
            print(f"[{job_id}] - {ef.name}")

    characters: List[Dict[str, Any]] = []
    for ci, idxs in sorted(cluster_map.items()):
        char_id = f"char_{ci:02d}"
        print(f"[{job_id}] Processing cluster {char_id} with {len(idxs)} indices: {idxs[:5]}...")

        out_dir = chars_dir / char_id
        out_dir.mkdir(parents=True, exist_ok=True)

        total_faces = len(idxs)
        # Show roughly half of the cluster, always at least one face.
        selected_idxs = idxs[: total_faces // 2 + 1]

        files: List[str] = []
        file_urls: List[str] = []
        for j in selected_idxs:
            if j >= len(crops_meta):
                print(f"[{job_id}] Index {j} out of range (crops_meta len={len(crops_meta)})")
                continue
            fname = crops_meta[j].get("file")
            if not fname:
                print(f"[{job_id}] No filename in meta for index {j}")
                continue
            src = faces_root / fname
            if not src.exists():
                print(f"[{job_id}] Source file not found: {src}")
                continue
            try:
                shutil.copy2(src, out_dir / fname)
            except Exception as cp_err:
                print(f"[{job_id}] Error copying {fname}: {cp_err}")
                continue
            files.append(fname)
            file_urls.append(f"/files/{video_name}/{char_id}/{fname}")

        image_url = ""
        if files:
            try:
                shutil.copy2(out_dir / files[0], out_dir / "representative.jpg")
                image_url = f"/files/{video_name}/{char_id}/representative.jpg"
            except OSError as err:
                # Point at the crop itself rather than a missing representative.jpg.
                print(f"[{job_id}] Error creando representative.jpg para {char_id}: {err}")
                image_url = file_urls[0]

        characters.append({
            "id": char_id,
            "name": f"Cluster {ci + 1}",
            "folder": str(out_dir),
            "num_faces": len(files),
            "total_faces_detected": total_faces,
            "image_url": image_url,
            "face_files": file_urls,
        })
        print(f"[{job_id}] ✓ Cluster {char_id}: {len(files)} caras")

    return characters


def _process_audio_via_asr(job_id: str, video_path: str, base: Path, video_name: str,
                           voice_max_groups: int, voice_min_cluster_size: int, voice_sensitivity: float):
    """Diarize the video's audio through asr_client and cluster the voice embeddings.

    Returns ``(audio_segments, voice_labels, diarization_info)``.
    ``voice_labels`` (when clustering ran) is aligned with ``audio_segments``.
    Segments without a usable embedding get ``-1``. Clips that could not be
    saved locally are skipped. On error, whatever was gathered so far is
    returned and the traceback is printed.
    """
    import traceback

    audio_segments: List[Dict[str, Any]] = []
    voice_labels: List[int] = []
    diarization_info: Dict[str, Any] = {}

    print(f"[{job_id}] Procesando audio con ASR space...")
    try:
        diar_result = asr_client.extract_audio_and_diarize(video_path)
        clips = diar_result.get("clips", [])
        segments = diar_result.get("segments", [])
        print(f"[{job_id}] Diarización: {len(clips)} clips, {len(segments)} segmentos")

        clips_dir = base / "clips"
        clips_dir.mkdir(parents=True, exist_ok=True)

        voice_embeddings = []
        embedded_positions: List[int] = []  # positions in audio_segments that have an embedding

        for i, clip_info in enumerate(clips if isinstance(clips, list) else []):
            if isinstance(clip_info, str):
                clip_path = clip_info
            elif isinstance(clip_info, dict):
                clip_path = clip_info.get("path")
            else:
                clip_path = None
            if not clip_path:
                continue

            local_clip = clips_dir / f"segment_{i:03d}.wav"
            saved = False
            try:
                if isinstance(clip_path, str) and clip_path.startswith("http"):
                    import requests
                    resp = requests.get(clip_path, timeout=30)
                    if resp.status_code == 200:
                        with open(local_clip, "wb") as f:
                            f.write(resp.content)
                        saved = True
                elif isinstance(clip_path, str) and os.path.exists(clip_path):
                    shutil.copy2(clip_path, local_clip)
                    saved = True
            except Exception as dl_err:
                print(f"[{job_id}] Error guardando clip {i}: {dl_err}")
                continue
            if not saved:
                print(f"[{job_id}] Clip {i} no disponible, se omite: {clip_path}")
                continue

            seg_info = segments[i] if i < len(segments) else {}
            if not isinstance(seg_info, dict):
                seg_info = {}
            speaker = seg_info.get("speaker", f"SPEAKER_{i:02d}")

            emb = asr_client.get_voice_embedding(str(local_clip))
            if emb is not None and len(emb) > 0:
                embedded_positions.append(len(audio_segments))
                voice_embeddings.append(emb)

            audio_segments.append({
                "index": i,
                "clip_path": str(local_clip),
                "clip_url": f"/audio/{video_name}/segment_{i:03d}.wav",
                "speaker": speaker,
                "start": seg_info.get("start", 0),
                "end": seg_info.get("end", 0),
            })

        print(f"[{job_id}] ✓ {len(audio_segments)} segmentos de audio procesados")

        if voice_embeddings:
            print(f"[{job_id}] Clustering jerárquico de voz...")
            Xv = np.array(voice_embeddings)
            cluster_labels = hierarchical_cluster_with_min_size(
                Xv, voice_max_groups, voice_min_cluster_size, voice_sensitivity
            ).tolist()
            aligned = [-1] * len(audio_segments)
            for pos, lbl in zip(embedded_positions, cluster_labels):
                aligned[pos] = int(lbl)
            voice_labels = aligned
            n_voice_clusters = len({lbl for lbl in voice_labels if lbl >= 0})
            print(f"[{job_id}] ✓ Clustering de voz: {n_voice_clusters} clusters")

        diarization_info = {
            "num_segments": len(audio_segments),
            "num_voice_clusters": len({lbl for lbl in voice_labels if lbl >= 0}) if voice_labels else 0,
        }
    except Exception as audio_err:
        print(f"[{job_id}] Error en procesamiento de audio: {audio_err}")
        traceback.print_exc()

    return audio_segments, voice_labels, diarization_info


def process_video_job(job_id: str):
    """Process a casting job in the background using the EXTERNAL spaces.

    No local GPU is needed. Vision and audio work is delegated to:
      - svision: face detection + embeddings
      - asr: audio diarization + voice embeddings

    This function only extracts frames, clusters the embeddings and organises
    files under ``TEMP_ROOT/<video_name>``. On success the job ends ``DONE``
    with ``results``. If the main processing step fails, the job still ends
    ``DONE``, with empty results and ``error`` set. Any other unexpected error
    marks it ``FAILED``.
    """
    import traceback

    job = jobs.get(job_id)
    if job is None:
        print(f"[{job_id}] Job no encontrado")
        return

    try:
        print(f"[{job_id}] Iniciando procesamiento (delegando a svision/asr)...")
        job["status"] = JobStatus.PROCESSING

        video_path = job["video_path"]
        video_name = job["video_name"]
        max_groups = int(job.get("max_groups", 5))
        min_cluster_size = int(job.get("min_cluster_size", 3))
        face_sensitivity = float(job.get("face_sensitivity", 0.5))

        base = TEMP_ROOT / video_name
        base.mkdir(parents=True, exist_ok=True)
        print(f"[{job_id}] Directorio base: {base}")

        try:
            print(f"[{job_id}] Extrayendo frames del vídeo...")
            frames_dir = base / "frames_temp"
            frames = _extract_sampled_frames(job_id, video_path, job.get("max_frames", 100), frames_dir)
            print(f"[{job_id}] ✓ {len(frames)} frames extraídos")

            faces_root = base / "faces_raw"
            faces_root.mkdir(parents=True, exist_ok=True)

            print(f"[{job_id}] Enviando frames a svision para detección de caras...")
            embeddings, crops_meta, frames_with_faces = _detect_faces_via_svision(job_id, frames, faces_root)
            print(f"[{job_id}] ✓ Frames con caras: {frames_with_faces}/{len(frames)}")
            print(f"[{job_id}] ✓ Caras detectadas: {len(embeddings)}")

            if embeddings:
                print(f"[{job_id}] Clustering jerárquico...")
                Xf = np.array(embeddings)
                labels = hierarchical_cluster_with_min_size(Xf, max_groups, min_cluster_size, face_sensitivity).tolist()
                n_clusters = len({lbl for lbl in labels if lbl >= 0})
                print(f"[{job_id}] ✓ Clustering: {n_clusters} clusters")
            else:
                labels = []

            characters = _build_face_characters(
                job_id, labels, crops_meta, faces_root, base / "characters", video_name
            )

            # Best-effort cleanup of the temporary frames.
            shutil.rmtree(frames_dir, ignore_errors=True)

            print(f"[{job_id}] ✓ Total: {len(characters)} personajes")

            audio_segments, voice_labels, diarization_info = _process_audio_via_asr(
                job_id,
                video_path,
                base,
                video_name,
                int(job.get("voice_max_groups", 3)),
                int(job.get("voice_min_cluster_size", 3)),
                float(job.get("voice_sensitivity", 0.5)),
            )

            job["results"] = {
                "characters": characters,
                "face_labels": labels,
                "audio_segments": audio_segments,
                "voice_labels": voice_labels,
                "diarization_info": diarization_info,
                "video_name": video_name,
                "base_dir": str(base),
            }
            job["status"] = JobStatus.DONE
            print(f"[{job_id}] ✓ Procesamiento completado")

        except Exception as proc_error:
            print(f"[{job_id}] Error en procesamiento: {proc_error}")
            traceback.print_exc()
            job["results"] = {
                "characters": [], "face_labels": [],
                "audio_segments": [], "voice_labels": [], "diarization_info": {},
                "video_name": video_name, "base_dir": str(base)
            }
            # Status stays DONE (empty results) as before, but the reason is now
            # exposed through get_job_status instead of being swallowed.
            job["error"] = str(proc_error)
            job["status"] = JobStatus.DONE

    except Exception as e:
        print(f"[{job_id}] Error general: {e}")
        traceback.print_exc()
        job["status"] = JobStatus.FAILED
        job["error"] = str(e)
|
|
|
|