diff --git a/docs/reference/api.md b/docs/reference/api.md index 4b495fef69e..f692742d247 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -138,6 +138,8 @@ _Appears in:_ | `template` _[PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#podtemplatespec-v1-core)_ | Template is the exact pod template used in K8s deployments, statefulsets, etc. | | | | `headService` _[Service](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#service-v1-core)_ | HeadService is the Kubernetes service of the head pod. | | | | `enableIngress` _boolean_ | EnableIngress indicates whether operator should create ingress object for head service or not. | | | +| `resources` _object (keys:string, values:string)_ | Resources specifies the resource quantities for the head group.
These values override the resources passed to `rayStartParams` for the group, but
have no effect on the resources set at the K8s Pod container level. | | | +| `labels` _object (keys:string, values:string)_ | Labels specifies the Ray node labels for the head group.
These labels will also be added to the Pods of this head group and override the `--labels`
argument passed to `rayStartParams`. | | | | `rayStartParams` _object (keys:string, values:string)_ | RayStartParams are the params of the start command: node-manager-port, object-store-memory, ... | | | | `serviceType` _[ServiceType](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#servicetype-v1-core)_ | ServiceType is Kubernetes service type of the head service. it will be used by the workers to connect to the head pod | | | @@ -417,6 +419,8 @@ _Appears in:_ | `minReplicas` _integer_ | MinReplicas denotes the minimum number of desired Pods for this worker group. | 0 | | | `maxReplicas` _integer_ | MaxReplicas denotes the maximum number of desired Pods for this worker group, and the default value is maxInt32. | 2147483647 | | | `idleTimeoutSeconds` _integer_ | IdleTimeoutSeconds denotes the number of seconds to wait before the v2 autoscaler terminates an idle worker pod of this type.
This value is only used with the Ray Autoscaler enabled and defaults to the value set by the AutoscalingConfig if not specified for this worker group. | | | +| `resources` _object (keys:string, values:string)_ | Resources specifies the resource quantities for this worker group.
These values override the resources passed to `rayStartParams` for the group, but
have no effect on the resources set at the K8s Pod container level. | | | +| `labels` _object (keys:string, values:string)_ | Labels specifies the Ray node labels for this worker group.
These labels will also be added to the Pods of this worker group and override the `--labels`
argument passed to `rayStartParams`. | | | | `rayStartParams` _object (keys:string, values:string)_ | RayStartParams are the params of the start command: address, object-store-memory, ... | | | | `template` _[PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#podtemplatespec-v1-core)_ | Template is a pod template for the worker | | | | `scaleStrategy` _[ScaleStrategy](#scalestrategy)_ | ScaleStrategy defines which pods to remove | | | diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml index a8e1f1df8f1..29a303bbb35 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml @@ -631,10 +631,18 @@ spec: type: object type: object type: object + labels: + additionalProperties: + type: string + type: object rayStartParams: additionalProperties: type: string type: object + resources: + additionalProperties: + type: string + type: object serviceType: type: string template: @@ -4332,6 +4340,10 @@ spec: idleTimeoutSeconds: format: int32 type: integer + labels: + additionalProperties: + type: string + type: object maxReplicas: default: 2147483647 format: int32 @@ -4352,6 +4364,10 @@ spec: default: 0 format: int32 type: integer + resources: + additionalProperties: + type: string + type: object scaleStrategy: properties: workersToDelete: diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml index 8f8679ca607..1cc94efd59d 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml @@ -681,10 +681,18 @@ spec: type: object type: object type: object + labels: + additionalProperties: + type: string + type: object rayStartParams: additionalProperties: type: string type: object + resources: + additionalProperties: + type: string + type: object serviceType: type: string 
template: @@ -4382,6 +4390,10 @@ spec: idleTimeoutSeconds: format: int32 type: integer + labels: + additionalProperties: + type: string + type: object maxReplicas: default: 2147483647 format: int32 @@ -4402,6 +4414,10 @@ spec: default: 0 format: int32 type: integer + resources: + additionalProperties: + type: string + type: object scaleStrategy: properties: workersToDelete: diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml index a86457fac1a..e2d61172a3c 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml @@ -611,10 +611,18 @@ spec: type: object type: object type: object + labels: + additionalProperties: + type: string + type: object rayStartParams: additionalProperties: type: string type: object + resources: + additionalProperties: + type: string + type: object serviceType: type: string template: @@ -4312,6 +4320,10 @@ spec: idleTimeoutSeconds: format: int32 type: integer + labels: + additionalProperties: + type: string + type: object maxReplicas: default: 2147483647 format: int32 @@ -4332,6 +4344,10 @@ spec: default: 0 format: int32 type: integer + resources: + additionalProperties: + type: string + type: object scaleStrategy: properties: workersToDelete: diff --git a/ray-operator/apis/ray/v1/raycluster_types.go b/ray-operator/apis/ray/v1/raycluster_types.go index 4ac12c79725..6a6d40b8278 100644 --- a/ray-operator/apis/ray/v1/raycluster_types.go +++ b/ray-operator/apis/ray/v1/raycluster_types.go @@ -75,6 +75,16 @@ type HeadGroupSpec struct { // EnableIngress indicates whether operator should create ingress object for head service or not. // +optional EnableIngress *bool `json:"enableIngress,omitempty"` + // Resources specifies the resource quantities for the head group. 
+ // These values override the resources passed to `rayStartParams` for the group, but + // have no effect on the resources set at the K8s Pod container level. + // +optional + Resources map[string]string `json:"resources,omitempty"` + // Labels specifies the Ray node labels for the head group. + // These labels will also be added to the Pods of this head group and override the `--labels` + // argument passed to `rayStartParams`. + // +optional + Labels map[string]string `json:"labels,omitempty"` // RayStartParams are the params of the start command: node-manager-port, object-store-memory, ... // +optional RayStartParams map[string]string `json:"rayStartParams"` @@ -106,6 +116,16 @@ type WorkerGroupSpec struct { // This value is only used with the Ray Autoscaler enabled and defaults to the value set by the AutoscalingConfig if not specified for this worker group. // +optional IdleTimeoutSeconds *int32 `json:"idleTimeoutSeconds,omitempty"` + // Resources specifies the resource quantities for this worker group. + // These values override the resources passed to `rayStartParams` for the group, but + // have no effect on the resources set at the K8s Pod container level. + // +optional + Resources map[string]string `json:"resources,omitempty"` + // Labels specifies the Ray node labels for this worker group. + // These labels will also be added to the Pods of this worker group and override the `--labels` + // argument passed to `rayStartParams`. + // +optional + Labels map[string]string `json:"labels,omitempty"` // RayStartParams are the params of the start command: address, object-store-memory, ... 
// +optional RayStartParams map[string]string `json:"rayStartParams"` diff --git a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go index b4cb5decf12..b5f981b91a4 100644 --- a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go +++ b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go @@ -179,6 +179,20 @@ func (in *HeadGroupSpec) DeepCopyInto(out *HeadGroupSpec) { *out = new(bool) **out = **in } + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.Labels != nil { + in, out := &in.Labels, &out.Labels + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } if in.RayStartParams != nil { in, out := &in.RayStartParams, &out.RayStartParams *out = make(map[string]string, len(*in)) @@ -827,6 +841,20 @@ func (in *WorkerGroupSpec) DeepCopyInto(out *WorkerGroupSpec) { *out = new(int32) **out = **in } + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.Labels != nil { + in, out := &in.Labels, &out.Labels + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } if in.RayStartParams != nil { in, out := &in.RayStartParams, &out.RayStartParams *out = make(map[string]string, len(*in)) diff --git a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml index a8e1f1df8f1..29a303bbb35 100644 --- a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml @@ -631,10 +631,18 @@ spec: type: object type: object type: object + labels: + additionalProperties: + type: string + type: object rayStartParams: additionalProperties: type: string type: object + resources: + additionalProperties: + type: 
string + type: object serviceType: type: string template: @@ -4332,6 +4340,10 @@ spec: idleTimeoutSeconds: format: int32 type: integer + labels: + additionalProperties: + type: string + type: object maxReplicas: default: 2147483647 format: int32 @@ -4352,6 +4364,10 @@ spec: default: 0 format: int32 type: integer + resources: + additionalProperties: + type: string + type: object scaleStrategy: properties: workersToDelete: diff --git a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml index 8f8679ca607..1cc94efd59d 100644 --- a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml @@ -681,10 +681,18 @@ spec: type: object type: object type: object + labels: + additionalProperties: + type: string + type: object rayStartParams: additionalProperties: type: string type: object + resources: + additionalProperties: + type: string + type: object serviceType: type: string template: @@ -4382,6 +4390,10 @@ spec: idleTimeoutSeconds: format: int32 type: integer + labels: + additionalProperties: + type: string + type: object maxReplicas: default: 2147483647 format: int32 @@ -4402,6 +4414,10 @@ spec: default: 0 format: int32 type: integer + resources: + additionalProperties: + type: string + type: object scaleStrategy: properties: workersToDelete: diff --git a/ray-operator/config/crd/bases/ray.io_rayservices.yaml b/ray-operator/config/crd/bases/ray.io_rayservices.yaml index a86457fac1a..e2d61172a3c 100644 --- a/ray-operator/config/crd/bases/ray.io_rayservices.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayservices.yaml @@ -611,10 +611,18 @@ spec: type: object type: object type: object + labels: + additionalProperties: + type: string + type: object rayStartParams: additionalProperties: type: string type: object + resources: + additionalProperties: + type: string + type: object serviceType: type: string template: @@ -4312,6 +4320,10 @@ spec: idleTimeoutSeconds: format: int32 
type: integer + labels: + additionalProperties: + type: string + type: object maxReplicas: default: 2147483647 format: int32 @@ -4332,6 +4344,10 @@ spec: default: 0 format: int32 type: integer + resources: + additionalProperties: + type: string + type: object scaleStrategy: properties: workersToDelete: diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index 2b5aae59e3e..1eb086d2eb2 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -172,10 +172,16 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head // This ensures privilege of KubeRay users are contained within the namespace of the RayCluster. podTemplate.ObjectMeta.Namespace = instance.Namespace - if podTemplate.Labels == nil { - podTemplate.Labels = make(map[string]string) - } - podTemplate.Labels = labelPod(rayv1.HeadNode, instance.Name, utils.RayNodeHeadGroupLabelValue, instance.Spec.HeadGroupSpec.Template.ObjectMeta.Labels) + // Update rayStartParams with top-level Resources for head group. + updateRayStartParamsResources(ctx, headSpec.RayStartParams, headSpec.Resources) + + // Update `--labels` in rayStartParams with top-level Labels for head group. + updateRayStartParamsLabels(headSpec.RayStartParams, headSpec.Labels) + + // Merge K8s labels from the Pod template and the top-level `Labels` field. + mergedLabels := mergeLabels(headSpec.Template.ObjectMeta.Labels, headSpec.Labels) + podTemplate.Labels = labelPod(rayv1.HeadNode, instance.Name, utils.RayNodeHeadGroupLabelValue, mergedLabels) + headSpec.RayStartParams = setMissingRayStartParams(ctx, headSpec.RayStartParams, rayv1.HeadNode, headPort, "") initTemplateAnnotations(instance, &podTemplate) @@ -311,10 +317,17 @@ func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, wo // If the replica of workers is more than 1, `ObjectMeta.Name` may cause name conflict errors.
// Hence, we set `ObjectMeta.Name` to an empty string, and use GenerateName to prevent name conflicts. podTemplate.ObjectMeta.Name = "" - if podTemplate.Labels == nil { - podTemplate.Labels = make(map[string]string) - } - podTemplate.Labels = labelPod(rayv1.WorkerNode, instance.Name, workerSpec.GroupName, workerSpec.Template.ObjectMeta.Labels) + + // Update rayStartParams with top-level Resources for worker group. + updateRayStartParamsResources(ctx, workerSpec.RayStartParams, workerSpec.Resources) + + // Update `--labels` in rayStartParams with top-level Labels for worker group. + updateRayStartParamsLabels(workerSpec.RayStartParams, workerSpec.Labels) + + // Merge K8s labels from the Pod template and the top-level `Labels` field. + mergedLabels := mergeLabels(workerSpec.Template.ObjectMeta.Labels, workerSpec.Labels) + podTemplate.Labels = labelPod(rayv1.WorkerNode, instance.Name, workerSpec.GroupName, mergedLabels) + workerSpec.RayStartParams = setMissingRayStartParams(ctx, workerSpec.RayStartParams, rayv1.WorkerNode, headPort, fqdnRayIP) initTemplateAnnotations(instance, &podTemplate) @@ -1026,3 +1039,72 @@ func findMemoryReqOrLimit(container corev1.Container) (res *resource.Quantity) { } return nil } + +// updateRayStartParamsLabels reconciles `--labels` in rayStartParams based on group `Labels`. +func updateRayStartParamsLabels(rayStartParams map[string]string, groupLabels map[string]string) { + if len(groupLabels) == 0 { + return + } + var labels []string + // Sort label keys for deterministic output. + keys := make([]string, 0, len(groupLabels)) + for k := range groupLabels { + keys = append(keys, k) + } + sort.Strings(keys) + + for _, k := range keys { + labels = append(labels, fmt.Sprintf("%s=%s", k, groupLabels[k])) + } + rayStartParams["labels"] = strings.Join(labels, ",") +} + +// updateRayStartParamsResources reconciles rayStartParams based on the top-level `Resources` field.
+func updateRayStartParamsResources(ctx context.Context, rayStartParams map[string]string, groupResources map[string]string) { + log := ctrl.LoggerFrom(ctx) + + if len(groupResources) == 0 { + return + } + // Override relevant rayStartParams fields to ensure consistency. + rayResourcesJson := make(map[string]float64) + for name, quantity := range groupResources { + q, err := resource.ParseQuantity(quantity) + if err != nil { + log.Info("Skipping resource %s: failed to parse quantity '%s': %v", name, quantity, err) + continue + } + + if name == string(corev1.ResourceCPU) { + rayStartParams["num-cpus"] = strconv.FormatInt(q.Value(), 10) + } else if name == string(corev1.ResourceMemory) { + rayStartParams["memory"] = strconv.FormatInt(q.Value(), 10) + } else if utils.IsGPUResourceKey(name) { + rayStartParams["num-gpus"] = strconv.FormatInt(q.Value(), 10) + } else { + rayResourcesJson[name] = q.AsApproximateFloat64() + } + } + + if len(rayResourcesJson) > 0 { + jsonBytes, err := json.Marshal(rayResourcesJson) + if err != nil { + log.Error(err, "Failed to marshal Ray Resources JSON for rayStartParams.") + return + } + rayStartParams["resources"] = fmt.Sprintf("'%s'", string(jsonBytes)) + } +} + +// mergeLabels combines labels from a pod template and a group `labels` spec, +// with the top-level labels field taking precedence. 
+func mergeLabels(templateLabels map[string]string, groupLabels map[string]string) map[string]string { + merged := make(map[string]string) + for k, v := range templateLabels { + merged[k] = v + } + for k, v := range groupLabels { + merged[k] = v + } + return merged +} diff --git a/ray-operator/controllers/ray/common/pod_test.go b/ray-operator/controllers/ray/common/pod_test.go index 907b5326efc..8b26b4bee5f 100644 --- a/ray-operator/controllers/ray/common/pod_test.go +++ b/ray-operator/controllers/ray/common/pod_test.go @@ -1888,3 +1888,171 @@ func TestSetAutoscalerV2EnvVars(t *testing.T) { }) } } + +func TestMergeLabels(t *testing.T) { + tests := map[string]struct { + templateSpecLabels map[string]string + workerGroupLabels map[string]string + expectedLabels map[string]string + }{ + "Non-overlapping labels don't override.": { + templateSpecLabels: map[string]string{"pod-label-key": "pod-label-value"}, + workerGroupLabels: map[string]string{"ray/io:some-label": "ray-node-label-value"}, + expectedLabels: map[string]string{"pod-label-key": "pod-label-value", "ray/io:some-label": "ray-node-label-value"}, + }, + "Overlapping labels override with precedence for group `Labels`.": { + templateSpecLabels: map[string]string{"accelerator-type": "GPU", "market-type": "spot"}, + workerGroupLabels: map[string]string{"accelerator-type": "TPU-V6E"}, + expectedLabels: map[string]string{"accelerator-type": "TPU-V6E", "market-type": "spot"}, + }, + "Empty Pod Spec labels.": { + templateSpecLabels: map[string]string{}, + workerGroupLabels: map[string]string{"group-labels": "group-label-value"}, + expectedLabels: map[string]string{"group-labels": "group-label-value"}, + }, + "Empty `Labels` field for group.": { + templateSpecLabels: map[string]string{"pod-label": "pod-label-value"}, + workerGroupLabels: map[string]string{}, + expectedLabels: map[string]string{"pod-label": "pod-label-value"}, + }, + "Both `Labels` and labels in Pod spec nil.": { + templateSpecLabels: nil, + 
workerGroupLabels: nil, + expectedLabels: map[string]string{}, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + merged := mergeLabels(tc.templateSpecLabels, tc.workerGroupLabels) + assert.Equal(t, tc.expectedLabels, merged) + }) + } +} + +func TestUpdateRayStartParamsLabels(t *testing.T) { + tests := map[string]struct { + initialRayStartParams map[string]string + groupLabels map[string]string + expectedRayStartParams map[string]string + }{ + "Set labels in group `Labels` to `--labels` empty rayStartParams.": { + initialRayStartParams: map[string]string{}, + groupLabels: map[string]string{ + "topology.kubernetes.io/zone": "us-central2", + "ray.io/node-group": "worker-group-1", + "cloud.google.com/gke-spot": "true", + }, + // The output string is sorted alphabetically by key. + expectedRayStartParams: map[string]string{ + "labels": "cloud.google.com/gke-spot=true,ray.io/node-group=worker-group-1,topology.kubernetes.io/zone=us-central2", + }, + }, + " group `Labels overwrites an existing '--labels' parameter in rayStartParams": { + initialRayStartParams: map[string]string{ + "labels": "old=label,to-be=replaced", + "resources": "some-resources", // should be retained + }, + groupLabels: map[string]string{ + "new": "label", + }, + expectedRayStartParams: map[string]string{ + "labels": "new=label", + "resources": "some-resources", + }, + }, + "No-op when group `Labels` is nil": { + initialRayStartParams: map[string]string{"labels": "some=labels"}, + groupLabels: nil, + expectedRayStartParams: map[string]string{"labels": "some=labels"}, + }, + "No-op when group `Labels` is empty": { + initialRayStartParams: map[string]string{"labels": "some=labels"}, + groupLabels: map[string]string{}, + expectedRayStartParams: map[string]string{"labels": "some=labels"}, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + // copy of rayStartParams for test + rayStartParams := make(map[string]string) + for k, v := range 
tc.initialRayStartParams { + rayStartParams[k] = v + } + + updateRayStartParamsLabels(rayStartParams, tc.groupLabels) + + assert.Equal(t, tc.expectedRayStartParams, rayStartParams) + }) + } +} + +func TestUpdateRayStartParamsResources(t *testing.T) { + ctx := context.Background() + + tests := map[string]struct { + initialRayStartParams map[string]string + groupResources map[string]string + expectedRayStartParams map[string]string + expectedK8sResources corev1.ResourceList + }{ + "No-op when group `Resources` is nil or empty": { + initialRayStartParams: map[string]string{"existing": "true"}, + groupResources: nil, + expectedRayStartParams: map[string]string{"existing": "true"}, + }, + "Basic CPU and Memory set in `Resources` override rayStartParams": { + initialRayStartParams: map[string]string{}, + groupResources: map[string]string{ + string(corev1.ResourceCPU): "2", + string(corev1.ResourceMemory): "4Gi", + }, + expectedRayStartParams: map[string]string{ + "num-cpus": "2", + "memory": "4294967296", // 4Gi in bytes + }, + }, + "GPU and custom TPU resource set in `Resources` override rayStartParams": { + initialRayStartParams: map[string]string{}, + groupResources: map[string]string{ + "nvidia.com/gpu": "1", + "TPU": "4", + }, + expectedRayStartParams: map[string]string{ + "num-gpus": "1", + "resources": "'{\"TPU\":4}'", + }, + }, + "Top-level `Resources` should override existing resource params": { + initialRayStartParams: map[string]string{ + "num-cpus": "1", + "memory": "1000", + "resources": "'{\"Custom-Resource\": 10}'", + }, + groupResources: map[string]string{ + string(corev1.ResourceCPU): "4", + "Custom-Resource": "5", + }, + expectedRayStartParams: map[string]string{ + "num-cpus": "4", + "memory": "1000", // preserved + "resources": "'{\"Custom-Resource\":5}'", + }, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + rayStartParams := make(map[string]string) + for k, v := range tc.initialRayStartParams { + rayStartParams[k] = v + 
} + + updateRayStartParamsResources(ctx, rayStartParams, tc.groupResources) + + // Verify rayStartParams are updated based on the top-level Resources values. + assert.Equal(t, tc.expectedRayStartParams, rayStartParams) + }) + } +} diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index 74d2b4fe0e6..f804121591d 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -3,6 +3,7 @@ package utils import ( errstd "errors" "fmt" + "strings" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" @@ -33,16 +34,73 @@ func ValidateRayClusterMetadata(metadata metav1.ObjectMeta) error { return nil } +// validateRayGroupResources checks for conflicting resource definitions. +func validateRayGroupResources(groupName string, rayStartParams, resources map[string]string) error { + hasRayStartResources := rayStartParams["num-cpus"] != "" || + rayStartParams["num-gpus"] != "" || + rayStartParams["memory"] != "" || + rayStartParams["resources"] != "" + if hasRayStartResources && len(resources) > 0 { + return fmt.Errorf("resource fields should not be set in both rayStartParams and Resources for %s group; please use only one", groupName) + } + return nil +} + +// validateRayGroupLabels checks for invalid label definitions and correct label syntax. +func validateRayGroupLabels(groupName string, rayStartParams, labels map[string]string) error { + if _, ok := rayStartParams["labels"]; ok { + return fmt.Errorf("rayStartParams['labels'] is not supported for %s group; please use the top-level Labels field instead", groupName) + } + + // Validate that labels conforms to Kubernetes label syntax. + var allErrs []string + for key, val := range labels { + // Validate the label key. 
+ if errs := validation.IsQualifiedName(key); len(errs) > 0 { + for _, err := range errs { + allErrs = append(allErrs, fmt.Sprintf("invalid label key for %s group: '%s', error: %s", groupName, key, err)) + } + } + + // Validate the label value. + if errs := validation.IsValidLabelValue(val); len(errs) > 0 { + for _, err := range errs { + allErrs = append(allErrs, fmt.Sprintf("invalid label value for key '%s' in %s group: '%s', error: %s", key, groupName, val, err)) + } + } + } + + if len(allErrs) > 0 { + return fmt.Errorf("%s", strings.Join(allErrs, "; ")) + } + + return nil +} + // Validation for invalid Ray Cluster configurations. func ValidateRayClusterSpec(spec *rayv1.RayClusterSpec, annotations map[string]string) error { if len(spec.HeadGroupSpec.Template.Spec.Containers) == 0 { return fmt.Errorf("headGroupSpec should have at least one container") } + if err := validateRayGroupResources("Head", spec.HeadGroupSpec.RayStartParams, spec.HeadGroupSpec.Resources); err != nil { + return err + } + if err := validateRayGroupLabels("Head", spec.HeadGroupSpec.RayStartParams, spec.HeadGroupSpec.Labels); err != nil { + return err + } + for _, workerGroup := range spec.WorkerGroupSpecs { if len(workerGroup.Template.Spec.Containers) == 0 { return fmt.Errorf("workerGroupSpec should have at least one container") } + + if err := validateRayGroupResources(workerGroup.GroupName, workerGroup.RayStartParams, workerGroup.Resources); err != nil { + return err + } + if err := validateRayGroupLabels(workerGroup.GroupName, workerGroup.RayStartParams, workerGroup.Labels); err != nil { + return err + } } if annotations[RayFTEnabledAnnotationKey] != "" && spec.GcsFaultToleranceOptions != nil { diff --git a/ray-operator/controllers/ray/utils/validation_test.go b/ray-operator/controllers/ray/utils/validation_test.go index dc464424f40..ec9bfb30a7b 100644 --- a/ray-operator/controllers/ray/utils/validation_test.go +++ b/ray-operator/controllers/ray/utils/validation_test.go @@ -683,6 +683,174 
@@ func TestValidateRayClusterSpecAutoscaler(t *testing.T) { } } +func TestValidateRayClusterSpec_Resources(t *testing.T) { + // Util function to create a RayCluster spec. + createSpec := func() rayv1.RayClusterSpec { + return rayv1.RayClusterSpec{ + HeadGroupSpec: rayv1.HeadGroupSpec{ + Template: podTemplateSpec(nil, nil), + }, + WorkerGroupSpecs: []rayv1.WorkerGroupSpec{ + { + GroupName: "worker-group", + Template: podTemplateSpec(nil, nil), + }, + }, + } + } + + tests := []struct { + name string + errorMessage string + spec rayv1.RayClusterSpec + expectError bool + }{ + { + name: "Invalid: Head group has resources in both rayStartParams and top-level Resources", + spec: func() rayv1.RayClusterSpec { + s := createSpec() + s.HeadGroupSpec.RayStartParams = map[string]string{"num-cpus": "1"} + s.HeadGroupSpec.Resources = map[string]string{"CPU": "1"} + return s + }(), + expectError: true, + errorMessage: "resource fields should not be set in both rayStartParams and Resources for Head group; please use only one", + }, + { + name: "Invalid: Worker group has resources in both rayStartParams and .Resources", + spec: func() rayv1.RayClusterSpec { + s := createSpec() + s.WorkerGroupSpecs[0].RayStartParams = map[string]string{"num-gpus": "1"} + s.WorkerGroupSpecs[0].Resources = map[string]string{"GPU": "1"} + return s + }(), + expectError: true, + errorMessage: "resource fields should not be set in both rayStartParams and Resources for worker-group group; please use only one", + }, + { + name: "Valid: Only rayStartParams resources are set for head", + spec: func() rayv1.RayClusterSpec { + s := createSpec() + s.HeadGroupSpec.RayStartParams = map[string]string{ + "num-cpus": "2", + "memory": "4G", + "resources": "{\"TPU\": \"8\"}", + } + return s + }(), + expectError: false, + }, + { + name: "Valid: Only .Resources field is set for worker", + spec: func() rayv1.RayClusterSpec { + s := createSpec() + s.WorkerGroupSpecs[0].Resources = map[string]string{"CPU": "2", "memory": 
"4G", "TPU": "8"} + return s + }(), + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := ValidateRayClusterSpec(&tt.spec, nil) + if tt.expectError { + require.Error(t, err) + assert.EqualError(t, err, tt.errorMessage) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestValidateRayClusterSpec_Labels(t *testing.T) { + // Util function to create a RayCluster spec. + createSpec := func() rayv1.RayClusterSpec { + return rayv1.RayClusterSpec{ + HeadGroupSpec: rayv1.HeadGroupSpec{ + Template: podTemplateSpec(nil, nil), + }, + WorkerGroupSpecs: []rayv1.WorkerGroupSpec{ + { + GroupName: "worker-group", + Template: podTemplateSpec(nil, nil), + }, + }, + } + } + + tests := []struct { + name string + errorMessage string + spec rayv1.RayClusterSpec + expectError bool + }{ + { + name: "Invalid: Head group has 'labels' in rayStartParams", + spec: func() rayv1.RayClusterSpec { + s := createSpec() + s.HeadGroupSpec.RayStartParams = map[string]string{"labels": "ray.io/node-group=worker-group-1"} + return s + }(), + expectError: true, + errorMessage: "rayStartParams['labels'] is not supported for Head group; please use the top-level Labels field instead", + }, + { + name: "Invalid: Worker group has 'labels' in rayStartParams", + spec: func() rayv1.RayClusterSpec { + s := createSpec() + s.WorkerGroupSpecs[0].RayStartParams = map[string]string{"labels": "ray.io/node-group=worker-group-1"} + return s + }(), + expectError: true, + errorMessage: "rayStartParams['labels'] is not supported for worker-group group; please use the top-level Labels field instead", + }, + { + name: "Valid: Only .Labels field is set", + spec: func() rayv1.RayClusterSpec { + s := createSpec() + s.HeadGroupSpec.Labels = map[string]string{"ray.io/market-type": "on-demand"} + s.WorkerGroupSpecs[0].Labels = map[string]string{"ray.io/accelerator-type": "TPU-V6E"} + return s + }(), + expectError: false, + }, + { + name: "Invalid: Label key does not 
follow Kubernetes syntax", + spec: func() rayv1.RayClusterSpec { + s := createSpec() + s.WorkerGroupSpecs[0].Labels = map[string]string{"invalid_key!": "value"} + return s + }(), + expectError: true, + errorMessage: "invalid label key for worker-group group: 'invalid_key!', error: name part must consist of alphanumeric characters, '-', '_' or '.', and must start and end with an alphanumeric character", + }, + { + name: "Invalid: Label value does not follow Kubernetes syntax", + spec: func() rayv1.RayClusterSpec { + s := createSpec() + s.HeadGroupSpec.Labels = map[string]string{"valid-key": "invalid/value"} + return s + }(), + expectError: true, + errorMessage: "invalid label value for key 'valid-key' in Head group: 'invalid/value', error: a valid label must be an empty string or consist of alphanumeric characters, '-', '_' or '.', and must start and end with an alphanumeric character", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := ValidateRayClusterSpec(&tt.spec, nil) + if tt.expectError { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errorMessage) + } else { + require.NoError(t, err) + } + }) + } +} + func TestValidateRayJobStatus(t *testing.T) { tests := []struct { name string diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/headgroupspec.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/headgroupspec.go index 6970ff2634a..c5233d65732 100644 --- a/ray-operator/pkg/client/applyconfiguration/ray/v1/headgroupspec.go +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/headgroupspec.go @@ -13,6 +13,8 @@ type HeadGroupSpecApplyConfiguration struct { Template *corev1.PodTemplateSpecApplyConfiguration `json:"template,omitempty"` HeadService *apicorev1.Service `json:"headService,omitempty"` EnableIngress *bool `json:"enableIngress,omitempty"` + Resources map[string]string `json:"resources,omitempty"` + Labels map[string]string `json:"labels,omitempty"` RayStartParams map[string]string 
`json:"rayStartParams,omitempty"` ServiceType *apicorev1.ServiceType `json:"serviceType,omitempty"` } @@ -47,6 +49,34 @@ func (b *HeadGroupSpecApplyConfiguration) WithEnableIngress(value bool) *HeadGro return b } +// WithResources puts the entries into the Resources field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Resources field, +// overwriting an existing map entries in Resources field with the same key. +func (b *HeadGroupSpecApplyConfiguration) WithResources(entries map[string]string) *HeadGroupSpecApplyConfiguration { + if b.Resources == nil && len(entries) > 0 { + b.Resources = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Resources[k] = v + } + return b +} + +// WithLabels puts the entries into the Labels field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Labels field, +// overwriting an existing map entries in Labels field with the same key. +func (b *HeadGroupSpecApplyConfiguration) WithLabels(entries map[string]string) *HeadGroupSpecApplyConfiguration { + if b.Labels == nil && len(entries) > 0 { + b.Labels = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Labels[k] = v + } + return b +} + // WithRayStartParams puts the entries into the RayStartParams field in the declarative configuration // and returns the receiver, so that objects can be build by chaining "With" function invocations. 
// If called multiple times, the entries provided by each call will be put on the RayStartParams field, diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/workergroupspec.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/workergroupspec.go index 85199596806..7aa692895f1 100644 --- a/ray-operator/pkg/client/applyconfiguration/ray/v1/workergroupspec.go +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/workergroupspec.go @@ -15,6 +15,8 @@ type WorkerGroupSpecApplyConfiguration struct { MinReplicas *int32 `json:"minReplicas,omitempty"` MaxReplicas *int32 `json:"maxReplicas,omitempty"` IdleTimeoutSeconds *int32 `json:"idleTimeoutSeconds,omitempty"` + Resources map[string]string `json:"resources,omitempty"` + Labels map[string]string `json:"labels,omitempty"` RayStartParams map[string]string `json:"rayStartParams,omitempty"` Template *corev1.PodTemplateSpecApplyConfiguration `json:"template,omitempty"` ScaleStrategy *ScaleStrategyApplyConfiguration `json:"scaleStrategy,omitempty"` @@ -75,6 +77,34 @@ func (b *WorkerGroupSpecApplyConfiguration) WithIdleTimeoutSeconds(value int32) return b } +// WithResources puts the entries into the Resources field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Resources field, +// overwriting an existing map entries in Resources field with the same key. +func (b *WorkerGroupSpecApplyConfiguration) WithResources(entries map[string]string) *WorkerGroupSpecApplyConfiguration { + if b.Resources == nil && len(entries) > 0 { + b.Resources = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Resources[k] = v + } + return b +} + +// WithLabels puts the entries into the Labels field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. 
+// If called multiple times, the entries provided by each call will be put on the Labels field, +// overwriting an existing map entries in Labels field with the same key. +func (b *WorkerGroupSpecApplyConfiguration) WithLabels(entries map[string]string) *WorkerGroupSpecApplyConfiguration { + if b.Labels == nil && len(entries) > 0 { + b.Labels = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Labels[k] = v + } + return b +} + // WithRayStartParams puts the entries into the RayStartParams field in the declarative configuration // and returns the receiver, so that objects can be build by chaining "With" function invocations. // If called multiple times, the entries provided by each call will be put on the RayStartParams field,