Skip to content

Commit 53f92e1

Browse files
authored
Avoid panic when using dedicated client after being recycled, return an error instead (#593)
* avoid panic on use after close, return error instead * introduce ErrDedicatedClientRecycled * better comment for ErrDedicatedClientRecycled
1 parent ab9f3fc commit 53f92e1

File tree

5 files changed

+49
-31
lines changed

5 files changed

+49
-31
lines changed

client.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,9 @@ func (c *dedicatedSingleClient) B() Builder {
177177

178178
func (c *dedicatedSingleClient) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
179179
retry:
180-
c.check()
180+
if err := c.check(); err != nil {
181+
return newErrResult(err)
182+
}
181183
resp = c.wire.Do(ctx, cmd)
182184
if c.retry && cmd.IsReadOnly() && isRetryable(resp.NonRedisError(), c.wire, ctx) {
183185
goto retry
@@ -197,7 +199,9 @@ func (c *dedicatedSingleClient) DoMulti(ctx context.Context, multi ...Completed)
197199
retryable = allReadOnly(multi)
198200
}
199201
retry:
200-
c.check()
202+
if err := c.check(); err != nil {
203+
return fillErrs(len(multi), err)
204+
}
201205
resp = c.wire.DoMulti(ctx, multi...).s
202206
if retryable && anyRetryable(resp, c.wire, ctx) {
203207
goto retry
@@ -212,7 +216,9 @@ retry:
212216

213217
func (c *dedicatedSingleClient) Receive(ctx context.Context, subscribe Completed, fn func(msg PubSubMessage)) (err error) {
214218
retry:
215-
c.check()
219+
if err := c.check(); err != nil {
220+
return err
221+
}
216222
err = c.wire.Receive(ctx, subscribe, fn)
217223
if c.retry {
218224
if _, ok := err.(*RedisError); !ok && isRetryable(err, c.wire, ctx) {
@@ -226,7 +232,11 @@ retry:
226232
}
227233

228234
func (c *dedicatedSingleClient) SetPubSubHooks(hooks PubSubHooks) <-chan error {
229-
c.check()
235+
if err := c.check(); err != nil {
236+
ch := make(chan error, 1)
237+
ch <- err
238+
return ch
239+
}
230240
return c.wire.SetPubSubHooks(hooks)
231241
}
232242

@@ -235,10 +245,11 @@ func (c *dedicatedSingleClient) Close() {
235245
c.release()
236246
}
237247

238-
func (c *dedicatedSingleClient) check() {
248+
func (c *dedicatedSingleClient) check() error {
239249
if atomic.LoadUint32(&c.mark) != 0 {
240-
panic(dedicatedClientUsedAfterReleased)
250+
return ErrDedicatedClientRecycled
241251
}
252+
return nil
242253
}
243254

244255
func (c *dedicatedSingleClient) release() {

client_test.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -548,10 +548,10 @@ func TestSingleClient(t *testing.T) {
548548
}
549549
})
550550

551-
t.Run("Dedicate panic after released", func(t *testing.T) {
551+
t.Run("Dedicate ErrDedicatedClientRecycled after released", func(t *testing.T) {
552552
m.AcquireFn = func() wire { return &mockWire{} }
553-
check := func() {
554-
if err := recover(); err != dedicatedClientUsedAfterReleased {
553+
check := func(err error) {
554+
if !errors.Is(err, ErrDedicatedClientRecycled) {
555555
t.Fatalf("unexpected err %v", err)
556556
}
557557
}
@@ -567,20 +567,22 @@ func TestSingleClient(t *testing.T) {
567567
closeFn(c, cancel)
568568
for _, fn := range []func(){
569569
func() {
570-
defer check()
571-
c.Do(context.Background(), c.B().Get().Key("k").Build())
570+
resp := c.Do(context.Background(), c.B().Get().Key("k").Build())
571+
check(resp.Error())
572572
},
573573
func() {
574-
defer check()
575-
c.DoMulti(context.Background(), c.B().Get().Key("k").Build())
574+
resp := c.DoMulti(context.Background(), c.B().Get().Key("k").Build())
575+
for _, r := range resp {
576+
check(r.Error())
577+
}
576578
},
577579
func() {
578-
defer check()
579-
c.Receive(context.Background(), c.B().Subscribe().Channel("k").Build(), func(msg PubSubMessage) {})
580+
err := c.Receive(context.Background(), c.B().Subscribe().Channel("k").Build(), func(msg PubSubMessage) {})
581+
check(err)
580582
},
581583
func() {
582-
defer check()
583-
c.SetPubSubHooks(PubSubHooks{})
584+
ch := c.SetPubSubHooks(PubSubHooks{})
585+
check(<-ch)
584586
},
585587
} {
586588
fn()

cluster.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,7 +1111,7 @@ func (c *dedicatedClusterClient) acquire(ctx context.Context, slot uint16) (wire
11111111
c.mu.Lock()
11121112
defer c.mu.Unlock()
11131113
if c.mark {
1114-
panic(dedicatedClientUsedAfterReleased)
1114+
return nil, ErrDedicatedClientRecycled
11151115
}
11161116
if c.slot == cmds.NoSlot {
11171117
c.slot = slot
@@ -1241,7 +1241,9 @@ func (c *dedicatedClusterClient) SetPubSubHooks(hooks PubSubHooks) <-chan error
12411241
c.mu.Lock()
12421242
defer c.mu.Unlock()
12431243
if c.mark {
1244-
panic(dedicatedClientUsedAfterReleased)
1244+
ch := make(chan error, 1)
1245+
ch <- ErrDedicatedClientRecycled
1246+
return ch
12451247
}
12461248
if p := c.pshks; p != nil {
12471249
c.pshks = nil

cluster_test.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1520,9 +1520,9 @@ func TestClusterClient(t *testing.T) {
15201520
}
15211521
})
15221522

1523-
t.Run("Dedicate panic after released", func(t *testing.T) {
1524-
check := func() {
1525-
if err := recover(); err != dedicatedClientUsedAfterReleased {
1523+
t.Run("Dedicate ErrDedicatedClientRecycled after released", func(t *testing.T) {
1524+
check := func(err error) {
1525+
if !errors.Is(err, ErrDedicatedClientRecycled) {
15261526
t.Fatalf("unexpected err %v", err)
15271527
}
15281528
}
@@ -1538,20 +1538,22 @@ func TestClusterClient(t *testing.T) {
15381538
closeFn(c, cancel)
15391539
for _, fn := range []func(){
15401540
func() {
1541-
defer check()
1542-
c.Do(context.Background(), c.B().Get().Key("k").Build())
1541+
resp := c.Do(context.Background(), c.B().Get().Key("k").Build())
1542+
check(resp.Error())
15431543
},
15441544
func() {
1545-
defer check()
1546-
c.DoMulti(context.Background(), c.B().Get().Key("k").Build())
1545+
resp := c.DoMulti(context.Background(), c.B().Get().Key("k").Build())
1546+
for _, r := range resp {
1547+
check(r.Error())
1548+
}
15471549
},
15481550
func() {
1549-
defer check()
1550-
c.Receive(context.Background(), c.B().Subscribe().Channel("k").Build(), func(msg PubSubMessage) {})
1551+
err := c.Receive(context.Background(), c.B().Subscribe().Channel("k").Build(), func(msg PubSubMessage) {})
1552+
check(err)
15511553
},
15521554
func() {
1553-
defer check()
1554-
c.SetPubSubHooks(PubSubHooks{})
1555+
ch := c.SetPubSubHooks(PubSubHooks{})
1556+
check(<-ch)
15551557
},
15561558
} {
15571559
fn()

rueidis.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ var (
5151
ErrReplicaOnlyNotSupported = errors.New("ReplicaOnly is not supported for single client")
5252
// ErrWrongPipelineMultiplex means wrong value for ClientOption.PipelineMultiplex
5353
ErrWrongPipelineMultiplex = errors.New("ClientOption.PipelineMultiplex must not be bigger than MaxPipelineMultiplex")
54+
// ErrDedicatedClientRecycled means the caller attempted to use the dedicated client which has been already recycled (after canceled/closed).
55+
ErrDedicatedClientRecycled = errors.New("dedicated client should not be used after recycled")
5456
)
5557

5658
// ClientOption should be passed to NewClient to construct a Client
@@ -392,4 +394,3 @@ func dial(dst string, opt *ClientOption) (conn net.Conn, err error) {
392394
}
393395

394396
const redisErrMsgCommandNotAllow = "command is not allowed"
395-
const dedicatedClientUsedAfterReleased = "DedicatedClient should not be used after recycled"

0 commit comments

Comments
 (0)