From 36cc3f41f2d185c473a125067754f294d5039394 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 19 Nov 2024 15:04:02 +0800 Subject: [PATCH] client: split the meta storage client (#8822) ref tikv/pd#8690 Split the meta storage client from the client definitions. Signed-off-by: JmPotato --- client/client.go | 6 +- client/clients/metastorage/client.go | 32 +++++++ client/meta_storage_client.go | 92 ++++--------------- client/opt/option.go | 45 +++++++++ .../resource_group/controller/controller.go | 16 ++-- .../controller/controller_test.go | 10 +- client/resource_manager_client.go | 5 +- client/tso_service_discovery.go | 5 +- tests/integrations/client/client_test.go | 8 +- 9 files changed, 125 insertions(+), 94 deletions(-) create mode 100644 client/clients/metastorage/client.go diff --git a/client/client.go b/client/client.go index 020dbbc700c..8a79e9663c5 100644 --- a/client/client.go +++ b/client/client.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" + "github.com/tikv/pd/client/clients/metastorage" "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/utils/tlsutil" @@ -143,8 +144,7 @@ type RPCClient interface { // TSOClient is the TSO client. TSOClient - // MetaStorageClient is the meta storage client. - MetaStorageClient + metastorage.Client // KeyspaceClient manages keyspace metadata. KeyspaceClient // GCClient manages gcSafePointV2 and serviceSafePointV2 @@ -553,7 +553,7 @@ func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) { c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{}) case pdpb.ServiceMode_API_SVC_MODE: newTSOSvcDiscovery = newTSOServiceDiscovery( - c.ctx, MetaStorageClient(c), c.pdSvcDiscovery, + c.ctx, metastorage.Client(c), c.pdSvcDiscovery, c.keyspaceID, c.tlsCfg, c.option) // At this point, the keyspace group isn't known yet. Starts from the default keyspace group, // and will be updated later. diff --git a/client/clients/metastorage/client.go b/client/clients/metastorage/client.go new file mode 100644 index 00000000000..dba1127f9f5 --- /dev/null +++ b/client/clients/metastorage/client.go @@ -0,0 +1,32 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metastorage + +import ( + "context" + + "github.com/pingcap/kvproto/pkg/meta_storagepb" + "github.com/tikv/pd/client/opt" +) + +// Client is the interface for meta storage client. +type Client interface { + // Watch watches on a key or prefix. + Watch(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (chan []*meta_storagepb.Event, error) + // Get gets the value for a key. + Get(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.GetResponse, error) + // Put puts a key-value pair into meta storage. + Put(ctx context.Context, key []byte, value []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.PutResponse, error) +} diff --git a/client/meta_storage_client.go b/client/meta_storage_client.go index fd0e326f500..3cc24f0bece 100644 --- a/client/meta_storage_client.go +++ b/client/meta_storage_client.go @@ -24,19 +24,10 @@ import ( "github.com/pingcap/kvproto/pkg/meta_storagepb" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/utils/grpcutil" ) -// MetaStorageClient is the interface for meta storage client. -type MetaStorageClient interface { - // Watch watches on a key or prefix. - Watch(ctx context.Context, key []byte, opts ...OpOption) (chan []*meta_storagepb.Event, error) - // Get gets the value for a key. - Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_storagepb.GetResponse, error) - // Put puts a key-value pair into meta storage. - Put(ctx context.Context, key []byte, value []byte, opts ...OpOption) (*meta_storagepb.PutResponse, error) -} - // metaStorageClient gets the meta storage client from current PD leader. func (c *client) metaStorageClient() meta_storagepb.MetaStorageClient { if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil { @@ -45,51 +36,6 @@ func (c *client) metaStorageClient() meta_storagepb.MetaStorageClient { return nil } -// Op represents available options when using meta storage client. -type Op struct { - rangeEnd []byte - revision int64 - prevKv bool - lease int64 - limit int64 - isOptsWithPrefix bool -} - -// OpOption configures etcd Op. -type OpOption func(*Op) - -// WithLimit specifies the limit of the key. -func WithLimit(limit int64) OpOption { - return func(op *Op) { op.limit = limit } -} - -// WithRangeEnd specifies the range end of the key. -func WithRangeEnd(rangeEnd []byte) OpOption { - return func(op *Op) { op.rangeEnd = rangeEnd } -} - -// WithRev specifies the start revision of the key. -func WithRev(revision int64) OpOption { - return func(op *Op) { op.revision = revision } -} - -// WithPrevKV specifies the previous key-value pair of the key. -func WithPrevKV() OpOption { - return func(op *Op) { op.prevKv = true } -} - -// WithLease specifies the lease of the key. -func WithLease(lease int64) OpOption { - return func(op *Op) { op.lease = lease } -} - -// WithPrefix specifies the prefix of the key. -func WithPrefix() OpOption { - return func(op *Op) { - op.isOptsWithPrefix = true - } -} - // See https://github.com/etcd-io/etcd/blob/da4bf0f76fb708e0b57763edb46ba523447b9510/client/v3/op.go#L372-L385 func getPrefix(key []byte) []byte { end := make([]byte, len(key)) @@ -105,8 +51,8 @@ func getPrefix(key []byte) []byte { } // Put implements the MetaStorageClient interface. -func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) (*meta_storagepb.PutResponse, error) { - options := &Op{} +func (c *client) Put(ctx context.Context, key, value []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.PutResponse, error) { + options := &opt.MetaStorageOp{} for _, opt := range opts { opt(options) } @@ -122,8 +68,8 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) ( req := &meta_storagepb.PutRequest{ Key: key, Value: value, - Lease: options.lease, - PrevKv: options.prevKv, + Lease: options.Lease, + PrevKv: options.PrevKv, } ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderURL()) cli := c.metaStorageClient() @@ -141,13 +87,13 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) ( } // Get implements the MetaStorageClient interface. -func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_storagepb.GetResponse, error) { - options := &Op{} +func (c *client) Get(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.GetResponse, error) { + options := &opt.MetaStorageOp{} for _, opt := range opts { opt(options) } - if options.isOptsWithPrefix { - options.rangeEnd = getPrefix(key) + if options.IsOptsWithPrefix { + options.RangeEnd = getPrefix(key) } if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { @@ -160,9 +106,9 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_s ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) req := &meta_storagepb.GetRequest{ Key: key, - RangeEnd: options.rangeEnd, - Limit: options.limit, - Revision: options.revision, + RangeEnd: options.RangeEnd, + Limit: options.Limit, + Revision: options.Revision, } ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderURL()) cli := c.metaStorageClient() @@ -180,14 +126,14 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_s } // Watch implements the MetaStorageClient interface. -func (c *client) Watch(ctx context.Context, key []byte, opts ...OpOption) (chan []*meta_storagepb.Event, error) { +func (c *client) Watch(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (chan []*meta_storagepb.Event, error) { eventCh := make(chan []*meta_storagepb.Event, 100) - options := &Op{} + options := &opt.MetaStorageOp{} for _, opt := range opts { opt(options) } - if options.isOptsWithPrefix { - options.rangeEnd = getPrefix(key) + if options.IsOptsWithPrefix { + options.RangeEnd = getPrefix(key) } cli := c.metaStorageClient() @@ -196,9 +142,9 @@ func (c *client) Watch(ctx context.Context, key []byte, opts ...OpOption) (chan } res, err := cli.Watch(ctx, &meta_storagepb.WatchRequest{ Key: key, - RangeEnd: options.rangeEnd, - StartRevision: options.revision, - PrevKv: options.prevKv, + RangeEnd: options.RangeEnd, + StartRevision: options.Revision, + PrevKv: options.PrevKv, }) if err != nil { close(eventCh) diff --git a/client/opt/option.go b/client/opt/option.go index b90ff3a905c..a9f6083484e 100644 --- a/client/opt/option.go +++ b/client/opt/option.go @@ -261,3 +261,48 @@ func WithAllowFollowerHandle() GetRegionOption { func WithOutputMustContainAllKeyRange() GetRegionOption { return func(op *GetRegionOp) { op.OutputMustContainAllKeyRange = true } } + +// MetaStorageOp represents available options when using meta storage client. +type MetaStorageOp struct { + RangeEnd []byte + Revision int64 + PrevKv bool + Lease int64 + Limit int64 + IsOptsWithPrefix bool +} + +// MetaStorageOption configures MetaStorageOp. +type MetaStorageOption func(*MetaStorageOp) + +// WithLimit specifies the limit of the key. +func WithLimit(limit int64) MetaStorageOption { + return func(op *MetaStorageOp) { op.Limit = limit } +} + +// WithRangeEnd specifies the range end of the key. +func WithRangeEnd(rangeEnd []byte) MetaStorageOption { + return func(op *MetaStorageOp) { op.RangeEnd = rangeEnd } +} + +// WithRev specifies the start revision of the key. +func WithRev(revision int64) MetaStorageOption { + return func(op *MetaStorageOp) { op.Revision = revision } +} + +// WithPrevKV specifies the previous key-value pair of the key. +func WithPrevKV() MetaStorageOption { + return func(op *MetaStorageOp) { op.PrevKv = true } +} + +// WithLease specifies the lease of the key. +func WithLease(lease int64) MetaStorageOption { + return func(op *MetaStorageOp) { op.Lease = lease } +} + +// WithPrefix specifies the prefix of the key. +func WithPrefix() MetaStorageOption { + return func(op *MetaStorageOp) { + op.IsOptsWithPrefix = true + } +} diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 49372927442..83bd21b1eed 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -31,7 +31,9 @@ import ( "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/metastorage" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/opt" "go.uber.org/zap" "golang.org/x/exp/slices" ) @@ -85,11 +87,9 @@ type ResourceGroupProvider interface { ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) - - // meta storage client LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) - Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error) - Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) + + metastorage.Client } // ResourceControlCreateOption create a ResourceGroupsController with the optional settings. @@ -270,13 +270,13 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event if !c.ruConfig.isSingleGroupByKeyspace { // Use WithPrevKV() to get the previous key-value pair when get Delete Event. - watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV()) + watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV()) if err != nil { log.Warn("watch resource group meta failed", zap.Error(err)) } } - watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) + watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, opt.WithRev(cfgRevision), opt.WithPrefix()) if err != nil { log.Warn("watch resource group config failed", zap.Error(err)) } @@ -297,7 +297,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { case <-watchRetryTimer.C: if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil { // Use WithPrevKV() to get the previous key-value pair when get Delete Event. - watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV()) + watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV()) if err != nil { log.Warn("watch resource group meta failed", zap.Error(err)) watchRetryTimer.Reset(watchRetryInterval) @@ -307,7 +307,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { } } if watchConfigChannel == nil { - watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) + watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, opt.WithRev(cfgRevision), opt.WithPrefix()) if err != nil { log.Warn("watch resource group config failed", zap.Error(err)) watchRetryTimer.Reset(watchRetryInterval) diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index a59be4d5a2d..882f99a6868 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/opt" ) func createTestGroupCostController(re *require.Assertions) *groupCostController { @@ -222,16 +223,21 @@ func (m *MockResourceGroupProvider) LoadResourceGroups(ctx context.Context) ([]* return args.Get(0).([]*rmpb.ResourceGroup), args.Get(1).(int64), args.Error(2) } -func (m *MockResourceGroupProvider) Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error) { +func (m *MockResourceGroupProvider) Watch(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (chan []*meta_storagepb.Event, error) { args := m.Called(ctx, key, opts) return args.Get(0).(chan []*meta_storagepb.Event), args.Error(1) } -func (m *MockResourceGroupProvider) Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) { +func (m *MockResourceGroupProvider) Get(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.GetResponse, error) { args := m.Called(ctx, key, opts) return args.Get(0).(*meta_storagepb.GetResponse), args.Error(1) } +func (m *MockResourceGroupProvider) Put(ctx context.Context, key []byte, value []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.PutResponse, error) { + args := m.Called(ctx, key, value, opts) + return args.Get(0).(*meta_storagepb.PutResponse), args.Error(1) +} + func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index cc1739e4097..b59bb2a22d3 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -24,6 +24,7 @@ import ( rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/opt" "go.uber.org/zap" ) @@ -51,7 +52,7 @@ type ResourceManagerClient interface { DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) - Watch(ctx context.Context, key []byte, opts ...OpOption) (chan []*meta_storagepb.Event, error) + Watch(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (chan []*meta_storagepb.Event, error) } // GetResourceGroupOp represents available options when getting resource group. @@ -193,7 +194,7 @@ func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName stri // LoadResourceGroups implements the ResourceManagerClient interface. func (c *client) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) { - resp, err := c.Get(ctx, GroupSettingsPathPrefixBytes, WithPrefix()) + resp, err := c.Get(ctx, GroupSettingsPathPrefixBytes, opt.WithPrefix()) if err != nil { return nil, 0, err } diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index d4c8d22e3fe..7d5b761e68c 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" + "github.com/tikv/pd/client/clients/metastorage" "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/utils/grpcutil" @@ -120,7 +121,7 @@ func (t *tsoServerDiscovery) resetFailure() { // tsoServiceDiscovery is the service discovery client of the independent TSO service type tsoServiceDiscovery struct { - metacli MetaStorageClient + metacli metastorage.Client apiSvcDiscovery ServiceDiscovery clusterID uint64 keyspaceID atomic.Uint32 @@ -155,7 +156,7 @@ type tsoServiceDiscovery struct { // newTSOServiceDiscovery returns a new client-side service discovery for the independent TSO service. func newTSOServiceDiscovery( - ctx context.Context, metacli MetaStorageClient, apiSvcDiscovery ServiceDiscovery, + ctx context.Context, metacli metastorage.Client, apiSvcDiscovery ServiceDiscovery, keyspaceID uint32, tlsCfg *tls.Config, option *opt.Option, ) ServiceDiscovery { ctx, cancel := context.WithCancel(ctx) diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 53eed7bdc84..118512b88b2 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -1622,7 +1622,7 @@ func TestWatch(t *testing.T) { resp, err := client.Get(ctx, []byte(key)) re.NoError(err) rev := resp.GetHeader().GetRevision() - ch, err := client.Watch(ctx, []byte(key), pd.WithRev(rev)) + ch, err := client.Watch(ctx, []byte(key), opt.WithRev(rev)) re.NoError(err) exit := make(chan struct{}) go func() { @@ -1669,7 +1669,7 @@ func TestPutGet(t *testing.T) { re.NoError(err) re.Equal([]byte("1"), getResp.GetKvs()[0].Value) re.NotEqual(0, getResp.GetHeader().GetRevision()) - putResp, err = client.Put(context.Background(), key, []byte("2"), pd.WithPrevKV()) + putResp, err = client.Put(context.Background(), key, []byte("2"), opt.WithPrevKV()) re.NoError(err) re.Equal([]byte("1"), putResp.GetPrevKv().Value) getResp, err = client.Get(context.Background(), key) @@ -1709,7 +1709,7 @@ func TestClientWatchWithRevision(t *testing.T) { // Mock get revision by loading r, err := s.GetEtcdClient().Put(context.Background(), watchPrefix+"test", "test") re.NoError(err) - res, err := client.Get(context.Background(), []byte(watchPrefix), pd.WithPrefix()) + res, err := client.Get(context.Background(), []byte(watchPrefix), opt.WithPrefix()) re.NoError(err) re.Len(res.Kvs, 1) re.LessOrEqual(r.Header.GetRevision(), res.GetHeader().GetRevision()) @@ -1720,7 +1720,7 @@ func TestClientWatchWithRevision(t *testing.T) { re.NoError(err) } // Start watcher at next revision - ch, err := client.Watch(context.Background(), []byte(watchPrefix), pd.WithRev(res.GetHeader().GetRevision()), pd.WithPrefix(), pd.WithPrevKV()) + ch, err := client.Watch(context.Background(), []byte(watchPrefix), opt.WithRev(res.GetHeader().GetRevision()), opt.WithPrefix(), opt.WithPrevKV()) re.NoError(err) // Mock delete for i := range 3 {