import os import sqlite3 from contextlib import contextmanager from datetime import datetime from pathlib import Path from statistics import median from typing import Optional, Dict, Any, List, Tuple # Reutilizamos la misma lógica que antes, pero centralizada en este módulo DEFAULT_DB_PATH = None # set by set_db_path at runtime # Flag global per decidir si es fa servir blockchain (AWS QLDB) per als esdeveniments USE_BLOCKCHAIN_FOR_EVENTS = False # Ruta a la base de dades de feedback agregat (separa de users.db) FEEDBACK_DB_PATH = Path(__file__).resolve().parent / "temp" / "db" / "feedback.db" # Ruta a la base de dades de captions per als scores CAPTIONS_DB_PATH = Path(__file__).resolve().parent / "temp" / "db" / "captions.db" # Ruta a la base de dades de vídeos (videos.db) a demo/temp/db VIDEOS_DB_PATH = Path(__file__).resolve().parent / "temp" / "db" / "videos.db" # Ruta a la base de dades d'audiodescripcions (audiodescriptions.db) a demo/temp/db AUDIODESCRIPTIONS_DB_PATH = Path(__file__).resolve().parent / "temp" / "db" / "audiodescriptions.db" # Ruta a la base de dades d'accions (actions.db) a demo/temp/db ACTIONS_DB_PATH = Path(__file__).resolve().parent / "temp" / "db" / "actions.db" # Ruta a les bases de dades de càsting i escenaris a demo/temp/db CASTING_DB_PATH = Path(__file__).resolve().parent / "temp" / "db" / "casting.db" SCENARIOS_DB_PATH = Path(__file__).resolve().parent / "temp" / "db" / "scenarios.db" def set_db_path(db_path: str): global DEFAULT_DB_PATH DEFAULT_DB_PATH = db_path os.makedirs(os.path.dirname(db_path), exist_ok=True) def set_blockchain_enabled(enabled: bool) -> None: """Activa o desactiva l'ús de blockchain per registrar esdeveniments. Quan està desactivat (per defecte), els esdeveniments es registren a demo/temp/events.db. Quan està activat, s'envien a aws_qldb. """ global USE_BLOCKCHAIN_FOR_EVENTS USE_BLOCKCHAIN_FOR_EVENTS = bool(enabled) def get_connection(): if not DEFAULT_DB_PATH: raise ValueError("Database path not set. Call set_db_path(path) first.") return sqlite3.connect(DEFAULT_DB_PATH) @contextmanager def get_conn(db_path: Optional[str] = None): path = db_path or DEFAULT_DB_PATH conn = sqlite3.connect(path, check_same_thread=False) conn.row_factory = sqlite3.Row try: yield conn conn.commit() finally: conn.close() def init_schema(): with get_conn() as conn: c = conn.cursor() # (tus tablas existentes) c.execute( """ CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT, role TEXT NOT NULL, created_at TEXT NOT NULL ); """ ) # Migraciones: asegurar columnas esperadas try: c.execute("PRAGMA table_info(users)") cols = {row[1] for row in c.fetchall()} # set de nombres de columnas if "password_hash" not in cols: c.execute("ALTER TABLE users ADD COLUMN password_hash TEXT") if "role" not in cols: c.execute("ALTER TABLE users ADD COLUMN role TEXT NOT NULL DEFAULT 'verd'") if "created_at" not in cols: c.execute("ALTER TABLE users ADD COLUMN created_at TEXT NOT NULL DEFAULT ''") except sqlite3.OperationalError: pass # Intento de limpieza de columna antigua si existiera (SQLite no permite DROP COLUMN en versiones antiguas) try: c.execute("ALTER TABLE users DROP COLUMN pw_hash;") except sqlite3.OperationalError: pass # (opcional: tus otras tablas) # >>> TABLA PARA FEEDBACK DE AD (no depende de videos) c.execute( """ CREATE TABLE IF NOT EXISTS feedback_ad ( id INTEGER PRIMARY KEY AUTOINCREMENT, video_name TEXT NOT NULL, -- nombre de carpeta dentro de videos/completed user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, transcripcio INTEGER NOT NULL, -- 1..10 identificacio INTEGER NOT NULL, -- 1..10 localitzacions INTEGER NOT NULL, -- 1..10 activitats INTEGER NOT NULL, -- 1..10 narracions INTEGER NOT NULL, -- 1..10 expressivitat INTEGER NOT NULL, -- 1..10 comments TEXT, created_at TEXT NOT NULL ); """ ) # Add column if it doesn't exist, for backwards compatibility try: c.execute( "ALTER TABLE feedback_ad ADD COLUMN expressivitat INTEGER NOT NULL DEFAULT 7;" ) except sqlite3.OperationalError: pass # column already exists def add_feedback_ad( video_name: str, user_id: int, transcripcio: int, identificacio: int, localitzacions: int, activitats: int, narracions: int, expressivitat: int, comments: str | None, ): with get_conn() as conn: conn.execute( """INSERT INTO feedback_ad (video_name, user_id, transcripcio, identificacio, localitzacions, activitats, narracions, expressivitat, comments, created_at) VALUES (?,?,?,?,?,?,?,?,?,?)""", ( video_name, user_id, transcripcio, identificacio, localitzacions, activitats, narracions, expressivitat, comments, now_str(), ), ) def get_feedback_ad_for_video(video_name: str): with get_conn() as conn: cur = conn.execute( """SELECT * FROM feedback_ad WHERE video_name=? ORDER BY created_at DESC""", (video_name,), ) return cur.fetchall() def get_accessible_videos_for_session(session_id: str | None) -> List[str]: """Retorna els noms de vídeo accessibles per a una sessió. Regles: - Sempre inclou vídeos amb visibility='public' a videos.db. - Afegeix vídeos per als quals el camp owner coincideix amb algun phone registrat a events.db per a la mateixa session. Args: session_id: Identificador de sessió (st.session_state.session_id). """ # 1) Vídeos públics public_videos: set[str] = set() with _connect_videos_db() as vconn: try: for row in vconn.execute( "SELECT DISTINCT video_name FROM videos WHERE visibility = 'public'" ): public_videos.add(row["video_name"]) except sqlite3.OperationalError: # Si la taula no existeix encara, no hi ha vídeos return [] if not session_id: return sorted(public_videos) # 2) Telèfons associats a la sessió actual (a partir d'actions.db) phones: set[str] = set() with _connect_actions_db() as aconn: for row in aconn.execute( "SELECT DISTINCT phone FROM actions WHERE session = ? AND phone IS NOT NULL AND phone != ''", (session_id,), ): phones.add(row["phone"]) if not phones: return sorted(public_videos) # 3) Vídeos amb owner associat a algun d'aquests telèfons owner_videos: set[str] = set() with _connect_videos_db() as vconn: q_marks = ",".join("?" for _ in phones) params: Tuple[Any, ...] = tuple(phones) query = ( f"SELECT DISTINCT video_name FROM videos WHERE owner IN ({q_marks})" ) for row in vconn.execute(query, params): owner_videos.add(row["video_name"]) all_videos = public_videos | owner_videos return sorted(all_videos) def get_accessible_videos_with_sha1(session_id: str | None) -> List[Dict[str, Any]]: """Retorna vídeos accessibles amb el seu sha1sum des de demo/temp/videos.db. Aplica les mateixes regles que get_accessible_videos_for_session, però retorna diccionaris amb almenys les claus: video_name i sha1sum. """ # 1) Vídeos públics (segons camp visibility actual de la taula videos) public_rows: Dict[str, Dict[str, Any]] = {} with _connect_videos_db() as vconn: try: for row in vconn.execute( "SELECT video_name, sha1sum FROM videos WHERE visibility = 'public'" ): key = row["video_name"] or row["sha1sum"] public_rows[key] = {"video_name": row["video_name"], "sha1sum": row["sha1sum"]} except sqlite3.OperationalError: # Si la taula no existeix encara, no hi ha vídeos return [] if not session_id: # Ordenem per nom de vídeo per estabilitat return sorted(public_rows.values(), key=lambda r: (r["video_name"] or r["sha1sum"])) # 2) Telèfons associats a la sessió actual (a partir de actions.db) phones: set[str] = set() with _connect_actions_db() as aconn: for row in aconn.execute( "SELECT DISTINCT phone FROM actions WHERE session = ? AND phone IS NOT NULL AND phone != ''", (session_id,), ): phones.add(row["phone"]) if not phones: return sorted(public_rows.values(), key=lambda r: (r["video_name"] or r["sha1sum"])) # 3) Vídeos amb owner associat a algun d'aquests telèfons (independentment de privacy) owner_rows: Dict[str, Dict[str, Any]] = {} with _connect_videos_db() as vconn: q_marks = ",".join("?" for _ in phones) params: Tuple[Any, ...] = tuple(phones) query = f"SELECT video_name, sha1sum FROM videos WHERE owner IN ({q_marks})" for row in vconn.execute(query, params): key = row["video_name"] or row["sha1sum"] owner_rows[key] = {"video_name": row["video_name"], "sha1sum": row["sha1sum"]} all_rows: Dict[str, Dict[str, Any]] = {**public_rows, **owner_rows} return sorted(all_rows.values(), key=lambda r: (r["video_name"] or r["sha1sum"])) def _connect_feedback_db() -> sqlite3.Connection: """Connexió directa a demo/data/feedback.db. És independent de DEFAULT_DB_PATH perquè aquesta BD és específica de feedback agregat importat des de engine. """ FEEDBACK_DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(FEEDBACK_DB_PATH)) conn.row_factory = sqlite3.Row return conn def _connect_actions_db() -> sqlite3.Connection: ACTIONS_DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(ACTIONS_DB_PATH)) conn.row_factory = sqlite3.Row return conn def get_latest_user_phone_for_session(session_id: str) -> Tuple[str, str]: if not session_id: return "", "" try: with _connect_actions_db() as conn: cur = conn.execute( "SELECT user, phone FROM actions " "WHERE session = ? AND (user IS NOT NULL OR phone IS NOT NULL) " "ORDER BY id DESC LIMIT 1", (session_id,), ) row = cur.fetchone() if not row: return "", "" u = row["user"] if row["user"] is not None else "" p = row["phone"] if row["phone"] is not None else "" return str(u), str(p) except sqlite3.OperationalError: return "", "" def insert_action( *, session: str, user: str, phone: str, action: str, sha1sum: str, timestamp: Optional[str] = None, ) -> None: ts = timestamp or datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") try: with _connect_actions_db() as conn: cur = conn.cursor() cur.execute( """ CREATE TABLE IF NOT EXISTS actions ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, action TEXT NOT NULL, session TEXT, user TEXT, phone TEXT, sha1sum TEXT ); """ ) cur.execute( """INSERT INTO actions (timestamp, action, session, user, phone, sha1sum) VALUES (?,?,?,?,?,?)""", (ts, action, session or "", user or "", phone or "", sha1sum or ""), ) except sqlite3.OperationalError: return def get_video_owner_by_sha1(sha1sum: str) -> str: """Retorna el telèfon (owner) associat a un sha1sum a videos.db, o "". Cerca a demo/temp/db/videos.db una fila amb aquest sha1sum i retorna el camp owner si existeix. """ if not sha1sum: return "" try: with _connect_videos_db() as conn: cur = conn.execute( "SELECT owner FROM videos WHERE sha1sum = ? LIMIT 1", (sha1sum,), ) row = cur.fetchone() if not row: return "" owner = row["owner"] if "owner" in row.keys() else None return str(owner or "") except sqlite3.OperationalError: return "" def can_revoke_video_for_session(sha1sum: str, session_id: str) -> bool: if not sha1sum or not session_id: return False visibility_val = "" owner_val = "" try: with _connect_videos_db() as conn: cur = conn.execute( "SELECT visibility, owner FROM videos WHERE sha1sum = ? LIMIT 1", (sha1sum,), ) row = cur.fetchone() if not row: return False visibility_val = str(row["visibility"] or "").strip().lower() owner_val = str(row["owner"] or "").strip() except sqlite3.OperationalError: return False if visibility_val not in {"private", "privat"}: return False phones: set[str] = set() try: with _connect_actions_db() as conn: cur = conn.execute( "SELECT DISTINCT phone FROM actions WHERE session = ? AND phone IS NOT NULL AND phone != ''", (session_id,), ) for row in cur.fetchall(): phones.add(str(row["phone"])) except sqlite3.OperationalError: return False if not phones: return False return owner_val in phones def is_video_input_ok(sha1sum: str) -> bool: """Retorna True si el vídeo té status='input-OK' a videos.db per a aquest sha1sum.""" if not sha1sum: return False try: with _connect_videos_db() as conn: cur = conn.execute( "SELECT status FROM videos WHERE sha1sum = ? LIMIT 1", (sha1sum,), ) row = cur.fetchone() if not row: return False status = row["status"] if "status" in row.keys() else None return str(status or "").strip().lower() == "input-ok" except sqlite3.OperationalError: return False def ensure_video_row_for_upload( *, sha1sum: str, video_name: str, owner_phone: str, status: str = "input-pending", visibility: str | None = None, ) -> None: if not sha1sum: return try: with _connect_videos_db() as conn: cur = conn.cursor() try: cur.execute("PRAGMA table_info(videos)") cols = {row[1] for row in cur.fetchall()} except sqlite3.OperationalError: return alter_stmts = [] if "owner" not in cols: alter_stmts.append("ALTER TABLE videos ADD COLUMN owner TEXT") if "status" not in cols: alter_stmts.append("ALTER TABLE videos ADD COLUMN status TEXT") if "sha1sum" not in cols: alter_stmts.append("ALTER TABLE videos ADD COLUMN sha1sum TEXT") if "visibility" not in cols: alter_stmts.append("ALTER TABLE videos ADD COLUMN visibility TEXT") for stmt in alter_stmts: try: cur.execute(stmt) except sqlite3.OperationalError: continue row = cur.execute( "SELECT id FROM videos WHERE sha1sum = ? LIMIT 1", (sha1sum,), ).fetchone() if row is not None: return vis = visibility or "private" cur.execute( "INSERT INTO videos (video_name, owner, visibility, sha1sum, status) " "VALUES (?,?,?,?,?)", (video_name or sha1sum, owner_phone or "", vis, sha1sum, status), ) except sqlite3.OperationalError: return def update_video_status(sha1sum: str, status: str) -> None: """Actualitza el camp status d'un vídeo existent a videos.db per sha1sum. Si la taula o el registre no existeixen, no fa res. """ if not sha1sum: return try: with _connect_videos_db() as conn: cur = conn.cursor() try: cur.execute("PRAGMA table_info(videos)") cols = {row[1] for row in cur.fetchall()} except sqlite3.OperationalError: return # Assegurar columnes bàsiques alter_stmts: list[str] = [] if "status" not in cols: alter_stmts.append("ALTER TABLE videos ADD COLUMN status TEXT") if "sha1sum" not in cols: alter_stmts.append("ALTER TABLE videos ADD COLUMN sha1sum TEXT") for stmt in alter_stmts: try: cur.execute(stmt) except sqlite3.OperationalError: continue cur.execute( "UPDATE videos SET status = ? WHERE sha1sum = ?", (status, sha1sum), ) except sqlite3.OperationalError: return def get_videos_by_status(status: str) -> List[Dict[str, Any]]: """Retorna llista de vídeos a videos.db amb un status concret. Cada element és un dict amb com a mínim: sha1sum, video_name. Si la taula o les columnes no existeixen, retorna []. """ if not status: return [] try: with _connect_videos_db() as conn: cur = conn.cursor() try: cur.execute("PRAGMA table_info(videos)") cols = {row[1] for row in cur.fetchall()} except sqlite3.OperationalError: return [] if "status" not in cols or "sha1sum" not in cols: return [] # video_name pot no existir en esquemes antics; fem SELECT defensiu has_video_name = "video_name" in cols select_sql = ( "SELECT sha1sum, video_name FROM videos WHERE status = ?" if has_video_name else "SELECT sha1sum, sha1sum AS video_name FROM videos WHERE status = ?" ) results: List[Dict[str, Any]] = [] for row in cur.execute(select_sql, (status,)): sha1 = str(row[0]) if row[0] is not None else "" vname = str(row[1]) if row[1] is not None else sha1 if not sha1: continue results.append({"sha1sum": sha1, "video_name": vname}) return results except sqlite3.OperationalError: return [] def _connect_audiodescriptions_db() -> sqlite3.Connection: """Connexió directa a demo/temp/audiodescriptions.db. Es fa servir per llegir i actualitzar AD (une_ad, free_ad, eval, mètriques, etc.). """ AUDIODESCRIPTIONS_DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(AUDIODESCRIPTIONS_DB_PATH)) conn.row_factory = sqlite3.Row return conn def get_audiodescription(sha1sum: str, version: str) -> Optional[sqlite3.Row]: """Retorna una fila d'audiodescriptions per sha1sum i version, o None. Si la taula no existeix, retorna None silenciosament. """ try: with _connect_audiodescriptions_db() as conn: cur = conn.execute( "SELECT * FROM audiodescriptions WHERE sha1sum=? AND version=?", (sha1sum, version), ) return cur.fetchone() except sqlite3.OperationalError: # Taula o BD encara no creada return None def get_videos_from_audiodescriptions( session_id: str | None = None, role: str | None = None, ) -> List[Dict[str, Any]]: """Retorna vídeos disponibles segons audiodescriptions.db i videos.db. 1) Llegeix demo/temp/db/audiodescriptions.db i obté els DISTINCT sha1sum. 2) Per a cada sha1sum, mira a demo/temp/db/videos.db quin és el video_name. Si no el troba, fa servir el sha1sum com a fallback. """ sha1_list: List[str] = [] try: with _connect_audiodescriptions_db() as conn: cur = conn.cursor() cur.execute("SELECT DISTINCT sha1sum FROM audiodescriptions") rows = cur.fetchall() sha1_list = [str(r["sha1sum"]) for r in rows if r["sha1sum"]] except sqlite3.OperationalError: # Taula o BD encara no creada return [] if not sha1_list: return [] # Determinar telèfon associat a la sessió actual (si aplica) session_phone: str = "" if session_id: try: _, session_phone = get_latest_user_phone_for_session(session_id) except sqlite3.OperationalError: session_phone = "" # Map sha1sum -> video_name utilitzant videos.db, amb filtres de visibility/rol/phone result: List[Dict[str, Any]] = [] try: with _connect_videos_db() as vconn: vcur = vconn.cursor() for sha1 in sha1_list: try: row = vcur.execute( "SELECT video_name, visibility, owner FROM videos WHERE sha1sum = ? LIMIT 1", (sha1,), ).fetchone() if row is None: # Sense registre a videos.db, ignorem aquest sha1 continue vname = str(row["video_name"]) if row["video_name"] else "" visibility_val = str(row["visibility"] or "").strip().lower() owner_val = str(row["owner"] or "").strip() if not vname: # Sense nom humà, ignorem aquest vídeo al desplegable continue except sqlite3.OperationalError: # Esquema antic o taula inexistent: ignorem aquest sha1 continue # Evitar mostrar hashes com a nom: descartar si vname == sha1 if vname == sha1: continue # Regles d'accés segons visibility + rol + telèfon de sessió is_public = visibility_val == "public" is_private = visibility_val == "private" or not visibility_val allowed = False if is_public: allowed = True else: # Vídeos privats if role in {"verd", "blau"}: # Administrador i revisor veuen tots els privats allowed = True elif role == "groc": # Col·laborador verificat només si el seu telèfon coincideix amb owner if session_phone and owner_val and session_phone == owner_val: allowed = True else: # Altres rols (taronja, vermell, o desconegut) no veuen privats allowed = False if not allowed: continue result.append({"sha1sum": sha1, "video_name": vname}) except sqlite3.OperationalError: # Si videos.db no existeix o té un esquema incompatible, no retornem res result = [] return result def get_audiodescription_history(sha1sum: str, version: str) -> list[sqlite3.Row]: """Retorna només la darrera fila d'audiodescriptions per sha1sum+version. Abans es feien servir múltiples files per representar diferents etapes (original, HITL OK, HITL Test). Ara aquests estats viuen en diferents columnes d'un únic registre, de manera que només ens interessa l'última versió desada per a (sha1sum, version). """ try: sql = ( "SELECT * FROM audiodescriptions " "WHERE sha1sum = ? AND LOWER(version) = LOWER(?) " "ORDER BY rowid DESC " "LIMIT 1" ) with _connect_audiodescriptions_db() as conn: row = conn.execute(sql, (sha1sum, version)).fetchone() return [row] if row is not None else [] except sqlite3.OperationalError: return [] def update_audiodescription_text( sha1sum: str, version: str, *, free_ad: Optional[str] = None, une_ad: Optional[str] = None, ok_free_ad: Optional[str] = None, ok_une_ad: Optional[str] = None, test_free_ad: Optional[str] = None, test_une_ad: Optional[str] = None, ) -> None: """Actualitza camps de text per a una AD existent. Si no existeix el registre, no fa res (per simplicitat). """ try: with _connect_audiodescriptions_db() as conn: row = conn.execute( "SELECT 1 FROM audiodescriptions WHERE sha1sum=? AND version=?", (sha1sum, version), ).fetchone() if not row: return fields = [] values: List[Any] = [] # Camps base if free_ad is not None: fields.append("free_ad = ?") values.append(free_ad) if une_ad is not None: fields.append("une_ad = ?") values.append(une_ad) # Camps HITL OK / HITL Test (si les columnes existeixen a l'esquema actual) if ok_free_ad is not None: fields.append("ok_free_ad = ?") values.append(ok_free_ad) if ok_une_ad is not None: fields.append("ok_une_ad = ?") values.append(ok_une_ad) if test_free_ad is not None: fields.append("test_free_ad = ?") values.append(test_free_ad) if test_une_ad is not None: fields.append("test_une_ad = ?") values.append(test_une_ad) if not fields: return values.extend([sha1sum, version]) sql = f"UPDATE audiodescriptions SET {', '.join(fields)} WHERE sha1sum=? AND version=?" conn.execute(sql, tuple(values)) except sqlite3.OperationalError: # Si la taula no existeix, ignorem l'actualització return def update_audiodescription_info_ad( sha1sum: str, version: str, info_ad: str, ) -> None: """Actualitza el camp info_ad per a una AD existent. Si la taula o el registre no existeixen, no fa res. """ try: with _connect_audiodescriptions_db() as conn: row = conn.execute( "SELECT 1 FROM audiodescriptions WHERE sha1sum=? AND version=?", (sha1sum, version), ).fetchone() if not row: return conn.execute( "UPDATE audiodescriptions SET info_ad = ? WHERE sha1sum=? AND version=?", (info_ad, sha1sum, version), ) except sqlite3.OperationalError: return def upsert_audiodescription_text( sha1sum: str, version: str, *, free_ad: Optional[str] = None, une_ad: Optional[str] = None, ) -> None: """Crea o actualitza un registre d'audiodescripció per sha1sum+version. - Si la taula no existeix, es crea amb un esquema bàsic. - Si no hi ha registre per (sha1sum, version), s'insereix. - Si ja existeix, s'actualitzen els camps de text. """ try: with _connect_audiodescriptions_db() as conn: cur = conn.cursor() # Assegurar que la taula existeix (no trenca si ja existeix amb aquest esquema o un de compatible) cur.execute( """ CREATE TABLE IF NOT EXISTS audiodescriptions ( id INTEGER PRIMARY KEY AUTOINCREMENT, sha1sum TEXT NOT NULL, version TEXT NOT NULL, une_ad TEXT, free_ad TEXT, created_at TEXT, updated_at TEXT ); """ ) row = cur.execute( "SELECT id FROM audiodescriptions WHERE sha1sum=? AND version=?", (sha1sum, version), ).fetchone() now = now_str() if row is None: cur.execute( """ INSERT INTO audiodescriptions (sha1sum, version, une_ad, free_ad, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?) """, (sha1sum, version, une_ad or "", free_ad or "", now, now), ) else: fields = [] values: list[Any] = [] if une_ad is not None: fields.append("une_ad = ?") values.append(une_ad) if free_ad is not None: fields.append("free_ad = ?") values.append(free_ad) if not fields: return fields.append("updated_at = ?") values.append(now) values.extend([sha1sum, version]) sql = f"UPDATE audiodescriptions SET {', '.join(fields)} WHERE sha1sum=? AND version=?" cur.execute(sql, tuple(values)) except sqlite3.OperationalError: # Si passa algun error d'esquema, no aturem l'aplicació return def _connect_captions_db() -> sqlite3.Connection: """Connexió a demo/data/captions.db i creació de la taula si cal. Estructura: - variable TEXT PRIMARY KEY (p.ex. "score_1") - caption TEXT (etiqueta humana) """ CAPTIONS_DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(CAPTIONS_DB_PATH)) cur = conn.cursor() cur.execute( """ CREATE TABLE IF NOT EXISTS captions ( variable TEXT PRIMARY KEY, caption TEXT NOT NULL ); """ ) conn.commit() return conn def insert_demo_feedback_row( *, user: str, session: str, video_name: str, version: str, une_ad: str, free_ad: str, comments: str | None, transcripcio: int, identificacio: int, localitzacions: int, activitats: int, narracions: int, expressivitat: int, ) -> None: """Insereix una valoració detallada a demo/data/feedback.db. Escala els sliders de 0-7 a 0-100 i desa els textos d'UNE i narració lliure. Les columnes de sliders tenen per nom el caption del slider a la UI. """ # Escalat 0-7 -> 0-100 def scale(v: int) -> int: v = max(0, min(7, int(v))) return int(round(v * 100.0 / 7.0)) slider_values = { "Precisió Descriptiva": scale(transcripcio), "Sincronització Temporal": scale(identificacio), "Claredat i Concisió": scale(localitzacions), "Inclusió de Diàleg": scale(activitats), "Contextualització": scale(narracions), "Flux i Ritme de la Narració": scale(expressivitat), } ts = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") with _connect_feedback_db() as conn: conn.execute( """ INSERT INTO feedback ( timestamp, user, session, video_name, version, une_ad, free_ad, comments, score_1, score_2, score_3, score_4, score_5, score_6, "Precisió Descriptiva", "Sincronització Temporal", "Claredat i Concisió", "Inclusió de Diàleg", "Contextualització", "Flux i Ritme de la Narració" ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); """, ( ts, user, session, video_name, version, une_ad, free_ad, comments or "", slider_values["Precisió Descriptiva"], slider_values["Sincronització Temporal"], slider_values["Claredat i Concisió"], slider_values["Inclusió de Diàleg"], slider_values["Contextualització"], slider_values["Flux i Ritme de la Narració"], slider_values["Precisió Descriptiva"], slider_values["Sincronització Temporal"], slider_values["Claredat i Concisió"], slider_values["Inclusió de Diàleg"], slider_values["Contextualització"], slider_values["Flux i Ritme de la Narració"], ), ) def _connect_videos_db() -> sqlite3.Connection: """Connexió directa a demo/temp/videos.db. Aquesta BD conté metadades dels vídeos (video_name, owner, visibility, sha1sum...). """ VIDEOS_DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(VIDEOS_DB_PATH)) conn.row_factory = sqlite3.Row return conn def _connect_simple_mapping_db(db_path: Path, table_name: str) -> sqlite3.Connection: """Connexió a una BD simple (sha1sum, name, description) a demo/temp/db. Es fa servir per a casting.db i scenarios.db. """ db_path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row cur = conn.cursor() cur.execute( f""" CREATE TABLE IF NOT EXISTS {table_name} ( sha1sum TEXT NOT NULL, name TEXT NOT NULL, description TEXT ); """ ) conn.commit() return conn def _connect_casting_db() -> sqlite3.Connection: """Connexió directa a demo/temp/db/casting.db (taula casting).""" return _connect_simple_mapping_db(CASTING_DB_PATH, "casting") def _connect_scenarios_db() -> sqlite3.Connection: """Connexió directa a demo/temp/db/scenarios.db (taula scenarios).""" return _connect_simple_mapping_db(SCENARIOS_DB_PATH, "scenarios") def insert_casting_row(sha1sum: str, name: str, description: str | None = None) -> None: """Insereix un personatge a casting.db per a un sha1sum donat.""" if not sha1sum or not name: return try: with _connect_casting_db() as conn: conn.execute( "INSERT INTO casting (sha1sum, name, description) VALUES (?,?,?)", (sha1sum, name, description or ""), ) except sqlite3.OperationalError: return def insert_scenario_row(sha1sum: str, name: str, description: str | None = None) -> None: """Insereix un escenari a scenarios.db per a un sha1sum donat.""" if not sha1sum or not name: return try: with _connect_scenarios_db() as conn: conn.execute( "INSERT INTO scenarios (sha1sum, name, description) VALUES (?,?,?)", (sha1sum, name, description or ""), ) except sqlite3.OperationalError: return def log_action( *, session: str, user: str, phone: str, action: str, sha1sum: str, timestamp: Optional[str] = None, ) -> None: """Insereix un registre a demo/temp/actions.db (taula actions).""" ts = timestamp or datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") insert_action( session=session or "", user=user or "", phone=phone or "", action=action, sha1sum=sha1sum or "", timestamp=ts, ) def has_video_approval_action(sha1sum: str) -> bool: """Comprova si existeix una acció d'acceptació d'input per a un sha1sum. Busca a demo/temp/actions.db una fila amb action='input-OK' i el sha1sum especificat. """ if not sha1sum: return False try: with _connect_actions_db() as conn: cur = conn.execute( "SELECT 1 FROM actions WHERE action = ? AND sha1sum = ? LIMIT 1", ("input-OK", sha1sum), ) return cur.fetchone() is not None except sqlite3.OperationalError: # Si la taula encara no existeix, no hi ha aprovacions return False def get_feedback_video_stats(agg: str = "mitjana") -> List[Dict[str, Any]]: """Retorna estadístiques agregades per vídeo de demo/data/feedback.db. Es basa exclusivament en les columnes numèriques score_1..score_6 (0-100). agg pot ser: - "mitjana": mitjana dels scores per vídeo. - "mediana": mediana dels scores per vídeo. - "inicial": primer registre (per timestamp) per vídeo. - "actual": darrer registre (per timestamp) per vídeo. """ agg = (agg or "mitjana").lower() with _connect_feedback_db() as conn: cur = conn.execute( """ SELECT video_name, timestamp, score_1, score_2, score_3, score_4, score_5, score_6 FROM feedback """ ) rows = cur.fetchall() by_video: Dict[str, List[Dict[str, Any]]] = {} for row in rows: vn = row["video_name"] parsed_scores = [ row["score_1"], row["score_2"], row["score_3"], row["score_4"], row["score_5"], row["score_6"], ] enriched = { "video_name": vn, "timestamp": row["timestamp"], "scores": parsed_scores, } by_video.setdefault(vn, []).append(enriched) def parse_ts(ts: str) -> datetime: # Format des d'init_feedback.py: "YYYY-MM-DD HH:MM:SS" try: return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S") except Exception: return datetime.min result: List[Dict[str, Any]] = [] for video_name, vrows in by_video.items(): if not vrows: continue # Ordenem per timestamp per als modes "inicial" i "actual" vrows_sorted = sorted(vrows, key=lambda r: parse_ts(r["timestamp"])) def agg_index(idx: int) -> Optional[float]: vals = [r["scores"][idx] for r in vrows if r["scores"][idx] is not None] if not vals: return None if agg == "mitjana": return float(sum(vals) / len(vals)) if agg == "mediana": return float(median(vals)) if agg == "inicial": first = vrows_sorted[0]["scores"][idx] return float(first) if first is not None else None if agg == "actual": last = vrows_sorted[-1]["scores"][idx] return float(last) if last is not None else None # fallback a mitjana si el mode no és reconegut return float(sum(vals) / len(vals)) row_out: Dict[str, Any] = { "video_name": video_name, "n": len(vrows), } for i in range(6): row_out[f"score_{i+1}"] = agg_index(i) result.append(row_out) # Ordenació per defecte alfabètica pel nom; l'ordre final es decidirà a la UI result.sort(key=lambda r: r["video_name"]) return result def get_feedback_rows_for_video_version(video_name: str, version: str) -> List[sqlite3.Row]: """Retorna totes les files de feedback per a un vídeo i versió concrets. Només s'utilitzen les columnes score_1..score_6 (0-100) per a estadístiques de distribució a la UI. """ if not video_name or not version: return [] try: with _connect_feedback_db() as conn: cur = conn.execute( """ SELECT score_1, score_2, score_3, score_4, score_5, score_6 FROM feedback WHERE video_name = ? AND version = ? """, (video_name, version), ) return cur.fetchall() except sqlite3.OperationalError: return [] def _init_captions_from_eval() -> None: """Inicialitza captions.db agafant etiquetes des d'un eval.csv. Per simplicitat, intentem llegir `demo/data/media/parella/MoE/eval.csv`. Si no existeix o falla, es deixen etiquetes per defecte. """ base_demo = Path(__file__).resolve().parent eval_path = base_demo / "data" / "media" / "parella" / "MoE" / "eval.csv" default_labels = [f"score_{i}" for i in range(1, 7)] labels = default_labels[:] if eval_path.exists(): try: import csv with eval_path.open("r", encoding="utf-8") as f: reader = csv.DictReader(f) tmp: List[str] = [] for row in reader: if len(tmp) >= 6: break name = (row.get("Caracteristica") or "").strip().strip('"') if name: tmp.append(name) if tmp: labels = tmp while len(labels) < 6: labels.append(default_labels[len(labels)]) labels = labels[:6] except Exception: pass with _connect_captions_db() as conn: cur = conn.cursor() cur.execute("DELETE FROM captions") for i in range(6): cur.execute( "INSERT OR REPLACE INTO captions (variable, caption) VALUES (?, ?)", (f"score_{i+1}", labels[i]), ) def get_feedback_score_labels() -> List[str]: """Retorna les etiquetes humanes per a score_1..score_6 des de captions.db. Si captions.db és buit, s'intenta inicialitzar-lo a partir d'un eval.csv. """ default_labels = [f"score_{i}" for i in range(1, 7)] with _connect_captions_db() as conn: cur = conn.cursor() cur.execute("SELECT variable, caption FROM captions ORDER BY variable") rows = cur.fetchall() if not rows: # Inicialitzar des d'un eval.csv i tornar-ho a intentar _init_captions_from_eval() cur.execute("SELECT variable, caption FROM captions ORDER BY variable") rows = cur.fetchall() if not rows: return default_labels labels: List[str] = [] for _, caption in rows: labels.append(caption) while len(labels) < 6: labels.append(default_labels[len(labels)]) return labels[:6] def get_feedback_ad_stats(): # medias por vídeo y ranking with get_conn() as conn: cur = conn.execute( """ SELECT video_name, COUNT(*) AS n, AVG(transcripcio) AS avg_transcripcio, AVG(identificacio) AS avg_identificacio, AVG(localitzacions) AS avg_localitzacions, AVG(activitats) AS avg_activitats, AVG(narracions) AS avg_narracions, AVG(expressivitat) AS avg_expressivitat, (AVG(transcripcio)+AVG(identificacio)+AVG(localitzacions)+AVG(activitats)+AVG(narracions)+AVG(expressivitat))/6.0 AS avg_global FROM feedback_ad GROUP BY video_name ORDER BY avg_global DESC, n DESC; """ ) return cur.fetchall() def now_str(): return datetime.utcnow().isoformat(timespec="seconds") + "Z" # Users def create_user(username: str, password_hash: str, role: str): with get_conn() as conn: conn.execute( "INSERT INTO users(username, password_hash, role, created_at) VALUES (?,?,?,?)", (username, password_hash, role, now_str()), ) def get_user(username: str): with get_conn() as conn: cur = conn.execute("SELECT * FROM users WHERE username=?", (username,)) return cur.fetchone() def get_all_users() -> List[Dict[str, Any]]: with get_conn() as conn: cur = conn.execute("SELECT id, username, role FROM users ORDER BY username") return cur.fetchall() def update_user_password(username: str, password_hash: str): with get_conn() as conn: conn.execute( "UPDATE users SET password_hash = ? WHERE username = ?", (password_hash, username), )