diff --git a/changelogs/unreleased/6308-Lyndon-Li b/changelogs/unreleased/6308-Lyndon-Li new file mode 100644 index 0000000000..4a80f3922a --- /dev/null +++ b/changelogs/unreleased/6308-Lyndon-Li @@ -0,0 +1 @@ +Add the code for data mover backup expose \ No newline at end of file diff --git a/pkg/exposer/csi_snapshot.go b/pkg/exposer/csi_snapshot.go new file mode 100644 index 0000000000..d6482f2a96 --- /dev/null +++ b/pkg/exposer/csi_snapshot.go @@ -0,0 +1,385 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package exposer + +import ( + "context" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + "github.com/vmware-tanzu/velero/pkg/util/boolptr" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" + + "github.com/vmware-tanzu/velero/pkg/util/csi" + "github.com/vmware-tanzu/velero/pkg/util/kube" + + snapshotter "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/typed/volumesnapshot/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + apierrors "k8s.io/apimachinery/pkg/api/errors" +) + +// CSISnapshotExposeParam define the input param for Expose of CSI snapshots +type CSISnapshotExposeParam struct { + // SourceNamespace is the original namespace of the volume that the snapshot is taken for + SourceNamespace string + + // AccessMode defines the mode to access the snapshot + AccessMode string + + // StorageClass is the storage class of the volume that the snapshot is taken for + StorageClass string + + // HostingPodLabels is the labels that are going to apply to the hosting pod + HostingPodLabels map[string]string +} + +// CSISnapshotExposeWaitParam define the input param for WaitExposed of CSI snapshots +type CSISnapshotExposeWaitParam struct { + // NodeClient is the client that is used to find the hosting pod + NodeClient client.Client + NodeName string +} + +// NewCSISnapshotExposer create a new instance of CSI snapshot exposer +func NewCSISnapshotExposer(kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface, log logrus.FieldLogger) SnapshotExposer { + return &csiSnapshotExposer{ + kubeClient: kubeClient, + csiSnapshotClient: csiSnapshotClient, + log: log, + } +} + +type csiSnapshotExposer struct { + kubeClient kubernetes.Interface + csiSnapshotClient snapshotter.SnapshotV1Interface + log logrus.FieldLogger +} + +func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, snapshotName string, timeout time.Duration, param interface{}) error { + csiExposeParam := param.(*CSISnapshotExposeParam) + + curLog := e.log.WithFields(logrus.Fields{ + "owner": ownerObject.Name, + }) + + curLog.Info("Exposing CSI snapshot") + + volumeSnapshot, err := csi.WaitVolumeSnapshotReady(ctx, e.csiSnapshotClient, snapshotName, csiExposeParam.SourceNamespace, timeout) + if err != nil { + return errors.Wrapf(err, "error wait volume snapshot ready") + } + + curLog.Info("Volumesnapshot is ready") + + vsc, err := csi.GetVolumeSnapshotContentForVolumeSnapshot(volumeSnapshot, e.csiSnapshotClient) + if err != nil { + return errors.Wrap(err, "error to get volume snapshot content") + } + + curLog.WithField("vsc name", vsc.Name).WithField("vs name", volumeSnapshot.Name).Infof("Got VSC from VS in namespace %s", volumeSnapshot.Namespace) + + retained, err := csi.RetainVSC(ctx, e.csiSnapshotClient, vsc) + if err != nil { + return errors.Wrap(err, "error to retain volume snapshot content") + } + + curLog.WithField("vsc name", vsc.Name).WithField("retained", (retained != nil)).Info("Finished to retain VSC") + + defer func() { + if retained != nil { + csi.DeleteVolumeSnapshotContentIfAny(ctx, e.csiSnapshotClient, retained.Name, curLog) + } + }() + + err = csi.EnsureDeleteVS(ctx, e.csiSnapshotClient, volumeSnapshot.Name, volumeSnapshot.Namespace, timeout) + if err != nil { + return errors.Wrap(err, "error to delete volume snapshot") + } + + curLog.WithField("vs name", volumeSnapshot.Name).Infof("VS is deleted in namespace %s", volumeSnapshot.Namespace) + + err = csi.EnsureDeleteVSC(ctx, e.csiSnapshotClient, vsc.Name, timeout) + if err != nil { + return errors.Wrap(err, "error to delete volume snapshot content") + } + + curLog.WithField("vsc name", vsc.Name).Infof("VSC is deleted") + retained = nil + + backupVS, err := e.createBackupVS(ctx, ownerObject, volumeSnapshot) + if err != nil { + return errors.Wrap(err, "error to create backup volume snapshot") + } + + curLog.WithField("vs name", backupVS.Name).Infof("Backup VS is created from %s/%s", volumeSnapshot.Namespace, volumeSnapshot.Name) + + defer func() { + if err != nil { + csi.DeleteVolumeSnapshotIfAny(ctx, e.csiSnapshotClient, backupVS.Name, backupVS.Namespace, curLog) + } + }() + + backupVSC, err := e.createBackupVSC(ctx, ownerObject, vsc, backupVS) + if err != nil { + return errors.Wrap(err, "error to create backup volume snapshot content") + } + + curLog.WithField("vsc name", backupVSC.Name).Infof("Backup VSC is created from %s", vsc.Name) + + backupPVC, err := e.createBackupPVC(ctx, ownerObject, backupVS.Name, csiExposeParam.StorageClass, csiExposeParam.AccessMode, *volumeSnapshot.Status.RestoreSize) + if err != nil { + return errors.Wrap(err, "error to create backup pvc") + } + + curLog.WithField("pvc name", backupPVC.Name).Info("Backup PVC is created") + + defer func() { + if err != nil { + kube.DeletePVCIfAny(ctx, e.kubeClient.CoreV1(), backupPVC.Name, backupPVC.Namespace, curLog) + } + }() + + backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.HostingPodLabels) + if err != nil { + return errors.Wrap(err, "error to create backup pod") + } + + curLog.WithField("pod name", backupPod.Name).Info("Backup pod is created") + + defer func() { + if err != nil { + kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), backupPod.Name, backupPod.Namespace, curLog) + } + }() + + return nil +} + +func (e *csiSnapshotExposer) GetExposed(ctx context.Context, ownerObject corev1.ObjectReference, timeout time.Duration, param interface{}) (*ExposeResult, error) { + exposeWaitParam := param.(*CSISnapshotExposeWaitParam) + + backupPodName := ownerObject.Name + backupPVCName := ownerObject.Name + + curLog := e.log.WithFields(logrus.Fields{ + "owner": ownerObject.Name, + }) + + pod := &corev1.Pod{} + err := exposeWaitParam.NodeClient.Get(ctx, types.NamespacedName{ + Namespace: ownerObject.Namespace, + Name: backupPodName, + }, pod) + if err != nil { + if apierrors.IsNotFound(err) { + curLog.WithField("backup pod", backupPodName).Errorf("Backup pod is not running in the current node %s", exposeWaitParam.NodeName) + return nil, nil + } else { + return nil, errors.Wrapf(err, "error to get backup pod %s", backupPodName) + } + } + + curLog.WithField("pod", pod.Name).Infof("Backup pod is in running state in node %s", pod.Spec.NodeName) + + _, err = kube.WaitPVCBound(ctx, e.kubeClient.CoreV1(), e.kubeClient.CoreV1(), backupPVCName, ownerObject.Namespace, timeout) + if err != nil { + return nil, errors.Wrapf(err, "error to wait backup PVC bound, %s", backupPVCName) + } + + curLog.WithField("backup pvc", backupPVCName).Info("Backup PVC is bound") + + return &ExposeResult{ByPod: ExposeByPod{HostingPod: pod, PVC: backupPVCName}}, nil +} + +func (e *csiSnapshotExposer) CleanUp(ctx context.Context, ownerObject corev1.ObjectReference, vsName string, sourceNamespace string) { + backupPodName := ownerObject.Name + backupPVCName := ownerObject.Name + backupVSName := ownerObject.Name + + kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), backupPodName, ownerObject.Namespace, e.log) + kube.DeletePVCIfAny(ctx, e.kubeClient.CoreV1(), backupPVCName, ownerObject.Namespace, e.log) + csi.DeleteVolumeSnapshotIfAny(ctx, e.csiSnapshotClient, backupVSName, ownerObject.Namespace, e.log) + csi.DeleteVolumeSnapshotIfAny(ctx, e.csiSnapshotClient, vsName, sourceNamespace, e.log) +} + +func getVolumeModeByAccessMode(accessMode string) (corev1.PersistentVolumeMode, error) { + if accessMode == AccessModeFileSystem { + return corev1.PersistentVolumeFilesystem, nil + } else { + return "", errors.Errorf("unsupported access mode %s", accessMode) + } +} + +func (e *csiSnapshotExposer) createBackupVS(ctx context.Context, ownerObject corev1.ObjectReference, snapshotVS *snapshotv1api.VolumeSnapshot) (*snapshotv1api.VolumeSnapshot, error) { + backupVSName := ownerObject.Name + backupVSCName := ownerObject.Name + + vs := &snapshotv1api.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: backupVSName, + Namespace: ownerObject.Namespace, + // Don't add ownerReference to SnapshotBackup. + // The backupPVC should be deleted before backupVS, otherwise, the deletion of backupVS will fail since + // backupPVC has its dataSource referring to it + }, + Spec: snapshotv1api.VolumeSnapshotSpec{ + Source: snapshotv1api.VolumeSnapshotSource{ + VolumeSnapshotContentName: &backupVSCName, + }, + VolumeSnapshotClassName: snapshotVS.Spec.VolumeSnapshotClassName, + }, + } + + return e.csiSnapshotClient.VolumeSnapshots(vs.Namespace).Create(ctx, vs, metav1.CreateOptions{}) +} + +func (e *csiSnapshotExposer) createBackupVSC(ctx context.Context, ownerObject corev1.ObjectReference, snapshotVSC *snapshotv1api.VolumeSnapshotContent, vs *snapshotv1api.VolumeSnapshot) (*snapshotv1api.VolumeSnapshotContent, error) { + backupVSCName := ownerObject.Name + + vsc := &snapshotv1api.VolumeSnapshotContent{ + ObjectMeta: metav1.ObjectMeta{ + Name: backupVSCName, + }, + Spec: snapshotv1api.VolumeSnapshotContentSpec{ + VolumeSnapshotRef: corev1.ObjectReference{ + Name: vs.Name, + Namespace: vs.Namespace, + UID: vs.UID, + ResourceVersion: vs.ResourceVersion, + }, + Source: snapshotv1api.VolumeSnapshotContentSource{ + SnapshotHandle: snapshotVSC.Status.SnapshotHandle, + }, + DeletionPolicy: snapshotVSC.Spec.DeletionPolicy, + Driver: snapshotVSC.Spec.Driver, + VolumeSnapshotClassName: snapshotVSC.Spec.VolumeSnapshotClassName, + }, + } + + return e.csiSnapshotClient.VolumeSnapshotContents().Create(ctx, vsc, metav1.CreateOptions{}) +} + +func (e *csiSnapshotExposer) createBackupPVC(ctx context.Context, ownerObject corev1.ObjectReference, backupVS string, storageClass string, accessMode string, resource resource.Quantity) (*corev1.PersistentVolumeClaim, error) { + backupVCName := ownerObject.Name + + volumeMode, err := getVolumeModeByAccessMode(accessMode) + if err != nil { + return nil, err + } + + dataSource := &corev1.TypedLocalObjectReference{ + APIGroup: &snapshotv1api.SchemeGroupVersion.Group, + Kind: "VolumeSnapshot", + Name: backupVS, + } + + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ownerObject.Namespace, + Name: backupVCName, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: ownerObject.APIVersion, + Kind: ownerObject.Kind, + Name: ownerObject.Name, + UID: ownerObject.UID, + Controller: boolptr.True(), + }, + }, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{ + corev1.ReadWriteOnce, + }, + StorageClassName: &storageClass, + VolumeMode: &volumeMode, + DataSource: dataSource, + DataSourceRef: dataSource, + + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: resource, + }, + }, + }, + } + + created, err := e.kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{}) + if err != nil { + return nil, errors.Wrap(err, "error to create pvc") + } + + return created, err +} + +func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject corev1.ObjectReference, backupPVC *corev1.PersistentVolumeClaim, label map[string]string) (*corev1.Pod, error) { + podName := ownerObject.Name + + var gracePeriod int64 = 0 + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: ownerObject.Namespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: ownerObject.APIVersion, + Kind: ownerObject.Kind, + Name: ownerObject.Name, + UID: ownerObject.UID, + Controller: boolptr.True(), + }, + }, + Labels: label, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: podName, + Image: "alpine:latest", + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{"sleep", "infinity"}, + VolumeMounts: []corev1.VolumeMount{{ + Name: backupPVC.Name, + MountPath: "/" + backupPVC.Name, + }}, + }, + }, + TerminationGracePeriodSeconds: &gracePeriod, + Volumes: []corev1.Volume{{ + Name: backupPVC.Name, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: backupPVC.Name, + }, + }, + }}, + }, + } + + return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{}) +} diff --git a/pkg/exposer/csi_snapshot_test.go b/pkg/exposer/csi_snapshot_test.go new file mode 100644 index 0000000000..20a4504410 --- /dev/null +++ b/pkg/exposer/csi_snapshot_test.go @@ -0,0 +1,298 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package exposer + +import ( + "context" + "testing" + "time" + + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" + snapshotFake "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + clientTesting "k8s.io/client-go/testing" + + corev1 "k8s.io/api/core/v1" + + velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerotest "github.com/vmware-tanzu/velero/pkg/test" + "github.com/vmware-tanzu/velero/pkg/util/boolptr" +) + +type reactor struct { + verb string + resource string + reactorFunc clientTesting.ReactionFunc +} + +func TestExpose(t *testing.T) { + vscName := "fake-vsc" + backup := &velerov1.Backup{ + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov1.SchemeGroupVersion.String(), + Kind: "Backup", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-backup", + UID: "fake-uid", + }, + } + vsObject := &snapshotv1api.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-vs", + Namespace: "fake-ns", + }, + Status: &snapshotv1api.VolumeSnapshotStatus{ + BoundVolumeSnapshotContentName: &vscName, + ReadyToUse: boolptr.True(), + RestoreSize: &resource.Quantity{}, + }, + } + + var restoreSize int64 + vscObj := &snapshotv1api.VolumeSnapshotContent{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-vsc", + }, + Spec: snapshotv1api.VolumeSnapshotContentSpec{ + DeletionPolicy: snapshotv1api.VolumeSnapshotContentDelete, + }, + Status: &snapshotv1api.VolumeSnapshotContentStatus{ + RestoreSize: &restoreSize, + }, + } + + tests := []struct { + name string + snapshotClientObj []runtime.Object + kubeClientObj []runtime.Object + ownerBackup *velerov1.Backup + snapshotName string + exposeParam CSISnapshotExposeParam + snapReactors []reactor + kubeReactors []reactor + err string + }{ + { + name: "wait vs ready fail", + snapshotName: "fake-vs", + ownerBackup: backup, + err: "error wait volume snapshot ready: error to get volumesnapshot /fake-vs: volumesnapshots.snapshot.storage.k8s.io \"fake-vs\" not found", + }, + { + name: "get vsc fail", + ownerBackup: backup, + snapshotName: "fake-vs", + exposeParam: CSISnapshotExposeParam{ + SourceNamespace: "fake-ns", + }, + snapshotClientObj: []runtime.Object{ + vsObject, + }, + err: "error to get volume snapshot content: error getting volume snapshot content from API: volumesnapshotcontents.snapshot.storage.k8s.io \"fake-vsc\" not found", + }, + { + name: "delete vs fail", + ownerBackup: backup, + snapshotName: "fake-vs", + exposeParam: CSISnapshotExposeParam{ + SourceNamespace: "fake-ns", + }, + snapshotClientObj: []runtime.Object{ + vsObject, + vscObj, + }, + snapReactors: []reactor{ + { + verb: "delete", + resource: "volumesnapshots", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-delete-error") + }, + }, + }, + err: "error to delete volume snapshot: error to delete volume snapshot: fake-delete-error", + }, + { + name: "delete vsc fail", + ownerBackup: backup, + snapshotName: "fake-vs", + exposeParam: CSISnapshotExposeParam{ + SourceNamespace: "fake-ns", + }, + snapshotClientObj: []runtime.Object{ + vsObject, + vscObj, + }, + snapReactors: []reactor{ + { + verb: "delete", + resource: "volumesnapshotcontents", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-delete-error") + }, + }, + }, + err: "error to delete volume snapshot content: error to delete volume snapshot content: fake-delete-error", + }, + { + name: "create backup vs fail", + ownerBackup: backup, + snapshotName: "fake-vs", + exposeParam: CSISnapshotExposeParam{ + SourceNamespace: "fake-ns", + }, + snapshotClientObj: []runtime.Object{ + vsObject, + vscObj, + }, + snapReactors: []reactor{ + { + verb: "create", + resource: "volumesnapshots", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-create-error") + }, + }, + }, + err: "error to create backup volume snapshot: fake-create-error", + }, + { + name: "create backup vsc fail", + ownerBackup: backup, + snapshotName: "fake-vs", + exposeParam: CSISnapshotExposeParam{ + SourceNamespace: "fake-ns", + }, + snapshotClientObj: []runtime.Object{ + vsObject, + vscObj, + }, + snapReactors: []reactor{ + { + verb: "create", + resource: "volumesnapshotcontents", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-create-error") + }, + }, + }, + err: "error to create backup volume snapshot content: fake-create-error", + }, + { + name: "create backup pvc fail, invalid access mode", + ownerBackup: backup, + snapshotName: "fake-vs", + exposeParam: CSISnapshotExposeParam{ + SourceNamespace: "fake-ns", + AccessMode: "fake-mode", + }, + snapshotClientObj: []runtime.Object{ + vsObject, + vscObj, + }, + err: "error to create backup pvc: unsupported access mode fake-mode", + }, + { + name: "create backup pvc fail", + ownerBackup: backup, + snapshotName: "fake-vs", + exposeParam: CSISnapshotExposeParam{ + SourceNamespace: "fake-ns", + AccessMode: AccessModeFileSystem, + }, + snapshotClientObj: []runtime.Object{ + vsObject, + vscObj, + }, + kubeReactors: []reactor{ + { + verb: "create", + resource: "persistentvolumeclaims", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-create-error") + }, + }, + }, + err: "error to create backup pvc: error to create pvc: fake-create-error", + }, + { + name: "create backup pod fail", + ownerBackup: backup, + snapshotName: "fake-vs", + exposeParam: CSISnapshotExposeParam{ + SourceNamespace: "fake-ns", + AccessMode: AccessModeFileSystem, + }, + snapshotClientObj: []runtime.Object{ + vsObject, + vscObj, + }, + kubeReactors: []reactor{ + { + verb: "create", + resource: "pods", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-create-error") + }, + }, + }, + err: "error to create backup pod: fake-create-error", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeSnapshotClient := snapshotFake.NewSimpleClientset(test.snapshotClientObj...) + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) + + for _, reactor := range test.snapReactors { + fakeSnapshotClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc) + } + + for _, reactor := range test.kubeReactors { + fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc) + } + + exposer := csiSnapshotExposer{ + kubeClient: fakeKubeClient, + csiSnapshotClient: fakeSnapshotClient.SnapshotV1(), + log: velerotest.NewLogger(), + } + + var ownerObject corev1.ObjectReference + if test.ownerBackup != nil { + ownerObject = corev1.ObjectReference{ + Kind: test.ownerBackup.Kind, + Namespace: test.ownerBackup.Namespace, + Name: test.ownerBackup.Name, + UID: test.ownerBackup.UID, + APIVersion: test.ownerBackup.APIVersion, + } + } + + err := exposer.Expose(context.Background(), ownerObject, test.snapshotName, time.Millisecond, &test.exposeParam) + assert.EqualError(t, err, test.err) + }) + } +} diff --git a/pkg/exposer/snapshot.go b/pkg/exposer/snapshot.go new file mode 100644 index 0000000000..505cd00790 --- /dev/null +++ b/pkg/exposer/snapshot.go @@ -0,0 +1,38 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package exposer + +import ( + "context" + "time" + + corev1 "k8s.io/api/core/v1" +) + +// SnapshotExposer is the interfaces for a snapshot exposer +type SnapshotExposer interface { + // Expose starts the process to expose a snapshot, the expose process may take long time + Expose(context.Context, corev1.ObjectReference, string, time.Duration, interface{}) error + + // GetExposed polls the status of the expose. + // If the expose is accessible by the current caller, it waits the expose ready and returns the expose result. + // Otherwise, it returns nil as the expose result without an error. + GetExposed(context.Context, corev1.ObjectReference, time.Duration, interface{}) (*ExposeResult, error) + + // CleanUp cleans up any objects generated during the snapshot expose + CleanUp(context.Context, corev1.ObjectReference, string, string) +} diff --git a/pkg/exposer/types.go b/pkg/exposer/types.go new file mode 100644 index 0000000000..91fe0d0666 --- /dev/null +++ b/pkg/exposer/types.go @@ -0,0 +1,37 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package exposer + +import ( + corev1 "k8s.io/api/core/v1" +) + +const ( + AccessModeFileSystem = "by-file-system" +) + +// ExposeResult defines the result of expose. +// Varying from the type of the expose, the result may be different. +type ExposeResult struct { + ByPod ExposeByPod +} + +// ExposeByPod defines the result for the expose method that a hosting pod is created +type ExposeByPod struct { + HostingPod *corev1.Pod + PVC string +} diff --git a/pkg/util/csi/volume_snapshot.go b/pkg/util/csi/volume_snapshot.go new file mode 100644 index 0000000000..c6d665e746 --- /dev/null +++ b/pkg/util/csi/volume_snapshot.go @@ -0,0 +1,194 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi + +import ( + "context" + "encoding/json" + "fmt" + "time" + + jsonpatch "github.com/evanphx/json-patch" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/vmware-tanzu/velero/pkg/util/boolptr" + + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" + snapshotter "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/typed/volumesnapshot/v1" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + apierrors "k8s.io/apimachinery/pkg/api/errors" +) + +const ( + waitInternal = 2 * time.Second +) + +// WaitVolumeSnapshotReady waits a VS to become ready to use until the timeout reaches +func WaitVolumeSnapshotReady(ctx context.Context, snapshotClient snapshotter.SnapshotV1Interface, + volumeSnapshot string, volumeSnapshotNS string, timeout time.Duration) (*snapshotv1api.VolumeSnapshot, error) { + var updated *snapshotv1api.VolumeSnapshot + + err := wait.PollImmediate(waitInternal, timeout, func() (bool, error) { + tmpVS, err := snapshotClient.VolumeSnapshots(volumeSnapshotNS).Get(ctx, volumeSnapshot, metav1.GetOptions{}) + if err != nil { + return false, errors.Wrapf(err, fmt.Sprintf("error to get volumesnapshot %s/%s", volumeSnapshotNS, volumeSnapshot)) + } + + if tmpVS.Status == nil || tmpVS.Status.BoundVolumeSnapshotContentName == nil || !boolptr.IsSetToTrue(tmpVS.Status.ReadyToUse) || tmpVS.Status.RestoreSize == nil { + return false, nil + } + + updated = tmpVS + return true, nil + }) + + return updated, err +} + +// GetVolumeSnapshotContentForVolumeSnapshot returns the volumesnapshotcontent object associated with the volumesnapshot +func GetVolumeSnapshotContentForVolumeSnapshot(volSnap *snapshotv1api.VolumeSnapshot, snapshotClient snapshotter.SnapshotV1Interface) (*snapshotv1api.VolumeSnapshotContent, error) { + if volSnap.Status == nil || volSnap.Status.BoundVolumeSnapshotContentName == nil { + return nil, errors.Errorf("invalid snapshot info in volume snapshot %s", volSnap.Name) + } + + vsc, err := snapshotClient.VolumeSnapshotContents().Get(context.TODO(), *volSnap.Status.BoundVolumeSnapshotContentName, metav1.GetOptions{}) + if err != nil { + return nil, errors.Wrap(err, "error getting volume snapshot content from API") + } + + return vsc, nil +} + +// RetainVSC updates the VSC's deletion policy to Retain and return the update VSC +func RetainVSC(ctx context.Context, snapshotClient snapshotter.SnapshotV1Interface, + vsc *snapshotv1api.VolumeSnapshotContent) (*snapshotv1api.VolumeSnapshotContent, error) { + if vsc.Spec.DeletionPolicy == snapshotv1api.VolumeSnapshotContentRetain { + return nil, nil + } + origBytes, err := json.Marshal(vsc) + if err != nil { + return nil, errors.Wrap(err, "error marshaling original VSC") + } + + updated := vsc.DeepCopy() + updated.Spec.DeletionPolicy = snapshotv1api.VolumeSnapshotContentRetain + + updatedBytes, err := json.Marshal(updated) + if err != nil { + return nil, errors.Wrap(err, "error marshaling updated VSC") + } + + patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes) + if err != nil { + return nil, errors.Wrap(err, "error creating json merge patch for VSC") + } + + retained, err := snapshotClient.VolumeSnapshotContents().Patch(ctx, vsc.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}) + if err != nil { + return nil, errors.Wrap(err, "error patching VSC") + } + + return retained, nil +} + +// DeleteVolumeSnapshotContentIfAny deletes a VSC by name if it exists, and log an error when the deletion fails +func DeleteVolumeSnapshotContentIfAny(ctx context.Context, snapshotClient snapshotter.SnapshotV1Interface, vscName string, log logrus.FieldLogger) { + vsc, err := snapshotClient.VolumeSnapshotContents().Get(ctx, vscName, metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + log.WithError(err).Warnf("Abort deleting VSC, it doesn't exist %s", vscName) + } + } else { + err = snapshotClient.VolumeSnapshotContents().Delete(ctx, vsc.Name, metav1.DeleteOptions{}) + if err != nil { + log.WithError(err).Warnf("Failed to delete volume snapshot content %s", vsc.Name) + } + } +} + +// EnsureDeleteVS asserts the existence of a VS by name, deletes it and waits for its disappearance and returns errors on any failure +func EnsureDeleteVS(ctx context.Context, snapshotClient snapshotter.SnapshotV1Interface, + vsName string, vsNamespace string, timeout time.Duration) error { + err := snapshotClient.VolumeSnapshots(vsNamespace).Delete(ctx, vsName, metav1.DeleteOptions{}) + if err != nil { + return errors.Wrap(err, "error to delete volume snapshot") + } + + err = wait.PollImmediate(waitInternal, timeout, func() (bool, error) { + _, err := snapshotClient.VolumeSnapshots(vsNamespace).Get(ctx, vsName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return true, nil + } + + return false, errors.Wrapf(err, fmt.Sprintf("error to get VolumeSnapshot %s", vsName)) + } + + return false, nil + }) + + if err != nil { + return errors.Wrapf(err, "error to assure VolumeSnapshot is deleted, %s", vsName) + } + + return nil +} + +// EnsureDeleteVSC asserts the existence of a VSC by name, deletes it and waits for its disappearance and returns errors on any failure +func EnsureDeleteVSC(ctx context.Context, snapshotClient snapshotter.SnapshotV1Interface, + vscName string, timeout time.Duration) error { + err := snapshotClient.VolumeSnapshotContents().Delete(ctx, vscName, metav1.DeleteOptions{}) + if err != nil { + return errors.Wrap(err, "error to delete volume snapshot content") + } + + err = wait.PollImmediate(waitInternal, timeout, func() (bool, error) { + _, err := snapshotClient.VolumeSnapshotContents().Get(ctx, vscName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return true, nil + } + + return false, errors.Wrapf(err, fmt.Sprintf("error to get VolumeSnapshotContent %s", vscName)) + } + + return false, nil + }) + + if err != nil { + return errors.Wrapf(err, "error to assure VolumeSnapshotContent is deleted, %s", vscName) + } + + return nil +} + +// DeleteVolumeSnapshotIfAny deletes a VS by name if it exists, and log an error when the deletion fails +func DeleteVolumeSnapshotIfAny(ctx context.Context, snapshotClient snapshotter.SnapshotV1Interface, vsName string, vsNamespace string, log logrus.FieldLogger) { + err := snapshotClient.VolumeSnapshots(vsNamespace).Delete(ctx, vsName, metav1.DeleteOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + log.WithError(err).Debugf("Abort deleting volume snapshot, it doesn't exist %s/%s", vsNamespace, vsName) + } else { + log.WithError(err).Errorf("Failed to delete volume snapshot %s/%s", vsNamespace, vsName) + } + } +} diff --git a/pkg/util/csi/volume_snapshot_test.go b/pkg/util/csi/volume_snapshot_test.go new file mode 100644 index 0000000000..9c34f746a5 --- /dev/null +++ b/pkg/util/csi/volume_snapshot_test.go @@ -0,0 +1,357 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi + +import ( + "context" + "errors" + "testing" + "time" + + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" + snapshotFake "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + clientTesting "k8s.io/client-go/testing" + + "github.com/vmware-tanzu/velero/pkg/util/boolptr" +) + +type reactor struct { + verb string + resource string + reactorFunc clientTesting.ReactionFunc +} + +func TestWaitVolumeSnapshotReady(t *testing.T) { + vscName := "fake-vsc" + vsObj := &snapshotv1api.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-vs", + Namespace: "fake-ns", + }, + Status: &snapshotv1api.VolumeSnapshotStatus{ + BoundVolumeSnapshotContentName: &vscName, + ReadyToUse: boolptr.True(), + RestoreSize: &resource.Quantity{}, + }, + } + + tests := []struct { + name string + clientObj []runtime.Object + vsName string + namespace string + err string + expect *snapshotv1api.VolumeSnapshot + }{ + { + name: "get vs error", + vsName: "fake-vs-1", + namespace: "fake-ns-1", + err: "error to get volumesnapshot fake-ns-1/fake-vs-1: volumesnapshots.snapshot.storage.k8s.io \"fake-vs-1\" not found", + }, + { + name: "vs status is nil", + vsName: "fake-vs", + namespace: "fake-ns", + clientObj: []runtime.Object{ + &snapshotv1api.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-vs", + Namespace: "fake-ns", + }, + }, + }, + err: "timed out waiting for the condition", + }, + { + name: "vsc is nil in status", + vsName: "fake-vs", + namespace: "fake-ns", + clientObj: []runtime.Object{ + &snapshotv1api.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-vs", + Namespace: "fake-ns", + }, + Status: &snapshotv1api.VolumeSnapshotStatus{}, + }, + }, + err: "timed out waiting for the condition", + }, + { + name: "ready to use is nil in status", + vsName: "fake-vs", + namespace: "fake-ns", + clientObj: []runtime.Object{ + &snapshotv1api.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-vs", + Namespace: "fake-ns", + }, + Status: &snapshotv1api.VolumeSnapshotStatus{ + BoundVolumeSnapshotContentName: &vscName, + }, + }, + }, + err: "timed out waiting for the condition", + }, + { + name: "restore size is nil in status", + vsName: "fake-vs", + namespace: "fake-ns", + clientObj: []runtime.Object{ + &snapshotv1api.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-vs", + Namespace: "fake-ns", + }, + Status: &snapshotv1api.VolumeSnapshotStatus{ + BoundVolumeSnapshotContentName: &vscName, + ReadyToUse: boolptr.True(), + }, + }, + }, + err: "timed out waiting for the condition", + }, + { + name: "success", + vsName: "fake-vs", + namespace: "fake-ns", + clientObj: []runtime.Object{ + vsObj, + }, + expect: vsObj, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeSnapshotClient := snapshotFake.NewSimpleClientset(test.clientObj...) + + vs, err := WaitVolumeSnapshotReady(context.Background(), fakeSnapshotClient.SnapshotV1(), test.vsName, test.namespace, time.Millisecond) + if err != nil { + assert.EqualError(t, err, test.err) + } else { + assert.NoError(t, err) + } + + assert.Equal(t, test.expect, vs) + }) + } +} + +func TestGetVolumeSnapshotContentForVolumeSnapshot(t *testing.T) { + vscName := "fake-vsc" + vsObj := &snapshotv1api.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-vs", + Namespace: "fake-ns", + }, + Status: &snapshotv1api.VolumeSnapshotStatus{ + BoundVolumeSnapshotContentName: &vscName, + ReadyToUse: boolptr.True(), + RestoreSize: &resource.Quantity{}, + }, + } + + vscObj := &snapshotv1api.VolumeSnapshotContent{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-vsc", + }, + } + + tests := []struct { + name string + snapshotObj *snapshotv1api.VolumeSnapshot + clientObj []runtime.Object + vsName string + namespace string + err string + expect *snapshotv1api.VolumeSnapshotContent + }{ + { + name: "vs status is nil", + vsName: "fake-vs", + namespace: "fake-ns", + snapshotObj: &snapshotv1api.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-vs", + Namespace: "fake-ns", + }, + }, + err: "invalid snapshot info in volume snapshot fake-vs", + }, + { + name: "vsc is nil in status", + vsName: "fake-vs", + namespace: "fake-ns", + snapshotObj: &snapshotv1api.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-vs", + Namespace: "fake-ns", + }, + }, + err: "invalid snapshot info in volume snapshot fake-vs", + }, + { + name: "get vsc fail", + vsName: "fake-vs", + namespace: "fake-ns", + snapshotObj: vsObj, + err: "error getting volume snapshot content from API: volumesnapshotcontents.snapshot.storage.k8s.io \"fake-vsc\" not found", + }, + { + name: "success", + vsName: "fake-vs", + namespace: "fake-ns", + snapshotObj: vsObj, + clientObj: []runtime.Object{vscObj}, + expect: vscObj, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeSnapshotClient := snapshotFake.NewSimpleClientset(test.clientObj...) + + vs, err := GetVolumeSnapshotContentForVolumeSnapshot(test.snapshotObj, fakeSnapshotClient.SnapshotV1()) + if err != nil { + assert.EqualError(t, err, test.err) + } else { + assert.NoError(t, err) + } + + assert.Equal(t, test.expect, vs) + }) + } +} + +func TestEnsureDeleteVS(t *testing.T) { + vsObj := &snapshotv1api.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-vs", + Namespace: "fake-ns", + }, + } + + tests := []struct { + name string + clientObj []runtime.Object + vsName string + namespace string + reactors []reactor + err string + }{ + { + name: "delete fail", + vsName: "fake-vs", + namespace: "fake-ns", + err: "error to delete volume snapshot: volumesnapshots.snapshot.storage.k8s.io \"fake-vs\" not found", + }, + { + name: "wait fail", + vsName: "fake-vs", + namespace: "fake-ns", + clientObj: []runtime.Object{vsObj}, + reactors: []reactor{ + { + verb: "get", + resource: "volumesnapshots", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-get-error") + }, + }, + }, + err: "error to assure VolumeSnapshot is deleted, fake-vs: error to get VolumeSnapshot fake-vs: fake-get-error", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeSnapshotClient := snapshotFake.NewSimpleClientset(test.clientObj...) + + for _, reactor := range test.reactors { + fakeSnapshotClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc) + } + + err := EnsureDeleteVS(context.Background(), fakeSnapshotClient.SnapshotV1(), test.vsName, test.namespace, time.Millisecond) + if err != nil { + assert.EqualError(t, err, test.err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestEnsureDeleteVSC(t *testing.T) { + vscObj := &snapshotv1api.VolumeSnapshotContent{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-vsc", + }, + } + + tests := []struct { + name string + clientObj []runtime.Object + reactors []reactor + vscName string + err string + }{ + { + name: "delete fail", + vscName: "fake-vsc", + err: "error to delete volume snapshot content: volumesnapshotcontents.snapshot.storage.k8s.io \"fake-vsc\" not found", + }, + { + name: "wait fail", + vscName: "fake-vsc", + clientObj: []runtime.Object{vscObj}, + reactors: []reactor{ + { + verb: "get", + resource: "volumesnapshotcontents", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-get-error") + }, + }, + }, + err: "error to assure VolumeSnapshotContent is deleted, fake-vsc: error to get VolumeSnapshotContent fake-vsc: fake-get-error", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeSnapshotClient := snapshotFake.NewSimpleClientset(test.clientObj...) + + for _, reactor := range test.reactors { + fakeSnapshotClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc) + } + + err := EnsureDeleteVSC(context.Background(), fakeSnapshotClient.SnapshotV1(), test.vscName, time.Millisecond) + if err != nil { + assert.EqualError(t, err, test.err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/pkg/util/kube/pod.go b/pkg/util/kube/pod.go index ec42504acb..be874a37ae 100644 --- a/pkg/util/kube/pod.go +++ b/pkg/util/kube/pod.go @@ -16,8 +16,14 @@ limitations under the License. package kube import ( + "context" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" corev1api "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" ) // IsPodRunning does a well-rounded check to make sure the specified pod is running stably. @@ -63,3 +69,15 @@ func isPodScheduledInStatus(pod *corev1api.Pod, statusCheckFunc func(*corev1api. return nil } + +// DeletePodIfAny deletes a pod by name if it exists, and log an error when the deletion fails +func DeletePodIfAny(ctx context.Context, podGetter corev1client.CoreV1Interface, podName string, podNamespace string, log logrus.FieldLogger) { + err := podGetter.Pods(podNamespace).Delete(ctx, podName, metav1.DeleteOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + log.WithError(err).Debugf("Abort deleting pod, it doesn't exist %s/%s", podNamespace, podName) + } else { + log.WithError(err).Errorf("Failed to delete pod %s/%s", podNamespace, podName) + } + } +} diff --git a/pkg/util/kube/pvc_pv.go b/pkg/util/kube/pvc_pv.go new file mode 100644 index 0000000000..f04676d1f7 --- /dev/null +++ b/pkg/util/kube/pvc_pv.go @@ -0,0 +1,79 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kube + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + corev1api "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/wait" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" +) + +const ( + waitInternal = 2 * time.Second +) + +// DeletePVCIfAny deletes a PVC by name if it exists, and log an error when the deletion fails +func DeletePVCIfAny(ctx context.Context, pvcGetter corev1client.CoreV1Interface, pvcName string, pvcNamespace string, log logrus.FieldLogger) { + err := pvcGetter.PersistentVolumeClaims(pvcNamespace).Delete(ctx, pvcName, metav1.DeleteOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + log.WithError(err).Debugf("Abort deleting PVC, it doesn't exist, %s/%s", pvcNamespace, pvcName) + } else { + log.WithError(err).Errorf("Failed to delete pvc %s/%s", pvcNamespace, pvcName) + } + } +} + +// WaitPVCBound wait for binding of a PVC specified by name and returns the bound PV object +func WaitPVCBound(ctx context.Context, pvcGetter corev1client.CoreV1Interface, + pvGetter corev1client.CoreV1Interface, pvc string, namespace string, timeout time.Duration) (*corev1api.PersistentVolume, error) { + var updated *corev1api.PersistentVolumeClaim + err := wait.PollImmediate(waitInternal, timeout, func() (bool, error) { + tmpPVC, err := pvcGetter.PersistentVolumeClaims(namespace).Get(ctx, pvc, metav1.GetOptions{}) + if err != nil { + return false, errors.Wrapf(err, fmt.Sprintf("error to get pvc %s/%s", namespace, pvc)) + } + + if tmpPVC.Spec.VolumeName == "" { + return false, nil + } + + updated = tmpPVC + + return true, nil + }) + + if err != nil { + return nil, errors.Wrap(err, "error to wait for rediness of PVC") + } + + pv, err := pvGetter.PersistentVolumes().Get(ctx, updated.Spec.VolumeName, metav1.GetOptions{}) + if err != nil { + return nil, errors.Wrap(err, "error to get PV") + } + + return pv, err +} diff --git a/pkg/util/kube/pvc_pv_test.go b/pkg/util/kube/pvc_pv_test.go new file mode 100644 index 0000000000..4713e2e3a8 --- /dev/null +++ b/pkg/util/kube/pvc_pv_test.go @@ -0,0 +1,131 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kube + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + + corev1api "k8s.io/api/core/v1" + + clientTesting "k8s.io/client-go/testing" +) + +type reactor struct { + verb string + resource string + reactorFunc clientTesting.ReactionFunc +} + +func TestWaitPVCBound(t *testing.T) { + pvcObject := &corev1api.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-namespace", + Name: "fake-pvc", + }, + } + + pvcObjectBound := &corev1api.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-namespace", + Name: "fake-pvc", + }, + Spec: corev1api.PersistentVolumeClaimSpec{ + VolumeName: "fake-pv", + }, + } + + pvObj := &corev1api.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-pv", + }, + } + + tests := []struct { + name string + pvcName string + pvcNamespace string + kubeClientObj []runtime.Object + kubeReactors []reactor + expected *corev1api.PersistentVolume + err string + }{ + { + name: "wait pvc error", + pvcName: "fake-pvc", + pvcNamespace: "fake-namespace", + err: "error to wait for rediness of PVC: error to get pvc fake-namespace/fake-pvc: persistentvolumeclaims \"fake-pvc\" not found", + }, + { + name: "wait pvc timeout", + pvcName: "fake-pvc", + pvcNamespace: "fake-namespace", + kubeClientObj: []runtime.Object{ + pvcObject, + }, + err: "error to wait for rediness of PVC: timed out waiting for the condition", + }, + { + name: "get pv fail", + pvcName: "fake-pvc", + pvcNamespace: "fake-namespace", + kubeClientObj: []runtime.Object{ + pvcObjectBound, + }, + err: "error to get PV: persistentvolumes \"fake-pv\" not found", + }, + { + name: "success", + pvcName: "fake-pvc", + pvcNamespace: "fake-namespace", + kubeClientObj: []runtime.Object{ + pvcObjectBound, + pvObj, + }, + expected: pvObj, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) + + for _, reactor := range test.kubeReactors { + fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc) + } + + var kubeClient kubernetes.Interface = fakeKubeClient + + pv, err := WaitPVCBound(context.Background(), kubeClient.CoreV1(), kubeClient.CoreV1(), test.pvcName, test.pvcNamespace, time.Millisecond) + + if err != nil { + assert.EqualError(t, err, test.err) + } else { + assert.NoError(t, err) + } + + assert.Equal(t, test.expected, pv) + }) + } +}