From 81294c85a8b11d2c20901c12edfd09ccdfd87e89 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 12 Dec 2024 18:35:55 +0800 Subject: [PATCH 1/4] scheduler: consider leader score when evict leader Signed-off-by: lhy1024 --- pkg/schedule/checker/rule_checker.go | 4 +-- pkg/schedule/config/config_provider.go | 2 +- pkg/schedule/filter/comparer.go | 18 ++++++++++++ pkg/schedule/handler/handler.go | 2 +- pkg/schedule/operator/builder.go | 19 ------------ pkg/schedule/operator/create_operator.go | 3 +- pkg/schedule/operator/create_operator_test.go | 2 +- pkg/schedule/schedulers/balance_leader.go | 2 +- pkg/schedule/schedulers/evict_leader.go | 29 ++++++++++++------- pkg/schedule/schedulers/grant_hot_region.go | 2 +- pkg/schedule/schedulers/hot_region.go | 1 - pkg/schedule/schedulers/hot_region_test.go | 2 +- pkg/schedule/schedulers/label.go | 2 +- pkg/schedule/schedulers/shuffle_leader.go | 2 +- .../schedulers/transfer_witness_leader.go | 21 ++++++++------ plugin/scheduler_example/evict_leader.go | 2 +- 16 files changed, 61 insertions(+), 52 deletions(-) diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go index 2d06f84fdfe..a3e1eb73e2b 100644 --- a/pkg/schedule/checker/rule_checker.go +++ b/pkg/schedule/checker/rule_checker.go @@ -320,7 +320,7 @@ func (c *RuleChecker) fixLooseMatchPeer(region *core.RegionInfo, fit *placement. if region.GetLeader().GetId() != peer.GetId() && rf.Rule.Role == placement.Leader { ruleCheckerFixLeaderRoleCounter.Inc() if c.allowLeader(fit, peer) { - return operator.CreateTransferLeaderOperator("fix-leader-role", c.cluster, region, peer.GetStoreId(), []uint64{}, 0) + return operator.CreateTransferLeaderOperator("fix-leader-role", c.cluster, region, peer.GetStoreId(), 0) } ruleCheckerNotAllowLeaderCounter.Inc() return nil, errPeerCannotBeLeader @@ -329,7 +329,7 @@ func (c *RuleChecker) fixLooseMatchPeer(region *core.RegionInfo, fit *placement. 
ruleCheckerFixFollowerRoleCounter.Inc() for _, p := range region.GetPeers() { if c.allowLeader(fit, p) { - return operator.CreateTransferLeaderOperator("fix-follower-role", c.cluster, region, p.GetStoreId(), []uint64{}, 0) + return operator.CreateTransferLeaderOperator("fix-follower-role", c.cluster, region, p.GetStoreId(), 0) } } ruleCheckerNoNewLeaderCounter.Inc() diff --git a/pkg/schedule/config/config_provider.go b/pkg/schedule/config/config_provider.go index 5c1be1089e9..0c89ef87a0e 100644 --- a/pkg/schedule/config/config_provider.go +++ b/pkg/schedule/config/config_provider.go @@ -65,7 +65,6 @@ type SchedulerConfigProvider interface { IsTraceRegionFlow() bool GetTolerantSizeRatio() float64 - GetLeaderSchedulePolicy() constant.SchedulePolicy IsDebugMetricsEnabled() bool IsDiagnosticAllowed() bool @@ -112,6 +111,7 @@ type SharedConfigProvider interface { IsCrossTableMergeEnabled() bool IsOneWayMergeEnabled() bool GetMergeScheduleLimit() uint64 + GetLeaderSchedulePolicy() constant.SchedulePolicy GetRegionScoreFormulaVersion() string GetSchedulerMaxWaitingOperator() uint64 GetStoreLimitByType(uint64, storelimit.Type) float64 diff --git a/pkg/schedule/filter/comparer.go b/pkg/schedule/filter/comparer.go index 58d3032f36d..1d9eab4d44b 100644 --- a/pkg/schedule/filter/comparer.go +++ b/pkg/schedule/filter/comparer.go @@ -40,6 +40,24 @@ func RegionScoreComparer(conf config.SharedConfigProvider) StoreComparer { } } +// LeaderScoreComparer creates a StoreComparer to sort store by leader +// score. +func LeaderScoreComparer(conf config.SchedulerConfigProvider) StoreComparer { + leaderSchedulePolicy := conf.GetLeaderSchedulePolicy() + return func(a, b *core.StoreInfo) int { + sa := a.LeaderScore(leaderSchedulePolicy, 0) + sb := b.LeaderScore(leaderSchedulePolicy, 0) + switch { + case sa > sb: + return 1 + case sa < sb: + return -1 + default: + return 0 + } + } +} + // IsolationComparer creates a StoreComparer to sort store by isolation score. 
func IsolationComparer(locationLabels []string, regionStores []*core.StoreInfo) StoreComparer { return func(a, b *core.StoreInfo) int { diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index a8540b4b5f4..730b7d76771 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -418,7 +418,7 @@ func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) err return errors.Errorf("region has no voter in store %v", storeID) } - op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, newLeader.GetStoreId(), []uint64{}, operator.OpAdmin) + op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, newLeader.GetStoreId(), operator.OpAdmin) if err != nil { log.Debug("fail to create transfer leader operator", errs.ZapError(err)) return err diff --git a/pkg/schedule/operator/builder.go b/pkg/schedule/operator/builder.go index 29b8aedf978..3bca53f2eaa 100644 --- a/pkg/schedule/operator/builder.go +++ b/pkg/schedule/operator/builder.go @@ -296,25 +296,6 @@ func (b *Builder) SetLeader(storeID uint64) *Builder { return b } -// SetLeaders records all valid target leaders in Builder. -func (b *Builder) SetLeaders(storeIDs []uint64) *Builder { - if b.err != nil { - return b - } - sort.Slice(storeIDs, func(i, j int) bool { return storeIDs[i] < storeIDs[j] }) - for _, storeID := range storeIDs { - peer := b.targetPeers[storeID] - if peer == nil || core.IsLearner(peer) || b.unhealthyPeers[storeID] != nil { - continue - } - b.targetLeaderStoreIDs = append(b.targetLeaderStoreIDs, storeID) - } - // Don't need to check if there's valid target, because `targetLeaderStoreIDs` - // can be empty if this is not a multi-target evict leader operation. Besides, - // `targetLeaderStoreID` must be valid and there must be at least one valid target. - return b -} - // SetPeers resets the target peer list. // // If peer's ID is 0, the builder will allocate a new ID later. 
If current diff --git a/pkg/schedule/operator/create_operator.go b/pkg/schedule/operator/create_operator.go index 4fae7f9e3f2..782e49bffb1 100644 --- a/pkg/schedule/operator/create_operator.go +++ b/pkg/schedule/operator/create_operator.go @@ -78,10 +78,9 @@ func CreateRemovePeerOperator(desc string, ci sche.SharedCluster, kind OpKind, r } // CreateTransferLeaderOperator creates an operator that transfers the leader from a source store to a target store. -func CreateTransferLeaderOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, targetStoreID uint64, targetStoreIDs []uint64, kind OpKind) (*Operator, error) { +func CreateTransferLeaderOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, targetStoreID uint64, kind OpKind) (*Operator, error) { return NewBuilder(desc, ci, region, SkipOriginJointStateCheck). SetLeader(targetStoreID). - SetLeaders(targetStoreIDs). Build(kind) } diff --git a/pkg/schedule/operator/create_operator_test.go b/pkg/schedule/operator/create_operator_test.go index 845255e713c..bb7f8945bea 100644 --- a/pkg/schedule/operator/create_operator_test.go +++ b/pkg/schedule/operator/create_operator_test.go @@ -423,7 +423,7 @@ func (suite *createOperatorTestSuite) TestCreateTransferLeaderOperator() { } for _, testCase := range testCases { region := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: testCase.originPeers}, testCase.originPeers[0]) - op, err := CreateTransferLeaderOperator("test", suite.cluster, region, testCase.targetLeaderStoreID, []uint64{}, 0) + op, err := CreateTransferLeaderOperator("test", suite.cluster, region, testCase.targetLeaderStoreID, 0) if testCase.isErr { re.Error(err) diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index 03f02002c74..e4802494a65 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -535,7 +535,7 @@ func (s *balanceLeaderScheduler) createOperator(solver *solver, collector 
*plan. } solver.Step++ defer func() { solver.Step-- }() - op, err := operator.CreateTransferLeaderOperator(s.GetName(), solver, solver.Region, solver.targetStoreID(), []uint64{}, operator.OpLeader) + op, err := operator.CreateTransferLeaderOperator(s.GetName(), solver, solver.Region, solver.targetStoreID(), operator.OpLeader) if err != nil { log.Debug("fail to create balance leader operator", errs.ZapError(err)) if collector != nil { diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index defc65846ae..f818ce78fa1 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -361,19 +361,12 @@ func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCl filters = append(filters, &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent}) candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)). FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...) - // Compatible with old TiKV transfer leader logic. - target := candidates.RandomPick() - targets := candidates.PickAll() - // `targets` MUST contains `target`, so only needs to check if `target` is nil here. 
- if target == nil { + + if len(candidates.Stores) == 0 { evictLeaderNoTargetStoreCounter.Inc() continue } - targetIDs := make([]uint64, 0, len(targets)) - for _, t := range targets { - targetIDs = append(targetIDs, t.GetID()) - } - op, err := operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), targetIDs, operator.OpLeader) + op, err := createOperatorWithSort(name, cluster, candidates, region) if err != nil { log.Debug("fail to create evict leader operator", errs.ZapError(err)) continue @@ -385,6 +378,22 @@ func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCl return ops } +func createOperatorWithSort(name string, cluster sche.SchedulerCluster, candidates *filter.StoreCandidates, region *core.RegionInfo) (*operator.Operator, error) { + // we will pick low leader score store firstly. + candidates.Sort(filter.RegionScoreComparer(cluster.GetSharedConfig())) + var ( + op *operator.Operator + err error + ) + for _, target := range candidates.Stores { + op, err = operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), operator.OpLeader) + if op != nil && err == nil { + return op, err + } + } + return op, err +} + type evictLeaderHandler struct { rd *render.Render config *evictLeaderSchedulerConfig diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index 88b9f5c6c93..5db37f9482a 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -314,7 +314,7 @@ func (s *grantHotRegionScheduler) transfer(cluster sche.SchedulerCluster, region dstStore := &metapb.Peer{StoreId: destStoreIDs[i]} if isLeader { - op, err = operator.CreateTransferLeaderOperator(s.GetName()+"-leader", cluster, srcRegion, dstStore.StoreId, []uint64{}, operator.OpLeader) + op, err = operator.CreateTransferLeaderOperator(s.GetName()+"-leader", cluster, srcRegion, dstStore.StoreId, operator.OpLeader) } else { op, err = 
operator.CreateMovePeerOperator(s.GetName()+"-move", cluster, srcRegion, operator.OpRegion|operator.OpLeader, srcStore.GetID(), dstStore) } diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 97a558c3fe4..1f185df1169 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -1473,7 +1473,6 @@ func (bs *balanceSolver) createOperator(region *core.RegionInfo, srcStoreID, dst bs, region, dstStoreID, - []uint64{}, operator.OpHotRegion) } else { srcPeer := region.GetStorePeer(srcStoreID) // checked in `filterHotPeers` diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 9f79ac617c9..5e115ce0ff5 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -148,7 +148,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) { case movePeer: op, err = operator.CreateMovePeerOperator("move-peer-test", tc, region, operator.OpAdmin, 2, &metapb.Peer{Id: region.GetID()*10000 + 1, StoreId: 4}) case transferLeader: - op, err = operator.CreateTransferLeaderOperator("transfer-leader-test", tc, region, 2, []uint64{}, operator.OpAdmin) + op, err = operator.CreateTransferLeaderOperator("transfer-leader-test", tc, region, 2, operator.OpAdmin) } re.NoError(err) re.NotNil(op) diff --git a/pkg/schedule/schedulers/label.go b/pkg/schedule/schedulers/label.go index a27ea29687e..5b9a24df288 100644 --- a/pkg/schedule/schedulers/label.go +++ b/pkg/schedule/schedulers/label.go @@ -100,7 +100,7 @@ func (s *labelScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*ope continue } - op, err := operator.CreateTransferLeaderOperator("label-reject-leader", cluster, region, target.GetID(), []uint64{}, operator.OpLeader) + op, err := operator.CreateTransferLeaderOperator("label-reject-leader", cluster, region, target.GetID(), operator.OpLeader) if err != nil { log.Debug("fail to create 
transfer label reject leader operator", errs.ZapError(err)) return nil, nil diff --git a/pkg/schedule/schedulers/shuffle_leader.go b/pkg/schedule/schedulers/shuffle_leader.go index e2a256af7a7..d66d40d2b9e 100644 --- a/pkg/schedule/schedulers/shuffle_leader.go +++ b/pkg/schedule/schedulers/shuffle_leader.go @@ -88,7 +88,7 @@ func (s *shuffleLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) shuffleLeaderNoFollowerCounter.Inc() return nil, nil } - op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, targetStore.GetID(), []uint64{}, operator.OpAdmin) + op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, targetStore.GetID(), operator.OpAdmin) if err != nil { log.Debug("fail to create shuffle leader operator", errs.ZapError(err)) return nil, nil diff --git a/pkg/schedule/schedulers/transfer_witness_leader.go b/pkg/schedule/schedulers/transfer_witness_leader.go index 90191dd355c..ed8320e04bb 100644 --- a/pkg/schedule/schedulers/transfer_witness_leader.go +++ b/pkg/schedule/schedulers/transfer_witness_leader.go @@ -98,19 +98,22 @@ func scheduleTransferWitnessLeader(r *rand.Rand, name string, cluster sche.Sched filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores), &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent}) candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)).FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...) - // Compatible with old TiKV transfer leader logic. - target := candidates.RandomPick() - targets := candidates.PickAll() - // `targets` MUST contains `target`, so only needs to check if `target` is nil here. 
- if target == nil { + if len(candidates.Stores) == 0 { transferWitnessLeaderNoTargetStoreCounter.Inc() return nil, errors.New("no target store to schedule") } - targetIDs := make([]uint64, 0, len(targets)) - for _, t := range targets { - targetIDs = append(targetIDs, t.GetID()) + // TODO: also add sort such as evict leader + var ( + op *operator.Operator + err error + ) + for _, target := range candidates.Stores { + op, err = operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), operator.OpWitnessLeader) + if op != nil && err == nil { + return op, err + } } - return operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), targetIDs, operator.OpWitnessLeader) + return op, err } // RecvRegionInfo receives a checked region from coordinator diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index eb976edf851..485ccc5f1a6 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -228,7 +228,7 @@ func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ( if target == nil { continue } - op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, target.GetID(), []uint64{}, operator.OpLeader) + op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, target.GetID(), operator.OpLeader) if err != nil { log.Debug("fail to create evict leader operator", errs.ZapError(err)) continue From c0028dd0b082534350bd1d4e548e77779ed8da55 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 8 Jan 2025 21:27:57 +0800 Subject: [PATCH 2/4] add test Signed-off-by: lhy1024 --- pkg/schedule/filter/comparer.go | 4 +- pkg/schedule/filter/comparer_test.go | 60 ++++++++++++++++++++++++ pkg/schedule/schedulers/evict_leader.go | 2 +- pkg/utils/operatorutil/operator_check.go | 2 +- 4 files changed, 65 insertions(+), 3 deletions(-) create mode 100644 pkg/schedule/filter/comparer_test.go diff --git 
a/pkg/schedule/filter/comparer.go b/pkg/schedule/filter/comparer.go index 1d9eab4d44b..f34d4995064 100644 --- a/pkg/schedule/filter/comparer.go +++ b/pkg/schedule/filter/comparer.go @@ -27,6 +27,7 @@ type StoreComparer func(a, b *core.StoreInfo) int // score. func RegionScoreComparer(conf config.SharedConfigProvider) StoreComparer { return func(a, b *core.StoreInfo) int { + // TODO: we should use the real time delta data to calculate the score. sa := a.RegionScore(conf.GetRegionScoreFormulaVersion(), conf.GetHighSpaceRatio(), conf.GetLowSpaceRatio(), 0) sb := b.RegionScore(conf.GetRegionScoreFormulaVersion(), conf.GetHighSpaceRatio(), conf.GetLowSpaceRatio(), 0) switch { @@ -42,9 +43,10 @@ func RegionScoreComparer(conf config.SharedConfigProvider) StoreComparer { // LeaderScoreComparer creates a StoreComparer to sort store by leader // score. -func LeaderScoreComparer(conf config.SchedulerConfigProvider) StoreComparer { +func LeaderScoreComparer(conf config.SharedConfigProvider) StoreComparer { leaderSchedulePolicy := conf.GetLeaderSchedulePolicy() return func(a, b *core.StoreInfo) int { + // TODO: we should use the real time delta data to calculate the score. sa := a.LeaderScore(leaderSchedulePolicy, 0) sb := b.LeaderScore(leaderSchedulePolicy, 0) switch { diff --git a/pkg/schedule/filter/comparer_test.go b/pkg/schedule/filter/comparer_test.go new file mode 100644 index 00000000000..8872d00c1b2 --- /dev/null +++ b/pkg/schedule/filter/comparer_test.go @@ -0,0 +1,60 @@ +// Copyright 2025 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package filter + +import ( + "math/rand" + "testing" + "time" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/mock/mockconfig" +) + +func TestRegionCompare(t *testing.T) { + re := require.New(t) + ids := []uint64{1, 2, 3, 4, 5} + stores := make([]*core.StoreInfo, 0, len(ids)) + for _, id := range ids { + stores = append(stores, core.NewStoreInfo( + &metapb.Store{Id: id}, + core.SetRegionSize(int64(6-id)*1000), + )) + } + cs := NewCandidates(rand.New(rand.NewSource(time.Now().UnixNano())), stores) + cfg := mockconfig.NewTestOptions() + re.Equal(uint64(1), cs.PickFirst().GetID()) + cs.Sort(RegionScoreComparer(cfg)) + re.Equal(uint64(5), cs.PickFirst().GetID()) +} + +func TestLeaderCompare(t *testing.T) { + re := require.New(t) + ids := []uint64{1, 2, 3, 4, 5} + stores := make([]*core.StoreInfo, 0, len(ids)) + for _, id := range ids { + stores = append(stores, core.NewStoreInfo( + &metapb.Store{Id: id}, + core.SetLeaderCount(int(6-id)*1000), + )) + } + cs := NewCandidates(rand.New(rand.NewSource(time.Now().UnixNano())), stores) + cfg := mockconfig.NewTestOptions() + re.Equal(uint64(1), cs.PickFirst().GetID()) + cs.Sort(LeaderScoreComparer(cfg)) + re.Equal(uint64(5), cs.PickFirst().GetID()) +} diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index f818ce78fa1..19c591af2cf 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -380,7 +380,7 @@ func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCl func createOperatorWithSort(name string, cluster sche.SchedulerCluster, candidates *filter.StoreCandidates, region *core.RegionInfo) (*operator.Operator, error) { // we will pick low leader score store firstly. 
- candidates.Sort(filter.RegionScoreComparer(cluster.GetSharedConfig())) + candidates.Sort(filter.LeaderScoreComparer(cluster.GetSharedConfig())) var ( op *operator.Operator err error diff --git a/pkg/utils/operatorutil/operator_check.go b/pkg/utils/operatorutil/operator_check.go index 78b9e9d9bab..8736139cc7d 100644 --- a/pkg/utils/operatorutil/operator_check.go +++ b/pkg/utils/operatorutil/operator_check.go @@ -52,7 +52,7 @@ func CheckMultiTargetTransferLeader(re *require.Assertions, op *operator.Operato re.Equal(1, op.Len()) expectedOps := make([]any, 0, len(targetIDs)) for _, targetID := range targetIDs { - expectedOps = append(expectedOps, operator.TransferLeader{FromStore: sourceID, ToStore: targetID, ToStores: targetIDs}) + expectedOps = append(expectedOps, operator.TransferLeader{FromStore: sourceID, ToStore: targetID}) } re.Contains(expectedOps, op.Step(0)) kind |= operator.OpLeader From f2adb40854768a7b1ddeb2e7201835e7f9d361e5 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 8 Jan 2025 21:46:48 +0800 Subject: [PATCH 3/4] fix ci Signed-off-by: lhy1024 --- pkg/schedule/filter/comparer_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/schedule/filter/comparer_test.go b/pkg/schedule/filter/comparer_test.go index 8872d00c1b2..fa50e982f1d 100644 --- a/pkg/schedule/filter/comparer_test.go +++ b/pkg/schedule/filter/comparer_test.go @@ -19,8 +19,10 @@ import ( "testing" "time" - "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/require" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mock/mockconfig" ) From 8a695a51c4181115b7dbaca52817b80235d51303 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 9 Jan 2025 12:59:54 +0800 Subject: [PATCH 4/4] use op influence Signed-off-by: lhy1024 --- pkg/schedule/config/config_provider.go | 2 +- pkg/schedule/filter/comparer.go | 19 ------------- pkg/schedule/filter/comparer_test.go | 17 ----------- 
pkg/schedule/schedulers/evict_leader.go | 26 +++++++++++------ pkg/schedule/schedulers/evict_leader_test.go | 30 ++++++++++++++++++++ pkg/schedule/schedulers/evict_slow_store.go | 2 +- pkg/schedule/schedulers/evict_slow_trend.go | 2 +- 7 files changed, 51 insertions(+), 47 deletions(-) diff --git a/pkg/schedule/config/config_provider.go b/pkg/schedule/config/config_provider.go index 2d5e8bd2052..a18051ca38e 100644 --- a/pkg/schedule/config/config_provider.go +++ b/pkg/schedule/config/config_provider.go @@ -67,6 +67,7 @@ type SchedulerConfigProvider interface { IsTraceRegionFlow() bool GetTolerantSizeRatio() float64 + GetLeaderSchedulePolicy() constant.SchedulePolicy IsDebugMetricsEnabled() bool IsDiagnosticAllowed() bool @@ -113,7 +114,6 @@ type SharedConfigProvider interface { IsCrossTableMergeEnabled() bool IsOneWayMergeEnabled() bool GetMergeScheduleLimit() uint64 - GetLeaderSchedulePolicy() constant.SchedulePolicy GetRegionScoreFormulaVersion() string GetSchedulerMaxWaitingOperator() uint64 GetStoreLimitByType(uint64, storelimit.Type) float64 diff --git a/pkg/schedule/filter/comparer.go b/pkg/schedule/filter/comparer.go index f34d4995064..75877066835 100644 --- a/pkg/schedule/filter/comparer.go +++ b/pkg/schedule/filter/comparer.go @@ -41,25 +41,6 @@ func RegionScoreComparer(conf config.SharedConfigProvider) StoreComparer { } } -// LeaderScoreComparer creates a StoreComparer to sort store by leader -// score. -func LeaderScoreComparer(conf config.SharedConfigProvider) StoreComparer { - leaderSchedulePolicy := conf.GetLeaderSchedulePolicy() - return func(a, b *core.StoreInfo) int { - // TODO: we should use the real time delta data to calculate the score. - sa := a.LeaderScore(leaderSchedulePolicy, 0) - sb := b.LeaderScore(leaderSchedulePolicy, 0) - switch { - case sa > sb: - return 1 - case sa < sb: - return -1 - default: - return 0 - } - } -} - // IsolationComparer creates a StoreComparer to sort store by isolation score. 
func IsolationComparer(locationLabels []string, regionStores []*core.StoreInfo) StoreComparer { return func(a, b *core.StoreInfo) int { diff --git a/pkg/schedule/filter/comparer_test.go b/pkg/schedule/filter/comparer_test.go index fa50e982f1d..b318808b192 100644 --- a/pkg/schedule/filter/comparer_test.go +++ b/pkg/schedule/filter/comparer_test.go @@ -43,20 +43,3 @@ func TestRegionCompare(t *testing.T) { cs.Sort(RegionScoreComparer(cfg)) re.Equal(uint64(5), cs.PickFirst().GetID()) } - -func TestLeaderCompare(t *testing.T) { - re := require.New(t) - ids := []uint64{1, 2, 3, 4, 5} - stores := make([]*core.StoreInfo, 0, len(ids)) - for _, id := range ids { - stores = append(stores, core.NewStoreInfo( - &metapb.Store{Id: id}, - core.SetLeaderCount(int(6-id)*1000), - )) - } - cs := NewCandidates(rand.New(rand.NewSource(time.Now().UnixNano())), stores) - cfg := mockconfig.NewTestOptions() - re.Equal(uint64(1), cs.PickFirst().GetID()) - cs.Sort(LeaderScoreComparer(cfg)) - re.Equal(uint64(5), cs.PickFirst().GetID()) -} diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index 6459b04cb37..bf06ee4dd11 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -17,6 +17,7 @@ package schedulers import ( "math/rand" "net/http" + "sort" "strconv" "github.com/gorilla/mux" @@ -288,7 +289,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) // Schedule implements the Scheduler interface. 
func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { evictLeaderCounter.Inc() - return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf), nil + return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf, s.OpController), nil } func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) []*operator.Operator { @@ -312,11 +313,11 @@ type evictLeaderStoresConf interface { getBatch() int } -func scheduleEvictLeaderBatch(r *rand.Rand, name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { +func scheduleEvictLeaderBatch(r *rand.Rand, name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf, opController *operator.Controller) []*operator.Operator { var ops []*operator.Operator batchSize := conf.getBatch() for range batchSize { - once := scheduleEvictLeaderOnce(r, name, cluster, conf) + once := scheduleEvictLeaderOnce(r, name, cluster, conf, opController) // no more regions if len(once) == 0 { break @@ -330,7 +331,7 @@ func scheduleEvictLeaderBatch(r *rand.Rand, name string, cluster sche.SchedulerC return ops } -func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { +func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf, opController *operator.Controller) []*operator.Operator { stores := conf.getStores() ops := make([]*operator.Operator, 0, len(stores)) for _, storeID := range stores { @@ -368,7 +369,7 @@ func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCl evictLeaderNoTargetStoreCounter.Inc() continue } - op, err := createOperatorWithSort(name, cluster, candidates, region) + op, err := createOperatorWithSort(name, cluster, candidates, region, opController) if err != nil { log.Debug("fail to create evict leader operator", errs.ZapError(err)) continue @@ 
-380,14 +381,23 @@ func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCl return ops } -func createOperatorWithSort(name string, cluster sche.SchedulerCluster, candidates *filter.StoreCandidates, region *core.RegionInfo) (*operator.Operator, error) { +func createOperatorWithSort(name string, cluster sche.SchedulerCluster, candidates *filter.StoreCandidates, region *core.RegionInfo, opController *operator.Controller) (*operator.Operator, error) { // we will pick low leader score store firstly. - candidates.Sort(filter.LeaderScoreComparer(cluster.GetSharedConfig())) + targets := candidates.Stores + sort.Slice(targets, func(i, j int) bool { + leaderSchedulePolicy := cluster.GetSchedulerConfig().GetLeaderSchedulePolicy() + opInfluence := opController.GetOpInfluence(cluster.GetBasicCluster()) + kind := constant.NewScheduleKind(constant.LeaderKind, leaderSchedulePolicy) + iOp := opInfluence.GetStoreInfluence(targets[i].GetID()).ResourceProperty(kind) + jOp := opInfluence.GetStoreInfluence(targets[j].GetID()).ResourceProperty(kind) + return targets[i].LeaderScore(leaderSchedulePolicy, iOp) < + targets[j].LeaderScore(leaderSchedulePolicy, jOp) + }) var ( op *operator.Operator err error ) - for _, target := range candidates.Stores { + for _, target := range targets { op, err = operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), operator.OpLeader) if op != nil && err == nil { return op, err diff --git a/pkg/schedule/schedulers/evict_leader_test.go b/pkg/schedule/schedulers/evict_leader_test.go index 587a48358e9..6dbe976f5c6 100644 --- a/pkg/schedule/schedulers/evict_leader_test.go +++ b/pkg/schedule/schedulers/evict_leader_test.go @@ -137,3 +137,33 @@ func TestBatchEvict(t *testing.T) { return len(ops) == 5 }) } + +func TestEvictLeaderSelectsLowScoreStore(t *testing.T) { + re := require.New(t) + cancel, _, tc, oc := prepareSchedulersTest() + defer cancel() + + // Add stores with different scores + tc.AddLeaderStore(1, 30) // 
store 1 + tc.AddLeaderStore(2, 20) // store 2 + tc.AddLeaderStore(3, 10) // store 3 + + // Add region 1 with its leader in store 1 and followers in stores 2 and 3 + tc.AddLeaderRegion(1, 1, 2, 3) + + // Create EvictLeader scheduler + sl, err := CreateScheduler(types.EvictLeaderScheduler, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(types.EvictLeaderScheduler, []string{"1"}), func(string) error { return nil }) + re.NoError(err) + re.True(sl.IsScheduleAllowed(tc)) + + // Schedule the operator and it should select store 3 to evict the leader, because it has the lowest score with 10. + ops, _ := sl.Schedule(tc, false) + re.Len(ops, 1) + operatorutil.CheckMultiTargetTransferLeader(re, ops[0], operator.OpLeader, 1, []uint64{3}) + + // After store 2's leader count drops to 5 it has the lowest score, so the next schedule should select store 2. + tc.AddLeaderStore(2, 5) + ops, _ = sl.Schedule(tc, false) + re.Len(ops, 1) + operatorutil.CheckMultiTargetTransferLeader(re, ops[0], operator.OpLeader, 1, []uint64{2}) +} diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index 8d8c014b110..af6ab7db244 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -244,7 +244,7 @@ func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster sche.SchedulerClust } func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster sche.SchedulerCluster) []*operator.Operator { - return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf) + return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf, s.OpController) } // IsScheduleAllowed implements the Scheduler interface. 
diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index cf4bf3d3b39..bc61ff2f794 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -357,7 +357,7 @@ func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster sche.SchedulerClus return nil } storeSlowTrendEvictedStatusGauge.WithLabelValues(store.GetAddress(), strconv.FormatUint(store.GetID(), 10)).Set(1) - return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf) + return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf, s.OpController) } // IsScheduleAllowed implements the Scheduler interface.