diff --git a/infrastructure/event_dispatcher/event_dispatcher.py b/infrastructure/event_dispatcher/event_dispatcher.py index 515e9629..f2f3c4a3 100644 --- a/infrastructure/event_dispatcher/event_dispatcher.py +++ b/infrastructure/event_dispatcher/event_dispatcher.py @@ -22,33 +22,27 @@ def __call__(cls, *args, **kwargs): class EventDispatcher(metaclass=SingletonMeta): def __init__(self, config_service: AbstractConfig): - self.event_handler = EventHandler() - self.cancel_event = asyncio.Event() - self.config = config_service.get("bus") + self._event_handler = EventHandler() + self._cancel_event = asyncio.Event() self._store = EventStore(config_service) + self._command_worker_pool = None self._query_worker_pool = None self._event_worker_pool = None @property def command_worker_pool(self): - if self._command_worker_pool is None: - self._command_worker_pool = self._create_worker_pool() - return self._command_worker_pool + return self._get_worker_pool("_command_worker_pool") @property def query_worker_pool(self): - if self._query_worker_pool is None: - self._query_worker_pool = self._create_worker_pool() - return self._query_worker_pool + return self._get_worker_pool("_query_worker_pool") @property def event_worker_pool(self): - if self._event_worker_pool is None: - self._event_worker_pool = self._create_worker_pool() - return self._event_worker_pool + return self._get_worker_pool("_event_worker_pool") def register( self, @@ -56,10 +50,10 @@ def register( handler: Callable, filter_func: Optional[Callable[[Event], bool]] = None, ) -> None: - self.event_handler.register(event_class, handler, filter_func) + self._event_handler.register(event_class, handler, filter_func) def unregister(self, event_class: Type[Event], handler: Callable) -> None: - self.event_handler.unregister(event_class, handler) + self._event_handler.unregister(event_class, handler) async def execute(self, command: Command, *args, **kwargs) -> None: await self._dispatch_to_poll(command, self.command_worker_pool, *args, **kwargs) @@ -96,7 +90,7 @@ async def _dispatch_to_poll( self, event: Type[Event], worker_pool: WorkerPool, *args, **kwargs ) -> None: if isinstance(event, EventEnded): - self.cancel_event.set() + self._cancel_event.set() else: await worker_pool.dispatch_to_worker(event, *args, **kwargs) @@ -104,6 +98,12 @@ def _create_worker_pool(self) -> WorkerPool: return WorkerPool( self.config["num_workers"], self.config["piority_groups"], - self.event_handler, - self.cancel_event, + self._event_handler, + self._cancel_event, ) + + def _get_worker_pool(self, pool_attr: str) -> WorkerPool: + if getattr(self, pool_attr) is None: + setattr(self, pool_attr, self._create_worker_pool()) + + return getattr(self, pool_attr)