File size: 5,820 Bytes
287f01b
 
 
b17b915
287f01b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b17b915
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
287f01b
b17b915
 
 
 
287f01b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from __future__ import annotations
from typing import Any, Dict, List, Optional, Tuple
from pathlib import Path

from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

from vision_tools import (
    keyframe_conditional_extraction_ana,
    keyframe_every_second,
    process_frames,
    FaceOfImageEmbedding,
    generar_montage,
    describe_montage_sequence,   # fallback local
)

from llm_router import load_yaml, LLMRouter

def cluster_ocr_sequential(ocr_list: List[Dict[str, Any]], threshold: float = 0.6) -> List[Dict[str, Any]]:
    """Collapse a chronological OCR frame list into one representative per cluster.

    Consecutive frames whose OCR-text embeddings stay at or above ``threshold``
    cosine similarity belong to the same cluster. Each cluster is represented
    by its last frame, stamped with the cluster's first start time.

    Args:
        ocr_list: Chronological frame dicts carrying "ocr", "image_path",
            "start", "end" and "faces" keys. Frames without a string "ocr"
            value are ignored.
        threshold: Cosine-similarity cutoff; a drop below it starts a new
            cluster.

    Returns:
        One dict per cluster ("ocr", "image_path", "start", "end", "faces"),
        or [] when no frame has OCR text.
    """
    # BUGFIX: the original embedded only the frames that had OCR text but then
    # indexed cluster results back into the *unfiltered* list, misaligning
    # representatives and timestamps whenever any frame lacked OCR. Cluster
    # over the filtered list end-to-end so indices always line up.
    valid = [item for item in ocr_list if item and isinstance(item.get("ocr"), str)]
    if not valid:
        return []

    model = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = model.encode([item["ocr"] for item in valid], normalize_embeddings=True)

    # Sequential clustering: compare each embedding against the current
    # cluster's anchor; a similarity drop closes the cluster at frame i-1.
    clusters_repr = []
    prev_emb = embeddings[0]
    start_time = valid[0]["start"]
    for i, emb in enumerate(embeddings[1:], 1):
        sim = cosine_similarity([prev_emb], [emb])[0][0]
        if sim < threshold:
            clusters_repr.append({"index": i - 1, "start_time": start_time})
            prev_emb = emb
            start_time = valid[i]["start"]
    clusters_repr.append({"index": len(embeddings) - 1, "start_time": start_time})

    # Emit one representative frame per cluster; indices refer to `valid`,
    # so every representative is guaranteed to carry OCR text.
    ocr_final = []
    for cluster in clusters_repr:
        it = valid[cluster["index"]]
        ocr_final.append({
            "ocr": it.get("ocr"),
            "image_path": it.get("image_path"),
            "start": cluster["start_time"],
            "end": it.get("end"),
            "faces": it.get("faces"),
        })
    return ocr_final

