diff --git a/go.mod b/go.mod index 3024d93ce1..a6b2e6a03e 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,7 @@ go 1.21 require ( github.com/Pallinder/go-randomdata v1.2.0 github.com/avast/retry-go v3.0.0+incompatible - github.com/deckarep/golang-set v1.8.0 - github.com/docker/docker v25.0.1+incompatible + github.com/docker/docker v25.0.2+incompatible github.com/go-logr/logr v1.4.1 github.com/go-logr/zapr v1.3.0 github.com/imdario/mergo v0.3.16 diff --git a/go.sum b/go.sum index 5c12b1a61d..d4eeef7fa6 100644 --- a/go.sum +++ b/go.sum @@ -71,10 +71,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsPppp4= -github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo= -github.com/docker/docker v25.0.1+incompatible h1:k5TYd5rIVQRSqcTwCID+cyVA0yRg86+Pcrz1ls0/frA= -github.com/docker/docker v25.0.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v25.0.2+incompatible h1:/OaKeauroa10K4Nqavw4zlhcDq/WBcPMc5DbjOGgozY= +github.com/docker/docker v25.0.2+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= diff --git a/kwok/main.go b/kwok/main.go index 13e2b5d85f..0e8969974a 100644 --- a/kwok/main.go +++ b/kwok/main.go @@ -42,7 +42,6 @@ func main() { WithControllers(ctx, controllers.NewControllers( op.Clock, op.GetClient(), - op.KubernetesInterface, state.NewCluster(op.Clock, op.GetClient(), cloudProvider), op.EventRecorder, cloudProvider, diff --git a/pkg/apis/crds/karpenter.sh_nodepools.yaml b/pkg/apis/crds/karpenter.sh_nodepools.yaml index da032545dd..808bd8db5d 100644 --- a/pkg/apis/crds/karpenter.sh_nodepools.yaml +++ b/pkg/apis/crds/karpenter.sh_nodepools.yaml @@ -109,7 +109,7 @@ spec: type: array x-kubernetes-validations: - message: '''schedule'' must be set with ''duration''' - rule: '!self.all(x, (has(x.schedule) && !has(x.duration)) || (!has(x.schedule) && has(x.duration)))' + rule: self.all(x, has(x.schedule) == has(x.duration)) consolidateAfter: description: |- ConsolidateAfter is the duration the controller will wait diff --git a/pkg/apis/v1beta1/nodepool.go b/pkg/apis/v1beta1/nodepool.go index 971cf55dbe..966bfe70b3 100644 --- a/pkg/apis/v1beta1/nodepool.go +++ b/pkg/apis/v1beta1/nodepool.go @@ -91,7 +91,7 @@ type Disruption struct { // If there are multiple active budgets, Karpenter uses // the most restrictive value. If left undefined, // this will default to one budget with a value to 10%. 
- // +kubebuilder:validation:XValidation:message="'schedule' must be set with 'duration'",rule="!self.all(x, (has(x.schedule) && !has(x.duration)) || (!has(x.schedule) && has(x.duration)))" + // +kubebuilder:validation:XValidation:message="'schedule' must be set with 'duration'",rule="self.all(x, has(x.schedule) == has(x.duration))" // +kubebuilder:default:={{nodes: "10%"}} // +kubebuilder:validation:MaxItems=50 // +optional diff --git a/pkg/apis/v1beta1/nodepool_validation_cel_test.go b/pkg/apis/v1beta1/nodepool_validation_cel_test.go index 81cff60a23..71273edf49 100644 --- a/pkg/apis/v1beta1/nodepool_validation_cel_test.go +++ b/pkg/apis/v1beta1/nodepool_validation_cel_test.go @@ -188,7 +188,7 @@ var _ = Describe("CEL/Validation", func() { }} Expect(env.Client.Create(ctx, nodePool)).To(Succeed()) }) - It("should fail when creating two budgets where one is invalid", func() { + It("should fail when creating two budgets where one has an invalid crontab", func() { nodePool.Spec.Disruption.Budgets = []Budget{ { Nodes: "10", @@ -202,6 +202,23 @@ var _ = Describe("CEL/Validation", func() { }} Expect(env.Client.Create(ctx, nodePool)).ToNot(Succeed()) }) + It("should fail when creating multiple budgets where one doesn't have both schedule and duration", func() { + nodePool.Spec.Disruption.Budgets = []Budget{ + { + Nodes: "10", + Duration: &metav1.Duration{Duration: lo.Must(time.ParseDuration("20m"))}, + }, + { + Nodes: "10", + Schedule: ptr.String("* * * * *"), + Duration: &metav1.Duration{Duration: lo.Must(time.ParseDuration("20m"))}, + }, + { + Nodes: "10", + }, + } + Expect(env.Client.Create(ctx, nodePool)).ToNot(Succeed()) + }) }) Context("KubeletConfiguration", func() { It("should succeed on kubeReserved with invalid keys", func() { diff --git a/pkg/apis/v1beta1/nodepool_validation_webhook_test.go b/pkg/apis/v1beta1/nodepool_validation_webhook_test.go index 0ef63efc2a..408f24a302 100644 --- a/pkg/apis/v1beta1/nodepool_validation_webhook_test.go +++ b/pkg/apis/v1beta1/nodepool_validation_webhook_test.go @@ -131,7 +131,7 @@ var _ = Describe("Webhook/Validation", func() { }} Expect(nodePool.Validate(ctx)).To(Succeed()) }) - It("should fail to validate two budgets where one is invalid", func() { + It("should fail when creating two budgets where one has an invalid crontab", func() { nodePool.Spec.Disruption.Budgets = []Budget{ { Nodes: "10", @@ -145,6 +145,23 @@ var _ = Describe("Webhook/Validation", func() { }} Expect(nodePool.Validate(ctx)).ToNot(Succeed()) }) + It("should fail when creating multiple budgets where one doesn't have both schedule and duration", func() { + nodePool.Spec.Disruption.Budgets = []Budget{ + { + Nodes: "10", + Duration: &metav1.Duration{Duration: lo.Must(time.ParseDuration("20m"))}, + }, + { + Nodes: "10", + Schedule: ptr.String("* * * * *"), + Duration: &metav1.Duration{Duration: lo.Must(time.ParseDuration("20m"))}, + }, + { + Nodes: "10", + }, + } + Expect(nodePool.Validate(ctx)).ToNot(Succeed()) + }) }) Context("Limits", func() { It("should allow undefined limits", func() { diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 7e09e2490a..4f46a32c8c 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -17,7 +17,6 @@ limitations under the License. 
package controllers import ( - "k8s.io/client-go/kubernetes" "k8s.io/utils/clock" "sigs.k8s.io/controller-runtime/pkg/client" @@ -47,14 +46,13 @@ import ( func NewControllers( clock clock.Clock, kubeClient client.Client, - kubernetesInterface kubernetes.Interface, cluster *state.Cluster, recorder events.Recorder, cloudProvider cloudprovider.CloudProvider, ) []controller.Controller { - p := provisioning.NewProvisioner(kubeClient, kubernetesInterface.CoreV1(), recorder, cloudProvider, cluster) - evictionQueue := terminator.NewQueue(kubernetesInterface.CoreV1(), recorder) + p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster) + evictionQueue := terminator.NewQueue(kubeClient, recorder) disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p) return []controller.Controller{ diff --git a/pkg/controllers/disruption/consolidation.go b/pkg/controllers/disruption/consolidation.go index cdfff43d23..09639f9f12 100644 --- a/pkg/controllers/disruption/consolidation.go +++ b/pkg/controllers/disruption/consolidation.go @@ -256,7 +256,7 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand // 3) Assuming CreateInstanceFromTypes(A,B,C,D) returned D, we check if D is part of (A,B,C) and it isn't, so will have another consolidation send a CreateInstanceFromTypes(A,B,C), since they’re cheaper than D resulting in continual consolidation. // If we had restricted instance types to min flexibility at launch at step (1) i.e CreateInstanceFromTypes(A,B,C), we would have received the instance type part of the list preventing immediate consolidation. // Taking this to 15 types, we need to only send the 15 cheapest types in the CreateInstanceFromTypes call so that the resulting instance is always in that set of 15 and we won’t immediately consolidate. - results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions = lo.Slice(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions, 0, MinInstanceTypesForSpotToSpotConsolidation) + results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions = lo.Slice(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions.OrderByPrice(results.NewNodeClaims[0].Requirements), 0, MinInstanceTypesForSpotToSpotConsolidation) return Command{ candidates: candidates, diff --git a/pkg/controllers/disruption/consolidation_test.go b/pkg/controllers/disruption/consolidation_test.go index 458b814ba1..f8556cf188 100644 --- a/pkg/controllers/disruption/consolidation_test.go +++ b/pkg/controllers/disruption/consolidation_test.go @@ -1367,6 +1367,122 @@ var _ = Describe("Consolidation", func() { ExpectExists(ctx, env.Client, spotNodeClaim) ExpectExists(ctx, env.Client, spotNode) }) + It("spot to spot consolidation should order the instance types by price before enforcing minimum flexibility.", func() { + // Fetch 18 spot instances + spotInstances = lo.Slice(lo.Filter(cloudProvider.InstanceTypes, func(i *cloudprovider.InstanceType, _ int) bool { + for _, o := range i.Offerings { + if o.CapacityType == v1beta1.CapacityTypeSpot { + return true + } + } + return false + }), 0, 18) + // Assign the prices for 18 spot instance in ascending order incrementally + for i, inst := range spotInstances { + inst.Offerings[0].Price = 1.00 + float64(i)*0.1 + } + // Force an instancetype that is outside the bound of 15 instances to have the cheapest price among the lot. 
+ spotInstances[16].Offerings[0].Price = 0.001 + + // We now have these spot instance in the list as lowest priced and highest priced instanceTypes + cheapestSpotInstanceType := spotInstances[16] + mostExpensiveInstanceType := spotInstances[17] + + // Add these spot instance with this special condition to cloud provider instancetypes + cloudProvider.InstanceTypes = spotInstances + + expectedInstanceTypesForConsolidation := make([]*cloudprovider.InstanceType, len(spotInstances)) + copy(expectedInstanceTypesForConsolidation, spotInstances) + // Sort the spot instances by pricing from low to high + sort.Slice(expectedInstanceTypesForConsolidation, func(i, j int) bool { + return expectedInstanceTypesForConsolidation[i].Offerings[0].Price < expectedInstanceTypesForConsolidation[j].Offerings[0].Price + }) + // These 15 cheapest instance types should eventually be considered for consolidation. + var expectedInstanceTypesNames []string + for i := 0; i < 15; i++ { + expectedInstanceTypesNames = append(expectedInstanceTypesNames, expectedInstanceTypesForConsolidation[i].Name) + } + + // Assign the most expensive spot instancetype so that it will definitely be replaced through consolidation + spotNodeClaim.Labels = lo.Assign(spotNodeClaim.Labels, map[string]string{ + v1beta1.NodePoolLabelKey: nodePool.Name, + v1.LabelInstanceTypeStable: mostExpensiveInstanceType.Name, + v1beta1.CapacityTypeLabelKey: mostExpensiveInstanceType.Offerings[0].CapacityType, + v1.LabelTopologyZone: mostExpensiveInstanceType.Offerings[0].Zone, + }) + + spotNode.Labels = lo.Assign(spotNode.Labels, map[string]string{ + v1beta1.NodePoolLabelKey: nodePool.Name, + v1.LabelInstanceTypeStable: mostExpensiveInstanceType.Name, + v1beta1.CapacityTypeLabelKey: mostExpensiveInstanceType.Offerings[0].CapacityType, + v1.LabelTopologyZone: mostExpensiveInstanceType.Offerings[0].Zone, + }) + + rs := test.ReplicaSet() + ExpectApplied(ctx, env.Client, rs) + Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed()) + + pod := test.Pod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: rs.Name, + UID: rs.UID, + Controller: ptr.Bool(true), + BlockOwnerDeletion: ptr.Bool(true), + }, + }}}) + ExpectApplied(ctx, env.Client, rs, pod, spotNode, spotNodeClaim, nodePool) + + // bind pods to node + ExpectManualBinding(ctx, env.Client, pod, spotNode) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*v1.Node{spotNode}, []*v1beta1.NodeClaim{spotNodeClaim}) + + fakeClock.Step(10 * time.Minute) + + // consolidation won't delete the old nodeclaim until the new nodeclaim is ready + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) + ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) + ExpectReconcileSucceeded(ctx, disruptionController, client.ObjectKey{}) + wg.Wait() + + // Process the item so that the nodes can be deleted. 
+ ExpectReconcileSucceeded(ctx, queue, types.NamespacedName{}) + + // Cascade any deletion of the nodeclaim to the node + ExpectNodeClaimsCascadeDeletion(ctx, env.Client, spotNodeClaim) + + // should create a new nodeclaim as there is a cheaper one that can hold the pod + nodeClaims := ExpectNodeClaims(ctx, env.Client) + nodes := ExpectNodes(ctx, env.Client) + Expect(nodeClaims).To(HaveLen(1)) + Expect(nodes).To(HaveLen(1)) + + // Expect that the new nodeclaim does not request the most expensive instance type + Expect(nodeClaims[0].Name).ToNot(Equal(spotNodeClaim.Name)) + Expect(scheduling.NewNodeSelectorRequirements(nodeClaims[0].Spec.Requirements...).Has(v1.LabelInstanceTypeStable)).To(BeTrue()) + Expect(scheduling.NewNodeSelectorRequirements(nodeClaims[0].Spec.Requirements...).Get(v1.LabelInstanceTypeStable).Has(mostExpensiveInstanceType.Name)).To(BeFalse()) + + // Make sure that the cheapest instance that was outside the bound of 15 instance types is considered for consolidation. + Expect(scheduling.NewNodeSelectorRequirements(nodeClaims[0].Spec.Requirements...).Get(v1.LabelInstanceTypeStable).Has(cheapestSpotInstanceType.Name)).To(BeTrue()) + spotInstancesConsideredForConsolidation := scheduling.NewNodeSelectorRequirements(nodeClaims[0].Spec.Requirements...).Get(v1.LabelInstanceTypeStable).Values() + + // Make sure that we send only 15 instance types. + Expect(len(spotInstancesConsideredForConsolidation)).To(Equal(15)) + + // Make sure we considered the first 15 cheapest instance types. + for i := 0; i < 15; i++ { + Expect(spotInstancesConsideredForConsolidation).To(ContainElement(expectedInstanceTypesNames[i])) + } + + // and delete the old one + ExpectNotFound(ctx, env.Client, spotNodeClaim, spotNode) + }) DescribeTable("can replace nodes if another nodePool returns no instance types", func(spotToSpot bool) { nodeClaim = lo.Ternary(spotToSpot, spotNodeClaim, nodeClaim) diff --git a/pkg/controllers/disruption/orchestration/suite_test.go b/pkg/controllers/disruption/orchestration/suite_test.go index 95e9e0fd67..8f0697c1ac 100644 --- a/pkg/controllers/disruption/orchestration/suite_test.go +++ b/pkg/controllers/disruption/orchestration/suite_test.go @@ -81,7 +81,7 @@ var _ = BeforeSuite(func() { nodeStateController = informer.NewNodeController(env.Client, cluster) nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster) recorder = test.NewEventRecorder() - prov = provisioning.NewProvisioner(env.Client, env.KubernetesInterface.CoreV1(), recorder, cloudProvider, cluster) + prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster) queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov) }) diff --git a/pkg/controllers/disruption/suite_test.go b/pkg/controllers/disruption/suite_test.go index f65cb6ad83..a6cb674bb5 100644 --- a/pkg/controllers/disruption/suite_test.go +++ b/pkg/controllers/disruption/suite_test.go @@ -89,7 +89,7 @@ var _ = BeforeSuite(func() { nodeStateController = informer.NewNodeController(env.Client, cluster) nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster) recorder = test.NewEventRecorder() - prov = provisioning.NewProvisioner(env.Client, env.KubernetesInterface.CoreV1(), recorder, cloudProvider, cluster) + prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster) queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov) disruptionController = disruption.NewController(fakeClock, env.Client, prov, cloudProvider, 
recorder, cluster, queue) }) diff --git a/pkg/controllers/node/termination/suite_test.go b/pkg/controllers/node/termination/suite_test.go index 2322ae8694..047d02b9c8 100644 --- a/pkg/controllers/node/termination/suite_test.go +++ b/pkg/controllers/node/termination/suite_test.go @@ -22,6 +22,9 @@ import ( "testing" "time" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/samber/lo" clock "k8s.io/utils/clock/testing" "sigs.k8s.io/controller-runtime/pkg/client" @@ -36,14 +39,13 @@ import ( "sigs.k8s.io/karpenter/pkg/operator/scheme" "sigs.k8s.io/karpenter/pkg/test" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" - . "knative.dev/pkg/logging/testing" "knative.dev/pkg/ptr" + . "knative.dev/pkg/logging/testing" + . "sigs.k8s.io/karpenter/pkg/test/expectations" ) @@ -68,7 +70,7 @@ var _ = BeforeSuite(func() { cloudProvider = fake.NewCloudProvider() recorder = test.NewEventRecorder() - queue = terminator.NewQueue(env.KubernetesInterface.CoreV1(), recorder) + queue = terminator.NewQueue(env.Client, recorder) terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue), recorder) }) @@ -173,7 +175,7 @@ var _ = Describe("Termination", func() { Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - ExpectNotEnqueuedForEviction(queue, podSkip) + Expect(queue.Has(podSkip)).To(BeFalse()) ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{}) // Expect node to exist and be draining @@ -201,7 +203,7 @@ var _ = Describe("Termination", func() { Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) - ExpectNotEnqueuedForEviction(queue, podSkip) + Expect(queue.Has(podSkip)).To(BeFalse()) ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{}) // Expect node to exist and be draining @@ -211,7 +213,7 @@ var _ = Describe("Termination", func() { EventuallyExpectTerminating(ctx, env.Client, podEvict) ExpectDeleted(ctx, env.Client, podEvict) - ExpectNotEnqueuedForEviction(queue, podSkip) + Expect(queue.Has(podSkip)).To(BeFalse()) // Reconcile to delete node node = ExpectNodeExists(ctx, env.Client, node.Name) @@ -321,7 +323,7 @@ var _ = Describe("Termination", func() { // Expect podNoEvict to fail eviction due to PDB, and be retried Eventually(func() int { - return queue.NumRequeues(client.ObjectKeyFromObject(podNoEvict)) + return queue.NumRequeues(terminator.NewQueueKey(podNoEvict)) }).Should(BeNumerically(">=", 1)) // Delete pod to simulate successful eviction @@ -478,7 +480,7 @@ var _ = Describe("Termination", func() { ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{}) // Expect mirror pod to not be queued for eviction - ExpectNotEnqueuedForEviction(queue, podNoEvict) + Expect(queue.Has(podNoEvict)).To(BeFalse()) // Expect podEvict to be enqueued for eviction then be successful EventuallyExpectTerminating(ctx, env.Client, podEvict) @@ -581,6 +583,59 @@ var _ = Describe("Termination", func() { ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) ExpectNotFound(ctx, env.Client, node) }) + It("should not evict a new pod with the same name using the old pod's eviction queue key", func() { + pod 
:= test.Pod(test.PodOptions{ + NodeName: node.Name, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + OwnerReferences: defaultOwnerRefs, + }, + }) + ExpectApplied(ctx, env.Client, node, pod) + + // Trigger Termination Controller + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + + // Don't trigger a call into the queue to make sure that we effectively aren't triggering eviction + // We'll use this to try to leave pods in the queue + + // Expect node to exist and be draining + ExpectNodeWithNodeClaimDraining(env.Client, node.Name) + + // Delete the pod directly to act like something else is doing the pod termination + ExpectDeleted(ctx, env.Client, pod) + + // Requeue the termination controller to completely delete the node + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + + // Expect that the old pod's key still exists in the queue + Expect(queue.Has(pod)).To(BeTrue()) + + // Re-create the pod and node, it should now have the same name, but a different UID + node = test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Finalizers: []string{v1beta1.TerminationFinalizer}, + }, + }) + pod = test.Pod(test.PodOptions{ + NodeName: node.Name, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + OwnerReferences: defaultOwnerRefs, + }, + }) + ExpectApplied(ctx, env.Client, node, pod) + + // Trigger eviction queue with the pod key still in it + ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{}) + + Consistently(func(g Gomega) { + g.Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(pod), pod)).To(Succeed()) + g.Expect(pod.DeletionTimestamp.IsZero()).To(BeTrue()) + }, ReconcilerPropagationTime, RequestInterval).Should(Succeed()) + }) }) Context("Metrics", func() { It("should fire the terminationSummary metric when deleting nodes", func() { @@ -606,13 +661,6 @@ var _ = Describe("Termination", func() { }) }) -func ExpectNotEnqueuedForEviction(e *terminator.Queue, pods ...*v1.Pod) { - GinkgoHelper() - for _, pod := range pods { - Expect(e.Contains(client.ObjectKeyFromObject(pod))).To(BeFalse()) - } -} - func ExpectNodeWithNodeClaimDraining(c client.Client, nodeName string) *v1.Node { GinkgoHelper() node := ExpectNodeExists(ctx, c, nodeName) diff --git a/pkg/controllers/node/termination/terminator/eviction.go b/pkg/controllers/node/termination/terminator/eviction.go index c724c7500e..8fc3e429df 100644 --- a/pkg/controllers/node/termination/terminator/eviction.go +++ b/pkg/controllers/node/termination/terminator/eviction.go @@ -20,15 +20,16 @@ import ( "context" "errors" "fmt" + "sync" "time" - set "github.com/deckarep/golang-set" + "github.com/samber/lo" v1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" @@ -62,19 +63,33 @@ func IsNodeDrainError(err error) bool { return errors.As(err, &nodeDrainErr) } +type QueueKey struct { + types.NamespacedName + UID types.UID +} + +func NewQueueKey(pod *v1.Pod) QueueKey { + return QueueKey{ + NamespacedName: client.ObjectKeyFromObject(pod), + UID: pod.UID, + } +} + type Queue struct { workqueue.RateLimitingInterface - set.Set - coreV1Client
corev1.CoreV1Interface - recorder events.Recorder + mu sync.Mutex + set sets.Set[QueueKey] + + kubeClient client.Client + recorder events.Recorder } -func NewQueue(coreV1Client corev1.CoreV1Interface, recorder events.Recorder) *Queue { +func NewQueue(kubeClient client.Client, recorder events.Recorder) *Queue { queue := &Queue{ RateLimitingInterface: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(evictionQueueBaseDelay, evictionQueueMaxDelay)), - Set: set.NewSet(), - coreV1Client: coreV1Client, + set: sets.New[QueueKey](), + kubeClient: kubeClient, recorder: recorder, } return queue @@ -90,19 +105,30 @@ func (q *Queue) Builder(_ context.Context, m manager.Manager) controller.Builder // Add adds pods to the Queue func (q *Queue) Add(pods ...*v1.Pod) { + q.mu.Lock() + defer q.mu.Unlock() + for _, pod := range pods { - if nn := client.ObjectKeyFromObject(pod); !q.Set.Contains(nn) { - q.Set.Add(nn) - q.RateLimitingInterface.Add(nn) + qk := NewQueueKey(pod) + if !q.set.Has(qk) { + q.set.Insert(qk) + q.RateLimitingInterface.Add(qk) } } } +func (q *Queue) Has(pod *v1.Pod) bool { + q.mu.Lock() + defer q.mu.Unlock() + + return q.set.Has(NewQueueKey(pod)) +} + func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) { // Check if the queue is empty. client-go recommends not using this function to gate the subsequent // get call, but since we're popping items off the queue synchronously, there should be no synchonization // issues. - if q.Len() == 0 { + if q.RateLimitingInterface.Len() == 0 { return reconcile.Result{RequeueAfter: 1 * time.Second}, nil } // Get pod from queue. This waits until queue is non-empty. @@ -110,45 +136,60 @@ func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.R if shutdown { return reconcile.Result{}, fmt.Errorf("EvictionQueue is broken and has shutdown") } - nn := item.(types.NamespacedName) - defer q.RateLimitingInterface.Done(nn) + qk := item.(QueueKey) + defer q.RateLimitingInterface.Done(qk) // Evict pod - if q.Evict(ctx, nn) { - q.RateLimitingInterface.Forget(nn) - q.Set.Remove(nn) + if q.Evict(ctx, qk) { + q.RateLimitingInterface.Forget(qk) + q.mu.Lock() + q.set.Delete(qk) + q.mu.Unlock() return reconcile.Result{RequeueAfter: controller.Immediately}, nil } // Requeue pod if eviction failed - q.RateLimitingInterface.AddRateLimited(nn) + q.RateLimitingInterface.AddRateLimited(qk) return reconcile.Result{RequeueAfter: controller.Immediately}, nil } // Evict returns true if successful eviction call, and false if not an eviction-related error -func (q *Queue) Evict(ctx context.Context, nn types.NamespacedName) bool { - ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("pod", nn)) - if err := q.coreV1Client.Pods(nn.Namespace).EvictV1(ctx, &policyv1.Eviction{ - ObjectMeta: metav1.ObjectMeta{Name: nn.Name, Namespace: nn.Namespace}, - }); err != nil { +func (q *Queue) Evict(ctx context.Context, key QueueKey) bool { + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("pod", key.NamespacedName)) + if err := q.kubeClient.SubResource("eviction").Create(ctx, + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: key.Namespace, Name: key.Name}}, + &policyv1.Eviction{ + DeleteOptions: &metav1.DeleteOptions{ + Preconditions: &metav1.Preconditions{ + UID: lo.ToPtr(key.UID), + }, + }, + }); err != nil { // status codes for the eviction API are defined here: // https://kubernetes.io/docs/concepts/scheduling-eviction/api-eviction/#how-api-initiated-eviction-works - if 
apierrors.IsNotFound(err) { // 404 + if apierrors.IsNotFound(err) || apierrors.IsConflict(err) { + // 404 - The pod no longer exists + // https://github.com/kubernetes/kubernetes/blob/ad19beaa83363de89a7772f4d5af393b85ce5e61/pkg/registry/core/pod/storage/eviction.go#L160 + // 409 - The pod exists, but it is not the same pod that we initiated the eviction on + // https://github.com/kubernetes/kubernetes/blob/ad19beaa83363de89a7772f4d5af393b85ce5e61/pkg/registry/core/pod/storage/eviction.go#L318 return true } if apierrors.IsTooManyRequests(err) { // 429 - PDB violation q.recorder.Publish(terminatorevents.NodeFailedToDrain(&v1.Node{ObjectMeta: metav1.ObjectMeta{ - Name: nn.Name, - Namespace: nn.Namespace, - }}, fmt.Errorf("evicting pod %s/%s violates a PDB", nn.Namespace, nn.Name))) + Name: key.Name, + Namespace: key.Namespace, + }}, fmt.Errorf("evicting pod %s/%s violates a PDB", key.Namespace, key.Name))) return false } logging.FromContext(ctx).Errorf("evicting pod, %s", err) return false } - q.recorder.Publish(terminatorevents.EvictPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: nn.Name, Namespace: nn.Namespace}})) + q.recorder.Publish(terminatorevents.EvictPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}})) return true } func (q *Queue) Reset() { + q.mu.Lock() + defer q.mu.Unlock() + q.RateLimitingInterface = workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(evictionQueueBaseDelay, evictionQueueMaxDelay)) - q.Set = set.NewSet() + q.set = sets.New[QueueKey]() } diff --git a/pkg/controllers/node/termination/terminator/suite_test.go b/pkg/controllers/node/termination/terminator/suite_test.go index 7719932fd8..bd756ac601 100644 --- a/pkg/controllers/node/termination/terminator/suite_test.go +++ b/pkg/controllers/node/termination/terminator/suite_test.go @@ -18,18 +18,19 @@ package terminator_test import ( "context" + "sync" "testing" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/samber/lo" + v1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/uuid" . 
"knative.dev/pkg/logging/testing" - - v1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator" @@ -57,7 +58,7 @@ var _ = BeforeSuite(func() { env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...)) ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{Drift: lo.ToPtr(true)}})) recorder = test.NewEventRecorder() - queue = terminator.NewQueue(env.KubernetesInterface.CoreV1(), recorder) + queue = terminator.NewQueue(env.Client, recorder) }) var _ = AfterSuite(func() { @@ -91,13 +92,17 @@ var _ = Describe("Eviction/Queue", func() { Context("Eviction API", func() { It("should succeed with no event when the pod is not found", func() { - ExpectApplied(ctx, env.Client, pdb) - Expect(queue.Evict(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace})).To(BeTrue()) + Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeTrue()) + Expect(recorder.Events()).To(HaveLen(0)) + }) + It("should succeed with no event when the pod UID conflicts", func() { + ExpectApplied(ctx, env.Client, pod) + Expect(queue.Evict(ctx, terminator.QueueKey{NamespacedName: client.ObjectKeyFromObject(pod), UID: uuid.NewUUID()})).To(BeTrue()) Expect(recorder.Events()).To(HaveLen(0)) }) It("should succeed with an evicted event when there are no PDBs", func() { ExpectApplied(ctx, env.Client, pod) - Expect(queue.Evict(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace})).To(BeTrue()) + Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeTrue()) Expect(recorder.Calls("Evicted")).To(Equal(1)) }) It("should succeed with no event when there are PDBs that allow an eviction", func() { @@ -106,12 +111,12 @@ var _ = Describe("Eviction/Queue", func() { MaxUnavailable: &intstr.IntOrString{IntVal: 1}, }) ExpectApplied(ctx, env.Client, pod) - Expect(queue.Evict(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace})).To(BeTrue()) + Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeTrue()) Expect(recorder.Calls("Evicted")).To(Equal(1)) }) It("should return a NodeDrainError event when a PDB is blocking", func() { ExpectApplied(ctx, env.Client, pdb, pod) - Expect(queue.Evict(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace})).To(BeFalse()) + Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeFalse()) Expect(recorder.Calls("FailedDraining")).To(Equal(1)) }) It("should fail when two PDBs refer to the same pod", func() { @@ -120,7 +125,33 @@ var _ = Describe("Eviction/Queue", func() { MaxUnavailable: &intstr.IntOrString{IntVal: 0}, }) ExpectApplied(ctx, env.Client, pdb, pdb2, pod) - Expect(queue.Evict(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace})).To(BeFalse()) + Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeFalse()) + }) + It("should ensure that calling Evict() is valid while making Add() calls", func() { + cancelCtx, cancel := context.WithCancel(ctx) + wg := sync.WaitGroup{} + DeferCleanup(func() { + cancel() + wg.Wait() // Ensure that we wait for reconcile loop to finish so that we don't get a RACE + }) + + // Keep calling Reconcile() for the entirety of this test + wg.Add(1) + go func() { + defer wg.Done() + + for { + ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{}) + if cancelCtx.Err() != nil { + return + } + } + }() + + // Ensure that we add enough pods to the queue while we are pulling items off of the queue (enough to trigger a DATA RACE) + for i := 0; i < 10000; i++ { + 
queue.Add(test.Pod()) + } }) }) }) diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 96042b1de7..b44d9077b0 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -30,7 +30,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/util/workqueue" "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" @@ -76,7 +75,6 @@ func WithReason(reason string) func(LaunchOptions) LaunchOptions { type Provisioner struct { cloudProvider cloudprovider.CloudProvider kubeClient client.Client - coreV1Client corev1.CoreV1Interface batcher *Batcher volumeTopology *scheduler.VolumeTopology cluster *state.Cluster @@ -84,14 +82,13 @@ type Provisioner struct { cm *pretty.ChangeMonitor } -func NewProvisioner(kubeClient client.Client, coreV1Client corev1.CoreV1Interface, - recorder events.Recorder, cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster, +func NewProvisioner(kubeClient client.Client, recorder events.Recorder, + cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster, ) *Provisioner { p := &Provisioner{ batcher: NewBatcher(), cloudProvider: cloudProvider, kubeClient: kubeClient, - coreV1Client: coreV1Client, volumeTopology: scheduler.NewVolumeTopology(kubeClient), cluster: cluster, recorder: recorder, diff --git a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go index e050e39c56..f265915cf2 100644 --- a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go +++ b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go @@ -56,10 +56,22 @@ const PrintStats = false //nolint:gosec var r = rand.New(rand.NewSource(42)) +// To run the benchmarks use: +// `go test -tags=test_performance -run=XXX -bench=.` +// +// to get something statistically significant for comparison we need to run them several times and then +// compare the results between the old performance and the new performance. +// ```sh +// +// go test -tags=test_performance -run=XXX -bench=. -count=10 | tee /tmp/old +// # make your changes to the code +// go test -tags=test_performance -run=XXX -bench=. 
-count=10 | tee /tmp/new +// benchstat /tmp/old /tmp/new +// +// ``` func BenchmarkScheduling1(b *testing.B) { benchmarkScheduler(b, 400, 1) } - func BenchmarkScheduling50(b *testing.B) { benchmarkScheduler(b, 400, 50) } @@ -102,7 +114,7 @@ func TestSchedulingProfile(t *testing.T) { totalNodes := 0 var totalTime time.Duration for _, instanceCount := range []int{400} { - for _, podCount := range []int{10, 100, 500, 1000, 1500, 2000, 2500} { + for _, podCount := range []int{10, 100, 500, 1000, 1500, 2000, 5000} { start := time.Now() res := testing.Benchmark(func(b *testing.B) { benchmarkScheduler(b, instanceCount, podCount) }) totalTime += time.Since(start) / time.Duration(res.N) diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go index 145a874e58..7e928c42e3 100644 --- a/pkg/controllers/provisioning/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -91,7 +91,7 @@ var _ = BeforeSuite(func() { nodeStateController = informer.NewNodeController(env.Client, cluster) nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster) podStateController = informer.NewPodController(env.Client, cluster) - prov = provisioning.NewProvisioner(env.Client, env.KubernetesInterface.CoreV1(), events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster) + prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster) }) var _ = AfterSuite(func() { diff --git a/pkg/controllers/provisioning/scheduling/topology.go b/pkg/controllers/provisioning/scheduling/topology.go index 9cb1c2f9d3..2ea541e1ee 100644 --- a/pkg/controllers/provisioning/scheduling/topology.go +++ b/pkg/controllers/provisioning/scheduling/topology.go @@ -21,13 +21,13 @@ import ( "fmt" "math" + "k8s.io/apimachinery/pkg/api/errors" + "sigs.k8s.io/karpenter/pkg/controllers/state" "sigs.k8s.io/karpenter/pkg/scheduling" "sigs.k8s.io/karpenter/pkg/utils/functional" "sigs.k8s.io/karpenter/pkg/utils/pretty" - "k8s.io/apimachinery/pkg/api/errors" - "go.uber.org/multierr" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -85,6 +85,19 @@ func NewTopology(ctx context.Context, kubeClient client.Client, cluster *state.C return t, nil } +// topologyError allows lazily generating the error string in the topology error. If a pod fails to schedule, most often +// we are only interested in the fact that it failed to schedule and not why. +type topologyError struct { + topology *TopologyGroup + podDomains *scheduling.Requirement + nodeDomains *scheduling.Requirement +} + +func (t topologyError) Error() string { + return fmt.Sprintf("unsatisfiable topology constraint for %s, key=%s (counts = %s, podDomains = %v, nodeDomains = %v", t.topology.Type, t.topology.Key, + pretty.Map(t.topology.domains, 25), t.podDomains, t.nodeDomains) +} + // Update unregisters the pod as the owner of all affinities and then creates any new topologies based on the pod spec // registered the pod as the owner of all associated affinities, new or old. This allows Update() to be called after // relaxation of a preference to properly break the topology <-> owner relationship so that the preferred topology will @@ -165,7 +178,11 @@ func (t *Topology) AddRequirements(podRequirements, nodeRequirements scheduling. 
} domains := topology.Get(p, podDomains, nodeDomains) if domains.Len() == 0 { - return nil, fmt.Errorf("unsatisfiable topology constraint for %s, key=%s (counts = %s, podDomains = %v, nodeDomains = %v", topology.Type, topology.Key, pretty.Map(topology.domains, 5), podDomains, nodeDomains) + return nil, topologyError{ + topology: topology, + podDomains: podDomains, + nodeDomains: nodeDomains, + } } requirements.Add(domains) } diff --git a/pkg/controllers/provisioning/scheduling/topologygroup.go b/pkg/controllers/provisioning/scheduling/topologygroup.go index 38fa0fdaf0..c4e7ff1dfd 100644 --- a/pkg/controllers/provisioning/scheduling/topologygroup.go +++ b/pkg/controllers/provisioning/scheduling/topologygroup.go @@ -256,10 +256,7 @@ func (t *TopologyGroup) nextDomainAntiAffinity(domains *scheduling.Requirement) // list of domains. The use case where this optimization is really great is when we are launching nodes for // a deployment of pods with self anti-affinity. The domains map here continues to grow, and we continue to // fully scan it each iteration. - if len(t.emptyDomains) == 0 { - return options - } - for domain := range t.domains { + for domain := range t.emptyDomains { if domains.Has(domain) && t.domains[domain] == 0 { options.Insert(domain) } diff --git a/pkg/controllers/provisioning/suite_test.go b/pkg/controllers/provisioning/suite_test.go index 9b9f9ff521..038e6d654d 100644 --- a/pkg/controllers/provisioning/suite_test.go +++ b/pkg/controllers/provisioning/suite_test.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" clock "k8s.io/utils/clock/testing" . "knative.dev/pkg/logging/testing" @@ -77,7 +76,7 @@ var _ = BeforeSuite(func() { fakeClock = clock.NewFakeClock(time.Now()) cluster = state.NewCluster(fakeClock, env.Client, cloudProvider) nodeController = informer.NewNodeController(env.Client, cluster) - prov = provisioning.NewProvisioner(env.Client, corev1.NewForConfigOrDie(env.Config), events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster) + prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster) daemonsetController = informer.NewDaemonSetController(env.Client, cluster) instanceTypes, _ := cloudProvider.GetInstanceTypes(ctx, nil) instanceTypeMap = map[string]*cloudprovider.InstanceType{} diff --git a/pkg/scheduling/requirements.go b/pkg/scheduling/requirements.go index 80362f9087..564c7f066b 100644 --- a/pkg/scheduling/requirements.go +++ b/pkg/scheduling/requirements.go @@ -237,9 +237,38 @@ func labelHint(r Requirements, key string, allowedUndefined sets.Set[string]) st return "" } +// badKeyError allows lazily generating the error string in the case of a bad key error. When requirements fail +// to match, we are most often interested in the failure and not why it fails. 
+type badKeyError struct { + key string + incoming *Requirement + existing *Requirement +} + +func (b badKeyError) Error() string { + return fmt.Sprintf("key %s, %s not in %s", b.key, b.incoming, b.existing) +} + +// intersectKeys is much faster and allocates less than getting the two key sets separately and intersecting them +func (r Requirements) intersectKeys(rhs Requirements) sets.Set[string] { + smallest := r + largest := rhs + if len(smallest) > len(largest) { + smallest, largest = largest, smallest + } + keys := sets.Set[string]{} + + for key := range smallest { + if _, ok := largest[key]; ok { + keys.Insert(key) + } + } + return keys +} + // Intersects returns errors if the requirements don't have overlapping values, undefined keys are allowed func (r Requirements) Intersects(requirements Requirements) (errs error) { - for key := range r.Keys().Intersection(requirements.Keys()) { + for key := range r.intersectKeys(requirements) { existing := r.Get(key) incoming := requirements.Get(key) // There must be some value, except @@ -251,7 +280,11 @@ func (r Requirements) Intersects(requirements Requirements) (errs error) { continue } } - errs = multierr.Append(errs, fmt.Errorf("key %s, %s not in %s", key, incoming, existing)) + errs = multierr.Append(errs, badKeyError{ + key: key, + incoming: incoming, + existing: existing, + }) } } return errs diff --git a/pkg/scheduling/requirements_test.go b/pkg/scheduling/requirements_test.go index b1cdc3d7c0..fa5a6fed00 100644 --- a/pkg/scheduling/requirements_test.go +++ b/pkg/scheduling/requirements_test.go @@ -17,10 +17,13 @@ limitations under the License. package scheduling import ( + "os" + "runtime/pprof" "testing" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/samber/lo" v1 "k8s.io/api/core/v1" ) @@ -654,3 +657,32 @@ func FuzzEditDistance(f *testing.F) { editDistance(lhs, rhs) }) } + +// TestRequirementsProfile is used to gather profiling metrics; benchmarking is primarily done with standard +// Go benchmark functions +// go test -tags=test_performance -run=RequirementsProfile +func TestRequirementsProfile(t *testing.T) { + cpuf, err := os.Create("requirements.cpuprofile") + if err != nil { + t.Fatalf("error creating CPU profile: %s", err) + } + lo.Must0(pprof.StartCPUProfile(cpuf)) + defer pprof.StopCPUProfile() + + heapf, err := os.Create("requirements.heapprofile") + if err != nil { + t.Fatalf("error creating heap profile: %s", err) + } + defer func() { lo.Must0(pprof.WriteHeapProfile(heapf)) }() + + reqsA := NewRequirements(NewRequirement("foo", v1.NodeSelectorOpIn, "a", "b", "c")) + reqsB := NewRequirements(NewRequirement("foo", v1.NodeSelectorOpIn, "d", "e", "f")) + + for i := 0; i < 525000; i++ { + _ = reqsA.Intersects(reqsB) + _ = reqsA.Compatible(reqsB) + _ = reqsA.NodeSelectorRequirements() + _ = reqsA.Keys() + _ = reqsA.Values() + } +} diff --git a/pkg/utils/resources/resources.go b/pkg/utils/resources/resources.go index 4db2cfe642..ffffc2fa3e 100644 --- a/pkg/utils/resources/resources.go +++ b/pkg/utils/resources/resources.go @@ -221,7 +221,7 @@ func Cmp(lhs resource.Quantity, rhs resource.Quantity) int { func Fits(candidate, total v1.ResourceList) bool { // If any of the total resource values are negative then the resource will never fit for _, quantity := range total { - if Cmp(resource.MustParse("0"), quantity) > 0 { + if Cmp(*resource.NewScaledQuantity(0, resource.Kilo), quantity) > 0 { return false } }
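
The budgets validation change at the top of this diff replaces a negated all() over a "malformed budget" predicate with a direct per-item check: the old expression only rejected a NodePool when every budget set exactly one of schedule/duration, while the new rule requires each budget to set both or neither. The standalone Go sketch below models the two expressions with illustrative names (budget, oldRule, newRule are not part of the patch) to show the case the added CEL and webhook tests exercise:

package main

import "fmt"

type budget struct{ hasSchedule, hasDuration bool }

// oldRule mirrors: !self.all(x, (has(x.schedule) && !has(x.duration)) || (!has(x.schedule) && has(x.duration)))
func oldRule(budgets []budget) bool {
	allMalformed := true
	for _, b := range budgets {
		malformed := (b.hasSchedule && !b.hasDuration) || (!b.hasSchedule && b.hasDuration)
		if !malformed {
			allMalformed = false
		}
	}
	return !allMalformed
}

// newRule mirrors: self.all(x, has(x.schedule) == has(x.duration))
func newRule(budgets []budget) bool {
	for _, b := range budgets {
		if b.hasSchedule != b.hasDuration {
			return false
		}
	}
	return true
}

func main() {
	// One well-formed budget plus one that sets only duration: the old rule accepted
	// this list, the new rule rejects it.
	budgets := []budget{
		{hasSchedule: true, hasDuration: true},
		{hasSchedule: false, hasDuration: true},
	}
	fmt.Println(oldRule(budgets), newRule(budgets)) // prints: true false
}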
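
The one-line consolidation.go change orders InstanceTypeOptions with OrderByPrice before truncating to MinInstanceTypesForSpotToSpotConsolidation. The following standalone sketch (illustrative prices only; sort.Float64s stands in for OrderByPrice) shows why truncating an unordered list can silently drop the cheapest offering, which is the scenario the new 18-instance-type test constructs:

package main

import (
	"fmt"
	"sort"

	"github.com/samber/lo"
)

func main() {
	// 18 ascending spot prices, with the offering at index 16 forced to be the cheapest,
	// mirroring the setup in the new consolidation test.
	prices := make([]float64, 18)
	for i := range prices {
		prices[i] = 1.00 + float64(i)*0.1
	}
	prices[16] = 0.001

	// Truncating to 15 without ordering drops the cheapest offering entirely.
	unordered := lo.Slice(prices, 0, 15)

	// Ordering by price first keeps the cheapest offering inside the truncated set.
	ordered := append([]float64(nil), prices...)
	sort.Float64s(ordered)
	cheapest15 := lo.Slice(ordered, 0, 15)

	fmt.Println(lo.Contains(unordered, 0.001), lo.Contains(cheapest15, 0.001)) // prints: false true
}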
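
The eviction-queue rework keys queue entries by both NamespacedName and UID and sends the eviction with a UID precondition, treating a 409 Conflict like a 404. The sketch below reuses the QueueKey/NewQueueKey definitions from eviction.go inside an illustrative standalone program (the pods and UIDs are made up) to show why a re-created pod with the same name is no longer matched by the old key:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// QueueKey and NewQueueKey mirror the definitions added in eviction.go.
type QueueKey struct {
	types.NamespacedName
	UID types.UID
}

func NewQueueKey(pod *v1.Pod) QueueKey {
	return QueueKey{NamespacedName: client.ObjectKeyFromObject(pod), UID: pod.UID}
}

func main() {
	oldPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "test-pod", UID: types.UID("uid-1")}}
	// A re-created pod keeps the name but gets a fresh UID from the API server.
	newPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "test-pod", UID: types.UID("uid-2")}}

	queued := sets.New(NewQueueKey(oldPod))
	fmt.Println(queued.Has(NewQueueKey(oldPod))) // true: the original pod is tracked
	fmt.Println(queued.Has(NewQueueKey(newPod))) // false: same name, different UID, so a different item

	// Server-side, the eviction is also created with Preconditions{UID: &key.UID}, so the API
	// returns 409 Conflict if the pod now at that name no longer carries the queued UID.
}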
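
Both topologyError and badKeyError introduced above follow the same pattern: the error value only carries its inputs, and the fmt.Sprintf work is deferred until Error() is actually called, which matters because scheduling mostly treats these errors as a boolean "does not fit" signal on the hot path. A minimal sketch of the pattern with a hypothetical name (lazyError is not part of the patch):

package main

import "fmt"

// lazyError carries only the inputs; the message is rendered when Error() is called.
type lazyError struct {
	key      string
	incoming string
	existing string
}

func (e lazyError) Error() string {
	return fmt.Sprintf("key %s, %s not in %s", e.key, e.incoming, e.existing)
}

func main() {
	var err error = lazyError{key: "foo", incoming: "In [a b c]", existing: "In [d e f]"}

	// Hot path: callers that only check whether the requirements intersect pay no Sprintf cost here.
	if err != nil {
		// Cold path (logging or surfacing the failure): the string is built on demand.
		fmt.Println(err)
	}
}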