Skip to content

Commit ae6b393

Browse files
remleduffJoel 'Aaron' Cohen
authored and
Joel 'Aaron' Cohen
committed
Make Socket._wait_for follow njsmith's original a little closer
I'd ignored what would happen if there were multiple send tasks waiting
1 parent d58c4c3 commit ae6b393

File tree

3 files changed

+55
-10
lines changed

3 files changed

+55
-10
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
venv
2+
__pycache__

pubsub.py

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
2+
import trzmq
3+
import trio
4+
import zmq
5+
6+
7+
async def pub(n):
8+
context = zmq.Context()
9+
socket = context.socket(zmq.PUB)
10+
socket.connect("tcp://0.0.0.0:5556")
11+
12+
s = trzmq.Socket(socket)
13+
i = 1
14+
while True:
15+
topic = b"ZMQ-Test"
16+
message = f"Hello, from {n}: {i}..."
17+
await s.send(b"%b %b" % (topic, message.encode()))
18+
print("%s %s" % (topic, message))
19+
i += 1
20+
await trio.sleep(1)
21+
22+
23+
async def sub():
24+
context = zmq.Context()
25+
socket = context.socket(zmq.SUB)
26+
socket.bind("tcp://0.0.0.0:5556")
27+
socket.setsockopt_string(zmq.SUBSCRIBE, '')
28+
s = trzmq.Socket(socket)
29+
while True:
30+
string = await s.recv()
31+
print(string)
32+
33+
34+
async def run():
35+
async with trio.open_nursery() as nursery:
36+
nursery.start_soon(sub)
37+
nursery.start_soon(pub, 1)
38+
nursery.start_soon(pub, 2)
39+
40+
41+
trio.run(run)

trzmq/_socket.py

+12-10
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,20 @@ async def _wait_for(self, flag):
4545
await trio.lowlevel.checkpoint()
4646
return
4747

48-
if not self._someone_is_watching_trigger:
49-
self._someone_is_watching_trigger = True
50-
try:
51-
while not self._events & flag:
48+
while not self._events & flag:
49+
if not self._someone_is_watching_trigger:
50+
self._someone_is_watching_trigger = True
51+
try:
5252
await trio.lowlevel.wait_readable(self._zmq_sock.fd)
53+
finally:
54+
self._someone_is_watching_trigger = False
5355
self._update_events()
54-
finally:
55-
self._someone_is_watching_trigger = False
56-
else:
57-
# Someone else is watching the socket, so we just need to
58-
# wait to be woken up.
59-
await self._wake_events[flag].wait()
56+
else:
57+
# Someone else is watching the socket, so we just need to
58+
# wait to be woken up
59+
await self._wake_events[flag].wait()
60+
# self._events will be uptodate because if there was another
61+
# sender/receiver they ran _update_events after send/receive
6062

6163
# XX needs conflict detection
6264
# ...or does it? each send() call is atomic, like a DGRAM socket.

0 commit comments

Comments
 (0)