
cleanup
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Dec 5, 2024
1 parent 5d6c9f9 commit 9c2b83a
Showing 10 changed files with 158 additions and 43 deletions.
9 changes: 8 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
@@ -8,6 +8,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
@@ -66,6 +67,7 @@ type Cluster struct {
const (
regionLabelGCInterval = time.Hour
requestTimeout = 3 * time.Second
collectWaitTime = time.Minute

// heartbeat related const
heartbeatTaskRunner = "heartbeat-task-runner"
@@ -489,7 +491,12 @@ func (c *Cluster) runUpdateStoreStats() {
func (c *Cluster) runCoordinator() {
defer logutil.LogPanic()
defer c.wg.Done()
c.coordinator.RunUntilStop()
// force wait for 1 minute so that the prepare checker won't be skipped immediately
runCollectWaitTime := collectWaitTime
failpoint.Inject("changeRunCollectWaitTime", func() {
runCollectWaitTime = 1 * time.Second
})
c.coordinator.RunUntilStop(runCollectWaitTime)
}

func (c *Cluster) runMetricsCollectionJob() {
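For context, the `changeRunCollectWaitTime` failpoint added above lets integration tests shrink the forced one-minute collect wait to one second. Below is a minimal sketch of how a test might toggle it, using the same `failpoint.Enable`/`Disable` flow this commit uses in its test suites; the test name and body are hypothetical, not part of the commit.

```go
package server_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

// TestShortCollectWait is a hypothetical test showing how the failpoint added
// above can shrink the one-minute collect wait to one second.
func TestShortCollectWait(t *testing.T) {
	re := require.New(t)
	// Enabling the failpoint makes the injected callback in runCoordinator
	// override runCollectWaitTime with 1*time.Second.
	re.NoError(failpoint.Enable(
		"github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime",
		`return(true)`))
	defer func() {
		re.NoError(failpoint.Disable(
			"github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime"))
	}()

	// ... start the scheduling server and run assertions here ...
}
```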
39 changes: 30 additions & 9 deletions pkg/schedule/coordinator.go
@@ -43,7 +43,8 @@ import (
)

const (
runPrepareCheckerInterval = 3 * time.Second
runSchedulerCheckInterval = 3 * time.Second
collectTimeout = 5 * time.Minute
maxLoadConfigRetries = 10
// pushOperatorTickInterval is the interval at which to try to push the operator.
pushOperatorTickInterval = 500 * time.Millisecond
@@ -207,10 +208,7 @@ func (c *Coordinator) runPrepareChecker() {
defer logutil.LogPanic()
defer c.wg.Done()

ticker := time.NewTicker(runPrepareCheckerInterval)
failpoint.Inject("changeCoordinatorTicker", func() {
ticker.Reset(100 * time.Millisecond)
})
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
@@ -219,16 +217,16 @@
case <-ticker.C:
if !c.prepareChecker.IsPrepared() {
if c.prepareChecker.Check(c.cluster.GetBasicCluster()) {
log.Info("prepare checker is finished")
log.Info("prepare checker is ready")
}
}
}
}
}

// RunUntilStop runs the coordinator until receiving the stop signal.
func (c *Coordinator) RunUntilStop() {
c.Run()
func (c *Coordinator) RunUntilStop(collectWaitTime ...time.Duration) {
c.Run(collectWaitTime...)
<-c.ctx.Done()
log.Info("coordinator is stopping")
c.GetSchedulersController().Wait()
@@ -237,7 +235,25 @@ func (c *Coordinator) RunUntilStop() {
}

// Run starts coordinator.
func (c *Coordinator) Run() {
func (c *Coordinator) Run(collectWaitTime ...time.Duration) {
ticker := time.NewTicker(runSchedulerCheckInterval)
failpoint.Inject("changeCoordinatorTicker", func() {
ticker.Reset(100 * time.Millisecond)
})
defer ticker.Stop()
log.Info("coordinator starts to collect cluster information")
for {
if c.ShouldRun(collectWaitTime...) {
log.Info("coordinator has finished cluster information preparation")
break
}
select {
case <-ticker.C:
case <-c.ctx.Done():
log.Info("coordinator stops running")
return
}
}
log.Info("coordinator starts to run schedulers")
c.InitSchedulers(true)

@@ -552,6 +568,11 @@ func ResetHotSpotMetrics() {
schedulers.HotPendingSum.Reset()
}

// ShouldRun returns true if the coordinator should run.
func (c *Coordinator) ShouldRun(collectWaitTime ...time.Duration) bool {
return c.prepareChecker.Check(c.cluster.GetBasicCluster(), collectWaitTime...)
}

// GetSchedulersController returns the schedulers controller.
func (c *Coordinator) GetSchedulersController() *schedulers.Controller {
return c.schedulers
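The new `Run` body above blocks until `ShouldRun` reports that cluster information preparation is done. Below is a stripped-down sketch of that wait pattern; the names (`waitUntilReady`, `shouldRun`) are placeholders for illustration, not PD's actual API.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitUntilReady polls shouldRun on every tick and returns true once it reports
// ready, or false if ctx is cancelled first. It mirrors the ticker + ctx.Done()
// select that the new Coordinator.Run body uses while collecting cluster info.
func waitUntilReady(ctx context.Context, interval time.Duration, shouldRun func() bool) bool {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if shouldRun() {
			return true
		}
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return false
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	start := time.Now()
	// Becomes ready after ~300ms, well before the 1s context deadline.
	ready := waitUntilReady(ctx, 100*time.Millisecond, func() bool {
		return time.Since(start) > 300*time.Millisecond
	})
	fmt.Println("ready:", ready) // ready: true
}
```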
13 changes: 8 additions & 5 deletions pkg/schedule/core/prepare_checker.go
@@ -40,19 +40,22 @@ func NewPrepareChecker() *PrepareChecker {
}

// Check checks if the coordinator has finished cluster information preparation.
func (checker *PrepareChecker) Check(c *core.BasicCluster) bool {
func (checker *PrepareChecker) Check(c *core.BasicCluster, collectWaitTime ...time.Duration) bool {
checker.Lock()
defer checker.Unlock()
if checker.prepared {
return true
}

if time.Since(checker.start) > collectTimeout {
checker.prepared = true
return true
}
notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
totalRegionsCnt := c.GetTotalRegionCount()
if totalRegionsCnt == 0 {
if len(collectWaitTime) > 0 && time.Since(checker.start) < collectWaitTime[0] {
return false
}
notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
totalRegionsCnt := c.GetTotalRegionCount()
// The number of active regions should be more than the total region count of all stores * core.CollectFactor
if float64(totalRegionsCnt)*core.CollectFactor > float64(notLoadedFromRegionsCnt) {
return false
@@ -63,7 +66,7 @@ func (checker *PrepareChecker) Check(c *core.BasicCluster) bool {
}
storeID := store.GetID()
// It is used to avoid sudden scheduling when the scheduling service has just started.
if float64(store.GetStoreStats().GetRegionCount())*core.CollectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) {
if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*core.CollectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) {
return false
}
if !c.IsStorePrepared(storeID) {
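For orientation, here is a self-contained sketch of the gating order the updated `Check` follows, omitting the per-store checks; the counters and the `collectFactor` value are stand-ins for illustration, not PD's real API.

```go
package main

import (
	"fmt"
	"time"
)

const (
	collectTimeout = 5 * time.Minute
	collectFactor  = 0.9 // stand-in; PD's actual value is core.CollectFactor
)

// check mirrors the decision order of the updated PrepareChecker.Check: already
// prepared or past the collect timeout short-circuits to true, an optional
// minimum collect wait forces false, and otherwise enough regions must have
// been observed from heartbeats (rather than loaded from storage).
func check(start time.Time, prepared bool, notFromStorage, total int, collectWaitTime ...time.Duration) bool {
	if prepared {
		return true
	}
	if time.Since(start) > collectTimeout {
		return true
	}
	if len(collectWaitTime) > 0 && time.Since(start) < collectWaitTime[0] {
		return false
	}
	return float64(total)*collectFactor <= float64(notFromStorage)
}

func main() {
	start := time.Now().Add(-30 * time.Second)
	fmt.Println(check(start, false, 95, 100))              // true: enough regions observed
	fmt.Println(check(start, false, 95, 100, time.Minute)) // false: still inside the forced wait
}
```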
115 changes: 103 additions & 12 deletions server/cluster/cluster_test.go
@@ -2389,12 +2389,12 @@ func (c *testCluster) updateLeaderCount(storeID uint64, leaderCount int) error {
return c.setStore(newStore)
}

func (c *testCluster) addLeaderStore(storeID uint64) error {
func (c *testCluster) addLeaderStore(storeID uint64, leaderCount int) error {
stats := &pdpb.StoreStats{}
newStore := core.NewStoreInfo(&metapb.Store{Id: storeID},
core.SetStoreStats(stats),
core.SetLeaderCount(1),
core.SetLeaderSize(10),
core.SetLeaderCount(leaderCount),
core.SetLeaderSize(int64(leaderCount)*10),
core.SetLastHeartbeatTS(time.Now()),
)

@@ -3029,7 +3029,6 @@ func TestPeerState(t *testing.T) {

tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re)
defer cleanup()
co.GetPrepareChecker().SetPrepared()

// Transfer peer from store 4 to store 1.
re.NoError(tc.addRegionStore(1, 10))
@@ -3069,12 +3068,104 @@ func TestPeerState(t *testing.T) {
waitNoResponse(re, stream)
}

func TestShouldRun(t *testing.T) {
re := require.New(t)

tc, co, cleanup := prepare(nil, nil, nil, re)
tc.RaftCluster.coordinator = co
defer cleanup()

re.NoError(tc.addLeaderStore(1, 5))
re.NoError(tc.addLeaderStore(2, 2))
re.NoError(tc.addLeaderStore(3, 0))
re.NoError(tc.addLeaderStore(4, 0))
re.NoError(tc.LoadRegion(1, 1, 2, 3))
re.NoError(tc.LoadRegion(2, 1, 2, 3))
re.NoError(tc.LoadRegion(3, 1, 2, 3))
re.NoError(tc.LoadRegion(4, 1, 2, 3))
re.NoError(tc.LoadRegion(5, 1, 2, 3))
re.NoError(tc.LoadRegion(6, 2, 1, 4))
re.NoError(tc.LoadRegion(7, 2, 1, 4))
re.False(co.ShouldRun())
re.Equal(2, tc.GetStoreRegionCount(4))

testCases := []struct {
regionID uint64
ShouldRun bool
}{
{1, false},
{2, false},
{3, false},
{4, false},
{5, false},
// store4 needs to collect two regions
{6, false},
{7, true},
}

for _, testCase := range testCases {
r := tc.GetRegion(testCase.regionID)
nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat))
re.NoError(tc.processRegionHeartbeat(core.ContextTODO(), nr))
re.Equal(testCase.ShouldRun, co.ShouldRun())
}
nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}}
newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
re.Error(tc.processRegionHeartbeat(core.ContextTODO(), newRegion))
re.Equal(7, tc.GetClusterNotFromStorageRegionsCnt())
}

func TestShouldRunWithNonLeaderRegions(t *testing.T) {
re := require.New(t)

tc, co, cleanup := prepare(nil, nil, nil, re)
tc.RaftCluster.coordinator = co
defer cleanup()

re.NoError(tc.addLeaderStore(1, 10))
re.NoError(tc.addLeaderStore(2, 0))
re.NoError(tc.addLeaderStore(3, 0))
for i := range 10 {
re.NoError(tc.LoadRegion(uint64(i+1), 1, 2, 3))
}
re.False(co.ShouldRun())
re.Equal(10, tc.GetStoreRegionCount(1))

testCases := []struct {
regionID uint64
ShouldRun bool
}{
{1, false},
{2, false},
{3, false},
{4, false},
{5, false},
{6, false},
{7, false},
{8, false},
{9, true},
}

for _, testCase := range testCases {
r := tc.GetRegion(testCase.regionID)
nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat))
re.NoError(tc.processRegionHeartbeat(core.ContextTODO(), nr))
re.Equal(testCase.ShouldRun, co.ShouldRun())
}
nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}}
newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
re.Error(tc.processRegionHeartbeat(core.ContextTODO(), newRegion))
re.Equal(9, tc.GetClusterNotFromStorageRegionsCnt())

// Now, after the server is prepared, some regions still have no leader.
re.Equal(uint64(0), tc.GetRegion(10).GetLeader().GetStoreId())
}

func TestAddScheduler(t *testing.T) {
re := require.New(t)

tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re)
defer cleanup()
co.GetPrepareChecker().SetPrepared()
controller := co.GetSchedulersController()
re.Len(controller.GetSchedulerNames(), len(sc.DefaultSchedulers))
re.NoError(controller.RemoveScheduler(types.BalanceLeaderScheduler.String()))
@@ -3086,9 +3177,9 @@ func TestAddScheduler(t *testing.T) {
stream := mockhbstream.NewHeartbeatStream()

// Add stores 1,2,3
re.NoError(tc.addLeaderStore(1))
re.NoError(tc.addLeaderStore(2))
re.NoError(tc.addLeaderStore(3))
re.NoError(tc.addLeaderStore(1, 1))
re.NoError(tc.addLeaderStore(2, 1))
re.NoError(tc.addLeaderStore(3, 1))
// Add regions 1 with leader in store 1 and followers in stores 2,3
re.NoError(tc.addLeaderRegion(1, 1, 2, 3))
// Add regions 2 with leader in store 2 and followers in stores 1,3
@@ -3152,8 +3243,8 @@ func TestPersistScheduler(t *testing.T) {
defer cleanup()
defaultCount := len(sc.DefaultSchedulers)
// Add stores 1,2
re.NoError(tc.addLeaderStore(1))
re.NoError(tc.addLeaderStore(2))
re.NoError(tc.addLeaderStore(1, 1))
re.NoError(tc.addLeaderStore(2, 1))

controller := co.GetSchedulersController()
re.Len(controller.GetSchedulerNames(), defaultCount)
@@ -3268,8 +3359,8 @@ func TestRemoveScheduler(t *testing.T) {
defer cleanup()

// Add stores 1,2
re.NoError(tc.addLeaderStore(1))
re.NoError(tc.addLeaderStore(2))
re.NoError(tc.addLeaderStore(1, 1))
re.NoError(tc.addLeaderStore(2, 1))
defaultCount := len(sc.DefaultSchedulers)
controller := co.GetSchedulersController()
re.Len(controller.GetSchedulerNames(), defaultCount)
2 changes: 2 additions & 0 deletions tests/integrations/mcs/scheduling/api_test.go
@@ -41,13 +41,15 @@ func TestAPI(t *testing.T) {
func (suite *apiTestSuite) SetupSuite() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime", `return(true)`))
suite.env = tests.NewSchedulingTestEnvironment(suite.T())
}

func (suite *apiTestSuite) TearDownSuite() {
suite.env.Cleanup()
re := suite.Require()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime"))
}

func (suite *apiTestSuite) TestGetCheckerByName() {
3 changes: 0 additions & 3 deletions tests/integrations/mcs/scheduling/config_test.go
@@ -21,7 +21,6 @@ import (
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/cache"
@@ -37,7 +36,6 @@
"github.com/tikv/pd/pkg/versioninfo"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/server/api"
"go.uber.org/zap"
)

type configTestSuite struct {
@@ -135,7 +133,6 @@ func (suite *configTestSuite) TestConfigWatch() {
// Manually trigger the config persistence in the PD API server side.
func persistConfig(re *require.Assertions, pdLeaderServer *tests.TestServer) {
err := pdLeaderServer.GetPersistOptions().Persist(pdLeaderServer.GetServer().GetStorage())
log.Info("persistConfig", zap.Reflect("opts", pdLeaderServer.GetPersistOptions()))
re.NoError(err)
}

8 changes: 3 additions & 5 deletions tests/integrations/mcs/scheduling/server_test.go
@@ -61,6 +61,7 @@ func (suite *serverTestSuite) SetupSuite() {
var err error
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1)
@@ -81,6 +82,7 @@ func (suite *serverTestSuite) TearDownSuite() {
suite.cluster.Destroy()
suite.cancel()
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
}

@@ -498,13 +500,9 @@ func (suite *serverTestSuite) TestStoreLimit() {
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
leaderServer := suite.pdLeader.GetServer()
tc.WaitForPrimaryServing(re)
testutil.Eventually(re, func() bool {
return leaderServer.GetRaftCluster().IsServiceIndependent(constant.SchedulingServiceName)
})
oc := tc.GetPrimaryServer().GetCluster().GetCoordinator().GetOperatorController()

leaderServer := suite.pdLeader.GetServer()
conf := leaderServer.GetReplicationConfig().Clone()
conf.MaxReplicas = 1
leaderServer.SetReplicationConfig(*conf)
2 changes: 1 addition & 1 deletion tests/scheduling_cluster.go
@@ -107,7 +107,7 @@ func (tc *TestSchedulingCluster) WaitForPrimaryServing(re *require.Assertions) *
var primary *scheduling.Server
testutil.Eventually(re, func() bool {
for _, server := range tc.servers {
if server.IsServing() && server.GetCoordinator().AreSchedulersInitialized() {
if server.IsServing() {
primary = server
return true
}
