from __future__ import annotations # utils.py import os import yaml import subprocess from pathlib import Path from dataclasses import dataclass import shlex # Para manejar argumentos de línea de comandos de forma segura from typing import List, Optional, Callable import re import xml.etree.ElementTree as ET def get_project_root() -> Path: """ Devuelve la ruta raíz correcta dependiendo de si se ejecuta en un Space de Hugging Face (escribible) o en local. """ if os.getenv("SPACE_ID") is not None: return Path("/tmp/data") # En local, la raíz del proyecto es la carpeta 'app', que contiene este fichero. return Path(__file__).resolve().parent.parent def incrustar_subtitulos_ffmpeg( input_video_path: str, srt_path: str, output_video_path: str, codificar_audio: bool = False, sobreescribir: bool = False ): """ Incrusta (quema/hardsub) un archivo SRT sobre un vídeo MP4 usando FFmpeg. IMPORTANTE: Requiere que FFmpeg esté instalado en el sistema. :param input_video_path: Ruta al archivo de vídeo MP4 de entrada. :param srt_path: Ruta al archivo de subtítulos SRT. :param output_video_path: Ruta donde se guardará el nuevo vídeo con subtítulos. :param codificar_audio: Si es True, recodifica el audio (más lento, pero a veces soluciona problemas). Si es False (por defecto), copia el stream de audio (-c:a copy). :param sobreescribir: Si es True, permite sobreescribir el archivo de salida si ya existe. :raises FileNotFoundError: Si no se encuentra FFmpeg o alguno de los archivos de entrada. :raises subprocess.CalledProcessError: Si FFmpeg devuelve un error durante el proceso. """ if not os.path.exists(input_video_path): raise FileNotFoundError(f"Vídeo de entrada no encontrado: {input_video_path}") if not os.path.exists(srt_path): raise FileNotFoundError(f"Archivo SRT no encontrado: {srt_path}") # Comando base de FFmpeg para incrustar subtítulos # -i: archivo de entrada # -vf: filtro de vídeo, usamos 'subtitles=' para quemar el srt # -c:v: copia el stream de vídeo original (rápido, sin recodificar el vídeo) # IMPORTANTE: Para que FFmpeg pueda quemar subtítulos, debe usar el filtro 'subtitles' # y *no* copiar el stream de vídeo (-c:v copy). Debemos recodificar el vídeo (-c:v libx264). # Solo copiaremos el audio para ahorrar tiempo a menos que se indique lo contrario. audio_codec_arg = ["-c:a", "copy"] if not codificar_audio else [] # Se recomienda usar -c:v libx264 y -preset fast para una buena velocidad/calidad video_codec_arg = ["-c:v", "libx264", "-preset", "fast"] ffmpeg_command = [ "ffmpeg", "-i", input_video_path, "-vf", f"subtitles={srt_path}", # Filtro para quemar el SRT *video_codec_arg, *audio_codec_arg, output_video_path ] if sobreescribir: ffmpeg_command.insert(1, "-y") # Añade la opción de sobreescribir print(f"Ejecutando comando FFmpeg: {' '.join(shlex.quote(arg) for arg in ffmpeg_command)}") try: # Ejecutar el comando FFmpeg subprocess.run( ffmpeg_command, check=True, # Lanza CalledProcessError si el código de retorno no es 0 stdout=subprocess.PIPE, # Captura la salida estándar (para no saturar) stderr=subprocess.PIPE, # Captura la salida de error (donde FFmpeg imprime el progreso) text=True ) print(f"\nÉxito: Vídeo con subtítulos guardado en: {output_video_path}") except FileNotFoundError: print("ERROR: El comando 'ffmpeg' no fue encontrado.") print("Asegúrate de que FFmpeg esté instalado y accesible en tu PATH.") raise except subprocess.CalledProcessError as e: print(f"ERROR: FFmpeg falló con el código {e.returncode}") print("Salida de error de FFmpeg:") print(e.stderr) raise @dataclass class AppConfig: app: dict api: dict storage: dict sqlite: dict security: dict ui: dict def load_config(path: str = "config.yaml") -> AppConfig: path = Path(path) if not path.exists(): raise FileNotFoundError(f"No encuentro {path.resolve()}. Sube un config.yaml válido.") with path.open("r", encoding="utf-8") as f: cfg = yaml.safe_load(f) or {} for k in ["app", "api", "storage", "sqlite", "security", "ui"]: if k not in cfg: raise ValueError(f"Falta la clave '{k}' en config.yaml") return AppConfig(**cfg) def ensure_dirs(root: str | Path): root = Path(root) (root / "uploads").mkdir(parents=True, exist_ok=True) (root / "results").mkdir(parents=True, exist_ok=True) def save_bytes(path: str | Path, data: bytes): path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) with open(path, "wb") as f: f.write(data) def save_text(path: str | Path, text: str): path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) with open(path, "w", encoding="utf-8") as f: f.write(text) def human_size(num_bytes: int) -> str: units = ["B", "KB", "MB", "GB", "TB", "PB"] n = float(num_bytes) for u in units: if n < 1024.0: return f"{n:.1f} {u}" n /= 1024.0 return f"{n:.1f} EB" def recortar_video(input_path: str, output_path: str, duracion_segundos: int = 240): """ Corta los primeros `duracion_segundos` de un MP4 sin recodificar (rápido y sin pérdida). Requiere ffmpeg en PATH (en conda-forge ya viene). """ input_path = str(Path(input_path)) output_path = str(Path(output_path)) cmd = [ "ffmpeg", "-y", # sobrescribe salida "-hide_banner", "-loglevel", "error", # mensajes limpios "-ss", "0", # inicio "-i", input_path, # entrada "-t", str(duracion_segundos), # duración "-c", "copy", # copia streams sin recodificar output_path ] subprocess.run(cmd, check=True) # ---- Núcleo: SRT -> ESF (XML string) ---- TIME_RE = re.compile( r"(?P\d{2}:\d{2}:\d{2}[,\.]\d{3})\s*-->\s*(?P\d{2}:\d{2}:\d{2}[,\.]\d{3})" ) @dataclass class Cue: index: int start: str # "HH:MM:SS.mmm" end: str # "HH:MM:SS.mmm" text: str def _norm_ts(ts: str) -> str: """Convierte '01:02:03,456' -> '01:02:03.456'.""" return ts.replace(",", ".") def _parse_srt(srt_text: str) -> List[Cue]: """Parsea SRT a una lista de cues normalizados.""" srt_text = srt_text.replace("\r\n", "\n").replace("\r", "\n") blocks = [b.strip() for b in re.split(r"\n\s*\n", srt_text) if b.strip()] cues: List[Cue] = [] for block in blocks: lines = block.split("\n") # Detectar si la primera línea es índice idx = None if lines and lines[0].strip().isdigit(): idx = int(lines[0].strip()) time_candidates = lines[1:] else: idx = len(cues) + 1 time_candidates = lines m = None time_line_idx = None for i, ln in enumerate(time_candidates[:3]): # robustez mm = TIME_RE.search(ln) if mm: m = mm time_line_idx = i break if not m: raise ValueError(f"Bloque SRT sin tiempos válidos (índice {idx}):\n{block}") start = _norm_ts(m.group("start")) end = _norm_ts(m.group("end")) text_lines = time_candidates[time_line_idx + 1 :] text = "\n".join(text_lines).strip() cues.append(Cue(index=idx, start=start, end=end, text=text)) # Re-indexar por si venía desordenado for i, c in enumerate(cues, 1): c.index = i return cues def _build_esf_tree( cues: List[Cue], language: str = "es", voice_db: float = -6.0, original_db: float = -3.0, audio_lookup: Optional[Callable[[int], Optional[str]]] = None, ) -> ET.ElementTree: """ Construye el árbol XML ESF. audio_lookup: función opcional index->filename (p. ej., lambda i: f\"{i:03d}.wav\" si existe). """ root = ET.Element("esef", attrib={"version": "1.0"}) header = ET.SubElement(root, "header") ET.SubElement(header, "language").text = language mix = ET.SubElement(header, "mix") ET.SubElement(mix, "voice", attrib={"level": f"{voice_db}dB"}) ET.SubElement(mix, "original", attrib={"level": f"{original_db}dB"}) ad = ET.SubElement(root, "ad") for c in cues: attrs = {"in": c.start, "out": c.end} if audio_lookup: fname = audio_lookup(c.index) if fname: attrs["file"] = fname cue_el = ET.SubElement(ad, "cue", attrib=attrs) cue_el.text = c.text return ET.ElementTree(root) def _xml_pretty_string(tree: ET.ElementTree) -> str: """Devuelve XML con sangría (sin depender de minidom).""" def _indent(elem, level=0): i = "\n" + level * " " if len(elem): if not elem.text or not elem.text.strip(): elem.text = i + " " for e in elem: _indent(e, level + 1) if not e.tail or not e.tail.strip(): e.tail = i if level and (not elem.tail or not elem.tail.strip()): elem.tail = i root = tree.getroot() _indent(root) return ET.tostring(root, encoding="utf-8", xml_declaration=True).decode("utf-8") def srt_to_esf( srt_text: str, *, language: str = "es", voice_db: float = -6.0, original_db: float = -3.0, audio_lookup: Optional[Callable[[int], Optional[str]]] = None, ) -> str: """ Convierte un SRT (texto) en un ESF (XML) y lo devuelve como string. Parámetros: srt_text: Contenido del .srt. language: Código ISO del idioma (por defecto 'es'). voice_db: Nivel de la voz AD (dB). original_db: Nivel del audio original (dB). audio_lookup: Función opcional index->filename para asociar locuciones por cue. Retorna: Cadena XML ESF. """ cues = _parse_srt(srt_text) tree = _build_esf_tree( cues, language=language, voice_db=voice_db, original_db=original_db, audio_lookup=audio_lookup, ) return _xml_pretty_string(tree) #---------------------------- if __name__ == "__main__": pass # Ejemplo de recorte de vídeo #recortar_video("videos/curtmetratge_1/curtmetratge_1.mp4", "videos/curtmetratge_1/curtmetratge_1b.mp4", 240) '''# Crea archivos de prueba si no existen # (Necesitarás archivos MP4 y SRT reales para que funcione) INPUT_VID = "mi_video_original.mp4" INPUT_SRT = "mis_subtitulos.srt" OUTPUT_VID = "mi_video_con_subtitulos.mp4" # Simulación de creación de archivos para que no falle inmediatamente en un entorno de prueba if not os.path.exists(INPUT_SRT): with open(INPUT_SRT, 'w', encoding='utf-8') as f: f.write("1\n00:00:01,000 --> 00:00:04,000\n¡Hola! Estos son subtítulos incrustados.\n\n") f.write("2\n00:00:05,000 --> 00:00:08,000\nGenerado con FFmpeg desde Python.\n") print(f"Creado archivo SRT de prueba: {INPUT_SRT}") if not os.path.exists(INPUT_VID): print(f"ADVERTENCIA: Archivo de vídeo de prueba '{INPUT_VID}' no encontrado. Por favor, reemplázalo por un archivo MP4 real.") try: # Llama a la función principal incrustar_subtitulos_ffmpeg( input_video_path=INPUT_VID, srt_path=INPUT_SRT, output_video_path=OUTPUT_VID, sobreescribir=True # Permite ejecutar el ejemplo varias veces ) except Exception as e: print(f"\nUn error general ocurrió durante el proceso: {e}")'''