Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: reset prepare checker once the cache is reset #8860

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
43 changes: 0 additions & 43 deletions pkg/btree/btree_generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ package btree

import (
"flag"
"fmt"
"math/rand"
"reflect"
"sort"
Expand Down Expand Up @@ -215,48 +214,6 @@ func TestBTreeG(t *testing.T) {
}
}

func ExampleBTreeG() {
tr := NewG[Int](*btreeDegree)
for i := Int(0); i < 10; i++ {
tr.ReplaceOrInsert(i)
}
fmt.Println("len: ", tr.Len())
v, ok := tr.Get(3)
fmt.Println("get3: ", v, ok)
v, ok = tr.Get(100)
fmt.Println("get100: ", v, ok)
v, ok = tr.Delete(4)
fmt.Println("del4: ", v, ok)
v, ok = tr.Delete(100)
fmt.Println("del100: ", v, ok)
v, ok = tr.ReplaceOrInsert(5)
fmt.Println("replace5: ", v, ok)
v, ok = tr.ReplaceOrInsert(100)
fmt.Println("replace100:", v, ok)
v, ok = tr.Min()
fmt.Println("min: ", v, ok)
v, ok = tr.DeleteMin()
fmt.Println("delmin: ", v, ok)
v, ok = tr.Max()
fmt.Println("max: ", v, ok)
v, ok = tr.DeleteMax()
fmt.Println("delmax: ", v, ok)
fmt.Println("len: ", tr.Len())
// Output:
// len: 10
// get3: 3 true
// get100: 0 false
// del4: 4 true
// del100: 0 false
// replace5: 5 true
// replace100: 0 false
// min: 0 true
// delmin: 0 true
// max: 100 true
// delmax: 100 true
// len: 8
}

