# main_process/main_router.py
import os
import io
from pathlib import Path
from collections import Counter
from typing import List, Dict
import ast
import json
import torch
from svision_client import extract_scenes, add_ocr_and_faces, keyframes_every_second_extraction, extract_descripcion_escena
from asr_client import extract_audio_from_video, diarize_audio, transcribe_long_audio, transcribe_short_audio, identificar_veu
from fastapi import APIRouter, UploadFile, File, Query, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse, FileResponse
from storage.common import validate_token
from storage.files.file_manager import FileManager
from storage.embeddings_routers import get_embeddings_json
EMBEDDINGS_ROOT = Path("/data/embeddings")
MEDIA_ROOT = Path("/data/media")
# NOTE: CUDA_VISIBLE_DEVICES must be set before CUDA is first initialized.
# torch initializes CUDA lazily, so setting it here (after import, but before
# any CUDA call) still restricts the process to GPU 1.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
router = APIRouter(prefix="/transcription", tags=["Initial Transcription Process"])
HF_TOKEN = os.getenv("VEUREU_TOKEN")
def get_casting(video_sha1: str):
    """Retrieve the real casting embeddings for a video from its SHA1.

    Reads the embeddings JSON that the demo previously uploaded to
    /data/embeddings via the /embeddings/upload_embeddings endpoint, and
    returns its face_col and voice_col columns.
    """
    # get_embeddings_json returns the full JSON exactly as uploaded (casting_json)
    faces_json = get_embeddings_json(video_sha1, "faces")
    faces_json = faces_json["face_col"]
    print("--------------")
    print("The face database is:")
    print(faces_json)
    voices_json = get_embeddings_json(video_sha1, "voices")
    voices_json = voices_json["voice_col"]
    print("--------------")
    print("The voice database is:")
    print(voices_json)
    return faces_json, voices_json
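# Illustrative sketch (not from the source): the uploaded casting JSON is
# assumed to look roughly like
#   {"face_col": [{"identity": "Anna", "embedding": [...]}, ...],
#    "voice_col": [{"identity": "Anna", "embedding": [...]}, ...]}
# Only the "face_col"/"voice_col" keys are confirmed by the code above; the
# per-entry fields are hypothetical.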
def map_identities_per_second(frames_per_second, intervals):
    """Attach per-interval identity counts from the per-second frames.

    For each interval, collects every face identity detected in the
    per-second frames whose timestamp falls inside [start, end] and stores
    the counts under the interval's "counts" key. Mutates and returns
    `intervals`.
    """
    for seg in intervals:
        seg_start = seg["start"]
        seg_end = seg["end"]
        identities = []
        for f in frames_per_second:
            if seg_start <= f["start"] <= seg_end:
                for face in f.get("faces", []):
                    identities.append(face)
        seg["counts"] = dict(Counter(identities))
    return intervals
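# Worked example (hypothetical values):
#   frames_per_second = [{"start": 1.0, "faces": ["Anna"]},
#                        {"start": 2.0, "faces": ["Anna", "Joan"]}]
#   intervals = [{"start": 0.0, "end": 2.5}]
#   map_identities_per_second(frames_per_second, intervals)
#   -> [{"start": 0.0, "end": 2.5, "counts": {"Anna": 2, "Joan": 1}}]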
def _fmt_srt_time(seconds: float) -> str:
    """Format seconds as an SRT timestamp, HH:MM:SS,mmm."""
    h = int(seconds // 3600)
    m = int((seconds % 3600) // 60)
    s = int(seconds % 60)
    ms = int((seconds - int(seconds)) * 1000)
    return f"{h:02}:{m:02}:{s:02},{ms:03}"
def generate_srt_from_segments(segments: List[Dict], sha1: str) -> str:
"""
Generate an SRT subtitle file from diarization/transcription segments.
This function:
- Creates the required folder structure for storing SRTs.
- Removes any previous SRT files for the same SHA1.
- Builds the SRT content with timestamps, speaker identity and transcription.
- Saves the SRT file to disk.
    - Returns the SRT content as a string.
Parameters
----------
segments : List[Dict]
List of dictionaries containing:
- "start": float (start time in seconds)
- "end": float (end time in seconds)
- "speaker": dict with "identity"
- "transcription": str
sha1 : str
Identifier used to locate the target media folder.
Returns
-------
str
Full SRT file content as a string.
"""
# Path: /data/media/<sha1>
video_root = MEDIA_ROOT / sha1
video_root.mkdir(parents=True, exist_ok=True)
# Path: /data/media/<sha1>/origin_srt
srt_dir = video_root / "initial_srt"
srt_dir.mkdir(parents=True, exist_ok=True)
# Delete old SRT files
try:
for old_srt in srt_dir.glob("*.srt"):
old_srt.unlink()
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Failed to delete old SRT files: {exc}")
# Save file as initial.srt
final_path = srt_dir / "initial.srt"
# Build SRT content
srt_lines = []
for i, seg in enumerate(segments, start=1):
start = seg.get("start", 0.0)
end = seg.get("end", 0.0)
transcription = seg.get("transcription", "").strip()
        speaker_info = seg.get("speaker", {})
        # "identity" defaults to "Unknown", so the speaker tag is always present.
        speaker = speaker_info.get("identity", "Unknown")
        text = f"[{speaker}]: {transcription}"
entry = (
f"{i}\n"
f"{_fmt_srt_time(start)} --> {_fmt_srt_time(end)}\n"
f"{text}\n"
)
srt_lines.append(entry)
# Join with blank lines
srt_content = "\n".join(srt_lines)
# Write to disk
try:
with final_path.open("w", encoding="utf-8-sig") as f:
f.write(srt_content)
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Failed to write SRT file: {exc}")
return srt_content
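# Illustrative input/output (hypothetical segment):
#   segments = [{"start": 0.0, "end": 2.5,
#                "speaker": {"identity": "Anna"},
#                "transcription": "Bon dia."}]
#   generate_srt_from_segments(segments, "<sha1>") produces the entry:
#       1
#       00:00:00,000 --> 00:00:02,500
#       [Anna]: Bon dia.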
def pipeline_preprocessing_vision(video_path: str, face_col):
    """
    Pipeline that takes a video and performs all the vision-side preprocessing.
    """
    print(f"Processing video for vision: {video_path}")
    print("----------------------")
    print(face_col)
    print("Extracting scenes...")
    threshold: float = 30.0
    offset_frames: int = 240
    crop_ratio: float = 0.1
    result_extract_scenes = extract_scenes(video_path, threshold, offset_frames, crop_ratio)
    print(result_extract_scenes)
    # Get the image paths and the scene information
    escenas = result_extract_scenes[0] if len(result_extract_scenes) > 0 else []
    escenas_paths = [f["image"] for f in escenas]
    print(escenas_paths)
    info_escenas = result_extract_scenes[1] if len(result_extract_scenes) > 1 else []
    print(info_escenas)
    print("Extracting one keyframe per second...")
    result_extract_per_second = keyframes_every_second_extraction(video_path)
    # Get the image paths and the per-second frame information
    images_per_second = result_extract_per_second[0] if len(result_extract_per_second) > 0 else []
    images_per_second_paths = [f["image"] for f in images_per_second]
    info_images_per_second = result_extract_per_second[1] if len(result_extract_per_second) > 1 else []
    print("Enriching the scene information with who appears in each scene and OCR detection...")
    info_escenas_completa = []
    for imagen_escena, info_escena in zip(escenas_paths, info_escenas):
        result_add_ocr_and_faces = add_ocr_and_faces(imagen_escena, info_escena, face_col)
        info_escenas_completa.append(result_add_ocr_and_faces)
    print("Enriching the per-second frame information with who appears in each frame and OCR detection...")
    info_images_per_second_completa = []
    for imagen_segundo, info_segundo in zip(images_per_second_paths, info_images_per_second):
        result_add_ocr_and_faces = add_ocr_and_faces(imagen_segundo, info_segundo, face_col)
        info_images_per_second_completa.append(result_add_ocr_and_faces)
    print(info_escenas_completa)
    print("OCR post-processing will happen here (some scenes will be replaced by one of the per-second images if it has better OCR)...")
    # To be done last
    print("Combining scene and per-second frame information...")
    info_escenas_completa = map_identities_per_second(info_images_per_second_completa, info_escenas_completa)
    print(info_escenas_completa)
    print("Now the scene descriptions are added to the scene dictionaries.")
    for escena_path, info_escena in zip(escenas_paths, info_escenas_completa):
        descripcion_escena = extract_descripcion_escena(escena_path)
        lista = ast.literal_eval(descripcion_escena)
        frase = lista[0]
        info_escena["descripcion"] = frase
        # Free the description string and GPU cache between scenes
        del descripcion_escena
        torch.cuda.empty_cache()
    return info_escenas_completa, info_images_per_second_completa
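# Sketch of the returned structures, inferred from the code above (any field
# beyond "descripcion" and "counts" depends on svision_client and is an
# assumption):
#   info_escenas_completa: one dict per scene, enriched with OCR/face data,
#       per-second identity "counts", and a text "descripcion".
#   info_images_per_second_completa: one dict per keyframe, enriched with
#       OCR/face data.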
def pipeline_preprocessing_audio(video_path: str, voice_col):
    """
    Pipeline that takes a video and performs all the audio-side preprocessing.
    """
    print(f"Processing video for audio: {video_path}")
    print("Extracting audio from the video...")
    audio_video = extract_audio_from_video(video_path)
    print(audio_video)
    print("Diarizing the audio...")
    diarization_audio = diarize_audio(audio_video)
    print(diarization_audio)
    clips_path = diarization_audio[0]
    print(clips_path)
    diarization_info = diarization_audio[1]
    print(diarization_info)
    print("Transcribing the full video...")
    full_transcription = transcribe_long_audio(audio_video)
    print(full_transcription)
    print("Transcribing the diarized clips...")
    for clip_path, clip_info in zip(clips_path, diarization_info):
        clip_transcription = transcribe_short_audio(clip_path)
        clip_info["transcription"] = clip_transcription
    print("Computing embeddings for each clip and then identifying the voices...")
    for clip_path, clip_info in zip(clips_path, diarization_info):
        clip_speaker = identificar_veu(clip_path, voice_col)
        clip_info["speaker"] = clip_speaker
    return full_transcription, diarization_info
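# After this pipeline each entry of diarization_info carries, in addition to
# the diarizer's own fields (assumed to include "start" and "end"), a
# "transcription" string and a "speaker" dict as returned by identificar_veu.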
@router.post("/generate_initial_srt_and_info", tags=["Initial Transcription Process"])
async def pipeline_video_analysis(
sha1: str,
token: str = Query(..., description="Token required for authorization")
):
"""
Endpoint that processes a full video identified by its SHA1 folder, performs
complete audio-visual preprocessing, and returns an SRT subtitle file.
This pipeline integrates:
- Vision preprocessing (scene detection, keyframes, OCR, face recognition)
- Audio preprocessing (diarization, speech recognition, speaker identity matching)
- Identity mapping between vision and audio streams
- Final generation of an SRT file describing who speaks and when
Parameters
----------
sha1 : str
Identifier corresponding to the folder containing the video and related assets.
token : str
Security token required for authorization.
Returns
-------
str
The generated SRT file (as text) containing time-aligned subtitles with
speaker identities and transcriptions.
"""
validate_token(token)
# Resolve directories
file_manager = FileManager(MEDIA_ROOT)
sha1_folder = MEDIA_ROOT / sha1
clip_folder = sha1_folder / "clip"
if not sha1_folder.exists() or not sha1_folder.is_dir():
raise HTTPException(status_code=404, detail="SHA1 folder not found")
if not clip_folder.exists() or not clip_folder.is_dir():
raise HTTPException(status_code=404, detail="Clip folder not found")
# Locate video file
mp4_files = list(clip_folder.glob("*.mp4"))
if not mp4_files:
raise HTTPException(status_code=404, detail="No MP4 files found")
video_path = mp4_files[0]
    # video_path is already an absolute path under MEDIA_ROOT
print(f"Processing full video: {video_path}")
# Get face and voice embeddings for casting
face_col, voice_col = get_casting(sha1)
# Vision processing pipeline
info_escenas, info_images_per_second = pipeline_preprocessing_vision(video_path, face_col)
torch.cuda.empty_cache()
# Audio processing pipeline
full_transcription, info_clips = pipeline_preprocessing_audio(video_path, voice_col)
# Merge identities from vision pipeline with audio segments
info_clips = map_identities_per_second(info_images_per_second, info_clips)
# Generate the final SRT subtitle file
srt = generate_srt_from_segments(info_clips, sha1)
    # Build the result JSON once and persist it alongside the SRT
    result_json = {
        "full_transcription": full_transcription,
        "info_escenas": info_escenas,
        "info_clips": info_clips
    }
    # Path: /data/media/<sha1>
    video_root = MEDIA_ROOT / sha1
    video_root.mkdir(parents=True, exist_ok=True)
    # Path: /data/media/<sha1>/initial_srt
    srt_dir = video_root / "initial_srt"
    srt_dir.mkdir(parents=True, exist_ok=True)
    final_path = srt_dir / "initial_info.json"
    with final_path.open("w", encoding="utf-8") as f:
        json.dump(result_json, f, ensure_ascii=False, indent=4)
# The endpoint returns OK message info
return {"status": "ok", "message": "Initial SRT and info JSON generated"}
def get_initial_info_path(sha1: str):
    """Return the path to initial_info.json for <sha1>, or raise 404."""
video_root = MEDIA_ROOT / sha1
srt_dir = video_root / "initial_srt"
final_path = srt_dir / "initial_info.json"
if not video_root.exists() or not video_root.is_dir():
raise HTTPException(status_code=404, detail="SHA1 folder not found")
if not srt_dir.exists() or not srt_dir.is_dir():
raise HTTPException(status_code=404, detail="initial_srt folder not found")
if not final_path.exists() or not final_path.is_file():
raise HTTPException(status_code=404, detail="initial_info JSON not found")
return final_path
def get_initial_srt_path(sha1: str):
    """Return the path to initial.srt for <sha1>, or raise 404."""
video_root = MEDIA_ROOT / sha1
srt_dir = video_root / "initial_srt"
final_path = srt_dir / "initial.srt"
if not video_root.exists() or not video_root.is_dir():
raise HTTPException(status_code=404, detail="SHA1 folder not found")
if not srt_dir.exists() or not srt_dir.is_dir():
raise HTTPException(status_code=404, detail="initial_srt folder not found")
if not final_path.exists() or not final_path.is_file():
raise HTTPException(status_code=404, detail="initial.srt SRT not found")
return final_path
@router.get("/download_initial_srt", tags=["Initial Transcription Process"])
def download_initial_srt(
sha1: str,
token: str = Query(..., description="Token required for authorization")
):
"""
Download the cast CSV for a specific video identified by its SHA-1.
The CSV is expected under:
/data/media/<sha1>/cast/cast.csv
Steps:
- Validate the token.
- Ensure /data/media/<sha1> and /cast exist.
- Return the CSV as a FileResponse.
- Raise 404 if any folder or file is missing.
"""
validate_token(token)
file_path = get_initial_srt_path(sha1)
    return FileResponse(
        path=file_path,
        media_type="application/x-subrip",
        filename="initial.srt"
    )
@router.get("/download_initial_info", tags=["Initial Transcription Process"])
def download_initial_info(
sha1: str,
token: str = Query(..., description="Token required for authorization")
):
"""
Download the cast CSV for a specific video identified by its SHA-1.
The CSV is expected under:
/data/media/<sha1>/cast/cast.csv
Steps:
- Validate the token.
- Ensure /data/media/<sha1> and /cast exist.
- Return the CSV as a FileResponse.
- Raise 404 if any folder or file is missing.
"""
validate_token(token)
file_path = get_initial_info_path(sha1)
    return FileResponse(
        path=file_path,
        media_type="application/json",
        filename="initial_info.json"
    )
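# Example downloads (illustrative; host, sha1 and token are placeholders):
#   curl "http://<host>/transcription/download_initial_srt?sha1=<sha1>&token=<token>" -o initial.srt
#   curl "http://<host>/transcription/download_initial_info?sha1=<sha1>&token=<token>" -o initial_info.json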