From a2c1a5a1137a0020a28133c80c1d02bec00db9fc Mon Sep 17 00:00:00 2001 From: Ming Qiu Date: Tue, 2 Apr 2024 08:49:33 +0000 Subject: [PATCH] Fix cancel bug && adjust StartTimestamp for data mover Signed-off-by: Ming Qiu --- pkg/controller/data_download_controller.go | 8 +++- .../data_download_controller_test.go | 35 +++++++++++++-- pkg/controller/data_upload_controller.go | 8 +++- pkg/controller/data_upload_controller_test.go | 44 +++++++++++++++---- 4 files changed, 80 insertions(+), 15 deletions(-) diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index 68b8a17ed3..f60c17fc68 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -271,6 +271,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request // Update status to InProgress original := dd.DeepCopy() dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress + dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { log.WithError(err).Error("Unable to update status to in progress") return ctrl.Result{}, err @@ -290,7 +291,11 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request log.Info("Data download is being canceled") fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name) if fsRestore == nil { - r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName()) + if r.nodeName == dd.Status.Node { + r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName()) + } else { + log.Info("Data path is not started in this node and will not canceled by current node") + } return ctrl.Result{}, nil } @@ -668,7 +673,6 @@ func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *vel updateFunc := func(datadownload *velerov2alpha1api.DataDownload) { datadownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted - datadownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} labels := datadownload.GetLabels() if labels == nil { labels = make(map[string]string) diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index 1e7268fcab..a36730b64f 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -149,7 +149,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ... dataPathMgr := datapath.NewManager(1) - return NewDataDownloadReconciler(fakeClient, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil + return NewDataDownloadReconciler(fakeClient, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } func TestDataDownloadReconcile(t *testing.T) { @@ -207,11 +207,40 @@ func TestDataDownloadReconcile(t *testing.T) { mockCancel: true, }, { - name: "Cancel data downloand in progress", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result(), + name: "Cancel data downloand in progress with create FSBR", + dd: func() *velerov2alpha1api.DataDownload { + dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result() + dd.Status.Node = "test-node" + return dd + }(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), needCreateFSBR: true, mockCancel: true, + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceling).Result(), + }, + { + name: "Cancel data downloand in progress without create FSBR", + dd: func() *velerov2alpha1api.DataDownload { + dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result() + dd.Status.Node = "test-node" + return dd + }(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + needCreateFSBR: false, + mockCancel: true, + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(), + }, + { + name: "Cancel data downloand in progress in different node", + dd: func() *velerov2alpha1api.DataDownload { + dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result() + dd.Status.Node = "different-node" + return dd + }(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + needCreateFSBR: false, + mockCancel: true, + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(), }, { name: "Error in data path is concurrent limited", diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index d57c9d1573..26a8e74fcc 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -282,6 +282,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Update status to InProgress original := du.DeepCopy() du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress + du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil { return r.errorOut(ctx, du, err, "error updating dataupload status", log) } @@ -300,7 +301,11 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) fsBackup := r.dataPathMgr.GetAsyncBR(du.Name) if fsBackup == nil { - r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName()) + if du.Status.Node == r.nodeName { + r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName()) + } else { + log.Info("Data path is not started in this node and will not canceled by current node") + } return ctrl.Result{}, nil } @@ -720,7 +725,6 @@ func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov updateFunc := func(dataUpload *velerov2alpha1api.DataUpload) { dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseAccepted - dataUpload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} labels := dataUpload.GetLabels() if labels == nil { labels = make(map[string]string) diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index 11b76f5f73..7fc4514c83 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -233,7 +233,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci return nil, err } return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, nil, - testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil + testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } func dataUploadBuilder() *builder.DataUploadBuilder { @@ -336,6 +336,7 @@ func TestReconcile(t *testing.T) { expectedErrMsg string needErrs []bool peekErr error + notCreateFSBR bool }{ { name: "Dataupload is not initialized", @@ -412,6 +413,32 @@ func TestReconcile(t *testing.T) { expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Result(), expectedRequeue: ctrl.Result{}, }, + { + name: "Dataupload should be cancel with match node", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: func() *velerov2alpha1api.DataUpload { + du := dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result() + du.Status.Node = "test-node" + return du + }(), + expectedProcessed: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(), + expectedRequeue: ctrl.Result{}, + notCreateFSBR: true, + }, + { + name: "Dataupload should not be cancel with dismatch node", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: func() *velerov2alpha1api.DataUpload { + du := dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result() + du.Status.Node = "different_node" + return du + }(), + expectedProcessed: false, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expectedRequeue: ctrl.Result{}, + notCreateFSBR: true, + }, { name: "runCancelableDataUpload is concurrent limited", dataMgr: datapath.NewManager(0), @@ -511,16 +538,17 @@ func TestReconcile(t *testing.T) { } else if test.du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(r.kubeClient, r.csiSnapshotClient, velerotest.NewLogger())} } - - datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { - return &fakeDataUploadFSBR{ - du: test.du, - kubeClient: r.client, - clock: r.Clock, + if !test.notCreateFSBR { + datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { + return &fakeDataUploadFSBR{ + du: test.du, + kubeClient: r.client, + clock: r.Clock, + } } } - if test.du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { + if test.du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress && !test.notCreateFSBR { if fsBR := r.dataPathMgr.GetAsyncBR(test.du.Name); fsBR == nil { _, err := r.dataPathMgr.CreateFileSystemBR(test.du.Name, pVBRRequestor, ctx, r.client, velerov1api.DefaultNamespace, datapath.Callbacks{OnCancelled: r.OnDataUploadCancelled}, velerotest.NewLogger()) require.NoError(t, err)