Skip to content

Commit

Permalink
add mega, delete links, download with user_session
Browse files Browse the repository at this point in the history
  • Loading branch information
5hojib committed Dec 29, 2024
1 parent 3245762 commit 4f57064
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 225 deletions.
3 changes: 3 additions & 0 deletions bot/core/config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class Config:
DATABASE_URL = ""
DEFAULT_UPLOAD = "rc"
DOWNLOAD_DIR = "/usr/src/app/downloads/"
DELETE_LINKS = False
EQUAL_SPLITS = False
EXTENSION_FILTER = ""
FFMPEG_CMDS = []
Expand All @@ -24,6 +25,8 @@ class Config:
LEECH_SPLIT_SIZE = 2097152000
MEDIA_GROUP = False
MIXED_LEECH = False
MEGA_EMAIL = ""
MEGA_PASSWORD = ""
NAME_SUBSTITUTE = ""
OWNER_ID = 0
QUEUE_ALL = 0
Expand Down
76 changes: 14 additions & 62 deletions bot/helper/listeners/mega_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,22 @@ class AsyncExecutor:
def __init__(self):
self.continue_event = Event()

def do(
self,
function,
args,
):
def do(self, function, args):
self.continue_event.clear()
function(*args)
self.continue_event.wait()


async def mega_login(
executor,
api,
email,
password,
):
async def mega_login(executor, api, email, password):
if email and password:
await sync_to_async(
executor.do,
api.login,
(
email,
password,
),
(email, password),
)


async def mega_logout(
executor,
api,
folder_api=None,
):
async def mega_logout(executor, api, folder_api=None):
await sync_to_async(
executor.do,
api.logout,
Expand All @@ -62,6 +46,7 @@ class MegaAppListener(MegaListener):
)

def __init__(self, continue_event: Event, listener):
super().__init__()
self.continue_event = continue_event
self.node = None
self.public_node = None
Expand All @@ -71,7 +56,6 @@ def __init__(self, continue_event: Event, listener):
self._bytes_transferred = 0
self._speed = 0
self._name = ""
super().__init__()

@property
def speed(self):
Expand All @@ -81,12 +65,7 @@ def speed(self):
def downloaded_bytes(self):
return self._bytes_transferred

def onRequestFinish( # noqa: N802
self,
api,
request,
error,
):
def onRequestFinish(self, api, request, error): # noqa: N802
if str(error).lower() != "no error":
self.error = error.copy()
if str(self.error).casefold() != "not found":
Expand All @@ -112,12 +91,7 @@ def onRequestFinish( # noqa: N802
):
self.continue_event.set()

