AnsysLPFMTrame-App / datasets /DrivAerML /dataset_drivaerml.py
udbhav
Recreate Trame_app branch with clean history
67fb03c
"""
DrivAerML Dataset with Memory-Efficient Presampling Support
This dataset implements presampling functionality for training, validation and test data.
The presampling feature ensures consistent results across different runs, with memory-efficient
on-demand loading.
Presampling Workflow:
1. Set presampled=False in config to create presampled training, validation and test data
2. The system creates fixed samples for all splits
3. Each run's presampled data is saved as individual files in a directory structure
4. Set presampled=True in config to use the saved presampled data for future runs
Directory Structure:
presampled_data_path/
├── train/
│   ├── run_1.npy
│   ├── run_2.npy
│   └── ...
├── validation/
│   ├── run_1.npy
│   ├── run_2.npy
│   └── ...
└── test/
    ├── run_1.npy
    ├── run_2.npy
    └── ...
Configuration Parameters:
- presampled: Boolean flag to control whether to use presampled data
- presampled_data_path: Base path where presampled data directory is created
Usage:
- First run: Set presampled=False to create presampled data
- Subsequent runs: Set presampled=True to use existing presampled data
"""
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import default_collate
import json
import re
from sklearn.cluster import KMeans
from sklearn.neighbors import NearestNeighbors
def create_presampled_data(cfg, splits, save_path):
    """
    Create presampled training, validation and test data with fixed random sampling.

    One file is written per run so that samples can later be loaded on demand.
    The global NumPy RNG is seeded with 0 before sampling, so repeated calls
    with the same inputs draw the same point indices.

    Args:
        cfg: Configuration object; ``cfg.data_dir`` (directory holding the
            original ``*run_<N>*.npy`` files) and ``cfg.num_points`` (number of
            points to keep per run) are read.
        splits: Dictionary with the keys 'train', 'validation' and 'test', each
            mapping to an iterable of run numbers.
        save_path: Base path for saving presampled data. A trailing extension
            (e.g. ``.npy``) is stripped and the remainder is used as a directory.

    Returns:
        The base directory the presampled files were written to.

    Raises:
        ValueError: Propagated from ``np.random.choice`` when a run has fewer
            than ``cfg.num_points`` points (sampling is without replacement).

    Runs listed in ``splits`` that have no matching file in ``cfg.data_dir``
    are skipped silently.
    """
    print("Creating presampled training, validation and test data...")

    # Create directory structure for presampled data
    base_dir = os.path.splitext(save_path)[0]  # Remove .npy extension if present
    os.makedirs(base_dir, exist_ok=True)

    # Scan the data directory once and index files by run number instead of
    # re-listing the directory for every run. setdefault keeps the first match
    # in listing order, which is the file the per-run scan would have picked.
    run_pattern = re.compile(r'run_(\d+)')
    run_files = {}
    for f in os.listdir(cfg.data_dir):
        if f.endswith('.npy'):
            match = run_pattern.search(f)
            if match:
                run_files.setdefault(int(match.group(1)), f)

    # Set seed for reproducible sampling
    np.random.seed(0)

    for split_type in ['train', 'validation', 'test']:
        print(f"Processing {split_type} split...")
        split_runs = splits[split_type]

        # Create subdirectory for this split
        split_dir = os.path.join(base_dir, split_type)
        os.makedirs(split_dir, exist_ok=True)

        for run_number in split_runs:
            source_name = run_files.get(run_number)
            if source_name is None:
                # No original file for this run: nothing to presample.
                continue

            # Load the original data.
            # NOTE: allow_pickle=True unpickles the file content; only use
            # this with trusted data files.
            npy_file_path = os.path.join(cfg.data_dir, source_name)
            data = np.load(npy_file_path, allow_pickle=True).item()
            coordinates = data['surface_mesh_centers']
            field = data['surface_fields']

            # The same indices are applied to coordinates and fields so the
            # sampled rows stay paired.
            sample_indices = np.random.choice(coordinates.shape[0], cfg.num_points, replace=False)

            # Save individual presampled file for this run
            presampled_run_data = {
                'surface_mesh_centers': coordinates[sample_indices, :],
                'surface_fields': field[sample_indices, :]
            }
            run_file_path = os.path.join(split_dir, f'run_{run_number}.npy')
            np.save(run_file_path, presampled_run_data)

    print(f"Presampled data saved to directory: {base_dir}")
    print(f"Structure: {base_dir}/{{train,validation,test}}/run_{{number}}.npy")
    return base_dir
