diff --git a/pkg/ddl/backfilling_dist_scheduler.go b/pkg/ddl/backfilling_dist_scheduler.go index f984704ad4f19..c7c4b38d2d77f 100644 --- a/pkg/ddl/backfilling_dist_scheduler.go +++ b/pkg/ddl/backfilling_dist_scheduler.go @@ -189,11 +189,16 @@ func (*LitBackfillScheduler) GetEligibleInstances(_ context.Context, _ *proto.Ta return nil, nil } -// IsRetryableErr implements scheduler.Extension.IsRetryableErr interface. +// IsRetryableErr implements scheduler.Extension interface. func (*LitBackfillScheduler) IsRetryableErr(error) bool { return true } +// ModifyMeta implements scheduler.Extension interface. +func (*LitBackfillScheduler) ModifyMeta(oldMeta []byte, _ []proto.Modification) ([]byte, error) { + return oldMeta, nil +} + func getTblInfo(ctx context.Context, d *ddl, job *model.Job) (tblInfo *model.TableInfo, err error) { err = kv.RunInNewTxn(ctx, d.store, true, func(_ context.Context, txn kv.Transaction) error { tblInfo, err = meta.NewMutator(txn).GetTable(job.SchemaID, job.TableID) diff --git a/pkg/disttask/framework/integrationtests/modify_test.go b/pkg/disttask/framework/integrationtests/modify_test.go index 2bb5de71d5ae1..b1d106d7c849b 100644 --- a/pkg/disttask/framework/integrationtests/modify_test.go +++ b/pkg/disttask/framework/integrationtests/modify_test.go @@ -31,7 +31,6 @@ import ( ) type collectedRuntimeInfo struct { - // not used right now, will support modify task specific params in later PR. currentTask *proto.Task subtaskInfos []subtaskRuntimeInfo } @@ -310,4 +309,39 @@ func TestModifyTaskConcurrency(t *testing.T) { {Step: proto.StepTwo, Concurrency: 7}, }, runtimeInfo.subtaskInfos) }) + + t.Run("modify pending task meta, only check the scheduler part", func(t *testing.T) { + defer resetRuntimeInfoFn() + var once sync.Once + modifySyncCh := make(chan struct{}) + var theTask *proto.Task + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeGetSchedulableTasks", func() { + once.Do(func() { + task, err := handle.SubmitTask(c.Ctx, "k5", proto.TaskTypeExample, 3, "", []byte("init")) + require.NoError(t, err) + require.Equal(t, 3, task.Concurrency) + require.EqualValues(t, []byte("init"), task.Meta) + require.NoError(t, c.TaskMgr.ModifyTaskByID(c.Ctx, task.ID, &proto.ModifyParam{ + PrevState: proto.TaskStatePending, + Modifications: []proto.Modification{ + {Type: proto.ModifyMaxWriteSpeed, To: 123}, + }, + })) + theTask = task + gotTask, err := c.TaskMgr.GetTaskBaseByID(c.Ctx, theTask.ID) + require.NoError(t, err) + require.Equal(t, proto.TaskStateModifying, gotTask.State) + require.Equal(t, 3, gotTask.Concurrency) + <-modifySyncCh + }) + }) + modifySyncCh <- struct{}{} + // finish subtasks + for i := 0; i < 5; i++ { + subtaskCh <- struct{}{} + } + task2Base := testutil.WaitTaskDone(c.Ctx, t, theTask.Key) + require.Equal(t, proto.TaskStateSucceed, task2Base.State) + require.EqualValues(t, []byte("modify_max_write_speed=123"), runtimeInfo.currentTask.Meta) + }) } diff --git a/pkg/disttask/framework/mock/scheduler_mock.go b/pkg/disttask/framework/mock/scheduler_mock.go index 5d614260d96e2..5932d1b3c4221 100644 --- a/pkg/disttask/framework/mock/scheduler_mock.go +++ b/pkg/disttask/framework/mock/scheduler_mock.go @@ -130,6 +130,21 @@ func (mr *MockSchedulerMockRecorder) IsRetryableErr(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsRetryableErr", reflect.TypeOf((*MockScheduler)(nil).IsRetryableErr), arg0) } +// ModifyMeta mocks base method. +func (m *MockScheduler) ModifyMeta(arg0 []byte, arg1 []proto.Modification) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ModifyMeta", arg0, arg1) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ModifyMeta indicates an expected call of ModifyMeta. +func (mr *MockSchedulerMockRecorder) ModifyMeta(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ModifyMeta", reflect.TypeOf((*MockScheduler)(nil).ModifyMeta), arg0, arg1) +} + // OnDone mocks base method. func (m *MockScheduler) OnDone(arg0 context.Context, arg1 storage.TaskHandle, arg2 *proto.Task) error { m.ctrl.T.Helper() diff --git a/pkg/disttask/framework/proto/modify.go b/pkg/disttask/framework/proto/modify.go index d81ab62d9f32c..f370fc4639c4b 100644 --- a/pkg/disttask/framework/proto/modify.go +++ b/pkg/disttask/framework/proto/modify.go @@ -27,6 +27,10 @@ func (t ModificationType) String() string { const ( // ModifyConcurrency is the type for modifying task concurrency. ModifyConcurrency ModificationType = "modify_concurrency" + // ModifyBatchSize is the type for modifying batch size of add-index. + ModifyBatchSize ModificationType = "modify_batch_size" + // ModifyMaxWriteSpeed is the type for modifying max write speed of add-index. + ModifyMaxWriteSpeed ModificationType = "modify_max_write_speed" ) // ModifyParam is the parameter for task modification. diff --git a/pkg/disttask/framework/scheduler/BUILD.bazel b/pkg/disttask/framework/scheduler/BUILD.bazel index 69e9648967d7c..658aa8fe02974 100644 --- a/pkg/disttask/framework/scheduler/BUILD.bazel +++ b/pkg/disttask/framework/scheduler/BUILD.bazel @@ -11,14 +11,12 @@ go_library( "scheduler_manager.go", "slots.go", "state_transform.go", - "testutil.go", ], importpath = "github.com/pingcap/tidb/pkg/disttask/framework/scheduler", visibility = ["//visibility:public"], deps = [ "//pkg/disttask/framework/handle", "//pkg/disttask/framework/proto", - "//pkg/disttask/framework/scheduler/mock", "//pkg/disttask/framework/storage", "//pkg/domain/infosync", "//pkg/kv", @@ -35,7 +33,6 @@ go_library( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_prometheus_client_golang//prometheus", - "@org_uber_go_mock//gomock", "@org_uber_go_zap//:zap", ], ) diff --git a/pkg/disttask/framework/scheduler/interface.go b/pkg/disttask/framework/scheduler/interface.go index 5a30d6cc68c45..a36615facad15 100644 --- a/pkg/disttask/framework/scheduler/interface.go +++ b/pkg/disttask/framework/scheduler/interface.go @@ -131,6 +131,12 @@ type Extension interface { // NOTE: don't depend on task meta to decide the next step, if it's really needed, // initialize required fields on scheduler.Init GetNextStep(task *proto.TaskBase) proto.Step + // ModifyMeta is used to modify the task meta when the task is in modifying + // state, it should return new meta after applying the modifications to the + // old meta. + // Note: the application side only need to modify meta, no need to do notify, + // task executor will do it later. + ModifyMeta(oldMeta []byte, modifies []proto.Modification) ([]byte, error) } // Param is used to pass parameters when creating scheduler. diff --git a/pkg/disttask/framework/scheduler/mock/scheduler_mock.go b/pkg/disttask/framework/scheduler/mock/scheduler_mock.go index e820c22016f16..016388349f47d 100644 --- a/pkg/disttask/framework/scheduler/mock/scheduler_mock.go +++ b/pkg/disttask/framework/scheduler/mock/scheduler_mock.go @@ -89,6 +89,21 @@ func (mr *MockExtensionMockRecorder) IsRetryableErr(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsRetryableErr", reflect.TypeOf((*MockExtension)(nil).IsRetryableErr), arg0) } +// ModifyMeta mocks base method. +func (m *MockExtension) ModifyMeta(arg0 []byte, arg1 []proto.Modification) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ModifyMeta", arg0, arg1) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ModifyMeta indicates an expected call of ModifyMeta. +func (mr *MockExtensionMockRecorder) ModifyMeta(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ModifyMeta", reflect.TypeOf((*MockExtension)(nil).ModifyMeta), arg0, arg1) +} + // OnDone mocks base method. func (m *MockExtension) OnDone(arg0 context.Context, arg1 storage.TaskHandle, arg2 *proto.Task) error { m.ctrl.T.Helper() diff --git a/pkg/disttask/framework/scheduler/scheduler.go b/pkg/disttask/framework/scheduler/scheduler.go index 1c465520cad30..a7ae30f9e28c6 100644 --- a/pkg/disttask/framework/scheduler/scheduler.go +++ b/pkg/disttask/framework/scheduler/scheduler.go @@ -405,6 +405,7 @@ func (s *BaseScheduler) onModifying() (bool, error) { task := s.getTaskClone() s.logger.Info("on modifying state", zap.Stringer("param", &task.ModifyParam)) recreateScheduler := false + metaModifies := make([]proto.Modification, 0, len(task.ModifyParam.Modifications)) for _, m := range task.ModifyParam.Modifications { if m.Type == proto.ModifyConcurrency { if task.Concurrency == int(m.To) { @@ -416,10 +417,17 @@ func (s *BaseScheduler) onModifying() (bool, error) { recreateScheduler = true task.Concurrency = int(m.To) } else { - // will implement other modification types later. - s.logger.Warn("unsupported modification type", zap.Stringer("type", m.Type)) + metaModifies = append(metaModifies, m) } } + if len(metaModifies) > 0 { + s.logger.Info("modify task meta", zap.Stringers("modifies", metaModifies)) + newMeta, err := s.ModifyMeta(task.Meta, metaModifies) + if err != nil { + return false, errors.Trace(err) + } + task.Meta = newMeta + } if err := s.taskMgr.ModifiedTask(s.ctx, task); err != nil { return false, errors.Trace(err) } diff --git a/pkg/disttask/framework/scheduler/scheduler_manager_nokit_test.go b/pkg/disttask/framework/scheduler/scheduler_manager_nokit_test.go index 68305ff508f87..92ca494441ecc 100644 --- a/pkg/disttask/framework/scheduler/scheduler_manager_nokit_test.go +++ b/pkg/disttask/framework/scheduler/scheduler_manager_nokit_test.go @@ -22,10 +22,37 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/disttask/framework/mock" "github.com/pingcap/tidb/pkg/disttask/framework/proto" + mockScheduler "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mock" + "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" ) +// GetTestSchedulerExt return scheduler.Extension for testing. +func GetTestSchedulerExt(ctrl *gomock.Controller) Extension { + mockScheduler := mockScheduler.NewMockExtension(ctrl) + mockScheduler.EXPECT().OnTick(gomock.Any(), gomock.Any()).Return().AnyTimes() + mockScheduler.EXPECT().GetEligibleInstances(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ *proto.Task) ([]string, error) { + return nil, nil + }, + ).AnyTimes() + mockScheduler.EXPECT().IsRetryableErr(gomock.Any()).Return(true).AnyTimes() + mockScheduler.EXPECT().GetNextStep(gomock.Any()).DoAndReturn( + func(_ *proto.Task) proto.Step { + return proto.StepDone + }, + ).AnyTimes() + mockScheduler.EXPECT().OnNextSubtasksBatch(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ storage.TaskHandle, _ *proto.Task, _ []string, _ proto.Step) (metas [][]byte, err error) { + return nil, nil + }, + ).AnyTimes() + + mockScheduler.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + return mockScheduler +} + func TestManagerSchedulersOrdered(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/pkg/disttask/framework/scheduler/scheduler_nokit_test.go b/pkg/disttask/framework/scheduler/scheduler_nokit_test.go index 181397bf501ba..129f6d1d45383 100644 --- a/pkg/disttask/framework/scheduler/scheduler_nokit_test.go +++ b/pkg/disttask/framework/scheduler/scheduler_nokit_test.go @@ -348,6 +348,9 @@ func TestSchedulerMaintainTaskFields(t *testing.T) { }) scheduler.Extension = schExt + runningTask := task + runningTask.State = proto.TaskStateRunning + t.Run("test onPausing", func(t *testing.T) { scheduler.task.Store(&schTask) @@ -490,8 +493,8 @@ func TestSchedulerMaintainTaskFields(t *testing.T) { require.True(t, ctrl.Satisfied()) }) - t.Run("test on modifying", func(t *testing.T) { - taskBefore := schTask + t.Run("test on modifying, failed to update system table", func(t *testing.T) { + taskBefore := runningTask taskBefore.State = proto.TaskStateModifying taskBefore.ModifyParam = proto.ModifyParam{ PrevState: proto.TaskStateRunning, @@ -504,15 +507,68 @@ func TestSchedulerMaintainTaskFields(t *testing.T) { recreateScheduler, err := scheduler.onModifying() require.ErrorContains(t, err, "modify err") require.False(t, recreateScheduler) + require.Equal(t, taskBefore, *scheduler.GetTask()) + require.True(t, ctrl.Satisfied()) + }) + t.Run("test on modifying concurrency, success", func(t *testing.T) { + taskBefore := runningTask + taskBefore.State = proto.TaskStateModifying + taskBefore.ModifyParam = proto.ModifyParam{ + PrevState: proto.TaskStateRunning, + Modifications: []proto.Modification{ + {Type: proto.ModifyConcurrency, To: 123}, + }, + } + scheduler.task.Store(&taskBefore) taskMgr.EXPECT().ModifiedTask(gomock.Any(), gomock.Any()).Return(nil) - recreateScheduler, err = scheduler.onModifying() + recreateScheduler, err := scheduler.onModifying() require.NoError(t, err) require.True(t, recreateScheduler) - expectedTask := taskBefore + expectedTask := runningTask expectedTask.Concurrency = 123 - expectedTask.State = proto.TaskStateRunning - expectedTask.ModifyParam = proto.ModifyParam{} - require.Equal(t, *scheduler.GetTask(), expectedTask) + require.Equal(t, expectedTask, *scheduler.GetTask()) + require.True(t, ctrl.Satisfied()) + }) + + t.Run("test on modifying task meta, failed to get new meta", func(t *testing.T) { + taskBefore := runningTask + taskBefore.State = proto.TaskStateModifying + taskBefore.ModifyParam = proto.ModifyParam{ + PrevState: proto.TaskStateRunning, + Modifications: []proto.Modification{ + {Type: proto.ModifyMaxWriteSpeed, To: 11111}, + }, + } + scheduler.task.Store(&taskBefore) + schExt.EXPECT().ModifyMeta(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("modify meta err")) + recreateScheduler, err := scheduler.onModifying() + require.ErrorContains(t, err, "modify meta err") + require.False(t, recreateScheduler) + require.Equal(t, taskBefore, *scheduler.GetTask()) + require.True(t, ctrl.Satisfied()) + }) + + t.Run("test on modifying concurrency and task meta, success", func(t *testing.T) { + taskBefore := runningTask + taskBefore.State = proto.TaskStateModifying + taskBefore.ModifyParam = proto.ModifyParam{ + PrevState: proto.TaskStateRunning, + Modifications: []proto.Modification{ + {Type: proto.ModifyConcurrency, To: 123}, + {Type: proto.ModifyMaxWriteSpeed, To: 11111}, + }, + } + scheduler.task.Store(&taskBefore) + schExt.EXPECT().ModifyMeta(gomock.Any(), gomock.Any()).Return([]byte("max-11111"), nil) + taskMgr.EXPECT().ModifiedTask(gomock.Any(), gomock.Any()).Return(nil) + recreateScheduler, err := scheduler.onModifying() + require.NoError(t, err) + require.True(t, recreateScheduler) + expectedTask := runningTask + expectedTask.Concurrency = 123 + expectedTask.Meta = []byte("max-11111") + require.Equal(t, expectedTask, *scheduler.GetTask()) + require.True(t, ctrl.Satisfied()) }) } diff --git a/pkg/disttask/framework/scheduler/testutil.go b/pkg/disttask/framework/scheduler/testutil.go deleted file mode 100644 index 6303dde35fcdb..0000000000000 --- a/pkg/disttask/framework/scheduler/testutil.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package scheduler - -import ( - "context" - - "github.com/pingcap/tidb/pkg/disttask/framework/proto" - mockScheduler "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mock" - "github.com/pingcap/tidb/pkg/disttask/framework/storage" - "go.uber.org/mock/gomock" -) - -// GetTestSchedulerExt return scheduler.Extension for testing. -func GetTestSchedulerExt(ctrl *gomock.Controller) Extension { - mockScheduler := mockScheduler.NewMockExtension(ctrl) - mockScheduler.EXPECT().OnTick(gomock.Any(), gomock.Any()).Return().AnyTimes() - mockScheduler.EXPECT().GetEligibleInstances(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, _ *proto.Task) ([]string, error) { - return nil, nil - }, - ).AnyTimes() - mockScheduler.EXPECT().IsRetryableErr(gomock.Any()).Return(true).AnyTimes() - mockScheduler.EXPECT().GetNextStep(gomock.Any()).DoAndReturn( - func(_ *proto.Task) proto.Step { - return proto.StepDone - }, - ).AnyTimes() - mockScheduler.EXPECT().OnNextSubtasksBatch(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, _ storage.TaskHandle, _ *proto.Task, _ []string, _ proto.Step) (metas [][]byte, err error) { - return nil, nil - }, - ).AnyTimes() - - mockScheduler.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - return mockScheduler -} diff --git a/pkg/disttask/framework/testutil/scheduler_util.go b/pkg/disttask/framework/testutil/scheduler_util.go index d9983ac44e15e..9c55cbc6ba061 100644 --- a/pkg/disttask/framework/testutil/scheduler_util.go +++ b/pkg/disttask/framework/testutil/scheduler_util.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "math" + "strings" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" @@ -99,6 +100,18 @@ func GetMockSchedulerExt(ctrl *gomock.Controller, schedulerInfo SchedulerInfo) s return res, nil }, ).AnyTimes() + mockScheduler.EXPECT().ModifyMeta(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ []byte, modifies []proto.Modification) ([]byte, error) { + var sb strings.Builder + for i, m := range modifies { + if i > 0 { + sb.WriteString(",") + } + sb.WriteString(fmt.Sprintf("%s=%d", m.Type, m.To)) + } + return []byte(sb.String()), nil + }, + ).AnyTimes() mockScheduler.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() return mockScheduler diff --git a/pkg/disttask/importinto/scheduler.go b/pkg/disttask/importinto/scheduler.go index 143bc83c99068..792f08aca2b04 100644 --- a/pkg/disttask/importinto/scheduler.go +++ b/pkg/disttask/importinto/scheduler.go @@ -466,6 +466,11 @@ func (sch *importScheduler) updateCurrentTask(task *proto.Task) { } } +// ModifyMeta implements scheduler.Extension interface. +func (*importScheduler) ModifyMeta(oldMeta []byte, _ []proto.Modification) ([]byte, error) { + return oldMeta, nil +} + // nolint:deadcode func dropTableIndexes(ctx context.Context, handle storage.TaskHandle, taskMeta *TaskMeta, logger *zap.Logger) error { tblInfo := taskMeta.Plan.TableInfo