-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
144 lines (119 loc) · 5.72 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import asyncio
import logging
import sys
from threading import Thread
import telebot
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton, KeyboardButton, ReplyKeyboardMarkup
from core import strings, meme_provider, config, models, scheduler
# Module-level bot client, created with the token from config; the handler
# decorators and send calls in this file all go through this instance.
bot = telebot.TeleBot(config.BOT_TOKEN)
def keyboard():
    """Build the reply keyboard offering the "get meme" button.

    Returns:
        A ``ReplyKeyboardMarkup`` holding one button labelled with
        ``strings.KEYBOARD_GET_MEME``.
    """
    get_meme_button = KeyboardButton(strings.KEYBOARD_GET_MEME)
    # Resize the keyboard to fit its content; do not hide it after one use.
    reply_markup = ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=False)
    reply_markup.add(get_meme_button)
    return reply_markup
def inline_keyboard(payload):
    """Build the like/dislike inline keyboard attached to a meme.

    Args:
        payload: Text appended after the ``meme_like:`` / ``meme_dislike:``
            prefix in each button's callback data.

    Returns:
        An ``InlineKeyboardMarkup`` containing both rating buttons.
    """
    like_button = InlineKeyboardButton("👍", callback_data=f"meme_like:{payload}")
    dislike_button = InlineKeyboardButton("👎", callback_data=f"meme_dislike:{payload}")
    rating_markup = InlineKeyboardMarkup(row_width=2)
    rating_markup.add(like_button, dislike_button)
    return rating_markup
@bot.message_handler(commands=["start"])
def start(message):
    """Handle ``/start``: register an unknown sender, then send the greeting.

    Args:
        message: The incoming Telegram message carrying the command.
    """
    sender_id = message.from_user.id
    already_registered = (
        models.User.select().where(models.User.user_id == sender_id).exists()
    )
    if not already_registered:
        models.User.create(user_id=sender_id)
    # The greeting (with the reply keyboard) is sent on every /start.
    bot.send_message(message.chat.id, strings.START, reply_markup=keyboard())
@bot.message_handler(content_types=["text"])
def handle_text(message):
    """Handle plain text: send a meme for the keyboard button, else a hint.

    Args:
        message: The incoming Telegram text message.
    """
    chat_id = message.chat.id

    # Anything other than the keyboard button text is an unknown command.
    if message.text != strings.KEYBOARD_GET_MEME:
        bot.send_message(chat_id, strings.REPLY_COMMAND_DOES_NOT_EXIST, reply_markup=keyboard())
        return

    sender_id = message.from_user.id
    meme = meme_provider.get_meme_image(sender_id)
    if meme.error:
        bot.send_message(chat_id, strings.REPLY_NO_MEMES_LEFT, reply_markup=keyboard())
        return

    bot.send_chat_action(chat_id, 'upload_photo')
    # The payload is echoed back in the rating buttons' callback data.
    callback_payload = f"{sender_id}:{meme.post_id}:{chat_id}"
    bot.send_photo(chat_id, meme.image, reply_markup=inline_keyboard(callback_payload))
# Upper bound on how many different memes are tried when sending one fails,
# so a persistent failure cannot loop (or recurse) without end.
_MAX_SEND_ATTEMPTS = 5


def _parse_callback_data(data):
    """Split rating callback data into ``(liked, user_id, post_id, chat_id)``.

    The expected layout is ``action:user_id:post_id:chat_id`` where ``action``
    is ``meme_like`` or ``meme_dislike``. Returns ``None`` for anything else,
    including missing (``None``) data. The ids are returned as strings.
    """
    parts = (data or "").split(':')
    if len(parts) < 4 or parts[0] not in ("meme_like", "meme_dislike"):
        return None
    action, tg_user_id, post_id, tg_chat_id = parts[:4]
    return action == "meme_like", tg_user_id, post_id, tg_chat_id


def _assessment_query(db_user, db_post):
    """Return a fresh query for the assessment of ``db_post`` by ``db_user``."""
    return models.Assessment.select().where(models.Assessment.user == db_user).where(
        models.Assessment.post == db_post)


def _record_assessment(call, db_user, db_post, liked):
    """Create or flip the user's rating and answer the callback query.

    Returns ``True`` only when an existing rating was switched to the
    opposite value; ``False`` for a first rating or a repeated identical one.
    """
    if not _assessment_query(db_user, db_post).exists():
        models.Assessment.create(post=db_post, user=db_user, positive=liked)
        bot.answer_callback_query(call.id, '👍' if liked else '👎')
        return False

    db_assessment = _assessment_query(db_user, db_post).get()
    if bool(db_assessment.positive) == liked:
        bot.answer_callback_query(call.id, strings.REPLY_CANNOT_RATE_TWICE)
        return False

    db_assessment.positive = liked
    db_assessment.save()
    bot.answer_callback_query(call.id, strings.REPLY_ASSESSMENT_CHANGED)
    return True


