|
|
import os
|
|
|
import shutil
|
|
|
import zipfile
|
|
|
import io
|
|
|
import runpy
|
|
|
from pathlib import Path
|
|
|
from typing import Optional
|
|
|
|
|
|
from compliance_client import compliance_client
|
|
|
|
|
|
def _load_data_origin(base_dir: Path) -> str:
    """Return the ``app.data_origin`` setting read from ``<base_dir>/config.yaml``.

    The configured value is stripped and lower-cased before being returned.
    ``"internal"`` is returned whenever the setting cannot be determined:
    the file is missing or unreadable, PyYAML cannot be imported, the document
    does not have the expected mapping shape, or the setting is absent, null
    or blank.

    Args:
        base_dir: Directory that contains ``config.yaml``.

    Returns:
        The normalised data origin string (callers compare it to ``"internal"``).
    """
    default = "internal"
    try:
        # Imported lazily so a missing PyYAML degrades to the default.
        import yaml

        with (base_dir / "config.yaml").open("r", encoding="utf-8") as fh:
            cfg = yaml.safe_load(fh) or {}
        app_cfg = cfg.get("app", {}) or {}
        raw = app_cfg.get("data_origin")
    except Exception:
        # Best effort: any configuration problem means "use the bundled data".
        return default

    if raw is None:
        # ``data_origin:`` with no value used to become the string "none".
        return default
    origin = str(raw).strip().lower()
    return origin or default
|
|
|
|
|
|
|
|
|
def _load_compliance_flags(base_dir: Path) -> dict:
    """Read the boolean switches of the ``compliance`` section of ``config.yaml``.

    Args:
        base_dir: Directory that contains ``config.yaml``.

    Returns:
        A dict with exactly these keys, each mapped to a ``bool``:

        - ``private_blockchain_enabled``
        - ``public_blockchain_enabled``
        - ``monthly_digest_enabled``

        For the two blockchain switches the legacy spelling without the
        trailing ``d`` (``..._enable``) is accepted when the ``..._enabled``
        key is absent. Flags that could not be read stay ``False``; no
        exception is raised for a missing file, missing PyYAML or a malformed
        document.
    """

    def _as_bool(value) -> bool:
        # A quoted "false"/"0"/"no"/"off" in the YAML is a non-empty string,
        # which plain bool() would turn into True.
        if isinstance(value, str):
            return value.strip().lower() not in {"", "0", "false", "no", "off"}
        return bool(value)

    flags = {
        "private_blockchain_enabled": False,
        "public_blockchain_enabled": False,
        "monthly_digest_enabled": False,
    }

    try:
        # Imported lazily so a missing PyYAML degrades to "everything off".
        import yaml

        with (base_dir / "config.yaml").open("r", encoding="utf-8") as fh:
            cfg = yaml.safe_load(fh) or {}
        comp = cfg.get("compliance", {}) or {}

        for key in ("private_blockchain_enabled", "public_blockchain_enabled"):
            legacy_key = key[:-1]  # "..._enable"
            flags[key] = _as_bool(comp.get(key, comp.get(legacy_key, False)))
        flags["monthly_digest_enabled"] = _as_bool(
            comp.get("monthly_digest_enabled", False)
        )
    except Exception:
        # Best effort: keep whatever was resolved so far (defaults otherwise).
        pass

    return flags
