-
Notifications
You must be signed in to change notification settings - Fork 7
/
receive_ios.py
executable file
·117 lines (95 loc) · 3.18 KB
/
receive_ios.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/env python
#coding=utf-8
import sys
reload(sys)
sys.setdefaultencoding("utf-8")
import threading
import time
import pika
import json
from thread_manager import thread_manager
from config import get_config
from push_ios import push
def main(thread_id):
thread_str = 'Thread %s : ' % str(thread_id)
user = config['user']
password = config['password']
queue = config['queue']
queue_size = get_queue_size()
try:
credentials = pika.PlainCredentials(user, password)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',5672,'/',credentials))
channel = connection.channel()
channel.queue_declare(queue=queue,durable=True)
except:
print 'rabbitmq conneciton fail'
sys.exit()
print thread_str + ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
#print " [x] Received json "+ (body)
print thread_str + " [x] Received... "
ret = push(json.loads(body), config)
if ret == True:
print thread_str + "push success "+ body
else:
print thread_str + 'push fail: '+body
ch.basic_ack(delivery_tag = method.delivery_tag)
ch.stop_consuming()
channel.basic_qos(prefetch_count=10)
channel.basic_consume(callback, queue=queue)
if queue_size > 0:
#print thread_str + 'start consuming...'
#这里如果开启consuming会阻塞
#channel.start_consuming()
pass
else:
print thread_str + 'queue is empty, stop consuming'
channel.stop_consuming()
connection.close()
def get_queue_size():
user = config['user']
password = config['password']
queue = config['queue']
try:
credentials = pika.PlainCredentials(user, password)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',5672,'/',credentials))
channel = connection.channel()
declare_ok = channel.queue_declare(queue=queue,durable=True,passive=True)
return declare_ok.method.message_count
except:
print 'rabbitmq conneciton fail while get queue size'
return 0
def get_thread_num(queue_size):
global _sleep
if queue_size > 100:
thread_num = 100
_sleep = 0.1
elif queue_size > 10:
thread_num = 5
_sleep = 0.1
else:
thread_num = 2
_sleep = 2
return thread_num
if __name__ == '__main__':
config = get_config()
if not config:
print 'config error'
sys.exit()
_sleep = 2
thread_num = get_thread_num(get_queue_size())
# 防止KeyboardInterrupt时报错,程序会在下一步捕获KeyboardInterrupt
try:
thread_manager(thread_num, main)
except:
pass
# 所有线程执行完退出后 循环检查
try:
while True:
time.sleep(_sleep)
print 'all threads exit. loop checking...'
queue_size = get_queue_size();
if queue_size > 0:
thread_manager(get_thread_num(queue_size), main)
except KeyboardInterrupt:
print "\nthread manager stoped"