# Generated by Copilot
"""Analyzer for scraped rental-listing data (prices in NT$, areas in ping; 1 ping ~ 3.3 m2)."""
import json
import os
from typing import Dict, List

import numpy as np
import pandas as pd
from transformers import pipeline


class RentalDataAnalyzer:
    """Rental data analyzer."""

    def __init__(self, data_path: str = None):
        """
        Initialize the analyzer.

        Args:
            data_path: Path to the data file.
        """
        self.data_path = data_path
        self.df = None
        self.analysis_results = {}

        # Hugging Face models for text analysis (loaded lazily via setup_huggingface_models)
        self.sentiment_analyzer = None
        self.text_classifier = None

    def load_data(self, data_path: str = None) -> pd.DataFrame:
        """Load data from a JSON or CSV file."""
        if data_path:
            self.data_path = data_path
        if not self.data_path:
            print("No data path given")
            return None

        try:
            if self.data_path.endswith('.json'):
                with open(self.data_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                self.df = pd.DataFrame(data)
            elif self.data_path.endswith('.csv'):
                self.df = pd.read_csv(self.data_path, encoding='utf-8-sig')
            else:
                raise ValueError("Unsupported file format")

            print(f"Loaded {len(self.df)} records")
            return self.df
        except Exception as e:
            print(f"Error while loading data: {e}")
            return None

    def clean_data(self) -> pd.DataFrame:
        """Clean the data."""
        if self.df is None:
            print("Load data first")
            return None

        print("Cleaning data...")

        # Drop duplicate listings
        original_count = len(self.df)
        self.df = self.df.drop_duplicates(subset=['title', 'address', 'price'])
        print(f"Removed {original_count - len(self.df)} duplicate records")

        # Clean price data
        self.df['price'] = pd.to_numeric(self.df['price'], errors='coerce')
        self.df = self.df[self.df['price'] > 0]  # drop invalid prices

        # Clean area (ping) data
        self.df['area'] = pd.to_numeric(self.df['area'], errors='coerce')

        # Compute price per ping
        self.df['price_per_ping'] = self.df.apply(
            lambda row: row['price'] / row['area'] if row['area'] > 0 else np.nan,
            axis=1
        )

        # Remove outliers (IQR method)
        self.df = self.remove_outliers(self.df, 'price')

        print(f"{len(self.df)} valid records remain after cleaning")
        return self.df

    def remove_outliers(self, df: pd.DataFrame, column: str) -> pd.DataFrame:
        """Remove outliers using the 1.5 * IQR rule."""
        Q1 = df[column].quantile(0.25)
        Q3 = df[column].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR

        outliers_count = len(df[(df[column] < lower_bound) | (df[column] > upper_bound)])
        print(f"Removed {outliers_count} outliers from {column}")

        return df[(df[column] >= lower_bound) & (df[column] <= upper_bound)]

    def basic_statistics(self) -> Dict:
        """Basic statistical analysis."""
        if self.df is None or len(self.df) == 0:
            return {}

        stats = {
            'total_properties': len(self.df),
            'price_stats': {
                'mean': round(self.df['price'].mean(), 2),
                'median': round(self.df['price'].median(), 2),
                'std': round(self.df['price'].std(), 2),
                # cast to float so numpy integer scalars stay JSON-serializable
                'min': float(self.df['price'].min()),
                'max': float(self.df['price'].max()),
                'q25': round(self.df['price'].quantile(0.25), 2),
                'q75': round(self.df['price'].quantile(0.75), 2)
            },
            'area_stats': {
                'mean': round(self.df['area'].mean(), 2),
                'median': round(self.df['area'].median(), 2),
                'min': float(self.df['area'].min()),
                'max': float(self.df['area'].max())
            } if not self.df['area'].isna().all() else {},
            'price_per_ping_stats': {
                'mean': round(self.df['price_per_ping'].mean(), 2),
                'median': round(self.df['price_per_ping'].median(), 2),
                'min': round(self.df['price_per_ping'].min(), 2),
                'max': round(self.df['price_per_ping'].max(), 2)
            } if not self.df['price_per_ping'].isna().all() else {}
        }

        self.analysis_results['basic_stats'] = stats
        return stats
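    # The cleaning and statistics steps above assume a specific input schema
    # (title, address, price, area, raw_info). A minimal sketch of what each
    # scraped record is expected to look like -- the field values here are
    # illustrative samples, not real listings:
    @staticmethod
    def example_records() -> List[Dict]:
        """Illustrative records matching the columns the analyzer expects."""
        return [
            {'title': 'Bright 2BR near MRT', 'address': 'Daan Dist.',
             'price': 28000, 'area': 25.0, 'raw_info': '近捷運 電梯 陽台'},
            {'title': 'Studio with balcony', 'address': 'Zhongshan Dist.',
             'price': 16000, 'area': 12.5, 'raw_info': '採光 安靜 家電'},
        ]
        # e.g. for a quick smoke test:
        #   analyzer.df = pd.DataFrame(RentalDataAnalyzer.example_records())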
    def price_distribution_analysis(self) -> Dict:
        """Price distribution analysis."""
        if self.df is None or len(self.df) == 0:
            return {}

        # Define price buckets (NT$ per month)
        price_bins = [0, 15000, 20000, 25000, 30000, 40000, float('inf')]
        price_labels = ['<15K', '15-20K', '20-25K', '25-30K', '30-40K', '>40K']

        self.df['price_range'] = pd.cut(self.df['price'], bins=price_bins,
                                        labels=price_labels, right=False)

        distribution = self.df['price_range'].value_counts().sort_index()

        distribution_dict = {
            'ranges': distribution.index.tolist(),
            'counts': distribution.values.tolist(),
            'percentages': (distribution / len(self.df) * 100).round(2).tolist()
        }

        self.analysis_results['price_distribution'] = distribution_dict
        return distribution_dict

    def area_analysis(self) -> Dict:
        """Area (ping) analysis."""
        if self.df is None or len(self.df) == 0 or self.df['area'].isna().all():
            return {}

        # Define area buckets (1 ping ~ 3.3 m2)
        area_bins = [0, 20, 30, 40, 50, float('inf')]
        area_labels = ['<20 ping', '20-30 ping', '30-40 ping', '40-50 ping', '>50 ping']

        self.df['area_range'] = pd.cut(self.df['area'], bins=area_bins,
                                       labels=area_labels, right=False)

        area_distribution = self.df['area_range'].value_counts().sort_index()

        area_dict = {
            'ranges': area_distribution.index.tolist(),
            'counts': area_distribution.values.tolist(),
            'percentages': (area_distribution / len(self.df) * 100).round(2).tolist()
        }

        self.analysis_results['area_analysis'] = area_dict
        return area_dict

    def setup_huggingface_models(self):
        """Set up Hugging Face models."""
        try:
            print("Loading Hugging Face models...")

            # Load a Chinese sentiment-analysis model.
            # Note: ckiplab/bert-base-chinese-ws is a word-segmentation model and
            # cannot score sentiment; a binary Chinese sentiment classifier is
            # used here instead.
            self.sentiment_analyzer = pipeline(
                "sentiment-analysis",
                model="uer/roberta-base-finetuned-jd-binary-chinese",
                top_k=None  # return scores for all labels (replaces deprecated return_all_scores=True)
            )

            print("Hugging Face models loaded")
        except Exception as e:
            print(f"Error while loading Hugging Face models: {e}")

    def analyze_descriptions(self) -> Dict:
        """Analyze listing description text."""
        if self.df is None or 'raw_info' not in self.df.columns:
            return {}

        descriptions = self.df['raw_info'].dropna().tolist()
        if not descriptions:
            return {}

        # Keyword analysis
        keywords_analysis = self.analyze_keywords(descriptions)

        analysis_result = {
            'keywords_frequency': keywords_analysis,
            'total_descriptions': len(descriptions)
        }

        self.analysis_results['description_analysis'] = analysis_result
        return analysis_result

    def analyze_keywords(self, descriptions: List[str]) -> Dict:
        """Count keyword frequencies in the descriptions."""
        # Rental-related keywords to match against the (Chinese) listing text.
        # A representative set of common Traditional Chinese listing terms,
        # with English glosses; extend as needed.
        keywords = [
            '近捷運',    # near MRT
            '近車站',    # near train station
            '電梯',      # elevator
            '陽台',      # balcony
            '停車位',    # parking space
            '管理費',    # management fee
            '採光',      # natural light
            '電視',      # TV
            '安靜',      # quiet
            '便利',      # convenient
            '生活機能',  # living amenities
            '學區',      # school district
            '全新',      # brand new
            '裝潢',      # renovated
            '家具',      # furnished
            '家電',      # appliances
            '冷氣',      # air conditioning
            '洗衣機'     # washing machine
        ]

        keyword_counts = {keyword: 0 for keyword in keywords}

        for desc in descriptions:
            for keyword in keywords:
                if keyword in desc:
                    keyword_counts[keyword] += 1

        # Sort and keep the top 10
        sorted_keywords = dict(sorted(keyword_counts.items(),
                                      key=lambda x: x[1], reverse=True)[:10])

        return sorted_keywords

    def correlation_analysis(self) -> Dict:
        """Correlation analysis of the numeric columns."""
        if self.df is None or len(self.df) == 0:
            return {}

        numeric_columns = ['price', 'area', 'price_per_ping']
        available_columns = [col for col in numeric_columns
                             if col in self.df.columns and not self.df[col].isna().all()]

        if len(available_columns) < 2:
            return {}

        correlation_matrix = self.df[available_columns].corr()

        correlation_dict = {}
        for i, col1 in enumerate(available_columns):
            for j, col2 in enumerate(available_columns):
                if i < j:  # skip the diagonal and mirrored pairs
                    correlation_dict[f"{col1}_vs_{col2}"] = round(
                        correlation_matrix.loc[col1, col2], 3)

        self.analysis_results['correlation'] = correlation_dict
        return correlation_dict
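    # The sentiment pipeline loaded in setup_huggingface_models is never applied
    # anywhere above. A minimal sketch of wiring it to the descriptions, assuming
    # the classifier returns {'label', 'score'} dicts per input; the method name
    # and label handling are illustrative additions, not part of the original design:
    def analyze_sentiment(self, max_items: int = 100) -> Dict:
        """Sketch: tally sentiment labels over listing descriptions."""
        if self.sentiment_analyzer is None or self.df is None \
                or 'raw_info' not in self.df.columns:
            return {}

        texts = self.df['raw_info'].dropna().astype(str).tolist()[:max_items]
        label_counts: Dict[str, int] = {}
        for scores in self.sentiment_analyzer(texts, truncation=True):
            # with top_k=None each result is a list of {'label', 'score'} dicts;
            # keep only the highest-scoring label per description
            top = max(scores, key=lambda s: s['score'])
            label_counts[top['label']] = label_counts.get(top['label'], 0) + 1

        self.analysis_results['sentiment'] = label_counts
        return label_counts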
    def generate_insights(self) -> List[str]:
        """Generate analysis insights."""
        insights = []

        if 'basic_stats' in self.analysis_results:
            stats = self.analysis_results['basic_stats']
            insights.append(f"Found {stats['total_properties']} rental listings matching the criteria")
            insights.append(f"Average rent is NT${stats['price_stats']['mean']:,}")
            insights.append(f"Median rent is NT${stats['price_stats']['median']:,}")

            if stats['price_stats']['mean'] > stats['price_stats']['median']:
                insights.append("The rent distribution is right-skewed; some high-priced listings pull the average up")

        if 'price_distribution' in self.analysis_results:
            dist = self.analysis_results['price_distribution']
            max_range_idx = dist['percentages'].index(max(dist['percentages']))
            most_common_range = dist['ranges'][max_range_idx]
            percentage = dist['percentages'][max_range_idx]
            insights.append(f"The most common rent range is {most_common_range}, at {percentage}% of listings")

        if 'area_analysis' in self.analysis_results:
            area = self.analysis_results['area_analysis']
            if area:
                max_area_idx = area['percentages'].index(max(area['percentages']))
                most_common_area = area['ranges'][max_area_idx]
                insights.append(f"The most common area range is {most_common_area}")

        return insights

    def run_full_analysis(self) -> Dict:
        """Run the full analysis pipeline."""
        print("Running full analysis...")

        # Basic statistics
        basic_stats = self.basic_statistics()
        print("✓ Basic statistics done")

        # Price distribution
        price_dist = self.price_distribution_analysis()
        print("✓ Price distribution analysis done")

        # Area analysis
        area_analysis = self.area_analysis()
        print("✓ Area analysis done")

        # Description text analysis
        desc_analysis = self.analyze_descriptions()
        print("✓ Description analysis done")

        # Correlation analysis
        correlation = self.correlation_analysis()
        print("✓ Correlation analysis done")

        # Insights
        insights = self.generate_insights()
        print("✓ Insights generated")

        self.analysis_results['insights'] = insights
        return self.analysis_results

    def save_analysis_results(self, filename: str = "analysis_results.json"):
        """Save analysis results to JSON."""
        try:
            os.makedirs("output", exist_ok=True)  # make sure the output dir exists
            with open(f"output/{filename}", 'w', encoding='utf-8') as f:
                json.dump(self.analysis_results, f, ensure_ascii=False, indent=2)
            print(f"Analysis results saved to output/{filename}")
        except Exception as e:
            print(f"Error while saving analysis results: {e}")

    def print_summary(self):
        """Print an analysis summary."""
        if not self.analysis_results:
            print("No analysis results to display")
            return

        print("\n" + "=" * 50)
        print("Rental Market Analysis Report")
        print("=" * 50)

        if 'insights' in self.analysis_results:
            print("\nKey insights:")
            for i, insight in enumerate(self.analysis_results['insights'], 1):
                print(f"{i}. {insight}")

        if 'basic_stats' in self.analysis_results:
            stats = self.analysis_results['basic_stats']
            print("\nPrice statistics (NT$):")
            print(f"  Mean:   {stats['price_stats']['mean']:,}")
            print(f"  Median: {stats['price_stats']['median']:,}")
            print(f"  Min:    {stats['price_stats']['min']:,}")
            print(f"  Max:    {stats['price_stats']['max']:,}")
            print(f"  Std:    {stats['price_stats']['std']:,}")

        if 'price_distribution' in self.analysis_results:
            print("\nPrice distribution:")
            dist = self.analysis_results['price_distribution']
            for range_name, count, percentage in zip(dist['ranges'],
                                                     dist['counts'],
                                                     dist['percentages']):
                print(f"  {range_name}: {count} listings ({percentage}%)")

        print("\n" + "=" * 50)


if __name__ == "__main__":
    # Try out the analyzer
    analyzer = RentalDataAnalyzer()

    # Load data
    df = analyzer.load_data("output/rental_data.csv")

    if df is not None:
        # Clean data
        analyzer.clean_data()

        # Run the full analysis
        results = analyzer.run_full_analysis()

        # Save the results
        analyzer.save_analysis_results()

        # Show the summary
        analyzer.print_summary()
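# load_data also accepts JSON exports, and the Hugging Face models are opt-in.
# A sketch of that variant (the JSON file name below is illustrative):
#
#   analyzer = RentalDataAnalyzer("output/rental_data.json")
#   if analyzer.load_data() is not None:
#       analyzer.clean_data()
#       analyzer.setup_huggingface_models()  # optional, enables sentiment analysis
#       analyzer.run_full_analysis()
#       analyzer.print_summary()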