From 784dd65bf7a0c05210974e7764f52a6246e3556d Mon Sep 17 00:00:00 2001 From: Archento Date: Tue, 25 Jul 2023 12:07:55 +0200 Subject: [PATCH 1/5] feat: add broadcast to agents via protocol --- src/uagents/config.py | 1 + src/uagents/context.py | 49 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/src/uagents/config.py b/src/uagents/config.py index b56e3414..9dd23f9a 100644 --- a/src/uagents/config.py +++ b/src/uagents/config.py @@ -27,6 +27,7 @@ class AgentNetwork(Enum): AGENT_NETWORK = AgentNetwork.FETCHAI_TESTNET AGENTVERSE_URL = "https://agentverse.ai" +ALMANAC_API_URL = "https://staging.agentverse.ai/v1/almanac/" MAILBOX_POLL_INTERVAL_SECONDS = 1.0 DEFAULT_ENVELOPE_TIMEOUT_SECONDS = 30 diff --git a/src/uagents/context.py b/src/uagents/context.py index b88f779b..83d3c165 100644 --- a/src/uagents/context.py +++ b/src/uagents/context.py @@ -1,4 +1,5 @@ from __future__ import annotations + import asyncio import logging import uuid @@ -7,14 +8,15 @@ from typing import Dict, Set, Optional, Callable, Any, Awaitable, Type, TYPE_CHECKING import aiohttp +import requests from cosmpy.aerial.client import LedgerClient from cosmpy.aerial.wallet import LocalWallet -from uagents.config import DEFAULT_ENVELOPE_TIMEOUT_SECONDS +from uagents.config import ALMANAC_API_URL, DEFAULT_ENVELOPE_TIMEOUT_SECONDS from uagents.crypto import Identity from uagents.dispatch import JsonStr, dispatcher from uagents.envelope import Envelope -from uagents.models import Model, ErrorMessage +from uagents.models import ErrorMessage, Model from uagents.resolver import Resolver from uagents.storage import KeyValueStore @@ -97,6 +99,22 @@ def get_message_protocol(self, message_schema_digest) -> Optional[str]: return protocol_digest return None + def get_agents_by_protocol(self, protocol_digest: str) -> list: + if not isinstance(protocol_digest, str) or not protocol_digest.startswith( + "proto:" + ): + self.logger.error(f"Invalid protocol digest: {protocol_digest}") + return [] + response = requests.post( + url=ALMANAC_API_URL + "search", + json={"text": protocol_digest[6:]}, + timeout=DEFAULT_ENVELOPE_TIMEOUT_SECONDS, + ) + if response.status_code == 200: + data = response.json() + return [agent["address"] for agent in data if agent["status"] == "local"] + return [] + async def send( self, destination: str, @@ -104,7 +122,7 @@ async def send( timeout: Optional[int] = DEFAULT_ENVELOPE_TIMEOUT_SECONDS, ): schema_digest = Model.build_schema_digest(message) - await self.send_raw( + await self._send_raw( destination, message.json(), schema_digest, @@ -112,7 +130,30 @@ async def send( timeout=timeout, ) - async def send_raw( + async def broadcast( + self, + destination: str, + message: Model, + timeout: Optional[int] = DEFAULT_ENVELOPE_TIMEOUT_SECONDS, + ): + agents = self.get_agents_by_protocol(destination) + if not agents: + self.logger.error(f"No active agents found for: {destination}") + return + schema_digest = Model.build_schema_digest(message) + for address in agents: + try: + await self._send_raw( + address, + message.json(), + schema_digest, + message_type=type(message), + timeout=timeout, + ) + except Exception as e: + self.logger.error(f"Error sending message to {address}: {e}") + + async def _send_raw( self, destination: str, json_message: JsonStr, From 84802d4bd67ce4dc8eeb9cfe9d23afbea86a524e Mon Sep 17 00:00:00 2001 From: Archento Date: Tue, 8 Aug 2023 17:23:39 +0200 Subject: [PATCH 2/5] fix review comments --- src/uagents/config.py | 2 +- src/uagents/context.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/uagents/config.py b/src/uagents/config.py index 9dd23f9a..aeed3362 100644 --- a/src/uagents/config.py +++ b/src/uagents/config.py @@ -27,7 +27,7 @@ class AgentNetwork(Enum): AGENT_NETWORK = AgentNetwork.FETCHAI_TESTNET AGENTVERSE_URL = "https://agentverse.ai" -ALMANAC_API_URL = "https://staging.agentverse.ai/v1/almanac/" +ALMANAC_API_URL = AGENTVERSE_URL + "/v1/almanac/" MAILBOX_POLL_INTERVAL_SECONDS = 1.0 DEFAULT_ENVELOPE_TIMEOUT_SECONDS = 30 diff --git a/src/uagents/context.py b/src/uagents/context.py index 83d3c165..85cb057a 100644 --- a/src/uagents/context.py +++ b/src/uagents/context.py @@ -122,7 +122,7 @@ async def send( timeout: Optional[int] = DEFAULT_ENVELOPE_TIMEOUT_SECONDS, ): schema_digest = Model.build_schema_digest(message) - await self._send_raw( + await self.send_raw( destination, message.json(), schema_digest, @@ -143,7 +143,7 @@ async def broadcast( schema_digest = Model.build_schema_digest(message) for address in agents: try: - await self._send_raw( + await self.send_raw( address, message.json(), schema_digest, @@ -153,7 +153,7 @@ async def broadcast( except Exception as e: self.logger.error(f"Error sending message to {address}: {e}") - async def _send_raw( + async def send_raw( self, destination: str, json_message: JsonStr, From b76f7678d323cb2dce09955c86bf90ee6477b159 Mon Sep 17 00:00:00 2001 From: Archento Date: Wed, 9 Aug 2023 14:48:02 +0200 Subject: [PATCH 3/5] add more specific exception --- src/uagents/context.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/uagents/context.py b/src/uagents/context.py index 85cb057a..3d07f86a 100644 --- a/src/uagents/context.py +++ b/src/uagents/context.py @@ -150,8 +150,8 @@ async def broadcast( message_type=type(message), timeout=timeout, ) - except Exception as e: - self.logger.error(f"Error sending message to {address}: {e}") + except asyncio.exceptions.CancelledError: + self.logger.error(f"Error sending message to {address}") async def send_raw( self, From d433cd21554b0ef134e3eac1eb693c5e23df9a16 Mon Sep 17 00:00:00 2001 From: Archento Date: Mon, 21 Aug 2023 18:09:10 +0200 Subject: [PATCH 4/5] fix for review comments --- src/uagents/config.py | 1 + src/uagents/context.py | 61 ++++++++++++++++++++++++++++++------------ 2 files changed, 45 insertions(+), 17 deletions(-) diff --git a/src/uagents/config.py b/src/uagents/config.py index aeed3362..4989b265 100644 --- a/src/uagents/config.py +++ b/src/uagents/config.py @@ -31,6 +31,7 @@ class AgentNetwork(Enum): MAILBOX_POLL_INTERVAL_SECONDS = 1.0 DEFAULT_ENVELOPE_TIMEOUT_SECONDS = 30 +DEFAULT_SEARCH_LIMIT = 100 def parse_endpoint_config( diff --git a/src/uagents/context.py b/src/uagents/context.py index 3d07f86a..863942ec 100644 --- a/src/uagents/context.py +++ b/src/uagents/context.py @@ -5,14 +5,28 @@ import uuid from dataclasses import dataclass from time import time -from typing import Dict, Set, Optional, Callable, Any, Awaitable, Type, TYPE_CHECKING +from typing import ( + Dict, + List, + Set, + Optional, + Callable, + Any, + Awaitable, + Type, + TYPE_CHECKING, +) import aiohttp import requests from cosmpy.aerial.client import LedgerClient from cosmpy.aerial.wallet import LocalWallet -from uagents.config import ALMANAC_API_URL, DEFAULT_ENVELOPE_TIMEOUT_SECONDS +from uagents.config import ( + ALMANAC_API_URL, + DEFAULT_ENVELOPE_TIMEOUT_SECONDS, + DEFAULT_SEARCH_LIMIT, +) from uagents.crypto import Identity from uagents.dispatch import JsonStr, dispatcher from uagents.envelope import Envelope @@ -99,12 +113,14 @@ def get_message_protocol(self, message_schema_digest) -> Optional[str]: return protocol_digest return None - def get_agents_by_protocol(self, protocol_digest: str) -> list: - if not isinstance(protocol_digest, str) or not protocol_digest.startswith( - "proto:" - ): + def get_agents_by_protocol( + self, protocol_digest: str, limit: Optional[int] = None + ) -> List[str]: + if not isinstance( + protocol_digest, str + ) or not protocol_digest.startswith("proto:"): self.logger.error(f"Invalid protocol digest: {protocol_digest}") - return [] + raise ValueError("Invalid protocol digest") response = requests.post( url=ALMANAC_API_URL + "search", json={"text": protocol_digest[6:]}, @@ -112,7 +128,12 @@ def get_agents_by_protocol(self, protocol_digest: str) -> list: ) if response.status_code == 200: data = response.json() - return [agent["address"] for agent in data if agent["status"] == "local"] + agents = [ + agent["address"] + for agent in data + if agent["status"] == "local" + ] + return agents[:limit] return [] async def send( @@ -130,28 +151,34 @@ async def send( timeout=timeout, ) - async def broadcast( + async def experimental_broadcast( self, - destination: str, + destination_protocol: str, message: Model, + limit: Optional[int] = DEFAULT_SEARCH_LIMIT, timeout: Optional[int] = DEFAULT_ENVELOPE_TIMEOUT_SECONDS, ): - agents = self.get_agents_by_protocol(destination) + agents = self.get_agents_by_protocol(destination_protocol, limit=limit) if not agents: - self.logger.error(f"No active agents found for: {destination}") + self.logger.error( + f"No active agents found for: {destination_protocol}" + ) return schema_digest = Model.build_schema_digest(message) - for address in agents: - try: - await self.send_raw( + futures = await asyncio.gather( + *[ + self.send_raw( address, message.json(), schema_digest, message_type=type(message), timeout=timeout, ) - except asyncio.exceptions.CancelledError: - self.logger.error(f"Error sending message to {address}") + for address in agents + ] + ) + self.logger.debug(f"Sent {len(futures)} messages") + async def send_raw( self, From f2cb03f0677cf4a10b579baa2d91ef9419c9e12c Mon Sep 17 00:00:00 2001 From: Archento Date: Mon, 21 Aug 2023 18:14:53 +0200 Subject: [PATCH 5/5] run black --- src/uagents/context.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/src/uagents/context.py b/src/uagents/context.py index 863942ec..685b50a3 100644 --- a/src/uagents/context.py +++ b/src/uagents/context.py @@ -116,9 +116,9 @@ def get_message_protocol(self, message_schema_digest) -> Optional[str]: def get_agents_by_protocol( self, protocol_digest: str, limit: Optional[int] = None ) -> List[str]: - if not isinstance( - protocol_digest, str - ) or not protocol_digest.startswith("proto:"): + if not isinstance(protocol_digest, str) or not protocol_digest.startswith( + "proto:" + ): self.logger.error(f"Invalid protocol digest: {protocol_digest}") raise ValueError("Invalid protocol digest") response = requests.post( @@ -128,11 +128,7 @@ def get_agents_by_protocol( ) if response.status_code == 200: data = response.json() - agents = [ - agent["address"] - for agent in data - if agent["status"] == "local" - ] + agents = [agent["address"] for agent in data if agent["status"] == "local"] return agents[:limit] return [] @@ -160,9 +156,7 @@ async def experimental_broadcast( ): agents = self.get_agents_by_protocol(destination_protocol, limit=limit) if not agents: - self.logger.error( - f"No active agents found for: {destination_protocol}" - ) + self.logger.error(f"No active agents found for: {destination_protocol}") return schema_digest = Model.build_schema_digest(message) futures = await asyncio.gather( @@ -179,7 +173,6 @@ async def experimental_broadcast( ) self.logger.debug(f"Sent {len(futures)} messages") - async def send_raw( self, destination: str,