diff --git a/bot/__init__.py b/bot/__init__.py index 44579bf11..bbfd50b4d 100644 --- a/bot/__init__.py +++ b/bot/__init__.py @@ -63,7 +63,6 @@ queue_dict_lock = Lock() qb_listener_lock = Lock() cpu_eater_lock = Lock() -subprocess_lock = Lock() same_directory_lock = Lock() extension_filter = ["aria2", "!qB"] drives_names = [] diff --git a/bot/core/config_manager.py b/bot/core/config_manager.py index f1a3dd727..0f38a713b 100644 --- a/bot/core/config_manager.py +++ b/bot/core/config_manager.py @@ -15,7 +15,7 @@ class Config: DELETE_LINKS = False EXTENSION_FILTER = "" FSUB_IDS = "" - FFMPEG_CMDS: ClassVar[list[str]] = [] + FFMPEG_CMDS = {} FILELION_API = "" GDRIVE_ID = "" INCOMPLETE_TASK_NOTIFIER = False diff --git a/bot/helper/common.py b/bot/helper/common.py index 842e8f435..2fd5292bc 100644 --- a/bot/helper/common.py +++ b/bot/helper/common.py @@ -1,5 +1,5 @@ import contextlib -from asyncio import create_subprocess_exec, gather, sleep +from asyncio import create_subprocess_exec, gather, sleep, Lock from asyncio.subprocess import PIPE from os import path as ospath from os import walk @@ -17,7 +17,6 @@ extension_filter, intervals, multi_tags, - subprocess_lock, task_dict, task_dict_lock, user_data, @@ -75,6 +74,7 @@ def __init__(self): self.rc_flags = "" self.tag = "" self.name = "" + self.subname = "" self.new_dir = "" self.name_sub = "" self.thumbnail_layout = "" @@ -83,6 +83,7 @@ def __init__(self): self.max_split_size = 0 self.multi = 0 self.size = 0 + self.subsize = 0 self.is_leech = False self.is_qbit = False self.is_clone = False @@ -109,9 +110,11 @@ def __init__(self): self.is_torrent = False self.as_med = False self.as_doc = False + self.is_file = False self.ffmpeg_cmds = None self.chat_thread_id = None self.subproc = None + self.subprocess_lock = Lock() self.thumb = None self.extension_filter = [] self.is_super_chat = self.message.chat.type.name in ["SUPERGROUP", "CHANNEL"] @@ -201,11 +204,13 @@ async def before_start(self): ): self.up_dest = self.user_dict["upload_paths"][self.up_dest] - self.ffmpeg_cmds = ( - self.ffmpeg_cmds - or self.user_dict.get("ffmpeg_cmds", None) - or (Config.FFMPEG_CMDS if "ffmpeg_cmds" not in self.user_dict else None) - ) + if self.ffmpeg_cmds and not isinstance(self.ffmpeg_cmds, list): + if self.user_dict.get("ffmpeg_cmds", None): + self.ffmpeg_cmds = self.user_dict["ffmpeg_cmds"].get(self.ffmpeg_cmds, None) + elif "ffmpeg_cmds" not in self.user_dict and Config.FFMPEG_CMDS: + self.ffmpeg_cmds = Config.FFMPEG_CMDS.get(self.ffmpeg_cmds, None) + else: + self.ffmpeg_cmds = None if self.ffmpeg_cmds: self.seed = False @@ -553,18 +558,17 @@ async def decompress_zst(self, dl_path, is_dir=False): cmd = ["unzstd", f_path, "-o", out_path] if self.is_cancelled: return "" - async with subprocess_lock: + async with self.subprocess_lock: self.subproc = await create_subprocess_exec( *cmd, stderr=PIPE, ) - _, stderr = await self.subproc.communicate() + code = await self.subproc.wait() if self.is_cancelled: return "" - code = self.subproc.returncode if code != 0: try: - stderr = stderr.decode().strip() + stderr = (await self.subproc.stderr.read()).decode().strip() except Exception: stderr = "Unable to decode the error!" LOGGER.error( @@ -572,21 +576,21 @@ async def decompress_zst(self, dl_path, is_dir=False): ) elif not self.seed: await remove(f_path) + self.subproc = None return None if dl_path.endswith(".zst"): out_path = get_base_name(dl_path) cmd = ["unzstd", dl_path, "-o", out_path] if self.is_cancelled: return "" - async with subprocess_lock: + async with self.subprocess_lock: self.subproc = await create_subprocess_exec(*cmd, stderr=PIPE) - _, stderr = await self.subproc.communicate() + code = await self.subproc.wait() if self.is_cancelled: return "" - code = self.subproc.returncode if code != 0: try: - stderr = stderr.decode().strip() + stderr = (await self.subproc.stderr.read()).decode().strip() except Exception: stderr = "Unable to decode the error!" LOGGER.error( @@ -594,6 +598,7 @@ async def decompress_zst(self, dl_path, is_dir=False): ) elif not self.seed: await remove(dl_path) + self.subproc = None return out_path return dl_path @@ -603,7 +608,7 @@ async def proceed_extract(self, dl_path, gid): LOGGER.info(f"Extracting: {self.name}") async with task_dict_lock: task_dict[self.mid] = SevenZStatus(self, gid, "Extract") - if await aiopath.isdir(dl_path): + if not self.is_file: if self.seed: self.new_dir = f"{self.dir}10000" up_path = f"{self.new_dir}/{self.name}" @@ -635,23 +640,27 @@ async def proceed_extract(self, dl_path, gid): f"-o{t_path}", "-aot", "-xr!@PaxHeader", + "-bsp1", + "-bse1", + "-bb3", ] if not pswd: del cmd[2] if self.is_cancelled: return "" - async with subprocess_lock: + self.subname = file_ + async with self.subprocess_lock: self.subproc = await create_subprocess_exec( *cmd, + stdout=PIPE, stderr=PIPE, ) - _, stderr = await self.subproc.communicate() + code = await self.subproc.wait() if self.is_cancelled: return "" - code = self.subproc.returncode if code != 0: try: - stderr = stderr.decode().strip() + stderr = (await self.subproc.stderr.read()).decode().strip() except Exception: stderr = "Unable to decode the error!" LOGGER.error( @@ -669,6 +678,7 @@ async def proceed_extract(self, dl_path, gid): await remove(del_path) except Exception: self.is_cancelled = True + self.subproc = None return up_path dl_path = await self.decompress_zst(dl_path) up_path = get_base_name(dl_path) @@ -683,17 +693,21 @@ async def proceed_extract(self, dl_path, gid): f"-o{up_path}", "-aot", "-xr!@PaxHeader", + "-bsp1", + "-bse1", + "-bb3", ] if not pswd: del cmd[2] if self.is_cancelled: return "" - async with subprocess_lock: - self.subproc = await create_subprocess_exec(*cmd, stderr=PIPE) - _, stderr = await self.subproc.communicate() + async with self.subprocess_lock: + self.subproc = await create_subprocess_exec( + *cmd, stdout=PIPE, stderr=PIPE + ) + code = await self.subproc.wait() if self.is_cancelled: return "" - code = self.subproc.returncode if code == -9: self.is_cancelled = True return "" @@ -704,21 +718,24 @@ async def proceed_extract(self, dl_path, gid): await remove(dl_path) except Exception: self.is_cancelled = True + self.subproc = None return up_path try: - stderr = stderr.decode().strip() + stderr = (await self.subproc.stderr.read()).decode().strip() except Exception: stderr = "Unable to decode the error!" LOGGER.error( f"{stderr}. Unable to extract archive! Uploading anyway. Path: {dl_path}", ) self.new_dir = "" + self.subproc = None return dl_path except NotSupportedExtractionArchive: LOGGER.info( f"Not any valid archive, uploading file as it is. Path: {dl_path}", ) self.new_dir = "" + self.subproc = None return dl_path async def proceed_compress(self, dl_path, gid, o_files, ft_delete): @@ -738,12 +755,15 @@ async def proceed_compress(self, dl_path, gid, o_files, ft_delete): "7z", f"-v{split_size}b", "a", - "-mx=0", + "-mx=9", f"-p{pswd}", up_path, dl_path, + "-bsp1", + "-bse1", + "-bb3", ] - if await aiopath.isdir(dl_path): + if not self.is_file: cmd.extend(f"-xr!*.{ext}" for ext in self.extension_filter) if o_files: for f in o_files: @@ -763,12 +783,11 @@ async def proceed_compress(self, dl_path, gid, o_files, ft_delete): LOGGER.info(f"Zip: orig_path: {dl_path}, zip_path: {up_path}") if self.is_cancelled: return "" - async with subprocess_lock: - self.subproc = await create_subprocess_exec(*cmd, stderr=PIPE) - _, stderr = await self.subproc.communicate() + async with self.subprocess_lock: + self.subproc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE) + code = await self.subproc.wait() if self.is_cancelled: return "" - code = self.subproc.returncode if code == -9: self.is_cancelled = True return "" @@ -780,15 +799,17 @@ async def proceed_compress(self, dl_path, gid, o_files, ft_delete): with contextlib.suppress(Exception): await remove(f) ft_delete.clear() + self.suproc = None return up_path await clean_target(self.new_dir) if not delete: self.new_dir = "" try: - stderr = stderr.decode().strip() + stderr = (await self.subproc.stderr.read()).decode().strip() except Exception: stderr = "Unable to decode the error!" LOGGER.error(f"{stderr}. Unable to zip this path: {dl_path}") + self.subproc = None return dl_path async def proceed_split(self, up_dir, m_size, o_files, gid): @@ -805,6 +826,11 @@ async def proceed_split(self, up_dir, m_size, o_files, gid): async with task_dict_lock: task_dict[self.mid] = FFmpegStatus(self, gid, "Split") LOGGER.info(f"Splitting: {self.name}") + if self.is_file: + self.subsize = self.size + else: + self.subsize = f_size + self.subname = file_ res = await split_file( f_path, f_size, @@ -813,6 +839,7 @@ async def proceed_split(self, up_dir, m_size, o_files, gid): self.split_size, self, ) + self.subproc = None if self.is_cancelled: return if not res: @@ -852,17 +879,19 @@ async def generate_sample_video(self, dl_path, gid, unwanted_files, ft_delete): task_dict[self.mid] = FFmpegStatus(self, gid, "Sample Video") checked = False - if await aiopath.isfile(dl_path): + if self.is_file: if (await get_document_type(dl_path))[0]: checked = True async with cpu_eater_lock: LOGGER.info(f"Creating Sample video: {self.name}") + self.subsize = self.size res = await create_sample_video( self, dl_path, sample_duration, part_duration, ) + self.suproc = None if res: new_folder = ospath.splitext(dl_path)[0] name = dl_path.rsplit("/", 1)[1] @@ -902,12 +931,15 @@ async def generate_sample_video(self, dl_path, gid, unwanted_files, ft_delete): if checked: cpu_eater_lock.release() return "" + self.subsize = await aiopath.getsize(f_path) + self.subname = file_ res = await create_sample_video( self, f_path, sample_duration, part_duration, ) + self.subproc = None if res: ft_delete.append(res) if checked: @@ -975,7 +1007,13 @@ async def proceed_convert(m_path): LOGGER.info(f"Converting: {self.name}") else: LOGGER.info(f"Converting: {m_path}") + if self.is_file: + self.subsize = self.size + else: + self.subsize = await aiopath.getsize(m_path) + self.subname = m_path.rsplit("/", 1)[-1] res = await convert_video(self, m_path, vext) + self.subproc = None return "" if self.is_cancelled else res if ( is_audio @@ -996,11 +1034,17 @@ async def proceed_convert(m_path): LOGGER.info(f"Converting: {self.name}") else: LOGGER.info(f"Converting: {m_path}") + if self.is_file: + self.subsize = self.size + else: + self.subsize = await aiopath.getsize(m_path) + self.subname = m_path.rsplit("/", 1)[-1] res = await convert_audio(self, m_path, aext) + self.subproc = None return "" if self.is_cancelled else res return "" - if await aiopath.isfile(dl_path): + if self.is_file: output_file = await proceed_convert(dl_path) if checked: cpu_eater_lock.release() @@ -1042,7 +1086,7 @@ async def proceed_convert(m_path): async def generate_screenshots(self, dl_path): ss_nb = int(self.screen_shots) if isinstance(self.screen_shots, str) else 10 - if await aiopath.isfile(dl_path): + if self.is_file: if (await get_document_type(dl_path))[0]: LOGGER.info(f"Creating Screenshot for: {dl_path}") res = await take_ss(dl_path, ss_nb) @@ -1078,7 +1122,7 @@ async def generate_screenshots(self, dl_path): return dl_path async def substitute(self, dl_path): - if await aiopath.isfile(dl_path): + if self.is_file: up_dir, name = dl_path.rsplit("/", 1) for substitution in self.name_sub: sen = False @@ -1152,7 +1196,8 @@ async def proceed_ffmpeg(self, dl_path, gid): for item in self.ffmpeg_cmds ] for ffmpeg_cmd in cmds: - cmd = ["xtra", "-hide_banner", "-loglevel", "error", *ffmpeg_cmd] + cmd = ["xtra", "-hide_banner", "-loglevel", "error", "-progress", + "pipe:1", *ffmpeg_cmd] if "-del" in cmd: cmd.remove("-del") delete_files = True @@ -1168,7 +1213,7 @@ async def proceed_ffmpeg(self, dl_path, gid): ext = "all" else: ext = ospath.splitext(input_file)[-1] - if await aiopath.isfile(dl_path): + if self.is_file: is_video, is_audio, _ = await get_document_type(dl_path) if (not is_video and not is_audio) or (is_video and ext == "audio"): break @@ -1189,7 +1234,9 @@ async def proceed_ffmpeg(self, dl_path, gid): await cpu_eater_lock.acquire() LOGGER.info(f"Running ffmpeg cmd for: {file_path}") cmd[index + 1] = file_path + self.subsize = self.size res = await run_ffmpeg_cmd(self, cmd, file_path) + self.subproc = None if res and delete_files: await remove(file_path) directory = ospath.dirname(res) @@ -1229,7 +1276,10 @@ async def proceed_ffmpeg(self, dl_path, gid): ) await cpu_eater_lock.acquire() LOGGER.info(f"Running ffmpeg cmd for: {f_path}") + self.subsize = await aiopath.getsize(f_path) + self.subname = file_ res = await run_ffmpeg_cmd(self, cmd, f_path) + self.suproc = None if res and delete_files: await remove(f_path) directory = ospath.dirname(res) diff --git a/bot/helper/ext_utils/bot_utils.py b/bot/helper/ext_utils/bot_utils.py index af41adb6a..7de18f299 100644 --- a/bot/helper/ext_utils/bot_utils.py +++ b/bot/helper/ext_utils/bot_utils.py @@ -120,7 +120,13 @@ def process_argument_with_values(start_index): values = [] for j in range(start_index + 1, total): if items[j] in arg_base: - break + check = " ".join(values).strip() + if check.startswith("[") and check.endswith("]"): + break + elif check.startswith("["): + pass + else: + break values.append(items[j]) return values @@ -141,16 +147,6 @@ def process_argument_with_values(start_index): "-med", ]: arg_base[part] = True - elif part == "-ff": - i += 1 - if i < total: - values = [] - while i < total: - values.append(items[i]) - if items[i].endswith("]"): - break - i += 1 - arg_base[part] = " ".join(values) else: sub_list = process_argument_with_values(i) if sub_list: diff --git a/bot/helper/ext_utils/db_handler.py b/bot/helper/ext_utils/db_handler.py index b41bfe47f..7a7f52e09 100644 --- a/bot/helper/ext_utils/db_handler.py +++ b/bot/helper/ext_utils/db_handler.py @@ -84,10 +84,8 @@ async def update_qbittorrent(self, key, value): async def save_qbit_settings(self): if self._return: return - await self.db.settings.qbittorrent.replace_one( - {"_id": TgClient.ID}, - qbit_options, - upsert=True, + await self.db.settings.qbittorrent.update_one( + {"_id": TgClient.ID}, {"$set": qbit_options}, upsert=True ) async def update_private_file(self, path): diff --git a/bot/helper/ext_utils/files_utils.py b/bot/helper/ext_utils/files_utils.py index afc2eebd6..5bb2e5d27 100644 --- a/bot/helper/ext_utils/files_utils.py +++ b/bot/helper/ext_utils/files_utils.py @@ -114,7 +114,7 @@ def exit_clean_up(_, __): LOGGER.info("Please wait, while we clean up and stop the running downloads") clean_all() srun( - ["pkill", "-9", "-f", "gunicorn|xria|xnox|xtra|xone"], + ["pkill", "-9", "-f", "gunicorn|xria|xnox|xtra|xone|7z"], check=False, ) exit(0) diff --git a/bot/helper/ext_utils/help_messages.py b/bot/helper/ext_utils/help_messages.py index a5fa275eb..47f204883 100644 --- a/bot/helper/ext_utils/help_messages.py +++ b/bot/helper/ext_utils/help_messages.py @@ -271,6 +271,7 @@ Notes: 1. Add -del to the list(s) which you want from the bot to delete the original files after command run complete! 2. Seed will get disbaled while using this option +3. To execute one of pre-added lists in bot like: ({"subtitle": ["-i mltb.mkv -c copy -c:s srt mltb.mkv"]}), you must use -ff subtitle (list key) Examples: ["-i mltb.mkv -c copy -c:s srt mltb.mkv", "-i mltb.video -c copy -c:s srt mltb", "-i mltb.m4a -c:a libmp3lame -q:a 2 mltb.mp3", "-i mltb.audio -c:a libmp3lame -q:a 2 mltb.mp3"] Here I will explain how to use mltb.* which is reference to files you want to work on. 1. First cmd: the input is mltb.mkv so this cmd will work only on mkv videos and the output is mltb.mkv also so all outputs is mkv. -del will delete the original media after complete run of the cmd. diff --git a/bot/helper/ext_utils/media_utils.py b/bot/helper/ext_utils/media_utils.py index f13dbd8b8..c293d80e3 100644 --- a/bot/helper/ext_utils/media_utils.py +++ b/bot/helper/ext_utils/media_utils.py @@ -12,7 +12,7 @@ from aioshutil import rmtree from PIL import Image -from bot import LOGGER, subprocess_lock +from bot import LOGGER from bot.core.config_manager import Config from .bot_utils import cmd_exec, sync_to_async @@ -28,8 +28,14 @@ async def convert_video(listener, video_file, ext, retry=False): "-hide_banner", "-loglevel", "error", + "-progress", + "pipe:1", "-i", video_file, + "-map", + "0:v", + "-map", + "0:a", "-c:v", "libx264", "-c:a", @@ -39,17 +45,19 @@ async def convert_video(listener, video_file, ext, retry=False): output, ] if ext == "mp4": - cmd[10:10] = ["-c:s", "mov_text"] + cmd[16:16] = ["-c:s", "mov_text"] elif ext == "mkv": - cmd[10:10] = ["-c:s", "ass"] + cmd[16:16] = ["-c:s", "ass"] else: - cmd[10:10] = ["-c:s", "copy"] + cmd[16:16] = ["-c:s", "copy"] else: cmd = [ "xtra", "-hide_banner", "-loglevel", "error", + "-progress", + "pipe:1", "-i", video_file, "-map", @@ -60,12 +68,12 @@ async def convert_video(listener, video_file, ext, retry=False): ] if listener.is_cancelled: return False - async with subprocess_lock: - listener.subproc = await create_subprocess_exec(*cmd, stderr=PIPE) - _, stderr = await listener.subproc.communicate() - if listener.is_cancelled: - return False - code = listener.subproc.returncode + async with listener.subprocess_lock: + listener.subproc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE) + code = await listener.subproc.wait() + async with listener.subprocess_lock: + if listener.is_cancelled: + return False if code == 0: return output if code == -9: @@ -76,7 +84,7 @@ async def convert_video(listener, video_file, ext, retry=False): await remove(output) return await convert_video(listener, video_file, ext, True) try: - stderr = stderr.decode().strip() + stderr = (await listener.subproc.stderr.read()).decode().strip() except Exception: stderr = "Unable to decode the error!" LOGGER.error( @@ -93,6 +101,8 @@ async def convert_audio(listener, audio_file, ext): "-hide_banner", "-loglevel", "error", + "-progress", + "pipe:1", "-i", audio_file, "-threads", @@ -101,19 +111,19 @@ async def convert_audio(listener, audio_file, ext): ] if listener.is_cancelled: return False - async with subprocess_lock: - listener.subproc = await create_subprocess_exec(*cmd, stderr=PIPE) - _, stderr = await listener.subproc.communicate() - if listener.is_cancelled: - return False - code = listener.subproc.returncode + async with listener.subprocess_lock: + listener.subproc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE) + code = await listener.subproc.wait() + async with listener.subprocess_lock: + if listener.is_cancelled: + return False if code == 0: return output if code == -9: listener.is_cancelled = True return False try: - stderr = stderr.decode().strip() + stderr = (await listener.subproc.stderr.read()).decode().strip() except Exception: stderr = "Unable to decode the error!" LOGGER.error( @@ -451,6 +461,8 @@ async def split_file( "-hide_banner", "-loglevel", "error", + "-progress", + "pipe:1", "-ss", str(start_time), "-i", @@ -470,22 +482,25 @@ async def split_file( out_path, ] if not multi_streams: - del cmd[10] - del cmd[10] - if listener.is_cancelled: - return False - async with subprocess_lock: - listener.subproc = await create_subprocess_exec(*cmd, stderr=PIPE) - _, stderr = await listener.subproc.communicate() + del cmd[12] + del cmd[12] if listener.is_cancelled: return False - code = listener.subproc.returncode + async with listener.subprocess_lock: + listener.subproc = await create_subprocess_exec( + *cmd, stdout=PIPE, stderr=PIPE + ) + code = await listener.subproc.wait() + async with listener.subprocess_lock: + if listener.is_cancelled: + return False + code = listener.subproc.returncode if code == -9: listener.is_cancelled = True return False if code != 0: try: - stderr = stderr.decode().strip() + stderr = (await listener.subproc.stderr.read()).decode().strip() except Exception: stderr = "Unable to decode the error!" with contextlib.suppress(Exception): @@ -542,8 +557,9 @@ async def split_file( start_time += lpd - 3 i += 1 else: + listener.subsize = 0 out_path = f"{dirpath}/{file_}." - async with subprocess_lock: + async with listener.subprocess_lock: if listener.is_cancelled: return False listener.subproc = await create_subprocess_exec( @@ -555,16 +571,16 @@ async def split_file( out_path, stderr=PIPE, ) - _, stderr = await listener.subproc.communicate() - if listener.is_cancelled: - return False - code = listener.subproc.returncode + code = await listener.subproc.wait() + async with listener.subprocess_lock: + if listener.is_cancelled: + return False if code == -9: listener.is_cancelled = True return False if code != 0: try: - stderr = stderr.decode().strip() + stderr = (await listener.subproc.stderr.read()).decode().strip() except Exception: stderr = "Unable to decode the error!" LOGGER.error(f"{stderr}. Split Document: {path}") @@ -604,6 +620,8 @@ async def create_sample_video(listener, video_file, sample_duration, part_durati "-hide_banner", "-loglevel", "error", + "-progress", + "pipe:1", "-i", video_file, "-filter_complex", @@ -623,19 +641,19 @@ async def create_sample_video(listener, video_file, sample_duration, part_durati if listener.is_cancelled: return False - async with subprocess_lock: - listener.subproc = await create_subprocess_exec(*cmd, stderr=PIPE) - _, stderr = await listener.subproc.communicate() - if listener.is_cancelled: - return False - code = listener.subproc.returncode + async with listener.subprocess_lock: + listener.subproc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE) + code = await listener.subproc.wait() + async with listener.subprocess_lock: + if listener.is_cancelled: + return False if code == -9: listener.is_cancelled = True return False if code == 0: return output_file try: - stderr = stderr.decode().strip() + stderr = (await listener.subproc.stderr.read()).decode().strip() except Exception: stderr = "Unable to decode the error!" LOGGER.error( @@ -756,19 +774,19 @@ async def run_ffmpeg_cmd(listener, ffmpeg, path): ffmpeg[-1] = output if listener.is_cancelled: return False - async with subprocess_lock: - listener.subproc = await create_subprocess_exec(*ffmpeg, stderr=PIPE) - _, stderr = await listener.subproc.communicate() - if listener.is_cancelled: - return False - code = listener.subproc.returncode + async with listener.subprocess_lock: + listener.subproc = await create_subprocess_exec(*ffmpeg, stdout=PIPE, stderr=PIPE) + code = await listener.subproc.wait() + async with listener.subprocess_lock: + if listener.is_cancelled: + return False if code == 0: return output if code == -9: listener.is_cancelled = True return False try: - stderr = stderr.decode().strip() + stderr = (await listener.subproc.stderr.read()).decode().strip() except Exception: stderr = "Unable to decode the error!" LOGGER.error( diff --git a/bot/helper/ext_utils/status_utils.py b/bot/helper/ext_utils/status_utils.py index c7adcbedf..5f2e042db 100644 --- a/bot/helper/ext_utils/status_utils.py +++ b/bot/helper/ext_utils/status_utils.py @@ -201,21 +201,31 @@ async def get_readable_message(sid, is_user, page_no=1, status="All", page_step= else: msg += f"{index + start_position}.{tstatus}: " msg += f"{escape(f'{task.name()}')}" - if tstatus not in [ - MirrorStatus.STATUS_SPLIT, - MirrorStatus.STATUS_SEED, - MirrorStatus.STATUS_SAMVID, - MirrorStatus.STATUS_CONVERT, - MirrorStatus.STATUS_FFMPEG, - MirrorStatus.STATUS_QUEUEUP, - ]: + if ( + tstatus + not in [ + MirrorStatus.STATUS_SEED, + MirrorStatus.STATUS_QUEUEUP, + MirrorStatus.STATUS_SPLIT, + ] + or (MirrorStatus.STATUS_SPLIT + and task.listener.subsize) + ): progress = ( await task.progress() if iscoroutinefunction(task.progress) else task.progress() ) + if task.listener.subname: + msg += f"\n{task.listener.subname[:35]}" msg += f"\n{get_progress_bar_string(progress)} {progress}" - msg += f"\nProcessed: {task.processed_bytes()} of {task.size()}" + if task.listener.subname: + size = ( + f"{get_readable_file_size(task.listener.subsize)} ({task.size()})" + ) + else: + size = task.size() + msg += f"\nProcessed: {task.processed_bytes()} of {size}" msg += f"\nSpeed: {task.speed()} | ETA: {task.eta()}" if hasattr(task, "seeders_num"): with contextlib.suppress(Exception): diff --git a/bot/helper/listeners/task_listener.py b/bot/helper/listeners/task_listener.py index fd29f2ca0..401c00c07 100644 --- a/bot/helper/listeners/task_listener.py +++ b/bot/helper/listeners/task_listener.py @@ -165,6 +165,7 @@ async def on_download_complete(self): return up_path = f"{self.dir}/{self.name}" + self.is_file = await aiopath.isfile(up_path) self.size = await get_path_size(up_path) if not Config.QUEUE_ALL: async with queue_dict_lock: @@ -172,15 +173,19 @@ async def on_download_complete(self): non_queued_dl.remove(self.mid) await start_from_queued() - if self.join and await aiopath.isdir(up_path): + if self.join and not self.is_file: await join_files(up_path) if self.extract: up_path = await self.proceed_extract(up_path, gid) if self.is_cancelled: return + self.is_file = await aiopath.isfile(up_path) up_dir, self.name = up_path.rsplit("/", 1) self.size = await get_path_size(up_dir) + self.subproc = None + self.subname = "" + self.subsize = 0 if self.ffmpeg_cmds: up_path = await self.proceed_ffmpeg( @@ -189,21 +194,28 @@ async def on_download_complete(self): ) if self.is_cancelled: return + self.is_file = await aiopath.isfile(up_path) up_dir, self.name = up_path.rsplit("/", 1) self.size = await get_path_size(up_dir) + self.subproc = None + self.subname = "" + self.subsize = 0 if self.name_sub: up_path = await self.substitute(up_path) if self.is_cancelled: return + self.is_file = await aiopath.isfile(up_path) self.name = up_path.rsplit("/", 1)[1] if self.screen_shots: up_path = await self.generate_screenshots(up_path) if self.is_cancelled: return + self.is_file = await aiopath.isfile(up_path) up_dir, self.name = up_path.rsplit("/", 1) self.size = await get_path_size(up_dir) + self.subproc = None if self.convert_audio or self.convert_video: up_path = await self.convert_media( @@ -215,8 +227,12 @@ async def on_download_complete(self): ) if self.is_cancelled: return + self.is_file = await aiopath.isfile(up_path) up_dir, self.name = up_path.rsplit("/", 1) self.size = await get_path_size(up_dir) + self.subproc = None + self.subname = "" + self.subsize = 0 if self.sample_video: up_path = await self.generate_sample_video( @@ -227,8 +243,12 @@ async def on_download_complete(self): ) if self.is_cancelled: return + self.is_file = await aiopath.isfile(up_path) up_dir, self.name = up_path.rsplit("/", 1) self.size = await get_path_size(up_dir) + self.subproc = None + self.subname = "" + self.subsize = 0 if self.compress: up_path = await self.proceed_compress( @@ -237,8 +257,12 @@ async def on_download_complete(self): unwanted_files, files_to_delete, ) + self.is_file = await aiopath.isfile(up_path) if self.is_cancelled: return + self.subproc = None + self.subname = "" + self.subsize = 0 up_dir, self.name = up_path.rsplit("/", 1) self.size = await get_path_size(up_dir) @@ -252,6 +276,9 @@ async def on_download_complete(self): ) if self.is_cancelled: return + self.subproc = None + self.subname = "" + self.subsize = 0 add_to_queue, event = await check_running_tasks(self, "up") await start_from_queued() diff --git a/bot/helper/mirror_leech_utils/status_utils/ffmpeg_status.py b/bot/helper/mirror_leech_utils/status_utils/ffmpeg_status.py index b193eb74f..184e0cb3e 100644 --- a/bot/helper/mirror_leech_utils/status_utils/ffmpeg_status.py +++ b/bot/helper/mirror_leech_utils/status_utils/ffmpeg_status.py @@ -1,14 +1,56 @@ -from bot import LOGGER, subprocess_lock -from bot.helper.ext_utils.status_utils import MirrorStatus, get_readable_file_size +from .... import LOGGER +from ...ext_utils.bot_utils import new_task +from ...ext_utils.status_utils import ( + get_readable_file_size, + MirrorStatus, + get_readable_time, +) class FFmpegStatus: def __init__(self, listener, gid, status=""): self.listener = listener self._gid = gid - self._size = self.listener.size + self._processed_bytes = 0 + self._speed_raw = 0 + self._progress_raw = 0 + self._active = False self.cstatus = status + @new_task + async def _ffmpeg_progress(self): + while True: + async with self.listener.subprocess_lock: + if self.listener.subproc is None or self.listener.is_cancelled: + break + line = await self.listener.subproc.stdout.readline() + if not line: + break + line = line.decode().strip() + if "=" in line: + key, value = line.split("=", 1) + if value != "N/A": + if key == "total_size": + self._processed_bytes = int(value) + self._progress_raw = ( + self._processed_bytes / self.listener.subsize * 100 + ) + elif key == "bitrate": + self._speed_raw = (float(value.strip("kbits/s")) / 8) * 1000 + self._active = False + + def speed(self): + return f"{get_readable_file_size(self._speed_raw)}/s" + + def processed_bytes(self): + return get_readable_file_size(self._processed_bytes) + + async def progress(self): + if not self._active and self.listener.subsize and self.listener.subproc is not None: + await self._ffmpeg_progress() + self._active = True + return f"{round(self._progress_raw, 2)}%" + def gid(self): return self._gid @@ -16,27 +58,35 @@ def name(self): return self.listener.name def size(self): - return get_readable_file_size(self._size) + return get_readable_file_size(self.listener.size) + + def eta(self): + try: + seconds = (self.listener.subsize - self._processed_bytes) / self._speed_raw + return get_readable_time(seconds) + except: + return "-" def status(self): if self.cstatus == "Convert": return MirrorStatus.STATUS_CONVERT - if self.cstatus == "Split": + elif self.cstatus == "Split": return MirrorStatus.STATUS_SPLIT - if self.cstatus == "Sample Video": + elif self.cstatus == "Sample Video": return MirrorStatus.STATUS_SAMVID - return MirrorStatus.STATUS_FFMPEG + else: + return MirrorStatus.STATUS_FFMPEG def task(self): return self async def cancel_task(self): LOGGER.info(f"Cancelling {self.cstatus}: {self.listener.name}") - self.listener.is_cancelled = True - async with subprocess_lock: + async with self.listener.subprocess_lock: + self.listener.is_cancelled = True if ( self.listener.subproc is not None and self.listener.subproc.returncode is None ): self.listener.subproc.kill() - await self.listener.on_upload_error(f"{self.cstatus} stopped by user!") + await self.listener.on_upload_error(f"{self.cstatus} stopped by user!") \ No newline at end of file diff --git a/bot/helper/mirror_leech_utils/status_utils/sevenz_status.py b/bot/helper/mirror_leech_utils/status_utils/sevenz_status.py index e2e92e9bf..b6b00ce6c 100644 --- a/bot/helper/mirror_leech_utils/status_utils/sevenz_status.py +++ b/bot/helper/mirror_leech_utils/status_utils/sevenz_status.py @@ -1,10 +1,11 @@ from time import time +from re import search -from bot import LOGGER, subprocess_lock -from bot.helper.ext_utils.files_utils import get_path_size -from bot.helper.ext_utils.status_utils import ( - MirrorStatus, +from .... import LOGGER +from ...ext_utils.bot_utils import new_task +from ...ext_utils.status_utils import ( get_readable_file_size, + MirrorStatus, get_readable_time, ) @@ -12,70 +13,103 @@ class SevenZStatus: def __init__(self, listener, gid, status=""): self.listener = listener - self._size = self.listener.size self._gid = gid self._start_time = time() - self._proccessed_bytes = 0 + self._processed_bytes = 0 + self._progress_str = "0%" + self._active = False self.cstatus = status + @new_task + async def _sevenz_progress(self): + pattern = r"\b(?:Add\s+new\s+data\s+to\s+archive:.*?,\s+(\d+)\s+bytes|Physical\s+Size\s*=\s*(\d+))" + while True: + async with self.listener.subprocess_lock: + if self.listener.subproc is None or self.listener.is_cancelled: + break + line = await self.listener.subproc.stdout.readline() + line = line.decode().strip() + if line.startswith("Add new data to archive:") or line.startswith( + "Physical Size =" + ): + if match := search(pattern, line): + size = match[1] or match[2] + self.listener.subsize = int(size) + break + s = b"" + while True: + async with self.listener.subprocess_lock: + if self.listener.is_cancelled or self.listener.subproc is None: + break + char = await self.listener.subproc.stdout.read(1) + if not char: + break + s += char + if char == b"%": + try: + self._progress_str = s.decode().rsplit(" ", 1)[-1].strip() + self._processed_bytes = ( + int(self._progress_str.strip("%")) / 100 + ) * self.listener.subsize + except: + self._processed_bytes = 0 + self._progress_str = "0%" + s = b"" + + self._active = False + def gid(self): return self._gid - def speed_raw(self): - return self._proccessed_bytes / (time() - self._start_time) - - async def progress_raw(self): - await self.processed_raw() - try: - return self._proccessed_bytes / self._size * 100 - except Exception: - return 0 + def _speed_raw(self): + return self._processed_bytes / (time() - self._start_time) async def progress(self): - return f"{round(await self.progress_raw(), 2)}%" + if not self._active and self.listener.subproc is not None: + await self._sevenz_progress() + self._active = True + return self._progress_str def speed(self): - return f"{get_readable_file_size(self.speed_raw())}/s" + return f"{get_readable_file_size(self._speed_raw())}/s" + + def processed_bytes(self): + return get_readable_file_size(self._processed_bytes) def name(self): return self.listener.name def size(self): - return get_readable_file_size(self._size) + return get_readable_file_size(self.listener.size) def eta(self): try: - seconds = (self._size - self._proccessed_bytes) / self.speed_raw() + seconds = ( + self.listener.subsize - self._processed_bytes + ) / self._speed_raw() return get_readable_time(seconds) - except Exception: + except: return "-" def status(self): if self.cstatus == "Extract": return MirrorStatus.STATUS_EXTRACT - return MirrorStatus.STATUS_ARCHIVE + else: + return MirrorStatus.STATUS_ARCHIVE def processed_bytes(self): - return get_readable_file_size(self._proccessed_bytes) - - async def processed_raw(self): - if self.listener.new_dir: - self._proccessed_bytes = await get_path_size(self.listener.new_dir) - else: - self._proccessed_bytes = ( - await get_path_size(self.listener.dir) - self._size - ) + return get_readable_file_size(self._processed_bytes) def task(self): return self async def cancel_task(self): LOGGER.info(f"Cancelling {self.cstatus}: {self.listener.name}") - self.listener.is_cancelled = True - async with subprocess_lock: + async with self.listener.subprocess_lock: + self.listener.is_cancelled = True if ( self.listener.subproc is not None and self.listener.subproc.returncode is None ): self.listener.subproc.kill() - await self.listener.on_upload_error(f"{self.cstatus} stopped by user!") + await self.listener.on_upload_error(f"{self.cstatus} stopped by user!") \ No newline at end of file diff --git a/bot/helper/mirror_leech_utils/status_utils/yt_dlp_status.py b/bot/helper/mirror_leech_utils/status_utils/yt_dlp_status.py index cc180e7b7..3ddd4399c 100644 --- a/bot/helper/mirror_leech_utils/status_utils/yt_dlp_status.py +++ b/bot/helper/mirror_leech_utils/status_utils/yt_dlp_status.py @@ -11,19 +11,19 @@ def __init__(self, listener, obj, gid): self._obj = obj self._gid = gid self.listener = listener - self._proccessed_bytes = 0 + self._processed_bytes = 0 def gid(self): return self._gid def processed_bytes(self): - return get_readable_file_size(self._proccessed_bytes) + return get_readable_file_size(self._processed_bytes) async def processed_raw(self): if self._obj.downloaded_bytes != 0: - self._proccessed_bytes = self._obj.downloaded_bytes + self._processed_bytes = self._obj.downloaded_bytes else: - self._proccessed_bytes = await get_path_size(self.listener.dir) + self._processed_bytes = await get_path_size(self.listener.dir) def size(self): return get_readable_file_size(self._obj.size) @@ -46,7 +46,7 @@ def eta(self): return get_readable_time(self._obj.eta) try: seconds = ( - self._obj.size - self._proccessed_bytes + self._obj.size - self._processed_bytes ) / self._obj.download_speed return get_readable_time(seconds) except Exception: diff --git a/bot/helper/mirror_leech_utils/telegram_uploader.py b/bot/helper/mirror_leech_utils/telegram_uploader.py index 512eeb420..2d47dcbd5 100644 --- a/bot/helper/mirror_leech_utils/telegram_uploader.py +++ b/bot/helper/mirror_leech_utils/telegram_uploader.py @@ -270,6 +270,9 @@ async def upload(self, o_files, ft_delete): delete_file = False self._error = "" self._up_path = f_path = ospath.join(dirpath, file_) + if not ospath.exists(self._up_path): + LOGGER.error(f"{self._up_path} not exists! Continue uploading!") + continue if self._up_path in ft_delete: delete_file = True if self._up_path in o_files: diff --git a/bot/modules/bot_settings.py b/bot/modules/bot_settings.py index 5689de987..8faed4b87 100644 --- a/bot/modules/bot_settings.py +++ b/bot/modules/bot_settings.py @@ -187,6 +187,8 @@ async def edit_variable(_, message, pre_message, key): value = int(value) elif value.startswith("[") and value.endswith("]"): value = eval(value) + elif value.startswith("{") and value.endswith("}"): + value = eval(value) if key not in [ "CMD_SUFFIX", "OWNER_ID", diff --git a/bot/modules/mirror_leech.py b/bot/modules/mirror_leech.py index bc1bedb4e..3dff3a616 100644 --- a/bot/modules/mirror_leech.py +++ b/bot/modules/mirror_leech.py @@ -166,7 +166,10 @@ async def new_event(self): self.multi = 0 try: - self.ffmpeg_cmds = eval(args["-ff"]) + if args["-ff"].strip().startswith("["): + self.ffmpeg_cmds = eval(args["-ff"]) + else: + self.ffmpeg_cmds = args["-ff"] except Exception as e: self.ffmpeg_cmds = None LOGGER.error(e) diff --git a/bot/modules/restart.py b/bot/modules/restart.py index 188869e19..9c6428957 100644 --- a/bot/modules/restart.py +++ b/bot/modules/restart.py @@ -121,7 +121,7 @@ async def confirm_restart(_, query): "pkill", "-9", "-f", - "gunicorn|xria|xnox|xtra|xone", + "gunicorn|xria|xnox|xtra|xone|7z", ) proc2 = await create_subprocess_exec("python3", "update.py") await gather(proc1.wait(), proc2.wait()) diff --git a/bot/modules/stats.py b/bot/modules/stats.py index aafae5eab..0b9fd2b47 100644 --- a/bot/modules/stats.py +++ b/bot/modules/stats.py @@ -27,7 +27,7 @@ "python": (["python3", "--version"], r"Python ([\d.]+)"), "rclone": (["xone", "--version"], r"rclone v([\d.]+)"), "yt-dlp": (["yt-dlp", "--version"], r"([\d.]+)"), - "ffmpeg": (["xtra", "-version"], r"ffmpeg version ([\d.]+(-\w+)?).*"), + "ffmpeg": (["xtra", "-version"], r"ffmpeg version (n[\d.]+)"), "7z": (["7z", "i"], r"7-Zip ([\d.]+)"), } diff --git a/bot/modules/users_settings.py b/bot/modules/users_settings.py index 63a2965b1..7506575c2 100644 --- a/bot/modules/users_settings.py +++ b/bot/modules/users_settings.py @@ -233,7 +233,7 @@ async def set_option(_, message, pre_event, option): user_dict["upload_paths"][name] = path value = user_dict["upload_paths"] elif option == "ffmpeg_cmds": - if value.startswith("[") and value.endswith("]"): + if value.startswith("{") and value.endswith("}"): try: value = eval(value) except Exception as e: @@ -547,11 +547,12 @@ async def edit_user_settings(client, query): ) buttons.data_button("Back", f"userset {user_id} back") buttons.data_button("Close", f"userset {user_id} close") - rmsg = """list of lists of ffmpeg commands. You can set multiple ffmpeg commands for all files before upload. Don't write ffmpeg at beginning, start directly with the arguments. + rmsg = """Dict of list values of ffmpeg commands. You can set multiple ffmpeg commands for all files before upload. Don't write ffmpeg at beginning, start directly with the arguments. +Examples: {"subtitle": ["-i mltb.mkv -c copy -c:s srt mltb.mkv", "-i mltb.video -c copy -c:s srt mltb"], "convert": ["-i mltb.m4a -c:a libmp3lame -q:a 2 mltb.mp3", "-i mltb.audio -c:a libmp3lame -q:a 2 mltb.mp3"]} Notes: -1. Add -del to the list which you want from the bot to delete the original files after command run complete! -2. Seed will get disbaled while using this option -Examples: ["-i mltb.mkv -c copy -c:s srt mltb.mkv", "-i mltb.video -c copy -c:s srt mltb", "-i mltb.m4a -c:a libmp3lame -q:a 2 mltb.mp3", "-i mltb.audio -c:a libmp3lame -q:a 2 mltb.mp3"] +- Add `-del` to the list which you want from the bot to delete the original files after command run complete! +- To execute one of those lists in bot for example, you must use -ff subtitle (list key) or -ff convert (list key) +- Seed will get disbaled while using this option Here I will explain how to use mltb.* which is reference to files you want to work on. 1. First cmd: the input is mltb.mkv so this cmd will work only on mkv videos and the output is mltb.mkv also so all outputs is mkv. -del will delete the original media after complete run of the cmd. 2. Second cmd: the input is mltb.video so this cmd will work on all videos and the output is only mltb so the extenstion is same as input files. diff --git a/bot/modules/ytdlp.py b/bot/modules/ytdlp.py index 9004c3f41..63273abd3 100644 --- a/bot/modules/ytdlp.py +++ b/bot/modules/ytdlp.py @@ -343,7 +343,10 @@ async def new_event(self): self.multi = 0 try: - self.ffmpeg_cmds = eval(args["-ff"]) + if args["-ff"].strip().startswith("["): + self.ffmpeg_cmds = eval(args["-ff"]) + else: + self.ffmpeg_cmds = args["-ff"] except Exception as e: self.ffmpeg_cmds = None LOGGER.error(e) diff --git a/config_sample.py b/config_sample.py index 7e0300a86..26039e14e 100644 --- a/config_sample.py +++ b/config_sample.py @@ -21,7 +21,7 @@ YT_DLP_OPTIONS = "" USE_SERVICE_ACCOUNTS = False NAME_SUBSTITUTE = "" -FFMPEG_CMDS = [] +FFMPEG_CMDS = {} # INKYPINKY DELETE_LINKS = False