Skip to content

Commit

Permalink
Added debug headers for all message route in CoreCell (NVIDIA#2301)
Browse files Browse the repository at this point in the history
  • Loading branch information
nvidianz authored Jan 23, 2024
1 parent a0e8523 commit 9c3cf18
Showing 1 changed file with 18 additions and 6 deletions.
24 changes: 18 additions & 6 deletions nvflare/fuel/f3/cellnet/core_cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -1159,7 +1159,18 @@ def _send_target_messages(
if ep:
reachable_targets[t] = ep
else:
self.log_error(f"cannot send to '{t}': {err}", tm.message)
msg = Message(headers=copy.copy(tm.message.headers), payload=tm.message.payload)
msg.add_headers(
{
MessageHeaderKey.CHANNEL: tm.channel,
MessageHeaderKey.TOPIC: tm.topic,
MessageHeaderKey.FROM_CELL: self.my_info.fqcn,
MessageHeaderKey.TO_CELL: t,
MessageHeaderKey.ORIGIN: self.my_info.fqcn,
MessageHeaderKey.DESTINATION: t,
}
)
self.log_error(f"cannot send to '{t}': {err}", msg)
send_errs[t] = err

for t, ep in reachable_targets.items():
Expand All @@ -1170,12 +1181,12 @@ def _send_target_messages(
{
MessageHeaderKey.CHANNEL: tm.channel,
MessageHeaderKey.TOPIC: tm.topic,
MessageHeaderKey.ORIGIN: self.my_info.fqcn,
MessageHeaderKey.FROM_CELL: self.my_info.fqcn,
MessageHeaderKey.TO_CELL: ep.name,
MessageHeaderKey.ORIGIN: self.my_info.fqcn,
MessageHeaderKey.DESTINATION: t,
MessageHeaderKey.MSG_TYPE: MessageType.REQ,
MessageHeaderKey.ROUTE: [(self.my_info.fqcn, time.time())],
MessageHeaderKey.DESTINATION: t,
MessageHeaderKey.TO_CELL: ep.name,
}
)

Expand Down Expand Up @@ -1506,11 +1517,12 @@ def send_reply(self, reply: Message, to_cell: str, for_req_ids: List[str], secur
reply.add_headers(
{
MessageHeaderKey.FROM_CELL: self.my_info.fqcn,
MessageHeaderKey.TO_CELL: to_cell,
MessageHeaderKey.ORIGIN: self.my_info.fqcn,
MessageHeaderKey.ROUTE: [(self.my_info.fqcn, time.time())],
MessageHeaderKey.DESTINATION: to_cell,
MessageHeaderKey.REQ_ID: for_req_ids,
MessageHeaderKey.MSG_TYPE: MessageType.REPLY,
MessageHeaderKey.ROUTE: [(self.my_info.fqcn, time.time())],
MessageHeaderKey.SECURE: secure,
MessageHeaderKey.OPTIONAL: optional,
}
Expand Down Expand Up @@ -1926,9 +1938,9 @@ def _process_received_msg(self, endpoint: Endpoint, connection: Connection, mess
MessageHeaderKey.CHANNEL: channel,
MessageHeaderKey.TOPIC: topic,
MessageHeaderKey.FROM_CELL: self.my_info.fqcn,
MessageHeaderKey.TO_CELL: endpoint.name,
MessageHeaderKey.ORIGIN: self.my_info.fqcn,
MessageHeaderKey.DESTINATION: origin,
MessageHeaderKey.TO_CELL: endpoint.name,
MessageHeaderKey.REQ_ID: req_id,
MessageHeaderKey.MSG_TYPE: MessageType.REPLY,
MessageHeaderKey.ROUTE: [(self.my_info.fqcn, time.time())],
Expand Down

0 comments on commit 9c3cf18

Please sign in to comment.