Spaces:
Build error
Build error
| import PIL | |
| import torch | |
| from .modelling_gecko import GeckoForConditionalGeneration | |
| from .processing_gecko import GeckoProcessor | |
| from .conversation import conv_llama_3 as default_conv, conv_templates | |
| import transformers | |
| from typing import List, Tuple, Union | |
| from io import StringIO | |
| import sys | |
| class Capturing(list): | |
| def __enter__(self): | |
| self._stdout = sys.stdout | |
| sys.stdout = self._stringio = StringIO() | |
| return self | |
| def __exit__(self, *args): | |
| self.extend(self._stringio.getvalue().splitlines()) | |
| del self._stringio # free up some memory | |
| sys.stdout = self._stdout | |
| def chat_gecko( | |
| text:str, | |
| images: List[Union[PIL.Image.Image, str]], | |
| model:GeckoForConditionalGeneration, | |
| processor:GeckoProcessor, | |
| max_input_length:int=None, | |
| history:List[dict]=None, | |
| **kwargs) -> Tuple[str, List[dict]]: | |
| if "llama-3" in model.language_model.name_or_path.lower(): | |
| conv = conv_templates['llama_3'] | |
| terminators = [ | |
| processor.tokenizer.eos_token_id, | |
| processor.tokenizer.convert_tokens_to_ids("<|eot_id|>") | |
| ] | |
| else: | |
| conv = default_conv | |
| terminators = None | |
| kwargs["eos_token_id"] = terminators | |
| conv = conv.copy() | |
| conv.messages = [] | |
| if history is not None: | |
| for message in history: | |
| assert message["role"] in conv.roles | |
| conv.append_message(message["role"], message["text"]) | |
| if text: | |
| assert conv.messages[-1][0] == conv.roles[1], "The last message in the history should be the assistant, if the given text is not empty" | |
| conv.append_message(conv.roles[0], text) | |
| conv.append_message(conv.roles[1], "") | |
| history.append({"role": conv.roles[0], "text": text}) | |
| history.append({"role": conv.roles[1], "text": ""}) | |
| else: | |
| if conv.messages[-1][0] == conv.roles[1]: | |
| assert conv.messages[-1][1] == "", "No user message should be provided" | |
| else: | |
| assert conv.messages[-1][0] == conv.roles[0], "The last message in the history should be the user, if the given text is empty" | |
| conv.append_message(conv.roles[0], "") | |
| history.append({"role": conv.roles[0], "text": ""}) | |
| else: | |
| history = [] | |
| history.append({"role": conv.roles[0], "text": text}) | |
| history.append({"role": conv.roles[1], "text": ""}) | |
| conv.append_message(conv.roles[0], text) | |
| conv.append_message(conv.roles[1], "") | |
| assert conv.messages[-1][0] == conv.roles[1] and conv.messages[-1][1] == "", "Format check" | |
| assert history[-1]["role"] == conv.roles[1] and history[-1]["text"] == "", "Format check" | |
| keyword_prompt = conv.generate_keyword_prompt(text.split("\n")[len(images)]) | |
| prompt = conv.get_prompt() | |
| if images: | |
| for i in range(len(images)): | |
| if isinstance(images[i], str): | |
| images[i] = PIL.Image.open(images[i]).convert("RGB") | |
| inputs = processor(images=images, text=prompt, keywords_text=keyword_prompt, return_tensors="pt", truncation=True, max_length=max_input_length) | |
| for k, v in inputs.items(): | |
| if v is not None: | |
| if isinstance(v, torch.Tensor): | |
| inputs[k] = v.to(model.device) | |
| elif isinstance(v, list): | |
| if k == 'coords': | |
| continue | |
| inputs[k] = [x.to(model.device) for x in v] | |
| elif isinstance(v, transformers.tokenization_utils_base.BatchEncoding) or isinstance(v, dict): | |
| for key, value in v.items(): | |
| if value is not None: | |
| if isinstance(value, list): | |
| inputs[k][key] = [x.to(model.device) for x in value] | |
| else: | |
| inputs[k][key] = value.to(model.device) | |
| else: | |
| raise ValueError(f"Invalid input type: {type(v)}") | |
| with torch.inference_mode(): | |
| output_ids = model.generate(**inputs, **kwargs)[0] | |
| # remove the input tokens | |
| generated_ids = output_ids[inputs["input_ids"].shape[-1]:] | |
| generated_text = processor.decode(generated_ids, skip_special_tokens=True) | |
| history[-1]["text"] = generated_text | |
| return generated_text, history | |
| def chat_gecko_stream( | |
| text:str, | |
| images: List[Union[PIL.Image.Image, str]], | |
| model:GeckoForConditionalGeneration, | |
| processor:GeckoProcessor, | |
| max_input_length:int=None, | |
| history:List[dict]=None, | |
| **kwargs) -> Tuple[str, List[dict]]: | |
| if "llama-3" in model.language_model.name_or_path.lower(): | |
| conv = conv_templates['llama_3'] | |
| terminators = [ | |
| processor.tokenizer.eos_token_id, | |
| processor.tokenizer.convert_tokens_to_ids("<|eot_id|>") | |
| ] | |
| else: | |
| conv = default_conv | |
| terminators = None | |
| kwargs["eos_token_id"] = terminators | |
| conv = conv.copy() | |
| conv.messages = [] | |
| if history is not None: | |
| for message in history: | |
| assert message["role"] in conv.roles | |
| conv.append_message(message["role"], message["text"]) | |
| if text: | |
| assert conv.messages[-1][0] == conv.roles[1], "The last message in the history should be the assistant, if the given text is not empty" | |
| conv.append_message(conv.roles[0], text) | |
| conv.append_message(conv.roles[1], "") | |
| history.append({"role": conv.roles[0], "text": text}) | |
| history.append({"role": conv.roles[1], "text": ""}) | |
| else: | |
| if conv.messages[-1][0] == conv.roles[1]: | |
| assert conv.messages[-1][1] == "", "No user message should be provided" | |
| else: | |
| assert conv.messages[-1][0] == conv.roles[0], "The last message in the history should be the user, if the given text is empty" | |
| conv.append_message(conv.roles[0], "") | |
| history.append({"role": conv.roles[0], "text": ""}) | |
| else: | |
| history = [] | |
| history.append({"role": conv.roles[0], "text": text}) | |
| history.append({"role": conv.roles[1], "text": ""}) | |
| conv.append_message(conv.roles[0], text) | |
| conv.append_message(conv.roles[1], "") | |
| assert conv.messages[-1][0] == conv.roles[1] and conv.messages[-1][1] == "", "Format check" | |
| assert history[-1]["role"] == conv.roles[1] and history[-1]["text"] == "", "Format check" | |
| if images: | |
| for i in range(len(images)): | |
| if isinstance(images[i], str): | |
| images[i] = PIL.Image.open(images[i]) | |
| last_prompt = history[-2]['text'].split("?")[0] | |
| last_prompt = last_prompt.replace('<image>', '').strip() if '<image>' in last_prompt else last_prompt.strip() | |
| keyword_prompt = conv.generate_keyword_prompt(last_prompt.replace('<image>', '').strip()) if '<image>' in last_prompt else conv.generate_keyword_prompt(last_prompt.strip()) | |
| else: | |
| keyword_prompt = None | |
| prompt = conv.get_prompt() | |
| inputs = processor(images=images, text=prompt, keywords_text=keyword_prompt, return_tensors="pt", truncation=True, max_length=max_input_length) | |
| for k, v in inputs.items(): | |
| if v is not None: | |
| if isinstance(v, torch.Tensor): | |
| inputs[k] = v.to(model.device) | |
| elif isinstance(v, list): | |
| if k == 'coords': | |
| continue | |
| inputs[k] = [x.to(model.device) for x in v] | |
| elif isinstance(v, transformers.tokenization_utils_base.BatchEncoding) or isinstance(v, dict): | |
| for key, value in v.items(): | |
| if value is not None: | |
| if isinstance(value, list): | |
| inputs[k][key] = [x.to(model.device) for x in value] | |
| else: | |
| inputs[k][key] = value.to(model.device) | |
| else: | |
| raise ValueError(f"Invalid input type: {type(v)}") | |
| from transformers import TextIteratorStreamer | |
| from threading import Thread | |
| streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True) | |
| kwargs["streamer"] = streamer | |
| inputs.update(kwargs) | |
| thread = Thread(target=model.generate, kwargs=inputs) | |
| thread.start() | |
| generator = [] | |
| with Capturing() as print_kw: | |
| for _output in streamer: | |
| history[-1]["text"] += _output | |
| generator.append((history[-1]["text"], history)) | |
| # yield history[-1]["text"], history | |
| return generator, print_kw, inputs | |