# 591Analyzer / analyzer.py
# 54justin's picture
# Upload 7 files
# f205f47 verified
# Generated by Copilot
import json
import os
import re
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from datasets import Dataset
from transformers import pipeline, AutoTokenizer, AutoModel
class RentalDataAnalyzer:
    """Load, clean and summarise rental-listing data held in a pandas DataFrame.

    Typical flow: ``load_data`` -> ``clean_data`` -> ``run_full_analysis`` ->
    ``save_analysis_results`` / ``print_summary``.

    Attributes:
        data_path: Path of the ``.json`` or ``.csv`` file to load, or None.
        df: The working DataFrame; None until data has been loaded.
        analysis_results: Results of the analyses run so far, keyed by name
            (``basic_stats``, ``price_distribution``, ``area_analysis``,
            ``description_analysis``, ``correlation``, ``insights``).
        sentiment_analyzer: Set by ``setup_huggingface_models``; None before.
        text_classifier: Initialised to None and not assigned anywhere else
            in this class.

    NOTE(review): every non-ASCII runtime string in this class contains
    U+FFFD replacement characters, i.e. the text appears to have been decoded
    with the wrong codec before it reached this file. The literals are kept
    byte-for-byte here; they should be restored from the original source.
    Until then the keyword list in ``analyze_keywords`` most likely cannot
    match real listing text.
    """

    def __init__(self, data_path: Optional[str] = None):
        """Create an analyzer without loading anything.

        Args:
            data_path: Optional path of the data file used by ``load_data``.
        """
        self.data_path = data_path
        self.df = None
        self.analysis_results = {}
        # Placeholders for the Hugging Face text models (loaded on demand).
        self.sentiment_analyzer = None
        self.text_classifier = None

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _native(value):
        """Return ``value`` as a built-in Python number when it is a NumPy scalar.

        NumPy integer scalars are rejected by ``json``; ``.item()`` converts
        them (and NumPy floats) to plain ``int``/``float``.
        """
        return value.item() if hasattr(value, 'item') else value

    def _rounded(self, value, digits: int = 2):
        """Round ``value`` to ``digits`` places and return a built-in number."""
        return self._native(round(value, digits))

    def _has_values(self, column: str) -> bool:
        """Tell whether ``column`` exists in ``self.df`` and is not entirely NaN."""
        return column in self.df.columns and not self.df[column].isna().all()

    def _binned_distribution(self, column: str, target: str, bins: List, labels: List[str]) -> Dict:
        """Bucket ``column`` into ``bins`` and summarise the bucket sizes.

        The bucket label of each row is written to ``self.df[target]``.
        Buckets are left-closed / right-open (``right=False``). Percentages
        are relative to the total number of rows in ``self.df``.

        Returns:
            Dict with parallel lists ``ranges``, ``counts`` and ``percentages``.
        """
        self.df[target] = pd.cut(self.df[column], bins=bins, labels=labels, right=False)
        bucket_sizes = self.df[target].value_counts().sort_index()
        return {
            'ranges': bucket_sizes.index.tolist(),
            'counts': bucket_sizes.values.tolist(),
            'percentages': (bucket_sizes / len(self.df) * 100).round(2).tolist()
        }

    @staticmethod
    def _largest_bucket(distribution: Dict) -> tuple:
        """Return ``(label, percentage)`` of the first bucket with the highest share."""
        shares = distribution['percentages']
        position = shares.index(max(shares))
        return distribution['ranges'][position], shares[position]

    # ------------------------------------------------------------------
    # Loading and cleaning
    # ------------------------------------------------------------------
    def load_data(self, data_path: Optional[str] = None) -> Optional[pd.DataFrame]:
        """Load the data file into ``self.df``.

        Args:
            data_path: When given (and non-empty) replaces ``self.data_path``.

        Returns:
            The loaded DataFrame, or None when loading failed. Failures
            (unsupported extension, missing path, unreadable file, ...) are
            printed rather than raised.
        """
        if data_path:
            self.data_path = data_path
        try:
            if self.data_path.endswith('.json'):
                with open(self.data_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                self.df = pd.DataFrame(data)
            elif self.data_path.endswith('.csv'):
                self.df = pd.read_csv(self.data_path, encoding='utf-8-sig')
            else:
                raise ValueError("���䴩���ɮ׮榡")
            # The backslash is part of the (mis-decoded) message text; it is
            # escaped so that it is not parsed as an invalid escape sequence.
            print(f"���\\���J {len(self.df)} �����")
            return self.df
        except Exception as e:
            # Best-effort boundary: report the problem and signal it with None.
            print(f"���J��Ʈɵo�Ϳ��~: {e}")
            return None

    def clean_data(self) -> Optional[pd.DataFrame]:
        """Deduplicate, coerce and filter ``self.df`` in preparation for analysis.

        Steps: drop rows duplicated on (title, address, price); coerce
        ``price`` and ``area`` to numbers; keep only rows with ``price > 0``;
        add ``price_per_ping`` (``price / area``, NaN when ``area`` is not
        positive); drop ``price`` outliers with the IQR rule.

        Requires the columns ``title``, ``address``, ``price`` and ``area``.

        Returns:
            The cleaned DataFrame (also stored in ``self.df``), or None when
            no data has been loaded.
        """
        if self.df is None:
            print("�����J���")
            return None
        print("�}�l�M�~���...")

        # Remove duplicated listings.
        original_count = len(self.df)
        self.df = self.df.drop_duplicates(subset=['title', 'address', 'price'])
        print(f"���� {original_count - len(self.df)} �����Ƹ��")

        # Coerce prices; unparsable values become NaN and fail the > 0 filter.
        frame = self.df
        frame['price'] = pd.to_numeric(frame['price'], errors='coerce')
        # Explicit copy: the columns added below must not target a slice.
        frame = frame[frame['price'] > 0].copy()

        frame['area'] = pd.to_numeric(frame['area'], errors='coerce')

        # Vectorised price per area unit. Non-positive or missing areas are
        # masked to NaN first, so the quotient is NaN for those rows. Unlike a
        # row-wise apply, this also works when no rows are left.
        positive_area = frame['area'].where(frame['area'] > 0)
        frame['price_per_ping'] = frame['price'] / positive_area

        # Drop price outliers (IQR rule).
        self.df = self.remove_outliers(frame, 'price')
        print(f"�M�~��Ѿl {len(self.df)} �����ĸ��")
        return self.df

    def remove_outliers(self, df: pd.DataFrame, column: str) -> pd.DataFrame:
        """Return a copy of ``df`` without the IQR outliers of ``column``.

        Rows are kept when the value lies within
        ``[Q1 - 1.5 * IQR, Q3 + 1.5 * IQR]`` (inclusive); rows whose value is
        NaN are dropped as well but not counted as outliers in the printed
        message.

        Args:
            df: Frame to filter; it is not modified.
            column: Name of the numeric column to inspect.
        """
        values = df[column]
        q1 = values.quantile(0.25)
        q3 = values.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        outliers_count = int(((values < lower_bound) | (values > upper_bound)).sum())
        print(f"���� {outliers_count} �� {column} ���`��")

        # Copy so callers can add columns to the result safely.
        return df[values.between(lower_bound, upper_bound)].copy()

    # ------------------------------------------------------------------
    # Analyses
    # ------------------------------------------------------------------
    def basic_statistics(self) -> Dict:
        """Compute summary statistics for price, area and price per area unit.

        All numbers are built-in ``int``/``float`` values so the result can be
        serialised with ``json``. ``area_stats`` / ``price_per_ping_stats``
        are empty dicts when the column is missing or entirely NaN.

        Returns:
            The statistics dict (also stored under ``basic_stats``), or ``{}``
            when there is no data.
        """
        if self.df is None or len(self.df) == 0:
            return {}

        price = self.df['price']
        stats = {
            'total_properties': len(self.df),
            'price_stats': {
                'mean': self._rounded(price.mean()),
                'median': self._rounded(price.median()),
                'std': self._rounded(price.std()),
                'min': self._native(price.min()),
                'max': self._native(price.max()),
                'q25': self._rounded(price.quantile(0.25)),
                'q75': self._rounded(price.quantile(0.75))
            },
            'area_stats': {},
            'price_per_ping_stats': {}
        }

        if self._has_values('area'):
            area = self.df['area']
            stats['area_stats'] = {
                'mean': self._rounded(area.mean()),
                'median': self._rounded(area.median()),
                'min': self._native(area.min()),
                'max': self._native(area.max())
            }

        if self._has_values('price_per_ping'):
            per_ping = self.df['price_per_ping']
            stats['price_per_ping_stats'] = {
                'mean': self._rounded(per_ping.mean()),
                'median': self._rounded(per_ping.median()),
                'min': self._rounded(per_ping.min()),
                'max': self._rounded(per_ping.max())
            }

        self.analysis_results['basic_stats'] = stats
        return stats

    def price_distribution_analysis(self) -> Dict:
        """Count listings per price bracket.

        Adds a ``price_range`` column to ``self.df``.

        Returns:
            Dict with ``ranges``, ``counts`` and ``percentages`` (also stored
            under ``price_distribution``), or ``{}`` when there is no data.
        """
        if self.df is None or len(self.df) == 0:
            return {}

        # Price brackets, left-closed: [0, 15000), [15000, 20000), ...
        price_bins = [0, 15000, 20000, 25000, 30000, 40000, float('inf')]
        price_labels = ['<15K', '15-20K', '20-25K', '25-30K', '30-40K', '>40K']

        distribution_dict = self._binned_distribution('price', 'price_range', price_bins, price_labels)
        self.analysis_results['price_distribution'] = distribution_dict
        return distribution_dict

    def area_analysis(self) -> Dict:
        """Count listings per area bracket.

        Adds an ``area_range`` column to ``self.df``.

        Returns:
            Dict with ``ranges``, ``counts`` and ``percentages`` (also stored
            under ``area_analysis``), or ``{}`` when there is no data or no
            usable ``area`` column.
        """
        if self.df is None or len(self.df) == 0 or not self._has_values('area'):
            return {}

        # Area brackets, left-closed: [0, 20), [20, 30), ...
        area_bins = [0, 20, 30, 40, 50, float('inf')]
        area_labels = ['<20�W', '20-30�W', '30-40�W', '40-50�W', '>50�W']

        area_dict = self._binned_distribution('area', 'area_range', area_bins, area_labels)
        self.analysis_results['area_analysis'] = area_dict
        return area_dict

    def setup_huggingface_models(self):
        """Create the Hugging Face pipeline stored in ``self.sentiment_analyzer``.

        Failures are printed, not raised; the attribute then keeps its
        previous value.

        NOTE(review): the checkpoint name ends in "-ws", which presumably
        denotes a word-segmentation model rather than a sentiment classifier,
        and ``return_all_scores`` is presumably deprecated in favour of
        ``top_k`` in recent transformers releases. Confirm both before
        relying on this pipeline.
        """
        try:
            print("���JHugging Face�ҫ�...")
            self.sentiment_analyzer = pipeline(
                "sentiment-analysis",
                model="ckiplab/bert-base-chinese-ws",
                return_all_scores=True
            )
            print("Hugging Face�ҫ����J����")
        except Exception as e:
            print(f"���JHugging Face�ҫ��ɵo�Ϳ��~: {e}")

    def analyze_descriptions(self) -> Dict:
        """Run the keyword analysis over the ``raw_info`` column.

        Missing values are skipped; remaining values are converted to ``str``
        so that non-text cells cannot break the substring search.

        Returns:
            Dict with ``keywords_frequency`` and ``total_descriptions`` (also
            stored under ``description_analysis``), or ``{}`` when there is no
            data, no ``raw_info`` column, or no non-missing description.
        """
        if self.df is None or 'raw_info' not in self.df.columns:
            return {}

        descriptions = self.df['raw_info'].dropna().astype(str).tolist()
        if not descriptions:
            return {}

        analysis_result = {
            'keywords_frequency': self.analyze_keywords(descriptions),
            'total_descriptions': len(descriptions)
        }
        self.analysis_results['description_analysis'] = analysis_result
        return analysis_result

    def analyze_keywords(self, descriptions: List[str]) -> Dict:
        """Count in how many descriptions each predefined keyword occurs.

        Args:
            descriptions: Description texts to search (substring match).

        Returns:
            The ten keywords with the highest counts, highest first; ties keep
            the order of the keyword list.
        """
        # NOTE(review): these literals are mis-decoded (see the class
        # docstring) and are reproduced unchanged.
        keywords = [
            '�񱶹B', '�񨮯�', '�q��', '���x', '������', '�޲z�O',
            '�ĥ�', '�q��', '�w�R', '�K�Q', '�ͬ�����', '�ǰ�',
            '���s', '���C', '�a��', '�a�q', '�N��', '�~���'
        ]
        keyword_counts = {keyword: 0 for keyword in keywords}
        for keyword in keywords:
            keyword_counts[keyword] += sum(1 for desc in descriptions if keyword in desc)

        # Stable sort: equal counts stay in keyword-list order.
        ranked = sorted(keyword_counts.items(), key=lambda item: item[1], reverse=True)
        return dict(ranked[:10])

    def correlation_analysis(self) -> Dict:
        """Compute pairwise correlations between price, area and price per area unit.

        Only columns that exist and are not entirely NaN take part.

        Returns:
            Dict mapping ``"<col1>_vs_<col2>"`` to the correlation rounded to
            three places (also stored under ``correlation``), or ``{}`` when
            fewer than two columns are usable.
        """
        if self.df is None or len(self.df) == 0:
            return {}

        numeric_columns = ['price', 'area', 'price_per_ping']
        available_columns = [col for col in numeric_columns if self._has_values(col)]
        if len(available_columns) < 2:
            return {}

        correlation_matrix = self.df[available_columns].corr()
        correlation_dict = {}
        # Walk the upper triangle only, so each pair is reported once.
        for position, first in enumerate(available_columns):
            for second in available_columns[position + 1:]:
                correlation_dict[f"{first}_vs_{second}"] = self._rounded(
                    correlation_matrix.loc[first, second], 3
                )

        self.analysis_results['correlation'] = correlation_dict
        return correlation_dict

    def generate_insights(self) -> List[str]:
        """Build human-readable insight sentences from the stored results.

        Uses ``basic_stats``, ``price_distribution`` and ``area_analysis``
        when they are present in ``self.analysis_results``.
        """
        insights = []
        results = self.analysis_results

        if 'basic_stats' in results:
            stats = results['basic_stats']
            price_stats = stats['price_stats']
            insights.append(f"�@��� {stats['total_properties']} ���ŦX���󪺯��Ϊ���")
            insights.append(f"���������� {price_stats['mean']:,} ��")
            insights.append(f"��������Ƭ� {price_stats['median']:,} ��")
            # A mean above the median is reported as an extra insight.
            if price_stats['mean'] > price_stats['median']:
                insights.append("���������V�k���סA�s�b����������԰�������")

        if 'price_distribution' in results:
            most_common_range, percentage = self._largest_bucket(results['price_distribution'])
            insights.append(f"�̱`���������϶��O {most_common_range}�A�� {percentage}%")

        if 'area_analysis' in results:
            area = results['area_analysis']
            if area:
                most_common_area, _ = self._largest_bucket(area)
                insights.append(f"�̱`�����W�ư϶��O {most_common_area}")

        return insights

    def run_full_analysis(self) -> Dict:
        """Run every analysis in sequence and collect the insights.

        Returns:
            ``self.analysis_results`` after all steps have run.
        """
        print("�}�l���槹����R...")

        self.basic_statistics()
        print("? �򥻲έp���R����")

        self.price_distribution_analysis()
        print("? �����������R����")

        self.area_analysis()
        print("? �W�Ƥ��R����")

        self.analyze_descriptions()
        print("? �y�z��r���R����")

        self.correlation_analysis()
        print("? �����ʤ��R����")

        # Insights are derived from the results stored by the steps above.
        insights = self.generate_insights()
        print("? �}��ͦ�����")

        self.analysis_results['insights'] = insights
        return self.analysis_results

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------
    def save_analysis_results(self, filename: str = "analysis_results.json"):
        """Write ``self.analysis_results`` as JSON to ``output/<filename>``.

        The directory is created when missing. Errors are printed, not raised.

        Args:
            filename: File name inside the ``output`` directory.
        """
        target = f"output/{filename}"
        try:
            # Serialise first so a failure cannot leave a truncated file behind.
            payload = json.dumps(self.analysis_results, ensure_ascii=False, indent=2)
            os.makedirs(os.path.dirname(target), exist_ok=True)
            with open(target, 'w', encoding='utf-8') as f:
                f.write(payload)
            print(f"���R���G�w�x�s�� output/{filename}")
        except Exception as e:
            print(f"�x�s���R���G�ɵo�Ϳ��~: {e}")

    def print_summary(self):
        """Print the insights, price statistics and price distribution."""
        if not self.analysis_results:
            print("�S�����R���G�i���")
            return

        separator = "=" * 50
        print("\n" + separator)
        print("���������s�ϯ��Υ������R���i")
        print(separator)

        if 'insights' in self.analysis_results:
            print("\n? ���n�}��:")
            for number, insight in enumerate(self.analysis_results['insights'], 1):
                print(f"{number}. {insight}")

        if 'basic_stats' in self.analysis_results:
            price_stats = self.analysis_results['basic_stats']['price_stats']
            print(f"\n? �����έp:")
            print(f" ��������: {price_stats['mean']:,} ��")
            print(f" �����: {price_stats['median']:,} ��")
            print(f" �̧C����: {price_stats['min']:,} ��")
            print(f" �̰�����: {price_stats['max']:,} ��")
            print(f" �зǮt: {price_stats['std']:,} ��")

        if 'price_distribution' in self.analysis_results:
            print(f"\n? ��������:")
            dist = self.analysis_results['price_distribution']
            for range_name, count, percentage in zip(dist['ranges'], dist['counts'], dist['percentages']):
                print(f" {range_name}: {count} �� ({percentage}%)")

        print("\n" + separator)
if __name__ == "__main__":
    # Script entry point: run the analyzer end to end on output/rental_data.csv.
    analyzer = RentalDataAnalyzer()

    # load_data returns None when loading fails; only continue with a frame.
    if analyzer.load_data("output/rental_data.csv") is not None:
        analyzer.clean_data()             # deduplicate, coerce, drop outliers
        analyzer.run_full_analysis()      # fills analyzer.analysis_results
        analyzer.save_analysis_results()  # writes output/analysis_results.json
        analyzer.print_summary()          # prints the report to stdout