demo / api_client.py
VeuReu's picture
Upload 7 files
c5cb13a
raw
history blame
30.7 kB
# api_client.py (UI - Space "veureu")
import os
import requests
import base64
import zipfile
import io
from typing import Iterable, Dict, Any, Tuple
from PIL import Image
class APIClient:
"""
Cliente para 'engine':
POST /jobs -> {"job_id": "..."}
GET /jobs/{job_id}/status -> {"status": "queued|processing|done|failed", ...}
GET /jobs/{job_id}/result -> JobResult {"book": {...}, "une": {...}, ...}
"""
def __init__(self, base_url: str, use_mock: bool = False, data_dir: str | None = None, token: str | None = None, timeout: int = 180, tts_url: str | None = None):
self.base_url = base_url.rstrip("/")
# URL específica para el servicio TTS (por defecto usa la variable de entorno API_TTS_URL)
self.tts_url = tts_url or os.getenv("API_TTS_URL", "https://veureu-tts.hf.space")
print(f"🔧 APIClient.__init__ - tts_url recibida: {tts_url}")
print(f"🔧 APIClient.__init__ - os.getenv(API_TTS_URL): {os.getenv('API_TTS_URL')}")
print(f"🔧 APIClient.__init__ - self.tts_url final: {self.tts_url}")
print(f"🔧 APIClient.__init__ - tipo self.tts_url: {type(self.tts_url)}")
print(f"🔧 APIClient.__init__ - repr self.tts_url: {repr(self.tts_url)}")
self.use_mock = use_mock
self.data_dir = data_dir
self.timeout = timeout
self.session = requests.Session()
# Permite inyectar el token del engine via secret/var en el Space UI
token = token or os.getenv("API_SHARED_TOKEN")
if token:
self.session.headers.update({"Authorization": f"Bearer {token}"})
# ---- modo real (engine) ----
def _post_jobs(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]:
url = f"{self.base_url}/jobs"
files = {"file": (os.path.basename(video_path), open(video_path, "rb"), "application/octet-stream")}
data = {"modes": ",".join(modes)}
r = self.session.post(url, files=files, data=data, timeout=self.timeout)
r.raise_for_status()
return r.json() # {"job_id": ...}
def _get_status(self, job_id: str) -> Dict[str, Any]:
url = f"{self.base_url}/jobs/{job_id}/status"
r = self.session.get(url, timeout=self.timeout)
if r.status_code == 404:
return {"status": "not_found"}
r.raise_for_status()
return r.json()
def _get_result(self, job_id: str) -> Dict[str, Any]:
url = f"{self.base_url}/jobs/{job_id}/status"
r = self.session.get(url, timeout=self.timeout)
if r.status_code == 404:
return {"status": "not_found"}
r.raise_for_status()
return r.json() # JobResult (status + results según engine)
# ---- API que usa streamlit_app.py ----
def process_video(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]:
"""Devuelve {"job_id": "..."}"""
if self.use_mock:
return {"job_id": "mock-123"}
return self._post_jobs(video_path, modes)
def get_job(self, job_id: str) -> Dict[str, Any]:
"""
La UI espera algo del estilo:
{"status":"done","results":{"book":{...},"une":{...}}}
Adaptamos la respuesta de /result del engine a ese contrato.
"""
if self.use_mock:
# resultado inmediato de prueba
return {
"status": "done",
"results": {
"book": {"text": "Text d'exemple (book)", "mp3_bytes": b""},
"une": {"srt": "1\n00:00:00,000 --> 00:00:01,000\nExemple UNE\n", "mp3_bytes": b""},
}
}
# Opción 1: chequear estado primero
st = self._get_status(job_id)
if st.get("status") in {"queued", "processing"}:
return {"status": st.get("status", "queued")}
# Opción 2: obtener resultado final
res = self._get_result(job_id)
# NUEVO: si el engine ya devuelve {"status": ..., "results": {...}}, pásalo tal cual
if isinstance(res, dict) and isinstance(res.get("results"), dict):
return {
"status": res.get("status", st.get("status", "done")),
"results": res.get("results", {}),
}
# LEGACY: mapeo antiguo basado en claves top-level (book/une)
results = {}
if "book" in res:
results["book"] = {
"text": res["book"].get("text"),
}
if "une" in res:
results["une"] = {
"srt": res["une"].get("srt"),
}
for k in ("book", "une"):
if k in res:
if "characters" in res[k]:
results[k]["characters"] = res[k]["characters"]
if "metrics" in res[k]:
results[k]["metrics"] = res[k]["metrics"]
status = "done" if results else st.get("status", "unknown")
return {"status": status, "results": results}
def tts_matxa(self, text: str, voice: str = "central/grau") -> dict:
"""
Llama al space 'tts' para sintetizar audio.
Usa /tts/text para textos cortos (<480 chars) o /tts/text_long para textos largos.
Args:
text (str): Texto a sintetizar.
voice (str): Voz de Matxa a usar (p.ej. 'central/grau').
Returns:
dict: {'mp3_bytes': bytes} o {'error': str}
"""
if not self.tts_url:
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")
print(f"🔧 tts_matxa - self.tts_url ANTES de construir URL: {self.tts_url}")
print(f"🔧 tts_matxa - tipo self.tts_url: {type(self.tts_url)}")
print(f"🔧 tts_matxa - repr self.tts_url: {repr(self.tts_url)}")
# Usar endpoint apropiado según la longitud del texto
if len(text) > 480:
url = f"{self.tts_url.rstrip('/')}/tts/text_long"
else:
url = f"{self.tts_url.rstrip('/')}/tts/text"
print(f"🔧 tts_matxa - URL final construida: {url}")
print(f"🔧 tts_matxa - repr URL final: {repr(url)}")
data = {
"texto": text,
"voice": voice,
"formato": "mp3"
}
try:
print(f"🎯 Llamando TTS a: {url}")
print(f"📝 Texto length: {len(text)} caracteres")
print(f"🗣️ Voz: {voice}")
r = self.session.post(url, data=data, timeout=self.timeout * 2) # Más tiempo para textos largos
print(f"📊 Response status: {r.status_code}")
r.raise_for_status()
# Devolver los bytes directamente para que el cliente los pueda concatenar
print(f"✅ Audio recibido: {len(r.content)} bytes")
return {"mp3_bytes": r.content}
except requests.exceptions.RequestException as e:
print(f"❌ Error cridant a TTS: {e}")
print(f"❌ URL: {url}")
print(f"❌ Data: {data}")
# Devolvemos un diccionario con error para que la UI lo muestre
return {"error": str(e)}
def generate_audiodescription(self, video_bytes: bytes, video_name: str) -> dict:
"""Llama al endpoint del engine /generate_audiodescription con un MP4 en memoria."""
url = f"{self.base_url}/generate_audiodescription"
try:
files = {
"video": (video_name or "video.mp4", video_bytes, "video/mp4")
}
r = self.session.post(url, files=files, timeout=self.timeout * 10)
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
def finalize_casting(self, payload: dict) -> dict:
"""Envía el càsting definitiu al engine para consolidar identidades e indexar."""
url = f"{self.base_url}/finalize_casting"
try:
r = self.session.post(url, json=payload, timeout=self.timeout * 5)
r.raise_for_status()
return r.json()
except requests.exceptions.HTTPError as e:
resp = e.response
try:
# Try to include JSON error if present
return {"error": str(e), "status_code": resp.status_code if resp is not None else None, "body": resp.json() if resp is not None else None}
except Exception:
# Fallback to text body
return {"error": str(e), "status_code": resp.status_code if resp is not None else None, "body": (resp.text if resp is not None else None)}
except requests.exceptions.RequestException as e:
return {"error": str(e)}
def load_casting(self, faces_dir: str, voices_dir: str, db_dir: str, drop_collections: bool = False) -> dict:
"""Carga índices de caras y voces al motor de búsqueda Chroma del engine."""
url = f"{self.base_url}/load_casting"
data = {
"faces_dir": faces_dir,
"voices_dir": voices_dir,
"db_dir": db_dir,
"drop_collections": str(1 if drop_collections else 0),
}
try:
r = self.session.post(url, data=data, timeout=self.timeout * 5)
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
def rebuild_video_with_ad(self, video_path: str, srt_path: str, voice: str = "central/grau") -> dict:
"""
Llama al space 'tts' para reconstruir un vídeo con audiodescripció a partir de un SRT.
Usa el endpoint /tts/srt que devuelve un ZIP con el vídeo final.
Args:
video_path: Ruta al archivo de vídeo original
srt_path: Ruta al archivo SRT con las audiodescripciones
voice: Voz de Matxa (por defecto 'central/grau')
Returns:
dict: {'video_bytes': bytes} o {'error': str}
"""
if not self.tts_url:
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")
url = f"{self.tts_url.rstrip('/')}/tts/srt"
print(f"🎬 Reconstruyendo video con AD")
print(f"🎯 URL TTS: {url}")
print(f"📹 Video: {video_path}")
print(f"📝 SRT: {srt_path}")
print(f"🗣️ Voz: {voice}")
try:
with open(video_path, 'rb') as video_file:
with open(srt_path, 'rb') as srt_file:
files = {
'video': (os.path.basename(video_path), video_file, 'video/mp4'),
'srt': (os.path.basename(srt_path), srt_file, 'application/x-subrip')
}
data = {
"voice": voice,
"ad_format": "mp3",
"include_final_mp4": "1"
}
r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5)
r.raise_for_status()
# El servidor devuelve un ZIP, lo procesamos en memoria
with zipfile.ZipFile(io.BytesIO(r.content)) as z:
# Buscamos el archivo video_con_ad.mp4 dentro del ZIP
for filename in z.namelist():
if filename.endswith('.mp4'):
video_bytes = z.read(filename)
return {"video_bytes": video_bytes}
# Si no se encuentra el MP4 en el ZIP
return {"error": "No se encontró el archivo de vídeo MP4 en la respuesta del servidor."}
except requests.exceptions.RequestException as e:
print(f"Error cridant a la reconstrucció de vídeo: {e}")
return {"error": str(e)}
except zipfile.BadZipFile:
return {"error": "La respuesta del servidor no fue un archivo ZIP válido."}
except Exception as e:
print(f"Error inesperat: {e}")
return {"error": str(e)}
def refine_narration(self, dialogues_srt: str, frame_descriptions_json: str = "[]", config_path: str = "config.yaml") -> dict:
"""Llama al endpoint del engine /refine_narration para generar narrativa y/o SRT."""
url = f"{self.base_url}/refine_narration"
data = {
"dialogues_srt": dialogues_srt,
"frame_descriptions_json": frame_descriptions_json,
"config_path": config_path,
}
try:
r = self.session.post(url, data=data, timeout=self.timeout)
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
def create_initial_casting(self, video_path: str = None, video_bytes: bytes = None, video_name: str = None,
face_max_groups: int = 3, face_min_cluster_size: int = 3, face_sensitivity: float = 0.5,
voice_max_groups: int = 3, voice_min_cluster_size: int = 3, voice_sensitivity: float = 0.5,
max_frames: int = 100) -> dict:
"""
Llama al endpoint del space 'engine' para crear el 'initial casting'.
Envía el vídeo recién importado como archivo y los parámetros de clustering.
Args:
video_path: Path to video file (if reading from disk)
video_bytes: Video file bytes (if already in memory)
video_name: Name for the video file
face_max_groups: Max number of face clusters (hierarchical)
face_min_cluster_size: Minimum face cluster size
voice_max_groups: Max number of voice clusters (hierarchical)
voice_min_cluster_size: Minimum voice cluster size
max_frames: Maximum number of frames to process
"""
url = f"{self.base_url}/create_initial_casting"
try:
# Prepare file data
if video_bytes:
filename = video_name or "video.mp4"
files = {
"video": (filename, video_bytes, "video/mp4"),
}
elif video_path:
with open(video_path, "rb") as f:
files = {
"video": (os.path.basename(video_path), f.read(), "video/mp4"),
}
else:
return {"error": "Either video_path or video_bytes must be provided"}
data = {
"max_groups": str(face_max_groups),
"min_cluster_size": str(face_min_cluster_size),
"face_sensitivity": str(face_sensitivity),
"voice_max_groups": str(voice_max_groups),
"voice_min_cluster_size": str(voice_min_cluster_size),
"voice_sensitivity": str(voice_sensitivity),
"max_frames": str(max_frames),
}
r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5)
r.raise_for_status()
return r.json() if r.headers.get("content-type", "").startswith("application/json") else {"ok": True}
except requests.exceptions.RequestException as e:
return {"error": str(e)}
except Exception as e:
return {"error": f"Unexpected error: {str(e)}"}
def detect_scenes(self, video_path: str = None, video_bytes: bytes = None, video_name: str = None,
max_groups: int = 3, min_cluster_size: int = 3, scene_sensitivity: float = 0.5, frame_interval_sec: float = 0.5) -> dict:
"""
Call engine /detect_scenes to compute scene clusters using hierarchical clustering on color histograms.
"""
url = f"{self.base_url}/detect_scenes"
try:
if video_bytes:
filename = video_name or "video.mp4"
files = {
"video": (filename, video_bytes, "video/mp4"),
}
elif video_path:
with open(video_path, "rb") as f:
files = {
"video": (os.path.basename(video_path), f.read(), "video/mp4"),
}
else:
return {"error": "Either video_path or video_bytes must be provided"}
data = {
"max_groups": str(max_groups),
"min_cluster_size": str(min_cluster_size),
"scene_sensitivity": str(scene_sensitivity),
"frame_interval_sec": str(frame_interval_sec),
}
r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5)
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
def generate_audio_from_text_file(self, text_content: str, voice: str = "central/grau") -> dict:
"""
Genera un único MP3 a partir de un texto largo, usando el endpoint de SRT.
1. Convierte el texto en un SRT falso.
2. Llama a /tts/srt con el SRT.
3. Extrae el 'ad_master.mp3' del ZIP resultante.
"""
if not self.tts_url:
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")
# 1. Crear un SRT falso en memoria
srt_content = ""
start_time = 0
for i, line in enumerate(text_content.strip().split('\n')):
line = line.strip()
if not line:
continue
# Asignar 5 segundos por línea, un valor simple
end_time = start_time + 5
def format_time(seconds):
h = int(seconds / 3600)
m = int((seconds % 3600) / 60)
s = int(seconds % 60)
ms = int((seconds - int(seconds)) * 1000)
return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
srt_content += f"{i+1}\n"
srt_content += f"{format_time(start_time)} --> {format_time(end_time)}\n"
srt_content += f"{line}\n\n"
start_time = end_time
if not srt_content:
return {"error": "El texto proporcionado estaba vacío o no se pudo procesar."}
# 2. Llamar al endpoint /tts/srt
url = f"{self.tts_url.rstrip('/')}/tts/srt"
try:
files = {
'srt': ('fake_ad.srt', srt_content, 'application/x-subrip')
}
data = {"voice": voice, "ad_format": "mp3"}
r = requests.post(url, files=files, data=data, timeout=self.timeout * 5)
r.raise_for_status()
# 3. Extraer 'ad_master.mp3' del ZIP
with zipfile.ZipFile(io.BytesIO(r.content)) as z:
for filename in z.namelist():
if filename == 'ad_master.mp3':
mp3_bytes = z.read(filename)
return {"mp3_bytes": mp3_bytes}
return {"error": "No se encontró 'ad_master.mp3' en la respuesta del servidor."}
except requests.exceptions.RequestException as e:
return {"error": f"Error llamando a la API de SRT: {e}"}
except zipfile.BadZipFile:
return {"error": "La respuesta del servidor no fue un archivo ZIP válido."}
def tts_long_text(self, text: str, voice: str = "central/grau") -> dict:
"""
Llama al endpoint '/tts/text_long' para sintetizar un texto largo.
La API se encarga de todo el procesamiento.
"""
if not self.tts_url:
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")
url = f"{self.tts_url.rstrip('/')}/tts/text_long"
data = {
"texto": text,
"voice": voice,
"formato": "mp3"
}
try:
# Usamos un timeout más largo por si el texto es muy extenso
r = requests.post(url, data=data, timeout=self.timeout * 10)
r.raise_for_status()
return {"mp3_bytes": r.content}
except requests.exceptions.RequestException as e:
print(f"Error cridant a TTS per a text llarg: {e}")
return {"error": str(e)}
# ===========================
# Cliente para SVision Space
# ===========================
# Nombres catalanes comunes para asignar a personajes (deben coincidir con app.py)
def get_catalan_names():
"""Retorna llistes de noms catalans."""
noms_home = ["Jordi", "Marc", "Pau", "Pere", "Joan", "Josep", "David", "Àlex", "Guillem", "Albert",
"Arnau", "Martí", "Bernat", "Oriol", "Roger", "Pol", "Lluís", "Sergi", "Carles", "Xavier"]
noms_dona = ["Maria", "Anna", "Laura", "Marta", "Cristina", "Núria", "Montserrat", "Júlia", "Sara", "Carla",
"Alba", "Elisabet", "Rosa", "Gemma", "Sílvia", "Teresa", "Irene", "Laia", "Marina", "Bet"]
return noms_home, noms_dona
def describe_image_with_svision(image_path: str, is_face: bool = True) -> Tuple[str, str]:
"""
Llama al space svision para describir una imagen (cara o escena).
Args:
image_path: Ruta absoluta a la imagen
is_face: True si es una cara, False si es una escena
Returns:
tuple (descripción_completa, nombre_abreviado)
"""
try:
from gradio_client import Client, handle_file
# Conectar al space svision con timeout generoso para cold start de ZeroGPU
svision_url = os.getenv("SVISION_URL", "https://veureu-svision.hf.space")
print(f"[svision] Connectant a {svision_url}...")
# Autenticar con HuggingFace token para obtener más cuota de ZeroGPU
hf_token = os.getenv("HF_TOKEN")
if hf_token:
client = Client(svision_url, hf_token=hf_token)
print(f"[svision] Autenticat amb token HF")
else:
client = Client(svision_url)
print(f"[svision] Sense autenticació (cuota limitada)")
# Preparar prompt según el tipo
if is_face:
prompt = "Descriu aquesta persona. Inclou: edat aproximada (jove/adult), gènere, característiques físiques notables (ulleres, barba, bigoti, etc.), expressió i vestimenta."
else:
prompt = "Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals."
print(f"[svision] Enviant petició (pot trigar si ZeroGPU està en cold start)...")
print(f"[svision] Image path: {image_path}")
import time
start_time = time.time()
max_tokens = 256 if is_face else 128
max_attempts = int(os.getenv("SVISION_MAX_ATTEMPTS", "5"))
wait_seconds = int(os.getenv("SVISION_RETRY_WAIT", "5"))
result = None
last_error: Exception | None = None
for attempt in range(1, max_attempts + 1):
try:
print(f"[svision] Attempt {attempt}/{max_attempts} (wait={wait_seconds}s)")
result = client.predict(
handle_file(image_path),
prompt,
max_tokens,
0.7,
api_name="/describe"
)
if result and isinstance(result, str) and result.strip():
break
raise RuntimeError("Resposta buida de svision")
except Exception as exc:
last_error = exc
print(f"[svision] Error attempt {attempt}/{max_attempts}: {exc}")
if attempt == max_attempts:
raise
time.sleep(wait_seconds)
wait_seconds = min(wait_seconds * 2, 40)
elapsed = time.time() - start_time
print(f"[svision] Resposta rebuda en {elapsed:.1f}s")
full_description = result.strip() if result else ""
# PASO 1: Eliminar el prompt original que puede aparecer en la respuesta
prompt_markers = [
"Descriu aquesta persona. Inclou: edat aproximada (jove/adult), gènere, característiques físiques notables (ulleres, barba, bigoti, etc.), expressió i vestimenta.",
"Descriu aquesta escena. Inclou: tipus de localització (interior/exterior), elements principals, ambient, il·luminació.",
"Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals.",
"Descriu aquesta persona.",
"Descriu aquesta escena.",
]
for marker in prompt_markers:
if marker in full_description:
# Eliminar el prompt y todo lo que esté antes
parts = full_description.split(marker, 1)
if len(parts) > 1:
full_description = parts[1].strip()
# PASO 2: Limpiar prefijos no deseados de forma más agresiva
# Lista de prefijos comunes que aparecen
prefixes_to_remove = [
"user:", "user ", "user\n", "user\t",
"assistant:", "assistant ", "assistant\n", "assistant\t",
"User:", "User ", "User\n",
"Assistant:", "Assistant ", "Assistant\n",
"system:", "system ",
]
# Intentar limpiar múltiples veces por si hay varios prefijos
for _ in range(5): # Máximo 5 iteraciones
original = full_description
for prefix in prefixes_to_remove:
if full_description.lower().startswith(prefix.lower()):
full_description = full_description[len(prefix):].strip()
break
if original == full_description:
break # No hubo cambios, salir
# PASO 3: Limpiar espacios en blanco múltiples y saltos de línea al inicio
full_description = full_description.lstrip()
# PASO 4: Si empieza con salto de línea o tabulación, limpiar
while full_description and full_description[0] in ['\n', '\t', '\r', ' ']:
full_description = full_description[1:]
if not full_description:
return ("", "")
# Generar nombre aleatorio en catalán para caras
if is_face:
# Extraer características clave para el nombre
desc_lower = full_description.lower()
# Determinar género
is_female = any(word in desc_lower for word in ["dona", "noia", "nena", "femení", "femenina"])
# Seleccionar nombre aleatorio pero consistente (hash del path)
import hashlib
hash_val = int(hashlib.md5(image_path.encode()).hexdigest(), 16)
noms_home, noms_dona = get_catalan_names()
if is_female:
name_list = noms_dona
else:
name_list = noms_home
# Usar hash para selección consistente
short_name = name_list[hash_val % len(name_list)]
else:
# Para escenas, extraer primeras palabras clave
words = full_description.split()[:4]
short_name = " ".join(words).capitalize()
print(f"[svision] Descripció generada: {full_description[:100]}...")
print(f"[svision] Nom: {short_name}")
return (full_description, short_name)
except Exception as e:
print(f"[svision] Error al descriure imatge: {e}")
import traceback
traceback.print_exc()
return ("", "")
def generate_short_scene_name(description: str) -> str:
"""
Genera un nombre corto de escena (< 3 palabras) basándose en la descripción
usando el modelo schat (Salamandra-Instruct).
Args:
description: Descripción completa de la escena de svision
Returns:
Nombre corto de la escena (< 3 palabras) o string vacío si falla
"""
try:
# Importar gradio_client
from gradio_client import Client, handle_file
# URL del space schat
schat_url = os.getenv("SCHAT_URL", "https://veureu-schat.hf.space")
print(f"[schat] Connectant a {schat_url}...")
# Autenticar con HuggingFace token
hf_token = os.getenv("HF_TOKEN")
if hf_token:
client = Client(schat_url, hf_token=hf_token)
print(f"[schat] Autenticat amb token HF")
else:
client = Client(schat_url)
print(f"[schat] Sense autenticació")
# Preparar prompt
prompt = f"Basant-te en aquesta descripció d'una escena, genera un nom curt de menys de 3 paraules que la resumeixi:\n\n{description}\n\nNom de l'escena:"
print(f"[schat] Generant nom curt per descripció: {description[:100]}...")
# Llamar al endpoint /predict de schat
# Parámetros típicos: mensaje, historial, max_new_tokens, temperature, top_p, top_k, repetition_penalty
result = client.predict(
prompt, # mensaje
[], # historial vacío
256, # max_new_tokens
0.7, # temperature
0.9, # top_p
50, # top_k
1.0, # repetition_penalty
api_name="/predict"
)
# El resultado es una tupla (respuesta, historial)
if isinstance(result, tuple) and len(result) >= 1:
short_name = result[0].strip() if result[0] else ""
elif isinstance(result, str):
short_name = result.strip()
else:
short_name = ""
# Limpiar posibles comillas o puntuación extra
short_name = short_name.strip('"\'.,!?').strip()
# Limpiar prefijos no deseados
prefixes_to_remove = [
"Nom de l'escena:",
"nom de l'escena:",
"Escena:",
"escena:",
]
for prefix in prefixes_to_remove:
if short_name.lower().startswith(prefix.lower()):
short_name = short_name[len(prefix):].strip()
# Limitar a 3 palabras
words = short_name.split()
if len(words) > 3:
short_name = " ".join(words[:3])
print(f"[schat] Nom curt generat: {short_name}")
return short_name
except Exception as e:
print(f"[schat] Error al generar nom curt: {e}")
import traceback
traceback.print_exc()
return ""