Skip to content

Commit

Permalink
Add option to timeout during LockRenewal calls (#242)
Browse files Browse the repository at this point in the history
* add option to timeout during LockRenewal calls

* UT and fixes

* retry when we know we can

* fix

* fix existing ut

* added UT

* one more UT

* comments

* fix ut

* fix ut

* fix

* add check for LockUntil field

* add locklost failure case

* fix comments

* move renewal call inside renewMessageLock()
  • Loading branch information
karenychen authored Aug 5, 2024
1 parent f3bc6b3 commit 231d6c3
Show file tree
Hide file tree
Showing 3 changed files with 437 additions and 12 deletions.
42 changes: 41 additions & 1 deletion v2/lockrenewer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type LockRenewer interface {
type LockRenewalOptions struct {
// Interval defines the frequency at which we renew the lock on the message. Defaults to 10 seconds.
Interval *time.Duration
// LockRenewalTimeout is the timeout value used on the context when sending RenewMessageLock() request.
// Defaults to 5 seconds if not set or 0. Defaults to Lock Expiry time if set to a negative value.
LockRenewalTimeout *time.Duration
// CancelMessageContextOnStop will cancel the downstream message context when the renewal handler is stopped.
// Defaults to true.
CancelMessageContextOnStop *bool
Expand All @@ -34,12 +37,16 @@ type LockRenewalOptions struct {
// NewRenewLockHandler returns a middleware handler that will renew the lock on the message at the specified interval.
func NewRenewLockHandler(options *LockRenewalOptions, handler Handler) HandlerFunc {
interval := 10 * time.Second
lockRenewalTimeout := 5 * time.Second
cancelMessageContextOnStop := true
metricRecorder := processor.Metric
if options != nil {
if options.Interval != nil {
interval = *options.Interval
}
if options.LockRenewalTimeout != nil && *options.LockRenewalTimeout != 0 {
lockRenewalTimeout = *options.LockRenewalTimeout
}
if options.CancelMessageContextOnStop != nil {
cancelMessageContextOnStop = *options.CancelMessageContextOnStop
}
Expand All @@ -52,6 +59,7 @@ func NewRenewLockHandler(options *LockRenewalOptions, handler Handler) HandlerFu
next: handler,
lockRenewer: settler,
renewalInterval: &interval,
renewalTimeout: &lockRenewalTimeout,
metrics: metricRecorder,
cancelMessageCtxOnStop: cancelMessageContextOnStop,
stopped: make(chan struct{}, 1), // buffered channel to ensure we are not blocking
Expand All @@ -77,6 +85,7 @@ type peekLockRenewer struct {
next Handler
lockRenewer LockRenewer
renewalInterval *time.Duration
renewalTimeout *time.Duration
metrics processor.Recorder
alive atomic.Bool
cancelMessageCtxOnStop bool
Expand Down Expand Up @@ -127,7 +136,7 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a
}
logger.Info("renewing lock")
count++
err := plr.lockRenewer.RenewMessageLock(ctx, message, nil)
err := plr.renewMessageLock(ctx, message, nil)
if err != nil {
logger.Error(fmt.Sprintf("failed to renew lock: %s", err))
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
Expand Down Expand Up @@ -170,3 +179,34 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a
}
}
}

func (plr *peekLockRenewer) renewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error {
lockLostErr := &azservicebus.Error{Code: azservicebus.CodeLockLost}
if message.LockedUntil == nil || time.Until(*message.LockedUntil) < 0 {
// if the lock doesn't exist or is already expired, we should not attempt to renew it.
return lockLostErr
}
renewalTimeout := time.Until(*message.LockedUntil)
if *plr.renewalTimeout > 0 {
renewalTimeout = *plr.renewalTimeout
}
// we should keep retrying until lock expiry or until message context is done
for time.Until(*message.LockedUntil) > 0 && ctx.Err() == nil {
var renewErr error
func() {
getLogger(ctx).Info(fmt.Sprintf("renewing lock with timeout: %s", renewalTimeout))
renewalCtx, cancel := context.WithTimeout(ctx, renewalTimeout)
defer cancel()
renewErr = plr.lockRenewer.RenewMessageLock(renewalCtx, message, options)
}()
// exit the renewal if we get any error other than context deadline exceeded or if the error is nil.
if !errors.Is(renewErr, context.DeadlineExceeded) {
return renewErr
}
}
// lock is expired or message context is done
if ctx.Err() != nil {
return ctx.Err()
}
return lockLostErr
}
Loading

0 comments on commit 231d6c3

Please sign in to comment.