import os

import numpy as np
import pyvista as pv
import matplotlib.pyplot as plt
from scipy.ndimage import gaussian_filter
from scipy.spatial.distance import jensenshannon, cosine

# -------------------- PyVista Config --------------------
pv.OFF_SCREEN = True            # render headlessly (no display required)
pv.set_plot_theme("document")

# -------------------- Paths --------------------
dataset = "cadillac"
train_folder = f'/raid/ansysai/pkakka/6-Transformers/comparePhysicsLM/Data/{dataset}/'
train_save_dir = os.path.join(train_folder, f"../../metrics/{dataset}/")
os.makedirs(train_save_dir, exist_ok=True)

# -------------------- Utility Functions --------------------
def get_points(file_path, max_points=5000):
    """Read a mesh and return at most `max_points` of its points (random subsample)."""
    try:
        mesh = pv.read(file_path)
        points = mesh.points
        if len(points) > max_points:
            indices = np.random.choice(len(points), max_points, replace=False)
            points = points[indices]
        return points
    except Exception as e:
        raise ValueError(f"Error reading {file_path}: {e}")

# -------------------- Training Histogram --------------------
def compute_training_dist(train_folder, output_file='train_dist.npz', num_bins=25, smooth_sigma=1):
    """Pool points from all training VTPs into one smoothed, normalized 3D histogram."""
    train_txt_path = os.path.join(train_folder, '1_VTK_surface/train.txt')
    if not os.path.exists(train_txt_path):
        raise ValueError(f"train.txt not found at {train_txt_path}")

    with open(train_txt_path, 'r') as f:
        folder_names = [line.strip() for line in f if line.strip()]

    train_files = []
    for name in folder_names:
        vtp_file = os.path.join(train_folder, '1_VTK_surface', name, f'{name}.vtp')
        if os.path.exists(vtp_file):
            train_files.append(vtp_file)
        else:
            print(f"Warning: VTP not found: {vtp_file}")

    if not train_files:
        raise ValueError("No training VTPs found.")

    train_points = np.concatenate([get_points(f) for f in train_files], axis=0)

    # Per-axis bin edges derived from the training data; the same edges are
    # reused for every test histogram so that bins line up across distributions.
    bin_edges = [np.histogram_bin_edges(train_points[:, i], bins=num_bins) for i in range(3)]
    train_hist, _ = np.histogramdd(train_points, bins=bin_edges, density=True)
    train_hist = gaussian_filter(train_hist, sigma=smooth_sigma).flatten()
    train_hist /= train_hist.sum()  # renormalize to a probability mass function

    # Save into train_save_dir so compute_similarity_scores() can find the file
    # (the original saved to train_folder but loaded from train_save_dir).
    np.savez(os.path.join(train_save_dir, output_file),
             hist=train_hist,
             edges0=bin_edges[0], edges1=bin_edges[1], edges2=bin_edges[2])
    print(f"Training histogram saved: {output_file} ({train_points.shape[0]} points)")
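# -------------------- Sanity Check (illustrative) --------------------
# A minimal, self-contained sketch of the comparison idea used below; it is
# NOT part of the original pipeline. Two synthetic Gaussian point clouds are
# binned on shared edges and compared with the Jensen-Shannon distance: a
# mean-shifted cloud should score a lower similarity than an identical one.
def _sanity_check_js(num_bins=25, smooth_sigma=1, seed=0):
    rng = np.random.default_rng(seed)
    ref = rng.normal(0.0, 1.0, size=(5000, 3))       # reference cloud
    same = rng.normal(0.0, 1.0, size=(5000, 3))      # same distribution
    shifted = rng.normal(1.5, 1.0, size=(5000, 3))   # mean-shifted distribution

    # Shared bin edges from the reference, mirroring compute_training_dist()
    edges = [np.histogram_bin_edges(ref[:, i], bins=num_bins) for i in range(3)]

    def hist_of(points):
        h, _ = np.histogramdd(points, bins=edges, density=True)
        h = gaussian_filter(h, sigma=smooth_sigma).flatten()
        return h / h.sum() + 1e-12

    sim_same = 1 - jensenshannon(hist_of(ref), hist_of(same), base=2)
    sim_shifted = 1 - jensenshannon(hist_of(ref), hist_of(shifted), base=2)
    print(f"JS similarity, same distribution:    {sim_same:.3f}")
    print(f"JS similarity, shifted distribution: {sim_shifted:.3f}")
    assert sim_same > sim_shifted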
# -------------------- Multi-Metric Similarity --------------------
def compute_similarity_scores(test_file, train_dist_file='train_dist.npz', smooth_sigma=1):
    """Bin a test file's points on the training edges and score seven similarities."""
    data = np.load(os.path.join(train_save_dir, train_dist_file))
    train_hist = data['hist']
    bin_edges = [data['edges0'], data['edges1'], data['edges2']]

    test_points = get_points(test_file)
    test_hist, _ = np.histogramdd(test_points, bins=bin_edges, density=True)
    test_hist = gaussian_filter(test_hist, sigma=smooth_sigma).flatten()
    if test_hist.sum() == 0:
        raise ValueError(f"No points of {test_file} fall inside the training bins.")
    test_hist /= test_hist.sum()

    # Small epsilon keeps the log- and ratio-based metrics finite on empty bins.
    epsilon = 1e-12
    train_hist_safe = train_hist + epsilon
    test_hist_safe = test_hist + epsilon

    scores = {}
    # 1. Jensen-Shannon: base=2 bounds the distance in [0, 1], so 1 - d is a
    #    proper similarity (SciPy's default natural log tops out below 1).
    scores['JS'] = 1 - jensenshannon(train_hist_safe, test_hist_safe, base=2)
    # 2. KL divergence, mapped to (0, 1] via 1 / (1 + KL)
    scores['KL'] = 1 / (1 + np.sum(train_hist_safe * np.log(train_hist_safe / test_hist_safe)))
    # 3. Hellinger distance, flipped to a similarity
    scores['Hellinger'] = 1 - (1 / np.sqrt(2)) * np.linalg.norm(np.sqrt(train_hist_safe) - np.sqrt(test_hist_safe))
    # 4. Bhattacharyya coefficient (1 = identical distributions)
    scores['Bhattacharyya'] = np.sum(np.sqrt(train_hist_safe * test_hist_safe))
    # 5. Cosine similarity
    scores['Cosine'] = 1 - cosine(train_hist_safe, test_hist_safe)
    # 6. L1 distance, mapped to (0, 1]
    scores['L1'] = 1 / (1 + np.sum(np.abs(train_hist_safe - test_hist_safe)))
    # 7. L2 (Euclidean) distance, mapped to (0, 1]
    scores['L2'] = 1 / (1 + np.linalg.norm(train_hist_safe - test_hist_safe))
    return scores

# -------------------- Analyze Test Folder --------------------
def analyze_and_save_scores(test_folder, train_dist_file='train_dist.npz',
                            output_file='test_similarity_scores.txt', smooth_sigma=1):
    """Score every test case listed in test.txt and write a TSV of results."""
    # Use the test_folder parameter (the original ignored it and read the
    # global train_folder; both hold the same dataset root here).
    test_txt_path = os.path.join(test_folder, '1_VTK_surface/test.txt')
    if not os.path.exists(test_txt_path):
        raise ValueError(f"test.txt not found at {test_txt_path}")

    with open(test_txt_path, 'r') as f:
        folder_names = [line.strip() for line in f if line.strip()]

    output_path = os.path.join(train_save_dir, output_file)
    all_scores = []
    with open(output_path, 'w') as f_out:
        f_out.write("Test_File\tJS\tKL\tHellinger\tBhattacharyya\tCosine\tL1\tL2\n")
        for name in folder_names:
            vtp_file = os.path.join(test_folder, '1_VTK_surface', name, f'{name}.vtp')
            if os.path.exists(vtp_file):
                try:
                    scores = compute_similarity_scores(vtp_file, train_dist_file, smooth_sigma)
                    all_scores.append(list(scores.values()))
                    f_out.write(f"{name}\t" + "\t".join(f"{s:.6f}" for s in scores.values()) + "\n")
                    # Round to 3 decimals for clean console printing
                    scores_clean = {k: round(float(v), 3) for k, v in scores.items()}
                    print(f"{name}: {scores_clean}")
                except Exception as e:
                    print(f"Error for {name}: {e}")
                    f_out.write(f"{name}\tERROR\n")
            else:
                print(f"Warning: VTP not found: {vtp_file}")
                f_out.write(f"{name}\tERROR\n")
    return folder_names, np.array(all_scores)

# -------------------- Main --------------------
if __name__ == "__main__":
    try:
        print("Computing training histogram...")
        compute_training_dist(train_folder, num_bins=25, smooth_sigma=1)

        print("Analyzing test cases with multiple metrics...")
        names, scores_array = analyze_and_save_scores(train_folder, smooth_sigma=1)

        print("Analysis completed successfully!")
    except Exception as e:
        print(f"Error during analysis: {e}")
        import traceback
        traceback.print_exc()
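# -------------------- Optional Visualization (illustrative) --------------------
# A hedged sketch, not part of the original pipeline: plot the scores returned
# by analyze_and_save_scores() as grouped bars using the matplotlib import
# already present. `plot_scores` is a hypothetical helper name; call it
# manually (e.g. plot_scores(names, scores_array) after the main block).
# Caveat: rows of scores_array cover only successfully scored cases, so if
# any case wrote an ERROR row, labels and bars may misalign.
def plot_scores(names, scores_array, save_path=None):
    metrics = ['JS', 'KL', 'Hellinger', 'Bhattacharyya', 'Cosine', 'L1', 'L2']
    if scores_array.size == 0:
        print("No scores to plot.")
        return
    labels = names[:scores_array.shape[0]]
    fig, ax = plt.subplots(figsize=(10, 4))
    x = np.arange(len(labels))
    width = 0.8 / len(metrics)
    for j, metric in enumerate(metrics):
        ax.bar(x + j * width, scores_array[:, j], width, label=metric)
    ax.set_xticks(x + 3 * width)  # center ticks on the middle of the 7 bars
    ax.set_xticklabels(labels, rotation=45, ha='right')
    ax.set_ylabel("Similarity (1 = identical)")
    ax.legend(fontsize='small', ncol=4)
    fig.tight_layout()
    fig.savefig(save_path or os.path.join(train_save_dir, "similarity_scores.png"), dpi=150)
    plt.close(fig)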