diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index 9968b8b91226..d54e74adaa11 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -59,6 +59,9 @@ import (
 	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/podinjection"
+	podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/emptycandidates"
@@ -266,6 +269,8 @@ var (
 	provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
 	frequentLoopsEnabled        = flag.Bool("frequent-loops-enabled", false, "Whether clusterautoscaler triggers new iterations more frequently when it's needed")
 	asyncNodeGroupsEnabled      = flag.Bool("async-node-groups", false, "Whether clusterautoscaler creates and deletes node groups asynchronously. Experimental: requires cloud provider supporting async node group operations, enable at your own risk.")
+	proactiveScaleupEnabled     = flag.Bool("enable-proactive-scaleup", false, "Whether to enable proactive scale-ups. Defaults to false.")
+	podInjectionLimit           = flag.Int("pod-injection-limit", 5000, "Limits the total number of pods while injecting fake pods. If unschedulable pods already exceed the limit, pod injection is disabled but pods are not truncated.")
 )
 
 func isFlagPassed(name string) bool {
@@ -527,6 +532,20 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 		podListProcessor.AddProcessor(injector)
 		podListProcessor.AddProcessor(provreqProcesor)
 	}
+
+	if *proactiveScaleupEnabled {
+		podInjectionBackoffRegistry := podinjectionbackoff.NewFakePodControllerRegistry()
+
+		podInjectionPodListProcessor := podinjection.NewPodInjectionPodListProcessor(podInjectionBackoffRegistry)
+		enforceInjectedPodsLimitProcessor := podinjection.NewEnforceInjectedPodsLimitProcessor(*podInjectionLimit)
+
+		podListProcessor = pods.NewCombinedPodListProcessor([]pods.PodListProcessor{podInjectionPodListProcessor, podListProcessor, enforceInjectedPodsLimitProcessor})
+
+		// FakePodsScaleUpStatusProcessor needs to be the first ScaleUpStatusProcessor, as it filters fake pods out of
+		// the scale-up status so that no events are emitted for them.
+		opts.Processors.ScaleUpStatusProcessor = podinjection.NewFakePodsScaleUpStatusProcessor(podInjectionBackoffRegistry)
+	}
+
 	opts.Processors.PodListProcessor = podListProcessor
 	scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
 	if autoscalingOptions.ParallelDrain {
diff --git a/cluster-autoscaler/processors/podinjection/backoff/backoff_registry.go b/cluster-autoscaler/processors/podinjection/backoff/backoff_registry.go
new file mode 100644
index 000000000000..28b14e7b215a
--- /dev/null
+++ b/cluster-autoscaler/processors/podinjection/backoff/backoff_registry.go
@@ -0,0 +1,97 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podinjectionbackoff
+
+import (
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+	"k8s.io/apimachinery/pkg/types"
+)
+
+const (
+	baseBackoff      = 5 * time.Minute
+	backoffThreshold = 30 * time.Minute
+)
+
+// controllerEntry describes a backed-off controller
+type controllerEntry struct {
+	until   time.Time
+	backoff backoff.ExponentialBackOff
+}
+
+// ControllerRegistry tracks backed-off controllers for the time-based back-off of controllers considered in fake pod injection
+type ControllerRegistry struct {
+	backedOffControllers map[types.UID]controllerEntry
+}
+
+// NewFakePodControllerRegistry creates and returns an instance of ControllerRegistry
+func NewFakePodControllerRegistry() *ControllerRegistry {
+	return &ControllerRegistry{
+		backedOffControllers: make(map[types.UID]controllerEntry),
+	}
+}
+
+// newExponentialBackOff creates an instance of ExponentialBackOff using non-default values.
+func newExponentialBackOff(clock backoff.Clock) backoff.ExponentialBackOff {
+	b := backoff.ExponentialBackOff{
+		InitialInterval: baseBackoff,
+		// Disables randomization for easier testing and better predictability
+		RandomizationFactor: 0,
+		Multiplier:          backoff.DefaultMultiplier,
+		MaxInterval:         backoffThreshold,
+		// Disable stopping once the threshold is reached
+		MaxElapsedTime: 0,
+		Stop:           backoff.Stop,
+		Clock:          clock,
+	}
+	b.Reset()
+	return b
+}
+
+// BackoffController backs off a controller.
+// If the controller is already in backoff, its backoff time is exponentially increased.
+// If the controller's previous backoff has expired, its entry is reset and it is backed off from the base duration again.
+// If the controller is not stored yet, a new entry is created.
+func (r *ControllerRegistry) BackoffController(ownerUID types.UID, now time.Time) {
+	if ownerUID == "" {
+		return
+	}
+
+	controller, found := r.backedOffControllers[ownerUID]
+
+	if !found || now.After(controller.until) {
+		controller = controllerEntry{
+			backoff: newExponentialBackOff(backoff.SystemClock),
+		}
+	}
+	// NextBackOff() needs to be called to increase the next interval
+	controller.until = now.Add(controller.backoff.NextBackOff())
+
+	r.backedOffControllers[ownerUID] = controller
+}
+
+// BackOffUntil returns the time until which the controller with id `uid` is backed off, or the zero time if it is not stored
+func (r *ControllerRegistry) BackOffUntil(uid types.UID, now time.Time) time.Time {
+	controller, found := r.backedOffControllers[uid]
+
+	if !found {
+		return time.Time{}
+	}
+
+	return controller.until
+}
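
A quick usage sketch of the registry (illustrative only: it assumes the import paths introduced in this PR and wraps the calls in a throwaway `main`). With randomization disabled, repeated back-offs of the same controller grow deterministically from the 5m base by the default multiplier of 1.5x, capped at the 30m threshold:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/types"
	podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff"
)

func main() {
	registry := podinjectionbackoff.NewFakePodControllerRegistry()
	uid := types.UID("controller-uid")
	now := time.Now()

	// First back-off: the controller is blocked for baseBackoff (5m).
	registry.BackoffController(uid, now)
	fmt.Println(registry.BackOffUntil(uid, now)) // now + 5m

	// Backing off again before expiry grows the interval by the default
	// multiplier (1.5x): now blocked until now + 7.5m, capped at 30m.
	registry.BackoffController(uid, now)
	fmt.Println(registry.BackOffUntil(uid, now)) // now + 7.5m
}
```
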
diff --git a/cluster-autoscaler/processors/podinjection/backoff/backoff_registry_test.go b/cluster-autoscaler/processors/podinjection/backoff/backoff_registry_test.go
new file mode 100644
index 000000000000..0794d5bda74f
--- /dev/null
+++ b/cluster-autoscaler/processors/podinjection/backoff/backoff_registry_test.go
@@ -0,0 +1,124 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podinjectionbackoff
+
+import (
+	"testing"
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+	"github.com/stretchr/testify/assert"
+	"k8s.io/apimachinery/pkg/types"
+)
+
+func TestBackoffController(t *testing.T) {
+	c1 := types.UID("c1")
+	c2 := types.UID("c2")
+	clock := &clock{}
+
+	testCases := map[string]struct {
+		backoffCounts                map[types.UID]int
+		spendTime                    time.Duration
+		expectedBackedoffControllers map[types.UID]controllerEntry
+	}{
+		"backing-off a controller adds its controller UID in backoff correctly": {
+			backoffCounts: map[types.UID]int{
+				c1: 1,
+			},
+			expectedBackedoffControllers: map[types.UID]controllerEntry{
+				c1: {
+					until: clock.now.Add(baseBackoff),
+				},
+			},
+		},
+		"backing-off an already backed-off controller exponentially increases backoff duration": {
+			backoffCounts: map[types.UID]int{
+				c1: 2,
+			},
+			expectedBackedoffControllers: map[types.UID]controllerEntry{
+				c1: {
+					until: clock.now.Add(time.Duration(float64(baseBackoff) * backoff.DefaultMultiplier)),
+				},
+			},
+		},
+		"backing-off a controller doesn't affect other controllers": {
+			backoffCounts: map[types.UID]int{
+				c1: 1,
+				c2: 2,
+			},
+			expectedBackedoffControllers: map[types.UID]controllerEntry{
+				c1: {
+					until: clock.now.Add(baseBackoff),
+				},
+				c2: {
+					until: clock.now.Add(time.Duration(float64(baseBackoff) * backoff.DefaultMultiplier)),
+				},
+			},
+		},
+		"backing-off a past backed-off controller resets backoff": {
+			backoffCounts: map[types.UID]int{
+				c1: 1,
+			},
+			spendTime: baseBackoff * 2,
+			expectedBackedoffControllers: map[types.UID]controllerEntry{
+				c1: {
+					until: clock.now.Add(baseBackoff * 2).Add(baseBackoff),
+				},
+			},
+		},
+		"back-off duration doesn't exceed backoffThreshold": {
+			backoffCounts: map[types.UID]int{
+				c1: 15,
+			},
+			expectedBackedoffControllers: map[types.UID]controllerEntry{
+				c1: {
+					until: clock.now.Add(backoffThreshold),
+				},
+			},
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			// Reset time between test cases
+			clock.now = time.Time{}
+			clock.now = clock.now.Add(tc.spendTime)
+
+			registry := NewFakePodControllerRegistry()
+
+			for uid, backoffCount := range tc.backoffCounts {
+				for i := 0; i < backoffCount; i++ {
+					registry.BackoffController(uid, clock.now)
+				}
+			}
+
+			assert.Equal(t, len(tc.expectedBackedoffControllers), len(registry.backedOffControllers))
+			for uid, backoffController := range tc.expectedBackedoffControllers {
+				assert.NotNil(t, registry.backedOffControllers[uid])
+				assert.Equal(t, backoffController.until, registry.backedOffControllers[uid].until)
+			}
+		})
+	}
+}
+
+type clock struct {
+	now time.Time
+}
+
+func (c *clock) Now() time.Time {
+	return c.now
+}
diff --git a/cluster-autoscaler/processors/podinjection/enforce_injected_pods_limit_processor.go b/cluster-autoscaler/processors/podinjection/enforce_injected_pods_limit_processor.go
new file mode 100644
index 000000000000..1c674401a953
--- /dev/null
+++ b/cluster-autoscaler/processors/podinjection/enforce_injected_pods_limit_processor.go
@@ -0,0 +1,56 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podinjection
+
+import (
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+)
+
+// EnforceInjectedPodsLimitProcessor is a PodListProcessor used to limit the number of injected fake pods.
+type EnforceInjectedPodsLimitProcessor struct {
+	podLimit int
+}
+
+// NewEnforceInjectedPodsLimitProcessor returns an instance of EnforceInjectedPodsLimitProcessor
+func NewEnforceInjectedPodsLimitProcessor(podLimit int) *EnforceInjectedPodsLimitProcessor {
+	return &EnforceInjectedPodsLimitProcessor{
+		podLimit: podLimit,
+	}
+}
+
+// Process filters unschedulablePods and enforces the limit on the number of injected pods
+func (p *EnforceInjectedPodsLimitProcessor) Process(ctx *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
+
+	numberOfFakePodsToRemove := len(unschedulablePods) - p.podLimit
+	var unschedulablePodsAfterProcessing []*apiv1.Pod
+
+	for _, pod := range unschedulablePods {
+		if IsFake(pod) && numberOfFakePodsToRemove > 0 {
+			numberOfFakePodsToRemove--
+			continue
+		}
+
+		unschedulablePodsAfterProcessing = append(unschedulablePodsAfterProcessing, pod)
+	}
+
+	return unschedulablePodsAfterProcessing, nil
+}
+
+// CleanUp is called at CA termination
+func (p *EnforceInjectedPodsLimitProcessor) CleanUp() {
+}
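
To make the enforcement semantics concrete, here is an illustrative sketch reusing the helpers from the test file that follows (`makeTestingPods`, `makeFakePods`, `numberOfFakePods`, `buildTestPod`; an `fmt` import is assumed). Only fake pods are ever dropped, so real pods survive even when they alone exceed the limit:

```go
// With podLimit=3, two real pods and three fake pods: 5-3=2 fake pods
// are dropped, both real pods are kept, and one fake pod remains.
func ExampleEnforceInjectedPodsLimitProcessor() {
	realPods := makeTestingPods(2)
	fakePods := makeFakePods("owner-uid", buildTestPod("default", "sample"), 3)

	p := NewEnforceInjectedPodsLimitProcessor(3)
	pods, _ := p.Process(nil, append(realPods, fakePods...))

	fmt.Println(len(pods), numberOfFakePods(pods))
	// Output: 3 1
}
```
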
diff --git a/cluster-autoscaler/processors/podinjection/enforce_injected_pods_limit_processor_test.go b/cluster-autoscaler/processors/podinjection/enforce_injected_pods_limit_processor_test.go
new file mode 100644
index 000000000000..3e13b8e386e7
--- /dev/null
+++ b/cluster-autoscaler/processors/podinjection/enforce_injected_pods_limit_processor_test.go
@@ -0,0 +1,126 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podinjection
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
+)
+
+func TestEnforceInjectedPodsLimitProcessor(t *testing.T) {
+
+	samplePod := buildTestPod("default", "test-pod")
+	ownerUid := types.UID("sample uid")
+
+	testCases := []struct {
+		name                                          string
+		podLimit                                      int
+		unschedulablePods                             []*apiv1.Pod
+		expectedNumberOfResultedUnschedulablePods     int
+		expectedNumberOfResultedUnschedulableFakePods int
+		expectedNumberOfResultedUnschedulableRealPods int
+	}{
+		{
+			name:              "Real pods = 0 && fake pods < PodLimit",
+			podLimit:          10,
+			unschedulablePods: makeFakePods(ownerUid, samplePod, 5),
+			expectedNumberOfResultedUnschedulablePods:     5,
+			expectedNumberOfResultedUnschedulableFakePods: 5,
+			expectedNumberOfResultedUnschedulableRealPods: 0,
+		},
+		{
+			name:              "Real pods = 0 && fake pods > PodLimit",
+			podLimit:          10,
+			unschedulablePods: makeFakePods(ownerUid, samplePod, 15),
+			expectedNumberOfResultedUnschedulablePods:     10,
+			expectedNumberOfResultedUnschedulableFakePods: 10,
+			expectedNumberOfResultedUnschedulableRealPods: 0,
+		},
+		{
+			name:              "Real pods > PodLimit && some fake pods",
+			podLimit:          10,
+			unschedulablePods: append(makeTestingPods(11), makeFakePods(ownerUid, samplePod, 5)...),
+			expectedNumberOfResultedUnschedulablePods:     11,
+			expectedNumberOfResultedUnschedulableFakePods: 0,
+			expectedNumberOfResultedUnschedulableRealPods: 11,
+		},
+		{
+			name:              "Real pods = PodLimit && some fake pods",
+			podLimit:          10,
+			unschedulablePods: append(makeTestingPods(10), makeFakePods(ownerUid, samplePod, 5)...),
+			expectedNumberOfResultedUnschedulablePods:     10,
+			expectedNumberOfResultedUnschedulableFakePods: 0,
+			expectedNumberOfResultedUnschedulableRealPods: 10,
+		},
+		{
+			name:              "Real pods < PodLimit && real pods + fake pods > PodLimit",
+			podLimit:          10,
+			unschedulablePods: append(makeTestingPods(3), makeFakePods(ownerUid, samplePod, 10)...),
+			expectedNumberOfResultedUnschedulablePods:     10,
+			expectedNumberOfResultedUnschedulableFakePods: 7,
+			expectedNumberOfResultedUnschedulableRealPods: 3,
+		},
+		{
+			name:              "Real pods < PodLimit && real pods + fake pods < PodLimit",
+			podLimit:          10,
+			unschedulablePods: append(makeTestingPods(3), makeFakePods(ownerUid, samplePod, 4)...),
+			expectedNumberOfResultedUnschedulablePods:     7,
+			expectedNumberOfResultedUnschedulableFakePods: 4,
+			expectedNumberOfResultedUnschedulableRealPods: 3,
+		},
+		{
+			name:              "Real pods < PodLimit && real pods + fake pods = PodLimit",
+			podLimit:          10,
+			unschedulablePods: append(makeTestingPods(3), makeFakePods(ownerUid, samplePod, 7)...),
+			expectedNumberOfResultedUnschedulablePods:     10,
+			expectedNumberOfResultedUnschedulableFakePods: 7,
+			expectedNumberOfResultedUnschedulableRealPods: 3,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			p := NewEnforceInjectedPodsLimitProcessor(tc.podLimit)
+			pods, _ := p.Process(nil, tc.unschedulablePods)
+			assert.EqualValues(t, tc.expectedNumberOfResultedUnschedulablePods, len(pods))
+			fakePodCount := numberOfFakePods(pods)
+			assert.EqualValues(t, tc.expectedNumberOfResultedUnschedulableFakePods, fakePodCount)
+			assert.EqualValues(t, tc.expectedNumberOfResultedUnschedulableRealPods, len(pods)-fakePodCount)
+		})
+	}
+}
+
+func numberOfFakePods(pods []*apiv1.Pod) int {
+	count := 0
+	for _, pod := range pods {
+		if IsFake(pod) {
+			count++
+		}
+	}
+	return count
+}
+
+func makeTestingPods(numberOfRealTestPods int) []*apiv1.Pod {
+	var testingPods []*apiv1.Pod
+	for range numberOfRealTestPods {
+		testingPods = append(testingPods, buildTestPod("default", "test-pod"))
+	}
+	return testingPods
+}
diff --git a/cluster-autoscaler/processors/podinjection/job_controller.go b/cluster-autoscaler/processors/podinjection/job_controller.go
new file mode 100644
index 000000000000..565b129c5ecd
--- /dev/null
+++ b/cluster-autoscaler/processors/podinjection/job_controller.go
@@ -0,0 +1,64 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podinjection
+
+import (
+	batchv1 "k8s.io/api/batch/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/klog/v2"
+)
+
+func createJobControllers(ctx *context.AutoscalingContext) []controller {
+	var controllers []controller
+	jobs, err := ctx.ListerRegistry.JobLister().List(labels.Everything())
+	if err != nil {
+		klog.Errorf("Failed to list jobs: %v", err)
+		return controllers
+	}
+	for _, job := range jobs {
+		controllers = append(controllers, controller{uid: job.UID, desiredReplicas: desiredReplicasFromJob(job)})
+	}
+	return controllers
+}
+
+func desiredReplicasFromJob(job *batchv1.Job) int {
+	parallelism := 1
+	completions := 1
+
+	if job.Spec.Parallelism != nil {
+		parallelism = int(*(job.Spec.Parallelism))
+	}
+
+	if job.Spec.Completions != nil {
+		completions = int(*(job.Spec.Completions))
+	}
+
+	if isWorkQueueJob(job) && job.Status.Succeeded == 0 {
+		return parallelism
+	}
+
+	incomplete := completions - int(job.Status.Succeeded)
+	desiredReplicas := min(incomplete, parallelism)
+	return max(desiredReplicas, 0)
+}
+
+// isWorkQueueJob returns true if the job is a work-queue job (Completions is nil or 1 and Parallelism >= 0)
+// work-queue jobs should have replicas equal to Parallelism as long as no pod has succeeded yet, regardless of the incomplete count
+func isWorkQueueJob(job *batchv1.Job) bool {
+	return (job.Spec.Completions == nil || *(job.Spec.Completions) == 1) && job.Spec.Parallelism != nil && *(job.Spec.Parallelism) >= 0
+}
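
A worked example of the replica math above, mirroring the test cases in the test file that follows (the snippet assumes the same package plus a `fmt` import):

```go
// Fixed completion count: Completions=10, Parallelism=5, Succeeded=6.
completions, parallelism := int32(10), int32(5)
job := &batchv1.Job{
	Spec:   batchv1.JobSpec{Completions: &completions, Parallelism: &parallelism},
	Status: batchv1.JobStatus{Succeeded: 6},
}
// incomplete = 10 - 6 = 4; desired = min(4, 5) = 4
fmt.Println(desiredReplicasFromJob(job)) // prints 4

// For a work-queue Job (Completions nil), the result would instead be
// Parallelism while nothing has succeeded yet, and 0 afterwards.
```
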
diff --git a/cluster-autoscaler/processors/podinjection/job_controller_test.go b/cluster-autoscaler/processors/podinjection/job_controller_test.go
new file mode 100644
index 000000000000..0fdee2a6a0a9
--- /dev/null
+++ b/cluster-autoscaler/processors/podinjection/job_controller_test.go
@@ -0,0 +1,109 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podinjection
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	batchv1 "k8s.io/api/batch/v1"
+)
+
+func TestDesiredReplicasFromJob(t *testing.T) {
+	one := int32(1)
+	five := int32(5)
+	ten := int32(10)
+
+	testCases := []struct {
+		name         string
+		job          *batchv1.Job
+		wantReplicas int
+	}{
+		{
+			name:         "No parallel jobs - Parallelism and Completions not defined",
+			job:          &batchv1.Job{},
+			wantReplicas: 1,
+		},
+		{
+			name: "No parallel jobs - Parallelism and Completions set to 1",
+			job: &batchv1.Job{
+				Spec: batchv1.JobSpec{
+					Parallelism: &one,
+					Completions: &one,
+				},
+			},
+			wantReplicas: 1,
+		},
+		{
+			name: "Parallel Jobs with a fixed completion count: incomplete pods less than parallelism",
+			job: &batchv1.Job{
+				Spec: batchv1.JobSpec{
+					Completions: &ten,
+					Parallelism: &five,
+				},
+				Status: batchv1.JobStatus{
+					Succeeded: 6,
+				},
+			},
+			wantReplicas: 4,
+		},
+		{
+			name: "Parallel Jobs with a fixed completion count: incomplete pods more than parallelism",
+			job: &batchv1.Job{
+				Spec: batchv1.JobSpec{
+					Completions: &ten,
+					Parallelism: &five,
+				},
+				Status: batchv1.JobStatus{
+					Succeeded: 2,
+				},
+			},
+			wantReplicas: 5,
+		},
+		{
+			name: "Work queue with succeeded pods",
+			job: &batchv1.Job{
+				Spec: batchv1.JobSpec{
+					Parallelism: &five,
+				},
+				Status: batchv1.JobStatus{
+					Succeeded: 2,
+				},
+			},
+			wantReplicas: 0,
+		},
+		{
+			name: "Work queue without succeeded pods",
+			job: &batchv1.Job{
+				Spec: batchv1.JobSpec{
+					Parallelism: &five,
+				},
+				Status: batchv1.JobStatus{
+					Succeeded: 0,
+				},
+			},
+			wantReplicas: 5,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			replicas := desiredReplicasFromJob(tc.job)
+			assert.Equal(t, tc.wantReplicas, replicas)
+		})
+	}
+}
diff --git a/cluster-autoscaler/processors/podinjection/pod_group.go b/cluster-autoscaler/processors/podinjection/pod_group.go
new file mode 100644
index 000000000000..747e05a55f24
--- /dev/null
+++ b/cluster-autoscaler/processors/podinjection/pod_group.go
@@ -0,0 +1,74 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podinjection
+
+import (
+	apiv1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+)
+
+type podGroup struct {
+	podCount        int
+	desiredReplicas int
+	sample          *apiv1.Pod
+	ownerUid        types.UID
+}
+
+// groupPods creates a map from controller UID to podGroup.
+// If a controller for some pods is not found, such pods are ignored and not grouped
+func groupPods(pods []*apiv1.Pod, controllers []controller) map[types.UID]podGroup {
+	podGroups := map[types.UID]podGroup{}
+	for _, con := range controllers {
+		podGroups[con.uid] = makePodGroup(con.desiredReplicas)
+	}
+
+	for _, pod := range pods {
+		for _, ownerRef := range pod.OwnerReferences {
+			podGroups = updatePodGroups(pod, ownerRef, podGroups)
+		}
+	}
+	return podGroups
+}
+
+// updatePodGroups updates the pod group if ownerRef is the controller of the pod
+func updatePodGroups(pod *apiv1.Pod, ownerRef metav1.OwnerReference, podGroups map[types.UID]podGroup) map[types.UID]podGroup {
+	if ownerRef.Controller == nil {
+		return podGroups
+	}
+	if !*(ownerRef.Controller) {
+		return podGroups
+	}
+	group, found := podGroups[ownerRef.UID]
+	if !found {
+		return podGroups
+	}
+	if group.sample == nil && pod.Spec.NodeName == "" {
+		group.sample = pod
+		group.ownerUid = ownerRef.UID
+	}
+	group.podCount++
+	podGroups[ownerRef.UID] = group
+	return podGroups
+}
+
+func makePodGroup(desiredReplicas int) podGroup {
+	return podGroup{
+		podCount:        0,
+		desiredReplicas: desiredReplicas,
+	}
+}
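
The grouping semantics in a nutshell (a hypothetical sketch: `scheduledPod` and `pendingPod` stand for two pods owned by the same ReplicaSet, the former with `Spec.NodeName` set). Every owned pod increments `podCount`, but only an unscheduled pod can become the `sample` that fake copies are later made from:

```go
controllers := []controller{{uid: "rs-uid", desiredReplicas: 5}}
groups := groupPods([]*apiv1.Pod{scheduledPod, pendingPod}, controllers)

g := groups["rs-uid"]
// g.podCount == 2:        both owned pods are counted.
// g.sample == pendingPod: only a pod with an empty NodeName qualifies.
// desiredReplicas - podCount = 3 fake pods would later be injected.
```
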
diff --git a/cluster-autoscaler/processors/podinjection/pod_injection_processor.go b/cluster-autoscaler/processors/podinjection/pod_injection_processor.go
new file mode 100644
index 000000000000..74fb4b6d9a20
--- /dev/null
+++ b/cluster-autoscaler/processors/podinjection/pod_injection_processor.go
@@ -0,0 +1,164 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podinjection
+
+import (
+	"fmt"
+	"time"
+
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+const (
+	// FakePodAnnotationKey is the annotation key for the pod type
+	FakePodAnnotationKey = "podtype"
+	// FakePodAnnotationValue is the annotation value marking a fake pod
+	FakePodAnnotationValue = "fakepod"
+)
+
+// PodInjectionPodListProcessor is a PodListProcessor that injects fake pods so that each controller's desired replica count is considered during scale-up.
+// For each controller: #fake pods injected = #replicas specified in the controller - #scheduled pods - #finished pods - #unschedulable pods
+type PodInjectionPodListProcessor struct {
+	fakePodControllerBackoffRegistry *podinjectionbackoff.ControllerRegistry
+}
+
+// controller is a struct that can be used to abstract different pod controllers
+type controller struct {
+	uid             types.UID
+	desiredReplicas int
+}
+
+// NewPodInjectionPodListProcessor returns an instance of PodInjectionPodListProcessor
+func NewPodInjectionPodListProcessor(fakePodRegistry *podinjectionbackoff.ControllerRegistry) *PodInjectionPodListProcessor {
+	return &PodInjectionPodListProcessor{
+		fakePodControllerBackoffRegistry: fakePodRegistry,
+	}
+}
+
+// Process updates unschedulablePods by injecting fake pods to match the target replica count
+func (p *PodInjectionPodListProcessor) Process(ctx *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
+
+	controllers := listControllers(ctx)
+	controllers = p.skipBackedoffControllers(controllers)
+
+	nodeInfos, err := ctx.ClusterSnapshot.NodeInfos().List()
+	if err != nil {
+		klog.Errorf("Failed to list nodeInfos from cluster snapshot: %v", err)
+		return unschedulablePods, fmt.Errorf("failed to list nodeInfos from cluster snapshot: %w", err)
+	}
+	scheduledPods := podsFromNodeInfos(nodeInfos)
+
+	groupedPods := groupPods(append(scheduledPods, unschedulablePods...), controllers)
+	var podsToInject []*apiv1.Pod
+
+	for _, groupedPod := range groupedPods {
+		fakePodCount := groupedPod.fakePodCount()
+		fakePods := makeFakePods(groupedPod.ownerUid, groupedPod.sample, fakePodCount)
+		podsToInject = append(podsToInject, fakePods...)
+	}
+
+	unschedulablePodsAfterProcessing := append(unschedulablePods, podsToInject...)
+
+	return unschedulablePodsAfterProcessing, nil
+}
+
+// CleanUp is called at CA termination
+func (p *PodInjectionPodListProcessor) CleanUp() {
+}
+
+// makeFakePods creates podCount copies of the sample pod
+// makeFakePods also annotates each copy so that it is marked as "fake"
+func makeFakePods(ownerUid types.UID, samplePod *apiv1.Pod, podCount int) []*apiv1.Pod {
+	var fakePods []*apiv1.Pod
+	for i := 1; i <= podCount; i++ {
+		newPod := withFakePodAnnotation(samplePod.DeepCopy())
+		newPod.Name = fmt.Sprintf("%s-copy-%d", samplePod.Name, i)
+		newPod.UID = types.UID(fmt.Sprintf("%s-%d", string(ownerUid), i))
+		fakePods = append(fakePods, newPod)
+	}
+	return fakePods
+}
+
+// withFakePodAnnotation adds the annotation with key `FakePodAnnotationKey` and value `FakePodAnnotationValue` to the passed pod.
+// withFakePodAnnotation also creates a new annotations map if the original pod.Annotations is nil
+func withFakePodAnnotation(pod *apiv1.Pod) *apiv1.Pod {
+	if pod.Annotations == nil {
+		pod.Annotations = make(map[string]string, 1)
+	}
+	pod.Annotations[FakePodAnnotationKey] = FakePodAnnotationValue
+	return pod
+}
+
+// fakePodCount calculates the number of fake pods that should be injected for this podGroup
+func (p *podGroup) fakePodCount() int {
+	// Controllers with no unschedulable pods are ignored
+	if p.podCount == 0 || p.sample == nil {
+		return 0
+	}
+	fakePodCount := p.desiredReplicas - p.podCount
+	if fakePodCount <= 0 {
+		return 0
+	}
+	return fakePodCount
+}
+
+// podsFromNodeInfos returns all the pods in the nodeInfos
+func podsFromNodeInfos(nodeInfos []*framework.NodeInfo) []*apiv1.Pod {
+	var pods []*apiv1.Pod
+	for _, nodeInfo := range nodeInfos {
+		for _, podInfo := range nodeInfo.Pods {
+			pods = append(pods, podInfo.Pod)
+		}
+	}
+	return pods
+}
+
+// listControllers returns the list of controllers that can be used to inject fake pods
+func listControllers(ctx *context.AutoscalingContext) []controller {
+	var controllers []controller
+	controllers = append(controllers, createReplicaSetControllers(ctx)...)
+	controllers = append(controllers, createJobControllers(ctx)...)
+	controllers = append(controllers, createStatefulSetControllers(ctx)...)
+	return controllers
+}
+
+// IsFake returns true if the pod is marked as fake and false otherwise
+func IsFake(pod *apiv1.Pod) bool {
+	if pod.Annotations == nil {
+		return false
+	}
+	return pod.Annotations[FakePodAnnotationKey] == FakePodAnnotationValue
+}
+
+func (p *PodInjectionPodListProcessor) skipBackedoffControllers(controllers []controller) []controller {
+	var filteredControllers []controller
+	backoffRegistry := p.fakePodControllerBackoffRegistry
	now := time.Now()
+	for _, controller := range controllers {
+		if backoffUntil := backoffRegistry.BackOffUntil(controller.uid, now); backoffUntil.After(now) {
+			klog.Warningf("Skipping generating fake pods for controller in backoff until (%s): %v", backoffUntil.Format(time.TimeOnly), controller.uid)
+			continue
+		}
+		filteredControllers = append(filteredControllers, controller)
+	}
+	return filteredControllers
+}
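
A short sketch of the fake-pod helpers (it assumes `buildTestPod` from this PR's tests and an `fmt` import): copies inherit everything from the sample, get deterministic names and UIDs, and are recognizable via `IsFake`:

```go
sample := buildTestPod("default", "sample")
for _, p := range makeFakePods("rs-uid", sample, 2) {
	fmt.Println(p.Name, p.UID, IsFake(p))
}
// Output:
// sample-copy-1 rs-uid-1 true
// sample-copy-2 rs-uid-2 true
```
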
diff --git a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go
new file mode 100644
index 000000000000..d2f96b244585
--- /dev/null
+++ b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go
@@ -0,0 +1,420 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podinjection
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	appsv1 "k8s.io/api/apps/v1"
+	batchv1 "k8s.io/api/batch/v1"
+	apiv1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
+	. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
+)
+
+func TestTargetCountInjectionPodListProcessor(t *testing.T) {
+	node := BuildTestNode("node1", 100, 0)
+
+	replicaSet1 := createTestReplicaSet("rep-set-1", "default", 5)
+	scheduledPodRep1Copy1 := buildTestPod("default", "-scheduled-pod-rep1-1", withControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID), withNodeName(node.Name))
+	podRep1Copy1 := buildTestPod("default", "pod-rep1-1", withControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
+	podRep1Copy2 := buildTestPod("default", "pod-rep1-2", withControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
+
+	job1 := createTestJob("job-1", "default", 10, 10, 0)
+	scheduledPodJob1Copy1 := buildTestPod("default", "scheduled-pod-job1-1", withControllerOwnerRef(job1.Name, "Job", job1.UID), withNodeName(node.Name))
+	podJob1Copy1 := buildTestPod("default", "pod-job1-1", withControllerOwnerRef(job1.Name, "Job", job1.UID))
+	podJob1Copy2 := buildTestPod("default", "pod-job1-2", withControllerOwnerRef(job1.Name, "Job", job1.UID))
+
+	parallelStatefulset := createTestStatefulset("parallel-statefulset-1", "default", appsv1.ParallelPodManagement, 10)
+	scheduledParallelStatefulsetPod := buildTestPod("default", "parallel-scheduled-pod-statefulset-1", withControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID), withNodeName(node.Name))
+	parallelStatefulsetPodCopy1 := buildTestPod("default", "parallel-pod-statefulset1-1", withControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID))
+	parallelStatefulsetPodCopy2 := buildTestPod("default", "parallel-pod-statefulset1-2", withControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID))
+
+	sequentialStatefulset := createTestStatefulset("sequential-statefulset-1", "default", appsv1.OrderedReadyPodManagement, 10)
+	scheduledSequentialStatefulsetPod := buildTestPod("default", "sequential-scheduled-pod-statefulset-1", withControllerOwnerRef(sequentialStatefulset.Name, "StatefulSet", sequentialStatefulset.UID), withNodeName(node.Name))
+	sequentialStatefulsetPodCopy1 := buildTestPod("default", "sequential-pod-statefulset1-1", withControllerOwnerRef(sequentialStatefulset.Name, "StatefulSet", sequentialStatefulset.UID))
+	sequentialStatefulsetPodCopy2 := buildTestPod("default", "sequential-pod-statefulset1-2", withControllerOwnerRef(sequentialStatefulset.Name, "StatefulSet", sequentialStatefulset.UID))
+
+	replicaSetLister, err := kubernetes.NewTestReplicaSetLister([]*appsv1.ReplicaSet{&replicaSet1})
+	assert.NoError(t, err)
+	jobLister, err := kubernetes.NewTestJobLister([]*batchv1.Job{&job1})
+	assert.NoError(t, err)
+	statefulsetLister, err := kubernetes.NewTestStatefulSetLister([]*appsv1.StatefulSet{&parallelStatefulset, &sequentialStatefulset})
+	assert.NoError(t, err)
+
+	testCases := []struct {
+		name              string
+		scheduledPods     []*apiv1.Pod
+		unschedulablePods []*apiv1.Pod
+		wantPods          []*apiv1.Pod
+	}{
+		{
+			name:              "ReplicaSet",
+			scheduledPods:     []*apiv1.Pod{scheduledPodRep1Copy1},
+			unschedulablePods: []*apiv1.Pod{podRep1Copy1, podRep1Copy2},
+			wantPods:          append([]*apiv1.Pod{podRep1Copy1, podRep1Copy2}, makeFakePods(replicaSet1.UID, podRep1Copy1, 2)...),
+		},
+		{
+			name:              "Job",
+			scheduledPods:     []*apiv1.Pod{scheduledPodJob1Copy1},
+			unschedulablePods: []*apiv1.Pod{podJob1Copy1, podJob1Copy2},
+			wantPods:          append([]*apiv1.Pod{podJob1Copy1, podJob1Copy2}, makeFakePods(job1.UID, podJob1Copy1, 7)...),
+		},
+		{
+			name:              "Statefulset - Parallel pod management policy",
+			scheduledPods:     []*apiv1.Pod{scheduledParallelStatefulsetPod},
+			unschedulablePods: []*apiv1.Pod{parallelStatefulsetPodCopy1, parallelStatefulsetPodCopy2},
+			wantPods:          append([]*apiv1.Pod{parallelStatefulsetPodCopy1, parallelStatefulsetPodCopy2}, makeFakePods(parallelStatefulset.UID, parallelStatefulsetPodCopy1, 7)...),
+		},
+		{
+			name:              "Statefulset - sequential pod management policy",
+			scheduledPods:     []*apiv1.Pod{scheduledSequentialStatefulsetPod},
+			unschedulablePods: []*apiv1.Pod{sequentialStatefulsetPodCopy1, sequentialStatefulsetPodCopy2},
+			wantPods:          []*apiv1.Pod{sequentialStatefulsetPodCopy1, sequentialStatefulsetPodCopy2},
+		},
+		{
+			name:              "Mix of controllers",
+			scheduledPods:     []*apiv1.Pod{scheduledPodRep1Copy1, scheduledPodJob1Copy1, scheduledParallelStatefulsetPod},
+			unschedulablePods: []*apiv1.Pod{podRep1Copy1, podRep1Copy2, podJob1Copy1, podJob1Copy2, parallelStatefulsetPodCopy1, parallelStatefulsetPodCopy2},
+			wantPods: append(
+				append(
+					append(
+						[]*apiv1.Pod{podRep1Copy1, podRep1Copy2, podJob1Copy1, podJob1Copy2, parallelStatefulsetPodCopy1, parallelStatefulsetPodCopy2},
+						makeFakePods(replicaSet1.UID, podRep1Copy1, 2)...),
+					makeFakePods(job1.UID, podJob1Copy1, 7)...),
+				makeFakePods(parallelStatefulset.UID, parallelStatefulsetPodCopy1, 7)...,
+			),
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry())
+			clusterSnapshot := clustersnapshot.NewDeltaClusterSnapshot()
+			clusterSnapshot.AddNode(node)
+			for _, pod := range tc.scheduledPods {
+				clusterSnapshot.AddPod(pod, node.Name)
+			}
+			ctx := context.AutoscalingContext{
+				AutoscalingKubeClients: context.AutoscalingKubeClients{
+					ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
+				},
+				ClusterSnapshot: clusterSnapshot,
+			}
+			pods, err := p.Process(&ctx, tc.unschedulablePods)
+			assert.NoError(t, err)
+			assert.ElementsMatch(t, tc.wantPods, pods)
+		})
+	}
+}
+
+func TestGroupPods(t *testing.T) {
+	noControllerPod := buildTestPod("default", "pod-no-podGroup")
+
+	replicaSet1 := createTestReplicaSet("rep-set-1", "default", 10)
+	podRep1Copy1 := buildTestPod("default", "pod-rep1-1", withControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
+	podRep1Copy2 := buildTestPod("default", "pod-rep1-2", withControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
+	podRep1ScheduledCopy1 := buildTestPod("default", "pod-rep1-3", withControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID), withNodeName("n1"))
+	podRep1ScheduledCopy2 := buildTestPod("default", "pod-rep1-4", withControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID), withNodeName("n1"))
+
+	replicaSet2 := createTestReplicaSet("rep-set-2", "default", 10)
+	podRep2Copy1 := buildTestPod("default", "pod-rep2-1", withControllerOwnerRef(replicaSet2.Name, "ReplicaSet", replicaSet2.UID))
+	podRep2ScheduledCopy1 := buildTestPod("default", "pod-rep2-1", withControllerOwnerRef(replicaSet2.Name, "ReplicaSet", replicaSet2.UID), withNodeName("n1"))
+
+	replicaSet3 := createTestReplicaSet("rep-set-3", "default", 10)
+	podRep3Copy1 := buildTestPod("default", "pod-rep3-1", withControllerOwnerRef(replicaSet3.Name, "ReplicaSet", replicaSet3.UID))
+
+	job1 := createTestJob("job-1", "default", 10, 10, 0)
+	podJob1Copy1 := buildTestPod("default", "pod-job1-1", withControllerOwnerRef(job1.Name, "Job", job1.UID))
+	podJob1Copy2 := buildTestPod("default", "pod-job1-2", withControllerOwnerRef(job1.Name, "Job", job1.UID))
+
+	job2 := createTestJob("job-2", "default", 10, 10, 0)
+	podJob2Copy1 := buildTestPod("default", "pod-job-2", withControllerOwnerRef(job2.Name, "Job", job2.UID))
+
+	statefulset1 := createTestStatefulset("statefulset-1", "default", appsv1.ParallelPodManagement, 10)
+	statefulset1Copy1 := buildTestPod("default", "pod-statefulset1-1", withControllerOwnerRef(statefulset1.Name, "StatefulSet", statefulset1.UID))
+	statefulset1Copy2 := buildTestPod("default", "pod-statefulset1-2", withControllerOwnerRef(statefulset1.Name, "StatefulSet", statefulset1.UID))
+
+	statefulset2 := createTestStatefulset("statefulset-2", "default", appsv1.ParallelPodManagement, 10)
+	statefulset2Copy1 := buildTestPod("default", "pod-statefulset2-1", withControllerOwnerRef(statefulset2.Name, "StatefulSet", statefulset2.UID))
+
+	testCases := []struct {
+		name            string
+		unscheduledPods []*apiv1.Pod
+		scheduledPods   []*apiv1.Pod
+		replicaSets     []*appsv1.ReplicaSet
+		jobs            []*batchv1.Job
+		statefulsets    []*appsv1.StatefulSet
+		wantGroupedPods map[types.UID]podGroup
+	}{
+		{
+			name:        "no pods",
+			replicaSets: []*appsv1.ReplicaSet{&replicaSet1, &replicaSet2},
+			wantGroupedPods: map[types.UID]podGroup{
+				replicaSet1.UID: {podCount: 0, desiredReplicas: 10, sample: nil},
+				replicaSet2.UID: {podCount: 0, desiredReplicas: 10, sample: nil},
+			},
+		},
+		{
+			name:          "no unschedulable pods",
+			scheduledPods: []*apiv1.Pod{podRep1ScheduledCopy1, podRep1ScheduledCopy2, podRep2ScheduledCopy1},
+			replicaSets:   []*appsv1.ReplicaSet{&replicaSet1, &replicaSet2},
+			wantGroupedPods: map[types.UID]podGroup{
+				replicaSet1.UID: {podCount: 2, desiredReplicas: 10, sample: nil},
+				replicaSet2.UID: {podCount: 1, desiredReplicas: 10, sample: nil},
+			},
+		},
+		{
+			name:            "scheduled and unschedulable pods",
+			scheduledPods:   []*apiv1.Pod{podRep1ScheduledCopy2},
+			unscheduledPods: []*apiv1.Pod{podRep1Copy1, podRep2Copy1},
+			replicaSets:     []*appsv1.ReplicaSet{&replicaSet1, &replicaSet2},
+			wantGroupedPods: map[types.UID]podGroup{
+				replicaSet1.UID: {podCount: 2, desiredReplicas: 10, sample: podRep1Copy1, ownerUid: replicaSet1.UID},
+				replicaSet2.UID: {podCount: 1, desiredReplicas: 10, sample: podRep2Copy1, ownerUid: replicaSet2.UID},
+			},
+		},
+		{
+			name:            "pods without a controller are ignored",
+			unscheduledPods: []*apiv1.Pod{noControllerPod},
+			wantGroupedPods: map[types.UID]podGroup{},
+		},
+		{
+			name:            "unable to retrieve a controller - pods are ignored",
+			unscheduledPods: []*apiv1.Pod{podRep3Copy1},
+			wantGroupedPods: map[types.UID]podGroup{},
+		},
+		{
+			name:            "pods from multiple replicaSets",
+			unscheduledPods: []*apiv1.Pod{podRep1Copy1, podRep1Copy2, podRep2Copy1},
+			replicaSets:     []*appsv1.ReplicaSet{&replicaSet1, &replicaSet2},
+			wantGroupedPods: map[types.UID]podGroup{
+				replicaSet1.UID: {podCount: 2, desiredReplicas: 10, sample: podRep1Copy1, ownerUid: replicaSet1.UID},
+				replicaSet2.UID: {podCount: 1, desiredReplicas: 10, sample: podRep2Copy1, ownerUid: replicaSet2.UID},
+			},
+		},
+		{
+			name:            "pods from multiple jobs",
+			unscheduledPods: []*apiv1.Pod{podJob1Copy1, podJob1Copy2, podJob2Copy1},
+			jobs:            []*batchv1.Job{&job1, &job2},
+			wantGroupedPods: map[types.UID]podGroup{
+				job1.UID: {podCount: 2, desiredReplicas: 10, sample: podJob1Copy1, ownerUid: job1.UID},
+				job2.UID: {podCount: 1, desiredReplicas: 10, sample: podJob2Copy1, ownerUid: job2.UID},
+			},
+		},
+		{
+			name:            "pods from multiple statefulsets",
+			unscheduledPods: []*apiv1.Pod{statefulset1Copy1, statefulset1Copy2, statefulset2Copy1},
+			statefulsets:    []*appsv1.StatefulSet{&statefulset1, &statefulset2},
+			wantGroupedPods: map[types.UID]podGroup{
+				statefulset1.UID: {podCount: 2, desiredReplicas: 10, sample: statefulset1Copy1, ownerUid: statefulset1.UID},
+				statefulset2.UID: {podCount: 1, desiredReplicas: 10, sample: statefulset2Copy1, ownerUid: statefulset2.UID},
+			},
+		},
+		{
+			name:            "unscheduledPods from multiple different controllers",
+			unscheduledPods: []*apiv1.Pod{podRep1Copy1, podRep1Copy2, podRep2Copy1, podJob1Copy1, statefulset1Copy1},
+			replicaSets:     []*appsv1.ReplicaSet{&replicaSet1, &replicaSet2},
+			jobs:            []*batchv1.Job{&job1},
+			statefulsets:    []*appsv1.StatefulSet{&statefulset1},
+			wantGroupedPods: map[types.UID]podGroup{
+				replicaSet1.UID:  {podCount: 2, desiredReplicas: 10, sample: podRep1Copy1, ownerUid: replicaSet1.UID},
+				replicaSet2.UID:  {podCount: 1, desiredReplicas: 10, sample: podRep2Copy1, ownerUid: replicaSet2.UID},
+				job1.UID:         {podCount: 1, desiredReplicas: 10, sample: podJob1Copy1, ownerUid: job1.UID},
+				statefulset1.UID: {podCount: 1, desiredReplicas: 10, sample: statefulset1Copy1, ownerUid: statefulset1.UID},
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			replicaSetLister, err := kubernetes.NewTestReplicaSetLister(tc.replicaSets)
+			assert.NoError(t, err)
+			jobLister, err := kubernetes.NewTestJobLister(tc.jobs)
+			assert.NoError(t, err)
+			statefulsetLister, err := kubernetes.NewTestStatefulSetLister(tc.statefulsets)
+			assert.NoError(t, err)
+
+			ctx := context.AutoscalingContext{
+				AutoscalingKubeClients: context.AutoscalingKubeClients{
+					ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
+				},
+			}
+			controllers := listControllers(&ctx)
+			groupedPods := groupPods(append(tc.scheduledPods, tc.unscheduledPods...), controllers)
+			assert.Equal(t, tc.wantGroupedPods, groupedPods)
+		})
+	}
+}
+
+func TestUpdatePodGroups(t *testing.T) {
+	replicaSet1 := createTestReplicaSet("rep-set-1", "default", 10)
+	podRep1Copy1 := buildTestPod("default", "pod-rep1-1", withControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
+	podRep1Copy2 := buildTestPod("default", "pod-rep1-2", withControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
+	samplePodGroups := map[types.UID]podGroup{replicaSet1.UID: makePodGroup(10)}
+	sampleFalse := false
+	sampleTrue := true
+
+	testCases := []struct {
+		name         string
+		pod          *apiv1.Pod
+		ownerRef     metav1.OwnerReference
+		podGroups    map[types.UID]podGroup
+		wantPodGroup map[types.UID]podGroup
+	}{
+		{
+			name:         "owner ref nil controller",
+			pod:          podRep1Copy1,
+			ownerRef:     metav1.OwnerReference{},
+			podGroups:    samplePodGroups,
+			wantPodGroup: samplePodGroups,
+		},
+		{
+			name:         "owner ref controller set to false",
+			pod:          podRep1Copy1,
+			ownerRef:     metav1.OwnerReference{Controller: &sampleFalse},
+			podGroups:    samplePodGroups,
+			wantPodGroup: samplePodGroups,
+		},
controller not found", + pod: podRep1Copy1, + ownerRef: metav1.OwnerReference{Controller: &sampleTrue, UID: types.UID("not found uid")}, + podGroups: samplePodGroups, + wantPodGroup: samplePodGroups, + }, + { + name: "sample pod added and count updated", + pod: podRep1Copy1, + ownerRef: podRep1Copy1.OwnerReferences[0], + podGroups: samplePodGroups, + wantPodGroup: map[types.UID]podGroup{replicaSet1.UID: { + podCount: 1, + desiredReplicas: 10, + sample: podRep1Copy1, + ownerUid: replicaSet1.UID, + }, + }, + }, + { + name: "only count updated", + pod: podRep1Copy2, + ownerRef: podRep1Copy1.OwnerReferences[0], + podGroups: map[types.UID]podGroup{replicaSet1.UID: { + podCount: 1, + desiredReplicas: 10, + sample: podRep1Copy1, + ownerUid: replicaSet1.UID, + }, + }, + wantPodGroup: map[types.UID]podGroup{replicaSet1.UID: { + podCount: 2, + desiredReplicas: 10, + sample: podRep1Copy1, + ownerUid: replicaSet1.UID, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + podGroups := updatePodGroups(tc.pod, tc.ownerRef, tc.podGroups) + assert.Equal(t, tc.wantPodGroup, podGroups) + }) + } +} +func TestMakeFakePods(t *testing.T) { + samplePod := buildTestPod("default", "test-pod") + // Test case: Positive fake pod count + fakePodCount := 5 + ownerUid := types.UID("sample uid") + fakePods := makeFakePods(ownerUid, samplePod, fakePodCount) + assert.Equal(t, fakePodCount, len(fakePods)) + for idx, fakePod := range fakePods { + assert.Equal(t, fakePod.Name, fmt.Sprintf("%s-copy-%d", samplePod.Name, idx+1)) + assert.Equal(t, fakePod.UID, types.UID(fmt.Sprintf("%s-%d", string(ownerUid), idx+1))) + assert.NotNil(t, fakePod.Annotations) + assert.Equal(t, fakePod.Annotations[FakePodAnnotationKey], FakePodAnnotationValue) + } + + // Test case: Zero fake pod count + fakePodCount = 0 + fakePods = makeFakePods(ownerUid, samplePod, fakePodCount) + assert.Nil(t, fakePods) +} + +func createTestReplicaSet(uid, namespace string, targetReplicaCount int32) appsv1.ReplicaSet { + return appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid), Name: uid, Namespace: namespace}, + Spec: appsv1.ReplicaSetSpec{ + Replicas: &targetReplicaCount, + }, + } +} + +func createTestJob(uid, namespace string, parallelism, completions, succeeded int32) batchv1.Job { + return batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid), Name: uid, Namespace: namespace}, + Spec: batchv1.JobSpec{ + Parallelism: ¶llelism, + Completions: &completions, + }, + Status: batchv1.JobStatus{ + Succeeded: succeeded, + }, + } +} +func createTestStatefulset(uid, namespace string, podManagementPolicy appsv1.PodManagementPolicyType, numReplicas int32) appsv1.StatefulSet { + return appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid), Name: uid, Namespace: namespace}, + Spec: appsv1.StatefulSetSpec{ + Replicas: &numReplicas, + PodManagementPolicy: podManagementPolicy, + }, + } +} + +func buildTestPod(namespace, name string, opts ...podOption) *apiv1.Pod { + pod := BuildTestPod(name, 10, 10) + pod.Namespace = namespace + for _, opt := range opts { + opt(pod) + } + return pod +} + +type podOption func(*apiv1.Pod) + +func withControllerOwnerRef(name, kind string, uid types.UID) podOption { + return func(pod *apiv1.Pod) { + pod.OwnerReferences = GenerateOwnerReferences(name, kind, "apps/v1", uid) + } +} + +func withNodeName(nodeName string) podOption { + return func(pod *apiv1.Pod) { + pod.Spec.NodeName = nodeName + } +} diff --git 
diff --git a/cluster-autoscaler/processors/podinjection/replicaset_controller.go b/cluster-autoscaler/processors/podinjection/replicaset_controller.go
new file mode 100644
index 000000000000..4384e182b3dd
--- /dev/null
+++ b/cluster-autoscaler/processors/podinjection/replicaset_controller.go
@@ -0,0 +1,44 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podinjection
+
+import (
+	appsv1 "k8s.io/api/apps/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/klog/v2"
+)
+
+func createReplicaSetControllers(ctx *context.AutoscalingContext) []controller {
+	var controllers []controller
+	replicaSets, err := ctx.ListerRegistry.ReplicaSetLister().List(labels.Everything())
+	if err != nil {
+		klog.Errorf("Failed to list replicaSets: %v", err)
+		return controllers
+	}
+	for _, replicaSet := range replicaSets {
+		controllers = append(controllers, controller{uid: replicaSet.UID, desiredReplicas: desiredReplicasFromReplicaSet(replicaSet)})
+	}
+	return controllers
+}
+
+func desiredReplicasFromReplicaSet(replicaSet *appsv1.ReplicaSet) int {
+	if replicaSet.Spec.Replicas == nil {
+		return 0
+	}
+	return int(*replicaSet.Spec.Replicas)
+}
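
One subtlety worth noting: a ReplicaSet read from the API server normally has `Spec.Replicas` defaulted to 1, so the nil check above appears to be purely defensive. When it does trigger, the conservative result is 0 desired replicas, i.e. no injection for that ReplicaSet (sketch assumes the same package plus a `fmt` import):

```go
rs := &appsv1.ReplicaSet{} // Spec.Replicas nil, e.g. a stale cache entry
fmt.Println(desiredReplicasFromReplicaSet(rs)) // prints 0: no fake pods injected
```
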
diff --git a/cluster-autoscaler/processors/podinjection/scale_up_status_processor.go b/cluster-autoscaler/processors/podinjection/scale_up_status_processor.go
new file mode 100644
index 000000000000..e829acef6d5c
--- /dev/null
+++ b/cluster-autoscaler/processors/podinjection/scale_up_status_processor.go
@@ -0,0 +1,111 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podinjection
+
+import (
+	"strings"
+	"time"
+
+	apiv1 "k8s.io/api/core/v1"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
+	podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
+	"k8s.io/klog/v2"
+)
+
+// FakePodsScaleUpStatusProcessor is a ScaleUpStatusProcessor used for filtering out fake pods from the scale-up status.
+type FakePodsScaleUpStatusProcessor struct {
+	fakePodControllerBackoffRegistry *podinjectionbackoff.ControllerRegistry
+}
+
+// NewFakePodsScaleUpStatusProcessor returns an instance of FakePodsScaleUpStatusProcessor
+func NewFakePodsScaleUpStatusProcessor(fakePodRegistry *podinjectionbackoff.ControllerRegistry) *FakePodsScaleUpStatusProcessor {
+	return &FakePodsScaleUpStatusProcessor{
+		fakePodControllerBackoffRegistry: fakePodRegistry,
+	}
+}
+
+// Process updates scaleUpStatus to remove all fake pods from
+// PodsRemainUnschedulable, PodsAwaitEvaluation & PodsTriggeredScaleUp
+func (a *FakePodsScaleUpStatusProcessor) Process(_ *ca_context.AutoscalingContext, scaleUpStatus *status.ScaleUpStatus) {
+	controllersToBackoff := extractFakePodsControllersUIDs(scaleUpStatus.PodsRemainUnschedulable)
+	for uid := range controllersToBackoff {
+		a.fakePodControllerBackoffRegistry.BackoffController(uid, time.Now())
+	}
+
+	scaleUpStatus.PodsRemainUnschedulable = filterFakePods(scaleUpStatus.PodsRemainUnschedulable, func(noScaleUpInfo status.NoScaleUpInfo) *apiv1.Pod { return noScaleUpInfo.Pod }, "PodsRemainUnschedulable")
+	scaleUpStatus.PodsAwaitEvaluation = filterFakePods(scaleUpStatus.PodsAwaitEvaluation, func(pod *apiv1.Pod) *apiv1.Pod { return pod }, "PodsAwaitEvaluation")
+	scaleUpStatus.PodsTriggeredScaleUp = filterFakePods(scaleUpStatus.PodsTriggeredScaleUp, func(pod *apiv1.Pod) *apiv1.Pod { return pod }, "PodsTriggeredScaleUp")
+}
+
+// filterFakePods removes fake pods from the input list of T using the passed getPod(T) accessor
+// Uses `resourceName` to log which resource it has modified
+// Returns a list containing only non-fake pods
+func filterFakePods[T any](podsWrappers []T, getPod func(T) *apiv1.Pod, resourceName string) []T {
+	filteredPodSources := make([]T, 0)
+	removedPods := make([]*apiv1.Pod, 0)
+
+	for _, podsWrapper := range podsWrappers {
+		currentPod := getPod(podsWrapper)
+		if !IsFake(currentPod) {
+			filteredPodSources = append(filteredPodSources, podsWrapper)
+			continue
+		}
+
+		controllerRef := v1.GetControllerOf(currentPod)
+		if controllerRef == nil {
+			klog.Infof("Failed to find controller for pod %s, ignoring.", currentPod.Name)
+			continue
+		}
+
+		removedPods = append(removedPods, currentPod)
+		klog.V(5).Infof("Filtering out pod %s from %s with controller reference %s", currentPod.Name, resourceName, controllerRef.Name)
+	}
+
+	logRemovedPods(removedPods, resourceName)
+	return filteredPodSources
+}
+
+// extractFakePodsControllersUIDs extracts the UIDs of the controllers of the fake pods in noScaleUpInfos
+func extractFakePodsControllersUIDs(noScaleUpInfos []status.NoScaleUpInfo) map[types.UID]bool {
+	uids := make(map[types.UID]bool)
+	for _, noScaleUpInfo := range noScaleUpInfos {
+		if IsFake(noScaleUpInfo.Pod) {
+			if controllerRef := v1.GetControllerOf(noScaleUpInfo.Pod); controllerRef != nil {
+				uids[controllerRef.UID] = true
+			}
+		}
+	}
+	return uids
+}
+
+// logRemovedPods logs the removed pods from resourceName
+func logRemovedPods(removedPods []*apiv1.Pod, resourceName string) {
+	if len(removedPods) == 0 {
+		return
+	}
+	controllerRefNames := make([]string, len(removedPods))
+	for idx, pod := range removedPods {
+		controllerRefNames[idx] = v1.GetControllerOf(pod).Name
+	}
+	klog.Infof("Filtered out %d pods from %s for controllers %s", len(removedPods), resourceName, strings.Join(controllerRefNames, ", "))
+}
+
+// CleanUp is called at CA termination
+func (a *FakePodsScaleUpStatusProcessor) CleanUp() {}
diff --git a/cluster-autoscaler/processors/podinjection/scale_up_status_processor_test.go b/cluster-autoscaler/processors/podinjection/scale_up_status_processor_test.go
new file mode 100644
index 000000000000..9cd6126f938e
--- /dev/null
+++ b/cluster-autoscaler/processors/podinjection/scale_up_status_processor_test.go
@@ -0,0 +1,104 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podinjection
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
+	. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
+)
+
+func TestProcess(t *testing.T) {
+	testCases := map[string]struct {
+		podsRemainUnschedulable         []*apiv1.Pod
+		podsAwaitEvaluation             []*apiv1.Pod
+		podsTriggeredScaleUp            []*apiv1.Pod
+		expectedPodsRemainUnschedulable []*apiv1.Pod
+		expectedPodsAwaitEvaluation     []*apiv1.Pod
+		expectedPodsTriggeredScaleUp    []*apiv1.Pod
+	}{
+		"Fake pods are removed from PodsRemainUnschedulable": {
+			podsRemainUnschedulable:         []*apiv1.Pod{createPod("pod-1", false), createPod("fake-pod-1", true)},
+			expectedPodsRemainUnschedulable: []*apiv1.Pod{createPod("pod-1", false)},
+		},
+		"Fake pods are removed from PodsTriggeredScaleUp": {
+			podsTriggeredScaleUp:         []*apiv1.Pod{createPod("pod-1", false), createPod("fake-pod-1", true)},
+			expectedPodsTriggeredScaleUp: []*apiv1.Pod{createPod("pod-1", false)},
+		},
+		"Fake pods are removed from PodsAwaitEvaluation": {
+			podsAwaitEvaluation:         []*apiv1.Pod{createPod("pod-1", false), createPod("fake-pod-1", true)},
+			expectedPodsAwaitEvaluation: []*apiv1.Pod{createPod("pod-1", false)},
+		},
+		"Fake pods are removed from all pod related lists in scaleup status": {
+			podsTriggeredScaleUp:            []*apiv1.Pod{createPod("pod-1", false), createPod("fake-pod-1", true)},
+			expectedPodsTriggeredScaleUp:    []*apiv1.Pod{createPod("pod-1", false)},
+			podsRemainUnschedulable:         []*apiv1.Pod{createPod("pod-2", false), createPod("fake-pod-2", true)},
+			expectedPodsRemainUnschedulable: []*apiv1.Pod{createPod("pod-2", false)},
+			podsAwaitEvaluation:             []*apiv1.Pod{createPod("pod-3", false), createPod("fake-pod-3", true)},
+			expectedPodsAwaitEvaluation:     []*apiv1.Pod{createPod("pod-3", false)},
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			scaleUpStatus := &status.ScaleUpStatus{
+				PodsTriggeredScaleUp:    tc.podsTriggeredScaleUp,
+				PodsAwaitEvaluation:     tc.podsAwaitEvaluation,
+				PodsRemainUnschedulable: makeNoScaleUpInfoFromPods(tc.podsRemainUnschedulable),
+			}
+			ctx := &context.AutoscalingContext{}
+
+			p := NewFakePodsScaleUpStatusProcessor(podinjectionbackoff.NewFakePodControllerRegistry())
+			p.Process(ctx, scaleUpStatus)
+
+			assert.ElementsMatch(t, tc.expectedPodsRemainUnschedulable, extractPodsFromNoScaleUpInfo(scaleUpStatus.PodsRemainUnschedulable))
+			assert.ElementsMatch(t, tc.expectedPodsAwaitEvaluation, scaleUpStatus.PodsAwaitEvaluation)
+			assert.ElementsMatch(t, tc.expectedPodsTriggeredScaleUp, scaleUpStatus.PodsTriggeredScaleUp)
+		})
+	}
+}
+
+func createPod(name string, isFake bool) *apiv1.Pod {
+	return BuildTestPod(name, 10, 10, func(p *apiv1.Pod) {
+		if !isFake {
+			return
+		}
+		*p = *withFakePodAnnotation(p)
+	})
+}
+
+func makeNoScaleUpInfoFromPods(pods []*apiv1.Pod) []status.NoScaleUpInfo {
+	noScaleUpInfos := make([]status.NoScaleUpInfo, len(pods))
+	for idx, pod := range pods {
+		noScaleUpInfos[idx] = status.NoScaleUpInfo{
+			Pod: pod,
+		}
+	}
+	return noScaleUpInfos
+}
+func extractPodsFromNoScaleUpInfo(noScaleUpInfos []status.NoScaleUpInfo) []*apiv1.Pod {
+	pods := make([]*apiv1.Pod, len(noScaleUpInfos))
+	for idx, noScaleUpInfo := range noScaleUpInfos {
+		pods[idx] = noScaleUpInfo.Pod
+	}
+	return pods
+}
diff --git a/cluster-autoscaler/processors/podinjection/statefulset_controller.go b/cluster-autoscaler/processors/podinjection/statefulset_controller.go
new file mode 100644
index 000000000000..66f4ba21c533
--- /dev/null
+++ b/cluster-autoscaler/processors/podinjection/statefulset_controller.go
@@ -0,0 +1,49 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podinjection
+
+import (
+	appsv1 "k8s.io/api/apps/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/klog/v2"
+)
+
+func createStatefulSetControllers(ctx *context.AutoscalingContext) []controller {
+	var controllers []controller
+	statefulSets, err := ctx.ListerRegistry.StatefulSetLister().List(labels.Everything())
+	if err != nil {
+		klog.Errorf("Failed to list statefulsets: %v", err)
+		return controllers
+	}
+	for _, statefulSet := range statefulSets {
+		// Non-parallel pod management (OrderedReadyPodManagement) waits for each pod to be ready before creating the next one,
+		// which can be very slow (2+ minutes per pod).
+		// That makes fast scale-ups unlikely to help and fake pod injection costly, so only parallel StatefulSets are considered.
+		if statefulSet.Spec.PodManagementPolicy == appsv1.ParallelPodManagement {
+			controllers = append(controllers, controller{uid: statefulSet.UID, desiredReplicas: desiredReplicasFromStatefulSet(statefulSet)})
+		}
+	}
+	return controllers
+}
+
+func desiredReplicasFromStatefulSet(statefulSet *appsv1.StatefulSet) int {
+	if statefulSet.Spec.Replicas == nil {
+		return 0
+	}
+	return int(*statefulSet.Spec.Replicas)
+}
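
Taken together, the wiring added in main.go (first hunk above) composes these pieces in an order that matters: injection runs before the pre-existing pod list processing, the limit is enforced last so that it sees real and fake pods together, and the scale-up status is scrubbed of fake pods before events are emitted. A condensed restatement of that hunk:

```go
// Condensed from the main.go hunk above; surrounding setup omitted.
if *proactiveScaleupEnabled {
	registry := podinjectionbackoff.NewFakePodControllerRegistry()

	podListProcessor = pods.NewCombinedPodListProcessor([]pods.PodListProcessor{
		podinjection.NewPodInjectionPodListProcessor(registry),                // inject fake pods first
		podListProcessor,                                                      // then run the existing processors
		podinjection.NewEnforceInjectedPodsLimitProcessor(*podInjectionLimit), // cap the total last
	})

	// Runs first among scale-up status processors: it backs off controllers
	// whose fake pods remained unschedulable and strips fake pods from the
	// status before any events fire.
	opts.Processors.ScaleUpStatusProcessor = podinjection.NewFakePodsScaleUpStatusProcessor(registry)
}
```
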