| """ | |
| PhysicsNeMo-Style Dataset Analysis | |
| Following NVIDIA's PhysicsNeMo-Curator methodology for external aerodynamics analysis. | |
| This module provides a structured approach to analyzing datasets with UMAP visualization. | |
| """ | |
import os
import yaml
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Any
import multiprocessing as mp
from tqdm import tqdm
from dataclasses import dataclass
from abc import ABC, abstractmethod
# VTK imports
try:
    import vtk
    from vtk.util.numpy_support import vtk_to_numpy
except ImportError:
    print("VTK not found. Please install it with: pip install vtk")
    exit(1)
# UMAP import
try:
    import umap
except ImportError:
    print("UMAP not found. Please install it with: pip install umap-learn")
    exit(1)
# Scikit-learn for preprocessing
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.neighbors import NearestNeighbors
# Clustering: check for GPU (cuML) and CPU (hdbscan) backends separately so that
# both availability flags are always defined, whichever imports succeed.
try:
    import cuml
    from cuml.cluster import HDBSCAN
    CUML_AVAILABLE = True
except ImportError:
    CUML_AVAILABLE = False
try:
    import hdbscan
    from hdbscan import prediction
    HDBSCAN_AVAILABLE = True
except ImportError:
    HDBSCAN_AVAILABLE = False
@dataclass
class DataFolder:
    """Configuration for a single data folder."""
    path: str
    label: str
    color: str = "blue"
    train_path: str = ""
    test_path: str = ""


@dataclass
class AnalysisConfig:
    """Configuration class for the analysis pipeline."""
    data_folders: List[DataFolder]
    file_pattern: str = "*.vtp"
    max_files_per_folder: int = -1
    use_multiprocessing: bool = True
    num_workers: int = 4
    # Feature extraction
    include_pressure_stats: bool = False
    max_points_for_analysis: int = 50000
    sampling_method: str = "uniform"
    subsampling_distance: float = 0.1
    normalize_features: bool = True
    scaling_method: str = "robust"
    feature_selection: Dict[str, bool] = None
    # UMAP parameters
    n_components: int = 2
    n_neighbors: int = 15
    min_dist: float = 0.1
    metric: str = "euclidean"
    random_state: int = 42
    n_epochs: int = 1000
    learning_rate: float = 1.0
    # Clustering parameters
    clustering_algorithm: str = "hdbscan"
    min_cluster_size: int = 10
    cluster_selection_epsilon: float = 1.5
    allow_single_cluster: bool = True
    dbscan_eps: float = 0.5
    dbscan_min_samples: int = 3
    # Visualization
    figsize: Tuple[int, int] = (12, 10)
    point_size: int = 50
    alpha: float = 0.7
    colormap: str = "viridis"
    show_labels: bool = False
    save_plot: bool = True
    output_path: str = "car_umap_visualization.png"
    dpi: int = 300
    # Output
    save_embedding: bool = True
    save_features: bool = True
    save_labels: bool = True

    @classmethod
    def from_yaml(cls, config_path: str) -> 'AnalysisConfig':
        """Load configuration from a YAML file."""
        with open(config_path, 'r') as f:
            config_data = yaml.safe_load(f)
        # Extract config sections
        data = config_data.get('data', {})
        features = config_data.get('features', {})
        umap_config = config_data.get('umap', {})
        clustering = config_data.get('clustering', {})
        visualization = config_data.get('visualization', {})
        output = config_data.get('output', {})
        # Parse data folders
        data_folders_list = data.get('data_folders', [])
        if not data_folders_list:
            # Fall back to a single data folder for backward compatibility
            single_folder = data.get('data_folder', '')
            if single_folder:
                data_folders_list = [{'path': single_folder, 'label': 'Dataset', 'color': 'blue', 'train_path': '', 'test_path': ''}]
        data_folders = [DataFolder(**folder) for folder in data_folders_list]
        return cls(
            # Data parameters
            data_folders=data_folders,
            file_pattern=data.get('file_pattern', '*.vtp'),
            max_files_per_folder=data.get('max_files_per_folder', -1),
            use_multiprocessing=data.get('use_multiprocessing', True),
            num_workers=data.get('num_workers', 4),
            # Feature extraction parameters
            include_pressure_stats=features.get('include_pressure_stats', False),
            max_points_for_analysis=features.get('max_points_for_analysis', 50000),
            sampling_method=features.get('sampling_method', 'uniform'),
            subsampling_distance=features.get('subsampling_distance', 0.1),
            normalize_features=features.get('normalize_features', True),
            scaling_method=features.get('scaling_method', 'robust'),
            feature_selection=features.get('feature_selection', {}),
            # UMAP parameters
            n_components=umap_config.get('n_components', 2),
            n_neighbors=umap_config.get('n_neighbors', 15),
            min_dist=umap_config.get('min_dist', 0.1),
            metric=umap_config.get('metric', 'euclidean'),
            random_state=umap_config.get('random_state', 42),
            n_epochs=umap_config.get('n_epochs', 1000),
            learning_rate=umap_config.get('learning_rate', 1.0),
            # Clustering parameters
            clustering_algorithm=clustering.get('algorithm', 'hdbscan'),
            min_cluster_size=clustering.get('min_cluster_size', 10),
            cluster_selection_epsilon=clustering.get('cluster_selection_epsilon', 1.5),
            allow_single_cluster=clustering.get('allow_single_cluster', True),
            dbscan_eps=clustering.get('eps', 0.5),
            dbscan_min_samples=clustering.get('min_samples', 3),
            # Visualization parameters
            figsize=tuple(visualization.get('figsize', [12, 10])),
            point_size=visualization.get('point_size', 50),
            alpha=visualization.get('alpha', 0.7),
            colormap=visualization.get('colormap', 'viridis'),
            show_labels=visualization.get('show_labels', False),
            save_plot=visualization.get('save_plot', True),
            output_path=visualization.get('output_path', 'umap_visualization.png'),
            dpi=visualization.get('dpi', 300),
            # Output parameters
            save_embedding=output.get('save_embedding', True),
            save_features=output.get('save_features', True),
            save_labels=output.get('save_labels', True)
        )
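# Illustrative sketch of the YAML layout from_yaml() expects. Section and key names
# mirror the .get() calls above; paths and values are placeholders, not a real config.
#
# data:
#   data_folders:
#     - path: /path/to/dataset_a                 # folder searched recursively for VTP files
#       label: DatasetA
#       color: blue
#       train_path: /path/to/train_names.txt     # optional: one geometry name per line
#       test_path: /path/to/test_names.txt       # optional: enables confidence scoring
#   file_pattern: "*.vtp"
#   max_files_per_folder: -1
#   use_multiprocessing: true
#   num_workers: 4
# features:
#   subsampling_distance: 0.1
#   scaling_method: robust
#   feature_selection: {}
# umap:
#   n_components: 2
#   n_neighbors: 15
#   min_dist: 0.1
# clustering:
#   min_cluster_size: 10
#   cluster_selection_epsilon: 1.5
#   allow_single_cluster: true
# visualization:
#   output_path: files/umap_visualization.png
#   save_plot: true
# output:
#   save_embedding: true
#   save_features: true
#   save_labels: true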
class GeometryProcessor(ABC):
    """Abstract base class for geometry processing."""

    @abstractmethod
    def process(self, polydata: vtk.vtkPolyData) -> Dict[str, float]:
        """Process geometry and extract features."""
        pass
class PointCloudProcessor(GeometryProcessor):
    """Point cloud feature extraction following PhysicsNeMo methodology."""

    def __init__(self, config: AnalysisConfig):
        self.config = config

    def process(self, polydata: vtk.vtkPolyData) -> Dict[str, float]:
        """Extract point cloud features from VTK polydata."""
        features = {}
        # Get points (empty polydata may return None)
        points = polydata.GetPoints()
        if points is None or points.GetNumberOfPoints() == 0:
            return features
        num_points = points.GetNumberOfPoints()
        # Convert to numpy array
        points_array = vtk_to_numpy(points.GetData())
        # Check if this is a mesh (has surface cells) or just a point cloud
        has_surface_cells = self._has_surface_cells(polydata)
        # If it's a mesh, subsample points from the surface
        if has_surface_cells:
            points_array = self._subsample_mesh_points(polydata, points_array)
            num_points = len(points_array)
        # Extract features
        all_features = {}
        all_features.update(self._extract_basic_features(points_array, num_points, polydata))
        all_features.update(self._extract_shape_features(points_array))
        # Filter features based on feature selection
        if self.config.feature_selection:
            features = {name: value for name, value in all_features.items()
                        if self.config.feature_selection.get(name, True)}
        else:
            features = all_features
        return features

    def _has_surface_cells(self, polydata: vtk.vtkPolyData) -> bool:
        """Check if the polydata has surface cells."""
        return (polydata.GetPolys().GetNumberOfCells() > 0 or
                polydata.GetStrips().GetNumberOfCells() > 0)

    def _subsample_mesh_points(self, polydata: vtk.vtkPolyData, points_array: np.ndarray) -> np.ndarray:
        """Subsample points from the mesh surface."""
        try:
            sample_points = vtk.vtkPolyDataPointSampler()
            sample_points.SetInputData(polydata)
            sample_points.SetDistance(self.config.subsampling_distance)
            sample_points.Update()
            sampled_polydata = sample_points.GetOutput()
            if sampled_polydata.GetNumberOfPoints() > 0:
                return vtk_to_numpy(sampled_polydata.GetPoints().GetData())
        except Exception as e:
            print(f"Warning: Point sampling failed: {e}")
        return points_array

    def _extract_basic_features(self, points_array: np.ndarray, num_points: int, polydata: vtk.vtkPolyData) -> Dict[str, float]:
        """Extract basic geometric features."""
        features = {}
        # Bounding box
        bounds = polydata.GetBounds()
        x_length = bounds[1] - bounds[0]
        y_length = bounds[3] - bounds[2]
        z_length = bounds[5] - bounds[4]
        features['x_length'] = x_length
        features['y_length'] = y_length
        features['z_length'] = z_length
        # Aspect ratios
        features['aspect_ratio_xy'] = x_length / y_length if y_length > 0 else 0
        features['aspect_ratio_xz'] = x_length / z_length if z_length > 0 else 0
        features['aspect_ratio_yz'] = y_length / z_length if z_length > 0 else 0
        return features
    def _extract_shape_features(self, points_array: np.ndarray) -> Dict[str, float]:
        """Extract shape features using PCA."""
        features = {}
        try:
            # Center the points
            centered_points = points_array - np.mean(points_array, axis=0)
            # Compute covariance matrix
            cov_matrix = np.cov(centered_points.T)
            # Eigenvalues of the symmetric covariance matrix (eigh keeps them real), sorted descending
            eigenvalues = np.linalg.eigh(cov_matrix)[0]
            eigenvalues = np.sort(eigenvalues)[::-1]
            if len(eigenvalues) >= 3 and eigenvalues[0] > 0:
                # Shape descriptors
                features['linearity'] = (eigenvalues[0] - eigenvalues[1]) / eigenvalues[0]
                features['planarity'] = (eigenvalues[1] - eigenvalues[2]) / eigenvalues[0]
                features['sphericity'] = eigenvalues[2] / eigenvalues[0]
                features['anisotropy'] = (eigenvalues[0] - eigenvalues[2]) / eigenvalues[0]
                features['omnivariance'] = (eigenvalues[0] * eigenvalues[1] * eigenvalues[2]) ** (1 / 3)
                # Eigenentropy
                normalized_eigenvals = eigenvalues / np.sum(eigenvalues)
                features['eigenentropy'] = -np.sum(normalized_eigenvals * np.log(normalized_eigenvals + 1e-10))
                # Change of curvature
                features['change_of_curvature'] = eigenvalues[2] / (eigenvalues[0] + eigenvalues[1] + eigenvalues[2])
                # Add PCA eigenvalues (PhysicsNeMo style)
                features['pca_eigenvalue_1'] = eigenvalues[0]
                features['pca_eigenvalue_2'] = eigenvalues[1]
                features['pca_eigenvalue_3'] = eigenvalues[2]
            else:
                # Default values for degenerate geometry
                features.update({
                    'linearity': 0, 'planarity': 0, 'sphericity': 0,
                    'anisotropy': 0, 'omnivariance': 0, 'eigenentropy': 0,
                    'change_of_curvature': 0,
                    'pca_eigenvalue_1': 0, 'pca_eigenvalue_2': 0, 'pca_eigenvalue_3': 0
                })
        except Exception as e:
            print(f"Warning: Shape feature extraction failed: {e}")
            features.update({
                'linearity': 0, 'planarity': 0, 'sphericity': 0,
                'anisotropy': 0, 'omnivariance': 0, 'eigenentropy': 0,
                'change_of_curvature': 0,
                'pca_eigenvalue_1': 0, 'pca_eigenvalue_2': 0, 'pca_eigenvalue_3': 0
            })
        return features
class DataLoader:
    """Data loading and preprocessing following PhysicsNeMo methodology."""

    def __init__(self, config: AnalysisConfig, processor: Optional[PointCloudProcessor] = None):
        self.config = config
        self.processor = processor if processor is not None else PointCloudProcessor(config)

    def load_vtp_file(self, file_path: Path) -> Optional[vtk.vtkPolyData]:
        """Load a VTP file and return a VTK polydata object."""
        try:
            reader = vtk.vtkXMLPolyDataReader()
            reader.SetFileName(str(file_path))
            reader.Update()
            return reader.GetOutput()
        except Exception as e:
            print(f"Error loading {file_path}: {e}")
            return None

    def process_single_file(self, file_path: Path) -> Tuple[str, Dict[str, float]]:
        """Process a single VTP file and extract features."""
        car_name = file_path.stem
        # Load the VTP file
        polydata = self.load_vtp_file(file_path)
        if polydata is None:
            return car_name, {}
        # Extract features
        features = self.processor.process(polydata)
        return car_name, features

    def load_dataset(self) -> Tuple[List[str], np.ndarray, List[str]]:
        """Load all VTP files from the configured data folders and extract features."""
        all_results = []
        dataset_labels = []
        for data_folder in self.config.data_folders:
            print(f"\nProcessing folder: {data_folder.label} ({data_folder.path})")
            # Find all matching files in this folder (recursively)
            folder_path = Path(data_folder.path)
            if not folder_path.exists():
                print(f"Warning: Folder {data_folder.path} does not exist, skipping...")
                continue
            vtp_files = list(folder_path.glob(f"**/{self.config.file_pattern}"))
            print(f"Found {len(vtp_files)} VTP files")
            # Keep only the files listed in the train split, if one is provided
            if data_folder.train_path:
                with open(data_folder.train_path, 'r') as f:
                    train_names = [line.strip() for line in f if line.strip()]
                train_set = set(train_names)
                vtp_files = [vtp for vtp in vtp_files if vtp.stem in train_set]
                print(f"Kept {len(vtp_files)} VTP files in {data_folder.label}")
            # Limit number of files if specified
            if self.config.max_files_per_folder > 0:
                vtp_files = vtp_files[:self.config.max_files_per_folder]
            if len(vtp_files) == 0:
                print(f"No VTP files found in {data_folder.label}, skipping...")
                continue
            # Process files with a progress bar
            if self.config.use_multiprocessing:
                print(f"Processing files using {self.config.num_workers} workers...")
                with mp.Pool(self.config.num_workers) as pool:
                    results = list(tqdm(
                        pool.imap(self.process_single_file, vtp_files),
                        total=len(vtp_files),
                        desc=f"Loading {data_folder.label}",
                        unit="files"
                    ))
            else:
                print("Processing files sequentially...")
                results = []
                for file_path in tqdm(vtp_files, desc=f"Loading {data_folder.label}", unit="files"):
                    result = self.process_single_file(file_path)
                    results.append(result)
            # Add dataset labels to results
            for car_name, features in results:
                if features:  # Only include files that were successfully processed
                    all_results.append((car_name, features))
                    # Use the base folder label for all files, regardless of subdirectory
                    dataset_labels.append(data_folder.label)
            print(f"Successfully processed {len([r for r in results if r[1]])} files from {data_folder.label}")
        if len(all_results) == 0:
            print("No valid files processed from any folder!")
            return [], np.array([]), []
        # Extract car names and features
        car_names = [name for name, _ in all_results]
        all_features = [features for _, features in all_results]
        # Convert to a DataFrame for easier handling and fill NaN values with 0
        df = pd.DataFrame(all_features)
        df = df.fillna(0)
        features_array = df.values
        print(f"\nTotal successfully processed: {len(car_names)} files")
        print(f"Extracted {features_array.shape[1]} features per file")
        print(f"Dataset distribution: {dict(zip(*np.unique(dataset_labels, return_counts=True)))}")
        return car_names, features_array, dataset_labels

    def load_test_dataset(self) -> Tuple[List[str], np.ndarray, List[str]]:
        """Load the test dataset from the test_path configuration."""
        all_results = []
        dataset_labels = []
        for data_folder in self.config.data_folders:
            if not data_folder.test_path:
                continue
            print(f"\nProcessing test folder: {data_folder.label} ({data_folder.test_path})")
            # Load test names
            with open(data_folder.test_path, 'r') as f:
                test_names = [line.strip() for line in f if line.strip()]
            test_set = set(test_names)
            # Find all matching files in the data folder
            folder_path = Path(data_folder.path)
            if not folder_path.exists():
                print(f"Warning: Test folder {data_folder.path} does not exist, skipping...")
                continue
            vtp_files = list(folder_path.glob(f"**/{self.config.file_pattern}"))
            print(f"Found {len(vtp_files)} VTP files in test folder")
            # Keep only the files that match test_names
            vtp_files = [vtp for vtp in vtp_files if vtp.stem in test_set]
            print(f"Kept {len(vtp_files)} VTP files for testing")
            if len(vtp_files) == 0:
                print("No matching test VTP files found, skipping...")
                continue
            # Process files with a progress bar
            if self.config.use_multiprocessing:
                print(f"Processing test files using {self.config.num_workers} workers...")
                with mp.Pool(self.config.num_workers) as pool:
                    results = list(tqdm(
                        pool.imap(self.process_single_file, vtp_files),
                        total=len(vtp_files),
                        desc=f"Loading test {data_folder.label}",
                        unit="files"
                    ))
            else:
                print("Processing test files sequentially...")
                results = []
                for file_path in tqdm(vtp_files, desc=f"Loading test {data_folder.label}", unit="files"):
                    result = self.process_single_file(file_path)
                    results.append(result)
            # Add dataset labels to results
            for car_name, features in results:
                if features:  # Only include files that were successfully processed
                    all_results.append((car_name, features))
                    dataset_labels.append(f"{data_folder.label}_test")
            print(f"Successfully processed {len([r for r in results if r[1]])} test files from {data_folder.label}")
        if len(all_results) == 0:
            print("No valid test files processed from any folder!")
            return [], np.array([]), []
        # Extract car names and features
        car_names = [name for name, _ in all_results]
        all_features = [features for _, features in all_results]
        # Convert to a DataFrame for easier handling and fill NaN values with 0
        df = pd.DataFrame(all_features)
        df = df.fillna(0)
        features_array = df.values
        print(f"\nTotal successfully processed test files: {len(car_names)}")
        print(f"Extracted {features_array.shape[1]} features per test file")
        return car_names, features_array, dataset_labels
class DimensionalityReducer:
    """Dimensionality reduction following PhysicsNeMo methodology."""

    def __init__(self, config: AnalysisConfig):
        self.config = config
        self.reducer = None  # set by create_umap_embedding()
        # Choose scaler based on config
        if config.scaling_method == "robust":
            self.scaler = RobustScaler()  # More robust to outliers than StandardScaler
        elif config.scaling_method == "standard":
            self.scaler = StandardScaler()
        else:
            self.scaler = RobustScaler()  # Default to robust

    def create_umap_embedding(self, features: np.ndarray) -> np.ndarray:
        """Create a UMAP embedding from features."""
        print("Standardizing features...")
        features_scaled = self.scaler.fit_transform(features)
        print("Creating UMAP embedding...")
        reducer = umap.UMAP(
            n_components=self.config.n_components,
            n_neighbors=self.config.n_neighbors,
            min_dist=self.config.min_dist,
            metric=self.config.metric,
            random_state=self.config.random_state,
            n_epochs=self.config.n_epochs,
            learning_rate=self.config.learning_rate,
            verbose=True
        )
        # Perform the embedding and keep the reducer for later reuse (test projection, saving)
        embedding = reducer.fit_transform(features_scaled)
        self.reducer = reducer
        return embedding
class ConfidenceScorer:
    """Confidence scoring for new geometries using HDBSCAN density-based probabilities."""

    def __init__(self, config: AnalysisConfig):
        self.config = config
        self.training_embedding = None
        self.training_names = None
        self.umap_reducer = None
        self.scaler = None
        self.hdbscan_clusterer = None

    def fit_training_data(self, training_features: np.ndarray, training_names: List[str],
                          umap_reducer, scaler, hdbscan_clusterer=None) -> None:
        """Fit the confidence scorer on training data."""
        self.training_embedding = umap_reducer.transform(scaler.transform(training_features))
        self.training_names = training_names
        self.umap_reducer = umap_reducer
        self.scaler = scaler
        self.hdbscan_clusterer = hdbscan_clusterer
        print(f"✅ Fitted confidence scorer on {len(training_names)} training samples")

    def compute_confidence_scores(self, test_features: np.ndarray, test_names: List[str],
                                  k_neighbors: int = 5) -> Tuple[np.ndarray, np.ndarray]:
        """Compute confidence scores for test geometries using HDBSCAN density-based probabilities."""
        if self.training_embedding is None:
            raise ValueError("Confidence scorer not fitted. Call fit_training_data() first.")
        print(f"🔍 Computing HDBSCAN density-based confidence scores for {len(test_names)} test geometries...")
        # Transform test features to UMAP space
        test_features_scaled = self.scaler.transform(test_features)
        test_embedding = self.umap_reducer.transform(test_features_scaled)
        if self.hdbscan_clusterer is not None and HDBSCAN_AVAILABLE:
            # Use HDBSCAN approximate prediction for density-based confidence
            pred_labels, pred_probs = prediction.approximate_predict(self.hdbscan_clusterer, test_embedding)
            confidence_scores = pred_probs
            print("✅ Computed HDBSCAN density-based confidence scores:")
        else:
            # Distance-based fallback (simple heuristic): confidence decays with the mean
            # distance to the k nearest training points in UMAP space (1 at distance 0).
            print("⚠️ No HDBSCAN clusterer available - using distance-based fallback")
            k = min(k_neighbors, len(self.training_embedding))
            nn = NearestNeighbors(n_neighbors=k).fit(self.training_embedding)
            distances, _ = nn.kneighbors(test_embedding)
            confidence_scores = 1.0 / (1.0 + distances.mean(axis=1))
        print(f"   - Mean confidence: {np.mean(confidence_scores):.4f}")
        return test_embedding, confidence_scores

    def create_confidence_visualization(self, test_embedding: np.ndarray, test_names: List[str],
                                        confidence_scores: np.ndarray,
                                        save_path: str = "files/confidence_visualization.png") -> None:
        """Create a visualization showing training vs test points with confidence scores."""
        plt.figure(figsize=(12, 10))
        # Plot training points in light gray
        plt.scatter(self.training_embedding[:, 0], self.training_embedding[:, 1],
                    c='lightgray', s=30, alpha=0.6, label='Training Data', edgecolors='black', linewidth=0.5)
        # Plot test points colored by confidence score
        plt.scatter(test_embedding[:, 0], test_embedding[:, 1],
                    c=confidence_scores, s=100, alpha=0.8,
                    cmap='RdYlGn', vmin=0, vmax=1,
                    label='Test Data', edgecolors='black', linewidth=1)
        # Add colorbar
        # cbar = plt.colorbar(scatter)
        # cbar.set_label('Confidence Score', fontsize=12)
        # Add labels for test points
        for i, name in enumerate(test_names):
            plt.annotate(name, (test_embedding[i, 0], test_embedding[i, 1]),
                         xytext=(5, 5), textcoords='offset points',
                         fontsize=8, alpha=0.8, fontweight='bold')
        plt.xlabel('UMAP Component 1', fontsize=12)
        plt.ylabel('UMAP Component 2', fontsize=12)
        plt.title('Confidence Scoring: Training vs Test Geometries', fontsize=16, fontweight='bold')
        plt.legend()
        plt.grid(True, alpha=0.3)
        # Save plot
        dir_path = os.path.dirname(save_path)
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        print(f"Confidence visualization saved to: {save_path}")
        plt.show()
class Clusterer:
    """HDBSCAN clustering for UMAP embeddings following PhysicsNeMo methodology."""

    def __init__(self, config: AnalysisConfig):
        self.config = config

    def cluster_embedding(self, embedding: np.ndarray) -> Tuple[np.ndarray, np.ndarray, Any]:
        """Perform HDBSCAN clustering on a UMAP embedding."""
        print("🔍 Performing HDBSCAN clustering on UMAP embedding...")
        # Adaptive min_cluster_size heuristics based on the number of points (currently unused)
        num_points = len(embedding)
        # adaptive_min_cluster_size = min(self.config.min_cluster_size, max(2, num_points // 10))
        # # For very small datasets, use an even smaller cluster size
        # if num_points < 20:
        #     adaptive_min_cluster_size = 2
        # elif num_points < 50:
        #     adaptive_min_cluster_size = max(2, num_points // 5)
        print("📊 Clustering parameters:")
        print(f"   - Number of points: {num_points}")
        print(f"   - Config min_cluster_size: {self.config.min_cluster_size}")
        # print(f"   - Adaptive min_cluster_size: {adaptive_min_cluster_size}")
        try:
            if HDBSCAN_AVAILABLE:
                clusterer = hdbscan.HDBSCAN(
                    min_cluster_size=self.config.min_cluster_size,
                    metric=self.config.metric,
                    cluster_selection_epsilon=self.config.cluster_selection_epsilon,
                    allow_single_cluster=self.config.allow_single_cluster,
                    prediction_data=True  # Enable prediction data for approximate_predict
                )
                cluster_labels = clusterer.fit_predict(embedding)
                print("✅ Used HDBSCAN (CPU)")
            else:
                raise Exception("HDBSCAN not available")
            # Identify outliers (label = -1)
            outlier_mask = cluster_labels == -1
            outliers = np.where(outlier_mask)[0]
            print("📊 Clustering Results:")
            print(f"   - Total points: {len(cluster_labels)}")
            print(f"   - Number of clusters: {len(set(cluster_labels)) - (1 if -1 in cluster_labels else 0)}")
            print(f"   - Outliers: {len(outliers)} ({len(outliers) / len(cluster_labels) * 100:.1f}%)")
            # Print density statistics
            if hasattr(clusterer, 'probabilities_') and clusterer.probabilities_ is not None:
                print(f"   - Mean cluster probability: {np.mean(clusterer.probabilities_):.4f}")
                print(f"   - Min cluster probability: {np.min(clusterer.probabilities_):.4f}")
                print(f"   - Max cluster probability: {np.max(clusterer.probabilities_):.4f}")
            # Check if prediction data is available
            if hasattr(clusterer, 'prediction_data_') and clusterer.prediction_data_ is not None:
                print("   - Prediction data: ✅ Available for approximate_predict")
            else:
                print("   - Prediction data: ❌ Not available - will use distance-based fallback")
            # Check if any clusters were found
            unique_labels = set(cluster_labels)
            num_clusters = len(unique_labels) - (1 if -1 in unique_labels else 0)
            if num_clusters == 0:
                print("⚠️ No clusters found! Attempting fallback with more permissive parameters...")
                return self._fallback_clustering(embedding)
            return cluster_labels, outliers, clusterer
        except Exception as e:
            print(f"❌ Clustering failed: {e}")
            # Fall back to a single cluster so downstream steps can still run
            return np.zeros(len(embedding), dtype=int), np.array([]), None
    def _fallback_clustering(self, embedding: np.ndarray) -> Tuple[np.ndarray, np.ndarray, Any]:
        """Fallback clustering with more permissive parameters."""
        print("🔄 Attempting fallback with more permissive parameters...")
        # Try multiple fallback strategies
        fallback_strategies = [
            # Strategy 1: Very permissive HDBSCAN
            {
                'min_cluster_size': 2,
                'cluster_selection_epsilon': 0.0,
                'allow_single_cluster': True,
                'min_samples': 1
            },
            # Strategy 2: Excess-of-mass cluster selection
            {
                'min_cluster_size': 2,
                'cluster_selection_epsilon': 0.0,
                'allow_single_cluster': True,
                'min_samples': 1,
                'cluster_selection_method': 'eom'
            },
            # Strategy 3: Leaf-based cluster selection
            {
                'min_cluster_size': 2,
                'cluster_selection_epsilon': 0.0,
                'allow_single_cluster': True,
                'min_samples': 1,
                'cluster_selection_method': 'leaf'
            }
        ]
        for i, strategy in enumerate(fallback_strategies):
            try:
                if HDBSCAN_AVAILABLE:
                    print(f"   Trying fallback strategy {i + 1}...")
                    fallback_clusterer = hdbscan.HDBSCAN(
                        prediction_data=True,
                        **strategy
                    )
                    fallback_labels = fallback_clusterer.fit_predict(embedding)
                    # Check if clusters were found
                    unique_labels = set(fallback_labels)
                    num_clusters = len(unique_labels) - (1 if -1 in unique_labels else 0)
                    if num_clusters > 0:
                        print(f"✅ Fallback strategy {i + 1} succeeded with {num_clusters} clusters")
                        outliers = np.where(fallback_labels == -1)[0]
                        return fallback_labels, outliers, fallback_clusterer
                    else:
                        print(f"   Strategy {i + 1} found no clusters, trying next...")
            except Exception as e:
                print(f"   Strategy {i + 1} failed: {e}")
                continue
        # If all strategies fail, treat all points as a single cluster and return no clusterer
        # (downstream confidence scoring then uses its distance-based fallback).
        print("⚠️ All fallback strategies failed, creating single cluster with all points...")
        return np.zeros(len(embedding), dtype=int), np.array([]), None
class Visualizer:
    """Visualization methodology."""

    def __init__(self, config: AnalysisConfig):
        self.config = config

    def create_umap_visualization(self, embedding: np.ndarray, car_names: List[str],
                                  dataset_labels: List[str] = None, cluster_labels: np.ndarray = None,
                                  outliers: np.ndarray = None) -> None:
        """Create and save a UMAP visualization with clustering."""
        plt.figure(figsize=self.config.figsize)
        if cluster_labels is not None and len(cluster_labels) > 0:
            # Visualization with clustering
            unique_clusters = sorted([c for c in set(cluster_labels) if c != -1])
            unique_datasets = list(set(dataset_labels)) if dataset_labels else ['Dataset']
            # Color maps
            cluster_colors = plt.cm.tab10(np.linspace(0, 1, len(unique_clusters)))
            dataset_colors = plt.cm.Set1(np.linspace(0, 1, len(unique_datasets)))
            # Plot clusters
            for cluster_idx, cluster_id in enumerate(unique_clusters):
                cluster_mask = cluster_labels == cluster_id
                if dataset_labels is not None:
                    # Plot each dataset separately within the cluster
                    for dataset_idx, dataset in enumerate(unique_datasets):
                        dataset_mask = np.array(dataset_labels) == dataset
                        combined_mask = cluster_mask & dataset_mask
                        if np.any(combined_mask):
                            plt.scatter(
                                embedding[combined_mask, 0],
                                embedding[combined_mask, 1],
                                s=self.config.point_size,
                                alpha=self.config.alpha,
                                c=[cluster_colors[cluster_idx]],
                                marker='o',
                                edgecolors='black',
                                linewidth=0.5,
                                label=f'{dataset} - Cluster {cluster_id}' if cluster_idx == 0 else ""
                            )
                else:
                    # Plot the cluster without dataset distinction
                    plt.scatter(
                        embedding[cluster_mask, 0],
                        embedding[cluster_mask, 1],
                        s=self.config.point_size,
                        alpha=self.config.alpha,
                        c=[cluster_colors[cluster_idx]],
                        label=f'Cluster {cluster_id}',
                        edgecolors='black',
                        linewidth=0.5
                    )
            # Plot outliers
            if outliers is not None and len(outliers) > 0:
                plt.scatter(
                    embedding[outliers, 0],
                    embedding[outliers, 1],
                    s=self.config.point_size * 1.5,
                    alpha=self.config.alpha,
                    c='red',
                    marker='x',
                    linewidth=2,
                    label='Outliers'
                )
            plt.legend(title='Clusters', bbox_to_anchor=(1.05, 1), loc='upper left')
            plt.title('UMAP Visualization with HDBSCAN Clustering', fontsize=16, fontweight='bold')
        elif dataset_labels is not None:
            # Visualization with dataset-based coloring (no clustering)
            unique_datasets = list(set(dataset_labels))
            colors = plt.cm.Set1(np.linspace(0, 1, len(unique_datasets)))
            dataset_color_map = dict(zip(unique_datasets, colors))
            # Plot each dataset separately
            for dataset in unique_datasets:
                mask = np.array(dataset_labels) == dataset
                plt.scatter(
                    embedding[mask, 0],
                    embedding[mask, 1],
                    s=self.config.point_size,
                    alpha=self.config.alpha,
                    c=[dataset_color_map[dataset]],
                    label=dataset,
                    edgecolors='black',
                    linewidth=0.5
                )
            plt.legend(title='Dataset', bbox_to_anchor=(1.05, 1), loc='upper left')
            plt.title('UMAP Visualization - Multi-Dataset Analysis', fontsize=16, fontweight='bold')
        else:
            # Scatter plot with index-based coloring
            scatter = plt.scatter(
                embedding[:, 0],
                embedding[:, 1],
                s=self.config.point_size,
                alpha=self.config.alpha,
                c=range(len(car_names)),
                cmap=self.config.colormap
            )
            # Add colorbar
            cbar = plt.colorbar(scatter)
            cbar.set_label('Model Index', fontsize=12)
            plt.title('UMAP Visualization of Dataset', fontsize=16, fontweight='bold')
        plt.xlabel('UMAP Component 1', fontsize=12)
        plt.ylabel('UMAP Component 2', fontsize=12)
        # Add labels if requested
        if self.config.show_labels:
            for i, name in enumerate(car_names):
                plt.annotate(name, (embedding[i, 0], embedding[i, 1]),
                             xytext=(5, 5), textcoords='offset points',
                             fontsize=8, alpha=0.7)
        plt.tight_layout()
        # Save plot if requested
        if self.config.save_plot:
            dir_path = os.path.dirname(self.config.output_path)
            if dir_path:
                os.makedirs(dir_path, exist_ok=True)
            plt.savefig(self.config.output_path, dpi=self.config.dpi, bbox_inches='tight')
            print(f"Plot saved to: {self.config.output_path}")
        plt.show()

    def create_comparison_plot(self, umap_embedding: np.ndarray, pca_embedding: np.ndarray, car_names: List[str], dataset_labels: List[str] = None) -> None:
        """Create a comparison plot between UMAP and PCA."""
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 8))
        if dataset_labels is not None:
            # Plots with dataset-based coloring
            unique_datasets = list(set(dataset_labels))
            colors = plt.cm.Set1(np.linspace(0, 1, len(unique_datasets)))
            dataset_color_map = dict(zip(unique_datasets, colors))
            # UMAP plot
            for dataset in unique_datasets:
                mask = np.array(dataset_labels) == dataset
                ax1.scatter(
                    umap_embedding[mask, 0],
                    umap_embedding[mask, 1],
                    s=self.config.point_size,
                    alpha=self.config.alpha,
                    c=[dataset_color_map[dataset]],
                    label=dataset,
                    edgecolors='black',
                    linewidth=0.5
                )
            ax1.set_title('UMAP Embedding', fontsize=14, fontweight='bold')
            ax1.set_xlabel('UMAP Component 1', fontsize=12)
            ax1.set_ylabel('UMAP Component 2', fontsize=12)
            ax1.legend(title='Dataset', loc='upper right')
            # PCA plot
            for dataset in unique_datasets:
                mask = np.array(dataset_labels) == dataset
                ax2.scatter(
                    pca_embedding[mask, 0],
                    pca_embedding[mask, 1],
                    s=self.config.point_size,
                    alpha=self.config.alpha,
                    c=[dataset_color_map[dataset]],
                    label=dataset,
                    edgecolors='black',
                    linewidth=0.5
                )
            ax2.set_title('PCA Embedding', fontsize=14, fontweight='bold')
            ax2.set_xlabel('PCA Component 1', fontsize=12)
            ax2.set_ylabel('PCA Component 2', fontsize=12)
            ax2.legend(title='Dataset', loc='upper right')
        else:
            # UMAP plot
            scatter1 = ax1.scatter(
                umap_embedding[:, 0],
                umap_embedding[:, 1],
                s=self.config.point_size,
                alpha=self.config.alpha,
                c=range(len(car_names)),
                cmap=self.config.colormap
            )
            ax1.set_title('UMAP Embedding', fontsize=14, fontweight='bold')
            ax1.set_xlabel('UMAP Component 1', fontsize=12)
            ax1.set_ylabel('UMAP Component 2', fontsize=12)
            plt.colorbar(scatter1, ax=ax1, label='Model Index')
            # PCA plot
            scatter2 = ax2.scatter(
                pca_embedding[:, 0],
                pca_embedding[:, 1],
                s=self.config.point_size,
                alpha=self.config.alpha,
                c=range(len(car_names)),
                cmap=self.config.colormap
            )
            ax2.set_title('PCA Embedding', fontsize=14, fontweight='bold')
            ax2.set_xlabel('PCA Component 1', fontsize=12)
            ax2.set_ylabel('PCA Component 2', fontsize=12)
            plt.colorbar(scatter2, ax=ax2, label='Model Index')
        plt.tight_layout()
        plt.show()

    def create_feature_analysis(self, features: np.ndarray, car_names: List[str]) -> None:
        """Create feature analysis plots."""
        # Comprehensive feature names list
        feature_names = [
            'x_length', 'y_length', 'z_length',
            'aspect_ratio_xy', 'aspect_ratio_xz', 'aspect_ratio_yz',
            'linearity', 'planarity', 'sphericity', 'anisotropy',
            'omnivariance', 'eigenentropy', 'change_of_curvature',
            'pca_eigenvalue_1', 'pca_eigenvalue_2', 'pca_eigenvalue_3'
        ]
        # Ensure we have the right number of feature names
        if len(feature_names) < features.shape[1]:
            # Add generic names for any extra features
            for i in range(len(feature_names), features.shape[1]):
                feature_names.append(f'feature_{i}')
        elif len(feature_names) > features.shape[1]:
            # Truncate if we have too many names
            feature_names = feature_names[:features.shape[1]]
        df_features = pd.DataFrame(features, columns=feature_names)
        # Correlation heatmap
        plt.figure(figsize=(12, 10))
        correlation_matrix = df_features.corr()
        sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0, fmt='.2f')
        plt.title('Feature Correlation Matrix', fontsize=16, fontweight='bold')
        plt.tight_layout()
        plt.show()
        # Feature distributions
        fig, axes = plt.subplots(4, 4, figsize=(16, 16))
        axes = axes.ravel()
        for i, feature in enumerate(feature_names[:16]):
            if i < len(axes):
                axes[i].hist(features[:, i], bins=30, alpha=0.7, edgecolor='black')
                axes[i].set_title(f'{feature}', fontsize=10)
                axes[i].set_xlabel('Value')
                axes[i].set_ylabel('Frequency')
        plt.suptitle('Feature Distributions', fontsize=16, fontweight='bold')
        plt.tight_layout()
        plt.show()
class PhysicsNeMoAnalyzer:
    """Main analyzer class following PhysicsNeMo methodology."""

    def __init__(self, config_path: str):
        """Initialize with a configuration file."""
        # Load configuration from the YAML file
        self.config = AnalysisConfig.from_yaml(config_path)
        # Initialize components
        self.processor = PointCloudProcessor(self.config)
        self.data_loader = DataLoader(self.config, self.processor)
        self.dimensionality_reducer = DimensionalityReducer(self.config)
        self.clusterer = Clusterer(self.config)
        self.visualizer = Visualizer(self.config)
        self.confidence_scorer = ConfidenceScorer(self.config)

    def run_unified_analysis(self) -> None:
        """Run the unified analysis pipeline: training + confidence scoring in one go."""
        print("=" * 60)
        print("Unified PhysicsNeMo Analysis Pipeline")
        print("=" * 60)
        # Step 1: Load training dataset
        print("\n📁 Step 1/6: Loading training dataset...")
        train_names, train_features, train_labels = self.data_loader.load_dataset()
        if len(train_names) == 0:
            print("❌ No training data found!")
            return
        print(f"✅ Successfully loaded {len(train_names)} training samples")
        # Step 2: Create UMAP embedding on training data
        print("\n🧠 Step 2/6: Creating UMAP embedding on training data...")
        train_embedding = self.dimensionality_reducer.create_umap_embedding(train_features)
        print("✅ UMAP embedding completed")
        # Step 3: Perform HDBSCAN clustering on the training embedding
        print("\n🔍 Step 3/6: Performing HDBSCAN clustering...")
        cluster_labels, outliers, clusterer = self.clusterer.cluster_embedding(train_embedding)
        print("✅ HDBSCAN clustering completed")
        # Step 4: Load test dataset and compute confidence scores
        print("\n📁 Step 4/6: Loading test dataset and computing confidence scores...")
        test_names, test_features, test_labels = self.data_loader.load_test_dataset()
        if len(test_names) > 0:
            print(f"✅ Successfully loaded {len(test_names)} test samples")
            # Fit the confidence scorer and compute scores
            self.confidence_scorer.fit_training_data(
                train_features, train_names,
                self.dimensionality_reducer.reducer,
                self.dimensionality_reducer.scaler,
                clusterer
            )
            test_embedding, confidence_scores = self.confidence_scorer.compute_confidence_scores(
                test_features, test_names, k_neighbors=4
            )
            print("✅ Confidence scores computed")
        else:
            print("⚠️ No test data found, skipping confidence scoring")
            test_embedding, confidence_scores = None, None
        # Step 5: Create visualizations
        print("\n🎨 Step 5/6: Creating visualizations...")
        self.visualizer.create_umap_visualization(train_embedding, train_names, train_labels, cluster_labels, outliers)
        self.visualizer.create_feature_analysis(train_features, train_names)
        if test_embedding is not None:
            self.confidence_scorer.create_confidence_visualization(
                test_embedding, test_names, confidence_scores
            )
        print("✅ Visualizations completed")
        # Step 6: Save all results
        print("\n💾 Step 6/6: Saving all results...")
        self._save_results(train_embedding, train_features, train_names, train_labels, cluster_labels, outliers, clusterer)
        if test_embedding is not None:
            self._save_confidence_results(
                test_embedding, test_features, test_names, test_labels,
                confidence_scores
            )
        print("✅ All results saved")
        print("\n" + "=" * 60)
        print("🎉 Unified analysis complete!")
        print("=" * 60)

    def run_analysis(self) -> None:
        """Run the complete analysis pipeline (legacy method)."""
        print("=" * 60)
        print("PhysicsNeMo-Style Car Dataset Analysis")
        print("=" * 60)
        # Step 1: Load dataset
        print("\n📁 Step 1/5: Loading dataset...")
        car_names, features, dataset_labels = self.data_loader.load_dataset()
        if len(car_names) == 0:
            print("❌ No valid files found!")
            return
        print(f"✅ Successfully loaded {len(car_names)} models")
        # Step 2: Create UMAP embedding
        print("\n🧠 Step 2/5: Creating UMAP embedding...")
        umap_embedding = self.dimensionality_reducer.create_umap_embedding(features)
        print("✅ UMAP embedding completed")
        # Step 3: Perform HDBSCAN clustering on the UMAP embedding
        print("\n🔍 Step 3/5: Performing HDBSCAN clustering...")
        cluster_labels, outliers, clusterer = self.clusterer.cluster_embedding(umap_embedding)
        print("✅ HDBSCAN clustering completed")
        # Step 4: Create visualizations
        print("\n🎨 Step 4/5: Creating visualizations...")
        self.visualizer.create_umap_visualization(umap_embedding, car_names, dataset_labels, cluster_labels, outliers)
        self.visualizer.create_feature_analysis(features, car_names)
        print("✅ Visualizations completed")
        # Step 5: Save results
        print("\n💾 Step 5/5: Saving results...")
        self._save_results(umap_embedding, features, car_names, dataset_labels, cluster_labels, outliers, clusterer)
        print("✅ Results saved")
        print("\n" + "=" * 60)
        print("🎉 Analysis complete!")
        print("=" * 60)

    def run_confidence_scoring(self) -> None:
        """Run the confidence scoring pipeline for test data."""
        print("=" * 60)
        print("Confidence Scoring Pipeline")
        print("=" * 60)
        # Step 1: Load training data
        print("\n📁 Step 1/5: Loading training dataset...")
        train_names, train_features, train_labels = self.data_loader.load_dataset()
        if len(train_names) == 0:
            print("❌ No training data found!")
            return
        print(f"✅ Successfully loaded {len(train_names)} training samples")
        # Step 2: Fit UMAP on training data only
        print("\n🧠 Step 2/5: Fitting UMAP on training data...")
        umap_embedding = self.dimensionality_reducer.create_umap_embedding(train_features)
        print("✅ UMAP fitted on training data")
        # Step 2.5: Perform HDBSCAN clustering on the training embedding
        print("\n🔍 Step 2.5/5: Performing HDBSCAN clustering...")
        cluster_labels, outliers, clusterer = self.clusterer.cluster_embedding(umap_embedding)
        print("✅ HDBSCAN clustering completed")
        # Step 3: Load test data
        print("\n📁 Step 3/5: Loading test dataset...")
        test_names, test_features, test_labels = self.data_loader.load_test_dataset()
        if len(test_names) == 0:
            print("❌ No test data found!")
            return
        print(f"✅ Successfully loaded {len(test_names)} test samples")
        # Step 4: Fit the confidence scorer and compute scores
        print("\n🔍 Step 4/5: Computing confidence scores...")
        self.confidence_scorer.fit_training_data(
            train_features, train_names,
            self.dimensionality_reducer.reducer,
            self.dimensionality_reducer.scaler,
            clusterer
        )
        test_embedding, confidence_scores = self.confidence_scorer.compute_confidence_scores(
            test_features, test_names, k_neighbors=4
        )
        print("✅ Confidence scores computed")
        # Step 5: Create visualizations and save results
        print("\n🎨 Step 5/5: Creating visualizations and saving results...")
        self.confidence_scorer.create_confidence_visualization(
            test_embedding, test_names, confidence_scores
        )
        # Save confidence results
        self._save_confidence_results(
            test_embedding, test_features, test_names, test_labels,
            confidence_scores
        )
        print("✅ Results saved")
        print("\n" + "=" * 60)
        print("🎉 Confidence scoring complete!")
        print("=" * 60)
    def _save_confidence_results(self, test_embedding: np.ndarray, test_features: np.ndarray,
                                 test_names: List[str], test_labels: List[str],
                                 confidence_scores: np.ndarray) -> None:
        """Save confidence scoring results to files."""
        # Generate dynamic file names
        unique_datasets = list(set(test_labels))
        if len(unique_datasets) == 1:
            base_name = unique_datasets[0].replace(" ", "_").lower()
        else:
            base_name = "combined_test"
        # Optional raw outputs (disabled):
        # test_embedding_path = f"files/{base_name}_test_embedding.npy"
        # os.makedirs(os.path.dirname(test_embedding_path), exist_ok=True)
        # np.save(test_embedding_path, test_embedding)
        # print(f"Test embedding saved to: {test_embedding_path}")
        # test_features_path = f"files/{base_name}_test_features.npy"
        # np.save(test_features_path, test_features)
        # print(f"Test features saved to: {test_features_path}")
        # test_names_path = f"files/{base_name}_test_names.npy"
        # np.save(test_names_path, test_names)
        # print(f"Test names saved to: {test_names_path}")
        # confidence_path = f"files/{base_name}_confidence_scores.npy"
        # np.save(confidence_path, confidence_scores)
        # print(f"Confidence scores saved to: {confidence_path}")
        # Create a summary report
        summary_path = f"files/{base_name}_confidence_summary.txt"
        os.makedirs(os.path.dirname(summary_path), exist_ok=True)
        with open(summary_path, 'w') as f:
            f.write("Confidence Scoring Summary\n")
            f.write("=" * 30 + "\n\n")
            f.write(f"Number of test samples: {len(test_names)}\n")
            f.write(f"Confidence score range: {np.min(confidence_scores):.4f} - {np.max(confidence_scores):.4f}\n")
            f.write(f"Mean confidence score: {np.mean(confidence_scores):.4f}\n")
            f.write(f"Std confidence score: {np.std(confidence_scores):.4f}\n\n")
            f.write("Individual Results:\n")
            f.write("-" * 20 + "\n")
            for i, (name, conf) in enumerate(zip(test_names, confidence_scores)):
                f.write(f"{i + 1:3d}. {name:20s} | Confidence: {conf:.4f}\n")
        print(f"Summary report saved to: {summary_path}")
    def _save_results(self, embedding: np.ndarray, features: np.ndarray, car_names: List[str],
                      dataset_labels: List[str], cluster_labels: np.ndarray = None, outliers: np.ndarray = None, clusterer: Any = None) -> None:
        """Save results to files."""
        # Generate dynamic file names based on dataset labels
        unique_datasets = list(set(dataset_labels))
        if len(unique_datasets) == 1:
            # Single dataset - use the label name
            base_name = unique_datasets[0].replace(" ", "_").lower()
        else:
            # Multiple datasets - use "combined"
            base_name = "combined"
        # Generate file paths (save to the files/ directory)
        embedding_path = f"files/{base_name}_umap_embedding.npy"
        features_path = f"files/{base_name}_features.npy"
        labels_path = f"files/{base_name}_names.npy"
        dataset_labels_path = f"files/{base_name}_dataset_labels.npy"
        # Save embedding
        if self.config.save_embedding:
            # Only create a directory if the path has a directory component
            dir_path = os.path.dirname(embedding_path)
            if dir_path:
                os.makedirs(dir_path, exist_ok=True)
            np.save(embedding_path, embedding)
            print(f"Embedding saved to: {embedding_path}")
        # Save features
        if self.config.save_features:
            dir_path = os.path.dirname(features_path)
            if dir_path:
                os.makedirs(dir_path, exist_ok=True)
            np.save(features_path, features)
            print(f"Features saved to: {features_path}")
        # Save labels
        if self.config.save_labels:
            dir_path = os.path.dirname(labels_path)
            if dir_path:
                os.makedirs(dir_path, exist_ok=True)
            np.save(labels_path, car_names)
            print(f"Names saved to: {labels_path}")
        # Save dataset labels
        dir_path = os.path.dirname(dataset_labels_path)
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)
        np.save(dataset_labels_path, dataset_labels)
        print(f"Dataset labels saved to: {dataset_labels_path}")
        # Save the UMAP reducer, scaler, and clusterer for reuse in the Gradio demo
        self._save_umap_components(base_name, clusterer)
        # Saving clustering results (disabled):
        # if cluster_labels is not None:
        #     cluster_labels_path = self.config.labels_path.replace('.npy', '_cluster_labels.npy')
        #     os.makedirs(os.path.dirname(cluster_labels_path), exist_ok=True)
        #     np.save(cluster_labels_path, cluster_labels)
        #     print(f"Cluster labels saved to: {cluster_labels_path}")
        # if outliers is not None and len(outliers) > 0:
        #     outliers_path = self.config.labels_path.replace('.npy', '_outliers.npy')
        #     os.makedirs(os.path.dirname(outliers_path), exist_ok=True)
        #     np.save(outliers_path, outliers)
        #     print(f"Outliers saved to: {outliers_path}")

    def _save_umap_components(self, base_name: str, clusterer=None) -> None:
        """Save the UMAP reducer, scaler, and clusterer for reuse in the Gradio demo."""
        import pickle
        # Save UMAP reducer
        reducer_path = f"files/{base_name}_umap_reducer.pkl"
        dir_path = os.path.dirname(reducer_path)
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)
        with open(reducer_path, 'wb') as f:
            pickle.dump(self.dimensionality_reducer.reducer, f)
        print(f"UMAP reducer saved to: {reducer_path}")
        # Save scaler
        scaler_path = f"files/{base_name}_scaler.pkl"
        with open(scaler_path, 'wb') as f:
            pickle.dump(self.dimensionality_reducer.scaler, f)
        print(f"Scaler saved to: {scaler_path}")
        # Save HDBSCAN clusterer if provided
        if clusterer is not None:
            clusterer_path = f"files/{base_name}_hdbscan_clusterer.pkl"
            with open(clusterer_path, 'wb') as f:
                pickle.dump(clusterer, f)
            print(f"HDBSCAN clusterer saved to: {clusterer_path}")
def main():
    """Main function to run the PhysicsNeMo-style analysis."""
    import sys
    config_path = "/raid/ansysai/udbhav/alphaLPFM/similarity/umap/config.yaml"
    if not os.path.exists(config_path):
        print(f"Configuration file not found: {config_path}")
        return
    # Create the analyzer
    analyzer = PhysicsNeMoAnalyzer(config_path)
    # Check command line arguments for the mode
    if len(sys.argv) > 1:
        if sys.argv[1] == "confidence":
            print("Running confidence scoring pipeline only...")
            analyzer.run_confidence_scoring()
        elif sys.argv[1] == "train":
            print("Running training analysis only...")
            analyzer.run_analysis()
        else:
            print(f"Unknown argument: {sys.argv[1]}")
            print("Usage: python run_umap.py [train|confidence]")
            print("  - No argument: Run unified pipeline (training + confidence scoring)")
            print("  - train: Run training analysis only")
            print("  - confidence: Run confidence scoring only")
    else:
        print("Running unified pipeline (training + confidence scoring)...")
        analyzer.run_unified_analysis()


if __name__ == "__main__":
    main()