diff --git a/v2/lockrenewer.go b/v2/lockrenewer.go
index dd2b1cc..1872cb0 100644
--- a/v2/lockrenewer.go
+++ b/v2/lockrenewer.go
@@ -23,6 +23,9 @@ type LockRenewer interface {
 type LockRenewalOptions struct {
 	// Interval defines the frequency at which we renew the lock on the message. Defaults to 10 seconds.
 	Interval *time.Duration
+	// LockRenewalTimeout is the timeout value used on the context when sending the RenewMessageLock() request.
+	// Defaults to 5 seconds if not set or set to 0. Defaults to the lock expiry time if set to a negative value.
+	LockRenewalTimeout *time.Duration
 	// CancelMessageContextOnStop will cancel the downstream message context when the renewal handler is stopped.
 	// Defaults to true.
 	CancelMessageContextOnStop *bool
@@ -34,12 +37,16 @@ type LockRenewalOptions struct {
 // NewRenewLockHandler returns a middleware handler that will renew the lock on the message at the specified interval.
 func NewRenewLockHandler(options *LockRenewalOptions, handler Handler) HandlerFunc {
 	interval := 10 * time.Second
+	lockRenewalTimeout := 5 * time.Second
 	cancelMessageContextOnStop := true
 	metricRecorder := processor.Metric
 	if options != nil {
 		if options.Interval != nil {
 			interval = *options.Interval
 		}
+		if options.LockRenewalTimeout != nil && *options.LockRenewalTimeout != 0 {
+			lockRenewalTimeout = *options.LockRenewalTimeout
+		}
 		if options.CancelMessageContextOnStop != nil {
 			cancelMessageContextOnStop = *options.CancelMessageContextOnStop
 		}
@@ -52,6 +59,7 @@ func NewRenewLockHandler(options *LockRenewalOptions, handler Handler) HandlerFu
 			next:                   handler,
 			lockRenewer:            settler,
 			renewalInterval:        &interval,
+			renewalTimeout:         &lockRenewalTimeout,
 			metrics:                metricRecorder,
 			cancelMessageCtxOnStop: cancelMessageContextOnStop,
 			stopped:                make(chan struct{}, 1), // buffered channel to ensure we are not blocking
@@ -77,6 +85,7 @@ type peekLockRenewer struct {
 	next                   Handler
 	lockRenewer            LockRenewer
 	renewalInterval        *time.Duration
+	renewalTimeout         *time.Duration
 	metrics                processor.Recorder
 	alive                  atomic.Bool
 	cancelMessageCtxOnStop bool
@@ -127,7 +136,7 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a
 			}
 			logger.Info("renewing lock")
 			count++
-			err := plr.lockRenewer.RenewMessageLock(ctx, message, nil)
+			err := plr.renewMessageLock(ctx, message, nil)
 			if err != nil {
 				logger.Error(fmt.Sprintf("failed to renew lock: %s", err))
 				if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
@@ -170,3 +179,34 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a
 		}
 	}
 }
+
+func (plr *peekLockRenewer) renewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error {
+	lockLostErr := &azservicebus.Error{Code: azservicebus.CodeLockLost}
+	if message.LockedUntil == nil || time.Until(*message.LockedUntil) < 0 {
+		// if the lock doesn't exist or is already expired, we should not attempt to renew it.
+		return lockLostErr
+	}
+	renewalTimeout := time.Until(*message.LockedUntil)
+	if *plr.renewalTimeout > 0 {
+		renewalTimeout = *plr.renewalTimeout
+	}
+	// we should keep retrying until lock expiry or until message context is done
+	for time.Until(*message.LockedUntil) > 0 && ctx.Err() == nil {
+		var renewErr error
+		func() {
+			getLogger(ctx).Info(fmt.Sprintf("renewing lock with timeout: %s", renewalTimeout))
+			renewalCtx, cancel := context.WithTimeout(ctx, renewalTimeout)
+			defer cancel()
+			renewErr = plr.lockRenewer.RenewMessageLock(renewalCtx, message, options)
+		}()
+		// exit the renewal if we get any error other than context deadline exceeded or if the error is nil.
+		if !errors.Is(renewErr, context.DeadlineExceeded) {
+			return renewErr
+		}
+	}
+	// lock is expired or message context is done
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
+	return lockLostErr
+}
diff --git a/v2/lockrenewer_test.go b/v2/lockrenewer_test.go
index a6ae781..8c7766a 100644
--- a/v2/lockrenewer_test.go
+++ b/v2/lockrenewer_test.go
@@ -20,6 +20,7 @@ import (
 type fakeSBRenewLockSettler struct {
 	fakeSettler
+	Delay      time.Duration
 	PerMessage map[*azservicebus.ReceivedMessage]*atomic.Int32
 	mapLock    sync.Mutex
 	Err        error
 }
@@ -27,6 +28,15 @@ type fakeSBRenewLockSettler struct {
 
 func (r *fakeSBRenewLockSettler) RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage,
 	_ *azservicebus.RenewMessageLockOptions) error {
+	if r.Delay > 0 {
+		select {
+		case <-time.After(r.Delay):
+			break
+		case <-ctx.Done():
+			r.RenewCalled.Add(1)
+			return r.Err
+		}
+	}
 	r.RenewCalled.Add(1)
 	r.mapLock.Lock()
 	defer r.mapLock.Unlock()
@@ -54,7 +64,9 @@ func Test_StopRenewingOnHandlerCompletion(t *testing.T) {
 		err := settler.CompleteMessage(ctx, message, nil)
 		g.Expect(err).To(Not(HaveOccurred()))
 	}))
-	msg := &azservicebus.ReceivedMessage{}
+	msg := &azservicebus.ReceivedMessage{
+		LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
+	}
 	ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond)
 	defer cancel()
 	lr.Handle(ctx, settler, msg)
@@ -80,8 +92,12 @@ func Test_RenewalHandlerStayIndependentPerMessage(t *testing.T) {
 	// send 2 message with different context, cancel the 2nd context right away.
 	// The 2nd message should not be renewed.
 	// The 1st message should be renewed exactly twice
-	msg1 := &azservicebus.ReceivedMessage{}
-	msg2 := &azservicebus.ReceivedMessage{}
+	msg1 := &azservicebus.ReceivedMessage{
+		LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
+	}
+	msg2 := &azservicebus.ReceivedMessage{
+		LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
+	}
 	ctx := context.Background()
 	ctx1, cancel1 := context.WithCancel(ctx)
 	ctx2, cancel2 := context.WithCancel(ctx)
@@ -116,7 +132,9 @@ func Test_RenewPeriodically(t *testing.T) {
 			message *azservicebus.ReceivedMessage) {
 			time.Sleep(150 * time.Millisecond)
 		}))
-	msg := &azservicebus.ReceivedMessage{}
+	msg := &azservicebus.ReceivedMessage{
+		LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
+	}
 	ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond)
 	defer cancel()
 	lr.Handle(ctx, settler, msg)
@@ -136,7 +154,9 @@ func Test_NewLockRenewalHandler_RenewPeriodically(t *testing.T) {
 			message *azservicebus.ReceivedMessage) {
 			time.Sleep(150 * time.Millisecond)
 		}))
-	msg := &azservicebus.ReceivedMessage{}
+	msg := &azservicebus.ReceivedMessage{
+		LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
+	}
 	ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond)
 	defer cancel()
 	lr.Handle(ctx, settler, msg)
@@ -168,9 +188,11 @@ func Test_RenewPeriodically_Error(t *testing.T) {
 			},
 		},
 		{
-			name:              "stop periodic renewal on context canceled",
+			name:              "stop periodic renewal on renewal context canceled",
 			isRenewerCanceled: false,
-			settler:           &fakeSBRenewLockSettler{Err: context.Canceled},
+			settler: &fakeSBRenewLockSettler{
+				Err: context.Canceled,
+			},
 			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
 				g.Consistently(
 					func(g Gomega) {
@@ -184,7 +206,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
 			},
 		},
 		{
-			name:              "stop periodic renewal on context canceled",
+			name:              "stop periodic renewal on msg context canceled",
 			isRenewerCanceled: true,
 			settler:           &fakeSBRenewLockSettler{Err: context.Canceled},
 			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
@@ -264,7 +286,9 @@ func Test_RenewPeriodically_Error(t *testing.T) {
 					break
 				}
 			}))
-			msg := &azservicebus.ReceivedMessage{}
+			msg := &azservicebus.ReceivedMessage{
+				LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
+			}
 			ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
 			if tc.isRenewerCanceled {
 				cancel()
@@ -275,3 +299,360 @@ func Test_RenewPeriodically_Error(t *testing.T) {
 		})
 	}
 }
+
+func Test_RenewTimeoutOption(t *testing.T) {
+	type testCase struct {
+		name              string
+		settler           *fakeSBRenewLockSettler
+		isRenewerCanceled bool
+		renewTimeout      *time.Duration
+		cancelCtxOnStop   *bool
+		completeDelay     time.Duration
+		processorCtx      context.Context
+		gotMessageCtx     context.Context
+		verify            func(g Gomega, tc *testCase, metrics *processor.Informer)
+	}
+	testCases := []testCase{
+		{
+			name: "should time out sooner than renewal interval if set",
+			settler: &fakeSBRenewLockSettler{
+				// set delay to be greater than interval to check for lockTimeout
+				Delay: time.Duration(100) * time.Millisecond,
+				// customized error to check renewal timeout config
+				Err: fmt.Errorf("renew timeout"),
+			},
+			renewTimeout: to.Ptr(time.Duration(10) * time.Millisecond),
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				// first renewal attempt at 0ms and finish at 10ms
+				// second renewal attempt start at 60ms and finish at 70ms
+				// third renewal attempt start at 120ms and finish at 130ms
+				// eventually, the downstream handler completed at 180ms and message context was canceled
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(3))) },
+					180*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
+				g.Expect(tc.processorCtx.Err()).To(BeNil())
+			},
+		},
+		{
+			name: "should time out later than renewal interval if set",
+			settler: &fakeSBRenewLockSettler{
+				// set delay to be greater than interval to check for lockTimeout
+				Delay: time.Duration(200) * time.Millisecond,
+				// customized error to check renewal timeout config
+				Err: fmt.Errorf("renew timeout"),
+			},
+			renewTimeout: to.Ptr(time.Duration(100) * time.Millisecond),
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				// first renewal attempt at 0ms and finish at 100ms
+				// second renewal attempt start at 150ms and is supposed to finish at 250ms
+				// downstream handler completed at 180ms and message context was canceled before secondary attempt finished
+				// the second renewal attempt was not finished and RenewCalled should be 1
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(1))) },
+					300*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
+				g.Expect(tc.processorCtx.Err()).To(BeNil())
+			},
+		},
+		{
+			name: "should eventually exit if RenewMessageLock() hangs and renewTimeout is disabled",
+			settler: &fakeSBRenewLockSettler{
+				// set delay to be greater than interval to check for lockTimeout
+				Delay: time.Duration(300) * time.Millisecond,
+				// customized error to check renewal timeout config
+				Err: fmt.Errorf("renew timeout"),
+			},
+			renewTimeout:  to.Ptr(time.Duration(-1)),
+			completeDelay: 300 * time.Millisecond,
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				// first renewal attempt at 0ms and hangs, processor context times out at 210ms
+				// the second renewal attempt was not made and RenewCalled should be 1
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(1))) },
+					300*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				// message context was canceled by lockrenewer
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.DeadlineExceeded))
+				// processor context timed out
+				g.Expect(tc.processorCtx.Err()).To(Equal(context.DeadlineExceeded))
+			},
+		},
+		{
+			name: "should exit after first lock renewal failure due to context canceled",
+			settler: &fakeSBRenewLockSettler{
+				// set delay to be greater than interval to check for lockTimeout
+				Delay: time.Duration(100) * time.Millisecond,
+				// customized error to check renewal timeout config
+				Err: context.Canceled,
+			},
+			renewTimeout: to.Ptr(time.Duration(0)),
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				// first renewal attempt at 0ms and finish at 50ms with error Canceled
+				// lock renew handler cancels message context of downstream handler
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(1))) },
+					60*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
+				g.Expect(tc.processorCtx.Err()).To(BeNil())
+			},
+		},
+	}
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			interval := 50 * time.Millisecond
+			reg := processor.NewRegistry()
+			reg.Init(prometheus.NewRegistry())
+			informer := processor.NewInformerFor(reg)
+			lr := shuttle.NewRenewLockHandler(
+				&shuttle.LockRenewalOptions{
+					Interval:                   &interval,
+					CancelMessageContextOnStop: tc.cancelCtxOnStop,
+					LockRenewalTimeout:         tc.renewTimeout,
+					MetricRecorder:             reg,
+				},
+				shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
+					message *azservicebus.ReceivedMessage) {
+					tc.gotMessageCtx = ctx
+					completeDelay := 180 * time.Millisecond
+					if tc.completeDelay > 0 {
+						completeDelay = tc.completeDelay
+					}
+					select {
+					case <-time.After(completeDelay):
+						break
+					case <-ctx.Done():
+						break
+					}
+				}))
+			msg := &azservicebus.ReceivedMessage{
+				LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
+			}
+			ctx, cancel := context.WithTimeout(context.Background(), 210*time.Millisecond)
+			tc.processorCtx = ctx
+			if tc.isRenewerCanceled {
+				cancel()
+			}
+			defer cancel()
+			lr.Handle(ctx, tc.settler, msg)
+			tc.verify(NewWithT(t), &tc, informer)
+		})
+	}
+}
+
+func Test_RenewRetry(t *testing.T) {
+	type testCase struct {
+		name              string
+		settler           *fakeSBRenewLockSettler
+		isRenewerCanceled bool
+		renewTimeout      *time.Duration
+		cancelCtxOnStop   *bool
+		msgLockedDuration time.Duration
+		completeDelay     time.Duration
+		processorCtx      context.Context
+		gotMessageCtx     context.Context
+		verify            func(g Gomega, tc *testCase, metrics *processor.Informer)
+	}
+	testCases := []testCase{
+		{
+			name: "should not attempt to renew if lock is already expired",
+			settler: &fakeSBRenewLockSettler{
+				// set delay to be greater than interval to check for lockTimeout
+				Delay: time.Duration(100) * time.Millisecond,
+				Err:   context.DeadlineExceeded,
+			},
+			msgLockedDuration: 0,
+			renewTimeout:      to.Ptr(60 * time.Millisecond),
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(0))) },
+					180*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
+				// processor context healthy because we finished early
+				g.Expect(tc.processorCtx.Err()).To(BeNil())
+			},
+		},
+		{
+			name: "should continue retry if error is context deadline exceeded",
+			settler: &fakeSBRenewLockSettler{
+				// set delay to be greater than interval to check for lockTimeout
+				Delay: time.Duration(100) * time.Millisecond,
+				Err:   context.DeadlineExceeded,
+			},
+			msgLockedDuration: 1 * time.Minute,
+			renewTimeout:      to.Ptr(60 * time.Millisecond),
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				// renewal times out every 60ms with context.DeadlineExceeded error
+				// retry should continue until the downstream handler completes at 180ms
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(3))) },
+					180*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
+				// processor context healthy because we finished early
+				g.Expect(tc.processorCtx.Err()).To(BeNil())
+			},
+		},
+		{
+			name: "should continue retry until upstream context deadline exceeded",
+			settler: &fakeSBRenewLockSettler{
+				// set delay to be greater than interval to check for lockTimeout
+				Delay: time.Duration(100) * time.Millisecond,
+				Err:   context.DeadlineExceeded,
+			},
+			msgLockedDuration: 1 * time.Minute,
+			renewTimeout:      to.Ptr(50 * time.Millisecond),
+			completeDelay:     300 * time.Millisecond,
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				// renewal times out every 50ms with context.DeadlineExceeded error
+				// retry should continue until upstream context exceeds deadline at 210ms
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(4))) },
+					180*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.DeadlineExceeded))
+				// processor context exceeded its deadline
+				g.Expect(tc.processorCtx.Err()).To(Equal(context.DeadlineExceeded))
+			},
+		},
+		{
+			name: "should continue retry until lock expires",
+			settler: &fakeSBRenewLockSettler{
+				// set delay to be greater than interval to check for lockTimeout
+				Delay: time.Duration(100) * time.Millisecond,
+				Err:   context.DeadlineExceeded,
+			},
+			msgLockedDuration: 130 * time.Millisecond,
+			renewTimeout:      to.Ptr(50 * time.Millisecond),
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				// renewal times out every 50ms with context.DeadlineExceeded error
+				// retry should continue until lock expires at 110ms
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(2))) },
+					180*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
+				// processor context healthy because we finished early
+				g.Expect(tc.processorCtx.Err()).To(BeNil())
+			},
+		},
+		{
+			name:              "should not retry if renew is successful",
+			settler:           &fakeSBRenewLockSettler{},
+			msgLockedDuration: 1 * time.Minute,
+			renewTimeout:      to.Ptr(100 * time.Millisecond),
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				// should renew 3 times before message completes at 180ms
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(3))) },
+					180*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
+				// processor context healthy because we finished early
+				g.Expect(tc.processorCtx.Err()).To(BeNil())
+			},
+		},
+		{
+			name: "should not retry if renew fails for unknown error",
+			settler: &fakeSBRenewLockSettler{
+				Err: fmt.Errorf("unknown error"),
+			},
+			msgLockedDuration: 1 * time.Minute,
+			renewTimeout:      to.Ptr(100 * time.Millisecond),
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				// should renew 3 times before message completes at 180ms
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(3))) },
+					180*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
+				// processor context healthy because we finished early
+				g.Expect(tc.processorCtx.Err()).To(BeNil())
+			},
+		},
+		{
+			name: "should not retry if renew fails for LockLost error",
+			settler: &fakeSBRenewLockSettler{
+				Err: &azservicebus.Error{Code: azservicebus.CodeLockLost},
+			},
+			msgLockedDuration: 1 * time.Minute,
+			renewTimeout:      to.Ptr(100 * time.Millisecond),
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				// should immediately exit lock renewal because the error is permanent
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(1))) },
+					180*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
+				// processor context healthy because we finished early
+				g.Expect(tc.processorCtx.Err()).To(BeNil())
+			},
+		},
+		{
+			name: "should not retry if renew fails due to context canceled",
+			settler: &fakeSBRenewLockSettler{
+				Err: context.Canceled,
+			},
+			msgLockedDuration: 1 * time.Minute,
+			renewTimeout:      to.Ptr(100 * time.Millisecond),
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				// should exit the renewal after first attempt because context is canceled
+				g.Eventually(
+					func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(1))) },
+					180*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+				g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
+				// processor context healthy because we finished early
+				g.Expect(tc.processorCtx.Err()).To(BeNil())
+			},
+		},
+	}
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			interval := 50 * time.Millisecond
+			reg := processor.NewRegistry()
+			reg.Init(prometheus.NewRegistry())
+			informer := processor.NewInformerFor(reg)
+			lr := shuttle.NewRenewLockHandler(
+				&shuttle.LockRenewalOptions{
+					Interval:                   &interval,
+					CancelMessageContextOnStop: tc.cancelCtxOnStop,
+					LockRenewalTimeout:         tc.renewTimeout,
+					MetricRecorder:             reg,
+				},
+				shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
+					message *azservicebus.ReceivedMessage) {
+					tc.gotMessageCtx = ctx
+					completeDelay := 180 * time.Millisecond
+					if tc.completeDelay > 0 {
+						completeDelay = tc.completeDelay
+					}
+					select {
+					case <-time.After(completeDelay):
+						break
+					case <-ctx.Done():
+						break
+					}
+				}))
+			msg := &azservicebus.ReceivedMessage{
+				LockedUntil: to.Ptr(time.Now().Add(tc.msgLockedDuration)),
+			}
+			ctx, cancel := context.WithTimeout(context.Background(), 210*time.Millisecond)
+			tc.processorCtx = ctx
+			if tc.isRenewerCanceled {
+				cancel()
+			}
+			defer cancel()
+			lr.Handle(ctx, tc.settler, msg)
+			tc.verify(NewWithT(t), &tc, informer)
+		})
+	}
+}
diff --git a/v2/processor_test.go b/v2/processor_test.go
index 9744d3d..a0a433d 100644
--- a/v2/processor_test.go
+++ b/v2/processor_test.go
@@ -581,14 +581,18 @@ func TestProcessorStart_MultiProcessorWithNewRenewLockHandler(t *testing.T) {
 
 func messagesChannel(messageCount int) chan *azservicebus.ReceivedMessage {
 	messages := make(chan *azservicebus.ReceivedMessage, messageCount)
 	for i := 0; i < messageCount; i++ {
-		messages <- &azservicebus.ReceivedMessage{}
+		messages <- &azservicebus.ReceivedMessage{
+			LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
+		}
 	}
 	return messages
 }
 
 func enqueueCount(q chan *azservicebus.ReceivedMessage, messageCount int) {
 	for i := 0; i < messageCount; i++ {
-		q <- &azservicebus.ReceivedMessage{}
+		q <- &azservicebus.ReceivedMessage{
+			LockedUntil: to.Ptr(time.Now().Add(1 * time.Minute)),
+		}
 	}
 }
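Usage sketch (not part of the patch above): a minimal example of how a caller could opt into the new LockRenewalTimeout option exposed by this change, assuming the v2 module path github.com/Azure/go-shuttle/v2 and the azservicebus SDK already used by the tests; the business handler and the chosen durations are illustrative only.

    package example

    import (
        "context"
        "time"

        "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
        shuttle "github.com/Azure/go-shuttle/v2"
    )

    // newLockRenewingHandler wraps a business handler with periodic lock renewal.
    // Each RenewMessageLock attempt gets its own 10s timeout; leaving the option
    // unset (or 0) keeps the 5s default, and a negative value falls back to the
    // remaining lock duration.
    func newLockRenewingHandler() shuttle.HandlerFunc {
        interval := 30 * time.Second       // how often the lock is renewed
        renewalTimeout := 10 * time.Second // per-attempt timeout for RenewMessageLock

        return shuttle.NewRenewLockHandler(
            &shuttle.LockRenewalOptions{
                Interval:           &interval,
                LockRenewalTimeout: &renewalTimeout,
            },
            shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler, msg *azservicebus.ReceivedMessage) {
                // process msg, then settle it; renewal stops once this handler returns.
                _ = settler.CompleteMessage(ctx, msg, nil)
            }))
    }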