From 97182dc72ab4852c5e707ef3285f1a5a78b56863 Mon Sep 17 00:00:00 2001 From: m5l14i11 Date: Fri, 20 Sep 2024 16:38:47 +0300 Subject: [PATCH] upd --- .../event_dispatcher/event_worker.py | 28 ++++++++------- .../event_dispatcher/worker_pool.py | 34 +++++++++++++------ 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/infrastructure/event_dispatcher/event_worker.py b/infrastructure/event_dispatcher/event_worker.py index d1ad4c1b..e8114769 100644 --- a/infrastructure/event_dispatcher/event_worker.py +++ b/infrastructure/event_dispatcher/event_worker.py @@ -14,30 +14,34 @@ def __init__( cancel_event: asyncio.Event, dedup: EventDedup, ): - self.event_handler = event_handler - self.cancel_event = cancel_event - self.dedup = dedup - self.queue = asyncio.Queue() + self._event_handler = event_handler + self._cancel_event = cancel_event + self._dedup = dedup + self._queue = asyncio.Queue() + + @property + def queue_size(self): + return self._queue.qsize() async def run(self): async for event, args, kwargs in self._get_event_stream(): - await self.event_handler.handle_event(event, *args, **kwargs) + await self._event_handler.handle_event(event, *args, **kwargs) async def _get_event_stream( self, ) -> AsyncIterable[Tuple[Event, Tuple[Any], Dict[str, Any]]]: - while not self.cancel_event.is_set(): - event, args, kwargs = await self.queue.get() + while not self._cancel_event.is_set(): + event, args, kwargs = await self._queue.get() yield event, args, kwargs - await self.dedup.remove_event(event) + await self._dedup.remove_event(event) - self.queue.task_done() + self._queue.task_done() async def dispatch(self, event: Event, *args, **kwargs) -> None: - if await self.dedup.add_event(event): - await self.queue.put((event, args, kwargs)) + if await self._dedup.add_event(event): + await self._queue.put((event, args, kwargs)) async def wait(self) -> None: - await self.queue.join() + await self._queue.join() diff --git a/infrastructure/event_dispatcher/worker_pool.py b/infrastructure/event_dispatcher/worker_pool.py index cb48dab6..a61527fc 100644 --- a/infrastructure/event_dispatcher/worker_pool.py +++ b/infrastructure/event_dispatcher/worker_pool.py @@ -1,4 +1,5 @@ import asyncio +from typing import List from core.events.base import Event @@ -21,25 +22,38 @@ def __init__( self.dedup = EventDedup() self.event_handler = event_handler self.cancel_event = cancel_event + self._num_priority_groups = num_piority_groups self._initialize_workers(num_workers) - def _initialize_workers(self, num_workers): - self.workers = [ - EventWorker(self.event_handler, self.cancel_event, self.dedup) - for _ in range(num_workers) - ] - - for worker in self.workers: - asyncio.create_task(worker.run()) - async def dispatch_to_worker(self, event: Event, *args, **kwargs) -> None: priority_group = self.load_balancer.determine_priority_group( event.meta.priority ) - worker = self.workers[priority_group % len(self.workers)] + group_workers = self._distribute_workers(priority_group) + + worker = min(group_workers, key=lambda worker: worker.queue_size) await worker.dispatch(event, *args, **kwargs) + self.load_balancer.register_event(priority_group) async def wait(self) -> None: await asyncio.gather(*(worker.wait() for worker in self.workers)) + + def _initialize_workers(self, num_workers): + self.workers = [ + EventWorker(self.event_handler, self.cancel_event, self.dedup) + for _ in range(num_workers * self._num_priority_groups) + ] + + for worker in self.workers: + asyncio.create_task(worker.run()) + + def _distribute_workers(self, priority_group: int) -> List[EventWorker]: + workers_per_group = len(self.workers) // self._num_priority_groups + group_start = priority_group * workers_per_group + group_end = group_start + workers_per_group + + return self.workers[group_start:group_end] + +