Skip to content

Commit

Permalink
update caption_gen.py
Browse files Browse the repository at this point in the history
  • Loading branch information
5hojib committed Jan 7, 2025
1 parent 18553b5 commit ffb9731
Showing 1 changed file with 57 additions and 55 deletions.
112 changes: 57 additions & 55 deletions bot/helper/aeon_utils/caption_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,53 +19,53 @@ def __missing__(self, key):
return "Unknown"


async def generate_caption(file, dirpath, lcaption):
up_path = os.path.join(dirpath, file)
async def generate_caption(filename, directory, caption_template):
file_path = os.path.join(directory, filename)

try:
result = await cmd_exec(["mediainfo", "--Output=JSON", up_path])
result = await cmd_exec(["mediainfo", "--Output=JSON", file_path])
if result[1]:
LOGGER.info(f"Get Media Info: {result[1]}")
LOGGER.info(f"MediaInfo command output: {result[1]}")

mediainfo_result = json.loads(result[0]) # Parse JSON output
except Exception as e:
LOGGER.error(f"Media Info: {e}. Mostly File not found!")
return file
mediainfo_data = json.loads(result[0]) # Parse JSON output
except Exception as error:
LOGGER.error(f"Failed to retrieve media info: {error}. File may not exist!")
return filename

media = mediainfo_result.get("media", {})
track = media.get("track", [])
video_info = next((t for t in track if t["@type"] == "Video"), {})
audio_info = [t for t in track if t["@type"] == "Audio"]
subtitle_info = [t for t in track if t["@type"] == "Text"]
media_data = mediainfo_data.get("media", {})
track_data = media_data.get("track", [])
video_metadata = next((track for track in track_data if track["@type"] == "Video"), {})
audio_metadata = [track for track in track_data if track["@type"] == "Audio"]
subtitle_metadata = [track for track in track_data if track["@type"] == "Text"]

duration = round(float(video_info.get("Duration", 0)) / 1000)
qual = get_video_quality(video_info.get("Height"))
video_duration = video_metadata.get("Duration", 0)
video_quality = get_video_quality(video_metadata.get("Height"))

lang = ", ".join(
update_language("", audio) for audio in audio_info if audio.get("Language")
audio_languages = ", ".join(
parse_audio_language("", audio) for audio in audio_metadata if audio.get("Language")
)
stitles = ", ".join(
update_subtitles("", subtitle)
for subtitle in subtitle_info
subtitle_languages = ", ".join(
parse_subtitle_language("", subtitle)
for subtitle in subtitle_metadata
if subtitle.get("Language")
)

lang = lang if lang else "Unknown"
stitles = stitles if stitles else "Unknown"
qual = qual if qual else "Unknown"
md5_hex = calculate_md5(up_path)

caption_dict = DefaultDict(
filename=file,
size=get_readable_file_size(await aiopath.getsize(up_path)),
duration=get_readable_time(duration, True),
quality=qual,
audios=lang,
subtitles=stitles,
md5_hash=md5_hex,
audio_languages = audio_languages if audio_languages else "Unknown"
subtitle_languages = subtitle_languages if subtitle_languages else "Unknown"
video_quality = video_quality if video_quality else "Unknown"
file_md5_hash = calculate_md5(file_path)

caption_data = DefaultDict(
filename=filename,
size=get_readable_file_size(await aiopath.getsize(file_path)),
duration=get_readable_time(video_duration, include_seconds=True),
quality=video_quality,
audios=audio_languages,
subtitles=subtitle_languages,
md5_hash=file_md5_hash,
)

return lcaption.format_map(caption_dict)
return caption_template.format_map(caption_data)


def get_video_quality(height):
Expand All @@ -78,35 +78,37 @@ def get_video_quality(height):
4320: "4320p",
8640: "8640p",
}
for h, q in sorted(quality_map.items()):
if height and int(height) <= h:
return q
for threshold, quality in sorted(quality_map.items()):
if height and int(height) <= threshold:
return quality
return "Unknown"


def update_language(lang, stream):
language_code = stream.get("Language")
def parse_audio_language(existing_languages, audio_stream):
language_code = audio_stream.get("Language")
if language_code:
with suppress(Exception):
language_name = Language.get(language_code).display_name()
if language_name not in lang:
lang += f"{language_name}, "
return lang
if language_name not in existing_languages:
LOGGER.debug(f"Parsed audio language: {language_name}")
existing_languages += f"{language_name}, "
return existing_languages.strip(", ")


def update_subtitles(stitles, stream):
subtitle_code = stream.get("Language")
def parse_subtitle_language(existing_subtitles, subtitle_stream):
subtitle_code = subtitle_stream.get("Language")
if subtitle_code:
with suppress(Exception):
subtitle_name = Language.get(subtitle_code).display_name()
if subtitle_name not in stitles:
stitles += f"{subtitle_name}, "
return stitles


def calculate_md5(filepath):
hash_md5 = md5()
with open(filepath, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
hash_md5.update(byte_block)
return hash_md5.hexdigest()
if subtitle_name not in existing_subtitles:
LOGGER.debug(f"Parsed subtitle language: {subtitle_name}")
existing_subtitles += f"{subtitle_name}, "
return existing_subtitles.strip(", ")


def calculate_md5(file_path):
md5_hash = md5()
with open(file_path, "rb") as file:
for chunk in iter(lambda: file.read(4096), b""):
md5_hash.update(chunk)
return md5_hash.hexdigest()

0 comments on commit ffb9731

Please sign in to comment.