Skip to content

Commit

Permalink
--wip-- [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx committed Nov 29, 2024
1 parent 76b797d commit e60afe6
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 15 deletions.
2 changes: 2 additions & 0 deletions pkg/schedule/prepare_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package schedule

import (
"fmt"
"time"

"github.com/pingcap/log"
Expand All @@ -39,6 +40,7 @@ func newPrepareChecker() *prepareChecker {
func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...time.Duration) bool {
checker.Lock()
defer checker.Unlock()
log.Info("check prepare checker", zap.Bool("prepared", checker.prepared), zap.String("count", fmt.Sprintf("%d", c.GetTotalRegionCount())), zap.String("not-from-storage-count", fmt.Sprintf("%d", c.GetClusterNotFromStorageRegionsCnt())))
if checker.prepared {
return true
}
Expand Down
110 changes: 95 additions & 15 deletions tests/server/api/api_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,3 @@
// Copyright 2018 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
Expand All @@ -33,6 +19,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -169,7 +156,6 @@ func (suite *middlewareTestSuite) TestRequestInfoMiddleware() {
resp.Body.Close()
re.NoError(err)
re.Equal(http.StatusOK, resp.StatusCode)

re.Equal("Profile", resp.Header.Get("service-label"))
re.Equal("{\"seconds\":[\"1\"]}", resp.Header.Get("url-param"))
re.Equal("{\"testkey\":\"testvalue\"}", resp.Header.Get("body-param"))
Expand Down Expand Up @@ -1100,3 +1086,97 @@ func sendRequest(re *require.Assertions, url string, method string, statusCode i
resp.Body.Close()
return output
}

func TestDeleteAllRegionCacheScheduling(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 1)
re.NoError(err)
defer cluster.Destroy()

// 启动服务器
re.NoError(cluster.RunInitialServers())
re.NotEmpty(cluster.WaitLeader())

leaderServer := cluster.GetLeaderServer()
re.NotNil(leaderServer)

// 引导启动集群
re.NoError(leaderServer.BootstrapCluster())

rc := leaderServer.GetRaftCluster()
re.NotNil(rc)

// 准备测试数据:添加 stores
for i := uint64(1); i <= 3; i++ {
store := &metapb.Store{
Id: i,
Address: fmt.Sprintf("tikv%d", i),
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
LastHeartbeat: time.Now().UnixNano(),
}
tests.MustPutStore(re, cluster, store)
}

// 创建测试 region
region := &metapb.Region{
Id: 1,
StartKey: []byte(""),
EndKey: []byte("b"),
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
Peers: []*metapb.Peer{
{Id: 11, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 12, StoreId: 2, Role: metapb.PeerRole_Voter},
},
}

// 创建 region 信息并模拟心跳
regionInfo := core.NewRegionInfo(region, region.Peers[0])

// 等待集群初始化完成
testutil.Eventually(re, func() bool {
return rc.GetCoordinator() != nil
})

// 发送 region 心跳
rc.HandleRegionHeartbeat(regionInfo)

// 等待 coordinator 准备完成
testutil.Eventually(re, func() bool {
return rc.GetCoordinator().GetPrepareChecker().IsPrepared()
}, testutil.WithWaitFor(10*time.Second))

// 等待调度器生成操作,增加超时时间和检查间隔
testutil.Eventually(re, func() bool {
count := rc.GetOperatorController().OperatorCount(operator.OpRegion)
t.Logf("current operator count: %d", count)
return count > 0
}, testutil.WithWaitFor(30*time.Second), testutil.WithTickInterval(500*time.Millisecond))

// 调用 API 删除 region cache
addr := leaderServer.GetAddr() + "/pd/api/v1/admin/cache/regions"
t.Logf("calling API: DELETE %s", addr)
output := sendRequest(re, addr, http.MethodDelete, http.StatusOK)
re.Empty(output)

// 验证调度被重置
re.Equal(0, int(rc.GetOperatorController().OperatorCount(operator.OpRegion)))

// 模拟持续的 region 心跳上报
for i := 0; i < 10; i++ {
rc.HandleRegionHeartbeat(regionInfo)
time.Sleep(100 * time.Millisecond)
}

// 验证调度器最终会重新生成调度操作
testutil.Eventually(re, func() bool {
count := rc.GetOperatorController().OperatorCount(operator.OpRegion)
t.Logf("final operator count: %d", count)
return count > 0
}, testutil.WithWaitFor(10*time.Second), testutil.WithTickInterval(500*time.Millisecond))
}

0 comments on commit e60afe6

Please sign in to comment.