# logger.py: last commit 5648fa8 ("Update logger.py") by chheplo (verified).
# ------------------------------- settings ---------------------------------
# Local write-ahead log. It lives on the Space VM's /data volume, which persists.
LOG_FILE: str = "/data/conversation_logs.jsonl"
# Private Hub *dataset* repository that receives the uploaded log.
DATASET_REPO: str = "chheplo/ThinkingDhenu-US-Extension-Preview-chat-log"
# Destination of the log file inside DATASET_REPO.
DATASET_PATH: str = "logs/conversation_logs-4B.jsonl"
# Flush triggers, evaluated each time a record is logged; whichever fires first wins:
FLUSH_EVERY: int = 25      # ...this many records since the last push,
FLUSH_SECONDS: int = 180   # ...or this many seconds since the last push.
# ---------------------------------------------------------------------------
import atexit
import json
import os
import threading
import time
from datetime import datetime, timezone

from huggingface_hub import HfApi
# Hub client. The write token is handed to each upload call explicitly.
api = HfApi()
# Write token from the HF_TOKEN secret. Indexing (not .get) means a missing
# secret fails loudly with KeyError when this module is imported.
_token = os.environ["HF_TOKEN"]

# --- mutable logger state ---
_buf: list = []                   # records logged since the last push was scheduled
_last_push: float = time.time()   # when a push was last scheduled or completed
_lock = threading.Lock()          # serializes log_qa's write/count/flush sequence
def _save_local(rec: dict) -> None:
    """Append *rec* to LOG_FILE as a single JSON line.

    This is the write-ahead step. Every record reaches the local file before
    any Hub upload is attempted, so a failed upload or a process crash does not
    lose records already logged. There is no fsync, so an OS-level crash can
    still drop the most recent lines.

    Args:
        rec: JSON-serializable record to append.

    Raises:
        TypeError/ValueError: if *rec* is not JSON-serializable. In that case
            nothing is written to the file.
        OSError: if the directory or file cannot be created or written.
    """
    folder = os.path.dirname(LOG_FILE)
    # dirname() is "" for a bare filename, and os.makedirs("") would raise.
    if folder:
        os.makedirs(folder, exist_ok=True)
    # Serialize fully before touching the file. json.dump() streams chunks as
    # it encodes, so an encoding error part-way through would leave a
    # truncated, newline-less record that corrupts the next line.
    line = json.dumps(rec, ensure_ascii=False) + "\n"
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(line)
_push_lock = threading.Lock()  # serializes uploads so Hub commits land in order


def _push_to_hub():
    """Upload the complete lines of LOG_FILE to the private Hub dataset.

    This may run in a background thread or synchronously. The upload replaces
    DATASET_PATH in DATASET_REPO with a snapshot of the local file and updates
    ``_last_push`` on success.

    Failures are printed and otherwise ignored. The local file still holds
    every record, so the next push re-sends them.
    """
    global _last_push
    # Only one upload runs at a time. Each snapshot is read after the previous
    # upload has finished, so an older snapshot can never overwrite a newer one.
    with _push_lock:
        try:
            with open(LOG_FILE, "rb") as f:
                snapshot = f.read()
            # log_qa may be appending right now; keep only newline-terminated
            # records so the uploaded JSONL always parses.
            snapshot = snapshot[: snapshot.rfind(b"\n") + 1]
            if not snapshot:
                return
            api.upload_file(
                path_or_fileobj=snapshot,
                path_in_repo=DATASET_PATH,
                repo_id=DATASET_REPO,
                repo_type="dataset",
                token=_token,
                commit_message=f"chat-log sync {datetime.now(timezone.utc):%Y-%m-%d %H:%M:%S} UTC",
            )  # one commit that replaces the whole file
            _last_push = time.time()
        except FileNotFoundError:
            pass  # nothing has been logged yet, so there is nothing to upload
        except Exception as e:
            # Best-effort background sync: report the failure, never crash the app.
            print("[logger] Hub push failed:", e, flush=True)
def _maybe_flush(force=False):
    """Push the log to the Hub if a flush threshold has been reached.

    Both callers in this module (log_qa and the exit handler) hold ``_lock``
    when calling this.

    A push is due when *force* is true, when ``FLUSH_EVERY`` records have been
    buffered, or when ``FLUSH_SECONDS`` have elapsed since the last push. Every
    record is already on disk (written by ``_save_local``), so the buffer is
    only a counter and is simply cleared.

    Args:
        force: Push regardless of thresholds, and do it synchronously. This
            is used at interpreter exit, where a daemon thread would be killed
            before its upload finished.
    """
    global _last_push
    due = (
        force
        or len(_buf) >= FLUSH_EVERY
        or time.time() - _last_push >= FLUSH_SECONDS
    )
    if not due:
        return
    _buf.clear()  # records are already in LOG_FILE; nothing more to write
    # Reset the timer when the push is scheduled, not when it succeeds. This
    # stops records logged during a slow upload from each triggering a push.
    _last_push = time.time()
    if force:
        _push_to_hub()
    else:
        threading.Thread(target=_push_to_hub, daemon=True).start()
def log_qa(question: str, answer: str) -> None:
    """Record one question/answer exchange.

    Call this exactly once per completed reply. The record is appended to the
    local JSONL log immediately and is uploaded to the Hub in batches, as
    controlled by FLUSH_EVERY / FLUSH_SECONDS.

    Args:
        question: Question text to log.
        answer: Answer text to log.
    """
    # Naive-UTC "YYYY-MM-DDTHH:MM:SS", the same string the deprecated
    # datetime.utcnow() produced.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    rec = {
        "ts": now_utc.isoformat(timespec="seconds"),
        "question": question,
        "answer": answer,
    }
    with _lock:
        # Write to disk first. If that raises, the record is not counted
        # toward the next flush.
        _save_local(rec)
        _buf.append(rec)
        _maybe_flush()
@atexit.register
def _flush_on_exit():
    """Force a final Hub push when the interpreter shuts down.

    Takes ``_lock`` so no concurrent log_qa call can interleave with the final
    flush. The ``with`` statement releases the lock even if the flush raises.

    NOTE(review): atexit handlers run on normal interpreter exit only. They do
    not run if the process is killed by a signal Python does not handle (e.g.
    a bare SIGTERM). Confirm how the Space is stopped.
    """
    with _lock:
        _maybe_flush(force=True)