Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: add a new scheduler to balance the regions of the given key range #8988

Merged
merged 14 commits into from
Feb 10, 2025
Prev Previous commit
Next Next commit
rename
Signed-off-by: 童剑 <1045931706@qq.com>
  • Loading branch information
bufferflies committed Jan 21, 2025
commit 0696ba64e4dad421aab0547371315737f63c9f7f
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2025 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schedulers

import (
@@ -20,13 +34,13 @@ import (
"github.com/tikv/pd/pkg/utils/syncutil"
)

type balanceKeyRangeSchedulerHandler struct {
type balanceRangeSchedulerHandler struct {
rd *render.Render
config *balanceKeyRangeSchedulerConfig
config *balanceRangeSchedulerConfig
}

func newBalanceKeyRangeHandler(conf *balanceKeyRangeSchedulerConfig) http.Handler {
handler := &balanceKeyRangeSchedulerHandler{
func newBalanceRangeHandler(conf *balanceRangeSchedulerConfig) http.Handler {
handler := &balanceRangeSchedulerHandler{
config: conf,
rd: render.New(render.Options{IndentJSON: true}),
}
@@ -36,42 +50,36 @@ func newBalanceKeyRangeHandler(conf *balanceKeyRangeSchedulerConfig) http.Handle
return router
}

func (handler *balanceKeyRangeSchedulerHandler) updateConfig(w http.ResponseWriter, _ *http.Request) {
func (handler *balanceRangeSchedulerHandler) updateConfig(w http.ResponseWriter, _ *http.Request) {
handler.rd.JSON(w, http.StatusBadRequest, "update config is not supported")
}

func (handler *balanceKeyRangeSchedulerHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
func (handler *balanceRangeSchedulerHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
conf := handler.config.clone()
if err := handler.rd.JSON(w, http.StatusOK, conf); err != nil {
log.Error("failed to marshal balance key range scheduler config", errs.ZapError(err))
}
}

type balanceKeyRangeSchedulerConfig struct {
type balanceRangeSchedulerConfig struct {
syncutil.RWMutex
schedulerConfig
balanceKeyRangeSchedulerParam
balanceRangeSchedulerParam
}

type balanceKeyRangeSchedulerParam struct {
type balanceRangeSchedulerParam struct {
Role string `json:"role"`
Engine string `json:"engine"`
Timeout time.Duration `json:"timeout"`
Ranges []core.KeyRange `json:"ranges"`
}

func (conf *balanceKeyRangeSchedulerConfig) encodeConfig() ([]byte, error) {
conf.RLock()
defer conf.RUnlock()
return EncodeConfig(conf)
}

func (conf *balanceKeyRangeSchedulerConfig) clone() *balanceKeyRangeSchedulerParam {
func (conf *balanceRangeSchedulerConfig) clone() *balanceRangeSchedulerParam {
conf.RLock()
defer conf.RUnlock()
ranges := make([]core.KeyRange, len(conf.Ranges))
copy(ranges, conf.Ranges)
return &balanceKeyRangeSchedulerParam{
return &balanceRangeSchedulerParam{
Ranges: ranges,
Role: conf.Role,
Engine: conf.Engine,
@@ -80,16 +88,18 @@ func (conf *balanceKeyRangeSchedulerConfig) clone() *balanceKeyRangeSchedulerPar
}

// EncodeConfig serializes the config.
func (s *balanceKeyRangeScheduler) EncodeConfig() ([]byte, error) {
return s.conf.encodeConfig()
func (s *balanceRangeScheduler) EncodeConfig() ([]byte, error) {
s.conf.RLock()
defer s.conf.RUnlock()
return EncodeConfig(s.conf)
}

// ReloadConfig reloads the config.
func (s *balanceKeyRangeScheduler) ReloadConfig() error {
func (s *balanceRangeScheduler) ReloadConfig() error {
s.conf.Lock()
defer s.conf.Unlock()

newCfg := &balanceKeyRangeSchedulerConfig{}
newCfg := &balanceRangeSchedulerConfig{}
if err := s.conf.load(newCfg); err != nil {
return err
}
@@ -100,44 +110,44 @@ func (s *balanceKeyRangeScheduler) ReloadConfig() error {
return nil
}

type balanceKeyRangeScheduler struct {
type balanceRangeScheduler struct {
*BaseScheduler
conf *balanceKeyRangeSchedulerConfig
conf *balanceRangeSchedulerConfig
handler http.Handler
filters []filter.Filter
filterCounter *filter.Counter
}

// ServeHTTP implements the http.Handler interface.
func (s *balanceKeyRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (s *balanceRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handler.ServeHTTP(w, r)
}

// Schedule schedules the balance key range operator.
func (*balanceKeyRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRun bool) ([]*operator.Operator, []plan.Plan) {
func (*balanceRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRun bool) ([]*operator.Operator, []plan.Plan) {
log.Debug("balance key range scheduler is scheduling, need to implement")
return nil, nil
}

// IsScheduleAllowed checks if the scheduler is allowed to schedule new operators.
func (s *balanceKeyRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
func (s *balanceRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
allowed := s.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit()
if !allowed {
operator.IncOperatorLimitCounter(s.GetType(), operator.OpRange)
}
return allowed
}

// BalanceKeyRangeCreateOption is used to create a scheduler with an option.
type BalanceKeyRangeCreateOption func(s *balanceKeyRangeScheduler)
// BalanceRangeCreateOption is used to create a scheduler with an option.
type BalanceRangeCreateOption func(s *balanceRangeScheduler)

// newBalanceKeyRangeScheduler creates a scheduler that tends to keep given peer role on
// newBalanceRangeScheduler creates a scheduler that tends to keep given peer role on
// special store balanced.
func newBalanceKeyRangeScheduler(opController *operator.Controller, conf *balanceKeyRangeSchedulerConfig, options ...BalanceKeyRangeCreateOption) Scheduler {
s := &balanceKeyRangeScheduler{
func newBalanceRangeScheduler(opController *operator.Controller, conf *balanceRangeSchedulerConfig, options ...BalanceRangeCreateOption) Scheduler {
s := &balanceRangeScheduler{
BaseScheduler: NewBaseScheduler(opController, types.BalanceRangeScheduler, conf),
conf: conf,
handler: newBalanceKeyRangeHandler(conf),
handler: newBalanceRangeHandler(conf),
}
for _, option := range options {
option(s)
6 changes: 3 additions & 3 deletions pkg/schedule/schedulers/init.go
Original file line number Diff line number Diff line change
@@ -552,7 +552,7 @@ func schedulersRegister() {
// args: [role, engine, timeout, range1, range2, ...]
RegisterSliceDecoderBuilder(types.BalanceRangeScheduler, func(args []string) ConfigDecoder {
return func(v any) error {
conf, ok := v.(*balanceKeyRangeSchedulerConfig)
conf, ok := v.(*balanceRangeSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}
@@ -589,13 +589,13 @@ func schedulersRegister() {

RegisterScheduler(types.BalanceRangeScheduler, func(opController *operator.Controller,
storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) {
conf := &balanceKeyRangeSchedulerConfig{
conf := &balanceRangeSchedulerConfig{
schedulerConfig: newBaseDefaultSchedulerConfig(),
}
if err := decoder(conf); err != nil {
return nil, err
}
sche := newBalanceKeyRangeScheduler(opController, conf)
sche := newBalanceRangeScheduler(opController, conf)
conf.init(sche.GetName(), storage, conf)
return sche, nil
})
4 changes: 2 additions & 2 deletions tools/pd-ctl/pdctl/command/scheduler.go
Original file line number Diff line number Diff line change
@@ -562,7 +562,7 @@ func NewConfigSchedulerCommand() *cobra.Command {
newConfigEvictSlowStoreCommand(),
newConfigShuffleHotRegionSchedulerCommand(),
newConfigEvictSlowTrendCommand(),
newConfigBalanceKeyRangeCommand(),
newConfigBalanceRangeCommand(),
)
return c
}
@@ -587,7 +587,7 @@ func newConfigBalanceLeaderCommand() *cobra.Command {
return c
}

func newConfigBalanceKeyRangeCommand() *cobra.Command {
func newConfigBalanceRangeCommand() *cobra.Command {
c := &cobra.Command{
Use: "balance-range-scheduler",
Short: "balance-range-scheduler config",