-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
206 lines (178 loc) · 7.69 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import json
import requests
import os
import sentry_sdk
from utilobot.helper import make_request, add_document_to_firestore
from sentry_sdk.integrations.gcp import GcpIntegration
from utilobot.bot_functionalities import SongParser, TelegramParseCallbackQueryData, URLShortner
from random import randint
from utilobot.exceptions import BaseFunctionalityException
from sentry_sdk import capture_exception
from google.cloud import firestore
# Error reporting: initialise Sentry with the Google Cloud Functions
# integration. The DSN comes from the SENTRY_DSN environment variable and may
# be absent (os.environ.get returns None in that case).
sentry_sdk.init(
    dsn=os.environ.get('SENTRY_DSN'),
    integrations=[GcpIntegration(timeout_warning=True)],
    traces_sample_rate=1.0,
)

# The bot token is mandatory: a missing BOT_TOKEN raises KeyError at import.
TELE_TOKEN = os.environ['BOT_TOKEN']
# Base endpoint of the Bot API; method names such as "sendMessage" are
# appended to it by the helpers below.
URL = "https://api.telegram.org/bot" + TELE_TOKEN + "/"
def sendChatAction(chat_action, chat_id):
    """Post a ``sendChatAction`` request (e.g. "typing") for a chat.

    Args:
        chat_action: Name of the action; must be one of the values listed in
            ``allowed_actions`` below.
        chat_id: Identifier of the chat the action is shown in.

    Raises:
        ValueError: If ``chat_action`` is not one of the known action names.
    """
    allowed_actions = (
        'typing', 'upload_photo', 'record_video', 'upload_video',
        'record_audio', 'upload_audio', 'upload_document', 'find_location',
        'record_video_note', 'upload_video_note',
    )
    # Validate with an explicit raise rather than ``assert``: assert
    # statements are removed when Python runs with -O, which would let an
    # unknown action through to the API.
    if chat_action not in allowed_actions:
        raise ValueError("Unidentified chat action detected!")

    payload = {'chat_id': chat_id, 'action': chat_action}
    make_request("post", URL + "sendChatAction", None, {'json': payload})
def send_message(text, chat_id, args=None, relative_url=None):
    """Post a message-like request to the Bot API for one chat.

    Args:
        text: Message text. When falsy, no ``text`` field is added to the
            request (used for methods such as ``sendDice``).
        chat_id: Identifier of the target chat.
        args: Optional mapping of extra request fields. It is copied, never
            modified. ``parse_mode`` defaults to ``"HTML"`` unless the
            mapping already provides one.
        relative_url: Bot API method name appended to the base URL; defaults
            to ``"sendMessage"``.
    """
    # Build the payload on a private copy. The previous ``args={}`` default
    # was a single dict shared by every call and was mutated in place, so
    # fields from one message could leak into the next, and dicts passed in
    # by callers were modified behind their back.
    payload = dict(args) if args else {}
    payload['chat_id'] = chat_id
    if text:
        payload['text'] = text
    if "parse_mode" not in payload:
        payload['parse_mode'] = "HTML"

    method = relative_url or "sendMessage"
    make_request("post", URL + method, None, {'json': payload})
def send_inline_keyboard(text, chat_id, reply_markup, data):
    """Send ``text`` to a chat together with a keyboard definition.

    Args:
        text: Message text shown above the keyboard.
        chat_id: Identifier of the target chat.
        reply_markup: Key under which ``data`` is stored inside the request's
            ``reply_markup`` object (callers in this file pass
            ``"inline_keyboard"``).
        data: The keyboard layout stored under that key.
    """
    markup = {reply_markup: data}
    send_message(text, chat_id, {"reply_markup": markup})
def send_dice(chat_id):
    """Ask the Bot API's ``sendDice`` method to roll a dice in the chat."""
    # No text is sent; only the emoji field accompanies the chat id.
    dice_fields = {"emoji": "🎲"}
    send_message(None, chat_id, dice_fields, "sendDice")
