Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
m5l14i11 committed Oct 9, 2024
1 parent b0e8384 commit ee8c88b
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 42 deletions.
37 changes: 20 additions & 17 deletions coral/exchange/ws/_bybit.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class BybitWS(AbstractWSExchange):
PING_OPERATION = "ping"
PONG_OPERATION = "pong"

PING_INTERVAL = 20
PING_INTERVAL = 10
PONG_TIMEOUT = 8
AUTH_TIMEOUT = 10

Expand Down Expand Up @@ -64,7 +64,7 @@ def __init__(self, wss: str, api_key: str, api_secret: str):
self._lock = asyncio.Lock()
self._subscriptions = set()
self._auth_event = asyncio.Event()
self._message_queue = asyncio.Queue()
self._topic_queues = {}
self._tasks = set()
self._semaphore = asyncio.Semaphore(1)
self._pong_received = asyncio.Event()
Expand Down Expand Up @@ -163,15 +163,6 @@ def _start_tasks(self):
self._tasks.add(self._ping_task)
self._ping_task.add_done_callback(self._tasks.discard)

async def _cancel_tasks(self):
tasks = list(self._tasks)

for task in tasks:
if not task.done():
task.cancel()

await asyncio.gather(*tasks, return_exceptions=True)

@retry(
max_retries=13,
initial_retry_delay=1,
Expand All @@ -190,9 +181,14 @@ async def _receive(self):
self._auth_event.set()

if self._is_data_message(data):
await self._message_queue.put(
(data.get(self.TOPIC_KEY), data.get(self.DATA_KEY))
)
topic = data.get(self.TOPIC_KEY)

if topic and topic in self._topic_queues:
await self._topic_queues[topic].put(data.get(self.DATA_KEY))
else:
logger.warning(
f"Received data for unsubscribed topic: {topic}"
)
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"Malformed message received: {e}")
except Exception as e:
Expand All @@ -204,17 +200,24 @@ async def subscribe(self, topic: str):
async with self._lock:
await self._send(self.SUBSCRIBE_OPERATION, [topic])
self._subscriptions.add(topic)
self._topic_queues[topic] = asyncio.Queue()

async def unsubscribe(self, topic: str):
async with self._lock:
await self._send(self.UNSUBSCRIBE_OPERATION, [topic])

if topic in self._subscriptions:
self._subscriptions.remove(topic)
if topic in self._topic_queues:
self._topic_queues.pop(topic)

async def get_message(self, topic: str):
if topic not in self._topic_queues:
logger.error(f"No queue available for topic: {topic}")
return None

async def get_message(self):
message = await self._message_queue.get()
self._message_queue.task_done()
message = await self._topic_queues[topic].get()
self._topic_queues[topic].task_done()
return message

def kline_topic(self, timeframe: Timeframe, symbol: Symbol) -> str:
Expand Down
2 changes: 1 addition & 1 deletion core/interfaces/abstract_exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def subscribe(self, topic: str):
pass

@abstractmethod
def get_message(self):
def get_message(self, topic: str):
pass

@abstractmethod
Expand Down
2 changes: 1 addition & 1 deletion core/interfaces/abstract_stream_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ async def unsubscribe(self, ws: AbstractWSExchange):
pass

@abstractmethod
def parse(self, ws: AbstractWSExchange, topic: str, message: Any) -> Any:
def next(self, ws: AbstractWSExchange) -> Any:
pass
3 changes: 1 addition & 2 deletions feed/streams/base/_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ def __aiter__(self) -> "AsyncRealTimeData":

async def __anext__(self):
try:
topic, data = await self.ws.get_message()
return self.strategy.parse(self.ws, topic, data)
return await self.strategy.next(self.ws)
except StopAsyncIteration:
await self.strategy.unsubscribe()
raise
14 changes: 10 additions & 4 deletions feed/streams/strategy/_kline.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,23 @@ def __init__(self, timeframe: Timeframe, symbol: Symbol):
super().__init__()
self.timeframe = timeframe
self.symbol = symbol
self.topic = None

async def subscribe(self, ws: AbstractWSExchange):
await ws.subscribe(ws.kline_topic(self.timeframe, self.symbol))
self.topic = ws.kline_topic(self.timeframe, self.symbol)
await ws.subscribe(self.topic)

