
Commit 51ef408

add multikueue tests for tfjob
1 parent a57b642 commit 51ef408

4 files changed (+175 -53 lines)


pkg/util/testingjobs/tfjob/wrappers_tfjob.go (+57 -51)
@@ -17,14 +17,15 @@ limitations under the License.
 package testing
 
 import (
-    kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/utils/ptr"
 
     "sigs.k8s.io/kueue/pkg/controller/constants"
+
+    kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 )
 
 // TFJobWrapper wraps a Job.
@@ -42,61 +43,46 @@ func MakeTFJob(name, ns string) *TFJobWrapper {
             RunPolicy: kftraining.RunPolicy{
                 Suspend: ptr.To(true),
             },
-            TFReplicaSpecs: map[kftraining.ReplicaType]*kftraining.ReplicaSpec{
-                kftraining.TFJobReplicaTypeChief: {
-                    Replicas: ptr.To[int32](1),
-                    Template: corev1.PodTemplateSpec{
-                        Spec: corev1.PodSpec{
-                            RestartPolicy: "Never",
-                            Containers: []corev1.Container{
-                                {
-                                    Name:      "c",
-                                    Image:     "pause",
-                                    Command:   []string{},
-                                    Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
-                                },
-                            },
-                            NodeSelector: map[string]string{},
-                        },
-                    },
-                },
-                kftraining.TFJobReplicaTypePS: {
-                    Replicas: ptr.To[int32](1),
-                    Template: corev1.PodTemplateSpec{
-                        Spec: corev1.PodSpec{
-                            RestartPolicy: "Never",
-                            Containers: []corev1.Container{
-                                {
-                                    Name:      "c",
-                                    Image:     "pause",
-                                    Command:   []string{},
-                                    Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
-                                },
-                            },
-                            NodeSelector: map[string]string{},
-                        },
-                    },
-                },
-                kftraining.TFJobReplicaTypeWorker: {
-                    Replicas: ptr.To[int32](1),
-                    Template: corev1.PodTemplateSpec{
-                        Spec: corev1.PodSpec{
-                            RestartPolicy: "Never",
-                            Containers: []corev1.Container{
-                                {
-                                    Name:      "c",
-                                    Image:     "pause",
-                                    Command:   []string{},
-                                    Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
-                                },
-                            },
-                            NodeSelector: map[string]string{},
-                        },
-                    },
-                },
-            },
-        },
-    }}
+            TFReplicaSpecs: map[kftraining.ReplicaType]*kftraining.ReplicaSpec{},
+        },
+    }}
+}
+
+type TFReplicaSpecRequirement struct {
+    ReplicaType   kftraining.ReplicaType
+    ReplicaCount  int32
+    Annotations   map[string]string
+    RestartPolicy kftraining.RestartPolicy
+    Image         string
+}
+
+func (j *TFJobWrapper) TFReplicaSpecs(replicaSpecs ...TFReplicaSpecRequirement) *TFJobWrapper {
+    j.Spec.TFReplicaSpecs = make(map[kftraining.ReplicaType]*kftraining.ReplicaSpec)
+
+    for _, rs := range replicaSpecs {
+        j.Spec.TFReplicaSpecs[rs.ReplicaType] = &kftraining.ReplicaSpec{
+            Replicas: ptr.To[int32](rs.ReplicaCount),
+            Template: corev1.PodTemplateSpec{
+                ObjectMeta: metav1.ObjectMeta{
+                    Annotations: rs.Annotations,
+                },
+                Spec: corev1.PodSpec{
+                    RestartPolicy: corev1.RestartPolicy(rs.RestartPolicy),
+                    Containers: []corev1.Container{
+                        {
+                            Name:      "tensorflow", // each tfjob container must have the name "tensorflow"
+                            Image:     rs.Image,
+                            Command:   []string{},
+                            Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
+                        },
+                    },
+                    NodeSelector: map[string]string{},
+                },
+            },
+        }
+    }
+
+    return j
 }
 
 // PriorityClass updates job priorityclass.
@@ -122,6 +108,20 @@ func (j *TFJobWrapper) Obj() *kftraining.TFJob {
     return &j.TFJob
 }
 
+// Clone returns deep copy of the Job.
+func (j *TFJobWrapper) Clone() *TFJobWrapper {
+    return &TFJobWrapper{TFJob: *j.DeepCopy()}
+}
+
+// Label sets the label key and value
+func (j *TFJobWrapper) Label(key, value string) *TFJobWrapper {
+    if j.Labels == nil {
+        j.Labels = make(map[string]string)
+    }
+    j.Labels[key] = value
+    return j
+}
+
 // Queue updates the queue name of the job.
 func (j *TFJobWrapper) Queue(queue string) *TFJobWrapper {
     if j.Labels == nil {
@@ -155,3 +155,9 @@ func (j *TFJobWrapper) UID(uid string) *TFJobWrapper {
     j.ObjectMeta.UID = types.UID(uid)
     return j
 }
+
+// Condition adds a condition
+func (j *TFJobWrapper) StatusConditions(c kftraining.JobCondition) *TFJobWrapper {
+    j.Status.Conditions = append(j.Status.Conditions, c)
+    return j
+}
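
Taken together, the wrapper changes drop the hard-coded Chief/PS/Worker defaults and let each test declare exactly the replica specs it needs, plus a few helpers (Clone, Label, StatusConditions) used by the MultiKueue assertions. The following is a minimal sketch of how the reworked builder composes; the import alias testingtfjob, the names "tfjob-sample", "default" and "user-queue", and the label key/value are illustrative placeholders, not values from this commit:

    package example

    import (
        kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"

        testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob"
    )

    // buildTFJobs sketches the builder flow: MakeTFJob -> Queue/Label ->
    // TFReplicaSpecs -> Obj, with Clone producing an independent deep copy
    // that a test can mutate (e.g. the object expected on a MultiKueue worker).
    func buildTFJobs() (*kftraining.TFJob, *kftraining.TFJob) {
        wrapper := testingtfjob.MakeTFJob("tfjob-sample", "default").
            Queue("user-queue").
            Label("example.com/purpose", "demo"). // Label() is one of the helpers added in this commit; placeholder key/value
            TFReplicaSpecs(
                testingtfjob.TFReplicaSpecRequirement{
                    ReplicaType:   kftraining.TFJobReplicaTypeWorker,
                    ReplicaCount:  2,
                    RestartPolicy: "Never",
                    Image:         "pause",
                },
            )

        original := wrapper.Obj()
        clone := wrapper.Clone().Obj()
        return original, clone
    }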

test/e2e/multikueue/e2e_test.go (+100 -0)
@@ -21,6 +21,7 @@ import (
     "os/exec"
 
     "github.com/google/go-cmp/cmp/cmpopts"
+    kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
     "github.com/onsi/ginkgo/v2"
     "github.com/onsi/gomega"
     batchv1 "k8s.io/api/batch/v1"
@@ -37,9 +38,11 @@ import (
     kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
     workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
     workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
+    workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob"
     utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
     testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
     testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
+    testing "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob"
     "sigs.k8s.io/kueue/pkg/workload"
     "sigs.k8s.io/kueue/test/util"
 )
@@ -364,6 +367,103 @@ var _ = ginkgo.Describe("MultiKueue", func() {
                     util.IgnoreConditionTimestampsAndObservedGeneration)))
             })
         })
+        ginkgo.It("Should run a kubeflow tfjob on worker if admitted", func() {
+            tfJob := testing.MakeTFJob("tfjob1", managerNs.Name).
+                Queue(managerLq.Name).
+                TFReplicaSpecs(
+                    testing.TFReplicaSpecRequirement{
+                        ReplicaType:  kftraining.TFJobReplicaTypePS,
+                        ReplicaCount: 1,
+                        Annotations: map[string]string{
+                            "sidecar.istio.io/inject": "false",
+                        },
+                        RestartPolicy: "Never",
+                        Image:         "kubeflow/tf-dist-mnist-test:v1-855e096",
+                    },
+                    testing.TFReplicaSpecRequirement{
+                        ReplicaType:  kftraining.TFJobReplicaTypeWorker,
+                        ReplicaCount: 2,
+                        Annotations: map[string]string{
+                            "sidecar.istio.io/inject": "false",
+                        },
+                        RestartPolicy: "OnFailure",
+                        Image:         "kubeflow/tf-dist-mnist-test:v1-855e096",
+                    },
+                ).
+                Request(kftraining.TFJobReplicaTypePS, corev1.ResourceCPU, "1").
+                Request(kftraining.TFJobReplicaTypePS, corev1.ResourceMemory, "200M").
+                Request(kftraining.TFJobReplicaTypeWorker, corev1.ResourceCPU, "0.5").
+                Request(kftraining.TFJobReplicaTypeWorker, corev1.ResourceMemory, "100M").
+                Obj()
+
+            ginkgo.By("Creating the tfJob", func() {
+                gomega.Expect(k8sManagerClient.Create(ctx, tfJob)).Should(gomega.Succeed())
+            })
+
+            createdLeaderWorkload := &kueue.Workload{}
+            wlLookupKey := types.NamespacedName{Name: workloadtfjob.GetWorkloadNameForTFJob(tfJob.Name, tfJob.UID), Namespace: managerNs.Name}
+
+            // the execution should be given to the worker
+            ginkgo.By("Waiting to be admitted in worker1 and manager", func() {
+                gomega.Eventually(func(g gomega.Gomega) {
+                    g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdLeaderWorkload)).To(gomega.Succeed())
+                    g.Expect(apimeta.FindStatusCondition(createdLeaderWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{
+                        Type:    kueue.WorkloadAdmitted,
+                        Status:  metav1.ConditionTrue,
+                        Reason:  "Admitted",
+                        Message: "The workload is admitted",
+                    }, util.IgnoreConditionTimestampsAndObservedGeneration))
+                    g.Expect(workload.FindAdmissionCheck(createdLeaderWorkload.Status.AdmissionChecks, multiKueueAc.Name)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{
+                        Name:    multiKueueAc.Name,
+                        State:   kueue.CheckStateReady,
+                        Message: `The workload got reservation on "worker1"`,
+                    }, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime")))
+                }, util.Timeout, util.Interval).Should(gomega.Succeed())
+            })
+
+            ginkgo.By("Waiting for the tfJob to get status updates", func() {
+                gomega.Eventually(func(g gomega.Gomega) {
+                    createdTfJob := &kftraining.TFJob{}
+                    g.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(tfJob), createdTfJob)).To(gomega.Succeed())
+                    g.Expect(createdTfJob.Status.ReplicaStatuses).To(gomega.BeComparableTo(
+                        map[kftraining.ReplicaType]*kftraining.ReplicaStatus{
+                            kftraining.TFJobReplicaTypePS: {
+                                Active:    1,
+                                Succeeded: 0,
+                            },
+                            kftraining.TFJobReplicaTypeWorker: {
+                                Active:    2,
+                                Succeeded: 0,
+                            },
+                        },
+                        util.IgnoreConditionTimestampsAndObservedGeneration))
+                }, 3*util.LongTimeout, util.Interval).Should(gomega.Succeed())
+            })
+
+            ginkgo.By("Waiting for the tfJob to finish", func() {
+                gomega.Eventually(func(g gomega.Gomega) {
+                    g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdLeaderWorkload)).To(gomega.Succeed())
+
+                    g.Expect(apimeta.FindStatusCondition(createdLeaderWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{
+                        Type:    kueue.WorkloadFinished,
+                        Status:  metav1.ConditionTrue,
+                        Reason:  kueue.WorkloadFinishedReasonSucceeded,
+                        Message: fmt.Sprintf("TFJob %s/%s successfully completed.", createdLeaderWorkload.Namespace, tfJob.Name),
+                    }, util.IgnoreConditionTimestampsAndObservedGeneration))
+                }, 2*util.LongTimeout, util.Interval).Should(gomega.Succeed())
+            })
+
+            ginkgo.By("Checking no objects are left in the worker clusters and the tfJob is completed", func() {
+                gomega.Eventually(func(g gomega.Gomega) {
+                    workerWl := &kueue.Workload{}
+                    g.Expect(k8sWorker1Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError())
+                    g.Expect(k8sWorker2Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError())
+                    workerTfJob := &kftraining.TFJob{}
+                    g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(tfJob), workerTfJob)).To(utiltesting.BeNotFoundError())
+                    g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(tfJob), workerTfJob)).To(utiltesting.BeNotFoundError())
+                }, util.Timeout, util.Interval).Should(gomega.Succeed())
+            })
+        })
     })
     ginkgo.When("The connection to a worker cluster is unreliable", func() {
         ginkgo.It("Should update the cluster status to reflect the connection state", func() {

test/e2e/multikueue/suite_test.go (+7 -0)
@@ -23,6 +23,7 @@ import (
     "testing"
     "time"
 
+    kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
     "github.com/onsi/ginkgo/v2"
     "github.com/onsi/gomega"
     authenticationv1 "k8s.io/api/authentication/v1"
@@ -86,6 +87,8 @@ func kubeconfigForMultiKueueSA(ctx context.Context, c client.Client, restConfig
             policyRule(jobset.SchemeGroupVersion.Group, "jobsets/status", "get"),
             policyRule(kueue.SchemeGroupVersion.Group, "workloads", resourceVerbs...),
             policyRule(kueue.SchemeGroupVersion.Group, "workloads/status", "get", "patch", "update"),
+            policyRule(kftraining.SchemeGroupVersion.Group, "tfjobs", resourceVerbs...),
+            policyRule(kftraining.SchemeGroupVersion.Group, "tfjobs/status", "get"),
         },
     }
     err := c.Create(ctx, cr)
@@ -223,6 +226,10 @@
     util.WaitForJobSetAvailability(ctx, k8sWorker1Client)
     util.WaitForJobSetAvailability(ctx, k8sWorker2Client)
 
+    // there should not be a kubeflow operator in manager cluster
+    util.WaitForKubeFlowAvailability(ctx, k8sWorker1Client)
+    util.WaitForKubeFlowAvailability(ctx, k8sWorker2Client)
+
     ginkgo.GinkgoLogr.Info("Kueue and JobSet operators are available in all the clusters", "waitingTime", time.Since(waitForAvailableStart))
 
     discoveryClient, err := discovery.NewDiscoveryClientForConfig(managerCfg)
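
The two added policyRule calls give the MultiKueue service account on each worker cluster permission to manage TFJobs and to read their status. The policyRule helper itself is not shown in this diff; as a rough sketch under that assumption, the generated rules would look roughly like the following (kftraining.SchemeGroupVersion.Group is "kubeflow.org", and resourceVerbs is assumed to be the same shared verb list used for the other managed resources above):

    package example

    import rbacv1 "k8s.io/api/rbac/v1"

    // tfJobRBACRules sketches the two rules added above: full management of
    // tfjobs with the shared resourceVerbs list, plus read-only access to
    // tfjobs/status. The function name and shape are illustrative only.
    func tfJobRBACRules(resourceVerbs []string) []rbacv1.PolicyRule {
        return []rbacv1.PolicyRule{
            {
                APIGroups: []string{"kubeflow.org"}, // kftraining.SchemeGroupVersion.Group
                Resources: []string{"tfjobs"},
                Verbs:     resourceVerbs,
            },
            {
                APIGroups: []string{"kubeflow.org"},
                Resources: []string{"tfjobs/status"},
                Verbs:     []string{"get"},
            },
        }
    }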

test/util/e2e.go (+11 -2)
@@ -6,6 +6,7 @@ import (
     "os"
 
     "github.com/google/go-cmp/cmp/cmpopts"
+    kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
     "github.com/onsi/gomega"
     appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
@@ -43,6 +44,9 @@ func CreateClientUsingCluster(kContext string) (client.WithWatch, *rest.Config)
     err = jobset.AddToScheme(scheme.Scheme)
     gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
 
+    err = kftraining.AddToScheme(scheme.Scheme)
+    gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
+
     client, err := client.NewWithWatch(cfg, client.Options{Scheme: scheme.Scheme})
     gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
     return client, cfg
@@ -100,6 +104,11 @@ func WaitForKueueAvailability(ctx context.Context, k8sClient client.Client) {
 }
 
 func WaitForJobSetAvailability(ctx context.Context, k8sClient client.Client) {
-    kcmKey := types.NamespacedName{Namespace: "jobset-system", Name: "jobset-controller-manager"}
-    waitForOperatorAvailability(ctx, k8sClient, kcmKey)
+    jcmKey := types.NamespacedName{Namespace: "jobset-system", Name: "jobset-controller-manager"}
+    waitForOperatorAvailability(ctx, k8sClient, jcmKey)
+}
+
+func WaitForKubeFlowAvailability(ctx context.Context, k8sClient client.Client) {
+    kftoKey := types.NamespacedName{Namespace: "kubeflow", Name: "training-operator"}
+    waitForOperatorAvailability(ctx, k8sClient, kftoKey)
 }
