# Code in this file is copied from https://github.com/egbertbouman/youtube-comment-downloader/blob/master/youtube_comment_downloader/downloader.py
# and modified to fit the needs of this project. When code from youtube-comment-downloader was copied it was MIT licensed.
# Code Commit: https://github.com/egbertbouman/youtube-comment-downloader/commit/9a15b8e3fbaebad660875409fb1bbe74db17f304
import json
import logging
import re
import time
from datetime import datetime, timezone
from typing import Any, Dict, Generator, List, Optional

import dateparser
import requests
from pydantic import BaseModel
from requests import Session

logger = logging.getLogger(__name__)

class YouTubeCommentExtractor(BaseModel):
    _YT_URL: str = 'https://www.youtube.com'
    _YT_CFG_REGEX: str = r'ytcfg\.set\s*\(\s*({.+?})\s*\)\s*;'
    _YT_INITIAL_DATA_REGEX: str = r'(?:window\s*\[\s*["\']ytInitialData["\']\s*\]|ytInitialData)\s*=\s*({.+?})\s*;\s*(?:var\s+meta|</script|\n)'

    video_url: str
    user_agent: str = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36'
    sort_by: int = 1  # 0 = sort by popular, 1 = sort by recent
    max_comments: Optional[int] = 20
    fetch_replies: bool = False
    lang_code: Optional[str] = None
    sleep_time: float = 0.1
    request_retries: int = 5

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        if self.sort_by not in [0, 1]:
            raise ValueError('sort_by must be either 0 or 1')
    @staticmethod
    def _regex_search(text: str, pattern: str, group: int = 1) -> str:
        # Return the requested capture group, or an empty string if the pattern does not match.
        match = re.search(pattern, text)
        return match.group(group) if match else ''
    def _ajax_request(self, session: Session, endpoint: Dict[str, Any], ytcfg: Dict[str, Any]) -> Any:
        url = self._YT_URL + endpoint['commandMetadata']['webCommandMetadata']['apiUrl']
        data = {'context': ytcfg['INNERTUBE_CONTEXT'],
                'continuation': endpoint['continuationCommand']['token']}
        for _ in range(self.request_retries):
            response = session.post(url, params={'key': ytcfg['INNERTUBE_API_KEY']}, json=data)
            if response.status_code == 200:
                return response.json()
            if response.status_code in [403, 413]:
                return {}
            else:
                time.sleep(self.sleep_time)
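        # If every retry fails, the loop falls through and None is returned implicitly;
        # _fetch_comments treats that falsy result as the end of pagination.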
    @staticmethod
    def _search_dict(partial: Any, search_key: str) -> Generator[Any, Any, None]:
        # Iteratively walk a nested dict/list structure and yield every value stored under search_key.
        stack = [partial]
        while stack:
            current_item = stack.pop()
            if isinstance(current_item, dict):
                for key, value in current_item.items():
                    if key == search_key:
                        yield value
                    else:
                        stack.append(value)
            elif isinstance(current_item, list):
                for value in current_item:
                    stack.append(value)
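    # Illustrative example of the traversal above (the nested dict is made up, not real YouTube data):
    #   next(YouTubeCommentExtractor._search_dict({'a': {'commentRenderer': {'commentId': 'x'}}},
    #                                             'commentRenderer'))
    #   -> {'commentId': 'x'}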
    def _fetch_comments(self, until_datetime: Optional[datetime] = None) -> Generator[Any, Any, None]:
        session = requests.Session()
        session.headers['User-Agent'] = self.user_agent
        response = session.get(self.video_url)

        if response.request and response.request.url and 'uxe=' in response.request.url:
            session.cookies.set('CONSENT', 'YES+cb', domain='.youtube.com')  # type: ignore[no-untyped-call]
            response = session.get(self.video_url)

        html = response.text
        ytcfg = json.loads(self._regex_search(html, self._YT_CFG_REGEX))
        if not ytcfg:
            return  # Unable to extract configuration
        if self.lang_code:
            ytcfg['INNERTUBE_CONTEXT']['client']['hl'] = self.lang_code

        data = json.loads(self._regex_search(html, self._YT_INITIAL_DATA_REGEX))

        section = next(self._search_dict(data, 'itemSectionRenderer'), None)
        renderer = next(self._search_dict(section, 'continuationItemRenderer'), None) if section else None
        if not renderer:
            # Comments disabled?
            return
        needs_sorting = self.sort_by != 0
        continuations = [renderer['continuationEndpoint']]
        while continuations:
            continuation = continuations.pop()
            response = self._ajax_request(session, continuation, ytcfg)

            if not response:
                break

            if list(self._search_dict(response, 'externalErrorMessage')):
                logger.warning('Error returned from server: %s', next(self._search_dict(response, 'externalErrorMessage')))
                return

            if needs_sorting:
                sub_menu: Dict[str, Any] = next(self._search_dict(response, 'sortFilterSubMenuRenderer'), {})
                sort_menu = sub_menu.get('subMenuItems', [])
                if self.sort_by < len(sort_menu):
                    continuations = [sort_menu[self.sort_by]['serviceEndpoint']]
                    needs_sorting = False
                    continue
                # TODO: Fix it. Causing observer to fail silently
                logger.warning("Unable to set sorting")
                # raise RuntimeError('Failed to set sorting')
            actions = list(self._search_dict(response, 'reloadContinuationItemsCommand')) + \
                      list(self._search_dict(response, 'appendContinuationItemsAction'))
            for action in actions:
                for item in action.get('continuationItems', []):
                    if action['targetId'] == 'comments-section':
                        # Process continuations for comments and replies.
                        continuations[:0] = [ep for ep in self._search_dict(item, 'continuationEndpoint')]
                    if self.fetch_replies:
                        # TODO: Fix it. This functionality is broken
                        if action['targetId'].startswith('comment-replies-item') and 'continuationItemRenderer' in item:
                            # Process the 'Show more replies' button
                            continuations.append(next(self._search_dict(item, 'buttonRenderer'))['command'])
            for comment in reversed(list(self._search_dict(response, 'commentRenderer'))):
                if not self.fetch_replies and "." in comment['commentId']:
                    continue
                comment_time_string = comment['publishedTimeText']['runs'][0]['text']
                comment_time_string = comment_time_string or ''
                comment_time = dateparser.parse(
                    comment_time_string.split('(edited)', 1)[0].strip(),
                )
                if comment_time:
                    comment_time = comment_time.replace(tzinfo=timezone.utc)
                    if until_datetime and until_datetime > comment_time:
                        return
                yield {'comment_id': comment['commentId'],
                       'text': ''.join([c['text'] for c in comment['contentText'].get('runs', [])]),
                       'time': comment_time,
                       'author': comment.get('authorText', {}).get('simpleText', ''),
                       'channel': comment['authorEndpoint']['browseEndpoint'].get('browseId', ''),
                       'votes': comment.get('voteCount', {}).get('simpleText', '0'),
                       'photo': comment['authorThumbnail']['thumbnails'][-1]['url'],
                       'heart': next(self._search_dict(comment, 'isHearted'), False)}

            time.sleep(self.sleep_time)
    def fetch_comments(self, until_datetime: Optional[datetime] = None) -> List[Dict[str, Any]]:
        comments: List[Dict[str, Any]] = []
        for comment in self._fetch_comments(until_datetime=until_datetime):
            comments.append(comment)
            if self.max_comments and self.max_comments == len(comments):
                break
        return comments
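
# Minimal usage sketch (illustrative only; the video URL, logging setup, and printed fields
# below are example choices, not part of the original project):
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    extractor = YouTubeCommentExtractor(
        video_url='https://www.youtube.com/watch?v=VIDEO_ID',  # placeholder URL
        sort_by=1,        # 1 = newest comments first
        max_comments=5,   # stop after five comments
    )
    for c in extractor.fetch_comments():
        print(c['time'], c['author'], c['text'])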