|
|
|
|
|
|
|
|
|
def ensure_temp_databases(base_dir: Path, api_client) -> None:
    """Make sure the ``*.db`` files are available under ``<base_dir>/temp/db``.

    - Origin ``"internal"``: every ``<base_dir>/data/db/*.db`` that is not yet
      present in the temp folder is copied there, then the marker file
      ``.db_imported`` is written.
    - Any other origin: ``api_client.import_databases()`` is called and the
      ``zip_bytes`` of its dict reply are unpacked into the temp folder, then
      the marker file ``.external_db_imported`` is written.

    Nothing is done when the marker exists and no expected database is missing.
    Marker and remote-import failures are swallowed.
    """
    origin = _load_data_origin(base_dir)
    work_dir = base_dir / "temp" / "db"
    work_dir.mkdir(parents=True, exist_ok=True)

    if origin != "internal":
        remote_marker = work_dir / ".external_db_imported"
        absent = [
            name
            for name in ("feedback.db", "users.db", "videos.db")
            if not (work_dir / name).exists()
        ]

        if not absent:
            if remote_marker.exists():
                return
        else:
            # A stale marker must not hide the fact that files are missing.
            try:
                remote_marker.unlink(missing_ok=True)
            except Exception:
                pass

        if api_client is None:
            return

        try:
            reply = api_client.import_databases()
            blob = reply.get("zip_bytes") if isinstance(reply, dict) else None
            if not blob:
                return
            _extract_zip_bytes(blob, work_dir)
            try:
                remote_marker.write_text("imported", encoding="utf-8")
            except Exception:
                pass
        except Exception:
            return
        return

    seed_dir = base_dir / "data" / "db"
    if not seed_dir.exists():
        return

    local_marker = work_dir / ".db_imported"
    sources = list(seed_dir.glob("*.db"))
    absent = [src.name for src in sources if not (work_dir / src.name).exists()]
    if not absent and local_marker.exists():
        return

    for src in sources:
        target = work_dir / src.name
        if target.exists():
            continue
        shutil.copy2(src, target)

    try:
        local_marker.write_text("internal", encoding="utf-8")
    except Exception:
        pass
|
|
|
|
|
|
|
|
|
def maybe_publish_monthly_actions_digest(
    base_dir: Path,
    compliance_client,
    public_blockchain_enabled: bool,
    monthly_digest_enabled: bool,
) -> None:
    """Publish the digest of last month's rows of ``actions.db``, if still pending.

    Reads ``<base_dir>/temp/db/actions.db``, selects every row of the
    ``actions`` table whose ``timestamp`` falls in the previous calendar month
    (UTC, compared as ``YYYY-MM-DD HH:MM:SS`` text), serialises them as
    canonical JSON and hashes that with SHA-256. If no row with action
    ``"Monthly Digest sent to Polygon"`` and that hash in ``sha1sum`` exists
    yet, ``compliance_client.publish_monthly_digest(period, digest_hash=...)``
    is called and, on a truthy reply, the publication is recorded through
    ``databases.log_action``.

    Args:
        base_dir: Root directory that contains ``temp/db``.
        compliance_client: Object exposing ``publish_monthly_digest``.
        public_blockchain_enabled: Must be true for anything to happen.
        monthly_digest_enabled: Must be true for anything to happen.

    The function is best effort: every error is swallowed and ends the call.
    """
    if not (public_blockchain_enabled and monthly_digest_enabled):
        return

    try:
        import hashlib
        import json
        import sqlite3
        from contextlib import closing
        from datetime import datetime, timezone

        actions_db_path = base_dir / "temp" / "db" / "actions.db"
        if not actions_db_path.exists():
            return

        today = datetime.now(timezone.utc).date()
        if today.month == 1:
            year, month = today.year - 1, 12
        else:
            year, month = today.year, today.month - 1
        if month == 12:
            next_year, next_month = year + 1, 1
        else:
            next_year, next_month = year, month + 1

        period = f"{year:04d}-{month:02d}"
        start_ts = f"{period}-01 00:00:00"
        end_ts = f"{next_year:04d}-{next_month:02d}-01 00:00:00"

        # sqlite3's own context manager only commits/rolls back; ``closing``
        # guarantees the handle is released on every early return.
        with closing(sqlite3.connect(str(actions_db_path))) as conn:
            conn.row_factory = sqlite3.Row
            cur = conn.cursor()

            cur.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='actions'"
            )
            if cur.fetchone() is None:
                return

            cur.execute(
                "SELECT * FROM actions WHERE timestamp >= ? AND timestamp < ?",
                (start_ts, end_ts),
            )
            rows = cur.fetchall()
            if not rows:
                return

            payload = [dict(row) for row in rows]
            serialized = json.dumps(payload, sort_keys=True, separators=(",", ":"))
            digest_hash = hashlib.sha256(serialized.encode("utf-8")).hexdigest()

            cur.execute(
                "SELECT 1 FROM actions WHERE action = 'Monthly Digest sent to Polygon' AND sha1sum = ? LIMIT 1",
                (digest_hash,),
            )
            if cur.fetchone() is not None:
                # This exact digest has already been published.
                return

        # The database is closed before publishing so that log_action is not
        # competing with a connection held open here.
        resp = compliance_client.publish_monthly_digest(period, digest_hash=digest_hash)
        if not resp:
            return

        try:
            from databases import log_action

            log_action(
                session="",
                user="",
                phone="",
                action="Monthly Digest sent to Polygon",
                sha1sum=digest_hash,
            )
        except Exception:
            # Failing to record the publication must not break the caller.
            pass
    except Exception:
        return
