Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
m5l14i11 committed Sep 16, 2024
1 parent 6558039 commit 00a0e25
Showing 1 changed file with 17 additions and 17 deletions.
34 changes: 17 additions & 17 deletions infrastructure/event_dispatcher/event_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,44 +22,38 @@ def __call__(cls, *args, **kwargs):

class EventDispatcher(metaclass=SingletonMeta):
def __init__(self, config_service: AbstractConfig):
self.event_handler = EventHandler()
self.cancel_event = asyncio.Event()

self.config = config_service.get("bus")

self._event_handler = EventHandler()
self._cancel_event = asyncio.Event()
self._store = EventStore(config_service)

self._command_worker_pool = None
self._query_worker_pool = None
self._event_worker_pool = None

@property
def command_worker_pool(self):
if self._command_worker_pool is None:
self._command_worker_pool = self._create_worker_pool()
return self._command_worker_pool
return self._get_worker_pool("_command_worker_pool")

@property
def query_worker_pool(self):
if self._query_worker_pool is None:
self._query_worker_pool = self._create_worker_pool()
return self._query_worker_pool
return self._get_worker_pool("_query_worker_pool")

@property
def event_worker_pool(self):
if self._event_worker_pool is None:
self._event_worker_pool = self._create_worker_pool()
return self._event_worker_pool
return self._get_worker_pool("_event_worker_pool")

def register(
self,
event_class: Type[Event],
handler: Callable,
filter_func: Optional[Callable[[Event], bool]] = None,
) -> None:
self.event_handler.register(event_class, handler, filter_func)
self._event_handler.register(event_class, handler, filter_func)

def unregister(self, event_class: Type[Event], handler: Callable) -> None:
self.event_handler.unregister(event_class, handler)
self._event_handler.unregister(event_class, handler)

async def execute(self, command: Command, *args, **kwargs) -> None:
await self._dispatch_to_poll(command, self.command_worker_pool, *args, **kwargs)
Expand Down Expand Up @@ -96,14 +90,20 @@ async def _dispatch_to_poll(
self, event: Type[Event], worker_pool: WorkerPool, *args, **kwargs
) -> None:
if isinstance(event, EventEnded):
self.cancel_event.set()
self._cancel_event.set()
else:
await worker_pool.dispatch_to_worker(event, *args, **kwargs)

def _create_worker_pool(self) -> WorkerPool:
return WorkerPool(
self.config["num_workers"],
self.config["piority_groups"],
self.event_handler,
self.cancel_event,
self._event_handler,
self._cancel_event,
)

def _get_worker_pool(self, pool_attr: str) -> WorkerPool:
if getattr(self, pool_attr) is None:
setattr(self, pool_attr, self._create_worker_pool())

return getattr(self, pool_attr)

0 comments on commit 00a0e25

Please sign in to comment.