Skip to content

Commit 2ddb548

Browse files
ryanaoleary, jjyao, and MengjinYan
authored and committed
[Core] Add default Ray Node labels at Node init (#53360)
Signed-off-by: Ryan O'Leary <[email protected]>
Signed-off-by: Ryan O'Leary <[email protected]>
Co-authored-by: Jiajun Yao <[email protected]>
Co-authored-by: Mengjin Yan <[email protected]>
Signed-off-by: Kamil Kaczmarek <[email protected]>
1 parent ff10956 commit 2ddb548

File tree

15 files changed

+970
-418
lines changed

15 files changed

+970
-418
lines changed

java/test/src/main/java/io/ray/test/NodeLabelSchedulingTest.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@ public void testEmptyNodeLabels() {
1616
List<NodeInfo> nodeInfos = Ray.getRuntimeContext().getAllNodeInfo();
1717
Assert.assertTrue(nodeInfos.size() == 1);
1818
Map<String, String> labels = new HashMap<>();
19-
labels.put("ray.io/node_id", nodeInfos.get(0).nodeId.toString());
19+
labels.put("ray.io/node-id", nodeInfos.get(0).nodeId.toString());
2020
Assert.assertEquals(nodeInfos.get(0).labels, labels);
2121
} finally {
2222
Ray.shutdown();
@@ -30,7 +30,7 @@ public void testSetNodeLabels() {
3030
List<NodeInfo> nodeInfos = Ray.getRuntimeContext().getAllNodeInfo();
3131
Assert.assertTrue(nodeInfos.size() == 1);
3232
Map<String, String> labels = new HashMap<>();
33-
labels.put("ray.io/node_id", nodeInfos.get(0).nodeId.toString());
33+
labels.put("ray.io/node-id", nodeInfos.get(0).nodeId.toString());
3434
labels.put("gpu_type", "A100");
3535
labels.put("azone", "azone-1");
3636
Assert.assertEquals(nodeInfos.get(0).labels, labels);

python/ray/_private/node.py

Lines changed: 27 additions & 95 deletions
Original file line number | Diff line number | Diff line change
@@ -24,8 +24,8 @@
2424
import ray._private.services
2525
from ray._common.ray_constants import LOGGING_ROTATE_BACKUP_COUNT, LOGGING_ROTATE_BYTES
2626
from ray._common.utils import try_to_create_directory
27+
from ray._private.resource_and_label_spec import ResourceAndLabelSpec
2728
from ray._private.resource_isolation_config import ResourceIsolationConfig
28-
from ray._private.resource_spec import ResourceSpec
2929
from ray._private.services import get_address, serialize_config
3030
from ray._private.utils import (
3131
is_in_test,
@@ -137,7 +137,7 @@ def __init__(
137137
),
138138
)
139139

140-
self._resource_spec = None
140+
self._resource_and_label_spec = None
141141
self._localhost = socket.gethostbyname("localhost")
142142
self._ray_params = ray_params
143143
self._config = ray_params._system_config or {}
@@ -286,8 +286,6 @@ def __init__(
286286
self._raylet_socket_name = self._prepare_socket_file(
287287
self._ray_params.raylet_socket_name, default_prefix="raylet"
288288
)
289-
# Set node labels from RayParams or environment override variables.
290-
self._node_labels = self._get_node_labels()
291289
if (
292290
self._ray_params.env_vars is not None
293291
and "RAY_OVERRIDE_NODE_ID_FOR_TESTING" in self._ray_params.env_vars
@@ -521,94 +519,18 @@ def _init_temp(self):
521519
tpu_logs_symlink = os.path.join(self._logs_dir, "tpu_logs")
522520
try_to_symlink(tpu_logs_symlink, tpu_log_dir)
523521

524-
def _get_node_labels(self):
525-
def merge_labels(env_override_labels, params_labels):
526-
"""Merges two dictionaries, picking from the
527-
first in the event of a conflict. Also emit a warning on every
528-
conflict.
529-
"""
530-
531-
result = params_labels.copy()
532-
result.update(env_override_labels)
533-
534-
for key in set(env_override_labels.keys()).intersection(
535-
set(params_labels.keys())
536-
):
537-
if params_labels[key] != env_override_labels[key]:
538-
logger.warning(
539-
"Autoscaler is overriding your label:"
540-
f"{key}: {params_labels[key]} to "
541-
f"{key}: {env_override_labels[key]}."
542-
)
543-
return result
544-
545-
env_override_labels = {}
546-
env_override_labels_string = os.getenv(
547-
ray_constants.LABELS_ENVIRONMENT_VARIABLE
548-
)
549-
if env_override_labels_string:
550-
try:
551-
env_override_labels = json.loads(env_override_labels_string)
552-
except Exception:
553-
logger.exception(f"Failed to load {env_override_labels_string}")
554-
raise
555-
logger.info(f"Autoscaler overriding labels: {env_override_labels}.")
556-
557-
return merge_labels(env_override_labels, self._ray_params.labels or {})
558-
559-
def get_resource_spec(self):
560-
"""Resolve and return the current resource spec for the node."""
561-
562-
def merge_resources(env_dict, params_dict):
563-
"""Separates special case params and merges two dictionaries, picking from the
564-
first in the event of a conflict. Also emit a warning on every
565-
conflict.
566-
"""
567-
num_cpus = env_dict.pop("CPU", None)
568-
num_gpus = env_dict.pop("GPU", None)
569-
memory = env_dict.pop("memory", None)
570-
object_store_memory = env_dict.pop("object_store_memory", None)
571-
572-
result = params_dict.copy()
573-
result.update(env_dict)
574-
575-
for key in set(env_dict.keys()).intersection(set(params_dict.keys())):
576-
if params_dict[key] != env_dict[key]:
577-
logger.warning(
578-
"Autoscaler is overriding your resource:"
579-
f"{key}: {params_dict[key]} with {env_dict[key]}."
580-
)
581-
return num_cpus, num_gpus, memory, object_store_memory, result
582-
583-
if not self._resource_spec:
584-
env_resources = {}
585-
env_string = os.getenv(ray_constants.RESOURCES_ENVIRONMENT_VARIABLE)
586-
if env_string:
587-
try:
588-
env_resources = json.loads(env_string)
589-
except Exception:
590-
logger.exception(f"Failed to load {env_string}")
591-
raise
592-
logger.debug(f"Autoscaler overriding resources: {env_resources}.")
593-
(
594-
num_cpus,
595-
num_gpus,
596-
memory,
597-
object_store_memory,
598-
resources,
599-
) = merge_resources(env_resources, self._ray_params.resources)
600-
self._resource_spec = ResourceSpec(
601-
self._ray_params.num_cpus if num_cpus is None else num_cpus,
602-
self._ray_params.num_gpus if num_gpus is None else num_gpus,
603-
self._ray_params.memory if memory is None else memory,
604-
(
605-
self._ray_params.object_store_memory
606-
if object_store_memory is None
607-
else object_store_memory
608-
),
609-
resources,
522+
def get_resource_and_label_spec(self):
523+
"""Resolve and return the current ResourceAndLabelSpec for the node."""
524+
if not self._resource_and_label_spec:
525+
self._resource_and_label_spec = ResourceAndLabelSpec(
526+
self._ray_params.num_cpus,
527+
self._ray_params.num_gpus,
528+
self._ray_params.memory,
529+
self._ray_params.object_store_memory,
530+
self._ray_params.resources,
531+
self._ray_params.labels,
610532
).resolve(is_head=self.head, node_ip_address=self.node_ip_address)
611-
return self._resource_spec
533+
return self._resource_and_label_spec
612534

613535
@property
614536
def node_id(self):
@@ -1267,6 +1189,7 @@ def start_raylet(
12671189
create_out=True,
12681190
create_err=True,
12691191
)
1192+
12701193
process_info = ray._private.services.start_raylet(
12711194
self.redis_address,
12721195
self.gcs_address,
@@ -1282,7 +1205,7 @@ def start_raylet(
12821205
self._session_dir,
12831206
self._runtime_env_dir,
12841207
self._logs_dir,
1285-
self.get_resource_spec(),
1208+
self.get_resource_and_label_spec(),
12861209
plasma_directory,
12871210
fallback_directory,
12881211
object_store_memory,
@@ -1315,7 +1238,6 @@ def start_raylet(
13151238
env_updates=self._ray_params.env_vars,
13161239
node_name=self._ray_params.node_name,
13171240
webui=self._webui_url,
1318-
labels=self.node_labels,
13191241
resource_isolation_config=self.resource_isolation_config,
13201242
)
13211243
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
@@ -1477,14 +1399,24 @@ def start_ray_processes(self):
14771399

14781400
# Make sure we don't call `determine_plasma_store_config` multiple
14791401
# times to avoid printing multiple warnings.
1480-
resource_spec = self.get_resource_spec()
1402+
resource_and_label_spec = self.get_resource_and_label_spec()
1403+
if resource_and_label_spec.labels.get(
1404+
ray._raylet.RAY_NODE_ACCELERATOR_TYPE_KEY
1405+
):
1406+
from ray._common.usage import usage_lib
1407+
1408+
usage_lib.record_hardware_usage(
1409+
resource_and_label_spec.labels.get(
1410+
ray._raylet.RAY_NODE_ACCELERATOR_TYPE_KEY
1411+
)
1412+
)
14811413

14821414
(
14831415
plasma_directory,
14841416
fallback_directory,
14851417
object_store_memory,
14861418
) = ray._private.services.determine_plasma_store_config(
1487-
resource_spec.object_store_memory,
1419+
resource_and_label_spec.object_store_memory,
14881420
self._temp_dir,
14891421
plasma_directory=self._ray_params.plasma_directory,
14901422
fallback_directory=self._fallback_directory,

0 commit comments

Comments
 (0)