import os import json import uuid import time from pathlib import Path import numpy as np from datetime import datetime import random from dotenv import load_dotenv import boto3 import gradio as gr import soundfile as sf from werkzeug.security import generate_password_hash, check_password_hash from supabase import create_client, Client # =============================== # CONFIG & GLOBALS # =============================== import os os.system("pip uninstall -y gradio") os.system("pip install gradio==5.29.1") load_dotenv() BASE_DIR = Path(__file__).parent if "__file__" in globals() else Path(".").resolve() DATA_DIR = Path.home() / ".tts_dataset_creator" USERS_ROOT = DATA_DIR / "users" DATA_DIR.mkdir(parents=True, exist_ok=True) USERS_ROOT.mkdir(parents=True, exist_ok=True) AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY", "") AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY", "") S3_BUCKET = os.environ.get("S3_BUCKET", "voicer-storage") AWS_REGION = os.environ.get("AWS_REGION", "me-south-1") SUPABASE_URL = os.environ.get("SUPABASE_URL", "") SUPABASE_KEY = os.environ.get("SUPABASE_KEY", "") if not SUPABASE_URL or not SUPABASE_KEY: print("⚠️ Supabase env vars not set") supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) if SUPABASE_URL and SUPABASE_KEY else None def _create_s3_client(): aws_access_key = os.environ.get("AWS_ACCESS_KEY", "") aws_secret_key = os.environ.get("AWS_SECRET_KEY", "") if not aws_access_key or not aws_secret_key: print("Using IAM role or instance profile for S3") return boto3.client("s3", region_name=AWS_REGION) print("Using explicit access keys for S3") return boto3.client( "s3", aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key, region_name=AWS_REGION, ) S3_CLIENT = _create_s3_client() # =============================== # COUNTRIES & DIALECTS # =============================== AVAILABLE_COUNTRIES = [ "Egypt", "Saudi Arabia", "Morocco" ] COUNTRY_EMOJIS = { "dz": "🇩🇿", # Algeria "bh": "🇧🇭", # Bahrain "eg": "🇪🇬", # Egypt "iq": "🇮🇶", # Iraq "jo": "🇯🇴", # Jordan "kw": "🇰🇼", # Kuwait "lb": "🇱🇧", # Lebanon "ly": "🇱🇾", # Libya "mr": "🇲🇷", # Mauritania "ma": "🇲🇦", # Morocco "om": "🇴🇲", # Oman "ps": "🇵🇸", # Palestine "qa": "🇶🇦", # Qatar "sa": "🇸🇦", # Saudi Arabia "so": "🇸🇴", # Somalia "sd": "🇸🇩", # Sudan "sy": "🇸🇾", # Syria "tn": "🇹🇳", # Tunisia "ae": "🇦🇪", # United Arab Emirates "ye": "🇾🇪", # Yemen } RECORDING_TARGET_MINUTES = 30 # target total recording time per user RECORDING_TARGET_SECONDS = RECORDING_TARGET_MINUTES * 60 COUNTRY_CODES = { "Algeria": "dz", "Bahrain": "bh", "Egypt": "eg", "Iraq": "iq", "Jordan": "jo", "Kuwait": "kw", "Lebanon": "lb", "Libya": "ly", "Mauritania": "mr", "Morocco": "ma", "Oman": "om", "Palestine": "ps", "Qatar": "qa", "Saudi Arabia": "sa", "Somalia": "so", "Sudan": "sd", "Syria": "sy", "Tunisia": "tn", "United Arab Emirates": "ae", "Yemen": "ye" } COUNTRY_DIALECTS = { "Saudi Arabia": { "حجازية": "hj", "حجازية بدوية": "hj-bd", "جنوبية": "jn", "تهامية": "th", "نجدية": "nj", "نجدية بدوية": "nj-bd", "قصيمية": "qm", "الشمال": "sh", "حساوية": "hs", "قطيفية": "qt", "سيهاتية": "sy", "أخرى": "oth" }, "Egypt": { "قاهرية": "ca", "إسكندرانية": "al", "صعيدية": "sa", "بورسعيدية": "si", "نوبية": "nb", "أخرى": "oth" }, "Morocco": { "فاسية": "fe", "دار البيضاء": "ca", "مراكشية": "ma", "شمالية": "no", "شرقية": "shar", "أخرى": "oth" }, "Iraq": { "بغدادية": "ba", "بصراوية": "bs", "موصلية": "mo", "كردية": "ku", "جنوبية": "so", "أخرى": "oth" }, "Yemen": { "صنعانية": "sa", "عدنية": "ad", "حضرمية": "ha", "تهامية": "ti", "أخرى": "oth" }, "Jordan": { "عمانية": "am", "شمالية": "no", "جنوبية": "so", "بدوية": "be", "أخرى": "oth" }, "Lebanon": { "بيروتية": "be", "جبلية": "mo", "جنوبية": "so", "شمالية": "no", "أخرى": "oth" }, "Syria": { "دمشقية": "da", "حلبية": "al", "حمصية": "ho", "ساحلية": "co", "أخرى": "oth" }, "Palestine": { "قدسية": "je", "غزاوية": "ga", "خليلية": "he", "شمالية": "no", "أخرى": "oth" }, "United Arab Emirates": { "إماراتية": "em", "دبية": "du", "أبوظبية": "ad", "شارقية": "shr", "أخرى": "oth" }, "Kuwait": { "كويتية": "ku", "بدوية": "be", "أخرى": "oth" }, "Qatar": { "قطرية": "qa", "بدوية": "be", "أخرى": "oth" }, "Bahrain": { "بحرينية": "ba", "مدنية": "ur", "أخرى": "oth" }, "Oman": { "عمانية": "om", "ظفارية": "dh", "داخلية": "in", "أخرى": "oth" }, "Algeria": { "جزائرية": "al", "قسنطينية": "co", "وهرانية": "or", "قبائلية": "ka", "أخرى": "oth" }, "Tunisia": { "تونسية": "tu", "صفاقسية": "sf", "سوسية": "so", "أخرى": "oth" }, "Libya": { "طرابلسية": "tr", "بنغازية": "be", "فزانية": "fe", "أخرى": "oth" }, "Sudan": { "خرطومية": "kh", "شمالية": "no", "دارفورية": "da", "أخرى": "oth" }, "Somalia": { "صومالية": "so", "شمالية": "no", "جنوبية": "so", "أخرى": "oth" }, "Mauritania": { "موريتانية": "mr", "حسانية": "ha", "أخرى": "oth" } } RECORDING_INSTRUCTIONS = """
### تعليمات التسجيل 1. **البيئة**: سجّل في مكان هادئ قد ما تقدر، وحاول ما يكون فيه ضوضاء أو أصوات في الخلفية. 2. **الميكروفون**: يفضّل تستخدم مايك سماعة أو مايك خارجي، لأنه غالبًا بيكون أوضح بكثير من مايك اللابتوب. في حالة استخدام الجوال يمكن فقط التأكد من جودة التسجيل قبل الإكمال. 3. **طريقة التحدث**: اقرأ الجملة بصوت واضح وطبيعي، وبلهجتك. لا تغيّر أو تستبدل أي كلمة أبدًا، إلا لو كان فيه اختلاف بالنطق مثل: "ثلاثة" و"تلاتة" — هذا عادي. إذا حسّيت إنك ما تبغى تسجل جملة معينة أو ما عرفت تنطقها، عادي اضغط "Skip". 4. **التعديل**: تقدر تعدل الجملة قبل لا تسجل إذا ودك. 5. **الحفظ**: بعد ما تسجل، اضغط "Save & Next" عشان تحفظ تسجيلك. إذا ودك تعيد، استخدم "Discard"، أو اضغط "Skip" عشان تروح للجملة اللي بعدها. 6. **المدة**: حاول تسجل عدد كافي من الجمل، كل تسجيل يساعدنا أكثر! حاول يكون مجموع تسجيلاتك على الأقل 30 دقيقة، ونقدّر وقتك وجهدك إذا عندك أي مشكلة أو استفسار، تواصل معي على الإيميل: a.a.elghawas@gmail.com
""" CONSENT_DETAILS = """

