Skip to content

Commit cb2bd58

Browse files
committed
Merge branch 'balance_key_range' into balance_key_range_scheduler
2 parents fb723a0 + 8bdb7bc commit cb2bd58

File tree

12 files changed

+175
-90
lines changed

12 files changed

+175
-90
lines changed

Diff for: pkg/core/basic_cluster.go

+10
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package core
1616

1717
import (
1818
"bytes"
19+
"encoding/json"
1920

2021
"github.com/tikv/pd/pkg/core/constant"
2122
)
@@ -156,6 +157,15 @@ type KeyRange struct {
156157
EndKey []byte `json:"end-key"`
157158
}
158159

160+
// MarshalJSON marshals to json.
161+
func (kr KeyRange) MarshalJSON() ([]byte, error) {
162+
m := map[string]string{
163+
"start-key": HexRegionKeyStr(kr.StartKey),
164+
"end-key": HexRegionKeyStr(kr.EndKey),
165+
}
166+
return json.Marshal(m)
167+
}
168+
159169
// NewKeyRange create a KeyRange with the given start key and end key.
160170
func NewKeyRange(startKey, endKey string) KeyRange {
161171
return KeyRange{

Diff for: pkg/mcs/scheduling/server/cluster.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ func (c *Cluster) updateScheduler() {
316316
for _, scheduler := range latestSchedulersConfig {
317317
schedulerType, ok := types.ConvertOldStrToType[scheduler.Type]
318318
if !ok {
319-
log.Error("scheduler not found ", zap.String("type", scheduler.Type))
319+
log.Error("scheduler not found", zap.String("type", scheduler.Type))
320320
continue
321321
}
322322
s, err := schedulers.CreateScheduler(

Diff for: pkg/schedule/operator/kind.go

-4
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ const (
3535
OpMerge
3636
// Initiated by range scheduler.
3737
OpRange
38-
// Initiated by key range scheduler.
39-
OpKeyRange
4038
// Initiated by replica checker.
4139
OpReplica
4240
// Include region split. Initiated by rule checker if `kind & OpAdmin == 0`.
@@ -62,7 +60,6 @@ var flagToName = map[OpKind]string{
6260
OpHotRegion: "hot-region",
6361
OpReplica: "replica",
6462
OpMerge: "merge",
65-
OpKeyRange: "key-range",
6663
OpRange: "range",
6764
OpWitness: "witness",
6865
OpWitnessLeader: "witness-leader",
@@ -77,7 +74,6 @@ var nameToFlag = map[string]OpKind{
7774
"replica": OpReplica,
7875
"merge": OpMerge,
7976
"range": OpRange,
80-
"key-range": OpKeyRange,
8177
"witness-leader": OpWitnessLeader,
8278
}
8379

Diff for: pkg/schedule/operator/operator_test.go

-3
Original file line numberDiff line numberDiff line change
@@ -476,9 +476,6 @@ func (suite *operatorTestSuite) TestSchedulerKind() {
476476
}, {
477477
op: NewTestOperator(1, &metapb.RegionEpoch{}, OpLeader),
478478
expect: OpLeader,
479-
}, {
480-
op: NewTestOperator(1, &metapb.RegionEpoch{}, OpKeyRange|OpLeader),
481-
expect: OpKeyRange,
482479
},
483480
}
484481
for _, v := range testData {

Diff for: pkg/schedule/schedulers/balance_key_range.go

+127-48
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ package schedulers
22

33
import (
44
"github.com/pingcap/kvproto/pkg/metapb"
5+
"github.com/tikv/pd/pkg/schedule/placement"
6+
"go.uber.org/zap"
57
"net/http"
8+
"sort"
69
"time"
710

811
"github.com/gorilla/mux"
@@ -25,10 +28,10 @@ const balanceKeyRangeName = "balance-key-ranges"
2528

2629
type balanceKeyRangeSchedulerHandler struct {
2730
rd *render.Render
28-
config *balanceKeyRangeSchedulerConfig
31+
config *balanceRangeSchedulerConfig
2932
}
3033

31-
func newBalanceKeyRangeHandler(conf *balanceKeyRangeSchedulerConfig) http.Handler {
34+
func newBalanceKeyRangeHandler(conf *balanceRangeSchedulerConfig) http.Handler {
3235
handler := &balanceKeyRangeSchedulerHandler{
3336
config: conf,
3437
rd: render.New(render.Options{IndentJSON: true}),
@@ -50,31 +53,31 @@ func (handler *balanceKeyRangeSchedulerHandler) listConfig(w http.ResponseWriter
5053
}
5154
}
5255

53-
type balanceKeyRangeSchedulerConfig struct {
56+
type balanceRangeSchedulerConfig struct {
5457
syncutil.RWMutex
5558
schedulerConfig
56-
balanceKeyRangeSchedulerParam
59+
balanceRangeSchedulerParam
5760
}
5861

59-
type balanceKeyRangeSchedulerParam struct {
62+
type balanceRangeSchedulerParam struct {
6063
Role string `json:"role"`
6164
Engine string `json:"engine"`
6265
Timeout time.Duration `json:"timeout"`
6366
Ranges []core.KeyRange `json:"ranges"`
6467
}
6568

66-
func (conf *balanceKeyRangeSchedulerConfig) encodeConfig() ([]byte, error) {
69+
func (conf *balanceRangeSchedulerConfig) encodeConfig() ([]byte, error) {
6770
conf.RLock()
6871
defer conf.RUnlock()
6972
return EncodeConfig(conf)
7073
}
7174

72-
func (conf *balanceKeyRangeSchedulerConfig) clone() *balanceKeyRangeSchedulerParam {
75+
func (conf *balanceRangeSchedulerConfig) clone() *balanceRangeSchedulerParam {
7376
conf.RLock()
7477
defer conf.RUnlock()
7578
ranges := make([]core.KeyRange, len(conf.Ranges))
7679
copy(ranges, conf.Ranges)
77-
return &balanceKeyRangeSchedulerParam{
80+
return &balanceRangeSchedulerParam{
7881
Ranges: ranges,
7982
Role: conf.Role,
8083
Engine: conf.Engine,
@@ -83,16 +86,16 @@ func (conf *balanceKeyRangeSchedulerConfig) clone() *balanceKeyRangeSchedulerPar
8386
}
8487

8588
// EncodeConfig serializes the config.
86-
func (s *balanceKeyRangeScheduler) EncodeConfig() ([]byte, error) {
89+
func (s *balanceRangeScheduler) EncodeConfig() ([]byte, error) {
8790
return s.conf.encodeConfig()
8891
}
8992

9093
// ReloadConfig reloads the config.
91-
func (s *balanceKeyRangeScheduler) ReloadConfig() error {
94+
func (s *balanceRangeScheduler) ReloadConfig() error {
9295
s.conf.Lock()
9396
defer s.conf.Unlock()
9497

95-
newCfg := &balanceKeyRangeSchedulerConfig{}
98+
newCfg := &balanceRangeSchedulerConfig{}
9699
if err := s.conf.load(newCfg); err != nil {
97100
return err
98101
}
@@ -103,9 +106,9 @@ func (s *balanceKeyRangeScheduler) ReloadConfig() error {
103106
return nil
104107
}
105108

106-
type balanceKeyRangeScheduler struct {
109+
type balanceRangeScheduler struct {
107110
*BaseScheduler
108-
conf *balanceKeyRangeSchedulerConfig
111+
conf *balanceRangeSchedulerConfig
109112
handler http.Handler
110113
start time.Time
111114
role Role
@@ -114,31 +117,31 @@ type balanceKeyRangeScheduler struct {
114117
}
115118

116119
// ServeHTTP implements the http.Handler interface.
117-
func (s *balanceKeyRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
120+
func (s *balanceRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
118121
s.handler.ServeHTTP(w, r)
119122
}
120123

121124
// IsScheduleAllowed checks if the scheduler is allowed to schedule new operators.
122-
func (s *balanceKeyRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
123-
allowed := s.OpController.OperatorCount(operator.OpKeyRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit()
125+
func (s *balanceRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
126+
allowed := s.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit()
124127
if !allowed {
125-
operator.IncOperatorLimitCounter(s.GetType(), operator.OpKeyRange)
128+
operator.IncOperatorLimitCounter(s.GetType(), operator.OpRange)
126129
}
127130
if time.Now().Sub(s.start) > s.conf.Timeout {
128131
allowed = false
129-
balanceExpiredCounter.Inc()
132+
balanceRangeExpiredCounter.Inc()
130133
}
131134
return allowed
132135
}
133136

134137
// BalanceKeyRangeCreateOption is used to create a scheduler with an option.
135-
type BalanceKeyRangeCreateOption func(s *balanceKeyRangeScheduler)
138+
type BalanceKeyRangeCreateOption func(s *balanceRangeScheduler)
136139

137140
// newBalanceKeyRangeScheduler creates a scheduler that tends to keep given peer role on
138141
// special store balanced.
139-
func newBalanceKeyRangeScheduler(opController *operator.Controller, conf *balanceKeyRangeSchedulerConfig, options ...BalanceKeyRangeCreateOption) Scheduler {
140-
s := &balanceKeyRangeScheduler{
141-
BaseScheduler: NewBaseScheduler(opController, types.BalanceKeyRangeScheduler, conf),
142+
func newBalanceKeyRangeScheduler(opController *operator.Controller, conf *balanceRangeSchedulerConfig, options ...BalanceKeyRangeCreateOption) Scheduler {
143+
s := &balanceRangeScheduler{
144+
BaseScheduler: NewBaseScheduler(opController, types.BalanceRangeScheduler, conf),
142145
conf: conf,
143146
handler: newBalanceKeyRangeHandler(conf),
144147
start: time.Now(),
@@ -160,55 +163,126 @@ func newBalanceKeyRangeScheduler(opController *operator.Controller, conf *balanc
160163
}
161164

162165
// Schedule schedules the balance key range operator.
163-
func (s *balanceKeyRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
164-
balanceKeyRangeCounter.Inc()
166+
func (s *balanceRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
167+
balanceRangeCounter.Inc()
165168
plan,err:=s.prepare(cluster)
169+
downFilter := filter.NewRegionDownFilter()
170+
replicaFilter := filter.NewRegionReplicatedFilter(cluster)
171+
snapshotFilter := filter.NewSnapshotSendFilter(plan.stores, constant.Medium)
172+
baseRegionFilters := []filter.RegionFilter{downFilter, replicaFilter, snapshotFilter}
173+
174+
for sourceIndex,sourceStore:=range plan.stores{
175+
plan.source=sourceStore
176+
switch s.role{
177+
case Leader:
178+
plan.region=filter.SelectOneRegion(cluster.RandLeaderRegions(plan.sourceStoreID(), s.conf.Ranges), nil,baseRegionFilters...)
179+
case Learner:
180+
plan.region=filter.SelectOneRegion(cluster.RandLearnerRegions(plan.sourceStoreID(), s.conf.Ranges), nil,baseRegionFilters...)
181+
case Follower:
182+
plan.region=filter.SelectOneRegion(cluster.RandFollowerRegions(plan.sourceStoreID(), s.conf.Ranges), nil,baseRegionFilters...)
183+
}
184+
if plan.region == nil {
185+
balanceRangeNoRegionCounter.Inc()
186+
continue
187+
}
188+
log.Debug("select region", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", plan.region.GetID()))
189+
// Skip hot regions.
190+
if cluster.IsRegionHot(plan.region) {
191+
log.Debug("region is hot", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", plan.region.GetID()))
192+
balanceRangeHotCounter.Inc()
193+
continue
194+
}
195+
// Check region leader
196+
if plan.region.GetLeader() == nil {
197+
log.Warn("region have no leader", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID()))
198+
balanceRangeNoLeaderCounter.Inc()
199+
continue
200+
}
201+
plan.fit = replicaFilter.(*filter.RegionReplicatedFilter).GetFit()
202+
if op := s.transferPeer(plan, plan.stores[sourceIndex+1:]); op != nil {
203+
op.Counters = append(op.Counters, balanceRegionNewOpCounter)
204+
return []*operator.Operator{op}, nil
205+
}
206+
}
207+
166208
if err != nil {
167209
log.Error("failed to prepare balance key range scheduler", errs.ZapError(err))
168210
return nil,nil
169211

170212
}
171213
}
172214

173-
// BalanceKeyRangeSchedulerPlan is used to record the plan of balance key range scheduler.
174-
type BalanceKeyRangeSchedulerPlan struct {
175-
source []*core.StoreInfo
176-
// store_id -> score
177-
scores map[uint64]uint64
178-
// store_id -> peer
179-
regions map[uint64]*metapb.Peer
215+
// transferPeer selects the best store to create a new peer to replace the old peer.
216+
func (s *balanceRangeScheduler) transferPeer(plan *balanceRangeSchedulerPlan, dstStores []*storeInfo) *operator.Operator {
217+
excludeTargets := plan.region.GetStoreIDs()
218+
if s.role!=Leader{
219+
excludeTargets = append(excludeTargets, plan.sourceStoreID())
220+
}
221+
return nil
222+
}
223+
224+
// balanceRangeSchedulerPlan is used to record the plan of balance key range scheduler.
225+
type balanceRangeSchedulerPlan struct {
226+
// stores is sorted by score desc
227+
stores []*storeInfo
228+
source *storeInfo
229+
target *storeInfo
230+
region *core.RegionInfo
231+
fit *placement.RegionFit
232+
}
233+
234+
type storeInfo struct {
235+
store *core.StoreInfo
236+
score uint64
180237
}
181238

182-
func (s *balanceKeyRangeScheduler) prepare(cluster sche.SchedulerCluster)(*BalanceKeyRangeSchedulerPlan,error) {
239+
func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster)(*balanceRangeSchedulerPlan,error) {
183240
krs := core.NewKeyRanges(s.conf.Ranges)
184241
scanRegions, err := cluster.BatchScanRegions(krs)
185242
if err != nil {
186243
return nil,err
187244
}
188-
stores := cluster.GetStores()
189-
sources := filter.SelectSourceStores(stores, s.filters, cluster.GetSchedulerConfig(), nil, nil)
190-
scores := make(map[uint64]uint64, len(sources))
191-
regions:=make(map[uint64]*metapb.Peer,len(scanRegions))
245+
sources := filter.SelectSourceStores(cluster.GetStores(), s.filters, cluster.GetSchedulerConfig(), nil, nil)
246+
storeInfos:=make(map[uint64]*storeInfo,len(sources))
247+
for _, source := range sources {
248+
storeInfos[source.GetID()] = &storeInfo{store: source}
249+
}
192250
for _, region := range scanRegions {
193251
for _, peer := range s.role.getPeers(region) {
194-
scores[peer.GetStoreId()] += 1
195-
regions[peer.GetStoreId()] = peer
252+
storeInfos[peer.GetStoreId()].score += 1
196253
}
197254
}
198-
return &BalanceKeyRangeSchedulerPlan{
199-
source: sources,
200-
scores: scores,
201-
regions: regions,
255+
256+
stores:=make([]*storeInfo,0,len(storeInfos))
257+
for _, store := range storeInfos {
258+
stores = append(stores, store)
259+
}
260+
sort.Slice(stores, func(i, j int) bool {
261+
return stores[i].score > stores[j].score
262+
})
263+
return &balanceRangeSchedulerPlan{
264+
stores:stores,
265+
source: nil,
266+
target: nil,
267+
region: nil,
202268
},nil
203269
}
204270

271+
func (p *balanceRangeSchedulerPlan) sourceStoreID() uint64 {
272+
return p.source.store.GetID()
273+
}
274+
275+
func (p *balanceRangeSchedulerPlan) targetStoreID() uint64 {
276+
return p.target.store.GetID()
277+
}
278+
205279

206280

207281
// Role indicates which kind of region peer the scheduler balances.
type Role int

const (
	// Leader balances the leader peers of regions.
	Leader Role = iota
	// Follower balances the follower (non-leader voter) peers.
	Follower
	// Learner balances the learner peers.
	Learner
	// Unknown is used for an unrecognized role name.
	Unknown
	// RoleLen is the number of role values declared above, Unknown included.
	RoleLen
)
@@ -218,7 +292,7 @@ func (r Role) String() string {
218292
switch r {
219293
case Leader:
220294
return "leader"
221-
case Voter:
295+
case Follower:
222296
return "follower"
223297
case Learner:
224298
return "learner"
@@ -231,8 +305,8 @@ func NewRole(role string) Role {
231305
switch role {
232306
case "leader":
233307
return Leader
234-
case "voter":
235-
return Voter
308+
case "follower":
309+
return Follower
236310
case "learner":
237311
return Learner
238312
default:
@@ -244,8 +318,13 @@ func (r Role) getPeers(region *core.RegionInfo) []*metapb.Peer {
244318
switch r {
245319
case Leader:
246320
return []*metapb.Peer{region.GetLeader()}
247-
case Voter:
248-
return region.GetVoters()
321+
case Follower:
322+
followers:=region.GetFollowers()
323+
ret:=make([]*metapb.Peer,0,len(followers))
324+
for _,peer:=range followers{
325+
ret=append(ret,peer)
326+
}
327+
return ret
249328
case Learner:
250329
return region.GetLearners()
251330
default:

0 commit comments

Comments
 (0)