# Rental-listing analysis module (originally Copilot-generated; the Big5
# comments were mojibake and have been rewritten in English).
# NOTE(review): several runtime strings below (keyword list, bucket labels,
# insight messages) are also mojibake from a lost Big5 encoding. They are
# preserved byte-for-byte here; re-encode the source file to recover them.
import pandas as pd
import numpy as np
from typing import Dict, List
import json

# transformers / datasets are optional: without them the analyzer still runs
# all pandas-based statistics, it just skips the sentiment analysis. The
# original module hard-imported them and crashed even when use_hf_models=False.
try:
    from transformers import pipeline
    from datasets import Dataset
    _HF_AVAILABLE = True
except ImportError:
    pipeline = None
    Dataset = None
    _HF_AVAILABLE = False


class RentalAnalyzer:
    """Rental-listing data analyzer (Hugging Face Spaces edition)."""

    def __init__(self, df: pd.DataFrame, use_hf_models: bool = True):
        """
        Initialize the analyzer.

        Args:
            df: DataFrame of rental listings. Expected columns:
                'title', 'address', 'price', 'area', 'raw_info'.
            use_hf_models: whether to load a Hugging Face sentiment model.
                Silently downgraded to False when transformers/datasets are
                not installed or no model can be loaded.
        """
        self.df = df.copy()
        self.use_hf_models = use_hf_models and _HF_AVAILABLE
        self.analysis_results: Dict = {}

        # Sentiment pipeline; stays None when HF models are disabled/unavailable.
        self.sentiment_analyzer = None
        if self.use_hf_models:
            try:
                # Use a compact English sentiment model to limit memory use.
                # return_all_scores=False was dropped: it is the default and
                # the kwarg is deprecated in recent transformers releases.
                self.sentiment_analyzer = pipeline(
                    "sentiment-analysis",
                    model="cardiffnlp/twitter-roberta-base-sentiment-latest",
                )
            except Exception as e:
                print(f"Warning: Could not load Hugging Face model: {e}")
                # Fall back to the pipeline's default sentiment model.
                try:
                    self.sentiment_analyzer = pipeline("sentiment-analysis")
                except Exception as e2:
                    print(f"Warning: Could not load any sentiment model: {e2}")
                    self.use_hf_models = False

    def clean_data(self) -> pd.DataFrame:
        """Clean self.df in place and return the cleaned DataFrame."""
        # Drop duplicate listings (same title + address + price).
        self.df = self.df.drop_duplicates(subset=['title', 'address', 'price'])

        # Coerce price to numeric; keep strictly positive values
        # (this also drops rows whose price failed to parse).
        self.df['price'] = pd.to_numeric(self.df['price'], errors='coerce')
        self.df = self.df[self.df['price'] > 0]

        # Same for area (in ping).
        self.df['area'] = pd.to_numeric(self.df['area'], errors='coerce')
        self.df = self.df[self.df['area'] > 0]

        # Derived metric: price per ping.
        self.df['price_per_ping'] = self.df['price'] / self.df['area']

        # Remove price outliers (IQR rule).
        self.df = self.remove_outliers(self.df, 'price')

        # Add categorical bucket columns for distribution analysis.
        self.add_categorical_columns()

        return self.df

    def remove_outliers(self, df: pd.DataFrame, column: str) -> pd.DataFrame:
        """Return df with rows outside the 1.5*IQR fence on `column` removed."""
        Q1 = df[column].quantile(0.25)
        Q3 = df[column].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        return df[(df[column] >= lower_bound) & (df[column] <= upper_bound)]

    def add_categorical_columns(self):
        """Add price_range / area_range categorical bucket columns."""
        # Price buckets (monthly rent).
        self.df['price_range'] = pd.cut(
            self.df['price'],
            bins=[0, 20000, 25000, 30000, 35000, float('inf')],
            labels=['<20K', '20-25K', '25-30K', '30-35K', '>35K']
        )
        # Area buckets (ping). NOTE(review): labels are mojibake, kept as-is.
        self.df['area_range'] = pd.cut(
            self.df['area'],
            bins=[0, 25, 30, 35, 40, float('inf')],
            labels=['<25�W', '25-30�W', '30-35�W', '35-40�W', '>40�W']
        )

    def basic_statistics(self) -> Dict:
        """Return basic descriptive statistics as a JSON-serializable dict."""
        # float()/int() unwrap numpy scalars so the result survives json.dumps.
        stats = {
            'total_properties': len(self.df),
            'price_stats': {
                'mean': float(round(self.df['price'].mean(), 2)),
                'median': float(round(self.df['price'].median(), 2)),
                'std': float(round(self.df['price'].std(), 2)),
                'min': int(self.df['price'].min()),
                'max': int(self.df['price'].max()),
                'q25': float(round(self.df['price'].quantile(0.25), 2)),
                'q75': float(round(self.df['price'].quantile(0.75), 2))
            },
            'area_stats': {
                'mean': float(round(self.df['area'].mean(), 2)),
                'median': float(round(self.df['area'].median(), 2)),
                'min': float(round(self.df['area'].min(), 1)),
                'max': float(round(self.df['area'].max(), 1))
            },
            'price_per_ping_stats': {
                'mean': float(round(self.df['price_per_ping'].mean(), 2)),
                'median': float(round(self.df['price_per_ping'].median(), 2)),
                'min': float(round(self.df['price_per_ping'].min(), 2)),
                'max': float(round(self.df['price_per_ping'].max(), 2))
            }
        }
        return stats

    def price_distribution_analysis(self) -> Dict:
        """Count listings per price bucket, ordered by bucket."""
        distribution = self.df['price_range'].value_counts().sort_index()
        return distribution.to_dict()

    def area_distribution_analysis(self) -> Dict:
        """Count listings per area bucket, ordered by bucket."""
        distribution = self.df['area_range'].value_counts().sort_index()
        return distribution.to_dict()

    def keywords_analysis(self) -> Dict:
        """Count rental-related keywords in raw_info; return the top 10."""
        # Housing-related keywords.
        # NOTE(review): entries are mojibake Big5 and one ('�q��') appears
        # twice; the dict comprehension below collapses the duplicate, which
        # matches the original behavior.
        keywords = [
            '�񱶹B', '�񨮯�', '�q��', '���x', '������',
            '�޲z�O', '�ĥ�', '�q��', '�w�R', '�K�Q',
            '�ͬ�����', '�ǰ�', '���s', '���C', '�a��',
            '�a�q', '�N��', '�~���', '���N�]', '�R�e',
            '��G', '��l�W', '���s', '�����P'
        ]
        keyword_counts = {keyword: 0 for keyword in keywords}
        descriptions = self.df['raw_info'].dropna().tolist()
        for desc in descriptions:
            for keyword in keywords:
                if keyword in str(desc):
                    keyword_counts[keyword] += 1
        # Sort descending by count and keep the top 10.
        sorted_keywords = dict(
            sorted(keyword_counts.items(), key=lambda x: x[1], reverse=True)[:10]
        )
        return sorted_keywords

    def huggingface_analysis(self) -> Dict:
        """Run sentiment analysis over a small sample of listing descriptions."""
        if not self.use_hf_models or self.sentiment_analyzer is None:
            return {}

        try:
            # Analyze at most 10 rows to stay within Spaces resource limits.
            # Select the rows ONCE so text, price and area stay aligned
            # (the original paired dropna() texts with head() prices, which
            # mismatched rows whenever raw_info had gaps).
            valid_rows = self.df[self.df['raw_info'].notna()].head(10)
            descriptions = valid_rows['raw_info'].tolist()
            if not descriptions:
                return {}

            # Per-description sentiment labels; failures degrade to NEUTRAL.
            sentiments = []
            for desc in descriptions:
                try:
                    # Truncate to keep inference cheap and within model limits.
                    result = self.sentiment_analyzer(str(desc)[:100])
                    sentiments.append(result[0]['label'] if result else 'NEUTRAL')
                except Exception:
                    # Narrowed from a bare except: never swallow SystemExit etc.
                    sentiments.append('NEUTRAL')

            # Tally sentiment labels.
            sentiment_counts: Dict[str, int] = {}
            for sentiment in sentiments:
                sentiment_counts[sentiment] = sentiment_counts.get(sentiment, 0) + 1

            # Build an HF Dataset from the aligned sample.
            hf_dataset = Dataset.from_dict({
                'text': descriptions,
                'price': valid_rows['price'].tolist(),
                'area': valid_rows['area'].tolist(),
                'sentiment': sentiments
            })

            return {
                'sentiment_distribution': sentiment_counts,
                'dataset_size': len(hf_dataset),
                'sample_analysis': True
            }
        except Exception as e:
            print(f"Hugging Face analysis error: {e}")
            return {}

    def correlation_analysis(self) -> Dict:
        """Pairwise Pearson correlations between the numeric columns."""
        numeric_columns = ['price', 'area', 'price_per_ping']
        available_columns = [
            col for col in numeric_columns
            if col in self.df.columns and not self.df[col].isna().all()
        ]

        if len(available_columns) < 2:
            return {}

        correlation_matrix = self.df[available_columns].corr()
        correlations = {}
        for i, col1 in enumerate(available_columns):
            for j, col2 in enumerate(available_columns):
                if i < j:  # upper triangle only; skip duplicates and self-pairs
                    correlations[f"{col1}_vs_{col2}"] = float(round(
                        correlation_matrix.loc[col1, col2], 3
                    ))
        return correlations

    def generate_insights(self) -> List[str]:
        """Build human-readable insight strings from analysis_results."""
        # NOTE(review): the appended messages are mojibake Big5, kept verbatim.
        insights = []

        # Basic statistics insights.
        if 'basic_stats' in self.analysis_results:
            stats = self.analysis_results['basic_stats']
            insights.append(f"�@��� {stats['total_properties']} ���ŦX���󪺯��Ϊ���")
            insights.append(f"���������� {stats['price_stats']['mean']:,} ��")
            insights.append(f"��������Ƭ� {stats['price_stats']['median']:,} ��")
            if stats['price_stats']['mean'] > stats['price_stats']['median']:
                # Mean above median suggests a right-skewed price distribution.
                insights.append("���������V�k���סA�s�b����������԰�������")

        # Price-distribution insight: the most common bucket and its share.
        if 'price_distribution' in self.analysis_results:
            dist = self.analysis_results['price_distribution']
            if dist:
                most_common_range = max(dist, key=dist.get)
                count = dist[most_common_range]
                percentage = (count / self.analysis_results['basic_stats']['total_properties']) * 100
                insights.append(f"�̱`���������϶��O {most_common_range}�A�� {percentage:.1f}%")

        # Hugging Face analysis insight.
        if 'hf_analysis' in self.analysis_results and self.analysis_results['hf_analysis']:
            hf_results = self.analysis_results['hf_analysis']
            if 'sentiment_distribution' in hf_results:
                insights.append("�w�ϥ�Hugging Face�ҫ��i�污�P���R")

        return insights

    def run_analysis(self) -> Dict:
        """Run the full pipeline and return the accumulated results dict."""
        # Clean first: every later step relies on the derived columns.
        self.clean_data()

        # Basic statistics.
        self.analysis_results['basic_stats'] = self.basic_statistics()

        # Distribution analyses.
        self.analysis_results['price_distribution'] = self.price_distribution_analysis()
        self.analysis_results['area_distribution'] = self.area_distribution_analysis()

        # Keyword analysis.
        self.analysis_results['keywords_analysis'] = self.keywords_analysis()

        # Correlation analysis.
        self.analysis_results['correlation'] = self.correlation_analysis()

        # Optional Hugging Face sentiment analysis.
        if self.use_hf_models:
            self.analysis_results['hf_analysis'] = self.huggingface_analysis()

        # Insights last: they read the results gathered above.
        self.analysis_results['insights'] = self.generate_insights()

        return self.analysis_results