|
|
|
|
|
|
|
|
|
def _extract_zip_bytes(zip_bytes: bytes, target_dir: Path) -> None:
    """Unpack an in-memory ZIP archive into ``target_dir``, creating it if needed."""
    target_dir.mkdir(parents=True, exist_ok=True)
    archive = zipfile.ZipFile(io.BytesIO(zip_bytes))
    try:
        archive.extractall(target_dir)
    finally:
        archive.close()
|
|
|
|
|
|
def ensure_media_for_video(
    base_dir: Path,
    api_client,
    sha1sum: str,
    version: Optional[str] = None,
) -> None:
    """Make sure ``<base_dir>/temp/media/<sha1sum>[/<version>]`` is populated.

    The video folder is only filled when it does not exist yet:

    - origin ``"internal"``: ``video.mp4`` is copied from
      ``<base_dir>/data/media/<sha1sum>/`` when present;
    - otherwise: the ``zip_bytes`` returned by ``api_client.import_media`` are
      unpacked into it (errors are ignored).

    When ``version`` is given and its folder is still missing, it is copied
    from ``data/media/<sha1sum>/<version>`` (internal) or obtained through
    ``api_client.import_media_version`` (otherwise, errors ignored).
    """
    is_internal = _load_data_origin(base_dir) == "internal"
    source_root = base_dir / "data" / "media" / sha1sum
    cache_dir = base_dir / "temp" / "media" / sha1sum

    if not cache_dir.exists():
        cache_dir.mkdir(parents=True, exist_ok=True)
        if is_internal:
            original = source_root / "video.mp4"
            if original.exists():
                shutil.copy2(original, cache_dir / "video.mp4")
        elif api_client is not None:
            try:
                reply = api_client.import_media(sha1sum)
                blob = reply.get("zip_bytes") if isinstance(reply, dict) else None
                if blob:
                    _extract_zip_bytes(blob, cache_dir)
            except Exception:
                pass

    if not version:
        return

    cached_version = cache_dir / version
    if cached_version.exists():
        return

    if is_internal:
        source_version = source_root / version
        if source_version.exists():
            shutil.copytree(source_version, cached_version, dirs_exist_ok=True)
        return

    if api_client is None:
        return

    try:
        reply = api_client.import_media_version(sha1sum, version)
        blob = reply.get("zip_bytes") if isinstance(reply, dict) else None
        if blob:
            _extract_zip_bytes(blob, cached_version)
    except Exception:
        return
|
|
|
|
|
|
|
|
|
def _open_sqlite(path: Path):
    """Return a context manager yielding a Row-factory connection that is closed on exit."""
    import sqlite3
    from contextlib import closing

    conn = sqlite3.connect(str(path))
    conn.row_factory = sqlite3.Row
    # sqlite3's own context manager does not close the handle; closing() does.
    # Writers must call commit() themselves.
    return closing(conn)


