# demo/page_modules/new_video_processing.py
# NOTE: the following metadata was captured from the Hugging Face file viewer
# along with the source: "VeuReu — Upload 15 files, 4208190 verified, raw /
# history blame, 111 kB". Kept here as a comment so the file stays valid Python.
"""UI logic for the "Processar vídeo nou" page - Recovered from backup with full functionality."""
from __future__ import annotations
import re
import shutil
import subprocess
import os
import time
import tempfile
import hashlib
from pathlib import Path
import sys
from datetime import datetime
import yaml
import sqlite3
import json
import zipfile
import io
import requests
import streamlit as st
from PIL import Image, ImageDraw
from databases import (
log_action,
has_video_approval_action,
upsert_audiodescription_text,
get_latest_user_phone_for_session,
insert_action,
ensure_video_row_for_upload,
is_video_input_ok,
update_video_status,
get_audiodescription,
)
from compliance_client import compliance_client
from persistent_data_gate import ensure_temp_databases, _load_data_origin, ensure_media_for_video
def get_all_catalan_names():
    """Return the available Catalan first names as (male_names, female_names)."""
    male = [
        "Jordi", "Marc", "Pau", "Pere", "Joan", "Josep", "David", "Àlex",
        "Guillem", "Albert", "Arnau", "Martí", "Bernat", "Oriol", "Roger",
        "Pol", "Lluís", "Sergi", "Carles", "Xavier",
    ]
    female = [
        "Maria", "Anna", "Laura", "Marta", "Cristina", "Núria", "Montserrat",
        "Júlia", "Sara", "Carla", "Alba", "Elisabet", "Rosa", "Gemma",
        "Sílvia", "Teresa", "Irene", "Laia", "Marina", "Bet",
    ]
    return male, female
def _log(msg: str) -> None:
"""Helper de logging a stderr amb timestamp (coherent amb auth.py)."""
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
sys.stderr.write(f"[{ts}] {msg}\n")
sys.stderr.flush()
def get_catalan_name_for_speaker(speaker_label: int, used_names_home: list | None = None, used_names_dona: list | None = None) -> str:
    """Pick a Catalan name for a diarized speaker.

    Even labels are treated as male, odd labels as female. When names already
    assigned to detected face clusters are supplied, they are reused (cycling
    through them); otherwise a name is taken from the full pool.

    Bug fixed: the fallback previously indexed the pool with the built-in
    ``hash()`` of a string, which is salted per process (PYTHONHASHSEED), so
    the same speaker could receive a different name on every run. The index
    is now derived directly from ``speaker_label``, which is stable.

    Args:
        speaker_label: Diarization cluster label (non-negative int).
        used_names_home: Male names already in use by face clusters, if any.
        used_names_dona: Female names already in use by face clusters, if any.

    Returns:
        A Catalan first name for the speaker.
    """
    noms_home, noms_dona = get_all_catalan_names()
    # Even labels → male pool, odd labels → female pool.
    if speaker_label % 2 == 0:
        pool, used = noms_home, used_names_home
    else:
        pool, used = noms_dona, used_names_dona
    idx = speaker_label // 2
    if used:
        # Reuse names already assigned to face clusters, cycling if needed.
        return used[idx % len(used)]
    # Deterministic fallback from the full pool (stable across runs).
    return pool[idx % len(pool)]
def _get_video_duration(path: str) -> float:
"""Return video duration in seconds using ffprobe, ffmpeg or OpenCV as fallback."""
cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
path,
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
return float(result.stdout.strip())
except (subprocess.CalledProcessError, ValueError, FileNotFoundError):
pass
if shutil.which("ffmpeg"):
try:
ffmpeg_cmd = ["ffmpeg", "-i", path]
result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True, check=False)
output = result.stderr or result.stdout or ""
match = re.search(r"Duration:\s*(\d+):(\d+):(\d+\.\d+)", output)
if match:
hours, minutes, seconds = match.groups()
total_seconds = (int(hours) * 3600) + (int(minutes) * 60) + float(seconds)
return float(total_seconds)
except FileNotFoundError:
pass
# Últim recurs: intentar amb OpenCV si està disponible
try:
import cv2
cap = cv2.VideoCapture(path)
if cap.isOpened():
fps = cap.get(cv2.CAP_PROP_FPS) or 0
frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0
cap.release()
if fps > 0 and frame_count > 0:
return float(frame_count / fps)
else:
cap.release()
except Exception:
pass
return 0.0
def _transcode_video(input_path: str, output_path: str, max_duration: int | None = None) -> None:
    """Re-encode *input_path* to an H.264/AAC MP4 at *output_path*.

    When *max_duration* is given, the output is truncated to that many
    seconds. Raises RuntimeError carrying ffmpeg's stderr if the encode
    fails.
    """
    args = ["ffmpeg", "-y", "-i", input_path]
    if max_duration is not None:
        args.extend(["-t", str(max_duration)])
    args.extend([
        "-c:v", "libx264",
        "-preset", "veryfast",
        "-crf", "23",
        "-c:a", "aac",
        # +faststart moves the moov atom up front for progressive playback.
        "-movflags", "+faststart",
        output_path,
    ])
    proc = subprocess.run(args, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.strip() or "ffmpeg failed")
