"""UI logic for the "Processar vídeo nou" page - Recovered from backup with full functionality.""" from __future__ import annotations import re import shutil import subprocess import os import time import tempfile import hashlib from pathlib import Path import sys from datetime import datetime import yaml import sqlite3 import json import zipfile import io import requests import streamlit as st from PIL import Image, ImageDraw from databases import ( log_action, has_video_approval_action, upsert_audiodescription_text, get_latest_user_phone_for_session, insert_action, ensure_video_row_for_upload, is_video_input_ok, update_video_status, get_audiodescription, ) from compliance_client import compliance_client from persistent_data_gate import ensure_temp_databases, _load_data_origin, ensure_media_for_video def get_all_catalan_names(): """Retorna tots els noms catalans disponibles.""" noms_home = ["Jordi", "Marc", "Pau", "Pere", "Joan", "Josep", "David", "Àlex", "Guillem", "Albert", "Arnau", "Martí", "Bernat", "Oriol", "Roger", "Pol", "Lluís", "Sergi", "Carles", "Xavier"] noms_dona = ["Maria", "Anna", "Laura", "Marta", "Cristina", "Núria", "Montserrat", "Júlia", "Sara", "Carla", "Alba", "Elisabet", "Rosa", "Gemma", "Sílvia", "Teresa", "Irene", "Laia", "Marina", "Bet"] return noms_home, noms_dona def _log(msg: str) -> None: """Helper de logging a stderr amb timestamp (coherent amb auth.py).""" ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") sys.stderr.write(f"[{ts}] {msg}\n") sys.stderr.flush() def get_catalan_name_for_speaker(speaker_label: int, used_names_home: list = None, used_names_dona: list = None) -> str: """Genera un nom català per a un speaker, reutilitzant noms de caras si estan disponibles.""" noms_home, noms_dona = get_all_catalan_names() if used_names_home is None: used_names_home = [] if used_names_dona is None: used_names_dona = [] is_male = (speaker_label % 2 == 0) if is_male: if used_names_home: idx = speaker_label // 2 return used_names_home[idx % 
len(used_names_home)] else: hash_val = hash(f"speaker_{speaker_label}") return noms_home[abs(hash_val) % len(noms_home)] else: if used_names_dona: idx = speaker_label // 2 return used_names_dona[idx % len(used_names_dona)] else: hash_val = hash(f"speaker_{speaker_label}") return noms_dona[abs(hash_val) % len(noms_dona)] def _get_video_duration(path: str) -> float: """Return video duration in seconds using ffprobe, ffmpeg or OpenCV as fallback.""" cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", path, ] try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) return float(result.stdout.strip()) except (subprocess.CalledProcessError, ValueError, FileNotFoundError): pass if shutil.which("ffmpeg"): try: ffmpeg_cmd = ["ffmpeg", "-i", path] result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True, check=False) output = result.stderr or result.stdout or "" match = re.search(r"Duration:\s*(\d+):(\d+):(\d+\.\d+)", output) if match: hours, minutes, seconds = match.groups() total_seconds = (int(hours) * 3600) + (int(minutes) * 60) + float(seconds) return float(total_seconds) except FileNotFoundError: pass # Últim recurs: intentar amb OpenCV si està disponible try: import cv2 cap = cv2.VideoCapture(path) if cap.isOpened(): fps = cap.get(cv2.CAP_PROP_FPS) or 0 frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0 cap.release() if fps > 0 and frame_count > 0: return float(frame_count / fps) else: cap.release() except Exception: pass return 0.0 def _transcode_video(input_path: str, output_path: str, max_duration: int | None = None) -> None: cmd = ["ffmpeg", "-y", "-i", input_path] if max_duration is not None: cmd += ["-t", str(max_duration)] cmd += [ "-c:v", "libx264", "-preset", "veryfast", "-crf", "23", "-c:a", "aac", "-movflags", "+faststart", output_path, ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise 
RuntimeError(result.stderr.strip() or "ffmpeg failed") def render_process_video_page(api, backend_base_url: str) -> None: st.header("Processar un nou clip de vídeo") # Llegir config.yaml (flags d'app i límits de media) base_dir = Path(__file__).parent.parent config_path = base_dir / "config.yaml" manual_validation_enabled = True max_size_mb = 20 max_duration_s = 30 video_validator_sms_enabled = False skip_manual_validation_for_this_video = False try: if config_path.exists(): with config_path.open("r", encoding="utf-8") as f: cfg = yaml.safe_load(f) or {} app_cfg = cfg.get("app", {}) or {} manual_validation_enabled = bool(app_cfg.get("manual_validation_enabled", True)) media_cfg = cfg.get("media", {}) or {} # Límits configurables de mida i durada max_size_mb = int(media_cfg.get("max_size_mb", max_size_mb)) max_duration_s = int(media_cfg.get("max_duration_s", max_duration_s)) # Flags de validació / SMS de validador de vídeo validation_cfg = cfg.get("validation", {}) or {} video_validator_sms_enabled = bool(validation_cfg.get("video_validator_sms_enabled", False)) except Exception: manual_validation_enabled = True # CSS para estabilizar carruseles y evitar vibración del layout st.markdown(""" """, unsafe_allow_html=True) msg_detect = st.empty() msg_finalize = st.empty() msg_ad = st.empty() # Inicializar el estado de la página si no existe if "video_uploaded" not in st.session_state: st.session_state.video_uploaded = None if "characters_detected" not in st.session_state: st.session_state.characters_detected = None if "audio_segments" not in st.session_state: st.session_state.audio_segments = None if "voice_labels" not in st.session_state: st.session_state.voice_labels = None if "face_labels" not in st.session_state: st.session_state.face_labels = None if "scene_clusters" not in st.session_state: st.session_state.scene_clusters = None if "scene_detection_done" not in st.session_state: st.session_state.scene_detection_done = False if "detect_done" not in 
st.session_state: st.session_state.detect_done = False if "casting_finalized" not in st.session_state: st.session_state.casting_finalized = False if "video_name_from_engine" not in st.session_state: st.session_state.video_name_from_engine = None if "diarization_info" not in st.session_state: st.session_state.diarization_info = {} if "characters_saved" not in st.session_state: st.session_state.characters_saved = False if "video_requires_validation" not in st.session_state: st.session_state.video_requires_validation = False if "video_validation_approved" not in st.session_state: st.session_state.video_validation_approved = False # --- 1. Subida del vídeo --- MAX_SIZE_MB = max_size_mb MAX_DURATION_S = max_duration_s # Selector de visibilitat (privat/públic), a la dreta del uploader if "video_visibility" not in st.session_state: st.session_state.video_visibility = "Privat" col_upload, col_vis = st.columns([3, 1]) with col_upload: uploaded_file = st.file_uploader( f"Puja un clip de vídeo (MP4, < {MAX_SIZE_MB}MB, < {MAX_DURATION_S} segons)", type=["mp4"], key="video_uploader", ) with col_vis: disabled_vis = st.session_state.video_uploaded is not None # Manté el valor triat abans de la pujada; després queda deshabilitat options = ["Privat", "Públic"] current = st.session_state.get("video_visibility", "Privat") try: idx = options.index(current) except ValueError: idx = 0 st.selectbox( "Visibilitat", options, index=idx, key="video_visibility", disabled=disabled_vis, ) if uploaded_file is not None: # Resetear el estado si se sube un nuevo archivo if st.session_state.video_uploaded is None or uploaded_file.name != st.session_state.video_uploaded.get( "original_name" ): st.session_state.video_uploaded = {"original_name": uploaded_file.name, "status": "validating"} st.session_state.characters_detected = None st.session_state.characters_saved = False if st.session_state.video_uploaded["status"] == "validating": is_valid = True if uploaded_file.size > MAX_SIZE_MB * 1024 * 1024: 
st.error(f"El vídeo supera el límit de {MAX_SIZE_MB}MB.") is_valid = False if is_valid: with st.spinner("Processant el vídeo..."): temp_path = Path("temp_video.mp4") with temp_path.open("wb") as f: f.write(uploaded_file.getbuffer()) was_truncated = False final_video_path = None try: duration = _get_video_duration(str(temp_path)) duration_unknown = False if not duration: st.warning( f"No s'ha pogut obtenir la durada del vídeo. Es continuarà assumint un màxim de {MAX_DURATION_S} segons." ) duration = float(MAX_DURATION_S) duration_unknown = True if is_valid: if duration > MAX_DURATION_S: was_truncated = True video_name = Path(uploaded_file.name).stem video_dir = Path("/tmp/data/videos") / video_name video_dir.mkdir(parents=True, exist_ok=True) # Guardem sempre el vídeo original com a "video.mp4" dins la carpeta final_video_path = video_dir / "video.mp4" try: _transcode_video( str(temp_path), str(final_video_path), MAX_DURATION_S if (was_truncated or duration_unknown) else None, ) except RuntimeError as exc: st.error(f"No s'ha pogut processar el vídeo: {exc}") is_valid = False if is_valid and final_video_path is not None: video_bytes = uploaded_file.getvalue() sha1 = hashlib.sha1(video_bytes).hexdigest() st.session_state.video_uploaded.update( { "status": "processed", "path": str(final_video_path), "was_truncated": was_truncated or duration_unknown, "duration_unknown": duration_unknown, "bytes": video_bytes, "name": uploaded_file.name, "sha1sum": sha1, } ) # Si el vídeo ja està marcat com input-OK a videos.db, saltar validació try: if is_video_input_ok(sha1): skip_manual_validation_for_this_video = True # Assegurar que disposem de temp/media//video.mp4 base_dir = Path(__file__).parent.parent api_client = st.session_state.get("api_client") try: ensure_media_for_video(base_dir, api_client, sha1) except Exception as e_media: _log(f"[MEDIA] Error assegurant media per a {sha1}: {e_media}") except Exception as e_chk: _log(f"[VIDEOS] Error comprovant status input-OK per a 
{sha1}: {e_chk}") # Registre d'esdeveniment de pujada de vídeo a events.db i accions a actions.db/videos.db try: session_id = st.session_state.get("session_id", "") ip = st.session_state.get("client_ip", "") username = ( (st.session_state.get("user") or {}).get("username") if st.session_state.get("user") else "" ) password = st.session_state.get("last_password", "") phone = ( st.session_state.get("sms_phone_verified") or st.session_state.get("sms_phone") or "" ) vis_choice = st.session_state.get("video_visibility", "Privat") vis_flag = "public" if vis_choice.strip().lower().startswith("púb") else "private" # 1) Registre a actions.db (acció bàsica) log_action( session=session_id, user=username or "", phone=phone, action="upload", sha1sum=sha1, ) # 2) Determinar user/phone per a actions.db actions_user, actions_phone = get_latest_user_phone_for_session(session_id) if not actions_user: actions_user = username or "" if not actions_phone: actions_phone = phone or "" # 3) Inserir acció "Uploaded video" a actions.db (demo/temp/db/actions.db) insert_action( session=session_id, user=actions_user, phone=actions_phone, action="Uploaded video", sha1sum=sha1, ) # 4) Assegurar fila a videos.db (demo/temp/db/videos.db) amb owner i status="input-pending" ensure_video_row_for_upload( sha1sum=sha1, video_name=uploaded_file.name, owner_phone=actions_phone, status="input-pending", visibility=vis_flag, ) except Exception as e: print(f"[events/actions] Error registrant pujada de vídeo: {e}") # Guardar sempre el vídeo a demo/temp/pending_videos//video.mp4 # i, en mode external, enviar-lo també a pending_videos de l'engine try: base_dir = Path(__file__).parent.parent data_origin = _load_data_origin(base_dir) pending_root = base_dir / "temp" / "pending_videos" / sha1 pending_root.mkdir(parents=True, exist_ok=True) local_pending_path = pending_root / "video.mp4" # Guardar còpia local del vídeo pendent with local_pending_path.open("wb") as f_pending: f_pending.write(video_bytes) if 
data_origin == "external": # Enviar el vídeo al backend engine perquè aparegui a la llista de pendents try: resp_pending = api.upload_pending_video(video_bytes, uploaded_file.name) _log(f"[pending_videos] upload_pending_video resp: {resp_pending}") except Exception as e_up: _log(f"[pending_videos] Error cridant upload_pending_video: {e_up}") except Exception as e_ext: _log(f"[pending_videos] Error bloc exterior upload_pending_video: {e_ext}") # Marcar estat de validació segons la configuració de seguretat if manual_validation_enabled and not skip_manual_validation_for_this_video: st.session_state.video_requires_validation = True st.session_state.video_validation_approved = False # Notificar al validador per SMS només si està habilitat a config.yaml if video_validator_sms_enabled: try: compliance_client.notify_video_upload( video_name=uploaded_file.name, sha1sum=sha1, ) except Exception as sms_exc: print(f"[VIDEO SMS] Error enviant notificació al validor: {sms_exc}") else: # Sense validació manual (o ja input-OK): es considera validat automàticament st.session_state.video_requires_validation = False st.session_state.video_validation_approved = True st.rerun() finally: if temp_path.exists(): temp_path.unlink() if st.session_state.video_uploaded and st.session_state.video_uploaded["status"] == "processed": st.success(f"Vídeo '{st.session_state.video_uploaded['original_name']}' pujat i processat correctament.") if st.session_state.video_uploaded["was_truncated"]: st.warning(f"El vídeo s'ha truncat a {MAX_DURATION_S} segons.") if manual_validation_enabled and st.session_state.get("video_requires_validation") and not st.session_state.get("video_validation_approved"): st.info("Per favor, espera a la revisió humana del vídeo.") # Comprovar si hi ha aprovació de vídeo a events.db per al sha1sum actual current_sha1 = None if st.session_state.get("video_uploaded"): current_sha1 = st.session_state.video_uploaded.get("sha1sum") if current_sha1 and 
st.session_state.get("video_requires_validation") and not st.session_state.get("video_validation_approved"): if has_video_approval_action(current_sha1): st.session_state.video_validation_approved = True # Només podem continuar amb el càsting si el vídeo no requereix validació # o si ja ha estat marcat com a validat. can_proceed_casting = ( st.session_state.get("video_uploaded") is not None and ( not st.session_state.get("video_requires_validation") or st.session_state.get("video_validation_approved") ) ) # --- 2. Form de detecció amb sliders --- # Només es mostra quan ja hi ha un vídeo pujat **i** està validat (si cal validació). if can_proceed_casting: st.markdown("---") with st.form("detect_form"): col_btn, col_face, col_voice, col_scene = st.columns([1, 1, 1, 1]) with col_face: st.markdown("**Cares**") face_max_groups = st.slider("k-Target (cares)", 0, 10, 2, 1, key="face_max_groups") face_min_cluster = st.slider("Mida mínima (cares)", 1, 5, 3, 1, key="face_min_cluster") with col_voice: st.markdown("**Veus**") voice_max_groups = st.slider("k-Target (veus)", 0, 10, 2, 1, key="voice_max_groups") voice_min_cluster = st.slider("Mida mínima (veus)", 1, 5, 1, 1, key="voice_min_cluster") with col_scene: st.markdown("**Escenes**") scene_max_groups = st.slider("k-Target (escenes)", 0, 5, 2, 1, key="scene_max_groups") scene_min_cluster = st.slider("Mida mínima (escenes)", 1, 20, 3, 1, key="scene_min_cluster") with col_btn: max_frames = st.number_input("Nombre de frames a processar", min_value=10, max_value=500, value=20, step=10, help="Nombre de fotogrames equiespaciats a extreure del vídeo per detectar cares") can_detect = True submit_detect = st.form_submit_button("Detectar Personatges", disabled=not can_detect) if not can_detect: st.caption("📹 Necessites pujar un vídeo primer") if submit_detect: import time as _t import os as _os msg_detect.empty() msg_finalize.empty() msg_ad.empty() try: v = st.session_state.video_uploaded # Reset estat abans de començar 
st.session_state.scene_clusters = None st.session_state.scene_detection_done = False st.session_state.detect_done = False st.session_state.casting_finalized = False _log(f"[DETECT] Iniciando detección para vídeo: {v['name']}") _log(f"[DETECT] Parámetros: face_k={face_max_groups}, face_min={face_min_cluster}, max_frames={max_frames}") resp = api.create_initial_casting( video_bytes=v["bytes"], video_name=v["name"], face_max_groups=face_max_groups, face_min_cluster_size=face_min_cluster, voice_max_groups=voice_max_groups, voice_min_cluster_size=voice_min_cluster, max_frames=max_frames, ) _log(f"[DETECT] Respuesta create_initial_casting: {resp}") if not isinstance(resp, dict) or not resp.get("job_id"): _log(f"[DETECT] ERROR: No se recibió job_id válido") msg_detect.error("No s'ha pogut crear el job al servidor. Torna-ho a intentar.") else: job_id = resp["job_id"] _log(f"[DETECT] Job creado: {job_id}") msg_detect.info(f"Job creat: {job_id}. Iniciant polling en 3s…") with st.spinner("Processant al servidor…"): _t.sleep(3) attempt, max_attempts = 0, 120 progress_placeholder = st.empty() while attempt < max_attempts: stt = api.get_job(job_id) status = stt.get("status") if status in ("queued", "processing"): if attempt % 10 == 0: elapsed_min = (attempt * 5) // 60 progress_placeholder.info(f"⏳ Processant al servidor... (~{elapsed_min} min)") _t.sleep(5) attempt += 1 continue if status == "failed": progress_placeholder.empty() msg_detect.error("El processament ha fallat al servidor.") break # Success _log(f"[DETECT] Job completado. 
Status raw: {stt}") res = stt.get("results", {}) _log(f"[DETECT] Results keys: {res.keys() if res else 'None'}") chars = res.get("characters", []) fl = res.get("face_labels", []) segs = res.get("audio_segments", []) vl = res.get("voice_labels", []) base_dir = res.get("base_dir") vname = _os.path.basename(base_dir) if base_dir else None diar_info = res.get("diarization_info", {}) _log(f"[DETECT] Parsed: chars={len(chars)}, face_labels={len(fl)}, audio_segs={len(segs)}, voice_labels={len(vl)}") if chars: for i, c in enumerate(chars): _log(f"[DETECT] Char[{i}]: id={c.get('id')}, num_faces={c.get('num_faces')}, files={c.get('face_files', [])[:3]}") st.session_state.characters_detected = chars or [] st.session_state.face_labels = fl or [] st.session_state.audio_segments = segs or [] st.session_state.voice_labels = vl or [] st.session_state.video_name_from_engine = vname st.session_state.engine_base_dir = base_dir st.session_state.diarization_info = diar_info or {} progress_placeholder.empty() if chars: msg_detect.success( f"✓ Detecció completada! Trobades {len(chars)} cares.\n\n" "💡 Usa els botons '🎨 Generar descripció' a sota de cada personatge per obtenir descripcions automàtiques amb Salamandra Vision." 
) else: msg_detect.info("No s'han detectat cares en aquest vídeo.") # Detect scenes try: scene_out = api.detect_scenes( video_bytes=v["bytes"], video_name=v["name"], max_groups=scene_max_groups, min_cluster_size=scene_min_cluster, frame_interval_sec=0.5, max_frames=max_frames, ) scs = scene_out.get("scene_clusters") if isinstance(scene_out, dict) else None if isinstance(scs, list): st.session_state.scene_clusters = scs else: st.session_state.scene_clusters = [] except Exception: st.session_state.scene_clusters = [] finally: st.session_state.scene_detection_done = True st.session_state.detect_done = True msg_detect.success("✅ Processament completat!") break else: progress_placeholder.empty() msg_detect.warning(f"⏱️ El servidor no ha completat el job en {max_attempts * 5 // 60} minuts.") except Exception as e: msg_detect.error(f"Error inesperat: {e}") # Botó per actualitzar manualment l'estat de validació del vídeo # Només es mostra mentre el vídeo està pendent de validació humana if ( st.session_state.get("video_uploaded") and st.session_state.get("video_requires_validation") and not st.session_state.get("video_validation_approved") ): col_status, col_refresh = st.columns([3, 1]) with col_status: st.caption("⏳ Vídeo pendent de validació humana.") with col_refresh: if st.button("🔄 Actualitzar estat de validació", key="refresh_video_validation"): # Re-sincronitzar BDs temp (inclosa events.db) des de l'origen try: base_dir = Path(__file__).parent.parent api_client = st.session_state.get("api_client") ensure_temp_databases(base_dir, api_client) except Exception: pass if current_sha1: if has_video_approval_action(current_sha1): st.session_state.video_validation_approved = True st.success("✅ Vídeo validat. Pots continuar amb el càsting.") else: st.info("Encara no s'ha registrat cap aprovació per a aquest vídeo.") # --- 3. 
Carruseles de cares --- if st.session_state.get("characters_detected") is not None: st.markdown("---") n_face_clusters = len(st.session_state.get("characters_detected") or []) st.subheader(f"🖼️ Cares — clústers: {n_face_clusters}") if n_face_clusters == 0: st.info("No s'han detectat clústers de cara en aquest clip.") for idx, ch in enumerate(st.session_state.characters_detected or []): try: folder_name = Path(ch.get("folder") or "").name except Exception: folder_name = "" char_id = ch.get("id") or folder_name or f"char{idx+1}" def _safe_key(s: str) -> str: k = re.sub(r"[^0-9a-zA-Z_]+", "_", s or "") return k or f"cluster_{idx+1}" key_prefix = _safe_key(f"char_{idx+1}_{char_id}") if f"{key_prefix}_idx" not in st.session_state: st.session_state[f"{key_prefix}_idx"] = 0 if f"{key_prefix}_discard" not in st.session_state: st.session_state[f"{key_prefix}_discard"] = set() faces_all = ch.get("face_files") or ([ch.get("image_url")] if ch.get("image_url") else []) faces_all = [f for f in faces_all if f] discard_set = st.session_state[f"{key_prefix}_discard"] faces = [f for f in faces_all if f not in discard_set] if not faces: st.write(f"- {idx+1}. {ch.get('name','(sense nom)')} — sense imatges seleccionades") continue cur = st.session_state[f"{key_prefix}_idx"] if cur >= len(faces): cur = 0 st.session_state[f"{key_prefix}_idx"] = cur fname = faces[cur] if fname.startswith("/files/"): img_url = f"{backend_base_url}/preprocessing{fname}" else: base = ch.get("image_url") or "" base_dir = "/".join((base or "/").split("/")[:-1]) img_url = f"{backend_base_url}/preprocessing{base_dir}/{fname}" if base_dir else f"{backend_base_url}/preprocessing/{fname}" st.markdown(f"**{idx+1}. 
{ch.get('name','(sense nom)')} — {ch.get('num_faces', 0)} cares**") spacer_col, main_content_col = st.columns([0.12, 0.88]) with spacer_col: st.write("") with main_content_col: media_col, form_col = st.columns([1.3, 2.7]) with media_col: st.image(img_url, width=180) st.caption(f"Imatge {cur+1}/{len(faces)}") nav_prev, nav_del, nav_next = st.columns(3) with nav_prev: if st.button("⬅️", key=f"prev_{key_prefix}", help="Anterior"): st.session_state[f"{key_prefix}_idx"] = (cur - 1) % len(faces) st.rerun() with nav_del: if st.button("🗑️", key=f"del_{key_prefix}", help="Eliminar aquesta imatge del clúster"): st.session_state[f"{key_prefix}_discard"].add(fname) new_list = [f for f in faces if f != fname] new_idx = cur if cur < len(new_list) else 0 st.session_state[f"{key_prefix}_idx"] = new_idx st.rerun() with nav_next: if st.button("➡️", key=f"next_{key_prefix}", help="Següent"): st.session_state[f"{key_prefix}_idx"] = (cur + 1) % len(faces) st.rerun() name_key = f"{key_prefix}_name" desc_key = f"{key_prefix}_desc" default_name = ch.get("name", "") default_desc = ch.get("description", "") if default_name and (name_key not in st.session_state or not st.session_state.get(name_key)): st.session_state[name_key] = default_name elif name_key not in st.session_state: st.session_state[name_key] = default_name or "" if default_desc and (desc_key not in st.session_state or not st.session_state.get(desc_key)): st.session_state[desc_key] = default_desc elif desc_key not in st.session_state: st.session_state[desc_key] = default_desc or "" pending_desc_key = f"{key_prefix}_pending_desc" pending_name_key = f"{key_prefix}_pending_name" if pending_desc_key in st.session_state: if desc_key not in st.session_state: st.session_state[desc_key] = "" st.session_state[desc_key] = st.session_state[pending_desc_key] del st.session_state[pending_desc_key] if pending_name_key in st.session_state: if name_key not in st.session_state: st.session_state[name_key] = "" if not 
st.session_state.get(name_key): st.session_state[name_key] = st.session_state[pending_name_key] del st.session_state[pending_name_key] with form_col: st.text_input("Nom del clúster", key=name_key) st.text_area("Descripció", key=desc_key, height=80) if st.button("🎨 Generar descripció amb Salamandra Vision", key=f"svision_{key_prefix}"): with st.spinner("Generant descripció..."): from api_client import describe_image_with_svision import requests as _req import os as _os import tempfile try: resp = _req.get(img_url, timeout=10) if resp.status_code == 200: with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp: tmp.write(resp.content) tmp_path = tmp.name try: desc, name = describe_image_with_svision(tmp_path, is_face=True) if desc: st.session_state[pending_desc_key] = desc st.success("✅ Descripció generada!") print(f"[SVISION] Descripció generada per {char_id}: {desc[:100]}") else: st.warning("⚠️ No s'ha pogut generar una descripció.") print(f"[SVISION] Descripció buida per {char_id}") if name and not st.session_state.get(name_key): st.session_state[pending_name_key] = name print(f"[SVISION] Nom generat per {char_id}: {name}") finally: # Always clean up the temp file try: _os.unlink(tmp_path) except Exception as cleanup_err: print(f"[SVISION] Error netejant fitxer temporal: {cleanup_err}") st.rerun() else: st.error(f"No s'ha pogut descarregar la imatge (status: {resp.status_code})") except Exception as e: st.error(f"Error generant descripció: {str(e)}") print(f"[SVISION] Error complet: {e}") import traceback traceback.print_exc() # --- 4. 
Carruseles de veus --- if st.session_state.get("audio_segments") is not None: st.markdown("---") used_names_home = [] used_names_dona = [] noms_home_all, noms_dona_all = get_all_catalan_names() for ch in (st.session_state.characters_detected or []): ch_name = ch.get("name", "") if ch_name in noms_home_all: used_names_home.append(ch_name) elif ch_name in noms_dona_all: used_names_dona.append(ch_name) segs = st.session_state.audio_segments or [] vlabels = st.session_state.voice_labels or [] valid_indices = [i for i, l in enumerate(vlabels) if isinstance(l, int) and l >= 0] clusters = {} for i in valid_indices: lbl = int(vlabels[i]) clusters.setdefault(lbl, []).append(i) n_vclusters = len(clusters) st.subheader(f"🎙️ Empremtes de veu — clústers: {n_vclusters}") di = st.session_state.get("diarization_info") or {} if isinstance(di, dict) and not di.get("diarization_ok", True): st.warning("No s'ha pogut fer la diarització amb pyannote (s'ha aplicat un sol segment de reserva).") if not segs: st.info("No s'han detectat mostres de veu.") elif n_vclusters == 0: st.info("No s'han format clústers de veu.") else: vname = st.session_state.video_name_from_engine for lbl, idxs in sorted(clusters.items(), key=lambda x: x[0]): key_prefix = f"voice_{lbl:02d}" if f"{key_prefix}_idx" not in st.session_state: st.session_state[f"{key_prefix}_idx"] = 0 if f"{key_prefix}_discard" not in st.session_state: st.session_state[f"{key_prefix}_discard"] = set() discard_set = st.session_state[f"{key_prefix}_discard"] files = [] for i in idxs: clip_local = (segs[i] or {}).get("clip_path") fname = os.path.basename(clip_local) if clip_local else None if fname: files.append(fname) files = [f for f in files if f and f not in discard_set] if not files: st.write(f"- SPEAKER_{lbl:02d} — sense clips seleccionats") continue cur = st.session_state[f"{key_prefix}_idx"] if cur >= len(files): cur = 0 st.session_state[f"{key_prefix}_idx"] = cur fname = files[cur] audio_url = 
f"{backend_base_url}/preprocessing/audio/{vname}/{fname}" if (vname and fname) else None st.markdown(f"**SPEAKER_{lbl:02d} — {len(files)} clips**") c1, c2 = st.columns([1, 2]) with c1: if audio_url: st.audio(audio_url, format="audio/wav") st.caption(f"Clip {cur+1}/{len(files)}") bcol1, bcol2, bcol3 = st.columns(3) with bcol1: if st.button("⬅️", key=f"prev_{key_prefix}", help="Anterior"): st.session_state[f"{key_prefix}_idx"] = (cur - 1) % len(files) st.rerun() with bcol2: if st.button("🗑️", key=f"del_{key_prefix}", help="Eliminar aquest clip del clúster"): st.session_state[f"{key_prefix}_discard"].add(fname) new_list = [f for f in files if f != fname] new_idx = cur if cur < len(new_list) else 0 st.session_state[f"{key_prefix}_idx"] = new_idx st.rerun() with bcol3: if st.button("➡️", key=f"next_{key_prefix}", help="Següent"): st.session_state[f"{key_prefix}_idx"] = (cur + 1) % len(files) st.rerun() with c2: name_key = f"{key_prefix}_name" desc_key = f"{key_prefix}_desc" default_name = get_catalan_name_for_speaker(lbl, used_names_home, used_names_dona) st.text_input("Nom del clúster", value=st.session_state.get(name_key, default_name), key=name_key) st.text_area("Descripció", value=st.session_state.get(desc_key, ""), key=desc_key, height=80) # --- 5. 
# Scene carousels ---
    # --- 5. Scene carousels ---
    # Shown once scene detection has completed; one carousel per scene cluster,
    # with per-image discard, editable name/description, and optional
    # Salamandra Vision auto-description.
    if st.session_state.get("scene_detection_done"):
        st.markdown("---")
        scene_clusters = st.session_state.get("scene_clusters")
        n_scenes = len(scene_clusters or [])
        st.subheader(f"📍 Escenes — clústers: {n_scenes}")
        if not scene_clusters:
            st.info("No s'han detectat clústers d'escenes en aquest clip.")
        else:
            for sidx, sc in enumerate(scene_clusters):
                # Derive a stable id for this cluster: explicit id, else folder
                # basename, else a positional fallback.
                try:
                    folder_name = Path(sc.get("folder") or "").name
                except Exception:
                    folder_name = ""
                scene_id = sc.get("id") or folder_name or f"scene{sidx+1}"
                # Sanitize into a session_state-safe key prefix (alnum + underscore).
                key_prefix = re.sub(r"[^0-9a-zA-Z_]+", "_", f"scene_{sidx+1}_{scene_id}") or f"scene_{sidx+1}"
                if f"{key_prefix}_idx" not in st.session_state:
                    st.session_state[f"{key_prefix}_idx"] = 0
                if f"{key_prefix}_discard" not in st.session_state:
                    st.session_state[f"{key_prefix}_discard"] = set()
                # Frame list: explicit frame_files, else fall back to the single image_url.
                frames_all = sc.get("frame_files") or ([sc.get("image_url")] if sc.get("image_url") else [])
                frames_all = [f for f in frames_all if f]
                discard_set = st.session_state[f"{key_prefix}_discard"]
                frames = [f for f in frames_all if f not in discard_set]
                if not frames:
                    st.write(f"- {sidx+1}. (sense imatges de l'escena)")
                    continue
                cur = st.session_state[f"{key_prefix}_idx"]
                # Clamp the carousel index if discards shrank the list.
                if cur >= len(frames):
                    cur = 0
                    st.session_state[f"{key_prefix}_idx"] = cur
                fname = frames[cur]
                # Build the preview URL: absolute "/files/..." paths are served
                # directly; otherwise resolve relative to the cluster's image_url dir.
                if str(fname).startswith("/files/"):
                    img_url = f"{backend_base_url}/preprocessing{fname}"
                else:
                    base = sc.get("image_url") or ""
                    base_dir = "/".join((base or "/").split("/")[:-1])
                    img_url = f"{backend_base_url}/preprocessing{base_dir}/{fname}" if base_dir else f"{backend_base_url}/preprocessing/{fname}"
                st.markdown(f"**{sidx+1}. Escena — {sc.get('num_frames', 0)} frames**")
                spacer_col, main_content_col = st.columns([0.12, 0.88])
                with spacer_col:
                    st.write("")
                with main_content_col:
                    media_col, form_col = st.columns([1.4, 2.6])
                    with media_col:
                        st.image(img_url, width=220)
                        st.caption(f"Imatge {cur+1}/{len(frames)}")
                        nav_prev, nav_del, nav_next = st.columns(3)
                        with nav_prev:
                            if st.button("⬅️", key=f"prev_{key_prefix}", help="Anterior"):
                                st.session_state[f"{key_prefix}_idx"] = (cur - 1) % len(frames)
                                st.rerun()
                        with nav_del:
                            if st.button("🗑️", key=f"del_{key_prefix}", help="Eliminar aquesta imatge del clúster"):
                                # Discard is session-local only: the frame is hidden from
                                # the carousel, not deleted on the backend.
                                st.session_state[f"{key_prefix}_discard"].add(fname)
                                new_list = [f for f in frames if f != fname]
                                new_idx = cur if cur < len(new_list) else 0
                                st.session_state[f"{key_prefix}_idx"] = new_idx
                                st.rerun()
                        with nav_next:
                            if st.button("➡️", key=f"next_{key_prefix}", help="Següent"):
                                st.session_state[f"{key_prefix}_idx"] = (cur + 1) % len(frames)
                                st.rerun()
                    # Seed the editable name/description widgets from the cluster data,
                    # but never overwrite a non-empty value the user already typed.
                    name_key = f"{key_prefix}_name"
                    desc_key = f"{key_prefix}_desc"
                    default_scene_name = sc.get("name", "")
                    default_scene_desc = sc.get("description", "")
                    if default_scene_name and (name_key not in st.session_state or not st.session_state.get(name_key)):
                        st.session_state[name_key] = default_scene_name
                    elif name_key not in st.session_state:
                        st.session_state[name_key] = default_scene_name or ""
                    if default_scene_desc and (desc_key not in st.session_state or not st.session_state.get(desc_key)):
                        st.session_state[desc_key] = default_scene_desc
                    elif desc_key not in st.session_state:
                        st.session_state[desc_key] = default_scene_desc or ""
                    # "Pending" keys are written by the Salamandra Vision button below and
                    # applied here on the next rerun (widgets can't be mutated after render).
                    pending_desc_key = f"{key_prefix}_pending_desc"
                    pending_name_key = f"{key_prefix}_pending_name"
                    if pending_desc_key in st.session_state:
                        if desc_key not in st.session_state:
                            st.session_state[desc_key] = ""
                        st.session_state[desc_key] = st.session_state[pending_desc_key]
                        del st.session_state[pending_desc_key]
                    if pending_name_key in st.session_state:
                        if name_key not in st.session_state:
                            st.session_state[name_key] = ""
                        # Unlike the description, a user-entered name is not overwritten.
                        if not st.session_state.get(name_key):
                            st.session_state[name_key] = st.session_state[pending_name_key]
                        del st.session_state[pending_name_key]
                    with form_col:
                        st.text_input("Nom del clúster", key=name_key)
                        st.text_area("Descripció", key=desc_key, height=80)
                        if st.button("🎨 Generar descripció amb Salamandra Vision", key=f"svision_{key_prefix}"):
                            with st.spinner("Generant descripció..."):
                                # Local imports keep the page import-light until the
                                # feature is actually used.
                                from api_client import describe_image_with_svision, generate_short_scene_name
                                import requests as _req
                                import os as _os
                                import tempfile
                                try:
                                    # Download the current frame to a temp file so the
                                    # vision model can read it from disk.
                                    resp = _req.get(img_url, timeout=10)
                                    if resp.status_code == 200:
                                        with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
                                            tmp.write(resp.content)
                                            tmp_path = tmp.name
                                        try:
                                            desc, name = describe_image_with_svision(tmp_path, is_face=False)
                                            if desc:
                                                # Stash results under "pending" keys; applied above on rerun.
                                                st.session_state[pending_desc_key] = desc
                                                print(f"[SVISION] Descripció d'escena generada per {scene_id}: {desc[:100]}")
                                                try:
                                                    # Prefer a short generated name; fall back
                                                    # to the vision model's own name.
                                                    short_name = generate_short_scene_name(desc)
                                                    if short_name:
                                                        st.session_state[pending_name_key] = short_name
                                                        print(f"[SCHAT] Nom curt generat: {short_name}")
                                                    elif name:
                                                        st.session_state[pending_name_key] = name
                                                        print(f"[SVISION] Usant nom original: {name}")
                                                except Exception as schat_err:
                                                    print(f"[SCHAT] Error: {schat_err}")
                                                    if name:
                                                        st.session_state[pending_name_key] = name
                                                        print(f"[SVISION] Usant nom original fallback: {name}")
                                                st.success("✅ Descripció i nom generats!")
                                            else:
                                                st.warning("⚠️ No s'ha pogut generar una descripció.")
                                                print(f"[SVISION] Descripció d'escena buida per {scene_id}")
                                        finally:
                                            # Always clean up the temp file
                                            try:
                                                _os.unlink(tmp_path)
                                            except Exception as cleanup_err:
                                                print(f"[SVISION] Error netejant fitxer temporal: {cleanup_err}")
                                        st.rerun()
                                    else:
                                        st.error(f"No s'ha pogut descarregar la imatge (status: {resp.status_code})")
                                except Exception as e:
                                    st.error(f"Error generant descripció: {str(e)}")
                                    print(f"[SVISION] Error complet: {e}")
                                    import traceback
                                    traceback.print_exc()
    # --- 6. Casting confirmation and combined characters ---
    if st.session_state.get("detect_done"):
        st.markdown("---")
        # NOTE(review): colc2 is created but never used below — presumably reserved
        # for a second action column; confirm before removing.
        colc1, colc2 = st.columns([1, 1])
        with colc1:
            if st.button("Confirmar càsting definitiu", type="primary"):
                # Build the characters payload from the detected clusters plus any
                # user edits stored in session_state under the per-cluster key prefix.
                chars_payload = []
                for idx, ch in enumerate(st.session_state.characters_detected or []):
                    try:
                        folder_name = Path(ch.get("folder") or "").name
                    except Exception:
                        folder_name = ""
                    char_id = ch.get("id") or folder_name or f"char{idx+1}"

                    def _safe_key(s: str) -> str:
                        # Sanitize an arbitrary string into a session_state-safe key.
                        k = re.sub(r"[^0-9a-zA-Z_]+", "_", s or "")
                        return k or f"cluster_{idx+1}"

                    key_prefix = _safe_key(f"char_{idx+1}_{char_id}")
                    name = st.session_state.get(f"{key_prefix}_name") or ch.get("name") or f"Personatge {idx+1}"
                    desc = st.session_state.get(f"{key_prefix}_desc", "")
                    faces_all = ch.get("face_files") or []
                    discard = st.session_state.get(f"{key_prefix}_discard", set())
                    # Only keep face crops the user did not discard in the carousel.
                    kept = [f for f in faces_all if f and f not in discard]
                    chars_payload.append({
                        "id": char_id,
                        "name": name,
                        "description": desc,
                        "folder": ch.get("folder"),
                        "kept_files": kept,
                    })
                # Collect which Catalan names are already used by faces, so voice
                # naming can reuse them (gender-bucketed).
                used_names_home_fin = []
                used_names_dona_fin = []
                noms_home_all, noms_dona_all = get_all_catalan_names()
                for cp in chars_payload:
                    face_name = cp.get("name", "")
                    if face_name in noms_home_all:
                        used_names_home_fin.append(face_name)
                    elif face_name in noms_dona_all:
                        used_names_dona_fin.append(face_name)
                segs = st.session_state.audio_segments or []
                vlabels = st.session_state.voice_labels or []
                vname = st.session_state.video_name_from_engine
                voice_clusters = {}
                for i, seg in enumerate(segs):
                    lbl = vlabels[i] if i < len(vlabels) else -1
                    # Only consider voice clusters with a valid label (integer >= 0)
                    if not (isinstance(lbl, int) and lbl >= 0):
                        continue
                    clip_local = seg.get("clip_path")
                    fname = os.path.basename(clip_local) if clip_local else None
                    if fname:
                        default_voice_name = get_catalan_name_for_speaker(int(lbl), used_names_home_fin, used_names_dona_fin)
                        voice_clusters.setdefault(lbl, {"label": lbl, "name": default_voice_name, "description": "", "clips": []})
                        vpref = f"voice_{int(lbl):02d}"
                        # User overrides from the voice-cluster widgets, if any.
                        vname_custom = st.session_state.get(f"{vpref}_name")
                        vdesc_custom = st.session_state.get(f"{vpref}_desc")
                        if vname_custom:
                            voice_clusters[lbl]["name"] = vname_custom
                        if vdesc_custom is not None:
                            voice_clusters[lbl]["description"] = vdesc_custom
                        voice_clusters[lbl]["clips"].append(fname)
                payload = {
                    "video_name": vname,
                    "base_dir": st.session_state.get("engine_base_dir"),
                    "characters": chars_payload,
                    "voice_clusters": list(voice_clusters.values()),
                }
                if not payload["video_name"] or not payload["base_dir"]:
                    st.error("Falten dades del vídeo per confirmar el càsting (video_name/base_dir). Torna a processar el vídeo.")
                else:
                    with st.spinner("Consolidant càsting al servidor…"):
                        res_fc = api.finalize_casting(payload)
                        if isinstance(res_fc, dict) and res_fc.get("ok"):
                            st.success(f"Càsting consolidat. Identities: {len(res_fc.get('face_identities', []))} cares, {len(res_fc.get('voice_identities', []))} veus.")
                            st.session_state.casting_finalized = True
                            # Save casting_json locally for later stages (e.g. audio description)
                            try:
                                casting_json = res_fc.get("casting_json") or {}
                                v = st.session_state.get("video_uploaded") or {}
                                sha1 = v.get("sha1sum")
                                if casting_json and sha1:
                                    base_dir = Path(__file__).parent.parent / "temp" / "media" / sha1
                                    base_dir.mkdir(parents=True, exist_ok=True)
                                    casting_path = base_dir / "casting.json"
                                    with casting_path.open("w", encoding="utf-8") as f:
                                        json.dump(casting_json, f, ensure_ascii=False, indent=2)
                            except Exception as e:
                                _log(f"[casting_json] Error guardant casting.json: {e}")
                            f_id = res_fc.get('face_identities', []) or []
                            v_id = res_fc.get('voice_identities', []) or []
                            c3, c4 = st.columns(2)
                            with c3:
                                st.markdown("**Identitats de cara**")
                                for n in f_id:
                                    st.write(f"- {n}")
                            with c4:
                                st.markdown("**Identitats de veu**")
                                for n in v_id:
                                    st.write(f"- {n}")
                            faces_dir = res_fc.get('faces_dir')
                            voices_dir = res_fc.get('voices_dir')
                            db_dir = res_fc.get('db_dir')
                            # Load the consolidated indexes into the Chroma search backend.
                            with st.spinner("Carregant índexs al cercador (Chroma)…"):
                                load_res = api.load_casting(faces_dir=faces_dir, voices_dir=voices_dir, db_dir=db_dir, drop_collections=True)
                                if isinstance(load_res, dict) and load_res.get('ok'):
                                    st.success(f"Índexs carregats: {load_res.get('faces', 0)} cares, {load_res.get('voices', 0)} veus.")
                                else:
                                    st.error(f"Error carregant índexs: {load_res}")
                        else:
                            # Specific handling for 404 (endpoint missing on the engine)
                            if isinstance(res_fc, dict) and res_fc.get("status_code") == 404:
                                st.error(
                                    "No s'ha pogut consolidar el càsting perquè l'endpoint "
                                    "\"/finalize_casting\" no està disponible al servidor d'engine. "
                                    "Aquesta funcionalitat encara no està implementada o està desactivada."
                                )
                            else:
                                st.error("No s'ha pogut consolidar el càsting per un error al servidor.")
    # --- Combined characters (faces + voices) ---
    # Merges face clusters and voice clusters that share the same (accent- and
    # case-insensitive) name into a single "character" card.
    if st.session_state.get("casting_finalized"):
        st.markdown("---")
        st.subheader("👥 Personatges")

        def normalize_name(name: str) -> str:
            """Uppercase and strip diacritics so e.g. 'Núria' matches 'NURIA'."""
            import unicodedata
            name_upper = name.upper()
            name_normalized = ''.join(
                c for c in unicodedata.normalize('NFD', name_upper)
                if unicodedata.category(c) != 'Mn'
            )
            return name_normalized

        chars_payload = []
        for idx, ch in enumerate(st.session_state.characters_detected or []):
            try:
                folder_name = Path(ch.get("folder") or "").name
            except Exception:
                folder_name = ""
            char_id = ch.get("id") or folder_name or f"char{idx+1}"

            def _safe_key(s: str) -> str:
                # Sanitize an arbitrary string into a session_state-safe key.
                k = re.sub(r"[^0-9a-zA-Z_]+", "_", s or "")
                return k or f"cluster_{idx+1}"

            key_prefix = _safe_key(f"char_{idx+1}_{char_id}")
            name = st.session_state.get(f"{key_prefix}_name") or ch.get("name") or f"Personatge {idx+1}"
            name_normalized = normalize_name(name)
            desc = st.session_state.get(f"{key_prefix}_desc", "").strip()
            chars_payload.append({
                "name": name,
                "name_normalized": name_normalized,
                "face_key_prefix": key_prefix,
                "face_files": ch.get("face_files") or [],
                "char_data": ch,
                "description": desc,
            })
        # Same face-name bookkeeping as in the confirmation flow above.
        used_names_home_pers = []
        used_names_dona_pers = []
        noms_home_all, noms_dona_all = get_all_catalan_names()
        for cp in chars_payload:
            face_name = cp.get("name", "")
            if face_name in noms_home_all:
                used_names_home_pers.append(face_name)
            elif face_name in noms_dona_all:
                used_names_dona_pers.append(face_name)
        segs = st.session_state.audio_segments or []
        vlabels = st.session_state.voice_labels or []
        vname = st.session_state.video_name_from_engine
        voice_clusters_by_name = {}
        for i, seg in enumerate(segs):
            lbl = vlabels[i] if i < len(vlabels) else -1
            if not (isinstance(lbl, int) and lbl >= 0):
                continue
            vpref = f"voice_{int(lbl):02d}"
            # NOTE(review): the ternary's else-arm is unreachable — the `continue`
            # above already guarantees lbl is an int >= 0.
            default_voice_name = get_catalan_name_for_speaker(int(lbl), used_names_home_pers, used_names_dona_pers) if isinstance(lbl, int) and lbl >= 0 else f"SPEAKER_{int(lbl):02d}"
            vname_custom = st.session_state.get(f"{vpref}_name") or default_voice_name
            vname_normalized = normalize_name(vname_custom)
            vdesc = st.session_state.get(f"{vpref}_desc", "").strip()
            clip_local = seg.get("clip_path")
            fname = os.path.basename(clip_local) if clip_local else None
            if fname:
                voice_clusters_by_name.setdefault(vname_normalized, {
                    "voice_key_prefix": vpref,
                    "clips": [],
                    "label": lbl,
                    "original_name": vname_custom,
                    "description": vdesc,
                })
                voice_clusters_by_name[vname_normalized]["clips"].append(fname)
        # Union of all character names (from faces and voices), rendered sorted.
        all_normalized_names = set([c["name_normalized"] for c in chars_payload] + list(voice_clusters_by_name.keys()))
        for pidx, norm_name in enumerate(sorted(all_normalized_names)):
            face_items = [c for c in chars_payload if c["name_normalized"] == norm_name]
            voice_data = voice_clusters_by_name.get(norm_name)
            # Display name preference: face name, then the voice's original name.
            display_name = face_items[0]["name"] if face_items else (voice_data["original_name"] if voice_data else norm_name)
            descriptions = []
            for face_item in face_items:
                if face_item["description"]:
                    descriptions.append(face_item["description"])
            if voice_data and voice_data.get("description"):
                descriptions.append(voice_data["description"])
            combined_description = "\n".join(descriptions) if descriptions else ""
            st.markdown(f"**{pidx+1}. {display_name}**")
            all_faces = []
            for face_item in face_items:
                all_faces.extend(face_item["face_files"])
            face_data = face_items[0] if face_items else None
            col_faces, col_voices, col_text = st.columns([1, 1, 1.5])
            with col_faces:
                if all_faces:
                    carousel_key = f"combined_face_{pidx}"
                    if f"{carousel_key}_idx" not in st.session_state:
                        st.session_state[f"{carousel_key}_idx"] = 0
                    cur = st.session_state[f"{carousel_key}_idx"]
                    if cur >= len(all_faces):
                        cur = 0
                        st.session_state[f"{carousel_key}_idx"] = cur
                    fname = all_faces[cur]
                    ch = face_data["char_data"] if face_data else {}
                    # Same URL-resolution scheme as the scene carousel above.
                    if fname.startswith("/files/"):
                        img_url = f"{backend_base_url}/preprocessing{fname}"
                    else:
                        base = ch.get("image_url") or ""
                        base_dir = "/".join((base or "/").split("/")[:-1])
                        img_url = f"{backend_base_url}/preprocessing{base_dir}/{fname}" if base_dir else f"{backend_base_url}/preprocessing/{fname}"
                    st.image(img_url, width=150)
                    st.caption(f"Cara {cur+1}/{len(all_faces)}")
                    bcol1, bcol2 = st.columns(2)
                    with bcol1:
                        if st.button("⬅️", key=f"combined_face_prev_{pidx}"):
                            st.session_state[f"{carousel_key}_idx"] = (cur - 1) % len(all_faces)
                            st.rerun()
                    with bcol2:
                        if st.button("➡️", key=f"combined_face_next_{pidx}"):
                            st.session_state[f"{carousel_key}_idx"] = (cur + 1) % len(all_faces)
                            st.rerun()
                else:
                    st.info("Sense imatges")
            with col_voices:
                if voice_data:
                    clips = voice_data["clips"]
                    if clips:
                        carousel_key = f"combined_voice_{pidx}"
                        if f"{carousel_key}_idx" not in st.session_state:
                            st.session_state[f"{carousel_key}_idx"] = 0
                        cur = st.session_state[f"{carousel_key}_idx"]
                        if cur >= len(clips):
                            cur = 0
                            st.session_state[f"{carousel_key}_idx"] = cur
                        fname = clips[cur]
                        audio_url = f"{backend_base_url}/preprocessing/audio/{vname}/{fname}" if (vname and fname) else None
                        if audio_url:
                            st.audio(audio_url, format="audio/wav")
                        st.caption(f"Veu {cur+1}/{len(clips)}")
                        bcol1, bcol2 = st.columns(2)
                        with bcol1:
                            if st.button("⬅️", key=f"combined_voice_prev_{pidx}"):
                                st.session_state[f"{carousel_key}_idx"] = (cur - 1) % len(clips)
                                st.rerun()
                        with bcol2:
                            if st.button("➡️", key=f"combined_voice_next_{pidx}"):
                                st.session_state[f"{carousel_key}_idx"] = (cur + 1) % len(clips)
                                st.rerun()
                    else:
                        st.info("Sense clips de veu")
                else:
                    st.info("Sense dades de veu")
            with col_text:
                combined_name_key = f"combined_char_{pidx}_name"
                combined_desc_key = f"combined_char_{pidx}_desc"
                if combined_name_key not in st.session_state:
                    st.session_state[combined_name_key] = norm_name
                if combined_desc_key not in st.session_state:
                    st.session_state[combined_desc_key] = combined_description
                st.text_input("Nom del personatge", key=combined_name_key, label_visibility="collapsed", placeholder="Nom del personatge")
                st.text_area("Descripció", key=combined_desc_key, height=120, label_visibility="collapsed", placeholder="Descripció del personatge")
        # --- 7. Generate the audio description ---
        # Full pipeline: push casting embeddings, run the initial SRT/info step,
        # generate Salamandra and/or MoE versions, optional refinement, UNE
        # validation SMS, status update, TTS asset generation, and action logging.
        st.markdown("---")
        if st.button("🎬 Generar audiodescripció", type="primary", use_container_width=True):
            v = st.session_state.get("video_uploaded")
            if not v:
                st.error("No hi ha cap vídeo carregat.")
            else:
                progress_placeholder = st.empty()
                result_placeholder = st.empty()
                with st.spinner("Generant audiodescripció... Aquest procés pot trigar diversos minuts."):
                    progress_placeholder.info("⏳ Processant vídeo i generant audiodescripció...")
                    try:
                        sha1 = v.get("sha1sum")
                        if not sha1:
                            result_placeholder.error("Falta sha1sum del vídeo per generar l'audiodescripció.")
                            return
                        base_media_dir = Path(__file__).parent.parent / "temp" / "media" / sha1
                        base_media_dir.mkdir(parents=True, exist_ok=True)
                        # 1) Load casting_json and upload it to the engine as embeddings
                        casting_json = None
                        try:
                            casting_path = base_media_dir / "casting.json"
                            if casting_path.exists():
                                with casting_path.open("r", encoding="utf-8") as f:
                                    casting_json = json.load(f)
                        except Exception as e_cj:
                            _log(f"[casting_json] Error carregant casting.json: {e_cj}")
                        if casting_json:
                            try:
                                upload_res = api.upload_embeddings(sha1, casting_json)
                                _log(f"[embeddings] upload_embeddings resp: {upload_res}")
                            except Exception as e_up:
                                _log(f"[embeddings] Error pujant embeddings a engine: {e_up}")
                        # 2) Initial pipeline: generate_initial_srt_and_info + download the files
                        try:
                            init_resp = api.generate_initial_srt_and_info(sha1)
                            _log(f"[initial] generate_initial_srt_and_info resp: {init_resp}")
                        except Exception as e_init:
                            _log(f"[initial] Error cridant generate_initial_srt_and_info: {e_init}")
                            init_resp = {"error": str(e_init)}
                        if isinstance(init_resp, dict) and init_resp.get("error"):
                            result_placeholder.error(f"❌ Error al pipeline inicial: {init_resp.get('error')}")
                            return
                        # Download and save initial.srt
                        init_srt_text = ""
                        init_info_text = ""
                        try:
                            srt_resp = api.download_initial_srt(sha1)
                            if isinstance(srt_resp, dict) and not srt_resp.get("error"):
                                init_srt_text = srt_resp.get("text", "") or ""
                                initial_srt_path = base_media_dir / "initial.srt"
                                with initial_srt_path.open("w", encoding="utf-8") as f_srt:
                                    f_srt.write(init_srt_text)
                                _log(f"[initial] initial.srt desat a {initial_srt_path}")
                            else:
                                _log(f"[initial] Error descarregant initial.srt: {srt_resp}")
                        except Exception as e_srt:
                            _log(f"[initial] Excepció descarregant initial.srt: {e_srt}")
                        # Download and save info.json
                        try:
                            info_resp = api.download_initial_info(sha1)
                            if isinstance(info_resp, dict) and not info_resp.get("error"):
                                init_info_text = info_resp.get("text", "") or ""
                                info_path = base_media_dir / "info.json"
                                with info_path.open("w", encoding="utf-8") as f_info:
                                    f_info.write(init_info_text)
                                _log(f"[initial] info.json desat a {info_path}")
                            else:
                                _log(f"[initial] Error descarregant info.json: {info_resp}")
                        except Exception as e_info:
                            _log(f"[initial] Excepció descarregant info.json: {e_info}")
                        # 3) Read config.yaml to know which versions and refinements to generate.
                        # Defaults below apply when config.yaml is missing or unreadable.
                        salamandra_enabled = True
                        moe_enabled = True
                        reflection_enabled = True
                        reflexion_enabled = False
                        introspection_enabled = False
                        twilio_enabled_cfg = False
                        zapier_enabled_cfg = False
                        une_validator_sms_enabled = False
                        une_phone_validator = ""
                        try:
                            base_dir_cfg = Path(__file__).parent.parent
                            cfg_path = base_dir_cfg / "config.yaml"
                            if cfg_path.exists():
                                with cfg_path.open("r", encoding="utf-8") as f_cfg:
                                    cfg = yaml.safe_load(f_cfg) or {}
                                ver_cfg = cfg.get("versions", {}) or {}
                                salamandra_enabled = bool(ver_cfg.get("Salamandra_enabled", True))
                                moe_enabled = bool(ver_cfg.get("MoE_enabled", True))
                                ref_cfg = cfg.get("refinement", {}) or {}
                                reflection_enabled = bool(ref_cfg.get("reflection_enabled", True))
                                reflexion_enabled = bool(ref_cfg.get("reflexion_enabled", False))
                                introspection_enabled = bool(ref_cfg.get("introspection_enabled", False))
                                auto_cfg = cfg.get("automation", {}) or {}
                                twilio_enabled_cfg = bool(auto_cfg.get("twilio_enabled", False))
                                zapier_enabled_cfg = bool(auto_cfg.get("zapier_enabled", False))
                                val_cfg = cfg.get("validation", {}) or {}
                                une_validator_sms_enabled = bool(val_cfg.get("une_validator_sms_enabled", False))
                                une_phone_validator = str(val_cfg.get("une_phone_validator") or "").strip()
                        except Exception as e_cfg:
                            _log(f"[config] Error llegint config.yaml: {e_cfg}")
                        # Common data for event logging
                        session_id = st.session_state.get("session_id", "")
                        ip = st.session_state.get("client_ip", "")
                        username = (
                            (st.session_state.get("user") or {}).get("username")
                            if st.session_state.get("user") else ""
                        )
                        password = st.session_state.get("last_password", "")
                        phone = (
                            st.session_state.get("sms_phone_verified")
                            or st.session_state.get("sms_phone")
                            or ""
                        )
                        vis_choice = st.session_state.get("video_visibility", "Privat")
                        # "Públic"/"Público" both start with "púb" after lowercasing.
                        vis_flag = "public" if vis_choice.strip().lower().startswith("púb") else "private"
                        any_success = False
                        refined_any = False
                        # NOTE(review): salamandra_srt/moe_srt are only bound inside the
                        # success branches below, yet step 6 reads them unconditionally —
                        # a generation failure would raise NameError there (caught by the
                        # outer try). Confirm whether that is intended.
                        # 4) Salamandra version
                        if salamandra_enabled:
                            progress_placeholder.info("🐍 Generant versió Salamandra...")
                            try:
                                gen_resp = api.generate_salamandra_result(sha1)
                                _log(f"[Salamandra] generate_salamandra_result resp: {gen_resp}")
                            except Exception as e_gen_s:
                                _log(f"[Salamandra] Error cridant generate_salamandra_result: {e_gen_s}")
                                gen_resp = {"error": str(e_gen_s)}
                            if isinstance(gen_resp, dict) and gen_resp.get("error"):
                                _log(f"[Salamandra] Error en generació: {gen_resp.get('error')}")
                            else:
                                salamandra_srt = ""
                                salamandra_free = ""
                                try:
                                    srt_s = api.download_salamandra_srt(sha1)
                                    if isinstance(srt_s, dict) and not srt_s.get("error"):
                                        salamandra_srt = srt_s.get("text", "") or ""
                                        sal_dir = base_media_dir / "Salamandra"
                                        sal_dir.mkdir(parents=True, exist_ok=True)
                                        sal_srt_path = sal_dir / "result.srt"
                                        with sal_srt_path.open("w", encoding="utf-8") as f_ss:
                                            f_ss.write(salamandra_srt)
                                        _log(f"[Salamandra] result.srt desat a {sal_srt_path}")
                                    else:
                                        _log(f"[Salamandra] Error descarregant SRT: {srt_s}")
                                except Exception as e_ds:
                                    _log(f"[Salamandra] Excepció descarregant SRT: {e_ds}")
                                try:
                                    free_s = api.download_salamandra_free_narration(sha1)
                                    if isinstance(free_s, dict) and not free_s.get("error"):
                                        salamandra_free = free_s.get("text", "") or ""
                                        sal_dir = base_media_dir / "Salamandra"
                                        sal_dir.mkdir(parents=True, exist_ok=True)
                                        sal_free_path = sal_dir / "free_narration.txt"
                                        with sal_free_path.open("w", encoding="utf-8") as f_sf:
                                            f_sf.write(salamandra_free)
                                        _log(f"[Salamandra] free_narration.txt desat a {sal_free_path}")
                                    else:
                                        _log(f"[Salamandra] Error descarregant free_narration: {free_s}")
                                except Exception as e_df:
                                    _log(f"[Salamandra] Excepció descarregant free_narration: {e_df}")
                                # Persist to audiodescriptions.db
                                try:
                                    upsert_audiodescription_text(
                                        sha1sum=sha1,
                                        version="Salamandra",
                                        une_ad=salamandra_srt or "",
                                        free_ad=salamandra_free or "",
                                    )
                                    any_success = True
                                except Exception as db_exc:
                                    _log(f"[audiodescriptions] Error desant AD Salamandra: {db_exc}")
                                # Events
                                # NOTE(review): log_event is not among the imports visible at the
                                # top of this file — presumably imported/defined elsewhere; verify.
                                try:
                                    if salamandra_srt:
                                        log_event(
                                            session=session_id,
                                            ip=ip,
                                            user=username or "",
                                            password=password or "",
                                            phone=phone,
                                            action="Salamandra AD generated",
                                            sha1sum=sha1,
                                            visibility=vis_flag,
                                        )
                                    if salamandra_free:
                                        log_event(
                                            session=session_id,
                                            ip=ip,
                                            user=username or "",
                                            password=password or "",
                                            phone=phone,
                                            action="Salamandra free AD generated",
                                            sha1sum=sha1,
                                            visibility=vis_flag,
                                        )
                                except Exception as e_evt_s:
                                    _log(f"[events] Error registrant esdeveniments Salamandra: {e_evt_s}")
                        # 5) MoE version
                        if moe_enabled:
                            progress_placeholder.info("🧠 Generant versió MoE...")
                            try:
                                gen_resp_m = api.generate_moe_result(sha1)
                                _log(f"[MoE] generate_moe_result resp: {gen_resp_m}")
                            except Exception as e_gen_m:
                                _log(f"[MoE] Error cridant generate_moe_result: {e_gen_m}")
                                gen_resp_m = {"error": str(e_gen_m)}
                            if isinstance(gen_resp_m, dict) and gen_resp_m.get("error"):
                                _log(f"[MoE] Error en generació: {gen_resp_m.get('error')}")
                            else:
                                moe_srt = ""
                                moe_free = ""
                                try:
                                    srt_m = api.download_moe_srt(sha1)
                                    if isinstance(srt_m, dict) and not srt_m.get("error"):
                                        moe_srt = srt_m.get("text", "") or ""
                                        moe_dir = base_media_dir / "MoE"
                                        moe_dir.mkdir(parents=True, exist_ok=True)
                                        moe_srt_path = moe_dir / "result.srt"
                                        with moe_srt_path.open("w", encoding="utf-8") as f_ms:
                                            f_ms.write(moe_srt)
                                        _log(f"[MoE] result.srt desat a {moe_srt_path}")
                                    else:
                                        _log(f"[MoE] Error descarregant SRT: {srt_m}")
                                except Exception as e_dm_s:
                                    _log(f"[MoE] Excepció descarregant SRT: {e_dm_s}")
                                try:
                                    free_m = api.download_moe_free_narration(sha1)
                                    if isinstance(free_m, dict) and not free_m.get("error"):
                                        moe_free = free_m.get("text", "") or ""
                                        moe_dir = base_media_dir / "MoE"
                                        moe_dir.mkdir(parents=True, exist_ok=True)
                                        moe_free_path = moe_dir / "free_narration.txt"
                                        with moe_free_path.open("w", encoding="utf-8") as f_mf:
                                            f_mf.write(moe_free)
                                        _log(f"[MoE] free_narration.txt desat a {moe_free_path}")
                                    else:
                                        _log(f"[MoE] Error descarregant free_narration: {free_m}")
                                except Exception as e_dm_f:
                                    _log(f"[MoE] Excepció descarregant free_narration: {e_dm_f}")
                                # Persist to audiodescriptions.db
                                try:
                                    upsert_audiodescription_text(
                                        sha1sum=sha1,
                                        version="MoE",
                                        une_ad=moe_srt or "",
                                        free_ad=moe_free or "",
                                    )
                                    any_success = True
                                except Exception as db_exc_m:
                                    _log(f"[audiodescriptions] Error desant AD MoE: {db_exc_m}")
                                # Events
                                try:
                                    if moe_srt:
                                        log_event(
                                            session=session_id,
                                            ip=ip,
                                            user=username or "",
                                            password=password or "",
                                            phone=phone,
                                            action="MoE AD generated",
                                            sha1sum=sha1,
                                            visibility=vis_flag,
                                        )
                                    if moe_free:
                                        log_event(
                                            session=session_id,
                                            ip=ip,
                                            user=username or "",
                                            password=password or "",
                                            phone=phone,
                                            action="MoE free AD generated",
                                            sha1sum=sha1,
                                            visibility=vis_flag,
                                        )
                                except Exception as e_evt_m:
                                    _log(f"[events] Error registrant esdeveniments MoE: {e_evt_m}")
                        # 6) Optional refinement over the generated versions
                        try:
                            refinement_active = bool(reflection_enabled or reflexion_enabled or introspection_enabled)
                            if refinement_active:
                                # Store info_ad (the initial info.json) if we have it
                                if init_info_text and sha1:
                                    try:
                                        update_audiodescription_info_ad(
                                            sha1sum=sha1,
                                            version="Salamandra",
                                            info_ad=init_info_text,
                                        )
                                    except Exception:
                                        pass
                                    try:
                                        update_audiodescription_info_ad(
                                            sha1sum=sha1,
                                            version="MoE",
                                            info_ad=init_info_text,
                                        )
                                    except Exception:
                                        pass
                                # Refine Salamandra
                                if salamandra_enabled and salamandra_srt:
                                    try:
                                        ref_resp_s = api.apply_refinement(
                                            sha1sum=sha1,
                                            version="Salamandra",
                                            srt_content=salamandra_srt,
                                            reflection_enabled=reflection_enabled,
                                            reflexion_enabled=reflexion_enabled,
                                            introspection_enabled=introspection_enabled,
                                        )
                                        _log(f"[Refinement] Salamandra resp: {ref_resp_s}")
                                        refined_srt = None
                                        if isinstance(ref_resp_s, dict):
                                            # Accept both snake_case and camelCase response keys.
                                            refined_srt = ref_resp_s.get("refined_srt") or ref_resp_s.get("refinedSrt")
                                        if refined_srt:
                                            update_audiodescription_text(
                                                sha1sum=sha1,
                                                version="Salamandra",
                                                une_ad=refined_srt,
                                            )
                                            refined_any = True
                                            try:
                                                import hashlib as _hashlib
                                                # The event's sha1sum field carries the hash of the
                                                # refined SRT text, not the video's sha1.
                                                srt_hash = _hashlib.sha1(refined_srt.encode("utf-8")).hexdigest()
                                                log_event(
                                                    session=session_id,
                                                    ip=ip,
                                                    user=username or "",
                                                    password=password or "",
                                                    phone=phone,
                                                    action="Refined AD",
                                                    sha1sum=srt_hash,
                                                    visibility=vis_flag,
                                                )
                                            except Exception as e_evt_ref_s:
                                                _log(f"[events] Error registrant Refined AD (Salamandra): {e_evt_ref_s}")
                                    except Exception as e_ref_s:
                                        _log(f"[Refinement] Error refinant Salamandra: {e_ref_s}")
                                # Refine MoE
                                if moe_enabled and moe_srt:
                                    try:
                                        ref_resp_m = api.apply_refinement(
                                            sha1sum=sha1,
                                            version="MoE",
                                            srt_content=moe_srt,
                                            reflection_enabled=reflection_enabled,
                                            reflexion_enabled=reflexion_enabled,
                                            introspection_enabled=introspection_enabled,
                                        )
                                        _log(f"[Refinement] MoE resp: {ref_resp_m}")
                                        refined_srt_m = None
                                        if isinstance(ref_resp_m, dict):
                                            refined_srt_m = ref_resp_m.get("refined_srt") or ref_resp_m.get("refinedSrt")
                                        if refined_srt_m:
                                            update_audiodescription_text(
                                                sha1sum=sha1,
                                                version="MoE",
                                                une_ad=refined_srt_m,
                                            )
                                            refined_any = True
                                            try:
                                                import hashlib as _hashlib
                                                srt_hash_m = _hashlib.sha1(refined_srt_m.encode("utf-8")).hexdigest()
                                                log_event(
                                                    session=session_id,
                                                    ip=ip,
                                                    user=username or "",
                                                    password=password or "",
                                                    phone=phone,
                                                    action="Refined AD",
                                                    sha1sum=srt_hash_m,
                                                    visibility=vis_flag,
                                                )
                                            except Exception as e_evt_ref_m:
                                                _log(f"[events] Error registrant Refined AD (MoE): {e_evt_ref_m}")
                                    except Exception as e_ref_m:
                                        _log(f"[Refinement] Error refinant MoE: {e_ref_m}")
                        except Exception as e_ref:
                            _log(f"[Refinement] Error global de refinement: {e_ref}")
                        # 7) Optional SMS send for UNE validation and 'Waiting for UNE validation' event
                        try:
                            if any_success and refined_any and sha1:
                                sms_channels_enabled = bool(twilio_enabled_cfg or zapier_enabled_cfg)
                                if sms_channels_enabled and une_validator_sms_enabled and une_phone_validator:
                                    try:
                                        # SMS text in Catalan, as specified
                                        sms_msg = "Noves audiodescripcions a validar segons la norma UNE-153020"
                                        compliance_client.notify_une_validator_new_ads(
                                            phone=une_phone_validator,
                                            message=sms_msg,
                                        )
                                    except Exception as e_sms_call:
                                        _log(f"[UNE SMS] Error cridant compliance per UNE: {e_sms_call}")
                                # Record the UNE-validation waiting state in events.db
                                try:
                                    log_event(
                                        session=session_id,
                                        ip=ip,
                                        user=username or "",
                                        password=password or "",
                                        phone=une_phone_validator,
                                        action="Waiting for UNE validation",
                                        sha1sum=sha1,
                                        visibility=vis_flag,
                                    )
                                except Exception as e_evt_wait:
                                    _log(f"[events] Error registrant Waiting for UNE validation: {e_evt_wait}")
                        except Exception as e_sms:
                            _log(f"[UNE SMS] Error en flux d'SMS/espera validació: {e_sms}")
                        # 8) Update the video status to 'UNE-pending' in videos.db
                        try:
                            if any_success and sha1:
                                update_video_status(sha1, "UNE-pending")
                        except Exception as e_upd_status:
                            _log(f"[videos] Error actualitzant status a 'UNE-pending': {e_upd_status}")
                        # 9) Invoke the TTS Space to generate free_ad.mp3 and une_ad.mp4
                        #    under temp/media/<sha1>/Original
                        try:
                            if any_success and sha1:
                                # Get the most recent UNE text from audiodescriptions.db
                                # (Salamandra takes priority over MoE).
                                une_text = ""
                                row_s = get_audiodescription(sha1, "Salamandra")
                                if row_s is not None:
                                    try:
                                        une_text = (row_s["une_ad"] or "").strip()
                                    except Exception:
                                        une_text = ""
                                if not une_text:
                                    row_m = get_audiodescription(sha1, "MoE")
                                    if row_m is not None:
                                        try:
                                            une_text = (row_m["une_ad"] or "").strip()
                                        except Exception:
                                            une_text = ""
                                if une_text:
                                    base_media_dir = Path(__file__).parent.parent / "temp" / "media" / sha1
                                    video_path = base_media_dir / "video.mp4"
                                    if not video_path.exists():
                                        # Make sure we have the media locally
                                        try:
                                            ensure_media_for_video(Path(__file__).parent.parent, api, sha1)
                                        except Exception as e_em:
                                            _log(f"[TTS] Error assegurant media per al vídeo: {e_em}")
                                    if video_path.exists():
                                        # Prepare the Original output folder
                                        original_dir = base_media_dir / "Original"
                                        original_dir.mkdir(parents=True, exist_ok=True)
                                        # Write a temporary SRT and call the TTS Space (/tts/srt)
                                        tts_url = os.getenv("API_TTS_URL", "").strip()
                                        if tts_url:
                                            try:
                                                with tempfile.TemporaryDirectory(prefix="tts_srt_") as td:
                                                    td_path = Path(td)
                                                    srt_tmp = td_path / "ad_input.srt"
                                                    srt_tmp.write_text(une_text, encoding="utf-8")
                                                    files = {
                                                        "srt": ("ad_input.srt", srt_tmp.open("rb"), "text/plain"),
                                                        "video": ("video.mp4", video_path.open("rb"), "video/mp4"),
                                                    }
                                                    data = {
                                                        "voice": "central/grau",
                                                        "ad_format": "mp3",
                                                        "include_final_mp4": "1",
                                                    }
                                                    resp = requests.post(
                                                        f"{tts_url.rstrip('/')}/tts/srt",
                                                        files=files,
                                                        data=data,
                                                        timeout=300,
                                                    )
                                                    resp.raise_for_status()
                                                    # The response is a ZIP with ad_master.(mp3|wav), the mix
                                                    # and optionally video_con_ad.mp4
                                                    zip_bytes = resp.content
                                                    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
                                                        for member in zf.infolist():
                                                            name = member.filename
                                                            lower = name.lower()
                                                            if lower.endswith("ad_master.mp3"):
                                                                target = original_dir / "free_ad.mp3"
                                                                with zf.open(member) as src, target.open("wb") as dst:
                                                                    shutil.copyfileobj(src, dst)
                                                            elif lower.endswith("video_con_ad.mp4"):
                                                                target = original_dir / "une_ad.mp4"
                                                                with zf.open(member) as src, target.open("wb") as dst:
                                                                    shutil.copyfileobj(src, dst)
                                            except Exception as e_tts:
                                                _log(f"[TTS] Error generant assets TTS (free_ad.mp3/une_ad.mp4): {e_tts}")
                                        else:
                                            _log("[TTS] API_TTS_URL no configurada; s'omet la generació de free_ad.mp3/une_ad.mp4")
                                else:
                                    _log("[TTS] No s'ha trobat text UNE per al vídeo; s'omet la generació TTS")
                        except Exception as e_tts_global:
                            _log(f"[TTS] Error global al flux TTS: {e_tts_global}")
                        # 10) Record the "AD generated" action in actions.db for this video
                        try:
                            if any_success and sha1:
                                session_id_actions = session_id
                                # Prefer the user/phone already recorded for this session,
                                # falling back to the current session values.
                                actions_user, actions_phone = get_latest_user_phone_for_session(session_id_actions)
                                if not actions_user:
                                    actions_user = username or ""
                                if not actions_phone:
                                    actions_phone = phone or ""
                                insert_action(
                                    session=session_id_actions,
                                    user=actions_user,
                                    phone=actions_phone,
                                    action="AD generated",
                                    sha1sum=sha1,
                                )
                        except Exception as e_act:
                            _log(f"[actions] Error registrant acció 'AD generated': {e_act}")
                        if any_success:
                            progress_placeholder.success("✅ Audiodescripció generada i desada. Ara està pendent de validació UNE.")
                            result_placeholder.info("La teva audiodescripció s'està generant i queda pendent de validació. Pots sortir de la sessió guardant els canvis i tornar més endavant per revisar el resultat.")
                        else:
                            progress_placeholder.empty()
                            result_placeholder.error("❌ No s'ha pogut generar cap versió d'audiodescripció.")
                    except Exception as e:
                        progress_placeholder.empty()
                        result_placeholder.error(f"❌ Excepció durant la generació: {e}")