diff --git a/exchange/_bybit_ws.py b/exchange/_bybit_ws.py index bcac1811..0285629d 100644 --- a/exchange/_bybit_ws.py +++ b/exchange/_bybit_ws.py @@ -13,6 +13,7 @@ logger = logging.getLogger(__name__) +connect_exceptions = (ConnectionError, ConnectionClosedError, asyncio.TimeoutError) class BybitWS(AbstractWS): SUBSCRIBE_OPERATION = "subscribe" @@ -68,7 +69,7 @@ async def _connect(self): @retry( max_retries=13, initial_retry_delay=1, - handled_exceptions=(ConnectionError, ConnectionClosedError), + handled_exceptions=connect_exceptions, ) async def run(self): await self.close() @@ -77,16 +78,15 @@ async def run(self): async def close(self): if self.ws and self.ws.open: await self._unsubscribe() + + if self.ws: await self.ws.close() await self.ws.wait_closed() @retry( max_retries=13, initial_retry_delay=1, - handled_exceptions=( - ConnectionError, - ConnectionClosedError, - ), + handled_exceptions=connect_exceptions, ) async def receive(self, symbol, timeframe): await self._connect() @@ -117,34 +117,30 @@ async def unsubscribe(self, symbol, timeframe): await self._subscribe() async def _subscribe(self): - if not self.ws or not self.ws.open: - return - - subscribe_message = { - "op": self.SUBSCRIBE_OPERATION, - "args": self._get_channels_args(), - } - - try: - logger.info(f"Subscribe to: {subscribe_message}") - await self.ws.send(json.dumps(subscribe_message)) - except Exception as e: - logger.error(f"Failed to send subscribe message: {e}") + await self._send_message(self.SUBSCRIBE_OPERATION) async def _unsubscribe(self): - if not self.ws or not self.ws.open or not self._channels: + await self._send_message(self.UNSUBSCRIBE_OPERATION) + + async def _send_message(self, operation): + if ( + not self.ws + or not self.ws.open + or (operation == self.UNSUBSCRIBE_OPERATION and not self._channels) + ): return - unsubscribe_message = { - "op": self.UNSUBSCRIBE_OPERATION, + message = { + "op": operation, "args": self._get_channels_args(), } try: - logger.info(f"Unsubscribe from: {unsubscribe_message}") - await self.ws.send(json.dumps(unsubscribe_message)) + await asyncio.wait_for(self.ws.send(json.dumps(message)), timeout=5) + except asyncio.TimeoutError: + logger.error("Subscription request timed out") except Exception as e: - logger.error(f"Failed to send unsubscribe message: {e}") + logger.error(f"Failed to send {operation} message: {e}") async def _wait_for_ws(self): while not self.ws or not self.ws.open: