Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: separate the metrics package #8833

Merged
merged 2 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 28 additions & 27 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"github.com/tikv/pd/client/caller"
"github.com/tikv/pd/client/clients/metastorage"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/utils/tlsutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -555,7 +556,7 @@
// GetAllMembers gets the members Info from PD.
func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
start := time.Now()
defer func() { cmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
Expand All @@ -565,7 +566,7 @@
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetMembers(ctx, req)
if err = c.respForErr(cmdFailDurationGetAllMembers, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationGetAllMembers, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return resp.GetMembers(), nil
Expand Down Expand Up @@ -683,7 +684,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()

Check warning on line 687 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L687

Added line #L687 was not covered by tests

var resp *pdpb.GetRegionResponse
for _, url := range memberURLs {
Expand All @@ -707,7 +708,7 @@
}

if resp == nil {
cmdFailDurationGetRegion.Observe(time.Since(start).Seconds())
metrics.CmdFailedDurationGetRegion.Observe(time.Since(start).Seconds())

Check warning on line 711 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L711

Added line #L711 was not covered by tests
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
errorMsg := fmt.Sprintf("[pd] can't get region info from member URLs: %+v", memberURLs)
return nil, errors.WithStack(errors.New(errorMsg))
Expand All @@ -722,7 +723,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()

Expand All @@ -749,7 +750,7 @@
resp, err = protoClient.GetRegion(cctx, req)
}

if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
Expand All @@ -762,7 +763,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetPrevRegion.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetPrevRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()

Expand All @@ -789,7 +790,7 @@
resp, err = protoClient.GetPrevRegion(cctx, req)
}

if err = c.respForErr(cmdFailDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
Expand All @@ -802,7 +803,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetRegionByID.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetRegionByID.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()

Expand All @@ -829,7 +830,7 @@
resp, err = protoClient.GetRegionByID(cctx, req)
}

if err = c.respForErr(cmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
Expand All @@ -842,7 +843,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationScanRegions.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationScanRegions.Observe(time.Since(start).Seconds()) }()

Check warning on line 846 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L846

Added line #L846 was not covered by tests

var cancel context.CancelFunc
scanCtx := ctx
Expand Down Expand Up @@ -879,7 +880,7 @@
resp, err = protoClient.ScanRegions(cctx, req)
}

if err = c.respForErr(cmdFailedDurationScanRegions, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationScanRegions, start, err, resp.GetHeader()); err != nil {

Check warning on line 883 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L883

Added line #L883 was not covered by tests
return nil, err
}

Expand All @@ -893,7 +894,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationBatchScanRegions.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationBatchScanRegions.Observe(time.Since(start).Seconds()) }()

var cancel context.CancelFunc
scanCtx := ctx
Expand Down Expand Up @@ -933,7 +934,7 @@
resp, err = protoClient.BatchScanRegions(cctx, req)
}

if err = c.respForErr(cmdFailedDurationBatchScanRegions, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationBatchScanRegions, start, err, resp.GetHeader()); err != nil {
return nil, err
}

Expand Down Expand Up @@ -993,7 +994,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetStore.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetStore.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
Expand All @@ -1007,7 +1008,7 @@
}
resp, err := protoClient.GetStore(ctx, req)

if err = c.respForErr(cmdFailedDurationGetStore, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationGetStore, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleStoreResponse(resp)
Expand Down Expand Up @@ -1037,7 +1038,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetAllStores.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetAllStores.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
Expand All @@ -1051,7 +1052,7 @@
}
resp, err := protoClient.GetAllStores(ctx, req)

if err = c.respForErr(cmdFailedDurationGetAllStores, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationGetAllStores, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return resp.GetStores(), nil
Expand All @@ -1064,7 +1065,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateGCSafePoint.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationUpdateGCSafePoint.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
Expand All @@ -1078,7 +1079,7 @@
}
resp, err := protoClient.UpdateGCSafePoint(ctx, req)

if err = c.respForErr(cmdFailedDurationUpdateGCSafePoint, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationUpdateGCSafePoint, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetNewSafePoint(), nil
Expand All @@ -1095,7 +1096,7 @@
}

start := time.Now()
defer func() { cmdDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
Expand All @@ -1111,7 +1112,7 @@
}
resp, err := protoClient.UpdateServiceGCSafePoint(ctx, req)

if err = c.respForErr(cmdFailedDurationUpdateServiceGCSafePoint, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationUpdateServiceGCSafePoint, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetMinSafePoint(), nil
Expand All @@ -1128,7 +1129,7 @@

func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, group string) error {
start := time.Now()
defer func() { cmdDurationScatterRegion.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationScatterRegion.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
Expand Down Expand Up @@ -1167,7 +1168,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationSplitAndScatterRegions.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationSplitAndScatterRegions.Observe(time.Since(start).Seconds()) }()

Check warning on line 1171 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L1171

Added line #L1171 was not covered by tests
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
options := &opt.RegionsOp{}
Expand Down Expand Up @@ -1195,7 +1196,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetOperator.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetOperator.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
Expand All @@ -1217,7 +1218,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationSplitRegions.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationSplitRegions.Observe(time.Since(start).Seconds()) }()

