CapaMarianna / app.py
amaisto's picture
Update app.py
90a7ac8 verified
import gradio as gr
import random
import pickle
from sentence_transformers import SentenceTransformer, CrossEncoder, util
import os # Importa il modulo os
import json
import datetime
from huggingface_hub import HfApi, HfFolder
DATASET_REPO_ID = "amaisto/Marianna-Conversations"
LOG_DIR = "conversation_logs"
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
hf_api = HfApi()
class MariannaBot:
def __init__(self):
self.data_path_main = "firstDB.pkl"
self.data_path_legends = "legends.pkl"
self.data_path_exp = "secondDB.pkl"
print("Inizializzazione di MariannaBot (senza DB)...") # Debug
self.database = self.load_data_from_pickle(self.data_path_main)
self.database_legends = self.load_data_from_pickle(self.data_path_legends)
self.database_expansion = self.load_data_from_pickle(self.data_path_exp)
self.database = self.database + self.database_legends + self.database_expansion
self.further_dataset = self.load_further_info_as_dataset(self.database)
self.further_dataset = self.further_dataset + self.database
self.db_keys = [el[0] for el in self.database] if isinstance(self.database, list) else []
self.db_keys_legends = [el[0] for el in self.database] if isinstance(self.database, list) else []
self.db_keys_further = [el[0] for el in self.further_dataset] if isinstance(self.further_dataset, list) else []
# print("Chiavi principali caricate:", len(self.db_keys)) # Debug
# print("Chiavi leggende caricate:", len(self.db_keys_legends)) # Debug
self.query_dic = self.load_queries_dataset()
self.query_dic_keys = list(self.query_dic.keys())
# print("dataset query inziali caricato: ", len(self.query_dic))
self.reset_state()
def load_further_info_as_dataset(self,dataset):
nuova_lista = []
for chiave_principale, info in dataset:
nuovo_dizionario = {'intro': info['intro']}
if 'further_info' in info:
for chiave_secondaria in info['further_info']:
nuova_lista.append((f"{chiave_secondaria} ({chiave_principale})", {'intro': info['further_info'][chiave_secondaria]}))
else:
nuova_lista.append((chiave_principale, nuovo_dizionario))
return nuova_lista
def load_queries_dataset(self):
"""Loads queries dataset"""
return {"si, certo, certamente, ok, assolutamente si, sicuro, sisi, continua, prosegui":"si","no, non ho domande, non mi interessa, niente, nulla":"no","non so, scegli tu, fai tu, casuale, lascio a te, decidi tu, pensaci tu, sorprendimi":"non so","stronzo, vaffanculo, ti odio, pezzo di merda, cazzo":"parolacce"}
def load_data_from_pickle(self, file_path):
"""Loads data from a pickle file."""
try:
with open(file_path, 'rb') as f:
data = pickle.load(f)
print(f"Dati caricati da: {file_path}") # Debug
return data
except FileNotFoundError:
print(f"Errore: File non trovato: {file_path}")
return []
except Exception as e:
print(f"Errore durante il caricamento da pickle {file_path}: {e}")
return []
def initialize_encoder(self):
"""
Initialize encoder and cross-encoder model.
"""
try:
# Initialize the encoder model
encoder_model = "nickprock/sentence-bert-base-italian-xxl-uncased"
cross_encoder_model = "nickprock/cross-encoder-italian-bert-stsb"
self.encoder = SentenceTransformer(encoder_model)
self.cross_encoder = CrossEncoder(cross_encoder_model)
# Pre-encode all database keys
self.db_keys_embeddings = self.encoder.encode(self.db_keys, convert_to_tensor=True)
self.db_keys_legends_embeddings = self.encoder.encode(self.db_keys_legends, convert_to_tensor=True)
self.first_query_emb = self.encoder.encode(self.query_dic_keys, convert_to_tensor=True)
self.further_embeddings = self.encoder.encode(self.db_keys_further, convert_to_tensor=True)
print(f"Encoder initialized with {len(self.db_keys)} keys.")
return True
except Exception as e:
print(f"Error initializing encoder: {str(e)}")
return False
def reset_state(self):
self.state = "initial"
self.welcome_sent = False
self.current_further_info_values = []
self.current_index = 0
self.main_k = []
self.is_telling_stories = False
def get_welcome_message(self):
return random.choice(["""Ciao, benvenuto!\n\nSono Marianna, la testa di Napoli, in napoletano 'a capa 'e Napule, una statua ritrovata per caso nel 1594. \nAll'epoca del mio ritrovamento, si pensò che fossi una rappresentazione della sirena Partenope, dalle cui spoglie, leggenda narra, nacque la città di Napoli. In seguito, diversi studiosi riconobbero in me una statua della dea Venere, probabilmente collocata in uno dei tanti templi che si trovavano nella città in epoca tardo-romana, quando ancora si chiamava Neapolis.
\nPosso raccontarti molte storie sulla città di Napoli e mostrarti le sue bellezze. \nC'è qualcosa in particolare che ti interessa?""","""Benvenuto!\nIo mi chiamo Marianna, 'a capa 'e Napule. Sono stata ritrovata casualmente nel 1594 a Napoli. All'epoca si pensava che fossi una rappresentazione della Sirena Partenope, dalle cui spoglie, secondo la leggenda, nacque la città di Napoli. In seguito, gli archeologi mi riconobbero come una statua della dea Venere, collocata, probabilmente, in uno dei templi della città di Neapolis in epoca tardo-romana.\nConosco molte storie e leggende di Napoli e posso illustrarti le sue bellezze. \nCosa ti interessa in particolare?"""])
def get_personalized_welcome_message(self, user_name):
return random.choice([f"""Ciao {user_name}, benvenuto!\n\nSono Marianna, la testa di Napoli, in napoletano 'a capa 'e Napule, una statua ritrovata per caso nel 1594. \nAll'epoca del mio ritrovamento, si pensò che fossi una rappresentazione della sirena Partenope, dalle cui spoglie, leggenda narra, nacque la città di Napoli. In seguito, diversi studiosi riconobbero in me una statua della dea Venere, probabilmente collocata in uno dei tanti templi che si trovavano nella città in epoca tardo-romana, quando ancora si chiamava Neapolis.
\nPosso raccontarti molte storie sulla città di Napoli e mostrarti le sue bellezze. \nC'è qualcosa in particolare che ti interessa?""",f"""Benvenuto, {user_name}!\nIo mi chiamo Marianna, 'a capa 'e Napule. Sono stata ritrovata casualmente nel 1594 a Napoli. All'epoca si pensava che fossi una rappresentazione della Sirena Partenope, dalle cui spoglie, secondo la leggenda, nacque la città di Napoli. In seguito, gli archeologi mi riconobbero come una statua della dea Venere, collocata, probabilmente, in uno dei templi della città di Neapolis in epoca tardo-romana.\nConosco molte storie e leggende di Napoli e posso illustrarti le sue bellezze. \nCosa ti interessa in particolare?"""])
def get_safe_example_keys(self, num_examples=3):
"""Safely get example keys from the loaded data."""
if not self.db_keys:
return []
return random.sample(self.db_keys, min(len(self.db_keys), num_examples))
def story_flow(self):
"""Gestisce la selezione casuale di una storia dai dati delle leggende."""
if not self.database_legends:
return random.choice(["Mi dispiace, al momento non ho leggende da raccontare.","Ti ho già raccontato tutte le leggende di cui sono a conoscenza!","Emmmm... Non mi viene in mente altro al momento, posso parlarti di altro?"])
available_topics = [item[0] for item in self.database_legends if item[0] not in self.main_k]
if not available_topics:
self.main_k = [] # Reset della lista delle storie raccontate
available_topics = [item[0] for item in self.database_legends]
if not available_topics:
return random.choice(["Sembra che tu abbia già ascoltato tutte le storie!","Ti ho già raccontato tutte le leggende di cui sono a conoscenza!","Emmmm... Non mi viene in mente altro al momento, posso parlarti di altro?"])
random_story_tuple = random.choice([item for item in self.database_legends if item[0] in available_topics])
topic = random_story_tuple[0]
content = random_story_tuple[1]['intro'] if 'intro' in random_story_tuple[1] else None
if not content:
return f"Mi dispiace, non ho trovato un'introduzione per la leggenda '{topic}'."
self.main_k.append(topic)
self.state = "follow_up"
self.is_telling_stories = True
return random.choice([f"Ok, lascia che ti racconti de '{topic}'.\n\n{content}\n\nVuoi che ti racconti un'altra storia?",f"Ora ti parlerò di {topic}!\n\n{content}\n\nPosso raccontarti un'altra storia?"])
def get_legend_content(self, key):
"""Helper function to get the content for a legend key."""
# Se self.database_legends è una lista di stringhe (le chiavi),
# potresti aver bisogno di caricare il contenuto da un altro file
# o averlo pre-caricato in un'altra struttura dati.
# Al momento, restituisco None, dovresti implementare la logica
# per recuperare il contenuto effettivo.
return self.database_legends.get(key) if isinstance(self.database_legends, dict) else None
def get_value(self, key,state):
"""Retrieve a value from the loaded main data by key."""
if state=="A":
for k, v in self.database:
if k == key:
return v
else:
for k, v in self.further_dataset:
if k == key:
return v
return None
def deeper_handle_query(self,message,query_embedding,CS_old):
print('Ricerca in profondità')
try:
semantic_hits = util.semantic_search(query_embedding, self.further_embeddings, top_k=3)
semantic_hits = semantic_hits[0]
cross_inp = [(message, self.db_keys_further[hit['corpus_id']]) for hit in semantic_hits]
cross_scores = self.cross_encoder.predict(cross_inp)
cross_scores = cross_scores + CS_old
reranked_hits = sorted(
[{'corpus_id': hit['corpus_id'], 'cross-score': score}
for hit, score in zip(semantic_hits, cross_scores)],
key=lambda x: x['cross-score'], reverse=True
)
for h in reranked_hits:
print(self.db_keys_further[h['corpus_id']],h['cross-score'])
best_hit = reranked_hits[0]
best_title = self.db_keys_further[best_hit['corpus_id']]
best_score = best_hit['cross-score']
return reranked_hits
except Exception as e:
print(e)
def handle_query(self, message):
"""Handle user queries by searching the database"""
try:
# Encode the user query
query_embedding = self.encoder.encode(message, convert_to_tensor=True)
# Perform semantic search on the keys
semantic_hits = util.semantic_search(query_embedding, self.db_keys_embeddings, top_k=3)
semantic_hits = semantic_hits[0]
cross_inp = [(message, self.db_keys[hit['corpus_id']]) for hit in semantic_hits]
cross_scores = self.cross_encoder.predict(cross_inp)
reranked_hits = sorted(
[{'corpus_id': hit['corpus_id'], 'cross-score': score}
for hit, score in zip(semantic_hits, cross_scores)],
key=lambda x: x['cross-score'], reverse=True
)
for h in reranked_hits:
print(self.db_keys[h['corpus_id']],h['cross-score'])
chiavi = self.db_keys
best_hit = reranked_hits[0]
best_title = self.db_keys[best_hit['corpus_id']]
best_score = best_hit['cross-score']
stateAB="A"
if best_score < 0.70:
reranked_hits = self.deeper_handle_query(message,query_embedding,cross_scores)
best_hit = reranked_hits[0]
best_title = self.db_keys_further[best_hit['corpus_id']]
best_score = best_hit['cross-score']
stateAB="B"
# Main treshold = 0.75
similarity_threshold = 0.75
# treshold granularity
if best_score < similarity_threshold:
# low confidence (< 0.35)
if best_score < 0.55:
return (random.choice(["Mi dispiace, non ho informazioni su questo argomento. Puoi chiedermi di altro sulla città di Napoli.",
"Purtroppo non riesco a rammentare questo argomento, la mia memoria non è più quella di un tempo. Chiedimi qualcos'altro su Napoli e le sue bellezze!",
"Mi dispiace tantissimo, ma non riesco a ricordare. Vuoi chiedermi altro sulla città di Napoli?"]),[best_score,best_title])
# medium confidence(0.55 - 0.75)
else:
alternative_hits = [self.db_keys[hit['corpus_id']] for hit in reranked_hits[:2]]
suggestions = " o ".join(alternative_hits)
value = self.get_value(best_title,stateAB)
if value:
partial_info = value.get('short_intro', value.get('intro', '').split('.')[0] + '.')
self.state = "query"
self.is_telling_stories = False
return (random.choice([f"Potrei avere alcune informazioni su {best_title}, ma non sono completamente sicura sia ciò che stai cercando. I miei suggerimenti sono {suggestions}. \n\nCosa ti interessa?",
f"Credo che tu stia parlando de {best_title}, ma per essere sicura di ciò che vuoi sapere, potresti specificare se parli di {suggestions}?",
f"Per assicurarmi di aver capito bene, vuoi che ti parli di {suggestions}?"]),[best_score,best_title])
else:
return (f"Ho trovato qualcosa su {best_title}, ma non sono completamente sicura. Vuoi saperne di più?",[best_score,best_title])
# high confidence (above the threshold)
if best_title is not None:
value = self.get_value(best_title,stateAB)
if value:
key = best_title
self.main_k.append(key)
self.state = "follow_up"
self.is_telling_stories = False
response = value.get('intro', '')
if isinstance(value, dict):
self.current_further_info_values = list(value.get('further_info', {}).values())
else:
self.current_further_info_values = [] # Se il valore non è un dizionario
self.current_index = 0
return (f"{response}\n\nVuoi sapere altro su {self.main_k[-1]}?",["no_data","no_data"])
else:
return (random.choice(["Mi dispiace, non ho informazioni su questo argomento. Puoi chiedermi di altro sulla città di Napoli.",
"Purtroppo non riesco a rammentare altro su questo argomento, la mia memoria non è più quella di un tempo. Chiedimi qualcos'altro su Napoli e le sue bellezze!",
"Mi dispiace tantissimo, ma non riesco a ricordare altro. Vuoi chiedermi altro sulla città di Napoli?"]),[best_score,best_title])
except Exception as e:
print(e)
self.state = "initial"
return (random.choice(["Mi dispiace, c'è stato un errore. Puoi riprovare con un'altra domanda? ",
"Scusami, sto facendo confusione. Puoi farmi un'altra domanda?",
"Mi dispiace, non ho capito. Puoi essere più preciso?"]),[best_score,best_title])
def first_query(self, message):
try:
# Encode the user query
query_embedding = self.encoder.encode(message, convert_to_tensor=True)
# Perform semantic search on the keys
semantic_hits = util.semantic_search(query_embedding, self.first_query_emb, top_k=4)
semantic_hits = semantic_hits[0]
cross_inp = [(message, self.query_dic_keys[hit['corpus_id']]) for hit in semantic_hits]
cross_scores = self.cross_encoder.predict(cross_inp)
reranked_hits = sorted(
[{'corpus_id': hit['corpus_id'], 'cross-score': score}
for hit, score in zip(semantic_hits, cross_scores)],
key=lambda x: x['cross-score'], reverse=True
)
best_hit = reranked_hits[0]
best_title = self.query_dic[self.query_dic_keys[best_hit['corpus_id']]]
best_score = best_hit['cross-score']
print(message,best_title, best_score)
# Main treshold = 0.75
similarity_threshold = 0.35
# treshold granularity
if best_score < similarity_threshold:
if message == "no":
value = "no"
return value
elif message == "si":
value = "si"
return value
else:
value='query'
return value
else:
value = best_title
return value
except Exception as e:
print(e)
self.state = "initial"
return random.choice(["Mi dispiace, c'è stato un errore. Puoi riprovare con un'altra domanda? ",
"Scusami, sto facendo confusione. Puoi farmi un'altra domanda?",
"Mi dispiace, non ho capito. Puoi essere più preciso?"])
def respond(self, message, history, user_name):
response_text=""
is_idk=False
if not message:
return ((random.choice(["Mi dispiace, c'è stato un errore. Puoi riprovare con un'altra domanda? ",
"Scusami, sto facendo confusione. Puoi farmi un'altra domanda?",
"Mi dispiace, non ho capito. Puoi essere più preciso?"]),["no_data","no_data"]), is_idk)
message = message.lower().strip()
if self.state == "initial":
value=self.first_query(message)
print("analizzando il messaggio....")
if value == "si":
self.state = "query"
self.is_telling_stories = False
return ((random.choice([f"Cosa vorresti sapere, {user_name}?","Di cosa posso parlarti?","Cosa ti interessa?",f"Chiedi pure quello che vuoi, {user_name}"]),["no_data","no_data"]), is_idk)
elif value == "no":
self.state = "end"
return ((random.choice([f"Va bene, grazie per aver parlato con me, {user_name}.",f"Ti ringrazio per aver parlato con me, {user_name}. A presto!","Spero di rivederti presto! Ciao!",f"È stato un piacere conversare con te, {user_name}, alla prossima!","Spero di esserti stata di aiuto. A presto!"]),["no_data","no_data"]), is_idk)
elif value == "non so":
return ([self.story_flow()], is_idk)
elif value == "parolacce":
return ((random.choice(["Mi dispiace sentirtelo dire. Per favore, chiedimi qualcosa","Sono veramente mortificata. Vuoi chiedermi altro?","Sono molto triste. Forse un'altra domanda miglirerà le cose!"]),["no_data","no_data"]), is_idk)
elif value == "query":
return (self.handle_query(message), is_idk)
else:
is_idk = True
return ((f"Scusa, {user_name}, non ho capito.",["no_data","no_data"]), is_idk)
elif self.state == "query":
return (self.handle_query(message), is_idk)
elif self.state == "follow_up":
value=self.first_query(message)
if value == "si":
if self.is_telling_stories:
return ([self.story_flow()], is_idk)
elif self.current_further_info_values and self.current_index < len(self.current_further_info_values):
value = self.current_further_info_values[self.current_index]
self.current_index += 1
if self.current_index < len(self.current_further_info_values):
return ((f"{value}\n\nVuoi sapere altro su {self.main_k[-1]},{user_name}?",["no_data","no_data"]),is_idk)
else:
self.state = "initial"
return ((f"{value}\n\nNon ho altre informazioni su {self.main_k[-1]}. Ti interessa qualcos'altro?",["no_data","no_data"]),is_idk)
else:
self.state = "initial"
return ((f"Non ho altre informazioni su {self.main_k[-1]}. Ti interessa qualcos'altro?",["no_data","no_data"]), is_idk)
elif value == "no":
self.state = "initial"
self.is_telling_stories = False
return ((random.choice(["C'è qualcos'altro che ti interessa?","Hai qualche altra domanda?","Vuoi sapere qualcos'altro?"]),["no_data","no_data"]), is_idk)
elif value == "non so":
return ([self.story_flow()], is_idk)
elif value == "parolacce":
return ((random.choice(["Mi dispiace sentirtelo dire. Per favore, chiedimi qualcosa","Sono veramente mortificata. Vuoi chiedermi altro?","Sono molto triste. Forse un'altra domanda miglirerà le cose!"]),["no_data","no_data"]), is_idk)
elif value == "query":
return (self.handle_query(message), is_idk)
else:
return (("Scusa, non ho capito.",["no_data","no_data"]), is_idk)
is_idk = True
return ((random.choice(["Mi dispiace, c'è stato un errore. Puoi riprovare con un'altra domanda? ",
"Scusami, sto facendo confusione. Puoi farmi un'altra domanda?",
"Mi dispiace, non ho capito. Puoi essere più preciso?"]),["no_data","no_data"]), is_idk)
def log_interaction_to_file(log_filename, user_name, question, answer, is_idk,response):
"""Aggiunge un'interazione al file di log JSONL della sessione corrente."""
if len(response)<2:
log_entry = {
"timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"user_name": user_name,
"question": question,
"answer": answer,
"is_idk_response": is_idk,
"Similarity Score":"no_data",
"title":"no_data"
}
else:
log_entry = {
"timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"user_name": user_name,
"question": question,
"answer": answer,
"is_idk_response": is_idk,
"Similarity Score":str(response[0]),
"title":str(response[1][:20])
}
try:
with open(log_filename, "a", encoding="utf-8") as f:
f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
except Exception as e:
print(f"Errore durante la scrittura del log locale: {e}") # Sostituire con un logger reale se preferisci
def upload_log_file_to_hf_dataset(local_log_path, user_name):
"""Carica il file di log della sessione sul Dataset Repository di Hugging Face."""
if not os.path.exists(local_log_path) or os.path.getsize(local_log_path) == 0:
print(f"File di log {local_log_path} non trovato o vuoto. Nessun caricamento.")
return
# Definisci un percorso significativo nel repository del dataset
timestamp_str = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d_%H%M%S")
# Sanitizza il nome utente per il nome del file, se necessario
sanitized_user_name = "".join(c if c.isalnum() else "_" for c in user_name) # Semplice sanitizzazione
path_in_repo = f"conversations/{timestamp_str}_{sanitized_user_name}.jsonl"
try:
hf_api.upload_file(
path_or_fileobj=local_log_path,
path_in_repo=path_in_repo,
repo_id=DATASET_REPO_ID,
repo_type="dataset",
commit_message=f"Aggiunta conversazione per {user_name}"
)
print(f"File di log {local_log_path} caricato su {DATASET_REPO_ID} come {path_in_repo}")
# Opzionale: cancella il file locale dopo il caricamento
# os.remove(local_log_path)
except Exception as e:
print(f"Errore durante il caricamento del log su Hugging Face Hub: {e}")
def main():
bot = MariannaBot()
if not bot.initialize_encoder():
print("Failed to initialize encoder. Exiting.")
return
# --- Funzioni per l'interfaccia Gradio (con logging potenziato) ---
def start_chat_with_name(name_input_value):
if not name_input_value.strip():
return None, None, gr.update(visible=True), gr.update(visible=False), None, ""
user_name = name_input_value.strip()
# Crea un file di log locale univoco per questa sessione
session_timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S%f")
sanitized_user_name = "".join(c if c.isalnum() else "_" for c in user_name)
current_session_log_file = os.path.join(LOG_DIR, f"session_{session_timestamp}_{sanitized_user_name}.jsonl")
personalized_welcome = bot.get_personalized_welcome_message(user_name)
initial_chatbot_value = [[None, personalized_welcome]]
# Log dell'inizio della sessione (opzionale, ma potrebbe essere utile)
# log_interaction_to_file(current_session_log_file, user_name, "SESSION_START", personalized_welcome, False)
return user_name, current_session_log_file, gr.update(visible=False), gr.update(visible=True), initial_chatbot_value, ""
def update_chatbot_interface(message, history, current_user_name, current_session_log_file):
if not current_user_name or not current_session_log_file:
history = history or []
return history + [[None, "Errore: sessione non inizializzata correttamente."]], "", current_session_log_file
if not message.strip():
return history, "", current_session_log_file
response, is_idk = bot.respond(message, history, current_user_name)
if len(response)<2:
scores = ["no_data","no_data"]
response_text = response
else:
scores = response[1]
response_text = response[0]
# Salva l'interazione nel file di log della sessione
log_interaction_to_file(current_session_log_file, current_user_name, message, response_text, is_idk, scores)
new_history = history + [[message, response_text]]
return new_history, "", current_session_log_file # Restituisce il log_file per mantenerlo nello stato
def handle_reset_chat(current_user_name, current_session_log_file):
if current_user_name and current_session_log_file:
print(f"Tentativo di caricamento del file di log: {current_session_log_file} per l'utente {current_user_name}")
upload_log_file_to_hf_dataset(current_session_log_file, current_user_name)
bot.reset_state()
initial_welcome_for_reset = "Chat resettata. Inserisci nuovamente il tuo nome per iniziare." # O un messaggio più generico
return [[None, initial_welcome_for_reset]], "", gr.update(visible=True), gr.update(visible=False), None # Resetta session_log_file_state
# --- Struttura dell'interfaccia Gradio ---
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue")) as demo:
user_name_state = gr.State(None)
session_log_file_state = gr.State(None) # Stato per il percorso del file di log della sessione corrente
with gr.Column(elem_id="name_input_section", visible=True) as name_section:
gr.Markdown("## Benvenuto/a!")
gr.Markdown("Sono Marianna, 'La Testa di Napoli'. Prima di iniziare, dimmi come ti chiami.")
name_input_tb = gr.Textbox(label="Il tuo nome di battesimo:", placeholder="Es. Sofia")
start_chat_btn = gr.Button("Inizia a chattare con Marianna")
with gr.Column(elem_id="chat_interface_section", visible=False) as chat_section:
# ... (come prima: Markdown titolo, Image, Chatbot, Textbox messaggio, Button clear, Examples)
with gr.Row():
gr.Markdown("## Chatta con Marianna - 'La Testa di Napoli'")
with gr.Row():
gr.Image("marianna-102.jpeg", elem_id="marianna-image", width=250) # Assicurati che l'immagine esista
chatbot_display = gr.Chatbot(label="Chat", height=500)
chat_message_input = gr.Textbox(label="Il tuo messaggio:", placeholder="Scrivi il tuo messaggio qui...", container=False)
with gr.Row():
clear_button = gr.Button("Clicca qui per ricominciare e salvare la chat") # Testo modificato
example_keys = bot.get_safe_example_keys()
if example_keys:
gr.Examples(examples=[key for key in example_keys], inputs=chat_message_input, label="Suggerimenti:")
# --- Logica degli Eventi ---
start_chat_btn.click(
fn=start_chat_with_name,
inputs=[name_input_tb],
outputs=[user_name_state, session_log_file_state, name_section, chat_section, chatbot_display, chat_message_input]
)
chat_message_input.submit(
fn=update_chatbot_interface,
inputs=[chat_message_input, chatbot_display, user_name_state, session_log_file_state],
outputs=[chatbot_display, chat_message_input, session_log_file_state] # Assicurati di restituire session_log_file_state
)
clear_button.click(
fn=handle_reset_chat,
inputs=[user_name_state, session_log_file_state],
outputs=[chatbot_display, chat_message_input, name_section, chat_section, session_log_file_state] # Resetta anche session_log_file_state
)
demo.launch(share=True, debug=True)
if __name__ == "__main__":
main()