diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index b80b938591..ec2c87b6ae 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -1784,7 +1784,19 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
 		metrics.RegionCacheCounterWithScanRegionsOK.Inc()
 
 		if len(regionsInfo) == 0 {
-			return nil, errors.New("PD returned no region")
+			return nil, errors.Errorf(
+				"PD returned no region, startKey: %q, endKey: %q, limit: %d, encode_start_key: %q, encode_end_key: %q",
+				util.HexRegionKeyStr(startKey), util.HexRegionKeyStr(endKey), limit,
+				util.HexRegionKeyStr(c.codec.EncodeRegionKey(startKey)), util.HexRegionKeyStr(c.codec.EncodeRegionKey(endKey)),
+			)
+		}
+
+		if regionsHaveGapInRange(startKey, endKey, regionsInfo, limit) {
+			backoffErr = errors.Errorf(
+				"PD returned regions have gaps, startKey: %q, endKey: %q, limit: %d",
+				startKey, endKey, limit,
+			)
+			continue
 		}
 		regions := make([]*Region, 0, len(regionsInfo))
 		for _, r := range regionsInfo {
@@ -1802,13 +1814,54 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
 			return nil, errors.New("receive Regions with no peer")
 		}
 		if len(regions) < len(regionsInfo) {
-			logutil.Logger(context.Background()).Debug(
+			logutil.Logger(context.Background()).Warn(
 				"regionCache: scanRegion finished but some regions has no leader.")
 		}
 		return regions, nil
 	}
 }
 
