Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions ray-operator/controllers/ray/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,9 @@ func BuildPod(ctx context.Context, podTemplateSpec corev1.PodTemplateSpec, rayNo
initLivenessAndReadinessProbe(&pod.Spec.Containers[utils.RayContainerIndex], rayNodeType, creatorCRDType)
}

// Add downward API environment variables for Ray's default node labels for Ray label-based scheduling.
addDefaultRayNodeLabels(&pod)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we guard this logic with a Ray version check?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code doesn't rely on any API change from Ray, it just sets some env vars and the actual node labels get set in Ray core using those vars, but I can add a version guard here (I guess for whatever version ray-project/ray#53360 is included in) if we don't want it setting any unused vars for users on older versions of Ray.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From offline discussion with @MengjinYan we were leaning towards not including a version guard, since users are not required to specify the Ray version they're using in the CR spec


return pod
}

Expand Down Expand Up @@ -681,6 +684,7 @@ func setContainerEnvVars(pod *corev1.Pod, rayNodeType rayv1.RayNodeType, fqdnRay
container.Env = append(container.Env, rayCloudInstanceID)

// RAY_NODE_TYPE_NAME is used by Ray Autoscaler V2 (alpha). See https://github.com/ray-project/kuberay/issues/1965 for more details.
// This value can be used to set the ray.io/node-group default Ray node label.
nodeGroupNameEnv := corev1.EnvVar{
Name: utils.RAY_NODE_TYPE_NAME,
ValueFrom: &corev1.EnvVarSource{
Expand Down Expand Up @@ -1034,3 +1038,157 @@ func isGPUResourceKey(key string) bool {
match, _ := regexp.MatchString(`nvidia\.com/mig-\d+g\.\d+gb$`, key)
return match
}

// containsEnvVar is a helper function to check if a container contains a specified EnvVar
func containsEnvVar(container corev1.Container, envVar string) bool {
for _, env := range container.Env {
if env.Name == envVar {
return true
}
}
return false
}

// addDefaultRayNodeLabels passes default Ray node labels to Ray runtime environment
func addDefaultRayNodeLabels(pod *corev1.Pod) {
rayContainer := &pod.Spec.Containers[utils.RayContainerIndex]
envVars := rayContainer.Env

if !containsEnvVar(*rayContainer, utils.RayNodeMarketType) {
// used to set the ray.io/market-type node label
envVars = append(envVars, corev1.EnvVar{
Name: utils.RayNodeMarketType,
Value: string(getPodMarketType(pod)),
})
}
if !containsEnvVar(*rayContainer, utils.RayNodeZone) {
envVars = append(envVars, getPodZoneEnvVar(pod))
}
if !containsEnvVar(*rayContainer, utils.RayNodeRegion) {
envVars = append(envVars, getPodRegionEnvVar(pod))
}
rayContainer.Env = envVars
}

// getPodZoneEnvVar is a helper function to determine the ray.io/availability-zone label value
// based on a Pod spec - checking labels, nodeSelectors, and then falling back to downward API.
func getPodZoneEnvVar(pod *corev1.Pod) corev1.EnvVar {
if podZone, ok := pod.Labels[utils.K8sTopologyZoneLabel]; ok && podZone != "" {
return corev1.EnvVar{
Name: utils.RayNodeZone,
Value: podZone,
}
} else if podZone, ok := pod.Spec.NodeSelector[utils.K8sTopologyZoneLabel]; ok && podZone != "" {
return corev1.EnvVar{
Name: utils.RayNodeZone,
Value: podZone,
}
}
// uses downward api to set the ray.io/availability-zone node label
// Ref: https://kubernetes.io/docs/reference/labels-annotations-taints/#topologykubernetesiozone
return corev1.EnvVar{
Name: utils.RayNodeZone,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: fmt.Sprintf("metadata.labels['%s']", utils.K8sTopologyZoneLabel),
},
},
}
}

// getPodRegionEnvVar is a helper function to determine the ray.io/availability-region label value
// based on a Pod spec - checking labels, nodeSelectors, and then falling back to downward API.
func getPodRegionEnvVar(pod *corev1.Pod) corev1.EnvVar {
if podRegion, ok := pod.Labels[utils.K8sTopologyRegionLabel]; ok && podRegion != "" {
return corev1.EnvVar{
Name: utils.RayNodeRegion,
Value: podRegion,
}
} else if podRegion, ok := pod.Spec.NodeSelector[utils.K8sTopologyRegionLabel]; ok && podRegion != "" {
return corev1.EnvVar{
Name: utils.RayNodeRegion,
Value: podRegion,
}
}
// uses downward api to set the ray.io/availability-region node label
// Ref: https://kubernetes.io/docs/reference/labels-annotations-taints/#topologykubernetesioregion
return corev1.EnvVar{
Name: utils.RayNodeRegion,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: fmt.Sprintf("metadata.labels['%s']", utils.K8sTopologyRegionLabel),
},
},
}
}

// getPodMarketTypeFromNodeSelector is a helper function to determine the ray.io/market-type
// label value based on a Kubernetes Pod spec - checking labels, nodeSelector, and nodeAffinity.
func getPodMarketType(pod *corev1.Pod) utils.PodMarketType {
marketType := getPodMarketTypeFromNodeSelector(pod.Spec.NodeSelector)

if marketType == utils.OnDemandMarketType && pod.Spec.Affinity != nil {
// check for NodeAffinity if nodeSelector specifying spot instance not found
marketType = getPodMarketTypeFromNodeAffinity(pod.Spec.Affinity.NodeAffinity)
}
return marketType
}

// getPodMarketTypeFromNodeSelector returns a ray.io/market-type label
// based on user-provided Kubernetes nodeSelector values.
func getPodMarketTypeFromNodeSelector(selector map[string]string) utils.PodMarketType {
if selector == nil {
return utils.OnDemandMarketType
}
// check for GKE spot instance selector
if val, ok := selector[utils.GKESpotLabel]; ok && val == "true" {
return utils.SpotMarketType
}
// check for EKS spot instance selector
if val, ok := selector[utils.EKSCapacityTypeLabel]; ok && val == "SPOT" {
return utils.SpotMarketType
}
return utils.OnDemandMarketType
}

// getPodMarketTypeFromNodeSelector returns a ray.io/market-type label
// based on user-provided Kubernetes nodeAffinity values.
func getPodMarketTypeFromNodeAffinity(nodeAffinity *corev1.NodeAffinity) utils.PodMarketType {
if nodeAffinity == nil {
return utils.OnDemandMarketType
}

// Only add the spot instance label if the Pod is guaranteed to be on a node of
// that type when scheduled.
requiredTerms := nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution
if requiredTerms == nil {
return utils.OnDemandMarketType
}

for _, term := range requiredTerms.NodeSelectorTerms {
for _, expr := range term.MatchExpressions {
switch expr.Key {
// GKE specific check
case utils.GKESpotLabel:
if expr.Operator == corev1.NodeSelectorOpIn {
for _, val := range expr.Values {
if val == "true" {
return utils.SpotMarketType
}
}
}
// Amazon EKS specific check
case utils.EKSCapacityTypeLabel:
if expr.Operator == corev1.NodeSelectorOpIn {
for _, val := range expr.Values {
if val == "SPOT" {
return utils.SpotMarketType
}
}
}
}
}
}
// Default to on-demand instance type
return utils.OnDemandMarketType
}
Loading
Loading