| """ | |
| DrivAerML Dataset with Memory-Efficient Presampling Support | |
| This dataset implements presampling functionality for training, validation and test data. | |
| The presampling feature ensures consistent results across different runs, with memory-efficient | |
| on-demand loading. | |
| Presampling Workflow: | |
| 1. Set presampled=False in config to create presampled training, validation and test data | |
| 2. The system creates fixed samples for all splits | |
| 3. Each run's presampled data is saved as individual files in a directory structure | |
| 4. Set presampled=True in config to use the saved presampled data for future runs | |
Directory Structure:
    presampled_data_path/
    ├── train/
    │   ├── run_1.npy
    │   ├── run_2.npy
    │   └── ...
    ├── validation/
    │   ├── run_1.npy
    │   ├── run_2.npy
    │   └── ...
    └── test/
        ├── run_1.npy
        ├── run_2.npy
        └── ...
Configuration Parameters:
- presampled_exists: Boolean flag controlling whether existing presampled data is used
- presampled_data_path: Base path under which the presampled data directory is created

Usage:
- First run: set presampled_exists=False to create the presampled data
- Subsequent runs: set presampled_exists=True to use the existing presampled data
  (see the usage sketch after the imports below)
"""
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import default_collate
import json
import re
from sklearn.cluster import KMeans
from sklearn.neighbors import NearestNeighbors
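
# Minimal usage sketch of the two-phase workflow described in the module
# docstring, assuming a config object exposing the attributes this module
# reads (the attribute names come from the code below; the values are
# illustrative):
#
#   cfg.presampled_exists = False           # first run: creates presampled files
#   train_dl, val_dl, test_dl = get_dataloaders(cfg)
#
#   cfg.presampled_exists = True            # later runs: reuse the saved files
#   train_dl, val_dl, test_dl = get_dataloaders(cfg)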
def create_presampled_data(cfg, splits, save_path):
    """
    Create presampled training, validation and test data with fixed random sampling.
    Saves one file per run to enable on-demand loading.

    Args:
        cfg: Configuration object
        splits: Dictionary containing train/validation/test splits
        save_path: Base path for saving presampled data (directory will be created)
    """
    print("Creating presampled training, validation and test data...")

    # Create directory structure for presampled data
    base_dir = os.path.splitext(save_path)[0]  # Remove .npy extension if present
    os.makedirs(base_dir, exist_ok=True)

    # Set seed for reproducible sampling
    np.random.seed(0)

    # Build a run_number -> filename map once, instead of rescanning the data
    # directory for every run
    run_files = {}
    for f in os.listdir(cfg.data_dir):
        if f.endswith('.npy'):
            match = re.search(r'run_(\d+)', f)
            if match:
                run_files[int(match.group(1))] = f

    for split_type in ['train', 'validation', 'test']:
        print(f"Processing {split_type} split...")
        split_runs = splits[split_type]

        # Create subdirectory for this split
        split_dir = os.path.join(base_dir, split_type)
        os.makedirs(split_dir, exist_ok=True)

        for run_number in split_runs:
            if run_number not in run_files:
                continue
            npy_file_path = os.path.join(cfg.data_dir, run_files[run_number])

            # Load the original data
            data = np.load(npy_file_path, allow_pickle=True).item()
            coordinates = data['surface_mesh_centers']
            field = data['surface_fields']

            # Sample a fixed subset of points (seeded above for reproducibility)
            sample_indices = np.random.choice(coordinates.shape[0], cfg.num_points, replace=False)
            sampled_coordinates = coordinates[sample_indices, :]
            sampled_field = field[sample_indices, :]

            # Save an individual presampled file for this run
            presampled_run_data = {
                'surface_mesh_centers': sampled_coordinates,
                'surface_fields': sampled_field
            }
            run_file_path = os.path.join(split_dir, f'run_{run_number}.npy')
            np.save(run_file_path, presampled_run_data)

    print(f"Presampled data saved to directory: {base_dir}")
    print(f"Structure: {base_dir}/{{train,validation,test}}/run_{{number}}.npy")
    return base_dir
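
# Illustrative helper, not used by the pipeline: a quick sanity check on what
# a single presampled run file contains. The example path is hypothetical.
def inspect_presampled_run(path):
    """Print the array shapes stored in one presampled run file."""
    data = np.load(path, allow_pickle=True).item()
    for key, arr in data.items():
        print(f"{key}: shape={arr.shape}, dtype={arr.dtype}")

# Example: inspect_presampled_run('presampled_data/train/run_1.npy')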
class DrivAerMLDataset(Dataset):
    def __init__(self, cfg, splits=None, split_type='train', presampled=False, save_presampled_data_path=None):
        """
        Initializes the DrivAerMLDataset instance.

        Args:
            cfg: Configuration object containing the data directory and number of points
            splits: List of run numbers to include; if None, all files are included
            split_type: Type of split ('train', 'validation', 'test')
            presampled: Whether to use presampled data
            save_presampled_data_path: Base path to the presampled data directory
        """
        self.data_dir = cfg.data_dir
        self.chunked_eval = cfg.chunked_eval
        self.splits = splits

        # Store only run numbers and a run_number -> filename mapping for efficiency
        self.run_numbers = []
        self.original_filenames = {}
        for f in os.listdir(cfg.data_dir):
            if f.endswith('.npy'):
                match = re.search(r'run_(\d+)', f)
                if match:
                    run_number = int(match.group(1))
                    if splits is None or run_number in splits:
                        self.run_numbers.append(run_number)
                        self.original_filenames[run_number] = f
        if len(self.run_numbers) == 0:
            raise ValueError(f"No matching .npy files found in directory: {cfg.data_dir}")

        self.num_points = cfg.num_points
        self.split_type = split_type
        self.presampled = presampled

        # Set up the presampled data directory path (data is loaded on demand in __getitem__)
        if self.presampled and save_presampled_data_path:
            self.presampled_base_dir = os.path.splitext(save_presampled_data_path)[0]
            self.presampled_split_dir = os.path.join(self.presampled_base_dir, self.split_type)
            if not os.path.exists(self.presampled_split_dir):
                raise FileNotFoundError(f"Presampled data directory not found: {self.presampled_split_dir}")
    def __len__(self):
        return len(self.run_numbers)

    def __getitem__(self, idx):
        run_number = self.run_numbers[idx]

        if self.presampled:
            # Load presampled data on demand
            presampled_file_path = os.path.join(self.presampled_split_dir, f'run_{run_number}.npy')
            if not os.path.exists(presampled_file_path):
                raise FileNotFoundError(f"Presampled file not found: {presampled_file_path}")
            data_dict = np.load(presampled_file_path, allow_pickle=True).item()
            coordinates = data_dict['surface_mesh_centers']
            field = data_dict['surface_fields'][:, 0:1]  # pressure channel only
        else:
            # Load original data and randomly sample num_points points
            original_filename = self.original_filenames[run_number]
            original_file_path = os.path.join(self.data_dir, original_filename)
            data = np.load(original_file_path, allow_pickle=True).item()
            coordinates = data['surface_mesh_centers']
            field = data['surface_fields'][:, 0:1]  # pressure channel only
            sample_indices = np.random.choice(coordinates.shape[0], self.num_points, replace=False)
            coordinates = coordinates[sample_indices, :]
            field = field[sample_indices, :]

        if self.split_type == 'test' and self.chunked_eval:
            # For chunked evaluation, reload the full original data and keep as
            # many complete chunks of num_points as fit
            original_filename = self.original_filenames[run_number]
            original_file_path = os.path.join(self.data_dir, original_filename)
            data = np.load(original_file_path, allow_pickle=True).item()
            coordinates = data['surface_mesh_centers']
            field = data['surface_fields'][:, 0:1]
            num_chunks = coordinates.shape[0] // self.num_points
            indices = torch.randperm(coordinates.shape[0])[:self.num_points * num_chunks]
            coordinates = coordinates[indices, :]
            field = field[indices, :]

        coordinates_tensor = torch.tensor(coordinates, dtype=torch.float32)
        field_tensor = torch.tensor(field, dtype=torch.float32)

        # Mean-std normalization for coordinates and pressure
        coordinates_tensor = (coordinates_tensor - INPUT_POS_MEAN) / INPUT_POS_STD
        field_tensor = (field_tensor - PRESSURE_MEAN) / PRESSURE_STD

        return {'input_pos': coordinates_tensor, 'output_feat': field_tensor, 'output_pos': coordinates_tensor}
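
# Illustrative sketch, not called by the pipeline: model outputs trained on
# this dataset live in normalized units, so mapping predictions back to
# physical pressure inverts the mean-std normalization applied in __getitem__.
def denormalize_pressure(field_tensor):
    """Invert the pressure normalization: x_norm * std + mean."""
    return field_tensor * PRESSURE_STD + PRESSURE_MEAN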
def calculate_normalization_constants(dataloader):
    """
    Calculate normalization constants for both pressure values and coordinate
    ranges across the entire training dataset.

    Args:
        dataloader: Training DataLoader

    Returns:
        tuple: (pressure_mean, pressure_std, coord_ranges, coord_mean, coord_std)
            where coord_ranges = {'min_x', 'max_x', 'min_y', 'max_y', 'min_z', 'max_z'},
            coord_mean = [mean_x, mean_y, mean_z],
            coord_std = [std_x, std_y, std_z]
    """
    all_pressures = []
    all_coordinates = []  # All coordinate points, for the mean/std calculation

    # Initialize coordinate extremes
    max_x = max_y = max_z = float('-inf')
    min_x = min_y = min_z = float('inf')

    print("Calculating normalization constants...")
    for batch_idx, batch in enumerate(dataloader):
        # Process pressure values
        output_feat = batch['output_feat']
        pressures = output_feat.flatten().numpy()
        all_pressures.extend(pressures)

        # Convert coordinates to numpy and collect all points, reshaping from
        # (batch_size, num_points, 3) to (batch_size * num_points, 3)
        input_pos_np = batch['input_pos'].numpy()
        coords_reshaped = input_pos_np.reshape(-1, 3)
        all_coordinates.extend(coords_reshaped)

        # Update coordinate ranges
        max_x = max(max_x, np.max(input_pos_np[:, :, 0]))
        max_y = max(max_y, np.max(input_pos_np[:, :, 1]))
        max_z = max(max_z, np.max(input_pos_np[:, :, 2]))
        min_x = min(min_x, np.min(input_pos_np[:, :, 0]))
        min_y = min(min_y, np.min(input_pos_np[:, :, 1]))
        min_z = min(min_z, np.min(input_pos_np[:, :, 2]))

        if batch_idx % 10 == 0:  # Print progress every 10 batches
            print(f"Processed {batch_idx + 1} batches...")

    # Convert to numpy arrays for efficient computation
    all_pressures = np.array(all_pressures)
    all_coordinates = np.array(all_coordinates)  # Shape: (total_points, 3)

    # Pressure statistics
    pressure_mean = np.mean(all_pressures)
    pressure_std = np.std(all_pressures)

    # Coordinate statistics (mean and std per dimension)
    coord_mean = np.mean(all_coordinates, axis=0)  # [mean_x, mean_y, mean_z]
    coord_std = np.std(all_coordinates, axis=0)    # [std_x, std_y, std_z]

    # Coordinate ranges
    coord_ranges = {
        'min_x': min_x, 'max_x': max_x,
        'min_y': min_y, 'max_y': max_y,
        'min_z': min_z, 'max_z': max_z
    }

    # Print comprehensive statistics
    print(f"\nPressure statistics from {len(all_pressures)} data points:")
    print(f"Mean: {pressure_mean:.6f}")
    print(f"Std: {pressure_std:.6f}")
    print(f"Min: {np.min(all_pressures):.6f}")
    print(f"Max: {np.max(all_pressures):.6f}")
    print("\nCoordinate ranges:")
    print(f"X: [{min_x:.6f}, {max_x:.6f}]")
    print(f"Y: [{min_y:.6f}, {max_y:.6f}]")
    print(f"Z: [{min_z:.6f}, {max_z:.6f}]")
    print(f"\nCoordinate statistics for mean-std normalization from {len(all_coordinates)} data points:")
    print(f"Mean: [{coord_mean[0]:.6f}, {coord_mean[1]:.6f}, {coord_mean[2]:.6f}]")
    print(f"Std: [{coord_std[0]:.6f}, {coord_std[1]:.6f}, {coord_std[2]:.6f}]")
    print("\nFor use in dataset file:")
    print(f"INPUT_POS_MEAN = torch.tensor([{coord_mean[0]:.6f}, {coord_mean[1]:.6f}, {coord_mean[2]:.6f}])")
    print(f"INPUT_POS_STD = torch.tensor([{coord_std[0]:.6f}, {coord_std[1]:.6f}, {coord_std[2]:.6f}])")

    return pressure_mean, pressure_std, coord_ranges, coord_mean, coord_std
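
# Optional sketch, assuming you prefer persisting the computed constants to a
# JSON file over pasting the printed values into the constants section at the
# bottom of this file; the helper name and file layout are hypothetical.
def save_normalization_constants(path, pressure_mean, pressure_std, coord_mean, coord_std):
    """Write normalization constants to a JSON file."""
    with open(path, 'w') as f:
        json.dump({
            'pressure_mean': float(pressure_mean),
            'pressure_std': float(pressure_std),
            'coord_mean': [float(v) for v in coord_mean],
            'coord_std': [float(v) for v in coord_std],
        }, f, indent=2)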
def seed_worker(worker_id):
    # Standard PyTorch recipe: derive a NumPy seed from the worker's torch seed
    # so random sampling in __getitem__ stays reproducible across DataLoader workers
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)

# Module-level generator shared by the DataLoaders below for reproducible shuffling
g = torch.Generator()
g.manual_seed(0)
def get_dataloaders(cfg):
    splits = json.load(open(cfg.splits_file))

    # Handle presampling logic
    presampled_data_path = getattr(cfg, 'presampled_data_path',
                                   os.path.join(cfg.data_dir, 'presampled_val_test_data.npy'))
    presampled_base_dir = os.path.splitext(presampled_data_path)[0]

    if not cfg.presampled_exists:
        if not os.path.exists(presampled_base_dir):
            print("=" * 60)
            print("PRESAMPLING MODE: Creating presampled training, validation and test data...")
            print(f"Presampled data will be saved to: {presampled_base_dir}")
            create_presampled_data(cfg, splits, presampled_data_path)
            print("Presampled data created successfully!")
            print("You can now set presampled_exists=True in the config for future runs.")
            print("=" * 60)
        else:
            print(f"Presampled data directory already exists at: {presampled_base_dir}")
            print("Using existing presampled data. Set presampled_exists=True to skip this check in future runs.")
    elif not os.path.exists(presampled_base_dir):
        print(f"Warning: presampled_exists=True but presampled data directory not found at {presampled_base_dir}")
        print("Creating presampled data...")
        create_presampled_data(cfg, splits, presampled_data_path)

    use_presampled = True
    print(f"Using presampled training, validation and test data from: {presampled_base_dir}")

    train_dataset = DrivAerMLDataset(cfg, splits=splits['train'], split_type='train',
                                     presampled=use_presampled, save_presampled_data_path=presampled_data_path)
    val_dataset = DrivAerMLDataset(cfg, splits=splits['validation'], split_type='validation',
                                   presampled=use_presampled, save_presampled_data_path=presampled_data_path)
    test_dataset = DrivAerMLDataset(cfg, splits=splits['test'], split_type='test',
                                    presampled=use_presampled, save_presampled_data_path=presampled_data_path)

    collate_fn = None
    train_dataloader = DataLoader(
        train_dataset, batch_size=cfg.batch_size, shuffle=True,
        drop_last=True, num_workers=cfg.num_workers, collate_fn=collate_fn,
        worker_init_fn=seed_worker, generator=g
    )
    val_dataloader = DataLoader(
        val_dataset, batch_size=cfg.batch_size, shuffle=True,
        drop_last=True, num_workers=cfg.num_workers, collate_fn=collate_fn,
        worker_init_fn=seed_worker, generator=g
    )
    test_dataloader = DataLoader(
        test_dataset, batch_size=1, shuffle=False,
        drop_last=False, num_workers=cfg.num_workers, collate_fn=collate_fn,
        worker_init_fn=seed_worker, generator=g
    )

    # To (re)compute the normalization constants below, uncomment:
    # calculate_normalization_constants(train_dataloader)

    return train_dataloader, val_dataloader, test_dataloader
# Pressure statistics from OpenFOAM surface train dataset (10k points sampled):
# Mean: -229.845718
# Std: 269.598572
# Min: -3651.057861
# Max: 859.160034
# Coordinate ranges:
# X: [-0.941836, 4.131968]
# Y: [-1.129535, 1.125530]
# Z: [-0.317549, 1.244577]

# Pressure statistics from full OpenFOAM surface train dataset (3323811346 data points):
# Mean: -229.266983
# Std: 269.226807
# Min: -111492.804688
# Max: 6382.190918
# Coordinate ranges:
# X: [-0.942579, 4.132785]
# Y: [-1.131676, 1.131676]
# Z: [-0.317577, 1.244584]
# Coordinate statistics for mean-std normalization (computed from full dataset):
# Mean: [1.595103, 0.000000, 0.463503]
# Std: [1.434788, 0.801948, 0.440890]

# Pressure statistics from 6553600 data points:
# Mean: -0.003021
# Std: 1.002092
# Min: -14.342350
# Max: 4.157114
# Coordinate ranges:
# X: [-1.768229, 1.766621]
# Y: [-1.408318, 1.410171]
# Z: [-1.771534, 1.781146]
# Coordinate statistics for mean-std normalization from 6553600 data points:
# Mean: [-0.076668, -0.001889, -0.831090]
# Std: [0.968414, 0.882944, 0.858088]
# For use in dataset file:
# INPUT_POS_MEAN = torch.tensor([-0.076668, -0.001889, -0.831090])
# INPUT_POS_STD = torch.tensor([0.968414, 0.882944, 0.858088])

# # With full dataset - pressure normalization
# PRESSURE_MEAN = -229.266983
# PRESSURE_STD = 269.226807
# # Coordinate normalization using mean-std
# INPUT_POS_MEAN = torch.tensor([1.595103, 0.000000, 0.463503])
# INPUT_POS_STD = torch.tensor([1.434788, 0.801948, 0.440890])
# With full dataset - pressure normalization
PRESSURE_MEAN = -229.266983
PRESSURE_STD = 269.226807

# Coordinate normalization using mean-std
INPUT_POS_MEAN = torch.tensor([1.490858, -0.001515, 0.099364])
INPUT_POS_STD = torch.tensor([1.388309, 0.706769, 0.380478])

# # Legacy min-max normalization (kept for reference, not used)
# input_pos_mins = torch.tensor([-0.942579, -1.131676, -0.317577])
# input_pos_maxs = torch.tensor([4.132785, 1.131676, 1.244584])
# # Identity pressure normalization
# PRESSURE_MEAN = 0
# PRESSURE_STD = 1
# # Identity coordinate normalization
# INPUT_POS_MEAN = torch.tensor([0, 0, 0])
# INPUT_POS_STD = torch.tensor([1, 1, 1])
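
if __name__ == "__main__":
    # Minimal smoke test, assuming a config exposing the attributes this module
    # reads (data_dir, splits_file, num_points, batch_size, num_workers,
    # chunked_eval, presampled_exists). SimpleNamespace and the paths below are
    # purely illustrative; the real project presumably supplies its own config.
    from types import SimpleNamespace

    cfg = SimpleNamespace(
        data_dir="data/drivaerml",        # hypothetical path
        splits_file="data/splits.json",   # hypothetical path
        num_points=10_000,
        batch_size=2,
        num_workers=0,
        chunked_eval=False,
        presampled_exists=False,
    )
    train_dl, val_dl, test_dl = get_dataloaders(cfg)
    batch = next(iter(train_dl))
    print(batch['input_pos'].shape, batch['output_feat'].shape)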