diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go
index 20cb9cda6096..587cf2f80cf7 100644
--- a/pkg/schedule/checker/checker_controller.go
+++ b/pkg/schedule/checker/checker_controller.go
@@ -31,7 +31,6 @@ import (
 	sche "github.com/tikv/pd/pkg/schedule/core"
 	"github.com/tikv/pd/pkg/schedule/labeler"
 	"github.com/tikv/pd/pkg/schedule/operator"
-	"github.com/tikv/pd/pkg/schedule/placement"
 	"github.com/tikv/pd/pkg/utils/keyutil"
 	"github.com/tikv/pd/pkg/utils/logutil"
 	"go.uber.org/zap"
@@ -93,8 +92,11 @@ type Controller struct {
 }
 
 // NewController create a new Controller.
-func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller, prepareChecker *sche.PrepareChecker) *Controller {
+func NewController(ctx context.Context, cluster sche.CheckerCluster, opController *operator.Controller, prepareChecker *sche.PrepareChecker) *Controller {
 	pendingProcessedRegions := cache.NewIDTTL(ctx, time.Minute, 3*time.Minute)
+	conf := cluster.GetCheckerConfig()
+	ruleManager := cluster.GetRuleManager()
+	labeler := cluster.GetRegionLabeler()
 	c := &Controller{
 		ctx:     ctx,
 		cluster: cluster,
diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go
index 14c5feb8c9b5..93aa1a13bec2 100644
--- a/pkg/schedule/coordinator.go
+++ b/pkg/schedule/coordinator.go
@@ -82,7 +82,7 @@ func NewCoordinator(parentCtx context.Context, cluster sche.ClusterInformer, hbS
 	prepareChecker := sche.NewPrepareChecker()
 	opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), hbStreams)
 	schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController, prepareChecker)
-	checkers := checker.NewController(ctx, cluster, cluster.GetCheckerConfig(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController, prepareChecker)
+	checkers := checker.NewController(ctx, cluster, opController, prepareChecker)
 	return &Coordinator{
 		ctx:    ctx,
 		cancel: cancel,
@@ -533,11 +533,6 @@ func ResetHotSpotMetrics() {
 	schedulers.HotPendingSum.Reset()
 }
 
-// ShouldRun returns true if the coordinator should run.
-func (c *Coordinator) ShouldRun() bool {
-	return c.prepareChecker.Check(c.cluster.GetBasicCluster())
-}
-
 // GetSchedulersController returns the schedulers controller.
 func (c *Coordinator) GetSchedulersController() *schedulers.Controller {
 	return c.schedulers
diff --git a/pkg/schedule/core/cluster_informer.go b/pkg/schedule/core/cluster_informer.go
index ce2cf01ed166..e9c25fa67808 100644
--- a/pkg/schedule/core/cluster_informer.go
+++ b/pkg/schedule/core/cluster_informer.go
@@ -40,7 +40,6 @@ type SchedulerCluster interface {
 	buckets.BucketStatInformer
 
 	GetSchedulerConfig() sc.SchedulerConfigProvider
-	GetRegionLabeler() *labeler.RegionLabeler
 	GetStoreConfig() sc.StoreConfigProvider
 }
 
@@ -61,6 +60,7 @@ type SharedCluster interface {
 	GetBasicCluster() *core.BasicCluster
 	GetSharedConfig() sc.SharedConfigProvider
 	GetRuleManager() *placement.RuleManager
+	GetRegionLabeler() *labeler.RegionLabeler
 	AllocID() (uint64, error)
 	IsSchedulingHalted() bool
 }
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index 99bb60b5558c..6a8a87c1ed3a 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -3067,99 +3067,6 @@ func TestPeerState(t *testing.T) {
 	waitNoResponse(re, stream)
 }
 
-func TestShouldRun(t *testing.T) {
-	re := require.New(t)
-
-	tc, co, cleanup := prepare(nil, nil, nil, re)
-	tc.RaftCluster.coordinator = co
-	defer cleanup()
-
-	re.NoError(tc.addLeaderStore(1, 5))
-	re.NoError(tc.addLeaderStore(2, 2))
-	re.NoError(tc.addLeaderStore(3, 0))
-	re.NoError(tc.addLeaderStore(4, 0))
-	re.NoError(tc.LoadRegion(1, 1, 2, 3))
-	re.NoError(tc.LoadRegion(2, 1, 2, 3))
-	re.NoError(tc.LoadRegion(3, 1, 2, 3))
-	re.NoError(tc.LoadRegion(4, 1, 2, 3))
-	re.NoError(tc.LoadRegion(5, 1, 2, 3))
-	re.NoError(tc.LoadRegion(6, 2, 1, 4))
-	re.NoError(tc.LoadRegion(7, 2, 1, 4))
-	re.False(co.ShouldRun())
-	re.Equal(2, tc.GetStoreRegionCount(4))
-
-	testCases := []struct {
-		regionID  uint64
-		ShouldRun bool
-	}{
-		{1, false},
-		{2, false},
-		{3, false},
-		{4, false},
-		{5, false},
-		// store4 needs Collect two region
-		{6, false},
-		{7, true},
-	}
-
-	for _, testCase := range testCases {
-		r := tc.GetRegion(testCase.regionID)
-		nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat))
-		re.NoError(tc.processRegionHeartbeat(core.ContextTODO(), nr))
-		re.Equal(testCase.ShouldRun, co.ShouldRun())
-	}
-	nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}}
-	newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
-	re.Error(tc.processRegionHeartbeat(core.ContextTODO(), newRegion))
-	re.Equal(7, tc.GetClusterNotFromStorageRegionsCnt())
-}
-
-func TestShouldRunWithNonLeaderRegions(t *testing.T) {
-	re := require.New(t)
-
-	tc, co, cleanup := prepare(nil, nil, nil, re)
-	tc.RaftCluster.coordinator = co
-	defer cleanup()
-
-	re.NoError(tc.addLeaderStore(1, 10))
-	re.NoError(tc.addLeaderStore(2, 0))
-	re.NoError(tc.addLeaderStore(3, 0))
-	for i := range 10 {
-		re.NoError(tc.LoadRegion(uint64(i+1), 1, 2, 3))
-	}
-	re.False(co.ShouldRun())
-	re.Equal(10, tc.GetStoreRegionCount(1))
-
-	testCases := []struct {
-		regionID  uint64
-		ShouldRun bool
-	}{
-		{1, false},
-		{2, false},
-		{3, false},
-		{4, false},
-		{5, false},
-		{6, false},
-		{7, false},
-		{8, false},
-		{9, true},
-	}
-
-	for _, testCase := range testCases {
-		r := tc.GetRegion(testCase.regionID)
-		nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat))
-		re.NoError(tc.processRegionHeartbeat(core.ContextTODO(), nr))
-		re.Equal(testCase.ShouldRun, co.ShouldRun())
-	}
-	nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}}
-	newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
-	re.Error(tc.processRegionHeartbeat(core.ContextTODO(), newRegion))
-	re.Equal(9, tc.GetClusterNotFromStorageRegionsCnt())
-
-	// Now, after server is prepared, there exist some regions with no leader.
-	re.Equal(uint64(0), tc.GetRegion(10).GetLeader().GetStoreId())
-}
-
 func TestAddScheduler(t *testing.T) {
 	re := require.New(t)
 
diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go
index 6a41ad0823e0..ba4d9e40eefe 100644
--- a/tests/integrations/mcs/scheduling/config_test.go
+++ b/tests/integrations/mcs/scheduling/config_test.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/log"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 	"github.com/tikv/pd/pkg/cache"
@@ -36,6 +37,7 @@ import (
 	"github.com/tikv/pd/pkg/versioninfo"
 	"github.com/tikv/pd/tests"
 	"github.com/tikv/pd/tests/server/api"
+	"go.uber.org/zap"
 )
 
 type configTestSuite struct {
@@ -133,6 +135,7 @@ func (suite *configTestSuite) TestConfigWatch() {
 // Manually trigger the config persistence in the PD API server side.
 func persistConfig(re *require.Assertions, pdLeaderServer *tests.TestServer) {
 	err := pdLeaderServer.GetPersistOptions().Persist(pdLeaderServer.GetServer().GetStorage())
+	log.Info("persistConfig", zap.Reflect("opts", pdLeaderServer.GetPersistOptions()))
 	re.NoError(err)
 }
diff --git a/tools/pd-ctl/tests/store/store_test.go b/tools/pd-ctl/tests/store/store_test.go
index 6f704a25e8cd..36f7bd6a4f62 100644
--- a/tools/pd-ctl/tests/store/store_test.go
+++ b/tools/pd-ctl/tests/store/store_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/tikv/pd/pkg/response"
 	"github.com/tikv/pd/pkg/statistics/utils"
 	"github.com/tikv/pd/pkg/utils/grpcutil"
+	"github.com/tikv/pd/pkg/utils/testutil"
 	"github.com/tikv/pd/server/config"
 	pdTests "github.com/tikv/pd/tests"
 	ctl "github.com/tikv/pd/tools/pd-ctl/pdctl"
@@ -54,13 +55,19 @@ func TestStoreLimitV2(t *testing.T) {
 	re.NoError(leaderServer.BootstrapCluster())
 	defer cluster.Destroy()
 
+	// TODO: fix https://github.com/tikv/pd/issues/7464
+	testutil.Eventually(re, func() bool {
+		return leaderServer.GetRaftCluster().GetCoordinator().AreSchedulersInitialized()
+	})
+
 	// store command
 	args := []string{"-u", pdAddr, "config", "set", "store-limit-version", "v2"}
-	_, err = tests.ExecuteCommand(cmd, args...)
+	output, err := tests.ExecuteCommand(cmd, args...)
 	re.NoError(err)
+	re.Contains(string(output), "Success")
 
 	args = []string{"-u", pdAddr, "store", "limit"}
-	output, err := tests.ExecuteCommand(cmd, args...)
+	output, err = tests.ExecuteCommand(cmd, args...)
 	re.NoError(err)
 	re.Contains(string(output), "not support get limit")
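
Note (not part of the patch): the net effect of the first three files is that checker.NewController no longer takes the checker config, rule manager, or region labeler as parameters; it resolves them from the cluster itself, which is why GetRegionLabeler moves from SchedulerCluster into the shared SharedCluster interface. A minimal sketch of the resulting wiring inside NewCoordinator, drawn from the hunks above (variable names follow the coordinator code):

	// The checker controller now resolves its own dependencies from the cluster:
	// conf via cluster.GetCheckerConfig(), ruleManager via cluster.GetRuleManager(),
	// and labeler via cluster.GetRegionLabeler().
	prepareChecker := sche.NewPrepareChecker()
	opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), hbStreams)
	schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController, prepareChecker)
	checkers := checker.NewController(ctx, cluster, opController, prepareChecker)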