""" Cliente para comunicarse con el servicio Veureu Compliance Este módulo se comunica con el microservicio compliance-service que maneja OAuth, QLDB, Polygon y notificaciones en un solo lugar. """ import requests import json import os import time import random from typing import Optional, Dict, Any, List import streamlit as st from datetime import datetime import logging from urllib.parse import urlparse, parse_qs, urlencode, urlunparse logger = logging.getLogger(__name__) class ComplianceClient: """Cliente para el microservicio de cumplimiento normativo""" def __init__(self, compliance_service_url: str = None): # URL del servicio de compliance (variable de entorno o por defecto) self.compliance_service_url = compliance_service_url or os.getenv( "COMPLIANCE_SERVICE_URL", "https://veureu-compliance.hf.space" # Space oficial de compliance ) # Timeout generós per absorbir cold starts del space de compliance self.timeout = 60 # segons (màxim per a operacions com SMS o blockchain) logger.info(f"Compliance client inicializado: {self.compliance_service_url}") def _make_request(self, method: str, endpoint: str, data: Dict = None) -> Optional[Dict[str, Any]]: """ Método helper para hacer peticiones HTTP Args: method: Método HTTP ('GET', 'POST') endpoint: Endpoint del API data: Datos a enviar (solo para POST) Returns: Respuesta JSON o None si error """ try: url = f"{self.compliance_service_url}{endpoint}" logger.info(f"[COMPLIANCE] HTTP {method.upper()} {url} payload={data}") if method.upper() == "GET": response = requests.get(url, timeout=self.timeout) elif method.upper() == "POST": response = requests.post(url, json=data, timeout=self.timeout) else: logger.error(f"Método no soportado: {method}") return None logger.info(f"[COMPLIANCE] Resposta {method.upper()} {endpoint}: status={response.status_code}") if response.status_code == 200: return response.json() else: logger.error(f"Error en petición {method} {endpoint}: {response.status_code}") logger.error(f"Response: {response.text}") return None except requests.exceptions.Timeout: logger.error(f"Timeout en petición a {endpoint} (URL={self.compliance_service_url}{endpoint}, timeout={self.timeout}s)") return None except requests.exceptions.ConnectionError as e: logger.error(f"Error de conexión a {self.compliance_service_url}{endpoint}: {e}") return None except Exception as e: logger.error(f"Error en petición a {endpoint}: {e}") return None # === MÉTODOS DE AUTENTICACIÓN === def authenticate_user(self, token: str) -> Optional[Dict[str, Any]]: """Valida token con el servicio de autenticación""" return self._make_request("POST", "/api/auth/validate", {"token": token}) def get_login_url(self, callback_url: str) -> str: """Obtiene URL de login del servicio OAuth""" response = self._make_request("POST", "/api/auth/login-url", {"callback_url": callback_url}) if response: return response.get("login_url") return None def logout_user(self, token: str) -> bool: """Invalida sesión en el servicio de autenticación""" response = self._make_request("POST", "/api/auth/logout", {"token": token}) return response is not None def is_authenticated(self) -> bool: """Verifica si el usuario actual está autenticado""" # Primero verificar si hay autenticación tradicional if 'user' in st.session_state and st.session_state.user: logger.info(f"Usuario ya autenticado tradicionalmente: {st.session_state.user.get('username', 'unknown')}") # Crear token automático para usuarios tradicionales if 'auth_token' not in st.session_state: st.session_state.auth_token = f"traditional_token_{int(time.time())}" st.session_state.current_user = { "email": f"{st.session_state.user.get('username', 'unknown')}@veureu.local", "name": st.session_state.user.get('username', 'Unknown'), "method": "traditional" } return True # Si no hay autenticación tradicional, verificar por token if "auth_token" not in st.session_state: return False token = st.session_state.auth_token user_info = self.authenticate_user(token) if user_info: st.session_state.current_user = user_info logger.info(f"Usuario autenticado: {user_info.get('email', 'unknown')}") return True else: # Limpiar session si token inválido logger.warning("Token inválido, limpiando sesión") if "auth_token" in st.session_state: del st.session_state.auth_token if "current_user" in st.session_state: del st.session_state.current_user return False def get_current_user(self) -> Optional[str]: """Obtiene email del usuario actual""" # Primero verificar autenticación tradicional if 'user' in st.session_state and st.session_state.user: return f"{st.session_state.user.get('username', 'unknown')}@veureu.local" # Si no, verificar autenticación por token if self.is_authenticated(): return st.session_state.get("current_user", {}).get("email") return None def show_login_button(self) -> bool: """Muestra botón de login y maneja redirección""" print("[OAuth] show_login_button llamado") # Forzar salida a log del Space logger.info("[OAuth] show_login_button llamado") # Si ya hay usuario autenticado tradicionalmente, no mostrar login if 'user' in st.session_state and st.session_state.user: logger.info(f"[OAuth] Usuario ya autenticado tradicionalmente: {st.session_state.user.get('username', 'unknown')}") return True # Verificar si OAuth está configurado en el servicio try: health_response = self._make_request("GET", "/") if health_response and health_response.get("services", {}).get("oauth", {}).get("configured") == False: logger.warning("[OAuth] OAuth no configurado en el servicio, usando modo demo") return self._show_demo_login() except Exception as e: logger.error(f"[OAuth] Error verificando configuración OAuth: {e}") return self._show_demo_login() # Construir callback per a HF Spaces space_url_env = os.getenv("SPACE_URL") if space_url_env: callback_url = space_url_env else: # Intent d'inferència per defecte: https://.hf.space space_id = os.getenv("SPACE_ID") if space_id: callback_url = f"https://{space_id}.hf.space" else: callback_url = "http://localhost:8501" logger.info(f"[OAuth] Callback calculat: {callback_url}") login_url = self.get_login_url(callback_url) logger.info(f"[OAuth] login_url rebut del servei: {login_url}") if login_url: try: parsed = urlparse(login_url) query = parse_qs(parsed.query) query.setdefault("prompt", ["select_account"]) query.setdefault("access_type", ["offline"]) encoded_query = urlencode(query, doseq=True) login_url = urlunparse(parsed._replace(query=encoded_query)) logger.info(f"[OAuth] login_url ajustat amb prompt=select_account: {login_url}") except Exception as exc: logger.warning(f"[OAuth] No s'ha pogut ajustar la login_url: {exc}") if login_url: st.markdown(f""" ### 🔐 Iniciar sessió Per continuar, necessitas iniciar sessió amb el teu compte de Google. *En iniciar sessió, acceptes els termes del servei i la política de privadesa.* """, unsafe_allow_html=True) # Fallback JS per obrir fora d'iframe si cal st.markdown( f""" """, unsafe_allow_html=True, ) # Verificar callback OAuth query_params = st.query_params logger.info(f"[OAuth] Query params rebuts: {dict(query_params) if query_params else {}}") # Mostrar params para diagnòstic lleuger if query_params: st.caption("OAuth params rebuts: " + ", ".join([f"{k}={query_params[k]}" for k in query_params.keys()])) # Aceptar varios nombres de parámetro token_key = None for k in ["auth_token", "token", "code"]: if k in query_params: token_key = k break if token_key: token_val = query_params[token_key] if isinstance(token_val, list): token_val = token_val[0] st.session_state.auth_token = token_val st.query_params.clear() if self.is_authenticated(): st.success("✅ Sessió iniciada correctament") st.rerun() return True else: st.warning("⚠️ No s'ha pogut obtenir la URL d'inici de sessió. Torna-ho a intentar en uns minuts.") logger.error("[OAuth] login_url és None: comprova COMPLIANCE_SERVICE_URL i l'estat del servei de compliance") return False # === VERIFICACIÓ PER TELÈFON / WHATSAPP === def create_phone_verification_session( self, *, page: str, user_id: Optional[str] = None, email: Optional[str] = None, action: Optional[str] = None, ) -> Optional[Dict[str, Any]]: """Crea una sessió de verificació de telèfon al backend de compliance. Endpoint: POST /phone_verification/create_session """ payload: Dict[str, Any] = { "page": page, "user_id": user_id, "email": email, "action": action, } return self._make_request("POST", "/phone_verification/create_session", payload) def _show_demo_login(self) -> bool: """Muestra formulario de login por SMS en ventana desplegable cuando OAuth no está configurado""" # Inicializar estado SMS si no existe if 'sms_step' not in st.session_state: st.session_state.sms_step = 'phone' st.session_state.sms_code = None st.session_state.sms_phone = None st.markdown(""" ### 🔐 Iniciar sessió (Verificació per SMS) Per continuar, necessitas identificar-te i acceptar les condicions d'ús. """) # Ventana desplegable con términos, teléfono y aceptación with st.expander("📋 Identificació i Acceptació de Condicions", expanded=False): st.markdown(""" ### **Condicions d'ús del sistema Veureu (provisional)** En iniciar sessió i pujar un vídeo al sistema Veureu, l'usuari declara i accepta el següent: **📋 Declaracions:** - Declara ser titular dels drets d'ús i difusió del vídeo, o disposar de l'autorització expressa dels titulars. - Declara que totes les persones identificables en el vídeo han atorgat el seu consentiment informat per a la seva utilització amb finalitats d'accessibilitat i audiodescripció. - Declara que el vídeo no conté: - Contingut violent, sexual o d'odi, - Informació confidencial o de caràcter sensible, - Imatges de menors sense consentiment parental verificable. **✅ Acceptacions:** - Accepta que es processi el vídeo exclusivament amb finalitats de generació i validació d'audiodescripcions, conforme a la normativa UNE-153010:2020 i al RGPD. - Accepta que el sistema pugui enviar les dades necessàries a proveïdors tecnològics externs (p. ex., models d'IA) que actuen com a encarregats de tractament, sota clàusules contractuals tipus i sense reutilització de dades. - Accepta que les accions realitzades (pujada, acceptació, validació, revocació) siguin registrades en un sistema immutable (AWS QLDB) mitjançant identificadors no personals. - Pot exercir en qualsevol moment el seu dret a revocar el consentiment mitjançant el botó "Revocar permisos", el que eliminarà el material audiovisual i deixarà constància de la revocació en el registre. - Accepta que, fins a la validació interna del material per part de l'equip Dive, el vídeo romandrà en estat "pendent de validació" i no serà utilitzat públicament. """) st.markdown("---") st.markdown("#### 📱 Dades d'identificació") # Selector de país y número de teléfono col_country, col_phone = st.columns([1, 3]) with col_country: country_code = st.selectbox("País", [ ("🇪🇸 +34", "+34"), ("🇫🇷 +33", "+33"), ("🇬🇧 +44", "+44"), ("🇩🇪 +49", "+49"), ("🇮🇹 +39", "+39") ], format_func=lambda x: x[0])[1] with col_phone: phone_number = st.text_input("Número de telèfon", placeholder="600 123 000", max_chars=15) # Checkbox para aceptar términos accept_terms = st.checkbox("✅ Accepto les condicions d'ús i la política de privadesa", key="sms_accept_terms") # Botón para enviar código sms_sent = False if st.button("📤 Enviar codi SMS", type="primary", disabled=not (phone_number and accept_terms)): full_phone = f"{country_code}{phone_number}" if self._send_sms_code(full_phone): st.session_state.sms_step = 'verify' st.session_state.sms_phone = full_phone st.success(f"✅ Codis enviat a {full_phone}") sms_sent = True else: st.error("❌ Error enviant el codi. Torna-ho a intentar.") # Si se envió el SMS, mostrar campo para verificar código if sms_sent or st.session_state.sms_step == 'verify': st.markdown("#### 🔓 Verificació del codi") if st.session_state.sms_code: st.info(f"💡 **Mode demo**: El codi és '{st.session_state.sms_code}'") col_code, col_resend = st.columns([2, 1]) with col_code: verification_code = st.text_input("Codi de 6 dígits", max_chars=6, placeholder="000000") with col_resend: st.markdown("
", unsafe_allow_html=True) if st.button("🔄 Reenviar", key="resend_in_expander"): if self._send_sms_code(st.session_state.sms_phone): st.success("✅ Nou codi enviat") else: st.error("❌ Error enviant el codi") if st.button("🔓 Verificar i continuar", type="primary"): if verification_code == st.session_state.sms_code: # Crear sesión autenticada demo_token = f"sms_token_{int(time.time())}" st.session_state.auth_token = demo_token st.session_state.current_user = { "email": f"user@{st.session_state.sms_phone.replace('+', '')}.sms", "name": "Usuario SMS", "phone": st.session_state.sms_phone } st.success("✅ Sessió iniciada correctament") # Resetear estado SMS st.session_state.sms_step = 'phone' st.session_state.sms_code = None st.session_state.sms_phone = None st.rerun() return True else: st.error(f"❌ Codi incorrecte. El codi correcte és: {st.session_state.sms_code}") return False def _send_sms_code(self, phone_number: str) -> bool: """Simula el envío de código SMS (en producción usaría un servicio real)""" try: # Generar código aleatorio de 6 dígitos code = f"{random.randint(100000, 999999)}" st.session_state.sms_code = code # En modo demo, siempre mostramos el código st.info(f"🔧 **Mode demo**: Codi generat: {code}") logger.info(f"[SMS] Código generado para {phone_number}: {code}") # Simular envío exitoso return True except Exception as e: logger.error(f"[SMS] Error enviando código: {e}") return False def logout(self) -> bool: """Cierra sesión del usuario actual""" if "auth_token" in st.session_state: token = st.session_state.auth_token success = self.logout_user(token) # Limpiar session local siempre if "auth_token" in st.session_state: del st.session_state.auth_token if "current_user" in st.session_state: del st.session_state.current_user logger.info("Sesión cerrada") return success return True # === MÉTODOS DE CUMPLIMIENTO (QLDB/POLYGON) === def record_consent(self, user_info: Dict[str, Any], video_info: Dict[str, Any], consent_data: Dict[str, Any]) -> Optional[str]: """Registra consentimiento de usuario vía API""" payload = { "user_info": user_info, "video_info": video_info, "consent_data": consent_data } response = self._make_request("POST", "/api/compliance/record-consent", payload) if response: document_id = response.get("document_id") logger.info(f"Consentimiento registrado: {document_id}") return document_id return None def notify_video_upload(self, video_name: str, sha1sum: str) -> bool: """Notifica al servei de compliance que s'ha pujat un nou vídeo. Això desencadena l'enviament d'un SMS al validor de vídeos. """ payload = {"video_name": video_name, "sha1sum": sha1sum} response = self._make_request( "POST", "/api/notifications/video-upload-sms", payload ) return bool(response and response.get("success")) def notify_user_video_approved(self, phone: str, message: str, sha1sum: str) -> bool: """Envia un SMS a l'usuari indicant que el seu vídeo ha estat aprovat. El backend de compliance decidirà si utilitza Twilio o Zapier segons la configuració (twilio_enabled / zapier_enabled). """ payload = {"phone": phone, "message": message, "sha1sum": sha1sum} response = self._make_request( "POST", "/api/notifications/user-video-approved-sms", payload ) return bool(response and response.get("success")) def notify_une_validator_new_ads(self, phone: str, message: str) -> bool: """Envia un SMS al validador UNE indicant que hi ha noves AD per validar. El backend de compliance s'encarrega de triar Twilio o Zapier segons la configuració (twilio_enabled / zapier_enabled). """ payload = {"phone": phone, "message": message} response = self._make_request( "POST", "/api/notifications/une-validation-sms", payload ) return bool(response and response.get("success")) def send_login_sms(self, phone: str, code: str) -> bool: """Envia un SMS de verificació de login a través del servei de compliance. Demo només passa telèfon i codi; compliance s'encarrega de Zapier. """ payload = {"phone": phone, "code": code} response = self._make_request("POST", "/api/notifications/login-sms", payload) return bool(response and response.get("success")) def publish_events_digest(self, session_id: str, digest_hash: str) -> Optional[Dict[str, Any]]: """Publica el digest d'esdeveniments d'una sessió en blockchain. Retorna un dict amb, si té èxit: - transaction_hash - transaction_url """ payload = { "session_id": session_id, "digest_hash": digest_hash, } response = self._make_request( "POST", "/api/blockchain/publish-events-digest", payload ) if response: return response return None def publish_actions_qldb(self, session_id: str, actions: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]: """Envia el registre de canvis d'actions.db a una taula QLDB. El backend és responsable d'escriure aquest payload a AWS QLDB quan la funcionalitat de blockchain privada estigui activada. """ payload = { "session_id": session_id, "actions": actions, } response = self._make_request( "POST", "/api/blockchain/publish-actions-qldb", payload ) if response: return response return None def send_validation_request(self, validation_request: Dict[str, Any]) -> bool: """Envía solicitud de validación a validadores""" response = self._make_request("POST", "/api/compliance/send-validation", validation_request) if response: logger.info(f"Solicitud de validación enviada: {validation_request.get('document_id')}") return True return False def record_validator_decision(self, document_id: str, validator_email: str, decision: str, comments: str = "") -> bool: """Registra decisión de validador""" payload = { "document_id": document_id, "validator_email": validator_email, "decision": decision, "comments": comments } response = self._make_request("POST", "/api/compliance/record-decision", payload) if response: logger.info(f"Decisión registrada: {document_id} -> {decision}") return True return False # === MÉTODOS DE BLOCKCHAIN (POLYGON) === def publish_monthly_digest(self, period: str, digest_hash: str | None = None) -> Optional[str]: """Publica digest mensual en blockchain. Si digest_hash no és None, s'envia també al backend perquè el faci servir com a arrel del digest mensual. """ payload: Dict[str, Any] = {"period": period} if digest_hash: payload["digest_hash"] = digest_hash response = self._make_request("POST", "/api/blockchain/publish-digest", payload) if response: tx_hash = response.get("transaction_hash") logger.info(f"Digest publicado: {period} -> {tx_hash}") return tx_hash return None def get_published_digests(self) -> List[Dict[str, Any]]: """Obtiene lista de digest publicados""" response = self._make_request("GET", "/api/blockchain/digests") if response: digests = response.get("digests", []) logger.info(f"Obtenidos {len(digests)} digest publicados") return digests return [] def verify_digest(self, period: str, expected_hash: str) -> bool: """Verifica integridad de digest en blockchain""" payload = { "period": period, "expected_hash": expected_hash } response = self._make_request("POST", "/api/blockchain/verify-digest", payload) if response: is_valid = response.get("valid", False) logger.info(f"Digest verificado: {period} -> {'VÁLIDO' if is_valid else 'INVÁLIDO'}") return is_valid return False def get_compliance_stats(self) -> Dict[str, Any]: """Obtiene estadísticas de cumplimiento""" response = self._make_request("GET", "/api/compliance/stats") if response: logger.info("Estadísticas de cumplimiento obtenidas") return response return {} def health_check(self) -> bool: """Verifica si el servicio de compliance está disponible""" response = self._make_request("GET", "/") if response: status = response.get("status") if status == "running": logger.info("Servicio compliance funcionando correctamente") return True logger.warning("Servicio compliance no disponible") return False def send_decision_notification(self, notification: Dict[str, Any]) -> bool: """Envía notificación de decisión de validación""" response = self._make_request("POST", "/api/compliance/send-decision-notification", notification) if response: logger.info(f"Notificación de decisión enviada: {notification.get('document_id')}") return True return False # Instancia global del cliente compliance_client = ComplianceClient()