Vargock committed on
Commit
65cfd5e
·
0 Parent(s):

Committing an old Telegram project that uses Hugging Face (HF) AI models to recognize text tone

Browse files
Files changed (5) hide show
  1. .env.example +3 -0
  2. .gitignore +5 -0
  3. README +3 -0
  4. bot.py +232 -0
  5. db.py +108 -0
.env.example ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ # .env.example (this template is committed to the repository)
2
+ TELEGRAM_TOKEN=your_telegram_token_here
3
+ HF_API_TOKEN=your_hf_token_here
.gitignore ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
+ # local models, env files, venv
2
+ .env
3
+ .env.local
4
+ .venv/
5
+ __pycache__/
README ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ This is a simple Telegram bot that uses the Hugging Face Inference API to detect the sentiment of text messages. I don’t provide hosting for it, so to use it you’ll need to create your own Telegram bot token and Hugging Face API token. Then put them into a .env file in the root folder, following the format in the .env.example file.
2
+
3
+ Simply copy .env.example to .env and fill in your keys.
bot.py ADDED
@@ -0,0 +1,232 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import time
3
+ from collections import deque
4
+ import logging
5
+ import requests
6
+ from dotenv import load_dotenv
7
+ from telegram.ext import ApplicationBuilder, MessageHandler, CommandHandler, filters
8
+
9
+ from db import init_db, save_message, get_recent
10
+
11
# Load variables from a local .env file (see .env.example) before reading them below.
load_dotenv()

# Root logging configuration: INFO and above, timestamped, with the level name.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
log = logging.getLogger("sentiment-bot")

# Credentials read from the environment; either is None when the variable is unset.
TOKEN = os.getenv("TELEGRAM_TOKEN")
HF_API_TOKEN = os.getenv("HF_API_TOKEN")

# Models

# Hugging Face model ids, keyed by the task they are used for in this module.
MODELS = {
    "en_sentiment": "cardiffnlp/twitter-roberta-base-sentiment-latest",
    "ru_sentiment": "blanchefort/rubert-base-cased-sentiment",
    "emotion": "j-hartmann/emotion-english-distilroberta-base",
    "toxicity": "unitary/toxic-bert"
}

# Raw model label -> display label, per detected language. Labels missing from
# the mapping are shown upper-cased as returned by the model.
# NOTE(review): the "-latest" English model may emit negative/neutral/positive
# rather than LABEL_0..2, in which case only the fallback is used - confirm.
LABELS = {
    "en": {"LABEL_0": "NEGATIVE", "LABEL_1": "NEUTRAL", "LABEL_2": "POSITIVE"},
    "ru": {"negative": "НЕГАТИВНЫЙ", "neutral": "НЕЙТРАЛЬНЫЙ", "positive": "ПОЗИТИВНЫЙ"}
}

# In-memory, per-process map: user_id -> deque of (sentiment, unix_timestamp).
USER_HISTORY = {}
37
+
38
+ # Helper Functions
39
+
40
def detect_lang(text: str) -> str:
    """Guess whether *text* is Russian or English by counting letters.

    Cyrillic letters are counted against basic Latin letters. ``ё``/``Ё`` are
    checked explicitly because they sit outside the contiguous ``а``-``я`` and
    ``А``-``Я`` code point ranges and would otherwise be ignored.

    Returns:
        ``"ru"`` when Cyrillic letters strictly outnumber Latin letters,
        otherwise ``"en"`` (this includes ties and text without letters).
    """
    cyr = sum(
        'а' <= c <= 'я' or 'А' <= c <= 'Я' or c in 'ёЁ'
        for c in text
    )
    lat = sum('a' <= c <= 'z' or 'A' <= c <= 'Z' for c in text)
    return "ru" if cyr > lat else "en"
44
+
45
+
46
def hf_infer(model: str, text: str):
    """Send *text* to the Hugging Face Inference API for *model*.

    Returns:
        The decoded JSON payload (whatever shape the API produced), or
        ``None`` if the request, the HTTP status check, or JSON decoding
        raised. Failures are logged as warnings and never propagated.
    """
    try:
        endpoint = f"https://api-inference.huggingface.co/models/{model}"
        auth_header = {"Authorization": f"Bearer {HF_API_TOKEN}"}
        response = requests.post(
            endpoint,
            headers=auth_header,
            json={"inputs": text},
            timeout=15,
        )
        response.raise_for_status()
        # The payload is passed through unchanged, list or not; callers are
        # responsible for normalizing its shape.
        return response.json()
    except Exception as e:
        log.warning(f"HF inference failed ({model}): {e}")
        return None
