|
|
|
|
|
import os |
|
|
import requests |
|
|
import base64 |
|
|
import zipfile |
|
|
import io |
|
|
from typing import Iterable, Dict, Any, Tuple |
|
|
from PIL import Image |
|
|
|
|
|
class APIClient: |
|
|
""" |
|
|
Cliente para 'engine': |
|
|
POST /jobs -> {"job_id": "..."} |
|
|
GET /jobs/{job_id}/status -> {"status": "queued|processing|done|failed", ...} |
|
|
GET /jobs/{job_id}/result -> JobResult {"book": {...}, "une": {...}, ...} |
|
|
""" |
|
|
def __init__(self, base_url: str, use_mock: bool = False, data_dir: str | None = None, token: str | None = None, timeout: int = 180, tts_url: str | None = None): |
|
|
self.base_url = base_url.rstrip("/") |
|
|
|
|
|
self.tts_url = tts_url or os.getenv("API_TTS_URL", "https://veureu-tts.hf.space") |
|
|
print(f"🔧 APIClient.__init__ - tts_url recibida: {tts_url}") |
|
|
print(f"🔧 APIClient.__init__ - os.getenv(API_TTS_URL): {os.getenv('API_TTS_URL')}") |
|
|
print(f"🔧 APIClient.__init__ - self.tts_url final: {self.tts_url}") |
|
|
print(f"🔧 APIClient.__init__ - tipo self.tts_url: {type(self.tts_url)}") |
|
|
print(f"🔧 APIClient.__init__ - repr self.tts_url: {repr(self.tts_url)}") |
|
|
self.use_mock = use_mock |
|
|
self.data_dir = data_dir |
|
|
self.timeout = timeout |
|
|
self.session = requests.Session() |
|
|
|
|
|
token = token or os.getenv("API_SHARED_TOKEN") |
|
|
if token: |
|
|
self.session.headers.update({"Authorization": f"Bearer {token}"}) |
|
|
|
|
|
|
|
|
def _post_jobs(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]: |
|
|
url = f"{self.base_url}/jobs" |
|
|
files = {"file": (os.path.basename(video_path), open(video_path, "rb"), "application/octet-stream")} |
|
|
data = {"modes": ",".join(modes)} |
|
|
r = self.session.post(url, files=files, data=data, timeout=self.timeout) |
|
|
r.raise_for_status() |
|
|
return r.json() |
|
|
|
|
|
def _get_status(self, job_id: str) -> Dict[str, Any]: |
|
|
url = f"{self.base_url}/jobs/{job_id}/status" |
|
|
r = self.session.get(url, timeout=self.timeout) |
|
|
if r.status_code == 404: |
|
|
return {"status": "not_found"} |
|
|
r.raise_for_status() |
|
|
return r.json() |
|
|
|
|
|
    def _get_result(self, job_id: str) -> Dict[str, Any]:
        # Fetch the result payload for a job.
        #
        # NOTE(review): this requests /jobs/{job_id}/status, but the class
        # docstring documents GET /jobs/{job_id}/result for results, and this
        # body is byte-for-byte identical to _get_status. Possibly a
        # copy-paste bug, possibly a deliberate workaround (the engine may
        # embed results in the status payload) — confirm before changing.
        url = f"{self.base_url}/jobs/{job_id}/status"
        r = self.session.get(url, timeout=self.timeout)
        if r.status_code == 404:
            # Unknown job: sentinel instead of raising.
            return {"status": "not_found"}
        r.raise_for_status()
        return r.json()
