Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
m5l14i11 committed Oct 11, 2024
1 parent 4d5089f commit 59862a8
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 16 deletions.
2 changes: 1 addition & 1 deletion config.default.ini
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ dom = 15
[order]
order_expiration_time = 8
monitor_interval = 3
untracked_interval = 12
batch_size = 3

[portfolio]
risk_per_trade = 0.0004
Expand Down
3 changes: 2 additions & 1 deletion core/actors/collector/_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ async def stop(self):
await self._queue.join()

for task in list(self._tasks):
task.cancel()
if not task.done():
task.cancel()

await self.wait_for_completion()

Expand Down
53 changes: 39 additions & 14 deletions reef/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ def on_stop(self):
self._stop_event.set()

for task in list(self._tasks):
task.cancel()
if not task.done():
task.cancel()

self._tasks.clear()

Expand All @@ -69,27 +70,43 @@ async def on_receive(self, event: NewMarketOrderReceived):
async def _process_orders(self):
expiration_time = self.order_config.get("expiration_time", 10)
monitor_interval = self.order_config.get("monitor_interval", 10)
batch_size = self.order_config.get("batch_size", 4)

try:
while not self._stop_event.is_set():
pq_order = await self._order_queue.get()
queue_size = self._order_queue.qsize()
batch_size = min(queue_size, batch_size)

orders = [
await asyncio.wait_for(
self._order_queue.get(), timeout=monitor_interval
)
for _ in range(batch_size)
]

if not orders:
continue

current_time = time.time()
tasks = []

if current_time - pq_order.timestamp > expiration_time:
await self._cancel_order(
pq_order.order_id, pq_order.symbol, pq_order.datasource
)
else:
sleep_time = expiration_time - (current_time - pq_order.timestamp)
for pq_order in orders:
time_elapsed = current_time - pq_order.timestamp

if sleep_time > 0:
await asyncio.sleep(sleep_time)
if time_elapsed > expiration_time:
tasks.append(
self._cancel_order(
pq_order.order_id, pq_order.symbol, pq_order.datasource
)
)
else:
sleep_time = expiration_time - time_elapsed
tasks.append(self._delayed_cancel_order(pq_order, sleep_time))

await self._cancel_order(
pq_order.order_id, pq_order.symbol, pq_order.datasource
)
await asyncio.gather(*tasks)

self._order_queue.task_done()
for _ in orders:
self._order_queue.task_done()

await asyncio.sleep(monitor_interval)
except asyncio.CancelledError:
Expand All @@ -106,6 +123,14 @@ async def _cancel_order(

logging.info(f"Order {order_id} for symbol {symbol.name} canceled.")

async def _delayed_cancel_order(self, pq_order: PQOrder, sleep_time: int):
if sleep_time > 0:
await asyncio.sleep(sleep_time)

await self._cancel_order(
pq_order.order_id, pq_order.symbol, pq_order.datasource
)

async def _fetch_open_orders(self):
services = [
DataSourceType.BYBIT,
Expand Down

0 comments on commit 59862a8

Please sign in to comment.