def render_process_video_page(api, backend_base_url: str) -> None:
st.header("Processar un nou clip de vídeo")
# Llegir config.yaml (flags d'app i límits de media)
base_dir = Path(__file__).parent.parent
config_path = base_dir / "config.yaml"
manual_validation_enabled = True
max_size_mb = 20
max_duration_s = 30
video_validator_sms_enabled = False
skip_manual_validation_for_this_video = False
try:
if config_path.exists():
with config_path.open("r", encoding="utf-8") as f:
cfg = yaml.safe_load(f) or {}
app_cfg = cfg.get("app", {}) or {}
manual_validation_enabled = bool(app_cfg.get("manual_validation_enabled", True))
media_cfg = cfg.get("media", {}) or {}
# Límits configurables de mida i durada
max_size_mb = int(media_cfg.get("max_size_mb", max_size_mb))
max_duration_s = int(media_cfg.get("max_duration_s", max_duration_s))
# Flags de validació / SMS de validador de vídeo
validation_cfg = cfg.get("validation", {}) or {}
video_validator_sms_enabled = bool(validation_cfg.get("video_validator_sms_enabled", False))
except Exception:
manual_validation_enabled = True
# CSS para estabilizar carruseles y evitar vibración del layout
st.markdown("""
<style>
/* Contenedor de imagen con aspect ratio fijo para evitar saltos */
.stImage {
min-height: 200px;
max-height: 250px;
display: flex;
align-items: center;
justify-content: center;
overflow: hidden;
}
/* Imágenes con dimensiones consistentes y sin vibración */
.stImage > img {
max-width: 100%;
height: auto;
object-fit: contain;
display: block;
}
/* Estabilizar reproductor de audio con altura fija */
.stAudio {
min-height: 54px;
max-height: 80px;
}
/* Caption con altura fija */
.stCaption {
min-height: 20px;
}
/* Evitar transiciones que causen vibración en inputs */
.stTextInput > div, .stTextArea > div {
transition: none !important;
}
/* Botones de navegación con tamaño consistente */
.stButton button {
transition: background-color 0.2s, color 0.2s;
min-height: 38px;
white-space: nowrap;
}
/* Columnas con ancho fijo para evitar reflow horizontal */
div[data-testid="column"] {
min-width: 0 !important;
flex-shrink: 0 !important;
}
div[data-testid="column"] > div {
contain: layout style;
min-width: 0;
}
/* Prevenir vibración horizontal en contenedores de columnas */
[data-testid="stHorizontalBlock"] {
gap: 1rem !important;
}
[data-testid="stHorizontalBlock"] > div {
flex-shrink: 0 !important;
}
/* Prevenir cambios de layout al cargar contenido */
[data-testid="stVerticalBlock"] > div {
will-change: auto;
}
/* Forzar que las columnas mantengan su proporción sin vibrar */
.row-widget.stHorizontalBlock {
width: 100% !important;
}
</style>
""", unsafe_allow_html=True)
msg_detect = st.empty()
msg_finalize = st.empty()
msg_ad = st.empty()
# Inicializar el estado de la página si no existe
if "video_uploaded" not in st.session_state:
st.session_state.video_uploaded = None
if "characters_detected" not in st.session_state:
st.session_state.characters_detected = None
if "audio_segments" not in st.session_state:
st.session_state.audio_segments = None
if "voice_labels" not in st.session_state:
st.session_state.voice_labels = None
if "face_labels" not in st.session_state:
st.session_state.face_labels = None
if "scene_clusters" not in st.session_state:
st.session_state.scene_clusters = None
if "scene_detection_done" not in st.session_state:
st.session_state.scene_detection_done = False
if "detect_done" not in st.session_state:
st.session_state.detect_done = False
if "casting_finalized" not in st.session_state:
st.session_state.casting_finalized = False
if "video_name_from_engine" not in st.session_state:
st.session_state.video_name_from_engine = None
if "diarization_info" not in st.session_state:
st.session_state.diarization_info = {}
if "characters_saved" not in st.session_state:
st.session_state.characters_saved = False
if "video_requires_validation" not in st.session_state:
st.session_state.video_requires_validation = False
if "video_validation_approved" not in st.session_state:
st.session_state.video_validation_approved = False
# --- 1. Subida del vídeo ---
MAX_SIZE_MB = max_size_mb
MAX_DURATION_S = max_duration_s
# Selector de visibilitat (privat/públic), a la dreta del uploader
if "video_visibility" not in st.session_state:
st.session_state.video_visibility = "Privat"
col_upload, col_vis = st.columns([3, 1])
with col_upload:
uploaded_file = st.file_uploader(
f"Puja un clip de vídeo (MP4, < {MAX_SIZE_MB}MB, < {MAX_DURATION_S} segons)",
type=["mp4"],
key="video_uploader",
)
with col_vis:
disabled_vis = st.session_state.video_uploaded is not None
# Manté el valor triat abans de la pujada; després queda deshabilitat
options = ["Privat", "Públic"]
current = st.session_state.get("video_visibility", "Privat")
try:
idx = options.index(current)
except ValueError:
idx = 0
st.selectbox(
"Visibilitat",
options,
index=idx,
key="video_visibility",
disabled=disabled_vis,
)
if uploaded_file is not None:
# Resetear el estado si se sube un nuevo archivo
if st.session_state.video_uploaded is None or uploaded_file.name != st.session_state.video_uploaded.get(
"original_name"
):
st.session_state.video_uploaded = {"original_name": uploaded_file.name, "status": "validating"}
st.session_state.characters_detected = None
st.session_state.characters_saved = False
if st.session_state.video_uploaded["status"] == "validating":
is_valid = True
if uploaded_file.size > MAX_SIZE_MB * 1024 * 1024:
st.error(f"El vídeo supera el límit de {MAX_SIZE_MB}MB.")
is_valid = False
if is_valid:
with st.spinner("Processant el vídeo..."):
temp_path = Path("temp_video.mp4")
with temp_path.open("wb") as f:
f.write(uploaded_file.getbuffer())
was_truncated = False
final_video_path = None
try:
duration = _get_video_duration(str(temp_path))
duration_unknown = False
if not duration:
st.warning(
f"No s'ha pogut obtenir la durada del vídeo. Es continuarà assumint un màxim de {MAX_DURATION_S} segons."
)
duration = float(MAX_DURATION_S)
duration_unknown = True
if is_valid:
if duration > MAX_DURATION_S:
was_truncated = True
video_name = Path(uploaded_file.name).stem
video_dir = Path("/tmp/data/videos") / video_name
video_dir.mkdir(parents=True, exist_ok=True)
# Guardem sempre el vídeo original com a "video.mp4" dins la carpeta
final_video_path = video_dir / "video.mp4"
try:
_transcode_video(
str(temp_path),
str(final_video_path),
MAX_DURATION_S if (was_truncated or duration_unknown) else None,
)
except RuntimeError as exc:
st.error(f"No s'ha pogut processar el vídeo: {exc}")
is_valid = False
if is_valid and final_video_path is not None:
video_bytes = uploaded_file.getvalue()
sha1 = hashlib.sha1(video_bytes).hexdigest()
st.session_state.video_uploaded.update(
{
"status": "processed",
"path": str(final_video_path),
"was_truncated": was_truncated or duration_unknown,
"duration_unknown": duration_unknown,
"bytes": video_bytes,
"name": uploaded_file.name,
"sha1sum": sha1,
}
)
# Si el vídeo ja està marcat com input-OK a videos.db, saltar validació
try:
if is_video_input_ok(sha1):
skip_manual_validation_for_this_video = True
# Assegurar que disposem de temp/media/<sha1>/video.mp4
base_dir = Path(__file__).parent.parent
api_client = st.session_state.get("api_client")
try:
ensure_media_for_video(base_dir, api_client, sha1)
except Exception as e_media:
_log(f"[MEDIA] Error assegurant media per a {sha1}: {e_media}")
except Exception as e_chk:
_log(f"[VIDEOS] Error comprovant status input-OK per a {sha1}: {e_chk}")
# Registre d'esdeveniment de pujada de vídeo a events.db i accions a actions.db/videos.db
try:
session_id = st.session_state.get("session_id", "")
ip = st.session_state.get("client_ip", "")
username = (
(st.session_state.get("user") or {}).get("username")
if st.session_state.get("user")
else ""
)
password = st.session_state.get("last_password", "")
phone = (
st.session_state.get("sms_phone_verified")
or st.session_state.get("sms_phone")
or ""
)
vis_choice = st.session_state.get("video_visibility", "Privat")
vis_flag = "public" if vis_choice.strip().lower().startswith("púb") else "private"
# 1) Registre a actions.db (acció bàsica)
log_action(
session=session_id,
user=username or "",
phone=phone,
action="upload",
sha1sum=sha1,
)
# 2) Determinar user/phone per a actions.db
actions_user, actions_phone = get_latest_user_phone_for_session(session_id)
if not actions_user:
actions_user = username or ""
if not actions_phone:
actions_phone = phone or ""
# 3) Inserir acció "Uploaded video" a actions.db (demo/temp/db/actions.db)
insert_action(
session=session_id,
user=actions_user,
phone=actions_phone,
action="Uploaded video",
sha1sum=sha1,
)
# 4) Assegurar fila a videos.db (demo/temp/db/videos.db) amb owner i status="input-pending"
ensure_video_row_for_upload(
sha1sum=sha1,
video_name=uploaded_file.name,
owner_phone=actions_phone,
status="input-pending",
visibility=vis_flag,
)
except Exception as e:
print(f"[events/actions] Error registrant pujada de vídeo: {e}")
# Guardar sempre el vídeo a demo/temp/pending_videos/<sha1>/video.mp4
# i, en mode external, enviar-lo també a pending_videos de l'engine
try:
base_dir = Path(__file__).parent.parent
data_origin = _load_data_origin(base_dir)
pending_root = base_dir / "temp" / "pending_videos" / sha1
pending_root.mkdir(parents=True, exist_ok=True)
local_pending_path = pending_root / "video.mp4"
# Guardar còpia local del vídeo pendent
with local_pending_path.open("wb") as f_pending:
f_pending.write(video_bytes)
if data_origin == "external":
# Enviar el vídeo al backend engine perquè aparegui a la llista de pendents
try:
resp_pending = api.upload_pending_video(video_bytes, uploaded_file.name)
_log(f"[pending_videos] upload_pending_video resp: {resp_pending}")
except Exception as e_up:
_log(f"[pending_videos] Error cridant upload_pending_video: {e_up}")
except Exception as e_ext:
_log(f"[pending_videos] Error bloc exterior upload_pending_video: {e_ext}")
# Marcar estat de validació segons la configuració de seguretat
if manual_validation_enabled and not skip_manual_validation_for_this_video:
st.session_state.video_requires_validation = True
st.session_state.video_validation_approved = False
# Notificar al validador per SMS només si està habilitat a config.yaml
if video_validator_sms_enabled:
try:
compliance_client.notify_video_upload(
video_name=uploaded_file.name,
sha1sum=sha1,
)
except Exception as sms_exc:
print(f"[VIDEO SMS] Error enviant notificació al validor: {sms_exc}")
else:
# Sense validació manual (o ja input-OK): es considera validat automàticament
st.session_state.video_requires_validation = False
st.session_state.video_validation_approved = True
st.rerun()
finally:
if temp_path.exists():
temp_path.unlink()
if st.session_state.video_uploaded and st.session_state.video_uploaded["status"] == "processed":
st.success(f"Vídeo '{st.session_state.video_uploaded['original_name']}' pujat i processat correctament.")
if st.session_state.video_uploaded["was_truncated"]:
st.warning(f"El vídeo s'ha truncat a {MAX_DURATION_S} segons.")
if manual_validation_enabled and st.session_state.get("video_requires_validation") and not st.session_state.get("video_validation_approved"):
st.info("Per favor, espera a la revisió humana del vídeo.")
# Comprovar si hi ha aprovació de vídeo a events.db per al sha1sum actual
current_sha1 = None
if st.session_state.get("video_uploaded"):
current_sha1 = st.session_state.video_uploaded.get("sha1sum")
if current_sha1 and st.session_state.get("video_requires_validation") and not st.session_state.get("video_validation_approved"):
if has_video_approval_action(current_sha1):
st.session_state.video_validation_approved = True
# Només podem continuar amb el càsting si el vídeo no requereix validació
# o si ja ha estat marcat com a validat.
can_proceed_casting = (
st.session_state.get("video_uploaded") is not None
and (
not st.session_state.get("video_requires_validation")
or st.session_state.get("video_validation_approved")
)
)
# --- 2. Form de detecció amb sliders ---
# Només es mostra quan ja hi ha un vídeo pujat **i** està validat (si cal validació).
if can_proceed_casting:
st.markdown("---")
with st.form("detect_form"):
col_btn, col_face, col_voice, col_scene = st.columns([1, 1, 1, 1])
with col_face:
st.markdown("**Cares**")
face_max_groups = st.slider("k-Target (cares)", 0, 10, 2, 1, key="face_max_groups")
face_min_cluster = st.slider("Mida mínima (cares)", 1, 5, 3, 1, key="face_min_cluster")
with col_voice:
st.markdown("**Veus**")
voice_max_groups = st.slider("k-Target (veus)", 0, 10, 2, 1, key="voice_max_groups")
voice_min_cluster = st.slider("Mida mínima (veus)", 1, 5, 1, 1, key="voice_min_cluster")
with col_scene:
st.markdown("**Escenes**")
scene_max_groups = st.slider("k-Target (escenes)", 0, 5, 2, 1, key="scene_max_groups")
scene_min_cluster = st.slider("Mida mínima (escenes)", 1, 20, 3, 1, key="scene_min_cluster")
with col_btn:
max_frames = st.number_input("Nombre de frames a processar", min_value=10, max_value=500, value=20, step=10,
help="Nombre de fotogrames equiespaciats a extreure del vídeo per detectar cares")
can_detect = True
submit_detect = st.form_submit_button("Detectar Personatges", disabled=not can_detect)
if not can_detect:
st.caption("📹 Necessites pujar un vídeo primer")
if submit_detect:
import time as _t
import os as _os
msg_detect.empty()
msg_finalize.empty()
msg_ad.empty()
try:
v = st.session_state.video_uploaded
# Reset estat abans de començar
st.session_state.scene_clusters = None
st.session_state.scene_detection_done = False
st.session_state.detect_done = False
st.session_state.casting_finalized = False
_log(f"[DETECT] Iniciando detección para vídeo: {v['name']}")
_log(f"[DETECT] Parámetros: face_k={face_max_groups}, face_min={face_min_cluster}, max_frames={max_frames}")
resp = api.create_initial_casting(
video_bytes=v["bytes"],
video_name=v["name"],
face_max_groups=face_max_groups,
face_min_cluster_size=face_min_cluster,
voice_max_groups=voice_max_groups,
voice_min_cluster_size=voice_min_cluster,
max_frames=max_frames,
)
_log(f"[DETECT] Respuesta create_initial_casting: {resp}")
if not isinstance(resp, dict) or not resp.get("job_id"):
_log(f"[DETECT] ERROR: No se recibió job_id válido")
msg_detect.error("No s'ha pogut crear el job al servidor. Torna-ho a intentar.")
else:
job_id = resp["job_id"]
_log(f"[DETECT] Job creado: {job_id}")
msg_detect.info(f"Job creat: {job_id}. Iniciant polling en 3s…")
with st.spinner("Processant al servidor…"):
_t.sleep(3)
attempt, max_attempts = 0, 120
progress_placeholder = st.empty()
while attempt < max_attempts:
stt = api.get_job(job_id)
status = stt.get("status")
if status in ("queued", "processing"):
if attempt % 10 == 0:
elapsed_min = (attempt * 5) // 60
progress_placeholder.info(f"⏳ Processant al servidor... (~{elapsed_min} min)")
_t.sleep(5)
attempt += 1
continue
if status == "failed":
progress_placeholder.empty()
msg_detect.error("El processament ha fallat al servidor.")
break
# Success
_log(f"[DETECT] Job completado. Status raw: {stt}")
res = stt.get("results", {})
_log(f"[DETECT] Results keys: {res.keys() if res else 'None'}")
chars = res.get("characters", [])
fl = res.get("face_labels", [])
segs = res.get("audio_segments", [])
vl = res.get("voice_labels", [])
base_dir = res.get("base_dir")
vname = _os.path.basename(base_dir) if base_dir else None
diar_info = res.get("diarization_info", {})
_log(f"[DETECT] Parsed: chars={len(chars)}, face_labels={len(fl)}, audio_segs={len(segs)}, voice_labels={len(vl)}")
if chars:
for i, c in enumerate(chars):
_log(f"[DETECT] Char[{i}]: id={c.get('id')}, num_faces={c.get('num_faces')}, files={c.get('face_files', [])[:3]}")
st.session_state.characters_detected = chars or []
st.session_state.face_labels = fl or []
st.session_state.audio_segments = segs or []
st.session_state.voice_labels = vl or []
st.session_state.video_name_from_engine = vname
st.session_state.engine_base_dir = base_dir
st.session_state.diarization_info = diar_info or {}
progress_placeholder.empty()
if chars:
msg_detect.success(
f"✓ Detecció completada! Trobades {len(chars)} cares.\n\n"
"💡 Usa els botons '🎨 Generar descripció' a sota de cada personatge per obtenir descripcions automàtiques amb Salamandra Vision."
)
else:
msg_detect.info("No s'han detectat cares en aquest vídeo.")
# Detect scenes
try:
scene_out = api.detect_scenes(
video_bytes=v["bytes"],
video_name=v["name"],
max_groups=scene_max_groups,
min_cluster_size=scene_min_cluster,
frame_interval_sec=0.5,
max_frames=max_frames,
)
scs = scene_out.get("scene_clusters") if isinstance(scene_out, dict) else None
if isinstance(scs, list):
st.session_state.scene_clusters = scs
else:
st.session_state.scene_clusters = []
except Exception:
st.session_state.scene_clusters = []
finally:
st.session_state.scene_detection_done = True
st.session_state.detect_done = True
msg_detect.success("✅ Processament completat!")
break
else:
progress_placeholder.empty()
msg_detect.warning(f"⏱️ El servidor no ha completat el job en {max_attempts * 5 // 60} minuts.")
except Exception as e:
msg_detect.error(f"Error inesperat: {e}")
# Botó per actualitzar manualment l'estat de validació del vídeo
# Només es mostra mentre el vídeo està pendent de validació humana
if (
st.session_state.get("video_uploaded")
and st.session_state.get("video_requires_validation")
and not st.session_state.get("video_validation_approved")
):
col_status, col_refresh = st.columns([3, 1])
with col_status:
st.caption("⏳ Vídeo pendent de validació humana.")
with col_refresh:
if st.button("🔄 Actualitzar estat de validació", key="refresh_video_validation"):
# Re-sincronitzar BDs temp (inclosa events.db) des de l'origen
try:
base_dir = Path(__file__).parent.parent
api_client = st.session_state.get("api_client")
ensure_temp_databases(base_dir, api_client)
except Exception:
pass
if current_sha1:
if has_video_approval_action(current_sha1):
st.session_state.video_validation_approved = True
st.success("✅ Vídeo validat. Pots continuar amb el càsting.")
else:
st.info("Encara no s'ha registrat cap aprovació per a aquest vídeo.")
# --- 3. Carruseles de cares ---
if st.session_state.get("characters_detected") is not None:
st.markdown("---")
n_face_clusters = len(st.session_state.get("characters_detected") or [])
st.subheader(f"🖼️ Cares — clústers: {n_face_clusters}")
if n_face_clusters == 0:
st.info("No s'han detectat clústers de cara en aquest clip.")
for idx, ch in enumerate(st.session_state.characters_detected or []):
try:
folder_name = Path(ch.get("folder") or "").name
except Exception:
folder_name = ""
char_id = ch.get("id") or folder_name or f"char{idx+1}"
def _safe_key(s: str) -> str:
k = re.sub(r"[^0-9a-zA-Z_]+", "_", s or "")
return k or f"cluster_{idx+1}"
key_prefix = _safe_key(f"char_{idx+1}_{char_id}")
if f"{key_prefix}_idx" not in st.session_state:
st.session_state[f"{key_prefix}_idx"] = 0
if f"{key_prefix}_discard" not in st.session_state:
st.session_state[f"{key_prefix}_discard"] = set()
faces_all = ch.get("face_files") or ([ch.get("image_url")] if ch.get("image_url") else [])
faces_all = [f for f in faces_all if f]
discard_set = st.session_state[f"{key_prefix}_discard"]
faces = [f for f in faces_all if f not in discard_set]
if not faces:
st.write(f"- {idx+1}. {ch.get('name','(sense nom)')} — sense imatges seleccionades")
continue
cur = st.session_state[f"{key_prefix}_idx"]
if cur >= len(faces):
cur = 0
st.session_state[f"{key_prefix}_idx"] = cur
fname = faces[cur]
if fname.startswith("/files/"):
img_url = f"{backend_base_url}/preprocessing{fname}"
else:
base = ch.get("image_url") or ""
base_dir = "/".join((base or "/").split("/")[:-1])
img_url = f"{backend_base_url}/preprocessing{base_dir}/{fname}" if base_dir else f"{backend_base_url}/preprocessing/{fname}"
st.markdown(f"**{idx+1}. {ch.get('name','(sense nom)')}{ch.get('num_faces', 0)} cares**")
spacer_col, main_content_col = st.columns([0.12, 0.88])
with spacer_col:
st.write("")
with main_content_col:
media_col, form_col = st.columns([1.3, 2.7])
with media_col:
st.image(img_url, width=180)
st.caption(f"Imatge {cur+1}/{len(faces)}")
nav_prev, nav_del, nav_next = st.columns(3)
with nav_prev:
if st.button("⬅️", key=f"prev_{key_prefix}", help="Anterior"):
st.session_state[f"{key_prefix}_idx"] = (cur - 1) % len(faces)
st.rerun()
with nav_del:
if st.button("🗑️", key=f"del_{key_prefix}", help="Eliminar aquesta imatge del clúster"):
st.session_state[f"{key_prefix}_discard"].add(fname)
new_list = [f for f in faces if f != fname]
new_idx = cur if cur < len(new_list) else 0
st.session_state[f"{key_prefix}_idx"] = new_idx
st.rerun()
with nav_next:
if st.button("➡️", key=f"next_{key_prefix}", help="Següent"):
st.session_state[f"{key_prefix}_idx"] = (cur + 1) % len(faces)
st.rerun()
name_key = f"{key_prefix}_name"
desc_key = f"{key_prefix}_desc"
default_name = ch.get("name", "")
default_desc = ch.get("description", "")
if default_name and (name_key not in st.session_state or not st.session_state.get(name_key)):
st.session_state[name_key] = default_name
elif name_key not in st.session_state:
st.session_state[name_key] = default_name or ""
if default_desc and (desc_key not in st.session_state or not st.session_state.get(desc_key)):
st.session_state[desc_key] = default_desc
elif desc_key not in st.session_state:
st.session_state[desc_key] = default_desc or ""
pending_desc_key = f"{key_prefix}_pending_desc"
pending_name_key = f"{key_prefix}_pending_name"
if pending_desc_key in st.session_state:
if desc_key not in st.session_state:
st.session_state[desc_key] = ""
st.session_state[desc_key] = st.session_state[pending_desc_key]
del st.session_state[pending_desc_key]
if pending_name_key in st.session_state:
if name_key not in st.session_state:
st.session_state[name_key] = ""
if not st.session_state.get(name_key):
st.session_state[name_key] = st.session_state[pending_name_key]
del st.session_state[pending_name_key]
with form_col:
st.text_input("Nom del clúster", key=name_key)
st.text_area("Descripció", key=desc_key, height=80)
if st.button("🎨 Generar descripció amb Salamandra Vision", key=f"svision_{key_prefix}"):
with st.spinner("Generant descripció..."):
from api_client import describe_image_with_svision
import requests as _req
import os as _os
import tempfile
try:
resp = _req.get(img_url, timeout=10)
if resp.status_code == 200:
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
tmp.write(resp.content)
tmp_path = tmp.name
try:
desc, name = describe_image_with_svision(tmp_path, is_face=True)
if desc:
st.session_state[pending_desc_key] = desc
st.success("✅ Descripció generada!")
print(f"[SVISION] Descripció generada per {char_id}: {desc[:100]}")
else:
st.warning("⚠️ No s'ha pogut generar una descripció.")
print(f"[SVISION] Descripció buida per {char_id}")
if name and not st.session_state.get(name_key):
st.session_state[pending_name_key] = name
print(f"[SVISION] Nom generat per {char_id}: {name}")
finally:
# Always clean up the temp file
try:
_os.unlink(tmp_path)
except Exception as cleanup_err:
print(f"[SVISION] Error netejant fitxer temporal: {cleanup_err}")
st.rerun()
else:
st.error(f"No s'ha pogut descarregar la imatge (status: {resp.status_code})")
except Exception as e:
st.error(f"Error generant descripció: {str(e)}")
print(f"[SVISION] Error complet: {e}")
import traceback
traceback.print_exc()
# --- 4. Carruseles de veus ---
if st.session_state.get("audio_segments") is not None:
st.markdown("---")
used_names_home = []
used_names_dona = []
noms_home_all, noms_dona_all = get_all_catalan_names()
for ch in (st.session_state.characters_detected or []):
ch_name = ch.get("name", "")
if ch_name in noms_home_all:
used_names_home.append(ch_name)
elif ch_name in noms_dona_all:
used_names_dona.append(ch_name)
segs = st.session_state.audio_segments or []
vlabels = st.session_state.voice_labels or []
valid_indices = [i for i, l in enumerate(vlabels) if isinstance(l, int) and l >= 0]
clusters = {}
for i in valid_indices:
lbl = int(vlabels[i])
clusters.setdefault(lbl, []).append(i)
n_vclusters = len(clusters)
st.subheader(f"🎙️ Empremtes de veu — clústers: {n_vclusters}")
di = st.session_state.get("diarization_info") or {}
if isinstance(di, dict) and not di.get("diarization_ok", True):
st.warning("No s'ha pogut fer la diarització amb pyannote (s'ha aplicat un sol segment de reserva).")
if not segs:
st.info("No s'han detectat mostres de veu.")
elif n_vclusters == 0:
st.info("No s'han format clústers de veu.")
else:
vname = st.session_state.video_name_from_engine
for lbl, idxs in sorted(clusters.items(), key=lambda x: x[0]):
key_prefix = f"voice_{lbl:02d}"
if f"{key_prefix}_idx" not in st.session_state:
st.session_state[f"{key_prefix}_idx"] = 0
if f"{key_prefix}_discard" not in st.session_state:
st.session_state[f"{key_prefix}_discard"] = set()
discard_set = st.session_state[f"{key_prefix}_discard"]
files = []
for i in idxs:
clip_local = (segs[i] or {}).get("clip_path")
fname = os.path.basename(clip_local) if clip_local else None
if fname:
files.append(fname)
files = [f for f in files if f and f not in discard_set]
if not files:
st.write(f"- SPEAKER_{lbl:02d} — sense clips seleccionats")
continue
cur = st.session_state[f"{key_prefix}_idx"]
if cur >= len(files):
cur = 0
st.session_state[f"{key_prefix}_idx"] = cur
fname = files[cur]
audio_url = f"{backend_base_url}/preprocessing/audio/{vname}/{fname}" if (vname and fname) else None
st.markdown(f"**SPEAKER_{lbl:02d}{len(files)} clips**")
c1, c2 = st.columns([1, 2])
with c1:
if audio_url:
st.audio(audio_url, format="audio/wav")
st.caption(f"Clip {cur+1}/{len(files)}")
bcol1, bcol2, bcol3 = st.columns(3)
with bcol1:
if st.button("⬅️", key=f"prev_{key_prefix}", help="Anterior"):
st.session_state[f"{key_prefix}_idx"] = (cur - 1) % len(files)
st.rerun()
with bcol2:
if st.button("🗑️", key=f"del_{key_prefix}", help="Eliminar aquest clip del clúster"):
st.session_state[f"{key_prefix}_discard"].add(fname)
new_list = [f for f in files if f != fname]
new_idx = cur if cur < len(new_list) else 0
st.session_state[f"{key_prefix}_idx"] = new_idx
st.rerun()
with bcol3:
if st.button("➡️", key=f"next_{key_prefix}", help="Següent"):
st.session_state[f"{key_prefix}_idx"] = (cur + 1) % len(files)
st.rerun()
with c2:
name_key = f"{key_prefix}_name"
desc_key = f"{key_prefix}_desc"
default_name = get_catalan_name_for_speaker(lbl, used_names_home, used_names_dona)
st.text_input("Nom del clúster", value=st.session_state.get(name_key, default_name), key=name_key)
st.text_area("Descripció", value=st.session_state.get(desc_key, ""), key=desc_key, height=80)
# --- 5. Carruseles de escenas ---
if st.session_state.get("scene_detection_done"):
st.markdown("---")
scene_clusters = st.session_state.get("scene_clusters")
n_scenes = len(scene_clusters or [])
st.subheader(f"📍 Escenes — clústers: {n_scenes}")
if not scene_clusters:
st.info("No s'han detectat clústers d'escenes en aquest clip.")
else:
for sidx, sc in enumerate(scene_clusters):
try:
folder_name = Path(sc.get("folder") or "").name
except Exception:
folder_name = ""
scene_id = sc.get("id") or folder_name or f"scene{sidx+1}"
key_prefix = re.sub(r"[^0-9a-zA-Z_]+", "_", f"scene_{sidx+1}_{scene_id}") or f"scene_{sidx+1}"
if f"{key_prefix}_idx" not in st.session_state:
st.session_state[f"{key_prefix}_idx"] = 0
if f"{key_prefix}_discard" not in st.session_state:
st.session_state[f"{key_prefix}_discard"] = set()
frames_all = sc.get("frame_files") or ([sc.get("image_url")] if sc.get("image_url") else [])
frames_all = [f for f in frames_all if f]
discard_set = st.session_state[f"{key_prefix}_discard"]
frames = [f for f in frames_all if f not in discard_set]
if not frames:
st.write(f"- {sidx+1}. (sense imatges de l'escena)")
continue
cur = st.session_state[f"{key_prefix}_idx"]
if cur >= len(frames):
cur = 0
st.session_state[f"{key_prefix}_idx"] = cur
fname = frames[cur]
if str(fname).startswith("/files/"):
img_url = f"{backend_base_url}/preprocessing{fname}"
else:
base = sc.get("image_url") or ""
base_dir = "/".join((base or "/").split("/")[:-1])
img_url = f"{backend_base_url}/preprocessing{base_dir}/{fname}" if base_dir else f"{backend_base_url}/preprocessing/{fname}"
st.markdown(f"**{sidx+1}. Escena — {sc.get('num_frames', 0)} frames**")
spacer_col, main_content_col = st.columns([0.12, 0.88])
with spacer_col:
st.write("")
with main_content_col:
media_col, form_col = st.columns([1.4, 2.6])
with media_col:
st.image(img_url, width=220)
st.caption(f"Imatge {cur+1}/{len(frames)}")
nav_prev, nav_del, nav_next = st.columns(3)
with nav_prev:
if st.button("⬅️", key=f"prev_{key_prefix}", help="Anterior"):
st.session_state[f"{key_prefix}_idx"] = (cur - 1) % len(frames)
st.rerun()
with nav_del:
if st.button("🗑️", key=f"del_{key_prefix}", help="Eliminar aquesta imatge del clúster"):
st.session_state[f"{key_prefix}_discard"].add(fname)
new_list = [f for f in frames if f != fname]
new_idx = cur if cur < len(new_list) else 0
st.session_state[f"{key_prefix}_idx"] = new_idx
st.rerun()
with nav_next:
if st.button("➡️", key=f"next_{key_prefix}", help="Següent"):
st.session_state[f"{key_prefix}_idx"] = (cur + 1) % len(frames)
st.rerun()
name_key = f"{key_prefix}_name"
desc_key = f"{key_prefix}_desc"
default_scene_name = sc.get("name", "")
default_scene_desc = sc.get("description", "")
if default_scene_name and (name_key not in st.session_state or not st.session_state.get(name_key)):
st.session_state[name_key] = default_scene_name
elif name_key not in st.session_state:
st.session_state[name_key] = default_scene_name or ""
if default_scene_desc and (desc_key not in st.session_state or not st.session_state.get(desc_key)):
st.session_state[desc_key] = default_scene_desc
elif desc_key not in st.session_state:
st.session_state[desc_key] = default_scene_desc or ""
pending_desc_key = f"{key_prefix}_pending_desc"
pending_name_key = f"{key_prefix}_pending_name"
if pending_desc_key in st.session_state:
if desc_key not in st.session_state:
st.session_state[desc_key] = ""
st.session_state[desc_key] = st.session_state[pending_desc_key]
del st.session_state[pending_desc_key]
if pending_name_key in st.session_state:
if name_key not in st.session_state:
st.session_state[name_key] = ""
if not st.session_state.get(name_key):
st.session_state[name_key] = st.session_state[pending_name_key]
del st.session_state[pending_name_key]
with form_col:
    # Editable cluster name/description bound to the session-state keys
    # seeded just above.
    st.text_input("Nom del clúster", key=name_key)
    st.text_area("Descripció", key=desc_key, height=80)
    # On demand: download the cluster's representative image and ask the
    # Salamandra Vision backend for a description plus a short name.
    if st.button("🎨 Generar descripció amb Salamandra Vision", key=f"svision_{key_prefix}"):
        with st.spinner("Generant descripció..."):
            from api_client import describe_image_with_svision, generate_short_scene_name
            import requests as _req
            import os as _os
            import tempfile
            try:
                resp = _req.get(img_url, timeout=10)
                if resp.status_code == 200:
                    # The vision API takes a local file path, so persist the
                    # downloaded image to a temp file first.
                    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
                        tmp.write(resp.content)
                        tmp_path = tmp.name
                    try:
                        desc, name = describe_image_with_svision(tmp_path, is_face=False)
                        if desc:
                            # Stash results under "pending" keys; they are
                            # applied to the widgets on the next rerun.
                            st.session_state[pending_desc_key] = desc
                            print(f"[SVISION] Descripció d'escena generada per {scene_id}: {desc[:100]}")
                            try:
                                # Prefer a short generated name; otherwise
                                # fall back to the vision call's own name.
                                short_name = generate_short_scene_name(desc)
                                if short_name:
                                    st.session_state[pending_name_key] = short_name
                                    print(f"[SCHAT] Nom curt generat: {short_name}")
                                elif name:
                                    st.session_state[pending_name_key] = name
                                    print(f"[SVISION] Usant nom original: {name}")
                            except Exception as schat_err:
                                print(f"[SCHAT] Error: {schat_err}")
                                # NOTE(review): reconstructed as the exception
                                # fallback (the log says "fallback"); confirm
                                # original indentation placed it inside except.
                                if name:
                                    st.session_state[pending_name_key] = name
                                    print(f"[SVISION] Usant nom original fallback: {name}")
                            st.success("✅ Descripció i nom generats!")
                        else:
                            st.warning("⚠️ No s'ha pogut generar una descripció.")
                            print(f"[SVISION] Descripció d'escena buida per {scene_id}")
                    finally:
                        # Always clean up the temp file
                        try:
                            _os.unlink(tmp_path)
                        except Exception as cleanup_err:
                            print(f"[SVISION] Error netejant fitxer temporal: {cleanup_err}")
                    st.rerun()
                else:
                    st.error(f"No s'ha pogut descarregar la imatge (status: {resp.status_code})")
            except Exception as e:
                st.error(f"Error generant descripció: {str(e)}")
                print(f"[SVISION] Error complet: {e}")
                import traceback
                traceback.print_exc()