63
+
64
+
65
def update_history(user_id: int, sentiment: str, max_len=10) -> int:
    """Record *sentiment* for *user_id* and count recent positive entries.

    The entry is appended, together with the current time, to the user's
    bounded deque in ``USER_HISTORY``. ``max_len`` only takes effect when the
    deque is first created for that user.

    Returns:
        How many of the user's last five recorded sentiments (including this
        one) are positive, in either the English or the Russian label.
    """
    entries = USER_HISTORY.setdefault(user_id, deque(maxlen=max_len))
    entries.append((sentiment, time.time()))

    positive_labels = ("POSITIVE", "ПОЗИТИВНЫЙ")
    last_five = list(entries)[-5:]
    positives = 0
    for label, _recorded_at in last_five:
        if label in positive_labels:
            positives += 1
    return positives
70
+
71
+
72
def bar(score: float) -> str:
    """Render *score* as a fixed-width, 10-cell text progress bar.

    Args:
        score: Confidence, nominally in ``[0, 1]``. Values outside that range
            are clamped so the bar is always exactly 10 cells wide.

    Returns:
        A string of filled (``█``) cells followed by empty (``░``) cells.
    """
    width = 10
    # Without clamping, a negative or >1 score yields a bar of the wrong length.
    filled = min(width, max(0, int(score * width)))
    return "█" * filled + "░" * (width - filled)
75
+
76
+
77
+ # Core Logic
78
+
79
def _normalize_predictions(data):
    """Flatten a classification payload into a list of usable predictions.

    Accepts ``None`` (failed request), a single dict, a flat list, or a singly
    nested list (``[[{...}, ...]]``). Returns only the dict entries that carry
    both ``"label"`` and ``"score"``; anything else yields an empty list.
    """
    if isinstance(data, dict):
        data = [data]
    elif isinstance(data, list) and len(data) == 1 and isinstance(data[0], list):
        data = data[0]
    if not isinstance(data, list):
        # None or an unexpected payload type: nothing usable.
        return []
    return [
        item for item in data
        if isinstance(item, dict) and "label" in item and "score" in item
    ]


async def _infer_async(model: str, text: str):
    """Run the blocking ``hf_infer`` call in a worker thread."""
    import asyncio

    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, hf_infer, model, text)


async def analyze(update, context):
    """Analyze an incoming text message and reply with the results.

    Steps:
      1. Detect the language and pick the matching sentiment model.
      2. Query the model; reply with an error notice if nothing usable comes back.
      3. Persist the result and update the in-memory per-user history.
      4. For English text, additionally query the emotion and toxicity
         models; a failure of either one simply omits that line.
      5. Send one combined reply.

    Updates that carry no text or no sender are ignored.
    """
    # MessageHandler also dispatches edited messages and channel posts, for
    # which update.message is None; effective_message covers all of them.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None or not message.text:
        return

    text = message.text.strip()
    if not text:
        return

    user_id = user.id
    lang = detect_lang(text)

    # Choose sentiment model
    sentiment_model = MODELS["ru_sentiment"] if lang == "ru" else MODELS["en_sentiment"]
    labels = LABELS[lang]

    # HF API call, kept off the event loop because requests is blocking
    sentiment_data = _normalize_predictions(await _infer_async(sentiment_model, text))
    if not sentiment_data:
        await message.reply_text("⚠️ Couldn't analyze right now, try again.")
        return

    top_raw_label = max(sentiment_data, key=lambda x: x["score"])
    raw_label = top_raw_label["label"]
    # Russian labels are looked up lower-cased; unknown labels fall back to
    # the raw label, upper-cased.
    sentiment = labels.get(raw_label.lower() if lang == "ru" else raw_label, raw_label).upper()
    score = top_raw_label["score"]

    save_message(user_id, text, sentiment, score)
    pos_count = update_history(user_id, sentiment)

    lines = []

    if lang == "en":
        # .get(): an unexpected label skips the headline instead of raising KeyError.
        headline = {
            "POSITIVE": "😊 Looks positive!",
            "NEGATIVE": "😞 Sounds negative.",
            "NEUTRAL": "😐 Neutral tone."
        }.get(sentiment)
        if headline:
            lines.append(headline)
        lines.append(f"Sentiment: {sentiment} ({int(score*100)}%) [{bar(score)}]")
        lines.append(f"Positive in last 5 messages: {pos_count}/5")

        # Emotion
        emo_data = _normalize_predictions(await _infer_async(MODELS["emotion"], text))
        if emo_data:
            best = max(emo_data, key=lambda x: x["score"])
            lines.append(f"Emotion: {best['label'].capitalize()}")

        # Toxicity
        tox_data = _normalize_predictions(await _infer_async(MODELS["toxicity"], text))
        if tox_data:
            t = max(tox_data, key=lambda x: x["score"])
            lines.append(f"Toxicity: {int(t['score']*100)}% ({t['label']})")

    else:
        headline = {
            "ПОЗИТИВНЫЙ": "😄 Отличный настрой!",
            "НЕГАТИВНЫЙ": "😞 Похоже на негатив.",
            "НЕЙТРАЛЬНЫЙ": "😐 Спокойный тон."
        }.get(sentiment)
        if headline:
            lines.append(headline)
        lines.append(f"Тональность: {sentiment} ({int(score*100)}%) [{bar(score)}]")
        lines.append(f"Позитивных за последние 5: {pos_count}/5")

    await message.reply_text("\n".join(lines), parse_mode="HTML")
