From c3e5cf054c3cdf6636b7b48212db6f0f72bb885d Mon Sep 17 00:00:00 2001 From: James Riehl Date: Thu, 24 Aug 2023 16:11:40 +0100 Subject: [PATCH 1/5] chore: handle exceptions in message handlers --- python/src/uagents/agent.py | 28 ++++++++++++++++++++++---- python/src/uagents/mailbox.py | 37 +++++++++++++++++++++++------------ 2 files changed, 48 insertions(+), 17 deletions(-) diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index af611c60..d6c1a2e3 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -2,6 +2,7 @@ import functools from typing import Dict, List, Optional, Set, Union, Type, Tuple, Any, Coroutine import uuid +from pydantic import ValidationError import requests from cosmpy.aerial.wallet import LocalWallet, PrivateKey @@ -386,11 +387,21 @@ async def handle_message( async def _startup(self): await self._registration_loop() for handler in self._on_startup: - await handler(self._ctx) + try: + await handler(self._ctx) + except OSError as ex: + self._logger.exception(f"OS Error in startup handler: {ex}") + except RuntimeError as ex: + self._logger.exception(f"Runtime Error in startup handler: {ex}") async def _shutdown(self): for handler in self._on_shutdown: - await handler(self._ctx) + try: + await handler(self._ctx) + except OSError as ex: + self._logger.exception(f"OS Error in shutdown handler: {ex}") + except RuntimeError as ex: + self._logger.exception(f"Runtime Error in shutdown handler: {ex}") def setup(self): # register the internal agent protocol @@ -436,7 +447,11 @@ async def _process_message_queue(self): continue # parse the received message - recovered = model_class.parse_raw(message) + try: + recovered = model_class.parse_raw(message) + except ValidationError as ex: + self._logger.warning(f"Unable to parse message: {ex}") + continue context = Context( self._identity.address, @@ -475,7 +490,12 @@ async def _process_message_queue(self): continue if handler is not None: - await handler(context, sender, recovered) + try: + await handler(context, sender, recovered) + except OSError as ex: + self._logger.exception(f"OS Error in message handler: {ex}") + except RuntimeError as ex: + self._logger.exception(f"Runtime Error in message handler: {ex}") class Bureau: diff --git a/python/src/uagents/mailbox.py b/python/src/uagents/mailbox.py index 90553bf3..7524ad11 100644 --- a/python/src/uagents/mailbox.py +++ b/python/src/uagents/mailbox.py @@ -84,17 +84,26 @@ async def _handle_envelope(self, payload: dict): async def process_deletion_queue(self): async with aiohttp.ClientSession() as session: while True: - env_payload = await self._envelopes_to_delete.get() - env_url = f"{self.http_prefix}://{self.base_url}/v1/mailbox/{env_payload['uuid']}" - self._logger.debug(f"Deleting message: {env_payload}") - async with session.delete( - env_url, - headers={"Authorization": f"token {self._access_token}"}, - ) as resp: - if resp.status != 200: - self._logger.exception( - f"Failed to delete envelope from inbox: {(await resp.text())}" - ) + try: + env = await self._envelopes_to_delete.get() + env_url = ( + f"{self.http_prefix}://{self.base_url}/v1/mailbox/{env['uuid']}" + ) + self._logger.debug(f"Deleting message: {env}") + async with session.delete( + env_url, + headers={"Authorization": f"token {self._access_token}"}, + ) as resp: + if resp.status != 200: + self._logger.exception( + f"Failed to delete envelope from inbox: {(await resp.text())}" + ) + except ClientConnectorError as ex: + self._logger.warning(f"Failed to connect to mailbox server: {ex}") + except Exception as ex: + self._logger.exception( + f"Got exception while processing deletion queue: {ex}" + ) async def _poll_server(self): async with aiohttp.ClientSession() as session: @@ -133,10 +142,12 @@ async def _open_websocket_connection(self): await self._handle_envelope(msg["payload"]) except websockets.exceptions.ConnectionClosedError: - pass + self._logger.warning("Mailbox connection closed") + self._access_token = None except ConnectionRefusedError: - pass + self._logger.warning("Mailbox connection refused") + self._access_token = None async def _get_access_token(self): async with aiohttp.ClientSession() as session: From 4ac5c1a4499027e5f135bbeaf448f291a5d82c30 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Wed, 30 Aug 2023 16:46:56 +0100 Subject: [PATCH 2/5] feat: handle exceptions on context send --- python/src/uagents/context.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/python/src/uagents/context.py b/python/src/uagents/context.py index 685b50a3..d42b8ec2 100644 --- a/python/src/uagents/context.py +++ b/python/src/uagents/context.py @@ -242,11 +242,18 @@ async def send_raw( env.encode_payload(json_message) env.sign(self._identity) - async with aiohttp.ClientSession() as session: - async with session.post( - endpoint, headers={"content-type": "application/json"}, data=env.json() - ) as resp: - success = resp.status == 200 + try: + async with aiohttp.ClientSession() as session: + async with session.post( + endpoint, + headers={"content-type": "application/json"}, + data=env.json(), + ) as resp: + success = resp.status == 200 + except aiohttp.ClientConnectorError as ex: + self._logger.exception(f"Failed to connect to {endpoint}: {ex}") + except Exception as ex: + self._logger.exception(f"Failed to send message to {destination}: {ex}") if not success: self._logger.exception( From d011ca3a25dfc1a262bc927f2bfca7e0655b69a4 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Thu, 31 Aug 2023 09:33:02 +0100 Subject: [PATCH 3/5] chore: move success check and catch all exceptions --- python/src/uagents/agent.py | 6 ++++++ python/src/uagents/context.py | 9 ++++----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index d6c1a2e3..c155abb9 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -393,6 +393,8 @@ async def _startup(self): self._logger.exception(f"OS Error in startup handler: {ex}") except RuntimeError as ex: self._logger.exception(f"Runtime Error in startup handler: {ex}") + except Exception as ex: + self._logger.exception(f"Exception in startup handler: {ex}") async def _shutdown(self): for handler in self._on_shutdown: @@ -402,6 +404,8 @@ async def _shutdown(self): self._logger.exception(f"OS Error in shutdown handler: {ex}") except RuntimeError as ex: self._logger.exception(f"Runtime Error in shutdown handler: {ex}") + except Exception as ex: + self._logger.exception(f"Exception in shutdown handler: {ex}") def setup(self): # register the internal agent protocol @@ -496,6 +500,8 @@ async def _process_message_queue(self): self._logger.exception(f"OS Error in message handler: {ex}") except RuntimeError as ex: self._logger.exception(f"Runtime Error in message handler: {ex}") + except Exception as ex: + self._logger.exception(f"Exception in message handler: {ex}") class Bureau: diff --git a/python/src/uagents/context.py b/python/src/uagents/context.py index d42b8ec2..8ccd92db 100644 --- a/python/src/uagents/context.py +++ b/python/src/uagents/context.py @@ -250,12 +250,11 @@ async def send_raw( data=env.json(), ) as resp: success = resp.status == 200 + if not success: + self._logger.exception( + f"Unable to send envelope to {destination_address} @ {endpoint}" + ) except aiohttp.ClientConnectorError as ex: self._logger.exception(f"Failed to connect to {endpoint}: {ex}") except Exception as ex: self._logger.exception(f"Failed to send message to {destination}: {ex}") - - if not success: - self._logger.exception( - f"Unable to send envelope to {destination_address} @ {endpoint}" - ) From e4340f2a767312e56f8aaf12896198bc41a864d2 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Thu, 31 Aug 2023 09:54:18 +0100 Subject: [PATCH 4/5] chore: catch all exceptions in interval handlers --- python/src/uagents/agent.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index c155abb9..9b24b13e 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -42,10 +42,12 @@ async def _run_interval(func: IntervalCallback, ctx: Context, period: float): while True: try: await func(ctx) - except OSError: - ctx.logger.exception("OS Error in interval handler") - except RuntimeError: - ctx.logger.exception("Runtime Error in interval handler") + except OSError as ex: + ctx.logger.exception(f"OS Error in interval handler: {ex}") + except RuntimeError as ex: + ctx.logger.exception(f"Runtime Error in interval handler: {ex}") + except Exception as ex: + ctx.logger.exception(f"Exception in interval handler: {ex}") await asyncio.sleep(period) @@ -397,6 +399,7 @@ async def _startup(self): self._logger.exception(f"Exception in startup handler: {ex}") async def _shutdown(self): + print("shutting down") for handler in self._on_shutdown: try: await handler(self._ctx) From 86f32addb2262e43652fe1e41ad6889f345fe052 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Thu, 31 Aug 2023 10:00:21 +0100 Subject: [PATCH 5/5] chore: remove print statement --- python/src/uagents/agent.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index 9b24b13e..6e3ce246 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -399,7 +399,6 @@ async def _startup(self): self._logger.exception(f"Exception in startup handler: {ex}") async def _shutdown(self): - print("shutting down") for handler in self._on_shutdown: try: await handler(self._ctx)