diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 9e2b5e580c17..e9226a437f3c 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -103,6 +103,8 @@ from ray.includes.common cimport ( CLabelMatchExpression, CLabelIn, CLabelNotIn, + CLabelSelector, + CNodeResources, CRayFunction, CWorkerType, CJobConfig, @@ -597,6 +599,36 @@ cdef int prepare_label_selector( return 0 +cdef extern from * nogil: + """ + #include "ray/common/scheduling/cluster_resource_data.h" + + // Helper function to convert a std::unordered_map to the absl::flat_hash_map used by NodeResources. + void SetNodeResourcesLabels(ray::NodeResources& resources, const std::unordered_map<std::string, std::string>& labels) { + for (const auto& pair : labels) { + resources.labels[pair.first] = pair.second; + } + } + """ + void SetNodeResourcesLabels(CNodeResources& resources, const unordered_map[c_string, c_string]& labels) + +def node_labels_match_selector(node_labels: Dict[str, str], selector: Dict[str, str]) -> bool: + """ + Checks if the given node labels satisfy the label selector. This helper function exposes + the C++ logic for determining if a node satisfies a label selector to the Python layer. + """ + cdef: + CNodeResources c_node_resources + CLabelSelector c_label_selector + unordered_map[c_string, c_string] c_labels_map + + prepare_labels(node_labels, &c_labels_map) + SetNodeResourcesLabels(c_node_resources, c_labels_map) + prepare_label_selector(selector, &c_label_selector) + + # Return whether the node resources satisfy the label constraint. + return c_node_resources.HasRequiredLabels(c_label_selector) + cdef int prepare_fallback_strategy( list fallback_strategy, c_vector[CFallbackOption] *fallback_strategy_vector) except -1: diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 75278ab76d80..7c863d900344 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -275,6 +275,12 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil: CLineageReconstructionTask() const c_string &SerializeAsString() const +cdef extern from "ray/common/scheduling/cluster_resource_data.h" namespace "ray" nogil: + cdef cppclass CNodeResources "ray::NodeResources": + CNodeResources() + unordered_map[c_string, c_string] labels + c_bool HasRequiredLabels(const CLabelSelector &label_selector) const + cdef extern from "ray/common/scheduling/label_selector.h" namespace "ray": cdef cppclass CLabelSelector "ray::LabelSelector": CLabelSelector() nogil except + diff --git a/python/ray/serve/_private/application_state.py b/python/ray/serve/_private/application_state.py index f3912f6599ad..ca1e443d56d2 100644 --- a/python/ray/serve/_private/application_state.py +++ b/python/ray/serve/_private/application_state.py @@ -1707,6 +1707,14 @@ def override_deployment_info( override_max_replicas_per_node = options.pop( "max_replicas_per_node", replica_config.max_replicas_per_node ) + override_bundle_label_selector = options.pop( + "placement_group_bundle_label_selector", + replica_config.placement_group_bundle_label_selector, + ) + override_fallback_strategy = options.pop( + "placement_group_fallback_strategy", + replica_config.placement_group_fallback_strategy, + ) # Record telemetry for container runtime env feature at deployment level if override_actor_options.get("runtime_env") and ( @@ -1725,6 +1733,8 @@ def override_deployment_info( placement_group_bundles=override_placement_group_bundles, placement_group_strategy=override_placement_group_strategy, max_replicas_per_node=override_max_replicas_per_node, +
placement_group_bundle_label_selector=override_bundle_label_selector, + placement_group_fallback_strategy=override_fallback_strategy, ) override_options["replica_config"] = replica_config diff --git a/python/ray/serve/_private/cluster_node_info_cache.py b/python/ray/serve/_private/cluster_node_info_cache.py index ae0121dd309d..ff6166190de7 100644 --- a/python/ray/serve/_private/cluster_node_info_cache.py +++ b/python/ray/serve/_private/cluster_node_info_cache.py @@ -89,6 +89,10 @@ def get_available_resources_per_node(self) -> Dict[str, Union[float, Dict]]: return self._cached_available_resources_per_node + def get_node_labels(self, node_id: str) -> Dict[str, str]: + """Get the labels for a specific node from the cache.""" + return self._cached_node_labels.get(node_id, {}) + class DefaultClusterNodeInfoCache(ClusterNodeInfoCache): def __init__(self, gcs_client: GcsClient): diff --git a/python/ray/serve/_private/common.py b/python/ray/serve/_private/common.py index 251a2f473ab9..4490d16c4552 100644 --- a/python/ray/serve/_private/common.py +++ b/python/ray/serve/_private/common.py @@ -817,6 +817,8 @@ class CreatePlacementGroupRequest: target_node_id: str name: str runtime_env: Optional[str] = None + bundle_label_selector: Optional[List[Dict[str, str]]] = None + fallback_strategy: Optional[List[Dict[str, Any]]] = None # This error is used to raise when a by-value DeploymentResponse is converted to an diff --git a/python/ray/serve/_private/config.py b/python/ray/serve/_private/config.py index 723d269a3012..623f74721dc2 100644 --- a/python/ray/serve/_private/config.py +++ b/python/ray/serve/_private/config.py @@ -480,6 +480,8 @@ def __init__( ray_actor_options: Dict, placement_group_bundles: Optional[List[Dict[str, float]]] = None, placement_group_strategy: Optional[str] = None, + placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None, + placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None, max_replicas_per_node: Optional[int] = None, needs_pickle: bool = True, ): @@ -505,9 +507,14 @@ def __init__( self.placement_group_bundles = placement_group_bundles self.placement_group_strategy = placement_group_strategy + self.placement_group_bundle_label_selector = ( + placement_group_bundle_label_selector + ) + self.placement_group_fallback_strategy = placement_group_fallback_strategy self.max_replicas_per_node = max_replicas_per_node + self._normalize_bundle_label_selector() self._validate() # Create resource_dict. This contains info about the replica's resource @@ -516,6 +523,21 @@ def __init__( self.resource_dict = resources_from_ray_options(self.ray_actor_options) self.needs_pickle = needs_pickle + def _normalize_bundle_label_selector(self): + """If a single selector is provided for multiple bundles, it is broadcasted + uniformly to all bundles. 
+ """ + if ( + self.placement_group_bundles + and self.placement_group_bundle_label_selector + and len(self.placement_group_bundle_label_selector) == 1 + and len(self.placement_group_bundles) > 1 + ): + self.placement_group_bundle_label_selector = ( + self.placement_group_bundle_label_selector + * len(self.placement_group_bundles) + ) + def _validate(self): self._validate_ray_actor_options() self._validate_placement_group_options() @@ -535,15 +557,22 @@ def update( ray_actor_options: dict, placement_group_bundles: Optional[List[Dict[str, float]]] = None, placement_group_strategy: Optional[str] = None, + placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None, + placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None, max_replicas_per_node: Optional[int] = None, ): self.ray_actor_options = ray_actor_options self.placement_group_bundles = placement_group_bundles self.placement_group_strategy = placement_group_strategy + self.placement_group_bundle_label_selector = ( + placement_group_bundle_label_selector + ) + self.placement_group_fallback_strategy = placement_group_fallback_strategy self.max_replicas_per_node = max_replicas_per_node + self._normalize_bundle_label_selector() self._validate() self.resource_dict = resources_from_ray_options(self.ray_actor_options) @@ -557,6 +586,8 @@ def create( ray_actor_options: Optional[Dict] = None, placement_group_bundles: Optional[List[Dict[str, float]]] = None, placement_group_strategy: Optional[str] = None, + placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None, + placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None, max_replicas_per_node: Optional[int] = None, deployment_def_name: Optional[str] = None, ): @@ -597,17 +628,23 @@ def create( deployment_def_name = deployment_def.__name__ config = cls( - deployment_def_name, - pickle_dumps( + deployment_def_name=deployment_def_name, + serialized_deployment_def=pickle_dumps( deployment_def, f"Could not serialize the deployment {repr(deployment_def)}", ), - pickle_dumps(init_args, "Could not serialize the deployment init args"), - pickle_dumps(init_kwargs, "Could not serialize the deployment init kwargs"), - ray_actor_options, - placement_group_bundles, - placement_group_strategy, - max_replicas_per_node, + serialized_init_args=pickle_dumps( + init_args, "Could not serialize the deployment init args" + ), + serialized_init_kwargs=pickle_dumps( + init_kwargs, "Could not serialize the deployment init kwargs" + ), + ray_actor_options=ray_actor_options, + placement_group_bundles=placement_group_bundles, + placement_group_strategy=placement_group_strategy, + placement_group_bundle_label_selector=placement_group_bundle_label_selector, + placement_group_fallback_strategy=placement_group_fallback_strategy, + max_replicas_per_node=max_replicas_per_node, ) config._deployment_def = deployment_def @@ -633,6 +670,8 @@ def _validate_ray_actor_options(self): "resources", # Other options "runtime_env", + "label_selector", + "fallback_strategy", } for option in self.ray_actor_options: @@ -674,11 +713,37 @@ def _validate_placement_group_options(self) -> None: "`placement_group_bundles` must also be provided." ) + if self.placement_group_fallback_strategy is not None: + if self.placement_group_bundles is None: + raise ValueError( + "If `placement_group_fallback_strategy` is provided, " + "`placement_group_bundles` must also be provided." 
+ ) + if not isinstance(self.placement_group_fallback_strategy, list): + raise TypeError( + "placement_group_fallback_strategy must be a list of dictionaries. " + f"Got: {type(self.placement_group_fallback_strategy)}." + ) + for i, strategy in enumerate(self.placement_group_fallback_strategy): + if not isinstance(strategy, dict): + raise TypeError( + f"placement_group_fallback_strategy entry at index {i} must be a dictionary. " + f"Got: {type(strategy)}." + ) + + if self.placement_group_bundle_label_selector is not None: + if self.placement_group_bundles is None: + raise ValueError( + "If `placement_group_bundle_label_selector` is provided, " + "`placement_group_bundles` must also be provided." + ) + if self.placement_group_bundles is not None: validate_placement_group( bundles=self.placement_group_bundles, strategy=self.placement_group_strategy or "PACK", lifetime="detached", + bundle_label_selector=self.placement_group_bundle_label_selector, ) resource_error_prefix = ( @@ -772,19 +837,37 @@ def init_kwargs(self) -> Optional[Tuple[Any]]: @classmethod def from_proto(cls, proto: ReplicaConfigProto, needs_pickle: bool = True): return ReplicaConfig( - proto.deployment_def_name, - proto.deployment_def, - proto.init_args if proto.init_args != b"" else None, - proto.init_kwargs if proto.init_kwargs != b"" else None, - json.loads(proto.ray_actor_options), - json.loads(proto.placement_group_bundles) - if proto.placement_group_bundles - else None, - proto.placement_group_strategy - if proto.placement_group_strategy != "" - else None, - proto.max_replicas_per_node if proto.max_replicas_per_node else None, - needs_pickle, + deployment_def_name=proto.deployment_def_name, + serialized_deployment_def=proto.deployment_def, + serialized_init_args=(proto.init_args if proto.init_args != b"" else None), + serialized_init_kwargs=( + proto.init_kwargs if proto.init_kwargs != b"" else None + ), + ray_actor_options=json.loads(proto.ray_actor_options), + placement_group_bundles=( + json.loads(proto.placement_group_bundles) + if proto.placement_group_bundles + else None + ), + placement_group_strategy=( + proto.placement_group_strategy + if proto.placement_group_strategy != "" + else None + ), + placement_group_bundle_label_selector=( + json.loads(proto.placement_group_bundle_label_selector) + if proto.placement_group_bundle_label_selector + else None + ), + placement_group_fallback_strategy=( + json.loads(proto.placement_group_fallback_strategy) + if proto.placement_group_fallback_strategy + else None + ), + max_replicas_per_node=( + proto.max_replicas_per_node if proto.max_replicas_per_node else None + ), + needs_pickle=needs_pickle, ) @classmethod @@ -793,19 +876,39 @@ def from_proto_bytes(cls, proto_bytes: bytes, needs_pickle: bool = True): return cls.from_proto(proto, needs_pickle) def to_proto(self): + placement_group_bundles = ( + json.dumps(self.placement_group_bundles) + if self.placement_group_bundles is not None + else "" + ) + + bundle_label_selector = ( + json.dumps(self.placement_group_bundle_label_selector) + if self.placement_group_bundle_label_selector is not None + else "" + ) + + fallback_strategy = ( + json.dumps(self.placement_group_fallback_strategy) + if self.placement_group_fallback_strategy is not None + else "" + ) + + max_replicas_per_node = ( + self.max_replicas_per_node if self.max_replicas_per_node is not None else 0 + ) + return ReplicaConfigProto( deployment_def_name=self.deployment_def_name, deployment_def=self.serialized_deployment_def, init_args=self.serialized_init_args, 
init_kwargs=self.serialized_init_kwargs, ray_actor_options=json.dumps(self.ray_actor_options), - placement_group_bundles=json.dumps(self.placement_group_bundles) - if self.placement_group_bundles is not None - else "", + placement_group_bundles=placement_group_bundles, placement_group_strategy=self.placement_group_strategy, - max_replicas_per_node=self.max_replicas_per_node - if self.max_replicas_per_node is not None - else 0, + placement_group_bundle_label_selector=bundle_label_selector, + placement_group_fallback_strategy=fallback_strategy, + max_replicas_per_node=max_replicas_per_node, ) def to_proto_bytes(self): @@ -818,6 +921,8 @@ def to_dict(self): "ray_actor_options": self.ray_actor_options, "placement_group_bundles": self.placement_group_bundles, "placement_group_strategy": self.placement_group_strategy, + "placement_group_bundle_label_selector": self.placement_group_bundle_label_selector, + "placement_group_fallback_strategy": self.placement_group_fallback_strategy, "max_replicas_per_node": self.max_replicas_per_node, } diff --git a/python/ray/serve/_private/default_impl.py b/python/ray/serve/_private/default_impl.py index f38a6f70f20a..526b0dc0a3e5 100644 --- a/python/ray/serve/_private/default_impl.py +++ b/python/ray/serve/_private/default_impl.py @@ -66,6 +66,7 @@ def _default_create_placement_group( _soft_target_node_id=request.target_node_id, name=request.name, lifetime="detached", + bundle_label_selector=request.bundle_label_selector, ) diff --git a/python/ray/serve/_private/deployment_scheduler.py b/python/ray/serve/_private/deployment_scheduler.py index 53526d4f38a4..cc72afb36f95 100644 --- a/python/ray/serve/_private/deployment_scheduler.py +++ b/python/ray/serve/_private/deployment_scheduler.py @@ -10,6 +10,7 @@ from typing import Any, Callable, Dict, List, Optional, Set, Tuple import ray +from ray._raylet import node_labels_match_selector from ray.serve._private.cluster_node_info_cache import ClusterNodeInfoCache from ray.serve._private.common import ( CreatePlacementGroupRequest, @@ -155,6 +156,8 @@ class ReplicaSchedulingRequest: # These are optional: by default replicas do not have a placement group. placement_group_bundles: Optional[List[Dict[str, float]]] = None placement_group_strategy: Optional[str] = None + placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None + placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None max_replicas_per_node: Optional[int] = None @property @@ -572,6 +575,7 @@ def _schedule_replica( strategy=placement_group_strategy, target_node_id=target_node_id, name=scheduling_request.actor_options["name"], + bundle_label_selector=scheduling_request.placement_group_bundle_label_selector, ) ) except Exception: @@ -661,16 +665,14 @@ def schedule( Returns: The IDs of replicas to stop for each deployment. """ + # Update pending replicas from upscales. for upscale in upscales.values(): for scheduling_request in upscale: replica_id = scheduling_request.replica_id deployment_id = replica_id.deployment_id self._pending_replicas[deployment_id][replica_id] = scheduling_request - non_strict_pack_pgs_exist = any( - d.is_non_strict_pack_pg() for d in self._deployments.values() - ) - # Schedule replicas using compact strategy. 
+ # Check for deprecated environment variable usage if RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY: warnings.warn( "The environment variable 'RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY' " @@ -679,39 +681,26 @@ def schedule( DeprecationWarning, stacklevel=2, ) - if RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY and not non_strict_pack_pgs_exist: - # Flatten dict of deployment replicas into all replicas, - # then sort by decreasing resource size - all_scheduling_requests = sorted( - _flatten(self._pending_replicas).values(), - key=lambda r: r.required_resources, - reverse=True, - ) - # Schedule each replica - for scheduling_request in all_scheduling_requests: - target_node = self._find_best_available_node( - scheduling_request.required_resources, - self._get_available_resources_per_node(), - ) - - self._schedule_replica( - scheduling_request, - default_scheduling_strategy="DEFAULT", - target_node_id=target_node, - ) + # Determine scheduling strategy + non_strict_pack_pgs_exist = any( + d.is_non_strict_pack_pg() for d in self._deployments.values() + ) + use_pack_strategy = ( + RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY and not non_strict_pack_pgs_exist + ) + if use_pack_strategy: + # This branch is only reached if each deployment either: + # 1. Use STRICT_PACK placement group strategy, or + # 2. Do not use placement groups at all. + # This ensures Serve's best-fit node selection is respected by Ray Core + # (since _soft_target_node_id only works with STRICT_PACK). + self._schedule_with_pack_strategy() else: - for pending_replicas in self._pending_replicas.values(): - if not pending_replicas: - continue - - for scheduling_request in list(pending_replicas.values()): - self._schedule_replica( - scheduling_request=scheduling_request, - default_scheduling_strategy="SPREAD", - ) + self._schedule_with_spread_strategy() + # Handle downscales deployment_to_replicas_to_stop = {} for downscale in downscales.values(): deployment_to_replicas_to_stop[ @@ -722,6 +711,118 @@ def schedule( return deployment_to_replicas_to_stop + def _schedule_with_pack_strategy(self): + """Tries to schedule pending replicas using PACK strategy.""" + # Flatten dict of deployment replicas into all replicas, + # then sort by decreasing resource size + all_scheduling_requests = sorted( + _flatten(self._pending_replicas).values(), + key=lambda r: r.required_resources, + reverse=True, + ) + + # Fetch node labels for active nodes. 
+ active_nodes = self._cluster_node_info_cache.get_active_node_ids() + all_node_labels = { + node_id: self._cluster_node_info_cache.get_node_labels(node_id) + for node_id in active_nodes + } + + for scheduling_request in all_scheduling_requests: + self._pack_schedule_replica(scheduling_request, all_node_labels) + + def _schedule_with_spread_strategy(self): + """Tries to schedule pending replicas using the SPREAD strategy.""" + for pending_replicas in self._pending_replicas.values(): + if not pending_replicas: + continue + + for scheduling_request in list(pending_replicas.values()): + self._schedule_replica( + scheduling_request=scheduling_request, + default_scheduling_strategy="SPREAD", + ) + + def _pack_schedule_replica( + self, + scheduling_request: ReplicaSchedulingRequest, + all_node_labels: Dict[str, Dict[str, str]], + ): + """Attempts to schedule a single request on the best available node.""" + + placement_candidates = self._build_pack_placement_candidates(scheduling_request) + + target_node = None + for required_resources, required_labels in placement_candidates: + target_node = self._find_best_fit_node_for_pack( + required_resources, + self._get_available_resources_per_node(), + required_labels_list=required_labels, + node_labels=all_node_labels, + ) + if target_node: + break + + self._schedule_replica( + scheduling_request, + default_scheduling_strategy="DEFAULT", + target_node_id=target_node, + ) + + def _build_pack_placement_candidates( + self, scheduling_request: ReplicaSchedulingRequest + ) -> List[Tuple[Resources, List[Dict[str, str]]]]: + """Returns a list of (resources, labels) tuples to attempt for scheduling.""" + + # Collect a list of required resources and labels to try to schedule to + # support replica compaction when fallback strategies are provided. + placement_candidates = [] + primary_labels = [] + primary_bundles = scheduling_request.placement_group_bundles + + if primary_bundles: + # PG: Use PG bundle_label_selector + if scheduling_request.placement_group_bundle_label_selector: + pg_strategy = scheduling_request.placement_group_strategy or None + if pg_strategy == "STRICT_PACK": + # All bundle_label_selectors must be satisfied on same node. + primary_labels = ( + scheduling_request.placement_group_bundle_label_selector + ) + else: + # TODO(ryanaoleary@): Support PACK strategy with bundle label selectors. + raise NotImplementedError( + "Placement Group strategy 'PACK' with bundle_label_selector " + "is not yet supported in the Serve scheduler." + ) + else: + # Actor: Use Actor label selector + if "label_selector" in scheduling_request.actor_options: + primary_labels = [ + scheduling_request.actor_options["label_selector"] or {} + ] + + # If PG is defined on scheduling request, then `required_resources` represents the sum across all bundles. + placement_candidates.append( + (scheduling_request.required_resources, primary_labels) + ) + + if scheduling_request.placement_group_fallback_strategy: + # TODO(ryanaoleary@): Add support for placement group fallback_strategy when it's added to options. + raise NotImplementedError( + "Placement Group fallback strategies are not yet supported in the Serve scheduler." + ) + + elif scheduling_request.actor_options.get("fallback_strategy"): + # Fallback strategy provided for Ray Actor. 
+ for fallback in scheduling_request.actor_options["fallback_strategy"]: + fallback_labels = [fallback.get("label_selector", {}) or {}] + placement_candidates.append( + (scheduling_request.required_resources, fallback_labels) + ) + + return placement_candidates + def _get_replicas_to_stop( self, deployment_id: DeploymentID, max_num_to_stop: int ) -> Set[ReplicaID]: @@ -789,10 +890,25 @@ def key(node_and_num_running_replicas_of_all_deployments): return replicas_to_stop - def _find_best_available_node( + def _filter_nodes_by_label_selector( + self, + available_nodes: Dict[str, Resources], + required_labels: Dict[str, str], + node_labels: Dict[str, Dict[str, str]], + ) -> Dict[str, Resources]: + """Filters available nodes based on label selector constraints.""" + return { + node_id: resources + for node_id, resources in available_nodes.items() + if node_labels_match_selector(node_labels.get(node_id, {}), required_labels) + } + + def _find_best_fit_node_for_pack( self, required_resources: Resources, available_resources_per_node: Dict[str, Resources], + required_labels_list: Optional[List[Dict[str, str]]] = None, + node_labels: Optional[Dict[str, Dict[str, str]]] = None, ) -> Optional[str]: """Chooses best available node to schedule the required resources. @@ -801,6 +917,15 @@ def _find_best_available_node( over idle nodes. """ + # Filter feasible nodes by provided label selectors if provided. + if required_labels_list and node_labels: + for required_labels in required_labels_list: + available_resources_per_node = self._filter_nodes_by_label_selector( + available_resources_per_node, required_labels, node_labels + ) + if not available_resources_per_node: + return None + node_to_running_replicas = self._get_node_to_running_replicas() non_idle_nodes = { diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index bf37f65d4d0e..73e96f0d4258 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -138,6 +138,12 @@ def create( placement_group_strategy=info.replica_config.placement_group_strategy, max_replicas_per_node=info.replica_config.max_replicas_per_node, route_prefix=info.route_prefix, + placement_group_bundle_label_selector=( + info.replica_config.placement_group_bundle_label_selector + ), + placement_group_fallback_strategy=( + info.replica_config.placement_group_fallback_strategy + ), ) return cls(info, target_num_replicas, version, deleting) @@ -155,24 +161,68 @@ def is_scaled_copy_of(self, other_target_state: "DeploymentTargetState") -> bool if other_target_state.info is None: return False + if self.info is None: + return False + + actor_options_match = ( + self.info.replica_config.ray_actor_options + == other_target_state.info.replica_config.ray_actor_options + ) + bundles_match = ( + self.info.replica_config.placement_group_bundles + == other_target_state.info.replica_config.placement_group_bundles + ) + strategy_match = ( + self.info.replica_config.placement_group_strategy + == other_target_state.info.replica_config.placement_group_strategy + ) + max_replicas_match = ( + self.info.replica_config.max_replicas_per_node + == other_target_state.info.replica_config.max_replicas_per_node + ) + deployment_config_match = self.info.deployment_config.dict( + exclude={"num_replicas"} + ) == other_target_state.info.deployment_config.dict(exclude={"num_replicas"}) + + # Backward compatibility check for older versions of Ray without these fields. 
+ current_bundle_label_selector = getattr( + self.info.replica_config, "placement_group_bundle_label_selector", None + ) + other_bundle_label_selector = getattr( + other_target_state.info.replica_config, + "placement_group_bundle_label_selector", + None, + ) + bundle_label_selector_match = ( + current_bundle_label_selector == other_bundle_label_selector + ) + + current_fallback = getattr( + self.info.replica_config, "placement_group_fallback_strategy", None + ) + other_fallback = getattr( + other_target_state.info.replica_config, + "placement_group_fallback_strategy", + None, + ) + fallback_match = current_fallback == other_fallback + + # TODO(zcin): version can be None, this is from an outdated codepath. + # We should remove outdated code, so version can never be None. + version_match = ( + self.version is not None and self.version == other_target_state.version + ) + return all( [ - self.info.replica_config.ray_actor_options - == other_target_state.info.replica_config.ray_actor_options, - self.info.replica_config.placement_group_bundles - == other_target_state.info.replica_config.placement_group_bundles, - self.info.replica_config.placement_group_strategy - == other_target_state.info.replica_config.placement_group_strategy, - self.info.replica_config.max_replicas_per_node - == other_target_state.info.replica_config.max_replicas_per_node, - self.info.deployment_config.dict(exclude={"num_replicas"}) - == other_target_state.info.deployment_config.dict( - exclude={"num_replicas"} - ), - # TODO(zcin): version can be None, this is from an outdated codepath. - # We should remove outdated code, so version can never be None. - self.version, - self.version == other_target_state.version, + actor_options_match, + bundles_match, + strategy_match, + bundle_label_selector_match, + fallback_match, + max_replicas_match, + deployment_config_match, + version_match, ] ) @@ -638,6 +688,12 @@ def start( placement_group_strategy=( deployment_info.replica_config.placement_group_strategy ), + placement_group_bundle_label_selector=( + deployment_info.replica_config.placement_group_bundle_label_selector + ), + placement_group_fallback_strategy=( + deployment_info.replica_config.placement_group_fallback_strategy + ), max_replicas_per_node=( deployment_info.replica_config.max_replicas_per_node ), diff --git a/python/ray/serve/_private/test_utils.py b/python/ray/serve/_private/test_utils.py index 623129481451..87d5c4c96b27 100644 --- a/python/ray/serve/_private/test_utils.py +++ b/python/ray/serve/_private/test_utils.py @@ -153,6 +153,9 @@ def add_node(self, node_id: str, resources: Dict = None, labels: Dict = None): def set_available_resources_per_node(self, node_id: str, resources: Dict): self.available_resources_per_node[node_id] = deepcopy(resources) + def get_node_labels(self, node_id: str): + return self.node_labels.get(node_id, {}) + class FakeRemoteFunction: def remote(self): diff --git a/python/ray/serve/_private/version.py b/python/ray/serve/_private/version.py index 1c064a9a9dc7..e9c2c080f80b 100644 --- a/python/ray/serve/_private/version.py +++ b/python/ray/serve/_private/version.py @@ -21,6 +21,8 @@ def __init__( ray_actor_options: Optional[Dict], placement_group_bundles: Optional[List[Dict[str, float]]] = None, placement_group_strategy: Optional[str] = None, + placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None, + placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None, max_replicas_per_node: Optional[int] = None, route_prefix: Optional[str] = None, ): @@ -37,6 
+39,10 @@ def __init__( self.ray_actor_options = ray_actor_options self.placement_group_bundles = placement_group_bundles self.placement_group_strategy = placement_group_strategy + self.placement_group_bundle_label_selector = ( + placement_group_bundle_label_selector + ) + self.placement_group_fallback_strategy = placement_group_fallback_strategy self.max_replicas_per_node = max_replicas_per_node self.route_prefix = route_prefix self.compute_hashes() @@ -96,6 +102,14 @@ def compute_hashes(self): combined_placement_group_options["bundles"] = self.placement_group_bundles if self.placement_group_strategy is not None: combined_placement_group_options["strategy"] = self.placement_group_strategy + if self.placement_group_bundle_label_selector is not None: + combined_placement_group_options[ + "bundle_label_selector" + ] = self.placement_group_bundle_label_selector + if self.placement_group_fallback_strategy is not None: + combined_placement_group_options[ + "fallback_strategy" + ] = self.placement_group_fallback_strategy serialized_placement_group_options = _serialize( combined_placement_group_options ) @@ -131,19 +145,43 @@ def compute_hashes(self): def to_proto(self) -> bytes: # TODO(simon): enable cross language user config + + placement_group_bundles = ( + json.dumps(self.placement_group_bundles) + if self.placement_group_bundles is not None + else "" + ) + + placement_group_bundle_label_selector = ( + json.dumps(self.placement_group_bundle_label_selector) + if self.placement_group_bundle_label_selector is not None + else "" + ) + + placement_group_fallback_strategy = ( + json.dumps(self.placement_group_fallback_strategy) + if self.placement_group_fallback_strategy is not None + else "" + ) + + placement_group_strategy = ( + self.placement_group_strategy + if self.placement_group_strategy is not None + else "" + ) + max_replicas_per_node = ( + self.max_replicas_per_node if self.max_replicas_per_node is not None else 0 + ) + return DeploymentVersionProto( code_version=self.code_version, deployment_config=self.deployment_config.to_proto(), ray_actor_options=json.dumps(self.ray_actor_options), - placement_group_bundles=json.dumps(self.placement_group_bundles) - if self.placement_group_bundles is not None - else "", - placement_group_strategy=self.placement_group_strategy - if self.placement_group_strategy is not None - else "", - max_replicas_per_node=self.max_replicas_per_node - if self.max_replicas_per_node is not None - else 0, + placement_group_bundles=placement_group_bundles, + placement_group_strategy=placement_group_strategy, + placement_group_bundle_label_selector=placement_group_bundle_label_selector, + placement_group_fallback_strategy=placement_group_fallback_strategy, + max_replicas_per_node=max_replicas_per_node, ) @classmethod @@ -157,8 +195,20 @@ def from_proto(cls, proto: DeploymentVersionProto): if proto.placement_group_bundles else None ), - placement_group_version=( - proto.placement_group_version if proto.placement_group_version else None + placement_group_bundle_label_selector=( + json.loads(proto.placement_group_bundle_label_selector) + if proto.placement_group_bundle_label_selector + else None + ), + placement_group_fallback_strategy=( + json.loads(proto.placement_group_fallback_strategy) + if proto.placement_group_fallback_strategy + else None + ), + placement_group_strategy=( + proto.placement_group_strategy + if proto.placement_group_strategy + else None ), max_replicas_per_node=( proto.max_replicas_per_node if proto.max_replicas_per_node else None diff --git 
a/python/ray/serve/api.py b/python/ray/serve/api.py index 9173011f785c..64e251dbda89 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -327,6 +327,9 @@ def deployment( ray_actor_options: Default[Dict] = DEFAULT.VALUE, placement_group_bundles: Default[List[Dict[str, float]]] = DEFAULT.VALUE, placement_group_strategy: Default[str] = DEFAULT.VALUE, + placement_group_bundle_label_selector: Default[ + List[Dict[str, str]] + ] = DEFAULT.VALUE, max_replicas_per_node: Default[int] = DEFAULT.VALUE, user_config: Default[Optional[Any]] = DEFAULT.VALUE, max_ongoing_requests: Default[int] = DEFAULT.VALUE, @@ -366,7 +369,7 @@ class MyDeployment: route_prefix: Route prefix for HTTP requests. Defaults to '/'. Deprecated. ray_actor_options: Options to pass to the Ray Actor decorator, such as resource requirements. Valid options are: `accelerator_type`, `memory`, - `num_cpus`, `num_gpus`, `resources`, and `runtime_env`. + `num_cpus`, `num_gpus`, `resources`, `runtime_env`, and `label_selector`. placement_group_bundles: Defines a set of placement group bundles to be scheduled *for each replica* of this deployment. The replica actor will be scheduled in the first bundle provided, so the resources specified in @@ -377,6 +380,9 @@ class MyDeployment: This cannot be set together with max_replicas_per_node. placement_group_strategy: Strategy to use for the replica placement group specified via `placement_group_bundles`. Defaults to `PACK`. + placement_group_bundle_label_selector: A list of label selectors to apply to the + placement group on a per-bundle level. If a single label selector is provided, + it is applied to all bundles. Otherwise, the length must match `placement_group_bundles`. max_replicas_per_node: The max number of replicas of this deployment that can run on a single node. Valid values are None (default, no limit) or an integer in the range of [1, 100]. @@ -495,6 +501,13 @@ def decorator(_func_or_class): if placement_group_strategy is not DEFAULT.VALUE else None ), + placement_group_bundle_label_selector=( + placement_group_bundle_label_selector + if placement_group_bundle_label_selector is not DEFAULT.VALUE + else None + ), + # TODO(ryanaoleary@): add placement_group_fallback_strategy when + # fallback_strategy support is added to placement group options. max_replicas_per_node=( max_replicas_per_node if max_replicas_per_node is not DEFAULT.VALUE diff --git a/python/ray/serve/deployment.py b/python/ray/serve/deployment.py index 31505c8ef70d..f3dc5db45073 100644 --- a/python/ray/serve/deployment.py +++ b/python/ray/serve/deployment.py @@ -215,6 +215,9 @@ def options( ray_actor_options: Default[Optional[Dict]] = DEFAULT.VALUE, placement_group_bundles: Default[List[Dict[str, float]]] = DEFAULT.VALUE, placement_group_strategy: Default[str] = DEFAULT.VALUE, + placement_group_bundle_label_selector: Default[ + List[Dict[str, str]] + ] = DEFAULT.VALUE, max_replicas_per_node: Default[int] = DEFAULT.VALUE, user_config: Default[Optional[Any]] = DEFAULT.VALUE, max_ongoing_requests: Default[int] = DEFAULT.VALUE, @@ -341,6 +344,17 @@ def options( if placement_group_strategy is DEFAULT.VALUE: placement_group_strategy = self._replica_config.placement_group_strategy + if placement_group_bundle_label_selector is DEFAULT.VALUE: + placement_group_bundle_label_selector = ( + self._replica_config.placement_group_bundle_label_selector + ) + + # TODO(ryanaoleary@): Add conditional check once fallback_strategy is + # added to placement group options. 
+ placement_group_fallback_strategy = ( + self._replica_config.placement_group_fallback_strategy + ) + if max_replicas_per_node is DEFAULT.VALUE: max_replicas_per_node = self._replica_config.max_replicas_per_node @@ -378,6 +392,8 @@ def options( ray_actor_options=ray_actor_options, placement_group_bundles=placement_group_bundles, placement_group_strategy=placement_group_strategy, + placement_group_bundle_label_selector=placement_group_bundle_label_selector, + placement_group_fallback_strategy=placement_group_fallback_strategy, max_replicas_per_node=max_replicas_per_node, ) diff --git a/python/ray/serve/schema.py b/python/ray/serve/schema.py index c5cd17defe50..46d8f997ec9a 100644 --- a/python/ray/serve/schema.py +++ b/python/ray/serve/schema.py @@ -254,6 +254,19 @@ class RayActorOptionsSchema(BaseModel): "See :ref:`accelerator types `." ), ) + label_selector: Dict[str, str] = Field( + default=None, + description=( + "If specified, requires that the actor run on a node with the specified labels." + ), + ) + fallback_strategy: List[Dict[str, Any]] = Field( + default=None, + description=( + "If specified, expresses soft constraints through a list of decorator " + "options to fall back on when scheduling on a node." + ), + ) @validator("runtime_env") def runtime_env_contains_remote_uris(cls, v): @@ -395,6 +408,17 @@ class DeploymentSchema(BaseModel, allow_population_by_field_name=True): ), ) + placement_group_bundle_label_selector: List[Dict[str, str]] = Field( + default=DEFAULT.VALUE, + description=( + "A list of label selectors to apply to the placement group " + "on a per-bundle level." + ), + ) + + # TODO(ryanaoleary@): Support placement_group_fallback_strategy here when + # support is added for that field to placement group options. + max_replicas_per_node: int = Field( default=DEFAULT.VALUE, description=( @@ -450,6 +474,32 @@ def validate_max_replicas_per_node_and_placement_group_bundles(cls, values): return values + @root_validator + def validate_bundle_label_selector(cls, values): + placement_group_bundles = values.get("placement_group_bundles", None) + bundle_label_selector = values.get( + "placement_group_bundle_label_selector", None + ) + + if bundle_label_selector not in [DEFAULT.VALUE, None]: + if placement_group_bundles in [DEFAULT.VALUE, None]: + raise ValueError( + "Setting bundle_label_selector is not allowed when " + "placement_group_bundles is not provided." + ) + + if len(bundle_label_selector) != 1 and len(bundle_label_selector) != len( + placement_group_bundles + ): + raise ValueError( + f"The `placement_group_bundle_label_selector` list must contain either " + f"a single selector (to apply to all bundles) or match the number of " + f"`placement_group_bundles`. Got {len(bundle_label_selector)} " + f"selectors for {len(placement_group_bundles)} bundles." + ) + + return values + @root_validator def validate_max_queued_requests(cls, values): max_queued_requests = values.get("max_queued_requests", None) diff --git a/python/ray/serve/tests/conftest.py b/python/ray/serve/tests/conftest.py index 6bf5243eabb8..32eef2bbebe0 100644 --- a/python/ray/serve/tests/conftest.py +++ b/python/ray/serve/tests/conftest.py @@ -345,3 +345,42 @@ def metrics_start_shutdown(request): serve.shutdown() ray.shutdown() reset_ray_address() + + +# Helper function to return the node ID of a remote worker. 
+@ray.remote(num_cpus=0) +def _get_node_id(): + return ray.get_runtime_context().get_node_id() + + +# Test fixture to start a Serve instance in a RayCluster with two labeled nodes +@pytest.fixture +def serve_instance_with_labeled_nodes(ray_cluster): + cluster = ray_cluster + + # Unlabeled default node. + cluster.add_node(num_cpus=3, resources={"worker0": 1}) + + # Node 1 - labeled A100 node in us-west. + cluster.add_node( + num_cpus=3, + resources={"worker1": 1}, + labels={"region": "us-west", "gpu-type": "A100"}, + ) + + # Node 2 - labeled H100 node in us-east. + cluster.add_node( + num_cpus=3, + resources={"worker2": 1}, + labels={"region": "us-east", "gpu-type": "H100"}, + ) + + cluster.wait_for_nodes() + ray.init(address=cluster.address) + + node_1_id = ray.get(_get_node_id.options(resources={"worker1": 1}).remote()) + node_2_id = ray.get(_get_node_id.options(resources={"worker2": 1}).remote()) + + serve.start() + + yield _get_global_client(), node_1_id, node_2_id, cluster diff --git a/python/ray/serve/tests/test_deployment_scheduler.py b/python/ray/serve/tests/test_deployment_scheduler.py index cb2a00520dad..acb567dadd5f 100644 --- a/python/ray/serve/tests/test_deployment_scheduler.py +++ b/python/ray/serve/tests/test_deployment_scheduler.py @@ -135,6 +135,47 @@ def on_scheduled(actor_handle, placement_group): scheduler.on_replica_stopping(r2_id) scheduler.on_deployment_deleted(dep_id) + @pytest.mark.asyncio + async def test_spread_serve_strict_spread_pg(self, ray_cluster): + """ + Verifies STRICT_SPREAD PG strategy runs successfully in the Spread Scheduler + and spreads bundles across distinct nodes. + """ + cluster = ray_cluster + cluster.add_node(num_cpus=3) + cluster.add_node(num_cpus=3) + cluster.wait_for_nodes() + ray.init(address=cluster.address) + serve.start() + + @ray.remote(num_cpus=0) + def get_task_node_id(): + return ray.get_runtime_context().get_node_id() + + @serve.deployment( + placement_group_bundles=[{"CPU": 1}, {"CPU": 1}], + placement_group_strategy="STRICT_SPREAD", + ) + class StrictSpread: + async def get_bundle_node_id(self, bundle_index: int): + pg = ray.util.get_current_placement_group() + return await get_task_node_id.options( + scheduling_strategy=ray.util.scheduling_strategies.PlacementGroupSchedulingStrategy( + placement_group=pg, + placement_group_bundle_index=bundle_index, + ) + ).remote() + + handle = serve.run(StrictSpread.bind(), name="strict_spread_app") + + node_0 = await handle.get_bundle_node_id.remote(0) + node_1 = await handle.get_bundle_node_id.remote(1) + + assert node_0 != node_1 + + serve.delete("strict_spread_app") + serve.shutdown() + @serve.deployment def A(): @@ -285,6 +326,402 @@ def test_e2e_custom_resources(self, ray_cluster, use_pg): serve.shutdown() + @pytest.mark.asyncio + async def test_e2e_serve_strict_pack_pg_label_selector( + self, serve_instance_with_labeled_nodes + ): + """ + Verifies STRICT_PACK strategy with placement_group_bundle_label_selector in Pack Scheduling Mode. + + Since the strategy is STRICT_PACK, both bundles must be scheduled on the same node, + and that node must satisfy the label constraints in each selector. 
+ """ + _, _, us_east_node_id, _ = serve_instance_with_labeled_nodes + + @ray.remote(num_cpus=0) + def get_task_node_id(): + return ray.get_runtime_context().get_node_id() + + @serve.deployment( + placement_group_bundles=[{"CPU": 1}, {"CPU": 1}], + placement_group_strategy="STRICT_PACK", + placement_group_bundle_label_selector=[ + {"gpu-type": "H100", "region": "us-east"} + ], + ) + class StrictPackSelector: + async def get_bundle_node_id(self, bundle_index: int): + pg = ray.util.get_current_placement_group() + return await get_task_node_id.options( + scheduling_strategy=ray.util.scheduling_strategies.PlacementGroupSchedulingStrategy( + placement_group=pg, + placement_group_bundle_index=bundle_index, + ) + ).remote() + + handle = serve.run(StrictPackSelector.bind(), name="strict_pack_app") + + # Both bundles are scheduled to the same node which matches the label constraints. + assert await handle.get_bundle_node_id.remote(0) == us_east_node_id + assert await handle.get_bundle_node_id.remote(1) == us_east_node_id + + serve.delete("strict_pack_app") + + @pytest.mark.asyncio + async def test_e2e_serve_pack_pg_forces_spread( + self, serve_instance_with_labeled_nodes + ): + """ + Verifies that using non-strict PACK PG strategy with label selectors works. + + STRICT_PACK throws NotImplementedError for selectors. However, 'PACK' is considered a + 'Non-Strict' strategy which forces the scheduler to fall back to 'Spread Mode'. + """ + _, _, us_east_node_id, _ = serve_instance_with_labeled_nodes + + @serve.deployment( + placement_group_bundles=[{"CPU": 1}], + placement_group_strategy="PACK", + placement_group_bundle_label_selector=[{"gpu-type": "H100"}], + ) + class PackSelector: + def get_node_id(self): + return ray.get_runtime_context().get_node_id() + + # If this stayed in the Pack Scheduler, it would raise NotImplementedError. + # Because it forces Spread Mode, it succeeds. + handle = serve.run(PackSelector.bind(), name="pack_selector_app") + assert await handle.get_node_id.remote() == us_east_node_id + serve.delete("pack_selector_app") + + @pytest.mark.asyncio + async def test_e2e_serve_multiple_bundles_selector( + self, serve_instance_with_labeled_nodes + ): + """Verifies multiple bundles with bundle_label_selector are applied correctly.""" + _, us_west_node_id, us_east_node_id, _ = serve_instance_with_labeled_nodes + + # Helper task to return the node ID it's running on + @ray.remote(num_cpus=0) + def get_task_node_id(): + return ray.get_runtime_context().get_node_id() + + @serve.deployment( + placement_group_bundles=[{"CPU": 1}, {"CPU": 1}], + placement_group_strategy="SPREAD", + placement_group_bundle_label_selector=[ + {"gpu-type": "H100"}, # matches us-east node + {"gpu-type": "A100"}, # matches us-west node + ], + ) + class MultiBundleSelector: + async def get_bundle_node_id(self, bundle_index: int): + pg = ray.util.get_current_placement_group() + return await get_task_node_id.options( + scheduling_strategy=ray.util.scheduling_strategies.PlacementGroupSchedulingStrategy( + placement_group=pg, + placement_group_bundle_index=bundle_index, + ) + ).remote() + + handle = serve.run(MultiBundleSelector.bind(), name="multi_bundle_app") + + # Verify bundles are scheduled to expected nodes based on label selectors. 
+ assert await handle.get_bundle_node_id.remote(0) == us_east_node_id + assert await handle.get_bundle_node_id.remote(1) == us_west_node_id + serve.delete("multi_bundle_app") + + @pytest.mark.asyncio + async def test_e2e_serve_multiple_bundles_single_bundle_label_selector( + self, serve_instance_with_labeled_nodes + ): + """ + Verifies that when only one bundle_label_selector is provided for multiple bundles, + the label_selector is applied to each bundle uniformly. + """ + _, _, us_east_node_id, _ = serve_instance_with_labeled_nodes + + @ray.remote(num_cpus=0) + def get_task_node_id(): + return ray.get_runtime_context().get_node_id() + + @serve.deployment( + placement_group_bundles=[{"CPU": 1}, {"CPU": 1}], + # Use SPREAD to verify the label constraint forces them to same node. + placement_group_strategy="SPREAD", + placement_group_bundle_label_selector=[ + {"gpu-type": "H100"}, + ], + ) + class MultiBundleSelector: + async def get_bundle_node_id(self, bundle_index: int): + pg = ray.util.get_current_placement_group() + return await get_task_node_id.options( + scheduling_strategy=ray.util.scheduling_strategies.PlacementGroupSchedulingStrategy( + placement_group=pg, + placement_group_bundle_index=bundle_index, + ) + ).remote() + + handle = serve.run(MultiBundleSelector.bind(), name="multi_bundle_app") + assert await handle.get_bundle_node_id.remote(0) == us_east_node_id + assert await handle.get_bundle_node_id.remote(1) == us_east_node_id + serve.delete("multi_bundle_app") + + @pytest.mark.asyncio + async def test_e2e_serve_actor_multiple_fallbacks( + self, serve_instance_with_labeled_nodes + ): + """ + Verifies that the scheduler can iterate through a label selector and multiple fallback options. + """ + _, us_west_node_id, _, _ = serve_instance_with_labeled_nodes + + @serve.deployment( + ray_actor_options={ + "label_selector": {"region": "invalid-label-1"}, + "fallback_strategy": [ + {"label_selector": {"region": "invalid-label-2"}}, + {"label_selector": {"region": "us-west"}}, # Should match + ], + } + ) + class MultiFallbackActor: + def get_node_id(self): + return ray.get_runtime_context().get_node_id() + + handle = serve.run(MultiFallbackActor.bind(), name="multi_fallback_app") + assert await handle.get_node_id.remote() == us_west_node_id + serve.delete("multi_fallback_app") + + +@pytest.mark.asyncio +async def test_e2e_serve_label_selector(serve_instance_with_labeled_nodes): + """ + Verifies that label selectors work correctly for both Actors and Placement Groups. + + This test also verifies that label selectors are respected when scheduling with a + preferred node ID for resource compaction. This test verifies both the Pack and + Spread scheduler paths. + """ + _, us_west_node_id, us_east_node_id, _ = serve_instance_with_labeled_nodes + + # Validate a Serve deployment utilizes a label_selector when passed to the Ray Actor options. + @serve.deployment(ray_actor_options={"label_selector": {"region": "us-west"}}) + class DeploymentActor: + def get_node_id(self): + return ray.get_runtime_context().get_node_id() + + handle = serve.run(DeploymentActor.bind(), name="actor_app") + assert await handle.get_node_id.remote() == us_west_node_id + serve.delete("actor_app") + + # Validate placement_group scheduling strategy with placement_group_bundle_label_selector + # and PACK strategy.
+ @serve.deployment( + placement_group_bundles=[{"CPU": 1}], + placement_group_strategy="PACK", + placement_group_bundle_label_selector=[{"gpu-type": "H100"}], + ) + class DeploymentPGPack: + def get_node_id(self): + return ray.get_runtime_context().get_node_id() + + handle_pack = serve.run(DeploymentPGPack.bind(), name="pg_pack_app") + assert await handle_pack.get_node_id.remote() == us_east_node_id + serve.delete("pg_pack_app") + + # Validate placement_group scheduling strategy with placement_group_bundle_label_selector + # and SPREAD strategy. + @serve.deployment( + placement_group_bundles=[{"CPU": 1}], + placement_group_strategy="SPREAD", + placement_group_bundle_label_selector=[{"gpu-type": "H100"}], + ) + class DeploymentPGSpread: + def get_node_id(self): + return ray.get_runtime_context().get_node_id() + + handle_spread = serve.run(DeploymentPGSpread.bind(), name="pg_spread_app") + assert await handle_spread.get_node_id.remote() == us_east_node_id + serve.delete("pg_spread_app") + + +@pytest.mark.asyncio +async def test_e2e_serve_fallback_strategy(serve_instance_with_labeled_nodes): + """ + Verifies that fallback strategies allow scheduling on alternative nodes when + primary constraints fail. + """ + _, _, h100_node_id, _ = serve_instance_with_labeled_nodes + + # Fallback strategy specified for Ray Actor in Serve deployment. + @serve.deployment( + ray_actor_options={ + "label_selector": {"region": "unavailable"}, + "fallback_strategy": [{"label_selector": {"gpu-type": "H100"}}], + } + ) + class FallbackDeployment: + def get_node_id(self): + return ray.get_runtime_context().get_node_id() + + # TODO (ryanaoleary@): Add a test for fallback_strategy in placement group options + # when support is added. + + handle = serve.run(FallbackDeployment.bind(), name="fallback_app") + assert await handle.get_node_id.remote() == h100_node_id + serve.delete("fallback_app") + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "use_pg,strategy", + [ + (False, None), # Actor-level label_selector. + (True, "PACK"), # PG bundle_label_selector with PACK strategy. + (True, "SPREAD"), # PG bundle_label_selector with SPREAD strategy. + ( + True, + "STRICT_SPREAD", + ), # PG bundle_label_selector with STRICT_SPREAD strategy. + ], +) +async def test_e2e_serve_label_selector_unschedulable( + serve_instance_with_labeled_nodes, use_pg, strategy +): + """ + Verifies the interaction between an unschedulable placement_group_bundle_label_selector + and different scheduling strategies in the Pack and Spread Serve scheduler. + """ + _, _, _, cluster = serve_instance_with_labeled_nodes + + @serve.deployment + def A(): + return ray.get_runtime_context().get_node_id() + + # Cluster in fixture only contains us-west and us-east. + target_label = {"region": "eu-central"} + + if use_pg: + app = A.options( + num_replicas=1, + placement_group_bundles=[{"CPU": 1}], + placement_group_strategy=strategy, + placement_group_bundle_label_selector=[target_label], + ).bind() + else: + app = A.options( + num_replicas=1, + ray_actor_options={"label_selector": target_label}, + ).bind() + + handle = serve._run(app, name="unschedulable_label_app", _blocking=False) + + def check_status(expected_status): + try: + status_info = serve.status().applications["unschedulable_label_app"] + return status_info.status == expected_status + except KeyError: + return False + + def verify_resource_request_stuck(): + """Verifies that the underlying resource request is pending.""" + # Serve deployment should be stuck DEPLOYING.
+ if not check_status("DEPLOYING"): + return False + + # Check PG/Actor is actually pending. + if use_pg: + pgs = ray.util.state.list_placement_groups() + return any(pg["state"] == "PENDING" for pg in pgs) + else: + actors = ray.util.state.list_actors() + return any(a["state"] == "PENDING_CREATION" for a in actors) + + # Serve deployment should remain stuck in deploying because Actor/PG can't be scheduled. + wait_for_condition(verify_resource_request_stuck, timeout=10) + assert not check_status("RUNNING"), ( + "Test setup failed: The deployment became RUNNING before the required " + "node was added. The label selector constraint was ignored." + ) + + # Add a suitable node to the cluster. + cluster.add_node(num_cpus=2, labels=target_label, resources={"target_node": 1}) + cluster.wait_for_nodes() + expected_node_id = ray.get( + get_node_id.options(resources={"target_node": 1}).remote() + ) + + # Validate deployment can now be scheduled since label selector is satisfied. + wait_for_condition(lambda: check_status("RUNNING"), timeout=10) + assert await handle.remote() == expected_node_id + + serve.delete("unschedulable_label_app") + + +@pytest.mark.asyncio +async def test_e2e_serve_fallback_strategy_unschedulable( + serve_instance_with_labeled_nodes, +): + """ + Verifies that an unschedulable fallback_strategy causes the Serve deployment to wait + until a suitable node is added to the cluster. + """ + _, _, _, cluster = serve_instance_with_labeled_nodes + + @serve.deployment + def A(): + return ray.get_runtime_context().get_node_id() + + fallback_label = {"region": "me-central2"} + + app = A.options( + num_replicas=1, + ray_actor_options={ + "label_selector": {"region": "non-existant"}, + "fallback_strategy": [{"label_selector": fallback_label}], + }, + ).bind() + + handle = serve._run(app, name="unschedulable_fallback_app", _blocking=False) + + def check_status(expected_status): + try: + status_info = serve.status().applications["unschedulable_fallback_app"] + return status_info.status == expected_status + except KeyError: + return False + + def verify_resource_request_stuck(): + """Verifies that the underlying resource request is pending.""" + # Serve deployment should be stuck DEPLOYING. + if not check_status("DEPLOYING"): + return False + + actors = ray.util.state.list_actors() + return any(a["state"] == "PENDING_CREATION" for a in actors) + + # Serve deployment should remain stuck in deploying because Actor/PG can't be scheduled. + wait_for_condition(verify_resource_request_stuck, timeout=10) + assert not check_status("RUNNING"), ( + "Test setup failed: The deployment became RUNNING before the required " + "node was added. The label selector constraint was ignored." + ) + + # Add a node that matches the fallback. + cluster.add_node(num_cpus=2, labels=fallback_label, resources={"fallback_node": 1}) + cluster.wait_for_nodes() + expected_node_id = ray.get( + get_node_id.options(resources={"fallback_node": 1}).remote() + ) + + # The serve deployment should recover and start running on the fallback node. 
+ wait_for_condition(lambda: check_status("RUNNING"), timeout=10) + assert await handle.remote() == expected_node_id + + serve.delete("unschedulable_fallback_app") + if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/serve/tests/test_deployment_version.py b/python/ray/serve/tests/test_deployment_version.py index b1202aee3134..b52170949b10 100644 --- a/python/ray/serve/tests/test_deployment_version.py +++ b/python/ray/serve/tests/test_deployment_version.py @@ -38,6 +38,48 @@ def test_route_prefix_changes_trigger_reconfigure_hash(): assert v1.requires_actor_reconfigure(v2) +def test_placement_group_options_trigger_restart(): + """Test that changing a PG label selector or fallback strategy triggers an actor restart.""" + cfg = DeploymentConfig() + + # Initial deployment. + v1 = DeploymentVersion( + code_version="1", + deployment_config=cfg, + ray_actor_options={}, + placement_group_bundles=[{"CPU": 1}], + placement_group_strategy="PACK", + ) + + # Change bundle_label_selector. + v2 = DeploymentVersion( + code_version="1", + deployment_config=cfg, + ray_actor_options={}, + placement_group_bundles=[{"CPU": 1}], + placement_group_strategy="PACK", + placement_group_bundle_label_selector=[{"region": "us-west"}], + ) + + # Validate actor restart occurs due to differing hash. + assert v1.placement_group_options_hash != v2.placement_group_options_hash + assert v1.requires_actor_restart(v2) + + # Change fallback_strategy. + v3 = DeploymentVersion( + code_version="1", + deployment_config=cfg, + ray_actor_options={}, + placement_group_bundles=[{"CPU": 1}], + placement_group_strategy="PACK", + placement_group_fallback_strategy=[{"bundles": [{"CPU": 1}]}], + ) + + # Validate actor restart occurs due to differing hash. + assert v1.placement_group_options_hash != v3.placement_group_options_hash + assert v1.requires_actor_restart(v3) + + if __name__ == "__main__": import sys diff --git a/python/ray/serve/tests/unit/test_application_state.py b/python/ray/serve/tests/unit/test_application_state.py index 67f8c57ab852..974d164858b8 100644 --- a/python/ray/serve/tests/unit/test_application_state.py +++ b/python/ray/serve/tests/unit/test_application_state.py @@ -1496,6 +1496,68 @@ def test_override_ray_actor_options_5(self): == "s3://B" ) + def test_override_bundle_label_selector(self, info): + """Test placement_group_bundle_label_selector is propagated from config.""" + config = ServeApplicationSchema( + name="default", + import_path="test.import.path", + deployments=[ + DeploymentSchema( + name="A", + placement_group_bundles=[{"CPU": 1}], + placement_group_bundle_label_selector=[ + {"accelerator-type": "A100"} + ], + ) + ], + ) + + updated_infos = override_deployment_info({"A": info}, config) + updated_info = updated_infos["A"] + + assert updated_info.replica_config.placement_group_bundle_label_selector == [ + {"accelerator-type": "A100"} + ] + + def test_override_fallback_strategy(self, info): + """Test placement_group_fallback_strategy is preserved when config is updated. + + placement_group_fallback_strategy is not yet part of the public DeploymentSchema, + so we cannot set it via the override config. Instead, we verify that an existing + value in the ReplicaConfig is preserved when other fields are updated via the config. 
+        """
+        initial_info = DeploymentInfo(
+            route_prefix="/",
+            version="123",
+            deployment_config=DeploymentConfig(num_replicas=1),
+            replica_config=ReplicaConfig.create(
+                lambda x: x,
+                placement_group_bundles=[{"CPU": 1}],
+                placement_group_fallback_strategy=[{"bundles": [{"CPU": 1}]}],
+            ),
+            start_time_ms=0,
+            deployer_job_id="",
+        )
+
+        config = ServeApplicationSchema(
+            name="default",
+            import_path="test.import.path",
+            deployments=[
+                DeploymentSchema(
+                    name="A",
+                    num_replicas=5,  # Update a different field.
+                )
+            ],
+        )
+
+        updated_infos = override_deployment_info({"A": initial_info}, config)
+        updated_info = updated_infos["A"]
+
+        assert updated_info.deployment_config.num_replicas == 5
+        assert updated_info.replica_config.placement_group_fallback_strategy == [
+            {"bundles": [{"CPU": 1}]}
+        ]
+
 
 class TestAutoscale:
     def test_autoscale(self, mocked_application_state_manager):
diff --git a/python/ray/serve/tests/unit/test_config.py b/python/ray/serve/tests/unit/test_config.py
index f348cf1d27ca..4c0ef478aebe 100644
--- a/python/ray/serve/tests/unit/test_config.py
+++ b/python/ray/serve/tests/unit/test_config.py
@@ -580,6 +580,117 @@ def f():
         assert config.init_args == tuple()
         assert config.init_kwargs == dict()
 
+    def test_placement_group_bundle_label_selector_validation(self):
+        class Class:
+            pass
+
+        # Label selector provided without bundles.
+        with pytest.raises(
+            ValueError,
+            match="If `placement_group_bundle_label_selector` is provided, `placement_group_bundles` must also be provided.",
+        ):
+            ReplicaConfig.create(
+                Class,
+                tuple(),
+                dict(),
+                placement_group_bundle_label_selector=[{"gpu": "T4"}],
+            )
+
+        # bundle_label_selector list does not match the length of the bundles list.
+        with pytest.raises(
+            ValueError,
+            match="The length of `bundle_label_selector` should equal the length of `bundles`",
+        ):
+            ReplicaConfig.create(
+                Class,
+                tuple(),
+                dict(),
+                placement_group_bundles=[{"CPU": 1}, {"CPU": 1}, {"CPU": 1}],
+                placement_group_bundle_label_selector=[{"gpu": "T4"}, {"gpu": "L4"}],
+            )
+
+        # Valid config - a single bundle_label_selector is applied to all bundles.
+        config = ReplicaConfig.create(
+            Class,
+            tuple(),
+            dict(),
+            placement_group_bundles=[{"CPU": 1}, {"CPU": 1}, {"CPU": 1}],
+            placement_group_bundle_label_selector=[{"gpu": "T4"}],
+        )
+        assert config.placement_group_bundle_label_selector == [
+            {"gpu": "T4"},
+            {"gpu": "T4"},
+            {"gpu": "T4"},
+        ]
+
+        # Valid config - multiple bundles and an equal number of bundle label selectors.
+        config = ReplicaConfig.create(
+            Class,
+            tuple(),
+            dict(),
+            placement_group_bundles=[{"CPU": 1}, {"CPU": 1}],
+            placement_group_bundle_label_selector=[{"gpu": "T4"}, {"gpu": "L4"}],
+        )
+        assert config.placement_group_bundle_label_selector == [
+            {"gpu": "T4"},
+            {"gpu": "L4"},
+        ]
+
+    def test_placement_group_fallback_strategy_validation(self):
+        class Class:
+            pass
+
+        # Validate that fallback strategy provided without bundles raises error.
+        with pytest.raises(
+            ValueError,
+            match="If `placement_group_fallback_strategy` is provided, `placement_group_bundles` must also be provided.",
+        ):
+            ReplicaConfig.create(
+                Class,
+                tuple(),
+                dict(),
+                placement_group_fallback_strategy=[{"bundles": [{"CPU": 1}]}],
+            )
+
+        # Validate that fallback strategy is a list.
+        with pytest.raises(
+            TypeError,
+            match="placement_group_fallback_strategy must be a list of dictionaries.",
+        ):
+            ReplicaConfig.create(
+                Class,
+                tuple(),
+                dict(),
+                placement_group_bundles=[{"CPU": 1}],
+                placement_group_fallback_strategy="not_a_list",
+            )
+
+        # Fallback strategy list contains non-dict items.
+        with pytest.raises(
+            TypeError,
+            match="placement_group_fallback_strategy entry at index 1 must be a dictionary.",
+        ):
+            ReplicaConfig.create(
+                Class,
+                tuple(),
+                dict(),
+                placement_group_bundles=[{"CPU": 1}],
+                placement_group_fallback_strategy=[
+                    {"bundles": [{"CPU": 1}]},
+                    "invalid_entry",
+                ],
+            )
+
+        # Valid config.
+        config = ReplicaConfig.create(
+            Class,
+            tuple(),
+            dict(),
+            placement_group_bundles=[{"CPU": 1}],
+            placement_group_fallback_strategy=[{"bundles": [{"CPU": 1}]}],
+        )
+        assert config.placement_group_fallback_strategy == [{"bundles": [{"CPU": 1}]}]
+
 
 class TestAutoscalingConfig:
     def test_target_ongoing_requests(self):
diff --git a/python/ray/serve/tests/unit/test_deployment_scheduler.py b/python/ray/serve/tests/unit/test_deployment_scheduler.py
index 7b502e24e44f..928bb7309c96 100644
--- a/python/ray/serve/tests/unit/test_deployment_scheduler.py
+++ b/python/ray/serve/tests/unit/test_deployment_scheduler.py
@@ -867,6 +867,233 @@ def test_downscale_single_deployment():
     scheduler.on_deployment_deleted(dep_id)
 
 
+def test_schedule_passes_placement_group_options():
+    """Test that bundle_label_selector is passed to CreatePlacementGroupRequest."""
+    cluster_node_info_cache = MockClusterNodeInfoCache()
+    captured_requests = []
+
+    def mock_create_pg(request):
+        captured_requests.append(request)
+
+        class MockPG:
+            def wait(self, *args):
+                return True
+
+        return MockPG()
+
+    scheduler = default_impl.create_deployment_scheduler(
+        cluster_node_info_cache,
+        head_node_id_override="fake-head-node-id",
+        create_placement_group_fn_override=mock_create_pg,
+    )
+
+    dep_id = DeploymentID(name="pg_options_test")
+    # Use Spread policy here, but the logic is shared across policies.
+    scheduler.on_deployment_created(dep_id, SpreadDeploymentSchedulingPolicy())
+
+    test_labels = [{"region": "us-west"}]
+    # Create a request with the new options.
+    req = ReplicaSchedulingRequest(
+        replica_id=ReplicaID("r1", dep_id),
+        actor_def=MockActorClass(),
+        actor_resources={"CPU": 1},
+        actor_options={"name": "r1"},
+        actor_init_args=(),
+        on_scheduled=lambda *args, **kwargs: None,
+        placement_group_bundles=[{"CPU": 1}],
+        placement_group_bundle_label_selector=test_labels,
+        placement_group_strategy="STRICT_PACK",
+    )
+
+    scheduler.schedule(upscales={dep_id: [req]}, downscales={})
+
+    # Verify the PlacementGroupSchedulingRequest is created.
+    assert len(captured_requests) == 1
+    pg_request = captured_requests[0]
+
+    # bundle_label_selector should be passed to request.
+    assert pg_request.bundle_label_selector == test_labels
+
+
+def test_filter_nodes_by_label_selector():
+    """Test _filter_nodes_by_label_selector logic used by _find_best_fit_node_for_pack
+    when bin-packing, such that label constraints are enforced for the preferred node."""
+
+    class MockScheduler(default_impl.DefaultDeploymentScheduler):
+        def __init__(self):
+            pass
+
+    scheduler = MockScheduler()
+
+    nodes = {
+        "n1": Resources(),
+        "n2": Resources(),
+        "n3": Resources(),
+    }
+    node_labels = {
+        "n1": {"region": "us-west", "gpu": "T4", "env": "prod"},
+        "n2": {"region": "us-east", "gpu": "A100", "env": "dev"},
+        "n3": {"region": "me-central", "env": "staging"},  # No GPU label
+    }
+
+    # equals operator
+    filtered = scheduler._filter_nodes_by_label_selector(
+        nodes, {"region": "us-west"}, node_labels
+    )
+    assert set(filtered.keys()) == {"n1"}
+
+    # not equals operator
+    filtered = scheduler._filter_nodes_by_label_selector(
+        nodes, {"region": "!us-west"}, node_labels
+    )
+    assert set(filtered.keys()) == {"n2", "n3"}
+
+    # in operator
+    filtered = scheduler._filter_nodes_by_label_selector(
+        nodes, {"region": "in(us-west, us-east)"}, node_labels
+    )
+    assert set(filtered.keys()) == {"n1", "n2"}
+
+    # !in operator
+    filtered = scheduler._filter_nodes_by_label_selector(
+        nodes, {"env": "!in(dev, staging)"}, node_labels
+    )
+    assert set(filtered.keys()) == {"n1"}
+
+    # Missing labels treated as not a match for equality.
+    filtered = scheduler._filter_nodes_by_label_selector(
+        nodes, {"gpu": "A100"}, node_labels
+    )
+    assert set(filtered.keys()) == {"n2"}
+
+    # Not equal should match nodes with missing labels.
+    filtered = scheduler._filter_nodes_by_label_selector(
+        nodes, {"gpu": "!T4"}, node_labels
+    )
+    assert set(filtered.keys()) == {"n2", "n3"}
+
+    # Validate we handle whitespace.
+    filtered = scheduler._filter_nodes_by_label_selector(
+        nodes, {"region": "in( us-west , us-east )"}, node_labels
+    )
+    assert set(filtered.keys()) == {"n1", "n2"}
+
+
+def test_build_pack_placement_candidates():
+    """Test strategy generation logic in DefaultDeploymentScheduler._build_pack_placement_candidates,
+    verifying that the scheduler correctly generates a list of (resources, labels) tuples to
+    attempt for scheduling."""
+
+    # Set up the scheduler with mocks.
+    cluster_node_info_cache = MockClusterNodeInfoCache()
+    scheduler = default_impl.create_deployment_scheduler(
+        cluster_node_info_cache,
+        head_node_id_override="head_node",
+        create_placement_group_fn_override=None,
+    )
+
+    # Basic Ray actor.
+    req_basic = ReplicaSchedulingRequest(
+        replica_id=ReplicaID("r1", DeploymentID(name="d1")),
+        actor_def=MockActorClass(),
+        actor_resources={"CPU": 1},
+        actor_options={},
+        actor_init_args=(),
+        on_scheduled=Mock(),
+    )
+    strategies = scheduler._build_pack_placement_candidates(req_basic)
+    assert len(strategies) == 1
+    assert strategies[0][0] == {"CPU": 1}
+    assert strategies[0][1] == []
+
+    # Actor with label_selector and fallback_strategy.
+    req_fallback = ReplicaSchedulingRequest(
+        replica_id=ReplicaID("r2", DeploymentID(name="d1")),
+        actor_def=MockActorClass(),
+        actor_resources={"CPU": 1},
+        actor_options={
+            "label_selector": {"region": "us-west"},
+            "fallback_strategy": [{"label_selector": {"region": "us-east"}}],
+        },
+        actor_init_args=(),
+        on_scheduled=Mock(),
+    )
+    strategies = scheduler._build_pack_placement_candidates(req_fallback)
+    assert len(strategies) == 2
+
+    assert strategies[0][0] == {"CPU": 1}
+    assert strategies[0][1] == [{"region": "us-west"}]
+    assert strategies[1][0] == {"CPU": 1}
+    assert strategies[1][1] == [{"region": "us-east"}]
+
+    # Scheduling replica with placement group PACK strategy and bundle_label_selector.
+    req_pack = ReplicaSchedulingRequest(
+        replica_id=ReplicaID("r4", DeploymentID(name="d1")),
+        actor_def=MockActorClass(),
+        actor_resources={"CPU": 0.1},
+        actor_options={},
+        actor_init_args=(),
+        on_scheduled=Mock(),
+        placement_group_bundles=[{"CPU": 5}],
+        placement_group_strategy="PACK",
+        placement_group_bundle_label_selector=[
+            {"accelerator-type": "H100"},
+            {"accelerator-type": "H100"},
+        ],
+    )
+
+    with pytest.raises(NotImplementedError):
+        scheduler._build_pack_placement_candidates(req_pack)
+
+    # Scheduling replica with placement group STRICT_PACK strategy and bundle_label_selector.
+    req_pg = ReplicaSchedulingRequest(
+        replica_id=ReplicaID("r3", DeploymentID(name="d1")),
+        actor_def=MockActorClass(),
+        actor_resources={},
+        actor_options={},
+        actor_init_args=(),
+        on_scheduled=Mock(),
+        placement_group_bundles=[{"CPU": 2}],
+        placement_group_strategy="STRICT_PACK",
+        placement_group_bundle_label_selector=[{"accelerator-type": "A100"}],
+    )
+    strategies = scheduler._build_pack_placement_candidates(req_pg)
+    assert len(strategies) == 1
+
+    assert strategies[0][0] == {"CPU": 2}
+    assert strategies[0][1] == [{"accelerator-type": "A100"}]
+
+
+def test_build_pack_placement_candidates_pg_fallback_error():
+    """
+    Test that providing placement_group_fallback_strategy raises NotImplementedError.
+    """
+    cluster_node_info_cache = MockClusterNodeInfoCache()
+    scheduler = default_impl.create_deployment_scheduler(
+        cluster_node_info_cache,
+        head_node_id_override="head_node",
+        create_placement_group_fn_override=None,
+    )
+
+    # Create a request with placement_group_fallback_strategy defined.
+    req = ReplicaSchedulingRequest(
+        replica_id=ReplicaID("r1", DeploymentID(name="d1")),
+        actor_def=MockActorClass(),
+        actor_resources={},
+        actor_options={},
+        actor_init_args=(),
+        on_scheduled=Mock(),
+        placement_group_bundles=[{"CPU": 1}],
+        placement_group_strategy="STRICT_PACK",
+        # Raises NotImplementedError since not added to placement group options yet.
+        placement_group_fallback_strategy=[{"label_selector": {"zone": "us-east-1a"}}],
+    )
+
+    # Verify the scheduler raises the expected error.
+    with pytest.raises(NotImplementedError, match="not yet supported"):
+        scheduler._build_pack_placement_candidates(req)
+
+
 @pytest.mark.skipif(
     not RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY, reason="Needs pack strategy."
 )
diff --git a/python/ray/serve/tests/unit/test_schema.py b/python/ray/serve/tests/unit/test_schema.py
index 2cbda98eac8e..d0f0ebd00d30 100644
--- a/python/ray/serve/tests/unit/test_schema.py
+++ b/python/ray/serve/tests/unit/test_schema.py
@@ -382,6 +382,45 @@ def test_num_replicas_nullable(self):
         }
         DeploymentSchema.parse_obj(deployment_options)
 
+    def test_validate_bundle_label_selector(self):
+        """Test validation for placement_group_bundle_label_selector."""
+
+        deployment_schema = self.get_minimal_deployment_schema()
+
+        # Validate bundle_label_selector provided without bundles raises.
+        deployment_schema["placement_group_bundle_label_selector"] = [{"a": "b"}]
+        with pytest.raises(
+            ValidationError,
+            match="Setting bundle_label_selector is not allowed when placement_group_bundles is not provided",
+        ):
+            DeploymentSchema.parse_obj(deployment_schema)
+
+        # Validate mismatched lengths for bundles and bundle_label_selector raises.
+        deployment_schema["placement_group_bundles"] = [{"CPU": 1}, {"CPU": 1}]
+        deployment_schema["placement_group_bundle_label_selector"] = [
+            {"a": "b"},
+            {"c": "d"},
+            {"e": "f"},
+        ]
+        with pytest.raises(
+            ValidationError,
+            match=r"list must contain either a single selector \(to apply to all bundles\) or match the number of `placement_group_bundles`",
+        ):
+            DeploymentSchema.parse_obj(deployment_schema)
+
+        # Valid config - 2 bundles and 2 placement_group_bundle_label_selector entries.
+        deployment_schema["placement_group_bundle_label_selector"] = [
+            {"a": "b"},
+            {"c": "d"},
+        ]
+        DeploymentSchema.parse_obj(deployment_schema)
+
+        # Valid config - single placement_group_bundle_label_selector.
+        deployment_schema["placement_group_bundle_label_selector"] = [
+            {"a": "b"},
+        ]
+        DeploymentSchema.parse_obj(deployment_schema)
+
 
 class TestServeApplicationSchema:
     def get_valid_serve_application_schema(self):
diff --git a/src/ray/protobuf/serve.proto b/src/ray/protobuf/serve.proto
index a0b6d5afb0bd..10a231457b49 100644
--- a/src/ray/protobuf/serve.proto
+++ b/src/ray/protobuf/serve.proto
@@ -244,6 +244,8 @@ message DeploymentVersion {
   string placement_group_bundles = 4;
   string placement_group_strategy = 5;
   int32 max_replicas_per_node = 6;
+  string placement_group_bundle_label_selector = 7;
+  string placement_group_fallback_strategy = 8;
 }
 
 message ReplicaConfig {
@@ -255,6 +257,8 @@
   string placement_group_bundles = 6;
   string placement_group_strategy = 7;
   int32 max_replicas_per_node = 8;
+  string placement_group_bundle_label_selector = 9;
+  string placement_group_fallback_strategy = 10;
 }
 
 enum TargetCapacityDirection {