import os import io from pathlib import Path from typing import Counter,List, Dict import ast import json import torch from svision_client import extract_scenes, add_ocr_and_faces, keyframes_every_second_extraction, extract_descripcion_escena from asr_client import extract_audio_from_video, diarize_audio, transcribe_long_audio, transcribe_short_audio, identificar_veu from fastapi import APIRouter, UploadFile, File, Query, HTTPException from fastapi.responses import JSONResponse, StreamingResponse, FileResponse from storage.common import validate_token from storage.files.file_manager import FileManager from storage.embeddings_routers import get_embeddings_json EMBEDDINGS_ROOT = Path("/data/embeddings") MEDIA_ROOT = Path("/data/media") os.environ["CUDA_VISIBLE_DEVICES"] = "1" router = APIRouter(prefix="/transcription", tags=["Initial Transcription Process"]) HF_TOKEN = os.getenv("VEUREU_TOKEN") def get_casting(video_sha1: str): """Recupera els embeddings reals de càsting per a un vídeo a partir del seu SHA1. Llegeix el JSON d'embeddings que demo ha pujat prèviament a /data/embeddings mitjançant l'endpoint /embeddings/upload_embeddings i en retorna les columnes face_col i voice_col. """ # get_embeddings_json retorna el JSON complet tal com es va pujar (casting_json) faces_json = get_embeddings_json(video_sha1, "faces") faces_json = faces_json["face_col"] print("--------------") print("la base de datos de caras es ") print(faces_json) voices_json = get_embeddings_json(video_sha1, "voices") voices_json = voices_json["voice_col"] print("--------------") print("la base de datos de voces es ") print(voices_json) return faces_json, voices_json def map_identities_per_second(frames_per_second, intervals): for seg in intervals: seg_start = seg["start"] seg_end = seg["end"] identities = [] for f in frames_per_second: if seg_start <= f["start"] <= seg_end: for face in f.get("faces", []): identities.append(face) seg["counts"] = dict(Counter(identities)) return intervals def _fmt_srt_time(seconds: float) -> str: """Formatea segundos en el formato SRT HH:MM:SS,mmm""" h = int(seconds // 3600) m = int((seconds % 3600) // 60) s = int(seconds % 60) ms = int((seconds - int(seconds)) * 1000) return f"{h:02}:{m:02}:{s:02},{ms:03}" from pathlib import Path from typing import List, Dict from fastapi import HTTPException def generate_srt_from_segments(segments: List[Dict], sha1: str) -> str: """ Generate an SRT subtitle file from diarization/transcription segments. This function: - Creates the required folder structure for storing SRTs. - Removes any previous SRT files for the same SHA1. - Builds the SRT content with timestamps, speaker identity and transcription. - Saves the SRT file to disk. - Returns the SRT content as a string (to be sent by the endpoint). Parameters ---------- segments : List[Dict] List of dictionaries containing: - "start": float (start time in seconds) - "end": float (end time in seconds) - "speaker": dict with "identity" - "transcription": str sha1 : str Identifier used to locate the target media folder. Returns ------- str Full SRT file content as a string. """ # Path: /data/media/ video_root = MEDIA_ROOT / sha1 video_root.mkdir(parents=True, exist_ok=True) # Path: /data/media//origin_srt srt_dir = video_root / "initial_srt" srt_dir.mkdir(parents=True, exist_ok=True) # Delete old SRT files try: for old_srt in srt_dir.glob("*.srt"): old_srt.unlink() except Exception as exc: raise HTTPException(status_code=500, detail=f"Failed to delete old SRT files: {exc}") # Save file as initial.srt final_path = srt_dir / "initial.srt" # Build SRT content srt_lines = [] for i, seg in enumerate(segments, start=1): start = seg.get("start", 0.0) end = seg.get("end", 0.0) transcription = seg.get("transcription", "").strip() speaker_info = seg.get("speaker", {}) speaker = speaker_info.get("identity", "Unknown") text = f"[{speaker}]: {transcription}" if speaker else transcription entry = ( f"{i}\n" f"{_fmt_srt_time(start)} --> {_fmt_srt_time(end)}\n" f"{text}\n" ) srt_lines.append(entry) # Join with blank lines srt_content = "\n".join(srt_lines) # Write to disk try: with final_path.open("w", encoding="utf-8-sig") as f: f.write(srt_content) except Exception as exc: raise HTTPException(status_code=500, detail=f"Failed to write SRT file: {exc}") return srt_content def pipeline_preprocessing_vision(video_path: str, face_col): """ Pipeline que toma un video y realiza todo el preprocesamiento del video de la parte de vision. """ print(f"Procesando video para visión: {video_path}") print("----------------------") print(face_col) print("Extrayendo escenas...") threshold: float = 30.0 offset_frames: int = 240 crop_ratio: float = 0.1 result_extract_scenes = extract_scenes(video_path, threshold, offset_frames, crop_ratio) print(result_extract_scenes) # Obtener las rutas de las imágenes y la información de las escenas escenas = result_extract_scenes[0] if len(result_extract_scenes) > 0 else [] escenas_paths = [f["image"] for f in escenas] print(escenas_paths) info_escenas = result_extract_scenes[1] if len(result_extract_scenes) > 1 else [] print(info_escenas) print("Extrayendo imagenes por segundo...") result_extract_per_second = keyframes_every_second_extraction(video_path) # Obtener las rutas de las imágenes y la información de las escenas images_per_second = result_extract_per_second[0] if len(result_extract_per_second) > 0 else [] images_per_second_paths = [f["image"] for f in images_per_second] info_images_per_second = result_extract_per_second[1] if len(result_extract_per_second) > 1 else [] print("Aumentamos la información de las escenas viendo quién aparece en cada escena y detectando OCR...") info_escenas_completa = [] for imagen_escena, info_escena in zip(escenas_paths, info_escenas): result_add_ocr_and_faces = add_ocr_and_faces(imagen_escena, info_escena, face_col) info_escenas_completa.append(result_add_ocr_and_faces) print("Aumentamos la información de las imagenes por segundo viendo quién aparece en cada escena y detectando OCR...") info_images_per_second_completa = [] for imagen_segundo, info_segundo in zip(images_per_second_paths, info_images_per_second): result_add_ocr_and_faces =add_ocr_and_faces(imagen_segundo, info_segundo, face_col) info_images_per_second_completa.append(result_add_ocr_and_faces) print(info_escenas_completa) print("Ahora se va a tratar los OCR (se sustituirán ciertas escenas por alguna de las imágenes por segundo si tienen mejor OCR)...") # Se hará lo último print("Combinando información de escenas e imágenes por segundo...") info_escenas_completa = map_identities_per_second(info_images_per_second_completa, info_escenas_completa) print(info_escenas_completa) print("Ahora se incluyen en los diccionarios de las escenas la descripciones de estas.") for escena_path, info_escena in zip(escenas_paths, info_escenas_completa): descripcion_escena = extract_descripcion_escena(escena_path) lista = ast.literal_eval(descripcion_escena) frase = lista[0] info_escena["descripcion"] = frase del descripcion_escena torch.cuda.empty_cache() return info_escenas_completa, info_images_per_second_completa def pipeline_preprocessing_audio(video_path: str, voice_col): """ Pipeline que toma un video y realiza todo el preprocesamiento del video de la parte de audio. """ print(f"Procesando video para audio: {video_path}") print("Extrayendo audio del video...") audio_video = extract_audio_from_video(video_path) print(audio_video) print("Diartizando el audio...") diarization_audio = diarize_audio(audio_video) print(diarization_audio) clips_path = diarization_audio[0] print(clips_path) diarization_info = diarization_audio[1] print(diarization_info) print("Transcribiendo el video completo...") full_transcription = transcribe_long_audio(audio_video) print(full_transcription) print("Transcribiendo los clips diartizados...") for clip_path, clip_info in zip(clips_path, diarization_info): clip_transcription = transcribe_short_audio(clip_path) clip_info["transcription"] = clip_transcription print("Calculando los embeddings para cada uno de los clips obtenidos y posteriormente identificar las voces...") for clip_path, clip_info in zip(clips_path, diarization_info): clip_speaker = identificar_veu(clip_path, voice_col) clip_info["speaker"] = clip_speaker return full_transcription, diarization_info @router.post("/generate_initial_srt_and_info", tags=["Initial Transcription Process"]) async def pipeline_video_analysis( sha1: str, token: str = Query(..., description="Token required for authorization") ): """ Endpoint that processes a full video identified by its SHA1 folder, performs complete audio-visual preprocessing, and returns an SRT subtitle file. This pipeline integrates: - Vision preprocessing (scene detection, keyframes, OCR, face recognition) - Audio preprocessing (diarization, speech recognition, speaker identity matching) - Identity mapping between vision and audio streams - Final generation of an SRT file describing who speaks and when Parameters ---------- sha1 : str Identifier corresponding to the folder containing the video and related assets. token : str Security token required for authorization. Returns ------- str The generated SRT file (as text) containing time-aligned subtitles with speaker identities and transcriptions. """ validate_token(token) # Resolve directories file_manager = FileManager(MEDIA_ROOT) sha1_folder = MEDIA_ROOT / sha1 clip_folder = sha1_folder / "clip" if not sha1_folder.exists() or not sha1_folder.is_dir(): raise HTTPException(status_code=404, detail="SHA1 folder not found") if not clip_folder.exists() or not clip_folder.is_dir(): raise HTTPException(status_code=404, detail="Clip folder not found") # Locate video file mp4_files = list(clip_folder.glob("*.mp4")) if not mp4_files: raise HTTPException(status_code=404, detail="No MP4 files found") video_path = mp4_files[0] # Convert absolute path to a relative path for FileManager video_path = MEDIA_ROOT / video_path.relative_to(MEDIA_ROOT) print(f"Processing full video: {video_path}") # Get face and voice embeddings for casting face_col, voice_col = get_casting(sha1) # Vision processing pipeline info_escenas, info_images_per_second = pipeline_preprocessing_vision(video_path, face_col) torch.cuda.empty_cache() # Audio processing pipeline full_transcription, info_clips = pipeline_preprocessing_audio(video_path, voice_col) # Merge identities from vision pipeline with audio segments info_clips = map_identities_per_second(info_images_per_second, info_clips) # Generate the final SRT subtitle file srt = generate_srt_from_segments(info_clips, sha1) # Create result JSON result_json = { "full_transcription": full_transcription, "info_escenas": info_escenas, "info_clips": info_clips } # Path: /data/media/ video_root = MEDIA_ROOT / sha1 video_root.mkdir(parents=True, exist_ok=True) # Path: /data/media//origin_srt srt_dir = video_root / "initial_srt" srt_dir.mkdir(parents=True, exist_ok=True) final_path = srt_dir / "initial_info.json" with final_path.open("w", encoding="utf-8") as f: json.dump({ "full_transcription": full_transcription, "info_escenas": info_escenas, "info_clips": info_clips }, f, ensure_ascii=False, indent=4) # The endpoint returns OK message info return {"status": "ok", "message": "Initial SRT and info JSON generated"} def get_initial_info_path(sha1:str): video_root = MEDIA_ROOT / sha1 srt_dir = video_root / "initial_srt" final_path = srt_dir / "initial_info.json" if not video_root.exists() or not video_root.is_dir(): raise HTTPException(status_code=404, detail="SHA1 folder not found") if not srt_dir.exists() or not srt_dir.is_dir(): raise HTTPException(status_code=404, detail="initial_srt folder not found") if not final_path.exists() or not final_path.is_file(): raise HTTPException(status_code=404, detail="initial_info JSON not found") return final_path def get_initial_srt_path(sha1:str): video_root = MEDIA_ROOT / sha1 srt_dir = video_root / "initial_srt" final_path = srt_dir / "initial.srt" if not video_root.exists() or not video_root.is_dir(): raise HTTPException(status_code=404, detail="SHA1 folder not found") if not srt_dir.exists() or not srt_dir.is_dir(): raise HTTPException(status_code=404, detail="initial_srt folder not found") if not final_path.exists() or not final_path.is_file(): raise HTTPException(status_code=404, detail="initial.srt SRT not found") return final_path @router.get("/download_initial_srt", tags=["Initial Transcription Process"]) def download_initial_srt( sha1: str, token: str = Query(..., description="Token required for authorization") ): """ Download the cast CSV for a specific video identified by its SHA-1. The CSV is expected under: /data/media//cast/cast.csv Steps: - Validate the token. - Ensure /data/media/ and /cast exist. - Return the CSV as a FileResponse. - Raise 404 if any folder or file is missing. """ validate_token(token) file_path = get_initial_srt_path(sha1) return FileResponse( path=file_path, media_type="text/srt", filename="initial.srt" ) @router.get("/download_initial_info", tags=["Initial Transcription Process"]) def download_initial_info( sha1: str, token: str = Query(..., description="Token required for authorization") ): """ Download the cast CSV for a specific video identified by its SHA-1. The CSV is expected under: /data/media//cast/cast.csv Steps: - Validate the token. - Ensure /data/media/ and /cast exist. - Return the CSV as a FileResponse. - Raise 404 if any folder or file is missing. """ validate_token(token) file_path = get_initial_info_path(sha1) return FileResponse( path=file_path, media_type="text/json", filename="initial_info.json" )