Skip to content

Commit

Permalink
feat: time policy based pod revert done in parallel (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
mikouaj authored May 21, 2024
1 parent cf90a1f commit e04806a
Showing 1 changed file with 63 additions and 10 deletions.
73 changes: 63 additions & 10 deletions internal/boost/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package boost
import (
"context"
"errors"
"fmt"
"sync"
"time"

Expand All @@ -34,13 +35,19 @@ var (

const (
DefaultManagerCheckInterval = time.Duration(5 * time.Second)
DefaultMaxGoroutines = 10
)

type Manager interface {
// AddStartupCPUBoost registers a new startup-cpu-boost is a manager.
AddStartupCPUBoost(ctx context.Context, boost StartupCPUBoost) error
// RemoveStartupCPUBoost removes a startup-cpu-boost from a manager
RemoveStartupCPUBoost(ctx context.Context, namespace, name string)
// StartupCPUBoost returns a startup-cpu-boost with a given name and namespace
StartupCPUBoostForPod(ctx context.Context, pod *corev1.Pod) (StartupCPUBoost, bool)
// StartupCPUBoostForPod returns a startup-cpu-boost that matches a given pod
StartupCPUBoost(namespace, name string) (StartupCPUBoost, bool)
// Start runs the manager's control loop
Start(ctx context.Context) error
}

Expand Down Expand Up @@ -74,6 +81,7 @@ type managerImpl struct {
checkInterval time.Duration
startupCPUBoosts map[string]map[string]StartupCPUBoost
timePolicyBoosts map[boostKey]StartupCPUBoost
maxGoroutines int
}

type boostKey struct {
Expand All @@ -92,9 +100,12 @@ func NewManagerWithTicker(client client.Client, ticker TimeTicker) Manager {
checkInterval: DefaultManagerCheckInterval,
startupCPUBoosts: make(map[string]map[string]StartupCPUBoost),
timePolicyBoosts: make(map[boostKey]StartupCPUBoost),
maxGoroutines: DefaultMaxGoroutines,
}
}

// AddStartupCPUBoost registers a new startup-cpu-boost is a manager.
// If a boost with a given name and namespace already exists, it returns an error.
func (m *managerImpl) AddStartupCPUBoost(ctx context.Context, boost StartupCPUBoost) error {
m.Lock()
defer m.Unlock()
Expand All @@ -107,6 +118,7 @@ func (m *managerImpl) AddStartupCPUBoost(ctx context.Context, boost StartupCPUBo
return nil
}

// RemoveStartupCPUBoost removes a startup-cpu-boost from a manager if registered.
func (m *managerImpl) RemoveStartupCPUBoost(ctx context.Context, namespace, name string) {
m.Lock()
defer m.Unlock()
Expand All @@ -119,12 +131,16 @@ func (m *managerImpl) RemoveStartupCPUBoost(ctx context.Context, namespace, name
delete(m.timePolicyBoosts, key)
}

// StartupCPUBoost returns a startup-cpu-boost with a given name and namespace
// if registered in a manager.
func (m *managerImpl) StartupCPUBoost(namespace string, name string) (StartupCPUBoost, bool) {
m.RLock()
defer m.RUnlock()
return m.getStartupCPUBoost(namespace, name)
}

// StartupCPUBoostForPod returns a startup-cpu-boost that matches a given pod if such is registered
// in a manager.
func (m *managerImpl) StartupCPUBoostForPod(ctx context.Context, pod *corev1.Pod) (StartupCPUBoost, bool) {
m.RLock()
defer m.RUnlock()
Expand All @@ -142,23 +158,23 @@ func (m *managerImpl) StartupCPUBoostForPod(ctx context.Context, pod *corev1.Pod
return nil, false
}

// Start runs the manager's control loop
func (m *managerImpl) Start(ctx context.Context) error {
log := m.loggerFromContext(ctx)
//t := time.NewTicker(m.checkInterval)
//defer t.Stop()
defer m.ticker.Stop()
log.V(2).Info("Starting")
for {
select {
case <-m.ticker.Tick():
log.V(5).Info("tick...")
m.updateTimePolicyBoosts(ctx)
m.validateTimePolicyBoosts(ctx)
case <-ctx.Done():
return nil
}
}
}

// addStartupCPUBoost registers a new startup-cpu-boost in a manager.
func (m *managerImpl) addStartupCPUBoost(boost StartupCPUBoost) {
boosts, ok := m.startupCPUBoosts[boost.Namespace()]
if !ok {
Expand All @@ -172,6 +188,8 @@ func (m *managerImpl) addStartupCPUBoost(boost StartupCPUBoost) {
}
}

// getStartupCPUBoost returns the startup-cpu-boost with a given name and namespace
// if registered in a manager.
func (m *managerImpl) getStartupCPUBoost(namespace string, name string) (StartupCPUBoost, bool) {
if boosts, ok := m.startupCPUBoosts[namespace]; ok {
boost, ok := boosts[name]
Expand All @@ -180,21 +198,56 @@ func (m *managerImpl) getStartupCPUBoost(namespace string, name string) (Startup
return nil, false
}

func (m *managerImpl) updateTimePolicyBoosts(ctx context.Context) {
type podRevertTask struct {
boost StartupCPUBoost
pod *corev1.Pod
}

// validateTimePolicyBoosts validates all time policy boosts in a manager
// and reverts the resources for violated pods.
func (m *managerImpl) validateTimePolicyBoosts(ctx context.Context) {
m.RLock()
defer m.RUnlock()
revertTasks := make(chan *podRevertTask, m.maxGoroutines)
errors := make(chan error, m.maxGoroutines)
log := m.loggerFromContext(ctx)
for _, boost := range m.timePolicyBoosts {
for _, pod := range boost.ValidatePolicy(ctx, duration.FixedDurationPolicyName) {
log = log.WithValues("boost", boost.Name(), "namespace", boost.Namespace(), "pod", pod.Name)
log.V(5).Info("updating pod with initial resources")
if err := boost.RevertResources(ctx, pod); err != nil {
log.Error(err, "failed to revert resources for pod")

go func() {
for _, boost := range m.timePolicyBoosts {
for _, pod := range boost.ValidatePolicy(ctx, duration.FixedDurationPolicyName) {
revertTasks <- &podRevertTask{
boost: boost,
pod: pod,
}
}
}
close(revertTasks)
}()

go func() {
var wg sync.WaitGroup
for i := 0; i < m.maxGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for task := range revertTasks {
log = log.WithValues("boost", task.boost.Name(), "namespace", task.boost.Namespace(), "pod", task.pod.Name)
log.V(5).Info("updating pod with initial resources")
if err := task.boost.RevertResources(ctx, task.pod); err != nil {
errors <- fmt.Errorf("pod %s/%s: %w", task.pod.Namespace, task.pod.Name, err)
}
}
}()
}
wg.Wait()
close(errors)
}()
for err := range errors {
log.Error(err, "failed to revert resources")
}
}

// loggerFromContext returns a pre-configured logger from the given context
func (m *managerImpl) loggerFromContext(ctx context.Context) logr.Logger {
return ctrl.LoggerFrom(ctx).
WithName("boost-manager")
Expand Down

0 comments on commit e04806a

Please sign in to comment.