def _send_next_meme(tg_user_id, tg_chat_id):
    """Send the user another meme, trying a different one if sending fails."""
    for attempt in range(1, _MAX_SEND_ATTEMPTS + 1):
        response = meme_provider.get_meme_image(tg_user_id)
        if response.error:
            # The provider reported an error; nothing is sent in that case.
            return
        try:
            bot.send_chat_action(tg_chat_id, 'upload_photo')
            bot.send_photo(tg_chat_id, response.image,
                           reply_markup=inline_keyboard(f"{tg_user_id}:{response.post_id}:{tg_chat_id}"))
        except Exception:
            # Best effort: log the real cause and try again with a new meme.
            logging.warning("Cannot send a meme image (attempt %d of %d).",
                            attempt, _MAX_SEND_ATTEMPTS, exc_info=True)
        else:
            return
    logging.error("Giving up on sending a meme after %d failed attempts.", _MAX_SEND_ATTEMPTS)


@bot.callback_query_handler(func=lambda call: True)
def callback_query(call):
    """Handle a press on a meme's like/dislike button.

    Records (or flips) the rating encoded in ``call.data`` and, unless the
    press merely flipped an existing rating, sends the user another meme.
    Callback data that does not match ``action:user_id:post_id:chat_id`` is
    logged and acknowledged without further action.

    Args:
        call: The incoming Telegram callback query.
    """
    parsed = _parse_callback_data(call.data)
    if parsed is None:
        # This handler receives every callback query, so foreign or malformed
        # data must not reach the rating logic.
        logging.warning("Ignoring callback query with unexpected data: %r", call.data)
        # Acknowledge so the client stops showing its progress indicator.
        bot.answer_callback_query(call.id)
        return

    liked, tg_user_id, post_id, tg_chat_id = parsed
    # NOTE(review): the rating is attributed to the user id embedded in the
    # button payload, not to call.from_user - confirm this is intended.
    db_user = models.User.get(models.User.user_id == tg_user_id)
    db_post = models.Post.get(models.Post.id == post_id)

    changed_reaction = _record_assessment(call, db_user, db_post, liked)
    if not changed_reaction:
        _send_next_meme(tg_user_id, tg_chat_id)
def get_severity_level(level=None):
    """Translate a numeric verbosity setting (0-4) into a ``logging`` level.

    Args:
        level: Verbosity as an int or numeric string, from 0 (DEBUG) to
            4 (FATAL). When omitted, ``config.LOGGING_LEVEL`` is used.

    Returns:
        The matching ``logging`` level constant.

    Raises:
        ValueError: If the value is not an integer in the range 0-4.
    """
    levels = {
        0: logging.DEBUG,
        1: logging.INFO,
        2: logging.WARNING,
        3: logging.ERROR,
        4: logging.FATAL,
    }
    raw_level = config.LOGGING_LEVEL if level is None else level
    try:
        return levels[int(raw_level)]
    except (KeyError, TypeError, ValueError) as e:
        # Report every kind of bad setting (out of range, non-numeric, wrong
        # type) the same way so a misconfiguration is easy to diagnose.
        raise ValueError(
            f"Invalid logging level {raw_level!r}; expected an integer from 0 to 4"
        ) from e
def main():
    """Configure logging, initialise the app, and run the bot and scheduler.

    Telegram polling runs on a daemon thread, while the calling thread runs
    the scheduler's coroutines via ``asyncio.run``.
    """
    log_format = '%(asctime)s %(levelname)-8s %(message)s'
    logging.basicConfig(
        filename='latest.log',
        encoding='utf-8',
        level=get_severity_level(),
        format=log_format,
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    # Also send log records to stdout in addition to the log file.
    console_handler = logging.StreamHandler(sys.stdout)
    logging.getLogger().addHandler(console_handler)

    logging.debug('Warming up.')
    models.initialize()
    meme_provider.initialize()

    logging.debug("Allocating a thread for telegram's API infinite polling.")
    polling_thread = Thread(target=bot.infinity_polling, daemon=True)
    polling_thread.start()
    logging.info('Bot started.')

    asyncio.run(scheduler.run_coroutines())
# Start the bot only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()