Skip to content

Commit

Permalink
Merge pull request #231 from blackpiglet/code_refactor
Browse files Browse the repository at this point in the history
Only generate one Async Operation for CSI backup and some code refactor
  • Loading branch information
shubham-pampattiwar authored Mar 22, 2024
2 parents 3439ce3 + 96c47e7 commit 36e5755
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 87 deletions.
10 changes: 5 additions & 5 deletions internal/backup/pvc_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (p *PVCBackupItemAction) Progress(operationID string, backup *velerov1api.B
return progress, biav2.InvalidOperationIDError(operationID)
}

dataUpload, err := getDataUpload(context.Background(), backup, p.CRClient, operationID)
dataUpload, err := getDataUpload(context.Background(), p.CRClient, operationID)
if err != nil {
p.Log.Errorf("fail to get DataUpload for backup %s/%s: %s", backup.Namespace, backup.Name, err.Error())
return progress, err
Expand Down Expand Up @@ -294,7 +294,7 @@ func (p *PVCBackupItemAction) Cancel(operationID string, backup *velerov1api.Bac
return biav2.InvalidOperationIDError(operationID)
}

dataUpload, err := getDataUpload(context.Background(), backup, p.CRClient, operationID)
dataUpload, err := getDataUpload(context.Background(), p.CRClient, operationID)
if err != nil {
p.Log.Errorf("fail to get DataUpload for backup %s/%s: %s", backup.Namespace, backup.Name, err.Error())
return err
Expand Down Expand Up @@ -365,10 +365,10 @@ func createDataUpload(ctx context.Context, backup *velerov1api.Backup, crClient
return dataUpload, err
}

func getDataUpload(ctx context.Context, backup *velerov1api.Backup,
func getDataUpload(ctx context.Context,
crClient crclient.Client, operationID string) (*velerov2alpha1.DataUpload, error) {
dataUploadList := new(velerov2alpha1.DataUploadList)
err := crClient.List(context.Background(), dataUploadList, &crclient.ListOptions{
err := crClient.List(ctx, dataUploadList, &crclient.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{velerov1api.AsyncOperationIDLabel: operationID}),
})
if err != nil {
Expand All @@ -392,7 +392,7 @@ func cancelDataUpload(ctx context.Context, crClient crclient.Client,
updatedDataUpload := dataUpload.DeepCopy()
updatedDataUpload.Spec.Cancel = true

err := crClient.Patch(context.Background(), updatedDataUpload, crclient.MergeFrom(dataUpload))
err := crClient.Patch(ctx, updatedDataUpload, crclient.MergeFrom(dataUpload))
if err != nil {
return errors.Wrap(err, "error patch DataUpload")
}
Expand Down
4 changes: 2 additions & 2 deletions internal/backup/pvc_action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ func TestExecute(t *testing.T) {
if boolptr.IsSetToTrue(tc.backup.Spec.SnapshotMoveData) == true {
go func() {
var vsList *v1.VolumeSnapshotList
err := wait.PollImmediate(1*time.Second, 10*time.Second, func() (bool, error) {
vsList, err = pvcBIA.SnapshotClient.SnapshotV1().VolumeSnapshots(tc.pvc.Namespace).List(context.Background(), metav1.ListOptions{})
err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 10*time.Second, true, func(ctx context.Context) (bool, error) {
vsList, err = pvcBIA.SnapshotClient.SnapshotV1().VolumeSnapshots(tc.pvc.Namespace).List(ctx, metav1.ListOptions{})
require.NoError(t, err)
if err != nil || len(vsList.Items) == 0 {
return false, nil
Expand Down
36 changes: 34 additions & 2 deletions internal/backup/volumesnapshot_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ func (p *VolumeSnapshotBackupItemAction) Execute(item runtime.Unstructured, back
Namespace: vs.Namespace,
Name: vs.Name,
},
{
GroupResource: kuberesource.VolumeSnapshotContents,
Name: vsc.Name,
},
}
}

Expand Down Expand Up @@ -229,7 +233,8 @@ func (p *VolumeSnapshotBackupItemAction) Progress(operationID string, backup *ve
return progress, errors.WithStack(err)
}

vs, err := snapshotClient.SnapshotV1().VolumeSnapshots(operationIDParts[0]).Get(context.Background(), operationIDParts[1], metav1.GetOptions{})
vs, err := snapshotClient.SnapshotV1().VolumeSnapshots(operationIDParts[0]).Get(
context.Background(), operationIDParts[1], metav1.GetOptions{})
if err != nil {
p.Log.Errorf("error getting volumesnapshot %s/%s: %s", operationIDParts[0], operationIDParts[1], err.Error())
return progress, errors.WithStack(err)
Expand All @@ -241,13 +246,40 @@ func (p *VolumeSnapshotBackupItemAction) Progress(operationID string, backup *ve
}

if boolptr.IsSetToTrue(vs.Status.ReadyToUse) {
progress.Completed = true
p.Log.Debugf("VolumeSnapshot %s/%s is ReadyToUse. Continue on querying corresponding VolumeSnapshotContent.",
vs.Namespace, vs.Name)
} else if vs.Status.Error != nil {
errorMessage := ""
if vs.Status.Error.Message != nil {
errorMessage = *vs.Status.Error.Message
}
p.Log.Warnf("VolumeSnapshot has a temporary error %s. Snapshot controller will retry later.", errorMessage)

return progress, nil
}

if vs.Status != nil && vs.Status.BoundVolumeSnapshotContentName != nil {
vsc, err := snapshotClient.SnapshotV1().VolumeSnapshotContents().Get(
context.Background(), *vs.Status.BoundVolumeSnapshotContentName, metav1.GetOptions{})
if err != nil {
p.Log.Errorf("error getting VolumeSnapshotContent %s: %s", *vs.Status.BoundVolumeSnapshotContentName, err.Error())
return progress, errors.WithStack(err)
}

if vsc.Status == nil {
p.Log.Debugf("VolumeSnapshotContent %s has an empty Status. Skip progress update.", vsc.Name)
return progress, nil
}

if boolptr.IsSetToTrue(vsc.Status.ReadyToUse) {
progress.Completed = true
} else if vsc.Status.Error != nil {
progress.Completed = true
if vsc.Status.Error.Message != nil {
progress.Err = *vsc.Status.Error.Message
}
p.Log.Warnf("VolumeSnapshotContent meets an error %s.", progress.Err)
}
}

return progress, nil
Expand Down
71 changes: 2 additions & 69 deletions internal/backup/volumesnapshotcontent_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,18 @@ limitations under the License.
package backup

import (
"context"
"fmt"
"strings"
"time"

snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v7/apis/volumesnapshot/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/vmware-tanzu/velero-plugin-for-csi/internal/util"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
biav2 "github.com/vmware-tanzu/velero/pkg/plugin/velero/backupitemaction/v2"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
)

// VolumeSnapshotContentBackupItemAction is a backup item action plugin to backup
Expand Down Expand Up @@ -91,75 +83,16 @@ func (p *VolumeSnapshotContentBackupItemAction) Execute(item runtime.Unstructure
return nil, nil, "", nil, errors.WithStack(err)
}

backupOnGoing := snapCont.GetLabels()[velerov1api.BackupNameLabel] == label.GetValidName(backup.Name)
operationID := ""
var itemToUpdate []velero.ResourceIdentifier

// Only return Async operation for VSC created for this backup.
if backupOnGoing {
// The operationID is of the form <volumesnapshotcontent-name>/<started-time>
operationID = snapCont.Name + "/" + time.Now().Format(time.RFC3339)
itemToUpdate = []velero.ResourceIdentifier{
{
GroupResource: kuberesource.VolumeSnapshotContents,
Name: snapCont.Name,
},
}

}

p.Log.Infof("Returning from VolumeSnapshotContentBackupItemAction with %d additionalItems to backup", len(additionalItems))
return &unstructured.Unstructured{Object: snapContMap}, additionalItems, operationID, itemToUpdate, nil
return &unstructured.Unstructured{Object: snapContMap}, additionalItems, "", nil, nil
}

func (p *VolumeSnapshotContentBackupItemAction) Name() string {
return "VolumeSnapshotContentBackupItemAction"
}

func (p *VolumeSnapshotContentBackupItemAction) Progress(operationID string, backup *velerov1api.Backup) (velero.OperationProgress, error) {
progress := velero.OperationProgress{}
if operationID == "" {
return progress, biav2.InvalidOperationIDError(operationID)
}

// The operationId is of the form <volumesnapshotcontent-name>/<started-time>
operationsIDParts := strings.Split(operationID, "/")
if len(operationsIDParts) != 2 {
p.Log.WithField("operationID", operationID).Error("Invalid operationID")
return progress, biav2.InvalidOperationIDError(operationID)
}
var err error
if progress.Started, err = time.Parse(time.RFC3339, operationsIDParts[1]); err != nil {
p.Log.Errorf("error parsing operationID's StartedTime part into time %s: %s", operationID, err.Error())
return progress, errors.WithStack(fmt.Errorf("fail to parse StartedTime: %s", err.Error()))
}

_, snapshotClient, err := util.GetClients()
if err != nil {
return progress, errors.WithStack(err)
}

vsc, err := snapshotClient.SnapshotV1().VolumeSnapshotContents().Get(context.Background(), operationsIDParts[0], metav1.GetOptions{})
if err != nil {
p.Log.Errorf("error getting volumesnapshotcontent %s: %s", operationsIDParts[0], err.Error())
return progress, errors.WithStack(err)
}

if vsc.Status == nil {
p.Log.Debugf("VolumeSnapshotContent %s has an empty Status. Skip progress update.", vsc.Name)
return progress, nil
}

if boolptr.IsSetToTrue(vsc.Status.ReadyToUse) {
progress.Completed = true
} else if vsc.Status.Error != nil {
progress.Completed = true
if vsc.Status.Error.Message != nil {
progress.Err = *vsc.Status.Error.Message
}
}

return progress, nil
return velero.OperationProgress{}, nil
}

func (p *VolumeSnapshotContentBackupItemAction) Cancel(operationID string, backup *velerov1api.Backup) error {
Expand Down
7 changes: 5 additions & 2 deletions internal/restore/pvc_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,10 @@ func getDataUploadResult(ctx context.Context, restore *velerov1api.Restore, pvc

func getDataDownload(ctx context.Context, namespace string, operationID string, crClient crclient.Client) (*velerov2alpha1.DataDownload, error) {
dataDownloadList := new(velerov2alpha1.DataDownloadList)
err := crClient.List(ctx, dataDownloadList, &crclient.ListOptions{LabelSelector: labels.SelectorFromSet(map[string]string{velerov1api.AsyncOperationIDLabel: operationID})})
err := crClient.List(ctx, dataDownloadList, &crclient.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{velerov1api.AsyncOperationIDLabel: operationID}),
Namespace: namespace,
})
if err != nil {
return nil, errors.Wrap(err, "fail to list DataDownload")
}
Expand All @@ -356,7 +359,7 @@ func cancelDataDownload(ctx context.Context, crClient crclient.Client,
updatedDataDownload := dataDownload.DeepCopy()
updatedDataDownload.Spec.Cancel = true

err := crClient.Patch(context.Background(), updatedDataDownload, crclient.MergeFrom(dataDownload))
err := crClient.Patch(ctx, updatedDataDownload, crclient.MergeFrom(dataDownload))
return err
}

Expand Down
19 changes: 19 additions & 0 deletions internal/restore/pvc_action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,25 @@ func TestProgress(t *testing.T) {
operationID: "testing",
expectedErr: "didn't find DataDownload",
},
{
name: "DataDownload is not in the expected namespace",
restore: builder.ForRestore("velero", "test").Result(),
dataDownload: &velerov2alpha1.DataDownload{
TypeMeta: metav1.TypeMeta{
Kind: "DataUpload",
APIVersion: velerov2alpha1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "invalid-namespace",
Name: "testing",
Labels: map[string]string{
velerov1api.AsyncOperationIDLabel: "testing",
},
},
},
operationID: "testing",
expectedErr: "didn't find DataDownload",
},
{
name: "DataUpload is found",
restore: builder.ForRestore("velero", "test").Result(),
Expand Down
14 changes: 7 additions & 7 deletions internal/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ func GetVolumeSnapshotContentForVolumeSnapshot(volSnap *snapshotv1api.VolumeSnap
interval := 5 * time.Second
var snapshotContent *snapshotv1api.VolumeSnapshotContent

err := wait.PollImmediate(interval, timeout, func() (bool, error) {
vs, err := snapshotClient.VolumeSnapshots(volSnap.Namespace).Get(context.TODO(), volSnap.Name, metav1.GetOptions{})
err := wait.PollUntilContextTimeout(context.Background(), interval, timeout, true, func(ctx context.Context) (bool, error) {
vs, err := snapshotClient.VolumeSnapshots(volSnap.Namespace).Get(ctx, volSnap.Name, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, fmt.Sprintf("failed to get volumesnapshot %s/%s", volSnap.Namespace, volSnap.Name))
}
Expand All @@ -247,7 +247,7 @@ func GetVolumeSnapshotContentForVolumeSnapshot(volSnap *snapshotv1api.VolumeSnap
return false, nil
}

snapshotContent, err = snapshotClient.VolumeSnapshotContents().Get(context.TODO(), *vs.Status.BoundVolumeSnapshotContentName, metav1.GetOptions{})
snapshotContent, err = snapshotClient.VolumeSnapshotContents().Get(ctx, *vs.Status.BoundVolumeSnapshotContentName, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, fmt.Sprintf("failed to get volumesnapshotcontent %s for volumesnapshot %s/%s", *vs.Status.BoundVolumeSnapshotContentName, vs.Namespace, vs.Name))
}
Expand All @@ -267,7 +267,7 @@ func GetVolumeSnapshotContentForVolumeSnapshot(volSnap *snapshotv1api.VolumeSnap
})

if err != nil {
if err == wait.ErrWaitTimeout {
if err == wait.ErrorInterrupted(errors.New("timed out waiting for the condition")) {
if snapshotContent != nil && snapshotContent.Status != nil && snapshotContent.Status.Error != nil {
log.Errorf("Timed out awaiting reconciliation of volumesnapshot, Volumesnapshotcontent %s has error: %v", snapshotContent.Name, *snapshotContent.Status.Error.Message)
return nil, errors.Errorf("CSI got timed out with error: %v", *snapshotContent.Status.Error.Message)
Expand Down Expand Up @@ -418,7 +418,7 @@ func CleanupVolumeSnapshot(volSnap *snapshotv1api.VolumeSnapshot, snapshotClient
}
}

// deleteVolumeSnapshot is called by deleteVolumeSnapshots and handles the single VolumeSnapshot
// DeleteVolumeSnapshot is called by deleteVolumeSnapshots and handles the single VolumeSnapshot
// instance.
func DeleteVolumeSnapshot(vs snapshotv1api.VolumeSnapshot, vsc snapshotv1api.VolumeSnapshotContent,
backup *velerov1api.Backup, snapshotClient snapshotter.SnapshotV1Interface, logger logrus.FieldLogger) {
Expand Down Expand Up @@ -483,8 +483,8 @@ func recreateVolumeSnapshotContent(vsc snapshotv1api.VolumeSnapshotContent, back
}

// Check VolumeSnapshotContents is already deleted, before re-creating it.
err = wait.PollImmediate(interval, timeout, func() (bool, error) {
_, err := snapshotClient.VolumeSnapshotContents().Get(context.TODO(), vsc.Name, metav1.GetOptions{})
err = wait.PollUntilContextTimeout(context.Background(), interval, timeout, true, func(ctx context.Context) (bool, error) {
_, err := snapshotClient.VolumeSnapshotContents().Get(ctx, vsc.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return true, nil
Expand Down

0 comments on commit 36e5755

Please sign in to comment.