Skip to content

Commit

Permalink
schedule: impl balance range scheduler (#9005)
Browse files Browse the repository at this point in the history
close #9006

Signed-off-by: 童剑 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
bufferflies and ti-chi-bot[bot] authored Mar 5, 2025
1 parent c828ec8 commit bdd857e
Show file tree
Hide file tree
Showing 14 changed files with 733 additions and 69 deletions.
11 changes: 11 additions & 0 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,17 @@ type KeyRanges struct {
krs []*KeyRange
}

// NewKeyRanges creates a KeyRanges.
func NewKeyRanges(ranges []KeyRange) *KeyRanges {
krs := make([]*KeyRange, 0, len(ranges))
for _, kr := range ranges {
krs = append(krs, &kr)
}
return &KeyRanges{
krs,
}
}

// NewKeyRangesWithSize creates a KeyRanges with the hint size.
func NewKeyRangesWithSize(size int) *KeyRanges {
return &KeyRanges{
Expand Down
77 changes: 67 additions & 10 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,73 @@ func (r *RegionInfo) GetPeer(peerID uint64) *metapb.Peer {
return nil
}

// Role is the role of the region.
type Role int

const (
// Leader is the leader of the region.
Leader Role = iota
// Follower is the follower of the region.
Follower
// Learner is the learner of the region.
Learner
// Unknown is the unknown role of the region include witness.
Unknown
)

// String returns the string value of the role.
func (r Role) String() string {
switch r {
case Leader:
return "leader"
case Follower:
return "voter"
case Learner:
return "learner"
default:
return "unknown"
}
}

// NewRole creates a new role.
func NewRole(role string) Role {
switch role {
case "leader":
return Leader
case "follower":
return Follower
case "learner":
return Learner
default:
return Unknown
}
}

// MarshalJSON returns the JSON encoding of Role.
func (r Role) MarshalJSON() ([]byte, error) {
return []byte(`"` + r.String() + `"`), nil
}

// GetPeersByRole returns the peers with specified role.
func (r *RegionInfo) GetPeersByRole(role Role) []*metapb.Peer {
switch role {
case Leader:
return []*metapb.Peer{r.GetLeader()}
case Follower:
followers := r.GetFollowers()
ret := make([]*metapb.Peer, 0, len(followers))
for _, peer := range followers {
ret = append(ret, peer)
}
return ret
case Learner:
learners := r.GetLearners()
return learners
default:
return nil
}
}

// GetDownPeer returns the down peer with specified peer id.
func (r *RegionInfo) GetDownPeer(peerID uint64) *metapb.Peer {
for _, down := range r.downPeers {
Expand Down Expand Up @@ -482,16 +549,6 @@ func (r *RegionInfo) GetFollowers() map[uint64]*metapb.Peer {
return followers
}

// GetFollower randomly returns a follow peer.
func (r *RegionInfo) GetFollower() *metapb.Peer {
for _, peer := range r.GetVoters() {
if r.leader == nil || r.leader.GetId() != peer.GetId() {
return peer
}
}
return nil
}

// GetNonWitnessVoters returns a map indicate the non-witness voter peers distributed.
func (r *RegionInfo) GetNonWitnessVoters() map[uint64]*metapb.Peer {
peers := r.GetVoters()
Expand Down
35 changes: 35 additions & 0 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1291,3 +1291,38 @@ func TestQueryRegions(t *testing.T) {
re.Equal(uint64(2), regionsByID[2].GetRegion().GetId())
re.Equal(uint64(3), regionsByID[3].GetRegion().GetId())
}

func TestGetPeers(t *testing.T) {
re := require.New(t)
learner := &metapb.Peer{StoreId: 1, Id: 1, Role: metapb.PeerRole_Learner}
leader := &metapb.Peer{StoreId: 2, Id: 2}
follower1 := &metapb.Peer{StoreId: 3, Id: 3}
follower2 := &metapb.Peer{StoreId: 4, Id: 4}
region := NewRegionInfo(&metapb.Region{Id: 100, Peers: []*metapb.Peer{
leader, follower1, follower2, learner,
}}, leader, WithLearners([]*metapb.Peer{learner}))
for _, v := range []struct {
role string
peers []*metapb.Peer
}{
{
role: "leader",
peers: []*metapb.Peer{leader},
},
{
role: "follower",
peers: []*metapb.Peer{follower1, follower2},
},
{
role: "learner",
peers: []*metapb.Peer{learner},
},
{
role: "witness",
peers: nil,
},
} {
role := NewRole(v.role)
re.Equal(v.peers, region.GetPeersByRole(role))
}
}
2 changes: 2 additions & 0 deletions pkg/schedule/filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,8 @@ var (
allSpecialEngines = []string{core.EngineTiFlash}
// NotSpecialEngines is used to filter the special engine.
NotSpecialEngines = placement.LabelConstraint{Key: core.EngineKey, Op: placement.NotIn, Values: allSpecialEngines}
// SpecialEngines is used to filter the TiFlash engine.
SpecialEngines = placement.LabelConstraint{Key: core.EngineKey, Op: placement.In, Values: allSpecialEngines}
)

type isolationFilter struct {
Expand Down
10 changes: 10 additions & 0 deletions pkg/schedule/filter/filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,16 @@ func TestSpecialUseFilter(t *testing.T) {
}
}

func TestSpecialEngine(t *testing.T) {
re := require.New(t)
tiflash := core.NewStoreInfoWithLabel(1, map[string]string{core.EngineKey: core.EngineTiFlash})
tikv := core.NewStoreInfoWithLabel(2, map[string]string{core.EngineKey: core.EngineTiKV})
re.True(SpecialEngines.MatchStore(tiflash))
re.False(SpecialEngines.MatchStore(tikv))
re.True(NotSpecialEngines.MatchStore(tikv))
re.False(NotSpecialEngines.MatchStore(tiflash))
}

func BenchmarkCloneRegionTest(b *testing.B) {
epoch := &metapb.RegionEpoch{
ConfVer: 1,
Expand Down
14 changes: 14 additions & 0 deletions pkg/schedule/operator/influence.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ type StoreInfluence struct {
StepCost map[storelimit.Type]int64
}

// GetStoreInfluenceByRole returns the influence of the store according to the role.
func (s *StoreInfluence) GetStoreInfluenceByRole(r core.Role) int64 {
switch r {
case core.Leader:
return s.LeaderCount
case core.Follower:
return s.RegionCount
case core.Learner:
return s.RegionCount
default:
return 0
}
}

func (s *StoreInfluence) add(other *StoreInfluence) {
s.RegionCount += other.RegionCount
s.RegionSize += other.RegionSize
Expand Down
27 changes: 26 additions & 1 deletion pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package operator

import (
"bytes"
"context"
"fmt"
"strconv"
Expand Down Expand Up @@ -828,14 +829,33 @@ func (oc *Controller) GetHistory(start time.Time) []OpHistory {
return history
}

// OpInfluenceOption is used to filter the region.
// returns true if the region meets the condition, it will ignore this region in the influence calculation.
// returns false if the region does not meet the condition, it will calculate the influence of this region.
type OpInfluenceOption func(region *core.RegionInfo) bool

// WithRangeOption returns an OpInfluenceOption that filters the region by the key ranges.
func WithRangeOption(ranges []core.KeyRange) OpInfluenceOption {
return func(region *core.RegionInfo) bool {
for _, r := range ranges {
// the start key of the region must greater than the given range start key.
// the end key of the region must less than the given range end key.
if bytes.Compare(region.GetStartKey(), r.StartKey) < 0 || bytes.Compare(r.EndKey, region.GetEndKey()) < 0 {
return false
}
}
return true
}
}

// OperatorCount gets the count of operators filtered by kind.
// kind only has one OpKind.
func (oc *Controller) OperatorCount(kind OpKind) uint64 {
return oc.counts.getCountByKind(kind)
}

// GetOpInfluence gets OpInfluence.
func (oc *Controller) GetOpInfluence(cluster *core.BasicCluster) OpInfluence {
func (oc *Controller) GetOpInfluence(cluster *core.BasicCluster, ops ...OpInfluenceOption) OpInfluence {
influence := OpInfluence{
StoresInfluence: make(map[uint64]*StoreInfluence),
}
Expand All @@ -844,6 +864,11 @@ func (oc *Controller) GetOpInfluence(cluster *core.BasicCluster) OpInfluence {
op := value.(*Operator)
if !op.CheckTimeout() && !op.CheckSuccess() {
region := cluster.GetRegion(op.RegionID())
for _, opt := range ops {
if !opt(region) {
return true
}
}
if region != nil {
op.UnfinishedInfluence(influence, region)
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/schedule/operator/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,30 @@ func (suite *operatorControllerTestSuite) TestDispatchOutdatedRegion() {
re.Equal(3, stream.MsgLength())
}

func (suite *operatorControllerTestSuite) TestInfluenceOpt() {
re := suite.Require()
cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions())
stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster, false /* no need to run */)
controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), stream)
cluster.AddLeaderRegionWithRange(1, "200", "300", 1, 2, 3)
op := &Operator{
regionID: 1,
kind: OpRegion,
steps: []OpStep{
AddLearner{ToStore: 2, PeerID: 2},
},
timeout: time.Minute,
}
re.True(controller.addOperatorInner(op))
op.Start()
inf := controller.GetOpInfluence(cluster.GetBasicCluster())
re.Len(inf.StoresInfluence, 1)
inf = controller.GetOpInfluence(cluster.GetBasicCluster(), WithRangeOption([]core.KeyRange{{StartKey: []byte("220"), EndKey: []byte("280")}}))
re.Empty(inf.StoresInfluence)
inf = controller.GetOpInfluence(cluster.GetBasicCluster(), WithRangeOption([]core.KeyRange{{StartKey: []byte("100"), EndKey: []byte("400")}}))
re.Len(inf.StoresInfluence, 1)
}

func (suite *operatorControllerTestSuite) TestCalcInfluence() {
re := suite.Require()
cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions())
Expand Down
Loading

0 comments on commit bdd857e

Please sign in to comment.