class DrivAerMLDataset(Dataset):
    """
    Map-style dataset over the per-run ``.npy`` files found in ``cfg.data_dir``.

    Each item corresponds to one run and is returned as a dictionary of
    normalized float32 tensors with the keys 'input_pos', 'output_feat' and
    'output_pos' ('output_pos' is the same tensor as 'input_pos'). Only the
    first column of 'surface_fields' is used as the output feature.
    """

    def __init__(self, cfg, splits = None, split_type = 'train', presampled = False, save_presampled_data_path = None):
        """
        Initializes the DrivAerMLDataset instance.

        Args:
            cfg: Configuration object; ``data_dir``, ``chunked_eval`` and
                ``num_points`` are read.
            splits: Collection of run numbers to include; if None, every
                ``.npy`` file whose name contains ``run_<N>`` is included.
            split_type: Type of split ('train', 'validation', 'test')
            presampled: Whether to use presampled data
            save_presampled_data_path: Base path to the presampled data
                directory (a trailing extension is stripped)

        Raises:
            ValueError: If no matching ``.npy`` file is found in ``cfg.data_dir``.
            FileNotFoundError: If presampled data is requested and the
                directory for ``split_type`` does not exist.
        """
        self.data_dir = cfg.data_dir
        self.chunked_eval = cfg.chunked_eval
        self.splits = splits

        # A set gives O(1) membership tests; None means "no filtering".
        selected_runs = None if splits is None else set(splits)

        # Store only run numbers and create filename mapping for efficiency
        self.run_numbers = []
        self.original_filenames = {}  # run_number -> original filename
        for f in os.listdir(cfg.data_dir):
            if not f.endswith('.npy'):
                continue
            match = re.search(r'run_(\d+)', f)
            if not match:
                continue
            run_number = int(match.group(1))
            if selected_runs is not None and run_number not in selected_runs:
                continue
            self.run_numbers.append(run_number)
            self.original_filenames[run_number] = f

        if len(self.run_numbers) == 0:
            raise ValueError(f"No .npy files found in directory: {cfg.data_dir}")

        self.num_points = cfg.num_points
        self.split_type = split_type
        self.presampled = presampled

        # Set up presampled data directory path (but don't load data yet).
        # Both attributes always exist so that __getitem__ can report a clear
        # error when presampled=True was requested without a path.
        self.presampled_base_dir = None
        self.presampled_split_dir = None
        if self.presampled and save_presampled_data_path:
            self.presampled_base_dir = os.path.splitext(save_presampled_data_path)[0]
            self.presampled_split_dir = os.path.join(self.presampled_base_dir, self.split_type)
            if not os.path.exists(self.presampled_split_dir):
                raise FileNotFoundError(f"Presampled data directory not found: {self.presampled_split_dir}")

    def __len__(self):
        """Return the number of runs in this dataset."""
        return len(self.run_numbers)

    def _load_original(self, run_number):
        """Load the original (unsampled) data dictionary of one run."""
        original_file_path = os.path.join(self.data_dir, self.original_filenames[run_number])
        # NOTE: allow_pickle=True unpickles the file content; only use this
        # with trusted data files.
        return np.load(original_file_path, allow_pickle=True).item()

    def __getitem__(self, idx):
        """
        Load, sample and normalize the run at position ``idx``.

        Three modes are supported:
        - test split with ``chunked_eval``: the original data is shuffled and
          truncated to the largest multiple of ``num_points``;
        - presampled: the stored sample for the run is loaded as-is;
        - otherwise: ``num_points`` points are drawn at random without
          replacement from the original data.

        Raises:
            ValueError: If presampled data is requested but no presampled
                path was given to the constructor.
            FileNotFoundError: If the presampled file of the run is missing.
        """
        run_number = self.run_numbers[idx]

        if self.split_type == 'test' and self.chunked_eval:
            # Chunked evaluation always works on the original data, so it is
            # handled first to avoid loading a sample that is thrown away.
            data = self._load_original(run_number)
            coordinates = data['surface_mesh_centers']
            field = data['surface_fields'][:, 0:1]
            num_chunks = coordinates.shape[0] // self.num_points
            indices = torch.randperm(coordinates.shape[0])[:self.num_points * num_chunks].numpy()
            coordinates = coordinates[indices, :]
            field = field[indices, 0:1]
        elif self.presampled:
            if self.presampled_split_dir is None:
                raise ValueError("presampled=True requires save_presampled_data_path to be set")
            # Load presampled data on-demand
            presampled_file_path = os.path.join(self.presampled_split_dir, f'run_{run_number}.npy')
            if not os.path.exists(presampled_file_path):
                raise FileNotFoundError(f"Presampled file not found: {presampled_file_path}")
            # NOTE: allow_pickle=True unpickles the file content; only use
            # this with trusted data files.
            data_dict = np.load(presampled_file_path, allow_pickle=True).item()
            coordinates = data_dict['surface_mesh_centers']
            field = data_dict['surface_fields'][:, 0:1]
        else:
            # Load original data and draw a fresh random sample
            data = self._load_original(run_number)
            coordinates = data['surface_mesh_centers']
            field = data['surface_fields'][:, 0:1]
            sample_indices = np.random.choice(coordinates.shape[0], self.num_points, replace=False)
            coordinates = coordinates[sample_indices, :]
            field = field[sample_indices, 0:1]

        coordinates_tensor = torch.tensor(coordinates, dtype=torch.float32)
        field_tensor = torch.tensor(field, dtype=torch.float32)

        # Mean-std normalization using the module-level constants
        coordinates_tensor = (coordinates_tensor - INPUT_POS_MEAN) / INPUT_POS_STD
        field_tensor = (field_tensor - PRESSURE_MEAN) / PRESSURE_STD

        data = {'input_pos': coordinates_tensor, 'output_feat': field_tensor, 'output_pos': coordinates_tensor}
        return data
