budziam/amqp-ko-python

AMQP Kø

Object oriented AMQP layer for microservices communication.

Usage

The recommended way to use AMQP Kø is to create your own queue object. The simplest way to do this is with the create_queue function.

Create queue

from amqp_ko import create_queue, AsyncConnection, Message, MessageGate
from dataclasses import dataclass


@dataclass(frozen=True)
class TopicFollow(Message):
    user_id: int
    topic_name: str


def unmarshal_topic_follow(data: dict) -> TopicFollow:
    return TopicFollow(
        user_id=data["user_id"],
        topic_name=data["topic_name"],
    )

message_gates = [
    MessageGate("topic_follow", TopicFollow, unmarshal_topic_follow),
]

# Run this inside a coroutine: "async with" and "await" are only valid in async code.
async with AsyncConnection("localhost", 5672, "rabbitmq", "rabbitmq") as connection:
    queue = await create_queue(connection, "exchange-name", message_gates)
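The unmarshal function registered in a MessageGate is an ordinary function from a decoded payload to a message object, so it can be checked without a broker. The sketch below is only an illustration. It assumes the payload arrives as a JSON object, which the text above does not state. The stand-in Message base class and the decode_topic_follow helper are hypothetical and are not part of amqp_ko.

```python
import json
from dataclasses import asdict, dataclass


class Message:
    """Stand-in for amqp_ko.Message so the sketch runs without the library."""


@dataclass(frozen=True)
class TopicFollow(Message):
    user_id: int
    topic_name: str


def unmarshal_topic_follow(data: dict) -> TopicFollow:
    # Same mapping as in the example above: dict keys become dataclass fields.
    return TopicFollow(
        user_id=data["user_id"],
        topic_name=data["topic_name"],
    )


def decode_topic_follow(raw: str) -> TopicFollow:
    # Hypothetical helper: assumes the message body is a JSON object.
    return unmarshal_topic_follow(json.loads(raw))


# Round trip: message -> JSON text -> message.
original = TopicFollow(user_id=120, topic_name="entertainment")
raw = json.dumps(asdict(original))
decoded = decode_topic_follow(raw)
```

Because the dataclass is frozen and compares by value, the decoded message can be compared directly with the original in a unit test.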

Consume messages

from amqp_ko import Consumer, Job


class ConnectUserWithTopic(Consumer):
    async def consume(self, job: Job[TopicFollow]):
        # Add the code that connects the user with the topic here,
        # using "job.message.user_id" and "job.message.topic_name".
        await job.ack()

await queue.consume(
    "queue-name",
    {TopicFollow: ConnectUserWithTopic()},
)
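The consumer logic can be tried out without RabbitMQ by passing it a fake job. The following is a minimal sketch under stated assumptions: FakeJob and dispatch are hypothetical stand-ins, the consumer omits the amqp_ko.Consumer base class so the code runs without the library, and the type-keyed lookup only mirrors the shape of the dictionary passed to queue.consume. How amqp_ko routes messages internally is not described here.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class TopicFollow:
    user_id: int
    topic_name: str


class FakeJob:
    """Hypothetical stand-in for amqp_ko.Job: holds a message and records ack()."""

    def __init__(self, message):
        self.message = message
        self.acked = False

    async def ack(self):
        self.acked = True


class ConnectUserWithTopic:
    """Same shape as the consumer above, without the amqp_ko.Consumer base class."""

    def __init__(self):
        self.follows = []

    async def consume(self, job):
        # Record the follow, then acknowledge the job.
        self.follows.append((job.message.user_id, job.message.topic_name))
        await job.ack()


async def dispatch(consumers, job):
    # Hypothetical routing: pick the consumer registered for the message's exact type.
    await consumers[type(job.message)].consume(job)


consumer = ConnectUserWithTopic()
job = FakeJob(TopicFollow(120, "entertainment"))
asyncio.run(dispatch({TopicFollow: consumer}, job))
```

Keeping the side effect (here, appending to a list) separate from the acknowledgement makes it easy to assert in a test that a job is acknowledged only after the work has been done.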

Produce messages

message = TopicFollow(120, "entertainment")
await queue.produce(message)
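The message in this example is built with positional arguments. Because TopicFollow is a frozen dataclass, keyword arguments produce an equal object, and the message cannot be changed after it is created. The sketch below shows only this standard-library behaviour. It leaves out the amqp_ko.Message base class so that it runs on its own.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class TopicFollow:
    user_id: int
    topic_name: str


# Positional and keyword construction yield equal messages.
positional = TopicFollow(120, "entertainment")
keyword = TopicFollow(user_id=120, topic_name="entertainment")

try:
    positional.user_id = 121  # frozen dataclasses reject attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Keyword arguments are usually the safer choice when producing messages, since adding or reordering fields later cannot silently swap values.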

Installation

pip install amqp-ko