Spaces:
Paused
Paused
# Generated by Copilot
import json
from typing import Dict, List

import numpy as np
import pandas as pd
from datasets import Dataset
from transformers import pipeline
class RentalAnalyzer:
    """Rental-listing data analyzer, built to run on Hugging Face Spaces.

    Wraps a pandas DataFrame of rental listings and provides data
    cleaning, descriptive statistics, keyword counting, correlation
    analysis and (optionally) transformer-based sentiment analysis.
    """

    def __init__(self, df: pd.DataFrame, use_hf_models: bool = True):
        """Initialize the analyzer.

        Args:
            df: Rental-listing DataFrame. Later steps expect at least
                'title', 'address', 'price', 'area' and 'raw_info'
                columns — verify against the caller.
            use_hf_models: Whether to load a Hugging Face sentiment model.
        """
        self.df = df.copy()
        self.use_hf_models = use_hf_models
        self.analysis_results = {}

        # Sentiment pipeline; stays None when models are disabled or
        # every load attempt fails.
        self.sentiment_analyzer = None
        if use_hf_models:
            try:
                # Use a small sentiment model to avoid memory problems.
                # NOTE: the original passed return_all_scores=False, which
                # is deprecated in recent transformers releases; the
                # default (single best label) is identical, so the kwarg
                # is simply omitted.
                self.sentiment_analyzer = pipeline(
                    "sentiment-analysis",
                    model="cardiffnlp/twitter-roberta-base-sentiment-latest",
                )
            except Exception as e:
                print(f"Warning: Could not load Hugging Face model: {e}")
                # Fall back to the library's default sentiment model.
                try:
                    self.sentiment_analyzer = pipeline("sentiment-analysis")
                except Exception as e2:
                    print(f"Warning: Could not load any sentiment model: {e2}")
                    self.use_hf_models = False
