Skip to content

Commit

Permalink
rollback to pyrogram1
Browse files Browse the repository at this point in the history
  • Loading branch information
BennyThink committed Oct 9, 2023
1 parent 9191b4e commit 82ba992
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 112 deletions.
8 changes: 4 additions & 4 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pyrogram==2.0.106
pyrogram==1.4.16
tgcrypto==1.2.5
yt-dlp==2023.9.24
yt-dlp==2023.10.7
APScheduler==3.10.4
beautifultable==1.1.0
ffmpeg-python==0.2.0
Expand All @@ -11,10 +11,10 @@ flower==2.0.1
psutil==5.9.5
influxdb==5.3.1
beautifulsoup4==4.12.2
fakeredis==2.18.1
fakeredis==2.19.0
supervisor==4.2.5
tgbot-ping==1.0.7
redis==5.0.0
redis==5.0.1
requests==2.31.0
tqdm==4.66.1
requests-toolbelt==1.0.0
Expand Down
2 changes: 0 additions & 2 deletions ytdlbot/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,7 @@ def __init__(self):
self.con = pymysql.connect(
host=MYSQL_HOST, user=MYSQL_USER, passwd=MYSQL_PASS, db="ytdl", charset="utf8mb4"
)
logging.debug("Used real MySQL connection.")
except pymysql.err.OperationalError:
logging.warning("Using fake MySQL connection.")
self.con = FakeMySQL()

self.con.ping(reconnect=True)
Expand Down
11 changes: 2 additions & 9 deletions ytdlbot/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,7 @@
import yt_dlp as ytdl
from tqdm import tqdm

from config import (
AUDIO_FORMAT,
ENABLE_ARIA2,
ENABLE_FFMPEG,
SS_YOUTUBE,
TG_MAX_SIZE,
IPv6,
)
from config import AUDIO_FORMAT, ENABLE_ARIA2, ENABLE_FFMPEG, TG_MAX_SIZE, IPv6, SS_YOUTUBE
from limit import Payment
from utils import adjust_formats, apply_log_formatter, current_time, sizeof_fmt

Expand All @@ -42,7 +35,7 @@


def edit_text(bot_msg, text: str):
key = f"{bot_msg.chat.id}-{bot_msg.id}"
key = f"{bot_msg.chat.id}-{bot_msg.message_id}"
# if the key exists, we shouldn't send edit message
if not r.exists(key):
time.sleep(random.random())
Expand Down
96 changes: 43 additions & 53 deletions ytdlbot/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

__author__ = "Benny <[email protected]>"

import asyncio
import logging
import math
import os
Expand All @@ -16,7 +15,6 @@
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
Expand All @@ -32,7 +30,7 @@
from apscheduler.schedulers.background import BackgroundScheduler
from celery import Celery
from celery.worker.control import Panel
from pyrogram import Client, enums, idle, types
from pyrogram import Client, idle, types
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor

Expand All @@ -45,10 +43,10 @@
ENABLE_QUEUE,
ENABLE_VIP,
OWNER,
RATE_LIMIT,
RCLONE_PATH,
RATE_LIMIT,
WORKERS,
TMPFILE_PATH
TMPFILE_PATH,
)
from constant import BotText
from database import Redis
Expand All @@ -74,31 +72,32 @@
redis = Redis()
channel = Channel()

bot = create_app("ytdl-celery")
session = "ytdl-celery"
celery_client = create_app(session)


def get_messages(chat_id, message_id):
try:
return bot.get_messages(chat_id, message_id)
return celery_client.get_messages(chat_id, message_id)
except ConnectionError as e:
logging.critical("WTH!!! %s", e)
bot.start()
return bot.get_messages(chat_id, message_id)
celery_client.start()
return celery_client.get_messages(chat_id, message_id)


@app.task(rate_limit=f"{RATE_LIMIT}/m")
def ytdl_download_task(chat_id, message_id, url: str):
logging.info("YouTube celery tasks started for %s", url)
bot_msg = get_messages(chat_id, message_id)
ytdl_normal_download(bot, bot_msg, url)
ytdl_normal_download(celery_client, bot_msg, url)
logging.info("YouTube celery tasks ended.")