# --- 6. Casting confirmation and combined characters ---
if st.session_state.get("detect_done"):
    st.markdown("---")
    colc1, colc2 = st.columns([1,1])
    with colc1:
        if st.button("Confirmar càsting definitiu", type="primary"):
            # Build the definitive per-character payload from the detected
            # face clusters, applying the user's session-state edits.
            chars_payload = []
            for idx, ch in enumerate(st.session_state.characters_detected or []):
                try:
                    folder_name = Path(ch.get("folder") or "").name
                except Exception:
                    folder_name = ""
                char_id = ch.get("id") or folder_name or f"char{idx+1}"
                def _safe_key(s: str) -> str:
                    # Same sanitizer used when the cluster widgets were
                    # rendered, so session-state keys match exactly.
                    k = re.sub(r"[^0-9a-zA-Z_]+", "_", s or "")
                    return k or f"cluster_{idx+1}"
                key_prefix = _safe_key(f"char_{idx+1}_{char_id}")
                # User-edited name/description win over detector defaults.
                name = st.session_state.get(f"{key_prefix}_name") or ch.get("name") or f"Personatge {idx+1}"
                desc = st.session_state.get(f"{key_prefix}_desc", "")
                faces_all = ch.get("face_files") or []
                # Exclude the face images the user discarded in the carousel.
                discard = st.session_state.get(f"{key_prefix}_discard", set())
                kept = [f for f in faces_all if f and f not in discard]
                chars_payload.append({
                    "id": char_id,
                    "name": name,
                    "description": desc,
                    "folder": ch.get("folder"),
                    "kept_files": kept,
                })
            # Track which Catalan names the face clusters already use, so
            # default voice names do not collide with them.
            used_names_home_fin = []
            used_names_dona_fin = []
            noms_home_all, noms_dona_all = get_all_catalan_names()
            for cp in chars_payload:
                face_name = cp.get("name", "")
                if face_name in noms_home_all:
                    used_names_home_fin.append(face_name)
                elif face_name in noms_dona_all:
                    used_names_dona_fin.append(face_name)
            segs = st.session_state.audio_segments or []
            vlabels = st.session_state.voice_labels or []
            vname = st.session_state.video_name_from_engine
            # Group audio clips by diarization label into voice clusters.
            voice_clusters = {}
            for i, seg in enumerate(segs):
                lbl = vlabels[i] if i < len(vlabels) else -1
                # Only consider voice clusters with a valid label (int >= 0)
                if not (isinstance(lbl, int) and lbl >= 0):
                    continue
                clip_local = seg.get("clip_path")
                fname = os.path.basename(clip_local) if clip_local else None
                if fname:
                    default_voice_name = get_catalan_name_for_speaker(int(lbl), used_names_home_fin, used_names_dona_fin)
                    voice_clusters.setdefault(lbl, {"label": lbl, "name": default_voice_name, "description": "", "clips": []})
                    vpref = f"voice_{int(lbl):02d}"
                    # Per-voice user edits override the generated defaults.
                    vname_custom = st.session_state.get(f"{vpref}_name")
                    vdesc_custom = st.session_state.get(f"{vpref}_desc")
                    if vname_custom:
                        voice_clusters[lbl]["name"] = vname_custom
                    if vdesc_custom is not None:
                        voice_clusters[lbl]["description"] = vdesc_custom
                    voice_clusters[lbl]["clips"].append(fname)
            payload = {
                "video_name": vname,
                "base_dir": st.session_state.get("engine_base_dir"),
                "characters": chars_payload,
                "voice_clusters": list(voice_clusters.values()),
            }
            if not payload["video_name"] or not payload["base_dir"]:
                st.error("Falten dades del vídeo per confirmar el càsting (video_name/base_dir). Torna a processar el vídeo.")
            else:
                with st.spinner("Consolidant càsting al servidor…"):
                    res_fc = api.finalize_casting(payload)
                if isinstance(res_fc, dict) and res_fc.get("ok"):
                    st.success(f"Càsting consolidat. Identities: {len(res_fc.get('face_identities', []))} cares, {len(res_fc.get('voice_identities', []))} veus.")
                    st.session_state.casting_finalized = True
                    # Persist casting_json locally for later steps (e.g. the
                    # audiodescription pipeline).
                    try:
                        casting_json = res_fc.get("casting_json") or {}
                        v = st.session_state.get("video_uploaded") or {}
                        sha1 = v.get("sha1sum")
                        if casting_json and sha1:
                            base_dir = Path(__file__).parent.parent / "temp" / "media" / sha1
                            base_dir.mkdir(parents=True, exist_ok=True)
                            casting_path = base_dir / "casting.json"
                            with casting_path.open("w", encoding="utf-8") as f:
                                json.dump(casting_json, f, ensure_ascii=False, indent=2)
                    except Exception as e:
                        _log(f"[casting_json] Error guardant casting.json: {e}")
                    f_id = res_fc.get('face_identities', []) or []
                    v_id = res_fc.get('voice_identities', []) or []
                    c3, c4 = st.columns(2)
                    with c3:
                        st.markdown("**Identitats de cara**")
                        for n in f_id:
                            st.write(f"- {n}")
                    with c4:
                        st.markdown("**Identitats de veu**")
                        for n in v_id:
                            st.write(f"- {n}")
                    faces_dir = res_fc.get('faces_dir')
                    voices_dir = res_fc.get('voices_dir')
                    db_dir = res_fc.get('db_dir')
                    # Load the consolidated indexes into the Chroma search
                    # service, dropping any previous collections first.
                    with st.spinner("Carregant índexs al cercador (Chroma)…"):
                        load_res = api.load_casting(faces_dir=faces_dir, voices_dir=voices_dir, db_dir=db_dir, drop_collections=True)
                    if isinstance(load_res, dict) and load_res.get('ok'):
                        st.success(f"Índexs carregats: {load_res.get('faces', 0)} cares, {load_res.get('voices', 0)} veus.")
                    else:
                        st.error(f"Error carregant índexs: {load_res}")
                else:
                    # Specific handling for 404 (endpoint missing on engine)
                    if isinstance(res_fc, dict) and res_fc.get("status_code") == 404:
                        st.error(
                            "No s'ha pogut consolidar el càsting perquè l'endpoint "
                            "\"/finalize_casting\" no està disponible al servidor d'engine. "
                            "Aquesta funcionalitat encara no està implementada o està desactivada."
                        )
                    else:
                        st.error("No s'ha pogut consolidar el càsting per un error al servidor.")
