Skip to content
This repository has been archived by the owner on Sep 30, 2022. It is now read-only.

Add whitelist option to URL lock #27

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 97 additions & 2 deletions tg_bot/modules/locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
from telegram.error import BadRequest
from telegram.ext import CommandHandler, MessageHandler, Filters
from telegram.ext.dispatcher import run_async
from telegram.utils.helpers import mention_html
from telegram.utils.helpers import mention_html, escape_markdown
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is escape_markdown a used import?


import tg_bot.modules.sql.locks_sql as sql
from tg_bot import dispatcher, SUDO_USERS, LOGGER
from tg_bot.modules.disable import DisableAbleCommandHandler
from tg_bot.modules.helper_funcs.chat_status import can_delete, is_user_admin, user_not_admin, user_admin, \
bot_can_delete, is_bot_admin
from tg_bot.modules.helper_funcs.filters import CustomFilters
from tg_bot.modules.helper_funcs.misc import split_message
from tg_bot.modules.log_channel import loggable
from tg_bot.modules.sql import users_sql

Expand All @@ -26,7 +27,10 @@
'contact': Filters.contact,
'photo': Filters.photo,
'gif': Filters.document & CustomFilters.mime_type("video/mp4"),
'url': Filters.entity(MessageEntity.URL) | Filters.caption_entity(MessageEntity.URL),
'url': Filters.entity(MessageEntity.URL) |
Filters.caption_entity(MessageEntity.URL) |
Filters.entity(MessageEntity.TEXT_LINK) |
Filters.caption_entity(MessageEntity.TEXT_LINK),
'bots': Filters.status_update.new_chat_members,
'forward': Filters.forwarded,
'game': Filters.game
Expand Down Expand Up @@ -94,6 +98,77 @@ def locktypes(bot: Bot, update: Update):
update.effective_message.reply_text("\n - ".join(["Locks: "] + list(LOCK_TYPES) + list(RESTRICTION_TYPES)))


@run_async
@user_admin
@loggable
def add_whitelist(bot: Bot, update: Update):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be @run_async to ensure as few blocks as possible
Also needs to be @loggable to ensure log channels get info -> which means returning a log at the end as well

chat = update.effective_chat # type: Optional[Chat]
user = update.effective_user # type: Optional[User]
message = update.effective_message # type: Optional[Message]
entities = message.parse_entities([MessageEntity.URL])
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inline this variable; it isn't used anywhere other than the for loop.

added = []
for url in entities.values():
if sql.add_whitelist(chat.id, url):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be turned into a list comprehension.
added = [url for url in message.parse_entities([MessageEntity.URL]).values() if sql.add_whitelist(chat.id, url)]

added.append(url)
if added:
message.reply_text("Added to whitelist:\n- "+'\n- '.join(added))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep string characters consistent; use "" for the join

return "<b>{}:</b>" \
"\n#WHITELIST" \
"\n<b>Admin:</b> {}" \
"\nWhitelisted:\n<pre>- {}</pre>".format(html.escape(chat.title),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dont add the - to the code block; the code is there so you can copy paste it just by tapping.

mention_html(user.id, user.first_name),
html.escape('\n- '.join(added)))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since youre doing this join twice, make it a variable

else:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary else; youve returned in the if statement. the following code can be un-indented by one block.

message.reply_text("No URLs were added to the whitelist")
return ""


@run_async
@user_admin
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as before; @run_async and @loggable. applies for all funcs

@loggable
def remove_whitelist(bot: Bot, update: Update):
chat = update.effective_chat # type: Optional[Chat]
user = update.effective_user # type: Optional[User]
message = update.effective_message # type: Optional[Message]
entities = message.parse_entities(MessageEntity.URL)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inline please. Also, parse_entities takes a list of entities; this wont work.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When it's a single filter for the entities, it does work (doesn't have to be a list), that's how I tested the code before submitting. But I'll change it so it conforms to the specs.

removed = []
for url in entities.values():
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List comp

if sql.remove_whitelist(chat.id, url):
removed.append(url)
if removed:
message.reply_text("Removed from whitelist:\n<pre>- {}</pre>".format(html.escape('\n- '.join(removed))),
parse_mode=ParseMode.HTML)
return "<b>{}:</b>" \
"\n#UNWHITELIST" \
"\n<b>Admin:</b> {}" \
"\nRemoved from whitelist:\n<pre>- {}</pre>".format(html.escape(chat.title),
mention_html(user.id, user.first_name),
html.escape('\n- '.join(removed)))
else:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unecessary else; youve returned in the if statement. the following code can be un-indented by one block.

message.reply_text("Could not remove URL from whitelist or URL not found.")
return ""


@run_async
def list_whitelist(bot: Bot, update: Update):
chat = update.effective_chat # type: Optional[Chat]
message = update.effective_message # type: Optional[Message]
all_whitelisted = sql.get_whitelist(chat.id)

if not all_whitelisted:
message.reply_text("No URLs are whitelisted here!")
return

