-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue.py
90 lines (69 loc) · 2.68 KB
/
queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# coding: utf-8
from __future__ import unicode_literals
from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol, task
from config.settings import *
from twisted.logger import Logger
import pika
import json
class NotificationRunner:
    """
    Run the notification consumer that publishes messages to a user's channel.

    Messages are read from the RabbitMQ queue ``NOTIFICATION_QUEUE`` (bound to
    ``NOTIFICATION_EXCHANGE`` with ``NOTIFICATION_ROUTING_KEY``) and, when a
    session object was supplied, forwarded through ``session.publish``.
    """

    log = Logger()

    def __init__(self, wamp_session=None):
        """
        Prepare the RabbitMQ client; no connection is opened yet.

        :param wamp_session: optional object whose ``publish(channel, message)``
            method is called for every decoded notification. When it is falsy,
            notifications are only logged and acknowledged.
        """
        credentials = pika.PlainCredentials(username=RABBITMQ_USER, password=RABBITMQ_PASS)
        self.parameters = pika.ConnectionParameters(credentials=credentials)
        self.client = protocol.ClientCreator(
            reactor, twisted_connection.TwistedProtocolConnection, self.parameters
        )
        self.session = wamp_session

    @defer.inlineCallbacks
    def start(self, connection):
        """
        Declare the exchange/queue topology and start polling for messages.

        :param connection: ready connection object providing ``channel()``.
        :return: Deferred that fires once the polling loop has been started.
        """
        channel = yield connection.channel()
        yield channel.exchange_declare(exchange=NOTIFICATION_EXCHANGE, type='topic', durable=True)
        yield channel.queue_declare(
            queue=NOTIFICATION_QUEUE, auto_delete=False, exclusive=False, durable=True
        )
        yield channel.queue_bind(
            exchange=NOTIFICATION_EXCHANGE, queue=NOTIFICATION_QUEUE, routing_key=NOTIFICATION_ROUTING_KEY
        )
        # NOTE: the prefetch limit is currently disabled.
        # yield channel.basic_qos(prefetch_count=1)
        queue_object, _consumer_tag = yield channel.basic_consume(queue=NOTIFICATION_QUEUE, no_ack=False)

        poller = task.LoopingCall(self.read, queue_object)
        stopped = poller.start(0.01)
        # The loop stops for good when ``read`` fails; make that visible
        # instead of dropping the Deferred and its failure.
        stopped.addErrback(self._log_failure, "Notification consumer loop stopped")

    @defer.inlineCallbacks
    def read(self, queue_object):
        """
        Take one message from the queue, forward it and settle it.

        The body is expected to be a JSON object of the form::

            {
                'channel': '',
                'created_by': {
                    'first_name': '',
                    'id': 0,
                    'last_name': '',
                    'login': '',
                    'middle_name': None,
                    'photo': ''
                },
                'event_id': '',
                'message': ''
            }

        A body that is not a JSON object is rejected without requeue so that
        it can neither stop the polling loop nor be redelivered forever.
        Messages with an empty body are acknowledged without further work.

        :param queue_object: queue returned by ``basic_consume``; ``get()``
            yields ``(channel, method, properties, body)``.
        """
        ch, method, _properties, body = yield queue_object.get()
        if body:
            data = self._decode(body)
            if data is None:
                self.log.error("Rejecting notification: body is not a JSON object")
                yield ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
                return
            # Pass payload values as fields, never as the format string:
            # braces inside the text would otherwise be treated as
            # placeholders by the logger.
            self.log.info("New Event {event_id}", event_id=data.get('event_id'))
            self.log.info("{text}", text=data.get('message'))
            self.log.info("==============")
            if self.session:
                self.session.publish(data.get('channel'), data.get('message'))
        yield ch.basic_ack(delivery_tag=method.delivery_tag)

    def connect(self):
        """
        Open the TCP connection to RabbitMQ and start consuming once ready.

        :return: Deferred for the whole connect-and-start chain. Failures are
            logged by this method, so the Deferred does not errback.
        """
        d = self.client.connectTCP(RABBITMQ_HOST, RABBITMQ_PORT)
        # ``ready`` is read from the connected protocol; returning it makes
        # the chain wait on it before ``start`` runs.
        d.addCallback(lambda connected: connected.ready)
        d.addCallback(self.start)
        d.addErrback(self._log_failure, "Could not start notification consumer")
        return d

    def run(self):
        """
        Runs reactor
        """
        self.connect()
        reactor.run()

    def _decode(self, body):
        """Return *body* parsed as a JSON object, or None when it is not one."""
        try:
            data = json.loads(body)
        except (TypeError, ValueError):
            return None
        return data if isinstance(data, dict) else None

    def _log_failure(self, failure, what):
        """Errback helper: log *failure* with the brace-free description *what*."""
        self.log.failure(what, failure)
if __name__ == "__main__":
    # Script entry point: consume notifications without a session object
    # (messages are logged and acknowledged only) until the reactor stops.
    notification_runner = NotificationRunner()
    notification_runner.run()