Skip to content

Commit

Permalink
feat: add context.send_raw function (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
jrriehl committed Jun 21, 2023
1 parent 716ae96 commit ef883e6
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 13 deletions.
10 changes: 6 additions & 4 deletions src/uagents/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from uagents.crypto import is_user_address
from uagents.dispatch import dispatcher
from uagents.envelope import Envelope
from uagents.models import Model, ErrorMessage
from uagents.query import enclose_response
from uagents.models import ErrorMessage
from uagents.query import enclose_response_raw


HOST = "0.0.0.0"
Expand Down Expand Up @@ -163,12 +163,14 @@ async def __call__(self, scope, receive, send):

# wait for any queries to be resolved
if expects_response:
response_msg: Model = await self._queries[env.sender]
response_msg, schema_digest = await self._queries[env.sender]
if env.expires is not None:
if datetime.now() > datetime.fromtimestamp(env.expires):
response_msg = ErrorMessage(error="Query envelope expired")
sender = env.target
response = enclose_response(response_msg, sender, str(env.session))
response = enclose_response_raw(
response_msg, schema_digest, sender, str(env.session)
)
else:
response = "{}"

Expand Down
25 changes: 19 additions & 6 deletions src/uagents/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from uagents.config import DEFAULT_ENVELOPE_TIMEOUT_SECONDS
from uagents.crypto import Identity
from uagents.dispatch import dispatcher
from uagents.dispatch import JsonStr, dispatcher
from uagents.envelope import Envelope
from uagents.models import Model, ErrorMessage
from uagents.resolver import Resolver
Expand Down Expand Up @@ -103,10 +103,23 @@ async def send(
message: Model,
timeout: Optional[int] = DEFAULT_ENVELOPE_TIMEOUT_SECONDS,
):
# convert the message into object form
json_message = message.json()
schema_digest = Model.build_schema_digest(message)
await self.send_raw(
destination,
message.json(),
schema_digest,
message_type=type(message),
timeout=timeout,
)

async def send_raw(
self,
destination: str,
json_message: JsonStr,
schema_digest: str,
message_type: Optional[Type[Model]] = None,
timeout: Optional[int] = DEFAULT_ENVELOPE_TIMEOUT_SECONDS,
):
# check if this message is a reply
if (
self._message_received is not None
Expand All @@ -118,7 +131,7 @@ async def send(
# ensure the reply is valid
if schema_digest not in self._replies[received.schema_digest]:
self._logger.exception(
f"Outgoing message {type(message)} "
f"Outgoing message {message_type or ''} "
f"is not a valid reply to {received.message}"
)
return
Expand All @@ -127,7 +140,7 @@ async def send(
if self._message_received is None and self._interval_messages:
if schema_digest not in self._interval_messages:
self._logger.exception(
f"Outgoing message {type(message)} is not a valid interval message"
f"Outgoing message {message_type} is not a valid interval message"
)
return

Expand All @@ -140,7 +153,7 @@ async def send(

# handle queries waiting for a response
if destination in self._queries:
self._queries[destination].set_result(message)
self._queries[destination].set_result((json_message, schema_digest))
del self._queries[destination]
return

Expand Down
12 changes: 10 additions & 2 deletions src/uagents/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from uagents.config import get_logger
from uagents.crypto import generate_user_address
from uagents.dispatch import JsonStr
from uagents.envelope import Envelope
from uagents.models import Model
from uagents.resolver import Resolver, AlmanacResolver
Expand Down Expand Up @@ -68,12 +69,19 @@ async def query(


def enclose_response(message: Model, sender: str, session: str) -> str:
schema_digest = Model.build_schema_digest(message)
return enclose_response_raw(message.json(), schema_digest, sender, session)


def enclose_response_raw(
json_message: JsonStr, schema_digest: str, sender: str, session: str
) -> str:
response_env = Envelope(
version=1,
sender=sender,
target="",
session=session,
schema_digest=Model.build_schema_digest(message),
schema_digest=schema_digest,
)
response_env.encode_payload(message.json())
response_env.encode_payload(json_message)
return response_env.json()
4 changes: 3 additions & 1 deletion tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ def setUp(self) -> None:
async def mock_process_sync_message(self, sender: str, msg: Model):
while True:
if sender in self.agent._server._queries:
self.agent._server._queries[sender].set_result(msg)
self.agent._server._queries[sender].set_result(
(msg.json(), Model.build_schema_digest(msg))
)
return

async def test_message_success(self):
Expand Down

0 comments on commit ef883e6

Please sign in to comment.