From 6558039972549b0afc77d111c0c2fc1a3e7d11b8 Mon Sep 17 00:00:00 2001 From: m5l14i11 Date: Mon, 16 Sep 2024 01:34:03 +0300 Subject: [PATCH] upd --- infrastructure/event_dispatcher/event_worker.py | 4 +--- infrastructure/event_dispatcher/worker_pool.py | 3 +++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/infrastructure/event_dispatcher/event_worker.py b/infrastructure/event_dispatcher/event_worker.py index 1bada320..d1ad4c1b 100644 --- a/infrastructure/event_dispatcher/event_worker.py +++ b/infrastructure/event_dispatcher/event_worker.py @@ -17,11 +17,9 @@ def __init__( self.event_handler = event_handler self.cancel_event = cancel_event self.dedup = dedup - self.queue = asyncio.Queue() - self.tasks = asyncio.create_task(self._process_events()) - async def _process_events(self): + async def run(self): async for event, args, kwargs in self._get_event_stream(): await self.event_handler.handle_event(event, *args, **kwargs) diff --git a/infrastructure/event_dispatcher/worker_pool.py b/infrastructure/event_dispatcher/worker_pool.py index 8f8f7b9e..cb48dab6 100644 --- a/infrastructure/event_dispatcher/worker_pool.py +++ b/infrastructure/event_dispatcher/worker_pool.py @@ -29,6 +29,9 @@ def _initialize_workers(self, num_workers): for _ in range(num_workers) ] + for worker in self.workers: + asyncio.create_task(worker.run()) + async def dispatch_to_worker(self, event: Event, *args, **kwargs) -> None: priority_group = self.load_balancer.determine_priority_group( event.meta.priority