def onRequestTemporaryError( # noqa: N802
self,
_,
__,
error: MegaError,
):
def onRequestTemporaryError(self, _, __, error: MegaError): # noqa: N802
LOGGER.error(f"Mega Request error in {error}")
if not self.is_cancelled:
self.is_cancelled = True
Expand All @@ -128,27 +102,15 @@ def onRequestTemporaryError( # noqa: N802
self.error = error.toString()
self.continue_event.set()

def onTransferUpdate( # noqa: N802
self,
api: MegaApi,
transfer: MegaTransfer,
):
def onTransferUpdate(self, api: MegaApi, transfer: MegaTransfer): # noqa: N802
if self.is_cancelled:
api.cancelTransfer(
transfer,
None,
)
api.cancelTransfer(transfer, None)
self.continue_event.set()
return
self._speed = transfer.getSpeed()
self._bytes_transferred = transfer.getTransferredBytes()

def onTransferFinish( # noqa: N802
self,
_: MegaApi,
transfer: MegaTransfer,
__,
):
def onTransferFinish(self, _: MegaApi, transfer: MegaTransfer, __): # noqa: N802
try:
if self.is_cancelled:
self.continue_event.set()
Expand All @@ -160,27 +122,17 @@ def onTransferFinish( # noqa: N802
except Exception as e:
LOGGER.error(e)

def onTransferTemporaryError( # noqa: N802
self,
_,
transfer,
error,
):
def onTransferTemporaryError(self, _, transfer, error): # noqa: N802
LOGGER.error(
f"Mega download error in file {transfer.getFileName()}: {error}",
)
if transfer.getState() in [
1,
4,
]:
if transfer.getState() in [1, 4]:
return
self.error = (
f"TransferTempError: {error.toString()} ({transfer.getFileName()})"
)
self.error = f"TransferTempError: {error.toString()} ({transfer.getFileName()})"
if not self.is_cancelled:
self.is_cancelled = True
self.continue_event.set()

async def cancel_task(self):
self.is_cancelled = True
await self.listener.on_download_error("Download Canceled by user")
await self.listener.on_download_error("Download Canceled by user")
157 changes: 26 additions & 131 deletions bot/helper/mirror_leech_utils/download_utils/mega_download.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
from secrets import token_hex

from aiofiles.os import makedirs
from mega import MegaApi

from bot import (
LOGGER,
config_dict,
non_queued_dl,
queue_dict_lock,
task_dict,
task_dict_lock,
)
from bot.core.config_manager import Config
from bot.helper.ext_utils.bot_utils import sync_to_async
from bot.helper.ext_utils.links_utils import get_mega_link_type
from bot.helper.ext_utils.task_manager import (
Expand All @@ -34,155 +33,65 @@


async def add_mega_download(listener, path):
MAIL = config_dict["MEGA_EMAIL"]
PASS = config_dict["MEGA_PASSWORD"]
email = Config.MEGA_EMAIL
password = Config.MEGA_PASSWORD

executor = AsyncExecutor()
api = MegaApi(
None,
None,
None,
"Aeon",
)
api = MegaApi(None, None, None, "Aeon")
folder_api = None

mega_listener = MegaAppListener(
executor.continue_event,
listener,
)
mega_listener = MegaAppListener(executor.continue_event, listener)
api.addListener(mega_listener)

await mega_login(
executor,
api,
MAIL,
PASS,
)
await mega_login(executor, api, email, password)

if get_mega_link_type(listener.link) == "file":
await sync_to_async(
executor.do,
api.getPublicNode,
(listener.link,),
)
await sync_to_async(executor.do, api.getPublicNode, (listener.link,))
node = mega_listener.public_node
else:
folder_api = MegaApi(
None,
None,
None,
"Aeon",
)
folder_api = MegaApi(None, None, None, "Aeon")
folder_api.addListener(mega_listener)
await sync_to_async(
executor.do,
folder_api.loginToFolder,
(listener.link,),
)
node = await sync_to_async(
folder_api.authorizeNode,
mega_listener.node,
)
await sync_to_async(executor.do, folder_api.loginToFolder, (listener.link,))
node = await sync_to_async(folder_api.authorizeNode, mega_listener.node)

if mega_listener.error:
mmsg = await send_message(
listener.message,
str(mega_listener.error),
)
await mega_logout(
executor,
api,
folder_api,
)
mmsg = await send_message(listener.message, str(mega_listener.error))
await mega_logout(executor, api, folder_api)
await delete_links(listener.message)
await auto_delete_message(
listener.message,
mmsg,
)
await auto_delete_message(listener.message, mmsg)
return

listener.name = (
listener.name or node.getName() # type: ignore
)
(
msg,
button,
) = await stop_duplicate_check(listener)
listener.name = listener.name or node.getName()
msg, button = await stop_duplicate_check(listener)
if msg:
mmsg = await send_message(
listener.message,
msg,
button,
)
await mega_logout(
executor,
api,
folder_api,
)
mmsg = await send_message(listener.message, msg, button)
await mega_logout(executor, api, folder_api)
await delete_links(listener.message)
await auto_delete_message(
listener.message,
mmsg,
)
await auto_delete_message(listener.message, mmsg)
return

gid = token_hex(4)
listener.size = api.getSize(node)
"""if limit_exceeded := await limit_checker(
listener,
is_mega=True
):
mmsg = await send_message(
listener.message,
limit_exceeded
)
await mega_logout(
executor,
api,
folder_api
)
await delete_links(listener.message)
await auto_delete_message(
listener.message,
mmsg
)
return"""

(
added_to_queue,
event,
) = await check_running_tasks(listener)
added_to_queue, event = await check_running_tasks(listener)
if added_to_queue:
LOGGER.info(f"Added to Queue/Download: {listener.name}")
async with task_dict_lock:
task_dict[listener.mid] = QueueStatus(
listener,
gid,
"Dl",
)
task_dict[listener.mid] = QueueStatus(listener, gid, "Dl")
await listener.on_download_start()
await send_status_message(listener.message)
await event.wait() # type: ignore
await event.wait()
async with task_dict_lock:
if listener.mid not in task_dict:
await mega_logout(
executor,
api,
folder_api,
)
await mega_logout(executor, api, folder_api)
return
from_queue = True
LOGGER.info(f"Start Queued Download from Mega: {listener.name}")
else:
from_queue = False

async with task_dict_lock:
task_dict[listener.mid] = MegaDownloadStatus(
listener,
mega_listener,
gid,
"dl",
)
task_dict[listener.mid] = MegaDownloadStatus(listener, mega_listener, gid, "dl")
async with queue_dict_lock:
non_queued_dl.add(listener.mid)

Expand All @@ -193,24 +102,10 @@ async def add_mega_download(listener, path):
await send_status_message(listener.message)
LOGGER.info(f"Download from Mega: {listener.name}")

await makedirs(
path,
exist_ok=True,
)
await makedirs(path, exist_ok=True)
await sync_to_async(
executor.do,
api.startDownload,
(
node,
path,
listener.name,
None,
False,
None,
),
)
await mega_logout(
executor,
api,
folder_api,
(node, path, listener.name, None, False, None),
)
await mega_logout(executor, api, folder_api)
Loading

0 comments on commit 4f57064

Please sign in to comment.