Skip to content

Commit

Permalink
dxf: support modify app param on scheduler part (#58990)
Browse files Browse the repository at this point in the history
ref #57497
  • Loading branch information
D3Hunter authored Jan 20, 2025
1 parent 6cceaba commit 62f9a39
Show file tree
Hide file tree
Showing 13 changed files with 199 additions and 63 deletions.
7 changes: 6 additions & 1 deletion pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,16 @@ func (*LitBackfillScheduler) GetEligibleInstances(_ context.Context, _ *proto.Ta
return nil, nil
}

// IsRetryableErr implements scheduler.Extension.IsRetryableErr interface.
// IsRetryableErr implements scheduler.Extension interface.
func (*LitBackfillScheduler) IsRetryableErr(error) bool {
return true
}

// ModifyMeta implements scheduler.Extension interface.
func (*LitBackfillScheduler) ModifyMeta(oldMeta []byte, _ []proto.Modification) ([]byte, error) {
return oldMeta, nil
}

func getTblInfo(ctx context.Context, d *ddl, job *model.Job) (tblInfo *model.TableInfo, err error) {
err = kv.RunInNewTxn(ctx, d.store, true, func(_ context.Context, txn kv.Transaction) error {
tblInfo, err = meta.NewMutator(txn).GetTable(job.SchemaID, job.TableID)
Expand Down
36 changes: 35 additions & 1 deletion pkg/disttask/framework/integrationtests/modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
)

type collectedRuntimeInfo struct {
// not used right now, will support modify task specific params in later PR.
currentTask *proto.Task
subtaskInfos []subtaskRuntimeInfo
}
Expand Down Expand Up @@ -310,4 +309,39 @@ func TestModifyTaskConcurrency(t *testing.T) {
{Step: proto.StepTwo, Concurrency: 7},
}, runtimeInfo.subtaskInfos)
})

t.Run("modify pending task meta, only check the scheduler part", func(t *testing.T) {
defer resetRuntimeInfoFn()
var once sync.Once
modifySyncCh := make(chan struct{})
var theTask *proto.Task
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeGetSchedulableTasks", func() {
once.Do(func() {
task, err := handle.SubmitTask(c.Ctx, "k5", proto.TaskTypeExample, 3, "", []byte("init"))
require.NoError(t, err)
require.Equal(t, 3, task.Concurrency)
require.EqualValues(t, []byte("init"), task.Meta)
require.NoError(t, c.TaskMgr.ModifyTaskByID(c.Ctx, task.ID, &proto.ModifyParam{
PrevState: proto.TaskStatePending,
Modifications: []proto.Modification{
{Type: proto.ModifyMaxWriteSpeed, To: 123},
},
}))
theTask = task
gotTask, err := c.TaskMgr.GetTaskBaseByID(c.Ctx, theTask.ID)
require.NoError(t, err)
require.Equal(t, proto.TaskStateModifying, gotTask.State)
require.Equal(t, 3, gotTask.Concurrency)
<-modifySyncCh
})
})
modifySyncCh <- struct{}{}
// finish subtasks
for i := 0; i < 5; i++ {
subtaskCh <- struct{}{}
}
task2Base := testutil.WaitTaskDone(c.Ctx, t, theTask.Key)
require.Equal(t, proto.TaskStateSucceed, task2Base.State)
require.EqualValues(t, []byte("modify_max_write_speed=123"), runtimeInfo.currentTask.Meta)
})
}
15 changes: 15 additions & 0 deletions pkg/disttask/framework/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/disttask/framework/proto/modify.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func (t ModificationType) String() string {
const (
// ModifyConcurrency is the type for modifying task concurrency.
ModifyConcurrency ModificationType = "modify_concurrency"
// ModifyBatchSize is the type for modifying batch size of add-index.
ModifyBatchSize ModificationType = "modify_batch_size"
// ModifyMaxWriteSpeed is the type for modifying max write speed of add-index.
ModifyMaxWriteSpeed ModificationType = "modify_max_write_speed"
)

// ModifyParam is the parameter for task modification.
Expand Down
3 changes: 0 additions & 3 deletions pkg/disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@ go_library(
"scheduler_manager.go",
"slots.go",
"state_transform.go",
"testutil.go",
],
importpath = "github.com/pingcap/tidb/pkg/disttask/framework/scheduler",
visibility = ["//visibility:public"],
deps = [
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler/mock",
"//pkg/disttask/framework/storage",
"//pkg/domain/infosync",
"//pkg/kv",
Expand All @@ -35,7 +33,6 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_prometheus_client_golang//prometheus",
"@org_uber_go_mock//gomock",
"@org_uber_go_zap//:zap",
],
)
Expand Down
6 changes: 6 additions & 0 deletions pkg/disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ type Extension interface {
// NOTE: don't depend on task meta to decide the next step, if it's really needed,
// initialize required fields on scheduler.Init
GetNextStep(task *proto.TaskBase) proto.Step
// ModifyMeta is used to modify the task meta when the task is in modifying
// state, it should return new meta after applying the modifications to the
// old meta.
// Note: the application side only need to modify meta, no need to do notify,
// task executor will do it later.
ModifyMeta(oldMeta []byte, modifies []proto.Modification) ([]byte, error)
}