def calculate_normalization_constants(dataloader):
    """
    Calculate normalization constants for both pressure values and coordinate ranges
    across everything the given dataloader yields.

    Args:
        dataloader: Iterable of batches; each batch is a mapping with the
            tensors 'output_feat' and 'input_pos'. 'input_pos' is reshaped to
            (-1, 3), so its last dimension must hold the x, y, z coordinates.

    Returns:
        tuple: (pressure_mean, pressure_std, coord_ranges, coord_mean, coord_std)
        where coord_ranges = {'min_x', 'max_x', 'min_y', 'max_y', 'min_z', 'max_z'}
              coord_mean = [mean_x, mean_y, mean_z]
              coord_std = [std_x, std_y, std_z]

    Raises:
        ValueError: If the dataloader yields no batches.
    """
    # Keep one array per batch and concatenate once at the end. Extending a
    # Python list with array elements would create one Python object per
    # value, which is prohibitively slow and memory hungry for large datasets.
    pressure_chunks = []
    coordinate_chunks = []

    print("Calculating normalization constants...")
    for batch_idx, batch in enumerate(dataloader):
        pressure_chunks.append(batch['output_feat'].flatten().numpy())
        # Reshape from (batch_size, num_points, 3) to (batch_size * num_points, 3)
        coordinate_chunks.append(batch['input_pos'].numpy().reshape(-1, 3))

        if batch_idx % 10 == 0:  # Print progress every 10 batches
            print(f"Processed {batch_idx + 1} batches...")

    if not pressure_chunks:
        raise ValueError("Cannot calculate normalization constants from an empty dataloader")

    all_pressures = np.concatenate(pressure_chunks)
    all_coordinates = np.concatenate(coordinate_chunks, axis=0)  # Shape: (total_points, 3)

    # Calculate pressure statistics
    pressure_mean = np.mean(all_pressures)
    pressure_std = np.std(all_pressures)

    # Calculate coordinate statistics (mean and std for each dimension)
    coord_mean = np.mean(all_coordinates, axis=0)  # [mean_x, mean_y, mean_z]
    coord_std = np.std(all_coordinates, axis=0)  # [std_x, std_y, std_z]

    # Per-axis extremes over all collected points
    min_x, min_y, min_z = all_coordinates.min(axis=0)
    max_x, max_y, max_z = all_coordinates.max(axis=0)
    coord_ranges = {
        'min_x': min_x, 'max_x': max_x,
        'min_y': min_y, 'max_y': max_y,
        'min_z': min_z, 'max_z': max_z
    }

    # Print comprehensive statistics
    print(f"\nPressure statistics from {len(all_pressures)} data points:")
    print(f"Mean: {pressure_mean:.6f}")
    print(f"Std: {pressure_std:.6f}")
    print(f"Min: {np.min(all_pressures):.6f}")
    print(f"Max: {np.max(all_pressures):.6f}")
    print("\nCoordinate ranges:")
    print(f"X: [{min_x:.6f}, {max_x:.6f}]")
    print(f"Y: [{min_y:.6f}, {max_y:.6f}]")
    print(f"Z: [{min_z:.6f}, {max_z:.6f}]")
    print(f"\nCoordinate statistics for mean-std normalization from {len(all_coordinates)} data points:")
    print(f"Mean: [{coord_mean[0]:.6f}, {coord_mean[1]:.6f}, {coord_mean[2]:.6f}]")
    print(f"Std: [{coord_std[0]:.6f}, {coord_std[1]:.6f}, {coord_std[2]:.6f}]")
    print("\nFor use in dataset file:")
    print(f"INPUT_POS_MEAN = torch.tensor([{coord_mean[0]:.6f}, {coord_mean[1]:.6f}, {coord_mean[2]:.6f}])")
    print(f"INPUT_POS_STD = torch.tensor([{coord_std[0]:.6f}, {coord_std[1]:.6f}, {coord_std[2]:.6f}])")

    return pressure_mean, pressure_std, coord_ranges, coord_mean, coord_std
