From f91605ab2013d9dec9828facf8a2d24defef2a33 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 26 Nov 2024 16:18:17 +0800 Subject: [PATCH] server: use `ResourceExhausted` for the rate limit error (#8853) ref tikv/pd#5739 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/grpc_service.go | 147 ++++++++++-------------------------- tests/server/server_test.go | 19 ++--- 2 files changed, 49 insertions(+), 117 deletions(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index 3aa07e841ff..59d009daff4 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -301,9 +301,7 @@ func (s *GrpcServer) GetMinTS( if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetMinTSResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -458,9 +456,7 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetMembersResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } // Here we purposely do not check the cluster ID because the client does not know the correct cluster ID @@ -522,7 +518,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return err + return status.Error(codes.ResourceExhausted, err.Error()) } } if s.IsServiceIndependent(constant.TSOServiceName) { @@ -640,9 +636,7 @@ func (s *GrpcServer) Bootstrap(ctx context.Context, request *pdpb.BootstrapReque if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.BootstrapResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -684,9 +678,7 @@ func (s *GrpcServer) IsBootstrapped(ctx context.Context, request *pdpb.IsBootstr if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.IsBootstrappedResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -713,9 +705,7 @@ func (s *GrpcServer) AllocID(ctx context.Context, request *pdpb.AllocIDRequest) if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.AllocIDResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -749,9 +739,7 @@ func (s *GrpcServer) IsSnapshotRecovering(ctx context.Context, _ *pdpb.IsSnapsho if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.IsSnapshotRecoveringResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } // recovering mark is stored in etcd directly, there's no need to forward. @@ -775,9 +763,7 @@ func (s *GrpcServer) GetStore(ctx context.Context, request *pdpb.GetStoreRequest if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetStoreResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -831,9 +817,7 @@ func (s *GrpcServer) PutStore(ctx context.Context, request *pdpb.PutStoreRequest if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.PutStoreResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -888,9 +872,7 @@ func (s *GrpcServer) GetAllStores(ctx context.Context, request *pdpb.GetAllStore if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetAllStoresResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -933,9 +915,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.StoreHeartbeatResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, errs.ErrRateLimitExceeded.FastGenByArgs().Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1114,7 +1094,7 @@ func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error { if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return err + return status.Error(codes.ResourceExhausted, err.Error()) } } for { @@ -1230,7 +1210,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return err + return status.Error(codes.ResourceExhausted, err.Error()) } } for { @@ -1435,9 +1415,7 @@ func (s *GrpcServer) GetRegion(ctx context.Context, request *pdpb.GetRegionReque if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetRegionResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1499,9 +1477,7 @@ func (s *GrpcServer) GetPrevRegion(ctx context.Context, request *pdpb.GetRegionR if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetRegionResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1558,9 +1534,7 @@ func (s *GrpcServer) GetRegionByID(ctx context.Context, request *pdpb.GetRegionB if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetRegionResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1620,9 +1594,7 @@ func (s *GrpcServer) ScanRegions(ctx context.Context, request *pdpb.ScanRegionsR if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.ScanRegionsResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1678,9 +1650,7 @@ func (s *GrpcServer) BatchScanRegions(ctx context.Context, request *pdpb.BatchSc if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.BatchScanRegionsResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1770,9 +1740,7 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.AskSplitResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1816,9 +1784,7 @@ func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSp if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.AskBatchSplitResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } @@ -1892,9 +1858,7 @@ func (s *GrpcServer) ReportSplit(ctx context.Context, request *pdpb.ReportSplitR if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.ReportSplitResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1930,9 +1894,7 @@ func (s *GrpcServer) ReportBatchSplit(ctx context.Context, request *pdpb.ReportB if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.ReportBatchSplitResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1969,9 +1931,7 @@ func (s *GrpcServer) GetClusterConfig(ctx context.Context, request *pdpb.GetClus if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetClusterConfigResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2001,9 +1961,7 @@ func (s *GrpcServer) PutClusterConfig(ctx context.Context, request *pdpb.PutClus if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.PutClusterConfigResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2042,9 +2000,7 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.ScatterRegionResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } @@ -2156,9 +2112,7 @@ func (s *GrpcServer) GetGCSafePoint(ctx context.Context, request *pdpb.GetGCSafe if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetGCSafePointResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2197,7 +2151,7 @@ func (s *GrpcServer) SyncRegions(stream pdpb.PD_SyncRegionsServer) error { if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return err + return status.Error(codes.ResourceExhausted, err.Error()) } } ctx := s.cluster.Context() @@ -2215,9 +2169,7 @@ func (s *GrpcServer) UpdateGCSafePoint(ctx context.Context, request *pdpb.Update if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.UpdateGCSafePointResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2264,9 +2216,7 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.UpdateServiceGCSafePointResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2320,9 +2270,7 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetOperatorResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } @@ -2535,9 +2483,7 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.SyncMaxTSResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } tsoAllocatorManager := s.GetTSOAllocatorManager() @@ -2640,9 +2586,7 @@ func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegion if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.SplitRegionsResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } @@ -2706,9 +2650,7 @@ func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.S if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.SplitAndScatterRegionsResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2795,12 +2737,7 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.StoreGlobalConfigResponse{ - Error: &pdpb.Error{ - Type: pdpb.ErrorType_UNKNOWN, - Message: err.Error(), - }, - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } configPath := request.GetConfigPath() @@ -2846,7 +2783,7 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, err + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } configPath := request.GetConfigPath() @@ -2894,7 +2831,7 @@ func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, serve if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return err + return status.Error(codes.ResourceExhausted, err.Error()) } } ctx, cancel := context.WithCancel(server.Context()) @@ -2991,9 +2928,7 @@ func (s *GrpcServer) ReportMinResolvedTS(ctx context.Context, request *pdpb.Repo if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.ReportMinResolvedTsResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -3031,9 +2966,7 @@ func (s *GrpcServer) SetExternalTimestamp(ctx context.Context, request *pdpb.Set if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.SetExternalTimestampResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -3069,9 +3002,7 @@ func (s *GrpcServer) GetExternalTimestamp(ctx context.Context, request *pdpb.Get if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetExternalTimestampResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { diff --git a/tests/server/server_test.go b/tests/server/server_test.go index fc562825460..a5fcd33d2bc 100644 --- a/tests/server/server_test.go +++ b/tests/server/server_test.go @@ -170,11 +170,12 @@ func TestGRPCRateLimit(t *testing.T) { Header: &pdpb.RequestHeader{ClusterId: leaderServer.GetClusterID()}, RegionKey: []byte(""), }) - re.NoError(err) + re.Empty(resp.GetHeader().GetError()) if i == 0 { - re.Empty(resp.GetHeader().GetError()) + re.NoError(err) } else { - re.Contains(resp.GetHeader().GetError().GetMessage(), "rate limit exceeded") + re.Error(err) + re.Contains(err.Error(), "rate limit exceeded") } } @@ -214,9 +215,9 @@ func TestGRPCRateLimit(t *testing.T) { Header: &pdpb.RequestHeader{ClusterId: leaderServer.GetClusterID()}, RegionKey: []byte(""), }) - re.NoError(err) - if resp.GetHeader().GetError() != nil { - errCh <- resp.GetHeader().GetError().GetMessage() + re.Empty(resp.GetHeader().GetError()) + if err != nil { + errCh <- err.Error() } else { okCh <- struct{}{} } @@ -229,9 +230,9 @@ func TestGRPCRateLimit(t *testing.T) { Header: &pdpb.RequestHeader{ClusterId: leaderServer.GetClusterID()}, RegionKey: []byte(""), }) - re.NoError(err) - if resp.GetHeader().GetError() != nil { - errCh <- resp.GetHeader().GetError().GetMessage() + re.Empty(resp.GetHeader().GetError()) + if err != nil { + errCh <- err.Error() } else { okCh <- struct{}{} }