Spaces:
Paused
Paused
# Generated by Copilot
import json
import os
import re
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from datasets import Dataset
from transformers import pipeline, AutoTokenizer, AutoModel
class RentalDataAnalyzer:
    """Load, clean and summarise rental-listing data held in a DataFrame.

    The analyzer keeps the working frame in ``self.df`` and accumulates the
    output of every analysis step in ``self.analysis_results`` (a plain dict
    that ``save_analysis_results`` writes out as JSON).

    Columns read by the methods below: ``title``, ``address``, ``price``,
    ``area`` and, optionally, ``raw_info``.

    NOTE(review): the user-facing message strings and the keyword / label
    literals in this class look mis-encoded (replacement characters). They
    are reproduced unchanged here; restore them from the original,
    correctly-encoded source if it is available.
    """

    def __init__(self, data_path: Optional[str] = None):
        """Create an analyzer.

        Args:
            data_path: Optional path of the data file to load later with
                ``load_data``.
        """
        self.data_path = data_path
        self.df = None
        self.analysis_results = {}
        # Hugging Face pipelines; populated by setup_huggingface_models().
        self.sentiment_analyzer = None
        self.text_classifier = None

    @staticmethod
    def _to_native(value):
        """Return NumPy scalars as built-in Python numbers (JSON-safe)."""
        return value.item() if isinstance(value, np.generic) else value

    def load_data(self, data_path: Optional[str] = None) -> Optional[pd.DataFrame]:
        """Load a ``.json`` or ``.csv`` file into ``self.df``.

        Args:
            data_path: Path to load; when given it replaces ``self.data_path``.

        Returns:
            The loaded DataFrame, or ``None`` when loading failed (the error
            is printed rather than raised).
        """
        if data_path:
            self.data_path = data_path
        # Deliberately best-effort: any failure is reported and mapped to None
        # because callers test the return value instead of catching.
        try:
            if not self.data_path:
                raise ValueError("data_path is not set")
            # Compare the extension case-insensitively (".CSV" is accepted).
            lowered = self.data_path.lower()
            if lowered.endswith('.json'):
                with open(self.data_path, 'r', encoding='utf-8') as f:
                    self.df = pd.DataFrame(json.load(f))
            elif lowered.endswith('.csv'):
                self.df = pd.read_csv(self.data_path, encoding='utf-8-sig')
            else:
                raise ValueError("���䴩���ɮ榡")
            # The backslash is escaped explicitly; the printed text is the
            # same as before but no longer relies on an invalid escape.
            print(f"���\\���J {len(self.df)} �����")
            return self.df
        except Exception as e:
            print(f"���J��Ʈɵo�Ϳ��~: {e}")
            return None

    def clean_data(self) -> Optional[pd.DataFrame]:
        """Deduplicate, coerce numeric columns and drop price outliers.

        Steps: drop rows duplicated on (title, address, price); coerce
        ``price`` and ``area`` to numbers; keep only rows with ``price > 0``;
        add ``price_per_ping`` (price divided by area, NaN when area is not
        positive); remove price outliers with the IQR rule.

        Returns:
            The cleaned DataFrame (also stored in ``self.df``), or ``None``
            when no data has been loaded.
        """
        if self.df is None:
            print("�����J���")
            return None
        print("�}�l�M�~���...")

        original_count = len(self.df)
        self.df = self.df.drop_duplicates(subset=['title', 'address', 'price'])
        print(f"���� {original_count - len(self.df)} �����Ƹ��")

        self.df['price'] = pd.to_numeric(self.df['price'], errors='coerce')
        # .copy() gives an independent frame so the column assignments below
        # do not write into a slice of the previous one.
        self.df = self.df[self.df['price'] > 0].copy()

        self.df['area'] = pd.to_numeric(self.df['area'], errors='coerce')

        # Vectorised; unlike a row-wise apply this also works when the
        # filters above left zero rows.
        area = self.df['area']
        self.df['price_per_ping'] = (self.df['price'] / area).where(area > 0)

        self.df = self.remove_outliers(self.df, 'price')
        print(f"�M�~��Ѿl {len(self.df)} �����ĸ��")
        return self.df

    def remove_outliers(self, df: pd.DataFrame, column: str) -> pd.DataFrame:
        """Return the rows of ``df`` whose ``column`` lies within 1.5 * IQR.

        Args:
            df: Frame to filter (not modified).
            column: Name of the numeric column to test.

        Returns:
            A new DataFrame holding only rows with
            ``Q1 - 1.5*IQR <= value <= Q3 + 1.5*IQR``; rows whose value is
            NaN are dropped as well.
        """
        values = df[column]
        q1 = values.quantile(0.25)
        q3 = values.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        outliers_count = int(((values < lower_bound) | (values > upper_bound)).sum())
        print(f"���� {outliers_count} �� {column} ���`��")
        # copy() so callers can add columns to the result safely.
        return df[values.between(lower_bound, upper_bound)].copy()

    def basic_statistics(self) -> Dict:
        """Compute summary statistics for price, area and price per area.

        Returns:
            A dict with ``total_properties``, ``price_stats``, ``area_stats``
            and ``price_per_ping_stats``; the last two are empty dicts when
            the column is missing or entirely NaN. Returns ``{}`` when there
            is no data. All numbers are built-in Python types so the result
            can be passed to ``json.dump``.
        """
        if self.df is None or len(self.df) == 0:
            return {}

        native = self._to_native

        def usable(name):
            # A column is usable when it exists and has at least one value.
            if name in self.df.columns and not self.df[name].isna().all():
                return self.df[name]
            return None

        price = self.df['price']
        area = usable('area')
        per_ping = usable('price_per_ping')

        stats = {
            'total_properties': len(self.df),
            'price_stats': {
                'mean': native(round(price.mean(), 2)),
                'median': native(round(price.median(), 2)),
                'std': native(round(price.std(), 2)),
                'min': native(price.min()),
                'max': native(price.max()),
                'q25': native(round(price.quantile(0.25), 2)),
                'q75': native(round(price.quantile(0.75), 2)),
            },
            'area_stats': {} if area is None else {
                'mean': native(round(area.mean(), 2)),
                'median': native(round(area.median(), 2)),
                'min': native(area.min()),
                'max': native(area.max()),
            },
            'price_per_ping_stats': {} if per_ping is None else {
                'mean': native(round(per_ping.mean(), 2)),
                'median': native(round(per_ping.median(), 2)),
                'min': native(round(per_ping.min(), 2)),
                'max': native(round(per_ping.max(), 2)),
            },
        }
        self.analysis_results['basic_stats'] = stats
        return stats

    def price_distribution_analysis(self) -> Dict:
        """Bucket prices into fixed ranges and count the rows in each.

        Adds a ``price_range`` column to ``self.df``. Buckets are
        left-closed, e.g. a price of 15000 falls in ``'15-20K'``.

        Returns:
            A dict with parallel lists ``ranges``, ``counts`` and
            ``percentages`` (share of all rows, two decimals), or ``{}``
            when there is no data.
        """
        if self.df is None or len(self.df) == 0:
            return {}

        price_bins = [0, 15000, 20000, 25000, 30000, 40000, float('inf')]
        price_labels = ['<15K', '15-20K', '20-25K', '25-30K', '30-40K', '>40K']
        self.df['price_range'] = pd.cut(
            self.df['price'], bins=price_bins, labels=price_labels, right=False
        )

        distribution = self.df['price_range'].value_counts().sort_index()
        distribution_dict = {
            'ranges': distribution.index.tolist(),
            'counts': distribution.values.tolist(),
            'percentages': (distribution / len(self.df) * 100).round(2).tolist(),
        }
        self.analysis_results['price_distribution'] = distribution_dict
        return distribution_dict

    def area_analysis(self) -> Dict:
        """Bucket areas into fixed ranges and count the rows in each.

        Adds an ``area_range`` column to ``self.df``. Buckets are
        left-closed. Percentages are relative to all rows, including rows
        whose area is NaN.

        Returns:
            A dict with parallel lists ``ranges``, ``counts`` and
            ``percentages``, or ``{}`` when there is no data or no usable
            ``area`` column.
        """
        if self.df is None or len(self.df) == 0:
            return {}
        if 'area' not in self.df.columns or self.df['area'].isna().all():
            return {}

        area_bins = [0, 20, 30, 40, 50, float('inf')]
        area_labels = ['<20�W', '20-30�W', '30-40�W', '40-50�W', '>50�W']
        self.df['area_range'] = pd.cut(
            self.df['area'], bins=area_bins, labels=area_labels, right=False
        )

        area_distribution = self.df['area_range'].value_counts().sort_index()
        area_dict = {
            'ranges': area_distribution.index.tolist(),
            'counts': area_distribution.values.tolist(),
            'percentages': (area_distribution / len(self.df) * 100).round(2).tolist(),
        }
        self.analysis_results['area_analysis'] = area_dict
        return area_dict

    def setup_huggingface_models(self):
        """Create the Hugging Face pipeline stored in ``sentiment_analyzer``.

        Failures are printed, not raised; the attribute then keeps its
        previous value.

        NOTE(review): the model id ends in "-ws", which looks like a
        word-segmentation checkpoint rather than a sentiment classifier --
        confirm the intended model. ``return_all_scores`` is presumably
        deprecated in recent transformers releases (``top_k=None``) -- verify
        against the installed version.
        """
        try:
            print("���JHugging Face�ҫ�...")
            self.sentiment_analyzer = pipeline(
                "sentiment-analysis",
                model="ckiplab/bert-base-chinese-ws",
                return_all_scores=True
            )
            print("Hugging Face�ҫ����J����")
        except Exception as e:
            print(f"���JHugging Face�ҫ��ɵo�Ϳ��~: {e}")

    def analyze_descriptions(self) -> Dict:
        """Run keyword analysis over the non-null ``raw_info`` texts.

        Returns:
            A dict with ``keywords_frequency`` and ``total_descriptions``,
            or ``{}`` when there is no data, no ``raw_info`` column, or no
            non-null description.
        """
        if self.df is None or 'raw_info' not in self.df.columns:
            return {}
        # astype(str) keeps the substring test valid for non-string cells.
        descriptions = self.df['raw_info'].dropna().astype(str).tolist()
        if not descriptions:
            return {}

        analysis_result = {
            'keywords_frequency': self.analyze_keywords(descriptions),
            'total_descriptions': len(descriptions),
        }
        self.analysis_results['description_analysis'] = analysis_result
        return analysis_result

    def analyze_keywords(self, descriptions: List[str]) -> Dict:
        """Count how many descriptions contain each predefined keyword.

        Args:
            descriptions: Description texts to scan.

        Returns:
            The ten most frequent keywords mapped to the number of
            descriptions containing them, in descending order (ties keep the
            order of the keyword list).
        """
        keywords = [
            '�B', '��', '�q��', '���x', '������', '�z�O',
            '�ĥ�', '�q��', '�w�R', '�K�Q', '�ͬ�����', '�ǰ�',
            '���s', '���C', '�a��', '�a�q', '�N��', '�~���'
        ]
        # dict.fromkeys de-duplicates while keeping order, so a keyword that
        # is listed twice is counted once per description, not twice.
        keyword_counts = {
            keyword: sum(1 for desc in descriptions if keyword in desc)
            for keyword in dict.fromkeys(keywords)
        }
        ranked = sorted(keyword_counts.items(), key=lambda item: item[1], reverse=True)
        return dict(ranked[:10])

    def correlation_analysis(self) -> Dict:
        """Compute pairwise correlations of price, area and price_per_ping.

        Only columns that exist and hold at least one value are used.

        Returns:
            A dict keyed ``"<col1>_vs_<col2>"`` with the correlation rounded
            to three decimals, or ``{}`` when fewer than two columns are
            usable or there is no data.
        """
        if self.df is None or len(self.df) == 0:
            return {}

        numeric_columns = ['price', 'area', 'price_per_ping']
        available_columns = [
            col for col in numeric_columns
            if col in self.df.columns and not self.df[col].isna().all()
        ]
        if len(available_columns) < 2:
            return {}

        correlation_matrix = self.df[available_columns].corr()
        correlation_dict = {}
        for i, col1 in enumerate(available_columns):
            # Only pairs after col1, so each unordered pair appears once.
            for col2 in available_columns[i + 1:]:
                value = round(correlation_matrix.loc[col1, col2], 3)
                correlation_dict[f"{col1}_vs_{col2}"] = self._to_native(value)

        self.analysis_results['correlation'] = correlation_dict
        return correlation_dict

    def generate_insights(self) -> List[str]:
        """Build human-readable findings from the stored analysis results.

        Reads ``basic_stats``, ``price_distribution`` and ``area_analysis``
        from ``self.analysis_results`` when present.

        Returns:
            A list of message strings (possibly empty).
        """
        insights = []

        if 'basic_stats' in self.analysis_results:
            stats = self.analysis_results['basic_stats']
            insights.append(f"�@��� {stats['total_properties']} ���ŦX�����Ϊ���")
            insights.append(f"���������� {stats['price_stats']['mean']:,} ��")
            insights.append(f"��������Ƭ� {stats['price_stats']['median']:,} ��")
            # Mean above median is reported as a right-skewed distribution.
            if stats['price_stats']['mean'] > stats['price_stats']['median']:
                insights.append("���������V�k���סA�s�b�����������������")

        if 'price_distribution' in self.analysis_results:
            dist = self.analysis_results['price_distribution']
            max_range_idx = dist['percentages'].index(max(dist['percentages']))
            most_common_range = dist['ranges'][max_range_idx]
            percentage = dist['percentages'][max_range_idx]
            insights.append(f"�̱`���������϶��O {most_common_range}�A�� {percentage}%")

        if 'area_analysis' in self.analysis_results:
            area = self.analysis_results['area_analysis']
            if area:
                max_area_idx = area['percentages'].index(max(area['percentages']))
                most_common_area = area['ranges'][max_area_idx]
                insights.append(f"�̱`�����W�ư϶��O {most_common_area}")

        return insights

    def run_full_analysis(self) -> Dict:
        """Run every analysis step in order and collect the insights.

        Returns:
            ``self.analysis_results`` after all steps have stored their
            output, including the ``insights`` list.
        """
        print("�}�l���槹����R...")

        self.basic_statistics()
        print("? �έp���R����")

        self.price_distribution_analysis()
        print("? �����������R����")

        self.area_analysis()
        print("? �W�Ƥ��R����")

        self.analyze_descriptions()
        print("? �y�z��r���R����")

        self.correlation_analysis()
        print("? �����ʤ��R����")

        insights = self.generate_insights()
        print("? �}��ͦ�����")

        self.analysis_results['insights'] = insights
        return self.analysis_results

    def save_analysis_results(self, filename: str = "analysis_results.json"):
        """Write ``self.analysis_results`` as JSON to ``output/<filename>``.

        The ``output`` directory is created when missing. Errors are printed
        rather than raised.

        Args:
            filename: File name inside the ``output`` directory.
        """
        try:
            # Serialise before opening the file so a serialisation error
            # cannot leave a truncated file behind.
            payload = json.dumps(self.analysis_results, ensure_ascii=False, indent=2)
            os.makedirs("output", exist_ok=True)
            with open(f"output/{filename}", 'w', encoding='utf-8') as f:
                f.write(payload)
            print(f"���R���G�w�x�s�� output/{filename}")
        except Exception as e:
            print(f"�x�s���R���G�ɵo�Ϳ��~: {e}")

    def print_summary(self):
        """Print the insights, price statistics and price distribution."""
        if not self.analysis_results:
            print("�S�����R���G�i���")
            return

        separator = "\n" + "=" * 50
        print(separator)
        print("���������s�ϯ��Υ������R���i")
        print("=" * 50)

        if 'insights' in self.analysis_results:
            print("\n? ���n�}��:")
            for i, insight in enumerate(self.analysis_results['insights'], 1):
                print(f"{i}. {insight}")

        if 'basic_stats' in self.analysis_results:
            stats = self.analysis_results['basic_stats']
            print("\n? �����έp:")
            print(f" ��������: {stats['price_stats']['mean']:,} ��")
            print(f" �����: {stats['price_stats']['median']:,} ��")
            print(f" �̧C����: {stats['price_stats']['min']:,} ��")
            print(f" �̰�����: {stats['price_stats']['max']:,} ��")
            print(f" �зǮt: {stats['price_stats']['std']:,} ��")

        if 'price_distribution' in self.analysis_results:
            print("\n? ��������:")
            dist = self.analysis_results['price_distribution']
            for range_name, count, percentage in zip(
                dist['ranges'], dist['counts'], dist['percentages']
            ):
                print(f" {range_name}: {count} �� ({percentage}%)")

        print(separator)
if __name__ == "__main__":
    # Smoke-run the analyzer end to end against the default CSV export.
    analyzer = RentalDataAnalyzer()

    # load_data returns None on failure; the remaining steps are skipped then.
    if analyzer.load_data("output/rental_data.csv") is not None:
        analyzer.clean_data()            # dedupe, coerce numbers, drop outliers
        analyzer.run_full_analysis()     # fills analyzer.analysis_results
        analyzer.save_analysis_results() # writes output/analysis_results.json
        analyzer.print_summary()         # prints the report to stdout