def parse_user_text(msg_text, chat_id):
    """Reply to a plain-text message.

    ``msg_text`` is currently not inspected: every message gets the same
    "unknown message" reply.
    """
    reply = "Unknown message! Please try again"
    send_message(reply, chat_id)
def _build_welcome_message(body):
    """Compose the randomised /start greeting for the update in ``body``."""
    # Function-scope import so this block does not depend on the file header.
    from html import escape

    greetings = [
        "Hi ",
        "Hola ",
        "Hey ",
        "Hey there, ",
    ]
    openers = [
        "! Looks like we haven't met before.",
        "! Looks like this is our first meet.",
        "! How are you doing.",
    ]
    closers = [
        " Send /help to know my secrets.",
    ]

    # Only private chats are greeted by name.
    name = None
    message = body.get('message')
    if message and message.get('chat'):
        chat = message['chat']
        if chat.get('type') == "private":
            name = chat.get('first_name')

    greeting = greetings[randint(0, len(greetings) - 1)]
    opener = openers[randint(0, len(openers) - 1)]
    closer = closers[randint(0, len(closers) - 1)]

    if name:
        # send_message defaults to parse_mode "HTML", so a first name
        # containing "<", ">" or "&" must be escaped before interpolation.
        salutation = greeting + escape(name, quote=False)
    else:
        # Drop the trailing separator entirely; slicing off a single
        # character turned "Hey there, " into "Hey there,!".
        salutation = greeting.rstrip(" ,")
    return salutation + opener + closer


def parse_bot_commands(command, chat_id, body):
    """Run the bot command contained in ``command`` and reply to the chat.

    Commands are detected by case-insensitive substring match, checked in
    this order: /song, /joke, /short, /dice, /start, /help. Anything else
    gets an "unknown command" reply.

    Args:
        command: Full text of the incoming message, including any arguments
            that follow the command.
        chat_id: Identifier of the chat to reply to.
        body: The complete update payload; only the /start branch reads it,
            to look up the sender's first name.
    """
    clean_command = command.lower()

    if "/song" in clean_command:
        # The user wants to convert a song URL to other streaming services.
        send_message("Parsing song, please wait !!!", chat_id)
        sendChatAction('typing', chat_id)
        song_details = SongParser(command).convert_song()
        # One single-button row per streaming service.
        service_buttons = [
            [{
                "text": "Open on {}".format(link['service_name']),
                "url": link['link'],
            }]
            for link in song_details["links"]
        ]
        # NOTE(review): the song name is interpolated into an HTML-mode
        # message as-is; confirm whether SongParser already escapes it.
        message_text = "Select your desired music service for {} from below.".format(
            song_details['name'])
        send_inline_keyboard(message_text, chat_id, "inline_keyboard", service_buttons)
    elif "/joke" in clean_command:
        # The user wants to hear a joke; offer the available categories.
        joke_buttons = [
            [{'text': "Chuck Norris nerdy joke", 'callback_data': 'JOKE_CHUCKN_nerdy'}],
            [{'text': "Chuck Norris explicit joke", 'callback_data': 'JOKE_CHUCKN_explicit'}],
        ]
        send_inline_keyboard("Select which joke you want!", chat_id,
                             "inline_keyboard", joke_buttons)
    elif "/short" in clean_command:
        # The user wants to shorten a URL.
        short_url = URLShortner(command).get_short_url()
        send_message(
            "The shortened URL for the provided link is {}".format(short_url),
            chat_id)
    elif "/dice" in clean_command:
        send_dice(chat_id)
    elif "/start" in clean_command:
        # First contact: send a randomised welcome message.
        send_message(_build_welcome_message(body), chat_id)
    elif "/help" in clean_command:
        help_message = (
            "Following are the commands and usages for the bot.\n\n"
            "<strong>1. Convert songs from one streaming service to others</strong>\n"
            "Just use the command /song along with the URL of the song and receive links of the same song on other services.\n"
            "For e.g. /song https://www.youtube.com/watch?v=w2Ov5jzm3j8\n\n"
            "<strong>2. Get random jokes</strong>\n"
            "Send /joke and follow the onscreen instructions to receive the jokes.\n\n"
            "<strong>3. Shorten a URL</strong>\n"
            "Use the command /short and send the URL you wish to shorten and receive a short URL instantly.\n"
            "For e.g. /short https://www.google.com\n\n"
            "<strong>4. Play dice game</strong>\n"
            "Use the command /dice and you will receive a random dice outcome."
        )
        send_message(help_message, chat_id, {"disable_web_page_preview": True})
    else:
        send_message("Unknown command! Please try again", chat_id)
