Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #6562 #6563 data mover bugs #6603

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,10 +1107,8 @@ func markDataUploadsCancel(ctx context.Context, client ctrlclient.Client, backup
if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted ||
du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared ||
du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
updated := du.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase)
if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&du)); err != nil {
msg := fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase)
if err := controller.MarkDataUploadCancel(ctx, client, &du, msg); err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
continue
}
Expand All @@ -1132,10 +1130,8 @@ func markDataDownloadsCancel(ctx context.Context, client ctrlclient.Client, rest
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
updated := dd.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&dd)); err != nil {
msg := fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
if err := controller.MarkDataDownloadCancel(ctx, client, &dd, msg); err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", dd.GetName())
continue
}
Expand Down
56 changes: 48 additions & 8 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
return false
}

if newObj.Status.Phase != v1.PodRunning {
if newObj.Status.Phase == "" {
return false
}

Expand Down Expand Up @@ -433,10 +433,18 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object)
return []reconcile.Request{}
}

requests := make([]reconcile.Request, 1)
if kube.IsPodInAbnormalState(pod) { // let the abnormal restore pod failed early
msg := fmt.Sprintf("restore mark as cancel to failed early for restore pod %s/%s is not in running status", pod.Namespace, pod.Name)
if err := MarkDataDownloadCancel(context.Background(), r.client, dd, msg); err != nil {
r.logger.WithFields(logrus.Fields{
"Datadownload": dd.Name,
"Restore pod": pod.Name,
}).WithError(err).Warn("failed to cancel datadownload, and it will wait for prepare timeout")
return []reconcile.Request{}
}
}

r.logger.WithField("Restore pod", pod.Name).Infof("Preparing data download %s", dd.Name)

// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
if err != nil || !updated {
Expand All @@ -448,6 +456,7 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object)
return []reconcile.Request{}
}

requests := make([]reconcile.Request, 1)
requests[0] = reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: dd.Namespace,
Expand Down Expand Up @@ -524,13 +533,34 @@ func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *vel
return false, err
}

if succeeded {
r.logger.WithField("DataDownload", dd.Name).Infof("This datadownload has been accepted by %s", r.nodeName)
return true, nil
if !succeeded {
r.logger.WithField("DataDownload", dd.Name).Info("This datadownload has been accepted by others")
return false, nil
}

if err = r.AddAcceptedLabel(ctx, dd); err != nil {
return false, err
}

r.logger.WithField("DataDownload", dd.Name).Info("This datadownload has been accepted by others")
return false, nil
r.logger.WithField("DataDownload", dd.Name).Infof("This datadownload has been accepted by %s", r.nodeName)
return true, nil

}

// AddAcceptedLabel patches the given DataDownload so that it carries a
// label recording which node accepted it (acceptNodeLabelKey -> r.nodeName).
// The change is applied with a merge patch against the original object.
func (r *DataDownloadReconciler) AddAcceptedLabel(ctx context.Context, dd *velerov2alpha1api.DataDownload) error {
	patched := dd.DeepCopy()

	// Ensure the label map exists before writing into it.
	if patched.Labels == nil {
		patched.Labels = map[string]string{}
	}
	patched.Labels[acceptNodeLabelKey] = r.nodeName

	err := r.client.Patch(ctx, patched, client.MergeFrom(dd))
	if err != nil {
		return errors.Wrapf(err, "failed to add accepted label for datadownload %s", dd.Name)
	}
	return nil
}

func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *velerov2alpha1api.DataDownload) {
Expand Down Expand Up @@ -614,3 +644,13 @@ func findDataDownloadByPod(client client.Client, pod v1.Pod) (*velerov2alpha1api

return nil, nil
}

// MarkDataDownloadCancel requests cancellation of the given DataDownload:
// it sets spec.cancel, records msg as the status message, and merge-patches
// the object through cli. The DataDownload controller picks up the cancel
// flag on its next reconcile.
func MarkDataDownloadCancel(ctx context.Context, cli client.Client, dd *velerov2alpha1api.DataDownload, msg string) error {
	patched := dd.DeepCopy()
	patched.Spec.Cancel = true
	patched.Status.Message = msg

	err := cli.Patch(ctx, patched, client.MergeFrom(dd))
	if err != nil {
		return errors.Wrapf(err, "failed to mark datadownload as canceled %s", dd.Name)
	}
	return nil
}
54 changes: 48 additions & 6 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (

const (
dataUploadDownloadRequestor string = "snapshot-data-upload-download"
acceptNodeLabelKey string = "velero.io/accepted-by"
preparingMonitorFrequency time.Duration = time.Minute
)

Expand Down Expand Up @@ -412,7 +413,7 @@ func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
return false
}

if newObj.Status.Phase != corev1.PodRunning {
if newObj.Status.Phase == "" {
return false
}

Expand Down Expand Up @@ -450,6 +451,17 @@ func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reco
return []reconcile.Request{}
}

if kube.IsPodInAbnormalState(pod) { // let the abnormal backup pod failed early
msg := fmt.Sprintf("backup mark as cancel to failed early for restore pod %s/%s is not in running status", pod.Namespace, pod.Name)
if err := MarkDataUploadCancel(context.Background(), r.client, du, msg); err != nil {
r.logger.WithFields(logrus.Fields{
"Datadupload": du.Name,
"Backup pod": pod.Name,
}).WithError(err).Warn("failed to cancel dataupload, and it will wait for prepare timeout")
return []reconcile.Request{}
}
}

r.logger.WithField("Backup pod", pod.Name).Infof("Preparing dataupload %s", du.Name)

// we don't expect anyone else update the CR during the Prepare process
Expand Down Expand Up @@ -546,13 +558,33 @@ func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov
return false, err
}