Check warning on line 1221 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L1221

Added line #L1221 was not covered by tests
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
options := &opt.RegionsOp{}
Expand Down Expand Up @@ -1246,7 +1247,7 @@

func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint64, opts ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error) {
start := time.Now()
defer func() { cmdDurationScatterRegions.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationScatterRegions.Observe(time.Since(start).Seconds()) }()
options := &opt.RegionsOp{}
for _, opt := range opts {
opt(options)
Expand Down
9 changes: 5 additions & 4 deletions client/gc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
"go.uber.org/zap"
)

Expand All @@ -39,7 +40,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
req := &pdpb.UpdateGCSafePointV2Request{
Expand All @@ -55,7 +56,7 @@
resp, err := protoClient.UpdateGCSafePointV2(ctx, req)
cancel()

if err = c.respForErr(cmdFailedDurationUpdateGCSafePointV2, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationUpdateGCSafePointV2, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetNewSafePoint(), nil
Expand All @@ -68,7 +69,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }()

Check warning on line 72 in client/gc_client.go

View check run for this annotation

Codecov / codecov/patch

client/gc_client.go#L72

Added line #L72 was not covered by tests

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
req := &pdpb.UpdateServiceSafePointV2Request{
Expand All @@ -85,7 +86,7 @@
}
resp, err := protoClient.UpdateServiceSafePointV2(ctx, req)
cancel()
if err = c.respForErr(cmdFailedDurationUpdateServiceSafePointV2, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationUpdateServiceSafePointV2, start, err, resp.GetHeader()); err != nil {

Check warning on line 89 in client/gc_client.go

View check run for this annotation

Codecov / codecov/patch

client/gc_client.go#L89

Added line #L89 was not covered by tests
return 0, err
}
return resp.GetMinSafePoint(), nil
Expand Down
3 changes: 2 additions & 1 deletion client/inner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -160,7 +161,7 @@ func (c *innerClient) close() {
func (c *innerClient) setup() error {
// Init the metrics.
if c.option.InitMetrics {
initAndRegisterMetrics(c.option.MetricsLabels)
metrics.InitAndRegisterMetrics(c.option.MetricsLabels)
}

// Init the client base.
Expand Down
19 changes: 10 additions & 9 deletions client/keyspace_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
)

// KeyspaceClient manages keyspace metadata.
Expand Down Expand Up @@ -51,7 +52,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationLoadKeyspace.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationLoadKeyspace.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
req := &keyspacepb.LoadKeyspaceRequest{
Header: c.requestHeader(),
Expand All @@ -66,13 +67,13 @@
cancel()

if err != nil {
cmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())
metrics.CmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())

Check warning on line 70 in client/keyspace_client.go

View check run for this annotation

Codecov / codecov/patch

client/keyspace_client.go#L70

Added line #L70 was not covered by tests
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
return nil, err
}

if resp.Header.GetError() != nil {
cmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())
metrics.CmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())
return nil, errors.Errorf("Load keyspace %s failed: %s", name, resp.Header.GetError().String())
}

Expand All @@ -95,7 +96,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
req := &keyspacepb.UpdateKeyspaceStateRequest{
Header: c.requestHeader(),
Expand All @@ -111,13 +112,13 @@
cancel()

if err != nil {
cmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
metrics.CmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())

Check warning on line 115 in client/keyspace_client.go

View check run for this annotation

Codecov / codecov/patch

client/keyspace_client.go#L115

Added line #L115 was not covered by tests
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
return nil, err
}

if resp.Header.GetError() != nil {
cmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
metrics.CmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
return nil, errors.Errorf("Update state for keyspace id %d failed: %s", id, resp.Header.GetError().String())
}

Expand All @@ -139,7 +140,7 @@
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
req := &keyspacepb.GetAllKeyspacesRequest{
Header: c.requestHeader(),
Expand All @@ -155,13 +156,13 @@
cancel()

if err != nil {
cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
metrics.CmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())

Check warning on line 159 in client/keyspace_client.go

View check run for this annotation

Codecov / codecov/patch

client/keyspace_client.go#L159

Added line #L159 was not covered by tests
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
return nil, err
}

if resp.Header.GetError() != nil {
cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
metrics.CmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())

Check warning on line 165 in client/keyspace_client.go

View check run for this annotation

Codecov / codecov/patch

client/keyspace_client.go#L165

Added line #L165 was not covered by tests
return nil, errors.Errorf("Get all keyspaces metadata failed: %s", resp.Header.GetError().String())
}

Expand Down
Loading