import os

import numpy as np
import torch
from torch.utils.data import DataLoader, Dataset, default_collate


class ElasticityDataset(Dataset):
    def __init__(self, cfg, split_type='train'):
        ntrain = cfg.ntrain
        ntest = cfg.ntest
        PATH_Sigma = os.path.join(cfg.data_dir, 'Meshes/Random_UnitCell_sigma_10.npy')
        PATH_XY = os.path.join(cfg.data_dir, 'Meshes/Random_UnitCell_XY_10.npy')

        # sigma is stored as (num_points, num_samples); transpose to (num_samples, num_points)
        input_s = np.load(PATH_Sigma)
        input_s = torch.tensor(input_s, dtype=torch.float).permute(1, 0)
        # coordinates are stored as (num_points, ndim, num_samples); permute to (num_samples, num_points, ndim)
        input_xy = np.load(PATH_XY)
        input_xy = torch.tensor(input_xy, dtype=torch.float).permute(2, 0, 1)

        self.train_s = input_s[:ntrain]
        self.test_s = input_s[-ntest:]
        self.train_xy = input_xy[:ntrain]
        self.test_xy = input_xy[-ntest:]

        # Normalize the target field with statistics computed on the training split only
        self.y_normalizer = UnitTransformer(self.train_s)
        self.train_s = self.y_normalizer.encode(self.train_s)
        self.test_s = self.y_normalizer.encode(self.test_s)
        self.y_normalizer.cuda()

        self.mean = self.train_s.mean(dim=(0, 1), keepdim=True)
        self.std = self.train_s.std(dim=(0, 1), keepdim=True) + 1e-8
        self.split_type = split_type

    def __len__(self):
        if self.split_type == 'train':
            return len(self.train_s)
        return len(self.test_s)

    def __getitem__(self, idx):
        # Samples are moved to the GPU here, so the DataLoader must use
        # num_workers=0; with worker processes, transfer in the training loop instead.
        if self.split_type == 'train':
            coordinates = self.train_xy[idx].cuda()
            field = self.train_s[idx].cuda().unsqueeze(1)  # (num_points,) -> (num_points, 1)
        else:
            coordinates = self.test_xy[idx].cuda()
            field = self.test_s[idx].cuda().unsqueeze(1)
        return {'input_pos': coordinates, 'output_feat': field, 'output_pos': coordinates}


class UnitTransformer:
    """Standardizes a tensor to zero mean and unit variance over dims (0, 1)."""

    def __init__(self, X):
        self.mean = X.mean(dim=(0, 1), keepdim=True)
        self.std = X.std(dim=(0, 1), keepdim=True) + 1e-8  # epsilon avoids division by zero
        print('mean', self.mean)
        print('std', self.std)

    def to(self, device):
        self.mean = self.mean.to(device)
        self.std = self.std.to(device)
        return self

    def cuda(self):
        self.mean = self.mean.cuda()
        self.std = self.std.cuda()

    def cpu(self):
        self.mean = self.mean.cpu()
        self.std = self.std.cpu()

    def encode(self, x):
        return (x - self.mean) / self.std

    def decode(self, x):
        return x * self.std + self.mean

    def transform(self, X, inverse=True, component='all'):
        # Bug fix: the original condition `component == 'all' or 'all-reduce'` was
        # always truthy, making the per-component branch unreachable.
        if component in ('all', 'all-reduce'):
            if inverse:
                orig_shape = X.shape
                # subtract the epsilon added in __init__ before rescaling
                return (X * (self.std - 1e-8) + self.mean).view(orig_shape)
            return (X - self.mean) / self.std
        if inverse:
            orig_shape = X.shape
            return (X * (self.std[:, component] - 1e-8) + self.mean[:, component]).view(orig_shape)
        return (X - self.mean[:, component]) / self.std[:, component]
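# --- Illustrative usage sketch (not part of the original module) ---
# Shows how ElasticityDataset and its UnitTransformer fit together. The
# SimpleNamespace cfg, its field values, and the './data' path are assumptions
# for this demo; any object exposing ntrain / ntest / data_dir works. CUDA is
# required, since __getitem__ moves samples to the GPU.
def _demo_elasticity_dataset():
    from types import SimpleNamespace

    cfg = SimpleNamespace(ntrain=1000, ntest=200, data_dir='./data')  # assumed values
    ds = ElasticityDataset(cfg, split_type='train')
    sample = ds[0]
    print(sample['input_pos'].shape)    # (num_points, 2): node coordinates
    print(sample['output_feat'].shape)  # (num_points, 1): normalized field values
    # Undo the normalization when evaluating in physical units:
    sigma = ds.y_normalizer.decode(sample['output_feat'])
    print(sigma.mean())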
def calculate_normalization_constants(dataloader):
    """
    Calculate normalization constants for both the field values and the
    coordinate ranges across the entire training dataset.

    (The field is called "pressure" below for historical reasons; for this
    elasticity dataset it is the stress field sigma.)

    Args:
        dataloader: Training DataLoader.

    Returns:
        tuple: (pressure_mean, pressure_std, coord_ranges), where coord_ranges
            is a dict with keys 'min_x', 'max_x', 'min_y', 'max_y'.
    """
    all_field = []

    # Initialize coordinate extremes
    max_x = float('-inf')
    max_y = float('-inf')
    min_x = float('inf')
    min_y = float('inf')

    print("Calculating normalization constants...")
    for batch_idx, batch in enumerate(dataloader):
        # Accumulate field values; tensors may live on the GPU (see
        # ElasticityDataset.__getitem__), so move them to the CPU first
        output_feat = batch['output_feat']
        field = output_feat.flatten().cpu().numpy()
        all_field.extend(field)

        # Track coordinate ranges
        input_pos_np = batch['input_pos'].cpu().numpy()
        max_x = max(max_x, np.max(input_pos_np[:, :, 0]))
        max_y = max(max_y, np.max(input_pos_np[:, :, 1]))
        min_x = min(min_x, np.min(input_pos_np[:, :, 0]))
        min_y = min(min_y, np.min(input_pos_np[:, :, 1]))

        if batch_idx % 10 == 0:  # print progress every 10 batches
            print(f"Processed {batch_idx + 1} batches...")

    # Convert to a numpy array for efficient computation
    all_field = np.array(all_field)
    pressure_mean = np.mean(all_field)
    pressure_std = np.std(all_field)

    coord_ranges = {
        'min_x': min_x,
        'max_x': max_x,
        'min_y': min_y,
        'max_y': max_y,
    }

    print(f"\nPressure statistics from {len(all_field)} data points:")
    print(f"Mean: {pressure_mean:.6f}")
    print(f"Std: {pressure_std:.6f}")
    print(f"Min: {np.min(all_field):.6f}")
    print(f"Max: {np.max(all_field):.6f}")
    print("\nCoordinate ranges:")
    print(f"X: [{min_x:.6f}, {max_x:.6f}]")
    print(f"Y: [{min_y:.6f}, {max_y:.6f}]")

    return pressure_mean, pressure_std, coord_ranges


def get_dataloaders(cfg):
    train_dataset = ElasticityDataset(cfg, split_type='train')
    test_dataset = ElasticityDataset(cfg, split_type='test')

    collate_fn = None
    train_dataloader = DataLoader(train_dataset, batch_size=cfg.batch_size, shuffle=True, collate_fn=collate_fn)
    test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False, collate_fn=collate_fn)

    # # One-off pass to compute normalization constants on the training split:
    # mean, std, coord_ranges = calculate_normalization_constants(train_dataloader)

    return train_dataloader, test_dataloader
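# --- Illustrative usage sketch (not part of the original module) ---
# Runs the one-off statistics pass: build the loaders, sweep the training
# loader once, and report the constants. The cfg object is assumed to carry
# ntrain / ntest / data_dir / batch_size, as get_dataloaders expects. The
# training loader shuffles, but the computed statistics are order-independent.
# The results could then be baked in as module-level defaults, as in the
# commented block at the end of this file.
def _demo_normalization_constants(cfg):
    train_loader, _ = get_dataloaders(cfg)
    mean, std, coord_ranges = calculate_normalization_constants(train_loader)
    print(mean, std, coord_ranges)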
# For NeuralCFD
class ElasticitySimulationCollator:
    """Flattens variable-size point clouds into one sparse batch and samples
    supernode indices per sample."""

    def __init__(self, num_supernodes):
        self.num_supernodes = num_supernodes

    def __call__(self, batch):
        collated_batch = {}

        # Inputs to sparse tensors:
        # position: batch_size * (num_inputs, ndim) -> (batch_size * num_inputs, ndim)
        # features: batch_size * (num_inputs, dim)  -> (batch_size * num_inputs, dim)
        input_pos = []
        input_lens = []
        for i in range(len(batch)):
            pos = batch[i]["input_pos"]
            input_pos.append(pos)
            input_lens.append(len(pos))
        collated_batch["input_pos"] = torch.concat(input_pos)

        # Sample num_supernodes random node indices per sample, offset into the
        # flattened batch
        supernodes_offset = 0
        supernode_idxs = []
        for i in range(len(input_lens)):
            perm = torch.randperm(len(input_pos[i]))[:self.num_supernodes] + supernodes_offset
            supernode_idxs.append(perm)
            supernodes_offset += input_lens[i]
        collated_batch["supernode_idxs"] = torch.concat(supernode_idxs)

        # batch_idx maps every flattened node back to its sample in the batch
        batch_idx = torch.empty(sum(input_lens), dtype=torch.long)
        start = 0
        for i, length in enumerate(input_lens):
            end = start + length
            batch_idx[start:end] = i
            start = end
        collated_batch["batch_idx"] = batch_idx

        # output_feat is either a list of tensors (for training) or a list of
        # lists of tensors (for rollout)
        output_feat = [batch[i]["output_feat"] for i in range(len(batch))]
        if torch.is_tensor(output_feat[0]):
            collated_batch["output_feat"] = torch.concat(output_feat)
        else:
            collated_batch["output_feat"] = output_feat

        # Collate dense tensors
        collated_batch["output_pos"] = default_collate([batch[i]["output_pos"] for i in range(len(batch))])
        return collated_batch


# Recorded output of calculate_normalization_constants on the training split:
#
# Pressure statistics from 972000 data points:
# Mean: 187.738770
# Std: 127.082832
# Min: 0.544023
# Max: 1442.220703
#
# Coordinate ranges:
# X: [0.004355, 0.995558]
# Y: [0.004534, 0.995479]
#
# MEAN = 187.738770
# STD = 127.082832
# input_pos_mins = torch.tensor([0.004355, 0.004534])
# input_pos_maxs = torch.tensor([0.995558, 0.995479])
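# --- Illustrative usage sketch (not part of the original module) ---
# Wires ElasticitySimulationCollator into a DataLoader in place of the default
# collate_fn used by get_dataloaders. num_supernodes=64 is an assumed value.
# Batches then carry the flattened "input_pos", a per-node "batch_idx", and
# the sampled "supernode_idxs".
def _demo_collator(cfg):
    collator = ElasticitySimulationCollator(num_supernodes=64)
    train_dataset = ElasticityDataset(cfg, split_type='train')
    loader = DataLoader(train_dataset, batch_size=cfg.batch_size, shuffle=True, collate_fn=collator)
    batch = next(iter(loader))
    print(batch["input_pos"].shape)       # (total nodes in batch, 2)
    print(batch["batch_idx"].shape)       # (total nodes in batch,)
    print(batch["supernode_idxs"].shape)  # (batch_size * 64,) if every cloud has >= 64 nodes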