diff --git a/infrastructure/event_dispatcher/event_dedup.py b/infrastructure/event_dispatcher/event_dedup.py index f1c6f12e..70c52c94 100644 --- a/infrastructure/event_dispatcher/event_dedup.py +++ b/infrastructure/event_dispatcher/event_dedup.py @@ -1,25 +1,23 @@ import asyncio -from typing import Set + +from cachetools import TTLCache from core.events.base import Event class EventDedup: - def __init__(self): - self._events_in_queue: Set[int] = set() + def __init__(self, ttl: int = 10, maxsize: int = 2048): + self._events_in_queue = TTLCache(maxsize=maxsize, ttl=ttl) self._lock = asyncio.Lock() - async def add_event(self, event: Event) -> bool: + async def acquire(self, event: Event) -> bool: async with self._lock: key = event.meta.key - if key in self._events_in_queue: return False - - self._events_in_queue.add(key) - + self._events_in_queue[key] = True return True - async def remove_event(self, event: Event) -> None: + async def release(self, event: Event) -> None: async with self._lock: - self._events_in_queue.discard(event.meta.key) + self._events_in_queue.pop(event.meta.key, None) diff --git a/infrastructure/event_dispatcher/event_worker.py b/infrastructure/event_dispatcher/event_worker.py index e8114769..df1c874a 100644 --- a/infrastructure/event_dispatcher/event_worker.py +++ b/infrastructure/event_dispatcher/event_worker.py @@ -2,7 +2,6 @@ from typing import Any, AsyncIterable, Dict, Tuple from core.events.base import Event -from infrastructure.event_dispatcher.event_dedup import EventDedup from .event_handler import EventHandler @@ -12,11 +11,9 @@ def __init__( self, event_handler: EventHandler, cancel_event: asyncio.Event, - dedup: EventDedup, ): self._event_handler = event_handler self._cancel_event = cancel_event - self._dedup = dedup self._queue = asyncio.Queue() @property @@ -35,13 +32,10 @@ async def _get_event_stream( yield event, args, kwargs - await self._dedup.remove_event(event) - self._queue.task_done() async def dispatch(self, event: Event, *args, **kwargs) -> None: - if await self._dedup.add_event(event): - await self._queue.put((event, args, kwargs)) + await self._queue.put((event, args, kwargs)) async def wait(self) -> None: await self._queue.join() diff --git a/infrastructure/event_dispatcher/worker_pool.py b/infrastructure/event_dispatcher/worker_pool.py index ecba6ae4..5eb1aa67 100644 --- a/infrastructure/event_dispatcher/worker_pool.py +++ b/infrastructure/event_dispatcher/worker_pool.py @@ -32,11 +32,12 @@ async def dispatch_to_worker(self, event: Event, *args, **kwargs) -> None: event.meta.priority ) - group_workers = self._distribute_workers(priority_group) + if await self.dedup.acquire(event): + group_workers = self._distribute_workers(priority_group) - worker = self._choose_worker(group_workers) + worker = self._choose_worker(group_workers) - await worker.dispatch(event, *args, **kwargs) + await worker.dispatch(event, *args, **kwargs) self.load_balancer.register_event(priority_group) @@ -45,7 +46,7 @@ async def wait(self) -> None: def _initialize_workers(self, num_workers): self.workers = [ - EventWorker(self.event_handler, self.cancel_event, self.dedup) + EventWorker(self.event_handler, self.cancel_event) for _ in range(num_workers * self._num_priority_groups) ]