# vision_tools.py # ----------------------------------------------------------------------------- # Veureu — VISION utilities (self-contained) # - Image processing and analysis # - Object detection and recognition # - Face detection and recognition # - Scene description # - Montage sequence analysis # ----------------------------------------------------------------------------- from __future__ import annotations import os os.environ["CUDA_VISIBLE_DEVICES"] = "0" from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import json import logging import math import os import shlex import subprocess import numpy as np import torch import torchaudio import torchaudio.transforms as T from transformers import WhisperProcessor, WhisperForConditionalGeneration from pyannote.audio import Pipeline as PyannotePipeline from speechbrain.inference.speaker import SpeakerRecognition from pydub import AudioSegment from sklearn.cluster import KMeans from sklearn.metrics import silhouette_score from scenedetect import VideoManager, SceneManager from scenedetect.detectors import ContentDetector import os, base64, requests, subprocess, contextlib, time from transformers import AutoProcessor, LlavaForConditionalGeneration from PIL import Image from audio_tools import process_audio_for_video from llm_router import load_yaml, LLMRouter import cv2 try: import face_recognition # type: ignore except Exception: face_recognition = None # type: ignore # FaceRecognizer - Implementación directa con DeepFace class DFRecognizer: """Wrapper simple para DeepFace como backend de embeddings.""" def __init__(self, model_name: str = 'Facenet512'): self.model_name = model_name if DeepFace is None: raise ImportError("DeepFace not available") def get_face_embedding_from_path(self, image_path: str) -> Optional[np.ndarray]: """Extrae embedding de cara usando DeepFace.""" try: # Usar DeepFace para obtener embedding embedding = DeepFace.represent( img_path=image_path, model_name=self.model_name, enforce_detection=False, # No forzar detección (ya detectada) detector_backend='skip' ) if isinstance(embedding, list) and len(embedding) > 0: # DeepFace.represent devuelve lista de diccionarios emb = embedding[0].get('embedding') if emb: return np.array(emb, dtype=float) return None except Exception as e: log.debug("DeepFace embedding failed for %s: %s", image_path, e) return None try: from deepface import DeepFace except ImportError: DeepFace = None import easyocr # -------------------------------- Logging ------------------------------------ log = logging.getLogger("audio_tools") if not log.handlers: h = logging.StreamHandler() h.setFormatter(logging.Formatter("[%(levelname)s] %(message)s")) log.addHandler(h) log.setLevel(logging.INFO) # ============================ UTILS =========================================== def load_config(path: str = "configs/config_veureu.yaml") -> Dict[str, Any]: p = Path(path) if not p.exists(): log.warning("Config file not found: %s (using defaults)", path) return {} try: import yaml cfg = yaml.safe_load(p.read_text(encoding="utf-8")) or {} cfg["__path__"] = str(p) return cfg except Exception as e: log.error("Failed to read YAML config: %s", e) return {} # ---------------------------- IMAGE EMBEDDING ---------------------------------- class FaceOfImageEmbedding: """Preferred backend: `face_recognition`; fallback: DeepFace via libs.face_utils.""" def __init__(self, deepface_model: str = 'Facenet512'): self.use_fr = face_recognition is not None self.df = None if not self.use_fr and DFRecognizer is not None: try: self.df = DFRecognizer(model_name=deepface_model) log.info("Using DeepFace (%s) as face embedding backend.", deepface_model) except Exception as e: log.warning("Failed to initialize DeepFace: %s", e) elif self.use_fr: log.info("Using face_recognition as face embedding backend.") else: log.error("No face embedding backend available.") def encode_image(self, image_path: Path) -> Optional[List[float]]: import numpy as np try: if self.use_fr: img = face_recognition.load_image_file(str(image_path)) # type: ignore encs = face_recognition.face_encodings(img) if encs: # Normalizar cada embedding a norma 1 embeddings = [(e / np.linalg.norm(e)).astype(float).tolist() for e in encs] return embeddings return None if self.df is not None: emb = self.df.get_face_embedding_from_path(str(image_path)) if emb is None: return None # Convertir a numpy array y normalizar emb = np.array(emb, dtype=float) emb = emb / np.linalg.norm(emb) return emb.tolist() except Exception as e: log.debug("Fallo embedding cara %s: %s", image_path, e) return None class FaceAnalyzer: """Wrapper sencillo para DeepFace que obtiene edad y género de una imagen.""" def __init__(self, actions=None): if actions is None: actions = ["age", "gender"] self.actions = actions if DeepFace is None: log.warning("DeepFace not available - FaceAnalyzer will return None") def analyze_image(self, img_path: str) -> Optional[Dict[str, Any]]: if DeepFace is None: return None try: result = DeepFace.analyze(img_path=img_path, actions=self.actions) # Si DeepFace devuelve una lista (varias caras), tomamos la primera if isinstance(result, list) and len(result) > 0: result = result[0] # Ahora sí podemos acceder a 'age' y 'dominant_gender' return { "age": result.get("age", "unknown"), "gender": result.get("dominant_gender", "unknown") } except Exception as e: log.warning("No se pudo analizar la imagen %s: %s", img_path, e) return None # ----------------------------------- FUNCTIONS ------------------------------------- def map_identities_per_second(frames_per_second, intervals): for seg in intervals: seg_start = seg["start"] seg_end = seg["end"] # recolectar identidades de los frames en el rango del segmento identities = [] for f in frames_per_second: if seg_start <= f["start"] <= seg_end: for face in f.get("faces", []): identities.append(face) # contar apariciones seg["counts"] = dict(Counter(identities)) return intervals def _split_montage(img: np.ndarray, n: int, cfg: Dict[str, Any]) -> List[np.ndarray]: vd = cfg.get('vision_describer', {}) montage_cfg = vd.get('montage', {}) mode = montage_cfg.get('split_mode', 'horizontal') # 'horizontal'|'vertical'|'grid' h, w = img.shape[:2] tiles: List[np.ndarray] = [] if mode == 'vertical': tile_h = h // n for i in range(n): y0 = i * tile_h; y1 = h if i == n-1 else (i+1) * tile_h tiles.append(img[y0:y1, 0:w]) return tiles if mode == 'grid': rows = int(montage_cfg.get('rows', 1) or 1) cols = int(montage_cfg.get('cols', n) or n) assert rows * cols >= n, "grid rows*cols must be >= n" tile_h = h // rows; tile_w = w // cols k = 0 for r in range(rows): for c in range(cols): if k >= n: break y0, y1 = r*tile_h, h if (r==rows-1) else (r+1)*tile_h x0, x1 = c*tile_w, w if (c==cols-1) else (c+1)*tile_w tiles.append(img[y0:y1, x0:x1]); k += 1 return tiles tile_w = w // n for i in range(n): x0 = i * tile_w; x1 = w if i == n-1 else (i+1) * tile_w tiles.append(img[0:h, x0:x1]) return tiles def generar_montage(frame_paths: List[str], output_dir: str) -> None: output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) montage_path = "" if frame_paths: imgs = [cv2.imread(kf) for kf in frame_paths if os.path.exists(kf)] imgs = [img for img in imgs if img is not None] print(f"Se encontraron {len(imgs)} imágenes para el montaje.") if imgs: h = max(img.shape[0] for img in imgs) # altura máxima imgs_resized = [cv2.resize(img, (int(img.shape[1]*h/img.shape[0]), h)) for img in imgs] montage = cv2.hconcat(imgs_resized) montage_path = os.path.join(output_dir, "keyframes_montage.jpg") print(f"Guardando montaje en: {montage_path}") cv2.imwrite(montage_path, montage) print("Montaje guardado.") else: print("No se encontraron imágenes válidas para el montaje.") return montage_path def describe_montage_sequence( montage_path: str, n: int, informacion, face_identities, *, config_path: str = 'config.yaml' ) -> Dict[str, Any]: """Describe each sub-image of a montage using remote Space (svision) via LLMRouter. Returns a list of descriptions, one per tile. """ img = cv2.imread(montage_path, cv2.IMREAD_COLOR) if img is None: raise RuntimeError(f"No se puede leer la imagen: {montage_path}") # Load engine config and split montage into tiles cfg = load_yaml(config_path) tiles = _split_montage(img, n, cfg) if len(tiles) < n: raise RuntimeError(f"Se produjeron {len(tiles)} tiles, se esperaban {n}") # Persist tiles as temporary images next to montage out_dir = Path(montage_path).parent frame_paths: List[str] = [] for i, t in enumerate(tiles): p = out_dir / f"tile_{i:03d}.jpg" cv2.imwrite(str(p), t) frame_paths.append(str(p)) # Prepare context and call remote vision describer context = { "informacion": informacion, "face_identities": sorted(list(face_identities or set())), } model_name = (cfg.get("models", {}).get("vision") or "salamandra-vision") router = LLMRouter(cfg) descs = router.vision_describe(frame_paths, context=context, model=model_name) return descs # --------------------------- IMAGES EXTRACTION ----------------------------- def keyframe_conditional_extraction_ana( video_path, output_dir, threshold=30.0, offset_frames=10 ): """ Detecta cambios de escena en un vídeo, guarda un fotograma por cada cambio, devuelve intervalos con start y end basados en los tiempos de los keyframes y genera un montaje con todos los keyframes. """ if not os.path.exists(output_dir): os.makedirs(output_dir) video_manager = VideoManager([video_path]) scene_manager = SceneManager() scene_manager.add_detector(ContentDetector(threshold=threshold)) video_manager.start() scene_manager.detect_scenes(video_manager) scene_list = scene_manager.get_scene_list() cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) total_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT) video_duration = total_frames / fps keyframes = [] for i, (start_time, end_time) in enumerate(scene_list): frame_number = int(start_time.get_frames()) + offset_frames cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) ret, frame = cap.read() if ret: ts = frame_number / fps frame_path = os.path.join(output_dir, f"scene_{i+1:03d}.jpg") cv2.imwrite(frame_path, frame) keyframes.append({ "index": i+1, "time": round(ts, 2), "path": frame_path }) cap.release() video_manager.release() # Construimos intervalos con start y end intervals = [] for i, kf in enumerate(keyframes): start = kf["time"] if i < len(keyframes) - 1: end = keyframes[i+1]["time"] else: end = video_duration # última escena hasta el final intervals.append({ "index": kf["index"], "start": start, "end": round(end, 2), "path": kf["path"] }) return intervals def keyframe_every_second( video_path: str, output_dir: str = ".", max_frames: Optional[int] = 10000, ) -> List[dict]: """ Extrae un fotograma por cada segundo del video. Returns: List[dict]: Cada elemento es {"index", "start", "end", "path"} """ out_dir = Path(output_dir) out_dir.mkdir(parents=True, exist_ok=True) cap = cv2.VideoCapture(str(video_path)) fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps frames: List[dict] = [] idx = 0 sec = 0.0 while sec <= duration: frame_number = int(sec * fps) cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) ret, frame = cap.read() if not ret: break timestamp = frame_number / fps frame_path = out_dir / f"frame_per_second{idx:03d}.jpg" cv2.imwrite(str(frame_path), frame) frames.append({ "index": idx + 1, "start": round(timestamp, 2), "end": None, # lo completamos después "path": str(frame_path), }) idx += 1 sec += 1.0 if max_frames and idx >= max_frames: break cap.release() # Completar los "end" con el inicio del siguiente frame for i in range(len(frames)): if i < len(frames) - 1: frames[i]["end"] = frames[i+1]["start"] else: frames[i]["end"] = round(duration, 2) return frames from collections import Counter, defaultdict # --------------------------- FRAMES PROCESSING ----------------------------- def process_frames( frames: List[dict], # cada elemento es {"index", "start", "end", "path"} config: dict, face_col=None, embedding_model=None, ) -> Tuple[List[dict], List[int]]: """ Procesa keyframes: - Detecta caras - Genera embeddings con FaceEmbedding - Opcionalmente compara con face_col (KNN top-3) - Opcionalmente ejecuta OCR """ frame_results = [] # Crear embedding_model si no se pasa if embedding_model is None: embedding_model = FaceOfImageEmbedding() for idx, frame in enumerate(frames): frame_path = frame["path"] try: raw_faces = embedding_model.encode_image(Path(frame_path)) except Exception as e: print(f"Error procesando {frame_path}: {e}") raw_faces = None faces = [] if raw_faces is not None: if isinstance(raw_faces[0], list): # múltiples for e in raw_faces: faces.append({"embedding": e}) else: # uno solo faces.append({"embedding": raw_faces}) faces_detected = [] for f in faces: embedding = f.get("embedding") identity = "Unknown" knn = [] if face_col is not None and embedding is not None: try: num_embeddings = face_col.count() if num_embeddings < 1: knn = [] identity = "Unknown" else: n_results = min(3, num_embeddings) q = face_col.query( query_embeddings=[embedding], n_results=n_results, include=["metadatas", "distances"] ) knn = [] metas = q.get("metadatas", [[]])[0] dists = q.get("distances", [[]])[0] for meta, dist in zip(metas, dists): person_id = meta.get("identity", "Unknown") if isinstance(meta, dict) else "Unknown" knn.append({"identity": person_id, "distance": float(dist)}) if knn and knn[0]["distance"] < 0.6: identity = knn[0]["identity"] else: identity = "Unknown" except Exception as e: print(f"Face KNN failed: {e}") knn = [] identity = "Unknown" faces_detected.append(identity) use_easyocr = True if use_easyocr: try: reader = easyocr.Reader(['en', 'es'], gpu=True) # Cambiar gpu=False si no hay GPU results = reader.readtext(frame_path) ocr_text_easyocr = " ".join([text for _, text, _ in results]).strip() except Exception as e: print(f"OCR error: {e}") frame_results.append({ "id": frame["index"], "start": frame["start"], "end": frame["end"], "image_path": frame_path, "faces": faces_detected, "ocr": ocr_text_easyocr, }) return frame_results if __name__ == "__main__": import argparse ap = argparse.ArgumentParser(description="Veureu — Audio tools (self-contained)") ap.add_argument("--video", required=True) ap.add_argument("--out", default="results") ap.add_argument("--config", default="configs/config_veureu.yaml") args = ap.parse_args() # Lightweight config loader (only for sample run) import yaml cfg = {} p = Path(args.config) if p.exists(): cfg = yaml.safe_load(p.read_text(encoding="utf-8")) or {} out_dir = Path(args.out) / Path(args.video).stem out_dir.mkdir(parents=True, exist_ok=True) segs, srt = process_audio_for_video(args.video, out_dir, cfg, voice_collection=None) print(json.dumps({ "segments": len(segs), "srt": srt }, indent=2, ensure_ascii=False))