# --- Combined characters (faces + voices) ---
if st.session_state.get("casting_finalized"):
    st.markdown("---")
    st.subheader("👥 Personatges")
def normalize_name(name: str) -> str:
    """Return *name* uppercased with all diacritics removed.

    NFD-decomposes the uppercased string and drops every combining mark
    (Unicode category 'Mn'), so e.g. "Núria" -> "NURIA". Used to match
    face and voice clusters by character name regardless of accents/case.
    """
    import unicodedata
    decomposed = unicodedata.normalize('NFD', name.upper())
    return ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')
# Rebuild the per-cluster payload (same key scheme as the casting step) so
# faces and voices can be merged by normalized character name.
chars_payload = []
for idx, ch in enumerate(st.session_state.characters_detected or []):
    try:
        folder_name = Path(ch.get("folder") or "").name
    except Exception:
        folder_name = ""
    char_id = ch.get("id") or folder_name or f"char{idx+1}"
    def _safe_key(s: str) -> str:
        # Same sanitizer used when the cluster widgets were rendered.
        k = re.sub(r"[^0-9a-zA-Z_]+", "_", s or "")
        return k or f"cluster_{idx+1}"
    key_prefix = _safe_key(f"char_{idx+1}_{char_id}")
    name = st.session_state.get(f"{key_prefix}_name") or ch.get("name") or f"Personatge {idx+1}"
    name_normalized = normalize_name(name)
    desc = st.session_state.get(f"{key_prefix}_desc", "").strip()
    chars_payload.append({
        "name": name,
        "name_normalized": name_normalized,
        "face_key_prefix": key_prefix,
        "face_files": ch.get("face_files") or [],
        "char_data": ch,
        "description": desc,
    })
