|
|
"""UI logic for the "Validació" page.""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
from typing import Dict |
|
|
import sys |
|
|
|
|
|
import shutil |
|
|
import os |
|
|
import tempfile |
|
|
import zipfile |
|
|
import io |
|
|
import requests |
|
|
import streamlit as st |
|
|
|
|
|
from databases import ( |
|
|
get_accessible_videos_with_sha1, |
|
|
get_audiodescription, |
|
|
get_audiodescription_history, |
|
|
update_audiodescription_text, |
|
|
update_audiodescription_info_ad, |
|
|
log_action, |
|
|
update_video_status, |
|
|
get_video_owner_by_sha1, |
|
|
get_videos_by_status, |
|
|
insert_action, |
|
|
) |
|
|
from persistent_data_gate import _load_data_origin |
|
|
|
|
|
|
|
|
def _log(msg: str) -> None:
    """Write ``msg`` to stderr prefixed with a local ``[YYYY-MM-DD HH:MM:SS]`` timestamp.

    The stream is flushed after every message so the line shows up immediately.
    """
    line = f"[{datetime.now():%Y-%m-%d %H:%M:%S}] {msg}\n"
    stream = sys.stderr
    stream.write(line)
    stream.flush()
|
|
|
|
|
|
|
|
# File names probed, in priority order, when looking for the video stored in a folder.
_VIDEO_CANDIDATE_NAMES = ("video.mp4", "video.avi", "video.mov")


def _load_user_sms_enabled(config_path: Path) -> bool:
    """Return the ``validation.user_sms_enabled`` flag read from ``config_path``.

    A missing file, a missing key or any error while reading/parsing the YAML
    yields ``False``; errors are logged instead of being silently discarded.
    """
    try:
        if not config_path.exists():
            return False
        import yaml  # local import: only needed when a config file is present

        with config_path.open("r", encoding="utf-8") as f:
            cfg = yaml.safe_load(f) or {}
        validation_cfg = cfg.get("validation", {}) or {}
        return bool(validation_cfg.get("user_sms_enabled", False))
    except Exception as e_cfg:
        _log(f"[validation] Error llegint config.yaml: {e_cfg}")
        return False


def _extract_pending_items(resp) -> list:
    """Normalise the ``list_pending_videos`` response into a plain list.

    Accepts either a bare list, or a dict without a truthy ``"error"`` entry
    whose ``"videos"``, ``"items"`` or ``"results"`` key (checked in that
    order) holds a list. Anything else yields an empty list.
    """
    if isinstance(resp, list):
        return resp
    if isinstance(resp, dict) and not resp.get("error"):
        for key in ("videos", "items", "results"):
            value = resp.get(key)
            if isinstance(value, list):
                return value
    return []


def _find_media_video(video_dir: Path):
    """Return the first existing ``video.mp4``/``video.avi``/``video.mov`` in ``video_dir``.

    Returns ``None`` when the directory does not exist or holds none of them.
    """
    if not video_dir.exists():
        return None
    for name in _VIDEO_CANDIDATE_NAMES:
        cand = video_dir / name
        if cand.exists():
            return cand
    return None


def _scan_local_pending(pending_root: Path) -> list:
    """Build the pending-video entries from the sub-folders of ``pending_root``.

    Each sub-folder containing at least one ``*.mp4``/``*.avi``/``*.mov`` file
    becomes one entry; the folder name is used both as sha1 and display name,
    and the folder's modification time as its date.
    """
    entries = []
    if not (pending_root.exists() and pending_root.is_dir()):
        return entries
    for folder in sorted(pending_root.iterdir()):
        if not folder.is_dir():
            continue
        video_files = [
            path
            for pattern in ("*.mp4", "*.avi", "*.mov")
            for path in folder.glob(pattern)
        ]
        if not video_files:
            continue
        fecha = datetime.fromtimestamp(folder.stat().st_mtime).strftime("%Y-%m-%d %H:%M")
        entries.append(
            {
                "sha1sum": folder.name,
                "video_name": folder.name,
                "path": str(folder),
                "created_at": fecha,
                "video_files": video_files,
            }
        )
    return entries


def _fetch_remote_pending(pending_root: Path) -> list:
    """Build the pending-video entries from ``api_client.list_pending_videos()``.

    The client is taken from ``st.session_state["api_client"]``; without one
    the result is empty. Items that are not dicts or carry no identifier
    (``sha1``, ``video_hash`` or ``id``) are skipped.
    """
    api_client = st.session_state.get("api_client")
    if api_client is None:
        return []

    try:
        resp = api_client.list_pending_videos()
        _log(f"[pending_videos] list_pending_videos raw resp type= {type(resp)}")
        _log(f"[pending_videos] list_pending_videos raw resp content= {repr(resp)}")
    except Exception as e_list:
        _log(f"[pending_videos] Error cridant list_pending_videos: {e_list}")
        resp = {"error": "exception"}

    pending_list = _extract_pending_items(resp)
    _log(f"[pending_videos] parsed pending_list length= {len(pending_list)}")
    if pending_list:
        _log(f"[pending_videos] first items: {pending_list[:3]}")

    entries = []
    for item in pending_list:
        if not isinstance(item, dict):
            # Malformed entry: skip it instead of crashing the whole page.
            continue
        sha1 = item.get("sha1") or item.get("video_hash") or item.get("id")
        if not sha1:
            continue
        sha1 = str(sha1)  # "id" may be numeric; it is used as a path component below
        folder = pending_root / sha1
        video_files = list(folder.glob("*.mp4")) if folder.exists() else []
        created_at = item.get("created_at") or datetime.utcnow().strftime("%Y-%m-%d %H:%M")
        entries.append(
            {
                "sha1sum": sha1,
                "video_name": item.get("latest_video") or sha1,
                "path": str(folder),
                "created_at": created_at,
                "video_files": video_files,
            }
        )
    return entries


def _ensure_local_video(video: dict, pending_root: Path) -> None:
    """Download a pending video into ``pending_root/<sha1>/video.mp4``.

    Uses ``api_client.download_pending_video`` and, when the response dict
    carries ``video_bytes``, writes them to disk and updates
    ``video["video_files"]`` in place. Does nothing without an API client or
    without bytes in the response.
    """
    api_client = st.session_state.get("api_client")
    if api_client is None:
        return
    try:
        resp = api_client.download_pending_video(video["sha1sum"])
    except Exception as e_dl:
        _log(f"[pending_videos] Error descarregant el vídeo pendent: {e_dl}")
        resp = {"error": "exception"}

    video_bytes = resp.get("video_bytes") if isinstance(resp, dict) else None
    if not video_bytes:
        return

    local_folder = pending_root / video["sha1sum"]
    local_folder.mkdir(parents=True, exist_ok=True)
    local_path = local_folder / "video.mp4"
    with local_path.open("wb") as f:
        f.write(video_bytes)
    video["video_files"] = [local_path]


def _notify_video_owner(compliance_client, sha1: str, message: str, log_tag: str) -> None:
    """Best-effort SMS to the owner of ``sha1`` via the compliance client.

    The phone is looked up with ``get_video_owner_by_sha1``; lookup or send
    failures are logged under ``[log_tag]`` and never propagate.
    """
    try:
        owner_phone = get_video_owner_by_sha1(sha1)
    except Exception as e_owner:
        _log(f"[{log_tag}] Error obtenint el telèfon del propietari: {e_owner}")
        owner_phone = ""

    if not owner_phone:
        return

    try:
        compliance_client.notify_user_video_approved(
            phone=owner_phone,
            message=message,
            sha1sum=sha1,
        )
    except Exception as e_sms:
        _log(f"[{log_tag}] Error enviant SMS a l'usuari: {e_sms}")


def _accept_pending_video(
    compliance_client,
    video: dict,
    username: str,
    pending_root: Path,
    base_media_dir: Path,
    user_sms_enabled: bool,
) -> None:
    """Handle the "accept" decision for a pending uploaded video.

    Records the decision in the compliance service, logs the ``input-OK``
    action, moves the video from the pending folder to the media folder,
    sets the video status to ``input-OK`` and optionally notifies the owner.
    Every step after the compliance call is best-effort: failures are logged.
    """
    sha1 = video["sha1sum"]

    success = compliance_client.record_validator_decision(
        document_id=f"video_{video['video_name']}",
        validator_email=f"{username}@veureu.local",
        decision="acceptat",
        comments=f"Vídeo validat per {username}",
    )

    try:
        log_action(
            session=st.session_state.get("session_id") or "",
            user=username or "",
            phone=st.session_state.get("phone_number") or "",
            action="input-OK",
            sha1sum=sha1 or "",
        )
    except Exception as e_log:
        _log(f"[VIDEO validation] Error registrant acció input-OK: {e_log}")

    if success:
        st.success("✅ Vídeo acceptat, registrat al servei de compliance i marcat com input-OK")
    else:
        st.error("❌ Error registrant el veredicte al servei de compliance")

    # Move the video from the pending area to the media area.
    local_pending_dir = pending_root / sha1
    local_media_dir = base_media_dir / sha1
    try:
        local_media_dir.mkdir(parents=True, exist_ok=True)
        copied = False
        # Copy every name the audio-description tab later looks for, not just
        # video.mp4, so .avi/.mov uploads are not lost when the folder is removed.
        for name in _VIDEO_CANDIDATE_NAMES:
            src = local_pending_dir / name
            if src.exists():
                shutil.copy2(src, local_media_dir / name)
                copied = True
        if local_pending_dir.exists():
            if not copied:
                _log(f"[VIDEO validation] Cap fitxer de vídeo copiat a media per a {sha1}")
            shutil.rmtree(local_pending_dir)
    except Exception as e_move:
        _log(f"[VIDEO validation] Error movent el vídeo a media: {e_move}")

    try:
        update_video_status(sha1, "input-OK")
    except Exception as e_stat:
        _log(f"[VIDEO validation] Error actualitzant status a input-OK: {e_stat}")

    if user_sms_enabled:
        _notify_video_owner(
            compliance_client,
            sha1,
            "Su vídeo ha sido aprobado. Puede entrar en la aplicación y subirlo de nuevo para generar la audiodescripción",
            "VIDEO USER SMS",
        )


def _render_videos_tab(
    compliance_client,
    username: str,
    data_origin: str,
    pending_root: Path,
    base_media_dir: Path,
    user_sms_enabled: bool,
) -> None:
    """Render the "validate uploaded videos" tab (list, preview, accept/reject)."""
    st.subheader("📹 Validar Vídeos Pujats")

    col_refresh_list, _ = st.columns([1, 3])
    with col_refresh_list:
        if st.button("🔄 Actualitzar llista de vídeos pendents", key="refresh_pending_videos_list"):
            st.rerun()

    if data_origin == "internal":
        video_folders = _scan_local_pending(pending_root)
    else:
        video_folders = _fetch_remote_pending(pending_root)

    if not video_folders:
        st.info("📝 No hi ha vídeos pujats pendents de validació.")
        return

    opciones_video = [f"{video['video_name']} - {video['created_at']}" for video in video_folders]
    # Select by position rather than by label: two videos can share the same
    # label, and a label lookup would then always resolve to the first one.
    indice = st.selectbox(
        "Selecciona un vídeo per validar:",
        list(range(len(opciones_video))),
        index=0,
        format_func=lambda i: opciones_video[i],
    )
    if indice is None:
        return
    video_seleccionat = video_folders[indice]

    col1, col2 = st.columns([2, 1])

    with col1:
        st.markdown("### 📹 Informació del Vídeo")
        st.markdown(f"**Nom:** {video_seleccionat['video_name']}")
        st.markdown(f"**Data:** {video_seleccionat['created_at']}")
        st.markdown(f"**Arxius:** {len(video_seleccionat['video_files'])} vídeos trobats")

        if data_origin == "external" and not video_seleccionat["video_files"]:
            _ensure_local_video(video_seleccionat, pending_root)

        if video_seleccionat["video_files"]:
            video_path = str(video_seleccionat["video_files"][0])
            st.markdown("**Vídeo principal:**")
            st.video(video_path)
        else:
            st.warning("⚠️ No s'han trobat arxius de vídeo.")

    with col2:
        st.markdown("### 🔍 Accions de Validació")

        col_btn1, col_btn2 = st.columns(2)

        with col_btn1:
            if st.button("✅ Acceptar", type="primary", key=f"accept_video_{video_seleccionat['sha1sum']}"):
                _accept_pending_video(
                    compliance_client,
                    video_seleccionat,
                    username,
                    pending_root,
                    base_media_dir,
                    user_sms_enabled,
                )

        with col_btn2:
            # Keyed by sha1sum, like the accept button, so keys stay unique per video.
            if st.button("❌ Rebutjar", type="secondary", key=f"reject_video_{video_seleccionat['sha1sum']}"):
                success = compliance_client.record_validator_decision(
                    document_id=f"video_{video_seleccionat['video_name']}",
                    validator_email=f"{username}@veureu.local",
                    decision="rebutjat",
                    comments=f"Vídeo rebutjat per {username}",
                )
                if success:
                    st.success("✅ Vídeo rebutjat i registrat al servei de compliance")
                else:
                    st.error("❌ Error registrant el veredicte")


def _generate_hitl_ok_assets(srt_text: str, video_dir: Path) -> None:
    """Ask the TTS service for the validated audio/video assets (best-effort).

    Posts the SRT text and the media video to ``$API_TTS_URL/tts/srt`` and
    extracts, from the returned ZIP, ``*ad_master.mp3`` as ``free_ad.mp3`` and
    ``*video_con_ad.mp4`` as ``une_ad.mp4`` into ``video_dir/"HITL OK"``.
    Skipped when the SRT is blank, no media video exists or ``API_TTS_URL``
    is unset. Any error is logged and swallowed.
    """
    try:
        if not srt_text.strip():
            return
        video_path = _find_media_video(video_dir)
        if video_path is None:
            return

        hitl_ok_dir = video_dir / "HITL OK"
        hitl_ok_dir.mkdir(parents=True, exist_ok=True)

        tts_url = os.getenv("API_TTS_URL", "").strip()
        if not tts_url:
            _log("[UNE TTS] API_TTS_URL no configurada; s'omet la generació de free_ad.mp3/une_ad.mp4 (HITL OK)")
            return

        with tempfile.TemporaryDirectory(prefix="tts_hitl_ok_") as td:
            srt_tmp = Path(td) / "ad_ok.srt"
            srt_tmp.write_text(srt_text, encoding="utf-8")

            data = {
                "voice": "central/grau",
                "ad_format": "mp3",
                "include_final_mp4": "1",
            }
            # Open both uploads in a ``with`` so the handles are closed before
            # the temporary directory is removed (they used to be leaked).
            with srt_tmp.open("rb") as srt_fh, video_path.open("rb") as video_fh:
                files = {
                    "srt": ("ad_ok.srt", srt_fh, "text/plain"),
                    "video": (video_path.name, video_fh, "video/mp4"),
                }
                resp = requests.post(
                    f"{tts_url.rstrip('/')}/tts/srt",
                    files=files,
                    data=data,
                    timeout=300,
                )
            resp.raise_for_status()

        # Archive member suffix -> file name written under "HITL OK".
        wanted = (("ad_master.mp3", "free_ad.mp3"), ("video_con_ad.mp4", "une_ad.mp4"))
        with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
            for member in zf.infolist():
                lower = member.filename.lower()
                for suffix, out_name in wanted:
                    if lower.endswith(suffix):
                        target = hitl_ok_dir / out_name
                        with zf.open(member) as src, target.open("wb") as dst:
                            shutil.copyfileobj(src, dst)
                        break
    except Exception as e_tts:
        _log(f"[UNE TTS] Error generant assets HITL OK: {e_tts}")


def _accept_audiodescription(
    compliance_client,
    username: str,
    session_id,
    sha1: str,
    video_name: str,
    selected_version: str,
    new_une: str,
    new_free: str,
    base_media_dir: Path,
    user_sms_enabled: bool,
) -> None:
    """Handle the "accept" decision for an audio description.

    Order of operations: compliance record, persist the edited texts, set the
    status to ``UNE-OK``, record the ``UNE-OK`` action, optional SMS, TTS
    asset generation, audit log, then a success message and a rerun. Only a
    failure to persist the texts (or another unexpected error) aborts the
    flow with an error message; the remaining steps are best-effort.
    """
    try:
        try:
            success = compliance_client.record_validator_decision(
                document_id=f"ad_{video_name}",
                validator_email=f"{username}@veureu.local",
                decision="acceptat",
                comments=f"Audiodescripció UNE validada per {username}",
            )
            if not success:
                st.warning("⚠️ No s'ha pogut registrar el veredicte al servei de compliance.")
        except Exception as e_comp:
            _log(f"[UNE validation] Error amb compliance: {e_comp}")

        update_audiodescription_text(
            sha1sum=sha1,
            version=selected_version,
            ok_une_ad=new_une,
            ok_free_ad=new_free,
        )

        try:
            update_video_status(sha1, "UNE-OK")
        except Exception as e_stat:
            _log(f"[UNE validation] Error actualitzant status a UNE-OK: {e_stat}")

        phone = st.session_state.get("phone_number") or ""
        try:
            insert_action(
                session=session_id or "",
                user=username or "",
                phone=phone,
                action="UNE-OK",
                sha1sum=sha1,
            )
        except Exception as e_act:
            _log(f"[UNE validation] Error registrant acció UNE-OK: {e_act}")

        if user_sms_enabled:
            _notify_video_owner(
                compliance_client,
                sha1,
                (
                    "La seva audiodescripció ha estat validada segons la norma UNE-153020. "
                    "Pots tornar a l'aplicació per revisar-la i descarregar-la."
                ),
                "UNE USER SMS",
            )

        _generate_hitl_ok_assets(new_une, base_media_dir / sha1)

        try:
            log_action(
                session=session_id or "",
                user=username or "",
                phone=phone,
                action="validate_ad_une_153020",
                sha1sum=sha1,
            )
        except Exception as e_log:
            _log(f"[UNE validation] Error registrant acció validate_ad_une_153020: {e_log}")

        # NOTE(review): UI strings mention UNE-153010 while the SMS and the
        # audit action mention UNE-153020 — confirm which standard is intended.
        st.success("✅ Audiodescripció UNE-153010 validada i desada (HITL OK).")
        st.rerun()
    except Exception as e_val:
        st.error(f"❌ Error durant la validació de l'audiodescripció: {e_val}")


def _render_ads_tab(
    compliance_client,
    username: str,
    session_id,
    base_media_dir: Path,
    user_sms_enabled: bool,
) -> None:
    """Render the "validate audio descriptions" tab for ``UNE-pending`` videos."""
    st.subheader("🎬 Validar Audiodescripcions")

    pending_videos = get_videos_by_status("UNE-pending")
    if not pending_videos:
        st.info("📝 No hi ha audiodescripcions pendents de validació UNE.")
        return

    options = [f"{v['video_name']} ({v['sha1sum']})" for v in pending_videos]
    seleccion_ad = st.selectbox(
        "Selecciona un vídeo per validar la seva audiodescripció:",
        options,
        index=0 if options else None,
    )
    if not seleccion_ad:
        return

    sel = pending_videos[options.index(seleccion_ad)]
    sha1 = sel["sha1sum"]
    video_name = sel["video_name"]

    # Query each version's history once and reuse it below; this avoids a
    # second query whose empty result would make ``rows[-1]`` raise IndexError.
    history = {}
    for v_name in ("Salamandra", "MoE"):
        rows = get_audiodescription_history(sha1, v_name)
        if rows:
            history[v_name] = rows
    available_versions = list(history)

    if not available_versions:
        st.error("No s'ha trobat cap audiodescripció per a aquest vídeo a la base de dades.")
        return

    selected_version = st.selectbox(
        "Selecciona la versió a validar:",
        available_versions,
        index=0,
        key=f"ad_version_{sha1}",
    )

    row_ad = history[selected_version][-1]  # last history row is the one edited here
    current_une = row_ad.get("une_ad") or ""
    current_free = row_ad.get("free_ad") or ""

    col1, col2 = st.columns([2, 1])

    with col1:
        st.markdown("### 🎬 Informació de l'Audiodescripció")
        st.markdown(f"**Vídeo:** {video_name}")
        st.markdown(f"**SHA1:** {sha1}")
        st.markdown(f"**Versió:** {selected_version}")

        video_file = _find_media_video(base_media_dir / sha1)
        if video_file is not None:
            st.video(str(video_file))
        else:
            st.warning("⚠️ No s'ha trobat el fitxer de vídeo original.")

        st.markdown("#### 📝 Audiodescripció UNE-153010 (SRT)")
        new_une = st.text_area(
            "Text SRT UNE-153010",
            value=current_une,
            height=220,
            key=f"une_editor_{sha1}_{selected_version}",
        )

        st.markdown("#### 🗒️ Narració lliure")
        new_free = st.text_area(
            "Text narració lliure",
            value=current_free,
            height=220,
            key=f"free_editor_{sha1}_{selected_version}",
        )

    with col2:
        st.markdown("### 🔍 Accions de Validació")

        if st.button("✅ Acceptar", type="primary", key=f"accept_ad_{sha1}_{selected_version}"):
            _accept_audiodescription(
                compliance_client,
                username,
                session_id,
                sha1,
                video_name,
                selected_version,
                new_une,
                new_free,
                base_media_dir,
                user_sms_enabled,
            )


def render_validation_page(
    compliance_client,
    runtime_videos: Path,
    permissions: Dict[str, bool],
    username: str,
) -> None:
    """Render the Streamlit "Validació" page.

    Shows two tabs: one to accept/reject uploaded videos pending validation
    and one to review, edit and accept audio descriptions of videos in
    ``UNE-pending`` status.

    Args:
        compliance_client: Object exposing ``record_validator_decision`` and
            ``notify_user_video_approved``.
        runtime_videos: Not used by this function; kept for interface
            compatibility with callers.
        permissions: Permission flags; the page stops with a warning unless
            ``permissions["validar"]`` is truthy.
        username: Name of the validator, used in compliance records and logs.
    """
    if not permissions.get("validar", False):
        st.warning("⚠️ No tens permisos per accedir a aquesta secció de validació.")
        st.stop()

    st.header("🔍 Validació de Vídeos")

    tab_videos, tab_ads = st.tabs(["📹 Validar Vídeos", "🎬 Validar Audiodescripcions"])

    base_dir = Path(__file__).resolve().parent.parent
    data_origin = _load_data_origin(base_dir)

    # Whether owners are notified by SMS after a validation (config.yaml).
    user_sms_enabled = _load_user_sms_enabled(base_dir / "config.yaml")

    session_id = st.session_state.get("session_id")
    if data_origin == "internal":
        # NOTE(review): the result of this query was never used by the page;
        # the call is kept in case it has side effects — confirm and remove.
        get_accessible_videos_with_sha1(session_id)

    base_media_dir = base_dir / "temp" / "media"
    pending_root = base_dir / "temp" / "pending_videos"

    with tab_videos:
        _render_videos_tab(
            compliance_client,
            username,
            data_origin,
            pending_root,
            base_media_dir,
            user_sms_enabled,
        )

    with tab_ads:
        _render_ads_tab(
            compliance_client,
            username,
            session_id,
            base_media_dir,
            user_sms_enabled,
        )

    st.markdown("---")
    st.markdown("### ℹ️ Informació del Procés de Validació")
    st.markdown(
        """
        - **Tots els veredictes** es registren al servei de compliance per garantir la traçabilitat
        - **Cada validació** inclou veredicte, nom del vídeo i validador responsable
        - **Els registres** compleixen amb la normativa AI Act i GDPR
        """
    )
|
|
|