import pyvista as pv
import numpy as np
from scipy.spatial.distance import jensenshannon, cosine
from scipy.ndimage import gaussian_filter
import os
# -------------------- PyVista Config --------------------
pv.OFF_SCREEN = True
pv.set_plot_theme("document")

# -------------------- Paths --------------------
dataset = "cadillac"
train_folder = f'/raid/ansysai/pkakka/6-Transformers/comparePhysicsLM/Data/{dataset}/'
train_save_dir = os.path.join(train_folder, f"../../metrics/{dataset}/")
os.makedirs(train_save_dir, exist_ok=True)
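# Note: train_save_dir resolves to .../comparePhysicsLM/metrics/cadillac/,
# i.e. metrics are written alongside (not inside) the Data/ tree.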

# -------------------- Utility Functions --------------------
def get_points(file_path, max_points=5000):
    """Read a mesh and return up to max_points randomly subsampled vertices as an (N, 3) array."""
    try:
        mesh = pv.read(file_path)
        points = mesh.points
        if len(points) > max_points:
            indices = np.random.choice(len(points), max_points, replace=False)
            points = points[indices]
        return points
    except Exception as e:
        raise ValueError(f"Error reading {file_path}: {e}")

# -------------------- Training Histogram --------------------
def compute_training_dist(train_folder, output_file='train_dist.npz', num_bins=25, smooth_sigma=1):
    """Build a smoothed, normalized 3-D histogram over all training surface points."""
    train_txt_path = os.path.join(train_folder, '1_VTK_surface/train.txt')
    if not os.path.exists(train_txt_path):
        raise ValueError(f"train.txt not found at {train_txt_path}")
    with open(train_txt_path, 'r') as f:
        folder_names = [line.strip() for line in f if line.strip()]
    train_files = []
    for name in folder_names:
        vtp_file = os.path.join(train_folder, '1_VTK_surface', name, f'{name}.vtp')
        if os.path.exists(vtp_file):
            train_files.append(vtp_file)
        else:
            print(f"Warning: VTP not found: {vtp_file}")
    if not train_files:
        raise ValueError("No training VTPs found.")
    train_points = np.concatenate([get_points(f) for f in train_files], axis=0)
    bin_edges = [np.histogram_bin_edges(train_points[:, i], bins=num_bins) for i in range(3)]
    train_hist, _ = np.histogramdd(train_points, bins=bin_edges, density=True)
    train_hist = gaussian_filter(train_hist, sigma=smooth_sigma).flatten()
    train_hist /= train_hist.sum()
    # Save where compute_similarity_scores() loads from (train_save_dir, not train_folder).
    np.savez(os.path.join(train_save_dir, output_file),
             hist=train_hist, edges0=bin_edges[0], edges1=bin_edges[1], edges2=bin_edges[2])
    print(f"Training histogram saved: {output_file} ({train_points.shape[0]} points)")

# -------------------- Multi-Metric Similarity --------------------
def compute_similarity_scores(test_file, train_dist_file='train_dist.npz', smooth_sigma=1):
    """Score one test geometry against the training histogram with seven similarity metrics."""
    data = np.load(os.path.join(train_save_dir, train_dist_file))
    train_hist = data['hist']
    bin_edges = [data['edges0'], data['edges1'], data['edges2']]
    test_points = get_points(test_file)
    test_hist, _ = np.histogramdd(test_points, bins=bin_edges, density=True)
    test_hist = gaussian_filter(test_hist, sigma=smooth_sigma).flatten()
    test_hist /= test_hist.sum()
    # Epsilon-pad both histograms so the log/ratio-based metrics stay finite.
    epsilon = 1e-12
    train_hist_safe = train_hist + epsilon
    test_hist_safe = test_hist + epsilon
    scores = {}
    # 1. Jensen-Shannon (SciPy returns the JS distance, so 1 - d is a similarity)
    scores['JS'] = 1 - jensenshannon(train_hist_safe, test_hist_safe)
    # 2. KL divergence, mapped to a similarity in (0, 1]
    scores['KL'] = 1 / (1 + np.sum(train_hist_safe * np.log(train_hist_safe / test_hist_safe)))
    # 3. Hellinger
    scores['Hellinger'] = 1 - (1 / np.sqrt(2)) * np.linalg.norm(np.sqrt(train_hist_safe) - np.sqrt(test_hist_safe))
    # 4. Bhattacharyya coefficient
    scores['Bhattacharyya'] = np.sum(np.sqrt(train_hist_safe * test_hist_safe))
    # 5. Cosine similarity
    scores['Cosine'] = 1 - cosine(train_hist_safe, test_hist_safe)
    # 6. L1 distance, mapped to a similarity
    scores['L1'] = 1 / (1 + np.sum(np.abs(train_hist_safe - test_hist_safe)))
    # 7. L2 distance, mapped to a similarity
    scores['L2'] = 1 / (1 + np.linalg.norm(train_hist_safe - test_hist_safe))
    return scores
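# Metric summary (p = train histogram, q = test histogram, both epsilon-padded;
# each score is oriented so that identical distributions give 1):
#   JS            = 1 - JS_distance(p, q)        (SciPy's jensenshannon is already the distance)
#   KL            = 1 / (1 + sum_b p_b * log(p_b / q_b))
#   Hellinger     = 1 - (1 / sqrt(2)) * ||sqrt(p) - sqrt(q)||_2
#   Bhattacharyya = sum_b sqrt(p_b * q_b)
#   Cosine        = <p, q> / (||p||_2 * ||q||_2)
#   L1, L2        = 1 / (1 + ||p - q||_1),  1 / (1 + ||p - q||_2)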

# -------------------- Analyze Test Folder --------------------
def analyze_and_save_scores(test_folder, train_dist_file='train_dist.npz', output_file='test_similarity_scores.txt', smooth_sigma=1):
    """Score every test geometry listed in test.txt and write a tab-separated results file."""
    test_txt_path = os.path.join(test_folder, '1_VTK_surface/test.txt')
    if not os.path.exists(test_txt_path):
        raise ValueError(f"test.txt not found at {test_txt_path}")
    with open(test_txt_path, 'r') as f:
        folder_names = [line.strip() for line in f if line.strip()]
    output_path = os.path.join(train_save_dir, output_file)
    all_scores = []
    with open(output_path, 'w') as f_out:
        header = "Test_File\tJS\tKL\tHellinger\tBhattacharyya\tCosine\tL1\tL2\n"
        f_out.write(header)
        for name in folder_names:
            vtp_file = os.path.join(test_folder, '1_VTK_surface', name, f'{name}.vtp')
            if os.path.exists(vtp_file):
                try:
                    scores = compute_similarity_scores(vtp_file, train_dist_file, smooth_sigma)
                    all_scores.append(list(scores.values()))
                    f_out.write(f"{name}\t" + "\t".join([f"{s:.6f}" for s in scores.values()]) + "\n")
                    # Round to 3 decimal places for clean console printing
                    scores_clean = {k: round(float(v), 3) for k, v in scores.items()}
                    print(f"{name}: {scores_clean}")
                except Exception as e:
                    print(f"Error for {name}: {e}")
                    f_out.write(f"{name}\tERROR\n")
            else:
                print(f"Warning: VTP not found: {vtp_file}")
                f_out.write(f"{name}\tERROR\n")
    return folder_names, np.array(all_scores)
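# The results file is tab-separated and can be loaded downstream with, e.g.,
# np.genfromtxt(output_path, delimiter='\t', skip_header=1, usecols=range(1, 8))
# or pandas.read_csv(output_path, sep='\t').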

# -------------------- Main --------------------
if __name__ == "__main__":
    try:
        print("Computing training histogram...")
        compute_training_dist(train_folder, num_bins=25, smooth_sigma=1)
        print("Analyzing test cases with multiple metrics...")
        names, scores_array = analyze_and_save_scores(train_folder, smooth_sigma=1)
        print("Analysis completed successfully!")
    except Exception as e:
        print(f"Error during analysis: {e}")
        import traceback
        traceback.print_exc()
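# Sanity check (sketch; 'run_001' is a hypothetical case name): a geometry that
# was part of training, scored against its own histogram, should come out near 1
# on every metric:
#   vtp = os.path.join(train_folder, '1_VTK_surface', 'run_001', 'run_001.vtp')
#   print(compute_similarity_scores(vtp))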