# Track Catalan names already taken by face clusters so default voice names
# do not collide with them.
used_names_home_pers = []
used_names_dona_pers = []
noms_home_all, noms_dona_all = get_all_catalan_names()
for cp in chars_payload:
    face_name = cp.get("name", "")
    if face_name in noms_home_all:
        used_names_home_pers.append(face_name)
    elif face_name in noms_dona_all:
        used_names_dona_pers.append(face_name)
segs = st.session_state.audio_segments or []
vlabels = st.session_state.voice_labels or []
vname = st.session_state.video_name_from_engine
# Group voice clips by the normalized display name of their cluster.
voice_clusters_by_name = {}
for i, seg in enumerate(segs):
    lbl = vlabels[i] if i < len(vlabels) else -1
    if not (isinstance(lbl, int) and lbl >= 0):
        continue
    vpref = f"voice_{int(lbl):02d}"
    default_voice_name = get_catalan_name_for_speaker(int(lbl), used_names_home_pers, used_names_dona_pers) if isinstance(lbl, int) and lbl >= 0 else f"SPEAKER_{int(lbl):02d}"
    vname_custom = st.session_state.get(f"{vpref}_name") or default_voice_name
    vname_normalized = normalize_name(vname_custom)
    vdesc = st.session_state.get(f"{vpref}_desc", "").strip()
    clip_local = seg.get("clip_path")
    fname = os.path.basename(clip_local) if clip_local else None
    if fname:
        voice_clusters_by_name.setdefault(vname_normalized, {
            "voice_key_prefix": vpref,
            "clips": [],
            "label": lbl,
            "original_name": vname_custom,
            "description": vdesc,
        })
        voice_clusters_by_name[vname_normalized]["clips"].append(fname)
