|
|
from __future__ import annotations |
|
|
|
|
|
|
|
|
import os |
|
|
import yaml |
|
|
import subprocess |
|
|
from pathlib import Path |
|
|
from dataclasses import dataclass |
|
|
import shlex |
|
|
from typing import List, Optional, Callable |
|
|
import re |
|
|
import xml.etree.ElementTree as ET |
|
|
|
|
|
|
|
|
def get_project_root() -> Path:
    """Return the base directory the application should work from.

    If the ``SPACE_ID`` environment variable is defined (the original author
    notes this means the code runs inside a Hugging Face Space, where
    ``/tmp/data`` is writable), ``/tmp/data`` is returned. Otherwise the
    result is the parent of the directory that contains this file.
    """
    inside_space = "SPACE_ID" in os.environ
    if inside_space:
        return Path("/tmp/data")

    this_file = Path(__file__).resolve()
    return this_file.parent.parent
|
|
|
|
|
|
|
|
def _escape_ffmpeg_filter_value(value: str) -> str:
    """Escape *value* so it can be used as an option value inside ``-vf``.

    FFmpeg parses a filtergraph description twice: once to split the graph
    into filters and once to split each filter's option list. A literal
    value therefore has to be backslash-escaped once per level, otherwise
    characters such as ``:`` (Windows drive letters), ``,`` or ``'`` are
    interpreted as separators or quotes.
    """
    # Level 1: characters special inside a filter option value. The backslash
    # goes first so the escapes added afterwards are not escaped again.
    for special in ("\\", "'", ":"):
        value = value.replace(special, "\\" + special)
    # Level 2: characters special inside the filtergraph description.
    for special in ("\\", "'", "[", "]", ",", ";"):
        value = value.replace(special, "\\" + special)
    return value


def incrustar_subtitulos_ffmpeg(
    input_video_path: str,
    srt_path: str,
    output_video_path: str,
    codificar_audio: bool = False,
    sobreescribir: bool = False
):
    """Burn (hard-sub) an SRT subtitle file into a video using FFmpeg.

    IMPORTANT: FFmpeg must be installed and reachable through ``PATH``.

    The video stream is always re-encoded with ``libx264`` (preset ``fast``),
    because burning subtitles requires rendering them onto the frames.

    :param input_video_path: Path to the input video file.
    :param srt_path: Path to the SRT subtitle file.
    :param output_video_path: Path where the subtitled video is written.
    :param codificar_audio: If True, no audio codec is forced and FFmpeg
        re-encodes the audio (slower, but sometimes fixes problems).
        If False (default), the audio stream is copied (``-c:a copy``).
    :param sobreescribir: If True, an existing output file is overwritten
        (``-y``). If False, ``-n`` is passed so FFmpeg fails immediately when
        the output already exists instead of waiting for an interactive
        answer that nobody can see (stderr is captured).
    :raises FileNotFoundError: If FFmpeg or one of the input files is missing.
    :raises subprocess.CalledProcessError: If FFmpeg exits with an error.
    """
    # Accept os.PathLike as well as str: shlex.quote() below only handles str.
    input_video_path = os.fspath(input_video_path)
    srt_path = os.fspath(srt_path)
    output_video_path = os.fspath(output_video_path)

    if not os.path.exists(input_video_path):
        raise FileNotFoundError(f"Vídeo de entrada no encontrado: {input_video_path}")
    if not os.path.exists(srt_path):
        raise FileNotFoundError(f"Archivo SRT no encontrado: {srt_path}")

    audio_codec_arg = [] if codificar_audio else ["-c:a", "copy"]
    video_codec_arg = ["-c:v", "libx264", "-preset", "fast"]

    # The subtitle path travels inside a filtergraph string, so it must be
    # escaped; naming the option explicitly also protects paths containing "=".
    subtitles_filter = f"subtitles=filename={_escape_ffmpeg_filter_value(srt_path)}"

    ffmpeg_command = [
        "ffmpeg",
        "-y" if sobreescribir else "-n",
        "-i", input_video_path,
        "-vf", subtitles_filter,
        *video_codec_arg,
        *audio_codec_arg,
        output_video_path,
    ]

    print(f"Ejecutando comando FFmpeg: {' '.join(shlex.quote(arg) for arg in ffmpeg_command)}")

    try:
        subprocess.run(
            ffmpeg_command,
            check=True,
            # Never let FFmpeg block waiting for keyboard input.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            # FFmpeg may echo file names in a non-UTF-8 encoding; do not let
            # decoding of its diagnostics mask the real outcome.
            errors="replace",
        )
    except FileNotFoundError:
        print("ERROR: El comando 'ffmpeg' no fue encontrado.")
        print("Asegúrate de que FFmpeg esté instalado y accesible en tu PATH.")
        raise
    except subprocess.CalledProcessError as e:
        print(f"ERROR: FFmpeg falló con el código {e.returncode}")
        print("Salida de error de FFmpeg:")
        print(e.stderr)
        raise
    else:
        print(f"\nÉxito: Vídeo con subtítulos guardado en: {output_video_path}")
