Skip to content

Commit

Permalink
Restore non-aio grpc driver (#2077)
Browse files Browse the repository at this point in the history
* restore non-aio grpc driver

* improve grpc drivers

* changed to log_debug for job hb

* fix f-str
  • Loading branch information
yanchengnv authored Oct 16, 2023
1 parent f208127 commit 58cf5a6
Show file tree
Hide file tree
Showing 16 changed files with 502 additions and 90 deletions.
59 changes: 46 additions & 13 deletions nvflare/fuel/f3/comm_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,42 @@ class VarName:
SUBNET_HEARTBEAT_INTERVAL = "subnet_heartbeat_interval"
SUBNET_TROUBLE_THRESHOLD = "subnet_trouble_threshold"
COMM_DRIVER_PATH = "comm_driver_path"
USE_AIO_GRPC_VAR_NAME = "use_aio_grpc"


class CommConfigurator:
_config_loaded = False
_configuration = None

def __init__(self):
# only load once!
self.logger = logging.getLogger(self.__class__.__name__)
config = None
for file_name in _comm_config_files:
try:
config = ConfigService.load_json(file_name)
if config:
break
except FileNotFoundError:
self.logger.debug(f"config file {file_name} not found from config path")
config = None
except Exception as ex:
self.logger.error(f"failed to load config file {file_name}: {secure_format_exception(ex)}")
config = None
self.config = config
if not CommConfigurator._config_loaded:
config = None
for file_name in _comm_config_files:
try:
config = ConfigService.load_json(file_name)
if config:
break
except FileNotFoundError:
self.logger.debug(f"config file {file_name} not found from config path")
config = None
except Exception as ex:
self.logger.error(f"failed to load config file {file_name}: {secure_format_exception(ex)}")
config = None

CommConfigurator._configuration = config
CommConfigurator._config_loaded = True
self.config = CommConfigurator._configuration

@staticmethod
def reset():
"""Reset the configurator to allow reloading config files.
Returns:
"""
CommConfigurator._config_loaded = False

def get_config(self):
return self.config
Expand Down Expand Up @@ -78,3 +96,18 @@ def get_subnet_trouble_threshold(self, default):

def get_comm_driver_path(self, default):
return ConfigService.get_str_var(VarName.COMM_DRIVER_PATH, self.config, default=default)

def use_aio_grpc(self, default):
return ConfigService.get_bool_var(VarName.USE_AIO_GRPC_VAR_NAME, self.config, default)

def get_int_var(self, name: str, default=None):
return ConfigService.get_int_var(name, self.config, default=default)

def get_float_var(self, name: str, default=None):
return ConfigService.get_float_var(name, self.config, default=default)

def get_bool_var(self, name: str, default=None):
return ConfigService.get_bool_var(name, self.config, default=default)

def get_str_var(self, name: str, default=None):
return ConfigService.get_str_var(name, self.config, default=default)
121 changes: 51 additions & 70 deletions nvflare/fuel/f3/drivers/aio_grpc_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import random
import threading
import time
from typing import Any, Dict, List
Expand All @@ -34,6 +35,7 @@
from .base_driver import BaseDriver
from .driver_params import DriverCap, DriverParams
from .grpc.streamer_pb2 import Frame
from .grpc.utils import get_grpc_client_credentials, get_grpc_server_credentials, use_aio_grpc
from .net_utils import MAX_FRAME_SIZE, get_address, get_tcp_urls, ssl_required

GRPC_DEFAULT_OPTIONS = [
Expand Down Expand Up @@ -65,6 +67,18 @@ def __init__(self, aio_ctx: AioContext, connector: ConnectorInfo, conn_props: di
self.channel = channel # for client side
self.lock = threading.Lock()

conf = CommConfigurator()
if conf.get_bool_var("simulate_unstable_network", default=False):
self.disconn = threading.Thread(target=self._disconnect, daemon=True)
self.disconn.start()

def _disconnect(self):
t = random.randint(10, 60)
self.logger.info(f"will close connection after {t} secs")
time.sleep(t)
self.logger.info(f"close connection now after {t} secs")
self.close()

def get_conn_properties(self) -> dict:
return self.conn_props

Expand Down Expand Up @@ -101,18 +115,18 @@ async def read_loop(self, msg_iter):
except grpc.aio.AioRpcError as error:
if not self.closing:
if error.code() == grpc.StatusCode.CANCELLED:
self.logger.debug(f"Connection {self} is closed by peer")
self.logger.info(f"Connection {self} is closed by peer")
else:
self.logger.debug(f"Connection {self} Error: {error.details()}")
self.logger.info(f"Connection {self} Error: {error.details()}")
self.logger.debug(secure_format_traceback())
else:
self.logger.debug(f"Connection {self} is closed locally")
self.logger.info(f"Connection {self} is closed locally")
except Exception as ex:
if not self.closing:
self.logger.debug(f"{self}: exception {type(ex)} in read_loop: {secure_format_exception(ex)}")
self.logger.info(f"{self}: exception {type(ex)} in read_loop: {secure_format_exception(ex)}")
self.logger.debug(secure_format_traceback())

self.logger.debug(f"{self}: in {ct.name}: done read_loop")
self.logger.info(f"{self}: in {ct.name}: done read_loop")

async def generate_output(self):
ct = threading.current_thread()
Expand All @@ -123,11 +137,10 @@ async def generate_output(self):
yield item
except Exception as ex:
if self.closing:
self.logger.debug(f"{self}: connection closed by {type(ex)}: {secure_format_exception(ex)}")
self.logger.info(f"{self}: connection closed by {type(ex)}: {secure_format_exception(ex)}")
else:
self.logger.debug(f"{self}: generate_output exception {type(ex)}: {secure_format_exception(ex)}")
self.logger.info(f"{self}: generate_output exception {type(ex)}: {secure_format_exception(ex)}")
self.logger.debug(secure_format_traceback())

self.logger.debug(f"{self}: done generate_output")


Expand All @@ -137,20 +150,10 @@ def __init__(self, server, aio_ctx: AioContext):
self.aio_ctx = aio_ctx
self.logger = get_logger(self)

async def _write_loop(self, connection, grpc_context):
self.logger.debug("started _write_loop")
try:
while True:
f = await connection.oq.get()
await grpc_context.write(f)
except Exception as ex:
self.logger.debug(f"_write_loop except: {type(ex)}: {secure_format_exception(ex)}")
self.logger.debug("finished _write_loop")

async def Stream(self, request_iterator, context):
connection = None
ct = threading.current_thread()
try:
ct = threading.current_thread()
self.logger.debug(f"SERVER started Stream CB in thread {ct.name}")
conn_props = {
DriverParams.PEER_ADDR.value: context.peer(),
Expand All @@ -169,23 +172,22 @@ async def Stream(self, request_iterator, context):
)
self.logger.debug(f"SERVER created connection in thread {ct.name}")
self.server.driver.add_connection(connection)
try:
await asyncio.gather(self._write_loop(connection, context), connection.read_loop(request_iterator))
except asyncio.CancelledError:
self.logger.debug("SERVER: RPC cancelled")
except Exception as ex:
self.logger.debug(f"await gather except: {type(ex)}: {secure_format_exception(ex)}")
self.logger.debug(f"SERVER: done await gather in thread {ct.name}")

self.aio_ctx.run_coro(connection.read_loop(request_iterator))
while True:
item = await connection.oq.get()
yield item
except asyncio.CancelledError:
self.logger.info("SERVER: RPC cancelled")
except Exception as ex:
self.logger.debug(f"Connection closed due to error: {secure_format_exception(ex)}")
if connection:
self.logger.info(f"{connection}: connection exception: {secure_format_exception(ex)}")
self.logger.debug(secure_format_traceback())
finally:
if connection:
with connection.lock:
connection.context = None
self.logger.debug(f"SERVER: closing connection {connection.name}")
connection.close()
self.logger.info(f"SERVER: closed connection {connection.name}")
self.server.driver.close_connection(connection)
self.logger.debug(f"SERVER: cleanly finished Stream CB in thread {ct.name}")
self.logger.info("SERVER: finished Stream CB")


class Server:
Expand All @@ -207,10 +209,12 @@ def __init__(self, driver, connector, aio_ctx: AioContext, options, conn_ctx: _C

secure = ssl_required(params)
if secure:
credentials = AioGrpcDriver.get_grpc_server_credentials(params)
credentials = get_grpc_server_credentials(params)
self.grpc_server.add_secure_port(addr, server_credentials=credentials)
self.logger.info(f"added secure port at {addr}")
else:
self.grpc_server.add_insecure_port(addr)
self.logger.info(f"added insecure port at {addr}")
except Exception as ex:
conn_ctx.error = f"cannot listen on {addr}: {type(ex)}: {secure_format_exception(ex)}"
self.logger.debug(conn_ctx.error)
Expand Down Expand Up @@ -251,7 +255,10 @@ def __init__(self):

@staticmethod
def supported_transports() -> List[str]:
return ["grpc", "grpcs"]
if use_aio_grpc():
return ["grpc", "grpcs"]
else:
return ["agrpc", "agrpcs"]

@staticmethod
def capabilities() -> Dict[str, Any]:
Expand Down Expand Up @@ -280,9 +287,9 @@ def listen(self, connector: ConnectorInfo):
time.sleep(0.1)
if conn_ctx.error:
raise CommError(code=CommError.ERROR, message=conn_ctx.error)
self.logger.debug("SERVER: waiting for server to finish")
self.logger.info(f"SERVER: listening on {connector}")
conn_ctx.waiter.wait()
self.logger.debug("SERVER: server is done")
self.logger.info(f"SERVER: server is done listening on {connector}")

async def _start_connect(self, connector: ConnectorInfo, aio_ctx: AioContext, conn_ctx: _ConnCtx):
self.logger.debug("Started _start_connect coro")
Expand All @@ -295,10 +302,12 @@ async def _start_connect(self, connector: ConnectorInfo, aio_ctx: AioContext, co
secure = ssl_required(params)
if secure:
grpc_channel = grpc.aio.secure_channel(
address, options=self.options, credentials=self.get_grpc_client_credentials(params)
address, options=self.options, credentials=get_grpc_client_credentials(params)
)
self.logger.info(f"created secure channel at {address}")
else:
grpc_channel = grpc.aio.insecure_channel(address, options=self.options)
self.logger.info(f"created insecure channel at {address}")

async with grpc_channel as channel:
self.logger.debug(f"CLIENT: connected to {address}")
Expand Down Expand Up @@ -358,6 +367,7 @@ def connect(self, connector: ConnectorInfo):
self.add_connection(conn_ctx.conn)
conn_ctx.waiter.wait()
self.close_connection(conn_ctx.conn)
self.logger.info(f"CLIENT: connection {conn_ctx.conn} closed")

def shutdown(self):
if self.closing:
Expand All @@ -374,38 +384,9 @@ def shutdown(self):
def get_urls(scheme: str, resources: dict) -> (str, str):
secure = resources.get(DriverParams.SECURE)
if secure:
scheme = "grpcs"
if use_aio_grpc():
scheme = "grpcs"
else:
scheme = "agrpcs"

return get_tcp_urls(scheme, resources)

@staticmethod
def get_grpc_client_credentials(params: dict):

root_cert = AioGrpcDriver.read_file(params.get(DriverParams.CA_CERT.value))
cert_chain = AioGrpcDriver.read_file(params.get(DriverParams.CLIENT_CERT))
private_key = AioGrpcDriver.read_file(params.get(DriverParams.CLIENT_KEY))

return grpc.ssl_channel_credentials(
certificate_chain=cert_chain, private_key=private_key, root_certificates=root_cert
)

@staticmethod
def get_grpc_server_credentials(params: dict):

root_cert = AioGrpcDriver.read_file(params.get(DriverParams.CA_CERT.value))
cert_chain = AioGrpcDriver.read_file(params.get(DriverParams.SERVER_CERT))
private_key = AioGrpcDriver.read_file(params.get(DriverParams.SERVER_KEY))

return grpc.ssl_server_credentials(
[(private_key, cert_chain)],
root_certificates=root_cert,
require_client_auth=True,
)

@staticmethod
def read_file(file_name: str):
if not file_name:
return None

with open(file_name, "rb") as f:
return f.read()
52 changes: 52 additions & 0 deletions nvflare/fuel/f3/drivers/grpc/qq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import queue


class QueueClosed(Exception):
pass


class QQ:
def __init__(self):
self.q = queue.Queue()
self.closed = False
self.logger = logging.getLogger(self.__class__.__name__)

def close(self):
self.closed = True

def append(self, i):
if self.closed:
raise QueueClosed("queue stopped")
self.q.put_nowait(i)

def __iter__(self):
return self

def __next__(self):
if self.closed:
raise StopIteration()
while True:
try:
return self.q.get(block=True, timeout=0.1)
except queue.Empty:
if self.closed:
self.logger.debug("Queue closed - stop iteration")
raise StopIteration()
except Exception as e:
self.logger.error(f"queue exception {type(e)}")
raise e
Loading

0 comments on commit 58cf5a6

Please sign in to comment.