|
|
import os
import io
import ast
import json
from pathlib import Path
from typing import List, Dict
from collections import Counter

import torch

from svision_client import extract_scenes, add_ocr_and_faces, keyframes_every_second_extraction, extract_descripcion_escena
from asr_client import extract_audio_from_video, diarize_audio, transcribe_long_audio, transcribe_short_audio, identificar_veu

from fastapi import APIRouter, UploadFile, File, Query, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse, FileResponse

from storage.common import validate_token
from storage.files.file_manager import FileManager
from storage.embeddings_routers import get_embeddings_json
|
|
|
|
|
EMBEDDINGS_ROOT = Path("/data/embeddings")
MEDIA_ROOT = Path("/data/media")

# Restrict this process to GPU 1.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

router = APIRouter(prefix="/transcription", tags=["Initial Transcription Process"])

HF_TOKEN = os.getenv("VEUREU_TOKEN")
|
|
|
|
|
def get_casting(video_sha1: str):
    """Retrieve the real casting embeddings for a video from its SHA1.

    Reads the embeddings JSON that the demo previously uploaded to
    /data/embeddings through the /embeddings/upload_embeddings endpoint and
    returns its face_col and voice_col columns.
    """
    faces_json = get_embeddings_json(video_sha1, "faces")
    faces_json = faces_json["face_col"]
    print("--------------")
    print("The face database is:")
    print(faces_json)

    voices_json = get_embeddings_json(video_sha1, "voices")
    voices_json = voices_json["voice_col"]
    print("--------------")
    print("The voice database is:")
    print(voices_json)

    return faces_json, voices_json
|
|
|
|
|
def map_identities_per_second(frames_per_second, intervals):
    """Attach face-identity counts to each interval.

    For every interval, collect the faces seen in the per-second frames whose
    timestamp falls inside [start, end], and store the identity counts under
    the "counts" key of that interval.
    """
    for seg in intervals:
        seg_start = seg["start"]
        seg_end = seg["end"]

        identities = []
        for f in frames_per_second:
            if seg_start <= f["start"] <= seg_end:
                for face in f.get("faces", []):
                    identities.append(face)

        seg["counts"] = dict(Counter(identities))

    return intervals
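
# Illustrative input/output shapes (hypothetical data, not from a real run):
#
#   frames_per_second = [{"start": 0.0, "faces": ["Anna", "Joan"]},
#                        {"start": 1.0, "faces": ["Anna"]}]
#   intervals = [{"start": 0.0, "end": 1.5}]
#   map_identities_per_second(frames_per_second, intervals)
#   # -> [{"start": 0.0, "end": 1.5, "counts": {"Anna": 2, "Joan": 1}}]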
|
|
|
|
|
def _fmt_srt_time(seconds: float) -> str:
    """Format a duration in seconds as an SRT timestamp (HH:MM:SS,mmm)."""
    h = int(seconds // 3600)
    m = int((seconds % 3600) // 60)
    s = int(seconds % 60)
    ms = int((seconds - int(seconds)) * 1000)
    return f"{h:02}:{m:02}:{s:02},{ms:03}"
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_srt_from_segments(segments: List[Dict], sha1: str) -> str:
    """
    Generate an SRT subtitle file from diarization/transcription segments.

    This function:
    - Creates the required folder structure for storing SRTs.
    - Removes any previous SRT files for the same SHA1.
    - Builds the SRT content with timestamps, speaker identity and transcription.
    - Saves the SRT file to disk.
    - Returns the SRT content as a string (to be sent by the endpoint).

    Parameters
    ----------
    segments : List[Dict]
        List of dictionaries containing:
        - "start": float (start time in seconds)
        - "end": float (end time in seconds)
        - "speaker": dict with "identity"
        - "transcription": str
    sha1 : str
        Identifier used to locate the target media folder.

    Returns
    -------
    str
        Full SRT file content as a string.
    """
    video_root = MEDIA_ROOT / sha1
    video_root.mkdir(parents=True, exist_ok=True)

    srt_dir = video_root / "initial_srt"
    srt_dir.mkdir(parents=True, exist_ok=True)

    # Drop any SRT left over from a previous run.
    try:
        for old_srt in srt_dir.glob("*.srt"):
            old_srt.unlink()
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to delete old SRT files: {exc}")

    final_path = srt_dir / "initial.srt"

    srt_lines = []
    for i, seg in enumerate(segments, start=1):
        start = seg.get("start", 0.0)
        end = seg.get("end", 0.0)
        transcription = seg.get("transcription", "").strip()

        speaker_info = seg.get("speaker", {})
        speaker = speaker_info.get("identity", "Unknown")

        text = f"[{speaker}]: {transcription}" if speaker else transcription

        entry = (
            f"{i}\n"
            f"{_fmt_srt_time(start)} --> {_fmt_srt_time(end)}\n"
            f"{text}\n"
        )
        srt_lines.append(entry)

    # Entries already end with a newline, so joining with "\n" yields the
    # blank line that separates SRT cues.
    srt_content = "\n".join(srt_lines)

    # utf-8-sig prepends a BOM, which some players expect in SRT files.
    try:
        with final_path.open("w", encoding="utf-8-sig") as f:
            f.write(srt_content)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to write SRT file: {exc}")

    return srt_content
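
# Example usage (hypothetical segment data, for illustration only):
#
#   segments = [{"start": 0.0, "end": 2.5,
#                "speaker": {"identity": "Anna"},
#                "transcription": "Hola a tothom."}]
#   generate_srt_from_segments(segments, "<sha1>")
#   # writes /data/media/<sha1>/initial_srt/initial.srt containing:
#   #   1
#   #   00:00:00,000 --> 00:00:02,500
#   #   [Anna]: Hola a tothom.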
|
|
|
|
|
def pipeline_preprocessing_vision(video_path: str, face_col):
    """
    Pipeline that takes a video and runs the full vision-side preprocessing.
    """
    print(f"Processing video for vision: {video_path}")

    print("----------------------")
    print(face_col)

    print("Extracting scenes...")
    threshold: float = 30.0
    offset_frames: int = 240
    crop_ratio: float = 0.1
    result_extract_scenes = extract_scenes(video_path, threshold, offset_frames, crop_ratio)
    print(result_extract_scenes)

    escenas = result_extract_scenes[0] if len(result_extract_scenes) > 0 else []
    escenas_paths = [f["image"] for f in escenas]
    print(escenas_paths)
    info_escenas = result_extract_scenes[1] if len(result_extract_scenes) > 1 else []
    print(info_escenas)

    print("Extracting one keyframe per second...")
    result_extract_per_second = keyframes_every_second_extraction(video_path)

    images_per_second = result_extract_per_second[0] if len(result_extract_per_second) > 0 else []
    images_per_second_paths = [f["image"] for f in images_per_second]
    info_images_per_second = result_extract_per_second[1] if len(result_extract_per_second) > 1 else []

    print("Enriching scene info with who appears in each scene and with OCR detection...")
    info_escenas_completa = []
    for imagen_escena, info_escena in zip(escenas_paths, info_escenas):
        result_add_ocr_and_faces = add_ocr_and_faces(imagen_escena, info_escena, face_col)
        info_escenas_completa.append(result_add_ocr_and_faces)

    print("Enriching per-second frame info with who appears in each frame and with OCR detection...")
    info_images_per_second_completa = []
    for imagen_segundo, info_segundo in zip(images_per_second_paths, info_images_per_second):
        result_add_ocr_and_faces = add_ocr_and_faces(imagen_segundo, info_segundo, face_col)
        info_images_per_second_completa.append(result_add_ocr_and_faces)
    print(info_escenas_completa)

    print("Now the OCR results will be handled (certain scenes will be replaced by one of the per-second frames if it has better OCR)...")

    print("Combining scene and per-second frame information...")
    info_escenas_completa = map_identities_per_second(info_images_per_second_completa, info_escenas_completa)
    print(info_escenas_completa)

    print("Now each scene dictionary gets its scene description.")
    for escena_path, info_escena in zip(escenas_paths, info_escenas_completa):
        descripcion_escena = extract_descripcion_escena(escena_path)
        # The description arrives as a string-serialized list; keep its first element.
        lista = ast.literal_eval(descripcion_escena)
        frase = lista[0]
        info_escena["descripcion"] = frase
        del descripcion_escena
        torch.cuda.empty_cache()

    return info_escenas_completa, info_images_per_second_completa
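
# Rough sketch of one enriched scene dict returned above (hypothetical values;
# the OCR/face keys come from svision_client and their exact names are assumptions):
#
#   {"start": 0.0, "end": 4.2,            # scene interval
#    ...,                                  # OCR and face fields from add_ocr_and_faces
#    "counts": {"Anna": 3},                # added by map_identities_per_second
#    "descripcion": "two people talking"}  # added from extract_descripcion_escena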
|
|
|
|
|
def pipeline_preprocessing_audio(video_path: str, voice_col):
    """
    Pipeline that takes a video and runs the full audio-side preprocessing.
    """
    print(f"Processing video for audio: {video_path}")

    print("Extracting audio from the video...")
    audio_video = extract_audio_from_video(video_path)
    print(audio_video)

    print("Diarizing the audio...")
    diarization_audio = diarize_audio(audio_video)
    print(diarization_audio)
    clips_path = diarization_audio[0]
    print(clips_path)
    diarization_info = diarization_audio[1]
    print(diarization_info)

    print("Transcribing the full video...")
    full_transcription = transcribe_long_audio(audio_video)
    print(full_transcription)

    print("Transcribing the diarized clips...")
    for clip_path, clip_info in zip(clips_path, diarization_info):
        clip_transcription = transcribe_short_audio(clip_path)
        clip_info["transcription"] = clip_transcription

    print("Computing embeddings for each clip in order to identify the voices...")
    for clip_path, clip_info in zip(clips_path, diarization_info):
        clip_speaker = identificar_veu(clip_path, voice_col)
        clip_info["speaker"] = clip_speaker

    return full_transcription, diarization_info
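
# After this pipeline each diarization_info entry is expected to look roughly
# like the following (hypothetical values; extra keys depend on asr_client):
#
#   {"start": 12.0, "end": 15.3,
#    "transcription": "text of the clip",
#    "speaker": {"identity": "Anna"}}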
|
|
|
|
|
@router.post("/generate_initial_srt_and_info", tags=["Initial Transcription Process"])
async def pipeline_video_analysis(
    sha1: str,
    token: str = Query(..., description="Token required for authorization")
):
    """
    Endpoint that processes a full video identified by its SHA1 folder, performs
    complete audio-visual preprocessing, and generates an SRT subtitle file plus
    an info JSON on disk.

    This pipeline integrates:
    - Vision preprocessing (scene detection, keyframes, OCR, face recognition)
    - Audio preprocessing (diarization, speech recognition, speaker identity matching)
    - Identity mapping between vision and audio streams
    - Final generation of an SRT file describing who speaks and when

    Parameters
    ----------
    sha1 : str
        Identifier corresponding to the folder containing the video and related assets.
    token : str
        Security token required for authorization.

    Returns
    -------
    dict
        Status message. The generated files are written to
        /data/media/<sha1>/initial_srt/ (initial.srt and initial_info.json).
    """
    validate_token(token)

    sha1_folder = MEDIA_ROOT / sha1
    clip_folder = sha1_folder / "clip"

    if not sha1_folder.exists() or not sha1_folder.is_dir():
        raise HTTPException(status_code=404, detail="SHA1 folder not found")

    if not clip_folder.exists() or not clip_folder.is_dir():
        raise HTTPException(status_code=404, detail="Clip folder not found")

    mp4_files = list(clip_folder.glob("*.mp4"))
    if not mp4_files:
        raise HTTPException(status_code=404, detail="No MP4 files found")

    video_path = mp4_files[0]

    print(f"Processing full video: {video_path}")

    face_col, voice_col = get_casting(sha1)

    info_escenas, info_images_per_second = pipeline_preprocessing_vision(video_path, face_col)
    torch.cuda.empty_cache()

    full_transcription, info_clips = pipeline_preprocessing_audio(video_path, voice_col)

    info_clips = map_identities_per_second(info_images_per_second, info_clips)

    # Writes initial.srt to disk; the returned SRT text is not needed here.
    generate_srt_from_segments(info_clips, sha1)

    result_json = {
        "full_transcription": full_transcription,
        "info_escenas": info_escenas,
        "info_clips": info_clips
    }

    srt_dir = sha1_folder / "initial_srt"
    srt_dir.mkdir(parents=True, exist_ok=True)

    final_path = srt_dir / "initial_info.json"
    with final_path.open("w", encoding="utf-8") as f:
        json.dump(result_json, f, ensure_ascii=False, indent=4)

    return {"status": "ok", "message": "Initial SRT and info JSON generated"}
|
|
|
|
|
def get_initial_info_path(sha1: str):
    video_root = MEDIA_ROOT / sha1
    srt_dir = video_root / "initial_srt"
    final_path = srt_dir / "initial_info.json"

    if not video_root.exists() or not video_root.is_dir():
        raise HTTPException(status_code=404, detail="SHA1 folder not found")
    if not srt_dir.exists() or not srt_dir.is_dir():
        raise HTTPException(status_code=404, detail="initial_srt folder not found")
    if not final_path.exists() or not final_path.is_file():
        raise HTTPException(status_code=404, detail="initial_info JSON not found")

    return final_path
|
|
|
|
|
def get_initial_srt_path(sha1: str):
    video_root = MEDIA_ROOT / sha1
    srt_dir = video_root / "initial_srt"
    final_path = srt_dir / "initial.srt"

    if not video_root.exists() or not video_root.is_dir():
        raise HTTPException(status_code=404, detail="SHA1 folder not found")
    if not srt_dir.exists() or not srt_dir.is_dir():
        raise HTTPException(status_code=404, detail="initial_srt folder not found")
    if not final_path.exists() or not final_path.is_file():
        raise HTTPException(status_code=404, detail="initial.srt SRT not found")

    return final_path
|
|
|
|
|
@router.get("/download_initial_srt", tags=["Initial Transcription Process"])
def download_initial_srt(
    sha1: str,
    token: str = Query(..., description="Token required for authorization")
):
    """
    Download the initial SRT for a specific video identified by its SHA-1.

    The SRT is expected under:
        /data/media/<sha1>/initial_srt/initial.srt

    Steps:
    - Validate the token.
    - Ensure the SHA-1 folder, the initial_srt folder and initial.srt exist.
    - Return the SRT as a FileResponse.
    - Raise 404 if any folder or file is missing.
    """
    validate_token(token)

    file_path = get_initial_srt_path(sha1)

    return FileResponse(
        path=file_path,
        media_type="application/x-subrip",
        filename="initial.srt"
    )
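
# Example request (hypothetical host and values):
#   curl -o initial.srt "http://localhost:8000/transcription/download_initial_srt?sha1=<sha1>&token=<token>"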
|
|
|
|
|
@router.get("/download_initial_info", tags=["Initial Transcription Process"])
def download_initial_info(
    sha1: str,
    token: str = Query(..., description="Token required for authorization")
):
    """
    Download the initial info JSON for a specific video identified by its SHA-1.

    The JSON is expected under:
        /data/media/<sha1>/initial_srt/initial_info.json

    Steps:
    - Validate the token.
    - Ensure the SHA-1 folder, the initial_srt folder and initial_info.json exist.
    - Return the JSON as a FileResponse.
    - Raise 404 if any folder or file is missing.
    """
    validate_token(token)

    file_path = get_initial_info_path(sha1)

    return FileResponse(
        path=file_path,
        media_type="application/json",
        filename="initial_info.json"
    )
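
# Example request (hypothetical host and values):
#   curl -o initial_info.json "http://localhost:8000/transcription/download_initial_info?sha1=<sha1>&token=<token>"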