|
|
|
|
|
|
|
|
import os |
|
|
import io |
|
|
import base64 |
|
|
import zipfile |
|
|
import requests |
|
|
from typing import Iterable, Dict, Any |
|
|
|
|
|
|
|
|
class APIClient: |
|
|
""" |
|
|
High-level client for communicating with the Veureu Engine API. |
|
|
|
|
|
Endpoints managed: |
|
|
POST /jobs |
|
|
→ {"job_id": "..."} |
|
|
|
|
|
GET /jobs/{job_id}/status |
|
|
→ {"status": "queued|processing|done|failed", ...} |
|
|
|
|
|
GET /jobs/{job_id}/result |
|
|
→ JobResult such as {"book": {...}, "une": {...}, ...} |
|
|
|
|
|
This class is used by the Streamlit UI to submit videos, poll job status, |
|
|
retrieve results, generate audio, and interact with the TTS and casting services. |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
base_url: str, |
|
|
use_mock: bool = False, |
|
|
data_dir: str | None = None, |
|
|
token: str | None = None, |
|
|
timeout: int = 180 |
|
|
): |
|
|
""" |
|
|
Initialize the API client. |
|
|
|
|
|
Args: |
|
|
base_url: Base URL of the engine or TTS service. |
|
|
use_mock: Whether to respond with mock data instead of real API calls. |
|
|
data_dir: Optional data folder for local mock/test files. |
|
|
token: Authentication token (fallback: API_SHARED_TOKEN env var). |
|
|
timeout: Timeout in seconds for requests. |
|
|
""" |
|
|
self.base_url = base_url.rstrip("/") |
|
|
self.tts_url = self.base_url |
|
|
self.use_mock = use_mock |
|
|
self.data_dir = data_dir |
|
|
self.timeout = timeout |
|
|
self.session = requests.Session() |
|
|
|
|
|
|
|
|
token = token or os.getenv("API_SHARED_TOKEN") |
|
|
if token: |
|
|
self.session.headers.update({"Authorization": f"Bearer {token}"}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _post_jobs(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]: |
|
|
"""Submit a video and processing modes to /jobs.""" |
|
|
url = f"{self.base_url}/jobs" |
|
|
files = { |
|
|
"file": (os.path.basename(video_path), open(video_path, "rb"), "application/octet-stream") |
|
|
} |
|
|
data = {"modes": ",".join(modes)} |
|
|
|
|
|
r = self.session.post(url, files=files, data=data, timeout=self.timeout) |
|
|
r.raise_for_status() |
|
|
return r.json() |
|
|
|
|
|
def _get_status(self, job_id: str) -> Dict[str, Any]: |
|
|
"""Query job status.""" |
|
|
url = f"{self.base_url}/jobs/{job_id}/status" |
|
|
r = self.session.get(url, timeout=self.timeout) |
|
|
r.raise_for_status() |
|
|
return r.json() |
|
|
|
|
|
def _get_result(self, job_id: str) -> Dict[str, Any]: |
|
|
"""Retrieve job result.""" |
|
|
url = f"{self.base_url}/jobs/{job_id}/result" |
|
|
r = self.session.get(url, timeout=self.timeout) |
|
|
r.raise_for_status() |
|
|
return r.json() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_video(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]: |
|
|
"""Return {"job_id": "..."} either from mock or engine.""" |
|
|
if self.use_mock: |
|
|
return {"job_id": "mock-123"} |
|
|
return self._post_jobs(video_path, modes) |
|
|
|
|
|
def get_job(self, job_id: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Returns UI-friendly job data: |
|
|
{"status": "done", "results": {"book": {...}, "une": {...}}} |
|
|
|
|
|
Maps engine responses into the expected 'results' format. |
|
|
""" |
|
|
if self.use_mock: |
|
|
return { |
|
|
"status": "done", |
|
|
"results": { |
|
|
"book": {"text": "Example text (book)", "mp3_bytes": b""}, |
|
|
"une": { |
|
|
"srt": "1\n00:00:00,000 --> 00:00:01,000\nExample UNE\n", |
|
|
"mp3_bytes": b"" |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
status_data = self._get_status(job_id) |
|
|
|
|
|
|
|
|
if status_data.get("status") in {"queued", "processing"}: |
|
|
return {"status": status_data.get("status", "queued")} |
|
|
|
|
|
raw_result = self._get_result(job_id) |
|
|
results = {} |
|
|
|
|
|
|
|
|
if "book" in raw_result: |
|
|
results["book"] = {"text": raw_result["book"].get("text")} |
|
|
if "une" in raw_result: |
|
|
results["une"] = {"srt": raw_result["une"].get("srt")} |
|
|
|
|
|
|
|
|
for section in ("book", "une"): |
|
|
if section in raw_result: |
|
|
if "characters" in raw_result[section]: |
|
|
results[section]["characters"] = raw_result[section]["characters"] |
|
|
if "metrics" in raw_result[section]: |
|
|
results[section]["metrics"] = raw_result[section]["metrics"] |
|
|
|
|
|
final_status = "done" if results else status_data.get("status", "unknown") |
|
|
return {"status": final_status, "results": results} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def tts_matxa(self, text: str, voice: str = "central/grau") -> dict: |
|
|
""" |
|
|
Call the TTS /tts/text endpoint to synthesize short audio. |
|
|
|
|
|
Returns: |
|
|
{"mp3_bytes": b"..."} on success |
|
|
{"error": "..."} on failure |
|
|
""" |
|
|
if not self.tts_url: |
|
|
raise ValueError("TTS service URL not configured.") |
|
|
|
|
|
url = f"{self.tts_url.rstrip('/')}/tts/text" |
|
|
data = {"texto": text, "voice": voice, "formato": "mp3"} |
|
|
|
|
|
try: |
|
|
r = requests.post(url, data=data, timeout=self.timeout) |
|
|
r.raise_for_status() |
|
|
return {"mp3_bytes": r.content} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
def rebuild_video_with_ad(self, video_path: str, srt_path: str) -> dict: |
|
|
""" |
|
|
Rebuild a video including audio description (AD) |
|
|
by calling /tts/srt. The server returns a ZIP containing an MP4. |
|
|
""" |
|
|
if not self.tts_url: |
|
|
raise ValueError("TTS service URL not configured.") |
|
|
|
|
|
url = f"{self.tts_url.rstrip('/')}/tts/srt" |
|
|
|
|
|
try: |
|
|
files = { |
|
|
"video": (os.path.basename(video_path), open(video_path, "rb"), "video/mp4"), |
|
|
"srt": (os.path.basename(srt_path), open(srt_path, "rb"), "application/x-subrip") |
|
|
} |
|
|
data = {"include_final_mp4": 1} |
|
|
|
|
|
r = requests.post(url, files=files, data=data, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
|
|
|
with zipfile.ZipFile(io.BytesIO(r.content)) as z: |
|
|
for name in z.namelist(): |
|
|
if name.endswith(".mp4"): |
|
|
return {"video_bytes": z.read(name)} |
|
|
|
|
|
return {"error": "MP4 file not found inside ZIP."} |
|
|
|
|
|
except zipfile.BadZipFile: |
|
|
return {"error": "Invalid ZIP response from server."} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_initial_casting( |
|
|
self, |
|
|
video_path: str = None, |
|
|
video_bytes: bytes = None, |
|
|
video_name: str = None, |
|
|
epsilon: float = 0.5, |
|
|
min_cluster_size: int = 2 |
|
|
) -> dict: |
|
|
""" |
|
|
Calls /create_initial_casting to produce the initial actor/face clustering. |
|
|
|
|
|
Args: |
|
|
video_path: Load video from disk. |
|
|
video_bytes: Provide video already in memory. |
|
|
video_name: Name used if video_bytes is provided. |
|
|
epsilon: DBSCAN epsilon for clustering. |
|
|
min_cluster_size: Minimum number of samples for DBSCAN. |
|
|
""" |
|
|
url = f"{self.base_url}/create_initial_casting" |
|
|
|
|
|
try: |
|
|
|
|
|
if video_bytes: |
|
|
files = {"video": (video_name or "video.mp4", video_bytes, "video/mp4")} |
|
|
elif video_path: |
|
|
with open(video_path, "rb") as f: |
|
|
files = {"video": (os.path.basename(video_path), f.read(), "video/mp4")} |
|
|
else: |
|
|
return {"error": "Either video_path or video_bytes must be provided."} |
|
|
|
|
|
data = { |
|
|
"epsilon": str(epsilon), |
|
|
"min_cluster_size": str(min_cluster_size) |
|
|
} |
|
|
|
|
|
r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
|
|
|
if r.headers.get("content-type", "").startswith("application/json"): |
|
|
return r.json() |
|
|
|
|
|
return {"ok": True} |
|
|
|
|
|
except Exception as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_audio_from_text_file(self, text_content: str, voice: str = "central/grau") -> dict: |
|
|
""" |
|
|
Converts a large text into an SRT-like structure, calls /tts/srt, |
|
|
and extracts 'ad_master.mp3' from the resulting ZIP. |
|
|
|
|
|
Useful for audiobook-like generation. |
|
|
""" |
|
|
if not self.tts_url: |
|
|
raise ValueError("TTS service URL not configured.") |
|
|
|
|
|
|
|
|
srt_content = "" |
|
|
start = 0 |
|
|
|
|
|
for idx, raw_line in enumerate(text_content.strip().split("\n")): |
|
|
line = raw_line.strip() |
|
|
if not line: |
|
|
continue |
|
|
|
|
|
end = start + 5 |
|
|
|
|
|
def fmt(seconds): |
|
|
h = seconds // 3600 |
|
|
m = (seconds % 3600) // 60 |
|
|
s = seconds % 60 |
|
|
return f"{h:02d}:{m:02d}:{s:02d},000" |
|
|
|
|
|
srt_content += f"{idx+1}\n" |
|
|
srt_content += f"{fmt(start)} --> {fmt(end)}\n" |
|
|
srt_content += f"{line}\n\n" |
|
|
start = end |
|
|
|
|
|
if not srt_content: |
|
|
return {"error": "Provided text is empty or cannot be processed."} |
|
|
|
|
|
|
|
|
url = f"{self.tts_url.rstrip('/')}/tts/srt" |
|
|
|
|
|
try: |
|
|
files = {"srt": ("fake_ad.srt", srt_content, "application/x-subrip")} |
|
|
data = {"voice": voice, "ad_format": "mp3"} |
|
|
|
|
|
r = requests.post(url, files=files, data=data, timeout=self.timeout * 5) |
|
|
r.raise_for_status() |
|
|
|
|
|
with zipfile.ZipFile(io.BytesIO(r.content)) as z: |
|
|
if "ad_master.mp3" in z.namelist(): |
|
|
return {"mp3_bytes": z.read("ad_master.mp3")} |
|
|
|
|
|
return {"error": "'ad_master.mp3' not found inside ZIP."} |
|
|
|
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": f"Error calling SRT API: {e}"} |
|
|
except zipfile.BadZipFile: |
|
|
return {"error": "Invalid ZIP response from server."} |
|
|
|
|
|
def tts_long_text(self, text: str, voice: str = "central/grau") -> dict: |
|
|
""" |
|
|
Call /tts/text_long for very long text TTS synthesis. |
|
|
Returns raw MP3 bytes. |
|
|
""" |
|
|
if not self.tts_url: |
|
|
raise ValueError("TTS service URL not configured.") |
|
|
|
|
|
url = f"{self.tts_url.rstrip('/')}/tts/text_long" |
|
|
data = {"texto": text, "voice": voice, "formato": "mp3"} |
|
|
|
|
|
try: |
|
|
r = requests.post(url, data=data, timeout=self.timeout * 10) |
|
|
r.raise_for_status() |
|
|
return {"mp3_bytes": r.content} |
|
|
except requests.exceptions.RequestException as e: |
|
|
return {"error": str(e)} |
|
|
|