def _quote_identifier(name) -> str:
    """Quote a table/column name for use in locally executed SQL."""
    return '"' + str(name).replace('"', '""') + '"'


def _sql_literal(value) -> str:
    """Render a Python value read from SQLite as a SQL literal."""
    if value is None:
        return "NULL"
    if isinstance(value, (bytes, bytearray, memoryview)):
        # BLOBs need the X'..' form; str(bytes) would produce "b'...'".
        return "X'" + bytes(value).hex() + "'"
    if isinstance(value, (int, float)):
        return str(value)
    return "'" + str(value).replace("'", "''") + "'"


def _list_tables(conn) -> list:
    """Return the names of the user tables of an open connection."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
    ).fetchall()
    return [row[0] for row in rows]


def _column_names(conn, table) -> list:
    """Return the column names of ``table`` (empty when the table does not exist)."""
    info = conn.execute(f"PRAGMA table_info({_quote_identifier(table)})").fetchall()
    return [col[1] for col in info]


def _persist_session_rows(db_temp_dir: Path, data_db_dir: Path, session_id: str) -> None:
    """Copy every temp-DB row tagged with ``session_id`` into the same-named DB under ``data_db_dir``."""
    for db_path in db_temp_dir.glob("*.db"):
        pending = []  # (insert statement, list of value rows) per table
        with _open_sqlite(db_path) as src_conn:
            for table in _list_tables(src_conn):
                col_names = _column_names(src_conn, table)
                if "session" not in col_names:
                    continue
                rows = src_conn.execute(
                    f"SELECT * FROM {_quote_identifier(table)} WHERE session = ?",
                    (session_id,),
                ).fetchall()
                if not rows:
                    continue
                columns_sql = ", ".join(_quote_identifier(c) for c in col_names)
                placeholders = ", ".join(["?"] * len(col_names))
                insert_sql = (
                    f"INSERT OR REPLACE INTO {_quote_identifier(table)} "
                    f"({columns_sql}) VALUES ({placeholders})"
                )
                pending.append(
                    (insert_sql, [[row[name] for name in col_names] for row in rows])
                )

        if not pending:
            # Do not create an empty destination DB when there is nothing to write.
            continue

        target_db = data_db_dir / db_path.name
        target_db.parent.mkdir(parents=True, exist_ok=True)
        with _open_sqlite(target_db) as dst_conn:
            for insert_sql, values in pending:
                dst_conn.executemany(insert_sql, values)
                dst_conn.commit()


def _push_new_rows(db_temp_dir: Path, api_client) -> None:
    """Send rows that differ from ``<db_temp_dir>/backup`` to ``api_client.update_databases``."""
    import sqlite3

    backup_dir = db_temp_dir / "backup"
    if api_client is None or not backup_dir.exists():
        return

    for db_path in db_temp_dir.glob("*.db"):
        backup_db = backup_dir / db_path.name
        if not backup_db.exists():
            continue

        with _open_sqlite(db_path) as src_conn, _open_sqlite(backup_db) as bkp_conn:
            for table in _list_tables(src_conn):
                col_names = _column_names(src_conn, table)
                if not col_names:
                    continue

                src_rows = src_conn.execute(
                    f"SELECT * FROM {_quote_identifier(table)}"
                ).fetchall()
                try:
                    bkp_rows = bkp_conn.execute(
                        f"SELECT * FROM {_quote_identifier(table)}"
                    ).fetchall()
                except sqlite3.OperationalError:
                    # Table absent from the backup: every row counts as new.
                    bkp_rows = []

                # Rows are compared as whole tuples, so a modified row also
                # counts as "new".
                known = {tuple(row[c] for c in col_names) for row in bkp_rows}
                new_rows = [
                    row
                    for row in src_rows
                    if tuple(row[c] for c in col_names) not in known
                ]
                if not new_rows:
                    continue

                columns_sql = ", ".join(col_names)
                print(
                    f"[SYNC] DB={db_path.name} table={table} new_or_changed_rows={len(new_rows)}"
                )

                for row in new_rows:
                    values_sql = ", ".join(_sql_literal(row[c]) for c in col_names)
                    full_sql = (
                        f"INSERT OR REPLACE INTO {table} ({columns_sql}) "
                        f"VALUES ({values_sql});"
                    )
                    try:
                        api_client.update_databases({"statements": [full_sql]})
                    except Exception:
                        # Best effort: one failed row must not stop the others.
                        continue


def _fetch_session_actions(actions_db_path: Path, session_id: str) -> list:
    """Return the ``actions`` rows of ``session_id`` as dicts ([] if the DB or table is missing)."""
    if not actions_db_path.exists():
        return []
    with _open_sqlite(actions_db_path) as conn:
        has_table = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='actions'"
        ).fetchone()
        if has_table is None:
            return []
        rows = conn.execute(
            "SELECT * FROM actions WHERE session = ?", (session_id,)
        ).fetchall()
        return [dict(row) for row in rows]


def _publish_events_digest(actions_db_path: Path, session_id: str) -> Optional[dict]:
    """Hash the session's actions, publish the hash and return ``{events_digest, events_count}``."""
    try:
        import hashlib
        import json

        payload = _fetch_session_actions(actions_db_path, session_id)
        if not payload:
            return None

        serialized = json.dumps(payload, sort_keys=True, separators=(",", ":"))
        digest_hash = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
        print(
            f"[POLYGON EVENTS DIGEST] session={session_id} "
            f"events={len(payload)} hash={digest_hash}"
        )

        try:
            resp = compliance_client.publish_events_digest(
                session_id=session_id,
                digest_hash=digest_hash,
            )
            if resp:
                tx_hash = resp.get("transaction_hash")
                tx_url = resp.get("transaction_url")
                print(f"[POLYGON PUBLISH] ok tx_hash={tx_hash} tx_url={tx_url}")
                try:
                    from databases import log_action

                    log_action(
                        session=session_id or "",
                        user="",
                        phone="",
                        action="Polygon events digest published",
                        sha1sum=digest_hash,
                    )
                except Exception:
                    pass
            else:
                print("[POLYGON PUBLISH] error: resposta buida o nul·la")
        except Exception as bexc:
            print(f"[POLYGON PUBLISH] error publicant digest: {bexc}")

        # The digest is reported even when publishing failed.
        return {"events_digest": digest_hash, "events_count": len(payload)}
    except Exception:
        return None


