Commit 74aeec5

add multikueue tests for tfjob
1 parent b6b7a7d commit 74aeec5

File tree: 4 files changed (+161 −3 lines changed)

pkg/util/testingjobs/tfjob/wrappers_tfjob.go

+28 −1

@@ -17,14 +17,15 @@ limitations under the License.
 package testing
 
 import (
-	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/utils/ptr"
 
 	"sigs.k8s.io/kueue/pkg/controller/constants"
+
+	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 )
 
 // TFJobWrapper wraps a Job.
@@ -150,6 +151,20 @@ func (j *TFJobWrapper) Obj() *kftraining.TFJob {
 	return &j.TFJob
 }
 
+// Clone returns a deep copy of the TFJobWrapper.
+func (j *TFJobWrapper) Clone() *TFJobWrapper {
+	return &TFJobWrapper{TFJob: *j.DeepCopy()}
+}
+
+// Label sets the label key and value.
+func (j *TFJobWrapper) Label(key, value string) *TFJobWrapper {
+	if j.Labels == nil {
+		j.Labels = make(map[string]string)
+	}
+	j.Labels[key] = value
+	return j
+}
+
 // Queue updates the queue name of the job.
 func (j *TFJobWrapper) Queue(queue string) *TFJobWrapper {
 	if j.Labels == nil {
@@ -183,3 +198,15 @@ func (j *TFJobWrapper) UID(uid string) *TFJobWrapper {
 	j.ObjectMeta.UID = types.UID(uid)
 	return j
 }
+
+// StatusConditions appends the given condition to the TFJob status.
+func (j *TFJobWrapper) StatusConditions(c kftraining.JobCondition) *TFJobWrapper {
+	j.Status.Conditions = append(j.Status.Conditions, c)
+	return j
+}
+
+func (j *TFJobWrapper) Image(replicaType kftraining.ReplicaType, image string, args []string) *TFJobWrapper {
+	j.Spec.TFReplicaSpecs[replicaType].Template.Spec.Containers[0].Image = image
+	j.Spec.TFReplicaSpecs[replicaType].Template.Spec.Containers[0].Args = args
+	return j
+}
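For orientation, a rough usage sketch (not part of the commit) of how the new Clone, Label, Image and StatusConditions helpers chain together with the existing wrapper methods; the package name, queue name, label keys and condition values below are illustrative assumptions.

// Hypothetical example, for illustration only; names below are assumptions.
package tfjobexample

import (
	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
	corev1 "k8s.io/api/core/v1"

	testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob"
)

// buildSampleTFJobs shows how Clone keeps two variants of the same base wrapper independent.
func buildSampleTFJobs() (managerObj, workerObj *kftraining.TFJob) {
	base := testingtfjob.MakeTFJob("tfjob-sample", "default").
		Queue("user-queue").
		TFReplicaSpecs(
			testingtfjob.TFReplicaSpecRequirement{
				ReplicaType:   kftraining.TFJobReplicaTypeWorker,
				ReplicaCount:  1,
				RestartPolicy: "OnFailure",
			},
		).
		Request(kftraining.TFJobReplicaTypeWorker, corev1.ResourceCPU, "0.5").
		Image(kftraining.TFJobReplicaTypeWorker, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"5s"})

	// Clone returns a deep copy, so labelling one variant does not leak into the other.
	managerObj = base.Clone().Label("example.com/cluster", "manager").Obj()

	// StatusConditions is handy for seeding a finished job in unit tests.
	workerObj = base.Clone().
		Label("example.com/cluster", "worker1").
		StatusConditions(kftraining.JobCondition{
			Type:   kftraining.JobSucceeded,
			Status: corev1.ConditionTrue,
		}).
		Obj()
	return managerObj, workerObj
}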

test/e2e/multikueue/e2e_test.go

+115 −0

@@ -21,6 +21,7 @@ import (
 	"os/exec"
 
 	"github.com/google/go-cmp/cmp/cmpopts"
+	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
 	batchv1 "k8s.io/api/batch/v1"
@@ -37,9 +38,11 @@ import (
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
 	workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
+	workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob"
 	utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
 	testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
 	testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
+	testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob"
 	"sigs.k8s.io/kueue/pkg/workload"
 	"sigs.k8s.io/kueue/test/util"
 )
@@ -364,6 +367,118 @@ var _ = ginkgo.Describe("MultiKueue", func() {
 					util.IgnoreConditionTimestampsAndObservedGeneration)))
 			})
 		})
+		ginkgo.It("Should run a kubeflow TFJob on worker if admitted", func() {
+			tfJob := testingtfjob.MakeTFJob("tfjob1", managerNs.Name).
+				Queue(managerLq.Name).
+				TFReplicaSpecs(
+					testingtfjob.TFReplicaSpecRequirement{
+						ReplicaType:  kftraining.TFJobReplicaTypeChief,
+						ReplicaCount: 1,
+						Annotations: map[string]string{
+							"sidecar.istio.io/inject": "false",
+						},
+						RestartPolicy: "OnFailure",
+					},
+					testingtfjob.TFReplicaSpecRequirement{
+						ReplicaType:  kftraining.TFJobReplicaTypePS,
+						ReplicaCount: 1,
+						Annotations: map[string]string{
+							"sidecar.istio.io/inject": "false",
+						},
+						RestartPolicy: "Never",
+					},
+					testingtfjob.TFReplicaSpecRequirement{
+						ReplicaType:  kftraining.TFJobReplicaTypeWorker,
+						ReplicaCount: 2,
+						Annotations: map[string]string{
+							"sidecar.istio.io/inject": "false",
+						},
+						RestartPolicy: "OnFailure",
+					},
+				).
+				Request(kftraining.TFJobReplicaTypeChief, corev1.ResourceCPU, "0.5").
+				Request(kftraining.TFJobReplicaTypeChief, corev1.ResourceMemory, "200M").
+				Request(kftraining.TFJobReplicaTypePS, corev1.ResourceCPU, "0.5").
+				Request(kftraining.TFJobReplicaTypePS, corev1.ResourceMemory, "200M").
+				Request(kftraining.TFJobReplicaTypeWorker, corev1.ResourceCPU, "0.5").
+				Request(kftraining.TFJobReplicaTypeWorker, corev1.ResourceMemory, "100M").
+				Image(kftraining.TFJobReplicaTypeChief, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"5s"}).
+				Image(kftraining.TFJobReplicaTypePS, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"5s"}).
+				Image(kftraining.TFJobReplicaTypeWorker, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"5s"}).
+				Obj()
+
+			ginkgo.By("Creating the TfJob", func() {
+				gomega.Expect(k8sManagerClient.Create(ctx, tfJob)).Should(gomega.Succeed())
+			})
+
+			createdLeaderWorkload := &kueue.Workload{}
+			wlLookupKey := types.NamespacedName{Name: workloadtfjob.GetWorkloadNameForTFJob(tfJob.Name, tfJob.UID), Namespace: managerNs.Name}
+
+			// the execution should be given to the worker
+			ginkgo.By("Waiting to be admitted in worker1 and manager", func() {
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdLeaderWorkload)).To(gomega.Succeed())
+					g.Expect(apimeta.FindStatusCondition(createdLeaderWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{
+						Type:    kueue.WorkloadAdmitted,
+						Status:  metav1.ConditionTrue,
+						Reason:  "Admitted",
+						Message: "The workload is admitted",
+					}, util.IgnoreConditionTimestampsAndObservedGeneration))
+					g.Expect(workload.FindAdmissionCheck(createdLeaderWorkload.Status.AdmissionChecks, multiKueueAc.Name)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{
+						Name:    multiKueueAc.Name,
+						State:   kueue.CheckStateReady,
+						Message: `The workload got reservation on "worker1"`,
+					}, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime")))
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("Waiting for the TfJob to get status updates", func() {
+				gomega.Eventually(func(g gomega.Gomega) {
+					createdTfJob := &kftraining.TFJob{}
+					g.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(tfJob), createdTfJob)).To(gomega.Succeed())
+					g.Expect(createdTfJob.Status.ReplicaStatuses).To(gomega.BeComparableTo(
+						map[kftraining.ReplicaType]*kftraining.ReplicaStatus{
+							kftraining.TFJobReplicaTypeChief: {
+								Active:    1,
+								Succeeded: 0,
+							},
+							kftraining.TFJobReplicaTypePS: {
+								Active:    1,
+								Succeeded: 0,
+							},
+							kftraining.TFJobReplicaTypeWorker: {
+								Active:    2,
+								Succeeded: 0,
+							},
+						},
+						util.IgnoreConditionTimestampsAndObservedGeneration))
+				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("Waiting for the TfJob to finish", func() {
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdLeaderWorkload)).To(gomega.Succeed())
+
+					g.Expect(apimeta.FindStatusCondition(createdLeaderWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{
+						Type:    kueue.WorkloadFinished,
+						Status:  metav1.ConditionTrue,
+						Reason:  kueue.WorkloadFinishedReasonSucceeded,
+						Message: fmt.Sprintf("TFJob %s/%s successfully completed.", createdLeaderWorkload.Namespace, tfJob.Name),
+					}, util.IgnoreConditionTimestampsAndObservedGeneration))
+				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("Checking no objects are left in the worker clusters and the TfJob is completed", func() {
+				gomega.Eventually(func(g gomega.Gomega) {
+					workerWl := &kueue.Workload{}
+					g.Expect(k8sWorker1Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError())
+					g.Expect(k8sWorker2Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError())
+					workerTfJob := &kftraining.TFJob{}
+					g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(tfJob), workerTfJob)).To(utiltesting.BeNotFoundError())
+					g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(tfJob), workerTfJob)).To(utiltesting.BeNotFoundError())
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+		})
 	})
 	ginkgo.When("The connection to a worker cluster is unreliable", func() {
 		ginkgo.It("Should update the cluster status to reflect the connection state", func() {

test/e2e/multikueue/suite_test.go

+7 −0

@@ -23,6 +23,7 @@ import (
 	"testing"
 	"time"
 
+	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
 	authenticationv1 "k8s.io/api/authentication/v1"
@@ -86,6 +87,8 @@ func kubeconfigForMultiKueueSA(ctx context.Context, c client.Client, restConfig
 			policyRule(jobset.SchemeGroupVersion.Group, "jobsets/status", "get"),
 			policyRule(kueue.SchemeGroupVersion.Group, "workloads", resourceVerbs...),
 			policyRule(kueue.SchemeGroupVersion.Group, "workloads/status", "get", "patch", "update"),
+			policyRule(kftraining.SchemeGroupVersion.Group, "tfjobs", resourceVerbs...),
+			policyRule(kftraining.SchemeGroupVersion.Group, "tfjobs/status", "get"),
 		},
 	}
 	err := c.Create(ctx, cr)
@@ -223,6 +226,10 @@ var _ = ginkgo.BeforeSuite(func() {
 	util.WaitForJobSetAvailability(ctx, k8sWorker1Client)
 	util.WaitForJobSetAvailability(ctx, k8sWorker2Client)
 
+	// the kubeflow training-operator is expected only in the worker clusters, not in the manager
+	util.WaitForKubeFlowAvailability(ctx, k8sWorker1Client)
+	util.WaitForKubeFlowAvailability(ctx, k8sWorker2Client)
+
 	ginkgo.GinkgoLogr.Info("Kueue and JobSet operators are available in all the clusters", "waitingTime", time.Since(waitForAvailableStart))
 
 	discoveryClient, err := discovery.NewDiscoveryClientForConfig(managerCfg)
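The policyRule helper used above is outside this diff; a minimal sketch of what it presumably looks like (an assumption, not the commit's code), just to make the added tfjobs and tfjobs/status rules concrete:

// Hypothetical stand-alone version, for illustration only; the real helper in suite_test.go may differ.
package suiteexample

import rbacv1 "k8s.io/api/rbac/v1"

// policyRule is assumed to build a single-group, single-resource RBAC rule.
func policyRule(group, resource string, verbs ...string) rbacv1.PolicyRule {
	return rbacv1.PolicyRule{
		APIGroups: []string{group},
		Resources: []string{resource},
		Verbs:     verbs,
	}
}

Read this way, the two new rules grant the MultiKueue service account the configured resourceVerbs on tfjobs and read-only access to tfjobs/status on the worker clusters, mirroring what is already granted for jobsets and workloads.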

test/util/e2e.go

+11 −2

@@ -6,6 +6,7 @@ import (
 	"os"
 
 	"github.com/google/go-cmp/cmp/cmpopts"
+	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 	"github.com/onsi/gomega"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
@@ -43,6 +44,9 @@ func CreateClientUsingCluster(kContext string) (client.WithWatch, *rest.Config)
 	err = jobset.AddToScheme(scheme.Scheme)
 	gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
 
+	err = kftraining.AddToScheme(scheme.Scheme)
+	gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
+
 	client, err := client.NewWithWatch(cfg, client.Options{Scheme: scheme.Scheme})
 	gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
 	return client, cfg
@@ -100,6 +104,11 @@ func WaitForKueueAvailability(ctx context.Context, k8sClient client.Client) {
 }
 
 func WaitForJobSetAvailability(ctx context.Context, k8sClient client.Client) {
-	kcmKey := types.NamespacedName{Namespace: "jobset-system", Name: "jobset-controller-manager"}
-	waitForOperatorAvailability(ctx, k8sClient, kcmKey)
+	jcmKey := types.NamespacedName{Namespace: "jobset-system", Name: "jobset-controller-manager"}
+	waitForOperatorAvailability(ctx, k8sClient, jcmKey)
+}
+
+func WaitForKubeFlowAvailability(ctx context.Context, k8sClient client.Client) {
+	kftoKey := types.NamespacedName{Namespace: "kubeflow", Name: "training-operator"}
+	waitForOperatorAvailability(ctx, k8sClient, kftoKey)
 }
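WaitForKubeFlowAvailability delegates to the unexported waitForOperatorAvailability, which is not shown in this diff. A minimal sketch, assuming it polls the referenced Deployment until the Available condition turns true, and reusing the file's existing imports (appsv1, corev1, cmpopts, gomega) plus the package's Timeout/Interval constants:

// Sketch only; the actual helper in test/util may use different timeouts or matchers.
func waitForOperatorAvailability(ctx context.Context, k8sClient client.Client, key types.NamespacedName) {
	deployment := &appsv1.Deployment{}
	gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
		g.Expect(k8sClient.Get(ctx, key, deployment)).To(gomega.Succeed())
		// The operator is considered ready once its Deployment reports Available=True.
		g.Expect(deployment.Status.Conditions).To(gomega.ContainElement(gomega.BeComparableTo(
			appsv1.DeploymentCondition{Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue},
			cmpopts.IgnoreFields(appsv1.DeploymentCondition{}, "Reason", "Message", "LastUpdateTime", "LastTransitionTime"),
		)))
	}, Timeout, Interval).Should(gomega.Succeed())
}

With a helper of that shape, the new WaitForKubeFlowAvailability simply points it at the training-operator Deployment in the kubeflow namespace, just as WaitForJobSetAvailability points it at the jobset controller manager.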
