diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index e5d09a163..317a15e8e 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -49,6 +49,8 @@ import ( const ( retryPollOperationInitialInterval = 200 * time.Millisecond retryPollOperationMaxInterval = 10 * time.Second + // How long the same poll task error can remain suppressed + lastPollTaskErrSuppressTime = 1 * time.Minute ) var ( @@ -170,6 +172,10 @@ type ( pollerRequestCh chan struct{} taskQueueCh chan interface{} sessionTokenBucket *sessionTokenBucket + + lastPollTaskErrMessage string + lastPollTaskErrStarted time.Time + lastPollTaskErrLock sync.Mutex } polledTask struct { @@ -304,9 +310,7 @@ func (bw *baseWorker) pollTask() { bw.retrier.Throttle(bw.stopCh) if bw.pollLimiter == nil || bw.pollLimiter.Wait(bw.limiterContext) == nil { task, err = bw.options.taskWorker.PollTask() - if err != nil && enableVerboseLogging { - bw.logger.Debug("Failed to poll for task.", tagError, err) - } + bw.logPollTaskError(err) if err != nil { if isNonRetriableError(err) { bw.logger.Error("Worker received non-retriable error. Shutting down.", tagError, err) @@ -333,6 +337,24 @@ func (bw *baseWorker) pollTask() { } } +func (bw *baseWorker) logPollTaskError(err error) { + bw.lastPollTaskErrLock.Lock() + defer bw.lastPollTaskErrLock.Unlock() + // No error means reset the message and time + if err == nil { + bw.lastPollTaskErrMessage = "" + bw.lastPollTaskErrStarted = time.Now() + return + } + // Log the error as warn if it doesn't match the last error seen or its over + // the time since + if err.Error() != bw.lastPollTaskErrMessage || time.Since(bw.lastPollTaskErrStarted) > lastPollTaskErrSuppressTime { + bw.logger.Warn("Failed to poll for task.", tagError, err) + bw.lastPollTaskErrMessage = err.Error() + bw.lastPollTaskErrStarted = time.Now() + } +} + func isNonRetriableError(err error) bool { if err == nil { return false