Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add context.send_raw function #107

Merged
merged 1 commit into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/uagents/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from uagents.crypto import is_user_address
from uagents.dispatch import dispatcher
from uagents.envelope import Envelope
from uagents.models import Model, ErrorMessage
from uagents.query import enclose_response
from uagents.models import ErrorMessage
from uagents.query import enclose_response_raw


HOST = "0.0.0.0"
Expand Down Expand Up @@ -163,12 +163,14 @@ async def __call__(self, scope, receive, send):

# wait for any queries to be resolved
if expects_response:
response_msg: Model = await self._queries[env.sender]
response_msg, schema_digest = await self._queries[env.sender]
if env.expires is not None:
if datetime.now() > datetime.fromtimestamp(env.expires):
response_msg = ErrorMessage(error="Query envelope expired")
sender = env.target
response = enclose_response(response_msg, sender, str(env.session))
response = enclose_response_raw(
response_msg, schema_digest, sender, str(env.session)
)
else:
response = "{}"

Expand Down
25 changes: 19 additions & 6 deletions src/uagents/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from uagents.config import DEFAULT_ENVELOPE_TIMEOUT_SECONDS
from uagents.crypto import Identity
from uagents.dispatch import dispatcher
from uagents.dispatch import JsonStr, dispatcher
from uagents.envelope import Envelope
from uagents.models import Model, ErrorMessage
from uagents.resolver import Resolver
Expand Down Expand Up @@ -103,10 +103,23 @@ async def send(
message: Model,
timeout: Optional[int] = DEFAULT_ENVELOPE_TIMEOUT_SECONDS,
):
# convert the message into object form
json_message = message.json()
schema_digest = Model.build_schema_digest(message)
await self.send_raw(
destination,
message.json(),
schema_digest,
message_type=type(message),
timeout=timeout,
)

async def send_raw(
self,
destination: str,
json_message: JsonStr,
schema_digest: str,
message_type: Optional[Type[Model]] = None,
timeout: Optional[int] = DEFAULT_ENVELOPE_TIMEOUT_SECONDS,
):
# check if this message is a reply
if (
self._message_received is not None
Expand All @@ -118,7 +131,7 @@ async def send(
# ensure the reply is valid
if schema_digest not in self._replies[received.schema_digest]:
self._logger.exception(
f"Outgoing message {type(message)} "
f"Outgoing message {message_type or ''} "
f"is not a valid reply to {received.message}"
)
return
Expand All @@ -127,7 +140,7 @@ async def send(
if self._message_received is None and self._interval_messages:
if schema_digest not in self._interval_messages:
self._logger.exception(
f"Outgoing message {type(message)} is not a valid interval message"
f"Outgoing message {message_type} is not a valid interval message"
)
return

Expand All @@ -140,7 +153,7 @@ async def send(

# handle queries waiting for a response
if destination in self._queries:
self._queries[destination].set_result(message)
self._queries[destination].set_result((json_message, schema_digest))
del self._queries[destination]
return

Expand Down
12 changes: 10 additions & 2 deletions src/uagents/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from uagents.config import get_logger
from uagents.crypto import generate_user_address
from uagents.dispatch import JsonStr
from uagents.envelope import Envelope
from uagents.models import Model
from uagents.resolver import Resolver, AlmanacResolver
Expand Down Expand Up @@ -68,12 +69,19 @@ async def query(


def enclose_response(message: Model, sender: str, session: str) -> str:
schema_digest = Model.build_schema_digest(message)
return enclose_response_raw(message.json(), schema_digest, sender, session)


def enclose_response_raw(
json_message: JsonStr, schema_digest: str, sender: str, session: str
) -> str:
response_env = Envelope(
version=1,
sender=sender,
target="",
session=session,
schema_digest=Model.build_schema_digest(message),
schema_digest=schema_digest,
)
response_env.encode_payload(message.json())
response_env.encode_payload(json_message)
return response_env.json()
4 changes: 3 additions & 1 deletion tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ def setUp(self) -> None:
async def mock_process_sync_message(self, sender: str, msg: Model):
while True:
if sender in self.agent._server._queries:
self.agent._server._queries[sender].set_result(msg)
self.agent._server._queries[sender].set_result(
(msg.json(), Model.build_schema_digest(msg))
)
return

async def test_message_success(self):
Expand Down