|
|
|
|
|
|
|
|
@dataclass
class AppConfig:
    """Container for the six top-level sections of the application config.

    Every attribute stores one section as a plain ``dict``. The class itself
    does not inspect or validate the contents of any section.
    """

    # Section names mirror the top-level keys expected in the config file.
    app: dict
    api: dict
    storage: dict
    sqlite: dict
    security: dict
    ui: dict
|
|
|
|
|
def load_config(path: str = "config.yaml") -> AppConfig:
    """Load the YAML configuration file and return it as an ``AppConfig``.

    :param path: Location of the YAML file (default ``config.yaml``).
    :returns: An ``AppConfig`` built from the six required top-level sections.
        Any additional top-level keys in the file are ignored, so a file with
        extra sections no longer crashes the loader.
    :raises FileNotFoundError: If *path* does not exist.
    :raises ValueError: If the document is not a mapping or a required
        section is missing.
    """
    config_path = Path(path)
    if not config_path.exists():
        raise FileNotFoundError(f"No encuentro {config_path.resolve()}. Sube un config.yaml válido.")

    with config_path.open("r", encoding="utf-8") as f:
        # An empty file parses to None; treat it as an empty mapping so the
        # missing-key check below reports the problem.
        cfg = yaml.safe_load(f) or {}

    # A top-level list or scalar would otherwise slip through the membership
    # checks and fail later with an unhelpful TypeError.
    if not isinstance(cfg, dict):
        raise ValueError("El contenido de config.yaml debe ser un mapeo de claves a valores")

    required = ("app", "api", "storage", "sqlite", "security", "ui")
    for k in required:
        if k not in cfg:
            raise ValueError(f"Falta la clave '{k}' en config.yaml")

    # Pass only the known sections: unknown keys are not AppConfig fields.
    return AppConfig(**{k: cfg[k] for k in required})
|
|
|
|
|
def ensure_dirs(root: str | Path):
    """Create the ``uploads`` and ``results`` directories under *root*.

    Missing parent directories are created too, and directories that already
    exist are left untouched.
    """
    base = Path(root)
    for subdir in ("uploads", "results"):
        base.joinpath(subdir).mkdir(parents=True, exist_ok=True)
|
|
|
|
|
def save_bytes(path: str | Path, data: bytes):
    """Write *data* to *path* in binary mode, replacing any existing file.

    The parent directory (and any missing ancestors) is created first.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(data)
|
|
|
|
|
def save_text(path: str | Path, text: str):
    """Write *text* to *path* as UTF-8, replacing any existing file.

    The parent directory (and any missing ancestors) is created first.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(text, encoding="utf-8")
|
|
|
|
|
def human_size(num_bytes: int) -> str:
    """Format a byte count with one decimal and a binary (1024-based) unit.

    Units run from ``B`` up to ``PB``; anything larger is reported in ``EB``.
    Negative values are scaled by magnitude and keep their sign.

    :param num_bytes: Size in bytes.
    :returns: A string such as ``"1.5 KB"``.
    """
    units = ("B", "KB", "MB", "GB", "TB", "PB")
    n = float(num_bytes)
    for u in units:
        # Compare the value as it will be displayed: this avoids printing
        # "1024.0 KB" for sizes just below the next unit, and abs() lets
        # negative sizes scale instead of always staying in bytes.
        if abs(round(n, 1)) < 1024.0:
            return f"{n:.1f} {u}"
        n /= 1024.0
    return f"{n:.1f} EB"
