58 changes: 3 additions & 55 deletions scheduler/scheduler_system.go
@@ -450,59 +450,6 @@ func (s *SystemScheduler) computeJobAllocs() error {
    return nil
}

// findNodesForTG runs feasibility checks on nodes. The result includes all nodes for each
// task group (feasible and infeasible) along with metrics information on the checks.
func (s *SystemScheduler) findNodesForTG(buckets *reconciler.NodeReconcileResult) (tgNodes map[string]taskGroupNodes, filteredMetrics map[string]*structs.AllocMetric) {
    tgNodes = make(map[string]taskGroupNodes)
    filteredMetrics = make(map[string]*structs.AllocMetric)

    nodeByID := make(map[string]*structs.Node, len(s.nodes))
    for _, node := range s.nodes {
        nodeByID[node.ID] = node
    }

    nodes := make([]*structs.Node, 1)
    for _, a := range slices.Concat(buckets.Place, buckets.Update, buckets.Ignore) {
        tgName := a.TaskGroup.Name
        if tgNodes[tgName] == nil {
            tgNodes[tgName] = taskGroupNodes{}
        }

        node, ok := nodeByID[a.Alloc.NodeID]
        if !ok {
            s.logger.Debug("could not find node", "node", a.Alloc.NodeID)
            continue
        }

        // Update the set of placement nodes
        nodes[0] = node
        s.stack.SetNodes(nodes)

        if a.Alloc.ID != "" {
            // temporarily include the old alloc from a destructive update so
            // that we can account for resources that will be freed by that
            // allocation. We'll back this change out if we end up needing to
            // limit placements by max_parallel or canaries.
            s.plan.AppendStoppedAlloc(a.Alloc, sstructs.StatusAllocUpdating, "", "")
        }

        // Attempt to match the task group
        option := s.stack.Select(a.TaskGroup, &feasible.SelectOptions{AllocName: a.Name})

        // Always store the results. Keep the metrics that were generated
        // for the match attempt so they can be used during placement.
        tgNodes[tgName] = append(tgNodes[tgName], &taskGroupNode{node.ID, option, s.ctx.Metrics().Copy()})

        if option == nil {
            // When no match is found, merge the filter metrics for the task
            // group so proper reporting can be done during placement.
            filteredMetrics[tgName] = mergeNodeFiltered(filteredMetrics[tgName], s.ctx.Metrics())
        }
    }

    return
}

// computePlacements computes placements for allocations
func (s *SystemScheduler) computePlacements(
    reconcilerResult *reconciler.NodeReconcileResult, existingByTaskGroup map[string]bool,
@@ -747,8 +694,9 @@ func (s *SystemScheduler) evictAndPlace(reconciled *reconciler.NodeReconcileResu
    // The findNodesForTG method marks all old allocs from a destructive
    // update as stopped in order to get an accurate feasible nodes count
    // (accounting for resources that would be freed). Now we need to remove all
    // the allocs that have StatusAllocUpdating from the set we're stopping (NodeUpdate) and
    // only place and stop the ones that correspond to updates limited by max parallel.
    // the allocs that have StatusAllocUpdating from the set we're stopping
    // (NodeUpdate) and only place and stop the ones that correspond to
    // updates limited by max parallel.
    for node, allocations := range s.plan.NodeUpdate {
        n := 0
        for _, alloc := range allocations {
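The hunk above is truncated, so only the start of the NodeUpdate filter is visible. As a rough sketch of the back-out step the comment describes — the helper name, the chosenForUpdate parameter, and the DesiredDescription check are illustrative assumptions, not the PR's actual code:

// evictAndPlaceBackOut is a hypothetical helper sketching the described step:
// drop allocs that were only marked StatusAllocUpdating for feasibility
// accounting, keeping the ones the max_parallel limiter actually selected.
func (s *SystemScheduler) evictAndPlaceBackOut(chosenForUpdate map[string]bool) {
    for node, allocations := range s.plan.NodeUpdate {
        n := 0
        for _, alloc := range allocations {
            if alloc.DesiredDescription == sstructs.StatusAllocUpdating && !chosenForUpdate[alloc.ID] {
                continue // back out the temporary stop; this alloc keeps running
            }
            allocations[n] = alloc
            n++
        }
        s.plan.NodeUpdate[node] = allocations[:n]
    }
}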
68 changes: 68 additions & 0 deletions scheduler/scheduler_system_ce.go
@@ -0,0 +1,68 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

//go:build !ent
// +build !ent

package scheduler

import (
"slices"

"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler/feasible"
"github.com/hashicorp/nomad/scheduler/reconciler"
sstructs "github.com/hashicorp/nomad/scheduler/structs"
)

// findNodesForTG runs feasibility checks on nodes. The result includes all nodes for each
// task group (feasible and infeasible) along with metrics information on the checks.
func (s *SystemScheduler) findNodesForTG(buckets *reconciler.NodeReconcileResult) (tgNodes map[string]taskGroupNodes, filteredMetrics map[string]*structs.AllocMetric) {
    tgNodes = make(map[string]taskGroupNodes)
    filteredMetrics = make(map[string]*structs.AllocMetric)

    nodeByID := make(map[string]*structs.Node, len(s.nodes))
    for _, node := range s.nodes {
        nodeByID[node.ID] = node
    }

    nodes := make([]*structs.Node, 1)
    for _, a := range slices.Concat(buckets.Place, buckets.Update, buckets.Ignore) {
        tgName := a.TaskGroup.Name
        if tgNodes[tgName] == nil {
            tgNodes[tgName] = taskGroupNodes{}
        }

        node, ok := nodeByID[a.Alloc.NodeID]
        if !ok {
            s.logger.Debug("could not find node", "node", a.Alloc.NodeID)
            continue
        }

        // Update the set of placement nodes
        nodes[0] = node
        s.stack.SetNodes(nodes)

        if a.Alloc.ID != "" {
            // temporarily include the old alloc from a destructive update so
            // that we can account for resources that will be freed by that
            // allocation. We'll back this change out if we end up needing to
            // limit placements by max_parallel or canaries.
            s.plan.AppendStoppedAlloc(a.Alloc, sstructs.StatusAllocUpdating, "", "")
        }

        // Attempt to match the task group
        option := s.stack.Select(a.TaskGroup, &feasible.SelectOptions{AllocName: a.Name})

        // Always store the results. Keep the metrics that were generated
        // for the match attempt so they can be used during placement.
        tgNodes[tgName] = append(tgNodes[tgName], &taskGroupNode{node.ID, option, s.ctx.Metrics().Copy()})

        if option == nil {
            // When no match is found, merge the filter metrics for the task
            // group so proper reporting can be done during placement.
            filteredMetrics[tgName] = mergeNodeFiltered(filteredMetrics[tgName], s.ctx.Metrics())
        }
    }
    return
}
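
For orientation, a plausible shape for the taskGroupNode and taskGroupNodes types used above, inferred only from the constructor &taskGroupNode{node.ID, option, s.ctx.Metrics().Copy()}; the real declarations live outside this diff, and the option field assumes stack.Select returns a *feasible.RankedNode:

// Inferred sketch, not the PR's actual declarations.
type taskGroupNode struct {
    nodeID  string               // candidate node that was checked
    option  *feasible.RankedNode // non-nil only when the node was feasible
    metrics *structs.AllocMetric // per-attempt snapshot of the check metrics
}

// taskGroupNodes collects every checked node (feasible or not) for one task group.
type taskGroupNodes []*taskGroupNode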