Skip to content

Commit

Permalink
client: split the meta storage client (#8822)
Browse files Browse the repository at this point in the history
ref #8690

Split the meta storage client from the client definitions.

Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato authored Nov 19, 2024
1 parent 40fe83e commit 36cc3f4
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 94 deletions.
6 changes: 3 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/clients/metastorage"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/utils/tlsutil"
Expand Down Expand Up @@ -143,8 +144,7 @@ type RPCClient interface {

// TSOClient is the TSO client.
TSOClient
// MetaStorageClient is the meta storage client.
MetaStorageClient
metastorage.Client
// KeyspaceClient manages keyspace metadata.
KeyspaceClient
// GCClient manages gcSafePointV2 and serviceSafePointV2
Expand Down Expand Up @@ -553,7 +553,7 @@ func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) {
c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
case pdpb.ServiceMode_API_SVC_MODE:
newTSOSvcDiscovery = newTSOServiceDiscovery(
c.ctx, MetaStorageClient(c), c.pdSvcDiscovery,
c.ctx, metastorage.Client(c), c.pdSvcDiscovery,
c.keyspaceID, c.tlsCfg, c.option)
// At this point, the keyspace group isn't known yet. Starts from the default keyspace group,
// and will be updated later.
Expand Down
32 changes: 32 additions & 0 deletions client/clients/metastorage/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metastorage

import (
"context"

"github.com/pingcap/kvproto/pkg/meta_storagepb"
"github.com/tikv/pd/client/opt"
)

// Client is the interface for meta storage client.
type Client interface {
// Watch watches on a key or prefix.
Watch(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (chan []*meta_storagepb.Event, error)
// Get gets the value for a key.
Get(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.GetResponse, error)
// Put puts a key-value pair into meta storage.
Put(ctx context.Context, key []byte, value []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.PutResponse, error)
}
92 changes: 19 additions & 73 deletions client/meta_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,10 @@ import (
"github.com/pingcap/kvproto/pkg/meta_storagepb"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/utils/grpcutil"
)

// MetaStorageClient is the interface for meta storage client.
type MetaStorageClient interface {
// Watch watches on a key or prefix.
Watch(ctx context.Context, key []byte, opts ...OpOption) (chan []*meta_storagepb.Event, error)
// Get gets the value for a key.
Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_storagepb.GetResponse, error)
// Put puts a key-value pair into meta storage.
Put(ctx context.Context, key []byte, value []byte, opts ...OpOption) (*meta_storagepb.PutResponse, error)
}

// metaStorageClient gets the meta storage client from current PD leader.
func (c *client) metaStorageClient() meta_storagepb.MetaStorageClient {
if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil {
Expand All @@ -45,51 +36,6 @@ func (c *client) metaStorageClient() meta_storagepb.MetaStorageClient {
return nil
}

// Op represents available options when using meta storage client.
type Op struct {
rangeEnd []byte
revision int64
prevKv bool
lease int64
limit int64
isOptsWithPrefix bool
}

// OpOption configures etcd Op.
type OpOption func(*Op)

// WithLimit specifies the limit of the key.
func WithLimit(limit int64) OpOption {
return func(op *Op) { op.limit = limit }
}

// WithRangeEnd specifies the range end of the key.
func WithRangeEnd(rangeEnd []byte) OpOption {
return func(op *Op) { op.rangeEnd = rangeEnd }
}

// WithRev specifies the start revision of the key.
func WithRev(revision int64) OpOption {
return func(op *Op) { op.revision = revision }
}

// WithPrevKV specifies the previous key-value pair of the key.
func WithPrevKV() OpOption {
return func(op *Op) { op.prevKv = true }
}

// WithLease specifies the lease of the key.
func WithLease(lease int64) OpOption {
return func(op *Op) { op.lease = lease }
}

// WithPrefix specifies the prefix of the key.
func WithPrefix() OpOption {
return func(op *Op) {
op.isOptsWithPrefix = true
}
}

// See https://github.com/etcd-io/etcd/blob/da4bf0f76fb708e0b57763edb46ba523447b9510/client/v3/op.go#L372-L385
func getPrefix(key []byte) []byte {
end := make([]byte, len(key))
Expand All @@ -105,8 +51,8 @@ func getPrefix(key []byte) []byte {
}

// Put implements the MetaStorageClient interface.
func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) (*meta_storagepb.PutResponse, error) {
options := &Op{}
func (c *client) Put(ctx context.Context, key, value []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.PutResponse, error) {
options := &opt.MetaStorageOp{}
for _, opt := range opts {
opt(options)
}
Expand All @@ -122,8 +68,8 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) (
req := &meta_storagepb.PutRequest{
Key: key,
Value: value,
Lease: options.lease,
PrevKv: options.prevKv,
Lease: options.Lease,
PrevKv: options.PrevKv,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderURL())
cli := c.metaStorageClient()
Expand All @@ -141,13 +87,13 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) (
}

// Get implements the MetaStorageClient interface.
func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_storagepb.GetResponse, error) {
options := &Op{}
func (c *client) Get(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.GetResponse, error) {
options := &opt.MetaStorageOp{}
for _, opt := range opts {
opt(options)
}
if options.isOptsWithPrefix {
options.rangeEnd = getPrefix(key)
if options.IsOptsWithPrefix {
options.RangeEnd = getPrefix(key)
}

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
Expand All @@ -160,9 +106,9 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_s
ctx, cancel := context.WithTimeout(ctx, c.option.Timeout)
req := &meta_storagepb.GetRequest{
Key: key,
RangeEnd: options.rangeEnd,
Limit: options.limit,
Revision: options.revision,
RangeEnd: options.RangeEnd,
Limit: options.Limit,
Revision: options.Revision,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderURL())
cli := c.metaStorageClient()
Expand All @@ -180,14 +126,14 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_s
}

// Watch implements the MetaStorageClient interface.
func (c *client) Watch(ctx context.Context, key []byte, opts ...OpOption) (chan []*meta_storagepb.Event, error) {
func (c *client) Watch(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (chan []*meta_storagepb.Event, error) {
eventCh := make(chan []*meta_storagepb.Event, 100)
options := &Op{}
options := &opt.MetaStorageOp{}
for _, opt := range opts {
opt(options)
}
if options.isOptsWithPrefix {
options.rangeEnd = getPrefix(key)
if options.IsOptsWithPrefix {
options.RangeEnd = getPrefix(key)
}

cli := c.metaStorageClient()
Expand All @@ -196,9 +142,9 @@ func (c *client) Watch(ctx context.Context, key []byte, opts ...OpOption) (chan
}
res, err := cli.Watch(ctx, &meta_storagepb.WatchRequest{
Key: key,
RangeEnd: options.rangeEnd,
StartRevision: options.revision,
PrevKv: options.prevKv,
RangeEnd: options.RangeEnd,
StartRevision: options.Revision,
PrevKv: options.PrevKv,
})
if err != nil {
close(eventCh)
Expand Down
45 changes: 45 additions & 0 deletions client/opt/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,48 @@ func WithAllowFollowerHandle() GetRegionOption {
func WithOutputMustContainAllKeyRange() GetRegionOption {
return func(op *GetRegionOp) { op.OutputMustContainAllKeyRange = true }
}

// MetaStorageOp represents available options when using meta storage client.
type MetaStorageOp struct {
RangeEnd []byte
Revision int64
PrevKv bool
Lease int64
Limit int64
IsOptsWithPrefix bool
}

// MetaStorageOption configures MetaStorageOp.
type MetaStorageOption func(*MetaStorageOp)

// WithLimit specifies the limit of the key.
func WithLimit(limit int64) MetaStorageOption {
return func(op *MetaStorageOp) { op.Limit = limit }
}

// WithRangeEnd specifies the range end of the key.
func WithRangeEnd(rangeEnd []byte) MetaStorageOption {
return func(op *MetaStorageOp) { op.RangeEnd = rangeEnd }
}

// WithRev specifies the start revision of the key.
func WithRev(revision int64) MetaStorageOption {
return func(op *MetaStorageOp) { op.Revision = revision }
}

// WithPrevKV specifies the previous key-value pair of the key.
func WithPrevKV() MetaStorageOption {
return func(op *MetaStorageOp) { op.PrevKv = true }
}

// WithLease specifies the lease of the key.
func WithLease(lease int64) MetaStorageOption {
return func(op *MetaStorageOp) { op.Lease = lease }
}

// WithPrefix specifies the prefix of the key.
func WithPrefix() MetaStorageOption {
return func(op *MetaStorageOp) {
op.IsOptsWithPrefix = true
}
}
16 changes: 8 additions & 8 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import (
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/metastorage"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/opt"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
Expand Down Expand Up @@ -85,11 +87,9 @@ type ResourceGroupProvider interface {
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)

// meta storage client
LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error)
Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error)
Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error)

metastorage.Client
}

// ResourceControlCreateOption create a ResourceGroupsController with the optional settings.
Expand Down Expand Up @@ -270,13 +270,13 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event
if !c.ruConfig.isSingleGroupByKeyspace {
// Use WithPrevKV() to get the previous key-value pair when get Delete Event.
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV())
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
}
}

watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix())
watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, opt.WithRev(cfgRevision), opt.WithPrefix())
if err != nil {
log.Warn("watch resource group config failed", zap.Error(err))
}
Expand All @@ -297,7 +297,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
case <-watchRetryTimer.C:
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
// Use WithPrevKV() to get the previous key-value pair when get Delete Event.
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV())
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)
Expand All @@ -307,7 +307,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
}
}
if watchConfigChannel == nil {
watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix())
watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, opt.WithRev(cfgRevision), opt.WithPrefix())
if err != nil {
log.Warn("watch resource group config failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)
Expand Down
10 changes: 8 additions & 2 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/opt"
)

func createTestGroupCostController(re *require.Assertions) *groupCostController {
Expand Down Expand Up @@ -222,16 +223,21 @@ func (m *MockResourceGroupProvider) LoadResourceGroups(ctx context.Context) ([]*
return args.Get(0).([]*rmpb.ResourceGroup), args.Get(1).(int64), args.Error(2)
}

func (m *MockResourceGroupProvider) Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error) {
func (m *MockResourceGroupProvider) Watch(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (chan []*meta_storagepb.Event, error) {
args := m.Called(ctx, key, opts)
return args.Get(0).(chan []*meta_storagepb.Event), args.Error(1)
}

func (m *MockResourceGroupProvider) Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) {
func (m *MockResourceGroupProvider) Get(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.GetResponse, error) {
args := m.Called(ctx, key, opts)
return args.Get(0).(*meta_storagepb.GetResponse), args.Error(1)
}

func (m *MockResourceGroupProvider) Put(ctx context.Context, key []byte, value []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.PutResponse, error) {
args := m.Called(ctx, key, value, opts)
return args.Get(0).(*meta_storagepb.PutResponse), args.Error(1)
}

func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down
5 changes: 3 additions & 2 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/opt"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -51,7 +52,7 @@ type ResourceManagerClient interface {
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error)
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
Watch(ctx context.Context, key []byte, opts ...OpOption) (chan []*meta_storagepb.Event, error)
Watch(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (chan []*meta_storagepb.Event, error)
}

// GetResourceGroupOp represents available options when getting resource group.
Expand Down Expand Up @@ -193,7 +194,7 @@ func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName stri

// LoadResourceGroups implements the ResourceManagerClient interface.
func (c *client) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) {
resp, err := c.Get(ctx, GroupSettingsPathPrefixBytes, WithPrefix())
resp, err := c.Get(ctx, GroupSettingsPathPrefixBytes, opt.WithPrefix())
if err != nil {
return nil, 0, err
}
Expand Down
Loading

0 comments on commit 36cc3f4

Please sign in to comment.