Skip to content

Commit

Permalink
Implement the query region gRPC server interface
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Jan 8, 2025
1 parent 973234d commit 16341ff
Show file tree
Hide file tree
Showing 11 changed files with 5,390 additions and 29 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.23
// When you modify PD cooperatively with kvproto, this will be useful to submit the PR to PD and the PR to
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch
replace github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20250107061039-28ba77c8a2bc

require (
github.com/AlekSi/gocov-xml v1.0.0
Expand Down
1,716 changes: 1,707 additions & 9 deletions go.sum

Large diffs are not rendered by default.

91 changes: 91 additions & 0 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1464,6 +1464,97 @@ func (r *RegionsInfo) GetStoreRegions(storeID uint64) []*RegionInfo {
return regions
}

// TODO: benchmark the performance of `QueryRegions`.
// QueryRegions searches RegionInfo from regionTree by keys and IDs in batch.
func (r *RegionsInfo) QueryRegions(
keys, prevKeys [][]byte, ids []uint64, needBuckets bool,
) ([]uint64, []uint64, map[uint64]*pdpb.RegionResponse) {
// Iterate the region keys to find the regions.
regions := r.getRegionsByKeys(keys)
// Assert the returned regions count matches the input keys.
if len(regions) != len(keys) {
panic("returned regions count mismatch with the input keys")

Check warning on line 1476 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L1476

Added line #L1476 was not covered by tests
}
// Iterate the prevKeys to find the regions.
prevRegions := r.getRegionsByPrevKeys(prevKeys)
// Assert the returned regions count matches the input keys.
if len(prevRegions) != len(prevKeys) {
panic("returned prev regions count mismatch with the input keys")

Check warning on line 1482 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L1482

Added line #L1482 was not covered by tests
}
// Build the key -> ID map for the final results.
regionsByID := make(map[uint64]*pdpb.RegionResponse, len(regions))
keyIDMap := sortOutKeyIDMap(regionsByID, regions, needBuckets)
prevKeyIDMap := sortOutKeyIDMap(regionsByID, prevRegions, needBuckets)
// Iterate the region IDs to find the regions.
for _, id := range ids {
// Check if the region has been found.
if regionFound, ok := regionsByID[id]; (ok && regionFound != nil) || id == 0 {
continue
}
// If the given region ID is not found in the region tree, set the region to nil.
if region := r.GetRegion(id); region == nil {
regionsByID[id] = nil
} else {
regionResp := &pdpb.RegionResponse{
Region: region.GetMeta(),
Leader: region.GetLeader(),
DownPeers: region.GetDownPeers(),
PendingPeers: region.GetPendingPeers(),
}
if needBuckets {
regionResp.Buckets = region.GetBuckets()
}

Check warning on line 1506 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L1505-L1506

Added lines #L1505 - L1506 were not covered by tests
regionsByID[id] = regionResp
}
}
return keyIDMap, prevKeyIDMap, regionsByID
}

// getRegionsByKeys searches RegionInfo from regionTree by keys.
func (r *RegionsInfo) getRegionsByKeys(keys [][]byte) []*RegionInfo {
r.t.RLock()
defer r.t.RUnlock()
return r.tree.searchByKeys(keys)
}

func (r *RegionsInfo) getRegionsByPrevKeys(prevKeys [][]byte) []*RegionInfo {
r.t.RLock()
defer r.t.RUnlock()
return r.tree.searchByPrevKeys(prevKeys)
}

// sortOutKeyIDMap will iterate the regions, convert it to a slice of regionID that corresponds to the input regions.
// It will also update `regionsByID` with the regionID and regionResponse.
func sortOutKeyIDMap(
regionsByID map[uint64]*pdpb.RegionResponse, regions []*RegionInfo, needBuckets bool,
) []uint64 {
keyIDMap := make([]uint64, len(regions))
for idx, region := range regions {
regionID := region.GetMeta().GetId()
keyIDMap[idx] = regionID
// Check if the region has been found.
if regionFound, ok := regionsByID[regionID]; (ok && regionFound != nil) || regionID == 0 {
continue
}
// If the given key is not found in the region tree, set the region to nil.
if region == nil {
regionsByID[regionID] = nil

Check warning on line 1541 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L1541

Added line #L1541 was not covered by tests
} else {
regionResp := &pdpb.RegionResponse{
Region: region.GetMeta(),
Leader: region.GetLeader(),
DownPeers: region.GetDownPeers(),
PendingPeers: region.GetPendingPeers(),
}
if needBuckets {
regionResp.Buckets = region.GetBuckets()
}

Check warning on line 1551 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L1550-L1551

Added lines #L1550 - L1551 were not covered by tests
regionsByID[regionID] = regionResp
}
}
return keyIDMap
}

// SubTreeRegionType is the type of sub tree region.
type SubTreeRegionType string

Expand Down
88 changes: 88 additions & 0 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1203,3 +1203,91 @@ func TestScanRegion(t *testing.T) {
re.Len(scanNoError([]byte("a"), []byte("e"), 0), 3)
re.Len(scanNoError([]byte("c"), []byte("e"), 0), 1)
}

func TestQueryRegions(t *testing.T) {
re := require.New(t)
regions := NewRegionsInfo()
regions.CheckAndPutRegion(NewTestRegionInfo(1, 1, []byte("a"), []byte("b")))
regions.CheckAndPutRegion(NewTestRegionInfo(2, 1, []byte("b"), []byte("c")))
regions.CheckAndPutRegion(NewTestRegionInfo(3, 1, []byte("d"), []byte("e")))
// Query regions by keys.
keyIDMap, prevKeyIDMap, regionsByID := regions.QueryRegions(
[][]byte{[]byte("a"), []byte("b"), []byte("c")},
nil,
nil,
false,
)
re.Len(keyIDMap, 3)
re.Empty(prevKeyIDMap)
re.Equal(uint64(1), keyIDMap[0])
re.Equal(uint64(2), keyIDMap[1])
re.Zero(keyIDMap[2]) // The key is not in the region tree, so its ID should be 0.
re.Len(regionsByID, 2)
re.Equal(uint64(1), regionsByID[1].GetRegion().GetId())
re.Equal(uint64(2), regionsByID[2].GetRegion().GetId())
// Query regions by IDs.
keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions(
nil,
nil,
[]uint64{1, 2, 3},
false,
)
re.Empty(keyIDMap)
re.Empty(prevKeyIDMap)
re.Len(regionsByID, 3)
re.Equal(uint64(1), regionsByID[1].GetRegion().GetId())
re.Equal(uint64(2), regionsByID[2].GetRegion().GetId())
re.Equal(uint64(3), regionsByID[3].GetRegion().GetId())
// Query the region that does not exist.
keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions(
nil,
nil,
[]uint64{4},
false,
)
re.Empty(keyIDMap)
re.Empty(prevKeyIDMap)
re.Len(regionsByID, 1)
re.Nil(regionsByID[4])
keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions(
[][]byte{[]byte("c")},
nil,
nil,
false,
)
re.Len(keyIDMap, 1)
re.Empty(prevKeyIDMap)
re.Zero(keyIDMap[0])
re.Empty(regionsByID)
keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions(
[][]byte{[]byte("c")},
nil,
[]uint64{4},
false,
)
re.Len(keyIDMap, 1)
re.Empty(prevKeyIDMap)
re.Zero(keyIDMap[0])
re.Nil(regionsByID[4])
// Query regions by keys, previous keys and IDs.
keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions(
[][]byte{[]byte("b"), []byte("c")},
[][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e"), []byte("f")},
[]uint64{1, 3},
false,
)
re.Len(keyIDMap, 2)
re.Len(prevKeyIDMap, 6)
re.Equal(uint64(2), keyIDMap[0])
re.Zero(keyIDMap[1])
re.Zero(prevKeyIDMap[0])
re.Equal(uint64(1), prevKeyIDMap[1])
re.Zero(prevKeyIDMap[2])
re.Zero(prevKeyIDMap[3])
re.Zero(prevKeyIDMap[4])
re.Zero(prevKeyIDMap[5])
re.Len(regionsByID, 3)
re.Equal(uint64(1), regionsByID[1].GetRegion().GetId())
re.Equal(uint64(2), regionsByID[2].GetRegion().GetId())
re.Equal(uint64(3), regionsByID[3].GetRegion().GetId())
}
20 changes: 20 additions & 0 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,26 @@ func (t *regionTree) searchPrev(regionKey []byte) *RegionInfo {
return prevRegionItem.RegionInfo
}

// searchByKeys searches the regions by keys and return a slice of `*RegionInfo` whose order is the same as the input keys.
func (t *regionTree) searchByKeys(keys [][]byte) []*RegionInfo {
regions := make([]*RegionInfo, len(keys))
// TODO: do we need to deduplicate the input keys?
for idx, key := range keys {
regions[idx] = t.search(key)
}
return regions
}

// searchByPrevKeys searches the regions by prevKeys and return a slice of `*RegionInfo` whose order is the same as the input keys.
func (t *regionTree) searchByPrevKeys(prevKeys [][]byte) []*RegionInfo {
regions := make([]*RegionInfo, len(prevKeys))
// TODO: do we need to deduplicate the input keys?
for idx, key := range prevKeys {
regions[idx] = t.searchPrev(key)
}
return regions
}

// find returns the range item contains the start key.
func (t *regionTree) find(item *regionItem) *regionItem {
var result *regionItem
Expand Down
52 changes: 52 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1563,6 +1563,58 @@ func (s *GrpcServer) GetRegionByID(ctx context.Context, request *pdpb.GetRegionB
}, nil
}

// QueryRegion provides a stream processing of the region query.
func (s *GrpcServer) QueryRegion(stream pdpb.PD_QueryRegionServer) error {
if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() {
fName := currentFunction()
limiter := s.GetGRPCRateLimiter()
if done, err := limiter.Allow(fName); err == nil {
defer done()
} else {
return err
}

Check warning on line 1575 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1567-L1575

Added lines #L1567 - L1575 were not covered by tests
}

for {
request, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return errors.WithStack(err)
}

Check warning on line 1585 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1578-L1585

Added lines #L1578 - L1585 were not covered by tests

// TODO: add forwarding logic.

if s.IsClosed() {
return errs.ErrNotStarted
}
if clusterID := keypath.ClusterID(); request.GetHeader().GetClusterId() != clusterID {
return errs.ErrMismatchClusterID(clusterID, request.GetHeader().GetClusterId())
}
needBuckets := s.GetRaftCluster().GetStoreConfig().IsEnableRegionBucket() && request.GetNeedBuckets()

start := time.Now()
keyIDMap, prevKeyIDMap, regionsByID := s.GetRaftCluster().QueryRegions(
request.GetKeys(),
request.GetPrevKeys(),
request.GetIds(),
needBuckets,
)
regionQueryDuration.Observe(time.Since(start).Seconds())
// Build the response and send it to the client.
response := &pdpb.QueryRegionResponse{
Header: wrapHeader(),
KeyIdMap: keyIDMap,
PrevKeyIdMap: prevKeyIDMap,
RegionsById: regionsByID,
}
if err := stream.Send(response); err != nil {
return errors.WithStack(err)
}

Check warning on line 1614 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1589-L1614

Added lines #L1589 - L1614 were not covered by tests
}
}

// Deprecated: use BatchScanRegions instead.
// ScanRegions implements gRPC PDServer.
func (s *GrpcServer) ScanRegions(ctx context.Context, request *pdpb.ScanRegionsRequest) (*pdpb.ScanRegionsResponse, error) {
Expand Down
10 changes: 10 additions & 0 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ var (
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
})

regionQueryDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "server",
Name: "region_query_duration_seconds",
Help: "Bucketed histogram of processing time (s) of region query requests.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
})

bucketReportLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Expand Down Expand Up @@ -172,6 +181,7 @@ func init() {
prometheus.MustRegister(tsoProxyBatchSize)
prometheus.MustRegister(tsoProxyForwardTimeoutCounter)
prometheus.MustRegister(tsoHandleDuration)
prometheus.MustRegister(regionQueryDuration)
prometheus.MustRegister(regionHeartbeatHandleDuration)
prometheus.MustRegister(storeHeartbeatHandleDuration)
prometheus.MustRegister(bucketReportCounter)
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/tikv/pd/tests/integrations
go 1.23

replace (
github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20250107061039-28ba77c8a2bc
github.com/tikv/pd => ../../
github.com/tikv/pd/client => ../../client
github.com/tikv/pd/tests/integrations/mcs => ./mcs
Expand Down
Loading

0 comments on commit 16341ff

Please sign in to comment.