diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go
index fcd3c3d501..f5d322352a 100644
--- a/pkg/cmd/server/server.go
+++ b/pkg/cmd/server/server.go
@@ -1107,10 +1107,8 @@ func markDataUploadsCancel(ctx context.Context, client ctrlclient.Client, backup
 		if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted ||
 			du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared ||
 			du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
-			updated := du.DeepCopy()
-			updated.Spec.Cancel = true
-			updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase)
-			if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&du)); err != nil {
+			msg := fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase)
+			if err := controller.MarkDataUploadCancel(ctx, client, &du, msg); err != nil {
 				log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
 				continue
 			}
@@ -1132,10 +1130,8 @@ func markDataDownloadsCancel(ctx context.Context, client ctrlclient.Client, rest
 		if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted ||
 			dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared ||
 			dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
-			updated := dd.DeepCopy()
-			updated.Spec.Cancel = true
-			updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
-			if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&dd)); err != nil {
+			msg := fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
+			if err := controller.MarkDataDownloadCancel(ctx, client, &dd, msg); err != nil {
 				log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", dd.GetName())
 				continue
 			}
diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go
index a5947444e9..bec54c44d1 100644
--- a/pkg/controller/data_download_controller.go
+++ b/pkg/controller/data_download_controller.go
@@ -394,7 +394,7 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
 						return false
 					}
 
-					if newObj.Status.Phase != v1.PodRunning {
+					if newObj.Status.Phase == "" {
 						return false
 					}
 
@@ -433,10 +433,18 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object)
 		return []reconcile.Request{}
 	}
 
-	requests := make([]reconcile.Request, 1)
+	if kube.IsPodInAbnormalState(pod) { // let the abnormal restore pod fail early
+		msg := fmt.Sprintf("restore marked as cancel to fail early because restore pod %s/%s is not in running state", pod.Namespace, pod.Name)
+		if err := MarkDataDownloadCancel(context.Background(), r.client, dd, msg); err != nil {
+			r.logger.WithFields(logrus.Fields{
+				"Datadownload": dd.Name,
+				"Restore pod":  pod.Name,
+			}).WithError(err).Warn("failed to cancel datadownload, and it will wait for prepare timeout")
+			return []reconcile.Request{}
+		}
+	}
 
 	r.logger.WithField("Restore pod", pod.Name).Infof("Preparing data download %s", dd.Name)
-
 	// we don't expect anyone else update the CR during the Prepare process
 	updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
 	if err != nil || !updated {
@@ -448,6 +456,7 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object)
 		return []reconcile.Request{}
 	}
 
