Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][refactor] Move node_stats_to_dict to utils.py to avoid importing unnecessary modules #51187

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions python/ray/dashboard/memory_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import ray
from ray._private.internal_api import node_stats
from ray._raylet import ActorID, JobID, TaskID
from ray.dashboard.utils import node_stats_to_dict

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -381,8 +382,6 @@ def memory_summary(
# Get terminal size
import shutil

from ray.dashboard.modules.node.node_head import node_stats_to_dict

size = shutil.get_terminal_size((80, 20)).columns
line_wrap_threshold = 137

Expand Down
31 changes: 3 additions & 28 deletions python/ray/dashboard/modules/node/node_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,33 +59,6 @@ def _gcs_node_info_to_dict(message: gcs_pb2.GcsNodeInfo) -> dict:
)


def node_stats_to_dict(message):
decode_keys = {
"actorId",
"jobId",
"taskId",
"parentTaskId",
"sourceActorId",
"callerId",
"rayletId",
"workerId",
"placementGroupId",
}
core_workers_stats = message.core_workers_stats
message.ClearField("core_workers_stats")
try:
result = dashboard_utils.message_to_dict(message, decode_keys)
result["coreWorkersStats"] = [
dashboard_utils.message_to_dict(
m, decode_keys, always_print_fields_with_no_presence=True
)
for m in core_workers_stats
]
return result
finally:
message.core_workers_stats.extend(core_workers_stats)


class NodeHead(dashboard_utils.DashboardHeadModule):
def __init__(self, config: dashboard_utils.DashboardHeadModuleConfig):
super().__init__(config)
Expand Down Expand Up @@ -422,7 +395,9 @@ def postprocess(node_id_response_tuples):
f"Error updating node stats of {node_id}.", exc_info=response
)
else:
new_node_stats[node_id] = node_stats_to_dict(response)
new_node_stats[node_id] = dashboard_utils.node_stats_to_dict(
response
)

return new_node_stats

Expand Down
27 changes: 27 additions & 0 deletions python/ray/dashboard/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,33 @@ def address_tuple(address):
return ip, int(port)


def node_stats_to_dict(
message: "ray.core.generated.node_manager_pb2.GetNodeStatsReply",
):
decode_keys = {
"actorId",
"jobId",
"taskId",
"parentTaskId",
"sourceActorId",
"callerId",
"rayletId",
"workerId",
"placementGroupId",
}
core_workers_stats = message.core_workers_stats
message.ClearField("core_workers_stats")
try:
result = message_to_dict(message, decode_keys)
result["coreWorkersStats"] = [
message_to_dict(m, decode_keys, always_print_fields_with_no_presence=True)
for m in core_workers_stats
]
return result
finally:
message.core_workers_stats.extend(core_workers_stats)
Comment on lines +384 to +385
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you know why this is in a finally block? very strange pattern

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. The function first clears the field core_workers_stats from the message.
  2. It converts the message into a dict and constructs result["coreWorkersStats"].
  3. finally: if step (2) throws an exception, the field core_workers_stats is set back on the message to avoid side effects.



class CustomEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, bytes):
Expand Down