diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index 1f0e78c69b..95531cb6b5 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -161,7 +161,25 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request return ctrl.Result{}, nil } - log.Info("Restore PVC is ready") + log.Info("Restore PVC is ready and creating data path routine") + + // Need to first create file system BR and get data path instance then update data download status + callbacks := datapath.Callbacks{ + OnCompleted: r.OnDataDownloadCompleted, + OnFailed: r.OnDataDownloadFailed, + OnCancelled: r.OnDataDownloadCancelled, + OnProgress: r.OnDataDownloadProgress, + } + + fsRestore, err = r.dataPathMgr.CreateFileSystemBR(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log) + if err != nil { + if err == datapath.ConcurrentLimitExceed { + log.Info("Data path instance is concurrent limited requeue later") + return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil + } else { + return r.errorOut(ctx, dd, err, "error to create data path", log) + } + } // Update status to InProgress original := dd.DeepCopy() @@ -174,7 +192,12 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request log.Info("Data download is marked as in progress") - return r.runCancelableDataPath(ctx, dd, result, log) + reconcileResult, err := r.runCancelableDataPath(ctx, fsRestore, dd, result, log) + if err != nil { + log.Errorf("Failed to run cancelable data path for %s with err %v", dd.Name, err) + r.closeDataPath(ctx, dd.Name) + } + return reconcileResult, err } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { log.Info("Data download is in progress") if dd.Spec.Cancel { @@ -203,25 +226,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } } -func (r *DataDownloadReconciler) 
runCancelableDataPath(ctx context.Context, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { - log.Info("Creating data path routine") - callbacks := datapath.Callbacks{ - OnCompleted: r.OnDataDownloadCompleted, - OnFailed: r.OnDataDownloadFailed, - OnCancelled: r.OnDataDownloadCancelled, - OnProgress: r.OnDataDownloadProgress, - } - - fsRestore, err := r.dataPathMgr.CreateFileSystemBR(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log) - if err != nil { - if err == datapath.ConcurrentLimitExceed { - log.Info("runCancelableDataDownload is concurrent limited") - return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil - } else { - return r.errorOut(ctx, dd, err, "error to create data path", log) - } - } - +func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, fsRestore datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.PVC, r.client, r.fileSystem, log) if err != nil { return r.errorOut(ctx, dd, err, "error exposing host path for pod volume", log) @@ -437,12 +442,11 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v dd.Status.Message = errors.WithMessage(err, msg).Error() dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} - if err = r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("error updating DataDownload status") - return err + if patchErr := r.client.Patch(ctx, dd, client.MergeFrom(original)); patchErr != nil { + log.WithError(patchErr).Error("error updating DataDownload status") } - return nil + return err } func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload) (bool, error) { diff --git 
a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index 5db1170c77..7ccf20e4e6 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -192,7 +192,12 @@ func TestDataDownloadReconcile(t *testing.T) { }, }) - require.Nil(t, err) + if test.isGetExposeErr { + assert.Contains(t, err.Error(), test.expectedStatusMsg) + } else { + require.Nil(t, err) + } + require.NotNil(t, actualResult) dd := velerov2alpha1api.DataDownload{} diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 759b297a19..735026cda2 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -160,7 +160,25 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } - log.Info("Exposed snapshot is ready") + log.Info("Exposed snapshot is ready and creating data path routine") + + // Need to first create file system BR and get data path instance then update data upload status + callbacks := datapath.Callbacks{ + OnCompleted: r.OnDataUploadCompleted, + OnFailed: r.OnDataUploadFailed, + OnCancelled: r.OnDataUploadCancelled, + OnProgress: r.OnDataUploadProgress, + } + + fsBackup, err = r.dataPathMgr.CreateFileSystemBR(du.Name, dataUploadDownloadRequestor, ctx, r.client, du.Namespace, callbacks, log) + if err != nil { + if err == datapath.ConcurrentLimitExceed { + log.Info("Data path instance is concurrent limited requeue later") + return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil + } else { + return r.errorOut(ctx, &du, err, "error to create data path", log) + } + } // Update status to InProgress original := du.DeepCopy() @@ -171,7 +189,12 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) } log.Info("Data upload is marked as in progress") - return r.runCancelableDataUpload(ctx, &du, res, log) + result, err := 
r.runCancelableDataUpload(ctx, fsBackup, &du, res, log) + if err != nil { + log.Errorf("Failed to run cancelable data path for %s with err %v", du.Name, err) + r.closeDataPath(ctx, du.Name) + } + return result, err } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { log.Info("Data upload is in progress") if du.Spec.Cancel { @@ -198,25 +221,8 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) } } -func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { - log.Info("Creating data path routine") - callbacks := datapath.Callbacks{ - OnCompleted: r.OnDataUploadCompleted, - OnFailed: r.OnDataUploadFailed, - OnCancelled: r.OnDataUploadCancelled, - OnProgress: r.OnDataUploadProgress, - } - - fsBackup, err := r.dataPathMgr.CreateFileSystemBR(du.Name, dataUploadDownloadRequestor, ctx, r.client, du.Namespace, callbacks, log) - if err != nil { - if err == datapath.ConcurrentLimitExceed { - log.Info("runCancelableDataUpload is concurrent limited") - return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil - } else { - return r.errorOut(ctx, du, err, "error to create data path", log) - } - } - +func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, fsBackup datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { + log.Info("Run cancelable dataUpload") path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.PVC, r.client, r.fileSystem, log) if err != nil { return r.errorOut(ctx, du, err, "error exposing host path for pod volume", log) @@ -460,12 +466,11 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel } du.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} - if err = r.client.Patch(ctx, du, client.MergeFrom(original)); err 
!= nil { - log.WithError(err).Error("error updating DataUpload status") - return err + if patchErr := r.client.Patch(ctx, du, client.MergeFrom(original)); patchErr != nil { + log.WithError(patchErr).Error("error updating DataUpload status") } - return nil + return err } func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload) (bool, error) { diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index 0515a05a4d..654e07531f 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -291,8 +291,8 @@ func TestReconcile(t *testing.T) { expectedProcessed: true, expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(), expectedRequeue: ctrl.Result{}, - }, - { + expectedErrMsg: "unknown type type of snapshot exposer is not exist", + }, { name: "Dataupload should be accepted", du: dataUploadBuilder().Result(), pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), @@ -336,7 +336,7 @@ func TestReconcile(t *testing.T) { pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Cancel(true).Result(), expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, }, } @@ -400,7 +400,7 @@ func TestReconcile(t *testing.T) { if test.expectedErrMsg == "" { require.NoError(t, err) } else { - assert.Equal(t, err.Error(), test.expectedErrMsg) + assert.Contains(t, err.Error(), test.expectedErrMsg) } du := velerov2alpha1api.DataUpload{}