Skip to content

Commit b9d01cb

Browse files
fix race condition handling if pipe handler (#2729) (#2809)
Co-authored-by: Yan Cheng <[email protected]>
1 parent 2accf6f commit b9d01cb

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

nvflare/fuel/utils/pipe/pipe_handler.py

+11-4
Original file line numberDiff line numberDiff line change
@@ -296,11 +296,19 @@ def _read(self):
296296
def _try_read(self):
297297
self._last_heartbeat_received_time = time.time()
298298
while not self.asked_to_stop:
299+
time.sleep(self.read_interval)
299300
if self._pause:
300-
time.sleep(self.read_interval)
301301
continue
302302

303-
msg = self.pipe.receive()
303+
# we assign self.pipe to p and access pipe methods through p
304+
# this is because self.pipe could be set to None at any moment (e.g. the abort process could
305+
# stop the pipe handler at any time).
306+
p = self.pipe
307+
if not p:
308+
# the pipe handler is most likely stopped, but we leave it for the while loop to decide
309+
continue
310+
311+
msg = p.receive()
304312
now = time.time()
305313

306314
if msg:
@@ -315,7 +323,7 @@ def _try_read(self):
315323
else:
316324
# is peer gone?
317325
# ask the pipe for the last known active time of the peer
318-
last_peer_active_time = self.pipe.get_last_peer_active_time()
326+
last_peer_active_time = p.get_last_peer_active_time()
319327
if last_peer_active_time > self._last_heartbeat_received_time:
320328
self._last_heartbeat_received_time = last_peer_active_time
321329

@@ -331,7 +339,6 @@ def _try_read(self):
331339
)
332340
break
333341

334-
time.sleep(self.read_interval)
335342
self.reader = None
336343

337344
def _heartbeat(self):

0 commit comments

Comments
 (0)