Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: introduce QueryRegion stream #8708

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/tikv/pd/client

go 1.23

replace github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20241015032459-be1e7521da0a

require (
github.com/BurntSushi/toml v0.3.1
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/JmPotato/kvproto v0.0.0-20241015032459-be1e7521da0a h1:GPjVn4UhIJljLcFnLaCaU44NmISKtX12P+VyX29m1c0=
github.com/JmPotato/kvproto v0.0.0-20241015032459-be1e7521da0a/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
Expand Down Expand Up @@ -46,8 +48,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31 h1:6BY+3T6Hqpw9UZ/D7Om/xB+Xik3NkkYxBV6qCzUdUvU=
github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go 1.23
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch
replace github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20241015032459-be1e7521da0a

require (
github.com/AlekSi/gocov-xml v1.0.0
Expand Down
1,701 changes: 1,692 additions & 9 deletions go.sum

Large diffs are not rendered by default.

65 changes: 65 additions & 0 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,71 @@
return regions
}

// TODO: benchmark the performance of `QueryRegions` and `getRegionsByKeys`.
// QueryRegions searches RegionInfo from regionTree by keys and IDs in batch.
func (r *RegionsInfo) QueryRegions(
keys [][]byte, ids []uint64,
) ([]uint64, map[uint64]*pdpb.RegionResponse) {
// Iterate the region keys to find the regions.
regions := r.getRegionsByKeys(keys)
// Assert the returned regions count matches the input keys.
if len(regions) != len(keys) {
panic("returned regions count mismatch with the input keys")

Check warning on line 1474 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L1474

Added line #L1474 was not covered by tests
}
// Build the key -> ID map for the final results.
var (
keyIDMap = make([]uint64, len(regions))
regionsByID = make(map[uint64]*pdpb.RegionResponse, len(regions))
)
for idx, region := range regions {
regionID := region.GetMeta().GetId()
keyIDMap[idx] = regionID
// Check if the region has been found.
if regionFound, ok := regionsByID[regionID]; (ok && regionFound != nil) || regionID == 0 {
continue
}
// If the given key is not found in the region tree, set the region to nil.
if region == nil {
regionsByID[regionID] = nil

Check warning on line 1490 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L1490

Added line #L1490 was not covered by tests
} else {
regionsByID[regionID] = &pdpb.RegionResponse{
Region: region.GetMeta(),
Leader: region.GetLeader(),
DownPeers: region.GetDownPeers(),
PendingPeers: region.GetPendingPeers(),
// TODO: get the buckets info.
}
}
}
// Iterate the region IDs to find the regions.
for _, id := range ids {
// Check if the region has been found.
if regionFound, ok := regionsByID[id]; (ok && regionFound != nil) || id == 0 {
continue

Check warning on line 1505 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L1505

Added line #L1505 was not covered by tests
}
// If the given region ID is not found in the region tree, set the region to nil.
if region := r.GetRegion(id); region == nil {
regionsByID[id] = nil
} else {
regionsByID[id] = &pdpb.RegionResponse{
Region: region.GetMeta(),
Leader: region.GetLeader(),
DownPeers: region.GetDownPeers(),
PendingPeers: region.GetPendingPeers(),
// TODO: get the buckets info.
}
}
}
return keyIDMap, regionsByID
}

// getRegionsByKeys searches RegionInfo from regionTree by keys.
func (r *RegionsInfo) getRegionsByKeys(keys [][]byte) []*RegionInfo {
r.t.RLock()
defer r.t.RUnlock()
return r.tree.searchByKeys(keys)
}

// SubTreeRegionType is the type of sub tree region.
type SubTreeRegionType string

Expand Down
47 changes: 47 additions & 0 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1201,3 +1201,50 @@ func TestScanRegion(t *testing.T) {
re.Len(scanNoError([]byte("a"), []byte("e"), 0), 3)
re.Len(scanNoError([]byte("c"), []byte("e"), 0), 1)
}

func TestQueryRegions(t *testing.T) {
re := require.New(t)
regions := NewRegionsInfo()
regions.CheckAndPutRegion(NewTestRegionInfo(1, 1, []byte("a"), []byte("b")))
regions.CheckAndPutRegion(NewTestRegionInfo(2, 1, []byte("b"), []byte("c")))
regions.CheckAndPutRegion(NewTestRegionInfo(3, 1, []byte("d"), []byte("e")))
// Query regions by keys.
keyIDMap, regionsByID := regions.QueryRegions([][]byte{[]byte("a"), []byte("b"), []byte("c")}, nil)
re.Len(keyIDMap, 3)
re.Equal(uint64(1), keyIDMap[0])
re.Equal(uint64(2), keyIDMap[1])
// The key is not in the region tree, so its ID should be 0.
re.Zero(keyIDMap[2])
re.Len(regionsByID, 2)
re.Equal(uint64(1), regionsByID[1].GetRegion().GetId())
re.Equal(uint64(2), regionsByID[2].GetRegion().GetId())
// Query regions by IDs.
keyIDMap, regionsByID = regions.QueryRegions(nil, []uint64{1, 2, 3})
re.Empty(keyIDMap)
re.Len(regionsByID, 3)
re.Equal(uint64(1), regionsByID[1].GetRegion().GetId())
re.Equal(uint64(2), regionsByID[2].GetRegion().GetId())
re.Equal(uint64(3), regionsByID[3].GetRegion().GetId())
// Query regions by keys and IDs.
keyIDMap, regionsByID = regions.QueryRegions([][]byte{[]byte("b"), []byte("c")}, []uint64{1, 3})
re.Len(keyIDMap, 2)
re.Equal(uint64(2), keyIDMap[0])
re.Zero(keyIDMap[1])
re.Len(regionsByID, 3)
re.Equal(uint64(1), regionsByID[1].GetRegion().GetId())
re.Equal(uint64(2), regionsByID[2].GetRegion().GetId())
re.Equal(uint64(3), regionsByID[3].GetRegion().GetId())
// Query the region that does not exist.
keyIDMap, regionsByID = regions.QueryRegions(nil, []uint64{4})
re.Empty(keyIDMap)
re.Len(regionsByID, 1)
re.Nil(regionsByID[4])
keyIDMap, regionsByID = regions.QueryRegions([][]byte{[]byte("c")}, nil)
re.Len(keyIDMap, 1)
re.Zero(keyIDMap[0])
re.Empty(regionsByID)
keyIDMap, regionsByID = regions.QueryRegions([][]byte{[]byte("c")}, []uint64{4})
re.Len(keyIDMap, 1)
re.Zero(keyIDMap[0])
re.Nil(regionsByID[4])
}
10 changes: 10 additions & 0 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,16 @@ func (t *regionTree) searchPrev(regionKey []byte) *RegionInfo {
return prevRegionItem.RegionInfo
}

// searchByKeys searches the regions by keys and return a slice of `*RegionInfo` whose order is the same as the input keys.
func (t *regionTree) searchByKeys(keys [][]byte) []*RegionInfo {
regions := make([]*RegionInfo, len(keys))
// TODO: do we need to deduplicate the input keys?
for idx, key := range keys {
regions[idx] = t.search(key)
}
return regions
}

// find returns the range item contains the start key.
func (t *regionTree) find(item *regionItem) *regionItem {
var result *regionItem
Expand Down
51 changes: 49 additions & 2 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,6 @@
continue
}

start := time.Now()
// TSO uses leader lease to determine validity. No need to check leader here.
if s.IsClosed() {
return status.Errorf(codes.Unknown, "server not started")
Expand All @@ -578,11 +577,13 @@
return status.Errorf(codes.FailedPrecondition,
"mismatch cluster id, need %d but got %d", clusterID, request.GetHeader().GetClusterId())
}

count := request.GetCount()
ctx, task := trace.NewTask(ctx, "tso")
start := time.Now()
ts, err := s.tsoAllocatorManager.HandleRequest(ctx, request.GetDcLocation(), count)
task.End()
tsoHandleDuration.Observe(time.Since(start).Seconds())
task.End()
if err != nil {
return status.Error(codes.Unknown, err.Error())
}
Expand Down Expand Up @@ -1576,6 +1577,52 @@
}, nil
}

