Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow limiting the number of nodegroups created in parallel #7884

Merged
merged 1 commit into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions pkg/actions/nodegroup/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type CreateOpts struct {
DryRunSettings DryRunSettings
SkipOutdatedAddonsCheck bool
ConfigFileProvided bool
Parallelism int
}

type DryRunSettings struct {
Expand Down Expand Up @@ -171,7 +172,7 @@ func (m *Manager) Create(ctx context.Context, options CreateOpts, nodegroupFilte
return cmdutils.PrintNodeGroupDryRunConfig(clusterConfigCopy, options.DryRunSettings.OutStream)
}

if err := m.nodeCreationTasks(ctx, isOwnedCluster, skipEgressRules, options.UpdateAuthConfigMap); err != nil {
if err := m.nodeCreationTasks(ctx, isOwnedCluster, skipEgressRules, options.UpdateAuthConfigMap, options.Parallelism); err != nil {
return err
}

Expand Down Expand Up @@ -203,7 +204,7 @@ func makeOutpostsService(clusterConfig *api.ClusterConfig, provider api.ClusterP
}
}

func (m *Manager) nodeCreationTasks(ctx context.Context, isOwnedCluster, skipEgressRules bool, updateAuthConfigMap *bool) error {
func (m *Manager) nodeCreationTasks(ctx context.Context, isOwnedCluster, skipEgressRules bool, updateAuthConfigMap *bool, parallelism int) error {
cfg := m.cfg
meta := cfg.Metadata

Expand Down Expand Up @@ -259,10 +260,10 @@ func (m *Manager) nodeCreationTasks(ctx context.Context, isOwnedCluster, skipEgr
}
disableAccessEntryCreation := !m.accessEntry.IsEnabled() || updateAuthConfigMap != nil
if nodeGroupTasks := m.stackManager.NewUnmanagedNodeGroupTask(ctx, cfg.NodeGroups, !awsNodeUsesIRSA, skipEgressRules,
disableAccessEntryCreation, vpcImporter); nodeGroupTasks.Len() > 0 {
disableAccessEntryCreation, vpcImporter, parallelism); nodeGroupTasks.Len() > 0 {
allNodeGroupTasks.Append(nodeGroupTasks)
}
managedTasks := m.stackManager.NewManagedNodeGroupTask(ctx, cfg.ManagedNodeGroups, !awsNodeUsesIRSA, vpcImporter)
managedTasks := m.stackManager.NewManagedNodeGroupTask(ctx, cfg.ManagedNodeGroups, !awsNodeUsesIRSA, vpcImporter, parallelism)
if managedTasks.Len() > 0 {
allNodeGroupTasks.Append(managedTasks)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/actions/nodegroup/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ type stackManagerDelegate struct {
ngTaskCreator nodeGroupTaskCreator
}

func (s *stackManagerDelegate) NewUnmanagedNodeGroupTask(ctx context.Context, nodeGroups []*api.NodeGroup, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation bool, vpcImporter vpc.Importer) *tasks.TaskTree {
func (s *stackManagerDelegate) NewUnmanagedNodeGroupTask(ctx context.Context, nodeGroups []*api.NodeGroup, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation bool, vpcImporter vpc.Importer, nodeGroupParallelism int) *tasks.TaskTree {
return s.ngTaskCreator.NewUnmanagedNodeGroupTask(ctx, nodeGroups, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation, vpcImporter)
}

func (s *stackManagerDelegate) NewManagedNodeGroupTask(context.Context, []*api.ManagedNodeGroup, bool, vpc.Importer) *tasks.TaskTree {
func (s *stackManagerDelegate) NewManagedNodeGroupTask(context.Context, []*api.ManagedNodeGroup, bool, vpc.Importer, int) *tasks.TaskTree {
return nil
}

Expand Down
17 changes: 8 additions & 9 deletions pkg/cfn/manager/create_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"fmt"

"github.com/pkg/errors"

"github.com/kris-nova/logger"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -25,7 +23,7 @@ import (
// NewTasksToCreateCluster defines all tasks required to create a cluster along
// with some nodegroups; see CreateAllNodeGroups for how onlyNodeGroupSubset works.
func (c *StackCollection) NewTasksToCreateCluster(ctx context.Context, nodeGroups []*api.NodeGroup,
managedNodeGroups []*api.ManagedNodeGroup, accessConfig *api.AccessConfig, accessEntryCreator accessentry.CreatorInterface, postClusterCreationTasks ...tasks.Task) *tasks.TaskTree {
managedNodeGroups []*api.ManagedNodeGroup, accessConfig *api.AccessConfig, accessEntryCreator accessentry.CreatorInterface, nodeGroupParallelism int, postClusterCreationTasks ...tasks.Task) *tasks.TaskTree {
taskTree := tasks.TaskTree{Parallel: false}

taskTree.Append(&createClusterTask{
Expand All @@ -46,11 +44,11 @@ func (c *StackCollection) NewTasksToCreateCluster(ctx context.Context, nodeGroup
IsSubTask: true,
}
disableAccessEntryCreation := accessConfig.AuthenticationMode == ekstypes.AuthenticationModeConfigMap
if unmanagedNodeGroupTasks := c.NewUnmanagedNodeGroupTask(ctx, nodeGroups, false, false, disableAccessEntryCreation, vpcImporter); unmanagedNodeGroupTasks.Len() > 0 {
if unmanagedNodeGroupTasks := c.NewUnmanagedNodeGroupTask(ctx, nodeGroups, false, false, disableAccessEntryCreation, vpcImporter, nodeGroupParallelism); unmanagedNodeGroupTasks.Len() > 0 {
unmanagedNodeGroupTasks.IsSubTask = true
nodeGroupTasks.Append(unmanagedNodeGroupTasks)
}
if managedNodeGroupTasks := c.NewManagedNodeGroupTask(ctx, managedNodeGroups, false, vpcImporter); managedNodeGroupTasks.Len() > 0 {
if managedNodeGroupTasks := c.NewManagedNodeGroupTask(ctx, managedNodeGroups, false, vpcImporter, nodeGroupParallelism); managedNodeGroupTasks.Len() > 0 {
managedNodeGroupTasks.IsSubTask = true
nodeGroupTasks.Append(managedNodeGroupTasks)
}
Expand All @@ -75,7 +73,7 @@ func (c *StackCollection) NewTasksToCreateCluster(ctx context.Context, nodeGroup
}

// NewUnmanagedNodeGroupTask returns tasks for creating self-managed nodegroups.
func (c *StackCollection) NewUnmanagedNodeGroupTask(ctx context.Context, nodeGroups []*api.NodeGroup, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation bool, vpcImporter vpc.Importer) *tasks.TaskTree {
func (c *StackCollection) NewUnmanagedNodeGroupTask(ctx context.Context, nodeGroups []*api.NodeGroup, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation bool, vpcImporter vpc.Importer, parallelism int) *tasks.TaskTree {
task := &UnmanagedNodeGroupTask{
ClusterConfig: c.spec,
NodeGroups: nodeGroups,
Expand All @@ -93,12 +91,13 @@ func (c *StackCollection) NewUnmanagedNodeGroupTask(ctx context.Context, nodeGro
SkipEgressRules: skipEgressRules,
DisableAccessEntryCreation: disableAccessEntryCreation,
VPCImporter: vpcImporter,
Parallelism: parallelism,
})
}

// NewManagedNodeGroupTask defines tasks required to create managed nodegroups
func (c *StackCollection) NewManagedNodeGroupTask(ctx context.Context, nodeGroups []*api.ManagedNodeGroup, forceAddCNIPolicy bool, vpcImporter vpc.Importer) *tasks.TaskTree {
taskTree := &tasks.TaskTree{Parallel: true}
func (c *StackCollection) NewManagedNodeGroupTask(ctx context.Context, nodeGroups []*api.ManagedNodeGroup, forceAddCNIPolicy bool, vpcImporter vpc.Importer, nodeGroupParallelism int) *tasks.TaskTree {
taskTree := &tasks.TaskTree{Parallel: true, Limit: nodeGroupParallelism}
for _, ng := range nodeGroups {
// Disable parallelisation if any tags propagation is done
// since nodegroup must be created to propagate tags to its ASGs.
Expand Down Expand Up @@ -162,7 +161,7 @@ func (c *StackCollection) NewTasksToCreateIAMServiceAccounts(serviceAccounts []*
objectMeta.SetAnnotations(sa.AsObjectMeta().Annotations)
objectMeta.SetLabels(sa.AsObjectMeta().Labels)
if err := kubernetes.MaybeCreateServiceAccountOrUpdateMetadata(clientSet, objectMeta); err != nil {
return errors.Wrapf(err, "failed to create service account %s/%s", objectMeta.GetNamespace(), objectMeta.GetName())
return fmt.Errorf("failed to create service account %s/%s: %w", objectMeta.GetNamespace(), objectMeta.GetName(), err)
}
return nil
},
Expand Down
58 changes: 32 additions & 26 deletions pkg/cfn/manager/fakes/fake_stack_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading