demo / api_client.py
VeuReu's picture
Upload 3 files
4f0191e verified
# api_client.py (UI - Space "veureu")
import os
import requests
import base64
import zipfile
import io
import json
from datetime import datetime
from typing import Iterable, Dict, Any, Tuple
from PIL import Image
def _log(msg: str, level: str = "INFO") -> None:
    """Print a timestamped, level-tagged debug line for an API call.

    Args:
        msg: Text to print.
        level: Severity tag placed in the line (for example "INFO", "WARN").
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{stamp}] [API_CLIENT] [{level}] {msg}"
    print(line)
class APIClient:
    """HTTP client for the 'engine' backend and the TTS service.

    Job-related endpoints as described by the engine:
        POST /jobs                 -> {"job_id": "..."}
        GET  /jobs/{job_id}/status -> {"status": "queued|processing|done|failed", ...}
        GET  /jobs/{job_id}/result -> JobResult {"book": {...}, "une": {...}, ...}
    """

    def __init__(self, base_url: str, use_mock: bool = False, data_dir: str | None = None, token: str | None = None, timeout: int = 180, tts_url: str | None = None):
        """Configure base URLs, the timeout and a shared requests session.

        Args:
            base_url: Engine base URL; overridden by the ENGINE_URL
                environment variable when that is set.
            use_mock: Stored on the instance; methods that support it return
                canned data instead of calling the engine.
            data_dir: Stored on the instance as-is.
            token: Bearer token; falls back to the VEUREU_TOKEN environment
                variable when not given.
            timeout: Base timeout in seconds for requests.
            tts_url: TTS service URL; falls back to API_TTS_URL, then to the
                public default.
        """
        # The environment variable wins over the constructor argument.
        engine_url = os.getenv("ENGINE_URL") or base_url or ""
        self.base_url = engine_url.rstrip("/")

        if tts_url:
            self.tts_url = tts_url
        else:
            self.tts_url = os.getenv("API_TTS_URL", "https://veureu-tts.hf.space")

        self.use_mock = use_mock
        self.data_dir = data_dir
        self.timeout = timeout
        self.session = requests.Session()

        # An explicit token wins; otherwise read VEUREU_TOKEN.
        bearer = token if token else os.getenv("VEUREU_TOKEN")
        if bearer:
            self.session.headers.update({"Authorization": f"Bearer {bearer}"})
# ---- modo real (engine) ----
def _post_jobs(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]:
    """Upload a video to the engine and create a preprocessing job.

    Args:
        video_path: Path of the video file to upload.
        modes: Processing modes; sent as a single comma-separated form field.

    Returns:
        The decoded JSON response (expected to contain "job_id").

    Raises:
        OSError: If the video file cannot be opened.
        requests.exceptions.HTTPError: If the engine answers with 4xx/5xx.
    """
    url = f"{self.base_url}/preprocessing/jobs"
    # The context manager closes the handle even if the request fails;
    # previously the file object was never closed.
    with open(video_path, "rb") as video_file:
        files = {"file": (os.path.basename(video_path), video_file, "application/octet-stream")}
        data = {"modes": ",".join(modes)}
        r = self.session.post(url, files=files, data=data, timeout=self.timeout)
    r.raise_for_status()
    return r.json()  # {"job_id": ...}
def _get_status(self, job_id: str) -> Dict[str, Any]:
    """Fetch the status document of a preprocessing job.

    A 404 is mapped to {"status": "not_found"}; any other error status
    raises through raise_for_status().
    """
    endpoint = f"{self.base_url}/preprocessing/jobs/{job_id}/status"
    _log(f"GET {endpoint}")
    response = self.session.get(endpoint, timeout=self.timeout)
    if response.status_code != 404:
        response.raise_for_status()
        payload = response.json()
        _log(f" -> status={payload.get('status', 'unknown')}")
        return payload
    _log(" -> 404 not_found", "WARN")
    return {"status": "not_found"}
def _get_result(self, job_id: str) -> Dict[str, Any]:
    """Fetch the result document of a preprocessing job.

    A 404 is mapped to {"status": "not_found"}; any other error status
    raises through raise_for_status().
    """
    endpoint = f"{self.base_url}/preprocessing/jobs/{job_id}/result"
    response = self.session.get(endpoint, timeout=self.timeout)
    if response.status_code != 404:
        response.raise_for_status()
        return response.json()
    return {"status": "not_found"}
# ---- API que usa streamlit_app.py ----
def process_video(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]:
    """Start processing a video and return {"job_id": "..."}.

    In mock mode a fixed job id is returned and no request is made.
    """
    if not self.use_mock:
        return self._post_jobs(video_path, modes)
    return {"job_id": "mock-123"}
def get_job(self, job_id: str) -> Dict[str, Any]:
    """Return the job state in the shape the UI expects.

    The shape is {"status": ..., "results": {...}}. The data comes from the
    status endpoint, which carries the results once the job has finished.
    While the job is queued or processing only {"status": ...} is returned.
    In mock mode a fixed "done" result is returned without any request.
    """
    if self.use_mock:
        # Immediate canned result for testing the UI.
        mock_results = {
            "book": {"text": "Text d'exemple (book)", "mp3_bytes": b""},
            "une": {"srt": "1\n00:00:00,000 --> 00:00:01,000\nExemple UNE\n", "mp3_bytes": b""},
        }
        return {"status": "done", "results": mock_results}

    status_doc = self._get_status(job_id)
    state = status_doc.get("status", "unknown")
    if state in {"queued", "processing"}:
        return {"status": state}

    if "results" not in status_doc:
        # Nothing to forward: report the state with an empty result set.
        _log(f" -> Job status={state}, no results found in response")
        return {"status": state, "results": {}}

    _log(f" -> Job done with results: {list(status_doc.get('results', {}).keys())}")
    return {"status": state, "results": status_doc.get("results", {})}
def tts_matxa(self, text: str, voice: str = "central/grau") -> dict:
    """Synthesize speech for a text through the TTS service.

    Texts longer than 480 characters go to /tts/text_long, shorter ones to
    /tts/text.

    Args:
        text: Text to synthesize.
        voice: Matxa voice identifier (for example 'central/grau').

    Returns:
        {'mp3_bytes': bytes} on success or {'error': str} on request failure.

    Raises:
        ValueError: If no TTS URL is configured.
    """
    if not self.tts_url:
        raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")
    route = "/tts/text_long" if len(text) > 480 else "/tts/text"
    endpoint = f"{self.tts_url.rstrip('/')}{route}"
    form = {"texto": text, "voice": voice, "formato": "mp3"}
    try:
        resp = self.session.post(endpoint, data=form, timeout=self.timeout * 2)
        resp.raise_for_status()
        return {"mp3_bytes": resp.content}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def generate_une_ad_audio_from_srt(self, srt_text: str, voice: str = "central/grau") -> dict:
    """Request MP3 audio for an SRT from the TTS endpoint /tts/srt_ad_audio.

    The SRT text is uploaded UTF-8 encoded as the file "une_ad.srt".

    Returns:
        {'mp3_bytes': bytes} on success or {'error': str} on request failure.

    Raises:
        ValueError: If no TTS URL is configured.
    """
    if not self.tts_url:
        raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")
    endpoint = f"{self.tts_url.rstrip('/')}/tts/srt_ad_audio"
    upload = {"srt": ("une_ad.srt", srt_text.encode("utf-8"), "text/plain")}
    form = {"voice": voice, "ad_format": "mp3"}
    try:
        resp = self.session.post(endpoint, files=upload, data=form, timeout=self.timeout * 5)
        resp.raise_for_status()
        return {"mp3_bytes": resp.content}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def import_databases(self) -> dict:
    """Download the engine databases as a ZIP archive.

    Issues GET /db/download_all_db_files with a ``token`` query parameter
    taken from VEUREU_TOKEN (an empty string when the variable is unset).

    Returns:
        {"zip_bytes": bytes} on success or {"error": str} on request failure.
    """
    endpoint = f"{self.base_url}/db/download_all_db_files"
    query = {"token": os.getenv("VEUREU_TOKEN", "")}
    try:
        resp = self.session.get(endpoint, params=query, timeout=self.timeout * 2)
        resp.raise_for_status()
        # The response body is the binary ZIP itself.
        return {"zip_bytes": resp.content}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
# --- Upload original video to engine media storage ---
def upload_original_video(self, video_bytes: bytes, video_name: str) -> dict:
    """Upload the original video to the engine's media storage.

    Issues POST /media/upload_original_video with the video as a multipart
    file and a ``token`` query parameter from VEUREU_TOKEN (an empty string
    when unset).

    Returns:
        The JSON body when the response is JSON, {"status": "ok"} otherwise,
        or {"error": str} on request failure.
    """
    endpoint = f"{self.base_url}/media/upload_original_video"
    query = {"token": os.getenv("VEUREU_TOKEN", "")}
    try:
        upload = {"video": (video_name, video_bytes, "video/mp4")}
        resp = self.session.post(endpoint, files=upload, params=query, timeout=self.timeout * 5)
        resp.raise_for_status()
        if resp.headers.get("content-type", "").startswith("application/json"):
            return resp.json()
        return {"status": "ok"}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
# --- Initial transcription (generate_initial_srt_and_info + downloads) ---
def generate_initial_srt_and_info(self, sha1sum: str) -> dict:
    """Start the initial transcription pipeline on the engine.

    Issues POST /transcription/generate_initial_srt_and_info with the video
    SHA1 and, when VEUREU_TOKEN is set, a ``token`` query parameter.

    Returns:
        The JSON body, or {"srt": <text>} for non-JSON responses; in both
        cases a "status" key defaults to "ok". {"error": str} on failure.
    """
    endpoint = f"{self.base_url}/transcription/generate_initial_srt_and_info"
    query = {"sha1": sha1sum}
    auth = os.getenv("VEUREU_TOKEN")
    if auth:
        query["token"] = auth
    try:
        resp = self.session.post(endpoint, params=query, timeout=self.timeout * 10)
        resp.raise_for_status()
        # The backend may answer with plain text or JSON; always return a dict.
        is_json = resp.headers.get("content-type", "").startswith("application/json")
        body = resp.json() if is_json else {"srt": resp.text or ""}
        body.setdefault("status", "ok")
        return body
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def download_initial_srt(self, sha1sum: str) -> dict:
    """Download the initial.srt produced by the initial pipeline.

    Issues GET /transcription/download_initial_srt with the video SHA1 and,
    when VEUREU_TOKEN is set, a ``token`` query parameter.

    Returns:
        {"text": <response text>} or {"error": str} on request failure.
    """
    query = {"sha1": sha1sum}
    auth = os.getenv("VEUREU_TOKEN")
    if auth:
        query["token"] = auth
    endpoint = f"{self.base_url}/transcription/download_initial_srt"
    try:
        resp = self.session.get(endpoint, params=query, timeout=self.timeout * 5)
        resp.raise_for_status()
        return {"text": resp.text or ""}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def download_initial_info(self, sha1sum: str) -> dict:
    """Download the initial info.json associated with a video.

    Issues GET /transcription/download_initial_info with the video SHA1 and,
    when VEUREU_TOKEN is set, a ``token`` query parameter.

    Returns:
        {"text": <response text>} or {"error": str} on request failure.
    """
    query = {"sha1": sha1sum}
    auth = os.getenv("VEUREU_TOKEN")
    if auth:
        query["token"] = auth
    endpoint = f"{self.base_url}/transcription/download_initial_info"
    try:
        resp = self.session.get(endpoint, params=query, timeout=self.timeout * 5)
        resp.raise_for_status()
        return {"text": resp.text or ""}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
# --- Salamandra pipeline (result.srt + free_narration.txt) ---
def generate_salamandra_result(self, sha1sum: str) -> dict:
    """Trigger generation of the Salamandra output files on the engine.

    Issues POST /salamandra/generate_salamandra_result with the video SHA1
    and, when VEUREU_TOKEN is set, a ``token`` query parameter.

    Returns:
        The JSON body when the response is JSON, {"status": "ok"} otherwise,
        or {"error": str} on request failure.
    """
    query = {"sha1": sha1sum}
    auth = os.getenv("VEUREU_TOKEN")
    if auth:
        query["token"] = auth
    endpoint = f"{self.base_url}/salamandra/generate_salamandra_result"
    try:
        resp = self.session.post(endpoint, params=query, timeout=self.timeout * 20)
        resp.raise_for_status()
        if resp.headers.get("content-type", "").startswith("application/json"):
            return resp.json()
        return {"status": "ok"}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def download_salamandra_srt(self, sha1sum: str) -> dict:
    """Download the Salamandra result.srt for a video.

    Issues GET /salamandra/download_salamadra_srt with the video SHA1 and,
    when VEUREU_TOKEN is set, a ``token`` query parameter.

    NOTE(review): the path segment is spelled "salamadra" in the code;
    confirm it matches the engine route.

    Returns:
        {"text": <response text>} or {"error": str} on request failure.
    """
    query = {"sha1": sha1sum}
    auth = os.getenv("VEUREU_TOKEN")
    if auth:
        query["token"] = auth
    endpoint = f"{self.base_url}/salamandra/download_salamadra_srt"
    try:
        resp = self.session.get(endpoint, params=query, timeout=self.timeout * 5)
        resp.raise_for_status()
        return {"text": resp.text or ""}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def download_salamandra_free_narration(self, sha1sum: str) -> dict:
    """Download the Salamandra free_narration.txt for a video.

    Issues GET /salamandra/download_salamadra_free_narration with the video
    SHA1 and, when VEUREU_TOKEN is set, a ``token`` query parameter.

    NOTE(review): the path segment is spelled "salamadra" in the code;
    confirm it matches the engine route.

    Returns:
        {"text": <response text>} or {"error": str} on request failure.
    """
    query = {"sha1": sha1sum}
    auth = os.getenv("VEUREU_TOKEN")
    if auth:
        query["token"] = auth
    endpoint = f"{self.base_url}/salamandra/download_salamadra_free_narration"
    try:
        resp = self.session.get(endpoint, params=query, timeout=self.timeout * 5)
        resp.raise_for_status()
        return {"text": resp.text or ""}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
# --- MoE pipeline (result.srt + free_narration.txt) ---
def generate_moe_result(self, sha1sum: str) -> dict:
    """Trigger generation of the MoE output files on the engine.

    Issues POST /moe/generate_moe_result with the video SHA1 and, when
    VEUREU_TOKEN is set, a ``token`` query parameter.

    Returns:
        The JSON body when the response is JSON, {"status": "ok"} otherwise,
        or {"error": str} on request failure.
    """
    query = {"sha1": sha1sum}
    auth = os.getenv("VEUREU_TOKEN")
    if auth:
        query["token"] = auth
    endpoint = f"{self.base_url}/moe/generate_moe_result"
    try:
        resp = self.session.post(endpoint, params=query, timeout=self.timeout * 20)
        resp.raise_for_status()
        if resp.headers.get("content-type", "").startswith("application/json"):
            return resp.json()
        return {"status": "ok"}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def download_moe_srt(self, sha1sum: str) -> dict:
    """Download the MoE result.srt for a video.

    Issues GET /moe/download_moe_srt with the video SHA1 and, when
    VEUREU_TOKEN is set, a ``token`` query parameter.

    Returns:
        {"text": <response text>} or {"error": str} on request failure.
    """
    query = {"sha1": sha1sum}
    auth = os.getenv("VEUREU_TOKEN")
    if auth:
        query["token"] = auth
    endpoint = f"{self.base_url}/moe/download_moe_srt"
    try:
        resp = self.session.get(endpoint, params=query, timeout=self.timeout * 5)
        resp.raise_for_status()
        return {"text": resp.text or ""}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def download_moe_free_narration(self, sha1sum: str) -> dict:
    """Download the MoE free_narration.txt for a video.

    Issues GET /preprocessing/moe/download_moe_free_narration with the video
    SHA1 and, when VEUREU_TOKEN is set, a ``token`` query parameter.

    NOTE(review): the other MoE methods call paths under /moe/ without the
    /preprocessing prefix; confirm this route against the engine.

    Returns:
        {"text": <response text>} or {"error": str} on request failure.
    """
    query = {"sha1": sha1sum}
    auth = os.getenv("VEUREU_TOKEN")
    if auth:
        query["token"] = auth
    endpoint = f"{self.base_url}/preprocessing/moe/download_moe_free_narration"
    try:
        resp = self.session.get(endpoint, params=query, timeout=self.timeout * 5)
        resp.raise_for_status()
        return {"text": resp.text or ""}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def upload_embeddings(self, video_hash: str, embeddings_json: dict) -> dict:
    """Upload the casting JSON to the engine as 'faces' and 'voices' embeddings.

    The same serialized JSON is posted twice to /embeddings/upload_embeddings,
    once per embedding type.

    Returns:
        {"faces": <result>, "voices": <result>} where each result is the JSON
        body, {"status": "ok"} or {"error": str}. If the JSON cannot be
        serialized, a single {"error": str} is returned and nothing is sent.
    """
    endpoint = f"{self.base_url}/embeddings/upload_embeddings"
    auth = os.getenv("VEUREU_TOKEN")

    # Serialize once; both uploads reuse the same bytes.
    try:
        blob = json.dumps(embeddings_json, ensure_ascii=False).encode("utf-8")
    except Exception as exc:
        return {"error": f"Error serialitzant embeddings_json: {exc}"}

    outcome = {}
    for kind in ("faces", "voices"):
        query = {"embedding_type": kind, "video_hash": video_hash}
        if auth:
            query["token"] = auth
        upload = {"file": ("embeddings.json", blob, "application/json")}
        try:
            resp = self.session.post(endpoint, params=query, files=upload, timeout=self.timeout * 2)
            resp.raise_for_status()
            if resp.headers.get("content-type", "").startswith("application/json"):
                outcome[kind] = resp.json()
            else:
                outcome[kind] = {"status": "ok"}
        except requests.exceptions.RequestException as exc:
            outcome[kind] = {"error": str(exc)}
    return outcome
def import_media(self, sha1sum: str) -> dict:
    """Download the content served at /preprocessing/import_media/<sha1>.

    Returns:
        {"zip_bytes": bytes} on success or {"error": str} on request failure.
    """
    endpoint = f"{self.base_url}/preprocessing/import_media/{sha1sum}"
    try:
        resp = self.session.get(endpoint, timeout=self.timeout * 5)
        resp.raise_for_status()
        return {"zip_bytes": resp.content}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def import_media_version(self, sha1sum: str, version: str) -> dict:
    """Download the content served at /import_media_version/<sha1>/<version>.

    Returns:
        {"zip_bytes": bytes} on success or {"error": str} on request failure.
    """
    endpoint = f"{self.base_url}/import_media_version/{sha1sum}/{version}"
    try:
        resp = self.session.get(endpoint, timeout=self.timeout * 5)
        resp.raise_for_status()
        return {"zip_bytes": resp.content}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
# ---- Pending videos (pending_videos) ----
def upload_pending_video(self, video_bytes: bytes, filename: str) -> dict:
    """Upload a pending video to the engine.

    Issues POST /pending_videos/upload_pending_video with the video as a
    multipart file; a ``token`` query parameter is added when VEUREU_TOKEN
    is set.

    Returns:
        The JSON body when the response is JSON, {"status": "ok"} otherwise,
        or {"error": str} on request failure.
    """
    endpoint = f"{self.base_url}/pending_videos/upload_pending_video"
    upload = {"video": (filename, io.BytesIO(video_bytes), "video/mp4")}
    auth = os.getenv("VEUREU_TOKEN")
    query = {}
    if auth:
        query = {"token": auth}
    try:
        resp = self.session.post(endpoint, params=query, files=upload, timeout=self.timeout * 5)
        resp.raise_for_status()
        if resp.headers.get("content-type", "").startswith("application/json"):
            return resp.json()
        return {"status": "ok"}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def list_pending_videos(self) -> dict:
    """List the pending videos known to the engine.

    Issues GET /pending_videos/list_pending_videos; a ``token`` query
    parameter is added when VEUREU_TOKEN is set.

    Returns:
        The decoded JSON body or {"error": str} on request failure.
    """
    endpoint = f"{self.base_url}/pending_videos/list_pending_videos"
    auth = os.getenv("VEUREU_TOKEN")
    query = {}
    if auth:
        query = {"token": auth}
    try:
        resp = self.session.get(endpoint, params=query, timeout=self.timeout * 5)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def download_pending_video(self, sha1sum: str) -> dict:
    """Download a pending video by SHA1.

    Issues GET /pending_videos/download_pending_video with the SHA1 and,
    when VEUREU_TOKEN is set, a ``token`` query parameter.

    Returns:
        {"video_bytes": bytes} on success or {"error": str} on failure.
    """
    query = {"sha1": sha1sum}
    auth = os.getenv("VEUREU_TOKEN")
    if auth:
        query["token"] = auth
    endpoint = f"{self.base_url}/pending_videos/download_pending_video"
    try:
        resp = self.session.get(endpoint, params=query, timeout=self.timeout * 5)
        resp.raise_for_status()
        return {"video_bytes": resp.content}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def update_databases(self, payload: dict) -> dict:
    """POST the given payload as JSON to the engine's /update_databases.

    Returns:
        The decoded JSON body or {"error": str} on request failure.
    """
    endpoint = f"{self.base_url}/update_databases"
    try:
        resp = self.session.post(endpoint, json=payload, timeout=self.timeout * 5)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def export_media(self, zip_bytes: bytes) -> dict:
    """Upload a ZIP archive to the engine's /export_media endpoint.

    The archive is sent as the multipart field "media_zip" under the file
    name "media_export.zip".

    Returns:
        The decoded JSON body or {"error": str} on request failure.
    """
    endpoint = f"{self.base_url}/export_media"
    upload = {"media_zip": ("media_export.zip", zip_bytes, "application/zip")}
    try:
        resp = self.session.post(endpoint, files=upload, timeout=self.timeout * 10)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def generate_audiodescription(self, video_bytes: bytes, video_name: str) -> dict:
    """Send an in-memory MP4 to /preprocessing/generate_audiodescription.

    The file name falls back to "video.mp4" when ``video_name`` is empty.

    Returns:
        The decoded JSON body or {"error": str} on request failure.
    """
    endpoint = f"{self.base_url}/preprocessing/generate_audiodescription"
    try:
        upload_name = video_name or "video.mp4"
        upload = {"video": (upload_name, video_bytes, "video/mp4")}
        resp = self.session.post(endpoint, files=upload, timeout=self.timeout * 10)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def generate_salamandra_ad_from_sha1(self, sha1sum: str) -> dict:
    """Generate the audio-description SRT for a video identified by SHA1.

    Issues POST /transcription/generate_srt and wraps the response text in
    the dict shape the UI already consumes:
        {"status": "done", "results": {"une_srt": "...", "free_text": ""}}

    Returns:
        The wrapped result, or {"error": str} on request failure.
    """
    query = {"sha1": sha1sum}
    auth = os.getenv("VEUREU_TOKEN")
    if auth:
        query["token"] = auth
    endpoint = f"{self.base_url}/transcription/generate_srt"
    try:
        resp = self.session.post(endpoint, params=query, timeout=self.timeout * 10)
        resp.raise_for_status()
        results = {"une_srt": resp.text or "", "free_text": ""}
        return {"status": "done", "results": results}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def finalize_casting(self, payload: dict) -> dict:
    """Send the final casting to the engine to consolidate identities.

    Issues POST /preprocessing/finalize_casting with ``payload`` as JSON.

    Args:
        payload: Casting data; the "characters" and "voice_clusters" entries
            are only read here to log their sizes.

    Returns:
        The decoded JSON body on success. On an HTTP error status:
        {"error": str, "status_code": int | None, "body": <JSON or text>}.
        On any other request failure: {"error": str}.
    """
    url = f"{self.base_url}/preprocessing/finalize_casting"
    # `or []` tolerates keys that are present but set to None.
    n_chars = len(payload.get("characters") or [])
    n_voices = len(payload.get("voice_clusters") or [])
    _log(f"POST {url} (characters={n_chars}, voice_clusters={n_voices})")
    try:
        r = self.session.post(url, json=payload, timeout=self.timeout * 5)
        r.raise_for_status()
        result = r.json()
        _log(f" -> ok, face_identities={len(result.get('face_identities', []))}, voice_identities={len(result.get('voice_identities', []))}")
        return result
    except requests.exceptions.HTTPError as e:
        resp = e.response
        # A Response is falsy for every 4xx/5xx status, so compare against
        # None; the previous truthiness test always logged '?'.
        has_response = resp is not None
        status_code = resp.status_code if has_response else None
        _log(f" -> HTTP ERROR {status_code if has_response else '?'}: {e}", "ERROR")
        body = None
        if has_response:
            try:
                # Prefer the structured JSON error when the body has one.
                body = resp.json()
            except Exception:
                # Otherwise fall back to the raw text body.
                body = resp.text
        return {"error": str(e), "status_code": status_code, "body": body}
    except requests.exceptions.RequestException as e:
        _log(f" -> REQUEST ERROR: {e}", "ERROR")
        return {"error": str(e)}
def load_casting(self, faces_dir: str, voices_dir: str, db_dir: str, drop_collections: bool = False) -> dict:
    """Ask the engine to load face and voice indexes.

    Issues POST /preprocessing/load_casting with the directories as form
    fields; ``drop_collections`` is sent as "1" or "0".

    Returns:
        The decoded JSON body or {"error": str} on request failure.
    """
    endpoint = f"{self.base_url}/preprocessing/load_casting"
    _log(f"POST {endpoint} (faces_dir={faces_dir})")
    drop_flag = 1 if drop_collections else 0
    form = {
        "faces_dir": faces_dir,
        "voices_dir": voices_dir,
        "db_dir": db_dir,
        "drop_collections": str(drop_flag),
    }
    try:
        resp = self.session.post(endpoint, data=form, timeout=self.timeout * 5)
        resp.raise_for_status()
        outcome = resp.json()
        _log(f" -> ok={outcome.get('ok')}, faces={outcome.get('faces', 0)}, voices={outcome.get('voices', 0)}")
        return outcome
    except requests.exceptions.RequestException as exc:
        _log(f" -> ERROR: {exc}", "ERROR")
        return {"error": str(exc)}
def rebuild_video_with_ad(self, video_path: str, srt_path: str, voice: str = "central/grau") -> dict:
    """Rebuild a video with audio description through the TTS service.

    Posts the video and the SRT to /tts/srt, expects a ZIP in return and
    extracts the first entry whose name ends in '.mp4'.

    Args:
        video_path: Path to the original video file.
        srt_path: Path to the SRT file with the audio descriptions.
        voice: Matxa voice identifier (default 'central/grau').

    Returns:
        {'video_bytes': bytes} on success or {'error': str} otherwise.

    Raises:
        ValueError: If no TTS URL is configured.
    """
    if not self.tts_url:
        raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")
    endpoint = f"{self.tts_url.rstrip('/')}/tts/srt"
    try:
        with open(video_path, 'rb') as video_file, open(srt_path, 'rb') as srt_file:
            upload = {
                'video': (os.path.basename(video_path), video_file, 'video/mp4'),
                'srt': (os.path.basename(srt_path), srt_file, 'application/x-subrip'),
            }
            form = {"voice": voice, "ad_format": "mp3", "include_final_mp4": "1"}
            resp = self.session.post(endpoint, files=upload, data=form, timeout=self.timeout * 5)
            resp.raise_for_status()
            # The response is a ZIP; read it from memory.
            with zipfile.ZipFile(io.BytesIO(resp.content)) as archive:
                mp4_name = next((name for name in archive.namelist() if name.endswith('.mp4')), None)
                if mp4_name is not None:
                    return {"video_bytes": archive.read(mp4_name)}
            # No MP4 entry was present in the archive.
            return {"error": "No se encontró el archivo de vídeo MP4 en la respuesta del servidor."}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
    except zipfile.BadZipFile:
        return {"error": "La respuesta del servidor no fue un archivo ZIP válido."}
    except Exception as exc:
        return {"error": str(exc)}
def apply_refinement(
    self,
    *,
    sha1sum: str | None = None,
    version: str | None = None,
    srt_content: str | None = None,
    reflection_enabled: bool = True,
    reflexion_enabled: bool = False,
    introspection_enabled: bool = False,
) -> dict:
    """Run the multi-agent refinement pipeline on an SRT.

    Issues POST /refinement/apply_refinement with a JSON body. The SRT can
    be identified by ``sha1sum`` + ``version`` or passed in ``srt_content``;
    only the arguments that are not None are included. The token from
    VEUREU_TOKEN, when set, travels inside the JSON body.

    Returns:
        The JSON body when the response is JSON, {"status": "ok"} otherwise,
        or {"error": str} on request failure.
    """
    endpoint = f"{self.base_url}/refinement/apply_refinement"
    auth = os.getenv("VEUREU_TOKEN")
    body = {
        "reflection_enabled": bool(reflection_enabled),
        "reflexion_enabled": bool(reflexion_enabled),
        "introspection_enabled": bool(introspection_enabled),
    }
    optional_fields = (("sha1sum", sha1sum), ("version", version), ("srt_content", srt_content))
    for key, value in optional_fields:
        if value is not None:
            body[key] = value
    if auth:
        body["token"] = auth
    try:
        resp = self.session.post(endpoint, json=body, timeout=self.timeout * 10)
        resp.raise_for_status()
        if resp.headers.get("content-type", "").startswith("application/json"):
            return resp.json()
        return {"status": "ok"}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def refine_narration(self, dialogues_srt: str, frame_descriptions_json: str = "[]", config_path: str = "config.yaml") -> dict:
    """POST the dialogue SRT and frame descriptions to /refine_narration.

    The three arguments are sent unchanged as form fields.

    Returns:
        The decoded JSON body or {"error": str} on request failure.
    """
    endpoint = f"{self.base_url}/refine_narration"
    form = dict(
        dialogues_srt=dialogues_srt,
        frame_descriptions_json=frame_descriptions_json,
        config_path=config_path,
    )
    try:
        resp = self.session.post(endpoint, data=form, timeout=self.timeout)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
def create_initial_casting(self, video_path: str = None, video_bytes: bytes = None, video_name: str = None,
                           face_max_groups: int = 3, face_min_cluster_size: int = 3,
                           voice_max_groups: int = 3, voice_min_cluster_size: int = 3,
                           max_frames: int = 100) -> dict:
    """Ask the engine to create the initial casting for a video.

    Posts the video and the clustering parameters to
    /preprocessing/create_initial_casting. ``video_bytes`` takes precedence
    over ``video_path`` when both are given.

    Args:
        video_path: Path to the video file (read from disk).
        video_bytes: Video content already in memory.
        video_name: File name to send with ``video_bytes``.
        face_max_groups: Sent as the "max_groups" form field.
        face_min_cluster_size: Sent as the "min_cluster_size" form field.
        voice_max_groups: Sent as the "voice_max_groups" form field.
        voice_min_cluster_size: Sent as "voice_min_cluster_size".
        max_frames: Sent as the "max_frames" form field.

    Returns:
        The JSON body when the response is JSON, {"ok": True} otherwise, or
        {"error": str} on any failure (including a missing video).
    """
    endpoint = f"{self.base_url}/preprocessing/create_initial_casting"
    _log(f"POST {endpoint} (video={video_name or video_path}, max_frames={max_frames})")
    try:
        # Build the multipart tuple from memory or from disk.
        if video_bytes:
            upload = (video_name or "video.mp4", video_bytes, "video/mp4")
        elif video_path:
            with open(video_path, "rb") as handle:
                upload = (os.path.basename(video_path), handle.read(), "video/mp4")
        else:
            return {"error": "Either video_path or video_bytes must be provided"}

        form = {
            "max_groups": str(face_max_groups),
            "min_cluster_size": str(face_min_cluster_size),
            "voice_max_groups": str(voice_max_groups),
            "voice_min_cluster_size": str(voice_min_cluster_size),
            "max_frames": str(max_frames),
        }
        resp = self.session.post(endpoint, files={"video": upload}, data=form, timeout=self.timeout * 5)
        resp.raise_for_status()
        if resp.headers.get("content-type", "").startswith("application/json"):
            outcome = resp.json()
        else:
            outcome = {"ok": True}
        _log(f" -> job_id={outcome.get('job_id', '?')}")
        return outcome
    except requests.exceptions.RequestException as exc:
        _log(f" -> ERROR: {exc}", "ERROR")
        return {"error": str(exc)}
    except Exception as exc:
        _log(f" -> UNEXPECTED ERROR: {exc}", "ERROR")
        return {"error": f"Unexpected error: {str(exc)}"}
def detect_scenes(self, video_path: str = None, video_bytes: bytes = None, video_name: str = None,
                  max_groups: int = 3, min_cluster_size: int = 3,
                  scene_sensitivity: float = 0.5, frame_interval_sec: float = 0.5,
                  max_frames: int = 100) -> dict:
    """Ask the engine to compute scene clusters for a video.

    Posts the video and the clustering parameters to
    /preprocessing/detect_scenes. ``video_bytes`` takes precedence over
    ``video_path`` when both are given.

    Returns:
        The decoded JSON body, or {"error": str} when no video is supplied
        or the request fails. Errors opening ``video_path`` propagate.
    """
    endpoint = f"{self.base_url}/preprocessing/detect_scenes"
    _log(f"POST {endpoint} (video={video_name or video_path}, max_groups={max_groups})")
    try:
        if video_bytes:
            upload = (video_name or "video.mp4", video_bytes, "video/mp4")
        elif video_path:
            with open(video_path, "rb") as handle:
                upload = (os.path.basename(video_path), handle.read(), "video/mp4")
        else:
            return {"error": "Either video_path or video_bytes must be provided"}

        form = {
            "max_groups": str(max_groups),
            "min_cluster_size": str(min_cluster_size),
            "scene_sensitivity": str(scene_sensitivity),
            "frame_interval_sec": str(frame_interval_sec),
            "max_frames": str(max_frames),
        }
        resp = self.session.post(endpoint, files={"video": upload}, data=form, timeout=self.timeout * 5)
        resp.raise_for_status()
        outcome = resp.json()
        _log(f" -> scene_clusters={len(outcome.get('scene_clusters', []))}")
        return outcome
    except requests.exceptions.RequestException as exc:
        _log(f" -> ERROR: {exc}", "ERROR")
        return {"error": str(exc)}
def generate_audio_from_text_file(self, text_content: str, voice: str = "central/grau") -> dict:
    """Generate a single MP3 from a long text via the TTS SRT endpoint.

    Steps:
        1. Turn every non-blank line of the text into a 5-second SRT cue.
        2. Post the synthetic SRT to /tts/srt.
        3. Extract 'ad_master.mp3' from the ZIP the service returns.

    Args:
        text_content: Text to synthesize, one cue per non-blank line.
        voice: Matxa voice identifier (default 'central/grau').

    Returns:
        {"mp3_bytes": bytes} on success or {"error": str} otherwise.

    Raises:
        ValueError: If no TTS URL is configured.
    """
    if not self.tts_url:
        raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")

    def format_time(seconds):
        """Format a number of seconds as an SRT timestamp (HH:MM:SS,mmm)."""
        h = int(seconds / 3600)
        m = int((seconds % 3600) / 60)
        s = int(seconds % 60)
        ms = int((seconds - int(seconds)) * 1000)
        return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"

    # 1. Build the synthetic SRT in memory. Blank lines are dropped BEFORE
    # numbering so cue indices stay consecutive (1, 2, 3, ...); numbering
    # the raw lines left gaps wherever a blank line was skipped.
    spoken_lines = [ln.strip() for ln in text_content.strip().split('\n')]
    spoken_lines = [ln for ln in spoken_lines if ln]
    cues = []
    for index, line in enumerate(spoken_lines, start=1):
        start_time = (index - 1) * 5  # fixed 5 seconds per line
        end_time = start_time + 5
        cues.append(f"{index}\n{format_time(start_time)} --> {format_time(end_time)}\n{line}\n\n")
    srt_content = "".join(cues)
    if not srt_content:
        return {"error": "El texto proporcionado estaba vacío o no se pudo procesar."}

    # 2. Call /tts/srt. This uses requests.post directly rather than
    # self.session, so session-level headers are not sent.
    url = f"{self.tts_url.rstrip('/')}/tts/srt"
    try:
        files = {
            'srt': ('fake_ad.srt', srt_content, 'application/x-subrip')
        }
        data = {"voice": voice, "ad_format": "mp3"}
        r = requests.post(url, files=files, data=data, timeout=self.timeout * 5)
        r.raise_for_status()
        # 3. Extract 'ad_master.mp3' from the returned ZIP.
        with zipfile.ZipFile(io.BytesIO(r.content)) as z:
            if 'ad_master.mp3' in z.namelist():
                return {"mp3_bytes": z.read('ad_master.mp3')}
        return {"error": "No se encontró 'ad_master.mp3' en la respuesta del servidor."}
    except requests.exceptions.RequestException as e:
        return {"error": f"Error llamando a la API de SRT: {e}"}
    except zipfile.BadZipFile:
        return {"error": "La respuesta del servidor no fue un archivo ZIP válido."}
def tts_long_text(self, text: str, voice: str = "central/grau") -> dict:
    """Synthesize a long text through the TTS endpoint /tts/text_long.

    The request is made with requests.post directly (not the shared
    session) and uses ten times the base timeout.

    Returns:
        {"mp3_bytes": bytes} on success or {"error": str} on request failure.

    Raises:
        ValueError: If no TTS URL is configured.
    """
    if not self.tts_url:
        raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")
    endpoint = f"{self.tts_url.rstrip('/')}/tts/text_long"
    form = {"texto": text, "voice": voice, "formato": "mp3"}
    try:
        # Long texts can take a while, hence the generous timeout.
        resp = requests.post(endpoint, data=form, timeout=self.timeout * 10)
        resp.raise_for_status()
        return {"mp3_bytes": resp.content}
    except requests.exceptions.RequestException as exc:
        return {"error": str(exc)}
# ===========================
# Cliente para SVision Space
# ===========================
# Nombres catalanes comunes para asignar a personajes (deben coincidir con app.py)
def get_catalan_names():
    """Return two lists of Catalan first names: (male names, female names)."""
    male_names = (
        "Jordi Marc Pau Pere Joan Josep David Àlex Guillem Albert "
        "Arnau Martí Bernat Oriol Roger Pol Lluís Sergi Carles Xavier"
    ).split(" ")
    female_names = (
        "Maria Anna Laura Marta Cristina Núria Montserrat Júlia Sara Carla "
        "Alba Elisabet Rosa Gemma Sílvia Teresa Irene Laia Marina Bet"
    ).split(" ")
    return male_names, female_names
def _clean_svision_description(text: str) -> str:
    """Strip echoed prompts and chat-role prefixes from a svision answer."""
    # Step 1: if the model echoed a known prompt, keep only what follows it.
    prompt_markers = (
        "Descriu aquesta persona. Inclou: edat aproximada (jove/adult), gènere, característiques físiques notables (ulleres, barba, bigoti, etc.), expressió i vestimenta.",
        "Descriu aquesta escena. Inclou: tipus de localització (interior/exterior), elements principals, ambient, il·luminació.",
        "Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals.",
        "Descriu aquesta persona.",
        "Descriu aquesta escena.",
    )
    for marker in prompt_markers:
        if marker in text:
            text = text.split(marker, 1)[1].strip()

    # Step 2: drop leading role tags. Matching is case-insensitive, and up
    # to five stacked tags are removed.
    role_prefixes = (
        "user:", "user ", "user\n", "user\t",
        "assistant:", "assistant ", "assistant\n", "assistant\t",
        "system:", "system ",
    )
    for _ in range(5):
        lowered = text.lower()
        matched = next((p for p in role_prefixes if lowered.startswith(p)), None)
        if matched is None:
            break
        text = text[len(matched):].strip()

    # Step 3: remove any remaining leading whitespace.
    return text.lstrip()


def _short_name_for_description(description: str, image_path: str, is_face: bool) -> str:
    """Derive the short label returned alongside a description."""
    if not is_face:
        # Scenes: the first four words, capitalized.
        return " ".join(description.split()[:4]).capitalize()

    import hashlib

    lowered = description.lower()
    # Substring test on gender-related Catalan words.
    # NOTE(review): "dona" also matches inside longer words; confirm that
    # is acceptable.
    is_female = any(word in lowered for word in ["dona", "noia", "nena", "femení", "femenina"])
    noms_home, noms_dona = get_catalan_names()
    name_list = noms_dona if is_female else noms_home
    # Hash of the path: the same image path always gets the same name.
    hash_val = int(hashlib.md5(image_path.encode()).hexdigest(), 16)
    return name_list[hash_val % len(name_list)]


def describe_image_with_svision(image_path: str, is_face: bool = True) -> Tuple[str, str]:
    """Describe an image (face or scene) through the svision space.

    The call is retried up to SVISION_MAX_ATTEMPTS times (default 5),
    waiting SVISION_RETRY_WAIT seconds (default 5) and doubling the wait up
    to 40 seconds between attempts. The answer is cleaned of echoed prompts
    and role prefixes.

    Args:
        image_path: Path to the image file.
        is_face: True to describe a person, False to describe a scene.

    Returns:
        (full_description, short_name). For faces the short name is a
        Catalan first name picked from a hash of ``image_path``; for scenes
        it is the first four words of the description. ("", "") is returned
        when anything fails or the cleaned description is empty.
    """
    try:
        import time
        from gradio_client import Client, handle_file

        svision_url = os.getenv("SVISION_URL", "https://veureu-svision.hf.space")
        client = Client(svision_url)

        if is_face:
            prompt = "Descriu aquesta persona. Inclou: edat aproximada (jove/adult), gènere, característiques físiques notables (ulleres, barba, bigoti, etc.), expressió i vestimenta."
        else:
            prompt = "Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals."

        max_tokens = 256 if is_face else 128
        max_attempts = int(os.getenv("SVISION_MAX_ATTEMPTS", "5"))
        wait_seconds = int(os.getenv("SVISION_RETRY_WAIT", "5"))
        result = None
        for attempt in range(1, max_attempts + 1):
            try:
                result = client.predict(
                    handle_file(image_path),
                    prompt,
                    max_tokens,
                    0.7,
                    api_name="/describe"
                )
                if result and isinstance(result, str) and result.strip():
                    break
                # Treat an empty answer as a failure so it is retried.
                raise RuntimeError("Resposta buida de svision")
            except Exception:
                if attempt == max_attempts:
                    raise
                time.sleep(wait_seconds)
                wait_seconds = min(wait_seconds * 2, 40)

        full_description = _clean_svision_description(result.strip() if result else "")
        if not full_description:
            return ("", "")
        return (full_description, _short_name_for_description(full_description, image_path, is_face))
    except Exception as e:
        # Best effort: callers get empty strings, but the cause is logged
        # instead of being silently discarded.
        _log(f"describe_image_with_svision failed for {image_path}: {e}", "ERROR")
        return ("", "")
def generate_short_scene_name(description: str) -> str:
    """Generate a short scene name from a description using the schat space.

    Args:
        description: Full scene description.

    Returns:
        The cleaned name, truncated to at most three words, or an empty
        string when the call fails.
    """
    try:
        from gradio_client import Client

        schat_url = os.getenv("SCHAT_URL", "https://veureu-schat.hf.space")
        client = Client(schat_url)

        prompt = f"Basant-te en aquesta descripció d'una escena, genera un nom curt de menys de 3 paraules que la resumeixi:\n\n{description}\n\nNom de l'escena:"

        # Positional arguments: message, history, max_new_tokens,
        # temperature, top_p, top_k, repetition_penalty.
        result = client.predict(
            prompt,
            [],
            256,
            0.7,
            0.9,
            50,
            1.0,
            api_name="/predict"
        )

        # The result may be a (response, history) tuple or a bare string.
        if isinstance(result, tuple) and len(result) >= 1:
            short_name = result[0].strip() if result[0] else ""
        elif isinstance(result, str):
            short_name = result.strip()
        else:
            short_name = ""

        punctuation = '"\'.,!?'
        short_name = short_name.strip(punctuation).strip()

        # Drop an echoed label such as "Nom de l'escena:" (case-insensitive).
        prefixes_to_remove = [
            "Nom de l'escena:",
            "nom de l'escena:",
            "Escena:",
            "escena:",
        ]
        for prefix in prefixes_to_remove:
            if short_name.lower().startswith(prefix.lower()):
                short_name = short_name[len(prefix):].strip()

        # Strip again: removing the label can expose an opening quote, as
        # in `Nom de l'escena: "Cuina"`, which the first pass cannot reach.
        short_name = short_name.strip(punctuation).strip()

        # Keep at most three words.
        words = short_name.split()
        if len(words) > 3:
            short_name = " ".join(words[:3])
        return short_name
    except Exception as e:
        # Best effort: return an empty name, but log the cause.
        _log(f"generate_short_scene_name failed: {e}", "ERROR")
        return ""