161
+
162
+
163
+ # Telegram Bot, User Commands
164
+
165
async def start(update, context):
    """Reply to /start with a short greeting and a pointer to /info."""
    # CommandHandler also fires for edited commands, where update.message is
    # None; effective_message resolves to whichever message the update carries.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(
        "👋 Hey! I'm a bot using Hugging Face models.\n"
        "I analyze your messages for sentiment, emotion, and toxicity.\n"
        "Russian is supported but with limited features.\n\nUse /info to learn more."
    )
171
+
172
async def info(update, context):
    """Reply to /info with an HTML-formatted feature list."""
    # Edited commands arrive with update.message set to None, so use the
    # message the update actually carries.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(
        "ℹ️ <b>Features:</b>\n"
        "- Sentiment analysis (EN & RU)\n"
        "- Emotion detection (EN only)\n"
        "- Toxicity detection (EN only)\n"
        "- Tracks last messages per user\n"
        "- Confidence bars & emoji insights\n\n"
        "Use /credits for model sources or /mystats for your stats.",
        parse_mode="HTML"
    )
183
+
184
async def credits(update, context):
    """Reply to /credits with HTML links to the model pages on Hugging Face."""
    # Edited commands arrive with update.message set to None, so use the
    # message the update actually carries.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(
        "ℹ️ <b>Model Credits:</b>\n"
        "- <a href='https://huggingface.co/cardiffnlp/twitter-roberta-base-sentiment-latest'>English Sentiment</a>\n"
        "- <a href='https://huggingface.co/blanchefort/rubert-base-cased-sentiment'>Russian Sentiment</a>\n"
        "- <a href='https://huggingface.co/j-hartmann/emotion-english-distilroberta-base'>Emotion</a>\n"
        "- <a href='https://huggingface.co/unitary/toxic-bert'>Toxicity</a>",
        parse_mode="HTML"
    )
193
+
194
async def mystats(update, _):
    """Reply to /mystats with the sentiment breakdown of the user's stored messages.

    Reads up to the 50 most recent rows for the sender and reports the count
    and floor-rounded percentage of positive, neutral, and negative labels
    (English and Russian labels are counted together). Updates without a
    message or a sender are ignored.
    """
    # Edited commands arrive with update.message set to None.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None:
        return

    history = get_recent(user.id, 50)
    if not history:
        await message.reply_text("No messages analyzed yet.")
        return

    # Rows are (text, sentiment, confidence, timestamp).
    sentiments = [row[1] for row in history]
    total = len(sentiments)
    pos = sum(s in ("POSITIVE", "ПОЗИТИВНЫЙ") for s in sentiments)
    neu = sum(s in ("NEUTRAL", "НЕЙТРАЛЬНЫЙ") for s in sentiments)
    neg = sum(s in ("NEGATIVE", "НЕГАТИВНЫЙ") for s in sentiments)

    await message.reply_text(
        f"Your stats:\n"
        f"Positive: {pos} ({pos*100//total}%)\n"
        f"Neutral: {neu} ({neu*100//total}%)\n"
        f"Negative: {neg} ({neg*100//total}%)"
    )
213
+
214
+
215
+ # Run Bot
216
+
217
def main():
    """Initialize the database, register handlers, and start long polling.

    Raises:
        RuntimeError: if ``TELEGRAM_TOKEN`` is not set, or if ``init_db``
            cannot obtain a database connection.
    """
    # Fail fast with an actionable message instead of an opaque error from
    # the Telegram client, and before the database file is touched.
    if not TOKEN:
        raise RuntimeError(
            "TELEGRAM_TOKEN is not set; copy .env.example to .env and fill in your keys."
        )
    if not HF_API_TOKEN:
        log.warning("HF_API_TOKEN is not set; Hugging Face requests will likely be rejected.")

    init_db()
    app = ApplicationBuilder().token(TOKEN).build()

    app.add_handler(CommandHandler("start", start))
    app.add_handler(CommandHandler("info", info))
    app.add_handler(CommandHandler("credits", credits))
    app.add_handler(CommandHandler("mystats", mystats))
    # Plain text only; commands are routed to the handlers above.
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, analyze))

    log.info("Bot running...")
    app.run_polling()


