from __future__ import annotations

import os
import shlex
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Tuple


def extract_audio_ffmpeg(video_path: str, audio_out: Path, sr: int = 16000, mono: bool = True) -> str:
    """Extract the audio track from a video into a WAV file via ffmpeg and return the output path."""
    audio_out.parent.mkdir(parents=True, exist_ok=True)
    cmd = f'ffmpeg -y -i "{video_path}" -vn {"-ac 1" if mono else ""} -ar {sr} -f wav "{audio_out}"'
    subprocess.run(shlex.split(cmd), check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    return str(audio_out)


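# Example of the ffmpeg command extract_audio_ffmpeg assembles for a mono 16 kHz
# extraction (paths are illustrative only):
#   ffmpeg -y -i "input.mp4" -vn -ac 1 -ar 16000 -f wav "out/input.wav"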
def _get_video_duration_seconds(video_path: str) -> float:
    """Return the duration of the first video stream in seconds via ffprobe, or 0.0 on failure."""
    try:
        cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=duration -of default=nw=1 "{video_path}"'
        out = subprocess.check_output(shlex.split(cmd), stderr=subprocess.DEVNULL).decode("utf-8", errors="ignore")
        for line in out.splitlines():
            if line.startswith("duration="):
                try:
                    return float(line.split("=", 1)[1])
                except Exception:
                    pass
    except Exception:
        pass
    return 0.0


def diarize_audio(wav_path: str, base_dir: Path, hf_token_env: str | None = None) -> Tuple[List[Dict[str, Any]], List[str]]:
    """Return segments [{'start', 'end', 'speaker'}] and dummy clip_paths (not used in the MVP)."""
    segments: List[Dict[str, Any]] = []
    clip_paths: List[str] = []

    token = os.getenv("PYANNOTE_TOKEN") or (os.getenv(hf_token_env) if hf_token_env else os.getenv("HF_TOKEN"))
    try:
        if token:
            # Lazy import so the module loads even when pyannote.audio is not installed.
            from pyannote.audio import Pipeline
            pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=token)
            diarization = pipeline(wav_path)

            for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True)):
                segments.append({
                    "start": float(getattr(turn, "start", 0.0) or 0.0),
                    "end": float(getattr(turn, "end", 0.0) or 0.0),
                    "speaker": str(speaker) if speaker is not None else f"SPEAKER_{i:02d}",
                })
        else:
            # No token available: fall back to a single catch-all speaker segment.
            segments.append({"start": 0.0, "end": 0.0, "speaker": "SPEAKER_00"})
    except Exception:
        # Diarization failed (missing dependency, model download, etc.): same fallback.
        segments.append({"start": 0.0, "end": 0.0, "speaker": "SPEAKER_00"})

    segments = sorted(segments, key=lambda s: s.get("start", 0.0))
    return segments, clip_paths


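# Illustrative return value of diarize_audio when a pyannote token is configured
# (times and labels are made-up examples; clip_paths is always empty in the MVP):
#   ([{"start": 0.0, "end": 3.2, "speaker": "SPEAKER_00"},
#     {"start": 3.2, "end": 7.5, "speaker": "SPEAKER_01"}], [])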
def _fmt_srt_time(seconds: float) -> str:
    """Format seconds as an SRT timestamp (HH:MM:SS,mmm)."""
    # Work in whole milliseconds so rounding can never produce a ",1000" fraction.
    total_ms = int(round(max(seconds, 0.0) * 1000))
    h, rem = divmod(total_ms, 3_600_000)
    m, rem = divmod(rem, 60_000)
    s, ms = divmod(rem, 1000)
    return f"{h:02}:{m:02}:{s:02},{ms:03}"


def _generate_srt(segments: List[Dict[str, Any]], texts: List[str]) -> str:
    """Build an SRT document pairing diarization segments with transcript chunks."""
    n = min(len(segments), len(texts))
    lines: List[str] = []
    for i in range(n):
        seg = segments[i]
        text = (texts[i] or "").strip()
        start = float(seg.get("start", 0.0))
        end = float(seg.get("end", 0.0))
        if end <= start:
            # Guarantee a minimal cue duration when the end time is missing or zero.
            end = start + 2.0
        speaker = seg.get("speaker")
        if speaker:
            text = f"[{speaker}]: {text}" if text else f"[{speaker}]"
        lines.append(str(i + 1))
        lines.append(f"{_fmt_srt_time(start)} --> {_fmt_srt_time(end)}")
        lines.append(text)
        lines.append("")
    return "\n".join(lines).strip() + "\n"


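# Illustrative shape of a single cue emitted by _generate_srt (values are made up):
#
#   1
#   00:00:00,000 --> 00:00:02,000
#   [SPEAKER_00]: hello world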
def asr_transcribe_wav_simple(wav_path: str) -> str:
    """Robust ASR stub: try faster-whisper small if present; otherwise return empty text.

    Intended for the MVP on Spaces without a heavy GPU.
    """
    try:
        from faster_whisper import WhisperModel
        model = WhisperModel("Systran/faster-whisper-small", device="cpu")

        segments, _info = model.transcribe(wav_path, vad_filter=True, without_timestamps=True, language=None)
        text = " ".join(seg.text.strip() for seg in segments if getattr(seg, "text", None))
        return text.strip()
    except Exception:
        # Missing dependency, model download failure, or decode error: degrade to empty text.
        return ""


def generate(video_path: str, out_dir: Path) -> Dict[str, Any]:
    """End-to-end MVP that returns {'une_srt', 'free_text', 'artifacts': {...}}."""
    out_dir.mkdir(parents=True, exist_ok=True)
    wav_path = extract_audio_ffmpeg(video_path, out_dir / f"{Path(video_path).stem}.wav")

    # Speaker diarization (falls back to a single speaker when no token/model is available).
    segments, _ = diarize_audio(wav_path, out_dir, hf_token_env="HF_TOKEN")

    # Full transcript of the whole file (empty string when ASR is unavailable).
    free_text = asr_transcribe_wav_simple(wav_path)

    if not segments:
        segments = [{"start": 0.0, "end": 0.0, "speaker": "SPEAKER_00"}]
    texts: List[str] = []
    if len(segments) <= 1:
        texts = [free_text]
    else:
        # Naive split: distribute the transcript words evenly across the diarized segments.
        words = free_text.split()
        chunk = max(1, len(words) // len(segments))
        for i in range(len(segments)):
            start_idx = i * chunk
            end_idx = (i + 1) * chunk if i < len(segments) - 1 else len(words)
            texts.append(" ".join(words[start_idx:end_idx]))

    une_srt = _generate_srt(segments, texts)

    return {
        "une_srt": une_srt,
        "free_text": free_text,
        "artifacts": {
            "wav_path": str(wav_path),
        },
    }
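

# Minimal usage sketch (assumptions: ffmpeg is on PATH and a local "sample.mp4"
# exists; the file name and output directory below are illustrative only).
if __name__ == "__main__":
    result = generate("sample.mp4", Path("outputs"))
    print(result["une_srt"])
    print("WAV written to:", result["artifacts"]["wav_path"])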