diff --git a/internal/boost/manager.go b/internal/boost/manager.go index be9d4d6..babd0bd 100644 --- a/internal/boost/manager.go +++ b/internal/boost/manager.go @@ -17,6 +17,7 @@ package boost import ( "context" "errors" + "fmt" "sync" "time" @@ -34,13 +35,19 @@ var ( const ( DefaultManagerCheckInterval = time.Duration(5 * time.Second) + DefaultMaxGoroutines = 10 ) type Manager interface { + // AddStartupCPUBoost registers a new startup-cpu-boost in a manager. AddStartupCPUBoost(ctx context.Context, boost StartupCPUBoost) error + // RemoveStartupCPUBoost removes a startup-cpu-boost from a manager RemoveStartupCPUBoost(ctx context.Context, namespace, name string) + // StartupCPUBoostForPod returns a startup-cpu-boost that matches a given pod StartupCPUBoostForPod(ctx context.Context, pod *corev1.Pod) (StartupCPUBoost, bool) + // StartupCPUBoost returns a startup-cpu-boost with a given name and namespace StartupCPUBoost(namespace, name string) (StartupCPUBoost, bool) + // Start runs the manager's control loop Start(ctx context.Context) error } @@ -74,6 +81,7 @@ type managerImpl struct { checkInterval time.Duration startupCPUBoosts map[string]map[string]StartupCPUBoost timePolicyBoosts map[boostKey]StartupCPUBoost + maxGoroutines int } type boostKey struct { @@ -92,9 +100,12 @@ func NewManagerWithTicker(client client.Client, ticker TimeTicker) Manager { checkInterval: DefaultManagerCheckInterval, startupCPUBoosts: make(map[string]map[string]StartupCPUBoost), timePolicyBoosts: make(map[boostKey]StartupCPUBoost), + maxGoroutines: DefaultMaxGoroutines, } } +// AddStartupCPUBoost registers a new startup-cpu-boost in a manager. +// If a boost with a given name and namespace already exists, it returns an error. 
func (m *managerImpl) AddStartupCPUBoost(ctx context.Context, boost StartupCPUBoost) error { m.Lock() defer m.Unlock() @@ -107,6 +118,7 @@ func (m *managerImpl) AddStartupCPUBoost(ctx context.Context, boost StartupCPUBo return nil } +// RemoveStartupCPUBoost removes a startup-cpu-boost from a manager if registered. func (m *managerImpl) RemoveStartupCPUBoost(ctx context.Context, namespace, name string) { m.Lock() defer m.Unlock() @@ -119,12 +131,16 @@ func (m *managerImpl) RemoveStartupCPUBoost(ctx context.Context, namespace, name delete(m.timePolicyBoosts, key) } +// StartupCPUBoost returns a startup-cpu-boost with a given name and namespace +// if registered in a manager. func (m *managerImpl) StartupCPUBoost(namespace string, name string) (StartupCPUBoost, bool) { m.RLock() defer m.RUnlock() return m.getStartupCPUBoost(namespace, name) } +// StartupCPUBoostForPod returns a startup-cpu-boost that matches a given pod, if one is registered +// in a manager. func (m *managerImpl) StartupCPUBoostForPod(ctx context.Context, pod *corev1.Pod) (StartupCPUBoost, bool) { m.RLock() defer m.RUnlock() @@ -142,23 +158,23 @@ func (m *managerImpl) StartupCPUBoostForPod(ctx context.Context, pod *corev1.Pod return nil, false } +// Start runs the manager's control loop: it validates time policy boosts on every tick and returns nil once ctx is done. func (m *managerImpl) Start(ctx context.Context) error { log := m.loggerFromContext(ctx) - //t := time.NewTicker(m.checkInterval) - //defer t.Stop() defer m.ticker.Stop() log.V(2).Info("Starting") for { select { case <-m.ticker.Tick(): log.V(5).Info("tick...") - m.updateTimePolicyBoosts(ctx) + m.validateTimePolicyBoosts(ctx) case <-ctx.Done(): return nil } } } +// addStartupCPUBoost registers a new startup-cpu-boost in a manager. 
func (m *managerImpl) addStartupCPUBoost(boost StartupCPUBoost) { boosts, ok := m.startupCPUBoosts[boost.Namespace()] if !ok { @@ -172,6 +188,8 @@ func (m *managerImpl) addStartupCPUBoost(boost StartupCPUBoost) { } } +// getStartupCPUBoost returns the startup-cpu-boost with a given name and namespace +// if registered in a manager. func (m *managerImpl) getStartupCPUBoost(namespace string, name string) (StartupCPUBoost, bool) { if boosts, ok := m.startupCPUBoosts[namespace]; ok { boost, ok := boosts[name] @@ -180,21 +198,56 @@ func (m *managerImpl) getStartupCPUBoost(namespace string, name string) (Startup return nil, false } -func (m *managerImpl) updateTimePolicyBoosts(ctx context.Context) { +type podRevertTask struct { + boost StartupCPUBoost + pod *corev1.Pod +} + +// validateTimePolicyBoosts validates all time policy boosts in a manager and reverts the resources +// for violated pods with up to maxGoroutines workers; each task derives its own logger from the shared one. +func (m *managerImpl) validateTimePolicyBoosts(ctx context.Context) { m.RLock() defer m.RUnlock() + revertTasks := make(chan *podRevertTask, m.maxGoroutines) + errs := make(chan error, m.maxGoroutines) log := m.loggerFromContext(ctx) - for _, boost := range m.timePolicyBoosts { - for _, pod := range boost.ValidatePolicy(ctx, duration.FixedDurationPolicyName) { - log = log.WithValues("boost", boost.Name(), "namespace", boost.Namespace(), "pod", pod.Name) - log.V(5).Info("updating pod with initial resources") - if err := boost.RevertResources(ctx, pod); err != nil { - log.Error(err, "failed to revert resources for pod") + + go func() { + for _, boost := range m.timePolicyBoosts { + for _, pod := range boost.ValidatePolicy(ctx, duration.FixedDurationPolicyName) { + revertTasks <- &podRevertTask{ + boost: boost, + pod: pod, + } } } + close(revertTasks) + }() + + go func() { + var wg sync.WaitGroup + for i := 0; i < m.maxGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for task := range revertTasks { + taskLog := log.WithValues("boost", task.boost.Name(), "namespace", 
task.boost.Namespace(), "pod", task.pod.Name) + taskLog.V(5).Info("updating pod with initial resources") + if err := task.boost.RevertResources(ctx, task.pod); err != nil { + errs <- fmt.Errorf("pod %s/%s: %w", task.pod.Namespace, task.pod.Name, err) + } + } + }() + } + wg.Wait() + close(errs) + }() + for err := range errs { + log.Error(err, "failed to revert resources") } } +// loggerFromContext returns a pre-configured logger from the given context func (m *managerImpl) loggerFromContext(ctx context.Context) logr.Logger { return ctrl.LoggerFrom(ctx). WithName("boost-manager")