api: add a new scheduler to balance the regions of the given key range #8988
base: master
Changes from all commits: e693b3e, 23ff7d0, 1e6d628, d1da5b5, d0cfc2d, d86148f, 8bdb7bc
@@ -0,0 +1,151 @@
package schedulers

import (
    "net/http"
    "time"

    "github.com/gorilla/mux"
    "github.com/unrolled/render"

    "github.com/pingcap/log"

    "github.com/tikv/pd/pkg/core"
    "github.com/tikv/pd/pkg/core/constant"
    "github.com/tikv/pd/pkg/errs"
    sche "github.com/tikv/pd/pkg/schedule/core"
    "github.com/tikv/pd/pkg/schedule/filter"
    "github.com/tikv/pd/pkg/schedule/operator"
    "github.com/tikv/pd/pkg/schedule/plan"
    "github.com/tikv/pd/pkg/schedule/types"
    "github.com/tikv/pd/pkg/utils/syncutil"
)
type balanceKeyRangeSchedulerHandler struct {
    rd     *render.Render
    config *balanceKeyRangeSchedulerConfig
}

Review comment: Keep using balanceRangeSchedulerHandler?

func newBalanceKeyRangeHandler(conf *balanceKeyRangeSchedulerConfig) http.Handler {
    handler := &balanceKeyRangeSchedulerHandler{
        config: conf,
        rd:     render.New(render.Options{IndentJSON: true}),
    }
    router := mux.NewRouter()
    router.HandleFunc("/config", handler.updateConfig).Methods(http.MethodPost)
    router.HandleFunc("/list", handler.listConfig).Methods(http.MethodGet)
    return router
}

func (handler *balanceKeyRangeSchedulerHandler) updateConfig(w http.ResponseWriter, _ *http.Request) {
    handler.rd.JSON(w, http.StatusBadRequest, "update config is not supported")
}

func (handler *balanceKeyRangeSchedulerHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
    conf := handler.config.clone()
    if err := handler.rd.JSON(w, http.StatusOK, conf); err != nil {
        log.Error("failed to marshal balance key range scheduler config", errs.ZapError(err))
    }
}
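
As a point of reference only, the handler above can be exercised end to end with net/http/httptest. The sketch below is not part of this PR; the test name is hypothetical and it assumes it compiles in the same schedulers package as this file:

package schedulers

import (
    "io"
    "net/http"
    "net/http/httptest"
    "testing"
    "time"
)

// TestBalanceKeyRangeHandlerSketch is a hypothetical example, not part of the PR.
func TestBalanceKeyRangeHandlerSketch(t *testing.T) {
    // Only the param fields matter here; Ranges is left empty and the embedded
    // schedulerConfig is never touched by the handler.
    conf := &balanceKeyRangeSchedulerConfig{}
    conf.Role = "leader"
    conf.Engine = "tikv"
    conf.Timeout = time.Hour

    srv := httptest.NewServer(newBalanceKeyRangeHandler(conf))
    defer srv.Close()

    // GET /list returns the cloned parameters as indented JSON.
    resp, err := http.Get(srv.URL + "/list")
    if err != nil {
        t.Fatal(err)
    }
    defer resp.Body.Close()
    body, _ := io.ReadAll(resp.Body)
    if resp.StatusCode != http.StatusOK {
        t.Fatalf("unexpected status %d: %s", resp.StatusCode, body)
    }
    t.Logf("list config response: %s", body)

    // POST /config is rejected with 400, matching updateConfig above.
    resp2, err := http.Post(srv.URL+"/config", "application/json", nil)
    if err != nil {
        t.Fatal(err)
    }
    defer resp2.Body.Close()
    if resp2.StatusCode != http.StatusBadRequest {
        t.Fatalf("expected %d from /config, got %d", http.StatusBadRequest, resp2.StatusCode)
    }
}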

type balanceKeyRangeSchedulerConfig struct {
    syncutil.RWMutex
    schedulerConfig
    balanceKeyRangeSchedulerParam
}

Review comment: Shall we use a slice to support multiple key ranges with different roles or engines?
Review comment: We need to support a multi-key range with same role and engine.
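
To make the discussion above concrete, one possible shape is sketched below. It is not what this PR implements, and the names (balanceKeyRangeJob, Jobs) are invented for illustration; each job would carry its own role, engine, and set of ranges:

// Hypothetical alternative layout, following the review suggestion to hold a
// slice of key-range groups instead of a single role/engine pair.
type balanceKeyRangeJob struct {
    Role    string          `json:"role"`
    Engine  string          `json:"engine"`
    Timeout time.Duration   `json:"timeout"`
    Ranges  []core.KeyRange `json:"ranges"`
}

type multiKeyRangeSchedulerConfig struct {
    syncutil.RWMutex
    schedulerConfig
    // Jobs lets one scheduler track several key-range groups: multiple ranges
    // can share a role/engine within a job, and different jobs can differ.
    Jobs []balanceKeyRangeJob `json:"jobs"`
}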

type balanceKeyRangeSchedulerParam struct {
    Role    string          `json:"role"`
    Engine  string          `json:"engine"`
    Timeout time.Duration   `json:"timeout"`
    Ranges  []core.KeyRange `json:"ranges"`
}

Review comment (on Timeout): Do all ranges share the same timeout?
Review comment (on Ranges): I wonder if we can let the API interface be like: […] Then we can alias the table name by TiDB or others to help the user read it.
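
The reviewer's example request body is not captured in the extract above. Purely as a hypothetical illustration of the alias idea, the input could accept a human-readable label alongside the raw keys, e.g.:

// Hypothetical request shape; the field names are illustrative and not part of this PR.
type balanceKeyRangeInput struct {
    Role   string `json:"role"`
    Engine string `json:"engine"`
    // Alias is a human-readable name for the range (for example a TiDB table
    // name) so the /list output is easier to read than raw start/end keys.
    Alias    string `json:"alias"`
    StartKey string `json:"start-key"`
    EndKey   string `json:"end-key"`
}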

func (conf *balanceKeyRangeSchedulerConfig) encodeConfig() ([]byte, error) {
    conf.RLock()
    defer conf.RUnlock()
    return EncodeConfig(conf)
}

func (conf *balanceKeyRangeSchedulerConfig) clone() *balanceKeyRangeSchedulerParam {
    conf.RLock()
    defer conf.RUnlock()
    ranges := make([]core.KeyRange, len(conf.Ranges))
    copy(ranges, conf.Ranges)
    return &balanceKeyRangeSchedulerParam{
        Ranges:  ranges,
        Role:    conf.Role,
        Engine:  conf.Engine,
        Timeout: conf.Timeout,
    }
}

// EncodeConfig serializes the config.
func (s *balanceKeyRangeScheduler) EncodeConfig() ([]byte, error) {
    return s.conf.encodeConfig()
}

Review comment: It's a little bit confusing to have both EncodeConfig and encodeConfig.

// ReloadConfig reloads the config.
func (s *balanceKeyRangeScheduler) ReloadConfig() error {
    s.conf.Lock()
    defer s.conf.Unlock()

    newCfg := &balanceKeyRangeSchedulerConfig{}
    if err := s.conf.load(newCfg); err != nil {
        return err
    }
    s.conf.Ranges = newCfg.Ranges
    s.conf.Timeout = newCfg.Timeout
    s.conf.Role = newCfg.Role
    s.conf.Engine = newCfg.Engine
    return nil
}

type balanceKeyRangeScheduler struct {
    *BaseScheduler
    conf          *balanceKeyRangeSchedulerConfig
    handler       http.Handler
    filters       []filter.Filter
    filterCounter *filter.Counter
}

// ServeHTTP implements the http.Handler interface.
func (s *balanceKeyRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    s.handler.ServeHTTP(w, r)
}

// Schedule schedules the balance key range operator.
func (*balanceKeyRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRun bool) ([]*operator.Operator, []plan.Plan) {
    log.Debug("balance key range scheduler is scheduling, need to implement")
    return nil, nil
}

// IsScheduleAllowed checks if the scheduler is allowed to schedule new operators.
func (s *balanceKeyRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
    allowed := s.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit()
    if !allowed {
        operator.IncOperatorLimitCounter(s.GetType(), operator.OpRange)
    }
    return allowed
}

// BalanceKeyRangeCreateOption is used to create a scheduler with an option.
type BalanceKeyRangeCreateOption func(s *balanceKeyRangeScheduler)

// newBalanceKeyRangeScheduler creates a scheduler that tends to keep the given peer role
// balanced across the specified stores.
func newBalanceKeyRangeScheduler(opController *operator.Controller, conf *balanceKeyRangeSchedulerConfig, options ...BalanceKeyRangeCreateOption) Scheduler {
    s := &balanceKeyRangeScheduler{
        BaseScheduler: NewBaseScheduler(opController, types.BalanceRangeScheduler, conf),
        conf:          conf,
        handler:       newBalanceKeyRangeHandler(conf),
    }
    for _, option := range options {
        option(s)
    }
    s.filters = []filter.Filter{
        &filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.Medium},
        filter.NewSpecialUseFilter(s.GetName()),
    }
    s.filterCounter = filter.NewCounter(s.GetName())
    return s
}
Review comment: Need a license, and please also change the filename.