import base64
import io
import os
import zipfile
from typing import Any, Callable, Dict, Iterable

import requests


| | class APIClient:
|
| | """
|
| | Cliente para 'engine':
|
| | POST /jobs -> {"job_id": "..."}
|
| | GET /jobs/{job_id}/status -> {"status": "queued|processing|done|failed", ...}
|
| | GET /jobs/{job_id}/result -> JobResult {"book": {...}, "une": {...}, ...}
|
| | """
|
| | def __init__(self, base_url: str, use_mock: bool = False, data_dir: str | None = None, token: str | None = None, timeout: int = 180):
|
| | self.base_url = base_url.rstrip("/")
|
| |
|
| | self.tts_url = self.base_url
|
| | self.use_mock = use_mock
|
| | self.data_dir = data_dir
|
| | self.timeout = timeout
|
| | self.session = requests.Session()
|
| |
|
| | token = token or os.getenv("API_SHARED_TOKEN")
|
| | if token:
|
| | self.session.headers.update({"Authorization": f"Bearer {token}"})
|
| |
|
| |
|
| | def _post_jobs(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]:
|
| | url = f"{self.base_url}/jobs"
|
| | files = {"file": (os.path.basename(video_path), open(video_path, "rb"), "application/octet-stream")}
|
| | data = {"modes": ",".join(modes)}
|
| | r = self.session.post(url, files=files, data=data, timeout=self.timeout)
|
| | r.raise_for_status()
|
| | return r.json()
|
| |
|
| | def _get_status(self, job_id: str) -> Dict[str, Any]:
|
| | url = f"{self.base_url}/jobs/{job_id}/status"
|
| | r = self.session.get(url, timeout=self.timeout)
|
| | r.raise_for_status()
|
| | return r.json()
|
| |
|
| | def _get_result(self, job_id: str) -> Dict[str, Any]:
|
| | url = f"{self.base_url}/jobs/{job_id}/result"
|
| | r = self.session.get(url, timeout=self.timeout)
|
| | r.raise_for_status()
|
| | return r.json()
|
| |
|
| |
|
| | def process_video(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]:
|
| | """Devuelve {"job_id": "..."}"""
|
| | if self.use_mock:
|
| | return {"job_id": "mock-123"}
|
| | return self._post_jobs(video_path, modes)
|
| |
|
| | def get_job(self, job_id: str) -> Dict[str, Any]:
|
| | """
|
| | La UI espera algo del estilo:
|
| | {"status":"done","results":{"book":{...},"une":{...}}}
|
| | Adaptamos la respuesta de /result del engine a ese contrato.
|
| | """
|
| | if self.use_mock:
|
| |
|
| | return {
|
| | "status": "done",
|
| | "results": {
|
| | "book": {"text": "Text d'exemple (book)", "mp3_bytes": b""},
|
| | "une": {"srt": "1\n00:00:00,000 --> 00:00:01,000\nExemple UNE\n", "mp3_bytes": b""},
|
| | }
|
| | }
|
| |
|
| |
|
| | st = self._get_status(job_id)
|
| | if st.get("status") in {"queued", "processing"}:
|
| | return {"status": st.get("status", "queued")}
|
| |
|
| | res = self._get_result(job_id)
|
| |
|
| |
|
| | results = {}
|
| | if "book" in res:
|
| | results["book"] = {
|
| | "text": res["book"].get("text"),
|
| |
|
| |
|
| | }
|
| | if "une" in res:
|
| | results["une"] = {
|
| | "srt": res["une"].get("srt"),
|
| | }
|
| |
|
| | for k in ("book", "une"):
|
| | if k in res:
|
| | if "characters" in res[k]:
|
| | results[k]["characters"] = res[k]["characters"]
|
| | if "metrics" in res[k]:
|
| | results[k]["metrics"] = res[k]["metrics"]
|
| |
|
| | status = "done" if results else st.get("status", "unknown")
|
| | return {"status": status, "results": results}
|
| |
|
| |
|
| | def tts_matxa(self, text: str, voice: str = "central/grau") -> dict:
|
| | """
|
| | Llama al space 'tts' para sintetizar audio.
|
| |
|
| | Args:
|
| | text (str): Texto a sintetizar.
|
| | voice (str): Voz de Matxa a usar (p.ej. 'central/alvocat').
|
| |
|
| | Returns:
|
| | dict: {'mp3_data_url': 'data:audio/mpeg;base64,...'}
|
| | """
|
| | if not self.tts_url:
|
| | raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")
|
| |
|
| | url = f"{self.tts_url.rstrip('/')}/tts/text"
|
| | data = {
|
| | "texto": text,
|
| | "voice": voice,
|
| | "formato": "mp3"
|
| | }
|
| |
|
| | try:
|
| | r = requests.post(url, data=data, timeout=self.timeout)
|
| | r.raise_for_status()
|
| |
|
| |
|
| | return {"mp3_bytes": r.content}
|
| |
|
| | except requests.exceptions.RequestException as e:
|
| | print(f"Error cridant a TTS: {e}")
|
| |
|
| | return {"error": str(e)}
|
| |
|
| |
|
| | def rebuild_video_with_ad(self, video_path: str, srt_path: str) -> dict:
|
| | """
|
| | Llama al space 'tts' para reconstruir un vídeo con audiodescripció a partir de un SRT.
|
| | El servidor devuelve un ZIP, y de ahí extraemos el MP4 final.
|
| | """
|
| | if not self.tts_url:
|
| | raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")
|
| |
|
| | url = f"{self.tts_url.rstrip('/')}/tts/srt"
|
| |
|
| | try:
|
| | files = {
|
| | 'video': (os.path.basename(video_path), open(video_path, 'rb'), 'video/mp4'),
|
| | 'srt': (os.path.basename(srt_path), open(srt_path, 'rb'), 'application/x-subrip')
|
| | }
|
| | data = {"include_final_mp4": 1}
|
| |
|
| | r = requests.post(url, files=files, data=data, timeout=self.timeout * 5)
|
| | r.raise_for_status()
|
| |
|
| |
|
| | with zipfile.ZipFile(io.BytesIO(r.content)) as z:
|
| |
|
| | for filename in z.namelist():
|
| | if filename.endswith('.mp4'):
|
| | video_bytes = z.read(filename)
|
| | return {"video_bytes": video_bytes}
|
| |
|
| |
|
| | return {"error": "No se encontró el archivo de vídeo MP4 en la respuesta del servidor."}
|
| |
|
| | except requests.exceptions.RequestException as e:
|
| | print(f"Error cridant a la reconstrucció de vídeo: {e}")
|
| | return {"error": str(e)}
|
| | except zipfile.BadZipFile:
|
| | return {"error": "La respuesta del servidor no fue un archivo ZIP válido."}
|
| |
|
| |
|
| | def create_initial_casting(self, video_path: str = None, video_bytes: bytes = None, video_name: str = None, epsilon: float = 0.5, min_cluster_size: int = 2) -> dict:
|
| | """
|
| | Llama al endpoint del space 'engine' para crear el 'initial casting'.
|
| |
|
| | Envía el vídeo recién importado como archivo y los parámetros de clustering.
|
| |
|
| | Args:
|
| | video_path: Path to video file (if reading from disk)
|
| | video_bytes: Video file bytes (if already in memory)
|
| | video_name: Name for the video file
|
| | epsilon: Clustering epsilon parameter
|
| | min_cluster_size: Minimum cluster size parameter
|
| | """
|
| | url = f"{self.base_url}/create_initial_casting"
|
| | try:
|
| |
|
| | if video_bytes:
|
| | filename = video_name or "video.mp4"
|
| | files = {
|
| | "video": (filename, video_bytes, "video/mp4"),
|
| | }
|
| | elif video_path:
|
| | with open(video_path, "rb") as f:
|
| | files = {
|
| | "video": (os.path.basename(video_path), f.read(), "video/mp4"),
|
| | }
|
| | else:
|
| | return {"error": "Either video_path or video_bytes must be provided"}
|
| |
|
| | data = {
|
| | "epsilon": str(epsilon),
|
| | "min_cluster_size": str(min_cluster_size),
|
| | }
|
| | r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5)
|
| | r.raise_for_status()
|
| | return r.json() if r.headers.get("content-type", "").startswith("application/json") else {"ok": True}
|
| | except requests.exceptions.RequestException as e:
|
| | return {"error": str(e)}
|
| | except Exception as e:
|
| | return {"error": f"Unexpected error: {str(e)}"}
|
| |
|
| | def generate_audio_from_text_file(self, text_content: str, voice: str = "central/grau") -> dict:
|
| | """
|
| | Genera un único MP3 a partir de un texto largo, usando el endpoint de SRT.
|
| | 1. Convierte el texto en un SRT falso.
|
| | 2. Llama a /tts/srt con el SRT.
|
| | 3. Extrae el 'ad_master.mp3' del ZIP resultante.
|
| | """
|
| | if not self.tts_url:
|
| | raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")
|
| |
|
| |
|
| | srt_content = ""
|
| | start_time = 0
|
| | for i, line in enumerate(text_content.strip().split('\n')):
|
| | line = line.strip()
|
| | if not line:
|
| | continue
|
| |
|
| | end_time = start_time + 5
|
| |
|
| | def format_time(seconds):
|
| | h = int(seconds / 3600)
|
| | m = int((seconds % 3600) / 60)
|
| | s = int(seconds % 60)
|
| | ms = int((seconds - int(seconds)) * 1000)
|
| | return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
|
| |
|
| | srt_content += f"{i+1}\n"
|
| | srt_content += f"{format_time(start_time)} --> {format_time(end_time)}\n"
|
| | srt_content += f"{line}\n\n"
|
| | start_time = end_time
|
| |
|
| | if not srt_content:
|
| | return {"error": "El texto proporcionado estaba vacío o no se pudo procesar."}
|
| |
|
| |
|
| | url = f"{self.tts_url.rstrip('/')}/tts/srt"
|
| | try:
|
| | files = {
|
| | 'srt': ('fake_ad.srt', srt_content, 'application/x-subrip')
|
| | }
|
| | data = {"voice": voice, "ad_format": "mp3"}
|
| |
|
| | r = requests.post(url, files=files, data=data, timeout=self.timeout * 5)
|
| | r.raise_for_status()
|
| |
|
| |
|
| | with zipfile.ZipFile(io.BytesIO(r.content)) as z:
|
| | for filename in z.namelist():
|
| | if filename == 'ad_master.mp3':
|
| | mp3_bytes = z.read(filename)
|
| | return {"mp3_bytes": mp3_bytes}
|
| |
|
| | return {"error": "No se encontró 'ad_master.mp3' en la respuesta del servidor."}
|
| |
|
| | except requests.exceptions.RequestException as e:
|
| | return {"error": f"Error llamando a la API de SRT: {e}"}
|
| | except zipfile.BadZipFile:
|
| | return {"error": "La respuesta del servidor no fue un archivo ZIP válido."}
|
| |
|
| |
|
| | def tts_long_text(self, text: str, voice: str = "central/grau") -> dict:
|
| | """
|
| | Llama al endpoint '/tts/text_long' para sintetizar un texto largo.
|
| | La API se encarga de todo el procesamiento.
|
| | """
|
| | if not self.tts_url:
|
| | raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")
|
| |
|
| | url = f"{self.tts_url.rstrip('/')}/tts/text_long"
|
| | data = {
|
| | "texto": text,
|
| | "voice": voice,
|
| | "formato": "mp3"
|
| | }
|
| |
|
| | try:
|
| |
|
| | r = requests.post(url, data=data, timeout=self.timeout * 10)
|
| | r.raise_for_status()
|
| | return {"mp3_bytes": r.content}
|
| |
|
| | except requests.exceptions.RequestException as e:
|
| | print(f"Error cridant a TTS per a text llarg: {e}")
|
| | return {"error": str(e)}
|
| |
|
| |
|
| |
|