Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark dataupload/datadownload in cancel when velero pod restart #6461

Merged
merged 2 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 62 additions & 2 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,15 @@ func (s *nodeAgentServer) run() {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}

if err = controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger).SetupWithManager(s.mgr); err != nil {
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger)
s.markDataUploadsCancel(dataUploadReconciler)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

if err = controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger).SetupWithManager(s.mgr); err != nil {
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger)
s.markDataDownloadsCancel(dataDownloadReconciler)
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
}

Expand Down Expand Up @@ -337,6 +341,62 @@ func (s *nodeAgentServer) markInProgressCRsFailed() {
s.markInProgressPVRsFailed(client)
}

func (s *nodeAgentServer) markDataUploadsCancel(r *controller.DataUploadReconciler) {
// the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
if err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
return
}
if dataUploads, err := r.FindDataUploads(s.ctx, client, s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to find data downloads")
} else {
for i := range dataUploads {
du := dataUploads[i]
if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted ||
Lyndon-Li marked this conversation as resolved.
Show resolved Hide resolved
du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared ||
du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
updated := du.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase)
if err := client.Patch(s.ctx, updated, ctrlclient.MergeFrom(&du)); err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", du.GetName())
continue
}
s.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
}
}
}
}

func (s *nodeAgentServer) markDataDownloadsCancel(r *controller.DataDownloadReconciler) {
// the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
if err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
return
}
if dataDownloads, err := r.FindDataDownloads(s.ctx, client, s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to find data downloads")
} else {
for i := range dataDownloads {
dd := dataDownloads[i]
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted ||
Lyndon-Li marked this conversation as resolved.
Show resolved Hide resolved
dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
updated := dd.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
if err := client.Patch(s.ctx, updated, ctrlclient.MergeFrom(dd)); err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", dd.GetName())
continue
}
s.logger.WithField("datadownload", dd.GetName()).Warn(dd.Status.Message)
}
}
}
}

func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) {
pvbs := &velerov1api.PodVolumeBackupList{}
if err := client.List(s.ctx, pvbs, &ctrlclient.MatchingFields{"metadata.namespace": s.namespace}); err != nil {
Expand Down
56 changes: 54 additions & 2 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,7 @@ func markInProgressBackupsFailed(ctx context.Context, client ctrlclient.Client,
}

for i, backup := range backups.Items {
if backup.Status.Phase != velerov1api.BackupPhaseInProgress {
if backup.Status.Phase != velerov1api.BackupPhaseInProgress && backup.Status.Phase != velerov1api.BackupPhaseWaitingForPluginOperations {
log.Debugf("the status of backup %q is %q, skip", backup.GetName(), backup.Status.Phase)
continue
}
Expand All @@ -1078,6 +1078,7 @@ func markInProgressBackupsFailed(ctx context.Context, client ctrlclient.Client,
continue
}
log.WithField("backup", backup.GetName()).Warn(updated.Status.FailureReason)
markDataUploadsCancel(ctx, client, backup, log)
}
}

Expand All @@ -1088,7 +1089,7 @@ func markInProgressRestoresFailed(ctx context.Context, client ctrlclient.Client,
return
}
for i, restore := range restores.Items {
if restore.Status.Phase != velerov1api.RestorePhaseInProgress {
if restore.Status.Phase != velerov1api.RestorePhaseInProgress && restore.Status.Phase != velerov1api.RestorePhaseWaitingForPluginOperations {
log.Debugf("the status of restore %q is %q, skip", restore.GetName(), restore.Status.Phase)
continue
}
Expand All @@ -1101,5 +1102,56 @@ func markInProgressRestoresFailed(ctx context.Context, client ctrlclient.Client,
continue
}
log.WithField("restore", restore.GetName()).Warn(updated.Status.FailureReason)
markDataDownloadsCancel(ctx, client, restore, log)
}
}

func markDataUploadsCancel(ctx context.Context, client ctrlclient.Client, backup velerov1api.Backup, log logrus.FieldLogger) {
dataUploads := &velerov2alpha1api.DataUploadList{}

if err := client.List(ctx, dataUploads, &ctrlclient.MatchingFields{"metadata.namespace": backup.GetNamespace()}, &ctrlclient.MatchingLabels{velerov1api.BackupUIDLabel: string(backup.GetUID())}); err != nil {
log.WithError(errors.WithStack(err)).Error("failed to list dataUploads")
return
}

for i := range dataUploads.Items {
du := dataUploads.Items[i]
if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted ||
Lyndon-Li marked this conversation as resolved.
Show resolved Hide resolved
du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared ||
du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
updated := du.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase)
if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&du)); err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
continue
}
log.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
}
}
}

