Skip to content

Commit

Permalink
region cache: check if the pd returned regions covers the ranges (#1377
Browse files Browse the repository at this point in the history
…) (#1380)

* cherry pick #1377 to tidb-7.1

Signed-off-by: you06 <[email protected]>

* lint

Signed-off-by: you06 <[email protected]>

* rename function

Signed-off-by: you06 <[email protected]>

* warn when no leader regions missing

Signed-off-by: you06 <[email protected]>

---------

Signed-off-by: you06 <[email protected]>
  • Loading branch information
you06 authored Jul 10, 2024
1 parent c6d7a48 commit ce640b9
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 2 deletions.
57 changes: 55 additions & 2 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1784,7 +1784,19 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
metrics.RegionCacheCounterWithScanRegionsOK.Inc()

if len(regionsInfo) == 0 {
return nil, errors.New("PD returned no region")
return nil, errors.Errorf(
"PD returned no region, startKey: %q, endKey: %q, limit: %d, encode_start_key: %q, encode_end_key: %q",
util.HexRegionKeyStr(startKey), util.HexRegionKeyStr(endKey), limit,
util.HexRegionKeyStr(c.codec.EncodeRegionKey(startKey)), util.HexRegionKeyStr(c.codec.EncodeRegionKey(endKey)),
)
}

if regionsHaveGapInRange(startKey, endKey, regionsInfo, limit) {
backoffErr = errors.Errorf(
"PD returned regions have gaps, startKey: %q, endKey: %q, limit: %d",
startKey, endKey, limit,
)
continue
}
regions := make([]*Region, 0, len(regionsInfo))
for _, r := range regionsInfo {
Expand All @@ -1802,13 +1814,54 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
return nil, errors.New("receive Regions with no peer")
}
if len(regions) < len(regionsInfo) {
logutil.Logger(context.Background()).Debug(
logutil.Logger(context.Background()).Warn(
"regionCache: scanRegion finished but some regions has no leader.")
}
return regions, nil
}
}

// regionsHaveGapInRange checks if the loaded regions can fully cover the key ranges.
// If there are any gaps between the regions, it returns true, then the requests might be retried.
// TODO: remove this function after PD client supports gap detection and handling it.
func regionsHaveGapInRange(start, end []byte, regionsInfo []*pd.Region, limit int) bool {
if len(regionsInfo) == 0 {
return true
}
var lastEndKey []byte
for i, r := range regionsInfo {
if r.Meta == nil {
return true
}
if i == 0 {
if bytes.Compare(r.Meta.StartKey, start) > 0 {
// there is a gap between first returned region's start_key and start key.
return true
}
}
if i > 0 && bytes.Compare(r.Meta.StartKey, lastEndKey) > 0 {
// there is a gap between two regions.
return true
}
if len(r.Meta.EndKey) == 0 {
// the current region contains all the rest ranges.
return false
}
// note lastEndKey never be empty.
lastEndKey = r.Meta.EndKey
}
if limit > 0 && len(regionsInfo) == limit {
// the regionsInfo is limited by the limit, so there may be some ranges not covered.
// The rest range will be loaded in the next scanRegions call.
return false
}
if len(end) == 0 {
// the end key of the range is empty, but we can't cover it.
return true
}
return bytes.Compare(lastEndKey, end) < 0
}

