Skip to content

Commit

Permalink
Merge pull request #7611 from qiuming-best/datamover-cancel
Browse files: browse the repository at this point in the history
Fix cancel bug && adjust StartTimestamp for data mover
  • Loading branch information
blackpiglet authored Apr 3, 2024
2 parents dcd62b9 + a2c1a5a commit c2d267d
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 15 deletions.
8 changes: 6 additions & 2 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
// Update status to InProgress
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Unable to update status to in progress")
return ctrl.Result{}, err
Expand All @@ -290,7 +291,11 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
log.Info("Data download is being canceled")
fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name)
if fsRestore == nil {
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
if r.nodeName == dd.Status.Node {
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
} else {
log.Info("Data path is not started in this node and will not canceled by current node")
}
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -668,7 +673,6 @@ func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *vel

updateFunc := func(datadownload *velerov2alpha1api.DataDownload) {
datadownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted
datadownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
labels := datadownload.GetLabels()
if labels == nil {
labels = make(map[string]string)
Expand Down
35 changes: 32 additions & 3 deletions pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...

dataPathMgr := datapath.NewManager(1)

return NewDataDownloadReconciler(fakeClient, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
return NewDataDownloadReconciler(fakeClient, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}

func TestDataDownloadReconcile(t *testing.T) {
Expand Down Expand Up @@ -207,11 +207,40 @@ func TestDataDownloadReconcile(t *testing.T) {
mockCancel: true,
},
{
name: "Cancel data downloand in progress",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result(),
name: "Cancel data downloand in progress with create FSBR",
dd: func() *velerov2alpha1api.DataDownload {
dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result()
dd.Status.Node = "test-node"
return dd
}(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
needCreateFSBR: true,
mockCancel: true,
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceling).Result(),
},
{
name: "Cancel data downloand in progress without create FSBR",
dd: func() *velerov2alpha1api.DataDownload {
dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result()
dd.Status.Node = "test-node"
return dd
}(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
needCreateFSBR: false,
mockCancel: true,
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(),
},
{
name: "Cancel data downloand in progress in different node",
dd: func() *velerov2alpha1api.DataDownload {
dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result()
dd.Status.Node = "different-node"
return dd
}(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
needCreateFSBR: false,
mockCancel: true,
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(),
},
{
name: "Error in data path is concurrent limited",
Expand Down
8 changes: 6 additions & 2 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Update status to InProgress
original := du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil {
return r.errorOut(ctx, du, err, "error updating dataupload status", log)
}
Expand All @@ -300,7 +301,11 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)

fsBackup := r.dataPathMgr.GetAsyncBR(du.Name)
if fsBackup == nil {
r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
if du.Status.Node == r.nodeName {
r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
} else {
log.Info("Data path is not started in this node and will not canceled by current node")
}
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -720,7 +725,6 @@ func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov

updateFunc := func(dataUpload *velerov2alpha1api.DataUpload) {
dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseAccepted
dataUpload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
labels := dataUpload.GetLabels()
if labels == nil {
labels = make(map[string]string)
Expand Down
44 changes: 36 additions & 8 deletions pkg/controller/data_upload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
return nil, err
}
return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, nil,
testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}

func dataUploadBuilder() *builder.DataUploadBuilder {
Expand Down Expand Up @@ -336,6 +336,7 @@ func TestReconcile(t *testing.T) {
expectedErrMsg string
needErrs []bool
peekErr error
notCreateFSBR bool
}{
{
name: "Dataupload is not initialized",
Expand Down Expand Up @@ -412,6 +413,32 @@ func TestReconcile(t *testing.T) {
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Result(),
expectedRequeue: ctrl.Result{},
},
{
name: "Dataupload should be cancel with match node",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
du: func() *velerov2alpha1api.DataUpload {
du := dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result()
du.Status.Node = "test-node"
return du
}(),
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(),
expectedRequeue: ctrl.Result{},
notCreateFSBR: true,
},
{
name: "Dataupload should not be cancel with dismatch node",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
du: func() *velerov2alpha1api.DataUpload {
du := dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result()
du.Status.Node = "different_node"
return du
}(),
expectedProcessed: false,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
expectedRequeue: ctrl.Result{},
notCreateFSBR: true,
},
{
name: "runCancelableDataUpload is concurrent limited",
dataMgr: datapath.NewManager(0),
Expand Down Expand Up @@ -511,16 +538,17 @@ func TestReconcile(t *testing.T) {
} else if test.du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(r.kubeClient, r.csiSnapshotClient, velerotest.NewLogger())}
}

datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
return &fakeDataUploadFSBR{
du: test.du,
kubeClient: r.client,
clock: r.Clock,
if !test.notCreateFSBR {
datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
return &fakeDataUploadFSBR{
du: test.du,
kubeClient: r.client,
clock: r.Clock,
}
}
}

if test.du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
if test.du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress && !test.notCreateFSBR {
if fsBR := r.dataPathMgr.GetAsyncBR(test.du.Name); fsBR == nil {
_, err := r.dataPathMgr.CreateFileSystemBR(test.du.Name, pVBRRequestor, ctx, r.client, velerov1api.DefaultNamespace, datapath.Callbacks{OnCancelled: r.OnDataUploadCancelled}, velerotest.NewLogger())
require.NoError(t, err)
Expand Down

0 comments on commit c2d267d

Please sign in to comment.