Skip to content

Commit 3053a3f

Browse files
author
Masahiro Tanaka
committed
add non-blocking client api
1 parent ed265f2 commit 3053a3f

File tree

1 file changed

+23
-8
lines changed

1 file changed

+23
-8
lines changed

mii/client.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,15 @@ def create_channel(host, port):
5151
GRPC_MAX_MSG_SIZE)])
5252

5353

54+
class QueryResultFuture():
55+
def __init__(self, asyncio_loop, coro):
56+
self.asyncio_loop = asyncio_loop
57+
self.coro = coro
58+
59+
def result(self):
60+
return self.asyncio_loop.run_until_complete(self.coro)
61+
62+
5463
class MIIClient():
5564
"""
5665
Client to send queries to a single endpoint.
@@ -73,11 +82,15 @@ async def _request_async_response(self, request_dict, **query_kwargs):
7382
proto_response
7483
) if "unpack_response_from_proto" in conversions else proto_response
7584

76-
def query(self, request_dict, **query_kwargs):
77-
return self.asyncio_loop.run_until_complete(
85+
def query_async(self, request_dict, **query_kwargs):
86+
return QueryResultFuture(
87+
self.asyncio_loop,
7888
self._request_async_response(request_dict,
7989
**query_kwargs))
8090

91+
def query(self, request_dict, **query_kwargs):
92+
return self.query_async(request_dict, **query_kwargs).result()
93+
8194
async def terminate_async(self):
8295
await self.stub.Terminate(
8396
modelresponse_pb2.google_dot_protobuf_dot_empty__pb2.Empty())
@@ -106,7 +119,13 @@ async def _query_in_tensor_parallel(self, request_string, query_kwargs):
106119
**query_kwargs)))
107120

108121
await responses[0]
109-
return responses[0]
122+
return responses[0].result()
123+
124+
def query_async(self, request_dict, **query_kwargs):
125+
return QueryResultFuture(
126+
self.asyncio_loop,
127+
self._query_in_tensor_parallel(request_dict,
128+
query_kwargs))
110129

111130
def query(self, request_dict, **query_kwargs):
112131
"""Query a local deployment:
@@ -121,11 +140,7 @@ def query(self, request_dict, **query_kwargs):
121140
Returns:
122141
response: Response of the model
123142
"""
124-
response = self.asyncio_loop.run_until_complete(
125-
self._query_in_tensor_parallel(request_dict,
126-
query_kwargs))
127-
ret = response.result()
128-
return ret
143+
return self.query_async(request_dict, **query_kwargs).result()
129144

130145
def terminate(self):
131146
"""Terminates the deployment"""

0 commit comments

Comments
 (0)