Skip to content

Commit dfeed19

Browse files
committed
Revert interface migration
Signed-off-by: win5923 <[email protected]>
1 parent 2ee7e8a commit dfeed19

File tree

8 files changed

+83
-54
lines changed

8 files changed

+83
-54
lines changed

ray-operator/controllers/ray/batchscheduler/interface/interface.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ package schedulerinterface
33
import (
44
"context"
55

6+
corev1 "k8s.io/api/core/v1"
67
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
78
"k8s.io/apimachinery/pkg/runtime"
89
"k8s.io/client-go/rest"
910
"sigs.k8s.io/controller-runtime/pkg/builder"
1011
"sigs.k8s.io/controller-runtime/pkg/client"
12+
13+
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
1114
)
1215

1316
// BatchScheduler manages submitting RayCluster pods to a third-party scheduler.
@@ -20,6 +23,11 @@ type BatchScheduler interface {
2023
// For most batch schedulers, this results in the creation of a PodGroup.
2124
DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error
2225

26+
// AddMetadataToPod enriches the pod with metadata necessary to tie it to the scheduler.
27+
// For example, setting labels for queues / priority, and setting schedulerName.
28+
// This function will be removed once the RayJob Volcano scheduler integration is completed.
29+
AddMetadataToPod(ctx context.Context, rayCluster *rayv1.RayCluster, groupName string, pod *corev1.Pod)
30+
2331
// AddMetadataToChildResource enriches the child resource (batchv1.Job, rayv1.RayCluster) with metadata necessary to tie it to the scheduler.
2432
// For example, setting labels for queues / priority, and setting schedulerName.
2533
AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string)
@@ -55,6 +63,9 @@ func (d *DefaultBatchScheduler) DoBatchSchedulingOnSubmission(_ context.Context,
5563
return nil
5664
}
5765

66+
func (d *DefaultBatchScheduler) AddMetadataToPod(_ context.Context, _ *rayv1.RayCluster, _ string, _ *corev1.Pod) {
67+
}
68+
5869
// AddMetadataToChildResource is a no-op for the default scheduler: every
// argument is ignored and the child resource is left untouched.
func (d *DefaultBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {}
6071

ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"sigs.k8s.io/controller-runtime/pkg/builder"
1919
"sigs.k8s.io/controller-runtime/pkg/client"
2020

21+
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
2122
schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
2223
)
2324

@@ -37,33 +38,23 @@ func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ metav1
3738
return nil
3839
}
3940

