Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
m5l14i11 committed Sep 7, 2024
1 parent 7f95be8 commit a960e72
Showing 1 changed file with 40 additions and 49 deletions.
89 changes: 40 additions & 49 deletions exchange/_bybit_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@


class BybitWS(AbstractWS):
_instance = None

SUBSCRIBE_OPERATION = "subscribe"
UNSUBSCRIBE_OPERATION = "unsubscribe"
PING_OPERATION = "ping"
INTERVALS = {
Timeframe.ONE_MINUTE: 1,
Timeframe.THREE_MINUTES: 3,
Expand All @@ -44,26 +41,21 @@ class BybitWS(AbstractWS):
DATA_KEY = "data"
CONFIRM_KEY = "confirm"

def __new__(cls, wss: str):
if cls._instance is None:
cls._instance = super(BybitWS, cls).__new__(cls)
cls._instance.ws = None
cls._instance.wss = wss
cls._instance._channels = set()
cls._instance._receive_semaphore = asyncio.Semaphore(1)
cls._instance._lock = asyncio.Lock()

return cls._instance
def __init__(self, wss: str):
self.wss = wss
self.ws = None
self._channels = set()
self._lock = asyncio.Lock()

async def _connect_to_websocket(self):
self.ws = await websockets.connect(
self.wss,
open_timeout=None,
ping_interval=30,
ping_timeout=15,
close_timeout=None,
)
await self._resubscribe()
if not self.ws or not self.ws.open:
self.ws = await websockets.connect(
self.wss,
open_timeout=None,
ping_interval=30,
ping_timeout=15,
close_timeout=None,
)

@retry(
max_retries=13,
Expand All @@ -78,6 +70,7 @@ async def _connect_to_websocket(self):
async def run(self):
await self.close()
await self._connect_to_websocket()
await self._subscribe()

async def close(self):
if self.ws and self.ws.open:
Expand All @@ -89,65 +82,63 @@ async def close(self):
handled_exceptions=(RuntimeError, ConnectionClosedError),
)
async def receive(self, symbol, timeframe):
async with self._receive_semaphore:
if not self.ws or not self.ws.open:
await self._connect_to_websocket()
await self._connect_to_websocket()

async for message in self.ws:
data = json.loads(message)
async for message in self.ws:
data = json.loads(message)

if self.TOPIC_KEY not in data:
continue
if self.TOPIC_KEY not in data:
continue

topic = data[self.TOPIC_KEY].split(".")
topic = data[self.TOPIC_KEY].split(".")

if symbol.name == topic[2] and timeframe == self.TIMEFRAMES.get(
topic[1]
):
return [
Bar(OHLCV.from_dict(ohlcv), ohlcv.get(self.CONFIRM_KEY))
for ohlcv in data.get(self.DATA_KEY, {})
]
if symbol.name == topic[2] and timeframe == self.TIMEFRAMES.get(topic[1]):
return [
Bar(OHLCV.from_dict(ohlcv), ohlcv.get(self.CONFIRM_KEY))
for ohlcv in data.get(self.DATA_KEY, None)
if ohlcv
]

async def subscribe(self, symbol, timeframe):
async with self._lock:
if (symbol, timeframe) not in self._channels:
self._channels.add((symbol, timeframe))
await self._subscribe(symbol, timeframe)
await self._subscribe()

async def unsubscribe(self, symbol, timeframe):
async with self._lock:
if (symbol, timeframe) in self._channels:
self._channels.remove((symbol, timeframe))
await self._unsubscribe(symbol, timeframe)
await self._unsubscribe()

async def _subscribe(self, symbol, timeframe):
async def _subscribe(self):
if not self.ws or not self.ws.open:
return

channel = f"{self.KLINE_CHANNEL}.{self.INTERVALS[timeframe]}.{symbol.name}"
subscribe_message = {"op": self.SUBSCRIBE_OPERATION, "args": [channel]}
channels = [
f"{self.KLINE_CHANNEL}.{self.INTERVALS[timeframe]}.{symbol.name}"
for symbol, timeframe in self._channels
]
subscribe_message = {"op": self.SUBSCRIBE_OPERATION, "args": channels}

try:
logger.info(f"Subscribe to: {subscribe_message}")
await self.ws.send(json.dumps(subscribe_message))
except Exception as e:
logger.error(f"Failed to send subscribe message: {e}")

async def _unsubscribe(self, symbol, timeframe):
async def _unsubscribe(self):
if not self.ws or not self.ws.open:
return

channel = f"{self.KLINE_CHANNEL}.{self.INTERVALS[timeframe]}.{symbol.name}"
unsubscribe_message = {"op": self.UNSUBSCRIBE_OPERATION, "args": [channel]}
channels = [
f"{self.KLINE_CHANNEL}.{self.INTERVALS[timeframe]}.{symbol.name}"
for symbol, timeframe in self._channels
]
unsubscribe_message = {"op": self.UNSUBSCRIBE_OPERATION, "args": channels}

try:
logger.info(f"Unsubscribe from: {unsubscribe_message}")
await self.ws.send(json.dumps(unsubscribe_message))
except Exception as e:
logger.error(f"Failed to send unsubscribe message: {e}")

async def _resubscribe(self):
async with self._lock:
for symbol, timeframe in self._channels:
await self._subscribe(symbol, timeframe)

0 comments on commit a960e72

Please sign in to comment.