from __future__ import annotations

import os
import shlex
import subprocess
from pathlib import Path
from typing import Dict, Any, List, Tuple, Optional
def extract_audio_ffmpeg(video_path: str, audio_out: Path, sr: int = 16000, mono: bool = True) -> str:
    """Extract the audio track from video_path to audio_out as a WAV file using ffmpeg."""
    audio_out.parent.mkdir(parents=True, exist_ok=True)
    cmd = f'ffmpeg -y -i "{video_path}" -vn {"-ac 1" if mono else ""} -ar {sr} -f wav "{audio_out}"'
    subprocess.run(shlex.split(cmd), check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    return str(audio_out)
def _get_video_duration_seconds(video_path: str) -> float:
    try:
        cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=duration -of default=nw=1 "{video_path}"'
        out = subprocess.check_output(shlex.split(cmd), stderr=subprocess.DEVNULL).decode("utf-8", errors="ignore")
        for line in out.splitlines():
            if line.startswith("duration="):
                try:
                    return float(line.split("=", 1)[1])
                except Exception:
                    pass
    except Exception:
        pass
    return 0.0
def diarize_audio(wav_path: str, base_dir: Path, hf_token_env: str | None = None) -> Tuple[List[Dict[str, Any]], List[str]]:
    """Return segments [{'start','end','speaker'}] and dummy clip_paths (not used in the MVP)."""
    segments: List[Dict[str, Any]] = []
    clip_paths: List[str] = []

    token = os.getenv("PYANNOTE_TOKEN") or (os.getenv(hf_token_env) if hf_token_env else os.getenv("HF_TOKEN"))
    try:
        if token:
            from pyannote.audio import Pipeline

            pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=token)
            diarization = pipeline(wav_path)

            for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True)):
                segments.append({
                    "start": float(getattr(turn, "start", 0.0) or 0.0),
                    "end": float(getattr(turn, "end", 0.0) or 0.0),
                    "speaker": str(speaker) if speaker is not None else f"SPEAKER_{i:02d}",
                })
        else:
            # No HF token available: fall back to a single placeholder segment.
            segments.append({"start": 0.0, "end": 0.0, "speaker": "SPEAKER_00"})
    except Exception:
        # Diarization failed (missing dependency, bad token, etc.): same placeholder fallback.
        segments.append({"start": 0.0, "end": 0.0, "speaker": "SPEAKER_00"})

    segments = sorted(segments, key=lambda s: s.get("start", 0.0))
    return segments, clip_paths
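# Illustrative example (hypothetical path and boundaries): with a valid token,
# diarize_audio("talk.wav", Path("out")) might return something like
#   ([{"start": 0.0, "end": 3.2, "speaker": "SPEAKER_00"},
#     {"start": 3.2, "end": 7.9, "speaker": "SPEAKER_01"}], [])
# Without a token it returns ([{"start": 0.0, "end": 0.0, "speaker": "SPEAKER_00"}], []).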
def _fmt_srt_time(seconds: float) -> str:
    # Work in integer milliseconds so values like 1.9996 s cannot round up to 1000 ms.
    total_ms = int(round(seconds * 1000))
    h, rem = divmod(total_ms, 3_600_000)
    m, rem = divmod(rem, 60_000)
    s, ms = divmod(rem, 1000)
    return f"{h:02}:{m:02}:{s:02},{ms:03}"
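# Example: _fmt_srt_time(3723.5) -> "01:02:03,500"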
def _generate_srt(segments: List[Dict[str, Any]], texts: List[str]) -> str:
    n = min(len(segments), len(texts))
    lines: List[str] = []
    for i in range(n):
        seg = segments[i]
        text = (texts[i] or "").strip()
        start = float(seg.get("start", 0.0))
        end = float(seg.get("end", max(start + 2.0, start)))
        speaker = seg.get("speaker")
        if speaker:
            text = f"[{speaker}]: {text}" if text else f"[{speaker}]"
        lines.append(str(i + 1))
        lines.append(f"{_fmt_srt_time(start)} --> {_fmt_srt_time(end)}")
        lines.append(text)
        lines.append("")
    return "\n".join(lines).strip() + "\n"
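# Illustrative cue produced by _generate_srt for a single segment starting at 0.0 s
# (end defaults to start + 2.0 s when missing), speaker "SPEAKER_00", text "hello there":
#   1
#   00:00:00,000 --> 00:00:02,000
#   [SPEAKER_00]: hello there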
def asr_transcribe_wav_simple(wav_path: str) -> str:
    """Best-effort ASR stub: use faster-whisper small if available, otherwise return empty text.

    Intended for the MVP on Spaces without a heavy GPU.
    """
    try:
        from faster_whisper import WhisperModel

        model = WhisperModel("Systran/faster-whisper-small", device="cpu")
        segments, _info = model.transcribe(wav_path, vad_filter=True, without_timestamps=True, language=None)
        text = " ".join(seg.text.strip() for seg in segments if getattr(seg, "text", None))
        return text.strip()
    except Exception:
        return ""
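# Note: the stub above only yields text when the optional faster-whisper package
# (pip install faster-whisper) is installed; on first use it downloads the
# "Systran/faster-whisper-small" weights from the Hugging Face Hub.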
def generate(video_path: str, out_dir: Path) -> Dict[str, Any]:
    """End-to-end MVP that returns {'une_srt','free_text','artifacts':{...}}."""
    out_dir.mkdir(parents=True, exist_ok=True)
    wav_path = extract_audio_ffmpeg(video_path, out_dir / f"{Path(video_path).stem}.wav")

    segments, _ = diarize_audio(wav_path, out_dir, hf_token_env="HF_TOKEN")

    free_text = asr_transcribe_wav_simple(wav_path)

    if not segments:
        segments = [{"start": 0.0, "end": 0.0, "speaker": "SPEAKER_00"}]

    texts: List[str] = []
    if len(segments) <= 1:
        texts = [free_text]
    else:
        # Split the free-form transcript into roughly equal word chunks, one per segment.
        words = free_text.split()
        chunk = max(1, len(words) // len(segments))
        for i in range(len(segments)):
            start_idx = i * chunk
            end_idx = (i + 1) * chunk if i < len(segments) - 1 else len(words)
            texts.append(" ".join(words[start_idx:end_idx]))

    une_srt = _generate_srt(segments, texts)

    return {
        "une_srt": une_srt,
        "free_text": free_text,
        "artifacts": {
            "wav_path": str(wav_path),
        },
    }
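# Minimal local smoke test; a sketch only. "sample_input.mp4" is a hypothetical path
# and must point to a real video on disk for this to run.
if __name__ == "__main__":
    _result = generate("sample_input.mp4", Path("outputs"))
    print(_result["une_srt"])
    print(_result["artifacts"])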