File size: 12,070 Bytes
e653427 cf71735 472c07f cf71735 e653427 9167d9a aec9643 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 |
from __future__ import annotations
# utils.py
import os
import yaml
import subprocess
from pathlib import Path
from dataclasses import dataclass
import shlex # Para manejar argumentos de línea de comandos de forma segura
from typing import List, Optional, Callable
import re
import xml.etree.ElementTree as ET
def get_project_root() -> Path:
"""
Devuelve la ruta raíz correcta dependiendo de si se ejecuta en un
Space de Hugging Face (escribible) o en local.
"""
if os.getenv("SPACE_ID") is not None:
return Path("/tmp/data")
# En local, la raíz del proyecto es la carpeta 'app', que contiene este fichero.
return Path(__file__).resolve().parent.parent
def incrustar_subtitulos_ffmpeg(
input_video_path: str,
srt_path: str,
output_video_path: str,
codificar_audio: bool = False,
sobreescribir: bool = False
):
"""
Incrusta (quema/hardsub) un archivo SRT sobre un vídeo MP4 usando FFmpeg.
IMPORTANTE: Requiere que FFmpeg esté instalado en el sistema.
:param input_video_path: Ruta al archivo de vídeo MP4 de entrada.
:param srt_path: Ruta al archivo de subtítulos SRT.
:param output_video_path: Ruta donde se guardará el nuevo vídeo con subtítulos.
:param codificar_audio: Si es True, recodifica el audio (más lento, pero a veces soluciona problemas).
Si es False (por defecto), copia el stream de audio (-c:a copy).
:param sobreescribir: Si es True, permite sobreescribir el archivo de salida si ya existe.
:raises FileNotFoundError: Si no se encuentra FFmpeg o alguno de los archivos de entrada.
:raises subprocess.CalledProcessError: Si FFmpeg devuelve un error durante el proceso.
"""
if not os.path.exists(input_video_path):
raise FileNotFoundError(f"Vídeo de entrada no encontrado: {input_video_path}")
if not os.path.exists(srt_path):
raise FileNotFoundError(f"Archivo SRT no encontrado: {srt_path}")
# Comando base de FFmpeg para incrustar subtítulos
# -i: archivo de entrada
# -vf: filtro de vídeo, usamos 'subtitles=' para quemar el srt
# -c:v: copia el stream de vídeo original (rápido, sin recodificar el vídeo)
# IMPORTANTE: Para que FFmpeg pueda quemar subtítulos, debe usar el filtro 'subtitles'
# y *no* copiar el stream de vídeo (-c:v copy). Debemos recodificar el vídeo (-c:v libx264).
# Solo copiaremos el audio para ahorrar tiempo a menos que se indique lo contrario.
audio_codec_arg = ["-c:a", "copy"] if not codificar_audio else []
# Se recomienda usar -c:v libx264 y -preset fast para una buena velocidad/calidad
video_codec_arg = ["-c:v", "libx264", "-preset", "fast"]
ffmpeg_command = [
"ffmpeg",
"-i", input_video_path,
"-vf", f"subtitles={srt_path}", # Filtro para quemar el SRT
*video_codec_arg,
*audio_codec_arg,
output_video_path
]
if sobreescribir:
ffmpeg_command.insert(1, "-y") # Añade la opción de sobreescribir
print(f"Ejecutando comando FFmpeg: {' '.join(shlex.quote(arg) for arg in ffmpeg_command)}")
try:
# Ejecutar el comando FFmpeg
subprocess.run(
ffmpeg_command,
check=True, # Lanza CalledProcessError si el código de retorno no es 0
stdout=subprocess.PIPE, # Captura la salida estándar (para no saturar)
stderr=subprocess.PIPE, # Captura la salida de error (donde FFmpeg imprime el progreso)
text=True
)
print(f"\nÉxito: Vídeo con subtítulos guardado en: {output_video_path}")
except FileNotFoundError:
print("ERROR: El comando 'ffmpeg' no fue encontrado.")
print("Asegúrate de que FFmpeg esté instalado y accesible en tu PATH.")
raise
except subprocess.CalledProcessError as e:
print(f"ERROR: FFmpeg falló con el código {e.returncode}")
print("Salida de error de FFmpeg:")
print(e.stderr)
raise
@dataclass
class AppConfig:
app: dict
api: dict
storage: dict
sqlite: dict
security: dict
ui: dict
def load_config(path: str = "config.yaml") -> AppConfig:
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"No encuentro {path.resolve()}. Sube un config.yaml válido.")
with path.open("r", encoding="utf-8") as f:
cfg = yaml.safe_load(f) or {}
for k in ["app", "api", "storage", "sqlite", "security", "ui"]:
if k not in cfg:
raise ValueError(f"Falta la clave '{k}' en config.yaml")
return AppConfig(**cfg)
def ensure_dirs(root: str | Path):
root = Path(root)
(root / "uploads").mkdir(parents=True, exist_ok=True)
(root / "results").mkdir(parents=True, exist_ok=True)
def save_bytes(path: str | Path, data: bytes):
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "wb") as f:
f.write(data)
def save_text(path: str | Path, text: str):
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(text)
def human_size(num_bytes: int) -> str:
units = ["B", "KB", "MB", "GB", "TB", "PB"]
n = float(num_bytes)
for u in units:
if n < 1024.0:
return f"{n:.1f} {u}"
n /= 1024.0
return f"{n:.1f} EB"
def recortar_video(input_path: str, output_path: str, duracion_segundos: int = 240):
"""
Corta los primeros `duracion_segundos` de un MP4 sin recodificar (rápido y sin pérdida).
Requiere ffmpeg en PATH (en conda-forge ya viene).
"""
input_path = str(Path(input_path))
output_path = str(Path(output_path))
cmd = [
"ffmpeg",
"-y", # sobrescribe salida
"-hide_banner",
"-loglevel", "error", # mensajes limpios
"-ss", "0", # inicio
"-i", input_path, # entrada
"-t", str(duracion_segundos), # duración
"-c", "copy", # copia streams sin recodificar
output_path
]
subprocess.run(cmd, check=True)
# ---- Núcleo: SRT -> ESF (XML string) ----
TIME_RE = re.compile(
r"(?P<start>\d{2}:\d{2}:\d{2}[,\.]\d{3})\s*-->\s*(?P<end>\d{2}:\d{2}:\d{2}[,\.]\d{3})"
)
@dataclass
class Cue:
index: int
start: str # "HH:MM:SS.mmm"
end: str # "HH:MM:SS.mmm"
text: str
def _norm_ts(ts: str) -> str:
"""Convierte '01:02:03,456' -> '01:02:03.456'."""
return ts.replace(",", ".")
def _parse_srt(srt_text: str) -> List[Cue]:
"""Parsea SRT a una lista de cues normalizados."""
srt_text = srt_text.replace("\r\n", "\n").replace("\r", "\n")
blocks = [b.strip() for b in re.split(r"\n\s*\n", srt_text) if b.strip()]
cues: List[Cue] = []
for block in blocks:
lines = block.split("\n")
# Detectar si la primera línea es índice
idx = None
if lines and lines[0].strip().isdigit():
idx = int(lines[0].strip())
time_candidates = lines[1:]
else:
idx = len(cues) + 1
time_candidates = lines
m = None
time_line_idx = None
for i, ln in enumerate(time_candidates[:3]): # robustez
mm = TIME_RE.search(ln)
if mm:
m = mm
time_line_idx = i
break
if not m:
raise ValueError(f"Bloque SRT sin tiempos válidos (índice {idx}):\n{block}")
start = _norm_ts(m.group("start"))
end = _norm_ts(m.group("end"))
text_lines = time_candidates[time_line_idx + 1 :]
text = "\n".join(text_lines).strip()
cues.append(Cue(index=idx, start=start, end=end, text=text))
# Re-indexar por si venía desordenado
for i, c in enumerate(cues, 1):
c.index = i
return cues
def _build_esf_tree(
cues: List[Cue],
language: str = "es",
voice_db: float = -6.0,
original_db: float = -3.0,
audio_lookup: Optional[Callable[[int], Optional[str]]] = None,
) -> ET.ElementTree:
"""
Construye el árbol XML ESF.
audio_lookup: función opcional index->filename (p. ej., lambda i: f\"{i:03d}.wav\" si existe).
"""
root = ET.Element("esef", attrib={"version": "1.0"})
header = ET.SubElement(root, "header")
ET.SubElement(header, "language").text = language
mix = ET.SubElement(header, "mix")
ET.SubElement(mix, "voice", attrib={"level": f"{voice_db}dB"})
ET.SubElement(mix, "original", attrib={"level": f"{original_db}dB"})
ad = ET.SubElement(root, "ad")
for c in cues:
attrs = {"in": c.start, "out": c.end}
if audio_lookup:
fname = audio_lookup(c.index)
if fname:
attrs["file"] = fname
cue_el = ET.SubElement(ad, "cue", attrib=attrs)
cue_el.text = c.text
return ET.ElementTree(root)
def _xml_pretty_string(tree: ET.ElementTree) -> str:
"""Devuelve XML con sangría (sin depender de minidom)."""
def _indent(elem, level=0):
i = "\n" + level * " "
if len(elem):
if not elem.text or not elem.text.strip():
elem.text = i + " "
for e in elem:
_indent(e, level + 1)
if not e.tail or not e.tail.strip():
e.tail = i
if level and (not elem.tail or not elem.tail.strip()):
elem.tail = i
root = tree.getroot()
_indent(root)
return ET.tostring(root, encoding="utf-8", xml_declaration=True).decode("utf-8")
def srt_to_esf(
srt_text: str,
*,
language: str = "es",
voice_db: float = -6.0,
original_db: float = -3.0,
audio_lookup: Optional[Callable[[int], Optional[str]]] = None,
) -> str:
"""
Convierte un SRT (texto) en un ESF (XML) y lo devuelve como string.
Parámetros:
srt_text: Contenido del .srt.
language: Código ISO del idioma (por defecto 'es').
voice_db: Nivel de la voz AD (dB).
original_db: Nivel del audio original (dB).
audio_lookup: Función opcional index->filename para asociar locuciones por cue.
Retorna:
Cadena XML ESF.
"""
cues = _parse_srt(srt_text)
tree = _build_esf_tree(
cues,
language=language,
voice_db=voice_db,
original_db=original_db,
audio_lookup=audio_lookup,
)
return _xml_pretty_string(tree)
#----------------------------
if __name__ == "__main__":
pass
# Ejemplo de recorte de vídeo
#recortar_video("videos/curtmetratge_1/curtmetratge_1.mp4", "videos/curtmetratge_1/curtmetratge_1b.mp4", 240)
'''# Crea archivos de prueba si no existen
# (Necesitarás archivos MP4 y SRT reales para que funcione)
INPUT_VID = "mi_video_original.mp4"
INPUT_SRT = "mis_subtitulos.srt"
OUTPUT_VID = "mi_video_con_subtitulos.mp4"
# Simulación de creación de archivos para que no falle inmediatamente en un entorno de prueba
if not os.path.exists(INPUT_SRT):
with open(INPUT_SRT, 'w', encoding='utf-8') as f:
f.write("1\n00:00:01,000 --> 00:00:04,000\n¡Hola! Estos son subtítulos incrustados.\n\n")
f.write("2\n00:00:05,000 --> 00:00:08,000\nGenerado con FFmpeg desde Python.\n")
print(f"Creado archivo SRT de prueba: {INPUT_SRT}")
if not os.path.exists(INPUT_VID):
print(f"ADVERTENCIA: Archivo de vídeo de prueba '{INPUT_VID}' no encontrado. Por favor, reemplázalo por un archivo MP4 real.")
try:
# Llama a la función principal
incrustar_subtitulos_ffmpeg(
input_video_path=INPUT_VID,
srt_path=INPUT_SRT,
output_video_path=OUTPUT_VID,
sobreescribir=True # Permite ejecutar el ejemplo varias veces
)
except Exception as e:
print(f"\nUn error general ocurrió durante el proceso: {e}")''' |