+	requests := make([]reconcile.Request, 1)
 	requests[0] = reconcile.Request{
 		NamespacedName: types.NamespacedName{
 			Namespace: dd.Namespace,
@@ -524,13 +533,34 @@ func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *vel
 		return false, err
 	}
 
-	if succeeded {
-		r.logger.WithField("DataDownload", dd.Name).Infof("This datadownload has been accepted by %s", r.nodeName)
-		return true, nil
+	if !succeeded {
+		r.logger.WithField("DataDownload", dd.Name).Info("This datadownload has been accepted by others")
+		return false, nil
+	}
+
+	if err = r.AddAcceptedLabel(ctx, dd); err != nil {
+		return false, err
 	}
 
-	r.logger.WithField("DataDownload", dd.Name).Info("This datadownload has been accepted by others")
-	return false, nil
+	r.logger.WithField("DataDownload", dd.Name).Infof("This datadownload has been accepted by %s", r.nodeName)
+	return true, nil
+
+}
+
+func (r *DataDownloadReconciler) AddAcceptedLabel(ctx context.Context, dd *velerov2alpha1api.DataDownload) error {
+	updated := dd.DeepCopy()
+	labels := updated.GetLabels()
+	if labels == nil {
+		labels = make(map[string]string)
+	}
+
+	labels[acceptNodeLabelKey] = r.nodeName
+	updated.SetLabels(labels)
+	if err := r.client.Patch(ctx, updated, client.MergeFrom(dd)); err != nil {
+		return errors.Wrapf(err, "failed to add accepted label for datadownload %s", dd.Name)
+	}
+
+	return nil
 }
 
 func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *velerov2alpha1api.DataDownload) {
@@ -614,3 +644,13 @@ func findDataDownloadByPod(client client.Client, pod v1.Pod) (*velerov2alpha1api
 
 	return nil, nil
 }
+
+func MarkDataDownloadCancel(ctx context.Context, cli client.Client, dd *velerov2alpha1api.DataDownload, msg string) error {
+	updated := dd.DeepCopy()
+	updated.Spec.Cancel = true
+	updated.Status.Message = msg
+	if err := cli.Patch(ctx, updated, client.MergeFrom(dd)); err != nil {
+		return errors.Wrapf(err, "failed to mark datadownload %s as canceled", dd.Name)
+	}
+	return nil
+}
diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go
index f52de9f9bb..9e7618df0d 100644
--- a/pkg/controller/data_upload_controller.go
+++ b/pkg/controller/data_upload_controller.go
@@ -55,6 +55,7 @@ import (
 
 const (
 	dataUploadDownloadRequestor string = "snapshot-data-upload-download"
+	acceptNodeLabelKey string = "velero.io/accepted-by"
 	preparingMonitorFrequency time.Duration = time.Minute
 )
 
@@ -412,7 +413,7 @@ func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
 						return false
 					}
 
-					if newObj.Status.Phase != corev1.PodRunning {
+					if newObj.Status.Phase == "" {
 						return false
 					}
 
@@ -450,6 +451,17 @@ func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reco
 		return []reconcile.Request{}
 	}
 
+	if kube.IsPodInAbnormalState(pod) { // let the abnormal backup pod fail early
+		msg := fmt.Sprintf("backup marked as cancel to fail early because backup pod %s/%s is not in running state", pod.Namespace, pod.Name)
+		if err := MarkDataUploadCancel(context.Background(), r.client, du, msg); err != nil {
+			r.logger.WithFields(logrus.Fields{
+				"Dataupload": du.Name,
+				"Backup pod": pod.Name,
+			}).WithError(err).Warn("failed to cancel dataupload, and it will wait for prepare timeout")
+			return []reconcile.Request{}
+		}
+	}
+
 	r.logger.WithField("Backup pod", pod.Name).Infof("Preparing dataupload %s", du.Name)
 
 	// we don't expect anyone else update the CR during the Prepare process
@@ -546,13 +558,33 @@ func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov
 		return false, err
 	}
 
-	if succeeded {
-		r.logger.WithField("Dataupload", du.Name).Infof("This datauplod has been accepted by %s", r.nodeName)
-		return true, nil
+	if !succeeded {
+		r.logger.WithField("Dataupload", du.Name).Info("This dataupload has been accepted by others")
+		return false, nil
+	}
+
+	if err = r.AddAcceptedLabel(ctx, du); err != nil {
+		return false, err
+	}
+
+	r.logger.WithField("Dataupload", du.Name).Infof("This dataupload has been accepted by %s", r.nodeName)
+	return true, nil
+}
+
+func (r *DataUploadReconciler) AddAcceptedLabel(ctx context.Context, du *velerov2alpha1api.DataUpload) error {
+	updated := du.DeepCopy()
+	labels := updated.GetLabels()
+	if labels == nil {
+		labels = make(map[string]string)
 	}
 
-	r.logger.WithField("Dataupload", du.Name).Info("This datauplod has been accepted by others")
-	return false, nil
+	labels[acceptNodeLabelKey] = r.nodeName
+	updated.SetLabels(labels)
+	if err := r.client.Patch(ctx, updated, client.MergeFrom(du)); err != nil {
+		return errors.Wrapf(err, "failed to add accepted label for dataupload %s", du.Name)
+	}
+
+	return nil
 }
 
 func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov2alpha1api.DataUpload) {
@@ -666,3 +698,13 @@ func findDataUploadByPod(client client.Client, pod corev1.Pod) (*velerov2alpha1a
 	}
 	return nil, nil
 }