def _publish_actions_private(actions_db_path: Path, session_id: str) -> None:
    """Send the session's actions, with phone numbers replaced by their SHA-256, to ``publish_actions_qldb``."""
    try:
        import hashlib

        actions_payload = _fetch_session_actions(actions_db_path, session_id)
        if not actions_payload:
            return

        for row_dict in actions_payload:
            phone_val = row_dict.get("phone")
            if phone_val:
                row_dict["phone_hash"] = hashlib.sha256(
                    str(phone_val).encode("utf-8")
                ).hexdigest()
                row_dict["phone"] = None

        try:
            compliance_client.publish_actions_qldb(
                session_id=session_id,
                actions=actions_payload,
            )
        except Exception as qexc:
            print(f"[QLDB PUBLISH] error guardant actions a QLDB: {qexc}")
    except Exception:
        pass


def _collect_revocations(actions_db_path: Path, session_id: str) -> dict:
    """Map each sha1sum with a 'Revocation of permits' action in this session to its phone."""
    revoked = {}
    try:
        if not actions_db_path.exists():
            return revoked
        with _open_sqlite(actions_db_path) as conn:
            cols = set(_column_names(conn, "actions"))
            if not {"session", "action", "sha1sum", "phone"}.issubset(cols):
                return revoked
            rows = conn.execute(
                """
                SELECT DISTINCT sha1sum, phone
                FROM actions
                WHERE session = ? AND action = 'Revocation of permits' AND sha1sum IS NOT NULL
                """,
                (session_id,),
            ).fetchall()
            for row in rows:
                sha1 = str(row["sha1sum"] or "")
                if sha1:
                    revoked[sha1] = str(row["phone"] or "")
    except Exception:
        return {}
    return revoked


