*: reset prepare checker once the cache is reset #8860

Open
wants to merge 18 commits into base: master
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
@@ -282,6 +282,7 @@ func deleteAllRegionCache(c *gin.Context) {
return
}
cluster.ResetRegionCache()
cluster.ResetPrepared()
c.String(http.StatusOK, "All regions are removed from server cache.")
}

14 changes: 6 additions & 8 deletions pkg/mcs/scheduling/server/cluster.go
@@ -8,7 +8,6 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
@@ -67,7 +66,6 @@ type Cluster struct {
const (
regionLabelGCInterval = time.Hour
requestTimeout = 3 * time.Second
collectWaitTime = time.Minute

// heartbeat relative const
heartbeatTaskRunner = "heartbeat-task-runner"
@@ -491,12 +489,7 @@ func (c *Cluster) runUpdateStoreStats() {
func (c *Cluster) runCoordinator() {
defer logutil.LogPanic()
defer c.wg.Done()
// force wait for 1 minute to make prepare checker won't be directly skipped
runCollectWaitTime := collectWaitTime
failpoint.Inject("changeRunCollectWaitTime", func() {
runCollectWaitTime = 1 * time.Second
})
c.coordinator.RunUntilStop(runCollectWaitTime)
c.coordinator.RunUntilStop()
}

func (c *Cluster) runMetricsCollectionJob() {
@@ -706,6 +699,11 @@ func (c *Cluster) SetPrepared() {
c.coordinator.GetPrepareChecker().SetPrepared()
}

// ResetPrepared resets the prepare checker.
func (c *Cluster) ResetPrepared() {
c.coordinator.GetPrepareChecker().ResetPrepared()
}

// IsSchedulingHalted returns whether the scheduling is halted.
// Currently, the microservice scheduling is halted when:
// - The `HaltScheduling` persist option is set to true.
3 changes: 3 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
@@ -909,3 +909,6 @@ func (mc *Cluster) ObserveRegionsStats() {
storeIDs, writeBytesRates, writeKeysRates := mc.BasicCluster.GetStoresWriteRate()
mc.HotStat.ObserveRegionsStats(storeIDs, writeBytesRates, writeKeysRates)
}

// ResetPrepared mocks method.
func (*Cluster) ResetPrepared() {}
7 changes: 6 additions & 1 deletion pkg/schedule/checker/checker_controller.go
@@ -89,10 +89,11 @@ type Controller struct {
// patrolRegionScanLimit is the limit of regions to scan.
// It is calculated by the number of regions.
patrolRegionScanLimit int
prepareChecker *sche.PrepareChecker
}

// NewController create a new Controller.
func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller, prepareChecker *sche.PrepareChecker) *Controller {
Contributor: BTW, this function has too many parameters. Do we have any good way to solve it?
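One common Go pattern, sketched below only as an illustration and not part of this PR, is to bundle the constructor's dependencies into a parameter struct so the signature stays stable as dependencies grow; the struct and helper names here are hypothetical:

// Hypothetical sketch in the same package as NewController; not part of the diff.
type controllerDeps struct {
	cluster        sche.CheckerCluster
	conf           config.CheckerConfigProvider
	ruleManager    *placement.RuleManager
	labeler        *labeler.RegionLabeler
	opController   *operator.Controller
	prepareChecker *sche.PrepareChecker
}

// newControllerFromDeps forwards to the existing constructor unchanged.
func newControllerFromDeps(ctx context.Context, d controllerDeps) *Controller {
	return NewController(ctx, d.cluster, d.conf, d.ruleManager, d.labeler, d.opController, d.prepareChecker)
}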

pendingProcessedRegions := cache.NewIDTTL(ctx, time.Minute, 3*time.Minute)
c := &Controller{
ctx: ctx,
@@ -111,6 +112,7 @@ func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config
patrolRegionContext: &PatrolRegionContext{},
interval: cluster.GetCheckerConfig().GetPatrolRegionInterval(),
patrolRegionScanLimit: calculateScanLimit(cluster),
prepareChecker: prepareChecker,
}
c.duration.Store(time.Duration(0))
return c
@@ -134,6 +136,9 @@ func (c *Controller) PatrolRegions() {
case <-ticker.C:
c.updateTickerIfNeeded(ticker)
c.updatePatrolWorkersIfNeeded()
if !c.prepareChecker.Check(c.cluster.GetBasicCluster()) {
continue
}
if c.cluster.IsSchedulingHalted() {
for len(c.patrolRegionContext.regionChan) > 0 {
<-c.patrolRegionContext.regionChan
35 changes: 11 additions & 24 deletions pkg/schedule/coordinator.go
@@ -44,7 +44,6 @@ import (

const (
runSchedulerCheckInterval = 3 * time.Second
collectTimeout = 5 * time.Minute
maxLoadConfigRetries = 10
// pushOperatorTickInterval is the interval try to push the operator.
pushOperatorTickInterval = 500 * time.Millisecond
@@ -66,7 +65,7 @@ type Coordinator struct {
schedulersInitialized bool

cluster sche.ClusterInformer
prepareChecker *prepareChecker
prepareChecker *sche.PrepareChecker
checkers *checker.Controller
regionScatterer *scatter.RegionScatterer
regionSplitter *splitter.RegionSplitter
@@ -80,15 +79,16 @@
// NewCoordinator creates a new Coordinator.
func NewCoordinator(parentCtx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator {
ctx, cancel := context.WithCancel(parentCtx)
prepareChecker := sche.NewPrepareChecker()
opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), hbStreams)
schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController)
checkers := checker.NewController(ctx, cluster, cluster.GetCheckerConfig(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController)
schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController, prepareChecker)
checkers := checker.NewController(ctx, cluster, cluster.GetCheckerConfig(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController, prepareChecker)
return &Coordinator{
ctx: ctx,
cancel: cancel,
schedulersInitialized: false,
cluster: cluster,
prepareChecker: newPrepareChecker(),
prepareChecker: prepareChecker,
checkers: checkers,
regionScatterer: scatter.NewRegionScatterer(ctx, cluster, opController, checkers.AddPendingProcessedRegions),
regionSplitter: splitter.NewRegionSplitter(cluster, splitter.NewSplitRegionsHandler(cluster, opController), checkers.AddPendingProcessedRegions),
@@ -204,8 +204,8 @@ func (c *Coordinator) driveSlowNodeScheduler() {
}

// RunUntilStop runs the coordinator until receiving the stop signal.
func (c *Coordinator) RunUntilStop(collectWaitTime ...time.Duration) {
c.Run(collectWaitTime...)
func (c *Coordinator) RunUntilStop() {
c.Run()
<-c.ctx.Done()
log.Info("coordinator is stopping")
c.GetSchedulersController().Wait()
@@ -214,25 +214,12 @@ func (c *Coordinator) RunUntilStop(collectWaitTime ...time.Duration) {
}

// Run starts coordinator.
func (c *Coordinator) Run(collectWaitTime ...time.Duration) {
func (c *Coordinator) Run() {
ticker := time.NewTicker(runSchedulerCheckInterval)
failpoint.Inject("changeCoordinatorTicker", func() {
ticker.Reset(100 * time.Millisecond)
})
defer ticker.Stop()
log.Info("coordinator starts to collect cluster information")
for {
if c.ShouldRun(collectWaitTime...) {
log.Info("coordinator has finished cluster information preparation")
break
}
select {
case <-ticker.C:
case <-c.ctx.Done():
log.Info("coordinator stops running")
return
}
Contributor: Do we need a similar log?

}
log.Info("coordinator starts to run schedulers")
c.InitSchedulers(true)

@@ -547,8 +534,8 @@ func ResetHotSpotMetrics() {
}

// ShouldRun returns true if the coordinator should run.
func (c *Coordinator) ShouldRun(collectWaitTime ...time.Duration) bool {
return c.prepareChecker.check(c.cluster.GetBasicCluster(), collectWaitTime...)
func (c *Coordinator) ShouldRun() bool {
return c.prepareChecker.Check(c.cluster.GetBasicCluster())
}

// GetSchedulersController returns the schedulers controller.
@@ -616,7 +603,7 @@ func (c *Coordinator) GetRuleChecker() *checker.RuleChecker {
}

// GetPrepareChecker returns the prepare checker.
func (c *Coordinator) GetPrepareChecker() *prepareChecker {
func (c *Coordinator) GetPrepareChecker() *sche.PrepareChecker {
return c.prepareChecker
}

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schedule
package core

import (
"time"
@@ -23,32 +23,34 @@ import (
"go.uber.org/zap"
)

type prepareChecker struct {
const collectTimeout = 5 * time.Minute

// PrepareChecker is used to check if the coordinator has finished cluster information preparation.
type PrepareChecker struct {
syncutil.RWMutex
start time.Time
prepared bool
}

func newPrepareChecker() *prepareChecker {
return &prepareChecker{
// NewPrepareChecker creates a new PrepareChecker.
func NewPrepareChecker() *PrepareChecker {
return &PrepareChecker{
start: time.Now(),
}
}

// Before starting up the scheduler, we need to take the proportion of the regions on each store into consideration.
func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...time.Duration) bool {
checker.Lock()
defer checker.Unlock()
if checker.prepared {
// Check checks if the coordinator has finished cluster information preparation.
func (checker *PrepareChecker) Check(c *core.BasicCluster) bool {
if checker.IsPrepared() {
return true
}
checker.Lock()
defer checker.Unlock()

if time.Since(checker.start) > collectTimeout {
checker.prepared = true
return true
}
if len(collectWaitTime) > 0 && time.Since(checker.start) < collectWaitTime[0] {
return false
}
notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
totalRegionsCnt := c.GetTotalRegionCount()
// The number of active regions should be more than total region of all stores * core.CollectFactor
@@ -61,7 +63,7 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti
}
storeID := store.GetID()
// It is used to avoid sudden scheduling when scheduling service is just started.
if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*core.CollectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) {
if float64(store.GetStoreStats().GetRegionCount())*core.CollectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) {
return false
}
if !c.IsStorePrepared(storeID) {
@@ -74,15 +76,23 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti
}

// IsPrepared returns whether the coordinator is prepared.
func (checker *prepareChecker) IsPrepared() bool {
func (checker *PrepareChecker) IsPrepared() bool {
checker.RLock()
defer checker.RUnlock()
return checker.prepared
}

// SetPrepared is for test purpose
func (checker *prepareChecker) SetPrepared() {
func (checker *PrepareChecker) SetPrepared() {
checker.Lock()
defer checker.Unlock()
checker.prepared = true
}

// ResetPrepared is for test purpose
func (checker *PrepareChecker) ResetPrepared() {
checker.Lock()
defer checker.Unlock()
checker.prepared = false
checker.start = time.Now()
}
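For orientation, not as part of the diff: ResetPrepared simply flips `prepared` back to false and restarts the collection clock, so Check has to pass again (enough not-from-storage regions relative to `totalRegionsCnt * core.CollectFactor`, or the 5-minute `collectTimeout`) before patrol and schedulers resume. A minimal standalone sketch of the exported lifecycle, assuming the checker now lives in the schedule core package that the rest of this PR imports as `sche`:

package main

import (
	"fmt"

	sche "github.com/tikv/pd/pkg/schedule/core" // assumed new home of PrepareChecker
)

func main() {
	checker := sche.NewPrepareChecker()

	// Tests (or callers that know the cluster is already warmed up) can force the prepared state.
	checker.SetPrepared()
	fmt.Println(checker.IsPrepared()) // true

	// Dropping the region cache resets the checker, so region collection has to
	// satisfy Check again before scheduling resumes.
	checker.ResetPrepared()
	fmt.Println(checker.IsPrepared()) // false
}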
7 changes: 6 additions & 1 deletion pkg/schedule/schedulers/scheduler_controller.go
@@ -56,17 +56,19 @@ type Controller struct {
// which will only be initialized and used in the API service mode now.
schedulerHandlers map[string]http.Handler
opController *operator.Controller
prepareChecker *sche.PrepareChecker
}

// NewController creates a scheduler controller.
func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage endpoint.ConfigStorage, opController *operator.Controller) *Controller {
func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage endpoint.ConfigStorage, opController *operator.Controller, prepareChecker *sche.PrepareChecker) *Controller {
return &Controller{
ctx: ctx,
cluster: cluster,
storage: storage,
schedulers: make(map[string]*ScheduleController),
schedulerHandlers: make(map[string]http.Handler),
opController: opController,
prepareChecker: prepareChecker,
}
}

@@ -368,6 +370,9 @@ func (c *Controller) runScheduler(s *ScheduleController) {
for {
select {
case <-ticker.C:
if !c.prepareChecker.Check(c.cluster.GetBasicCluster()) {
continue
}
diagnosable := s.IsDiagnosticAllowed()
if !s.AllowSchedule(diagnosable) {
continue
2 changes: 2 additions & 0 deletions pkg/unsaferecovery/unsafe_recovery_controller.go
@@ -107,6 +107,7 @@ const (
type cluster interface {
core.StoreSetInformer

ResetPrepared()
ResetRegionCache()
AllocID() (uint64, error)
BuryStore(storeID uint64, forceBury bool) error
@@ -545,6 +546,7 @@ func (u *Controller) changeStage(stage stage) {
if u.step > 1 {
// == 1 means no operation has done, no need to invalid cache
u.cluster.ResetRegionCache()
u.cluster.ResetPrepared()
}
output.Info = "Unsafe recovery Finished"
output.Details = u.getAffectedTableDigest()
1 change: 1 addition & 0 deletions server/api/admin.go
@@ -124,6 +124,7 @@ func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Reque
var err error
rc := getCluster(r)
rc.ResetRegionCache()
rc.ResetPrepared()
msg := "All regions are removed from server cache."
if rc.IsServiceIndependent(constant.SchedulingServiceName) {
err = h.deleteRegionCacheInSchedulingServer()
7 changes: 7 additions & 0 deletions server/cluster/scheduling_controller.go
@@ -480,6 +480,13 @@ func (sc *schedulingController) SetPrepared() {
sc.coordinator.GetPrepareChecker().SetPrepared()
}

// ResetPrepared resets the prepare checker.
func (sc *schedulingController) ResetPrepared() {
sc.mu.RLock()
defer sc.mu.RUnlock()
sc.coordinator.GetPrepareChecker().ResetPrepared()
}

// IsSchedulingControllerRunning returns whether the scheduling controller is running. Only for test purpose.
func (sc *schedulingController) IsSchedulingControllerRunning() bool {
sc.mu.RLock()
2 changes: 0 additions & 2 deletions tests/integrations/mcs/scheduling/api_test.go
@@ -41,15 +41,13 @@ func TestAPI(t *testing.T) {
func (suite *apiTestSuite) SetupSuite() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime", `return(true)`))
suite.env = tests.NewSchedulingTestEnvironment(suite.T())
}

func (suite *apiTestSuite) TearDownSuite() {
suite.env.Cleanup()
re := suite.Require()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime"))
}

func (suite *apiTestSuite) TestGetCheckerByName() {