Skip to content

Commit

Permalink
new sample video func for status, Async pymongo instead motor, better…
Browse files Browse the repository at this point in the history
… log display
  • Loading branch information
5hojib committed Dec 8, 2024
1 parent 7d700d7 commit 2ac3095
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 104 deletions.
2 changes: 1 addition & 1 deletion bot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ def format(self, record: LogRecord) -> str:
else:
index_urls.append("")

PORT = environ.get("PORT")
PORT = environ.get("BASE_URL_PORT") or environ.get("PORT")
Popen(
f"gunicorn web.wserver:app --bind 0.0.0.0:{PORT} --worker-class gevent",
shell=True,
Expand Down
2 changes: 1 addition & 1 deletion bot/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ async def aeon_callback(_, query):

def parseline(line):
try:
return "[" + line.split("] [", 1)[1]
return line.split("] ", 1)[1]
except IndexError:
return line

Expand Down
153 changes: 153 additions & 0 deletions bot/helper/aeon_utils/ffmpeg_s.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
from aiofiles.os import path as aiopath, makedirs
from aioshutil import move
from ast import literal_eval
from asyncio import create_subprocess_exec, gather, sleep, wait_for
from asyncio.subprocess import PIPE
from os import path as ospath, cpu_count
from PIL import Image
from pyrogram.types import Message
from re import search as re_search, findall as re_findall, split as re_split
from time import time

from bot import config_dict, subprocess_lock, LOGGER, DEFAULT_SPLIT_SIZE, FFMPEG_NAME
from bot.helper.ext_utils.bot_utils import cmd_exec, sync_to_async, is_premium_user
from bot.helper.ext_utils.files_utils import ARCH_EXT, get_mime_type, get_path_size, clean_target
from bot.helper.ext_utils.links_utils import get_url_name
from bot.helper.ext_utils.status_utils import get_readable_file_size
from bot.helper.ext_utils.telegraph_helper import TelePost


async def get_media_info(path):
try:
result = await cmd_exec(['ffprobe', '-hide_banner', '-loglevel', 'error', '-print_format', 'json', '-show_format', path])
if res := result[1]:
LOGGER.warning('Get Media Info: %s', res)
except Exception as e:
LOGGER.error('Get Media Info: %s. Mostly File not found!', e)
return 0, None, None
fields = literal_eval(result[0]).get('format')
if fields is None:
LOGGER.error('Get_media_info: %s', result)
return 0, None, None
duration = round(float(fields.get('duration', 0)))
tags = fields.get('tags', {})
artist = tags.get('artist') or tags.get('ARTIST') or tags.get('Artist')
title = tags.get('title') or tags.get('TITLE') or tags.get('Title')
return duration, artist, title

class FFProgress:
def __init__(self):
self.is_cancel = False
self._duration = 0
self._start_time = time()
self._eta = 0
self._percentage = '0%'
self._processed_bytes = 0

@property
def processed_bytes(self):
return self._processed_bytes

@property
def percentage(self):
return self._percentage

@property
def eta(self):
return self._eta

@property
def speed(self):
return self._processed_bytes / (time() - self._start_time)

async def readlines(self, stream):
data = bytearray()
while not stream.at_eof():
lines = re.split(br'[\r\n]+', data)
data[:] = lines.pop(-1)
for line in lines:
yield line
data.extend(await stream.read(1024))

async def progress(self, status: str=''):
start_time = time()
async for line in self.readlines(self.listener.suproc.stderr):
if self.is_cancel or self.listener.suproc == 'cancelled' or self.listener.suproc.returncode is not None:
return
if status == 'direct':
self._processed_bytes = await get_path_size(self.outfile)
await sleep(0.5)
continue
if progress := dict(re.findall(r'(frame|fps|size|time|bitrate|speed)\s*\=\s*(\S+)', line.decode('utf-8'))):
if not self._duration:
self._duration = (await get_media_info(self.path))[0]
hh, mm, sms = progress['time'].split(':')
time_to_second = (int(hh) * 3600) + (int(mm) * 60) + float(sms)
self._processed_bytes = int(progress['size'].rstrip('kB')) * 1024
self._percentage = f'{round((time_to_second / self._duration) * 100, 2)}%'
try:
self._eta = (self._duration / float(progress['speed'].strip('x'))) - ((time() - start_time))
except:
pass


class SampleVideo(FFProgress):
def __init__(self, listener, duration, part_duration, gid):
self.listener = listener
self.path = ''
self.name = ''
self.outfile = ''
self.size = 0
self._duration = duration
self._part_duration = part_duration
self._gid = gid
self._start_time = time()
super().__init__()

async def create(self, video_file: str, on_file: bool=False):
filter_complex = ''
self.path = video_file
dir, name = video_file.rsplit('/', 1)
self.outfile = ospath.join(dir, f'SAMPLE.{name}')
segments = [(0, self._part_duration)]
duration = (await get_media_info(video_file))[0]
remaining_duration = duration - (self._part_duration * 2)
parts = (self._duration - (self._part_duration * 2)) // self._part_duration
time_interval = remaining_duration // parts
next_segment = time_interval
for _ in range(parts):
segments.append((next_segment, next_segment + self._part_duration))
next_segment += time_interval
segments.append((duration - self._part_duration, duration))

for i, (start, end) in enumerate(segments):
filter_complex += f"[0:v]trim=start={start}:end={end},setpts=PTS-STARTPTS[v{i}]; "
filter_complex += f"[0:a]atrim=start={start}:end={end},asetpts=PTS-STARTPTS[a{i}]; "

for i in range(len(segments)):
filter_complex += f"[v{i}][a{i}]"

filter_complex += f"concat=n={len(segments)}:v=1:a=1[vout][aout]"

cmd = ['xtra', '-hide_banner', '-i', video_file, '-filter_complex', filter_complex, '-map', '[vout]',
'-map', '[aout]', '-c:v', 'libx264', '-c:a', 'aac', '-threads', f'{cpu_count()//2}', self.outfile]

if self.listener.suproc == 'cancelled':
return False

self.name, self.size = ospath.basename(video_file), await get_path_size(video_file)
self.listener.suproc = await create_subprocess_exec(*cmd, stderr=PIPE)
_, code = await gather(self.progress(), self.listener.suproc.wait())

if code == -9:
return False
if code == 0:
if on_file:
newDir, _ = ospath.splitext(video_file)
await makedirs(newDir, exist_ok=True)
await gather(move(video_file, ospath.join(newDir, name)), move(self.outfile, ospath.join(newDir, f'SAMPLE.{name}')))
return newDir
return True

LOGGER.error('%s. Something went wrong while creating sample video, mostly file is corrupted. Path: %s', (await self.listener.suproc.stderr.read()).decode().strip(), video_file)
return video_file
41 changes: 41 additions & 0 deletions bot/helper/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
user_data,
)

