-
Notifications
You must be signed in to change notification settings - Fork 2
/
zhelpers.py
66 lines (55 loc) · 1.45 KB
/
zhelpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# encoding: utf-8
"""
Helper module for example applications. Mimics ZeroMQ Guide's zhelpers.h.
"""
import binascii
import os
from random import randint
import zmq
# fix ROUTER/DEALER aliases, missing from pyzmq < 2.1.9
if not hasattr(zmq, 'ROUTER'):
zmq.ROUTER = zmq.XREP
if not hasattr(zmq, 'DEALER'):
zmq.DEALER = zmq.XREQ
# Receives all message parts from socket, prints neatly
def dump(msg_or_socket):
if isinstance(msg_or_socket, zmq.Socket):
# it's a socket, call on current message
return dump(msg_or_socket.recv_multipart())
else:
msg = msg_or_socket
print("----------------------------------------")
for part in msg:
part = part.decode()
print("[%03d]" % len(part), end=' ')
is_text = True
for c in part:
if ord(c) < 32 or ord(c) > 128:
is_text = False
break
if is_text:
# print only if ascii text
print(part)
else:
# not text, print hex
print(binascii.hexlify(part))
return msg
# Set simple random printable identity on socket
def set_id(zsocket):
identity = "%04x-%04x" % (randint(0, 0x10000), randint(0, 0x10000))
zsocket.setsockopt(zmq.IDENTITY, identity)
def zpipe(ctx):
"""build inproc pipe for talking to threads
mimic pipe used in czmq zthread_fork.
Returns a pair of PAIRs connected via inproc
"""
a = ctx.socket(zmq.PAIR)
a.linger = 0
b = ctx.socket(zmq.PAIR)
b.linger = 0
a.hwm = 1
b.hwm = 1
iface = "inproc://%s" % binascii.hexlify(os.urandom(8))
a.bind(iface)
b.connect(iface)
return a,b