+
+func MarkDataUploadCancel(ctx context.Context, cli client.Client, du *velerov2alpha1api.DataUpload, msg string) error {
+	updated := du.DeepCopy()
+	updated.Spec.Cancel = true
+	updated.Status.Message = msg
+	if err := cli.Patch(ctx, updated, client.MergeFrom(du)); err != nil {
+		return errors.Wrapf(err, "failed to mark dataupload %s as canceled", du.Name)
+	}
+	return nil
+}
diff --git a/pkg/util/kube/pod.go b/pkg/util/kube/pod.go
index c1464a3d69..c10fd01446 100644
--- a/pkg/util/kube/pod.go
+++ b/pkg/util/kube/pod.go
@@ -110,3 +110,15 @@ func EnsureDeletePod(ctx context.Context, podGetter corev1client.CoreV1Interface
 
 	return nil
 }
+
+func IsPodInAbnormalState(pod *corev1api.Pod) bool {
+	for _, containerStatus := range pod.Status.ContainerStatuses {
+		if !containerStatus.Ready {
+			return true
+		}
+		if containerStatus.State.Waiting != nil {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/util/kube/pod_test.go b/pkg/util/kube/pod_test.go
index 6f39c0b23c..43808958d3 100644
--- a/pkg/util/kube/pod_test.go
+++ b/pkg/util/kube/pod_test.go
@@ -343,3 +343,96 @@ func TestDeletePodIfAny(t *testing.T) {
 		})
 	}
 }
+
+func TestIsPodInAbnormalState(t *testing.T) {
+	testCases := []struct {
+		description string
+		pod         *corev1api.Pod
+		expected    bool
+	}{
+		{
+			description: "All containers ready",
+			pod: &corev1api.Pod{
+				Status: corev1api.PodStatus{
+					ContainerStatuses: []corev1api.ContainerStatus{
+						{Ready: true},
+						{Ready: true},
+					},
+				},
+			},
+			expected: false,
+		},
+		{
+			description: "Some containers not ready",
+			pod: &corev1api.Pod{
+				Status: corev1api.PodStatus{
+					ContainerStatuses: []corev1api.ContainerStatus{
+						{Ready: true},
+						{Ready: false},
+					},
+				},
+			},
+			expected: true,
+		},
+		{
+			description: "Container waiting",
+			pod: &corev1api.Pod{
+				Status: corev1api.PodStatus{
+					ContainerStatuses: []corev1api.ContainerStatus{
+						{Ready: true},
+						{
+							Ready: false,
+							State: corev1api.ContainerState{
+								Waiting: &corev1api.ContainerStateWaiting{},
+							},
+						},
+					},
+				},
+			},
+			expected: true,
+		},
+		{
+			description: "All containers ready but waiting",
+			pod: &corev1api.Pod{
+				Status: corev1api.PodStatus{
+					ContainerStatuses: []corev1api.ContainerStatus{
+						{Ready: true},
+						{
+							Ready: true,
+							State: corev1api.ContainerState{
+								Waiting: &corev1api.ContainerStateWaiting{},
+							},
+						},
+					},
+				},
+			},
+			expected: true,
+		},
+		{
+			description: "All containers ready and running",
+			pod: &corev1api.Pod{
+				Status: corev1api.PodStatus{
+					ContainerStatuses: []corev1api.ContainerStatus{
+						{Ready: true},
+						{
+							Ready: true,
+							State: corev1api.ContainerState{
+								Running: &corev1api.ContainerStateRunning{},
+							},
+						},
+					},
+				},
+			},
+			expected: false,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.description, func(t *testing.T) {
+			actual := IsPodInAbnormalState(tc.pod)
+			if actual != tc.expected {
+				t.Errorf("Expected pod to be in abnormal state: %v, but got: %v", tc.expected, actual)
+			}
+		})
+	}
+}