|
|
from __future__ import annotations
|
|
|
|
|
|
from pathlib import Path
|
|
|
import sqlite3
|
|
|
from typing import Iterable, Optional
|
|
|
|
|
|
import json
|
|
|
|
|
|
|
|
|
# Project root: the directory two levels above this file.
BASE_DEMO = Path(__file__).resolve().parent.parent

# Directory holding one sub-folder per video, named after its sha1sum.
MEDIA_ROOT = BASE_DEMO.joinpath("data", "media")

# Every audiodescriptions.db copy this script updates, in processing order.
DB_PATHS = [
    BASE_DEMO.joinpath(folder, "audiodescriptions.db")
    for folder in ("data", "temp")
]
|
|
|
|
|
|
|
|
|
def read_preprocess_json(
    sha1sum: str, *, media_root: Optional[Path] = None
) -> Optional[str]:
    """Return the text of ``preprocess.json`` for the given ``sha1sum``.

    The file is looked up at ``<media_root>/<sha1sum>/preprocess.json``.

    Args:
        sha1sum: Name of the per-video directory under the media root.
        media_root: Directory containing the per-video folders. Defaults to
            the module-level ``MEDIA_ROOT``.

    Returns:
        The file content decoded as UTF-8, or ``None`` when ``sha1sum`` is
        empty or not a string, or when the file is missing or unreadable.
        If the file is not valid UTF-8, it is still decoded as UTF-8 with
        the undecodable bytes dropped.
    """
    # A NULL/empty identifier cannot name a video folder; without this guard
    # an empty string would silently resolve to <media_root>/preprocess.json.
    if not isinstance(sha1sum, str) or not sha1sum:
        return None

    root = MEDIA_ROOT if media_root is None else media_root
    path = Path(root) / sha1sum / "preprocess.json"

    # First attempt is strict UTF-8; the fallback stays in UTF-8 (instead of
    # the platform default encoding) and only drops the invalid bytes.
    for errors in ("strict", "ignore"):
        try:
            return path.read_text(encoding="utf-8", errors=errors)
        except UnicodeDecodeError:
            continue
        except (OSError, ValueError):
            # Missing file, directory in its place, permission problem, or a
            # path the OS rejects: best-effort contract says "no content".
            return None
    return None
|
|
|
|
|
|
|
|
|
def ensure_json_ad_column(conn: sqlite3.Connection) -> None:
    """Add the ``info_ad`` TEXT column to ``audiodescriptions`` if missing.

    The column was formerly called ``json_ad``; the function keeps its old
    name for compatibility. A commit is issued only when the column is
    actually added.
    """
    target_col = "info_ad"

    cursor = conn.cursor()
    cursor.execute("PRAGMA table_info(audiodescriptions);")
    # Index 1 of each row returned by PRAGMA table_info is the column name.
    existing_columns = {info[1] for info in cursor.fetchall()}

    if target_col in existing_columns:
        return

    cursor.execute(f"ALTER TABLE audiodescriptions ADD COLUMN {target_col} TEXT;")
    conn.commit()
|
|
|
|
|
|
|
|
|
def update_db(path: Path) -> None:
    """Fill the ``info_ad`` column of one ``audiodescriptions.db`` file.

    - If ``path`` does not exist, print a notice and return.
    - Make sure the ``info_ad`` column exists (``ensure_json_ad_column``).
    - For every distinct ``sha1sum`` in the ``audiodescriptions`` table, read
      the matching ``preprocess.json`` through ``read_preprocess_json`` and
      store its text in ``info_ad`` for all rows with that ``sha1sum``.
    - Rows whose ``sha1sum`` is NULL, empty or not a string are skipped, as
      are those whose ``preprocess.json`` cannot be read.

    The row updates are committed once, after the loop. The connection is
    always closed, even when an error escapes.

    Args:
        path: Location of the SQLite database to update.
    """
    if not path.exists():
        print(f"[INFO] {path} no existe, se omite.")
        return

    print(f"[INFO] Actualitzant {path} ...")

    conn = sqlite3.connect(str(path))
    conn.row_factory = sqlite3.Row
    try:
        ensure_json_ad_column(conn)
        cur = conn.cursor()

        cur.execute("SELECT DISTINCT sha1sum FROM audiodescriptions;")
        rows = cur.fetchall()
        total = len(rows)
        print(f" - {total} sha1sum diferents trobats.")

        updated_rows = 0
        for idx, row in enumerate(rows, start=1):
            sha1sum = row["sha1sum"]

            # A NULL/empty key cannot name a media folder and would never
            # match "sha1sum = ?" anyway; skip it instead of letting the
            # lookup raise and abort the whole (uncommitted) run.
            if not isinstance(sha1sum, str) or not sha1sum:
                print(f" [{idx}/{total}] sha1sum buit o invàlid ({sha1sum!r}), s'omet.")
                continue

            json_text = read_preprocess_json(sha1sum)
            if json_text is None:
                print(f" [{idx}/{total}] {sha1sum}: preprocess.json no trobat, s'omet.")
                continue

            cur.execute(
                "UPDATE audiodescriptions SET info_ad = ? WHERE sha1sum = ?;",
                (json_text, sha1sum),
            )
            affected = cur.rowcount
            updated_rows += affected
            print(f" [{idx}/{total}] {sha1sum}: info_ad actualitzat per {affected} files.")

        conn.commit()
        print(f"[OK] {path}: {updated_rows} files actualitzades amb info_ad.")
    finally:
        conn.close()
|
|
|
|
|
|
|
|
|
def main() -> None:
    """Update every database listed in ``DB_PATHS``.

    Prints the media root and whether it exists, exits with an error message
    when it is missing, and otherwise runs ``update_db`` on each configured
    database path in order.
    """
    media_available = MEDIA_ROOT.exists()
    print(f"MEDIA_ROOT: {MEDIA_ROOT} (exists={media_available})")

    # Without the media folder there is nothing to read, so stop early.
    if not media_available:
        raise SystemExit("❌ No s'ha trobat demo/data/media")

    for db_path in DB_PATHS:
        update_db(db_path)


# Run the update only when the file is executed as a script.
if __name__ == "__main__":
    main()
|
|
|
|