Skip to content

Commit

Permalink
Implement the query region gRPC server interface
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Jan 8, 2025
1 parent 973234d commit d272713
Show file tree
Hide file tree
Showing 7 changed files with 1,969 additions and 10 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.23
// When you modify PD cooperatively with kvproto, this will be useful to submit the PR to PD and the PR to
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch
replace github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20250107061039-28ba77c8a2bc

require (
github.com/AlekSi/gocov-xml v1.0.0
Expand Down
1,716 changes: 1,707 additions & 9 deletions go.sum

Large diffs are not rendered by default.

91 changes: 91 additions & 0 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1464,6 +1464,97 @@ func (r *RegionsInfo) GetStoreRegions(storeID uint64) []*RegionInfo {
return regions
}

// TODO: benchmark the performance of `QueryRegions`.
// QueryRegions searches RegionInfo from regionTree by keys and IDs in batch.
func (r *RegionsInfo) QueryRegions(
keys, prevKeys [][]byte, ids []uint64, needBuckets bool,
) ([]uint64, []uint64, map[uint64]*pdpb.RegionResponse) {
// Iterate the region keys to find the regions.
regions := r.getRegionsByKeys(keys)
// Assert the returned regions count matches the input keys.
if len(regions) != len(keys) {
panic("returned regions count mismatch with the input keys")
}
// Iterate the prevKeys to find the regions.
prevRegions := r.getRegionsByPrevKeys(prevKeys)
// Assert the returned regions count matches the input keys.
if len(prevRegions) != len(prevKeys) {
panic("returned prev regions count mismatch with the input keys")
}
// Build the key -> ID map for the final results.
regionsByID := make(map[uint64]*pdpb.RegionResponse, len(regions))
keyIDMap := sortOutKeyIDMap(regionsByID, regions, needBuckets)
prevKeyIDMap := sortOutKeyIDMap(regionsByID, prevRegions, needBuckets)
// Iterate the region IDs to find the regions.
for _, id := range ids {
// Check if the region has been found.
if regionFound, ok := regionsByID[id]; (ok && regionFound != nil) || id == 0 {
continue
}
// If the given region ID is not found in the region tree, set the region to nil.
if region := r.GetRegion(id); region == nil {
regionsByID[id] = nil
} else {
regionResp := &pdpb.RegionResponse{
Region: region.GetMeta(),
Leader: region.GetLeader(),
DownPeers: region.GetDownPeers(),
PendingPeers: region.GetPendingPeers(),
}
if needBuckets {
regionResp.Buckets = region.GetBuckets()
}
regionsByID[id] = regionResp
}
}
return keyIDMap, prevKeyIDMap, regionsByID
}

// getRegionsByKeys searches RegionInfo from regionTree by keys.
func (r *RegionsInfo) getRegionsByKeys(keys [][]byte) []*RegionInfo {
r.t.RLock()
defer r.t.RUnlock()
return r.tree.searchByKeys(keys)
}

func (r *RegionsInfo) getRegionsByPrevKeys(prevKeys [][]byte) []*RegionInfo {
r.t.RLock()
defer r.t.RUnlock()
return r.tree.searchByPrevKeys(prevKeys)
}

// sortOutKeyIDMap will iterate the regions, convert it to a slice of regionID that corresponds to the input regions.
// It will also update `regionsByID` with the regionID and regionResponse.
func sortOutKeyIDMap(
regionsByID map[uint64]*pdpb.RegionResponse, regions []*RegionInfo, needBuckets bool,
) []uint64 {
keyIDMap := make([]uint64, len(regions))
for idx, region := range regions {
regionID := region.GetMeta().GetId()
keyIDMap[idx] = regionID
// Check if the region has been found.
if regionFound, ok := regionsByID[regionID]; (ok && regionFound != nil) || regionID == 0 {
continue
}
// If the given key is not found in the region tree, set the region to nil.
if region == nil {
regionsByID[regionID] = nil
} else {
regionResp := &pdpb.RegionResponse{
Region: region.GetMeta(),
Leader: region.GetLeader(),
DownPeers: region.GetDownPeers(),
PendingPeers: region.GetPendingPeers(),
}
if needBuckets {
regionResp.Buckets = region.GetBuckets()
}
regionsByID[regionID] = regionResp
}
}
return keyIDMap
}

// SubTreeRegionType is the type of sub tree region.
type SubTreeRegionType string

Expand Down
88 changes: 88 additions & 0 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1203,3 +1203,91 @@ func TestScanRegion(t *testing.T) {
re.Len(scanNoError([]byte("a"), []byte("e"), 0), 3)
re.Len(scanNoError([]byte("c"), []byte("e"), 0), 1)
}

func TestQueryRegions(t *testing.T) {
re := require.New(t)
regions := NewRegionsInfo()
regions.CheckAndPutRegion(NewTestRegionInfo(1, 1, []byte("a"), []byte("b")))
regions.CheckAndPutRegion(NewTestRegionInfo(2, 1, []byte("b"), []byte("c")))
regions.CheckAndPutRegion(NewTestRegionInfo(3, 1, []byte("d"), []byte("e")))
// Query regions by keys.
keyIDMap, prevKeyIDMap, regionsByID := regions.QueryRegions(
[][]byte{[]byte("a"), []byte("b"), []byte("c")},
nil,
nil,
false,
)
re.Len(keyIDMap, 3)
re.Empty(prevKeyIDMap)
re.Equal(uint64(1), keyIDMap[0])
re.Equal(uint64(2), keyIDMap[1])
re.Zero(keyIDMap[2]) // The key is not in the region tree, so its ID should be 0.
re.Len(regionsByID, 2)
re.Equal(uint64(1), regionsByID[1].GetRegion().GetId())
re.Equal(uint64(2), regionsByID[2].GetRegion().GetId())
// Query regions by IDs.
keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions(
nil,
nil,
[]uint64{1, 2, 3},
false,
)
re.Empty(keyIDMap)
re.Empty(prevKeyIDMap)
re.Len(regionsByID, 3)
re.Equal(uint64(1), regionsByID[1].GetRegion().GetId())
re.Equal(uint64(2), regionsByID[2].GetRegion().GetId())
re.Equal(uint64(3), regionsByID[3].GetRegion().GetId())
// Query the region that does not exist.
keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions(
nil,
nil,
[]uint64{4},
false,
)
re.Empty(keyIDMap)
re.Empty(prevKeyIDMap)
re.Len(regionsByID, 1)
re.Nil(regionsByID[4])
keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions(
[][]byte{[]byte("c")},
nil,
nil,
false,
)
re.Len(keyIDMap, 1)
re.Empty(prevKeyIDMap)
re.Zero(keyIDMap[0])
re.Empty(regionsByID)
keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions(
[][]byte{[]byte("c")},
nil,
[]uint64{4},
false,
)
re.Len(keyIDMap, 1)
re.Empty(prevKeyIDMap)
re.Zero(keyIDMap[0])
re.Nil(regionsByID[4])
// Query regions by keys, previous keys and IDs.
keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions(
[][]byte{[]byte("b"), []byte("c")},
[][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e"), []byte("f")},
[]uint64{1, 3},
false,
)
re.Len(keyIDMap, 2)
re.Len(prevKeyIDMap, 6)
re.Equal(uint64(2), keyIDMap[0])
re.Zero(keyIDMap[1])
re.Zero(prevKeyIDMap[0])
re.Equal(uint64(1), prevKeyIDMap[1])
re.Zero(prevKeyIDMap[2])
re.Zero(prevKeyIDMap[3])
re.Zero(prevKeyIDMap[4])
re.Zero(prevKeyIDMap[5])
re.Len(regionsByID, 3)
re.Equal(uint64(1), regionsByID[1].GetRegion().GetId())
re.Equal(uint64(2), regionsByID[2].GetRegion().GetId())
re.Equal(uint64(3), regionsByID[3].GetRegion().GetId())
}
20 changes: 20 additions & 0 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,26 @@ func (t *regionTree) searchPrev(regionKey []byte) *RegionInfo {
return prevRegionItem.RegionInfo
}

// searchByKeys searches the regions by keys and return a slice of `*RegionInfo` whose order is the same as the input keys.
func (t *regionTree) searchByKeys(keys [][]byte) []*RegionInfo {
regions := make([]*RegionInfo, len(keys))
// TODO: do we need to deduplicate the input keys?
for idx, key := range keys {
regions[idx] = t.search(key)
}
return regions
}

// searchByPrevKeys searches the regions by prevKeys and return a slice of `*RegionInfo` whose order is the same as the input keys.
func (t *regionTree) searchByPrevKeys(prevKeys [][]byte) []*RegionInfo {
regions := make([]*RegionInfo, len(prevKeys))
// TODO: do we need to deduplicate the input keys?
for idx, key := range prevKeys {
regions[idx] = t.searchPrev(key)
}
return regions
}

// find returns the range item contains the start key.
func (t *regionTree) find(item *regionItem) *regionItem {
var result *regionItem
Expand Down
52 changes: 52 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1563,6 +1563,58 @@ func (s *GrpcServer) GetRegionByID(ctx context.Context, request *pdpb.GetRegionB
}, nil
}

// QueryRegion provides a stream processing of the region query.
func (s *GrpcServer) QueryRegion(stream pdpb.PD_QueryRegionServer) error {
if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() {
fName := currentFunction()
limiter := s.GetGRPCRateLimiter()
if done, err := limiter.Allow(fName); err == nil {
defer done()
} else {
return err
}
}

for {
request, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return errors.WithStack(err)
}

// TODO: add forwarding logic.

if s.IsClosed() {
return errs.ErrNotStarted
}
if clusterID := keypath.ClusterID(); request.GetHeader().GetClusterId() != clusterID {
return errs.ErrMismatchClusterID(clusterID, request.GetHeader().GetClusterId())
}
needBuckets := s.GetRaftCluster().GetStoreConfig().IsEnableRegionBucket() && request.GetNeedBuckets()

start := time.Now()
keyIDMap, prevKeyIDMap, regionsByID := s.GetRaftCluster().QueryRegions(
request.GetKeys(),
request.GetPrevKeys(),
request.GetIds(),
needBuckets,
)
regionQueryDuration.Observe(time.Since(start).Seconds())
// Build the response and send it to the client.
response := &pdpb.QueryRegionResponse{
Header: wrapHeader(),
KeyIdMap: keyIDMap,
PrevKeyIdMap: prevKeyIDMap,
RegionsById: regionsByID,
}
if err := stream.Send(response); err != nil {
return errors.WithStack(err)
}
}
}

// Deprecated: use BatchScanRegions instead.
// ScanRegions implements gRPC PDServer.
func (s *GrpcServer) ScanRegions(ctx context.Context, request *pdpb.ScanRegionsRequest) (*pdpb.ScanRegionsResponse, error) {
Expand Down
10 changes: 10 additions & 0 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ var (
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
})

regionQueryDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "server",
Name: "region_query_duration_seconds",
Help: "Bucketed histogram of processing time (s) of region query requests.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
})

bucketReportLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Expand Down Expand Up @@ -172,6 +181,7 @@ func init() {
prometheus.MustRegister(tsoProxyBatchSize)
prometheus.MustRegister(tsoProxyForwardTimeoutCounter)
prometheus.MustRegister(tsoHandleDuration)
prometheus.MustRegister(regionQueryDuration)
prometheus.MustRegister(regionHeartbeatHandleDuration)
prometheus.MustRegister(storeHeartbeatHandleDuration)
prometheus.MustRegister(bucketReportCounter)
Expand Down

0 comments on commit d272713

Please sign in to comment.