from .aeon_utils.ffmpeg_s import SampleVideo
from .ext_utils.bot_utils import get_size_bytes, new_task, sync_to_async
from .ext_utils.bulk_links import extract_bulk_links
from .ext_utils.exceptions import NotSupportedExtractionArchive
Expand Down Expand Up @@ -838,6 +839,46 @@ async def proceed_split(self, up_dir, m_size, o_files, gid):
m_size.append(f_size)
o_files.append(f_path)

async def generate_sample_video_x(self, dl_path, gid, unwanted_files, ft_delete):
data = self.sample_video.split(':') if isinstance(self.sampleVideo, str) else ''
if data:
sample_duration = int(data[0]) if data[0] else 60
part_duration = int(data[1]) if len(data) > 1 else 4
else:
sample_duration, part_duration = 60, 4

samvid = SampleVideo(self, sample_duration, part_duration, gid)

async with cpu_eater_lock:
checked = False
if await aiopath.isfile(dl_path):
if (await get_document_type(dl_path))[0]:
if not checked:
checked = True
LOGGER.info('Creating Sample video: %s', self.name)
async with task_dict_lock:
task_dict[self.mid] = FFmpegStatus(self, gid, 'Sample Video', samvid)
return await samvid.create(dl_path, True)
else:
for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
for file_ in natsorted(files):
f_path = ospath.join(dirpath, file_)
if f_path in unwanted_files:
continue
if (await get_document_type(f_path))[0]:
if not checked:
checked = True
LOGGER.info('Creating Sample videos: %s', self.name)
async with task_dict_lock:
task_dict[self.mid] = FFMpegStatus(self, gid, 'Sample Video', samvid)
res = await samvid.create(f_path)
if res:
ft_delete.append(res)
if not res:
return res
return dl_path


