demo / utils.py
VeuReu's picture
Upload 2 files
472c07f
raw
history blame
12.1 kB
from __future__ import annotations
# utils.py
import os
import yaml
import subprocess
from pathlib import Path
from dataclasses import dataclass
import shlex # Para manejar argumentos de línea de comandos de forma segura
from typing import List, Optional, Callable
import re
import xml.etree.ElementTree as ET
def get_project_root() -> Path:
"""
Devuelve la ruta raíz correcta dependiendo de si se ejecuta en un
Space de Hugging Face (escribible) o en local.
"""
if os.getenv("SPACE_ID") is not None:
return Path("/tmp/data")
# En local, la raíz del proyecto es la carpeta 'app', que contiene este fichero.
return Path(__file__).resolve().parent.parent
def incrustar_subtitulos_ffmpeg(
input_video_path: str,
srt_path: str,
output_video_path: str,
codificar_audio: bool = False,
sobreescribir: bool = False
):
"""
Incrusta (quema/hardsub) un archivo SRT sobre un vídeo MP4 usando FFmpeg.
IMPORTANTE: Requiere que FFmpeg esté instalado en el sistema.
:param input_video_path: Ruta al archivo de vídeo MP4 de entrada.
:param srt_path: Ruta al archivo de subtítulos SRT.
:param output_video_path: Ruta donde se guardará el nuevo vídeo con subtítulos.
:param codificar_audio: Si es True, recodifica el audio (más lento, pero a veces soluciona problemas).
Si es False (por defecto), copia el stream de audio (-c:a copy).
:param sobreescribir: Si es True, permite sobreescribir el archivo de salida si ya existe.
:raises FileNotFoundError: Si no se encuentra FFmpeg o alguno de los archivos de entrada.
:raises subprocess.CalledProcessError: Si FFmpeg devuelve un error durante el proceso.
"""
if not os.path.exists(input_video_path):
raise FileNotFoundError(f"Vídeo de entrada no encontrado: {input_video_path}")
if not os.path.exists(srt_path):
raise FileNotFoundError(f"Archivo SRT no encontrado: {srt_path}")
# Comando base de FFmpeg para incrustar subtítulos
# -i: archivo de entrada
# -vf: filtro de vídeo, usamos 'subtitles=' para quemar el srt
# -c:v: copia el stream de vídeo original (rápido, sin recodificar el vídeo)
# IMPORTANTE: Para que FFmpeg pueda quemar subtítulos, debe usar el filtro 'subtitles'
# y *no* copiar el stream de vídeo (-c:v copy). Debemos recodificar el vídeo (-c:v libx264).
# Solo copiaremos el audio para ahorrar tiempo a menos que se indique lo contrario.
audio_codec_arg = ["-c:a", "copy"] if not codificar_audio else []
# Se recomienda usar -c:v libx264 y -preset fast para una buena velocidad/calidad
video_codec_arg = ["-c:v", "libx264", "-preset", "fast"]
ffmpeg_command = [
"ffmpeg",
"-i", input_video_path,
"-vf", f"subtitles={srt_path}", # Filtro para quemar el SRT
*video_codec_arg,
*audio_codec_arg,
output_video_path
]
if sobreescribir:
ffmpeg_command.insert(1, "-y") # Añade la opción de sobreescribir
print(f"Ejecutando comando FFmpeg: {' '.join(shlex.quote(arg) for arg in ffmpeg_command)}")
try:
# Ejecutar el comando FFmpeg
subprocess.run(
ffmpeg_command,
check=True, # Lanza CalledProcessError si el código de retorno no es 0
stdout=subprocess.PIPE, # Captura la salida estándar (para no saturar)
stderr=subprocess.PIPE, # Captura la salida de error (donde FFmpeg imprime el progreso)
text=True
)
print(f"\nÉxito: Vídeo con subtítulos guardado en: {output_video_path}")
except FileNotFoundError:
print("ERROR: El comando 'ffmpeg' no fue encontrado.")
print("Asegúrate de que FFmpeg esté instalado y accesible en tu PATH.")
raise
except subprocess.CalledProcessError as e:
print(f"ERROR: FFmpeg falló con el código {e.returncode}")
print("Salida de error de FFmpeg:")
print(e.stderr)
raise
@dataclass
class AppConfig:
app: dict
api: dict
storage: dict
sqlite: dict
security: dict
ui: dict
def load_config(path: str = "config.yaml") -> AppConfig:
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"No encuentro {path.resolve()}. Sube un config.yaml válido.")
with path.open("r", encoding="utf-8") as f:
cfg = yaml.safe_load(f) or {}
for k in ["app", "api", "storage", "sqlite", "security", "ui"]:
if k not in cfg:
raise ValueError(f"Falta la clave '{k}' en config.yaml")
return AppConfig(**cfg)
def ensure_dirs(root: str | Path):
root = Path(root)
(root / "uploads").mkdir(parents=True, exist_ok=True)
(root / "results").mkdir(parents=True, exist_ok=True)
def save_bytes(path: str | Path, data: bytes):
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "wb") as f:
f.write(data)
def save_text(path: str | Path, text: str):
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(text)
def human_size(num_bytes: int) -> str:
units = ["B", "KB", "MB", "GB", "TB", "PB"]
n = float(num_bytes)
for u in units:
if n < 1024.0:
return f"{n:.1f} {u}"
n /= 1024.0
return f"{n:.1f} EB"
def recortar_video(input_path: str, output_path: str, duracion_segundos: int = 240):
"""
Corta los primeros `duracion_segundos` de un MP4 sin recodificar (rápido y sin pérdida).
Requiere ffmpeg en PATH (en conda-forge ya viene).
"""
input_path = str(Path(input_path))
output_path = str(Path(output_path))
cmd = [
"ffmpeg",
"-y", # sobrescribe salida
"-hide_banner",
"-loglevel", "error", # mensajes limpios
"-ss", "0", # inicio
"-i", input_path, # entrada
"-t", str(duracion_segundos), # duración
"-c", "copy", # copia streams sin recodificar
output_path
]
subprocess.run(cmd, check=True)
# ---- Núcleo: SRT -> ESF (XML string) ----
TIME_RE = re.compile(
r"(?P<start>\d{2}:\d{2}:\d{2}[,\.]\d{3})\s*-->\s*(?P<end>\d{2}:\d{2}:\d{2}[,\.]\d{3})"
)
@dataclass
class Cue:
index: int
start: str # "HH:MM:SS.mmm"
end: str # "HH:MM:SS.mmm"
text: str
def _norm_ts(ts: str) -> str:
"""Convierte '01:02:03,456' -> '01:02:03.456'."""
return ts.replace(",", ".")
def _parse_srt(srt_text: str) -> List[Cue]:
"""Parsea SRT a una lista de cues normalizados."""
srt_text = srt_text.replace("\r\n", "\n").replace("\r", "\n")
blocks = [b.strip() for b in re.split(r"\n\s*\n", srt_text) if b.strip()]
cues: List[Cue] = []
for block in blocks:
lines = block.split("\n")
# Detectar si la primera línea es índice
idx = None
if lines and lines[0].strip().isdigit():
idx = int(lines[0].strip())
time_candidates = lines[1:]
else:
idx = len(cues) + 1
time_candidates = lines
m = None
time_line_idx = None
for i, ln in enumerate(time_candidates[:3]): # robustez
mm = TIME_RE.search(ln)
if mm:
m = mm
time_line_idx = i
break
if not m:
raise ValueError(f"Bloque SRT sin tiempos válidos (índice {idx}):\n{block}")
start = _norm_ts(m.group("start"))
end = _norm_ts(m.group("end"))
text_lines = time_candidates[time_line_idx + 1 :]
text = "\n".join(text_lines).strip()
cues.append(Cue(index=idx, start=start, end=end, text=text))
# Re-indexar por si venía desordenado
for i, c in enumerate(cues, 1):
c.index = i
return cues
def _build_esf_tree(
cues: List[Cue],
language: str = "es",
voice_db: float = -6.0,
original_db: float = -3.0,
audio_lookup: Optional[Callable[[int], Optional[str]]] = None,
) -> ET.ElementTree:
"""
Construye el árbol XML ESF.
audio_lookup: función opcional index->filename (p. ej., lambda i: f\"{i:03d}.wav\" si existe).
"""
root = ET.Element("esef", attrib={"version": "1.0"})
header = ET.SubElement(root, "header")
ET.SubElement(header, "language").text = language
mix = ET.SubElement(header, "mix")
ET.SubElement(mix, "voice", attrib={"level": f"{voice_db}dB"})
ET.SubElement(mix, "original", attrib={"level": f"{original_db}dB"})
ad = ET.SubElement(root, "ad")
for c in cues:
attrs = {"in": c.start, "out": c.end}
if audio_lookup:
fname = audio_lookup(c.index)
if fname:
attrs["file"] = fname
cue_el = ET.SubElement(ad, "cue", attrib=attrs)
cue_el.text = c.text
return ET.ElementTree(root)
def _xml_pretty_string(tree: ET.ElementTree) -> str:
"""Devuelve XML con sangría (sin depender de minidom)."""
def _indent(elem, level=0):
i = "\n" + level * " "
if len(elem):
if not elem.text or not elem.text.strip():
elem.text = i + " "
for e in elem:
_indent(e, level + 1)
if not e.tail or not e.tail.strip():
e.tail = i
if level and (not elem.tail or not elem.tail.strip()):
elem.tail = i
root = tree.getroot()
_indent(root)
return ET.tostring(root, encoding="utf-8", xml_declaration=True).decode("utf-8")
def srt_to_esf(
srt_text: str,
*,
language: str = "es",
voice_db: float = -6.0,
original_db: float = -3.0,
audio_lookup: Optional[Callable[[int], Optional[str]]] = None,
) -> str:
"""
Convierte un SRT (texto) en un ESF (XML) y lo devuelve como string.
Parámetros:
srt_text: Contenido del .srt.
language: Código ISO del idioma (por defecto 'es').
voice_db: Nivel de la voz AD (dB).
original_db: Nivel del audio original (dB).
audio_lookup: Función opcional index->filename para asociar locuciones por cue.
Retorna:
Cadena XML ESF.
"""
cues = _parse_srt(srt_text)
tree = _build_esf_tree(
cues,
language=language,
voice_db=voice_db,
original_db=original_db,
audio_lookup=audio_lookup,
)
return _xml_pretty_string(tree)
#----------------------------
if __name__ == "__main__":
pass
# Ejemplo de recorte de vídeo
#recortar_video("videos/curtmetratge_1/curtmetratge_1.mp4", "videos/curtmetratge_1/curtmetratge_1b.mp4", 240)
'''# Crea archivos de prueba si no existen
# (Necesitarás archivos MP4 y SRT reales para que funcione)
INPUT_VID = "mi_video_original.mp4"
INPUT_SRT = "mis_subtitulos.srt"
OUTPUT_VID = "mi_video_con_subtitulos.mp4"
# Simulación de creación de archivos para que no falle inmediatamente en un entorno de prueba
if not os.path.exists(INPUT_SRT):
with open(INPUT_SRT, 'w', encoding='utf-8') as f:
f.write("1\n00:00:01,000 --> 00:00:04,000\n¡Hola! Estos son subtítulos incrustados.\n\n")
f.write("2\n00:00:05,000 --> 00:00:08,000\nGenerado con FFmpeg desde Python.\n")
print(f"Creado archivo SRT de prueba: {INPUT_SRT}")
if not os.path.exists(INPUT_VID):
print(f"ADVERTENCIA: Archivo de vídeo de prueba '{INPUT_VID}' no encontrado. Por favor, reemplázalo por un archivo MP4 real.")
try:
# Llama a la función principal
incrustar_subtitulos_ffmpeg(
input_video_path=INPUT_VID,
srt_path=INPUT_SRT,
output_video_path=OUTPUT_VID,
sobreescribir=True # Permite ejecutar el ejemplo varias veces
)
except Exception as e:
print(f"\nUn error general ocurrió durante el proceso: {e}")'''