diff --git a/bot/helper/aeon_utils/caption_gen.py b/bot/helper/aeon_utils/caption_gen.py index 0dc6987e4..3ff3bb057 100644 --- a/bot/helper/aeon_utils/caption_gen.py +++ b/bot/helper/aeon_utils/caption_gen.py @@ -19,53 +19,53 @@ def __missing__(self, key): return "Unknown" -async def generate_caption(file, dirpath, lcaption): - up_path = os.path.join(dirpath, file) +async def generate_caption(filename, directory, caption_template): + file_path = os.path.join(directory, filename) try: - result = await cmd_exec(["mediainfo", "--Output=JSON", up_path]) + result = await cmd_exec(["mediainfo", "--Output=JSON", file_path]) if result[1]: - LOGGER.info(f"Get Media Info: {result[1]}") + LOGGER.info(f"MediaInfo command output: {result[1]}") - mediainfo_result = json.loads(result[0]) # Parse JSON output - except Exception as e: - LOGGER.error(f"Media Info: {e}. Mostly File not found!") - return file + mediainfo_data = json.loads(result[0]) # Parse JSON output + except Exception as error: + LOGGER.error(f"Failed to retrieve media info: {error}. File may not exist!") + return filename - media = mediainfo_result.get("media", {}) - track = media.get("track", []) - video_info = next((t for t in track if t["@type"] == "Video"), {}) - audio_info = [t for t in track if t["@type"] == "Audio"] - subtitle_info = [t for t in track if t["@type"] == "Text"] + media_data = mediainfo_data.get("media", {}) + track_data = media_data.get("track", []) + video_metadata = next((track for track in track_data if track["@type"] == "Video"), {}) + audio_metadata = [track for track in track_data if track["@type"] == "Audio"] + subtitle_metadata = [track for track in track_data if track["@type"] == "Text"] - duration = round(float(video_info.get("Duration", 0)) / 1000) - qual = get_video_quality(video_info.get("Height")) + video_duration = video_metadata.get("Duration", 0) + video_quality = get_video_quality(video_metadata.get("Height")) - lang = ", ".join( - update_language("", audio) for audio in audio_info if audio.get("Language") + audio_languages = ", ".join( + parse_audio_language("", audio) for audio in audio_metadata if audio.get("Language") ) - stitles = ", ".join( - update_subtitles("", subtitle) - for subtitle in subtitle_info + subtitle_languages = ", ".join( + parse_subtitle_language("", subtitle) + for subtitle in subtitle_metadata if subtitle.get("Language") ) - lang = lang if lang else "Unknown" - stitles = stitles if stitles else "Unknown" - qual = qual if qual else "Unknown" - md5_hex = calculate_md5(up_path) - - caption_dict = DefaultDict( - filename=file, - size=get_readable_file_size(await aiopath.getsize(up_path)), - duration=get_readable_time(duration, True), - quality=qual, - audios=lang, - subtitles=stitles, - md5_hash=md5_hex, + audio_languages = audio_languages if audio_languages else "Unknown" + subtitle_languages = subtitle_languages if subtitle_languages else "Unknown" + video_quality = video_quality if video_quality else "Unknown" + file_md5_hash = calculate_md5(file_path) + + caption_data = DefaultDict( + filename=filename, + size=get_readable_file_size(await aiopath.getsize(file_path)), + duration=get_readable_time(video_duration, include_seconds=True), + quality=video_quality, + audios=audio_languages, + subtitles=subtitle_languages, + md5_hash=file_md5_hash, ) - return lcaption.format_map(caption_dict) + return caption_template.format_map(caption_data) def get_video_quality(height): @@ -78,35 +78,37 @@ def get_video_quality(height): 4320: "4320p", 8640: "8640p", } - for h, q in sorted(quality_map.items()): - if height and int(height) <= h: - return q + for threshold, quality in sorted(quality_map.items()): + if height and int(height) <= threshold: + return quality return "Unknown" -def update_language(lang, stream): - language_code = stream.get("Language") +def parse_audio_language(existing_languages, audio_stream): + language_code = audio_stream.get("Language") if language_code: with suppress(Exception): language_name = Language.get(language_code).display_name() - if language_name not in lang: - lang += f"{language_name}, " - return lang + if language_name not in existing_languages: + LOGGER.debug(f"Parsed audio language: {language_name}") + existing_languages += f"{language_name}, " + return existing_languages.strip(", ") -def update_subtitles(stitles, stream): - subtitle_code = stream.get("Language") +def parse_subtitle_language(existing_subtitles, subtitle_stream): + subtitle_code = subtitle_stream.get("Language") if subtitle_code: with suppress(Exception): subtitle_name = Language.get(subtitle_code).display_name() - if subtitle_name not in stitles: - stitles += f"{subtitle_name}, " - return stitles - - -def calculate_md5(filepath): - hash_md5 = md5() - with open(filepath, "rb") as f: - for byte_block in iter(lambda: f.read(4096), b""): - hash_md5.update(byte_block) - return hash_md5.hexdigest() + if subtitle_name not in existing_subtitles: + LOGGER.debug(f"Parsed subtitle language: {subtitle_name}") + existing_subtitles += f"{subtitle_name}, " + return existing_subtitles.strip(", ") + + +def calculate_md5(file_path): + md5_hash = md5() + with open(file_path, "rb") as file: + for chunk in iter(lambda: file.read(4096), b""): + md5_hash.update(chunk) + return md5_hash.hexdigest() \ No newline at end of file