// QueryRegion provides a stream processing of the region query.
func (s *GrpcServer) QueryRegion(stream pdpb.PD_QueryRegionServer) error {
if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() {
fName := currentFunction()
limiter := s.GetGRPCRateLimiter()
if done, err := limiter.Allow(fName); err == nil {
defer done()
} else {
return err
}

Check warning on line 1589 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1581-L1589

Added lines #L1581 - L1589 were not covered by tests
}

for {
request, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return errors.WithStack(err)
}

Check warning on line 1599 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1592-L1599

Added lines #L1592 - L1599 were not covered by tests

// TODO: add forwarding function

if s.IsClosed() {
return status.Errorf(codes.Unknown, "server not started")
}
if clusterID := s.ClusterID(); request.GetHeader().GetClusterId() != clusterID {
return status.Errorf(codes.FailedPrecondition,
"mismatch cluster id, need %d but got %d", clusterID, request.GetHeader().GetClusterId())
}

Check warning on line 1609 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1603-L1609

Added lines #L1603 - L1609 were not covered by tests

start := time.Now()
keyIDMap, regionsByID := s.GetRaftCluster().QueryRegions(request.GetRegionKeys(), request.GetRegionIds())
regionQueryDuration.Observe(time.Since(start).Seconds())
// Build the response and send it to the client.
response := &pdpb.QueryRegionResponse{
Header: s.header(),
RegionsById: regionsByID,
KeyIdMap: keyIDMap,
}
if err := stream.Send(response); err != nil {
return errors.WithStack(err)
}

Check warning on line 1622 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1611-L1622

Added lines #L1611 - L1622 were not covered by tests
}
}

// Deprecated: use BatchScanRegions instead.
// ScanRegions implements gRPC PDServer.
func (s *GrpcServer) ScanRegions(ctx context.Context, request *pdpb.ScanRegionsRequest) (*pdpb.ScanRegionsResponse, error) {
Expand Down
10 changes: 10 additions & 0 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ var (
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
})

regionQueryDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "server",
Name: "region_query_duration_seconds",
Help: "Bucketed histogram of processing time (s) of region query requests.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
})

bucketReportLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Expand Down Expand Up @@ -172,6 +181,7 @@ func init() {
prometheus.MustRegister(tsoProxyBatchSize)
prometheus.MustRegister(tsoProxyForwardTimeoutCounter)
prometheus.MustRegister(tsoHandleDuration)
prometheus.MustRegister(regionQueryDuration)
prometheus.MustRegister(regionHeartbeatHandleDuration)
prometheus.MustRegister(storeHeartbeatHandleDuration)
prometheus.MustRegister(bucketReportCounter)
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/tikv/pd/tests/integrations
go 1.23

replace (
github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20241015032459-be1e7521da0a
github.com/tikv/pd => ../../
github.com/tikv/pd/client => ../../client
github.com/tikv/pd/tests/integrations/mcs => ./mcs
Expand Down
Loading