# Union of character names coming from faces and from voices.
all_normalized_names = set([c["name_normalized"] for c in chars_payload] + list(voice_clusters_by_name.keys()))
for pidx, norm_name in enumerate(sorted(all_normalized_names)):
    face_items = [c for c in chars_payload if c["name_normalized"] == norm_name]
    voice_data = voice_clusters_by_name.get(norm_name)
    display_name = face_items[0]["name"] if face_items else (voice_data["original_name"] if voice_data else norm_name)
    # Merge the face-side and voice-side descriptions for this character.
    descriptions = []
    for face_item in face_items:
        if face_item["description"]:
            descriptions.append(face_item["description"])
    if voice_data and voice_data.get("description"):
        descriptions.append(voice_data["description"])
    combined_description = "\n".join(descriptions) if descriptions else ""
    st.markdown(f"**{pidx+1}. {display_name}**")
    all_faces = []
    for face_item in face_items:
        all_faces.extend(face_item["face_files"])
    face_data = face_items[0] if face_items else None
    col_faces, col_voices, col_text = st.columns([1, 1, 1.5])
    with col_faces:
        if all_faces:
            # Face-image carousel for this character.
            carousel_key = f"combined_face_{pidx}"
            if f"{carousel_key}_idx" not in st.session_state:
                st.session_state[f"{carousel_key}_idx"] = 0
            cur = st.session_state[f"{carousel_key}_idx"]
            if cur >= len(all_faces):
                cur = 0
                st.session_state[f"{carousel_key}_idx"] = cur
            fname = all_faces[cur]
            ch = face_data["char_data"] if face_data else {}
            # Resolve the image URL: absolute "/files/..." paths go straight
            # under the preprocessing mount; otherwise derive the directory
            # from the cluster's representative image_url.
            if fname.startswith("/files/"):
                img_url = f"{backend_base_url}/preprocessing{fname}"
            else:
                base = ch.get("image_url") or ""
                base_dir = "/".join((base or "/").split("/")[:-1])
                img_url = f"{backend_base_url}/preprocessing{base_dir}/{fname}" if base_dir else f"{backend_base_url}/preprocessing/{fname}"
            st.image(img_url, width=150)
            st.caption(f"Cara {cur+1}/{len(all_faces)}")
            bcol1, bcol2 = st.columns(2)
            with bcol1:
                if st.button("⬅️", key=f"combined_face_prev_{pidx}"):
                    st.session_state[f"{carousel_key}_idx"] = (cur - 1) % len(all_faces)
                    st.rerun()
            with bcol2:
                if st.button("➡️", key=f"combined_face_next_{pidx}"):
                    st.session_state[f"{carousel_key}_idx"] = (cur + 1) % len(all_faces)
                    st.rerun()
        else:
            st.info("Sense imatges")
    with col_voices:
        if voice_data:
            clips = voice_data["clips"]
            if clips:
                # Voice-clip carousel for this character.
                carousel_key = f"combined_voice_{pidx}"
                if f"{carousel_key}_idx" not in st.session_state:
                    st.session_state[f"{carousel_key}_idx"] = 0
                cur = st.session_state[f"{carousel_key}_idx"]
                if cur >= len(clips):
                    cur = 0
                    st.session_state[f"{carousel_key}_idx"] = cur
                fname = clips[cur]
                audio_url = f"{backend_base_url}/preprocessing/audio/{vname}/{fname}" if (vname and fname) else None
                if audio_url:
                    st.audio(audio_url, format="audio/wav")
                st.caption(f"Veu {cur+1}/{len(clips)}")
                bcol1, bcol2 = st.columns(2)
                with bcol1:
                    if st.button("⬅️", key=f"combined_voice_prev_{pidx}"):
                        st.session_state[f"{carousel_key}_idx"] = (cur - 1) % len(clips)
                        st.rerun()
                with bcol2:
                    if st.button("➡️", key=f"combined_voice_next_{pidx}"):
                        st.session_state[f"{carousel_key}_idx"] = (cur + 1) % len(clips)
                        st.rerun()
            else:
                st.info("Sense clips de veu")
        else:
            st.info("Sense dades de veu")
    with col_text:
        # Editable combined name/description, seeded once per character.
        combined_name_key = f"combined_char_{pidx}_name"
        combined_desc_key = f"combined_char_{pidx}_desc"
        if combined_name_key not in st.session_state:
            st.session_state[combined_name_key] = norm_name
        if combined_desc_key not in st.session_state:
            st.session_state[combined_desc_key] = combined_description
        st.text_input("Nom del personatge", key=combined_name_key, label_visibility="collapsed", placeholder="Nom del personatge")
        st.text_area("Descripció", key=combined_desc_key, height=120, label_visibility="collapsed", placeholder="Descripció del personatge")