def _purge_revoked(db_temp_dir: Path, temp_root: Path, revoked_sha1s: dict) -> None:
    """Delete revoked videos from the temp databases and remove their temp media folders."""
    try:
        for db_name, table in (
            ("videos.db", "videos"),
            ("audiodescriptions.db", "audiodescriptions"),
        ):
            db_path = db_temp_dir / db_name
            if not db_path.exists():
                continue
            with _open_sqlite(db_path) as conn:
                if "sha1sum" not in _column_names(conn, table):
                    continue
                conn.executemany(
                    f"DELETE FROM {table} WHERE sha1sum = ?",
                    [(sha1,) for sha1 in revoked_sha1s],
                )
                conn.commit()
    except Exception:
        pass

    for sha1 in revoked_sha1s:
        try:
            for root in (temp_root / "media", temp_root / "pending_videos"):
                target = root / sha1
                if target.exists():
                    shutil.rmtree(target, ignore_errors=True)
        except Exception:
            pass


def _notify_revocations(base_dir: Path, session_id: str, revoked_sha1s: dict) -> None:
    """Send the revocation SMS to each phone when ``validation.user_sms_enabled`` is set in config.yaml."""
    try:
        import yaml

        config_path = base_dir / "config.yaml"
        user_sms_enabled = False
        if config_path.exists():
            with config_path.open("r", encoding="utf-8") as fh:
                cfg = yaml.safe_load(fh) or {}
            validation_cfg = cfg.get("validation", {}) or {}
            user_sms_enabled = bool(validation_cfg.get("user_sms_enabled", False))

        if not user_sms_enabled:
            return

        from databases import log_action

        msg = (
            "Els permisos per utilitzar el vostre vídeo han estat revocats. "
            "Les dades associades han estat eliminades del sistema."
        )
        for sha1, phone in revoked_sha1s.items():
            if not phone:
                continue
            try:
                sms_ok = compliance_client.notify_user_video_approved(
                    phone=phone,
                    message=msg,
                    sha1sum=sha1,
                )
                if sms_ok:
                    try:
                        log_action(
                            session=session_id or "",
                            user="",
                            phone=phone,
                            action="SMS sent to user for video revocation",
                            sha1sum=sha1 or "",
                        )
                    except Exception:
                        pass
            except Exception:
                continue
    except Exception:
        pass


def _session_video_hashes(videos_db: Path, session_id: str) -> set:
    """Return the distinct non-empty sha1sum values of ``videos`` rows tagged with ``session_id``."""
    found = set()
    try:
        if not videos_db.exists():
            return found
        with _open_sqlite(videos_db) as conn:
            col_names = _column_names(conn, "videos")
            if "session" in col_names and "sha1sum" in col_names:
                rows = conn.execute(
                    "SELECT DISTINCT sha1sum FROM videos WHERE session = ?",
                    (session_id,),
                ).fetchall()
                for row in rows:
                    if row["sha1sum"]:
                        found.add(str(row["sha1sum"]))
    except Exception:
        return set()
    return found