// Param is used to pass parameters when creating scheduler.
Expand Down
15 changes: 15 additions & 0 deletions pkg/disttask/framework/scheduler/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions pkg/disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ func (s *BaseScheduler) onModifying() (bool, error) {
task := s.getTaskClone()
s.logger.Info("on modifying state", zap.Stringer("param", &task.ModifyParam))
recreateScheduler := false
metaModifies := make([]proto.Modification, 0, len(task.ModifyParam.Modifications))
for _, m := range task.ModifyParam.Modifications {
if m.Type == proto.ModifyConcurrency {
if task.Concurrency == int(m.To) {
Expand All @@ -416,10 +417,17 @@ func (s *BaseScheduler) onModifying() (bool, error) {
recreateScheduler = true
task.Concurrency = int(m.To)
} else {
// will implement other modification types later.
s.logger.Warn("unsupported modification type", zap.Stringer("type", m.Type))
metaModifies = append(metaModifies, m)
}
}
if len(metaModifies) > 0 {
s.logger.Info("modify task meta", zap.Stringers("modifies", metaModifies))
newMeta, err := s.ModifyMeta(task.Meta, metaModifies)
if err != nil {
return false, errors.Trace(err)
}
task.Meta = newMeta
}
if err := s.taskMgr.ModifiedTask(s.ctx, task); err != nil {
return false, errors.Trace(err)
}
Expand Down
27 changes: 27 additions & 0 deletions pkg/disttask/framework/scheduler/scheduler_manager_nokit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,37 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
mockScheduler "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

// GetTestSchedulerExt return scheduler.Extension for testing.
func GetTestSchedulerExt(ctrl *gomock.Controller) Extension {
mockScheduler := mockScheduler.NewMockExtension(ctrl)
mockScheduler.EXPECT().OnTick(gomock.Any(), gomock.Any()).Return().AnyTimes()
mockScheduler.EXPECT().GetEligibleInstances(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ *proto.Task) ([]string, error) {
return nil, nil
},
).AnyTimes()
mockScheduler.EXPECT().IsRetryableErr(gomock.Any()).Return(true).AnyTimes()
mockScheduler.EXPECT().GetNextStep(gomock.Any()).DoAndReturn(
func(_ *proto.Task) proto.Step {
return proto.StepDone
},
).AnyTimes()
mockScheduler.EXPECT().OnNextSubtasksBatch(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ storage.TaskHandle, _ *proto.Task, _ []string, _ proto.Step) (metas [][]byte, err error) {
return nil, nil
},
).AnyTimes()

mockScheduler.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
return mockScheduler
}

func TestManagerSchedulersOrdered(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
70 changes: 63 additions & 7 deletions pkg/disttask/framework/scheduler/scheduler_nokit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
})
scheduler.Extension = schExt

runningTask := task
runningTask.State = proto.TaskStateRunning

t.Run("test onPausing", func(t *testing.T) {
scheduler.task.Store(&schTask)

Expand Down Expand Up @@ -490,8 +493,8 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
require.True(t, ctrl.Satisfied())
})

