-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrabbit.py
90 lines (70 loc) · 2.56 KB
/
rabbit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import argparse
import pika
import time
class RabbitMQ:
    """Thin wrapper around a blocking pika connection for one queue.

    Typical call order: ``connect()``, optionally
    ``make_persistent_messages()``, then ``set_queue_name()``, and finally
    either ``send_message()`` (followed by ``close()``) or
    ``start_receiver()``.

    The durable flag is read when the queue is declared, so
    ``make_persistent_messages()`` must be called *before*
    ``set_queue_name()`` for the queue itself to be declared durable.
    """

    def __init__(self):
        """Create an unconnected wrapper with durability switched off."""
        self.__connection = None
        self.__channel = None
        self.__queue = ''
        self.__durable = False

    def connect(self, host='localhost'):
        """Open a blocking connection to *host* and create a channel on it.

        Args:
            host: Broker host name passed to ``pika.ConnectionParameters``.
                Defaults to ``'localhost'``, the previously hard-coded value.
        """
        self.__connection = pika.BlockingConnection(
            pika.ConnectionParameters(host)
        )
        self.__channel = self.__connection.channel()

    def make_persistent_messages(self):
        """Switch the wrapper to durable mode.

        In durable mode the queue is declared durable, published messages
        carry ``delivery_mode=2``, and the receiver acknowledges messages
        manually instead of using auto-ack.
        """
        self.__durable = True

    def set_queue_name(self, queue_name):
        """Remember *queue_name* and declare the queue on the channel.

        The queue is declared with the durability that is in effect at the
        time of this call. Requires ``connect()`` to have been called.
        """
        self.__queue = queue_name
        self.__channel.queue_declare(
            queue=queue_name,
            durable=self.__durable
        )

    def send_message(self, message):
        """Publish *message* to the configured queue via the default exchange.

        Args:
            message: Message body handed to ``basic_publish``.
        """
        # delivery_mode=2 marks the message as persistent; it is only
        # attached in durable mode, otherwise no properties are sent.
        if self.__durable:
            properties = pika.BasicProperties(delivery_mode=2)
        else:
            properties = None
        self.__channel.basic_publish(
            exchange='',
            routing_key=self.__queue,
            body=message,
            properties=properties
        )
        # Wrap in a one-tuple so a tuple-valued message is formatted as a
        # single value instead of being unpacked by the % operator.
        print(" [x] Sent %r" % (message,))

    def callback(self, ch, method, properties, body):
        """Handle one delivered message.

        Simulates work by sleeping one second for every ``b'.'`` in *body*.
        In durable mode the message is acknowledged afterwards, because the
        consumer is then registered without auto-ack.

        Args:
            ch: Channel the message arrived on; used for the manual ack.
            method: Delivery metadata; its ``delivery_tag`` is acknowledged.
            properties: Message properties (unused).
            body: Raw message payload as bytes.
        """
        print(" [x] Received %r" % (body,))
        time.sleep(body.count(b'.'))
        print(" [x] Done")
        if self.__durable:
            ch.basic_ack(delivery_tag=method.delivery_tag)

    def start_receiver(self):
        """Register ``callback`` on the queue and block consuming messages.

        In durable mode ``prefetch_count=1`` is set before consuming and
        messages are acknowledged manually; otherwise auto-ack is used.
        This call blocks in ``start_consuming()``; a ``KeyboardInterrupt``
        raised while consuming propagates to the caller.
        """
        print(' [*] Waiting for messages. To exit press CTRL+C')
        if self.__durable:
            self.__channel.basic_qos(prefetch_count=1)
        self.__channel.basic_consume(
            queue=self.__queue,
            on_message_callback=self.callback,
            auto_ack=not self.__durable
        )
        self.__channel.start_consuming()

    def close(self):
        """Close the connection if one is open.

        Safe to call when ``connect()`` was never called or when the
        connection has already been closed; in both cases it does nothing.
        """
        connection = self.__connection
        if connection is not None and connection.is_open:
            connection.close()
def build_arg_parser():
    """Return the command-line parser for the send/receive script."""
    arg_parser = argparse.ArgumentParser(
        description='Send messages to, or receive messages from, a queue.'
    )
    arg_parser.add_argument(
        '-d', '--durable', dest='durable', action='store_true',
        help='use a durable queue and persistent messages'
    )
    arg_parser.add_argument(
        '-q', '--queue', dest='queue',
        help="queue name (default: 'test')"
    )
    arg_parser.add_argument(
        '-r', '--receive', dest='worker', action='store_true',
        help='receive messages until interrupted'
    )
    arg_parser.add_argument(
        '-s', '--send', dest='task', nargs='+',
        help='one or more messages to send'
    )
    return arg_parser


def main(argv=None):
    """Run the script: send the given messages or start a receiver.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    When neither ``--send`` nor ``--receive`` is given, the usage text is
    printed and nothing is connected. If both are given, sending wins.
    """
    arg_parser = build_arg_parser()
    args = arg_parser.parse_args(argv)

    if not (args.task or args.worker):
        # Nothing to do: show usage instead of exiting silently.
        arg_parser.print_help()
        return

    rabbit = RabbitMQ()
    rabbit.connect()
    # Durability must be set before the queue is declared.
    if args.durable:
        rabbit.make_persistent_messages()
    rabbit.set_queue_name(args.queue or 'test')

    if args.task:
        for msg in args.task:
            rabbit.send_message(msg)
        rabbit.close()
        return

    try:
        rabbit.start_receiver()
    except KeyboardInterrupt:
        # CTRL+C is the advertised way to stop the receiver, so exit
        # cleanly rather than with a traceback.
        print(' [*] Interrupted, closing connection')
    rabbit.close()


if __name__ == "__main__":
    main()