-
Notifications
You must be signed in to change notification settings - Fork 912
Add whitelist option to URL lock #27
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,12 @@ | ||
import html | ||
from typing import Optional, List | ||
|
||
from telegram import Message, Chat, Update, Bot, ParseMode, User, MessageEntity | ||
from telegram import Message, Chat, Update, Bot, ParseMode, User, MessageEntity, MAX_MESSAGE_LENGTH | ||
from telegram import TelegramError | ||
from telegram.error import BadRequest | ||
from telegram.ext import CommandHandler, MessageHandler, Filters | ||
from telegram.ext.dispatcher import run_async | ||
from telegram.utils.helpers import mention_html | ||
from telegram.utils.helpers import mention_html, escape_markdown | ||
|
||
import tg_bot.modules.sql.locks_sql as sql | ||
from tg_bot import dispatcher, SUDO_USERS, LOGGER | ||
|
@@ -25,7 +25,10 @@ | |
'contact': Filters.contact, | ||
'photo': Filters.photo, | ||
'gif': Filters.document & CustomFilters.mime_type("video/mp4"), | ||
'url': Filters.entity(MessageEntity.URL) | Filters.caption_entity(MessageEntity.URL), | ||
'url': Filters.entity(MessageEntity.URL) | | ||
Filters.caption_entity(MessageEntity.URL) | | ||
Filters.entity(MessageEntity.TEXT_LINK) | | ||
Filters.caption_entity(MessageEntity.TEXT_LINK), | ||
'bots': Filters.status_update.new_chat_members, | ||
'forward': Filters.forwarded, | ||
'game': Filters.game | ||
|
@@ -80,6 +83,59 @@ def locktypes(bot: Bot, update: Update): | |
update.effective_message.reply_text("\n - ".join(["Locks: "] + list(LOCK_TYPES) + list(RESTRICTION_TYPES))) | ||
|
||
|
||
@user_admin | ||
def add_whitelist(bot: Bot, update: Update): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be |
||
chat = update.effective_chat # type: Optional[Chat] | ||
message = update.effective_message # type: Optional[Message] | ||
entities = message.parse_entities(MessageEntity.URL) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. parse_entities takes a list |
||
added = [] | ||
for url in entities.values(): | ||
if sql.add_whitelist(chat.id, url): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be turned into a list comprehension. |
||
added.append(url) | ||
if added: | ||
message.reply_text("Added {} to whitelist.".format(', '.join(w for w in added))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unnecessary list comprehension. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree, fixed. |
||
else: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unnecessary else; youve returned in the if statement. the following code can be un-indented by one block. |
||
message.reply_text("No URLs were added to the whitelist") | ||
|
||
|
||
@user_admin | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as before; |
||
def remove_whitelist(bot: Bot, update: Update): | ||
chat = update.effective_chat # type: Optional[Chat] | ||
message = update.effective_message # type: Optional[Message] | ||
entities = message.parse_entities(MessageEntity.URL) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. inline please. Also, parse_entities takes a list of entities; this wont work. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When it's a single filter for the entities, it does work (doesn't have to be a list), that's how I tested the code before submitting. But I'll change it so it conforms to the specs. |
||
removed = [] | ||
for url in entities.values(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. List comp |
||
if sql.remove_whitelist(chat.id, url): | ||
removed.append(url) | ||
if removed: | ||
message.reply_text("Removed `{}` from whitelist.".format('`, `'.join(escape_markdown(w) for w in removed)), | ||
parse_mode=ParseMode.MARKDOWN) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if possible, use HTML formatting to avoid message parsing issues. escape_markdown is more fragile than escape_html. |
||
else: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unecessary else; youve returned in the if statement. the following code can be un-indented by one block. |
||
message.reply_text("Could not remove URL from whitelist or URL not found.") | ||
|
||
def list_white(bot: Bot, update: Update): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. dont use shortenings for func names; |
||
chat = update.effective_chat # type: Optional[Chat] | ||
message = update.effective_message # type: Optional[Message] | ||
all_whitelisted = sql.get_whitelist(chat.id) | ||
|
||
if not all_whitelisted: | ||
message.reply_text("No URLs are whitelisted here!") | ||
return | ||
|
||
BASIC_WHITE_STRING = "Whitelisted URLs:\n" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same; no shortenings |
||
listwhite = BASIC_WHITE_STRING | ||
for url in sorted(all_whitelisted.keys()): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. python3.6 dict keys are sorted already There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AFAIK, iterating though dicts now preserves insertion order, but isn't exactly sorted. So initially I was thinking of always listing the URLs in alphabetical order. But listing them in the order they were added is also fine by me -- as long as the position in the list is predictable, it's ok. |
||
entry = "{}, ".format(url) | ||
if len(entry) + len(listwhite) > MAX_MESSAGE_LENGTH: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There already is a split_message function available for longer messages, see the helpers package |
||
message.reply_text(listwhite) | ||
listwhite = entry | ||
else: | ||
listwhite += entry | ||
|
||
if not listwhite == BASIC_WHITE_STRING: | ||
update.effective_message.reply_text(listwhite) | ||
|
||
|
||
@user_admin | ||
@bot_can_delete | ||
@loggable | ||
|
@@ -199,6 +255,16 @@ def del_lockables(bot: Bot, update: Update): | |
chat.kick_member(new_mem.id) | ||
message.reply_text("Only admins are allowed to add bots to this chat! Get outta here.") | ||
else: | ||
#allow whitelisted URLs | ||
if lockable == 'url': | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should be in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here I see bot locks as a very different beast, as it results in kicking a member. All other locks result in deleting a message. So there's the check for the bot lockable, and "all the other ones" (everything inside the Inside that |
||
entities = set(url for url in message.parse_entities(MessageEntity.URL).values()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. again, parse_entities takes a list. |
||
#MessageEntity.TEXT_LINK could be added in the filter above, but would return the text, not the url, | ||
#so add all entities that have a 'url' field separately | ||
entities = entities | set(entity.url for entity in message.entities if entity.url) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do not use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason here is explained in the commented text. The All entities of the TEXT_LINK type (and only the entities of that type, AFAIK) have a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be possible to reduce this to a single call to set()? are you sure the contents of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are the contents of a message that has both a normal links and a text link: 'text': 'google.com and bing.com and link' 'entities': [ {'type': 'url', 'offset': 0, 'length': 10}, {'type': 'url', 'offset': 15, 'length': 8}, {'type': 'text_link', 'offset': 28, 'length': 4, 'url': 'http://yahoo.com/'} ] Entities for a normal URL's don't have a "url" field, so I can't just get the 'url' field of all entities. Meanwhile, text links, when parsed with |
||
#if all URLs are any of the whitelisted ones, accept the message | ||
if all( any(regexp.search(text) for regexp in sql.get_whitelist(chat.id).values()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How do you handle invalid regex expressions? if any are broken, this will blow up and entirely break whitelisting for that chat. I would personally not allow regex for this, as most users have very limited knowledge of it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Users don't get to interact with any of this as regex. All they do is send URLs. Only text that Telegram considers a URL is added (via entities). Also, the URLs themselves are escaped with |
||
for text in entities): | ||
continue | ||
try: | ||
message.delete() | ||
except BadRequest as excp: | ||
|
@@ -279,11 +345,14 @@ def __chat_settings__(chat_id, user_id): | |
|
||
__help__ = """ | ||
- /locktypes: a list of possible locktypes | ||
- /whitelisted: lists urls in this chat's whitelist | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: indent this properly (one more space!) |
||
|
||
*Admin only:* | ||
- /lock <type>: lock items of a certain type (not available in private) | ||
- /unlock <type>: unlock items of a certain type (not available in private) | ||
- /locks: the current list of locks in this chat. | ||
- /whitelist <url>: add url to whitelist so it's not deleted by URL lock (accepts multiple) | ||
- /unwhitelist <url>: remove url from whitelist (accepts multiple) | ||
|
||
Locks can be used to restrict a group's users. | ||
eg: | ||
|
@@ -298,11 +367,17 @@ def __chat_settings__(chat_id, user_id): | |
LOCK_HANDLER = CommandHandler("lock", lock, pass_args=True, filters=Filters.group) | ||
UNLOCK_HANDLER = CommandHandler("unlock", unlock, pass_args=True, filters=Filters.group) | ||
LOCKED_HANDLER = CommandHandler("locks", list_locks, filters=Filters.group) | ||
WHITELIST_HANDLER = CommandHandler("whitelist", add_whitelist, filters=Filters.group) | ||
UNWHITELIST_HANDLER = CommandHandler("unwhitelist", remove_whitelist, filters=Filters.group) | ||
WHITELISTED_HANDLER = DisableAbleCommandHandler("whitelisted", list_white, filters=Filters.group, admin_ok=True) | ||
|
||
dispatcher.add_handler(LOCK_HANDLER) | ||
dispatcher.add_handler(UNLOCK_HANDLER) | ||
dispatcher.add_handler(LOCKTYPES_HANDLER) | ||
dispatcher.add_handler(LOCKED_HANDLER) | ||
dispatcher.add_handler(WHITELIST_HANDLER) | ||
dispatcher.add_handler(UNWHITELIST_HANDLER) | ||
dispatcher.add_handler(WHITELISTED_HANDLER) | ||
|
||
dispatcher.add_handler(MessageHandler(Filters.all & Filters.group, del_lockables), PERM_GROUP) | ||
dispatcher.add_handler(MessageHandler(Filters.all & Filters.group, rest_handler), REST_GROUP) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,8 @@ | ||
# New chat added -> setup permissions | ||
import threading | ||
import re | ||
|
||
from sqlalchemy import Column, String, Boolean | ||
from sqlalchemy import Column, String, Boolean, UnicodeText | ||
|
||
from tg_bot.modules.sql import SESSION, BASE | ||
|
||
|
@@ -61,13 +62,27 @@ def __init__(self, chat_id): | |
def __repr__(self): | ||
return "<Restrictions for %s>" % self.chat_id | ||
|
||
class URLWhitelist(BASE): | ||
__tablename__ = "permissions_urls" | ||
chat_id = Column(String(14), primary_key=True, nullable=False) | ||
url = Column(UnicodeText, primary_key=True, nullable=False) | ||
|
||
def __init__(self, chat_id, url): | ||
self.chat_id = str(chat_id) # ensure string | ||
self.url = url | ||
|
||
def __repr__(self): | ||
return "<Permission url whitelist for %s>" % self.chat_id | ||
|
||
Permissions.__table__.create(checkfirst=True) | ||
Restrictions.__table__.create(checkfirst=True) | ||
URLWhitelist.__table__.create(checkfirst=True) | ||
|
||
|
||
PERM_LOCK = threading.RLock() | ||
RESTR_LOCK = threading.RLock() | ||
WHITELIST_LOCK = threading.RLock() | ||
CHAT_WHITELIST = {} | ||
|
||
|
||
def init_permissions(chat_id, reset=False): | ||
|
@@ -216,6 +231,64 @@ def get_restr(chat_id): | |
SESSION.close() | ||
|
||
|
||
def get_whitelist(chat_id): | ||
return CHAT_WHITELIST.get(str(chat_id), {}) | ||
|
||
|
||
def add_whitelist(chat_id, url): | ||
global CHAT_WHITELIST | ||
with WHITELIST_LOCK: | ||
url = re.search(r'(^http:\/\/|^https:\/\/|^ftp:\/\/|^)(www\.)?(\S*)', url, flags=re.I).group(3).lower() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. given this regex pattern doesnt change, compile it and use it as a global. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm also not convinced this should be here, given it isnt sql logic (and youre wasting CPU cycles given this bit doesnt need the lock yet) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is only called for strings Telegram classified as a URL entity, and group(3) matches with \S* which is "any non-whitespace character zero or more times", so it really never should be None. Will compile and move the pattern out to a global variable. I decided to have all regexp patterns in a single file so it's easy to change them all, if needed. |
||
if url.endswith('/'): | ||
url = url[:-1] | ||
prev = SESSION.query(URLWhitelist).get((str(chat_id), url)) | ||
if not prev: | ||
whitelisted = URLWhitelist(str(chat_id), url) | ||
SESSION.add(whitelisted) | ||
SESSION.commit() | ||
chat_whitelist = CHAT_WHITELIST.setdefault(str(chat_id), {}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldnt this be indented too? since There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is more a case of "double bagging" :-) If for whatever reason the URL was in the DB but not in the dictionary, now it sure is. At least the |
||
chat_whitelist.update( | ||
{url: re.compile(r'(^http:\/\/|^https:\/\/|^ftp:\/\/|^)(www\.)?'+re.escape(url)+'($|\W)', flags=re.I)} | ||
) | ||
return True | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. session.close() before returning |
||
|
||
|
||
def remove_whitelist(chat_id, url): | ||
global CHAT_WHITELIST | ||
with WHITELIST_LOCK: | ||
url = re.search(r'(^http:\/\/|^https:\/\/|^ftp:\/\/|^)(www\.)?(\S*)', url, flags=re.I).group(3).lower() | ||
if url.endswith('/'): | ||
url = url[:-1] | ||
CHAT_WHITELIST.get(str(chat_id), {}).pop(url, None) | ||
white = SESSION.query(URLWhitelist).get((str(chat_id), url)) | ||
if white: | ||
SESSION.delete(white) | ||
SESSION.commit() | ||
return True | ||
|
||
SESSION.close() | ||
return False | ||
|
||
|
||
def __load_chat_whitelist(): | ||
#whitelist for each group is a dict(url: compiled_regexp for url in group) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: pep8 comments have a space after the # |
||
global CHAT_WHITELIST | ||
try: | ||
chats = SESSION.query(URLWhitelist.chat_id).distinct().all() | ||
for (chat_id,) in chats: # remove tuple by ( ,) | ||
CHAT_WHITELIST[str(chat_id)] = {} | ||
|
||
all_whites = SESSION.query(URLWhitelist).all() | ||
for row in all_whites: | ||
CHAT_WHITELIST[str(row.chat_id)].update( | ||
{row.url: re.compile(r'(^http:\/\/|^https:\/\/|^ftp:\/\/|^)(www\.)?'+re.escape(row.url)+'($|\W)', | ||
flags=re.I | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: does this bracket really need its own line? |
||
} | ||
) | ||
finally: | ||
SESSION.close() | ||
|
||
def migrate_chat(old_chat_id, new_chat_id): | ||
with PERM_LOCK: | ||
perms = SESSION.query(Permissions).get(str(old_chat_id)) | ||
|
@@ -228,3 +301,11 @@ def migrate_chat(old_chat_id, new_chat_id): | |
if rest: | ||
rest.chat_id = str(new_chat_id) | ||
SESSION.commit() | ||
|
||
with WHITELIST_LOCK: | ||
white = SESSION.query(URLWhitelist).filter(URLWhitelist.chat_id == str(old_chat_id)).all() | ||
for row in white: | ||
row.chat_id = str(new_chat_id) | ||
SESSION.commit() | ||
|
||
__load_chat_whitelist() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add a newline at EOF for pep8 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is escape_markdown a used import?