diff --git a/reef/_actor.py b/reef/_actor.py index 73960a30..34414ecf 100644 --- a/reef/_actor.py +++ b/reef/_actor.py @@ -107,23 +107,38 @@ async def _cancel_order( logging.info(f"Order {order_id} for symbol {symbol.name} canceled.") + async def _fetch_orders(self, datasource): + service = self._datasource_factory.create(datasource, ProtocolType.REST) + + return await asyncio.to_thread(service.fetch_all_open_orders) + async def _fetch_open_orders(self): services = [ DataSourceType.BYBIT, ] monitor_interval = self.order_config.get("monitor_interval", 10) - for datasource in services: - service = self._datasource_factory.create(datasource, ProtocolType.REST) - orders = await asyncio.to_thread(service.fetch_all_open_orders) + open_orders = [] + + tasks = [ + asyncio.create_task(self._fetch_orders(datasource)) + for datasource in services + ] + + results = await asyncio.gather(*tasks, return_exceptions=True) + + for datasource, result in zip(services, results): + open_orders.extend( + [(order_id, symbol, datasource) for order_id, symbol in result] + ) - for order_id, symbol in orders: - pq_order = PQOrder(order_id, symbol, datasource) + for order_id, symbol, datasource in open_orders: + pq_order = PQOrder(order_id, symbol, datasource) - if not any( - existing_order.order_id == pq_order.order_id - for existing_order in self._order_queue._queue - ): - await self._order_queue.put(pq_order) + if not any( + existing_order.order_id == pq_order.order_id + for existing_order in self._order_queue._queue + ): + await self._order_queue.put(pq_order) - await asyncio.sleep(monitor_interval) + await asyncio.sleep(monitor_interval)