From 4dea3a48e8a4cc56debe634d62b758ae5b2c92ce Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Fri, 9 Aug 2024 14:34:48 +0800 Subject: [PATCH] data mover ms smoking test Signed-off-by: Lyndon-Li --- pkg/cmd/cli/datamover/backup.go | 1 + pkg/cmd/cli/datamover/restore.go | 1 + pkg/controller/data_download_controller.go | 72 ++++---- .../data_download_controller_test.go | 49 +++++- pkg/controller/data_upload_controller.go | 75 +++++---- pkg/controller/data_upload_controller_test.go | 156 ++++++++++-------- pkg/datamover/backup_micro_service.go | 14 +- pkg/datamover/backup_micro_service_test.go | 6 +- pkg/datamover/restore_micro_service.go | 10 +- pkg/datamover/restore_micro_service_test.go | 6 +- pkg/datapath/file_system.go | 46 +++++- pkg/datapath/file_system_test.go | 2 + pkg/datapath/micro_service_watcher.go | 94 ++++++----- pkg/datapath/micro_service_watcher_test.go | 2 +- 14 files changed, 324 insertions(+), 210 deletions(-) diff --git a/pkg/cmd/cli/datamover/backup.go b/pkg/cmd/cli/datamover/backup.go index 35f483d921..ca600faab5 100644 --- a/pkg/cmd/cli/datamover/backup.go +++ b/pkg/cmd/cli/datamover/backup.go @@ -233,6 +233,7 @@ func (s *dataMoverBackup) runDataPath() { result, err := dpService.RunCancelableDataPath(s.ctx) if err != nil { + dpService.Shutdown() s.cancelFunc() funcExitWithMessage(s.logger, false, "Failed to run data path service for DataUpload %s: %v", s.config.duName, err) return diff --git a/pkg/cmd/cli/datamover/restore.go b/pkg/cmd/cli/datamover/restore.go index fb46abd409..fc74a64f15 100644 --- a/pkg/cmd/cli/datamover/restore.go +++ b/pkg/cmd/cli/datamover/restore.go @@ -223,6 +223,7 @@ func (s *dataMoverRestore) runDataPath() { result, err := dpService.RunCancelableDataPath(s.ctx) if err != nil { s.cancelFunc() + dpService.Shutdown() funcExitWithMessage(s.logger, false, "Failed to run data path service for DataDownload %s: %v", s.config.ddName, err) return } diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index f0c0c1728d..12365e03cc 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -217,9 +217,9 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted { if dd.Spec.Cancel { log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase) - r.TryCancelDataDownload(ctx, dd, "") + r.tryCancelAcceptedDataDownload(ctx, dd, "") } else if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil { - r.TryCancelDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr)) + r.tryCancelAcceptedDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr)) log.Errorf("Cancel dd %s/%s because of expose error %s", dd.Namespace, dd.Name, peekErr) } else if dd.Status.StartTimestamp != nil { if time.Since(dd.Status.StartTimestamp.Time) >= r.preparingTimeout { @@ -272,23 +272,35 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request return r.errorOut(ctx, dd, err, "error to create data path", log) } } + + if err := r.initCancelableDataPath(ctx, asyncBR, result, log); err != nil { + log.WithError(err).Warnf("Failed to init cancelable data path for %s, will close and retry", dd.Name) + + r.closeDataPath(ctx, dd.Name) + return r.errorOut(ctx, dd, err, "error initializing data path", log) + } + // Update status to InProgress original := dd.DeepCopy() dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("Unable to update status to in progress") - return ctrl.Result{}, err + log.WithError(err).Warnf("Failed to update datadownload %s to InProgress, will close data path and retry", dd.Name) + + r.closeDataPath(ctx, dd.Name) + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } log.Info("Data download is marked as in progress") - reconcileResult, err := r.runCancelableDataPath(ctx, asyncBR, dd, result, log) - if err != nil { - log.Errorf("Failed to run cancelable data path for %s with err %v", dd.Name, err) + if err := r.startCancelableDataPath(asyncBR, dd, result, log); err != nil { + log.WithError(err).Errorf("Failed to start cancelable data path for %s", dd.Name) + r.closeDataPath(ctx, dd.Name) + return r.errorOut(ctx, dd, err, "error starting data path", log) } - return reconcileResult, err + + return ctrl.Result{}, nil } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { log.Info("Data download is in progress") if dd.Spec.Cancel { @@ -331,27 +343,33 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } } -func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { +func (r *DataDownloadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Init cancelable dataDownload") + if err := asyncBR.Init(ctx, nil); err != nil { - return r.errorOut(ctx, dd, err, "error to initialize asyncBR", log) + return errors.Wrap(err, "error initializing asyncBR") } log.Infof("async restore init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil +} + +func (r *DataDownloadReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Start cancelable dataDownload") + if err := asyncBR.StartRestore(dd.Spec.SnapshotID, datapath.AccessPoint{ ByPath: res.ByPod.VolumeName, }, dd.Spec.DataMoverConfig); err != nil { - return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting async restore for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log) + return errors.Wrapf(err, "error starting async restore for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) } - log.Infof("Async restore started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName) - return ctrl.Result{}, nil + log.Info("Async restore started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil } func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) { - defer func() { - go r.closeDataPath(ctx, ddName) - }() + defer r.dataPathMgr.RemoveAsyncBR(ddName) log := r.logger.WithField("datadownload", ddName) log.Info("Async fs restore data path completed") @@ -384,9 +402,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na } func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) { - defer func() { - go r.closeDataPath(ctx, ddName) - }() + defer r.dataPathMgr.RemoveAsyncBR(ddName) log := r.logger.WithField("datadownload", ddName) @@ -396,16 +412,12 @@ func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, names if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil { log.WithError(getErr).Warn("Failed to get data download on failure") } else { - if _, errOut := r.errorOut(ctx, &dd, err, "data path restore failed", log); err != nil { - log.WithError(err).Warnf("Failed to patch data download with err %v", errOut) - } + r.errorOut(ctx, &dd, err, "data path restore failed", log) } } func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) { - defer func() { - go r.closeDataPath(ctx, ddName) - }() + defer r.dataPathMgr.RemoveAsyncBR(ddName) log := r.logger.WithField("datadownload", ddName) @@ -432,9 +444,9 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na } } -func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) { +func (r *DataDownloadReconciler) tryCancelAcceptedDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) { log := r.logger.WithField("datadownload", dd.Name) - log.Warn("Async fs backup data path canceled") + log.Warn("Accepted data download is canceled") succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dataDownload *velerov2alpha1api.DataDownload) { dataDownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled @@ -442,7 +454,10 @@ func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd * dataDownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} } dataDownload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} - dataDownload.Status.Message = message + + if message != "" { + dataDownload.Status.Message = message + } }) if err != nil { @@ -456,7 +471,6 @@ func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd * // success update r.metrics.RegisterDataDownloadCancel(r.nodeName) r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) - r.closeDataPath(ctx, dd.Name) } func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) { diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index c6c2697f7a..385870426f 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -192,6 +192,10 @@ func TestDataDownloadReconcile(t *testing.T) { isFSBRRestoreErr bool notNilExpose bool notMockCleanUp bool + mockInit bool + mockInitErr error + mockStart bool + mockStartErr error mockCancel bool mockClose bool expected *velerov2alpha1api.DataDownload @@ -264,13 +268,36 @@ func TestDataDownloadReconcile(t *testing.T) { expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, }, { - name: "Unable to update status to in progress for data download", + name: "data path init error", dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - needErrs: []bool{false, false, false, true}, + mockInit: true, + mockInitErr: errors.New("fake-data-path-init-error"), + mockClose: true, notNilExpose: true, - notMockCleanUp: true, - expectedStatusMsg: "Patch error", + expectedStatusMsg: "error initializing asyncBR: fake-data-path-init-error", + }, + { + name: "Unable to update status to in progress for data download", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + needErrs: []bool{false, false, false, true}, + mockInit: true, + mockClose: true, + notNilExpose: true, + notMockCleanUp: true, + expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + }, + { + name: "data path start error", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + mockInit: true, + mockStart: true, + mockStartErr: errors.New("fake-data-path-start-error"), + mockClose: true, + notNilExpose: true, + expectedStatusMsg: "error starting async restore for pod test-name, volume test-pvc: fake-data-path-start-error", }, { name: "accept DataDownload error", @@ -399,6 +426,14 @@ func TestDataDownloadReconcile(t *testing.T) { datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { asyncBR := datapathmockes.NewAsyncBR(t) + if test.mockInit { + asyncBR.On("Init", mock.Anything, mock.Anything).Return(test.mockInitErr) + } + + if test.mockStart { + asyncBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.mockStartErr) + } + if test.mockCancel { asyncBR.On("Cancel").Return() } @@ -488,6 +523,10 @@ func TestDataDownloadReconcile(t *testing.T) { assert.True(t, true, apierrors.IsNotFound(err)) } + if !test.needCreateFSBR { + assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.dd.Name)) + } + t.Logf("%s: \n %v \n", test.name, dd) }) } @@ -845,7 +884,7 @@ func TestTryCancelDataDownload(t *testing.T) { err = r.client.Create(ctx, test.dd) require.NoError(t, err) - r.TryCancelDataDownload(ctx, test.dd, "") + r.tryCancelAcceptedDataDownload(ctx, test.dd, "") if test.expectedErr == "" { assert.NoError(t, err) diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 91413a8ccf..b10f1f6367 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -230,9 +230,9 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) // we don't want to update CR into cancel status forcely as it may conflict with CR update in Expose action // we could retry when the CR requeue in periodcally log.Debugf("Data upload is been canceled %s in Phase %s", du.GetName(), du.Status.Phase) - r.TryCancelDataUpload(ctx, du, "") + r.tryCancelAcceptedDataUpload(ctx, du, "") } else if peekErr := ep.PeekExposed(ctx, getOwnerObject(du)); peekErr != nil { - r.TryCancelDataUpload(ctx, du, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", du.Namespace, du.Name, peekErr)) + r.tryCancelAcceptedDataUpload(ctx, du, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", du.Namespace, du.Name, peekErr)) log.Errorf("Cancel du %s/%s because of expose error %s", du.Namespace, du.Name, peekErr) } else if du.Status.StartTimestamp != nil { if time.Since(du.Status.StartTimestamp.Time) >= r.preparingTimeout { @@ -283,21 +283,35 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) return r.errorOut(ctx, du, err, "error to create data path", log) } } + + if err := r.initCancelableDataPath(ctx, asyncBR, res, log); err != nil { + log.WithError(err).Warnf("Failed to init cancelable data path for %s, will close and retry", du.Name) + + r.closeDataPath(ctx, du.Name) + return r.errorOut(ctx, du, err, "error initializing data path", log) + } + // Update status to InProgress original := du.DeepCopy() du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil { - return r.errorOut(ctx, du, err, "error updating dataupload status", log) + log.WithError(err).Warnf("Failed to update dataupload %s to InProgress, will data path close and retry", du.Name) + + r.closeDataPath(ctx, du.Name) + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } log.Info("Data upload is marked as in progress") - result, err := r.runCancelableDataUpload(ctx, asyncBR, du, res, log) - if err != nil { - log.Errorf("Failed to run cancelable data path for %s with err %v", du.Name, err) + + if err := r.startCancelableDataPath(asyncBR, du, res, log); err != nil { + log.WithError(err).Errorf("Failed to start cancelable data path for %s", du.Name) r.closeDataPath(ctx, du.Name) + + return r.errorOut(ctx, du, err, "error starting data path", log) } - return result, err + + return ctrl.Result{}, nil } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { log.Info("Data upload is in progress") if du.Spec.Cancel { @@ -340,29 +354,33 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) } } -func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, asyncBR datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { - log.Info("Run cancelable dataUpload") +func (r *DataUploadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Init cancelable dataUpload") if err := asyncBR.Init(ctx, nil); err != nil { - return r.errorOut(ctx, du, err, "error to initialize asyncBR", log) + return errors.Wrap(err, "error initializing asyncBR") } log.Infof("async backup init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil +} + +func (r *DataUploadReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Start cancelable dataUpload") + if err := asyncBR.StartBackup(datapath.AccessPoint{ ByPath: res.ByPod.VolumeName, }, du.Spec.DataMoverConfig, nil); err != nil { - return r.errorOut(ctx, du, err, fmt.Sprintf("error starting async backup for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log) + return errors.Wrapf(err, "error starting async backup for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) } - log.Infof("Async backup started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName) - return ctrl.Result{}, nil + log.Info("Async backup started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil } func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namespace string, duName string, result datapath.Result) { - defer func() { - go r.closeDataPath(ctx, duName) - }() + defer r.dataPathMgr.RemoveAsyncBR(duName) log := r.logger.WithField("dataupload", duName) @@ -406,9 +424,7 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp } func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace, duName string, err error) { - defer func() { - go r.closeDataPath(ctx, duName) - }() + defer r.dataPathMgr.RemoveAsyncBR(duName) log := r.logger.WithField("dataupload", duName) @@ -418,16 +434,12 @@ func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace if getErr := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, &du); getErr != nil { log.WithError(getErr).Warn("Failed to get dataupload on failure") } else { - if _, errOut := r.errorOut(ctx, &du, err, "data path backup failed", log); err != nil { - log.WithError(err).Warnf("Failed to patch dataupload with err %v", errOut) - } + r.errorOut(ctx, &du, err, "data path backup failed", log) } } func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namespace string, duName string) { - defer func() { - go r.closeDataPath(ctx, duName) - }() + defer r.dataPathMgr.RemoveAsyncBR(duName) log := r.logger.WithField("dataupload", duName) @@ -453,17 +465,19 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp } } -// TryCancelDataUpload clear up resources only when update success -func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) { +func (r *DataUploadReconciler) tryCancelAcceptedDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) { log := r.logger.WithField("dataupload", du.Name) - log.Warn("Async fs backup data path canceled") + log.Warn("Accepted data upload is canceled") succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(dataUpload *velerov2alpha1api.DataUpload) { dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled if dataUpload.Status.StartTimestamp.IsZero() { dataUpload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} } dataUpload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} - dataUpload.Status.Message = message + + if message != "" { + dataUpload.Status.Message = message + } }) if err != nil { @@ -478,7 +492,6 @@ func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *vele r.metrics.RegisterDataUploadCancel(r.nodeName) // cleans up any objects generated during the snapshot expose r.cleanUp(ctx, du, log) - r.closeDataPath(ctx, du.Name) } func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1api.DataUpload, log *logrus.Entry) { @@ -692,7 +705,7 @@ func (r *DataUploadReconciler) errorOut(ctx context.Context, du *velerov2alpha1a } se.CleanUp(ctx, getOwnerObject(du), volumeSnapshotName, du.Spec.SourceNamespace) } else { - err = errors.Wrapf(err, "failed to clean up exposed snapshot with could not find %s snapshot exposer", du.Spec.SnapshotType) + log.Warnf("failed to clean up exposed snapshot could not find %s snapshot exposer", du.Spec.SnapshotType) } return ctrl.Result{}, r.updateStatusToFailed(ctx, du, err, msg, log) diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index ee73372b1b..ea7603b904 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -306,20 +306,16 @@ type fakeDataUploadFSBR struct { du *velerov2alpha1api.DataUpload kubeClient kbclient.Client clock clock.WithTickerAndDelayedExecution + initErr error + startErr error } func (f *fakeDataUploadFSBR) Init(ctx context.Context, param interface{}) error { - return nil + return f.initErr } func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, uploaderConfigs map[string]string, param interface{}) error { - du := f.du - original := f.du.DeepCopy() - du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted - du.Status.CompletionTimestamp = &metav1.Time{Time: f.clock.Now()} - f.kubeClient.Patch(context.Background(), du, kbclient.MergeFrom(original)) - - return nil + return f.startErr } func (f *fakeDataUploadFSBR) StartRestore(snapshotID string, target datapath.AccessPoint, uploaderConfigs map[string]string) error { @@ -348,27 +344,24 @@ func TestReconcile(t *testing.T) { needErrs []bool peekErr error notCreateFSBR bool + fsBRInitErr error + fsBRStartErr error }{ { - name: "Dataupload is not initialized", - du: builder.ForDataUpload("unknown-ns", "unknown-name").Result(), - expectedProcessed: false, - expected: nil, - expectedRequeue: ctrl.Result{}, + name: "Dataupload is not initialized", + du: builder.ForDataUpload("unknown-ns", "unknown-name").Result(), + expectedRequeue: ctrl.Result{}, }, { - name: "Error get Dataupload", - du: builder.ForDataUpload(velerov1api.DefaultNamespace, "unknown-name").Result(), - expectedProcessed: false, - expected: nil, - expectedRequeue: ctrl.Result{}, - expectedErrMsg: "getting DataUpload: Get error", - needErrs: []bool{true, false, false, false}, + name: "Error get Dataupload", + du: builder.ForDataUpload(velerov1api.DefaultNamespace, "unknown-name").Result(), + expectedRequeue: ctrl.Result{}, + expectedErrMsg: "getting DataUpload: Get error", + needErrs: []bool{true, false, false, false}, }, { - name: "Unsupported data mover type", - du: dataUploadBuilder().DataMover("unknown type").Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase("").Result(), - expectedRequeue: ctrl.Result{}, + name: "Unsupported data mover type", + du: dataUploadBuilder().DataMover("unknown type").Result(), + expected: dataUploadBuilder().Phase("").Result(), + expectedRequeue: ctrl.Result{}, }, { name: "Unknown type of snapshot exposer is not initialized", du: dataUploadBuilder().SnapshotType("unknown type").Result(), @@ -377,13 +370,12 @@ func TestReconcile(t *testing.T) { expectedRequeue: ctrl.Result{}, expectedErrMsg: "unknown type type of snapshot exposer is not exist", }, { - name: "Dataupload should be accepted", - du: dataUploadBuilder().Result(), - pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(), - pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), - expectedRequeue: ctrl.Result{}, + name: "Dataupload should be accepted", + du: dataUploadBuilder().Result(), + pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(), + pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), + expectedRequeue: ctrl.Result{}, }, { name: "Dataupload should fail to get PVC information", @@ -395,34 +387,31 @@ func TestReconcile(t *testing.T) { expectedErrMsg: "failed to get PVC", }, { - name: "Dataupload should be prepared", - du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), - expectedRequeue: ctrl.Result{}, - }, { - name: "Dataupload prepared should be completed", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), - expectedProcessed: true, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCompleted).Result(), - expectedRequeue: ctrl.Result{}, + name: "Dataupload should be prepared", + du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + expectedRequeue: ctrl.Result{}, }, { - name: "Dataupload with not enabled cancel", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(false).Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), - expectedRequeue: ctrl.Result{}, + name: "Dataupload prepared should be completed", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expectedRequeue: ctrl.Result{}, }, { - name: "Dataupload should be cancel", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Result(), - expectedRequeue: ctrl.Result{}, + name: "Dataupload with not enabled cancel", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(false).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expectedRequeue: ctrl.Result{}, + }, + { + name: "Dataupload should be cancel", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Result(), + expectedRequeue: ctrl.Result{}, }, { name: "Dataupload should be cancel with match node", @@ -445,19 +434,43 @@ func TestReconcile(t *testing.T) { du.Status.Node = "different_node" return du }(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), - expectedRequeue: ctrl.Result{}, - notCreateFSBR: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expectedRequeue: ctrl.Result{}, + notCreateFSBR: true, + }, + { + name: "runCancelableDataUpload is concurrent limited", + dataMgr: datapath.NewManager(0), + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, }, { - name: "runCancelableDataUpload is concurrent limited", - dataMgr: datapath.NewManager(0), + name: "data path init error", pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), - expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + fsBRInitErr: errors.New("fake-data-path-init-error"), + expectedProcessed: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Result(), + expectedErrMsg: "error initializing asyncBR: fake-data-path-init-error", + }, + { + name: "Unable to update status to in progress for data download", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + needErrs: []bool{false, false, false, true}, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + }, + { + name: "data path start error", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + fsBRStartErr: errors.New("fake-data-path-start-error"), + expectedProcessed: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Result(), + expectedErrMsg: "error starting async backup for pod dataupload-1, volume dataupload-1: fake-data-path-start-error", }, { name: "prepare timeout", @@ -480,7 +493,6 @@ func TestReconcile(t *testing.T) { du.DeletionTimestamp = &metav1.Time{Time: time.Now()} return du }(), - expectedProcessed: false, checkFunc: func(du velerov2alpha1api.DataUpload) bool { return du.Spec.Cancel }, @@ -496,7 +508,6 @@ func TestReconcile(t *testing.T) { du.DeletionTimestamp = &metav1.Time{Time: time.Now()} return du }(), - expectedProcessed: false, checkFunc: func(du velerov2alpha1api.DataUpload) bool { return !controllerutil.ContainsFinalizer(&du, DataUploadDownloadFinalizer) }, @@ -555,12 +566,16 @@ func TestReconcile(t *testing.T) { du: test.du, kubeClient: r.client, clock: r.Clock, + initErr: test.fsBRInitErr, + startErr: test.fsBRStartErr, } } } + testCreateFsBR := false if test.du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress && !test.notCreateFSBR { if fsBR := r.dataPathMgr.GetAsyncBR(test.du.Name); fsBR == nil { + testCreateFsBR = true _, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, nil, nil, datapath.TaskTypeBackup, test.du.Name, velerov1api.DefaultNamespace, "", "", "", datapath.Callbacks{OnCancelled: r.OnDataUploadCancelled}, false, velerotest.NewLogger()) require.NoError(t, err) } @@ -605,6 +620,11 @@ func TestReconcile(t *testing.T) { if test.checkFunc != nil { assert.True(t, test.checkFunc(du)) } + + if !testCreateFsBR && du.Status.Phase != velerov2alpha1api.DataUploadPhaseInProgress { + assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.du.Name)) + } + }) } } @@ -926,7 +946,7 @@ func TestTryCancelDataUpload(t *testing.T) { err = r.client.Create(ctx, test.dd) require.NoError(t, err) - r.TryCancelDataUpload(ctx, test.dd, "") + r.tryCancelAcceptedDataUpload(ctx, test.dd, "") if test.expectedErr == "" { assert.NoError(t, err) diff --git a/pkg/datamover/backup_micro_service.go b/pkg/datamover/backup_micro_service.go index cedacc0ce4..fa48b3523e 100644 --- a/pkg/datamover/backup_micro_service.go +++ b/pkg/datamover/backup_micro_service.go @@ -127,15 +127,13 @@ func (r *BackupMicroService) Init() error { return err } -var waitControllerTimeout time.Duration = time.Minute * 2 - func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string, error) { log := r.logger.WithFields(logrus.Fields{ "dataupload": r.dataUploadName, }) du := &velerov2alpha1api.DataUpload{} - err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitControllerTimeout, true, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) { err := r.client.Get(ctx, types.NamespacedName{ Namespace: r.namespace, Name: r.dataUploadName, @@ -241,8 +239,6 @@ func (r *BackupMicroService) Shutdown() { var funcMarshal = json.Marshal func (r *BackupMicroService) OnDataUploadCompleted(ctx context.Context, namespace string, duName string, result datapath.Result) { - defer r.closeDataPath(ctx, duName) - log := r.logger.WithField("dataupload", duName) backupBytes, err := funcMarshal(result.Backup) @@ -262,8 +258,6 @@ func (r *BackupMicroService) OnDataUploadCompleted(ctx context.Context, namespac } func (r *BackupMicroService) OnDataUploadFailed(ctx context.Context, namespace string, duName string, err error) { - defer r.closeDataPath(ctx, duName) - log := r.logger.WithField("dataupload", duName) log.WithError(err).Error("Async fs backup data path failed") @@ -274,8 +268,6 @@ func (r *BackupMicroService) OnDataUploadFailed(ctx context.Context, namespace s } func (r *BackupMicroService) OnDataUploadCancelled(ctx context.Context, namespace string, duName string) { - defer r.closeDataPath(ctx, duName) - log := r.logger.WithField("dataupload", duName) log.Warn("Async fs backup data path canceled") @@ -296,8 +288,6 @@ func (r *BackupMicroService) OnDataUploadProgress(ctx context.Context, namespace return } - log.Infof("Sending event for progress %v (%s)", progress, string(progressBytes)) - r.eventRecorder.Event(r.dataUpload, false, datapath.EventReasonProgress, string(progressBytes)) } @@ -313,7 +303,7 @@ func (r *BackupMicroService) closeDataPath(ctx context.Context, duName string) { func (r *BackupMicroService) cancelDataUpload(du *velerov2alpha1api.DataUpload) { r.logger.WithField("DataUpload", du.Name).Info("Data upload is being canceled") - r.eventRecorder.Event(du, false, "Canceling", "Canceling for data upload %s", du.Name) + r.eventRecorder.Event(du, false, datapath.EventReasonCancelling, "Canceling for data upload %s", du.Name) fsBackup := r.dataPathMgr.GetAsyncBR(du.Name) if fsBackup == nil { diff --git a/pkg/datamover/backup_micro_service_test.go b/pkg/datamover/backup_micro_service_test.go index 9db38d4c0d..cca428ee88 100644 --- a/pkg/datamover/backup_micro_service_test.go +++ b/pkg/datamover/backup_micro_service_test.go @@ -301,12 +301,12 @@ func TestRunCancelableDataPath(t *testing.T) { }{ { name: "no du", - ctx: context.Background(), + ctx: ctxTimeout, expectedErr: "error waiting for du: context deadline exceeded", }, { name: "du not in in-progress", - ctx: context.Background(), + ctx: ctxTimeout, kubeClientObj: []runtime.Object{du}, expectedErr: "error waiting for du: context deadline exceeded", }, @@ -412,8 +412,6 @@ func TestRunCancelableDataPath(t *testing.T) { return fsBR } - waitControllerTimeout = time.Second - if test.result != nil { go func() { time.Sleep(time.Millisecond * 500) diff --git a/pkg/datamover/restore_micro_service.go b/pkg/datamover/restore_micro_service.go index 508469701a..d0a4c6f50c 100644 --- a/pkg/datamover/restore_micro_service.go +++ b/pkg/datamover/restore_micro_service.go @@ -122,7 +122,7 @@ func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string }) dd := &velerov2alpha1api.DataDownload{} - err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitControllerTimeout, true, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) { err := r.client.Get(ctx, types.NamespacedName{ Namespace: r.namespace, Name: r.dataDownloadName, @@ -214,8 +214,6 @@ func (r *RestoreMicroService) Shutdown() { } func (r *RestoreMicroService) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) { - defer r.closeDataPath(ctx, ddName) - log := r.logger.WithField("datadownload", ddName) restoreBytes, err := funcMarshal(result.Restore) @@ -235,8 +233,6 @@ func (r *RestoreMicroService) OnDataDownloadCompleted(ctx context.Context, names } func (r *RestoreMicroService) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) { - defer r.closeDataPath(ctx, ddName) - log := r.logger.WithField("datadownload", ddName) log.WithError(err).Error("Async fs restore data path failed") @@ -247,8 +243,6 @@ func (r *RestoreMicroService) OnDataDownloadFailed(ctx context.Context, namespac } func (r *RestoreMicroService) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) { - defer r.closeDataPath(ctx, ddName) - log := r.logger.WithField("datadownload", ddName) log.Warn("Async fs restore data path canceled") @@ -284,7 +278,7 @@ func (r *RestoreMicroService) closeDataPath(ctx context.Context, ddName string) func (r *RestoreMicroService) cancelDataDownload(dd *velerov2alpha1api.DataDownload) { r.logger.WithField("DataDownload", dd.Name).Info("Data download is being canceled") - r.eventRecorder.Event(dd, false, "Canceling", "Canceling for data download %s", dd.Name) + r.eventRecorder.Event(dd, false, datapath.EventReasonCancelling, "Canceling for data download %s", dd.Name) fsBackup := r.dataPathMgr.GetAsyncBR(dd.Name) if fsBackup == nil { diff --git a/pkg/datamover/restore_micro_service_test.go b/pkg/datamover/restore_micro_service_test.go index e3ef8701da..8a3ed61e1f 100644 --- a/pkg/datamover/restore_micro_service_test.go +++ b/pkg/datamover/restore_micro_service_test.go @@ -254,12 +254,12 @@ func TestRunCancelableRestore(t *testing.T) { }{ { name: "no dd", - ctx: context.Background(), + ctx: ctxTimeout, expectedErr: "error waiting for dd: context deadline exceeded", }, { name: "dd not in in-progress", - ctx: context.Background(), + ctx: ctxTimeout, kubeClientObj: []runtime.Object{dd}, expectedErr: "error waiting for dd: context deadline exceeded", }, @@ -365,8 +365,6 @@ func TestRunCancelableRestore(t *testing.T) { return fsBR } - waitControllerTimeout = time.Second - if test.result != nil { go func() { time.Sleep(time.Millisecond * 500) diff --git a/pkg/datapath/file_system.go b/pkg/datapath/file_system.go index 762c91b188..5d3b54f281 100644 --- a/pkg/datapath/file_system.go +++ b/pkg/datapath/file_system.go @@ -18,6 +18,7 @@ package datapath import ( "context" + "sync" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -66,6 +67,8 @@ type fileSystemBR struct { callbacks Callbacks jobName string requestorType string + wgDataPath sync.WaitGroup + dataPathLock sync.Mutex } func newFileSystemBR(jobName string, requestorType string, client client.Client, namespace string, callbacks Callbacks, log logrus.FieldLogger) AsyncBR { @@ -75,6 +78,7 @@ func newFileSystemBR(jobName string, requestorType string, client client.Client, client: client, namespace: namespace, callbacks: callbacks, + wgDataPath: sync.WaitGroup{}, log: log, } @@ -134,6 +138,23 @@ func (fs *fileSystemBR) Init(ctx context.Context, param interface{}) error { } func (fs *fileSystemBR) Close(ctx context.Context) { + if fs.cancel != nil { + fs.cancel() + } + + fs.log.WithField("user", fs.jobName).Info("Closing FileSystemBR") + + fs.wgDataPath.Wait() + + fs.close(ctx) + + fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed") +} + +func (fs *fileSystemBR) close(ctx context.Context) { + fs.dataPathLock.Lock() + defer fs.dataPathLock.Unlock() + if fs.uploaderProv != nil { if err := fs.uploaderProv.Close(ctx); err != nil { fs.log.Errorf("failed to close uploader provider with error %v", err) @@ -141,13 +162,6 @@ func (fs *fileSystemBR) Close(ctx context.Context) { fs.uploaderProv = nil } - - if fs.cancel != nil { - fs.cancel() - fs.cancel = nil - } - - fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed") } func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error { @@ -155,9 +169,18 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin return errors.New("file system data path is not initialized") } + fs.wgDataPath.Add(1) + backupParam := param.(*FSBRStartParam) go func() { + fs.log.Info("Start data path backup") + + defer func() { + fs.close(context.Background()) + fs.wgDataPath.Done() + }() + snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull, backupParam.ParentSnapshot, source.VolMode, uploaderConfig, fs) @@ -182,7 +205,16 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo return errors.New("file system data path is not initialized") } + fs.wgDataPath.Add(1) + go func() { + fs.log.Info("Start data path restore") + + defer func() { + fs.close(context.Background()) + fs.wgDataPath.Done() + }() + err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs) if err == provider.ErrorCanceled { diff --git a/pkg/datapath/file_system_test.go b/pkg/datapath/file_system_test.go index 85c6df08d9..fab33df1c0 100644 --- a/pkg/datapath/file_system_test.go +++ b/pkg/datapath/file_system_test.go @@ -96,6 +96,7 @@ func TestAsyncBackup(t *testing.T) { fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR) mockProvider := providerMock.NewProvider(t) mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err) + mockProvider.On("Close", mock.Anything).Return(nil) fs.uploaderProv = mockProvider fs.initialized = true fs.callbacks = test.callbacks @@ -179,6 +180,7 @@ func TestAsyncRestore(t *testing.T) { fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR) mockProvider := providerMock.NewProvider(t) mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.err) + mockProvider.On("Close", mock.Anything).Return(nil) fs.uploaderProv = mockProvider fs.initialized = true fs.callbacks = test.callbacks diff --git a/pkg/datapath/micro_service_watcher.go b/pkg/datapath/micro_service_watcher.go index a5826459a4..8a129bde8f 100644 --- a/pkg/datapath/micro_service_watcher.go +++ b/pkg/datapath/micro_service_watcher.go @@ -46,11 +46,12 @@ const ( ErrCancelled = "data path is canceled" - EventReasonStarted = "Data-Path-Started" - EventReasonCompleted = "Data-Path-Completed" - EventReasonFailed = "Data-Path-Failed" - EventReasonCancelled = "Data-Path-Canceled" - EventReasonProgress = "Data-Path-Progress" + EventReasonStarted = "Data-Path-Started" + EventReasonCompleted = "Data-Path-Completed" + EventReasonFailed = "Data-Path-Failed" + EventReasonCancelled = "Data-Path-Canceled" + EventReasonProgress = "Data-Path-Progress" + EventReasonCancelling = "Data-Path-Canceling" ) type microServiceBRWatcher struct { @@ -76,6 +77,7 @@ type microServiceBRWatcher struct { podInformer ctrlcache.Informer eventHandler cache.ResourceEventHandlerRegistration podHandler cache.ResourceEventHandlerRegistration + watcherLock sync.Mutex } func newMicroServiceBRWatcher(client client.Client, kubeClient kubernetes.Interface, mgr manager.Manager, taskType string, taskName string, namespace string, @@ -121,8 +123,6 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er return } - ms.log.Infof("Pushed adding event %s/%s, message %s for object %v", evt.Namespace, evt.Name, evt.Message, evt.InvolvedObject) - ms.eventCh <- evt }, UpdateFunc: func(_, obj interface{}) { @@ -131,8 +131,6 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er return } - ms.log.Infof("Pushed updating event %s/%s, message %s for object %v", evt.Namespace, evt.Name, evt.Message, evt.InvolvedObject) - ms.eventCh <- evt }, }, @@ -177,12 +175,9 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er } }() - ms.log.WithFields( - logrus.Fields{ - "taskType": ms.taskType, - "taskName": ms.taskName, - "thisPod": ms.thisPod, - }).Info("MicroServiceBR is initialized") + if err := ms.reEnsureThisPod(ctx); err != nil { + return err + } ms.eventInformer = eventInformer ms.podInformer = podInformer @@ -191,42 +186,56 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er ms.ctx, ms.cancel = context.WithCancel(ctx) + ms.log.WithFields( + logrus.Fields{ + "taskType": ms.taskType, + "taskName": ms.taskName, + "thisPod": ms.thisPod, + }).Info("MicroServiceBR is initialized") + succeeded = true return nil + } func (ms *microServiceBRWatcher) Close(ctx context.Context) { if ms.cancel != nil { ms.cancel() - ms.cancel = nil } ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("Closing MicroServiceBR") ms.wgWatcher.Wait() - if ms.eventInformer != nil && ms.eventHandler != nil { + ms.close() + + ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is closed") +} + +func (ms *microServiceBRWatcher) close() { + ms.watcherLock.Lock() + defer ms.watcherLock.Unlock() + + if ms.eventHandler != nil { if err := ms.eventInformer.RemoveEventHandler(ms.eventHandler); err != nil { ms.log.WithError(err).Warn("Failed to remove event handler") } + + ms.eventHandler = nil } - if ms.podInformer != nil && ms.podHandler != nil { + if ms.podHandler != nil { if err := ms.podInformer.RemoveEventHandler(ms.podHandler); err != nil { ms.log.WithError(err).Warn("Failed to remove pod handler") } - } - ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is closed") + ms.podHandler = nil + } } func (ms *microServiceBRWatcher) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error { - ms.log.Infof("Start watching backup ms for source %v", source) - - if err := ms.reEnsureThisPod(); err != nil { - return err - } + ms.log.Infof("Start watching backup ms for source %v", source.ByPath) ms.startWatch() @@ -234,20 +243,16 @@ func (ms *microServiceBRWatcher) StartBackup(source AccessPoint, uploaderConfig } func (ms *microServiceBRWatcher) StartRestore(snapshotID string, target AccessPoint, uploaderConfigs map[string]string) error { - ms.log.Infof("Start watching restore ms to target %v, from snapshot %s", target, snapshotID) - - if err := ms.reEnsureThisPod(); err != nil { - return err - } + ms.log.Infof("Start watching restore ms to target %s, from snapshot %s", target.ByPath, snapshotID) ms.startWatch() return nil } -func (ms *microServiceBRWatcher) reEnsureThisPod() error { +func (ms *microServiceBRWatcher) reEnsureThisPod(ctx context.Context) error { thisPod := &v1.Pod{} - if err := ms.client.Get(ms.ctx, types.NamespacedName{ + if err := ms.client.Get(ctx, types.NamespacedName{ Namespace: ms.namespace, Name: ms.thisPod, }, thisPod); err != nil { @@ -275,6 +280,11 @@ func (ms *microServiceBRWatcher) startWatch() { go func() { ms.log.Info("Start watching data path pod") + defer func() { + ms.close() + ms.wgWatcher.Done() + }() + var lastPod *v1.Pod watchLoop: @@ -291,14 +301,16 @@ func (ms *microServiceBRWatcher) startWatch() { } if lastPod == nil { - ms.log.Warn("Data path pod watch loop is canceled") - ms.wgWatcher.Done() + ms.log.Warn("Watch loop is cancelled on waiting data path pod") return } epilogLoop: for !ms.startedFromEvent || !ms.terminatedFromEvent { select { + case <-ms.ctx.Done(): + ms.log.Warn("Watch loop is cancelled on waiting final event") + return case <-time.After(eventWaitTimeout): break epilogLoop case evt := <-ms.eventCh: @@ -339,8 +351,6 @@ func (ms *microServiceBRWatcher) startWatch() { } logger.Info("Complete callback on data path pod termination") - - ms.wgWatcher.Done() }() } @@ -348,20 +358,22 @@ func (ms *microServiceBRWatcher) onEvent(evt *v1.Event) { switch evt.Reason { case EventReasonStarted: ms.startedFromEvent = true - ms.log.Infof("Received data path start message %s", evt.Message) + ms.log.Infof("Received data path start message: %s", evt.Message) case EventReasonProgress: ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, funcGetProgressFromMessage(evt.Message, ms.log)) case EventReasonCompleted: - ms.log.Infof("Received data path completed message %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log)) + ms.log.Infof("Received data path completed message: %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log)) ms.terminatedFromEvent = true case EventReasonCancelled: - ms.log.Infof("Received data path canceled message %s", evt.Message) + ms.log.Infof("Received data path canceled message: %s", evt.Message) ms.terminatedFromEvent = true case EventReasonFailed: - ms.log.Infof("Received data path failed message %s", evt.Message) + ms.log.Infof("Received data path failed message: %s", evt.Message) ms.terminatedFromEvent = true + case EventReasonCancelling: + ms.log.Infof("Received data path canceling message: %s", evt.Message) default: - ms.log.Debugf("Received event for data mover %s.[reason %s, message %s]", ms.taskName, evt.Reason, evt.Message) + ms.log.Infof("Received event for data path %s,reason: %s, message: %s", ms.taskName, evt.Reason, evt.Message) } } diff --git a/pkg/datapath/micro_service_watcher_test.go b/pkg/datapath/micro_service_watcher_test.go index f10f6b3310..f926363e07 100644 --- a/pkg/datapath/micro_service_watcher_test.go +++ b/pkg/datapath/micro_service_watcher_test.go @@ -102,7 +102,7 @@ func TestReEnsureThisPod(t *testing.T) { log: velerotest.NewLogger(), } - err := ms.reEnsureThisPod() + err := ms.reEnsureThisPod(context.Background()) if test.expectErr != "" { assert.EqualError(t, err, test.expectErr) } else {