
Commit 881dffb

correctly handle domains from NodePools when honoring taints
1 parent: 094b2bb

6 files changed, +270 -65 lines

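The change replaces the per-key sets.Set[string] domain universe with scheduler.TopologyDomainGroup values, so each domain is recorded together with the taints of the NodePools that can provide it. The type itself is defined in one of the six changed files not reproduced in this excerpt; the sketch below shows one shape it could plausibly take, inferred only from the NewTopologyDomainGroup and Insert(domain, taints...) calls in the diffs that follow, not from the commit's actual implementation.

package scheduling

import corev1 "k8s.io/api/core/v1"

// TopologyDomainGroup maps each topology domain to the taint sets of the
// NodePools able to provide that domain (sketch only; the real type in this
// commit may differ).
type TopologyDomainGroup map[string][][]corev1.Taint

// NewTopologyDomainGroup returns an empty group.
func NewTopologyDomainGroup() TopologyDomainGroup {
    return TopologyDomainGroup{}
}

// Insert records that a NodePool with the given taints can provide domain.
func (t TopologyDomainGroup) Insert(domain string, taints ...corev1.Taint) {
    t[domain] = append(t[domain], taints)
}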

pkg/controllers/provisioning/provisioner.go

+14 -21

@@ -33,7 +33,6 @@ import (
     appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/types"
-    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/client-go/util/workqueue"
     "k8s.io/klog/v2"
     controllerruntime "sigs.k8s.io/controller-runtime"
@@ -235,8 +234,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
     nodePoolList.OrderByWeight()

     instanceTypes := map[string][]*cloudprovider.InstanceType{}
-    domains := map[string]sets.Set[string]{}
-    var notReadyNodePools []string
+    domainGroups := map[string]scheduler.TopologyDomainGroup{}
     for _, nodePool := range nodePoolList.Items {
         // Get instance type options
         instanceTypeOptions, err := p.cloudProvider.GetInstanceTypes(ctx, lo.ToPtr(nodePool))
@@ -252,6 +250,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
             continue
         }
         instanceTypes[nodePool.Name] = append(instanceTypes[nodePool.Name], instanceTypeOptions...)
+        nodePoolTaints := nodePool.Spec.Template.Spec.Taints

         // Construct Topology Domains
         for _, instanceType := range instanceTypeOptions {
@@ -261,15 +260,12 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
             requirements.Add(scheduling.NewLabelRequirements(nodePool.Spec.Template.Labels).Values()...)
             requirements.Add(instanceType.Requirements.Values()...)

-            for key, requirement := range requirements {
-                // This code used to execute a Union between domains[key] and requirement.Values().
-                // The downside of this is that Union is immutable and takes a copy of the set it is executed upon.
-                // This resulted in a lot of memory pressure on the heap and poor performance
-                // https://github.com/aws/karpenter/issues/3565
-                if domains[key] == nil {
-                    domains[key] = sets.New(requirement.Values()...)
-                } else {
-                    domains[key].Insert(requirement.Values()...)
+            for topologyKey, requirement := range requirements {
+                if _, ok := domainGroups[topologyKey]; !ok {
+                    domainGroups[topologyKey] = scheduler.NewTopologyDomainGroup()
+                }
+                for _, domain := range requirement.Values() {
+                    domainGroups[topologyKey].Insert(domain, nodePoolTaints...)
                 }
             }
         }
@@ -278,23 +274,20 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
         requirements.Add(scheduling.NewLabelRequirements(nodePool.Spec.Template.Labels).Values()...)
         for key, requirement := range requirements {
             if requirement.Operator() == corev1.NodeSelectorOpIn {
-                // The following is a performance optimisation, for the explanation see the comment above
-                if domains[key] == nil {
-                    domains[key] = sets.New(requirement.Values()...)
-                } else {
-                    domains[key].Insert(requirement.Values()...)
+                if _, ok := domainGroups[key]; !ok {
+                    domainGroups[key] = scheduler.NewTopologyDomainGroup()
+                }
+                for _, value := range requirement.Values() {
+                    domainGroups[key].Insert(value, nodePoolTaints...)
                 }
             }
         }
     }
-    if len(notReadyNodePools) > 0 {
-        log.FromContext(ctx).WithValues("nodePools", nodePoolList).Info("skipped nodePools, not ready")
-    }
     // inject topology constraints
     pods = p.injectVolumeTopologyRequirements(ctx, pods)

     // Calculate cluster topology
-    topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, domains, pods)
+    topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, domainGroups, pods)
     if err != nil {
         return nil, fmt.Errorf("tracking topology counts, %w", err)
     }
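
With the taints recorded per domain, a consumer can skip domains that only tainted NodePools provide when a pod does not tolerate those taints, which is the behaviour the commit title describes. The following hypothetical helper sketches that filtering, continuing the TopologyDomainGroup sketch above; forEachToleratedDomain and toleratesAll are illustrative names, not functions added by this commit.

// forEachToleratedDomain calls fn for every domain that at least one
// contributing NodePool can provide without an untolerated taint.
func forEachToleratedDomain(group TopologyDomainGroup, pod *corev1.Pod, fn func(domain string)) {
    for domain, taintSets := range group {
        for _, taints := range taintSets {
            if toleratesAll(pod.Spec.Tolerations, taints) {
                fn(domain)
                break
            }
        }
    }
}

// toleratesAll reports whether every taint is matched by some toleration.
func toleratesAll(tolerations []corev1.Toleration, taints []corev1.Taint) bool {
    for i := range taints {
        tolerated := false
        for j := range tolerations {
            if tolerations[j].ToleratesTaint(&taints[i]) {
                tolerated = true
                break
            }
        }
        if !tolerated {
            return false
        }
    }
    return true
}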

pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go

+1 -2

@@ -33,7 +33,6 @@ import (
     "github.com/samber/lo"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
-    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/client-go/tools/record"
     "k8s.io/utils/clock"
     fakecr "sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -167,7 +166,7 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
     client := fakecr.NewFakeClient()
     pods := makeDiversePods(podCount)
    cluster = state.NewCluster(&clock.RealClock{}, client)
-    domains := map[string]sets.Set[string]{}
+    domains := map[string]scheduling.TopologyDomainGroup{}
     topology, err := scheduling.NewTopology(ctx, client, cluster, domains, pods)
     if err != nil {
         b.Fatalf("creating topology, %s", err)
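
The benchmark passes an empty domain universe. A test that wants a pre-seeded universe could populate the map before calling NewTopology; a short sketch under the same assumptions as above (zone names are arbitrary, and an untainted NodePool is represented by calling Insert with no taints):

domainGroups := map[string]scheduling.TopologyDomainGroup{
    corev1.LabelTopologyZone: scheduling.NewTopologyDomainGroup(),
}
// three zones contributed by a hypothetical untainted NodePool
for _, zone := range []string{"test-zone-1", "test-zone-2", "test-zone-3"} {
    domainGroups[corev1.LabelTopologyZone].Insert(zone)
}
topology, err := scheduling.NewTopology(ctx, client, cluster, domainGroups, pods)
if err != nil {
    b.Fatalf("creating topology, %s", err)
}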

pkg/controllers/provisioning/scheduling/topology.go

+10 -6

@@ -52,18 +52,18 @@ type Topology struct {
     // in some cases.
     inverseTopologies map[uint64]*TopologyGroup
     // The universe of domains by topology key
-    domains map[string]sets.Set[string]
+    domainGroups map[string]TopologyDomainGroup
     // excludedPods are the pod UIDs of pods that are excluded from counting. This is used so we can simulate
     // moving pods to prevent them from being double counted.
     excludedPods sets.Set[string]
     cluster      *state.Cluster
 }

-func NewTopology(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, domains map[string]sets.Set[string], pods []*corev1.Pod) (*Topology, error) {
+func NewTopology(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, domainGroups map[string]TopologyDomainGroup, pods []*corev1.Pod) (*Topology, error) {
     t := &Topology{
         kubeClient:        kubeClient,
         cluster:           cluster,
-        domains:           domains,
+        domainGroups:      domainGroups,
         topologies:        map[uint64]*TopologyGroup{},
         inverseTopologies: map[uint64]*TopologyGroup{},
         excludedPods:      sets.New[string](),
@@ -233,7 +233,7 @@ func (t *Topology) updateInverseAntiAffinity(ctx context.Context, pod *corev1.Po
         return err
     }

-    tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domains[term.TopologyKey])
+    tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey])

     hash := tg.Hash()
     if existing, ok := t.inverseTopologies[hash]; !ok {
@@ -269,6 +269,10 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error {
     // capture new domain values from existing nodes that may not have any pods selected by the topology group
     // scheduled to them already
     t.cluster.ForEachNode(func(n *state.StateNode) bool {
+        // ignore state nodes which are tracking in-flight NodeClaims
+        if n.Node == nil {
+            return true
+        }
         // ignore the node if it doesn't match the topology group
         if !tg.nodeFilter.Matches(n.Node) {
             return true
@@ -330,7 +334,7 @@ func (t *Topology) newForTopologies(p *corev1.Pod) []*TopologyGroup {
     var topologyGroups []*TopologyGroup
     for _, cs := range p.Spec.TopologySpreadConstraints {
         topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, sets.New(p.Namespace),
-            cs.LabelSelector, cs.MaxSkew, cs.MinDomains, cs.NodeTaintsPolicy, cs.NodeAffinityPolicy, t.domains[cs.TopologyKey]))
+            cs.LabelSelector, cs.MaxSkew, cs.MinDomains, cs.NodeTaintsPolicy, cs.NodeAffinityPolicy, t.domainGroups[cs.TopologyKey]))
     }
     return topologyGroups
 }
@@ -367,7 +371,7 @@ func (t *Topology) newForAffinities(ctx context.Context, p *corev1.Pod) ([]*Topo
             if err != nil {
                 return nil, err
             }
-            topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domains[term.TopologyKey]))
+            topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey]))
         }
     }
     return topologyGroups, nil
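
The taint-aware domain groups matter most for topology spread constraints whose NodeTaintsPolicy is Honor: a zone that only a tainted NodePool can provide should not count as an eligible domain for a pod that lacks a matching toleration. For reference, an illustrative pod built with upstream corev1 types only (the selector and values are placeholders, not taken from this commit):

package example

import (
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func examplePodHonoringTaints() *corev1.Pod {
    honor := corev1.NodeInclusionPolicyHonor
    return &corev1.Pod{
        Spec: corev1.PodSpec{
            TopologySpreadConstraints: []corev1.TopologySpreadConstraint{{
                TopologyKey:       corev1.LabelTopologyZone,
                MaxSkew:           1,
                WhenUnsatisfiable: corev1.DoNotSchedule,
                // only nodes whose taints the pod tolerates contribute to skew
                NodeTaintsPolicy: &honor,
                LabelSelector:    &metav1.LabelSelector{MatchLabels: map[string]string{"app": "example"}},
            }},
        },
    }
}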