async def unsubscribe(self, ws: AbstractWSExchange):
await ws.unsubscribe(ws.kline_topic(self.timeframe, self.symbol))
if self.topic:
await ws.unsubscribe(self.topic)
self.topic = None

def parse(self, ws: AbstractWSExchange, topic: str, message):
if topic != ws.kline_topic(self.timeframe, self.symbol):
async def next(self, ws: AbstractWSExchange):
if not self.topic:
return []

message = await ws.get_message(self.topic)

return [
Bar(OHLCV.from_dict(ohlcv), ohlcv.get("confirm", False))
for ohlcv in message
Expand Down
14 changes: 10 additions & 4 deletions feed/streams/strategy/_liquidation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,21 @@ class LiquidationStreamStrategy(AbstractStreamStrategy):
def __init__(self, symbol: Symbol):
super().__init__()
self.symbol = symbol
self.topic = None

async def subscribe(self, ws: AbstractWSExchange):
await ws.subscribe(ws.liquidation_topic(self.symbol))
self.topic = ws.liquidation_topic(self.symbol)
await ws.subscribe(self.topic)

async def unsubscribe(self, ws: AbstractWSExchange):
await ws.unsubscribe(ws.liquidation_topic(self.symbol))
if self.topic:
await ws.unsubscribe(self.topic)
self.topic = None

def parse(self, ws: AbstractWSExchange, topic: str, message):
if topic != ws.liquidation_topic(self.symbol):
async def next(self, ws: AbstractWSExchange):
if not self.topic:
return []

message = await ws.get_message(self.topic)

return [message]
14 changes: 10 additions & 4 deletions feed/streams/strategy/_order.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,24 @@ class OrderStreamStrategy(AbstractStreamStrategy):
def __init__(self, symbol: Symbol):
super().__init__()
self.symbol = symbol
self.topic = None

async def subscribe(self, ws: AbstractWSExchange):
await ws.auth()
await ws.subscribe(ws.order_topic())
self.topic = ws.order_topic()
await ws.subscribe(self.topic)

async def unsubscribe(self, ws: AbstractWSExchange):
await ws.unsubscribe(ws.order_topic())
if self.topic:
await ws.unsubscribe(self.topic)
self.topic = None

def parse(self, ws: AbstractWSExchange, topic, message):
if topic != ws.order_topic():
async def next(self, ws: AbstractWSExchange):
if not self.topic:
return []

message = await ws.get_message(self.topic)

return [
Order.from_dict(order)
for order in message
Expand Down
16 changes: 11 additions & 5 deletions feed/streams/strategy/_order_book.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,27 @@ def __init__(self, symbol: Symbol, depth: int):
super().__init__()
self.symbol = symbol
self.depth = depth
self.topic = None

async def subscribe(
self,
ws: AbstractWSExchange,
):
await ws.subscribe(ws.order_book_topic(self.symbol, self.depth))
self.topic = ws.order_book_topic(self.symbol, self.depth)
await ws.subscribe(self.topic)

async def unsubscribe(
self,
ws: AbstractWSExchange,
):
await ws.unsubscribe(ws.order_book_topic(self.symbol, self.depth))
if self.topic:
await ws.unsubscribe(self.topic)
self.topic = None

def parse(self, ws: AbstractWSExchange, topic, message):
if topic != ws.order_book_topic(self.symbol, self.depth):
async def next(self, ws: AbstractWSExchange):
if not self.topic:
return []

return []
message = await ws.get_message(self.topic)

return [message]
14 changes: 10 additions & 4 deletions feed/streams/strategy/_position.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,24 @@ class PositionStreamStrategy(AbstractStreamStrategy):
def __init__(self, symbol: Symbol):
super().__init__()
self.symbol = symbol
self.topic = None

async def subscribe(self, ws: AbstractWSExchange):
await ws.auth()
await ws.subscribe(ws.position_topic())
self.topic = ws.position_topic()
await ws.subscribe(self.topic)

async def unsubscribe(self, ws: AbstractWSExchange):
await ws.unsubscribe(ws.position_topic())
if self.topic:
await ws.unsubscribe(self.topic)
self.topic = None

def parse(self, ws: AbstractWSExchange, topic, message):
if topic != ws.position_topic():
async def next(self, ws: AbstractWSExchange):
if not self.topic:
return []

message = await ws.get_message(self.topic)

return [
position
for position in message
Expand Down

0 comments on commit ee8c88b

Please sign in to comment.