diff --git a/pkg/schedule/prepare_checker.go b/pkg/schedule/prepare_checker.go index 6a28c04ca85a..f691432c72c3 100644 --- a/pkg/schedule/prepare_checker.go +++ b/pkg/schedule/prepare_checker.go @@ -15,6 +15,7 @@ package schedule import ( + "fmt" "time" "github.com/pingcap/log" @@ -39,6 +40,7 @@ func newPrepareChecker() *prepareChecker { func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...time.Duration) bool { checker.Lock() defer checker.Unlock() + log.Info("check prepare checker", zap.Bool("prepared", checker.prepared), zap.String("count", fmt.Sprintf("%d", c.GetTotalRegionCount())), zap.String("not-from-storage-count", fmt.Sprintf("%d", c.GetClusterNotFromStorageRegionsCnt()))) if checker.prepared { return true } diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 14df5ff8eeab..d52630718b59 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -1,17 +1,3 @@ -// Copyright 2018 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package api import ( @@ -33,6 +19,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -169,7 +156,6 @@ func (suite *middlewareTestSuite) TestRequestInfoMiddleware() { resp.Body.Close() re.NoError(err) re.Equal(http.StatusOK, resp.StatusCode) - re.Equal("Profile", resp.Header.Get("service-label")) re.Equal("{\"seconds\":[\"1\"]}", resp.Header.Get("url-param")) re.Equal("{\"testkey\":\"testvalue\"}", resp.Header.Get("body-param")) @@ -1100,3 +1086,97 @@ func sendRequest(re *require.Assertions, url string, method string, statusCode i resp.Body.Close() return output } + +func TestDeleteAllRegionCacheScheduling(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 1) + re.NoError(err) + defer cluster.Destroy() + + // 启动服务器 + re.NoError(cluster.RunInitialServers()) + re.NotEmpty(cluster.WaitLeader()) + + leaderServer := cluster.GetLeaderServer() + re.NotNil(leaderServer) + + // 引导启动集群 + re.NoError(leaderServer.BootstrapCluster()) + + rc := leaderServer.GetRaftCluster() + re.NotNil(rc) + + // 准备测试数据:添加 stores + for i := uint64(1); i <= 3; i++ { + store := &metapb.Store{ + Id: i, + Address: fmt.Sprintf("tikv%d", i), + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + } + tests.MustPutStore(re, cluster, store) + } + + // 创建测试 region + region := &metapb.Region{ + Id: 1, + StartKey: []byte(""), + EndKey: []byte("b"), + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 12, StoreId: 2, Role: metapb.PeerRole_Voter}, + }, + } + + // 创建 region 信息并模拟心跳 + regionInfo := core.NewRegionInfo(region, region.Peers[0]) + + // 等待集群初始化完成 + testutil.Eventually(re, func() bool { + return rc.GetCoordinator() != nil + }) + + // 发送 region 心跳 + rc.HandleRegionHeartbeat(regionInfo) + + // 等待 coordinator 准备完成 + testutil.Eventually(re, func() bool { + return rc.GetCoordinator().GetPrepareChecker().IsPrepared() + }, testutil.WithWaitFor(10*time.Second)) + + // 等待调度器生成操作,增加超时时间和检查间隔 + testutil.Eventually(re, func() bool { + count := rc.GetOperatorController().OperatorCount(operator.OpRegion) + t.Logf("current operator count: %d", count) + return count > 0 + }, testutil.WithWaitFor(30*time.Second), testutil.WithTickInterval(500*time.Millisecond)) + + // 调用 API 删除 region cache + addr := leaderServer.GetAddr() + "/pd/api/v1/admin/cache/regions" + t.Logf("calling API: DELETE %s", addr) + output := sendRequest(re, addr, http.MethodDelete, http.StatusOK) + re.Empty(output) + + // 验证调度被重置 + re.Equal(0, int(rc.GetOperatorController().OperatorCount(operator.OpRegion))) + + // 模拟持续的 region 心跳上报 + for i := 0; i < 10; i++ { + rc.HandleRegionHeartbeat(regionInfo) + time.Sleep(100 * time.Millisecond) + } + + // 验证调度器最终会重新生成调度操作 + testutil.Eventually(re, func() bool { + count := rc.GetOperatorController().OperatorCount(operator.OpRegion) + t.Logf("final operator count: %d", count) + return count > 0 + }, testutil.WithWaitFor(10*time.Second), testutil.WithTickInterval(500*time.Millisecond)) +}