diff --git a/exchange/_bybit_ws.py b/exchange/_bybit_ws.py index 4cd19efe..8af74bff 100644 --- a/exchange/_bybit_ws.py +++ b/exchange/_bybit_ws.py @@ -16,11 +16,8 @@ class BybitWS(AbstractWS): - _instance = None - SUBSCRIBE_OPERATION = "subscribe" UNSUBSCRIBE_OPERATION = "unsubscribe" - PING_OPERATION = "ping" INTERVALS = { Timeframe.ONE_MINUTE: 1, Timeframe.THREE_MINUTES: 3, @@ -44,26 +41,21 @@ class BybitWS(AbstractWS): DATA_KEY = "data" CONFIRM_KEY = "confirm" - def __new__(cls, wss: str): - if cls._instance is None: - cls._instance = super(BybitWS, cls).__new__(cls) - cls._instance.ws = None - cls._instance.wss = wss - cls._instance._channels = set() - cls._instance._receive_semaphore = asyncio.Semaphore(1) - cls._instance._lock = asyncio.Lock() - - return cls._instance + def __init__(self, wss: str): + self.wss = wss + self.ws = None + self._channels = set() + self._lock = asyncio.Lock() async def _connect_to_websocket(self): - self.ws = await websockets.connect( - self.wss, - open_timeout=None, - ping_interval=30, - ping_timeout=15, - close_timeout=None, - ) - await self._resubscribe() + if not self.ws or not self.ws.open: + self.ws = await websockets.connect( + self.wss, + open_timeout=None, + ping_interval=30, + ping_timeout=15, + close_timeout=None, + ) @retry( max_retries=13, @@ -78,6 +70,7 @@ async def _connect_to_websocket(self): async def run(self): await self.close() await self._connect_to_websocket() + await self._subscribe() async def close(self): if self.ws and self.ws.open: @@ -89,44 +82,44 @@ async def close(self): handled_exceptions=(RuntimeError, ConnectionClosedError), ) async def receive(self, symbol, timeframe): - async with self._receive_semaphore: - if not self.ws or not self.ws.open: - await self._connect_to_websocket() + await self._connect_to_websocket() - async for message in self.ws: - data = json.loads(message) + async for message in self.ws: + data = json.loads(message) - if self.TOPIC_KEY not in data: - continue + if self.TOPIC_KEY not in data: + continue - topic = data[self.TOPIC_KEY].split(".") + topic = data[self.TOPIC_KEY].split(".") - if symbol.name == topic[2] and timeframe == self.TIMEFRAMES.get( - topic[1] - ): - return [ - Bar(OHLCV.from_dict(ohlcv), ohlcv.get(self.CONFIRM_KEY)) - for ohlcv in data.get(self.DATA_KEY, {}) - ] + if symbol.name == topic[2] and timeframe == self.TIMEFRAMES.get(topic[1]): + return [ + Bar(OHLCV.from_dict(ohlcv), ohlcv.get(self.CONFIRM_KEY)) + for ohlcv in data.get(self.DATA_KEY, None) + if ohlcv + ] async def subscribe(self, symbol, timeframe): async with self._lock: if (symbol, timeframe) not in self._channels: self._channels.add((symbol, timeframe)) - await self._subscribe(symbol, timeframe) + await self._subscribe() async def unsubscribe(self, symbol, timeframe): async with self._lock: if (symbol, timeframe) in self._channels: self._channels.remove((symbol, timeframe)) - await self._unsubscribe(symbol, timeframe) + await self._unsubscribe() - async def _subscribe(self, symbol, timeframe): + async def _subscribe(self): if not self.ws or not self.ws.open: return - channel = f"{self.KLINE_CHANNEL}.{self.INTERVALS[timeframe]}.{symbol.name}" - subscribe_message = {"op": self.SUBSCRIBE_OPERATION, "args": [channel]} + channels = [ + f"{self.KLINE_CHANNEL}.{self.INTERVALS[timeframe]}.{symbol.name}" + for symbol, timeframe in self._channels + ] + subscribe_message = {"op": self.SUBSCRIBE_OPERATION, "args": channels} try: logger.info(f"Subscribe to: {subscribe_message}") @@ -134,20 +127,18 @@ async def _subscribe(self, symbol, timeframe): except Exception as e: logger.error(f"Failed to send subscribe message: {e}") - async def _unsubscribe(self, symbol, timeframe): + async def _unsubscribe(self): if not self.ws or not self.ws.open: return - channel = f"{self.KLINE_CHANNEL}.{self.INTERVALS[timeframe]}.{symbol.name}" - unsubscribe_message = {"op": self.UNSUBSCRIBE_OPERATION, "args": [channel]} + channels = [ + f"{self.KLINE_CHANNEL}.{self.INTERVALS[timeframe]}.{symbol.name}" + for symbol, timeframe in self._channels + ] + unsubscribe_message = {"op": self.UNSUBSCRIBE_OPERATION, "args": channels} try: logger.info(f"Unsubscribe from: {unsubscribe_message}") await self.ws.send(json.dumps(unsubscribe_message)) except Exception as e: logger.error(f"Failed to send unsubscribe message: {e}") - - async def _resubscribe(self): - async with self._lock: - for symbol, timeframe in self._channels: - await self._subscribe(symbol, timeframe)