From 536ae495c57136c8421350895dc04a018a34beb5 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 9 Dec 2024 16:40:33 +0800 Subject: [PATCH] client: introduce an independent region client interface (#8874) ref tikv/pd#8690 Introduce an independent region client interface. Signed-off-by: JmPotato --- client/client.go | 99 ++++-------------------- client/clients/router/router_client.go | 93 ++++++++++++++++++++++ client/http/types.go | 2 +- tests/integrations/client/client_test.go | 33 ++++---- 4 files changed, 128 insertions(+), 99 deletions(-) create mode 100644 client/clients/router/router_client.go diff --git a/client/client.go b/client/client.go index bf982f4fea0..49ce73bf9fb 100644 --- a/client/client.go +++ b/client/client.go @@ -16,9 +16,7 @@ package pd import ( "context" - "encoding/hex" "fmt" - "net/url" "runtime/trace" "strings" "sync" @@ -32,6 +30,7 @@ import ( "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/client/clients/metastorage" + "github.com/tikv/pd/client/clients/router" "github.com/tikv/pd/client/clients/tso" "github.com/tikv/pd/client/constants" "github.com/tikv/pd/client/errs" @@ -43,15 +42,6 @@ import ( "go.uber.org/zap" ) -// Region contains information of a region's meta and its peers. -type Region struct { - Meta *metapb.Region - Leader *metapb.Peer - DownPeers []*metapb.Peer - PendingPeers []*metapb.Peer - Buckets *metapb.Buckets -} - // GlobalConfigItem standard format of KV pair in GlobalConfig client type GlobalConfigItem struct { EventType pdpb.EventType @@ -64,30 +54,6 @@ type GlobalConfigItem struct { type RPCClient interface { // GetAllMembers gets the members Info from PD GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) - // GetRegion gets a region and its leader Peer from PD by key. - // The region may expire after split. Caller is responsible for caching and - // taking care of region change. - // Also, it may return nil if PD finds no Region for the key temporarily, - // client should retry later. - GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) - // GetRegionFromMember gets a region from certain members. - GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...opt.GetRegionOption) (*Region, error) - // GetPrevRegion gets the previous region and its leader Peer of the region where the key is located. - GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) - // GetRegionByID gets a region and its leader Peer from PD by id. - GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*Region, error) - // Deprecated: use BatchScanRegions instead. - // ScanRegions gets a list of regions, starts from the region that contains key. - // Limit limits the maximum number of regions returned. It returns all the regions in the given range if limit <= 0. - // If a region has no leader, corresponding leader will be placed by a peer - // with empty value (PeerID is 0). - ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*Region, error) - // BatchScanRegions gets a list of regions, starts from the region that contains key. - // Limit limits the maximum number of regions returned. It returns all the regions in the given ranges if limit <= 0. - // If a region has no leader, corresponding leader will be placed by a peer - // with empty value (PeerID is 0). - // The returned regions are flattened, even there are key ranges located in the same region, only one region will be returned. - BatchScanRegions(ctx context.Context, keyRanges []KeyRange, limit int, opts ...opt.GetRegionOption) ([]*Region, error) // GetStore gets a store from PD by store id. // The store may expire later. Caller is responsible for caching and taking care // of store change. @@ -141,6 +107,7 @@ type RPCClient interface { // on your needs. WithCallerComponent(callerComponent caller.Component) RPCClient + router.Client tso.Client metastorage.Client // KeyspaceClient manages keyspace metadata. @@ -214,38 +181,6 @@ type SecurityOption struct { SSLKEYBytes []byte } -// KeyRange defines a range of keys in bytes. -type KeyRange struct { - StartKey []byte - EndKey []byte -} - -// NewKeyRange creates a new key range structure with the given start key and end key bytes. -// Notice: the actual encoding of the key range is not specified here. It should be either UTF-8 or hex. -// - UTF-8 means the key has already been encoded into a string with UTF-8 encoding, like: -// []byte{52 56 54 53 54 99 54 99 54 102 50 48 53 55 54 102 55 50 54 99 54 52}, which will later be converted to "48656c6c6f20576f726c64" -// by using `string()` method. -// - Hex means the key is just a raw hex bytes without encoding to a UTF-8 string, like: -// []byte{72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100}, which will later be converted to "48656c6c6f20576f726c64" -// by using `hex.EncodeToString()` method. -func NewKeyRange(startKey, endKey []byte) *KeyRange { - return &KeyRange{startKey, endKey} -} - -// EscapeAsUTF8Str returns the URL escaped key strings as they are UTF-8 encoded. -func (r *KeyRange) EscapeAsUTF8Str() (startKeyStr, endKeyStr string) { - startKeyStr = url.QueryEscape(string(r.StartKey)) - endKeyStr = url.QueryEscape(string(r.EndKey)) - return -} - -// EscapeAsHexStr returns the URL escaped key strings as they are hex encoded. -func (r *KeyRange) EscapeAsHexStr() (startKeyStr, endKeyStr string) { - startKeyStr = url.QueryEscape(hex.EncodeToString(r.StartKey)) - endKeyStr = url.QueryEscape(hex.EncodeToString(r.EndKey)) - return -} - // NewClient creates a PD client. func NewClient( callerComponent caller.Component, @@ -634,12 +569,12 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e return minTS.Physical, minTS.Logical, nil } -func handleRegionResponse(res *pdpb.GetRegionResponse) *Region { +func handleRegionResponse(res *pdpb.GetRegionResponse) *router.Region { if res.Region == nil { return nil } - r := &Region{ + r := &router.Region{ Meta: res.Region, Leader: res.Leader, PendingPeers: res.PendingPeers, @@ -652,7 +587,7 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region { } // GetRegionFromMember implements the RPCClient interface. -func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, _ ...opt.GetRegionOption) (*Region, error) { +func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, _ ...opt.GetRegionOption) (*router.Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -691,7 +626,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs } // GetRegion implements the RPCClient interface. -func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) { +func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -731,7 +666,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio } // GetPrevRegion implements the RPCClient interface. -func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) { +func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -771,7 +706,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR } // GetRegionByID implements the RPCClient interface. -func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*Region, error) { +func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*router.Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -811,7 +746,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt } // ScanRegions implements the RPCClient interface. -func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*Region, error) { +func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -862,7 +797,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, } // BatchScanRegions implements the RPCClient interface. -func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit int, opts ...opt.GetRegionOption) ([]*Region, error) { +func (c *client) BatchScanRegions(ctx context.Context, ranges []router.KeyRange, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.BatchScanRegions", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -915,10 +850,10 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit return handleBatchRegionsResponse(resp), nil } -func handleBatchRegionsResponse(resp *pdpb.BatchScanRegionsResponse) []*Region { - regions := make([]*Region, 0, len(resp.GetRegions())) +func handleBatchRegionsResponse(resp *pdpb.BatchScanRegionsResponse) []*router.Region { + regions := make([]*router.Region, 0, len(resp.GetRegions())) for _, r := range resp.GetRegions() { - region := &Region{ + region := &router.Region{ Meta: r.Region, Leader: r.Leader, PendingPeers: r.PendingPeers, @@ -932,13 +867,13 @@ func handleBatchRegionsResponse(resp *pdpb.BatchScanRegionsResponse) []*Region { return regions } -func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region { - var regions []*Region +func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*router.Region { + var regions []*router.Region if len(resp.GetRegions()) == 0 { // Make it compatible with old server. metas, leaders := resp.GetRegionMetas(), resp.GetLeaders() for i := range metas { - r := &Region{Meta: metas[i]} + r := &router.Region{Meta: metas[i]} if i < len(leaders) { r.Leader = leaders[i] } @@ -946,7 +881,7 @@ func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region { } } else { for _, r := range resp.GetRegions() { - region := &Region{ + region := &router.Region{ Meta: r.Region, Leader: r.Leader, PendingPeers: r.PendingPeers, diff --git a/client/clients/router/router_client.go b/client/clients/router/router_client.go new file mode 100644 index 00000000000..667c82a6805 --- /dev/null +++ b/client/clients/router/router_client.go @@ -0,0 +1,93 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package router + +import ( + "context" + "encoding/hex" + "net/url" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/client/opt" +) + +// Region contains information of a region's meta and its peers. +type Region struct { + Meta *metapb.Region + Leader *metapb.Peer + DownPeers []*metapb.Peer + PendingPeers []*metapb.Peer + Buckets *metapb.Buckets +} + +// KeyRange defines a range of keys in bytes. +type KeyRange struct { + StartKey []byte + EndKey []byte +} + +// NewKeyRange creates a new key range structure with the given start key and end key bytes. +// Notice: the actual encoding of the key range is not specified here. It should be either UTF-8 or hex. +// - UTF-8 means the key has already been encoded into a string with UTF-8 encoding, like: +// []byte{52 56 54 53 54 99 54 99 54 102 50 48 53 55 54 102 55 50 54 99 54 52}, which will later be converted to "48656c6c6f20576f726c64" +// by using `string()` method. +// - Hex means the key is just a raw hex bytes without encoding to a UTF-8 string, like: +// []byte{72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100}, which will later be converted to "48656c6c6f20576f726c64" +// by using `hex.EncodeToString()` method. +func NewKeyRange(startKey, endKey []byte) *KeyRange { + return &KeyRange{startKey, endKey} +} + +// EscapeAsUTF8Str returns the URL escaped key strings as they are UTF-8 encoded. +func (r *KeyRange) EscapeAsUTF8Str() (startKeyStr, endKeyStr string) { + startKeyStr = url.QueryEscape(string(r.StartKey)) + endKeyStr = url.QueryEscape(string(r.EndKey)) + return +} + +// EscapeAsHexStr returns the URL escaped key strings as they are hex encoded. +func (r *KeyRange) EscapeAsHexStr() (startKeyStr, endKeyStr string) { + startKeyStr = url.QueryEscape(hex.EncodeToString(r.StartKey)) + endKeyStr = url.QueryEscape(hex.EncodeToString(r.EndKey)) + return +} + +// Client defines the interface of a router client, which includes the methods for obtaining the routing information. +type Client interface { + // GetRegion gets a region and its leader Peer from PD by key. + // The region may expire after split. Caller is responsible for caching and + // taking care of region change. + // Also, it may return nil if PD finds no Region for the key temporarily, + // client should retry later. + GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) + // GetRegionFromMember gets a region from certain members. + GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...opt.GetRegionOption) (*Region, error) + // GetPrevRegion gets the previous region and its leader Peer of the region where the key is located. + GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) + // GetRegionByID gets a region and its leader Peer from PD by id. + GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*Region, error) + // Deprecated: use BatchScanRegions instead. + // ScanRegions gets a list of regions, starts from the region that contains key. + // Limit limits the maximum number of regions returned. It returns all the regions in the given range if limit <= 0. + // If a region has no leader, corresponding leader will be placed by a peer + // with empty value (PeerID is 0). + ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*Region, error) + // BatchScanRegions gets a list of regions, starts from the region that contains key. + // Limit limits the maximum number of regions returned. It returns all the regions in the given ranges if limit <= 0. + // If a region has no leader, corresponding leader will be placed by a peer + // with empty value (PeerID is 0). + // The returned regions are flattened, even there are key ranges located in the same region, only one region will be returned. + BatchScanRegions(ctx context.Context, keyRanges []KeyRange, limit int, opts ...opt.GetRegionOption) ([]*Region, error) +} diff --git a/client/http/types.go b/client/http/types.go index 4bc60978a0e..cab564e99ac 100644 --- a/client/http/types.go +++ b/client/http/types.go @@ -22,7 +22,7 @@ import ( "github.com/pingcap/kvproto/pkg/encryptionpb" "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/kvproto/pkg/pdpb" - pd "github.com/tikv/pd/client" + pd "github.com/tikv/pd/client/clients/router" ) // ServiceSafePoint is the safepoint for a specific service diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 34430918bcf..0462a6d9ea0 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -38,6 +38,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/pkg/caller" "github.com/tikv/pd/client/pkg/retry" @@ -539,11 +540,11 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding1( checkTS(re, cli, lastTS) re.NoError(failpoint.Enable("github.com/tikv/pd/client/responseNil", "return(true)")) - regions, err := cli.BatchScanRegions(ctx, []pd.KeyRange{{StartKey: []byte(""), EndKey: []byte("")}}, 100) + regions, err := cli.BatchScanRegions(ctx, []router.KeyRange{{StartKey: []byte(""), EndKey: []byte("")}}, 100) re.NoError(err) re.Empty(regions) re.NoError(failpoint.Disable("github.com/tikv/pd/client/responseNil")) - regions, err = cli.BatchScanRegions(ctx, []pd.KeyRange{{StartKey: []byte(""), EndKey: []byte("")}}, 100) + regions, err = cli.BatchScanRegions(ctx, []router.KeyRange{{StartKey: []byte(""), EndKey: []byte("")}}, 100) re.NoError(err) re.Len(regions, 1) } @@ -1216,7 +1217,7 @@ func (suite *clientTestSuite) TestScanRegions() { // Wait for region heartbeats. testutil.Eventually(re, func() bool { - scanRegions, err := suite.client.BatchScanRegions(context.Background(), []pd.KeyRange{{StartKey: []byte{0}, EndKey: nil}}, 10) + scanRegions, err := suite.client.BatchScanRegions(context.Background(), []router.KeyRange{{StartKey: []byte{0}, EndKey: nil}}, 10) return err == nil && len(scanRegions) == 10 }) @@ -1234,7 +1235,7 @@ func (suite *clientTestSuite) TestScanRegions() { t := suite.T() check := func(start, end []byte, limit int, expect []*metapb.Region) { - scanRegions, err := suite.client.BatchScanRegions(context.Background(), []pd.KeyRange{{StartKey: start, EndKey: end}}, limit) + scanRegions, err := suite.client.BatchScanRegions(context.Background(), []router.KeyRange{{StartKey: start, EndKey: end}}, limit) re.NoError(err) re.Len(scanRegions, len(expect)) t.Log("scanRegions", scanRegions) @@ -1849,7 +1850,7 @@ func (suite *clientTestSuite) TestBatchScanRegions() { // Wait for region heartbeats. testutil.Eventually(re, func() bool { - scanRegions, err := suite.client.BatchScanRegions(ctx, []pd.KeyRange{{StartKey: []byte{0}, EndKey: nil}}, 10) + scanRegions, err := suite.client.BatchScanRegions(ctx, []router.KeyRange{{StartKey: []byte{0}, EndKey: nil}}, 10) return err == nil && len(scanRegions) == 10 }) @@ -1871,7 +1872,7 @@ func (suite *clientTestSuite) TestBatchScanRegions() { t := suite.T() var outputMustContainAllKeyRangeOptions []bool - check := func(ranges []pd.KeyRange, limit int, expect []*metapb.Region) { + check := func(ranges []router.KeyRange, limit int, expect []*metapb.Region) { for _, bucket := range []bool{false, true} { for _, outputMustContainAllKeyRange := range outputMustContainAllKeyRangeOptions { var opts []opt.GetRegionOption @@ -1917,16 +1918,16 @@ func (suite *clientTestSuite) TestBatchScanRegions() { // valid ranges outputMustContainAllKeyRangeOptions = []bool{false, true} - check([]pd.KeyRange{{StartKey: []byte{0}, EndKey: nil}}, 10, regions) - check([]pd.KeyRange{{StartKey: []byte{1}, EndKey: nil}}, 5, regions[1:6]) - check([]pd.KeyRange{ + check([]router.KeyRange{{StartKey: []byte{0}, EndKey: nil}}, 10, regions) + check([]router.KeyRange{{StartKey: []byte{1}, EndKey: nil}}, 5, regions[1:6]) + check([]router.KeyRange{ {StartKey: []byte{0}, EndKey: []byte{1}}, {StartKey: []byte{2}, EndKey: []byte{3}}, {StartKey: []byte{4}, EndKey: []byte{5}}, {StartKey: []byte{6}, EndKey: []byte{7}}, {StartKey: []byte{8}, EndKey: []byte{9}}, }, 10, []*metapb.Region{regions[0], regions[2], regions[4], regions[6], regions[8]}) - check([]pd.KeyRange{ + check([]router.KeyRange{ {StartKey: []byte{0}, EndKey: []byte{1}}, {StartKey: []byte{2}, EndKey: []byte{3}}, {StartKey: []byte{4}, EndKey: []byte{5}}, @@ -1935,7 +1936,7 @@ func (suite *clientTestSuite) TestBatchScanRegions() { }, 3, []*metapb.Region{regions[0], regions[2], regions[4]}) outputMustContainAllKeyRangeOptions = []bool{false} - check([]pd.KeyRange{ + check([]router.KeyRange{ {StartKey: []byte{0}, EndKey: []byte{0, 1}}, // non-continuous ranges in a region {StartKey: []byte{0, 2}, EndKey: []byte{0, 3}}, {StartKey: []byte{0, 3}, EndKey: []byte{0, 4}}, @@ -1944,26 +1945,26 @@ func (suite *clientTestSuite) TestBatchScanRegions() { {StartKey: []byte{4}, EndKey: []byte{5}}, }, 10, []*metapb.Region{regions[0], regions[1], regions[2], regions[4]}) outputMustContainAllKeyRangeOptions = []bool{false} - check([]pd.KeyRange{ + check([]router.KeyRange{ {StartKey: []byte{9}, EndKey: []byte{10, 1}}, }, 10, []*metapb.Region{regions[9]}) // invalid ranges _, err := suite.client.BatchScanRegions( ctx, - []pd.KeyRange{{StartKey: []byte{1}, EndKey: []byte{0}}}, + []router.KeyRange{{StartKey: []byte{1}, EndKey: []byte{0}}}, 10, opt.WithOutputMustContainAllKeyRange(), ) re.ErrorContains(err, "invalid key range, start key > end key") - _, err = suite.client.BatchScanRegions(ctx, []pd.KeyRange{ + _, err = suite.client.BatchScanRegions(ctx, []router.KeyRange{ {StartKey: []byte{0}, EndKey: []byte{2}}, {StartKey: []byte{1}, EndKey: []byte{3}}, }, 10) re.ErrorContains(err, "invalid key range, ranges overlapped") _, err = suite.client.BatchScanRegions( ctx, - []pd.KeyRange{{StartKey: []byte{9}, EndKey: []byte{10, 1}}}, + []router.KeyRange{{StartKey: []byte{9}, EndKey: []byte{10, 1}}}, 10, opt.WithOutputMustContainAllKeyRange(), ) @@ -1988,7 +1989,7 @@ func (suite *clientTestSuite) TestBatchScanRegions() { testutil.Eventually(re, func() bool { _, err = suite.client.BatchScanRegions( ctx, - []pd.KeyRange{{StartKey: []byte{9}, EndKey: []byte{101}}}, + []router.KeyRange{{StartKey: []byte{9}, EndKey: []byte{101}}}, 10, opt.WithOutputMustContainAllKeyRange(), )