diff --git a/infrastructure/event_dispatcher/worker_pool.py b/infrastructure/event_dispatcher/worker_pool.py index 0a83cc4d..ecba6ae4 100644 --- a/infrastructure/event_dispatcher/worker_pool.py +++ b/infrastructure/event_dispatcher/worker_pool.py @@ -60,6 +60,10 @@ def _distribute_workers(self, priority_group: int) -> List[EventWorker]: def _choose_worker(self, group_workers: List[EventWorker]) -> EventWorker: weights = np.array([1 / (worker.queue_size + 1) for worker in group_workers]) - prob = weights / weights.sum() + total_weight = sum(weights) - return np.random.choice(group_workers, p=prob) + choice_point = np.random.uniform(0, total_weight) + cum_weights = np.cumsum(weights) + worker_index = np.searchsorted(cum_weights, choice_point) + + return group_workers[worker_index]