Commit 838bcd8

committed: the mystery of shadowboxing
1 parent d709da2 commit 838bcd8

2 files changed: +76 −38 lines changed


scheduler/scheduler_system.go

Lines changed: 72 additions & 34 deletions
@@ -281,7 +281,7 @@ func (s *SystemScheduler) computeJobAllocs() error {
 
 	// Find which of the eligible nodes are actually feasible for which TG. This way
 	// we get correct DesiredTotal and DesiredCanaries counts in the reconciler.
-	s.feasibleNodesForTG = s.findFeasibleNodesForTG(live)
+	s.feasibleNodesForTG = s.findIgnorableNodes(live)
 
 	// Diff the required and existing allocations
 	nr := reconciler.NewNodeReconciler(s.deployment)
@@ -334,6 +334,32 @@ func (s *SystemScheduler) computeJobAllocs() error {
 		DesiredTGUpdates: desiredUpdates(reconciliationResult, inplaceUpdates, destructiveUpdates),
 	}
 
+	// any further logic depends on whether we're canarying or not
+	isCanarying := map[string]bool{}
+	if s.job != nil && s.deployment != nil {
+		for _, tg := range s.job.TaskGroups {
+			dstate, ok := s.deployment.TaskGroups[tg.Name]
+			if !ok {
+				continue
+			}
+			// a system job is canarying if:
+			// - it has a non-empty update block (just a sanity check, all
+			//   submitted jobs should have a non-empty update block as part of
+			//   canonicalization)
+			// - canary parameter in the update block has to be positive
+			// - deployment has to be non-nil and it cannot have been promoted
+			// - this cannot be the initial job version
+			isCanarying[tg.Name] = !tg.Update.IsEmpty() &&
+				tg.Update.Canary > 0 &&
+				dstate != nil &&
+				!dstate.Promoted &&
+				s.job.Version != 0
+		}
+	}
+
+	// find feasible nodes for each TG before we do any maxParallel evictions
+	s.findFeasibleNodesForTG(reconciliationResult.Update)
+
 	// Treat non in-place updates as an eviction and new placement, which will
 	// be limited by max_parallel
 	s.limitReached = evictAndPlace(s.ctx, s.job, reconciliationResult, sstructs.StatusAllocUpdating)
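
Taken on its own, the canary check moved earlier in the hunk above reduces to a small predicate over the task group's update block, the deployment state, and the job version. Below is a minimal, self-contained sketch of that predicate; UpdateBlock and DeploymentState are hypothetical, trimmed-down stand-ins for the Nomad structs, not the real types.

package main

import "fmt"

// Hypothetical stand-ins for the Nomad structs referenced in the hunk above;
// only the fields the predicate needs are kept.
type UpdateBlock struct {
	Canary int
}

func (u *UpdateBlock) IsEmpty() bool { return u == nil || *u == (UpdateBlock{}) }

type DeploymentState struct {
	Promoted bool
}

// isCanarying mirrors the shape of the check built in computeJobAllocs: a task
// group canaries only when the update block exists and asks for canaries, the
// deployment state exists and has not been promoted, and this is not the very
// first version of the job.
func isCanarying(u *UpdateBlock, dstate *DeploymentState, jobVersion uint64) bool {
	return !u.IsEmpty() &&
		u.Canary > 0 &&
		dstate != nil &&
		!dstate.Promoted &&
		jobVersion != 0
}

func main() {
	fmt.Println(isCanarying(&UpdateBlock{Canary: 1}, &DeploymentState{}, 0))               // false: initial job version
	fmt.Println(isCanarying(&UpdateBlock{Canary: 1}, &DeploymentState{}, 2))               // true: canaries requested, not promoted
	fmt.Println(isCanarying(&UpdateBlock{Canary: 1}, &DeploymentState{Promoted: true}, 2)) // false: deployment already promoted
}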
@@ -358,6 +384,10 @@ func (s *SystemScheduler) computeJobAllocs() error {
 		return err
 	}
 
+	for k, v := range s.feasibleNodesForTG {
+		fmt.Printf("found %d feasible nodes for tg %v: %v\n", v.Size(), k, v.String())
+	}
+
 	// if there is not deployment we're done at this point
 	if s.deployment == nil {
 		return nil
@@ -367,13 +397,7 @@ func (s *SystemScheduler) computeJobAllocs() error {
 	// nodes, so for system jobs we do it backwards a bit: the "desired" total
 	// is the total we were able to place.
 	// track if any of the task groups is doing a canary update now
-	isCanarying := map[string]bool{}
 	for _, tg := range s.job.TaskGroups {
-		dstate, ok := s.deployment.TaskGroups[tg.Name]
-		if !ok {
-			continue
-		}
-
 		feasibleNodes, ok := s.feasibleNodesForTG[tg.Name]
 		if !ok {
 			// this will happen if we're seeing a TG that shouldn't be placed; we only ever
@@ -384,19 +408,7 @@ func (s *SystemScheduler) computeJobAllocs() error {
 
 		s.deployment.TaskGroups[tg.Name].DesiredTotal = feasibleNodes.Size()
 
-		// a system job is canarying if:
-		// - it has a non-empty update block (just a sanity check, all submitted
-		//   jobs should have a non-empty update block as part of
-		//   canonicalization)
-		// - canary parameter in the update block has to be positive
-		// - deployment has to be non-nil and it cannot have been promoted
-		// - this cannot be the initial job version
-		isCanarying[tg.Name] = !tg.Update.IsEmpty() &&
-			tg.Update.Canary > 0 &&
-			dstate != nil &&
-			!dstate.Promoted &&
-			s.job.Version != 0
-
+		// if this TG isn't canarying, we're done
 		if !isCanarying[tg.Name] {
 			continue
 		}
@@ -476,7 +488,10 @@ func mergeNodeFiltered(acc, curr *structs.AllocMetric) *structs.AllocMetric {
 	return acc
 }
 
-func (s *SystemScheduler) findFeasibleNodesForTG(allocs []*structs.Allocation) map[string]*set.Set[string] {
+// findIgnorableNodes checks if there are allocations deployed to nodes that are
+// from the same job version as ours, and can thus be omitted from feasibility
+// checks
+func (s *SystemScheduler) findIgnorableNodes(allocs []*structs.Allocation) map[string]*set.Set[string] {
 	if s.job == nil {
 		return nil
 	}
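
The renamed helper's contract, per its new doc comment, is to collect the nodes that already run an allocation from the current job version so they can be skipped during feasibility checks. A minimal sketch of that filtering follows; Alloc is a hypothetical stand-in for structs.Allocation and plain maps stand in for the set package.

package main

import "fmt"

// Alloc is a hypothetical, minimal stand-in for structs.Allocation carrying
// just the fields this sketch needs.
type Alloc struct {
	NodeID     string
	TaskGroup  string
	JobVersion uint64
}

// ignorableNodes returns, per task group, the set of node IDs that already run
// an allocation from the given job version; those nodes can be skipped when
// re-checking feasibility, which is the idea named by findIgnorableNodes above.
func ignorableNodes(allocs []Alloc, jobVersion uint64) map[string]map[string]struct{} {
	out := map[string]map[string]struct{}{}
	for _, a := range allocs {
		if a.JobVersion != jobVersion {
			continue // older job version: this node still needs a feasibility check
		}
		if out[a.TaskGroup] == nil {
			out[a.TaskGroup] = map[string]struct{}{}
		}
		out[a.TaskGroup][a.NodeID] = struct{}{}
	}
	return out
}

func main() {
	allocs := []Alloc{
		{NodeID: "n1", TaskGroup: "web", JobVersion: 3},
		{NodeID: "n2", TaskGroup: "web", JobVersion: 2}, // older version: not ignorable
		{NodeID: "n3", TaskGroup: "cache", JobVersion: 3},
	}
	for tg, nodes := range ignorableNodes(allocs, 3) {
		fmt.Println(tg, len(nodes))
	}
}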
@@ -503,6 +518,31 @@ func (s *SystemScheduler) findFeasibleNodesForTG(allocs []*structs.Allocation) m
 	return feasibleNodes
 }
 
+func (s *SystemScheduler) findFeasibleNodesForTG(updates []reconciler.AllocTuple) {
+	for _, a := range updates {
+		tgName := a.TaskGroup.Name
+		fmt.Printf("looking for feasible node for tg %s\n", tgName)
+
+		s.stack.SetNodes(s.nodes)
+
+		// Attempt to match the task group
+		option := s.stack.Select(a.TaskGroup, &feasible.SelectOptions{AllocName: a.Name})
+
+		if option == nil {
+			fmt.Printf("no feasible node found for %v!\n", a.Alloc)
+			continue
+		}
+
+		fmt.Printf("found feasible node %v for tg %v\n", option.Node.ID, tgName)
+		// count this node as feasible
+		if s.feasibleNodesForTG[tgName] == nil {
+			s.feasibleNodesForTG[tgName] = set.New[string](0)
+		}
+
+		s.feasibleNodesForTG[tgName].Insert(option.Node.ID)
+	}
+}
+
 // computePlacements computes placements for allocations
 func (s *SystemScheduler) computePlacements(
 	reconcilerResult *reconciler.NodeReconcileResult, existingByTaskGroup map[string]bool,
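
The new findFeasibleNodesForTG pass in the hunk above runs the selection stack once per pending update and records which node each update lands on. The accumulation pattern can be sketched independently of the scheduler types; nodeSelector below is a hypothetical stand-in for the feasibility stack's Select call, not the real API.

package main

import "fmt"

// nodeSelector is a hypothetical stand-in for the feasibility stack: given a
// task group name it returns a feasible node ID, or "" when nothing fits.
type nodeSelector func(tgName string) string

// feasibleNodesForTG runs the selector once per pending update and records,
// per task group, the set of node IDs that came back feasible — the same
// accumulate-into-a-set shape the scheduler method above uses.
func feasibleNodesForTG(updates []string, selectNode nodeSelector) map[string]map[string]bool {
	out := map[string]map[string]bool{}
	for _, tgName := range updates {
		nodeID := selectNode(tgName)
		if nodeID == "" {
			continue // no feasible node for this update
		}
		if out[tgName] == nil {
			out[tgName] = map[string]bool{}
		}
		out[tgName][nodeID] = true
	}
	return out
}

func main() {
	sel := func(tg string) string {
		if tg == "cache" {
			return "" // pretend no node is feasible for this group
		}
		return "node-1"
	}
	for tg, nodes := range feasibleNodesForTG([]string{"web", "web", "cache"}, sel) {
		fmt.Println(tg, len(nodes)) // "web" maps to a single node even after two updates
	}
}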
@@ -521,6 +561,10 @@ func (s *SystemScheduler) computePlacements(
 		deploymentID = s.deployment.ID
 	}
 
+	for _, r := range reconcilerResult.Place {
+		fmt.Println(r.Name)
+	}
+
 	nodes := make([]*structs.Node, 1)
 	for _, missing := range reconcilerResult.Place {
 		tgName := missing.TaskGroup.Name
@@ -571,10 +615,6 @@ func (s *SystemScheduler) computePlacements(
 			s.planAnnotations.DesiredTGUpdates[tgName].Place -= 1
 		}
 
-		if s.plan.Deployment != nil {
-			s.deployment.TaskGroups[tgName].DesiredTotal -= 1
-		}
-
 		// Filtered nodes are not reported to users, just omitted from the job status
 		continue
 	}
@@ -744,13 +784,13 @@ func (s *SystemScheduler) canHandle(trigger string) bool {
 }
 
 // evictAndPlace is used to mark allocations for evicts and add them to the
-// placement queue. evictAndPlace modifies the diffResult. It returns true if
-// the limit has been reached for any task group.
-func evictAndPlace(ctx feasible.Context, job *structs.Job, diff *reconciler.NodeReconcileResult, desc string) bool {
+// placement queue. evictAndPlace modifies the reconciler result. It returns
+// true if the limit has been reached for any task group.
+func evictAndPlace(ctx feasible.Context, job *structs.Job, reconciled *reconciler.NodeReconcileResult, desc string) bool {
 
 	limits := map[string]int{} // per task group limits
 	if !job.Stopped() {
-		jobLimit := len(diff.Update)
+		jobLimit := len(reconciled.Update)
 		if job.Update.MaxParallel > 0 {
 			jobLimit = job.Update.MaxParallel
 		}
@@ -764,13 +804,11 @@ func evictAndPlace(ctx feasible.Context, job *structs.Job, diff *reconciler.Node
 	}
 
 	limited := false
-	for _, a := range diff.Update {
+	for _, a := range reconciled.Update {
 		if limit := limits[a.Alloc.TaskGroup]; limit > 0 {
 			ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "")
-			diff.Place = append(diff.Place, a)
-			if !a.Canary {
-				limits[a.Alloc.TaskGroup]--
-			}
+			reconciled.Place = append(reconciled.Place, a)
+			limits[a.Alloc.TaskGroup]--
 		} else {
 			limited = true
 		}
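
With the canary exemption removed in the hunk above, every evicted update now consumes max_parallel budget, canary or not. A simplified, self-contained sketch of the limiting behaviour follows; tuple is a hypothetical stand-in for the reconciler's update tuples, and every task group is given the same budget here for brevity.

package main

import "fmt"

// tuple is a hypothetical, trimmed-down stand-in for a reconciler update
// tuple; only the task group name matters for the limit bookkeeping.
type tuple struct {
	TaskGroup string
}

// evictAndPlace is a simplified version of the budget logic above: each task
// group gets a limit derived from max_parallel (or the full update count when
// max_parallel is 0), every evicted update consumes one unit of that budget,
// and the function reports whether any group ran out of budget.
func evictAndPlace(updates []tuple, maxParallel int) (placed []tuple, limited bool) {
	jobLimit := len(updates)
	if maxParallel > 0 {
		jobLimit = maxParallel
	}
	limits := map[string]int{}
	for _, u := range updates {
		if _, ok := limits[u.TaskGroup]; !ok {
			limits[u.TaskGroup] = jobLimit
		}
	}
	for _, u := range updates {
		if limits[u.TaskGroup] > 0 {
			placed = append(placed, u)
			limits[u.TaskGroup]--
		} else {
			limited = true
		}
	}
	return placed, limited
}

func main() {
	updates := []tuple{{"web"}, {"web"}, {"web"}, {"cache"}}
	placed, limited := evictAndPlace(updates, 2)
	fmt.Println(len(placed), limited) // 3 placements (2 web + 1 cache), limited = true
}

In the sketch, three "web" updates against a budget of 2 report limited = true, which is the same signal the scheduler stores in s.limitReached from the real evictAndPlace.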

scheduler/scheduler_system_test.go

Lines changed: 4 additions & 4 deletions
@@ -3383,8 +3383,8 @@ func TestEvictAndPlace(t *testing.T) {
 
 }
 
-// TestSystemScheduler_UpdateBlock tests various permutations of the update block
-func TestSystemScheduler_UpdateBlock(t *testing.T) {
+// TestSystemSched_UpdateBlock tests various permutations of the update block
+func TestSystemSched_UpdateBlock(t *testing.T) {
 	ci.Parallel(t)
 
 	collect := func(planned map[string][]*structs.Allocation) map[string]int {
@@ -3597,8 +3597,8 @@ func TestSystemScheduler_UpdateBlock(t *testing.T) {
 			tg1: {
 				DesiredTotal:    10,
 				DesiredCanaries: 3,
-				PlacedCanaries:  []string{"0", "1", "2"},
-				PlacedAllocs:    3,
+				// PlacedCanaries: []string{"0", "1", "2"},
+				PlacedAllocs: 2,
 			},
 			tg2: {DesiredTotal: 10, PlacedAllocs: 5},
 		},
