Skip to content

Commit

Permalink
fix: better connection of rabbitmq
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-zahedi committed May 15, 2024
1 parent cc84c79 commit eb8628d
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 27 deletions.
38 changes: 28 additions & 10 deletions django_telethon/management/commands/runtelegram.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,21 @@
import asyncio
import logging
import os
import signal

from django.core.management.base import BaseCommand

from django_telethon.default_settings import RABBITMQ_ACTIVE
from django_telethon.rabbitmq import connect_rabbitmq
from django_telethon.rabbitmq import consume_rabbitmq
from django_telethon.utils import connect_clients, re_connect_clients


async def _entry_point():
async def _run_forever():
try:
await connect_clients()
except Exception as e:
logging.exception(e, exc_info=True)

while True:
if RABBITMQ_ACTIVE:
try:
await connect_rabbitmq()
except Exception as e:
logging.exception(e, exc_info=True)

await asyncio.sleep(30)
try:
await re_connect_clients()
Expand All @@ -31,11 +25,35 @@ async def _entry_point():
logging.exception(e, exc_info=True)


async def _main():
loop = asyncio.get_running_loop()

stop = loop.create_future()

for s in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(s, stop.set_result, None)

task_telegram = asyncio.create_task(_run_forever())
task_rabbitmq = asyncio.create_task(consume_rabbitmq())

await stop

task_telegram.cancel()
task_rabbitmq.cancel()
try:
await task_telegram
await task_rabbitmq
except asyncio.CancelledError:
logging.info("Tasks was cancelled")

logging.info("Shutting down gracefully")


class Command(BaseCommand):
help = 'Run telegram'

def handle(self, *args, **options):
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"
asyncio.run(_entry_point())
asyncio.run(_main())
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "false"
self.stdout.write(self.style.SUCCESS('Successfully finished run telegram client'))
30 changes: 13 additions & 17 deletions django_telethon/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,31 @@
from django_telethon.default_settings import (
QUEUE_CALLBACK_FN,
QUEUE_CHANNEL_NAME,
RABBITMQ_URL,
RABBITMQ_URL, RABBITMQ_ACTIVE,
)

__all__ = [
"send_to_telegra_thread",
]

_rabbit_registered = False

async def process_message(message: aio_pika.IncomingMessage):
try:
async with message.process():
await QUEUE_CALLBACK_FN(message.body)
except Exception as e:
logging.exception(f"Failed to process message: {e}")

async def connect_rabbitmq():
global _rabbit_registered

if _rabbit_registered:
async def consume_rabbitmq():
if not RABBITMQ_ACTIVE:
return

_rabbit_registered = True

connection = await aio_pika.connect_robust(RABBITMQ_URL)

# Creating a channel
channel = await connection.channel()

# Declare a queue to make sure it exists. If the queue is already there this won't do anything.
queue = await channel.declare_queue(QUEUE_CHANNEL_NAME, durable=True)

# Creating a consumer callback
async def on_message(message: aio_pika.IncomingMessage):
async with message.process():
await QUEUE_CALLBACK_FN(message.body)

await queue.consume(on_message)
await queue.consume(process_message)


def send_to_telegra_thread(**payload):
Expand Down

0 comments on commit eb8628d

Please sign in to comment.