diff --git a/infrastructure/event_dispatcher/event_handler.py b/infrastructure/event_dispatcher/event_handler.py index f9ffcb74..95fe76e8 100644 --- a/infrastructure/event_dispatcher/event_handler.py +++ b/infrastructure/event_dispatcher/event_handler.py @@ -43,34 +43,42 @@ async def handle_event(self, event: Event, *args, **kwargs) -> None: for handler, filter_fn in handlers: if not filter_fn or filter_fn(event): - try: - await self._call_handler(handler, event, *args, **kwargs) - except Exception as e: - self._handle_exception(handler, event, e) + await self._call_handler(handler, event, *args, **kwargs) async def _call_handler( self, handler: HandlerType, event: Event, *args, **kwargs + ) -> None: + try: + response = await self._execute_handler(handler, event, *args, **kwargs) + + self._handle_event_response(event, response) + except Exception as e: + self._handle_event_error(handler, event, e) + + async def _execute_handler( + self, handler: HandlerType, event: Event, *args, **kwargs ) -> None: if asyncio.iscoroutinefunction(handler): - response = await handler(event, *args, **kwargs) + return await handler(event, *args, **kwargs) else: - response = await asyncio.to_thread(handler, event, *args, **kwargs) + return await asyncio.to_thread(handler, event, *args, **kwargs) + def _handle_event_response(self, event: Event, response: Any) -> None: if isinstance(event, Query): event.set_response(response) elif isinstance(event, Command): event.executed() - def _handle_exception( - self, handler: HandlerType, event: Event, exception: Exception + def _handle_event_error( + self, handler: HandlerType, event: Event, error: Exception ) -> None: - logger.error( - f"Exception encountered in event {event}:{handler} {exception}. Event added to dead letter queue." - ) - if isinstance(event, Command): event.executed() elif isinstance(event, Query): event.set_response(None) - self._dlq.append((event, exception)) + self._dlq.append((event, error)) + + logger.error( + f"Exception encountered in event {event}:{handler} {error}. Event added to dead letter queue." + )