Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
m5l14i11 committed Sep 20, 2024
1 parent 97182dc commit e033e32
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions infrastructure/event_dispatcher/worker_pool.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
from typing import List

import numpy as np

from core.events.base import Event

from .event_dedup import EventDedup
Expand Down Expand Up @@ -29,9 +31,10 @@ async def dispatch_to_worker(self, event: Event, *args, **kwargs) -> None:
priority_group = self.load_balancer.determine_priority_group(
event.meta.priority
)

group_workers = self._distribute_workers(priority_group)

worker = min(group_workers, key=lambda worker: worker.queue_size)
worker = self._choose_worker(group_workers)

await worker.dispatch(event, *args, **kwargs)

Expand All @@ -46,8 +49,7 @@ def _initialize_workers(self, num_workers):
for _ in range(num_workers * self._num_priority_groups)
]

for worker in self.workers:
asyncio.create_task(worker.run())
asyncio.gather(*(worker.run() for worker in self.workers))

def _distribute_workers(self, priority_group: int) -> List[EventWorker]:
workers_per_group = len(self.workers) // self._num_priority_groups
Expand All @@ -56,4 +58,8 @@ def _distribute_workers(self, priority_group: int) -> List[EventWorker]:

return self.workers[group_start:group_end]

def _choose_worker(self, group_workers: List[EventWorker]) -> EventWorker:
weights = np.array([1 / (worker.queue_size + 1) for worker in group_workers])
prob = weights / weights.sum()

return np.random.choice(group_workers, p=prob)

0 comments on commit e033e32

Please sign in to comment.