udbhav
Recreate Trame_app branch with clean history
67fb03c
"""
PhysicsNeMo-Style Dataset Analysis
Following NVIDIA's PhysicsNeMo-Curator methodology for external aerodynamics analysis.
This module provides a structured approach to analyzing datasets with UMAP visualization.
"""
import os
import yaml
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Any
import multiprocessing as mp
from tqdm import tqdm
from dataclasses import dataclass
from abc import ABC, abstractmethod
# VTK imports
try:
import vtk
from vtk.util.numpy_support import vtk_to_numpy
except ImportError:
print("VTK not found. Please install it with: pip install vtk")
exit(1)
# UMAP import
try:
import umap
except ImportError:
print("UMAP not found. Please install it with: pip install umap-learn")
exit(1)
# Scikit-learn for preprocessing
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.neighbors import NearestNeighbors
# Clustering
try:
import cuml
from cuml.cluster import HDBSCAN
CUML_AVAILABLE = True
except ImportError:
try:
import hdbscan
from hdbscan import prediction
HDBSCAN_AVAILABLE = True
CUML_AVAILABLE = False
except ImportError:
HDBSCAN_AVAILABLE = False
CUML_AVAILABLE = False
@dataclass
class DataFolder:
"""Configuration for a single data folder."""
path: str
label: str
color: str = "blue"
train_path: str = ""
test_path: str = ""
@dataclass
class AnalysisConfig:
"""Configuration class for the analysis pipeline."""
data_folders: List[DataFolder]
file_pattern: str = "*.vtp"
max_files_per_folder: int = -1
use_multiprocessing: bool = True
num_workers: int = 4
# Feature extraction
include_pressure_stats: bool = False
max_points_for_analysis: int = 50000
sampling_method: str = "uniform"
subsampling_distance: float = 0.1
normalize_features: bool = True
scaling_method: str = "robust"
feature_selection: Dict[str, bool] = None
# UMAP parameters
n_components: int = 2
n_neighbors: int = 15
min_dist: float = 0.1
metric: str = "euclidean"
random_state: int = 42
n_epochs: int = 1000
learning_rate: float = 1.0
# Clustering parameters
clustering_algorithm: str = "hdbscan"
min_cluster_size: int = 10
cluster_selection_epsilon: float = 1.5
allow_single_cluster: bool = True
dbscan_eps: float = 0.5
dbscan_min_samples: int = 3
# Visualization
figsize: Tuple[int, int] = (12, 10)
point_size: int = 50
alpha: float = 0.7
colormap: str = "viridis"
show_labels: bool = False
save_plot: bool = True
output_path: str = "car_umap_visualization.png"
dpi: int = 300
# Output
save_embedding: bool = True
save_features: bool = True
save_labels: bool = True
@classmethod
def from_yaml(cls, config_path: str) -> 'AnalysisConfig':
"""Load configuration from YAML file."""
with open(config_path, 'r') as f:
config_data = yaml.safe_load(f)
# Extract data section
data = config_data.get('data', {})
features = config_data.get('features', {})
umap_config = config_data.get('umap', {})
clustering = config_data.get('clustering', {})
visualization = config_data.get('visualization', {})
output = config_data.get('output', {})
# Parse data folders
data_folders_list = data.get('data_folders', [])
if not data_folders_list:
# Fallback to single data folder for backward compatibility
single_folder = data.get('data_folder', '')
if single_folder:
data_folders_list = [{'path': single_folder, 'label': 'Dataset', 'color': 'blue', 'train_path': '', 'test_path': ''}]
data_folders = [DataFolder(**folder) for folder in data_folders_list]
return cls(
# Data parameters
data_folders=data_folders,
file_pattern=data.get('file_pattern', '*.vtp'),
max_files_per_folder=data.get('max_files_per_folder', -1),
use_multiprocessing=data.get('use_multiprocessing', True),
num_workers=data.get('num_workers', 4),
# Feature extraction parameters
include_pressure_stats=features.get('include_pressure_stats', False),
max_points_for_analysis=features.get('max_points_for_analysis', 50000),
sampling_method=features.get('sampling_method', 'uniform'),
subsampling_distance=features.get('subsampling_distance', 0.1),
normalize_features=features.get('normalize_features', True),
scaling_method=features.get('scaling_method', 'robust'),
feature_selection=features.get('feature_selection', {}),
# UMAP parameters
n_components=umap_config.get('n_components', 2),
n_neighbors=umap_config.get('n_neighbors', 15),
min_dist=umap_config.get('min_dist', 0.1),
metric=umap_config.get('metric', 'euclidean'),
random_state=umap_config.get('random_state', 42),
n_epochs=umap_config.get('n_epochs', 1000),
learning_rate=umap_config.get('learning_rate', 1.0),
# Clustering parameters
clustering_algorithm=clustering.get('algorithm', 'hdbscan'),
min_cluster_size=clustering.get('min_cluster_size', 10),
cluster_selection_epsilon=clustering.get('cluster_selection_epsilon', 1.5),
allow_single_cluster=clustering.get('allow_single_cluster', True),
dbscan_eps=clustering.get('eps', 0.5),
dbscan_min_samples=clustering.get('min_samples', 3),
# Visualization parameters
figsize=tuple(visualization.get('figsize', [12, 10])),
point_size=visualization.get('point_size', 50),
alpha=visualization.get('alpha', 0.7),
colormap=visualization.get('colormap', 'viridis'),
show_labels=visualization.get('show_labels', False),
save_plot=visualization.get('save_plot', True),
output_path=visualization.get('output_path', 'umap_visualization.png'),
dpi=visualization.get('dpi', 300),
# Output parameters
save_embedding=output.get('save_embedding', True),
save_features=output.get('save_features', True),
save_labels=output.get('save_labels', True)
)
class GeometryProcessor(ABC):
"""Abstract base class for geometry processing."""
@abstractmethod
def process(self, polydata: vtk.vtkPolyData) -> Dict[str, float]:
"""Process geometry and extract features."""
pass
class PointCloudProcessor(GeometryProcessor):
"""Point cloud feature extraction following PhysicsNeMo methodology."""
def __init__(self, config: AnalysisConfig):
self.config = config
def process(self, polydata: vtk.vtkPolyData) -> Dict[str, float]:
"""Extract point cloud features from VTK polydata."""
features = {}
# Get points
points = polydata.GetPoints()
num_points = points.GetNumberOfPoints()
if num_points == 0:
return features
# Convert to numpy array
points_array = vtk_to_numpy(points.GetData())
# Check if this is a mesh (has surface cells) or just a point cloud
has_surface_cells = self._has_surface_cells(polydata)
# If it's a mesh, subsample points from the surface
if has_surface_cells:
points_array = self._subsample_mesh_points(polydata, points_array)
num_points = len(points_array)
# Extract features
all_features = {}
all_features.update(self._extract_basic_features(points_array, num_points, polydata))
all_features.update(self._extract_shape_features(points_array))
# Filter features based on feature selection
if self.config.feature_selection:
features = {name: value for name, value in all_features.items()
if self.config.feature_selection.get(name, True)}
else:
features = all_features
return features
def _has_surface_cells(self, polydata: vtk.vtkPolyData) -> bool:
"""Check if polydata has surface cells."""
return (polydata.GetPolys().GetNumberOfCells() > 0 or
polydata.GetStrips().GetNumberOfCells() > 0)
def _subsample_mesh_points(self, polydata: vtk.vtkPolyData, points_array: np.ndarray) -> np.ndarray:
"""Subsample points from mesh surface."""
try:
sample_points = vtk.vtkPolyDataPointSampler()
sample_points.SetInputData(polydata)
sample_points.SetDistance(self.config.subsampling_distance)
sample_points.Update()
sampled_polydata = sample_points.GetOutput()
if sampled_polydata.GetNumberOfPoints() > 0:
return vtk_to_numpy(sampled_polydata.GetPoints().GetData())
except Exception as e:
print(f"Warning: Point sampling failed: {e}")
return points_array
def _extract_basic_features(self, points_array: np.ndarray, num_points: int, polydata: vtk.vtkPolyData) -> Dict[str, float]:
"""Extract basic geometric features."""
features = {}
# Bounding box
bounds = polydata.GetBounds()
x_length = bounds[1] - bounds[0]
y_length = bounds[3] - bounds[2]
z_length = bounds[5] - bounds[4]
features['x_length'] = x_length
features['y_length'] = y_length
features['z_length'] = z_length
# Aspect ratios
features['aspect_ratio_xy'] = x_length / y_length if y_length > 0 else 0
features['aspect_ratio_xz'] = x_length / z_length if z_length > 0 else 0
features['aspect_ratio_yz'] = y_length / z_length if z_length > 0 else 0
return features
def _extract_shape_features(self, points_array: np.ndarray) -> Dict[str, float]:
"""Extract shape features using PCA."""
features = {}
try:
# Center the points
centered_points = points_array - np.mean(points_array, axis=0)
# Compute covariance matrix
cov_matrix = np.cov(centered_points.T)
# Eigenvalues and eigenvectors
eigenvalues, eigenvectors = np.linalg.eig(cov_matrix)
eigenvalues = np.sort(eigenvalues)[::-1]
if len(eigenvalues) >= 3 and eigenvalues[0] > 0:
# Shape descriptors
features['linearity'] = (eigenvalues[0] - eigenvalues[1]) / eigenvalues[0]
features['planarity'] = (eigenvalues[1] - eigenvalues[2]) / eigenvalues[0]
features['sphericity'] = eigenvalues[2] / eigenvalues[0]
features['anisotropy'] = (eigenvalues[0] - eigenvalues[2]) / eigenvalues[0]
features['omnivariance'] = (eigenvalues[0] * eigenvalues[1] * eigenvalues[2])**(1/3)
# Eigenentropy
normalized_eigenvals = eigenvalues / np.sum(eigenvalues)
features['eigenentropy'] = -np.sum(normalized_eigenvals * np.log(normalized_eigenvals + 1e-10))
# Change of curvature
features['change_of_curvature'] = eigenvalues[2] / (eigenvalues[0] + eigenvalues[1] + eigenvalues[2])
# Add PCA eigenvalues (PhysicsNeMo style)
features['pca_eigenvalue_1'] = eigenvalues[0]
features['pca_eigenvalue_2'] = eigenvalues[1]
features['pca_eigenvalue_3'] = eigenvalues[2]
else:
# Default values
features.update({
'linearity': 0, 'planarity': 0, 'sphericity': 0,
'anisotropy': 0, 'omnivariance': 0, 'eigenentropy': 0,
'change_of_curvature': 0,
'pca_eigenvalue_1': 0, 'pca_eigenvalue_2': 0, 'pca_eigenvalue_3': 0
})
except Exception as e:
features.update({
'linearity': 0, 'planarity': 0, 'sphericity': 0,
'anisotropy': 0, 'omnivariance': 0, 'eigenentropy': 0,
'change_of_curvature': 0,
'pca_eigenvalue_1': 0, 'pca_eigenvalue_2': 0, 'pca_eigenvalue_3': 0
})
return features
class DataLoader:
"""Data loading and preprocessing following PhysicsNeMo methodology."""
def __init__(self, config: AnalysisConfig, processor: Optional[PointCloudProcessor] = None):
self.config = config
self.processor = processor if processor is not None else PointCloudProcessor(config)
def load_vtp_file(self, file_path: Path) -> Optional[vtk.vtkPolyData]:
"""Load a VTP file and return VTK polydata object."""
try:
reader = vtk.vtkXMLPolyDataReader()
reader.SetFileName(str(file_path))
reader.Update()
return reader.GetOutput()
except Exception as e:
print(f"Error loading {file_path}: {e}")
return None
def process_single_file(self, file_path: Path) -> Tuple[str, Dict[str, float]]:
"""Process a single VTP file and extract features."""
car_name = file_path.stem
# Load the VTP file
polydata = self.load_vtp_file(file_path)
if polydata is None:
return car_name, {}
# Extract features
features = self.processor.process(polydata)
return car_name, features
def load_dataset(self) -> Tuple[List[str], np.ndarray, List[str]]:
"""Load all VTP files from multiple data folders and extract features."""
all_results = []
dataset_labels = []
for data_folder in self.config.data_folders:
print(f"\nProcessing folder: {data_folder.label} ({data_folder.path})")
# Find all VTP files in this folder
folder_path = Path(data_folder.path)
if not folder_path.exists():
print(f"Warning: Folder {data_folder.path} does not exist, skipping...")
continue
vtp_files = list(folder_path.glob("**/*.vtp"))
print(f"Found {len(vtp_files)} VTP files ")
# load train names
if data_folder.train_path:
with open(data_folder.train_path, 'r') as f:
train_names = [line.strip() for line in f if line.strip()]
train_set = set(train_names)
# Keep only the vtp files that match train_names
vtp_files = [vtp for vtp in vtp_files if vtp.stem in train_set]
print(f"Kept {len(vtp_files)} VTP files in {data_folder.label}")
# Limit number of files if specified
if self.config.max_files_per_folder > 0:
vtp_files = vtp_files[:self.config.max_files_per_folder]
if len(vtp_files) == 0:
print(f"No VTP files found in {data_folder.label}, skipping...")
continue
# Process files with progress bar
if self.config.use_multiprocessing:
print(f"Processing files using {self.config.num_workers} workers...")
with mp.Pool(self.config.num_workers) as pool:
results = list(tqdm(
pool.imap(self.process_single_file, vtp_files),
total=len(vtp_files),
desc=f"Loading {data_folder.label}",
unit="files"
))
else:
print("Processing files sequentially...")
results = []
for file_path in tqdm(vtp_files, desc=f"Loading {data_folder.label}", unit="files"):
result = self.process_single_file(file_path)
results.append(result)
# Add dataset labels to results
for car_name, features in results:
if features: # Only include files that were successfully processed
all_results.append((car_name, features))
# Use the base folder label for all files, regardless of subdirectory
dataset_labels.append(data_folder.label)
print(f"Successfully processed {len([r for r in results if r[1]])} files from {data_folder.label}")
if len(all_results) == 0:
print("No valid files processed from any folder!")
return [], np.array([]), []
# Extract car names and features
car_names = [name for name, _ in all_results]
all_features = [features for _, features in all_results]
# Convert to DataFrame for easier handling
df = pd.DataFrame(all_features)
# Fill NaN values with 0
df = df.fillna(0)
# Convert to numpy array
features_array = df.values
print(f"\nTotal successfully processed: {len(car_names)} files")
print(f"Extracted {features_array.shape[1]} features per file")
print(f"Dataset distribution: {dict(zip(*np.unique(dataset_labels, return_counts=True)))}")
return car_names, features_array, dataset_labels
def load_test_dataset(self) -> Tuple[List[str], np.ndarray, List[str]]:
"""Load test dataset from test_path configuration."""
all_results = []
dataset_labels = []
for data_folder in self.config.data_folders:
if not data_folder.test_path:
continue
print(f"\nProcessing test folder: {data_folder.label} ({data_folder.test_path})")
# Load test names
with open(data_folder.test_path, 'r') as f:
test_names = [line.strip() for line in f if line.strip()]
test_set = set(test_names)
# Find all VTP files in the test folder
folder_path = Path(data_folder.path)
if not folder_path.exists():
print(f"Warning: Test folder {data_folder.path} does not exist, skipping...")
continue
vtp_files = list(folder_path.glob("**/*.vtp"))
print(f"Found {len(vtp_files)} VTP files in test folder")
# Keep only the vtp files that match test_names
vtp_files = [vtp for vtp in vtp_files if vtp.stem in test_set]
print(f"Kept {len(vtp_files)} VTP files for testing")
if len(vtp_files) == 0:
print(f"No matching test VTP files found, skipping...")
continue
# Process files with progress bar
if self.config.use_multiprocessing:
print(f"Processing test files using {self.config.num_workers} workers...")
with mp.Pool(self.config.num_workers) as pool:
results = list(tqdm(
pool.imap(self.process_single_file, vtp_files),
total=len(vtp_files),
desc=f"Loading test {data_folder.label}",
unit="files"
))
else:
print("Processing test files sequentially...")
results = []
for file_path in tqdm(vtp_files, desc=f"Loading test {data_folder.label}", unit="files"):
result = self.process_single_file(file_path)
results.append(result)
# Add dataset labels to results
for car_name, features in results:
if features: # Only include files that were successfully processed
all_results.append((car_name, features))
dataset_labels.append(f"{data_folder.label}_test")
print(f"Successfully processed {len([r for r in results if r[1]])} test files from {data_folder.label}")
if len(all_results) == 0:
print("No valid test files processed from any folder!")
return [], np.array([]), []
# Extract car names and features
car_names = [name for name, _ in all_results]
all_features = [features for _, features in all_results]
# Convert to DataFrame for easier handling
df = pd.DataFrame(all_features)
# Fill NaN values with 0
df = df.fillna(0)
# Convert to numpy array
features_array = df.values
print(f"\nTotal successfully processed test files: {len(car_names)}")
print(f"Extracted {features_array.shape[1]} features per test file")
return car_names, features_array, dataset_labels
class DimensionalityReducer:
"""Dimensionality reduction following PhysicsNeMo methodology."""
def __init__(self, config: AnalysisConfig):
self.config = config
# Choose scaler based on config
if config.scaling_method == "robust":
self.scaler = RobustScaler() # More robust to outliers than StandardScaler
elif config.scaling_method == "standard":
self.scaler = StandardScaler()
else:
self.scaler = RobustScaler() # Default to robust
def create_umap_embedding(self, features: np.ndarray) -> np.ndarray:
"""Create UMAP embedding from features."""
print("Standardizing features...")
# Standardize features
features_scaled = self.scaler.fit_transform(features)
print("Creating UMAP embedding...")
# Create UMAP embedding
reducer = umap.UMAP(
n_components=self.config.n_components,
n_neighbors=self.config.n_neighbors,
min_dist=self.config.min_dist,
metric=self.config.metric,
random_state=self.config.random_state,
n_epochs=self.config.n_epochs,
learning_rate=self.config.learning_rate,
verbose=True
)
# Perform the embedding
embedding = reducer.fit_transform(features_scaled)
# Store the reducer for later use
self.reducer = reducer
return embedding
class ConfidenceScorer:
"""Confidence scoring for new geometries using HDBSCAN density-based probabilities."""
def __init__(self, config: AnalysisConfig):
self.config = config
self.training_embedding = None
self.training_names = None
self.umap_reducer = None
self.scaler = None
self.hdbscan_clusterer = None
def fit_training_data(self, training_features: np.ndarray, training_names: List[str],
umap_reducer, scaler, hdbscan_clusterer=None) -> None:
"""Fit the confidence scorer on training data."""
self.training_embedding = umap_reducer.transform(scaler.transform(training_features))
self.training_names = training_names
self.umap_reducer = umap_reducer
self.scaler = scaler
self.hdbscan_clusterer = hdbscan_clusterer
print(f"✅ Fitted confidence scorer on {len(training_names)} training samples")
def compute_confidence_scores(self, test_features: np.ndarray, test_names: List[str],
k_neighbors: int = 5) -> Tuple[np.ndarray, np.ndarray]:
"""Compute confidence scores for test geometries using HDBSCAN density-based probabilities."""
if self.training_embedding is None:
raise ValueError("Confidence scorer not fitted. Call fit_training_data() first.")
print(f"🔍 Computing HDBSCAN density-based confidence scores for {len(test_names)} test geometries...")
# Transform test features to UMAP space
test_features_scaled = self.scaler.transform(test_features)
test_embedding = self.umap_reducer.transform(test_features_scaled)
# Use HDBSCAN density-based confidence if clusterer is available
if self.hdbscan_clusterer is not None and HDBSCAN_AVAILABLE:
# Use HDBSCAN approximate prediction for density-based confidence
pred_labels, pred_probs = prediction.approximate_predict(self.hdbscan_clusterer, test_embedding)
confidence_scores = pred_probs
print(f"✅ Computed HDBSCAN density-based confidence scores:")
print(f" - Mean confidence: {np.mean(confidence_scores):.4f}")
return test_embedding, confidence_scores
def create_confidence_visualization(self, test_embedding: np.ndarray, test_names: List[str],
confidence_scores: np.ndarray,
save_path: str = "files/confidence_visualization.png") -> None:
"""Create visualization showing training vs test points with confidence scores."""
plt.figure(figsize=(12, 10))
# Plot training points in light gray
plt.scatter(self.training_embedding[:, 0], self.training_embedding[:, 1],
c='lightgray', s=30, alpha=0.6, label='Training Data', edgecolors='black', linewidth=0.5)
# Plot test points colored by confidence score
plt.scatter(test_embedding[:, 0], test_embedding[:, 1],
c=confidence_scores, s=100, alpha=0.8,
cmap='RdYlGn', vmin=0, vmax=1,
label='Test Data', edgecolors='black', linewidth=1)
# Add colorbar
# cbar = plt.colorbar(scatter)
# cbar.set_label('Confidence Score', fontsize=12)
# Add labels for test points
for i, name in enumerate(test_names):
plt.annotate(name, (test_embedding[i, 0], test_embedding[i, 1]),
xytext=(5, 5), textcoords='offset points',
fontsize=8, alpha=0.8, fontweight='bold')
plt.xlabel('UMAP Component 1', fontsize=12)
plt.ylabel('UMAP Component 2', fontsize=12)
plt.title('Confidence Scoring: Training vs Test Geometries', fontsize=16, fontweight='bold')
plt.legend()
plt.grid(True, alpha=0.3)
# Save plot
os.makedirs(os.path.dirname(save_path), exist_ok=True)
plt.savefig(save_path, dpi=300, bbox_inches='tight')
print(f"Confidence visualization saved to: {save_path}")
plt.show()
class Clusterer:
"""HDBSCAN clustering for UMAP embeddings following PhysicsNeMo methodology."""
def __init__(self, config: AnalysisConfig):
self.config = config
def cluster_embedding(self, embedding: np.ndarray) -> Tuple[np.ndarray, np.ndarray, Any]:
"""Perform HDBSCAN clustering on UMAP embedding."""
print("🔍 Performing HDBSCAN clustering on UMAP embedding...")
# Calculate adaptive min_cluster_size based on number of points
num_points = len(embedding)
# adaptive_min_cluster_size = min(self.config.min_cluster_size, max(2, num_points // 10))
# For very small datasets, use even smaller cluster size
# if num_points < 20:
# adaptive_min_cluster_size = 2
# elif num_points < 50:
# adaptive_min_cluster_size = max(2, num_points // 5)
print(f"📊 Clustering parameters:")
print(f" - Number of points: {num_points}")
print(f" - Config min_cluster_size: {self.config.min_cluster_size}")
# print(f" - Adaptive min_cluster_size: {adaptive_min_cluster_size}")
try:
if HDBSCAN_AVAILABLE:
clusterer = hdbscan.HDBSCAN(
min_cluster_size=self.config.min_cluster_size,
metric=self.config.metric,
cluster_selection_epsilon=self.config.cluster_selection_epsilon,
allow_single_cluster=self.config.allow_single_cluster,
prediction_data=True # Enable prediction data for approximate_predict
)
cluster_labels = clusterer.fit_predict(embedding)
print("✅ Used HDBSCAN (CPU)")
else:
raise Exception("HDBSCAN not available")
# Identify outliers (label = -1)
outlier_mask = cluster_labels == -1
outliers = np.where(outlier_mask)[0]
print(f"📊 Clustering Results:")
print(f" - Total points: {len(cluster_labels)}")
print(f" - Number of clusters: {len(set(cluster_labels)) - (1 if -1 in cluster_labels else 0)}")
print(f" - Outliers: {len(outliers)} ({len(outliers)/len(cluster_labels)*100:.1f}%)")
# Print density statistics
if hasattr(clusterer, 'probabilities_') and clusterer.probabilities_ is not None:
print(f" - Mean cluster probability: {np.mean(clusterer.probabilities_):.4f}")
print(f" - Min cluster probability: {np.min(clusterer.probabilities_):.4f}")
print(f" - Max cluster probability: {np.max(clusterer.probabilities_):.4f}")
# Check if prediction data is available
if hasattr(clusterer, 'prediction_data_') and clusterer.prediction_data_ is not None:
print(f" - Prediction data: ✅ Available for approximate_predict")
else:
print(f" - Prediction data: ❌ Not available - will use distance-based fallback")
# Check if any clusters were found
unique_labels = set(cluster_labels)
num_clusters = len(unique_labels) - (1 if -1 in unique_labels else 0)
if num_clusters == 0:
print("⚠️ No clusters found! Attempting fallback with more permissive parameters...")
return self._fallback_clustering(embedding)
return cluster_labels, outliers, clusterer
except Exception as e:
print(f"❌ Clustering failed: {e}")
# def _fallback_clustering(self, embedding: np.ndarray) -> Tuple[np.ndarray, np.ndarray, Any]:
# """Fallback clustering with more permissive parameters."""
# print("🔄 Attempting fallback with more permissive parameters...")
# # Try multiple fallback strategies
# fallback_strategies = [
# # Strategy 1: Very permissive HDBSCAN
# {
# 'min_cluster_size': 2,
# 'cluster_selection_epsilon': 0.0,
# 'allow_single_cluster': True,
# 'min_samples': 1
# },
# # Strategy 2: Even more permissive
# {
# 'min_cluster_size': 2,
# 'cluster_selection_epsilon': 0.0,
# 'allow_single_cluster': True,
# 'min_samples': 1,
# 'cluster_selection_method': 'eom'
# },
# # Strategy 3: Single cluster fallback
# {
# 'min_cluster_size': 2,
# 'cluster_selection_epsilon': 0.0,
# 'allow_single_cluster': True,
# 'min_samples': 1,
# 'cluster_selection_method': 'leaf'
# }
# ]
# for i, strategy in enumerate(fallback_strategies):
# try:
# if HDBSCAN_AVAILABLE:
# print(f" Trying fallback strategy {i+1}...")
# fallback_clusterer = hdbscan.HDBSCAN(
# prediction_data=True,
# **strategy
# )
# fallback_labels = fallback_clusterer.fit_predict(embedding)
# # Check if clusters were found
# unique_labels = set(fallback_labels)
# num_clusters = len(unique_labels) - (1 if -1 in unique_labels else 0)
# if num_clusters > 0:
# print(f"✅ Fallback strategy {i+1} succeeded with {num_clusters} clusters")
# outliers = np.where(fallback_labels == -1)[0]
# return fallback_labels, outliers, fallback_clusterer
# else:
# print(f" Strategy {i+1} found no clusters, trying next...")
# except Exception as e:
# print(f" Strategy {i+1} failed: {e}")
# continue
# # If all strategies fail, create a single cluster with all points
# print("⚠️ All fallback strategies failed, creating single cluster with all points...")
# try:
# # Create a dummy clusterer that treats all points as one cluster
# dummy_clusterer = hdbscan.HDBSCAN(
# min_cluster_size=1,
# cluster_selection_epsilon=0.0,
# allow_single_cluster=True,
# prediction_data=True
# )
# # Force all points into cluster 0
# dummy_labels = np.zeros(len(embedding), dtype=int)
# print("✅ Created single cluster fallback")
# return dummy_labels, np.array([]), dummy_clusterer
# except Exception as final_e:
# print(f"❌ Final fallback also failed: {final_e}")
# # Return no clusters, no outliers, and None clusterer
# return np.zeros(len(embedding), dtype=int), np.array([]), None
class Visualizer:
"""Visualization methodology."""
def __init__(self, config: AnalysisConfig):
self.config = config
def create_umap_visualization(self, embedding: np.ndarray, car_names: List[str],
dataset_labels: List[str] = None, cluster_labels: np.ndarray = None,
outliers: np.ndarray = None) -> None:
"""Create and save UMAP visualization with clustering."""
plt.figure(figsize=self.config.figsize)
if cluster_labels is not None and len(cluster_labels) > 0:
# Create visualization with clustering
unique_clusters = sorted([c for c in set(cluster_labels) if c != -1])
unique_datasets = list(set(dataset_labels)) if dataset_labels else ['Dataset']
# Color maps
cluster_colors = plt.cm.tab10(np.linspace(0, 1, len(unique_clusters)))
dataset_colors = plt.cm.Set1(np.linspace(0, 1, len(unique_datasets)))
# Plot clusters
for cluster_idx, cluster_id in enumerate(unique_clusters):
cluster_mask = cluster_labels == cluster_id
if dataset_labels is not None:
# Plot each dataset separately within cluster
for dataset_idx, dataset in enumerate(unique_datasets):
dataset_mask = np.array(dataset_labels) == dataset
combined_mask = cluster_mask & dataset_mask
if np.any(combined_mask):
plt.scatter(
embedding[combined_mask, 0],
embedding[combined_mask, 1],
s=self.config.point_size,
alpha=self.config.alpha,
c=[cluster_colors[cluster_idx]],
marker='o',
edgecolors='black',
linewidth=0.5,
label=f'{dataset} - Cluster {cluster_id}' if cluster_idx == 0 else ""
)
else:
# Plot cluster without dataset distinction
plt.scatter(
embedding[cluster_mask, 0],
embedding[cluster_mask, 1],
s=self.config.point_size,
alpha=self.config.alpha,
c=[cluster_colors[cluster_idx]],
label=f'Cluster {cluster_id}',
edgecolors='black',
linewidth=0.5
)
# Plot outliers
if outliers is not None and len(outliers) > 0:
plt.scatter(
embedding[outliers, 0],
embedding[outliers, 1],
s=self.config.point_size * 1.5,
alpha=self.config.alpha,
c='red',
marker='x',
linewidth=2,
label='Outliers'
)
plt.legend(title='Clusters', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('UMAP Visualization with HDBSCAN Clustering', fontsize=16, fontweight='bold')
elif dataset_labels is not None:
# Create visualization with dataset-based coloring (no clustering)
unique_datasets = list(set(dataset_labels))
colors = plt.cm.Set1(np.linspace(0, 1, len(unique_datasets)))
dataset_color_map = dict(zip(unique_datasets, colors))
# Plot each dataset separately
for dataset in unique_datasets:
mask = np.array(dataset_labels) == dataset
plt.scatter(
embedding[mask, 0],
embedding[mask, 1],
s=self.config.point_size,
alpha=self.config.alpha,
c=[dataset_color_map[dataset]],
label=dataset,
edgecolors='black',
linewidth=0.5
)
plt.legend(title='Dataset', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('UMAP Visualization - Multi-Dataset Analysis', fontsize=16, fontweight='bold')
else:
# Create scatter plot with index-based coloring
scatter = plt.scatter(
embedding[:, 0],
embedding[:, 1],
s=self.config.point_size,
alpha=self.config.alpha,
c=range(len(car_names)),
cmap=self.config.colormap
)
# Add colorbar
cbar = plt.colorbar(scatter)
cbar.set_label('Model Index', fontsize=12)
plt.title('UMAP Visualization of Dataset', fontsize=16, fontweight='bold')
plt.xlabel('UMAP Component 1', fontsize=12)
plt.ylabel('UMAP Component 2', fontsize=12)
# Add labels if requested
if self.config.show_labels:
for i, name in enumerate(car_names):
plt.annotate(name, (embedding[i, 0], embedding[i, 1]),
xytext=(5, 5), textcoords='offset points',
fontsize=8, alpha=0.7)
plt.tight_layout()
# Save plot if requested
if self.config.save_plot:
os.makedirs(os.path.dirname(self.config.output_path), exist_ok=True)
plt.savefig(self.config.output_path, dpi=self.config.dpi, bbox_inches='tight')
print(f"Plot saved to: {self.config.output_path}")
plt.show()
def create_comparison_plot(self, umap_embedding: np.ndarray, pca_embedding: np.ndarray, car_names: List[str], dataset_labels: List[str] = None) -> None:
"""Create comparison plot between UMAP and PCA."""
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 8))
if dataset_labels is not None:
# Create plots with dataset-based coloring
unique_datasets = list(set(dataset_labels))
colors = plt.cm.Set1(np.linspace(0, 1, len(unique_datasets)))
dataset_color_map = dict(zip(unique_datasets, colors))
# UMAP plot
for dataset in unique_datasets:
mask = np.array(dataset_labels) == dataset
ax1.scatter(
umap_embedding[mask, 0],
umap_embedding[mask, 1],
s=self.config.point_size,
alpha=self.config.alpha,
c=[dataset_color_map[dataset]],
label=dataset,
edgecolors='black',
linewidth=0.5
)
ax1.set_title('UMAP Embedding', fontsize=14, fontweight='bold')
ax1.set_xlabel('UMAP Component 1', fontsize=12)
ax1.set_ylabel('UMAP Component 2', fontsize=12)
ax1.legend(title='Dataset', loc='upper right')
# PCA plot
for dataset in unique_datasets:
mask = np.array(dataset_labels) == dataset
ax2.scatter(
pca_embedding[mask, 0],
pca_embedding[mask, 1],
s=self.config.point_size,
alpha=self.config.alpha,
c=[dataset_color_map[dataset]],
label=dataset,
edgecolors='black',
linewidth=0.5
)
ax2.set_title('PCA Embedding', fontsize=14, fontweight='bold')
ax2.set_xlabel('PCA Component 1', fontsize=12)
ax2.set_ylabel('PCA Component 2', fontsize=12)
ax2.legend(title='Dataset', loc='upper right')
else:
# UMAP plot
scatter1 = ax1.scatter(
umap_embedding[:, 0],
umap_embedding[:, 1],
s=self.config.point_size,
alpha=self.config.alpha,
c=range(len(car_names)),
cmap=self.config.colormap
)
ax1.set_title('UMAP Embedding', fontsize=14, fontweight='bold')
ax1.set_xlabel('UMAP Component 1', fontsize=12)
ax1.set_ylabel('UMAP Component 2', fontsize=12)
plt.colorbar(scatter1, ax=ax1, label='Model Index')
# PCA plot
scatter2 = ax2.scatter(
pca_embedding[:, 0],
pca_embedding[:, 1],
s=self.config.point_size,
alpha=self.config.alpha,
c=range(len(car_names)),
cmap=self.config.colormap
)
ax2.set_title('PCA Embedding', fontsize=14, fontweight='bold')
ax2.set_xlabel('PCA Component 1', fontsize=12)
ax2.set_ylabel('PCA Component 2', fontsize=12)
plt.colorbar(scatter2, ax=ax2, label='Model Index')
plt.tight_layout()
plt.show()
def create_feature_analysis(self, features: np.ndarray, car_names: List[str]) -> None:
"""Create feature analysis plots."""
# Create comprehensive feature names list
feature_names = [
'x_length', 'y_length', 'z_length',
'aspect_ratio_xy', 'aspect_ratio_xz', 'aspect_ratio_yz',
'linearity', 'planarity', 'sphericity', 'anisotropy',
'omnivariance', 'eigenentropy', 'change_of_curvature',
'pca_eigenvalue_1', 'pca_eigenvalue_2', 'pca_eigenvalue_3'
]
# Ensure we have the right number of feature names
if len(feature_names) < features.shape[1]:
# Add generic names for any missing features
for i in range(len(feature_names), features.shape[1]):
feature_names.append(f'feature_{i}')
elif len(feature_names) > features.shape[1]:
# Truncate if we have too many names
feature_names = feature_names[:features.shape[1]]
df_features = pd.DataFrame(features, columns=feature_names)
# Correlation heatmap
plt.figure(figsize=(12, 10))
correlation_matrix = df_features.corr()
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0, fmt='.2f')
plt.title('Feature Correlation Matrix', fontsize=16, fontweight='bold')
plt.tight_layout()
plt.show()
# Feature distribution
fig, axes = plt.subplots(4, 4, figsize=(16, 16))
axes = axes.ravel()
for i, feature in enumerate(feature_names[:16]):
if i < len(axes):
axes[i].hist(features[:, i], bins=30, alpha=0.7, edgecolor='black')
axes[i].set_title(f'{feature}', fontsize=10)
axes[i].set_xlabel('Value')
axes[i].set_ylabel('Frequency')
plt.suptitle('Feature Distributions', fontsize=16, fontweight='bold')
plt.tight_layout()
plt.show()
class PhysicsNeMoAnalyzer:
"""Main analyzer class following PhysicsNeMo methodology."""
def __init__(self, config_path: str):
"""Initialize with configuration file."""
# Load configuration from YAML file
self.config = AnalysisConfig.from_yaml(config_path)
# Initialize components
self.processor = PointCloudProcessor(self.config)
self.data_loader = DataLoader(self.config, self.processor)
self.dimensionality_reducer = DimensionalityReducer(self.config)
self.clusterer = Clusterer(self.config)
self.visualizer = Visualizer(self.config)
self.confidence_scorer = ConfidenceScorer(self.config)
def run_unified_analysis(self) -> None:
"""Run the unified analysis pipeline: training + confidence scoring in one go."""
print("=" * 60)
print("Unified PhysicsNeMo Analysis Pipeline")
print("=" * 60)
# Step 1: Load training dataset
print("\n📁 Step 1/6: Loading training dataset...")
train_names, train_features, train_labels = self.data_loader.load_dataset()
if len(train_names) == 0:
print("❌ No training data found!")
return
print(f"✅ Successfully loaded {len(train_names)} training samples")
# Step 2: Create UMAP embedding on training data
print(f"\n🧠 Step 2/6: Creating UMAP embedding on training data...")
train_embedding = self.dimensionality_reducer.create_umap_embedding(train_features)
print("✅ UMAP embedding completed")
# Step 3: Perform HDBSCAN clustering on training embedding
print(f"\n🔍 Step 3/6: Performing HDBSCAN clustering...")
cluster_labels, outliers, clusterer = self.clusterer.cluster_embedding(train_embedding)
print("✅ HDBSCAN clustering completed")
# Step 4: Load test dataset and compute confidence scores
print(f"\n📁 Step 4/6: Loading test dataset and computing confidence scores...")
test_names, test_features, test_labels = self.data_loader.load_test_dataset()
if len(test_names) > 0:
print(f"✅ Successfully loaded {len(test_names)} test samples")
# Fit confidence scorer and compute scores
self.confidence_scorer.fit_training_data(
train_features, train_names,
self.dimensionality_reducer.reducer,
self.dimensionality_reducer.scaler,
clusterer
)
test_embedding, confidence_scores = self.confidence_scorer.compute_confidence_scores(
test_features, test_names, k_neighbors=4
)
print("✅ Confidence scores computed")
else:
print("⚠️ No test data found, skipping confidence scoring")
test_embedding, confidence_scores, mean_confidence_scores = None, None, None
# Step 5: Create visualizations
print(f"\n🎨 Step 5/6: Creating visualizations...")
self.visualizer.create_umap_visualization(train_embedding, train_names, train_labels, cluster_labels, outliers)
self.visualizer.create_feature_analysis(train_features, train_names)
if test_embedding is not None:
self.confidence_scorer.create_confidence_visualization(
test_embedding, test_names, confidence_scores
)
print("✅ Visualizations completed")
# Step 6: Save all results
print(f"\n💾 Step 6/6: Saving all results...")
self._save_results(train_embedding, train_features, train_names, train_labels, cluster_labels, outliers, clusterer)
if test_embedding is not None:
self._save_confidence_results(
test_embedding, test_features, test_names, test_labels,
confidence_scores
)
print("✅ All results saved")
print("\n" + "=" * 60)
print("🎉 Unified analysis complete!")
print("=" * 60)
def run_analysis(self) -> None:
"""Run the complete analysis pipeline (legacy method)."""
print("=" * 60)
print("PhysicsNeMo-Style Car Dataset Analysis")
print("=" * 60)
# Step 1: Load dataset
print("\n📁 Step 1/5: Loading dataset...")
car_names, features, dataset_labels = self.data_loader.load_dataset()
if len(car_names) == 0:
print("❌ No valid files found!")
return
print(f"✅ Successfully loaded {len(car_names)} models")
# Step 2: Create UMAP embedding
print(f"\n🧠 Step 2/5: Creating UMAP embedding...")
umap_embedding = self.dimensionality_reducer.create_umap_embedding(features)
print("✅ UMAP embedding completed")
# Step 3: Perform HDBSCAN clustering on UMAP embedding
print(f"\n🔍 Step 3/5: Performing HDBSCAN clustering...")
cluster_labels, outliers, clusterer = self.clusterer.cluster_embedding(umap_embedding)
print("✅ HDBSCAN clustering completed")
# Step 4: Create visualizations
print(f"\n🎨 Step 4/5: Creating visualizations...")
self.visualizer.create_umap_visualization(umap_embedding, car_names, dataset_labels, cluster_labels, outliers)
self.visualizer.create_feature_analysis(features, car_names)
print("✅ Visualizations completed")
# Step 5: Save results
print(f"\n💾 Step 5/5: Saving results...")
self._save_results(umap_embedding, features, car_names, dataset_labels, cluster_labels, outliers, clusterer)
print("✅ Results saved")
print("\n" + "=" * 60)
print("🎉 Analysis complete!")
print("=" * 60)
def run_confidence_scoring(self) -> None:
"""Run the confidence scoring pipeline for test data."""
print("=" * 60)
print("Confidence Scoring Pipeline")
print("=" * 60)
# Step 1: Load training data
print("\n📁 Step 1/5: Loading training dataset...")
train_names, train_features, train_labels = self.data_loader.load_dataset()
if len(train_names) == 0:
print("❌ No training data found!")
return
print(f"✅ Successfully loaded {len(train_names)} training samples")
# Step 2: Fit UMAP on training data only
print(f"\n🧠 Step 2/5: Fitting UMAP on training data...")
umap_embedding = self.dimensionality_reducer.create_umap_embedding(train_features)
print("✅ UMAP fitted on training data")
# Step 2.5: Perform HDBSCAN clustering on training embedding
print(f"\n🔍 Step 2.5/5: Performing HDBSCAN clustering...")
cluster_labels, outliers, clusterer = self.clusterer.cluster_embedding(umap_embedding)
print("✅ HDBSCAN clustering completed")
# Step 3: Load test data
print(f"\n📁 Step 3/5: Loading test dataset...")
test_names, test_features, test_labels = self.data_loader.load_test_dataset()
if len(test_names) == 0:
print("❌ No test data found!")
return
print(f"✅ Successfully loaded {len(test_names)} test samples")
# Step 4: Fit confidence scorer and compute scores
print(f"\n🔍 Step 4/5: Computing confidence scores...")
self.confidence_scorer.fit_training_data(
train_features, train_names,
self.dimensionality_reducer.reducer,
self.dimensionality_reducer.scaler,
clusterer
)
test_embedding, confidence_scores = self.confidence_scorer.compute_confidence_scores(
test_features, test_names, k_neighbors=4
)
print("✅ Confidence scores computed")
# Step 5: Create visualizations and save results
print(f"\n🎨 Step 5/5: Creating visualizations and saving results...")
self.confidence_scorer.create_confidence_visualization(
test_embedding, test_names, confidence_scores
)
# Save confidence results
self._save_confidence_results(
test_embedding, test_features, test_names, test_labels,
confidence_scores
)
print("✅ Results saved")
print("\n" + "=" * 60)
print("🎉 Confidence scoring complete!")
print("=" * 60)
def _save_confidence_results(self, test_embedding: np.ndarray, test_features: np.ndarray,
test_names: List[str], test_labels: List[str],
confidence_scores: np.ndarray) -> None:
"""Save confidence scoring results to files."""
# Generate dynamic file names
unique_datasets = list(set(test_labels))
if len(unique_datasets) == 1:
base_name = unique_datasets[0].replace(" ", "_").lower()
else:
base_name = "combined_test"
# Save test embedding
# test_embedding_path = f"files/{base_name}_test_embedding.npy"
# os.makedirs(os.path.dirname(test_embedding_path), exist_ok=True)
# np.save(test_embedding_path, test_embedding)
# print(f"Test embedding saved to: {test_embedding_path}")
# # Save test features
# test_features_path = f"files/{base_name}_test_features.npy"
# np.save(test_features_path, test_features)
# print(f"Test features saved to: {test_features_path}")
# Save test names
# test_names_path = f"files/{base_name}_test_names.npy"
# np.save(test_names_path, test_names)
# print(f"Test names saved to: {test_names_path}")
# Save confidence scores
# confidence_path = f"files/{base_name}_confidence_scores.npy"
# np.save(confidence_path, confidence_scores)
# print(f"Confidence scores saved to: {confidence_path}")
# Save mean confidence scores
# mean_confidence_path = f"files/{base_name}_mean_confidence_scores.npy"
# np.save(mean_confidence_path, mean_confidence_scores)
# print(f"Mean confidence scores saved to: {mean_confidence_path}")
# Create a summary report
summary_path = f"files/{base_name}_confidence_summary.txt"
with open(summary_path, 'w') as f:
f.write("Confidence Scoring Summary\n")
f.write("=" * 30 + "\n\n")
f.write(f"Number of test samples: {len(test_names)}\n")
f.write(f"Confidence score range: {np.min(confidence_scores):.4f} - {np.max(confidence_scores):.4f}\n")
f.write(f"Mean confidence score: {np.mean(confidence_scores):.4f}\n")
f.write(f"Std confidence score: {np.std(confidence_scores):.4f}\n\n")
f.write("Individual Results:\n")
f.write("-" * 20 + "\n")
for i, (name, conf) in enumerate(zip(test_names, confidence_scores)):
f.write(f"{i+1:3d}. {name:20s} | Confidence: {conf:.4f}\n")
print(f"Summary report saved to: {summary_path}")
def _save_results(self, embedding: np.ndarray, features: np.ndarray, car_names: List[str],
dataset_labels: List[str], cluster_labels: np.ndarray = None, outliers: np.ndarray = None, clusterer: Any = None) -> None:
"""Save results to files."""
# Generate dynamic file names based on dataset labels
unique_datasets = list(set(dataset_labels))
if len(unique_datasets) == 1:
# Single dataset - use the label name
base_name = unique_datasets[0].replace(" ", "_").lower()
else:
# Multiple datasets - use "combined"
base_name = "combined"
# Generate file paths (save to files/ directory)
embedding_path = f"files/{base_name}_umap_embedding.npy"
features_path = f"files/{base_name}_features.npy"
labels_path = f"files/{base_name}_names.npy"
dataset_labels_path = f"files/{base_name}_dataset_labels.npy"
# Save embedding
if self.config.save_embedding:
# Only create directory if there's a directory path
dir_path = os.path.dirname(embedding_path)
if dir_path:
os.makedirs(dir_path, exist_ok=True)
np.save(embedding_path, embedding)
print(f"Embedding saved to: {embedding_path}")
# Save features
if self.config.save_features:
# Only create directory if there's a directory path
dir_path = os.path.dirname(features_path)
if dir_path:
os.makedirs(dir_path, exist_ok=True)
np.save(features_path, features)
print(f"Features saved to: {features_path}")
# Save labels
if self.config.save_labels:
# Only create directory if there's a directory path
dir_path = os.path.dirname(labels_path)
if dir_path:
os.makedirs(dir_path, exist_ok=True)
np.save(labels_path, car_names)
print(f"Names saved to: {labels_path}")
# Save dataset labels
# Only create directory if there's a directory path
dir_path = os.path.dirname(dataset_labels_path)
if dir_path:
os.makedirs(dir_path, exist_ok=True)
np.save(dataset_labels_path, dataset_labels)
print(f"Dataset labels saved to: {dataset_labels_path}")
# Save UMAP reducer, scaler, and clusterer for reuse in Gradio demo
self._save_umap_components(base_name, clusterer)
# Save clustering results
# if cluster_labels is not None:
# cluster_labels_path = self.config.labels_path.replace('.npy', '_cluster_labels.npy')
# os.makedirs(os.path.dirname(cluster_labels_path), exist_ok=True)
# np.save(cluster_labels_path, cluster_labels)
# print(f"Cluster labels saved to: {cluster_labels_path}")
# if outliers is not None and len(outliers) > 0:
# outliers_path = self.config.labels_path.replace('.npy', '_outliers.npy')
# os.makedirs(os.path.dirname(outliers_path), exist_ok=True)
# np.save(outliers_path, outliers)
# print(f"Outliers saved to: {outliers_path}")
def _save_umap_components(self, base_name: str, clusterer=None) -> None:
"""Save UMAP reducer, scaler, and clusterer for reuse in Gradio demo."""
import pickle
# Save UMAP reducer
reducer_path = f"files/{base_name}_umap_reducer.pkl"
dir_path = os.path.dirname(reducer_path)
if dir_path:
os.makedirs(dir_path, exist_ok=True)
with open(reducer_path, 'wb') as f:
pickle.dump(self.dimensionality_reducer.reducer, f)
print(f"UMAP reducer saved to: {reducer_path}")
# Save scaler
scaler_path = f"files/{base_name}_scaler.pkl"
with open(scaler_path, 'wb') as f:
pickle.dump(self.dimensionality_reducer.scaler, f)
print(f"Scaler saved to: {scaler_path}")
# Save HDBSCAN clusterer if provided
if clusterer is not None:
clusterer_path = f"files/{base_name}_hdbscan_clusterer.pkl"
with open(clusterer_path, 'wb') as f:
pickle.dump(clusterer, f)
print(f"HDBSCAN clusterer saved to: {clusterer_path}")
def main():
"""Main function to run the PhysicsNeMo-style analysis."""
import sys
config_path = "/raid/ansysai/udbhav/alphaLPFM/similarity/umap/config.yaml"
if not os.path.exists(config_path):
print(f"Configuration file not found: {config_path}")
return
# Create analyzer
analyzer = PhysicsNeMoAnalyzer(config_path)
# Check command line arguments for mode
if len(sys.argv) > 1:
if sys.argv[1] == "confidence":
print("Running confidence scoring pipeline only...")
analyzer.run_confidence_scoring()
elif sys.argv[1] == "train":
print("Running training analysis only...")
analyzer.run_analysis()
else:
print(f"Unknown argument: {sys.argv[1]}")
print("Usage: python run_umap.py [train|confidence]")
print(" - No argument: Run unified pipeline (training + confidence scoring)")
print(" - train: Run training analysis only")
print(" - confidence: Run confidence scoring only")
else:
print("Running unified pipeline (training + confidence scoring)...")
analyzer.run_unified_analysis()
if __name__ == "__main__":
main()