diff --git a/nvflare/fuel/f3/drivers/aio_grpc_driver.py b/nvflare/fuel/f3/drivers/aio_grpc_driver.py index 89c289af9b..e4a75b1416 100644 --- a/nvflare/fuel/f3/drivers/aio_grpc_driver.py +++ b/nvflare/fuel/f3/drivers/aio_grpc_driver.py @@ -103,7 +103,7 @@ def send_frame(self, frame: BytesAlike): f = Frame(seq=seq, data=bytes(frame)) self.aio_ctx.run_coro(self.oq.put(f)) except Exception as ex: - self.logger.debug(f"exception send_frame: {self}: {secure_format_exception(ex)}") + self.logger.error(f"exception send_frame: {self}: {secure_format_exception(ex)}") if not self.closing: raise CommError(CommError.ERROR, f"Error sending frame on conn {self}: {secure_format_exception(ex)}") @@ -121,14 +121,14 @@ async def read_loop(self, msg_iter): if error.code() == grpc.StatusCode.CANCELLED: self.logger.info(f"Connection {self} is closed by peer") else: - self.logger.info(f"Connection {self} Error: {error.details()}") - self.logger.debug(secure_format_traceback()) + self.logger.error(f"Connection {self} Error: {error.details()}") + self.logger.error(secure_format_traceback()) else: self.logger.info(f"Connection {self} is closed locally") except Exception as ex: if not self.closing: - self.logger.info(f"{self}: exception {type(ex)} in read_loop: {secure_format_exception(ex)}") - self.logger.debug(secure_format_traceback()) + self.logger.error(f"{self}: exception {type(ex)} in read_loop: {secure_format_exception(ex)}") + self.logger.error(secure_format_traceback()) self.logger.info(f"{self}: in {ct.name}: done read_loop") @@ -141,10 +141,10 @@ async def generate_output(self): yield item except Exception as ex: if self.closing: - self.logger.info(f"{self}: connection closed by {type(ex)}: {secure_format_exception(ex)}") + self.logger.error(f"{self}: connection closed by {type(ex)}: {secure_format_exception(ex)}") else: - self.logger.info(f"{self}: generate_output exception {type(ex)}: {secure_format_exception(ex)}") - self.logger.debug(secure_format_traceback()) + self.logger.error(f"{self}: generate_output exception {type(ex)}: {secure_format_exception(ex)}") + self.logger.error(secure_format_traceback()) self.logger.debug(f"{self}: done generate_output") @@ -183,9 +183,8 @@ async def Stream(self, request_iterator, context): except asyncio.CancelledError: self.logger.info("SERVER: RPC cancelled") except Exception as ex: - if connection: - self.logger.info(f"{connection}: connection exception: {secure_format_exception(ex)}") - self.logger.debug(secure_format_traceback()) + self.logger.error(f"{connection}: connection exception: {secure_format_exception(ex)}") + self.logger.error(secure_format_traceback()) finally: if connection: connection.close() @@ -221,7 +220,7 @@ def __init__(self, driver, connector, aio_ctx: AioContext, options, conn_ctx: _C self.logger.info(f"added insecure port at {addr}") except Exception as ex: conn_ctx.error = f"cannot listen on {addr}: {type(ex)}: {secure_format_exception(ex)}" - self.logger.debug(conn_ctx.error) + self.logger.error(conn_ctx.error) async def start(self, conn_ctx: _ConnCtx): self.logger.debug("starting grpc server") @@ -236,7 +235,7 @@ async def shutdown(self): try: await self.grpc_server.stop(grace=0.5) except Exception as ex: - self.logger.debug(f"exception shutdown server: {secure_format_exception(ex)}") + self.logger.error(f"exception shutdown server: {secure_format_exception(ex)}") class AioGrpcDriver(BaseDriver): @@ -276,9 +275,11 @@ async def _start_server(self, connector: ConnectorInfo, aio_ctx: AioContext, con conn_ctx.conn = True await self.server.start(conn_ctx) except Exception as ex: - if not self.closing: - self.logger.debug(secure_format_traceback()) conn_ctx.error = f"failed to start server: {type(ex)}: {secure_format_exception(ex)}" + self.logger.error(conn_ctx.error) + if not self.closing: + self.logger.error(secure_format_traceback()) + conn_ctx.waiter.set() def listen(self, connector: ConnectorInfo): @@ -327,14 +328,12 @@ async def _start_connect(self, connector: ConnectorInfo, aio_ctx: AioContext, co msg_iter = stub.Stream(connection.generate_output()) conn_ctx.conn = connection await connection.read_loop(msg_iter) - except asyncio.CancelledError: - self.logger.debug("CLIENT: RPC cancelled") except grpc.FutureCancelledError: self.logger.info("CLIENT: Future cancelled") except Exception as ex: conn_ctx.error = f"connection {connection} error: {type(ex)}: {secure_format_exception(ex)}" - self.logger.debug(conn_ctx.error) - self.logger.debug(secure_format_traceback()) + self.logger.error(conn_ctx.error) + self.logger.error(secure_format_traceback()) finally: if connection: connection.close() diff --git a/nvflare/fuel/f3/drivers/grpc_driver.py b/nvflare/fuel/f3/drivers/grpc_driver.py index f87fa89cda..60159a1a09 100644 --- a/nvflare/fuel/f3/drivers/grpc_driver.py +++ b/nvflare/fuel/f3/drivers/grpc_driver.py @@ -28,7 +28,7 @@ add_StreamerServicer_to_server, ) from nvflare.fuel.utils.obj_utils import get_logger -from nvflare.security.logging import secure_format_exception +from nvflare.security.logging import secure_format_exception, secure_format_traceback from .base_driver import BaseDriver from .driver_params import DriverCap, DriverParams @@ -69,15 +69,14 @@ def close(self): try: self.context.abort(grpc.StatusCode.CANCELLED, "service closed") except Exception as ex: - # ignore any exception when aborting - self.logger.debug(f"exception aborting GRPC context: {secure_format_exception(ex)}") + self.logger.error(f"exception aborting GRPC context: {secure_format_exception(ex)}") self.context = None self.logger.info("Closed GRPC context") if self.channel: try: self.channel.close() except Exception as ex: - self.logger.debug(f"exception closing GRPC channel: {secure_format_exception(ex)}") + self.logger.error(f"exception closing GRPC channel: {secure_format_exception(ex)}") self.channel = None self.logger.info("Closed GRPC Channel") @@ -88,7 +87,7 @@ def send_frame(self, frame: Union[bytes, bytearray, memoryview]): self.logger.debug(f"{self.side}: queued frame #{seq}") self.oq.append(Frame(seq=seq, data=bytes(frame))) except BaseException as ex: - raise CommError(CommError.ERROR, f"Error sending frame: {ex}") + raise CommError(CommError.ERROR, f"Error sending frame: {secure_format_exception(ex)}") def read_loop(self, msg_iter): ct = threading.current_thread() @@ -106,7 +105,9 @@ def read_loop(self, msg_iter): self.logger.error(f"{self.side}: Frame receiver not registered for connection: {self.name}") except Exception as ex: if not self.closing: - self.logger.debug(f"{self.side}: exception {type(ex)} in read_loop") + self.logger.error(f"{self}: exception {type(ex)} in read_loop: {secure_format_exception(ex)}") + self.logger.debug(secure_format_traceback()) + if self.oq: self.logger.debug(f"{self.side}: closing queue") self.oq.close() @@ -150,7 +151,7 @@ def Stream(self, request_iterator, context): t.start() yield from connection.generate_output() except BaseException as ex: - self.logger.error(f"Connection closed due to error: {ex}") + self.logger.error(f"Connection closed due to error: {secure_format_exception(ex)}") finally: if t is not None: t.join() @@ -189,7 +190,7 @@ def __init__( self.logger.info(f"added insecure port at {addr}") except Exception as ex: error = f"cannot listen on {addr}: {type(ex)}: {secure_format_exception(ex)}" - self.logger.debug(error) + self.logger.error(error) def start(self): self.grpc_server.start() @@ -260,8 +261,6 @@ def connect(self, connector: ConnectorInfo): self.logger.debug("CLIENT: added connection") received = stub.Stream(connection.generate_output()) connection.read_loop(received) - except grpc.FutureCancelledError: - self.logger.debug("RPC Cancelled") except Exception as ex: self.logger.error(f"connection {connection} error: {type(ex)}: {secure_format_exception(ex)}") finally: