Skip to content

Commit f75da21

Browse files
committed
Fix the multierr bug
Signed-off-by: JmPotato <[email protected]>
1 parent 75da7c0 commit f75da21

File tree

7 files changed

+73
-37
lines changed

7 files changed

+73
-37
lines changed

Diff for: client/errs/errs.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@ import (
2222
"go.uber.org/zap/zapcore"
2323
)
2424

25-
// IsLeaderChange will determine whether there is a leader change.
25+
// IsLeaderChange will determine whether there is a leader/primary change.
2626
func IsLeaderChange(err error) bool {
27+
if err == nil {
28+
return false
29+
}
2730
if err == ErrClientTSOStreamClosed {
2831
return true
2932
}

Diff for: client/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ require (
1616
github.com/stretchr/testify v1.8.2
1717
go.uber.org/atomic v1.10.0
1818
go.uber.org/goleak v1.1.11
19-
go.uber.org/multierr v1.11.0
2019
go.uber.org/zap v1.24.0
2120
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
2221
google.golang.org/grpc v1.62.1
@@ -34,6 +33,7 @@ require (
3433
github.com/prometheus/client_model v0.5.0 // indirect
3534
github.com/prometheus/common v0.46.0 // indirect
3635
github.com/prometheus/procfs v0.12.0 // indirect
36+
go.uber.org/multierr v1.7.0 // indirect
3737
golang.org/x/net v0.23.0 // indirect
3838
golang.org/x/sys v0.18.0 // indirect
3939
golang.org/x/text v0.14.0 // indirect

Diff for: client/go.sum

+1-2
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,8 @@ go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A
8888
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
8989
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
9090
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
91+
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
9192
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
92-
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
93-
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
9493
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
9594
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
9695
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=

Diff for: client/http/client.go

+20-19
Original file line numberDiff line numberDiff line change
@@ -124,18 +124,20 @@ func (ci *clientInner) requestWithRetry(
124124
isLeader bool
125125
statusCode int
126126
err error
127-
logFields = append(reqInfo.logFields(),
128-
zap.String("source", ci.source),
129-
zap.String("server-url", serverURL),
130-
zap.Bool("is-leader", isLeader),
131-
zap.Int("status-code", statusCode),
132-
zap.Error(err))
127+
logFields = append(reqInfo.logFields(), zap.String("source", ci.source))
133128
)
134129
execFunc := func() error {
135130
defer func() {
136-
// Handle some special status codes and errors to increase the success rate of the following requests.
137-
ci.handleHTTPStatusCodeAndErr(statusCode, err)
138-
log.Debug("[pd] http request finished", logFields...)
131+
// - If the status code is 503, it indicates that there may be PD leader/follower changes.
132+
// - If the error message contains the leader/primary change information, it indicates that there may be PD leader/primary change.
133+
if statusCode == http.StatusServiceUnavailable || errs.IsLeaderChange(err) {
134+
ci.sd.ScheduleCheckMemberChanged()
135+
}
136+
log.Info("[pd] http request finished", append(logFields,
137+
zap.String("server-url", serverURL),
138+
zap.Bool("is-leader", isLeader),
139+
zap.Int("status-code", statusCode),
140+
zap.Error(err))...)
139141
}()
140142
// It will try to send the request to the PD leader first and then try to send the request to the other PD followers.
141143
clients := ci.sd.GetAllServiceClients()
@@ -154,7 +156,11 @@ func (ci *clientInner) requestWithRetry(
154156
if err == nil || noNeedRetry(statusCode) {
155157
return err
156158
}
157-
log.Debug("[pd] http request url failed", logFields...)
159+
log.Info("[pd] http request url failed", append(logFields,
160+
zap.String("server-url", serverURL),
161+
zap.Bool("is-leader", isLeader),
162+
zap.Int("status-code", statusCode),
163+
zap.Error(err))...)
158164
}
159165
if skipNum == len(clients) {
160166
return errs.ErrClientNoTargetMember
@@ -174,14 +180,6 @@ func (ci *clientInner) requestWithRetry(
174180
return bo.Exec(ctx, execFunc)
175181
}
176182

177-
func (ci *clientInner) handleHTTPStatusCodeAndErr(code int, err error) {
178-
// - If the status code is 503, it indicates that there may be PD leader/follower changes.
179-
// - If the error message contains the leader/primary change information, it indicates that there may be PD leader/primary change.
180-
if code == http.StatusServiceUnavailable || errs.IsLeaderChange(err) {
181-
ci.sd.ScheduleCheckMemberChanged()
182-
}
183-
}
184-
185183
func noNeedRetry(statusCode int) bool {
186184
return statusCode == http.StatusNotFound ||
187185
statusCode == http.StatusForbidden ||
@@ -245,11 +243,14 @@ func (ci *clientInner) doRequest(
245243
if readErr != nil {
246244
logFields = append(logFields, zap.NamedError("read-body-error", err))
247245
} else {
246+
bs = bytes.TrimSpace(bs)
248247
logFields = append(logFields, zap.ByteString("body", bs))
249248
}
250249

251250
log.Error("[pd] request failed with a non-200 status", logFields...)
252-
return resp.StatusCode, errors.Errorf("request pd http api failed with status: '%s'", resp.Status)
251+
return resp.StatusCode, errors.Errorf(
252+
"request pd http api failed with status: '%s', body: '%s'", resp.Status, bs,
253+
)
253254
}
254255

255256
if res == nil {

Diff for: client/http/request_info.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -161,10 +161,10 @@ func (ri *requestInfo) getURL(addr string) string {
161161

162162
func (ri *requestInfo) logFields() []zap.Field {
163163
return []zap.Field{
164-
zap.String("callerID", ri.callerID),
164+
zap.String("caller-id", ri.callerID),
165165
zap.String("name", ri.name),
166166
zap.String("uri", ri.uri),
167167
zap.String("method", ri.method),
168-
zap.String("targetURL", ri.targetURL),
168+
zap.String("target-url", ri.targetURL),
169169
}
170170
}

Diff for: client/retry/backoff.go

+4-12
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,9 @@ import (
2424
"github.com/pingcap/errors"
2525
"github.com/pingcap/failpoint"
2626
"github.com/pingcap/log"
27-
"go.uber.org/multierr"
2827
"go.uber.org/zap"
2928
)
3029

31-
const maxRecordErrorCount = 20
32-
3330
// Option is used to customize the backoffer.
3431
type Option func(*Backoffer)
3532

@@ -69,18 +66,13 @@ func (bo *Backoffer) Exec(
6966
) error {
7067
defer bo.resetBackoff()
7168
var (
72-
allErrors error
73-
err error
74-
after *time.Timer
69+
err error
70+
after *time.Timer
7571
)
7672
fnName := getFunctionName(fn)
7773
for {
7874
err = fn()
7975
bo.attempt++
80-
if bo.attempt < maxRecordErrorCount {
81-
// multierr.Append will ignore nil error.
82-
allErrors = multierr.Append(allErrors, err)
83-
}
8476
if !bo.isRetryable(err) {
8577
break
8678
}
@@ -100,7 +92,7 @@ func (bo *Backoffer) Exec(
10092
select {
10193
case <-ctx.Done():
10294
after.Stop()
103-
return multierr.Append(allErrors, errors.Trace(ctx.Err()))
95+
return errors.Trace(ctx.Err())
10496
case <-after.C:
10597
failpoint.Inject("backOffExecute", func() {
10698
testBackOffExecuteFlag = true
@@ -115,7 +107,7 @@ func (bo *Backoffer) Exec(
115107
}
116108
}
117109
}
118-
return allErrors
110+
return err
119111
}
120112

121113
// InitialBackoffer make the initial state for retrying.

Diff for: tests/integrations/client/http_client_test.go

+41
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"net/url"
2222
"sort"
2323
"strings"
24+
"sync"
2425
"testing"
2526
"time"
2627

@@ -757,3 +758,43 @@ func (suite *httpClientTestSuite) TestGetHealthStatus() {
757758
re.Equal("pd2", healths[1].Name)
758759
re.True(healths[0].Health && healths[1].Health)
759760
}
761+
762+
func (suite *httpClientTestSuite) TestRetryOnLeaderChange() {
763+
re := suite.Require()
764+
ctx, cancel := context.WithCancel(suite.ctx)
765+
defer cancel()
766+
767+
var wg sync.WaitGroup
768+
wg.Add(1)
769+
go func() {
770+
defer wg.Done()
771+
bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 0)
772+
client := suite.client.WithBackoffer(bo)
773+
for {
774+
healths, err := client.GetHealthStatus(ctx)
775+
if err != nil && strings.Contains(err.Error(), "context canceled") {
776+
return
777+
}
778+
re.NoError(err)
779+
re.Len(healths, 2)
780+
select {
781+
case <-ctx.Done():
782+
return
783+
default:
784+
}
785+
}
786+
}()
787+
788+
leader := suite.cluster.GetLeaderServer()
789+
re.NotNil(leader)
790+
for i := 0; i < 3; i++ {
791+
leader.ResignLeader()
792+
re.NotEmpty(suite.cluster.WaitLeader())
793+
leader = suite.cluster.GetLeaderServer()
794+
re.NotNil(leader)
795+
}
796+
797+
// Cancel the context to stop the goroutine.
798+
cancel()
799+
wg.Wait()
800+
}

0 commit comments

Comments
 (0)