Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
m5l14i11 committed Oct 11, 2024
1 parent 212bbc0 commit 6bb1680
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
5 changes: 4 additions & 1 deletion core/actors/collector/_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def __init__(self):
self._producers = []
self._consumers = []
self._tasks = set()
self._stop_event = asyncio.Event()

async def start(self, msg: Event):
for producer in self._producers:
Expand All @@ -27,6 +28,8 @@ async def start(self, msg: Event):
task.add_done_callback(self._tasks.discard)

async def stop(self):
self._stop_event.set()

await self._queue.put(STOP)
await self._queue.join()

Expand All @@ -53,7 +56,7 @@ async def _run_producer(self, producer, msg):
await self._queue.put(STOP)

async def _run_consumer(self, consumer):
while True:
while not self._stop_event.is_set():
data = await self._queue.get()

if data is STOP:
Expand Down
5 changes: 4 additions & 1 deletion reef/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(
self._datasource_factory = datasource_factory
self._tasks = set()
self.order_config = config_service.get("order")
self._stop_event = asyncio.Event()

def on_start(self):
worker_task = asyncio.create_task(self._process_orders())
Expand All @@ -43,6 +44,8 @@ def on_start(self):
self._tasks.add(poll_task)

def on_stop(self):
self._stop_event.set()

for task in list(self._tasks):
task.cancel()

Expand All @@ -68,7 +71,7 @@ async def _process_orders(self):
monitor_interval = self.order_config.get("monitor_interval", 10)

try:
while True:
while not self._stop_event.is_set():
pq_order = await self._order_queue.get()
current_time = time.time()

Expand Down

0 comments on commit 6bb1680

Please sign in to comment.