From 7b6db9d16f6549a96d3283a38dbb0c2233d45a60 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Tue, 3 Dec 2024 09:00:24 +0100 Subject: [PATCH] replace deprecated Queue and RateLimiter replace the deprecated Queue and RateLimiter --- cmd/csi-snapshotter/main.go | 4 +- cmd/snapshot-controller/main.go | 8 +- pkg/common-controller/framework_test.go | 8 +- .../snapshot_controller_base.go | 92 ++++++++++--------- pkg/sidecar-controller/framework_test.go | 4 +- .../groupsnapshot_helper.go | 12 +-- .../snapshot_controller_base.go | 42 +++++---- 7 files changed, 91 insertions(+), 79 deletions(-) diff --git a/cmd/csi-snapshotter/main.go b/cmd/csi-snapshotter/main.go index b8b907cc9..69f46598e 100644 --- a/cmd/csi-snapshotter/main.go +++ b/cmd/csi-snapshotter/main.go @@ -274,11 +274,11 @@ func main() { *groupSnapshotNamePrefix, *groupSnapshotNameUUIDLength, *extraCreateMetadata, - workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax), utilfeature.DefaultFeatureGate.Enabled(features.VolumeGroupSnapshot), snapshotContentfactory.Groupsnapshot().V1beta1().VolumeGroupSnapshotContents(), snapshotContentfactory.Groupsnapshot().V1beta1().VolumeGroupSnapshotClasses(), - workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax), ) run := func(context.Context) { diff --git a/cmd/snapshot-controller/main.go b/cmd/snapshot-controller/main.go index 1a90c254a..d9b006377 100644 --- a/cmd/snapshot-controller/main.go +++ b/cmd/snapshot-controller/main.go @@ -233,10 +233,10 @@ func main() { nodeInformer, metricsManager, *resyncPeriod, - workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), - workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), - workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), - workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax), *enableDistributedSnapshotting, *preventVolumeModeConversion, utilfeature.DefaultFeatureGate.Enabled(features.VolumeGroupSnapshot), diff --git a/pkg/common-controller/framework_test.go b/pkg/common-controller/framework_test.go index 7c4fdc1bb..37e68483a 100644 --- a/pkg/common-controller/framework_test.go +++ b/pkg/common-controller/framework_test.go @@ -1205,10 +1205,10 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte nil, metricsManager, 60*time.Second, - workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute), - workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute), - workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute), - workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute), false, false, true, diff --git a/pkg/common-controller/snapshot_controller_base.go b/pkg/common-controller/snapshot_controller_base.go index 51d09eec5..8f88bb2d4 100644 --- a/pkg/common-controller/snapshot_controller_base.go +++ b/pkg/common-controller/snapshot_controller_base.go @@ -50,10 +50,10 @@ type csiSnapshotCommonController struct { clientset clientset.Interface client kubernetes.Interface eventRecorder record.EventRecorder - snapshotQueue workqueue.RateLimitingInterface - contentQueue workqueue.RateLimitingInterface - groupSnapshotQueue workqueue.RateLimitingInterface - groupSnapshotContentQueue workqueue.RateLimitingInterface + snapshotQueue workqueue.TypedRateLimitingInterface[string] + contentQueue workqueue.TypedRateLimitingInterface[string] + groupSnapshotQueue workqueue.TypedRateLimitingInterface[string] + groupSnapshotContentQueue workqueue.TypedRateLimitingInterface[string] snapshotLister snapshotlisters.VolumeSnapshotLister snapshotListerSynced cache.InformerSynced @@ -106,10 +106,10 @@ func NewCSISnapshotCommonController( nodeInformer coreinformers.NodeInformer, metricsManager metrics.MetricsManager, resyncPeriod time.Duration, - snapshotRateLimiter workqueue.RateLimiter, - contentRateLimiter workqueue.RateLimiter, - groupSnapshotRateLimiter workqueue.RateLimiter, - groupSnapshotContentRateLimiter workqueue.RateLimiter, + snapshotRateLimiter workqueue.TypedRateLimiter[string], + contentRateLimiter workqueue.TypedRateLimiter[string], + groupSnapshotRateLimiter workqueue.TypedRateLimiter[string], + groupSnapshotContentRateLimiter workqueue.TypedRateLimiter[string], enableDistributedSnapshotting bool, preventVolumeModeConversion bool, enableVolumeGroupSnapshots bool, @@ -121,14 +121,18 @@ func NewCSISnapshotCommonController( eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("snapshot-controller")}) ctrl := &csiSnapshotCommonController{ - clientset: clientset, - client: client, - eventRecorder: eventRecorder, - resyncPeriod: resyncPeriod, - snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), - contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), - snapshotQueue: workqueue.NewNamedRateLimitingQueue(snapshotRateLimiter, "snapshot-controller-snapshot"), - contentQueue: workqueue.NewNamedRateLimitingQueue(contentRateLimiter, "snapshot-controller-content"), + clientset: clientset, + client: client, + eventRecorder: eventRecorder, + resyncPeriod: resyncPeriod, + snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), + contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), + snapshotQueue: workqueue.NewTypedRateLimitingQueueWithConfig(snapshotRateLimiter, + workqueue.TypedRateLimitingQueueConfig[string]{ + Name: "snapshot-controller-snapshot"}), + contentQueue: workqueue.NewTypedRateLimitingQueueWithConfig(contentRateLimiter, + workqueue.TypedRateLimitingQueueConfig[string]{ + Name: "snapshot-controller-content"}), metricsManager: metricsManager, } @@ -203,8 +207,12 @@ func NewCSISnapshotCommonController( ctrl.groupSnapshotStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) ctrl.groupSnapshotContentStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) - ctrl.groupSnapshotQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotRateLimiter, "snapshot-controller-group-snapshot") - ctrl.groupSnapshotContentQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotContentRateLimiter, "snapshot-controller-group-content") + ctrl.groupSnapshotQueue = workqueue.NewTypedRateLimitingQueueWithConfig( + groupSnapshotRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{ + Name: "snapshot-controller-group-snapshot"}) + ctrl.groupSnapshotContentQueue = workqueue.NewTypedRateLimitingQueueWithConfig( + groupSnapshotContentRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{ + Name: "snapshot-controller-group-content"}) volumeGroupSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ @@ -316,21 +324,21 @@ func (ctrl *csiSnapshotCommonController) enqueueContentWork(obj interface{}) { // snapshotWorker is the main worker for VolumeSnapshots. func (ctrl *csiSnapshotCommonController) snapshotWorker() { - keyObj, quit := ctrl.snapshotQueue.Get() + key, quit := ctrl.snapshotQueue.Get() if quit { return } - defer ctrl.snapshotQueue.Done(keyObj) + defer ctrl.snapshotQueue.Done(key) - if err := ctrl.syncSnapshotByKey(keyObj.(string)); err != nil { + if err := ctrl.syncSnapshotByKey(key); err != nil { // Rather than wait for a full resync, re-add the key to the // queue to be processed. - ctrl.snapshotQueue.AddRateLimited(keyObj) - klog.V(4).Infof("Failed to sync snapshot %q, will retry again: %v", keyObj.(string), err) + ctrl.snapshotQueue.AddRateLimited(key) + klog.V(4).Infof("Failed to sync snapshot %q, will retry again: %v", key, err) } else { // Finally, if no error occurs we Forget this item so it does not // get queued again until another change happens. - ctrl.snapshotQueue.Forget(keyObj) + ctrl.snapshotQueue.Forget(key) } } @@ -392,21 +400,21 @@ func (ctrl *csiSnapshotCommonController) syncSnapshotByKey(key string) error { // contentWorker is the main worker for VolumeSnapshotContent. func (ctrl *csiSnapshotCommonController) contentWorker() { - keyObj, quit := ctrl.contentQueue.Get() + key, quit := ctrl.contentQueue.Get() if quit { return } - defer ctrl.contentQueue.Done(keyObj) + defer ctrl.contentQueue.Done(key) - if err := ctrl.syncContentByKey(keyObj.(string)); err != nil { + if err := ctrl.syncContentByKey(key); err != nil { // Rather than wait for a full resync, re-add the key to the // queue to be processed. - ctrl.contentQueue.AddRateLimited(keyObj) - klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err) + ctrl.contentQueue.AddRateLimited(key) + klog.V(4).Infof("Failed to sync content %q, will retry again: %v", key, err) } else { // Finally, if no error occurs we Forget this item so it does not // get queued again until another change happens. - ctrl.contentQueue.Forget(keyObj) + ctrl.contentQueue.Forget(key) } } @@ -680,41 +688,41 @@ func (ctrl *csiSnapshotCommonController) enqueueGroupSnapshotContentWork(obj int // groupSnapshotWorker is the main worker for VolumeGroupSnapshots. func (ctrl *csiSnapshotCommonController) groupSnapshotWorker() { - keyObj, quit := ctrl.groupSnapshotQueue.Get() + key, quit := ctrl.groupSnapshotQueue.Get() if quit { return } - defer ctrl.groupSnapshotQueue.Done(keyObj) + defer ctrl.groupSnapshotQueue.Done(key) - if err := ctrl.syncGroupSnapshotByKey(context.Background(), keyObj.(string)); err != nil { + if err := ctrl.syncGroupSnapshotByKey(context.Background(), key); err != nil { // Rather than wait for a full resync, re-add the key to the // queue to be processed. - ctrl.groupSnapshotQueue.AddRateLimited(keyObj) - klog.V(4).Infof("Failed to sync group snapshot %q, will retry again: %v", keyObj.(string), err) + ctrl.groupSnapshotQueue.AddRateLimited(key) + klog.V(4).Infof("Failed to sync group snapshot %q, will retry again: %v", key, err) } else { // Finally, if no error occurs we forget this item so it does not // get queued again until another change happens. - ctrl.groupSnapshotQueue.Forget(keyObj) + ctrl.groupSnapshotQueue.Forget(key) } } // groupSnapshotContentWorker is the main worker for VolumeGroupSnapshotContent. func (ctrl *csiSnapshotCommonController) groupSnapshotContentWorker() { - keyObj, quit := ctrl.groupSnapshotContentQueue.Get() + key, quit := ctrl.groupSnapshotContentQueue.Get() if quit { return } - defer ctrl.groupSnapshotContentQueue.Done(keyObj) + defer ctrl.groupSnapshotContentQueue.Done(key) - if err := ctrl.syncGroupSnapshotContentByKey(keyObj.(string)); err != nil { + if err := ctrl.syncGroupSnapshotContentByKey(key); err != nil { // Rather than wait for a full resync, re-add the key to the // queue to be processed. - ctrl.groupSnapshotContentQueue.AddRateLimited(keyObj) - klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err) + ctrl.groupSnapshotContentQueue.AddRateLimited(key) + klog.V(4).Infof("Failed to sync content %q, will retry again: %v", key, err) } else { // Finally, if no error occurs we Forget this item so it does not // get queued again until another change happens. - ctrl.groupSnapshotContentQueue.Forget(keyObj) + ctrl.groupSnapshotContentQueue.Forget(key) } } diff --git a/pkg/sidecar-controller/framework_test.go b/pkg/sidecar-controller/framework_test.go index 304e62b0a..677ea51c8 100644 --- a/pkg/sidecar-controller/framework_test.go +++ b/pkg/sidecar-controller/framework_test.go @@ -575,11 +575,11 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte "groupsnapshot", -1, true, - workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute), false, informerFactory.Groupsnapshot().V1beta1().VolumeGroupSnapshotContents(), informerFactory.Groupsnapshot().V1beta1().VolumeGroupSnapshotClasses(), - workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute), ) ctrl.eventRecorder = record.NewFakeRecorder(1000) diff --git a/pkg/sidecar-controller/groupsnapshot_helper.go b/pkg/sidecar-controller/groupsnapshot_helper.go index 8e3a31c28..13baa61f1 100644 --- a/pkg/sidecar-controller/groupsnapshot_helper.go +++ b/pkg/sidecar-controller/groupsnapshot_helper.go @@ -66,23 +66,23 @@ func (ctrl *csiSnapshotSideCarController) enqueueGroupSnapshotContentWork(obj in // groupSnapshotContentWorker processes items from groupSnapshotContentQueue. // It must run only once, syncGroupSnapshotContent is not assured to be reentrant. func (ctrl *csiSnapshotSideCarController) groupSnapshotContentWorker() { - keyObj, quit := ctrl.groupSnapshotContentQueue.Get() + key, quit := ctrl.groupSnapshotContentQueue.Get() if quit { return } - defer ctrl.groupSnapshotContentQueue.Done(keyObj) + defer ctrl.groupSnapshotContentQueue.Done(key) - if err := ctrl.syncGroupSnapshotContentByKey(keyObj.(string)); err != nil { + if err := ctrl.syncGroupSnapshotContentByKey(key); err != nil { // Rather than wait for a full resync, re-add the key to the // queue to be processed. - ctrl.groupSnapshotContentQueue.AddRateLimited(keyObj) - klog.V(4).Infof("Failed to sync group snapshot content %q, will retry again: %v", keyObj.(string), err) + ctrl.groupSnapshotContentQueue.AddRateLimited(key) + klog.V(4).Infof("Failed to sync group snapshot content %q, will retry again: %v", key, err) return } // Finally, if no error occurs we forget this item so it does not // get queued again until another change happens. - ctrl.groupSnapshotContentQueue.Forget(keyObj) + ctrl.groupSnapshotContentQueue.Forget(key) return } diff --git a/pkg/sidecar-controller/snapshot_controller_base.go b/pkg/sidecar-controller/snapshot_controller_base.go index e6b4d96b9..0e0c7295d 100644 --- a/pkg/sidecar-controller/snapshot_controller_base.go +++ b/pkg/sidecar-controller/snapshot_controller_base.go @@ -50,7 +50,7 @@ type csiSnapshotSideCarController struct { client kubernetes.Interface driverName string eventRecorder record.EventRecorder - contentQueue workqueue.RateLimitingInterface + contentQueue workqueue.TypedRateLimitingInterface[string] extraCreateMetadata bool contentLister snapshotlisters.VolumeSnapshotContentLister @@ -65,7 +65,7 @@ type csiSnapshotSideCarController struct { resyncPeriod time.Duration enableVolumeGroupSnapshots bool - groupSnapshotContentQueue workqueue.RateLimitingInterface + groupSnapshotContentQueue workqueue.TypedRateLimitingInterface[string] groupSnapshotContentLister groupsnapshotlisters.VolumeGroupSnapshotContentLister groupSnapshotContentListerSynced cache.InformerSynced groupSnapshotClassLister groupsnapshotlisters.VolumeGroupSnapshotClassLister @@ -89,11 +89,11 @@ func NewCSISnapshotSideCarController( groupSnapshotNamePrefix string, groupSnapshotNameUUIDLength int, extraCreateMetadata bool, - contentRateLimiter workqueue.RateLimiter, + contentRateLimiter workqueue.TypedRateLimiter[string], enableVolumeGroupSnapshots bool, volumeGroupSnapshotContentInformer groupsnapshotinformers.VolumeGroupSnapshotContentInformer, volumeGroupSnapshotClassInformer groupsnapshotinformers.VolumeGroupSnapshotClassInformer, - groupSnapshotContentRateLimiter workqueue.RateLimiter, + groupSnapshotContentRateLimiter workqueue.TypedRateLimiter[string], ) *csiSnapshotSideCarController { broadcaster := record.NewBroadcaster() broadcaster.StartLogging(klog.Infof) @@ -102,14 +102,16 @@ func NewCSISnapshotSideCarController( eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-snapshotter %s", driverName)}) ctrl := &csiSnapshotSideCarController{ - clientset: clientset, - client: client, - driverName: driverName, - eventRecorder: eventRecorder, - handler: NewCSIHandler(snapshotter, groupSnapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength, groupSnapshotNamePrefix, groupSnapshotNameUUIDLength), - resyncPeriod: resyncPeriod, - contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), - contentQueue: workqueue.NewNamedRateLimitingQueue(contentRateLimiter, "csi-snapshotter-content"), + clientset: clientset, + client: client, + driverName: driverName, + eventRecorder: eventRecorder, + handler: NewCSIHandler(snapshotter, groupSnapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength, groupSnapshotNamePrefix, groupSnapshotNameUUIDLength), + resyncPeriod: resyncPeriod, + contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), + contentQueue: workqueue.NewTypedRateLimitingQueueWithConfig( + contentRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{ + Name: "csi-snapshotter-content"}), extraCreateMetadata: extraCreateMetadata, } @@ -136,7 +138,9 @@ func NewCSISnapshotSideCarController( ctrl.enableVolumeGroupSnapshots = enableVolumeGroupSnapshots if enableVolumeGroupSnapshots { ctrl.groupSnapshotContentStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) - ctrl.groupSnapshotContentQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotContentRateLimiter, "csi-snapshotter-groupsnapshotcontent") + ctrl.groupSnapshotContentQueue = workqueue.NewTypedRateLimitingQueueWithConfig( + groupSnapshotContentRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{ + Name: "csi-snapshotter-groupsnapshotcontent"}) volumeGroupSnapshotContentInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ @@ -217,27 +221,27 @@ func (ctrl *csiSnapshotSideCarController) contentWorker() { } func (ctrl *csiSnapshotSideCarController) processNextItem() bool { - keyObj, quit := ctrl.contentQueue.Get() + key, quit := ctrl.contentQueue.Get() if quit { return false } - defer ctrl.contentQueue.Done(keyObj) + defer ctrl.contentQueue.Done(key) - requeue, err := ctrl.syncContentByKey(keyObj.(string)) + requeue, err := ctrl.syncContentByKey(key) if err != nil { - klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err) + klog.V(4).Infof("Failed to sync content %q, will retry again: %v", key, err) // Always requeue on error to be able to call functions like "return false, doSomething()" where doSomething // does not need to worry about re-queueing. requeue = true } if requeue { - ctrl.contentQueue.AddRateLimited(keyObj) + ctrl.contentQueue.AddRateLimited(key) return true } // Finally, if no error occurs we Forget this item so it does not // get queued again until another change happens. - ctrl.contentQueue.Forget(keyObj) + ctrl.contentQueue.Forget(key) return true }