| def clean_data(self) -> pd.DataFrame: | |
| """�M�~���""" | |
| # �������Ƹ�� | |
| original_count = len(self.df) | |
| self.df = self.df.drop_duplicates(subset=['title', 'address', 'price']) | |
| # �B�z������� | |
| self.df['price'] = pd.to_numeric(self.df['price'], errors='coerce') | |
| self.df = self.df[self.df['price'] > 0] | |
| # �B�z�W�Ƹ�� | |
| self.df['area'] = pd.to_numeric(self.df['area'], errors='coerce') | |
| self.df = self.df[self.df['area'] > 0] | |
| # �p��C�W���� | |
| self.df['price_per_ping'] = self.df['price'] / self.df['area'] | |
| # �������`�� | |
| self.df = self.remove_outliers(self.df, 'price') | |
| # �K�[������� | |
| self.add_categorical_columns() | |
| return self.df | |
| def remove_outliers(self, df: pd.DataFrame, column: str) -> pd.DataFrame: | |
| """�������`�ȡ]�ϥ�IQR��k�^""" | |
| Q1 = df[column].quantile(0.25) | |
| Q3 = df[column].quantile(0.75) | |
| IQR = Q3 - Q1 | |
| lower_bound = Q1 - 1.5 * IQR | |
| upper_bound = Q3 + 1.5 * IQR | |
| return df[(df[column] >= lower_bound) & (df[column] <= upper_bound)] | |
| def add_categorical_columns(self): | |
| """�K�[�������""" | |
| # �����϶� | |
| self.df['price_range'] = pd.cut( | |
| self.df['price'], | |
| bins=[0, 20000, 25000, 30000, 35000, float('inf')], | |
| labels=['<20K', '20-25K', '25-30K', '30-35K', '>35K'] | |
| ) | |
| # �W�ư϶� | |
| self.df['area_range'] = pd.cut( | |
| self.df['area'], | |
| bins=[0, 25, 30, 35, 40, float('inf')], | |
| labels=['<25�W', '25-30�W', '30-35�W', '35-40�W', '>40�W'] | |
| ) | |
| def basic_statistics(self) -> Dict: | |
| """�έp���R""" | |
| stats = { | |
| 'total_properties': len(self.df), | |
| 'price_stats': { | |
| 'mean': round(self.df['price'].mean(), 2), | |
| 'median': round(self.df['price'].median(), 2), | |
| 'std': round(self.df['price'].std(), 2), | |
| 'min': int(self.df['price'].min()), | |
| 'max': int(self.df['price'].max()), | |
| 'q25': round(self.df['price'].quantile(0.25), 2), | |
| 'q75': round(self.df['price'].quantile(0.75), 2) | |
| }, | |
| 'area_stats': { | |
| 'mean': round(self.df['area'].mean(), 2), | |
| 'median': round(self.df['area'].median(), 2), | |
| 'min': round(self.df['area'].min(), 1), | |
| 'max': round(self.df['area'].max(), 1) | |
| }, | |
| 'price_per_ping_stats': { | |
| 'mean': round(self.df['price_per_ping'].mean(), 2), | |
| 'median': round(self.df['price_per_ping'].median(), 2), | |
| 'min': round(self.df['price_per_ping'].min(), 2), | |
| 'max': round(self.df['price_per_ping'].max(), 2) | |
| } | |
| } | |
| return stats | |
| def price_distribution_analysis(self) -> Dict: | |
| """�����������R""" | |
| distribution = self.df['price_range'].value_counts().sort_index() | |
| return distribution.to_dict() | |
| def area_distribution_analysis(self) -> Dict: | |
| """�W�Ƥ������R""" | |
| distribution = self.df['area_range'].value_counts().sort_index() | |
| return distribution.to_dict() | |
| def keywords_analysis(self) -> Dict: | |
| """����r���R""" | |
| # �w�q�Ыά�������r | |
| keywords = [ | |
| '�B', '��', '�q��', '���x', '������', '�z�O', | |
| '�ĥ�', '�q��', '�w�R', '�K�Q', '�ͬ�����', '�ǰ�', | |
| '���s', '���C', '�a��', '�a�q', '�N��', '�~���', | |
| '���N�]', '�R�e', '��G', '��l�W', '���s', '�����P' | |
| ] | |
| keyword_counts = {keyword: 0 for keyword in keywords} | |
| descriptions = self.df['raw_info'].dropna().tolist() | |
| for desc in descriptions: | |
| for keyword in keywords: | |
| if keyword in str(desc): | |
| keyword_counts[keyword] += 1 | |
| # �ƧǨè��e10�� | |
| sorted_keywords = dict( | |
| sorted(keyword_counts.items(), key=lambda x: x[1], reverse=True)[:10] | |
| ) | |
| return sorted_keywords | |
| def huggingface_analysis(self) -> Dict: | |
| """�ϥ�Hugging Face�ҫ��i����R""" | |
| if not self.use_hf_models or self.sentiment_analyzer is None: | |
| return {} | |
| try: | |
| descriptions = self.df['raw_info'].dropna().tolist()[:10] # ���e10���קK�W�� | |
| if not descriptions: | |
| return {} | |
| # ���P���R | |
| sentiments = [] | |
| for desc in descriptions: | |
| try: | |
| result = self.sentiment_analyzer(desc[:100]) # ������� | |
| sentiments.append(result[0]['label'] if result else 'NEUTRAL') | |
| except: | |
| sentiments.append('NEUTRAL') | |
| # �έp���P���� | |
| sentiment_counts = {} | |
| for sentiment in sentiments: | |
| sentiment_counts[sentiment] = sentiment_counts.get(sentiment, 0) + 1 | |
| # �Ы�Dataset | |
| hf_dataset = Dataset.from_dict({ | |
| 'text': descriptions, | |
| 'price': self.df['price'].head(len(descriptions)).tolist(), | |
| 'area': self.df['area'].head(len(descriptions)).tolist(), | |
| 'sentiment': sentiments | |
| }) | |
| return { | |
| 'sentiment_distribution': sentiment_counts, | |
| 'dataset_size': len(hf_dataset), | |
| 'sample_analysis': True | |
| } | |
| except Exception as e: | |
| print(f"Hugging Face analysis error: {e}") | |
| return {} | |
| def correlation_analysis(self) -> Dict: | |
| """�����ʤ��R""" | |
| numeric_columns = ['price', 'area', 'price_per_ping'] | |
| available_columns = [ | |
| col for col in numeric_columns | |
| if col in self.df.columns and not self.df[col].isna().all() | |
| ] | |
| if len(available_columns) < 2: | |
| return {} | |
| correlation_matrix = self.df[available_columns].corr() | |
| correlations = {} | |
| for i, col1 in enumerate(available_columns): | |
| for j, col2 in enumerate(available_columns): | |
| if i < j: # �קK���� | |
| correlations[f"{col1}_vs_{col2}"] = round( | |
| correlation_matrix.loc[col1, col2], 3 | |
| ) | |
| return correlations | |
| def generate_insights(self) -> List[str]: | |
| """�ͦ����R�}��""" | |
| insights = [] | |
| # �έp�}�� | |
| if 'basic_stats' in self.analysis_results: | |
| stats = self.analysis_results['basic_stats'] | |
| insights.append(f"�@��� {stats['total_properties']} ���ŦX�����Ϊ���") | |
| insights.append(f"���������� {stats['price_stats']['mean']:,} ��") | |
| insights.append(f"��������Ƭ� {stats['price_stats']['median']:,} ��") | |
| if stats['price_stats']['mean'] > stats['price_stats']['median']: | |
| insights.append("���������V�k���סA�s�b�����������������") | |
| # �������R�}�� | |
| if 'price_distribution' in self.analysis_results: | |
| dist = self.analysis_results['price_distribution'] | |
| if dist: | |
| most_common_range = max(dist, key=dist.get) | |
| count = dist[most_common_range] | |
| percentage = (count / self.analysis_results['basic_stats']['total_properties']) * 100 | |
| insights.append(f"�̱`���������϶��O {most_common_range}�A�� {percentage:.1f}%") | |
| # Hugging Face���R�}�� | |
| if 'hf_analysis' in self.analysis_results and self.analysis_results['hf_analysis']: | |
| hf_results = self.analysis_results['hf_analysis'] | |
| if 'sentiment_distribution' in hf_results: | |
| insights.append("�w�ϥ�Hugging Face�ҫ��i�污�P���R") | |
| return insights | |
| def run_analysis(self) -> Dict: | |
| """���槹����R""" | |
| # �M�~��� | |
| self.clean_data() | |
| # �έp | |
| self.analysis_results['basic_stats'] = self.basic_statistics() | |
| # �������R | |
| self.analysis_results['price_distribution'] = self.price_distribution_analysis() | |
| self.analysis_results['area_distribution'] = self.area_distribution_analysis() | |
| # ����r���R | |
| self.analysis_results['keywords_analysis'] = self.keywords_analysis() | |
| # �����ʤ��R | |
| self.analysis_results['correlation'] = self.correlation_analysis() | |
| # Hugging Face���R | |
| if self.use_hf_models: | |
| self.analysis_results['hf_analysis'] = self.huggingface_analysis() | |
| # �ͦ��}�� | |
| self.analysis_results['insights'] = self.generate_insights() | |
| return self.analysis_results |