demo / compliance_client.py
VeuReu's picture
Upload 10 files
e2c60fb verified
"""
Cliente para comunicarse con el servicio Veureu Compliance
Este módulo se comunica con el microservicio compliance-service
que maneja OAuth, QLDB, Polygon y notificaciones en un solo lugar.
"""
import requests
import json
import os
import time
import random
from typing import Optional, Dict, Any, List
import streamlit as st
from datetime import datetime
import logging
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
logger = logging.getLogger(__name__)
class ComplianceClient:
"""Cliente para el microservicio de cumplimiento normativo"""
def __init__(self, compliance_service_url: str = None):
# URL del servicio de compliance (variable de entorno o por defecto)
self.compliance_service_url = compliance_service_url or os.getenv(
"COMPLIANCE_SERVICE_URL",
"https://veureu-compliance.hf.space" # Space oficial de compliance
)
# Timeout generós per absorbir cold starts del space de compliance
self.timeout = 60 # segons (màxim per a operacions com SMS o blockchain)
logger.info(f"Compliance client inicializado: {self.compliance_service_url}")
def _make_request(self, method: str, endpoint: str, data: Dict = None) -> Optional[Dict[str, Any]]:
"""
Método helper para hacer peticiones HTTP
Args:
method: Método HTTP ('GET', 'POST')
endpoint: Endpoint del API
data: Datos a enviar (solo para POST)
Returns:
Respuesta JSON o None si error
"""
try:
url = f"{self.compliance_service_url}{endpoint}"
logger.info(f"[COMPLIANCE] HTTP {method.upper()} {url} payload={data}")
if method.upper() == "GET":
response = requests.get(url, timeout=self.timeout)
elif method.upper() == "POST":
response = requests.post(url, json=data, timeout=self.timeout)
else:
logger.error(f"Método no soportado: {method}")
return None
logger.info(f"[COMPLIANCE] Resposta {method.upper()} {endpoint}: status={response.status_code}")
if response.status_code == 200:
return response.json()
else:
logger.error(f"Error en petición {method} {endpoint}: {response.status_code}")
logger.error(f"Response: {response.text}")
return None
except requests.exceptions.Timeout:
logger.error(f"Timeout en petición a {endpoint} (URL={self.compliance_service_url}{endpoint}, timeout={self.timeout}s)")
return None
except requests.exceptions.ConnectionError as e:
logger.error(f"Error de conexión a {self.compliance_service_url}{endpoint}: {e}")
return None
except Exception as e:
logger.error(f"Error en petición a {endpoint}: {e}")
return None
# === MÉTODOS DE AUTENTICACIÓN ===
def authenticate_user(self, token: str) -> Optional[Dict[str, Any]]:
"""Valida token con el servicio de autenticación"""
return self._make_request("POST", "/api/auth/validate", {"token": token})
def get_login_url(self, callback_url: str) -> str:
"""Obtiene URL de login del servicio OAuth"""
response = self._make_request("POST", "/api/auth/login-url", {"callback_url": callback_url})
if response:
return response.get("login_url")
return None
def logout_user(self, token: str) -> bool:
"""Invalida sesión en el servicio de autenticación"""
response = self._make_request("POST", "/api/auth/logout", {"token": token})
return response is not None
def is_authenticated(self) -> bool:
"""Verifica si el usuario actual está autenticado"""
# Primero verificar si hay autenticación tradicional
if 'user' in st.session_state and st.session_state.user:
logger.info(f"Usuario ya autenticado tradicionalmente: {st.session_state.user.get('username', 'unknown')}")
# Crear token automático para usuarios tradicionales
if 'auth_token' not in st.session_state:
st.session_state.auth_token = f"traditional_token_{int(time.time())}"
st.session_state.current_user = {
"email": f"{st.session_state.user.get('username', 'unknown')}@veureu.local",
"name": st.session_state.user.get('username', 'Unknown'),
"method": "traditional"
}
return True
# Si no hay autenticación tradicional, verificar por token
if "auth_token" not in st.session_state:
return False
token = st.session_state.auth_token
user_info = self.authenticate_user(token)
if user_info:
st.session_state.current_user = user_info
logger.info(f"Usuario autenticado: {user_info.get('email', 'unknown')}")
return True
else:
# Limpiar session si token inválido
logger.warning("Token inválido, limpiando sesión")
if "auth_token" in st.session_state:
del st.session_state.auth_token
if "current_user" in st.session_state:
del st.session_state.current_user
return False
def get_current_user(self) -> Optional[str]:
"""Obtiene email del usuario actual"""
# Primero verificar autenticación tradicional
if 'user' in st.session_state and st.session_state.user:
return f"{st.session_state.user.get('username', 'unknown')}@veureu.local"
# Si no, verificar autenticación por token
if self.is_authenticated():
return st.session_state.get("current_user", {}).get("email")
return None
def show_login_button(self) -> bool:
"""Muestra botón de login y maneja redirección"""
print("[OAuth] show_login_button llamado") # Forzar salida a log del Space
logger.info("[OAuth] show_login_button llamado")
# Si ya hay usuario autenticado tradicionalmente, no mostrar login
if 'user' in st.session_state and st.session_state.user:
logger.info(f"[OAuth] Usuario ya autenticado tradicionalmente: {st.session_state.user.get('username', 'unknown')}")
return True
# Verificar si OAuth está configurado en el servicio
try:
health_response = self._make_request("GET", "/")
if health_response and health_response.get("services", {}).get("oauth", {}).get("configured") == False:
logger.warning("[OAuth] OAuth no configurado en el servicio, usando modo demo")
return self._show_demo_login()
except Exception as e:
logger.error(f"[OAuth] Error verificando configuración OAuth: {e}")
return self._show_demo_login()
# Construir callback per a HF Spaces
space_url_env = os.getenv("SPACE_URL")
if space_url_env:
callback_url = space_url_env
else:
# Intent d'inferència per defecte: https://<SPACE_ID>.hf.space
space_id = os.getenv("SPACE_ID")
if space_id:
callback_url = f"https://{space_id}.hf.space"
else:
callback_url = "http://localhost:8501"
logger.info(f"[OAuth] Callback calculat: {callback_url}")
login_url = self.get_login_url(callback_url)
logger.info(f"[OAuth] login_url rebut del servei: {login_url}")
if login_url:
try:
parsed = urlparse(login_url)
query = parse_qs(parsed.query)
query.setdefault("prompt", ["select_account"])
query.setdefault("access_type", ["offline"])
encoded_query = urlencode(query, doseq=True)
login_url = urlunparse(parsed._replace(query=encoded_query))
logger.info(f"[OAuth] login_url ajustat amb prompt=select_account: {login_url}")
except Exception as exc:
logger.warning(f"[OAuth] No s'ha pogut ajustar la login_url: {exc}")
if login_url:
st.markdown(f"""
### 🔐 Iniciar sessió
Per continuar, necessitas iniciar sessió amb el teu compte de Google.
<a href="{login_url}" target="_top">
<button style="
background-color: #4285f4;
color: white;
padding: 12px 24px;
border: none;
border-radius: 4px;
font-size: 16px;
cursor: pointer;
text-decoration: none;
display: inline-block;
">
🚪 Iniciar sessió amb Google
</button>
</a>
*En iniciar sessió, acceptes els termes del servei i la política de privadesa.*
""", unsafe_allow_html=True)
# Fallback JS per obrir fora d'iframe si cal
st.markdown(
f"""
<script>
const a = document.querySelector('a[href="{login_url}"]');
if (a) {{
a.addEventListener('click', function(ev) {{
try {{
if (window.top) {{ window.top.location.href = '{login_url}'; ev.preventDefault(); }}
}} catch (e) {{ /* ignore */ }}
}});
}}
</script>
""",
unsafe_allow_html=True,
)
# Verificar callback OAuth
query_params = st.query_params
logger.info(f"[OAuth] Query params rebuts: {dict(query_params) if query_params else {}}")
# Mostrar params para diagnòstic lleuger
if query_params:
st.caption("OAuth params rebuts: " + ", ".join([f"{k}={query_params[k]}" for k in query_params.keys()]))
# Aceptar varios nombres de parámetro
token_key = None
for k in ["auth_token", "token", "code"]:
if k in query_params:
token_key = k
break
if token_key:
token_val = query_params[token_key]
if isinstance(token_val, list):
token_val = token_val[0]
st.session_state.auth_token = token_val
st.query_params.clear()
if self.is_authenticated():
st.success("✅ Sessió iniciada correctament")
st.rerun()
return True
else:
st.warning("⚠️ No s'ha pogut obtenir la URL d'inici de sessió. Torna-ho a intentar en uns minuts.")
logger.error("[OAuth] login_url és None: comprova COMPLIANCE_SERVICE_URL i l'estat del servei de compliance")
return False
# === VERIFICACIÓ PER TELÈFON / WHATSAPP ===
def create_phone_verification_session(
self,
*,
page: str,
user_id: Optional[str] = None,
email: Optional[str] = None,
action: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
"""Crea una sessió de verificació de telèfon al backend de compliance.
Endpoint: POST /phone_verification/create_session
"""
payload: Dict[str, Any] = {
"page": page,
"user_id": user_id,
"email": email,
"action": action,
}
return self._make_request("POST", "/phone_verification/create_session", payload)
def _show_demo_login(self) -> bool:
"""Muestra formulario de login por SMS en ventana desplegable cuando OAuth no está configurado"""
# Inicializar estado SMS si no existe
if 'sms_step' not in st.session_state:
st.session_state.sms_step = 'phone'
st.session_state.sms_code = None
st.session_state.sms_phone = None
st.markdown("""
### 🔐 Iniciar sessió (Verificació per SMS)
Per continuar, necessitas identificar-te i acceptar les condicions d'ús.
""")
# Ventana desplegable con términos, teléfono y aceptación
with st.expander("📋 Identificació i Acceptació de Condicions", expanded=False):
st.markdown("""
### **Condicions d'ús del sistema Veureu (provisional)**
En iniciar sessió i pujar un vídeo al sistema Veureu, l'usuari declara i accepta el següent:
**📋 Declaracions:**
- Declara ser titular dels drets d'ús i difusió del vídeo, o disposar de l'autorització expressa dels titulars.
- Declara que totes les persones identificables en el vídeo han atorgat el seu consentiment informat per a la seva utilització amb finalitats d'accessibilitat i audiodescripció.
- Declara que el vídeo no conté:
- Contingut violent, sexual o d'odi,
- Informació confidencial o de caràcter sensible,
- Imatges de menors sense consentiment parental verificable.
**✅ Acceptacions:**
- Accepta que es processi el vídeo exclusivament amb finalitats de generació i validació d'audiodescripcions, conforme a la normativa UNE-153010:2020 i al RGPD.
- Accepta que el sistema pugui enviar les dades necessàries a proveïdors tecnològics externs (p. ex., models d'IA) que actuen com a encarregats de tractament, sota clàusules contractuals tipus i sense reutilització de dades.
- Accepta que les accions realitzades (pujada, acceptació, validació, revocació) siguin registrades en un sistema immutable (AWS QLDB) mitjançant identificadors no personals.
- Pot exercir en qualsevol moment el seu dret a revocar el consentiment mitjançant el botó "Revocar permisos", el que eliminarà el material audiovisual i deixarà constància de la revocació en el registre.
- Accepta que, fins a la validació interna del material per part de l'equip Dive, el vídeo romandrà en estat "pendent de validació" i no serà utilitzat públicament.
""")
st.markdown("---")
st.markdown("#### 📱 Dades d'identificació")
# Selector de país y número de teléfono
col_country, col_phone = st.columns([1, 3])
with col_country:
country_code = st.selectbox("País", [
("🇪🇸 +34", "+34"),
("🇫🇷 +33", "+33"),
("🇬🇧 +44", "+44"),
("🇩🇪 +49", "+49"),
("🇮🇹 +39", "+39")
], format_func=lambda x: x[0])[1]
with col_phone:
phone_number = st.text_input("Número de telèfon", placeholder="600 123 000", max_chars=15)
# Checkbox para aceptar términos
accept_terms = st.checkbox("✅ Accepto les condicions d'ús i la política de privadesa", key="sms_accept_terms")
# Botón para enviar código
sms_sent = False
if st.button("📤 Enviar codi SMS", type="primary", disabled=not (phone_number and accept_terms)):
full_phone = f"{country_code}{phone_number}"
if self._send_sms_code(full_phone):
st.session_state.sms_step = 'verify'
st.session_state.sms_phone = full_phone
st.success(f"✅ Codis enviat a {full_phone}")
sms_sent = True
else:
st.error("❌ Error enviant el codi. Torna-ho a intentar.")
# Si se envió el SMS, mostrar campo para verificar código
if sms_sent or st.session_state.sms_step == 'verify':
st.markdown("#### 🔓 Verificació del codi")
if st.session_state.sms_code:
st.info(f"💡 **Mode demo**: El codi és '{st.session_state.sms_code}'")
col_code, col_resend = st.columns([2, 1])
with col_code:
verification_code = st.text_input("Codi de 6 dígits", max_chars=6, placeholder="000000")
with col_resend:
st.markdown("<br>", unsafe_allow_html=True)
if st.button("🔄 Reenviar", key="resend_in_expander"):
if self._send_sms_code(st.session_state.sms_phone):
st.success("✅ Nou codi enviat")
else:
st.error("❌ Error enviant el codi")
if st.button("🔓 Verificar i continuar", type="primary"):
if verification_code == st.session_state.sms_code:
# Crear sesión autenticada
demo_token = f"sms_token_{int(time.time())}"
st.session_state.auth_token = demo_token
st.session_state.current_user = {
"email": f"user@{st.session_state.sms_phone.replace('+', '')}.sms",
"name": "Usuario SMS",
"phone": st.session_state.sms_phone
}
st.success("✅ Sessió iniciada correctament")
# Resetear estado SMS
st.session_state.sms_step = 'phone'
st.session_state.sms_code = None
st.session_state.sms_phone = None
st.rerun()
return True
else:
st.error(f"❌ Codi incorrecte. El codi correcte és: {st.session_state.sms_code}")
return False
def _send_sms_code(self, phone_number: str) -> bool:
"""Simula el envío de código SMS (en producción usaría un servicio real)"""
try:
# Generar código aleatorio de 6 dígitos
code = f"{random.randint(100000, 999999)}"
st.session_state.sms_code = code
# En modo demo, siempre mostramos el código
st.info(f"🔧 **Mode demo**: Codi generat: {code}")
logger.info(f"[SMS] Código generado para {phone_number}: {code}")
# Simular envío exitoso
return True
except Exception as e:
logger.error(f"[SMS] Error enviando código: {e}")
return False
def logout(self) -> bool:
"""Cierra sesión del usuario actual"""
if "auth_token" in st.session_state:
token = st.session_state.auth_token
success = self.logout_user(token)
# Limpiar session local siempre
if "auth_token" in st.session_state:
del st.session_state.auth_token
if "current_user" in st.session_state:
del st.session_state.current_user
logger.info("Sesión cerrada")
return success
return True
# === MÉTODOS DE CUMPLIMIENTO (QLDB/POLYGON) ===
def record_consent(self, user_info: Dict[str, Any],
video_info: Dict[str, Any],
consent_data: Dict[str, Any]) -> Optional[str]:
"""Registra consentimiento de usuario vía API"""
payload = {
"user_info": user_info,
"video_info": video_info,
"consent_data": consent_data
}
response = self._make_request("POST", "/api/compliance/record-consent", payload)
if response:
document_id = response.get("document_id")
logger.info(f"Consentimiento registrado: {document_id}")
return document_id
return None
def notify_video_upload(self, video_name: str, sha1sum: str) -> bool:
"""Notifica al servei de compliance que s'ha pujat un nou vídeo.
Això desencadena l'enviament d'un SMS al validor de vídeos.
"""
payload = {"video_name": video_name, "sha1sum": sha1sum}
response = self._make_request(
"POST", "/api/notifications/video-upload-sms", payload
)
return bool(response and response.get("success"))
def notify_user_video_approved(self, phone: str, message: str, sha1sum: str) -> bool:
"""Envia un SMS a l'usuari indicant que el seu vídeo ha estat aprovat.
El backend de compliance decidirà si utilitza Twilio o Zapier segons
la configuració (twilio_enabled / zapier_enabled).
"""
payload = {"phone": phone, "message": message, "sha1sum": sha1sum}
response = self._make_request(
"POST", "/api/notifications/user-video-approved-sms", payload
)
return bool(response and response.get("success"))
def notify_une_validator_new_ads(self, phone: str, message: str) -> bool:
"""Envia un SMS al validador UNE indicant que hi ha noves AD per validar.
El backend de compliance s'encarrega de triar Twilio o Zapier segons
la configuració (twilio_enabled / zapier_enabled).
"""
payload = {"phone": phone, "message": message}
response = self._make_request(
"POST", "/api/notifications/une-validation-sms", payload
)
return bool(response and response.get("success"))
def send_login_sms(self, phone: str, code: str) -> bool:
"""Envia un SMS de verificació de login a través del servei de compliance.
Demo només passa telèfon i codi; compliance s'encarrega de Zapier.
"""
payload = {"phone": phone, "code": code}
response = self._make_request("POST", "/api/notifications/login-sms", payload)
return bool(response and response.get("success"))
def publish_events_digest(self, session_id: str, digest_hash: str) -> Optional[Dict[str, Any]]:
"""Publica el digest d'esdeveniments d'una sessió en blockchain.
Retorna un dict amb, si té èxit:
- transaction_hash
- transaction_url
"""
payload = {
"session_id": session_id,
"digest_hash": digest_hash,
}
response = self._make_request(
"POST", "/api/blockchain/publish-events-digest", payload
)
if response:
return response
return None
def publish_actions_qldb(self, session_id: str, actions: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""Envia el registre de canvis d'actions.db a una taula QLDB.
El backend és responsable d'escriure aquest payload a AWS QLDB quan
la funcionalitat de blockchain privada estigui activada.
"""
payload = {
"session_id": session_id,
"actions": actions,
}
response = self._make_request(
"POST", "/api/blockchain/publish-actions-qldb", payload
)
if response:
return response
return None
def send_validation_request(self, validation_request: Dict[str, Any]) -> bool:
"""Envía solicitud de validación a validadores"""
response = self._make_request("POST", "/api/compliance/send-validation", validation_request)
if response:
logger.info(f"Solicitud de validación enviada: {validation_request.get('document_id')}")
return True
return False
def record_validator_decision(self, document_id: str,
validator_email: str,
decision: str,
comments: str = "") -> bool:
"""Registra decisión de validador"""
payload = {
"document_id": document_id,
"validator_email": validator_email,
"decision": decision,
"comments": comments
}
response = self._make_request("POST", "/api/compliance/record-decision", payload)
if response:
logger.info(f"Decisión registrada: {document_id} -> {decision}")
return True
return False
# === MÉTODOS DE BLOCKCHAIN (POLYGON) ===
def publish_monthly_digest(self, period: str, digest_hash: str | None = None) -> Optional[str]:
"""Publica digest mensual en blockchain.
Si digest_hash no és None, s'envia també al backend perquè el faci
servir com a arrel del digest mensual.
"""
payload: Dict[str, Any] = {"period": period}
if digest_hash:
payload["digest_hash"] = digest_hash
response = self._make_request("POST", "/api/blockchain/publish-digest", payload)
if response:
tx_hash = response.get("transaction_hash")
logger.info(f"Digest publicado: {period} -> {tx_hash}")
return tx_hash
return None
def get_published_digests(self) -> List[Dict[str, Any]]:
"""Obtiene lista de digest publicados"""
response = self._make_request("GET", "/api/blockchain/digests")
if response:
digests = response.get("digests", [])
logger.info(f"Obtenidos {len(digests)} digest publicados")
return digests
return []
def verify_digest(self, period: str, expected_hash: str) -> bool:
"""Verifica integridad de digest en blockchain"""
payload = {
"period": period,
"expected_hash": expected_hash
}
response = self._make_request("POST", "/api/blockchain/verify-digest", payload)
if response:
is_valid = response.get("valid", False)
logger.info(f"Digest verificado: {period} -> {'VÁLIDO' if is_valid else 'INVÁLIDO'}")
return is_valid
return False
def get_compliance_stats(self) -> Dict[str, Any]:
"""Obtiene estadísticas de cumplimiento"""
response = self._make_request("GET", "/api/compliance/stats")
if response:
logger.info("Estadísticas de cumplimiento obtenidas")
return response
return {}
def health_check(self) -> bool:
"""Verifica si el servicio de compliance está disponible"""
response = self._make_request("GET", "/")
if response:
status = response.get("status")
if status == "running":
logger.info("Servicio compliance funcionando correctamente")
return True
logger.warning("Servicio compliance no disponible")
return False
def send_decision_notification(self, notification: Dict[str, Any]) -> bool:
"""Envía notificación de decisión de validación"""
response = self._make_request("POST", "/api/compliance/send-decision-notification", notification)
if response:
logger.info(f"Notificación de decisión enviada: {notification.get('document_id')}")
return True
return False
# Instancia global del cliente
compliance_client = ComplianceClient()