whitelist_string = "Whitelisted URLs:\n- "+'\n- '.join(all_whitelisted)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why arent you using <code>? would make it easier to remove the urls

Copy link
Author

@GaryBeez GaryBeez Jul 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<code> can be copied with a single tap, but Telegram doesn't give you any feedback of what was copied. If you have a list of links, you won't know if you tapped the correct one (fat finger). By using links, copying is almost as easy by tap+holding, but you also get the feedback of which link was tapped and allows easily opening them to double-check if needed.

When removing from the whitelist, I used <pre> to prevent others from clicking and opening the unwanted site (will change to individual <code> blocks), but when whitelisting, I believe leaving the links clickable is better.


for i, part in enumerate(split_message(whitelist_string)):
#only send first part as a reply
if i == 0:
message.reply_text(part, disable_web_page_preview=True)
else:
chat.send_message(part, disable_web_page_preview=True)


@user_admin
@bot_can_delete
@loggable
Expand Down Expand Up @@ -208,6 +283,17 @@ def del_lockables(bot: Bot, update: Update):
chat.kick_member(new_mem.id)
message.reply_text("Only admins are allowed to add bots to this chat! Get outta here.")
else:
#allow whitelisted URLs
if lockable == 'url':
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be in else if block, not in else

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I see bot locks as a very different beast, as it results in kicking a member. All other locks result in deleting a message. So there's the check for the bot lockable, and "all the other ones" (everything inside the else).

Inside that else, URL locks have this special case. If there was a photo whiltelist, I would put it as an elif at the same level as my URL if, and so on. Then at the end, the code to delete the message only needs to appear once, executed if none of the checks executed a continue

entities = set(message.parse_entities([MessageEntity.URL]).values())

# MessageEntity.TEXT_LINK could be added in the filter above, but would return the text, not the
# url. So I must add all entities that have a 'url' field separately
entities = entities | set(entity.url for entity in message.entities if entity.url)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do not use message.entities; its not recommended in the docs. use parse_entities() instead

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason here is explained in the commented text. The parse_entities() method returns the text of the message that contains the link. It's fine for URLs sent directly in the text, but if it's the text link [nice word](www.bad_link.com), it would return "nice word".

All entities of the TEXT_LINK type (and only the entities of that type, AFAIK) have a .url attribute, and it contains the actual URL. So by adding all those URLs to the set, I'm sure I'm including the text link URLs as well.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to reduce this to a single call to set()? are you sure the contents of entities isn't included in your generator here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the contents of a message that has both a normal links and a text link:

'text': 'google.com and bing.com and link'
'entities': [
    {'type': 'url', 'offset': 0, 'length': 10},
    {'type': 'url', 'offset': 15, 'length': 8},
    {'type': 'text_link', 'offset': 28, 'length': 4, 'url': 'http://yahoo.com/'}
]

Entities for a normal URL's don't have a "url" field, so I can't just get the 'url' field of all entities. Meanwhile, text links, when parsed with parse_entities, will only show the text from the message, not the actual link (in this example it would parse to "link" instead of "yahoo.com"), so I can't use the parse method to get text links either. I don't see how the URLs can all be extracted in a single pass...

#if all URLs are any of the whitelisted ones, accept the message
if all( any(regexp.search(text) for regexp in sql.get_whitelist(chat.id).values())
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you handle invalid regex expressions? if any are broken, this will blow up and entirely break whitelisting for that chat. I would personally not allow regex for this, as most users have very limited knowledge of it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users don't get to interact with any of this as regex. All they do is send URLs. Only text that Telegram considers a URL is added (via entities). Also, the URLs themselves are escaped with re.escape before the regex is compiled.

for text in entities):
continue
try:
message.delete()
except BadRequest as excp:
Expand Down Expand Up @@ -288,11 +374,14 @@ def __chat_settings__(chat_id, user_id):

