|
|
import os |
|
|
import csv |
|
|
import json |
|
|
import logging |
|
|
import shutil |
|
|
import sqlite3 |
|
|
from pathlib import Path |
|
|
from typing import TypedDict, Annotated, List, Dict, Union |
|
|
from langgraph.graph import StateGraph, END |
|
|
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage |
|
|
from langchain_openai import ChatOpenAI |
|
|
from operator import itemgetter |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Path discovery -------------------------------------------------------
BASE_DIR = Path(__file__).resolve().parent

# Walk upward from this file until a directory containing "demo" is found;
# fall back to the immediate parent when no ancestor has one.
# (The previously unused `candidate_root` binding has been removed.)
REPO_ROOT = BASE_DIR.parent
for parent in [BASE_DIR, *BASE_DIR.parents]:
    if (parent / "demo").exists():
        REPO_ROOT = parent
        break

DEMO_DIR = REPO_ROOT / "demo"
DEMO_TEMP_DIR = DEMO_DIR / "temp"  # audiodescriptions.db is read from here
DEMO_DATA_DIR = DEMO_DIR / "data"  # casting.db / scenarios.db are read from here
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Working directory for all intermediate artifacts: per-iteration SRT
# versions, eval CSVs, "thinking" notes and the log file.
# NOTE(review): hard-coded absolute path; requires write access to /data --
# confirm this matches the deployment environment.
TEMP_DIR = Path("/data/reflection")
TEMP_DIR.mkdir(parents=True, exist_ok=True)

LOG_FILE = TEMP_DIR / "reflection.log"

# Log to stderr and, simultaneously, to a file inside TEMP_DIR.
logging.basicConfig(
    level=logging.INFO,
    format='%(levelname)s: %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(LOG_FILE, encoding="utf-8")
    ],
)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Fail soft when no API key is configured: warn and install a placeholder so
# the module can still be imported (any real LLM call will fail later).
if "OPENAI_API_KEY" not in os.environ:
    logger.warning("OPENAI_API_KEY no est谩 configurada. Usando un placeholder.")
    os.environ["OPENAI_API_KEY"] = "sk-..."

