From e033e32086e086435ba9990539ea39b2926f1aa5 Mon Sep 17 00:00:00 2001 From: m5l14i11 Date: Fri, 20 Sep 2024 17:25:48 +0300 Subject: [PATCH] upd --- infrastructure/event_dispatcher/worker_pool.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/infrastructure/event_dispatcher/worker_pool.py b/infrastructure/event_dispatcher/worker_pool.py index a61527fc..0a83cc4d 100644 --- a/infrastructure/event_dispatcher/worker_pool.py +++ b/infrastructure/event_dispatcher/worker_pool.py @@ -1,6 +1,8 @@ import asyncio from typing import List +import numpy as np + from core.events.base import Event from .event_dedup import EventDedup @@ -29,9 +31,10 @@ async def dispatch_to_worker(self, event: Event, *args, **kwargs) -> None: priority_group = self.load_balancer.determine_priority_group( event.meta.priority ) + group_workers = self._distribute_workers(priority_group) - worker = min(group_workers, key=lambda worker: worker.queue_size) + worker = self._choose_worker(group_workers) await worker.dispatch(event, *args, **kwargs) @@ -46,8 +49,7 @@ def _initialize_workers(self, num_workers): for _ in range(num_workers * self._num_priority_groups) ] - for worker in self.workers: - asyncio.create_task(worker.run()) + asyncio.gather(*(worker.run() for worker in self.workers)) def _distribute_workers(self, priority_group: int) -> List[EventWorker]: workers_per_group = len(self.workers) // self._num_priority_groups @@ -56,4 +58,8 @@ def _distribute_workers(self, priority_group: int) -> List[EventWorker]: return self.workers[group_start:group_end] + def _choose_worker(self, group_workers: List[EventWorker]) -> EventWorker: + weights = np.array([1 / (worker.queue_size + 1) for worker in group_workers]) + prob = weights / weights.sum() + return np.random.choice(group_workers, p=prob)