Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: introduce an independent region client interface #8874

Merged
merged 2 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 17 additions & 82 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

import (
"context"
"encoding/hex"
"fmt"
"net/url"
"runtime/trace"
"strings"
"sync"
Expand All @@ -32,6 +30,7 @@
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/clients/metastorage"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/clients/tso"
"github.com/tikv/pd/client/constants"
"github.com/tikv/pd/client/errs"
Expand All @@ -43,15 +42,6 @@
"go.uber.org/zap"
)

// Region contains information of a region's meta and its peers.
type Region struct {
Meta *metapb.Region
Leader *metapb.Peer
DownPeers []*metapb.Peer
PendingPeers []*metapb.Peer
Buckets *metapb.Buckets
}

// GlobalConfigItem standard format of KV pair in GlobalConfig client
type GlobalConfigItem struct {
EventType pdpb.EventType
Expand All @@ -64,30 +54,6 @@
type RPCClient interface {
// GetAllMembers gets the members Info from PD
GetAllMembers(ctx context.Context) ([]*pdpb.Member, error)
// GetRegion gets a region and its leader Peer from PD by key.
// The region may expire after split. Caller is responsible for caching and
// taking care of region change.
// Also, it may return nil if PD finds no Region for the key temporarily,
// client should retry later.
GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error)
// GetRegionFromMember gets a region from certain members.
GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...opt.GetRegionOption) (*Region, error)
// GetPrevRegion gets the previous region and its leader Peer of the region where the key is located.
GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*Region, error)
// Deprecated: use BatchScanRegions instead.
// ScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned. It returns all the regions in the given range if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*Region, error)
// BatchScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned. It returns all the regions in the given ranges if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
// The returned regions are flattened, even there are key ranges located in the same region, only one region will be returned.
BatchScanRegions(ctx context.Context, keyRanges []KeyRange, limit int, opts ...opt.GetRegionOption) ([]*Region, error)
// GetStore gets a store from PD by store id.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
Expand Down Expand Up @@ -141,6 +107,7 @@
// on your needs.
WithCallerComponent(callerComponent caller.Component) RPCClient

router.Client
tso.Client
metastorage.Client
// KeyspaceClient manages keyspace metadata.
Expand Down Expand Up @@ -214,38 +181,6 @@
SSLKEYBytes []byte
}

// KeyRange defines a range of keys in bytes.
type KeyRange struct {
StartKey []byte
EndKey []byte
}

// NewKeyRange creates a new key range structure with the given start key and end key bytes.
// Notice: the actual encoding of the key range is not specified here. It should be either UTF-8 or hex.
// - UTF-8 means the key has already been encoded into a string with UTF-8 encoding, like:
// []byte{52 56 54 53 54 99 54 99 54 102 50 48 53 55 54 102 55 50 54 99 54 52}, which will later be converted to "48656c6c6f20576f726c64"
// by using `string()` method.
// - Hex means the key is just a raw hex bytes without encoding to a UTF-8 string, like:
// []byte{72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100}, which will later be converted to "48656c6c6f20576f726c64"
// by using `hex.EncodeToString()` method.
func NewKeyRange(startKey, endKey []byte) *KeyRange {
return &KeyRange{startKey, endKey}
}

// EscapeAsUTF8Str returns the URL escaped key strings as they are UTF-8 encoded.
func (r *KeyRange) EscapeAsUTF8Str() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(string(r.StartKey))
endKeyStr = url.QueryEscape(string(r.EndKey))
return
}

// EscapeAsHexStr returns the URL escaped key strings as they are hex encoded.
func (r *KeyRange) EscapeAsHexStr() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(hex.EncodeToString(r.StartKey))
endKeyStr = url.QueryEscape(hex.EncodeToString(r.EndKey))
return
}

// NewClient creates a PD client.
func NewClient(
callerComponent caller.Component,
Expand Down Expand Up @@ -634,12 +569,12 @@
return minTS.Physical, minTS.Logical, nil
}

func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
func handleRegionResponse(res *pdpb.GetRegionResponse) *router.Region {
if res.Region == nil {
return nil
}

r := &Region{
r := &router.Region{
Meta: res.Region,
Leader: res.Leader,
PendingPeers: res.PendingPeers,
Expand All @@ -652,7 +587,7 @@
}

// GetRegionFromMember implements the RPCClient interface.
func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, _ ...opt.GetRegionOption) (*Region, error) {
func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, _ ...opt.GetRegionOption) (*router.Region, error) {

Check warning on line 590 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L590

Added line #L590 was not covered by tests
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -691,7 +626,7 @@
}

// GetRegion implements the RPCClient interface.
func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) {
func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -731,7 +666,7 @@
}

// GetPrevRegion implements the RPCClient interface.
func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) {
func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -771,7 +706,7 @@
}

// GetRegionByID implements the RPCClient interface.
func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*Region, error) {
func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*router.Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -811,7 +746,7 @@
}