# Shared chat model used by the narrator, the critic and the auditor prompts.
llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Sample SRT used when the module runs stand-alone (see __main__ below).
# Dialogue cues are tagged "[Name]"; audio-description cues are tagged "(AD)".
# NOTE(review): cue 2 starts (00:00:04,340) before cue 1 ends (00:00:05,340)
# and cue 4 overlaps cue 3 -- presumably intentional flawed test data for the
# critic to flag; confirm before "fixing" the timestamps.
INITIAL_SRT_CONTENT = """
1
00:00:00,000 --> 00:00:05,340
[Sandra] Per貌 de veritat crec que aquest projecte canviar脿 la nostra nota final.

2
00:00:04,340 --> 00:00:05,790
[Luc铆a] Hem de donar-ho tot.

3
00:00:05,790 --> 00:00:08,790
[Sandra] Ho s茅, ho s茅.

4
00:00:08,000 --> 00:00:10,000
(AD) De sobte, s贸n al parc.

5
00:00:10,000 --> 00:00:14,000
(AD) Ara tallen menjar i fan una amanida a una cuina.
"""
|
|
|
|
|
|
|
|
# Sample visual-context JSON mirroring INITIAL_SRT_CONTENT: "dialog" segments
# plus "visual_context" segments that the narrator turns into (AD) cues.
CONTEXT_JSON_CONTENT = """
{
    "segments": [
        {"id": 1, "start": "00:00:00,000", "end": "00:00:05,340", "type": "dialog", "text": "[Sandra] Per貌 de veritat crec que aquest projecte canviar脿 la nostra nota final."},
        {"id": 2, "start": "00:00:04,340", "end": "00:00:05,790", "type": "dialog", "text": "[Luc铆a] Hem de donar-ho tot."},
        {"id": 3, "start": "00:00:05,790", "end": "00:00:08,790", "type": "dialog", "text": "[Sandra] Ho s茅, ho s茅."},
        {"id": 4, "start": "00:00:08,000", "end": "00:00:10,000", "type": "visual_context", "text": "Cambio de escena a un parque. Personajes caminando."},
        {"id": 5, "start": "00:00:10,000", "end": "00:00:14,000", "type": "visual_context", "text": "Escena en una cocina. Los personajes est谩n cortando vegetales y haciendo una ensalada."}
    ]
}
"""
|
|
|
|
|
|
|
|
|
|
|
# Rulebook text injected verbatim into the critic's prompt (critic_agent).
UNE_RULES = """
### Reglas UNE de Audiodescripci贸n (Para el Cr铆tico)
1. **Objetividad y Foco Visual:** La descripci贸n debe ser puramente objetiva, describiendo solo lo que se ve. Debe priorizar la acci贸n y los elementos relevantes (personajes, objetos, localizaci贸n).
2. **Tiempo y Espacio (Sincronizaci贸n):** Las audiodescripciones (AD) deben insertarse en los silencios del di谩logo. El tiempo de la AD (entre START y END) debe ser suficiente para narrar el contenido sin solaparse con el di谩logo o la m煤sica importante.
3. **Concisi贸n y Claridad:** Usar lenguaje simple y conciso. Evitar redundancias y juicios de valor.
4. **Formato:** Cada segmento de AD debe tener un formato SRT v谩lido, incluyendo el marcador (AD) al principio de la l铆nea de texto.
5. **Utilidad:** Cada segmento de AD debe ser 煤til para la comprensi贸n y nunca ser redundante. En caso de repetir algo ya explicado antes, mejor no decir nada.
"""
|
|
|
|
|
# Rubric dimensions the auditor LLM must score, 0-7 each
# (see generate_evaluation_report).
EVALUATION_CRITERIA = [
    "Precisi贸 Descriptiva",
    "Sincronitzaci贸 Temporal",
    "Claredat i Concisi贸",
    "Inclusi贸 de Di脿leg/So",
    "Contextualitzaci贸",
    "Flux i Ritme de la Narraci贸",
]

# Relative weight of each dimension in the weighted mean. Keys must match
# EVALUATION_CRITERIA exactly (unknown names default to weight 1 at lookup
# time). Temporal synchronisation is deliberately weighted 4x.
CRITERIA_WEIGHTS = {
    "Precisi贸 Descriptiva": 1,
    "Sincronitzaci贸 Temporal": 4,
    "Claredat i Concisi贸": 1,
    "Inclusi贸 de Di脿leg/So": 1,
    "Contextualitzaci贸": 1,
    "Flux i Ritme de la Narraci贸": 1,
}
|
|
|
|
|
def setup_files(initial_srt_content: str, context_json_content: str):
    """Write the bootstrap files (initial SRT and visual-context JSON) into TEMP_DIR."""
    for filename, payload in (
        ("une_ad_0.srt", initial_srt_content),
        ("json_ad.json", context_json_content),
    ):
        (TEMP_DIR / filename).write_text(payload, encoding="utf-8")
    logger.info("Ficheros iniciales 'une_ad_0.srt' y 'json_ad.json' creados.")
|
|
|
|
|
|
|
|
def _load_audiodescription_from_db(sha1sum: str, version: str) -> tuple[str, str]:
    """Fetch (une_ad, info_ad) for one video from demo/temp/audiodescriptions.db.

    When the info_ad column is absent or empty, the sample
    CONTEXT_JSON_CONTENT is returned instead so the pipeline keeps working.

    Raises:
        FileNotFoundError: if the database file does not exist.
        ValueError: if no row matches (sha1sum, version).
    """
    db_path = DEMO_TEMP_DIR / "audiodescriptions.db"
    if not db_path.exists():
        raise FileNotFoundError(f"No s'ha trobat {db_path}")

    connection = sqlite3.connect(str(db_path))
    connection.row_factory = sqlite3.Row
    try:
        record = connection.execute(
            "SELECT * FROM audiodescriptions WHERE sha1sum=? AND version=?",
            (sha1sum, version),
        ).fetchone()
        if record is None:
            raise ValueError(
                f"No s'ha trobat cap registre a audiodescriptions.db per sha1sum={sha1sum}, version={version}"
            )

        une_ad = record["une_ad"] or ""

        # Prefer the 'info_ad' column; fall back to 'json_ad' if only that
        # column exists in this database.
        columns = record.keys()
        if "info_ad" in columns:
            info_ad_text = record["info_ad"]
        elif "json_ad" in columns:
            info_ad_text = record["json_ad"]
        else:
            info_ad_text = None

        if not info_ad_text:
            logger.warning("info_ad buit a audiodescriptions.db; fent servir CONTEXT_JSON_CONTENT de mostra.")
            info_ad_text = CONTEXT_JSON_CONTENT

        return une_ad, info_ad_text
    finally:
        connection.close()
