diff --git a/pkg/disttask/framework/scheduler/scheduler.go b/pkg/disttask/framework/scheduler/scheduler.go index 11171b7253459..1c465520cad30 100644 --- a/pkg/disttask/framework/scheduler/scheduler.go +++ b/pkg/disttask/framework/scheduler/scheduler.go @@ -505,7 +505,17 @@ func (s *BaseScheduler) scheduleSubTask( var size uint64 subTasks := make([]*proto.Subtask, 0, len(metas)) for i, meta := range metas { - // we assign the subtask to the instance in a round-robin way. + // our schedule target is to maximize the resource usage of all nodes while + // fulfilling the target of schedule tasks in the task order, we will try + // to pack the subtasks of different tasks onto as minimal number of nodes + // as possible, to allow later tasks of higher concurrency can be scheduled + // and run, so we order nodes, see TaskManager.GetAllNodes and assign the + // subtask to the instance in a round-robin way. + // for example: + // - we have 2 node N1 and N2 of 8 cores. + // - we have 2 tasks T1 and T2, each is of thread 4 and have 1 subtask. + // - subtasks of T1 and T2 are all scheduled to N1 + // - later we have a task T3 of thread 8, we can schedule it to N2. pos := i % len(adjustedEligibleNodes) instanceID := adjustedEligibleNodes[pos] s.logger.Debug("create subtasks", zap.String("instanceID", instanceID))