diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go new file mode 100644 index 000000000000..815e04f5c54e --- /dev/null +++ b/pkg/cluster/cluster.go @@ -0,0 +1,93 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "context" + + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/schedule" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/statistics" +) + +// Cluster provides an overview of a cluster's basic information. +type Cluster interface { + GetHotStat() *statistics.HotStat + GetRegionStats() *statistics.RegionStatistics + GetLabelStats() *statistics.LabelStatistics + GetCoordinator() *schedule.Coordinator + GetRuleManager() *placement.RuleManager + GetBasicCluster() *core.BasicCluster +} + +// HandleStatsAsync handles the flow asynchronously. +func HandleStatsAsync(c Cluster, region *core.RegionInfo) { + checkWritePeerTask := func(cache *statistics.HotPeerCache) { + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + stats := cache.CheckPeerFlow(region, region.GetPeers(), region.GetWriteLoads(), interval) + for _, stat := range stats { + cache.UpdateStat(stat) + } + } + + checkExpiredTask := func(cache *statistics.HotPeerCache) { + expiredStats := cache.CollectExpiredItems(region) + for _, stat := range expiredStats { + cache.UpdateStat(stat) + } + } + + c.GetHotStat().CheckWriteAsync(checkExpiredTask) + c.GetHotStat().CheckReadAsync(checkExpiredTask) + c.GetHotStat().CheckWriteAsync(checkWritePeerTask) + c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region) +} + +// HandleOverlaps handles the overlap regions. +func HandleOverlaps(ctx context.Context, c Cluster, overlaps []*core.RegionInfo) { + for _, item := range overlaps { + select { + case <-ctx.Done(): + return + default: + } + if c.GetRegionStats() != nil { + c.GetRegionStats().ClearDefunctRegion(item.GetID()) + } + c.GetLabelStats().MarkDefunctRegion(item.GetID()) + c.GetRuleManager().InvalidCache(item.GetID()) + } +} + +// Collect collects the cluster information. +func Collect(ctx context.Context, c Cluster, region *core.RegionInfo) { + // get region again from root tree. make sure the observed region is the latest. 
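HandleStatsAsync never touches the hot-peer caches directly: it wraps the work in closures and hands them to CheckWriteAsync/CheckReadAsync, so each cache stays confined to the goroutine that owns it. Below is a minimal standard-library sketch of that pattern, for illustration only; asyncChecker and hotCache are made-up names, not PD APIs.

```go
package main

import (
	"fmt"
	"sync"
)

type hotCache map[uint64]float64 // regionID -> flow

type asyncChecker struct {
	tasks chan func(hotCache)
	wg    sync.WaitGroup
}

func newAsyncChecker() *asyncChecker {
	c := &asyncChecker{tasks: make(chan func(hotCache), 128)}
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		cache := make(hotCache) // owned exclusively by this goroutine
		for task := range c.tasks {
			task(cache)
		}
	}()
	return c
}

// CheckAsync enqueues a closure that runs against the cache, mirroring the
// spirit of HotStat.CheckWriteAsync/CheckReadAsync.
func (c *asyncChecker) CheckAsync(task func(hotCache)) { c.tasks <- task }

func (c *asyncChecker) Close() { close(c.tasks); c.wg.Wait() }

func main() {
	checker := newAsyncChecker()
	// Analogue of checkWritePeerTask: update the flow of a region.
	checker.CheckAsync(func(cache hotCache) { cache[1] += 42 })
	// Analogue of checkExpiredTask: drop regions that no longer exist.
	checker.CheckAsync(func(cache hotCache) { delete(cache, 2) })
	done := make(chan float64, 1)
	checker.CheckAsync(func(cache hotCache) { done <- cache[1] })
	fmt.Println("flow of region 1:", <-done)
	checker.Close()
}
```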
+ bc := c.GetBasicCluster() + if bc == nil { + return + } + region = bc.GetRegion(region.GetID()) + if region == nil { + return + } + select { + case <-ctx.Done(): + return + default: + } + c.GetRegionStats().Observe(region, c.GetBasicCluster().GetRegionStores(region)) +} diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go new file mode 100644 index 000000000000..5885a9cdb844 --- /dev/null +++ b/pkg/mcs/scheduling/server/cluster.go @@ -0,0 +1,711 @@ +package server + +import ( + "context" + "runtime" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/schedulingpb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/cluster" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/scheduling/server/config" + "github.com/tikv/pd/pkg/ratelimit" + "github.com/tikv/pd/pkg/schedule" + sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/schedule/hbstream" + "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/scatter" + "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/schedule/splitter" + "github.com/tikv/pd/pkg/schedule/types" + "github.com/tikv/pd/pkg/slice" + "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/statistics/buckets" + "github.com/tikv/pd/pkg/statistics/utils" + "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/utils/logutil" + "go.uber.org/zap" +) + +// Cluster is used to manage all information for scheduling purpose. +type Cluster struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + *core.BasicCluster + persistConfig *config.PersistConfig + ruleManager *placement.RuleManager + labelerManager *labeler.RegionLabeler + regionStats *statistics.RegionStatistics + labelStats *statistics.LabelStatistics + hotStat *statistics.HotStat + storage storage.Storage + coordinator *schedule.Coordinator + checkMembershipCh chan struct{} + apiServerLeader atomic.Value + clusterID uint64 + running atomic.Bool + + // heartbeatRunner is used to process the subtree update task asynchronously. + heartbeatRunner ratelimit.Runner + // miscRunner is used to process the statistics and persistent tasks asynchronously. + miscRunner ratelimit.Runner + // logRunner is used to process the log asynchronously. + logRunner ratelimit.Runner +} + +const ( + regionLabelGCInterval = time.Hour + requestTimeout = 3 * time.Second + collectWaitTime = time.Minute + + // heartbeat relative const + heartbeatTaskRunner = "heartbeat-task-runner" + miscTaskRunner = "misc-task-runner" + logTaskRunner = "log-task-runner" +) + +var syncRunner = ratelimit.NewSyncRunner() + +// NewCluster creates a new cluster. 
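The heartbeat, misc and log runners are each capped at runtime.NumCPU()*2 concurrent tasks. The sketch below shows the kind of semaphore such a concurrency limiter boils down to; it uses only the standard library and hypothetical names, not the actual ratelimit package.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

type concurrencyLimiter struct{ sem chan struct{} }

func newConcurrencyLimiter(limit uint64) *concurrencyLimiter {
	return &concurrencyLimiter{sem: make(chan struct{}, limit)}
}

func (l *concurrencyLimiter) acquire() { l.sem <- struct{}{} }
func (l *concurrencyLimiter) release() { <-l.sem }

func main() {
	limiter := newConcurrencyLimiter(uint64(runtime.NumCPU() * 2))
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			limiter.acquire()
			defer limiter.release()
			_ = id * id // stand-in for one heartbeat task
		}(i)
	}
	wg.Wait()
	fmt.Println("all tasks finished with at most", cap(limiter.sem), "running at once")
}
```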
+func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams, clusterID uint64, checkMembershipCh chan struct{}) (*Cluster, error) { + ctx, cancel := context.WithCancel(parentCtx) + labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval) + if err != nil { + cancel() + return nil, err + } + ruleManager := placement.NewRuleManager(ctx, storage, basicCluster, persistConfig) + c := &Cluster{ + ctx: ctx, + cancel: cancel, + BasicCluster: basicCluster, + ruleManager: ruleManager, + labelerManager: labelerManager, + persistConfig: persistConfig, + hotStat: statistics.NewHotStat(ctx), + labelStats: statistics.NewLabelStatistics(), + regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager), + storage: storage, + clusterID: clusterID, + checkMembershipCh: checkMembershipCh, + + heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + } + c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) + err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel()) + if err != nil { + cancel() + return nil, err + } + return c, nil +} + +// GetCoordinator returns the coordinator +func (c *Cluster) GetCoordinator() *schedule.Coordinator { + return c.coordinator +} + +// GetHotStat gets hot stat. +func (c *Cluster) GetHotStat() *statistics.HotStat { + return c.hotStat +} + +// GetStoresStats returns stores' statistics from cluster. +// And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat +func (c *Cluster) GetStoresStats() *statistics.StoresStats { + return c.hotStat.StoresStats +} + +// GetRegionStats gets region statistics. +func (c *Cluster) GetRegionStats() *statistics.RegionStatistics { + return c.regionStats +} + +// GetLabelStats gets label statistics. +func (c *Cluster) GetLabelStats() *statistics.LabelStatistics { + return c.labelStats +} + +// GetBasicCluster returns the basic cluster. +func (c *Cluster) GetBasicCluster() *core.BasicCluster { + return c.BasicCluster +} + +// GetSharedConfig returns the shared config. +func (c *Cluster) GetSharedConfig() sc.SharedConfigProvider { + return c.persistConfig +} + +// GetRuleManager returns the rule manager. +func (c *Cluster) GetRuleManager() *placement.RuleManager { + return c.ruleManager +} + +// GetRegionLabeler returns the region labeler. +func (c *Cluster) GetRegionLabeler() *labeler.RegionLabeler { + return c.labelerManager +} + +// GetRegionSplitter returns the region splitter. +func (c *Cluster) GetRegionSplitter() *splitter.RegionSplitter { + return c.coordinator.GetRegionSplitter() +} + +// GetRegionScatterer returns the region scatter. +func (c *Cluster) GetRegionScatterer() *scatter.RegionScatterer { + return c.coordinator.GetRegionScatterer() +} + +// GetStoresLoads returns load stats of all stores. +func (c *Cluster) GetStoresLoads() map[uint64][]float64 { + return c.hotStat.GetStoresLoads() +} + +// IsRegionHot checks if a region is in hot state. 
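NewCluster derives its own cancellable context and calls cancel() on every early-return error path so the child context is not leaked. A reduced sketch of that construction pattern, assuming nothing beyond the standard library (component, subsystem and newSubsystem are made-up names):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type subsystem struct{ ctx context.Context }

// newSubsystem stands in for steps like labeler.NewRegionLabeler or
// RuleManager.Initialize: any of them may fail after the context exists.
func newSubsystem(ctx context.Context, fail bool) (*subsystem, error) {
	if fail {
		return nil, errors.New("init failed")
	}
	return &subsystem{ctx: ctx}, nil
}

type component struct {
	ctx    context.Context
	cancel context.CancelFunc
	sub    *subsystem
}

func newComponent(parent context.Context, fail bool) (*component, error) {
	ctx, cancel := context.WithCancel(parent)
	sub, err := newSubsystem(ctx, fail)
	if err != nil {
		cancel() // release the derived context before bailing out
		return nil, err
	}
	return &component{ctx: ctx, cancel: cancel, sub: sub}, nil
}

func main() {
	if _, err := newComponent(context.Background(), true); err != nil {
		fmt.Println("constructor failed cleanly:", err)
	}
	c, _ := newComponent(context.Background(), false)
	defer c.cancel()
	fmt.Println("constructor succeeded")
}
```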
+func (c *Cluster) IsRegionHot(region *core.RegionInfo) bool { + return c.hotStat.IsRegionHot(region, c.persistConfig.GetHotRegionCacheHitsThreshold()) +} + +// GetHotPeerStat returns hot peer stat with specified regionID and storeID. +func (c *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat { + return c.hotStat.GetHotPeerStat(rw, regionID, storeID) +} + +// RegionReadStats returns hot region's read stats. +// The result only includes peers that are hot enough. +// RegionStats is a thread-safe method +func (c *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { + // As read stats are reported by store heartbeat, the threshold needs to be adjusted. + threshold := c.persistConfig.GetHotRegionCacheHitsThreshold() * + (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) + return c.hotStat.RegionStats(utils.Read, threshold) +} + +// RegionWriteStats returns hot region's write stats. +// The result only includes peers that are hot enough. +func (c *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { + // RegionStats is a thread-safe method + return c.hotStat.RegionStats(utils.Write, c.persistConfig.GetHotRegionCacheHitsThreshold()) +} + +// BucketsStats returns hot region's buckets stats. +func (c *Cluster) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat { + return c.hotStat.BucketsStats(degree, regionIDs...) +} + +// GetStorage returns the storage. +func (c *Cluster) GetStorage() storage.Storage { + return c.storage +} + +// GetCheckerConfig returns the checker config. +func (c *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return c.persistConfig } + +// GetSchedulerConfig returns the scheduler config. +func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return c.persistConfig } + +// GetStoreConfig returns the store config. +func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConfig } + +// AllocID allocates a new ID. +func (c *Cluster) AllocID() (uint64, error) { + client, err := c.getAPIServerLeaderClient() + if err != nil { + return 0, err + } + ctx, cancel := context.WithTimeout(c.ctx, requestTimeout) + defer cancel() + resp, err := client.AllocID(ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}}) + if err != nil { + c.triggerMembershipCheck() + return 0, err + } + return resp.GetId(), nil +} + +func (c *Cluster) getAPIServerLeaderClient() (pdpb.PDClient, error) { + cli := c.apiServerLeader.Load() + if cli == nil { + c.triggerMembershipCheck() + return nil, errors.New("API server leader is not found") + } + return cli.(pdpb.PDClient), nil +} + +func (c *Cluster) triggerMembershipCheck() { + select { + case c.checkMembershipCh <- struct{}{}: + default: // avoid blocking + } +} + +// SwitchAPIServerLeader switches the API server leader. +func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool { + old := c.apiServerLeader.Load() + return c.apiServerLeader.CompareAndSwap(old, new) +} + +func trySend(notifier chan struct{}) { + select { + case notifier <- struct{}{}: + // If the channel is not empty, it means the check is triggered. + default: + } +} + +// updateScheduler listens on the schedulers updating notifier and manage the scheduler creation and deletion. +func (c *Cluster) updateScheduler() { + defer logutil.LogPanic() + defer c.wg.Done() + + // Make sure the coordinator has initialized all the existing schedulers. 
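Both triggerMembershipCheck and trySend rely on the same idiom: a buffered channel of size one plus a select with a default branch, so repeated triggers collapse into a single pending notification and the sender never blocks. A standalone illustration of that idiom (the consumer here is hypothetical):

```go
package main

import (
	"fmt"
	"time"
)

// trySend queues at most one pending notification and never blocks the caller.
func trySend(notifier chan struct{}) {
	select {
	case notifier <- struct{}{}:
	default: // a check is already pending; dropping the extra trigger is fine
	}
}

func main() {
	notifier := make(chan struct{}, 1)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range notifier {
			fmt.Println("membership check triggered")
			time.Sleep(10 * time.Millisecond) // pretend the check takes a while
		}
	}()
	for i := 0; i < 5; i++ { // five rapid triggers coalesce into one or two checks
		trySend(notifier)
	}
	time.Sleep(50 * time.Millisecond)
	close(notifier)
	<-done
}
```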
+ c.waitSchedulersInitialized() + // Establish a notifier to listen the schedulers updating. + notifier := make(chan struct{}, 1) + // Make sure the check will be triggered once later. + trySend(notifier) + c.persistConfig.SetSchedulersUpdatingNotifier(notifier) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + log.Info("cluster is closing, stop listening the schedulers updating notifier") + return + case <-notifier: + // This is triggered by the watcher when the schedulers are updated. + } + + if !c.running.Load() { + select { + case <-c.ctx.Done(): + log.Info("cluster is closing, stop listening the schedulers updating notifier") + return + case <-ticker.C: + // retry + trySend(notifier) + continue + } + } + + log.Info("schedulers updating notifier is triggered, try to update the scheduler") + var ( + schedulersController = c.coordinator.GetSchedulersController() + latestSchedulersConfig = c.persistConfig.GetScheduleConfig().Schedulers + ) + // Create the newly added schedulers. + for _, scheduler := range latestSchedulersConfig { + schedulerType := types.ConvertOldStrToType[scheduler.Type] + s, err := schedulers.CreateScheduler( + schedulerType, + c.coordinator.GetOperatorController(), + c.storage, + schedulers.ConfigSliceDecoder(schedulerType, scheduler.Args), + schedulersController.RemoveScheduler, + ) + if err != nil { + log.Error("failed to create scheduler", + zap.String("scheduler-type", scheduler.Type), + zap.Strings("scheduler-args", scheduler.Args), + errs.ZapError(err)) + continue + } + name := s.GetName() + if existed, _ := schedulersController.IsSchedulerExisted(name); existed { + log.Info("scheduler has already existed, skip adding it", + zap.String("scheduler-name", name), + zap.Strings("scheduler-args", scheduler.Args)) + continue + } + if err := schedulersController.AddScheduler(s, scheduler.Args...); err != nil { + log.Error("failed to add scheduler", + zap.String("scheduler-name", name), + zap.Strings("scheduler-args", scheduler.Args), + errs.ZapError(err)) + continue + } + log.Info("add scheduler successfully", + zap.String("scheduler-name", name), + zap.Strings("scheduler-args", scheduler.Args)) + } + // Remove the deleted schedulers. + for _, name := range schedulersController.GetSchedulerNames() { + scheduler := schedulersController.GetScheduler(name) + oldType := types.SchedulerTypeCompatibleMap[scheduler.GetType()] + if slice.AnyOf(latestSchedulersConfig, func(i int) bool { + return latestSchedulersConfig[i].Type == oldType + }) { + continue + } + if err := schedulersController.RemoveScheduler(name); err != nil { + log.Error("failed to remove scheduler", + zap.String("scheduler-name", name), + errs.ZapError(err)) + continue + } + log.Info("remove scheduler successfully", + zap.String("scheduler-name", name)) + } + } +} + +func (c *Cluster) waitSchedulersInitialized() { + ticker := time.NewTicker(time.Millisecond * 100) + defer ticker.Stop() + for { + if c.coordinator.AreSchedulersInitialized() { + return + } + select { + case <-c.ctx.Done(): + log.Info("cluster is closing, stop waiting the schedulers initialization") + return + case <-ticker.C: + } + } +} + +// UpdateRegionsLabelLevelStats updates the status of the region label level by types. 
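The body of updateScheduler is a reconcile loop: it compares the latest persisted scheduler list against the ones the controller is actually running, adds the missing ones and removes the leftovers. The sketch below reduces that to plain sets; the map stands in for the schedulers controller and none of these names are PD APIs.

```go
package main

import "fmt"

// reconcile adds every desired scheduler that is not running yet and removes
// every running scheduler that is no longer desired, mirroring the two loops
// in updateScheduler.
func reconcile(desired []string, running map[string]bool) (added, removed []string) {
	want := make(map[string]bool, len(desired))
	for _, name := range desired {
		want[name] = true
		if !running[name] {
			running[name] = true // stands in for schedulersController.AddScheduler
			added = append(added, name)
		}
	}
	for name := range running {
		if !want[name] {
			delete(running, name) // stands in for schedulersController.RemoveScheduler
			removed = append(removed, name)
		}
	}
	return added, removed
}

func main() {
	running := map[string]bool{"balance-leader": true, "evict-slow-store": true}
	desired := []string{"balance-leader", "balance-region"}
	added, removed := reconcile(desired, running)
	fmt.Println("added:", added)     // [balance-region]
	fmt.Println("removed:", removed) // [evict-slow-store]
}
```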
+func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { + for _, region := range regions { + c.labelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels()) + } + c.labelStats.ClearDefunctRegions() +} + +func (c *Cluster) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { + stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) + for _, p := range region.GetPeers() { + if store := c.GetStore(p.GetStoreId()); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) { + stores = append(stores, store) + } + } + return stores +} + +// HandleStoreHeartbeat updates the store status. +func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatRequest) error { + stats := heartbeat.GetStats() + storeID := stats.GetStoreId() + store := c.GetStore(storeID) + if store == nil { + return errors.Errorf("store %v not found", storeID) + } + + nowTime := time.Now() + newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime)) + + if store := c.GetStore(storeID); store != nil { + statistics.UpdateStoreHeartbeatMetrics(store) + } + c.PutStore(newStore) + c.hotStat.Observe(storeID, newStore.GetStoreStats()) + c.hotStat.FilterUnhealthyStore(c) + reportInterval := stats.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + + regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats())) + for _, peerStat := range stats.GetPeerStats() { + regionID := peerStat.GetRegionId() + region := c.GetRegion(regionID) + regions[regionID] = region + if region == nil { + log.Warn("discard hot peer stat for unknown region", + zap.Uint64("region-id", regionID), + zap.Uint64("store-id", storeID)) + continue + } + peer := region.GetStorePeer(storeID) + if peer == nil { + log.Warn("discard hot peer stat for unknown region peer", + zap.Uint64("region-id", regionID), + zap.Uint64("store-id", storeID)) + continue + } + readQueryNum := core.GetReadQueryNum(peerStat.GetQueryStats()) + loads := []float64{ + utils.RegionReadBytes: float64(peerStat.GetReadBytes()), + utils.RegionReadKeys: float64(peerStat.GetReadKeys()), + utils.RegionReadQueryNum: float64(readQueryNum), + utils.RegionWriteBytes: 0, + utils.RegionWriteKeys: 0, + utils.RegionWriteQueryNum: 0, + } + checkReadPeerTask := func(cache *statistics.HotPeerCache) { + stats := cache.CheckPeerFlow(region, []*metapb.Peer{peer}, loads, interval) + for _, stat := range stats { + cache.UpdateStat(stat) + } + } + c.hotStat.CheckReadAsync(checkReadPeerTask) + } + + // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. + collectUnReportedPeerTask := func(cache *statistics.HotPeerCache) { + stats := cache.CheckColdPeer(storeID, regions, interval) + for _, stat := range stats { + cache.UpdateStat(stat) + } + } + c.hotStat.CheckReadAsync(collectUnReportedPeerTask) + return nil +} + +// runUpdateStoreStats updates store stats periodically. +func (c *Cluster) runUpdateStoreStats() { + defer logutil.LogPanic() + defer c.wg.Done() + + ticker := time.NewTicker(9 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + log.Info("update store stats background jobs has been stopped") + return + case <-ticker.C: + c.UpdateAllStoreStatus() + } + } +} + +// runCoordinator runs the main scheduling loop. 
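The loads slice built in HandleStoreHeartbeat uses Go's index-keyed composite literal, so each statistic lands in the slot named by its utils.Region* constant and the write dimensions stay zero. A tiny standalone demonstration, with local constants standing in for the real utils dimensions:

```go
package main

import "fmt"

// Stand-ins for utils.RegionReadBytes, utils.RegionReadKeys, ... which index
// the per-dimension load slice.
const (
	regionReadBytes = iota
	regionReadKeys
	regionReadQueryNum
	regionWriteBytes
	regionWriteKeys
	regionWriteQueryNum
	dimLen
)

func main() {
	// Index-keyed composite literal: each value is placed at the slot named by
	// its constant, exactly like the loads slice built from a store heartbeat.
	loads := []float64{
		regionReadBytes:     4096,
		regionReadKeys:      12,
		regionReadQueryNum:  3,
		regionWriteBytes:    0,
		regionWriteKeys:     0,
		regionWriteQueryNum: 0,
	}
	fmt.Println(len(loads) == dimLen, loads) // true [4096 12 3 0 0 0]
}
```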
+func (c *Cluster) runCoordinator() { + defer logutil.LogPanic() + defer c.wg.Done() + // force wait for 1 minute to make prepare checker won't be directly skipped + runCollectWaitTime := collectWaitTime + failpoint.Inject("changeRunCollectWaitTime", func() { + runCollectWaitTime = 1 * time.Second + }) + c.coordinator.RunUntilStop(runCollectWaitTime) +} + +func (c *Cluster) runMetricsCollectionJob() { + defer logutil.LogPanic() + defer c.wg.Done() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + log.Info("metrics are reset") + resetMetrics() + log.Info("metrics collection job has been stopped") + return + case <-ticker.C: + c.collectMetrics() + } + } +} + +func (c *Cluster) collectMetrics() { + statsMap := statistics.NewStoreStatisticsMap(c.persistConfig) + stores := c.GetStores() + for _, s := range stores { + statsMap.Observe(s) + statistics.ObserveHotStat(s, c.hotStat.StoresStats) + } + statsMap.Collect() + + c.coordinator.GetSchedulersController().CollectSchedulerMetrics() + c.coordinator.CollectHotSpotMetrics() + if c.regionStats == nil { + return + } + c.regionStats.Collect() + c.labelStats.Collect() + // collect hot cache metrics + c.hotStat.CollectMetrics() + // collect the lock metrics + c.CollectWaitLockMetrics() +} + +func resetMetrics() { + statistics.Reset() + schedulers.ResetSchedulerMetrics() + schedule.ResetHotSpotMetrics() +} + +// StartBackgroundJobs starts background jobs. +func (c *Cluster) StartBackgroundJobs() { + c.wg.Add(4) + go c.updateScheduler() + go c.runUpdateStoreStats() + go c.runCoordinator() + go c.runMetricsCollectionJob() + c.heartbeatRunner.Start(c.ctx) + c.miscRunner.Start(c.ctx) + c.logRunner.Start(c.ctx) + c.running.Store(true) +} + +// StopBackgroundJobs stops background jobs. +func (c *Cluster) StopBackgroundJobs() { + if !c.running.Load() { + return + } + c.running.Store(false) + c.coordinator.Stop() + c.heartbeatRunner.Stop() + c.miscRunner.Stop() + c.logRunner.Stop() + c.cancel() + c.wg.Wait() +} + +// IsBackgroundJobsRunning returns whether the background jobs are running. Only for test purpose. +func (c *Cluster) IsBackgroundJobsRunning() bool { + return c.running.Load() +} + +// HandleRegionHeartbeat processes RegionInfo reports from client. +func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { + tracer := core.NewNoopHeartbeatProcessTracer() + if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics { + tracer = core.NewHeartbeatProcessTracer() + } + var taskRunner, miscRunner, logRunner ratelimit.Runner + taskRunner, miscRunner, logRunner = syncRunner, syncRunner, syncRunner + if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner { + taskRunner = c.heartbeatRunner + miscRunner = c.miscRunner + logRunner = c.logRunner + } + ctx := &core.MetaProcessContext{ + Context: c.ctx, + Tracer: tracer, + TaskRunner: taskRunner, + MiscRunner: miscRunner, + LogRunner: logRunner, + } + tracer.Begin() + if err := c.processRegionHeartbeat(ctx, region); err != nil { + tracer.OnAllStageFinished() + return err + } + tracer.OnAllStageFinished() + c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL) + return nil +} + +// processRegionHeartbeat updates the region information. 
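StartBackgroundJobs and StopBackgroundJobs follow a common lifecycle shape: add to a WaitGroup before launching each goroutine, flip an atomic.Bool, and on stop flip it back, cancel the shared context and wait. A compact, standard-library-only sketch of that lifecycle (jobRunner and its loop names are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type jobRunner struct {
	ctx     context.Context
	cancel  context.CancelFunc
	wg      sync.WaitGroup
	running atomic.Bool
}

func newJobRunner() *jobRunner {
	ctx, cancel := context.WithCancel(context.Background())
	return &jobRunner{ctx: ctx, cancel: cancel}
}

func (r *jobRunner) Start() {
	r.wg.Add(2)
	go r.loop("update-store-stats", 10*time.Millisecond)
	go r.loop("collect-metrics", 25*time.Millisecond)
	r.running.Store(true)
}

func (r *jobRunner) loop(name string, interval time.Duration) {
	defer r.wg.Done()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-r.ctx.Done():
			fmt.Println(name, "stopped")
			return
		case <-ticker.C:
			// periodic work would go here
		}
	}
}

func (r *jobRunner) Stop() {
	if !r.running.Load() {
		return
	}
	r.running.Store(false)
	r.cancel()
	r.wg.Wait() // all background goroutines have exited once this returns
}

func main() {
	r := newJobRunner()
	r.Start()
	time.Sleep(60 * time.Millisecond)
	r.Stop()
	fmt.Println("running:", r.running.Load())
}
```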
+func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error { + tracer := ctx.Tracer + origin, _, err := c.PreCheckPutRegion(region) + tracer.OnPreCheckFinished() + if err != nil { + return err + } + region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) + cluster.HandleStatsAsync(c, region) + tracer.OnAsyncHotStatsFinished() + hasRegionStats := c.regionStats != nil + // Save to storage if meta is updated, except for flashback. + // Save to cache if meta or leader is updated, or contains any down/pending peer. + _, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin) + regionID := region.GetID() + if !saveCache { + // Due to some config changes need to update the region stats as well, + // so we do some extra checks here. + if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { + ctx.TaskRunner.RunTask( + regionID, + ratelimit.ObserveRegionStatsAsync, + func(ctx context.Context) { + cluster.Collect(ctx, c, region) + }, + ) + } + // region is not updated to the subtree. + if origin.GetRef() < 2 { + ctx.TaskRunner.RunTask( + regionID, + ratelimit.UpdateSubTree, + func(context.Context) { + c.CheckAndPutSubTree(region) + }, + ratelimit.WithRetained(true), + ) + } + return nil + } + tracer.OnSaveCacheBegin() + var overlaps []*core.RegionInfo + if saveCache { + // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, + // check its validation again here. + // + // However, it can't solve the race condition of concurrent heartbeats from the same region. + + // Async task in next PR. + if overlaps, err = c.CheckAndPutRootTree(ctx, region); err != nil { + tracer.OnSaveCacheFinished() + return err + } + ctx.TaskRunner.RunTask( + regionID, + ratelimit.UpdateSubTree, + func(context.Context) { + c.CheckAndPutSubTree(region) + }, + ratelimit.WithRetained(retained), + ) + tracer.OnUpdateSubTreeFinished() + ctx.TaskRunner.RunTask( + regionID, + ratelimit.HandleOverlaps, + func(ctx context.Context) { + cluster.HandleOverlaps(ctx, c, overlaps) + }, + ) + } + tracer.OnSaveCacheFinished() + if hasRegionStats { + // handle region stats + ctx.TaskRunner.RunTask( + regionID, + ratelimit.CollectRegionStatsAsync, + func(ctx context.Context) { + cluster.Collect(ctx, c, region) + }, + ) + } + + tracer.OnCollectRegionStatsFinished() + return nil +} + +// IsPrepared return true if the prepare checker is ready. +func (c *Cluster) IsPrepared() bool { + return c.coordinator.GetPrepareChecker().IsPrepared() +} + +// SetPrepared set the prepare check to prepared. Only for test purpose. +func (c *Cluster) SetPrepared() { + c.coordinator.GetPrepareChecker().SetPrepared() +} + +// IsSchedulingHalted returns whether the scheduling is halted. +// Currently, the microservice scheduling is halted when: +// - The `HaltScheduling` persist option is set to true. +func (c *Cluster) IsSchedulingHalted() bool { + return c.persistConfig.IsSchedulingHalted() +} diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index b6187bf0a685..076044008c4f 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -359,6 +359,7 @@ type LabelStatistics struct { sync.RWMutex regionLabelStats map[uint64]string labelCounter map[string]int + defunctRegions map[uint64]struct{} } // NewLabelStatistics creates a new LabelStatistics. 
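The leak fix adds a defunctRegions set to LabelStatistics and switches to a two-phase scheme: overlap handling only marks a region as defunct, and the counters are decremented later when the label stats are re-observed (the Mark/Clear implementations follow below in the diff). A reduced, self-contained sketch of that mark-then-sweep bookkeeping; labelStats here is a simplified stand-in, not the real statistics.LabelStatistics.

```go
package main

import (
	"fmt"
	"sync"
)

type labelStats struct {
	sync.RWMutex
	regionLabel  map[uint64]string
	labelCounter map[string]int
	defunct      map[uint64]struct{}
}

func newLabelStats() *labelStats {
	return &labelStats{
		regionLabel:  make(map[uint64]string),
		labelCounter: make(map[string]int),
		defunct:      make(map[uint64]struct{}),
	}
}

func (l *labelStats) observe(regionID uint64, label string) {
	l.Lock()
	defer l.Unlock()
	l.regionLabel[regionID] = label
	l.labelCounter[label]++
}

// markDefunct mirrors MarkDefunctRegion: it only records the ID.
func (l *labelStats) markDefunct(regionID uint64) {
	l.Lock()
	defer l.Unlock()
	l.defunct[regionID] = struct{}{}
}

// clearDefunct mirrors ClearDefunctRegions: the counters shrink here.
func (l *labelStats) clearDefunct() {
	l.Lock()
	defer l.Unlock()
	for regionID := range l.defunct {
		if label, ok := l.regionLabel[regionID]; ok {
			l.labelCounter[label]--
			delete(l.regionLabel, regionID)
		}
	}
	l.defunct = make(map[uint64]struct{})
}

func main() {
	l := newLabelStats()
	l.observe(9, "zone")
	l.observe(10, "zone")
	l.markDefunct(10) // region 10 was merged away
	l.clearDefunct()
	fmt.Println(l.labelCounter["zone"]) // 1: only region 9 is still counted
}
```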
@@ -366,6 +367,7 @@ func NewLabelStatistics() *LabelStatistics { return &LabelStatistics{ regionLabelStats: make(map[uint64]string), labelCounter: make(map[string]int), + defunctRegions: make(map[uint64]struct{}), } } @@ -399,14 +401,26 @@ func (l *LabelStatistics) Reset() { regionLabelLevelGauge.Reset() } -// ClearDefunctRegion is used to handle the overlap region. -func (l *LabelStatistics) ClearDefunctRegion(regionID uint64) { +// MarkDefunctRegion is used to handle the overlap region. +// It is used to mark the region as defunct and remove it from the label statistics later. +func (l *LabelStatistics) MarkDefunctRegion(regionID uint64) { l.Lock() defer l.Unlock() - if label, ok := l.regionLabelStats[regionID]; ok { - l.labelCounter[label]-- - delete(l.regionLabelStats, regionID) + l.defunctRegions[regionID] = struct{}{} +} + +// ClearDefunctRegions is used to handle the overlap region. +// It is used to remove the defunct regions from the label statistics. +func (l *LabelStatistics) ClearDefunctRegions() { + l.Lock() + defer l.Unlock() + for regionID := range l.defunctRegions { + if label, ok := l.regionLabelStats[regionID]; ok { + l.labelCounter[label]-- + delete(l.regionLabelStats, regionID) + } } + l.defunctRegions = make(map[uint64]struct{}) } // GetLabelCounter is only used for tests. diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index f7ff71b888d9..4d728f3d41e9 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1092,7 +1092,12 @@ func TestRegionLabelIsolationLevel(t *testing.T) { cfg.LocationLabels = []string{"zone"} opt.SetReplicationConfig(cfg) re.NoError(err) +<<<<<<< HEAD cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) +======= + cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend()) + cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) +>>>>>>> 26123dc75 (checker, statistic: avoid leak in label statistic (#8703)) for i := uint64(1); i <= 4; i++ { var labels []*metapb.StoreLabel @@ -1127,11 +1132,45 @@ func TestRegionLabelIsolationLevel(t *testing.T) { StartKey: []byte{byte(1)}, EndKey: []byte{byte(2)}, } - r := core.NewRegionInfo(region, peers[0]) - re.NoError(cluster.putRegion(r)) + r1 := core.NewRegionInfo(region, peers[0]) + re.NoError(cluster.putRegion(r1)) +<<<<<<< HEAD cluster.updateRegionsLabelLevelStats([]*core.RegionInfo{r}) counter := cluster.labelLevelStats.GetLabelCounter() +======= + cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r1}) + counter := cluster.labelStats.GetLabelCounter() +>>>>>>> 26123dc75 (checker, statistic: avoid leak in label statistic (#8703)) + re.Equal(0, counter["none"]) + re.Equal(1, counter["zone"]) + + region = &metapb.Region{ + Id: 10, + Peers: peers, + StartKey: []byte{byte(2)}, + EndKey: []byte{byte(3)}, + } + r2 := core.NewRegionInfo(region, peers[0]) + re.NoError(cluster.putRegion(r2)) + + cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r2}) + counter = cluster.labelStats.GetLabelCounter() + re.Equal(0, counter["none"]) + re.Equal(2, counter["zone"]) + + // issue: https://github.com/tikv/pd/issues/8700 + // step1: heartbeat a overlap region, which is used to simulate the case that the region is merged. + // step2: update region 9 and region 10, which is used to simulate the case that patrol is triggered. + // We should only count region 9. 
+ overlapRegion := r1.Clone( + core.WithStartKey(r1.GetStartKey()), + core.WithEndKey(r2.GetEndKey()), + core.WithLeader(r2.GetPeer(8)), + ) + re.NoError(cluster.HandleRegionHeartbeat(overlapRegion)) + cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r1, r2}) + counter = cluster.labelStats.GetLabelCounter() re.Equal(0, counter["none"]) re.Equal(1, counter["zone"]) } diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go new file mode 100644 index 000000000000..8578b3480d8a --- /dev/null +++ b/server/cluster/scheduling_controller.go @@ -0,0 +1,492 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "context" + "net/http" + "sync" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/schedule" + "github.com/tikv/pd/pkg/schedule/checker" + sc "github.com/tikv/pd/pkg/schedule/config" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/hbstream" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/scatter" + "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/schedule/splitter" + "github.com/tikv/pd/pkg/schedule/types" + "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/statistics/buckets" + "github.com/tikv/pd/pkg/statistics/utils" + "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/syncutil" +) + +// schedulingController is used to manage all schedulers and checkers. +type schedulingController struct { + parentCtx context.Context + ctx context.Context + cancel context.CancelFunc + mu syncutil.RWMutex + wg sync.WaitGroup + *core.BasicCluster + opt sc.ConfProvider + coordinator *schedule.Coordinator + labelStats *statistics.LabelStatistics + regionStats *statistics.RegionStatistics + hotStat *statistics.HotStat + slowStat *statistics.SlowStat + running bool +} + +// newSchedulingController creates a new scheduling controller. 
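The overlapRegion in the test above is built with RegionInfo.Clone plus functional options (core.WithStartKey, core.WithEndKey, core.WithLeader). As an aside, the sketch below shows the general clone-with-options shape using simplified, made-up types; it is not the real core package.

```go
package main

import "fmt"

// A simplified stand-in for core.RegionInfo and its Clone(...options) API.
type region struct {
	id       uint64
	startKey []byte
	endKey   []byte
	leader   uint64
}

type regionOption func(*region)

func withStartKey(key []byte) regionOption { return func(r *region) { r.startKey = key } }
func withEndKey(key []byte) regionOption   { return func(r *region) { r.endKey = key } }
func withLeader(storeID uint64) regionOption {
	return func(r *region) { r.leader = storeID }
}

// clone copies the region and then applies every option to the copy, the same
// shape as RegionInfo.Clone(core.WithStartKey(...), core.WithEndKey(...), ...).
func (r *region) clone(opts ...regionOption) *region {
	cp := *r
	for _, opt := range opts {
		opt(&cp)
	}
	return &cp
}

func main() {
	r1 := &region{id: 9, startKey: []byte{1}, endKey: []byte{2}, leader: 1}
	r2 := &region{id: 10, startKey: []byte{2}, endKey: []byte{3}, leader: 8}
	// Build an "overlap" region spanning r1 and r2, as the regression test does.
	overlap := r1.clone(withStartKey(r1.startKey), withEndKey(r2.endKey), withLeader(r2.leader))
	fmt.Println(overlap.id, overlap.startKey, overlap.endKey, overlap.leader) // 9 [1] [3] 8
}
```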
+func newSchedulingController(parentCtx context.Context, basicCluster *core.BasicCluster, opt sc.ConfProvider, ruleManager *placement.RuleManager) *schedulingController { + ctx, cancel := context.WithCancel(parentCtx) + return &schedulingController{ + parentCtx: parentCtx, + ctx: ctx, + cancel: cancel, + BasicCluster: basicCluster, + opt: opt, + labelStats: statistics.NewLabelStatistics(), + hotStat: statistics.NewHotStat(parentCtx), + slowStat: statistics.NewSlowStat(), + regionStats: statistics.NewRegionStatistics(basicCluster, opt, ruleManager), + } +} + +func (sc *schedulingController) stopSchedulingJobs() bool { + sc.mu.Lock() + defer sc.mu.Unlock() + if !sc.running { + return false + } + sc.coordinator.Stop() + sc.cancel() + sc.wg.Wait() + sc.running = false + log.Info("scheduling service is stopped") + return true +} + +func (sc *schedulingController) startSchedulingJobs(cluster sche.ClusterInformer, hbstreams *hbstream.HeartbeatStreams) { + sc.mu.Lock() + defer sc.mu.Unlock() + if sc.running { + return + } + sc.initCoordinatorLocked(sc.parentCtx, cluster, hbstreams) + sc.wg.Add(3) + go sc.runCoordinator() + go sc.runStatsBackgroundJobs() + go sc.runSchedulingMetricsCollectionJob() + sc.running = true + log.Info("scheduling service is started") +} + +func (sc *schedulingController) initCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbstreams *hbstream.HeartbeatStreams) { + sc.mu.Lock() + defer sc.mu.Unlock() + sc.initCoordinatorLocked(ctx, cluster, hbstreams) + sc.coordinator.InitSchedulers(false) +} + +func (sc *schedulingController) initCoordinatorLocked(ctx context.Context, cluster sche.ClusterInformer, hbstreams *hbstream.HeartbeatStreams) { + sc.ctx, sc.cancel = context.WithCancel(ctx) + sc.coordinator = schedule.NewCoordinator(sc.ctx, cluster, hbstreams) +} + +// runCoordinator runs the main scheduling loop. 
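startSchedulingJobs and stopSchedulingJobs make the controller restartable: stopping cancels the derived context and waits for the workers, while starting re-derives a fresh context from the parent via initCoordinatorLocked. A minimal sketch of that restartable-component shape, assuming only the standard library (all names are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type restartable struct {
	parent  context.Context
	ctx     context.Context
	cancel  context.CancelFunc
	mu      sync.Mutex
	wg      sync.WaitGroup
	running bool
}

func newRestartable(parent context.Context) *restartable {
	return &restartable{parent: parent}
}

func (r *restartable) start() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.running {
		return
	}
	r.ctx, r.cancel = context.WithCancel(r.parent) // like initCoordinatorLocked
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		<-r.ctx.Done() // stand-in for the coordinator / stats jobs
	}()
	r.running = true
	fmt.Println("jobs started")
}

func (r *restartable) stop() bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if !r.running {
		return false
	}
	r.cancel()
	r.wg.Wait()
	r.running = false
	fmt.Println("jobs stopped")
	return true
}

func main() {
	r := newRestartable(context.Background())
	r.start()
	time.Sleep(10 * time.Millisecond)
	r.stop()
	r.start() // can be started again because a new child context is derived
	r.stop()
}
```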
+func (sc *schedulingController) runCoordinator() { + defer logutil.LogPanic() + defer sc.wg.Done() + sc.coordinator.RunUntilStop() +} + +func (sc *schedulingController) runStatsBackgroundJobs() { + defer logutil.LogPanic() + defer sc.wg.Done() + + ticker := time.NewTicker(statistics.RegionsStatsObserveInterval) + defer ticker.Stop() + + for _, store := range sc.GetStores() { + storeID := store.GetID() + sc.hotStat.GetOrCreateRollingStoreStats(storeID) + } + for { + select { + case <-sc.ctx.Done(): + log.Info("statistics background jobs has been stopped") + return + case <-ticker.C: + sc.hotStat.ObserveRegionsStats(sc.GetStoresWriteRate()) + } + } +} + +func (sc *schedulingController) runSchedulingMetricsCollectionJob() { + defer logutil.LogPanic() + defer sc.wg.Done() + + ticker := time.NewTicker(metricsCollectionJobInterval) + failpoint.Inject("highFrequencyClusterJobs", func() { + ticker.Reset(time.Millisecond) + }) + defer ticker.Stop() + + for { + select { + case <-sc.ctx.Done(): + log.Info("scheduling metrics are reset") + resetSchedulingMetrics() + log.Info("scheduling metrics collection job has been stopped") + return + case <-ticker.C: + sc.collectSchedulingMetrics() + } + } +} + +func resetSchedulingMetrics() { + statistics.Reset() + schedulers.ResetSchedulerMetrics() + schedule.ResetHotSpotMetrics() + statistics.ResetRegionStatsMetrics() + statistics.ResetLabelStatsMetrics() + // reset hot cache metrics + statistics.ResetHotCacheStatusMetrics() +} + +func (sc *schedulingController) collectSchedulingMetrics() { + statsMap := statistics.NewStoreStatisticsMap(sc.opt) + stores := sc.GetStores() + for _, s := range stores { + statsMap.Observe(s) + statistics.ObserveHotStat(s, sc.hotStat.StoresStats) + } + statsMap.Collect() + sc.coordinator.GetSchedulersController().CollectSchedulerMetrics() + sc.coordinator.CollectHotSpotMetrics() + if sc.regionStats == nil { + return + } + sc.regionStats.Collect() + sc.labelStats.Collect() + // collect hot cache metrics + sc.hotStat.CollectMetrics() + // collect the lock metrics + sc.CollectWaitLockMetrics() +} + +func (sc *schedulingController) removeStoreStatistics(storeID uint64) { + sc.hotStat.RemoveRollingStoreStats(storeID) + sc.slowStat.RemoveSlowStoreStatus(storeID) +} + +func (sc *schedulingController) updateStoreStatistics(storeID uint64, isSlow bool) { + sc.hotStat.GetOrCreateRollingStoreStats(storeID) + sc.slowStat.ObserveSlowStoreStatus(storeID, isSlow) +} + +// GetHotStat gets hot stat. +func (sc *schedulingController) GetHotStat() *statistics.HotStat { + return sc.hotStat +} + +// GetRegionStats gets region statistics. +func (sc *schedulingController) GetRegionStats() *statistics.RegionStatistics { + return sc.regionStats +} + +// GetLabelStats gets label statistics. +func (sc *schedulingController) GetLabelStats() *statistics.LabelStatistics { + return sc.labelStats +} + +// GetRegionStatsByType gets the status of the region by types. +func (sc *schedulingController) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo { + if sc.regionStats == nil { + return nil + } + return sc.regionStats.GetRegionStatsByType(typ) +} + +// UpdateRegionsLabelLevelStats updates the status of the region label level by types. 
+func (sc *schedulingController) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { + for _, region := range regions { + sc.labelStats.Observe(region, sc.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), sc.opt.GetLocationLabels()) + } + sc.labelStats.ClearDefunctRegions() +} + +func (sc *schedulingController) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { + stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) + for _, p := range region.GetPeers() { + if store := sc.GetStore(p.StoreId); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) { + stores = append(stores, store) + } + } + return stores +} + +// GetStoresStats returns stores' statistics from cluster. +// And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat +func (sc *schedulingController) GetStoresStats() *statistics.StoresStats { + return sc.hotStat.StoresStats +} + +// GetStoresLoads returns load stats of all stores. +func (sc *schedulingController) GetStoresLoads() map[uint64][]float64 { + return sc.hotStat.GetStoresLoads() +} + +// IsRegionHot checks if a region is in hot state. +func (sc *schedulingController) IsRegionHot(region *core.RegionInfo) bool { + return sc.hotStat.IsRegionHot(region, sc.opt.GetHotRegionCacheHitsThreshold()) +} + +// GetHotPeerStat returns hot peer stat with specified regionID and storeID. +func (sc *schedulingController) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat { + return sc.hotStat.GetHotPeerStat(rw, regionID, storeID) +} + +// RegionReadStats returns hot region's read stats. +// The result only includes peers that are hot enough. +// RegionStats is a thread-safe method +func (sc *schedulingController) RegionReadStats() map[uint64][]*statistics.HotPeerStat { + // As read stats are reported by store heartbeat, the threshold needs to be adjusted. + threshold := sc.opt.GetHotRegionCacheHitsThreshold() * + (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) + return sc.hotStat.RegionStats(utils.Read, threshold) +} + +// RegionWriteStats returns hot region's write stats. +// The result only includes peers that are hot enough. +func (sc *schedulingController) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { + // RegionStats is a thread-safe method + return sc.hotStat.RegionStats(utils.Write, sc.opt.GetHotRegionCacheHitsThreshold()) +} + +// BucketsStats returns hot region's buckets stats. +func (sc *schedulingController) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat { + return sc.hotStat.BucketsStats(degree, regionIDs...) +} + +// GetCoordinator returns the coordinator. +func (sc *schedulingController) GetCoordinator() *schedule.Coordinator { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator +} + +// GetPausedSchedulerDelayAt returns DelayAt of a paused scheduler +func (sc *schedulingController) GetPausedSchedulerDelayAt(name string) (int64, error) { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().GetPausedSchedulerDelayAt(name) +} + +// GetPausedSchedulerDelayUntil returns DelayUntil of a paused scheduler +func (sc *schedulingController) GetPausedSchedulerDelayUntil(name string) (int64, error) { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().GetPausedSchedulerDelayUntil(name) +} + +// GetOperatorController returns the operator controller. 
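getStoresWithoutLabelLocked feeds label statistics only the peers whose store does not carry engine=tiflash. The filter below shows the same idea with simplified stand-in types; the real code goes through core.IsStoreContainLabel on the store metadata.

```go
package main

import "fmt"

// Simplified stand-in: a store is just an ID plus a label map.
type store struct {
	id     uint64
	labels map[string]string
}

func containsLabel(s store, key, value string) bool {
	return s.labels[key] == value
}

// storesWithoutLabel keeps only the stores that do NOT carry key=value,
// which is how TiFlash stores are excluded from label-level statistics.
func storesWithoutLabel(stores []store, key, value string) []store {
	filtered := make([]store, 0, len(stores))
	for _, s := range stores {
		if !containsLabel(s, key, value) {
			filtered = append(filtered, s)
		}
	}
	return filtered
}

func main() {
	stores := []store{
		{id: 1, labels: map[string]string{"zone": "z1"}},
		{id: 2, labels: map[string]string{"zone": "z2", "engine": "tiflash"}},
		{id: 3, labels: map[string]string{"zone": "z3"}},
	}
	for _, s := range storesWithoutLabel(stores, "engine", "tiflash") {
		fmt.Println("counted store:", s.id) // stores 1 and 3
	}
}
```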
+func (sc *schedulingController) GetOperatorController() *operator.Controller { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetOperatorController() +} + +// GetRegionScatterer returns the region scatter. +func (sc *schedulingController) GetRegionScatterer() *scatter.RegionScatterer { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetRegionScatterer() +} + +// GetRegionSplitter returns the region splitter +func (sc *schedulingController) GetRegionSplitter() *splitter.RegionSplitter { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetRegionSplitter() +} + +// GetMergeChecker returns merge checker. +func (sc *schedulingController) GetMergeChecker() *checker.MergeChecker { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetMergeChecker() +} + +// GetRuleChecker returns rule checker. +func (sc *schedulingController) GetRuleChecker() *checker.RuleChecker { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetRuleChecker() +} + +// GetSchedulers gets all schedulers. +func (sc *schedulingController) GetSchedulers() []string { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().GetSchedulerNames() +} + +// GetSchedulerHandlers gets all scheduler handlers. +func (sc *schedulingController) GetSchedulerHandlers() map[string]http.Handler { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().GetSchedulerHandlers() +} + +// AddSchedulerHandler adds a scheduler handler. +func (sc *schedulingController) AddSchedulerHandler(scheduler schedulers.Scheduler, args ...string) error { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().AddSchedulerHandler(scheduler, args...) +} + +// RemoveSchedulerHandler removes a scheduler handler. +func (sc *schedulingController) RemoveSchedulerHandler(name string) error { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().RemoveSchedulerHandler(name) +} + +// AddScheduler adds a scheduler. +func (sc *schedulingController) AddScheduler(scheduler schedulers.Scheduler, args ...string) error { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().AddScheduler(scheduler, args...) +} + +// RemoveScheduler removes a scheduler. +func (sc *schedulingController) RemoveScheduler(name string) error { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().RemoveScheduler(name) +} + +// PauseOrResumeScheduler pauses or resumes a scheduler. +func (sc *schedulingController) PauseOrResumeScheduler(name string, t int64) error { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetSchedulersController().PauseOrResumeScheduler(name, t) +} + +// PauseOrResumeChecker pauses or resumes checker. +func (sc *schedulingController) PauseOrResumeChecker(name string, t int64) error { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.PauseOrResumeChecker(name, t) +} + +// AddPendingProcessedRegions adds regions to suspect list. +func (sc *schedulingController) AddPendingProcessedRegions(needCheckLen bool, regionIDs ...uint64) { + sc.mu.RLock() + defer sc.mu.RUnlock() + sc.coordinator.GetCheckerController().AddPendingProcessedRegions(needCheckLen, regionIDs...) +} + +// GetPendingProcessedRegions gets all suspect regions. 
+func (sc *schedulingController) GetPendingProcessedRegions() []uint64 { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetCheckerController().GetPendingProcessedRegions() +} + +// RemovePendingProcessedRegion removes region from pending processed regions. +func (sc *schedulingController) RemovePendingProcessedRegion(id uint64) { + sc.mu.RLock() + defer sc.mu.RUnlock() + sc.coordinator.GetCheckerController().RemovePendingProcessedRegion(id) +} + +// PopOneSuspectKeyRange gets one suspect keyRange group. +// it would return value and true if pop success, or return empty [][2][]byte and false +// if suspectKeyRanges couldn't pop keyRange group. +func (sc *schedulingController) PopOneSuspectKeyRange() ([2][]byte, bool) { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetCheckerController().PopOneSuspectKeyRange() +} + +// ClearSuspectKeyRanges clears the suspect keyRanges, only for unit test +func (sc *schedulingController) ClearSuspectKeyRanges() { + sc.mu.RLock() + defer sc.mu.RUnlock() + sc.coordinator.GetCheckerController().ClearSuspectKeyRanges() +} + +// AddSuspectKeyRange adds the key range with the its ruleID as the key +// The instance of each keyRange is like following format: +// [2][]byte: start key/end key +func (sc *schedulingController) AddSuspectKeyRange(start, end []byte) { + sc.mu.RLock() + defer sc.mu.RUnlock() + sc.coordinator.GetCheckerController().AddSuspectKeyRange(start, end) +} + +func (sc *schedulingController) getEvictLeaderStores() (evictStores []uint64) { + sc.mu.RLock() + defer sc.mu.RUnlock() + if sc.coordinator == nil { + return nil + } + handler, ok := sc.coordinator.GetSchedulersController().GetSchedulerHandlers()[types.EvictLeaderScheduler.String()] + if !ok { + return + } + type evictLeaderHandler interface { + EvictStoreIDs() []uint64 + } + h, ok := handler.(evictLeaderHandler) + if !ok { + return + } + return h.EvictStoreIDs() +} + +// IsPrepared return true if the prepare checker is ready. +func (sc *schedulingController) IsPrepared() bool { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetPrepareChecker().IsPrepared() +} + +// SetPrepared set the prepare check to prepared. Only for test purpose. +func (sc *schedulingController) SetPrepared() { + sc.mu.RLock() + defer sc.mu.RUnlock() + sc.coordinator.GetPrepareChecker().SetPrepared() +} + +// IsSchedulingControllerRunning returns whether the scheduling controller is running. Only for test purpose. +func (sc *schedulingController) IsSchedulingControllerRunning() bool { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.running +}
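getEvictLeaderStores avoids importing the concrete evict-leader scheduler by asserting its HTTP handler against a locally declared interface. The standalone sketch below shows that anonymous-interface type-assertion idiom with made-up scheduler types.

```go
package main

import "fmt"

// Two handler implementations; only one of them can report evicted stores.
type evictLeaderScheduler struct{ stores []uint64 }

func (s evictLeaderScheduler) EvictStoreIDs() []uint64 { return s.stores }

type balanceScheduler struct{}

// evictStoreIDs mirrors getEvictLeaderStores: it asserts the handler against a
// locally declared interface instead of depending on the concrete type.
func evictStoreIDs(handler any) []uint64 {
	type evictLeaderHandler interface {
		EvictStoreIDs() []uint64
	}
	h, ok := handler.(evictLeaderHandler)
	if !ok {
		return nil
	}
	return h.EvictStoreIDs()
}

func main() {
	fmt.Println(evictStoreIDs(evictLeaderScheduler{stores: []uint64{1, 4}})) // [1 4]
	fmt.Println(evictStoreIDs(balanceScheduler{}))                           // []
}
```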