diff --git a/java/test/src/main/java/io/ray/test/NodeLabelSchedulingTest.java b/java/test/src/main/java/io/ray/test/NodeLabelSchedulingTest.java index b25457e87d6e..f03a6415dc73 100644 --- a/java/test/src/main/java/io/ray/test/NodeLabelSchedulingTest.java +++ b/java/test/src/main/java/io/ray/test/NodeLabelSchedulingTest.java @@ -16,7 +16,7 @@ public void testEmptyNodeLabels() { List nodeInfos = Ray.getRuntimeContext().getAllNodeInfo(); Assert.assertTrue(nodeInfos.size() == 1); Map labels = new HashMap<>(); - labels.put("ray.io/node_id", nodeInfos.get(0).nodeId.toString()); + labels.put("ray.io/node-id", nodeInfos.get(0).nodeId.toString()); Assert.assertEquals(nodeInfos.get(0).labels, labels); } finally { Ray.shutdown(); @@ -30,7 +30,7 @@ public void testSetNodeLabels() { List nodeInfos = Ray.getRuntimeContext().getAllNodeInfo(); Assert.assertTrue(nodeInfos.size() == 1); Map labels = new HashMap<>(); - labels.put("ray.io/node_id", nodeInfos.get(0).nodeId.toString()); + labels.put("ray.io/node-id", nodeInfos.get(0).nodeId.toString()); labels.put("gpu_type", "A100"); labels.put("azone", "azone-1"); Assert.assertEquals(nodeInfos.get(0).labels, labels); diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 00b6599dfc56..5a9cae459933 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -24,8 +24,8 @@ import ray._private.services from ray._common.ray_constants import LOGGING_ROTATE_BACKUP_COUNT, LOGGING_ROTATE_BYTES from ray._common.utils import try_to_create_directory +from ray._private.resource_and_label_spec import ResourceAndLabelSpec from ray._private.resource_isolation_config import ResourceIsolationConfig -from ray._private.resource_spec import ResourceSpec from ray._private.services import get_address, serialize_config from ray._private.utils import ( is_in_test, @@ -137,7 +137,7 @@ def __init__( ), ) - self._resource_spec = None + self._resource_and_label_spec = None self._localhost = socket.gethostbyname("localhost") self._ray_params = ray_params self._config = ray_params._system_config or {} @@ -286,8 +286,6 @@ def __init__( self._raylet_socket_name = self._prepare_socket_file( self._ray_params.raylet_socket_name, default_prefix="raylet" ) - # Set node labels from RayParams or environment override variables. - self._node_labels = self._get_node_labels() if ( self._ray_params.env_vars is not None and "RAY_OVERRIDE_NODE_ID_FOR_TESTING" in self._ray_params.env_vars @@ -521,94 +519,18 @@ def _init_temp(self): tpu_logs_symlink = os.path.join(self._logs_dir, "tpu_logs") try_to_symlink(tpu_logs_symlink, tpu_log_dir) - def _get_node_labels(self): - def merge_labels(env_override_labels, params_labels): - """Merges two dictionaries, picking from the - first in the event of a conflict. Also emit a warning on every - conflict. - """ - - result = params_labels.copy() - result.update(env_override_labels) - - for key in set(env_override_labels.keys()).intersection( - set(params_labels.keys()) - ): - if params_labels[key] != env_override_labels[key]: - logger.warning( - "Autoscaler is overriding your label:" - f"{key}: {params_labels[key]} to " - f"{key}: {env_override_labels[key]}." 
- ) - return result - - env_override_labels = {} - env_override_labels_string = os.getenv( - ray_constants.LABELS_ENVIRONMENT_VARIABLE - ) - if env_override_labels_string: - try: - env_override_labels = json.loads(env_override_labels_string) - except Exception: - logger.exception(f"Failed to load {env_override_labels_string}") - raise - logger.info(f"Autoscaler overriding labels: {env_override_labels}.") - - return merge_labels(env_override_labels, self._ray_params.labels or {}) - - def get_resource_spec(self): - """Resolve and return the current resource spec for the node.""" - - def merge_resources(env_dict, params_dict): - """Separates special case params and merges two dictionaries, picking from the - first in the event of a conflict. Also emit a warning on every - conflict. - """ - num_cpus = env_dict.pop("CPU", None) - num_gpus = env_dict.pop("GPU", None) - memory = env_dict.pop("memory", None) - object_store_memory = env_dict.pop("object_store_memory", None) - - result = params_dict.copy() - result.update(env_dict) - - for key in set(env_dict.keys()).intersection(set(params_dict.keys())): - if params_dict[key] != env_dict[key]: - logger.warning( - "Autoscaler is overriding your resource:" - f"{key}: {params_dict[key]} with {env_dict[key]}." - ) - return num_cpus, num_gpus, memory, object_store_memory, result - - if not self._resource_spec: - env_resources = {} - env_string = os.getenv(ray_constants.RESOURCES_ENVIRONMENT_VARIABLE) - if env_string: - try: - env_resources = json.loads(env_string) - except Exception: - logger.exception(f"Failed to load {env_string}") - raise - logger.debug(f"Autoscaler overriding resources: {env_resources}.") - ( - num_cpus, - num_gpus, - memory, - object_store_memory, - resources, - ) = merge_resources(env_resources, self._ray_params.resources) - self._resource_spec = ResourceSpec( - self._ray_params.num_cpus if num_cpus is None else num_cpus, - self._ray_params.num_gpus if num_gpus is None else num_gpus, - self._ray_params.memory if memory is None else memory, - ( - self._ray_params.object_store_memory - if object_store_memory is None - else object_store_memory - ), - resources, + def get_resource_and_label_spec(self): + """Resolve and return the current ResourceAndLabelSpec for the node.""" + if not self._resource_and_label_spec: + self._resource_and_label_spec = ResourceAndLabelSpec( + self._ray_params.num_cpus, + self._ray_params.num_gpus, + self._ray_params.memory, + self._ray_params.object_store_memory, + self._ray_params.resources, + self._ray_params.labels, ).resolve(is_head=self.head, node_ip_address=self.node_ip_address) - return self._resource_spec + return self._resource_and_label_spec @property def node_id(self): @@ -1267,6 +1189,7 @@ def start_raylet( create_out=True, create_err=True, ) + process_info = ray._private.services.start_raylet( self.redis_address, self.gcs_address, @@ -1282,7 +1205,7 @@ def start_raylet( self._session_dir, self._runtime_env_dir, self._logs_dir, - self.get_resource_spec(), + self.get_resource_and_label_spec(), plasma_directory, fallback_directory, object_store_memory, @@ -1315,7 +1238,6 @@ def start_raylet( env_updates=self._ray_params.env_vars, node_name=self._ray_params.node_name, webui=self._webui_url, - labels=self.node_labels, resource_isolation_config=self.resource_isolation_config, ) assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes @@ -1477,14 +1399,24 @@ def start_ray_processes(self): # Make sure we don't call `determine_plasma_store_config` multiple # times to avoid printing multiple 
warnings. - resource_spec = self.get_resource_spec() + resource_and_label_spec = self.get_resource_and_label_spec() + if resource_and_label_spec.labels.get( + ray._raylet.RAY_NODE_ACCELERATOR_TYPE_KEY + ): + from ray._common.usage import usage_lib + + usage_lib.record_hardware_usage( + resource_and_label_spec.labels.get( + ray._raylet.RAY_NODE_ACCELERATOR_TYPE_KEY + ) + ) ( plasma_directory, fallback_directory, object_store_memory, ) = ray._private.services.determine_plasma_store_config( - resource_spec.object_store_memory, + resource_and_label_spec.object_store_memory, self._temp_dir, plasma_directory=self._ray_params.plasma_directory, fallback_directory=self._fallback_directory, diff --git a/python/ray/_private/resource_and_label_spec.py b/python/ray/_private/resource_and_label_spec.py new file mode 100644 index 000000000000..f33b7d0a45fe --- /dev/null +++ b/python/ray/_private/resource_and_label_spec.py @@ -0,0 +1,478 @@ +import json +import logging +import os +import sys +from typing import Dict, Optional, Tuple + +import ray +import ray._private.ray_constants as ray_constants +from ray._common.constants import HEAD_NODE_RESOURCE_NAME, NODE_ID_PREFIX +from ray._common.utils import RESOURCE_CONSTRAINT_PREFIX +from ray._private import accelerators +from ray._private.accelerators import AcceleratorManager + +logger = logging.getLogger(__name__) + + +class ResourceAndLabelSpec: + """Represents the resource and label configuration passed to a raylet. + + All fields can be None. Before starting services, resolve() should be + called to return a ResourceAndLabelSpec with unknown values filled in with + merged values based on the local machine and user specifications. + """ + + def __init__( + self, + num_cpus: Optional[int] = None, + num_gpus: Optional[int] = None, + memory: Optional[float] = None, + object_store_memory: Optional[float] = None, + resources: Optional[Dict[str, float]] = None, + labels: Optional[Dict[str, str]] = None, + ): + """ + Initialize a ResourceAndLabelSpec + + Args: + num_cpus: The CPUs allocated for this raylet. + num_gpus: The GPUs allocated for this raylet. + memory: The memory allocated for this raylet. + object_store_memory: The object store memory allocated for this raylet. + resources: The custom resources allocated for this raylet. + labels: The labels associated with this node. Labels can be used along + with resources for scheduling. + """ + self.num_cpus = num_cpus + self.num_gpus = num_gpus + self.memory = memory + self.object_store_memory = object_store_memory + self.resources = resources + self.labels = labels + self._is_resolved = False + + def resolved(self) -> bool: + """Returns if resolve() has been called for this ResourceAndLabelSpec + and default values are filled out.""" + return self._is_resolved + + def _all_fields_set(self) -> bool: + """Returns whether all fields in this ResourceAndLabelSpec are not None.""" + return all( + v is not None + for v in ( + self.num_cpus, + self.num_gpus, + self.memory, + self.object_store_memory, + self.resources, + self.labels, + ) + ) + + def to_resource_dict(self): + """Returns a dict suitable to pass to raylet initialization. + + This renames num_cpus / num_gpus to "CPU" / "GPU", + and check types and values. 
+ """ + assert self.resolved() + + resources = dict( + self.resources, + CPU=self.num_cpus, + GPU=self.num_gpus, + memory=int(self.memory), + object_store_memory=int(self.object_store_memory), + ) + + resources = { + resource_label: resource_quantity + for resource_label, resource_quantity in resources.items() + if resource_quantity != 0 + } + + # Check types. + for resource_label, resource_quantity in resources.items(): + assert isinstance(resource_quantity, int) or isinstance( + resource_quantity, float + ), ( + f"{resource_label} ({type(resource_quantity)}): " f"{resource_quantity}" + ) + if ( + isinstance(resource_quantity, float) + and not resource_quantity.is_integer() + ): + raise ValueError( + "Resource quantities must all be whole numbers. " + "Violated by resource '{}' in {}.".format(resource_label, resources) + ) + if resource_quantity < 0: + raise ValueError( + "Resource quantities must be nonnegative. " + "Violated by resource '{}' in {}.".format(resource_label, resources) + ) + if resource_quantity > ray_constants.MAX_RESOURCE_QUANTITY: + raise ValueError( + "Resource quantities must be at most {}. " + "Violated by resource '{}' in {}.".format( + ray_constants.MAX_RESOURCE_QUANTITY, resource_label, resources + ) + ) + + return resources + + def resolve( + self, is_head: bool, node_ip_address: Optional[str] = None + ) -> "ResourceAndLabelSpec": + """Fills out this ResourceAndLabelSpec instance with merged values from system defaults and user specification. + + Args: + is_head: Whether this is the head node. + node_ip_address: The IP address of the node that we are on. + This is used to automatically create a node id resource. + + Returns: + ResourceAndLabelSpec: This instance with all fields resolved. + """ + + self._resolve_resources(is_head=is_head, node_ip_address=node_ip_address) + + # Resolve accelerator-specific resources + ( + accelerator_manager, + num_accelerators, + ) = ResourceAndLabelSpec._get_current_node_accelerator( + self.num_gpus, self.resources + ) + self._resolve_accelerator_resources(accelerator_manager, num_accelerators) + + # Default num_gpus value if unset by user and unable to auto-detect. + if self.num_gpus is None: + self.num_gpus = 0 + + # Resolve and merge node labels from all sources (params, env, and default). + self._resolve_labels(accelerator_manager) + + # Resolve memory resources + self._resolve_memory_resources() + + self._is_resolved = True + assert self._all_fields_set() + return self + + @staticmethod + def _load_env_resources() -> Dict[str, float]: + """Load resource overrides from the environment, if present.""" + env_resources = {} + env_string = os.getenv(ray_constants.RESOURCES_ENVIRONMENT_VARIABLE) + if env_string: + try: + env_resources = json.loads(env_string) + except Exception: + logger.exception(f"Failed to load {env_string}") + raise + logger.debug(f"Autoscaler overriding resources: {env_resources}.") + return env_resources + + @staticmethod + def _merge_resources(env_dict: Dict[str, float], params_dict: Dict[str, float]): + """Merge environment and Ray param-provided resources, with env values taking precedence. + Returns separated special case params (CPU/GPU/memory) and the merged resource dict. 
+ """ + num_cpus = env_dict.pop("CPU", None) + num_gpus = env_dict.pop("GPU", None) + memory = env_dict.pop("memory", None) + object_store_memory = env_dict.pop("object_store_memory", None) + + result = params_dict.copy() + result.update(env_dict) + + for key in set(env_dict.keys()).intersection(params_dict or {}): + if params_dict[key] != env_dict[key]: + logger.warning( + f"Autoscaler is overriding your resource: {key}: " + f"{params_dict[key]} with {env_dict[key]}." + ) + + return num_cpus, num_gpus, memory, object_store_memory, result + + def _resolve_resources( + self, is_head: bool, node_ip_address: Optional[str] = None + ) -> None: + """Resolve CPU, GPU, and custom resources. Merges resources from environment, + Ray params, and defaults in that order of precedence.""" + + # Load environment override resources and merge with resources passed + # in from Ray Params. Separates special case params if found in env. + env_resources = ResourceAndLabelSpec._load_env_resources() + ( + num_cpus, + num_gpus, + memory, + object_store_memory, + merged_resources, + ) = ResourceAndLabelSpec._merge_resources(env_resources, self.resources or {}) + + self.num_cpus = self.num_cpus if num_cpus is None else num_cpus + self.num_gpus = self.num_gpus if num_gpus is None else num_gpus + self.memory = self.memory if memory is None else memory + self.object_store_memory = ( + self.object_store_memory + if object_store_memory is None + else object_store_memory + ) + self.resources = merged_resources + + if node_ip_address is None: + node_ip_address = ray.util.get_node_ip_address() + + # Automatically create a node id resource on each node. This is + # queryable with ray._private.state.node_ids() and + # ray._private.state.current_node_id(). + self.resources[NODE_ID_PREFIX + node_ip_address] = 1.0 + + # Automatically create a head node resource. + if HEAD_NODE_RESOURCE_NAME in self.resources: + raise ValueError( + f"{HEAD_NODE_RESOURCE_NAME}" + " is a reserved resource name, use another name instead." 
+ ) + if is_head: + self.resources[HEAD_NODE_RESOURCE_NAME] = 1.0 + + # Auto-detect CPU count if not explicitly set + if self.num_cpus is None: + self.num_cpus = ray._private.utils.get_num_cpus() + + @staticmethod + def _load_env_labels() -> Dict[str, str]: + env_override_labels = {} + env_override_labels_string = os.getenv( + ray_constants.LABELS_ENVIRONMENT_VARIABLE + ) + if env_override_labels_string: + try: + env_override_labels = json.loads(env_override_labels_string) + except Exception: + logger.exception(f"Failed to load {env_override_labels_string}") + raise + logger.info(f"Autoscaler overriding labels: {env_override_labels}.") + + return env_override_labels + + @staticmethod + def _get_default_labels( + accelerator_manager: Optional[AcceleratorManager], + ) -> Dict[str, str]: + default_labels = {} + + # Get environment variables populated from K8s Pod Spec + node_group = os.environ.get(ray._raylet.NODE_TYPE_NAME_ENV, "") + market_type = os.environ.get(ray._raylet.NODE_MARKET_TYPE_ENV, "") + availability_region = os.environ.get(ray._raylet.NODE_REGION_ENV, "") + availability_zone = os.environ.get(ray._raylet.NODE_ZONE_ENV, "") + + # Map environment variables to default ray node labels + if market_type: + default_labels[ray._raylet.RAY_NODE_MARKET_TYPE_KEY] = market_type + if node_group: + default_labels[ray._raylet.RAY_NODE_GROUP_KEY] = node_group + if availability_zone: + default_labels[ray._raylet.RAY_NODE_ZONE_KEY] = availability_zone + if availability_region: + default_labels[ray._raylet.RAY_NODE_REGION_KEY] = availability_region + + # Get accelerator type from AcceleratorManager + if accelerator_manager: + accelerator_type = accelerator_manager.get_current_node_accelerator_type() + if accelerator_type: + default_labels[ + ray._raylet.RAY_NODE_ACCELERATOR_TYPE_KEY + ] = accelerator_type + + return default_labels + + def _resolve_labels( + self, accelerator_manager: Optional[AcceleratorManager] + ) -> None: + """Resolve and merge environment override, user-input from params, and Ray default + labels in that order of precedence.""" + + # Start with a dictionary filled out with Ray default labels + merged = ResourceAndLabelSpec._get_default_labels(accelerator_manager) + + # Merge user-specified labels from Ray params + for key, val in (self.labels or {}).items(): + if key in merged and merged[key] != val: + logger.warning( + f"User label is overriding Ray default label: {key}: " + f"{key}: {merged[key]} to " + f"{key}: {self.labels[key]}." + ) + merged[key] = val + + # Merge autoscaler override labels from environment + env_labels = ResourceAndLabelSpec._load_env_labels() + for key, val in (env_labels or {}).items(): + if key in merged and merged[key] != val: + logger.warning( + "Autoscaler is overriding your label:" + f"{key}: {merged[key]} to " + f"{key}: {env_labels[key]}." + ) + merged[key] = val + + self.labels = merged + + def _resolve_accelerator_resources(self, accelerator_manager, num_accelerators): + """Detect and update accelerator resources on a node.""" + if not accelerator_manager: + return + + accelerator_resource_name = accelerator_manager.get_resource_name() + visible_accelerator_ids = ( + accelerator_manager.get_current_process_visible_accelerator_ids() + ) + + # Check that the number of accelerators that the raylet wants doesn't + # exceed the amount allowed by visible accelerator ids. 
+ if ( + num_accelerators is not None + and visible_accelerator_ids is not None + and num_accelerators > len(visible_accelerator_ids) + ): + raise ValueError( + f"Attempting to start raylet with {num_accelerators} " + f"{accelerator_resource_name}, " + f"but {accelerator_manager.get_visible_accelerator_ids_env_var()} " + f"contains {visible_accelerator_ids}." + ) + + if accelerator_resource_name == "GPU": + self.num_gpus = num_accelerators + else: + self.resources[accelerator_resource_name] = num_accelerators + + accelerator_type = accelerator_manager.get_current_node_accelerator_type() + if accelerator_type: + self.resources[f"{RESOURCE_CONSTRAINT_PREFIX}{accelerator_type}"] = 1 + additional_resources = ( + accelerator_manager.get_current_node_additional_resources() + ) + if additional_resources: + self.resources.update(additional_resources) + + def _resolve_memory_resources(self): + # Choose a default object store size. + system_memory = ray._common.utils.get_system_memory() + avail_memory = ray._private.utils.estimate_available_memory() + object_store_memory = self.object_store_memory + if object_store_memory is None: + object_store_memory = int( + avail_memory * ray_constants.DEFAULT_OBJECT_STORE_MEMORY_PROPORTION + ) + + # Set the object_store_memory size to 2GB on Mac + # to avoid degraded performance. + # (https://github.com/ray-project/ray/issues/20388) + if sys.platform == "darwin": + object_store_memory = min( + object_store_memory, ray_constants.MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT + ) + + object_store_memory_cap = ( + ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES + ) + + # Cap by shm size by default to avoid low performance, but don't + # go lower than REQUIRE_SHM_SIZE_THRESHOLD. + if sys.platform == "linux" or sys.platform == "linux2": + # Multiple by 0.95 to give a bit of wiggle-room. + # https://github.com/ray-project/ray/pull/23034/files + shm_avail = ray._private.utils.get_shared_memory_bytes() * 0.95 + shm_cap = max(ray_constants.REQUIRE_SHM_SIZE_THRESHOLD, shm_avail) + + object_store_memory_cap = min(object_store_memory_cap, shm_cap) + + # Cap memory to avoid memory waste and perf issues on large nodes + if ( + object_store_memory_cap + and object_store_memory > object_store_memory_cap + ): + logger.debug( + "Warning: Capping object memory store to {}GB. ".format( + object_store_memory_cap // 1e9 + ) + + "To increase this further, specify `object_store_memory` " + "when calling ray.init() or ray start." + ) + object_store_memory = object_store_memory_cap + + memory = self.memory + if memory is None: + memory = avail_memory - object_store_memory + if memory < 100e6 and memory < 0.05 * system_memory: + raise ValueError( + "After taking into account object store and redis memory " + "usage, the amount of memory on this node available for " + "tasks and actors ({} GB) is less than {}% of total. " + "You can adjust these settings with " + "ray.init(memory=, " + "object_store_memory=).".format( + round(memory / 1e9, 2), int(100 * (memory / system_memory)) + ) + ) + + # Set the resolved memory and object_store_memory + self.object_store_memory = object_store_memory + self.memory = memory + + @staticmethod + def _get_current_node_accelerator( + num_gpus: Optional[int], resources: Dict[str, float] + ) -> Tuple[AcceleratorManager, int]: + """ + Returns the AcceleratorManager and accelerator count for the accelerator + associated with this node. This assumes each node has at most one accelerator type. + If no accelerators are present, returns None. 
+ + The resolved accelerator count uses num_gpus (for GPUs) or resources if set, and + otherwise falls back to the count auto-detected by the AcceleratorManager. The + resolved accelerator count is capped by the number of visible accelerators. + + Args: + num_gpus: GPU count (if provided by user). + resources: Resource dictionary containing custom resource keys. + + Returns: + Tuple[Optional[AcceleratorManager], int]: A tuple containing the accelerator + manager (or None) the final resolved accelerator count. + """ + for resource_name in accelerators.get_all_accelerator_resource_names(): + accelerator_manager = accelerators.get_accelerator_manager_for_resource( + resource_name + ) + if accelerator_manager is None: + continue + # Respect configured value for GPUs if set + if resource_name == "GPU": + num_accelerators = num_gpus + else: + num_accelerators = resources.get(resource_name) + if num_accelerators is None: + num_accelerators = ( + accelerator_manager.get_current_node_num_accelerators() + ) + visible_accelerator_ids = ( + accelerator_manager.get_current_process_visible_accelerator_ids() + ) + if visible_accelerator_ids is not None: + num_accelerators = min( + num_accelerators, len(visible_accelerator_ids) + ) + + if num_accelerators > 0: + return accelerator_manager, num_accelerators + + return None, 0 diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py deleted file mode 100644 index 1f2ffbe0497e..000000000000 --- a/python/ray/_private/resource_spec.py +++ /dev/null @@ -1,283 +0,0 @@ -import logging -import sys -from collections import namedtuple -from typing import Optional - -import ray -import ray._private.ray_constants as ray_constants -from ray._common.constants import HEAD_NODE_RESOURCE_NAME, NODE_ID_PREFIX -from ray._common.utils import RESOURCE_CONSTRAINT_PREFIX - -logger = logging.getLogger(__name__) - - -class ResourceSpec( - namedtuple( - "ResourceSpec", - [ - "num_cpus", - "num_gpus", - "memory", - "object_store_memory", - "resources", - ], - ) -): - """Represents the resource configuration passed to a raylet. - - All fields can be None. Before starting services, resolve() should be - called to return a ResourceSpec with unknown values filled in with - defaults based on the local machine specifications. - - Attributes: - num_cpus: The CPUs allocated for this raylet. - num_gpus: The GPUs allocated for this raylet. - memory: The memory allocated for this raylet. - object_store_memory: The object store memory allocated for this raylet. - Note that when calling to_resource_dict(), this will be scaled down - by 30% to account for the global plasma LRU reserve. - resources: The custom resources allocated for this raylet. - """ - - def __new__( - cls, - num_cpus=None, - num_gpus=None, - memory=None, - object_store_memory=None, - resources=None, - ): - return super(ResourceSpec, cls).__new__( - cls, - num_cpus, - num_gpus, - memory, - object_store_memory, - resources, - ) - - def resolved(self): - """Returns if this ResourceSpec has default values filled out.""" - for v in self._asdict().values(): - if v is None: - return False - return True - - def to_resource_dict(self): - """Returns a dict suitable to pass to raylet initialization. - - This renames num_cpus / num_gpus to "CPU" / "GPU", - translates memory from bytes into 100MB memory units, and checks types. 
- """ - assert self.resolved() - - resources = dict( - self.resources, - CPU=self.num_cpus, - GPU=self.num_gpus, - memory=int(self.memory), - object_store_memory=int(self.object_store_memory), - ) - - resources = { - resource_label: resource_quantity - for resource_label, resource_quantity in resources.items() - if resource_quantity != 0 - } - - # Check types. - for resource_label, resource_quantity in resources.items(): - assert isinstance(resource_quantity, int) or isinstance( - resource_quantity, float - ), ( - f"{resource_label} ({type(resource_quantity)}): " f"{resource_quantity}" - ) - if ( - isinstance(resource_quantity, float) - and not resource_quantity.is_integer() - ): - raise ValueError( - "Resource quantities must all be whole numbers. " - "Violated by resource '{}' in {}.".format(resource_label, resources) - ) - if resource_quantity < 0: - raise ValueError( - "Resource quantities must be nonnegative. " - "Violated by resource '{}' in {}.".format(resource_label, resources) - ) - if resource_quantity > ray_constants.MAX_RESOURCE_QUANTITY: - raise ValueError( - "Resource quantities must be at most {}. " - "Violated by resource '{}' in {}.".format( - ray_constants.MAX_RESOURCE_QUANTITY, resource_label, resources - ) - ) - - return resources - - def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): - """Returns a copy with values filled out with system defaults. - - Args: - is_head: Whether this is the head node. - node_ip_address: The IP address of the node that we are on. - This is used to automatically create a node id resource. - """ - - resources = (self.resources or {}).copy() - assert "CPU" not in resources, resources - assert "GPU" not in resources, resources - assert "memory" not in resources, resources - assert "object_store_memory" not in resources, resources - - if node_ip_address is None: - node_ip_address = ray.util.get_node_ip_address() - - # Automatically create a node id resource on each node. This is - # queryable with ray._private.state.node_ids() and - # ray._private.state.current_node_id(). - resources[NODE_ID_PREFIX + node_ip_address] = 1.0 - - # Automatically create a head node resource. - if HEAD_NODE_RESOURCE_NAME in resources: - raise ValueError( - f"{HEAD_NODE_RESOURCE_NAME}" - " is a reserved resource name, use another name instead." - ) - if is_head: - resources[HEAD_NODE_RESOURCE_NAME] = 1.0 - - num_cpus = self.num_cpus - if num_cpus is None: - num_cpus = ray._private.utils.get_num_cpus() - - num_gpus = 0 - for ( - accelerator_resource_name - ) in ray._private.accelerators.get_all_accelerator_resource_names(): - accelerator_manager = ( - ray._private.accelerators.get_accelerator_manager_for_resource( - accelerator_resource_name - ) - ) - num_accelerators = None - if accelerator_resource_name == "GPU": - num_accelerators = self.num_gpus - else: - num_accelerators = resources.get(accelerator_resource_name, None) - visible_accelerator_ids = ( - accelerator_manager.get_current_process_visible_accelerator_ids() - ) - # Check that the number of accelerators that the raylet wants doesn't - # exceed the amount allowed by visible accelerator ids. - if ( - num_accelerators is not None - and visible_accelerator_ids is not None - and num_accelerators > len(visible_accelerator_ids) - ): - raise ValueError( - f"Attempting to start raylet with {num_accelerators} " - f"{accelerator_resource_name}, " - f"but {accelerator_manager.get_visible_accelerator_ids_env_var()} " - f"contains {visible_accelerator_ids}." 
- ) - if num_accelerators is None: - # Try to automatically detect the number of accelerators. - num_accelerators = ( - accelerator_manager.get_current_node_num_accelerators() - ) - # Don't use more accelerators than allowed by visible accelerator ids. - if visible_accelerator_ids is not None: - num_accelerators = min( - num_accelerators, len(visible_accelerator_ids) - ) - - if num_accelerators: - if accelerator_resource_name == "GPU": - num_gpus = num_accelerators - else: - resources[accelerator_resource_name] = num_accelerators - - accelerator_type = ( - accelerator_manager.get_current_node_accelerator_type() - ) - if accelerator_type: - resources[f"{RESOURCE_CONSTRAINT_PREFIX}{accelerator_type}"] = 1 - - from ray._common.usage import usage_lib - - usage_lib.record_hardware_usage(accelerator_type) - additional_resources = ( - accelerator_manager.get_current_node_additional_resources() - ) - if additional_resources: - resources.update(additional_resources) - # Choose a default object store size. - system_memory = ray._common.utils.get_system_memory() - avail_memory = ray._private.utils.estimate_available_memory() - object_store_memory = self.object_store_memory - if object_store_memory is None: - object_store_memory = int( - avail_memory * ray_constants.DEFAULT_OBJECT_STORE_MEMORY_PROPORTION - ) - - # Set the object_store_memory size to 2GB on Mac - # to avoid degraded performance. - # (https://github.com/ray-project/ray/issues/20388) - if sys.platform == "darwin": - object_store_memory = min( - object_store_memory, ray_constants.MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT - ) - - object_store_memory_cap = ( - ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES - ) - - # Cap by shm size by default to avoid low performance, but don't - # go lower than REQUIRE_SHM_SIZE_THRESHOLD. - if sys.platform == "linux" or sys.platform == "linux2": - # Multiple by 0.95 to give a bit of wiggle-room. - # https://github.com/ray-project/ray/pull/23034/files - shm_avail = ray._private.utils.get_shared_memory_bytes() * 0.95 - shm_cap = max(ray_constants.REQUIRE_SHM_SIZE_THRESHOLD, shm_avail) - - object_store_memory_cap = min(object_store_memory_cap, shm_cap) - - # Cap memory to avoid memory waste and perf issues on large nodes - if ( - object_store_memory_cap - and object_store_memory > object_store_memory_cap - ): - logger.debug( - "Warning: Capping object memory store to {}GB. ".format( - object_store_memory_cap // 1e9 - ) - + "To increase this further, specify `object_store_memory` " - "when calling ray.init() or ray start." - ) - object_store_memory = object_store_memory_cap - - memory = self.memory - if memory is None: - memory = avail_memory - object_store_memory - if memory < 100e6 and memory < 0.05 * system_memory: - raise ValueError( - "After taking into account object store and redis memory " - "usage, the amount of memory on this node available for " - "tasks and actors ({} GB) is less than {}% of total. 
" - "You can adjust these settings with " - "ray.init(memory=, " - "object_store_memory=).".format( - round(memory / 1e9, 2), int(100 * (memory / system_memory)) - ) - ) - - spec = ResourceSpec( - num_cpus, - num_gpus, - memory, - object_store_memory, - resources, - ) - assert spec.resolved() - return spec diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index ee7ac25403c9..e6aa1975bbc1 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -1538,7 +1538,7 @@ def start_raylet( session_dir: str, resource_dir: str, log_dir: str, - resource_spec, + resource_and_label_spec, plasma_directory: str, fallback_directory: str, object_store_memory: int, @@ -1572,7 +1572,6 @@ def start_raylet( env_updates: Optional[dict] = None, node_name: Optional[str] = None, webui: Optional[str] = None, - labels: Optional[dict] = None, ): """Start a raylet, which is a combined local scheduler and object manager. @@ -1594,7 +1593,7 @@ def start_raylet( session_dir: The path of this session. resource_dir: The path of resource of this session . log_dir: The path of the dir where log files are created. - resource_spec: Resources for this raylet. + resource_and_label_spec: Resources and key-value labels for this raylet. plasma_directory: A directory where the Plasma memory mapped files will be created. fallback_directory: A directory where the Object store fallback files will be created. @@ -1649,7 +1648,6 @@ def start_raylet( env_updates: Environment variable overrides. node_name: The name of the node. webui: The url of the UI. - labels: The key-value labels of the node. Returns: ProcessInfo for the process that was started. """ @@ -1658,8 +1656,9 @@ def start_raylet( if use_valgrind and use_profiler: raise ValueError("Cannot use valgrind and profiler at the same time.") - assert resource_spec.resolved() - static_resources = resource_spec.to_resource_dict() + # Get the static resources and labels from the resolved ResourceAndLabelSpec + static_resources = resource_and_label_spec.to_resource_dict() + labels = resource_and_label_spec.labels # Limit the number of workers that can be started in parallel by the # raylet. However, make sure it is at least 1. @@ -1907,7 +1906,7 @@ def start_raylet( if worker_port_list is not None: command.append(f"--worker_port_list={worker_port_list}") command.append( - "--num_prestart_python_workers={}".format(int(resource_spec.num_cpus)) + "--num_prestart_python_workers={}".format(int(resource_and_label_spec.num_cpus)) ) command.append( "--dashboard_agent_command={}".format( diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 1d97f8b02f07..6d253a761354 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1054,10 +1054,12 @@ def get_accelerator_ids_for_accelerator_resource( # Give all accelerator ids in local_mode. 
if self.mode == LOCAL_MODE: if resource_name == ray_constants.GPU: - max_accelerators = self.node.get_resource_spec().num_gpus + max_accelerators = self.node.get_resource_and_label_spec().num_gpus else: - max_accelerators = self.node.get_resource_spec().resources.get( - resource_name, None + max_accelerators = ( + self.node.get_resource_and_label_spec().resources.get( + resource_name, None + ) ) if max_accelerators: assigned_ids = original_ids[:max_accelerators] diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index f69f80f46e10..b6a010a53aa9 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -774,3 +774,12 @@ cdef extern from "ray/common/constants.h" nogil: cdef const char[] kGcsAutoscalerV2EnabledKey cdef const char[] kGcsAutoscalerClusterConfigKey cdef const char[] kGcsPidKey + cdef const char[] kNodeTypeNameEnv + cdef const char[] kNodeMarketTypeEnv + cdef const char[] kNodeRegionEnv + cdef const char[] kNodeZoneEnv + cdef const char[] kLabelKeyNodeAcceleratorType + cdef const char[] kLabelKeyNodeMarketType + cdef const char[] kLabelKeyNodeRegion + cdef const char[] kLabelKeyNodeZone + cdef const char[] kLabelKeyNodeGroup diff --git a/python/ray/includes/common.pxi b/python/ray/includes/common.pxi index 3db7b9391d72..b4c257a7b140 100644 --- a/python/ray/includes/common.pxi +++ b/python/ray/includes/common.pxi @@ -14,6 +14,15 @@ from ray.includes.common cimport ( kGcsAutoscalerV2EnabledKey, kGcsAutoscalerClusterConfigKey, kGcsPidKey, + kNodeTypeNameEnv, + kNodeMarketTypeEnv, + kNodeRegionEnv, + kNodeZoneEnv, + kLabelKeyNodeAcceleratorType, + kLabelKeyNodeMarketType, + kLabelKeyNodeRegion, + kLabelKeyNodeZone, + kLabelKeyNodeGroup, ) from ray.exceptions import ( @@ -128,3 +137,15 @@ GCS_AUTOSCALER_STATE_NAMESPACE = kGcsAutoscalerStateNamespace.decode() GCS_AUTOSCALER_V2_ENABLED_KEY = kGcsAutoscalerV2EnabledKey.decode() GCS_AUTOSCALER_CLUSTER_CONFIG_KEY = kGcsAutoscalerClusterConfigKey.decode() GCS_PID_KEY = kGcsPidKey.decode() + +# Ray node label related constants form src/ray/common/constants.h +NODE_TYPE_NAME_ENV = kNodeTypeNameEnv.decode() +NODE_MARKET_TYPE_ENV = kNodeMarketTypeEnv.decode() +NODE_REGION_ENV = kNodeRegionEnv.decode() +NODE_ZONE_ENV = kNodeZoneEnv.decode() + +RAY_NODE_ACCELERATOR_TYPE_KEY = kLabelKeyNodeAcceleratorType.decode() +RAY_NODE_MARKET_TYPE_KEY = kLabelKeyNodeMarketType.decode() +RAY_NODE_REGION_KEY = kLabelKeyNodeRegion.decode() +RAY_NODE_ZONE_KEY = kLabelKeyNodeZone.decode() +RAY_NODE_GROUP_KEY = kLabelKeyNodeGroup.decode() diff --git a/python/ray/serve/tests/test_serve_ha.py b/python/ray/serve/tests/test_serve_ha.py index 608b36ae1c33..a15cff0a3a47 100644 --- a/python/ray/serve/tests/test_serve_ha.py +++ b/python/ray/serve/tests/test_serve_ha.py @@ -109,7 +109,7 @@ def check_for_head_node_come_back_up(): import ray import requests from ray.serve.schema import ServeInstanceDetails -from ray._private.resource_spec import HEAD_NODE_RESOURCE_NAME +from ray._private.resource_and_label_spec import HEAD_NODE_RESOURCE_NAME ray.init(address="auto") head_node_id = ray.get_runtime_context().get_node_id() serve_details = ServeInstanceDetails( diff --git a/python/ray/tests/test_node_label_scheduling_strategy.py b/python/ray/tests/test_node_label_scheduling_strategy.py index 9493cd68acf2..d13fc587ac08 100644 --- a/python/ray/tests/test_node_label_scheduling_strategy.py +++ b/python/ray/tests/test_node_label_scheduling_strategy.py @@ -106,7 +106,7 @@ def test_node_label_scheduling_in_cluster(ray_start_cluster): 
assert ray.get(actor.get_node_id.remote(), timeout=3) == node_1 actor = MyActor.options( - scheduling_strategy=NodeLabelSchedulingStrategy({"ray.io/node_id": In(node_4)}) + scheduling_strategy=NodeLabelSchedulingStrategy({"ray.io/node-id": In(node_4)}) ).remote() assert ray.get(actor.get_node_id.remote(), timeout=3) == node_4 diff --git a/python/ray/tests/test_node_labels.py b/python/ray/tests/test_node_labels.py index 6a7221ecc446..0761af137ed8 100644 --- a/python/ray/tests/test_node_labels.py +++ b/python/ray/tests/test_node_labels.py @@ -13,8 +13,8 @@ def check_cmd_stderr(cmd): return subprocess.run(cmd, stderr=subprocess.PIPE).stderr.decode("utf-8") -def add_default_labels(node_info, labels): - labels["ray.io/node_id"] = node_info["NodeID"] +def add_default_labels_for_test(node_info, labels): + labels["ray.io/node-id"] = node_info["NodeID"] return labels @@ -26,7 +26,7 @@ def add_default_labels(node_info, labels): def test_ray_start_set_node_labels_from_json(call_ray_start): ray.init(address=call_ray_start) node_info = ray.nodes()[0] - assert node_info["Labels"] == add_default_labels( + assert node_info["Labels"] == add_default_labels_for_test( node_info, {"gpu_type": "A100", "region": "us"} ) @@ -39,7 +39,7 @@ def test_ray_start_set_node_labels_from_json(call_ray_start): def test_ray_start_set_node_labels_from_string(call_ray_start): ray.init(address=call_ray_start) node_info = ray.nodes()[0] - assert node_info["Labels"] == add_default_labels( + assert node_info["Labels"] == add_default_labels_for_test( node_info, {"gpu_type": "A100", "region": "us"} ) @@ -54,18 +54,18 @@ def test_ray_start_set_node_labels_from_string(call_ray_start): def test_ray_start_set_empty_node_labels(call_ray_start): ray.init(address=call_ray_start) node_info = ray.nodes()[0] - assert node_info["Labels"] == add_default_labels(node_info, {}) + assert node_info["Labels"] == add_default_labels_for_test(node_info, {}) def test_ray_init_set_node_labels(shutdown_only): labels = {"gpu_type": "A100", "region": "us"} ray.init(labels=labels) node_info = ray.nodes()[0] - assert node_info["Labels"] == add_default_labels(node_info, labels) + assert node_info["Labels"] == add_default_labels_for_test(node_info, labels) ray.shutdown() ray.init(labels={}) node_info = ray.nodes()[0] - assert node_info["Labels"] == add_default_labels(node_info, {}) + assert node_info["Labels"] == add_default_labels_for_test(node_info, {}) def test_ray_init_set_node_labels_value_error(ray_start_cluster): @@ -87,7 +87,7 @@ def test_ray_start_set_node_labels_value_error(): assert "Label string is not a key-value pair." 
in out out = check_cmd_stderr( - ["ray", "start", "--head", '--labels={"ray.io/node_id":"111"}'] + ["ray", "start", "--head", '--labels={"ray.io/node-id":"111"}'] ) assert "Label string is not a key-value pair" in out @@ -104,14 +104,14 @@ def test_cluster_add_node_with_labels(ray_start_cluster): cluster.wait_for_nodes() ray.init(address=cluster.address) node_info = ray.nodes()[0] - assert node_info["Labels"] == add_default_labels(node_info, labels) + assert node_info["Labels"] == add_default_labels_for_test(node_info, labels) head_node_id = ray.nodes()[0]["NodeID"] cluster.add_node(num_cpus=1, labels={}) cluster.wait_for_nodes() for node in ray.nodes(): if node["NodeID"] != head_node_id: - assert node["Labels"] == add_default_labels(node, {}) + assert node["Labels"] == add_default_labels_for_test(node, {}) @pytest.mark.parametrize("autoscaler_v2", [False, True], ids=["v1", "v2"]) @@ -137,12 +137,14 @@ def test_autoscaler_set_node_labels(autoscaler_v2, shutdown_only): for node in ray.nodes(): if node["Resources"].get("CPU", 0) == 1: - assert node["Labels"] == add_default_labels(node, {"region": "us"}) + assert node["Labels"] == add_default_labels_for_test( + node, {"region": "us"} + ) finally: cluster.shutdown() -def test_ray_start_set_node_labels_from_file(): +def test_ray_start_set_node_labels_from_file(shutdown_only): with tempfile.NamedTemporaryFile(mode="w+", delete=False) as test_file: test_file.write('"gpu_type": "A100"\n"region": "us"\n"market-type": "spot"') test_file_path = test_file.name @@ -152,7 +154,7 @@ def test_ray_start_set_node_labels_from_file(): subprocess.check_call(cmd) ray.init(address="auto") node_info = ray.nodes()[0] - assert node_info["Labels"] == add_default_labels( + assert node_info["Labels"] == add_default_labels_for_test( node_info, {"gpu_type": "A100", "region": "us", "market-type": "spot"} ) finally: @@ -160,5 +162,24 @@ def test_ray_start_set_node_labels_from_file(): os.remove(test_file_path) +def test_get_default_ray_node_labels(shutdown_only, monkeypatch): + # Set env vars for this test + monkeypatch.setenv("RAY_NODE_MARKET_TYPE", "spot") + monkeypatch.setenv("RAY_NODE_TYPE_NAME", "worker-group-1") + monkeypatch.setenv("RAY_NODE_REGION", "us-central2") + monkeypatch.setenv("RAY_NODE_ZONE", "us-central2-b") + monkeypatch.setenv("TPU_ACCELERATOR_TYPE", "v4-16") + + ray.init(resources={"TPU": 4}) + node_info = ray.nodes()[0] + labels = node_info["Labels"] + + assert labels.get("ray.io/market-type") == "spot" + assert labels.get("ray.io/node-group") == "worker-group-1" + assert labels.get("ray.io/availability-region") == "us-central2" + assert labels.get("ray.io/availability-zone") == "us-central2-b" + assert labels.get("ray.io/accelerator-type") == "TPU-V4" + + if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/tests/test_runtime_context.py b/python/ray/tests/test_runtime_context.py index 88a2baacb8a8..0d44d1925784 100644 --- a/python/ray/tests/test_runtime_context.py +++ b/python/ray/tests/test_runtime_context.py @@ -422,12 +422,11 @@ def test_get_node_labels(ray_start_cluster_head): resources={"worker1": 1}, num_cpus=1, labels={ - "accelerator-type": "A100", - "region": "us-west4", - "market-type": "spot", + "ray.io/accelerator-type": "A100", + "ray.io/availability-region": "us-west4", + "ray.io/market-type": "spot", }, ) - # ray.init(address=cluster.address) @ray.remote class Actor: @@ -438,20 +437,20 @@ def get_node_labels(self): return ray.get_runtime_context().get_node_labels() expected_node_labels = { - 
"accelerator-type": "A100", - "region": "us-west4", - "market-type": "spot", + "ray.io/accelerator-type": "A100", + "ray.io/availability-region": "us-west4", + "ray.io/market-type": "spot", } # Check node labels from Actor runtime context - a = Actor.options(label_selector={"accelerator-type": "A100"}).remote() + a = Actor.options(label_selector={"ray.io/accelerator-type": "A100"}).remote() node_labels = ray.get(a.get_node_labels.remote()) - expected_node_labels["ray.io/node_id"] = ray.get(a.get_node_id.remote()) + expected_node_labels["ray.io/node-id"] = ray.get(a.get_node_id.remote()) assert expected_node_labels == node_labels # Check node labels from driver runtime context (none are set except default) driver_labels = ray.get_runtime_context().get_node_labels() - assert {"ray.io/node_id": ray.get_runtime_context().get_node_id()} == driver_labels + assert {"ray.io/node-id": ray.get_runtime_context().get_node_id()} == driver_labels if __name__ == "__main__": diff --git a/python/ray/tests/test_state_api.py b/python/ray/tests/test_state_api.py index 1ea400e82a48..77c7b9a72897 100644 --- a/python/ray/tests/test_state_api.py +++ b/python/ray/tests/test_state_api.py @@ -2198,7 +2198,7 @@ def verify(): nodes = list_nodes(detail=True) for node in nodes: assert is_hex(node["node_id"]) - assert node["labels"] == {"ray.io/node_id": node["node_id"]} + assert node["labels"] == {"ray.io/node-id": node["node_id"]} if node["node_name"] == "head_node": assert node["is_head_node"] assert node["state"] == "ALIVE" diff --git a/python/ray/tests/unit/test_resource_and_label_spec.py b/python/ray/tests/unit/test_resource_and_label_spec.py new file mode 100644 index 000000000000..76568b5b4fe0 --- /dev/null +++ b/python/ray/tests/unit/test_resource_and_label_spec.py @@ -0,0 +1,351 @@ +import sys +import json +import pytest +from unittest.mock import patch +from ray._common.constants import HEAD_NODE_RESOURCE_NAME, NODE_ID_PREFIX +import ray._private.ray_constants as ray_constants +from ray._private.accelerators import AcceleratorManager +from ray._private.resource_and_label_spec import ResourceAndLabelSpec + + +class FakeAcceleratorManager(AcceleratorManager): + """Minimal fake Acceleratormanager for testing.""" + + # Configure these values to test different resource resolution paths. 
+ def __init__( + self, + resource_name, + accelerator_type, + num_accelerators, + additional_resources=None, + visible_ids=None, + ): + self._resource_name = resource_name + self._accelerator_type = accelerator_type + self._num_accelerators = num_accelerators + self._additional_resources = additional_resources + self._visible_ids = visible_ids + + def get_current_node_num_accelerators(self) -> int: + return self._num_accelerators + + def get_current_process_visible_accelerator_ids(self): + if self._visible_ids is not None: + return [str(i) for i in range(self._visible_ids)] + return [str(i) for i in range(self._num_accelerators)] + + def get_resource_name(self) -> str: + return self._resource_name + + def get_current_node_accelerator_type(self) -> str: + return self._accelerator_type + + def get_visible_accelerator_ids_env_var(self) -> str: + return "CUDA_VISIBLE_DEVICES" + + def get_current_node_additional_resources(self): + return self._additional_resources or {} + + def set_current_process_visible_accelerator_ids(self, ids): + pass + + def validate_resource_request_quantity(self, quantity: int) -> None: + pass + + +def test_resource_and_label_spec_resolves_with_params(): + """Validate that ResourceAndLabelSpec resolve() respects passed in + Ray Params rather than overriding with auto-detection/system defaults.""" + # Create ResourceAndLabelSpec with args from RayParams. + spec = ResourceAndLabelSpec( + num_cpus=8, + num_gpus=2, + memory=10 * 1024**3, + object_store_memory=5 * 1024**3, + resources={"TPU": 42}, + labels={"ray.io/market-type": "spot"}, + ) + + spec.resolve(is_head=False) + + # Verify that explicit Ray Params values are preserved. + assert spec.num_cpus == 8 + assert spec.num_gpus == 2 + assert spec.memory == 10 * 1024**3 + assert spec.object_store_memory == 5 * 1024**3 + assert spec.resources["TPU"] == 42 + assert any(key.startswith(NODE_ID_PREFIX) for key in spec.resources) + assert spec.labels["ray.io/market-type"] == "spot" + + assert spec.resolved() + + +def test_resource_and_label_spec_resolves_auto_detect(monkeypatch): + """Validate that ResourceAndLabelSpec resolve() fills out defaults detected from + system when Params not passed.""" + monkeypatch.setattr("ray._private.utils.get_num_cpus", lambda: 4) # 4 cpus + monkeypatch.setattr( + "ray._common.utils.get_system_memory", lambda: 16 * 1024**3 + ) # 16GB + monkeypatch.setattr( + "ray._private.utils.estimate_available_memory", lambda: 8 * 1024**3 + ) # 8GB + monkeypatch.setattr( + "ray._private.utils.get_shared_memory_bytes", lambda: 4 * 1024**3 + ) # 4GB + + spec = ResourceAndLabelSpec() + spec.resolve(is_head=True) + + assert spec.resolved() + + # Validate all fields are set based on defaults or calls to system. 
+ assert spec.num_cpus == 4 + assert spec.num_gpus == 0 + assert isinstance(spec.labels, dict) + assert HEAD_NODE_RESOURCE_NAME in spec.resources + assert any(key.startswith(NODE_ID_PREFIX) for key in spec.resources.keys()) + + # object_store_memory = 8GB * DEFAULT_OBJECT_STORE_MEMORY_PROPORTION + expected_object_store = int( + 8 * 1024**3 * ray_constants.DEFAULT_OBJECT_STORE_MEMORY_PROPORTION + ) + assert spec.object_store_memory == expected_object_store + + # memory is total available memory - object_store_memory + expected_memory = 8 * 1024**3 - expected_object_store + assert spec.memory == expected_memory + + +def test_env_resource_overrides_with_conflict(monkeypatch): + """Validate that RESOURCES_ENVIRONMENT_VARIABLE overrides Ray Param resources.""" + # Prepare environment overrides + env_resources = { + "CPU": 8, + "GPU": 4, + "TPU": 4, + } + monkeypatch.setenv( + ray_constants.RESOURCES_ENVIRONMENT_VARIABLE, json.dumps(env_resources) + ) + + ray_params_resources = {"TPU": 8, "B200": 4} + + # num_cpus, num_gpus, and conflicting resources should override + spec = ResourceAndLabelSpec( + num_cpus=2, + num_gpus=1, + resources=ray_params_resources, + labels={}, + ) + + spec.resolve(is_head=True) + + # Environment overrides values take precedence after resolve + assert spec.num_cpus == 8 + assert spec.num_gpus == 4 + assert spec.resources["TPU"] == 4 + assert spec.resources["B200"] == 4 + + +def test_to_resource_dict_with_invalid_types(): + """Validate malformed resource values raise ValueError from to_resource_dict().""" + spec = ResourceAndLabelSpec( + num_cpus=1, + num_gpus=1, + memory=1_000, + object_store_memory=1_000, + resources={"INVALID": -5}, # Invalid + labels={}, + ) + spec.resolve(is_head=True, node_ip_address="127.0.0.1") + with pytest.raises(ValueError): + spec.to_resource_dict() + + +def test_resolve_memory_resources(monkeypatch): + """Validate that resolve correctly sets system object_store memory and + raises ValueError when configured memory is too low.""" + # object_store_memory capped at 95% of shm size to avoid low performance. + monkeypatch.setattr( + "ray._common.utils.get_system_memory", lambda: 2 * 1024**3 + ) # 2 GB + monkeypatch.setattr( + "ray._private.utils.estimate_available_memory", lambda: 1 * 1024**3 + ) # 2 GB + monkeypatch.setattr( + "ray._private.utils.get_shared_memory_bytes", lambda: 512 * 1024**2 + ) # 512 MB + + spec1 = ResourceAndLabelSpec() + spec1.resolve(is_head=False) + + max_shm = 512 * 1024**2 * 0.95 + assert spec1.object_store_memory <= max_shm + assert spec1.memory > 0 + + # Low available memory for tasks/actors triggers ValueError. 
+ monkeypatch.setattr( + "ray._common.utils.get_system_memory", lambda: 2 * 1024**3 + ) # 2 GB + monkeypatch.setattr( + "ray._private.utils.estimate_available_memory", lambda: 100 * 1024**2 + ) # 100 MB + monkeypatch.setattr( + "ray._private.utils.get_shared_memory_bytes", lambda: 50 * 1024**2 + ) # 50 MB + + spec2 = ResourceAndLabelSpec() + with pytest.raises(ValueError, match="available for tasks and actors"): + spec2.resolve(is_head=False) + + +def test_resolve_raises_on_reserved_head_resource(): + """resolve should raise a ValueError if HEAD_NODE_RESOURCE_NAME is set in resources.""" + spec = ResourceAndLabelSpec(resources={HEAD_NODE_RESOURCE_NAME: 1}, labels={}) + with pytest.raises(ValueError, match=HEAD_NODE_RESOURCE_NAME): + spec.resolve(is_head=True) + + +def test_resolve_handles_no_accelerators(): + """Check resolve() is able to handle the no accelerators detected case.""" + spec = ResourceAndLabelSpec() + # No accelerators are returned. + with patch( + "ray._private.accelerators.get_all_accelerator_resource_names", + return_value=[], + ): + spec.resolve(is_head=False, node_ip_address="test") + + # With no accelerators detected or num_gpus, GPU count should default to 0 + # and the resources dictionary is unchanged. + assert spec.num_gpus == 0 + assert spec.resources == {"node:test": 1} + assert spec.resolved() + + +def test_label_spec_resolve_merged_env_labels(monkeypatch): + """Validate that LABELS_ENVIRONMENT_VARIABLE is merged into final labels.""" + override_labels = {"autoscaler-override-label": "example"} + monkeypatch.setenv( + ray_constants.LABELS_ENVIRONMENT_VARIABLE, json.dumps(override_labels) + ) + spec = ResourceAndLabelSpec() + spec.resolve(is_head=True) + + assert any(key == "autoscaler-override-label" for key in spec.labels) + + +def test_merge_labels_populates_defaults(monkeypatch): + """Ensure default labels (node type, market type, region, zone, accelerator) populate correctly.""" + # Patch Ray K8s label environment vars + monkeypatch.setenv(ray_constants.LABELS_ENVIRONMENT_VARIABLE, "{}") + monkeypatch.setenv("RAY_NODE_TYPE_NAME", "worker-group-1") + monkeypatch.setenv("RAY_NODE_MARKET_TYPE", "spot") + monkeypatch.setenv("RAY_NODE_REGION", "us-west1") + monkeypatch.setenv("RAY_NODE_ZONE", "us-west1-a") + + spec = ResourceAndLabelSpec() + + # AcceleratorManager for node with 1 GPU + with patch( + "ray._private.accelerators.get_accelerator_manager_for_resource", + return_value=FakeAcceleratorManager("GPU", "A100", 1), + ), patch( + "ray._private.accelerators.get_all_accelerator_resource_names", + return_value=["GPU"], + ): + spec.resolve(is_head=False) + + # Verify all default labels are present + assert spec.labels.get("ray.io/node-group") == "worker-group-1" + assert spec.labels.get("ray.io/market-type") == "spot" + assert spec.labels.get("ray.io/availability-region") == "us-west1" + assert spec.labels.get("ray.io/availability-zone") == "us-west1-a" + assert spec.labels.get("ray.io/accelerator-type") == "A100" + assert spec.resolved() + + +def test_resolve_raises_if_exceeds_visible_devices(): + """Check that ValueError is raised when requested accelerators exceed visible IDs.""" + spec = ResourceAndLabelSpec() + spec.num_gpus = 3 # request 3 GPUs + + with patch( + "ray._private.accelerators.get_accelerator_manager_for_resource", + return_value=FakeAcceleratorManager( + "GPU", "A100", num_accelerators=5, visible_ids=2 + ), + ), patch( + "ray._private.accelerators.get_all_accelerator_resource_names", + return_value=["GPU"], + ): + with pytest.raises(ValueError, 
match="Attempting to start raylet"): + spec.resolve(is_head=False) + + +def test_resolve_sets_accelerator_resources(): + """Verify that GPUs/TPU values are auto-detected and assigned properly.""" + spec = ResourceAndLabelSpec() + + # Mock a node with GPUs with 4 visible IDs + with patch( + "ray._private.accelerators.get_accelerator_manager_for_resource", + return_value=FakeAcceleratorManager("GPU", "A100", 4), + ), patch( + "ray._private.accelerators.get_all_accelerator_resource_names", + return_value=["GPU"], + ): + spec.resolve(is_head=False) + + assert spec.num_gpus == 4 + assert spec.resources.get("accelerator_type:A100") == 1 + + +def test_respect_configured_num_gpus(): + """Ensure manually set num_gpus overrides differing auto-detected accelerator value.""" + # Create a ResourceAndLabelSpec with num_gpus=2 from Ray Params. + spec = ResourceAndLabelSpec(num_gpus=2) + # Mock a node with GPUs with 4 visible IDs + with patch( + "ray._private.accelerators.get_accelerator_manager_for_resource", + return_value=FakeAcceleratorManager("GPU", "A100", 4), + ), patch( + "ray._private.accelerators.get_all_accelerator_resource_names", + return_value=["GPU"], + ): + spec.resolve(is_head=False) + + assert spec.num_gpus == 2, ( + f"Expected manually set num_gpus=2 to take precedence over auto-detected value, " + f"but got {spec.num_gpus}" + ) + # Accelerator type key should be set in resources. + assert spec.resources.get("accelerator_type:A100") == 1 + + +def test_resolve_sets_non_gpu_accelerator(): + """Verify that non-GPU accelerators are added to resources. Non-GPU accelerators + should not alter the value of num_gpus.""" + spec = ResourceAndLabelSpec() + # Mock accelerator manager to return a TPU v6e accelerator + with patch( + "ray._private.accelerators.get_accelerator_manager_for_resource", + return_value=FakeAcceleratorManager("TPU", "TPU-v6e", 2, {"TPU-v6e-8-HEAD": 1}), + ), patch( + "ray._private.accelerators.get_all_accelerator_resource_names", + return_value=["TPU"], + ): + spec.resolve(is_head=False) + + # num_gpus should default to 0 + assert spec.num_gpus == 0 + assert spec.resources["TPU"] == 2 + assert spec.resources["TPU-v6e-8-HEAD"] == 1 + # Accelerator type label is present + assert spec.labels.get("ray.io/accelerator-type") == "TPU-v6e" + assert spec.resolved() + + +if __name__ == "__main__": + sys.exit(pytest.main(["-sv", __file__])) diff --git a/src/ray/common/constants.h b/src/ray/common/constants.h index edac51c437f3..d940741a51f9 100644 --- a/src/ray/common/constants.h +++ b/src/ray/common/constants.h @@ -76,7 +76,11 @@ constexpr char kGcsAutoscalerClusterConfigKey[] = "__autoscaler_cluster_config"; /// Name for cloud instance id env constexpr char kNodeCloudInstanceIdEnv[] = "RAY_CLOUD_INSTANCE_ID"; +/// ENV keys for Ray node labels constexpr char kNodeTypeNameEnv[] = "RAY_NODE_TYPE_NAME"; +constexpr char kNodeMarketTypeEnv[] = "RAY_NODE_MARKET_TYPE"; +constexpr char kNodeRegionEnv[] = "RAY_NODE_REGION"; +constexpr char kNodeZoneEnv[] = "RAY_NODE_ZONE"; constexpr char kNodeCloudInstanceTypeNameEnv[] = "RAY_CLOUD_INSTANCE_TYPE_NAME"; @@ -96,9 +100,28 @@ constexpr char kLibraryPathEnvName[] = "PATH"; constexpr char kLibraryPathEnvName[] = "LD_LIBRARY_PATH"; #endif +/// Default node label keys populated by the Raylet #define RAY_LABEL_KEY_PREFIX "ray.io/" -/// Default node label key: node_id -constexpr char kLabelKeyNodeID[] = RAY_LABEL_KEY_PREFIX "node_id"; + +// The unique ID assigned to this node by the Raylet. 
+constexpr char kLabelKeyNodeID[] = RAY_LABEL_KEY_PREFIX "node-id";
+
+// The accelerator type associated with the Ray node (e.g., "A100").
+constexpr char kLabelKeyNodeAcceleratorType[] = RAY_LABEL_KEY_PREFIX "accelerator-type";
+
+// The market type of the cloud instance this Ray node runs on (e.g., "on-demand" or
+// "spot").
+constexpr char kLabelKeyNodeMarketType[] = RAY_LABEL_KEY_PREFIX "market-type";
+
+// The region of the cloud instance this Ray node runs on (e.g., "us-central2").
+constexpr char kLabelKeyNodeRegion[] = RAY_LABEL_KEY_PREFIX "availability-region";
+
+// The zone of the cloud instance this Ray node runs on (e.g., "us-central2-b").
+constexpr char kLabelKeyNodeZone[] = RAY_LABEL_KEY_PREFIX "availability-zone";
+
+// The name of the head or worker group this Ray node is a part of.
+constexpr char kLabelKeyNodeGroup[] = RAY_LABEL_KEY_PREFIX "node-group";
+
 #undef RAY_LABEL_KEY_PREFIX

 /// All nodes implicitly have resources with this prefix and the quantity is 1.
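Reviewer note: the sketch below is an illustrative, minimal usage example of the new spec, derived only from code added in this diff (ResourceAndLabelSpec in python/ray/_private/resource_and_label_spec.py, the env var names in src/ray/common/constants.h, and the behavior exercised by the new unit tests). It is not part of the change itself, and the module is internal, so treat the exact calls as assumptions rather than public API.

import os

from ray._private.resource_and_label_spec import ResourceAndLabelSpec

# Env vars mapped to default ray.io/* labels by this diff
# (kNodeTypeNameEnv, kNodeMarketTypeEnv, kNodeRegionEnv, kNodeZoneEnv).
os.environ["RAY_NODE_TYPE_NAME"] = "worker-group-1"   # -> ray.io/node-group
os.environ["RAY_NODE_MARKET_TYPE"] = "spot"           # -> ray.io/market-type
os.environ["RAY_NODE_REGION"] = "us-central2"         # -> ray.io/availability-region
os.environ["RAY_NODE_ZONE"] = "us-central2-b"         # -> ray.io/availability-zone

# Fields left as None are filled in from the local machine during resolve();
# Node.get_resource_and_label_spec() caches the resolved instance.
spec = ResourceAndLabelSpec(
    num_cpus=8,                    # explicit params win over auto-detection
    labels={"gpu_type": "A100"},   # user labels from RayParams / `ray start --labels`
).resolve(is_head=True, node_ip_address="127.0.0.1")

assert spec.resolved()
static_resources = spec.to_resource_dict()  # e.g. {"CPU": 8, "memory": ..., "node:127.0.0.1": 1.0, ...}
node_labels = spec.labels                   # ray.io/* defaults merged with user labels

With this refactor, the raylet start path in services.py reads both static_resources and labels from the single resolved spec, which is why the separate labels argument to start_raylet is removed.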