Spaces:
Sleeping
Sleeping
| import pyvista as pv | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import os | |
| from umap.umap_ import UMAP # Correct import for umap-learn | |
| # Configuration | |
| dataset = "plane_transonic" | |
| train_folder = f'/raid/ansysai/pkakka/6-Transformers/comparePhysicsLM/Data/{dataset}/' | |
| train_save_dir = os.path.join(train_folder, f"../../metrics/{dataset}/") | |
| os.makedirs(train_save_dir, exist_ok=True) | |
| def normalize_points(points): | |
| """Center and scale points to unit bounding box.""" | |
| points = points - np.mean(points, axis=0) | |
| max_dist = np.max(np.linalg.norm(points, axis=1)) | |
| if max_dist > 0: | |
| points = points / max_dist | |
| return points | |
| def get_points(file_path, max_points=1000): | |
| """Extract and subsample point cloud from VTP file.""" | |
| try: | |
| mesh = pv.read(file_path) | |
| points = mesh.points | |
| if len(points) > max_points: | |
| indices = np.random.choice(len(points), max_points, replace=False) | |
| points = points[indices] | |
| return normalize_points(points).flatten() # Flatten to 1D for UMAP | |
| except Exception as e: | |
| raise ValueError(f"Error reading {file_path}: {e}") | |
| def compute_umap_embeddings(train_folder, output_file='umap_embeddings.npz'): | |
| """Compute UMAP embeddings for training and test VTPs.""" | |
| train_txt_path = os.path.join(train_folder, '1_VTK_surface/train.txt') | |
| test_txt_path = os.path.join(train_folder, '1_VTK_surface/test.txt') | |
| for path in [train_txt_path, test_txt_path]: | |
| if not os.path.exists(path): | |
| raise ValueError(f"{path} not found") | |
| with open(train_txt_path, 'r') as f: | |
| train_names = [line.strip() for line in f if line.strip()] | |
| with open(test_txt_path, 'r') as f: | |
| test_names = [line.strip() for line in f if line.strip()] | |
| train_files = [] | |
| for name in train_names: | |
| vtp_file = os.path.join(train_folder, '1_VTK_surface', name, f'{name}.vtp') | |
| if os.path.exists(vtp_file): | |
| train_files.append((name, vtp_file)) | |
| else: | |
| print(f"Warning: Training VTP not found: {vtp_file}") | |
| test_files = [] | |
| for name in test_names: | |
| vtp_file = os.path.join(train_folder, '1_VTK_surface', name, f'{name}.vtp') | |
| if os.path.exists(vtp_file): | |
| test_files.append((name, vtp_file)) | |
| else: | |
| print(f"Warning: Test VTP not found: {vtp_file}") | |
| if not train_files or not test_files: | |
| raise ValueError("No valid training or test VTPs found") | |
| train_features = [get_points(vtp) for _, vtp in train_files] | |
| test_features = [get_points(vtp) for _, vtp in test_files] | |
| all_features = train_features + test_features | |
| reducer = UMAP(n_components=2, random_state=42, n_neighbors=15, min_dist=0.1) | |
| embeddings = reducer.fit_transform(all_features) | |
| train_embeddings = embeddings[:len(train_files)] | |
| test_embeddings = embeddings[len(train_files):] | |
| output_path = os.path.join(train_save_dir, output_file) | |
| np.savez(output_path, train_emb=train_embeddings, test_emb=test_embeddings, | |
| train_names=[n for n, _ in train_files], test_names=[n for n, _ in test_files]) | |
| print(f"UMAP embeddings saved: {output_path}") | |
| return train_embeddings, test_embeddings, [n for n, _ in train_files], [n for n, _ in test_files] | |
| def compute_umap_scores(train_embeddings, test_embeddings, test_names, output_file='test_umap_scores.txt'): | |
| """Compute similarity scores based on UMAP space distances.""" | |
| scores = [] | |
| output_path = os.path.join(train_save_dir, output_file) | |
| with open(output_path, 'w') as f: | |
| f.write("Test_File\tUMAP_Score\n") | |
| for i, (test_emb, name) in enumerate(zip(test_embeddings, test_names)): | |
| distances = np.linalg.norm(train_embeddings - test_emb, axis=1) | |
| median_dist = np.median(distances) | |
| score = 1 / (1 + median_dist) # 0-1, higher = closer | |
| scores.append(score) | |
| print(f"{name}: {score:.4f}") | |
| f.write(f"{name}\t{score:.6f}\n") | |
| if scores: | |
| print(f"\nAverage Score: {np.mean(scores):.4f} ± {np.std(scores):.4f}") | |
| print(f"Min/Max: {np.min(scores):.4f} / {np.max(scores):.4f}") | |
| plt.figure(figsize=(8, 6)) | |
| plt.scatter(train_embeddings[:, 0], train_embeddings[:, 1], c='blue', label='Train', alpha=0.6) | |
| plt.scatter(test_embeddings[:, 0], test_embeddings[:, 1], c='red', label='Test', alpha=0.6) | |
| plt.xlabel('UMAP 1') | |
| plt.ylabel('UMAP 2') | |
| plt.title('UMAP Embeddings of Geometries') | |
| plt.legend() | |
| plt.savefig(os.path.join(train_save_dir, 'umap_plot.png')) | |
| plt.close() | |
| return scores | |
| def visualize_sample(test_folder): | |
| """Plot the first test geometry.""" | |
| test_txt_path = os.path.join(test_folder, '1_VTK_surface/test.txt') | |
| with open(test_txt_path, 'r') as f: | |
| folder_names = [line.strip() for line in f if line.strip()] | |
| if folder_names: | |
| name = folder_names[0] | |
| vtp_file = os.path.join(test_folder, '1_VTK_surface', name, f'{name}.vtp') | |
| if os.path.exists(vtp_file): | |
| mesh = pv.read(vtp_file) | |
| plotter = pv.Plotter() | |
| plotter.add_mesh(mesh, color='blue', show_edges=True) | |
| plotter.add_title(f'Sample Geometry: {name}') | |
| plotter.show() | |
| # Run analysis | |
| if __name__ == "__main__": | |
| train_emb, test_emb, train_names, test_names = compute_umap_embeddings(train_folder) | |
| scores = compute_umap_scores(train_emb, test_emb, test_names) | |
| visualize_sample(train_folder) |