#!/usr/bin/env python3
"""
Enhanced Gradio Demo for UMAP Visualization and 3D Geometry Viewer
Adds functionality to upload any geometry, compute a confidence score, and highlight its predicted position in the UMAP plot
"""
import os
import sys
import numpy as np
import plotly.graph_objects as go
import pyvista as pv
from typing import List, Tuple, Optional, Dict, Any
import gradio as gr
from sklearn.neighbors import NearestNeighbors
# HDBSCAN imports
try:
import hdbscan
from hdbscan import prediction
HDBSCAN_AVAILABLE = True
except ImportError:
HDBSCAN_AVAILABLE = False
print("⚠️ HDBSCAN not available, will use distance-based confidence scoring")
# Use PyVista's "document" plot theme
pv.set_plot_theme("document")
def load_umap_data() -> Tuple[np.ndarray, List[str], List[str], np.ndarray]:
"""Load UMAP analysis results with dynamic file names."""
try:
# Try to find the files with dynamic naming in files/ directory
possible_embedding_files = [
"files/umap_embedding.npy",
"files/combined_umap_embedding.npy",
]
possible_labels_files = [
"files/names.npy",
"files/combined_names.npy",
]
possible_features_files = [
"files/features.npy",
"files/combined_features.npy",
]
possible_dataset_labels_files = [
"files/labels_dataset_labels.npy",
"files/combined_dataset_labels.npy",
]
# Also check for single dataset files
import glob
single_dataset_embedding = glob.glob("files/*_umap_embedding.npy")
single_dataset_labels = glob.glob("files/*_names.npy")
single_dataset_features = glob.glob("files/*_features.npy")
single_dataset_dataset_labels = glob.glob("files/*_dataset_labels.npy")
# Add single dataset files to the search lists
possible_embedding_files.extend(single_dataset_embedding)
possible_labels_files.extend(single_dataset_labels)
possible_features_files.extend(single_dataset_features)
possible_dataset_labels_files.extend(single_dataset_dataset_labels)
# Find the first existing file for each type
embedding_file = None
for file in possible_embedding_files:
if os.path.exists(file):
embedding_file = file
break
labels_file = None
for file in possible_labels_files:
if os.path.exists(file):
labels_file = file
break
features_file = None
for file in possible_features_files:
if os.path.exists(file):
features_file = file
break
dataset_labels_file = None
for file in possible_dataset_labels_files:
if os.path.exists(file):
dataset_labels_file = file
break
if not all([embedding_file, labels_file, features_file]):
raise FileNotFoundError("Required UMAP data files not found")
# Load embedding coordinates
embedding = np.load(embedding_file)
print(f"✅ Loaded UMAP embedding from {embedding_file}: {embedding.shape}")
# Load data names
data_names = np.load(labels_file, allow_pickle=True).tolist()
print(f"✅ Loaded {len(data_names)} data names from {labels_file}")
# Load dataset labels
if dataset_labels_file and os.path.exists(dataset_labels_file):
dataset_labels = np.load(dataset_labels_file, allow_pickle=True).tolist()
print(f"✅ Loaded dataset labels from {dataset_labels_file}: {len(dataset_labels)} labels")
else:
dataset_labels = ["Unknown"] * len(data_names)
print("⚠️ No dataset labels found, using 'Unknown'")
# Load features
features = np.load(features_file)
print(f"✅ Loaded features from {features_file}: {features.shape}")
return embedding, data_names, dataset_labels, features
    except FileNotFoundError as e:
        print(f"❌ Error loading data: {e}")
        print("Please run the PhysicsNeMo analysis first!")
        return None, None, None, None
    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        return None, None, None, None
def load_umap_components() -> Tuple[Any, Any, Any]:
"""Load saved UMAP reducer, scaler, and HDBSCAN clusterer components."""
try:
import pickle
import glob
# Try to find the saved components with dynamic naming
possible_reducer_files = [
"files/umap_reducer.pkl",
"files/combined_umap_reducer.pkl",
]
possible_scaler_files = [
"files/scaler.pkl",
"files/combined_scaler.pkl",
]
possible_clusterer_files = [
"files/hdbscan_clusterer.pkl",
"files/combined_hdbscan_clusterer.pkl",
]
# Also check for single dataset files
single_dataset_reducer = glob.glob("files/*_umap_reducer.pkl")
single_dataset_scaler = glob.glob("files/*_scaler.pkl")
single_dataset_clusterer = glob.glob("files/*_hdbscan_clusterer.pkl")
# Add single dataset files to the search lists
possible_reducer_files.extend(single_dataset_reducer)
possible_scaler_files.extend(single_dataset_scaler)
possible_clusterer_files.extend(single_dataset_clusterer)
# Find the first existing file for each type
reducer_file = None
for file in possible_reducer_files:
if os.path.exists(file):
reducer_file = file
break
scaler_file = None
for file in possible_scaler_files:
if os.path.exists(file):
scaler_file = file
break
clusterer_file = None
for file in possible_clusterer_files:
if os.path.exists(file):
clusterer_file = file
break
if not all([reducer_file, scaler_file]):
raise FileNotFoundError("Required UMAP component files not found")
# Load UMAP reducer
with open(reducer_file, 'rb') as f:
umap_reducer = pickle.load(f)
print(f"✅ Loaded UMAP reducer from {reducer_file}")
# Load scaler
with open(scaler_file, 'rb') as f:
scaler = pickle.load(f)
print(f"✅ Loaded scaler from {scaler_file}")
# Load HDBSCAN clusterer (optional)
clusterer = None
if clusterer_file:
with open(clusterer_file, 'rb') as f:
clusterer = pickle.load(f)
print(f"✅ Loaded HDBSCAN clusterer from {clusterer_file}")
else:
print("⚠️ No HDBSCAN clusterer found, will use distance-based confidence scoring")
return umap_reducer, scaler, clusterer
except FileNotFoundError as e:
print(f"❌ Error loading UMAP components: {e}")
print("Please run the PhysicsNeMo analysis first to generate the components!")
return None, None, None
except Exception as e:
print(f"❌ Unexpected error loading UMAP components: {e}")
return None, None, None
def load_config() -> Dict[str, Any]:
"""Load configuration from YAML file."""
try:
import yaml
with open("config.yaml", 'r') as f:
config = yaml.safe_load(f)
return config
except Exception as e:
print(f"⚠️ Could not load config: {e}")
return {}
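# The plotting code below looks up dataset labels and colors from the config. A
# minimal config.yaml shape that satisfies those lookups (illustrative sketch;
# the dataset names and colors are assumptions, not the project's actual data):
#
#   data:
#     data_folders:
#       - label: "DatasetA"
#         color: "blue"
#       - label: "DatasetB"
#         color: "red"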
def extract_geometry_features(mesh) -> np.ndarray:
    """Extract geometric features from an uploaded mesh for comparison with the UMAP data.

    Returns a 0-d object array wrapping a feature dict; callers unpack it via
    ``.item()`` to recover name -> value pairs in insertion order.
    """
    try:
        features = {}
# Bounding box
bounds = mesh.bounds
x_length = bounds[1] - bounds[0]
y_length = bounds[3] - bounds[2]
z_length = bounds[5] - bounds[4]
features['x_length'] = x_length
features['y_length'] = y_length
features['z_length'] = z_length
# Aspect ratios
features['aspect_ratio_xy'] = x_length / y_length if y_length > 0 else 0
features['aspect_ratio_xz'] = x_length / z_length if z_length > 0 else 0
features['aspect_ratio_yz'] = y_length / z_length if z_length > 0 else 0
print(f"✅ Aspect ratios: {features['aspect_ratio_xy']}, {features['aspect_ratio_xz']}, {features['aspect_ratio_yz']}")
# Cennter the points
centered_points = mesh.points - np.mean(mesh.points, axis=0)
print(f"✅ Centered points: {centered_points}")
# Compute covariance matrix
cov_matrix = np.cov(centered_points.T)
# Eigenvalues and eigenvectors
eigenvalues, eigenvectors = np.linalg.eig(cov_matrix)
eigenvalues = np.sort(eigenvalues)[::-1]
print(f"✅ Eigenvalues: {eigenvalues}")
if len(eigenvalues) >= 3 and eigenvalues[0] > 0:
# Shape descriptors
features['linearity'] = (eigenvalues[0] - eigenvalues[1]) / eigenvalues[0]
features['planarity'] = (eigenvalues[1] - eigenvalues[2]) / eigenvalues[0]
features['sphericity'] = eigenvalues[2] / eigenvalues[0]
features['anisotropy'] = (eigenvalues[0] - eigenvalues[2]) / eigenvalues[0]
features['omnivariance'] = (eigenvalues[0] * eigenvalues[1] * eigenvalues[2])**(1/3)
# Eigenentropy
normalized_eigenvals = eigenvalues / np.sum(eigenvalues)
features['eigenentropy'] = -np.sum(normalized_eigenvals * np.log(normalized_eigenvals + 1e-10))
# Change of curvature
features['change_of_curvature'] = eigenvalues[2] / (eigenvalues[0] + eigenvalues[1] + eigenvalues[2])
# Add PCA eigenvalues (PhysicsNeMo style)
features['pca_eigenvalue_1'] = eigenvalues[0]
features['pca_eigenvalue_2'] = eigenvalues[1]
features['pca_eigenvalue_3'] = eigenvalues[2]
print(f"✅ PCA eigenvalues: {features['pca_eigenvalue_1']}, {features['pca_eigenvalue_2']}, {features['pca_eigenvalue_3']}")
return np.array(features)
except Exception as e:
print(f"❌ Error extracting features: {e}")
# Return default features
return np.zeros(17)
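# Quick sanity check (sketch): run the feature extractor on a synthetic PyVista
# primitive; pv.Sphere() is just a stand-in for a real uploaded geometry.
#
#   feats = extract_geometry_features(pv.Sphere())
#   print(sorted(feats.item().keys()))   # feature names
#   print(len(feats.item()))             # vector length the scaler expects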
def compute_confidence_score(uploaded_features: np.ndarray, reference_features: np.ndarray,
embedding: np.ndarray, scaler: Any, umap_reducer: Any,
                             hdbscan_clusterer: Any = None, k: int = 5) -> float:
"""
Compute confidence score for uploaded geometry using HDBSCAN density-based probabilities or UMAP embedding distances.
Args:
uploaded_features: Features of uploaded geometry
reference_features: Reference features from UMAP data (training data)
embedding: UMAP embedding coordinates of training data
scaler: Pre-fitted scaler from training
umap_reducer: Pre-fitted UMAP reducer from training
hdbscan_clusterer: Pre-fitted HDBSCAN clusterer from training (optional)
k: Number of nearest neighbors to consider (fallback method)
Returns:
        Confidence score in [0, 1]; higher means the geometry lies in a denser, better-supported region
"""
    try:
        # Unpack the feature dict stored in the 0-d object array
        uploaded_features = np.array(list(uploaded_features.item().values()), dtype=float)
        # Transform uploaded features to UMAP space using the trained components
        uploaded_features_scaled = scaler.transform(uploaded_features.reshape(1, -1))
        uploaded_embedding = umap_reducer.transform(uploaded_features_scaled)
        # Use HDBSCAN density-based confidence if clusterer is available
        if hdbscan_clusterer is not None and HDBSCAN_AVAILABLE:
            # approximate_predict returns the predicted cluster label and the
            # membership probability; use the probability as the confidence
            # (requires the clusterer to have been fit with prediction_data=True)
            pred_labels, pred_probs = prediction.approximate_predict(hdbscan_clusterer, uploaded_embedding)
            confidence_score = float(pred_probs[0])
        else:
            # Fallback: distance-based confidence from the k nearest training
            # points in UMAP space; exp(-mean distance) maps distances to (0, 1]
            print("⚠️ HDBSCAN clusterer not available, using distance-based confidence")
            nn = NearestNeighbors(n_neighbors=min(k, len(embedding))).fit(embedding)
            distances, _ = nn.kneighbors(uploaded_embedding)
            confidence_score = float(np.exp(-distances.mean()))
        return confidence_score
    except Exception as e:
        print(f"❌ Error computing confidence score: {e}")
        return 0.0
def predict_umap_position(uploaded_features: np.ndarray, reference_features: np.ndarray,
embedding: np.ndarray, scaler: Any, umap_reducer: Any, k: int = 5) -> Tuple[float, float]:
"""
Predict UMAP position for uploaded geometry using the trained UMAP reducer.
Args:
uploaded_features: Features of uploaded geometry
reference_features: Reference features from UMAP data
embedding: UMAP embedding coordinates
scaler: Pre-fitted scaler from training
umap_reducer: Pre-fitted UMAP reducer from training
        k: Number of nearest neighbors (unused here; retained for signature symmetry)
Returns:
Tuple of (predicted_x, predicted_y) coordinates
"""
    # Unpack the feature dict stored in the 0-d object array
    uploaded_features = np.array(list(uploaded_features.item().values()), dtype=float)
# Use the pre-fitted scaler from training
uploaded_features_scaled = scaler.transform(uploaded_features.reshape(1, -1))
# Use the trained UMAP reducer to transform the uploaded features
predicted_embedding = umap_reducer.transform(uploaded_features_scaled)
predicted_x = predicted_embedding[0, 0]
predicted_y = predicted_embedding[0, 1]
return predicted_x, predicted_y
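# Example wiring (sketch, assuming the analysis artifacts exist under files/;
# "some_geometry.vtp" is a hypothetical file):
#
#   emb, names, ds_labels, feats = load_umap_data()
#   reducer, scaler, clusterer = load_umap_components()
#   up_feats = extract_geometry_features(pv.read("some_geometry.vtp"))
#   score = compute_confidence_score(up_feats, feats, emb, scaler, reducer, clusterer)
#   x, y = predict_umap_position(up_feats, feats, emb, scaler, reducer)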
def create_interactive_umap_plot(embedding: np.ndarray, data_names: List[str],
dataset_labels: List[str], config: Dict[str, Any],
highlighted_index: Optional[int] = None,
uploaded_position: Optional[Tuple[float, float]] = None,
uploaded_name: str = "Uploaded Geometry") -> go.Figure:
"""Create interactive UMAP plot with dataset colors and highlighting."""
    # Create hover text (Plotly uses <br> for line breaks in hover labels)
    hover_text = []
    for i, (name, dataset) in enumerate(zip(data_names, dataset_labels)):
        hover_text.append(
            f"{name}<br>"
            f"Dataset: {dataset}<br>"
            f"UMAP: ({embedding[i, 0]:.3f}, {embedding[i, 1]:.3f})<br>"
            f"Click to view 3D geometry"
        )
# Create figure
fig = go.Figure()
# Get unique datasets and their colors from config
unique_datasets = list(set(dataset_labels))
# Create dataset to color mapping from config
dataset_color_map = {}
data_folders = config.get('data', {}).get('data_folders', [])
for folder in data_folders:
dataset_color_map[folder['label']] = folder['color']
# Default colors if not in config
default_colors = ['blue', 'red', 'green', 'orange', 'purple', 'brown', 'pink', 'gray', 'olive', 'cyan']
# Plot each dataset separately
for dataset_idx, dataset in enumerate(unique_datasets):
dataset_mask = np.array(dataset_labels) == dataset
dataset_indices = np.where(dataset_mask)[0]
# Get color from config or use default
color = dataset_color_map.get(dataset, default_colors[dataset_idx % len(default_colors)])
        # Per-point marker styling so only the highlighted point stands out,
        # not the whole dataset trace
        marker_sizes = [10] * len(dataset_indices)
        marker_colors = [color] * len(dataset_indices)
        marker_opacity = 0.8
        # Highlight specific point if requested
        if highlighted_index is not None and highlighted_index in dataset_indices:
            pos = int(np.where(dataset_indices == highlighted_index)[0][0])
            marker_sizes[pos] = 15
            marker_colors[pos] = 'yellow'  # Highlight in yellow
            marker_opacity = 1.0
        fig.add_trace(go.Scatter(
            x=embedding[dataset_mask, 0],
            y=embedding[dataset_mask, 1],
            mode='markers',
            marker=dict(
                size=marker_sizes,
                color=marker_colors,
                line=dict(width=1, color='black'),
                opacity=marker_opacity
            ),
name=dataset,
hovertemplate=[hover_text[i] for i in dataset_indices],
customdata=dataset_indices,
showlegend=True
))
# Add uploaded geometry position if provided
if uploaded_position is not None:
fig.add_trace(go.Scatter(
x=[uploaded_position[0]],
y=[uploaded_position[1]],
mode='markers',
marker=dict(
size=20,
color='red',
symbol='star',
line=dict(width=2, color='black'),
opacity=1.0
),
name=uploaded_name,
hovertemplate=f"{uploaded_name}
"
f"Predicted Position: ({uploaded_position[0]:.3f}, {uploaded_position[1]:.3f})
"
f"Uploaded Geometry",
showlegend=True
))
# Update layout
fig.update_layout(
title=dict(
text="Interactive UMAP Visualization with Upload Support",
x=0.5,
font=dict(size=18, color='darkblue')
),
xaxis_title="UMAP Component 1",
yaxis_title="UMAP Component 2",
width=600,
height=500,
hovermode='closest',
showlegend=True,
legend=dict(
orientation="v",
yanchor="top",
y=1,
xanchor="left",
x=1.02
)
)
return fig
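# Standalone preview (sketch): render the scatter without launching Gradio.
# The output filename is a hypothetical example.
#
#   emb, names, ds_labels, feats = load_umap_data()
#   fig = create_interactive_umap_plot(emb, names, ds_labels, load_config())
#   fig.write_html("umap_preview.html")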
def load_vtp_file(vtp_file: str) -> Optional[pv.DataSet]:
"""Load VTP file and return PyVista dataset."""
try:
if not os.path.exists(vtp_file):
print(f"❌ File not found: {vtp_file}")
return None
mesh = pv.read(vtp_file)
print(f"✅ Loaded mesh: {mesh.n_points} points, {mesh.n_cells} cells")
return mesh
except Exception as e:
print(f"❌ Error loading VTP file: {e}")
return None
def display_data_geometry(mesh: pv.DataSet, data_name: str) -> Optional[go.Figure]:
"""Display 3D geometry using Plotly."""
try:
        # Render the mesh as a point cloud colored by height (Z)
        points = mesh.points
# Sample points if too many for performance
if len(points) > 20000:
indices = np.random.choice(len(points), 20000, replace=False)
points = points[indices]
fig = go.Figure(data=[go.Scatter3d(
x=points[:, 0],
y=points[:, 1],
z=points[:, 2],
mode='markers',
marker=dict(
size=3,
color=points[:, 2],
colorscale='Viridis',
opacity=0.8,
line=dict(width=0.2, color='rgba(0,0,0,0.3)'),
showscale=True,
colorbar=dict(
title="Height (Z)"
)
),
name='Point Cloud',
            hovertemplate='3D Point<br>' +
                          'X: %{x:.2f}<br>' +
                          'Y: %{y:.2f}<br>' +
                          'Z: %{z:.2f}' +
                          '<extra></extra>'
)])
fig.update_layout(
title=dict(
text=f"3D Geometry: {data_name}",
x=0.5,
font=dict(size=16, color='darkblue')
),
scene=dict(
xaxis=dict(
title='X',
gridcolor='lightgray',
showbackground=True,
backgroundcolor='rgba(240,240,240,0.1)'
),
yaxis=dict(
title='Y',
gridcolor='lightgray',
showbackground=True,
backgroundcolor='rgba(240,240,240,0.1)'
),
zaxis=dict(
title='Z',
gridcolor='lightgray',
showbackground=True,
backgroundcolor='rgba(240,240,240,0.1)'
),
camera=dict(
eye=dict(x=1.8, y=1.8, z=1.2),
center=dict(x=0, y=0, z=0),
up=dict(x=0, y=0, z=1)
),
aspectmode='data',
bgcolor='white'
),
width=600,
height=500,
margin=dict(l=20, r=20, t=60, b=20),
paper_bgcolor='white',
plot_bgcolor='white'
)
return fig
except Exception as e:
print(f"❌ Error creating 3D visualization: {e}")
return None
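# Preview a single mesh outside the app (sketch; the path is hypothetical):
#
#   mesh = load_vtp_file("some_geometry.vtp")
#   if mesh is not None:
#       display_data_geometry(mesh, "some_geometry").show()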
def process_uploaded_geometry(uploaded_file, embedding, data_names, dataset_labels, features, config, scaler, umap_reducer, hdbscan_clusterer):
"""Process uploaded geometry and return confidence score and updated plot."""
if uploaded_file is None:
return None, "No file uploaded", None
try:
print(f"📁 Processing uploaded file: {uploaded_file.name}")
# Load the uploaded geometry
mesh = pv.read(uploaded_file.name)
print(f"✅ Loaded mesh: {mesh.n_points} points, {mesh.n_cells} cells")
# Extract features
uploaded_features = extract_geometry_features(mesh)
# Compute confidence score using the trained scaler, UMAP reducer, and HDBSCAN clusterer
confidence_score = compute_confidence_score(
uploaded_features, features, embedding, scaler, umap_reducer, hdbscan_clusterer, k=4
)
# Predict UMAP position using the trained UMAP reducer
predicted_x, predicted_y = predict_umap_position(
uploaded_features, features, embedding, scaler, umap_reducer, k=4
)
# Create confidence report
confidence_report = f"""
**🎯 Confidence Analysis Results**
**📊 Confidence Score: {float(confidence_score):.3f}**
"""
# Create updated plot with uploaded geometry highlighted
fig = create_interactive_umap_plot(
embedding, data_names, dataset_labels, config,
highlighted_index=None,
uploaded_position=(predicted_x, predicted_y),
uploaded_name="Your Upload"
)
# Create 3D visualization of uploaded geometry
fig_3d = display_data_geometry(mesh, "Uploaded Geometry")
return fig, confidence_report, fig_3d
except Exception as e:
error_msg = f"❌ Error processing uploaded geometry: {str(e)}"
print(error_msg)
return None, error_msg, None
def create_gradio_interface():
"""Create the enhanced Gradio interface."""
# Load data
embedding, data_names, dataset_labels, features = load_umap_data()
if embedding is None:
print("❌ Failed to load UMAP data")
return None
# Load UMAP components (reducer, scaler, and HDBSCAN clusterer)
umap_reducer, scaler, hdbscan_clusterer = load_umap_components()
if umap_reducer is None or scaler is None:
print("❌ Failed to load UMAP components")
return None
# Load config
config = load_config()
# Create initial UMAP plot
umap_fig = create_interactive_umap_plot(embedding, data_names, dataset_labels, config)
def update_geometry_from_selection(selected_data):
"""Update geometry display when selection changes."""
if selected_data is None or selected_data not in data_names:
return None, umap_fig
selected_index = data_names.index(selected_data)
# Find VTP file for selected data
vtp_file = None
        # NOTE: environment-specific data root; adjust for your deployment
        for dataset in set(dataset_labels):
            dataset_path = f"/raid/ansysai/udbhav/alpha_Xdata/data_prep_transformer/{dataset}/1_VTK_surface"
potential_file = os.path.join(dataset_path, selected_data, f"{selected_data}.vtp")
if os.path.exists(potential_file):
vtp_file = potential_file
break
if vtp_file is None:
return None, umap_fig
# Load the mesh
mesh = load_vtp_file(vtp_file)
if mesh is None:
return None, umap_fig
# Create 3D visualization
fig_3d = display_data_geometry(mesh, selected_data)
if fig_3d is None:
return None, umap_fig
# Create updated UMAP plot with highlighted point
fig_umap = create_interactive_umap_plot(
embedding, data_names, dataset_labels, config,
highlighted_index=selected_index
)
return fig_3d, fig_umap
# Create the interface
with gr.Blocks(title="📊 Enhanced UMAP Visualization Demo", theme=gr.themes.Soft()) as demo:
gr.Markdown("# 📊 Enhanced Interactive UMAP Visualization Demo")
gr.Markdown("Upload any 3D geometry to get confidence scores and see where it fits in the UMAP space!")
with gr.Row():
with gr.Column(scale=1):
# UMAP Visualization
gr.Markdown("## 📊 UMAP Embedding Visualization")
umap_plot = gr.Plot(umap_fig, label="UMAP Plot")
# Upload section
gr.Markdown("## 📁 Upload Your Geometry")
uploaded_file = gr.File(
label="Upload 3D Geometry",
file_types=[".vtp", ".stl", ".vtk", ".obj", ".ply", ".glb", ".vtu"],
type="filepath"
)
upload_btn = gr.Button("🔍 Analyze Uploaded Geometry", variant="primary")
# Confidence results
confidence_output = gr.Markdown(label="Confidence Analysis")
with gr.Column(scale=1):
# 3D Geometry Viewer
gr.Markdown("## 🎯 3D Geometry Viewer")
# Quick selection input
gr.Markdown("### Quick Selection")
selection_dropdown = gr.Dropdown(
choices=data_names,
label="Select Data Item",
info="Type to search or select from dropdown",
allow_custom_value=True,
value=None
)
geometry_plot = gr.Plot(label="3D Geometry")
# Uploaded geometry viewer
gr.Markdown("### Uploaded Geometry")
uploaded_geometry_plot = gr.Plot(label="Uploaded 3D Geometry")
# Event handlers
selection_dropdown.change(
fn=update_geometry_from_selection,
inputs=[selection_dropdown],
outputs=[geometry_plot, umap_plot]
)
upload_btn.click(
fn=process_uploaded_geometry,
inputs=[uploaded_file, gr.State(embedding), gr.State(data_names),
gr.State(dataset_labels), gr.State(features), gr.State(config),
gr.State(scaler), gr.State(umap_reducer), gr.State(hdbscan_clusterer)],
outputs=[umap_plot, confidence_output, uploaded_geometry_plot]
)
return demo
def main():
"""Main function to launch the enhanced Gradio demo."""
print("🚀 Launching Enhanced Gradio UMAP Visualization Demo...")
# Check if we're in the right directory
if not os.path.exists("config.yaml"):
print("❌ Error: config.yaml not found. Please run this script from the umap directory.")
return 1
# Check for required data files
import glob
embedding_files = glob.glob("files/*_umap_embedding.npy") + glob.glob("files/umap_embedding.npy")
features_files = glob.glob("files/*_features.npy") + glob.glob("files/features.npy")
labels_files = glob.glob("files/*_names.npy") + glob.glob("files/names.npy")
reducer_files = glob.glob("files/*_umap_reducer.pkl") + glob.glob("files/umap_reducer.pkl")
scaler_files = glob.glob("files/*_scaler.pkl") + glob.glob("files/scaler.pkl")
clusterer_files = glob.glob("files/*_hdbscan_clusterer.pkl") + glob.glob("files/hdbscan_clusterer.pkl")
if not embedding_files or not features_files or not labels_files:
print("❌ Error: Required UMAP data files not found")
print("Please run the PhysicsNeMo analysis first using: python run_umap.py")
return 1
if not reducer_files or not scaler_files:
print("❌ Error: Required UMAP component files not found")
print("Please run the PhysicsNeMo analysis first using: python run_umap.py")
return 1
if not clusterer_files:
print("⚠️ Warning: HDBSCAN clusterer files not found")
print("The demo will use distance-based confidence scoring instead of HDBSCAN density-based scoring")
print("To enable HDBSCAN clustering, run: python run_umap.py")
try:
# Create and launch the interface
demo = create_gradio_interface()
if demo is None:
return 1
demo.launch(
server_name="0.0.0.0",
server_port=7868, # Different port to avoid conflicts
share=False,
show_error=True,
quiet=False
)
return 0
except Exception as e:
print(f"❌ Error launching demo: {e}")
return 1
if __name__ == "__main__":
sys.exit(main())