# AnsysLPFMTrame-App/utils/Jansen-score-test_all.py
#
# Compare test-part point distributions against the training set for one
# dataset, using Jensen-Shannon and six other histogram similarity metrics.
import pyvista as pv
import numpy as np
from scipy.spatial.distance import jensenshannon, cosine
from scipy.ndimage import gaussian_filter
import matplotlib.pyplot as plt
import os
# -------------------- PyVista Config --------------------
pv.OFF_SCREEN = True
pv.set_plot_theme("document")
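# (Off-screen rendering keeps PyVista headless-safe, e.g. on display-less nodes.)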
# -------------------- Paths --------------------
dataset = "cadillac"
train_folder = f'/raid/ansysai/pkakka/6-Transformers/comparePhysicsLM/Data/{dataset}/'
train_save_dir = os.path.join(train_folder, f"../../metrics/{dataset}/")
os.makedirs(train_save_dir, exist_ok=True)
# -------------------- Utility Functions --------------------
def get_points(file_path, max_points=5000):
    """Read a mesh and return up to max_points randomly subsampled points."""
    try:
        mesh = pv.read(file_path)
        points = mesh.points
        if len(points) > max_points:
            indices = np.random.choice(len(points), max_points, replace=False)
            points = points[indices]
        return points
    except Exception as e:
        raise ValueError(f"Error reading {file_path}: {e}")
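# Reproducibility note: get_points() subsamples with np.random.choice, so
# histograms (and therefore scores) vary slightly between runs. A minimal
# sketch of a deterministic variant, assuming a fixed seed is acceptable
# (the seed value 0 is an illustrative choice, not from the original):
#
#     rng = np.random.default_rng(0)
#     indices = rng.choice(len(points), max_points, replace=False)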
# -------------------- Training Histogram --------------------
def compute_training_dist(train_folder, output_file='train_dist.npz', num_bins=25, smooth_sigma=1):
    """Build, smooth, and save a normalized 3D histogram of all training points."""
    train_txt_path = os.path.join(train_folder, '1_VTK_surface/train.txt')
    if not os.path.exists(train_txt_path):
        raise ValueError(f"train.txt not found at {train_txt_path}")
    with open(train_txt_path, 'r') as f:
        folder_names = [line.strip() for line in f if line.strip()]
    train_files = []
    for name in folder_names:
        vtp_file = os.path.join(train_folder, '1_VTK_surface', name, f'{name}.vtp')
        if os.path.exists(vtp_file):
            train_files.append(vtp_file)
        else:
            print(f"Warning: VTP not found: {vtp_file}")
    if not train_files:
        raise ValueError("No training VTPs found.")
    train_points = np.concatenate([get_points(f) for f in train_files], axis=0)
    # Per-axis bin edges are saved so test histograms share the same binning.
    bin_edges = [np.histogram_bin_edges(train_points[:, i], bins=num_bins) for i in range(3)]
    train_hist, _ = np.histogramdd(train_points, bins=bin_edges, density=True)
    train_hist = gaussian_filter(train_hist, sigma=smooth_sigma).flatten()
    train_hist /= train_hist.sum()
    # Save into train_save_dir so compute_similarity_scores loads from the same place.
    np.savez(os.path.join(train_save_dir, output_file),
             hist=train_hist, edges0=bin_edges[0], edges1=bin_edges[1], edges2=bin_edges[2])
    print(f"Training histogram saved: {output_file} ({train_points.shape[0]} points)")
# -------------------- Multi-Metric Similarity --------------------
def compute_similarity_scores(test_file, train_dist_file='train_dist.npz', smooth_sigma=1):
    """Score one test mesh against the saved training histogram with seven metrics."""
    data = np.load(os.path.join(train_save_dir, train_dist_file))
    train_hist = data['hist']
    bin_edges = [data['edges0'], data['edges1'], data['edges2']]
    test_points = get_points(test_file)
    test_hist, _ = np.histogramdd(test_points, bins=bin_edges, density=True)
    test_hist = gaussian_filter(test_hist, sigma=smooth_sigma).flatten()
    test_hist /= test_hist.sum()
    # Small epsilon keeps log/division well-defined on empty bins.
    epsilon = 1e-12
    train_hist_safe = train_hist + epsilon
    test_hist_safe = test_hist + epsilon
    # All metrics are oriented as similarities: identical distributions -> 1.0.
    scores = {}
    # 1. Jensen-Shannon (1 - JS distance)
    scores['JS'] = 1 - jensenshannon(train_hist_safe, test_hist_safe)
    # 2. KL divergence mapped to a similarity via 1 / (1 + KL)
    scores['KL'] = 1 / (1 + np.sum(train_hist_safe * np.log(train_hist_safe / test_hist_safe)))
    # 3. Hellinger (1 - Hellinger distance)
    scores['Hellinger'] = 1 - (1 / np.sqrt(2)) * np.linalg.norm(np.sqrt(train_hist_safe) - np.sqrt(test_hist_safe))
    # 4. Bhattacharyya coefficient
    scores['Bhattacharyya'] = np.sum(np.sqrt(train_hist_safe * test_hist_safe))
    # 5. Cosine similarity
    scores['Cosine'] = 1 - cosine(train_hist_safe, test_hist_safe)
    # 6. L1 distance mapped to a similarity
    scores['L1'] = 1 / (1 + np.sum(np.abs(train_hist_safe - test_hist_safe)))
    # 7. L2 distance mapped to a similarity
    scores['L2'] = 1 / (1 + np.linalg.norm(train_hist_safe - test_hist_safe))
    return scores
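# Illustrative sanity check (not part of the original pipeline): each metric
# above is oriented so that identical distributions score exactly 1.0. A
# minimal sketch verifying that orientation on a random histogram; the
# function name _sanity_check_metrics is a hypothetical addition.
def _sanity_check_metrics(num_bins=5):
    p = np.random.rand(num_bins ** 3)
    p /= p.sum()
    assert np.isclose(1 - jensenshannon(p, p), 1.0)                                          # JS
    assert np.isclose(1 / (1 + np.sum(p * np.log(p / p))), 1.0)                              # KL
    assert np.isclose(1 - (1 / np.sqrt(2)) * np.linalg.norm(np.sqrt(p) - np.sqrt(p)), 1.0)   # Hellinger
    assert np.isclose(np.sum(np.sqrt(p * p)), 1.0)                                           # Bhattacharyya
    assert np.isclose(1 - cosine(p, p), 1.0)                                                 # Cosine
    assert np.isclose(1 / (1 + np.sum(np.abs(p - p))), 1.0)                                  # L1
    assert np.isclose(1 / (1 + np.linalg.norm(p - p)), 1.0)                                  # L2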
# -------------------- Analyze Test Folder --------------------
def analyze_and_save_scores(test_folder, train_dist_file='train_dist.npz', output_file='test_similarity_scores.txt', smooth_sigma=1):
    """Score every test case listed in test.txt and write a tab-separated report."""
    test_txt_path = os.path.join(test_folder, '1_VTK_surface/test.txt')
    if not os.path.exists(test_txt_path):
        raise ValueError(f"test.txt not found at {test_txt_path}")
    with open(test_txt_path, 'r') as f:
        folder_names = [line.strip() for line in f if line.strip()]
    output_path = os.path.join(train_save_dir, output_file)
    processed_names = []  # kept in lockstep with all_scores so rows stay aligned
    all_scores = []
    with open(output_path, 'w') as f_out:
        header = "Test_File\tJS\tKL\tHellinger\tBhattacharyya\tCosine\tL1\tL2\n"
        f_out.write(header)
        for name in folder_names:
            vtp_file = os.path.join(test_folder, '1_VTK_surface', name, f'{name}.vtp')
            if os.path.exists(vtp_file):
                try:
                    scores = compute_similarity_scores(vtp_file, train_dist_file, smooth_sigma)
                    processed_names.append(name)
                    all_scores.append(list(scores.values()))
                    f_out.write(f"{name}\t" + "\t".join(f"{s:.6f}" for s in scores.values()) + "\n")
                    # Round to three decimals for clean console output
                    scores_clean = {k: round(float(v), 3) for k, v in scores.items()}
                    print(f"{name}: {scores_clean}")
                except Exception as e:
                    print(f"Error for {name}: {e}")
                    f_out.write(f"{name}\tERROR\n")
            else:
                print(f"Warning: VTP not found: {vtp_file}")
                f_out.write(f"{name}\tERROR\n")
    return processed_names, np.array(all_scores)
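# -------------------- Optional Plot (sketch) --------------------
# matplotlib is imported above but never used; a minimal sketch that puts it
# to work, assuming scores_array columns follow the report header order
# (JS, KL, Hellinger, Bhattacharyya, Cosine, L1, L2). The function name
# plot_scores and the filename 'similarity_scores.png' are illustrative
# additions, not part of the original script.
def plot_scores(names, scores_array, metric_index=0, metric_name='JS'):
    """Bar chart of one similarity metric across all scored test cases."""
    fig, ax = plt.subplots(figsize=(10, 4))
    ax.bar(range(len(names)), scores_array[:, metric_index])
    ax.set_xticks(range(len(names)))
    ax.set_xticklabels(names, rotation=90, fontsize=6)
    ax.set_ylabel(f"{metric_name} similarity")
    ax.set_title(f"{metric_name} similarity to training distribution")
    fig.tight_layout()
    fig.savefig(os.path.join(train_save_dir, "similarity_scores.png"), dpi=150)
    plt.close(fig)
# Example use after analyze_and_save_scores:
#     names, scores_array = analyze_and_save_scores(train_folder)
#     plot_scores(names, scores_array)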
# -------------------- Main --------------------
if __name__ == "__main__":
    try:
        print("Computing training histogram...")
        compute_training_dist(train_folder, num_bins=25, smooth_sigma=1)
        print("Analyzing test cases with multiple metrics...")
        names, scores_array = analyze_and_save_scores(train_folder, smooth_sigma=1)
        print("Analysis completed successfully!")
    except Exception as e:
        print(f"Error during analysis: {e}")
        import traceback
        traceback.print_exc()