async def generate_sample_video(self, dl_path, gid, unwanted_files, ft_delete):
data = (
self.sample_video.split(":")
Expand Down
7 changes: 4 additions & 3 deletions bot/helper/ext_utils/db_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from aiofiles.os import makedirs
from aiofiles.os import path as aiopath
from dotenv import dotenv_values
from motor.motor_asyncio import AsyncIOMotorClient
# from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import AsyncMongoClient
from pymongo.errors import PyMongoError
from pymongo.server_api import ServerApi

Expand All @@ -27,7 +28,7 @@ async def connect(self):
if config_dict["DATABASE_URL"]:
if self._conn is not None:
await self._conn.close()
self._conn = AsyncIOMotorClient(
self._conn = AsyncMongoClient(
config_dict["DATABASE_URL"],
server_api=ServerApi("1"),
)
Expand All @@ -47,7 +48,7 @@ async def disconnect(self):

async def db_load(self):
if self._db is None:
await self.connect()
await self.aconnect()
if self._return:
return
# Save bot settings
Expand Down
94 changes: 0 additions & 94 deletions bot/helper/ext_utils/media_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,100 +658,6 @@ async def create_sample_video(listener, video_file, sample_duration, part_durati
await remove(output_file)
return False

"""finished_segments = []
await makedirs(f"{dir}/segments/", exist_ok=True)
ext = name.rsplit(".", 1)[-1]
for index, (start_time, end_time) in enumerate(segments, start=1):
output_seg = f"{dir}/segments/segment{index}.{ext}"
cmd = [
"xtra",
"-hide_banner",
"-loglevel",
"error",
"-i",
video_file,
"-ss",
f"{start_time}",
"-to",
f"{end_time}",
"-c",
"copy",
output_seg,
]
if listener.is_cancelled:
return False
listener.subproc = await create_subprocess_exec(*cmd, stderr=PIPE)
_, stderr = await listener.subproc.communicate()
if listener.is_cancelled:
return False
code = listener.subproc.returncode
if code == -9:
listener.is_cancelled = True
return False
elif code != 0:
try:
stderr = stderr.decode().strip()
except Exception:
stderr = "Unable to decode the error!"
LOGGER.error(
f"{stderr}. Something went wrong while splitting file for sample video, mostly file is corrupted. Path: {video_file}"
)
if await aiopath.exists(output_file):
await remove(output_file)
return False
else:
finished_segments.append(f"file '{output_seg}'")
segments_file = f"{dir}/segments.txt"
async with aiopen(segments_file, "w+") as f:
await f.write("\n".join(finished_segments))
cmd = [
"xtra",
"-hide_banner",
"-loglevel",
"error",
"-f",
"concat",
"-safe",
"0",
"-i",
segments_file,
"-c:v",
"libx264",
"-c:a",
"aac",
"-threads",
f"{cpu_count() // 2}",
output_file,
]
if listener.is_cancelled:
return False
listener.subproc = await create_subprocess_exec(*cmd, stderr=PIPE)
_, stderr = await listener.subproc.communicate()
if listener.is_cancelled:
return False
code = listener.subproc.returncode
if code == -9:
listener.is_cancelled = True
return False
elif code != 0:
try:
stderr = stderr.decode().strip()
except Exception:
stderr = "Unable to decode the error!"
LOGGER.error(
f"{stderr}. Something went wrong while creating sample video, mostly file is corrupted. Path: {video_file}"
)
if await aiopath.exists(output_file):
await remove(output_file)
await gather(remove(segments_file), rmtree(f"{dir}/segments"))
return False
await gather(remove(segments_file), rmtree(f"{dir}/segments"))
return output_file"""
return None


async def run_ffmpeg_cmd(listener, ffmpeg, path):
base_name, ext = ospath.splitext(path)
Expand Down
1 change: 0 additions & 1 deletion bot/helper/ext_utils/status_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ async def get_readable_message(sid, is_user, page_no=1, status="All", page_step=
if tstatus not in [
MirrorStatus.STATUS_SPLIT,
MirrorStatus.STATUS_SEED,
MirrorStatus.STATUS_SAMVID,
MirrorStatus.STATUS_CONVERT,
MirrorStatus.STATUS_FFMPEG,
MirrorStatus.STATUS_QUEUEUP,
Expand Down
2 changes: 1 addition & 1 deletion bot/helper/listeners/task_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ async def on_download_complete(self):
self.size = await get_path_size(up_dir)

if self.sample_video:
up_path = await self.generate_sample_video(
up_path = await self.generate_sample_video_x(
up_path,
gid,
unwanted_files,
Expand Down
Loading

0 comments on commit 2ac3095

Please sign in to comment.