def seed_worker(worker_id):
    """
    Seed NumPy's global RNG inside a DataLoader worker.

    Intended to be passed as ``worker_init_fn``. The seed is derived from
    ``torch.initial_seed()`` reduced to 32 bits, the range accepted by
    ``np.random.seed``.

    Args:
        worker_id: Worker index supplied by the DataLoader (unused).
    """
    np.random.seed(torch.initial_seed() % (1 << 32))
# Module-level generator with a fixed seed; get_dataloaders passes it to every
# DataLoader as ``generator``. manual_seed returns the generator itself.
g = torch.Generator().manual_seed(0)
def get_dataloaders(cfg):
    """
    Build the train, validation and test DataLoaders.

    The splits are read from the JSON file ``cfg.splits_file`` (keys 'train',
    'validation' and 'test'). Presampled data is created when its directory
    does not exist yet; an existing directory is reused as-is, so it has to be
    removed manually to force regeneration (e.g. after changing
    ``cfg.num_points``). All three datasets always read presampled data.

    Args:
        cfg: Configuration object. Reads ``splits_file``, ``data_dir``,
            ``presampled_exists``, ``batch_size``, ``num_workers`` and,
            optionally, ``presampled_data_path`` (defaults to
            ``<data_dir>/presampled_val_test_data.npy``). The dataset and the
            presampling step additionally read ``chunked_eval`` and
            ``num_points``.

    Returns:
        tuple: (train_dataloader, val_dataloader, test_dataloader)
    """
    # Use a context manager so the splits file handle is closed again.
    with open(cfg.splits_file, encoding="utf-8") as splits_file:
        splits = json.load(splits_file)

    # Handle presampling logic
    presampled_data_path = getattr(cfg, 'presampled_data_path', os.path.join(cfg.data_dir, 'presampled_val_test_data.npy'))
    presampled_base_dir = os.path.splitext(presampled_data_path)[0]

    # Create the presampled data exactly once, and only when it is missing.
    if not os.path.exists(presampled_base_dir):
        if cfg.presampled_exists:
            # The config claims the data exists, but it does not: recover by
            # creating it instead of failing later in the dataset constructor.
            print(f"Warning: presampled=True but presampled data directory not found at {presampled_base_dir}")
            print("Creating presampled data...")
            create_presampled_data(cfg, splits, presampled_data_path)
        else:
            print("=" * 60)
            print("PRESAMPLING MODE: Creating presampled validation and test data...")
            print(f"Presampled data will be saved to: {presampled_base_dir}")
            create_presampled_data(cfg, splits, presampled_data_path)
            print("Presampled data created successfully!")
            print("You can now set presampled=True in config for future runs to use this presampled data.")
            print("=" * 60)
    elif not cfg.presampled_exists:
        print(f"Presampled data directory already exists at: {presampled_base_dir}")
        print("Using existing presampled data. Set presampled=True to use it in future runs.")

    use_presampled = True
    print(f"Using presampled training, validation and test data from: {presampled_base_dir}")

    train_dataset = DrivAerMLDataset(cfg, splits = splits['train'], split_type = 'train',
                                     presampled = use_presampled, save_presampled_data_path = presampled_data_path)
    val_dataset = DrivAerMLDataset(cfg, splits = splits['validation'], split_type = 'validation',
                                   presampled = use_presampled, save_presampled_data_path = presampled_data_path)
    test_dataset = DrivAerMLDataset(cfg, splits = splits['test'], split_type = 'test',
                                    presampled = use_presampled, save_presampled_data_path = presampled_data_path)

    collate_fn = None  # None selects the DataLoader's default collation

    train_dataloader = DataLoader(
        train_dataset, batch_size=cfg.batch_size, shuffle=True,
        drop_last=True, num_workers=cfg.num_workers, collate_fn=collate_fn,
        worker_init_fn=seed_worker, generator=g
    )
    # NOTE(review): validation also shuffles and drops the last incomplete
    # batch, so some validation runs may be left out of an epoch — confirm
    # that this is intended.
    val_dataloader = DataLoader(
        val_dataset, batch_size=cfg.batch_size, shuffle=True,
        drop_last=True, num_workers=cfg.num_workers, collate_fn=collate_fn,
        worker_init_fn=seed_worker, generator=g
    )
    test_dataloader = DataLoader(
        test_dataset, batch_size=1, shuffle=False,
        drop_last=False, num_workers=cfg.num_workers, collate_fn=collate_fn,
        worker_init_fn=seed_worker, generator=g
    )

    # # Calculate normalization constants
    # print('Calculating normalization constants...')
    # pressure_mean, pressure_std, coord_ranges, coord_mean, coord_std = calculate_normalization_constants(train_dataloader)
    # exit()

    return train_dataloader, val_dataloader, test_dataloader
