|
|
"""UI logic for the "Processar vídeo nou" page - Recovered from backup with full functionality.""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import re |
|
|
import shutil |
|
|
import subprocess |
|
|
import os |
|
|
import time |
|
|
import tempfile |
|
|
import hashlib |
|
|
from pathlib import Path |
|
|
import sys |
|
|
from datetime import datetime |
|
|
import yaml |
|
|
import sqlite3 |
|
|
import json |
|
|
import zipfile |
|
|
import io |
|
|
import requests |
|
|
|
|
|
import streamlit as st |
|
|
from PIL import Image, ImageDraw |
|
|
from databases import ( |
|
|
log_action, |
|
|
has_video_approval_action, |
|
|
upsert_audiodescription_text, |
|
|
get_latest_user_phone_for_session, |
|
|
insert_action, |
|
|
ensure_video_row_for_upload, |
|
|
is_video_input_ok, |
|
|
update_video_status, |
|
|
get_audiodescription, |
|
|
) |
|
|
from compliance_client import compliance_client |
|
|
from persistent_data_gate import ensure_temp_databases, _load_data_origin, ensure_media_for_video |
|
|
|
|
|
|
|
|
def get_all_catalan_names():
    """Return the available Catalan first names as ``(male_names, female_names)``."""
    male = [
        "Jordi", "Marc", "Pau", "Pere", "Joan", "Josep", "David", "Àlex",
        "Guillem", "Albert", "Arnau", "Martí", "Bernat", "Oriol", "Roger",
        "Pol", "Lluís", "Sergi", "Carles", "Xavier",
    ]
    female = [
        "Maria", "Anna", "Laura", "Marta", "Cristina", "Núria", "Montserrat",
        "Júlia", "Sara", "Carla", "Alba", "Elisabet", "Rosa", "Gemma",
        "Sílvia", "Teresa", "Irene", "Laia", "Marina", "Bet",
    ]
    return male, female
|
|
|
|
|
|
|
|
def _log(msg: str) -> None: |
|
|
"""Helper de logging a stderr amb timestamp (coherent amb auth.py).""" |
|
|
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
|
sys.stderr.write(f"[{ts}] {msg}\n") |
|
|
sys.stderr.flush() |
|
|
|
|
|
|
|
|
def get_catalan_name_for_speaker(
    speaker_label: int,
    used_names_home: list | None = None,
    used_names_dona: list | None = None,
) -> str:
    """Return a Catalan name for a diarized speaker, reusing face names when available.

    Even ``speaker_label`` values map to male names, odd values to female names.
    When the corresponding ``used_names_*`` list is non-empty, names already
    assigned to detected faces are reused (indexed by ``speaker_label // 2``,
    wrapping around). Otherwise a name is drawn from the full pool using a
    stable digest of the label, so the same speaker label always maps to the
    same name — the previous implementation used the built-in ``hash()``,
    which is salted per process for strings and therefore not reproducible
    across runs.

    Args:
        speaker_label: Zero-based diarization speaker index.
        used_names_home: Male names already used for detected faces, if any.
        used_names_dona: Female names already used for detected faces, if any.

    Returns:
        A Catalan first name for this speaker.
    """
    noms_home, noms_dona = get_all_catalan_names()

    is_male = (speaker_label % 2 == 0)
    used = (used_names_home if is_male else used_names_dona) or []
    pool = noms_home if is_male else noms_dona

    if used:
        # Reuse the names attached to face clusters, cycling if needed.
        return used[(speaker_label // 2) % len(used)]

    # Stable across process restarts, unlike the salted built-in hash().
    digest = hashlib.md5(f"speaker_{speaker_label}".encode("utf-8")).hexdigest()
    return pool[int(digest, 16) % len(pool)]
|
|
|
|
|
|
|
|
def _get_video_duration(path: str) -> float: |
|
|
"""Return video duration in seconds using ffprobe, ffmpeg or OpenCV as fallback.""" |
|
|
cmd = [ |
|
|
"ffprobe", |
|
|
"-v", |
|
|
"error", |
|
|
"-show_entries", |
|
|
"format=duration", |
|
|
"-of", |
|
|
"default=noprint_wrappers=1:nokey=1", |
|
|
path, |
|
|
] |
|
|
try: |
|
|
result = subprocess.run(cmd, capture_output=True, text=True, check=True) |
|
|
return float(result.stdout.strip()) |
|
|
except (subprocess.CalledProcessError, ValueError, FileNotFoundError): |
|
|
pass |
|
|
|
|
|
if shutil.which("ffmpeg"): |
|
|
try: |
|
|
ffmpeg_cmd = ["ffmpeg", "-i", path] |
|
|
result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True, check=False) |
|
|
output = result.stderr or result.stdout or "" |
|
|
match = re.search(r"Duration:\s*(\d+):(\d+):(\d+\.\d+)", output) |
|
|
if match: |
|
|
hours, minutes, seconds = match.groups() |
|
|
total_seconds = (int(hours) * 3600) + (int(minutes) * 60) + float(seconds) |
|
|
return float(total_seconds) |
|
|
except FileNotFoundError: |
|
|
pass |
|
|
|
|
|
|
|
|
try: |
|
|
import cv2 |
|
|
|
|
|
cap = cv2.VideoCapture(path) |
|
|
if cap.isOpened(): |
|
|
fps = cap.get(cv2.CAP_PROP_FPS) or 0 |
|
|
frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0 |
|
|
cap.release() |
|
|
|
|
|
if fps > 0 and frame_count > 0: |
|
|
return float(frame_count / fps) |
|
|
else: |
|
|
cap.release() |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
return 0.0 |
|
|
|
|
|
|
|
|
def _transcode_video(input_path: str, output_path: str, max_duration: int | None = None) -> None:
    """Re-encode a video to an H.264/AAC MP4, optionally truncating its length.

    Args:
        input_path: Source video file.
        output_path: Destination MP4 path (overwritten if present, via ``-y``).
        max_duration: When given, cut the output at this many seconds.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status; the message
            carries ffmpeg's stderr when available.
    """
    truncate_args = [] if max_duration is None else ["-t", str(max_duration)]
    cmd = (
        ["ffmpeg", "-y", "-i", input_path]
        + truncate_args
        + [
            "-c:v", "libx264",
            "-preset", "veryfast",
            "-crf", "23",
            "-c:a", "aac",
            # Move the moov atom up front so browsers can start playback early.
            "-movflags", "+faststart",
            output_path,
        ]
    )
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.strip() or "ffmpeg failed")
|
|
|
|
|
|
|
|
def render_process_video_page(api, backend_base_url: str) -> None: |
|
|
st.header("Processar un nou clip de vídeo") |
|
|
|
|
|
|
|
|
base_dir = Path(__file__).parent.parent |
|
|
config_path = base_dir / "config.yaml" |
|
|
manual_validation_enabled = True |
|
|
max_size_mb = 20 |
|
|
max_duration_s = 30 |
|
|
video_validator_sms_enabled = False |
|
|
skip_manual_validation_for_this_video = False |
|
|
try: |
|
|
if config_path.exists(): |
|
|
with config_path.open("r", encoding="utf-8") as f: |
|
|
cfg = yaml.safe_load(f) or {} |
|
|
app_cfg = cfg.get("app", {}) or {} |
|
|
manual_validation_enabled = bool(app_cfg.get("manual_validation_enabled", True)) |
|
|
|
|
|
media_cfg = cfg.get("media", {}) or {} |
|
|
|
|
|
max_size_mb = int(media_cfg.get("max_size_mb", max_size_mb)) |
|
|
max_duration_s = int(media_cfg.get("max_duration_s", max_duration_s)) |
|
|
|
|
|
|
|
|
validation_cfg = cfg.get("validation", {}) or {} |
|
|
video_validator_sms_enabled = bool(validation_cfg.get("video_validator_sms_enabled", False)) |
|
|
except Exception: |
|
|
manual_validation_enabled = True |
|
|
|
|
|
|
|
|
st.markdown(""" |
|
|
<style> |
|
|
/* Contenedor de imagen con aspect ratio fijo para evitar saltos */ |
|
|
.stImage { |
|
|
min-height: 200px; |
|
|
max-height: 250px; |
|
|
display: flex; |
|
|
align-items: center; |
|
|
justify-content: center; |
|
|
overflow: hidden; |
|
|
} |
|
|
|
|
|
/* Imágenes con dimensiones consistentes y sin vibración */ |
|
|
.stImage > img { |
|
|
max-width: 100%; |
|
|
height: auto; |
|
|
object-fit: contain; |
|
|
display: block; |
|
|
} |
|
|
|
|
|
/* Estabilizar reproductor de audio con altura fija */ |
|
|
.stAudio { |
|
|
min-height: 54px; |
|
|
max-height: 80px; |
|
|
} |
|
|
|
|
|
/* Caption con altura fija */ |
|
|
.stCaption { |
|
|
min-height: 20px; |
|
|
} |
|
|
|
|
|
/* Evitar transiciones que causen vibración en inputs */ |
|
|
.stTextInput > div, .stTextArea > div { |
|
|
transition: none !important; |
|
|
} |
|
|
|
|
|
/* Botones de navegación con tamaño consistente */ |
|
|
.stButton button { |
|
|
transition: background-color 0.2s, color 0.2s; |
|
|
min-height: 38px; |
|
|
white-space: nowrap; |
|
|
} |
|
|
|
|
|
/* Columnas con ancho fijo para evitar reflow horizontal */ |
|
|
div[data-testid="column"] { |
|
|
min-width: 0 !important; |
|
|
flex-shrink: 0 !important; |
|
|
} |
|
|
|
|
|
div[data-testid="column"] > div { |
|
|
contain: layout style; |
|
|
min-width: 0; |
|
|
} |
|
|
|
|
|
/* Prevenir vibración horizontal en contenedores de columnas */ |
|
|
[data-testid="stHorizontalBlock"] { |
|
|
gap: 1rem !important; |
|
|
} |
|
|
|
|
|
[data-testid="stHorizontalBlock"] > div { |
|
|
flex-shrink: 0 !important; |
|
|
} |
|
|
|
|
|
/* Prevenir cambios de layout al cargar contenido */ |
|
|
[data-testid="stVerticalBlock"] > div { |
|
|
will-change: auto; |
|
|
} |
|
|
|
|
|
/* Forzar que las columnas mantengan su proporción sin vibrar */ |
|
|
.row-widget.stHorizontalBlock { |
|
|
width: 100% !important; |
|
|
} |
|
|
</style> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
msg_detect = st.empty() |
|
|
msg_finalize = st.empty() |
|
|
msg_ad = st.empty() |
|
|
|
|
|
|
|
|
if "video_uploaded" not in st.session_state: |
|
|
st.session_state.video_uploaded = None |
|
|
if "characters_detected" not in st.session_state: |
|
|
st.session_state.characters_detected = None |
|
|
if "audio_segments" not in st.session_state: |
|
|
st.session_state.audio_segments = None |
|
|
if "voice_labels" not in st.session_state: |
|
|
st.session_state.voice_labels = None |
|
|
if "face_labels" not in st.session_state: |
|
|
st.session_state.face_labels = None |
|
|
if "scene_clusters" not in st.session_state: |
|
|
st.session_state.scene_clusters = None |
|
|
if "scene_detection_done" not in st.session_state: |
|
|
st.session_state.scene_detection_done = False |
|
|
if "detect_done" not in st.session_state: |
|
|
st.session_state.detect_done = False |
|
|
if "casting_finalized" not in st.session_state: |
|
|
st.session_state.casting_finalized = False |
|
|
if "video_name_from_engine" not in st.session_state: |
|
|
st.session_state.video_name_from_engine = None |
|
|
if "diarization_info" not in st.session_state: |
|
|
st.session_state.diarization_info = {} |
|
|
if "characters_saved" not in st.session_state: |
|
|
st.session_state.characters_saved = False |
|
|
if "video_requires_validation" not in st.session_state: |
|
|
st.session_state.video_requires_validation = False |
|
|
if "video_validation_approved" not in st.session_state: |
|
|
st.session_state.video_validation_approved = False |
|
|
|
|
|
|
|
|
MAX_SIZE_MB = max_size_mb |
|
|
MAX_DURATION_S = max_duration_s |
|
|
|
|
|
|
|
|
if "video_visibility" not in st.session_state: |
|
|
st.session_state.video_visibility = "Privat" |
|
|
|
|
|
col_upload, col_vis = st.columns([3, 1]) |
|
|
with col_upload: |
|
|
uploaded_file = st.file_uploader( |
|
|
f"Puja un clip de vídeo (MP4, < {MAX_SIZE_MB}MB, < {MAX_DURATION_S} segons)", |
|
|
type=["mp4"], |
|
|
key="video_uploader", |
|
|
) |
|
|
with col_vis: |
|
|
disabled_vis = st.session_state.video_uploaded is not None |
|
|
|
|
|
options = ["Privat", "Públic"] |
|
|
current = st.session_state.get("video_visibility", "Privat") |
|
|
try: |
|
|
idx = options.index(current) |
|
|
except ValueError: |
|
|
idx = 0 |
|
|
st.selectbox( |
|
|
"Visibilitat", |
|
|
options, |
|
|
index=idx, |
|
|
key="video_visibility", |
|
|
disabled=disabled_vis, |
|
|
) |
|
|
|
|
|
if uploaded_file is not None: |
|
|
|
|
|
if st.session_state.video_uploaded is None or uploaded_file.name != st.session_state.video_uploaded.get( |
|
|
"original_name" |
|
|
): |
|
|
st.session_state.video_uploaded = {"original_name": uploaded_file.name, "status": "validating"} |
|
|
st.session_state.characters_detected = None |
|
|
st.session_state.characters_saved = False |
|
|
|
|
|
if st.session_state.video_uploaded["status"] == "validating": |
|
|
is_valid = True |
|
|
if uploaded_file.size > MAX_SIZE_MB * 1024 * 1024: |
|
|
st.error(f"El vídeo supera el límit de {MAX_SIZE_MB}MB.") |
|
|
is_valid = False |
|
|
|
|
|
if is_valid: |
|
|
with st.spinner("Processant el vídeo..."): |
|
|
temp_path = Path("temp_video.mp4") |
|
|
with temp_path.open("wb") as f: |
|
|
f.write(uploaded_file.getbuffer()) |
|
|
|
|
|
was_truncated = False |
|
|
final_video_path = None |
|
|
try: |
|
|
duration = _get_video_duration(str(temp_path)) |
|
|
duration_unknown = False |
|
|
if not duration: |
|
|
st.warning( |
|
|
f"No s'ha pogut obtenir la durada del vídeo. Es continuarà assumint un màxim de {MAX_DURATION_S} segons." |
|
|
) |
|
|
duration = float(MAX_DURATION_S) |
|
|
duration_unknown = True |
|
|
|
|
|
if is_valid: |
|
|
if duration > MAX_DURATION_S: |
|
|
was_truncated = True |
|
|
|
|
|
video_name = Path(uploaded_file.name).stem |
|
|
video_dir = Path("/tmp/data/videos") / video_name |
|
|
video_dir.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
final_video_path = video_dir / "video.mp4" |
|
|
|
|
|
try: |
|
|
_transcode_video( |
|
|
str(temp_path), |
|
|
str(final_video_path), |
|
|
MAX_DURATION_S if (was_truncated or duration_unknown) else None, |
|
|
) |
|
|
except RuntimeError as exc: |
|
|
st.error(f"No s'ha pogut processar el vídeo: {exc}") |
|
|
is_valid = False |
|
|
|
|
|
if is_valid and final_video_path is not None: |
|
|
video_bytes = uploaded_file.getvalue() |
|
|
sha1 = hashlib.sha1(video_bytes).hexdigest() |
|
|
|
|
|
st.session_state.video_uploaded.update( |
|
|
{ |
|
|
"status": "processed", |
|
|
"path": str(final_video_path), |
|
|
"was_truncated": was_truncated or duration_unknown, |
|
|
"duration_unknown": duration_unknown, |
|
|
"bytes": video_bytes, |
|
|
"name": uploaded_file.name, |
|
|
"sha1sum": sha1, |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
try: |
|
|
if is_video_input_ok(sha1): |
|
|
skip_manual_validation_for_this_video = True |
|
|
|
|
|
|
|
|
base_dir = Path(__file__).parent.parent |
|
|
api_client = st.session_state.get("api_client") |
|
|
try: |
|
|
ensure_media_for_video(base_dir, api_client, sha1) |
|
|
except Exception as e_media: |
|
|
_log(f"[MEDIA] Error assegurant media per a {sha1}: {e_media}") |
|
|
except Exception as e_chk: |
|
|
_log(f"[VIDEOS] Error comprovant status input-OK per a {sha1}: {e_chk}") |
|
|
|
|
|
|
|
|
try: |
|
|
session_id = st.session_state.get("session_id", "") |
|
|
ip = st.session_state.get("client_ip", "") |
|
|
username = ( |
|
|
(st.session_state.get("user") or {}).get("username") |
|
|
if st.session_state.get("user") |
|
|
else "" |
|
|
) |
|
|
password = st.session_state.get("last_password", "") |
|
|
phone = ( |
|
|
st.session_state.get("sms_phone_verified") |
|
|
or st.session_state.get("sms_phone") |
|
|
or "" |
|
|
) |
|
|
vis_choice = st.session_state.get("video_visibility", "Privat") |
|
|
vis_flag = "public" if vis_choice.strip().lower().startswith("púb") else "private" |
|
|
|
|
|
|
|
|
log_action( |
|
|
session=session_id, |
|
|
user=username or "", |
|
|
phone=phone, |
|
|
action="upload", |
|
|
sha1sum=sha1, |
|
|
) |
|
|
|
|
|
|
|
|
actions_user, actions_phone = get_latest_user_phone_for_session(session_id) |
|
|
if not actions_user: |
|
|
actions_user = username or "" |
|
|
if not actions_phone: |
|
|
actions_phone = phone or "" |
|
|
|
|
|
|
|
|
insert_action( |
|
|
session=session_id, |
|
|
user=actions_user, |
|
|
phone=actions_phone, |
|
|
action="Uploaded video", |
|
|
sha1sum=sha1, |
|
|
) |
|
|
|
|
|
|
|
|
ensure_video_row_for_upload( |
|
|
sha1sum=sha1, |
|
|
video_name=uploaded_file.name, |
|
|
owner_phone=actions_phone, |
|
|
status="input-pending", |
|
|
visibility=vis_flag, |
|
|
) |
|
|
except Exception as e: |
|
|
print(f"[events/actions] Error registrant pujada de vídeo: {e}") |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
base_dir = Path(__file__).parent.parent |
|
|
data_origin = _load_data_origin(base_dir) |
|
|
|
|
|
pending_root = base_dir / "temp" / "pending_videos" / sha1 |
|
|
pending_root.mkdir(parents=True, exist_ok=True) |
|
|
local_pending_path = pending_root / "video.mp4" |
|
|
|
|
|
with local_pending_path.open("wb") as f_pending: |
|
|
f_pending.write(video_bytes) |
|
|
|
|
|
if data_origin == "external": |
|
|
|
|
|
try: |
|
|
resp_pending = api.upload_pending_video(video_bytes, uploaded_file.name) |
|
|
_log(f"[pending_videos] upload_pending_video resp: {resp_pending}") |
|
|
except Exception as e_up: |
|
|
_log(f"[pending_videos] Error cridant upload_pending_video: {e_up}") |
|
|
except Exception as e_ext: |
|
|
_log(f"[pending_videos] Error bloc exterior upload_pending_video: {e_ext}") |
|
|
|
|
|
|
|
|
if manual_validation_enabled and not skip_manual_validation_for_this_video: |
|
|
st.session_state.video_requires_validation = True |
|
|
st.session_state.video_validation_approved = False |
|
|
|
|
|
|
|
|
if video_validator_sms_enabled: |
|
|
try: |
|
|
compliance_client.notify_video_upload( |
|
|
video_name=uploaded_file.name, |
|
|
sha1sum=sha1, |
|
|
) |
|
|
except Exception as sms_exc: |
|
|
print(f"[VIDEO SMS] Error enviant notificació al validor: {sms_exc}") |
|
|
else: |
|
|
|
|
|
st.session_state.video_requires_validation = False |
|
|
st.session_state.video_validation_approved = True |
|
|
|
|
|
st.rerun() |
|
|
finally: |
|
|
if temp_path.exists(): |
|
|
temp_path.unlink() |
|
|
|
|
|
if st.session_state.video_uploaded and st.session_state.video_uploaded["status"] == "processed": |
|
|
st.success(f"Vídeo '{st.session_state.video_uploaded['original_name']}' pujat i processat correctament.") |
|
|
if st.session_state.video_uploaded["was_truncated"]: |
|
|
st.warning(f"El vídeo s'ha truncat a {MAX_DURATION_S} segons.") |
|
|
if manual_validation_enabled and st.session_state.get("video_requires_validation") and not st.session_state.get("video_validation_approved"): |
|
|
st.info("Per favor, espera a la revisió humana del vídeo.") |
|
|
|
|
|
|
|
|
current_sha1 = None |
|
|
if st.session_state.get("video_uploaded"): |
|
|
current_sha1 = st.session_state.video_uploaded.get("sha1sum") |
|
|
if current_sha1 and st.session_state.get("video_requires_validation") and not st.session_state.get("video_validation_approved"): |
|
|
if has_video_approval_action(current_sha1): |
|
|
st.session_state.video_validation_approved = True |
|
|
|
|
|
|
|
|
|
|
|
can_proceed_casting = ( |
|
|
st.session_state.get("video_uploaded") is not None |
|
|
and ( |
|
|
not st.session_state.get("video_requires_validation") |
|
|
or st.session_state.get("video_validation_approved") |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
if can_proceed_casting: |
|
|
st.markdown("---") |
|
|
|
|
|
with st.form("detect_form"): |
|
|
col_btn, col_face, col_voice, col_scene = st.columns([1, 1, 1, 1]) |
|
|
with col_face: |
|
|
st.markdown("**Cares**") |
|
|
face_max_groups = st.slider("k-Target (cares)", 0, 10, 2, 1, key="face_max_groups") |
|
|
face_min_cluster = st.slider("Mida mínima (cares)", 1, 5, 3, 1, key="face_min_cluster") |
|
|
with col_voice: |
|
|
st.markdown("**Veus**") |
|
|
voice_max_groups = st.slider("k-Target (veus)", 0, 10, 2, 1, key="voice_max_groups") |
|
|
voice_min_cluster = st.slider("Mida mínima (veus)", 1, 5, 1, 1, key="voice_min_cluster") |
|
|
with col_scene: |
|
|
st.markdown("**Escenes**") |
|
|
scene_max_groups = st.slider("k-Target (escenes)", 0, 5, 2, 1, key="scene_max_groups") |
|
|
scene_min_cluster = st.slider("Mida mínima (escenes)", 1, 20, 3, 1, key="scene_min_cluster") |
|
|
with col_btn: |
|
|
max_frames = st.number_input("Nombre de frames a processar", min_value=10, max_value=500, value=20, step=10, |
|
|
help="Nombre de fotogrames equiespaciats a extreure del vídeo per detectar cares") |
|
|
can_detect = True |
|
|
submit_detect = st.form_submit_button("Detectar Personatges", disabled=not can_detect) |
|
|
|
|
|
if not can_detect: |
|
|
st.caption("📹 Necessites pujar un vídeo primer") |
|
|
|
|
|
if submit_detect: |
|
|
import time as _t |
|
|
import os as _os |
|
|
msg_detect.empty() |
|
|
msg_finalize.empty() |
|
|
msg_ad.empty() |
|
|
try: |
|
|
v = st.session_state.video_uploaded |
|
|
|
|
|
st.session_state.scene_clusters = None |
|
|
st.session_state.scene_detection_done = False |
|
|
st.session_state.detect_done = False |
|
|
st.session_state.casting_finalized = False |
|
|
|
|
|
_log(f"[DETECT] Iniciando detección para vídeo: {v['name']}") |
|
|
_log(f"[DETECT] Parámetros: face_k={face_max_groups}, face_min={face_min_cluster}, max_frames={max_frames}") |
|
|
|
|
|
resp = api.create_initial_casting( |
|
|
video_bytes=v["bytes"], |
|
|
video_name=v["name"], |
|
|
face_max_groups=face_max_groups, |
|
|
face_min_cluster_size=face_min_cluster, |
|
|
voice_max_groups=voice_max_groups, |
|
|
voice_min_cluster_size=voice_min_cluster, |
|
|
max_frames=max_frames, |
|
|
) |
|
|
|
|
|
_log(f"[DETECT] Respuesta create_initial_casting: {resp}") |
|
|
|
|
|
if not isinstance(resp, dict) or not resp.get("job_id"): |
|
|
_log(f"[DETECT] ERROR: No se recibió job_id válido") |
|
|
msg_detect.error("No s'ha pogut crear el job al servidor. Torna-ho a intentar.") |
|
|
else: |
|
|
job_id = resp["job_id"] |
|
|
_log(f"[DETECT] Job creado: {job_id}") |
|
|
msg_detect.info(f"Job creat: {job_id}. Iniciant polling en 3s…") |
|
|
with st.spinner("Processant al servidor…"): |
|
|
_t.sleep(3) |
|
|
attempt, max_attempts = 0, 120 |
|
|
progress_placeholder = st.empty() |
|
|
while attempt < max_attempts: |
|
|
stt = api.get_job(job_id) |
|
|
status = stt.get("status") |
|
|
if status in ("queued", "processing"): |
|
|
if attempt % 10 == 0: |
|
|
elapsed_min = (attempt * 5) // 60 |
|
|
progress_placeholder.info(f"⏳ Processant al servidor... (~{elapsed_min} min)") |
|
|
_t.sleep(5) |
|
|
attempt += 1 |
|
|
continue |
|
|
if status == "failed": |
|
|
progress_placeholder.empty() |
|
|
msg_detect.error("El processament ha fallat al servidor.") |
|
|
break |
|
|
|
|
|
|
|
|
_log(f"[DETECT] Job completado. Status raw: {stt}") |
|
|
res = stt.get("results", {}) |
|
|
_log(f"[DETECT] Results keys: {res.keys() if res else 'None'}") |
|
|
chars = res.get("characters", []) |
|
|
fl = res.get("face_labels", []) |
|
|
segs = res.get("audio_segments", []) |
|
|
vl = res.get("voice_labels", []) |
|
|
base_dir = res.get("base_dir") |
|
|
vname = _os.path.basename(base_dir) if base_dir else None |
|
|
diar_info = res.get("diarization_info", {}) |
|
|
_log(f"[DETECT] Parsed: chars={len(chars)}, face_labels={len(fl)}, audio_segs={len(segs)}, voice_labels={len(vl)}") |
|
|
if chars: |
|
|
for i, c in enumerate(chars): |
|
|
_log(f"[DETECT] Char[{i}]: id={c.get('id')}, num_faces={c.get('num_faces')}, files={c.get('face_files', [])[:3]}") |
|
|
|
|
|
st.session_state.characters_detected = chars or [] |
|
|
st.session_state.face_labels = fl or [] |
|
|
st.session_state.audio_segments = segs or [] |
|
|
st.session_state.voice_labels = vl or [] |
|
|
st.session_state.video_name_from_engine = vname |
|
|
st.session_state.engine_base_dir = base_dir |
|
|
st.session_state.diarization_info = diar_info or {} |
|
|
|
|
|
progress_placeholder.empty() |
|
|
|
|
|
if chars: |
|
|
msg_detect.success( |
|
|
f"✓ Detecció completada! Trobades {len(chars)} cares.\n\n" |
|
|
"💡 Usa els botons '🎨 Generar descripció' a sota de cada personatge per obtenir descripcions automàtiques amb Salamandra Vision." |
|
|
) |
|
|
else: |
|
|
msg_detect.info("No s'han detectat cares en aquest vídeo.") |
|
|
|
|
|
|
|
|
try: |
|
|
scene_out = api.detect_scenes( |
|
|
video_bytes=v["bytes"], |
|
|
video_name=v["name"], |
|
|
max_groups=scene_max_groups, |
|
|
min_cluster_size=scene_min_cluster, |
|
|
frame_interval_sec=0.5, |
|
|
max_frames=max_frames, |
|
|
) |
|
|
scs = scene_out.get("scene_clusters") if isinstance(scene_out, dict) else None |
|
|
if isinstance(scs, list): |
|
|
st.session_state.scene_clusters = scs |
|
|
else: |
|
|
st.session_state.scene_clusters = [] |
|
|
except Exception: |
|
|
st.session_state.scene_clusters = [] |
|
|
finally: |
|
|
st.session_state.scene_detection_done = True |
|
|
|
|
|
st.session_state.detect_done = True |
|
|
msg_detect.success("✅ Processament completat!") |
|
|
break |
|
|
else: |
|
|
progress_placeholder.empty() |
|
|
msg_detect.warning(f"⏱️ El servidor no ha completat el job en {max_attempts * 5 // 60} minuts.") |
|
|
except Exception as e: |
|
|
msg_detect.error(f"Error inesperat: {e}") |
|
|
|
|
|
|
|
|
|
|
|
if ( |
|
|
st.session_state.get("video_uploaded") |
|
|
and st.session_state.get("video_requires_validation") |
|
|
and not st.session_state.get("video_validation_approved") |
|
|
): |
|
|
col_status, col_refresh = st.columns([3, 1]) |
|
|
with col_status: |
|
|
st.caption("⏳ Vídeo pendent de validació humana.") |
|
|
with col_refresh: |
|
|
if st.button("🔄 Actualitzar estat de validació", key="refresh_video_validation"): |
|
|
|
|
|
try: |
|
|
base_dir = Path(__file__).parent.parent |
|
|
api_client = st.session_state.get("api_client") |
|
|
ensure_temp_databases(base_dir, api_client) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
if current_sha1: |
|
|
if has_video_approval_action(current_sha1): |
|
|
st.session_state.video_validation_approved = True |
|
|
st.success("✅ Vídeo validat. Pots continuar amb el càsting.") |
|
|
else: |
|
|
st.info("Encara no s'ha registrat cap aprovació per a aquest vídeo.") |
|
|
|
|
|
|
|
|
if st.session_state.get("characters_detected") is not None: |
|
|
st.markdown("---") |
|
|
n_face_clusters = len(st.session_state.get("characters_detected") or []) |
|
|
st.subheader(f"🖼️ Cares — clústers: {n_face_clusters}") |
|
|
|
|
|
if n_face_clusters == 0: |
|
|
st.info("No s'han detectat clústers de cara en aquest clip.") |
|
|
|
|
|
for idx, ch in enumerate(st.session_state.characters_detected or []): |
|
|
try: |
|
|
folder_name = Path(ch.get("folder") or "").name |
|
|
except Exception: |
|
|
folder_name = "" |
|
|
char_id = ch.get("id") or folder_name or f"char{idx+1}" |
|
|
|
|
|
def _safe_key(s: str) -> str: |
|
|
k = re.sub(r"[^0-9a-zA-Z_]+", "_", s or "") |
|
|
return k or f"cluster_{idx+1}" |
|
|
|
|
|
key_prefix = _safe_key(f"char_{idx+1}_{char_id}") |
|
|
if f"{key_prefix}_idx" not in st.session_state: |
|
|
st.session_state[f"{key_prefix}_idx"] = 0 |
|
|
if f"{key_prefix}_discard" not in st.session_state: |
|
|
st.session_state[f"{key_prefix}_discard"] = set() |
|
|
|
|
|
faces_all = ch.get("face_files") or ([ch.get("image_url")] if ch.get("image_url") else []) |
|
|
faces_all = [f for f in faces_all if f] |
|
|
discard_set = st.session_state[f"{key_prefix}_discard"] |
|
|
faces = [f for f in faces_all if f not in discard_set] |
|
|
|
|
|
if not faces: |
|
|
st.write(f"- {idx+1}. {ch.get('name','(sense nom)')} — sense imatges seleccionades") |
|
|
continue |
|
|
|
|
|
cur = st.session_state[f"{key_prefix}_idx"] |
|
|
if cur >= len(faces): |
|
|
cur = 0 |
|
|
st.session_state[f"{key_prefix}_idx"] = cur |
|
|
fname = faces[cur] |
|
|
|
|
|
if fname.startswith("/files/"): |
|
|
img_url = f"{backend_base_url}/preprocessing{fname}" |
|
|
else: |
|
|
base = ch.get("image_url") or "" |
|
|
base_dir = "/".join((base or "/").split("/")[:-1]) |
|
|
img_url = f"{backend_base_url}/preprocessing{base_dir}/{fname}" if base_dir else f"{backend_base_url}/preprocessing/{fname}" |
|
|
|
|
|
st.markdown(f"**{idx+1}. {ch.get('name','(sense nom)')} — {ch.get('num_faces', 0)} cares**") |
|
|
spacer_col, main_content_col = st.columns([0.12, 0.88]) |
|
|
with spacer_col: |
|
|
st.write("") |
|
|
with main_content_col: |
|
|
media_col, form_col = st.columns([1.3, 2.7]) |
|
|
with media_col: |
|
|
st.image(img_url, width=180) |
|
|
st.caption(f"Imatge {cur+1}/{len(faces)}") |
|
|
nav_prev, nav_del, nav_next = st.columns(3) |
|
|
with nav_prev: |
|
|
if st.button("⬅️", key=f"prev_{key_prefix}", help="Anterior"): |
|
|
st.session_state[f"{key_prefix}_idx"] = (cur - 1) % len(faces) |
|
|
st.rerun() |
|
|
with nav_del: |
|
|
if st.button("🗑️", key=f"del_{key_prefix}", help="Eliminar aquesta imatge del clúster"): |
|
|
st.session_state[f"{key_prefix}_discard"].add(fname) |
|
|
new_list = [f for f in faces if f != fname] |
|
|
new_idx = cur if cur < len(new_list) else 0 |
|
|
st.session_state[f"{key_prefix}_idx"] = new_idx |
|
|
st.rerun() |
|
|
with nav_next: |
|
|
if st.button("➡️", key=f"next_{key_prefix}", help="Següent"): |
|
|
st.session_state[f"{key_prefix}_idx"] = (cur + 1) % len(faces) |
|
|
st.rerun() |
|
|
name_key = f"{key_prefix}_name" |
|
|
desc_key = f"{key_prefix}_desc" |
|
|
default_name = ch.get("name", "") |
|
|
default_desc = ch.get("description", "") |
|
|
|
|
|
if default_name and (name_key not in st.session_state or not st.session_state.get(name_key)): |
|
|
st.session_state[name_key] = default_name |
|
|
elif name_key not in st.session_state: |
|
|
st.session_state[name_key] = default_name or "" |
|
|
|
|
|
if default_desc and (desc_key not in st.session_state or not st.session_state.get(desc_key)): |
|
|
st.session_state[desc_key] = default_desc |
|
|
elif desc_key not in st.session_state: |
|
|
st.session_state[desc_key] = default_desc or "" |
|
|
|
|
|
pending_desc_key = f"{key_prefix}_pending_desc" |
|
|
pending_name_key = f"{key_prefix}_pending_name" |
|
|
if pending_desc_key in st.session_state: |
|
|
if desc_key not in st.session_state: |
|
|
st.session_state[desc_key] = "" |
|
|
st.session_state[desc_key] = st.session_state[pending_desc_key] |
|
|
del st.session_state[pending_desc_key] |
|
|
|
|
|
if pending_name_key in st.session_state: |
|
|
if name_key not in st.session_state: |
|
|
st.session_state[name_key] = "" |
|
|
if not st.session_state.get(name_key): |
|
|
st.session_state[name_key] = st.session_state[pending_name_key] |
|
|
del st.session_state[pending_name_key] |
|
|
|
|
|
with form_col:
    # Editable cluster name / description; values persist across reruns
    # under name_key / desc_key in session_state.
    st.text_input("Nom del clúster", key=name_key)
    st.text_area("Descripció", key=desc_key, height=80)

    if st.button("🎨 Generar descripció amb Salamandra Vision", key=f"svision_{key_prefix}"):
        with st.spinner("Generant descripció..."):
            # Lazy imports so the page renders even if api_client is heavy.
            from api_client import describe_image_with_svision
            import requests as _req
            import os as _os
            import tempfile

            try:
                # Fetch the currently displayed face image from the backend.
                resp = _req.get(img_url, timeout=10)
                if resp.status_code == 200:
                    # The vision API takes a filesystem path, so spill the
                    # downloaded bytes to a temp file (delete=False: we need
                    # the path after the context manager closes the handle).
                    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
                        tmp.write(resp.content)
                        tmp_path = tmp.name

                    try:
                        desc, name = describe_image_with_svision(tmp_path, is_face=True)

                        if desc:
                            # Stage the result; it is copied into desc_key on
                            # the next rerun (see the pending-state flush above).
                            st.session_state[pending_desc_key] = desc
                            st.success("✅ Descripció generada!")
                            print(f"[SVISION] Descripció generada per {char_id}: {desc[:100]}")
                        else:
                            st.warning("⚠️ No s'ha pogut generar una descripció.")
                            print(f"[SVISION] Descripció buida per {char_id}")

                        # Only suggest a name when the user has not set one.
                        if name and not st.session_state.get(name_key):
                            st.session_state[pending_name_key] = name
                            print(f"[SVISION] Nom generat per {char_id}: {name}")

                    finally:
                        # Best-effort cleanup of the temporary image file.
                        try:
                            _os.unlink(tmp_path)
                        except Exception as cleanup_err:
                            print(f"[SVISION] Error netejant fitxer temporal: {cleanup_err}")

                    st.rerun()
                else:
                    st.error(f"No s'ha pogut descarregar la imatge (status: {resp.status_code})")

            except Exception as e:
                st.error(f"Error generant descripció: {str(e)}")
                print(f"[SVISION] Error complet: {e}")
                import traceback
                traceback.print_exc()
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
# Voice-fingerprint clusters: one audio carousel + editable name and
# description per diarized speaker.
# ------------------------------------------------------------------
if st.session_state.get("audio_segments") is not None:
    st.markdown("---")

    # Collect Catalan names already assigned to face clusters so default
    # voice names do not collide with them.
    used_names_home = []
    used_names_dona = []
    noms_home_all, noms_dona_all = get_all_catalan_names()

    for ch in (st.session_state.characters_detected or []):
        ch_name = ch.get("name", "")
        if ch_name in noms_home_all:
            used_names_home.append(ch_name)
        elif ch_name in noms_dona_all:
            used_names_dona.append(ch_name)

    segs = st.session_state.audio_segments or []
    vlabels = st.session_state.voice_labels or []
    # Labels < 0 (or non-int) mean "unassigned"; group the rest by label.
    valid_indices = [i for i, l in enumerate(vlabels) if isinstance(l, int) and l >= 0]
    clusters = {}
    for i in valid_indices:
        lbl = int(vlabels[i])
        clusters.setdefault(lbl, []).append(i)
    n_vclusters = len(clusters)
    st.subheader(f"🎙️ Empremtes de veu — clústers: {n_vclusters}")
    di = st.session_state.get("diarization_info") or {}
    if isinstance(di, dict) and not di.get("diarization_ok", True):
        st.warning("No s'ha pogut fer la diarització amb pyannote (s'ha aplicat un sol segment de reserva).")
    if not segs:
        st.info("No s'han detectat mostres de veu.")
    elif n_vclusters == 0:
        st.info("No s'han format clústers de veu.")
    else:
        vname = st.session_state.video_name_from_engine
        for lbl, idxs in sorted(clusters.items(), key=lambda x: x[0]):
            key_prefix = f"voice_{lbl:02d}"
            # Per-cluster carousel index and set of user-discarded clips.
            if f"{key_prefix}_idx" not in st.session_state:
                st.session_state[f"{key_prefix}_idx"] = 0
            if f"{key_prefix}_discard" not in st.session_state:
                st.session_state[f"{key_prefix}_discard"] = set()
            discard_set = st.session_state[f"{key_prefix}_discard"]
            files = []
            for i in idxs:
                clip_local = (segs[i] or {}).get("clip_path")
                fname = os.path.basename(clip_local) if clip_local else None
                if fname:
                    files.append(fname)
            files = [f for f in files if f and f not in discard_set]
            if not files:
                st.write(f"- SPEAKER_{lbl:02d} — sense clips seleccionats")
                continue
            cur = st.session_state[f"{key_prefix}_idx"]
            # Clamp the carousel index after deletions shrank the list.
            if cur >= len(files):
                cur = 0
                st.session_state[f"{key_prefix}_idx"] = cur
            fname = files[cur]
            audio_url = f"{backend_base_url}/preprocessing/audio/{vname}/{fname}" if (vname and fname) else None
            st.markdown(f"**SPEAKER_{lbl:02d} — {len(files)} clips**")
            c1, c2 = st.columns([1, 2])
            with c1:
                if audio_url:
                    st.audio(audio_url, format="audio/wav")
                st.caption(f"Clip {cur+1}/{len(files)}")
                bcol1, bcol2, bcol3 = st.columns(3)
                with bcol1:
                    if st.button("⬅️", key=f"prev_{key_prefix}", help="Anterior"):
                        st.session_state[f"{key_prefix}_idx"] = (cur - 1) % len(files)
                        st.rerun()
                with bcol2:
                    if st.button("🗑️", key=f"del_{key_prefix}", help="Eliminar aquest clip del clúster"):
                        # Discard is remembered in session_state, so the clip
                        # stays hidden on subsequent reruns.
                        st.session_state[f"{key_prefix}_discard"].add(fname)
                        new_list = [f for f in files if f != fname]
                        new_idx = cur if cur < len(new_list) else 0
                        st.session_state[f"{key_prefix}_idx"] = new_idx
                        st.rerun()
                with bcol3:
                    if st.button("➡️", key=f"next_{key_prefix}", help="Següent"):
                        st.session_state[f"{key_prefix}_idx"] = (cur + 1) % len(files)
                        st.rerun()
            with c2:
                name_key = f"{key_prefix}_name"
                desc_key = f"{key_prefix}_desc"
                # Deterministic Catalan default per speaker label, avoiding
                # names already used by face clusters.
                default_name = get_catalan_name_for_speaker(lbl, used_names_home, used_names_dona)
                st.text_input("Nom del clúster", value=st.session_state.get(name_key, default_name), key=name_key)
                st.text_area("Descripció", value=st.session_state.get(desc_key, ""), key=desc_key, height=80)
|
|
|
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
# Scene clusters: image carousel + name/description per detected scene,
# with optional Salamandra Vision auto-description and short-name titles.
# ------------------------------------------------------------------
if st.session_state.get("scene_detection_done"):
    st.markdown("---")
    scene_clusters = st.session_state.get("scene_clusters")
    n_scenes = len(scene_clusters or [])
    st.subheader(f"📍 Escenes — clústers: {n_scenes}")
    if not scene_clusters:
        st.info("No s'han detectat clústers d'escenes en aquest clip.")
    else:
        for sidx, sc in enumerate(scene_clusters):
            try:
                folder_name = Path(sc.get("folder") or "").name
            except Exception:
                folder_name = ""
            scene_id = sc.get("id") or folder_name or f"scene{sidx+1}"
            # Sanitize the id so it is safe inside a Streamlit widget key.
            key_prefix = re.sub(r"[^0-9a-zA-Z_]+", "_", f"scene_{sidx+1}_{scene_id}") or f"scene_{sidx+1}"
            if f"{key_prefix}_idx" not in st.session_state:
                st.session_state[f"{key_prefix}_idx"] = 0
            if f"{key_prefix}_discard" not in st.session_state:
                st.session_state[f"{key_prefix}_discard"] = set()
            frames_all = sc.get("frame_files") or ([sc.get("image_url")] if sc.get("image_url") else [])
            frames_all = [f for f in frames_all if f]
            discard_set = st.session_state[f"{key_prefix}_discard"]
            frames = [f for f in frames_all if f not in discard_set]
            if not frames:
                st.write(f"- {sidx+1}. (sense imatges de l'escena)")
                continue
            cur = st.session_state[f"{key_prefix}_idx"]
            # Clamp after deletions.
            if cur >= len(frames):
                cur = 0
                st.session_state[f"{key_prefix}_idx"] = cur
            fname = frames[cur]
            # frame_files may already be backend-absolute ("/files/...") or
            # relative to the directory of the representative image_url.
            if str(fname).startswith("/files/"):
                img_url = f"{backend_base_url}/preprocessing{fname}"
            else:
                base = sc.get("image_url") or ""
                base_dir = "/".join((base or "/").split("/")[:-1])
                img_url = f"{backend_base_url}/preprocessing{base_dir}/{fname}" if base_dir else f"{backend_base_url}/preprocessing/{fname}"
            st.markdown(f"**{sidx+1}. Escena — {sc.get('num_frames', 0)} frames**")
            spacer_col, main_content_col = st.columns([0.12, 0.88])
            with spacer_col:
                st.write("")
            with main_content_col:
                media_col, form_col = st.columns([1.4, 2.6])
                with media_col:
                    st.image(img_url, width=220)
                    st.caption(f"Imatge {cur+1}/{len(frames)}")
                    nav_prev, nav_del, nav_next = st.columns(3)
                    with nav_prev:
                        if st.button("⬅️", key=f"prev_{key_prefix}", help="Anterior"):
                            st.session_state[f"{key_prefix}_idx"] = (cur - 1) % len(frames)
                            st.rerun()
                    with nav_del:
                        if st.button("🗑️", key=f"del_{key_prefix}", help="Eliminar aquesta imatge del clúster"):
                            st.session_state[f"{key_prefix}_discard"].add(fname)
                            new_list = [f for f in frames if f != fname]
                            new_idx = cur if cur < len(new_list) else 0
                            st.session_state[f"{key_prefix}_idx"] = new_idx
                            st.rerun()
                    with nav_next:
                        if st.button("➡️", key=f"next_{key_prefix}", help="Següent"):
                            st.session_state[f"{key_prefix}_idx"] = (cur + 1) % len(frames)
                            st.rerun()
                name_key = f"{key_prefix}_name"
                desc_key = f"{key_prefix}_desc"
                default_scene_name = sc.get("name", "")
                default_scene_desc = sc.get("description", "")

                # Seed widget state with backend-provided defaults without
                # overwriting anything the user already typed.
                if default_scene_name and (name_key not in st.session_state or not st.session_state.get(name_key)):
                    st.session_state[name_key] = default_scene_name
                elif name_key not in st.session_state:
                    st.session_state[name_key] = default_scene_name or ""

                if default_scene_desc and (desc_key not in st.session_state or not st.session_state.get(desc_key)):
                    st.session_state[desc_key] = default_scene_desc
                elif desc_key not in st.session_state:
                    st.session_state[desc_key] = default_scene_desc or ""

                # One-shot application of values staged by the Salamandra
                # Vision button on the previous run.
                pending_desc_key = f"{key_prefix}_pending_desc"
                pending_name_key = f"{key_prefix}_pending_name"
                if pending_desc_key in st.session_state:
                    if desc_key not in st.session_state:
                        st.session_state[desc_key] = ""
                    st.session_state[desc_key] = st.session_state[pending_desc_key]
                    del st.session_state[pending_desc_key]

                if pending_name_key in st.session_state:
                    if name_key not in st.session_state:
                        st.session_state[name_key] = ""
                    if not st.session_state.get(name_key):
                        st.session_state[name_key] = st.session_state[pending_name_key]
                    del st.session_state[pending_name_key]

                with form_col:
                    st.text_input("Nom del clúster", key=name_key)
                    st.text_area("Descripció", key=desc_key, height=80)

                    if st.button("🎨 Generar descripció amb Salamandra Vision", key=f"svision_{key_prefix}"):
                        with st.spinner("Generant descripció..."):
                            from api_client import describe_image_with_svision, generate_short_scene_name
                            import requests as _req
                            import os as _os
                            import tempfile

                            try:
                                resp = _req.get(img_url, timeout=10)
                                if resp.status_code == 200:
                                    # Spill to a temp file; the vision API wants a path.
                                    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
                                        tmp.write(resp.content)
                                        tmp_path = tmp.name

                                    try:
                                        desc, name = describe_image_with_svision(tmp_path, is_face=False)

                                        if desc:
                                            st.session_state[pending_desc_key] = desc
                                            print(f"[SVISION] Descripció d'escena generada per {scene_id}: {desc[:100]}")

                                            # Prefer a short LLM-generated scene
                                            # title; fall back to the vision
                                            # model's own name on failure.
                                            try:
                                                short_name = generate_short_scene_name(desc)
                                                if short_name:
                                                    st.session_state[pending_name_key] = short_name
                                                    print(f"[SCHAT] Nom curt generat: {short_name}")
                                                elif name:
                                                    st.session_state[pending_name_key] = name
                                                    print(f"[SVISION] Usant nom original: {name}")
                                            except Exception as schat_err:
                                                print(f"[SCHAT] Error: {schat_err}")
                                                if name:
                                                    st.session_state[pending_name_key] = name
                                                    print(f"[SVISION] Usant nom original fallback: {name}")

                                            st.success("✅ Descripció i nom generats!")
                                        else:
                                            st.warning("⚠️ No s'ha pogut generar una descripció.")
                                            print(f"[SVISION] Descripció d'escena buida per {scene_id}")

                                    finally:
                                        # Best-effort temp-file cleanup.
                                        try:
                                            _os.unlink(tmp_path)
                                        except Exception as cleanup_err:
                                            print(f"[SVISION] Error netejant fitxer temporal: {cleanup_err}")

                                    st.rerun()
                                else:
                                    st.error(f"No s'ha pogut descarregar la imatge (status: {resp.status_code})")

                            except Exception as e:
                                st.error(f"Error generant descripció: {str(e)}")
                                print(f"[SVISION] Error complet: {e}")
                                import traceback
                                traceback.print_exc()
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
# Final casting confirmation: rebuild the payload from widget state,
# send it to the engine, persist casting.json locally and load the
# resulting face/voice indexes into the Chroma search backend.
# ------------------------------------------------------------------
if st.session_state.get("detect_done"):
    st.markdown("---")
    colc1, colc2 = st.columns([1,1])
    with colc1:
        if st.button("Confirmar càsting definitiu", type="primary"):
            # Per-character payload honouring user edits (renames, edited
            # descriptions, discarded face images).
            chars_payload = []
            for idx, ch in enumerate(st.session_state.characters_detected or []):
                try:
                    folder_name = Path(ch.get("folder") or "").name
                except Exception:
                    folder_name = ""
                char_id = ch.get("id") or folder_name or f"char{idx+1}"
                def _safe_key(s: str) -> str:
                    # Mirror of the sanitizer used when the widgets were built,
                    # so we read back the same session_state keys.
                    k = re.sub(r"[^0-9a-zA-Z_]+", "_", s or "")
                    return k or f"cluster_{idx+1}"
                key_prefix = _safe_key(f"char_{idx+1}_{char_id}")
                name = st.session_state.get(f"{key_prefix}_name") or ch.get("name") or f"Personatge {idx+1}"
                desc = st.session_state.get(f"{key_prefix}_desc", "")
                faces_all = ch.get("face_files") or []
                discard = st.session_state.get(f"{key_prefix}_discard", set())
                kept = [f for f in faces_all if f and f not in discard]
                chars_payload.append({
                    "id": char_id,
                    "name": name,
                    "description": desc,
                    "folder": ch.get("folder"),
                    "kept_files": kept,
                })

            # Names already taken by face clusters so voice defaults differ.
            used_names_home_fin = []
            used_names_dona_fin = []
            noms_home_all, noms_dona_all = get_all_catalan_names()
            for cp in chars_payload:
                face_name = cp.get("name", "")
                if face_name in noms_home_all:
                    used_names_home_fin.append(face_name)
                elif face_name in noms_dona_all:
                    used_names_dona_fin.append(face_name)

            # Group kept voice clips by diarization label, applying any
            # user-provided names/descriptions from the voice widgets.
            segs = st.session_state.audio_segments or []
            vlabels = st.session_state.voice_labels or []
            vname = st.session_state.video_name_from_engine
            voice_clusters = {}
            for i, seg in enumerate(segs):
                lbl = vlabels[i] if i < len(vlabels) else -1

                if not (isinstance(lbl, int) and lbl >= 0):
                    continue
                clip_local = seg.get("clip_path")
                fname = os.path.basename(clip_local) if clip_local else None
                if fname:
                    default_voice_name = get_catalan_name_for_speaker(int(lbl), used_names_home_fin, used_names_dona_fin)
                    voice_clusters.setdefault(lbl, {"label": lbl, "name": default_voice_name, "description": "", "clips": []})
                    vpref = f"voice_{int(lbl):02d}"
                    vname_custom = st.session_state.get(f"{vpref}_name")
                    vdesc_custom = st.session_state.get(f"{vpref}_desc")
                    if vname_custom:
                        voice_clusters[lbl]["name"] = vname_custom
                    if vdesc_custom is not None:
                        voice_clusters[lbl]["description"] = vdesc_custom
                    voice_clusters[lbl]["clips"].append(fname)

            payload = {
                "video_name": vname,
                "base_dir": st.session_state.get("engine_base_dir"),
                "characters": chars_payload,
                "voice_clusters": list(voice_clusters.values()),
            }

            if not payload["video_name"] or not payload["base_dir"]:
                st.error("Falten dades del vídeo per confirmar el càsting (video_name/base_dir). Torna a processar el vídeo.")
            else:
                with st.spinner("Consolidant càsting al servidor…"):
                    res_fc = api.finalize_casting(payload)
                if isinstance(res_fc, dict) and res_fc.get("ok"):
                    st.success(f"Càsting consolidat. Identities: {len(res_fc.get('face_identities', []))} cares, {len(res_fc.get('voice_identities', []))} veus.")
                    st.session_state.casting_finalized = True

                    # Persist the engine's casting.json next to the uploaded
                    # media so the later embeddings-upload step can reuse it.
                    try:
                        casting_json = res_fc.get("casting_json") or {}
                        v = st.session_state.get("video_uploaded") or {}
                        sha1 = v.get("sha1sum")
                        if casting_json and sha1:
                            base_dir = Path(__file__).parent.parent / "temp" / "media" / sha1
                            base_dir.mkdir(parents=True, exist_ok=True)
                            casting_path = base_dir / "casting.json"
                            with casting_path.open("w", encoding="utf-8") as f:
                                json.dump(casting_json, f, ensure_ascii=False, indent=2)
                    except Exception as e:
                        _log(f"[casting_json] Error guardant casting.json: {e}")

                    f_id = res_fc.get('face_identities', []) or []
                    v_id = res_fc.get('voice_identities', []) or []
                    c3, c4 = st.columns(2)
                    with c3:
                        st.markdown("**Identitats de cara**")
                        for n in f_id:
                            st.write(f"- {n}")
                    with c4:
                        st.markdown("**Identitats de veu**")
                        for n in v_id:
                            st.write(f"- {n}")

                    # Load the consolidated indexes into Chroma, replacing any
                    # existing collections (drop_collections=True).
                    faces_dir = res_fc.get('faces_dir')
                    voices_dir = res_fc.get('voices_dir')
                    db_dir = res_fc.get('db_dir')
                    with st.spinner("Carregant índexs al cercador (Chroma)…"):
                        load_res = api.load_casting(faces_dir=faces_dir, voices_dir=voices_dir, db_dir=db_dir, drop_collections=True)
                    if isinstance(load_res, dict) and load_res.get('ok'):
                        st.success(f"Índexs carregats: {load_res.get('faces', 0)} cares, {load_res.get('voices', 0)} veus.")
                    else:
                        st.error(f"Error carregant índexs: {load_res}")
                else:
                    # Distinguish a missing engine endpoint from a generic failure.
                    if isinstance(res_fc, dict) and res_fc.get("status_code") == 404:
                        st.error(
                            "No s'ha pogut consolidar el càsting perquè l'endpoint "
                            "\"/finalize_casting\" no està disponible al servidor d'engine. "
                            "Aquesta funcionalitat encara no està implementada o està desactivada."
                        )
                    else:
                        st.error("No s'ha pogut consolidar el càsting per un error al servidor.")
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
# Unified "Personatges" view: merge face clusters and voice clusters
# sharing the same (accent-insensitive, case-insensitive) name into one
# character card. Only shown once the casting has been consolidated.
# ------------------------------------------------------------------
if st.session_state.get("casting_finalized"):
    st.markdown("---")
    st.subheader("👥 Personatges")
|
|
|
|
|
def normalize_name(name: str) -> str:
    """Uppercase *name* and strip diacritics.

    Decomposes the uppercased string with NFD and drops every combining
    mark (Unicode category 'Mn'), so e.g. "Núria" -> "NURIA". Used to
    match face and voice clusters by name regardless of accents/case.
    """
    import unicodedata
    decomposed = unicodedata.normalize('NFD', name.upper())
    kept = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']
    return ''.join(kept)
|
|
|
|
|
# Rebuild face-cluster info from widget state for the merged view.
chars_payload = []
for idx, ch in enumerate(st.session_state.characters_detected or []):
    try:
        folder_name = Path(ch.get("folder") or "").name
    except Exception:
        folder_name = ""
    char_id = ch.get("id") or folder_name or f"char{idx+1}"
    def _safe_key(s: str) -> str:
        # Same sanitizer used when the cluster widgets were created.
        k = re.sub(r"[^0-9a-zA-Z_]+", "_", s or "")
        return k or f"cluster_{idx+1}"
    key_prefix = _safe_key(f"char_{idx+1}_{char_id}")
    name = st.session_state.get(f"{key_prefix}_name") or ch.get("name") or f"Personatge {idx+1}"
    name_normalized = normalize_name(name)
    desc = st.session_state.get(f"{key_prefix}_desc", "").strip()
    chars_payload.append({
        "name": name,
        "name_normalized": name_normalized,
        "face_key_prefix": key_prefix,
        "face_files": ch.get("face_files") or [],
        "char_data": ch,
        "description": desc,
    })

used_names_home_pers = []
used_names_dona_pers = []
noms_home_all, noms_dona_all = get_all_catalan_names()
for cp in chars_payload:
    face_name = cp.get("name", "")
    if face_name in noms_home_all:
        used_names_home_pers.append(face_name)
    elif face_name in noms_dona_all:
        used_names_dona_pers.append(face_name)

# Group voice clips by normalized speaker name so they can be merged with
# the face clusters bearing the same name.
segs = st.session_state.audio_segments or []
vlabels = st.session_state.voice_labels or []
vname = st.session_state.video_name_from_engine
voice_clusters_by_name = {}
for i, seg in enumerate(segs):
    lbl = vlabels[i] if i < len(vlabels) else -1
    if not (isinstance(lbl, int) and lbl >= 0):
        continue
    vpref = f"voice_{int(lbl):02d}"
    # The conditional is always true here (the guard above already filtered
    # lbl), so the f-string fallback is effectively dead code.
    default_voice_name = get_catalan_name_for_speaker(int(lbl), used_names_home_pers, used_names_dona_pers) if isinstance(lbl, int) and lbl >= 0 else f"SPEAKER_{int(lbl):02d}"
    vname_custom = st.session_state.get(f"{vpref}_name") or default_voice_name
    vname_normalized = normalize_name(vname_custom)
    vdesc = st.session_state.get(f"{vpref}_desc", "").strip()
    clip_local = seg.get("clip_path")
    fname = os.path.basename(clip_local) if clip_local else None
    if fname:
        voice_clusters_by_name.setdefault(vname_normalized, {
            "voice_key_prefix": vpref,
            "clips": [],
            "label": lbl,
            "original_name": vname_custom,
            "description": vdesc,
        })
        voice_clusters_by_name[vname_normalized]["clips"].append(fname)

# Union of all character names (faces + voices), accent-insensitive.
all_normalized_names = set([c["name_normalized"] for c in chars_payload] + list(voice_clusters_by_name.keys()))

for pidx, norm_name in enumerate(sorted(all_normalized_names)):
    face_items = [c for c in chars_payload if c["name_normalized"] == norm_name]
    voice_data = voice_clusters_by_name.get(norm_name)

    # Prefer the original (non-normalized) face name for display.
    display_name = face_items[0]["name"] if face_items else (voice_data["original_name"] if voice_data else norm_name)

    # Concatenate face descriptions and the voice description.
    descriptions = []
    for face_item in face_items:
        if face_item["description"]:
            descriptions.append(face_item["description"])
    if voice_data and voice_data.get("description"):
        descriptions.append(voice_data["description"])

    combined_description = "\n".join(descriptions) if descriptions else ""

    st.markdown(f"**{pidx+1}. {display_name}**")

    all_faces = []
    for face_item in face_items:
        all_faces.extend(face_item["face_files"])

    face_data = face_items[0] if face_items else None

    col_faces, col_voices, col_text = st.columns([1, 1, 1.5])

    with col_faces:
        if all_faces:
            carousel_key = f"combined_face_{pidx}"
            if f"{carousel_key}_idx" not in st.session_state:
                st.session_state[f"{carousel_key}_idx"] = 0
            cur = st.session_state[f"{carousel_key}_idx"]
            if cur >= len(all_faces):
                cur = 0
                st.session_state[f"{carousel_key}_idx"] = cur
            fname = all_faces[cur]
            ch = face_data["char_data"] if face_data else {}
            # Same URL resolution as the per-cluster views: absolute
            # "/files/..." paths vs. paths relative to image_url's directory.
            if fname.startswith("/files/"):
                img_url = f"{backend_base_url}/preprocessing{fname}"
            else:
                base = ch.get("image_url") or ""
                base_dir = "/".join((base or "/").split("/")[:-1])
                img_url = f"{backend_base_url}/preprocessing{base_dir}/{fname}" if base_dir else f"{backend_base_url}/preprocessing/{fname}"
            st.image(img_url, width=150)
            st.caption(f"Cara {cur+1}/{len(all_faces)}")
            bcol1, bcol2 = st.columns(2)
            with bcol1:
                if st.button("⬅️", key=f"combined_face_prev_{pidx}"):
                    st.session_state[f"{carousel_key}_idx"] = (cur - 1) % len(all_faces)
                    st.rerun()
            with bcol2:
                if st.button("➡️", key=f"combined_face_next_{pidx}"):
                    st.session_state[f"{carousel_key}_idx"] = (cur + 1) % len(all_faces)
                    st.rerun()
        else:
            st.info("Sense imatges")

    with col_voices:
        if voice_data:
            clips = voice_data["clips"]
            if clips:
                carousel_key = f"combined_voice_{pidx}"
                if f"{carousel_key}_idx" not in st.session_state:
                    st.session_state[f"{carousel_key}_idx"] = 0
                cur = st.session_state[f"{carousel_key}_idx"]
                if cur >= len(clips):
                    cur = 0
                    st.session_state[f"{carousel_key}_idx"] = cur
                fname = clips[cur]
                audio_url = f"{backend_base_url}/preprocessing/audio/{vname}/{fname}" if (vname and fname) else None
                if audio_url:
                    st.audio(audio_url, format="audio/wav")
                st.caption(f"Veu {cur+1}/{len(clips)}")
                bcol1, bcol2 = st.columns(2)
                with bcol1:
                    if st.button("⬅️", key=f"combined_voice_prev_{pidx}"):
                        st.session_state[f"{carousel_key}_idx"] = (cur - 1) % len(clips)
                        st.rerun()
                with bcol2:
                    if st.button("➡️", key=f"combined_voice_next_{pidx}"):
                        st.session_state[f"{carousel_key}_idx"] = (cur + 1) % len(clips)
                        st.rerun()
            else:
                st.info("Sense clips de veu")
        else:
            st.info("Sense dades de veu")

    with col_text:
        combined_name_key = f"combined_char_{pidx}_name"
        combined_desc_key = f"combined_char_{pidx}_desc"

        # NOTE(review): the default seeded here is the normalized name
        # (uppercase, accents stripped) rather than display_name — confirm
        # this is intended and not an oversight.
        if combined_name_key not in st.session_state:
            st.session_state[combined_name_key] = norm_name
        if combined_desc_key not in st.session_state:
            st.session_state[combined_desc_key] = combined_description

        st.text_input("Nom del personatge", key=combined_name_key, label_visibility="collapsed", placeholder="Nom del personatge")
        st.text_area("Descripció", key=combined_desc_key, height=120, label_visibility="collapsed", placeholder="Descripció del personatge")
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
# Audiodescription generation: run the full engine pipeline for the
# uploaded video (embeddings upload, initial SRT/info, then the
# Salamandra and MoE versions in the sections that follow).
# ------------------------------------------------------------------
st.markdown("---")
if st.button("🎬 Generar audiodescripció", type="primary", use_container_width=True):
    v = st.session_state.get("video_uploaded")
    if not v:
        st.error("No hi ha cap vídeo carregat.")
    else:
        progress_placeholder = st.empty()
        result_placeholder = st.empty()

        with st.spinner("Generant audiodescripció... Aquest procés pot trigar diversos minuts."):
            progress_placeholder.info("⏳ Processant vídeo i generant audiodescripció...")

            try:
                sha1 = v.get("sha1sum")
                if not sha1:
                    result_placeholder.error("Falta sha1sum del vídeo per generar l'audiodescripció.")
                    # Bails out of the enclosing page-render function.
                    return

                # Local cache directory keyed by the video's sha1.
                base_media_dir = Path(__file__).parent.parent / "temp" / "media" / sha1
                base_media_dir.mkdir(parents=True, exist_ok=True)

                # Reload the casting.json persisted at casting confirmation
                # (best effort: absent or unreadable file is tolerated).
                casting_json = None
                try:
                    casting_path = base_media_dir / "casting.json"
                    if casting_path.exists():
                        with casting_path.open("r", encoding="utf-8") as f:
                            casting_json = json.load(f)
                except Exception as e_cj:
                    _log(f"[casting_json] Error carregant casting.json: {e_cj}")

                # Push embeddings to the engine before generation (best effort).
                if casting_json:
                    try:
                        upload_res = api.upload_embeddings(sha1, casting_json)
                        _log(f"[embeddings] upload_embeddings resp: {upload_res}")
                    except Exception as e_up:
                        _log(f"[embeddings] Error pujant embeddings a engine: {e_up}")

                # Initial pipeline: produces initial.srt + info.json engine-side.
                # A raised exception is converted to an error dict so the check
                # below handles both failure modes uniformly.
                try:
                    init_resp = api.generate_initial_srt_and_info(sha1)
                    _log(f"[initial] generate_initial_srt_and_info resp: {init_resp}")
                except Exception as e_init:
                    _log(f"[initial] Error cridant generate_initial_srt_and_info: {e_init}")
                    init_resp = {"error": str(e_init)}

                if isinstance(init_resp, dict) and init_resp.get("error"):
                    result_placeholder.error(f"❌ Error al pipeline inicial: {init_resp.get('error')}")
                    return
|
|
|
|
|
|
|
|
# Fetch the initial SRT and info.json produced by the pipeline and cache
# them locally next to the uploaded media. Failures are logged, not fatal:
# the texts simply stay empty.
init_srt_text = ""
init_info_text = ""
try:
    srt_resp = api.download_initial_srt(sha1)
    if isinstance(srt_resp, dict) and not srt_resp.get("error"):
        init_srt_text = srt_resp.get("text", "") or ""
        initial_srt_path = base_media_dir / "initial.srt"
        with initial_srt_path.open("w", encoding="utf-8") as f_srt:
            f_srt.write(init_srt_text)
        _log(f"[initial] initial.srt desat a {initial_srt_path}")
    else:
        _log(f"[initial] Error descarregant initial.srt: {srt_resp}")
except Exception as e_srt:
    _log(f"[initial] Excepció descarregant initial.srt: {e_srt}")

try:
    info_resp = api.download_initial_info(sha1)
    if isinstance(info_resp, dict) and not info_resp.get("error"):
        init_info_text = info_resp.get("text", "") or ""
        info_path = base_media_dir / "info.json"
        with info_path.open("w", encoding="utf-8") as f_info:
            f_info.write(init_info_text)
        _log(f"[initial] info.json desat a {info_path}")
    else:
        _log(f"[initial] Error descarregant info.json: {info_resp}")
except Exception as e_info:
    _log(f"[initial] Excepció descarregant info.json: {e_info}")
|
|
|
|
|
|
|
|
# Feature flags with safe defaults; overridden by config.yaml when present.
salamandra_enabled = True
moe_enabled = True
reflection_enabled = True
reflexion_enabled = False
introspection_enabled = False
twilio_enabled_cfg = False
zapier_enabled_cfg = False
une_validator_sms_enabled = False
une_phone_validator = ""
try:
    base_dir_cfg = Path(__file__).parent.parent
    cfg_path = base_dir_cfg / "config.yaml"
    if cfg_path.exists():
        with cfg_path.open("r", encoding="utf-8") as f_cfg:
            cfg = yaml.safe_load(f_cfg) or {}

        # Which AD versions to generate.
        ver_cfg = cfg.get("versions", {}) or {}
        salamandra_enabled = bool(ver_cfg.get("Salamandra_enabled", True))
        moe_enabled = bool(ver_cfg.get("MoE_enabled", True))

        # Which refinement strategies to apply afterwards.
        ref_cfg = cfg.get("refinement", {}) or {}
        reflection_enabled = bool(ref_cfg.get("reflection_enabled", True))
        reflexion_enabled = bool(ref_cfg.get("reflexion_enabled", False))
        introspection_enabled = bool(ref_cfg.get("introspection_enabled", False))

        # Notification channels.
        auto_cfg = cfg.get("automation", {}) or {}
        twilio_enabled_cfg = bool(auto_cfg.get("twilio_enabled", False))
        zapier_enabled_cfg = bool(auto_cfg.get("zapier_enabled", False))

        # UNE validator SMS settings.
        val_cfg = cfg.get("validation", {}) or {}
        une_validator_sms_enabled = bool(val_cfg.get("une_validator_sms_enabled", False))
        une_phone_validator = str(val_cfg.get("une_phone_validator") or "").strip()
except Exception as e_cfg:
    # A broken config file falls back to the defaults above.
    _log(f"[config] Error llegint config.yaml: {e_cfg}")
|
|
|
|
|
|
|
|
# Request context used when audit-logging the generated ADs.
session_id = st.session_state.get("session_id", "")
ip = st.session_state.get("client_ip", "")
username = (
    (st.session_state.get("user") or {}).get("username")
    if st.session_state.get("user")
    else ""
)
password = st.session_state.get("last_password", "")
# Prefer the SMS-verified phone; fall back to the unverified one.
phone = (
    st.session_state.get("sms_phone_verified")
    or st.session_state.get("sms_phone")
    or ""
)
vis_choice = st.session_state.get("video_visibility", "Privat")
# Catalan "Públic"/"Privat" UI choice -> internal flag.
vis_flag = "public" if vis_choice.strip().lower().startswith("púb") else "private"

# Outcome trackers for the generation/refinement sections below.
any_success = False
refined_any = False
|
|
|
|
|
|
|
|
# ---------------- Salamandra version ----------------
if salamandra_enabled:
    progress_placeholder.info("🐍 Generant versió Salamandra...")
    # A raised exception is folded into an error dict so both failure
    # modes are handled by the single check below.
    try:
        gen_resp = api.generate_salamandra_result(sha1)
        _log(f"[Salamandra] generate_salamandra_result resp: {gen_resp}")
    except Exception as e_gen_s:
        _log(f"[Salamandra] Error cridant generate_salamandra_result: {e_gen_s}")
        gen_resp = {"error": str(e_gen_s)}

    if isinstance(gen_resp, dict) and gen_resp.get("error"):
        # NOTE(review): on this path salamandra_srt is never bound; the
        # later refinement section references it and relies on its outer
        # try/except to absorb the resulting NameError — confirm intended.
        _log(f"[Salamandra] Error en generació: {gen_resp.get('error')}")
    else:
        salamandra_srt = ""
        salamandra_free = ""
        # Download the UNE-style SRT and cache it under Salamandra/.
        try:
            srt_s = api.download_salamandra_srt(sha1)
            if isinstance(srt_s, dict) and not srt_s.get("error"):
                salamandra_srt = srt_s.get("text", "") or ""
                sal_dir = base_media_dir / "Salamandra"
                sal_dir.mkdir(parents=True, exist_ok=True)
                sal_srt_path = sal_dir / "result.srt"
                with sal_srt_path.open("w", encoding="utf-8") as f_ss:
                    f_ss.write(salamandra_srt)
                _log(f"[Salamandra] result.srt desat a {sal_srt_path}")
            else:
                _log(f"[Salamandra] Error descarregant SRT: {srt_s}")
        except Exception as e_ds:
            _log(f"[Salamandra] Excepció descarregant SRT: {e_ds}")

        # Download the free-narration text and cache it alongside.
        try:
            free_s = api.download_salamandra_free_narration(sha1)
            if isinstance(free_s, dict) and not free_s.get("error"):
                salamandra_free = free_s.get("text", "") or ""
                sal_dir = base_media_dir / "Salamandra"
                sal_dir.mkdir(parents=True, exist_ok=True)
                sal_free_path = sal_dir / "free_narration.txt"
                with sal_free_path.open("w", encoding="utf-8") as f_sf:
                    f_sf.write(salamandra_free)
                _log(f"[Salamandra] free_narration.txt desat a {sal_free_path}")
            else:
                _log(f"[Salamandra] Error descarregant free_narration: {free_s}")
        except Exception as e_df:
            _log(f"[Salamandra] Excepció descarregant free_narration: {e_df}")

        # Persist both AD texts (UNE SRT + free narration) in the DB.
        try:
            upsert_audiodescription_text(
                sha1sum=sha1,
                version="Salamandra",
                une_ad=salamandra_srt or "",
                free_ad=salamandra_free or "",
            )
            any_success = True
        except Exception as db_exc:
            _log(f"[audiodescriptions] Error desant AD Salamandra: {db_exc}")

        # Audit-log each generated artifact. NOTE(review): log_event is not
        # among this file's visible imports — presumably defined elsewhere
        # in the module; verify.
        try:
            if salamandra_srt:
                log_event(
                    session=session_id,
                    ip=ip,
                    user=username or "",
                    password=password or "",
                    phone=phone,
                    action="Salamandra AD generated",
                    sha1sum=sha1,
                    visibility=vis_flag,
                )
            if salamandra_free:
                log_event(
                    session=session_id,
                    ip=ip,
                    user=username or "",
                    password=password or "",
                    phone=phone,
                    action="Salamandra free AD generated",
                    sha1sum=sha1,
                    visibility=vis_flag,
                )
        except Exception as e_evt_s:
            _log(f"[events] Error registrant esdeveniments Salamandra: {e_evt_s}")
|
|
|
|
|
|
|
|
# ---------------- MoE version (mirrors the Salamandra section) ----------------
if moe_enabled:
    progress_placeholder.info("🧠 Generant versió MoE...")
    try:
        gen_resp_m = api.generate_moe_result(sha1)
        _log(f"[MoE] generate_moe_result resp: {gen_resp_m}")
    except Exception as e_gen_m:
        _log(f"[MoE] Error cridant generate_moe_result: {e_gen_m}")
        gen_resp_m = {"error": str(e_gen_m)}

    if isinstance(gen_resp_m, dict) and gen_resp_m.get("error"):
        # NOTE(review): moe_srt is never bound on this path; the refinement
        # section references it (NameError absorbed by its outer try) —
        # confirm intended.
        _log(f"[MoE] Error en generació: {gen_resp_m.get('error')}")
    else:
        moe_srt = ""
        moe_free = ""
        # Download the UNE-style SRT and cache it under MoE/.
        try:
            srt_m = api.download_moe_srt(sha1)
            if isinstance(srt_m, dict) and not srt_m.get("error"):
                moe_srt = srt_m.get("text", "") or ""
                moe_dir = base_media_dir / "MoE"
                moe_dir.mkdir(parents=True, exist_ok=True)
                moe_srt_path = moe_dir / "result.srt"
                with moe_srt_path.open("w", encoding="utf-8") as f_ms:
                    f_ms.write(moe_srt)
                _log(f"[MoE] result.srt desat a {moe_srt_path}")
            else:
                _log(f"[MoE] Error descarregant SRT: {srt_m}")
        except Exception as e_dm_s:
            _log(f"[MoE] Excepció descarregant SRT: {e_dm_s}")

        # Download the free-narration text and cache it alongside.
        try:
            free_m = api.download_moe_free_narration(sha1)
            if isinstance(free_m, dict) and not free_m.get("error"):
                moe_free = free_m.get("text", "") or ""
                moe_dir = base_media_dir / "MoE"
                moe_dir.mkdir(parents=True, exist_ok=True)
                moe_free_path = moe_dir / "free_narration.txt"
                with moe_free_path.open("w", encoding="utf-8") as f_mf:
                    f_mf.write(moe_free)
                _log(f"[MoE] free_narration.txt desat a {moe_free_path}")
            else:
                _log(f"[MoE] Error descarregant free_narration: {free_m}")
        except Exception as e_dm_f:
            _log(f"[MoE] Excepció descarregant free_narration: {e_dm_f}")

        # Persist both AD texts in the DB.
        try:
            upsert_audiodescription_text(
                sha1sum=sha1,
                version="MoE",
                une_ad=moe_srt or "",
                free_ad=moe_free or "",
            )
            any_success = True
        except Exception as db_exc_m:
            _log(f"[audiodescriptions] Error desant AD MoE: {db_exc_m}")

        # Audit-log each generated artifact (see NOTE on log_event above
        # in the Salamandra section: defined elsewhere in this module).
        try:
            if moe_srt:
                log_event(
                    session=session_id,
                    ip=ip,
                    user=username or "",
                    password=password or "",
                    phone=phone,
                    action="MoE AD generated",
                    sha1sum=sha1,
                    visibility=vis_flag,
                )
            if moe_free:
                log_event(
                    session=session_id,
                    ip=ip,
                    user=username or "",
                    password=password or "",
                    phone=phone,
                    action="MoE free AD generated",
                    sha1sum=sha1,
                    visibility=vis_flag,
                )
        except Exception as e_evt_m:
            _log(f"[events] Error registrant esdeveniments MoE: {e_evt_m}")
|
|
|
|
|
|
|
|
# ---------------- Optional refinement pass ----------------
# Any failure here (including NameErrors from unbound salamandra_srt /
# moe_srt when the corresponding generation errored out — see the
# NOTE(review) markers in those sections) is swallowed by this outer
# try so refinement is strictly best-effort.
try:
    refinement_active = bool(reflection_enabled or reflexion_enabled or introspection_enabled)
    if refinement_active:

        # Attach the initial info.json to both versions (best effort;
        # update_audiodescription_info_ad is presumably defined elsewhere
        # in this module — verify, it is not among the visible imports).
        if init_info_text and sha1:
            try:
                update_audiodescription_info_ad(
                    sha1sum=sha1,
                    version="Salamandra",
                    info_ad=init_info_text,
                )
            except Exception:
                pass
            try:
                update_audiodescription_info_ad(
                    sha1sum=sha1,
                    version="MoE",
                    info_ad=init_info_text,
                )
            except Exception:
                pass

        # Refine the Salamandra SRT and, on success, overwrite the stored AD.
        if salamandra_enabled and salamandra_srt:
            try:
                ref_resp_s = api.apply_refinement(
                    sha1sum=sha1,
                    version="Salamandra",
                    srt_content=salamandra_srt,
                    reflection_enabled=reflection_enabled,
                    reflexion_enabled=reflexion_enabled,
                    introspection_enabled=introspection_enabled,
                )
                _log(f"[Refinement] Salamandra resp: {ref_resp_s}")
                refined_srt = None
                if isinstance(ref_resp_s, dict):
                    # Accept both snake_case and camelCase response keys.
                    refined_srt = ref_resp_s.get("refined_srt") or ref_resp_s.get("refinedSrt")
                if refined_srt:
                    update_audiodescription_text(
                        sha1sum=sha1,
                        version="Salamandra",
                        une_ad=refined_srt,
                    )
                    refined_any = True
                    # Audit-log using the sha1 of the refined text itself.
                    try:
                        import hashlib as _hashlib
                        srt_hash = _hashlib.sha1(refined_srt.encode("utf-8")).hexdigest()
                        log_event(
                            session=session_id,
                            ip=ip,
                            user=username or "",
                            password=password or "",
                            phone=phone,
                            action="Refined AD",
                            sha1sum=srt_hash,
                            visibility=vis_flag,
                        )
                    except Exception as e_evt_ref_s:
                        _log(f"[events] Error registrant Refined AD (Salamandra): {e_evt_ref_s}")
            except Exception as e_ref_s:
                _log(f"[Refinement] Error refinant Salamandra: {e_ref_s}")

        # Same refinement flow for the MoE SRT.
        if moe_enabled and moe_srt:
            try:
                ref_resp_m = api.apply_refinement(
                    sha1sum=sha1,
                    version="MoE",
                    srt_content=moe_srt,
                    reflection_enabled=reflection_enabled,
                    reflexion_enabled=reflexion_enabled,
                    introspection_enabled=introspection_enabled,
                )
                _log(f"[Refinement] MoE resp: {ref_resp_m}")
                refined_srt_m = None
                if isinstance(ref_resp_m, dict):
                    refined_srt_m = ref_resp_m.get("refined_srt") or ref_resp_m.get("refinedSrt")
                if refined_srt_m:
                    update_audiodescription_text(
                        sha1sum=sha1,
                        version="MoE",
                        une_ad=refined_srt_m,
                    )
                    refined_any = True
                    try:
                        import hashlib as _hashlib
                        srt_hash_m = _hashlib.sha1(refined_srt_m.encode("utf-8")).hexdigest()
                        log_event(
                            session=session_id,
                            ip=ip,
                            user=username or "",
                            password=password or "",
                            phone=phone,
                            action="Refined AD",
                            sha1sum=srt_hash_m,
                            visibility=vis_flag,
                        )
                    except Exception as e_evt_ref_m:
                        _log(f"[events] Error registrant Refined AD (MoE): {e_evt_ref_m}")
            except Exception as e_ref_m:
                _log(f"[Refinement] Error refinant MoE: {e_ref_m}")
except Exception as e_ref:
    _log(f"[Refinement] Error global de refinement: {e_ref}")
|
|
|
|
|
|
|
|
# Notify the UNE-153020 validator by SMS once at least one AD was generated
# AND refined, then record that the video is waiting for UNE validation.
# Both the SMS call and the event log are best-effort.
try:
    if any_success and refined_any and sha1:
        # SMS can go out through either configured channel (Twilio or Zapier).
        sms_channels_enabled = bool(twilio_enabled_cfg or zapier_enabled_cfg)
        if sms_channels_enabled and une_validator_sms_enabled and une_phone_validator:
            try:
                sms_msg = "Noves audiodescripcions a validar segons la norma UNE-153020"
                compliance_client.notify_une_validator_new_ads(
                    phone=une_phone_validator,
                    message=sms_msg,
                )
            except Exception as e_sms_call:
                _log(f"[UNE SMS] Error cridant compliance per UNE: {e_sms_call}")

            # NOTE(review): indentation was lost in the recovered source; this
            # event is placed inside the validator-phone branch because it logs
            # une_phone_validator — confirm nesting against the backup.
            try:
                log_event(
                    session=session_id,
                    ip=ip,
                    user=username or "",
                    password=password or "",
                    phone=une_phone_validator,
                    action="Waiting for UNE validation",
                    sha1sum=sha1,
                    visibility=vis_flag,
                )
            except Exception as e_evt_wait:
                _log(f"[events] Error registrant Waiting for UNE validation: {e_evt_wait}")
except Exception as e_sms:
    _log(f"[UNE SMS] Error en flux d'SMS/espera validació: {e_sms}")
|
|
|
|
|
|
|
|
# Mark the video as pending UNE validation in the videos table. Best-effort:
# a failed status update is logged but does not abort the flow.
try:
    if any_success and sha1:
        update_video_status(sha1, "UNE-pending")
except Exception as e_upd_status:
    _log(f"[videos] Error actualitzant status a 'UNE-pending': {e_upd_status}")
|
|
|
|
|
|
|
|
# Generate downstream TTS assets (free_ad.mp3 and une_ad.mp4) from the stored
# UNE AD text via the external TTS service. Entirely best-effort: any failure
# is logged and the main flow continues.
try:
    if any_success and sha1:
        # Pick the UNE AD text: prefer the Salamandra version, fall back to
        # MoE if Salamandra has no usable text.
        une_text = ""
        row_s = get_audiodescription(sha1, "Salamandra")
        if row_s is not None:
            try:
                # Row access may raise (e.g. missing column on a sqlite3.Row);
                # treat that as "no text".
                une_text = (row_s["une_ad"] or "").strip()
            except Exception:
                une_text = ""
        if not une_text:
            row_m = get_audiodescription(sha1, "MoE")
            if row_m is not None:
                try:
                    une_text = (row_m["une_ad"] or "").strip()
                except Exception:
                    une_text = ""

        if une_text:
            # Media layout: <repo>/temp/media/<sha1>/video.mp4 with generated
            # assets under .../Original/.
            base_media_dir = Path(__file__).parent.parent / "temp" / "media" / sha1
            video_path = base_media_dir / "video.mp4"
            if not video_path.exists():
                # Try to (re)materialize the media locally before giving up.
                try:
                    ensure_media_for_video(Path(__file__).parent.parent, api, sha1)
                except Exception as e_em:
                    _log(f"[TTS] Error assegurant media per al vídeo: {e_em}")

            if video_path.exists():
                original_dir = base_media_dir / "Original"
                original_dir.mkdir(parents=True, exist_ok=True)

                # TTS endpoint comes from the environment; skip with a log
                # message when unset.
                tts_url = os.getenv("API_TTS_URL", "").strip()
                if tts_url:
                    try:
                        with tempfile.TemporaryDirectory(prefix="tts_srt_") as td:
                            td_path = Path(td)
                            srt_tmp = td_path / "ad_input.srt"
                            srt_tmp.write_text(une_text, encoding="utf-8")

                            # NOTE(review): these file handles are handed to
                            # requests but never explicitly closed — consider
                            # a `with` block; confirm before changing.
                            files = {
                                "srt": ("ad_input.srt", srt_tmp.open("rb"), "text/plain"),
                                "video": ("video.mp4", video_path.open("rb"), "video/mp4"),
                            }
                            data = {
                                "voice": "central/grau",
                                "ad_format": "mp3",
                                "include_final_mp4": "1",
                            }

                            # Long timeout: TTS + final MP4 muxing can be slow.
                            resp = requests.post(
                                f"{tts_url.rstrip('/')}/tts/srt",
                                files=files,
                                data=data,
                                timeout=300,
                            )
                            resp.raise_for_status()

                            # Response is a ZIP; extract only the two members
                            # we care about, streamed to their final names.
                            zip_bytes = resp.content
                            with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
                                for member in zf.infolist():
                                    name = member.filename
                                    lower = name.lower()
                                    if lower.endswith("ad_master.mp3"):
                                        target = original_dir / "free_ad.mp3"
                                        with zf.open(member) as src, target.open("wb") as dst:
                                            shutil.copyfileobj(src, dst)
                                    elif lower.endswith("video_con_ad.mp4"):
                                        target = original_dir / "une_ad.mp4"
                                        with zf.open(member) as src, target.open("wb") as dst:
                                            shutil.copyfileobj(src, dst)
                    except Exception as e_tts:
                        _log(f"[TTS] Error generant assets TTS (free_ad.mp3/une_ad.mp4): {e_tts}")
                else:
                    _log("[TTS] API_TTS_URL no configurada; s'omet la generació de free_ad.mp3/une_ad.mp4")
        else:
            _log("[TTS] No s'ha trobat text UNE per al vídeo; s'omet la generació TTS")
except Exception as e_tts_global:
    _log(f"[TTS] Error global al flux TTS: {e_tts_global}")
|
|
|
|
|
|
|
|
# Record the "AD generated" action in the actions table, resolving the most
# recent user/phone known for this session and falling back to the current
# request's credentials. Best-effort: failures are logged only.
try:
    if any_success and sha1:
        session_id_actions = session_id
        actions_user, actions_phone = get_latest_user_phone_for_session(session_id_actions)
        if not actions_user:
            actions_user = username or ""
        if not actions_phone:
            actions_phone = phone or ""

        insert_action(
            session=session_id_actions,
            user=actions_user,
            phone=actions_phone,
            action="AD generated",
            sha1sum=sha1,
        )
except Exception as e_act:
    _log(f"[actions] Error registrant acció 'AD generated': {e_act}")
|
|
|
|
|
# Final user-facing feedback via the Streamlit placeholders: success banner
# plus guidance when at least one AD version was generated, otherwise clear
# the progress indicator and show an error.
if any_success:
    progress_placeholder.success("✅ Audiodescripció generada i desada. Ara està pendent de validació UNE.")
    result_placeholder.info("La teva audiodescripció s'està generant i queda pendent de validació. Pots sortir de la sessió guardant els canvis i tornar més endavant per revisar el resultat.")
else:
    progress_placeholder.empty()
    result_placeholder.error("❌ No s'ha pogut generar cap versió d'audiodescripció.")
|
|
|
|
|
except Exception as e: |
|
|
progress_placeholder.empty() |
|
|
result_placeholder.error(f"❌ Excepció durant la generació: {e}") |
|
|
|