feat: support new topologySpread scheduling constraints #852

Open · wants to merge 4 commits into base: main
35 changes: 14 additions & 21 deletions pkg/controllers/provisioning/provisioner.go
@@ -33,7 +33,6 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
@@ -235,8 +234,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
nodePoolList.OrderByWeight()

instanceTypes := map[string][]*cloudprovider.InstanceType{}
domains := map[string]sets.Set[string]{}
var notReadyNodePools []string
domainGroups := map[string]scheduler.TopologyDomainGroup{}
for _, nodePool := range nodePoolList.Items {
// Get instance type options
instanceTypeOptions, err := p.cloudProvider.GetInstanceTypes(ctx, lo.ToPtr(nodePool))
@@ -252,6 +250,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
continue
}
instanceTypes[nodePool.Name] = append(instanceTypes[nodePool.Name], instanceTypeOptions...)
nodePoolTaints := nodePool.Spec.Template.Spec.Taints

// Construct Topology Domains
for _, instanceType := range instanceTypeOptions {
@@ -261,15 +260,12 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
requirements.Add(scheduling.NewLabelRequirements(nodePool.Spec.Template.Labels).Values()...)
requirements.Add(instanceType.Requirements.Values()...)

for key, requirement := range requirements {
// This code used to execute a Union between domains[key] and requirement.Values().
// The downside of this is that Union is immutable and takes a copy of the set it is executed upon.
// This resulted in a lot of memory pressure on the heap and poor performance
// https://github.com/aws/karpenter/issues/3565
if domains[key] == nil {
domains[key] = sets.New(requirement.Values()...)
} else {
domains[key].Insert(requirement.Values()...)
for topologyKey, requirement := range requirements {
if _, ok := domainGroups[topologyKey]; !ok {
domainGroups[topologyKey] = scheduler.NewTopologyDomainGroup()
}
for _, domain := range requirement.Values() {
domainGroups[topologyKey].Insert(domain, nodePoolTaints...)
}
}
}
@@ -278,23 +274,20 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
requirements.Add(scheduling.NewLabelRequirements(nodePool.Spec.Template.Labels).Values()...)
for key, requirement := range requirements {
if requirement.Operator() == corev1.NodeSelectorOpIn {
// The following is a performance optimisation, for the explanation see the comment above
if domains[key] == nil {
domains[key] = sets.New(requirement.Values()...)
} else {
domains[key].Insert(requirement.Values()...)
if _, ok := domainGroups[key]; !ok {
domainGroups[key] = scheduler.NewTopologyDomainGroup()
}
for _, value := range requirement.Values() {
domainGroups[key].Insert(value, nodePoolTaints...)
}
}
}
}
if len(notReadyNodePools) > 0 {
log.FromContext(ctx).WithValues("nodePools", nodePoolList).Info("skipped nodePools, not ready")
}
// inject topology constraints
pods = p.injectVolumeTopologyRequirements(ctx, pods)

// Calculate cluster topology
topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, domains, pods)
topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, domainGroups, pods)
if err != nil {
return nil, fmt.Errorf("tracking topology counts, %w", err)
}
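Note on the provisioner change above: instead of collecting a flat sets.Set[string] of domains per topology key, each NodePool's domains are now inserted together with that NodePool's taints, so the scheduler can later discount domains a pod cannot tolerate. The sketch below shows one plausible shape for TopologyDomainGroup; only NewTopologyDomainGroup and Insert appear in this diff, and the map layout plus the ForEachToleratedDomain helper are illustrative assumptions, not the implementation in the PR.

```go
package scheduling

import (
	corev1 "k8s.io/api/core/v1"
)

// TopologyDomainGroup tracks, for a single topology key, every domain discovered so
// far along with the taint sets of the NodePools able to provide that domain.
type TopologyDomainGroup map[string][][]corev1.Taint

// NewTopologyDomainGroup returns an empty group.
func NewTopologyDomainGroup() TopologyDomainGroup {
	return TopologyDomainGroup{}
}

// Insert records that domain is reachable via a NodePool carrying the given taints.
// A NodePool with no taints registers an empty taint set, marking the domain as
// reachable by any pod.
func (g TopologyDomainGroup) Insert(domain string, taints ...corev1.Taint) {
	g[domain] = append(g[domain], taints)
}

// ForEachToleratedDomain (hypothetical helper) visits every domain the pod can
// actually land in, i.e. domains provided by at least one NodePool whose taints the
// pod tolerates. This is the piece that lets nodeTaintsPolicy: Honor exclude domains
// the pod could never schedule into.
func (g TopologyDomainGroup) ForEachToleratedDomain(pod *corev1.Pod, fn func(domain string)) {
	for domain, taintSets := range g {
		for _, taints := range taintSets {
			if toleratesAll(pod, taints) {
				fn(domain)
				break
			}
		}
	}
}

// toleratesAll reports whether the pod tolerates every taint in the slice.
func toleratesAll(pod *corev1.Pod, taints []corev1.Taint) bool {
	for i := range taints {
		tolerated := false
		for j := range pod.Spec.Tolerations {
			if pod.Spec.Tolerations[j].ToleratesTaint(&taints[i]) {
				tolerated = true
				break
			}
		}
		if !tolerated {
			return false
		}
	}
	return true
}
```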
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/scheduling/existingnode.go
@@ -63,7 +63,7 @@ func NewExistingNode(n *state.StateNode, topology *Topology, daemonResources v1.

func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod) error {
// Check Taints
if err := scheduling.Taints(n.Taints()).Tolerates(pod); err != nil {
if err := scheduling.Taints(n.Taints()).ToleratesPod(pod); err != nil {
return err
}
// determine the volumes that will be mounted if the pod schedules
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/scheduling/nodeclaim.go
@@ -64,7 +64,7 @@ func NewNodeClaim(nodeClaimTemplate *NodeClaimTemplate, topology *Topology, daem

func (n *NodeClaim) Add(pod *v1.Pod) error {
// Check Taints
if err := scheduling.Taints(n.Spec.Taints).Tolerates(pod); err != nil {
if err := scheduling.Taints(n.Spec.Taints).ToleratesPod(pod); err != nil {
return err
}

4 changes: 2 additions & 2 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -303,7 +303,7 @@ func (s *Scheduler) calculateExistingNodeClaims(stateNodes []*state.StateNode, d
// Calculate any daemonsets that should schedule to the inflight node
var daemons []*corev1.Pod
for _, p := range daemonSetPods {
if err := scheduling.Taints(node.Taints()).Tolerates(p); err != nil {
if err := scheduling.Taints(node.Taints()).ToleratesPod(p); err != nil {
continue
}
if err := scheduling.NewLabelRequirements(node.Labels()).Compatible(scheduling.NewPodRequirements(p)); err != nil {
@@ -340,7 +340,7 @@ func getDaemonOverhead(nodeClaimTemplates []*NodeClaimTemplate, daemonSetPods []
for _, nodeClaimTemplate := range nodeClaimTemplates {
var daemons []*corev1.Pod
for _, p := range daemonSetPods {
if err := scheduling.Taints(nodeClaimTemplate.Spec.Taints).Tolerates(p); err != nil {
if err := scheduling.Taints(nodeClaimTemplate.Spec.Taints).ToleratesPod(p); err != nil {
continue
}
if err := nodeClaimTemplate.Requirements.Compatible(scheduling.NewPodRequirements(p), scheduling.AllowUndefinedWellKnownLabels); err != nil {
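The rename from Tolerates to ToleratesPod appears in existingnode.go, nodeclaim.go, and the two scheduler.go call sites above. The call sites suggest the behavior is unchanged: return an error when the pod fails to tolerate any taint in the list. A minimal sketch under that assumption (the error text is made up; the real method body is not part of this diff):

```go
package scheduling

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// Taints is a decorated alias for a slice of node taints.
type Taints []corev1.Taint

// ToleratesPod returns an error if the pod does not tolerate every taint in the
// list, and nil otherwise.
func (ts Taints) ToleratesPod(pod *corev1.Pod) error {
	for i := range ts {
		tolerated := false
		for j := range pod.Spec.Tolerations {
			if pod.Spec.Tolerations[j].ToleratesTaint(&ts[i]) {
				tolerated = true
				break
			}
		}
		if !tolerated {
			return fmt.Errorf("did not tolerate taint %s=%s:%s", ts[i].Key, ts[i].Value, ts[i].Effect)
		}
	}
	return nil
}
```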
@@ -33,7 +33,6 @@ import (
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/utils/clock"
fakecr "sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -167,7 +166,7 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
client := fakecr.NewFakeClient()
pods := makeDiversePods(podCount)
cluster = state.NewCluster(&clock.RealClock{}, client)
domains := map[string]sets.Set[string]{}
domains := map[string]scheduling.TopologyDomainGroup{}
topology, err := scheduling.NewTopology(ctx, client, cluster, domains, pods)
if err != nil {
b.Fatalf("creating topology, %s", err)
44 changes: 38 additions & 6 deletions pkg/controllers/provisioning/scheduling/topology.go
@@ -52,18 +52,18 @@ type Topology struct {
// in some cases.
inverseTopologies map[uint64]*TopologyGroup
// The universe of domains by topology key
domains map[string]sets.Set[string]
domainGroups map[string]TopologyDomainGroup
// excludedPods are the pod UIDs of pods that are excluded from counting. This is used so we can simulate
// moving pods to prevent them from being double counted.
excludedPods sets.Set[string]
cluster *state.Cluster
}

func NewTopology(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, domains map[string]sets.Set[string], pods []*corev1.Pod) (*Topology, error) {
func NewTopology(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, domainGroups map[string]TopologyDomainGroup, pods []*corev1.Pod) (*Topology, error) {
t := &Topology{
kubeClient: kubeClient,
cluster: cluster,
domains: domains,
domainGroups: domainGroups,
topologies: map[uint64]*TopologyGroup{},
inverseTopologies: map[uint64]*TopologyGroup{},
excludedPods: sets.New[string](),
@@ -233,7 +233,7 @@ func (t *Topology) updateInverseAntiAffinity(ctx context.Context, pod *corev1.Po
return err
}

tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey])
tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey])

hash := tg.Hash()
if existing, ok := t.inverseTopologies[hash]; !ok {
@@ -266,6 +266,28 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error {
pods = append(pods, podList.Items...)
}

// capture new domain values from existing nodes that may not have any pods selected by the topology group
// scheduled to them already
t.cluster.ForEachNode(func(n *state.StateNode) bool {
// ignore state nodes which are tracking in-flight NodeClaims
if n.Node == nil {
return true
}
// ignore the node if it doesn't match the topology group
if !tg.nodeFilter.Matches(n.Node) {
return true
}
domain, exists := n.Labels()[tg.Key]
if !exists {
return true
}
// ensure we at least have a count of zero for this potentially new topology domain
if _, countExists := tg.domains[domain]; !countExists {
tg.domains[domain] = 0
}
return true
})

for i, p := range pods {
if IgnoredForTopology(&pods[i]) {
continue
Expand Down Expand Up @@ -311,7 +333,17 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error {
func (t *Topology) newForTopologies(p *corev1.Pod) []*TopologyGroup {
var topologyGroups []*TopologyGroup
for _, cs := range p.Spec.TopologySpreadConstraints {
topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, sets.New(p.Namespace), cs.LabelSelector, cs.MaxSkew, cs.MinDomains, t.domains[cs.TopologyKey]))
for _, key := range cs.MatchLabelKeys {
if value, ok := p.ObjectMeta.Labels[key]; ok {
cs.LabelSelector.MatchExpressions = append(cs.LabelSelector.MatchExpressions, metav1.LabelSelectorRequirement{
Key: key,
Operator: metav1.LabelSelectorOpIn,
Values: []string{value},
})
}
}
topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, sets.New(p.Namespace),
cs.LabelSelector, cs.MaxSkew, cs.MinDomains, cs.NodeTaintsPolicy, cs.NodeAffinityPolicy, t.domainGroups[cs.TopologyKey]))
}
return topologyGroups
}
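newForTopologies above resolves each matchLabelKeys entry against the incoming pod's labels and appends it to the constraint's label selector, so spreading only counts pods that share those label values (for example, pods from the same ReplicaSet revision). A self-contained illustration of that folding; the pod labels and selector values are invented for the example:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A pod created by a Deployment carries a revision-specific pod-template-hash label.
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Labels: map[string]string{"app": "web", "pod-template-hash": "abc123"},
	}}
	cs := corev1.TopologySpreadConstraint{
		MaxSkew:           1,
		TopologyKey:       corev1.LabelTopologyZone,
		WhenUnsatisfiable: corev1.DoNotSchedule,
		LabelSelector:     &metav1.LabelSelector{MatchLabels: map[string]string{"app": "web"}},
		MatchLabelKeys:    []string{"pod-template-hash"},
	}
	// Same folding as in newForTopologies: each matchLabelKeys entry becomes an
	// In requirement pinned to the value found on this pod.
	for _, key := range cs.MatchLabelKeys {
		if value, ok := pod.Labels[key]; ok {
			cs.LabelSelector.MatchExpressions = append(cs.LabelSelector.MatchExpressions, metav1.LabelSelectorRequirement{
				Key:      key,
				Operator: metav1.LabelSelectorOpIn,
				Values:   []string{value},
			})
		}
	}
	// The effective selector now reads: app=web AND pod-template-hash in (abc123).
	fmt.Printf("%+v\n", cs.LabelSelector)
}
```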
@@ -348,7 +380,7 @@ func (t *Topology) newForAffinities(ctx context.Context, p *corev1.Pod) ([]*Topo
if err != nil {
return nil, err
}
topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey]))
topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey]))
}
}
return topologyGroups, nil
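For context on what these changes enable end to end, a pod-level topologySpreadConstraint exercising the fields handled in this PR could look roughly like the following in Go. The values are illustrative only; the Honor/Ignore semantics in the comments follow the upstream Kubernetes API definitions.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	honor := corev1.NodeInclusionPolicyHonor
	ignore := corev1.NodeInclusionPolicyIgnore
	cs := corev1.TopologySpreadConstraint{
		MaxSkew:           1,
		TopologyKey:       corev1.LabelTopologyZone,
		WhenUnsatisfiable: corev1.DoNotSchedule,
		LabelSelector:     &metav1.LabelSelector{MatchLabels: map[string]string{"app": "web"}},
		// Honor: nodes whose taints this pod does not tolerate are excluded from the
		// skew calculation (the per-domain taint tracking above supports this).
		NodeTaintsPolicy: &honor,
		// Ignore: node affinity and node selectors are not consulted when
		// enumerating candidate domains.
		NodeAffinityPolicy: &ignore,
		// Only pods sharing this pod's pod-template-hash value are counted.
		MatchLabelKeys: []string{"pod-template-hash"},
	}
	fmt.Printf("%+v\n", cs)
}
```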