|
|
|
|
|
import os |
|
|
import requests |
|
|
import base64 |
|
|
import zipfile |
|
|
import io |
|
|
import json |
|
|
from datetime import datetime |
|
|
from typing import Iterable, Dict, Any, Tuple |
|
|
from PIL import Image |
|
|
|
|
|
|
|
|
def _log(msg: str, level: str = "INFO") -> None: |
|
|
"""Log a message with timestamp for API calls debugging.""" |
|
|
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
|
print(f"[{ts}] [API_CLIENT] [{level}] {msg}") |
|
|
|
|
|
|
|
|
class APIClient: |
|
|
""" |
|
|
Cliente para 'engine': |
|
|
POST /jobs -> {"job_id": "..."} |
|
|
GET /jobs/{job_id}/status -> {"status": "queued|processing|done|failed", ...} |
|
|
GET /jobs/{job_id}/result -> JobResult {"book": {...}, "une": {...}, ...} |
|
|
""" |
|
|
def __init__(self, base_url: str, use_mock: bool = False, data_dir: str | None = None, token: str | None = None, timeout: int = 180, tts_url: str | None = None): |
|
|
|
|
|
env_engine_url = os.getenv("ENGINE_URL") |
|
|
self.base_url = (env_engine_url or base_url or "").rstrip("/") |
|
|
|
|
|
self.tts_url = tts_url or os.getenv("API_TTS_URL", "https://veureu-tts.hf.space") |
|
|
self.use_mock = use_mock |
|
|
self.data_dir = data_dir |
|
|
self.timeout = timeout |
|
|
self.session = requests.Session() |
|
|
|
|
|
token = token or os.getenv("VEUREU_TOKEN") |
|
|
if token: |
|
|
self.session.headers.update({"Authorization": f"Bearer {token}"}) |
|
|
|
|
|
|
|
|
    def _post_jobs(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]:
        url = f"{self.base_url}/preprocessing/jobs"
        data = {"modes": ",".join(modes)}
        # Open the video in a context manager so the file handle is always released.
        with open(video_path, "rb") as fh:
            files = {"file": (os.path.basename(video_path), fh, "application/octet-stream")}
            r = self.session.post(url, files=files, data=data, timeout=self.timeout)
        r.raise_for_status()
        return r.json()
|
|
|
|
|
def _get_status(self, job_id: str) -> Dict[str, Any]: |
|
|
url = f"{self.base_url}/preprocessing/jobs/{job_id}/status" |
|
|
_log(f"GET {url}") |
|
|
r = self.session.get(url, timeout=self.timeout) |
|
|
if r.status_code == 404: |
|
|
_log(f" -> 404 not_found", "WARN") |
|
|
return {"status": "not_found"} |
|
|
r.raise_for_status() |
|
|
result = r.json() |
|
|
_log(f" -> status={result.get('status', 'unknown')}") |
|
|
return result |
|
|
|
|
|
def _get_result(self, job_id: str) -> Dict[str, Any]: |
|
|
url = f"{self.base_url}/preprocessing/jobs/{job_id}/result" |
|
|
r = self.session.get(url, timeout=self.timeout) |
|
|
if r.status_code == 404: |
|
|
return {"status": "not_found"} |
|
|
r.raise_for_status() |
|
|
return r.json() |
|
|
|
|
|
|
|
|
def process_video(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]: |
|
|
"""Devuelve {"job_id": "..."}""" |
|
|
if self.use_mock: |
|
|
return {"job_id": "mock-123"} |
|
|
return self._post_jobs(video_path, modes) |
|
|
|
|
|
def get_job(self, job_id: str) -> Dict[str, Any]: |
|
|
""" |
|
|
La UI espera algo del estilo: |
|
|
{"status":"done","results":{"book":{...},"une":{...}}} |
|
|
Adaptamos la respuesta de /result del engine a ese contrato. |
|
|
""" |
|
|
if self.use_mock: |
|
|
|
|
|
return { |
|
|
"status": "done", |
|
|
"results": { |
|
|
"book": {"text": "Text d'exemple (book)", "mp3_bytes": b""}, |
|
|
"une": {"srt": "1\n00:00:00,000 --> 00:00:01,000\nExemple UNE\n", "mp3_bytes": b""}, |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
st = self._get_status(job_id) |
|
|
status = st.get("status", "unknown") |
|
|
|
|
|
if status in {"queued", "processing"}: |
|
|
return {"status": status} |
|
|
|
|
|
|
|
|
if "results" in st: |
|
|
_log(f" -> Job done with results: {list(st.get('results', {}).keys())}") |
|
|
return { |
|
|
"status": status, |
|
|
"results": st.get("results", {}), |
|
|
} |
|
|
|
|
|
|
|
|
_log(f" -> Job status={status}, no results found in response") |
|
|
return {"status": status, "results": {}} |
|
|
|
|
|
|
|
|
def tts_matxa(self, text: str, voice: str = "central/grau") -> dict: |
|
|
""" |
|
|
Llama al space 'tts' para sintetizar audio. |
|
|
Usa /tts/text para textos cortos (<480 chars) o /tts/text_long para textos largos. |
|
|
|
|
|
Args: |
|
|
text (str): Texto a sintetizar. |
|
|
voice (str): Voz de Matxa a usar (p.ej. 'central/grau'). |
|
|
|
|
|
Returns: |
|
|
dict: {'mp3_bytes': bytes} o {'error': str} |
|
|
""" |
|
|
if not self.tts_url: |
|
|
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)") |
|
|
|
|
|
|
|
|
if len(text) > 480: |
|
|
url = f"{self.tts_url.rstrip('/')}/tts/text_long" |
|
|
else: |
|
|
url = f"{self.tts_url.rstrip('/')}/tts/text" |
|
|
|
|
|
data = { |
|
|
"texto": text, |
|
|
"voice": voice, |
|
|
"formato": "mp3" |
|
|
} |
|
|
|
|
|
try: |
|
|
r = self.session.post(url, data=data, timeout=self.timeout * 2) |
|
|
r.raise_for_status() |
|
|
return {"mp3_bytes": r.content} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
def generate_une_ad_audio_from_srt(self, srt_text: str, voice: str = "central/grau") -> dict: |
|
|
if not self.tts_url: |
|
|
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)") |
|
|
|
|
|
url = f"{self.tts_url.rstrip('/')}/tts/srt_ad_audio" |
|
|
|
|
|
files = { |
|
|
"srt": ("une_ad.srt", srt_text.encode("utf-8"), "text/plain"), |
|
|
} |
|
|
data = { |
|
|
"voice": voice, |
|
|
"ad_format": "mp3", |
|
|
} |
|
|
|
|
|
try: |
|
|
r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return {"mp3_bytes": r.content} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
def import_databases(self) -> dict: |
|
|
"""Descarga todas las BDs del engine (/data/db) como ZIP. |
|
|
|
|
|
Endpoint: GET /db/download_all_db_files |
|
|
Retorna: {"zip_bytes": bytes} o {"error": str} |
|
|
""" |
|
|
|
|
|
token = os.getenv("VEUREU_TOKEN", "") |
|
|
url = f"{self.base_url}/db/download_all_db_files" |
|
|
try: |
|
|
r = self.session.get(url, params={"token": token}, timeout=self.timeout * 2) |
|
|
r.raise_for_status() |
|
|
|
|
|
return {"zip_bytes": r.content} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def upload_original_video(self, video_bytes: bytes, video_name: str) -> dict: |
|
|
"""Sube el vídeo original al engine para procesamiento posterior. |
|
|
|
|
|
Endpoint: POST /media/upload_original_video |
|
|
El engine calcula el SHA1 y lo guarda en /data/media/<sha1>/clip/<video_name> |
|
|
""" |
|
|
url = f"{self.base_url}/media/upload_original_video" |
|
|
token = os.getenv("VEUREU_TOKEN", "") |
|
|
try: |
|
|
files = {"video": (video_name, video_bytes, "video/mp4")} |
|
|
params = {"token": token} |
|
|
r = self.session.post(url, files=files, params=params, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return r.json() if r.headers.get("content-type", "").startswith("application/json") else {"status": "ok"} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
|
|
|
def generate_initial_srt_and_info(self, sha1sum: str) -> dict: |
|
|
"""Lanza el pipeline inicial de transcripció al engine. |
|
|
|
|
|
Endpoint: POST /transcription/generate_initial_srt_and_info |
|
|
Params: sha1, token (HF_TOKEN) |
|
|
""" |
|
|
|
|
|
url = f"{self.base_url}/transcription/generate_initial_srt_and_info" |
|
|
|
|
|
shared_token = os.getenv("VEUREU_TOKEN") |
|
|
params: dict[str, Any] = {"sha1": sha1sum} |
|
|
if shared_token: |
|
|
params["token"] = shared_token |
|
|
|
|
|
try: |
|
|
r = self.session.post(url, params=params, timeout=self.timeout * 10) |
|
|
r.raise_for_status() |
|
|
|
|
|
if r.headers.get("content-type", "").startswith("application/json"): |
|
|
body = r.json() |
|
|
else: |
|
|
body = {"srt": r.text or ""} |
|
|
body.setdefault("status", "ok") |
|
|
return body |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
def download_initial_srt(self, sha1sum: str) -> dict: |
|
|
"""Descarrega l'initial.srt generat pel pipeline inicial. |
|
|
|
|
|
Endpoint: GET /transcription/download_initial_srt |
|
|
""" |
|
|
|
|
|
url = f"{self.base_url}/transcription/download_initial_srt" |
|
|
hf_token = os.getenv("VEUREU_TOKEN") |
|
|
params: dict[str, Any] = {"sha1": sha1sum} |
|
|
if hf_token: |
|
|
params["token"] = hf_token |
|
|
|
|
|
try: |
|
|
r = self.session.get(url, params=params, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
|
|
|
return {"text": r.text or ""} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
def download_initial_info(self, sha1sum: str) -> dict: |
|
|
"""Descarrega l'info.json inicial associat al vídeo. |
|
|
|
|
|
Endpoint: GET /transcription/download_initial_info |
|
|
""" |
|
|
|
|
|
url = f"{self.base_url}/transcription/download_initial_info" |
|
|
hf_token = os.getenv("VEUREU_TOKEN") |
|
|
params: dict[str, Any] = {"sha1": sha1sum} |
|
|
if hf_token: |
|
|
params["token"] = hf_token |
|
|
|
|
|
try: |
|
|
r = self.session.get(url, params=params, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return {"text": r.text or ""} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_salamandra_result(self, sha1sum: str) -> dict: |
|
|
"""Orquestra la generació dels fitxers de sortida de Salamandra. |
|
|
|
|
|
Endpoint: POST /salamandra/generate_salamadra_result |
|
|
""" |
|
|
|
|
|
url = f"{self.base_url}/salamandra/generate_salamandra_result" |
|
|
hf_token = os.getenv("VEUREU_TOKEN") |
|
|
params: dict[str, Any] = {"sha1": sha1sum} |
|
|
if hf_token: |
|
|
params["token"] = hf_token |
|
|
|
|
|
try: |
|
|
r = self.session.post(url, params=params, timeout=self.timeout * 20) |
|
|
r.raise_for_status() |
|
|
return r.json() if r.headers.get("content-type", "").startswith("application/json") else {"status": "ok"} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
def download_salamandra_srt(self, sha1sum: str) -> dict: |
|
|
"""Descarrega el result.srt de Salamandra. |
|
|
|
|
|
Endpoint: GET /salamandra/download_salamadra_srt |
|
|
""" |
|
|
|
|
|
url = f"{self.base_url}/salamandra/download_salamadra_srt" |
|
|
hf_token = os.getenv("VEUREU_TOKEN") |
|
|
params: dict[str, Any] = {"sha1": sha1sum} |
|
|
if hf_token: |
|
|
params["token"] = hf_token |
|
|
|
|
|
try: |
|
|
r = self.session.get(url, params=params, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return {"text": r.text or ""} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
def download_salamandra_free_narration(self, sha1sum: str) -> dict: |
|
|
"""Descarrega el free_narration.txt de Salamandra. |
|
|
|
|
|
Endpoint: GET /salamandra/download_salamadra_free_narration |
|
|
""" |
|
|
|
|
|
url = f"{self.base_url}/salamandra/download_salamadra_free_narration" |
|
|
hf_token = os.getenv("VEUREU_TOKEN") |
|
|
params: dict[str, Any] = {"sha1": sha1sum} |
|
|
if hf_token: |
|
|
params["token"] = hf_token |
|
|
|
|
|
try: |
|
|
r = self.session.get(url, params=params, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return {"text": r.text or ""} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_moe_result(self, sha1sum: str) -> dict: |
|
|
"""Orquestra la generació dels fitxers de sortida de MoE. |
|
|
|
|
|
Endpoint: POST /moe/generate_moe_result |
|
|
""" |
|
|
|
|
|
url = f"{self.base_url}/moe/generate_moe_result" |
|
|
hf_token = os.getenv("VEUREU_TOKEN") |
|
|
params: dict[str, Any] = {"sha1": sha1sum} |
|
|
if hf_token: |
|
|
params["token"] = hf_token |
|
|
|
|
|
try: |
|
|
r = self.session.post(url, params=params, timeout=self.timeout * 20) |
|
|
r.raise_for_status() |
|
|
return r.json() if r.headers.get("content-type", "").startswith("application/json") else {"status": "ok"} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
def download_moe_srt(self, sha1sum: str) -> dict: |
|
|
"""Descarrega el result.srt de MoE. |
|
|
|
|
|
Endpoint: GET /moe/download_moe_srt |
|
|
""" |
|
|
|
|
|
url = f"{self.base_url}/moe/download_moe_srt" |
|
|
hf_token = os.getenv("VEUREU_TOKEN") |
|
|
params: dict[str, Any] = {"sha1": sha1sum} |
|
|
if hf_token: |
|
|
params["token"] = hf_token |
|
|
|
|
|
try: |
|
|
r = self.session.get(url, params=params, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return {"text": r.text or ""} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
def download_moe_free_narration(self, sha1sum: str) -> dict: |
|
|
"""Descarrega el free_narration.txt de MoE. |
|
|
|
|
|
Endpoint: GET /moe/download_moe_free_narration |
|
|
""" |
|
|
|
|
|
url = f"{self.base_url}/preprocessing/moe/download_moe_free_narration" |
|
|
hf_token = os.getenv("VEUREU_TOKEN") |
|
|
params: dict[str, Any] = {"sha1": sha1sum} |
|
|
if hf_token: |
|
|
params["token"] = hf_token |
|
|
|
|
|
try: |
|
|
r = self.session.get(url, params=params, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return {"text": r.text or ""} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
def upload_embeddings(self, video_hash: str, embeddings_json: dict) -> dict: |
|
|
"""Puja el JSON de càsting (faces+voices) com a embeddings al backend engine. |
|
|
|
|
|
Utilitza l'endpoint /embeddings/upload_embeddings per als dos tipus ('faces' i 'voices'). |
|
|
""" |
|
|
|
|
|
url = f"{self.base_url}/embeddings/upload_embeddings" |
|
|
shared_token = os.getenv("VEUREU_TOKEN") |
|
|
|
|
|
|
|
|
try: |
|
|
payload_bytes = json.dumps(embeddings_json, ensure_ascii=False).encode("utf-8") |
|
|
except Exception as e: |
|
|
return {"error": f"Error serialitzant embeddings_json: {e}"} |
|
|
|
|
|
results: dict[str, Any] = {} |
|
|
|
|
|
for embedding_type in ("faces", "voices"): |
|
|
params = { |
|
|
"embedding_type": embedding_type, |
|
|
"video_hash": video_hash, |
|
|
} |
|
|
if shared_token: |
|
|
params["token"] = shared_token |
|
|
|
|
|
files = { |
|
|
"file": ("embeddings.json", payload_bytes, "application/json"), |
|
|
} |
|
|
|
|
|
try: |
|
|
r = self.session.post(url, params=params, files=files, timeout=self.timeout * 2) |
|
|
r.raise_for_status() |
|
|
results[embedding_type] = r.json() if r.headers.get("content-type", "").startswith("application/json") else {"status": "ok"} |
|
|
except requests.exceptions.RequestException as e: |
|
|
results[embedding_type] = {"error": str(e)} |
|
|
|
|
|
return results |
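
    # Usage sketch (the structure of embeddings_json is defined by the casting UI
    # and is treated here as an opaque dict):
    #
    #     res = client.upload_embeddings(video_hash=sha1, embeddings_json=casting_json)
    #     # res maps each embedding type to its backend response,
    #     # e.g. {"faces": {...}, "voices": {...}}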
|
|
|
|
|
|
|
|
def import_media(self, sha1sum: str) -> dict: |
|
|
url = f"{self.base_url}/preprocessing/import_media/{sha1sum}" |
|
|
try: |
|
|
r = self.session.get(url, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return {"zip_bytes": r.content} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
def import_media_version(self, sha1sum: str, version: str) -> dict: |
|
|
url = f"{self.base_url}/import_media_version/{sha1sum}/{version}" |
|
|
try: |
|
|
r = self.session.get(url, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return {"zip_bytes": r.content} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def upload_pending_video(self, video_bytes: bytes, filename: str) -> dict: |
|
|
"""Sube un vídeo pendiente al engine (carpeta /data/peding_videos). |
|
|
|
|
|
Usa el endpoint POST /peding_videos/upload_pending_video. |
|
|
""" |
|
|
|
|
|
url = f"{self.base_url}/pending_videos/upload_pending_video" |
|
|
files = {"video": (filename, io.BytesIO(video_bytes), "video/mp4")} |
|
|
|
|
|
hf_token = os.getenv("VEUREU_TOKEN") |
|
|
params = {"token": hf_token} if hf_token else {} |
|
|
try: |
|
|
r = self.session.post(url, params=params, files=files, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return r.json() if r.headers.get("content-type", "").startswith("application/json") else {"status": "ok"} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
def list_pending_videos(self) -> dict: |
|
|
"""Llista els vídeos pendents al backend (endpoint GET /peding_videos/list_peding_videos).""" |
|
|
|
|
|
url = f"{self.base_url}/pending_videos/list_pending_videos" |
|
|
hf_token = os.getenv("VEUREU_TOKEN") |
|
|
params = {"token": hf_token} if hf_token else {} |
|
|
try: |
|
|
r = self.session.get(url, params=params, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return r.json() |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
def download_pending_video(self, sha1sum: str) -> dict: |
|
|
"""Descarrega un vídeo pendent per sha1 (GET /peding_videos/download_peding_video).""" |
|
|
|
|
|
url = f"{self.base_url}/pending_videos/download_pending_video" |
|
|
hf_token = os.getenv("VEUREU_TOKEN") |
|
|
params = {"sha1": sha1sum} |
|
|
if hf_token: |
|
|
params["token"] = hf_token |
|
|
try: |
|
|
r = self.session.get(url, params=params, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return {"video_bytes": r.content} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
def update_databases(self, payload: dict) -> dict: |
|
|
"""Envia les sentències SQL generades a l'endpoint /update_databases.""" |
|
|
|
|
|
url = f"{self.base_url}/update_databases" |
|
|
try: |
|
|
r = self.session.post(url, json=payload, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return r.json() |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
def export_media(self, zip_bytes: bytes) -> dict: |
|
|
"""Envia un ZIP amb els nous vídeos a l'endpoint /export_media.""" |
|
|
|
|
|
url = f"{self.base_url}/export_media" |
|
|
files = {"media_zip": ("media_export.zip", zip_bytes, "application/zip")} |
|
|
try: |
|
|
r = self.session.post(url, files=files, timeout=self.timeout * 10) |
|
|
r.raise_for_status() |
|
|
return r.json() |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
def generate_audiodescription(self, video_bytes: bytes, video_name: str) -> dict: |
|
|
"""Llama al endpoint del engine /generate_audiodescription con un MP4 en memoria.""" |
|
|
url = f"{self.base_url}/preprocessing/generate_audiodescription" |
|
|
try: |
|
|
files = { |
|
|
"video": (video_name or "video.mp4", video_bytes, "video/mp4") |
|
|
} |
|
|
r = self.session.post(url, files=files, timeout=self.timeout * 10) |
|
|
r.raise_for_status() |
|
|
return r.json() |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
def generate_salamandra_ad_from_sha1(self, sha1sum: str) -> dict: |
|
|
"""Genera l'SRT d'audiodescripció (Salamandra) a partir del SHA1 del vídeo. |
|
|
|
|
|
Crida al endpoint /transcription/generate_srt del engine, que retorna |
|
|
directament el contingut de l'SRT com a text pla. Aquest mètode embolica |
|
|
la resposta en un dict compatible amb la UI existent: |
|
|
|
|
|
{"status": "done", "results": {"une_srt": "...", "free_text": ""}} |
|
|
""" |
|
|
|
|
|
url = f"{self.base_url}/transcription/generate_srt" |
|
|
hf_token = os.getenv("VEUREU_TOKEN") |
|
|
params: dict[str, Any] = {"sha1": sha1sum} |
|
|
if hf_token: |
|
|
params["token"] = hf_token |
|
|
|
|
|
try: |
|
|
r = self.session.post(url, params=params, timeout=self.timeout * 10) |
|
|
r.raise_for_status() |
|
|
srt_text = r.text or "" |
|
|
return { |
|
|
"status": "done", |
|
|
"results": { |
|
|
"une_srt": srt_text, |
|
|
"free_text": "", |
|
|
}, |
|
|
} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
def finalize_casting(self, payload: dict) -> dict: |
|
|
"""Envía el càsting definitiu al engine para consolidar identidades e indexar.""" |
|
|
url = f"{self.base_url}/preprocessing/finalize_casting" |
|
|
n_chars = len(payload.get("characters", [])) |
|
|
n_voices = len(payload.get("voice_clusters", [])) |
|
|
_log(f"POST {url} (characters={n_chars}, voice_clusters={n_voices})") |
|
|
try: |
|
|
r = self.session.post(url, json=payload, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
result = r.json() |
|
|
_log(f" -> ok, face_identities={len(result.get('face_identities', []))}, voice_identities={len(result.get('voice_identities', []))}") |
|
|
return result |
|
|
except requests.exceptions.HTTPError as e: |
|
|
resp = e.response |
|
|
_log(f" -> HTTP ERROR {resp.status_code if resp else '?'}: {e}", "ERROR") |
|
|
try: |
|
|
|
|
|
return {"error": str(e), "status_code": resp.status_code if resp is not None else None, "body": resp.json() if resp is not None else None} |
|
|
except Exception: |
|
|
|
|
|
return {"error": str(e), "status_code": resp.status_code if resp is not None else None, "body": (resp.text if resp is not None else None)} |
|
|
except requests.exceptions.RequestException as e: |
|
|
_log(f" -> REQUEST ERROR: {e}", "ERROR") |
|
|
return {"error": str(e)} |
|
|
|
|
|
def load_casting(self, faces_dir: str, voices_dir: str, db_dir: str, drop_collections: bool = False) -> dict: |
|
|
"""Carga índices de caras y voces al motor de búsqueda Chroma del engine.""" |
|
|
url = f"{self.base_url}/preprocessing/load_casting" |
|
|
_log(f"POST {url} (faces_dir={faces_dir})") |
|
|
data = { |
|
|
"faces_dir": faces_dir, |
|
|
"voices_dir": voices_dir, |
|
|
"db_dir": db_dir, |
|
|
"drop_collections": str(1 if drop_collections else 0), |
|
|
} |
|
|
try: |
|
|
r = self.session.post(url, data=data, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
result = r.json() |
|
|
_log(f" -> ok={result.get('ok')}, faces={result.get('faces', 0)}, voices={result.get('voices', 0)}") |
|
|
return result |
|
|
except requests.exceptions.RequestException as e: |
|
|
_log(f" -> ERROR: {e}", "ERROR") |
|
|
return {"error": str(e)} |
|
|
|
|
|
def rebuild_video_with_ad(self, video_path: str, srt_path: str, voice: str = "central/grau") -> dict: |
|
|
""" |
|
|
Llama al space 'tts' para reconstruir un vídeo con audiodescripció a partir de un SRT. |
|
|
Usa el endpoint /tts/srt que devuelve un ZIP con el vídeo final. |
|
|
|
|
|
Args: |
|
|
video_path: Ruta al archivo de vídeo original |
|
|
srt_path: Ruta al archivo SRT con las audiodescripciones |
|
|
voice: Voz de Matxa (por defecto 'central/grau') |
|
|
|
|
|
Returns: |
|
|
dict: {'video_bytes': bytes} o {'error': str} |
|
|
""" |
|
|
if not self.tts_url: |
|
|
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)") |
|
|
|
|
|
url = f"{self.tts_url.rstrip('/')}/tts/srt" |
|
|
|
|
|
try: |
|
|
with open(video_path, 'rb') as video_file: |
|
|
with open(srt_path, 'rb') as srt_file: |
|
|
files = { |
|
|
'video': (os.path.basename(video_path), video_file, 'video/mp4'), |
|
|
'srt': (os.path.basename(srt_path), srt_file, 'application/x-subrip') |
|
|
} |
|
|
data = { |
|
|
"voice": voice, |
|
|
"ad_format": "mp3", |
|
|
"include_final_mp4": "1" |
|
|
} |
|
|
|
|
|
r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
|
|
|
|
|
|
with zipfile.ZipFile(io.BytesIO(r.content)) as z: |
|
|
|
|
|
for filename in z.namelist(): |
|
|
if filename.endswith('.mp4'): |
|
|
video_bytes = z.read(filename) |
|
|
return {"video_bytes": video_bytes} |
|
|
|
|
|
|
|
|
return {"error": "No se encontró el archivo de vídeo MP4 en la respuesta del servidor."} |
|
|
|
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
except zipfile.BadZipFile: |
|
|
return {"error": "La respuesta del servidor no fue un archivo ZIP válido."} |
|
|
except Exception as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
def apply_refinement( |
|
|
self, |
|
|
*, |
|
|
sha1sum: str | None = None, |
|
|
version: str | None = None, |
|
|
srt_content: str | None = None, |
|
|
reflection_enabled: bool = True, |
|
|
reflexion_enabled: bool = False, |
|
|
introspection_enabled: bool = False, |
|
|
) -> dict: |
|
|
"""Aplica el pipeline de refinement multi-agent sobre un SRT. |
|
|
|
|
|
Endpoint: POST /refinement/apply_refinement |
|
|
|
|
|
Pot treballar de dues maneres: |
|
|
- Passant sha1sum+version perquè el backend llegeixi l'SRT de les BDs |
|
|
- Passant srt_content explícitament |
|
|
""" |
|
|
|
|
|
url = f"{self.base_url}/refinement/apply_refinement" |
|
|
hf_token = os.getenv("VEUREU_TOKEN") |
|
|
|
|
|
payload: dict[str, Any] = { |
|
|
"reflection_enabled": bool(reflection_enabled), |
|
|
"reflexion_enabled": bool(reflexion_enabled), |
|
|
"introspection_enabled": bool(introspection_enabled), |
|
|
} |
|
|
|
|
|
if sha1sum is not None: |
|
|
payload["sha1sum"] = sha1sum |
|
|
if version is not None: |
|
|
payload["version"] = version |
|
|
if srt_content is not None: |
|
|
payload["srt_content"] = srt_content |
|
|
if hf_token: |
|
|
payload["token"] = hf_token |
|
|
|
|
|
try: |
|
|
r = self.session.post(url, json=payload, timeout=self.timeout * 10) |
|
|
r.raise_for_status() |
|
|
return r.json() if r.headers.get("content-type", "").startswith("application/json") else {"status": "ok"} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
def refine_narration(self, dialogues_srt: str, frame_descriptions_json: str = "[]", config_path: str = "config.yaml") -> dict: |
|
|
"""Llama al endpoint del engine /refine_narration para generar narrativa y/o SRT.""" |
|
|
url = f"{self.base_url}/refine_narration" |
|
|
data = { |
|
|
"dialogues_srt": dialogues_srt, |
|
|
"frame_descriptions_json": frame_descriptions_json, |
|
|
"config_path": config_path, |
|
|
} |
|
|
try: |
|
|
r = self.session.post(url, data=data, timeout=self.timeout) |
|
|
r.raise_for_status() |
|
|
return r.json() |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
    def create_initial_casting(self, video_path: str | None = None, video_bytes: bytes | None = None,
                               video_name: str | None = None,
                               face_max_groups: int = 3, face_min_cluster_size: int = 3,
                               voice_max_groups: int = 3, voice_min_cluster_size: int = 3,
                               max_frames: int = 100) -> dict:
        """
        Calls the 'engine' space endpoint to create the 'initial casting'.

        Sends the freshly imported video as a file along with the clustering parameters.

        Args:
            video_path: Path to video file (if reading from disk)
            video_bytes: Video file bytes (if already in memory)
            video_name: Name for the video file
            face_max_groups: Target number of face clusters (hierarchical clustering)
            face_min_cluster_size: Minimum size of each face cluster
            voice_max_groups: Target number of voice clusters (hierarchical clustering)
            voice_min_cluster_size: Minimum size of each voice cluster
            max_frames: Maximum number of frames to process
        """
|
|
url = f"{self.base_url}/preprocessing/create_initial_casting" |
|
|
_log(f"POST {url} (video={video_name or video_path}, max_frames={max_frames})") |
|
|
try: |
|
|
|
|
|
if video_bytes: |
|
|
filename = video_name or "video.mp4" |
|
|
files = { |
|
|
"video": (filename, video_bytes, "video/mp4"), |
|
|
} |
|
|
elif video_path: |
|
|
with open(video_path, "rb") as f: |
|
|
files = { |
|
|
"video": (os.path.basename(video_path), f.read(), "video/mp4"), |
|
|
} |
|
|
else: |
|
|
return {"error": "Either video_path or video_bytes must be provided"} |
|
|
|
|
|
data = { |
|
|
"max_groups": str(face_max_groups), |
|
|
"min_cluster_size": str(face_min_cluster_size), |
|
|
"voice_max_groups": str(voice_max_groups), |
|
|
"voice_min_cluster_size": str(voice_min_cluster_size), |
|
|
"max_frames": str(max_frames), |
|
|
} |
|
|
r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
result = r.json() if r.headers.get("content-type", "").startswith("application/json") else {"ok": True} |
|
|
_log(f" -> job_id={result.get('job_id', '?')}") |
|
|
return result |
|
|
except requests.exceptions.RequestException as e: |
|
|
_log(f" -> ERROR: {e}", "ERROR") |
|
|
return {"error": str(e)} |
|
|
except Exception as e: |
|
|
_log(f" -> UNEXPECTED ERROR: {e}", "ERROR") |
|
|
return {"error": f"Unexpected error: {str(e)}"} |
|
|
|
|
|
    def detect_scenes(self, video_path: str | None = None, video_bytes: bytes | None = None,
                      video_name: str | None = None,
                      max_groups: int = 3, min_cluster_size: int = 3,
                      scene_sensitivity: float = 0.5, frame_interval_sec: float = 0.5,
                      max_frames: int = 100) -> dict:
|
|
""" |
|
|
        Call the engine endpoint /preprocessing/detect_scenes to compute scene clusters using hierarchical clustering on color histograms.
|
|
""" |
|
|
url = f"{self.base_url}/preprocessing/detect_scenes" |
|
|
_log(f"POST {url} (video={video_name or video_path}, max_groups={max_groups})") |
|
|
try: |
|
|
if video_bytes: |
|
|
filename = video_name or "video.mp4" |
|
|
files = { |
|
|
"video": (filename, video_bytes, "video/mp4"), |
|
|
} |
|
|
elif video_path: |
|
|
with open(video_path, "rb") as f: |
|
|
files = { |
|
|
"video": (os.path.basename(video_path), f.read(), "video/mp4"), |
|
|
} |
|
|
else: |
|
|
return {"error": "Either video_path or video_bytes must be provided"} |
|
|
|
|
|
data = { |
|
|
"max_groups": str(max_groups), |
|
|
"min_cluster_size": str(min_cluster_size), |
|
|
"scene_sensitivity": str(scene_sensitivity), |
|
|
"frame_interval_sec": str(frame_interval_sec), |
|
|
"max_frames": str(max_frames), |
|
|
} |
|
|
r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
result = r.json() |
|
|
_log(f" -> scene_clusters={len(result.get('scene_clusters', []))}") |
|
|
return result |
|
|
except requests.exceptions.RequestException as e: |
|
|
_log(f" -> ERROR: {e}", "ERROR") |
|
|
return {"error": str(e)} |
|
|
|
|
|
def generate_audio_from_text_file(self, text_content: str, voice: str = "central/grau") -> dict: |
|
|
""" |
|
|
Genera un único MP3 a partir de un texto largo, usando el endpoint de SRT. |
|
|
1. Convierte el texto en un SRT falso. |
|
|
2. Llama a /tts/srt con el SRT. |
|
|
3. Extrae el 'ad_master.mp3' del ZIP resultante. |
|
|
""" |
|
|
if not self.tts_url: |
|
|
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)") |
|
|
|
|
|
|
|
|
srt_content = "" |
|
|
start_time = 0 |
|
|
for i, line in enumerate(text_content.strip().split('\n')): |
|
|
line = line.strip() |
|
|
if not line: |
|
|
continue |
|
|
|
|
|
end_time = start_time + 5 |
|
|
|
|
|
def format_time(seconds): |
|
|
h = int(seconds / 3600) |
|
|
m = int((seconds % 3600) / 60) |
|
|
s = int(seconds % 60) |
|
|
ms = int((seconds - int(seconds)) * 1000) |
|
|
return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}" |
|
|
|
|
|
srt_content += f"{i+1}\n" |
|
|
srt_content += f"{format_time(start_time)} --> {format_time(end_time)}\n" |
|
|
srt_content += f"{line}\n\n" |
|
|
start_time = end_time |
|
|
|
|
|
if not srt_content: |
|
|
return {"error": "El texto proporcionado estaba vacío o no se pudo procesar."} |
|
|
|
|
|
|
|
|
url = f"{self.tts_url.rstrip('/')}/tts/srt" |
|
|
try: |
|
|
files = { |
|
|
'srt': ('fake_ad.srt', srt_content, 'application/x-subrip') |
|
|
} |
|
|
data = {"voice": voice, "ad_format": "mp3"} |
|
|
|
|
|
r = requests.post(url, files=files, data=data, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
|
|
|
|
|
|
with zipfile.ZipFile(io.BytesIO(r.content)) as z: |
|
|
for filename in z.namelist(): |
|
|
if filename == 'ad_master.mp3': |
|
|
mp3_bytes = z.read(filename) |
|
|
return {"mp3_bytes": mp3_bytes} |
|
|
|
|
|
return {"error": "No se encontró 'ad_master.mp3' en la respuesta del servidor."} |
|
|
|
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": f"Error llamando a la API de SRT: {e}"} |
|
|
except zipfile.BadZipFile: |
|
|
return {"error": "La respuesta del servidor no fue un archivo ZIP válido."} |
|
|
|
|
|
|
|
|
def tts_long_text(self, text: str, voice: str = "central/grau") -> dict: |
|
|
""" |
|
|
Llama al endpoint '/tts/text_long' para sintetizar un texto largo. |
|
|
La API se encarga de todo el procesamiento. |
|
|
""" |
|
|
if not self.tts_url: |
|
|
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)") |
|
|
|
|
|
url = f"{self.tts_url.rstrip('/')}/tts/text_long" |
|
|
data = { |
|
|
"texto": text, |
|
|
"voice": voice, |
|
|
"formato": "mp3" |
|
|
} |
|
|
|
|
|
try: |
|
|
|
|
|
r = requests.post(url, data=data, timeout=self.timeout * 10) |
|
|
r.raise_for_status() |
|
|
return {"mp3_bytes": r.content} |
|
|
|
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_catalan_names(): |
|
|
"""Retorna llistes de noms catalans.""" |
|
|
noms_home = ["Jordi", "Marc", "Pau", "Pere", "Joan", "Josep", "David", "Àlex", "Guillem", "Albert", |
|
|
"Arnau", "Martí", "Bernat", "Oriol", "Roger", "Pol", "Lluís", "Sergi", "Carles", "Xavier"] |
|
|
noms_dona = ["Maria", "Anna", "Laura", "Marta", "Cristina", "Núria", "Montserrat", "Júlia", "Sara", "Carla", |
|
|
"Alba", "Elisabet", "Rosa", "Gemma", "Sílvia", "Teresa", "Irene", "Laia", "Marina", "Bet"] |
|
|
return noms_home, noms_dona |
|
|
|
|
|
def describe_image_with_svision(image_path: str, is_face: bool = True) -> Tuple[str, str]: |
|
|
""" |
|
|
Llama al space svision para describir una imagen (cara o escena). |
|
|
|
|
|
Args: |
|
|
image_path: Ruta absoluta a la imagen |
|
|
is_face: True si es una cara, False si es una escena |
|
|
|
|
|
Returns: |
|
|
tuple (descripción_completa, nombre_abreviado) |
|
|
""" |
|
|
try: |
|
|
from gradio_client import Client, handle_file |
|
|
|
|
|
|
|
|
svision_url = os.getenv("SVISION_URL", "https://veureu-svision.hf.space") |
|
|
client = Client(svision_url) |
|
|
|
|
|
|
|
|
if is_face: |
|
|
prompt = "Descriu aquesta persona. Inclou: edat aproximada (jove/adult), gènere, característiques físiques notables (ulleres, barba, bigoti, etc.), expressió i vestimenta." |
|
|
else: |
|
|
prompt = "Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals." |
|
|
|
|
|
import time |
|
|
start_time = time.time() |
|
|
max_tokens = 256 if is_face else 128 |
|
|
max_attempts = int(os.getenv("SVISION_MAX_ATTEMPTS", "5")) |
|
|
wait_seconds = int(os.getenv("SVISION_RETRY_WAIT", "5")) |
|
|
result = None |
|
|
last_error: Exception | None = None |
|
|
|
|
|
for attempt in range(1, max_attempts + 1): |
|
|
try: |
|
|
result = client.predict( |
|
|
handle_file(image_path), |
|
|
prompt, |
|
|
max_tokens, |
|
|
0.7, |
|
|
api_name="/describe" |
|
|
) |
|
|
if result and isinstance(result, str) and result.strip(): |
|
|
break |
|
|
raise RuntimeError("Resposta buida de svision") |
|
|
except Exception as exc: |
|
|
last_error = exc |
|
|
if attempt == max_attempts: |
|
|
raise |
|
|
time.sleep(wait_seconds) |
|
|
wait_seconds = min(wait_seconds * 2, 40) |
|
|
|
|
|
full_description = result.strip() if result else "" |
|
|
|
|
|
|
|
|
prompt_markers = [ |
|
|
"Descriu aquesta persona. Inclou: edat aproximada (jove/adult), gènere, característiques físiques notables (ulleres, barba, bigoti, etc.), expressió i vestimenta.", |
|
|
"Descriu aquesta escena. Inclou: tipus de localització (interior/exterior), elements principals, ambient, il·luminació.", |
|
|
"Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals.", |
|
|
"Descriu aquesta persona.", |
|
|
"Descriu aquesta escena.", |
|
|
] |
|
|
|
|
|
for marker in prompt_markers: |
|
|
if marker in full_description: |
|
|
|
|
|
parts = full_description.split(marker, 1) |
|
|
if len(parts) > 1: |
|
|
full_description = parts[1].strip() |
|
|
|
|
|
|
|
|
|
|
|
prefixes_to_remove = [ |
|
|
"user:", "user ", "user\n", "user\t", |
|
|
"assistant:", "assistant ", "assistant\n", "assistant\t", |
|
|
"User:", "User ", "User\n", |
|
|
"Assistant:", "Assistant ", "Assistant\n", |
|
|
"system:", "system ", |
|
|
] |
|
|
|
|
|
|
|
|
for _ in range(5): |
|
|
original = full_description |
|
|
for prefix in prefixes_to_remove: |
|
|
if full_description.lower().startswith(prefix.lower()): |
|
|
full_description = full_description[len(prefix):].strip() |
|
|
break |
|
|
if original == full_description: |
|
|
break |
|
|
|
|
|
|
|
|
full_description = full_description.lstrip() |
|
|
|
|
|
|
|
|
while full_description and full_description[0] in ['\n', '\t', '\r', ' ']: |
|
|
full_description = full_description[1:] |
|
|
|
|
|
if not full_description: |
|
|
return ("", "") |
|
|
|
|
|
|
|
|
if is_face: |
|
|
|
|
|
desc_lower = full_description.lower() |
|
|
|
|
|
|
|
|
is_female = any(word in desc_lower for word in ["dona", "noia", "nena", "femení", "femenina"]) |
|
|
|
|
|
|
|
|
import hashlib |
|
|
hash_val = int(hashlib.md5(image_path.encode()).hexdigest(), 16) |
|
|
|
|
|
noms_home, noms_dona = get_catalan_names() |
|
|
if is_female: |
|
|
name_list = noms_dona |
|
|
else: |
|
|
name_list = noms_home |
|
|
|
|
|
|
|
|
short_name = name_list[hash_val % len(name_list)] |
|
|
else: |
|
|
|
|
|
words = full_description.split()[:4] |
|
|
short_name = " ".join(words).capitalize() |
|
|
|
|
|
return (full_description, short_name) |
|
|
|
|
|
except Exception as e: |
|
|
return ("", "") |
|
|
|
|
|
|
|
|
def generate_short_scene_name(description: str) -> str: |
|
|
""" |
|
|
Genera un nombre corto de escena (< 3 palabras) basándose en la descripción |
|
|
usando el modelo schat (Salamandra-Instruct). |
|
|
|
|
|
Args: |
|
|
description: Descripción completa de la escena de svision |
|
|
|
|
|
Returns: |
|
|
Nombre corto de la escena (< 3 palabras) o string vacío si falla |
|
|
""" |
|
|
try: |
|
|
|
|
|
        from gradio_client import Client
|
|
|
|
|
|
|
|
schat_url = os.getenv("SCHAT_URL", "https://veureu-schat.hf.space") |
|
|
client = Client(schat_url) |
|
|
|
|
|
|
|
|
prompt = f"Basant-te en aquesta descripció d'una escena, genera un nom curt de menys de 3 paraules que la resumeixi:\n\n{description}\n\nNom de l'escena:" |
|
|
|
|
|
|
|
|
|
|
|
result = client.predict( |
|
|
prompt, |
|
|
[], |
|
|
256, |
|
|
0.7, |
|
|
0.9, |
|
|
50, |
|
|
1.0, |
|
|
api_name="/predict" |
|
|
) |
|
|
|
|
|
|
|
|
if isinstance(result, tuple) and len(result) >= 1: |
|
|
short_name = result[0].strip() if result[0] else "" |
|
|
elif isinstance(result, str): |
|
|
short_name = result.strip() |
|
|
else: |
|
|
short_name = "" |
|
|
|
|
|
|
|
|
short_name = short_name.strip('"\'.,!?').strip() |
|
|
|
|
|
|
|
|
prefixes_to_remove = [ |
|
|
"Nom de l'escena:", |
|
|
"nom de l'escena:", |
|
|
"Escena:", |
|
|
"escena:", |
|
|
] |
|
|
|
|
|
for prefix in prefixes_to_remove: |
|
|
if short_name.lower().startswith(prefix.lower()): |
|
|
short_name = short_name[len(prefix):].strip() |
|
|
|
|
|
|
|
|
words = short_name.split() |
|
|
if len(words) > 3: |
|
|
short_name = " ".join(words[:3]) |
|
|
|
|
|
return short_name |
|
|
|
|
|
except Exception as e: |
|
|
return "" |
|
|
|