|
|
|
|
|
|
|
|
def _write_casting_csv_from_db(sha1sum: str) -> None:
    """Rebuild TEMP_DIR/casting.csv from demo/data/casting.db.

    Output format: a "name,description" header plus one row per record.
    When the database or matching rows are missing, no file is written and
    the identity agent will simply skip this input.
    """
    db_path = DEMO_DATA_DIR / "casting.db"
    if not db_path.exists():
        logger.warning("casting.db no trobat; no es generar脿 casting.csv")
        return

    connection = sqlite3.connect(str(db_path))
    connection.row_factory = sqlite3.Row
    try:
        records = connection.execute(
            "SELECT name, description FROM casting WHERE sha1sum=?", (sha1sum,)
        ).fetchall()
    finally:
        connection.close()

    if not records:
        logger.info("Sense registres de casting per a sha1sum=%s", sha1sum)
        return

    out_path = TEMP_DIR / "casting.csv"
    with out_path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(["name", "description"])
        writer.writerows([record["name"], record["description"]] for record in records)

    logger.info("casting.csv generat a %s amb %d registres", out_path, len(records))
|
|
|
|
|
|
|
|
def _write_scenarios_csv_from_db(sha1sum: str) -> None:
    """Rebuild TEMP_DIR/scenarios.csv from demo/data/scenarios.db.

    Output format: a "name,description" header plus one row per scenario.
    When the database or matching rows are missing, no file is written.
    """
    db_path = DEMO_DATA_DIR / "scenarios.db"
    if not db_path.exists():
        logger.warning("scenarios.db no trobat; no es generar脿 scenarios.csv")
        return

    connection = sqlite3.connect(str(db_path))
    connection.row_factory = sqlite3.Row
    try:
        records = connection.execute(
            "SELECT name, description FROM scenarios WHERE sha1sum=?", (sha1sum,)
        ).fetchall()
    finally:
        connection.close()

    if not records:
        logger.info("Sense registres d'escenaris per a sha1sum=%s", sha1sum)
        return

    out_path = TEMP_DIR / "scenarios.csv"
    with out_path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(["name", "description"])
        writer.writerows([record["name"], record["description"]] for record in records)

    logger.info("scenarios.csv generat a %s amb %d registres", out_path, len(records))
