Skip to content

Commit

Permalink
fix run loop
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Feb 4, 2025
1 parent d5c9957 commit 4e939ea
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 3 deletions.
5 changes: 5 additions & 0 deletions orchestrator/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {
)
}

cmds = append(cmds,
work.CmdScheduleNextJob(),
)
if s.ExecOutWalker != nil {
cmds = append(cmds, execout.CmdDownloadSegment(0))
}
Expand Down Expand Up @@ -133,12 +136,14 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {
case stage.MsgMergeFinished:
s.Stages.MergeCompleted(msg.Unit)
cmds = append(cmds,
work.CmdScheduleNextJob(),
s.Stages.CmdTryMerge(msg.Stage),
)

case stage.MsgAllStoresCompleted:
s.storesSyncCompleted = true
cmds = append(cmds,
work.CmdScheduleNextJob(), // in case some mapper jobs need scheduling
s.cmdShutdownWhenComplete(),
)

Expand Down
2 changes: 0 additions & 2 deletions orchestrator/stage/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,12 @@ func (s *Stages) AllStoresCompleted() bool {
}
lastSegment := s.storeSegmenter.LastIndex()

fmt.Println("----------------------------------")
for idx, stage := range s.stages {
if stage.kind != KindStore {
continue
}
for seg := s.storeSegmenter.FirstIndex(); seg <= lastSegment; seg++ {
state := s.getState(Unit{Segment: seg, Stage: idx})
fmt.Println("stage: ", idx, "segment: ", seg, "status:", state)
if state != UnitCompleted && state != UnitNoOp {
return false
}
Expand Down
1 change: 0 additions & 1 deletion test/runnable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ func (f *testRun) run(t *testing.T, testName string) error {
count := 0
workerFactory := func(ctx context.Context) work.Worker {
count++
fmt.Println("factoring worker ", count)
return &TestWorker{
t: t,
responseCollector: newResponseCollector(ctx),
Expand Down

0 comments on commit 4e939ea

Please sign in to comment.