chore: Adding cherry picked commits to release-v0.34.x (#1020)
Co-authored-by: Jonathan Innis <jonathan.innis.ji@gmail.com>
Co-authored-by: Todd Neal <tnealt@amazon.com>
Co-authored-by: nikmohan123 <154277636+nikmohan123@users.noreply.github.com>
4 people authored Feb 16, 2024
1 parent 0e77b78 commit d7ef220
Showing 24 changed files with 444 additions and 93 deletions.
go.mod (3 changes: 1 addition & 2 deletions)
@@ -5,8 +5,7 @@ go 1.21
require (
github.com/Pallinder/go-randomdata v1.2.0
github.com/avast/retry-go v3.0.0+incompatible
- github.com/deckarep/golang-set v1.8.0
- github.com/docker/docker v25.0.1+incompatible
+ github.com/docker/docker v25.0.2+incompatible
github.com/go-logr/logr v1.4.1
github.com/go-logr/zapr v1.3.0
github.com/imdario/mergo v0.3.16
go.sum (6 changes: 2 additions & 4 deletions)
@@ -71,10 +71,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
- github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsPppp4=
- github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo=
- github.com/docker/docker v25.0.1+incompatible h1:k5TYd5rIVQRSqcTwCID+cyVA0yRg86+Pcrz1ls0/frA=
- github.com/docker/docker v25.0.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
+ github.com/docker/docker v25.0.2+incompatible h1:/OaKeauroa10K4Nqavw4zlhcDq/WBcPMc5DbjOGgozY=
+ github.com/docker/docker v25.0.2+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
kwok/main.go (1 change: 0 additions & 1 deletion)
@@ -42,7 +42,6 @@ func main() {
WithControllers(ctx, controllers.NewControllers(
op.Clock,
op.GetClient(),
- op.KubernetesInterface,
state.NewCluster(op.Clock, op.GetClient(), cloudProvider),
op.EventRecorder,
cloudProvider,
pkg/apis/crds/karpenter.sh_nodepools.yaml (2 changes: 1 addition & 1 deletion)
@@ -109,7 +109,7 @@ spec:
type: array
x-kubernetes-validations:
- message: '''schedule'' must be set with ''duration'''
- rule: '!self.all(x, (has(x.schedule) && !has(x.duration)) || (!has(x.schedule) && has(x.duration)))'
+ rule: self.all(x, has(x.schedule) == has(x.duration))
consolidateAfter:
description: |-
ConsolidateAfter is the duration the controller will wait
pkg/apis/v1beta1/nodepool.go (2 changes: 1 addition & 1 deletion)
@@ -91,7 +91,7 @@ type Disruption struct {
// If there are multiple active budgets, Karpenter uses
// the most restrictive value. If left undefined,
// this will default to one budget with a value to 10%.
- // +kubebuilder:validation:XValidation:message="'schedule' must be set with 'duration'",rule="!self.all(x, (has(x.schedule) && !has(x.duration)) || (!has(x.schedule) && has(x.duration)))"
+ // +kubebuilder:validation:XValidation:message="'schedule' must be set with 'duration'",rule="self.all(x, has(x.schedule) == has(x.duration))"
// +kubebuilder:default:={{nodes: "10%"}}
// +kubebuilder:validation:MaxItems=50
// +optional
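
For context, the CRD rule and the kubebuilder marker above tighten the same validation: every budget must set schedule and duration together or omit both. A minimal Go sketch of the equivalent check, assuming the v1beta1 Budget fields used in the tests below (the helper name is illustrative, not part of the codebase):

// budgetsScheduleDurationConsistent mirrors the new CEL rule
// `self.all(x, has(x.schedule) == has(x.duration))`: for every budget,
// schedule and duration must be set together or omitted together.
func budgetsScheduleDurationConsistent(budgets []Budget) bool {
	for _, b := range budgets {
		if (b.Schedule != nil) != (b.Duration != nil) {
			return false
		}
	}
	return true
}

Under this check a budget that sets only Nodes still passes, while one that sets only Duration (as in the new test cases below) fails.
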
pkg/apis/v1beta1/nodepool_validation_cel_test.go (19 changes: 18 additions & 1 deletion)
@@ -188,7 +188,7 @@ var _ = Describe("CEL/Validation", func() {
}}
Expect(env.Client.Create(ctx, nodePool)).To(Succeed())
})
It("should fail when creating two budgets where one is invalid", func() {
It("should fail when creating two budgets where one has an invalid crontab", func() {
nodePool.Spec.Disruption.Budgets = []Budget{
{
Nodes: "10",
@@ -202,6 +202,23 @@ var _ = Describe("CEL/Validation", func() {
}}
Expect(env.Client.Create(ctx, nodePool)).ToNot(Succeed())
})
It("should fail when creating multiple budgets where one doesn't have both schedule and duration", func() {
nodePool.Spec.Disruption.Budgets = []Budget{
{
Nodes: "10",
Duration: &metav1.Duration{Duration: lo.Must(time.ParseDuration("20m"))},
},
{
Nodes: "10",
Schedule: ptr.String("* * * * *"),
Duration: &metav1.Duration{Duration: lo.Must(time.ParseDuration("20m"))},
},
{
Nodes: "10",
},
}
Expect(env.Client.Create(ctx, nodePool)).ToNot(Succeed())
})
})
Context("KubeletConfiguration", func() {
It("should succeed on kubeReserved with invalid keys", func() {
pkg/apis/v1beta1/nodepool_validation_webhook_test.go (19 changes: 18 additions & 1 deletion)
@@ -131,7 +131,7 @@ var _ = Describe("Webhook/Validation", func() {
}}
Expect(nodePool.Validate(ctx)).To(Succeed())
})
It("should fail to validate two budgets where one is invalid", func() {
It("should fail when creating two budgets where one has an invalid crontab", func() {
nodePool.Spec.Disruption.Budgets = []Budget{
{
Nodes: "10",
@@ -145,6 +145,23 @@ var _ = Describe("Webhook/Validation", func() {
}}
Expect(nodePool.Validate(ctx)).ToNot(Succeed())
})
It("should fail when creating multiple budgets where one doesn't have both schedule and duration", func() {
nodePool.Spec.Disruption.Budgets = []Budget{
{
Nodes: "10",
Duration: &metav1.Duration{Duration: lo.Must(time.ParseDuration("20m"))},
},
{
Nodes: "10",
Schedule: ptr.String("* * * * *"),
Duration: &metav1.Duration{Duration: lo.Must(time.ParseDuration("20m"))},
},
{
Nodes: "10",
},
}
Expect(nodePool.Validate(ctx)).ToNot(Succeed())
})
})
Context("Limits", func() {
It("should allow undefined limits", func() {
pkg/controllers/controllers.go (6 changes: 2 additions & 4 deletions)
@@ -17,7 +17,6 @@ limitations under the License.
package controllers

import (
"k8s.io/client-go/kubernetes"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"

@@ -47,14 +46,13 @@ import (
func NewControllers(
clock clock.Clock,
kubeClient client.Client,
- kubernetesInterface kubernetes.Interface,
cluster *state.Cluster,
recorder events.Recorder,
cloudProvider cloudprovider.CloudProvider,
) []controller.Controller {

- p := provisioning.NewProvisioner(kubeClient, kubernetesInterface.CoreV1(), recorder, cloudProvider, cluster)
- evictionQueue := terminator.NewQueue(kubernetesInterface.CoreV1(), recorder)
+ p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster)
+ evictionQueue := terminator.NewQueue(kubeClient, recorder)
disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)

return []controller.Controller{
pkg/controllers/disruption/consolidation.go (2 changes: 1 addition & 1 deletion)
@@ -256,7 +256,7 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
// 3) Assuming CreateInstanceFromTypes(A,B,C,D) returned D, we check if D is part of (A,B,C) and it isn't, so will have another consolidation send a CreateInstanceFromTypes(A,B,C), since they’re cheaper than D resulting in continual consolidation.
// If we had restricted instance types to min flexibility at launch at step (1) i.e CreateInstanceFromTypes(A,B,C), we would have received the instance type part of the list preventing immediate consolidation.
// Taking this to 15 types, we need to only send the 15 cheapest types in the CreateInstanceFromTypes call so that the resulting instance is always in that set of 15 and we won’t immediately consolidate.
- results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions = lo.Slice(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions, 0, MinInstanceTypesForSpotToSpotConsolidation)
+ results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions = lo.Slice(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions.OrderByPrice(results.NewNodeClaims[0].Requirements), 0, MinInstanceTypesForSpotToSpotConsolidation)

return Command{
candidates: candidates,
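
In plainer terms, the change above sorts the replacement instance-type options by price before truncating to the 15-type minimum flexibility, so the cheapest candidates can no longer be sliced away. A rough, self-contained Go sketch of that order-then-truncate pattern (generic type and function names, not the karpenter API):

package main

import (
	"fmt"
	"sort"
)

type instanceType struct {
	Name  string
	Price float64
}

// cheapestN copies the candidates, sorts them by ascending price, and keeps
// at most n of them, mirroring the OrderByPrice-then-lo.Slice pattern above.
func cheapestN(candidates []instanceType, n int) []instanceType {
	sorted := append([]instanceType(nil), candidates...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Price < sorted[j].Price })
	if len(sorted) > n {
		sorted = sorted[:n]
	}
	return sorted
}

func main() {
	candidates := []instanceType{{"a", 0.30}, {"b", 0.10}, {"c", 0.20}}
	fmt.Println(cheapestN(candidates, 2)) // prints [{b 0.1} {c 0.2}]
}

Without the sort, slicing the first 15 entries as-is could drop a cheaper type from the launch request and immediately trigger another round of consolidation, which is exactly the churn the comment above describes.
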
pkg/controllers/disruption/consolidation_test.go (116 changes: 116 additions & 0 deletions)
@@ -1367,6 +1367,122 @@ var _ = Describe("Consolidation", func() {
ExpectExists(ctx, env.Client, spotNodeClaim)
ExpectExists(ctx, env.Client, spotNode)
})
It("spot to spot consolidation should order the instance types by price before enforcing minimum flexibility.", func() {
// Fetch 18 spot instances
spotInstances = lo.Slice(lo.Filter(cloudProvider.InstanceTypes, func(i *cloudprovider.InstanceType, _ int) bool {
for _, o := range i.Offerings {
if o.CapacityType == v1beta1.CapacityTypeSpot {
return true
}
}
return false
}), 0, 18)
// Assign prices to the 18 spot instances in ascending order, incrementing by 0.1 per type
for i, inst := range spotInstances {
inst.Offerings[0].Price = 1.00 + float64(i)*0.1
}
// Force an instancetype that is outside the bound of 15 instances to have the cheapest price among the lot.
spotInstances[16].Offerings[0].Price = 0.001

// Track the lowest-priced and highest-priced spot instance types in the list
cheapestSpotInstanceType := spotInstances[16]
mostExpensiveInstanceType := spotInstances[17]

// Register these specially priced spot instance types as the cloud provider's instance types
cloudProvider.InstanceTypes = spotInstances

expectedInstanceTypesForConsolidation := make([]*cloudprovider.InstanceType, len(spotInstances))
copy(expectedInstanceTypesForConsolidation, spotInstances)
// Sort the spot instances by pricing from low to high
sort.Slice(expectedInstanceTypesForConsolidation, func(i, j int) bool {
return expectedInstanceTypesForConsolidation[i].Offerings[0].Price < expectedInstanceTypesForConsolidation[j].Offerings[0].Price
})
// These 15 cheapest instance types should eventually be considered for consolidation.
var expectedInstanceTypesNames []string
for i := 0; i < 15; i++ {
expectedInstanceTypesNames = append(expectedInstanceTypesNames, expectedInstanceTypesForConsolidation[i].Name)
}

// Assign the most expensive spot instancetype so that it will definitely be replaced through consolidation
spotNodeClaim.Labels = lo.Assign(spotNodeClaim.Labels, map[string]string{
v1beta1.NodePoolLabelKey: nodePool.Name,
v1.LabelInstanceTypeStable: mostExpensiveInstanceType.Name,
v1beta1.CapacityTypeLabelKey: mostExpensiveInstanceType.Offerings[0].CapacityType,
v1.LabelTopologyZone: mostExpensiveInstanceType.Offerings[0].Zone,
})

spotNode.Labels = lo.Assign(spotNode.Labels, map[string]string{
v1beta1.NodePoolLabelKey: nodePool.Name,
v1.LabelInstanceTypeStable: mostExpensiveInstanceType.Name,
v1beta1.CapacityTypeLabelKey: mostExpensiveInstanceType.Offerings[0].CapacityType,
v1.LabelTopologyZone: mostExpensiveInstanceType.Offerings[0].Zone,
})

rs := test.ReplicaSet()
ExpectApplied(ctx, env.Client, rs)
Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())

pod := test.Pod(test.PodOptions{
ObjectMeta: metav1.ObjectMeta{Labels: labels,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
Name: rs.Name,
UID: rs.UID,
Controller: ptr.Bool(true),
BlockOwnerDeletion: ptr.Bool(true),
},
}}})
ExpectApplied(ctx, env.Client, rs, pod, spotNode, spotNodeClaim, nodePool)

// bind pods to node
ExpectManualBinding(ctx, env.Client, pod, spotNode)

// inform cluster state about nodes and nodeclaims
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*v1.Node{spotNode}, []*v1beta1.NodeClaim{spotNodeClaim})

fakeClock.Step(10 * time.Minute)

// consolidation won't delete the old nodeclaim until the new nodeclaim is ready
var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
ExpectReconcileSucceeded(ctx, disruptionController, client.ObjectKey{})
wg.Wait()

// Process the item so that the nodes can be deleted.
ExpectReconcileSucceeded(ctx, queue, types.NamespacedName{})

// Cascade any deletion of the nodeclaim to the node
ExpectNodeClaimsCascadeDeletion(ctx, env.Client, spotNodeClaim)

// should create a new nodeclaim as there is a cheaper one that can hold the pod
nodeClaims := ExpectNodeClaims(ctx, env.Client)
nodes := ExpectNodes(ctx, env.Client)
Expect(nodeClaims).To(HaveLen(1))
Expect(nodes).To(HaveLen(1))

// Expect that the new nodeclaim does not request the most expensive instance type
Expect(nodeClaims[0].Name).ToNot(Equal(spotNodeClaim.Name))
Expect(scheduling.NewNodeSelectorRequirements(nodeClaims[0].Spec.Requirements...).Has(v1.LabelInstanceTypeStable)).To(BeTrue())
Expect(scheduling.NewNodeSelectorRequirements(nodeClaims[0].Spec.Requirements...).Get(v1.LabelInstanceTypeStable).Has(mostExpensiveInstanceType.Name)).To(BeFalse())

// Make sure that the cheapest instance that was outside the bound of 15 instance types is considered for consolidation.
Expect(scheduling.NewNodeSelectorRequirements(nodeClaims[0].Spec.Requirements...).Get(v1.LabelInstanceTypeStable).Has(cheapestSpotInstanceType.Name)).To(BeTrue())
spotInstancesConsideredForConsolidation := scheduling.NewNodeSelectorRequirements(nodeClaims[0].Spec.Requirements...).Get(v1.LabelInstanceTypeStable).Values()

// Make sure that we send only 15 instance types.
Expect(len(spotInstancesConsideredForConsolidation)).To(Equal(15))

// Make sure we considered the first 15 cheapest instance types.
for i := 0; i < 15; i++ {
Expect(spotInstancesConsideredForConsolidation).To(ContainElement(expectedInstanceTypesNames[i]))
}

// and delete the old one
ExpectNotFound(ctx, env.Client, spotNodeClaim, spotNode)
})
DescribeTable("can replace nodes if another nodePool returns no instance types",
func(spotToSpot bool) {
nodeClaim = lo.Ternary(spotToSpot, spotNodeClaim, nodeClaim)
pkg/controllers/disruption/orchestration/suite_test.go (2 changes: 1 addition & 1 deletion)
@@ -81,7 +81,7 @@ var _ = BeforeSuite(func() {
nodeStateController = informer.NewNodeController(env.Client, cluster)
nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
recorder = test.NewEventRecorder()
- prov = provisioning.NewProvisioner(env.Client, env.KubernetesInterface.CoreV1(), recorder, cloudProvider, cluster)
+ prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster)
queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)
})

pkg/controllers/disruption/suite_test.go (2 changes: 1 addition & 1 deletion)
@@ -89,7 +89,7 @@ var _ = BeforeSuite(func() {
nodeStateController = informer.NewNodeController(env.Client, cluster)
nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
recorder = test.NewEventRecorder()
- prov = provisioning.NewProvisioner(env.Client, env.KubernetesInterface.CoreV1(), recorder, cloudProvider, cluster)
+ prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster)
queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)
disruptionController = disruption.NewController(fakeClock, env.Client, prov, cloudProvider, recorder, cluster, queue)
})