Spaces:
Running
Running
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from torch.utils.data import DataLoader, TensorDataset | |
| from sklearn.model_selection import StratifiedKFold | |
| from sklearn.metrics import classification_report, accuracy_score, roc_auc_score, precision_recall_fscore_support, confusion_matrix | |
| from sklearn.preprocessing import StandardScaler, LabelEncoder | |
| from sklearn.impute import SimpleImputer | |
| import matplotlib.pyplot as plt | |
| import json | |
| import joblib | |
| import os | |
| import seaborn as sns | |
| from scipy import stats | |
| import time | |
| import argparse | |
def setup_gpu():
    """Report whether a CUDA device can be used.

    Returns:
        bool: ``True`` when ``torch.cuda.is_available()`` is true. Otherwise
        a notice is printed and ``False`` is returned.
    """
    if not torch.cuda.is_available():
        print("No GPUs found. Using CPU.")
        return False
    return True


# Probed once at import time; DEVICE is the torch device the training code
# moves its tensors to.
GPU_AVAILABLE = setup_gpu()
DEVICE = torch.device('cuda' if GPU_AVAILABLE else 'cpu')
def load_data_from_json(directory_path):
    """Load labelled examples from every ``*.json`` file in a directory.

    Each file is read as JSON and the list stored under its ``'data'`` key is
    appended to one combined table. Files that cannot be read or parsed, or
    that lack a ``'data'`` key, are reported on stdout and skipped.

    The ``'source'`` column is mapped to a ``'label'`` column (``'ai'`` and
    ``'ai+rew'`` become ``'AI'``, ``'human'`` becomes ``'Human'``; any other
    value is kept as-is) and only rows labelled ``'AI'`` or ``'Human'`` are
    returned.

    Args:
        directory_path: Directory to scan. If it names an existing file, the
            directory containing that file is scanned instead.

    Returns:
        pandas.DataFrame: the filtered records with an added ``'label'``
        column. The original row index is kept (not reset) after filtering.

    Raises:
        ValueError: if the directory holds no ``.json`` files, or if no
            records could be loaded from any of them.
    """
    if os.path.isfile(directory_path):
        # dirname() is '' for a bare file name in the current directory and
        # os.listdir('') would fail, so fall back to '.'.
        directory = os.path.dirname(directory_path) or '.'
    else:
        directory = directory_path
    print(f"Loading JSON files from directory: {directory}")

    # os.listdir() order is arbitrary; sorting keeps the row order (and hence
    # the cross-validation fold assignment) reproducible across machines.
    json_files = [
        os.path.join(directory, name)
        for name in sorted(os.listdir(directory))
        if name.endswith('.json') and os.path.isfile(os.path.join(directory, name))
    ]
    if not json_files:
        raise ValueError(f"No JSON files found in directory {directory}")
    print(f"Found {len(json_files)} JSON files")

    all_data = []
    for file_path in json_files:
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data_dict = json.load(f)
            if 'data' in data_dict:
                all_data.extend(data_dict['data'])
            else:
                print(f"Warning: 'data' key not found in {os.path.basename(file_path)}")
        except (OSError, ValueError, TypeError) as e:
            # Best effort: unreadable files, invalid JSON/encoding (ValueError)
            # and unexpected top-level shapes (TypeError) skip this file only.
            print(f"Error loading {os.path.basename(file_path)}: {str(e)}")
    if not all_data:
        raise ValueError("Failed to load data from JSON files")

    df = pd.DataFrame(all_data)
    label_mapping = {
        'ai': 'AI',
        'human': 'Human',
        'ai+rew': 'AI',
    }
    if 'source' in df.columns:
        df['label'] = df['source'].map(lambda x: label_mapping.get(x, x))
    else:
        print("Warning: 'source' column not found, using default label")
        df['label'] = 'Unknown'

    valid_labels = ['AI', 'Human']
    df = df[df['label'].isin(valid_labels)]
    print(f"Filtered to {len(df)} examples with labels: {valid_labels}")
    print(f"Label distribution: {df['label'].value_counts().to_dict()}")
    return df
class Medium_Binary_Network(nn.Module):
    """Fully connected classifier that ends in two output logits.

    Every hidden stage is ``Linear -> ReLU -> Dropout``; a final ``Linear``
    layer maps the last hidden width to 2 outputs. The layers are stored in
    ``self.model`` as one ``nn.Sequential``.
    """

    def __init__(self, input_size, hidden_sizes=[256, 128, 64, 32], dropout=0.3):
        """Build the layer stack.

        Args:
            input_size: Number of input features.
            hidden_sizes: Width of each hidden layer, in order.
            dropout: Dropout probability applied after every hidden ReLU.
        """
        super().__init__()
        widths = [input_size, *hidden_sizes]
        stack = []
        # Walk consecutive (fan_in, fan_out) pairs so layers are created in
        # the same order as the widths are listed.
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.extend((nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(dropout)))
        stack.append(nn.Linear(widths[-1], 2))
        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """Return the two logits produced by the layer stack for ``x``."""
        return self.model(x)
