Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Patch to add finalizers, Get updated objects when removing finalizer with update fails with conflict. #1023

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/evanphx/json-patch v5.9.0+incompatible
github.com/fsnotify/fsnotify v1.7.0
github.com/golang/mock v1.6.0
github.com/google/go-cmp v0.6.0
github.com/google/gofuzz v1.2.0
github.com/kubernetes-csi/csi-lib-utils v0.17.0
github.com/kubernetes-csi/csi-test/v5 v5.2.0
Expand Down Expand Up @@ -40,7 +41,6 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand Down
54 changes: 50 additions & 4 deletions pkg/common-controller/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ func (r *snapshotReactor) React(action core.Action) (handled bool, ret runtime.O
// Check and bump object version
storedSnapshotContent, found := r.contents[action.GetName()]
if found {
// don't modify existing object
storedSnapshotContent = storedSnapshotContent.DeepCopy()
// Apply patch
storedSnapshotBytes, err := json.Marshal(storedSnapshotContent)
if err != nil {
Expand Down Expand Up @@ -335,6 +337,8 @@ func (r *snapshotReactor) React(action core.Action) (handled bool, ret runtime.O
// Check and bump object version
storedSnapshot, found := r.snapshots[action.GetName()]
if found {
// don't modify existing object
storedSnapshot = storedSnapshot.DeepCopy()
// Apply patch
storedSnapshotBytes, err := json.Marshal(storedSnapshot)
if err != nil {
Expand All @@ -349,7 +353,8 @@ func (r *snapshotReactor) React(action core.Action) (handled bool, ret runtime.O
if err != nil {
return true, nil, err
}

// following unmarshal removes the time millisecond precision which was present in the original object
// make sure time used in tests are in seconds precision
err = json.Unmarshal(modified, storedSnapshot)
if err != nil {
return true, nil, err
Expand Down Expand Up @@ -457,6 +462,44 @@ func (r *snapshotReactor) React(action core.Action) (handled bool, ret runtime.O
klog.V(4).Infof("saved updated claim %s", claim.Name)
return true, claim, nil

case action.Matches("patch", "persistentvolumeclaims"):
claim := &v1.PersistentVolumeClaim{}
action := action.(core.PatchAction)

// Check and bump object version
storedClaim, found := r.claims[action.GetName()]
if found {
// don't modify existing object
storedClaim = storedClaim.DeepCopy()
// Apply patch
storedClaimBytes, err := json.Marshal(storedClaim)
if err != nil {
return true, nil, err
}
claimPatch, err := jsonpatch.DecodePatch(action.GetPatch())
if err != nil {
return true, nil, err
}
modified, err := claimPatch.Apply(storedClaimBytes)
if err != nil {
return true, nil, err
}
err = json.Unmarshal(modified, claim)
if err != nil {
return true, nil, err
}
storedVer, _ := strconv.Atoi(claim.ResourceVersion)
claim.ResourceVersion = strconv.Itoa(storedVer + 1)
} else {
return true, nil, fmt.Errorf("cannot update claim %s: claim not found", action.GetName())
}
// Store the updated object to appropriate places.
r.claims[claim.Name] = claim
r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++
klog.V(4).Infof("saved updated claim %s", claim.Name)
return

case action.Matches("get", "secrets"):
name := action.(core.GetAction).GetName()
secret, found := r.secrets[name]
Expand Down Expand Up @@ -811,6 +854,7 @@ func newSnapshotReactor(kubeClient *kubefake.Clientset, client *fake.Clientset,
client.AddReactor("delete", "volumesnapshotclasses", reactor.React)
kubeClient.AddReactor("get", "persistentvolumeclaims", reactor.React)
kubeClient.AddReactor("update", "persistentvolumeclaims", reactor.React)
kubeClient.AddReactor("patch", "persistentvolumeclaims", reactor.React)
kubeClient.AddReactor("get", "persistentvolumes", reactor.React)
kubeClient.AddReactor("get", "secrets", reactor.React)

Expand Down Expand Up @@ -1261,7 +1305,6 @@ func testAddPVCFinalizer(ctrl *csiSnapshotCommonController, reactor *snapshotRea
func testRemovePVCFinalizer(ctrl *csiSnapshotCommonController, reactor *snapshotReactor, test controllerTest) error {
return ctrl.checkandRemovePVCFinalizer(test.initialSnapshots[0], false)
}

func testAddSnapshotFinalizer(ctrl *csiSnapshotCommonController, reactor *snapshotReactor, test controllerTest) error {
return ctrl.addSnapshotFinalizer(test.initialSnapshots[0], true, true)
}
Expand All @@ -1274,6 +1317,10 @@ func testRemoveSnapshotFinalizer(ctrl *csiSnapshotCommonController, reactor *sna
return ctrl.removeSnapshotFinalizer(test.initialSnapshots[0], true, true, false)
}

func testRemoveSnapshotFinalizerAfterUpdateConflict(ctrl *csiSnapshotCommonController, reactor *snapshotReactor, test controllerTest) error {
return ctrl.removeSnapshotFinalizer(test.initialSnapshots[0], true, true, false)
}

func testUpdateSnapshotClass(ctrl *csiSnapshotCommonController, reactor *snapshotReactor, test controllerTest) error {
snap, err := ctrl.checkAndUpdateSnapshotClass(test.initialSnapshots[0])
// syncSnapshotByKey expects that checkAndUpdateSnapshotClass always returns a snapshot
Expand Down Expand Up @@ -1425,7 +1472,6 @@ func runFinalizerTests(t *testing.T, tests []controllerTest, snapshotClasses []*
snapshotscheme.AddToScheme(scheme.Scheme)
for _, test := range tests {
klog.V(4).Infof("starting test %q", test.name)

// Initialize the controller
kubeClient := &kubefake.Clientset{}
client := &fake.Clientset{}
Expand Down Expand Up @@ -1468,7 +1514,7 @@ func runFinalizerTests(t *testing.T, tests []controllerTest, snapshotClasses []*

// Run the tested functions
err = test.test(ctrl, reactor, test)
if err != nil {
if test.expectSuccess && err != nil {
t.Errorf("Test %q failed: %v", test.name, err)
}

Expand Down
62 changes: 14 additions & 48 deletions pkg/common-controller/groupsnapshot_controller_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
ref "k8s.io/client-go/tools/reference"
klog "k8s.io/klog/v2"
Expand Down Expand Up @@ -154,8 +154,7 @@ func (ctrl *csiSnapshotCommonController) SetDefaultGroupSnapshotClass(groupSnaps
}
klog.V(5).Infof("setDefaultGroupSnapshotClass [%s]: default VolumeGroupSnapshotClassName [%s]", groupSnapshot.Name, defaultClasses[0].Name)
groupSnapshotClone := groupSnapshot.DeepCopy()
groupSnapshotClone.Spec.VolumeGroupSnapshotClassName = &(defaultClasses[0].Name)
newGroupSnapshot, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshots(groupSnapshotClone.Namespace).Update(context.TODO(), groupSnapshotClone, metav1.UpdateOptions{})
newGroupSnapshot, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshots(groupSnapshotClone.Namespace).Patch(context.TODO(), groupSnapshotClone.Name, types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"volumeGroupSnapshotClassName":"%s"}}`, defaultClasses[0].Name)), metav1.PatchOptions{})
if err != nil {
klog.V(4).Infof("updating VolumeGroupSnapshot[%s] default group snapshot class failed %v", utils.GroupSnapshotKey(groupSnapshot), err)
}
Expand Down Expand Up @@ -780,7 +779,7 @@ func (ctrl *csiSnapshotCommonController) createGroupSnapshotContent(groupSnapsho
klog.V(5).Infof("volume group snapshot content %#v", groupSnapshotContent)
// Try to create the VolumeGroupSnapshotContent object
klog.V(5).Infof("createGroupSnapshotContent [%s]: trying to save volume group snapshot content %s", utils.GroupSnapshotKey(groupSnapshot), groupSnapshotContent.Name)
if updateGroupSnapshotContent, err = ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshotContents().Create(context.TODO(), groupSnapshotContent, metav1.CreateOptions{}); err == nil || apierrs.IsAlreadyExists(err) {
if updateGroupSnapshotContent, err = ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshotContents().Create(context.TODO(), groupSnapshotContent, metav1.CreateOptions{}); err == nil || errors.IsAlreadyExists(err) {
// Save succeeded.
if err != nil {
klog.V(3).Infof("volume group snapshot content %q for group snapshot %q already exists, reusing", groupSnapshotContent.Name, utils.GroupSnapshotKey(groupSnapshot))
Expand Down Expand Up @@ -954,21 +953,7 @@ func (ctrl *csiSnapshotCommonController) needsUpdateGroupSnapshotStatus(groupSna
// addGroupSnapshotContentFinalizer adds a Finalizer for VolumeGroupSnapshotContent.
func (ctrl *csiSnapshotCommonController) addGroupSnapshotContentFinalizer(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) error {
var patches []utils.PatchOp
if len(groupSnapshotContent.Finalizers) > 0 {
// Add to the end of the finalizers if we have any other finalizers
patches = append(patches, utils.PatchOp{
Op: "add",
Path: "/metadata/finalizers/-",
Value: utils.VolumeGroupSnapshotContentFinalizer,
})
} else {
// Replace finalizers with new array if there are no other finalizers
patches = append(patches, utils.PatchOp{
Op: "add",
Path: "/metadata/finalizers",
Value: []string{utils.VolumeGroupSnapshotContentFinalizer},
})
}
patches = append(patches, utils.PatchOpsToAddFinalizers(groupSnapshotContent, utils.VolumeGroupSnapshotContentFinalizer)...)
newGroupSnapshotContent, err := utils.PatchVolumeGroupSnapshotContent(groupSnapshotContent, patches, ctrl.clientset)
if err != nil {
return newControllerUpdateError(groupSnapshotContent.Name, err.Error())
Expand Down Expand Up @@ -1018,33 +1003,15 @@ func (ctrl *csiSnapshotCommonController) addGroupSnapshotFinalizer(groupSnapshot
var updatedGroupSnapshot *crdv1alpha1.VolumeGroupSnapshot
var err error

// NOTE(ggriffiths): Must perform an update if no finalizers exist.
// Unable to find a patch that correctly updated the finalizers if none currently exist.
if len(groupSnapshot.ObjectMeta.Finalizers) == 0 {
groupSnapshotClone := groupSnapshot.DeepCopy()
if addBoundFinalizer {
groupSnapshotClone.ObjectMeta.Finalizers = append(groupSnapshotClone.ObjectMeta.Finalizers, utils.VolumeGroupSnapshotBoundFinalizer)
}
updatedGroupSnapshot, err = ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshots(groupSnapshotClone.Namespace).Update(context.TODO(), groupSnapshotClone, metav1.UpdateOptions{})
if err != nil {
return newControllerUpdateError(utils.GroupSnapshotKey(groupSnapshot), err.Error())
}
} else {
// Otherwise, perform a patch
var patches []utils.PatchOp
var patches []utils.PatchOp

if addBoundFinalizer {
patches = append(patches, utils.PatchOp{
Op: "add",
Path: "/metadata/finalizers/-",
Value: utils.VolumeGroupSnapshotBoundFinalizer,
})
}
if addBoundFinalizer {
patches = append(patches, utils.PatchOpsToAddFinalizers(groupSnapshot, utils.VolumeGroupSnapshotBoundFinalizer)...)
}

updatedGroupSnapshot, err = utils.PatchVolumeGroupSnapshot(groupSnapshot, patches, ctrl.clientset)
if err != nil {
return newControllerUpdateError(utils.GroupSnapshotKey(groupSnapshot), err.Error())
}
updatedGroupSnapshot, err = utils.PatchVolumeGroupSnapshot(groupSnapshot, patches, ctrl.clientset)
if err != nil {
return newControllerUpdateError(utils.GroupSnapshotKey(groupSnapshot), err.Error())
}

_, err = ctrl.storeGroupSnapshotUpdate(updatedGroupSnapshot)
Expand Down Expand Up @@ -1114,7 +1081,7 @@ func (ctrl *csiSnapshotCommonController) processGroupSnapshotWithDeletionTimesta
for _, snapshotRef := range groupSnapshot.Status.VolumeSnapshotRefList {
snapshot, err := ctrl.snapshotLister.VolumeSnapshots(snapshotRef.Namespace).Get(snapshotRef.Name)
if err != nil {
if apierrs.IsNotFound(err) {
if errors.IsNotFound(err) {
continue
}
return err
Expand Down Expand Up @@ -1161,7 +1128,7 @@ func (ctrl *csiSnapshotCommonController) processGroupSnapshotWithDeletionTimesta
// Delete the individual snapshots part of the group snapshot
for _, snapshot := range groupSnapshot.Status.VolumeSnapshotRefList {
err := ctrl.clientset.SnapshotV1().VolumeSnapshots(snapshot.Namespace).Delete(context.TODO(), snapshot.Name, metav1.DeleteOptions{})
if err != nil && !apierrs.IsNotFound(err) {
if err != nil && !errors.IsNotFound(err) {
msg := fmt.Sprintf("failed to delete snapshot API object %s/%s part of group snapshot %s: %v", snapshot.Namespace, snapshot.Name, utils.GroupSnapshotKey(groupSnapshot), err)
klog.Error(msg)
ctrl.eventRecorder.Event(groupSnapshot, v1.EventTypeWarning, "SnapshotDeleteError", msg)
Expand Down Expand Up @@ -1224,8 +1191,7 @@ func (ctrl *csiSnapshotCommonController) removeGroupSnapshotFinalizer(groupSnaps
// TODO: Remove PVC Finalizer

groupSnapshotClone := groupSnapshot.DeepCopy()
groupSnapshotClone.ObjectMeta.Finalizers = utils.RemoveString(groupSnapshotClone.ObjectMeta.Finalizers, utils.VolumeGroupSnapshotBoundFinalizer)
newGroupSnapshot, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshots(groupSnapshotClone.Namespace).Update(context.TODO(), groupSnapshotClone, metav1.UpdateOptions{})
newGroupSnapshot, err := utils.UpdateRemoveFinalizersSnapshots(groupSnapshotClone, ctrl.clientset, utils.VolumeGroupSnapshotBoundFinalizer)
if err != nil {
return newControllerUpdateError(groupSnapshot.Name, err.Error())
}
Expand Down
100 changes: 100 additions & 0 deletions pkg/common-controller/groupsnapshot_controller_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package common_controller

import (
"context"
"testing"

"github.com/google/go-cmp/cmp"
crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v7/apis/volumegroupsnapshot/v1alpha1"
"github.com/kubernetes-csi/external-snapshotter/client/v7/clientset/versioned/fake"
"github.com/kubernetes-csi/external-snapshotter/v7/pkg/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
// crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v7/apis/volumegroupsnapshot/v1alpha1"
// clientset "github.com/kubernetes-csi/external-snapshotter/client/v7/clientset/versioned"
// groupsnapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v7/listers/volumegroupsnapshot/v1alpha1"
// snapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v7/listers/volumesnapshot/v1"
// "github.com/kubernetes-csi/external-snapshotter/v2/pkg/metrics"
// "k8s.io/client-go/kubernetes"
// corelisters "k8s.io/client-go/listers/core/v1"
// "k8s.io/client-go/tools/cache"
// "k8s.io/client-go/tools/record"
// "k8s.io/client-go/util/workqueue"
)

func Test_csiSnapshotCommonController_removeGroupSnapshotFinalizer(t *testing.T) {
type args struct {
groupSnapshot *crdv1alpha1.VolumeGroupSnapshot
removeBoundFinalizer bool
}
tests := []struct {
name string
args args
wantErr bool
expectedFinalizers []string
}{
{
name: "Test removeGroupSnapshotFinalizer",
args: args{
removeBoundFinalizer: true,
groupSnapshot: &crdv1alpha1.VolumeGroupSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "test-group-snapshot",
Finalizers: []string{utils.VolumeGroupSnapshotBoundFinalizer},
},
},
},
},
{
name: "Test removeGroupSnapshotFinalizer and not something else",
args: args{
removeBoundFinalizer: true,
groupSnapshot: &crdv1alpha1.VolumeGroupSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "test-group-snapshot",
Finalizers: []string{"somethingElse", utils.VolumeGroupSnapshotBoundFinalizer},
},
},
},
expectedFinalizers: []string{"somethingElse"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := &csiSnapshotCommonController{
clientset: fake.NewSimpleClientset(tt.args.groupSnapshot),
groupSnapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
}
if err := ctrl.removeGroupSnapshotFinalizer(tt.args.groupSnapshot, tt.args.removeBoundFinalizer); (err != nil) != tt.wantErr {
t.Errorf("csiSnapshotCommonController.removeGroupSnapshotFinalizer() error = %v, wantErr %v", err, tt.wantErr)
}
vgs, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshots(tt.args.groupSnapshot.Namespace).Get(context.TODO(), tt.args.groupSnapshot.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Error getting volume group snapshot: %v", err)
}
if tt.expectedFinalizers == nil && vgs.Finalizers != nil {
tt.expectedFinalizers = []string{} // if expectedFinalizers is nil, then it should be an empty slice so that cmp.Diff does not panic
}
if vgs.Finalizers != nil && cmp.Diff(vgs.Finalizers, tt.expectedFinalizers) != "" {
t.Errorf("Finalizers not expected: %v", cmp.Diff(vgs.Finalizers, tt.expectedFinalizers))
}

})
}
}
Loading