Skip to content

Commit

Permalink
[2.3] Changed exception log to error level for GRPC drivers (#2491)
Browse files Browse the repository at this point in the history
* Changed exception log to error level

* Use secure_format_exception
  • Loading branch information
nvidianz authored Apr 11, 2024
1 parent 9e02ba2 commit b1c7bf3
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 29 deletions.
37 changes: 18 additions & 19 deletions nvflare/fuel/f3/drivers/aio_grpc_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def send_frame(self, frame: BytesAlike):
f = Frame(seq=seq, data=bytes(frame))
self.aio_ctx.run_coro(self.oq.put(f))
except Exception as ex:
self.logger.debug(f"exception send_frame: {self}: {secure_format_exception(ex)}")
self.logger.error(f"exception send_frame: {self}: {secure_format_exception(ex)}")
if not self.closing:
raise CommError(CommError.ERROR, f"Error sending frame on conn {self}: {secure_format_exception(ex)}")

Expand All @@ -121,14 +121,14 @@ async def read_loop(self, msg_iter):
if error.code() == grpc.StatusCode.CANCELLED:
self.logger.info(f"Connection {self} is closed by peer")
else:
self.logger.info(f"Connection {self} Error: {error.details()}")
self.logger.debug(secure_format_traceback())
self.logger.error(f"Connection {self} Error: {error.details()}")
self.logger.error(secure_format_traceback())
else:
self.logger.info(f"Connection {self} is closed locally")
except Exception as ex:
if not self.closing:
self.logger.info(f"{self}: exception {type(ex)} in read_loop: {secure_format_exception(ex)}")
self.logger.debug(secure_format_traceback())
self.logger.error(f"{self}: exception {type(ex)} in read_loop: {secure_format_exception(ex)}")
self.logger.error(secure_format_traceback())

self.logger.info(f"{self}: in {ct.name}: done read_loop")

Expand All @@ -141,10 +141,10 @@ async def generate_output(self):
yield item
except Exception as ex:
if self.closing:
self.logger.info(f"{self}: connection closed by {type(ex)}: {secure_format_exception(ex)}")
self.logger.error(f"{self}: connection closed by {type(ex)}: {secure_format_exception(ex)}")
else:
self.logger.info(f"{self}: generate_output exception {type(ex)}: {secure_format_exception(ex)}")
self.logger.debug(secure_format_traceback())
self.logger.error(f"{self}: generate_output exception {type(ex)}: {secure_format_exception(ex)}")
self.logger.error(secure_format_traceback())
self.logger.debug(f"{self}: done generate_output")


Expand Down Expand Up @@ -183,9 +183,8 @@ async def Stream(self, request_iterator, context):
except asyncio.CancelledError:
self.logger.info("SERVER: RPC cancelled")
except Exception as ex:
if connection:
self.logger.info(f"{connection}: connection exception: {secure_format_exception(ex)}")
self.logger.debug(secure_format_traceback())
self.logger.error(f"{connection}: connection exception: {secure_format_exception(ex)}")
self.logger.error(secure_format_traceback())
finally:
if connection:
connection.close()
Expand Down Expand Up @@ -221,7 +220,7 @@ def __init__(self, driver, connector, aio_ctx: AioContext, options, conn_ctx: _C
self.logger.info(f"added insecure port at {addr}")
except Exception as ex:
conn_ctx.error = f"cannot listen on {addr}: {type(ex)}: {secure_format_exception(ex)}"
self.logger.debug(conn_ctx.error)
self.logger.error(conn_ctx.error)

async def start(self, conn_ctx: _ConnCtx):
self.logger.debug("starting grpc server")
Expand All @@ -236,7 +235,7 @@ async def shutdown(self):
try:
await self.grpc_server.stop(grace=0.5)
except Exception as ex:
self.logger.debug(f"exception shutdown server: {secure_format_exception(ex)}")
self.logger.error(f"exception shutdown server: {secure_format_exception(ex)}")


class AioGrpcDriver(BaseDriver):
Expand Down Expand Up @@ -276,9 +275,11 @@ async def _start_server(self, connector: ConnectorInfo, aio_ctx: AioContext, con
conn_ctx.conn = True
await self.server.start(conn_ctx)
except Exception as ex:
if not self.closing:
self.logger.debug(secure_format_traceback())
conn_ctx.error = f"failed to start server: {type(ex)}: {secure_format_exception(ex)}"
self.logger.error(conn_ctx.error)
if not self.closing:
self.logger.error(secure_format_traceback())

conn_ctx.waiter.set()

def listen(self, connector: ConnectorInfo):
Expand Down Expand Up @@ -327,14 +328,12 @@ async def _start_connect(self, connector: ConnectorInfo, aio_ctx: AioContext, co
msg_iter = stub.Stream(connection.generate_output())
conn_ctx.conn = connection
await connection.read_loop(msg_iter)
except asyncio.CancelledError:
self.logger.debug("CLIENT: RPC cancelled")
except grpc.FutureCancelledError:
self.logger.info("CLIENT: Future cancelled")
except Exception as ex:
conn_ctx.error = f"connection {connection} error: {type(ex)}: {secure_format_exception(ex)}"
self.logger.debug(conn_ctx.error)
self.logger.debug(secure_format_traceback())
self.logger.error(conn_ctx.error)
self.logger.error(secure_format_traceback())
finally:
if connection:
connection.close()
Expand Down
19 changes: 9 additions & 10 deletions nvflare/fuel/f3/drivers/grpc_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
add_StreamerServicer_to_server,
)
from nvflare.fuel.utils.obj_utils import get_logger
from nvflare.security.logging import secure_format_exception
from nvflare.security.logging import secure_format_exception, secure_format_traceback

from .base_driver import BaseDriver
from .driver_params import DriverCap, DriverParams
Expand Down Expand Up @@ -69,15 +69,14 @@ def close(self):
try:
self.context.abort(grpc.StatusCode.CANCELLED, "service closed")
except Exception as ex:
# ignore any exception when aborting
self.logger.debug(f"exception aborting GRPC context: {secure_format_exception(ex)}")
self.logger.error(f"exception aborting GRPC context: {secure_format_exception(ex)}")
self.context = None
self.logger.info("Closed GRPC context")
if self.channel:
try:
self.channel.close()
except Exception as ex:
self.logger.debug(f"exception closing GRPC channel: {secure_format_exception(ex)}")
self.logger.error(f"exception closing GRPC channel: {secure_format_exception(ex)}")
self.channel = None
self.logger.info("Closed GRPC Channel")

Expand All @@ -88,7 +87,7 @@ def send_frame(self, frame: Union[bytes, bytearray, memoryview]):
self.logger.debug(f"{self.side}: queued frame #{seq}")
self.oq.append(Frame(seq=seq, data=bytes(frame)))
except BaseException as ex:
raise CommError(CommError.ERROR, f"Error sending frame: {ex}")
raise CommError(CommError.ERROR, f"Error sending frame: {secure_format_exception(ex)}")

def read_loop(self, msg_iter):
ct = threading.current_thread()
Expand All @@ -106,7 +105,9 @@ def read_loop(self, msg_iter):
self.logger.error(f"{self.side}: Frame receiver not registered for connection: {self.name}")
except Exception as ex:
if not self.closing:
self.logger.debug(f"{self.side}: exception {type(ex)} in read_loop")
self.logger.error(f"{self}: exception {type(ex)} in read_loop: {secure_format_exception(ex)}")
self.logger.debug(secure_format_traceback())

if self.oq:
self.logger.debug(f"{self.side}: closing queue")
self.oq.close()
Expand Down Expand Up @@ -150,7 +151,7 @@ def Stream(self, request_iterator, context):
t.start()
yield from connection.generate_output()
except BaseException as ex:
self.logger.error(f"Connection closed due to error: {ex}")
self.logger.error(f"Connection closed due to error: {secure_format_exception(ex)}")
finally:
if t is not None:
t.join()
Expand Down Expand Up @@ -189,7 +190,7 @@ def __init__(
self.logger.info(f"added insecure port at {addr}")
except Exception as ex:
error = f"cannot listen on {addr}: {type(ex)}: {secure_format_exception(ex)}"
self.logger.debug(error)
self.logger.error(error)

def start(self):
self.grpc_server.start()
Expand Down Expand Up @@ -260,8 +261,6 @@ def connect(self, connector: ConnectorInfo):
self.logger.debug("CLIENT: added connection")
received = stub.Stream(connection.generate_output())
connection.read_loop(received)
except grpc.FutureCancelledError:
self.logger.debug("RPC Cancelled")
except Exception as ex:
self.logger.error(f"connection {connection} error: {type(ex)}: {secure_format_exception(ex)}")
finally:
Expand Down

0 comments on commit b1c7bf3

Please sign in to comment.