func TestDeleteMinG(t *testing.T) {
tr := NewG[Int](3)
for _, v := range perm(100) {
Expand Down
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func deleteAllRegionCache(c *gin.Context) {
return
}
cluster.ResetRegionCache()
cluster.ResetPrepared()
c.String(http.StatusOK, "All regions are removed from server cache.")
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,11 @@ func (c *Cluster) SetPrepared() {
c.coordinator.GetPrepareChecker().SetPrepared()
}

// ResetPrepared reset the prepare checker.
func (c *Cluster) ResetPrepared() {
c.coordinator.GetPrepareChecker().ResetPrepared()
}

// IsSchedulingHalted returns whether the scheduling is halted.
// Currently, the microservice scheduling is halted when:
// - The `HaltScheduling` persist option is set to true.
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (s *Server) updateAPIServerMemberLoop() {
}
if s.cluster.SwitchAPIServerLeader(pdpb.NewPDClient(cc)) {
if status.Leader != curLeader {
log.Info("switch leader", zap.String("leader-id", fmt.Sprintf("%x", ep.ID)), zap.String("endpoint", ep.ClientURLs[0]))
log.Info("switch leader", zap.String("current-leader", fmt.Sprintf("%x", curLeader)), zap.String("new-leader-id", fmt.Sprintf("%x", ep.ID)), zap.String("endpoint", ep.ClientURLs[0]))
}
curLeader = ep.ID
break
Expand Down
3 changes: 3 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,3 +911,6 @@ func (mc *Cluster) ObserveRegionsStats() {
storeIDs, writeBytesRates, writeKeysRates := mc.BasicCluster.GetStoresWriteRate()
mc.HotStat.ObserveRegionsStats(storeIDs, writeBytesRates, writeKeysRates)
}

// ResetPrepared mocks method.
func (*Cluster) ResetPrepared() {}
11 changes: 9 additions & 2 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/keyutil"
"github.com/tikv/pd/pkg/utils/logutil"
)
Expand Down Expand Up @@ -91,11 +90,15 @@ type Controller struct {
// patrolRegionScanLimit is the limit of regions to scan.
// It is calculated by the number of regions.
patrolRegionScanLimit int
prepareChecker *sche.PrepareChecker
}

// NewController create a new Controller.
func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
func NewController(ctx context.Context, cluster sche.CheckerCluster, opController *operator.Controller, prepareChecker *sche.PrepareChecker) *Controller {
pendingProcessedRegions := cache.NewIDTTL(ctx, time.Minute, 3*time.Minute)
conf := cluster.GetCheckerConfig()
ruleManager := cluster.GetRuleManager()
labeler := cluster.GetRegionLabeler()
c := &Controller{
ctx: ctx,
cluster: cluster,
Expand All @@ -113,6 +116,7 @@ func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config
patrolRegionContext: &PatrolRegionContext{},
interval: cluster.GetCheckerConfig().GetPatrolRegionInterval(),
patrolRegionScanLimit: calculateScanLimit(cluster),
prepareChecker: prepareChecker,
}
c.duration.Store(time.Duration(0))
return c
Expand All @@ -136,6 +140,9 @@ func (c *Controller) PatrolRegions() {
case <-ticker.C:
c.updateTickerIfNeeded(ticker)
c.updatePatrolWorkersIfNeeded()
if !c.prepareChecker.IsPrepared() {
continue
}
if c.cluster.IsSchedulingHalted() {
for len(c.patrolRegionContext.regionChan) > 0 {
<-c.patrolRegionContext.regionChan
Expand Down
36 changes: 29 additions & 7 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Coordinator struct {
schedulersInitialized bool

cluster sche.ClusterInformer
prepareChecker *prepareChecker
prepareChecker *sche.PrepareChecker
checkers *checker.Controller
regionScatterer *scatter.RegionScatterer
regionSplitter *splitter.RegionSplitter
Expand All @@ -82,15 +82,16 @@ type Coordinator struct {
// NewCoordinator creates a new Coordinator.
func NewCoordinator(parentCtx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator {
ctx, cancel := context.WithCancel(parentCtx)
prepareChecker := sche.NewPrepareChecker()
opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), hbStreams)
schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController)
checkers := checker.NewController(ctx, cluster, cluster.GetCheckerConfig(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController)
schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController, prepareChecker)
checkers := checker.NewController(ctx, cluster, opController, prepareChecker)
return &Coordinator{
ctx: ctx,
cancel: cancel,
schedulersInitialized: false,
cluster: cluster,
prepareChecker: newPrepareChecker(),
prepareChecker: prepareChecker,
checkers: checkers,
regionScatterer: scatter.NewRegionScatterer(ctx, cluster, opController, checkers.AddPendingProcessedRegions),
regionSplitter: splitter.NewRegionSplitter(cluster, splitter.NewSplitRegionsHandler(cluster, opController), checkers.AddPendingProcessedRegions),
Expand Down Expand Up @@ -205,6 +206,26 @@ func (c *Coordinator) driveSlowNodeScheduler() {
}
}

func (c *Coordinator) runPrepareChecker() {
defer logutil.LogPanic()
defer c.wg.Done()

ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
if !c.prepareChecker.IsPrepared() {
if c.prepareChecker.Check(c.cluster.GetBasicCluster()) {
log.Info("prepare checker is ready")
}
}
}
}
}

// RunUntilStop runs the coordinator until receiving the stop signal.
func (c *Coordinator) RunUntilStop(collectWaitTime ...time.Duration) {
c.Run(collectWaitTime...)
Expand Down Expand Up @@ -238,7 +259,8 @@ func (c *Coordinator) Run(collectWaitTime ...time.Duration) {
log.Info("coordinator starts to run schedulers")
c.InitSchedulers(true)

c.wg.Add(4)
c.wg.Add(5)
go c.runPrepareChecker()
// Starts to patrol regions.
go c.PatrolRegions()
// Checks suspect key ranges
Expand Down Expand Up @@ -550,7 +572,7 @@ func ResetHotSpotMetrics() {

// ShouldRun returns true if the coordinator should run.
func (c *Coordinator) ShouldRun(collectWaitTime ...time.Duration) bool {
return c.prepareChecker.check(c.cluster.GetBasicCluster(), collectWaitTime...)
return c.prepareChecker.Check(c.cluster.GetBasicCluster(), collectWaitTime...)
}

// GetSchedulersController returns the schedulers controller.
Expand Down Expand Up @@ -618,7 +640,7 @@ func (c *Coordinator) GetRuleChecker() *checker.RuleChecker {
}

// GetPrepareChecker returns the prepare checker.
func (c *Coordinator) GetPrepareChecker() *prepareChecker {
func (c *Coordinator) GetPrepareChecker() *sche.PrepareChecker {
return c.prepareChecker
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/core/cluster_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ type SchedulerCluster interface {
buckets.BucketStatInformer

GetSchedulerConfig() sc.SchedulerConfigProvider
GetRegionLabeler() *labeler.RegionLabeler
GetStoreConfig() sc.StoreConfigProvider
}

Expand All @@ -61,6 +60,7 @@ type SharedCluster interface {
GetBasicCluster() *core.BasicCluster
GetSharedConfig() sc.SharedConfigProvider
GetRuleManager() *placement.RuleManager
GetRegionLabeler() *labeler.RegionLabeler
AllocID() (uint64, error)
IsSchedulingHalted() bool
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schedule
package core

import (
"time"
Expand All @@ -25,25 +25,30 @@ import (
"github.com/tikv/pd/pkg/utils/syncutil"
)

type prepareChecker struct {
const collectTimeout = 5 * time.Minute

// PrepareChecker is used to check if the coordinator has finished cluster information preparation.
type PrepareChecker struct {
syncutil.RWMutex
start time.Time
prepared bool
}

func newPrepareChecker() *prepareChecker {
return &prepareChecker{
// NewPrepareChecker creates a new PrepareChecker.
func NewPrepareChecker() *PrepareChecker {
return &PrepareChecker{
start: time.Now(),
}
}

// Before starting up the scheduler, we need to take the proportion of the regions on each store into consideration.
func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...time.Duration) bool {
// Check checks if the coordinator has finished cluster information preparation.
func (checker *PrepareChecker) Check(c *core.BasicCluster, collectWaitTime ...time.Duration) bool {
checker.Lock()
defer checker.Unlock()
if checker.prepared {
return true
}

if time.Since(checker.start) > collectTimeout {
checker.prepared = true
return true
Expand Down Expand Up @@ -76,15 +81,23 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti
}

// IsPrepared returns whether the coordinator is prepared.
func (checker *prepareChecker) IsPrepared() bool {
func (checker *PrepareChecker) IsPrepared() bool {
checker.RLock()
defer checker.RUnlock()
return checker.prepared
}

// SetPrepared is for test purpose
func (checker *prepareChecker) SetPrepared() {
func (checker *PrepareChecker) SetPrepared() {
checker.Lock()
defer checker.Unlock()
checker.prepared = true
}

// ResetPrepared is for test purpose
func (checker *PrepareChecker) ResetPrepared() {
checker.Lock()
defer checker.Unlock()
checker.prepared = false
checker.start = time.Now()
}
7 changes: 6 additions & 1 deletion pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,19 @@ type Controller struct {
// which will only be initialized and used in the API service mode now.
schedulerHandlers map[string]http.Handler
opController *operator.Controller
prepareChecker *sche.PrepareChecker
}

// NewController creates a scheduler controller.
func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage endpoint.ConfigStorage, opController *operator.Controller) *Controller {
func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage endpoint.ConfigStorage, opController *operator.Controller, prepareChecker *sche.PrepareChecker) *Controller {
return &Controller{
ctx: ctx,
cluster: cluster,
storage: storage,
schedulers: make(map[string]*ScheduleController),
schedulerHandlers: make(map[string]http.Handler),
opController: opController,
prepareChecker: prepareChecker,
}
}

Expand Down Expand Up @@ -370,6 +372,9 @@ func (c *Controller) runScheduler(s *ScheduleController) {
for {
select {
case <-ticker.C:
if !c.prepareChecker.IsPrepared() {
continue
}
diagnosable := s.IsDiagnosticAllowed()
if !s.AllowSchedule(diagnosable) {
continue
Expand Down
2 changes: 2 additions & 0 deletions pkg/unsaferecovery/unsafe_recovery_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ const (
type cluster interface {
core.StoreSetInformer

ResetPrepared()
ResetRegionCache()
AllocID() (uint64, error)
BuryStore(storeID uint64, forceBury bool) error
Expand Down Expand Up @@ -547,6 +548,7 @@ func (u *Controller) changeStage(stage stage) {
if u.step > 1 {
// == 1 means no operation has done, no need to invalid cache
u.cluster.ResetRegionCache()
u.cluster.ResetPrepared()
}
output.Info = "Unsafe recovery Finished"
output.Details = u.getAffectedTableDigest()
Expand Down
1 change: 1 addition & 0 deletions server/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Reque
var err error
rc := getCluster(r)
rc.ResetRegionCache()
rc.ResetPrepared()
msg := "All regions are removed from server cache."
if rc.IsServiceIndependent(constant.SchedulingServiceName) {
err = h.deleteRegionCacheInSchedulingServer()
Expand Down
3 changes: 1 addition & 2 deletions server/api/diagnostic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (suite *diagnosticTestSuite) SetupSuite() {
mustBootstrapCluster(re, suite.svr)
mustPutStore(re, suite.svr, 1, metapb.StoreState_Up, metapb.NodeState_Serving, nil)
mustPutStore(re, suite.svr, 2, metapb.StoreState_Up, metapb.NodeState_Serving, nil)
suite.svr.GetRaftCluster().GetCoordinator().GetPrepareChecker().SetPrepared()
}

func (suite *diagnosticTestSuite) TearDownSuite() {
Expand Down Expand Up @@ -130,9 +131,7 @@ func (suite *diagnosticTestSuite) TestSchedulerDiagnosticAPI() {
re.NoError(err)
suite.checkStatus("pending", balanceRegionURL)

fmt.Println("before put region")
mustPutRegion(re, suite.svr, 1000, 1, []byte("a"), []byte("b"), core.SetApproximateSize(60))
fmt.Println("after put region")
suite.checkStatus("normal", balanceRegionURL)

deleteURL := fmt.Sprintf("%s/%s", suite.schedulerPrefix, types.BalanceRegionScheduler.String())
Expand Down
7 changes: 4 additions & 3 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2448,13 +2448,14 @@ func (c *RaftCluster) GetProgressByID(storeID string) (action string, process, l
}
progress := c.progressManager.GetProgresses(filter)
if len(progress) != 0 {
process, ls, cs, err = c.progressManager.Status(progress[0])
pg := progress[0]
process, ls, cs, err = c.progressManager.Status(pg)
if err != nil {
return
}
if strings.HasPrefix(progress[0], removingAction) {
if strings.HasPrefix(pg, removingAction) {
action = removingAction
} else if strings.HasPrefix(progress[0], preparingAction) {
} else if strings.HasPrefix(pg, preparingAction) {
action = preparingAction
}
return
Expand Down
Loading
Loading