Skip to content

Commit

Permalink
Merge branch 'master' of github.com:tikv/pd into evict
Browse files Browse the repository at this point in the history
  • Loading branch information
lhy1024 committed Jan 8, 2025
2 parents c0028dd + adddd4e commit 1d41ba5
Show file tree
Hide file tree
Showing 611 changed files with 7,221 additions and 2,557 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pd-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ jobs:
- worker_id: 8
name: 'TSO Integration Test'
- worker_id: 9
name: 'MicroService Integration(!TSO)'
name: 'Microservice Integration(!TSO)'
- worker_id: 10
name: 'MicroService Integration(TSO)'
name: 'Microservice Integration(TSO)'
outputs:
job-total: 10
steps:
Expand Down
8 changes: 8 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ linters:
- protogetter
- reassign
- intrange
- gci
linters-settings:
gocritic:
# Which checks should be disabled; can't be combined with 'enabled-checks'; default is empty
Expand Down Expand Up @@ -233,6 +234,13 @@ linters-settings:
desc: "Use 'sync/atomic' instead of 'go.uber.org/atomic'"
- pkg: github.com/pkg/errors
desc: "Use 'github.com/pingcap/errors' instead of 'github.com/pkg/errors'"
gci:
sections:
- standard
- default
- prefix(github.com/pingcap)
- prefix(github.com/tikv/pd)
- blank
issues:
exclude-rules:
- path: (_test\.go|pkg/mock/.*\.go|tests/.*\.go)
Expand Down
70 changes: 36 additions & 34 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ import (
"time"

"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"

"github.com/tikv/pd/client/clients/metastorage"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/clients/tso"
Expand All @@ -39,7 +42,6 @@ import (
"github.com/tikv/pd/client/pkg/caller"
"github.com/tikv/pd/client/pkg/utils/tlsutil"
sd "github.com/tikv/pd/client/servicediscovery"
"go.uber.org/zap"
)

// GlobalConfigItem standard format of KV pair in GlobalConfig client
Expand Down Expand Up @@ -97,16 +99,6 @@ type RPCClient interface {
// SetExternalTimestamp sets external timestamp
SetExternalTimestamp(ctx context.Context, timestamp uint64) error

// WithCallerComponent returns a new RPCClient with the specified caller
// component. Caller component refers to the specific part or module within
// the process. You can set the component in two ways:
// * Define it manually, like `caller.Component("DDL")`.
// * Use the provided helper function, `caller.GetComponent(upperLayer)`.
// The upperLayer parameter specifies the depth of the caller stack,
// where 0 means the current function. Adjust the upperLayer value based
// on your needs.
WithCallerComponent(callerComponent caller.Component) RPCClient

router.Client
tso.Client
metastorage.Client
Expand All @@ -133,6 +125,15 @@ type Client interface {

// UpdateOption updates the client option.
UpdateOption(option opt.DynamicOption, value any) error
// WithCallerComponent returns a new Client with the specified caller
// component. Caller component refers to the specific part or module within
// the process. You can set the component in two ways:
// * Define it manually, like `caller.Component("DDL")`.
// * Use the provided helper function, `caller.GetComponent(upperLayer)`.
// The upperLayer parameter specifies the depth of the caller stack,
// where 0 means the current function. Adjust the upperLayer value based
// on your needs.
WithCallerComponent(callerComponent caller.Component) Client

// Close closes the client.
Close()
Expand Down Expand Up @@ -359,8 +360,8 @@ func newClientWithKeyspaceName(
c := &client{
callerComponent: adjustCallerComponent(callerComponent),
inner: &innerClient{
// Create a PD service discovery with null keyspace id, then query the real id with the keyspace name,
// finally update the keyspace id to the PD service discovery for the following interactions.
// Create a service discovery with null keyspace id, then query the real id with the keyspace name,
// finally update the keyspace id to the service discovery for the following interactions.
keyspaceID: constants.NullKeyspaceID,
updateTokenConnectionCh: make(chan struct{}, 1),
ctx: clientCtx,
Expand All @@ -383,7 +384,7 @@ func newClientWithKeyspaceName(
}
c.inner.keyspaceID = keyspaceMeta.GetId()
// c.keyspaceID is the source of truth for keyspace id.
c.inner.pdSvcDiscovery.SetKeyspaceID(c.inner.keyspaceID)
c.inner.serviceDiscovery.SetKeyspaceID(c.inner.keyspaceID)
return nil
}

Expand Down Expand Up @@ -411,17 +412,17 @@ func (c *client) ResetTSOClient() {

// GetClusterID returns the ClusterID.
func (c *client) GetClusterID(context.Context) uint64 {
return c.inner.pdSvcDiscovery.GetClusterID()
return c.inner.serviceDiscovery.GetClusterID()
}

// GetLeaderURL returns the leader URL.
func (c *client) GetLeaderURL() string {
return c.inner.pdSvcDiscovery.GetServingURL()
return c.inner.serviceDiscovery.GetServingURL()
}

// GetServiceDiscovery returns the client-side service discovery object
func (c *client) GetServiceDiscovery() sd.ServiceDiscovery {
return c.inner.pdSvcDiscovery
return c.inner.serviceDiscovery
}

// UpdateOption updates the client option.
Expand All @@ -437,7 +438,7 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error {
}
case opt.EnableTSOFollowerProxy:
if c.inner.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE {
return errors.New("[pd] tso follower proxy is only supported in PD service mode")
return errors.New("[pd] tso follower proxy is only supported in PD mode")
}
enable, ok := value.(bool)
if !ok {
Expand Down Expand Up @@ -484,7 +485,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
// getClientAndContext returns the leader pd client and the original context. If leader is unhealthy, it returns
// follower pd client and the context which holds forward information.
func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) {
serviceClient := c.inner.pdSvcDiscovery.GetServiceClient()
serviceClient := c.inner.serviceDiscovery.GetServiceClient()
if serviceClient == nil || serviceClient.GetClientConn() == nil {
return nil, ctx
}
Expand All @@ -501,10 +502,10 @@ func (c *client) GetTSAsync(ctx context.Context) tso.TSFuture {
return c.inner.dispatchTSORequestWithRetry(ctx)
}

// GetLocalTSAsync implements the TSOClient interface.
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
// Deprecated: the Local TSO feature has been deprecated. Regardless of the
// parameters passed, the behavior of this interface will be equivalent to
// `GetTSAsync`. If you want to use a separately deployed TSO service,
// please refer to the deployment of the TSO microservice.
func (c *client) GetLocalTSAsync(ctx context.Context, _ string) tso.TSFuture {
return c.GetTSAsync(ctx)
}
Expand All @@ -515,17 +516,17 @@ func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err
return resp.Wait()
}

// GetLocalTS implements the TSOClient interface.
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
// Deprecated: the Local TSO feature has been deprecated. Regardless of the
// parameters passed, the behavior of this interface will be equivalent to
// `GetTS`. If you want to use a separately deployed TSO service,
// please refer to the deployment of the TSO microservice.
func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logical int64, err error) {
return c.GetTS(ctx)
}

// GetMinTS implements the TSOClient interface.
func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
// Handle compatibility issue in case of PD/API server doesn't support GetMinTS API.
// Handle compatibility issue in case of PD/PD service doesn't support GetMinTS API.
serviceMode := c.inner.getServiceMode()
switch serviceMode {
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
Expand Down Expand Up @@ -597,7 +598,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs

var resp *pdpb.GetRegionResponse
for _, url := range memberURLs {
conn, err := c.inner.pdSvcDiscovery.GetOrCreateGRPCConn(url)
conn, err := c.inner.serviceDiscovery.GetOrCreateGRPCConn(url)
if err != nil {
log.Error("[pd] can't get grpc connection", zap.String("member-URL", url), errs.ZapError(err))
continue
Expand All @@ -618,7 +619,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs

if resp == nil {
metrics.CmdFailedDurationGetRegion.Observe(time.Since(start).Seconds())
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.inner.serviceDiscovery.ScheduleCheckMemberChanged()
errorMsg := fmt.Sprintf("[pd] can't get region info from member URLs: %+v", memberURLs)
return nil, errors.WithStack(errors.New(errorMsg))
}
Expand Down Expand Up @@ -730,6 +731,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}

resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
Expand Down Expand Up @@ -1148,7 +1150,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...o

func (c *client) requestHeader() *pdpb.RequestHeader {
return &pdpb.RequestHeader{
ClusterId: c.inner.pdSvcDiscovery.GetClusterID(),
ClusterId: c.inner.serviceDiscovery.GetClusterID(),
CallerId: string(caller.GetCallerID()),
CallerComponent: string(c.callerComponent),
}
Expand Down Expand Up @@ -1332,7 +1334,7 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e
if err != nil || header.GetError() != nil {
observer.Observe(time.Since(start).Seconds())
if err != nil {
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.inner.serviceDiscovery.ScheduleCheckMemberChanged()
return errors.WithStack(err)
}
return errors.WithStack(errors.New(header.GetError().String()))
Expand All @@ -1341,7 +1343,7 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e
}

// WithCallerComponent implements the RPCClient interface.
func (c *client) WithCallerComponent(callerComponent caller.Component) RPCClient {
func (c *client) WithCallerComponent(callerComponent caller.Component) Client {
newClient := *c
newClient.callerComponent = callerComponent
return &newClient
Expand Down
3 changes: 2 additions & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
"github.com/tikv/pd/client/pkg/utils/testutil"
"github.com/tikv/pd/client/pkg/utils/tsoutil"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
Expand Down
1 change: 1 addition & 0 deletions client/clients/metastorage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

"github.com/pingcap/kvproto/pkg/meta_storagepb"

"github.com/tikv/pd/client/opt"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/url"

"github.com/pingcap/kvproto/pkg/metapb"

"github.com/tikv/pd/client/opt"
)

Expand Down
Loading

0 comments on commit 1d41ba5

Please sign in to comment.