"""Conversation logger: local write-ahead JSONL file plus batched Hub sync.

Every completed question/answer pair is appended to ``LOG_FILE`` immediately.
The whole file is then uploaded to the dataset repo ``DATASET_REPO`` once
``FLUSH_EVERY`` new records have accumulated or ``FLUSH_SECONDS`` seconds have
passed since the last push, and one final time when the process exits.
"""

import atexit
import json
import os
import threading
import time
from datetime import datetime, timezone

from huggingface_hub import HfApi

# ========== config ==========
LOG_FILE = "/data/conversation_logs.jsonl"  # local, persists on Space VM
# private Dataset repo
DATASET_REPO = "chheplo/ThinkingDhenu-US-Extension-Preview-chat-log"
DATASET_PATH = "logs/conversation_logs-4B.jsonl"  # path inside repo
FLUSH_EVERY = 25  # push after this many new records
FLUSH_SECONDS = 180  # ...or after this many seconds, whichever comes first
# ============================

api = HfApi()
_token = os.environ["HF_TOKEN"]  # provided as Secret; fails fast if missing

_buf = []  # in-memory buffer; only its length is used (records since last push)
_last_push = time.time()
_lock = threading.Lock()  # guards _buf, _last_push scheduling and local writes
_push_lock = threading.Lock()  # serializes uploads so snapshots land in order


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Replaces the deprecated ``datetime.utcnow()`` while producing the same
    naive value, so formatted timestamps carry no UTC-offset suffix.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _save_local(rec: dict) -> None:
    """Append ``rec`` as one JSON line to ``LOG_FILE``.

    Called before any Hub upload is scheduled so the record is on disk even
    if the process dies before the next push.
    """
    os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        json.dump(rec, f, ensure_ascii=False)
        f.write("\n")


def _push_to_hub() -> None:
    """Upload the full local JSONL file to the dataset repo.

    Normally runs in a background thread; also called directly at shutdown.
    Failures are reported on stdout and swallowed (best-effort sync): the
    records stay in ``LOG_FILE`` and go out with the next successful push.
    """
    global _last_push
    if not os.path.exists(LOG_FILE):
        return  # nothing has been logged yet, so there is nothing to upload
    # One upload at a time: two overlapping pushes could otherwise commit an
    # older snapshot of the file after a newer one.
    with _push_lock:
        try:
            api.upload_file(
                path_or_fileobj=LOG_FILE,
                path_in_repo=DATASET_PATH,
                repo_id=DATASET_REPO,
                repo_type="dataset",
                token=_token,
                commit_message=(
                    f"chat-log sync {_utcnow():%Y-%m-%d %H:%M:%S} UTC"
                ),
            )  # single commit that replaces the file in the repo
        except Exception as e:
            print("[logger] Hub push failed:", e, flush=True)
        else:
            _last_push = time.time()


def _maybe_flush(force=False):
    """Start a background upload if a flush threshold has been reached.

    Every record is already in ``LOG_FILE`` (written by ``_save_local``), so
    the buffer is not written again here; it is only cleared to reset the
    record counter. Callers are expected to hold ``_lock``.

    Args:
        force: Upload regardless of the record-count and time thresholds.
    """
    global _last_push
    due = (
        force
        or len(_buf) >= FLUSH_EVERY
        or time.time() - _last_push >= FLUSH_SECONDS
    )
    if not due:
        return
    _buf.clear()  # nothing more to write
    # Reset the timer now so calls arriving while the upload is still in
    # flight do not each spawn another thread.
    _last_push = time.time()
    threading.Thread(target=_push_to_hub, daemon=True).start()


def log_qa(question: str, answer: str):
    """Record one completed question/answer exchange.

    Call this once per completed reply. The record is appended to the local
    log file first and then counted toward the next Hub push.

    Args:
        question: The user's message.
        answer: The reply that was produced for it.
    """
    rec = {
        "ts": _utcnow().isoformat(timespec="seconds"),
        "question": question,
        "answer": answer,
    }
    line = json.dumps(rec, ensure_ascii=False) + "\n"
    with _lock:
        _save_local(rec)  # write-ahead first: only count what is on disk
        _buf.append(line)  # fast, in-memory
        _maybe_flush()  # push if needed


def _flush_at_exit() -> None:
    """Upload the log one last time when the interpreter shuts down.

    The upload runs synchronously: a daemon thread started here would be
    killed as the interpreter exits (and recent Python versions refuse to
    start threads during shutdown), so a background push would never finish.
    """
    with _lock:
        _buf.clear()
    _push_to_hub()


# Ensure everything is uploaded when the Space shuts down
atexit.register(_flush_at_exit)