Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
m5l14i11 committed Sep 20, 2024
1 parent 484901a commit 639e56e
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 21 deletions.
18 changes: 8 additions & 10 deletions infrastructure/event_dispatcher/event_dedup.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
import asyncio
from typing import Set

from cachetools import TTLCache

from core.events.base import Event


class EventDedup:
def __init__(self):
self._events_in_queue: Set[int] = set()
def __init__(self, ttl: int = 10, maxsize: int = 2048):
self._events_in_queue = TTLCache(maxsize=maxsize, ttl=ttl)
self._lock = asyncio.Lock()

async def add_event(self, event: Event) -> bool:
async def acquire(self, event: Event) -> bool:
async with self._lock:
key = event.meta.key

if key in self._events_in_queue:
return False

self._events_in_queue.add(key)

self._events_in_queue[key] = True
return True

async def remove_event(self, event: Event) -> None:
async def release(self, event: Event) -> None:
async with self._lock:
self._events_in_queue.discard(event.meta.key)
self._events_in_queue.pop(event.meta.key, None)
8 changes: 1 addition & 7 deletions infrastructure/event_dispatcher/event_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from typing import Any, AsyncIterable, Dict, Tuple

from core.events.base import Event
from infrastructure.event_dispatcher.event_dedup import EventDedup

from .event_handler import EventHandler

Expand All @@ -12,11 +11,9 @@ def __init__(
self,
event_handler: EventHandler,
cancel_event: asyncio.Event,
dedup: EventDedup,
):
self._event_handler = event_handler
self._cancel_event = cancel_event
self._dedup = dedup
self._queue = asyncio.Queue()

@property
Expand All @@ -35,13 +32,10 @@ async def _get_event_stream(

yield event, args, kwargs

await self._dedup.remove_event(event)

self._queue.task_done()

async def dispatch(self, event: Event, *args, **kwargs) -> None:
if await self._dedup.add_event(event):
await self._queue.put((event, args, kwargs))
await self._queue.put((event, args, kwargs))

async def wait(self) -> None:
await self._queue.join()
9 changes: 5 additions & 4 deletions infrastructure/event_dispatcher/worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ async def dispatch_to_worker(self, event: Event, *args, **kwargs) -> None:
event.meta.priority
)

group_workers = self._distribute_workers(priority_group)
if await self.dedup.acquire(event):
group_workers = self._distribute_workers(priority_group)

worker = self._choose_worker(group_workers)
worker = self._choose_worker(group_workers)

await worker.dispatch(event, *args, **kwargs)
await worker.dispatch(event, *args, **kwargs)

self.load_balancer.register_event(priority_group)

Expand All @@ -45,7 +46,7 @@ async def wait(self) -> None:

def _initialize_workers(self, num_workers):
self.workers = [
EventWorker(self.event_handler, self.cancel_event, self.dedup)
EventWorker(self.event_handler, self.cancel_event)
for _ in range(num_workers * self._num_priority_groups)
]

Expand Down

0 comments on commit 639e56e

Please sign in to comment.