From a69ee01287378a2072e56e2920727f596dd6850c Mon Sep 17 00:00:00 2001 From: JmPotato Date: Fri, 17 Jan 2025 22:10:01 +0800 Subject: [PATCH] server, core: implement the query region gRPC server (#8979) ref tikv/pd#8690 Implement the query region gRPC server interface. Signed-off-by: JmPotato --- go.mod | 2 +- go.sum | 4 +- pkg/core/region.go | 117 ++++++++++++++++++++++++++++++++++++++ pkg/core/region_test.go | 88 ++++++++++++++++++++++++++++ pkg/core/region_tree.go | 20 +++++++ server/grpc_service.go | 57 +++++++++++++++++++ server/metrics.go | 10 ++++ tests/integrations/go.mod | 2 +- tests/integrations/go.sum | 4 +- tools/go.mod | 2 +- tools/go.sum | 4 +- 11 files changed, 301 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index d3e42f3f381..8d0ad90fb11 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 - github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412 + github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 github.com/pingcap/tidb-dashboard v0.0.0-20241104061623-bce95733dad7 diff --git a/go.sum b/go.sum index 97095710805..8850c1a6509 100644 --- a/go.sum +++ b/go.sum @@ -392,8 +392,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE= github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412 h1:RW/oeRwHxB9pGTtSCgf4wrsHw9RwrUg7+wAQRsW8KfE= -github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1 h1:rTAyiswGyWSGHJVa4Mkhdi8YfGqfA4LrUVKsH9nrJ8E= +github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/pkg/core/region.go b/pkg/core/region.go index 706e6bbd712..94fc525f11b 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -47,6 +47,7 @@ import ( const ( randomRegionMaxRetry = 10 scanRegionLimit = 1000 + batchSearchSize = 16 // CollectFactor is the factor to collect the count of region. CollectFactor = 0.9 ) @@ -1464,6 +1465,122 @@ func (r *RegionsInfo) GetStoreRegions(storeID uint64) []*RegionInfo { return regions } +// TODO: benchmark the performance of `QueryRegions`. +// QueryRegions searches RegionInfo from regionTree by keys and IDs in batch. +func (r *RegionsInfo) QueryRegions( + keys, prevKeys [][]byte, ids []uint64, needBuckets bool, +) ([]uint64, []uint64, map[uint64]*pdpb.RegionResponse) { + // Iterate the region keys to find the regions. + regions := r.getRegionsByKeys(keys) + // Assert the returned regions count matches the input keys. + if len(regions) != len(keys) { + panic("returned regions count mismatch with the input keys") + } + // Iterate the prevKeys to find the regions. + prevRegions := r.getRegionsByPrevKeys(prevKeys) + // Assert the returned regions count matches the input keys. + if len(prevRegions) != len(prevKeys) { + panic("returned prev regions count mismatch with the input keys") + } + // Build the key -> ID map for the final results. + regionsByID := make(map[uint64]*pdpb.RegionResponse, len(regions)) + keyIDMap := sortOutKeyIDMap(regionsByID, regions, needBuckets) + prevKeyIDMap := sortOutKeyIDMap(regionsByID, prevRegions, needBuckets) + // Iterate the region IDs to find the regions. + for _, id := range ids { + // Check if the region has been found. + if regionFound, ok := regionsByID[id]; (ok && regionFound != nil) || id == 0 { + continue + } + // If the given region ID is not found in the region tree, set the region to nil. + if region := r.GetRegion(id); region == nil { + regionsByID[id] = nil + } else { + regionResp := &pdpb.RegionResponse{ + Region: region.GetMeta(), + Leader: region.GetLeader(), + DownPeers: region.GetDownPeers(), + PendingPeers: region.GetPendingPeers(), + } + if needBuckets { + regionResp.Buckets = region.GetBuckets() + } + regionsByID[id] = regionResp + } + } + return keyIDMap, prevKeyIDMap, regionsByID +} + +// getRegionsByKeys searches RegionInfo from regionTree by keys. +func (r *RegionsInfo) getRegionsByKeys(keys [][]byte) []*RegionInfo { + regions := make([]*RegionInfo, 0, len(keys)) + // Split the keys into multiple batches, and search each batch separately. + // This is to avoid the lock contention on the `regionTree`. + for _, batch := range splitKeysIntoBatches(keys) { + r.t.RLock() + results := r.tree.searchByKeys(batch) + r.t.RUnlock() + regions = append(regions, results...) + } + return regions +} + +func splitKeysIntoBatches(keys [][]byte) [][][]byte { + keysLen := len(keys) + batches := make([][][]byte, 0, (keysLen+batchSearchSize-1)/batchSearchSize) + for i := 0; i < keysLen; i += batchSearchSize { + end := i + batchSearchSize + if end > keysLen { + end = keysLen + } + batches = append(batches, keys[i:end]) + } + return batches +} + +func (r *RegionsInfo) getRegionsByPrevKeys(prevKeys [][]byte) []*RegionInfo { + regions := make([]*RegionInfo, 0, len(prevKeys)) + for _, batch := range splitKeysIntoBatches(prevKeys) { + r.t.RLock() + results := r.tree.searchByPrevKeys(batch) + r.t.RUnlock() + regions = append(regions, results...) + } + return regions +} + +// sortOutKeyIDMap will iterate the regions, convert it to a slice of regionID that corresponds to the input regions. +// It will also update `regionsByID` with the regionID and regionResponse. +func sortOutKeyIDMap( + regionsByID map[uint64]*pdpb.RegionResponse, regions []*RegionInfo, needBuckets bool, +) []uint64 { + keyIDMap := make([]uint64, len(regions)) + for idx, region := range regions { + regionID := region.GetMeta().GetId() + keyIDMap[idx] = regionID + // Check if the region has been found. + if regionFound, ok := regionsByID[regionID]; (ok && regionFound != nil) || regionID == 0 { + continue + } + // If the given key is not found in the region tree, set the region to nil. + if region == nil { + regionsByID[regionID] = nil + } else { + regionResp := &pdpb.RegionResponse{ + Region: region.GetMeta(), + Leader: region.GetLeader(), + DownPeers: region.GetDownPeers(), + PendingPeers: region.GetPendingPeers(), + } + if needBuckets { + regionResp.Buckets = region.GetBuckets() + } + regionsByID[regionID] = regionResp + } + } + return keyIDMap +} + // SubTreeRegionType is the type of sub tree region. type SubTreeRegionType string diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 473421b0e52..cfd05b776f2 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -1203,3 +1203,91 @@ func TestScanRegion(t *testing.T) { re.Len(scanNoError([]byte("a"), []byte("e"), 0), 3) re.Len(scanNoError([]byte("c"), []byte("e"), 0), 1) } + +func TestQueryRegions(t *testing.T) { + re := require.New(t) + regions := NewRegionsInfo() + regions.CheckAndPutRegion(NewTestRegionInfo(1, 1, []byte("a"), []byte("b"))) + regions.CheckAndPutRegion(NewTestRegionInfo(2, 1, []byte("b"), []byte("c"))) + regions.CheckAndPutRegion(NewTestRegionInfo(3, 1, []byte("d"), []byte("e"))) + // Query regions by keys. + keyIDMap, prevKeyIDMap, regionsByID := regions.QueryRegions( + [][]byte{[]byte("a"), []byte("b"), []byte("c")}, + nil, + nil, + false, + ) + re.Len(keyIDMap, 3) + re.Empty(prevKeyIDMap) + re.Equal(uint64(1), keyIDMap[0]) + re.Equal(uint64(2), keyIDMap[1]) + re.Zero(keyIDMap[2]) // The key is not in the region tree, so its ID should be 0. + re.Len(regionsByID, 2) + re.Equal(uint64(1), regionsByID[1].GetRegion().GetId()) + re.Equal(uint64(2), regionsByID[2].GetRegion().GetId()) + // Query regions by IDs. + keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions( + nil, + nil, + []uint64{1, 2, 3}, + false, + ) + re.Empty(keyIDMap) + re.Empty(prevKeyIDMap) + re.Len(regionsByID, 3) + re.Equal(uint64(1), regionsByID[1].GetRegion().GetId()) + re.Equal(uint64(2), regionsByID[2].GetRegion().GetId()) + re.Equal(uint64(3), regionsByID[3].GetRegion().GetId()) + // Query the region that does not exist. + keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions( + nil, + nil, + []uint64{4}, + false, + ) + re.Empty(keyIDMap) + re.Empty(prevKeyIDMap) + re.Len(regionsByID, 1) + re.Nil(regionsByID[4]) + keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions( + [][]byte{[]byte("c")}, + nil, + nil, + false, + ) + re.Len(keyIDMap, 1) + re.Empty(prevKeyIDMap) + re.Zero(keyIDMap[0]) + re.Empty(regionsByID) + keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions( + [][]byte{[]byte("c")}, + nil, + []uint64{4}, + false, + ) + re.Len(keyIDMap, 1) + re.Empty(prevKeyIDMap) + re.Zero(keyIDMap[0]) + re.Nil(regionsByID[4]) + // Query regions by keys, previous keys and IDs. + keyIDMap, prevKeyIDMap, regionsByID = regions.QueryRegions( + [][]byte{[]byte("b"), []byte("c")}, + [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e"), []byte("f")}, + []uint64{1, 3}, + false, + ) + re.Len(keyIDMap, 2) + re.Len(prevKeyIDMap, 6) + re.Equal(uint64(2), keyIDMap[0]) + re.Zero(keyIDMap[1]) + re.Zero(prevKeyIDMap[0]) + re.Equal(uint64(1), prevKeyIDMap[1]) + re.Zero(prevKeyIDMap[2]) + re.Zero(prevKeyIDMap[3]) + re.Zero(prevKeyIDMap[4]) + re.Zero(prevKeyIDMap[5]) + re.Len(regionsByID, 3) + re.Equal(uint64(1), regionsByID[1].GetRegion().GetId()) + re.Equal(uint64(2), regionsByID[2].GetRegion().GetId()) + re.Equal(uint64(3), regionsByID[3].GetRegion().GetId()) +} diff --git a/pkg/core/region_tree.go b/pkg/core/region_tree.go index 6efafd133cf..1e0a0d5565d 100644 --- a/pkg/core/region_tree.go +++ b/pkg/core/region_tree.go @@ -255,6 +255,26 @@ func (t *regionTree) searchPrev(regionKey []byte) *RegionInfo { return prevRegionItem.RegionInfo } +// searchByKeys searches the regions by keys and return a slice of `*RegionInfo` whose order is the same as the input keys. +func (t *regionTree) searchByKeys(keys [][]byte) []*RegionInfo { + regions := make([]*RegionInfo, len(keys)) + // TODO: do we need to deduplicate the input keys? + for idx, key := range keys { + regions[idx] = t.search(key) + } + return regions +} + +// searchByPrevKeys searches the regions by prevKeys and return a slice of `*RegionInfo` whose order is the same as the input keys. +func (t *regionTree) searchByPrevKeys(prevKeys [][]byte) []*RegionInfo { + regions := make([]*RegionInfo, len(prevKeys)) + // TODO: do we need to deduplicate the input keys? + for idx, key := range prevKeys { + regions[idx] = t.searchPrev(key) + } + return regions +} + // find returns the range item contains the start key. func (t *regionTree) find(item *regionItem) *regionItem { var result *regionItem diff --git a/server/grpc_service.go b/server/grpc_service.go index 649d02a37b4..b985e870a03 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1530,6 +1530,63 @@ func (s *GrpcServer) GetRegionByID(ctx context.Context, request *pdpb.GetRegionB }, nil } +// QueryRegion provides a stream processing of the region query. +func (s *GrpcServer) QueryRegion(stream pdpb.PD_QueryRegionServer) error { + done, err := s.rateLimitCheck() + if err != nil { + return err + } + if done != nil { + defer done() + } + + for { + request, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return errors.WithStack(err) + } + + // TODO: add forwarding logic. + + if clusterID := keypath.ClusterID(); request.GetHeader().GetClusterId() != clusterID { + return errs.ErrMismatchClusterID(clusterID, request.GetHeader().GetClusterId()) + } + rc := s.GetRaftCluster() + if rc == nil { + resp := &pdpb.QueryRegionResponse{ + Header: notBootstrappedHeader(), + } + if err = stream.Send(resp); err != nil { + return errors.WithStack(err) + } + continue + } + needBuckets := rc.GetStoreConfig().IsEnableRegionBucket() && request.GetNeedBuckets() + + start := time.Now() + keyIDMap, prevKeyIDMap, regionsByID := rc.QueryRegions( + request.GetKeys(), + request.GetPrevKeys(), + request.GetIds(), + needBuckets, + ) + regionQueryDuration.Observe(time.Since(start).Seconds()) + // Build the response and send it to the client. + response := &pdpb.QueryRegionResponse{ + Header: wrapHeader(), + KeyIdMap: keyIDMap, + PrevKeyIdMap: prevKeyIDMap, + RegionsById: regionsByID, + } + if err := stream.Send(response); err != nil { + return errors.WithStack(err) + } + } +} + // Deprecated: use BatchScanRegions instead. // ScanRegions implements gRPC PDServer. func (s *GrpcServer) ScanRegions(ctx context.Context, request *pdpb.ScanRegionsRequest) (*pdpb.ScanRegionsResponse, error) { diff --git a/server/metrics.go b/server/metrics.go index fdcc5b4be22..dd07447140b 100644 --- a/server/metrics.go +++ b/server/metrics.go @@ -99,6 +99,15 @@ var ( Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), }) + regionQueryDuration = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "server", + Name: "region_query_duration_seconds", + Help: "Bucketed histogram of processing time (s) of region query requests.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), + }) + bucketReportLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "pd", @@ -172,6 +181,7 @@ func init() { prometheus.MustRegister(tsoProxyBatchSize) prometheus.MustRegister(tsoProxyForwardTimeoutCounter) prometheus.MustRegister(tsoHandleDuration) + prometheus.MustRegister(regionQueryDuration) prometheus.MustRegister(regionHeartbeatHandleDuration) prometheus.MustRegister(storeHeartbeatHandleDuration) prometheus.MustRegister(bucketReportCounter) diff --git a/tests/integrations/go.mod b/tests/integrations/go.mod index 432826c52bd..fca5b54bb07 100644 --- a/tests/integrations/go.mod +++ b/tests/integrations/go.mod @@ -14,7 +14,7 @@ require ( github.com/go-sql-driver/mysql v1.7.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 - github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412 + github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.20.5 github.com/prometheus/client_model v0.6.1 diff --git a/tests/integrations/go.sum b/tests/integrations/go.sum index 9900c796278..c23ee2733a6 100644 --- a/tests/integrations/go.sum +++ b/tests/integrations/go.sum @@ -385,8 +385,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE= github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412 h1:RW/oeRwHxB9pGTtSCgf4wrsHw9RwrUg7+wAQRsW8KfE= -github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1 h1:rTAyiswGyWSGHJVa4Mkhdi8YfGqfA4LrUVKsH9nrJ8E= +github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tools/go.mod b/tools/go.mod index 2ccfa4abd60..eb4c32afc13 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -22,7 +22,7 @@ require ( github.com/mattn/go-shellwords v1.0.12 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 - github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412 + github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.20.5 github.com/prometheus/common v0.55.0 diff --git a/tools/go.sum b/tools/go.sum index 93d685c193b..93cbc6736a9 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -386,8 +386,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE= github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412 h1:RW/oeRwHxB9pGTtSCgf4wrsHw9RwrUg7+wAQRsW8KfE= -github.com/pingcap/kvproto v0.0.0-20250117013947-1fdf41372412/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1 h1:rTAyiswGyWSGHJVa4Mkhdi8YfGqfA4LrUVKsH9nrJ8E= +github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=