Skip to content

Commit

Permalink
data mover ms smoking test
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <[email protected]>
  • Loading branch information
Lyndon-Li committed Aug 9, 2024
1 parent dd3d05b commit 4dea3a4
Show file tree
Hide file tree
Showing 14 changed files with 324 additions and 210 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/cli/datamover/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func (s *dataMoverBackup) runDataPath() {

result, err := dpService.RunCancelableDataPath(s.ctx)
if err != nil {
dpService.Shutdown()
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to run data path service for DataUpload %s: %v", s.config.duName, err)
return
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/cli/datamover/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func (s *dataMoverRestore) runDataPath() {
result, err := dpService.RunCancelableDataPath(s.ctx)
if err != nil {
s.cancelFunc()
dpService.Shutdown()
funcExitWithMessage(s.logger, false, "Failed to run data path service for DataDownload %s: %v", s.config.ddName, err)
return
}
Expand Down
72 changes: 43 additions & 29 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,9 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
if dd.Spec.Cancel {
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
r.TryCancelDataDownload(ctx, dd, "")
r.tryCancelAcceptedDataDownload(ctx, dd, "")
} else if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil {
r.TryCancelDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr))
r.tryCancelAcceptedDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr))
log.Errorf("Cancel dd %s/%s because of expose error %s", dd.Namespace, dd.Name, peekErr)
} else if dd.Status.StartTimestamp != nil {
if time.Since(dd.Status.StartTimestamp.Time) >= r.preparingTimeout {
Expand Down Expand Up @@ -272,23 +272,35 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return r.errorOut(ctx, dd, err, "error to create data path", log)
}
}

if err := r.initCancelableDataPath(ctx, asyncBR, result, log); err != nil {
log.WithError(err).Warnf("Failed to init cancelable data path for %s, will close and retry", dd.Name)

r.closeDataPath(ctx, dd.Name)
return r.errorOut(ctx, dd, err, "error initializing data path", log)
}

// Update status to InProgress
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Unable to update status to in progress")
return ctrl.Result{}, err
log.WithError(err).Warnf("Failed to update datadownload %s to InProgress, will close data path and retry", dd.Name)

r.closeDataPath(ctx, dd.Name)
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
}

log.Info("Data download is marked as in progress")

reconcileResult, err := r.runCancelableDataPath(ctx, asyncBR, dd, result, log)
if err != nil {
log.Errorf("Failed to run cancelable data path for %s with err %v", dd.Name, err)
if err := r.startCancelableDataPath(asyncBR, dd, result, log); err != nil {
log.WithError(err).Errorf("Failed to start cancelable data path for %s", dd.Name)

r.closeDataPath(ctx, dd.Name)
return r.errorOut(ctx, dd, err, "error starting data path", log)
}
return reconcileResult, err

return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
log.Info("Data download is in progress")
if dd.Spec.Cancel {
Expand Down Expand Up @@ -331,27 +343,33 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
}

func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
func (r *DataDownloadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error {
log.Info("Init cancelable dataDownload")

if err := asyncBR.Init(ctx, nil); err != nil {
return r.errorOut(ctx, dd, err, "error to initialize asyncBR", log)
return errors.Wrap(err, "error initializing asyncBR")
}

log.Infof("async restore init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)

return nil
}

func (r *DataDownloadReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) error {
log.Info("Start cancelable dataDownload")

if err := asyncBR.StartRestore(dd.Spec.SnapshotID, datapath.AccessPoint{
ByPath: res.ByPod.VolumeName,
}, dd.Spec.DataMoverConfig); err != nil {
return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting async restore for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log)
return errors.Wrapf(err, "error starting async restore for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
}

log.Infof("Async restore started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName)
return ctrl.Result{}, nil
log.Info("Async restore started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
return nil
}

func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) {
defer func() {
go r.closeDataPath(ctx, ddName)
}()
defer r.dataPathMgr.RemoveAsyncBR(ddName)

log := r.logger.WithField("datadownload", ddName)
log.Info("Async fs restore data path completed")
Expand Down Expand Up @@ -384,9 +402,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na
}

func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) {
defer func() {
go r.closeDataPath(ctx, ddName)
}()
defer r.dataPathMgr.RemoveAsyncBR(ddName)

log := r.logger.WithField("datadownload", ddName)

Expand All @@ -396,16 +412,12 @@ func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, names
if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil {
log.WithError(getErr).Warn("Failed to get data download on failure")
} else {
if _, errOut := r.errorOut(ctx, &dd, err, "data path restore failed", log); err != nil {
log.WithError(err).Warnf("Failed to patch data download with err %v", errOut)
}
r.errorOut(ctx, &dd, err, "data path restore failed", log)
}
}

func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) {
defer func() {
go r.closeDataPath(ctx, ddName)
}()
defer r.dataPathMgr.RemoveAsyncBR(ddName)

log := r.logger.WithField("datadownload", ddName)

Expand All @@ -432,17 +444,20 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
}
}

func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) {
func (r *DataDownloadReconciler) tryCancelAcceptedDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) {
log := r.logger.WithField("datadownload", dd.Name)
log.Warn("Async fs backup data path canceled")
log.Warn("Accepted data download is canceled")

succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
if dataDownload.Status.StartTimestamp.IsZero() {
dataDownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
dataDownload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
dataDownload.Status.Message = message

if message != "" {
dataDownload.Status.Message = message
}
})