الموافقة على جمع واستخدام البيانات

هذه الاتفاقية بين المشارك وفريق البحث من جامعة الملك فهد للبترول والمعادن وجامعة طيبة (والتي سنشير إليها فيما يلي بـ "الجامعتين"). الهدف من الاتفاقية هو جمع واستخدام وتوزيع تسجيلات صوتية لدعم أبحاث كشف الأصوات المزيفة (Deepfake) وغيرها من الأبحاث غير التجارية.

  1. هدف جمع البيانات:
    يقوم الفريق بجمع تسجيلات صوتية لإنشاء مجموعة بيانات (Dataset) خاصة بالكشف عن الأصوات المصنعة بالذكاء الاصطناعي باستخدام تقنيات تحويل النص إلى صوت (TTS) أو تقليد الأصوات (Voice Conversion) وطرق أخرى. ستُستخدم هذه البيانات في أبحاث علمية وأكاديمية لتطوير طرق أفضل لاكتشاف الأصوات المزيفة وغيرها من الأبحاث غير التجارية.
  2. طبيعة البيانات التي سيتم جمعها:
    يوافق المشارك على تقديم:
    • تسجيلات صوتية بصوته الطبيعي أو من خلال نصوص/جمل يطلب منه قراءتها.
    • بيانات اختيارية مثل: النوع (ذكر/أنثى)، الفئة العمرية، اللهجة، وغيرها.
    • موافقة على إمكانية تعديل صوته أو تغييره باستخدام أساليب صناعية.
  3. الحقوق الممنوحة:
    يمنح المشارك الفريق الحق الكامل (بدون مقابل مالي أو قيود) في:
    • تسجيل ومعالجة واستخدام الصوت الطبيعي والنسخ المصنعة منه.
    • توزيع مجموعة البيانات (الطبيعية والمصنعة) للباحثين في المجتمع العلمي لأغراض بحثية غير تجارية فقط.
    • نشر عينات صوتية على منصات مهنية أو أكاديمية مثل LinkedIn، X/Twitter، YouTube لتعزيز الوعي بأبحاث الديب فيك أو للإعلان عن توفر البيانات.
  4. إتاحة البيانات:
    سيتم نشر المجموعة الصوتية (الطبيعية والمصنعة) بترخيص مفتوح (Creative Commons Attribution 4.0) مما يسمح لأي باحث باستخدامها ومشاركتها لأغراض أكاديمية غير تجارية.
  5. الخصوصية والسرية:
    • لن يتم نشر اسم المشارك أو أي بيانات شخصية مباشرة إلا بموافقته المكتوبة.
    • سيكون للمشارك معرف (ID) مجهول داخل مجموعة البيانات.
  6. المشاركة والانضمام:
    • المشاركة اختيارية 100٪.
    • للمشارك الحق في الانسحاب أو طلب حذف تسجيلاته قبل نشر مجموعة البيانات للعامة.
    • بعد النشر العام، سحب البيانات لن يكون ممكنًا بسبب طريقة توزيعها.
  7. التعويض:
    يدرك المشارك أن المشاركة لا تتضمن أي مقابل مادي، والمساهمة هنا لدعم وتطوير البحث العلمي فقط.
