From c3e5cf054c3cdf6636b7b48212db6f0f72bb885d Mon Sep 17 00:00:00 2001 From: James Riehl Date: Thu, 24 Aug 2023 16:11:40 +0100 Subject: [PATCH] chore: handle exceptions in message handlers --- python/src/uagents/agent.py | 28 ++++++++++++++++++++++---- python/src/uagents/mailbox.py | 37 +++++++++++++++++++++++------------ 2 files changed, 48 insertions(+), 17 deletions(-) diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index af611c60..d6c1a2e3 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -2,6 +2,7 @@ import functools from typing import Dict, List, Optional, Set, Union, Type, Tuple, Any, Coroutine import uuid +from pydantic import ValidationError import requests from cosmpy.aerial.wallet import LocalWallet, PrivateKey @@ -386,11 +387,21 @@ async def handle_message( async def _startup(self): await self._registration_loop() for handler in self._on_startup: - await handler(self._ctx) + try: + await handler(self._ctx) + except OSError as ex: + self._logger.exception(f"OS Error in startup handler: {ex}") + except RuntimeError as ex: + self._logger.exception(f"Runtime Error in startup handler: {ex}") async def _shutdown(self): for handler in self._on_shutdown: - await handler(self._ctx) + try: + await handler(self._ctx) + except OSError as ex: + self._logger.exception(f"OS Error in shutdown handler: {ex}") + except RuntimeError as ex: + self._logger.exception(f"Runtime Error in shutdown handler: {ex}") def setup(self): # register the internal agent protocol @@ -436,7 +447,11 @@ async def _process_message_queue(self): continue # parse the received message - recovered = model_class.parse_raw(message) + try: + recovered = model_class.parse_raw(message) + except ValidationError as ex: + self._logger.warning(f"Unable to parse message: {ex}") + continue context = Context( self._identity.address, @@ -475,7 +490,12 @@ async def _process_message_queue(self): continue if handler is not None: - await handler(context, sender, recovered) + try: + await handler(context, sender, recovered) + except OSError as ex: + self._logger.exception(f"OS Error in message handler: {ex}") + except RuntimeError as ex: + self._logger.exception(f"Runtime Error in message handler: {ex}") class Bureau: diff --git a/python/src/uagents/mailbox.py b/python/src/uagents/mailbox.py index 90553bf3..7524ad11 100644 --- a/python/src/uagents/mailbox.py +++ b/python/src/uagents/mailbox.py @@ -84,17 +84,26 @@ async def _handle_envelope(self, payload: dict): async def process_deletion_queue(self): async with aiohttp.ClientSession() as session: while True: - env_payload = await self._envelopes_to_delete.get() - env_url = f"{self.http_prefix}://{self.base_url}/v1/mailbox/{env_payload['uuid']}" - self._logger.debug(f"Deleting message: {env_payload}") - async with session.delete( - env_url, - headers={"Authorization": f"token {self._access_token}"}, - ) as resp: - if resp.status != 200: - self._logger.exception( - f"Failed to delete envelope from inbox: {(await resp.text())}" - ) + try: + env = await self._envelopes_to_delete.get() + env_url = ( + f"{self.http_prefix}://{self.base_url}/v1/mailbox/{env['uuid']}" + ) + self._logger.debug(f"Deleting message: {env}") + async with session.delete( + env_url, + headers={"Authorization": f"token {self._access_token}"}, + ) as resp: + if resp.status != 200: + self._logger.exception( + f"Failed to delete envelope from inbox: {(await resp.text())}" + ) + except ClientConnectorError as ex: + self._logger.warning(f"Failed to connect to mailbox server: {ex}") + except Exception as ex: + self._logger.exception( + f"Got exception while processing deletion queue: {ex}" + ) async def _poll_server(self): async with aiohttp.ClientSession() as session: @@ -133,10 +142,12 @@ async def _open_websocket_connection(self): await self._handle_envelope(msg["payload"]) except websockets.exceptions.ConnectionClosedError: - pass + self._logger.warning("Mailbox connection closed") + self._access_token = None except ConnectionRefusedError: - pass + self._logger.warning("Mailbox connection refused") + self._access_token = None async def _get_access_token(self): async with aiohttp.ClientSession() as session: