Skip to content

Commit

Permalink
replace deprecated Queue and RateLimiter
Browse files Browse the repository at this point in the history
replace the deprecated Queue
and RateLimiter
  • Loading branch information
Madhu-1 committed Dec 3, 2024
1 parent a0cef53 commit 7b6db9d
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 79 deletions.
4 changes: 2 additions & 2 deletions cmd/csi-snapshotter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,11 @@ func main() {
*groupSnapshotNamePrefix,
*groupSnapshotNameUUIDLength,
*extraCreateMetadata,
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
utilfeature.DefaultFeatureGate.Enabled(features.VolumeGroupSnapshot),
snapshotContentfactory.Groupsnapshot().V1beta1().VolumeGroupSnapshotContents(),
snapshotContentfactory.Groupsnapshot().V1beta1().VolumeGroupSnapshotClasses(),
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
)

run := func(context.Context) {
Expand Down
8 changes: 4 additions & 4 deletions cmd/snapshot-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,10 @@ func main() {
nodeInformer,
metricsManager,
*resyncPeriod,
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
*enableDistributedSnapshotting,
*preventVolumeModeConversion,
utilfeature.DefaultFeatureGate.Enabled(features.VolumeGroupSnapshot),
Expand Down
8 changes: 4 additions & 4 deletions pkg/common-controller/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1205,10 +1205,10 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
nil,
metricsManager,
60*time.Second,
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute),
false,
false,
true,
Expand Down
92 changes: 50 additions & 42 deletions pkg/common-controller/snapshot_controller_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ type csiSnapshotCommonController struct {
clientset clientset.Interface
client kubernetes.Interface
eventRecorder record.EventRecorder
snapshotQueue workqueue.RateLimitingInterface
contentQueue workqueue.RateLimitingInterface
groupSnapshotQueue workqueue.RateLimitingInterface
groupSnapshotContentQueue workqueue.RateLimitingInterface
snapshotQueue workqueue.TypedRateLimitingInterface[string]
contentQueue workqueue.TypedRateLimitingInterface[string]
groupSnapshotQueue workqueue.TypedRateLimitingInterface[string]
groupSnapshotContentQueue workqueue.TypedRateLimitingInterface[string]

snapshotLister snapshotlisters.VolumeSnapshotLister
snapshotListerSynced cache.InformerSynced
Expand Down Expand Up @@ -106,10 +106,10 @@ func NewCSISnapshotCommonController(
nodeInformer coreinformers.NodeInformer,
metricsManager metrics.MetricsManager,
resyncPeriod time.Duration,
snapshotRateLimiter workqueue.RateLimiter,
contentRateLimiter workqueue.RateLimiter,
groupSnapshotRateLimiter workqueue.RateLimiter,
groupSnapshotContentRateLimiter workqueue.RateLimiter,
snapshotRateLimiter workqueue.TypedRateLimiter[string],
contentRateLimiter workqueue.TypedRateLimiter[string],
groupSnapshotRateLimiter workqueue.TypedRateLimiter[string],
groupSnapshotContentRateLimiter workqueue.TypedRateLimiter[string],
enableDistributedSnapshotting bool,
preventVolumeModeConversion bool,
enableVolumeGroupSnapshots bool,
Expand All @@ -121,14 +121,18 @@ func NewCSISnapshotCommonController(
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("snapshot-controller")})

ctrl := &csiSnapshotCommonController{
clientset: clientset,
client: client,
eventRecorder: eventRecorder,
resyncPeriod: resyncPeriod,
snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
snapshotQueue: workqueue.NewNamedRateLimitingQueue(snapshotRateLimiter, "snapshot-controller-snapshot"),
contentQueue: workqueue.NewNamedRateLimitingQueue(contentRateLimiter, "snapshot-controller-content"),
clientset: clientset,
client: client,
eventRecorder: eventRecorder,
resyncPeriod: resyncPeriod,
snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
snapshotQueue: workqueue.NewTypedRateLimitingQueueWithConfig(snapshotRateLimiter,
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "snapshot-controller-snapshot"}),
contentQueue: workqueue.NewTypedRateLimitingQueueWithConfig(contentRateLimiter,
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "snapshot-controller-content"}),
metricsManager: metricsManager,
}

