Spaces:
Sleeping
Sleeping
| # SPDX-FileCopyrightText: Copyright (c) 2023 - 2024 NVIDIA CORPORATION & AFFILIATES. | |
| # SPDX-FileCopyrightText: All rights reserved. | |
| # SPDX-License-Identifier: Apache-2.0 | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| """ | |
| This is the datapipe to read OpenFoam files (vtp/vtu/stl) and save them as point clouds | |
| in npy format. | |
| """ | |
| import time, random | |
| from collections import defaultdict | |
| from pathlib import Path | |
| from typing import Any, Iterable, List, Literal, Mapping, Optional, Union, Callable | |
| import numpy as np | |
| import pandas as pd | |
| import pyvista as pv | |
| import vtk | |
| from torch.utils.data import Dataset | |
| import os | |
| from vtk.util import numpy_support | |
def get_filenames(filepath: Union[str, Path]) -> List[str]:
    """Return the names of all entries directly inside a directory.

    Args:
        filepath: Directory to list. Both ``str`` and ``pathlib.Path`` are
            accepted.

    Returns:
        The entry names (not full paths) in the order reported by
        ``os.listdir``, which is arbitrary.

    Raises:
        FileNotFoundError: If ``filepath`` does not exist.
    """
    if not os.path.exists(filepath):
        # The exception must actually be raised; merely constructing it would
        # let the function fall through and return None to the caller.
        raise FileNotFoundError(f"Path {filepath} does not exist")
    return os.listdir(filepath)
def get_node_to_elem(polydata: Any) -> Any:
    """Convert point (node) data of a VTK dataset into cell (element) data.

    Args:
        polydata: VTK dataset handed to ``vtkPointDataToCellData`` as input.

    Returns:
        The output dataset of the ``vtkPointDataToCellData`` filter after it
        has been executed.
    """
    point_to_cell = vtk.vtkPointDataToCellData()
    point_to_cell.SetInputData(polydata)
    # Run the filter so that its output is populated before it is returned.
    point_to_cell.Update()
    return point_to_cell.GetOutput()
def get_fields(data, variables):
    """Extract named arrays from VTP/VTU attribute data as 2-D NumPy arrays.

    Args:
        data: Object exposing ``GetArray(name)``, e.g. the result of
            ``GetCellData()`` on a VTK dataset.
        variables: Iterable of array names to extract, in output order.

    Returns:
        A list with one NumPy array per requested name, each reshaped to
        ``(number_of_tuples, number_of_components)``.

    Raises:
        ValueError: If a requested array cannot be retrieved.
    """
    fields = []
    for array_name in variables:
        try:
            array = data.GetArray(array_name)
        except ValueError as err:
            raise ValueError(
                f"Failed to get array {array_name} from the unstructured grid."
            ) from err
        if array is None:
            # VTK reports an unknown array name by returning None instead of
            # raising, so fail here with a clear message rather than with an
            # AttributeError further down.
            raise ValueError(
                f"Failed to get array {array_name} from the unstructured grid."
            )
        # Force a 2-D layout so that single-component arrays come out as
        # (n, 1) and can be concatenated with multi-component ones.
        array_data = numpy_support.vtk_to_numpy(array).reshape(
            array.GetNumberOfTuples(), array.GetNumberOfComponents()
        )
        fields.append(array_data)
    return fields
class DrivAerAwsPaths:
    """Build file paths inside a single ``run_<index>`` case directory.

    All helpers are static: they can be called on the class itself or on an
    instance, and they never touch the file system.
    """

    @staticmethod
    def _get_index(car_dir: Path) -> str:
        """Return the directory name with a leading ``run_`` stripped."""
        return car_dir.name.removeprefix("run_")

    @staticmethod
    def geometry_path(car_dir: Path) -> Path:
        """Return ``<car_dir>/drivaer_<index>.stl``."""
        return car_dir / f"drivaer_{DrivAerAwsPaths._get_index(car_dir)}.stl"

    @staticmethod
    def volume_path(car_dir: Path) -> Path:
        """Return ``<car_dir>/volume_<index>.vtu``."""
        return car_dir / f"volume_{DrivAerAwsPaths._get_index(car_dir)}.vtu"

    @staticmethod
    def surface_path(car_dir: Path) -> Path:
        """Return ``<car_dir>/boundary_<index>.vtp``."""
        return car_dir / f"boundary_{DrivAerAwsPaths._get_index(car_dir)}.vtp"
class OpenFoamDataset(Dataset):
    """
    Datapipe for converting openfoam dataset to npy

    Each entry of ``data_path`` is treated as one case directory. Indexing the
    dataset returns a dictionary with the keys ``"surface_mesh_centers"`` and
    ``"surface_fields"``.

    Args:
        data_path: Directory containing one sub-directory per case.
        surface_variables: Names of the surface arrays to read.
        volume_variables: Names of the volume arrays; stored on the instance
            but not read by the methods of this class shown here.
        model_type: When equal to ``"surface"`` the surface file of the case
            is loaded; any other value skips loading.
    """

    def __init__(
        self,
        data_path: Union[str, Path],
        surface_variables: Optional[list] = [
            "pMean",
            "wallShearStress",
        ],
        volume_variables: Optional[list] = ["UMean", "pMean"],
        model_type=None,
    ):
        if isinstance(data_path, str):
            data_path = Path(data_path)
        data_path = data_path.expanduser()

        self.data_path = data_path
        self.path_getter = DrivAerAwsPaths

        # NOTE(review): asserts are stripped under ``python -O``; kept so that
        # callers relying on AssertionError are not broken.
        assert self.data_path.exists(), f"Path {self.data_path} does not exist"
        assert self.data_path.is_dir(), f"Path {self.data_path} is not a directory"

        self.filenames = get_filenames(self.data_path)
        # Shuffled in place with the global ``random`` state, so the order
        # depends on the seed set by the caller.
        random.shuffle(self.filenames)
        # NOTE(review): this is a 0-d array holding the number of cases, not
        # an index range -- confirm whether ``np.arange`` was intended.
        self.indices = np.array(len(self.filenames))

        # Store copies so that mutating an instance attribute can never leak
        # into the shared default lists or into the caller's own list.
        self.surface_variables = (
            None if surface_variables is None else list(surface_variables)
        )
        self.volume_variables = (
            None if volume_variables is None else list(volume_variables)
        )
        self.model_type = model_type

    def __len__(self):
        """Return the number of case directories found in ``data_path``."""
        return len(self.filenames)

    def _load_surface(self, car_dir):
        """Read the surface file of one case; return (cell centers, fields)."""
        surface_filepath = self.path_getter.surface_path(car_dir)

        reader = vtk.vtkXMLPolyDataReader()
        # Pass a plain string: not every VTK Python wrapper accepts
        # ``pathlib.Path`` objects for file-name arguments.
        reader.SetFileName(str(surface_filepath))
        reader.Update()
        polydata = reader.GetOutput()

        # Fields are taken per cell, so point data is converted to cell data.
        celldata = get_node_to_elem(polydata).GetCellData()
        surface_fields = np.concatenate(
            get_fields(celldata, self.surface_variables), axis=-1
        )

        mesh = pv.PolyData(polydata)
        surface_coordinates = np.array(mesh.cell_centers().points)
        return surface_coordinates, surface_fields

    def __getitem__(self, idx):
        """Load the case at position ``idx``.

        Returns:
            A dict with ``"surface_mesh_centers"`` and ``"surface_fields"``
            converted with ``np.float32``. When ``model_type`` is not
            ``"surface"`` both inputs are ``None``, which ``np.float32``
            turns into NaN scalars.
        """
        cfd_filename = self.filenames[idx]
        car_dir = self.data_path / cfd_filename

        if self.model_type == "surface":
            surface_coordinates, surface_fields = self._load_surface(car_dir)
        else:
            surface_fields = None
            surface_coordinates = None

        # Add the parameters to the dictionary
        return {
            "surface_mesh_centers": np.float32(surface_coordinates),
            "surface_fields": np.float32(surface_fields),
        }