from __future__ import annotations

import json
import logging
import math
import os
import shlex
import subprocess
from pathlib import Path
from typing import List, Dict, Any, Tuple, Optional

import numpy as np

try:
    import torch
    import torchaudio as ta
    import torchaudio.transforms as T
    HAS_TORCHAUDIO = True
except Exception:
    HAS_TORCHAUDIO = False
    ta = None

import soundfile as sf

try:
    from pyannote.audio import Pipeline
    HAS_PYANNOTE = True
except Exception:
    Pipeline = None
    HAS_PYANNOTE = False

from speechbrain.inference.speaker import SpeakerRecognition

from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

from llm_router import load_yaml, LLMRouter

log = logging.getLogger("audio_tools")
if not log.handlers:
    _h = logging.StreamHandler()
    _h.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
    log.addHandler(_h)
log.setLevel(logging.INFO)


def load_wav(path: str | Path, sr: int = 16000):
    """Load audio as mono float32 at the requested sample rate."""
    if HAS_TORCHAUDIO:
        wav, in_sr = ta.load(str(path))
        if in_sr != sr:
            wav = ta.functional.resample(wav, in_sr, sr)
        if wav.dim() > 1:
            wav = wav.mean(dim=0, keepdim=True)
        return wav.squeeze(0).numpy(), sr
    import librosa
    y, in_sr = sf.read(str(path), dtype="float32", always_2d=False)
    if y.ndim > 1:
        y = y.mean(axis=1)
    if in_sr != sr:
        y = librosa.resample(y, orig_sr=in_sr, target_sr=sr)
    return y.astype(np.float32), sr


def save_wav(path: str | Path, y, sr: int = 16000):
    """Save mono float32 wav."""
    if HAS_TORCHAUDIO:
        import torch
        wav = torch.from_numpy(np.asarray(y, dtype=np.float32)).unsqueeze(0)
        ta.save(str(path), wav, sr)
    else:
        sf.write(str(path), np.asarray(y, dtype=np.float32), sr)


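# Illustrative round-trip of the two helpers above ("example.wav" is an assumed
# local file; not executed here).
# y, sr = load_wav("example.wav", sr=16000)    # mono float32 at 16 kHz
# save_wav("example_16k.wav", y, sr)           # torchaudio if available, else soundfile

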
def extract_audio_ffmpeg(
    video_path: str,
    audio_out: Path,
    sr: int = 16000,
    mono: bool = True,
) -> str:
    """Extract audio from video to WAV using ffmpeg."""
    audio_out.parent.mkdir(parents=True, exist_ok=True)
    cmd = f'ffmpeg -y -i "{video_path}" -vn {"-ac 1" if mono else ""} -ar {sr} -f wav "{audio_out}"'
    subprocess.run(
        shlex.split(cmd),
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return str(audio_out)


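# Illustrative call (assumed paths; requires the ffmpeg binary on PATH; not
# executed here).
# wav = extract_audio_ffmpeg("movie.mp4", Path("output/movie.wav"), sr=16000, mono=True)

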
def transcribe_audio_remote(audio_path: str | Path, cfg: Dict[str, Any]) -> Dict[str, Any]:
    """
    Send the audio file to the remote ASR Space `veureu/asr` (Gradio or HTTP).
    The remote model is 'faster-whisper-large-v3-ca-3catparla' (Aina).
    Returns standardized dict: {'text': str, 'segments': list}
    """
    if not cfg:
        cfg = load_yaml("config.yaml")
    router = LLMRouter(cfg)
    model_name = (cfg.get("models", {}).get("asr") or "whisper-catalan")
    params = {
        "language": "ca",
        "timestamps": True,
        "diarization": False,
    }
    try:
        result = router.asr_transcribe(str(audio_path), model=model_name, **params)
    except Exception as e:
        try:
            import httpx
            if isinstance(e, httpx.ReadTimeout):
                log.warning(f"ASR timeout for {audio_path}: {e}")
                return {"text": "", "segments": []}
        except Exception:
            pass
        log.warning(f"ASR error for {audio_path}: {e}")
        return {"text": "", "segments": []}

    if isinstance(result, str):
        return {"text": result, "segments": []}
    if isinstance(result, dict):
        if "text" not in result and "transcription" in result:
            result["text"] = result["transcription"]
        result.setdefault("segments", [])
        return result
    return {"text": str(result), "segments": []}


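# Illustrative call (assumes config.yaml wires an ASR model through LLMRouter;
# not executed here). On errors or timeouts the helper degrades to empty output.
# result = transcribe_audio_remote("output/movie.wav", load_yaml("config.yaml"))
# result -> {"text": "...", "segments": [...]}

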
def diarize_audio_silence_based(
    wav_path: str,
    base_dir: Path,
    clips_folder: str = "clips",
    min_segment_duration: float = 20.0,
    max_segment_duration: float = 50.0,
    silence_thresh: int = -40,
    min_silence_len: int = 500,
) -> Tuple[List[str], List[Dict[str, Any]], Dict[str, Any], List[Dict[str, Any]]]:
    """Segmentation based on silence detection (alternative to pyannote).

    Returns (clip_paths, segments, info, connection_logs) in the same format as diarize_audio.
    """
    from pydub import AudioSegment
    from pydub.silence import detect_nonsilent

    audio = AudioSegment.from_wav(wav_path)
    duration = len(audio) / 1000.0

    nonsilent_ranges = detect_nonsilent(
        audio,
        min_silence_len=min_silence_len,
        silence_thresh=silence_thresh,
    )

    clips_dir = base_dir / clips_folder
    clips_dir.mkdir(parents=True, exist_ok=True)
    clip_paths: List[str] = []
    segments: List[Dict[str, Any]] = []

    for idx, (start_ms, end_ms) in enumerate(nonsilent_ranges):
        start = start_ms / 1000.0
        end = end_ms / 1000.0
        seg_dur = end - start

        if seg_dur < min_segment_duration:
            continue

        if seg_dur > max_segment_duration:
            n = int(math.ceil(seg_dur / max_segment_duration))
            sub_d = seg_dur / n
            for j in range(n):
                s = start + j * sub_d
                e = min(end, start + (j + 1) * sub_d)
                if e <= s:
                    continue
                clip = audio[int(s * 1000):int(e * 1000)]
                cp = clips_dir / f"segment_{idx:03d}_{j:02d}.wav"
                clip.export(cp, format="wav")
                segments.append({"start": s, "end": e, "speaker": "UNKNOWN"})
                clip_paths.append(str(cp))
        else:
            clip = audio[start_ms:end_ms]
            cp = clips_dir / f"segment_{idx:03d}.wav"
            clip.export(cp, format="wav")
            segments.append({"start": start, "end": end, "speaker": "UNKNOWN"})
            clip_paths.append(str(cp))

    if not segments:
        cp = clips_dir / "segment_000.wav"
        audio.export(cp, format="wav")
        return (
            [str(cp)],
            [{"start": 0.0, "end": duration, "speaker": "UNKNOWN"}],
            {"diarization_ok": False, "error": "no_segments_after_silence_filter", "token_source": "silence-based"},
            [{"service": "silence-detection", "phase": "done", "message": "Segmentation by silence completed"}],
        )

    diar_info = {
        "diarization_ok": True,
        "error": "",
        "token_source": "silence-based",
        "method": "silence-detection",
        "num_segments": len(segments),
    }
    connection_logs = [{
        "service": "silence-detection",
        "phase": "done",
        "message": f"Segmented audio into {len(segments)} clips based on silence",
    }]

    return clip_paths, segments, diar_info, connection_logs


def diarize_audio(
    wav_path: str,
    base_dir: Path,
    clips_folder: str = "clips",
    min_segment_duration: float = 20.0,
    max_segment_duration: float = 50.0,
    hf_token_env: str | None = None,
    use_silence_fallback: bool = True,
    force_silence_only: bool = False,
    silence_thresh: int = -40,
    min_silence_len: int = 500,
) -> Tuple[List[str], List[Dict[str, Any]], Dict[str, Any], List[Dict[str, Any]]]:
    """Diarization with pyannote (or silence-based fallback) and clip export with pydub.

    Args:
        force_silence_only: If True, skip pyannote and use silence-based segmentation directly.
        use_silence_fallback: If True and pyannote fails, use silence-based segmentation.
        silence_thresh: dBFS threshold for silence detection (default -40).
        min_silence_len: Minimum silence length in milliseconds (default 500).

    Returns (clip_paths, segments, info, connection_logs), where info includes
    diarization_ok and an optional error.
    """
    if force_silence_only or not HAS_PYANNOTE:
        if not HAS_PYANNOTE:
            log.info("pyannote not available, using silence-based segmentation")
        else:
            log.info("Using silence-based segmentation (forced)")
        return diarize_audio_silence_based(
            wav_path, base_dir, clips_folder,
            min_segment_duration, max_segment_duration,
            silence_thresh, min_silence_len,
        )

    from pydub import AudioSegment
    audio = AudioSegment.from_wav(wav_path)
    duration = len(audio) / 1000.0

    diarization = None
    connection_logs: List[Dict[str, Any]] = []
    diar_info: Dict[str, Any] = {"diarization_ok": True, "error": "", "token_source": ""}
    try:
        _env_token = os.getenv("PYANNOTE_TOKEN")
        _token = hf_token_env or _env_token
        diar_info["token_source"] = "hf_token_env" if hf_token_env else ("PYANNOTE_TOKEN" if _env_token else "none")
        import time as _t
        t0 = _t.time()
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1",
            use_auth_token=_token,
        )
        connection_logs.append({"service": "pyannote", "phase": "connect", "message": "Connecting to pyannote server..."})
        diarization = pipeline(wav_path)
        dt = _t.time() - t0
        connection_logs.append({"service": "pyannote", "phase": "done", "message": f"Response from pyannote received in {dt:.2f} s"})
    except Exception as e:
        log.warning(f"Diarization unavailable: {e}")
        diar_info.update({"diarization_ok": False, "error": str(e)})
        connection_logs.append({"service": "pyannote", "phase": "error", "message": f"pyannote error: {str(e)}"})

        if use_silence_fallback:
            log.info("Attempting silence-based segmentation as fallback...")
            return diarize_audio_silence_based(
                wav_path, base_dir, clips_folder,
                min_segment_duration, max_segment_duration,
                silence_thresh, min_silence_len,
            )

    clips_dir = base_dir / clips_folder
    clips_dir.mkdir(parents=True, exist_ok=True)
    clip_paths: List[str] = []
    segments: List[Dict[str, Any]] = []
    spk_map: Dict[str, int] = {}
    prev_end = 0.0

    if diarization is not None:
        for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True)):
            start, end = max(0.0, float(turn.start)), min(duration, float(turn.end))
            if start < prev_end:
                start = prev_end
            if end <= start:
                continue

            seg_dur = end - start
            if seg_dur < min_segment_duration:
                continue

            if seg_dur > max_segment_duration:
                n = int(math.ceil(seg_dur / max_segment_duration))
                sub_d = seg_dur / n
                for j in range(n):
                    s = start + j * sub_d
                    e = min(end, start + (j + 1) * sub_d)
                    if e <= s:
                        continue
                    clip = audio[int(s * 1000):int(e * 1000)]
                    cp = clips_dir / f"segment_{i:03d}_{j:02d}.wav"
                    clip.export(cp, format="wav")
                    if speaker not in spk_map:
                        spk_map[speaker] = len(spk_map)
                    segments.append({"start": s, "end": e, "speaker": f"SPEAKER_{spk_map[speaker]:02d}"})
                    clip_paths.append(str(cp))
                    prev_end = e
            else:
                clip = audio[int(start * 1000):int(end * 1000)]
                cp = clips_dir / f"segment_{i:03d}.wav"
                clip.export(cp, format="wav")
                if speaker not in spk_map:
                    spk_map[speaker] = len(spk_map)
                segments.append({"start": start, "end": end, "speaker": f"SPEAKER_{spk_map[speaker]:02d}"})
                clip_paths.append(str(cp))
                prev_end = end

    if not segments:
        cp = clips_dir / "segment_000.wav"
        audio.export(cp, format="wav")
        if not diar_info.get("error"):
            diar_info["reason"] = "no_segments_after_filter"
        return [str(cp)], [{"start": 0.0, "end": duration, "speaker": "SPEAKER_00"}], diar_info, connection_logs

    pairs = sorted(zip(clip_paths, segments), key=lambda x: x[1]["start"])
    clip_paths, segments = [p[0] for p in pairs], [p[1] for p in pairs]
    return clip_paths, segments, diar_info, connection_logs


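# Illustrative call (assumed paths; pydub/ffmpeg required, pyannote optional;
# not executed here).
# clips, segs, info, conn_logs = diarize_audio(
#     "output/movie.wav", Path("output"), force_silence_only=True
# )
# segs -> [{"start": ..., "end": ..., "speaker": "UNKNOWN"}, ...]

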
class VoiceEmbedder:
    def __init__(self):
        from pathlib import Path as _P

        safe_savedir = _P("/data/pretrained_models/spkrec-ecapa-voxceleb")
        safe_savedir.mkdir(parents=True, exist_ok=True)

        self.model = SpeakerRecognition.from_hparams(
            source="speechbrain/spkrec-ecapa-voxceleb",
            savedir=str(safe_savedir),
        )
        self.model.eval()

    def embed(self, wav_path: str) -> List[float]:
        try:
            import torch as _torch
        except Exception:
            _torch = None

        if HAS_TORCHAUDIO:
            waveform, sr = ta.load(wav_path)
            target_sr = 16000
            if sr != target_sr:
                waveform = T.Resample(orig_freq=sr, new_freq=target_sr)(waveform)
            if waveform.shape[0] > 1:
                waveform = waveform.mean(dim=0, keepdim=True)
            min_samples = int(0.2 * target_sr)
            if waveform.shape[1] < min_samples:
                pad = min_samples - waveform.shape[1]
                if _torch is None:
                    raise RuntimeError("Torch not available for padding")
                waveform = _torch.cat([waveform, _torch.zeros((1, pad))], dim=1)
            if _torch is None:
                raise RuntimeError("Torch not available for inference")
            with _torch.no_grad():
                emb = self.model.encode_batch(waveform).squeeze().cpu().numpy().astype(float)
            return emb.tolist()
        else:
            y, sr = load_wav(wav_path, sr=16000)
            min_len = int(0.2 * 16000)
            if len(y) < min_len:
                y = np.pad(y, (0, min_len - len(y)))
            if _torch is None:
                raise RuntimeError("Torch not available for inference")
            # encode_batch expects (batch, time), matching the torchaudio branch above.
            w = _torch.from_numpy(y).unsqueeze(0)
            with _torch.no_grad():
                emb = self.model.encode_batch(w).squeeze().cpu().numpy().astype(float)
            return emb.tolist()


def embed_voice_segments(clip_paths: List[str]) -> List[List[float]]:
    ve = VoiceEmbedder()
    out: List[List[float]] = []
    for cp in clip_paths:
        try:
            out.append(ve.embed(cp))
        except Exception as e:
            log.warning(f"Embedding error in {cp}: {e}")
            out.append([])
    return out


def identify_speakers(
    embeddings: List[List[float]],
    voice_collection,
    cfg: Dict[str, Any],
) -> List[str]:
    voice_cfg = cfg.get("voice_processing", {}).get("speaker_identification", {})
    if not embeddings or sum(1 for e in embeddings if e) < 2:
        return ["SPEAKER_00" for _ in embeddings]

    valid = [e for e in embeddings if e and len(e) > 0]
    if len(valid) < 2:
        return ["SPEAKER_00" for _ in embeddings]

    min_clusters = max(1, int(voice_cfg.get("min_speakers", 1)))
    max_clusters = min(int(voice_cfg.get("max_speakers", 5)), len(valid) - 1)

    if voice_cfg.get("find_optimal_clusters", True) and len(valid) > 2:
        best_score, best_k = -1.0, min_clusters
        for k in range(min_clusters, max_clusters + 1):
            if k >= len(valid):
                break
            km = KMeans(n_clusters=k, random_state=42, n_init="auto")
            labels = km.fit_predict(valid)
            if len(set(labels)) > 1:
                score = silhouette_score(valid, labels)
                if score > best_score:
                    best_score, best_k = score, k
    else:
        best_k = min(max_clusters, max(min_clusters, int(voice_cfg.get("num_speakers", 2))))
    best_k = max(1, min(best_k, len(valid) - 1))

    km = KMeans(n_clusters=best_k, random_state=42, n_init="auto", init="k-means++")
    labels = km.fit_predict(np.array(valid))
    centers = km.cluster_centers_

    cluster_to_name: Dict[int, str] = {}
    unknown_counter = 0
    for cid in range(best_k):
        center = centers[cid].tolist()
        name = f"SPEAKER_{cid:02d}"
        if voice_collection is not None:
            try:
                q = voice_collection.query(query_embeddings=[center], n_results=1)
                metas = q.get("metadatas", [[]])[0]
                dists = q.get("distances", [[]])[0]
                thr = voice_cfg.get("distance_threshold")
                if dists and thr is not None and dists[0] > thr:
                    name = f"UNKNOWN_{unknown_counter}"
                    unknown_counter += 1
                    voice_collection.add(
                        embeddings=[center],
                        metadatas=[{"name": name}],
                        ids=[f"unk_{cid}_{unknown_counter}"],
                    )
                else:
                    if metas and isinstance(metas[0], dict):
                        name = (metas[0].get("nombre") or metas[0].get("name")
                                or metas[0].get("speaker") or metas[0].get("identity") or name)
            except Exception as e:
                log.warning(f"Voice KNN query failed: {e}")
        cluster_to_name[cid] = name

    personas: List[str] = []
    vi = 0
    for emb in embeddings:
        if not emb:
            personas.append("UNKNOWN")
        else:
            label = int(labels[vi])
            personas.append(cluster_to_name.get(label, f"SPEAKER_{label:02d}"))
            vi += 1
    return personas


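# Illustrative sketch with synthetic embeddings and no vector store
# (voice_collection=None); two well-separated groups come back as SPEAKER_00/01.
# cfg_stub = {"voice_processing": {"speaker_identification": {"max_speakers": 2}}}
# identify_speakers([[1.0, 0.0], [1.0, 0.1], [0.0, 1.0], [0.0, 0.9]], None, cfg_stub)

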
def _fmt_srt_time(seconds: float) -> str:
    h = int(seconds // 3600)
    m = int((seconds % 3600) // 60)
    s = int(seconds % 60)
    # Clamp to 999 so rounding (e.g. 1.9996 s) cannot produce a 4-digit millisecond field.
    ms = min(999, int(round((seconds - int(seconds)) * 1000)))
    return f"{h:02}:{m:02}:{s:02},{ms:03}"


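# Example: _fmt_srt_time(3725.5) -> "01:02:05,500" (SRT uses a comma before milliseconds).

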
def generate_srt_from_diarization(
    diarization_segments: List[Dict[str, Any]],
    transcriptions: List[str],
    speakers_per_segment: List[str],
    output_srt_path: str,
    cfg: Dict[str, Any],
) -> None:
    subs = cfg.get("subtitles", {})
    max_cpl = int(subs.get("max_chars_per_line", 42))
    max_lines = int(subs.get("max_lines_per_cue", 10))
    speaker_display = subs.get("speaker_display", "brackets")

    items: List[Dict[str, Any]] = []
    n = min(len(diarization_segments), len(transcriptions), len(speakers_per_segment))
    for i in range(n):
        seg = diarization_segments[i]
        text = (transcriptions[i] or "").strip()
        spk = speakers_per_segment[i]
        items.append({"start": float(seg.get("start", 0.0)), "end": float(seg.get("end", 0.0)), "text": text, "speaker": spk})

    out = Path(output_srt_path)
    out.parent.mkdir(parents=True, exist_ok=True)
    with out.open("w", encoding="utf-8-sig") as f:
        for i, it in enumerate(items, 1):
            text = it["text"]
            spk = it["speaker"]
            if speaker_display == "brackets" and spk:
                text = f"[{spk}]: {text}"
            elif speaker_display == "prefix" and spk:
                text = f"{spk}: {text}"
            words = text.split()
            lines: List[str] = []
            cur = ""
            for w in words:
                if len(cur) + len(w) + (1 if cur else 0) <= max_cpl:
                    cur = (cur + " " + w) if cur else w
                else:
                    lines.append(cur)
                    cur = w
                    if len(lines) >= max_lines - 1:
                        break
            if cur and len(lines) < max_lines:
                lines.append(cur)
            f.write(f"{i}\n{_fmt_srt_time(it['start'])} --> {_fmt_srt_time(it['end'])}\n")
            f.write("\n".join(lines) + "\n\n")


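# The "subtitles" config block read above (values shown are the defaults used
# when a key is missing):
# cfg["subtitles"] = {
#     "max_chars_per_line": 42,
#     "max_lines_per_cue": 10,
#     "speaker_display": "brackets",   # or "prefix"; any other value omits the speaker tag
# }

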
def process_audio_for_video(
    video_path: str,
    out_dir: Path,
    cfg: Dict[str, Any],
    voice_collection=None,
) -> Tuple[List[Dict[str, Any]], Optional[str], str, Dict[str, Any], List[Dict[str, Any]]]:
    """
    Audio pipeline: FFmpeg -> diarization -> remote ASR (full + clips) -> embeddings -> speaker-ID -> SRT.
    Returns (audio_segments, srt_path or None, full_transcription_text, diar_info, connection_logs).
    """
    audio_cfg = cfg.get("audio_processing", {})
    sr = int(audio_cfg.get("sample_rate", 16000))
    fmt = audio_cfg.get("format", "wav")
    wav_path = extract_audio_ffmpeg(video_path, out_dir / f"{Path(video_path).stem}.{fmt}", sr=sr)
    log.info("Audio extracted")

    diar_cfg = audio_cfg.get("diarization", {})
    min_dur = float(diar_cfg.get("min_segment_duration", 0.5))
    max_dur = float(diar_cfg.get("max_segment_duration", 10.0))
    force_silence = bool(diar_cfg.get("force_silence_only", True))
    silence_thresh = int(diar_cfg.get("silence_thresh", -40))
    min_silence_len = int(diar_cfg.get("min_silence_len", 500))

    clip_paths, diar_segs, diar_info, connection_logs = diarize_audio(
        wav_path, out_dir, "clips", min_dur, max_dur,
        force_silence_only=force_silence,
        silence_thresh=silence_thresh,
        min_silence_len=min_silence_len,
    )
    log.info("Audio clips generated.")

    full_transcription = ""
    asr_section = cfg.get("asr", {})
    if asr_section.get("enable_full_transcription", True):
        log.info("Full transcription (remote, Space 'asr')...")
        import time as _t
        t0 = _t.time()
        connection_logs.append({"service": "asr", "phase": "connect", "message": "Connecting to ASR space..."})
        full_res = transcribe_audio_remote(wav_path, cfg)
        dt = _t.time() - t0
        connection_logs.append({"service": "asr", "phase": "done", "message": f"Response from ASR space received in {dt:.2f} s"})
        full_transcription = full_res.get("text", "") or ""
        log.info("Full transcription finished.")

    log.info("Per-clip transcription (remote, Space 'asr')...")
    trans: List[str] = []
    import time as _t
    for cp in clip_paths:
        t0 = _t.time()
        connection_logs.append({"service": "asr", "phase": "connect", "message": "Transcribing clip via ASR space..."})
        res = transcribe_audio_remote(cp, cfg)
        dt = _t.time() - t0
        connection_logs.append({"service": "asr", "phase": "done", "message": f"Clip transcribed in {dt:.2f} s"})
        trans.append(res.get("text", ""))

    log.info("All clips transcribed.")

    embeddings = embed_voice_segments(clip_paths) if audio_cfg.get("enable_voice_embeddings", True) else [[] for _ in clip_paths]

    if cfg.get("voice_processing", {}).get("speaker_identification", {}).get("enabled", True):
        speakers = identify_speakers(embeddings, voice_collection, cfg)
        log.info("Speakers identified successfully.")
    else:
        speakers = [seg.get("speaker", f"SPEAKER_{i:02d}") for i, seg in enumerate(diar_segs)]

    audio_segments: List[Dict[str, Any]] = []
    for i, seg in enumerate(diar_segs):
        audio_segments.append(
            {
                "segment": i,
                "start": float(seg.get("start", 0.0)),
                "end": float(seg.get("end", 0.0)),
                "speaker": speakers[i] if i < len(speakers) else seg.get("speaker", f"SPEAKER_{i:02d}"),
                "text": trans[i] if i < len(trans) else "",
                "voice_embedding": embeddings[i] if i < len(embeddings) else [],
                "clip_path": clip_paths[i] if i < len(clip_paths) else str(out_dir / "clips" / f"segment_{i:03d}.wav"),
                "lang": "ca",
                "lang_prob": 1.0,
            }
        )

    srt_base_path = out_dir / f"transcripcion_diarizada_{Path(video_path).stem}"
    srt_unmodified_path = str(srt_base_path) + "_unmodified.srt"

    try:
        generate_srt_from_diarization(
            diar_segs,
            [a["text"] for a in audio_segments],
            [a["speaker"] for a in audio_segments],
            srt_unmodified_path,
            cfg,
        )
    except Exception as e:
        log.warning(f"SRT generation failed: {e}")
        srt_unmodified_path = None

    return audio_segments, srt_unmodified_path, full_transcription, diar_info, connection_logs


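# Minimal end-to-end sketch, guarded so importing the module stays side-effect
# free. The video path and config file name are assumptions for illustration;
# the real pipeline passes its own cfg and vector-store voice collection.
if __name__ == "__main__":
    cfg = load_yaml("config.yaml")
    segments, srt_path, full_text, diar_info, conn_logs = process_audio_for_video(
        "example_video.mp4", Path("output"), cfg, voice_collection=None
    )
    log.info(f"{len(segments)} segments, SRT at {srt_path}, diarization_ok={diar_info.get('diarization_ok')}")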