diff --git a/config.default.ini b/config.default.ini index e663d60d..b6e6127a 100644 --- a/config.default.ini +++ b/config.default.ini @@ -22,7 +22,7 @@ dom = 15 [order] order_expiration_time = 8 monitor_interval = 3 -untracked_interval = 12 +batch_size = 3 [portfolio] risk_per_trade = 0.0004 diff --git a/core/actors/collector/_data.py b/core/actors/collector/_data.py index 5875251b..a6e07c5f 100644 --- a/core/actors/collector/_data.py +++ b/core/actors/collector/_data.py @@ -34,7 +34,8 @@ async def stop(self): await self._queue.join() for task in list(self._tasks): - task.cancel() + if not task.done(): + task.cancel() await self.wait_for_completion() diff --git a/reef/_actor.py b/reef/_actor.py index e02a879a..ff057e2e 100644 --- a/reef/_actor.py +++ b/reef/_actor.py @@ -47,7 +47,8 @@ def on_stop(self): self._stop_event.set() for task in list(self._tasks): - task.cancel() + if not task.done(): + task.cancel() self._tasks.clear() @@ -69,27 +70,43 @@ async def on_receive(self, event: NewMarketOrderReceived): async def _process_orders(self): expiration_time = self.order_config.get("expiration_time", 10) monitor_interval = self.order_config.get("monitor_interval", 10) + batch_size = self.order_config.get("batch_size", 4) try: while not self._stop_event.is_set(): - pq_order = await self._order_queue.get() + queue_size = self._order_queue.qsize() + batch_size = min(queue_size, batch_size) + + orders = [ + await asyncio.wait_for( + self._order_queue.get(), timeout=monitor_interval + ) + for _ in range(batch_size) + ] + + if not orders: + continue + current_time = time.time() + tasks = [] - if current_time - pq_order.timestamp > expiration_time: - await self._cancel_order( - pq_order.order_id, pq_order.symbol, pq_order.datasource - ) - else: - sleep_time = expiration_time - (current_time - pq_order.timestamp) + for pq_order in orders: + time_elapsed = current_time - pq_order.timestamp - if sleep_time > 0: - await asyncio.sleep(sleep_time) + if time_elapsed > expiration_time: + tasks.append( + self._cancel_order( + pq_order.order_id, pq_order.symbol, pq_order.datasource + ) + ) + else: + sleep_time = expiration_time - time_elapsed + tasks.append(self._delayed_cancel_order(pq_order, sleep_time)) - await self._cancel_order( - pq_order.order_id, pq_order.symbol, pq_order.datasource - ) + await asyncio.gather(*tasks) - self._order_queue.task_done() + for _ in orders: + self._order_queue.task_done() await asyncio.sleep(monitor_interval) except asyncio.CancelledError: @@ -106,6 +123,14 @@ async def _cancel_order( logging.info(f"Order {order_id} for symbol {symbol.name} canceled.") + async def _delayed_cancel_order(self, pq_order: PQOrder, sleep_time: int): + if sleep_time > 0: + await asyncio.sleep(sleep_time) + + await self._cancel_order( + pq_order.order_id, pq_order.symbol, pq_order.datasource + ) + async def _fetch_open_orders(self): services = [ DataSourceType.BYBIT,