Skip to content

Commit

Permalink
Refine the test
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Nov 24, 2023
1 parent 33ea03a commit aced6d8
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 21 deletions.
2 changes: 1 addition & 1 deletion client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func (c *client) SetPlacementRule(ctx context.Context, rule *Rule) error {
http.MethodPost, bytes.NewBuffer(ruleJSON), nil)
}

// SetPlacementRules sets the placement rules in batch.
// SetPlacementRuleInBatch sets the placement rules in batch.
func (c *client) SetPlacementRuleInBatch(ctx context.Context, ruleOps []*RuleOp) error {
ruleOpsJSON, err := json.Marshal(ruleOps)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/utils/tsoutil/tsoutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ const (
logicalBits = (1 << physicalShiftBits) - 1
)

// TimeToTS converts a `time.Time` to an `uint64` TS.
func TimeToTS(t time.Time) uint64 {
return ComposeTS(t.UnixNano()/int64(time.Millisecond), 0)
}

// ParseTS parses the ts to (physical,logical).
func ParseTS(ts uint64) (time.Time, uint64) {
physical, logical := ParseTSUint64(ts)
Expand Down
6 changes: 4 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2241,7 +2241,9 @@ func (c *RaftCluster) SetMinResolvedTS(storeID, minResolvedTS uint64) error {
return nil
}

func (c *RaftCluster) checkAndUpdateMinResolvedTS() (uint64, bool) {
// CheckAndUpdateMinResolvedTS checks and updates the min resolved ts of the cluster.
// This is exported for testing purpose.
func (c *RaftCluster) CheckAndUpdateMinResolvedTS() (uint64, bool) {
c.Lock()
defer c.Unlock()

Expand Down Expand Up @@ -2284,7 +2286,7 @@ func (c *RaftCluster) runMinResolvedTSJob() {
case <-ticker.C:
interval = c.opt.GetMinResolvedTSPersistenceInterval()
if interval != 0 {
if current, needPersist := c.checkAndUpdateMinResolvedTS(); needPersist {
if current, needPersist := c.CheckAndUpdateMinResolvedTS(); needPersist {
c.storage.SaveMinResolvedTS(current)
}
} else {
Expand Down
42 changes: 24 additions & 18 deletions tests/integrations/client/http_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http"
"sort"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/tests"
)

Expand Down Expand Up @@ -73,27 +75,30 @@ func (suite *httpClientTestSuite) TearDownSuite() {

func (suite *httpClientTestSuite) TestGetMinResolvedTSByStoresIDs() {
re := suite.Require()
var (
minResolvedTS uint64
storeMinResolvedTSMap map[uint64]uint64
err error
)
// Wait for the cluster-level min resolved TS to be initialized.
testMinResolvedTS := tsoutil.TimeToTS(time.Now())
raftCluster := suite.cluster.GetLeaderServer().GetRaftCluster()
err := raftCluster.SetMinResolvedTS(1, testMinResolvedTS)
re.NoError(err)
// Make sure the min resolved TS is updated.
testutil.Eventually(re, func() bool {
minResolvedTS, storeMinResolvedTSMap, err = suite.client.GetMinResolvedTSByStoresIDs(suite.ctx, nil)
re.NoError(err)
return minResolvedTS > 0 && len(storeMinResolvedTSMap) == 0
minResolvedTS, _ := raftCluster.CheckAndUpdateMinResolvedTS()
return minResolvedTS == testMinResolvedTS
})
// Wait for the cluster-level min resolved TS to be initialized.
minResolvedTS, storeMinResolvedTSMap, err := suite.client.GetMinResolvedTSByStoresIDs(suite.ctx, nil)
re.NoError(err)
re.Equal(testMinResolvedTS, minResolvedTS)
re.Empty(storeMinResolvedTSMap)
// Get the store-level min resolved TS.
minResolvedTS, storeMinResolvedTSMap, err = suite.client.GetMinResolvedTSByStoresIDs(suite.ctx, []uint64{1})
re.NoError(err)
re.Greater(minResolvedTS, uint64(0))
re.Equal(testMinResolvedTS, minResolvedTS)
re.Len(storeMinResolvedTSMap, 1)
re.Equal(minResolvedTS, storeMinResolvedTSMap[1])
// Get the store-level min resolved TS with an invalid store ID.
minResolvedTS, storeMinResolvedTSMap, err = suite.client.GetMinResolvedTSByStoresIDs(suite.ctx, []uint64{1, 2})
re.NoError(err)
re.Greater(minResolvedTS, uint64(0))
re.Equal(testMinResolvedTS, minResolvedTS)
re.Len(storeMinResolvedTSMap, 2)
re.Equal(minResolvedTS, storeMinResolvedTSMap[1])
re.Equal(uint64(math.MaxUint64), storeMinResolvedTSMap[2])
Expand Down Expand Up @@ -265,16 +270,17 @@ func (suite *httpClientTestSuite) TestRegionLabel() {

func (suite *httpClientTestSuite) TestAccelerateSchedule() {
re := suite.Require()
r1 := core.NewTestRegionInfo(10, 1, []byte("a1"), []byte("a2"))
r2 := core.NewTestRegionInfo(11, 1, []byte("a2"), []byte("a3"))
raftCluster := suite.cluster.GetLeaderServer().GetRaftCluster()
err := raftCluster.HandleRegionHeartbeat(r1)
re.NoError(err)
err = raftCluster.HandleRegionHeartbeat(r2)
re.NoError(err)
for _, region := range []*core.RegionInfo{
core.NewTestRegionInfo(10, 1, []byte("a1"), []byte("a2")),
core.NewTestRegionInfo(11, 1, []byte("a2"), []byte("a3")),
} {
err := raftCluster.HandleRegionHeartbeat(region)
re.NoError(err)
}
suspectRegions := raftCluster.GetSuspectRegions()
re.Len(suspectRegions, 0)
err = suite.client.AccelerateSchedule(suite.ctx, []byte("a1"), []byte("a2"))
err := suite.client.AccelerateSchedule(suite.ctx, []byte("a1"), []byte("a2"))
re.NoError(err)
suspectRegions = raftCluster.GetSuspectRegions()
re.Len(suspectRegions, 1)
Expand Down

0 comments on commit aced6d8

Please sign in to comment.