Skip to content

Commit

Permalink
unify the grpc errors
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Dec 24, 2024
1 parent 461b86a commit b1c5919
Show file tree
Hide file tree
Showing 13 changed files with 147 additions and 142 deletions.
4 changes: 2 additions & 2 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -661,9 +661,9 @@ error = '''
init file log error, %s
'''

["PD:mcs:ErrNotFoundSchedulingAddr"]
["PD:mcs:ErrNotFoundSchedulingPrimary"]
error = '''
cannot find scheduling address
cannot find scheduling primary address
'''

["PD:mcs:ErrSchedulingServer"]
Expand Down
66 changes: 63 additions & 3 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

package errs

import "github.com/pingcap/errors"
import (
"github.com/pingcap/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
// NotLeaderErr indicates the non-leader member received the requests which should be received by leader.
Expand All @@ -31,6 +35,62 @@ const (
NotServedErr = "is not served"
)

// gRPC errors
var (
// Canceled indicates the operation was canceled (typically by the caller).
ErrStreamClosed = status.Error(codes.Canceled, "stream is closed")

// Unknown error. An example of where this error may be returned is
// if a Status value received from another address space belongs to
// an error-space that is not known in this address space. Also
// errors raised by APIs that do not return enough error information
// may be converted to this error.
ErrUnknown = func(err error) error {
return status.Error(codes.Unknown, err.Error())
}

// DeadlineExceeded means operation expired before completion.
// For operations that change the state of the system, this error may be
// returned even if the operation has completed successfully. For
// example, a successful response from a server could have been delayed
// long enough for the deadline to expire.
ErrForwardTSOTimeout = status.Error(codes.DeadlineExceeded, "forward tso request timeout")
ErrTSOProxyRecvFromClientTimeout = status.Error(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server")
ErrSendHeartbeatTimeout = status.Error(codes.DeadlineExceeded, "send heartbeat timeout")

// NotFound means some requested entity (e.g., file or directory) was
// not found.
ErrNotFoundTSOAddr = status.Error(codes.NotFound, "not found tso address")
ErrNotFoundSchedulingAddr = status.Error(codes.NotFound, "not found scheduling address")
ErrNotFoundService = status.Error(codes.NotFound, "not found service")

// ResourceExhausted indicates some resource has been exhausted, perhaps
// a per-user quota, or perhaps the entire file system is out of space.
ErrMaxCountTSOProxyRoutinesExceeded = status.Error(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded")
ErrGRPCRateLimitExceeded = func(err error) error {
return status.Error(codes.ResourceExhausted, err.Error())
}

// FailedPrecondition indicates operation was rejected because the
// system is not in a state required for the operation's execution.
// For example, directory to be deleted may be non-empty, an rmdir
// operation is applied to a non-directory, etc.
ErrMismatchClusterID = func(clusterID, requestClusterID uint64) error {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", clusterID, requestClusterID)
}

// Unavailable indicates the service is currently unavailable.
// This is a most likely a transient condition and may be corrected
// by retrying with a backoff. Note that it is not always safe to retry
// non-idempotent operations.
// ErrNotLeader is returned when current server is not the leader and not possible to process request.
// TODO: work as proxy.
ErrNotLeader = status.Error(codes.Unavailable, "not leader")
ErrNotStarted = status.Error(codes.Unavailable, "server not started")
ErrEtcdNotStarted = status.Error(codes.Unavailable, "server is started, but etcd not started")
ErrFollowerHandlingNotAllowed = status.Error(codes.Unavailable, "not leader and follower handling not allowed")
)

// common error in multiple packages
var (
ErrGetSourceStore = errors.Normalize("failed to get the source store", errors.RFCCodeText("PD:common:ErrGetSourceStore"))
Expand Down Expand Up @@ -484,6 +544,6 @@ var (

// Micro service errors
var (
ErrNotFoundSchedulingAddr = errors.Normalize("cannot find scheduling address", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingAddr"))
ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer"))
ErrNotFoundSchedulingPrimary = errors.Normalize("cannot find scheduling primary address", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingPrimary"))
ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer"))
)
10 changes: 2 additions & 8 deletions pkg/mcs/metastorage/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,17 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/kvproto/pkg/meta_storagepb"
"github.com/pingcap/log"

bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/keypath"
)

var (
// errNotLeader is returned when current server is not the leader.
errNotLeader = status.Errorf(codes.Unavailable, "not leader")
)

var _ meta_storagepb.MetaStorageServer = (*Service)(nil)

// SetUpRestHandler is a hook to sets up the REST service.
Expand Down Expand Up @@ -81,7 +75,7 @@ func (*Service) RegisterRESTHandler(_ map[string]http.Handler) error {

func (s *Service) checkServing() error {
if s.manager == nil || s.manager.srv == nil || !s.manager.srv.IsServing() {
return errNotLeader
return errs.ErrNotLeader
}
return nil
}
Expand Down
10 changes: 2 additions & 8 deletions pkg/mcs/resourcemanager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,18 @@ import (

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"

bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
)

var (
// errNotLeader is returned when current server is not the leader.
errNotLeader = status.Errorf(codes.Unavailable, "not leader")
)

var _ rmpb.ResourceManagerServer = (*Service)(nil)

// SetUpRestHandler is a hook to sets up the REST service.
Expand Down Expand Up @@ -89,7 +83,7 @@ func (s *Service) GetManager() *Manager {

func (s *Service) checkServing() error {
if s.manager == nil || s.manager.srv == nil || !s.manager.srv.IsServing() {
return errNotLeader
return errs.ErrNotLeader
}
return nil
}
Expand Down
10 changes: 1 addition & 9 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
Expand All @@ -41,12 +39,6 @@ import (
"github.com/tikv/pd/pkg/versioninfo"
)

// gRPC errors
var (
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrClusterMismatched = status.Errorf(codes.Unavailable, "cluster mismatched")
)

// SetUpRestHandler is a hook to sets up the REST service.
var SetUpRestHandler = func(*Service) (http.Handler, apiutil.APIServiceGroup) {
return dummyRestService{}, apiutil.APIServiceGroup{}
Expand Down Expand Up @@ -107,7 +99,7 @@ func (s *heartbeatServer) Send(m core.RegionHeartbeatResponse) error {
return errors.WithStack(err)
case <-timer.C:
atomic.StoreInt32(&s.closed, 1)
return status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
return errs.ErrSendHeartbeatTimeout
}
}

Expand Down
21 changes: 6 additions & 15 deletions pkg/mcs/tso/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,18 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"

bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/keypath"
)

// gRPC errors
var (
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrClusterMismatched = status.Errorf(codes.Unavailable, "cluster mismatched")
)

var _ tsopb.TSOServer = (*Service)(nil)

// SetUpRestHandler is a hook to sets up the REST service.
Expand Down Expand Up @@ -102,14 +95,12 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
start := time.Now()
// TSO uses leader lease to determine validity. No need to check leader here.
if s.IsClosed() {
return status.Errorf(codes.Unknown, "server not started")
return errs.ErrNotStarted
}
header := request.GetHeader()
clusterID := header.GetClusterId()
if clusterID != keypath.ClusterID() {
return status.Errorf(
codes.FailedPrecondition, "mismatch cluster id, need %d but got %d",
keypath.ClusterID(), clusterID)
return errs.ErrMismatchClusterID(keypath.ClusterID(), clusterID)
}
keyspaceID := header.GetKeyspaceId()
keyspaceGroupID := header.GetKeyspaceGroupId()
Expand All @@ -119,7 +110,7 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
keyspaceID, keyspaceGroupID,
count)
if err != nil {
return status.Error(codes.Unknown, err.Error())
return errs.ErrUnknown(err)
}
keyspaceGroupIDStr := strconv.FormatUint(uint64(keyspaceGroupID), 10)
tsoHandleDuration.WithLabelValues(keyspaceGroupIDStr).Observe(time.Since(start).Seconds())
Expand Down Expand Up @@ -220,10 +211,10 @@ func (s *Service) GetMinTS(

func (s *Service) validRequest(header *tsopb.RequestHeader) (tsopb.ErrorType, error) {
if s.IsClosed() || s.keyspaceGroupManager == nil {
return tsopb.ErrorType_NOT_BOOTSTRAPPED, ErrNotStarted
return tsopb.ErrorType_NOT_BOOTSTRAPPED, errs.ErrNotStarted
}
if header == nil || header.GetClusterId() != keypath.ClusterID() {
return tsopb.ErrorType_CLUSTER_MISMATCHED, ErrClusterMismatched
return tsopb.ErrorType_CLUSTER_MISMATCHED, errs.ErrMismatchClusterID(keypath.ClusterID(), header.GetClusterId())
}
return tsopb.ErrorType_OK, nil
}
Expand Down
9 changes: 3 additions & 6 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
"github.com/spf13/cobra"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
Expand Down Expand Up @@ -279,7 +277,7 @@ func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorM
// TODO: Check if the sender is from the global TSO allocator
func (s *Server) ValidateInternalRequest(_ *tsopb.RequestHeader, _ bool) error {
if s.IsClosed() {
return ErrNotStarted
return errs.ErrNotStarted
}
return nil
}
Expand All @@ -288,11 +286,10 @@ func (s *Server) ValidateInternalRequest(_ *tsopb.RequestHeader, _ bool) error {
// TODO: Check if the keyspace replica is the primary
func (s *Server) ValidateRequest(header *tsopb.RequestHeader) error {
if s.IsClosed() {
return ErrNotStarted
return errs.ErrNotStarted
}
if header.GetClusterId() != keypath.ClusterID() {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d",
keypath.ClusterID(), header.GetClusterId())
return errs.ErrMismatchClusterID(keypath.ClusterID(), header.GetClusterId())
}
return nil
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (

"github.com/docker/go-units"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -208,7 +206,7 @@ func (s *RegionSyncer) Sync(ctx context.Context, stream pdpb.PD_SyncRegionsServe
}
clusterID := request.GetHeader().GetClusterId()
if clusterID != keypath.ClusterID() {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", keypath.ClusterID(), clusterID)
return errs.ErrMismatchClusterID(keypath.ClusterID(), clusterID)
}
log.Info("establish sync region stream",
zap.String("requested-server", request.GetMember().GetName()),
Expand Down
2 changes: 1 addition & 1 deletion server/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (h *adminHandler) recoverAllocID(w http.ResponseWriter, r *http.Request) {
func (h *adminHandler) deleteRegionCacheInSchedulingServer(id ...uint64) error {
addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), constant.SchedulingServiceName)
if !ok {
return errs.ErrNotFoundSchedulingAddr.FastGenByArgs()
return errs.ErrNotFoundSchedulingPrimary.FastGenByArgs()
}
var idStr string
if len(id) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ func (h *confHandler) GetPDServerConfig(w http.ResponseWriter, _ *http.Request)
func (h *confHandler) getSchedulingServerConfig() (*config.Config, error) {
addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), constant.SchedulingServiceName)
if !ok {
return nil, errs.ErrNotFoundSchedulingAddr.FastGenByArgs()
return nil, errs.ErrNotFoundSchedulingPrimary.FastGenByArgs()
}
url := fmt.Sprintf("%s/scheduling/api/v1/config", addr)
req, err := http.NewRequest(http.MethodGet, url, http.NoBody)
Expand Down
10 changes: 4 additions & 6 deletions server/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -107,7 +105,7 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
maxConcurrentTSOProxyStreamings := int32(s.GetMaxConcurrentTSOProxyStreamings())
if maxConcurrentTSOProxyStreamings >= 0 {
if newCount := s.concurrentTSOProxyStreamings.Add(1); newCount > maxConcurrentTSOProxyStreamings {
return errors.WithStack(ErrMaxCountTSOProxyRoutinesExceeded)
return errors.WithStack(errs.ErrMaxCountTSOProxyRoutinesExceeded)
}
}

Expand All @@ -132,7 +130,7 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
}
if request.GetCount() == 0 {
err = errs.ErrGenerateTimestamp.FastGenByArgs("tso count should be positive")
return status.Error(codes.Unknown, err.Error())
return errs.ErrUnknown(err)
}
forwardCtx, cancelForward, forwardStream, lastForwardedHost, tsoStreamErr, err = s.handleTSOForwarding(forwardCtx, forwardStream, stream, server, request, tsDeadlineCh, lastForwardedHost, cancelForward)
if tsoStreamErr != nil {
Expand All @@ -155,7 +153,7 @@ func (s *GrpcServer) handleTSOForwarding(forwardCtx context.Context, forwardStre
) {
forwardedHost, ok := s.GetServicePrimaryAddr(stream.Context(), constant.TSOServiceName)
if !ok || len(forwardedHost) == 0 {
return forwardCtx, cancelForward, forwardStream, lastForwardedHost, errors.WithStack(ErrNotFoundTSOAddr), nil
return forwardCtx, cancelForward, forwardStream, lastForwardedHost, errors.WithStack(errs.ErrNotFoundTSOAddr), nil
}
if forwardStream == nil || lastForwardedHost != forwardedHost {
if cancelForward != nil {
Expand Down Expand Up @@ -458,7 +456,7 @@ func (s *GrpcServer) getGlobalTSO(ctx context.Context) (pdpb.Timestamp, error) {
}
forwardedHost, ok = s.GetServicePrimaryAddr(ctx, constant.TSOServiceName)
if !ok || forwardedHost == "" {
return pdpb.Timestamp{}, ErrNotFoundTSOAddr
return pdpb.Timestamp{}, errs.ErrNotFoundTSOAddr
}
forwardStream, err = s.getTSOForwardStream(forwardedHost)
if err != nil {
Expand Down
Loading

0 comments on commit b1c5919

Please sign in to comment.