import os import sqlite3 from contextlib import contextmanager from datetime import datetime from pathlib import Path from statistics import median from typing import Optional, Dict, Any, List, Tuple # Reutilizamos la misma lógica que antes, pero centralizada en este módulo DEFAULT_DB_PATH = None # set by set_db_path at runtime # Flag global per decidir si es fa servir blockchain (AWS QLDB) per als esdeveniments USE_BLOCKCHAIN_FOR_EVENTS = False # Ruta a la base de dades de feedback agregat (separa de users.db) FEEDBACK_DB_PATH = Path(__file__).resolve().parent / "temp" / "feedback.db" # Ruta a la base de dades de captions per als scores CAPTIONS_DB_PATH = Path(__file__).resolve().parent / "temp" / "captions.db" # Ruta a la base de dades d'esdeveniments (events.db) a demo/temp EVENTS_DB_PATH = Path(__file__).resolve().parent / "temp" / "events.db" # Ruta a la base de dades de vídeos (videos.db) a demo/temp VIDEOS_DB_PATH = Path(__file__).resolve().parent / "temp" / "videos.db" def set_db_path(db_path: str): global DEFAULT_DB_PATH DEFAULT_DB_PATH = db_path os.makedirs(os.path.dirname(db_path), exist_ok=True) def set_blockchain_enabled(enabled: bool) -> None: """Activa o desactiva l'ús de blockchain per registrar esdeveniments. Quan està desactivat (per defecte), els esdeveniments es registren a demo/temp/events.db. Quan està activat, s'envien a aws_qldb. """ global USE_BLOCKCHAIN_FOR_EVENTS USE_BLOCKCHAIN_FOR_EVENTS = bool(enabled) def get_connection(): if not DEFAULT_DB_PATH: raise ValueError("Database path not set. Call set_db_path(path) first.") return sqlite3.connect(DEFAULT_DB_PATH) @contextmanager def get_conn(db_path: Optional[str] = None): path = db_path or DEFAULT_DB_PATH conn = sqlite3.connect(path, check_same_thread=False) conn.row_factory = sqlite3.Row try: yield conn conn.commit() finally: conn.close() def init_schema(): with get_conn() as conn: c = conn.cursor() # (tus tablas existentes) c.execute( """ CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT, role TEXT NOT NULL, created_at TEXT NOT NULL ); """ ) # Migraciones: asegurar columnas esperadas try: c.execute("PRAGMA table_info(users)") cols = {row[1] for row in c.fetchall()} # set de nombres de columnas if "password_hash" not in cols: c.execute("ALTER TABLE users ADD COLUMN password_hash TEXT") if "role" not in cols: c.execute("ALTER TABLE users ADD COLUMN role TEXT NOT NULL DEFAULT 'verd'") if "created_at" not in cols: c.execute("ALTER TABLE users ADD COLUMN created_at TEXT NOT NULL DEFAULT ''") except sqlite3.OperationalError: pass # Intento de limpieza de columna antigua si existiera (SQLite no permite DROP COLUMN en versiones antiguas) try: c.execute("ALTER TABLE users DROP COLUMN pw_hash;") except sqlite3.OperationalError: pass # (opcional: tus otras tablas) # Esquema per a demo/temp/events.db (registre d'esdeveniments) EVENTS_DB_PATH.parent.mkdir(parents=True, exist_ok=True) with sqlite3.connect(str(EVENTS_DB_PATH)) as econn: ec = econn.cursor() ec.execute( """ CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, session TEXT, ip TEXT, user TEXT, password TEXT, phone TEXT, action TEXT NOT NULL, sha1sum TEXT, visibility TEXT ); """ ) # Afegir columna visibility si la taula ja existia sense aquest camp try: ec.execute("ALTER TABLE events ADD COLUMN visibility TEXT") except sqlite3.OperationalError: # La columna ja existeix pass econn.commit() # >>> TABLA PARA FEEDBACK DE AD (no depende de videos) c.execute( """ CREATE TABLE IF NOT EXISTS feedback_ad ( id INTEGER PRIMARY KEY AUTOINCREMENT, video_name TEXT NOT NULL, -- nombre de carpeta dentro de videos/completed user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, transcripcio INTEGER NOT NULL, -- 1..10 identificacio INTEGER NOT NULL, -- 1..10 localitzacions INTEGER NOT NULL, -- 1..10 activitats INTEGER NOT NULL, -- 1..10 narracions INTEGER NOT NULL, -- 1..10 expressivitat INTEGER NOT NULL, -- 1..10 comments TEXT, created_at TEXT NOT NULL ); """ ) # Add column if it doesn't exist, for backwards compatibility try: c.execute( "ALTER TABLE feedback_ad ADD COLUMN expressivitat INTEGER NOT NULL DEFAULT 7;" ) except sqlite3.OperationalError: pass # column already exists def add_feedback_ad( video_name: str, user_id: int, transcripcio: int, identificacio: int, localitzacions: int, activitats: int, narracions: int, expressivitat: int, comments: str | None, ): with get_conn() as conn: conn.execute( """INSERT INTO feedback_ad (video_name, user_id, transcripcio, identificacio, localitzacions, activitats, narracions, expressivitat, comments, created_at) VALUES (?,?,?,?,?,?,?,?,?,?)""", ( video_name, user_id, transcripcio, identificacio, localitzacions, activitats, narracions, expressivitat, comments, now_str(), ), ) def get_feedback_ad_for_video(video_name: str): with get_conn() as conn: cur = conn.execute( """SELECT * FROM feedback_ad WHERE video_name=? ORDER BY created_at DESC""", (video_name,), ) return cur.fetchall() def get_accessible_videos_for_session(session_id: str | None) -> List[str]: """Retorna els noms de vídeo accessibles per a una sessió. Regles: - Sempre inclou vídeos amb visibility='public' a videos.db. - Afegeix vídeos per als quals el camp owner coincideix amb algun phone registrat a events.db per a la mateixa session. Args: session_id: Identificador de sessió (st.session_state.session_id). """ # 1) Vídeos públics public_videos: set[str] = set() with _connect_videos_db() as vconn: try: for row in vconn.execute( "SELECT DISTINCT video_name FROM videos WHERE visibility = 'public'" ): public_videos.add(row["video_name"]) except sqlite3.OperationalError: # Si la taula no existeix encara, no hi ha vídeos return [] if not session_id: return sorted(public_videos) # 2) Telèfons associats a la sessió actual phones: set[str] = set() with _connect_events_db() as econn: for row in econn.execute( "SELECT DISTINCT phone FROM events WHERE session = ? AND phone IS NOT NULL AND phone != ''", (session_id,), ): phones.add(row["phone"]) if not phones: return sorted(public_videos) # 3) Vídeos amb owner associat a algun d'aquests telèfons owner_videos: set[str] = set() with _connect_videos_db() as vconn: q_marks = ",".join("?" for _ in phones) params: Tuple[Any, ...] = tuple(phones) query = ( f"SELECT DISTINCT video_name FROM videos WHERE owner IN ({q_marks})" ) for row in vconn.execute(query, params): owner_videos.add(row["video_name"]) all_videos = public_videos | owner_videos return sorted(all_videos) def _connect_feedback_db() -> sqlite3.Connection: """Connexió directa a demo/data/feedback.db. És independent de DEFAULT_DB_PATH perquè aquesta BD és específica de feedback agregat importat des de engine. """ FEEDBACK_DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(FEEDBACK_DB_PATH)) conn.row_factory = sqlite3.Row return conn def _connect_captions_db() -> sqlite3.Connection: """Connexió a demo/data/captions.db i creació de la taula si cal. Estructura: - variable TEXT PRIMARY KEY (p.ex. "score_1") - caption TEXT (etiqueta humana) """ CAPTIONS_DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(CAPTIONS_DB_PATH)) cur = conn.cursor() cur.execute( """ CREATE TABLE IF NOT EXISTS captions ( variable TEXT PRIMARY KEY, caption TEXT NOT NULL ); """ ) conn.commit() return conn def insert_demo_feedback_row( *, user: str, session: str, video_name: str, version: str, une_ad: str, free_ad: str, comments: str | None, transcripcio: int, identificacio: int, localitzacions: int, activitats: int, narracions: int, expressivitat: int, ) -> None: """Insereix una valoració detallada a demo/data/feedback.db. Escala els sliders de 0-7 a 0-100 i desa els textos d'UNE i narració lliure. Les columnes de sliders tenen per nom el caption del slider a la UI. """ # Escalat 0-7 -> 0-100 def scale(v: int) -> int: v = max(0, min(7, int(v))) return int(round(v * 100.0 / 7.0)) slider_values = { "Precisió Descriptiva": scale(transcripcio), "Sincronització Temporal": scale(identificacio), "Claredat i Concisió": scale(localitzacions), "Inclusió de Diàleg": scale(activitats), "Contextualització": scale(narracions), "Flux i Ritme de la Narració": scale(expressivitat), } ts = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") with _connect_feedback_db() as conn: conn.execute( """ INSERT INTO feedback ( timestamp, user, session, video_name, version, une_ad, free_ad, comments, score_1, score_2, score_3, score_4, score_5, score_6, "Precisió Descriptiva", "Sincronització Temporal", "Claredat i Concisió", "Inclusió de Diàleg", "Contextualització", "Flux i Ritme de la Narració" ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); """, ( ts, user, session, video_name, version, une_ad, free_ad, comments or "", slider_values["Precisió Descriptiva"], slider_values["Sincronització Temporal"], slider_values["Claredat i Concisió"], slider_values["Inclusió de Diàleg"], slider_values["Contextualització"], slider_values["Flux i Ritme de la Narració"], slider_values["Precisió Descriptiva"], slider_values["Sincronització Temporal"], slider_values["Claredat i Concisió"], slider_values["Inclusió de Diàleg"], slider_values["Contextualització"], slider_values["Flux i Ritme de la Narració"], ), ) def _connect_events_db() -> sqlite3.Connection: """Connexió directa a demo/temp/events.db. Es fa independent de DEFAULT_DB_PATH per mantenir aquesta BD separada de users.db, igual que feedback.db. """ EVENTS_DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(EVENTS_DB_PATH)) conn.row_factory = sqlite3.Row return conn def _connect_videos_db() -> sqlite3.Connection: """Connexió directa a demo/temp/videos.db. Aquesta BD conté metadades dels vídeos (video_name, owner, visibility, sha1sum...). """ VIDEOS_DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(VIDEOS_DB_PATH)) conn.row_factory = sqlite3.Row return conn def log_event( *, session: str, ip: str, user: str, password: str, phone: str, action: str, sha1sum: str, visibility: str | None = None, timestamp: Optional[str] = None, ) -> None: """Insereix un registre a demo/temp/events.db. - timestamp: si no s'especifica, es fa servir UTC "YYYY-MM-DD HH:MM:SS". - session, ip, user, password, phone, sha1sum es guarden com a TEXT. """ ts = timestamp or datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") if not USE_BLOCKCHAIN_FOR_EVENTS: # Mode per defecte: registrar en demo/data/events.db with _connect_events_db() as conn: conn.execute( """INSERT INTO events (timestamp, session, ip, user, password, phone, action, sha1sum, visibility) VALUES (?,?,?,?,?,?,?,?,?)""", ( ts, session or "", ip or "", user or "", password or "", phone or "", action, sha1sum or "", visibility or "", ), ) else: # Mode blockchain: delegar a aws_qldb (simulat fins activació real) try: from aws_qldb import qldb_manager payload = { "timestamp": ts, "session": session or "", "ip": ip or "", "user": user or "", "password": password or "", "phone": phone or "", "action": action, "sha1sum": sha1sum or "", "visibility": visibility or "", } # Mètode específic per a esdeveniments generics (simulat) if hasattr(qldb_manager, "record_event"): qldb_manager.record_event(payload) else: # Fallback: registrar com a log simulat print(f"[QLDB EVENTS - SIMULATED] {payload}") except Exception as e: # No interrompre el flux de l'aplicació per errors de blockchain print(f"[QLDB EVENTS ERROR] No s'ha pogut registrar l'esdeveniment: {e}") def get_feedback_video_stats(agg: str = "mitjana") -> List[Dict[str, Any]]: """Retorna estadístiques agregades per vídeo de demo/data/feedback.db. Es basa exclusivament en les columnes numèriques score_1..score_6 (0-100). agg pot ser: - "mitjana": mitjana dels scores per vídeo. - "mediana": mediana dels scores per vídeo. - "inicial": primer registre (per timestamp) per vídeo. - "actual": darrer registre (per timestamp) per vídeo. """ agg = (agg or "mitjana").lower() with _connect_feedback_db() as conn: cur = conn.execute( """ SELECT video_name, timestamp, score_1, score_2, score_3, score_4, score_5, score_6 FROM feedback """ ) rows = cur.fetchall() by_video: Dict[str, List[Dict[str, Any]]] = {} for row in rows: vn = row["video_name"] parsed_scores = [ row["score_1"], row["score_2"], row["score_3"], row["score_4"], row["score_5"], row["score_6"], ] enriched = { "video_name": vn, "timestamp": row["timestamp"], "scores": parsed_scores, } by_video.setdefault(vn, []).append(enriched) def parse_ts(ts: str) -> datetime: # Format des d'init_feedback.py: "YYYY-MM-DD HH:MM:SS" try: return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S") except Exception: return datetime.min result: List[Dict[str, Any]] = [] for video_name, vrows in by_video.items(): if not vrows: continue # Ordenem per timestamp per als modes "inicial" i "actual" vrows_sorted = sorted(vrows, key=lambda r: parse_ts(r["timestamp"])) def agg_index(idx: int) -> Optional[float]: vals = [r["scores"][idx] for r in vrows if r["scores"][idx] is not None] if not vals: return None if agg == "mitjana": return float(sum(vals) / len(vals)) if agg == "mediana": return float(median(vals)) if agg == "inicial": first = vrows_sorted[0]["scores"][idx] return float(first) if first is not None else None if agg == "actual": last = vrows_sorted[-1]["scores"][idx] return float(last) if last is not None else None # fallback a mitjana si el mode no és reconegut return float(sum(vals) / len(vals)) row_out: Dict[str, Any] = { "video_name": video_name, "n": len(vrows), } for i in range(6): row_out[f"score_{i+1}"] = agg_index(i) result.append(row_out) # Ordenació per defecte alfabètica pel nom; l'ordre final es decidirà a la UI result.sort(key=lambda r: r["video_name"]) return result def _init_captions_from_eval() -> None: """Inicialitza captions.db agafant etiquetes des d'un eval.csv. Per simplicitat, intentem llegir `demo/data/media/parella/MoE/eval.csv`. Si no existeix o falla, es deixen etiquetes per defecte. """ base_demo = Path(__file__).resolve().parent eval_path = base_demo / "data" / "media" / "parella" / "MoE" / "eval.csv" default_labels = [f"score_{i}" for i in range(1, 7)] labels = default_labels[:] if eval_path.exists(): try: import csv with eval_path.open("r", encoding="utf-8") as f: reader = csv.DictReader(f) tmp: List[str] = [] for row in reader: if len(tmp) >= 6: break name = (row.get("Caracteristica") or "").strip().strip('"') if name: tmp.append(name) if tmp: labels = tmp while len(labels) < 6: labels.append(default_labels[len(labels)]) labels = labels[:6] except Exception: pass with _connect_captions_db() as conn: cur = conn.cursor() cur.execute("DELETE FROM captions") for i in range(6): cur.execute( "INSERT OR REPLACE INTO captions (variable, caption) VALUES (?, ?)", (f"score_{i+1}", labels[i]), ) def get_feedback_score_labels() -> List[str]: """Retorna les etiquetes humanes per a score_1..score_6 des de captions.db. Si captions.db és buit, s'intenta inicialitzar-lo a partir d'un eval.csv. """ default_labels = [f"score_{i}" for i in range(1, 7)] with _connect_captions_db() as conn: cur = conn.cursor() cur.execute("SELECT variable, caption FROM captions ORDER BY variable") rows = cur.fetchall() if not rows: # Inicialitzar des d'un eval.csv i tornar-ho a intentar _init_captions_from_eval() cur.execute("SELECT variable, caption FROM captions ORDER BY variable") rows = cur.fetchall() if not rows: return default_labels labels: List[str] = [] for _, caption in rows: labels.append(caption) while len(labels) < 6: labels.append(default_labels[len(labels)]) return labels[:6] def get_feedback_ad_stats(): # medias por vídeo y ranking with get_conn() as conn: cur = conn.execute( """ SELECT video_name, COUNT(*) AS n, AVG(transcripcio) AS avg_transcripcio, AVG(identificacio) AS avg_identificacio, AVG(localitzacions) AS avg_localitzacions, AVG(activitats) AS avg_activitats, AVG(narracions) AS avg_narracions, AVG(expressivitat) AS avg_expressivitat, (AVG(transcripcio)+AVG(identificacio)+AVG(localitzacions)+AVG(activitats)+AVG(narracions)+AVG(expressivitat))/6.0 AS avg_global FROM feedback_ad GROUP BY video_name ORDER BY avg_global DESC, n DESC; """ ) return cur.fetchall() def now_str(): return datetime.utcnow().isoformat(timespec="seconds") + "Z" # Users def create_user(username: str, password_hash: str, role: str): with get_conn() as conn: conn.execute( "INSERT INTO users(username, password_hash, role, created_at) VALUES (?,?,?,?)", (username, password_hash, role, now_str()), ) def get_user(username: str): with get_conn() as conn: cur = conn.execute("SELECT * FROM users WHERE username=?", (username,)) return cur.fetchone() def get_all_users() -> List[Dict[str, Any]]: with get_conn() as conn: cur = conn.execute("SELECT id, username, role FROM users ORDER BY username") return cur.fetchall() def update_user_password(username: str, password_hash: str): with get_conn() as conn: conn.execute( "UPDATE users SET password_hash = ? WHERE username = ?", (password_hash, username), )