Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 8 additions & 22 deletions framework/py/flwr/common/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
from .address import is_port_in_use
from .logger import log

_GRPC_BASE_OPTIONS = (
("grpc.http2.max_pings_without_data", 0),
("grpc.keepalive_permit_without_calls", 0),
)

GRPC_MAX_MESSAGE_LENGTH: int = 2_147_483_647 # == 2048 * 1024 * 1024 -1 (2GB)

INVALID_CERTIFICATES_ERR_MSG = """
Expand Down Expand Up @@ -166,32 +171,13 @@ def generic_create_grpc_server( # pylint: disable=too-many-arguments, R0914, R0
# Deconstruct tuple into servicer and function
servicer, add_servicer_to_server_fn = servicer_and_add_fn

# Possible options:
# https://github.com/grpc/grpc/blob/v1.43.x/include/grpc/impl/codegen/grpc_types.h
options = [
# Maximum number of concurrent incoming streams to allow on a http2
# connection. Int valued.
# Compose options tuple, ensure minimal allocations per call
options = (
("grpc.max_concurrent_streams", max(100, max_concurrent_workers)),
# Maximum message length that the channel can send.
# Int valued, bytes. -1 means unlimited.
("grpc.max_send_message_length", max_message_length),
# Maximum message length that the channel can receive.
# Int valued, bytes. -1 means unlimited.
("grpc.max_receive_message_length", max_message_length),
# The gRPC default for this setting is 7200000 (2 hours). Flower uses a
# customized default of 210000 (3 minutes and 30 seconds) to improve
# compatibility with popular cloud providers. Mobile Flower clients may
# choose to increase this value if their server environment allows
# long-running idle TCP connections.
("grpc.keepalive_time_ms", keepalive_time_ms),
# Setting this to zero will allow sending unlimited keepalive pings in between
# sending actual data frames.
("grpc.http2.max_pings_without_data", 0),
# Is it permissible to send keepalive pings from the client without
# any outstanding streams. More explanation here:
# https://github.com/adap/flower/pull/2197
("grpc.keepalive_permit_without_calls", 0),
]
) + _GRPC_BASE_OPTIONS

server = grpc.server(
concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent_workers),
Expand Down
73 changes: 73 additions & 0 deletions framework/py/flwr/proto/clientappio_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,79 @@
from flwr.proto import message_pb2 as flwr_dot_proto_dot_message__pb2
from flwr.proto import run_pb2 as flwr_dot_proto_dot_run__pb2

_LIST_APPS_HANDLER = grpc.unary_unary_rpc_method_handler(
lambda self, request, context: self.ListAppsToLaunch(request, context),
request_deserializer=flwr_dot_proto_dot_appio__pb2.ListAppsToLaunchRequest.FromString,
response_serializer=flwr_dot_proto_dot_appio__pb2.ListAppsToLaunchResponse.SerializeToString,
)

_REQUEST_TOKEN_HANDLER = grpc.unary_unary_rpc_method_handler(
lambda self, request, context: self.RequestToken(request, context),
request_deserializer=flwr_dot_proto_dot_appio__pb2.RequestTokenRequest.FromString,
response_serializer=flwr_dot_proto_dot_appio__pb2.RequestTokenResponse.SerializeToString,
)

_GET_RUN_HANDLER = grpc.unary_unary_rpc_method_handler(
lambda self, request, context: self.GetRun(request, context),
request_deserializer=flwr_dot_proto_dot_run__pb2.GetRunRequest.FromString,
response_serializer=flwr_dot_proto_dot_run__pb2.GetRunResponse.SerializeToString,
)

_PULL_CLIENTAPP_INPUTS_HANDLER = grpc.unary_unary_rpc_method_handler(
lambda self, request, context: self.PullClientAppInputs(request, context),
request_deserializer=flwr_dot_proto_dot_appio__pb2.PullAppInputsRequest.FromString,
response_serializer=flwr_dot_proto_dot_appio__pb2.PullAppInputsResponse.SerializeToString,
)

_PUSH_CLIENTAPP_OUTPUTS_HANDLER = grpc.unary_unary_rpc_method_handler(
lambda self, request, context: self.PushClientAppOutputs(request, context),
request_deserializer=flwr_dot_proto_dot_appio__pb2.PushAppOutputsRequest.FromString,
response_serializer=flwr_dot_proto_dot_appio__pb2.PushAppOutputsResponse.SerializeToString,
)

_PUSH_MESSAGE_HANDLER = grpc.unary_unary_rpc_method_handler(
lambda self, request, context: self.PushMessage(request, context),
request_deserializer=flwr_dot_proto_dot_appio__pb2.PushAppMessagesRequest.FromString,
response_serializer=flwr_dot_proto_dot_appio__pb2.PushAppMessagesResponse.SerializeToString,
)

_PULL_MESSAGE_HANDLER = grpc.unary_unary_rpc_method_handler(
lambda self, request, context: self.PullMessage(request, context),
request_deserializer=flwr_dot_proto_dot_appio__pb2.PullAppMessagesRequest.FromString,
response_serializer=flwr_dot_proto_dot_appio__pb2.PullAppMessagesResponse.SerializeToString,
)

_PUSH_OBJECT_HANDLER = grpc.unary_unary_rpc_method_handler(
lambda self, request, context: self.PushObject(request, context),
request_deserializer=flwr_dot_proto_dot_message__pb2.PushObjectRequest.FromString,
response_serializer=flwr_dot_proto_dot_message__pb2.PushObjectResponse.SerializeToString,
)

_PULL_OBJECT_HANDLER = grpc.unary_unary_rpc_method_handler(
lambda self, request, context: self.PullObject(request, context),
request_deserializer=flwr_dot_proto_dot_message__pb2.PullObjectRequest.FromString,
response_serializer=flwr_dot_proto_dot_message__pb2.PullObjectResponse.SerializeToString,
)

_CONFIRM_MESSAGE_RECEIVED_HANDLER = grpc.unary_unary_rpc_method_handler(
lambda self, request, context: self.ConfirmMessageReceived(request, context),
request_deserializer=flwr_dot_proto_dot_message__pb2.ConfirmMessageReceivedRequest.FromString,
response_serializer=flwr_dot_proto_dot_message__pb2.ConfirmMessageReceivedResponse.SerializeToString,
)

_RPC_METHOD_HANDLERS = {
'ListAppsToLaunch': _LIST_APPS_HANDLER,
'RequestToken': _REQUEST_TOKEN_HANDLER,
'GetRun': _GET_RUN_HANDLER,
'PullClientAppInputs': _PULL_CLIENTAPP_INPUTS_HANDLER,
'PushClientAppOutputs': _PUSH_CLIENTAPP_OUTPUTS_HANDLER,
'PushMessage': _PUSH_MESSAGE_HANDLER,
'PullMessage': _PULL_MESSAGE_HANDLER,
'PushObject': _PUSH_OBJECT_HANDLER,
'PullObject': _PULL_OBJECT_HANDLER,
'ConfirmMessageReceived': _CONFIRM_MESSAGE_RECEIVED_HANDLER,
}


class ClientAppIoStub(object):
"""Missing associated documentation comment in .proto file."""
Expand Down
4 changes: 2 additions & 2 deletions framework/py/flwr/supernode/start_client_internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,16 +528,16 @@ def run_clientappio_api_grpc(
certificates: Optional[tuple[bytes, bytes, bytes]],
) -> grpc.Server:
"""Run ClientAppIo API gRPC server."""
# Direct assignment for function reference, eliminates an extra indirection
clientappio_servicer: grpc.Server = ClientAppIoServicer(
state_factory=state_factory,
ffs_factory=ffs_factory,
objectstore_factory=objectstore_factory,
)
clientappio_add_servicer_to_server_fn = add_ClientAppIoServicer_to_server
clientappio_grpc_server = generic_create_grpc_server(
servicer_and_add_fn=(
clientappio_servicer,
clientappio_add_servicer_to_server_fn,
add_ClientAppIoServicer_to_server,
),
server_address=address,
max_message_length=GRPC_MAX_MESSAGE_LENGTH,
Expand Down