Skip to content

Latest commit

 

History

History
193 lines (149 loc) · 8.71 KB

File metadata and controls

193 lines (149 loc) · 8.71 KB

第一步:找到所有可以抢占的目标节点

概述

抢占功能的第一步是找到哪些节点可以用来进行抢占操作。整个流程位于 FindCandidates() 中。

其中的核心实现就是在内存中模拟真实的抢占操作,即删除优先级低的 Pod,然后将当前 Pod 调度到节点上。

在模拟抢占之前会做一些准备工作,我们一步步进行分析。

准备工作

func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, error) {
	allNodes, err := pl.fh.SnapshotSharedLister().NodeInfos().List()
    ...
	if len(allNodes) == 0 {
		return nil, core.ErrNoNodesAvailable
	}

	potentialNodes := nodesWherePreemptionMightHelp(allNodes, m)
    ...
}

func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, m framework.NodeToStatusMap) []*framework.NodeInfo {
	var potentialNodes []*framework.NodeInfo
	for _, node := range nodes {
		name := node.Node().Name
		// We reply on the status by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable'
		// to determine whether preemption may help or not on the node.
		if m[name].Code() == framework.UnschedulableAndUnresolvable {
			continue
		}
		potentialNodes = append(potentialNodes, node)
	}
	return potentialNodes
}

准备工作的第一步是对所有的节点进行初步的过滤,通过调用 nodesWherePreemptionMightHelp() 来实现,将那些 UnschedulableAndUnresolvable 类型的节点从候选节点列表中删除。这个类型是由 PreFilter 扩展点为节点加上去的,表示此节点无法调度,即使进行了抢占也无济于事。因此这里会过滤掉这些节点。

准备工作的第二步是使用 getPodDisruptionBudgets() 找到所有的 PodDisruptionBudgets,在后续模拟抢占的时候会用到。

func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, error) {
    ...

	pdbs, err := getPodDisruptionBudgets(pl.pdbLister)
	if err != nil {
		return nil, err
	}
    ...
}

func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister) ([]*policy.PodDisruptionBudget, error) {
	if pdbLister != nil {
		return pdbLister.List(labels.Everything())
	}
	return nil, nil
}

准备工作的最后一步是调用 getOffsetAndNumCandidates() 找到一个偏移值和预期抢占节点的数量。

func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, error) {
    ...

	offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(potentialNodes)))

    ...
}

func (pl *DefaultPreemption) getOffsetAndNumCandidates(numNodes int32) (int32, int32) {
	return rand.Int31n(numNodes), pl.calculateNumCandidates(numNodes)
}

func (pl *DefaultPreemption) calculateNumCandidates(numNodes int32) int32 {
	n := (numNodes * pl.args.MinCandidateNodesPercentage) / 100
	if n < pl.args.MinCandidateNodesAbsolute {
		n = pl.args.MinCandidateNodesAbsolute
	}
	if n > numNodes {
		n = numNodes
	}
	return n
}

可以看出这里的偏移值是小于总节点数的一个随机数。而预期抢占节点的数量的功能相当于是一个优化,即如果有大量的节点,对所有节点进行模拟抢占会造成资源消耗巨大,也没有必要,因此在创建抢占插件的时候,提供了两个参数:

type DefaultPreemptionArgs struct {
    ...
	MinCandidateNodesPercentage int32
	MinCandidateNodesAbsolute int32
}

这两个参数都是用来约束最终期望最少有多少个节点可以实施抢占操作。第一个是一个百分比,默认为10,即10%;第二个是一个数量的绝对值,默认为100个节点。

在上面的 calculateNumCandidates() 中可以看出计算这个数值的逻辑为:

期望可以实施抢占的节点数=min(max(节点总数*MinCandidateNodesPercentage/100, MinCandidateNodesAbsolute),节点总数)

以上是所有模拟抢占前的准备工作。

模拟抢占

现在所有的准备工作都已经完成,接下来会执行模拟抢占,会把准备工作中的返回值作为参数进行执行,代码在 FindCandidates() 中的最后一步 dryRunPreemption()

