Spaces:
Running
on
Zero
Running
on
Zero
| # flake8: noqa: F401, F403 | |
| import abc | |
| import argparse | |
| import copy as cp | |
| import csv | |
| import datetime | |
| import multiprocessing as mp | |
| import os | |
| import os.path as osp | |
| import random as rd | |
| import shutil | |
| import subprocess | |
| import warnings | |
| from collections import OrderedDict, defaultdict | |
| from multiprocessing import Pool, current_process | |
| import matplotlib.pyplot as plt | |
| import pandas as pd | |
| import requests | |
| import seaborn as sns | |
| from huggingface_hub import scan_cache_dir | |
| from sty import bg, ef, fg, rs | |
| from tabulate import tabulate, tabulate_formats | |
| from tqdm import tqdm | |
def process_punctuation(inText):
    """Strip or space out punctuation in ``inText`` for answer normalisation.

    Every character in the punctuation list below is removed outright when
    the input contains it next to a space, or when the input contains a
    digit-comma-digit group such as ``1,000``; otherwise the character is
    replaced by a single space. Afterwards every period that is not directly
    followed by a digit is dropped.

    Args:
        inText: The string to normalise.

    Returns:
        The normalised string.
    """
    import re

    punct = [
        ';', r'/', '[', ']', '"', '{', '}', '(', ')', '=', '+', '\\', '_', '-',
        '>', '<', '@', '`', ',', '?', '!'
    ]
    # Raw strings avoid the invalid-escape warnings that '\d' triggers.
    comma_strip = re.compile(r'(\d)(,)(\d)')
    # NOTE(review): the first group is a negative look-ahead for the literal
    # text "<=" followed by a digit, not a look-behind. It is kept unchanged
    # so normalisation results stay the same; confirm whether `(?<!\d)` was
    # the intent.
    period_strip = re.compile(r'(?!<=\d)(\.)(?!\d)')

    # The comma-number test only depends on the input, so evaluate it once.
    has_numeric_comma = comma_strip.search(inText) is not None

    outText = inText
    for p in punct:
        if has_numeric_comma or p + ' ' in inText or ' ' + p in inText:
            outText = outText.replace(p, '')
        else:
            outText = outText.replace(p, ' ')

    # No extra positional argument here: the third positional parameter of
    # Pattern.sub is `count`, so passing re.UNICODE capped the number of
    # removed periods at 32.
    return period_strip.sub('', outText)
def h2r(value):
    """Convert a six-digit hex colour string into an ``(r, g, b)`` tuple.

    Args:
        value: Hex colour such as ``'#ff0080'`` or ``'ff0080'``; one leading
            ``#`` is ignored.

    Returns:
        A tuple of three ints, one per two-digit hex pair.
    """
    hex_digits = value[1:] if value[0] == '#' else value
    assert len(hex_digits) == 6
    red, green, blue = (int(hex_digits[start:start + 2], 16) for start in (0, 2, 4))
    return red, green, blue
def r2h(rgb):
    """Format three integer colour components as a ``#rrggbb`` hex string.

    Args:
        rgb: A sequence of exactly three ints. Any iterable is accepted
            (tuple, list, ...); it is converted to a tuple because
            ``%``-formatting treats only real tuples as multiple arguments.

    Returns:
        The lower-case hex colour string with a leading ``#``.
    """
    return '#%02x%02x%02x' % tuple(rgb)
def colored(s, color):
    """Wrap ``s`` in a foreground-colour prefix and the ``fg.rs`` suffix.

    Args:
        s: The text to wrap.
        color: Either a string or an iterable of components. A string that
            names an attribute of ``fg`` uses that attribute directly; any
            other string is converted with ``h2r``. Non-string values are
            unpacked straight into ``fg(...)``.

    Returns:
        ``prefix + s + fg.rs`` (``fg.rs`` is presumably the reset code).
    """
    if isinstance(color, str) and hasattr(fg, color):
        prefix = getattr(fg, color)
    else:
        components = h2r(color) if isinstance(color, str) else color
        prefix = fg(*components)
    return prefix + s + fg.rs
def istype(s, type):
    """Tell whether ``s`` is, or evaluates to, an instance of ``type``.

    Args:
        s: A value, or a string holding a Python expression.
        type: The class (or tuple of classes) to test against.

    Returns:
        True when ``s`` itself is an instance of ``type``, or when evaluating
        ``s`` yields one; False when evaluation fails or yields another type.
    """
    if not isinstance(s, type):
        # NOTE(review): eval executes arbitrary expressions, so this must
        # only ever see trusted input.
        try:
            return isinstance(eval(s), type)
        except Exception:
            return False
    return True
def bincount(lst):
    """Count how many times each item occurs in ``lst``.

    Args:
        lst: An iterable of hashable items.

    Returns:
        A ``defaultdict`` mapping each item to its occurrence count; missing
        keys default to 0.
    """
    # `int` rather than a lambda as the default factory: it produces the same
    # zero default but keeps the result picklable.
    counts = defaultdict(int)
    for item in lst:
        counts[item] += 1
    return counts
def get_cache_path(repo_id):
    """Return the snapshot path of the latest cached revision of ``repo_id``.

    Scans the cache via ``scan_cache_dir`` and takes the first repo whose
    ``repo_id`` matches, then picks the revision with the greatest
    ``last_modified`` value.

    Args:
        repo_id: Repository identifier to look up in the cache.

    Returns:
        The chosen revision's ``snapshot_path`` as a string, or None when the
        repo is not cached or no revision has a positive ``last_modified``.
    """
    cache_info = scan_cache_dir()
    matched = next(
        (entry for entry in cache_info.repos if entry.repo_id == repo_id),
        None,
    )
    if matched is None:
        return None

    newest, newest_mtime = None, 0
    for revision in matched.revisions:
        # Strict comparison: on ties the first revision seen is kept.
        if revision.last_modified > newest_mtime:
            newest, newest_mtime = revision, revision.last_modified

    return None if newest is None else str(newest.snapshot_path)
