# rental_analyzer.py
# Generated with Copilot
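"""Analyzer for 591 rental-listing data, built to run inside a Hugging Face
Space. Expects a DataFrame with at least the columns 'title', 'address',
'price' (monthly rent in NTD), 'area' (size in ping) and 'raw_info'
(free-text listing description)."""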
import pandas as pd
import numpy as np
from typing import Dict, List
import json
from transformers import pipeline
from datasets import Dataset


class RentalAnalyzer:
    """Rental data analyzer - Hugging Face Spaces version."""

    def __init__(self, df: pd.DataFrame, use_hf_models: bool = True):
        """
        Initialize the analyzer.

        Args:
            df: DataFrame of rental listings
            use_hf_models: whether to load Hugging Face models
        """
        self.df = df.copy()
        self.use_hf_models = use_hf_models
        self.analysis_results = {}

        # Initialize the Hugging Face model, guarded by try/except since
        # model downloads can fail in restricted environments
        self.sentiment_analyzer = None
        if use_hf_models:
            try:
                # Use a small English sentiment model to avoid loading problems;
                # cardiffnlp/twitter-roberta-base-sentiment-latest is a 3-class
                # model (negative / neutral / positive)
                self.sentiment_analyzer = pipeline(
                    "sentiment-analysis",
                    model="cardiffnlp/twitter-roberta-base-sentiment-latest"
                )
            except Exception as e:
                print(f"Warning: Could not load Hugging Face model: {e}")
                # Fall back to the pipeline's default sentiment model
                try:
                    self.sentiment_analyzer = pipeline("sentiment-analysis")
                except Exception as e2:
                    print(f"Warning: Could not load any sentiment model: {e2}")
                    self.use_hf_models = False

    def clean_data(self) -> pd.DataFrame:
        """Clean the raw data."""
        # Drop duplicate listings
        original_count = len(self.df)
        self.df = self.df.drop_duplicates(subset=['title', 'address', 'price'])

        # Coerce price to numeric and keep positive values only
        self.df['price'] = pd.to_numeric(self.df['price'], errors='coerce')
        self.df = self.df[self.df['price'] > 0]

        # Coerce area to numeric and keep positive values only
        self.df['area'] = pd.to_numeric(self.df['area'], errors='coerce')
        self.df = self.df[self.df['area'] > 0]

        # Rent per ping (the ping is a Taiwanese area unit of about 3.3 m^2)
        self.df['price_per_ping'] = self.df['price'] / self.df['area']

        # Drop price outliers
        self.df = self.remove_outliers(self.df, 'price')

        # Add categorical bucket columns
        self.add_categorical_columns()

        return self.df

    def remove_outliers(self, df: pd.DataFrame, column: str) -> pd.DataFrame:
        """Remove outliers using the IQR (Tukey's fences) method."""
        Q1 = df[column].quantile(0.25)
        Q3 = df[column].quantile(0.75)
        IQR = Q3 - Q1
        # Values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR] are treated as outliers
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        return df[(df[column] >= lower_bound) & (df[column] <= upper_bound)]

    def add_categorical_columns(self):
        """Add categorical bucket columns."""
        # Price buckets (NTD per month); pd.cut bins are right-closed by
        # default, so e.g. a price of exactly 20000 falls in '<20K'
        self.df['price_range'] = pd.cut(
            self.df['price'],
            bins=[0, 20000, 25000, 30000, 35000, float('inf')],
            labels=['<20K', '20-25K', '25-30K', '30-35K', '>35K']
        )
        # Area buckets (ping)
        self.df['area_range'] = pd.cut(
            self.df['area'],
            bins=[0, 25, 30, 35, 40, float('inf')],
            labels=['<25 ping', '25-30 ping', '30-35 ping', '35-40 ping', '>40 ping']
        )

    def basic_statistics(self) -> Dict:
        """Basic descriptive statistics."""
        stats = {
            'total_properties': len(self.df),
            'price_stats': {
                'mean': round(self.df['price'].mean(), 2),
                'median': round(self.df['price'].median(), 2),
                'std': round(self.df['price'].std(), 2),
                'min': int(self.df['price'].min()),
                'max': int(self.df['price'].max()),
                'q25': round(self.df['price'].quantile(0.25), 2),
                'q75': round(self.df['price'].quantile(0.75), 2)
            },
            'area_stats': {
                'mean': round(self.df['area'].mean(), 2),
                'median': round(self.df['area'].median(), 2),
                'min': round(self.df['area'].min(), 1),
                'max': round(self.df['area'].max(), 1)
            },
            'price_per_ping_stats': {
                'mean': round(self.df['price_per_ping'].mean(), 2),
                'median': round(self.df['price_per_ping'].median(), 2),
                'min': round(self.df['price_per_ping'].min(), 2),
                'max': round(self.df['price_per_ping'].max(), 2)
            }
        }
        return stats

    def price_distribution_analysis(self) -> Dict:
        """Distribution of listings across price buckets."""
        distribution = self.df['price_range'].value_counts().sort_index()
        return distribution.to_dict()

    def area_distribution_analysis(self) -> Dict:
        """Distribution of listings across area buckets."""
        distribution = self.df['area_range'].value_counts().sort_index()
        return distribution.to_dict()

    def keywords_analysis(self) -> Dict:
        """Count housing-related keywords in listing descriptions."""
        # Housing-related keywords to search for in the raw listing text.
        # NOTE: the original list was corrupted by encoding damage; the entries
        # below are best-effort reconstructions from common Taiwanese
        # rental-listing terms (near MRT, elevator, balcony, parking, etc.)
        keywords = [
            '近捷運', '近車站', '電梯', '陽台', '停車位', '管理費',
            '採光', '電視', '安靜', '便利', '生活機能', '學區',
            '全新', '整潔', '家具', '家電', '冷氣', '洗衣機',
            '可開伙', '寬敞', '明亮', '大坪數', '裝潢', '近公園'
        ]
        keyword_counts = {keyword: 0 for keyword in keywords}
        descriptions = self.df['raw_info'].dropna().tolist()
        for desc in descriptions:
            for keyword in keywords:
                if keyword in str(desc):
                    keyword_counts[keyword] += 1
        # Sort by count and keep the top 10
        sorted_keywords = dict(
            sorted(keyword_counts.items(), key=lambda x: x[1], reverse=True)[:10]
        )
        return sorted_keywords

    def huggingface_analysis(self) -> Dict:
        """Run extra analysis with Hugging Face models."""
        if not self.use_hf_models or self.sentiment_analyzer is None:
            return {}
        try:
            # Keep only rows that actually have a description, and take the
            # first 10 so the analysis stays fast; selecting whole rows here
            # keeps text, price and area aligned (the original head() calls on
            # self.df could drift out of sync when raw_info had missing values)
            subset = self.df.dropna(subset=['raw_info']).head(10)
            descriptions = subset['raw_info'].tolist()
            if not descriptions:
                return {}

            # Sentiment analysis, truncating long descriptions
            sentiments = []
            for desc in descriptions:
                try:
                    result = self.sentiment_analyzer(str(desc)[:100])
                    sentiments.append(result[0]['label'] if result else 'NEUTRAL')
                except Exception:
                    sentiments.append('NEUTRAL')

            # Tally the sentiment labels
            sentiment_counts = {}
            for sentiment in sentiments:
                sentiment_counts[sentiment] = sentiment_counts.get(sentiment, 0) + 1

            # Build a Hugging Face Dataset from the same rows
            hf_dataset = Dataset.from_dict({
                'text': descriptions,
                'price': subset['price'].tolist(),
                'area': subset['area'].tolist(),
                'sentiment': sentiments
            })
            return {
                'sentiment_distribution': sentiment_counts,
                'dataset_size': len(hf_dataset),
                'sample_analysis': True
            }
        except Exception as e:
            print(f"Hugging Face analysis error: {e}")
            return {}

    def correlation_analysis(self) -> Dict:
        """Pairwise correlations between the numeric columns."""
        numeric_columns = ['price', 'area', 'price_per_ping']
        available_columns = [
            col for col in numeric_columns
            if col in self.df.columns and not self.df[col].isna().all()
        ]
        if len(available_columns) < 2:
            return {}
        correlation_matrix = self.df[available_columns].corr()
        correlations = {}
        for i, col1 in enumerate(available_columns):
            for j, col2 in enumerate(available_columns):
                if i < j:  # upper triangle only, to avoid duplicate pairs
                    correlations[f"{col1}_vs_{col2}"] = round(
                        correlation_matrix.loc[col1, col2], 3
                    )
        return correlations

    def generate_insights(self) -> List[str]:
        """Generate human-readable insights from the analysis results."""
        insights = []

        # Basic statistics
        if 'basic_stats' in self.analysis_results:
            stats = self.analysis_results['basic_stats']
            insights.append(f"Found {stats['total_properties']} rental listings matching the criteria")
            insights.append(f"Average rent is {stats['price_stats']['mean']:,} NTD")
            insights.append(f"Median rent is {stats['price_stats']['median']:,} NTD")
            if stats['price_stats']['mean'] > stats['price_stats']['median']:
                insights.append("The rent distribution is right-skewed: a few high-priced listings pull the average up")

        # Price distribution
        if 'price_distribution' in self.analysis_results:
            dist = self.analysis_results['price_distribution']
            if dist:
                most_common_range = max(dist, key=dist.get)
                count = dist[most_common_range]
                percentage = (count / self.analysis_results['basic_stats']['total_properties']) * 100
                insights.append(f"The most common price range is {most_common_range}, covering {percentage:.1f}% of listings")

        # Hugging Face analysis
        if 'hf_analysis' in self.analysis_results and self.analysis_results['hf_analysis']:
            hf_results = self.analysis_results['hf_analysis']
            if 'sentiment_distribution' in hf_results:
                insights.append("Sentiment analysis of listing descriptions was run with a Hugging Face model")

        return insights

    def run_analysis(self) -> Dict:
        """Run the full analysis pipeline."""
        # Clean the data first
        self.clean_data()

        # Basic statistics
        self.analysis_results['basic_stats'] = self.basic_statistics()

        # Distribution analyses
        self.analysis_results['price_distribution'] = self.price_distribution_analysis()
        self.analysis_results['area_distribution'] = self.area_distribution_analysis()

        # Keyword analysis
        self.analysis_results['keywords_analysis'] = self.keywords_analysis()

        # Correlation analysis
        self.analysis_results['correlation'] = self.correlation_analysis()

        # Hugging Face analysis (optional)
        if self.use_hf_models:
            self.analysis_results['hf_analysis'] = self.huggingface_analysis()

        # Human-readable insights
        self.analysis_results['insights'] = self.generate_insights()

        return self.analysis_results
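

# --- Usage sketch (not part of the original module) ---
# A minimal, offline example of how the class is meant to be driven: build a
# small synthetic DataFrame with the expected columns and run the pipeline with
# the Hugging Face models disabled. All sample values below are made up.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    sample = pd.DataFrame({
        'title': [f'Listing {i}' for i in range(30)],
        'address': [f'No. {i}, Example Rd., Taipei' for i in range(30)],
        'price': rng.integers(15000, 40000, size=30),
        'area': rng.uniform(20, 45, size=30).round(1),
        'raw_info': ['近捷運 電梯 陽台'] * 30,
    })
    analyzer = RentalAnalyzer(sample, use_hf_models=False)
    results = analyzer.run_analysis()
    # default=str covers numpy scalars that json cannot serialize directly
    print(json.dumps(results, ensure_ascii=False, indent=2, default=str))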