if succeeded {
r.logger.WithField("Dataupload", du.Name).Infof("This datauplod has been accepted by %s", r.nodeName)
return true, nil
if !succeeded {
r.logger.WithField("Dataupload", du.Name).Info("This datauplod has been accepted by others")
return false, nil
}

if err = r.AddAcceptedLabel(ctx, du); err != nil {
return false, err
}

r.logger.WithField("Dataupload", du.Name).Infof("This datauplod has been accepted by %s", r.nodeName)
return true, nil
}

// AddAcceptedLabel patches the given DataUpload so that it carries a label
// recording which node accepted it (acceptNodeLabelKey -> r.nodeName).
// The change is applied with a merge patch against the original object.
//
// NOTE(review): the scraped diff had two leftover deleted lines (a log call
// and a stray `return false, nil`) interleaved inside this body, which would
// not compile in an error-returning function; they are removed here so the
// function matches its DataDownload counterpart.
func (r *DataUploadReconciler) AddAcceptedLabel(ctx context.Context, du *velerov2alpha1api.DataUpload) error {
	updated := du.DeepCopy()
	labels := updated.GetLabels()
	if labels == nil {
		labels = make(map[string]string)
	}

	labels[acceptNodeLabelKey] = r.nodeName
	updated.SetLabels(labels)
	if err := r.client.Patch(ctx, updated, client.MergeFrom(du)); err != nil {
		return errors.Wrapf(err, "failed to add accepted label for dataupload %s", du.Name)
	}

	return nil
}

func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov2alpha1api.DataUpload) {
Expand Down Expand Up @@ -666,3 +698,13 @@ func findDataUploadByPod(client client.Client, pod corev1.Pod) (*velerov2alpha1a
}
return nil, nil
}

// MarkDataUploadCancel requests cancellation of the given DataUpload:
// it sets spec.cancel, records msg as the status message, and merge-patches
// the object through cli. The DataUpload controller picks up the cancel
// flag on its next reconcile.
func MarkDataUploadCancel(ctx context.Context, cli client.Client, du *velerov2alpha1api.DataUpload, msg string) error {
	patched := du.DeepCopy()
	patched.Spec.Cancel = true
	patched.Status.Message = msg

	err := cli.Patch(ctx, patched, client.MergeFrom(du))
	if err != nil {
		return errors.Wrapf(err, "failed to mark dataupload as canceled %s", du.Name)
	}
	return nil
}
12 changes: 12 additions & 0 deletions pkg/util/kube/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,15 @@ func EnsureDeletePod(ctx context.Context, podGetter corev1client.CoreV1Interface

return nil
}

// IsPodInAbnormalState reports whether any container in the pod is either
// not ready or sitting in the waiting state. A pod with no container
// statuses is considered normal.
func IsPodInAbnormalState(pod *corev1api.Pod) bool {
	statuses := pod.Status.ContainerStatuses
	for i := range statuses {
		cs := &statuses[i]
		// A container that is not ready, or one stuck waiting, marks the
		// whole pod as abnormal.
		if !cs.Ready || cs.State.Waiting != nil {
			return true
		}
	}
	return false
}
93 changes: 93 additions & 0 deletions pkg/util/kube/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,96 @@ func TestDeletePodIfAny(t *testing.T) {
})
}
}

// TestIsPodInAbnormalState verifies that pods with non-ready or waiting
// containers are flagged as abnormal, while pods whose containers are all
// ready (and possibly running) are not.
func TestIsPodInAbnormalState(t *testing.T) {
	// podWith builds a pod carrying the given container statuses.
	podWith := func(statuses ...corev1api.ContainerStatus) *corev1api.Pod {
		return &corev1api.Pod{
			Status: corev1api.PodStatus{ContainerStatuses: statuses},
		}
	}
	ready := corev1api.ContainerStatus{Ready: true}
	notReady := corev1api.ContainerStatus{Ready: false}
	readyButWaiting := corev1api.ContainerStatus{
		Ready: true,
		State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{}},
	}
	notReadyWaiting := corev1api.ContainerStatus{
		Ready: false,
		State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{}},
	}
	readyRunning := corev1api.ContainerStatus{
		Ready: true,
		State: corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}},
	}

	testCases := []struct {
		description string
		pod         *corev1api.Pod
		expected    bool
	}{
		{description: "All containers ready", pod: podWith(ready, ready), expected: false},
		{description: "Some containers not ready", pod: podWith(ready, notReady), expected: true},
		{description: "Container waiting", pod: podWith(ready, notReadyWaiting), expected: true},
		{description: "All containers ready but waiting", pod: podWith(ready, readyButWaiting), expected: true},
		{description: "All containers ready and running", pod: podWith(ready, readyRunning), expected: false},
	}

	for _, tc := range testCases {
		t.Run(tc.description, func(t *testing.T) {
			actual := IsPodInAbnormalState(tc.pod)
			if actual != tc.expected {
				t.Errorf("Expected pod to be in abnormal state: %v, but got: %v", tc.expected, actual)
			}
		})
	}
}
Loading