Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
m5l14i11 committed Oct 9, 2024
1 parent db1703e commit b0e8384
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 13 deletions.
44 changes: 34 additions & 10 deletions coral/exchange/ws/_bybit.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class BybitWS(AbstractWSExchange):
PONG_OPERATION = "pong"

PING_INTERVAL = 20
PONG_TIMEOUT = 18
PONG_TIMEOUT = 8
AUTH_TIMEOUT = 10

INTERVALS = {
Timeframe.ONE_MINUTE: 1,
Expand Down Expand Up @@ -80,13 +81,13 @@ async def _connect(self):
ping_timeout=self.PONG_TIMEOUT,
close_timeout=10,
user_agent_header=user_agent,
max_queue=2,
max_queue=4,
)

await self._wait_for_ws(timeout=5)
logger.info("WebSocket connection established.")
await self._handle_reconnect()
self._start_tasks()
await self._handle_reconnect()
except Exception as e:
logger.error(f"Failed to connect to WebSocket: {e}")
raise ConnectionError("Failed to connect to WebSocket") from None
Expand All @@ -108,7 +109,6 @@ async def close(self):
logger.error(f"Failed to close WebSocket properly: {e}")
finally:
self.ws = None
self._tasks.clear()

async def auth(self):
expires = int(time.time() * 10**3) + 3 * 10**3
Expand All @@ -120,20 +120,32 @@ async def auth(self):
).hexdigest()

await self._send(self.AUTH_OPERATION, [self.api_key, expires, sign])
await self._auth_event.wait()
await asyncio.wait_for(self._auth_event.wait(), timeout=self.AUTH_TIMEOUT)

async def _manage_ping_pong(self):
max_ping_retries = 3
retries = 0

while True:
try:
await self.ws.send(json.dumps({self.OP_KEY: self.PING_OPERATION}))
await asyncio.wait_for(
self._pong_received.wait(), timeout=self.PONG_TIMEOUT
)
retries = 0
self._pong_received.clear()
except asyncio.TimeoutError:
logger.warning("Pong response timed out. Reconnecting WebSocket.")
await self.connect()
return
retries += 1
logger.warning(
f"Pong response timed out. Attempt {retries}/{max_ping_retries}."
)

if retries >= max_ping_retries:
logger.error("Max retries exceeded. Reconnecting WebSocket.")
await self.connect()
return
else:
await asyncio.sleep(self.PING_INTERVAL)
except Exception as e:
logger.error(f"Ping/Pong management error: {e}")
await self.close()
Expand All @@ -151,6 +163,15 @@ def _start_tasks(self):
self._tasks.add(self._ping_task)
self._ping_task.add_done_callback(self._tasks.discard)

async def _cancel_tasks(self):
tasks = list(self._tasks)

for task in tasks:
if not task.done():
task.cancel()

await asyncio.gather(*tasks, return_exceptions=True)

@retry(
max_retries=13,
initial_retry_delay=1,
Expand All @@ -176,7 +197,7 @@ async def _receive(self):
logger.error(f"Malformed message received: {e}")
except Exception as e:
logger.exception(f"Unexpected error while receiving message: {e}")
await self._handle_reconnect()
await self.connect()
raise ConnectionError("WebSocket connection error.") from None

async def subscribe(self, topic: str):
Expand Down Expand Up @@ -225,7 +246,7 @@ async def _send(self, operation, args, timeout=5):
await asyncio.wait_for(self.ws.send(json.dumps(message)), timeout=timeout)

if operation != self.AUTH_OPERATION:
logger.info(f"Subscribed {operation.capitalize()} to: {message}")
logger.info(f"{operation.capitalize()} to: {message}")
except asyncio.TimeoutError:
logger.error("Subscription request timed out")
except Exception as e:
Expand Down Expand Up @@ -267,6 +288,9 @@ async def _resubscribe_all(self):
logger.info(f"Resubscribed to all topics: {list(self._subscriptions)}")

async def _handle_reconnect(self):
if not self.ws:
return

if self._auth_event.is_set():
await self.auth()

Expand Down
4 changes: 1 addition & 3 deletions core/actors/_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ def collector(self) -> "DataCollector":
return self._collector

def on_stop(self):
task = asyncio.create_task(self.collector.stop())
self._tasks.add(task)
task.add_done_callback(self._tasks.discard)
asyncio.create_task(self.collector.stop())

def pre_receive(self, msg) -> bool:
return FeedPolicy.should_process(self, msg)
Expand Down

0 comments on commit b0e8384

Please sign in to comment.