40-
func (k *KaiScheduler) AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, _ string) {
41+
func (k *KaiScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, _ string, pod *corev1.Pod) {
4142
logger := ctrl.LoggerFrom(ctx).WithName("kai-scheduler")
42-
addSchedulerNameToObject(child, k.Name())
43+
pod.Spec.SchedulerName = k.Name()
4344

44-
parentLabel := parent.GetLabels()
45-
queue, ok := parentLabel[QueueLabelName]
45+
queue, ok := app.Labels[QueueLabelName]
4646
if !ok || queue == "" {
47-
logger.Info("Queue label missing from parent; child will remain pending",
47+
logger.Info("Queue label missing from RayCluster; pods will remain pending",
4848
"requiredLabel", QueueLabelName)
4949
return
5050
}
51-
52-
childLabels := child.GetLabels()
53-
if childLabels == nil {
54-
childLabels = make(map[string]string)
51+
if pod.Labels == nil {
52+
pod.Labels = make(map[string]string)
5553
}
56-
childLabels[QueueLabelName] = queue
57-
child.SetLabels(childLabels)
54+
pod.Labels[QueueLabelName] = queue
5855
}
5956

60-
func addSchedulerNameToObject(obj metav1.Object, schedulerName string) {
61-
switch obj := obj.(type) {
62-
case *corev1.Pod:
63-
obj.Spec.SchedulerName = schedulerName
64-
case *corev1.PodTemplateSpec:
65-
obj.Spec.SchedulerName = schedulerName
66-
}
57+
func (k *KaiScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
6758
}
6859

6960
func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {

ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func createTestPod() *corev1.Pod {
4141
}
4242
}
4343

44-
func TestAddMetadataToChildResource_WithQueueLabel(t *testing.T) {
44+
func TestAddMetadataToPod_WithQueueLabel(t *testing.T) {
4545
a := assert.New(t)
4646
scheduler := &KaiScheduler{}
4747
ctx := context.Background()
@@ -52,8 +52,8 @@ func TestAddMetadataToChildResource_WithQueueLabel(t *testing.T) {
5252
})
5353
pod := createTestPod()
5454

55-
// Call AddMetadataToChildResource
56-
scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group")
55+
// Call AddMetadataToPod
56+
scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
5757

5858
// Assert scheduler name is set to kai-scheduler
5959
a.Equal("kai-scheduler", pod.Spec.SchedulerName)
@@ -63,7 +63,7 @@ func TestAddMetadataToChildResource_WithQueueLabel(t *testing.T) {
6363
a.Equal("test-queue", pod.Labels[QueueLabelName])
6464
}
6565

66-
func TestAddMetadataToChildResource_WithoutQueueLabel(t *testing.T) {
66+
func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) {
6767
a := assert.New(t)
6868
scheduler := &KaiScheduler{}
6969
ctx := context.Background()
@@ -72,8 +72,8 @@ func TestAddMetadataToChildResource_WithoutQueueLabel(t *testing.T) {
7272
rayCluster := createTestRayCluster(map[string]string{})
7373
pod := createTestPod()
7474

75-
// Call AddMetadataToChildResource
76-
scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group")
75+
// Call AddMetadataToPod
76+
scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
7777

7878
// Assert scheduler name is still set (always required)
7979
a.Equal("kai-scheduler", pod.Spec.SchedulerName)
@@ -85,7 +85,7 @@ func TestAddMetadataToChildResource_WithoutQueueLabel(t *testing.T) {
8585
}
8686
}
8787

88-
func TestAddMetadataToChildResource_WithEmptyQueueLabel(t *testing.T) {
88+
func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) {
8989
a := assert.New(t)
9090
scheduler := &KaiScheduler{}
9191
ctx := context.Background()
@@ -96,8 +96,8 @@ func TestAddMetadataToChildResource_WithEmptyQueueLabel(t *testing.T) {
9696
})
9797
pod := createTestPod()
9898

99-
// Call AddMetadataToChildResource
100-
scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group")
99+
// Call AddMetadataToPod
100+
scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
101101

102102
// Assert scheduler name is still set
103103
a.Equal("kai-scheduler", pod.Spec.SchedulerName)
@@ -109,7 +109,7 @@ func TestAddMetadataToChildResource_WithEmptyQueueLabel(t *testing.T) {
109109
}
110110
}
111111

112-
func TestAddMetadataToChildResource_PreservesExistingPodLabels(t *testing.T) {
112+
func TestAddMetadataToPod_PreservesExistingPodLabels(t *testing.T) {
113113
a := assert.New(t)
114114
scheduler := &KaiScheduler{}
115115
ctx := context.Background()
@@ -126,8 +126,8 @@ func TestAddMetadataToChildResource_PreservesExistingPodLabels(t *testing.T) {
126126
"app": "ray",
127127
}
128128

129-
// Call AddMetadataToChildResource
130-
scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group")
129+
// Call AddMetadataToPod
130+
scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
131131

132132
// Assert scheduler name is set
133133
a.Equal("kai-scheduler", pod.Spec.SchedulerName)

ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -93,32 +93,21 @@ func (k *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, objec
9393
return nil
9494
}
9595

96-
// AddMetadataToPod adds essential labels and annotations to the child resource.
96+
// AddMetadataToPod adds essential labels and annotations to the Ray pod
9797
// the scheduler needs these labels and annotations in order to do the scheduling properly
98-
func (k *KubeScheduler) AddMetadataToChildResource(_ context.Context, parent metav1.Object, child metav1.Object, _ string) {
99-
// when gang scheduling is enabled, extra labels need to be added to all child resources
100-
if k.isGangSchedulingEnabled(parent) {
101-
labels := child.GetLabels()
102-
if labels == nil {
103-
labels = make(map[string]string)
104-
}
105-
labels[kubeSchedulerPodGroupLabelKey] = parent.GetName()
106-
child.SetLabels(labels)
98+
func (k *KubeScheduler) AddMetadataToPod(_ context.Context, rayCluster *rayv1.RayCluster, _ string, pod *corev1.Pod) {
99+
// when gang scheduling is enabled, extra labels need to be added to all pods
100+
if k.isGangSchedulingEnabled(rayCluster) {
101+
pod.Labels[kubeSchedulerPodGroupLabelKey] = rayCluster.Name
107102
}
108-
addSchedulerNameToObject(child, k.Name())
103+
pod.Spec.SchedulerName = k.Name()
109104
}
110105

111-
func addSchedulerNameToObject(obj metav1.Object, schedulerName string) {
112-
switch obj := obj.(type) {
113-
case *corev1.Pod:
114-
obj.Spec.SchedulerName = schedulerName
115-
case *corev1.PodTemplateSpec:
116-
obj.Spec.SchedulerName = schedulerName
117-
}
106+
func (k *KubeScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
118107
}
119108

120-
func (k *KubeScheduler) isGangSchedulingEnabled(obj metav1.Object) bool {
121-
_, exist := obj.GetLabels()[utils.RayGangSchedulingEnabled]
109+
func (k *KubeScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool {
110+
_, exist := app.Labels[utils.RayGangSchedulingEnabled]
122111
return exist
123112
}
124113

ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func TestCreatePodGroupWithMultipleHosts(t *testing.T) {
117117
a.Equal(int32(5), podGroup.Spec.MinMember)
118118
}
119119

120-
func TestAddMetadataToChildResource(t *testing.T) {
120+
func TestAddMetadataToPod(t *testing.T) {
121121
tests := []struct {
122122
name string
123123
enableGang bool
@@ -150,7 +150,7 @@ func TestAddMetadataToChildResource(t *testing.T) {
150150
}
151151

152152
scheduler := &KubeScheduler{}
153-
scheduler.AddMetadataToChildResource(context.TODO(), &cluster, pod, "worker")
153+
scheduler.AddMetadataToPod(context.TODO(), &cluster, "worker", pod)
154154

155155
if tt.enableGang {
156156
a.Equal(cluster.Name, pod.Labels[kubeSchedulerPodGroupLabelKey])

ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,18 @@ func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, pa
213213
addSchedulerName(child, v.Name())
214214
}
215215

216+
func (v *VolcanoBatchScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) {
217+
pod.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(app)
218+
pod.Annotations[volcanobatchv1alpha1.TaskSpecKey] = groupName
219+
if queue, ok := app.ObjectMeta.Labels[QueueNameLabelKey]; ok {
220+
pod.Labels[QueueNameLabelKey] = queue
221+
}
222+
if priorityClassName, ok := app.ObjectMeta.Labels[utils.RayPriorityClassName]; ok {
223+
pod.Spec.PriorityClassName = priorityClassName
224+
}
225+
pod.Spec.SchedulerName = v.Name()
226+
}
227+
216228
func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) {
217229
if err := volcanoschedulingv1beta1.AddToScheme(cli.Scheme()); err != nil {
218230
return nil, fmt.Errorf("failed to add volcano to scheme with error %w", err)

ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,32 @@ func (y *YuniKornScheduler) isGangSchedulingEnabled(obj metav1.Object) bool {
122122
return exist
123123
}
124124

125+
// AddMetadataToPod adds essential labels and annotations to the Ray pod
126+
// the yunikorn scheduler needs these labels and annotations in order to do the scheduling properly
127+
func (y *YuniKornScheduler) AddMetadataToPod(ctx context.Context, rayCluster *rayv1.RayCluster, groupName string, pod *corev1.Pod) {
128+
logger := ctrl.LoggerFrom(ctx).WithName(SchedulerName)
129+
// the applicationID and queue name must be provided in the labels
130+
populateLabelsFromObject(rayCluster, pod, RayApplicationIDLabelName, YuniKornPodApplicationIDLabelName)
131+
populateLabelsFromObject(rayCluster, pod, RayApplicationQueueLabelName, YuniKornPodQueueLabelName)
132+
addSchedulerNameToObject(pod, y.Name())
133+
134+
// when gang scheduling is enabled, extra annotations need to be added to all pods
135+
if y.isGangSchedulingEnabled(rayCluster) {
136+
// populate the taskGroups info to each pod
137+
err := propagateTaskGroupsAnnotation(rayCluster, pod)
138+
if err != nil {
139+
logger.Error(err, "failed to add gang scheduling related annotations to pod, "+
140+
"gang scheduling will not be enabled for this workload",
141+
"name", pod.Name, "namespace", pod.Namespace)
142+
return
143+
}
144+
145+
// set the task group name based on the head or worker group name
146+
// the group name for the head and each of the worker group should be different
147+
pod.Annotations[YuniKornTaskGroupNameAnnotationName] = groupName
148+
}
149+
}
150+
125151
func (y *YuniKornScheduler) AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string) {
126152
logger := ctrl.LoggerFrom(ctx).WithName(SchedulerName)
127153

ray-operator/controllers/ray/raycluster_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -928,7 +928,7 @@ func (r *RayClusterReconciler) createHeadPod(ctx context.Context, instance rayv1
928928
// call the scheduler plugin if so
929929
if r.options.BatchSchedulerManager != nil {
930930
if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil {
931-
scheduler.AddMetadataToChildResource(ctx, &instance, &pod, utils.RayNodeHeadGroupLabelValue)
931+
scheduler.AddMetadataToPod(ctx, &instance, utils.RayNodeHeadGroupLabelValue, &pod)
932932
} else {
933933
return err
934934
}
@@ -951,7 +951,7 @@ func (r *RayClusterReconciler) createWorkerPod(ctx context.Context, instance ray
951951
pod := r.buildWorkerPod(ctx, instance, worker)
952952
if r.options.BatchSchedulerManager != nil {
953953
if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil {
954-
scheduler.AddMetadataToChildResource(ctx, &instance, &pod, worker.GroupName)
954+
scheduler.AddMetadataToPod(ctx, &instance, worker.GroupName, &pod)
955955
} else {
956956
return err
957957
}

0 commit comments

Comments
 (0)