Spaces:
Sleeping
Sleeping
| import json | |
| import math | |
| import time | |
| import dateparser | |
| from datetime import datetime, timezone | |
| from importlib import import_module | |
| from typing import Any, Dict, Optional, Union | |
| from bs4 import BeautifulSoup | |
| from bs4.element import Comment | |
| from dateutil.relativedelta import relativedelta | |
# strptime pattern that convert_utc_time applies to inputs containing a "T",
# e.g. "2021-01-31T10:30:00Z".
DATETIME_STRING_PATTERN = "%Y-%m-%dT%H:%M:%SZ"
# NOTE(review): not referenced in this part of the file; presumably a relative
# period in the "<number><unit>" form accepted by convert_utc_time - confirm.
DEFAULT_LOOKUP_PERIOD = "1h"
# Used from https://stackoverflow.com/a/52081812 and modified
def flatten_dict(
    dictionary: Dict[str, Any],
    round_the_float: bool = True,
    float_round_format_str: str = ".2f",
    separator: str = "_",
) -> Dict[str, Any]:
    """Flatten a nested dictionary into a single-level dictionary.

    Nested dictionaries (and dictionaries found inside lists) are merged into
    the result with their keys prefixed by the parent key and ``separator``.
    When several dictionaries in one list share a key, the last one wins.
    List items that are not dictionaries cannot be flattened; they are kept,
    in their original order, as a list stored under the parent key.

    Args:
        dictionary: mapping with string keys to flatten.
        round_the_float: when true, float values are replaced by their string
            representation formatted with ``float_round_format_str``.
        float_round_format_str: format spec passed to ``format`` for floats.
        separator: string placed between a parent key and a child key.

    Returns:
        A new flat dictionary; the input is not modified.
    """
    out: Dict[str, Any] = {}
    for key, val in dictionary.items():
        if isinstance(val, dict):
            val = [val]
        if isinstance(val, list):
            plain_items = []
            for item in val:
                if not isinstance(item, dict):
                    plain_items.append(item)
                    continue
                # Forward every option so nested levels are formatted and
                # joined exactly like the top level.
                deeper = flatten_dict(
                    item,
                    round_the_float=round_the_float,
                    float_round_format_str=float_round_format_str,
                    separator=separator,
                )
                out.update(
                    {key + separator + key2: val2 for key2, val2 in deeper.items()}
                )
            if plain_items:
                out[key] = plain_items
        elif isinstance(val, float) and round_the_float:
            out[key] = format(val, float_round_format_str)
        else:
            out[key] = val
    return out
def obj_to_json(obj: Any, sort_keys: bool = False, indent: Optional[int] = None) -> Union[bytes, None]:
    """Serialize ``obj`` to UTF-8 encoded JSON bytes.

    Values the json module cannot serialize natively are passed through
    ``datetime_handler``. Non-ASCII characters are written as-is rather than
    escaped.

    Args:
        obj: object to serialize.
        sort_keys: forwarded to ``json.dumps``.
        indent: forwarded to ``json.dumps``.

    Returns:
        The encoded JSON document, or None when ``obj`` is None.
    """
    if obj is None:
        return None

    json_text = json.dumps(
        obj,
        indent=indent,
        sort_keys=sort_keys,
        ensure_ascii=False,
        default=datetime_handler,
    )
    return json_text.encode("utf8")
def obj_to_markdown(
    obj: Any,
    level: int = 1,
    str_enclose_start: Optional[str] = None,
    str_enclose_end: Optional[str] = None,
) -> str:
    """Render an object as nested markdown-style bullet lines.

    Each line starts with ``level`` asterisks. Dictionaries and objects with a
    ``__dict__`` are rendered one entry per key/attribute; list items are
    rendered without a key. Nested collections (as decided by
    ``is_collection``) are rendered recursively one level deeper.

    Args:
        obj: value to render.
        level: nesting depth, i.e. the number of ``*`` characters per line.
        str_enclose_start: when not None, string values are put on their own
            line and prefixed with this marker.
        str_enclose_end: marker appended after an enclosed string; None is
            treated as an empty marker.

    Returns:
        The rendered lines joined with newlines.
    """
    key_prefix = "*" * level
    # A missing closing marker was previously rendered as the text "None".
    enclose_end = "" if str_enclose_end is None else str_enclose_end

    if not is_collection(obj):
        if str_enclose_start is not None and isinstance(obj, str):
            return f"{key_prefix}:\n{str_enclose_start}{obj}{enclose_end}"
        return f"{key_prefix}: {obj}"

    # Check the concrete container types first: dict/list subclasses
    # (e.g. OrderedDict) also carry a __dict__, which must not hide their
    # actual contents.
    add_key = True
    if isinstance(obj, dict):
        item_view = obj.items()
    elif isinstance(obj, list):
        add_key = False
        item_view = enumerate(obj)
    else:
        item_view = obj.__dict__.items()

    markdowns = []
    for key, val in item_view:
        header = f"{key_prefix} {key}" if add_key else key_prefix
        if is_collection(val):
            child_markdown = obj_to_markdown(
                obj=val,
                level=level + 1,
                str_enclose_start=str_enclose_start,
                str_enclose_end=str_enclose_end,
            )
            markdowns.append(f"{header}\n{child_markdown}")
        elif str_enclose_start is not None and isinstance(val, str):
            markdowns.append(f"{header}:\n{str_enclose_start}{val}{enclose_end}")
        else:
            markdowns.append(f"{header}: {val}")
    return "\n".join(markdowns)
def is_collection(obj: Any) -> bool:
    """Return True for dicts, lists and any object that has a ``__dict__``."""
    if isinstance(obj, dict) or isinstance(obj, list):
        return True
    return hasattr(obj, "__dict__")