// ScanRegions implements the RPCClient interface.
func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*Region, error) {
func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) {

Check warning on line 749 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L749

Added line #L749 was not covered by tests
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -862,7 +797,7 @@
}

// BatchScanRegions implements the RPCClient interface.
func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit int, opts ...opt.GetRegionOption) ([]*Region, error) {
func (c *client) BatchScanRegions(ctx context.Context, ranges []router.KeyRange, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.BatchScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -915,10 +850,10 @@
return handleBatchRegionsResponse(resp), nil
}

func handleBatchRegionsResponse(resp *pdpb.BatchScanRegionsResponse) []*Region {
regions := make([]*Region, 0, len(resp.GetRegions()))
func handleBatchRegionsResponse(resp *pdpb.BatchScanRegionsResponse) []*router.Region {
regions := make([]*router.Region, 0, len(resp.GetRegions()))
for _, r := range resp.GetRegions() {
region := &Region{
region := &router.Region{
Meta: r.Region,
Leader: r.Leader,
PendingPeers: r.PendingPeers,
Expand All @@ -932,21 +867,21 @@
return regions
}

func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region {
var regions []*Region
func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*router.Region {
var regions []*router.Region

Check warning on line 871 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L870-L871

Added lines #L870 - L871 were not covered by tests
if len(resp.GetRegions()) == 0 {
// Make it compatible with old server.
metas, leaders := resp.GetRegionMetas(), resp.GetLeaders()
for i := range metas {
r := &Region{Meta: metas[i]}
r := &router.Region{Meta: metas[i]}

Check warning on line 876 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L876

Added line #L876 was not covered by tests
if i < len(leaders) {
r.Leader = leaders[i]
}
regions = append(regions, r)
}
} else {
for _, r := range resp.GetRegions() {
region := &Region{
region := &router.Region{

Check warning on line 884 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L884

Added line #L884 was not covered by tests
Meta: r.Region,
Leader: r.Leader,
PendingPeers: r.PendingPeers,
Expand Down
93 changes: 93 additions & 0 deletions client/clients/router/router_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package router

import (
"context"
"encoding/hex"
"net/url"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/client/opt"
)

// Region contains information of a region's meta and its peers.
type Region struct {
Meta *metapb.Region
Leader *metapb.Peer
DownPeers []*metapb.Peer
PendingPeers []*metapb.Peer
Buckets *metapb.Buckets
}

// KeyRange defines a range of keys in bytes.
type KeyRange struct {
StartKey []byte
EndKey []byte
}

// NewKeyRange creates a new key range structure with the given start key and end key bytes.
// Notice: the actual encoding of the key range is not specified here. It should be either UTF-8 or hex.
// - UTF-8 means the key has already been encoded into a string with UTF-8 encoding, like:
// []byte{52 56 54 53 54 99 54 99 54 102 50 48 53 55 54 102 55 50 54 99 54 52}, which will later be converted to "48656c6c6f20576f726c64"
// by using `string()` method.
// - Hex means the key is just a raw hex bytes without encoding to a UTF-8 string, like:
// []byte{72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100}, which will later be converted to "48656c6c6f20576f726c64"
// by using `hex.EncodeToString()` method.
func NewKeyRange(startKey, endKey []byte) *KeyRange {
return &KeyRange{startKey, endKey}
}

// EscapeAsUTF8Str returns the URL escaped key strings as they are UTF-8 encoded.
func (r *KeyRange) EscapeAsUTF8Str() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(string(r.StartKey))
endKeyStr = url.QueryEscape(string(r.EndKey))
return
}

// EscapeAsHexStr returns the URL escaped key strings as they are hex encoded.
func (r *KeyRange) EscapeAsHexStr() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(hex.EncodeToString(r.StartKey))
endKeyStr = url.QueryEscape(hex.EncodeToString(r.EndKey))
return
}

// Client defines the interface of a router client, which includes the methods for obtaining the routing information.
type Client interface {
// GetRegion gets a region and its leader Peer from PD by key.
// The region may expire after split. Caller is responsible for caching and
// taking care of region change.
// Also, it may return nil if PD finds no Region for the key temporarily,
// client should retry later.
GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error)
// GetRegionFromMember gets a region from certain members.
GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...opt.GetRegionOption) (*Region, error)
// GetPrevRegion gets the previous region and its leader Peer of the region where the key is located.
GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*Region, error)
// Deprecated: use BatchScanRegions instead.
// ScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned. It returns all the regions in the given range if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*Region, error)
// BatchScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned. It returns all the regions in the given ranges if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
// The returned regions are flattened, even there are key ranges located in the same region, only one region will be returned.
BatchScanRegions(ctx context.Context, keyRanges []KeyRange, limit int, opts ...opt.GetRegionOption) ([]*Region, error)
}
2 changes: 1 addition & 1 deletion client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/kvproto/pkg/pdpb"
pd "github.com/tikv/pd/client"
pd "github.com/tikv/pd/client/clients/router"
)

// ServiceSafePoint is the safepoint for a specific service
Expand Down
Loading
Loading