From a929a546a790222299b556e449816e622288a5d1 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 3 Jun 2024 16:28:25 +0800 Subject: [PATCH] client/http, api/middleware: enhance the retry logic of the HTTP client (#8229) ref tikv/pd#7300 Schedule a member change check when the HTTP status code is 503 or receives a leader/primary change error. Signed-off-by: JmPotato --- client/client.go | 11 ----- client/errs/errno.go | 13 +++-- client/errs/errs.go | 18 +++++++ client/http/client.go | 49 +++++++++++++------ client/http/request_info.go | 11 +++++ client/pd_service_discovery_test.go | 3 +- client/resource_manager_client.go | 7 +-- client/tso_dispatcher.go | 2 +- errors.toml | 10 ++++ pkg/errs/errno.go | 9 ++-- .../apiutil/multiservicesapi/middleware.go | 4 +- pkg/utils/apiutil/serverapi/middleware.go | 4 +- server/apiv2/middlewares/redirector.go | 4 +- tests/integrations/client/client_test.go | 3 +- .../mcs/tso/keyspace_group_manager_test.go | 5 +- tests/server/cluster/cluster_test.go | 2 +- 16 files changed, 99 insertions(+), 56 deletions(-) diff --git a/client/client.go b/client/client.go index 1865fd0866e..1c8ef3cafe8 100644 --- a/client/client.go +++ b/client/client.go @@ -1431,17 +1431,6 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint return resp, nil } -// IsLeaderChange will determine whether there is a leader change. -func IsLeaderChange(err error) bool { - if err == errs.ErrClientTSOStreamClosed { - return true - } - errMsg := err.Error() - return strings.Contains(errMsg, errs.NotLeaderErr) || - strings.Contains(errMsg, errs.MismatchLeaderErr) || - strings.Contains(errMsg, errs.NotServedErr) -} - const ( httpSchemePrefix = "http://" httpsSchemePrefix = "https://" diff --git a/client/errs/errno.go b/client/errs/errno.go index 50c136dd5f2..0dbcb4fe147 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -20,21 +20,20 @@ import ( "github.com/pingcap/errors" ) +// Note: keep the same as the ones defined on the server side to ensure the client can use them correctly. const ( + // NoLeaderErr indicates there is no leader in the cluster currently. + NoLeaderErr = "no leader" // NotLeaderErr indicates the non-leader member received the requests which should be received by leader. - // Note: keep the same as the ones defined on the server side, because the client side checks if an error message - // contains this string to judge whether the leader is changed. - NotLeaderErr = "is not leader" + NotLeaderErr = "not leader" // MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader. - // Note: keep the same as the ones defined on the server side, because the client side checks if an error message - // contains this string to judge whether the leader is changed. MismatchLeaderErr = "mismatch leader id" // NotServedErr indicates an tso node/pod received the requests for the keyspace groups which are not served by it. - // Note: keep the same as the ones defined on the server side, because the client side checks if an error message - // contains this string to judge whether the leader is changed. NotServedErr = "is not served" // RetryTimeoutErr indicates the server is busy. RetryTimeoutErr = "retry timeout" + // NotPrimaryErr indicates the non-primary member received the requests which should be received by primary. + NotPrimaryErr = "not primary" ) // client errors diff --git a/client/errs/errs.go b/client/errs/errs.go index 47f7c29a467..da333efda4c 100644 --- a/client/errs/errs.go +++ b/client/errs/errs.go @@ -15,11 +15,29 @@ package errs import ( + "strings" + "github.com/pingcap/errors" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) +// IsLeaderChange will determine whether there is a leader/primary change. +func IsLeaderChange(err error) bool { + if err == nil { + return false + } + if err == ErrClientTSOStreamClosed { + return true + } + errMsg := err.Error() + return strings.Contains(errMsg, NoLeaderErr) || + strings.Contains(errMsg, NotLeaderErr) || + strings.Contains(errMsg, MismatchLeaderErr) || + strings.Contains(errMsg, NotServedErr) || + strings.Contains(errMsg, NotPrimaryErr) +} + // ZapError is used to make the log output easier. func ZapError(err error, causeError ...error) zap.Field { if err == nil { diff --git a/client/http/client.go b/client/http/client.go index 7b34193c2a4..123ca616422 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -120,10 +120,25 @@ func (ci *clientInner) requestWithRetry( headerOpts ...HeaderOption, ) error { var ( + serverURL string + isLeader bool statusCode int err error + logFields = append(reqInfo.logFields(), zap.String("source", ci.source)) ) execFunc := func() error { + defer func() { + // If the status code is 503, it indicates that there may be PD leader/follower changes. + // If the error message contains the leader/primary change information, it indicates that there may be PD leader/primary change. + if statusCode == http.StatusServiceUnavailable || errs.IsLeaderChange(err) { + ci.sd.ScheduleCheckMemberChanged() + } + log.Debug("[pd] http request finished", append(logFields, + zap.String("server-url", serverURL), + zap.Bool("is-leader", isLeader), + zap.Int("status-code", statusCode), + zap.Error(err))...) + }() // It will try to send the request to the PD leader first and then try to send the request to the other PD followers. clients := ci.sd.GetAllServiceClients() if len(clients) == 0 { @@ -131,17 +146,21 @@ func (ci *clientInner) requestWithRetry( } skipNum := 0 for _, cli := range clients { - url := cli.GetURL() - if reqInfo.targetURL != "" && reqInfo.targetURL != url { + serverURL = cli.GetURL() + isLeader = cli.IsConnectedToLeader() + if len(reqInfo.targetURL) > 0 && reqInfo.targetURL != serverURL { skipNum++ continue } - statusCode, err = ci.doRequest(ctx, url, reqInfo, headerOpts...) + statusCode, err = ci.doRequest(ctx, serverURL, reqInfo, headerOpts...) if err == nil || noNeedRetry(statusCode) { return err } - log.Debug("[pd] request url failed", - zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("url", url), zap.Error(err)) + log.Debug("[pd] http request url failed", append(logFields, + zap.String("server-url", serverURL), + zap.Bool("is-leader", isLeader), + zap.Int("status-code", statusCode), + zap.Error(err))...) } if skipNum == len(clients) { return errs.ErrClientNoTargetMember @@ -169,26 +188,21 @@ func noNeedRetry(statusCode int) bool { func (ci *clientInner) doRequest( ctx context.Context, - url string, reqInfo *requestInfo, + serverURL string, reqInfo *requestInfo, headerOpts ...HeaderOption, ) (int, error) { var ( - source = ci.source callerID = reqInfo.callerID name = reqInfo.name method = reqInfo.method body = reqInfo.body res = reqInfo.res respHandler = reqInfo.respHandler + url = reqInfo.getURL(serverURL) + logFields = append(reqInfo.logFields(), + zap.String("source", ci.source), + zap.String("url", url)) ) - url = reqInfo.getURL(url) - logFields := []zap.Field{ - zap.String("source", source), - zap.String("name", name), - zap.String("url", url), - zap.String("method", method), - zap.String("caller-id", callerID), - } log.Debug("[pd] request the http url", logFields...) req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(body)) if err != nil { @@ -229,11 +243,14 @@ func (ci *clientInner) doRequest( if readErr != nil { logFields = append(logFields, zap.NamedError("read-body-error", err)) } else { + // API server will return a JSON body containing the detailed error message + // when the status code is not `http.StatusOK` 200. + bs = bytes.TrimSpace(bs) logFields = append(logFields, zap.ByteString("body", bs)) } log.Error("[pd] request failed with a non-200 status", logFields...) - return resp.StatusCode, errors.Errorf("request pd http api failed with status: '%s'", resp.Status) + return resp.StatusCode, errors.Errorf("request pd http api failed with status: '%s', body: '%s'", resp.Status, bs) } if res == nil { diff --git a/client/http/request_info.go b/client/http/request_info.go index 202eab1150f..3fb91c6ca97 100644 --- a/client/http/request_info.go +++ b/client/http/request_info.go @@ -18,6 +18,7 @@ import ( "fmt" "github.com/tikv/pd/client/retry" + "go.uber.org/zap" ) // The following constants are the names of the requests. @@ -157,3 +158,13 @@ func (ri *requestInfo) WithTargetURL(targetURL string) *requestInfo { func (ri *requestInfo) getURL(addr string) string { return fmt.Sprintf("%s%s", addr, ri.uri) } + +func (ri *requestInfo) logFields() []zap.Field { + return []zap.Field{ + zap.String("caller-id", ri.callerID), + zap.String("name", ri.name), + zap.String("uri", ri.uri), + zap.String("method", ri.method), + zap.String("target-url", ri.targetURL), + } +} diff --git a/client/pd_service_discovery_test.go b/client/pd_service_discovery_test.go index f4cde0e1911..44171873b1a 100644 --- a/client/pd_service_discovery_test.go +++ b/client/pd_service_discovery_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/grpcutil" "github.com/tikv/pd/client/testutil" "google.golang.org/grpc" @@ -205,7 +206,7 @@ func (suite *serviceClientTestSuite) TestServiceClient() { re.NotNil(leaderConn) _, err := pb.NewGreeterClient(followerConn).SayHello(suite.ctx, &pb.HelloRequest{Name: "pd"}) - re.ErrorContains(err, "not leader") + re.ErrorContains(err, errs.NotLeaderErr) resp, err := pb.NewGreeterClient(leaderConn).SayHello(suite.ctx, &pb.HelloRequest{Name: "pd"}) re.NoError(err) re.Equal("Hello pd", resp.GetMessage()) diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 872b241cfe7..98b123c0823 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -16,7 +16,6 @@ package pd import ( "context" - "strings" "time" "github.com/gogo/protobuf/proto" @@ -35,10 +34,6 @@ const ( modify actionType = 1 groupSettingsPathPrefix = "resource_group/settings" controllerConfigPathPrefix = "resource_group/controller" - // errNotPrimary is returned when the requested server is not primary. - errNotPrimary = "not primary" - // errNotLeader is returned when the requested server is not pd leader. - errNotLeader = "not leader" ) // GroupSettingsPathPrefixBytes is used to watch or get resource groups. @@ -83,7 +78,7 @@ func (c *client) resourceManagerClient() (rmpb.ResourceManagerClient, error) { // gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service. func (c *client) gRPCErrorHandler(err error) { - if strings.Contains(err.Error(), errNotPrimary) || strings.Contains(err.Error(), errNotLeader) { + if errs.IsLeaderChange(err) { c.pdSvcDiscovery.ScheduleCheckMemberChanged() } } diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index d5b52ad6039..0919fd84744 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -303,7 +303,7 @@ tsoBatchLoop: cancel() stream = nil // Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP. - if IsLeaderChange(err) { + if errs.IsLeaderChange(err) { if err := bo.Exec(ctx, svcDiscovery.CheckMemberChanged); err != nil { select { case <-ctx.Done(): diff --git a/errors.toml b/errors.toml index 64101000478..a61c23a6fbd 100644 --- a/errors.toml +++ b/errors.toml @@ -16,11 +16,21 @@ error = ''' redirect failed ''' +["PD:apiutil:ErrRedirectNoLeader"] +error = ''' +redirect finds no leader +''' + ["PD:apiutil:ErrRedirectToNotLeader"] error = ''' redirect to not leader ''' +["PD:apiutil:ErrRedirectToNotPrimary"] +error = ''' +redirect to not primary +''' + ["PD:autoscaling:ErrEmptyMetricsResponse"] error = ''' metrics response from Prometheus is empty diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 8c3e914531b..1f56a821032 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -195,10 +195,11 @@ var ( // apiutil errors var ( - ErrRedirect = errors.Normalize("redirect failed", errors.RFCCodeText("PD:apiutil:ErrRedirect")) - ErrOptionNotExist = errors.Normalize("the option %s does not exist", errors.RFCCodeText("PD:apiutil:ErrOptionNotExist")) - // ErrRedirectToNotLeader is the error message for redirect to not leader. - ErrRedirectToNotLeader = errors.Normalize("redirect to not leader", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotLeader")) + ErrRedirect = errors.Normalize("redirect failed", errors.RFCCodeText("PD:apiutil:ErrRedirect")) + ErrOptionNotExist = errors.Normalize("the option %s does not exist", errors.RFCCodeText("PD:apiutil:ErrOptionNotExist")) + ErrRedirectNoLeader = errors.Normalize("redirect finds no leader", errors.RFCCodeText("PD:apiutil:ErrRedirectNoLeader")) + ErrRedirectToNotLeader = errors.Normalize("redirect to not leader", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotLeader")) + ErrRedirectToNotPrimary = errors.Normalize("redirect to not primary", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotPrimary")) ) // grpcutil errors diff --git a/pkg/utils/apiutil/multiservicesapi/middleware.go b/pkg/utils/apiutil/multiservicesapi/middleware.go index ed34ecc6afb..4343adcc981 100644 --- a/pkg/utils/apiutil/multiservicesapi/middleware.go +++ b/pkg/utils/apiutil/multiservicesapi/middleware.go @@ -48,8 +48,8 @@ func ServiceRedirector() gin.HandlerFunc { // Prevent more than one redirection. if name := c.Request.Header.Get(ServiceRedirectorHeader); len(name) != 0 { - log.Error("redirect but server is not primary", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirect)) - c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirect.FastGenByArgs().Error()) + log.Error("redirect but server is not primary", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirectToNotPrimary)) + c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirectToNotPrimary.FastGenByArgs().Error()) return } diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 1cd3d5b53d6..0718702b5a5 100755 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -210,7 +210,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http leader := h.waitForLeader(r) // The leader has not been elected yet. if leader == nil { - http.Error(w, "no leader", http.StatusServiceUnavailable) + http.Error(w, errs.ErrRedirectNoLeader.FastGenByArgs().Error(), http.StatusServiceUnavailable) return } // If the leader is the current server now, we can handle the request directly. @@ -222,7 +222,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http r.Header.Set(apiutil.PDRedirectorHeader, h.s.Name()) } else { // Prevent more than one redirection among PD/API servers. - log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect)) + log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirectToNotLeader)) http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError) return } diff --git a/server/apiv2/middlewares/redirector.go b/server/apiv2/middlewares/redirector.go index 37c06de1585..9c2c4081175 100644 --- a/server/apiv2/middlewares/redirector.go +++ b/server/apiv2/middlewares/redirector.go @@ -43,8 +43,8 @@ func Redirector() gin.HandlerFunc { // Prevent more than one redirection. if name := c.Request.Header.Get(apiutil.PDRedirectorHeader); len(name) != 0 { - log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirect)) - c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirect.FastGenByArgs().Error()) + log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirectToNotLeader)) + c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirectToNotLeader.FastGenByArgs().Error()) return } diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index dfe7a6980c7..65acd897726 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -40,6 +40,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + clierrs "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/retry" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" @@ -528,7 +529,7 @@ func TestGlobalAndLocalTSO(t *testing.T) { re.NotEmpty(cluster.WaitLeader()) _, _, err = cli.GetTS(ctx) re.Error(err) - re.True(pd.IsLeaderChange(err)) + re.True(clierrs.IsLeaderChange(err)) _, _, err = cli.GetTS(ctx) re.NoError(err) re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateMember")) diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 25d9516bf63..6d861962d9b 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + clierrs "github.com/tikv/pd/client/errs" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" mcsutils "github.com/tikv/pd/pkg/mcs/utils" @@ -467,8 +468,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) dispatchClient( errMsg := err.Error() // Ignore the errors caused by the split and context cancellation. if strings.Contains(errMsg, "context canceled") || - strings.Contains(errMsg, "not leader") || - strings.Contains(errMsg, "not served") || + strings.Contains(errMsg, clierrs.NotLeaderErr) || + strings.Contains(errMsg, clierrs.NotServedErr) || strings.Contains(errMsg, "ErrKeyspaceNotAssigned") || strings.Contains(errMsg, "ErrKeyspaceGroupIsMerging") { continue diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 07bcf3ee2a1..9e70a52d11d 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -662,7 +662,7 @@ func TestNotLeader(t *testing.T) { grpcStatus, ok := status.FromError(err) re.True(ok) re.Equal(codes.Unavailable, grpcStatus.Code()) - re.Equal("not leader", grpcStatus.Message()) + re.ErrorContains(server.ErrNotLeader, grpcStatus.Message()) } func TestStoreVersionChange(t *testing.T) {