"""Build the casting and scenarios SQLite databases from per-video CSV files.

For every sub-folder of ``VIDEOS_DIR`` that contains both the requested CSV
file and at least one ``.mp4``, each data row of the CSV is stored together
with the SHA-1 digest of the video.
"""

import csv
import hashlib
import sqlite3
from contextlib import closing
from pathlib import Path
from typing import Iterable, Optional, Tuple

BASE_DIR = Path(__file__).resolve().parent.parent
DATA_DIR = BASE_DIR / "data"
VIDEOS_DIR = DATA_DIR / "videos"

CASTING_DB_PATH = DATA_DIR / "casting.db"
SCENARIOS_DB_PATH = DATA_DIR / "scenarios.db"


def sha1_of_file(path: Path, buf_size: int = 65536) -> str:
    """Return the hexadecimal SHA-1 digest of the file at *path*.

    The file is read in binary mode in chunks of *buf_size* bytes, so it is
    never loaded into memory as a whole.
    """
    digest = hashlib.sha1()
    with path.open("rb") as f:
        while chunk := f.read(buf_size):
            digest.update(chunk)
    return digest.hexdigest()


def ensure_db(db_path: Path, table_name: str) -> sqlite3.Connection:
    """Create (or recreate) a database holding one ``(sha1sum, name, description)`` table.

    Any existing table called *table_name* is dropped first, so previously
    stored rows are lost. The parent directory of *db_path* is created when
    missing.

    *table_name* is interpolated directly into the SQL text, so it must be a
    trusted identifier.

    Returns:
        An open connection; the caller is responsible for closing it.
    """
    db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(db_path))
    try:
        cur = conn.cursor()
        cur.execute(f"DROP TABLE IF EXISTS {table_name};")
        cur.execute(
            f"""
            CREATE TABLE {table_name} (
                sha1sum TEXT NOT NULL,
                name TEXT NOT NULL,
                description TEXT
            );
            """
        )
        conn.commit()
    except Exception:
        # Do not leak the connection when the schema cannot be created.
        conn.close()
        raise
    return conn


def iter_rows_from_csv(csv_path: Path) -> Iterable[Tuple[str, str]]:
    """Return the ``(name, description)`` pairs read from a simple CSV file.

    - Columns 0 and 1 of every non-empty row are used, stripped of
      surrounding whitespace.
    - The first row is **always** skipped as a header, whatever it contains,
      so that records such as "character name" / "personality" never end up
      in the database.
    - Rows that are blank or have fewer than two columns are ignored.

    A missing file yields an empty list.
    """
    if not csv_path.exists():
        return []

    rows: list[Tuple[str, str]] = []
    # newline="" lets the csv module do its own newline handling, which keeps
    # line breaks embedded in quoted fields intact.
    with csv_path.open("r", encoding="utf-8", newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # Always skip the first row (header).
        for row in reader:
            if not row or all(not cell.strip() for cell in row):
                continue
            if len(row) < 2:
                continue
            rows.append((row[0].strip(), row[1].strip()))
    return rows


def populate_single_table(
    *,
    conn: sqlite3.Connection,
    table_name: str,
    csv_filename: str,
    videos_dir: Optional[Path] = None,
) -> int:
    """Fill one table (casting or scenarios) from the per-video CSV files.

    Walks the immediate sub-folders of *videos_dir* in sorted order. A folder
    is processed only when it contains both *csv_filename* and at least one
    ``.mp4``; the SHA-1 of the first ``.mp4`` (in sorted order) is computed
    and one record is inserted per data row of the CSV. All inserts are
    committed once at the end.

    Args:
        conn: Open connection to the database that holds *table_name*.
        table_name: Target table; interpolated into the SQL text, so it must
            be a trusted identifier.
        csv_filename: Name of the CSV file looked up inside each video folder.
        videos_dir: Folder containing one sub-folder per video. Defaults to
            the module-level ``VIDEOS_DIR``.

    Returns:
        The number of rows inserted (0 when *videos_dir* does not exist).
    """
    if videos_dir is None:
        videos_dir = VIDEOS_DIR

    if not videos_dir.exists():
        print(f"[INFO] No existeix {videos_dir}, no es pot omplir {table_name}.")
        return 0

    cur = conn.cursor()
    insert_sql = (
        f"INSERT INTO {table_name} (sha1sum, name, description) VALUES (?, ?, ?);"
    )
    inserted = 0

    for video_folder in sorted(p for p in videos_dir.iterdir() if p.is_dir()):
        csv_path = video_folder / csv_filename
        mp4s = sorted(video_folder.glob("*.mp4"))
        if not csv_path.exists() or not mp4s:
            continue

        rows = list(iter_rows_from_csv(csv_path))
        if not rows:
            # Nothing to insert: avoid hashing a potentially large video.
            continue

        # A single MP4 per folder is assumed; the first one is used.
        sha1 = sha1_of_file(mp4s[0])
        cur.executemany(
            insert_sql,
            [(sha1, name, description) for name, description in rows],
        )
        inserted += len(rows)

    conn.commit()
    return inserted


def main() -> None:
    """Recreate both databases and fill them from the per-video CSV files."""
    # closing() guarantees that each connection is closed even when creating
    # the second database or populating a table fails.
    with closing(ensure_db(CASTING_DB_PATH, "casting")) as casting_conn:
        with closing(ensure_db(SCENARIOS_DB_PATH, "scenarios")) as scenarios_conn:
            n_casting = populate_single_table(
                conn=casting_conn,
                table_name="casting",
                csv_filename="casting.csv",
            )
            print(f"[INFO] Registres inserits a casting.db: {n_casting}")

            n_scen = populate_single_table(
                conn=scenarios_conn,
                table_name="scenarios",
                csv_filename="scenarios.csv",
            )
            print(f"[INFO] Registres inserits a scenarios.db: {n_scen}")


if __name__ == "__main__":
    main()