|
|
|
|
|
|
|
|
def _strip_markdown_fences(content: str) -> str: |
|
|
"""Elimina fences ```...``` alrededor de una respuesta JSON si existen.""" |
|
|
text = content.strip() |
|
|
if text.startswith("```"): |
|
|
lines = text.splitlines() |
|
|
|
|
|
lines = lines[1:] |
|
|
|
|
|
while lines and lines[-1].strip() == "```": |
|
|
lines.pop() |
|
|
text = "\n".join(lines).strip() |
|
|
return text |
|
|
|
|
|
|
|
|
def _request_llm_evaluation(srt_content: str) -> list:
    """Ask the LLM for a structured rubric evaluation of *srt_content*.

    Returns the parsed JSON list, or a single-item fallback entry when the
    response cannot be parsed as a list.
    """
    criteria_formatted = "\n".join(f"- {name}" for name in EVALUATION_CRITERIA)
    prompt = (
        "Actua com un auditor UNE. Avalua l'SRT generat, puntuant cada caracter铆stica de 0 a 7 "
        "segons la qualitat observada. D贸nega justificaci贸 breve per貌 concreta per a cada cas. "
        "Les caracter铆stiques obligat貌ries s贸n:\n"
        f"{criteria_formatted}\n"
        "Retorna 脷NICAMENT un array JSON d'objectes amb les claus: "
        "'caracteristica', 'valoracio' (nombre enter de 0 a 7) i 'justificacio'."
    )

    response = llm.invoke(
        [
            SystemMessage(content=prompt),
            HumanMessage(
                content=(
                    "# SRT AVALUAT\n"
                    f"{srt_content}\n\n"
                    "Assegura't de complir el format indicat."
                )
            ),
        ]
    )

    cleaned = _strip_markdown_fences(response.content)
    try:
        data = json.loads(cleaned)
        if not isinstance(data, list):
            raise ValueError("La resposta no 茅s una llista.")
    except Exception as exc:
        logger.error(
            "Error al generar l'avaluaci贸 estructurada: %s. Resposta original: %s",
            exc,
            response.content,
        )
        # Fallback entry so the CSV and the means are still produced.
        data = [
            {
                "caracteristica": "Avaluaci贸 fallida",
                "valoracio": 1,
                "justificacio": "No s'ha pogut obtenir l'avaluaci贸 del LLM.",
            }
        ]
    return data


def _write_evaluation_csv(data: list, iteration: int) -> Path:
    """Persist the rubric rows to TEMP_DIR/eval_<iteration>.csv; return its path."""
    eval_path = TEMP_DIR / f"eval_{iteration}.csv"
    with eval_path.open("w", encoding="utf-8", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["Caracteristica", "Valoracio (0-7)", "Justificacio"])
        for item in data:
            writer.writerow(
                [
                    item.get("caracteristica", ""),
                    item.get("valoracio", 0),
                    item.get("justificacio", ""),
                ]
            )
    return eval_path


def _compute_evaluation_means(data: list) -> tuple[float, float]:
    """Return (simple mean, weighted mean) of the 0-7 scores in *data*.

    Non-dict entries are skipped; unparsable scores count as 0.0; unknown
    criteria names get weight 1 (see CRITERIA_WEIGHTS).
    """
    scores = []
    weighted_sum = 0.0
    total_weight = 0.0
    for entry in data:
        if not isinstance(entry, dict):
            continue
        try:
            score = float(entry.get("valoracio", 0))
        except (TypeError, ValueError):
            score = 0.0
        scores.append(score)

        weight = CRITERIA_WEIGHTS.get(entry.get("caracteristica", ""), 1)
        weighted_sum += score * weight
        total_weight += weight

    mean_score = sum(scores) / len(scores) if scores else 0.0
    weighted_mean = weighted_sum / total_weight if total_weight else mean_score
    return mean_score, weighted_mean


def generate_evaluation_report(srt_content: str, iteration: int) -> tuple[float, float, Path]:
    """Request a structured rubric evaluation of the SRT and persist it as CSV.

    Args:
        srt_content: Full SRT text to audit.
        iteration: Reflection-loop index, used to name the CSV file.

    Returns:
        (simple mean, weighted mean, path of the written eval CSV), with
        means on the 0-7 rubric scale.
    """
    data = _request_llm_evaluation(srt_content)
    eval_path = _write_evaluation_csv(data, iteration)
    mean_score, weighted_mean = _compute_evaluation_means(data)
    return mean_score, weighted_mean, eval_path
|
|
|
|
|
|
|
|
class ReflectionState(TypedDict):
    """Shared state of the narrator/critic reflection loop."""
    # Index of the current cycle; the critic increments it on return.
    iteration: int
    # Path of the most recently written une_ad_<i>.srt.
    current_srt_path: str
    # Critic output: 'reliability_percentage' (float) and
    # 'qualitative_critique' (str).
    critic_report: Dict[str, Union[float, str]]
    # Conversation trail. Widened from List[SystemMessage]: both agents
    # actually append AIMessage instances.
    history: List[Union[SystemMessage, AIMessage]]
    # Weighted rubric mean (0-7) of the latest evaluation.
    evaluation_mean: float
    # Bookkeeping for the best-scoring iteration seen so far.
    best_iteration: int
    best_weighted_mean: float
    best_srt_path: str
    best_eval_path: str