|
|
|
|
|
|
|
|
def process_video(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]: |
|
|
"""Devuelve {"job_id": "..."}""" |
|
|
if self.use_mock: |
|
|
return {"job_id": "mock-123"} |
|
|
return self._post_jobs(video_path, modes) |
|
|
|
|
|
def get_job(self, job_id: str) -> Dict[str, Any]: |
|
|
""" |
|
|
La UI espera algo del estilo: |
|
|
{"status":"done","results":{"book":{...},"une":{...}}} |
|
|
Adaptamos la respuesta de /result del engine a ese contrato. |
|
|
""" |
|
|
if self.use_mock: |
|
|
|
|
|
return { |
|
|
"status": "done", |
|
|
"results": { |
|
|
"book": {"text": "Text d'exemple (book)", "mp3_bytes": b""}, |
|
|
"une": {"srt": "1\n00:00:00,000 --> 00:00:01,000\nExemple UNE\n", "mp3_bytes": b""}, |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
st = self._get_status(job_id) |
|
|
if st.get("status") in {"queued", "processing"}: |
|
|
return {"status": st.get("status", "queued")} |
|
|
|
|
|
|
|
|
res = self._get_result(job_id) |
|
|
|
|
|
|
|
|
if isinstance(res, dict) and isinstance(res.get("results"), dict): |
|
|
return { |
|
|
"status": res.get("status", st.get("status", "done")), |
|
|
"results": res.get("results", {}), |
|
|
} |
|
|
|
|
|
|
|
|
results = {} |
|
|
if "book" in res: |
|
|
results["book"] = { |
|
|
"text": res["book"].get("text"), |
|
|
} |
|
|
if "une" in res: |
|
|
results["une"] = { |
|
|
"srt": res["une"].get("srt"), |
|
|
} |
|
|
for k in ("book", "une"): |
|
|
if k in res: |
|
|
if "characters" in res[k]: |
|
|
results[k]["characters"] = res[k]["characters"] |
|
|
if "metrics" in res[k]: |
|
|
results[k]["metrics"] = res[k]["metrics"] |
|
|
|
|
|
status = "done" if results else st.get("status", "unknown") |
|
|
return {"status": status, "results": results} |
|
|
|
|
|
|
|
|
def tts_matxa(self, text: str, voice: str = "central/grau") -> dict: |
|
|
""" |
|
|
Llama al space 'tts' para sintetizar audio. |
|
|
Usa /tts/text para textos cortos (<480 chars) o /tts/text_long para textos largos. |
|
|
|
|
|
Args: |
|
|
text (str): Texto a sintetizar. |
|
|
voice (str): Voz de Matxa a usar (p.ej. 'central/grau'). |
|
|
|
|
|
Returns: |
|
|
dict: {'mp3_bytes': bytes} o {'error': str} |
|
|
""" |
|
|
if not self.tts_url: |
|
|
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)") |
|
|
|
|
|
print(f"🔧 tts_matxa - self.tts_url ANTES de construir URL: {self.tts_url}") |
|
|
print(f"🔧 tts_matxa - tipo self.tts_url: {type(self.tts_url)}") |
|
|
print(f"🔧 tts_matxa - repr self.tts_url: {repr(self.tts_url)}") |
|
|
|
|
|
|
|
|
if len(text) > 480: |
|
|
url = f"{self.tts_url.rstrip('/')}/tts/text_long" |
|
|
else: |
|
|
url = f"{self.tts_url.rstrip('/')}/tts/text" |
|
|
|
|
|
print(f"🔧 tts_matxa - URL final construida: {url}") |
|
|
print(f"🔧 tts_matxa - repr URL final: {repr(url)}") |
|
|
|
|
|
data = { |
|
|
"texto": text, |
|
|
"voice": voice, |
|
|
"formato": "mp3" |
|
|
} |
|
|
|
|
|
try: |
|
|
print(f"🎯 Llamando TTS a: {url}") |
|
|
print(f"📝 Texto length: {len(text)} caracteres") |
|
|
print(f"🗣️ Voz: {voice}") |
|
|
|
|
|
r = self.session.post(url, data=data, timeout=self.timeout * 2) |
|
|
print(f"📊 Response status: {r.status_code}") |
|
|
|
|
|
r.raise_for_status() |
|
|
|
|
|
|
|
|
print(f"✅ Audio recibido: {len(r.content)} bytes") |
|
|
return {"mp3_bytes": r.content} |
|
|
|
|
|
except requests.exceptions.RequestException as e: |
|
|
print(f"❌ Error cridant a TTS: {e}") |
|
|
print(f"❌ URL: {url}") |
|
|
print(f"❌ Data: {data}") |
|
|
|
|
|
return {"error": str(e)} |
|
|
|
|
|
def generate_audiodescription(self, video_bytes: bytes, video_name: str) -> dict: |
|
|
"""Llama al endpoint del engine /generate_audiodescription con un MP4 en memoria.""" |
|
|
url = f"{self.base_url}/generate_audiodescription" |
|
|
try: |
|
|
files = { |
|
|
"video": (video_name or "video.mp4", video_bytes, "video/mp4") |
|
|
} |
|
|
r = self.session.post(url, files=files, timeout=self.timeout * 10) |
|
|
r.raise_for_status() |
|
|
return r.json() |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
def finalize_casting(self, payload: dict) -> dict: |
|
|
"""Envía el càsting definitiu al engine para consolidar identidades e indexar.""" |
|
|
url = f"{self.base_url}/finalize_casting" |
|
|
try: |
|
|
r = self.session.post(url, json=payload, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return r.json() |
|
|
except requests.exceptions.HTTPError as e: |
|
|
resp = e.response |
|
|
try: |
|
|
|
|
|
return {"error": str(e), "status_code": resp.status_code if resp is not None else None, "body": resp.json() if resp is not None else None} |
|
|
except Exception: |
|
|
|
|
|
return {"error": str(e), "status_code": resp.status_code if resp is not None else None, "body": (resp.text if resp is not None else None)} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
def load_casting(self, faces_dir: str, voices_dir: str, db_dir: str, drop_collections: bool = False) -> dict: |
|
|
"""Carga índices de caras y voces al motor de búsqueda Chroma del engine.""" |
|
|
url = f"{self.base_url}/load_casting" |
|
|
data = { |
|
|
"faces_dir": faces_dir, |
|
|
"voices_dir": voices_dir, |
|
|
"db_dir": db_dir, |
|
|
"drop_collections": str(1 if drop_collections else 0), |
|
|
} |
|
|
try: |
|
|
r = self.session.post(url, data=data, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return r.json() |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
def rebuild_video_with_ad(self, video_path: str, srt_path: str, voice: str = "central/grau") -> dict: |
|
|
""" |
|
|
Llama al space 'tts' para reconstruir un vídeo con audiodescripció a partir de un SRT. |
|
|
Usa el endpoint /tts/srt que devuelve un ZIP con el vídeo final. |
|
|
|
|
|
Args: |
|
|
video_path: Ruta al archivo de vídeo original |
|
|
srt_path: Ruta al archivo SRT con las audiodescripciones |
|
|
voice: Voz de Matxa (por defecto 'central/grau') |
|
|
|
|
|
Returns: |
|
|
dict: {'video_bytes': bytes} o {'error': str} |
|
|
""" |
|
|
if not self.tts_url: |
|
|
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)") |
|
|
|
|
|
url = f"{self.tts_url.rstrip('/')}/tts/srt" |
|
|
|
|
|
print(f"🎬 Reconstruyendo video con AD") |
|
|
print(f"🎯 URL TTS: {url}") |
|
|
print(f"📹 Video: {video_path}") |
|
|
print(f"📝 SRT: {srt_path}") |
|
|
print(f"🗣️ Voz: {voice}") |
|
|
|
|
|
try: |
|
|
with open(video_path, 'rb') as video_file: |
|
|
with open(srt_path, 'rb') as srt_file: |
|
|
files = { |
|
|
'video': (os.path.basename(video_path), video_file, 'video/mp4'), |
|
|
'srt': (os.path.basename(srt_path), srt_file, 'application/x-subrip') |
|
|
} |
|
|
data = { |
|
|
"voice": voice, |
|
|
"ad_format": "mp3", |
|
|
"include_final_mp4": "1" |
|
|
} |
|
|
|
|
|
r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
|
|
|
|
|
|
with zipfile.ZipFile(io.BytesIO(r.content)) as z: |
|
|
|
|
|
for filename in z.namelist(): |
|
|
if filename.endswith('.mp4'): |
|
|
video_bytes = z.read(filename) |
|
|
return {"video_bytes": video_bytes} |
|
|
|
|
|
|
|
|
return {"error": "No se encontró el archivo de vídeo MP4 en la respuesta del servidor."} |
|
|
|
|
|
except requests.exceptions.RequestException as e: |
|
|
print(f"Error cridant a la reconstrucció de vídeo: {e}") |
|
|
return {"error": str(e)} |
|
|
except zipfile.BadZipFile: |
|
|
return {"error": "La respuesta del servidor no fue un archivo ZIP válido."} |
|
|
except Exception as e: |
|
|
print(f"Error inesperat: {e}") |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
def refine_narration(self, dialogues_srt: str, frame_descriptions_json: str = "[]", config_path: str = "config.yaml") -> dict: |
|
|
"""Llama al endpoint del engine /refine_narration para generar narrativa y/o SRT.""" |
|
|
url = f"{self.base_url}/refine_narration" |
|
|
data = { |
|
|
"dialogues_srt": dialogues_srt, |
|
|
"frame_descriptions_json": frame_descriptions_json, |
|
|
"config_path": config_path, |
|
|
} |
|
|
try: |
|
|
r = self.session.post(url, data=data, timeout=self.timeout) |
|
|
r.raise_for_status() |
|
|
return r.json() |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
def create_initial_casting(self, video_path: str = None, video_bytes: bytes = None, video_name: str = None, |
|
|
face_max_groups: int = 3, face_min_cluster_size: int = 3, face_sensitivity: float = 0.5, |
|
|
voice_max_groups: int = 3, voice_min_cluster_size: int = 3, voice_sensitivity: float = 0.5, |
|
|
max_frames: int = 100) -> dict: |
|
|
""" |
|
|
Llama al endpoint del space 'engine' para crear el 'initial casting'. |
|
|
|
|
|
Envía el vídeo recién importado como archivo y los parámetros de clustering. |
|
|
|
|
|
Args: |
|
|
video_path: Path to video file (if reading from disk) |
|
|
video_bytes: Video file bytes (if already in memory) |
|
|
video_name: Name for the video file |
|
|
face_max_groups: Max number of face clusters (hierarchical) |
|
|
face_min_cluster_size: Minimum face cluster size |
|
|
voice_max_groups: Max number of voice clusters (hierarchical) |
|
|
voice_min_cluster_size: Minimum voice cluster size |
|
|
max_frames: Maximum number of frames to process |
|
|
""" |
|
|
url = f"{self.base_url}/create_initial_casting" |
|
|
try: |
|
|
|
|
|
if video_bytes: |
|
|
filename = video_name or "video.mp4" |
|
|
files = { |
|
|
"video": (filename, video_bytes, "video/mp4"), |
|
|
} |
|
|
elif video_path: |
|
|
with open(video_path, "rb") as f: |
|
|
files = { |
|
|
"video": (os.path.basename(video_path), f.read(), "video/mp4"), |
|
|
} |
|
|
else: |
|
|
return {"error": "Either video_path or video_bytes must be provided"} |
|
|
|
|
|
data = { |
|
|
"max_groups": str(face_max_groups), |
|
|
"min_cluster_size": str(face_min_cluster_size), |
|
|
"face_sensitivity": str(face_sensitivity), |
|
|
"voice_max_groups": str(voice_max_groups), |
|
|
"voice_min_cluster_size": str(voice_min_cluster_size), |
|
|
"voice_sensitivity": str(voice_sensitivity), |
|
|
"max_frames": str(max_frames), |
|
|
} |
|
|
r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return r.json() if r.headers.get("content-type", "").startswith("application/json") else {"ok": True} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
except Exception as e: |
|
|
return {"error": f"Unexpected error: {str(e)}"} |
|
|
|
|
|
def detect_scenes(self, video_path: str = None, video_bytes: bytes = None, video_name: str = None, |
|
|
max_groups: int = 3, min_cluster_size: int = 3, scene_sensitivity: float = 0.5, frame_interval_sec: float = 0.5) -> dict: |
|
|
""" |
|
|
Call engine /detect_scenes to compute scene clusters using hierarchical clustering on color histograms. |
|
|
""" |
|
|
url = f"{self.base_url}/detect_scenes" |
|
|
try: |
|
|
if video_bytes: |
|
|
filename = video_name or "video.mp4" |
|
|
files = { |
|
|
"video": (filename, video_bytes, "video/mp4"), |
|
|
} |
|
|
elif video_path: |
|
|
with open(video_path, "rb") as f: |
|
|
files = { |
|
|
"video": (os.path.basename(video_path), f.read(), "video/mp4"), |
|
|
} |
|
|
else: |
|
|
return {"error": "Either video_path or video_bytes must be provided"} |
|
|
|
|
|
data = { |
|
|
"max_groups": str(max_groups), |
|
|
"min_cluster_size": str(min_cluster_size), |
|
|
"scene_sensitivity": str(scene_sensitivity), |
|
|
"frame_interval_sec": str(frame_interval_sec), |
|
|
} |
|
|
r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
return r.json() |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
def generate_audio_from_text_file(self, text_content: str, voice: str = "central/grau") -> dict: |
|
|
""" |
|
|
Genera un único MP3 a partir de un texto largo, usando el endpoint de SRT. |
|
|
1. Convierte el texto en un SRT falso. |
|
|
2. Llama a /tts/srt con el SRT. |
|
|
3. Extrae el 'ad_master.mp3' del ZIP resultante. |
|
|
""" |
|
|
if not self.tts_url: |
|
|
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)") |
|
|
|
|
|
|
|
|
srt_content = "" |
|
|
start_time = 0 |
|
|
for i, line in enumerate(text_content.strip().split('\n')): |
|
|
line = line.strip() |
|
|
if not line: |
|
|
continue |
|
|
|
|
|
end_time = start_time + 5 |
|
|
|
|
|
def format_time(seconds): |
|
|
h = int(seconds / 3600) |
|
|
m = int((seconds % 3600) / 60) |
|
|
s = int(seconds % 60) |
|
|
ms = int((seconds - int(seconds)) * 1000) |
|
|
return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}" |
|
|
|
|
|
srt_content += f"{i+1}\n" |
|
|
srt_content += f"{format_time(start_time)} --> {format_time(end_time)}\n" |
|
|
srt_content += f"{line}\n\n" |
|
|
start_time = end_time |
|
|
|
|
|
if not srt_content: |
|
|
return {"error": "El texto proporcionado estaba vacío o no se pudo procesar."} |
|
|
|
|
|
|
|
|
url = f"{self.tts_url.rstrip('/')}/tts/srt" |
|
|
try: |
|
|
files = { |
|
|
'srt': ('fake_ad.srt', srt_content, 'application/x-subrip') |
|
|
} |
|
|
data = {"voice": voice, "ad_format": "mp3"} |
|
|
|
|
|
r = requests.post(url, files=files, data=data, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
|
|
|
|
|
|
with zipfile.ZipFile(io.BytesIO(r.content)) as z: |
|
|
for filename in z.namelist(): |
|
|
if filename == 'ad_master.mp3': |
|
|
mp3_bytes = z.read(filename) |
|
|
return {"mp3_bytes": mp3_bytes} |
|
|
|
|
|
return {"error": "No se encontró 'ad_master.mp3' en la respuesta del servidor."} |
|
|
|
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": f"Error llamando a la API de SRT: {e}"} |
|
|
except zipfile.BadZipFile: |
|
|
return {"error": "La respuesta del servidor no fue un archivo ZIP válido."} |
|
|
|
|
|
|
|
|
def tts_long_text(self, text: str, voice: str = "central/grau") -> dict: |
|
|
""" |
|
|
Llama al endpoint '/tts/text_long' para sintetizar un texto largo. |
|
|
La API se encarga de todo el procesamiento. |
|
|
""" |
|
|
if not self.tts_url: |
|
|
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)") |
|
|
|
|
|
url = f"{self.tts_url.rstrip('/')}/tts/text_long" |
|
|
data = { |
|
|
"texto": text, |
|
|
"voice": voice, |
|
|
"formato": "mp3" |
|
|
} |
|
|
|
|
|
try: |
|
|
|
|
|
r = requests.post(url, data=data, timeout=self.timeout * 10) |
|
|
r.raise_for_status() |
|
|
return {"mp3_bytes": r.content} |
|
|
|
|
|
except requests.exceptions.RequestException as e: |
|
|
print(f"Error cridant a TTS per a text llarg: {e}") |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_catalan_names():
    """Return two lists of Catalan given names as (male, female).

    Order matters: callers pick names by hash modulo index, so the lists
    must stay stable.
    """
    male = [
        "Jordi", "Marc", "Pau", "Pere", "Joan", "Josep", "David", "Àlex",
        "Guillem", "Albert", "Arnau", "Martí", "Bernat", "Oriol", "Roger",
        "Pol", "Lluís", "Sergi", "Carles", "Xavier",
    ]
    female = [
        "Maria", "Anna", "Laura", "Marta", "Cristina", "Núria", "Montserrat",
        "Júlia", "Sara", "Carla", "Alba", "Elisabet", "Rosa", "Gemma",
        "Sílvia", "Teresa", "Irene", "Laia", "Marina", "Bet",
    ]
    return male, female