func markDataDownloadsCancel(ctx context.Context, client ctrlclient.Client, restore velerov1api.Restore, log logrus.FieldLogger) {
dataDownloads := &velerov2alpha1api.DataDownloadList{}

if err := client.List(ctx, dataDownloads, &ctrlclient.MatchingFields{"metadata.namespace": restore.GetNamespace()}, &ctrlclient.MatchingLabels{velerov1api.RestoreUIDLabel: string(restore.GetUID())}); err != nil {
log.WithError(errors.WithStack(err)).Error("failed to list dataDownloads")
return
}

for i := range dataDownloads.Items {
dd := dataDownloads.Items[i]
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted ||
Lyndon-Li marked this conversation as resolved.
Show resolved Hide resolved
dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
updated := dd.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&dd)); err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", dd.GetName())
continue
}
log.WithField("datadownload", dd.GetName()).Warn(dd.Status.Message)
}
}
}
85 changes: 70 additions & 15 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type DataDownloadReconciler struct {
logger logrus.FieldLogger
credentialGetter *credentials.CredentialGetter
fileSystem filesystem.Interface
clock clock.WithTickerAndDelayedExecution
Clock clock.WithTickerAndDelayedExecution
restoreExposer exposer.GenericRestoreExposer
nodeName string
repositoryEnsurer *repository.Ensurer
Expand All @@ -73,7 +73,7 @@ func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Inter
logger: logger.WithField("controller", "DataDownload"),
credentialGetter: credentialGetter,
fileSystem: filesystem.NewFileSystem(),
clock: &clock.RealClock{},
Clock: &clock.RealClock{},
nodeName: nodeName,
repositoryEnsurer: repoEnsurer,
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
Expand Down Expand Up @@ -134,9 +134,15 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request

log.Info("Data download is accepted")

if dd.Spec.Cancel {
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
return ctrl.Result{}, nil
}

hostingPodLabels := map[string]string{velerov1api.DataDownloadLabel: dd.Name}

// ep.Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// but the pod maybe is not in the same node of the current controller, so we need to return it here.
// And then only the controller who is in the same node could do the rest work.
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, dd.Spec.OperationTimeout.Duration)
Expand All @@ -147,7 +153,10 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request

return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
if dd.Status.StartTimestamp != nil {
if dd.Spec.Cancel {
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
} else if dd.Status.StartTimestamp != nil {
if time.Since(dd.Status.StartTimestamp.Time) >= r.preparingTimeout {
r.onPrepareTimeout(ctx, dd)
}
Expand All @@ -156,6 +165,13 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
log.Info("Data download is prepared")

if dd.Spec.Cancel {
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
return ctrl.Result{}, nil
}

fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name)

if fsRestore != nil {
Expand Down Expand Up @@ -280,7 +296,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na

original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCompleted
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating data download status")
} else {
Expand Down Expand Up @@ -322,9 +338,9 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
if dd.Status.StartTimestamp.IsZero() {
dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating data download status")
}
Expand Down Expand Up @@ -398,15 +414,13 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object) []reconcile.Request {
pod := podObj.(*v1.Pod)

dd := &velerov2alpha1api.DataDownload{}
err := r.client.Get(context.Background(), types.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Labels[velerov1api.DataDownloadLabel],
}, dd)

dd, err := findDataDownloadByPod(r.client, *pod)
if err != nil {
r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to get DataDownload")
return []reconcile.Request{}
} else if dd == nil {
r.logger.WithField("Restore pod", pod.Name).Error("get empty DataDownload")
return []reconcile.Request{}
Lyndon-Li marked this conversation as resolved.
Show resolved Hide resolved
}

if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted {
Expand Down Expand Up @@ -438,6 +452,30 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object)
return requests
}