def cross_validate_simple_classifier(directory_path="experiments/results/two_scores_with_long_text_analyze_2048T",
                                     feature_config=None,
                                     n_splits=5,
                                     random_state=42,
                                     epochs=100,
                                     hidden_sizes=None,
                                     dropout=0.3,
                                     early_stopping_patience=10):
    """Train and evaluate ``Medium_Binary_Network`` with stratified k-fold CV.

    For every fold a fresh network is trained with Adam and
    ``ReduceLROnPlateau``; the weights with the lowest loss on the held-out
    fold are restored before the fold is scored. Learning/accuracy curves and
    a confusion matrix are written under ``plots/binary`` and the model of the
    most accurate fold is saved to ``models/medium_binary_classifier``.

    Note that the held-out fold is used both for early stopping / model
    selection and for the reported metrics, and that the imputer is fitted on
    all rows before splitting, so the reported scores are optimistic.

    Args:
        directory_path: Passed to ``load_data_from_json``.
        feature_config: Passed to ``select_features``; a built-in default
            configuration is used when ``None``.
        n_splits: Number of stratified folds.
        random_state: Seed for the fold shuffling.
        epochs: Maximum number of training epochs per fold.
        hidden_sizes: Hidden layer widths; ``None`` means
            ``[256, 128, 64, 32]``.
        dropout: Dropout probability of the network.
        early_stopping_patience: Number of consecutive epochs without a new
            best validation loss after which training of a fold stops.

    Returns:
        tuple: ``(best_model_data, results, save_paths)`` where
        ``best_model_data`` holds the model, scaler, label encoder, imputer
        and score of the best fold, ``results`` is the JSON-serialisable
        metrics summary and ``save_paths`` is what ``save_binary_model``
        returned.
    """
    print("\n" + "="*50)
    print("MEDIUM BINARY CLASSIFIER CROSS-VALIDATION")
    print("="*50)

    # None sentinel instead of a shared mutable default; a private list also
    # keeps the saved results independent of the caller's object.
    if hidden_sizes is None:
        hidden_sizes = [256, 128, 64, 32]
    else:
        hidden_sizes = list(hidden_sizes)

    if feature_config is None:
        feature_config = {
            'basic_scores': True,
            'basic_text_stats': ['total_tokens', 'total_words', 'unique_words', 'stop_words', 'avg_word_length'],
            'morphological': ['pos_distribution', 'unique_lemmas', 'lemma_word_ratio'],
            'syntactic': ['dependencies', 'noun_chunks'],
            'entities': ['total_entities', 'entity_types'],
            'diversity': ['ttr', 'mtld'],
            'structure': ['sentence_count', 'avg_sentence_length', 'question_sentences', 'exclamation_sentences'],
            'readability': ['words_per_sentence', 'syllables_per_word', 'flesh_kincaid_score', 'long_words_percent'],
            'semantic': True
        }

    df = load_data_from_json(directory_path)
    features_df = select_features(df, feature_config)
    print(f"Selected {len(features_df.columns)} features")

    imputer = SimpleImputer(strategy='mean')
    X = imputer.fit_transform(features_df)
    label_encoder = LabelEncoder()
    y = label_encoder.fit_transform(df['label'].values)

    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    fold_metrics = []
    fold_models = []
    all_train_losses = []
    all_val_losses = []
    all_train_accs = []
    all_val_accs = []
    all_y_true = []
    all_y_pred = []
    best_fold_score = -1
    best_fold_index = -1

    for fold, (train_idx, test_idx) in enumerate(skf.split(X, y)):
        print(f"\n{'='*20} Fold {fold+1}/{n_splits} {'='*20}")
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]

        # The scaler is fitted on the training part of the fold only.
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        X_train_tensor = torch.FloatTensor(X_train_scaled).to(DEVICE)
        y_train_tensor = torch.LongTensor(y_train).to(DEVICE)
        X_test_tensor = torch.FloatTensor(X_test_scaled).to(DEVICE)
        y_test_tensor = torch.LongTensor(y_test).to(DEVICE)
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

        model = Medium_Binary_Network(X_train_scaled.shape[1], hidden_sizes=hidden_sizes, dropout=dropout).to(DEVICE)
        print(f"Model created with {len(hidden_sizes)} hidden layers: {hidden_sizes}")
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
        # `verbose` is deprecated on PyTorch LR schedulers; learning-rate
        # drops are reported explicitly in the epoch loop instead.
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)

        best_val_loss = float('inf')
        patience_counter = 0
        best_model_state = None
        train_losses = []
        val_losses = []
        train_accs = []
        val_accs = []

        for epoch in range(epochs):
            model.train()
            running_loss = 0.0
            running_corrects = 0
            for inputs, labels in train_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                _, preds = torch.max(outputs, 1)
                # Weight by batch size so the epoch mean is per-sample.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels).item()
            epoch_loss = running_loss / len(train_loader.dataset)
            epoch_acc = running_corrects / len(train_loader.dataset)
            train_losses.append(epoch_loss)
            train_accs.append(epoch_acc)

            model.eval()
            with torch.no_grad():
                val_outputs = model(X_test_tensor)
                val_loss = criterion(val_outputs, y_test_tensor).item()
                _, val_preds = torch.max(val_outputs, 1)
                val_acc = torch.sum(val_preds == y_test_tensor).item() / len(y_test_tensor)
            val_losses.append(val_loss)
            val_accs.append(val_acc)

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                # state_dict() values reference the live parameters and
                # dict.copy() is shallow, so each tensor must be cloned or the
                # "best" snapshot silently follows later training steps.
                best_model_state = {
                    name: tensor.detach().clone()
                    for name, tensor in model.state_dict().items()
                }
            else:
                patience_counter += 1
                if patience_counter >= early_stopping_patience:
                    print(f"Early stopping at epoch {epoch+1}")
                    break

            lr_before = optimizer.param_groups[0]['lr']
            scheduler.step(val_loss)
            lr_after = optimizer.param_groups[0]['lr']
            if lr_after < lr_before:
                print(f"Epoch {epoch+1}: reducing learning rate to {lr_after:.4e}")

            if (epoch + 1) % 10 == 0 or epoch == 0:
                print(f"Epoch {epoch+1}/{epochs}, Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

        if best_model_state is not None:
            model.load_state_dict(best_model_state)
            print("Loaded best model weights")

        model.eval()
        with torch.no_grad():
            test_outputs = model(X_test_tensor)
            _, predicted = torch.max(test_outputs, 1)
            test_acc = torch.sum(predicted == y_test_tensor).item() / len(y_test_tensor)
        y_test_np = y_test
        predicted_np = predicted.cpu().numpy()
        all_y_true.extend(y_test_np)
        all_y_pred.extend(predicted_np)
        precision, recall, f1, _ = precision_recall_fscore_support(y_test_np, predicted_np, average='weighted')

        fold_metrics.append({
            'fold': fold + 1,
            'accuracy': float(test_acc),
            'precision': float(precision),
            'recall': float(recall),
            'f1': float(f1),
            'val_loss': float(best_val_loss)
        })
        fold_models.append({
            'model': model,
            'scaler': scaler,
            'label_encoder': label_encoder,
            'imputer': imputer,
            'score': test_acc
        })
        if test_acc > best_fold_score:
            best_fold_score = test_acc
            best_fold_index = fold

        # Histories of all folds are concatenated into one series for plotting.
        all_train_losses.extend(train_losses)
        all_val_losses.extend(val_losses)
        all_train_accs.extend(train_accs)
        all_val_accs.extend(val_accs)

        print(f"Fold {fold+1} Results:")
        print(f" Accuracy: {test_acc:.4f}")
        print(f" Precision: {precision:.4f}")
        print(f" Recall: {recall:.4f}")
        print(f" F1 Score: {f1:.4f}")

    overall_accuracy = accuracy_score(all_y_true, all_y_pred)
    overall_precision, overall_recall, overall_f1, _ = precision_recall_fscore_support(
        all_y_true, all_y_pred, average='weighted'
    )
    fold_accuracies = [metrics['accuracy'] for metrics in fold_metrics]
    mean_accuracy = np.mean(fold_accuracies)
    std_accuracy = np.std(fold_accuracies)
    # Normal-approximation interval around the mean fold accuracy.
    ci_lower = mean_accuracy - 1.96 * std_accuracy / np.sqrt(n_splits)
    ci_upper = mean_accuracy + 1.96 * std_accuracy / np.sqrt(n_splits)

    plot_learning_curve(all_train_losses, all_val_losses)
    plot_accuracy_curve(all_train_accs, all_val_accs)

    class_names = label_encoder.classes_
    cm = confusion_matrix(all_y_true, all_y_pred)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title('Binary Classification Confusion Matrix (Cross-Validation)')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    os.makedirs('plots/binary', exist_ok=True)
    plt.savefig('plots/binary/confusion_matrix_medium.png')
    plt.close()

    print("\n" + "="*50)
    print("CROSS-VALIDATION SUMMARY")
    print("="*50)
    print(f"Mean Accuracy: {mean_accuracy:.4f} ± {std_accuracy:.4f}")
    print(f"95% Confidence Interval: [{ci_lower:.4f}, {ci_upper:.4f}]")
    print(f"Overall Accuracy: {overall_accuracy:.4f}")
    print(f"Overall Precision: {overall_precision:.4f}")
    print(f"Overall Recall: {overall_recall:.4f}")
    print(f"Overall F1: {overall_f1:.4f}")
    print(f"\nBest Fold: {best_fold_index + 1} (Accuracy: {fold_metrics[best_fold_index]['accuracy']:.4f})")

    best_model_data = fold_models[best_fold_index]
    results = {
        'fold_metrics': fold_metrics,
        'overall': {
            'accuracy': float(overall_accuracy),
            'precision': float(overall_precision),
            'recall': float(overall_recall),
            'f1': float(overall_f1)
        },
        'cross_validation': {
            'mean_accuracy': float(mean_accuracy),
            'std_accuracy': float(std_accuracy),
            'confidence_interval_95': [float(ci_lower), float(ci_upper)]
        },
        'best_fold': {
            'fold': best_fold_index + 1,
            'accuracy': float(fold_metrics[best_fold_index]['accuracy'])
        },
        'model_config': {
            'hidden_sizes': hidden_sizes,
            'dropout': dropout
        }
    }
    output_dir = 'models/medium_binary_classifier'
    save_paths = save_binary_model(best_model_data, results, output_dir=output_dir)
    return best_model_data, results, save_paths
def plot_learning_curve(train_losses, val_losses):
    """Plot training and validation loss and save the figure as a PNG.

    Both series are drawn against a 1-based position axis whose length is
    taken from ``train_losses``. The image is written to
    ``plots/binary/learning_curve.png``; the directory is created if needed.

    Args:
        train_losses: Sequence of training-loss values.
        val_losses: Sequence of validation-loss values.
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    x_axis = range(1, len(train_losses) + 1)
    ax.plot(x_axis, train_losses, 'b-', label='Training Loss')
    ax.plot(x_axis, val_losses, 'r-', label='Validation Loss')
    ax.set_title('Learning Curve')
    ax.set_xlabel('Epochs')
    ax.set_ylabel('Loss')
    ax.legend()
    ax.grid(True)

    os.makedirs('plots/binary', exist_ok=True)
    fig.savefig('plots/binary/learning_curve.png')
    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(fig)
    print("Learning curve saved to plots/binary/learning_curve.png")
def plot_accuracy_curve(train_accuracies, val_accuracies):
    """Plot training and validation accuracy and save the figure as a PNG.

    Both series are drawn against a 1-based position axis whose length is
    taken from ``train_accuracies``; the y-axis is fixed to ``[0, 1]``. The
    image is written to ``plots/binary/accuracy_curve.png``; the directory is
    created if needed.

    Args:
        train_accuracies: Sequence of training-accuracy values.
        val_accuracies: Sequence of validation-accuracy values.
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    x_axis = range(1, len(train_accuracies) + 1)
    ax.plot(x_axis, train_accuracies, 'g-', label='Training Accuracy')
    ax.plot(x_axis, val_accuracies, 'm-', label='Validation Accuracy')
    ax.set_title('Accuracy Curve')
    ax.set_xlabel('Epochs')
    ax.set_ylabel('Accuracy')
    ax.legend()
    ax.grid(True)
    ax.set_ylim(0, 1.0)

    os.makedirs('plots/binary', exist_ok=True)
    fig.savefig('plots/binary/accuracy_curve.png')
    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(fig)
    print("Accuracy curve saved to plots/binary/accuracy_curve.png")
# Sub-keys that are expanded into one column each.
_POS_TAGS = ('NOUN', 'VERB', 'ADJ', 'ADV', 'PROPN', 'DET', 'ADP', 'PRON', 'CCONJ', 'SCONJ')
_DEPENDENCY_LABELS = ('nsubj', 'obj', 'amod', 'nmod', 'ROOT', 'punct', 'case')
_ENTITY_LABELS = ('PER', 'LOC', 'ORG')

# One row per feature group, in output-column order:
# (feature_config key, text_analysis section, column prefix,
#  feature that is expanded per sub-key, prefix of the expanded columns,
#  sub-keys of the expanded feature)
_FEATURE_SECTIONS = (
    ('basic_text_stats', 'basic_stats', 'basic', None, None, ()),
    ('morphological', 'morphological_analysis', 'morph', 'pos_distribution', 'pos', _POS_TAGS),
    ('syntactic', 'syntactic_analysis', 'synt', 'dependencies', 'dep', _DEPENDENCY_LABELS),
    ('entities', 'named_entities', 'ent', 'entity_types', 'ent', _ENTITY_LABELS),
    ('diversity', 'lexical_diversity', 'div', None, None, ()),
    ('structure', 'text_structure', 'struct', None, None, ()),
    ('readability', 'readability', 'read', None, None, ()),
)


def _lookup_nested(record, *path):
    """Follow ``path`` through nested dicts; return 0 when a level is absent or not a dict."""
    node = record
    for key in path:
        if not isinstance(node, dict):
            return 0
        node = node.get(key, 0)
    return node


def select_features(df, feature_config):
    """Build the numeric feature table described by ``feature_config``.

    Columns are produced in a fixed order: the raw ``score_chat`` /
    ``score_coder`` columns (when present and ``'basic_scores'`` is enabled,
    which is the default), then one column per configured feature read from
    the nested dict in the ``'text_analysis'`` column, and finally
    ``semantic_coherence`` when ``'semantic'`` is enabled. The features
    ``'pos_distribution'``, ``'dependencies'`` and ``'entity_types'`` are
    expanded into one column per known tag.

    A value that is missing, or whose enclosing section is missing or is not
    a dict (for example JSON ``null``), becomes ``0`` instead of raising.

    Args:
        df: Input table; ``'score_chat'``, ``'score_coder'`` and
            ``'text_analysis'`` are each optional.
        feature_config: Mapping from feature-group name to a list of feature
            names (or a boolean for ``'basic_scores'`` and ``'semantic'``).

    Returns:
        pandas.DataFrame: one column per selected feature, indexed like
        ``df``; an empty frame when nothing was selected.
    """
    columns = {}

    if feature_config.get('basic_scores', True):
        for score_name in ('score_chat', 'score_coder'):
            if score_name in df.columns:
                columns[score_name] = df[score_name]

    if 'text_analysis' in df.columns:
        analysis = df['text_analysis']

        def extract(*path):
            # One value per row, read from the row's text_analysis dict.
            return analysis.apply(lambda record: _lookup_nested(record, *path))

        for config_key, section, prefix, expandable, expanded_prefix, expanded_keys in _FEATURE_SECTIONS:
            for feature in feature_config.get(config_key) or ():
                if expandable is not None and feature == expandable:
                    for sub_key in expanded_keys:
                        columns[f'{expanded_prefix}_{sub_key}'] = extract(section, feature, sub_key)
                else:
                    columns[f'{prefix}_{feature}'] = extract(section, feature)

        if feature_config.get('semantic'):
            columns['semantic_coherence'] = extract('semantic_coherence', 'avg_coherence_score')

    # Assemble the frame in one step rather than inserting column by column.
    features_df = pd.DataFrame(columns) if columns else pd.DataFrame()
    print(f"Generated {len(features_df.columns)} features")
    return features_df
def augment_text_features(features_df, num_augmentations=5, noise_factor=0.05):
    """Return ``features_df`` followed by noisy copies of itself.

    Each copy has its numeric columns cast to float and perturbed with
    multiplicative Gaussian noise: ``x + x * N(0, noise_factor)``, drawn from
    NumPy's global random state. Non-numeric columns are copied unchanged.

    Args:
        features_df: Table to augment; it is not modified.
        num_augmentations: Number of noisy copies appended after the original.
        noise_factor: Standard deviation of the relative noise.

    Returns:
        pandas.DataFrame: the original rows followed by the copies, with a
        fresh 0-based index.
    """
    numeric_cols = features_df.select_dtypes(include=[np.number]).columns
    float_casts = {name: float for name in numeric_cols}

    frames = [features_df]
    for _ in range(num_augmentations):
        noisy = features_df.astype(float_casts)
        base = noisy[numeric_cols]
        jitter = np.random.normal(0, noise_factor, size=base.shape)
        noisy[numeric_cols] = base + base * jitter
        frames.append(noisy)
    return pd.concat(frames, ignore_index=True)
def cross_validate_binary_classifier(directory_path="experiments/results/two_scores_with_long_text_analyze_2048T",
                                     model_config=None,
                                     feature_config=None,
                                     n_splits=5,
                                     random_state=42,
                                     epochs=100,
                                     early_stopping_patience=10,
                                     use_augmentation=True,
                                     num_augmentations=2,
                                     noise_factor=0.05):
    """Cross-validate the Human-vs-AI network, optionally with noise augmentation.

    For every stratified fold a ``Medium_Binary_Network`` is trained with Adam
    and an exponential LR decay. If training reaches the last
    ``num_avg_epochs`` (5) epochs, the weights saved in those epochs are
    averaged and used for scoring; otherwise the weights with the lowest loss
    on the held-out fold are restored. Accuracy, weighted
    precision/recall/F1 and ROC AUC are collected per fold and overall, the
    fold accuracies are compared with the majority-class baseline by a
    one-sample t-test, and a confusion matrix (plus a ROC curve when the AUC
    could be computed) is written under ``plots/binary``.

    Augmentation is applied inside each fold to the training rows only, so
    the held-out rows never have noisy copies in the training set. The
    held-out fold is still used for early stopping, and the imputer is fitted
    on all rows before splitting.

    Args:
        directory_path: Passed to ``load_data_from_json``.
        model_config: Dict with ``'hidden_layers'`` and optionally
            ``'dropout_rate'``; defaults to ``[256, 128, 64]`` and ``0.3``.
        feature_config: Passed to ``select_features``; a built-in default is
            used when ``None``.
        n_splits: Number of stratified folds.
        random_state: Seed for the fold shuffling.
        epochs: Maximum number of training epochs per fold.
        early_stopping_patience: Consecutive epochs without a new best
            validation loss after which training of a fold stops.
        use_augmentation: Whether to add noisy copies of the training rows.
        num_augmentations: Number of noisy copies per training row.
        noise_factor: Relative noise level passed to ``augment_text_features``.

    Returns:
        tuple: ``(best_model_data, results)`` where ``best_model_data`` holds
        the model, scaler, label encoder, imputer and accuracy of the most
        accurate fold and ``results`` is the JSON-serialisable summary.
    """
    if model_config is None:
        model_config = {
            'hidden_layers': [256, 128, 64],
            'dropout_rate': 0.3
        }
    if feature_config is None:
        feature_config = {
            'basic_scores': True,
            'basic_text_stats': ['total_tokens', 'total_words', 'unique_words', 'stop_words', 'avg_word_length'],
            'morphological': ['pos_distribution', 'unique_lemmas', 'lemma_word_ratio'],
            'syntactic': ['dependencies', 'noun_chunks'],
            'entities': ['total_entities', 'entity_types'],
            'diversity': ['ttr', 'mtld'],
            'structure': ['sentence_count', 'avg_sentence_length', 'question_sentences', 'exclamation_sentences'],
            'readability': ['words_per_sentence', 'syllables_per_word', 'flesh_kincaid_score', 'long_words_percent'],
            'semantic': True
        }

    print("\n" + "="*50)
    print("BINARY CLASSIFIER CROSS-VALIDATION")
    print("="*50)

    df = load_data_from_json(directory_path)
    features_df = select_features(df, feature_config)
    print(f"Selected features: {features_df.columns.tolist()}")

    imputer = SimpleImputer(strategy='mean')
    X = imputer.fit_transform(features_df)
    y = df['label'].values
    if use_augmentation:
        print(f"Augmenting data with {num_augmentations} copies (noise factor: {noise_factor})...")

    label_encoder = LabelEncoder()
    y_encoded = label_encoder.fit_transform(y)
    print(f"Data size: {X.shape}")
    print(f"Labels distribution: {pd.Series(y).value_counts().to_dict()}")

    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    fold_metrics = []
    fold_models = []
    all_y_true = []
    all_y_pred = []
    all_y_scores = []
    best_fold_score = -1
    best_fold_index = -1
    print(f"\nPerforming {n_splits}-fold cross-validation...")
    # Number of final epochs whose weights are averaged.
    num_avg_epochs = 5

    for fold, (train_idx, test_idx) in enumerate(skf.split(X, y_encoded)):
        print(f"\n{'='*20} Fold {fold+1}/{n_splits} {'='*20}")
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y_encoded[train_idx], y_encoded[test_idx]

        if use_augmentation:
            # Augment after the split: noisy copies of a held-out row in the
            # training set would leak it and inflate every reported metric.
            original_size = len(X_train)
            X_train = augment_text_features(pd.DataFrame(X_train),
                                            num_augmentations=num_augmentations,
                                            noise_factor=noise_factor).to_numpy()
            # Same order as the augmented rows: original block, then copies.
            y_train = np.tile(y_train, num_augmentations + 1)
            print(f"Data size increased from {original_size} to {len(X_train)}")

        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        X_train_tensor = torch.FloatTensor(X_train_scaled).to(DEVICE)
        y_train_tensor = torch.LongTensor(y_train).to(DEVICE)
        X_test_tensor = torch.FloatTensor(X_test_scaled).to(DEVICE)
        y_test_tensor = torch.LongTensor(y_test).to(DEVICE)
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

        # The previously called `build_neural_network` is not defined or
        # imported in this module; the in-file two-logit network is used
        # instead and moved to the same device as the tensors.
        model = Medium_Binary_Network(
            X_train_scaled.shape[1],
            hidden_sizes=model_config['hidden_layers'],
            dropout=model_config.get('dropout_rate', 0.3),
        ).to(DEVICE)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
        scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)

        best_val_loss = float('inf')
        patience_counter = 0
        best_model_state = None
        train_losses = []
        val_losses = []
        saved_weights = []

        for epoch in range(epochs):
            model.train()
            running_loss = 0.0
            for inputs, labels in train_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                running_loss += loss.item() * inputs.size(0)
            epoch_loss = running_loss / len(train_loader.dataset)
            train_losses.append(epoch_loss)

            model.eval()
            with torch.no_grad():
                val_outputs = model(X_test_tensor)
                val_loss = criterion(val_outputs, y_test_tensor).item()
            val_losses.append(val_loss)

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                # Clone: state_dict() tensors alias the live parameters and
                # dict.copy() is shallow.
                best_model_state = {
                    name: tensor.detach().clone()
                    for name, tensor in model.state_dict().items()
                }
            else:
                patience_counter += 1
                if patience_counter >= early_stopping_patience:
                    print(f"Early stopping at epoch {epoch+1}")
                    break

            if epoch >= epochs - num_avg_epochs:
                saved_weights.append({
                    name: tensor.detach().clone()
                    for name, tensor in model.state_dict().items()
                })
            scheduler.step()

            if (epoch + 1) % 10 == 0 or epoch == 0:
                print(f"Epoch {epoch+1}/{epochs}, Train Loss: {epoch_loss:.4f}, Val Loss: {val_loss:.4f}")

        if saved_weights:
            print(f"Averaging weights from last {len(saved_weights)} epochs...")
            avg_state_dict = {}
            for key, first in saved_weights[0].items():
                # Average in float and cast back so the in-place arithmetic
                # never touches a snapshot or the live parameters.
                stacked = torch.stack([snapshot[key].float() for snapshot in saved_weights])
                avg_state_dict[key] = stacked.mean(dim=0).to(first.dtype)
            model.load_state_dict(avg_state_dict)
            print("Model loaded with averaged weights")
        elif best_model_state is not None:
            model.load_state_dict(best_model_state)
            print("Model loaded with best validation weights")

        model.eval()
        with torch.no_grad():
            test_outputs = model(X_test_tensor)
            _, predicted = torch.max(test_outputs, 1)
            probabilities = torch.softmax(test_outputs, dim=1)
        predicted_np = predicted.cpu().numpy()
        # Probability assigned to encoded class 1, used as the ROC score.
        pos_scores = probabilities[:, 1].cpu().numpy()
        all_y_true.extend(y_test)
        all_y_pred.extend(predicted_np)
        all_y_scores.extend(pos_scores)

        fold_acc = accuracy_score(y_test, predicted_np)
        precision, recall, f1, _ = precision_recall_fscore_support(y_test, predicted_np, average='weighted')
        try:
            fold_auc = roc_auc_score(y_test, pos_scores)
        except ValueError:
            # Raised by scikit-learn e.g. when the fold holds a single class.
            fold_auc = 0.0
            print("Warning: Could not compute AUC")

        fold_metrics.append({
            'fold': fold + 1,
            'accuracy': float(fold_acc),
            'precision': float(precision),
            'recall': float(recall),
            'f1': float(f1),
            'auc': float(fold_auc),
            'best_val_loss': float(best_val_loss)
        })
        fold_models.append({
            'model': model,
            'scaler': scaler,
            'label_encoder': label_encoder,
            'imputer': imputer,
            'score': fold_acc
        })
        if fold_acc > best_fold_score:
            best_fold_score = fold_acc
            best_fold_index = fold

        print(f"Fold {fold+1} Results:")
        print(f" Accuracy: {fold_acc:.4f}")
        print(f" Precision: {precision:.4f}")
        print(f" Recall: {recall:.4f}")
        print(f" F1 Score: {f1:.4f}")
        if fold_auc > 0:
            print(f" AUC: {fold_auc:.4f}")

    overall_accuracy = accuracy_score(all_y_true, all_y_pred)
    overall_precision, overall_recall, overall_f1, _ = precision_recall_fscore_support(
        all_y_true, all_y_pred, average='weighted'
    )
    try:
        overall_auc = roc_auc_score(all_y_true, all_y_scores)
    except ValueError:
        overall_auc = 0.0
        print("Warning: Could not compute overall AUC")

    fold_accuracies = [metrics['accuracy'] for metrics in fold_metrics]
    mean_accuracy = np.mean(fold_accuracies)
    std_accuracy = np.std(fold_accuracies)
    # Normal-approximation interval around the mean fold accuracy.
    ci_lower = mean_accuracy - 1.96 * std_accuracy / np.sqrt(n_splits)
    ci_upper = mean_accuracy + 1.96 * std_accuracy / np.sqrt(n_splits)

    # Baseline: always predict the most frequent class.
    class_counts = np.bincount(y_encoded)
    baseline_accuracy = np.max(class_counts) / len(y_encoded)
    most_frequent_class = np.argmax(class_counts)
    t_stat, p_value = stats.ttest_1samp(fold_accuracies, baseline_accuracy)
    # The test is two-sided, so "better than the baseline" also needs a
    # positive t-statistic; a significantly worse model must not qualify.
    significantly_better = bool(p_value < 0.05 and t_stat > 0)

    best_model_data = fold_models[best_fold_index]

    class_names = label_encoder.classes_
    cm = confusion_matrix(all_y_true, all_y_pred)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title('Binary Classification Confusion Matrix (Cross-Validation)')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    os.makedirs('plots/binary', exist_ok=True)
    plt.savefig('plots/binary/confusion_matrix_cv.png')
    plt.close()

    if overall_auc > 0:
        from sklearn.metrics import roc_curve
        fpr, tpr, _ = roc_curve(all_y_true, all_y_scores)
        plt.figure(figsize=(10, 8))
        plt.plot(fpr, tpr, lw=2, label=f'ROC curve (AUC = {overall_auc:.4f})')
        plt.plot([0, 1], [0, 1], 'k--', lw=2)
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver Operating Characteristic (ROC)')
        plt.legend(loc="lower right")
        plt.savefig('plots/binary/roc_curve.png')
        plt.close()

    most_frequent_label = label_encoder.inverse_transform([most_frequent_class])[0]
    results = {
        'fold_metrics': fold_metrics,
        'overall': {
            'accuracy': float(overall_accuracy),
            'precision': float(overall_precision),
            'recall': float(overall_recall),
            'f1': float(overall_f1),
            'auc': float(overall_auc) if overall_auc > 0 else None
        },
        'cross_validation': {
            'mean_accuracy': float(mean_accuracy),
            'std_accuracy': float(std_accuracy),
            'confidence_interval_95': [float(ci_lower), float(ci_upper)],
            'baseline_accuracy': float(baseline_accuracy),
            'most_frequent_class': str(most_frequent_label),
            't_statistic': float(t_stat),
            'p_value': float(p_value),
            'statistically_significant': "yes" if significantly_better else "no"
        },
        'best_fold': {
            'fold': best_fold_index + 1,
            'accuracy': float(fold_metrics[best_fold_index]['accuracy'])
        }
    }

    print("\n" + "="*50)
    print("CROSS-VALIDATION SUMMARY")
    print("="*50)
    print(f"Mean Accuracy: {mean_accuracy:.4f} ± {std_accuracy:.4f}")
    print(f"95% Confidence Interval: [{ci_lower:.4f}, {ci_upper:.4f}]")
    print(f"Overall Accuracy: {overall_accuracy:.4f}")
    print(f"Baseline Accuracy: {baseline_accuracy:.4f} (most frequent class: {most_frequent_label})")
    print(f"T-statistic: {t_stat:.4f}, p-value: {p_value:.6f}")
    if significantly_better:
        print("The model is significantly better than the baseline (p < 0.05)")
    else:
        print("The model is NOT significantly better than the baseline (p >= 0.05)")
    print(f"\nBest Fold: {best_fold_index + 1} (Accuracy: {fold_metrics[best_fold_index]['accuracy']:.4f})")
    return best_model_data, results
