Skip to content

Commit

Permalink
feat: new almanac and name service registration flow (#114)
Browse files Browse the repository at this point in the history
Co-authored-by: James Riehl <[email protected]>
Co-authored-by: James Riehl <[email protected]>
  • Loading branch information
3 people committed Jul 27, 2023
1 parent 01ae9d1 commit b8569a5
Show file tree
Hide file tree
Showing 9 changed files with 304 additions and 224 deletions.
11 changes: 2 additions & 9 deletions examples/08-local-network-interaction/agent1.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from uagents.setup import fund_agent_if_low
from uagents.resolver import get_agent_address
from uagents import Agent, Context, Model


Expand All @@ -11,21 +10,15 @@ class Message(Model):


bob = Agent(
name="agent bob",
name="bob",
port=8001,
seed="agent bob secret phrase",
seed="bob secret phrase",
endpoint=["http://127.0.0.1:8001/submit"],
)

fund_agent_if_low(bob.wallet.address())


@bob.on_event("startup")
async def register_name(ctx: Context):
await bob.register_name()
print("agent bob registered address: ", get_agent_address(ctx.name))


@bob.on_message(model=Message)
async def message_handler(ctx: Context, sender: str, msg: Message):
ctx.logger.info(f"Received message from {sender}: {msg.message}")
Expand Down
4 changes: 3 additions & 1 deletion examples/08-local-network-interaction/agent2.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ class Message(Model):
message: str


RECIPIENT_ADDRESS = "agent1q2kxet3vh0scsf0sm7y2erzz33cve6tv5uk63x64upw5g68kr0chkv7hw50"

alice = Agent(
name="alice",
port=8000,
Expand All @@ -18,7 +20,7 @@ class Message(Model):

@alice.on_interval(period=2.0)
async def send_message(ctx: Context):
await ctx.send("agent bob", Message(message="Hello there bob."))
await ctx.send(RECIPIENT_ADDRESS, Message(message="Hello there bob."))


@alice.on_message(model=Message)
Expand Down
44 changes: 44 additions & 0 deletions examples/13-agent-name-service/agent1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from cosmpy.aerial.wallet import LocalWallet

from uagents.network import get_ledger, get_name_service_contract
from uagents.setup import fund_agent_if_low
from uagents import Agent, Context, Model


# NOTE: Run agent1.py before running agent2.py


class Message(Model):
message: str


bob = Agent(
name="bob-0",
seed="agent bob-0 secret phrase",
port=8001,
endpoint=["http://localhost:8001/submit"],
)

ledger = get_ledger()
my_wallet = LocalWallet.from_unsafe_seed("registration test wallet")
name_service_contract = get_name_service_contract()
DOMAIN = "agent"

for wallet in [my_wallet, bob.wallet]:
fund_agent_if_low(wallet.address())


@bob.on_event("startup")
async def register_agent_name(ctx: Context):
await name_service_contract.register(
ledger, my_wallet, ctx.address, ctx.name, DOMAIN
)


@bob.on_message(model=Message)
async def message_handler(ctx: Context, sender: str, msg: Message):
ctx.logger.info(f"Received message from {sender}: {msg.message}")


if __name__ == "__main__":
bob.run()
28 changes: 28 additions & 0 deletions examples/13-agent-name-service/agent2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from uagents.setup import fund_agent_if_low
from uagents import Agent, Context, Model


class Message(Model):
message: str


alice = Agent(
name="alice-0",
seed="agent alice-0 secret phrase",
port=8000,
endpoint=["http://localhost:8000/submit"],
)


fund_agent_if_low(alice.wallet.address())


@alice.on_interval(period=5)
async def alice_interval_handler(ctx: Context):
bob_name = "bob-0.agent"
ctx.logger.info(f"Sending message to {bob_name}...")
await ctx.send(bob_name, Message(message="Hello there bob."))


if __name__ == "__main__":
alice.run()
131 changes: 48 additions & 83 deletions src/uagents/agent.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import asyncio
import functools
from typing import Dict, List, Optional, Set, Union, Type, Tuple, Any
from typing import Dict, List, Optional, Set, Union, Type, Tuple, Any, Coroutine
import uuid
import requests

from cosmpy.aerial.wallet import LocalWallet, PrivateKey
from cosmpy.crypto.address import Address
from cosmpy.aerial.contract.cosmwasm import create_cosmwasm_execute_msg
from cosmpy.aerial.client import prepare_and_broadcast_basic_transaction
from cosmpy.aerial.tx import Transaction


from uagents.asgi import ASGIServer
from uagents.context import (
Expand All @@ -28,14 +24,10 @@
from uagents.network import (
get_ledger,
get_almanac_contract,
get_service_contract,
wait_for_tx_to_complete,
)
from uagents.mailbox import MailboxClient
from uagents.config import (
CONTRACT_ALMANAC,
REGISTRATION_FEE,
REGISTRATION_DENOM,
MIN_REGISTRATION_TIME,
LEDGER_PREFIX,
parse_endpoint_config,
Expand All @@ -56,6 +48,11 @@ async def _run_interval(func: IntervalCallback, ctx: Context, period: float):
await asyncio.sleep(period)


async def _delay(coroutine: Coroutine, delay_seconds: float):
await asyncio.sleep(delay_seconds)
await coroutine


async def _handle_error(ctx: Context, destination: str, msg: ErrorMessage):
await ctx.send(destination, msg)

Expand Down Expand Up @@ -114,7 +111,6 @@ def __init__(

self._ledger = get_ledger()
self._almanac_contract = get_almanac_contract()
self._service_contract = get_service_contract()
self._storage = KeyValueStore(self.address[0:16])
self._interval_handlers: List[Tuple[IntervalCallback, float]] = []
self._interval_messages: Set[str] = set()
Expand Down Expand Up @@ -234,69 +230,56 @@ def update_loop(self, loop):
def update_queries(self, queries):
self._queries = queries

async def _register(self, ctx: Context):
agent_balance = ctx.ledger.query_bank_balance(Address(ctx.wallet.address()))

if agent_balance < REGISTRATION_FEE:
async def register(self):
if self._endpoints is None:
self._logger.warning(
f"I do not have enough funds to register on Almanac contract\
\nFund using wallet address: {self.wallet.address()}"
"I have no endpoint and cannot receive external messages"
)
return

signature = self.sign_registration()

self._logger.info("Registering on almanac contract...")

transaction = Transaction()

almanac_msg = self._almanac_contract.get_registration_msg(
self.protocols, self._endpoints, signature, self.address
)
# register if not yet registered or registration is about to expire
# or anything has changed from the last registration
if (
not self._almanac_contract.is_registered(self.address)
or self._schedule_registration() < MIN_REGISTRATION_TIME
or self._endpoints != self._almanac_contract.get_endpoints(self.address)
or list(self.protocols.keys())
!= self._almanac_contract.get_protocols(self.address)
):
agent_balance = self._ledger.query_bank_balance(
Address(self.wallet.address())
)

transaction.add_message(
create_cosmwasm_execute_msg(
ctx.wallet.address(),
CONTRACT_ALMANAC,
almanac_msg,
funds=f"{REGISTRATION_FEE}{REGISTRATION_DENOM}",
if agent_balance < REGISTRATION_FEE:
self._logger.warning(
f"I do not have enough funds to register on Almanac contract\
\nFund using wallet address: {self.wallet.address()}"
)
return
self._logger.info("Registering on almanac contract...")
signature = self.sign_registration()
await self._almanac_contract.register(
self._ledger,
self.wallet,
self.address,
list(self.protocols.keys()),
self._endpoints,
signature,
)
)
self._logger.info("Registering on almanac contract...complete")
else:
self._logger.info("Almanac registration is up to date!")

transaction = prepare_and_broadcast_basic_transaction(
ctx.ledger, transaction, ctx.wallet
async def _registration_loop(self):
await self.register()
# schedule the next registration
self._loop.create_task(
_delay(self._registration_loop(), self._schedule_registration())
)
await wait_for_tx_to_complete(transaction.tx_hash)
self._logger.info("Registering on almanac contract...complete")

def _schedule_registration(self):
return self._almanac_contract.get_expiry(self.address)

async def register_name(self):
self._logger.info("Registering name...")

if not self._almanac_contract.is_registered(self.address):
self._logger.warning(
f"Agent {self.name} needs to be registered in almanac contract to register its name"
)
return

transaction = self._service_contract.get_registration_tx(
self.name, str(self.wallet.address()), self.address
)

if transaction is None:
self._logger.error(
f"Please select another name, {self.name} is owned by another address"
)

return
transaction = prepare_and_broadcast_basic_transaction(
self._ledger, transaction, self.wallet
)
await wait_for_tx_to_complete(transaction.tx_hash)
self._logger.info("Registering name...complete")

def on_interval(
self,
period: float,
Expand Down Expand Up @@ -397,6 +380,7 @@ async def handle_message(
await self._message_queue.put((schema_digest, sender, message, session))

async def _startup(self):
await self._registration_loop()
for handler in self._on_startup:
await handler(self._ctx)

Expand Down Expand Up @@ -426,25 +410,6 @@ def start_background_tasks(self):
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)

# start the contract registration update loop
if self._endpoints is not None:
if (
not self._almanac_contract.is_registered(self.address)
or self._schedule_registration() < MIN_REGISTRATION_TIME
or self._endpoints != self._almanac_contract.get_endpoints(self.address)
):
self._loop.create_task(
_run_interval(
self._register, self._ctx, self._schedule_registration()
)
)
else:
self._logger.info("Registration up to date!")
else:
self._logger.warning(
"I have no endpoint and won't be able to receive external messages"
)

def run(self):
self.setup()
try:
Expand Down Expand Up @@ -516,7 +481,7 @@ def __init__(
endpoint: Optional[Union[str, List[str], Dict[str, dict]]] = None,
):
self._loop = asyncio.get_event_loop_policy().get_event_loop()
self._agents = []
self._agents: List[Agent] = []
self._endpoints = parse_endpoint_config(endpoint)
self._port = port or 8000
self._queries: Dict[str, asyncio.Future] = {}
Expand All @@ -527,7 +492,7 @@ def __init__(
def add(self, agent: Agent):
agent.update_loop(self._loop)
agent.update_queries(self._queries)
if agent.mailbox is not None:
if agent.agentverse["use_mailbox"]:
self._use_mailbox = True
else:
agent.update_endpoints(self._endpoints)
Expand All @@ -537,7 +502,7 @@ def run(self):
tasks = []
for agent in self._agents:
agent.setup()
if agent.mailbox is not None:
if agent.agentverse["use_mailbox"]:
tasks.append(
self._loop.create_task(
agent.mailbox_client.process_deletion_queue()
Expand Down
Loading

0 comments on commit b8569a5

Please sign in to comment.