Skip to content

Commit

Permalink
disttask: fix keep reporting error when cancel pending task (#48906)
Browse files Browse the repository at this point in the history
close #48890
  • Loading branch information
D3Hunter authored Nov 27, 2023
1 parent 6ead0ee commit 01d441e
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pkg/disttask/framework/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ go_test(
],
flaky = True,
race = "off",
shard_count = 31,
shard_count = 32,
deps = [
"//pkg/disttask/framework/dispatcher",
"//pkg/disttask/framework/handle",
Expand Down
26 changes: 17 additions & 9 deletions pkg/disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,16 +480,20 @@ func (d *BaseDispatcher) onErrHandlingStage(receiveErrs []error) error {
}

func (d *BaseDispatcher) dispatchSubTask4Revert(meta []byte) error {
instanceIDs, err := d.GetAllSchedulerIDs(d.ctx, d.Task)
if err != nil {
logutil.Logger(d.logCtx).Warn("get task's all instances failed", zap.Error(err))
return err
}
var subTasks []*proto.Subtask
// when step of task is `StepInit`, no need to do revert
if d.Task.Step != proto.StepInit {
instanceIDs, err := d.GetAllSchedulerIDs(d.ctx, d.Task)
if err != nil {
logutil.Logger(d.logCtx).Warn("get task's all instances failed", zap.Error(err))
return err
}

subTasks := make([]*proto.Subtask, 0, len(instanceIDs))
for _, id := range instanceIDs {
// reverting subtasks belong to the same step as current active step.
subTasks = append(subTasks, proto.NewSubtask(d.Task.Step, d.Task.ID, d.Task.Type, id, meta))
subTasks = make([]*proto.Subtask, 0, len(instanceIDs))
for _, id := range instanceIDs {
// reverting subtasks belong to the same step as current active step.
subTasks = append(subTasks, proto.NewSubtask(d.Task.Step, d.Task.ID, d.Task.Type, id, meta))
}
}
return d.updateTask(proto.TaskStateReverting, subTasks, RetrySQLTimes)
}
Expand Down Expand Up @@ -615,6 +619,10 @@ func (d *BaseDispatcher) dispatchSubTask(
logutil.Logger(d.logCtx).Debug("create subtasks", zap.String("instanceID", instanceID))
subTasks = append(subTasks, proto.NewSubtask(subtaskStep, d.Task.ID, d.Task.Type, instanceID, meta))
}
failpoint.Inject("cancelBeforeUpdateTask", func() {
_ = d.updateTask(proto.TaskStateCancelling, subTasks, RetrySQLTimes)
})

return d.updateTask(d.Task.State, subTasks, RetrySQLTimes)
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,3 +728,18 @@ func TestFrameworkCleanUpRoutine(t *testing.T) {
require.NotEmpty(t, tasks)
distContext.Close()
}

func TestTaskCancelledBeforeUpdateTask(t *testing.T) {
var m sync.Map
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ctx := context.Background()
ctx = util.WithInternalSourceType(ctx, "dispatcher")

RegisterTaskMeta(t, ctrl, &m, &testDispatcherExt{})
distContext := testkit.NewDistExecutionContext(t, 1)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/dispatcher/cancelBeforeUpdateTask", "1*return(true)"))
DispatchTaskAndCheckState(ctx, "key1", t, &m, proto.TaskStateReverted)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/dispatcher/cancelBeforeUpdateTask"))
distContext.Close()
}

0 comments on commit 01d441e

Please sign in to comment.