From e3fea9033bdd8f98647706a87c34271c18b03f36 Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Mon, 2 Dec 2024 17:59:07 +0800
Subject: [PATCH] move prepare checker

Signed-off-by: Ryan Leung
---
 pkg/mcs/scheduling/server/cluster.go          |  9 +----
 pkg/schedule/checker/checker_controller.go    |  8 +++-
 pkg/schedule/coordinator.go                   | 35 ++++++------------
 pkg/schedule/{ => core}/prepare_checker.go    | 37 ++++++++++---------
 .../schedulers/scheduler_controller.go        |  8 +++-
 tests/integrations/mcs/scheduling/api_test.go |  2 -
 .../mcs/scheduling/server_test.go             |  2 -
 tests/server/api/api_test.go                  |  2 +-
 8 files changed, 46 insertions(+), 57 deletions(-)
 rename pkg/schedule/{ => core}/prepare_checker.go (68%)

diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index 33e2be96f3b..1da9a0a6543 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -8,7 +8,6 @@ import (
     "time"

     "github.com/pingcap/errors"
-    "github.com/pingcap/failpoint"
     "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/pingcap/kvproto/pkg/pdpb"
     "github.com/pingcap/kvproto/pkg/schedulingpb"
@@ -67,7 +66,6 @@ type Cluster struct {
 const (
     regionLabelGCInterval = time.Hour
     requestTimeout        = 3 * time.Second
-    collectWaitTime       = time.Minute

     // heartbeat relative const
     heartbeatTaskRunner = "heartbeat-task-runner"
@@ -491,12 +489,7 @@ func (c *Cluster) runUpdateStoreStats() {
 func (c *Cluster) runCoordinator() {
     defer logutil.LogPanic()
     defer c.wg.Done()
-    // force wait for 1 minute to make prepare checker won't be directly skipped
-    runCollectWaitTime := collectWaitTime
-    failpoint.Inject("changeRunCollectWaitTime", func() {
-        runCollectWaitTime = 1 * time.Second
-    })
-    c.coordinator.RunUntilStop(runCollectWaitTime)
+    c.coordinator.RunUntilStop()
 }

 func (c *Cluster) runMetricsCollectionJob() {
diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go
index 49d097daea6..ee834bb6c36 100644
--- a/pkg/schedule/checker/checker_controller.go
+++ b/pkg/schedule/checker/checker_controller.go
@@ -89,10 +89,11 @@ type Controller struct {
     // patrolRegionScanLimit is the limit of regions to scan.
     // It is calculated by the number of regions.
     patrolRegionScanLimit int
+    prepareChecker        *sche.PrepareChecker
 }

 // NewController create a new Controller.
-func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
+func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller, prepareChecker *sche.PrepareChecker) *Controller {
     pendingProcessedRegions := cache.NewIDTTL(ctx, time.Minute, 3*time.Minute)
     c := &Controller{
         ctx: ctx,
@@ -111,6 +112,7 @@ func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config
         patrolRegionContext:   &PatrolRegionContext{},
         interval:              cluster.GetCheckerConfig().GetPatrolRegionInterval(),
         patrolRegionScanLimit: calculateScanLimit(cluster),
+        prepareChecker:        prepareChecker,
     }
     c.duration.Store(time.Duration(0))
     return c
@@ -130,6 +132,10 @@ func (c *Controller) PatrolRegions() {
         regions []*core.RegionInfo
     )
     for {
+        if !c.prepareChecker.Check(c.cluster.GetBasicCluster()) {
+            time.Sleep(time.Second)
+            continue
+        }
         select {
         case <-ticker.C:
             c.updateTickerIfNeeded(ticker)
diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go
index 739ca5c84b6..14c5feb8c9b 100644
--- a/pkg/schedule/coordinator.go
+++ b/pkg/schedule/coordinator.go
@@ -44,7 +44,6 @@ import (

 const (
     runSchedulerCheckInterval = 3 * time.Second
-    collectTimeout            = 5 * time.Minute
     maxLoadConfigRetries      = 10
     // pushOperatorTickInterval is the interval try to push the operator.
     pushOperatorTickInterval = 500 * time.Millisecond
@@ -66,7 +65,7 @@ type Coordinator struct {
     schedulersInitialized bool

     cluster          sche.ClusterInformer
-    prepareChecker   *prepareChecker
+    prepareChecker   *sche.PrepareChecker
     checkers         *checker.Controller
     regionScatterer  *scatter.RegionScatterer
     regionSplitter   *splitter.RegionSplitter
@@ -80,15 +79,16 @@ type Coordinator struct {
 // NewCoordinator creates a new Coordinator.
 func NewCoordinator(parentCtx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator {
     ctx, cancel := context.WithCancel(parentCtx)
+    prepareChecker := sche.NewPrepareChecker()
     opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), hbStreams)
-    schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController)
-    checkers := checker.NewController(ctx, cluster, cluster.GetCheckerConfig(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController)
+    schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController, prepareChecker)
+    checkers := checker.NewController(ctx, cluster, cluster.GetCheckerConfig(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController, prepareChecker)
     return &Coordinator{
         ctx:                   ctx,
         cancel:                cancel,
         schedulersInitialized: false,
         cluster:               cluster,
-        prepareChecker:        newPrepareChecker(),
+        prepareChecker:        prepareChecker,
         checkers:              checkers,
         regionScatterer:       scatter.NewRegionScatterer(ctx, cluster, opController, checkers.AddPendingProcessedRegions),
         regionSplitter:        splitter.NewRegionSplitter(cluster, splitter.NewSplitRegionsHandler(cluster, opController), checkers.AddPendingProcessedRegions),
@@ -204,8 +204,8 @@ func (c *Coordinator) driveSlowNodeScheduler() {
 }

 // RunUntilStop runs the coordinator until receiving the stop signal.
-func (c *Coordinator) RunUntilStop(collectWaitTime ...time.Duration) {
-    c.Run(collectWaitTime...)
+func (c *Coordinator) RunUntilStop() {
+    c.Run()
     <-c.ctx.Done()
     log.Info("coordinator is stopping")
     c.GetSchedulersController().Wait()
@@ -214,25 +214,12 @@ func (c *Coordinator) RunUntilStop(collectWaitTime ...time.Duration) {
 }

 // Run starts coordinator.
-func (c *Coordinator) Run(collectWaitTime ...time.Duration) {
+func (c *Coordinator) Run() {
     ticker := time.NewTicker(runSchedulerCheckInterval)
     failpoint.Inject("changeCoordinatorTicker", func() {
         ticker.Reset(100 * time.Millisecond)
     })
     defer ticker.Stop()
-    log.Info("coordinator starts to collect cluster information")
-    for {
-        if c.ShouldRun(collectWaitTime...) {
-            log.Info("coordinator has finished cluster information preparation")
-            break
-        }
-        select {
-        case <-ticker.C:
-        case <-c.ctx.Done():
-            log.Info("coordinator stops running")
-            return
-        }
-    }
     log.Info("coordinator starts to run schedulers")
     c.InitSchedulers(true)

@@ -547,8 +534,8 @@ func ResetHotSpotMetrics() {
 }

 // ShouldRun returns true if the coordinator should run.
-func (c *Coordinator) ShouldRun(collectWaitTime ...time.Duration) bool {
-    return c.prepareChecker.check(c.cluster.GetBasicCluster(), collectWaitTime...)
+func (c *Coordinator) ShouldRun() bool {
+    return c.prepareChecker.Check(c.cluster.GetBasicCluster())
 }

 // GetSchedulersController returns the schedulers controller.
@@ -616,7 +603,7 @@ func (c *Coordinator) GetRuleChecker() *checker.RuleChecker {
 }

 // GetPrepareChecker returns the prepare checker.
-func (c *Coordinator) GetPrepareChecker() *prepareChecker {
+func (c *Coordinator) GetPrepareChecker() *sche.PrepareChecker {
     return c.prepareChecker
 }

diff --git a/pkg/schedule/prepare_checker.go b/pkg/schedule/core/prepare_checker.go
similarity index 68%
rename from pkg/schedule/prepare_checker.go
rename to pkg/schedule/core/prepare_checker.go
index f691432c72c..d7d5ac2e233 100644
--- a/pkg/schedule/prepare_checker.go
+++ b/pkg/schedule/core/prepare_checker.go
@@ -12,10 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-package schedule
+package core

 import (
-    "fmt"
     "time"

     "github.com/pingcap/log"
@@ -24,33 +23,34 @@ import (
     "go.uber.org/zap"
 )

-type prepareChecker struct {
+const collectTimeout = 5 * time.Minute
+
+// PrepareChecker is used to check if the coordinator has finished cluster information preparation.
+type PrepareChecker struct {
     syncutil.RWMutex
     start    time.Time
     prepared bool
 }

-func newPrepareChecker() *prepareChecker {
-    return &prepareChecker{
+// NewPrepareChecker creates a new PrepareChecker.
+func NewPrepareChecker() *PrepareChecker {
+    return &PrepareChecker{
         start: time.Now(),
     }
 }

-// Before starting up the scheduler, we need to take the proportion of the regions on each store into consideration.
-func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...time.Duration) bool {
-    checker.Lock()
-    defer checker.Unlock()
-    log.Info("check prepare checker", zap.Bool("prepared", checker.prepared), zap.String("count", fmt.Sprintf("%d", c.GetTotalRegionCount())), zap.String("not-from-storage-count", fmt.Sprintf("%d", c.GetClusterNotFromStorageRegionsCnt())))
-    if checker.prepared {
+// Check checks if the coordinator has finished cluster information preparation.
+func (checker *PrepareChecker) Check(c *core.BasicCluster) bool {
+    if checker.IsPrepared() {
         return true
     }
+    checker.Lock()
+    defer checker.Unlock()
+
     if time.Since(checker.start) > collectTimeout {
         checker.prepared = true
         return true
     }
-    if len(collectWaitTime) > 0 && time.Since(checker.start) < collectWaitTime[0] {
-        return false
-    }
     notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
     totalRegionsCnt := c.GetTotalRegionCount()
     // The number of active regions should be more than total region of all stores * core.CollectFactor
@@ -63,7 +63,7 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti
         }
         storeID := store.GetID()
         // It is used to avoid sudden scheduling when scheduling service is just started.
-        if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*core.CollectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) {
+        if float64(store.GetStoreStats().GetRegionCount())*core.CollectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) {
             return false
         }
         if !c.IsStorePrepared(storeID) {
@@ -76,22 +76,23 @@
 }

 // IsPrepared returns whether the coordinator is prepared.
-func (checker *prepareChecker) IsPrepared() bool {
+func (checker *PrepareChecker) IsPrepared() bool {
     checker.RLock()
     defer checker.RUnlock()
     return checker.prepared
 }

 // SetPrepared is for test purpose
-func (checker *prepareChecker) SetPrepared() {
+func (checker *PrepareChecker) SetPrepared() {
     checker.Lock()
     defer checker.Unlock()
     checker.prepared = true
 }

 // ResetPrepared is for test purpose
-func (checker *prepareChecker) ResetPrepared() {
+func (checker *PrepareChecker) ResetPrepared() {
     checker.Lock()
     defer checker.Unlock()
     checker.prepared = false
+    checker.start = time.Now()
 }
diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go
index cb4ffd6f9c2..5042fed23df 100644
--- a/pkg/schedule/schedulers/scheduler_controller.go
+++ b/pkg/schedule/schedulers/scheduler_controller.go
@@ -56,10 +56,11 @@ type Controller struct {
     // which will only be initialized and used in the API service mode now.
     schedulerHandlers map[string]http.Handler
     opController      *operator.Controller
+    prepareChecker    *sche.PrepareChecker
 }

 // NewController creates a scheduler controller.
-func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage endpoint.ConfigStorage, opController *operator.Controller) *Controller {
+func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage endpoint.ConfigStorage, opController *operator.Controller, prepareChecker *sche.PrepareChecker) *Controller {
     return &Controller{
         ctx:     ctx,
         cluster: cluster,
@@ -67,6 +68,7 @@ func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage e
         schedulers:        make(map[string]*ScheduleController),
         schedulerHandlers: make(map[string]http.Handler),
         opController:      opController,
+        prepareChecker:    prepareChecker,
     }
 }

@@ -368,6 +370,10 @@ func (c *Controller) runScheduler(s *ScheduleController) {
     for {
         select {
         case <-ticker.C:
+            if !c.prepareChecker.Check(c.cluster.GetBasicCluster()) {
+                time.Sleep(time.Second)
+                continue
+            }
             diagnosable := s.IsDiagnosticAllowed()
             if !s.AllowSchedule(diagnosable) {
                 continue
diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go
index f3e7f235018..0a80aea1c01 100644
--- a/tests/integrations/mcs/scheduling/api_test.go
+++ b/tests/integrations/mcs/scheduling/api_test.go
@@ -41,7 +41,6 @@ func TestAPI(t *testing.T) {
 func (suite *apiTestSuite) SetupSuite() {
     re := suite.Require()
     re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`))
-    re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime", `return(true)`))
     suite.env = tests.NewSchedulingTestEnvironment(suite.T())
 }

@@ -49,7 +48,6 @@ func (suite *apiTestSuite) TearDownSuite() {
     suite.env.Cleanup()
     re := suite.Require()
     re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
-    re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime"))
 }

 func (suite *apiTestSuite) TestGetCheckerByName() {
diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go
index 085b87afe86..ebe19b0b266 100644
--- a/tests/integrations/mcs/scheduling/server_test.go
+++ b/tests/integrations/mcs/scheduling/server_test.go
@@ -61,7 +61,6 @@ func (suite *serverTestSuite) SetupSuite() {
     var err error
     re := suite.Require()
     re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`))
-    re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime", `return(true)`))
     re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))
     suite.ctx, suite.cancel = context.WithCancel(context.Background())
     suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1)
@@ -83,7 +82,6 @@ func (suite *serverTestSuite) TearDownSuite() {
     suite.cancel()
     re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))
     re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
-    re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime"))
 }

 func (suite *serverTestSuite) TestAllocID() {
diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go
index d52630718b5..cf983fd2869 100644
--- a/tests/server/api/api_test.go
+++ b/tests/server/api/api_test.go
@@ -1168,7 +1168,7 @@ func TestDeleteAllRegionCacheScheduling(t *testing.T) {
     re.Equal(0, int(rc.GetOperatorController().OperatorCount(operator.OpRegion)))

     // simulate continuous region heartbeat reporting
-    for i := 0; i < 10; i++ {
+    for range 10 {
         rc.HandleRegionHeartbeat(regionInfo)
         time.Sleep(100 * time.Millisecond)
     }
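Note (illustration, not part of the patch): after this change the coordinator no longer blocks in Run() waiting for region information to be collected; instead the checker's PatrolRegions loop and each scheduler's run loop call core.PrepareChecker.Check on every iteration and back off until it passes. Below is a minimal, self-contained Go sketch of that gating pattern, using stand-in names (isPrepared, runLoop, a hard-coded 0.9 threshold in place of core.CollectFactor) rather than the real PD types.

package main

import (
	"context"
	"fmt"
	"time"
)

// isPrepared is a stand-in for core.PrepareChecker.Check: report ready once
// enough regions have been re-reported via heartbeat, or once a collect
// timeout has elapsed so a quiet cluster does not block forever.
func isPrepared(start time.Time, reported, total int) bool {
	const collectTimeout = 5 * time.Minute
	if time.Since(start) > collectTimeout {
		return true
	}
	// 0.9 stands in for core.CollectFactor: most regions must have been seen
	// via heartbeat rather than only loaded from storage.
	return total > 0 && float64(reported) > 0.9*float64(total)
}

// runLoop mirrors the shape of the patched runScheduler/PatrolRegions loops:
// every tick first consults the prepare check and skips the iteration until
// it passes (the real code sleeps one second before retrying).
func runLoop(ctx context.Context, start time.Time, counts func() (reported, total int)) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			r, t := counts()
			if !isPrepared(start, r, t) {
				continue // not enough information yet; do not schedule
			}
			fmt.Println("run one scheduling / patrol iteration")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	reported := 0
	runLoop(ctx, time.Now(), func() (int, int) {
		reported += 30 // pretend heartbeats keep arriving
		return reported, 100
	})
}

The behavior is the same as before (no scheduling on a half-loaded region cache), but the wait is now owned by the consumers of the prepare state rather than by the coordinator start-up path, which is why the collectWaitTime plumbing and the changeRunCollectWaitTime failpoint can be removed.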