diff --git a/internal/locate/pd_codec.go b/internal/locate/pd_codec.go index 6041ad527d..b0c81097a3 100644 --- a/internal/locate/pd_codec.go +++ b/internal/locate/pd_codec.go @@ -44,6 +44,7 @@ import ( pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/clients/router" "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/caller" ) var _ pd.Client = &CodecPDClient{} @@ -57,7 +58,7 @@ type CodecPDClient struct { // NewCodecPDClient creates a CodecPDClient in API v1. func NewCodecPDClient(mode apicodec.Mode, client pd.Client) *CodecPDClient { codec := apicodec.NewCodecV1(mode) - return &CodecPDClient{client, codec} + return &CodecPDClient{client.WithCallerComponent("codec-pd-client"), codec} } // NewCodecPDClientWithKeyspace creates a CodecPDClient in API v2 with keyspace name. @@ -71,7 +72,7 @@ func NewCodecPDClientWithKeyspace(mode apicodec.Mode, client pd.Client, keyspace return nil, err } - return &CodecPDClient{client, codec}, nil + return &CodecPDClient{client.WithCallerComponent("codec-pd-client"), codec}, nil } // GetKeyspaceID attempts to retrieve keyspace ID corresponding to the given keyspace name from PD. @@ -202,3 +203,8 @@ func (c *CodecPDClient) decodeRegionKeyInPlace(r *router.Region) error { } return err } + +// WithCallerComponent returns a new PD client with the specified caller component. +func (c *CodecPDClient) WithCallerComponent(component caller.Component) pd.Client { + return &CodecPDClient{c.Client.WithCallerComponent(component), c.codec} +} diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 7fa2e99a9c..90a4f19003 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -668,7 +668,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache { } c := &RegionCache{ - pdClient: pdClient, + pdClient: pdClient.WithCallerComponent("region-cache"), requestHealthFeedbackCallback: options.requestHealthFeedbackCallback, } diff --git a/internal/locate/store_cache.go b/internal/locate/store_cache.go index 7d4583ff22..b60481421a 100644 --- a/internal/locate/store_cache.go +++ b/internal/locate/store_cache.go @@ -71,7 +71,7 @@ type storeCache interface { } func newStoreCache(pdClient pd.Client) *storeCacheImpl { - c := &storeCacheImpl{pdClient: pdClient} + c := &storeCacheImpl{pdClient: pdClient.WithCallerComponent("store-cache")} c.notifyCheckCh = make(chan struct{}, 1) c.storeMu.stores = make(map[uint64]*Store) c.tiflashComputeStoreMu.needReload = true diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index 60f0ee7cb3..afa6bde919 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -179,7 +179,7 @@ func NewPdOracle(pdClient pd.Client, options *PDOracleOptions) (oracle.Oracle, e } o := &pdOracle{ - c: pdClient, + c: pdClient.WithCallerComponent("oracle"), quit: make(chan struct{}), lastTSUpdateInterval: atomic.Int64{}, } diff --git a/oracle/oracles/pd_test.go b/oracle/oracles/pd_test.go index 01c67c5e46..3e3e09b110 100644 --- a/oracle/oracles/pd_test.go +++ b/oracle/oracles/pd_test.go @@ -45,6 +45,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/pkg/caller" ) func TestPDOracle_UntilExpired(t *testing.T) { @@ -87,6 +88,12 @@ func (c *MockPdClient) GetTS(ctx context.Context) (int64, int64, error) { return 0, c.logicalTimestamp.Add(1), nil } +func (c *MockPdClient) WithCallerComponent(component caller.Component) pd.Client { + client := &MockPdClient{Client: c.Client.WithCallerComponent(component)} + client.logicalTimestamp.Store(c.logicalTimestamp.Load()) + return client +} + func TestPdOracle_SetLowResolutionTimestampUpdateInterval(t *testing.T) { pdClient := MockPdClient{} o := NewPdOracleWithClient(&pdClient) diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index da59325b95..91f40780e9 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -69,6 +69,7 @@ const ( rawBatchPutSize = 16 * 1024 // rawBatchPairCount is the maximum limit for rawkv each batch get/delete request. rawBatchPairCount = 512 + componentName = caller.Component("rawkv-client-go") ) type rawOptions struct { @@ -204,7 +205,7 @@ func NewClientWithOpts(ctx context.Context, pdAddrs []string, opts ...ClientOpt) } // Use an unwrapped PDClient to obtain keyspace meta. - pdCli, err := pd.NewClientWithContext(ctx, caller.Component("rawkv-client-go"), pdAddrs, pd.SecurityOption{ + pdCli, err := pd.NewClientWithContext(ctx, componentName, pdAddrs, pd.SecurityOption{ CAPath: opt.security.ClusterSSLCA, CertPath: opt.security.ClusterSSLCert, KeyPath: opt.security.ClusterSSLKey, @@ -240,7 +241,7 @@ func NewClientWithOpts(ctx context.Context, pdAddrs []string, opts ...ClientOpt) apiVersion: opt.apiVersion, clusterID: pdCli.GetClusterID(ctx), regionCache: locate.NewRegionCache(pdCli), - pdClient: pdCli, + pdClient: pdCli.WithCallerComponent(componentName), rpcClient: rpcCli, }, nil } diff --git a/tikv/kv.go b/tikv/kv.go index 8973143504..56a06cbad9 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -275,7 +275,7 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl clusterID: pdClient.GetClusterID(context.TODO()), uuid: uuid, oracle: o, - pdClient: pdClient, + pdClient: pdClient.WithCallerComponent("kv-store"), regionCache: regionCache, kv: spkv, safePoint: 0, @@ -906,7 +906,7 @@ func NewLockResolver(etcdAddrs []string, security config.Security, opts ...opt.C if err != nil { return nil, errors.WithStack(err) } - pdCli = util.InterceptedPDClient{Client: pdCli} + pdCli = util.NewInterceptedPDClient(pdCli) uuid := fmt.Sprintf("tikv-%v", pdCli.GetClusterID(context.TODO())) tlsConfig, err := security.ToTLSConfig() diff --git a/txnkv/client.go b/txnkv/client.go index aa37333598..bfcaad47a7 100644 --- a/txnkv/client.go +++ b/txnkv/client.go @@ -76,7 +76,7 @@ func NewClient(pdAddrs []string, opts ...ClientOpt) (*Client, error) { return nil, errors.WithStack(err) } - pdClient = util.InterceptedPDClient{Client: pdClient} + pdClient = util.NewInterceptedPDClient(pdClient) // Construct codec from options. var codecCli *tikv.CodecPDClient diff --git a/util/pd_interceptor.go b/util/pd_interceptor.go index 41eaa661fc..614fb27520 100644 --- a/util/pd_interceptor.go +++ b/util/pd_interceptor.go @@ -44,6 +44,7 @@ import ( "github.com/tikv/pd/client/clients/router" "github.com/tikv/pd/client/clients/tso" "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/caller" ) var ( @@ -64,6 +65,10 @@ type InterceptedPDClient struct { pd.Client } +func NewInterceptedPDClient(client pd.Client) *InterceptedPDClient { + return &InterceptedPDClient{client.WithCallerComponent("intercepted-pd-client")} +} + // interceptedTsFuture is a PD's wrapper future to record stmt detail. type interceptedTsFuture struct { tso.TSFuture @@ -137,3 +142,8 @@ func (m InterceptedPDClient) GetStore(ctx context.Context, storeID uint64) (*met recordPDWaitTime(ctx, start) return s, err } + +// WithCallerComponent implements pd.Client#WithCallerComponent. +func (m InterceptedPDClient) WithCallerComponent(component caller.Component) pd.Client { + return NewInterceptedPDClient(m.Client.WithCallerComponent(component)) +}