Skip to content

Commit

Permalink
server, core: implement the query region gRPC server (#8979)
Browse files Browse the repository at this point in the history
ref #8690

Implement the query region gRPC server interface.

Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato authored Jan 17, 2025
1 parent 604b0d6 commit a69ee01
Show file tree
Hide file tree
Showing 11 changed files with 301 additions and 9 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412
github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20241104061623-bce95733dad7
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412 h1:RW/oeRwHxB9pGTtSCgf4wrsHw9RwrUg7+wAQRsW8KfE=
github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1 h1:rTAyiswGyWSGHJVa4Mkhdi8YfGqfA4LrUVKsH9nrJ8E=
github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
117 changes: 117 additions & 0 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
const (
randomRegionMaxRetry = 10
scanRegionLimit = 1000
batchSearchSize = 16
// CollectFactor is the factor to collect the count of region.
CollectFactor = 0.9
)
Expand Down Expand Up @@ -1464,6 +1465,122 @@ func (r *RegionsInfo) GetStoreRegions(storeID uint64) []*RegionInfo {
return regions
}

// TODO: benchmark the performance of `QueryRegions`.
// QueryRegions searches RegionInfo from regionTree by keys and IDs in batch.
func (r *RegionsInfo) QueryRegions(
keys, prevKeys [][]byte, ids []uint64, needBuckets bool,
) ([]uint64, []uint64, map[uint64]*pdpb.RegionResponse) {
// Iterate the region keys to find the regions.
regions := r.getRegionsByKeys(keys)
// Assert the returned regions count matches the input keys.
if len(regions) != len(keys) {
panic("returned regions count mismatch with the input keys")
}
// Iterate the prevKeys to find the regions.
prevRegions := r.getRegionsByPrevKeys(prevKeys)
// Assert the returned regions count matches the input keys.
if len(prevRegions) != len(prevKeys) {
panic("returned prev regions count mismatch with the input keys")
}
// Build the key -> ID map for the final results.
regionsByID := make(map[uint64]*pdpb.RegionResponse, len(regions))
keyIDMap := sortOutKeyIDMap(regionsByID, regions, needBuckets)
prevKeyIDMap := sortOutKeyIDMap(regionsByID, prevRegions, needBuckets)
// Iterate the region IDs to find the regions.
for _, id := range ids {
// Check if the region has been found.
if regionFound, ok := regionsByID[id]; (ok && regionFound != nil) || id == 0 {
continue
}
// If the given region ID is not found in the region tree, set the region to nil.
if region := r.GetRegion(id); region == nil {
regionsByID[id] = nil
} else {
regionResp := &pdpb.RegionResponse{
Region: region.GetMeta(),
Leader: region.GetLeader(),
DownPeers: region.GetDownPeers(),
PendingPeers: region.GetPendingPeers(),
}
if needBuckets {
regionResp.Buckets = region.GetBuckets()
}
regionsByID[id] = regionResp
}
}
return keyIDMap, prevKeyIDMap, regionsByID
}

// getRegionsByKeys searches RegionInfo from regionTree by keys.
func (r *RegionsInfo) getRegionsByKeys(keys [][]byte) []*RegionInfo {
regions := make([]*RegionInfo, 0, len(keys))
// Split the keys into multiple batches, and search each batch separately.
// This is to avoid the lock contention on the `regionTree`.
for _, batch := range splitKeysIntoBatches(keys) {
r.t.RLock()
results := r.tree.searchByKeys(batch)
r.t.RUnlock()
regions = append(regions, results...)
}
return regions
}

func splitKeysIntoBatches(keys [][]byte) [][][]byte {
keysLen := len(keys)
batches := make([][][]byte, 0, (keysLen+batchSearchSize-1)/batchSearchSize)
for i := 0; i < keysLen; i += batchSearchSize {
end := i + batchSearchSize
if end > keysLen {
end = keysLen
}
batches = append(batches, keys[i:end])
}
return batches
}

func (r *RegionsInfo) getRegionsByPrevKeys(prevKeys [][]byte) []*RegionInfo {
regions := make([]*RegionInfo, 0, len(prevKeys))
for _, batch := range splitKeysIntoBatches(prevKeys) {
r.t.RLock()
results := r.tree.searchByPrevKeys(batch)
r.t.RUnlock()
regions = append(regions, results...)
}
return regions
}

// sortOutKeyIDMap will iterate the regions, convert it to a slice of regionID that corresponds to the input regions.
// It will also update `regionsByID` with the regionID and regionResponse.
func sortOutKeyIDMap(
regionsByID map[uint64]*pdpb.RegionResponse, regions []*RegionInfo, needBuckets bool,
) []uint64 {
keyIDMap := make([]uint64, len(regions))
for idx, region := range regions {
regionID := region.GetMeta().GetId()
keyIDMap[idx] = regionID
// Check if the region has been found.
if regionFound, ok := regionsByID[regionID]; (ok && regionFound != nil) || regionID == 0 {
continue
}
// If the given key is not found in the region tree, set the region to nil.
if region == nil {
regionsByID[regionID] = nil
} else {
regionResp := &pdpb.RegionResponse{
Region: region.GetMeta(),
Leader: region.GetLeader(),
DownPeers: region.GetDownPeers(),
PendingPeers: region.GetPendingPeers(),
}
if needBuckets {
regionResp.Buckets = region.GetBuckets()
}
regionsByID[regionID] = regionResp
}
}
return keyIDMap
}

// SubTreeRegionType is the type of sub tree region.
type SubTreeRegionType string

Expand Down
88 changes: 88 additions & 0 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1203,3 +1203,91 @@ func TestScanRegion(t *testing.T) {
re.Len(scanNoError([]byte("a"), []byte("e"), 0), 3)
re.Len(scanNoError([]byte("c"), []byte("e"), 0), 1)
}

func TestQueryRegions(t *testing.T) {
re := require.New(t)
regions := NewRegionsInfo()
regions.CheckAndPutRegion(NewTestRegionInfo(1, 1, []byte("a"), []byte("b")))
regions.CheckAndPutRegion(NewTestRegionInfo(2, 1, []byte("b"), []byte("c")))
regions.CheckAndPutRegion(NewTestRegionInfo(3, 1, []byte("d"), []byte("e")))
// Query regions by keys.
keyIDMap, prevKeyIDMap, regionsByID := regions.QueryRegions(
[][]byte{[]byte("a"), []byte("b"), []byte("c")},
nil,
nil,
false,
)
re.Len(keyIDMap, 3)
re.Empty(prevKeyIDMap)
re.Equal(uint64(1), keyIDMap[0])
re.Equal(uint64(2), keyIDMap[1])
re.Zero(keyIDMap[2]) // The key is not in the region tree, so its ID should be 0.
re.Len(regionsByID, 2)
re.Equal(uint64(1), regionsByID[1].GetRegion().GetId())
re.Equal(uint64(2), regionsByID[2].GetRegion().GetId())
// Query regions by IDs.
keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions(
nil,
nil,
[]uint64{1, 2, 3},
false,
)
re.Empty(keyIDMap)
re.Empty(prevKeyIDMap)
re.Len(regionsByID, 3)
re.Equal(uint64(1), regionsByID[1].GetRegion().GetId())
re.Equal(uint64(2), regionsByID[2].GetRegion().GetId())
re.Equal(uint64(3), regionsByID[3].GetRegion().GetId())
// Query the region that does not exist.
keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions(
nil,
nil,
[]uint64{4},
false,
)
re.Empty(keyIDMap)
re.Empty(prevKeyIDMap)
re.Len(regionsByID, 1)
re.Nil(regionsByID[4])
keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions(
[][]byte{[]byte("c")},
nil,
nil,
false,
)
re.Len(keyIDMap, 1)
re.Empty(prevKeyIDMap)
re.Zero(keyIDMap[0])
re.Empty(regionsByID)
keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions(
[][]byte{[]byte("c")},
nil,
[]uint64{4},
false,
)
re.Len(keyIDMap, 1)
re.Empty(prevKeyIDMap)
re.Zero(keyIDMap[0])
re.Nil(regionsByID[4])
// Query regions by keys, previous keys and IDs.
keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions(
[][]byte{[]byte("b"), []byte("c")},
[][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e"), []byte("f")},
[]uint64{1, 3},
false,
)
re.Len(keyIDMap, 2)
re.Len(prevKeyIDMap, 6)
re.Equal(uint64(2), keyIDMap[0])
re.Zero(keyIDMap[1])
re.Zero(prevKeyIDMap[0])
re.Equal(uint64(1), prevKeyIDMap[1])
re.Zero(prevKeyIDMap[2])
re.Zero(prevKeyIDMap[3])
re.Zero(prevKeyIDMap[4])
re.Zero(prevKeyIDMap[5])
re.Len(regionsByID, 3)
re.Equal(uint64(1), regionsByID[1].GetRegion().GetId())
re.Equal(uint64(2), regionsByID[2].GetRegion().GetId())
re.Equal(uint64(3), regionsByID[3].GetRegion().GetId())
}
20 changes: 20 additions & 0 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,26 @@ func (t *regionTree) searchPrev(regionKey []byte) *RegionInfo {
return prevRegionItem.RegionInfo
}

// searchByKeys searches the regions by keys and return a slice of `*RegionInfo` whose order is the same as the input keys.
func (t *regionTree) searchByKeys(keys [][]byte) []*RegionInfo {
regions := make([]*RegionInfo, len(keys))
// TODO: do we need to deduplicate the input keys?
for idx, key := range keys {
regions[idx] = t.search(key)
}
return regions
}

// searchByPrevKeys searches the regions by prevKeys and return a slice of `*RegionInfo` whose order is the same as the input keys.
func (t *regionTree) searchByPrevKeys(prevKeys [][]byte) []*RegionInfo {
regions := make([]*RegionInfo, len(prevKeys))
// TODO: do we need to deduplicate the input keys?
for idx, key := range prevKeys {
regions[idx] = t.searchPrev(key)
}
return regions
}

// find returns the range item contains the start key.
func (t *regionTree) find(item *regionItem) *regionItem {
var result *regionItem
Expand Down
57 changes: 57 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,63 @@ func (s *GrpcServer) GetRegionByID(ctx context.Context, request *pdpb.GetRegionB
}, nil
}

// QueryRegion provides a stream processing of the region query.
func (s *GrpcServer) QueryRegion(stream pdpb.PD_QueryRegionServer) error {
done, err := s.rateLimitCheck()
if err != nil {
return err
}
if done != nil {
defer done()
}

for {
request, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return errors.WithStack(err)
}

// TODO: add forwarding logic.

if clusterID := keypath.ClusterID(); request.GetHeader().GetClusterId() != clusterID {
return errs.ErrMismatchClusterID(clusterID, request.GetHeader().GetClusterId())
}
rc := s.GetRaftCluster()
if rc == nil {
resp := &pdpb.QueryRegionResponse{
Header: notBootstrappedHeader(),
}
if err = stream.Send(resp); err != nil {
return errors.WithStack(err)
}
continue
}
needBuckets := rc.GetStoreConfig().IsEnableRegionBucket() && request.GetNeedBuckets()

start := time.Now()
keyIDMap, prevKeyIDMap, regionsByID := rc.QueryRegions(
request.GetKeys(),
request.GetPrevKeys(),
request.GetIds(),
needBuckets,
)
regionQueryDuration.Observe(time.Since(start).Seconds())
// Build the response and send it to the client.
response := &pdpb.QueryRegionResponse{
Header: wrapHeader(),
KeyIdMap: keyIDMap,
PrevKeyIdMap: prevKeyIDMap,
RegionsById: regionsByID,
}
if err := stream.Send(response); err != nil {
return errors.WithStack(err)
}
}
}

// Deprecated: use BatchScanRegions instead.
// ScanRegions implements gRPC PDServer.
func (s *GrpcServer) ScanRegions(ctx context.Context, request *pdpb.ScanRegionsRequest) (*pdpb.ScanRegionsResponse, error) {
Expand Down
10 changes: 10 additions & 0 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ var (
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
})

regionQueryDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "server",
Name: "region_query_duration_seconds",
Help: "Bucketed histogram of processing time (s) of region query requests.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
})

bucketReportLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Expand Down Expand Up @@ -172,6 +181,7 @@ func init() {
prometheus.MustRegister(tsoProxyBatchSize)
prometheus.MustRegister(tsoProxyForwardTimeoutCounter)
prometheus.MustRegister(tsoHandleDuration)
prometheus.MustRegister(regionQueryDuration)
prometheus.MustRegister(regionHeartbeatHandleDuration)
prometheus.MustRegister(storeHeartbeatHandleDuration)
prometheus.MustRegister(bucketReportCounter)
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/go-sql-driver/mysql v1.7.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412
github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/client_model v0.6.1
Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412 h1:RW/oeRwHxB9pGTtSCgf4wrsHw9RwrUg7+wAQRsW8KfE=
github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1 h1:rTAyiswGyWSGHJVa4Mkhdi8YfGqfA4LrUVKsH9nrJ8E=
github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
2 changes: 1 addition & 1 deletion tools/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/mattn/go-shellwords v1.0.12
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412
github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/common v0.55.0
Expand Down
4 changes: 2 additions & 2 deletions tools/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412 h1:RW/oeRwHxB9pGTtSCgf4wrsHw9RwrUg7+wAQRsW8KfE=
github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1 h1:rTAyiswGyWSGHJVa4Mkhdi8YfGqfA4LrUVKsH9nrJ8E=
github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down

0 comments on commit a69ee01

Please sign in to comment.