|
|
|
|
|
def recortar_video(input_path: str, output_path: str, duracion_segundos: int = 240):
    """Keep only the first ``duracion_segundos`` seconds of a video.

    Streams are copied (``-c copy``) rather than re-encoded, and an existing
    output file is overwritten (``-y``). Requires ``ffmpeg`` on ``PATH``.

    :param input_path: Source video file.
    :param output_path: Destination file.
    :param duracion_segundos: Length of the excerpt in seconds (default 240).
    :raises subprocess.CalledProcessError: If ffmpeg exits with an error.
    """
    source = str(Path(input_path))
    target = str(Path(output_path))

    quiet_overwrite = ["-y", "-hide_banner", "-loglevel", "error"]
    excerpt = ["-ss", "0", "-i", source, "-t", str(duracion_segundos)]

    subprocess.run(
        ["ffmpeg", *quiet_overwrite, *excerpt, "-c", "copy", target],
        check=True,
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# One SRT/VTT-style timestamp: HH:MM:SS followed by "," or "." and milliseconds.
_TIMESTAMP = r"\d{2}:\d{2}:\d{2}[,\.]\d{3}"

# A cue timing line, e.g. "00:00:01,000 --> 00:00:04,000"; the two timestamps
# are exposed as the named groups "start" and "end".
TIME_RE = re.compile(
    rf"(?P<start>{_TIMESTAMP})\s*-->\s*(?P<end>{_TIMESTAMP})"
)
|
|
|
|
|
@dataclass
class Cue:
    """One subtitle cue: its position, its time span and its text."""

    # Sequence number of the cue within the subtitle file.
    index: int
    # Start and end timestamps, kept as strings.
    start: str
    end: str
    # Cue text; may span several lines.
    text: str
|
|
|
|
|
def _norm_ts(ts: str) -> str:
    """Turn every comma in *ts* into a dot, e.g. '01:02:03,456' -> '01:02:03.456'."""
    return ".".join(ts.split(","))
|
|
|
|
|
def _parse_srt(srt_text: str) -> List[Cue]:
    """Parse SRT text into a list of cues with normalised timestamps.

    Blocks are separated by blank lines. A leading all-digit line is treated
    as the cue number; the timing line is then searched for among the next
    three lines, and everything after it becomes the cue text. Returned cues
    are numbered consecutively from 1 regardless of the numbers in the input.

    :raises ValueError: If a block contains no recognisable timing line.
    """
    # Unify Windows and old-Mac line endings before splitting.
    unified = srt_text.replace("\r\n", "\n").replace("\r", "\n")
    parsed: List[Cue] = []

    for chunk in re.split(r"\n\s*\n", unified):
        block = chunk.strip()
        if not block:
            continue
        rows = block.split("\n")

        head = rows[0].strip()
        if head.isdigit():
            # Explicit cue number: only used in the error message below.
            label = int(head)
            body = rows[1:]
        else:
            label = len(parsed) + 1
            body = rows

        # First timing match within the first three candidate lines, if any.
        hits = (
            (pos, found)
            for pos, found in enumerate(map(TIME_RE.search, body[:3]))
            if found
        )
        timing_row, timing = next(hits, (None, None))
        if timing is None:
            raise ValueError(f"Bloque SRT sin tiempos válidos (índice {label}):\n{block}")

        parsed.append(
            Cue(
                # Cues are appended in order, so the position doubles as the
                # final consecutive index.
                index=len(parsed) + 1,
                start=_norm_ts(timing.group("start")),
                end=_norm_ts(timing.group("end")),
                text="\n".join(body[timing_row + 1:]).strip(),
            )
        )

    return parsed
|
|
|
|
|
|
def _build_esf_tree(
    cues: List[Cue],
    language: str = "es",
    voice_db: float = -6.0,
    original_db: float = -3.0,
    audio_lookup: Optional[Callable[[int], Optional[str]]] = None,
) -> ET.ElementTree:
    """Build the ESF XML tree for a list of cues.

    The document has an ``esef`` root with a ``header`` (language plus the
    voice/original mix levels) and an ``ad`` element holding one ``cue``
    element per input cue.

    :param cues: Cues to serialise, in output order.
    :param language: Text of the ``language`` header element.
    :param voice_db: Level written on the ``voice`` element, suffixed "dB".
    :param original_db: Level written on the ``original`` element, suffixed "dB".
    :param audio_lookup: Optional function mapping a cue index to a file name
        (for example ``lambda i: f"{i:03d}.wav"``); a truthy result is stored
        in the cue's ``file`` attribute.
    """
    document = ET.Element("esef", attrib={"version": "1.0"})

    header = ET.SubElement(document, "header")
    language_el = ET.SubElement(header, "language")
    language_el.text = language

    mix = ET.SubElement(header, "mix")
    for tag, level in (("voice", voice_db), ("original", original_db)):
        ET.SubElement(mix, tag, attrib={"level": f"{level}dB"})

    ad = ET.SubElement(document, "ad")
    for cue in cues:
        attributes = {"in": cue.start, "out": cue.end}
        audio_file = audio_lookup(cue.index) if audio_lookup else None
        if audio_file:
            attributes["file"] = audio_file
        ET.SubElement(ad, "cue", attrib=attributes).text = cue.text

    return ET.ElementTree(document)
|
|
|
|
|
|
def _xml_pretty_string(tree: ET.ElementTree) -> str:
    """Serialise *tree* as indented XML text with an XML declaration.

    Indentation is produced by writing whitespace into the ``text`` and
    ``tail`` of the tree's elements (no dependency on minidom), so the tree
    passed in is modified in place.
    """

    def _is_blank(value):
        # True for None, "" and whitespace-only strings.
        return not value or not value.strip()

    def _apply_indent(node, depth=0):
        pad = "\n" + depth * " "
        children = list(node)
        if children:
            # Break the line after the opening tag, one level deeper.
            if _is_blank(node.text):
                node.text = pad + " "
            for child in children:
                _apply_indent(child, depth + 1)
            # The last child is followed by the parent's closing tag, so its
            # tail goes back to the parent's own indentation.
            last_child = children[-1]
            if _is_blank(last_child.tail):
                last_child.tail = pad
        if depth and _is_blank(node.tail):
            node.tail = pad

    root = tree.getroot()
    _apply_indent(root)
    serialized = ET.tostring(root, encoding="utf-8", xml_declaration=True)
    return serialized.decode("utf-8")
|
|
|
|
|
|
def srt_to_esf(
    srt_text: str,
    *,
    language: str = "es",
    voice_db: float = -6.0,
    original_db: float = -3.0,
    audio_lookup: Optional[Callable[[int], Optional[str]]] = None,
) -> str:
    """Convert SRT text into an ESF XML document returned as a string.

    Parameters:
        srt_text: Contents of the .srt file.
        language: Language code written to the header (default 'es').
        voice_db: Level of the audio-description voice, in dB.
        original_db: Level of the original audio, in dB.
        audio_lookup: Optional function mapping a cue index to a file name,
            used to attach a recording to each cue.

    Returns:
        The ESF XML as a string.
    """
    esf_options = {
        "language": language,
        "voice_db": voice_db,
        "original_db": original_db,
        "audio_lookup": audio_lookup,
    }
    parsed_cues = _parse_srt(srt_text)
    return _xml_pretty_string(_build_esf_tree(parsed_cues, **esf_options))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Running this module directly does nothing. The manual example below is
    # intentionally disabled; it needs real MP4 and SRT files to work.
    #
    # INPUT_VID = "mi_video_original.mp4"
    # INPUT_SRT = "mis_subtitulos.srt"
    # OUTPUT_VID = "mi_video_con_subtitulos.mp4"
    #
    # # Create a sample SRT so the example does not fail immediately.
    # if not os.path.exists(INPUT_SRT):
    #     with open(INPUT_SRT, 'w', encoding='utf-8') as f:
    #         f.write("1\n00:00:01,000 --> 00:00:04,000\n¡Hola! Estos son subtítulos incrustados.\n\n")
    #         f.write("2\n00:00:05,000 --> 00:00:08,000\nGenerado con FFmpeg desde Python.\n")
    #     print(f"Creado archivo SRT de prueba: {INPUT_SRT}")
    #
    # if not os.path.exists(INPUT_VID):
    #     print(f"ADVERTENCIA: Archivo de vídeo de prueba '{INPUT_VID}' no encontrado. Por favor, reemplázalo por un archivo MP4 real.")
    #
    # try:
    #     # Call the main function.
    #     incrustar_subtitulos_ffmpeg(
    #         input_video_path=INPUT_VID,
    #         srt_path=INPUT_SRT,
    #         output_video_path=OUTPUT_VID,
    #         sobreescribir=True  # Lets the example be run several times
    #     )
    # except Exception as e:
    #     print(f"\nUn error general ocurrió durante el proceso: {e}")
    pass