# api_client.py (UI - Space "veureu") import os import requests import base64 import zipfile import io from typing import Iterable, Dict, Any, Tuple from PIL import Image class APIClient: """ Cliente para 'engine': POST /jobs -> {"job_id": "..."} GET /jobs/{job_id}/status -> {"status": "queued|processing|done|failed", ...} GET /jobs/{job_id}/result -> JobResult {"book": {...}, "une": {...}, ...} """ def __init__(self, base_url: str, use_mock: bool = False, data_dir: str | None = None, token: str | None = None, timeout: int = 180, tts_url: str | None = None): self.base_url = base_url.rstrip("/") # URL específica para el servicio TTS (por defecto usa la variable de entorno API_TTS_URL) self.tts_url = tts_url or os.getenv("API_TTS_URL", "https://veureu-tts.hf.space") print(f"🔧 APIClient.__init__ - tts_url recibida: {tts_url}") print(f"🔧 APIClient.__init__ - os.getenv(API_TTS_URL): {os.getenv('API_TTS_URL')}") print(f"🔧 APIClient.__init__ - self.tts_url final: {self.tts_url}") print(f"🔧 APIClient.__init__ - tipo self.tts_url: {type(self.tts_url)}") print(f"🔧 APIClient.__init__ - repr self.tts_url: {repr(self.tts_url)}") self.use_mock = use_mock self.data_dir = data_dir self.timeout = timeout self.session = requests.Session() # Permite inyectar el token del engine via secret/var en el Space UI token = token or os.getenv("API_SHARED_TOKEN") if token: self.session.headers.update({"Authorization": f"Bearer {token}"}) # ---- modo real (engine) ---- def _post_jobs(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]: url = f"{self.base_url}/jobs" files = {"file": (os.path.basename(video_path), open(video_path, "rb"), "application/octet-stream")} data = {"modes": ",".join(modes)} r = self.session.post(url, files=files, data=data, timeout=self.timeout) r.raise_for_status() return r.json() # {"job_id": ...} def _get_status(self, job_id: str) -> Dict[str, Any]: url = f"{self.base_url}/jobs/{job_id}/status" r = self.session.get(url, timeout=self.timeout) if r.status_code == 404: return {"status": "not_found"} r.raise_for_status() return r.json() def _get_result(self, job_id: str) -> Dict[str, Any]: url = f"{self.base_url}/jobs/{job_id}/status" r = self.session.get(url, timeout=self.timeout) if r.status_code == 404: return {"status": "not_found"} r.raise_for_status() return r.json() # JobResult (status + results según engine) # ---- API que usa streamlit_app.py ---- def process_video(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]: """Devuelve {"job_id": "..."}""" if self.use_mock: return {"job_id": "mock-123"} return self._post_jobs(video_path, modes) def get_job(self, job_id: str) -> Dict[str, Any]: """ La UI espera algo del estilo: {"status":"done","results":{"book":{...},"une":{...}}} Adaptamos la respuesta de /result del engine a ese contrato. """ if self.use_mock: # resultado inmediato de prueba return { "status": "done", "results": { "book": {"text": "Text d'exemple (book)", "mp3_bytes": b""}, "une": {"srt": "1\n00:00:00,000 --> 00:00:01,000\nExemple UNE\n", "mp3_bytes": b""}, } } # Opción 1: chequear estado primero st = self._get_status(job_id) if st.get("status") in {"queued", "processing"}: return {"status": st.get("status", "queued")} # Opción 2: obtener resultado final res = self._get_result(job_id) # NUEVO: si el engine ya devuelve {"status": ..., "results": {...}}, pásalo tal cual if isinstance(res, dict) and isinstance(res.get("results"), dict): return { "status": res.get("status", st.get("status", "done")), "results": res.get("results", {}), } # LEGACY: mapeo antiguo basado en claves top-level (book/une) results = {} if "book" in res: results["book"] = { "text": res["book"].get("text"), } if "une" in res: results["une"] = { "srt": res["une"].get("srt"), } for k in ("book", "une"): if k in res: if "characters" in res[k]: results[k]["characters"] = res[k]["characters"] if "metrics" in res[k]: results[k]["metrics"] = res[k]["metrics"] status = "done" if results else st.get("status", "unknown") return {"status": status, "results": results} def tts_matxa(self, text: str, voice: str = "central/grau") -> dict: """ Llama al space 'tts' para sintetizar audio. Usa /tts/text para textos cortos (<480 chars) o /tts/text_long para textos largos. Args: text (str): Texto a sintetizar. voice (str): Voz de Matxa a usar (p.ej. 'central/grau'). Returns: dict: {'mp3_bytes': bytes} o {'error': str} """ if not self.tts_url: raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)") print(f"🔧 tts_matxa - self.tts_url ANTES de construir URL: {self.tts_url}") print(f"🔧 tts_matxa - tipo self.tts_url: {type(self.tts_url)}") print(f"🔧 tts_matxa - repr self.tts_url: {repr(self.tts_url)}") # Usar endpoint apropiado según la longitud del texto if len(text) > 480: url = f"{self.tts_url.rstrip('/')}/tts/text_long" else: url = f"{self.tts_url.rstrip('/')}/tts/text" print(f"🔧 tts_matxa - URL final construida: {url}") print(f"🔧 tts_matxa - repr URL final: {repr(url)}") data = { "texto": text, "voice": voice, "formato": "mp3" } try: print(f"🎯 Llamando TTS a: {url}") print(f"📝 Texto length: {len(text)} caracteres") print(f"🗣️ Voz: {voice}") r = self.session.post(url, data=data, timeout=self.timeout * 2) # Más tiempo para textos largos print(f"📊 Response status: {r.status_code}") r.raise_for_status() # Devolver los bytes directamente para que el cliente los pueda concatenar print(f"✅ Audio recibido: {len(r.content)} bytes") return {"mp3_bytes": r.content} except requests.exceptions.RequestException as e: print(f"❌ Error cridant a TTS: {e}") print(f"❌ URL: {url}") print(f"❌ Data: {data}") # Devolvemos un diccionario con error para que la UI lo muestre return {"error": str(e)} def generate_audiodescription(self, video_bytes: bytes, video_name: str) -> dict: """Llama al endpoint del engine /generate_audiodescription con un MP4 en memoria.""" url = f"{self.base_url}/generate_audiodescription" try: files = { "video": (video_name or "video.mp4", video_bytes, "video/mp4") } r = self.session.post(url, files=files, timeout=self.timeout * 10) r.raise_for_status() return r.json() except requests.exceptions.RequestException as e: return {"error": str(e)} def finalize_casting(self, payload: dict) -> dict: """Envía el càsting definitiu al engine para consolidar identidades e indexar.""" url = f"{self.base_url}/finalize_casting" try: r = self.session.post(url, json=payload, timeout=self.timeout * 5) r.raise_for_status() return r.json() except requests.exceptions.HTTPError as e: resp = e.response try: # Try to include JSON error if present return {"error": str(e), "status_code": resp.status_code if resp is not None else None, "body": resp.json() if resp is not None else None} except Exception: # Fallback to text body return {"error": str(e), "status_code": resp.status_code if resp is not None else None, "body": (resp.text if resp is not None else None)} except requests.exceptions.RequestException as e: return {"error": str(e)} def load_casting(self, faces_dir: str, voices_dir: str, db_dir: str, drop_collections: bool = False) -> dict: """Carga índices de caras y voces al motor de búsqueda Chroma del engine.""" url = f"{self.base_url}/load_casting" data = { "faces_dir": faces_dir, "voices_dir": voices_dir, "db_dir": db_dir, "drop_collections": str(1 if drop_collections else 0), } try: r = self.session.post(url, data=data, timeout=self.timeout * 5) r.raise_for_status() return r.json() except requests.exceptions.RequestException as e: return {"error": str(e)} def rebuild_video_with_ad(self, video_path: str, srt_path: str, voice: str = "central/grau") -> dict: """ Llama al space 'tts' para reconstruir un vídeo con audiodescripció a partir de un SRT. Usa el endpoint /tts/srt que devuelve un ZIP con el vídeo final. Args: video_path: Ruta al archivo de vídeo original srt_path: Ruta al archivo SRT con las audiodescripciones voice: Voz de Matxa (por defecto 'central/grau') Returns: dict: {'video_bytes': bytes} o {'error': str} """ if not self.tts_url: raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)") url = f"{self.tts_url.rstrip('/')}/tts/srt" print(f"🎬 Reconstruyendo video con AD") print(f"🎯 URL TTS: {url}") print(f"📹 Video: {video_path}") print(f"📝 SRT: {srt_path}") print(f"🗣️ Voz: {voice}") try: with open(video_path, 'rb') as video_file: with open(srt_path, 'rb') as srt_file: files = { 'video': (os.path.basename(video_path), video_file, 'video/mp4'), 'srt': (os.path.basename(srt_path), srt_file, 'application/x-subrip') } data = { "voice": voice, "ad_format": "mp3", "include_final_mp4": "1" } r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5) r.raise_for_status() # El servidor devuelve un ZIP, lo procesamos en memoria with zipfile.ZipFile(io.BytesIO(r.content)) as z: # Buscamos el archivo video_con_ad.mp4 dentro del ZIP for filename in z.namelist(): if filename.endswith('.mp4'): video_bytes = z.read(filename) return {"video_bytes": video_bytes} # Si no se encuentra el MP4 en el ZIP return {"error": "No se encontró el archivo de vídeo MP4 en la respuesta del servidor."} except requests.exceptions.RequestException as e: print(f"Error cridant a la reconstrucció de vídeo: {e}") return {"error": str(e)} except zipfile.BadZipFile: return {"error": "La respuesta del servidor no fue un archivo ZIP válido."} except Exception as e: print(f"Error inesperat: {e}") return {"error": str(e)} def refine_narration(self, dialogues_srt: str, frame_descriptions_json: str = "[]", config_path: str = "config.yaml") -> dict: """Llama al endpoint del engine /refine_narration para generar narrativa y/o SRT.""" url = f"{self.base_url}/refine_narration" data = { "dialogues_srt": dialogues_srt, "frame_descriptions_json": frame_descriptions_json, "config_path": config_path, } try: r = self.session.post(url, data=data, timeout=self.timeout) r.raise_for_status() return r.json() except requests.exceptions.RequestException as e: return {"error": str(e)} def create_initial_casting(self, video_path: str = None, video_bytes: bytes = None, video_name: str = None, face_max_groups: int = 3, face_min_cluster_size: int = 3, face_sensitivity: float = 0.5, voice_max_groups: int = 3, voice_min_cluster_size: int = 3, voice_sensitivity: float = 0.5, max_frames: int = 100) -> dict: """ Llama al endpoint del space 'engine' para crear el 'initial casting'. Envía el vídeo recién importado como archivo y los parámetros de clustering. Args: video_path: Path to video file (if reading from disk) video_bytes: Video file bytes (if already in memory) video_name: Name for the video file face_max_groups: Max number of face clusters (hierarchical) face_min_cluster_size: Minimum face cluster size voice_max_groups: Max number of voice clusters (hierarchical) voice_min_cluster_size: Minimum voice cluster size max_frames: Maximum number of frames to process """ url = f"{self.base_url}/create_initial_casting" try: # Prepare file data if video_bytes: filename = video_name or "video.mp4" files = { "video": (filename, video_bytes, "video/mp4"), } elif video_path: with open(video_path, "rb") as f: files = { "video": (os.path.basename(video_path), f.read(), "video/mp4"), } else: return {"error": "Either video_path or video_bytes must be provided"} data = { "max_groups": str(face_max_groups), "min_cluster_size": str(face_min_cluster_size), "face_sensitivity": str(face_sensitivity), "voice_max_groups": str(voice_max_groups), "voice_min_cluster_size": str(voice_min_cluster_size), "voice_sensitivity": str(voice_sensitivity), "max_frames": str(max_frames), } r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5) r.raise_for_status() return r.json() if r.headers.get("content-type", "").startswith("application/json") else {"ok": True} except requests.exceptions.RequestException as e: return {"error": str(e)} except Exception as e: return {"error": f"Unexpected error: {str(e)}"} def detect_scenes(self, video_path: str = None, video_bytes: bytes = None, video_name: str = None, max_groups: int = 3, min_cluster_size: int = 3, scene_sensitivity: float = 0.5, frame_interval_sec: float = 0.5) -> dict: """ Call engine /detect_scenes to compute scene clusters using hierarchical clustering on color histograms. """ url = f"{self.base_url}/detect_scenes" try: if video_bytes: filename = video_name or "video.mp4" files = { "video": (filename, video_bytes, "video/mp4"), } elif video_path: with open(video_path, "rb") as f: files = { "video": (os.path.basename(video_path), f.read(), "video/mp4"), } else: return {"error": "Either video_path or video_bytes must be provided"} data = { "max_groups": str(max_groups), "min_cluster_size": str(min_cluster_size), "scene_sensitivity": str(scene_sensitivity), "frame_interval_sec": str(frame_interval_sec), } r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5) r.raise_for_status() return r.json() except requests.exceptions.RequestException as e: return {"error": str(e)} def generate_audio_from_text_file(self, text_content: str, voice: str = "central/grau") -> dict: """ Genera un único MP3 a partir de un texto largo, usando el endpoint de SRT. 1. Convierte el texto en un SRT falso. 2. Llama a /tts/srt con el SRT. 3. Extrae el 'ad_master.mp3' del ZIP resultante. """ if not self.tts_url: raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)") # 1. Crear un SRT falso en memoria srt_content = "" start_time = 0 for i, line in enumerate(text_content.strip().split('\n')): line = line.strip() if not line: continue # Asignar 5 segundos por línea, un valor simple end_time = start_time + 5 def format_time(seconds): h = int(seconds / 3600) m = int((seconds % 3600) / 60) s = int(seconds % 60) ms = int((seconds - int(seconds)) * 1000) return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}" srt_content += f"{i+1}\n" srt_content += f"{format_time(start_time)} --> {format_time(end_time)}\n" srt_content += f"{line}\n\n" start_time = end_time if not srt_content: return {"error": "El texto proporcionado estaba vacío o no se pudo procesar."} # 2. Llamar al endpoint /tts/srt url = f"{self.tts_url.rstrip('/')}/tts/srt" try: files = { 'srt': ('fake_ad.srt', srt_content, 'application/x-subrip') } data = {"voice": voice, "ad_format": "mp3"} r = requests.post(url, files=files, data=data, timeout=self.timeout * 5) r.raise_for_status() # 3. Extraer 'ad_master.mp3' del ZIP with zipfile.ZipFile(io.BytesIO(r.content)) as z: for filename in z.namelist(): if filename == 'ad_master.mp3': mp3_bytes = z.read(filename) return {"mp3_bytes": mp3_bytes} return {"error": "No se encontró 'ad_master.mp3' en la respuesta del servidor."} except requests.exceptions.RequestException as e: return {"error": f"Error llamando a la API de SRT: {e}"} except zipfile.BadZipFile: return {"error": "La respuesta del servidor no fue un archivo ZIP válido."} def tts_long_text(self, text: str, voice: str = "central/grau") -> dict: """ Llama al endpoint '/tts/text_long' para sintetizar un texto largo. La API se encarga de todo el procesamiento. """ if not self.tts_url: raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)") url = f"{self.tts_url.rstrip('/')}/tts/text_long" data = { "texto": text, "voice": voice, "formato": "mp3" } try: # Usamos un timeout más largo por si el texto es muy extenso r = requests.post(url, data=data, timeout=self.timeout * 10) r.raise_for_status() return {"mp3_bytes": r.content} except requests.exceptions.RequestException as e: print(f"Error cridant a TTS per a text llarg: {e}") return {"error": str(e)} # =========================== # Cliente para SVision Space # =========================== # Nombres catalanes comunes para asignar a personajes (deben coincidir con app.py) def get_catalan_names(): """Retorna llistes de noms catalans.""" noms_home = ["Jordi", "Marc", "Pau", "Pere", "Joan", "Josep", "David", "Àlex", "Guillem", "Albert", "Arnau", "Martí", "Bernat", "Oriol", "Roger", "Pol", "Lluís", "Sergi", "Carles", "Xavier"] noms_dona = ["Maria", "Anna", "Laura", "Marta", "Cristina", "Núria", "Montserrat", "Júlia", "Sara", "Carla", "Alba", "Elisabet", "Rosa", "Gemma", "Sílvia", "Teresa", "Irene", "Laia", "Marina", "Bet"] return noms_home, noms_dona def describe_image_with_svision(image_path: str, is_face: bool = True) -> Tuple[str, str]: """ Llama al space svision para describir una imagen (cara o escena). Args: image_path: Ruta absoluta a la imagen is_face: True si es una cara, False si es una escena Returns: tuple (descripción_completa, nombre_abreviado) """ try: from gradio_client import Client, handle_file # Conectar al space svision con timeout generoso para cold start de ZeroGPU svision_url = os.getenv("SVISION_URL", "https://veureu-svision.hf.space") print(f"[svision] Connectant a {svision_url}...") # Autenticar con HuggingFace token para obtener más cuota de ZeroGPU hf_token = os.getenv("HF_TOKEN") if hf_token: client = Client(svision_url, hf_token=hf_token) print(f"[svision] Autenticat amb token HF") else: client = Client(svision_url) print(f"[svision] Sense autenticació (cuota limitada)") # Preparar prompt según el tipo if is_face: prompt = "Descriu aquesta persona. Inclou: edat aproximada (jove/adult), gènere, característiques físiques notables (ulleres, barba, bigoti, etc.), expressió i vestimenta." else: prompt = "Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals." print(f"[svision] Enviant petició (pot trigar si ZeroGPU està en cold start)...") print(f"[svision] Image path: {image_path}") import time start_time = time.time() max_tokens = 256 if is_face else 128 max_attempts = int(os.getenv("SVISION_MAX_ATTEMPTS", "5")) wait_seconds = int(os.getenv("SVISION_RETRY_WAIT", "5")) result = None last_error: Exception | None = None for attempt in range(1, max_attempts + 1): try: print(f"[svision] Attempt {attempt}/{max_attempts} (wait={wait_seconds}s)") result = client.predict( handle_file(image_path), prompt, max_tokens, 0.7, api_name="/describe" ) if result and isinstance(result, str) and result.strip(): break raise RuntimeError("Resposta buida de svision") except Exception as exc: last_error = exc print(f"[svision] Error attempt {attempt}/{max_attempts}: {exc}") if attempt == max_attempts: raise time.sleep(wait_seconds) wait_seconds = min(wait_seconds * 2, 40) elapsed = time.time() - start_time print(f"[svision] Resposta rebuda en {elapsed:.1f}s") full_description = result.strip() if result else "" # PASO 1: Eliminar el prompt original que puede aparecer en la respuesta prompt_markers = [ "Descriu aquesta persona. Inclou: edat aproximada (jove/adult), gènere, característiques físiques notables (ulleres, barba, bigoti, etc.), expressió i vestimenta.", "Descriu aquesta escena. Inclou: tipus de localització (interior/exterior), elements principals, ambient, il·luminació.", "Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals.", "Descriu aquesta persona.", "Descriu aquesta escena.", ] for marker in prompt_markers: if marker in full_description: # Eliminar el prompt y todo lo que esté antes parts = full_description.split(marker, 1) if len(parts) > 1: full_description = parts[1].strip() # PASO 2: Limpiar prefijos no deseados de forma más agresiva # Lista de prefijos comunes que aparecen prefixes_to_remove = [ "user:", "user ", "user\n", "user\t", "assistant:", "assistant ", "assistant\n", "assistant\t", "User:", "User ", "User\n", "Assistant:", "Assistant ", "Assistant\n", "system:", "system ", ] # Intentar limpiar múltiples veces por si hay varios prefijos for _ in range(5): # Máximo 5 iteraciones original = full_description for prefix in prefixes_to_remove: if full_description.lower().startswith(prefix.lower()): full_description = full_description[len(prefix):].strip() break if original == full_description: break # No hubo cambios, salir # PASO 3: Limpiar espacios en blanco múltiples y saltos de línea al inicio full_description = full_description.lstrip() # PASO 4: Si empieza con salto de línea o tabulación, limpiar while full_description and full_description[0] in ['\n', '\t', '\r', ' ']: full_description = full_description[1:] if not full_description: return ("", "") # Generar nombre aleatorio en catalán para caras if is_face: # Extraer características clave para el nombre desc_lower = full_description.lower() # Determinar género is_female = any(word in desc_lower for word in ["dona", "noia", "nena", "femení", "femenina"]) # Seleccionar nombre aleatorio pero consistente (hash del path) import hashlib hash_val = int(hashlib.md5(image_path.encode()).hexdigest(), 16) noms_home, noms_dona = get_catalan_names() if is_female: name_list = noms_dona else: name_list = noms_home # Usar hash para selección consistente short_name = name_list[hash_val % len(name_list)] else: # Para escenas, extraer primeras palabras clave words = full_description.split()[:4] short_name = " ".join(words).capitalize() print(f"[svision] Descripció generada: {full_description[:100]}...") print(f"[svision] Nom: {short_name}") return (full_description, short_name) except Exception as e: print(f"[svision] Error al descriure imatge: {e}") import traceback traceback.print_exc() return ("", "") def generate_short_scene_name(description: str) -> str: """ Genera un nombre corto de escena (< 3 palabras) basándose en la descripción usando el modelo schat (Salamandra-Instruct). Args: description: Descripción completa de la escena de svision Returns: Nombre corto de la escena (< 3 palabras) o string vacío si falla """ try: # Importar gradio_client from gradio_client import Client, handle_file # URL del space schat schat_url = os.getenv("SCHAT_URL", "https://veureu-schat.hf.space") print(f"[schat] Connectant a {schat_url}...") # Autenticar con HuggingFace token hf_token = os.getenv("HF_TOKEN") if hf_token: client = Client(schat_url, hf_token=hf_token) print(f"[schat] Autenticat amb token HF") else: client = Client(schat_url) print(f"[schat] Sense autenticació") # Preparar prompt prompt = f"Basant-te en aquesta descripció d'una escena, genera un nom curt de menys de 3 paraules que la resumeixi:\n\n{description}\n\nNom de l'escena:" print(f"[schat] Generant nom curt per descripció: {description[:100]}...") # Llamar al endpoint /predict de schat # Parámetros típicos: mensaje, historial, max_new_tokens, temperature, top_p, top_k, repetition_penalty result = client.predict( prompt, # mensaje [], # historial vacío 256, # max_new_tokens 0.7, # temperature 0.9, # top_p 50, # top_k 1.0, # repetition_penalty api_name="/predict" ) # El resultado es una tupla (respuesta, historial) if isinstance(result, tuple) and len(result) >= 1: short_name = result[0].strip() if result[0] else "" elif isinstance(result, str): short_name = result.strip() else: short_name = "" # Limpiar posibles comillas o puntuación extra short_name = short_name.strip('"\'.,!?').strip() # Limpiar prefijos no deseados prefixes_to_remove = [ "Nom de l'escena:", "nom de l'escena:", "Escena:", "escena:", ] for prefix in prefixes_to_remove: if short_name.lower().startswith(prefix.lower()): short_name = short_name[len(prefix):].strip() # Limitar a 3 palabras words = short_name.split() if len(words) > 3: short_name = " ".join(words[:3]) print(f"[schat] Nom curt generat: {short_name}") return short_name except Exception as e: print(f"[schat] Error al generar nom curt: {e}") import traceback traceback.print_exc() return ""