func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, error) {
    ...

	return dryRunPreemption(ctx, pl.fh.PreemptHandle(), state, pod, potentialNodes, pdbs, offset, numCandidates), nil
}

dryRunPreemption

dryRunPreemption() 中的逻辑是定义了一个函数用于对单个节点进行模拟抢占操作,然后通过并行调用这个函数来遍历每一个节点。

func dryRunPreemption(ctx context.Context, fh framework.PreemptHandle,
	state *framework.CycleState, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
	pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) []Candidate {
	nonViolatingCandidates := newCandidateList(numCandidates)
	violatingCandidates := newCandidateList(numCandidates)
	parallelCtx, cancel := context.WithCancel(ctx)

	checkNode := func(i int) {
		nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Clone()
		stateCopy := state.Clone()
		pods, numPDBViolations, fits := selectVictimsOnNode(ctx, fh, stateCopy, pod, nodeInfoCopy, pdbs)
		if fits {
			victims := extenderv1.Victims{
				Pods:             pods,
				NumPDBViolations: int64(numPDBViolations),
			}
			c := &candidate{
				victims: &victims,
				name:    nodeInfoCopy.Node().Name,
			}
			if numPDBViolations == 0 {
				nonViolatingCandidates.add(c)
			} else {
				violatingCandidates.add(c)
			}
			nvcSize, vcSize := nonViolatingCandidates.size(), violatingCandidates.size()
			if nvcSize > 0 && nvcSize+vcSize >= numCandidates {
				cancel()
			}
		}
	}
	parallelize.Until(parallelCtx, len(potentialNodes), checkNode)
	return append(nonViolatingCandidates.get(), violatingCandidates.get()...)
}

checkNode 这个函数中调用 selectVictimsOnNode() 来对一个节点进行模拟抢占操作。

注意在调用之前,首先把节点信息和调度框架的状态信息都进行了克隆操作,这样就可以对克隆后的对象进行模拟抢占而不会影响到原来的对象。

selectVictimsOnNode() 的返回值有三个:

  • pods。当前节点在执行抢占操作时需要被删除的 Pod 列表。
  • numPDBViolations。进行抢占操作时,会影响到的个干扰 violations of PodDisruptionBudget的数量
  • fits。表示当前节点是否适合抢占。

因此,checkNode() 的逻辑是对当前节点调用 selectVictimsOnNode() 进行模拟抢占,如果模拟抢占成功,则根据 numPDBViolations 是否为 0 来将当前节点分为两类:nonViolatingCandidates 和 violatingCandidates,前者是抢占时不会影响干扰的节点,后者是会影响到干扰的节点。

使用 checkNode() 对所有节点进行遍历调用,当 nonViolatingCandidates 大于 0 同时 nonViolatingCandidates 和 violatingCandidates 的总和大于预期的抢占节点的数量时,则停止遍历。然后将结果返回。

以上是对于模拟抢占的外围逻辑进行了分析,下面详细分析对于某一个节点是如何进行模拟抢占操作的,代码位于 selectVictimsOnNode() 中。

selectVictimsOnNode

selectVictimsOnNode() 的作用是对某一个节点进行模拟抢占。基本逻辑如下:

  1. 删除节点上比当前 Pod 优先级低的其它 Pod,同时将这些 Pod 加入到被抢占 Pod 的列表A中。
  2. 在执行完第1步的前提下,运行所有调度插件的 Filter 扩展点,判断当前 Pod 是否可以调度到节点上。
  3. 对第1步的被抢占 Pod 的列表A按照优先级进行排序。
  4. 将被强占 Pod 列表A分成两类,分类依据是这些 Pod 在被强占的情况下会不会影响 干扰violations of PodDisruptionBudget
  5. 对这两类被强占的 Pod 分别进行遍历,一个一个地将这些被强占的 Pod 重新加到节点上,然后每加一个 Pod,就会立即针对当前被调度的 Pod 执行 Filter 扩展点,看是否还适合被调度到节点上,如果是,则添加下一个被强占的 Pod;如果否,则将从节点上把刚加到节点上的被抢占的 Pod 删除,同时将其加入最终的被强占的 Pod 列表B中,最后返回这个最终的被强占的 Pod 列表B。