def _export_session_media(
    temp_media_root: Path,
    data_media_root: Path,
    data_origin: str,
    api_client,
    new_sha1s: set,
) -> None:
    """Copy the media folders of ``new_sha1s`` to ``data_media_root`` (internal) or send them zipped to ``api_client.export_media``."""
    if data_origin == "internal":
        for sha1 in new_sha1s:
            src_dir = temp_media_root / sha1
            if src_dir.exists():
                shutil.copytree(src_dir, data_media_root / sha1, dirs_exist_ok=True)
        return

    if api_client is None:
        return

    import tempfile

    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            zip_path = Path(tmpdir) / "media_export.zip"
            with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
                for sha1 in new_sha1s:
                    src_dir = temp_media_root / sha1
                    if not src_dir.exists():
                        continue
                    for root, _, files in os.walk(src_dir):
                        root_path = Path(root)
                        for fname in files:
                            fpath = root_path / fname
                            # Archive names keep the "<sha1sum>/..." prefix.
                            zf.write(
                                fpath,
                                arcname=str(fpath.relative_to(temp_media_root)),
                            )
            api_client.export_media(zip_path.read_bytes())
    except Exception:
        # Best effort: a failed upload must not break the logout.
        return


def confirm_changes_and_logout(base_dir: Path, api_client, session_id: str) -> Optional[dict]:
    """Persist the changes made during ``session_id`` and run the logout-time hooks.

    Steps, in order:

    1. Database sync. With origin ``"internal"``, every row of
       ``<base_dir>/temp/db/*.db`` whose ``session`` column equals
       ``session_id`` is written with ``INSERT OR REPLACE`` into the
       same-named database under ``<base_dir>/data/db``. Otherwise, rows that
       differ from the copies in ``temp/db/backup`` are sent one statement at
       a time to ``api_client.update_databases``.
    2. If the ``public_blockchain_enabled`` compliance flag is set, the
       SHA-256 digest of the session's ``actions`` rows is published through
       ``compliance_client.publish_events_digest``.
    3. If the ``private_blockchain_enabled`` flag is set, the session's actions
       (phone numbers replaced by their hash) are sent to
       ``compliance_client.publish_actions_qldb``.
    4. Videos with a ``'Revocation of permits'`` action in this session are
       deleted from the temp databases and temp media folders, and their owner
       is notified by SMS when ``validation.user_sms_enabled`` is configured.
    5. Media folders of the videos registered in this session are copied to
       ``<base_dir>/data/media`` (internal) or zipped and sent to
       ``api_client.export_media``.

    Args:
        base_dir: Root directory holding ``config.yaml``, ``temp`` and ``data``.
        api_client: Backend client used when the origin is not internal; may be
            ``None``, in which case the remote steps are skipped.
        session_id: Session whose changes are confirmed; nothing is done when
            it is empty.

    Returns:
        ``{"events_digest": <hex>, "events_count": <int>}`` when step 2 computed
        a digest, otherwise ``None``.

    Errors in the internal database sync (step 1) and the internal media copy
    (step 5) propagate; all other steps are best effort.
    """
    if not session_id:
        return None

    data_origin = _load_data_origin(base_dir)
    # These flags were referenced but never defined in this function before.
    flags = _load_compliance_flags(base_dir)

    temp_root = base_dir / "temp"
    db_temp_dir = temp_root / "db"
    data_dir = base_dir / "data"
    actions_db_path = db_temp_dir / "actions.db"

    if data_origin == "internal":
        _persist_session_rows(db_temp_dir, data_dir / "db", session_id)
    else:
        _push_new_rows(db_temp_dir, api_client)

    events_digest_info = None
    if flags.get("public_blockchain_enabled"):
        events_digest_info = _publish_events_digest(actions_db_path, session_id)

    if flags.get("private_blockchain_enabled"):
        _publish_actions_private(actions_db_path, session_id)

    revoked_sha1s = _collect_revocations(actions_db_path, session_id)
    if revoked_sha1s:
        _purge_revoked(db_temp_dir, temp_root, revoked_sha1s)
        _notify_revocations(base_dir, session_id, revoked_sha1s)

    new_sha1s = _session_video_hashes(db_temp_dir / "videos.db", session_id)
    if new_sha1s:
        _export_session_media(
            temp_root / "media",
            data_dir / "media",
            data_origin,
            api_client,
            new_sha1s,
        )

    return events_digest_info
|
|
|
|