Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
f1a7c4f
Add default Ray Node labels at Node init
ryanaoleary May 28, 2025
fce62f2
Fix test
ryanaoleary May 28, 2025
102c2f5
Fix ray re-init error
ryanaoleary May 29, 2025
a0a6763
Actually check accelerator-type in test
ryanaoleary May 29, 2025
c1192c7
Move Ray node related keys/env vars to constants.h
ryanaoleary Jun 6, 2025
c2894bf
Add test fixture
ryanaoleary Jun 6, 2025
0036699
Fix comments
ryanaoleary Jun 12, 2025
54143c4
Fix Ray data test
ryanaoleary Jun 12, 2025
bbf7769
Fix exit_logging test and add try block
ryanaoleary Jun 13, 2025
f6a0c26
Change node id constant
ryanaoleary Jun 23, 2025
d717762
Fix tests that use node_id
ryanaoleary Jun 24, 2025
241861f
Add accelerator util functions
ryanaoleary Jun 24, 2025
10a8ad6
Fix comments and add ResourceAndLabelSpec
ryanaoleary Jul 14, 2025
334dc94
Fix comment
ryanaoleary Jul 14, 2025
7dfb82d
Fix nits / rename file
ryanaoleary Jul 18, 2025
ece86ed
Fix Java test using node_id
ryanaoleary Jul 18, 2025
caefdfd
Refactor resolve functions
ryanaoleary Jul 22, 2025
6be107c
Add ResourceAndLabelSpec unit test file and add helpers
ryanaoleary Jul 23, 2025
147dcad
remove change to test_basic_5
ryanaoleary Jul 23, 2025
38cc250
Update python/ray/_private/resource_and_label_spec.py
ryanaoleary Jul 23, 2025
33018f7
Update python/ray/_private/node.py
ryanaoleary Jul 23, 2025
d518549
Update python/ray/_private/resource_and_label_spec.py
ryanaoleary Jul 23, 2025
8ba4a05
Update python/ray/_private/resource_and_label_spec.py
ryanaoleary Jul 23, 2025
1ba0d6f
Update python/ray/_private/resource_and_label_spec.py
ryanaoleary Jul 23, 2025
3116caf
Update num-gpus being set and fix nits
ryanaoleary Jul 23, 2025
a469b3f
Remove test_sql.py change
ryanaoleary Jul 25, 2025
ad6895c
Fix GPU tests, use monkeypatch, and fix comment
ryanaoleary Jul 25, 2025
775b631
Fix num_gpus edge case and add additional unit tests
ryanaoleary Jul 25, 2025
2ac985b
Move unit tests file, change to call public APIs
ryanaoleary Jul 25, 2025
683f5cb
Update python/ray/_private/resource_and_label_spec.py
ryanaoleary Jul 25, 2025
cd39761
Update python/ray/_private/resource_and_label_spec.py
ryanaoleary Jul 25, 2025
3363509
Add additional unit tests and move resource merge/load from env to Re…
ryanaoleary Jul 26, 2025
a5a68cc
Make functions that don't use self static
ryanaoleary Jul 26, 2025
9df1245
Remove change to unit/BUILD
ryanaoleary Jul 26, 2025
fd43e71
Format
ryanaoleary Jul 26, 2025
e0f6f9d
Update python/ray/_private/resource_and_label_spec.py
ryanaoleary Jul 28, 2025
ee4b20d
Update python/ray/_private/resource_and_label_spec.py
ryanaoleary Jul 28, 2025
5805a54
Change tests to use FakeAcceleratorManager, add _resolve_resources, a…
ryanaoleary Jul 29, 2025
4148cc0
Fix import of usage_lib
ryanaoleary Jul 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public void testEmptyNodeLabels() {
List<NodeInfo> nodeInfos = Ray.getRuntimeContext().getAllNodeInfo();
Assert.assertTrue(nodeInfos.size() == 1);
Map<String, String> labels = new HashMap<>();
labels.put("ray.io/node_id", nodeInfos.get(0).nodeId.toString());
labels.put("ray.io/node-id", nodeInfos.get(0).nodeId.toString());
Assert.assertEquals(nodeInfos.get(0).labels, labels);
} finally {
Ray.shutdown();
Expand All @@ -30,7 +30,7 @@ public void testSetNodeLabels() {
List<NodeInfo> nodeInfos = Ray.getRuntimeContext().getAllNodeInfo();
Assert.assertTrue(nodeInfos.size() == 1);
Map<String, String> labels = new HashMap<>();
labels.put("ray.io/node_id", nodeInfos.get(0).nodeId.toString());
labels.put("ray.io/node-id", nodeInfos.get(0).nodeId.toString());
labels.put("gpu_type", "A100");
labels.put("azone", "azone-1");
Assert.assertEquals(nodeInfos.get(0).labels, labels);
Expand Down
122 changes: 27 additions & 95 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import ray._private.services
from ray._common.ray_constants import LOGGING_ROTATE_BACKUP_COUNT, LOGGING_ROTATE_BYTES
from ray._common.utils import try_to_create_directory
from ray._private.resource_and_label_spec import ResourceAndLabelSpec
from ray._private.resource_isolation_config import ResourceIsolationConfig
from ray._private.resource_spec import ResourceSpec
from ray._private.services import get_address, serialize_config
from ray._private.utils import (
is_in_test,
Expand Down Expand Up @@ -137,7 +137,7 @@ def __init__(
),
)

self._resource_spec = None
self._resource_and_label_spec = None
self._localhost = socket.gethostbyname("localhost")
self._ray_params = ray_params
self._config = ray_params._system_config or {}
Expand Down Expand Up @@ -286,8 +286,6 @@ def __init__(
self._raylet_socket_name = self._prepare_socket_file(
self._ray_params.raylet_socket_name, default_prefix="raylet"
)
# Set node labels from RayParams or environment override variables.
self._node_labels = self._get_node_labels()
if (
self._ray_params.env_vars is not None
and "RAY_OVERRIDE_NODE_ID_FOR_TESTING" in self._ray_params.env_vars
Expand Down Expand Up @@ -521,94 +519,18 @@ def _init_temp(self):
tpu_logs_symlink = os.path.join(self._logs_dir, "tpu_logs")
try_to_symlink(tpu_logs_symlink, tpu_log_dir)

def _get_node_labels(self):
def merge_labels(env_override_labels, params_labels):
"""Merges two dictionaries, picking from the
first in the event of a conflict. Also emit a warning on every
conflict.
"""

result = params_labels.copy()
result.update(env_override_labels)

for key in set(env_override_labels.keys()).intersection(
set(params_labels.keys())
):
if params_labels[key] != env_override_labels[key]:
logger.warning(
"Autoscaler is overriding your label:"
f"{key}: {params_labels[key]} to "
f"{key}: {env_override_labels[key]}."
)
return result

env_override_labels = {}
env_override_labels_string = os.getenv(
ray_constants.LABELS_ENVIRONMENT_VARIABLE
)
if env_override_labels_string:
try:
env_override_labels = json.loads(env_override_labels_string)
except Exception:
logger.exception(f"Failed to load {env_override_labels_string}")
raise
logger.info(f"Autoscaler overriding labels: {env_override_labels}.")

return merge_labels(env_override_labels, self._ray_params.labels or {})

def get_resource_spec(self):
"""Resolve and return the current resource spec for the node."""

def merge_resources(env_dict, params_dict):
"""Separates special case params and merges two dictionaries, picking from the
first in the event of a conflict. Also emit a warning on every
conflict.
"""
num_cpus = env_dict.pop("CPU", None)
num_gpus = env_dict.pop("GPU", None)
memory = env_dict.pop("memory", None)
object_store_memory = env_dict.pop("object_store_memory", None)

result = params_dict.copy()
result.update(env_dict)

for key in set(env_dict.keys()).intersection(set(params_dict.keys())):
if params_dict[key] != env_dict[key]:
logger.warning(
"Autoscaler is overriding your resource:"
f"{key}: {params_dict[key]} with {env_dict[key]}."
)
return num_cpus, num_gpus, memory, object_store_memory, result

if not self._resource_spec:
env_resources = {}
env_string = os.getenv(ray_constants.RESOURCES_ENVIRONMENT_VARIABLE)
if env_string:
try:
env_resources = json.loads(env_string)
except Exception:
logger.exception(f"Failed to load {env_string}")
raise
logger.debug(f"Autoscaler overriding resources: {env_resources}.")
(
num_cpus,
num_gpus,
memory,
object_store_memory,
resources,
) = merge_resources(env_resources, self._ray_params.resources)
self._resource_spec = ResourceSpec(
self._ray_params.num_cpus if num_cpus is None else num_cpus,
self._ray_params.num_gpus if num_gpus is None else num_gpus,
self._ray_params.memory if memory is None else memory,
(
self._ray_params.object_store_memory
if object_store_memory is None
else object_store_memory
),
resources,
def get_resource_and_label_spec(self):
"""Resolve and return the current ResourceAndLabelSpec for the node."""
if not self._resource_and_label_spec:
self._resource_and_label_spec = ResourceAndLabelSpec(
self._ray_params.num_cpus,
self._ray_params.num_gpus,
self._ray_params.memory,
self._ray_params.object_store_memory,
self._ray_params.resources,
self._ray_params.labels,
).resolve(is_head=self.head, node_ip_address=self.node_ip_address)
return self._resource_spec
return self._resource_and_label_spec

@property
def node_id(self):
Expand Down Expand Up @@ -1267,6 +1189,7 @@ def start_raylet(
create_out=True,
create_err=True,
)

process_info = ray._private.services.start_raylet(
self.redis_address,
self.gcs_address,
Expand All @@ -1282,7 +1205,7 @@ def start_raylet(
self._session_dir,
self._runtime_env_dir,
self._logs_dir,
self.get_resource_spec(),
self.get_resource_and_label_spec(),
plasma_directory,
fallback_directory,
object_store_memory,
Expand Down Expand Up @@ -1315,7 +1238,6 @@ def start_raylet(
env_updates=self._ray_params.env_vars,
node_name=self._ray_params.node_name,
webui=self._webui_url,
labels=self.node_labels,
resource_isolation_config=self.resource_isolation_config,
)
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
Expand Down Expand Up @@ -1477,14 +1399,24 @@ def start_ray_processes(self):

# Make sure we don't call `determine_plasma_store_config` multiple
# times to avoid printing multiple warnings.
resource_spec = self.get_resource_spec()
resource_and_label_spec = self.get_resource_and_label_spec()
if resource_and_label_spec.labels.get(
ray._raylet.RAY_NODE_ACCELERATOR_TYPE_KEY
):
from ray._common.usage import usage_lib

usage_lib.record_hardware_usage(
resource_and_label_spec.labels.get(
ray._raylet.RAY_NODE_ACCELERATOR_TYPE_KEY
)
)

(
plasma_directory,
fallback_directory,
object_store_memory,
) = ray._private.services.determine_plasma_store_config(
resource_spec.object_store_memory,
resource_and_label_spec.object_store_memory,
self._temp_dir,
plasma_directory=self._ray_params.plasma_directory,
fallback_directory=self._fallback_directory,
Expand Down
Loading