def save_binary_model(model_data, results, output_dir='models/binary_classifier'):
    """Persist a trained model, its preprocessing objects and the CV results.

    Writes five files into ``output_dir`` (created if missing): the model's
    ``state_dict`` (``nn_model.pt``), the scaler, label encoder and imputer
    as joblib dumps, and ``results`` as ``cv_results.json``.

    Args:
        model_data: Dict with the keys ``'model'``, ``'scaler'``,
            ``'label_encoder'`` and ``'imputer'``.
        results: JSON-serialisable metrics summary.
        output_dir: Target directory.

    Returns:
        dict: the written paths under ``'model_path'``, ``'scaler_path'``,
        ``'encoder_path'``, ``'imputer_path'`` and ``'results_path'``.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(output_dir, exist_ok=True)

    model_path = os.path.join(output_dir, 'nn_model.pt')
    torch.save(model_data['model'].state_dict(), model_path)

    scaler_path = os.path.join(output_dir, 'scaler.joblib')
    joblib.dump(model_data['scaler'], scaler_path)

    encoder_path = os.path.join(output_dir, 'label_encoder.joblib')
    joblib.dump(model_data['label_encoder'], encoder_path)

    imputer_path = os.path.join(output_dir, 'imputer.joblib')
    joblib.dump(model_data['imputer'], imputer_path)

    results_path = os.path.join(output_dir, 'cv_results.json')
    with open(results_path, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=4)

    print(f"Binary model saved to {model_path}")
    print(f"CV results saved to {results_path}")
    return {
        'model_path': model_path,
        'scaler_path': scaler_path,
        'encoder_path': encoder_path,
        'imputer_path': imputer_path,
        'results_path': results_path
    }
def parse_args():
    """Parse the command-line options of the training script.

    Returns:
        argparse.Namespace: with the integer attributes ``random_seed``
        (default 42), ``folds`` (5), ``epochs`` (100) and ``patience`` (10).
    """
    parser = argparse.ArgumentParser(description='Binary Neural Network Classifier (Human vs AI) with Cross-Validation')
    # All options are integers, so they are declared as (flag, default, help).
    int_options = (
        ('--random_seed', 42, 'Random seed for reproducibility'),
        ('--folds', 5, 'Number of cross-validation folds'),
        ('--epochs', 100, 'Maximum number of training epochs per fold'),
        ('--patience', 10, 'Early stopping patience (epochs)'),
    )
    for flag, default_value, help_text in int_options:
        parser.add_argument(flag, type=int, default=default_value, help=help_text)
    return parser.parse_args()
def main():
    """Command-line entry point: seed the RNGs and run the cross-validation.

    The parsed ``--random_seed``, ``--folds``, ``--epochs`` and ``--patience``
    options are forwarded to ``cross_validate_simple_classifier``; a run
    without flags therefore uses the parser defaults (5 folds, 100 epochs,
    patience 10). The network uses the hidden sizes ``[256, 192, 128, 64]``
    with dropout ``0.3``.
    """
    print("\n" + "="*50)
    print("MEDIUM BINARY CLASSIFIER")
    print("="*50 + "\n")

    args = parse_args()
    seed = args.random_seed
    np.random.seed(seed)
    torch.manual_seed(seed)
    if GPU_AVAILABLE:
        torch.cuda.manual_seed_all(seed)

    # Non-interactive backend: figures are only written to disk.
    plt.switch_backend('agg')

    feature_config = {
        'basic_scores': True,
        'basic_text_stats': ['total_tokens', 'total_words', 'unique_words', 'stop_words', 'avg_word_length'],
        'morphological': ['pos_distribution', 'unique_lemmas', 'lemma_word_ratio'],
        'syntactic': ['dependencies', 'noun_chunks'],
        'entities': ['total_entities', 'entity_types'],
        'diversity': ['ttr', 'mtld'],
        'structure': ['sentence_count', 'avg_sentence_length', 'question_sentences', 'exclamation_sentences'],
        'readability': ['words_per_sentence', 'syllables_per_word', 'flesh_kincaid_score', 'long_words_percent'],
        'semantic': True
    }

    # The CLI options were previously parsed but ignored in favour of
    # hard-coded values; they are now honoured.
    model_data, results, save_paths = cross_validate_simple_classifier(
        directory_path="experiments/results/two_scores_with_long_text_analyze_2048T",
        feature_config=feature_config,
        n_splits=args.folds,
        random_state=seed,
        epochs=args.epochs,
        hidden_sizes=[256, 192, 128, 64],
        dropout=0.3,
        early_stopping_patience=args.patience
    )
    print("\nTraining completed.")
    print(f"Medium binary classifier saved to {save_paths['model_path']}")


if __name__ == "__main__":
    main()