diff --git a/src/scaler/worker/worker.py b/src/scaler/worker/worker.py index c96e8f087..5a933ef32 100644 --- a/src/scaler/worker/worker.py +++ b/src/scaler/worker/worker.py @@ -10,16 +10,11 @@ import zmq.asyncio from scaler.config.defaults import PROFILING_INTERVAL_SECONDS -from scaler.config.types.network_backend import NetworkBackend from scaler.config.types.object_storage_server import ObjectStorageAddressConfig from scaler.config.types.zmq import ZMQConfig, ZMQType from scaler.io.async_binder import ZMQAsyncBinder from scaler.io.mixins import AsyncBinder, AsyncConnector, AsyncObjectStorageConnector -from scaler.io.utility import ( - create_async_connector, - create_async_object_storage_connector, - get_scaler_network_backend_from_env, -) +from scaler.io.utility import create_async_connector, create_async_object_storage_connector from scaler.io.ymq import ymq from scaler.protocol.python.message import ( ClientDisconnect, @@ -269,8 +264,15 @@ async def __get_loops(self): except Exception as e: logging.exception(f"{self.identity!r}: failed with unhandled exception:\n{e}") - if get_scaler_network_backend_from_env() == NetworkBackend.tcp_zmq: + try: await self._connector_external.send(DisconnectRequest.new_msg(self.identity)) + except (asyncio.CancelledError, ymq.YMQException) as e: + + # This can happen if the scheduler has already closed the connection. + if isinstance(e, asyncio.CancelledError) or e.code == ymq.ErrorCode.ConnectorSocketClosedByRemoteEnd: + pass + else: + logging.exception(f"{self.identity!r}: failed to send DisconnectRequest:\n{e}") self._connector_external.destroy() self._processor_manager.destroy("quit") @@ -281,16 +283,8 @@ async def __get_loops(self): logging.info(f"{self.identity!r}: quit") def __register_signal(self): - backend = get_scaler_network_backend_from_env() - if backend == NetworkBackend.tcp_zmq: - self._loop.add_signal_handler(signal.SIGINT, self.__destroy) - self._loop.add_signal_handler(signal.SIGTERM, self.__destroy) - elif backend == NetworkBackend.ymq: - self._loop.add_signal_handler(signal.SIGINT, lambda: asyncio.ensure_future(self.__graceful_shutdown())) - self._loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.ensure_future(self.__graceful_shutdown())) - - async def __graceful_shutdown(self): - await self._connector_external.send(DisconnectRequest.new_msg(self.identity)) + self._loop.add_signal_handler(signal.SIGINT, self.__destroy) + self._loop.add_signal_handler(signal.SIGTERM, self.__destroy) def __destroy(self): self._task.cancel()