Skip to content

Commit

Permalink
Merge branch 'dev' of github.com:sweeftdigital/accounts_G2 into dev
Browse files Browse the repository at this point in the history
nika-alaverdashvili committed Oct 29, 2024
2 parents ff6d9ff + fd11835 commit 41b17f5
Showing 1 changed file with 61 additions and 52 deletions.
113 changes: 61 additions & 52 deletions apis/rabbitmq/producer.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
import json
import os
import time
from typing import Any, Dict, Optional

import pika
from pika.exceptions import AMQPConnectionError, ChannelWrongStateError


class EventPublisher:
def __init__(
self, exchange_name: str, host: str, port: int, username: str, password: str
self,
exchange_name: str,
host: str,
port: int,
username: str,
password: str,
exchange_type: str = "direct",
):
"""
Initializes RabbitMQ connection with the given parameters
@@ -17,76 +24,77 @@ def __init__(
self._connection = None
self._channel = None
self._exchange_name = exchange_name
self.connect(host, port, username, password)
self._exchange_type = exchange_type
self._host = host
self._port = port
self._username = username
self._password = password
self._connect_with_retry()

def connect(self, host, port, username, password):
def _connect(self):
"""Establishes a connection to RabbitMQ and declares the exchange."""
while True:
self._connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=self._host,
port=self._port,
credentials=pika.PlainCredentials(self._username, self._password),
heartbeat=600,
blocked_connection_timeout=300,
)
)
self._channel = self._connection.channel()
self._channel.exchange_declare(
exchange=self._exchange_name,
exchange_type=self._exchange_type,
durable=True,
auto_delete=False,
)

def _connect_with_retry(self, retries: int = 5, delay: int = 5):
for attempt in range(retries):
try:
self._connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=host,
port=port,
credentials=pika.PlainCredentials(username, password),
heartbeat=0,
blocked_connection_timeout=300,
retry_delay=5, # Retry after 5 seconds
connection_attempts=5,
)
)
self._channel = self._connection.channel()
self._channel.exchange_declare(
exchange=self._exchange_name,
exchange_type="direct", # Changed to direct
)
self._connect()
print("Connected to RabbitMQ")
break # Exit the loop if successful
return
except AMQPConnectionError:
print("Connection to RabbitMQ failed. Retrying in 5 seconds...")
time.sleep(5)
print(
f"Connection attempt {attempt + 1} failed. Retrying in {delay} seconds..."
)
time.sleep(delay)

def publish_event(self, body: any, headers: dict[str, str], routing_key: str):
raise ConnectionError("Failed to connect to RabbitMQ after multiple attempts.")

def publish_event(self, body: Any, headers: Dict[str, str], routing_key: str):
"""
Publishes an event to the Event-bus.
Parameters:
----------
body : any
body : Any
The body of the event to publish.
headers : dict[str, str]
headers : Dict[str, str]
The headers used by consumers for filtering.
routing_key : str
The routing key to determine which queue receives the message.
"""
content = json.dumps(body).encode()
print(
f"Publishing event with headers '{headers}', body '{content}', and routing key '{routing_key}'"
)
properties = pika.BasicProperties(headers=headers)

try:
if self._channel.is_open:
self._channel.basic_publish(
exchange=self._exchange_name,
routing_key=routing_key, # Specify the routing key here
body=content,
properties=pika.BasicProperties(headers=headers), # Sends headers
)
else:
print("Channel is closed. Attempting to reconnect...")
self.reconnect_and_publish(body, headers, routing_key)
except ChannelWrongStateError:
print("Channel is closed. Attempting to reconnect...")
self.reconnect_and_publish(body, headers, routing_key)

def reconnect_and_publish(self, body, headers, routing_key):
"""Reconnects and attempts to publish the event again."""
self.connect(
os.environ["RABBITMQ_HOST"],
int(os.environ["RABBITMQ_PORT"]),
os.environ["RABBITMQ_DEFAULT_USER"],
os.environ["RABBITMQ_DEFAULT_PASS"],
)
self.publish_event(body, headers, routing_key)
self._channel.basic_publish(
exchange=self._exchange_name,
routing_key=routing_key,
body=content,
properties=properties,
mandatory=True, # Ensures the message gets routed correctly
)
print(f"Publishing event with body: {body}, and headers: {headers}")
except (ChannelWrongStateError, AMQPConnectionError) as e:
print(
f"Failed to publish message due to {str(e)}. Attempting to reconnect..."
)
self._connect_with_retry()
self.publish_event(body, headers, routing_key)

def close(self):
"""Closes the channel and connection gracefully."""
@@ -97,6 +105,7 @@ def close(self):
print("Connection to RabbitMQ closed.")


# Usage example
event_publisher = EventPublisher(
exchange_name="event_bus",
host=os.environ["RABBITMQ_HOST"],

0 comments on commit 41b17f5

Please sign in to comment.