Skip to content

Commit

Permalink
Split the keys to search
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Jan 17, 2025
1 parent 76c46bd commit 1118cec
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 17 deletions.
38 changes: 32 additions & 6 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
const (
randomRegionMaxRetry = 10
scanRegionLimit = 1000
batchSearchSize = 16
// CollectFactor is the factor to collect the count of region.
CollectFactor = 0.9
)
Expand Down Expand Up @@ -1512,15 +1513,40 @@ func (r *RegionsInfo) QueryRegions(

// getRegionsByKeys searches RegionInfo from regionTree by keys.
func (r *RegionsInfo) getRegionsByKeys(keys [][]byte) []*RegionInfo {
r.t.RLock()
defer r.t.RUnlock()
return r.tree.searchByKeys(keys)
regions := make([]*RegionInfo, 0, len(keys))
// Split the keys into multiple batches, and search each batch separately.
// This is to avoid the lock contention on the `regionTree`.
for _, batch := range splitKeysIntoBatches(keys) {
r.t.RLock()
results := r.tree.searchByKeys(batch)
r.t.RUnlock()
regions = append(regions, results...)
}
return regions
}

func splitKeysIntoBatches(keys [][]byte) [][][]byte {
keysLen := len(keys)
batches := make([][][]byte, 0, (keysLen+batchSearchSize-1)/batchSearchSize)
for i := 0; i < keysLen; i += batchSearchSize {
end := i + batchSearchSize
if end > keysLen {
end = keysLen
}
batches = append(batches, keys[i:end])
}
return batches
}

func (r *RegionsInfo) getRegionsByPrevKeys(prevKeys [][]byte) []*RegionInfo {
r.t.RLock()
defer r.t.RUnlock()
return r.tree.searchByPrevKeys(prevKeys)
regions := make([]*RegionInfo, 0, len(prevKeys))
for _, batch := range splitKeysIntoBatches(prevKeys) {
r.t.RLock()
results := r.tree.searchByPrevKeys(batch)
r.t.RUnlock()
regions = append(regions, results...)
}
return regions
}

// sortOutKeyIDMap will iterate the regions, convert it to a slice of regionID that corresponds to the input regions.
Expand Down
17 changes: 6 additions & 11 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1532,14 +1532,12 @@ func (s *GrpcServer) GetRegionByID(ctx context.Context, request *pdpb.GetRegionB

// QueryRegion provides a stream processing of the region query.
func (s *GrpcServer) QueryRegion(stream pdpb.PD_QueryRegionServer) error {
if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() {
fName := currentFunction()
limiter := s.GetGRPCRateLimiter()
if done, err := limiter.Allow(fName); err == nil {
defer done()
} else {
return err
}
done, err := s.rateLimitCheck()
if err != nil {
return err
}
if done != nil {
defer done()
}

Check warning on line 1541 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1534-L1541

Added lines #L1534 - L1541 were not covered by tests

for {
Expand All @@ -1553,9 +1551,6 @@ func (s *GrpcServer) QueryRegion(stream pdpb.PD_QueryRegionServer) error {

// TODO: add forwarding logic.

if s.IsClosed() {
return errs.ErrNotStarted
}
if clusterID := keypath.ClusterID(); request.GetHeader().GetClusterId() != clusterID {
return errs.ErrMismatchClusterID(clusterID, request.GetHeader().GetClusterId())
}
Expand Down

0 comments on commit 1118cec

Please sign in to comment.