|
|
|
|
|
def describe_image_with_svision(image_path: str, is_face: bool = True) -> Tuple[str, str]:
    """
    Ask the svision space to describe an image (a face crop or a scene).

    Args:
        image_path: Absolute path to the image.
        is_face: True for a face, False for a scene.

    Returns:
        Tuple of (full description, short name); ("", "") on any failure.
    """
    try:
        from gradio_client import Client, handle_file

        svision_url = os.getenv("SVISION_URL", "https://veureu-svision.hf.space")
        print(f"[svision] Connectant a {svision_url}...")

        # An authenticated client gets a larger ZeroGPU quota.
        hf_token = os.getenv("HF_TOKEN")
        if hf_token:
            client = Client(svision_url, hf_token=hf_token)
            print(f"[svision] Autenticat amb token HF")
        else:
            client = Client(svision_url)
            print(f"[svision] Sense autenticació (cuota limitada)")

        if is_face:
            prompt = "Descriu aquesta persona. Inclou: edat aproximada (jove/adult), gènere, característiques físiques notables (ulleres, barba, bigoti, etc.), expressió i vestimenta."
        else:
            prompt = "Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals."

        print(f"[svision] Enviant petició (pot trigar si ZeroGPU està en cold start)...")
        print(f"[svision] Image path: {image_path}")

        import time
        start_time = time.time()
        max_tokens = 256 if is_face else 128
        max_attempts = int(os.getenv("SVISION_MAX_ATTEMPTS", "5"))
        wait_seconds = int(os.getenv("SVISION_RETRY_WAIT", "5"))
        result = None

        # Retry with exponential backoff (capped at 40s): ZeroGPU cold starts
        # and transient failures are common. An empty answer counts as a
        # failed attempt. (Dead `last_error` variable removed.)
        for attempt in range(1, max_attempts + 1):
            try:
                print(f"[svision] Attempt {attempt}/{max_attempts} (wait={wait_seconds}s)")
                result = client.predict(
                    handle_file(image_path),
                    prompt,
                    max_tokens,
                    0.7,
                    api_name="/describe"
                )
                if result and isinstance(result, str) and result.strip():
                    break
                raise RuntimeError("Resposta buida de svision")
            except Exception as exc:
                print(f"[svision] Error attempt {attempt}/{max_attempts}: {exc}")
                if attempt == max_attempts:
                    raise
                time.sleep(wait_seconds)
                wait_seconds = min(wait_seconds * 2, 40)

        elapsed = time.time() - start_time
        print(f"[svision] Resposta rebuda en {elapsed:.1f}s")

        full_description = result.strip() if result else ""

        # The model sometimes echoes the prompt: drop everything up to and
        # including any known prompt marker.
        prompt_markers = [
            "Descriu aquesta persona. Inclou: edat aproximada (jove/adult), gènere, característiques físiques notables (ulleres, barba, bigoti, etc.), expressió i vestimenta.",
            "Descriu aquesta escena. Inclou: tipus de localització (interior/exterior), elements principals, ambient, il·luminació.",
            "Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals.",
            "Descriu aquesta persona.",
            "Descriu aquesta escena.",
        ]
        for marker in prompt_markers:
            if marker in full_description:
                parts = full_description.split(marker, 1)
                if len(parts) > 1:
                    full_description = parts[1].strip()

        # Strip chat-role prefixes the model may leak; loop a few times in
        # case several are stacked.
        prefixes_to_remove = [
            "user:", "user ", "user\n", "user\t",
            "assistant:", "assistant ", "assistant\n", "assistant\t",
            "User:", "User ", "User\n",
            "Assistant:", "Assistant ", "Assistant\n",
            "system:", "system ",
        ]
        for _ in range(5):
            original = full_description
            for prefix in prefixes_to_remove:
                if full_description.lower().startswith(prefix.lower()):
                    full_description = full_description[len(prefix):].strip()
                    break
            if original == full_description:
                break

        # (The previous manual char-by-char whitespace loop was redundant:
        # lstrip() already removes spaces, tabs, CR and LF.)
        full_description = full_description.lstrip()

        if not full_description:
            return ("", "")

        if is_face:
            desc_lower = full_description.lower()
            # Heuristic gender detection from the Catalan description.
            is_female = any(word in desc_lower for word in ["dona", "noia", "nena", "femení", "femenina"])

            # Deterministic name: hash of the image path, so the same face
            # always maps to the same name.
            import hashlib
            hash_val = int(hashlib.md5(image_path.encode()).hexdigest(), 16)

            noms_home, noms_dona = get_catalan_names()
            name_list = noms_dona if is_female else noms_home
            short_name = name_list[hash_val % len(name_list)]
        else:
            # Scene: first few words of the description as a provisional name.
            words = full_description.split()[:4]
            short_name = " ".join(words).capitalize()

        print(f"[svision] Descripció generada: {full_description[:100]}...")
        print(f"[svision] Nom: {short_name}")

        return (full_description, short_name)

    except Exception as e:
        print(f"[svision] Error al descriure imatge: {e}")
        import traceback
        traceback.print_exc()
        return ("", "")
