-
Notifications
You must be signed in to change notification settings - Fork 5
/
filters.py
127 lines (113 loc) · 4.63 KB
/
filters.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""Filters
Available Commands:
.savefilter
.listfilters
.clearfilter"""
import asyncio
import io
import re

from telethon import events, utils
from telethon.tl import types

from ULTRA.plugins.sql_helper.filter_sql import get_filter, add_filter, remove_filter, get_all_filters, remove_all_filters
# Seconds a just-answered message text stays recorded in
# last_triggered_filters before on_snip removes it again.
DELETE_TIMEOUT = 0

# Kinds of stored filter payload: passed to add_filter() when saving and
# compared against snip.snip_type when replying.
TYPE_TEXT = 0
TYPE_PHOTO = 1
TYPE_DOCUMENT = 2

# Maps chat_id -> list of message texts that on_snip has just replied to;
# on_snip consults it to skip a repeated trigger of the same text.
last_triggered_filters = {}
def _snip_media(snip):
    """Return the Telethon input-media object for a stored filter, or None for a text filter."""
    if snip.snip_type == TYPE_PHOTO:
        media_cls = types.InputPhoto
    elif snip.snip_type == TYPE_DOCUMENT:
        media_cls = types.InputDocument
    else:
        return None
    return media_cls(
        int(snip.media_id),
        int(snip.media_access_hash),
        snip.media_file_reference,
    )


@command(incoming=True)
async def on_snip(event):
    """Reply to a message with every saved filter whose keyword it contains.

    A keyword matches case-insensitively when it is delimited on each side
    by a space, a non-word character, or the start/end of the text.

    Args:
        event: the message event; its ``raw_text`` is matched against the
            filters stored for ``event.chat_id``.

    Returns:
        ``False`` when the same text is currently recorded in
        ``last_triggered_filters`` for this chat (the trigger is skipped),
        otherwise ``None``.
    """
    text = event.raw_text
    chat_id = event.chat_id

    # Avoid spamming: skip a text that is already being answered in this chat.
    if text in last_triggered_filters.get(chat_id, ()):
        return False

    for snip in get_all_filters(chat_id) or ():
        pattern = r"( |^|[^\w])" + re.escape(snip.keyword) + r"( |$|[^\w])"
        if not re.search(pattern, text, flags=re.IGNORECASE):
            continue

        await event.reply(snip.reply, file=_snip_media(snip))

        recent = last_triggered_filters.setdefault(chat_id, [])
        recent.append(text)
        try:
            await asyncio.sleep(DELETE_TIMEOUT)
        finally:
            # Always drop the marker, even if the sleep is cancelled, so the
            # text cannot stay suppressed forever.
            recent.remove(text)
@command(pattern="^.savefilter (.*)")
async def on_snip_save(event):
    """Save the replied-to message as a filter under the given keyword.

    The keyword is the first capture group of the command pattern. The
    replied-to message's text is stored, and when it carries a photo or a
    document the media id, access hash and file reference are stored too.
    If the command is not a reply, the message is edited into a usage hint.
    """
    name = event.pattern_match.group(1)
    replied = await event.get_reply_message()
    if not replied:
        await event.edit("Reply to a message with `savefilter keyword` to save the filter")
        return

    snip = {'type': TYPE_TEXT, 'text': replied.message or ''}
    attachment = replied.media
    if attachment:
        input_media = None
        if isinstance(attachment, types.MessageMediaPhoto):
            input_media = utils.get_input_photo(attachment.photo)
            snip['type'] = TYPE_PHOTO
        elif isinstance(attachment, types.MessageMediaDocument):
            input_media = utils.get_input_document(attachment.document)
            snip['type'] = TYPE_DOCUMENT
        if input_media:
            # Only photo/document media contribute the extra fields; any
            # other media kind is saved as a plain text filter.
            snip.update(
                id=input_media.id,
                hash=input_media.access_hash,
                fr=input_media.file_reference,
            )

    add_filter(
        event.chat_id,
        name,
        snip['text'],
        snip['type'],
        snip.get('id'),
        snip.get('hash'),
        snip.get('fr'),
    )
    await event.edit(f"filter {name} saved successfully. Get it with {name}")
@command(pattern="^.listfilters$")
async def on_snip_list(event):
    """List the keywords of all filters saved for the current chat.

    The command message is edited into the list. When the text is longer
    than 4096 characters (presumably Telegram's message size limit) it is
    sent as a document named ``filters.text`` instead and the command
    message is deleted.
    """
    all_snips = get_all_filters(event.chat_id)
    # Truthiness also covers a helper that returns None rather than an
    # empty list, matching the guard used in on_snip.
    if all_snips:
        out_str = "Available Filters in the Current Chat:\n" + "".join(
            f"👉 {a_snip.keyword} \n" for a_snip in all_snips
        )
    else:
        out_str = "No Filters. Start Saving using `.savefilter`"

    if len(out_str) <= 4096:
        await event.edit(out_str)
        return

    # `io` comes from the file's import block; the original used it
    # without importing it, so this branch raised NameError.
    with io.BytesIO(out_str.encode()) as out_file:
        out_file.name = "filters.text"
        await bot.send_file(
            event.chat_id,
            out_file,
            force_document=True,
            allow_cache=False,
            caption="Available Filters in the Current Chat",
            reply_to=event,
        )
        await event.delete()
@command(pattern="^.clearfilter (.*)")
async def on_snip_delete(event):
    """Remove the filter with the given keyword from the current chat.

    The keyword is the first capture group of the command pattern; the
    command message is then edited into a confirmation.
    """
    name = event.pattern_match.group(1)
    chat_id = event.chat_id
    remove_filter(chat_id, name)
    confirmation = f"filter {name} deleted successfully"
    await event.edit(confirmation)
@command(pattern="^.clearallfilters$")
async def on_all_snip_delete(event):
    """Remove every filter saved for the current chat and confirm by editing the command message."""
    remove_all_filters(event.chat_id)
    # Plain literal: the text has no placeholders, so no f-string is needed.
    await event.edit("filters **in current chat** deleted successfully")