diff --git a/bot/core/config_manager.py b/bot/core/config_manager.py index a8855efa8..294e0e64b 100644 --- a/bot/core/config_manager.py +++ b/bot/core/config_manager.py @@ -11,6 +11,7 @@ class Config: DATABASE_URL = "" DEFAULT_UPLOAD = "rc" DOWNLOAD_DIR = "/usr/src/app/downloads/" + DELETE_LINKS = False EQUAL_SPLITS = False EXTENSION_FILTER = "" FFMPEG_CMDS = [] @@ -24,6 +25,8 @@ class Config: LEECH_SPLIT_SIZE = 2097152000 MEDIA_GROUP = False MIXED_LEECH = False + MEGA_EMAIL = "" + MEGA_PASSWORD = "" NAME_SUBSTITUTE = "" OWNER_ID = 0 QUEUE_ALL = 0 diff --git a/bot/helper/listeners/mega_listener.py b/bot/helper/listeners/mega_listener.py index e6dd3d9b1..4e79032ac 100644 --- a/bot/helper/listeners/mega_listener.py +++ b/bot/helper/listeners/mega_listener.py @@ -10,38 +10,22 @@ class AsyncExecutor: def __init__(self): self.continue_event = Event() - def do( - self, - function, - args, - ): + def do(self, function, args): self.continue_event.clear() function(*args) self.continue_event.wait() -async def mega_login( - executor, - api, - email, - password, -): +async def mega_login(executor, api, email, password): if email and password: await sync_to_async( executor.do, api.login, - ( - email, - password, - ), + (email, password), ) -async def mega_logout( - executor, - api, - folder_api=None, -): +async def mega_logout(executor, api, folder_api=None): await sync_to_async( executor.do, api.logout, @@ -62,6 +46,7 @@ class MegaAppListener(MegaListener): ) def __init__(self, continue_event: Event, listener): + super().__init__() self.continue_event = continue_event self.node = None self.public_node = None @@ -71,7 +56,6 @@ def __init__(self, continue_event: Event, listener): self._bytes_transferred = 0 self._speed = 0 self._name = "" - super().__init__() @property def speed(self): @@ -81,12 +65,7 @@ def speed(self): def downloaded_bytes(self): return self._bytes_transferred - def onRequestFinish( # noqa: N802 - self, - api, - request, - error, - ): + def onRequestFinish(self, api, request, error): # noqa: N802 if str(error).lower() != "no error": self.error = error.copy() if str(self.error).casefold() != "not found": @@ -112,12 +91,7 @@ def onRequestFinish( # noqa: N802 ): self.continue_event.set() - def onRequestTemporaryError( # noqa: N802 - self, - _, - __, - error: MegaError, - ): + def onRequestTemporaryError(self, _, __, error: MegaError): # noqa: N802 LOGGER.error(f"Mega Request error in {error}") if not self.is_cancelled: self.is_cancelled = True @@ -128,27 +102,15 @@ def onRequestTemporaryError( # noqa: N802 self.error = error.toString() self.continue_event.set() - def onTransferUpdate( # noqa: N802 - self, - api: MegaApi, - transfer: MegaTransfer, - ): + def onTransferUpdate(self, api: MegaApi, transfer: MegaTransfer): # noqa: N802 if self.is_cancelled: - api.cancelTransfer( - transfer, - None, - ) + api.cancelTransfer(transfer, None) self.continue_event.set() return self._speed = transfer.getSpeed() self._bytes_transferred = transfer.getTransferredBytes() - def onTransferFinish( # noqa: N802 - self, - _: MegaApi, - transfer: MegaTransfer, - __, - ): + def onTransferFinish(self, _: MegaApi, transfer: MegaTransfer, __): # noqa: N802 try: if self.is_cancelled: self.continue_event.set() @@ -160,27 +122,17 @@ def onTransferFinish( # noqa: N802 except Exception as e: LOGGER.error(e) - def onTransferTemporaryError( # noqa: N802 - self, - _, - transfer, - error, - ): + def onTransferTemporaryError(self, _, transfer, error): # noqa: N802 LOGGER.error( f"Mega download error in file {transfer.getFileName()}: {error}", ) - if transfer.getState() in [ - 1, - 4, - ]: + if transfer.getState() in [1, 4]: return - self.error = ( - f"TransferTempError: {error.toString()} ({transfer.getFileName()})" - ) + self.error = f"TransferTempError: {error.toString()} ({transfer.getFileName()})" if not self.is_cancelled: self.is_cancelled = True self.continue_event.set() async def cancel_task(self): self.is_cancelled = True - await self.listener.on_download_error("Download Canceled by user") + await self.listener.on_download_error("Download Canceled by user") \ No newline at end of file diff --git a/bot/helper/mirror_leech_utils/download_utils/mega_download.py b/bot/helper/mirror_leech_utils/download_utils/mega_download.py index a49ca3646..5a7df2d41 100644 --- a/bot/helper/mirror_leech_utils/download_utils/mega_download.py +++ b/bot/helper/mirror_leech_utils/download_utils/mega_download.py @@ -1,16 +1,15 @@ from secrets import token_hex - from aiofiles.os import makedirs from mega import MegaApi from bot import ( LOGGER, - config_dict, non_queued_dl, queue_dict_lock, task_dict, task_dict_lock, ) +from bot.core.config_manager import Config from bot.helper.ext_utils.bot_utils import sync_to_async from bot.helper.ext_utils.links_utils import get_mega_link_type from bot.helper.ext_utils.task_manager import ( @@ -34,142 +33,57 @@ async def add_mega_download(listener, path): - MAIL = config_dict["MEGA_EMAIL"] - PASS = config_dict["MEGA_PASSWORD"] + email = Config.MEGA_EMAIL + password = Config.MEGA_PASSWORD executor = AsyncExecutor() - api = MegaApi( - None, - None, - None, - "Aeon", - ) + api = MegaApi(None, None, None, "Aeon") folder_api = None - mega_listener = MegaAppListener( - executor.continue_event, - listener, - ) + mega_listener = MegaAppListener(executor.continue_event, listener) api.addListener(mega_listener) - await mega_login( - executor, - api, - MAIL, - PASS, - ) + await mega_login(executor, api, email, password) if get_mega_link_type(listener.link) == "file": - await sync_to_async( - executor.do, - api.getPublicNode, - (listener.link,), - ) + await sync_to_async(executor.do, api.getPublicNode, (listener.link,)) node = mega_listener.public_node else: - folder_api = MegaApi( - None, - None, - None, - "Aeon", - ) + folder_api = MegaApi(None, None, None, "Aeon") folder_api.addListener(mega_listener) - await sync_to_async( - executor.do, - folder_api.loginToFolder, - (listener.link,), - ) - node = await sync_to_async( - folder_api.authorizeNode, - mega_listener.node, - ) + await sync_to_async(executor.do, folder_api.loginToFolder, (listener.link,)) + node = await sync_to_async(folder_api.authorizeNode, mega_listener.node) if mega_listener.error: - mmsg = await send_message( - listener.message, - str(mega_listener.error), - ) - await mega_logout( - executor, - api, - folder_api, - ) + mmsg = await send_message(listener.message, str(mega_listener.error)) + await mega_logout(executor, api, folder_api) await delete_links(listener.message) - await auto_delete_message( - listener.message, - mmsg, - ) + await auto_delete_message(listener.message, mmsg) return - listener.name = ( - listener.name or node.getName() # type: ignore - ) - ( - msg, - button, - ) = await stop_duplicate_check(listener) + listener.name = listener.name or node.getName() + msg, button = await stop_duplicate_check(listener) if msg: - mmsg = await send_message( - listener.message, - msg, - button, - ) - await mega_logout( - executor, - api, - folder_api, - ) + mmsg = await send_message(listener.message, msg, button) + await mega_logout(executor, api, folder_api) await delete_links(listener.message) - await auto_delete_message( - listener.message, - mmsg, - ) + await auto_delete_message(listener.message, mmsg) return gid = token_hex(4) listener.size = api.getSize(node) - """if limit_exceeded := await limit_checker( - listener, - is_mega=True - ): - mmsg = await send_message( - listener.message, - limit_exceeded - ) - await mega_logout( - executor, - api, - folder_api - ) - await delete_links(listener.message) - await auto_delete_message( - listener.message, - mmsg - ) - return""" - ( - added_to_queue, - event, - ) = await check_running_tasks(listener) + added_to_queue, event = await check_running_tasks(listener) if added_to_queue: LOGGER.info(f"Added to Queue/Download: {listener.name}") async with task_dict_lock: - task_dict[listener.mid] = QueueStatus( - listener, - gid, - "Dl", - ) + task_dict[listener.mid] = QueueStatus(listener, gid, "Dl") await listener.on_download_start() await send_status_message(listener.message) - await event.wait() # type: ignore + await event.wait() async with task_dict_lock: if listener.mid not in task_dict: - await mega_logout( - executor, - api, - folder_api, - ) + await mega_logout(executor, api, folder_api) return from_queue = True LOGGER.info(f"Start Queued Download from Mega: {listener.name}") @@ -177,12 +91,7 @@ async def add_mega_download(listener, path): from_queue = False async with task_dict_lock: - task_dict[listener.mid] = MegaDownloadStatus( - listener, - mega_listener, - gid, - "dl", - ) + task_dict[listener.mid] = MegaDownloadStatus(listener, mega_listener, gid, "dl") async with queue_dict_lock: non_queued_dl.add(listener.mid) @@ -193,24 +102,10 @@ async def add_mega_download(listener, path): await send_status_message(listener.message) LOGGER.info(f"Download from Mega: {listener.name}") - await makedirs( - path, - exist_ok=True, - ) + await makedirs(path, exist_ok=True) await sync_to_async( executor.do, api.startDownload, - ( - node, - path, - listener.name, - None, - False, - None, - ), - ) - await mega_logout( - executor, - api, - folder_api, + (node, path, listener.name, None, False, None), ) + await mega_logout(executor, api, folder_api) \ No newline at end of file diff --git a/bot/helper/mirror_leech_utils/download_utils/telegram_download.py b/bot/helper/mirror_leech_utils/download_utils/telegram_download.py index 8eb30af03..0fca3acba 100644 --- a/bot/helper/mirror_leech_utils/download_utils/telegram_download.py +++ b/bot/helper/mirror_leech_utils/download_utils/telegram_download.py @@ -56,10 +56,7 @@ async def _on_download_start(self, file_id, from_queue): async def _on_download_progress(self, current, _): if self._listener.is_cancelled: - if self.session == "user": - TgClient.user.stop_transmission() - else: - TgClient.bot.stop_transmission() + self.session.stop_transmission() self._processed_bytes = current async def _on_download_error(self, error): @@ -98,7 +95,7 @@ async def _download(self, message, path): async def add_download(self, message, path, session): self.session = session if ( - self.session not in ["user", "bot"] + self.session is None and self._listener.user_transmission and self._listener.is_super_chat ): @@ -107,7 +104,12 @@ async def add_download(self, message, path, session): chat_id=message.chat.id, message_ids=message.id, ) - elif self.session != "user": + elif self.session and self.session != TgClient.bot: + message = await self.session.get_messages( + chat_id=message.chat.id, + message_ids=message.id, + ) + else: self.session = "bot" media = ( diff --git a/bot/helper/telegram_helper/message_utils.py b/bot/helper/telegram_helper/message_utils.py index d6611a183..75b235c69 100644 --- a/bot/helper/telegram_helper/message_utils.py +++ b/bot/helper/telegram_helper/message_utils.py @@ -2,62 +2,122 @@ from re import match as re_match from time import time -from pyrogram.errors import FloodPremiumWait, FloodWait +from cachetools import TTLCache +from pyrogram import Client, enums +from pyrogram.errors import FloodWait, MessageEmpty, MessageNotModified +from pyrogram.types import InputMediaPhoto -from bot import LOGGER, intervals, status_dict, task_dict_lock -from bot.core.aeon_client import TgClient +from bot import ( + LOGGER, + intervals, + status_dict, + task_dict_lock, + user_data, +) +from bot.core..aeon_client import TgClient from bot.core.config_manager import Config from bot.helper.ext_utils.bot_utils import SetInterval from bot.helper.ext_utils.exceptions import TgLinkException from bot.helper.ext_utils.status_utils import get_readable_message +session_cache = TTLCache(maxsize=1000, ttl=36000) -async def send_message(message, text, buttons=None): + +async def send_message( + message, + text, + buttons=None, + photo=None, + markdown=False, +): + parse_mode = enums.ParseMode.MARKDOWN if markdown else enums.ParseMode.HTML try: + if isinstance(message, int): + return await TgClient.bot.send_message( + chat_id=message, + text=text, + disable_web_page_preview=True, + disable_notification=True, + reply_markup=buttons, + parse_mode=parse_mode, + ) + if photo: + return await message.reply_photo( + photo=photo, + reply_to_message_id=message.id, + caption=text, + reply_markup=buttons, + disable_notification=True, + parse_mode=parse_mode, + ) return await message.reply( text=text, quote=True, disable_web_page_preview=True, disable_notification=True, reply_markup=buttons, + parse_mode=parse_mode, ) except FloodWait as f: LOGGER.warning(str(f)) await sleep(f.value * 1.2) - return await send_message(message, text, buttons) + return await send_message(message, text, buttons, photo, markdown) except Exception as e: LOGGER.error(str(e)) return str(e) -async def edit_message(message, text, buttons=None): +async def edit_message( + message, + text, + buttons=None, + photo=None, + markdown=False, +): + parse_mode = enums.ParseMode.MARKDOWN if markdown else enums.ParseMode.HTML try: - return await message.edit( + if message.media: + if photo: + return await message.edit_media( + InputMediaPhoto(photo, text), + reply_markup=buttons, + parse_mode=parse_mode, + ) + return await message.edit_caption( + caption=text, + reply_markup=buttons, + parse_mode=parse_mode, + ) + await message.edit( text=text, disable_web_page_preview=True, reply_markup=buttons, + parse_mode=parse_mode, ) except FloodWait as f: LOGGER.warning(str(f)) await sleep(f.value * 1.2) - return await edit_message(message, text, buttons) + return await edit_message(message, text, buttons, photo, markdown) + except (MessageNotModified, MessageEmpty): + pass except Exception as e: LOGGER.error(str(e)) return str(e) -async def send_file(message, file, caption=""): +async def send_file(message, file, caption="", buttons=None): try: return await message.reply_document( document=file, quote=True, caption=caption, disable_notification=True, + reply_markup=buttons, ) except FloodWait as f: LOGGER.warning(str(f)) await sleep(f.value * 1.2) - return await send_file(message, file, caption) + return await send_file(message, file, caption, buttons) except Exception as e: LOGGER.error(str(e)) return str(e) @@ -89,6 +149,24 @@ async def delete_message(message): LOGGER.error(str(e)) +async def one_minute_del(message): + await sleep(60) + await delete_message(message) + + +async def five_minute_del(message): + await sleep(300) + await delete_message(message) + + +async def delete_links(message): + if not Config.DELETE_LINKS: + return + if reply_to := message.reply_to_message: + await delete_message(reply_to) + await delete_message(message) + + async def auto_delete_message(cmd_message=None, bot_message=None): await sleep(60) if cmd_message is not None: @@ -107,9 +185,30 @@ async def delete_status(): LOGGER.error(str(e)) -async def get_tg_link_message(link): +async def get_tg_link_message(link, user_id=""): message = None links = [] + user_s = None + + if user_id: + if user_id in session_cache: + user_s = session_cache[user_id] + else: + user_dict = user_data.get(user_id, {}) + session_string = user_dict.get("session_string") + if session_string: + user_s = Client( + f"session_{user_id}", + Config.TELEGRAM_API, + Config.TELEGRAM_HASH, + session_string=session_string, + no_updates=True, + ) + await user_s.start() + session_cache[user_id] = user_s + else: + user_s = TgClient.user + if link.startswith("https://t.me/"): private = False msg = re_match( @@ -130,9 +229,8 @@ async def get_tg_link_message(link): chat = msg[1] msg_id = msg[2] if "-" in msg_id: - start_id, end_id = msg_id.split("-") - msg_id = start_id = int(start_id) - end_id = int(end_id) + start_id, end_id = map(int, msg_id.split("-")) + msg_id = start_id btw = end_id - start_id if private: link = link.split("&message_id=")[0] @@ -154,31 +252,26 @@ async def get_tg_link_message(link): if not private: try: - message = await TgClient.bot.get_messages( - chat_id=chat, - message_ids=msg_id, - ) + message = await TgClient.bot.get_messages(chat_id=chat, message_ids=msg_id) if message.empty: private = True except Exception as e: private = True - if not TgClient.user: + if not user_s: raise e if not private: - return (links, "bot") if links else (message, "bot") - if TgClient.user: + return (links, TgClient.bot) if links else (message, TgClient.bot) + if user_s: try: - user_message = await TgClient.user.get_messages( + user_message = await user_s.get_messages( chat_id=chat, message_ids=msg_id, ) except Exception as e: - raise TgLinkException( - f"You don't have access to this chat!. ERROR: {e}", - ) from e + raise TgLinkException("We don't have access to this chat!") from e if not user_message.empty: - return (links, "user") if links else (user_message, "user") + return (links, user_s) if links else (user_message, user_s) return None raise TgLinkException("Private: Please report!") diff --git a/bot/modules/mirror_leech.py b/bot/modules/mirror_leech.py index 3936f57ca..37765ae0f 100644 --- a/bot/modules/mirror_leech.py +++ b/bot/modules/mirror_leech.py @@ -14,6 +14,7 @@ from bot.helper.ext_utils.exceptions import DirectDownloadLinkException from bot.helper.ext_utils.links_utils import ( is_gdrive_id, + is_mega_link, is_gdrive_link, is_magnet, is_rclone_path, @@ -27,6 +28,9 @@ from bot.helper.mirror_leech_utils.download_utils.direct_downloader import ( add_direct_download, ) +from bot.helper.mirror_leech_utils.download_utils.mega_download import ( + add_mega_download, +) from bot.helper.mirror_leech_utils.download_utils.direct_link_generator import ( direct_link_generator, ) @@ -348,6 +352,8 @@ async def new_event(self): await add_qb_torrent(self, path, ratio, seed_time) elif is_rclone_path(self.link): await add_rclone_download(self, f"{path}/") + elif is_mega_link(self.link): + await add_mega_download(self, f"{path}/") elif is_gdrive_link(self.link) or is_gdrive_id(self.link): await add_gd_download(self, path) else: diff --git a/config_sample.py b/config_sample.py index 03a7a7ed5..686a48b07 100644 --- a/config_sample.py +++ b/config_sample.py @@ -22,6 +22,7 @@ USE_SERVICE_ACCOUNTS = False NAME_SUBSTITUTE = "" FFMPEG_CMDS = [] +DELETE_LINKS = False # GDrive Tools GDRIVE_ID = "" @@ -37,6 +38,10 @@ RCLONE_SERVE_USER = "" RCLONE_SERVE_PASS = "" +# Mega credentials +MEGA_EMAIL = "" +MEGA_PASSWORD = "" + # Update UPSTREAM_REPO = "https://github.com/AeonOrg/Aeon-MLTB" UPSTREAM_BRANCH = "main"