Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
m5l14i11 committed Sep 20, 2024
1 parent d577fb3 commit 484901a
Showing 1 changed file with 21 additions and 13 deletions.
34 changes: 21 additions & 13 deletions infrastructure/event_dispatcher/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,34 +43,42 @@ async def handle_event(self, event: Event, *args, **kwargs) -> None:

for handler, filter_fn in handlers:
if not filter_fn or filter_fn(event):
try:
await self._call_handler(handler, event, *args, **kwargs)
except Exception as e:
self._handle_exception(handler, event, e)
await self._call_handler(handler, event, *args, **kwargs)

async def _call_handler(
self, handler: HandlerType, event: Event, *args, **kwargs
) -> None:
try:
response = await self._execute_handler(handler, event, *args, **kwargs)

self._handle_event_response(event, response)
except Exception as e:
self._handle_event_error(handler, event, e)

async def _execute_handler(
self, handler: HandlerType, event: Event, *args, **kwargs
) -> None:
if asyncio.iscoroutinefunction(handler):
response = await handler(event, *args, **kwargs)
return await handler(event, *args, **kwargs)
else:
response = await asyncio.to_thread(handler, event, *args, **kwargs)
return await asyncio.to_thread(handler, event, *args, **kwargs)

def _handle_event_response(self, event: Event, response: Any) -> None:
if isinstance(event, Query):
event.set_response(response)
elif isinstance(event, Command):
event.executed()

def _handle_exception(
self, handler: HandlerType, event: Event, exception: Exception
def _handle_event_error(
self, handler: HandlerType, event: Event, error: Exception
) -> None:
logger.error(
f"Exception encountered in event {event}:{handler} {exception}. Event added to dead letter queue."
)

if isinstance(event, Command):
event.executed()
elif isinstance(event, Query):
event.set_response(None)

self._dlq.append((event, exception))
self._dlq.append((event, error))

logger.error(
f"Exception encountered in event {event}:{handler} {error}. Event added to dead letter queue."
)

0 comments on commit 484901a

Please sign in to comment.