""" AGES = [ "4–9", # baby "10–14", # child "15–19", # teen "20–24", # young adult "25–34", # adult "35–44", # mid-age adult "45–54", # older adult "55–64", # senior "65–74", # elderly "75–84", # aged "85+" # very aged ] GENDER = [ "ذكر", "أنثى" ] def get_dialects_for_country(country: str): dialects = list(COUNTRY_DIALECTS.get(country, {}).keys()) if not dialects: return ["أخرى"] return dialects def split_dialect_code(dialect_code: str): dialect_code = (dialect_code or "").strip().lower() or "unk-gen" parts = dialect_code.split("-", 1) if len(parts) == 2: return parts[0], parts[1] return parts[0], "gen" # =============================== # SENTENCES (per-country, cached) # =============================== SENTENCES_CACHE = {} # {country_code: [(id, text, [dialects]), ...]} def get_sentences_file_for_country(country_code: str) -> Path: """ Return the path to the sentences file for a given country code, e.g. 'eg' -> BASE_DIR / 'sentences_eg.json'. """ return BASE_DIR / f"sentences_{country_code}.json" def load_sentences_for_country(country_code: str): """ Load and cache all sentences for a given country code. Expected JSON structure: { "sentences": [ { "unique_id": "105130", "text": "...", "dialect": ["eg-ca", "eg-al", ...] }, ... ] } """ if country_code in SENTENCES_CACHE: return SENTENCES_CACHE[country_code] path = get_sentences_file_for_country(country_code) # If missing, initialise an empty file (or you can raise an error if you prefer) if not path.exists(): path.write_text( json.dumps({"sentences": []}, ensure_ascii=False, indent=2), encoding="utf-8" ) data = json.loads(path.read_text(encoding="utf-8")) raw_sentences = data.get("sentences", []) SENTENCES_CACHE[country_code] = [ (s["unique_id"], s["text"], s.get("dialect", [])) for s in raw_sentences ] return SENTENCES_CACHE[country_code] def filter_sentences(dialect_code: str, completed_ids): """ Return all (sentence_id, text) pairs for a given dialect_code, excluding any sentence IDs in completed_ids. - dialect_code looks like 'sa-hj', 'eg-ca', etc. - We infer the country_code ('sa', 'eg', ...) from dialect_code, then load the corresponding sentences_{country_code}.json. """ completed_set = set(completed_ids or []) country_code, _ = split_dialect_code(dialect_code) all_sentences = load_sentences_for_country(country_code) return [ (sid, text) for sid, text, dialects in all_sentences if sid not in completed_set and dialect_code in dialects ] # =============================== # AUTH / SUPABASE # =============================== def get_user_by_email(email: str): if not supabase: return None try: resp = supabase.table("users").select("*").eq("email", email.lower()).execute() return resp.data[0] if resp.data else None except Exception as e: print("get_user_by_email error:", e) return None def get_user_by_username(username: str): if not supabase: return None try: resp = supabase.table("users").select("*").eq("username", username).execute() return resp.data[0] if resp.data else None except Exception as e: print("get_user_by_username error:", e) return None def create_user(name: str, email: str, password: str, country: str, dialect_label: str, gender: str, age: str): if not supabase: return False, "Supabase not configured" email = email.lower() if get_user_by_email(email): return False, "Email already registered" base = name.strip().replace(" ", "_").lower() or "user" country_code = COUNTRY_CODES.get(country, "unk") dialect_map = COUNTRY_DIALECTS.get(country, {}) dialect_code_raw = dialect_map.get(dialect_label, "oth") dialect_code = f"{country_code}-{dialect_code_raw}" username = f"{base}_{uuid.uuid4().hex[:7]}_{dialect_code}_{'m' if gender == 'ذكر' else 'f'}" hashed_pw = generate_password_hash(password) payload = { "username": username, "name": name, "email": email, "password": hashed_pw, "country": country, "dialect_code": dialect_code, "gender": gender, "age": age, "created_at": datetime.utcnow().isoformat(), } try: resp = supabase.table("users").insert(payload).execute() if resp.data: supabase.table("sessions").insert({ "username": username, "completed_sentences": [], "total_recording_duration": 0.0, "updated_at": datetime.utcnow().isoformat(), }).execute() return True, username return False, "Failed to insert user" except Exception as e: print("create_user error:", e) return False, f"Registration failed: {e}" def authenticate(email: str, password: str): if not supabase: return False, "Supabase not configured" user = get_user_by_email(email) if not user or not check_password_hash(user.get("password", ""), password): return False, "Invalid email or password" return True, user["username"] def create_password_reset_token(email: str): if not supabase: return False, "Supabase not configured" user = get_user_by_email(email) if not user: return False, "Email not found" token = uuid.uuid4().hex payload = { "email": email.lower(), "token": token, "created_at": datetime.utcnow().isoformat(), } try: supabase.table("password_resets").insert(payload).execute() return True, token except Exception as e: # nice clean message instead of raw dict print("create_password_reset_token error:", e) return False, "Password reset is not configured on the server (missing password_resets table)." def reset_password_with_token(token: str, new_password: str): if not supabase: return False, "Supabase not configured" try: resp = supabase.table("password_resets").select("*").eq("token", token).execute() rows = resp.data or [] if not rows: return False, "Invalid or expired token" row = rows[0] email = row["email"] user = get_user_by_email(email) if not user: return False, "User not found" hashed_pw = generate_password_hash(new_password) supabase.table("users").update({"password": hashed_pw}).eq("email", email).execute() supabase.table("password_resets").delete().eq("token", token).execute() return True, "Password updated successfully" except Exception as e: print("reset_password_with_token error:", e) return False, "Password reset is not fully configured on the server." def load_session(username: str): if not supabase: return {"completed_sentences": [], "total_recording_duration": 0.0} try: resp = supabase.table("sessions").select("*").eq("username", username).execute() if resp.data: row = resp.data[0] return { "completed_sentences": row.get("completed_sentences", []) or [], "total_recording_duration": float(row.get("total_recording_duration", 0.0) or 0.0), } except Exception as e: print("load_session error:", e) return {"completed_sentences": [], "total_recording_duration": 0.0} def save_session(username: str, completed_sentences, total_duration: float): if not supabase: return try: supabase.table("sessions").upsert({ "username": username, "completed_sentences": completed_sentences, "total_recording_duration": total_duration, "updated_at": datetime.utcnow().isoformat(), }).execute() except Exception as e: print("save_session error:", e) # =============================== # STORAGE / AUDIO # =============================== def ensure_user_dirs(username: str, dialect_code: str): country_code, dialect = split_dialect_code(dialect_code) user_dir = USERS_ROOT / country_code / dialect / username (user_dir / "wavs").mkdir(parents=True, exist_ok=True) (user_dir / "txt").mkdir(parents=True, exist_ok=True) return user_dir def validate_audio(audio_path: str): try: with sf.SoundFile(audio_path) as f: duration = len(f) / f.samplerate if f.samplerate < 16000: return False, f"Sample rate too low: {f.samplerate} Hz", duration if duration < 1.0: return False, "Recording too short", duration return True, "OK", duration except Exception as e: return False, f"Audio error: {e}", None def upload_file_to_s3(local_path: Path, s3_key: str): if not S3_CLIENT or not S3_BUCKET: print("S3 not configured, skipping upload:", s3_key) return False try: S3_CLIENT.upload_file(str(local_path), S3_BUCKET, s3_key) return True except Exception as e: print("upload_file_to_s3 error:", e) return False def save_recording_and_upload(username: str, dialect_code: str, sentence_id: str, sentence_text: str, audio_path: str): """ Local: ~/.tts_dataset_creator/users/{country}/{dialect}/{username}/wavs/{country}_{dialect}_{username}_{sentence}.wav S3 (country-level folder only): {country_code}/{username}/wavs/{country}_{dialect}_{username}_{sentence}.wav {country_code}/{username}/metadata.csv """ user_dir = ensure_user_dirs(username, dialect_code) wav_dir = user_dir / "wavs" meta_file = user_dir / "metadata.csv" if not meta_file.exists(): meta_file.write_text("audio_file|text\n", encoding="utf-8") country_code, dialect = split_dialect_code(dialect_code) filename = f"{username}_{sentence_id}.wav" dest = wav_dir / filename Path(audio_path).replace(dest) try: with sf.SoundFile(dest) as f: duration = len(f) / f.samplerate except Exception: duration = 0.0 with meta_file.open("a", encoding="utf-8") as f: f.write(f"{filename}|{sentence_text.strip()}\n") base_prefix = f"{country_code}/{username}" upload_file_to_s3(dest, f"{base_prefix}/wavs/{filename}") upload_file_to_s3(meta_file, f"{base_prefix}/metadata.csv") return duration def make_progress_bar(current_seconds: float, target_seconds: float, bar_length: int = 20) -> str: """ Text progress bar based on time. Example: [████████░░░░░░░░░░] 40.0% """ if target_seconds <= 0: bar = "░" * bar_length return f"[{bar}] 0.0%" ratio = current_seconds / target_seconds ratio = max(0.0, min(1.0, ratio)) # clamp 0–1 filled = int(bar_length * ratio) bar = "█" * filled + "░" * (bar_length - filled) return f"[{bar}] {ratio * 100:.1f}%" def compute_progress(completed_count: int, total_duration: float): """ Progress based on total recording time vs RECORDING_TARGET_SECONDS. """ bar = make_progress_bar(total_duration, RECORDING_TARGET_SECONDS) mins = int(total_duration // 60) secs = int(total_duration % 60) target_mins = int(RECORDING_TARGET_SECONDS // 60) # Example: # [██████░░░░░░░░░░░░] 30.0% # 10m 43s / 30m target • 294 sentences return f"{bar}\n{mins}m {secs}s / {target_mins}m target • {completed_count} sentences" # =============================== # GRADIO APP (3 PAGES) # =============================== def build_app(): with gr.Blocks(title="Arabic Speech Recorder") as demo: state = gr.State({ "logged_in": False, "username": None, "dialect_code": None, "completed_sentences": [], "total_duration": 0.0, "current_sentence_id": "", "current_sentence_text": "", }) gr.Markdown("""