__help__ = """
- /locktypes: a list of possible locktypes
- /whitelisted: lists urls in this chat's whitelist

*Admin only:*
- /lock <type>: lock items of a certain type (not available in private)
- /unlock <type>: unlock items of a certain type (not available in private)
- /locks: the current list of locks in this chat.
- /whitelist <url>: add url to whitelist so it's not deleted by URL lock (accepts multiple)
- /unwhitelist <url>: remove url from whitelist (accepts multiple)

Locks can be used to restrict a group's users.
eg:
Expand All @@ -307,11 +396,17 @@ def __chat_settings__(chat_id, user_id):
LOCK_HANDLER = CommandHandler("lock", lock, pass_args=True, filters=Filters.group)
UNLOCK_HANDLER = CommandHandler("unlock", unlock, pass_args=True, filters=Filters.group)
LOCKED_HANDLER = CommandHandler("locks", list_locks, filters=Filters.group)
WHITELIST_HANDLER = CommandHandler("whitelist", add_whitelist, filters=Filters.group)
UNWHITELIST_HANDLER = CommandHandler("unwhitelist", remove_whitelist, filters=Filters.group)
WHITELISTED_HANDLER = DisableAbleCommandHandler("whitelisted", list_whitelist, filters=Filters.group, admin_ok=True)

dispatcher.add_handler(LOCK_HANDLER)
dispatcher.add_handler(UNLOCK_HANDLER)
dispatcher.add_handler(LOCKTYPES_HANDLER)
dispatcher.add_handler(LOCKED_HANDLER)
dispatcher.add_handler(WHITELIST_HANDLER)
dispatcher.add_handler(UNWHITELIST_HANDLER)
dispatcher.add_handler(WHITELISTED_HANDLER)

dispatcher.add_handler(MessageHandler(Filters.all & Filters.group, del_lockables), PERM_GROUP)
dispatcher.add_handler(MessageHandler(Filters.all & Filters.group, rest_handler), REST_GROUP)
80 changes: 79 additions & 1 deletion tg_bot/modules/sql/locks_sql.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# New chat added -> setup permissions
import threading
import re

from sqlalchemy import Column, String, Boolean
from sqlalchemy import Column, String, Boolean, UnicodeText

from tg_bot.modules.sql import SESSION, BASE

Expand Down Expand Up @@ -61,13 +62,28 @@ def __init__(self, chat_id):
def __repr__(self):
return "<Restrictions for %s>" % self.chat_id

class URLWhitelist(BASE):
__tablename__ = "permissions_urls"
chat_id = Column(String(14), primary_key=True, nullable=False)
url = Column(UnicodeText, primary_key=True, nullable=False)

def __init__(self, chat_id, url):
self.chat_id = str(chat_id) # ensure string
self.url = url

def __repr__(self):
return "<Permission url whitelist for %s>" % self.chat_id

Permissions.__table__.create(checkfirst=True)
Restrictions.__table__.create(checkfirst=True)
URLWhitelist.__table__.create(checkfirst=True)


PERM_LOCK = threading.RLock()
RESTR_LOCK = threading.RLock()
WHITELIST_LOCK = threading.RLock()
CHAT_WHITELIST = {}
URL_REGEXP = re.compile(r'(^http:\/\/|^https:\/\/|^ftp:\/\/|^)(www\.)?(\S*)', flags=re.I)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic should be a part of the module, not the sql



def init_permissions(chat_id, reset=False):
Expand Down Expand Up @@ -216,6 +232,60 @@ def get_restr(chat_id):
SESSION.close()


def get_whitelist(chat_id):
return CHAT_WHITELIST.get(str(chat_id), {})


def add_whitelist(chat_id, url):
url = URL_REGEXP.search(url).group(3).lower()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple chaining will blow up in case of None return from the search, or the group.

if url.endswith('/'):
url = url[:-1]
with WHITELIST_LOCK:
prev = SESSION.query(URLWhitelist).get((str(chat_id), url))
if not prev:
whitelisted = URLWhitelist(str(chat_id), url)
SESSION.add(whitelisted)
SESSION.commit()
chat_whitelist = CHAT_WHITELIST.setdefault(str(chat_id), {})
chat_whitelist.update(
{url: re.compile(r'(^http:\/\/|^https:\/\/|^ftp:\/\/|^)(www\.)?'+re.escape(url)+'($|\W)',
flags=re.I)})
return True
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

session.close() before returning



def remove_whitelist(chat_id, url):
global CHAT_WHITELIST
with WHITELIST_LOCK:
url = URL_REGEXP.search(url).group(3).lower()
if url.endswith('/'):
url = url[:-1]
CHAT_WHITELIST.get(str(chat_id), {}).pop(url, None)
white = SESSION.query(URLWhitelist).get((str(chat_id), url))
if white:
SESSION.delete(white)
SESSION.commit()
return True

SESSION.close()
return False


def __load_chat_whitelist():
#whitelist for each group is a dict(url: compiled_regexp for url in group)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: pep8 comments have a space after the #

global CHAT_WHITELIST
try:
chats = SESSION.query(URLWhitelist.chat_id).distinct().all()
for (chat_id,) in chats: # remove tuple by ( ,)
CHAT_WHITELIST[str(chat_id)] = {}

all_whites = SESSION.query(URLWhitelist).all()
for row in all_whites:
CHAT_WHITELIST[str(row.chat_id)].update(
{row.url: re.compile(r'(^http:\/\/|^https:\/\/|^ftp:\/\/|^)(www\.)?'+re.escape(row.url)+'($|\W)',
flags=re.I)})
finally:
SESSION.close()

def migrate_chat(old_chat_id, new_chat_id):
with PERM_LOCK:
perms = SESSION.query(Permissions).get(str(old_chat_id))
Expand All @@ -228,3 +298,11 @@ def migrate_chat(old_chat_id, new_chat_id):
if rest:
rest.chat_id = str(new_chat_id)
SESSION.commit()

with WHITELIST_LOCK:
white = SESSION.query(URLWhitelist).filter(URLWhitelist.chat_id == str(old_chat_id)).all()
for row in white:
row.chat_id = str(new_chat_id)
SESSION.commit()

__load_chat_whitelist()