diff --git a/e2e/scheduler_system/systemsched_test.go b/e2e/scheduler_system/systemsched_test.go index ee17ada6d91..dfa27a07b49 100644 --- a/e2e/scheduler_system/systemsched_test.go +++ b/e2e/scheduler_system/systemsched_test.go @@ -218,16 +218,6 @@ func testCanaryDeploymentToAllEligibleNodes(t *testing.T) { ) t.Cleanup(cleanup2) - // how many eligible nodes do we have? - nodesApi := job2.NodesApi() - nodesList, _, err := nodesApi.List(nil) - must.Nil(t, err) - must.SliceNotEmpty(t, nodesList) - - // Get updated allocations - allocs := job2.Allocs() - must.SliceNotEmpty(t, allocs) - deploymentsApi := job2.DeploymentsApi() deploymentsList, _, err := deploymentsApi.List(nil) must.NoError(t, err) @@ -253,6 +243,10 @@ func testCanaryDeploymentToAllEligibleNodes(t *testing.T) { return false }) + // Get updated allocations + allocs := job2.Allocs() + must.SliceNotEmpty(t, allocs) + // find allocations from v1 version of the job, they should all be canaries count := 0 for _, a := range allocs { @@ -263,7 +257,10 @@ func testCanaryDeploymentToAllEligibleNodes(t *testing.T) { } must.Eq(t, len(initialAllocs), count, must.Sprint("expected canaries to be placed on all eligible nodes")) + updatedDeployment, _, err := deploymentsApi.Info(deployment.ID, nil) + must.NoError(t, err) + // deployment must not be terminal and needs to have the right status // description set - must.Eq(t, structs.DeploymentStatusDescriptionRunningNeedsPromotion, deployment.StatusDescription) + must.Eq(t, structs.DeploymentStatusDescriptionRunningNeedsPromotion, updatedDeployment.StatusDescription) } diff --git a/scheduler/feasible/context.go b/scheduler/feasible/context.go index a03b62cfe9a..9d9817c31e3 100644 --- a/scheduler/feasible/context.go +++ b/scheduler/feasible/context.go @@ -252,6 +252,13 @@ func NewEvalEligibility() *EvalEligibility { } } +// Reset clears the contents of the eval eligibility +func (e *EvalEligibility) Reset() { + e.job = make(map[string]ComputedClassFeasibility) + e.taskGroups = make(map[string]map[string]ComputedClassFeasibility) + e.tgEscapedConstraints = make(map[string]bool) +} + // SetJob takes the job being evaluated and calculates the escaped constraints // at the job and task group level. func (e *EvalEligibility) SetJob(job *structs.Job) { diff --git a/scheduler/feasible/stack.go b/scheduler/feasible/stack.go index 9941cbe9706..52404d4113b 100644 --- a/scheduler/feasible/stack.go +++ b/scheduler/feasible/stack.go @@ -352,6 +352,11 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran // Reset the binpack selector and context s.scoreNorm.Reset() s.ctx.Reset() + + // Since the system stack is always evaluating a single + // node, previous eligibility information is not applicable + // so reset it + s.ctx.Eligibility().Reset() start := time.Now() // Get the task groups constraints. diff --git a/scheduler/reconciler/deployments.go b/scheduler/reconciler/deployments.go deleted file mode 100644 index a43fbbedb89..00000000000 --- a/scheduler/reconciler/deployments.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright (c) HashiCorp, Inc. -// SPDX-License-Identifier: BUSL-1.1 - -package reconciler - -import "github.com/hashicorp/nomad/nomad/structs" - -// cancelUnneededDeployments cancels any deployment that is not needed. -// A deployment update will be staged for jobs that should stop or have the -// wrong version. Unneeded deployments include: -// 1. Jobs that are marked for stop, but there is a non-terminal deployment. -// 2. 
Deployments that are active, but referencing a different job version. -// 3. Deployments that are already successful. -// -// returns: old deployment, current deployment and a slice of deployment status -// updates. -func cancelUnneededDeployments(j *structs.Job, d *structs.Deployment) (*structs.Deployment, *structs.Deployment, []*structs.DeploymentStatusUpdate) { - var updates []*structs.DeploymentStatusUpdate - - // If the job is stopped and there is a non-terminal deployment, cancel it - if j.Stopped() { - if d != nil && d.Active() { - updates = append(updates, &structs.DeploymentStatusUpdate{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusCancelled, - StatusDescription: structs.DeploymentStatusDescriptionStoppedJob, - }) - } - - // Nothing else to do - return d, nil, updates - } - - if d == nil { - return nil, nil, nil - } - - // Check if the deployment is active and referencing an older job and cancel it - if d.JobCreateIndex != j.CreateIndex || d.JobVersion != j.Version { - if d.Active() { - updates = append(updates, &structs.DeploymentStatusUpdate{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusCancelled, - StatusDescription: structs.DeploymentStatusDescriptionNewerJob, - }) - } - - return d, nil, updates - } - - // Clear it as the current deployment if it is successful - if d.Status == structs.DeploymentStatusSuccessful { - return d, nil, updates - } - - return nil, d, updates -} diff --git a/scheduler/reconciler/reconcile_cluster.go b/scheduler/reconciler/reconcile_cluster.go index 5469fa9f890..8ecbc119657 100644 --- a/scheduler/reconciler/reconcile_cluster.go +++ b/scheduler/reconciler/reconcile_cluster.go @@ -277,7 +277,7 @@ func (a *AllocReconciler) Compute() *ReconcileResults { // Create the allocation matrix m := newAllocMatrix(a.jobState.Job, a.jobState.ExistingAllocs) - a.jobState.DeploymentOld, a.jobState.DeploymentCurrent, result.DeploymentUpdates = cancelUnneededDeployments(a.jobState.Job, a.jobState.DeploymentCurrent) + a.jobState.DeploymentOld, a.jobState.DeploymentCurrent, result.DeploymentUpdates = cancelUnneededServiceDeployments(a.jobState.Job, a.jobState.DeploymentCurrent) // If we are just stopping a job we do not need to do anything more than // stopping all running allocs @@ -569,6 +569,57 @@ func (a *AllocReconciler) computeGroup(group string, all allocSet) (*ReconcileRe return result, deploymentComplete } +// cancelUnneededServiceDeployments cancels any deployment that is not needed. +// A deployment update will be staged for jobs that should stop or have the +// wrong version. Unneeded deployments include: +// 1. Jobs that are marked for stop, but there is a non-terminal deployment. +// 2. Deployments that are active, but referencing a different job version. +// 3. Deployments that are already successful. +// +// returns: old deployment, current deployment and a slice of deployment status +// updates. 
+func cancelUnneededServiceDeployments(j *structs.Job, d *structs.Deployment) (*structs.Deployment, *structs.Deployment, []*structs.DeploymentStatusUpdate) { + var updates []*structs.DeploymentStatusUpdate + + // If the job is stopped and there is a non-terminal deployment, cancel it + if j.Stopped() { + if d != nil && d.Active() { + updates = append(updates, &structs.DeploymentStatusUpdate{ + DeploymentID: d.ID, + Status: structs.DeploymentStatusCancelled, + StatusDescription: structs.DeploymentStatusDescriptionStoppedJob, + }) + } + + // Nothing else to do + return d, nil, updates + } + + if d == nil { + return nil, nil, nil + } + + // Check if the deployment is active and referencing an older job and cancel it + if d.JobCreateIndex != j.CreateIndex || d.JobVersion != j.Version { + if d.Active() { + updates = append(updates, &structs.DeploymentStatusUpdate{ + DeploymentID: d.ID, + Status: structs.DeploymentStatusCancelled, + StatusDescription: structs.DeploymentStatusDescriptionNewerJob, + }) + } + + return d, nil, updates + } + + // Clear it as the current deployment if it is successful + if d.Status == structs.DeploymentStatusSuccessful { + return d, nil, updates + } + + return nil, d, updates +} + // setDeploymentStatusAndUpdates sets status for a.deployment if necessary and // returns an array of DeploymentStatusUpdates. func (a *AllocReconciler) setDeploymentStatusAndUpdates(deploymentComplete bool, createdDeployment *structs.Deployment) []*structs.DeploymentStatusUpdate { diff --git a/scheduler/reconciler/reconcile_node.go b/scheduler/reconciler/reconcile_node.go index 2c516474bc9..9cbdc0f29eb 100644 --- a/scheduler/reconciler/reconcile_node.go +++ b/scheduler/reconciler/reconcile_node.go @@ -5,8 +5,6 @@ package reconciler import ( "fmt" - "maps" - "math" "slices" "time" @@ -61,25 +59,14 @@ func (nr *NodeReconciler) Compute( // Create the required task groups. required := materializeSystemTaskGroups(job) - // Canary deployments deploy to the TaskGroup.UpdateStrategy.Canary - // percentage of eligible nodes, so we create a mapping of task group name - // to a list of nodes that canaries should be placed on. - canaryNodes, canariesPerTG := nr.computeCanaryNodes(required, nodeAllocs, terminal, eligibleNodes) - compatHadExistingDeployment := nr.DeploymentCurrent != nil result := new(NodeReconcileResult) - var deploymentComplete bool for nodeID, allocs := range nodeAllocs { - diff, deploymentCompleteForNode := nr.computeForNode(job, nodeID, eligibleNodes, - notReadyNodes, taintedNodes, canaryNodes[nodeID], canariesPerTG, required, - allocs, terminal, serverSupportsDisconnectedClients) + diff := nr.computeForNode(job, nodeID, eligibleNodes, + notReadyNodes, taintedNodes, required, allocs, terminal, + serverSupportsDisconnectedClients) result.Append(diff) - - deploymentComplete = deploymentCompleteForNode - if deploymentComplete { - break - } } // COMPAT(1.14.0) prevent a new deployment from being created in the case @@ -90,93 +77,9 @@ func (nr *NodeReconciler) Compute( nr.DeploymentCurrent = nil } - nr.DeploymentUpdates = append(nr.DeploymentUpdates, nr.setDeploymentStatusAndUpdates(deploymentComplete, job)...) - return result } -// computeCanaryNodes is a helper function that, given required task groups, -// mappings of nodes to their live allocs and terminal allocs, and a map of -// eligible nodes, outputs a map[nodeID] -> map[TG] -> bool which indicates -// which TGs this node is a canary for, and a map[TG] -> int to indicate how -// many total canaries are to be placed for a TG. 
-func (nr *NodeReconciler) computeCanaryNodes(required map[string]*structs.TaskGroup, - liveAllocs map[string][]*structs.Allocation, terminalAllocs structs.TerminalByNodeByName, - eligibleNodes map[string]*structs.Node) (map[string]map[string]bool, map[string]int) { - - canaryNodes := map[string]map[string]bool{} - eligibleNodesList := slices.Collect(maps.Values(eligibleNodes)) - canariesPerTG := map[string]int{} - - for _, tg := range required { - if tg.Update.IsEmpty() || tg.Update.Canary == 0 { - continue - } - - // round up to the nearest integer - numberOfCanaryNodes := int(math.Ceil(float64(tg.Update.Canary) * float64(len(eligibleNodes)) / 100)) - canariesPerTG[tg.Name] = numberOfCanaryNodes - - // check if there are any live allocations on any nodes that are/were - // canaries. - for nodeID, allocs := range liveAllocs { - for _, a := range allocs { - eligibleNodesList, numberOfCanaryNodes = nr.findOldCanaryNodes( - eligibleNodesList, numberOfCanaryNodes, a, tg, canaryNodes, nodeID) - } - } - - // check if there are any terminal allocations that were canaries - for nodeID, terminalAlloc := range terminalAllocs { - for _, a := range terminalAlloc { - eligibleNodesList, numberOfCanaryNodes = nr.findOldCanaryNodes( - eligibleNodesList, numberOfCanaryNodes, a, tg, canaryNodes, nodeID) - } - } - - for i, n := range eligibleNodesList { - if i > numberOfCanaryNodes-1 { - break - } - - if _, ok := canaryNodes[n.ID]; !ok { - canaryNodes[n.ID] = map[string]bool{} - } - - canaryNodes[n.ID][tg.Name] = true - } - } - - return canaryNodes, canariesPerTG -} - -func (nr *NodeReconciler) findOldCanaryNodes(nodesList []*structs.Node, numberOfCanaryNodes int, - a *structs.Allocation, tg *structs.TaskGroup, canaryNodes map[string]map[string]bool, nodeID string) ([]*structs.Node, int) { - - if a.DeploymentStatus == nil || a.DeploymentStatus.Canary == false || - nr.DeploymentCurrent == nil { - return nodesList, numberOfCanaryNodes - } - - nodes := nodesList - numberOfCanaries := numberOfCanaryNodes - if a.TaskGroup == tg.Name { - if _, ok := canaryNodes[nodeID]; !ok { - canaryNodes[nodeID] = map[string]bool{} - } - canaryNodes[nodeID][tg.Name] = true - - // this node should no longer be considered when searching - // for canary nodes - numberOfCanaries -= 1 - nodes = slices.DeleteFunc( - nodes, - func(n *structs.Node) bool { return n.ID == nodeID }, - ) - } - return nodes, numberOfCanaries -} - // computeForNode is used to do a set difference between the target // allocations and the existing allocations for a particular node. This returns // 8 sets of results: @@ -191,27 +94,22 @@ func (nr *NodeReconciler) findOldCanaryNodes(nodesList []*structs.Node, numberOf // 8. those that may still be running on a node that has resumed reconnected. // // This method mutates the NodeReconciler fields, and returns a new -// NodeReconcilerResult object and a boolean to indicate wither the deployment -// is complete or not. +// NodeReconcilerResult object. func (nr *NodeReconciler) computeForNode( job *structs.Job, // job whose allocs are going to be diff-ed nodeID string, eligibleNodes map[string]*structs.Node, notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. 
draining taintedNodes map[string]*structs.Node, // nodes which are down (by node id) - canaryNode map[string]bool, // indicates whether this node is a canary node for tg - canariesPerTG map[string]int, // indicates how many canary placements we expect per tg required map[string]*structs.TaskGroup, // set of allocations that must exist liveAllocs []*structs.Allocation, // non-terminal allocations that exist terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id) serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic -) (*NodeReconcileResult, bool) { +) *NodeReconcileResult { result := new(NodeReconcileResult) // cancel deployments that aren't needed anymore - var deploymentUpdates []*structs.DeploymentStatusUpdate - nr.DeploymentOld, nr.DeploymentCurrent, deploymentUpdates = cancelUnneededDeployments(job, nr.DeploymentCurrent) - nr.DeploymentUpdates = append(nr.DeploymentUpdates, deploymentUpdates...) + nr.cancelUnnededSystemDeployments(job) // set deployment paused and failed, if we currently have a deployment var deploymentPaused, deploymentFailed bool @@ -225,9 +123,6 @@ func (nr *NodeReconciler) computeForNode( deploymentFailed = nr.DeploymentCurrent.Status == structs.DeploymentStatusFailed } - // Track desired total and desired canaries across all loops - desiredCanaries := map[string]int{} - // Track whether we're during a canary update isCanarying := map[string]bool{} @@ -255,7 +150,7 @@ func (nr *NodeReconciler) computeForNode( // deployment var dstate = new(structs.DeploymentState) if nr.DeploymentCurrent != nil { - dstate, _ = nr.DeploymentCurrent.TaskGroups[tg.Name] + dstate = nr.DeploymentCurrent.TaskGroups[tg.Name] } supportsDisconnectedClients := alloc.SupportsDisconnectedClients(serverSupportsDisconnectedClients) @@ -370,8 +265,20 @@ func (nr *NodeReconciler) computeForNode( } // For an existing allocation, if the nodeID is no longer - // eligible, the diff should be ignored + // eligible, the diff should be ignored unless the job + // definition has been updated. If the definition has been + // updated, stop the allocation. 
if _, ineligible := notReadyNodes[nodeID]; ineligible { + if job.JobModifyIndex != alloc.Job.JobModifyIndex { + result.Stop = append(result.Stop, AllocTuple{ + Name: name, + TaskGroup: tg, + Alloc: alloc, + }) + + continue + } + goto IGNORE } @@ -388,17 +295,14 @@ func (nr *NodeReconciler) computeForNode( // If the definition is updated we need to update if job.JobModifyIndex != alloc.Job.JobModifyIndex { - if canariesPerTG[tg.Name] > 0 && dstate != nil && !dstate.Promoted { + if !tg.Update.IsEmpty() && tg.Update.Canary > 0 && dstate != nil && !dstate.Promoted { isCanarying[tg.Name] = true - if canaryNode[tg.Name] { - result.Update = append(result.Update, AllocTuple{ - Name: name, - TaskGroup: tg, - Alloc: alloc, - Canary: true, - }) - desiredCanaries[tg.Name] += 1 - } + result.Update = append(result.Update, AllocTuple{ + Name: name, + TaskGroup: tg, + Alloc: alloc, + Canary: true, + }) } else { result.Update = append(result.Update, AllocTuple{ Name: name, @@ -419,13 +323,8 @@ func (nr *NodeReconciler) computeForNode( }) } - // as we iterate over require groups, we'll keep track of whether the - // deployment is complete or not - deploymentComplete := false - // Scan the required groups for name, tg := range required { - // populate deployment state for this task group var dstate = new(structs.DeploymentState) var existingDeployment bool @@ -440,16 +339,10 @@ func (nr *NodeReconciler) computeForNode( dstate.AutoPromote = tg.Update.AutoPromote dstate.ProgressDeadline = tg.Update.ProgressDeadline } - dstate.DesiredTotal = len(eligibleNodes) - } - - if isCanarying[tg.Name] && !dstate.Promoted { - dstate.DesiredCanaries = canariesPerTG[tg.Name] } // Check for an existing allocation if _, ok := existing[name]; !ok { - // Check for a terminal sysbatch allocation, which should be not placed // again unless the job has been updated. if job.Type == structs.JobTypeSysBatch { @@ -494,6 +387,11 @@ func (nr *NodeReconciler) computeForNode( Alloc: termOnNode, } + // If the terminal allocation was a canary, mark it as such. + if termOnNode != nil && termOnNode.DeploymentStatus != nil && termOnNode.DeploymentStatus.Canary { + allocTuple.Canary = true + } + // If the new allocation isn't annotated with a previous allocation // or if the previous allocation isn't from the same node then we // annotate the allocTuple with a new Allocation @@ -506,12 +404,10 @@ func (nr *NodeReconciler) computeForNode( // check if deployment is place ready or complete deploymentPlaceReady := !deploymentPaused && !deploymentFailed - deploymentComplete = nr.isDeploymentComplete(tg.Name, result, isCanarying[tg.Name]) // check if perhaps there's nothing else to do for this TG if existingDeployment || tg.Update.IsEmpty() || - (dstate.DesiredTotal == 0 && dstate.DesiredCanaries == 0) || !deploymentPlaceReady { continue } @@ -527,7 +423,54 @@ func (nr *NodeReconciler) computeForNode( } } - return result, deploymentComplete + return result +} + +// cancelUnneededServiceDeployments cancels any deployment that is not needed. +// A deployment update will be staged for jobs that should stop or have the +// wrong version. Unneeded deployments include: +// 1. Jobs that are marked for stop, but there is a non-terminal deployment. +// 2. Deployments that are active, but referencing a different job version. +// 3. Deployments that are already successful. 
+func (nr *NodeReconciler) cancelUnnededSystemDeployments(j *structs.Job) { + // If the job is stopped and there is a non-terminal deployment, cancel it + if j.Stopped() { + if nr.DeploymentCurrent != nil && nr.DeploymentCurrent.Active() { + nr.DeploymentUpdates = append(nr.DeploymentUpdates, &structs.DeploymentStatusUpdate{ + DeploymentID: nr.DeploymentCurrent.ID, + Status: structs.DeploymentStatusCancelled, + StatusDescription: structs.DeploymentStatusDescriptionStoppedJob, + }) + } + + // Nothing else to do + return + } + + if nr.DeploymentCurrent == nil { + return + } + + // Check if the deployment is active and referencing an older job and cancel it + if nr.DeploymentCurrent.JobCreateIndex != j.CreateIndex || nr.DeploymentCurrent.JobVersion != j.Version { + if nr.DeploymentCurrent.Active() { + nr.DeploymentUpdates = append(nr.DeploymentUpdates, &structs.DeploymentStatusUpdate{ + DeploymentID: nr.DeploymentCurrent.ID, + Status: structs.DeploymentStatusCancelled, + StatusDescription: structs.DeploymentStatusDescriptionNewerJob, + }) + + nr.DeploymentOld = nr.DeploymentCurrent + nr.DeploymentCurrent = nil + return + } + } + + // Clear it as the current deployment if it is successful + if nr.DeploymentCurrent.Status == structs.DeploymentStatusSuccessful { + nr.DeploymentOld = nr.DeploymentCurrent + nr.DeploymentCurrent = nil + } } func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGroup, @@ -538,6 +481,12 @@ func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGro return } + // if there's an old deployment for the same job version as we're + // processing, never create a new one + if nr.DeploymentOld != nil && nr.DeploymentOld.JobVersion == job.Version { + return + } + updatingSpec := updates != 0 hadRunning := false @@ -545,9 +494,7 @@ func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGro return a.Job.ID == job.ID && a.Job.Version == job.Version && a.Job.CreateIndex == job.CreateIndex } - if slices.ContainsFunc(allocs, func(alloc *structs.Allocation) bool { - return hadRunningCondition(alloc) - }) { + if slices.ContainsFunc(allocs, hadRunningCondition) { nr.compatHasSameVersionAllocs = true hadRunning = true } @@ -586,74 +533,6 @@ func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGro nr.DeploymentCurrent.TaskGroups[tg.Name] = dstate } -func (nr *NodeReconciler) isDeploymentComplete(groupName string, buckets *NodeReconcileResult, isCanarying bool) bool { - complete := len(buckets.Place)+len(buckets.Migrate)+len(buckets.Update) == 0 - - if !complete || nr.DeploymentCurrent == nil || isCanarying { - return false - } - - // ensure everything is healthy - if dstate, ok := nr.DeploymentCurrent.TaskGroups[groupName]; ok { - if dstate.HealthyAllocs < dstate.DesiredTotal { // Make sure we have enough healthy allocs - complete = false - } - } - - return complete -} - -func (nr *NodeReconciler) setDeploymentStatusAndUpdates(deploymentComplete bool, job *structs.Job) []*structs.DeploymentStatusUpdate { - statusUpdates := []*structs.DeploymentStatusUpdate{} - - if d := nr.DeploymentCurrent; d != nil { - - // Deployments that require promotion should have appropriate status set - // immediately, no matter their completness. 
- if d.RequiresPromotion() { - if d.HasAutoPromote() { - d.StatusDescription = structs.DeploymentStatusDescriptionRunningAutoPromotion - } else { - d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion - } - return statusUpdates - } - - // Mark the deployment as complete if possible - if deploymentComplete { - if job.IsMultiregion() { - // the unblocking/successful states come after blocked, so we - // need to make sure we don't revert those states - if d.Status != structs.DeploymentStatusUnblocking && - d.Status != structs.DeploymentStatusSuccessful { - statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{ - DeploymentID: nr.DeploymentCurrent.ID, - Status: structs.DeploymentStatusBlocked, - StatusDescription: structs.DeploymentStatusDescriptionBlocked, - }) - } - } else { - statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{ - DeploymentID: nr.DeploymentCurrent.ID, - Status: structs.DeploymentStatusSuccessful, - StatusDescription: structs.DeploymentStatusDescriptionSuccessful, - }) - } - } - - // Mark the deployment as pending since its state is now computed. - if d.Status == structs.DeploymentStatusInitializing { - statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{ - DeploymentID: nr.DeploymentCurrent.ID, - Status: structs.DeploymentStatusPending, - StatusDescription: structs.DeploymentStatusDescriptionPendingForPeer, - }) - } - } - - return statusUpdates -} - // materializeSystemTaskGroups is used to materialize all the task groups // a system or sysbatch job requires. func materializeSystemTaskGroups(job *structs.Job) map[string]*structs.TaskGroup { diff --git a/scheduler/reconciler/reconcile_node_test.go b/scheduler/reconciler/reconcile_node_test.go index 724b05e9776..5c370576077 100644 --- a/scheduler/reconciler/reconcile_node_test.go +++ b/scheduler/reconciler/reconcile_node_test.go @@ -73,7 +73,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { } nr := NewNodeReconciler(nil) - diff, _ := nr.computeForNode(job, "node1", eligible, nil, tainted, nil, nil, required, live, terminal, true) + diff := nr.computeForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true) assertDiffCount(t, diffResultCount{ignore: 1, place: 1}, diff) if len(diff.Ignore) > 0 { @@ -96,7 +96,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { } nr := NewNodeReconciler(nil) - diff, _ := nr.computeForNode(job, "node1", eligible, nil, tainted, nil, nil, required, live, terminal, true) + diff := nr.computeForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true) assertDiffCount(t, diffResultCount{update: 1, place: 1}, diff) }) @@ -158,9 +158,9 @@ func TestDiffSystemAllocsForNode_Placements(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { nr := NewNodeReconciler(nil) - diff, _ := nr.computeForNode( + diff := nr.computeForNode( job, tc.nodeID, eligible, nil, - tainted, nil, nil, required, allocsForNode, terminal, true) + tainted, required, allocsForNode, terminal, true) assertDiffCount(t, tc.expected, diff) }) @@ -217,8 +217,8 @@ func TestDiffSystemAllocsForNode_Stops(t *testing.T) { terminal := structs.TerminalByNodeByName{} nr := NewNodeReconciler(nil) - diff, _ := nr.computeForNode( - job, node.ID, eligible, nil, tainted, nil, nil, required, allocs, terminal, true) + diff := nr.computeForNode( + job, node.ID, eligible, nil, tainted, required, allocs, terminal, true) assertDiffCount(t, diffResultCount{ignore: 1, stop: 1, 
update: 1}, diff) if len(diff.Update) > 0 { @@ -287,8 +287,8 @@ func TestDiffSystemAllocsForNode_IneligibleNode(t *testing.T) { } nr := NewNodeReconciler(nil) - diff, _ := nr.computeForNode( - job, tc.nodeID, eligible, ineligible, tainted, nil, nil, + diff := nr.computeForNode( + job, tc.nodeID, eligible, ineligible, tainted, required, []*structs.Allocation{alloc}, terminal, true, ) assertDiffCount(t, tc.expect, diff) @@ -344,9 +344,9 @@ func TestDiffSystemAllocsForNode_DrainingNode(t *testing.T) { } nr := NewNodeReconciler(nil) - diff, _ := nr.computeForNode( + diff := nr.computeForNode( job, drainNode.ID, map[string]*structs.Node{}, nil, - tainted, nil, nil, required, allocs, terminal, true) + tainted, required, allocs, terminal, true) assertDiffCount(t, diffResultCount{migrate: 1, ignore: 1}, diff) if len(diff.Migrate) > 0 { @@ -396,9 +396,9 @@ func TestDiffSystemAllocsForNode_LostNode(t *testing.T) { } nr := NewNodeReconciler(nil) - diff, _ := nr.computeForNode( + diff := nr.computeForNode( job, deadNode.ID, map[string]*structs.Node{}, nil, - tainted, nil, nil, required, allocs, terminal, true) + tainted, required, allocs, terminal, true) assertDiffCount(t, diffResultCount{lost: 2}, diff) if len(diff.Migrate) > 0 { @@ -522,8 +522,8 @@ func TestDiffSystemAllocsForNode_DisconnectedNode(t *testing.T) { } nr := NewNodeReconciler(nil) - got, _ := nr.computeForNode( - job, tc.node.ID, eligibleNodes, nil, taintedNodes, nil, nil, + got := nr.computeForNode( + job, tc.node.ID, eligibleNodes, nil, taintedNodes, required, []*structs.Allocation{alloc}, terminal, true, ) assertDiffCount(t, tc.expect, got) @@ -781,430 +781,3 @@ func TestNodeDeployments(t *testing.T) { }) } } - -func Test_computeCanaryNodes(t *testing.T) { - ci.Parallel(t) - - // generate an odd number of nodes - fiveEligibleNodes := map[string]*structs.Node{} - // name them so we can refer to their names while testing pre-existing - // canary allocs - fiveEligibleNodeNames := []string{"node1", "node2", "node3", "node4", "node5"} - for _, name := range fiveEligibleNodeNames { - node := mock.Node() - node.ID = name - fiveEligibleNodes[name] = node - } - - // generate an even number of nodes - fourEligibleNodes := map[string]*structs.Node{} - for range 4 { - nodeID := uuid.Generate() - node := mock.Node() - node.ID = nodeID - fourEligibleNodes[nodeID] = node - } - - testCases := []struct { - name string - nodes map[string]*structs.Node - liveAllocs map[string][]*structs.Allocation - terminalAllocs structs.TerminalByNodeByName - required map[string]*structs.TaskGroup - existingDeployment *structs.Deployment - expectedCanaryNodes map[string]int // number of nodes per tg - expectedCanaryNodeID map[string]string // sometimes we want to make sure a particular node ID is a canary - }{ - { - name: "no required task groups", - nodes: fourEligibleNodes, - liveAllocs: nil, - terminalAllocs: nil, - required: nil, - existingDeployment: nil, - expectedCanaryNodes: map[string]int{}, - expectedCanaryNodeID: nil, - }, - { - name: "one task group with no update strategy", - nodes: fourEligibleNodes, - liveAllocs: nil, - terminalAllocs: nil, - required: map[string]*structs.TaskGroup{ - "foo": { - Name: "foo", - }}, - existingDeployment: nil, - expectedCanaryNodes: map[string]int{}, - expectedCanaryNodeID: nil, - }, - { - name: "one task group with 33% canary deployment", - nodes: fourEligibleNodes, - liveAllocs: nil, - terminalAllocs: nil, - required: map[string]*structs.TaskGroup{ - "foo": { - Name: "foo", - Update: &structs.UpdateStrategy{ - Canary: 
33, - MaxParallel: 1, // otherwise the update strategy will be considered nil - }, - }, - }, - existingDeployment: nil, - expectedCanaryNodes: map[string]int{ - "foo": 2, // we always round up - }, - expectedCanaryNodeID: nil, - }, - { - name: "one task group with 100% canary deployment, four nodes", - nodes: fourEligibleNodes, - liveAllocs: nil, - terminalAllocs: nil, - required: map[string]*structs.TaskGroup{ - "foo": { - Name: "foo", - Update: &structs.UpdateStrategy{ - Canary: 100, - MaxParallel: 1, // otherwise the update strategy will be considered nil - }, - }, - }, - existingDeployment: nil, - expectedCanaryNodes: map[string]int{ - "foo": 4, - }, - expectedCanaryNodeID: nil, - }, - { - name: "one task group with 50% canary deployment, even nodes", - nodes: fourEligibleNodes, - liveAllocs: nil, - terminalAllocs: nil, - required: map[string]*structs.TaskGroup{ - "foo": { - Name: "foo", - Update: &structs.UpdateStrategy{ - Canary: 50, - MaxParallel: 1, // otherwise the update strategy will be considered nil - }, - }, - }, - existingDeployment: nil, - expectedCanaryNodes: map[string]int{ - "foo": 2, - }, - expectedCanaryNodeID: nil, - }, - { - name: "two task groups: one with 50% canary deploy, second one with 2% canary deploy, pre-existing canary alloc", - nodes: fiveEligibleNodes, - liveAllocs: map[string][]*structs.Allocation{ - "foo": {mock.Alloc()}, // should be disregarded since it's not one of our nodes - fiveEligibleNodeNames[0]: { - {DeploymentStatus: nil}, - {DeploymentStatus: &structs.AllocDeploymentStatus{Canary: false}}, - {DeploymentStatus: &structs.AllocDeploymentStatus{Canary: true}, TaskGroup: "foo"}, - }, - fiveEligibleNodeNames[1]: { - {DeploymentStatus: &structs.AllocDeploymentStatus{Canary: true}, TaskGroup: "bar"}, - }, - }, - terminalAllocs: structs.TerminalByNodeByName{ - fiveEligibleNodeNames[2]: map[string]*structs.Allocation{ - "foo": { - DeploymentStatus: &structs.AllocDeploymentStatus{ - Canary: true, - }, - TaskGroup: "foo", - }, - }, - }, - required: map[string]*structs.TaskGroup{ - "foo": { - Name: "foo", - Update: &structs.UpdateStrategy{ - Canary: 50, - MaxParallel: 1, // otherwise the update strategy will be considered nil - }, - }, - "bar": { - Name: "bar", - Update: &structs.UpdateStrategy{ - Canary: 2, - MaxParallel: 1, // otherwise the update strategy will be considered nil - }, - }, - }, - existingDeployment: structs.NewDeployment(mock.SystemJob(), 100, time.Now().Unix()), - expectedCanaryNodes: map[string]int{ - "foo": 3, // we always round up - "bar": 1, // we always round up - }, - expectedCanaryNodeID: map[string]string{ - fiveEligibleNodeNames[0]: "foo", - fiveEligibleNodeNames[1]: "bar", - fiveEligibleNodeNames[2]: "foo", - }, - }, - { - name: "task group with 100% canary deploy, 1 eligible node", - nodes: map[string]*structs.Node{"foo": mock.Node()}, - liveAllocs: nil, - terminalAllocs: nil, - required: map[string]*structs.TaskGroup{ - "foo": { - Name: "foo", - Update: &structs.UpdateStrategy{ - Canary: 100, - MaxParallel: 1, - }, - }, - }, - existingDeployment: nil, - expectedCanaryNodes: map[string]int{ - "foo": 1, - }, - expectedCanaryNodeID: nil, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - nr := NewNodeReconciler(tc.existingDeployment) - canaryNodes, canariesPerTG := nr.computeCanaryNodes(tc.required, tc.liveAllocs, tc.terminalAllocs, tc.nodes) - must.Eq(t, tc.expectedCanaryNodes, canariesPerTG) - if tc.liveAllocs != nil { - for nodeID, tgName := range tc.expectedCanaryNodeID { - must.True(t, 
canaryNodes[nodeID][tgName]) - } - } - }) - } -} - -// Tests the reconciler creates new canaries when the job changes -func TestNodeReconciler_NewCanaries(t *testing.T) { - ci.Parallel(t) - - job := mock.SystemJob() - job.TaskGroups[0].Update = &structs.UpdateStrategy{ - Canary: 20, // deploy to 20% of eligible nodes - MaxParallel: 1, // otherwise the update strategy will be considered nil - } - job.JobModifyIndex = 1 - - // Create 10 nodes - nodes := []*structs.Node{} - for i := range 10 { - node := mock.Node() - node.ID = fmt.Sprintf("node_%d", i) - node.Name = fmt.Sprintf("node_%d", i) - nodes = append(nodes, node) - } - - allocs := []*structs.Allocation{} - for _, n := range nodes { - a := mock.Alloc() - a.Job = job - a.Name = "my-job.web[0]" - a.NodeID = n.ID - a.NodeName = n.Name - a.TaskGroup = job.TaskGroups[0].Name - - allocs = append(allocs, a) - } - - // bump the job version up - newJobVersion := job.Copy() - newJobVersion.Version = job.Version + 1 - newJobVersion.JobModifyIndex = job.JobModifyIndex + 1 - - // bump the version and add a new TG - newJobWithNewTaskGroup := newJobVersion.Copy() - newJobWithNewTaskGroup.Version = newJobVersion.Version + 1 - newJobWithNewTaskGroup.JobModifyIndex = newJobVersion.JobModifyIndex + 1 - tg := newJobVersion.TaskGroups[0].Copy() - tg.Name = "other" - tg.Update = &structs.UpdateStrategy{MaxParallel: 1} - newJobWithNewTaskGroup.TaskGroups = append(newJobWithNewTaskGroup.TaskGroups, tg) - - // new job with no previous allocs and no canary update strategy - jobWithNoUpdates := mock.SystemJob() - jobWithNoUpdates.Name = "i-am-a-brand-new-job" - jobWithNoUpdates.TaskGroups[0].Name = "i-am-a-brand-new-tg" - jobWithNoUpdates.TaskGroups[0].Update = structs.DefaultUpdateStrategy - - // additional test to make sure there are no canaries being placed for v0 - // jobs - freshJob := mock.SystemJob() - freshJob.TaskGroups[0].Update = structs.DefaultUpdateStrategy - freshNodes := []*structs.Node{} - for range 2 { - node := mock.Node() - freshNodes = append(freshNodes, node) - } - - testCases := []struct { - name string - job *structs.Job - nodes []*structs.Node - existingDeployment *structs.Deployment - expectedDesiredCanaries map[string]int - expectedDesiredTotal map[string]int - expectedDeploymentStatusDescription string - expectedPlaceCount int - expectedUpdateCount int - }{ - { - name: "new job version", - job: newJobVersion, - nodes: nodes, - existingDeployment: nil, - expectedDesiredCanaries: map[string]int{newJobVersion.TaskGroups[0].Name: 2}, - expectedDesiredTotal: map[string]int{newJobVersion.TaskGroups[0].Name: 10}, - expectedDeploymentStatusDescription: structs.DeploymentStatusDescriptionRunningNeedsPromotion, - expectedPlaceCount: 0, - expectedUpdateCount: 2, - }, - { - name: "new job version with a new TG (no existing allocs, no canaries)", - job: newJobWithNewTaskGroup, - nodes: nodes, - existingDeployment: nil, - expectedDesiredCanaries: map[string]int{ - newJobWithNewTaskGroup.TaskGroups[0].Name: 2, - newJobWithNewTaskGroup.TaskGroups[1].Name: 0, - }, - expectedDesiredTotal: map[string]int{ - newJobWithNewTaskGroup.TaskGroups[0].Name: 10, - newJobWithNewTaskGroup.TaskGroups[1].Name: 10, - }, - expectedDeploymentStatusDescription: structs.DeploymentStatusDescriptionRunningNeedsPromotion, - expectedPlaceCount: 10, - expectedUpdateCount: 2, - }, - { - name: "brand new job with no update block", - job: jobWithNoUpdates, - nodes: nodes, - existingDeployment: nil, - expectedDesiredCanaries: map[string]int{ - jobWithNoUpdates.TaskGroups[0].Name: 
0, - }, - expectedDesiredTotal: map[string]int{ - jobWithNoUpdates.TaskGroups[0].Name: 10, - }, - expectedDeploymentStatusDescription: structs.DeploymentStatusDescriptionRunning, - expectedPlaceCount: 10, - expectedUpdateCount: 0, - }, - { - name: "fresh job with no updates, empty nodes", - job: freshJob, - nodes: freshNodes, - existingDeployment: nil, - expectedDesiredCanaries: map[string]int{ - freshJob.TaskGroups[0].Name: 0, - }, - expectedDesiredTotal: map[string]int{ - freshJob.TaskGroups[0].Name: 2, - }, - expectedDeploymentStatusDescription: structs.DeploymentStatusDescriptionRunning, - expectedPlaceCount: 2, - expectedUpdateCount: 0, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - reconciler := NewNodeReconciler(tc.existingDeployment) - r := reconciler.Compute(tc.job, tc.nodes, nil, nil, allocs, nil, false) - must.NotNil(t, reconciler.DeploymentCurrent) - must.Eq(t, tc.expectedPlaceCount, len(r.Place), must.Sprint("incorrect amount of r.Place")) - must.Eq(t, tc.expectedUpdateCount, len(r.Update), must.Sprint("incorrect amount of r.Update")) - must.Eq(t, tc.expectedDeploymentStatusDescription, reconciler.DeploymentCurrent.StatusDescription) - for _, tg := range tc.job.TaskGroups { - must.Eq(t, tc.expectedDesiredCanaries[tg.Name], - reconciler.DeploymentCurrent.TaskGroups[tg.Name].DesiredCanaries, - must.Sprintf("incorrect number of DesiredCanaries for %s", tg.Name)) - must.Eq(t, tc.expectedDesiredTotal[tg.Name], - reconciler.DeploymentCurrent.TaskGroups[tg.Name].DesiredTotal, - must.Sprintf("incorrect number of DesiredTotal for %s", tg.Name)) - } - }) - } -} - -// Tests the reconciler correctly promotes canaries -func TestNodeReconciler_CanaryPromotion(t *testing.T) { - ci.Parallel(t) - - job := mock.SystemJob() - job.TaskGroups[0].Update = &structs.UpdateStrategy{ - Canary: 20, // deploy to 20% of eligible nodes - MaxParallel: 1, // otherwise the update strategy will be considered nil - } - job.JobModifyIndex = 1 - - // bump the job version up - newJobVersion := job.Copy() - newJobVersion.Version = job.Version + 1 - newJobVersion.JobModifyIndex = job.JobModifyIndex + 1 - - // Create 5 nodes - nodes := []*structs.Node{} - for i := range 5 { - node := mock.Node() - node.ID = fmt.Sprintf("node_%d", i) - node.Name = fmt.Sprintf("node_%d", i) - nodes = append(nodes, node) - } - - // Create v0 allocs on 2 of the nodes, and v1 (canary) allocs on 3 nodes - allocs := []*structs.Allocation{} - for _, n := range nodes[0:3] { - a := mock.Alloc() - a.Job = job - a.Name = "my-job.web[0]" - a.NodeID = n.ID - a.NodeName = n.Name - a.TaskGroup = job.TaskGroups[0].Name - - allocs = append(allocs, a) - } - for _, n := range nodes[3:] { - a := mock.Alloc() - a.Job = job - a.Name = "my-job.web[0]" - a.NodeID = n.ID - a.NodeName = n.Name - a.TaskGroup = job.TaskGroups[0].Name - a.DeploymentStatus = &structs.AllocDeploymentStatus{Canary: true} - a.Job.Version = newJobVersion.Version - a.Job.JobModifyIndex = newJobVersion.JobModifyIndex - - allocs = append(allocs, a) - } - - // promote canaries - deployment := structs.NewDeployment(newJobVersion, 10, time.Now().Unix()) - deployment.TaskGroups[newJobVersion.TaskGroups[0].Name] = &structs.DeploymentState{ - Promoted: true, - HealthyAllocs: 5, - DesiredTotal: 5, - DesiredCanaries: 0, - } - - // reconcile - reconciler := NewNodeReconciler(deployment) - reconciler.Compute(newJobVersion, nodes, nil, nil, allocs, nil, false) - - must.NotNil(t, reconciler.DeploymentCurrent) - must.Eq(t, 5, 
reconciler.DeploymentCurrent.TaskGroups[newJobVersion.TaskGroups[0].Name].DesiredTotal) - must.SliceContainsFunc(t, reconciler.DeploymentUpdates, structs.DeploymentStatusSuccessful, - func(a *structs.DeploymentStatusUpdate, b string) bool { return a.Status == b }, - ) -} diff --git a/scheduler/scheduler_sysbatch.go b/scheduler/scheduler_sysbatch.go new file mode 100644 index 00000000000..c8c9be7f3d3 --- /dev/null +++ b/scheduler/scheduler_sysbatch.go @@ -0,0 +1,546 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package scheduler + +import ( + "fmt" + "runtime/debug" + + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/scheduler/feasible" + "github.com/hashicorp/nomad/scheduler/reconciler" + sstructs "github.com/hashicorp/nomad/scheduler/structs" +) + +const ( + // maxSysBatchScheduleAttempts is used to limit the number of times we will + // attempt to schedule if we continue to hit conflicts for sysbatch jobs. + maxSysBatchScheduleAttempts = 2 ) + +// SysBatchScheduler is used for 'sysbatch' jobs. This scheduler is designed for +// jobs that should be run on every client. Unlike 'system' jobs, which run +// continuously regardless of successful task exits, 'sysbatch' jobs consider +// the task complete on success. +type SysBatchScheduler struct { + logger log.Logger + eventsCh chan<- interface{} + state sstructs.State + planner sstructs.Planner + + eval *structs.Evaluation + job *structs.Job + plan *structs.Plan + planResult *structs.PlanResult + ctx *feasible.EvalContext + stack *feasible.SystemStack + + nodes []*structs.Node + notReadyNodes map[string]struct{} + nodesByDC map[string]int + + limitReached bool + + failedTGAllocs map[string]*structs.AllocMetric + queuedAllocs map[string]int + planAnnotations *structs.PlanAnnotations +} + +func NewSysBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state sstructs.State, planner sstructs.Planner) sstructs.Scheduler { + return &SysBatchScheduler{ + logger: logger.Named("sysbatch_sched"), + eventsCh: eventsCh, + state: state, + planner: planner, + } +} + +// Process is used to handle a single evaluation. +func (s *SysBatchScheduler) Process(eval *structs.Evaluation) (err error) { + + defer func() { + if r := recover(); r != nil { + s.logger.Error("processing eval panicked scheduler - please report this as a bug!", "eval_id", eval.ID, "error", r, "stack_trace", string(debug.Stack())) + err = fmt.Errorf("failed to process eval: %v", r) + } + }() + + // Store the evaluation + s.eval = eval + + // Update our logger with the eval's information + s.logger = s.logger.With("eval_id", eval.ID, "job_id", eval.JobID, "namespace", eval.Namespace) + + // Verify the evaluation trigger reason is understood + if !s.canHandle(eval.TriggeredBy) { + desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) + return setStatus(s.logger, s.planner, s.eval, nil, nil, + s.failedTGAllocs, s.planAnnotations, structs.EvalStatusFailed, desc, + s.queuedAllocs, "") + } + + limit := maxSysBatchScheduleAttempts + + // Retry up to maxSysBatchScheduleAttempts and reset if progress is made. 
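+ // progress resets the retry counter whenever the previously submitted plan
+ // resulted in state changes, so only attempts without progress count against the limit.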
+ progress := func() bool { return progressMade(s.planResult) } + if err := retryMax(limit, s.process, progress); err != nil { + if statusErr, ok := err.(*SetStatusError); ok { + return setStatus(s.logger, s.planner, s.eval, nil, nil, + s.failedTGAllocs, s.planAnnotations, statusErr.EvalStatus, err.Error(), + s.queuedAllocs, "") + } + return err + } + + // Update the status to complete + return setStatus(s.logger, s.planner, s.eval, nil, nil, + s.failedTGAllocs, s.planAnnotations, structs.EvalStatusComplete, "", + s.queuedAllocs, "") +} + +// process is wrapped in retryMax to iteratively run the handler until we have no +// further work or we've made the maximum number of attempts. +func (s *SysBatchScheduler) process() (bool, error) { + // Lookup the Job by ID + var err error + ws := memdb.NewWatchSet() + s.job, err = s.state.JobByID(ws, s.eval.Namespace, s.eval.JobID) + if err != nil { + return false, fmt.Errorf("failed to get job '%s': %v", s.eval.JobID, err) + } + + numTaskGroups := 0 + if !s.job.Stopped() { + numTaskGroups = len(s.job.TaskGroups) + } + s.queuedAllocs = make(map[string]int, numTaskGroups) + + // Get the ready nodes in the required datacenters + if !s.job.Stopped() { + s.nodes, s.notReadyNodes, s.nodesByDC, err = readyNodesInDCsAndPool( + s.state, s.job.Datacenters, s.job.NodePool) + if err != nil { + return false, fmt.Errorf("failed to get ready nodes: %v", err) + } + } + + // Create a plan + s.plan = s.eval.MakePlan(s.job) + + // Reset the failed allocations + s.failedTGAllocs = nil + + // Create an evaluation context + s.ctx = feasible.NewEvalContext(s.eventsCh, s.state, s.plan, s.logger) + + // Construct the placement stack + s.stack = feasible.NewSystemStack(true, s.ctx) + if !s.job.Stopped() { + s.setJob(s.job) + } + + // Compute the target job allocations + if err := s.computeJobAllocs(); err != nil { + s.logger.Error("failed to compute job allocations", "error", err) + return false, err + } + + // If the plan is a no-op, we can bail. If AnnotatePlan is set submit the plan + // anyways to get the annotations. + if s.plan.IsNoOp() && !s.eval.AnnotatePlan { + return true, nil + } + + // Submit the plan + if s.eval.AnnotatePlan { + s.plan.Annotations = s.planAnnotations + } + result, newState, err := s.planner.SubmitPlan(s.plan) + s.planResult = result + if err != nil { + return false, err + } + + // Decrement the number of allocations pending per task group based on the + // number of allocations successfully placed + adjustQueuedAllocations(s.logger, result, s.queuedAllocs) + + // If we got a state refresh, try again since we have stale data + if newState != nil { + s.logger.Debug("refresh forced") + s.state = newState + return false, nil + } + + // Try again if the plan was not fully committed, potential conflict + fullCommit, expected, actual := result.FullCommit(s.plan) + if !fullCommit { + s.logger.Debug("plan didn't fully commit", "attempted", expected, "placed", actual) + return false, nil + } + + // Success! + return true, nil +} + +// setJob updates the stack with the given job and job's node pool scheduler +// configuration. +func (s *SysBatchScheduler) setJob(job *structs.Job) error { + // Fetch node pool and global scheduler configuration to determine how to + // configure the scheduler. 
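+ // A node pool may override parts of the global scheduler configuration
+ // (for example the scheduler algorithm), so the two are merged below before
+ // being applied to the stack.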
+ pool, err := s.state.NodePoolByName(nil, job.NodePool) + if err != nil { + return fmt.Errorf("failed to get job node pool %q: %v", job.NodePool, err) + } + + _, schedConfig, err := s.state.SchedulerConfig() + if err != nil { + return fmt.Errorf("failed to get scheduler configuration: %v", err) + } + + s.stack.SetJob(job) + s.stack.SetSchedulerConfiguration(schedConfig.WithNodePool(pool)) + return nil +} + +// computeJobAllocs is used to reconcile differences between the job, +// existing allocations and node status to update the allocations. +func (s *SysBatchScheduler) computeJobAllocs() error { + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + allocs, err := s.state.AllocsByJob(ws, s.eval.Namespace, s.eval.JobID, true) + if err != nil { + return fmt.Errorf("failed to get allocs for job '%s': %v", s.eval.JobID, err) + } + + // Determine the tainted nodes containing job allocs + tainted, err := taintedNodes(s.state, allocs) + if err != nil { + return fmt.Errorf("failed to get tainted nodes for job '%s': %v", s.eval.JobID, err) + } + + // Update the allocations which are in pending/running state on tainted + // nodes to lost. + updateNonTerminalAllocsToLost(s.plan, tainted, allocs) + + // Split out terminal allocations + live, term := structs.SplitTerminalAllocs(allocs) + + // Diff the required and existing allocations + nr := reconciler.NewNodeReconciler(nil) + r := nr.Compute(s.job, s.nodes, s.notReadyNodes, tainted, live, term, + s.planner.ServersMeetMinimumVersion(minVersionMaxClientDisconnect, true)) + if s.logger.IsDebug() { + s.logger.Debug("reconciled current state with desired state", r.Fields()...) + } + + // Add all the allocs to stop + for _, e := range r.Stop { + s.plan.AppendStoppedAlloc(e.Alloc, sstructs.StatusAllocNotNeeded, "", "") + } + + // Add all the allocs to migrate + for _, e := range r.Migrate { + s.plan.AppendStoppedAlloc(e.Alloc, sstructs.StatusAllocNodeTainted, "", "") + } + + // Lost allocations should be transitioned to desired status stop and client + // status lost. + for _, e := range r.Lost { + s.plan.AppendStoppedAlloc(e.Alloc, sstructs.StatusAllocLost, structs.AllocClientStatusLost, "") + } + + for _, e := range r.Disconnecting { + s.plan.AppendUnknownAlloc(e.Alloc) + } + + allocExistsForTaskGroup := map[string]bool{} + // Attempt to do the upgrades in place. + // Reconnecting allocations need to be updated to persists alloc state + // changes. + updates := make([]reconciler.AllocTuple, 0, len(r.Update)+len(r.Reconnecting)) + updates = append(updates, r.Update...) + updates = append(updates, r.Reconnecting...) 
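+ // Allocations that cannot be updated in place come back as destructive
+ // updates and are replaced below via evict-and-place.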
+ destructiveUpdates, inplaceUpdates := inplaceUpdate(s.ctx, s.eval, s.job, s.stack, updates) + r.Update = destructiveUpdates + + for _, inplaceUpdate := range inplaceUpdates { + allocExistsForTaskGroup[inplaceUpdate.TaskGroup.Name] = true + } + + s.planAnnotations = &structs.PlanAnnotations{ + DesiredTGUpdates: desiredUpdates(r, inplaceUpdates, destructiveUpdates), + } + + // Treat non in-place updates as an eviction and new placement, which will + // be limited by max_parallel + s.limitReached = evictAndPlace(s.ctx, s.job, r, sstructs.StatusAllocUpdating) + + // Nothing remaining to do if placement is not required + if len(r.Place) == 0 { + if !s.job.Stopped() { + for _, tg := range s.job.TaskGroups { + s.queuedAllocs[tg.Name] = 0 + } + } + return nil + } + + // Record the number of allocations that needs to be placed per Task Group + for _, allocTuple := range r.Place { + s.queuedAllocs[allocTuple.TaskGroup.Name] += 1 + } + + for _, ignoredAlloc := range r.Ignore { + allocExistsForTaskGroup[ignoredAlloc.TaskGroup.Name] = true + } + + // Compute the placements + return s.computePlacements(r.Place, allocExistsForTaskGroup) +} + +// computePlacements computes placements for allocations +func (s *SysBatchScheduler) computePlacements(place []reconciler.AllocTuple, existingByTaskGroup map[string]bool) error { + nodeByID := make(map[string]*structs.Node, len(s.nodes)) + for _, node := range s.nodes { + nodeByID[node.ID] = node + } + + // track node filtering, to only report an error if all nodes have been filtered + var filteredMetrics map[string]*structs.AllocMetric + + nodes := make([]*structs.Node, 1) + for _, missing := range place { + tgName := missing.TaskGroup.Name + + node, ok := nodeByID[missing.Alloc.NodeID] + if !ok { + s.logger.Debug("could not find node", "node", missing.Alloc.NodeID) + continue + } + + // Update the set of placement nodes + nodes[0] = node + s.stack.SetNodes(nodes) + + // Attempt to match the task group + option := s.stack.Select(missing.TaskGroup, &feasible.SelectOptions{AllocName: missing.Name}) + + if option == nil { + // If the task can't be placed on this node, update reporting data + // and continue to short circuit the loop + + // If this node was filtered because of constraint + // mismatches and we couldn't create an allocation then + // decrement queuedAllocs for that task group. 
+ if s.ctx.Metrics().NodesFiltered > 0 { + queued := s.queuedAllocs[tgName] - 1 + s.queuedAllocs[tgName] = queued + + if filteredMetrics == nil { + filteredMetrics = map[string]*structs.AllocMetric{} + } + filteredMetrics[tgName] = mergeNodeFiltered(filteredMetrics[tgName], s.ctx.Metrics()) + + // If no tasks have been placed and there aren't any previously + // existing (ignored or updated) tasks on the node, mark the alloc as failed to be placed + if queued <= 0 && !existingByTaskGroup[tgName] { + if s.failedTGAllocs == nil { + s.failedTGAllocs = make(map[string]*structs.AllocMetric) + } + s.failedTGAllocs[tgName] = filteredMetrics[tgName] + } + + // If we are annotating the plan, then decrement the desired + // placements based on whether the node meets the constraints + if s.planAnnotations != nil && + s.planAnnotations.DesiredTGUpdates != nil { + s.planAnnotations.DesiredTGUpdates[tgName].Place -= 1 + } + + // Filtered nodes are not reported to users, just omitted from the job status + continue + } + + // Check if this task group has already failed, reported to the user as a count + if metric, ok := s.failedTGAllocs[tgName]; ok { + metric.CoalescedFailures += 1 + metric.ExhaustResources(missing.TaskGroup) + continue + } + + // Store the available nodes by datacenter + s.ctx.Metrics().NodesAvailable = s.nodesByDC + s.ctx.Metrics().NodesInPool = len(s.nodes) + s.ctx.Metrics().NodePool = s.job.NodePool + + // Compute top K scoring node metadata + s.ctx.Metrics().PopulateScoreMetaData() + + // Lazy initialize the failed map + if s.failedTGAllocs == nil { + s.failedTGAllocs = make(map[string]*structs.AllocMetric) + } + + // Update metrics with the resources requested by the task group. + s.ctx.Metrics().ExhaustResources(missing.TaskGroup) + + // Actual failure to start this task on this candidate node, report it individually + s.failedTGAllocs[tgName] = s.ctx.Metrics() + s.addBlocked(node) + + continue + } + + // Store the available nodes by datacenter + s.ctx.Metrics().NodesAvailable = s.nodesByDC + s.ctx.Metrics().NodesInPool = len(s.nodes) + + // Compute top K scoring node metadata + s.ctx.Metrics().PopulateScoreMetaData() + + // Set fields based on if we found an allocation option + resources := &structs.AllocatedResources{ + Tasks: option.TaskResources, + TaskLifecycles: option.TaskLifecycles, + Shared: structs.AllocatedSharedResources{ + DiskMB: int64(missing.TaskGroup.EphemeralDisk.SizeMB), + }, + } + + if option.AllocResources != nil { + resources.Shared.Networks = option.AllocResources.Networks + resources.Shared.Ports = option.AllocResources.Ports + } + + // Create an allocation for this + alloc := &structs.Allocation{ + ID: uuid.Generate(), + Namespace: s.job.Namespace, + EvalID: s.eval.ID, + Name: missing.Name, + JobID: s.job.ID, + TaskGroup: tgName, + Metrics: s.ctx.Metrics(), + NodeID: option.Node.ID, + NodeName: option.Node.Name, + TaskResources: resources.OldTaskResources(), + AllocatedResources: resources, + DesiredStatus: structs.AllocDesiredStatusRun, + ClientStatus: structs.AllocClientStatusPending, + // SharedResources is considered deprecated, will be removed in 0.11. 
+ // It is only set for compat reasons + SharedResources: &structs.Resources{ + DiskMB: missing.TaskGroup.EphemeralDisk.SizeMB, + Networks: resources.Shared.Networks, + }, + } + + // If the new allocation is replacing an older allocation then we record the + // older allocation id so that they are chained + if missing.Alloc != nil { + alloc.PreviousAllocation = missing.Alloc.ID + } + + // If this placement involves preemption, set DesiredState to evict for those allocations + if option.PreemptedAllocs != nil { + var preemptedAllocIDs []string + for _, stop := range option.PreemptedAllocs { + s.plan.AppendPreemptedAlloc(stop, alloc.ID) + + preemptedAllocIDs = append(preemptedAllocIDs, stop.ID) + if s.eval.AnnotatePlan && s.planAnnotations != nil { + s.planAnnotations.PreemptedAllocs = append(s.planAnnotations.PreemptedAllocs, stop.Stub(nil)) + if s.planAnnotations.DesiredTGUpdates != nil { + desired := s.planAnnotations.DesiredTGUpdates[tgName] + desired.Preemptions += 1 + } + } + } + alloc.PreemptedAllocations = preemptedAllocIDs + } + + s.plan.AppendAlloc(alloc, nil) + } + + return nil +} + +// addBlocked creates a new blocked eval for this job on this node +// and submit to the planner (worker.go), which keeps the eval for execution later +func (s *SysBatchScheduler) addBlocked(node *structs.Node) error { + e := s.ctx.Eligibility() + escaped := e.HasEscaped() + + // Only store the eligible classes if the eval hasn't escaped. + var classEligibility map[string]bool + if !escaped { + classEligibility = e.GetClasses() + } + + blocked := s.eval.CreateBlockedEval(classEligibility, escaped, e.QuotaLimitReached(), s.failedTGAllocs) + blocked.StatusDescription = sstructs.DescBlockedEvalFailedPlacements + blocked.NodeID = node.ID + + return s.planner.CreateEval(blocked) +} + +func (s *SysBatchScheduler) canHandle(trigger string) bool { + switch trigger { + case structs.EvalTriggerJobRegister: + case structs.EvalTriggerNodeUpdate: + case structs.EvalTriggerFailedFollowUp: + case structs.EvalTriggerJobDeregister: + case structs.EvalTriggerRollingUpdate: + case structs.EvalTriggerPreemption: + case structs.EvalTriggerDeploymentWatcher: + case structs.EvalTriggerNodeDrain: + case structs.EvalTriggerAllocStop: + case structs.EvalTriggerQueuedAllocs: + case structs.EvalTriggerScaling: + case structs.EvalTriggerReconnect: + default: + return trigger == structs.EvalTriggerPeriodicJob + } + return true +} + +// evictAndPlace is used to mark allocations for evicts and add them to the +// placement queue. evictAndPlace modifies the diffResult. It returns true if +// the limit has been reached for any task group. 
+func evictAndPlace(ctx feasible.Context, job *structs.Job, diff *reconciler.NodeReconcileResult, desc string) bool { + + limits := map[string]int{} // per task group limits + if !job.Stopped() { + jobLimit := len(diff.Update) + if job.Update.MaxParallel > 0 { + jobLimit = job.Update.MaxParallel + } + for _, tg := range job.TaskGroups { + if tg.Update != nil && tg.Update.MaxParallel > 0 { + limits[tg.Name] = tg.Update.MaxParallel + } else { + limits[tg.Name] = jobLimit + } + } + } + + limited := false + for _, a := range diff.Update { + if limit := limits[a.Alloc.TaskGroup]; limit > 0 { + ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "") + diff.Place = append(diff.Place, a) + if !a.Canary { + limits[a.Alloc.TaskGroup]-- + } + } else { + limited = true + } + } + return limited +} diff --git a/scheduler/scheduler_sysbatch_test.go b/scheduler/scheduler_sysbatch_test.go index 11d82c5dd86..9101acbdd04 100644 --- a/scheduler/scheduler_sysbatch_test.go +++ b/scheduler/scheduler_sysbatch_test.go @@ -1122,7 +1122,7 @@ func TestSysBatch_JobConstraint_RunMultiple(t *testing.T) { must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation - err := h.Process(NewSystemScheduler, eval) + err := h.Process(NewSysBatchScheduler, eval) must.NoError(t, err) // Create a mock evaluation to run the job again, which will not place any @@ -1138,7 +1138,7 @@ func TestSysBatch_JobConstraint_RunMultiple(t *testing.T) { } must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval2})) - err = h.Process(NewSystemScheduler, eval2) + err = h.Process(NewSysBatchScheduler, eval2) must.NoError(t, err) // Ensure a single plan @@ -1822,7 +1822,7 @@ func TestSysBatch_Preemption(t *testing.T) { func TestSysBatch_canHandle(t *testing.T) { ci.Parallel(t) - s := SystemScheduler{sysbatch: true} + s := SysBatchScheduler{} t.Run("sysbatch register", func(t *testing.T) { must.True(t, s.canHandle(structs.EvalTriggerJobRegister)) }) diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index 9e40aa72e51..73f32f264cc 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -5,7 +5,10 @@ package scheduler import ( "fmt" + "maps" + "math" "runtime/debug" + "slices" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" @@ -21,22 +24,17 @@ const ( // we will attempt to schedule if we continue to hit conflicts for system // jobs. maxSystemScheduleAttempts = 5 - - // maxSysBatchScheduleAttempts is used to limit the number of times we will - // attempt to schedule if we continue to hit conflicts for sysbatch jobs. - maxSysBatchScheduleAttempts = 2 ) -// SystemScheduler is used for 'system' and 'sysbatch' jobs. This scheduler is -// designed for jobs that should be run on every client. The 'system' mode -// will ensure those jobs continuously run regardless of successful task exits, -// whereas 'sysbatch' considers the task complete on success. +// SystemScheduler is used for 'system' jobs. This scheduler is designed for +// jobs that should be run on every client. The 'system' mode will ensure those +// jobs continuously run regardless of successful task exits, whereas 'sysbatch' +// considers the task complete on success. 
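The per-task-group limits built by evictAndPlace above follow a simple precedence: a group's own update.max_parallel wins, otherwise the job-level max_parallel applies, and if neither is set the number of pending updates is used, which effectively means no cap. A minimal, self-contained sketch of that precedence, using an illustrative helper name rather than the scheduler's real types:

package main

import "fmt"

// updateLimit mirrors the precedence described above: the group's own
// update.max_parallel wins, then the job-level max_parallel, and otherwise
// the number of pending updates (no cap). Hypothetical helper, for
// illustration only.
func updateLimit(tgMaxParallel, jobMaxParallel, pendingUpdates int) int {
	if tgMaxParallel > 0 {
		return tgMaxParallel
	}
	if jobMaxParallel > 0 {
		return jobMaxParallel
	}
	return pendingUpdates
}

func main() {
	// Six destructive updates pending, job max_parallel = 2, no group
	// override: only two allocations are evicted and re-placed per eval.
	fmt.Println(updateLimit(0, 2, 6)) // 2

	// A group-level override takes precedence over the job-level setting.
	fmt.Println(updateLimit(4, 2, 6)) // 4
}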
type SystemScheduler struct { logger log.Logger eventsCh chan<- interface{} state sstructs.State planner sstructs.Planner - sysbatch bool eval *structs.Evaluation job *structs.Job @@ -49,15 +47,43 @@ type SystemScheduler struct { notReadyNodes map[string]struct{} nodesByDC map[string]int - deployment *structs.Deployment - - limitReached bool + deployment *structs.Deployment + nodesForTG map[string]taskGroupNodes // used to track node feasibility information for each TG + filteredNodeMetricsForTG map[string]*structs.AllocMetric // used to track filtered node metrics for each TG failedTGAllocs map[string]*structs.AllocMetric queuedAllocs map[string]int planAnnotations *structs.PlanAnnotations } +// taskGroupNodes are a collection of taskGroupNode which include +// node feasibility information for a task group. +type taskGroupNodes []*taskGroupNode + +// feasible returns all taskGroupNode that are feasible for placement +func (t taskGroupNodes) feasible() (feasibleNodes []*taskGroupNode) { + for _, tgn := range t { + if tgn.isFeasible() { + feasibleNodes = append(feasibleNodes, tgn) + } + } + + return +} + +// taskGroupNode is a container for holding node feasibility +// information for a task group. +type taskGroupNode struct { + NodeID string + RankedNode *feasible.RankedNode + Metrics *structs.AllocMetric +} + +// isFeasible returns if the node is feasible for the task group. +func (t *taskGroupNode) isFeasible() bool { + return t.RankedNode != nil +} + // NewSystemScheduler is a factory function to instantiate a new system // scheduler. func NewSystemScheduler(logger log.Logger, eventsCh chan<- interface{}, state sstructs.State, planner sstructs.Planner) sstructs.Scheduler { @@ -66,17 +92,6 @@ func NewSystemScheduler(logger log.Logger, eventsCh chan<- interface{}, state ss eventsCh: eventsCh, state: state, planner: planner, - sysbatch: false, - } -} - -func NewSysBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state sstructs.State, planner sstructs.Planner) sstructs.Scheduler { - return &SystemScheduler{ - logger: logger.Named("sysbatch_sched"), - eventsCh: eventsCh, - state: state, - planner: planner, - sysbatch: true, } } @@ -105,9 +120,6 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) (err error) { } limit := maxSystemScheduleAttempts - if s.sysbatch { - limit = maxSysBatchScheduleAttempts - } // Retry up to the maxSystemScheduleAttempts and reset if progress is made. 
progress := func() bool { return progressMade(s.planResult) } @@ -152,15 +164,13 @@ func (s *SystemScheduler) process() (bool, error) { } } - if !s.sysbatch { - s.deployment, err = s.state.LatestDeploymentByJobID(ws, s.eval.Namespace, s.eval.JobID) - if err != nil { - return false, fmt.Errorf("failed to get deployment for job %q: %w", s.eval.JobID, err) - } - // system deployments may be mutated in the reconciler because the node - // count can change between evaluations - s.deployment = s.deployment.Copy() + s.deployment, err = s.state.LatestDeploymentByJobID(ws, s.eval.Namespace, s.eval.JobID) + if err != nil { + return false, fmt.Errorf("failed to get deployment for job %q: %w", s.eval.JobID, err) } + // system deployments may be mutated in the reconciler because the node + // count can change between evaluations + s.deployment = s.deployment.Copy() // Create a plan s.plan = s.eval.MakePlan(s.job) @@ -172,7 +182,7 @@ func (s *SystemScheduler) process() (bool, error) { s.ctx = feasible.NewEvalContext(s.eventsCh, s.state, s.plan, s.logger) // Construct the placement stack - s.stack = feasible.NewSystemStack(s.sysbatch, s.ctx) + s.stack = feasible.NewSystemStack(false, s.ctx) if !s.job.Stopped() { s.setJob(s.job) } @@ -266,38 +276,34 @@ func (s *SystemScheduler) computeJobAllocs() error { // Diff the required and existing allocations nr := reconciler.NewNodeReconciler(s.deployment) - r := nr.Compute(s.job, s.nodes, s.notReadyNodes, tainted, live, term, - s.planner.ServersMeetMinimumVersion(minVersionMaxClientDisconnect, true)) + reconciliationResult := nr.Compute(s.job, s.nodes, s.notReadyNodes, tainted, + live, term, s.planner.ServersMeetMinimumVersion(minVersionMaxClientDisconnect, true)) if s.logger.IsDebug() { - s.logger.Debug("reconciled current state with desired state", r.Fields()...) + s.logger.Debug("reconciled current state with desired state", reconciliationResult.Fields()...) } - // Add the deployment changes to the plan - s.plan.Deployment = nr.DeploymentCurrent - s.plan.DeploymentUpdates = nr.DeploymentUpdates - // Update the stored deployment if nr.DeploymentCurrent != nil { s.deployment = nr.DeploymentCurrent } // Add all the allocs to stop - for _, e := range r.Stop { + for _, e := range reconciliationResult.Stop { s.plan.AppendStoppedAlloc(e.Alloc, sstructs.StatusAllocNotNeeded, "", "") } // Add all the allocs to migrate - for _, e := range r.Migrate { + for _, e := range reconciliationResult.Migrate { s.plan.AppendStoppedAlloc(e.Alloc, sstructs.StatusAllocNodeTainted, "", "") } // Lost allocations should be transitioned to desired status stop and client // status lost. - for _, e := range r.Lost { + for _, e := range reconciliationResult.Lost { s.plan.AppendStoppedAlloc(e.Alloc, sstructs.StatusAllocLost, structs.AllocClientStatusLost, "") } - for _, e := range r.Disconnecting { + for _, e := range reconciliationResult.Disconnecting { s.plan.AppendUnknownAlloc(e.Alloc) } @@ -305,88 +311,214 @@ func (s *SystemScheduler) computeJobAllocs() error { // Attempt to do the upgrades in place. // Reconnecting allocations need to be updated to persists alloc state // changes. - updates := make([]reconciler.AllocTuple, 0, len(r.Update)+len(r.Reconnecting)) - updates = append(updates, r.Update...) - updates = append(updates, r.Reconnecting...) + updates := make([]reconciler.AllocTuple, 0, len(reconciliationResult.Update)+len(reconciliationResult.Reconnecting)) + updates = append(updates, reconciliationResult.Update...) + updates = append(updates, reconciliationResult.Reconnecting...) 
destructiveUpdates, inplaceUpdates := inplaceUpdate(s.ctx, s.eval, s.job, s.stack, updates) - r.Update = destructiveUpdates + reconciliationResult.Update = destructiveUpdates for _, inplaceUpdate := range inplaceUpdates { allocExistsForTaskGroup[inplaceUpdate.TaskGroup.Name] = true } s.planAnnotations = &structs.PlanAnnotations{ - DesiredTGUpdates: desiredUpdates(r, inplaceUpdates, destructiveUpdates), + DesiredTGUpdates: desiredUpdates(reconciliationResult, inplaceUpdates, destructiveUpdates), + } + + // Gather node information for all the task groups. + s.nodesForTG, s.filteredNodeMetricsForTG = s.findNodesForTG(reconciliationResult) + + // Set placement totals for task groups. The totals will be the available + // feasible nodes for each group. + for tgName := range s.nodesForTG { + s.planAnnotations.DesiredTGUpdates[tgName].Place = uint64(len(s.nodesForTG[tgName].feasible())) } // Treat non in-place updates as an eviction and new placement, which will // be limited by max_parallel - s.limitReached = evictAndPlace(s.ctx, s.job, r, sstructs.StatusAllocUpdating) + s.evictAndPlace(reconciliationResult, sstructs.StatusAllocUpdating) - // Nothing remaining to do if placement is not required - if len(r.Place) == 0 { - if !s.job.Stopped() { - for _, tg := range s.job.TaskGroups { - s.queuedAllocs[tg.Name] = 0 - } + if !s.job.Stopped() { + for _, tg := range s.job.TaskGroups { + s.queuedAllocs[tg.Name] = 0 } - return nil } // Record the number of allocations that needs to be placed per Task Group - for _, allocTuple := range r.Place { + for _, allocTuple := range reconciliationResult.Place { s.queuedAllocs[allocTuple.TaskGroup.Name] += 1 } - for _, ignoredAlloc := range r.Ignore { + for _, ignoredAlloc := range reconciliationResult.Ignore { allocExistsForTaskGroup[ignoredAlloc.TaskGroup.Name] = true } // Compute the placements - return s.computePlacements(r.Place, allocExistsForTaskGroup) -} + if err := s.computePlacements(reconciliationResult, allocExistsForTaskGroup); err != nil { + return err + } + + // if there is no deployment we're done at this point + if s.deployment == nil { + return nil + } + + // we only know the total amount of placements once we filter out infeasible + // nodes, so for system jobs we do it backwards a bit: the "desired" total + // is the total we were able to place. + // track if any of the task groups is doing a canary update now + deploymentComplete := true + for _, tg := range s.job.TaskGroups { + feasibleNodes := s.nodesForTG[tg.Name].feasible() + if len(feasibleNodes) < 1 { + // this will happen if we're seeing a TG that shouldn't be placed. + // + // in case the deployment is in a successful state, this indicate a + // noop eval due to infeasible nodes. In this case we set the dstate + // for this task group to nil. 
+			if s.deployment.Status == structs.DeploymentStatusSuccessful {
+				s.deployment.TaskGroups[tg.Name] = nil
+			}
+
+			continue
+		}
+
+		dstate, ok := s.deployment.TaskGroups[tg.Name]
+		// no deployment for this TG
+		if !ok {
+			continue
+		}
+
+		// we can set the desired total now, it's always the number of
+		// feasible nodes
+		s.deployment.TaskGroups[tg.Name].DesiredTotal = len(feasibleNodes)
+
+		// a system job is canarying if:
+		// - it has a non-empty update block (just a sanity check, all
+		//   submitted jobs should have a non-empty update block as part of
+		//   canonicalization)
+		// - canary parameter in the update block has to be positive
+		// - deployment has to be non-nil and it cannot have been promoted
+		// - this cannot be the initial job version
+		isCanarying := !tg.Update.IsEmpty() &&
+			tg.Update.Canary > 0 &&
+			dstate != nil &&
+			!dstate.Promoted &&
+			s.job.Version != 0
+
+		if isCanarying {
+			// we can now also set the desired canaries: it's the tg.Update.Canary
+			// percent of all the feasible nodes, rounded up to the nearest int
+			requiredCanaries := int(math.Ceil(float64(tg.Update.Canary) * float64(len(feasibleNodes)) / 100))
+			s.deployment.TaskGroups[tg.Name].DesiredCanaries = requiredCanaries
+
+			// Initially, if the job requires canaries, we place all of them on
+			// all eligible nodes. At this point we know which nodes are
+			// feasible, so we evict unneeded canaries.
+			placedCanaries := s.evictUnneededCanaries(requiredCanaries, tg.Name, reconciliationResult)
+
+			// Update deployment and plan annotation with canaries that were placed
+			s.deployment.TaskGroups[tg.Name].PlacedCanaries = placedCanaries
+			s.planAnnotations.DesiredTGUpdates[tg.Name].Canary = uint64(len(placedCanaries))
+		}
-func mergeNodeFiltered(acc, curr *structs.AllocMetric) *structs.AllocMetric {
-	if acc == nil {
-		return curr.Copy()
+		groupComplete := s.isDeploymentComplete(dstate, isCanarying)
+		deploymentComplete = deploymentComplete && groupComplete
 	}
-	acc.NodesEvaluated += curr.NodesEvaluated
-	acc.NodesFiltered += curr.NodesFiltered
+	// adjust the deployment updates and set the right deployment status
+	nr.DeploymentUpdates = append(nr.DeploymentUpdates, s.setDeploymentStatusAndUpdates(deploymentComplete, s.job)...)
-	if acc.ClassFiltered == nil {
-		acc.ClassFiltered = make(map[string]int)
+	// Check if perhaps we're dealing with a nil deployment, i.e., a deployment
+	// which is in successful state and where all task groups have a nil dstate.
+	// In this case, set the deployment to nil.
+	nilDstates := true
+	for _, tg := range s.deployment.TaskGroups {
+		if tg != nil {
+			nilDstates = false
+		}
 	}
-	for k, v := range curr.ClassFiltered {
-		acc.ClassFiltered[k] += v
+	if nilDstates {
+		s.deployment = nil
+		nr.DeploymentUpdates = nil
 	}
-	if acc.ConstraintFiltered == nil {
-		acc.ConstraintFiltered = make(map[string]int)
+
+	// Add the deployment changes to the plan
+	s.plan.Deployment = s.deployment
+	s.plan.DeploymentUpdates = nr.DeploymentUpdates
+
+	return nil
+}
+
+// findNodesForTG runs feasibility checks on nodes. The result includes all nodes for each
+// task group (feasible and infeasible) along with metrics information on the checks.
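The canary sizing rule applied in computeJobAllocs above is a ceiling of a percentage: tg.Update.Canary percent of the feasible nodes, rounded up to a whole allocation. A standalone sketch of that arithmetic, using a hypothetical helper name that is not part of the scheduler:

package main

import (
	"fmt"
	"math"
)

// requiredCanaries sketches the sizing rule: the update block's canary value
// is a percentage of the feasible nodes, rounded up to a whole allocation.
func requiredCanaries(canaryPercent, feasibleNodes int) int {
	return int(math.Ceil(float64(canaryPercent) * float64(feasibleNodes) / 100))
}

func main() {
	// canary = 30 with 7 feasible nodes: ceil(2.1) = 3 canaries.
	fmt.Println(requiredCanaries(30, 7)) // 3

	// canary = 100 (blue/green) places a canary on every feasible node.
	fmt.Println(requiredCanaries(100, 4)) // 4
}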
+func (s *SystemScheduler) findNodesForTG(buckets *reconciler.NodeReconcileResult) (tgNodes map[string]taskGroupNodes, filteredMetrics map[string]*structs.AllocMetric) { + tgNodes = make(map[string]taskGroupNodes) + filteredMetrics = make(map[string]*structs.AllocMetric) + + nodeByID := make(map[string]*structs.Node, len(s.nodes)) + for _, node := range s.nodes { + nodeByID[node.ID] = node } - for k, v := range curr.ConstraintFiltered { - acc.ConstraintFiltered[k] += v + + nodes := make([]*structs.Node, 1) + for _, a := range slices.Concat(buckets.Place, buckets.Update, buckets.Ignore) { + tgName := a.TaskGroup.Name + if tgNodes[tgName] == nil { + tgNodes[tgName] = taskGroupNodes{} + } + + node, ok := nodeByID[a.Alloc.NodeID] + if !ok { + s.logger.Debug("could not find node", "node", a.Alloc.NodeID) + continue + } + + // Update the set of placement nodes + nodes[0] = node + s.stack.SetNodes(nodes) + + if a.Alloc.ID != "" { + // temporarily include the old alloc from a destructive update so + // that we can account for resources that will be freed by that + // allocation. We'll back this change out if we end up needing to + // limit placements by max_parallel or canaries. + s.plan.AppendStoppedAlloc(a.Alloc, sstructs.StatusAllocUpdating, "", "") + } + + // Attempt to match the task group + option := s.stack.Select(a.TaskGroup, &feasible.SelectOptions{AllocName: a.Name}) + + // Always store the results. Keep the metrics that were generated + // for the match attempt so they can be used during placement. + tgNodes[tgName] = append(tgNodes[tgName], &taskGroupNode{node.ID, option, s.ctx.Metrics().Copy()}) + + if option == nil { + // When no match is found, merge the filter metrics for the task + // group so proper reporting can be done during placement. 
+ filteredMetrics[tgName] = mergeNodeFiltered(filteredMetrics[tgName], s.ctx.Metrics()) + } } - acc.AllocationTime += curr.AllocationTime - return acc + + return } // computePlacements computes placements for allocations -func (s *SystemScheduler) computePlacements(place []reconciler.AllocTuple, existingByTaskGroup map[string]bool) error { +func (s *SystemScheduler) computePlacements( + reconcilerResult *reconciler.NodeReconcileResult, existingByTaskGroup map[string]bool, +) error { + nodeByID := make(map[string]*structs.Node, len(s.nodes)) for _, node := range s.nodes { nodeByID[node.ID] = node } - // track node filtering, to only report an error if all nodes have been filtered - var filteredMetrics map[string]*structs.AllocMetric - var deploymentID string if s.deployment != nil && s.deployment.Active() { deploymentID = s.deployment.ID } - nodes := make([]*structs.Node, 1) - for _, missing := range place { + for _, missing := range reconcilerResult.Place { tgName := missing.TaskGroup.Name node, ok := nodeByID[missing.Alloc.NodeID] @@ -395,12 +527,25 @@ func (s *SystemScheduler) computePlacements(place []reconciler.AllocTuple, exist continue } - // Update the set of placement nodes - nodes[0] = node - s.stack.SetNodes(nodes) - - // Attempt to match the task group - option := s.stack.Select(missing.TaskGroup, &feasible.SelectOptions{AllocName: missing.Name}) + // we've already performed feasibility check for all the task groups and + // nodes, so look up + var option *feasible.RankedNode + var metrics *structs.AllocMetric + optionsForTG := s.nodesForTG[tgName] + if existing := slices.IndexFunc( + optionsForTG, + func(rn *taskGroupNode) bool { return rn.NodeID == node.ID }, + ); existing != -1 { + option = optionsForTG[existing].RankedNode + metrics = optionsForTG[existing].Metrics + } else { + // we should have an entry for every node that is looked + // up. if we don't, something must be wrong + s.logger.Error("failed to locate node feasibility information", + "node-id", node.ID, "task_group", tgName) + // provide a stubbed metric to work with + metrics = &structs.AllocMetric{} + } if option == nil { // If the task can't be placed on this node, update reporting data @@ -409,15 +554,10 @@ func (s *SystemScheduler) computePlacements(place []reconciler.AllocTuple, exist // If this node was filtered because of constraint // mismatches and we couldn't create an allocation then // decrement queuedAllocs for that task group. 
- if s.ctx.Metrics().NodesFiltered > 0 { + if metrics.NodesFiltered > 0 { queued := s.queuedAllocs[tgName] - 1 s.queuedAllocs[tgName] = queued - if filteredMetrics == nil { - filteredMetrics = map[string]*structs.AllocMetric{} - } - filteredMetrics[tgName] = mergeNodeFiltered(filteredMetrics[tgName], s.ctx.Metrics()) - // If no tasks have been placed and there aren't any previously // existing (ignored or updated) tasks on the node, mark the alloc as failed to be placed // if queued <= 0 && !existingByTaskGroup[tgName] { @@ -425,18 +565,7 @@ func (s *SystemScheduler) computePlacements(place []reconciler.AllocTuple, exist if s.failedTGAllocs == nil { s.failedTGAllocs = make(map[string]*structs.AllocMetric) } - s.failedTGAllocs[tgName] = filteredMetrics[tgName] - } - - // If we are annotating the plan, then decrement the desired - // placements based on whether the node meets the constraints - if s.planAnnotations != nil && - s.planAnnotations.DesiredTGUpdates != nil { - s.planAnnotations.DesiredTGUpdates[tgName].Place -= 1 - } - - if s.plan.Deployment != nil { - s.deployment.TaskGroups[tgName].DesiredTotal -= 1 + s.failedTGAllocs[tgName] = s.filteredNodeMetricsForTG[tgName] } // Filtered nodes are not reported to users, just omitted from the job status @@ -451,12 +580,12 @@ func (s *SystemScheduler) computePlacements(place []reconciler.AllocTuple, exist } // Store the available nodes by datacenter - s.ctx.Metrics().NodesAvailable = s.nodesByDC - s.ctx.Metrics().NodesInPool = len(s.nodes) - s.ctx.Metrics().NodePool = s.job.NodePool + metrics.NodesAvailable = s.nodesByDC + metrics.NodesInPool = len(s.nodes) + metrics.NodePool = s.job.NodePool // Compute top K scoring node metadata - s.ctx.Metrics().PopulateScoreMetaData() + metrics.PopulateScoreMetaData() // Lazy initialize the failed map if s.failedTGAllocs == nil { @@ -464,21 +593,21 @@ func (s *SystemScheduler) computePlacements(place []reconciler.AllocTuple, exist } // Update metrics with the resources requested by the task group. - s.ctx.Metrics().ExhaustResources(missing.TaskGroup) + metrics.ExhaustResources(missing.TaskGroup) // Actual failure to start this task on this candidate node, report it individually - s.failedTGAllocs[tgName] = s.ctx.Metrics() + s.failedTGAllocs[tgName] = metrics s.addBlocked(node) continue } // Store the available nodes by datacenter - s.ctx.Metrics().NodesAvailable = s.nodesByDC - s.ctx.Metrics().NodesInPool = len(s.nodes) + metrics.NodesAvailable = s.nodesByDC + metrics.NodesInPool = len(s.nodes) // Compute top K scoring node metadata - s.ctx.Metrics().PopulateScoreMetaData() + metrics.PopulateScoreMetaData() // Set fields based on if we found an allocation option resources := &structs.AllocatedResources{ @@ -502,7 +631,7 @@ func (s *SystemScheduler) computePlacements(place []reconciler.AllocTuple, exist Name: missing.Name, JobID: s.job.ID, TaskGroup: tgName, - Metrics: s.ctx.Metrics(), + Metrics: metrics, NodeID: option.Node.ID, NodeName: option.Node.Name, DeploymentID: deploymentID, @@ -590,28 +719,23 @@ func (s *SystemScheduler) canHandle(trigger string) bool { case structs.EvalTriggerScaling: case structs.EvalTriggerReconnect: default: - switch s.sysbatch { - case true: - return trigger == structs.EvalTriggerPeriodicJob - case false: - return false - } + return false } return true } // evictAndPlace is used to mark allocations for evicts and add them to the -// placement queue. evictAndPlace modifies the diffResult. It returns true if -// the limit has been reached for any task group. 
-func evictAndPlace(ctx feasible.Context, job *structs.Job, diff *reconciler.NodeReconcileResult, desc string) bool {
+// placement queue. evictAndPlace modifies the reconciler result and the plan
+// in place, capping destructive updates by each task group's max_parallel.
+func (s *SystemScheduler) evictAndPlace(reconciled *reconciler.NodeReconcileResult, desc string) {
 	limits := map[string]int{} // per task group limits
-	if !job.Stopped() {
-		jobLimit := len(diff.Update)
-		if job.Update.MaxParallel > 0 {
-			jobLimit = job.Update.MaxParallel
+	if !s.job.Stopped() {
+		jobLimit := len(reconciled.Update)
+		if s.job.Update.MaxParallel > 0 {
+			jobLimit = s.job.Update.MaxParallel
 		}
-		for _, tg := range job.TaskGroups {
+		for _, tg := range s.job.TaskGroups {
 			if tg.Update != nil && tg.Update.MaxParallel > 0 {
 				limits[tg.Name] = tg.Update.MaxParallel
 			} else {
@@ -620,17 +744,194 @@ func evictAndPlace(ctx feasible.Context, job *structs.Job, diff *reconciler.Node
 		}
 	}
 
-	limited := false
-	for _, a := range diff.Update {
+	// The findNodesForTG method marks all old allocs from a destructive
+	// update as stopped in order to get an accurate feasible nodes count
+	// (accounting for resources that would be freed). Now we need to remove all
+	// the allocs that have StatusAllocUpdating from the set we're stopping (NodeUpdate) and
+	// only place and stop the ones that correspond to updates limited by max_parallel.
+	for node, allocations := range s.plan.NodeUpdate {
+		n := 0
+		for _, alloc := range allocations {
+			if alloc.DesiredDescription != sstructs.StatusAllocUpdating {
+				allocations[n] = alloc
+				n += 1
+			}
+		}
+		s.plan.NodeUpdate[node] = allocations[:n]
+	}
+
+	for _, a := range reconciled.Update {
 		if limit := limits[a.Alloc.TaskGroup]; limit > 0 {
-			ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "")
-			diff.Place = append(diff.Place, a)
-			if !a.Canary {
-				limits[a.Alloc.TaskGroup]--
+			s.ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "")
+			reconciled.Place = append(reconciled.Place, a)
+
+			limits[a.Alloc.TaskGroup]--
+		}
+	}
+
+	// it may be the case that there are keys in the NodeUpdate that are empty.
+	// We should delete them, otherwise the plan won't be correctly recognized
+	// as a no-op.
+	maps.DeleteFunc(s.plan.NodeUpdate, func(k string, v []*structs.Allocation) bool {
+		return len(v) == 0
+	})
+}
+
+// evictUnneededCanaries checks how many canaries are needed against the number
+// of feasible nodes, and removes unnecessary placements from the plan.
+func (s *SystemScheduler) evictUnneededCanaries(requiredCanaries int, tgName string, buckets *reconciler.NodeReconcileResult) []string {
+
+	desiredCanaries := make([]string, 0)
+
+	// no canaries to consider, quit early
+	if requiredCanaries == 0 {
+		return desiredCanaries
+	}
+
+	canaryCounter := requiredCanaries
+
+	// Start with finding any existing failed canaries
+	failedCanaries := map[string]struct{}{}
+	for _, alloc := range buckets.Place {
+		if alloc.Alloc != nil && alloc.Alloc.DeploymentStatus != nil && alloc.Alloc.DeploymentStatus.Canary {
+			failedCanaries[alloc.Alloc.ID] = struct{}{}
+		}
+	}
+
+	// Generate a list of preferred allocations for
+	// canaries: these are replacements for existing
+	// canaries that have failed.
+	preferCanary := map[string]struct{}{}
+	for _, allocations := range s.plan.NodeAllocation {
+		for _, alloc := range allocations {
+			if _, ok := failedCanaries[alloc.PreviousAllocation]; ok {
+				preferCanary[alloc.ID] = struct{}{}
+			}
+		}
+	}
+
+	// Remove the number of preferred canaries found
+	// from the counter.
+	canaryCounter -= len(preferCanary)
+
+	// Check for any canaries that are already running. For any
+	// that are found, add to the desired list and decrement
+	// the counter.
+	for _, tuple := range buckets.Ignore {
+		if tuple.TaskGroup.Name == tgName && tuple.Alloc != nil &&
+			tuple.Alloc.DeploymentStatus != nil && tuple.Alloc.DeploymentStatus.Canary {
+			desiredCanaries = append(desiredCanaries, tuple.Alloc.ID)
+			canaryCounter--
+		}
+	}
+
+	// iterate over node allocations to find canary allocs
+	for node, allocations := range s.plan.NodeAllocation {
+		n := 0
+		for _, alloc := range allocations {
+			// these are the allocs we keep
+			if alloc.DeploymentStatus == nil || !alloc.DeploymentStatus.Canary || alloc.TaskGroup != tgName {
+				allocations[n] = alloc
+				n += 1
+				continue
+			}
+
+			// if it's a canary, we only keep up to the required number of
+			// them
+			if alloc.DeploymentStatus.Canary {
+				// Check if this is a preferred allocation for the canary
+				_, preferred := preferCanary[alloc.ID]
+
+				// If it is a preferred allocation, or the counter is not exhausted,
+				// keep the allocation
+				if canaryCounter > 0 || preferred {
+					canaryCounter -= 1
+					desiredCanaries = append(desiredCanaries, alloc.ID)
+					allocations[n] = alloc
+					n += 1
+				} else {
+					// If the counter has been exhausted the allocation will not be
+					// placed, but a stop will have been appended for the update.
+					// Locate it and remove it.
+					idx := slices.IndexFunc(s.plan.NodeUpdate[alloc.NodeID], func(a *structs.Allocation) bool {
+						return a.ID == alloc.PreviousAllocation
+					})
+					if idx > -1 {
+						s.plan.NodeUpdate[alloc.NodeID] = append(s.plan.NodeUpdate[alloc.NodeID][0:idx], s.plan.NodeUpdate[alloc.NodeID][idx+1:]...)
+					}
+				}
 			}
-		} else {
-			limited = true
 		}
+
+		// because of this nifty trick we don't need to allocate an extra slice
+		s.plan.NodeAllocation[node] = allocations[:n]
 	}
-	return limited
+
+	return desiredCanaries
+}
+
+func (s *SystemScheduler) isDeploymentComplete(dstate *structs.DeploymentState, isCanarying bool) bool {
+	if s.deployment == nil || isCanarying {
+		return false
+	}
+
+	complete := true
+
+	// ensure everything is healthy
+	if dstate.HealthyAllocs < dstate.DesiredTotal { // Make sure we have enough healthy allocs
+		complete = false
+	}
+
+	return complete
+}
+
+func (s *SystemScheduler) setDeploymentStatusAndUpdates(deploymentComplete bool, job *structs.Job) []*structs.DeploymentStatusUpdate {
+	statusUpdates := []*structs.DeploymentStatusUpdate{}
+
+	if d := s.deployment; d != nil {
+
+		// Deployments that require promotion should have appropriate status set
+		// immediately, no matter their completeness.
+ if d.RequiresPromotion() { + if d.HasAutoPromote() { + d.StatusDescription = structs.DeploymentStatusDescriptionRunningAutoPromotion + } else { + d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion + } + return statusUpdates + } + + // Mark the deployment as complete if possible + if deploymentComplete { + if job.IsMultiregion() { + // the unblocking/successful states come after blocked, so we + // need to make sure we don't revert those states + if d.Status != structs.DeploymentStatusUnblocking && + d.Status != structs.DeploymentStatusSuccessful { + statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{ + DeploymentID: s.deployment.ID, + Status: structs.DeploymentStatusBlocked, + StatusDescription: structs.DeploymentStatusDescriptionBlocked, + }) + } + } else { + statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{ + DeploymentID: s.deployment.ID, + Status: structs.DeploymentStatusSuccessful, + StatusDescription: structs.DeploymentStatusDescriptionSuccessful, + }) + } + } + + // Mark the deployment as pending since its state is now computed. + if d.Status == structs.DeploymentStatusInitializing { + statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{ + DeploymentID: s.deployment.ID, + Status: structs.DeploymentStatusPending, + StatusDescription: structs.DeploymentStatusDescriptionPendingForPeer, + }) + } + } + + return statusUpdates } diff --git a/scheduler/scheduler_system_test.go b/scheduler/scheduler_system_test.go index eeed3d7827a..8a93eeb3097 100644 --- a/scheduler/scheduler_system_test.go +++ b/scheduler/scheduler_system_test.go @@ -330,7 +330,7 @@ func TestSystemSched_JobRegister_Annotate(t *testing.T) { h := tests.NewHarness(t) // Create some nodes - for i := 0; i < 10; i++ { + for i := range 10 { node := mock.Node() if i < 9 { node.NodeClass = "foo" @@ -364,15 +364,10 @@ func TestSystemSched_JobRegister_Annotate(t *testing.T) { must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation - err := h.Process(NewSystemScheduler, eval) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, h.Process(NewSystemScheduler, eval)) // Ensure a single plan - if len(h.Plans) != 1 { - t.Fatalf("bad: %#v", h.Plans) - } + must.SliceLen(t, 1, h.Plans) plan := h.Plans[0] // Ensure the plan allocated @@ -380,9 +375,7 @@ func TestSystemSched_JobRegister_Annotate(t *testing.T) { for _, allocList := range plan.NodeAllocation { planned = append(planned, allocList...) } - if len(planned) != 9 { - t.Fatalf("bad: %#v %d", planned, len(planned)) - } + must.SliceLen(t, 9, planned) // Lookup the allocations by JobID ws := memdb.NewWatchSet() @@ -390,9 +383,7 @@ func TestSystemSched_JobRegister_Annotate(t *testing.T) { must.NoError(t, err) // Ensure all allocations placed - if len(out) != 9 { - t.Fatalf("bad: %#v", out) - } + must.SliceLen(t, 9, out) // Check the available nodes if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 10 { @@ -404,23 +395,14 @@ func TestSystemSched_JobRegister_Annotate(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) // Ensure the plan had annotations. 
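The status handling in setDeploymentStatusAndUpdates above resolves the deployment in a fixed order: promotion-gated deployments are only re-described, completed deployments become successful (or blocked for multiregion jobs), and initializing deployments move to pending. A simplified sketch of that ordering, collapsing the status-update structs into plain strings and omitting the multiregion blocked/unblocking branch:

package main

import "fmt"

// deploymentDescription is a pared-down model of the transitions above; the
// real method emits DeploymentStatusUpdate records rather than a string and
// also handles multiregion blocked/unblocking states.
func deploymentDescription(requiresPromotion, autoPromote, complete, initializing bool) string {
	if requiresPromotion {
		if autoPromote {
			return "running, awaiting automatic promotion"
		}
		return "running, deployment needs promotion"
	}
	if complete {
		return "successful"
	}
	if initializing {
		return "pending"
	}
	return "running"
}

func main() {
	// An unpromoted canarying group keeps the deployment non-terminal and
	// flags it for promotion.
	fmt.Println(deploymentDescription(true, false, false, false))

	// Once every group's healthy count reaches its desired total, the
	// deployment is marked successful.
	fmt.Println(deploymentDescription(false, false, true, false))
}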
- if plan.Annotations == nil { - t.Fatalf("expected annotations") - } + must.NotNil(t, plan.Annotations) desiredTGs := plan.Annotations.DesiredTGUpdates - if l := len(desiredTGs); l != 1 { - t.Fatalf("incorrect number of task groups; got %v; want %v", l, 1) - } + must.MapLen(t, 1, desiredTGs, must.Sprint("incorrect number of task groups")) desiredChanges, ok := desiredTGs["web"] - if !ok { - t.Fatalf("expected task group web to have desired changes") - } - - expected := &structs.DesiredUpdates{Place: 9} - must.Eq(t, desiredChanges, expected) - + must.True(t, ok, must.Sprint("expected task group web to have desired changes")) + must.Eq(t, 9, desiredChanges.Place) } func TestSystemSched_JobRegister_AddNode(t *testing.T) { @@ -1293,11 +1275,11 @@ func TestSystemSched_Queued_With_Constraints(t *testing.T) { must.Zero(t, val) } -// This test ensures that the scheduler correctly ignores ineligible -// nodes when scheduling due to a new node being added. The job has two -// task groups constrained to a particular node class. The desired behavior -// should be that the TaskGroup constrained to the newly added node class is -// added and that the TaskGroup constrained to the ineligible node is ignored. +// This test ensures that the scheduler correctly ignores ineligible nodes when +// scheduling due to a new node being added. The job has two task groups +// constrained to a particular node class. The desired behavior should be that +// the TaskGroup constrained to the newly added node class is added and that the +// TaskGroup constrained to the ineligible node is ignored. func TestSystemSched_JobConstraint_AddNode(t *testing.T) { ci.Parallel(t) @@ -2307,7 +2289,7 @@ func TestSystemSched_Preemption(t *testing.T) { func TestSystemSched_canHandle(t *testing.T) { ci.Parallel(t) - s := SystemScheduler{sysbatch: false} + s := SystemScheduler{} t.Run("system register", func(t *testing.T) { must.True(t, s.canHandle(structs.EvalTriggerJobRegister)) }) @@ -3120,8 +3102,12 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation - err := h.Process(NewSystemScheduler, eval) - must.NoError(t, err) + if tc.jobType == structs.JobTypeSystem { + must.NoError(t, h.Process(NewSystemScheduler, eval)) + } + if tc.jobType == structs.JobTypeSysBatch { + must.NoError(t, h.Process(NewSysBatchScheduler, eval)) + } // Ensure a single plan must.Len(t, tc.expectedPlanCount, h.Plans) @@ -3352,8 +3338,14 @@ func TestEvictAndPlace(t *testing.T) { diff := &reconciler.NodeReconcileResult{Update: allocs} _, ctx := feasible.MockContext(t) - must.Eq(t, tc.expectLimited, evictAndPlace(ctx, job, diff, ""), - must.Sprintf("limited")) + s := SystemScheduler{ctx: ctx, job: job, plan: &structs.Plan{ + EvalID: uuid.Generate(), + NodeUpdate: make(map[string][]*structs.Allocation), + NodeAllocation: make(map[string][]*structs.Allocation), + NodePreemptions: make(map[string][]*structs.Allocation), + }} + + s.evictAndPlace(diff, "") must.Len(t, tc.expectPlace, diff.Place, must.Sprintf( "evictAndReplace() didn't insert into diffResult properly: %v", diff.Place)) }) @@ -3424,6 +3416,8 @@ func TestSystemSched_UpdateBlock(t *testing.T) { expectAllocs map[string]int // plan NodeAllocations group -> count expectStop map[string]int // plan NodeUpdates group -> count expectDState map[string]*structs.DeploymentState + + ineligibleNodes int // number of nodes to mark as ineligible }{ { name: "legacy upgrade non-deployment", 
@@ -3473,8 +3467,8 @@ func TestSystemSched_UpdateBlock(t *testing.T) { expectAllocs: map[string]int{tg1: 2, tg2: 3}, expectStop: map[string]int{tg1: 2, tg2: 3}, expectDState: map[string]*structs.DeploymentState{ - tg1: {DesiredTotal: 10, PlacedAllocs: 4}, - tg2: {DesiredTotal: 10, PlacedAllocs: 6}, + tg1: {DesiredTotal: 10, PlacedAllocs: 4}, // 2 previous + 2 destructive + tg2: {DesiredTotal: 10, PlacedAllocs: 6}, // 3 previous + 3 destructive }, }, @@ -3547,8 +3541,8 @@ func TestSystemSched_UpdateBlock(t *testing.T) { expectAllocs: map[string]int{tg1: 7, tg2: 10}, expectStop: map[string]int{tg1: 2, tg2: 3}, expectDState: map[string]*structs.DeploymentState{ - tg1: {DesiredTotal: 10, PlacedAllocs: 7}, - tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + tg1: {DesiredTotal: 10, PlacedAllocs: 7}, // 2 destructive + 5 new + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, // 3 destructive + 7 new }, }, @@ -3563,22 +3557,25 @@ func TestSystemSched_UpdateBlock(t *testing.T) { }, existingPrevious: map[string][]int{ tg1: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, - tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + tg2: {0, 1, 2, 3, 4, 5, 6, 7}, // only 8 were previously eligible }, existingOldDState: map[string]*structs.DeploymentState{ tg1: {DesiredTotal: 10, PlacedAllocs: 10}, - tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + tg2: {DesiredTotal: 8, PlacedAllocs: 8}, }, - expectAllocs: map[string]int{tg1: 3, tg2: 5}, - expectStop: map[string]int{tg1: 3, tg2: 5}, + expectAllocs: map[string]int{tg1: 2, tg2: 7}, + expectStop: map[string]int{tg1: 2, tg2: 5}, expectDState: map[string]*structs.DeploymentState{ tg1: { DesiredTotal: 10, DesiredCanaries: 3, - PlacedCanaries: []string{"0", "1", "2"}, - PlacedAllocs: 3, + PlacedCanaries: []string{"0", "1"}, + PlacedAllocs: 2, // want 3 canaries, limited by max_parallel + }, + tg2: { + DesiredTotal: 10, + PlacedAllocs: 7, // 2 new + 5 destructive updates }, - tg2: {DesiredTotal: 10, PlacedAllocs: 5}, }, }, @@ -3624,7 +3621,7 @@ func TestSystemSched_UpdateBlock(t *testing.T) { DesiredTotal: 10, DesiredCanaries: 3, PlacedCanaries: []string{"7", "8", "9"}, - PlacedAllocs: 5, + PlacedAllocs: 5, // 2 failed + 2 replacements + 1 new }, tg2: {DesiredTotal: 10, PlacedAllocs: 10}, }, @@ -3669,7 +3666,7 @@ func TestSystemSched_UpdateBlock(t *testing.T) { DesiredTotal: 10, DesiredCanaries: 3, PlacedCanaries: []string{"7", "8", "9"}, - PlacedAllocs: 3, + PlacedAllocs: 3, // 1 existing canary + 2 new canaries }, tg2: {DesiredTotal: 10, PlacedAllocs: 10}, }, @@ -3707,14 +3704,14 @@ func TestSystemSched_UpdateBlock(t *testing.T) { }, tg2: {DesiredTotal: 10, PlacedAllocs: 10, HealthyAllocs: 10}, }, - expectAllocs: nil, - expectStop: nil, + expectAllocs: map[string]int{}, + expectStop: map[string]int{}, expectDState: map[string]*structs.DeploymentState{ tg1: { DesiredTotal: 10, DesiredCanaries: 3, PlacedCanaries: []string{"7", "8", "9"}, - PlacedAllocs: 3, + PlacedAllocs: 3, // unchanged because we're not promoted }, tg2: {DesiredTotal: 10, PlacedAllocs: 10}, }, @@ -3756,11 +3753,57 @@ func TestSystemSched_UpdateBlock(t *testing.T) { DesiredTotal: 10, DesiredCanaries: 3, PlacedCanaries: []string{"7", "8", "9"}, - PlacedAllocs: 5, + PlacedAllocs: 5, // 3 running canaries + 2 new limited by max_parallel }, tg2: {DesiredTotal: 10, PlacedAllocs: 10}, }, }, + + { + name: "deployment complete with ineligible nodes", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 10, + Canary: 30, + AutoPromote: true, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6}, + }, + existingRunning: 
map[string][]int{ + tg1: {7, 8, 9}, + tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + existingCanary: map[string][]int{ + tg1: {7, 8, 9}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: { + Promoted: true, + PlacedCanaries: []string{"7", "8", "9"}, + DesiredCanaries: 3, + DesiredTotal: 10, + PlacedAllocs: 3, + HealthyAllocs: 3, + UnhealthyAllocs: 0, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10, HealthyAllocs: 10}, + }, + expectAllocs: map[string]int{tg1: 5}, // 7 to replace minus 2 on ineligible nodes + expectStop: map[string]int{tg1: 7}, // stop all previous versions + expectDState: map[string]*structs.DeploymentState{ + tg1: { + DesiredTotal: 8, // 10 nodes minus 2 ineligble nodes + DesiredCanaries: 3, + PlacedCanaries: []string{"7", "8", "9"}, + PlacedAllocs: 8, + }, + tg2: { + DesiredTotal: 8, // 10 nodes minus 2 ineligble nodes + PlacedAllocs: 10, // New allocations were already placed + }, + }, + ineligibleNodes: 2, + }, } for _, tc := range testCases { @@ -3769,6 +3812,10 @@ func TestSystemSched_UpdateBlock(t *testing.T) { h := tests.NewHarness(t) nodes := createNodes(t, h, 10) + for i := range tc.ineligibleNodes { + nodes[i].SchedulingEligibility = structs.NodeSchedulingIneligible + } + oldJob := mock.SystemJob() oldJob.TaskGroups[0].Update = tc.tg1UpdateBlock oldJob.TaskGroups[0].Name = tg1 @@ -3955,3 +4002,253 @@ func TestSystemSched_UpdateBlock(t *testing.T) { } } + +func TestSystemSched_evictUnneededCanaries(t *testing.T) { + tests := []struct { + name string + requiredCanaries int + tgName string + nodeAllocation map[string][]*structs.Allocation + expectedDesiredCanaries int + expectedNodeAllocation []string + }{ + { + name: "no required canaries", + requiredCanaries: 0, + tgName: "foo", + nodeAllocation: nil, + expectedDesiredCanaries: 0, + expectedNodeAllocation: nil, + }, + { + name: "existing allocs for 2 task groups: tg1 with no canaries, tg2 with canaries, calling for tg1", + requiredCanaries: 1, + tgName: "tg1", + nodeAllocation: map[string][]*structs.Allocation{ + "node1": { + { + ID: "tg1_alloc1", + DeploymentStatus: &structs.AllocDeploymentStatus{Canary: false}, + TaskGroup: "tg1", + }, + { + ID: "tg1_alloc2", + DeploymentStatus: &structs.AllocDeploymentStatus{Canary: true}, + TaskGroup: "tg2", + }, + }, + "node2": { + { + ID: "tg2_alloc1", + DeploymentStatus: &structs.AllocDeploymentStatus{Canary: false}, + TaskGroup: "tg1", + }, + { + ID: "tg2_alloc2", + DeploymentStatus: &structs.AllocDeploymentStatus{Canary: true}, + TaskGroup: "tg2", + }, + }, + }, + expectedDesiredCanaries: 0, + expectedNodeAllocation: []string{"tg1_alloc1", "tg1_alloc2", "tg2_alloc1", "tg2_alloc2"}, + }, + { + name: "existing allocs for 2 task groups with canaries", + requiredCanaries: 1, + tgName: "tg1", + nodeAllocation: map[string][]*structs.Allocation{ + "node1": { + { + ID: "tg1_alloc1", + DeploymentStatus: &structs.AllocDeploymentStatus{Canary: true}, + TaskGroup: "tg1", + }, + { + ID: "tg2_alloc1", + DeploymentStatus: &structs.AllocDeploymentStatus{Canary: true}, + TaskGroup: "tg2", + }, + }, + "node2": { + { + ID: "tg1_alloc2", + DeploymentStatus: &structs.AllocDeploymentStatus{Canary: true}, + TaskGroup: "tg1", + }, + { + ID: "tg2_alloc2", + DeploymentStatus: &structs.AllocDeploymentStatus{Canary: true}, + TaskGroup: "tg2", + }, + }, + }, + expectedDesiredCanaries: 1, + expectedNodeAllocation: []string{"tg1_alloc1", "tg2_alloc1", "tg2_alloc2"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := SystemScheduler{ + plan: 
mock.Plan(), + } + s.plan.NodeAllocation = tt.nodeAllocation + + must.SliceLen(t, tt.expectedDesiredCanaries, s.evictUnneededCanaries(tt.requiredCanaries, tt.tgName, &reconciler.NodeReconcileResult{}), must.Sprint("unexpected desired canaries")) + allocsOnNodes := []*structs.Allocation{} + for _, a := range s.plan.NodeAllocation { + allocsOnNodes = append(allocsOnNodes, a...) + } + must.SliceContainsAllFunc(t, allocsOnNodes, tt.expectedNodeAllocation, + func(a *structs.Allocation, id string) bool { + return a.ID == id + }) + }) + } +} + +func TestSystemSched_NoOpEvalWithInfeasibleNodes(t *testing.T) { + ci.Parallel(t) + h := tests.NewHarness(t) + + nodes := make([]*structs.Node, 4) + eligible := []string{} + for i := range 4 { + node := mock.Node() + if i%2 == 0 { + node.Attributes["kernel.name"] = "not-linux" + } else { + eligible = append(eligible, node.ID) + } + nodes[i] = node + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) + } + + job := mock.SystemJob() + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job)) + + existingAllocIDs := []string{} + allocs := []*structs.Allocation{} + for i := range 4 { + if i%2 != 0 { + alloc := mock.MinAllocForJob(job) + alloc.ClientStatus = structs.AllocClientStatusRunning + alloc.NodeID = nodes[i].ID + alloc.Name = structs.AllocName(job.Name, job.TaskGroups[0].Name, 0) + existingAllocIDs = append(existingAllocIDs, alloc.ID) + allocs = append(allocs, alloc) + } + } + must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs)) + + d := mock.Deployment() + d.JobID = job.ID + d.JobVersion = job.Version + d.Status = structs.DeploymentStatusSuccessful + must.NoError(t, h.State.UpsertDeployment(h.NextIndex(), d)) + + eval := &structs.Evaluation{ + Namespace: job.Namespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + AnnotatePlan: true, + } + must.NoError(t, h.State.UpsertEvals( + structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + job = job.Copy() + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job)) + + err := h.Process(NewSystemScheduler, eval) + must.NoError(t, err) + must.Len(t, 1, h.Plans) + plan := h.Plans[0] + must.Nil(t, plan.Deployment, must.Sprintf("expected no new deployment")) + must.Eq(t, 2, plan.Annotations.DesiredTGUpdates["web"].InPlaceUpdate) + must.MapLen(t, 0, plan.NodeUpdate, must.Sprintf("expected no stops")) + must.MapLen(t, 2, plan.NodeAllocation) + for nodeID, allocs := range plan.NodeAllocation { + must.SliceContains(t, eligible, nodeID) + must.Len(t, 1, allocs) + must.SliceContains(t, existingAllocIDs, allocs[0].ID, + must.Sprintf("expected existing alloc to be updated in-place")) + } +} + +func TestSystemSched_CanariesWithInfeasibleNodes(t *testing.T) { + ci.Parallel(t) + h := tests.NewHarness(t) + + nodes := make([]*structs.Node, 4) + eligible := []string{} + for i := range 4 { + node := mock.Node() + if i%2 == 0 { + node.Attributes["kernel.name"] = "not-linux" + } else { + eligible = append(eligible, node.ID) + } + nodes[i] = node + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) + } + + job := mock.SystemJob() + job.TaskGroups[0].Update = &structs.UpdateStrategy{ + MaxParallel: 4, + Canary: 100, // blue-green + } + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job)) + + existingAllocIDs := []string{} + 
allocs := []*structs.Allocation{} + for _, eligibleNode := range eligible { + alloc := mock.MinAllocForJob(job) + alloc.ClientStatus = structs.AllocClientStatusRunning + alloc.NodeID = eligibleNode + alloc.Name = structs.AllocName(job.Name, job.TaskGroups[0].Name, 0) + existingAllocIDs = append(existingAllocIDs, alloc.ID) + allocs = append(allocs, alloc) + } + must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs)) + + d := mock.Deployment() + d.JobID = job.ID + d.JobVersion = job.Version + d.Status = structs.DeploymentStatusSuccessful + must.NoError(t, h.State.UpsertDeployment(h.NextIndex(), d)) + + // destructively update the job + + job = job.Copy() + job.TaskGroups[0].Tasks[0].Resources.CPU++ + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job)) + + eval := &structs.Evaluation{ + Namespace: job.Namespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + AnnotatePlan: true, + } + must.NoError(t, h.State.UpsertEvals( + structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + err := h.Process(NewSystemScheduler, eval) + must.NoError(t, err) + must.Len(t, 1, h.Plans) + plan := h.Plans[0] + must.NotNil(t, plan.Deployment, must.Sprintf("expected a new deployment")) + + dstate := plan.Deployment.TaskGroups["web"] + test.Len(t, 2, dstate.PlacedCanaries) + test.Eq(t, 2, dstate.DesiredCanaries) + test.Eq(t, 2, dstate.DesiredTotal) + + must.Eq(t, 2, plan.Annotations.DesiredTGUpdates["web"].Canary, + must.Sprintf("expected canaries: %#v", plan.Annotations.DesiredTGUpdates)) +} diff --git a/scheduler/util.go b/scheduler/util.go index 9d7b03cb4c9..e003891ece3 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -918,3 +918,28 @@ func genericAllocUpdateFn(ctx feasible.Context, stack feasible.Stack, evalID str return false, false, newAlloc } } + +// mergeNodeFiltered merges allocation metrics for task group +func mergeNodeFiltered(acc, curr *structs.AllocMetric) *structs.AllocMetric { + if acc == nil { + return curr.Copy() + } + + acc.NodesEvaluated += curr.NodesEvaluated + acc.NodesFiltered += curr.NodesFiltered + + if acc.ClassFiltered == nil { + acc.ClassFiltered = make(map[string]int) + } + for k, v := range curr.ClassFiltered { + acc.ClassFiltered[k] += v + } + if acc.ConstraintFiltered == nil { + acc.ConstraintFiltered = make(map[string]int) + } + for k, v := range curr.ConstraintFiltered { + acc.ConstraintFiltered[k] += v + } + acc.AllocationTime += curr.AllocationTime + return acc +}
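mergeNodeFiltered accumulates filter metrics across nodes so a single AllocMetric can describe why every node was rejected. A minimal illustration of the same accumulation semantics, using a pared-down stand-in struct rather than structs.AllocMetric, with illustrative example data:

package main

import "fmt"

// allocMetric is a stand-in for structs.AllocMetric, keeping only the fields
// the merge accumulates.
type allocMetric struct {
	NodesEvaluated     int
	NodesFiltered      int
	ConstraintFiltered map[string]int
}

// merge adds counters together and sums the per-constraint filter counts key
// by key; a nil accumulator starts from a shallow copy of the first metric.
func merge(acc, curr *allocMetric) *allocMetric {
	if acc == nil {
		c := *curr
		return &c
	}
	acc.NodesEvaluated += curr.NodesEvaluated
	acc.NodesFiltered += curr.NodesFiltered
	if acc.ConstraintFiltered == nil {
		acc.ConstraintFiltered = map[string]int{}
	}
	for k, v := range curr.ConstraintFiltered {
		acc.ConstraintFiltered[k] += v
	}
	return acc
}

func main() {
	var acc *allocMetric
	// Two nodes filtered by the same constraint fold into one metric.
	for range 2 {
		acc = merge(acc, &allocMetric{
			NodesEvaluated:     1,
			NodesFiltered:      1,
			ConstraintFiltered: map[string]int{"${attr.kernel.name} = linux": 1},
		})
	}
	fmt.Printf("%d nodes filtered: %v\n", acc.NodesFiltered, acc.ConstraintFiltered)
}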