|
|
|
|
|
|
|
|
def narrator_agent(state: ReflectionState):
    """Generate or rewrite the SRT.

    - Iteration 0: the bootstrap SRT already exists on disk (written by
      setup_files), so it is passed through unchanged. Previously an LLM
      round-trip asked the model to echo the file verbatim and its reply
      unconditionally overwrote the pass-through value anyway; the call was
      pure cost and noise, so it has been removed.
    - Iterations > 0: the SRT is rewritten by the LLM according to the
      critic_report produced in the previous cycle.

    Returns a partial state update: bumped SRT path, extended history, and
    pass-through of the best-* bookkeeping fields.
    """
    iteration = state["iteration"]
    critic_report = state["critic_report"]
    history = state["history"]

    current_srt = Path(state["current_srt_path"]).read_text(encoding="utf-8")

    if iteration == 0:
        # Echo the bootstrap SRT; no reflection has happened yet.
        output_srt = current_srt
        reflection_text = "Generaci贸n inicial. No hay reflexi贸n."
    else:
        json_context = (TEMP_DIR / "json_ad.json").read_text(encoding="utf-8")

        prompt = (
            "Ets un Narrador expert en Audiodescripci贸 (AD). Has rebut una cr铆tica sobre la teva 煤ltima versi贸 de l'SRT. "
            "La teva tasca 茅s REESCRIURE el contingut d'audiodescripci贸 (l铆nies amb '(AD)') del fitxer SRT, "
            "assegurant que sigui coherent amb el JSON de context i, sobretot, que CORREGEIXIS TOTS els problemes "
            "mencionats a l'Informe Cr铆tic adjunt. Mant茅n intactes els di脿legs (l铆nies amb [Nom]) i escriu totes les audiodescripcions en catal脿 natural. "
            "Garanteix que cada bloc d'AD pugui ser locutat dins del seu interval temporal disponible considerant un m脿xim d'11 car脿cters per segon. "
            "Si l'interval 茅s massa curt (<1.5s), fusiona'l amb el bloc d'AD anterior o posterior m茅s proper i ajusta els timestamps perqu猫 quedin cont铆nues. "
            "Prefereix frases concises i accionables, prioritzant la informaci贸 visual essencial, i elimina redund脿ncies amb AD anteriors o amb els di脿legs."
        )

        input_content = f"""
# INFORME CR脥TICO
Porcentaje de Fiabilidad Anterior: {critic_report.get('reliability_percentage')}
Cr铆tica Cualitativa: {critic_report.get('qualitative_critique')}

# JSON DE CONTEXTO VISUAL (Gu铆a para la AD)
{json_context}

# 脷LTIMO ARCHIVO SRT GENERADO (une_ad_{iteration-1}.srt)
{current_srt}

REGLAS: Tu respuesta debe ser *SOLAMENTE* el contenido completo del nuevo archivo SRT (incluyendo di谩logos), sin ning煤n comentario o explicaci贸n adicional.
"""

        response = llm.invoke(
            [
                SystemMessage(content=prompt),
                HumanMessage(content=input_content)
            ]
        )

        output_srt = response.content
        reflection_text = f"Reescrito en base al informe cr铆tico: {critic_report.get('qualitative_critique', 'N/A')}"

    # Persist this iteration's SRT and its reflection note.
    new_srt_path = TEMP_DIR / f"une_ad_{iteration}.srt"
    new_srt_path.write_text(output_srt, encoding="utf-8")

    (TEMP_DIR / f"thinking_{iteration}.txt").write_text(reflection_text, encoding="utf-8")

    logger.info(f"Narrador: Generada la versi贸n {iteration} del SRT en '{new_srt_path}'.")

    new_history = history + [AIMessage(content=f"Narrador v{iteration} completado. Raz贸n de reflexi贸n: {reflection_text}")]
    return {
        "iteration": iteration,
        "current_srt_path": str(new_srt_path),
        "history": new_history,
        "evaluation_mean": state.get("evaluation_mean", 0.0),
        "best_iteration": state.get("best_iteration", -1),
        "best_weighted_mean": state.get("best_weighted_mean", 0.0),
        "best_srt_path": state.get("best_srt_path", str(new_srt_path)),
        "best_eval_path": state.get("best_eval_path", str(TEMP_DIR / f"eval_{iteration}.csv")),
    }