def _dispatch_message(body, chat_id):
    """Handle the ``message`` part of an update (bot commands only)."""
    incoming = body.get('message')
    entities = incoming.get('entities')
    if not entities:
        return

    # Collect every non-empty entity type present in the message.
    entity_types = []
    for entity in entities:
        if entity.get("type"):
            entity_types.append(entity['type'])

    # Entity types are handled in precedence order; only bot commands are
    # supported so far.
    if "bot_command" not in entity_types:
        return
    parse_bot_commands(incoming.get('text'), chat_id, body)


def _dispatch_callback_query(body, chat_id):
    """Handle the ``callback_query`` part of an update (button presses)."""
    payload = body.get('callback_query').get('data')
    if not payload:
        return
    # Callback data is an underscore-separated token list.
    parser = TelegramParseCallbackQueryData(payload.split("_"))
    text, args = parser.parse()
    send_message(text, chat_id, args)


def parse_incoming_request(body, chat_id):
    """Route an incoming update to the handler for each part it contains.

    The ``message`` and ``callback_query`` keys are checked independently, so
    an update carrying both is processed by both handlers. Messages without a
    bot-command entity are ignored: plain text is not routed to
    ``parse_user_text``.

    Args:
        body: The decoded update payload.
        chat_id: Identifier of the chat any reply is sent to.
    """
    if 'message' in body:
        _dispatch_message(body, chat_id)
    if 'callback_query' in body:
        _dispatch_callback_query(body, chat_id)
def get_message_data(body):
    """Return the message object carried by an update.

    A top-level ``message`` takes precedence; otherwise the message nested in
    ``callback_query`` is returned. Returns ``None`` when the update has
    neither key.
    """
    if 'message' in body:
        return body['message']
    if 'callback_query' not in body:
        return None
    callback = body['callback_query']
    return callback['message']
def get_chat_id_from_body(body):
    """Return the id of the chat that the update's message belongs to."""
    return get_message_data(body)['chat']['id']
def lambda_handler(request):
    """HTTP entry point: process one webhook update and acknowledge it.

    The sender's chat details are stored in the ``telegramusers`` Firestore
    collection, then the update is dispatched. Errors raised while handling
    the update are turned into a reply to the user instead of an HTTP error.

    Args:
        request: Incoming HTTP request object exposing ``get_json()``.

    Returns:
        ``{'statusCode': 200}`` in every handled case.
    """
    body = request.get_json()
    print(body)

    # Updates with no usable message/chat (get_message_data returns None when
    # neither 'message' nor 'callback_query' is present) used to raise here,
    # outside any handler, so the webhook answered with an error. There is no
    # chat to reply to, so acknowledge and stop.
    try:
        chat_data = get_message_data(body)['chat']
        chat_id = chat_data['id']
    except (KeyError, TypeError):
        return {
            'statusCode': 200
        }

    firestore_data = {
        "chat_id": str(chat_id),
        "first_name": chat_data.get("first_name", " "),
        "last_name": chat_data.get("last_name", " "),
        "username": chat_data.get("username", " "),
        "type": chat_data.get("type", " "),
    }
    add_document_to_firestore("telegramusers", str(chat_id), firestore_data)

    try:
        parse_incoming_request(body, chat_id)
    except BaseFunctionalityException as e:
        # Expected, user-facing failure: relay the prepared message.
        send_message(e.return_msg, chat_id)
    except Exception as e:
        # Unexpected failure: report it rather than swallowing it silently,
        # then tell the user something went wrong.
        capture_exception(e)
        send_message("Some internal error occurred! Please try again later.", chat_id)

    return {
        'statusCode': 200
    }