Skip to content

Commit

Permalink
thread-safe stop
Browse files Browse the repository at this point in the history
  • Loading branch information
yannbouteiller committed Jan 5, 2024
1 parent e2649cc commit e40426d
Showing 1 changed file with 34 additions and 29 deletions.
63 changes: 34 additions & 29 deletions tlspyo/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ def __init__(self,

assert accepted_groups is None or isinstance(accepted_groups, dict), "Invalid format for accepted_groups."

self._stopped = False
self._header_size = header_size
self._local_com_port = local_com_port
self._local_com_srv = socket(AF_INET, SOCK_STREAM)
Expand All @@ -95,6 +94,8 @@ def __init__(self,
self._send_local('TEST')

self._finalizer = weakref.finalize(self, self._finalize)
self._stop_lock = Lock()
self._stopped = False

def _finalize(self):
self.stop()
Expand All @@ -109,16 +110,18 @@ def stop(self):
Stop the Relay.
"""
try:
if not self._stopped:
self._send_local('STOP')

self._p.join()
self._local_com_conn.close()
self._local_com_srv.close()
self._local_com_addr = None
self._stopped = True
except KeyboardInterrupt:
with self._stop_lock:
if not self._stopped:
self._send_local('STOP')

self._p.join()
self._local_com_conn.close()
self._local_com_srv.close()
self._local_com_addr = None
self._stopped = True
except KeyboardInterrupt as e:
self.stop()
raise e


class Endpoint:
Expand Down Expand Up @@ -178,8 +181,6 @@ def __init__(self,
elif security == "SSL":
security = "TLS"

self._stopped = False

# threading for local object receiving
self.__obj_buffer = queue.Queue()
self.__socket_closed_lock = Lock()
Expand Down Expand Up @@ -228,6 +229,8 @@ def __init__(self,
self._t_manage_received_objects.start()

self._finalizer = weakref.finalize(self, self._finalize)
self._stop_lock = Lock()
self._stopped = False

def _finalize(self):
self.stop()
Expand Down Expand Up @@ -368,24 +371,26 @@ def stop(self):
Stop the Endpoint.
"""
try:
if not self._stopped:
# send STOP to the local server
self._send_local(cmd='STOP', dest=None, obj=None)

# Join the message reading thread
with self.__socket_closed_lock:
self.__socket_closed_flag = True
self._t_manage_received_objects.join()

# join Twisted process and stop local server
self._p.join()

self._local_com_conn.close()
self._local_com_srv.close()
self._local_com_addr = None
self._stopped = True
except KeyboardInterrupt:
with self._stop_lock:
if not self._stopped:
# send STOP to the local server
self._send_local(cmd='STOP', dest=None, obj=None)

# Join the message reading thread
with self.__socket_closed_lock:
self.__socket_closed_flag = True
self._t_manage_received_objects.join()

# join Twisted process and stop local server
self._p.join()

self._local_com_conn.close()
self._local_com_srv.close()
self._local_com_addr = None
self._stopped = True
except KeyboardInterrupt as e:
self.stop()
raise e

def _process_received_list(self, received_list):
if self._deserialize_locally:
Expand Down

0 comments on commit e40426d

Please sign in to comment.