From 094b2bbab5fa91fbe1ef9ac110581367c158468d Mon Sep 17 00:00:00 2001 From: Todd Neal Date: Wed, 18 Oct 2023 13:18:44 -0500 Subject: [PATCH 1/4] support node affinity and taints policies for topology spread --- .../provisioning/scheduling/existingnode.go | 2 +- .../provisioning/scheduling/nodeclaim.go | 2 +- .../provisioning/scheduling/scheduler.go | 4 +- .../provisioning/scheduling/topology.go | 25 +- .../provisioning/scheduling/topology_test.go | 359 ++++++++++++++++++ .../provisioning/scheduling/topologygroup.go | 13 +- .../scheduling/topologynodefilter.go | 39 +- pkg/scheduling/taints.go | 11 +- pkg/utils/pod/scheduling.go | 9 +- test/pkg/environment/common/expectations.go | 4 +- 10 files changed, 444 insertions(+), 24 deletions(-) diff --git a/pkg/controllers/provisioning/scheduling/existingnode.go b/pkg/controllers/provisioning/scheduling/existingnode.go index 98ce95cdcf..961ef4d000 100644 --- a/pkg/controllers/provisioning/scheduling/existingnode.go +++ b/pkg/controllers/provisioning/scheduling/existingnode.go @@ -63,7 +63,7 @@ func NewExistingNode(n *state.StateNode, topology *Topology, daemonResources v1. func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod) error { // Check Taints - if err := scheduling.Taints(n.Taints()).Tolerates(pod); err != nil { + if err := scheduling.Taints(n.Taints()).ToleratesPod(pod); err != nil { return err } // determine the volumes that will be mounted if the pod schedules diff --git a/pkg/controllers/provisioning/scheduling/nodeclaim.go b/pkg/controllers/provisioning/scheduling/nodeclaim.go index 0ccaf397df..1116cbe3a3 100644 --- a/pkg/controllers/provisioning/scheduling/nodeclaim.go +++ b/pkg/controllers/provisioning/scheduling/nodeclaim.go @@ -64,7 +64,7 @@ func NewNodeClaim(nodeClaimTemplate *NodeClaimTemplate, topology *Topology, daem func (n *NodeClaim) Add(pod *v1.Pod) error { // Check Taints - if err := scheduling.Taints(n.Spec.Taints).Tolerates(pod); err != nil { + if err := scheduling.Taints(n.Spec.Taints).ToleratesPod(pod); err != nil { return err } diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index 1bdda17ab5..a5f8028f43 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -303,7 +303,7 @@ func (s *Scheduler) calculateExistingNodeClaims(stateNodes []*state.StateNode, d // Calculate any daemonsets that should schedule to the inflight node var daemons []*corev1.Pod for _, p := range daemonSetPods { - if err := scheduling.Taints(node.Taints()).Tolerates(p); err != nil { + if err := scheduling.Taints(node.Taints()).ToleratesPod(p); err != nil { continue } if err := scheduling.NewLabelRequirements(node.Labels()).Compatible(scheduling.NewPodRequirements(p)); err != nil { @@ -340,7 +340,7 @@ func getDaemonOverhead(nodeClaimTemplates []*NodeClaimTemplate, daemonSetPods [] for _, nodeClaimTemplate := range nodeClaimTemplates { var daemons []*corev1.Pod for _, p := range daemonSetPods { - if err := scheduling.Taints(nodeClaimTemplate.Spec.Taints).Tolerates(p); err != nil { + if err := scheduling.Taints(nodeClaimTemplate.Spec.Taints).ToleratesPod(p); err != nil { continue } if err := nodeClaimTemplate.Requirements.Compatible(scheduling.NewPodRequirements(p), scheduling.AllowUndefinedWellKnownLabels); err != nil { diff --git a/pkg/controllers/provisioning/scheduling/topology.go b/pkg/controllers/provisioning/scheduling/topology.go index d82e72c030..454ef13663 100644 --- 
a/pkg/controllers/provisioning/scheduling/topology.go +++ b/pkg/controllers/provisioning/scheduling/topology.go @@ -233,7 +233,7 @@ func (t *Topology) updateInverseAntiAffinity(ctx context.Context, pod *corev1.Po return err } - tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey]) + tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domains[term.TopologyKey]) hash := tg.Hash() if existing, ok := t.inverseTopologies[hash]; !ok { @@ -266,6 +266,24 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error { pods = append(pods, podList.Items...) } + // capture new domain values from existing nodes that may not have any pods selected by the topology group + // scheduled to them already + t.cluster.ForEachNode(func(n *state.StateNode) bool { + // ignore the node if it doesn't match the topology group + if !tg.nodeFilter.Matches(n.Node) { + return true + } + domain, exists := n.Labels()[tg.Key] + if !exists { + return true + } + // ensure we at least have a count of zero for this potentially new topology domain + if _, countExists := tg.domains[domain]; !countExists { + tg.domains[domain] = 0 + } + return true + }) + for i, p := range pods { if IgnoredForTopology(&pods[i]) { continue @@ -311,7 +329,8 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error { func (t *Topology) newForTopologies(p *corev1.Pod) []*TopologyGroup { var topologyGroups []*TopologyGroup for _, cs := range p.Spec.TopologySpreadConstraints { - topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, sets.New(p.Namespace), cs.LabelSelector, cs.MaxSkew, cs.MinDomains, t.domains[cs.TopologyKey])) + topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, sets.New(p.Namespace), + cs.LabelSelector, cs.MaxSkew, cs.MinDomains, cs.NodeTaintsPolicy, cs.NodeAffinityPolicy, t.domains[cs.TopologyKey])) } return topologyGroups } @@ -348,7 +367,7 @@ func (t *Topology) newForAffinities(ctx context.Context, p *corev1.Pod) ([]*Topo if err != nil { return nil, err } - topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey])) + topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domains[term.TopologyKey])) } } return topologyGroups, nil diff --git a/pkg/controllers/provisioning/scheduling/topology_test.go b/pkg/controllers/provisioning/scheduling/topology_test.go index ab1ee034b3..7c4580ef6f 100644 --- a/pkg/controllers/provisioning/scheduling/topology_test.go +++ b/pkg/controllers/provisioning/scheduling/topology_test.go @@ -1126,6 +1126,365 @@ var _ = Describe("Topology", func() { }) }) + Context("NodeTaintsPolicy", func() { + It("should balance pods across a label (NodeTaintsPolicy=ignore)", func() { + if env.Version.Minor() < 26 { + Skip("NodeTaintsPolicy ony enabled by default for K8s >= 1.26.x") + } + + const spreadLabel = "karpenter.sh/fake-label" + nodePool.Spec.Template.Labels = map[string]string{ + spreadLabel: "baz", + } + node1 := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + spreadLabel: "foo", + }, + }, + Taints: []corev1.Taint{ + { + Key: "taintname", + 
Value: "taintvalue", + Effect: corev1.TaintEffectNoSchedule, + }, + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + }}) + + node2 := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + spreadLabel: "bar", + }, + }, + Taints: []corev1.Taint{ + { + Key: "taintname", + Value: "taintvalue", + Effect: corev1.TaintEffectNoSchedule, + }, + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + }}) + ExpectApplied(ctx, env.Client, nodePool, node1, node2) + + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) + + // there are now three domains for spreadLabel that Karpenter should know about, foo/bar from the two existing + // nodes and baz from the node pool + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov) + ignore := corev1.NodeInclusionPolicyIgnore + topology := []corev1.TopologySpreadConstraint{{ + TopologyKey: spreadLabel, + WhenUnsatisfiable: corev1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, + MaxSkew: 1, + NodeTaintsPolicy: &ignore, + }} + + ExpectApplied(ctx, env.Client, nodePool) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, + test.UnschedulablePods(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels}, + ResourceRequirements: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + }, + TopologySpreadConstraints: topology, + }, 5)..., + ) + // we're aware of three domains, but can only schedule a single pod to the domain where we're allowed + // to create nodes + ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1)) + }) + It("should balance pods across a label (NodeTaintsPolicy=honor)", func() { + if env.Version.Minor() < 26 { + Skip("NodeTaintsPolicy ony enabled by default for K8s >= 1.26.x") + } + const spreadLabel = "karpenter.sh/fake-label" + nodePool.Spec.Template.Labels = map[string]string{ + spreadLabel: "baz", + } + node1 := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + spreadLabel: "foo", + }, + }, + Taints: []corev1.Taint{ + { + Key: "taintname", + Value: "taintvalue", + Effect: corev1.TaintEffectNoSchedule, + }, + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + }}) + + node2 := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + spreadLabel: "bar", + }, + }, + Taints: []corev1.Taint{ + { + Key: "taintname", + Value: "taintvalue", + Effect: corev1.TaintEffectNoSchedule, + }, + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + }}) + ExpectApplied(ctx, env.Client, nodePool, node1, node2) + + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) + + // since two of the nodes are tainted, Karpenter should not consider them for topology domain discovery + // purposes + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov) + honor := corev1.NodeInclusionPolicyHonor + topology := []corev1.TopologySpreadConstraint{{ + TopologyKey: spreadLabel, + WhenUnsatisfiable: corev1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, + MaxSkew: 1, + NodeTaintsPolicy: &honor, + }} + + 
ExpectApplied(ctx, env.Client, nodePool) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, + test.UnschedulablePods(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels}, + ResourceRequirements: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + }, + TopologySpreadConstraints: topology, + }, 5)..., + ) + // and should schedule all of the pods on the same node + ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(5)) + }) + FIt("should balance pods across a label when discovered from the provisioner (NodeTaintsPolicy=honor)", func() { + if env.Version.Minor() < 26 { + Skip("NodeTaintsPolicy ony enabled by default for K8s >= 1.26.x") + } + const spreadLabel = "karpenter.sh/fake-label" + nodePool.Spec.Template.Labels = map[string]string{ + spreadLabel: "baz", + } + + nodePoolTainted := test.NodePool(v1.NodePool{ + Spec: v1.NodePoolSpec{ + Template: v1.NodeClaimTemplate{ + Spec: v1.NodeClaimTemplateSpec{ + Taints: []corev1.Taint{ + { + Key: "taintname", + Value: "taintvalue", + Effect: corev1.TaintEffectNoSchedule, + }, + }, + Requirements: []v1.NodeSelectorRequirementWithMinValues{ + { + NodeSelectorRequirement: corev1.NodeSelectorRequirement{ + Key: v1.CapacityTypeLabelKey, + Operator: corev1.NodeSelectorOpExists, + }, + }, + { + NodeSelectorRequirement: corev1.NodeSelectorRequirement{ + Key: spreadLabel, + Operator: corev1.NodeSelectorOpIn, + Values: []string{"foo", "bar"}, + }, + }, + }, + }, + }, + }, + }) + + ExpectApplied(ctx, env.Client, nodePool, nodePoolTainted) + + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov) + honor := corev1.NodeInclusionPolicyHonor + topology := []corev1.TopologySpreadConstraint{{ + TopologyKey: spreadLabel, + WhenUnsatisfiable: corev1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, + MaxSkew: 1, + NodeTaintsPolicy: &honor, + }} + + ExpectApplied(ctx, env.Client, nodePool) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, + test.UnschedulablePods(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels}, + ResourceRequirements: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + }, + TopologySpreadConstraints: topology, + }, 5)..., + ) + // and should schedule all of the pods on the same node + ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(5)) + }) + }) + + Context("NodeAffinityPolicy", func() { + It("should balance pods across a label (NodeAffinityPolicy=ignore)", func() { + if env.Version.Minor() < 26 { + Skip("NodeAffinityPolicy ony enabled by default for K8s >= 1.26.x") + } + const spreadLabel = "karpenter.sh/fake-label" + const affinityLabel = "karpenter.sh/selector" + const affinityMismatch = "mismatch" + const affinityMatch = "value" + + nodePool.Spec.Template.Labels = map[string]string{ + spreadLabel: "baz", + affinityLabel: affinityMatch, + } + node1 := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + spreadLabel: "foo", + affinityLabel: affinityMismatch, + }, + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + }}) + + node2 := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + spreadLabel: "bar", + affinityLabel: affinityMismatch, + }, + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + }}) + ExpectApplied(ctx, env.Client, nodePool, 
node1, node2) + + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) + + // there are now three domains for spreadLabel that Karpenter should know about since we are ignoring the + // fact that the pod can't schedule to two of them because of a required node affinity. foo/bar from the two + // existing nodes with an affinityLabel=affinityMismatch label and baz from the node pool + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov) + ignore := corev1.NodeInclusionPolicyIgnore + topology := []corev1.TopologySpreadConstraint{{ + TopologyKey: spreadLabel, + WhenUnsatisfiable: corev1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, + MaxSkew: 1, + NodeAffinityPolicy: &ignore, + }} + + ExpectApplied(ctx, env.Client, nodePool) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, + test.UnschedulablePods(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels}, + ResourceRequirements: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + }, + NodeSelector: map[string]string{ + affinityLabel: affinityMatch, + }, + TopologySpreadConstraints: topology, + }, 5)..., + ) + // we're aware of three domains, but can only schedule a single pod to the domain where we're allowed + // to create nodes + ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1)) + }) + It("should balance pods across a label (NodeAffinityPolicy=honor)", func() { + if env.Version.Minor() < 26 { + Skip("NodeAffinityPolicy ony enabled by default for K8s >= 1.26.x") + } + const spreadLabel = "karpenter.sh/fake-label" + const affinityLabel = "karpenter.sh/selector" + const affinityMismatch = "mismatch" + const affinityMatch = "value" + + nodePool.Spec.Template.Labels = map[string]string{ + spreadLabel: "baz", + affinityLabel: affinityMatch, + } + node1 := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + spreadLabel: "foo", + affinityLabel: affinityMismatch, + }, + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + }}) + + node2 := test.Node(test.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + spreadLabel: "bar", + affinityLabel: affinityMismatch, + }, + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + }}) + ExpectApplied(ctx, env.Client, nodePool, node1, node2) + + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) + + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov) + ignore := corev1.NodeInclusionPolicyHonor + topology := []corev1.TopologySpreadConstraint{{ + TopologyKey: spreadLabel, + WhenUnsatisfiable: corev1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, + MaxSkew: 1, + NodeAffinityPolicy: &ignore, + }} + + ExpectApplied(ctx, env.Client, nodePool) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, + test.UnschedulablePods(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels}, + ResourceRequirements: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + }, + NodeSelector: map[string]string{ + affinityLabel: affinityMatch, + }, + TopologySpreadConstraints: topology, + }, 
5)..., + ) + // we're only aware of the single domain, since we honor the node affinity policy and all of the pods + // should schedule there + ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(5)) + }) + }) Context("Combined Zonal and Capacity Type Topology", func() { It("should spread pods while respecting both constraints", func() { topology := []corev1.TopologySpreadConstraint{{ diff --git a/pkg/controllers/provisioning/scheduling/topologygroup.go b/pkg/controllers/provisioning/scheduling/topologygroup.go index 1249636ad8..c9b43976cc 100644 --- a/pkg/controllers/provisioning/scheduling/topologygroup.go +++ b/pkg/controllers/provisioning/scheduling/topologygroup.go @@ -68,7 +68,7 @@ type TopologyGroup struct { emptyDomains sets.Set[string] // domains for which we know that no pod exists } -func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod, namespaces sets.Set[string], labelSelector *metav1.LabelSelector, maxSkew int32, minDomains *int32, domains sets.Set[string]) *TopologyGroup { +func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod, namespaces sets.Set[string], labelSelector *metav1.LabelSelector, maxSkew int32, minDomains *int32, taintPolicy *v1.NodeInclusionPolicy, affinityPolicy *v1.NodeInclusionPolicy, domains sets.Set[string]) *TopologyGroup { domainCounts := map[string]int32{} for domain := range domains { domainCounts[domain] = 0 @@ -76,8 +76,17 @@ func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod // the nil *TopologyNodeFilter always passes which is what we need for affinity/anti-affinity var nodeSelector TopologyNodeFilter if topologyType == TopologyTypeSpread { - nodeSelector = MakeTopologyNodeFilter(pod) + nodeTaintsPolicy := v1.NodeInclusionPolicyIgnore + if taintPolicy != nil { + nodeTaintsPolicy = *taintPolicy + } + nodeAffinityPolicy := v1.NodeInclusionPolicyHonor + if affinityPolicy != nil { + nodeAffinityPolicy = *affinityPolicy + } + nodeSelector = MakeTopologyNodeFilter(pod, nodeTaintsPolicy, nodeAffinityPolicy) } + return &TopologyGroup{ Type: topologyType, Key: topologyKey, diff --git a/pkg/controllers/provisioning/scheduling/topologynodefilter.go b/pkg/controllers/provisioning/scheduling/topologynodefilter.go index d73b3b7936..dec545d084 100644 --- a/pkg/controllers/provisioning/scheduling/topologynodefilter.go +++ b/pkg/controllers/provisioning/scheduling/topologynodefilter.go @@ -28,23 +28,36 @@ import ( // included for topology counting purposes. This is only used with topology spread constraints as affinities/anti-affinities // always count across all nodes. A nil or zero-value TopologyNodeFilter behaves well and the filter returns true for // all nodes. 
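
An aside on the defaulting added to NewTopologyGroup above: when a constraint leaves either pointer nil, Karpenter falls back to the upstream defaults, NodeTaintsPolicy=Ignore and NodeAffinityPolicy=Honor. Below is a minimal sketch (not part of the patch) of a constraint that sets both policies explicitly; the package name, label selector, and topology key values are purely illustrative.

```go
package example

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func exampleConstraint() corev1.TopologySpreadConstraint {
	honor := corev1.NodeInclusionPolicyHonor
	ignore := corev1.NodeInclusionPolicyIgnore
	return corev1.TopologySpreadConstraint{
		TopologyKey:        corev1.LabelTopologyZone,
		MaxSkew:            1,
		WhenUnsatisfiable:  corev1.DoNotSchedule,
		LabelSelector:      &metav1.LabelSelector{MatchLabels: map[string]string{"app": "web"}},
		NodeTaintsPolicy:   &honor,  // only count domains whose taints the pod tolerates
		NodeAffinityPolicy: &ignore, // count domains even when the pod's required node affinity excludes them
	}
}
```
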
-type TopologyNodeFilter []scheduling.Requirements +type TopologyNodeFilter struct { + Requirements []scheduling.Requirements + TaintPolicy v1.NodeInclusionPolicy + AffinityPolicy v1.NodeInclusionPolicy + Tolerations []v1.Toleration +} -func MakeTopologyNodeFilter(p *v1.Pod) TopologyNodeFilter { +func MakeTopologyNodeFilter(p *v1.Pod, taintPolicy v1.NodeInclusionPolicy, affinityPolicy v1.NodeInclusionPolicy) TopologyNodeFilter { nodeSelectorRequirements := scheduling.NewLabelRequirements(p.Spec.NodeSelector) // if we only have a label selector, that's the only requirement that must match if p.Spec.Affinity == nil || p.Spec.Affinity.NodeAffinity == nil || p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil { - return TopologyNodeFilter{nodeSelectorRequirements} + return TopologyNodeFilter{ + Requirements: []scheduling.Requirements{nodeSelectorRequirements}, + TaintPolicy: taintPolicy, + AffinityPolicy: affinityPolicy, + Tolerations: p.Spec.Tolerations, + } } // otherwise, we need to match the combination of label selector and any term of the required node affinities since // those terms are OR'd together - var filter TopologyNodeFilter + filter := TopologyNodeFilter{ + TaintPolicy: taintPolicy, + AffinityPolicy: affinityPolicy, + } for _, term := range p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms { requirements := scheduling.NewRequirements() requirements.Add(nodeSelectorRequirements.Values()...) requirements.Add(scheduling.NewNodeSelectorRequirements(term.MatchExpressions...).Values()...) - filter = append(filter, requirements) + filter.Requirements = append(filter.Requirements, requirements) } return filter @@ -52,7 +65,17 @@ func MakeTopologyNodeFilter(p *v1.Pod) TopologyNodeFilter { // Matches returns true if the TopologyNodeFilter doesn't prohibit node from the participating in the topology func (t TopologyNodeFilter) Matches(node *v1.Node) bool { - return t.MatchesRequirements(scheduling.NewLabelRequirements(node.Labels)) + matchesAffinity := true + if t.AffinityPolicy == v1.NodeInclusionPolicyHonor { + matchesAffinity = t.MatchesRequirements(scheduling.NewLabelRequirements(node.Labels)) + } + matchesTaints := true + if t.TaintPolicy == v1.NodeInclusionPolicyHonor { + if err := scheduling.Taints(node.Spec.Taints).Tolerates(t.Tolerations); err != nil { + matchesTaints = false + } + } + return matchesAffinity && matchesTaints } // MatchesRequirements returns true if the TopologyNodeFilter doesn't prohibit a node with the requirements from @@ -60,11 +83,11 @@ func (t TopologyNodeFilter) Matches(node *v1.Node) bool { // node we will soon create participates in this topology. 
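
A short sketch of how the rewritten filter behaves. This is not from the patch: it assumes it lives inside the scheduling package (MakeTopologyNodeFilter is unexported, and v1 is that package's alias for k8s.io/api/core/v1), and the taint, labels, and function name are invented for illustration.

```go
func exampleFilterBehavior() {
	pod := &v1.Pod{Spec: v1.PodSpec{NodeSelector: map[string]string{"team": "a"}}} // pod has no tolerations
	node := &v1.Node{Spec: v1.NodeSpec{Taints: []v1.Taint{{Key: "dedicated", Value: "gpu", Effect: v1.TaintEffectNoSchedule}}}}
	node.Labels = map[string]string{"team": "a"}

	// arguments are (pod, taintPolicy, affinityPolicy)
	honorTaints := MakeTopologyNodeFilter(pod, v1.NodeInclusionPolicyHonor, v1.NodeInclusionPolicyHonor)
	ignoreTaints := MakeTopologyNodeFilter(pod, v1.NodeInclusionPolicyIgnore, v1.NodeInclusionPolicyHonor)

	_ = honorTaints.Matches(node)  // false: the pod does not tolerate dedicated=gpu:NoSchedule
	_ = ignoreTaints.Matches(node) // true: taints are skipped and the node's labels satisfy the pod's node selector
}
```
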
func (t TopologyNodeFilter) MatchesRequirements(requirements scheduling.Requirements, compatabilityOptions ...option.Function[scheduling.CompatibilityOptions]) bool { // no requirements, so it always matches - if len(t) == 0 { + if len(t.Requirements) == 0 || t.AffinityPolicy == v1.NodeInclusionPolicyIgnore { return true } // these are an OR, so if any passes the filter passes - for _, req := range t { + for _, req := range t.Requirements { if err := requirements.Compatible(req, compatabilityOptions...); err == nil { return true } diff --git a/pkg/scheduling/taints.go b/pkg/scheduling/taints.go index 102d9ce627..f6f8ae5026 100644 --- a/pkg/scheduling/taints.go +++ b/pkg/scheduling/taints.go @@ -40,12 +40,17 @@ var KnownEphemeralTaints = []corev1.Taint{ // Taints is a decorated alias type for []corev1.Taint type Taints []corev1.Taint -// Tolerates returns true if the pod tolerates all taints. -func (ts Taints) Tolerates(pod *corev1.Pod) (errs error) { +// ToleratesPod returns true if the pod tolerates all taints. +func (ts Taints) ToleratesPod(pod *corev1.Pod) error { + return ts.Tolerates(pod.Spec.Tolerations) +} + +// Tolerates returns true if the toleration slice tolerate all taints. +func (ts Taints) Tolerates(tolerations []corev1.Toleration) (errs error) { for i := range ts { taint := ts[i] tolerates := false - for _, t := range pod.Spec.Tolerations { + for _, t := range tolerations { tolerates = tolerates || t.ToleratesTaint(&taint) } if !tolerates { diff --git a/pkg/utils/pod/scheduling.go b/pkg/utils/pod/scheduling.go index 139df8e634..1fa622aae3 100644 --- a/pkg/utils/pod/scheduling.go +++ b/pkg/utils/pod/scheduling.go @@ -180,9 +180,14 @@ func HasDoNotDisrupt(pod *corev1.Pod) bool { return pod.Annotations[v1.DoNotDisruptAnnotationKey] == "true" } -// ToleratesDisruptedNoScheduleTaint returns true if the pod tolerates karpenter.sh/disrupted:NoSchedule taint +// ToleratesUnschedulableTaint returns true if the pod tolerates node.kubernetes.io/unschedulable taint +func ToleratesUnschedulableTaint(pod *corev1.Pod) bool { + return (scheduling.Taints{{Key: corev1.TaintNodeUnschedulable, Effect: corev1.TaintEffectNoSchedule}}).ToleratesPod(pod) == nil +} + +// ToleratesDisruptionNoScheduleTaint returns true if the pod tolerates karpenter.sh/disruption:NoSchedule=Disrupting taint func ToleratesDisruptedNoScheduleTaint(pod *corev1.Pod) bool { - return scheduling.Taints([]corev1.Taint{v1.DisruptedNoScheduleTaint}).Tolerates(pod) == nil + return scheduling.Taints([]corev1.Taint{v1.DisruptedNoScheduleTaint}).ToleratesPod(pod) == nil } // HasRequiredPodAntiAffinity returns true if a non-empty PodAntiAffinity/RequiredDuringSchedulingIgnoredDuringExecution diff --git a/test/pkg/environment/common/expectations.go b/test/pkg/environment/common/expectations.go index bab2c42725..b72b541b50 100644 --- a/test/pkg/environment/common/expectations.go +++ b/test/pkg/environment/common/expectations.go @@ -861,7 +861,7 @@ func (env *Environment) GetDaemonSetCount(np *v1.NodePool) int { return lo.CountBy(daemonSetList.Items, func(d appsv1.DaemonSet) bool { p := &corev1.Pod{Spec: d.Spec.Template.Spec} nodeClaimTemplate := pscheduling.NewNodeClaimTemplate(np) - if err := scheduling.Taints(nodeClaimTemplate.Spec.Taints).Tolerates(p); err != nil { + if err := scheduling.Taints(nodeClaimTemplate.Spec.Taints).ToleratesPod(p); err != nil { return false } if err := nodeClaimTemplate.Requirements.Compatible(scheduling.NewPodRequirements(p), scheduling.AllowUndefinedWellKnownLabels); err != nil { @@ -882,7 +882,7 @@ 
func (env *Environment) GetDaemonSetOverhead(np *v1.NodePool) corev1.ResourceLis return resources.RequestsForPods(lo.FilterMap(daemonSetList.Items, func(ds appsv1.DaemonSet, _ int) (*corev1.Pod, bool) { p := &corev1.Pod{Spec: ds.Spec.Template.Spec} nodeClaimTemplate := pscheduling.NewNodeClaimTemplate(np) - if err := scheduling.Taints(nodeClaimTemplate.Spec.Taints).Tolerates(p); err != nil { + if err := scheduling.Taints(nodeClaimTemplate.Spec.Taints).ToleratesPod(p); err != nil { return nil, false } if err := nodeClaimTemplate.Requirements.Compatible(scheduling.NewPodRequirements(p), scheduling.AllowUndefinedWellKnownLabels); err != nil { From 881dffb3ec3dcc7e98f1da040e43a8230d9a8943 Mon Sep 17 00:00:00 2001 From: Jason Deal Date: Thu, 7 Dec 2023 15:26:56 -0800 Subject: [PATCH 2/4] correctly handle domains from NodePools when honoring taints --- pkg/controllers/provisioning/provisioner.go | 35 ++-- .../scheduling/scheduling_benchmark_test.go | 3 +- .../provisioning/scheduling/topology.go | 16 +- .../provisioning/scheduling/topology_test.go | 188 +++++++++++++++--- .../scheduling/topologydomaingroup.go | 67 +++++++ .../provisioning/scheduling/topologygroup.go | 26 ++- 6 files changed, 270 insertions(+), 65 deletions(-) create mode 100644 pkg/controllers/provisioning/scheduling/topologydomaingroup.go diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 59e2fe0b97..afb8d5c2e9 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -33,7 +33,6 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" @@ -235,8 +234,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat nodePoolList.OrderByWeight() instanceTypes := map[string][]*cloudprovider.InstanceType{} - domains := map[string]sets.Set[string]{} - var notReadyNodePools []string + domainGroups := map[string]scheduler.TopologyDomainGroup{} for _, nodePool := range nodePoolList.Items { // Get instance type options instanceTypeOptions, err := p.cloudProvider.GetInstanceTypes(ctx, lo.ToPtr(nodePool)) @@ -252,6 +250,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat continue } instanceTypes[nodePool.Name] = append(instanceTypes[nodePool.Name], instanceTypeOptions...) + nodePoolTaints := nodePool.Spec.Template.Spec.Taints // Construct Topology Domains for _, instanceType := range instanceTypeOptions { @@ -261,15 +260,12 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat requirements.Add(scheduling.NewLabelRequirements(nodePool.Spec.Template.Labels).Values()...) requirements.Add(instanceType.Requirements.Values()...) - for key, requirement := range requirements { - // This code used to execute a Union between domains[key] and requirement.Values(). - // The downside of this is that Union is immutable and takes a copy of the set it is executed upon. - // This resulted in a lot of memory pressure on the heap and poor performance - // https://github.com/aws/karpenter/issues/3565 - if domains[key] == nil { - domains[key] = sets.New(requirement.Values()...) - } else { - domains[key].Insert(requirement.Values()...) 
+ for topologyKey, requirement := range requirements { + if _, ok := domainGroups[topologyKey]; !ok { + domainGroups[topologyKey] = scheduler.NewTopologyDomainGroup() + } + for _, domain := range requirement.Values() { + domainGroups[topologyKey].Insert(domain, nodePoolTaints...) } } } @@ -278,23 +274,20 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat requirements.Add(scheduling.NewLabelRequirements(nodePool.Spec.Template.Labels).Values()...) for key, requirement := range requirements { if requirement.Operator() == corev1.NodeSelectorOpIn { - // The following is a performance optimisation, for the explanation see the comment above - if domains[key] == nil { - domains[key] = sets.New(requirement.Values()...) - } else { - domains[key].Insert(requirement.Values()...) + if _, ok := domainGroups[key]; !ok { + domainGroups[key] = scheduler.NewTopologyDomainGroup() + } + for _, value := range requirement.Values() { + domainGroups[key].Insert(value, nodePoolTaints...) } } } } - if len(notReadyNodePools) > 0 { - log.FromContext(ctx).WithValues("nodePools", nodePoolList).Info("skipped nodePools, not ready") - } // inject topology constraints pods = p.injectVolumeTopologyRequirements(ctx, pods) // Calculate cluster topology - topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, domains, pods) + topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, domainGroups, pods) if err != nil { return nil, fmt.Errorf("tracking topology counts, %w", err) } diff --git a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go index 1e4ab2540b..db9c6db58e 100644 --- a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go +++ b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go @@ -33,7 +33,6 @@ import ( "github.com/samber/lo" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "k8s.io/utils/clock" fakecr "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -167,7 +166,7 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) { client := fakecr.NewFakeClient() pods := makeDiversePods(podCount) cluster = state.NewCluster(&clock.RealClock{}, client) - domains := map[string]sets.Set[string]{} + domains := map[string]scheduling.TopologyDomainGroup{} topology, err := scheduling.NewTopology(ctx, client, cluster, domains, pods) if err != nil { b.Fatalf("creating topology, %s", err) diff --git a/pkg/controllers/provisioning/scheduling/topology.go b/pkg/controllers/provisioning/scheduling/topology.go index 454ef13663..dbcfa910d0 100644 --- a/pkg/controllers/provisioning/scheduling/topology.go +++ b/pkg/controllers/provisioning/scheduling/topology.go @@ -52,18 +52,18 @@ type Topology struct { // in some cases. inverseTopologies map[uint64]*TopologyGroup // The universe of domains by topology key - domains map[string]sets.Set[string] + domainGroups map[string]TopologyDomainGroup // excludedPods are the pod UIDs of pods that are excluded from counting. This is used so we can simulate // moving pods to prevent them from being double counted. 
excludedPods sets.Set[string] cluster *state.Cluster } -func NewTopology(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, domains map[string]sets.Set[string], pods []*corev1.Pod) (*Topology, error) { +func NewTopology(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, domainGroups map[string]TopologyDomainGroup, pods []*corev1.Pod) (*Topology, error) { t := &Topology{ kubeClient: kubeClient, cluster: cluster, - domains: domains, + domainGroups: domainGroups, topologies: map[uint64]*TopologyGroup{}, inverseTopologies: map[uint64]*TopologyGroup{}, excludedPods: sets.New[string](), @@ -233,7 +233,7 @@ func (t *Topology) updateInverseAntiAffinity(ctx context.Context, pod *corev1.Po return err } - tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domains[term.TopologyKey]) + tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey]) hash := tg.Hash() if existing, ok := t.inverseTopologies[hash]; !ok { @@ -269,6 +269,10 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error { // capture new domain values from existing nodes that may not have any pods selected by the topology group // scheduled to them already t.cluster.ForEachNode(func(n *state.StateNode) bool { + // ignore state nodes which are tracking in-flight NodeClaims + if n.Node == nil { + return true + } // ignore the node if it doesn't match the topology group if !tg.nodeFilter.Matches(n.Node) { return true @@ -330,7 +334,7 @@ func (t *Topology) newForTopologies(p *corev1.Pod) []*TopologyGroup { var topologyGroups []*TopologyGroup for _, cs := range p.Spec.TopologySpreadConstraints { topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, sets.New(p.Namespace), - cs.LabelSelector, cs.MaxSkew, cs.MinDomains, cs.NodeTaintsPolicy, cs.NodeAffinityPolicy, t.domains[cs.TopologyKey])) + cs.LabelSelector, cs.MaxSkew, cs.MinDomains, cs.NodeTaintsPolicy, cs.NodeAffinityPolicy, t.domainGroups[cs.TopologyKey])) } return topologyGroups } @@ -367,7 +371,7 @@ func (t *Topology) newForAffinities(ctx context.Context, p *corev1.Pod) ([]*Topo if err != nil { return nil, err } - topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domains[term.TopologyKey])) + topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey])) } } return topologyGroups, nil diff --git a/pkg/controllers/provisioning/scheduling/topology_test.go b/pkg/controllers/provisioning/scheduling/topology_test.go index 7c4580ef6f..13aa14d8a9 100644 --- a/pkg/controllers/provisioning/scheduling/topology_test.go +++ b/pkg/controllers/provisioning/scheduling/topology_test.go @@ -17,6 +17,7 @@ limitations under the License. package scheduling_test import ( + "fmt" "time" . 
"github.com/onsi/gomega" @@ -1132,7 +1133,7 @@ var _ = Describe("Topology", func() { Skip("NodeTaintsPolicy ony enabled by default for K8s >= 1.26.x") } - const spreadLabel = "karpenter.sh/fake-label" + const spreadLabel = "fake-label" nodePool.Spec.Template.Labels = map[string]string{ spreadLabel: "baz", } @@ -1206,7 +1207,7 @@ var _ = Describe("Topology", func() { if env.Version.Minor() < 26 { Skip("NodeTaintsPolicy ony enabled by default for K8s >= 1.26.x") } - const spreadLabel = "karpenter.sh/fake-label" + const spreadLabel = "fake-label" nodePool.Spec.Template.Labels = map[string]string{ spreadLabel: "baz", } @@ -1275,23 +1276,91 @@ var _ = Describe("Topology", func() { // and should schedule all of the pods on the same node ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(5)) }) - FIt("should balance pods across a label when discovered from the provisioner (NodeTaintsPolicy=honor)", func() { + It("should balance pods across a label when discovered from the provisioner (NodeTaintsPolicy=ignore)", func() { + const spreadLabel = "fake-label" + const taintKey = "taint-key" + nodePool.Spec.Template.Spec.Requirements = append(nodePool.Spec.Template.Spec.Requirements, v1.NodeSelectorRequirementWithMinValues{ + NodeSelectorRequirement: corev1.NodeSelectorRequirement{ + Key: spreadLabel, + Operator: corev1.NodeSelectorOpIn, + Values: []string{"foo"}, + }, + }) + taintedNodePool := test.NodePool(v1.NodePool{ + Spec: v1.NodePoolSpec{ + Template: v1.NodeClaimTemplate{ + Spec: v1.NodeClaimTemplateSpec{ + Taints: []corev1.Taint{ + { + Key: taintKey, + Value: "taint-value", + Effect: corev1.TaintEffectNoSchedule, + }, + }, + Requirements: []v1.NodeSelectorRequirementWithMinValues{ + { + NodeSelectorRequirement: corev1.NodeSelectorRequirement{ + Key: v1.CapacityTypeLabelKey, + Operator: corev1.NodeSelectorOpExists, + }, + }, + { + NodeSelectorRequirement: corev1.NodeSelectorRequirement{ + Key: spreadLabel, + Operator: corev1.NodeSelectorOpIn, + Values: []string{"bar"}, + }, + }, + }, + }, + }, + }, + }) + + honor := corev1.NodeInclusionPolicyIgnore + topology := []corev1.TopologySpreadConstraint{{ + TopologyKey: spreadLabel, + WhenUnsatisfiable: corev1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, + MaxSkew: 1, + NodeTaintsPolicy: &honor, + }} + + pods := test.UnschedulablePods(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + TopologySpreadConstraints: topology, + }, 2) + + ExpectApplied(ctx, env.Client, nodePool, taintedNodePool) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...) 
+ + // should fail to schedule both pods, one pod is scheduled to domain "foo" but the other can't be scheduled to domain "bar" + ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1)) + }) + It("should balance pods across a label when discovered from the provisioner (NodeTaintsPolicy=honor)", func() { if env.Version.Minor() < 26 { Skip("NodeTaintsPolicy ony enabled by default for K8s >= 1.26.x") } - const spreadLabel = "karpenter.sh/fake-label" - nodePool.Spec.Template.Labels = map[string]string{ - spreadLabel: "baz", - } - nodePoolTainted := test.NodePool(v1.NodePool{ + const spreadLabel = "fake-label" + const taintKey = "taint-key" + nodePool.Spec.Template.Spec.Requirements = append(nodePool.Spec.Template.Spec.Requirements, v1.NodeSelectorRequirementWithMinValues{ + NodeSelectorRequirement: corev1.NodeSelectorRequirement{ + Key: spreadLabel, + Operator: corev1.NodeSelectorOpIn, + Values: []string{"foo"}, + }, + }) + taintedNodePool := test.NodePool(v1.NodePool{ Spec: v1.NodePoolSpec{ Template: v1.NodeClaimTemplate{ Spec: v1.NodeClaimTemplateSpec{ Taints: []corev1.Taint{ { - Key: "taintname", - Value: "taintvalue", + Key: taintKey, + Value: "taint-value", Effect: corev1.TaintEffectNoSchedule, }, }, @@ -1306,7 +1375,7 @@ var _ = Describe("Topology", func() { NodeSelectorRequirement: corev1.NodeSelectorRequirement{ Key: spreadLabel, Operator: corev1.NodeSelectorOpIn, - Values: []string{"foo", "bar"}, + Values: []string{"bar"}, }, }, }, @@ -1315,9 +1384,6 @@ var _ = Describe("Topology", func() { }, }) - ExpectApplied(ctx, env.Client, nodePool, nodePoolTainted) - - ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov) honor := corev1.NodeInclusionPolicyHonor topology := []corev1.TopologySpreadConstraint{{ TopologyKey: spreadLabel, @@ -1327,20 +1393,84 @@ var _ = Describe("Topology", func() { NodeTaintsPolicy: &honor, }} - ExpectApplied(ctx, env.Client, nodePool) - ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, - test.UnschedulablePods(test.PodOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: labels}, - ResourceRequirements: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), + pods := test.UnschedulablePods(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + TopologySpreadConstraints: topology, + }, 2) + + ExpectApplied(ctx, env.Client, nodePool, taintedNodePool) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...) 
+ + // should schedule all pods to domain "foo", ignoring bar since pods don't tolerate + ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(2)) + }) + It("should balance pods across a label when mutually exclusive NodePools (by taints) share domains (NodeTaintsPolicy=honor)", func() { + const spreadLabel = "fake-label" + const taintKey = "taint-key" + + nodePools := lo.Map([][]string{{"foo", "bar"}, {"foo", "baz"}}, func(domains []string, i int) *v1.NodePool { + return test.NodePool(v1.NodePool{ + Spec: v1.NodePoolSpec{ + Template: v1.NodeClaimTemplate{ + Spec: v1.NodeClaimTemplateSpec{ + Taints: []corev1.Taint{ + { + Key: taintKey, + Value: fmt.Sprintf("nodepool-%d", i), + Effect: corev1.TaintEffectNoSchedule, + }, + }, + Requirements: []v1.NodeSelectorRequirementWithMinValues{ + { + NodeSelectorRequirement: corev1.NodeSelectorRequirement{ + Key: v1.CapacityTypeLabelKey, + Operator: corev1.NodeSelectorOpExists, + }, + }, + { + NodeSelectorRequirement: corev1.NodeSelectorRequirement{ + Key: spreadLabel, + Operator: corev1.NodeSelectorOpIn, + Values: domains, + }, + }, + }, + }, }, }, + }) + }) + + honor := corev1.NodeInclusionPolicyHonor + topology := []corev1.TopologySpreadConstraint{{ + TopologyKey: spreadLabel, + WhenUnsatisfiable: corev1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, + MaxSkew: 1, + NodeTaintsPolicy: &honor, + }} + + pods := lo.Flatten(lo.Map(nodePools, func(np *v1.NodePool, _ int) []*corev1.Pod { + return test.UnschedulablePods(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, TopologySpreadConstraints: topology, - }, 5)..., - ) - // and should schedule all of the pods on the same node - ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(5)) + Tolerations: []corev1.Toleration{{ + Key: taintKey, + Effect: corev1.TaintEffectNoSchedule, + Value: np.Spec.Template.Spec.Taints[0].Value, + }}, + }, 2) + })) + + ExpectApplied(ctx, env.Client, nodePools[0], nodePools[1]) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...) + + // Expect 3 total nodes provisioned, 2 pods schedule to foo, 1 to bar, and 1 to baz + ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1, 2, 1)) }) }) @@ -1349,8 +1479,8 @@ var _ = Describe("Topology", func() { if env.Version.Minor() < 26 { Skip("NodeAffinityPolicy ony enabled by default for K8s >= 1.26.x") } - const spreadLabel = "karpenter.sh/fake-label" - const affinityLabel = "karpenter.sh/selector" + const spreadLabel = "fake-label" + const affinityLabel = "selector" const affinityMismatch = "mismatch" const affinityMatch = "value" @@ -1420,8 +1550,8 @@ var _ = Describe("Topology", func() { if env.Version.Minor() < 26 { Skip("NodeAffinityPolicy ony enabled by default for K8s >= 1.26.x") } - const spreadLabel = "karpenter.sh/fake-label" - const affinityLabel = "karpenter.sh/selector" + const spreadLabel = "fake-label" + const affinityLabel = "selector" const affinityMismatch = "mismatch" const affinityMatch = "value" diff --git a/pkg/controllers/provisioning/scheduling/topologydomaingroup.go b/pkg/controllers/provisioning/scheduling/topologydomaingroup.go new file mode 100644 index 0000000000..8fa783a99b --- /dev/null +++ b/pkg/controllers/provisioning/scheduling/topologydomaingroup.go @@ -0,0 +1,67 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduling + +import ( + v1 "k8s.io/api/core/v1" + + "sigs.k8s.io/karpenter/pkg/scheduling" +) + +// TopologyDomainGroup tracks the domains for a single topology. Additionally, it tracks the taints associated with +// each of these domains. This enables us to determine which domains should be considered by a pod if its +// NodeTaintPolicy is honor. +type TopologyDomainGroup map[string][][]v1.Taint + +func NewTopologyDomainGroup() TopologyDomainGroup { + return map[string][][]v1.Taint{} +} + +// Insert either adds a new domain to the TopologyDomainGroup or updates an existing domain. +func (t TopologyDomainGroup) Insert(domain string, taints ...v1.Taint) { + // Note: This could potentially be improved by removing any set of which the new set of taints is a proper subset. + // Currently this is only handled when the incoming set is the empty set. + if _, ok := t[domain]; !ok || len(taints) == 0 { + t[domain] = [][]v1.Taint{taints} + return + } + if len(t[domain][0]) == 0 { + // The domain already contains a set of taints which is the empty set, therefore this domain can be considered + // by all pods, regardless of their tolerations. There is no longer a need to track new sets of taints. + return + } + t[domain] = append(t[domain], taints) +} + +// ForEachDomain calls f on each domain tracked by the topology group +func (t TopologyDomainGroup) ForEachDomain(f func(domain string)) { + for domain := range t { + f(domain) + } +} + +// ForEachToleratedDomain calls f on each domain tracked by the TopologyDomainGroup which are also tolerated by the provided pod. 
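
Before the method body below, a quick illustration of how Insert and the two iterators interact. This is a hypothetical sketch that would sit inside the same package; the domain names and taint are invented.

```go
func exampleDomainGroup() {
	g := NewTopologyDomainGroup()
	g.Insert("zone-a") // discovered from an untainted NodePool: tracked with the empty taint set
	g.Insert("zone-b", v1.Taint{Key: "dedicated", Value: "gpu", Effect: v1.TaintEffectNoSchedule}) // only reachable through a tainted NodePool

	pod := &v1.Pod{} // no tolerations
	g.ForEachDomain(func(domain string) {})               // visits zone-a and zone-b
	g.ForEachToleratedDomain(pod, func(domain string) {}) // visits only zone-a
}
```
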
+func (t TopologyDomainGroup) ForEachToleratedDomain(pod *v1.Pod, f func(domain string)) { + for domain, taintGroups := range t { + for _, taints := range taintGroups { + if err := scheduling.Taints(taints).ToleratesPod(pod); err == nil { + f(domain) + break + } + } + } +} diff --git a/pkg/controllers/provisioning/scheduling/topologygroup.go b/pkg/controllers/provisioning/scheduling/topologygroup.go index c9b43976cc..20d28a6a78 100644 --- a/pkg/controllers/provisioning/scheduling/topologygroup.go +++ b/pkg/controllers/provisioning/scheduling/topologygroup.go @@ -68,11 +68,7 @@ type TopologyGroup struct { emptyDomains sets.Set[string] // domains for which we know that no pod exists } -func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod, namespaces sets.Set[string], labelSelector *metav1.LabelSelector, maxSkew int32, minDomains *int32, taintPolicy *v1.NodeInclusionPolicy, affinityPolicy *v1.NodeInclusionPolicy, domains sets.Set[string]) *TopologyGroup { - domainCounts := map[string]int32{} - for domain := range domains { - domainCounts[domain] = 0 - } +func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod, namespaces sets.Set[string], labelSelector *metav1.LabelSelector, maxSkew int32, minDomains *int32, taintPolicy *v1.NodeInclusionPolicy, affinityPolicy *v1.NodeInclusionPolicy, domainGroup TopologyDomainGroup) *TopologyGroup { // the nil *TopologyNodeFilter always passes which is what we need for affinity/anti-affinity var nodeSelector TopologyNodeFilter if topologyType == TopologyTypeSpread { @@ -87,6 +83,22 @@ func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod nodeSelector = MakeTopologyNodeFilter(pod, nodeTaintsPolicy, nodeAffinityPolicy) } + domains := map[string]int32{} + if nodeSelector.TaintPolicy == v1.NodeInclusionPolicyHonor { + domainGroup.ForEachToleratedDomain(pod, func(domain string) { + domains[domain] = 0 + }) + } else { + domainGroup.ForEachDomain(func(domain string) { + domains[domain] = 0 + }) + } + + emptyDomains := sets.New[string]() + domainGroup.ForEachDomain(func(domain string) { + emptyDomains.Insert(domain) + }) + return &TopologyGroup{ Type: topologyType, Key: topologyKey, @@ -94,8 +106,8 @@ func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod selector: labelSelector, nodeFilter: nodeSelector, maxSkew: maxSkew, - domains: domainCounts, - emptyDomains: domains.Clone(), + domains: domains, + emptyDomains: emptyDomains, owners: map[types.UID]struct{}{}, minDomains: minDomains, } From 7b3827f16e824b23094d55d949dfc9323179da14 Mon Sep 17 00:00:00 2001 From: Jason Deal Date: Thu, 7 Dec 2023 18:12:12 -0800 Subject: [PATCH 3/4] support matchLabelKeys for topology spread --- .../provisioning/scheduling/topology.go | 5 ++ .../provisioning/scheduling/topology_test.go | 76 +++++++++++++++---- 2 files changed, 68 insertions(+), 13 deletions(-) diff --git a/pkg/controllers/provisioning/scheduling/topology.go b/pkg/controllers/provisioning/scheduling/topology.go index dbcfa910d0..d8f7923cd9 100644 --- a/pkg/controllers/provisioning/scheduling/topology.go +++ b/pkg/controllers/provisioning/scheduling/topology.go @@ -333,6 +333,11 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error { func (t *Topology) newForTopologies(p *corev1.Pod) []*TopologyGroup { var topologyGroups []*TopologyGroup for _, cs := range p.Spec.TopologySpreadConstraints { + for _, label := range cs.MatchLabelKeys { + if value, ok := p.ObjectMeta.Labels[label]; ok { + 
cs.LabelSelector.MatchLabels[label] = value + } + } topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, sets.New(p.Namespace), cs.LabelSelector, cs.MaxSkew, cs.MinDomains, cs.NodeTaintsPolicy, cs.NodeAffinityPolicy, t.domainGroups[cs.TopologyKey])) } diff --git a/pkg/controllers/provisioning/scheduling/topology_test.go b/pkg/controllers/provisioning/scheduling/topology_test.go index 13aa14d8a9..2fd859ab87 100644 --- a/pkg/controllers/provisioning/scheduling/topology_test.go +++ b/pkg/controllers/provisioning/scheduling/topology_test.go @@ -1127,11 +1127,69 @@ var _ = Describe("Topology", func() { }) }) + Context("MatchLabelKeys", func() { + BeforeEach(func() { + if env.Version.Minor() < 27 { + Skip("MatchLabelKeys only enabled by default forK8S >= 1.27.x") + } + }) + + It("should support matchLabelKeys", func() { + matchedLabel := "test-label" + topology := []corev1.TopologySpreadConstraint{{ + TopologyKey: corev1.LabelHostname, + WhenUnsatisfiable: corev1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, + MatchLabelKeys: []string{matchedLabel}, + MaxSkew: 1, + }} + count := 2 + pods := test.UnschedulablePods(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: lo.Assign(labels, map[string]string{matchedLabel: "value-a"}), + }, + TopologySpreadConstraints: topology, + }, count) + pods = append(pods, test.UnschedulablePods(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: lo.Assign(labels, map[string]string{matchedLabel: "value-b"}), + }, + TopologySpreadConstraints: topology, + }, count)...) + ExpectApplied(ctx, env.Client, nodePool) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...) + ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(2, 2)) + }) + + It("should ignore unknown labels specified in matchLabelKeys", func() { + matchedLabel := "test-label" + topology := []corev1.TopologySpreadConstraint{{ + TopologyKey: corev1.LabelHostname, + WhenUnsatisfiable: corev1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, + MatchLabelKeys: []string{matchedLabel}, + MaxSkew: 1, + }} + pods := test.UnschedulablePods(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + TopologySpreadConstraints: topology, + }, 4) + ExpectApplied(ctx, env.Client, nodePool) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...) 
+ ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1, 1, 1, 1)) + }) + }) + Context("NodeTaintsPolicy", func() { - It("should balance pods across a label (NodeTaintsPolicy=ignore)", func() { + BeforeEach(func() { if env.Version.Minor() < 26 { - Skip("NodeTaintsPolicy ony enabled by default for K8s >= 1.26.x") + Skip("NodeTaintsPolicy only enabled by default for K8s >= 1.26.x") } + }) + + It("should balance pods across a label (NodeTaintsPolicy=ignore)", func() { const spreadLabel = "fake-label" nodePool.Spec.Template.Labels = map[string]string{ @@ -1204,9 +1262,6 @@ var _ = Describe("Topology", func() { ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1)) }) It("should balance pods across a label (NodeTaintsPolicy=honor)", func() { - if env.Version.Minor() < 26 { - Skip("NodeTaintsPolicy ony enabled by default for K8s >= 1.26.x") - } const spreadLabel = "fake-label" nodePool.Spec.Template.Labels = map[string]string{ spreadLabel: "baz", @@ -1340,10 +1395,6 @@ var _ = Describe("Topology", func() { ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1)) }) It("should balance pods across a label when discovered from the provisioner (NodeTaintsPolicy=honor)", func() { - if env.Version.Minor() < 26 { - Skip("NodeTaintsPolicy ony enabled by default for K8s >= 1.26.x") - } - const spreadLabel = "fake-label" const taintKey = "taint-key" nodePool.Spec.Template.Spec.Requirements = append(nodePool.Spec.Template.Spec.Requirements, v1.NodeSelectorRequirementWithMinValues{ @@ -1475,10 +1526,12 @@ var _ = Describe("Topology", func() { }) Context("NodeAffinityPolicy", func() { - It("should balance pods across a label (NodeAffinityPolicy=ignore)", func() { + BeforeEach(func() { if env.Version.Minor() < 26 { Skip("NodeAffinityPolicy ony enabled by default for K8s >= 1.26.x") } + }) + It("should balance pods across a label (NodeAffinityPolicy=ignore)", func() { const spreadLabel = "fake-label" const affinityLabel = "selector" const affinityMismatch = "mismatch" @@ -1547,9 +1600,6 @@ var _ = Describe("Topology", func() { ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1)) }) It("should balance pods across a label (NodeAffinityPolicy=honor)", func() { - if env.Version.Minor() < 26 { - Skip("NodeAffinityPolicy ony enabled by default for K8s >= 1.26.x") - } const spreadLabel = "fake-label" const affinityLabel = "selector" const affinityMismatch = "mismatch" From 5958693505bec927ed322db9e1ab3d2957e81612 Mon Sep 17 00:00:00 2001 From: Jason Deal Date: Wed, 28 Aug 2024 12:12:18 -0700 Subject: [PATCH 4/4] inject expression, rather than overwrite label --- pkg/controllers/provisioning/scheduling/topology.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/controllers/provisioning/scheduling/topology.go b/pkg/controllers/provisioning/scheduling/topology.go index d8f7923cd9..78267702bf 100644 --- a/pkg/controllers/provisioning/scheduling/topology.go +++ b/pkg/controllers/provisioning/scheduling/topology.go @@ -333,9 +333,13 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error { func (t *Topology) newForTopologies(p *corev1.Pod) []*TopologyGroup { var topologyGroups []*TopologyGroup for _, cs := range p.Spec.TopologySpreadConstraints { - for _, label := range cs.MatchLabelKeys { - if value, ok := p.ObjectMeta.Labels[label]; ok { - cs.LabelSelector.MatchLabels[label] = value + for _, key := range cs.MatchLabelKeys { + if value, ok := p.ObjectMeta.Labels[key]; ok { + 
cs.LabelSelector.MatchExpressions = append(cs.LabelSelector.MatchExpressions, metav1.LabelSelectorRequirement{ + Key: key, + Operator: metav1.LabelSelectorOpIn, + Values: []string{value}, + }) } } topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, sets.New(p.Namespace),
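
To make the effect of this last change concrete: instead of overwriting a user-supplied matchLabels entry (as the previous commit did), the pod's own value for each matchLabelKeys key is appended to the selector as an In expression. The sketch below uses invented labels and an illustrative package name; it is not code from the patch.

```go
package example

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func matchLabelKeysEffect() corev1.TopologySpreadConstraint {
	// assume the pod carries labels app=web and pod-template-hash=abc123 (illustrative values)
	cs := corev1.TopologySpreadConstraint{
		TopologyKey:       corev1.LabelHostname,
		MaxSkew:           1,
		WhenUnsatisfiable: corev1.DoNotSchedule,
		LabelSelector:     &metav1.LabelSelector{MatchLabels: map[string]string{"app": "web"}},
		MatchLabelKeys:    []string{"pod-template-hash"},
	}
	// After newForTopologies processes the constraint, the selector Karpenter counts against is effectively:
	//   matchLabels:      app=web
	//   matchExpressions: pod-template-hash In [abc123]
	// so only pods from the same ReplicaSet revision contribute to the skew calculation.
	return cs
}
```
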