# --- 7. Generate the audiodescription ---
st.markdown("---")
if st.button("🎬 Generar audiodescripció", type="primary", use_container_width=True):
    v = st.session_state.get("video_uploaded")
    if not v:
        st.error("No hi ha cap vídeo carregat.")
    else:
        progress_placeholder = st.empty()
        result_placeholder = st.empty()
        with st.spinner("Generant audiodescripció... Aquest procés pot trigar diversos minuts."):
            progress_placeholder.info("⏳ Processant vídeo i generant audiodescripció...")
            try:
                sha1 = v.get("sha1sum")
                if not sha1:
                    result_placeholder.error("Falta sha1sum del vídeo per generar l'audiodescripció.")
                    # `return` exits the enclosing page-render function.
                    return
                base_media_dir = Path(__file__).parent.parent / "temp" / "media" / sha1
                base_media_dir.mkdir(parents=True, exist_ok=True)
                # 1) Load casting.json (saved at casting confirmation) and
                #    push it to the engine as embeddings.
                casting_json = None
                try:
                    casting_path = base_media_dir / "casting.json"
                    if casting_path.exists():
                        with casting_path.open("r", encoding="utf-8") as f:
                            casting_json = json.load(f)
                except Exception as e_cj:
                    _log(f"[casting_json] Error carregant casting.json: {e_cj}")
                if casting_json:
                    try:
                        upload_res = api.upload_embeddings(sha1, casting_json)
                        _log(f"[embeddings] upload_embeddings resp: {upload_res}")
                    except Exception as e_up:
                        _log(f"[embeddings] Error pujant embeddings a engine: {e_up}")
                # 2) Initial pipeline: generate_initial_srt_and_info, then
                #    download the produced files.
                try:
                    init_resp = api.generate_initial_srt_and_info(sha1)
                    _log(f"[initial] generate_initial_srt_and_info resp: {init_resp}")
                except Exception as e_init:
                    _log(f"[initial] Error cridant generate_initial_srt_and_info: {e_init}")
                    init_resp = {"error": str(e_init)}
                if isinstance(init_resp, dict) and init_resp.get("error"):
                    result_placeholder.error(f"❌ Error al pipeline inicial: {init_resp.get('error')}")
                    return
                # Download and save initial.srt
                init_srt_text = ""
                init_info_text = ""
                try:
                    srt_resp = api.download_initial_srt(sha1)
                    if isinstance(srt_resp, dict) and not srt_resp.get("error"):
                        init_srt_text = srt_resp.get("text", "") or ""
                        initial_srt_path = base_media_dir / "initial.srt"
                        with initial_srt_path.open("w", encoding="utf-8") as f_srt:
                            f_srt.write(init_srt_text)
                        _log(f"[initial] initial.srt desat a {initial_srt_path}")
                    else:
                        _log(f"[initial] Error descarregant initial.srt: {srt_resp}")
                except Exception as e_srt:
                    _log(f"[initial] Excepció descarregant initial.srt: {e_srt}")
                # Download and save info.json
                try:
                    info_resp = api.download_initial_info(sha1)
                    if isinstance(info_resp, dict) and not info_resp.get("error"):
                        init_info_text = info_resp.get("text", "") or ""
                        info_path = base_media_dir / "info.json"
                        with info_path.open("w", encoding="utf-8") as f_info:
                            f_info.write(init_info_text)
                        _log(f"[initial] info.json desat a {info_path}")
                    else:
                        _log(f"[initial] Error descarregant info.json: {info_resp}")
                except Exception as e_info:
                    _log(f"[initial] Excepció descarregant info.json: {e_info}")
                # 3) Read config.yaml to know which AD versions and which
                #    refinement/automation/validation options apply; the
                #    defaults below are used when the file is missing or
                #    unreadable.
                salamandra_enabled = True
                moe_enabled = True
                reflection_enabled = True
                reflexion_enabled = False
                introspection_enabled = False
                twilio_enabled_cfg = False
                zapier_enabled_cfg = False
                une_validator_sms_enabled = False
                une_phone_validator = ""
                try:
                    base_dir_cfg = Path(__file__).parent.parent
                    cfg_path = base_dir_cfg / "config.yaml"
                    if cfg_path.exists():
                        with cfg_path.open("r", encoding="utf-8") as f_cfg:
                            cfg = yaml.safe_load(f_cfg) or {}
                        ver_cfg = cfg.get("versions", {}) or {}
                        salamandra_enabled = bool(ver_cfg.get("Salamandra_enabled", True))
                        moe_enabled = bool(ver_cfg.get("MoE_enabled", True))
                        ref_cfg = cfg.get("refinement", {}) or {}
                        reflection_enabled = bool(ref_cfg.get("reflection_enabled", True))
                        reflexion_enabled = bool(ref_cfg.get("reflexion_enabled", False))
                        introspection_enabled = bool(ref_cfg.get("introspection_enabled", False))
                        auto_cfg = cfg.get("automation", {}) or {}
                        twilio_enabled_cfg = bool(auto_cfg.get("twilio_enabled", False))
                        zapier_enabled_cfg = bool(auto_cfg.get("zapier_enabled", False))
                        val_cfg = cfg.get("validation", {}) or {}
                        une_validator_sms_enabled = bool(val_cfg.get("une_validator_sms_enabled", False))
                        une_phone_validator = str(val_cfg.get("une_phone_validator") or "").strip()
                except Exception as e_cfg:
                    _log(f"[config] Error llegint config.yaml: {e_cfg}")
                # Common data for event logging
                session_id = st.session_state.get("session_id", "")
                ip = st.session_state.get("client_ip", "")
                username = (
                    (st.session_state.get("user") or {}).get("username")
                    if st.session_state.get("user")
                    else ""
                )
                password = st.session_state.get("last_password", "")
                phone = (
                    st.session_state.get("sms_phone_verified")
                    or st.session_state.get("sms_phone")
                    or ""
                )
                vis_choice = st.session_state.get("video_visibility", "Privat")
                vis_flag = "public" if vis_choice.strip().lower().startswith("púb") else "private"
                any_success = False
                refined_any = False
                # 4) Salamandra version
                # NOTE(review): `log_event` is not among the imports visible
                # at the top of this file — confirm it is imported/defined
                # elsewhere in the module.
                if salamandra_enabled:
                    progress_placeholder.info("🐍 Generant versió Salamandra...")
                    try:
                        gen_resp = api.generate_salamandra_result(sha1)
                        _log(f"[Salamandra] generate_salamandra_result resp: {gen_resp}")
                    except Exception as e_gen_s:
                        _log(f"[Salamandra] Error cridant generate_salamandra_result: {e_gen_s}")
                        gen_resp = {"error": str(e_gen_s)}
                    if isinstance(gen_resp, dict) and gen_resp.get("error"):
                        _log(f"[Salamandra] Error en generació: {gen_resp.get('error')}")
                    else:
                        salamandra_srt = ""
                        salamandra_free = ""
                        try:
                            srt_s = api.download_salamandra_srt(sha1)
                            if isinstance(srt_s, dict) and not srt_s.get("error"):
                                salamandra_srt = srt_s.get("text", "") or ""
                                sal_dir = base_media_dir / "Salamandra"
                                sal_dir.mkdir(parents=True, exist_ok=True)
                                sal_srt_path = sal_dir / "result.srt"
                                with sal_srt_path.open("w", encoding="utf-8") as f_ss:
                                    f_ss.write(salamandra_srt)
                                _log(f"[Salamandra] result.srt desat a {sal_srt_path}")
                            else:
                                _log(f"[Salamandra] Error descarregant SRT: {srt_s}")
                        except Exception as e_ds:
                            _log(f"[Salamandra] Excepció descarregant SRT: {e_ds}")
                        try:
                            free_s = api.download_salamandra_free_narration(sha1)
                            if isinstance(free_s, dict) and not free_s.get("error"):
                                salamandra_free = free_s.get("text", "") or ""
                                sal_dir = base_media_dir / "Salamandra"
                                sal_dir.mkdir(parents=True, exist_ok=True)
                                sal_free_path = sal_dir / "free_narration.txt"
                                with sal_free_path.open("w", encoding="utf-8") as f_sf:
                                    f_sf.write(salamandra_free)
                                _log(f"[Salamandra] free_narration.txt desat a {sal_free_path}")
                            else:
                                _log(f"[Salamandra] Error descarregant free_narration: {free_s}")
                        except Exception as e_df:
                            _log(f"[Salamandra] Excepció descarregant free_narration: {e_df}")
                        # Persist to audiodescriptions.db
                        try:
                            upsert_audiodescription_text(
                                sha1sum=sha1,
                                version="Salamandra",
                                une_ad=salamandra_srt or "",
                                free_ad=salamandra_free or "",
                            )
                            any_success = True
                        except Exception as db_exc:
                            _log(f"[audiodescriptions] Error desant AD Salamandra: {db_exc}")
                        # Event logging
                        try:
                            if salamandra_srt:
                                log_event(
                                    session=session_id,
                                    ip=ip,
                                    user=username or "",
                                    password=password or "",
                                    phone=phone,
                                    action="Salamandra AD generated",
                                    sha1sum=sha1,
                                    visibility=vis_flag,
                                )
                            if salamandra_free:
                                log_event(
                                    session=session_id,
                                    ip=ip,
                                    user=username or "",
                                    password=password or "",
                                    phone=phone,
                                    action="Salamandra free AD generated",
                                    sha1sum=sha1,
                                    visibility=vis_flag,
                                )
                        except Exception as e_evt_s:
                            _log(f"[events] Error registrant esdeveniments Salamandra: {e_evt_s}")
                # 5) MoE version (same flow as Salamandra)
                if moe_enabled:
                    progress_placeholder.info("🧠 Generant versió MoE...")
                    try:
                        gen_resp_m = api.generate_moe_result(sha1)
                        _log(f"[MoE] generate_moe_result resp: {gen_resp_m}")
                    except Exception as e_gen_m:
                        _log(f"[MoE] Error cridant generate_moe_result: {e_gen_m}")
                        gen_resp_m = {"error": str(e_gen_m)}
                    if isinstance(gen_resp_m, dict) and gen_resp_m.get("error"):
                        _log(f"[MoE] Error en generació: {gen_resp_m.get('error')}")
                    else:
                        moe_srt = ""
                        moe_free = ""
                        try:
                            srt_m = api.download_moe_srt(sha1)
                            if isinstance(srt_m, dict) and not srt_m.get("error"):
                                moe_srt = srt_m.get("text", "") or ""
                                moe_dir = base_media_dir / "MoE"
                                moe_dir.mkdir(parents=True, exist_ok=True)
                                moe_srt_path = moe_dir / "result.srt"
                                with moe_srt_path.open("w", encoding="utf-8") as f_ms:
                                    f_ms.write(moe_srt)
                                _log(f"[MoE] result.srt desat a {moe_srt_path}")
                            else:
                                _log(f"[MoE] Error descarregant SRT: {srt_m}")
                        except Exception as e_dm_s:
                            _log(f"[MoE] Excepció descarregant SRT: {e_dm_s}")
                        try:
                            free_m = api.download_moe_free_narration(sha1)
                            if isinstance(free_m, dict) and not free_m.get("error"):
                                moe_free = free_m.get("text", "") or ""
                                moe_dir = base_media_dir / "MoE"
                                moe_dir.mkdir(parents=True, exist_ok=True)
                                moe_free_path = moe_dir / "free_narration.txt"
                                with moe_free_path.open("w", encoding="utf-8") as f_mf:
                                    f_mf.write(moe_free)
                                _log(f"[MoE] free_narration.txt desat a {moe_free_path}")
                            else:
                                _log(f"[MoE] Error descarregant free_narration: {free_m}")
                        except Exception as e_dm_f:
                            _log(f"[MoE] Excepció descarregant free_narration: {e_dm_f}")
                        # Persist to audiodescriptions.db
                        try:
                            upsert_audiodescription_text(
                                sha1sum=sha1,
                                version="MoE",
                                une_ad=moe_srt or "",
                                free_ad=moe_free or "",
                            )
                            any_success = True
                        except Exception as db_exc_m:
                            _log(f"[audiodescriptions] Error desant AD MoE: {db_exc_m}")
                        # Event logging
                        try:
                            if moe_srt:
                                log_event(
                                    session=session_id,
                                    ip=ip,
                                    user=username or "",
                                    password=password or "",
                                    phone=phone,
                                    action="MoE AD generated",
                                    sha1sum=sha1,
                                    visibility=vis_flag,
                                )
                            if moe_free:
                                log_event(
                                    session=session_id,
                                    ip=ip,
                                    user=username or "",
                                    password=password or "",
                                    phone=phone,
                                    action="MoE free AD generated",
                                    sha1sum=sha1,
                                    visibility=vis_flag,
                                )
                        except Exception as e_evt_m:
                            _log(f"[events] Error registrant esdeveniments MoE: {e_evt_m}")
                # 6) Optional refinement of the generated versions
                # NOTE(review): `update_audiodescription_info_ad` and
                # `update_audiodescription_text` are not in the import list
                # visible at the top of this file — confirm they are imported
                # elsewhere in the module.
                try:
                    refinement_active = bool(reflection_enabled or reflexion_enabled or introspection_enabled)
                    if refinement_active:
                        # Save info_ad (the initial info.json) if we have it
                        if init_info_text and sha1:
                            try:
                                update_audiodescription_info_ad(
                                    sha1sum=sha1,
                                    version="Salamandra",
                                    info_ad=init_info_text,
                                )
                            except Exception:
                                pass
                            try:
                                update_audiodescription_info_ad(
                                    sha1sum=sha1,
                                    version="MoE",
                                    info_ad=init_info_text,
                                )
                            except Exception:
                                pass
                        # Refine Salamandra
                        # NOTE(review): `salamandra_srt`/`moe_srt` are only
                        # bound when the corresponding generation succeeded;
                        # a NameError here would be swallowed by the outer
                        # refinement try/except below.
                        if salamandra_enabled and salamandra_srt:
                            try:
                                ref_resp_s = api.apply_refinement(
                                    sha1sum=sha1,
                                    version="Salamandra",
                                    srt_content=salamandra_srt,
                                    reflection_enabled=reflection_enabled,
                                    reflexion_enabled=reflexion_enabled,
                                    introspection_enabled=introspection_enabled,
                                )
                                _log(f"[Refinement] Salamandra resp: {ref_resp_s}")
                                refined_srt = None
                                if isinstance(ref_resp_s, dict):
                                    refined_srt = ref_resp_s.get("refined_srt") or ref_resp_s.get("refinedSrt")
                                if refined_srt:
                                    update_audiodescription_text(
                                        sha1sum=sha1,
                                        version="Salamandra",
                                        une_ad=refined_srt,
                                    )
                                    refined_any = True
                                    try:
                                        import hashlib as _hashlib
                                        # The event's sha1sum is the hash of
                                        # the refined SRT text, not the video.
                                        srt_hash = _hashlib.sha1(refined_srt.encode("utf-8")).hexdigest()
                                        log_event(
                                            session=session_id,
                                            ip=ip,
                                            user=username or "",
                                            password=password or "",
                                            phone=phone,
                                            action="Refined AD",
                                            sha1sum=srt_hash,
                                            visibility=vis_flag,
                                        )
                                    except Exception as e_evt_ref_s:
                                        _log(f"[events] Error registrant Refined AD (Salamandra): {e_evt_ref_s}")
                            except Exception as e_ref_s:
                                _log(f"[Refinement] Error refinant Salamandra: {e_ref_s}")
                        # Refine MoE
                        if moe_enabled and moe_srt:
                            try:
                                ref_resp_m = api.apply_refinement(
                                    sha1sum=sha1,
                                    version="MoE",
                                    srt_content=moe_srt,
                                    reflection_enabled=reflection_enabled,
                                    reflexion_enabled=reflexion_enabled,
                                    introspection_enabled=introspection_enabled,
                                )
                                _log(f"[Refinement] MoE resp: {ref_resp_m}")
                                refined_srt_m = None
                                if isinstance(ref_resp_m, dict):
                                    refined_srt_m = ref_resp_m.get("refined_srt") or ref_resp_m.get("refinedSrt")
                                if refined_srt_m:
                                    update_audiodescription_text(
                                        sha1sum=sha1,
                                        version="MoE",
                                        une_ad=refined_srt_m,
                                    )
                                    refined_any = True
                                    try:
                                        import hashlib as _hashlib
                                        srt_hash_m = _hashlib.sha1(refined_srt_m.encode("utf-8")).hexdigest()
                                        log_event(
                                            session=session_id,
                                            ip=ip,
                                            user=username or "",
                                            password=password or "",
                                            phone=phone,
                                            action="Refined AD",
                                            sha1sum=srt_hash_m,
                                            visibility=vis_flag,
                                        )
                                    except Exception as e_evt_ref_m:
                                        _log(f"[events] Error registrant Refined AD (MoE): {e_evt_ref_m}")
                            except Exception as e_ref_m:
                                _log(f"[Refinement] Error refinant MoE: {e_ref_m}")
                except Exception as e_ref:
                    _log(f"[Refinement] Error global de refinement: {e_ref}")
                # 7) Optional SMS for UNE validation and the
                #    'Waiting for UNE validation' event
                try:
                    if any_success and refined_any and sha1:
                        sms_channels_enabled = bool(twilio_enabled_cfg or zapier_enabled_cfg)
                        if sms_channels_enabled and une_validator_sms_enabled and une_phone_validator:
                            try:
                                # SMS text in Catalan (runtime string)
                                sms_msg = "Noves audiodescripcions a validar segons la norma UNE-153020"
                                compliance_client.notify_une_validator_new_ads(
                                    phone=une_phone_validator,
                                    message=sms_msg,
                                )
                            except Exception as e_sms_call:
                                _log(f"[UNE SMS] Error cridant compliance per UNE: {e_sms_call}")
                        # Record the waiting-for-UNE-validation state in events.db
                        try:
                            log_event(
                                session=session_id,
                                ip=ip,
                                user=username or "",
                                password=password or "",
                                phone=une_phone_validator,
                                action="Waiting for UNE validation",
                                sha1sum=sha1,
                                visibility=vis_flag,
                            )
                        except Exception as e_evt_wait:
                            _log(f"[events] Error registrant Waiting for UNE validation: {e_evt_wait}")
                except Exception as e_sms:
                    _log(f"[UNE SMS] Error en flux d'SMS/espera validació: {e_sms}")
                # 8) Update the video status to 'UNE-pending' in videos.db
                try:
                    if any_success and sha1:
                        update_video_status(sha1, "UNE-pending")
                except Exception as e_upd_status:
                    _log(f"[videos] Error actualitzant status a 'UNE-pending': {e_upd_status}")
                # 9) Invoke the TTS Space to generate free_ad.mp3 and
                #    une_ad.mp4 under temp/media/<sha1>/Original
                try:
                    if any_success and sha1:
                        # Fetch the freshest UNE text from
                        # audiodescriptions.db (Salamandra first, then MoE)
                        une_text = ""
                        row_s = get_audiodescription(sha1, "Salamandra")
                        if row_s is not None:
                            try:
                                une_text = (row_s["une_ad"] or "").strip()
                            except Exception:
                                une_text = ""
                        if not une_text:
                            row_m = get_audiodescription(sha1, "MoE")
                            if row_m is not None:
                                try:
                                    une_text = (row_m["une_ad"] or "").strip()
                                except Exception:
                                    une_text = ""
                        if une_text:
                            base_media_dir = Path(__file__).parent.parent / "temp" / "media" / sha1
                            video_path = base_media_dir / "video.mp4"
                            if not video_path.exists():
                                # Make sure we have the media locally
                                try:
                                    ensure_media_for_video(Path(__file__).parent.parent, api, sha1)
                                except Exception as e_em:
                                    _log(f"[TTS] Error assegurant media per al vídeo: {e_em}")
                            if video_path.exists():
                                # Prepare the Original output folder
                                original_dir = base_media_dir / "Original"
                                original_dir.mkdir(parents=True, exist_ok=True)
                                # Write a temp SRT and call the TTS Space (/tts/srt)
                                tts_url = os.getenv("API_TTS_URL", "").strip()
                                if tts_url:
                                    try:
                                        with tempfile.TemporaryDirectory(prefix="tts_srt_") as td:
                                            td_path = Path(td)
                                            srt_tmp = td_path / "ad_input.srt"
                                            srt_tmp.write_text(une_text, encoding="utf-8")
                                            # NOTE(review): these two handles
                                            # are never closed explicitly —
                                            # they stay open until GC.
                                            files = {
                                                "srt": ("ad_input.srt", srt_tmp.open("rb"), "text/plain"),
                                                "video": ("video.mp4", video_path.open("rb"), "video/mp4"),
                                            }
                                            data = {
                                                "voice": "central/grau",
                                                "ad_format": "mp3",
                                                "include_final_mp4": "1",
                                            }
                                            resp = requests.post(
                                                f"{tts_url.rstrip('/')}/tts/srt",
                                                files=files,
                                                data=data,
                                                timeout=300,
                                            )
                                            resp.raise_for_status()
                                            # Response is a ZIP containing ad_master.(mp3|wav),
                                            # the mix and optionally video_con_ad.mp4
                                            zip_bytes = resp.content
                                            with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
                                                for member in zf.infolist():
                                                    name = member.filename
                                                    lower = name.lower()
                                                    if lower.endswith("ad_master.mp3"):
                                                        target = original_dir / "free_ad.mp3"
                                                        with zf.open(member) as src, target.open("wb") as dst:
                                                            shutil.copyfileobj(src, dst)
                                                    elif lower.endswith("video_con_ad.mp4"):
                                                        target = original_dir / "une_ad.mp4"
                                                        with zf.open(member) as src, target.open("wb") as dst:
                                                            shutil.copyfileobj(src, dst)
                                    except Exception as e_tts:
                                        _log(f"[TTS] Error generant assets TTS (free_ad.mp3/une_ad.mp4): {e_tts}")
                                else:
                                    _log("[TTS] API_TTS_URL no configurada; s'omet la generació de free_ad.mp3/une_ad.mp4")
                        else:
                            _log("[TTS] No s'ha trobat text UNE per al vídeo; s'omet la generació TTS")
                except Exception as e_tts_global:
                    _log(f"[TTS] Error global al flux TTS: {e_tts_global}")
                # 10) Record an "AD generated" action in actions.db for
                #     this video
                try:
                    if any_success and sha1:
                        session_id_actions = session_id
                        actions_user, actions_phone = get_latest_user_phone_for_session(session_id_actions)
                        if not actions_user:
                            actions_user = username or ""
                        if not actions_phone:
                            actions_phone = phone or ""
                        insert_action(
                            session=session_id_actions,
                            user=actions_user,
                            phone=actions_phone,
                            action="AD generated",
                            sha1sum=sha1,
                        )
                except Exception as e_act:
                    _log(f"[actions] Error registrant acció 'AD generated': {e_act}")
                if any_success:
                    progress_placeholder.success("✅ Audiodescripció generada i desada. Ara està pendent de validació UNE.")
                    result_placeholder.info("La teva audiodescripció s'està generant i queda pendent de validació. Pots sortir de la sessió guardant els canvis i tornar més endavant per revisar el resultat.")
                else:
                    progress_placeholder.empty()
                    result_placeholder.error("❌ No s'ha pogut generar cap versió d'audiodescripció.")
            except Exception as e:
                progress_placeholder.empty()
                result_placeholder.error(f"❌ Excepció durant la generació: {e}")