Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
m5l14i11 committed Oct 12, 2024
1 parent 78ef70f commit 19148c5
Showing 1 changed file with 26 additions and 11 deletions.
37 changes: 26 additions & 11 deletions reef/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,38 @@ async def _cancel_order(

logging.info(f"Order {order_id} for symbol {symbol.name} canceled.")

async def _fetch_orders(self, datasource):
service = self._datasource_factory.create(datasource, ProtocolType.REST)

return await asyncio.to_thread(service.fetch_all_open_orders)

async def _fetch_open_orders(self):
services = [
DataSourceType.BYBIT,
]
monitor_interval = self.order_config.get("monitor_interval", 10)

for datasource in services:
service = self._datasource_factory.create(datasource, ProtocolType.REST)
orders = await asyncio.to_thread(service.fetch_all_open_orders)
open_orders = []

tasks = [
asyncio.create_task(self._fetch_orders(datasource))
for datasource in services
]

results = await asyncio.gather(*tasks, return_exceptions=True)

for datasource, result in zip(services, results):
open_orders.extend(
[(order_id, symbol, datasource) for order_id, symbol in result]
)

for order_id, symbol in orders:
pq_order = PQOrder(order_id, symbol, datasource)
for order_id, symbol, datasource in open_orders:
pq_order = PQOrder(order_id, symbol, datasource)

if not any(
existing_order.order_id == pq_order.order_id
for existing_order in self._order_queue._queue
):
await self._order_queue.put(pq_order)
if not any(
existing_order.order_id == pq_order.order_id
for existing_order in self._order_queue._queue
):
await self._order_queue.put(pq_order)

await asyncio.sleep(monitor_interval)
await asyncio.sleep(monitor_interval)

0 comments on commit 19148c5

Please sign in to comment.