"""Backfill the ``info_ad`` column of the demo ``audiodescriptions.db`` files.

For every distinct ``sha1sum`` stored in the ``audiodescriptions`` table, the
raw text of ``<media root>/<sha1sum>/preprocess.json`` is copied into the
``info_ad`` column of all rows that share that ``sha1sum``.
"""

from __future__ import annotations

from contextlib import closing
import json
from pathlib import Path
import sqlite3
from typing import Iterable, Optional

# Root of the demo tree: two directory levels above this file.
BASE_DEMO = Path(__file__).resolve().parent.parent
# Default directory holding one sub-directory per video, named by its sha1sum.
MEDIA_ROOT = BASE_DEMO / "data" / "media"
# Databases that main() tries to update; missing ones are skipped.
DB_PATHS = [
    BASE_DEMO / "data" / "audiodescriptions.db",
    BASE_DEMO / "temp" / "audiodescriptions.db",
]


def read_preprocess_json(
    sha1sum: str, media_root: Optional[Path] = None
) -> Optional[str]:
    """Return the text of ``preprocess.json`` for the given ``sha1sum``.

    The file is returned verbatim as text (it is not parsed), so it can be
    stored as-is in a TEXT column.

    Args:
        sha1sum: Name of the video directory under the media root. ``None``
            or an empty string (e.g. a NULL database value) yields ``None``.
        media_root: Directory containing the per-video folders. Defaults to
            the module-level ``MEDIA_ROOT``.

    Returns:
        The file content decoded as UTF-8 (undecodable bytes are dropped as a
        last resort), or ``None`` if the file is missing or cannot be read.
    """
    # A NULL/empty key must not be joined to the root: ``root / None`` raises
    # TypeError and ``root / ""`` would silently point at the root itself.
    if not isinstance(sha1sum, str) or not sha1sum:
        return None

    root = MEDIA_ROOT if media_root is None else Path(media_root)
    path = root / sha1sum / "preprocess.json"

    try:
        return path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        # Not valid UTF-8: fall through to the lenient read below.
        pass
    except (OSError, ValueError):
        # Missing file, directory, permission problem or an invalid path.
        return None

    try:
        # Keep UTF-8 explicitly so the result does not depend on the locale.
        return path.read_text(encoding="utf-8", errors="ignore")
    except (OSError, ValueError):
        return None


def ensure_json_ad_column(conn: sqlite3.Connection) -> None:
    """Add the ``info_ad`` column to ``audiodescriptions`` if it is missing.

    The column used to be called ``json_ad``; the function name is kept for
    backward compatibility.

    Args:
        conn: Open connection to a database containing ``audiodescriptions``.

    Raises:
        sqlite3.OperationalError: If the ``audiodescriptions`` table does not
            exist (the ``ALTER TABLE`` statement fails).
    """
    target_col = "info_ad"
    # Index 1 of each PRAGMA table_info row is the column name. SQLite column
    # names are case-insensitive, so compare them lower-cased.
    existing = {
        str(row[1]).lower()
        for row in conn.execute("PRAGMA table_info(audiodescriptions);")
    }
    if target_col in existing:
        return

    conn.execute(f"ALTER TABLE audiodescriptions ADD COLUMN {target_col} TEXT;")
    conn.commit()


def update_db(path: Path, media_root: Optional[Path] = None) -> None:
    """Fill ``info_ad`` in one ``audiodescriptions.db`` database.

    - Ensures the ``info_ad`` column exists.
    - For each distinct ``sha1sum`` in the table, reads
      ``<media_root>/<sha1sum>/preprocess.json`` and stores its content in
      ``info_ad`` for every row with that ``sha1sum``. Entries without a
      readable file are reported and left untouched.

    Args:
        path: Location of the SQLite database. A missing file is reported and
            skipped.
        media_root: Directory containing the per-video folders. Defaults to
            the module-level ``MEDIA_ROOT``.
    """
    path = Path(path)
    if not path.exists():
        print(f"[INFO] {path} no existe, se omite.")
        return

    print(f"[INFO] Actualitzant {path} ...")

    # closing() guarantees the connection is released even if a step fails;
    # in that case the pending UPDATEs are never committed.
    with closing(sqlite3.connect(str(path))) as conn:
        conn.row_factory = sqlite3.Row
        ensure_json_ad_column(conn)

        cur = conn.cursor()
        # Fetch every distinct sha1sum present in the table.
        cur.execute("SELECT DISTINCT sha1sum FROM audiodescriptions;")
        rows = cur.fetchall()
        total = len(rows)
        print(f" - {total} sha1sum diferents trobats.")

        updated_rows = 0
        for idx, row in enumerate(rows, start=1):
            sha1sum = row["sha1sum"]
            json_text = read_preprocess_json(sha1sum, media_root)
            if json_text is None:
                # No preprocess.json for this sha1sum: leave info_ad as it is.
                print(f" [{idx}/{total}] {sha1sum}: preprocess.json no trobat, s'omet.")
                continue

            cur.execute(
                "UPDATE audiodescriptions SET info_ad = ? WHERE sha1sum = ?;",
                (json_text, sha1sum),
            )
            updated_rows += cur.rowcount
            print(f" [{idx}/{total}] {sha1sum}: json_ad actualitzat per {cur.rowcount} files.")

        conn.commit()
        print(f"[OK] {path}: {updated_rows} files actualitzades amb info_ad.")


def main() -> None:
    """Update every database in ``DB_PATHS`` using the default media root.

    Raises:
        SystemExit: If ``MEDIA_ROOT`` does not exist.
    """
    print(f"MEDIA_ROOT: {MEDIA_ROOT} (exists={MEDIA_ROOT.exists()})")
    if not MEDIA_ROOT.exists():
        raise SystemExit("❌ No s'ha trobat demo/data/media")

    for db_path in DB_PATHS:
        update_db(db_path)


if __name__ == "__main__":
    main()