|
|
|
|
|
|
|
|
def generate_short_scene_name(description: str) -> str:
    """
    Produce a short scene name (< 3 words) from a svision description using
    the schat space (Salamandra-Instruct).

    Args:
        description: Full scene description returned by svision.

    Returns:
        A short scene name (at most 3 words), or "" on any failure.
    """
    try:
        from gradio_client import Client, handle_file

        schat_url = os.getenv("SCHAT_URL", "https://veureu-schat.hf.space")
        print(f"[schat] Connectant a {schat_url}...")

        hf_token = os.getenv("HF_TOKEN")
        if hf_token:
            client = Client(schat_url, hf_token=hf_token)
            print(f"[schat] Autenticat amb token HF")
        else:
            client = Client(schat_url)
            print(f"[schat] Sense autenticació")

        prompt = f"Basant-te en aquesta descripció d'una escena, genera un nom curt de menys de 3 paraules que la resumeixi:\n\n{description}\n\nNom de l'escena:"
        print(f"[schat] Generant nom curt per descripció: {description[:100]}...")

        result = client.predict(
            prompt,
            [],
            256,
            0.7,
            0.9,
            50,
            1.0,
            api_name="/predict"
        )

        # The space may answer with a tuple or a plain string.
        if isinstance(result, tuple) and len(result) >= 1:
            short_name = result[0].strip() if result[0] else ""
        elif isinstance(result, str):
            short_name = result.strip()
        else:
            short_name = ""

        # Trim surrounding punctuation.
        short_name = short_name.strip('"\'.,!?').strip()

        # Drop an echoed label prefix, if any.
        prefixes_to_remove = [
            "Nom de l'escena:",
            "nom de l'escena:",
            "Escena:",
            "escena:",
        ]
        for prefix in prefixes_to_remove:
            if short_name.lower().startswith(prefix.lower()):
                short_name = short_name[len(prefix):].strip()

        # Hard cap at three words.
        pieces = short_name.split()
        if len(pieces) > 3:
            short_name = " ".join(pieces[:3])

        print(f"[schat] Nom curt generat: {short_name}")
        return short_name

    except Exception as e:
        print(f"[schat] Error al generar nom curt: {e}")
        import traceback
        traceback.print_exc()
        return ""
|
|
|