t.Run("test on modifying", func(t *testing.T) {
taskBefore := schTask
t.Run("test on modifying, failed to update system table", func(t *testing.T) {
taskBefore := runningTask
taskBefore.State = proto.TaskStateModifying
taskBefore.ModifyParam = proto.ModifyParam{
PrevState: proto.TaskStateRunning,
Expand All @@ -504,15 +507,68 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
recreateScheduler, err := scheduler.onModifying()
require.ErrorContains(t, err, "modify err")
require.False(t, recreateScheduler)
require.Equal(t, taskBefore, *scheduler.GetTask())
require.True(t, ctrl.Satisfied())
})

t.Run("test on modifying concurrency, success", func(t *testing.T) {
taskBefore := runningTask
taskBefore.State = proto.TaskStateModifying
taskBefore.ModifyParam = proto.ModifyParam{
PrevState: proto.TaskStateRunning,
Modifications: []proto.Modification{
{Type: proto.ModifyConcurrency, To: 123},
},
}
scheduler.task.Store(&taskBefore)
taskMgr.EXPECT().ModifiedTask(gomock.Any(), gomock.Any()).Return(nil)
recreateScheduler, err = scheduler.onModifying()
recreateScheduler, err := scheduler.onModifying()
require.NoError(t, err)
require.True(t, recreateScheduler)
expectedTask := taskBefore
expectedTask := runningTask
expectedTask.Concurrency = 123
expectedTask.State = proto.TaskStateRunning
expectedTask.ModifyParam = proto.ModifyParam{}
require.Equal(t, *scheduler.GetTask(), expectedTask)
require.Equal(t, expectedTask, *scheduler.GetTask())
require.True(t, ctrl.Satisfied())
})

t.Run("test on modifying task meta, failed to get new meta", func(t *testing.T) {
taskBefore := runningTask
taskBefore.State = proto.TaskStateModifying
taskBefore.ModifyParam = proto.ModifyParam{
PrevState: proto.TaskStateRunning,
Modifications: []proto.Modification{
{Type: proto.ModifyMaxWriteSpeed, To: 11111},
},
}
scheduler.task.Store(&taskBefore)
schExt.EXPECT().ModifyMeta(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("modify meta err"))
recreateScheduler, err := scheduler.onModifying()
require.ErrorContains(t, err, "modify meta err")
require.False(t, recreateScheduler)
require.Equal(t, taskBefore, *scheduler.GetTask())
require.True(t, ctrl.Satisfied())
})

t.Run("test on modifying concurrency and task meta, success", func(t *testing.T) {
taskBefore := runningTask
taskBefore.State = proto.TaskStateModifying
taskBefore.ModifyParam = proto.ModifyParam{
PrevState: proto.TaskStateRunning,
Modifications: []proto.Modification{
{Type: proto.ModifyConcurrency, To: 123},
{Type: proto.ModifyMaxWriteSpeed, To: 11111},
},
}
scheduler.task.Store(&taskBefore)
schExt.EXPECT().ModifyMeta(gomock.Any(), gomock.Any()).Return([]byte("max-11111"), nil)
taskMgr.EXPECT().ModifiedTask(gomock.Any(), gomock.Any()).Return(nil)
recreateScheduler, err := scheduler.onModifying()
require.NoError(t, err)
require.True(t, recreateScheduler)
expectedTask := runningTask
expectedTask.Concurrency = 123
expectedTask.Meta = []byte("max-11111")
require.Equal(t, expectedTask, *scheduler.GetTask())
require.True(t, ctrl.Satisfied())
})
}
49 changes: 0 additions & 49 deletions pkg/disttask/framework/scheduler/testutil.go

This file was deleted.

13 changes: 13 additions & 0 deletions pkg/disttask/framework/testutil/scheduler_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"math"
"strings"

"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
Expand Down Expand Up @@ -99,6 +100,18 @@ func GetMockSchedulerExt(ctrl *gomock.Controller, schedulerInfo SchedulerInfo) s
return res, nil
},
).AnyTimes()
mockScheduler.EXPECT().ModifyMeta(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ []byte, modifies []proto.Modification) ([]byte, error) {
var sb strings.Builder
for i, m := range modifies {
if i > 0 {
sb.WriteString(",")
}
sb.WriteString(fmt.Sprintf("%s=%d", m.Type, m.To))
}
return []byte(sb.String()), nil
},
).AnyTimes()

mockScheduler.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
return mockScheduler
Expand Down
Loading

0 comments on commit 62f9a39

Please sign in to comment.