Skip to content
28 changes: 11 additions & 17 deletions src/scaler/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,11 @@
import zmq.asyncio

from scaler.config.defaults import PROFILING_INTERVAL_SECONDS
from scaler.config.types.network_backend import NetworkBackend
from scaler.config.types.object_storage_server import ObjectStorageAddressConfig
from scaler.config.types.zmq import ZMQConfig, ZMQType
from scaler.io.async_binder import ZMQAsyncBinder
from scaler.io.mixins import AsyncBinder, AsyncConnector, AsyncObjectStorageConnector
from scaler.io.utility import (
create_async_connector,
create_async_object_storage_connector,
get_scaler_network_backend_from_env,
)
from scaler.io.utility import create_async_connector, create_async_object_storage_connector
from scaler.io.ymq import ymq
from scaler.protocol.python.message import (
ClientDisconnect,
Expand Down Expand Up @@ -269,8 +264,15 @@ async def __get_loops(self):
except Exception as e:
logging.exception(f"{self.identity!r}: failed with unhandled exception:\n{e}")

if get_scaler_network_backend_from_env() == NetworkBackend.tcp_zmq:
try:
await self._connector_external.send(DisconnectRequest.new_msg(self.identity))
except (asyncio.CancelledError, ymq.YMQException) as e:

# This can happen if the scheduler has already closed the connection.
if isinstance(e, asyncio.CancelledError) or e.code == ymq.ErrorCode.ConnectorSocketClosedByRemoteEnd:
pass
else:
logging.exception(f"{self.identity!r}: failed to send DisconnectRequest:\n{e}")

self._connector_external.destroy()
self._processor_manager.destroy("quit")
Expand All @@ -281,16 +283,8 @@ async def __get_loops(self):
logging.info(f"{self.identity!r}: quit")

def __register_signal(self):
backend = get_scaler_network_backend_from_env()
if backend == NetworkBackend.tcp_zmq:
self._loop.add_signal_handler(signal.SIGINT, self.__destroy)
self._loop.add_signal_handler(signal.SIGTERM, self.__destroy)
elif backend == NetworkBackend.ymq:
self._loop.add_signal_handler(signal.SIGINT, lambda: asyncio.ensure_future(self.__graceful_shutdown()))
self._loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.ensure_future(self.__graceful_shutdown()))

async def __graceful_shutdown(self):
await self._connector_external.send(DisconnectRequest.new_msg(self.identity))
self._loop.add_signal_handler(signal.SIGINT, self.__destroy)
self._loop.add_signal_handler(signal.SIGTERM, self.__destroy)

def __destroy(self):
self._task.cancel()
Loading