|
|
import os
import sqlite3
from contextlib import closing, contextmanager
from datetime import datetime
from pathlib import Path
from statistics import median
from typing import Optional, Dict, Any, List, Tuple
|
|
|
|
|
|
|
|
|
|
|
|
# Path of the main SQLite database; stays None until set_db_path() is called.
DEFAULT_DB_PATH = None

# Toggled through set_blockchain_enabled(); off by default.
USE_BLOCKCHAIN_FOR_EVENTS = False

# Every auxiliary database sits side by side in <module dir>/temp/db.
_DB_DIR = Path(__file__).resolve().parent / "temp" / "db"

FEEDBACK_DB_PATH = _DB_DIR / "feedback.db"
CAPTIONS_DB_PATH = _DB_DIR / "captions.db"
VIDEOS_DB_PATH = _DB_DIR / "videos.db"
AUDIODESCRIPTIONS_DB_PATH = _DB_DIR / "audiodescriptions.db"
ACTIONS_DB_PATH = _DB_DIR / "actions.db"
CASTING_DB_PATH = _DB_DIR / "casting.db"
SCENARIOS_DB_PATH = _DB_DIR / "scenarios.db"
|
|
|
|
|
|
|
|
|
def set_db_path(db_path: str):
    """Register ``db_path`` as the default database and create its directory.

    Args:
        db_path: Filesystem path of the SQLite file that get_connection() and
            get_conn() open when no explicit path is given.
    """
    global DEFAULT_DB_PATH
    DEFAULT_DB_PATH = db_path

    # A bare filename ("app.db") has an empty directory part, and
    # os.makedirs("") raises FileNotFoundError, so only create real parents.
    parent_dir = os.path.dirname(db_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
|
|
|
|
|
|
|
|
|
def set_blockchain_enabled(enabled: bool) -> None:
    """Enable or disable the blockchain backend for event logging.

    While disabled (the default), events are recorded in
    demo/temp/events.db. While enabled, they are sent to aws_qldb.
    """
    global USE_BLOCKCHAIN_FOR_EVENTS
    # Normalise any truthy/falsy argument to a real bool.
    flag = True if enabled else False
    USE_BLOCKCHAIN_FOR_EVENTS = flag
|
|
|
|
|
|
|
|
|
def get_connection():
    """Open a new connection to the default database.

    Raises:
        ValueError: If no default path has been registered yet.
    """
    if DEFAULT_DB_PATH:
        return sqlite3.connect(DEFAULT_DB_PATH)
    raise ValueError("Database path not set. Call set_db_path(path) first.")
|
|
|
|
|
|
|
|
|
@contextmanager
def get_conn(db_path: Optional[str] = None):
    """Yield a connection that is committed on success and always closed.

    Args:
        db_path: Database file to open; falls back to DEFAULT_DB_PATH.

    Yields:
        A sqlite3 connection whose rows are ``sqlite3.Row`` objects.

    Raises:
        ValueError: If neither ``db_path`` nor DEFAULT_DB_PATH is set. This
            mirrors get_connection() instead of letting ``sqlite3.connect(None)``
            fail with an opaque TypeError.
    """
    path = db_path or DEFAULT_DB_PATH
    if not path:
        raise ValueError("Database path not set. Call set_db_path(path) first.")

    conn = sqlite3.connect(path, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        # Reached only when the with-body finished without raising.
        conn.commit()
    finally:
        # Closing without a commit discards whatever the failed body wrote.
        conn.close()
|
|
|
|
|
|
|
|
|
def init_schema():
    """Create or migrate the tables used by the application.

    In the default database this creates ``users`` and ``feedback_ad`` and
    applies small in-place migrations for databases created by older versions.
    It also creates the ``events`` table in the separate events database
    located at EVENTS_DB_PATH.

    Migration statements are best-effort: an ``sqlite3.OperationalError``
    (for example "duplicate column", or DROP COLUMN on an old SQLite) is
    ignored so the function can be called repeatedly.
    """
    with get_conn() as conn:
        c = conn.cursor()

        c.execute(
            """
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT,
                role TEXT NOT NULL,
                created_at TEXT NOT NULL
            );
            """
        )

        # Add columns that older versions of the users table may lack.
        try:
            c.execute("PRAGMA table_info(users)")
            cols = {row[1] for row in c.fetchall()}
            if "password_hash" not in cols:
                c.execute("ALTER TABLE users ADD COLUMN password_hash TEXT")
            if "role" not in cols:
                c.execute("ALTER TABLE users ADD COLUMN role TEXT NOT NULL DEFAULT 'verd'")
            if "created_at" not in cols:
                c.execute("ALTER TABLE users ADD COLUMN created_at TEXT NOT NULL DEFAULT ''")
        except sqlite3.OperationalError:
            pass

        # Remove the legacy pw_hash column; fails harmlessly when it is absent.
        try:
            c.execute("ALTER TABLE users DROP COLUMN pw_hash;")
        except sqlite3.OperationalError:
            pass

        # The events table lives in its own database file. A sqlite3
        # connection used as a context manager only commits/rolls back, so
        # closing() is needed to actually release the connection.
        EVENTS_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
        with closing(sqlite3.connect(str(EVENTS_DB_PATH))) as econn, econn:
            ec = econn.cursor()
            # NOTE(review): the schema has a plain-text "password" column in
            # an event log — confirm this is intended.
            ec.execute(
                """
                CREATE TABLE IF NOT EXISTS events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    session TEXT,
                    ip TEXT,
                    user TEXT,
                    password TEXT,
                    phone TEXT,
                    action TEXT NOT NULL,
                    sha1sum TEXT,
                    visibility TEXT
                );
                """
            )
            # Older events tables were created without "visibility".
            try:
                ec.execute("ALTER TABLE events ADD COLUMN visibility TEXT")
            except sqlite3.OperationalError:
                pass

        c.execute(
            """
            CREATE TABLE IF NOT EXISTS feedback_ad (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                video_name TEXT NOT NULL,            -- nombre de carpeta dentro de videos/completed
                user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
                transcripcio INTEGER NOT NULL,       -- 1..10
                identificacio INTEGER NOT NULL,      -- 1..10
                localitzacions INTEGER NOT NULL,     -- 1..10
                activitats INTEGER NOT NULL,         -- 1..10
                narracions INTEGER NOT NULL,         -- 1..10
                expressivitat INTEGER NOT NULL,      -- 1..10
                comments TEXT,
                created_at TEXT NOT NULL
            );
            """
        )

        # Older feedback_ad tables were created without "expressivitat".
        try:
            c.execute(
                "ALTER TABLE feedback_ad ADD COLUMN expressivitat INTEGER NOT NULL DEFAULT 7;"
            )
        except sqlite3.OperationalError:
            pass
|
|
|
|
|
|
|
|
|
def add_feedback_ad(
    video_name: str,
    user_id: int,
    transcripcio: int,
    identificacio: int,
    localitzacions: int,
    activitats: int,
    narracions: int,
    expressivitat: int,
    comments: str | None,
):
    """Store one audio-description rating in ``feedback_ad`` (default database).

    The six integer arguments are the per-criterion scores; ``comments`` may
    be None. The creation timestamp is taken from now_str().
    """
    insert_sql = """INSERT INTO feedback_ad
               (video_name, user_id, transcripcio, identificacio, localitzacions, activitats, narracions, expressivitat, comments, created_at)
               VALUES (?,?,?,?,?,?,?,?,?,?)"""
    with get_conn() as conn:
        record = (
            video_name,
            user_id,
            transcripcio,
            identificacio,
            localitzacions,
            activitats,
            narracions,
            expressivitat,
            comments,
            now_str(),
        )
        conn.execute(insert_sql, record)
|
|
|
|
|
|
|
|
|
def get_feedback_ad_for_video(video_name: str):
    """Return every ``feedback_ad`` row for ``video_name``, newest first."""
    query = """SELECT * FROM feedback_ad WHERE video_name=? ORDER BY created_at DESC"""
    with get_conn() as conn:
        rows = conn.execute(query, (video_name,)).fetchall()
    return rows
|
|
|
|
|
|
|
|
|
def get_accessible_videos_for_session(session_id: str | None) -> List[str]:
|
|
|
"""Retorna els noms de vídeo accessibles per a una sessió.
|
|
|
|
|
|
Regles:
|
|
|
- Sempre inclou vídeos amb visibility='public' a videos.db.
|
|
|
- Afegeix vídeos per als quals el camp owner coincideix amb algun phone
|
|
|
registrat a events.db per a la mateixa session.
|
|
|
|
|
|
Args:
|
|
|
session_id: Identificador de sessió (st.session_state.session_id).
|
|
|
"""
|
|
|
|
|
|
|
|
|
public_videos: set[str] = set()
|
|
|
with _connect_videos_db() as vconn:
|
|
|
try:
|
|
|
for row in vconn.execute(
|
|
|
"SELECT DISTINCT video_name FROM videos WHERE visibility = 'public'"
|
|
|
):
|
|
|
public_videos.add(row["video_name"])
|
|
|
except sqlite3.OperationalError:
|
|
|
|
|
|
return []
|
|
|
|
|
|
if not session_id:
|
|
|
return sorted(public_videos)
|
|
|
|
|
|
|
|
|
phones: set[str] = set()
|
|
|
with _connect_actions_db() as aconn:
|
|
|
for row in aconn.execute(
|
|
|
"SELECT DISTINCT phone FROM actions WHERE session = ? AND phone IS NOT NULL AND phone != ''",
|
|
|
(session_id,),
|
|
|
):
|
|
|
phones.add(row["phone"])
|
|
|
|
|
|
if not phones:
|
|
|
return sorted(public_videos)
|
|
|
|
|
|
|
|
|
owner_videos: set[str] = set()
|
|
|
with _connect_videos_db() as vconn:
|
|
|
q_marks = ",".join("?" for _ in phones)
|
|
|
params: Tuple[Any, ...] = tuple(phones)
|
|
|
query = (
|
|
|
f"SELECT DISTINCT video_name FROM videos WHERE owner IN ({q_marks})"
|
|
|
)
|
|
|
for row in vconn.execute(query, params):
|
|
|
owner_videos.add(row["video_name"])
|
|
|
|
|
|
all_videos = public_videos | owner_videos
|
|
|
return sorted(all_videos)
|
|
|
|
|
|
|
|
|
def get_accessible_videos_with_sha1(session_id: str | None) -> List[Dict[str, Any]]:
|
|
|
"""Retorna vídeos accessibles amb el seu sha1sum des de demo/temp/videos.db.
|
|
|
|
|
|
Aplica les mateixes regles que get_accessible_videos_for_session, però
|
|
|
retorna diccionaris amb almenys les claus: video_name i sha1sum.
|
|
|
"""
|
|
|
|
|
|
|
|
|
public_rows: Dict[str, Dict[str, Any]] = {}
|
|
|
with _connect_videos_db() as vconn:
|
|
|
try:
|
|
|
for row in vconn.execute(
|
|
|
"SELECT video_name, sha1sum FROM videos WHERE visibility = 'public'"
|
|
|
):
|
|
|
key = row["video_name"] or row["sha1sum"]
|
|
|
public_rows[key] = {"video_name": row["video_name"], "sha1sum": row["sha1sum"]}
|
|
|
except sqlite3.OperationalError:
|
|
|
|
|
|
return []
|
|
|
|
|
|
if not session_id:
|
|
|
|
|
|
return sorted(public_rows.values(), key=lambda r: (r["video_name"] or r["sha1sum"]))
|
|
|
|
|
|
|
|
|
phones: set[str] = set()
|
|
|
with _connect_actions_db() as aconn:
|
|
|
for row in aconn.execute(
|
|
|
"SELECT DISTINCT phone FROM actions WHERE session = ? AND phone IS NOT NULL AND phone != ''",
|
|
|
(session_id,),
|
|
|
):
|
|
|
phones.add(row["phone"])
|
|
|
|
|
|
if not phones:
|
|
|
return sorted(public_rows.values(), key=lambda r: (r["video_name"] or r["sha1sum"]))
|
|
|
|
|
|
|
|
|
owner_rows: Dict[str, Dict[str, Any]] = {}
|
|
|
with _connect_videos_db() as vconn:
|
|
|
q_marks = ",".join("?" for _ in phones)
|
|
|
params: Tuple[Any, ...] = tuple(phones)
|
|
|
query = f"SELECT video_name, sha1sum FROM videos WHERE owner IN ({q_marks})"
|
|
|
for row in vconn.execute(query, params):
|
|
|
key = row["video_name"] or row["sha1sum"]
|
|
|
owner_rows[key] = {"video_name": row["video_name"], "sha1sum": row["sha1sum"]}
|
|
|
|
|
|
all_rows: Dict[str, Dict[str, Any]] = {**public_rows, **owner_rows}
|
|
|
return sorted(all_rows.values(), key=lambda r: (r["video_name"] or r["sha1sum"]))
|
|
|
|
|
|
|
|
|
def _connect_feedback_db() -> sqlite3.Connection:
|
|
|
"""Connexió directa a demo/data/feedback.db.
|
|
|
|
|
|
És independent de DEFAULT_DB_PATH perquè aquesta BD és específica de feedback
|
|
|
agregat importat des de engine.
|
|
|
"""
|
|
|
|
|
|
FEEDBACK_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
|
conn = sqlite3.connect(str(FEEDBACK_DB_PATH))
|
|
|
conn.row_factory = sqlite3.Row
|
|
|
return conn
|
|
|
|
|
|
|
|
|
def _connect_actions_db() -> sqlite3.Connection:
|
|
|
ACTIONS_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
|
conn = sqlite3.connect(str(ACTIONS_DB_PATH))
|
|
|
conn.row_factory = sqlite3.Row
|
|
|
return conn
|
|
|
|
|
|
|
|
|
def get_latest_user_phone_for_session(session_id: str) -> Tuple[str, str]:
    """Return the most recent ``(user, phone)`` logged for a session.

    Looks at the newest ``actions`` row (highest id) of the session that has
    a non-NULL user or phone.

    Args:
        session_id: Session identifier.

    Returns:
        ``(user, phone)`` as strings, with NULL values mapped to "". Returns
        ``("", "")`` when ``session_id`` is empty, no row matches, or the
        table cannot be queried.
    """
    if not session_id:
        return "", ""

    try:
        # closing(): a sqlite3 connection's own context manager never closes it.
        with closing(_connect_actions_db()) as conn:
            row = conn.execute(
                "SELECT user, phone FROM actions "
                "WHERE session = ? AND (user IS NOT NULL OR phone IS NOT NULL) "
                "ORDER BY id DESC LIMIT 1",
                (session_id,),
            ).fetchone()
    except sqlite3.OperationalError:
        return "", ""

    if not row:
        return "", ""

    user = row["user"] if row["user"] is not None else ""
    phone = row["phone"] if row["phone"] is not None else ""
    return str(user), str(phone)
|
|
|
|
|
|
|
|
|
def insert_action(
    *,
    session: str,
    user: str,
    phone: str,
    action: str,
    sha1sum: str,
    timestamp: Optional[str] = None,
) -> None:
    """Append one row to the ``actions`` table, creating the table if needed.

    Args:
        session: Session identifier (stored as "" when falsy).
        user: User name (stored as "" when falsy).
        phone: Phone number (stored as "" when falsy).
        action: Action label; stored as given.
        sha1sum: Hash of the related video (stored as "" when falsy).
        timestamp: "YYYY-MM-DD HH:MM:SS" string; defaults to the current UTC
            time.

    Database errors of type ``sqlite3.OperationalError`` are swallowed: the
    action log is best-effort and must not break the caller.
    """
    ts = timestamp or datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

    try:
        # "closing(...) as conn, conn": the inner context commits (or rolls
        # back on error) and the outer one actually closes the connection.
        with closing(_connect_actions_db()) as conn, conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS actions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    action TEXT NOT NULL,
                    session TEXT,
                    user TEXT,
                    phone TEXT,
                    sha1sum TEXT
                );
                """
            )
            conn.execute(
                """INSERT INTO actions
                   (timestamp, action, session, user, phone, sha1sum)
                   VALUES (?,?,?,?,?,?)""",
                (ts, action, session or "", user or "", phone or "", sha1sum or ""),
            )
    except sqlite3.OperationalError:
        return
|
|
|
|
|
|
|
|
|
def get_video_owner_by_sha1(sha1sum: str) -> str:
    """Return the owner (phone) stored for ``sha1sum`` in the videos table.

    Args:
        sha1sum: Hash identifying the video.

    Returns:
        The ``owner`` value as a string, or "" when ``sha1sum`` is empty, no
        row matches, the owner is NULL/empty, or the table cannot be queried.
    """
    if not sha1sum:
        return ""

    try:
        # closing(): a sqlite3 connection's own context manager never closes it.
        with closing(_connect_videos_db()) as conn:
            row = conn.execute(
                "SELECT owner FROM videos WHERE sha1sum = ? LIMIT 1",
                (sha1sum,),
            ).fetchone()
    except sqlite3.OperationalError:
        return ""

    if not row:
        return ""
    # The query selects "owner" explicitly, so the key is always present.
    return str(row["owner"] or "")
|
|
|
|
|
|
|
|
|
def is_video_input_ok(sha1sum: str) -> bool:
    """Tell whether the video with ``sha1sum`` has status 'input-OK'.

    The comparison ignores case and surrounding whitespace.

    Returns:
        True only when a row exists and its status matches; False when
        ``sha1sum`` is empty, no row matches, or the table cannot be queried.
    """
    if not sha1sum:
        return False

    try:
        # closing(): a sqlite3 connection's own context manager never closes it.
        with closing(_connect_videos_db()) as conn:
            row = conn.execute(
                "SELECT status FROM videos WHERE sha1sum = ? LIMIT 1",
                (sha1sum,),
            ).fetchone()
    except sqlite3.OperationalError:
        return False

    if not row:
        return False
    # The query selects "status" explicitly, so the key is always present.
    return str(row["status"] or "").strip().lower() == "input-ok"
|
|
|
|
|
|
|
|
|
def ensure_video_row_for_upload(
|
|
|
*,
|
|
|
sha1sum: str,
|
|
|
video_name: str,
|
|
|
owner_phone: str,
|
|
|
status: str = "input-pending",
|
|
|
visibility: str | None = None,
|
|
|
) -> None:
|
|
|
if not sha1sum:
|
|
|
return
|
|
|
|
|
|
try:
|
|
|
with _connect_videos_db() as conn:
|
|
|
cur = conn.cursor()
|
|
|
|
|
|
try:
|
|
|
cur.execute("PRAGMA table_info(videos)")
|
|
|
cols = {row[1] for row in cur.fetchall()}
|
|
|
except sqlite3.OperationalError:
|
|
|
return
|
|
|
|
|
|
alter_stmts = []
|
|
|
if "owner" not in cols:
|
|
|
alter_stmts.append("ALTER TABLE videos ADD COLUMN owner TEXT")
|
|
|
if "status" not in cols:
|
|
|
alter_stmts.append("ALTER TABLE videos ADD COLUMN status TEXT")
|
|
|
if "sha1sum" not in cols:
|
|
|
alter_stmts.append("ALTER TABLE videos ADD COLUMN sha1sum TEXT")
|
|
|
if "visibility" not in cols:
|
|
|
alter_stmts.append("ALTER TABLE videos ADD COLUMN visibility TEXT")
|
|
|
|
|
|
for stmt in alter_stmts:
|
|
|
try:
|
|
|
cur.execute(stmt)
|
|
|
except sqlite3.OperationalError:
|
|
|
continue
|
|
|
|
|
|
row = cur.execute(
|
|
|
"SELECT id FROM videos WHERE sha1sum = ? LIMIT 1",
|
|
|
(sha1sum,),
|
|
|
).fetchone()
|
|
|
if row is not None:
|
|
|
return
|
|
|
|
|
|
vis = visibility or "private"
|
|
|
cur.execute(
|
|
|
"INSERT INTO videos (video_name, owner, visibility, sha1sum, status) "
|
|
|
"VALUES (?,?,?,?,?)",
|
|
|
(video_name or sha1sum, owner_phone or "", vis, sha1sum, status),
|
|
|
)
|
|
|
except sqlite3.OperationalError:
|
|
|
return
|
|
|
|
|
|
|
|
|
def update_video_status(sha1sum: str, status: str) -> None:
    """Set the ``status`` of the video identified by ``sha1sum``.

    Missing ``status`` and ``sha1sum`` columns are added to the table first.
    If the table or the row does not exist, nothing happens.

    Args:
        sha1sum: Hash identifying the video; an empty value is a no-op.
        status: New status value.
    """
    if not sha1sum:
        return

    try:
        # "closing(...) as conn, conn": the inner context commits (or rolls
        # back on error) and the outer one actually closes the connection.
        with closing(_connect_videos_db()) as conn, conn:
            cur = conn.cursor()
            cur.execute("PRAGMA table_info(videos)")
            cols = {row[1] for row in cur.fetchall()}

            for column in ("status", "sha1sum"):
                if column in cols:
                    continue
                try:
                    cur.execute(f"ALTER TABLE videos ADD COLUMN {column} TEXT")
                except sqlite3.OperationalError:
                    # Best effort; a failure surfaces in the UPDATE below.
                    continue

            cur.execute(
                "UPDATE videos SET status = ? WHERE sha1sum = ?",
                (status, sha1sum),
            )
    except sqlite3.OperationalError:
        return
|
|
|
|
|
|
|
|
|
def get_videos_by_status(status: str) -> List[Dict[str, Any]]:
    """Return the videos that currently have the given status.

    Args:
        status: Status value to match exactly; an empty value returns [].

    Returns:
        List of dicts with the keys ``sha1sum`` and ``video_name``. Rows
        without a sha1sum are skipped, and a missing name (NULL value or no
        ``video_name`` column) falls back to the sha1sum. Returns [] when the
        table or the ``status``/``sha1sum`` columns do not exist.
    """
    if not status:
        return []

    try:
        # closing(): a sqlite3 connection's own context manager never closes it.
        with closing(_connect_videos_db()) as conn:
            cols = {row[1] for row in conn.execute("PRAGMA table_info(videos)")}
            if "status" not in cols or "sha1sum" not in cols:
                return []

            select_sql = (
                "SELECT sha1sum, video_name FROM videos WHERE status = ?"
                if "video_name" in cols
                else "SELECT sha1sum, sha1sum AS video_name FROM videos WHERE status = ?"
            )
            rows = conn.execute(select_sql, (status,)).fetchall()
    except sqlite3.OperationalError:
        return []

    results: List[Dict[str, Any]] = []
    for row in rows:
        sha1 = str(row[0]) if row[0] is not None else ""
        if not sha1:
            continue
        vname = str(row[1]) if row[1] is not None else sha1
        results.append({"sha1sum": sha1, "video_name": vname})
    return results
|
|
|
|
|
|
|
|
|
def _connect_audiodescriptions_db() -> sqlite3.Connection:
|
|
|
"""Connexió directa a demo/temp/audiodescriptions.db.
|
|
|
|
|
|
Es fa servir per llegir i actualitzar AD (une_ad, free_ad, eval, mètriques, etc.).
|
|
|
"""
|
|
|
|
|
|
AUDIODESCRIPTIONS_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
|
conn = sqlite3.connect(str(AUDIODESCRIPTIONS_DB_PATH))
|
|
|
conn.row_factory = sqlite3.Row
|
|
|
return conn
|
|
|
|
|
|
|
|
|
def get_audiodescription(sha1sum: str, version: str) -> Optional[sqlite3.Row]:
    """Return one ``audiodescriptions`` row for ``sha1sum`` and ``version``.

    Returns:
        The first matching row, or None when there is no match or the table
        does not exist (the error is swallowed silently).
    """
    try:
        # closing(): a sqlite3 connection's own context manager never closes it.
        with closing(_connect_audiodescriptions_db()) as conn:
            return conn.execute(
                "SELECT * FROM audiodescriptions WHERE sha1sum=? AND version=?",
                (sha1sum, version),
            ).fetchone()
    except sqlite3.OperationalError:
        return None
|
|
|
|
|
|
|
|
|
def get_videos_from_audiodescriptions() -> List[Dict[str, Any]]:
    """Return the videos that have an audio description and a known name.

    1) Reads the DISTINCT non-empty ``sha1sum`` values from the
       ``audiodescriptions`` table.
    2) Looks up each hash in the ``videos`` table to obtain ``video_name``.

    Hashes without a ``videos`` row, with an empty name, or whose name is
    just the hash itself are skipped (there is no fallback to the hash).

    Returns:
        List of dicts with the keys ``sha1sum`` and ``video_name``; [] when
        either table cannot be read.
    """
    try:
        # closing(): a sqlite3 connection's own context manager never closes it.
        with closing(_connect_audiodescriptions_db()) as conn:
            rows = conn.execute(
                "SELECT DISTINCT sha1sum FROM audiodescriptions"
            ).fetchall()
    except sqlite3.OperationalError:
        return []

    sha1_list: List[str] = [str(r["sha1sum"]) for r in rows if r["sha1sum"]]
    if not sha1_list:
        return []

    result: List[Dict[str, Any]] = []
    try:
        with closing(_connect_videos_db()) as vconn:
            for sha1 in sha1_list:
                try:
                    row = vconn.execute(
                        "SELECT video_name FROM videos WHERE sha1sum = ? LIMIT 1",
                        (sha1,),
                    ).fetchone()
                except sqlite3.OperationalError:
                    continue

                if row is None or not row["video_name"]:
                    continue
                vname = str(row["video_name"])
                # A name equal to the hash is a placeholder, not a real title.
                if vname == sha1:
                    continue
                result.append({"sha1sum": sha1, "video_name": vname})
    except sqlite3.OperationalError:
        return []

    return result
|
|
|
|
|
|
|
|
|
def get_audiodescription_history(sha1sum: str, version: str) -> list[sqlite3.Row]:
    """Return only the latest ``audiodescriptions`` row for sha1sum+version.

    Earlier, several rows represented the different stages (original,
    HITL OK, HITL Test). Those states now live in separate columns of a
    single record, so only the most recently stored row (highest rowid) for
    ``(sha1sum, version)`` is of interest. ``version`` is matched
    case-insensitively.

    Returns:
        A list with that single row, or [] when nothing matches or the table
        cannot be queried.
    """
    sql = (
        "SELECT * FROM audiodescriptions "
        "WHERE sha1sum = ? AND LOWER(version) = LOWER(?) "
        "ORDER BY rowid DESC "
        "LIMIT 1"
    )
    try:
        # closing(): a sqlite3 connection's own context manager never closes it.
        with closing(_connect_audiodescriptions_db()) as conn:
            row = conn.execute(sql, (sha1sum, version)).fetchone()
    except sqlite3.OperationalError:
        return []
    return [row] if row is not None else []
|
|
|
|
|
|
|
|
|
def update_audiodescription_text(
    sha1sum: str,
    version: str,
    *,
    free_ad: Optional[str] = None,
    une_ad: Optional[str] = None,
    ok_free_ad: Optional[str] = None,
    ok_une_ad: Optional[str] = None,
    test_free_ad: Optional[str] = None,
    test_une_ad: Optional[str] = None,
) -> None:
    """Update text columns of an existing audio description.

    Only the keyword arguments that are not None are written. If the record
    for ``(sha1sum, version)`` does not exist nothing is changed, and an
    ``sqlite3.OperationalError`` (missing table or column) is swallowed.
    """
    # Column names are fixed identifiers defined here, never caller input,
    # so interpolating them into the SQL text is safe.
    candidates = {
        "free_ad": free_ad,
        "une_ad": une_ad,
        "ok_free_ad": ok_free_ad,
        "ok_une_ad": ok_une_ad,
        "test_free_ad": test_free_ad,
        "test_une_ad": test_une_ad,
    }
    updates = {col: val for col, val in candidates.items() if val is not None}
    if not updates:
        return

    assignments = ", ".join(f"{col} = ?" for col in updates)
    sql = f"UPDATE audiodescriptions SET {assignments} WHERE sha1sum=? AND version=?"
    params = (*updates.values(), sha1sum, version)

    try:
        # "closing(...) as conn, conn": the inner context commits (or rolls
        # back on error) and the outer one actually closes the connection.
        # An UPDATE that matches no row is already a no-op, so no separate
        # existence check is needed.
        with closing(_connect_audiodescriptions_db()) as conn, conn:
            conn.execute(sql, params)
    except sqlite3.OperationalError:
        return
|
|
|
|
|
|
|
|
|
def update_audiodescription_info_ad(
    sha1sum: str,
    version: str,
    info_ad: str,
) -> None:
    """Update the ``info_ad`` column of an existing audio description.

    If the table or the record for ``(sha1sum, version)`` does not exist,
    nothing happens.
    """
    try:
        # "closing(...) as conn, conn": the inner context commits (or rolls
        # back on error) and the outer one actually closes the connection.
        # An UPDATE that matches no row is already a no-op, so no separate
        # existence check is needed.
        with closing(_connect_audiodescriptions_db()) as conn, conn:
            conn.execute(
                "UPDATE audiodescriptions SET info_ad = ? WHERE sha1sum=? AND version=?",
                (info_ad, sha1sum, version),
            )
    except sqlite3.OperationalError:
        return
|
|
|
|
|
|
|
|
|
def upsert_audiodescription_text(
    sha1sum: str,
    version: str,
    *,
    free_ad: Optional[str] = None,
    une_ad: Optional[str] = None,
) -> None:
    """Create or update the audio-description record for sha1sum+version.

    - If the table does not exist, it is created with a basic schema.
    - If there is no record for ``(sha1sum, version)``, one is inserted
      (missing texts are stored as "").
    - If the record exists, only the texts that are not None are updated,
      together with ``updated_at``.

    ``sqlite3.OperationalError`` is swallowed, in which case nothing is
    written.
    """
    try:
        # "closing(...) as conn, conn": the inner context commits (or rolls
        # back on error) and the outer one actually closes the connection.
        with closing(_connect_audiodescriptions_db()) as conn, conn:
            cur = conn.cursor()
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS audiodescriptions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    sha1sum TEXT NOT NULL,
                    version TEXT NOT NULL,
                    une_ad TEXT,
                    free_ad TEXT,
                    created_at TEXT,
                    updated_at TEXT
                );
                """
            )

            # SELECT 1 so the check also works on a pre-existing table that
            # was created without an "id" column.
            existing = cur.execute(
                "SELECT 1 FROM audiodescriptions WHERE sha1sum=? AND version=?",
                (sha1sum, version),
            ).fetchone()

            now = now_str()
            if existing is None:
                cur.execute(
                    """
                    INSERT INTO audiodescriptions
                        (sha1sum, version, une_ad, free_ad, created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    (sha1sum, version, une_ad or "", free_ad or "", now, now),
                )
                return

            fields: List[str] = []
            values: List[Any] = []
            if une_ad is not None:
                fields.append("une_ad = ?")
                values.append(une_ad)
            if free_ad is not None:
                fields.append("free_ad = ?")
                values.append(free_ad)
            if not fields:
                return

            fields.append("updated_at = ?")
            values.append(now)
            values.extend([sha1sum, version])

            sql = f"UPDATE audiodescriptions SET {', '.join(fields)} WHERE sha1sum=? AND version=?"
            cur.execute(sql, tuple(values))
    except sqlite3.OperationalError:
        return
|
|
|
|
|
|
|
|
|
def _connect_captions_db() -> sqlite3.Connection:
|
|
|
"""Connexió a demo/data/captions.db i creació de la taula si cal.
|
|
|
|
|
|
Estructura:
|
|
|
- variable TEXT PRIMARY KEY (p.ex. "score_1")
|
|
|
- caption TEXT (etiqueta humana)
|
|
|
"""
|
|
|
|
|
|
CAPTIONS_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
|
conn = sqlite3.connect(str(CAPTIONS_DB_PATH))
|
|
|
cur = conn.cursor()
|
|
|
cur.execute(
|
|
|
"""
|
|
|
CREATE TABLE IF NOT EXISTS captions (
|
|
|
variable TEXT PRIMARY KEY,
|
|
|
caption TEXT NOT NULL
|
|
|
);
|
|
|
"""
|
|
|
)
|
|
|
conn.commit()
|
|
|
return conn
|
|
|
|
|
|
|
|
|
def insert_demo_feedback_row(
|
|
|
*,
|
|
|
user: str,
|
|
|
session: str,
|
|
|
video_name: str,
|
|
|
version: str,
|
|
|
une_ad: str,
|
|
|
free_ad: str,
|
|
|
comments: str | None,
|
|
|
transcripcio: int,
|
|
|
identificacio: int,
|
|
|
localitzacions: int,
|
|
|
activitats: int,
|
|
|
narracions: int,
|
|
|
expressivitat: int,
|
|
|
) -> None:
|
|
|
"""Insereix una valoració detallada a demo/data/feedback.db.
|
|
|
|
|
|
Escala els sliders de 0-7 a 0-100 i desa els textos d'UNE i narració lliure.
|
|
|
Les columnes de sliders tenen per nom el caption del slider a la UI.
|
|
|
"""
|
|
|
|
|
|
|
|
|
def scale(v: int) -> int:
|
|
|
v = max(0, min(7, int(v)))
|
|
|
return int(round(v * 100.0 / 7.0))
|
|
|
|
|
|
slider_values = {
|
|
|
"Precisió Descriptiva": scale(transcripcio),
|
|
|
"Sincronització Temporal": scale(identificacio),
|
|
|
"Claredat i Concisió": scale(localitzacions),
|
|
|
"Inclusió de Diàleg": scale(activitats),
|
|
|
"Contextualització": scale(narracions),
|
|
|
"Flux i Ritme de la Narració": scale(expressivitat),
|
|
|
}
|
|
|
|
|
|
ts = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
with _connect_feedback_db() as conn:
|
|
|
conn.execute(
|
|
|
"""
|
|
|
INSERT INTO feedback (
|
|
|
timestamp, user, session, video_name, version, une_ad, free_ad, comments,
|
|
|
score_1, score_2, score_3, score_4, score_5, score_6,
|
|
|
"Precisió Descriptiva",
|
|
|
"Sincronització Temporal",
|
|
|
"Claredat i Concisió",
|
|
|
"Inclusió de Diàleg",
|
|
|
"Contextualització",
|
|
|
"Flux i Ritme de la Narració"
|
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
|
|
|
""",
|
|
|
(
|
|
|
ts,
|
|
|
user,
|
|
|
session,
|
|
|
video_name,
|
|
|
version,
|
|
|
une_ad,
|
|
|
free_ad,
|
|
|
comments or "",
|
|
|
slider_values["Precisió Descriptiva"],
|
|
|
slider_values["Sincronització Temporal"],
|
|
|
slider_values["Claredat i Concisió"],
|
|
|
slider_values["Inclusió de Diàleg"],
|
|
|
slider_values["Contextualització"],
|
|
|
slider_values["Flux i Ritme de la Narració"],
|
|
|
slider_values["Precisió Descriptiva"],
|
|
|
slider_values["Sincronització Temporal"],
|
|
|
slider_values["Claredat i Concisió"],
|
|
|
slider_values["Inclusió de Diàleg"],
|
|
|
slider_values["Contextualització"],
|
|
|
slider_values["Flux i Ritme de la Narració"],
|
|
|
),
|
|
|
)
|
|
|
|
|
|
|
|
|
def _connect_videos_db() -> sqlite3.Connection:
|
|
|
"""Connexió directa a demo/temp/videos.db.
|
|
|
|
|
|
Aquesta BD conté metadades dels vídeos (video_name, owner, visibility, sha1sum...).
|
|
|
"""
|
|
|
|
|
|
VIDEOS_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
|
conn = sqlite3.connect(str(VIDEOS_DB_PATH))
|
|
|
conn.row_factory = sqlite3.Row
|
|
|
return conn
|
|
|
|
|
|
|
|
|
def _connect_simple_mapping_db(db_path: Path, table_name: str) -> sqlite3.Connection:
|
|
|
"""Connexió a una BD simple (sha1sum, name, description) a demo/temp/db.
|
|
|
|
|
|
Es fa servir per a casting.db i scenarios.db.
|
|
|
"""
|
|
|
|
|
|
db_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
conn = sqlite3.connect(str(db_path))
|
|
|
conn.row_factory = sqlite3.Row
|
|
|
cur = conn.cursor()
|
|
|
cur.execute(
|
|
|
f"""
|
|
|
CREATE TABLE IF NOT EXISTS {table_name} (
|
|
|
sha1sum TEXT NOT NULL,
|
|
|
name TEXT NOT NULL,
|
|
|
description TEXT
|
|
|
);
|
|
|
"""
|
|
|
)
|
|
|
conn.commit()
|
|
|
return conn
|
|
|
|
|
|
|
|
|
def _connect_casting_db() -> sqlite3.Connection:
|
|
|
"""Connexió directa a demo/temp/db/casting.db (taula casting)."""
|
|
|
|
|
|
return _connect_simple_mapping_db(CASTING_DB_PATH, "casting")
|
|
|
|
|
|
|
|
|
def _connect_scenarios_db() -> sqlite3.Connection:
|
|
|
"""Connexió directa a demo/temp/db/scenarios.db (taula scenarios)."""
|
|
|
|
|
|
return _connect_simple_mapping_db(SCENARIOS_DB_PATH, "scenarios")
|
|
|
|
|
|
|
|
|
def insert_casting_row(sha1sum: str, name: str, description: str | None = None) -> None:
|
|
|
"""Insereix un personatge a casting.db per a un sha1sum donat."""
|
|
|
|
|
|
if not sha1sum or not name:
|
|
|
return
|
|
|
|
|
|
try:
|
|
|
with _connect_casting_db() as conn:
|
|
|
conn.execute(
|
|
|
"INSERT INTO casting (sha1sum, name, description) VALUES (?,?,?)",
|
|
|
(sha1sum, name, description or ""),
|
|
|
)
|
|
|
except sqlite3.OperationalError:
|
|
|
return
|
|
|
|
|
|
|
|
|
def insert_scenario_row(sha1sum: str, name: str, description: str | None = None) -> None:
|
|
|
"""Insereix un escenari a scenarios.db per a un sha1sum donat."""
|
|
|
|
|
|
if not sha1sum or not name:
|
|
|
return
|
|
|
|
|
|
try:
|
|
|
with _connect_scenarios_db() as conn:
|
|
|
conn.execute(
|
|
|
"INSERT INTO scenarios (sha1sum, name, description) VALUES (?,?,?)",
|
|
|
(sha1sum, name, description or ""),
|
|
|
)
|
|
|
except sqlite3.OperationalError:
|
|
|
return
|
|
|
|
|
|
|
|
|
def log_action(
    *,
    session: str,
    user: str,
    phone: str,
    action: str,
    sha1sum: str,
    timestamp: Optional[str] = None,
) -> None:
    """Record one entry in the ``actions`` table via insert_action().

    Falsy ``session``, ``user``, ``phone`` and ``sha1sum`` values are passed
    on as "". When ``timestamp`` is not given, the current UTC time is used
    in "YYYY-MM-DD HH:MM:SS" format.
    """
    if timestamp:
        ts = timestamp
    else:
        ts = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

    payload = {
        "session": session or "",
        "user": user or "",
        "phone": phone or "",
        "action": action,
        "sha1sum": sha1sum or "",
        "timestamp": ts,
    }
    insert_action(**payload)
|
|
|
|
|
|
|
|
|
def has_video_approval_action(sha1sum: str) -> bool:
    """Check whether an input-approval action exists for ``sha1sum``.

    Looks in the ``actions`` table for a row with action='input-OK' and the
    given sha1sum.

    Returns:
        True when such a row exists; False when ``sha1sum`` is empty, no row
        matches, or the table cannot be queried.
    """
    if not sha1sum:
        return False

    try:
        # closing(): a sqlite3 connection's own context manager never closes it.
        with closing(_connect_actions_db()) as conn:
            hit = conn.execute(
                "SELECT 1 FROM actions WHERE action = ? AND sha1sum = ? LIMIT 1",
                ("input-OK", sha1sum),
            ).fetchone()
    except sqlite3.OperationalError:
        return False
    return hit is not None
|
|
|
|
|
|
|
|
|
def get_feedback_video_stats(agg: str = "mitjana") -> List[Dict[str, Any]]:
    """Return per-video aggregated statistics from the ``feedback`` table.

    Only the numeric columns ``score_1``..``score_6`` are used.

    Args:
        agg: Aggregation mode (case-insensitive):
            - "mitjana": mean of the non-NULL scores per video.
            - "mediana": median of the non-NULL scores per video.
            - "inicial": score of the first record (by timestamp) per video.
            - "actual": score of the last record (by timestamp) per video.
            Any other value behaves like "mitjana".

    Returns:
        One dict per video, sorted by video name, with the keys
        ``video_name``, ``n`` (number of records) and ``score_1``..``score_6``
        (float or None). A score is None when the video has no non-NULL value
        for it, or when the selected first/last record holds NULL. Returns []
        when the table cannot be queried.
    """
    agg = (agg or "mitjana").lower()

    try:
        # closing(): a sqlite3 connection's own context manager never closes it.
        with closing(_connect_feedback_db()) as conn:
            rows = conn.execute(
                """
                SELECT
                    video_name,
                    timestamp,
                    score_1,
                    score_2,
                    score_3,
                    score_4,
                    score_5,
                    score_6
                FROM feedback
                """
            ).fetchall()
    except sqlite3.OperationalError:
        return []

    def parse_ts(ts: Any) -> datetime:
        # Unparseable or missing timestamps sort before everything else.
        try:
            return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
        except (TypeError, ValueError):
            return datetime.min

    def aggregate(ordered: List[Tuple[Any, List[Any]]], idx: int) -> Optional[float]:
        """Aggregate score ``idx`` over one video's timestamp-ordered records."""
        vals = [scores[idx] for _, scores in ordered if scores[idx] is not None]
        if not vals:
            return None
        if agg == "mediana":
            return float(median(vals))
        if agg in ("inicial", "actual"):
            picked = ordered[0 if agg == "inicial" else -1][1][idx]
            return float(picked) if picked is not None else None
        # "mitjana" and any unrecognised mode.
        return float(sum(vals) / len(vals))

    # Group (timestamp, scores) pairs by video, keeping insertion order.
    by_video: Dict[Any, List[Tuple[Any, List[Any]]]] = {}
    for row in rows:
        scores = [row[f"score_{i}"] for i in range(1, 7)]
        by_video.setdefault(row["video_name"], []).append((row["timestamp"], scores))

    result: List[Dict[str, Any]] = []
    for video_name, entries in by_video.items():
        # sorted() is stable, so records with equal timestamps keep DB order.
        ordered = sorted(entries, key=lambda entry: parse_ts(entry[0]))
        row_out: Dict[str, Any] = {"video_name": video_name, "n": len(entries)}
        for idx in range(6):
            row_out[f"score_{idx + 1}"] = aggregate(ordered, idx)
        result.append(row_out)

    # "or ''" keeps a NULL video_name from breaking the comparison with str.
    result.sort(key=lambda r: r["video_name"] or "")
    return result
|
|
|
|
|
|
|
|
|
def _init_captions_from_eval() -> None:
    """Rebuild the ``captions`` table using labels taken from an eval.csv.

    Reads ``data/media/parella/MoE/eval.csv`` located next to this module and
    takes the first six non-empty values of its ``Caracteristica`` column
    (stripped of whitespace and surrounding double quotes) as the captions
    for ``score_1`` .. ``score_6``. Positions for which no label was found
    fall back to the variable name itself (``score_N``).

    If the file is missing, unreadable, not valid UTF-8 or malformed CSV, all
    six captions fall back to their defaults. The table is emptied before
    the six rows are written.
    """
    import csv

    eval_path = (
        Path(__file__).resolve().parent
        / "data" / "media" / "parella" / "MoE" / "eval.csv"
    )
    default_labels = [f"score_{i}" for i in range(1, 7)]

    found: List[str] = []
    try:
        with eval_path.open("r", encoding="utf-8") as f:
            for row in csv.DictReader(f):
                name = (row.get("Caracteristica") or "").strip().strip('"')
                if name:
                    found.append(name)
                    if len(found) == 6:
                        break
    except (OSError, UnicodeError, csv.Error):
        # Best effort: a missing or unreadable file just means default labels.
        found = []

    # Pad with the default name for every position the CSV did not provide.
    labels = found + default_labels[len(found):]

    with _connect_captions_db() as conn:
        cur = conn.cursor()
        cur.execute("DELETE FROM captions")
        cur.executemany(
            "INSERT OR REPLACE INTO captions (variable, caption) VALUES (?, ?)",
            list(zip(default_labels, labels)),
        )
|
|
|
|
|
|
|
|
|
def get_feedback_score_labels() -> List[str]:
    """Return the human-readable labels for ``score_1`` .. ``score_6``.

    Labels are read from the ``captions`` table. When the table is empty it
    is first initialised via ``_init_captions_from_eval()`` and queried
    again.

    Each caption is matched to its score by the ``variable`` column rather
    than by row position, so a missing or unexpected row cannot shift labels
    onto the wrong score. A score without a row, or with a NULL caption, is
    labelled with its own variable name.

    Returns:
        Exactly six labels, in ``score_1`` .. ``score_6`` order.
    """
    default_labels = [f"score_{i}" for i in range(1, 7)]
    query = "SELECT variable, caption FROM captions ORDER BY variable"

    with _connect_captions_db() as conn:
        cur = conn.cursor()
        cur.execute(query)
        rows = cur.fetchall()

        if not rows:
            _init_captions_from_eval()
            cur.execute(query)
            rows = cur.fetchall()

    captions = {variable: caption for variable, caption in rows}

    labels: List[str] = []
    for name in default_labels:
        caption = captions.get(name)
        labels.append(name if caption is None else caption)
    return labels
|
|
|
|
|
|
|
|
|
def get_feedback_ad_stats():
    """Return per-video averages of the ``feedback_ad`` ratings.

    Queries the default database and yields one row per ``video_name`` with
    the number of entries (``n``), the average of each rated column
    (``avg_transcripcio``, ``avg_identificacio``, ``avg_localitzacions``,
    ``avg_activitats``, ``avg_narracions``, ``avg_expressivitat``) and
    ``avg_global``, the mean of those six averages. Rows are ordered by
    ``avg_global`` descending, then by ``n`` descending.
    """
    stats_sql = """
        SELECT
        video_name,
        COUNT(*) AS n,
        AVG(transcripcio) AS avg_transcripcio,
        AVG(identificacio) AS avg_identificacio,
        AVG(localitzacions) AS avg_localitzacions,
        AVG(activitats) AS avg_activitats,
        AVG(narracions) AS avg_narracions,
        AVG(expressivitat) AS avg_expressivitat,
        (AVG(transcripcio)+AVG(identificacio)+AVG(localitzacions)+AVG(activitats)+AVG(narracions)+AVG(expressivitat))/6.0 AS avg_global
        FROM feedback_ad
        GROUP BY video_name
        ORDER BY avg_global DESC, n DESC;
        """
    with get_conn() as conn:
        stats_rows = conn.execute(stats_sql).fetchall()
        return stats_rows
|
|
|
|
|
|
|
|
|
def now_str() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``.

    The value has second precision and a literal ``Z`` suffix. It is built
    from a timezone-aware "now" because ``datetime.utcnow()`` is deprecated
    since Python 3.12.
    """
    from datetime import timezone

    utc_now = datetime.now(timezone.utc)
    # Drop tzinfo so isoformat() does not append "+00:00" ahead of the "Z".
    return utc_now.replace(tzinfo=None).isoformat(timespec="seconds") + "Z"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_user(username: str, password_hash: str, role: str):
    """Insert a new row into ``users`` in the default database.

    The caller supplies the already-hashed password; ``created_at`` is filled
    with the value returned by ``now_str()``.
    """
    insert_sql = (
        "INSERT INTO users(username, password_hash, role, created_at) VALUES (?,?,?,?)"
    )
    with get_conn() as conn:
        new_row = (username, password_hash, role, now_str())
        conn.execute(insert_sql, new_row)
|
|
|
|
|
|
|
|
|
def get_user(username: str):
    """Fetch the ``users`` row whose ``username`` matches exactly.

    Returns the first matching row with all of its columns, or ``None`` when
    no such user exists.
    """
    lookup_sql = "SELECT * FROM users WHERE username=?"
    with get_conn() as conn:
        user_row = conn.execute(lookup_sql, (username,)).fetchone()
        return user_row
|
|
|
|
|
|
|
|
|
def get_all_users() -> List[Dict[str, Any]]:
    """List every user's ``id``, ``username`` and ``role``, ordered by username.

    NOTE(review): the rows are returned exactly as fetched from the
    connection (``get_conn`` installs ``sqlite3.Row`` as row factory), so
    they are row objects rather than plain dicts despite the annotation.
    """
    listing_sql = "SELECT id, username, role FROM users ORDER BY username"
    with get_conn() as conn:
        user_rows = conn.execute(listing_sql).fetchall()
        return user_rows
|
|
|
|
|
|
|
|
|
def update_user_password(username: str, password_hash: str):
    """Replace the stored ``password_hash`` of the user named ``username``.

    Issues a single UPDATE against ``users`` in the default database; when
    no row matches the username nothing is changed.
    """
    update_sql = "UPDATE users SET password_hash = ? WHERE username = ?"
    with get_conn() as conn:
        conn.execute(update_sql, (password_hash, username))
|
|
|
|