This repository was archived by the owner on Feb 8, 2024. It is now read-only.

Message Bus Framework

selvakumaar5496 edited this page Dec 28, 2020 · 76 revisions

This wiki is out-of-date at the moment. Please refer to this document for Message Bus Specification

Overview

Message Bus Framework provides an easy-to-use platform for sending and receiving messages between any component on one node and any component on other nodes, reliably and efficiently, using message-queue protocols such as AMQP. It is a common framework that is agnostic to the underlying messaging software (e.g. Kafka, RabbitMQ).

Typical features provided by Message Bus Framework:

  • Send or Receive Messages
  • Python or REST Interfaces
  • Obtain Analytics about Message Queues

Message Bus Framework is implemented as a Python framework and can be consumed directly by Python programs. Components written in other programming languages can use the REST interface to send and receive messages.

Pre-requisite

The following setup must be completed before using the Message Bus Framework.

  • Message Server Setup

Message Server should be installed, configured and running on the system or in the cluster.

Package type    Message Server Package or Configuration
Kafka           Setup Kafka Server
RabbitMQ        -

  • Message Client SW

Package type    Message Client Package or Configuration
Kafka           $ pip install confluent-kafka
RabbitMQ        -

  • Message Bus Configuration

Configuration is stored in /etc/cortx/message_bus.conf

Parameter          Description
message_broker     Contains the message broker specification
  type             String that represents the type of message broker
  cluster          List of message broker addresses
    server         String that specifies the IP address or host name of a broker in the cluster
    port           String that specifies the port number of that broker

Example
{
   "message_broker":{
      "type":"kafka",
      "cluster":[
         {
            "server":"localhost",
            "port":"9092"
         }
      ]
   }
}
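
As a rough illustration, the configuration above could be turned into the "<host>:<port>" strings that the message_servers argument of MessageBus (described in the next section) expects. This is a sketch only: servers_from_config is a hypothetical helper, and whether the framework reads the file this way internally is an assumption.

```python
import json

# Sample configuration text, copied from the example above. On a real node
# this would be read from /etc/cortx/message_bus.conf instead.
SAMPLE_CONF = """
{
   "message_broker": {
      "type": "kafka",
      "cluster": [
         {"server": "localhost", "port": "9092"}
      ]
   }
}
"""


def servers_from_config(conf_text):
    """Return (broker_type, ["host:port", ...]) from message_bus.conf text.

    Hypothetical helper for illustration; not part of the framework.
    """
    broker = json.loads(conf_text)["message_broker"]
    servers = [f'{node["server"]}:{node["port"]}' for node in broker["cluster"]]
    return broker["type"], servers


broker_type, servers = servers_from_config(SAMPLE_CONF)
print(broker_type, servers)  # prints: kafka ['localhost:9092']
```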

Instantiating Message Bus

A Message Bus instance is required to communicate and work with messages as shown in the example below.

MessageBus(message_servers, server_type)

Where:
message_servers : [Optional] List of "<host>:<port>" strings identifying the active message servers
server_type : [Optional] Message Server Type e.g. "Kafka" (default) or "RabbitMQ"

Example

from cortx.utils.message_bus import MessageBus

message_bus = MessageBus()

Sending Messages

To send a message you will need an instance of MessageProducer, as shown in the example below.

MessageProducer(message_bus, producer_id, message_type, method)

Where:
message_bus : Instance of MessageBus class
producer_id : String representing ID that uniquely identifies a producer
message_type : Essentially equivalent to the queue/topic name, e.g. "delete"
method : [Optional] Whether the message is sent in "sync" (default) or "async" mode

MessageProducer.send(message_list)

Where:
message_list : List of messages. Each message object must be convertible to a string.

Example:

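from cortx.utils.message_bus import MessageProducer

# message_bus is the MessageBus instance created earlier.
# The names "id" and "type" are avoided because they shadow Python built-ins.
producer_id, message_type = "sspl_sensor", "Alert"
producer = MessageProducer(message_bus, producer_id=producer_id, message_type=message_type)
messages = ["This is message1", "This is message2"]
producer.send(messages)
producer.purge()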
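
Because send() expects each message to be convertible to a string, structured payloads are often serialised before sending. The sketch below assumes JSON as the encoding, which this page does not prescribe. to_message_list is a hypothetical helper, and the send() call is shown only as a comment so the snippet runs without a broker.

```python
import json


def to_message_list(payloads):
    """Serialise each payload to a JSON string (hypothetical helper).

    JSON is an assumption made for this sketch; any string encoding that
    the consumer can decode would serve the same purpose.
    """
    # sort_keys gives the same string for the same payload every time.
    return [json.dumps(payload, sort_keys=True) for payload in payloads]


alerts = [
    {"sensor": "fan", "state": "fault"},
    {"sensor": "psu", "state": "ok"},
]
messages = to_message_list(alerts)
print(messages[0])  # prints: {"sensor": "fan", "state": "fault"}

# With a producer created as in the example above:
# producer.send(messages)
```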

Receiving Messages

To receive a message you will need an instance of MessageConsumer, as shown in the example below.

MessageConsumer(message_bus, consumer_id, consumer_group, message_type, auto_ack, start_offset)

Where:
message_bus : Instance of MessageBus class
consumer_id : String representing ID that uniquely identifies a consumer
consumer_group : String representing the consumer group ID. A group of consumers can process messages
message_type : Essentially equivalent to the queue/topic name, e.g. "delete"
auto_ack : [Optional] True or False. Whether messages are acknowledged automatically (default is False)
offset : [Optional] Can be set to "earliest" (default) or "latest" ("earliest" causes messages to be read from the beginning)

MessageConsumer.receive()

Example:

from cortx.utils.message_bus import MessageConsumer

# message_bus is the MessageBus instance created earlier.
consumer_id, group, message_type = "sspl_actuator", "cortx_monitor", "actuator_requests"
consumer = MessageConsumer(message_bus, consumer_id=consumer_id, consumer_group=group, message_type=message_type)
while True:
    try:
        message = consumer.receive()
        print(message)
        consumer.ack()
    except Exception:
        # Stop consuming on the first error.
        break
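
The receive-then-acknowledge pattern above can be exercised without a broker by substituting a stand-in object. In the sketch below, FakeConsumer is a hypothetical test double that mimics only the receive() and ack() calls used on this page. Returning None when nothing is left is an assumption of the stub, not documented behaviour of MessageConsumer.

```python
from collections import deque


class FakeConsumer:
    """Hypothetical in-memory stand-in for MessageConsumer (not the real class)."""

    def __init__(self, messages):
        self._pending = deque(messages)
        self._in_flight = None
        self.acked = []

    def receive(self):
        # Assumption of this stub: return None once nothing is left.
        if not self._pending:
            return None
        self._in_flight = self._pending.popleft()
        return self._in_flight

    def ack(self):
        # Record the most recently received message as acknowledged.
        self.acked.append(self._in_flight)
        self._in_flight = None


def drain(consumer, handler):
    """Receive messages until none remain, acknowledging each after handling."""
    handled = 0
    while True:
        message = consumer.receive()
        if message is None:
            break
        handler(message)  # process the message first
        consumer.ack()    # acknowledge only after the handler returns
        handled += 1
    return handled


seen = []
consumer = FakeConsumer(["This is message1", "This is message2"])
count = drain(consumer, seen.append)
print(count, seen)  # prints: 2 ['This is message1', 'This is message2']
```

Acknowledging after the handler returns means that a failure inside the handler leaves that message unacknowledged in this sketch.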