From d27f2b9916a7c44653fce60472b09cff7f64321e Mon Sep 17 00:00:00 2001 From: 5hojib Date: Sun, 8 Dec 2024 21:09:30 +0600 Subject: [PATCH] update ffmpeg_s.py --- bot/helper/aeon_utils/ffmpeg_s.py | 152 +++++++++++++----------------- 1 file changed, 65 insertions(+), 87 deletions(-) diff --git a/bot/helper/aeon_utils/ffmpeg_s.py b/bot/helper/aeon_utils/ffmpeg_s.py index a7ee67536..f42e6056f 100644 --- a/bot/helper/aeon_utils/ffmpeg_s.py +++ b/bot/helper/aeon_utils/ffmpeg_s.py @@ -7,7 +7,9 @@ from os import cpu_count from os import path as ospath from time import time +from typing import List, Optional, Tuple, Union +import aiofiles.os import aioshutil from aiofiles.os import makedirs @@ -16,65 +18,50 @@ LOGGER = logging.getLogger(__name__) - class MediaInfoError(Exception): """Custom exception for media info extraction errors.""" + pass - -async def extract_media_info(path: str) -> tuple[int, str | None, str | None]: +async def extract_media_info(path: str) -> Tuple[int, Optional[str], Optional[str]]: """ Extract media information using ffprobe. - + Args: path (str): Path to the media file. - + Returns: Tuple containing duration, artist, and title. - + Raises: MediaInfoError: If media info cannot be extracted. """ try: - result = await cmd_exec( - [ - "ffprobe", - "-hide_banner", - "-loglevel", - "error", - "-print_format", - "json", - "-show_format", - path, - ] - ) - + result = await cmd_exec([ + "ffprobe", + "-hide_banner", + "-loglevel", "error", + "-print_format", "json", + "-show_format", + path + ]) + if result[1]: LOGGER.warning(f"Get Media Info: {result[1]}") - + media_data = literal_eval(result[0]) media_format = media_data.get("format") - + if media_format is None: raise MediaInfoError(f"No format information found: {result}") - + duration = round(float(media_format.get("duration", 0))) tags = media_format.get("tags", {}) - - artist = next( - ( - tags.get(key) - for key in ["artist", "ARTIST", "Artist"] - if tags.get(key) - ), - None, - ) - title = next( - (tags.get(key) for key in ["title", "TITLE", "Title"] if tags.get(key)), - None, - ) - + + artist = next((tags.get(key) for key in ["artist", "ARTIST", "Artist"] if tags.get(key)), None) + title = next((tags.get(key) for key in ["title", "TITLE", "Title"] if tags.get(key)), None) + return duration, artist, title - + except Exception as e: LOGGER.error(f"Get Media Info Error: {e}") raise MediaInfoError(f"Failed to extract media info: {e}") @@ -205,13 +192,12 @@ class SampleVideoCreator: """ Create sample videos with configurable parameters. """ - def __init__( - self, - listener, - duration: int, - part_duration: int, - gid: str, + self, + listener, + duration: int, + part_duration: int, + gid: str ): self.listener = listener self.path = "" @@ -224,48 +210,41 @@ def __init__( self._progress_tracker = ProgressTracker() async def create_sample( - self, - video_file: str, - on_file: bool = False, - ) -> str | bool: + self, + video_file: str, + on_file: bool = False + ) -> Union[str, bool]: """ Create a sample video from the input file. - + Args: video_file (str): Path to the source video on_file (bool, optional): Whether to process in-place. Defaults to False. - + Returns: Union[str, bool]: New directory path or success status """ self.path = video_file dir_path, name = video_file.rsplit("/", 1) self.outfile = ospath.join(dir_path, f"SAMPLE.{name}") - + # Compute video segments - segments = self._compute_segments(video_file) - + segments = await self._compute_segments(video_file) + # Build complex filter for video processing filter_complex = self._build_filter_complex(segments) - + cmd = [ - "ffmpeg", - "-hide_banner", - "-i", - video_file, - "-filter_complex", - filter_complex, - "-map", - "[vout]", - "-map", - "[aout]", - "-c:v", - "libx264", - "-c:a", - "aac", - "-threads", - str(cpu_count() // 2), - self.outfile, + "xtra", + "-hide_banner", + "-i", video_file, + "-filter_complex", filter_complex, + "-map", "[vout]", + "-map", "[aout]", + "-c:v", "libx264", + "-c:a", "aac", + "-threads", str(cpu_count()//2), + self.outfile ] # Early cancellation check @@ -275,47 +254,46 @@ async def create_sample( # Prepare metadata self.name = ospath.basename(video_file) self.size = await get_path_size(video_file) - + # Execute processing - self.listener.subproc = await asyncio.create_subprocess_exec( - *cmd, stderr=asyncio.subprocess.PIPE - ) - + self.listener.subproc = await asyncio.create_subprocess_exec(*cmd, stderr=asyncio.subprocess.PIPE) + _, code = await asyncio.gather( self._progress_tracker.track_progress( - self.listener.subproc.stderr, - self.path, - self.listener.subproc, - time(), + self.listener.subproc.stderr, + self.path, + self.listener.subproc, + time() ), - self.listener.subproc.wait(), + self.listener.subproc.wait() ) # Process results return await self._handle_result(code, video_file, name, on_file) - def _compute_segments(self, video_file: str) -> list[tuple[float, float]]: + async def _compute_segments(self, video_file: str) -> List[Tuple[float, float]]: """ Compute video segments for sampling. - + Args: video_file (str): Path to the source video - + Returns: List of segment tuples (start_time, end_time) """ - duration = extract_media_info(video_file)[0] + # Correctly await and extract duration + duration, _, _ = await extract_media_info(video_file) segments = [(0, self._part_duration)] - + remaining_duration = duration - (self._part_duration * 2) parts = (self._duration - (self._part_duration * 2)) // self._part_duration time_interval = remaining_duration // parts - + next_segment = time_interval for _ in range(parts): segments.append((next_segment, next_segment + self._part_duration)) next_segment += time_interval - + segments.append((duration - self._part_duration, duration)) return segments