From 5b6f3f3893578bf88c044307c85a9f5bb9ff03c0 Mon Sep 17 00:00:00 2001 From: Xun Jiang Date: Mon, 6 Nov 2023 20:32:45 +0800 Subject: [PATCH] Add DataUpload Result and CSI VolumeSnapshot check for restore PV. Signed-off-by: Xun Jiang --- pkg/builder/volume_snapshot_builder.go | 5 + pkg/controller/restore_controller.go | 6 + pkg/restore/request.go | 2 + pkg/restore/restore.go | 64 +++++++ pkg/restore/restore_test.go | 246 ++++++++++++++++++++++++- pkg/test/resources.go | 11 ++ 6 files changed, 329 insertions(+), 5 deletions(-) diff --git a/pkg/builder/volume_snapshot_builder.go b/pkg/builder/volume_snapshot_builder.go index 19815c0f05..bbaedd16ef 100644 --- a/pkg/builder/volume_snapshot_builder.go +++ b/pkg/builder/volume_snapshot_builder.go @@ -67,3 +67,8 @@ func (v *VolumeSnapshotBuilder) BoundVolumeSnapshotContentName(vscName string) * v.object.Status.BoundVolumeSnapshotContentName = &vscName return v } + +func (v *VolumeSnapshotBuilder) SourcePVC(name string) *VolumeSnapshotBuilder { + v.object.Spec.Source.PersistentVolumeClaimName = &name + return v +} diff --git a/pkg/controller/restore_controller.go b/pkg/controller/restore_controller.go index 5d6ed505ec..f6b9b39d96 100644 --- a/pkg/controller/restore_controller.go +++ b/pkg/controller/restore_controller.go @@ -515,6 +515,11 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu return errors.Wrap(err, "error fetching volume snapshots metadata") } + csiVolumeSnapshots, err := backupStore.GetCSIVolumeSnapshots(restore.Spec.BackupName) + if err != nil { + return errors.Wrap(err, "fail to fetch CSI VolumeSnapshots metadata") + } + restoreLog.Info("starting restore") var podVolumeBackups []*api.PodVolumeBackup @@ -531,6 +536,7 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu BackupReader: backupFile, ResourceModifiers: resourceModifiers, DisableInformerCache: r.disableInformerCache, + CSIVolumeSnapshots: csiVolumeSnapshots, } restoreWarnings, restoreErrors := r.restorer.RestoreWithResolvers(restoreReq, actionsResolver, pluginManager) diff --git a/pkg/restore/request.go b/pkg/restore/request.go index dcc2ef3d62..2a267a5ffc 100644 --- a/pkg/restore/request.go +++ b/pkg/restore/request.go @@ -21,6 +21,7 @@ import ( "io" "sort" + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/runtime" @@ -60,6 +61,7 @@ type Request struct { itemOperationsList *[]*itemoperation.RestoreOperation ResourceModifiers *resourcemodifiers.ResourceModifiers DisableInformerCache bool + CSIVolumeSnapshots []*snapshotv1api.VolumeSnapshot } type restoredItemStatus struct { diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go index dee68d33ff..ff6735fb24 100644 --- a/pkg/restore/restore.go +++ b/pkg/restore/restore.go @@ -30,6 +30,7 @@ import ( "time" "github.com/google/uuid" + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" @@ -299,6 +300,7 @@ func (kr *kubernetesRestorer) RestoreWithResolvers( pvsToProvision: sets.NewString(), pvRestorer: pvRestorer, volumeSnapshots: req.VolumeSnapshots, + csiVolumeSnapshots: req.CSIVolumeSnapshots, podVolumeBackups: req.PodVolumeBackups, resourceTerminatingTimeout: kr.resourceTerminatingTimeout, resourceTimeout: kr.resourceTimeout, @@ -348,6 +350,7 @@ type restoreContext struct { pvsToProvision sets.String pvRestorer PVRestorer volumeSnapshots []*volume.Snapshot + csiVolumeSnapshots []*snapshotv1api.VolumeSnapshot podVolumeBackups []*velerov1api.PodVolumeBackup resourceTerminatingTimeout time.Duration resourceTimeout time.Duration @@ -1295,6 +1298,22 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso // want to dynamically re-provision it. return warnings, errs, itemExists + case hasCSIVolumeSnapshot(ctx, obj): + ctx.log.Infof("Dynamically re-provisioning persistent volume because it has a related CSI VolumeSnapshot.") + ctx.pvsToProvision.Insert(name) + + // Return early because we don't want to restore the PV itself, we + // want to dynamically re-provision it. + return warnings, errs, itemExists + + case hasSnapshotDataUpload(ctx, obj): + ctx.log.Infof("Dynamically re-provisioning persistent volume because it has a related snapshot DataUpload.") + ctx.pvsToProvision.Insert(name) + + // Return early because we don't want to restore the PV itself, we + // want to dynamically re-provision it. + return warnings, errs, itemExists + case hasDeleteReclaimPolicy(obj.Object): ctx.log.Infof("Dynamically re-provisioning persistent volume because it doesn't have a snapshot and its reclaim policy is Delete.") ctx.pvsToProvision.Insert(name) @@ -1937,6 +1956,51 @@ func hasSnapshot(pvName string, snapshots []*volume.Snapshot) bool { return false } +func hasCSIVolumeSnapshot(ctx *restoreContext, unstructuredPV *unstructured.Unstructured) bool { + pv := new(v1.PersistentVolume) + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredPV.Object, pv); err != nil { + ctx.log.WithError(err).Warnf("Unable to convert PV from unstructured to structured") + return false + } + + for _, vs := range ctx.csiVolumeSnapshots { + if pv.Spec.ClaimRef.Name == *vs.Spec.Source.PersistentVolumeClaimName && + pv.Spec.ClaimRef.Namespace == vs.Namespace { + return true + } + } + return false +} + +func hasSnapshotDataUpload(ctx *restoreContext, unstructuredPV *unstructured.Unstructured) bool { + pv := new(v1.PersistentVolume) + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredPV.Object, pv); err != nil { + ctx.log.WithError(err).Warnf("Unable to convert PV from unstructured to structured") + return false + } + + dataUploadResultList := new(v1.ConfigMapList) + err := ctx.kbClient.List(go_context.TODO(), dataUploadResultList, &crclient.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + velerov1api.RestoreUIDLabel: label.GetValidName(string(ctx.restore.GetUID())), + velerov1api.PVCNamespaceNameLabel: label.GetValidName(pv.Spec.ClaimRef.Namespace + "." + pv.Spec.ClaimRef.Name), + velerov1api.ResourceUsageLabel: label.GetValidName(string(velerov1api.VeleroResourceUsageDataUploadResult)), + }), + }) + if err != nil { + ctx.log.WithError(err).Warnf("Fail to list DataUpload result CM.") + return false + } + + if len(dataUploadResultList.Items) != 1 { + ctx.log.WithError(fmt.Errorf("dataupload result number is not expected")). + Warnf("Got %d DataUpload result. Expect one.", len(dataUploadResultList.Items)) + return false + } + + return true +} + func hasPodVolumeBackup(unstructuredPV *unstructured.Unstructured, ctx *restoreContext) bool { if len(ctx.podVolumeBackups) == 0 { return false diff --git a/pkg/restore/restore_test.go b/pkg/restore/restore_test.go index 4ffd762575..592f90250e 100644 --- a/pkg/restore/restore_test.go +++ b/pkg/restore/restore_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -2256,6 +2257,7 @@ func (*volumeSnapshotter) DeleteSnapshot(snapshotID string) error { // Verification is done by looking at the contents of the API and the metadata/spec/status of // the items in the API. func TestRestorePersistentVolumes(t *testing.T) { + testPVCName := "testPVC" tests := []struct { name string restore *velerov1api.Restore @@ -2265,6 +2267,8 @@ func TestRestorePersistentVolumes(t *testing.T) { volumeSnapshots []*volume.Snapshot volumeSnapshotLocations []*velerov1api.VolumeSnapshotLocation volumeSnapshotterGetter volumeSnapshotterGetter + csiVolumeSnapshots []*snapshotv1api.VolumeSnapshot + dataUploadResult *corev1api.ConfigMap want []*test.APIResource wantError bool wantWarning bool @@ -2923,6 +2927,77 @@ func TestRestorePersistentVolumes(t *testing.T) { ), }, }, + { + name: "when a PV with a reclaim policy of retain has a CSI VolumeSnapshot and does not exist in-cluster, the PV is not restored", + restore: defaultRestore().Result(), + backup: defaultBackup().Result(), + tarball: test.NewTarWriter(t). + AddItems("persistentvolumes", + builder.ForPersistentVolume("pv-1"). + ReclaimPolicy(corev1api.PersistentVolumeReclaimRetain). + ClaimRef("velero", testPVCName). + Result(), + ). + Done(), + apiResources: []*test.APIResource{ + test.PVs(), + test.PVCs(), + }, + csiVolumeSnapshots: []*snapshotv1api.VolumeSnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "test", + }, + Spec: snapshotv1api.VolumeSnapshotSpec{ + Source: snapshotv1api.VolumeSnapshotSource{ + PersistentVolumeClaimName: &testPVCName, + }, + }, + }, + }, + volumeSnapshotLocations: []*velerov1api.VolumeSnapshotLocation{ + builder.ForVolumeSnapshotLocation(velerov1api.DefaultNamespace, "default").Provider("provider-1").Result(), + }, + volumeSnapshotterGetter: map[string]vsv1.VolumeSnapshotter{ + "provider-1": &volumeSnapshotter{ + snapshotVolumes: map[string]string{"snapshot-1": "new-volume"}, + }, + }, + want: []*test.APIResource{}, + }, + { + name: "when a PV with a reclaim policy of retain has a DataUpload result CM and does not exist in-cluster, the PV is not restored", + restore: defaultRestore().ObjectMeta(builder.WithUID("fakeUID")).Result(), + backup: defaultBackup().Result(), + tarball: test.NewTarWriter(t). + AddItems("persistentvolumes", + builder.ForPersistentVolume("pv-1"). + ReclaimPolicy(corev1api.PersistentVolumeReclaimRetain). + ClaimRef("velero", testPVCName). + Result(), + ). + Done(), + apiResources: []*test.APIResource{ + test.PVs(), + test.PVCs(), + test.ConfigMaps(), + }, + volumeSnapshotLocations: []*velerov1api.VolumeSnapshotLocation{ + builder.ForVolumeSnapshotLocation(velerov1api.DefaultNamespace, "default").Provider("provider-1").Result(), + }, + volumeSnapshotterGetter: map[string]vsv1.VolumeSnapshotter{ + "provider-1": &volumeSnapshotter{ + snapshotVolumes: map[string]string{"snapshot-1": "new-volume"}, + }, + }, + dataUploadResult: builder.ForConfigMap("velero", "test").ObjectMeta(builder.WithLabelsMap(map[string]string{ + velerov1api.RestoreUIDLabel: "fakeUID", + velerov1api.PVCNamespaceNameLabel: "velero/testPVC", + velerov1api.ResourceUsageLabel: string(velerov1api.VeleroResourceUsageDataUploadResult), + })).Result(), + want: []*test.APIResource{}, + }, } for _, tc := range tests { @@ -2939,6 +3014,10 @@ func TestRestorePersistentVolumes(t *testing.T) { require.NoError(t, h.restorer.kbClient.Create(context.Background(), vsl)) } + if tc.dataUploadResult != nil { + require.NoError(t, h.restorer.kbClient.Create(context.TODO(), tc.dataUploadResult)) + } + for _, r := range tc.apiResources { h.AddItems(t, r) } @@ -2955,11 +3034,12 @@ func TestRestorePersistentVolumes(t *testing.T) { } data := &Request{ - Log: h.log, - Restore: tc.restore, - Backup: tc.backup, - VolumeSnapshots: tc.volumeSnapshots, - BackupReader: tc.tarball, + Log: h.log, + Restore: tc.restore, + Backup: tc.backup, + VolumeSnapshots: tc.volumeSnapshots, + BackupReader: tc.tarball, + CSIVolumeSnapshots: tc.csiVolumeSnapshots, } warnings, errs := h.restorer.Restore( data, @@ -3652,3 +3732,159 @@ func TestIsAlreadyExistsError(t *testing.T) { }) } } + +func TestHasCSIVolumeSnapshot(t *testing.T) { + tests := []struct { + name string + vs *snapshotv1api.VolumeSnapshot + obj *unstructured.Unstructured + expectedResult bool + }{ + { + name: "Invalid PV, expect false.", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": 1, + }, + }, + expectedResult: false, + }, + { + name: "Cannot find VS, expect false", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "PersistentVolume", + "apiVersion": "v1", + "metadata": map[string]interface{}{ + "namespace": "default", + "name": "test", + }, + }, + }, + expectedResult: false, + }, + { + name: "Find VS, expect true.", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "PersistentVolume", + "apiVersion": "v1", + "metadata": map[string]interface{}{ + "namespace": "velero", + "name": "test", + }, + "spec": map[string]interface{}{ + "claimRef": map[string]interface{}{ + "namespace": "velero", + "name": "test", + }, + }, + }, + }, + vs: builder.ForVolumeSnapshot("velero", "test").SourcePVC("test").Result(), + expectedResult: true, + }, + } + + for _, tc := range tests { + h := newHarness(t) + + ctx := &restoreContext{ + log: h.log, + } + + if tc.vs != nil { + ctx.csiVolumeSnapshots = []*snapshotv1api.VolumeSnapshot{tc.vs} + } + + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expectedResult, hasCSIVolumeSnapshot(ctx, tc.obj)) + }) + } +} + +func TestHasSnapshotDataUpload(t *testing.T) { + tests := []struct { + name string + duResult *corev1api.ConfigMap + obj *unstructured.Unstructured + expectedResult bool + restore *velerov1api.Restore + }{ + { + name: "Invalid PV, expect false.", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": 1, + }, + }, + expectedResult: false, + }, + { + name: "Cannot find DataUploadResult CM, expect false", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "PersistentVolume", + "apiVersion": "v1", + "metadata": map[string]interface{}{ + "namespace": "default", + "name": "test", + }, + "spec": map[string]interface{}{ + "claimRef": map[string]interface{}{ + "namespace": "velero", + "name": "testPVC", + }, + }, + }, + }, + duResult: builder.ForConfigMap("velero", "test").Result(), + restore: builder.ForRestore("velero", "test").ObjectMeta(builder.WithUID("fakeUID")).Result(), + expectedResult: false, + }, + { + name: "Find DataUploadResult CM, expect true", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "PersistentVolume", + "apiVersion": "v1", + "metadata": map[string]interface{}{ + "namespace": "default", + "name": "test", + }, + "spec": map[string]interface{}{ + "claimRef": map[string]interface{}{ + "namespace": "velero", + "name": "testPVC", + }, + }, + }, + }, + duResult: builder.ForConfigMap("velero", "test").ObjectMeta(builder.WithLabelsMap(map[string]string{ + velerov1api.RestoreUIDLabel: "fakeUID", + velerov1api.PVCNamespaceNameLabel: "velero/testPVC", + velerov1api.ResourceUsageLabel: string(velerov1api.VeleroResourceUsageDataUploadResult), + })).Result(), + restore: builder.ForRestore("velero", "test").ObjectMeta(builder.WithUID("fakeUID")).Result(), + expectedResult: false, + }, + } + + for _, tc := range tests { + h := newHarness(t) + + ctx := &restoreContext{ + log: h.log, + kbClient: h.restorer.kbClient, + restore: tc.restore, + } + + if tc.duResult != nil { + require.NoError(t, ctx.kbClient.Create(context.TODO(), tc.duResult)) + } + + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expectedResult, hasSnapshotDataUpload(ctx, tc.obj)) + }) + } +} diff --git a/pkg/test/resources.go b/pkg/test/resources.go index 7c2fa17f65..709497fca4 100644 --- a/pkg/test/resources.go +++ b/pkg/test/resources.go @@ -142,6 +142,17 @@ func ServiceAccounts(items ...metav1.Object) *APIResource { } } +func ConfigMaps(items ...metav1.Object) *APIResource { + return &APIResource{ + Group: "", + Version: "v1", + Name: "configmaps", + ShortName: "cm", + Namespaced: true, + Items: items, + } +} + func CRDs(items ...metav1.Object) *APIResource { return &APIResource{ Group: "apiextensions.k8s.io",