# Copied from searchtweets-v2 and bit modified
def convert_utc_time(datetime_str: str) -> datetime:
    """Convert a flexible date/time string into a UTC-aware datetime.

    Accepted absolute formats (interpreted as UTC):
        - YYYYmmDDHHMM
        - YYYY-mm-DD
        - YYYY-mm-DD HH:MM
        - YYYY-mm-DDTHH:MM
        - YYYY-mm-DDTHH:MM:SSZ

    Accepted relative formats (at most five characters, counted back from
    the current UTC time), written as a whole number followed by a unit:
        - 2Y (two years ago)
        - 2M (two months ago)
        - 3d (three days ago)
        - 12h (twelve hours ago)
        - 15m (fifteen minutes ago)

    Args:
        datetime_str (str): valid formats are listed above.

    Returns:
        datetime with ``tzinfo`` set to UTC.

    Raises:
        ValueError: if the string matches none of the formats above.
    """
    relative_units = {
        "d": "days",
        "h": "hours",
        "m": "minutes",
        "M": "months",
        "Y": "years",
    }
    try:
        if len(datetime_str) <= 5:
            # parse out numeric character.
            num = int(datetime_str[:-1])
            unit = relative_units.get(datetime_str[-1])
            if unit is None:
                # Without this check an unsupported unit silently resolved to
                # "now"; raising sends the input to the plain-date fallback.
                raise ValueError(f"Unknown relative time unit in {datetime_str!r}")
            _date = datetime.now(timezone.utc) + relativedelta(**{unit: -num})
        elif not {"-", ":"} & set(datetime_str):
            _date = datetime.strptime(datetime_str, "%Y%m%d%H%M")
        elif "T" in datetime_str:
            try:
                _date = datetime.strptime(datetime_str, DATETIME_STRING_PATTERN)
            except ValueError:
                # Minute-resolution form without the trailing "Z".
                _date = datetime.strptime(datetime_str, "%Y-%m-%dT%H:%M")
        else:
            _date = datetime.strptime(datetime_str, "%Y-%m-%d %H:%M")
    except ValueError:
        # Last resort: a plain date. Raises ValueError itself when the input
        # is not a date either.
        _date = datetime.strptime(datetime_str, "%Y-%m-%d")
    return _date.replace(tzinfo=timezone.utc)
def convert_datetime_str_to_epoch(datetime_str: str) -> Optional[int]:
    """Parse a free-form date string and return it as a Unix timestamp.

    Args:
        datetime_str: text handed to ``dateparser.parse``.

    Returns:
        Whole seconds since the epoch, or None when the text cannot be parsed.
    """
    parsed_datetime = dateparser.parse(datetime_str)
    if parsed_datetime is None:
        return None
    # datetime.timestamp() honours tzinfo on aware values and treats naive
    # values as local time. time.mktime(timetuple()) dropped the offset and
    # therefore mis-converted timezone-aware results.
    unix_timestamp = parsed_datetime.replace(microsecond=0).timestamp()
    return math.trunc(unix_timestamp)
def tag_visible(element: Any) -> bool:
    """Tell whether a parsed text node should count as visible text.

    A node is rejected when its parent's name is one of the non-rendered
    containers listed below, or when the node itself is a ``Comment``.
    """
    hidden_parents = ("style", "script", "head", "title", "meta", "[document]")
    if element.parent.name in hidden_parents:
        return False
    return not isinstance(element, Comment)
def text_from_html(body: Union[str, bytes]) -> str:
    """Extract the visible text of an HTML document.

    The document is parsed with the "html.parser" backend; every text node
    accepted by ``tag_visible`` is stripped of surrounding whitespace and the
    pieces are joined with single spaces.

    Args:
        body: HTML markup.

    Returns:
        The visible text as one string.
    """
    soup = BeautifulSoup(body, "html.parser")
    # find_all(string=True) is the current spelling of the deprecated
    # findAll(text=True).
    texts = soup.find_all(string=True)
    return " ".join(t.strip() for t in texts if tag_visible(t))
def dict_to_object(
    dictionary: Dict[str, Any],
    class_name_key: Optional[str] = "_target_",
    full_class_name: Optional[str] = None,
) -> Any:
    """Build an object from a dictionary that may name its own class.

    The entry stored under ``class_name_key`` (if present) gives a dotted
    "module.ClassName" path and overrides ``full_class_name``. Nested
    dictionaries are converted recursively with the same ``class_name_key``.
    All remaining entries become keyword arguments of the class.

    Args:
        dictionary: source mapping.
        class_name_key: key whose value holds the dotted class path.
        full_class_name: dotted class path to use for the top-level object
            when the dictionary does not carry one.

    Returns:
        An instance of the resolved class, or the converted plain dictionary
        when no class path is known.
    """
    kwargs: Dict[str, Any] = {}
    for key, value in dictionary.items():
        if key == class_name_key:
            full_class_name = value
            continue
        if isinstance(value, dict):
            value = dict_to_object(dictionary=value, class_name_key=class_name_key)
        kwargs[key] = value

    if full_class_name is None:
        return kwargs

    module_name, class_name = full_class_name.rsplit(".", 1)
    target_class = getattr(import_module(module_name), class_name)
    return target_class(**kwargs)
def datetime_handler(x: Any) -> Optional[Any]:
    """Convert a value into something closer to JSON-serializable.

    Returns None for None, the ISO-8601 string for datetimes, the attribute
    dictionary for objects that have a ``__dict__``, and the value itself
    otherwise.
    """
    if x is None:
        return None
    if isinstance(x, datetime):
        return x.isoformat()
    if hasattr(x, "__dict__"):
        return vars(x)
    return x