From e693b3e755ba7441f068cd7f09fe22c18eee0e46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Tue, 7 Jan 2025 18:24:57 +0800 Subject: [PATCH 1/7] add scheduler config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/schedule/operator/kind.go | 2 + pkg/schedule/schedulers/balance_key_range.go | 168 ++++++++++++++++++ .../schedulers/balance_key_range_test.go | 1 + pkg/schedule/schedulers/init.go | 25 +++ pkg/schedule/types/type.go | 2 + 5 files changed, 198 insertions(+) create mode 100644 pkg/schedule/schedulers/balance_key_range.go create mode 100644 pkg/schedule/schedulers/balance_key_range_test.go diff --git a/pkg/schedule/operator/kind.go b/pkg/schedule/operator/kind.go index 0187a64c568..0c99a6b7a17 100644 --- a/pkg/schedule/operator/kind.go +++ b/pkg/schedule/operator/kind.go @@ -35,6 +35,8 @@ const ( OpMerge // Initiated by range scheduler. OpRange + // Initiated by key range scheduler. + OpKeyRange // Initiated by replica checker. OpReplica // Include region split. Initiated by rule checker if `kind & OpAdmin == 0`. diff --git a/pkg/schedule/schedulers/balance_key_range.go b/pkg/schedule/schedulers/balance_key_range.go new file mode 100644 index 00000000000..454eb88089a --- /dev/null +++ b/pkg/schedule/schedulers/balance_key_range.go @@ -0,0 +1,168 @@ +package schedulers + +import ( + "net/http" + "net/url" + "time" + + "github.com/gorilla/mux" + "github.com/pingcap/log" + _ "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" + "github.com/tikv/pd/pkg/errs" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/filter" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/plan" + "github.com/tikv/pd/pkg/schedule/types" + "github.com/unrolled/render" +) + +const ( + DefaultTimeout = 1 * time.Hour +) + +type balanceKeyRangeSchedulerHandler struct { + rd *render.Render + config *balanceKeyRangeSchedulerConfig +} + +func newBalanceKeyRangeHandler(conf *balanceKeyRangeSchedulerConfig) http.Handler { + handler := &balanceKeyRangeSchedulerHandler{ + config: conf, + rd: render.New(render.Options{IndentJSON: true}), + } + router := mux.NewRouter() + router.HandleFunc("/config", handler.updateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", handler.listConfig).Methods(http.MethodGet) + return router +} + +func (handler *balanceKeyRangeSchedulerHandler) updateConfig(w http.ResponseWriter, r *http.Request) { + handler.rd.JSON(w, http.StatusBadRequest, "update config is not supported") +} + +func (handler *balanceKeyRangeSchedulerHandler) listConfig(w http.ResponseWriter, _ *http.Request) { + conf := handler.config.clone() + handler.rd.JSON(w, http.StatusOK, conf) +} + +type balanceKeyRangeSchedulerConfig struct { + baseDefaultSchedulerConfig + balanceKeyRangeSchedulerParam +} + +type balanceKeyRangeSchedulerParam struct { + Role string `json:"role"` + Engine string `json:"engine"` + StartKey string `json:"start_key"` + EndKey string `json:"end_key"` + Timeout time.Duration `json:"timeout"` +} + +func (conf *balanceKeyRangeSchedulerConfig) encodeConfig() ([]byte, error) { + conf.RLock() + defer conf.RUnlock() + return EncodeConfig(conf) +} + +func (conf *balanceKeyRangeSchedulerConfig) clone() *balanceKeyRangeSchedulerParam { + conf.RLock() + defer conf.RUnlock() + return &balanceKeyRangeSchedulerParam{ + Role: conf.Role, + Engine: conf.Engine, + StartKey: conf.StartKey, + EndKey: conf.EndKey, + } +} + +func (conf *balanceKeyRangeSchedulerConfig) parseFromArgs(args []string) error { + if len(args) < 4 { + return errs.ErrSchedulerConfig.FastGenByArgs("args length should be greater than 4") + } + newConf := &balanceKeyRangeSchedulerConfig{} + var err error + newConf.StartKey, err = url.QueryUnescape(args[0]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + newConf.EndKey, err = url.QueryUnescape(args[1]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + + newConf.Role, err = url.QueryUnescape(args[2]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + + newConf.Engine, err = url.QueryUnescape(args[3]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + if len(args) >= 5 { + timeout, err := url.QueryUnescape(args[4]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + conf.Timeout, err = time.ParseDuration(timeout) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + } else { + conf.Timeout = DefaultTimeout + } + *newConf = *newConf + return nil +} + +func (s *balanceKeyRangeScheduler) EncodeConfig() ([]byte, error) { + return s.conf.encodeConfig() +} + +func (s *balanceKeyRangeScheduler) ReloadConfig() error { + return nil +} + +type balanceKeyRangeScheduler struct { + *BaseScheduler + conf *balanceKeyRangeSchedulerConfig + handler http.Handler + filters []filter.Filter + filterCounter *filter.Counter +} + +func (s *balanceKeyRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { + log.Info("balance key range scheduler is scheduling, need to implement") + return nil, nil +} + +func (s *balanceKeyRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { + allowed := s.OpController.OperatorCount(operator.OpKeyRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() + if !allowed { + operator.IncOperatorLimitCounter(s.GetType(), operator.OpKeyRange) + } + return allowed +} + +type BalanceKeyRangeCreateOption func(s *balanceKeyRangeScheduler) + +// newBalanceKeyRangeScheduler creates a scheduler that tends to keep given peer role on +// special store balanced. +func newBalanceKeyRangeScheduler(opController *operator.Controller, conf *balanceKeyRangeSchedulerConfig, options ...BalanceKeyRangeCreateOption) Scheduler { + s := &balanceKeyRangeScheduler{ + BaseScheduler: NewBaseScheduler(opController, types.BalanceLeaderScheduler, conf), + conf: conf, + handler: newBalanceKeyRangeHandler(conf), + } + for _, option := range options { + option(s) + } + s.filters = []filter.Filter{ + &filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.Medium}, + filter.NewSpecialUseFilter(s.GetName()), + } + s.filterCounter = filter.NewCounter(s.GetName()) + return s +} diff --git a/pkg/schedule/schedulers/balance_key_range_test.go b/pkg/schedule/schedulers/balance_key_range_test.go new file mode 100644 index 00000000000..9185832f5db --- /dev/null +++ b/pkg/schedule/schedulers/balance_key_range_test.go @@ -0,0 +1 @@ +package schedulers diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 51d857ae445..3fc4c0659c4 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -545,4 +545,29 @@ func schedulersRegister() { conf.init(sche.GetName(), storage, conf) return sche, nil }) + + // balance key range scheduler + RegisterSliceDecoderBuilder(types.BalanceKeyRangeScheduler, func(args []string) ConfigDecoder { + return func(v any) error { + conf, ok := v.(*balanceKeyRangeSchedulerConfig) + if !ok { + return errs.ErrScheduleConfigNotExist.FastGenByArgs() + } + return parseBalanceKeyRangeParamArgs(args, conf) + } + }) + + RegisterScheduler(types.BalanceKeyRangeScheduler, func(opController *operator.Controller, + storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { + conf := &balanceKeyRangeSchedulerConfig{ + baseDefaultSchedulerConfig: newBaseDefaultSchedulerConfig(), + } + if err := decoder(conf); err != nil { + return nil, err + } + sche := newBalanceKeyRangeScheduler(opController, conf) + conf.init(sche.GetName(), storage, conf) + return sche, nil + }) + } diff --git a/pkg/schedule/types/type.go b/pkg/schedule/types/type.go index 7bc27892010..24983a98520 100644 --- a/pkg/schedule/types/type.go +++ b/pkg/schedule/types/type.go @@ -70,6 +70,8 @@ const ( TransferWitnessLeaderScheduler CheckerSchedulerType = "transfer-witness-leader-scheduler" // LabelScheduler is label scheduler name. LabelScheduler CheckerSchedulerType = "label-scheduler" + // BalanceKeyRangeScheduler is balance key range scheduler name. + BalanceKeyRangeScheduler CheckerSchedulerType = "balance-key-range-scheduler" ) // TODO: SchedulerTypeCompatibleMap and ConvertOldStrToType should be removed after From 23ff7d068653200ecb27f97a483d651721124a47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Fri, 10 Jan 2025 09:50:26 +0800 Subject: [PATCH 2/7] add new scheduler for key range MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/schedule/operator/kind.go | 1 + pkg/schedule/operator/operator_test.go | 3 + pkg/schedule/schedulers/balance_key_range.go | 80 ++++++------------- .../schedulers/balance_key_range_test.go | 6 ++ pkg/schedule/schedulers/init.go | 23 +++++- pkg/schedule/schedulers/scheduler.go | 2 + pkg/schedule/types/type.go | 37 +++++---- server/api/scheduler.go | 23 ++++++ server/cluster/cluster_test.go | 7 ++ tools/pd-ctl/pdctl/command/scheduler.go | 58 ++++++++++++++ .../pd-ctl/tests/scheduler/scheduler_test.go | 22 ++++- 11 files changed, 187 insertions(+), 75 deletions(-) diff --git a/pkg/schedule/operator/kind.go b/pkg/schedule/operator/kind.go index 0c99a6b7a17..c6e4614f525 100644 --- a/pkg/schedule/operator/kind.go +++ b/pkg/schedule/operator/kind.go @@ -76,6 +76,7 @@ var nameToFlag = map[string]OpKind{ "replica": OpReplica, "merge": OpMerge, "range": OpRange, + "key-range": OpKeyRange, "witness-leader": OpWitnessLeader, } diff --git a/pkg/schedule/operator/operator_test.go b/pkg/schedule/operator/operator_test.go index 6976b5ca12e..422091dea19 100644 --- a/pkg/schedule/operator/operator_test.go +++ b/pkg/schedule/operator/operator_test.go @@ -476,6 +476,9 @@ func (suite *operatorTestSuite) TestSchedulerKind() { }, { op: NewTestOperator(1, &metapb.RegionEpoch{}, OpLeader), expect: OpLeader, + }, { + op: NewTestOperator(1, &metapb.RegionEpoch{}, OpKeyRange|OpLeader), + expect: OpKeyRange, }, } for _, v := range testData { diff --git a/pkg/schedule/schedulers/balance_key_range.go b/pkg/schedule/schedulers/balance_key_range.go index 454eb88089a..dea00f45e9e 100644 --- a/pkg/schedule/schedulers/balance_key_range.go +++ b/pkg/schedule/schedulers/balance_key_range.go @@ -2,12 +2,13 @@ package schedulers import ( "net/http" - "net/url" "time" "github.com/gorilla/mux" "github.com/pingcap/log" - _ "github.com/tikv/pd/pkg/core" + "github.com/unrolled/render" + + "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" sche "github.com/tikv/pd/pkg/schedule/core" @@ -15,7 +16,6 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/schedule/types" - "github.com/unrolled/render" ) const ( @@ -44,7 +44,9 @@ func (handler *balanceKeyRangeSchedulerHandler) updateConfig(w http.ResponseWrit func (handler *balanceKeyRangeSchedulerHandler) listConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.clone() - handler.rd.JSON(w, http.StatusOK, conf) + if err := handler.rd.JSON(w, http.StatusOK, conf); err != nil { + log.Error("failed to marshal balance key range scheduler config", errs.ZapError(err)) + } } type balanceKeyRangeSchedulerConfig struct { @@ -53,11 +55,10 @@ type balanceKeyRangeSchedulerConfig struct { } type balanceKeyRangeSchedulerParam struct { - Role string `json:"role"` - Engine string `json:"engine"` - StartKey string `json:"start_key"` - EndKey string `json:"end_key"` - Timeout time.Duration `json:"timeout"` + Role string `json:"role"` + Engine string `json:"engine"` + Timeout time.Duration `json:"timeout"` + Ranges []core.KeyRange `json:"ranges"` } func (conf *balanceKeyRangeSchedulerConfig) encodeConfig() ([]byte, error) { @@ -69,54 +70,16 @@ func (conf *balanceKeyRangeSchedulerConfig) encodeConfig() ([]byte, error) { func (conf *balanceKeyRangeSchedulerConfig) clone() *balanceKeyRangeSchedulerParam { conf.RLock() defer conf.RUnlock() + ranges := make([]core.KeyRange, len(conf.Ranges)) + copy(ranges, conf.Ranges) return &balanceKeyRangeSchedulerParam{ - Role: conf.Role, - Engine: conf.Engine, - StartKey: conf.StartKey, - EndKey: conf.EndKey, + Ranges: ranges, + Role: conf.Role, + Engine: conf.Engine, + Timeout: conf.Timeout, } } -func (conf *balanceKeyRangeSchedulerConfig) parseFromArgs(args []string) error { - if len(args) < 4 { - return errs.ErrSchedulerConfig.FastGenByArgs("args length should be greater than 4") - } - newConf := &balanceKeyRangeSchedulerConfig{} - var err error - newConf.StartKey, err = url.QueryUnescape(args[0]) - if err != nil { - return errs.ErrQueryUnescape.Wrap(err) - } - newConf.EndKey, err = url.QueryUnescape(args[1]) - if err != nil { - return errs.ErrQueryUnescape.Wrap(err) - } - - newConf.Role, err = url.QueryUnescape(args[2]) - if err != nil { - return errs.ErrQueryUnescape.Wrap(err) - } - - newConf.Engine, err = url.QueryUnescape(args[3]) - if err != nil { - return errs.ErrQueryUnescape.Wrap(err) - } - if len(args) >= 5 { - timeout, err := url.QueryUnescape(args[4]) - if err != nil { - return errs.ErrQueryUnescape.Wrap(err) - } - conf.Timeout, err = time.ParseDuration(timeout) - if err != nil { - return errs.ErrQueryUnescape.Wrap(err) - } - } else { - conf.Timeout = DefaultTimeout - } - *newConf = *newConf - return nil -} - func (s *balanceKeyRangeScheduler) EncodeConfig() ([]byte, error) { return s.conf.encodeConfig() } @@ -133,8 +96,13 @@ type balanceKeyRangeScheduler struct { filterCounter *filter.Counter } -func (s *balanceKeyRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { - log.Info("balance key range scheduler is scheduling, need to implement") +// ServeHTTP implements the http.Handler interface. +func (s *balanceKeyRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) +} + +func (s *balanceKeyRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRun bool) ([]*operator.Operator, []plan.Plan) { + log.Debug("balance key range scheduler is scheduling, need to implement") return nil, nil } @@ -152,7 +120,7 @@ type BalanceKeyRangeCreateOption func(s *balanceKeyRangeScheduler) // special store balanced. func newBalanceKeyRangeScheduler(opController *operator.Controller, conf *balanceKeyRangeSchedulerConfig, options ...BalanceKeyRangeCreateOption) Scheduler { s := &balanceKeyRangeScheduler{ - BaseScheduler: NewBaseScheduler(opController, types.BalanceLeaderScheduler, conf), + BaseScheduler: NewBaseScheduler(opController, types.BalanceKeyRangeScheduler, conf), conf: conf, handler: newBalanceKeyRangeHandler(conf), } diff --git a/pkg/schedule/schedulers/balance_key_range_test.go b/pkg/schedule/schedulers/balance_key_range_test.go index 9185832f5db..f0a402d108a 100644 --- a/pkg/schedule/schedulers/balance_key_range_test.go +++ b/pkg/schedule/schedulers/balance_key_range_test.go @@ -1 +1,7 @@ package schedulers + +import "testing" + +func TestHttpApi(t *testing.T) { + +} diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 3fc4c0659c4..f9b296ee6d8 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -15,6 +15,7 @@ package schedulers import ( + "net/url" "strconv" "strings" "sync" @@ -547,13 +548,33 @@ func schedulersRegister() { }) // balance key range scheduler + // args: [role, engine, range1, range2, ...] RegisterSliceDecoderBuilder(types.BalanceKeyRangeScheduler, func(args []string) ConfigDecoder { return func(v any) error { conf, ok := v.(*balanceKeyRangeSchedulerConfig) if !ok { return errs.ErrScheduleConfigNotExist.FastGenByArgs() } - return parseBalanceKeyRangeParamArgs(args, conf) + if len(args) < 4 { + return errs.ErrSchedulerConfig.FastGenByArgs("args length must be greater than 3") + } + role, err := url.QueryUnescape(args[0]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + engine, err := url.QueryUnescape(args[1]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + ranges, err := getKeyRanges(args[2:]) + if err != nil { + return err + } + conf.Ranges = ranges + conf.Engine = engine + conf.Role = role + conf.Timeout = DefaultTimeout + return nil } }) diff --git a/pkg/schedule/schedulers/scheduler.go b/pkg/schedule/schedulers/scheduler.go index 8976c3a1928..fd6d6710350 100644 --- a/pkg/schedule/schedulers/scheduler.go +++ b/pkg/schedule/schedulers/scheduler.go @@ -157,7 +157,9 @@ func CreateScheduler( removeSchedulerCb ...func(string) error, ) (Scheduler, error) { fn, ok := schedulerMap[typ] + log.Info("create scheduler", zap.Any("typ", typ)) if !ok { + log.Warn("create scheduler not found", zap.Any("typ", typ)) return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ) } diff --git a/pkg/schedule/types/type.go b/pkg/schedule/types/type.go index 24983a98520..baeb536c987 100644 --- a/pkg/schedule/types/type.go +++ b/pkg/schedule/types/type.go @@ -99,29 +99,31 @@ var ( SplitBucketScheduler: "split-bucket", TransferWitnessLeaderScheduler: "transfer-witness-leader", LabelScheduler: "label", + BalanceKeyRangeScheduler: "balance-key-range", } // ConvertOldStrToType exists for compatibility. // // It is used to convert the old scheduler type to `CheckerSchedulerType`. ConvertOldStrToType = map[string]CheckerSchedulerType{ - "balance-leader": BalanceLeaderScheduler, - "balance-region": BalanceRegionScheduler, - "balance-witness": BalanceWitnessScheduler, - "evict-leader": EvictLeaderScheduler, - "evict-slow-store": EvictSlowStoreScheduler, - "evict-slow-trend": EvictSlowTrendScheduler, - "grant-leader": GrantLeaderScheduler, - "grant-hot-region": GrantHotRegionScheduler, - "hot-region": BalanceHotRegionScheduler, - "random-merge": RandomMergeScheduler, - "scatter-range": ScatterRangeScheduler, - "shuffle-hot-region": ShuffleHotRegionScheduler, - "shuffle-leader": ShuffleLeaderScheduler, - "shuffle-region": ShuffleRegionScheduler, - "split-bucket": SplitBucketScheduler, - "transfer-witness-leader": TransferWitnessLeaderScheduler, - "label": LabelScheduler, + "balance-leader": BalanceLeaderScheduler, + "balance-region": BalanceRegionScheduler, + "balance-witness": BalanceWitnessScheduler, + "evict-leader": EvictLeaderScheduler, + "evict-slow-store": EvictSlowStoreScheduler, + "evict-slow-trend": EvictSlowTrendScheduler, + "grant-leader": GrantLeaderScheduler, + "grant-hot-region": GrantHotRegionScheduler, + "hot-region": BalanceHotRegionScheduler, + "random-merge": RandomMergeScheduler, + "scatter-range": ScatterRangeScheduler, + "shuffle-hot-region": ShuffleHotRegionScheduler, + "shuffle-leader": ShuffleLeaderScheduler, + "shuffle-region": ShuffleRegionScheduler, + "split-bucket": SplitBucketScheduler, + "transfer-witness-leader": TransferWitnessLeaderScheduler, + "label": LabelScheduler, + "balance-key-range-scheduler": BalanceKeyRangeScheduler, } // StringToSchedulerType is a map to convert the scheduler string to the CheckerSchedulerType. @@ -145,6 +147,7 @@ var ( "split-bucket-scheduler": SplitBucketScheduler, "transfer-witness-leader-scheduler": TransferWitnessLeaderScheduler, "label-scheduler": LabelScheduler, + "balance-key-range-scheduler": BalanceKeyRangeScheduler, } // DefaultSchedulers is the default scheduler types. diff --git a/server/api/scheduler.go b/server/api/scheduler.go index b2d18012c89..f8b62864c0c 100644 --- a/server/api/scheduler.go +++ b/server/api/scheduler.go @@ -99,6 +99,29 @@ func (h *schedulerHandler) CreateScheduler(w http.ResponseWriter, r *http.Reques } switch tp { + case types.BalanceKeyRangeScheduler: + exist, _ := h.IsSchedulerExisted(name) + if exist { + h.r.JSON(w, http.StatusBadRequest, "The scheduler already exists, pls remove the exist scheduler first.") + return + } + if err := apiutil.CollectStringOption("role", input, collector); err != nil { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + if err := apiutil.CollectStringOption("engine", input, collector); err != nil { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + if err := apiutil.CollectEscapeStringOption("start_key", input, collector); err != nil { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + + if err := apiutil.CollectEscapeStringOption("end_key", input, collector); err != nil { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } case types.ScatterRangeScheduler: if err := apiutil.CollectEscapeStringOption("start_key", input, collector); err != nil { h.r.JSON(w, http.StatusInternalServerError, err.Error()) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 2f6d04bbf52..c62fb64fc80 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -3209,6 +3209,13 @@ func TestAddScheduler(t *testing.T) { re.NoError(err) re.NoError(controller.AddScheduler(gls)) + gls, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{}), controller.RemoveScheduler) + re.Error(err) + + gls, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{"leaner", "tiflash", "100", "200"}), controller.RemoveScheduler) + re.NoError(err) + re.NoError(controller.AddScheduler(gls)) + hb, err := schedulers.CreateScheduler(types.BalanceHotRegionScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) re.NoError(err) conf, err = hb.EncodeConfig() diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 5dc05aff62f..48e8d9ecf2d 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -162,6 +162,7 @@ func NewAddSchedulerCommand() *cobra.Command { c.AddCommand(NewSlowTrendEvictLeaderSchedulerCommand()) c.AddCommand(NewBalanceWitnessSchedulerCommand()) c.AddCommand(NewTransferWitnessLeaderSchedulerCommand()) + c.AddCommand(NewBalanceKeyRangeSchedulerCommand()) return c } @@ -374,6 +375,16 @@ func NewBalanceWitnessSchedulerCommand() *cobra.Command { return c } +func NewBalanceKeyRangeSchedulerCommand() *cobra.Command { + c := &cobra.Command{ + Use: "balance-key-range-scheduler [--format=raw|encode|hex] ", + Short: "add a scheduler to balance region for given key range", + Run: addSchedulerForBalanceKeyRangeCommandFunc, + } + c.Flags().String("format", "hex", "the key format") + return c +} + // NewTransferWitnessLeaderSchedulerCommand returns a command to add a transfer-witness-leader-shceudler. func NewTransferWitnessLeaderSchedulerCommand() *cobra.Command { c := &cobra.Command{ @@ -412,6 +423,32 @@ func addSchedulerForGrantHotRegionCommandFunc(cmd *cobra.Command, args []string) postJSON(cmd, schedulersPrefix, input) } +func addSchedulerForBalanceKeyRangeCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 4 { + cmd.Println(cmd.UsageString()) + return + } + startKey, err := parseKey(cmd.Flags(), args[2]) + if err != nil { + cmd.Println("Error: ", err) + return + } + endKey, err := parseKey(cmd.Flags(), args[3]) + if err != nil { + cmd.Println("Error: ", err) + return + } + + input := make(map[string]any) + input["name"] = cmd.Name() + input["engine"] = args[0] + input["role"] = args[1] + input["start_key"] = url.QueryEscape(startKey) + input["end_key"] = url.QueryEscape(endKey) + + postJSON(cmd, schedulersPrefix, input) +} + func addSchedulerCommandFunc(cmd *cobra.Command, args []string) { if len(args) != 0 { cmd.Println(cmd.UsageString()) @@ -523,6 +560,7 @@ func NewConfigSchedulerCommand() *cobra.Command { newConfigEvictSlowStoreCommand(), newConfigShuffleHotRegionSchedulerCommand(), newConfigEvictSlowTrendCommand(), + newConfigBalanceKeyRangeCommand(), ) return c } @@ -547,6 +585,26 @@ func newConfigBalanceLeaderCommand() *cobra.Command { return c } +func newConfigBalanceKeyRangeCommand() *cobra.Command { + c := &cobra.Command{ + Use: "balance-key-range-scheduler", + Short: "balance-key-range-scheduler config", + Run: listSchedulerConfigCommandFunc, + } + + c.AddCommand(&cobra.Command{ + Use: "show", + Short: "show the config item", + Run: listSchedulerConfigCommandFunc, + }, &cobra.Command{ + Use: "set ", + Short: "set the config item", + Run: func(cmd *cobra.Command, args []string) { postSchedulerConfigCommandFunc(cmd, c.Name(), args) }, + }) + + return c +} + func newSplitBucketCommand() *cobra.Command { c := &cobra.Command{ Use: "split-bucket-scheduler", diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go index f3a81845921..50237d8303c 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -15,6 +15,7 @@ package scheduler_test import ( + "encoding/base64" "encoding/json" "fmt" "reflect" @@ -84,7 +85,7 @@ func (suite *schedulerTestSuite) TearDownTest() { return currentSchedulers[i] == scheduler }) { echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", scheduler}, nil) - re.Contains(echo, "Success!") + re.Contains(echo, "Success!", scheduler) } } for _, scheduler := range currentSchedulers { @@ -541,6 +542,25 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust return !strings.Contains(echo, "evict-leader-scheduler") }) + // test balance key range scheduler + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-key-range-scheduler"}, nil) + re.NotContains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-key-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) + re.Contains(echo, "Success!") + conf = make(map[string]any) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-key-range-scheduler", "show"}, &conf) + re.Equal("learner", conf["role"]) + re.Equal("tiflash", conf["engine"]) + ranges := conf["ranges"].([]interface{})[0].(map[string]interface{}) + re.Equal(base64.StdEncoding.EncodeToString([]byte("a")), ranges["start-key"]) + re.Equal(base64.StdEncoding.EncodeToString([]byte("b")), ranges["end-key"]) + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-key-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) + re.Contains(echo, "400") + re.Contains(echo, "scheduler already exists") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-key-range-scheduler"}, nil) + re.Contains(echo, "Success!") + // test balance leader config conf = make(map[string]any) conf1 := make(map[string]any) From 1e6d628b5afd10bbb2c481016a584a87ea69c6b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Fri, 10 Jan 2025 14:32:53 +0800 Subject: [PATCH 3/7] pass ut MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/mcs/scheduling/server/cluster.go | 5 +++- pkg/schedule/types/type.go | 36 ++++++++++++++-------------- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 6f80572673c..e45611b6feb 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -314,7 +314,10 @@ func (c *Cluster) updateScheduler() { ) // Create the newly added schedulers. for _, scheduler := range latestSchedulersConfig { - schedulerType := types.ConvertOldStrToType[scheduler.Type] + schedulerType, ok := types.ConvertOldStrToType[scheduler.Type] + if !ok { + log.Warn("scheduler not found ", zap.String("type", scheduler.Type)) + } s, err := schedulers.CreateScheduler( schedulerType, c.coordinator.GetOperatorController(), diff --git a/pkg/schedule/types/type.go b/pkg/schedule/types/type.go index baeb536c987..c9d06f31e9f 100644 --- a/pkg/schedule/types/type.go +++ b/pkg/schedule/types/type.go @@ -106,24 +106,24 @@ var ( // // It is used to convert the old scheduler type to `CheckerSchedulerType`. ConvertOldStrToType = map[string]CheckerSchedulerType{ - "balance-leader": BalanceLeaderScheduler, - "balance-region": BalanceRegionScheduler, - "balance-witness": BalanceWitnessScheduler, - "evict-leader": EvictLeaderScheduler, - "evict-slow-store": EvictSlowStoreScheduler, - "evict-slow-trend": EvictSlowTrendScheduler, - "grant-leader": GrantLeaderScheduler, - "grant-hot-region": GrantHotRegionScheduler, - "hot-region": BalanceHotRegionScheduler, - "random-merge": RandomMergeScheduler, - "scatter-range": ScatterRangeScheduler, - "shuffle-hot-region": ShuffleHotRegionScheduler, - "shuffle-leader": ShuffleLeaderScheduler, - "shuffle-region": ShuffleRegionScheduler, - "split-bucket": SplitBucketScheduler, - "transfer-witness-leader": TransferWitnessLeaderScheduler, - "label": LabelScheduler, - "balance-key-range-scheduler": BalanceKeyRangeScheduler, + "balance-leader": BalanceLeaderScheduler, + "balance-region": BalanceRegionScheduler, + "balance-witness": BalanceWitnessScheduler, + "evict-leader": EvictLeaderScheduler, + "evict-slow-store": EvictSlowStoreScheduler, + "evict-slow-trend": EvictSlowTrendScheduler, + "grant-leader": GrantLeaderScheduler, + "grant-hot-region": GrantHotRegionScheduler, + "hot-region": BalanceHotRegionScheduler, + "random-merge": RandomMergeScheduler, + "scatter-range": ScatterRangeScheduler, + "shuffle-hot-region": ShuffleHotRegionScheduler, + "shuffle-leader": ShuffleLeaderScheduler, + "shuffle-region": ShuffleRegionScheduler, + "split-bucket": SplitBucketScheduler, + "transfer-witness-leader": TransferWitnessLeaderScheduler, + "label": LabelScheduler, + "balance-key-range": BalanceKeyRangeScheduler, } // StringToSchedulerType is a map to convert the scheduler string to the CheckerSchedulerType. From d1da5b577e21852949548c112e3b62e1bf61fad8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Mon, 13 Jan 2025 15:30:14 +0800 Subject: [PATCH 4/7] lint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/schedule/schedulers/balance_key_range.go | 24 ++++++++++++++++--- .../schedulers/balance_key_range_test.go | 7 ------ pkg/schedule/schedulers/init.go | 1 - server/cluster/cluster_test.go | 2 +- tools/pd-ctl/pdctl/command/scheduler.go | 1 + .../pd-ctl/tests/scheduler/scheduler_test.go | 2 +- 6 files changed, 24 insertions(+), 13 deletions(-) delete mode 100644 pkg/schedule/schedulers/balance_key_range_test.go diff --git a/pkg/schedule/schedulers/balance_key_range.go b/pkg/schedule/schedulers/balance_key_range.go index dea00f45e9e..aace3cc057b 100644 --- a/pkg/schedule/schedulers/balance_key_range.go +++ b/pkg/schedule/schedulers/balance_key_range.go @@ -5,9 +5,10 @@ import ( "time" "github.com/gorilla/mux" - "github.com/pingcap/log" "github.com/unrolled/render" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" @@ -19,6 +20,7 @@ import ( ) const ( + // DefaultTimeout is the default balance key range scheduler timeout. DefaultTimeout = 1 * time.Hour ) @@ -38,7 +40,7 @@ func newBalanceKeyRangeHandler(conf *balanceKeyRangeSchedulerConfig) http.Handle return router } -func (handler *balanceKeyRangeSchedulerHandler) updateConfig(w http.ResponseWriter, r *http.Request) { +func (handler *balanceKeyRangeSchedulerHandler) updateConfig(w http.ResponseWriter, _ *http.Request) { handler.rd.JSON(w, http.StatusBadRequest, "update config is not supported") } @@ -80,11 +82,24 @@ func (conf *balanceKeyRangeSchedulerConfig) clone() *balanceKeyRangeSchedulerPar } } +// EncodeConfig serializes the config. func (s *balanceKeyRangeScheduler) EncodeConfig() ([]byte, error) { return s.conf.encodeConfig() } +// ReloadConfig reloads the config. func (s *balanceKeyRangeScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() + + newCfg := &balanceKeyRangeSchedulerConfig{} + if err := s.conf.load(newCfg); err != nil { + return err + } + s.conf.Ranges = newCfg.Ranges + s.conf.Timeout = newCfg.Timeout + s.conf.Role = newCfg.Role + s.conf.Engine = newCfg.Engine return nil } @@ -101,11 +116,13 @@ func (s *balanceKeyRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Requ s.handler.ServeHTTP(w, r) } -func (s *balanceKeyRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRun bool) ([]*operator.Operator, []plan.Plan) { +// Schedule schedules the balance key range operator. +func (*balanceKeyRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRun bool) ([]*operator.Operator, []plan.Plan) { log.Debug("balance key range scheduler is scheduling, need to implement") return nil, nil } +// IsScheduleAllowed checks if the scheduler is allowed to schedule new operators. func (s *balanceKeyRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := s.OpController.OperatorCount(operator.OpKeyRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() if !allowed { @@ -114,6 +131,7 @@ func (s *balanceKeyRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerClust return allowed } +// BalanceKeyRangeCreateOption is used to create a scheduler with an option. type BalanceKeyRangeCreateOption func(s *balanceKeyRangeScheduler) // newBalanceKeyRangeScheduler creates a scheduler that tends to keep given peer role on diff --git a/pkg/schedule/schedulers/balance_key_range_test.go b/pkg/schedule/schedulers/balance_key_range_test.go deleted file mode 100644 index f0a402d108a..00000000000 --- a/pkg/schedule/schedulers/balance_key_range_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package schedulers - -import "testing" - -func TestHttpApi(t *testing.T) { - -} diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index f9b296ee6d8..4734b162203 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -590,5 +590,4 @@ func schedulersRegister() { conf.init(sche.GetName(), storage, conf) return sche, nil }) - } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index c62fb64fc80..1fdac79f539 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -3209,7 +3209,7 @@ func TestAddScheduler(t *testing.T) { re.NoError(err) re.NoError(controller.AddScheduler(gls)) - gls, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{}), controller.RemoveScheduler) + _, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{}), controller.RemoveScheduler) re.Error(err) gls, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{"leaner", "tiflash", "100", "200"}), controller.RemoveScheduler) diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 48e8d9ecf2d..9a5993dd8cf 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -375,6 +375,7 @@ func NewBalanceWitnessSchedulerCommand() *cobra.Command { return c } +// NewBalanceKeyRangeSchedulerCommand returns a command to add a balance-key-range-scheduler. func NewBalanceKeyRangeSchedulerCommand() *cobra.Command { c := &cobra.Command{ Use: "balance-key-range-scheduler [--format=raw|encode|hex] ", diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go index 50237d8303c..e5575677cb6 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -551,7 +551,7 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-key-range-scheduler", "show"}, &conf) re.Equal("learner", conf["role"]) re.Equal("tiflash", conf["engine"]) - ranges := conf["ranges"].([]interface{})[0].(map[string]interface{}) + ranges := conf["ranges"].([]any)[0].(map[string]any) re.Equal(base64.StdEncoding.EncodeToString([]byte("a")), ranges["start-key"]) re.Equal(base64.StdEncoding.EncodeToString([]byte("b")), ranges["end-key"]) From d0cfc2d352dac6f58c03417cb19a1c0cbf5945db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Mon, 13 Jan 2025 17:17:36 +0800 Subject: [PATCH 5/7] pass ut MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/mcs/scheduling/server/cluster.go | 3 ++- pkg/schedule/operator/kind.go | 1 + pkg/schedule/schedulers/balance_key_range.go | 9 +++------ pkg/schedule/schedulers/init.go | 17 +++++++++++++---- pkg/schedule/schedulers/scheduler.go | 2 -- server/api/scheduler.go | 10 ++++++++++ server/cluster/cluster_test.go | 9 ++++++++- tools/pd-ctl/tests/scheduler/scheduler_test.go | 8 +++++--- 8 files changed, 42 insertions(+), 17 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index e45611b6feb..f0b87e82c06 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -316,7 +316,8 @@ func (c *Cluster) updateScheduler() { for _, scheduler := range latestSchedulersConfig { schedulerType, ok := types.ConvertOldStrToType[scheduler.Type] if !ok { - log.Warn("scheduler not found ", zap.String("type", scheduler.Type)) + log.Error("scheduler not found ", zap.String("type", scheduler.Type)) + continue } s, err := schedulers.CreateScheduler( schedulerType, diff --git a/pkg/schedule/operator/kind.go b/pkg/schedule/operator/kind.go index c6e4614f525..0a7ccb34245 100644 --- a/pkg/schedule/operator/kind.go +++ b/pkg/schedule/operator/kind.go @@ -62,6 +62,7 @@ var flagToName = map[OpKind]string{ OpHotRegion: "hot-region", OpReplica: "replica", OpMerge: "merge", + OpKeyRange: "key-range", OpRange: "range", OpWitness: "witness", OpWitnessLeader: "witness-leader", diff --git a/pkg/schedule/schedulers/balance_key_range.go b/pkg/schedule/schedulers/balance_key_range.go index aace3cc057b..c21c62af71f 100644 --- a/pkg/schedule/schedulers/balance_key_range.go +++ b/pkg/schedule/schedulers/balance_key_range.go @@ -17,11 +17,7 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/schedule/types" -) - -const ( - // DefaultTimeout is the default balance key range scheduler timeout. - DefaultTimeout = 1 * time.Hour + "github.com/tikv/pd/pkg/utils/syncutil" ) type balanceKeyRangeSchedulerHandler struct { @@ -52,7 +48,8 @@ func (handler *balanceKeyRangeSchedulerHandler) listConfig(w http.ResponseWriter } type balanceKeyRangeSchedulerConfig struct { - baseDefaultSchedulerConfig + syncutil.RWMutex + schedulerConfig balanceKeyRangeSchedulerParam } diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 4734b162203..37d17ddd9ae 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -19,6 +19,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" @@ -548,7 +549,7 @@ func schedulersRegister() { }) // balance key range scheduler - // args: [role, engine, range1, range2, ...] + // args: [role, engine, timeout, range1, range2, ...] RegisterSliceDecoderBuilder(types.BalanceKeyRangeScheduler, func(args []string) ConfigDecoder { return func(v any) error { conf, ok := v.(*balanceKeyRangeSchedulerConfig) @@ -566,14 +567,22 @@ func schedulersRegister() { if err != nil { return errs.ErrQueryUnescape.Wrap(err) } - ranges, err := getKeyRanges(args[2:]) + timeout, err := url.QueryUnescape(args[2]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + duration, err := time.ParseDuration(timeout) + if err != nil { + return errs.ErrURLParse.Wrap(err) + } + ranges, err := getKeyRanges(args[3:]) if err != nil { return err } conf.Ranges = ranges conf.Engine = engine conf.Role = role - conf.Timeout = DefaultTimeout + conf.Timeout = duration return nil } }) @@ -581,7 +590,7 @@ func schedulersRegister() { RegisterScheduler(types.BalanceKeyRangeScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { conf := &balanceKeyRangeSchedulerConfig{ - baseDefaultSchedulerConfig: newBaseDefaultSchedulerConfig(), + schedulerConfig: newBaseDefaultSchedulerConfig(), } if err := decoder(conf); err != nil { return nil, err diff --git a/pkg/schedule/schedulers/scheduler.go b/pkg/schedule/schedulers/scheduler.go index fd6d6710350..8976c3a1928 100644 --- a/pkg/schedule/schedulers/scheduler.go +++ b/pkg/schedule/schedulers/scheduler.go @@ -157,9 +157,7 @@ func CreateScheduler( removeSchedulerCb ...func(string) error, ) (Scheduler, error) { fn, ok := schedulerMap[typ] - log.Info("create scheduler", zap.Any("typ", typ)) if !ok { - log.Warn("create scheduler not found", zap.Any("typ", typ)) return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ) } diff --git a/server/api/scheduler.go b/server/api/scheduler.go index f8b62864c0c..e50e563e5b8 100644 --- a/server/api/scheduler.go +++ b/server/api/scheduler.go @@ -113,6 +113,16 @@ func (h *schedulerHandler) CreateScheduler(w http.ResponseWriter, r *http.Reques h.r.JSON(w, http.StatusInternalServerError, err.Error()) return } + defaultTimeout := "1h" + if err := apiutil.CollectStringOption("timeout", input, collector); err != nil { + if errors.ErrorEqual(err, errs.ErrOptionNotExist) { + collector(defaultTimeout) + } else { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + } + if err := apiutil.CollectEscapeStringOption("start_key", input, collector); err != nil { h.r.JSON(w, http.StatusInternalServerError, err.Error()) return diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 1fdac79f539..ca78b4cfdd7 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -3212,9 +3212,16 @@ func TestAddScheduler(t *testing.T) { _, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{}), controller.RemoveScheduler) re.Error(err) - gls, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{"leaner", "tiflash", "100", "200"}), controller.RemoveScheduler) + gls, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{"learner", "tiflash", "1h", "100", "200"}), controller.RemoveScheduler) re.NoError(err) re.NoError(controller.AddScheduler(gls)) + conf, err = gls.EncodeConfig() + re.NoError(err) + data = make(map[string]any) + re.NoError(json.Unmarshal(conf, &data)) + re.Equal("learner", data["role"]) + re.Equal("tiflash", data["engine"]) + re.Equal(float64(time.Hour.Nanoseconds()), data["timeout"]) hb, err := schedulers.CreateScheduler(types.BalanceHotRegionScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) re.NoError(err) diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go index e5575677cb6..38338bc2494 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -548,9 +548,11 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-key-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) re.Contains(echo, "Success!") conf = make(map[string]any) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-key-range-scheduler", "show"}, &conf) - re.Equal("learner", conf["role"]) - re.Equal("tiflash", conf["engine"]) + testutil.Eventually(re, func() bool { + mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-key-range-scheduler"}, &conf) + return conf["role"] == "learner" && conf["engine"] == "tiflash" + }) + re.Equal(float64(time.Hour.Nanoseconds()), conf["timeout"]) ranges := conf["ranges"].([]any)[0].(map[string]any) re.Equal(base64.StdEncoding.EncodeToString([]byte("a")), ranges["start-key"]) re.Equal(base64.StdEncoding.EncodeToString([]byte("b")), ranges["end-key"]) From d86148f6160442b7e3bab0e1b3c5f11ee383ff0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Wed, 15 Jan 2025 18:17:13 +0800 Subject: [PATCH 6/7] rename balance-key-range to balance-range MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/mcs/scheduling/server/cluster.go | 2 +- pkg/schedule/operator/kind.go | 4 ---- pkg/schedule/operator/operator_test.go | 3 --- pkg/schedule/schedulers/balance_key_range.go | 6 +++--- pkg/schedule/schedulers/init.go | 4 ++-- pkg/schedule/types/type.go | 10 +++++----- server/api/scheduler.go | 2 +- server/cluster/cluster_test.go | 4 ++-- tools/pd-ctl/pdctl/command/scheduler.go | 18 +++++++++--------- tools/pd-ctl/tests/scheduler/scheduler_test.go | 10 +++++----- 10 files changed, 28 insertions(+), 35 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index f0b87e82c06..9ab5d329398 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -316,7 +316,7 @@ func (c *Cluster) updateScheduler() { for _, scheduler := range latestSchedulersConfig { schedulerType, ok := types.ConvertOldStrToType[scheduler.Type] if !ok { - log.Error("scheduler not found ", zap.String("type", scheduler.Type)) + log.Error("scheduler not found", zap.String("type", scheduler.Type)) continue } s, err := schedulers.CreateScheduler( diff --git a/pkg/schedule/operator/kind.go b/pkg/schedule/operator/kind.go index 0a7ccb34245..0187a64c568 100644 --- a/pkg/schedule/operator/kind.go +++ b/pkg/schedule/operator/kind.go @@ -35,8 +35,6 @@ const ( OpMerge // Initiated by range scheduler. OpRange - // Initiated by key range scheduler. - OpKeyRange // Initiated by replica checker. OpReplica // Include region split. Initiated by rule checker if `kind & OpAdmin == 0`. @@ -62,7 +60,6 @@ var flagToName = map[OpKind]string{ OpHotRegion: "hot-region", OpReplica: "replica", OpMerge: "merge", - OpKeyRange: "key-range", OpRange: "range", OpWitness: "witness", OpWitnessLeader: "witness-leader", @@ -77,7 +74,6 @@ var nameToFlag = map[string]OpKind{ "replica": OpReplica, "merge": OpMerge, "range": OpRange, - "key-range": OpKeyRange, "witness-leader": OpWitnessLeader, } diff --git a/pkg/schedule/operator/operator_test.go b/pkg/schedule/operator/operator_test.go index 422091dea19..6976b5ca12e 100644 --- a/pkg/schedule/operator/operator_test.go +++ b/pkg/schedule/operator/operator_test.go @@ -476,9 +476,6 @@ func (suite *operatorTestSuite) TestSchedulerKind() { }, { op: NewTestOperator(1, &metapb.RegionEpoch{}, OpLeader), expect: OpLeader, - }, { - op: NewTestOperator(1, &metapb.RegionEpoch{}, OpKeyRange|OpLeader), - expect: OpKeyRange, }, } for _, v := range testData { diff --git a/pkg/schedule/schedulers/balance_key_range.go b/pkg/schedule/schedulers/balance_key_range.go index c21c62af71f..71e0fab29d9 100644 --- a/pkg/schedule/schedulers/balance_key_range.go +++ b/pkg/schedule/schedulers/balance_key_range.go @@ -121,9 +121,9 @@ func (*balanceKeyRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRu // IsScheduleAllowed checks if the scheduler is allowed to schedule new operators. func (s *balanceKeyRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { - allowed := s.OpController.OperatorCount(operator.OpKeyRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() + allowed := s.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() if !allowed { - operator.IncOperatorLimitCounter(s.GetType(), operator.OpKeyRange) + operator.IncOperatorLimitCounter(s.GetType(), operator.OpRange) } return allowed } @@ -135,7 +135,7 @@ type BalanceKeyRangeCreateOption func(s *balanceKeyRangeScheduler) // special store balanced. func newBalanceKeyRangeScheduler(opController *operator.Controller, conf *balanceKeyRangeSchedulerConfig, options ...BalanceKeyRangeCreateOption) Scheduler { s := &balanceKeyRangeScheduler{ - BaseScheduler: NewBaseScheduler(opController, types.BalanceKeyRangeScheduler, conf), + BaseScheduler: NewBaseScheduler(opController, types.BalanceRangeScheduler, conf), conf: conf, handler: newBalanceKeyRangeHandler(conf), } diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 37d17ddd9ae..f86e1596f27 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -550,7 +550,7 @@ func schedulersRegister() { // balance key range scheduler // args: [role, engine, timeout, range1, range2, ...] - RegisterSliceDecoderBuilder(types.BalanceKeyRangeScheduler, func(args []string) ConfigDecoder { + RegisterSliceDecoderBuilder(types.BalanceRangeScheduler, func(args []string) ConfigDecoder { return func(v any) error { conf, ok := v.(*balanceKeyRangeSchedulerConfig) if !ok { @@ -587,7 +587,7 @@ func schedulersRegister() { } }) - RegisterScheduler(types.BalanceKeyRangeScheduler, func(opController *operator.Controller, + RegisterScheduler(types.BalanceRangeScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { conf := &balanceKeyRangeSchedulerConfig{ schedulerConfig: newBaseDefaultSchedulerConfig(), diff --git a/pkg/schedule/types/type.go b/pkg/schedule/types/type.go index c9d06f31e9f..87e89c18948 100644 --- a/pkg/schedule/types/type.go +++ b/pkg/schedule/types/type.go @@ -70,8 +70,8 @@ const ( TransferWitnessLeaderScheduler CheckerSchedulerType = "transfer-witness-leader-scheduler" // LabelScheduler is label scheduler name. LabelScheduler CheckerSchedulerType = "label-scheduler" - // BalanceKeyRangeScheduler is balance key range scheduler name. - BalanceKeyRangeScheduler CheckerSchedulerType = "balance-key-range-scheduler" + // BalanceRangeScheduler is balance key range scheduler name. + BalanceRangeScheduler CheckerSchedulerType = "balance-range-scheduler" ) // TODO: SchedulerTypeCompatibleMap and ConvertOldStrToType should be removed after @@ -99,7 +99,7 @@ var ( SplitBucketScheduler: "split-bucket", TransferWitnessLeaderScheduler: "transfer-witness-leader", LabelScheduler: "label", - BalanceKeyRangeScheduler: "balance-key-range", + BalanceRangeScheduler: "balance-range", } // ConvertOldStrToType exists for compatibility. @@ -123,7 +123,7 @@ var ( "split-bucket": SplitBucketScheduler, "transfer-witness-leader": TransferWitnessLeaderScheduler, "label": LabelScheduler, - "balance-key-range": BalanceKeyRangeScheduler, + "balance-range": BalanceRangeScheduler, } // StringToSchedulerType is a map to convert the scheduler string to the CheckerSchedulerType. @@ -147,7 +147,7 @@ var ( "split-bucket-scheduler": SplitBucketScheduler, "transfer-witness-leader-scheduler": TransferWitnessLeaderScheduler, "label-scheduler": LabelScheduler, - "balance-key-range-scheduler": BalanceKeyRangeScheduler, + "balance-range-scheduler": BalanceRangeScheduler, } // DefaultSchedulers is the default scheduler types. diff --git a/server/api/scheduler.go b/server/api/scheduler.go index e50e563e5b8..d9f8aa6518d 100644 --- a/server/api/scheduler.go +++ b/server/api/scheduler.go @@ -99,7 +99,7 @@ func (h *schedulerHandler) CreateScheduler(w http.ResponseWriter, r *http.Reques } switch tp { - case types.BalanceKeyRangeScheduler: + case types.BalanceRangeScheduler: exist, _ := h.IsSchedulerExisted(name) if exist { h.r.JSON(w, http.StatusBadRequest, "The scheduler already exists, pls remove the exist scheduler first.") diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index ca78b4cfdd7..d2382ded70c 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -3209,10 +3209,10 @@ func TestAddScheduler(t *testing.T) { re.NoError(err) re.NoError(controller.AddScheduler(gls)) - _, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{}), controller.RemoveScheduler) + _, err = schedulers.CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceRangeScheduler, []string{}), controller.RemoveScheduler) re.Error(err) - gls, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{"learner", "tiflash", "1h", "100", "200"}), controller.RemoveScheduler) + gls, err = schedulers.CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceRangeScheduler, []string{"learner", "tiflash", "1h", "100", "200"}), controller.RemoveScheduler) re.NoError(err) re.NoError(controller.AddScheduler(gls)) conf, err = gls.EncodeConfig() diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 9a5993dd8cf..50525d885fd 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -162,7 +162,7 @@ func NewAddSchedulerCommand() *cobra.Command { c.AddCommand(NewSlowTrendEvictLeaderSchedulerCommand()) c.AddCommand(NewBalanceWitnessSchedulerCommand()) c.AddCommand(NewTransferWitnessLeaderSchedulerCommand()) - c.AddCommand(NewBalanceKeyRangeSchedulerCommand()) + c.AddCommand(NewBalanceRangeSchedulerCommand()) return c } @@ -375,12 +375,12 @@ func NewBalanceWitnessSchedulerCommand() *cobra.Command { return c } -// NewBalanceKeyRangeSchedulerCommand returns a command to add a balance-key-range-scheduler. -func NewBalanceKeyRangeSchedulerCommand() *cobra.Command { +// NewBalanceRangeSchedulerCommand returns a command to add a balance-key-range-scheduler. +func NewBalanceRangeSchedulerCommand() *cobra.Command { c := &cobra.Command{ - Use: "balance-key-range-scheduler [--format=raw|encode|hex] ", - Short: "add a scheduler to balance region for given key range", - Run: addSchedulerForBalanceKeyRangeCommandFunc, + Use: "balance-range-scheduler [--format=raw|encode|hex] ", + Short: "add a scheduler to balance region for given range", + Run: addSchedulerForBalanceRangeCommandFunc, } c.Flags().String("format", "hex", "the key format") return c @@ -424,7 +424,7 @@ func addSchedulerForGrantHotRegionCommandFunc(cmd *cobra.Command, args []string) postJSON(cmd, schedulersPrefix, input) } -func addSchedulerForBalanceKeyRangeCommandFunc(cmd *cobra.Command, args []string) { +func addSchedulerForBalanceRangeCommandFunc(cmd *cobra.Command, args []string) { if len(args) != 4 { cmd.Println(cmd.UsageString()) return @@ -588,8 +588,8 @@ func newConfigBalanceLeaderCommand() *cobra.Command { func newConfigBalanceKeyRangeCommand() *cobra.Command { c := &cobra.Command{ - Use: "balance-key-range-scheduler", - Short: "balance-key-range-scheduler config", + Use: "balance-range-scheduler", + Short: "balance-range-scheduler config", Run: listSchedulerConfigCommandFunc, } diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go index 38338bc2494..f95cf033239 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -543,13 +543,13 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust }) // test balance key range scheduler - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-key-range-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-range-scheduler"}, nil) re.NotContains(echo, "Success!") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-key-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) re.Contains(echo, "Success!") conf = make(map[string]any) testutil.Eventually(re, func() bool { - mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-key-range-scheduler"}, &conf) + mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-range-scheduler"}, &conf) return conf["role"] == "learner" && conf["engine"] == "tiflash" }) re.Equal(float64(time.Hour.Nanoseconds()), conf["timeout"]) @@ -557,10 +557,10 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust re.Equal(base64.StdEncoding.EncodeToString([]byte("a")), ranges["start-key"]) re.Equal(base64.StdEncoding.EncodeToString([]byte("b")), ranges["end-key"]) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-key-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) re.Contains(echo, "400") re.Contains(echo, "scheduler already exists") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-key-range-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-range-scheduler"}, nil) re.Contains(echo, "Success!") // test balance leader config From 8bdb7bc5c5b1add45ff0b27d5e81c111d5bca296 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Thu, 16 Jan 2025 15:21:44 +0800 Subject: [PATCH 7/7] use hex encode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/core/basic_cluster.go | 10 ++++++++++ tools/pd-ctl/pdctl/command/scheduler.go | 9 +++++---- tools/pd-ctl/tests/scheduler/scheduler_test.go | 5 ++--- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/pkg/core/basic_cluster.go b/pkg/core/basic_cluster.go index f7c3c5e93b1..45e06648c35 100644 --- a/pkg/core/basic_cluster.go +++ b/pkg/core/basic_cluster.go @@ -16,6 +16,7 @@ package core import ( "bytes" + "encoding/json" "github.com/tikv/pd/pkg/core/constant" ) @@ -156,6 +157,15 @@ type KeyRange struct { EndKey []byte `json:"end-key"` } +// MarshalJSON marshals to json. +func (kr KeyRange) MarshalJSON() ([]byte, error) { + m := map[string]string{ + "start-key": HexRegionKeyStr(kr.StartKey), + "end-key": HexRegionKeyStr(kr.EndKey), + } + return json.Marshal(m) +} + // NewKeyRange create a KeyRange with the given start key and end key. func NewKeyRange(startKey, endKey string) KeyRange { return KeyRange{ diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 50525d885fd..1492709fc79 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -375,12 +375,13 @@ func NewBalanceWitnessSchedulerCommand() *cobra.Command { return c } -// NewBalanceRangeSchedulerCommand returns a command to add a balance-key-range-scheduler. +// NewBalanceRangeSchedulerCommand returns a command to add a balance-range-scheduler. func NewBalanceRangeSchedulerCommand() *cobra.Command { c := &cobra.Command{ - Use: "balance-range-scheduler [--format=raw|encode|hex] ", - Short: "add a scheduler to balance region for given range", - Run: addSchedulerForBalanceRangeCommandFunc, + Use: "balance-range-scheduler [--format=raw|encode|hex] ", + Short: "add a scheduler to balance region for given range", + Run: addSchedulerForBalanceRangeCommandFunc, + Deprecated: "balance-range will be deprecated in the future, please use sql instead", } c.Flags().String("format", "hex", "the key format") return c diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go index f95cf033239..1d011329c42 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -15,7 +15,6 @@ package scheduler_test import ( - "encoding/base64" "encoding/json" "fmt" "reflect" @@ -554,8 +553,8 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust }) re.Equal(float64(time.Hour.Nanoseconds()), conf["timeout"]) ranges := conf["ranges"].([]any)[0].(map[string]any) - re.Equal(base64.StdEncoding.EncodeToString([]byte("a")), ranges["start-key"]) - re.Equal(base64.StdEncoding.EncodeToString([]byte("b")), ranges["end-key"]) + re.Equal(core.HexRegionKeyStr([]byte("a")), ranges["start-key"]) + re.Equal(core.HexRegionKeyStr([]byte("b")), ranges["end-key"]) echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) re.Contains(echo, "400")