Expand Down Expand Up @@ -203,8 +207,12 @@ func NewCSISnapshotCommonController(
ctrl.groupSnapshotStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ctrl.groupSnapshotContentStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)

ctrl.groupSnapshotQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotRateLimiter, "snapshot-controller-group-snapshot")
ctrl.groupSnapshotContentQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotContentRateLimiter, "snapshot-controller-group-content")
ctrl.groupSnapshotQueue = workqueue.NewTypedRateLimitingQueueWithConfig(
groupSnapshotRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{
Name: "snapshot-controller-group-snapshot"})
ctrl.groupSnapshotContentQueue = workqueue.NewTypedRateLimitingQueueWithConfig(
groupSnapshotContentRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{
Name: "snapshot-controller-group-content"})

volumeGroupSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -316,21 +324,21 @@ func (ctrl *csiSnapshotCommonController) enqueueContentWork(obj interface{}) {

// snapshotWorker is the main worker for VolumeSnapshots.
func (ctrl *csiSnapshotCommonController) snapshotWorker() {
keyObj, quit := ctrl.snapshotQueue.Get()
key, quit := ctrl.snapshotQueue.Get()
if quit {
return
}
defer ctrl.snapshotQueue.Done(keyObj)
defer ctrl.snapshotQueue.Done(key)

if err := ctrl.syncSnapshotByKey(keyObj.(string)); err != nil {
if err := ctrl.syncSnapshotByKey(key); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
ctrl.snapshotQueue.AddRateLimited(keyObj)
klog.V(4).Infof("Failed to sync snapshot %q, will retry again: %v", keyObj.(string), err)
ctrl.snapshotQueue.AddRateLimited(key)
klog.V(4).Infof("Failed to sync snapshot %q, will retry again: %v", key, err)
} else {
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
ctrl.snapshotQueue.Forget(keyObj)
ctrl.snapshotQueue.Forget(key)
}
}

Expand Down Expand Up @@ -392,21 +400,21 @@ func (ctrl *csiSnapshotCommonController) syncSnapshotByKey(key string) error {

// contentWorker is the main worker for VolumeSnapshotContent.
func (ctrl *csiSnapshotCommonController) contentWorker() {
keyObj, quit := ctrl.contentQueue.Get()
key, quit := ctrl.contentQueue.Get()
if quit {
return
}
defer ctrl.contentQueue.Done(keyObj)
defer ctrl.contentQueue.Done(key)

if err := ctrl.syncContentByKey(keyObj.(string)); err != nil {
if err := ctrl.syncContentByKey(key); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
ctrl.contentQueue.AddRateLimited(keyObj)
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err)
ctrl.contentQueue.AddRateLimited(key)
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", key, err)
} else {
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
ctrl.contentQueue.Forget(keyObj)
ctrl.contentQueue.Forget(key)
}
}

Expand Down Expand Up @@ -680,41 +688,41 @@ func (ctrl *csiSnapshotCommonController) enqueueGroupSnapshotContentWork(obj int

// groupSnapshotWorker is the main worker for VolumeGroupSnapshots.
func (ctrl *csiSnapshotCommonController) groupSnapshotWorker() {
keyObj, quit := ctrl.groupSnapshotQueue.Get()
key, quit := ctrl.groupSnapshotQueue.Get()
if quit {
return
}
defer ctrl.groupSnapshotQueue.Done(keyObj)
defer ctrl.groupSnapshotQueue.Done(key)

if err := ctrl.syncGroupSnapshotByKey(context.Background(), keyObj.(string)); err != nil {
if err := ctrl.syncGroupSnapshotByKey(context.Background(), key); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
ctrl.groupSnapshotQueue.AddRateLimited(keyObj)
klog.V(4).Infof("Failed to sync group snapshot %q, will retry again: %v", keyObj.(string), err)
ctrl.groupSnapshotQueue.AddRateLimited(key)
klog.V(4).Infof("Failed to sync group snapshot %q, will retry again: %v", key, err)
} else {
// Finally, if no error occurs we forget this item so it does not
// get queued again until another change happens.
ctrl.groupSnapshotQueue.Forget(keyObj)
ctrl.groupSnapshotQueue.Forget(key)
}
}

// groupSnapshotContentWorker is the main worker for VolumeGroupSnapshotContent.
func (ctrl *csiSnapshotCommonController) groupSnapshotContentWorker() {
keyObj, quit := ctrl.groupSnapshotContentQueue.Get()
key, quit := ctrl.groupSnapshotContentQueue.Get()
if quit {
return
}
defer ctrl.groupSnapshotContentQueue.Done(keyObj)
defer ctrl.groupSnapshotContentQueue.Done(key)

if err := ctrl.syncGroupSnapshotContentByKey(keyObj.(string)); err != nil {
if err := ctrl.syncGroupSnapshotContentByKey(key); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
ctrl.groupSnapshotContentQueue.AddRateLimited(keyObj)
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err)
ctrl.groupSnapshotContentQueue.AddRateLimited(key)
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", key, err)
} else {
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
ctrl.groupSnapshotContentQueue.Forget(keyObj)
ctrl.groupSnapshotContentQueue.Forget(key)
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sidecar-controller/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,11 +575,11 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
"groupsnapshot",
-1,
true,
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute),
false,
informerFactory.Groupsnapshot().V1beta1().VolumeGroupSnapshotContents(),
informerFactory.Groupsnapshot().V1beta1().VolumeGroupSnapshotClasses(),
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute),
)

