-
Notifications
You must be signed in to change notification settings - Fork 912
Add whitelist option to URL lock #27
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,14 +7,15 @@ | |
from telegram.error import BadRequest | ||
from telegram.ext import CommandHandler, MessageHandler, Filters | ||
from telegram.ext.dispatcher import run_async | ||
from telegram.utils.helpers import mention_html | ||
from telegram.utils.helpers import mention_html, escape_markdown | ||
|
||
import tg_bot.modules.sql.locks_sql as sql | ||
from tg_bot import dispatcher, SUDO_USERS, LOGGER | ||
from tg_bot.modules.disable import DisableAbleCommandHandler | ||
from tg_bot.modules.helper_funcs.chat_status import can_delete, is_user_admin, user_not_admin, user_admin, \ | ||
bot_can_delete, is_bot_admin | ||
from tg_bot.modules.helper_funcs.filters import CustomFilters | ||
from tg_bot.modules.helper_funcs.misc import split_message | ||
from tg_bot.modules.log_channel import loggable | ||
from tg_bot.modules.sql import users_sql | ||
|
||
|
@@ -26,7 +27,10 @@ | |
'contact': Filters.contact, | ||
'photo': Filters.photo, | ||
'gif': Filters.document & CustomFilters.mime_type("video/mp4"), | ||
'url': Filters.entity(MessageEntity.URL) | Filters.caption_entity(MessageEntity.URL), | ||
'url': Filters.entity(MessageEntity.URL) | | ||
Filters.caption_entity(MessageEntity.URL) | | ||
Filters.entity(MessageEntity.TEXT_LINK) | | ||
Filters.caption_entity(MessageEntity.TEXT_LINK), | ||
'bots': Filters.status_update.new_chat_members, | ||
'forward': Filters.forwarded, | ||
'game': Filters.game | ||
|
@@ -94,6 +98,77 @@ def locktypes(bot: Bot, update: Update): | |
update.effective_message.reply_text("\n - ".join(["Locks: "] + list(LOCK_TYPES) + list(RESTRICTION_TYPES))) | ||
|
||
|
||
@run_async | ||
@user_admin | ||
@loggable | ||
def add_whitelist(bot: Bot, update: Update): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be |
||
chat = update.effective_chat # type: Optional[Chat] | ||
user = update.effective_user # type: Optional[User] | ||
message = update.effective_message # type: Optional[Message] | ||
entities = message.parse_entities([MessageEntity.URL]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inline this variable; it isn't used anywhere other than the |
||
added = [] | ||
for url in entities.values(): | ||
if sql.add_whitelist(chat.id, url): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be turned into a list comprehension. |
||
added.append(url) | ||
if added: | ||
message.reply_text("Added to whitelist:\n- "+'\n- '.join(added)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. keep string characters consistent; use |
||
return "<b>{}:</b>" \ | ||
"\n#WHITELIST" \ | ||
"\n<b>Admin:</b> {}" \ | ||
"\nWhitelisted:\n<pre>- {}</pre>".format(html.escape(chat.title), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Dont add the |
||
mention_html(user.id, user.first_name), | ||
html.escape('\n- '.join(added))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since youre doing this join twice, make it a variable |
||
else: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unnecessary else; youve returned in the if statement. the following code can be un-indented by one block. |
||
message.reply_text("No URLs were added to the whitelist") | ||
return "" | ||
|
||
|
||
@run_async | ||
@user_admin | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as before; |
||
@loggable | ||
def remove_whitelist(bot: Bot, update: Update): | ||
chat = update.effective_chat # type: Optional[Chat] | ||
user = update.effective_user # type: Optional[User] | ||
message = update.effective_message # type: Optional[Message] | ||
entities = message.parse_entities(MessageEntity.URL) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. inline please. Also, parse_entities takes a list of entities; this wont work. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When it's a single filter for the entities, it does work (doesn't have to be a list), that's how I tested the code before submitting. But I'll change it so it conforms to the specs. |
||
removed = [] | ||
for url in entities.values(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. List comp |
||
if sql.remove_whitelist(chat.id, url): | ||
removed.append(url) | ||
if removed: | ||
message.reply_text("Removed from whitelist:\n<pre>- {}</pre>".format(html.escape('\n- '.join(removed))), | ||
parse_mode=ParseMode.HTML) | ||
return "<b>{}:</b>" \ | ||
"\n#UNWHITELIST" \ | ||
"\n<b>Admin:</b> {}" \ | ||
"\nRemoved from whitelist:\n<pre>- {}</pre>".format(html.escape(chat.title), | ||
mention_html(user.id, user.first_name), | ||
html.escape('\n- '.join(removed))) | ||
else: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unecessary else; youve returned in the if statement. the following code can be un-indented by one block. |
||
message.reply_text("Could not remove URL from whitelist or URL not found.") | ||
return "" | ||
|
||
|
||
@run_async | ||
def list_whitelist(bot: Bot, update: Update): | ||
chat = update.effective_chat # type: Optional[Chat] | ||
message = update.effective_message # type: Optional[Message] | ||
all_whitelisted = sql.get_whitelist(chat.id) | ||
|
||
if not all_whitelisted: | ||
message.reply_text("No URLs are whitelisted here!") | ||
return | ||
|
||
whitelist_string = "Whitelisted URLs:\n- "+'\n- '.join(all_whitelisted) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why arent you using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When removing from the whitelist, I used |
||
|
||
for i, part in enumerate(split_message(whitelist_string)): | ||
#only send first part as a reply | ||
if i == 0: | ||
message.reply_text(part, disable_web_page_preview=True) | ||
else: | ||
chat.send_message(part, disable_web_page_preview=True) | ||
|
||
|
||
@user_admin | ||
@bot_can_delete | ||
@loggable | ||
|
@@ -208,6 +283,17 @@ def del_lockables(bot: Bot, update: Update): | |
chat.kick_member(new_mem.id) | ||
message.reply_text("Only admins are allowed to add bots to this chat! Get outta here.") | ||
else: | ||
#allow whitelisted URLs | ||
if lockable == 'url': | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should be in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here I see bot locks as a very different beast, as it results in kicking a member. All other locks result in deleting a message. So there's the check for the bot lockable, and "all the other ones" (everything inside the Inside that |
||
entities = set(message.parse_entities([MessageEntity.URL]).values()) | ||
|
||
# MessageEntity.TEXT_LINK could be added in the filter above, but would return the text, not the | ||
# url. So I must add all entities that have a 'url' field separately | ||
entities = entities | set(entity.url for entity in message.entities if entity.url) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do not use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason here is explained in the commented text. The All entities of the TEXT_LINK type (and only the entities of that type, AFAIK) have a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be possible to reduce this to a single call to set()? are you sure the contents of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are the contents of a message that has both a normal links and a text link: 'text': 'google.com and bing.com and link' 'entities': [ {'type': 'url', 'offset': 0, 'length': 10}, {'type': 'url', 'offset': 15, 'length': 8}, {'type': 'text_link', 'offset': 28, 'length': 4, 'url': 'http://yahoo.com/'} ] Entities for a normal URL's don't have a "url" field, so I can't just get the 'url' field of all entities. Meanwhile, text links, when parsed with |
||
#if all URLs are any of the whitelisted ones, accept the message | ||
if all( any(regexp.search(text) for regexp in sql.get_whitelist(chat.id).values()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How do you handle invalid regex expressions? if any are broken, this will blow up and entirely break whitelisting for that chat. I would personally not allow regex for this, as most users have very limited knowledge of it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Users don't get to interact with any of this as regex. All they do is send URLs. Only text that Telegram considers a URL is added (via entities). Also, the URLs themselves are escaped with |
||
for text in entities): | ||
continue | ||
try: | ||
message.delete() | ||
except BadRequest as excp: | ||
|
@@ -288,11 +374,14 @@ def __chat_settings__(chat_id, user_id): | |
|
||
__help__ = """ | ||
- /locktypes: a list of possible locktypes | ||
- /whitelisted: lists urls in this chat's whitelist | ||
|
||
*Admin only:* | ||
- /lock <type>: lock items of a certain type (not available in private) | ||
- /unlock <type>: unlock items of a certain type (not available in private) | ||
- /locks: the current list of locks in this chat. | ||
- /whitelist <url>: add url to whitelist so it's not deleted by URL lock (accepts multiple) | ||
- /unwhitelist <url>: remove url from whitelist (accepts multiple) | ||
|
||
Locks can be used to restrict a group's users. | ||
eg: | ||
|
@@ -307,11 +396,17 @@ def __chat_settings__(chat_id, user_id): | |
LOCK_HANDLER = CommandHandler("lock", lock, pass_args=True, filters=Filters.group) | ||
UNLOCK_HANDLER = CommandHandler("unlock", unlock, pass_args=True, filters=Filters.group) | ||
LOCKED_HANDLER = CommandHandler("locks", list_locks, filters=Filters.group) | ||
WHITELIST_HANDLER = CommandHandler("whitelist", add_whitelist, filters=Filters.group) | ||
UNWHITELIST_HANDLER = CommandHandler("unwhitelist", remove_whitelist, filters=Filters.group) | ||
WHITELISTED_HANDLER = DisableAbleCommandHandler("whitelisted", list_whitelist, filters=Filters.group, admin_ok=True) | ||
|
||
dispatcher.add_handler(LOCK_HANDLER) | ||
dispatcher.add_handler(UNLOCK_HANDLER) | ||
dispatcher.add_handler(LOCKTYPES_HANDLER) | ||
dispatcher.add_handler(LOCKED_HANDLER) | ||
dispatcher.add_handler(WHITELIST_HANDLER) | ||
dispatcher.add_handler(UNWHITELIST_HANDLER) | ||
dispatcher.add_handler(WHITELISTED_HANDLER) | ||
|
||
dispatcher.add_handler(MessageHandler(Filters.all & Filters.group, del_lockables), PERM_GROUP) | ||
dispatcher.add_handler(MessageHandler(Filters.all & Filters.group, rest_handler), REST_GROUP) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,8 @@ | ||
# New chat added -> setup permissions | ||
import threading | ||
import re | ||
|
||
from sqlalchemy import Column, String, Boolean | ||
from sqlalchemy import Column, String, Boolean, UnicodeText | ||
|
||
from tg_bot.modules.sql import SESSION, BASE | ||
|
||
|
@@ -61,13 +62,28 @@ def __init__(self, chat_id): | |
def __repr__(self): | ||
return "<Restrictions for %s>" % self.chat_id | ||
|
||
class URLWhitelist(BASE): | ||
__tablename__ = "permissions_urls" | ||
chat_id = Column(String(14), primary_key=True, nullable=False) | ||
url = Column(UnicodeText, primary_key=True, nullable=False) | ||
|
||
def __init__(self, chat_id, url): | ||
self.chat_id = str(chat_id) # ensure string | ||
self.url = url | ||
|
||
def __repr__(self): | ||
return "<Permission url whitelist for %s>" % self.chat_id | ||
|
||
Permissions.__table__.create(checkfirst=True) | ||
Restrictions.__table__.create(checkfirst=True) | ||
URLWhitelist.__table__.create(checkfirst=True) | ||
|
||
|
||
PERM_LOCK = threading.RLock() | ||
RESTR_LOCK = threading.RLock() | ||
WHITELIST_LOCK = threading.RLock() | ||
CHAT_WHITELIST = {} | ||
URL_REGEXP = re.compile(r'(^http:\/\/|^https:\/\/|^ftp:\/\/|^)(www\.)?(\S*)', flags=re.I) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this logic should be a part of the module, not the sql |
||
|
||
|
||
def init_permissions(chat_id, reset=False): | ||
|
@@ -216,6 +232,60 @@ def get_restr(chat_id): | |
SESSION.close() | ||
|
||
|
||
def get_whitelist(chat_id): | ||
return CHAT_WHITELIST.get(str(chat_id), {}) | ||
|
||
|
||
def add_whitelist(chat_id, url): | ||
url = URL_REGEXP.search(url).group(3).lower() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Multiple chaining will blow up in case of |
||
if url.endswith('/'): | ||
url = url[:-1] | ||
with WHITELIST_LOCK: | ||
prev = SESSION.query(URLWhitelist).get((str(chat_id), url)) | ||
if not prev: | ||
whitelisted = URLWhitelist(str(chat_id), url) | ||
SESSION.add(whitelisted) | ||
SESSION.commit() | ||
chat_whitelist = CHAT_WHITELIST.setdefault(str(chat_id), {}) | ||
chat_whitelist.update( | ||
{url: re.compile(r'(^http:\/\/|^https:\/\/|^ftp:\/\/|^)(www\.)?'+re.escape(url)+'($|\W)', | ||
flags=re.I)}) | ||
return True | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. session.close() before returning |
||
|
||
|
||
def remove_whitelist(chat_id, url): | ||
global CHAT_WHITELIST | ||
with WHITELIST_LOCK: | ||
url = URL_REGEXP.search(url).group(3).lower() | ||
if url.endswith('/'): | ||
url = url[:-1] | ||
CHAT_WHITELIST.get(str(chat_id), {}).pop(url, None) | ||
white = SESSION.query(URLWhitelist).get((str(chat_id), url)) | ||
if white: | ||
SESSION.delete(white) | ||
SESSION.commit() | ||
return True | ||
|
||
SESSION.close() | ||
return False | ||
|
||
|
||
def __load_chat_whitelist(): | ||
#whitelist for each group is a dict(url: compiled_regexp for url in group) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: pep8 comments have a space after the # |
||
global CHAT_WHITELIST | ||
try: | ||
chats = SESSION.query(URLWhitelist.chat_id).distinct().all() | ||
for (chat_id,) in chats: # remove tuple by ( ,) | ||
CHAT_WHITELIST[str(chat_id)] = {} | ||
|
||
all_whites = SESSION.query(URLWhitelist).all() | ||
for row in all_whites: | ||
CHAT_WHITELIST[str(row.chat_id)].update( | ||
{row.url: re.compile(r'(^http:\/\/|^https:\/\/|^ftp:\/\/|^)(www\.)?'+re.escape(row.url)+'($|\W)', | ||
flags=re.I)}) | ||
finally: | ||
SESSION.close() | ||
|
||
def migrate_chat(old_chat_id, new_chat_id): | ||
with PERM_LOCK: | ||
perms = SESSION.query(Permissions).get(str(old_chat_id)) | ||
|
@@ -228,3 +298,11 @@ def migrate_chat(old_chat_id, new_chat_id): | |
if rest: | ||
rest.chat_id = str(new_chat_id) | ||
SESSION.commit() | ||
|
||
with WHITELIST_LOCK: | ||
white = SESSION.query(URLWhitelist).filter(URLWhitelist.chat_id == str(old_chat_id)).all() | ||
for row in white: | ||
row.chat_id = str(new_chat_id) | ||
SESSION.commit() | ||
|
||
__load_chat_whitelist() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is escape_markdown a used import?