# Pressure statistics from openfoam surface train dataset (10k points sampled):
# Mean: -229.845718
# Std: 269.598572
# Min: -3651.057861
# Max: 859.160034
# Coordinate ranges:
# X: [-0.941836, 4.131968]
# Y: [-1.129535, 1.125530]
# Z: [-0.317549, 1.244577]
# Pressure statistics from full openfoam surface train dataset (3323811346 data points):
# Mean: -229.266983
# Std: 269.226807
# Min: -111492.804688
# Max: 6382.190918
# Coordinate ranges:
# X: [-0.942579, 4.132785]
# Y: [-1.131676, 1.131676]
# Z: [-0.317577, 1.244584]
# Coordinate statistics for mean-std normalization (computed from full dataset):
# Mean: [1.595103, 0.000000, 0.463503]
# Std: [1.434788, 0.801948, 0.440890]
# Pressure statistics from 6553600 data points:
# Mean: -0.003021
# Std: 1.002092
# Min: -14.342350
# Max: 4.157114
# Coordinate ranges:
# X: [-1.768229, 1.766621]
# Y: [-1.408318, 1.410171]
# Z: [-1.771534, 1.781146]
# Coordinate statistics for mean-std normalization from 6553600 data points:
# Mean: [-0.076668, -0.001889, -0.831090]
# Std: [0.968414, 0.882944, 0.858088]
# For use in dataset file:
# INPUT_POS_MEAN = torch.tensor([-0.076668, -0.001889, -0.831090])
# INPUT_POS_STD = torch.tensor([0.968414, 0.882944, 0.858088])
# # With full dataset - pressure normalization
# PRESSURE_MEAN = -229.266983
# PRESSURE_STD = 269.226807
# # Coordinate normalization using mean-std
# INPUT_POS_MEAN = torch.tensor([1.595103, 0.000000, 0.463503])
# INPUT_POS_STD = torch.tensor([1.434788, 0.801948, 0.440890])
# # Legacy min-max normalization (keep for reference but not used)
# input_pos_mins = torch.tensor([-0.942579, -1.131676, -0.317577])
# input_pos_maxs = torch.tensor([4.132785, 1.131676, 1.244584])
# Active normalization constants. DrivAerMLDataset.__getitem__ standardizes its
# tensors as (value - MEAN) / STD using the four names below.
# With full dataset - pressure normalization
# (same values as the "full openfoam surface train dataset" statistics in the
# comments above)
PRESSURE_MEAN = -229.266983
PRESSURE_STD = 269.226807
# Coordinate normalization using mean-std, one entry per x, y, z axis.
# NOTE(review): these values differ from the commented-out full-dataset
# coordinate statistics above; their origin is not recorded in this file —
# confirm which dataset they were computed from.
INPUT_POS_MEAN = torch.tensor([1.490858, -0.001515, 0.099364])
INPUT_POS_STD = torch.tensor([1.388309, 0.706769, 0.380478])
# # Legacy min-max normalization (keep for reference but not used)
# input_pos_mins = torch.tensor([-0.942579, -1.131676, -0.317577])
# input_pos_maxs = torch.tensor([4.132785, 1.131676, 1.244584])
# Pressure normalization
# PRESSURE_MEAN = 0
# PRESSURE_STD = 1
# # Coordinate normalization using mean-std
# INPUT_POS_MEAN = torch.tensor([0, 0, 0])
# INPUT_POS_STD = torch.tensor([1, 1, 1])