import pyvista as pv
import numpy as np
import torch
import os

# Configuration (adjust paths as needed)
dataset = "plane_transonic"  # Change dataset name as needed
train_folder = f'/raid/ansysai/pkakka/6-Transformers/comparePhysicsLM/Data/{dataset}/'
train_save_dir = os.path.join(train_folder, f"../../metrics/{dataset}/")
test_dir = os.path.join(train_save_dir, "Transformer_baseline_ep1000/best_case/vtk_files/")
os.makedirs(train_save_dir, exist_ok=True)


def normalize_point_cloud(pc: torch.Tensor) -> torch.Tensor:
    """
    Center a point cloud at the origin and scale it to fit within the unit sphere.
    pc: (N, 3) tensor of points.
    """
    pc = pc - pc.mean(dim=0, keepdim=True)  # Center at origin
    max_dist = torch.max(torch.norm(pc, dim=1))
    if max_dist > 0:
        pc = pc / max_dist  # Scale so the farthest point lies on the unit sphere
    return pc


def chamfer_distance(pc1: torch.Tensor, pc2: torch.Tensor, batch_size: int = 1000) -> float:
    """
    Compute the bidirectional Chamfer distance between two point clouds with memory optimization.
    pc1: (N, 3) tensor, pc2: (M, 3) tensor.
    batch_size: Process points in batches to reduce memory usage.
    Returns a scalar distance (lower = more similar).
    """
    if pc1.shape[0] == 0 or pc2.shape[0] == 0:
        return float('inf')

    # Subsample if point clouds are too large, to save memory
    max_points = 5000
    if pc1.shape[0] > max_points:
        indices = torch.randperm(pc1.shape[0])[:max_points]
        pc1 = pc1[indices]
    if pc2.shape[0] > max_points:
        indices = torch.randperm(pc2.shape[0])[:max_points]
        pc2 = pc2[indices]

    # Compute distances in batches to avoid memory issues
    min_dists_1 = []
    for i in range(0, pc1.shape[0], batch_size):
        batch1 = pc1[i:i + batch_size]
        # Compute distances from this batch to all points in pc2
        dists = torch.cdist(batch1, pc2)  # (batch_size, M)
        min_dists_1.append(dists.min(dim=1)[0])
        del dists  # Free memory immediately

    min_dists_2 = []
    for i in range(0, pc2.shape[0], batch_size):
        batch2 = pc2[i:i + batch_size]
        # Compute distances from this batch to all points in pc1
        dists = torch.cdist(batch2, pc1)  # (batch_size, N)
        min_dists_2.append(dists.min(dim=1)[0])
        del dists  # Free memory immediately

    # Combine results
    dist1 = torch.cat(min_dists_1).mean()  # Avg min distance from pc1 to pc2
    dist2 = torch.cat(min_dists_2).mean()  # Avg min distance from pc2 to pc1

    return (dist1 + dist2).item()


def get_point_cloud(vtp_file: str) -> torch.Tensor:
    """
    Extract the point cloud (vertex coordinates) from a VTP file.
    Returns: (N, 3) tensor.
    """
    try:
        mesh = pv.read(vtp_file)
        points = mesh.points  # (N, 3) NumPy array
        return torch.tensor(points, dtype=torch.float32)
    except Exception as e:
        raise ValueError(f"Error reading points from {vtp_file}: {e}")
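
# Illustrative sanity check (not part of the pipeline above): on synthetic data the
# Chamfer distance should be near zero for a cloud compared against a lightly jittered
# copy of itself, and noticeably larger against a differently shaped cloud. The clouds
# below (uniform cube vs. unit-sphere shell) are assumptions made purely for
# demonstration; nothing here touches the dataset files.
def _chamfer_sanity_check(n_points: int = 2000, noise: float = 0.01) -> None:
    torch.manual_seed(0)
    cube = torch.rand(n_points, 3)                       # points filling a unit cube
    jittered = cube + noise * torch.randn(n_points, 3)   # slightly perturbed copy
    shell = torch.randn(n_points, 3)
    shell = shell / shell.norm(dim=1, keepdim=True)      # points on a sphere surface

    cube_n = normalize_point_cloud(cube)
    print("jittered copy :", chamfer_distance(cube_n, normalize_point_cloud(jittered)))
    print("sphere shell  :", chamfer_distance(cube_n, normalize_point_cloud(shell)))

# _chamfer_sanity_check()  # uncomment to run a quick check
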
""" # Read train.txt to get list of training VTP files train_txt_path = os.path.join(train_folder, '1_VTK_surface/train.txt') if not os.path.exists(train_txt_path): raise ValueError(f"train.txt not found at {train_txt_path}") with open(train_txt_path, 'r') as f: folder_names = [line.strip() for line in f if line.strip()] # Construct full paths to VTP files train_files = [] for folder_name in folder_names: vtp_file = os.path.join(train_folder, '1_VTK_surface', folder_name, f'{folder_name}.vtp') if os.path.exists(vtp_file): train_files.append(vtp_file) else: print(f"Warning: VTP file not found: {vtp_file}") if not train_files: raise ValueError("No VTP files found from train.txt list") # Combine all point clouds train_point_clouds = [get_point_cloud(f) for f in train_files] combined_points = torch.cat(train_point_clouds, dim=0) # (N, 3) # Subsample if too many points to manage memory max_train_points = 50000 if combined_points.shape[0] > max_train_points: print(f"Subsampling training points from {combined_points.shape[0]} to {max_train_points}") indices = torch.randperm(combined_points.shape[0])[:max_train_points] combined_points = combined_points[indices] # Normalize combined_points = normalize_point_cloud(combined_points) # Save output_path = os.path.join(train_save_dir, output_file) torch.save(combined_points, output_path) print(f"Training point cloud saved to {output_path} ({combined_points.shape[0]} points)") return combined_points # Part 2: Compute Chamfer distance for test VTP files def compute_test_chamfer_scores(test_dir: str, train_points_file: str = 'train_points.pt', output_file: str = 'test_chamfer_scores.txt', use_gpu: bool = False): """ Compute Chamfer distance for each test VTP file against the training point cloud. Uses the raw test VTP files from the training directory structure. Saves scores to output_file and prints statistics. 
""" # Load training point cloud train_points_path = os.path.join(train_save_dir, train_points_file) if not os.path.exists(train_points_path): raise ValueError(f"Training point cloud not found at {train_points_path}") train_points = torch.load(train_points_path) if use_gpu and torch.cuda.is_available(): train_points = train_points.to('cuda') # Read test.txt to get list of test case names test_txt_path = os.path.join(train_folder, '1_VTK_surface/test.txt') if not os.path.exists(test_txt_path): raise ValueError(f"test.txt not found at {test_txt_path}") with open(test_txt_path, 'r') as f: test_case_names = [line.strip() for line in f if line.strip()] if not test_case_names: raise ValueError("No test cases found in test.txt") # Output file output_path = os.path.join(os.path.dirname(test_dir), output_file) # Compute scores scores = [] with open(output_path, 'w') as f: f.write("Test_File\tChamfer_Score\n") for test_case in test_case_names: # Look for test VTP files in the original training directory structure vtp_file = os.path.join(train_folder, '1_VTK_surface', test_case, f'{test_case}.vtp') if not os.path.exists(vtp_file): print(f"Warning: VTP file not found for test case {test_case}: {vtp_file}") f.write(f"{test_case}\tERROR: VTP file not found\n") continue try: # Get test point cloud test_points = get_point_cloud(vtp_file) if use_gpu and torch.cuda.is_available(): test_points = test_points.to('cuda') # Normalize test_points = normalize_point_cloud(test_points) # Compute Chamfer distance score = chamfer_distance(test_points, train_points) scores.append(score) f.write(f"{test_case}\t{score:.6f}\n") print(f"Chamfer Score for {test_case}: {score:.6f}") # Clear GPU memory if using GPU if use_gpu and torch.cuda.is_available(): torch.cuda.empty_cache() del test_points # Free memory except Exception as e: print(f"Error computing score for {test_case}: {e}") f.write(f"{test_case}\tERROR: {e}\n") # Clear GPU memory on error too if use_gpu and torch.cuda.is_available(): torch.cuda.empty_cache() print(f"\nTest scores saved to {output_path}") # Print statistics if scores: print(f"\nChamfer Distance Statistics:") print(f"Average Chamfer Score: {np.mean(scores):.6f}") print(f"Std Chamfer Score: {np.std(scores):.6f}") print(f"Min Chamfer Score: {np.min(scores):.6f}") print(f"Max Chamfer Score: {np.max(scores):.6f}") return scores # Example Usage if __name__ == "__main__": # Step 1: Compute and save training point cloud #train_points = compute_training_point_cloud(train_folder) # Step 2: Compute Chamfer scores for test VTP files scores = compute_test_chamfer_scores(test_dir, use_gpu=False)