func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli client.Client, ns string) ([]*velerov2alpha1api.DataDownload, error) {
pods := &v1.PodList{}
var dataDownloads []*velerov2alpha1api.DataDownload
if err := cli.List(ctx, pods, &client.ListOptions{Namespace: ns}); err != nil {
r.logger.WithError(errors.WithStack(err)).Error("failed to list pods on current node")
return nil, errors.Wrapf(err, "failed to list pods on current node")
}

for _, pod := range pods.Items {
if pod.Spec.NodeName != r.nodeName {
Lyndon-Li marked this conversation as resolved.
Show resolved Hide resolved
r.logger.Debugf("Pod %s related data download will not handled by %s nodes", pod.GetName(), r.nodeName)
continue
}
dd, err := findDataDownloadByPod(cli, pod)
if err != nil {
r.logger.WithError(errors.WithStack(err)).Error("failed to get dataDownload by pod")
continue
} else if dd != nil {
dataDownloads = append(dataDownloads, dd)
}
}
return dataDownloads, nil
}

func (r *DataDownloadReconciler) prepareDataDownload(ssb *velerov2alpha1api.DataDownload) {
ssb.Status.Phase = velerov2alpha1api.DataDownloadPhasePrepared
ssb.Status.Node = r.nodeName
Expand All @@ -455,7 +493,7 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = errors.WithMessage(err, msg).Error()
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}

if patchErr := r.client.Patch(ctx, dd, client.MergeFrom(original)); patchErr != nil {
log.WithError(patchErr).Error("error updating DataDownload status")
Expand All @@ -471,7 +509,7 @@ func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *vel
// and the success one could handle later logic
succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) {
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted
dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
})

if err != nil {
Expand Down Expand Up @@ -549,3 +587,20 @@ func getDataDownloadOwnerObject(dd *velerov2alpha1api.DataDownload) v1.ObjectRef
APIVersion: dd.APIVersion,
}
}

func findDataDownloadByPod(client client.Client, pod v1.Pod) (*velerov2alpha1api.DataDownload, error) {
if label, exist := pod.Labels[velerov1api.DataDownloadLabel]; exist {
dd := &velerov2alpha1api.DataDownload{}
err := client.Get(context.Background(), types.NamespacedName{
Namespace: pod.Namespace,
Name: label,
}, dd)

if err != nil {
return nil, errors.Wrapf(err, "error to find DataDownload by pod %s/%s", pod.Namespace, pod.Name)
}
return dd, nil
}

return nil, nil
}
10 changes: 9 additions & 1 deletion pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,14 @@ func TestFindDataDownloadForPod(t *testing.T) {
assert.Equal(t, du.Namespace, requests[0].Namespace)
assert.Equal(t, du.Name, requests[0].Name)
},
}, {
name: "no selected label found for pod",
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Result(),
checkFunc: func(du *velerov2alpha1api.DataDownload, requests []reconcile.Request) {
// Assert that the function returns a single request
assert.Empty(t, requests)
},
}, {
name: "no matched pod",
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
Expand All @@ -594,7 +602,7 @@ func TestFindDataDownloadForPod(t *testing.T) {
},
},
{
name: "dataDownload not accepte",
name: "dataDownload not accept",
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Labels(map[string]string{velerov1api.DataDownloadLabel: dataDownloadName}).Result(),
checkFunc: func(du *velerov2alpha1api.DataDownload, requests []reconcile.Request) {
Expand Down
Loading
Loading