|
|
|
|
|
def critic_agent(state: ReflectionState):
    """
    Evaluate the quality of the narrator's SRT against the UNE rules.

    Two outputs are produced per cycle:
    - the critic's own JSON report (reliability percentage + qualitative
      critique), fed back to the narrator on the next cycle;
    - a structured rubric evaluation via generate_evaluation_report(), whose
      weighted mean drives both the stop condition and best-version tracking.

    Returns a partial state update with the iteration counter incremented.
    """
    iteration = state["iteration"]
    history = state["history"]
    current_srt = Path(state["current_srt_path"]).read_text(encoding="utf-8")

    prompt = (
        "Ets un Cr铆tic d'Audiodescripci贸 molt estricte. La teva tasca 茅s avaluar l'SRT adjunt "
        "煤nicament segons les Regles UNE proporcionades. L'avaluaci贸 ha de ser doble: "
        "1. **Num猫rica**: Un percentatge de fiabilitat (ex. 85.5) de 0 a 100%. "
        "2. **Qualitativa**: Una cr铆tica constructiva sobre les principals mancances de les AD respecte a les regles. "
        "Has de ser EXTREMADAMENT estricte amb la sincronitzaci贸 (sense solapament amb el di脿leg), "
        "amb l'adequaci贸 temporal (velocitat m脿xima recomanada d'11 car脿cters per segon) i amb l'abs猫ncia de redund脿ncies. "
        "Comprova tamb茅 que totes les audiodescripcions estan escrites en catal脿 natural."
    )

    input_content = f"""
# REGLAS UNE DE AUDIODESCRIPCI脫N:
{UNE_RULES}

# ARCHIVO SRT A EVALUAR (une_ad_{iteration}.srt):
{current_srt}

REGLAS DE RESPUESTA:
Tu respuesta debe ser *SOLAMENTE* un objeto JSON con dos claves:
1. "reliability_percentage": (float) El porcentaje de fiabilidad.
2. "qualitative_critique": (string) La cr铆tica cualitativa y sugerencias de mejora.
Ejemplo de respuesta: {{"reliability_percentage": 75.0, "qualitative_critique": "El segmento 4 se solapa 0.34s con el di谩logo de Sandra. El segmento 5 es demasiado gen茅rico y no describe bien la acci贸n."}}
"""

    response = llm.invoke(
        [
            SystemMessage(content=prompt),
            HumanMessage(content=input_content)
        ]
    )

    # Parse the critic's JSON; on any failure, fall back to a low-reliability
    # report so the loop keeps going instead of crashing.
    try:
        cleaned_response = _strip_markdown_fences(response.content)
        report = json.loads(cleaned_response)
        if not isinstance(report, dict) or 'reliability_percentage' not in report:
            raise ValueError("Estructura JSON incorrecta.")
    except Exception as e:
        logger.error(f"Error al parsear el JSON del Cr铆tico: {e}. Respuesta: {response.content}")
        report = {"reliability_percentage": 1.0, "qualitative_critique": "El Cr铆tico no devolvi贸 un JSON v谩lido. Reintentar."}

    logger.info(f"Cr铆tico: Evaluaci贸n completada. Fiabilidad: {report.get('reliability_percentage')}%.")

    # Independent rubric evaluation (second LLM call) -> eval_<i>.csv + means.
    mean_score, weighted_mean, eval_path = generate_evaluation_report(current_srt, iteration)

    # Append the rubric means to the narrator's thinking note for this cycle.
    thinking_path = TEMP_DIR / f"thinking_{iteration}.txt"
    if thinking_path.exists():
        previous_text = thinking_path.read_text(encoding="utf-8")
        thinking_path.write_text(
            (
                f"{previous_text}\n\nMitjana simple d'avaluaci贸: {mean_score:.2f} / 7"
                f"\nMitjana ponderada d'avaluaci贸: {weighted_mean:.2f} / 7"
            ),
            encoding="utf-8",
        )

    # Best-version tracking: keep whichever iteration has the highest
    # weighted mean so far.
    best_iteration = state.get("best_iteration", -1)
    best_weighted_mean = state.get("best_weighted_mean", -1.0)
    best_srt_path = state.get("best_srt_path", state["current_srt_path"])
    best_eval_path = state.get("best_eval_path", str(eval_path))

    if weighted_mean > best_weighted_mean:
        best_iteration = iteration
        best_weighted_mean = weighted_mean
        best_srt_path = state["current_srt_path"]
        best_eval_path = str(eval_path)

    new_history = history + [
        AIMessage(
            content=(
                "Cr铆tico v{iter} completado. Fiabilidad: {reliab}%. "
                "Mitjana simple: {mean:.2f}/7. Mitjana ponderada: {wmean:.2f}/7"
            ).format(
                iter=iteration,
                reliab=report.get("reliability_percentage"),
                mean=mean_score,
                wmean=weighted_mean,
            )
        )
    ]
    return {
        "iteration": iteration + 1,
        "critic_report": report,
        "history": new_history,
        "evaluation_mean": weighted_mean,
        "best_iteration": best_iteration,
        "best_weighted_mean": best_weighted_mean,
        "best_srt_path": best_srt_path,
        "best_eval_path": best_eval_path,
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def should_continue(state: ReflectionState) -> str:
    """
    Decide whether the reflection loop should run another cycle.

    Returns "end" once the weighted evaluation mean reaches the quality
    target or the iteration budget is exhausted; "continue" otherwise.
    """
    MAX_ITERATIONS = 5
    MIN_AVERAGE_SCORE = 6.0

    iteration = state["iteration"]
    score = state.get("evaluation_mean", 0.0)

    # Quality target reached -> stop early.
    if score >= MIN_AVERAGE_SCORE:
        logger.info(f"FIN: Mitjana ponderada d'avaluaci贸 assolida ({score:.2f} >= {MIN_AVERAGE_SCORE}).")
        return "end"

    # Iteration budget exhausted -> stop regardless of score.
    if iteration >= MAX_ITERATIONS:
        logger.info(f"FIN: S'ha assolit el m脿xim d'iteracions ({iteration} / {MAX_ITERATIONS}).")
        return "end"

    logger.info(f"CONTINUAR: Iteraci贸 {iteration} / {MAX_ITERATIONS}. Mitjana ponderada actual: {score:.2f} / 7.")
    return "continue"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Seed state for the reflection loop: iteration 0, empty history, and no best
# version recorded yet (best_weighted_mean = -1.0 guarantees the first
# evaluation becomes the best). une_ad_0.srt must exist (setup_files) before
# the graph runs.
initial_state: ReflectionState = {
    "iteration": 0,
    "current_srt_path": str(TEMP_DIR / "une_ad_0.srt"),
    "critic_report": {"reliability_percentage": 0.0, "qualitative_critique": "Inicializando el proceso."},
    "history": [],
    "evaluation_mean": 0.0,
    "best_iteration": -1,
    "best_weighted_mean": -1.0,
    "best_srt_path": str(TEMP_DIR / "une_ad_0.srt"),
    "best_eval_path": str(TEMP_DIR / "eval_0.csv"),
}
|
|
|
|
|
|
|
|
# --- Reflection graph wiring ---------------------------------------------
# narrator -> critic -> (continue -> narrator | end)
workflow = StateGraph(ReflectionState)

workflow.add_node("narrator", narrator_agent)
workflow.add_node("critic", critic_agent)

workflow.set_entry_point("narrator")
workflow.add_edge("narrator", "critic")

# After each critique, should_continue() decides whether to loop back to the
# narrator for another rewrite or to finish.
workflow.add_conditional_edges(
    "critic",
    should_continue,
    {
        "continue": "narrator",
        "end": END
    }
)

app = workflow.compile()
|
|
|
|
|
|
|
|
def run_reflection_pipeline(srt_content: str, context_json: str | None = None) -> str:
    """Run the reflection graph over an SRT and return the best SRT content.

    Steps:
    - Writes ``une_ad_0.srt`` and ``json_ad.json`` into ``TEMP_DIR``.
    - Builds a fresh ReflectionState per call (previously the shared
      module-level ``initial_state`` dict was reused across calls, a
      shared-mutable-state hazard).
    - Executes ``app.invoke(...)``.
    - Copies the best-scoring SRT and eval CSV to the final file names and
      returns the best SRT's content.

    Args:
        srt_content: Initial SRT text to refine.
        context_json: Visual-context JSON; the sample CONTEXT_JSON_CONTENT
            is used when omitted or empty.
    """
    setup_files(srt_content, context_json or CONTEXT_JSON_CONTENT)

    logger.info("--- Comen莽ant el bucle de reflexi贸 (run_reflection_pipeline) ---")

    start_srt = TEMP_DIR / "une_ad_0.srt"
    state: ReflectionState = {
        "iteration": 0,
        "current_srt_path": str(start_srt),
        "critic_report": {"reliability_percentage": 0.0, "qualitative_critique": "Inicializando el proceso."},
        "history": [],
        "evaluation_mean": 0.0,
        "best_iteration": -1,
        "best_weighted_mean": -1.0,
        "best_srt_path": str(start_srt),
        "best_eval_path": str(TEMP_DIR / "eval_0.csv"),
    }

    final_state = app.invoke(state)

    best_srt_path = Path(final_state["best_srt_path"])
    best_eval_path = Path(final_state["best_eval_path"])

    final_srt_path = TEMP_DIR / "une_ad.srt"
    final_eval_path = TEMP_DIR / "eval.csv"

    # Best-effort copy of the final artifacts; the refined SRT is still
    # returned even if the copies fail.
    try:
        shutil.copy(best_srt_path, final_srt_path)
        shutil.copy(best_eval_path, final_eval_path)
    except Exception as exc:
        logger.warning("No s'han pogut copiar els fitxers finals de reflexi贸: %s", exc)

    return best_srt_path.read_text(encoding="utf-8")
|
|
|
|
|
|
|
|
def refine_srt_with_reflection(srt_content: str, *, context_json: str | None = None) -> str:
    """Refine an SRT through the multi-agent reflection pipeline in this module.

    Args:
        srt_content: Full initial SRT content.
        context_json: Optional visual-context JSON; the sample
            `CONTEXT_JSON_CONTENT` is used when omitted.

    Returns:
        The final SRT produced by the multi-agent reflection system.
    """
    effective_context = context_json if context_json else CONTEXT_JSON_CONTENT
    return run_reflection_pipeline(srt_content, effective_context)
|
|
|
|
|
|
|
|
def refine_video_with_reflection(sha1sum: str, version: str) -> str:
    """Refine the AD of a video identified by (sha1sum, version) using the demo DBs.

    Flow:
    1. Read une_ad / info_ad from demo/temp/audiodescriptions.db.
    2. Rebuild TEMP_DIR/casting.csv from demo/data/casting.db.
    3. Rebuild TEMP_DIR/scenarios.csv from demo/data/scenarios.db.
    4. Run the full reflection pipeline (run_reflection_pipeline), which
       itself writes the initial une_ad_0.srt / json_ad.json files -- the
       extra setup_files() call previously made here was redundant and has
       been removed.

    Returns the refined final SRT content.
    """
    une_ad, info_ad = _load_audiodescription_from_db(sha1sum, version)

    # Auxiliary CSVs; each helper is a no-op when its DB has no data.
    _write_casting_csv_from_db(sha1sum)
    _write_scenarios_csv_from_db(sha1sum)

    return run_reflection_pipeline(une_ad, info_ad)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Stand-alone demo: refine the bundled sample SRT with its sample context.
    final_srt = refine_srt_with_reflection(INITIAL_SRT_CONTENT, context_json=CONTEXT_JSON_CONTENT)
    print("\n--- Contingut del SRT Final ---")
    print(final_srt)