#!/usr/bin/env python3 """ Enhanced Gradio Demo for UMAP Visualization and 3D Geometry Viewer Adds functionality to upload any geometry, get confidence score, and highlight in plot """ import os import sys import numpy as np import pandas as pd import tempfile import plotly.graph_objects as go import plotly.express as px from pathlib import Path import vtk import pyvista as pv from typing import List, Tuple, Optional, Dict, Any import gradio as gr import json import time from scipy.spatial import cKDTree from sklearn.preprocessing import StandardScaler from sklearn.neighbors import NearestNeighbors # HDBSCAN imports try: import hdbscan from hdbscan import prediction HDBSCAN_AVAILABLE = True except ImportError: HDBSCAN_AVAILABLE = False print("⚠️ HDBSCAN not available, will use distance-based confidence scoring") # Set PyVista to use offscreen rendering pv.set_plot_theme("document") def load_umap_data() -> Tuple[np.ndarray, List[str], List[str], np.ndarray, List[str]]: """Load UMAP analysis results with dynamic file names.""" try: # Try to find the files with dynamic naming in files/ directory possible_embedding_files = [ "files/umap_embedding.npy", "files/combined_umap_embedding.npy", ] possible_labels_files = [ "files/names.npy", "files/combined_names.npy", ] possible_features_files = [ "files/features.npy", "files/combined_features.npy", ] possible_dataset_labels_files = [ "files/labels_dataset_labels.npy", "files/combined_dataset_labels.npy", ] # Also check for single dataset files import glob single_dataset_embedding = glob.glob("files/*_umap_embedding.npy") single_dataset_labels = glob.glob("files/*_names.npy") single_dataset_features = glob.glob("files/*_features.npy") single_dataset_dataset_labels = glob.glob("files/*_dataset_labels.npy") # Add single dataset files to the search lists possible_embedding_files.extend(single_dataset_embedding) possible_labels_files.extend(single_dataset_labels) possible_features_files.extend(single_dataset_features) possible_dataset_labels_files.extend(single_dataset_dataset_labels) # Find the first existing file for each type embedding_file = None for file in possible_embedding_files: if os.path.exists(file): embedding_file = file break labels_file = None for file in possible_labels_files: if os.path.exists(file): labels_file = file break features_file = None for file in possible_features_files: if os.path.exists(file): features_file = file break dataset_labels_file = None for file in possible_dataset_labels_files: if os.path.exists(file): dataset_labels_file = file break if not all([embedding_file, labels_file, features_file]): raise FileNotFoundError("Required UMAP data files not found") # Load embedding coordinates embedding = np.load(embedding_file) print(f"✅ Loaded UMAP embedding from {embedding_file}: {embedding.shape}") # Load data names data_names = np.load(labels_file, allow_pickle=True).tolist() print(f"✅ Loaded {len(data_names)} data names from {labels_file}") # Load dataset labels if dataset_labels_file and os.path.exists(dataset_labels_file): dataset_labels = np.load(dataset_labels_file, allow_pickle=True).tolist() print(f"✅ Loaded dataset labels from {dataset_labels_file}: {len(dataset_labels)} labels") else: dataset_labels = ["Unknown"] * len(data_names) print("⚠️ No dataset labels found, using 'Unknown'") # Load features features = np.load(features_file) print(f"✅ Loaded features from {features_file}: {features.shape}") return embedding, data_names, dataset_labels, features except FileNotFoundError as e: print(f"❌ Error loading data: {e}") print("Please run the PhysicsNeMo analysis first!") return None, None, None, None, None except Exception as e: print(f"❌ Unexpected error: {e}") return None, None, None, None, None def load_umap_components() -> Tuple[Any, Any, Any]: """Load saved UMAP reducer, scaler, and HDBSCAN clusterer components.""" try: import pickle import glob # Try to find the saved components with dynamic naming possible_reducer_files = [ "files/umap_reducer.pkl", "files/combined_umap_reducer.pkl", ] possible_scaler_files = [ "files/scaler.pkl", "files/combined_scaler.pkl", ] possible_clusterer_files = [ "files/hdbscan_clusterer.pkl", "files/combined_hdbscan_clusterer.pkl", ] # Also check for single dataset files single_dataset_reducer = glob.glob("files/*_umap_reducer.pkl") single_dataset_scaler = glob.glob("files/*_scaler.pkl") single_dataset_clusterer = glob.glob("files/*_hdbscan_clusterer.pkl") # Add single dataset files to the search lists possible_reducer_files.extend(single_dataset_reducer) possible_scaler_files.extend(single_dataset_scaler) possible_clusterer_files.extend(single_dataset_clusterer) # Find the first existing file for each type reducer_file = None for file in possible_reducer_files: if os.path.exists(file): reducer_file = file break scaler_file = None for file in possible_scaler_files: if os.path.exists(file): scaler_file = file break clusterer_file = None for file in possible_clusterer_files: if os.path.exists(file): clusterer_file = file break if not all([reducer_file, scaler_file]): raise FileNotFoundError("Required UMAP component files not found") # Load UMAP reducer with open(reducer_file, 'rb') as f: umap_reducer = pickle.load(f) print(f"✅ Loaded UMAP reducer from {reducer_file}") # Load scaler with open(scaler_file, 'rb') as f: scaler = pickle.load(f) print(f"✅ Loaded scaler from {scaler_file}") # Load HDBSCAN clusterer (optional) clusterer = None if clusterer_file: with open(clusterer_file, 'rb') as f: clusterer = pickle.load(f) print(f"✅ Loaded HDBSCAN clusterer from {clusterer_file}") else: print("⚠️ No HDBSCAN clusterer found, will use distance-based confidence scoring") return umap_reducer, scaler, clusterer except FileNotFoundError as e: print(f"❌ Error loading UMAP components: {e}") print("Please run the PhysicsNeMo analysis first to generate the components!") return None, None, None except Exception as e: print(f"❌ Unexpected error loading UMAP components: {e}") return None, None, None def load_config() -> Dict[str, Any]: """Load configuration from YAML file.""" try: import yaml with open("config.yaml", 'r') as f: config = yaml.safe_load(f) return config except Exception as e: print(f"⚠️ Could not load config: {e}") return {} def extract_geometry_features(mesh) -> np.ndarray: """Extract features from uploaded geometry for comparison with UMAP data.""" try: features = {} # Bounding box bounds = mesh.bounds x_length = bounds[1] - bounds[0] y_length = bounds[3] - bounds[2] z_length = bounds[5] - bounds[4] features['x_length'] = x_length features['y_length'] = y_length features['z_length'] = z_length # Aspect ratios features['aspect_ratio_xy'] = x_length / y_length if y_length > 0 else 0 features['aspect_ratio_xz'] = x_length / z_length if z_length > 0 else 0 features['aspect_ratio_yz'] = y_length / z_length if z_length > 0 else 0 print(f"✅ Aspect ratios: {features['aspect_ratio_xy']}, {features['aspect_ratio_xz']}, {features['aspect_ratio_yz']}") # Cennter the points centered_points = mesh.points - np.mean(mesh.points, axis=0) print(f"✅ Centered points: {centered_points}") # Compute covariance matrix cov_matrix = np.cov(centered_points.T) # Eigenvalues and eigenvectors eigenvalues, eigenvectors = np.linalg.eig(cov_matrix) eigenvalues = np.sort(eigenvalues)[::-1] print(f"✅ Eigenvalues: {eigenvalues}") if len(eigenvalues) >= 3 and eigenvalues[0] > 0: # Shape descriptors features['linearity'] = (eigenvalues[0] - eigenvalues[1]) / eigenvalues[0] features['planarity'] = (eigenvalues[1] - eigenvalues[2]) / eigenvalues[0] features['sphericity'] = eigenvalues[2] / eigenvalues[0] features['anisotropy'] = (eigenvalues[0] - eigenvalues[2]) / eigenvalues[0] features['omnivariance'] = (eigenvalues[0] * eigenvalues[1] * eigenvalues[2])**(1/3) # Eigenentropy normalized_eigenvals = eigenvalues / np.sum(eigenvalues) features['eigenentropy'] = -np.sum(normalized_eigenvals * np.log(normalized_eigenvals + 1e-10)) # Change of curvature features['change_of_curvature'] = eigenvalues[2] / (eigenvalues[0] + eigenvalues[1] + eigenvalues[2]) # Add PCA eigenvalues (PhysicsNeMo style) features['pca_eigenvalue_1'] = eigenvalues[0] features['pca_eigenvalue_2'] = eigenvalues[1] features['pca_eigenvalue_3'] = eigenvalues[2] print(f"✅ PCA eigenvalues: {features['pca_eigenvalue_1']}, {features['pca_eigenvalue_2']}, {features['pca_eigenvalue_3']}") return np.array(features) except Exception as e: print(f"❌ Error extracting features: {e}") # Return default features return np.zeros(17) def compute_confidence_score(uploaded_features: np.ndarray, reference_features: np.ndarray, embedding: np.ndarray, scaler: Any, umap_reducer: Any, hdbscan_clusterer: Any = None, k: int = 5) -> Tuple[float, int]: """ Compute confidence score for uploaded geometry using HDBSCAN density-based probabilities or UMAP embedding distances. Args: uploaded_features: Features of uploaded geometry reference_features: Reference features from UMAP data (training data) embedding: UMAP embedding coordinates of training data scaler: Pre-fitted scaler from training umap_reducer: Pre-fitted UMAP reducer from training hdbscan_clusterer: Pre-fitted HDBSCAN clusterer from training (optional) k: Number of nearest neighbors to consider (fallback method) Returns: Tuple of (confidence_score, nearest_neighbor_index) """ try: uploaded_features = np.array(list(uploaded_features.item().values()), dtype=float) # Transform uploaded features to UMAP space using the trained components uploaded_features_scaled = scaler.transform(uploaded_features.reshape(1, -1)) uploaded_embedding = umap_reducer.transform(uploaded_features_scaled) # Use HDBSCAN density-based confidence if clusterer is available if hdbscan_clusterer is not None and HDBSCAN_AVAILABLE: # Use HDBSCAN approximate prediction for density-based confidence pred_labels, pred_probs = prediction.approximate_predict(hdbscan_clusterer, uploaded_embedding) confidence_score = pred_probs[0] else: print("⚠️ HDBSCAN clusterer not available") return confidence_score except Exception as e: print(f"❌ Error computing confidence score: {e}") return 0.0, 0 def predict_umap_position(uploaded_features: np.ndarray, reference_features: np.ndarray, embedding: np.ndarray, scaler: Any, umap_reducer: Any, k: int = 5) -> Tuple[float, float]: """ Predict UMAP position for uploaded geometry using the trained UMAP reducer. Args: uploaded_features: Features of uploaded geometry reference_features: Reference features from UMAP data embedding: UMAP embedding coordinates scaler: Pre-fitted scaler from training umap_reducer: Pre-fitted UMAP reducer from training k: Number of nearest neighbors to consider (fallback method) Returns: Tuple of (predicted_x, predicted_y) coordinates """ # try: uploaded_features = np.array(list(uploaded_features.item().values()), dtype=float) # Use the pre-fitted scaler from training uploaded_features_scaled = scaler.transform(uploaded_features.reshape(1, -1)) # Use the trained UMAP reducer to transform the uploaded features predicted_embedding = umap_reducer.transform(uploaded_features_scaled) predicted_x = predicted_embedding[0, 0] predicted_y = predicted_embedding[0, 1] return predicted_x, predicted_y def create_interactive_umap_plot(embedding: np.ndarray, data_names: List[str], dataset_labels: List[str], config: Dict[str, Any], highlighted_index: Optional[int] = None, uploaded_position: Optional[Tuple[float, float]] = None, uploaded_name: str = "Uploaded Geometry") -> go.Figure: """Create interactive UMAP plot with dataset colors and highlighting.""" # Create hover text hover_text = [] for i, (name, dataset) in enumerate(zip(data_names, dataset_labels)): hover_text.append( f"{name}
" f"Dataset: {dataset}
" f"UMAP: ({embedding[i, 0]:.3f}, {embedding[i, 1]:.3f})
" f"Click to view 3D geometry" ) # Create figure fig = go.Figure() # Get unique datasets and their colors from config unique_datasets = list(set(dataset_labels)) # Create dataset to color mapping from config dataset_color_map = {} data_folders = config.get('data', {}).get('data_folders', []) for folder in data_folders: dataset_color_map[folder['label']] = folder['color'] # Default colors if not in config default_colors = ['blue', 'red', 'green', 'orange', 'purple', 'brown', 'pink', 'gray', 'olive', 'cyan'] # Plot each dataset separately for dataset_idx, dataset in enumerate(unique_datasets): dataset_mask = np.array(dataset_labels) == dataset dataset_indices = np.where(dataset_mask)[0] # Get color from config or use default color = dataset_color_map.get(dataset, default_colors[dataset_idx % len(default_colors)]) # Determine marker size and opacity marker_size = 10 marker_opacity = 0.8 # Highlight specific point if requested if highlighted_index is not None and highlighted_index in dataset_indices: marker_size = 15 marker_opacity = 1.0 color = 'yellow' # Highlight in yellow fig.add_trace(go.Scatter( x=embedding[dataset_mask, 0], y=embedding[dataset_mask, 1], mode='markers', marker=dict( size=marker_size, color=color, line=dict(width=1, color='black'), opacity=marker_opacity ), name=dataset, hovertemplate=[hover_text[i] for i in dataset_indices], customdata=dataset_indices, showlegend=True )) # Add uploaded geometry position if provided if uploaded_position is not None: fig.add_trace(go.Scatter( x=[uploaded_position[0]], y=[uploaded_position[1]], mode='markers', marker=dict( size=20, color='red', symbol='star', line=dict(width=2, color='black'), opacity=1.0 ), name=uploaded_name, hovertemplate=f"{uploaded_name}
" f"Predicted Position: ({uploaded_position[0]:.3f}, {uploaded_position[1]:.3f})
" f"Uploaded Geometry", showlegend=True )) # Update layout fig.update_layout( title=dict( text="Interactive UMAP Visualization with Upload Support", x=0.5, font=dict(size=18, color='darkblue') ), xaxis_title="UMAP Component 1", yaxis_title="UMAP Component 2", width=600, height=500, hovermode='closest', showlegend=True, legend=dict( orientation="v", yanchor="top", y=1, xanchor="left", x=1.02 ) ) return fig def load_vtp_file(vtp_file: str) -> Optional[pv.DataSet]: """Load VTP file and return PyVista dataset.""" try: if not os.path.exists(vtp_file): print(f"❌ File not found: {vtp_file}") return None mesh = pv.read(vtp_file) print(f"✅ Loaded mesh: {mesh.n_points} points, {mesh.n_cells} cells") return mesh except Exception as e: print(f"❌ Error loading VTP file: {e}") return None def display_data_geometry(mesh: pv.DataSet, data_name: str) -> Optional[go.Figure]: """Display 3D geometry using Plotly.""" try: # Convert PyVista mesh to Plotly fig = go.Figure() # Get points and faces points = mesh.points # Sample points if too many for performance if len(points) > 20000: indices = np.random.choice(len(points), 20000, replace=False) points = points[indices] fig = go.Figure(data=[go.Scatter3d( x=points[:, 0], y=points[:, 1], z=points[:, 2], mode='markers', marker=dict( size=3, color=points[:, 2], colorscale='Viridis', opacity=0.8, line=dict(width=0.2, color='rgba(0,0,0,0.3)'), showscale=True, colorbar=dict( title="Height (Z)" ) ), name='Point Cloud', hovertemplate='3D Point
' + 'X: %{x:.2f}
' + 'Y: %{y:.2f}
' + 'Z: %{z:.2f}
' + '' )]) fig.update_layout( title=dict( text=f"3D Geometry: {data_name}", x=0.5, font=dict(size=16, color='darkblue') ), scene=dict( xaxis=dict( title='X', gridcolor='lightgray', showbackground=True, backgroundcolor='rgba(240,240,240,0.1)' ), yaxis=dict( title='Y', gridcolor='lightgray', showbackground=True, backgroundcolor='rgba(240,240,240,0.1)' ), zaxis=dict( title='Z', gridcolor='lightgray', showbackground=True, backgroundcolor='rgba(240,240,240,0.1)' ), camera=dict( eye=dict(x=1.8, y=1.8, z=1.2), center=dict(x=0, y=0, z=0), up=dict(x=0, y=0, z=1) ), aspectmode='data', bgcolor='white' ), width=600, height=500, margin=dict(l=20, r=20, t=60, b=20), paper_bgcolor='white', plot_bgcolor='white' ) return fig except Exception as e: print(f"❌ Error creating 3D visualization: {e}") return None def process_uploaded_geometry(uploaded_file, embedding, data_names, dataset_labels, features, config, scaler, umap_reducer, hdbscan_clusterer): """Process uploaded geometry and return confidence score and updated plot.""" if uploaded_file is None: return None, "No file uploaded", None try: print(f"📁 Processing uploaded file: {uploaded_file.name}") # Load the uploaded geometry mesh = pv.read(uploaded_file.name) print(f"✅ Loaded mesh: {mesh.n_points} points, {mesh.n_cells} cells") # Extract features uploaded_features = extract_geometry_features(mesh) # Compute confidence score using the trained scaler, UMAP reducer, and HDBSCAN clusterer confidence_score = compute_confidence_score( uploaded_features, features, embedding, scaler, umap_reducer, hdbscan_clusterer, k=4 ) # Predict UMAP position using the trained UMAP reducer predicted_x, predicted_y = predict_umap_position( uploaded_features, features, embedding, scaler, umap_reducer, k=4 ) # Create confidence report confidence_report = f""" **🎯 Confidence Analysis Results** **📊 Confidence Score: {float(confidence_score):.3f}** """ # Create updated plot with uploaded geometry highlighted fig = create_interactive_umap_plot( embedding, data_names, dataset_labels, config, highlighted_index=None, uploaded_position=(predicted_x, predicted_y), uploaded_name="Your Upload" ) # Create 3D visualization of uploaded geometry fig_3d = display_data_geometry(mesh, "Uploaded Geometry") return fig, confidence_report, fig_3d except Exception as e: error_msg = f"❌ Error processing uploaded geometry: {str(e)}" print(error_msg) return None, error_msg, None def create_gradio_interface(): """Create the enhanced Gradio interface.""" # Load data embedding, data_names, dataset_labels, features = load_umap_data() if embedding is None: print("❌ Failed to load UMAP data") return None # Load UMAP components (reducer, scaler, and HDBSCAN clusterer) umap_reducer, scaler, hdbscan_clusterer = load_umap_components() if umap_reducer is None or scaler is None: print("❌ Failed to load UMAP components") return None # Load config config = load_config() # Create initial UMAP plot umap_fig = create_interactive_umap_plot(embedding, data_names, dataset_labels, config) def update_geometry_from_selection(selected_data): """Update geometry display when selection changes.""" if selected_data is None or selected_data not in data_names: return None, umap_fig selected_index = data_names.index(selected_data) # Find VTP file for selected data vtp_file = None for dataset in set(dataset_labels): dataset_path = f"/raid/ansysai/udbhav/alpha_Xdata/data_prep_transformer/{dataset}/1_VTK_surface" potential_file = os.path.join(dataset_path, selected_data, f"{selected_data}.vtp") if os.path.exists(potential_file): vtp_file = potential_file break if vtp_file is None: return None, umap_fig # Load the mesh mesh = load_vtp_file(vtp_file) if mesh is None: return None, umap_fig # Create 3D visualization fig_3d = display_data_geometry(mesh, selected_data) if fig_3d is None: return None, umap_fig # Create updated UMAP plot with highlighted point fig_umap = create_interactive_umap_plot( embedding, data_names, dataset_labels, config, highlighted_index=selected_index ) return fig_3d, fig_umap # Create the interface with gr.Blocks(title="📊 Enhanced UMAP Visualization Demo", theme=gr.themes.Soft()) as demo: gr.Markdown("# 📊 Enhanced Interactive UMAP Visualization Demo") gr.Markdown("Upload any 3D geometry to get confidence scores and see where it fits in the UMAP space!") with gr.Row(): with gr.Column(scale=1): # UMAP Visualization gr.Markdown("## 📊 UMAP Embedding Visualization") umap_plot = gr.Plot(umap_fig, label="UMAP Plot") # Upload section gr.Markdown("## 📁 Upload Your Geometry") uploaded_file = gr.File( label="Upload 3D Geometry", file_types=[".vtp", ".stl", ".vtk", ".obj", ".ply", ".glb", ".vtu"], type="filepath" ) upload_btn = gr.Button("🔍 Analyze Uploaded Geometry", variant="primary") # Confidence results confidence_output = gr.Markdown(label="Confidence Analysis") with gr.Column(scale=1): # 3D Geometry Viewer gr.Markdown("## 🎯 3D Geometry Viewer") # Quick selection input gr.Markdown("### Quick Selection") selection_dropdown = gr.Dropdown( choices=data_names, label="Select Data Item", info="Type to search or select from dropdown", allow_custom_value=True, value=None ) geometry_plot = gr.Plot(label="3D Geometry") # Uploaded geometry viewer gr.Markdown("### Uploaded Geometry") uploaded_geometry_plot = gr.Plot(label="Uploaded 3D Geometry") # Event handlers selection_dropdown.change( fn=update_geometry_from_selection, inputs=[selection_dropdown], outputs=[geometry_plot, umap_plot] ) upload_btn.click( fn=process_uploaded_geometry, inputs=[uploaded_file, gr.State(embedding), gr.State(data_names), gr.State(dataset_labels), gr.State(features), gr.State(config), gr.State(scaler), gr.State(umap_reducer), gr.State(hdbscan_clusterer)], outputs=[umap_plot, confidence_output, uploaded_geometry_plot] ) return demo def main(): """Main function to launch the enhanced Gradio demo.""" print("🚀 Launching Enhanced Gradio UMAP Visualization Demo...") # Check if we're in the right directory if not os.path.exists("config.yaml"): print("❌ Error: config.yaml not found. Please run this script from the umap directory.") return 1 # Check for required data files import glob embedding_files = glob.glob("files/*_umap_embedding.npy") + glob.glob("files/umap_embedding.npy") features_files = glob.glob("files/*_features.npy") + glob.glob("files/features.npy") labels_files = glob.glob("files/*_names.npy") + glob.glob("files/names.npy") reducer_files = glob.glob("files/*_umap_reducer.pkl") + glob.glob("files/umap_reducer.pkl") scaler_files = glob.glob("files/*_scaler.pkl") + glob.glob("files/scaler.pkl") clusterer_files = glob.glob("files/*_hdbscan_clusterer.pkl") + glob.glob("files/hdbscan_clusterer.pkl") if not embedding_files or not features_files or not labels_files: print("❌ Error: Required UMAP data files not found") print("Please run the PhysicsNeMo analysis first using: python run_umap.py") return 1 if not reducer_files or not scaler_files: print("❌ Error: Required UMAP component files not found") print("Please run the PhysicsNeMo analysis first using: python run_umap.py") return 1 if not clusterer_files: print("⚠️ Warning: HDBSCAN clusterer files not found") print("The demo will use distance-based confidence scoring instead of HDBSCAN density-based scoring") print("To enable HDBSCAN clustering, run: python run_umap.py") try: # Create and launch the interface demo = create_gradio_interface() if demo is None: return 1 demo.launch( server_name="0.0.0.0", server_port=7868, # Different port to avoid conflicts share=False, show_error=True, quiet=False ) return 0 except Exception as e: print(f"❌ Error launching demo: {e}") return 1 if __name__ == "__main__": sys.exit(main())