@app.task()
def audio_task(chat_id, message_id):
logging.info("Audio celery tasks started for %s-%s", chat_id, message_id)
bot_msg = get_messages(chat_id, message_id)
normal_audio(bot, bot_msg)
normal_audio(celery_client, bot_msg)
logging.info("Audio celery tasks ended.")


Expand All @@ -118,7 +117,7 @@ def get_unique_clink(original_url: str, user_id: int):
def direct_download_task(chat_id, message_id, url):
logging.info("Direct download celery tasks started for %s", url)
bot_msg = get_messages(chat_id, message_id)
direct_normal_download(bot, bot_msg, url)
direct_normal_download(celery_client, bot_msg, url)
logging.info("Direct download celery tasks ended.")


Expand Down Expand Up @@ -148,8 +147,8 @@ def ytdl_download_entrance(client: Client, bot_msg: types.Message, url: str, mod
return
mode = mode or payment.get_user_settings(chat_id)[-1]
if ENABLE_CELERY and mode in [None, "Celery"]:
async_task(ytdl_download_task, chat_id, bot_msg.id, url)
# ytdl_download_task.delay(chat_id, bot_msg.id, url)
async_task(ytdl_download_task, chat_id, bot_msg.message_id, url)
# ytdl_download_task.delay(chat_id, bot_msg.message_id, url)
else:
ytdl_normal_download(client, bot_msg, url)
except Exception as e:
Expand All @@ -160,15 +159,15 @@ def ytdl_download_entrance(client: Client, bot_msg: types.Message, url: str, mod
def direct_download_entrance(client: Client, bot_msg: typing.Union[types.Message, typing.Coroutine], url: str):
if ENABLE_CELERY:
direct_normal_download(client, bot_msg, url)
# direct_download_task.delay(bot_msg.chat.id, bot_msg.id, url)
# direct_download_task.delay(bot_msg.chat.id, bot_msg.message_id, url)
else:
direct_normal_download(client, bot_msg, url)


def audio_entrance(client, bot_msg):
if ENABLE_CELERY:
async_task(audio_task, bot_msg.chat.id, bot_msg.id)
# audio_task.delay(bot_msg.chat.id, bot_msg.id)
async_task(audio_task, bot_msg.chat.id, bot_msg.message_id)
# audio_task.delay(bot_msg.chat.id, bot_msg.message_id)
else:
normal_audio(client, bot_msg)

Expand Down Expand Up @@ -207,7 +206,7 @@ def direct_normal_download(client: Client, bot_msg: typing.Union[types.Message,
logging.info("Downloaded file %s", filename)
st_size = os.stat(filepath).st_size

client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_DOCUMENT)
client.send_chat_action(chat_id, "upload_document")
client.send_document(
bot_msg.chat.id,
filepath,
Expand All @@ -226,11 +225,11 @@ def normal_audio(client: Client, bot_msg: typing.Union[types.Message, typing.Cor
)
orig_url: str = re.findall(r"https?://.*", bot_msg.caption)[0]
with tempfile.TemporaryDirectory(prefix="ytdl-", dir=TMPFILE_PATH) as tmp:
client.send_chat_action(chat_id, enums.ChatAction.RECORD_AUDIO)
client.send_chat_action(chat_id, "record_audio")
# just try to download the audio using yt-dlp
filepath = ytdl_download(orig_url, tmp, status_msg, hijack="bestaudio[ext=m4a]")
status_msg.edit_text("Sending audio now...")
client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_AUDIO)
client.send_chat_action(chat_id, "upload_audio")
for f in filepath:
client.send_audio(chat_id, f)
status_msg.edit_text("✅ Conversion complete.")
Expand Down Expand Up @@ -266,18 +265,18 @@ def ytdl_normal_download(client: Client, bot_msg: typing.Union[types.Message, ty

video_paths = ytdl_download(url, temp_dir.name, bot_msg)
logging.info("Download complete.")
client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_DOCUMENT)
client.send_chat_action(chat_id, "upload_document")
bot_msg.edit_text("Download complete. Sending now...")
try:
upload_processor(client, bot_msg, url, video_paths)
except pyrogram.errors.FloodWait as e:
except pyrogram.errors.Flood as e:
logging.critical("FloodWait from Telegram: %s", e)
client.send_message(
chat_id,
f"I'm being rate limited by Telegram. Your video will come after {e.value} seconds. Please wait patiently.",
f"I'm being rate limited by Telegram. Your video will come after {e.x} seconds. Please wait patiently.",
)
flood_owner_message(client, e)
time.sleep(e.value)
time.sleep(e.x)
upload_processor(client, bot_msg, url, video_paths)

bot_msg.edit_text("Download success!✅")
Expand Down Expand Up @@ -409,7 +408,7 @@ def upload_processor(client, bot_msg, url, vp_or_fid: typing.Union[str, list]):
redis.add_send_cache(unique, getattr(obj, "file_id", None))
redis.update_metrics("video_success")
if ARCHIVE_ID and isinstance(vp_or_fid, pathlib.Path):
client.forward_messages(bot_msg.chat.id, ARCHIVE_ID, res_msg.id)
client.forward_messages(bot_msg.chat.id, ARCHIVE_ID, res_msg.message_id)
return res_msg


Expand Down Expand Up @@ -451,8 +450,14 @@ def gen_cap(bm, url, video_path):

def gen_video_markup():
markup = InlineKeyboardMarkup(
[[InlineKeyboardButton("convert to audio", callback_data="convert")]]
) # First row # Generates a callback query when pressed
[
[ # First row
InlineKeyboardButton( # Generates a callback query when pressed
"convert to audio", callback_data="convert"
)
]
]
)
return markup


Expand Down Expand Up @@ -502,43 +507,28 @@ def async_task(task_name, *args):
task_name.apply_async(args=args, queue=destination)


def run_celery():
worker_name = os.getenv("WORKER_NAME", "")
argv = ["-A", "tasks", "worker", "--loglevel=info", "--pool=threads", f"--concurrency={WORKERS}", "-n", worker_name]
if ENABLE_QUEUE:
argv.extend(["-Q", worker_name])
app.worker_main(argv)


def purge_tasks():
count = app.control.purge()
return f"purged {count} tasks."


def run_celery():
# 创建一个新的事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
worker_name = os.getenv("WORKER_NAME", "")
argv = [
"-A",
"tasks",
"worker",
"--loglevel=info",
"--pool=threads",
f"--concurrency={WORKERS}",
"-n",
worker_name,
]
if ENABLE_QUEUE:
argv.extend(["-Q", worker_name])
app.worker_main(argv)
except:
logging.warning("Celery worker failed to start.")
sys.exit(1)


if __name__ == "__main__":
# celery_client.start()
print("Bootstrapping Celery worker now.....")
time.sleep(5)
threading.Thread(target=run_celery, daemon=True).start()

scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
scheduler.add_job(auto_restart, "interval", seconds=120)
scheduler.add_job(auto_restart, "interval", seconds=900)
scheduler.start()

idle()
bot.stop()
celery_client.stop()
20 changes: 1 addition & 19 deletions ytdlbot/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,18 @@
import logging
import os
import pathlib
import re
import shutil
import subprocess
import tempfile
import time
import uuid
from datetime import datetime

import coloredlogs
import ffmpeg
import psutil

from flower_tasks import app

from config import TMPFILE_PATH
from flower_tasks import app

inspect = app.control.inspect()

Expand Down Expand Up @@ -193,21 +190,6 @@ def next_salt_detector(self):
# logging.warning("Potential crash detected by %s, it's time to commit suicide...", self.func_name())
# return True

def fail_connect_detector(self):
# TODO: don't know why sometimes it stops connected to DC
last_line = self.logs.strip().split("\n")[-1]
try:
log_time_str = re.findall(r"\[(.*),", last_line)[0]
log_time = datetime.strptime(log_time_str, "%Y-%m-%d %H:%M:%S")
except Exception:
return

time_difference = (datetime.now() - log_time).total_seconds()

if ("Sending as video" in last_line or "PingTask started" in last_line) and time_difference > 60:
logging.warning("Can't connect to Telegram DC")
return True


def auto_restart():
log_path = "/var/log/ytdl.log"
Expand Down
Loading

0 comments on commit 82ba992

Please sign in to comment.