""" DrivAerML Dataset with Memory-Efficient Presampling Support This dataset implements presampling functionality for training, validation and test data. The presampling feature ensures consistent results across different runs, with memory-efficient on-demand loading. Presampling Workflow: 1. Set presampled=False in config to create presampled training, validation and test data 2. The system creates fixed samples for all splits 3. Each run's presampled data is saved as individual files in a directory structure 4. Set presampled=True in config to use the saved presampled data for future runs Directory Structure: presampled_data_path/ ├── train/ │ ├── run_1.npy │ ├── run_2.npy │ └── ... ├── validation/ │ ├── run_1.npy │ ├── run_2.npy │ └── ... └── test/ ├── run_1.npy ├── run_2.npy └── ... Configuration Parameters: - presampled: Boolean flag to control whether to use presampled data - presampled_data_path: Base path where presampled data directory is created Usage: - First run: Set presampled=False to create presampled data - Subsequent runs: Set presampled=True to use existing presampled data """ import os import numpy as np import torch from torch.utils.data import Dataset, DataLoader from torch.utils.data import default_collate import json import re from sklearn.cluster import KMeans from sklearn.neighbors import NearestNeighbors def create_presampled_data(cfg, splits, save_path): """ Create presampled training, validation and test data with fixed random sampling. Saves individual files for each run to enable on-demand loading. Args: cfg: Configuration object splits: Dictionary containing train/validation/test splits save_path: Base path for saving presampled data (directory will be created) """ print("Creating presampled training, validation and test data...") # Create directory structure for presampled data base_dir = os.path.splitext(save_path)[0] # Remove .npy extension if present os.makedirs(base_dir, exist_ok=True) # Set seed for reproducible sampling np.random.seed(0) for split_type in ['train', 'validation', 'test']: print(f"Processing {split_type} split...") split_runs = splits[split_type] # Create subdirectory for this split split_dir = os.path.join(base_dir, split_type) os.makedirs(split_dir, exist_ok=True) for run_number in split_runs: # Find the corresponding .npy file for f in os.listdir(cfg.data_dir): if f.endswith('.npy'): match = re.search(r'run_(\d+)', f) ## Very inefficient? if match and int(match.group(1)) == run_number: npy_file_path = os.path.join(cfg.data_dir, f) # Load the original data data = np.load(npy_file_path, allow_pickle=True).item() coordinates = data['surface_mesh_centers'] field = data['surface_fields'] # Sample points with fixed seed for reproducibility sample_indices = np.random.choice(coordinates.shape[0], cfg.num_points, replace=False) sampled_coordinates = coordinates[sample_indices, :] sampled_field = field[sample_indices, :] # Save individual presampled file for this run presampled_run_data = { 'surface_mesh_centers': sampled_coordinates, 'surface_fields': sampled_field } run_file_path = os.path.join(split_dir, f'run_{run_number}.npy') np.save(run_file_path, presampled_run_data) break print(f"Presampled data saved to directory: {base_dir}") print(f"Structure: {base_dir}/{{train,validation,test}}/run_{{number}}.npy") return base_dir class DrivAerMLDataset(Dataset): def __init__(self, cfg, splits = None, split_type = 'train', presampled = False, save_presampled_data_path = None): """ Initializes the DrivAerMLDataset instance. Args: cfg: Configuration object containing data directory and number of points splits: List of run numbers to include, if None includes all files split_type: Type of split ('train', 'validation', 'test') presampled: Whether to use presampled data save_presampled_data_path: Base path to the presampled data directory """ self.data_dir = cfg.data_dir self.chunked_eval = cfg.chunked_eval self.splits = splits # Store only run numbers and create filename mapping for efficiency self.run_numbers = [] self.original_filenames = {} # run_number -> original filename for f in os.listdir(cfg.data_dir): if f.endswith('.npy'): match = re.search(r'run_(\d+)', f) if match: run_number = int(match.group(1)) if run_number in splits: self.run_numbers.append(run_number) self.original_filenames[run_number] = f if len(self.run_numbers) == 0: raise ValueError(f"No .npy files found in directory: {cfg.data_dir}") self.num_points = cfg.num_points self.split_type = split_type self.presampled = presampled # Is there a script for non presampled dataloader? # Set up presampled data directory path (but don't load data yet) if self.presampled and save_presampled_data_path: self.presampled_base_dir = os.path.splitext(save_presampled_data_path)[0] self.presampled_split_dir = os.path.join(self.presampled_base_dir, self.split_type) if not os.path.exists(self.presampled_split_dir): raise FileNotFoundError(f"Presampled data directory not found: {self.presampled_split_dir}") def __len__(self): return len(self.run_numbers) def __getitem__(self, idx): run_number = self.run_numbers[idx] if self.presampled: # Load presampled data on-demand presampled_file_path = os.path.join(self.presampled_split_dir, f'run_{run_number}.npy') if os.path.exists(presampled_file_path): data_dict = np.load(presampled_file_path, allow_pickle=True).item() coordinates = data_dict['surface_mesh_centers'] field = data_dict['surface_fields'][:,0:1] else: raise FileNotFoundError(f"Presampled file not found: {presampled_file_path}") else: # Load original data original_filename = self.original_filenames[run_number] original_file_path = os.path.join(self.data_dir, original_filename) data = np.load(original_file_path, allow_pickle=True).item() coordinates = data['surface_mesh_centers'] field = data['surface_fields'][:,0:1] # Random sampling sample_indices = np.random.choice(coordinates.shape[0], self.num_points, replace=False) coordinates = coordinates[sample_indices,:] field = field[sample_indices,0:1] if self.split_type == 'test' and self.chunked_eval: # Load original data original_filename = self.original_filenames[run_number] original_file_path = os.path.join(self.data_dir, original_filename) data = np.load(original_file_path, allow_pickle=True).item() coordinates = data['surface_mesh_centers'] field = data['surface_fields'][:,0:1] num_chunks = coordinates.shape[0]//self.num_points indices = torch.randperm(coordinates.shape[0])[:self.num_points*num_chunks] # indices = torch.randperm(coordinates.shape[0]) coordinates = coordinates[indices,:] field = field[indices,0:1] coordinates_tensor = torch.tensor(coordinates, dtype=torch.float32) field_tensor = torch.tensor(field, dtype=torch.float32) # Use mean-std normalization for coordinates coordinates_tensor = (coordinates_tensor - INPUT_POS_MEAN) / INPUT_POS_STD field_tensor = (field_tensor - PRESSURE_MEAN) / PRESSURE_STD data = {'input_pos': coordinates_tensor, 'output_feat': field_tensor, 'output_pos': coordinates_tensor} return data def calculate_normalization_constants(dataloader): """ Calculate normalization constants for both pressure values and coordinate ranges across the entire training dataset. Args: dataloader: Training DataLoader Returns: tuple: (pressure_mean, pressure_std, coord_ranges, coord_mean, coord_std) where coord_ranges = {'min_x', 'max_x', 'min_y', 'max_y', 'min_z', 'max_z'} coord_mean = [mean_x, mean_y, mean_z] coord_std = [std_x, std_y, std_z] """ all_pressures = [] all_coordinates = [] # Store all coordinate points for mean/std calculation # Initialize coordinate extremes max_x = float('-inf') max_y = float('-inf') max_z = float('-inf') min_x = float('inf') min_y = float('inf') min_z = float('inf') print("Calculating normalization constants...") for batch_idx, batch in enumerate(dataloader): # Process pressure values output_feat = batch['output_feat'] pressures = output_feat.flatten().numpy() all_pressures.extend(pressures) # Process coordinate ranges and collect all coordinates input_pos = batch['input_pos'] # Convert tensor to numpy for coordinate calculations input_pos_np = input_pos.numpy() # Collect all coordinate points for mean/std calculation # Reshape from (batch_size, num_points, 3) to (batch_size * num_points, 3) coords_reshaped = input_pos_np.reshape(-1, 3) all_coordinates.extend(coords_reshaped) # Calculate coordinate ranges max_x = max(max_x, np.max(input_pos_np[:,:,0])) max_y = max(max_y, np.max(input_pos_np[:,:,1])) max_z = max(max_z, np.max(input_pos_np[:,:,2])) min_x = min(min_x, np.min(input_pos_np[:,:,0])) min_y = min(min_y, np.min(input_pos_np[:,:,1])) min_z = min(min_z, np.min(input_pos_np[:,:,2])) if batch_idx % 10 == 0: # Print progress every 10 batches print(f"Processed {batch_idx + 1} batches...") # Convert to numpy arrays for efficient computation all_pressures = np.array(all_pressures) all_coordinates = np.array(all_coordinates) # Shape: (total_points, 3) # Calculate pressure statistics pressure_mean = np.mean(all_pressures) pressure_std = np.std(all_pressures) # Calculate coordinate statistics (mean and std for each dimension) coord_mean = np.mean(all_coordinates, axis=0) # [mean_x, mean_y, mean_z] coord_std = np.std(all_coordinates, axis=0) # [std_x, std_y, std_z] # Store coordinate ranges coord_ranges = { 'min_x': min_x, 'max_x': max_x, 'min_y': min_y, 'max_y': max_y, 'min_z': min_z, 'max_z': max_z } # Print comprehensive statistics print(f"\nPressure statistics from {len(all_pressures)} data points:") print(f"Mean: {pressure_mean:.6f}") print(f"Std: {pressure_std:.6f}") print(f"Min: {np.min(all_pressures):.6f}") print(f"Max: {np.max(all_pressures):.6f}") print(f"\nCoordinate ranges:") print(f"X: [{min_x:.6f}, {max_x:.6f}]") print(f"Y: [{min_y:.6f}, {max_y:.6f}]") print(f"Z: [{min_z:.6f}, {max_z:.6f}]") print(f"\nCoordinate statistics for mean-std normalization from {len(all_coordinates)} data points:") print(f"Mean: [{coord_mean[0]:.6f}, {coord_mean[1]:.6f}, {coord_mean[2]:.6f}]") print(f"Std: [{coord_std[0]:.6f}, {coord_std[1]:.6f}, {coord_std[2]:.6f}]") print(f"\nFor use in dataset file:") print(f"INPUT_POS_MEAN = torch.tensor([{coord_mean[0]:.6f}, {coord_mean[1]:.6f}, {coord_mean[2]:.6f}])") print(f"INPUT_POS_STD = torch.tensor([{coord_std[0]:.6f}, {coord_std[1]:.6f}, {coord_std[2]:.6f}])") return pressure_mean, pressure_std, coord_ranges, coord_mean, coord_std def seed_worker(worker_id): worker_seed = torch.initial_seed() % 2**32 np.random.seed(worker_seed) g = torch.Generator() g.manual_seed(0) def get_dataloaders(cfg): splits = json.load(open(cfg.splits_file)) # How is train validation used in DrivAerML? # Handle presampling logic presampled_data_path = getattr(cfg, 'presampled_data_path', os.path.join(cfg.data_dir, 'presampled_val_test_data.npy')) presampled_base_dir = os.path.splitext(presampled_data_path)[0] if not cfg.presampled_exists: # Create presampled data if it doesn't exist or if presampled=False if not os.path.exists(presampled_base_dir): print("=" * 60) print("PRESAMPLING MODE: Creating presampled validation and test data...") print(f"Presampled data will be saved to: {presampled_base_dir}") create_presampled_data(cfg, splits, presampled_data_path) print("Presampled data created successfully!") print("You can now set presampled=True in config for future runs to use this presampled data.") print("=" * 60) else: print(f"Presampled data directory already exists at: {presampled_base_dir}") print("Using existing presampled data. Set presampled=True to use it in future runs.") if not cfg.presampled_exists : print(f"Warning: presampled=True but presampled data directory not found at {presampled_base_dir}") print("Creating presampled data...") create_presampled_data(cfg, splits, presampled_data_path) use_presampled = True print(f"Using presampled training, validation and test data from: {presampled_base_dir}") train_dataset = DrivAerMLDataset(cfg, splits = splits['train'], split_type = 'train', presampled = use_presampled, save_presampled_data_path = presampled_data_path) val_dataset = DrivAerMLDataset(cfg, splits = splits['validation'], split_type = 'validation', presampled = use_presampled, save_presampled_data_path = presampled_data_path) test_dataset = DrivAerMLDataset(cfg, splits = splits['test'], split_type = 'test', presampled = use_presampled, save_presampled_data_path = presampled_data_path) collate_fn = None train_dataloader = DataLoader( train_dataset, batch_size=cfg.batch_size, shuffle=True, drop_last=True, num_workers=cfg.num_workers, collate_fn=collate_fn, worker_init_fn=seed_worker, generator=g ) val_dataloader = DataLoader( val_dataset, batch_size=cfg.batch_size, shuffle=True, drop_last=True, num_workers=cfg.num_workers, collate_fn=collate_fn, worker_init_fn=seed_worker, generator=g ) test_dataloader = DataLoader( test_dataset, batch_size=1, shuffle=False, drop_last=False, num_workers=cfg.num_workers, collate_fn=collate_fn, worker_init_fn=seed_worker, generator=g ) # # Calculate normalization constants # print('Calculating normalization constants...') # pressure_mean, pressure_std, coord_ranges, coord_mean, coord_std = calculate_normalization_constants(train_dataloader) # exit() return train_dataloader, val_dataloader, test_dataloader # Pressure statistics from openfoam surface train dataset (10k points sampled): # Mean: -229.845718 # Std: 269.598572 # Min: -3651.057861 # Max: 859.160034 # Coordinate ranges: # X: [-0.941836, 4.131968] # Y: [-1.129535, 1.125530] # Z: [-0.317549, 1.244577] # Pressure statistics from full openfoam surface train dataset (3323811346 data points): # Mean: -229.266983 # Std: 269.226807 # Min: -111492.804688 # Max: 6382.190918 # Coordinate ranges: # X: [-0.942579, 4.132785] # Y: [-1.131676, 1.131676] # Z: [-0.317577, 1.244584] # Coordinate statistics for mean-std normalization (computed from full dataset): # Mean: [1.595103, 0.000000, 0.463503] # Std: [1.434788, 0.801948, 0.440890] # Pressure statistics from 6553600 data points: # Mean: -0.003021 # Std: 1.002092 # Min: -14.342350 # Max: 4.157114 # Coordinate ranges: # X: [-1.768229, 1.766621] # Y: [-1.408318, 1.410171] # Z: [-1.771534, 1.781146] # Coordinate statistics for mean-std normalization from 6553600 data points: # Mean: [-0.076668, -0.001889, -0.831090] # Std: [0.968414, 0.882944, 0.858088] # For use in dataset file: # INPUT_POS_MEAN = torch.tensor([-0.076668, -0.001889, -0.831090]) # INPUT_POS_STD = torch.tensor([0.968414, 0.882944, 0.858088]) # # With full dataset - pressure normalization # PRESSURE_MEAN = -229.266983 # PRESSURE_STD = 269.226807 # # Coordinate normalization using mean-std # INPUT_POS_MEAN = torch.tensor([1.595103, 0.000000, 0.463503]) # INPUT_POS_STD = torch.tensor([1.434788, 0.801948, 0.440890]) # # Legacy min-max normalization (keep for reference but not used) # input_pos_mins = torch.tensor([-0.942579, -1.131676, -0.317577]) # input_pos_maxs = torch.tensor([4.132785, 1.131676, 1.244584]) # With full dataset - pressure normalization PRESSURE_MEAN = -229.266983 PRESSURE_STD = 269.226807 # Coordinate normalization using mean-std INPUT_POS_MEAN = torch.tensor([1.490858, -0.001515, 0.099364]) INPUT_POS_STD = torch.tensor([1.388309, 0.706769, 0.380478]) # # Legacy min-max normalization (keep for reference but not used) # input_pos_mins = torch.tensor([-0.942579, -1.131676, -0.317577]) # input_pos_maxs = torch.tensor([4.132785, 1.131676, 1.244584]) # Pressure normalization # PRESSURE_MEAN = 0 # PRESSURE_STD = 1 # # Coordinate normalization using mean-std # INPUT_POS_MEAN = torch.tensor([0, 0, 0]) # INPUT_POS_STD = torch.tensor([1, 1, 1])