import csv
import hashlib
import json
import os
import os.path as osp
import pickle
import time

import numpy as np
import pandas as pd
class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy types to plain Python equivalents."""

    def default(self, obj):
        # The abstract base classes cover every concrete width (int8..int64,
        # float16..float64, complex64/128) and survive the NumPy 2.0 removal
        # of aliases such as np.int_, np.float_, and np.complex_.
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.complexfloating):
            return {'real': float(obj.real), 'imag': float(obj.imag)}
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, np.bool_):
            return bool(obj)
        elif isinstance(obj, np.void):
            return None
        return json.JSONEncoder.default(self, obj)
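

# A minimal usage sketch (values are hypothetical): NumpyEncoder lets
# json.dumps serialize NumPy scalars and arrays that the stock encoder
# rejects with "Object of type ndarray is not JSON serializable".
# >>> json.dumps({'acc': np.float32(0.875), 'ids': np.arange(3)}, cls=NumpyEncoder)
# '{"acc": 0.875, "ids": [0, 1, 2]}'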
# LOAD & DUMP

def dump(data, f, **kwargs):
    """Serialize `data` to file `f`, dispatching on the filename suffix."""

    def dump_pkl(data, pth, **kwargs):
        with open(pth, 'wb') as fout:
            pickle.dump(data, fout)

    def dump_json(data, pth, **kwargs):
        with open(pth, 'w', encoding='utf-8') as fout:
            json.dump(data, fout, indent=4, ensure_ascii=False, cls=NumpyEncoder)

    def dump_jsonl(data, f, **kwargs):
        lines = [json.dumps(x, ensure_ascii=False, cls=NumpyEncoder) for x in data]
        with open(f, 'w', encoding='utf-8') as fout:
            fout.write('\n'.join(lines))

    def dump_xlsx(data, f, **kwargs):
        data.to_excel(f, index=False, engine='xlsxwriter')

    def dump_csv(data, f, quoting=csv.QUOTE_ALL):
        data.to_csv(f, index=False, encoding='utf-8', quoting=quoting)

    def dump_tsv(data, f, quoting=csv.QUOTE_ALL):
        data.to_csv(f, sep='\t', index=False, encoding='utf-8', quoting=quoting)

    handlers = dict(pkl=dump_pkl, json=dump_json, jsonl=dump_jsonl,
                    xlsx=dump_xlsx, csv=dump_csv, tsv=dump_tsv)
    suffix = f.split('.')[-1]
    return handlers[suffix](data, f, **kwargs)
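

# A minimal dispatch sketch (paths are hypothetical): the same call covers
# pickle, JSON, JSONL, and the pandas-backed tabular formats.
# >>> dump({'split': 'val', 'n': 100}, 'meta.json')
# >>> dump(pd.DataFrame({'q': ['a', 'b']}), 'questions.tsv')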
def load(f):
    """Deserialize file `f`, dispatching on the filename suffix."""

    def load_pkl(pth):
        with open(pth, 'rb') as fin:
            return pickle.load(fin)

    def load_json(pth):
        with open(pth, 'r', encoding='utf-8') as fin:
            return json.load(fin)

    def load_jsonl(f):
        with open(f, encoding='utf-8') as fin:
            lines = [x.strip() for x in fin]
        # Skip blank lines (including a trailing newline at EOF); this also
        # avoids an IndexError on empty files.
        return [json.loads(x) for x in lines if x != '']

    def load_xlsx(f):
        return pd.read_excel(f)

    def load_csv(f):
        return pd.read_csv(f)

    def load_tsv(f):
        return pd.read_csv(f, sep='\t')

    handlers = dict(pkl=load_pkl, json=load_json, jsonl=load_jsonl,
                    xlsx=load_xlsx, csv=load_csv, tsv=load_tsv)
    suffix = f.split('.')[-1]
    return handlers[suffix](f)
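

# Round-trip sketch (path is hypothetical): load picks its reader from the
# same suffix table, so dump(x, f) followed by load(f) recovers the data.
# >>> dump([{'id': 0}, {'id': 1}], 'records.jsonl')
# >>> load('records.jsonl')
# [{'id': 0}, {'id': 1}]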
def download_file(url, filename=None):
    """Download `url` to `filename` (defaults to the URL basename), with a progress bar."""
    import urllib.request

    from tqdm import tqdm

    class DownloadProgressBar(tqdm):
        def update_to(self, b=1, bsize=1, tsize=None):
            # urlretrieve's reporthook passes (block count, block size, total size).
            if tsize is not None:
                self.total = tsize
            self.update(b * bsize - self.n)

    if filename is None:
        filename = url.split('/')[-1]
    with DownloadProgressBar(unit='B', unit_scale=True,
                             miniters=1, desc=url.split('/')[-1]) as t:
        urllib.request.urlretrieve(url, filename=filename, reporthook=t.update_to)
    return filename
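

# Usage sketch (the URL is hypothetical): downloads with a live tqdm bar and
# falls back to the URL basename when no filename is given.
# >>> download_file('https://example.com/data/test.zip')
# 'test.zip'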
def ls(dirname='.', match='', mode='all', level=1):
    """List entries under `dirname` whose path contains `match`.

    mode: 'all' | 'dir' | 'file'; level: recursion depth (1 = this directory only).
    """
    if dirname == '.':
        ans = os.listdir(dirname)
    else:
        ans = [osp.join(dirname, x) for x in os.listdir(dirname)]
    assert mode in ['all', 'dir', 'file']
    assert level >= 1 and isinstance(level, int)
    if level == 1:
        ans = [x for x in ans if match in x]
        if mode == 'dir':
            ans = [x for x in ans if osp.isdir(x)]
        elif mode == 'file':
            ans = [x for x in ans if not osp.isdir(x)]
    else:
        # At intermediate levels, descend into subdirectories only; the match
        # and mode filters apply at the final level.
        ans = [x for x in ans if osp.isdir(x)]
        res = []
        for d in ans:
            res.extend(ls(d, match=match, mode=mode, level=level - 1))
        ans = res
    return ans
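

# Usage sketch (directory layout is hypothetical): level=2 lists entries one
# directory deeper; intermediate levels only descend and report nothing.
# >>> ls('images', match='.jpg', mode='file', level=2)
# ['images/train/0.jpg', 'images/val/0.jpg']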
def mrlines(fname, sp='\n'):
    """Read `fname` and split on `sp`, dropping trailing empty entries."""
    with open(fname, encoding='utf-8') as fin:
        f = fin.read().split(sp)
    while f != [] and f[-1] == '':
        f = f[:-1]
    return f


def mwlines(lines, fname):
    """Write `lines` to `fname`, joined by newlines."""
    with open(fname, 'w', encoding='utf-8') as fout:
        fout.write('\n'.join(lines))
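

# Round-trip sketch (file name is hypothetical): mwlines joins with '\n' and
# mrlines drops trailing empties, so a list survives a write/read cycle.
# >>> mwlines(['alpha', 'beta'], 'notes.txt')
# >>> mrlines('notes.txt')
# ['alpha', 'beta']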
def md5(file_pth):
    """Compute the MD5 hex digest of a file, reading in 1 MiB chunks."""
    with open(file_pth, 'rb') as f:
        hasher = hashlib.new('md5')  # avoid shadowing the builtin `hash`
        for chunk in iter(lambda: f.read(2**20), b''):
            hasher.update(chunk)
        return hasher.hexdigest()
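

# Usage sketch (path and digest are hypothetical): chunked reads keep memory
# flat on large files; the result matches `md5sum` on the same file.
# >>> md5('data/test.zip')  # doctest: +SKIP
# '9e107d9d372bb6826bd81d3542a419d6'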
def last_modified(pth):
    """Return the file's mtime as a compact local-time 'yymmddHHMMSS' string."""
    stamp = osp.getmtime(pth)
    # Round-trip through ctime/strptime converts the epoch stamp to local
    # time; formatting and trimming the century yields a 12-character tag.
    m_ti = time.ctime(stamp)
    t_obj = time.strptime(m_ti)
    return time.strftime('%Y%m%d%H%M%S', t_obj)[2:]
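

# Usage sketch (path and value are hypothetical): a file last modified at
# 2024-03-11 15:30:45 local time yields '240311153045', which is handy as a
# version tag for cached artifacts.
# >>> last_modified('meta.json')  # doctest: +SKIP
# '240311153045'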