# api_client.py (UI - Space "veureu")
import os
import requests
import base64
import zipfile
import io
import json
from datetime import datetime
from typing import Iterable, Dict, Any, Tuple
from PIL import Image


def _log(msg: str, level: str = "INFO") -> None:
    """Log a message with timestamp for API calls debugging."""
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{ts}] [API_CLIENT] [{level}] {msg}")


class APIClient:
    """
    Client for the 'engine' service:
      POST /preprocessing/jobs                  -> {"job_id": "..."}
      GET  /preprocessing/jobs/{job_id}/status  -> {"status": "queued|processing|done|failed", ...}
      GET  /preprocessing/jobs/{job_id}/result  -> JobResult {"book": {...}, "une": {...}, ...}
    """
    def __init__(self, base_url: str, use_mock: bool = False, data_dir: str | None = None,
                 token: str | None = None, timeout: int = 180, tts_url: str | None = None):
        # Engine base URL: the ENGINE_URL environment variable takes precedence
        env_engine_url = os.getenv("ENGINE_URL")
        self.base_url = (env_engine_url or base_url or "").rstrip("/")
        # Dedicated URL for the TTS service (defaults to the API_TTS_URL environment variable)
        self.tts_url = tts_url or os.getenv("API_TTS_URL", "https://veureu-tts.hf.space")
        self.use_mock = use_mock
        self.data_dir = data_dir
        self.timeout = timeout
        self.session = requests.Session()
        # Shared engine secret: defaults to the VEUREU_TOKEN environment variable
        token = token or os.getenv("VEUREU_TOKEN")
        if token:
            self.session.headers.update({"Authorization": f"Bearer {token}"})

    # ---- real mode (engine) ----
    def _post_jobs(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]:
        url = f"{self.base_url}/preprocessing/jobs"
        data = {"modes": ",".join(modes)}
        # Keep the file handle open only for the duration of the request
        with open(video_path, "rb") as f:
            files = {"file": (os.path.basename(video_path), f, "application/octet-stream")}
            r = self.session.post(url, files=files, data=data, timeout=self.timeout)
        r.raise_for_status()
        return r.json()  # {"job_id": ...}

    def _get_status(self, job_id: str) -> Dict[str, Any]:
        url = f"{self.base_url}/preprocessing/jobs/{job_id}/status"
        _log(f"GET {url}")
        r = self.session.get(url, timeout=self.timeout)
        if r.status_code == 404:
            _log(" -> 404 not_found", "WARN")
            return {"status": "not_found"}
        r.raise_for_status()
        result = r.json()
        _log(f" -> status={result.get('status', 'unknown')}")
        return result

    def _get_result(self, job_id: str) -> Dict[str, Any]:
        url = f"{self.base_url}/preprocessing/jobs/{job_id}/result"
        r = self.session.get(url, timeout=self.timeout)
        if r.status_code == 404:
            return {"status": "not_found"}
        r.raise_for_status()
        return r.json()  # JobResult (status + results as defined by the engine)

    # ---- API used by streamlit_app.py ----
    def process_video(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]:
        """Return {"job_id": "..."}."""
        if self.use_mock:
            return {"job_id": "mock-123"}
        return self._post_jobs(video_path, modes)

    def get_job(self, job_id: str) -> Dict[str, Any]:
        """
        The UI expects something like:
          {"status": "done", "results": {"book": {...}, "une": {...}}}
        We adapt the engine's /result response to that contract.
        """
        if self.use_mock:
            # Immediate mock result for testing
            return {
                "status": "done",
                "results": {
                    "book": {"text": "Text d'exemple (book)", "mp3_bytes": b""},
                    "une": {"srt": "1\n00:00:00,000 --> 00:00:01,000\nExemple UNE\n", "mp3_bytes": b""},
                },
            }

        # The /status endpoint already returns the results once the job is completed
        st = self._get_status(job_id)
        status = st.get("status", "unknown")

        if status in {"queued", "processing"}:
            return {"status": status}

        # If the status indicates completion, the results are already in `st`
        if "results" in st:
            _log(f" -> Job done with results: {list(st.get('results', {}).keys())}")
            return {
                "status": status,
                "results": st.get("results", {}),
            }

        # Fallback: no results in the status response, return the status as-is
        _log(f" -> Job status={status}, no results found in response")
        return {"status": status, "results": {}}
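    # Usage sketch (illustrative only): the Streamlit UI is expected to submit a
    # video and then poll get_job() until a terminal status is reached, e.g.:
    #
    #   job = client.process_video("clip.mp4", modes=["book", "une"])
    #   state = client.get_job(job["job_id"])   # repeat while state["status"]
    #                                           # is "queued" or "processing"
    #
    # The "clip.mp4" path and the polling cadence are assumptions, not part of
    # this module.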
""" if self.use_mock: # resultado inmediato de prueba return { "status": "done", "results": { "book": {"text": "Text d'exemple (book)", "mp3_bytes": b""}, "une": {"srt": "1\n00:00:00,000 --> 00:00:01,000\nExemple UNE\n", "mp3_bytes": b""}, } } # El endpoint /status ya devuelve los resultados cuando el job está completado st = self._get_status(job_id) status = st.get("status", "unknown") if status in {"queued", "processing"}: return {"status": status} # Si el status indica completado, los resultados ya vienen en st if "results" in st: _log(f" -> Job done with results: {list(st.get('results', {}).keys())}") return { "status": status, "results": st.get("results", {}), } # Fallback: si no hay results en status, devolver status tal cual _log(f" -> Job status={status}, no results found in response") return {"status": status, "results": {}} def tts_matxa(self, text: str, voice: str = "central/grau") -> dict: """ Llama al space 'tts' para sintetizar audio. Usa /tts/text para textos cortos (<480 chars) o /tts/text_long para textos largos. Args: text (str): Texto a sintetizar. voice (str): Voz de Matxa a usar (p.ej. 'central/grau'). Returns: dict: {'mp3_bytes': bytes} o {'error': str} """ if not self.tts_url: raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)") # Usar endpoint apropiado según la longitud del texto if len(text) > 480: url = f"{self.tts_url.rstrip('/')}/tts/text_long" else: url = f"{self.tts_url.rstrip('/')}/tts/text" data = { "texto": text, "voice": voice, "formato": "mp3" } try: r = self.session.post(url, data=data, timeout=self.timeout * 2) r.raise_for_status() return {"mp3_bytes": r.content} except requests.exceptions.RequestException as e: return {"error": str(e)} def generate_une_ad_audio_from_srt(self, srt_text: str, voice: str = "central/grau") -> dict: if not self.tts_url: raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)") url = f"{self.tts_url.rstrip('/')}/tts/srt_ad_audio" files = { "srt": ("une_ad.srt", srt_text.encode("utf-8"), "text/plain"), } data = { "voice": voice, "ad_format": "mp3", } try: r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5) r.raise_for_status() return {"mp3_bytes": r.content} except requests.exceptions.RequestException as e: return {"error": str(e)} def import_databases(self) -> dict: """Descarga todas las BDs del engine (/data/db) como ZIP. Endpoint: GET /db/download_all_db_files Retorna: {"zip_bytes": bytes} o {"error": str} """ token = os.getenv("VEUREU_TOKEN", "") url = f"{self.base_url}/db/download_all_db_files" try: r = self.session.get(url, params={"token": token}, timeout=self.timeout * 2) r.raise_for_status() # El endpoint devuelve un ZIP binario return {"zip_bytes": r.content} except requests.exceptions.RequestException as e: return {"error": str(e)} # --- Upload original video to engine media storage --- def upload_original_video(self, video_bytes: bytes, video_name: str) -> dict: """Sube el vídeo original al engine para procesamiento posterior. 
    # --- Upload original video to engine media storage ---
    def upload_original_video(self, video_bytes: bytes, video_name: str) -> dict:
        """Upload the original video to the engine for later processing.

        Endpoint: POST /media/upload_original_video
        The engine computes the SHA1 and stores the file under /data/media/<sha1>/clip/
        """
        url = f"{self.base_url}/media/upload_original_video"
        token = os.getenv("VEUREU_TOKEN", "")
        try:
            files = {"video": (video_name, video_bytes, "video/mp4")}
            params = {"token": token}
            r = self.session.post(url, files=files, params=params, timeout=self.timeout * 5)
            r.raise_for_status()
            return r.json() if r.headers.get("content-type", "").startswith("application/json") else {"status": "ok"}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    # --- Initial transcription (generate_initial_srt_and_info + downloads) ---
    def generate_initial_srt_and_info(self, sha1sum: str) -> dict:
        """Launch the initial transcription pipeline on the engine.

        Endpoint: POST /transcription/generate_initial_srt_and_info
        Params: sha1, token (VEUREU_TOKEN)
        """
        url = f"{self.base_url}/transcription/generate_initial_srt_and_info"
        # Always send the shared VEUREU_TOKEN so the engine can validate the request
        shared_token = os.getenv("VEUREU_TOKEN")
        params: dict[str, Any] = {"sha1": sha1sum}
        if shared_token:
            params["token"] = shared_token
        try:
            r = self.session.post(url, params=params, timeout=self.timeout * 10)
            r.raise_for_status()
            # The backend may return plain text or JSON; always wrap it in a dict
            if r.headers.get("content-type", "").startswith("application/json"):
                body = r.json()
            else:
                body = {"srt": r.text or ""}
            body.setdefault("status", "ok")
            return body
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def download_initial_srt(self, sha1sum: str) -> dict:
        """Download the initial.srt generated by the initial pipeline.

        Endpoint: GET /transcription/download_initial_srt
        """
        url = f"{self.base_url}/transcription/download_initial_srt"
        hf_token = os.getenv("VEUREU_TOKEN")
        params: dict[str, Any] = {"sha1": sha1sum}
        if hf_token:
            params["token"] = hf_token
        try:
            r = self.session.get(url, params=params, timeout=self.timeout * 5)
            r.raise_for_status()
            # The backend returns a text file (SRT)
            return {"text": r.text or ""}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def download_initial_info(self, sha1sum: str) -> dict:
        """Download the initial info.json associated with the video.

        Endpoint: GET /transcription/download_initial_info
        """
        url = f"{self.base_url}/transcription/download_initial_info"
        hf_token = os.getenv("VEUREU_TOKEN")
        params: dict[str, Any] = {"sha1": sha1sum}
        if hf_token:
            params["token"] = hf_token
        try:
            r = self.session.get(url, params=params, timeout=self.timeout * 5)
            r.raise_for_status()
            return {"text": r.text or ""}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}
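    # Sketch (SHA1-keyed flow): the transcription endpoints are addressed by the
    # video's SHA1. If the caller needs to compute it locally for an uploaded
    # clip, the standard library suffices, e.g.:
    #
    #   import hashlib
    #   sha1 = hashlib.sha1(video_bytes).hexdigest()
    #   client.generate_initial_srt_and_info(sha1)
    #   srt = client.download_initial_srt(sha1)["text"]
    #
    # Whether this local hash matches the engine's key is an assumption about how
    # the engine hashes the uploaded bytes.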
    # --- Salamandra pipeline (result.srt + free_narration.txt) ---
    def generate_salamandra_result(self, sha1sum: str) -> dict:
        """Orchestrate generation of the Salamandra output files.

        Endpoint: POST /salamandra/generate_salamandra_result
        """
        url = f"{self.base_url}/salamandra/generate_salamandra_result"
        hf_token = os.getenv("VEUREU_TOKEN")
        params: dict[str, Any] = {"sha1": sha1sum}
        if hf_token:
            params["token"] = hf_token
        try:
            r = self.session.post(url, params=params, timeout=self.timeout * 20)
            r.raise_for_status()
            return r.json() if r.headers.get("content-type", "").startswith("application/json") else {"status": "ok"}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def download_salamandra_srt(self, sha1sum: str) -> dict:
        """Download the Salamandra result.srt.

        Endpoint: GET /salamandra/download_salamadra_srt
        """
        url = f"{self.base_url}/salamandra/download_salamadra_srt"
        hf_token = os.getenv("VEUREU_TOKEN")
        params: dict[str, Any] = {"sha1": sha1sum}
        if hf_token:
            params["token"] = hf_token
        try:
            r = self.session.get(url, params=params, timeout=self.timeout * 5)
            r.raise_for_status()
            return {"text": r.text or ""}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def download_salamandra_free_narration(self, sha1sum: str) -> dict:
        """Download the Salamandra free_narration.txt.

        Endpoint: GET /salamandra/download_salamadra_free_narration
        """
        url = f"{self.base_url}/salamandra/download_salamadra_free_narration"
        hf_token = os.getenv("VEUREU_TOKEN")
        params: dict[str, Any] = {"sha1": sha1sum}
        if hf_token:
            params["token"] = hf_token
        try:
            r = self.session.get(url, params=params, timeout=self.timeout * 5)
            r.raise_for_status()
            return {"text": r.text or ""}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    # --- MoE pipeline (result.srt + free_narration.txt) ---
    def generate_moe_result(self, sha1sum: str) -> dict:
        """Orchestrate generation of the MoE output files.

        Endpoint: POST /moe/generate_moe_result
        """
        url = f"{self.base_url}/moe/generate_moe_result"
        hf_token = os.getenv("VEUREU_TOKEN")
        params: dict[str, Any] = {"sha1": sha1sum}
        if hf_token:
            params["token"] = hf_token
        try:
            r = self.session.post(url, params=params, timeout=self.timeout * 20)
            r.raise_for_status()
            return r.json() if r.headers.get("content-type", "").startswith("application/json") else {"status": "ok"}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def download_moe_srt(self, sha1sum: str) -> dict:
        """Download the MoE result.srt.

        Endpoint: GET /moe/download_moe_srt
        """
        url = f"{self.base_url}/moe/download_moe_srt"
        hf_token = os.getenv("VEUREU_TOKEN")
        params: dict[str, Any] = {"sha1": sha1sum}
        if hf_token:
            params["token"] = hf_token
        try:
            r = self.session.get(url, params=params, timeout=self.timeout * 5)
            r.raise_for_status()
            return {"text": r.text or ""}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def download_moe_free_narration(self, sha1sum: str) -> dict:
        """Download the MoE free_narration.txt.

        Endpoint: GET /moe/download_moe_free_narration
        """
        url = f"{self.base_url}/moe/download_moe_free_narration"
        hf_token = os.getenv("VEUREU_TOKEN")
        params: dict[str, Any] = {"sha1": sha1sum}
        if hf_token:
            params["token"] = hf_token
        try:
            r = self.session.get(url, params=params, timeout=self.timeout * 5)
            r.raise_for_status()
            return {"text": r.text or ""}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}
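    # Note: Salamandra and MoE follow the same generate-then-download pattern: call
    # generate_*_result(sha1) first, then fetch result.srt / free_narration.txt with
    # the matching download_* helpers once the engine has produced them.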
""" url = f"{self.base_url}/embeddings/upload_embeddings" shared_token = os.getenv("VEUREU_TOKEN") # Serialitzar un sol cop el JSON complet de càsting try: payload_bytes = json.dumps(embeddings_json, ensure_ascii=False).encode("utf-8") except Exception as e: return {"error": f"Error serialitzant embeddings_json: {e}"} results: dict[str, Any] = {} for embedding_type in ("faces", "voices"): params = { "embedding_type": embedding_type, "video_hash": video_hash, } if shared_token: params["token"] = shared_token files = { "file": ("embeddings.json", payload_bytes, "application/json"), } try: r = self.session.post(url, params=params, files=files, timeout=self.timeout * 2) r.raise_for_status() results[embedding_type] = r.json() if r.headers.get("content-type", "").startswith("application/json") else {"status": "ok"} except requests.exceptions.RequestException as e: results[embedding_type] = {"error": str(e)} return results def import_media(self, sha1sum: str) -> dict: url = f"{self.base_url}/preprocessing/import_media/{sha1sum}" try: r = self.session.get(url, timeout=self.timeout * 5) r.raise_for_status() return {"zip_bytes": r.content} except requests.exceptions.RequestException as e: return {"error": str(e)} def import_media_version(self, sha1sum: str, version: str) -> dict: url = f"{self.base_url}/import_media_version/{sha1sum}/{version}" try: r = self.session.get(url, timeout=self.timeout * 5) r.raise_for_status() return {"zip_bytes": r.content} except requests.exceptions.RequestException as e: return {"error": str(e)} # ---- Pending videos (peding_videos) ---- def upload_pending_video(self, video_bytes: bytes, filename: str) -> dict: """Sube un vídeo pendiente al engine (carpeta /data/peding_videos). Usa el endpoint POST /peding_videos/upload_pending_video. """ url = f"{self.base_url}/pending_videos/upload_pending_video" files = {"video": (filename, io.BytesIO(video_bytes), "video/mp4")} # El backend engine requereix un token de query (?token=...) 
    # ---- Pending videos (peding_videos) ----
    def upload_pending_video(self, video_bytes: bytes, filename: str) -> dict:
        """Upload a pending video to the engine (the /data/peding_videos folder).

        Uses the POST /pending_videos/upload_pending_video endpoint.
        """
        url = f"{self.base_url}/pending_videos/upload_pending_video"
        files = {"video": (filename, io.BytesIO(video_bytes), "video/mp4")}
        # The engine backend requires a query token (?token=...) validated against the shared VEUREU_TOKEN
        hf_token = os.getenv("VEUREU_TOKEN")
        params = {"token": hf_token} if hf_token else {}
        try:
            r = self.session.post(url, params=params, files=files, timeout=self.timeout * 5)
            r.raise_for_status()
            return r.json() if r.headers.get("content-type", "").startswith("application/json") else {"status": "ok"}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def list_pending_videos(self) -> dict:
        """List the pending videos on the backend (GET /pending_videos/list_pending_videos)."""
        url = f"{self.base_url}/pending_videos/list_pending_videos"
        hf_token = os.getenv("VEUREU_TOKEN")
        params = {"token": hf_token} if hf_token else {}
        try:
            r = self.session.get(url, params=params, timeout=self.timeout * 5)
            r.raise_for_status()
            return r.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def download_pending_video(self, sha1sum: str) -> dict:
        """Download a pending video by sha1 (GET /pending_videos/download_pending_video)."""
        url = f"{self.base_url}/pending_videos/download_pending_video"
        hf_token = os.getenv("VEUREU_TOKEN")
        params = {"sha1": sha1sum}
        if hf_token:
            params["token"] = hf_token
        try:
            r = self.session.get(url, params=params, timeout=self.timeout * 5)
            r.raise_for_status()
            return {"video_bytes": r.content}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def update_databases(self, payload: dict) -> dict:
        """Send the generated SQL statements to the /update_databases endpoint."""
        url = f"{self.base_url}/update_databases"
        try:
            r = self.session.post(url, json=payload, timeout=self.timeout * 5)
            r.raise_for_status()
            return r.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def export_media(self, zip_bytes: bytes) -> dict:
        """Send a ZIP with the new videos to the /export_media endpoint."""
        url = f"{self.base_url}/export_media"
        files = {"media_zip": ("media_export.zip", zip_bytes, "application/zip")}
        try:
            r = self.session.post(url, files=files, timeout=self.timeout * 10)
            r.raise_for_status()
            return r.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def generate_audiodescription(self, video_bytes: bytes, video_name: str) -> dict:
        """Call the engine endpoint /generate_audiodescription with an in-memory MP4."""
        url = f"{self.base_url}/preprocessing/generate_audiodescription"
        try:
            files = {
                "video": (video_name or "video.mp4", video_bytes, "video/mp4"),
            }
            r = self.session.post(url, files=files, timeout=self.timeout * 10)
            r.raise_for_status()
            return r.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}
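    # Sketch (pending-videos round trip): a clip queued from the UI can later be
    # recovered by its SHA1, e.g.:
    #
    #   client.upload_pending_video(video_bytes, "clip.mp4")
    #   listing = client.list_pending_videos()               # engine-defined listing shape
    #   raw = client.download_pending_video(sha1)["video_bytes"]
    #
    # "clip.mp4" and `sha1` are placeholders for caller-provided values.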
    def generate_salamandra_ad_from_sha1(self, sha1sum: str) -> dict:
        """Generate the audio description SRT (Salamandra) from the video's SHA1.

        Calls the engine endpoint /transcription/generate_srt, which returns the
        SRT content directly as plain text.

        This method wraps the response in a dict compatible with the existing UI:
          {"status": "done", "results": {"une_srt": "...", "free_text": ""}}
        """
        url = f"{self.base_url}/transcription/generate_srt"
        hf_token = os.getenv("VEUREU_TOKEN")
        params: dict[str, Any] = {"sha1": sha1sum}
        if hf_token:
            params["token"] = hf_token
        try:
            r = self.session.post(url, params=params, timeout=self.timeout * 10)
            r.raise_for_status()
            srt_text = r.text or ""
            return {
                "status": "done",
                "results": {
                    "une_srt": srt_text,
                    "free_text": "",
                },
            }
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def finalize_casting(self, payload: dict) -> dict:
        """Send the definitive casting to the engine to consolidate identities and index them."""
        url = f"{self.base_url}/preprocessing/finalize_casting"
        n_chars = len(payload.get("characters", []))
        n_voices = len(payload.get("voice_clusters", []))
        _log(f"POST {url} (characters={n_chars}, voice_clusters={n_voices})")
        try:
            r = self.session.post(url, json=payload, timeout=self.timeout * 5)
            r.raise_for_status()
            result = r.json()
            _log(f" -> ok, face_identities={len(result.get('face_identities', []))}, voice_identities={len(result.get('voice_identities', []))}")
            return result
        except requests.exceptions.HTTPError as e:
            resp = e.response
            _log(f" -> HTTP ERROR {resp.status_code if resp else '?'}: {e}", "ERROR")
            try:
                # Try to include the JSON error body if present
                return {"error": str(e), "status_code": resp.status_code if resp is not None else None, "body": resp.json() if resp is not None else None}
            except Exception:
                # Fall back to the text body
                return {"error": str(e), "status_code": resp.status_code if resp is not None else None, "body": (resp.text if resp is not None else None)}
        except requests.exceptions.RequestException as e:
            _log(f" -> REQUEST ERROR: {e}", "ERROR")
            return {"error": str(e)}

    def load_casting(self, faces_dir: str, voices_dir: str, db_dir: str, drop_collections: bool = False) -> dict:
        """Load face and voice indexes into the engine's Chroma search backend."""
        url = f"{self.base_url}/preprocessing/load_casting"
        _log(f"POST {url} (faces_dir={faces_dir})")
        data = {
            "faces_dir": faces_dir,
            "voices_dir": voices_dir,
            "db_dir": db_dir,
            "drop_collections": str(1 if drop_collections else 0),
        }
        try:
            r = self.session.post(url, data=data, timeout=self.timeout * 5)
            r.raise_for_status()
            result = r.json()
            _log(f" -> ok={result.get('ok')}, faces={result.get('faces', 0)}, voices={result.get('voices', 0)}")
            return result
        except requests.exceptions.RequestException as e:
            _log(f" -> ERROR: {e}", "ERROR")
            return {"error": str(e)}
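    # Sketch: finalize_casting() only assumes that the payload carries "characters"
    # and "voice_clusters" lists (used here for logging); the rest of the payload
    # schema is whatever /preprocessing/finalize_casting expects on the engine, e.g.:
    #
    #   client.finalize_casting({"characters": [...], "voice_clusters": [...]})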
    def rebuild_video_with_ad(self, video_path: str, srt_path: str, voice: str = "central/grau") -> dict:
        """
        Call the 'tts' space to rebuild a video with audio description from an SRT.
        Uses the /tts/srt endpoint, which returns a ZIP containing the final video.

        Args:
            video_path: Path to the original video file
            srt_path: Path to the SRT file with the audio descriptions
            voice: Matxa voice (default 'central/grau')

        Returns:
            dict: {'video_bytes': bytes} or {'error': str}
        """
        if not self.tts_url:
            raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")

        url = f"{self.tts_url.rstrip('/')}/tts/srt"
        try:
            with open(video_path, 'rb') as video_file:
                with open(srt_path, 'rb') as srt_file:
                    files = {
                        'video': (os.path.basename(video_path), video_file, 'video/mp4'),
                        'srt': (os.path.basename(srt_path), srt_file, 'application/x-subrip'),
                    }
                    data = {
                        "voice": voice,
                        "ad_format": "mp3",
                        "include_final_mp4": "1",
                    }
                    r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5)
                    r.raise_for_status()

            # The server returns a ZIP; process it in memory
            with zipfile.ZipFile(io.BytesIO(r.content)) as z:
                # Look for the video_con_ad.mp4 file inside the ZIP
                for filename in z.namelist():
                    if filename.endswith('.mp4'):
                        video_bytes = z.read(filename)
                        return {"video_bytes": video_bytes}

            # No MP4 found in the ZIP
            return {"error": "No se encontró el archivo de vídeo MP4 en la respuesta del servidor."}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}
        except zipfile.BadZipFile:
            return {"error": "La respuesta del servidor no fue un archivo ZIP válido."}
        except Exception as e:
            return {"error": str(e)}

    def apply_refinement(
        self,
        *,
        sha1sum: str | None = None,
        version: str | None = None,
        srt_content: str | None = None,
        reflection_enabled: bool = True,
        reflexion_enabled: bool = False,
        introspection_enabled: bool = False,
    ) -> dict:
        """Apply the multi-agent refinement pipeline to an SRT.

        Endpoint: POST /refinement/apply_refinement
        It can work in two ways:
        - Passing sha1sum+version so the backend reads the SRT from the databases
        - Passing srt_content explicitly
        """
        url = f"{self.base_url}/refinement/apply_refinement"
        hf_token = os.getenv("VEUREU_TOKEN")
        payload: dict[str, Any] = {
            "reflection_enabled": bool(reflection_enabled),
            "reflexion_enabled": bool(reflexion_enabled),
            "introspection_enabled": bool(introspection_enabled),
        }
        if sha1sum is not None:
            payload["sha1sum"] = sha1sum
        if version is not None:
            payload["version"] = version
        if srt_content is not None:
            payload["srt_content"] = srt_content
        if hf_token:
            payload["token"] = hf_token
        try:
            r = self.session.post(url, json=payload, timeout=self.timeout * 10)
            r.raise_for_status()
            return r.json() if r.headers.get("content-type", "").startswith("application/json") else {"status": "ok"}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def refine_narration(self, dialogues_srt: str, frame_descriptions_json: str = "[]", config_path: str = "config.yaml") -> dict:
        """Call the engine endpoint /refine_narration to generate narrative and/or SRT."""
        url = f"{self.base_url}/refine_narration"
        data = {
            "dialogues_srt": dialogues_srt,
            "frame_descriptions_json": frame_descriptions_json,
            "config_path": config_path,
        }
        try:
            r = self.session.post(url, data=data, timeout=self.timeout)
            r.raise_for_status()
            return r.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}
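    # Sketch: apply_refinement() supports both documented call styles, e.g.:
    #
    #   client.apply_refinement(sha1sum=sha1, version="v1")   # engine reads the SRT
    #   client.apply_refinement(srt_content=srt_text)         # SRT passed inline
    #
    # The "v1" version label is an illustrative placeholder.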
    def create_initial_casting(self, video_path: str | None = None, video_bytes: bytes | None = None, video_name: str | None = None,
                               face_max_groups: int = 3, face_min_cluster_size: int = 3,
                               voice_max_groups: int = 3, voice_min_cluster_size: int = 3,
                               max_frames: int = 100) -> dict:
        """
        Call the 'engine' space endpoint to create the 'initial casting'.

        Sends the freshly imported video as a file together with the clustering parameters.

        Args:
            video_path: Path to video file (if reading from disk)
            video_bytes: Video file bytes (if already in memory)
            video_name: Name for the video file
            face_max_groups: Target number of face clusters (hierarchical)
            face_min_cluster_size: Minimum size of each face cluster
            voice_max_groups: Target number of voice clusters (hierarchical)
            voice_min_cluster_size: Minimum size of each voice cluster
            max_frames: Maximum number of frames to process
        """
        url = f"{self.base_url}/preprocessing/create_initial_casting"
        _log(f"POST {url} (video={video_name or video_path}, max_frames={max_frames})")
        try:
            # Prepare file data
            if video_bytes:
                filename = video_name or "video.mp4"
                files = {
                    "video": (filename, video_bytes, "video/mp4"),
                }
            elif video_path:
                with open(video_path, "rb") as f:
                    files = {
                        "video": (os.path.basename(video_path), f.read(), "video/mp4"),
                    }
            else:
                return {"error": "Either video_path or video_bytes must be provided"}

            data = {
                "max_groups": str(face_max_groups),
                "min_cluster_size": str(face_min_cluster_size),
                "voice_max_groups": str(voice_max_groups),
                "voice_min_cluster_size": str(voice_min_cluster_size),
                "max_frames": str(max_frames),
            }
            r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5)
            r.raise_for_status()
            result = r.json() if r.headers.get("content-type", "").startswith("application/json") else {"ok": True}
            _log(f" -> job_id={result.get('job_id', '?')}")
            return result
        except requests.exceptions.RequestException as e:
            _log(f" -> ERROR: {e}", "ERROR")
            return {"error": str(e)}
        except Exception as e:
            _log(f" -> UNEXPECTED ERROR: {e}", "ERROR")
            return {"error": f"Unexpected error: {str(e)}"}

    def detect_scenes(self, video_path: str | None = None, video_bytes: bytes | None = None, video_name: str | None = None,
                      max_groups: int = 3, min_cluster_size: int = 3, scene_sensitivity: float = 0.5,
                      frame_interval_sec: float = 0.5, max_frames: int = 100) -> dict:
        """
        Call engine /detect_scenes to compute scene clusters using hierarchical
        clustering on color histograms.
        """
        url = f"{self.base_url}/preprocessing/detect_scenes"
        _log(f"POST {url} (video={video_name or video_path}, max_groups={max_groups})")
        try:
            if video_bytes:
                filename = video_name or "video.mp4"
                files = {
                    "video": (filename, video_bytes, "video/mp4"),
                }
            elif video_path:
                with open(video_path, "rb") as f:
                    files = {
                        "video": (os.path.basename(video_path), f.read(), "video/mp4"),
                    }
            else:
                return {"error": "Either video_path or video_bytes must be provided"}

            data = {
                "max_groups": str(max_groups),
                "min_cluster_size": str(min_cluster_size),
                "scene_sensitivity": str(scene_sensitivity),
                "frame_interval_sec": str(frame_interval_sec),
                "max_frames": str(max_frames),
            }
            r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5)
            r.raise_for_status()
            result = r.json()
            _log(f" -> scene_clusters={len(result.get('scene_clusters', []))}")
            return result
        except requests.exceptions.RequestException as e:
            _log(f" -> ERROR: {e}", "ERROR")
            return {"error": str(e)}
    def generate_audio_from_text_file(self, text_content: str, voice: str = "central/grau") -> dict:
        """
        Generate a single MP3 from a long text, using the SRT endpoint.
        1. Convert the text into a fake SRT.
        2. Call /tts/srt with that SRT.
        3. Extract 'ad_master.mp3' from the resulting ZIP.
        """
        if not self.tts_url:
            raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")

        # 1. Build a fake SRT in memory
        def format_time(seconds):
            h = int(seconds / 3600)
            m = int((seconds % 3600) / 60)
            s = int(seconds % 60)
            ms = int((seconds - int(seconds)) * 1000)
            return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"

        srt_content = ""
        start_time = 0
        for i, line in enumerate(text_content.strip().split('\n')):
            line = line.strip()
            if not line:
                continue
            # Assign a flat 5 seconds per line, a simple heuristic
            end_time = start_time + 5
            srt_content += f"{i+1}\n"
            srt_content += f"{format_time(start_time)} --> {format_time(end_time)}\n"
            srt_content += f"{line}\n\n"
            start_time = end_time

        if not srt_content:
            return {"error": "El texto proporcionado estaba vacío o no se pudo procesar."}

        # 2. Call the /tts/srt endpoint
        url = f"{self.tts_url.rstrip('/')}/tts/srt"
        try:
            files = {
                'srt': ('fake_ad.srt', srt_content, 'application/x-subrip'),
            }
            data = {"voice": voice, "ad_format": "mp3"}
            r = requests.post(url, files=files, data=data, timeout=self.timeout * 5)
            r.raise_for_status()

            # 3. Extract 'ad_master.mp3' from the ZIP
            with zipfile.ZipFile(io.BytesIO(r.content)) as z:
                for filename in z.namelist():
                    if filename == 'ad_master.mp3':
                        mp3_bytes = z.read(filename)
                        return {"mp3_bytes": mp3_bytes}
            return {"error": "No se encontró 'ad_master.mp3' en la respuesta del servidor."}
        except requests.exceptions.RequestException as e:
            return {"error": f"Error llamando a la API de SRT: {e}"}
        except zipfile.BadZipFile:
            return {"error": "La respuesta del servidor no fue un archivo ZIP válido."}

    def tts_long_text(self, text: str, voice: str = "central/grau") -> dict:
        """
        Call the '/tts/text_long' endpoint to synthesize a long text.
        The API takes care of all the processing.
        """
        if not self.tts_url:
            raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")

        url = f"{self.tts_url.rstrip('/')}/tts/text_long"
        data = {
            "texto": text,
            "voice": voice,
            "formato": "mp3",
        }
        try:
            # Use a longer timeout in case the text is very long
            r = requests.post(url, data=data, timeout=self.timeout * 10)
            r.raise_for_status()
            return {"mp3_bytes": r.content}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}


# ===========================
# Client for the SVision Space
# ===========================

# Common Catalan first names assigned to characters (must match app.py)
def get_catalan_names():
    """Return lists of Catalan first names."""
    noms_home = ["Jordi", "Marc", "Pau", "Pere", "Joan", "Josep", "David", "Àlex", "Guillem", "Albert",
                 "Arnau", "Martí", "Bernat", "Oriol", "Roger", "Pol", "Lluís", "Sergi", "Carles", "Xavier"]
    noms_dona = ["Maria", "Anna", "Laura", "Marta", "Cristina", "Núria", "Montserrat", "Júlia", "Sara", "Carla",
                 "Alba", "Elisabet", "Rosa", "Gemma", "Sílvia", "Teresa", "Irene", "Laia", "Marina", "Bet"]
    return noms_home, noms_dona
else: prompt = "Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals." import time start_time = time.time() max_tokens = 256 if is_face else 128 max_attempts = int(os.getenv("SVISION_MAX_ATTEMPTS", "5")) wait_seconds = int(os.getenv("SVISION_RETRY_WAIT", "5")) result = None last_error: Exception | None = None for attempt in range(1, max_attempts + 1): try: result = client.predict( handle_file(image_path), prompt, max_tokens, 0.7, api_name="/describe" ) if result and isinstance(result, str) and result.strip(): break raise RuntimeError("Resposta buida de svision") except Exception as exc: last_error = exc if attempt == max_attempts: raise time.sleep(wait_seconds) wait_seconds = min(wait_seconds * 2, 40) full_description = result.strip() if result else "" # PASO 1: Eliminar el prompt original que puede aparecer en la respuesta prompt_markers = [ "Descriu aquesta persona. Inclou: edat aproximada (jove/adult), gènere, característiques físiques notables (ulleres, barba, bigoti, etc.), expressió i vestimenta.", "Descriu aquesta escena. Inclou: tipus de localització (interior/exterior), elements principals, ambient, il·luminació.", "Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals.", "Descriu aquesta persona.", "Descriu aquesta escena.", ] for marker in prompt_markers: if marker in full_description: # Eliminar el prompt y todo lo que esté antes parts = full_description.split(marker, 1) if len(parts) > 1: full_description = parts[1].strip() # PASO 2: Limpiar prefijos no deseados de forma más agresiva # Lista de prefijos comunes que aparecen prefixes_to_remove = [ "user:", "user ", "user\n", "user\t", "assistant:", "assistant ", "assistant\n", "assistant\t", "User:", "User ", "User\n", "Assistant:", "Assistant ", "Assistant\n", "system:", "system ", ] # Intentar limpiar múltiples veces por si hay varios prefijos for _ in range(5): # Máximo 5 iteraciones original = full_description for prefix in prefixes_to_remove: if full_description.lower().startswith(prefix.lower()): full_description = full_description[len(prefix):].strip() break if original == full_description: break # No hubo cambios, salir # PASO 3: Limpiar espacios en blanco múltiples y saltos de línea al inicio full_description = full_description.lstrip() # PASO 4: Si empieza con salto de línea o tabulación, limpiar while full_description and full_description[0] in ['\n', '\t', '\r', ' ']: full_description = full_description[1:] if not full_description: return ("", "") # Generar nombre aleatorio en catalán para caras if is_face: # Extraer características clave para el nombre desc_lower = full_description.lower() # Determinar género is_female = any(word in desc_lower for word in ["dona", "noia", "nena", "femení", "femenina"]) # Seleccionar nombre aleatorio pero consistente (hash del path) import hashlib hash_val = int(hashlib.md5(image_path.encode()).hexdigest(), 16) noms_home, noms_dona = get_catalan_names() if is_female: name_list = noms_dona else: name_list = noms_home # Usar hash para selección consistente short_name = name_list[hash_val % len(name_list)] else: # Para escenas, extraer primeras palabras clave words = full_description.split()[:4] short_name = " ".join(words).capitalize() return (full_description, short_name) except Exception as e: return ("", "") def generate_short_scene_name(description: str) -> str: """ Genera un nombre corto de escena (< 3 palabras) basándose en la descripción usando el modelo schat (Salamandra-Instruct). 
def generate_short_scene_name(description: str) -> str:
    """
    Generate a short scene name (< 3 words) from the description, using the
    schat model (Salamandra-Instruct).

    Args:
        description: Full scene description from svision

    Returns:
        Short scene name (< 3 words), or an empty string on failure
    """
    try:
        # Lazy import of gradio_client
        from gradio_client import Client

        # URL of the schat space
        schat_url = os.getenv("SCHAT_URL", "https://veureu-schat.hf.space")
        client = Client(schat_url)

        # Prepare the prompt
        prompt = f"Basant-te en aquesta descripció d'una escena, genera un nom curt de menys de 3 paraules que la resumeixi:\n\n{description}\n\nNom de l'escena:"

        # Call the schat /predict endpoint
        # Typical parameters: message, history, max_new_tokens, temperature, top_p, top_k, repetition_penalty
        result = client.predict(
            prompt,  # message
            [],      # empty history
            256,     # max_new_tokens
            0.7,     # temperature
            0.9,     # top_p
            50,      # top_k
            1.0,     # repetition_penalty
            api_name="/predict"
        )

        # The result is a tuple (response, history)
        if isinstance(result, tuple) and len(result) >= 1:
            short_name = result[0].strip() if result[0] else ""
        elif isinstance(result, str):
            short_name = result.strip()
        else:
            short_name = ""

        # Strip stray quotes or punctuation
        short_name = short_name.strip('"\'.,!?').strip()

        # Strip unwanted prefixes
        prefixes_to_remove = [
            "Nom de l'escena:", "nom de l'escena:",
            "Escena:", "escena:",
        ]
        for prefix in prefixes_to_remove:
            if short_name.lower().startswith(prefix.lower()):
                short_name = short_name[len(prefix):].strip()

        # Limit to 3 words
        words = short_name.split()
        if len(words) > 3:
            short_name = " ".join(words[:3])

        return short_name
    except Exception as e:
        return ""
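

# Minimal manual smoke test (a sketch, not part of the Streamlit UI). It assumes
# the engine/TTS URLs and the shared token are configured via environment
# variables (ENGINE_URL, API_TTS_URL, VEUREU_TOKEN) and simply synthesizes a
# short phrase to disk. The output filename is arbitrary.
if __name__ == "__main__":
    _client = APIClient(base_url=os.getenv("ENGINE_URL", ""))
    _res = _client.tts_matxa("Hola, això és una prova de síntesi de veu.")
    if "mp3_bytes" in _res:
        with open("smoke_test.mp3", "wb") as _fh:
            _fh.write(_res["mp3_bytes"])
        _log(f"TTS smoke test OK ({len(_res['mp3_bytes'])} bytes written)")
    else:
        _log(f"TTS smoke test failed: {_res.get('error')}", "ERROR")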