import os import shutil import zipfile import io from pathlib import Path from typing import Optional def _load_data_origin(base_dir: Path) -> str: """Lee demo/config.yaml y devuelve data_origin (internal|external).""" cfg_path = base_dir / "config.yaml" origin = "internal" try: import yaml with cfg_path.open("r", encoding="utf-8") as f: cfg = yaml.safe_load(f) or {} origin = str(cfg.get("data_origin", "internal")).lower() except Exception: origin = "internal" return origin def _load_compliance_flags(base_dir: Path) -> dict: """Llegeix flags de la secció 'compliance' de demo/config.yaml. Retorna un dict amb claus: - private_blockchain_enable - public_blockchain_enable - monthly_digest_enabled """ cfg_path = base_dir / "config.yaml" flags = { "private_blockchain_enable": False, "public_blockchain_enable": False, "monthly_digest_enabled": False, } try: import yaml with cfg_path.open("r", encoding="utf-8") as f: cfg = yaml.safe_load(f) or {} comp = cfg.get("compliance", {}) or {} flags["private_blockchain_enable"] = bool(comp.get("private_blockchain_enable", False)) flags["public_blockchain_enable"] = bool(comp.get("public_blockchain_enable", False)) flags["monthly_digest_enabled"] = bool(comp.get("monthly_digest_enabled", False)) except Exception: # Manté valors per defecte en cas d'error return flags return flags def ensure_temp_databases(base_dir: Path, api_client) -> None: """Garantiza que las BDs *.db estén presentes en demo/temp antes del login. - data_origin == "internal": copia demo/data/*.db -> demo/temp/*.db - data_origin == "external": llama al endpoint remoto import_databases. """ data_origin = _load_data_origin(base_dir) compliance_flags = _load_compliance_flags(base_dir) public_blockchain_enable = bool(compliance_flags.get("public_blockchain_enable", False)) temp_dir = base_dir / "temp" temp_dir.mkdir(parents=True, exist_ok=True) if data_origin == "internal": source_dir = base_dir / "data" if source_dir.exists(): for entry in source_dir.glob("*.db"): dest = temp_dir / entry.name shutil.copy2(entry, dest) else: if api_client is None: return try: api_client.import_databases() except Exception: return def _extract_zip_bytes(zip_bytes: bytes, target_dir: Path) -> None: target_dir.mkdir(parents=True, exist_ok=True) with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: zf.extractall(target_dir) def ensure_media_for_video( base_dir: Path, api_client, sha1sum: str, version: Optional[str] = None, ) -> None: """Garantiza que exista demo/temp/media/[/]. - data_origin == "internal": * copia video.mp4 desde demo/data/media// a demo/temp/media// * si version, copia carpeta demo/data/media// -> demo/temp/media// - data_origin == "external": * usa endpoints del engine para obtener zips de media y descomprimirlos en demo/temp/media. """ data_origin = _load_data_origin(base_dir) temp_media_root = base_dir / "temp" / "media" data_media_root = base_dir / "data" / "media" video_dir_temp = temp_media_root / sha1sum if not video_dir_temp.exists(): video_dir_temp.mkdir(parents=True, exist_ok=True) if data_origin == "internal": src_video = data_media_root / sha1sum / "video.mp4" if src_video.exists(): shutil.copy2(src_video, video_dir_temp / "video.mp4") else: if api_client is not None: try: resp = api_client.import_media(sha1sum) zip_bytes = resp.get("zip_bytes") if isinstance(resp, dict) else None if zip_bytes: _extract_zip_bytes(zip_bytes, video_dir_temp) except Exception: pass if not version: return version_dir_temp = video_dir_temp / version if version_dir_temp.exists(): return if data_origin == "internal": src_version_dir = data_media_root / sha1sum / version if src_version_dir.exists(): shutil.copytree(src_version_dir, version_dir_temp, dirs_exist_ok=True) else: if api_client is None: return try: resp = api_client.import_media_version(sha1sum, version) zip_bytes = resp.get("zip_bytes") if isinstance(resp, dict) else None if zip_bytes: _extract_zip_bytes(zip_bytes, version_dir_temp) except Exception: return def confirm_changes_and_logout(base_dir: Path, api_client, session_id: str) -> None: """Confirma canvis de la sessió actual i els persisteix. 1) Per a totes les taules de demo/temp/*.db que tinguin camp 'session' amb valor igual a session_id, es generen INSERTs per aplicar els canvis a demo/data/*.db (mode internal) o s'envien a l'endpoint update_databases (mode external). 2) Per als nous vídeos registrats a demo/temp/videos.db amb session=session_id, es miren les carpetes demo/temp/media/ corresponents. - internal: es copien a demo/data/media/. - external: es crea un ZIP amb totes aquestes carpetes i s'envia a l'endpoint export_media del backend. """ if not session_id: return data_origin = _load_data_origin(base_dir) temp_dir = base_dir / "temp" data_dir = base_dir / "data" # --- 1) Sincronitzar taules amb camp 'session' --- sql_statements: list[str] = [] for db_path in temp_dir.glob("*.db"): target_db = data_dir / db_path.name import sqlite3 with sqlite3.connect(str(db_path)) as src_conn: src_conn.row_factory = sqlite3.Row cur = src_conn.cursor() # Llistar taules usuari cur.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'" ) tables = [r[0] for r in cur.fetchall()] for table in tables: # Mirar si la taula té columna 'session' cur.execute(f"PRAGMA table_info({table})") cols = cur.fetchall() col_names = [c[1] for c in cols] if "session" not in col_names: continue # Files per a la sessió actual cur.execute(f"SELECT * FROM {table} WHERE session = ?", (session_id,)) rows = cur.fetchall() if not rows: continue columns_sql = ", ".join(col_names) placeholders = ", ".join(["?"] * len(col_names)) insert_sql = f"INSERT OR REPLACE INTO {table} ({columns_sql}) VALUES ({placeholders})" for row in rows: values = [row[name] for name in col_names] if data_origin == "internal": # Aplicar directament a demo/data/*.db target_db.parent.mkdir(parents=True, exist_ok=True) with sqlite3.connect(str(target_db)) as dst_conn: dst_conn.execute(insert_sql, values) dst_conn.commit() else: # Guardar la sentència + valors com a SQL literal simple # (assumim que el backend farà el parse o executarà directament). # Per simplicitat, fem una interpolació segura bàsica. def _sql_literal(v): if v is None: return "NULL" if isinstance(v, (int, float)): return str(v) return "'" + str(v).replace("'", "''") + "'" values_sql = ", ".join(_sql_literal(v) for v in values) full_sql = f"INSERT OR REPLACE INTO {table} ({columns_sql}) VALUES ({values_sql});" sql_statements.append(full_sql) if data_origin != "internal" and sql_statements and api_client is not None: try: api_client.update_databases({"statements": sql_statements}) except Exception: pass # --- 2) Digest d'esdeveniments per a la sessió (public blockchain) --- if public_blockchain_enable: events_db = temp_dir / "events.db" try: import sqlite3 import hashlib import json with sqlite3.connect(str(events_db)) as econn: econn.row_factory = sqlite3.Row cur = econn.cursor() # Comprovar que existeix la taula events cur.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='events'" ) if cur.fetchone(): cur.execute( "SELECT * FROM events WHERE session = ?", (session_id,) ) rows = cur.fetchall() if rows: payload = [] for r in rows: payload.append({k: r[k] for k in r.keys()}) serialized = json.dumps( payload, sort_keys=True, separators=(",", ":") ) digest_hash = hashlib.sha256( serialized.encode("utf-8") ).hexdigest() # Simular publicació a Polygon: log amb prefix clar print( f"[POLYGON EVENTS DIGEST] session={session_id} " f"events={len(payload)} hash={digest_hash}" ) except Exception: # No aturar el procés si hi ha errors en el càlcul del digest pass # --- 3) Nous vídeos a videos.db associats a la sessió --- videos_db = temp_dir / "videos.db" new_sha1s: set[str] = set() try: import sqlite3 with sqlite3.connect(str(videos_db)) as vconn: vconn.row_factory = sqlite3.Row cur = vconn.cursor() # Només si existeix columna 'session' i 'sha1sum' cur.execute("PRAGMA table_info(videos)") cols = cur.fetchall() col_names = [c[1] for c in cols] if "session" in col_names and "sha1sum" in col_names: cur.execute( "SELECT DISTINCT sha1sum FROM videos WHERE session = ?", (session_id,) ) for r in cur.fetchall(): if r["sha1sum"]: new_sha1s.add(str(r["sha1sum"])) except Exception: new_sha1s = set() if not new_sha1s: return temp_media_root = temp_dir / "media" if data_origin == "internal": # Copiar carpetes de media noves a demo/data/media data_media_root = data_dir / "media" for sha1 in new_sha1s: src_dir = temp_media_root / sha1 dst_dir = data_media_root / sha1 if src_dir.exists(): shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True) else: # Crear un ZIP amb totes les carpetes de sha1sum i enviar-lo a export_media if api_client is None: return import tempfile try: with tempfile.TemporaryDirectory() as tmpdir: zip_path = Path(tmpdir) / "media_export.zip" with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: for sha1 in new_sha1s: src_dir = temp_media_root / sha1 if not src_dir.exists(): continue for root, _, files in os.walk(src_dir): root_path = Path(root) for fname in files: fpath = root_path / fname rel_path = fpath.relative_to(temp_media_root) zf.write(fpath, arcname=str(rel_path)) zip_bytes = zip_path.read_bytes() api_client.export_media(zip_bytes) except Exception: return