def build_keyframes_and_per_second(
    video_path: str,
    out_dir: Path,
    cfg: Dict[str, Any],
    face_collection=None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], float]:
    """Extract keyframes and per-second frames, then merge OCR clusters into them.

    Runs conditional keyframe extraction and per-second sampling on the video,
    processes both frame sets (faces + OCR), clusters the per-second OCR text,
    and substitutes the cluster representatives into each keyframe interval.

    Args:
        video_path: Path of the video to process.
        out_dir: Base output directory ("keyframes" / "frames_per_second"
            subdirectories are created under it).
        cfg: Full configuration dict; the OCR similarity threshold is read
            from cfg["video_processing"]["ocr_clustering"]["similarity_threshold"].
        face_collection: Optional face collection forwarded to process_frames.

    Returns:
        (merged_keyframes, processed_per_second_frames, 0.0).
    """
    keyframes = keyframe_conditional_extraction_ana(
        video_path=video_path, output_dir=str(out_dir / "keyframes")
    )
    per_second = keyframe_every_second(
        video_path=video_path, output_dir=str(out_dir / "frames_per_second")
    )

    embedder = FaceOfImageEmbedding(deepface_model="Facenet512")
    kf_proc = process_frames(frames=keyframes, config=cfg, face_col=face_collection, embedding_model=embedder)
    ps_proc = process_frames(frames=per_second, config=cfg, face_col=face_collection, embedding_model=embedder)

    # Project the per-second frames down to OCR records and collapse runs of
    # near-identical consecutive texts into one representative each.
    ocr_records = [
        {
            "ocr": frame.get("ocr"),
            "image_path": frame.get("image_path"),
            "start": frame.get("start"),
            "end": frame.get("end"),
            "faces": frame.get("faces"),
        }
        for frame in ps_proc
    ]
    clustering_cfg = cfg.get("video_processing", {}).get("ocr_clustering", {})
    ocr_final = cluster_ocr_sequential(
        ocr_records, threshold=float(clustering_cfg.get("similarity_threshold", 0.6))
    )

    # Merge: every OCR representative falling inside a keyframe's [start, end]
    # interval replaces/extends that keyframe; keyframes with no match are
    # kept as-is. IDs are assigned sequentially across the merged list.
    merged: List[Dict[str, Any]] = []
    next_id = 1
    for kf in kf_proc:
        kf_start, kf_end = kf["start"], kf["end"]
        matched = False
        first = True
        for rep in ocr_final:
            if rep["start"] < kf_start or rep["end"] > kf_end:
                continue
            merged.append({
                "id": next_id,
                # The first substitute inherits the keyframe's start time;
                # subsequent ones keep their own cluster start.
                "start": kf["start"] if first else rep["start"],
                "end": None,
                "image_path": rep["image_path"],
                "faces": rep["faces"],
                "ocr": rep.get("ocr"),
                "description": None,
            })
            next_id += 1
            matched = True
            first = False
        if not matched:
            untouched = dict(kf)
            untouched["id"] = next_id
            merged.append(untouched)
            next_id += 1

    return merged, ps_proc, 0.0

def describe_keyframes_with_llm(
    keyframes: List[Dict[str, Any]],
    out_dir: Path,
    face_identities: Optional[set] = None,
    config_path: str | None = None,
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Attach an LLM-generated description to each keyframe in place.

    Builds a montage from the keyframe images, asks the configured vision
    model (via LLMRouter) for per-frame descriptions, and falls back to the
    local montage describer if the router raises.

    Args:
        keyframes: Frame dicts; their "description" field is filled in.
        out_dir: Output directory ("montage" subdirectory is created in it).
        face_identities: Known face identities passed as model context.
        config_path: YAML config path; defaults to "config.yaml".

    Returns:
        The (mutated) keyframes and the montage path, or None when no
        keyframe has an image_path.
    """
    cfg = load_yaml(config_path or "config.yaml")
    descriptor_cfg = (cfg.get("background_descriptor", {}).get("description", {}) or {})
    model_name = descriptor_cfg.get("model", "salamandra-vision")

    frame_paths = [kf.get("image_path") for kf in keyframes if kf.get("image_path")]
    montage_path = None
    if frame_paths:
        montage_path = generar_montage(frame_paths, out_dir / "montage")
        context = {
            "informacion": [
                {key: value for key, value in kf.items() if key in ("start", "end", "ocr", "faces")}
                for kf in keyframes
            ],
            "face_identities": sorted(list(face_identities or set()))
        }
        try:
            descs = LLMRouter(cfg).vision_describe(frame_paths, context=context, model=model_name)
        except Exception:
            # Best-effort fallback: describe the montage locally when the
            # router is unavailable or fails.
            descs = describe_montage_sequence(
                montage_path=str(montage_path),
                n=len(frame_paths),
                informacion=keyframes,
                face_identities=face_identities or set(),
                config_path=config_path or "config.yaml",
            )
        for pos, kf in enumerate(keyframes):
            if pos < len(descs):
                kf["description"] = descs[pos]
    return keyframes, str(montage_path) if montage_path else None