Skip to content

Commit

Permalink
Merge pull request #6446 from qiuming-best/data-path-concurrent
Browse files Browse the repository at this point in the history
Fix data path concurrent
  • Loading branch information
Lyndon-Li authored Jul 3, 2023
2 parents dcdd5f9 + 22a99c3 commit 98803bb
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 55 deletions.
54 changes: 29 additions & 25 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,25 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
}

log.Info("Restore PVC is ready")
log.Info("Restore PVC is ready and creating data path routine")

// Need to first create file system BR and get data path instance then update data upload status
callbacks := datapath.Callbacks{
OnCompleted: r.OnDataDownloadCompleted,
OnFailed: r.OnDataDownloadFailed,
OnCancelled: r.OnDataDownloadCancelled,
OnProgress: r.OnDataDownloadProgress,
}

fsRestore, err = r.dataPathMgr.CreateFileSystemBR(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("Data path instance is concurrent limited requeue later")
return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil
} else {
return r.errorOut(ctx, dd, err, "error to create data path", log)
}
}

// Update status to InProgress
original := dd.DeepCopy()
Expand All @@ -174,7 +192,12 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request

log.Info("Data download is marked as in progress")

return r.runCancelableDataPath(ctx, dd, result, log)
reconcileResult, err := r.runCancelableDataPath(ctx, fsRestore, dd, result, log)
if err != nil {
log.Errorf("Failed to run cancelable data path for %s with err %v", dd.Name, err)
r.closeDataPath(ctx, dd.Name)
}
return reconcileResult, err
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
log.Info("Data download is in progress")
if dd.Spec.Cancel {
Expand Down Expand Up @@ -203,25 +226,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
}

func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
log.Info("Creating data path routine")
callbacks := datapath.Callbacks{
OnCompleted: r.OnDataDownloadCompleted,
OnFailed: r.OnDataDownloadFailed,
OnCancelled: r.OnDataDownloadCancelled,
OnProgress: r.OnDataDownloadProgress,
}

fsRestore, err := r.dataPathMgr.CreateFileSystemBR(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("runCancelableDataDownload is concurrent limited")
return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil
} else {
return r.errorOut(ctx, dd, err, "error to create data path", log)
}
}

func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, fsRestore datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.PVC, r.client, r.fileSystem, log)
if err != nil {
return r.errorOut(ctx, dd, err, "error exposing host path for pod volume", log)
Expand Down Expand Up @@ -437,12 +442,11 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v
dd.Status.Message = errors.WithMessage(err, msg).Error()
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}

if err = r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating DataDownload status")
return err
if patchErr := r.client.Patch(ctx, dd, client.MergeFrom(original)); patchErr != nil {
log.WithError(patchErr).Error("error updating DataDownload status")
}

return nil
return err
}

func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload) (bool, error) {
Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,12 @@ func TestDataDownloadReconcile(t *testing.T) {
},
})

require.Nil(t, err)
if test.isGetExposeErr {
assert.Contains(t, err.Error(), test.expectedStatusMsg)
} else {
require.Nil(t, err)
}

require.NotNil(t, actualResult)

dd := velerov2alpha1api.DataDownload{}
Expand Down
55 changes: 30 additions & 25 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,25 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

log.Info("Exposed snapshot is ready")
log.Info("Exposed snapshot is ready and creating data path routine")

// Need to first create file system BR and get data path instance then update data upload status
callbacks := datapath.Callbacks{
OnCompleted: r.OnDataUploadCompleted,
OnFailed: r.OnDataUploadFailed,
OnCancelled: r.OnDataUploadCancelled,
OnProgress: r.OnDataUploadProgress,
}

fsBackup, err = r.dataPathMgr.CreateFileSystemBR(du.Name, dataUploadDownloadRequestor, ctx, r.client, du.Namespace, callbacks, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("Data path instance is concurrent limited requeue later")
return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil
} else {
return r.errorOut(ctx, &du, err, "error to create data path", log)
}
}

// Update status to InProgress
original := du.DeepCopy()
Expand All @@ -171,7 +189,12 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

log.Info("Data upload is marked as in progress")
return r.runCancelableDataUpload(ctx, &du, res, log)
result, err := r.runCancelableDataUpload(ctx, fsBackup, &du, res, log)
if err != nil {
log.Errorf("Failed to run cancelable data path for %s with err %v", du.Name, err)
r.closeDataPath(ctx, du.Name)
}
return result, err
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
log.Info("Data upload is in progress")
if du.Spec.Cancel {
Expand All @@ -198,25 +221,8 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
}

func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
log.Info("Creating data path routine")
callbacks := datapath.Callbacks{
OnCompleted: r.OnDataUploadCompleted,
OnFailed: r.OnDataUploadFailed,
OnCancelled: r.OnDataUploadCancelled,
OnProgress: r.OnDataUploadProgress,
}

fsBackup, err := r.dataPathMgr.CreateFileSystemBR(du.Name, dataUploadDownloadRequestor, ctx, r.client, du.Namespace, callbacks, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("runCancelableDataUpload is concurrent limited")
return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil
} else {
return r.errorOut(ctx, du, err, "error to create data path", log)
}
}

func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, fsBackup datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
log.Info("Run cancelable dataUpload")
path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.PVC, r.client, r.fileSystem, log)
if err != nil {
return r.errorOut(ctx, du, err, "error exposing host path for pod volume", log)
Expand Down Expand Up @@ -460,12 +466,11 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel
}

du.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
if err = r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating DataUpload status")
return err
if patchErr := r.client.Patch(ctx, du, client.MergeFrom(original)); patchErr != nil {
log.WithError(patchErr).Error("error updating DataUpload status")
}

return nil
return err
}

func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload) (bool, error) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/data_upload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ func TestReconcile(t *testing.T) {
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
expectedRequeue: ctrl.Result{},
},
{
expectedErrMsg: "unknown type type of snapshot exposer is not exist",
}, {
name: "Dataupload should be accepted",
du: dataUploadBuilder().Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
Expand Down Expand Up @@ -336,7 +336,7 @@ func TestReconcile(t *testing.T) {
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Cancel(true).Result(),
expectedProcessed: false,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Minute},
},
}
Expand Down Expand Up @@ -400,7 +400,7 @@ func TestReconcile(t *testing.T) {
if test.expectedErrMsg == "" {
require.NoError(t, err)
} else {
assert.Equal(t, err.Error(), test.expectedErrMsg)
assert.Contains(t, err.Error(), test.expectedErrMsg)
}

du := velerov2alpha1api.DataUpload{}
Expand Down

0 comments on commit 98803bb

Please sign in to comment.