591Analyzer / scraper.py
54justin's picture
Upload 7 files
f205f47 verified
# �� Copilot �ͦ�
import requests
import time
import json
import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
import re
from typing import List, Dict, Optional
class Rent591Scraper:
"""591����������O"""
def __init__(self):
self.base_url = "https://rent.591.com.tw"
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
self.session = requests.Session()
self.session.headers.update(self.headers)
def setup_driver(self):
"""�]�mChrome WebDriver"""
chrome_options = Options()
chrome_options.add_argument('--headless') # �L�Y�Ҧ�
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--window-size=1920,1080')
chrome_options.add_argument(f'--user-agent={self.headers["User-Agent"]}')
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)
return driver
def get_csrf_token(self, driver):
"""���CSRF Token"""
try:
# �X�ݭ������token
driver.get("https://rent.591.com.tw/")
time.sleep(2)
# ���ձq����������token
token_element = driver.find_element(By.NAME, "csrf-token")
if token_element:
return token_element.get_attribute("content")
# �p�G�S���A���ձqcookies�����
cookies = driver.get_cookies()
for cookie in cookies:
if 'token' in cookie['name'].lower():
return cookie['value']
except Exception as e:
print(f"���token����: {e}")
return None
def scrape_rental_data(self, max_pages: int = 10) -> List[Dict]:
"""
��������
Args:
max_pages: �̤j��������
Returns:
���θ�ƦC��
"""
driver = self.setup_driver()
all_data = []
try:
# �ؼ�URL�Ѽ�
params = {
'region': '17', # ������
'section': '247', # ���s��
'kind': '1', # ��h���a
'layout': '2', # 2��
'shape': '2' # �q��j��
}
for page in range(1, max_pages + 1):
print(f"���b������ {page} ��...")
# �c��URL
params['page'] = page
url = f"{self.base_url}/list?" + "&".join([f"{k}={v}" for k, v in params.items()])
driver.get(url)
time.sleep(3) # ���ݭ������J
# �ˬd�O�_�����
rental_items = driver.find_elements(By.CSS_SELECTOR, '.rent-item')
if not rental_items:
print(f"�� {page} ���S������ơA�����")
break
page_data = self.parse_page_data(driver)
all_data.extend(page_data)
print(f"�� {page} ����� {len(page_data)} �����")
# �קK�Q��IP�A�[�J����
time.sleep(2)
except Exception as e:
print(f"������Ʈɵo�Ϳ��~: {e}")
finally:
driver.quit()
return all_data
def parse_page_data(self, driver) -> List[Dict]:
"""�ѪR�歶���"""
page_data = []
try:
# �������HTML
soup = BeautifulSoup(driver.page_source, 'html.parser')
# �d�䯲�ζ���
rental_items = soup.find_all('div', class_='rent-item')
for item in rental_items:
try:
rental_info = self.extract_rental_info(item)
if rental_info:
page_data.append(rental_info)
except Exception as e:
print(f"�ѪR�浧��Ʈɵo�Ϳ��~: {e}")
continue
except Exception as e:
print(f"�ѪR������Ʈɵo�Ϳ��~: {e}")
return page_data
def extract_rental_info(self, item) -> Optional[Dict]:
"""�����浧���θ�T"""
try:
# �򥻸�T
title_elem = item.find('h3', class_='rent-item-title')
title = title_elem.get_text(strip=True) if title_elem else "N/A"
# ����
price_elem = item.find('div', class_='rent-item-price')
price_text = price_elem.get_text(strip=True) if price_elem else "0"
price = self.extract_price(price_text)
# �a�}
address_elem = item.find('div', class_='rent-item-address')
address = address_elem.get_text(strip=True) if address_elem else "N/A"
# �ԲӸ�T
info_elem = item.find('div', class_='rent-item-info')
info_text = info_elem.get_text(strip=True) if info_elem else ""
# �����W�ơB�Ӽh����T
area = self.extract_area(info_text)
floor = self.extract_floor(info_text)
# �s��
link_elem = item.find('a')
link = self.base_url + link_elem.get('href') if link_elem and link_elem.get('href') else ""
return {
'title': title,
'price': price,
'address': address,
'area': area,
'floor': floor,
'link': link,
'raw_info': info_text
}
except Exception as e:
print(f"�������θ�T�ɵo�Ϳ��~: {e}")
return None
def extract_price(self, price_text: str) -> int:
"""���������Ʀr"""
try:
# �����D�Ʀr�r�šA��������
price_match = re.search(r'[\d,]+', price_text.replace(',', ''))
if price_match:
return int(price_match.group().replace(',', ''))
except:
pass
return 0
def extract_area(self, info_text: str) -> float:
"""�����W��"""
try:
area_match = re.search(r'(\d+(?:\.\d+)?)\s*�W', info_text)
if area_match:
return float(area_match.group(1))
except:
pass
return 0.0
def extract_floor(self, info_text: str) -> str:
"""�����Ӽh��T"""
try:
floor_match = re.search(r'(\d+)��', info_text)
if floor_match:
return floor_match.group(1) + '��'
except:
pass
return "N/A"
def save_data(self, data: List[Dict], filename: str = "rental_data.json"):
"""�x�s��ƨ��ɮ�"""
try:
with open(f"output/{filename}", 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"��Ƥw�x�s�� output/{filename}")
except Exception as e:
print(f"�x�s��Ʈɵo�Ϳ��~: {e}")
def to_dataframe(self, data: List[Dict]) -> pd.DataFrame:
"""�ഫ��DataFrame"""
return pd.DataFrame(data)
if __name__ == "__main__":
scraper = Rent591Scraper()
print("�}�l����591����...")
# �������
rental_data = scraper.scrape_rental_data(max_pages=5)
if rental_data:
print(f"�`�@������ {len(rental_data)} �����")
# �x�s��l���
scraper.save_data(rental_data)
# �ഫ��DataFrame���x�sCSV
df = scraper.to_dataframe(rental_data)
df.to_csv("output/rental_data.csv", index=False, encoding='utf-8-sig')
print("��ƪ��������I")
print(df.head())
else:
print("�S�������������")