if __name__ == "__main__":
    main()
db.py ADDED
@@ -0,0 +1,108 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # db.py
2
+ import sqlite3
3
+ import logging
4
+ from typing import Optional, List, Tuple, Dict
5
+
6
logger = logging.getLogger(__name__)
# SQLite database file; a relative path, so it resolves against the working directory.
DB_PATH = "data.db"

# Module-level cached connection: created on first use by get_db_connection()
# and reused by every later call.
_conn: Optional[sqlite3.Connection] = None
11
+
12
+
13
def get_db_connection() -> Optional[sqlite3.Connection]:
    """Return the shared SQLite connection, opening it on first use.

    The connection is opened with ``check_same_thread=False`` and a 20 second
    lock timeout, uses ``sqlite3.Row`` as its row factory, and is cached in
    the module-level ``_conn``.

    Returns:
        The cached connection, or ``None`` if opening it failed (the error is
        logged and nothing is cached, so the next call retries).
    """
    global _conn
    if not _conn:
        try:
            fresh = sqlite3.connect(DB_PATH, check_same_thread=False, timeout=20)
            fresh.row_factory = sqlite3.Row
        except sqlite3.Error as err:
            logger.error("Database connection error: %s", err)
            return None
        _conn = fresh
    return _conn
26
+
27
+
28
def init_db() -> None:
    """Create the ``history`` table and its indexes if they do not exist.

    Also switches the database to WAL journaling. All statements run on the
    shared connection and are committed together.

    Raises:
        RuntimeError: if no database connection could be obtained.
        sqlite3.Error: re-raised after logging if any statement fails.
    """
    conn = get_db_connection()
    if conn is None:
        raise RuntimeError("Could not obtain database connection")

    setup_statements = (
        # Enable WAL for better concurrency
        "PRAGMA journal_mode=WAL",
        # Create table
        """
        CREATE TABLE IF NOT EXISTS history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            text TEXT NOT NULL,
            sentiment TEXT NOT NULL,
            confidence REAL NOT NULL,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        """,
        # Helpful indexes
        "CREATE INDEX IF NOT EXISTS idx_user_id ON history(user_id)",
        "CREATE INDEX IF NOT EXISTS idx_timestamp ON history(timestamp)",
        "CREATE INDEX IF NOT EXISTS idx_user_timestamp ON history(user_id, timestamp)",
    )

    try:
        cursor = conn.cursor()
        for statement in setup_statements:
            cursor.execute(statement)
        conn.commit()
        logger.info("Database initialized")
    except sqlite3.Error as err:
        logger.exception("Database initialization error: %s", err)
        raise
61
+
62
+
63
def save_message(user_id: int, text: str, sentiment: str, confidence: float) -> bool:
    """Insert one analyzed message into the ``history`` table.

    Args:
        user_id: Telegram user id the message belongs to.
        text: The analyzed message text.
        sentiment: Display label produced by the sentiment model.
        confidence: Score of that label.

    Returns:
        ``True`` if the row was committed; ``False`` if no connection is
        available or SQLite reported an error (which is logged).
    """
    conn = get_db_connection()
    if not conn:
        return False

    try:
        # As a context manager the connection commits on success and rolls
        # back if the body raises, so no hand-written rollback is needed.
        with conn:
            conn.execute(
                "INSERT INTO history (user_id, text, sentiment, confidence) VALUES (?, ?, ?, ?)",
                (user_id, text, sentiment, confidence),
            )
    except sqlite3.Error as e:
        logger.exception("Error saving message: %s", e)
        return False

    logger.debug("Saved message for user %s", user_id)
    return True
85
+
86
+
87
def get_recent(user_id: int, limit: int = 10) -> List[Tuple]:
    """Return the most recent stored messages for *user_id*, newest first.

    Args:
        user_id: Telegram user id to look up.
        limit: Maximum number of rows to return.

    Returns:
        A list of ``(text, sentiment, confidence, timestamp)`` tuples, or an
        empty list if no connection is available or the query fails (the
        error is logged).
    """
    conn = get_db_connection()
    if not conn:
        return []

    try:
        # CURRENT_TIMESTAMP has one-second resolution, so rows written within
        # the same second tie on timestamp; the autoincrement id breaks the
        # tie and keeps "newest first" deterministic.
        rows = conn.execute(
            """
            SELECT text, sentiment, confidence, timestamp
            FROM history
            WHERE user_id = ?
            ORDER BY timestamp DESC, id DESC
            LIMIT ?
            """,
            (user_id, limit),
        ).fetchall()
        return [tuple(r) for r in rows]
    except sqlite3.Error as e:
        logger.exception("Error fetching recent messages: %s", e)
        return []