Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
m5l14i11 committed Sep 13, 2024
1 parent 707a749 commit 39ef728
Showing 1 changed file with 20 additions and 24 deletions.
44 changes: 20 additions & 24 deletions exchange/_bybit_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

logger = logging.getLogger(__name__)

connect_exceptions = (ConnectionError, ConnectionClosedError, asyncio.TimeoutError)

class BybitWS(AbstractWS):
SUBSCRIBE_OPERATION = "subscribe"
Expand Down Expand Up @@ -68,7 +69,7 @@ async def _connect(self):
@retry(
max_retries=13,
initial_retry_delay=1,
handled_exceptions=(ConnectionError, ConnectionClosedError),
handled_exceptions=connect_exceptions,
)
async def run(self):
await self.close()
Expand All @@ -77,16 +78,15 @@ async def run(self):
async def close(self):
if self.ws and self.ws.open:
await self._unsubscribe()

if self.ws:
await self.ws.close()
await self.ws.wait_closed()

@retry(
max_retries=13,
initial_retry_delay=1,
handled_exceptions=(
ConnectionError,
ConnectionClosedError,
),
handled_exceptions=connect_exceptions,
)
async def receive(self, symbol, timeframe):
await self._connect()
Expand Down Expand Up @@ -117,34 +117,30 @@ async def unsubscribe(self, symbol, timeframe):
await self._subscribe()

async def _subscribe(self):
if not self.ws or not self.ws.open:
return

subscribe_message = {
"op": self.SUBSCRIBE_OPERATION,
"args": self._get_channels_args(),
}

try:
logger.info(f"Subscribe to: {subscribe_message}")
await self.ws.send(json.dumps(subscribe_message))
except Exception as e:
logger.error(f"Failed to send subscribe message: {e}")
await self._send_message(self.SUBSCRIBE_OPERATION)

async def _unsubscribe(self):
if not self.ws or not self.ws.open or not self._channels:
await self._send_message(self.UNSUBSCRIBE_OPERATION)

async def _send_message(self, operation):
if (
not self.ws
or not self.ws.open
or (operation == self.UNSUBSCRIBE_OPERATION and not self._channels)
):
return

unsubscribe_message = {
"op": self.UNSUBSCRIBE_OPERATION,
message = {
"op": operation,
"args": self._get_channels_args(),
}

try:
logger.info(f"Unsubscribe from: {unsubscribe_message}")
await self.ws.send(json.dumps(unsubscribe_message))
await asyncio.wait_for(self.ws.send(json.dumps(message)), timeout=5)
except asyncio.TimeoutError:
logger.error("Subscription request timed out")
except Exception as e:
logger.error(f"Failed to send unsubscribe message: {e}")
logger.error(f"Failed to send {operation} message: {e}")

async def _wait_for_ws(self):
while not self.ws or not self.ws.open:
Expand Down

0 comments on commit 39ef728

Please sign in to comment.