diff --git a/infrastructure/event_dispatcher/worker_pool.py b/infrastructure/event_dispatcher/worker_pool.py index a61527fc..0a83cc4d 100644 --- a/infrastructure/event_dispatcher/worker_pool.py +++ b/infrastructure/event_dispatcher/worker_pool.py @@ -1,6 +1,8 @@ import asyncio from typing import List +import numpy as np + from core.events.base import Event from .event_dedup import EventDedup @@ -29,9 +31,10 @@ async def dispatch_to_worker(self, event: Event, *args, **kwargs) -> None: priority_group = self.load_balancer.determine_priority_group( event.meta.priority ) + group_workers = self._distribute_workers(priority_group) - worker = min(group_workers, key=lambda worker: worker.queue_size) + worker = self._choose_worker(group_workers) await worker.dispatch(event, *args, **kwargs) @@ -46,8 +49,7 @@ def _initialize_workers(self, num_workers): for _ in range(num_workers * self._num_priority_groups) ] - for worker in self.workers: - asyncio.create_task(worker.run()) + asyncio.gather(*(worker.run() for worker in self.workers)) def _distribute_workers(self, priority_group: int) -> List[EventWorker]: workers_per_group = len(self.workers) // self._num_priority_groups @@ -56,4 +58,8 @@ def _distribute_workers(self, priority_group: int) -> List[EventWorker]: return self.workers[group_start:group_end] + def _choose_worker(self, group_workers: List[EventWorker]) -> EventWorker: + weights = np.array([1 / (worker.queue_size + 1) for worker in group_workers]) + prob = weights / weights.sum() + return np.random.choice(group_workers, p=prob)