🗣️ Arabic Speech Dataset Recorder | مسجّل مجموعة البيانات الصوتية العربية 🎤

منصة لجمع تسجيلات صوتية من مختلف اللهجات العربية لدعم البحث العلمي في كشف الأصوات المزيفة وتقنيات الذكاء الاصطناعي الصوتية.

""") # ---------- LOGIN PAGE ---------- with gr.Column(visible=True) as login_view: gr.Markdown("### تسحيل الدخول") login_email = gr.Textbox(label="Email") login_pw = gr.Textbox(label="Password", type="password") login_btn = gr.Button("تسجيل الدخول", variant="primary") login_msg = gr.Markdown("") goto_register_btn = gr.Button("إنشاء حساب جديد") with gr.Accordion("Forgot password?", open=False, visible=False): fp_email = gr.Textbox(label="Email") fp_btn = gr.Button("Create reset token") fp_output = gr.Markdown("") rp_token = gr.Textbox(label="Reset token") rp_new_pw = gr.Textbox(label="New password", type="password") rp_btn = gr.Button("Reset password") rp_output = gr.Markdown("") # ---------- REGISTER PAGE ---------- with gr.Column(visible=False) as register_view: gr.Markdown("### إنشاء حساب جديد") reg_name = gr.Textbox(label="Name (Latin)") reg_email = gr.Textbox(label="Email") reg_pw = gr.Textbox(label="Password", type="password") reg_country = gr.Dropdown(choices=AVAILABLE_COUNTRIES, value="Saudi Arabia", label="Country") default_dialects = get_dialects_for_country("Saudi Arabia") reg_dialect = gr.Dropdown( choices=default_dialects, value=None, # user must choose label="Dialect" ) reg_gender = gr.Dropdown( choices=GENDER, value=None, # user must choose label="Gender" ) reg_age = gr.Dropdown( choices=AGES, value=None, # user must choose label="Age Group" ) with gr.Accordion("إتفاقية التسجيل بالموقع واستخدام البيانات", open=True, visible=True): inst_output = gr.Markdown(CONSENT_DETAILS) reg_btn = gr.Button("إنشاء حساب", variant="primary") reg_msg = gr.Markdown("") back_to_login_btn = gr.Button("الرجوع لتسجيل الدخول") # ---------- MAIN PAGE ---------- with gr.Column(visible=False) as main_view: info = gr.Markdown("") logout_btn = gr.Button("تسجيل الخروج") with gr.Accordion("تعليمات مهمة للتسجيل", open=True, visible=True): rec_inst_output = gr.Markdown(RECORDING_INSTRUCTIONS) username_box = gr.Textbox(label="👤 Username", interactive=False, visible=False) progress_box = gr.Textbox(label="📊 الإنجاز", interactive=False) sentence_box = gr.Textbox(label="✍️الجملة (يمكنك تعديل الجملة)", interactive=True, lines=3) sentence_id_box = gr.Textbox(label="Sentence ID", interactive=False, visible=False) # 👇 give the audio component a stable DOM id audio_rec = gr.Audio( sources=["microphone"], type="filepath", label="Record", format="wav", ) temp_audio_path = gr.Textbox(label="Temp audio path", visible=False) save_btn = gr.Button("Save & Next", variant="primary", interactive=False) skip_btn = gr.Button("Skip") msg_box = gr.Markdown("") # ---------- Navigation helpers ---------- def show_register(): return ( gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), ) def show_login(): return ( gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), ) def show_main(): return ( gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), ) def on_start_recording(): """ Called when the user starts recording. We can use this to clear any previous temp audio path. """ return gr.update(interactive=False), gr.update(interactive=False) audio_rec.start_recording( fn=on_start_recording, outputs=[save_btn, skip_btn], ) def on_stop_recording(audio_path, st): """ Called when the user stops recording. For type="filepath", `audio_path` is a string path to the WAV on the server. """ if not audio_path: # nothing recorded return st, "", gr.update(value=None), gr.update(interactive=True), gr.update(interactive=True) # Store for later use if you want st["last_temp_audio_path"] = audio_path print("Stored temp audio at:", audio_path) time.sleep(1) # simulate processing delay / UX return ( st, audio_path, # -> temp_audio_path Textbox gr.update(value=audio_path), # set Audio value to that file (preview uses file) gr.update(interactive=True), # re-enable Save gr.update(interactive=True), # re-enable Skip ) audio_rec.stop_recording( fn=on_stop_recording, inputs=[audio_rec, state], outputs=[state, temp_audio_path, audio_rec, save_btn, skip_btn], ) def on_clear(): """ Called when the user clears the recording. We can use this to clear any previous temp audio path. """ return gr.update(interactive=False) audio_rec.clear( fn=on_clear, outputs=[save_btn], ) goto_register_btn.click( show_register, inputs=[], outputs=[login_view, register_view, main_view], ) back_to_login_btn.click( show_login, inputs=[], outputs=[login_view, register_view, main_view], ) # ---------- Register callbacks ---------- def update_dialects(country): dialects = get_dialects_for_country(country) # IMPORTANT FIX: don't try to set a default value; let user choose return gr.update(choices=dialects, value=None) reg_country.change( update_dialects, inputs=reg_country, outputs=reg_dialect ) def do_register(name, email, pw, country, dialect_label, gender, age, st): if not all([name, email, pw, country, dialect_label, gender, age]): return ( st, "❌ Please fill all fields", gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), ) ok, result = create_user(name, email, pw, country, dialect_label, gender, age) if not ok: return ( st, f"❌ {result}", gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), ) return ( st, "✅ Registered successfully. You can now login.", gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), ) reg_btn.click( do_register, inputs=[reg_name, reg_email, reg_pw, reg_country, reg_dialect, reg_gender, reg_age, state], outputs=[state, reg_msg, login_view, register_view, main_view], ) # ---------- Login + password reset ---------- def do_login(email, pw, st): ok, result = authenticate(email, pw) if not ok: return ( st, f"❌ {result}", "", "", "", "", "", gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), ) username = result user = get_user_by_username(username) dialect_code = user.get("dialect_code", "sa-hj") if user else "sa-hj" sess = load_session(username) completed = sess["completed_sentences"] total_dur = sess["total_recording_duration"] available = filter_sentences(dialect_code, completed) if not available: sentence_id = "" sentence_text = "No more sentences for your dialect." else: sentence_id, sentence_text = random.choice(available) st.update({ "logged_in": True, "username": username, "dialect_code": dialect_code, "completed_sentences": completed, "total_duration": total_dur, "current_sentence_id": sentence_id, "current_sentence_text": sentence_text, }) country = dialect_code.split("-", 1)[0] progress = compute_progress(len(completed), total_dur) username_show = " ".join(username.split("_")[:-3]).title() info_text = f"## **{username_show}** ({COUNTRY_EMOJIS[country]} {COUNTRY_EMOJIS[country]}) " return ( st, "", info_text, username, progress, sentence_text, sentence_id, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), ) login_btn.click( do_login, inputs=[login_email, login_pw, state], outputs=[ state, login_msg, info, username_box, progress_box, sentence_box, sentence_id_box, login_view, register_view, main_view, ], ) def do_forget_password(email): if not email: return "Please enter your email." ok, msg = create_password_reset_token(email) if not ok: return f"❌ {msg}" return f"✅ Reset token (dev mode): `{msg}`" fp_btn.click(do_forget_password, inputs=[fp_email], outputs=[fp_output]) def do_reset_password(token, new_pw): if not token or not new_pw: return "Please provide token and new password." ok, msg = reset_password_with_token(token, new_pw) return ("✅ " if ok else "❌ ") + msg rp_btn.click(do_reset_password, inputs=[rp_token, rp_new_pw], outputs=[rp_output]) # ---------- Main page logic ---------- def next_sentence_for_state(st): available = filter_sentences(st["dialect_code"], st["completed_sentences"]) if not available: st["current_sentence_id"] = "" st["current_sentence_text"] = "No more sentences." else: sid, text = random.choice(available) st["current_sentence_id"] = sid st["current_sentence_text"] = text def handle_save(audio_path, edited_sentence, temp_path, st): if not st.get("logged_in"): progress = compute_progress(len(st["completed_sentences"]), st["total_duration"]) return st, "Please login first.", st["current_sentence_text"], st["current_sentence_id"], progress, gr.update(value=None), gr.update(interactive=True) if not audio_path and not temp_path: progress = compute_progress(len(st["completed_sentences"]), st["total_duration"]) return st, "⚠️ Record audio first.", st["current_sentence_text"], st["current_sentence_id"], progress, gr.update(value=None), gr.update(interactive=True) sentence_text = (edited_sentence or st["current_sentence_text"]).strip() if not sentence_text: progress = compute_progress(len(st["completed_sentences"]), st["total_duration"]) return st, "⚠️ Sentence text is empty.", st["current_sentence_text"], st["current_sentence_id"], progress, gr.update(value=None), gr.update(interactive=True) sid = st["current_sentence_id"] if not sid: progress = compute_progress(len(st["completed_sentences"]), st["total_duration"]) return st, "⚠️ No active sentence.", st["current_sentence_text"], st["current_sentence_id"], progress, gr.update(value=None), gr.update(interactive=True) # Choose which filepath to use: # 1) Prefer current audio_rec value (audio_path) # 2) Fallback to temp_path from stop_recording tmp_path = audio_path or temp_path if not tmp_path: progress = compute_progress(len(st["completed_sentences"]), st["total_duration"]) return st, "❌ Could not find recorded audio.", st["current_sentence_text"], st["current_sentence_id"], progress, gr.update(value=None), gr.update(interactive=True) ok, msg, _dur = validate_audio(tmp_path) if not ok: progress = compute_progress(len(st["completed_sentences"]), st["total_duration"]) return st, f"❌ Audio error: {msg}", st["current_sentence_text"], st["current_sentence_id"], progress, gr.update(value=None), gr.update(interactive=True) duration = save_recording_and_upload( st["username"], st["dialect_code"], sid, sentence_text, tmp_path, ) st["total_duration"] += duration if sid not in st["completed_sentences"]: st["completed_sentences"].append(sid) save_session(st["username"], st["completed_sentences"], st["total_duration"]) next_sentence_for_state(st) progress = compute_progress(len(st["completed_sentences"]), st["total_duration"]) return ( st, "✅ Saved", st["current_sentence_text"], st["current_sentence_id"], progress, gr.update(value=None), # clear audio UI if you want gr.update(interactive=True), ) def disable_skip(): return gr.update(interactive=False) save_btn.click( disable_skip, inputs=[], outputs=[skip_btn], ).then( handle_save, inputs=[audio_rec, sentence_box, temp_audio_path, state], outputs=[state, msg_box, sentence_box, sentence_id_box, progress_box, audio_rec, skip_btn], ) def handle_skip(st): if not st.get("logged_in"): progress = compute_progress(len(st["completed_sentences"]), st["total_duration"]) return st, "Please login first.", st["current_sentence_text"], st["current_sentence_id"], progress, gr.update(value=None) , gr.update(interactive=True) sid = st["current_sentence_id"] if sid and sid not in st["completed_sentences"]: st["completed_sentences"].append(sid) save_session(st["username"], st["completed_sentences"], st["total_duration"]) next_sentence_for_state(st) progress = compute_progress(len(st["completed_sentences"]), st["total_duration"]) return st, "Skipped.", st["current_sentence_text"], st["current_sentence_id"], progress, gr.update(value=None), gr.update(interactive=True) def disable_save(): return gr.update(interactive=False) skip_btn.click( disable_save, inputs=[], outputs=[save_btn], ).then( handle_skip, inputs=[state], outputs=[state, msg_box, sentence_box, sentence_id_box, progress_box, audio_rec, save_btn], ) def do_logout(st): st.update({ "logged_in": False, "username": None, "dialect_code": None, "completed_sentences": [], "total_duration": 0.0, "current_sentence_id": "", "current_sentence_text": "", }) return ( st, "", "", "", "", gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), ) logout_btn.click( do_logout, inputs=[state], outputs=[ state, info, username_box, progress_box, msg_box, login_view, register_view, main_view, ], ) return demo if __name__ == "__main__": port = int(os.environ.get("GRADIO_SERVER_PORT", 7860)) app = build_app() app.queue() app.launch( server_name="0.0.0.0", server_port=port, debug=False, ) # ===============================