import os import sqlite3 from contextlib import contextmanager from typing import Optional, Dict, Any, List, Tuple from datetime import datetime DEFAULT_DB_PATH = None # set by set_db_path at runtime def set_db_path(db_path: str): global DEFAULT_DB_PATH DEFAULT_DB_PATH = db_path os.makedirs(os.path.dirname(db_path), exist_ok=True) def get_connection(): if not DEFAULT_DB_PATH: raise ValueError("Database path not set. Call set_db_path(path) first.") return sqlite3.connect(DEFAULT_DB_PATH) @contextmanager def get_conn(db_path: Optional[str] = None): path = db_path or DEFAULT_DB_PATH conn = sqlite3.connect(path, check_same_thread=False) conn.row_factory = sqlite3.Row try: yield conn conn.commit() finally: conn.close() def init_schema(): with get_conn() as conn: c = conn.cursor() # (tus tablas existentes) c.execute(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT, role TEXT NOT NULL, created_at TEXT NOT NULL ); """) # Migraciones: asegurar columnas esperadas try: c.execute("PRAGMA table_info(users)") cols = {row[1] for row in c.fetchall()} # set de nombres de columnas if "password_hash" not in cols: c.execute("ALTER TABLE users ADD COLUMN password_hash TEXT") if "role" not in cols: c.execute("ALTER TABLE users ADD COLUMN role TEXT NOT NULL DEFAULT 'verd'") if "created_at" not in cols: c.execute("ALTER TABLE users ADD COLUMN created_at TEXT NOT NULL DEFAULT ''") except sqlite3.OperationalError: pass # Intento de limpieza de columna antigua si existiera (SQLite no permite DROP COLUMN en versiones antiguas) try: c.execute("ALTER TABLE users DROP COLUMN pw_hash;") except sqlite3.OperationalError: pass # (opcional: tus otras tablas) # >>> NUEVA TABLA PARA FEEDBACK DE AD (no depende de videos) c.execute(""" CREATE TABLE IF NOT EXISTS feedback_ad ( id INTEGER PRIMARY KEY AUTOINCREMENT, video_name TEXT NOT NULL, -- nombre de carpeta dentro de videos/completed user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, transcripcio INTEGER NOT NULL, -- 1..10 identificacio INTEGER NOT NULL, -- 1..10 localitzacions INTEGER NOT NULL, -- 1..10 activitats INTEGER NOT NULL, -- 1..10 narracions INTEGER NOT NULL, -- 1..10 expressivitat INTEGER NOT NULL, -- 1..10 comments TEXT, created_at TEXT NOT NULL ); """) # Add column if it doesn't exist, for backwards compatibility try: c.execute("ALTER TABLE feedback_ad ADD COLUMN expressivitat INTEGER NOT NULL DEFAULT 7;") except sqlite3.OperationalError: pass # column already exists def add_feedback_ad(video_name: str, user_id: int, transcripcio: int, identificacio: int, localitzacions: int, activitats: int, narracions: int, expressivitat: int, comments: str | None): with get_conn() as conn: conn.execute( """INSERT INTO feedback_ad (video_name, user_id, transcripcio, identificacio, localitzacions, activitats, narracions, expressivitat, comments, created_at) VALUES (?,?,?,?,?,?,?,?,?,?)""", (video_name, user_id, transcripcio, identificacio, localitzacions, activitats, narracions, expressivitat, comments, now_str()) ) def get_feedback_ad_for_video(video_name: str): with get_conn() as conn: cur = conn.execute( """SELECT * FROM feedback_ad WHERE video_name=? ORDER BY created_at DESC""", (video_name,) ) return cur.fetchall() def get_feedback_ad_stats(): # medias por vĂ­deo y ranking with get_conn() as conn: cur = conn.execute(""" SELECT video_name, COUNT(*) AS n, AVG(transcripcio) AS avg_transcripcio, AVG(identificacio) AS avg_identificacio, AVG(localitzacions) AS avg_localitzacions, AVG(activitats) AS avg_activitats, AVG(narracions) AS avg_narracions, AVG(expressivitat) AS avg_expressivitat, (AVG(transcripcio)+AVG(identificacio)+AVG(localitzacions)+AVG(activitats)+AVG(narracions)+AVG(expressivitat))/6.0 AS avg_global FROM feedback_ad GROUP BY video_name ORDER BY avg_global DESC, n DESC; """) return cur.fetchall() def now_str(): return datetime.utcnow().isoformat(timespec="seconds") + "Z" # Users def create_user(username: str, password_hash: str, role: str): with get_conn() as conn: conn.execute( "INSERT INTO users(username, password_hash, role, created_at) VALUES (?,?,?,?)", (username, password_hash, role, now_str()), ) def get_user(username: str): with get_conn() as conn: cur = conn.execute("SELECT * FROM users WHERE username=?", (username,)) return cur.fetchone() def get_all_users() -> List[Dict[str, Any]]: with get_conn() as conn: cur = conn.execute("SELECT id, username, role FROM users ORDER BY username") return cur.fetchall() def update_user_password(username: str, password_hash: str): with get_conn() as conn: conn.execute( "UPDATE users SET password_hash = ? WHERE username = ?", (password_hash, username) ) # Videos def create_video(filename: str, original_name: str, uploader: str, status: str) -> int: with get_conn() as conn: cur = conn.execute( "INSERT INTO videos(filename, original_name, uploader, status, created_at) VALUES (?,?,?,?,?)", (filename, original_name, uploader, status, now_str()), ) return cur.lastrowid def update_video_status(video_id: int, status: str): with get_conn() as conn: conn.execute("UPDATE videos SET status=? WHERE id=?", (status, video_id)) def list_videos(limit: int = 200): with get_conn() as conn: cur = conn.execute( "SELECT * FROM videos ORDER BY id DESC LIMIT ?", (limit,) ) return cur.fetchall() def get_video(video_id: int): with get_conn() as conn: cur = conn.execute("SELECT * FROM videos WHERE id=?", (video_id,)) return cur.fetchone() # Results def upsert_result(video_id: int, mode: str, text_path: str = None, srt_path: str = None, mp3_path: str = None, characters_json: str = None, metrics_json: str = None): with get_conn() as conn: cur = conn.execute( "SELECT id FROM results WHERE video_id=? AND mode=?", (video_id, mode) ) row = cur.fetchone() if row: conn.execute( "UPDATE results SET text_path=?, srt_path=?, mp3_path=?, characters_json=?, metrics_json=? WHERE id=?", (text_path, srt_path, mp3_path, characters_json, metrics_json, row["id"]) ) else: conn.execute( "INSERT INTO results(video_id, mode, text_path, srt_path, mp3_path, characters_json, metrics_json, created_at) VALUES (?,?,?,?,?,?,?,?)", (video_id, mode, text_path, srt_path, mp3_path, characters_json, metrics_json, now_str()) ) def get_results(video_id: int): with get_conn() as conn: cur = conn.execute("SELECT * FROM results WHERE video_id=?", (video_id,)) return cur.fetchall() # Feedback def add_feedback(video_id: int, user_id: int, clarity: int, sync: int, relevance: int, naturalness: int, overall: int, comments: str): with get_conn() as conn: conn.execute( "INSERT INTO feedback(video_id, user_id, clarity, sync, relevance, naturalness, overall, comments, created_at) VALUES (?,?,?,?,?,?,?,?,?)", (video_id, user_id, clarity, sync, relevance, naturalness, overall, comments, now_str()) ) def get_feedback_for_video(video_id: int): with get_conn() as conn: cur = conn.execute("SELECT * FROM feedback WHERE video_id=?", (video_id,)) return cur.fetchall() def get_feedback_stats(): with get_conn() as conn: cur = conn.execute(""" SELECT v.id as video_id, v.original_name as name, COUNT(f.id) as n, AVG(f.overall) as overall_avg, AVG(f.clarity) as clarity_avg, AVG(f.sync) as sync_avg, AVG(f.relevance) as relevance_avg, AVG(f.naturalness) as naturalness_avg FROM videos v LEFT JOIN feedback f ON f.video_id = v.id GROUP BY v.id ORDER BY overall_avg DESC NULLS LAST, n DESC, v.id DESC """) return cur.fetchall() def get_feedback_with_users(): with get_conn() as conn: cur = conn.execute(""" SELECT f.*, u.role as user_role FROM feedback f JOIN users u ON u.id = f.user_id """) return cur.fetchall() def get_feedback_stats_weighted(role_weights: Dict[str, float] = None): role_weights = role_weights or {'verd':1.0,'groc':1.0,'taronja':0.0,'blau':1.5} # Aggregate in Python to apply weights with get_conn() as conn: # Bring minimal info to Python for weighting cur = conn.execute(""" SELECT v.id as video_id, v.original_name as name FROM videos v """) videos = cur.fetchall() # Pull feedback + roles cur = conn.execute(""" SELECT f.video_id, f.clarity, f.sync, f.relevance, f.naturalness, f.overall, u.role as user_role FROM feedback f JOIN users u ON u.id = f.user_id """) fb = cur.fetchall() by_vid = {v["video_id"]: {"name": v["name"], "n":0, "w_sum":0.0, "overall":0.0,"clarity":0.0,"sync":0.0,"relevance":0.0,"naturalness":0.0} for v in videos} for r in fb: w = role_weights.get(r["user_role"], 0.0) if r["video_id"] not in by_vid: # safety continue by_vid[r["video_id"]]["n"] += 1 by_vid[r["video_id"]]["w_sum"] += w by_vid[r["video_id"]]["overall"] += w * r["overall"] by_vid[r["video_id"]]["clarity"] += w * r["clarity"] by_vid[r["video_id"]]["sync"] += w * r["sync"] by_vid[r["video_id"]]["relevance"] += w * r["relevance"] by_vid[r["video_id"]]["naturalness"] += w * r["naturalness"] # Build result rows with weighted averages rows = [] for vid, agg in by_vid.items(): w = agg["w_sum"] if agg["w_sum"]>0 else None def avg(val): return (val / w) if w else None rows.append({ "video_id": vid, "name": agg["name"], "n_ratings": agg["n"], "w_count": agg["w_sum"], "overall_avg_w": avg(agg["overall"]), "clarity_avg_w": avg(agg["clarity"]), "sync_avg_w": avg(agg["sync"]), "relevance_avg_w": avg(agg["relevance"]), "naturalness_avg_w": avg(agg["naturalness"]), }) # Sort: by weighted overall desc, then n_ratings desc rows.sort(key=lambda r: (r["overall_avg_w"] is None, -(r["overall_avg_w"] or 0), -r["n_ratings"], -r["w_count"])) return rows