Skip to content

Commit

Permalink
update ffmpeg_s.py
Browse files Browse the repository at this point in the history
  • Loading branch information
5hojib committed Dec 8, 2024
1 parent 19be259 commit d27f2b9
Showing 1 changed file with 65 additions and 87 deletions.
152 changes: 65 additions & 87 deletions bot/helper/aeon_utils/ffmpeg_s.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from os import cpu_count
from os import path as ospath
from time import time
from typing import List, Optional, Tuple, Union

import aiofiles.os
import aioshutil
from aiofiles.os import makedirs

Expand All @@ -16,65 +18,50 @@

LOGGER = logging.getLogger(__name__)


class MediaInfoError(Exception):
"""Custom exception for media info extraction errors."""
pass


async def extract_media_info(path: str) -> tuple[int, str | None, str | None]:
async def extract_media_info(path: str) -> Tuple[int, Optional[str], Optional[str]]:
"""
Extract media information using ffprobe.
Args:
path (str): Path to the media file.
Returns:
Tuple containing duration, artist, and title.
Raises:
MediaInfoError: If media info cannot be extracted.
"""
try:
result = await cmd_exec(
[
"ffprobe",
"-hide_banner",
"-loglevel",
"error",
"-print_format",
"json",
"-show_format",
path,
]
)

result = await cmd_exec([
"ffprobe",
"-hide_banner",
"-loglevel", "error",
"-print_format", "json",
"-show_format",
path
])

if result[1]:
LOGGER.warning(f"Get Media Info: {result[1]}")

media_data = literal_eval(result[0])
media_format = media_data.get("format")

if media_format is None:
raise MediaInfoError(f"No format information found: {result}")

duration = round(float(media_format.get("duration", 0)))
tags = media_format.get("tags", {})

artist = next(
(
tags.get(key)
for key in ["artist", "ARTIST", "Artist"]
if tags.get(key)
),
None,
)
title = next(
(tags.get(key) for key in ["title", "TITLE", "Title"] if tags.get(key)),
None,
)


artist = next((tags.get(key) for key in ["artist", "ARTIST", "Artist"] if tags.get(key)), None)
title = next((tags.get(key) for key in ["title", "TITLE", "Title"] if tags.get(key)), None)

return duration, artist, title

except Exception as e:
LOGGER.error(f"Get Media Info Error: {e}")
raise MediaInfoError(f"Failed to extract media info: {e}")
Expand Down Expand Up @@ -205,13 +192,12 @@ class SampleVideoCreator:
"""
Create sample videos with configurable parameters.
"""

def __init__(
self,
listener,
duration: int,
part_duration: int,
gid: str,
self,
listener,
duration: int,
part_duration: int,
gid: str
):
self.listener = listener
self.path = ""
Expand All @@ -224,48 +210,41 @@ def __init__(
self._progress_tracker = ProgressTracker()

async def create_sample(
self,
video_file: str,
on_file: bool = False,
) -> str | bool:
self,
video_file: str,
on_file: bool = False
) -> Union[str, bool]:
"""
Create a sample video from the input file.
Args:
video_file (str): Path to the source video
on_file (bool, optional): Whether to process in-place. Defaults to False.
Returns:
Union[str, bool]: New directory path or success status
"""
self.path = video_file
dir_path, name = video_file.rsplit("/", 1)
self.outfile = ospath.join(dir_path, f"SAMPLE.{name}")

# Compute video segments
segments = self._compute_segments(video_file)

segments = await self._compute_segments(video_file)
# Build complex filter for video processing
filter_complex = self._build_filter_complex(segments)

cmd = [
"ffmpeg",
"-hide_banner",
"-i",
video_file,
"-filter_complex",
filter_complex,
"-map",
"[vout]",
"-map",
"[aout]",
"-c:v",
"libx264",
"-c:a",
"aac",
"-threads",
str(cpu_count() // 2),
self.outfile,
"xtra",
"-hide_banner",
"-i", video_file,
"-filter_complex", filter_complex,
"-map", "[vout]",
"-map", "[aout]",
"-c:v", "libx264",
"-c:a", "aac",
"-threads", str(cpu_count()//2),
self.outfile
]

# Early cancellation check
Expand All @@ -275,47 +254,46 @@ async def create_sample(
# Prepare metadata
self.name = ospath.basename(video_file)
self.size = await get_path_size(video_file)

# Execute processing
self.listener.subproc = await asyncio.create_subprocess_exec(
*cmd, stderr=asyncio.subprocess.PIPE
)

self.listener.subproc = await asyncio.create_subprocess_exec(*cmd, stderr=asyncio.subprocess.PIPE)

_, code = await asyncio.gather(
self._progress_tracker.track_progress(
self.listener.subproc.stderr,
self.path,
self.listener.subproc,
time(),
self.listener.subproc.stderr,
self.path,
self.listener.subproc,
time()
),
self.listener.subproc.wait(),
self.listener.subproc.wait()
)

# Process results
return await self._handle_result(code, video_file, name, on_file)

def _compute_segments(self, video_file: str) -> list[tuple[float, float]]:
async def _compute_segments(self, video_file: str) -> List[Tuple[float, float]]:
"""
Compute video segments for sampling.
Args:
video_file (str): Path to the source video
Returns:
List of segment tuples (start_time, end_time)
"""
duration = extract_media_info(video_file)[0]
# Correctly await and extract duration
duration, _, _ = await extract_media_info(video_file)
segments = [(0, self._part_duration)]

remaining_duration = duration - (self._part_duration * 2)
parts = (self._duration - (self._part_duration * 2)) // self._part_duration
time_interval = remaining_duration // parts

next_segment = time_interval
for _ in range(parts):
segments.append((next_segment, next_segment + self._part_duration))
next_segment += time_interval

segments.append((duration - self._part_duration, duration))
return segments

Expand Down

0 comments on commit d27f2b9

Please sign in to comment.