From 10f2affd2bd9036fcb7e7b99b24d7a1210873539 Mon Sep 17 00:00:00 2001 From: sandronadiradze Date: Mon, 28 Oct 2024 19:50:28 +0400 Subject: [PATCH 1/3] chore modify producer in accounts --- apis/rabbitmq/producer.py | 112 ++++++++++++++++++++------------------ 1 file changed, 60 insertions(+), 52 deletions(-) diff --git a/apis/rabbitmq/producer.py b/apis/rabbitmq/producer.py index 63f4983..0f35ffb 100644 --- a/apis/rabbitmq/producer.py +++ b/apis/rabbitmq/producer.py @@ -1,6 +1,7 @@ import json import os import time +from typing import Any, Dict, Optional import pika from pika.exceptions import AMQPConnectionError, ChannelWrongStateError @@ -8,7 +9,13 @@ class EventPublisher: def __init__( - self, exchange_name: str, host: str, port: int, username: str, password: str + self, + exchange_name: str, + host: str, + port: int, + username: str, + password: str, + exchange_type: str = "direct", ): """ Initializes RabbitMQ connection with the given parameters @@ -17,76 +24,76 @@ def __init__( self._connection = None self._channel = None self._exchange_name = exchange_name - self.connect(host, port, username, password) + self._exchange_type = exchange_type + self._host = host + self._port = port + self._username = username + self._password = password + self._connect_with_retry() - def connect(self, host, port, username, password): + def _connect(self): """Establishes a connection to RabbitMQ and declares the exchange.""" - while True: + self._connection = pika.BlockingConnection( + pika.ConnectionParameters( + host=self._host, + port=self._port, + credentials=pika.PlainCredentials(self._username, self._password), + heartbeat=600, + blocked_connection_timeout=300, + ) + ) + self._channel = self._connection.channel() + self._channel.exchange_declare( + exchange=self._exchange_name, + exchange_type=self._exchange_type, + durable=True, + auto_delete=True, + ) + + def _connect_with_retry(self, retries: int = 5, delay: int = 5): + for attempt in range(retries): try: - self._connection = pika.BlockingConnection( - pika.ConnectionParameters( - host=host, - port=port, - credentials=pika.PlainCredentials(username, password), - heartbeat=0, - blocked_connection_timeout=300, - retry_delay=5, # Retry after 5 seconds - connection_attempts=5, - ) - ) - self._channel = self._connection.channel() - self._channel.exchange_declare( - exchange=self._exchange_name, - exchange_type="direct", # Changed to direct - ) + self._connect() print("Connected to RabbitMQ") - break # Exit the loop if successful + return except AMQPConnectionError: - print("Connection to RabbitMQ failed. Retrying in 5 seconds...") - time.sleep(5) + print( + f"Connection attempt {attempt + 1} failed. Retrying in {delay} seconds..." + ) + time.sleep(delay) - def publish_event(self, body: any, headers: dict[str, str], routing_key: str): + raise ConnectionError("Failed to connect to RabbitMQ after multiple attempts.") + + def publish_event(self, body: Any, headers: Dict[str, str], routing_key: str): """ Publishes an event to the Event-bus. Parameters: ---------- - body : any + body : Any The body of the event to publish. - headers : dict[str, str] + headers : Dict[str, str] The headers used by consumers for filtering. routing_key : str The routing key to determine which queue receives the message. """ content = json.dumps(body).encode() - print( - f"Publishing event with headers '{headers}', body '{content}', and routing key '{routing_key}'" - ) + properties = pika.BasicProperties(headers=headers) try: - if self._channel.is_open: - self._channel.basic_publish( - exchange=self._exchange_name, - routing_key=routing_key, # Specify the routing key here - body=content, - properties=pika.BasicProperties(headers=headers), # Sends headers - ) - else: - print("Channel is closed. Attempting to reconnect...") - self.reconnect_and_publish(body, headers, routing_key) - except ChannelWrongStateError: - print("Channel is closed. Attempting to reconnect...") - self.reconnect_and_publish(body, headers, routing_key) - - def reconnect_and_publish(self, body, headers, routing_key): - """Reconnects and attempts to publish the event again.""" - self.connect( - os.environ["RABBITMQ_HOST"], - int(os.environ["RABBITMQ_PORT"]), - os.environ["RABBITMQ_DEFAULT_USER"], - os.environ["RABBITMQ_DEFAULT_PASS"], - ) - self.publish_event(body, headers, routing_key) + self._channel.basic_publish( + exchange=self._exchange_name, + routing_key=routing_key, + body=content, + properties=properties, + mandatory=True, # Ensures the message gets routed correctly + ) + except (ChannelWrongStateError, AMQPConnectionError) as e: + print( + f"Failed to publish message due to {str(e)}. Attempting to reconnect..." + ) + self._connect_with_retry() + self.publish_event(body, headers, routing_key) def close(self): """Closes the channel and connection gracefully.""" @@ -97,6 +104,7 @@ def close(self): print("Connection to RabbitMQ closed.") +# Usage example event_publisher = EventPublisher( exchange_name="event_bus", host=os.environ["RABBITMQ_HOST"], From 1384e5762844c4a0053d68b826f721dfcc7aa929 Mon Sep 17 00:00:00 2001 From: sandronadiradze Date: Mon, 28 Oct 2024 20:38:07 +0400 Subject: [PATCH 2/3] chore: add print statement after successfull message publish --- apis/rabbitmq/producer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/apis/rabbitmq/producer.py b/apis/rabbitmq/producer.py index 0f35ffb..c00255f 100644 --- a/apis/rabbitmq/producer.py +++ b/apis/rabbitmq/producer.py @@ -88,6 +88,7 @@ def publish_event(self, body: Any, headers: Dict[str, str], routing_key: str): properties=properties, mandatory=True, # Ensures the message gets routed correctly ) + print(f"Publishing event with body: {body}, and headers: {headers}") except (ChannelWrongStateError, AMQPConnectionError) as e: print( f"Failed to publish message due to {str(e)}. Attempting to reconnect..." From fd11835de01ee6d763a20ea0d30501e1306a693b Mon Sep 17 00:00:00 2001 From: sandronadiradze Date: Tue, 29 Oct 2024 12:20:21 +0400 Subject: [PATCH 3/3] chore: modify exchange auto_delete from true to false --- apis/rabbitmq/producer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apis/rabbitmq/producer.py b/apis/rabbitmq/producer.py index c00255f..e4205cc 100644 --- a/apis/rabbitmq/producer.py +++ b/apis/rabbitmq/producer.py @@ -47,7 +47,7 @@ def _connect(self): exchange=self._exchange_name, exchange_type=self._exchange_type, durable=True, - auto_delete=True, + auto_delete=False, ) def _connect_with_retry(self, retries: int = 5, delay: int = 5):