+// regionsHaveGapInRange checks if the loaded regions can fully cover the key ranges.
+// If there are any gaps between the regions, it returns true, then the requests might be retried.
+// TODO: remove this function after PD client supports gap detection and handling it.
+func regionsHaveGapInRange(start, end []byte, regionsInfo []*pd.Region, limit int) bool {
+	if len(regionsInfo) == 0 {
+		return true
+	}
+	var lastEndKey []byte
+	for i, r := range regionsInfo {
+		if r.Meta == nil {
+			return true
+		}
+		if i == 0 {
+			if bytes.Compare(r.Meta.StartKey, start) > 0 {
+				// there is a gap between first returned region's start_key and start key.
+				return true
+			}
+		}
+		if i > 0 && bytes.Compare(r.Meta.StartKey, lastEndKey) > 0 {
+			// there is a gap between two regions.
+			return true
+		}
+		if len(r.Meta.EndKey) == 0 {
+			// the current region contains all the rest ranges.
+			return false
+		}
+		// note lastEndKey is never empty.
+		lastEndKey = r.Meta.EndKey
+	}
+	if limit > 0 && len(regionsInfo) == limit {
+		// the regionsInfo is limited by the limit, so there may be some ranges not covered.
+		// The rest range will be loaded in the next scanRegions call.
+		return false
+	}
+	if len(end) == 0 {
+		// the end key of the range is empty, but we can't cover it.
+		return true
+	}
+	return bytes.Compare(lastEndKey, end) < 0
+}
+
 // GetCachedRegionWithRLock returns region with lock.
 func (c *RegionCache) GetCachedRegionWithRLock(regionID RegionVerID) (r *Region) {
 	c.mu.RLock()
diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go
index 6226a1c608..0a6b55da0c 100644
--- a/internal/locate/region_cache_test.go
+++ b/internal/locate/region_cache_test.go
@@ -46,6 +46,7 @@ import (
 	"unsafe"
 
 	"github.com/gogo/protobuf/proto"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/stretchr/testify/suite"
@@ -1782,3 +1783,108 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCacheConcurrency() {
 
 	cancel()
 }
+
+func (s *testRegionCacheSuite) TestRangesAreCoveredCheck() {
+	check := func(ranges []string, regions []string, limit int, expect bool) {
+		s.Len(ranges, 2)
+		rgs := make([]*pd.Region, 0, len(regions))
+		for i := 0; i < len(regions); i += 2 {
+			rgs = append(rgs, &pd.Region{Meta: &metapb.Region{
+				StartKey: []byte(regions[i]),
+				EndKey:   []byte(regions[i+1]),
+			}})
+		}
+		s.Equal(expect, regionsHaveGapInRange([]byte(ranges[0]), []byte(ranges[1]), rgs, limit))
+	}
+
+	boundCase := []string{"a", "c"}
+	// positive
+	check(boundCase, []string{"a", "c"}, -1, false)
+	check(boundCase, []string{"a", ""}, -1, false)
+	check(boundCase, []string{"", "c"}, -1, false)
+	// negative
+	check(boundCase, []string{"a", "b"}, -1, true)
+	check(boundCase, []string{"b", "c"}, -1, true)
+	check(boundCase, []string{"b", ""}, -1, true)
+	check(boundCase, []string{"", "b"}, -1, true)
+	// positive
+	check(boundCase, []string{"a", "b", "b", "c"}, -1, false)
+	check(boundCase, []string{"", "b", "b", "c"}, -1, false)
+	check(boundCase, []string{"a", "b", "b", ""}, -1, false)
+	check(boundCase, []string{"", "b", "b", ""}, -1, false)
+	// negative
+	check(boundCase, []string{"a", "b", "b1", "c"}, -1, true)
+	check(boundCase, []string{"", "b", "b1", "c"}, -1, true)
+	check(boundCase, []string{"a", "b", "b1", ""}, -1, true)
+	check(boundCase, []string{"", "b", "b1", ""}, -1, true)
+	check(boundCase, []string{}, -1, true)
+
+	unboundCase := []string{"", ""}
+	// positive
+	check(unboundCase, []string{"", ""}, -1, false)
+	// negative
+	check(unboundCase, []string{"a", "c"}, -1, true)
+	check(unboundCase, []string{"a", ""}, -1, true)
+	check(unboundCase, []string{"", "c"}, -1, true)
+	// positive
+	check(unboundCase, []string{"", "b", "b", ""}, -1, false)
+	// negative
+	check(unboundCase, []string{"", "b", "b1", ""}, -1, true)
+	check(unboundCase, []string{"a", "b", "b", ""}, -1, true)
+	check(unboundCase, []string{"", "b", "b", "c"}, -1, true)
+	check(unboundCase, []string{}, -1, true)
+
+	// test half bounded ranges
+	check([]string{"", "b"}, []string{"", "a"}, -1, true)
+	check([]string{"", "b"}, []string{"", "a"}, 1, false) // it's just limitation reached
+	check([]string{"", "b"}, []string{"", "a"}, 2, true)
+	check([]string{"a", ""}, []string{"b", ""}, -1, true)
+	check([]string{"a", ""}, []string{"b", ""}, 1, true)
+	check([]string{"a", ""}, []string{"b", "c"}, 1, true)
+	check([]string{"a", ""}, []string{"a", ""}, -1, false)
+}
+
+func (s *testRegionCacheSuite) TestScanRegionsWithGaps() {
+	// Split at "a", "c", "e"
+	// nil --- 'a' --- 'c' --- 'e' --- nil
+	// <- 0 -> <- 1 -> <- 2 -> <- 3 -->
+	regions := s.cluster.AllocIDs(3)
+	regions = append([]uint64{s.region1}, regions...)
+
+	peers := [][]uint64{{s.peer1, s.peer2}}
+	for i := 0; i < 3; i++ {
+		peers = append(peers, s.cluster.AllocIDs(2))
+	}
+
+	for i := 0; i < 3; i++ {
+		s.cluster.Split(regions[i], regions[i+1], []byte{'a' + 2*byte(i)}, peers[i+1], peers[i+1][0])
+	}
+
+	// the last region is not reported to PD yet
+	getRegionIDsWithInject := func(fn func() ([]*Region, error)) []uint64 {
+		s.cache.clear()
+		err := failpoint.Enable("tikvclient/mockSplitRegionNotReportToPD", fmt.Sprintf(`return(%d)`, regions[2]))
+		s.Nil(err)
+		resCh := make(chan []*Region)
+		errCh := make(chan error)
+		go func() {
+			rs, err := fn()
+			errCh <- err
+			resCh <- rs
+		}()
+		time.Sleep(time.Second)
+		failpoint.Disable("tikvclient/mockSplitRegionNotReportToPD")
+		s.Nil(<-errCh)
+		rs := <-resCh
+		regionIDs := make([]uint64, 0, len(rs))
+		for _, r := range rs {
+			regionIDs = append(regionIDs, r.GetID())
+		}
+		return regionIDs
+	}
+
+	scanRegionRes := getRegionIDsWithInject(func() ([]*Region, error) {
+		return s.cache.BatchLoadRegionsWithKeyRange(s.bo, []byte(""), []byte(""), 10)
+	})
+	s.Equal(scanRegionRes, regions)
+}
diff --git a/internal/mockstore/mocktikv/cluster.go b/internal/mockstore/mocktikv/cluster.go
index fc8af00d2c..81f4215158 100644
--- a/internal/mockstore/mocktikv/cluster.go
+++ b/internal/mockstore/mocktikv/cluster.go
@@ -46,6 +46,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/tikv/client-go/v2/internal/mockstore/cluster"
+	"github.com/tikv/client-go/v2/util"
 	pd "github.com/tikv/pd/client"
 )
 
@@ -348,6 +349,15 @@ func (c *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*pd.Region {
 			regions = regions[:endPos]
 		}
 	}
+	if rid, err := util.EvalFailpoint("mockSplitRegionNotReportToPD"); err == nil {
+		notReportRegionID := uint64(rid.(int))
+		for i, r := range regions {
+			if r.Meta.Id == notReportRegionID {
+				regions = append(regions[:i], regions[i+1:]...)
+				break
+			}
+		}
+	}
 	if limit > 0 && len(regions) > limit {
 		regions = regions[:limit]
 	}