def proxy_set(s):
    """Point the HTTP and HTTPS proxy environment variables at ``s``.

    Args:
        s: The value written to ``http_proxy``, ``HTTP_PROXY``,
            ``https_proxy`` and ``HTTPS_PROXY`` in ``os.environ``.
    """
    import os

    proxy_vars = ('http_proxy', 'HTTP_PROXY', 'https_proxy', 'HTTPS_PROXY')
    os.environ.update({name: s for name in proxy_vars})
def get_rank_and_world_size():
    """Read the process rank and world size from the environment.

    Returns:
        A ``(rank, world_size)`` tuple of ints taken from the ``RANK`` and
        ``WORLD_SIZE`` environment variables, defaulting to 0 and 1 when
        they are unset.
    """
    env = os.environ
    return int(env.get("RANK", 0)), int(env.get("WORLD_SIZE", 1))
def get_local_rank_and_world_size():
    """Read the local rank and world size from the environment.

    Returns:
        A ``(local_rank, world_size)`` tuple of ints taken from the
        ``LOCAL_RANK`` and ``WORLD_SIZE`` environment variables, defaulting
        to 0 and 1 when they are unset.
    """
    env = os.environ
    return int(env.get("LOCAL_RANK", 0)), int(env.get("WORLD_SIZE", 1))
def splitlen(s, sym='/'):
    """Return how many pieces ``s`` yields when split on ``sym``.

    Args:
        s: The string to split.
        sym: The separator passed to ``str.split``; defaults to ``'/'``.

    Returns:
        The number of resulting pieces.
    """
    pieces = s.split(sym)
    return len(pieces)
def listinstr(lst, s):
    """Check whether any element of ``lst`` is contained in ``s``.

    Args:
        lst: A list of candidates; anything other than a list fails the
            assertion.
        s: The container tested with the ``in`` operator.

    Returns:
        True if at least one candidate is in ``s``, otherwise False.
    """
    assert isinstance(lst, list)
    return any(candidate in s for candidate in lst)
def d2df(D):
    """Turn a mapping into a one-row ``pandas.DataFrame``.

    Args:
        D: A mapping; each key becomes a column.

    Returns:
        A DataFrame whose single row holds the corresponding values.
    """
    columns = {}
    for key in D:
        # Wrap each value in a list so it forms a one-element column.
        columns[key] = [D[key]]
    return pd.DataFrame(columns)
def cn_string(s):
    """Report whether ``s`` contains a character in U+4E00..U+9FFF.

    Args:
        s: The string to inspect.

    Returns:
        True if at least one character falls in that range, else False.
    """
    import re

    return re.search(u'[\u4e00-\u9fff]', s) is not None
| try: | |
| import decord | |
| except ImportError: | |
| pass | |
def timestr(second=True, minute=False):
    """Return the current local time as a compact digit string.

    The full form is ``YYmmddHHMMSS``, with the first two digits of the year
    dropped.

    Args:
        second: When true, return the full string including seconds.
        minute: Used only when ``second`` is false; when true the string
            stops at minutes, otherwise it stops at hours.

    Returns:
        The timestamp string at the requested precision.
    """
    stamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')[2:]
    if second:
        trim = 0
    elif minute:
        trim = 2
    else:
        trim = 4
    # Drop `trim` trailing digits (seconds, then minutes).
    return stamp[:len(stamp) - trim]
def dict_merge(dct, merge_dct):
    """Recursively merge ``merge_dct`` into ``dct`` in place.

    When both sides hold a dict under the same key, the two are merged
    recursively; in every other case the value from ``merge_dct`` replaces
    or adds the entry in ``dct``.

    Args:
        dct: The dict that is updated.
        merge_dct: The dict whose entries are merged in.

    Returns:
        None; ``dct`` is modified in place.
    """
    for key, incoming in merge_dct.items():
        both_dicts = (
            key in dct
            and isinstance(dct[key], dict)
            and isinstance(incoming, dict)
        )
        if not both_dicts:
            dct[key] = incoming
            continue
        dict_merge(dct[key], incoming)
def youtube_dl(idx):
    """Download the video ``idx`` to ``<idx>.mp4`` with the ``youtube-dl`` CLI.

    The command is run as an argument list rather than through a shell, so
    ids containing spaces, quotes, ``$`` or backticks are passed verbatim and
    cannot alter the command.

    Args:
        idx: The video id or URL handed to ``youtube-dl``. It is also used as
            the stem of the output file, relative to the working directory.

    Returns:
        None. The exit status of ``youtube-dl`` is not checked, and a missing
        executable only produces a warning.
    """
    target = str(idx)
    argv = [
        'youtube-dl', '-f', 'best', '-f', 'mp4',
        '-o', f'{target}.mp4',
        # '--' ends option parsing so ids that start with '-' are treated as
        # the video to download, not as a flag.
        '--', target,
    ]
    try:
        subprocess.run(argv, check=False)
    except FileNotFoundError:
        # Stay best-effort when the tool is not installed.
        warnings.warn(
            'youtube-dl executable not found; skipping download of %r' % target
        )
def run_command(cmd):
    """Run a command and return what it wrote to standard output.

    Args:
        cmd: Either an argument list, or a string that is split on
            whitespace to form one (no shell quoting rules are applied).

    Returns:
        The captured standard output as returned by
        ``subprocess.check_output``.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero.
    """
    argv = cmd.split() if isinstance(cmd, str) else cmd
    return subprocess.check_output(argv)