engine / refinement /reflection.py
VeuReu's picture
Upload 10 files
8134888 verified
import os
import csv
import json
import logging
import shutil
import sqlite3
from pathlib import Path
from typing import TypedDict, Annotated, List, Dict, Union
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from operator import itemgetter
# --- Configuraci贸n y Herramientas ---
# Directorios de trabajo
BASE_DIR = Path(__file__).resolve().parent
# Detectar la ra铆z del repositorio de forma robusta buscando el directorio que
# contiene la carpeta "demo". Esto evita depender de una profundidad fija de
# parents que puede variar entre el entorno local y el contenedor (/app).
candidate_root = BASE_DIR
REPO_ROOT = BASE_DIR.parent
for parent in [BASE_DIR, *BASE_DIR.parents]:
if (parent / "demo").exists():
REPO_ROOT = parent
break
DEMO_DIR = REPO_ROOT / "demo"
DEMO_TEMP_DIR = DEMO_DIR / "temp"
DEMO_DATA_DIR = DEMO_DIR / "data"
# Directorio temporal y de logs para el m贸dulo de reflexi贸n. En el contenedor
# de Hugging Face, /app suele ser de solo lectura, mientras que /data es
# escribible. Usamos /data/reflection para evitar errores de permisos.
TEMP_DIR = Path("/data/reflection")
TEMP_DIR.mkdir(parents=True, exist_ok=True)
LOG_FILE = TEMP_DIR / "reflection.log"
# Configurar el logging
logging.basicConfig(
level=logging.INFO,
format='%(levelname)s: %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler(LOG_FILE, encoding="utf-8")
],
)
logger = logging.getLogger(__name__)
# Aseg煤rate de configurar tu API Key.
# En un entorno real, usa os.environ["OPENAI_API_KEY"]
# Aqu铆 usamos un placeholder para la demostraci贸n.
if "OPENAI_API_KEY" not in os.environ:
logger.warning("OPENAI_API_KEY no est谩 configurada. Usando un placeholder.")
os.environ["OPENAI_API_KEY"] = "sk-..."
# Inicializar LLM (se usa GPT-4o por su capacidad de razonamiento)
# En producci贸n, considera un modelo que soporte tus tokens y latencia requeridas.
llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
# --- Ficheros de Ejemplo ---
# Fichero SRT inicial (Narrador)
INITIAL_SRT_CONTENT = """
1
00:00:00,000 --> 00:00:05,340
[Sandra] Per貌 de veritat crec que aquest projecte canviar脿 la nostra nota final.
2
00:00:04,340 --> 00:00:05,790
[Luc铆a] Hem de donar-ho tot.
3
00:00:05,790 --> 00:00:08,790
[Sandra] Ho s茅, ho s茅.
4
00:00:08,000 --> 00:00:10,000
(AD) De sobte, s贸n al parc.
5
00:00:10,000 --> 00:00:14,000
(AD) Ara tallen menjar i fan una amanida a una cuina.
"""
# Fichero JSON de contexto (ejemplo de la respuesta anterior, pero simplificado para el Narrador)
CONTEXT_JSON_CONTENT = """
{
"segments": [
{"id": 1, "start": "00:00:00,000", "end": "00:00:05,340", "type": "dialog", "text": "[Sandra] Per貌 de veritat crec que aquest projecte canviar脿 la nostra nota final."},
{"id": 2, "start": "00:00:04,340", "end": "00:00:05,790", "type": "dialog", "text": "[Luc铆a] Hem de donar-ho tot."},
{"id": 3, "start": "00:00:05,790", "end": "00:00:08,790", "type": "dialog", "text": "[Sandra] Ho s茅, ho s茅."},
{"id": 4, "start": "00:00:08,000", "end": "00:00:10,000", "type": "visual_context", "text": "Cambio de escena a un parque. Personajes caminando."},
{"id": 5, "start": "00:00:10,000", "end": "00:00:14,000", "type": "visual_context", "text": "Escena en una cocina. Los personajes est谩n cortando vegetales y haciendo una ensalada."}
]
}
"""
# Fichero de Reglas UNE (Norma T茅cnica para el Cr铆tico)
# Nota: Aqu铆 se usa un resumen de las reglas pertinentes para un LLM.
UNE_RULES = """
### Reglas UNE de Audiodescripci贸n (Para el Cr铆tico)
1. **Objetividad y Foco Visual:** La descripci贸n debe ser puramente objetiva, describiendo solo lo que se ve. Debe priorizar la acci贸n y los elementos relevantes (personajes, objetos, localizaci贸n).
2. **Tiempo y Espacio (Sincronizaci贸n):** Las audiodescripciones (AD) deben insertarse en los silencios del di谩logo. El tiempo de la AD (entre START y END) debe ser suficiente para narrar el contenido sin solaparse con el di谩logo o la m煤sica importante.
3. **Concisi贸n y Claridad:** Usar lenguaje simple y conciso. Evitar redundancias y juicios de valor.
4. **Formato:** Cada segmento de AD debe tener un formato SRT v谩lido, incluyendo el marcador (AD) al principio de la l铆nea de texto.
5. **Utilidad:** Cada segmento de AD debe ser 煤til para la comprensi贸n y nunca ser redundante. En caso de repetir algo ya explicado antes, mejor no decir nada.
"""
EVALUATION_CRITERIA = [
"Precisi贸 Descriptiva",
"Sincronitzaci贸 Temporal",
"Claredat i Concisi贸",
"Inclusi贸 de Di脿leg/So",
"Contextualitzaci贸",
"Flux i Ritme de la Narraci贸",
]
CRITERIA_WEIGHTS = {
"Precisi贸 Descriptiva": 1,
"Sincronitzaci贸 Temporal": 4,
"Claredat i Concisi贸": 1,
"Inclusi贸 de Di脿leg/So": 1,
"Contextualitzaci贸": 1,
"Flux i Ritme de la Narraci贸": 1,
}
def setup_files(initial_srt_content: str, context_json_content: str):
"""Crea los ficheros iniciales necesarios en el sistema de archivos local."""
(TEMP_DIR / "une_ad_0.srt").write_text(initial_srt_content, encoding="utf-8")
(TEMP_DIR / "json_ad.json").write_text(context_json_content, encoding="utf-8")
logger.info("Ficheros iniciales 'une_ad_0.srt' y 'json_ad.json' creados.")
def _load_audiodescription_from_db(sha1sum: str, version: str) -> tuple[str, str]:
"""Carga une_ad y info_ad desde demo/temp/audiodescriptions.db.
Si info_ad no existeix o 茅s nul, es fa servir CONTEXT_JSON_CONTENT com a
fallback per no trencar el pipeline.
"""
db_path = DEMO_TEMP_DIR / "audiodescriptions.db"
if not db_path.exists():
raise FileNotFoundError(f"No s'ha trobat {db_path}")
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
cur = conn.cursor()
cur.execute(
"SELECT * FROM audiodescriptions WHERE sha1sum=? AND version=?",
(sha1sum, version),
)
row = cur.fetchone()
if row is None:
raise ValueError(
f"No s'ha trobat cap registre a audiodescriptions.db per sha1sum={sha1sum}, version={version}"
)
une_ad = row["une_ad"] or ""
# info_ad 茅s una columna de text amb el JSON de context (abans json_ad/preprocess.json)
info_ad_text = None
if "info_ad" in row.keys():
info_ad_text = row["info_ad"]
elif "json_ad" in row.keys(): # compatibilitat enrere
info_ad_text = row["json_ad"]
if not info_ad_text:
logger.warning("info_ad buit a audiodescriptions.db; fent servir CONTEXT_JSON_CONTENT de mostra.")
info_ad_text = CONTEXT_JSON_CONTENT
return une_ad, info_ad_text
finally:
conn.close()
def _write_casting_csv_from_db(sha1sum: str) -> None:
"""Reconstrueix TEMP_DIR/casting.csv a partir de demo/data/casting.db.
El format 茅s simple: cap莽alera "name,description" i una fila per registre.
Si no hi ha BD o registres, el fitxer no es crea i l'agent d'identitat es
saltar脿 autom脿ticament.
"""
db_path = DEMO_DATA_DIR / "casting.db"
if not db_path.exists():
logger.warning("casting.db no trobat; no es generar脿 casting.csv")
return
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
cur = conn.cursor()
cur.execute("SELECT name, description FROM casting WHERE sha1sum=?", (sha1sum,))
rows = cur.fetchall()
if not rows:
logger.info("Sense registres de casting per a sha1sum=%s", sha1sum)
return
out_path = TEMP_DIR / "casting.csv"
with out_path.open("w", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow(["name", "description"])
for r in rows:
writer.writerow([r["name"], r["description"]])
logger.info("casting.csv generat a %s amb %d registres", out_path, len(rows))
finally:
conn.close()
def _write_scenarios_csv_from_db(sha1sum: str) -> None:
"""Reconstrueix TEMP_DIR/scenarios.csv a partir de demo/data/scenarios.db.
Format: cap莽alera "name,description" i una fila per escenari.
"""
db_path = DEMO_DATA_DIR / "scenarios.db"
if not db_path.exists():
logger.warning("scenarios.db no trobat; no es generar脿 scenarios.csv")
return
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
cur = conn.cursor()
cur.execute("SELECT name, description FROM scenarios WHERE sha1sum=?", (sha1sum,))
rows = cur.fetchall()
if not rows:
logger.info("Sense registres d'escenaris per a sha1sum=%s", sha1sum)
return
out_path = TEMP_DIR / "scenarios.csv"
with out_path.open("w", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow(["name", "description"])
for r in rows:
writer.writerow([r["name"], r["description"]])
logger.info("scenarios.csv generat a %s amb %d registres", out_path, len(rows))
finally:
conn.close()
# --- Utilidades ---
def _strip_markdown_fences(content: str) -> str:
"""Elimina fences ```...``` alrededor de una respuesta JSON si existen."""
text = content.strip()
if text.startswith("```"):
lines = text.splitlines()
# descartar primera l铆nea con ``` o ```json
lines = lines[1:]
# eliminar el cierre ``` (pueden existir varias l铆neas en blanco finales)
while lines and lines[-1].strip() == "```":
lines.pop()
text = "\n".join(lines).strip()
return text
def generate_evaluation_report(srt_content: str, iteration: int) -> tuple[float, float, Path]:
"""Solicita al LLM una avaluaci贸 estructurada i guarda'n el CSV."""
criteria_formatted = "\n".join(f"- {name}" for name in EVALUATION_CRITERIA)
prompt = (
"Actua com un auditor UNE. Avalua l'SRT generat, puntuant cada caracter铆stica de 0 a 7 "
"segons la qualitat observada. D贸nega justificaci贸 breve per貌 concreta per a cada cas. "
"Les caracter铆stiques obligat貌ries s贸n:\n"
f"{criteria_formatted}\n"
"Retorna 脷NICAMENT un array JSON d'objectes amb les claus: "
"'caracteristica', 'valoracio' (nombre enter de 0 a 7) i 'justificacio'."
)
response = llm.invoke(
[
SystemMessage(content=prompt),
HumanMessage(
content=(
"# SRT AVALUAT\n"
f"{srt_content}\n\n"
"Assegura't de complir el format indicat."
)
),
]
)
cleaned = _strip_markdown_fences(response.content)
try:
data = json.loads(cleaned)
if not isinstance(data, list):
raise ValueError("La resposta no 茅s una llista.")
except Exception as exc:
logger.error(
"Error al generar l'avaluaci贸 estructurada: %s. Resposta original: %s",
exc,
response.content,
)
data = [
{
"caracteristica": "Avaluaci贸 fallida",
"valoracio": 1,
"justificacio": "No s'ha pogut obtenir l'avaluaci贸 del LLM.",
}
]
eval_path = TEMP_DIR / f"eval_{iteration}.csv"
with eval_path.open("w", encoding="utf-8", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["Caracteristica", "Valoracio (0-7)", "Justificacio"])
for item in data:
writer.writerow(
[
item.get("caracteristica", ""),
item.get("valoracio", 0),
item.get("justificacio", ""),
]
)
scores = []
weighted_sum = 0.0
total_weight = 0.0
for entry in data:
if not isinstance(entry, dict):
continue
try:
score = float(entry.get("valoracio", 0))
except (TypeError, ValueError):
score = 0.0
scores.append(score)
weight = CRITERIA_WEIGHTS.get(entry.get("caracteristica", ""), 1)
weighted_sum += score * weight
total_weight += weight
mean_score = sum(scores) / len(scores) if scores else 0.0
weighted_mean = weighted_sum / total_weight if total_weight else mean_score
return mean_score, weighted_mean, eval_path
# --- Definici贸n del Estado de la Gr谩fica (StateGraph) ---
class ReflectionState(TypedDict):
"""Representa el estado del bucle de reflexi贸n."""
iteration: int # Ciclo actual (empezando en 0)
current_srt_path: str # Ruta al archivo SRT actual (e.g., une_ad_0.srt, une_ad_1.srt)
critic_report: Dict[str, Union[float, str]] # 脷ltimo informe del cr铆tico (puntuaci贸n y texto)
history: List[SystemMessage] # Historial de mensajes entre agentes
evaluation_mean: float
best_iteration: int
best_weighted_mean: float
best_srt_path: str
best_eval_path: str
# --- Nodos/Agentes de la Gr谩fica ---
def narrator_agent(state: ReflectionState):
"""
Agente que genera o reescribe el SRT.
- En el ciclo 0, genera el SRT inicial.
- En ciclos > 0, reescribe el SRT bas谩ndose en el critic_report.
"""
iteration = state["iteration"]
critic_report = state["critic_report"]
history = state["history"]
# Cargar contexto y 煤ltimo SRT
json_context = (TEMP_DIR / "json_ad.json").read_text(encoding="utf-8")
current_srt = Path(state["current_srt_path"]).read_text(encoding="utf-8")
# 1. Definir el prompt
if iteration == 0:
# Tarea inicial (aunque en este caso ya se proporciona une_ad_0.srt)
# Aqu铆 se simula la generaci贸n inicial.
prompt = (
"Ets un Narrador expert en Audiodescripci贸 (AD). La teva tasca inicial 茅s generar "
"un fitxer SRT d'audiodescripcions basat en el JSON de context visual. "
"TOT I AIX脥, per a aquesta primera iteraci贸, l'SRT ja s'ha generat. "
"Simplement retorna el contingut de 'une_ad_0.srt' com si fos la teva sortida. "
"Assegura't que totes les audiodescripcions estiguin en catal脿 i que cadascuna pugui ser locutada "
"dins del temps disponible (utilitza un m脿xim aproximat d'11 car脿cters per segon). Si el tram de temps "
"茅s massa curt (<1.5s), combina'l amb el bloc d'AD m茅s proper i ajusta els timestamps perqu猫 la narraci贸 sigui fluida. "
"Evita redund脿ncies: no repeteixis informaci贸 ja descrita en segments d'AD anteriors o al di脿leg, i elimina qualsevol detall que no sigui essencial."
)
output_srt = current_srt
reflection_text = "Generaci贸n inicial. No hay reflexi贸n."
else:
# Tarea de reflexi贸n
prompt = (
"Ets un Narrador expert en Audiodescripci贸 (AD). Has rebut una cr铆tica sobre la teva 煤ltima versi贸 de l'SRT. "
"La teva tasca 茅s REESCRIURE el contingut d'audiodescripci贸 (l铆nies amb '(AD)') del fitxer SRT, "
"assegurant que sigui coherent amb el JSON de context i, sobretot, que CORREGEIXIS TOTS els problemes "
"mencionats a l'Informe Cr铆tic adjunt. Mant茅n intactes els di脿legs (l铆nies amb [Nom]) i escriu totes les audiodescripcions en catal脿 natural. "
"Garanteix que cada bloc d'AD pugui ser locutat dins del seu interval temporal disponible considerant un m脿xim d'11 car脿cters per segon. "
"Si l'interval 茅s massa curt (<1.5s), fusiona'l amb el bloc d'AD anterior o posterior m茅s proper i ajusta els timestamps perqu猫 quedin cont铆nues. "
"Prefereix frases concises i accionables, prioritzant la informaci贸 visual essencial, i elimina redund脿ncies amb AD anteriors o amb els di脿legs."
)
# Concatenar la entrada para el LLM
input_content = f"""
# INFORME CR脥TICO
Porcentaje de Fiabilidad Anterior: {critic_report.get('reliability_percentage')}
Cr铆tica Cualitativa: {critic_report.get('qualitative_critique')}
# JSON DE CONTEXTO VISUAL (Gu铆a para la AD)
{json_context}
# 脷LTIMO ARCHIVO SRT GENERADO (une_ad_{iteration-1}.srt)
{current_srt}
REGLAS: Tu respuesta debe ser *SOLAMENTE* el contenido completo del nuevo archivo SRT (incluyendo di谩logos), sin ning煤n comentario o explicaci贸n adicional.
"""
# Llamada al LLM
response = llm.invoke(
[
SystemMessage(content=prompt),
HumanMessage(content=input_content)
]
)
output_srt = response.content
reflection_text = f"Reescrito en base al informe cr铆tico: {critic_report.get('qualitative_critique', 'N/A')}"
# 2. Guardar la nueva salida
new_srt_path = TEMP_DIR / f"une_ad_{iteration}.srt"
new_srt_path.write_text(output_srt, encoding="utf-8")
# 3. Guardar el pensamiento (reflection_text)
(TEMP_DIR / f"thinking_{iteration}.txt").write_text(reflection_text, encoding="utf-8")
logger.info(f"Narrador: Generada la versi贸n {iteration} del SRT en '{new_srt_path}'.")
# 4. Actualizar el estado
new_history = history + [AIMessage(content=f"Narrador v{iteration} completado. Raz贸n de reflexi贸n: {reflection_text}")]
return {
"iteration": iteration,
"current_srt_path": str(new_srt_path),
"history": new_history,
"evaluation_mean": state.get("evaluation_mean", 0.0),
"best_iteration": state.get("best_iteration", -1),
"best_weighted_mean": state.get("best_weighted_mean", 0.0),
"best_srt_path": state.get("best_srt_path", str(new_srt_path)),
"best_eval_path": state.get("best_eval_path", str(TEMP_DIR / f"eval_{iteration}.csv")),
}
def critic_agent(state: ReflectionState):
"""
Agente que eval煤a la calidad del SRT generado por el Narrador bas谩ndose en las Reglas UNE.
Devuelve una puntuaci贸n y una cr铆tica cualitativa.
"""
iteration = state["iteration"]
history = state["history"]
current_srt = Path(state["current_srt_path"]).read_text(encoding="utf-8")
prompt = (
"Ets un Cr铆tic d'Audiodescripci贸 molt estricte. La teva tasca 茅s avaluar l'SRT adjunt "
"煤nicament segons les Regles UNE proporcionades. L'avaluaci贸 ha de ser doble: "
"1. **Num猫rica**: Un percentatge de fiabilitat (ex. 85.5) de 0 a 100%. "
"2. **Qualitativa**: Una cr铆tica constructiva sobre les principals mancances de les AD respecte a les regles. "
"Has de ser EXTREMADAMENT estricte amb la sincronitzaci贸 (sense solapament amb el di脿leg), "
"amb l'adequaci贸 temporal (velocitat m脿xima recomanada d'11 car脿cters per segon) i amb l'abs猫ncia de redund脿ncies. "
"Comprova tamb茅 que totes les audiodescripcions estan escrites en catal脿 natural."
)
input_content = f"""
# REGLAS UNE DE AUDIODESCRIPCI脫N:
{UNE_RULES}
# ARCHIVO SRT A EVALUAR (une_ad_{iteration}.srt):
{current_srt}
REGLAS DE RESPUESTA:
Tu respuesta debe ser *SOLAMENTE* un objeto JSON con dos claves:
1. "reliability_percentage": (float) El porcentaje de fiabilidad.
2. "qualitative_critique": (string) La cr铆tica cualitativa y sugerencias de mejora.
Ejemplo de respuesta: {{"reliability_percentage": 75.0, "qualitative_critique": "El segmento 4 se solapa 0.34s con el di谩logo de Sandra. El segmento 5 es demasiado gen茅rico y no describe bien la acci贸n."}}
"""
# Llamada al LLM
response = llm.invoke(
[
SystemMessage(content=prompt),
HumanMessage(content=input_content)
]
)
# Intentar parsear la respuesta del LLM (puede fallar, por eso se usa un try/except)
try:
cleaned_response = _strip_markdown_fences(response.content)
report = json.loads(cleaned_response)
if not isinstance(report, dict) or 'reliability_percentage' not in report:
raise ValueError("Estructura JSON incorrecta.")
except Exception as e:
logger.error(f"Error al parsear el JSON del Cr铆tico: {e}. Respuesta: {response.content}")
report = {"reliability_percentage": 1.0, "qualitative_critique": "El Cr铆tico no devolvi贸 un JSON v谩lido. Reintentar."}
logger.info(f"Cr铆tico: Evaluaci贸n completada. Fiabilidad: {report.get('reliability_percentage')}%.")
mean_score, weighted_mean, eval_path = generate_evaluation_report(current_srt, iteration)
thinking_path = TEMP_DIR / f"thinking_{iteration}.txt"
if thinking_path.exists():
previous_text = thinking_path.read_text(encoding="utf-8")
thinking_path.write_text(
(
f"{previous_text}\n\nMitjana simple d'avaluaci贸: {mean_score:.2f} / 7"
f"\nMitjana ponderada d'avaluaci贸: {weighted_mean:.2f} / 7"
),
encoding="utf-8",
)
best_iteration = state.get("best_iteration", -1)
best_weighted_mean = state.get("best_weighted_mean", -1.0)
best_srt_path = state.get("best_srt_path", state["current_srt_path"])
best_eval_path = state.get("best_eval_path", str(eval_path))
if weighted_mean > best_weighted_mean:
best_iteration = iteration
best_weighted_mean = weighted_mean
best_srt_path = state["current_srt_path"]
best_eval_path = str(eval_path)
new_history = history + [
AIMessage(
content=(
"Cr铆tico v{iter} completado. Fiabilidad: {reliab}%. "
"Mitjana simple: {mean:.2f}/7. Mitjana ponderada: {wmean:.2f}/7"
).format(
iter=iteration,
reliab=report.get("reliability_percentage"),
mean=mean_score,
wmean=weighted_mean,
)
)
]
return {
"iteration": iteration + 1,
"critic_report": report,
"history": new_history,
"evaluation_mean": weighted_mean,
"best_iteration": best_iteration,
"best_weighted_mean": best_weighted_mean,
"best_srt_path": best_srt_path,
"best_eval_path": best_eval_path,
}
# --- Condici贸n de Salida del Bucle ---
def should_continue(state: ReflectionState) -> str:
"""
Funci贸n de chequeo que decide si continuar iterando o finalizar.
"""
MAX_ITERATIONS = 5 # N煤mero m谩ximo de ciclos
MIN_AVERAGE_SCORE = 6.0 # Umbral de calidad sobre 7
iteration = state["iteration"]
mean_score = state.get("evaluation_mean", 0.0)
if mean_score >= MIN_AVERAGE_SCORE:
logger.info(f"FIN: Mitjana ponderada d'avaluaci贸 assolida ({mean_score:.2f} >= {MIN_AVERAGE_SCORE}).")
return "end"
if iteration >= MAX_ITERATIONS:
logger.info(f"FIN: S'ha assolit el m脿xim d'iteracions ({iteration} / {MAX_ITERATIONS}).")
return "end"
logger.info(f"CONTINUAR: Iteraci贸 {iteration} / {MAX_ITERATIONS}. Mitjana ponderada actual: {mean_score:.2f} / 7.")
return "continue"
# --- Construcci贸n de la Gr谩fica ---
# 1. Configurar el estado inicial
initial_state: ReflectionState = {
"iteration": 0,
"current_srt_path": str(TEMP_DIR / "une_ad_0.srt"),
"critic_report": {"reliability_percentage": 0.0, "qualitative_critique": "Inicializando el proceso."},
"history": [],
"evaluation_mean": 0.0,
"best_iteration": -1,
"best_weighted_mean": -1.0,
"best_srt_path": str(TEMP_DIR / "une_ad_0.srt"),
"best_eval_path": str(TEMP_DIR / "eval_0.csv"),
}
# 2. Definir la gr谩fica
workflow = StateGraph(ReflectionState)
# Nodos
workflow.add_node("narrator", narrator_agent)
workflow.add_node("critic", critic_agent)
# Estructura del bucle: Narrator -> Critic -> Check
workflow.set_entry_point("narrator")
workflow.add_edge("narrator", "critic")
# Condici贸n (puente de ramificaci贸n)
workflow.add_conditional_edges(
"critic",
should_continue,
{
"continue": "narrator", # Si no se cumple el umbral/ciclo, vuelve al narrador
"end": END # Si se cumple, termina
}
)
# Compilar la gr谩fica
app = workflow.compile()
def run_reflection_pipeline(srt_content: str, context_json: str | None = None) -> str:
"""Executa el grafo de reflexi贸 sobre un SRT i retorna el SRT final.
- Escriu ``une_ad_0.srt`` i ``json_ad.json`` a ``TEMP_DIR``.
- Inicialitza l'estat del bucle de reflexi贸.
- Executa ``app.invoke(initial_state)``.
- Copia el millor SRT i eval als fitxers finals i retorna el contingut del
millor SRT.
"""
# Preparar fitxers d'entrada per als agents
setup_files(srt_content, context_json or CONTEXT_JSON_CONTENT)
logger.info("--- Comen莽ant el bucle de reflexi贸 (run_reflection_pipeline) ---")
# Executar la gr脿fica
final_state = app.invoke(initial_state)
best_srt_path = Path(final_state["best_srt_path"])
best_eval_path = Path(final_state["best_eval_path"])
# Copiar els millors resultats a fitxers finals est脿ndard
final_srt_path = TEMP_DIR / "une_ad.srt"
final_eval_path = TEMP_DIR / "eval.csv"
try:
shutil.copy(best_srt_path, final_srt_path)
shutil.copy(best_eval_path, final_eval_path)
except Exception as exc: # pragma: no cover - errors de FS no cr铆tics
logger.warning("No s'han pogut copiar els fitxers finals de reflexi贸: %s", exc)
return best_srt_path.read_text(encoding="utf-8")
def refine_srt_with_reflection(srt_content: str, *, context_json: str | None = None) -> str:
"""Refina un SRT usant el pipeline de reflexi贸 definit en aquest m貌dul.
Args:
srt_content: Contingut SRT inicial (cadena completa).
context_json: JSON de context visual (opcional). Si no es proporciona,
s'utilitza `CONTEXT_JSON_CONTENT` de mostra.
Returns:
Contingut del SRT final generat pel sistema de reflexi贸 multiagent.
"""
return run_reflection_pipeline(srt_content, context_json or CONTEXT_JSON_CONTENT)
def refine_video_with_reflection(sha1sum: str, version: str) -> str:
"""Refina un v铆deo identificat per (sha1sum, version) usant les BDs de demo.
Flux:
1. Llegeix une_ad i info_ad des de demo/temp/audiodescriptions.db.
2. Regenera TEMP_DIR/casting.csv a partir de demo/data/casting.db.
3. Regenera TEMP_DIR/scenarios.csv a partir de demo/data/scenarios.db.
4. Executa el pipeline complet de reflexi贸 (run_reflection_pipeline).
Retorna el contingut del SRT final refinat.
"""
une_ad, info_ad = _load_audiodescription_from_db(sha1sum, version)
# Preparar fitxers necessaris perqu猫 els agents puguin treballar
setup_files(une_ad, info_ad)
_write_casting_csv_from_db(sha1sum)
_write_scenarios_csv_from_db(sha1sum)
return run_reflection_pipeline(une_ad, info_ad)
# --- Ejecuci贸n Principal ---
if __name__ == "__main__":
# Exemple d'煤s directe del m貌dul
final_srt = refine_srt_with_reflection(INITIAL_SRT_CONTENT, context_json=CONTEXT_JSON_CONTENT)
print("\n--- Contingut del SRT Final ---")
print(final_srt)