Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
m5l14i11 committed Sep 20, 2024
1 parent 025c203 commit 97182dc
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 22 deletions.
28 changes: 16 additions & 12 deletions infrastructure/event_dispatcher/event_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,34 @@ def __init__(
cancel_event: asyncio.Event,
dedup: EventDedup,
):
self.event_handler = event_handler
self.cancel_event = cancel_event
self.dedup = dedup
self.queue = asyncio.Queue()
self._event_handler = event_handler
self._cancel_event = cancel_event
self._dedup = dedup
self._queue = asyncio.Queue()

@property
def queue_size(self):
return self._queue.qsize()

async def run(self):
async for event, args, kwargs in self._get_event_stream():
await self.event_handler.handle_event(event, *args, **kwargs)
await self._event_handler.handle_event(event, *args, **kwargs)

async def _get_event_stream(
self,
) -> AsyncIterable[Tuple[Event, Tuple[Any], Dict[str, Any]]]:
while not self.cancel_event.is_set():
event, args, kwargs = await self.queue.get()
while not self._cancel_event.is_set():
event, args, kwargs = await self._queue.get()

yield event, args, kwargs

await self.dedup.remove_event(event)
await self._dedup.remove_event(event)

self.queue.task_done()
self._queue.task_done()

async def dispatch(self, event: Event, *args, **kwargs) -> None:
if await self.dedup.add_event(event):
await self.queue.put((event, args, kwargs))
if await self._dedup.add_event(event):
await self._queue.put((event, args, kwargs))

async def wait(self) -> None:
await self.queue.join()
await self._queue.join()
34 changes: 24 additions & 10 deletions infrastructure/event_dispatcher/worker_pool.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from typing import List

from core.events.base import Event

Expand All @@ -21,25 +22,38 @@ def __init__(
self.dedup = EventDedup()
self.event_handler = event_handler
self.cancel_event = cancel_event
self._num_priority_groups = num_piority_groups
self._initialize_workers(num_workers)

def _initialize_workers(self, num_workers):
self.workers = [
EventWorker(self.event_handler, self.cancel_event, self.dedup)
for _ in range(num_workers)
]

for worker in self.workers:
asyncio.create_task(worker.run())

async def dispatch_to_worker(self, event: Event, *args, **kwargs) -> None:
priority_group = self.load_balancer.determine_priority_group(
event.meta.priority
)
worker = self.workers[priority_group % len(self.workers)]
group_workers = self._distribute_workers(priority_group)

worker = min(group_workers, key=lambda worker: worker.queue_size)

await worker.dispatch(event, *args, **kwargs)

self.load_balancer.register_event(priority_group)

async def wait(self) -> None:
await asyncio.gather(*(worker.wait() for worker in self.workers))

def _initialize_workers(self, num_workers):
self.workers = [
EventWorker(self.event_handler, self.cancel_event, self.dedup)
for _ in range(num_workers * self._num_priority_groups)
]

for worker in self.workers:
asyncio.create_task(worker.run())

def _distribute_workers(self, priority_group: int) -> List[EventWorker]:
workers_per_group = len(self.workers) // self._num_priority_groups
group_start = priority_group * workers_per_group
group_end = group_start + workers_per_group

return self.workers[group_start:group_end]


0 comments on commit 97182dc

Please sign in to comment.