// GetCachedRegionWithRLock returns region with lock.
func (c *RegionCache) GetCachedRegionWithRLock(regionID RegionVerID) (r *Region) {
c.mu.RLock()
Expand Down
106 changes: 106 additions & 0 deletions internal/locate/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"unsafe"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -1782,3 +1783,108 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCacheConcurrency() {

cancel()
}

func (s *testRegionCacheSuite) TestRangesAreCoveredCheck() {
check := func(ranges []string, regions []string, limit int, expect bool) {
s.Len(ranges, 2)
rgs := make([]*pd.Region, 0, len(regions))
for i := 0; i < len(regions); i += 2 {
rgs = append(rgs, &pd.Region{Meta: &metapb.Region{
StartKey: []byte(regions[i]),
EndKey: []byte(regions[i+1]),
}})
}
s.Equal(expect, regionsHaveGapInRange([]byte(ranges[0]), []byte(ranges[1]), rgs, limit))
}

boundCase := []string{"a", "c"}
// positive
check(boundCase, []string{"a", "c"}, -1, false)
check(boundCase, []string{"a", ""}, -1, false)
check(boundCase, []string{"", "c"}, -1, false)
// negative
check(boundCase, []string{"a", "b"}, -1, true)
check(boundCase, []string{"b", "c"}, -1, true)
check(boundCase, []string{"b", ""}, -1, true)
check(boundCase, []string{"", "b"}, -1, true)
// positive
check(boundCase, []string{"a", "b", "b", "c"}, -1, false)
check(boundCase, []string{"", "b", "b", "c"}, -1, false)
check(boundCase, []string{"a", "b", "b", ""}, -1, false)
check(boundCase, []string{"", "b", "b", ""}, -1, false)
// negative
check(boundCase, []string{"a", "b", "b1", "c"}, -1, true)
check(boundCase, []string{"", "b", "b1", "c"}, -1, true)
check(boundCase, []string{"a", "b", "b1", ""}, -1, true)
check(boundCase, []string{"", "b", "b1", ""}, -1, true)
check(boundCase, []string{}, -1, true)

unboundCase := []string{"", ""}
// positive
check(unboundCase, []string{"", ""}, -1, false)
// negative
check(unboundCase, []string{"a", "c"}, -1, true)
check(unboundCase, []string{"a", ""}, -1, true)
check(unboundCase, []string{"", "c"}, -1, true)
// positive
check(unboundCase, []string{"", "b", "b", ""}, -1, false)
// negative
check(unboundCase, []string{"", "b", "b1", ""}, -1, true)
check(unboundCase, []string{"a", "b", "b", ""}, -1, true)
check(unboundCase, []string{"", "b", "b", "c"}, -1, true)
check(unboundCase, []string{}, -1, true)

// test half bounded ranges
check([]string{"", "b"}, []string{"", "a"}, -1, true)
check([]string{"", "b"}, []string{"", "a"}, 1, false) // it's just limitation reached
check([]string{"", "b"}, []string{"", "a"}, 2, true)
check([]string{"a", ""}, []string{"b", ""}, -1, true)
check([]string{"a", ""}, []string{"b", ""}, 1, true)
check([]string{"a", ""}, []string{"b", "c"}, 1, true)
check([]string{"a", ""}, []string{"a", ""}, -1, false)
}

func (s *testRegionCacheSuite) TestScanRegionsWithGaps() {
// Split at "a", "c", "e"
// nil --- 'a' --- 'c' --- 'e' --- nil
// <- 0 -> <- 1 -> <- 2 -> <- 3 -->
regions := s.cluster.AllocIDs(3)
regions = append([]uint64{s.region1}, regions...)

peers := [][]uint64{{s.peer1, s.peer2}}
for i := 0; i < 3; i++ {
peers = append(peers, s.cluster.AllocIDs(2))
}

for i := 0; i < 3; i++ {
s.cluster.Split(regions[i], regions[i+1], []byte{'a' + 2*byte(i)}, peers[i+1], peers[i+1][0])
}

// the last region is not reported to PD yet
getRegionIDsWithInject := func(fn func() ([]*Region, error)) []uint64 {
s.cache.clear()
err := failpoint.Enable("tikvclient/mockSplitRegionNotReportToPD", fmt.Sprintf(`return(%d)`, regions[2]))
s.Nil(err)
resCh := make(chan []*Region)
errCh := make(chan error)
go func() {
rs, err := fn()
errCh <- err
resCh <- rs
}()
time.Sleep(time.Second)
failpoint.Disable("tikvclient/mockSplitRegionNotReportToPD")
s.Nil(<-errCh)
rs := <-resCh
regionIDs := make([]uint64, 0, len(rs))
for _, r := range rs {
regionIDs = append(regionIDs, r.GetID())
}
return regionIDs
}

scanRegionRes := getRegionIDsWithInject(func() ([]*Region, error) {
return s.cache.BatchLoadRegionsWithKeyRange(s.bo, []byte(""), []byte(""), 10)
})
s.Equal(scanRegionRes, regions)
}
10 changes: 10 additions & 0 deletions internal/mockstore/mocktikv/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/client-go/v2/internal/mockstore/cluster"
"github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
)

Expand Down Expand Up @@ -348,6 +349,15 @@ func (c *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*pd.Region {
regions = regions[:endPos]
}
}
if rid, err := util.EvalFailpoint("mockSplitRegionNotReportToPD"); err == nil {
notReportRegionID := uint64(rid.(int))
for i, r := range regions {
if r.Meta.Id == notReportRegionID {
regions = append(regions[:i], regions[i+1:]...)
break
}
}
}
if limit > 0 && len(regions) > limit {
regions = regions[:limit]
}
Expand Down

0 comments on commit ce640b9

Please sign in to comment.