if err != nil {
Expand All @@ -456,7 +471,6 @@ func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *
// success update
r.metrics.RegisterDataDownloadCancel(r.nodeName)
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
r.closeDataPath(ctx, dd.Name)
}

func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) {
Expand Down
49 changes: 44 additions & 5 deletions pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ func TestDataDownloadReconcile(t *testing.T) {
isFSBRRestoreErr bool
notNilExpose bool
notMockCleanUp bool
mockInit bool
mockInitErr error
mockStart bool
mockStartErr error
mockCancel bool
mockClose bool
expected *velerov2alpha1api.DataDownload
Expand Down Expand Up @@ -264,13 +268,36 @@ func TestDataDownloadReconcile(t *testing.T) {
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "Unable to update status to in progress for data download",
name: "data path init error",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
needErrs: []bool{false, false, false, true},
mockInit: true,
mockInitErr: errors.New("fake-data-path-init-error"),
mockClose: true,
notNilExpose: true,
notMockCleanUp: true,
expectedStatusMsg: "Patch error",
expectedStatusMsg: "error initializing asyncBR: fake-data-path-init-error",
},
{
name: "Unable to update status to in progress for data download",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
needErrs: []bool{false, false, false, true},
mockInit: true,
mockClose: true,
notNilExpose: true,
notMockCleanUp: true,
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "data path start error",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
mockInit: true,
mockStart: true,
mockStartErr: errors.New("fake-data-path-start-error"),
mockClose: true,
notNilExpose: true,
expectedStatusMsg: "error starting async restore for pod test-name, volume test-pvc: fake-data-path-start-error",
},
{
name: "accept DataDownload error",
Expand Down Expand Up @@ -399,6 +426,14 @@ func TestDataDownloadReconcile(t *testing.T) {
datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string,
string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
asyncBR := datapathmockes.NewAsyncBR(t)
if test.mockInit {
asyncBR.On("Init", mock.Anything, mock.Anything).Return(test.mockInitErr)
}

if test.mockStart {
asyncBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.mockStartErr)
}

if test.mockCancel {
asyncBR.On("Cancel").Return()
}
Expand Down Expand Up @@ -488,6 +523,10 @@ func TestDataDownloadReconcile(t *testing.T) {
assert.True(t, true, apierrors.IsNotFound(err))
}

if !test.needCreateFSBR {
assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.dd.Name))
}

t.Logf("%s: \n %v \n", test.name, dd)
})
}
Expand Down Expand Up @@ -845,7 +884,7 @@ func TestTryCancelDataDownload(t *testing.T) {
err = r.client.Create(ctx, test.dd)
require.NoError(t, err)

r.TryCancelDataDownload(ctx, test.dd, "")
r.tryCancelAcceptedDataDownload(ctx, test.dd, "")

if test.expectedErr == "" {
assert.NoError(t, err)
Expand Down
Loading

0 comments on commit 4dea3a4

Please sign in to comment.