ctrl.eventRecorder = record.NewFakeRecorder(1000)
Expand Down
12 changes: 6 additions & 6 deletions pkg/sidecar-controller/groupsnapshot_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,23 @@ func (ctrl *csiSnapshotSideCarController) enqueueGroupSnapshotContentWork(obj in
// groupSnapshotContentWorker processes items from groupSnapshotContentQueue.
// It must run only once, syncGroupSnapshotContent is not assured to be reentrant.
func (ctrl *csiSnapshotSideCarController) groupSnapshotContentWorker() {
keyObj, quit := ctrl.groupSnapshotContentQueue.Get()
key, quit := ctrl.groupSnapshotContentQueue.Get()
if quit {
return
}
defer ctrl.groupSnapshotContentQueue.Done(keyObj)
defer ctrl.groupSnapshotContentQueue.Done(key)

if err := ctrl.syncGroupSnapshotContentByKey(keyObj.(string)); err != nil {
if err := ctrl.syncGroupSnapshotContentByKey(key); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
ctrl.groupSnapshotContentQueue.AddRateLimited(keyObj)
klog.V(4).Infof("Failed to sync group snapshot content %q, will retry again: %v", keyObj.(string), err)
ctrl.groupSnapshotContentQueue.AddRateLimited(key)
klog.V(4).Infof("Failed to sync group snapshot content %q, will retry again: %v", key, err)
return
}

// Finally, if no error occurs we forget this item so it does not
// get queued again until another change happens.
ctrl.groupSnapshotContentQueue.Forget(keyObj)
ctrl.groupSnapshotContentQueue.Forget(key)
return
}

Expand Down
42 changes: 23 additions & 19 deletions pkg/sidecar-controller/snapshot_controller_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type csiSnapshotSideCarController struct {
client kubernetes.Interface
driverName string
eventRecorder record.EventRecorder
contentQueue workqueue.RateLimitingInterface
contentQueue workqueue.TypedRateLimitingInterface[string]
extraCreateMetadata bool

contentLister snapshotlisters.VolumeSnapshotContentLister
Expand All @@ -65,7 +65,7 @@ type csiSnapshotSideCarController struct {
resyncPeriod time.Duration

enableVolumeGroupSnapshots bool
groupSnapshotContentQueue workqueue.RateLimitingInterface
groupSnapshotContentQueue workqueue.TypedRateLimitingInterface[string]
groupSnapshotContentLister groupsnapshotlisters.VolumeGroupSnapshotContentLister
groupSnapshotContentListerSynced cache.InformerSynced
groupSnapshotClassLister groupsnapshotlisters.VolumeGroupSnapshotClassLister
Expand All @@ -89,11 +89,11 @@ func NewCSISnapshotSideCarController(
groupSnapshotNamePrefix string,
groupSnapshotNameUUIDLength int,
extraCreateMetadata bool,
contentRateLimiter workqueue.RateLimiter,
contentRateLimiter workqueue.TypedRateLimiter[string],
enableVolumeGroupSnapshots bool,
volumeGroupSnapshotContentInformer groupsnapshotinformers.VolumeGroupSnapshotContentInformer,
volumeGroupSnapshotClassInformer groupsnapshotinformers.VolumeGroupSnapshotClassInformer,
groupSnapshotContentRateLimiter workqueue.RateLimiter,
groupSnapshotContentRateLimiter workqueue.TypedRateLimiter[string],
) *csiSnapshotSideCarController {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
Expand All @@ -102,14 +102,16 @@ func NewCSISnapshotSideCarController(
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-snapshotter %s", driverName)})

ctrl := &csiSnapshotSideCarController{
clientset: clientset,
client: client,
driverName: driverName,
eventRecorder: eventRecorder,
handler: NewCSIHandler(snapshotter, groupSnapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength, groupSnapshotNamePrefix, groupSnapshotNameUUIDLength),
resyncPeriod: resyncPeriod,
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
contentQueue: workqueue.NewNamedRateLimitingQueue(contentRateLimiter, "csi-snapshotter-content"),
clientset: clientset,
client: client,
driverName: driverName,
eventRecorder: eventRecorder,
handler: NewCSIHandler(snapshotter, groupSnapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength, groupSnapshotNamePrefix, groupSnapshotNameUUIDLength),
resyncPeriod: resyncPeriod,
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
contentQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
contentRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{
Name: "csi-snapshotter-content"}),
extraCreateMetadata: extraCreateMetadata,
}

Expand All @@ -136,7 +138,9 @@ func NewCSISnapshotSideCarController(
ctrl.enableVolumeGroupSnapshots = enableVolumeGroupSnapshots
if enableVolumeGroupSnapshots {
ctrl.groupSnapshotContentStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ctrl.groupSnapshotContentQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotContentRateLimiter, "csi-snapshotter-groupsnapshotcontent")
ctrl.groupSnapshotContentQueue = workqueue.NewTypedRateLimitingQueueWithConfig(
groupSnapshotContentRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{
Name: "csi-snapshotter-groupsnapshotcontent"})

volumeGroupSnapshotContentInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -217,27 +221,27 @@ func (ctrl *csiSnapshotSideCarController) contentWorker() {
}

func (ctrl *csiSnapshotSideCarController) processNextItem() bool {
keyObj, quit := ctrl.contentQueue.Get()
key, quit := ctrl.contentQueue.Get()
if quit {
return false
}
defer ctrl.contentQueue.Done(keyObj)
defer ctrl.contentQueue.Done(key)

requeue, err := ctrl.syncContentByKey(keyObj.(string))
requeue, err := ctrl.syncContentByKey(key)
if err != nil {
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err)
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", key, err)
// Always requeue on error to be able to call functions like "return false, doSomething()" where doSomething
// does not need to worry about re-queueing.
requeue = true
}
if requeue {
ctrl.contentQueue.AddRateLimited(keyObj)
ctrl.contentQueue.AddRateLimited(key)
return true
}

// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
ctrl.contentQueue.Forget(keyObj)
ctrl.contentQueue.Forget(key)
return true
}

Expand Down

0 comments on commit 7b6db9d

Please sign in to comment.