From 14ba85788b1586fa5544f288d4bb9269bebbbe00 Mon Sep 17 00:00:00 2001 From: kayrus Date: Thu, 13 Jul 2023 17:23:11 +0200 Subject: [PATCH] Add max-entries flag to limit ListVolumes request entries per page --- README.md | 2 ++ cmd/csi-attacher/main.go | 3 ++- pkg/attacher/lister.go | 9 ++++++--- pkg/controller/csi_handler_test.go | 6 +++--- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 9f97345c9f..2f0eab2787 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,8 @@ Note that the external-attacher does not scale with more replicas. Only one exte * `--worker-threads`: The number of goroutines for processing VolumeAttachments. 10 workers is used by default. +* `--max-entries`: The max number of entries per page for processing ListVolumes. 0 means no limit and it is the default value. + * `--retry-interval-start`: The exponential backoff for failures. See [CSI error and timeout handling](#csi-error-and-timeout-handling) for details. 1 second is used by default. * `--retry-interval-max`: The exponential backoff maximum value. See [CSI error and timeout handling](#csi-error-and-timeout-handling) for details. 5 minutes is used by default. diff --git a/cmd/csi-attacher/main.go b/cmd/csi-attacher/main.go index 00ce97f15f..517b3b5c7c 100644 --- a/cmd/csi-attacher/main.go +++ b/cmd/csi-attacher/main.go @@ -56,6 +56,7 @@ var ( showVersion = flag.Bool("version", false, "Show version.") timeout = flag.Duration("timeout", 15*time.Second, "Timeout for waiting for attaching or detaching the volume.") workerThreads = flag.Uint("worker-threads", 10, "Number of attacher worker threads") + maxEntries = flag.Int("max-entries", 0, "Max entries per each page in volume lister call, 0 means no limit.") retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed create volume or deletion. It doubles with each failure, up to retry-interval-max.") retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed create volume or deletion.") @@ -211,7 +212,7 @@ func main() { vaLister := factory.Storage().V1().VolumeAttachments().Lister() csiNodeLister := factory.Storage().V1().CSINodes().Lister() volAttacher := attacher.NewAttacher(csiConn) - CSIVolumeLister := attacher.NewVolumeLister(csiConn) + CSIVolumeLister := attacher.NewVolumeLister(csiConn, *maxEntries) handler = controller.NewCSIHandler( clientset, csiAttacher, diff --git a/pkg/attacher/lister.go b/pkg/attacher/lister.go index 004c92e4dc..61ad69f4a5 100644 --- a/pkg/attacher/lister.go +++ b/pkg/attacher/lister.go @@ -26,13 +26,15 @@ import ( ) type CSIVolumeLister struct { - client csi.ControllerClient + client csi.ControllerClient + maxEntries int32 } // NewVolumeLister provides a new VolumeLister object. -func NewVolumeLister(conn *grpc.ClientConn) *CSIVolumeLister { +func NewVolumeLister(conn *grpc.ClientConn, maxEntries int) *CSIVolumeLister { return &CSIVolumeLister{ - client: csi.NewControllerClient(conn), + client: csi.NewControllerClient(conn), + maxEntries: int32(maxEntries), } } @@ -43,6 +45,7 @@ func (a *CSIVolumeLister) ListVolumes(ctx context.Context) (map[string]([]string for { rsp, err := a.client.ListVolumes(ctx, &csi.ListVolumesRequest{ StartingToken: tok, + MaxEntries: a.maxEntries, }) if err != nil { return nil, fmt.Errorf("failed to list volumes: %v", err) diff --git a/pkg/controller/csi_handler_test.go b/pkg/controller/csi_handler_test.go index 0e70b19a87..d64fcd1ffd 100644 --- a/pkg/controller/csi_handler_test.go +++ b/pkg/controller/csi_handler_test.go @@ -1440,7 +1440,7 @@ func TestCSIHandlerReconcileVA(t *testing.T) { pvWithFinalizer(), }, listerResponse: map[string][]string{ - testVolumeHandle: []string{testNodeID}, + testVolumeHandle: {testNodeID}, }, expectedActions: []core.Action{ // Intentionally empty @@ -1453,7 +1453,7 @@ func TestCSIHandlerReconcileVA(t *testing.T) { pvWithFinalizer(), }, listerResponse: map[string][]string{ - testVolumeHandle: []string{testNodeID}, + testVolumeHandle: {testNodeID}, }, expectedActions: []core.Action{}, expectedCSICalls: []csiCall{ @@ -1464,7 +1464,7 @@ func TestCSIHandlerReconcileVA(t *testing.T) { name: "no volume attachments but existing lister response results in no action", initialObjects: []runtime.Object{}, listerResponse: map[string][]string{ - testVolumeHandle: []string{testNodeID}, + testVolumeHandle: {testNodeID}, }, expectedActions: []core.Action{}, },