Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

addRetryableExceptions has no effect when called on the DefaultErrorHandler #3621

Open
devjeff opened this issue Nov 13, 2024 · 7 comments
Open
Milestone

Comments

@devjeff
Copy link

devjeff commented Nov 13, 2024

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.2.4

Describe the bug

I tried to use the DefaultErrorHandler with a batch message listener. Here I wanted to specify some retryable exceptions (e. g. JDBCConnectionException) that cause a retry (as long as the backoff limit is not exceeded). What I noticed, is that calls to the metods "addRetryableExceptions " and "addNotRetryableExceptions" apparently have no effect.

The DefaultErrorHandler delegates error handling to a FallbackBatchErrorHandler (if the thrown exception is not a BatchListenerFailedException), but this handler never accesses the internal classifier instance of the DefaultErrorHandler, but instead creates a new one with the default exception list, s. also default constructor of the ExceptionClassifier.

To Reproduce

  1. Create a Kafka message listener with "@KafkaListener" and "@KafkaHandler"
  2. Create a DefaultErrorHandler instance and call the method "addRetryableExceptions" to configure a retryable exception
  3. Configure a ConcurrentKafkaListenerContainerFactory with by calling cf.setCommonErrorHandler(errorHandler)
  4. Also call cf.setBatchListener(true);, although I think that this is not necessary to reproduce the problem
  5. Throw the retryable exception during message processing in the message listener
  6. Send some messages to the Kafka topic. When the error occurs, a retry should be performed, but it isn't

Expected behavior

Maybe, this is the desired behaviour. If the message recovery fails, then the batch will be consumed again by the next poll anyway, which is effectively the same as a retry (and can result in an endless loop until the error is fixed). Still I think, that the DefaultErrorHandler should either not extend ExceptionClassifier or at least contain a description in the Javadoc explaining that the "addRetryableExceptions " and "addNotRetryableExceptions" have no effect on this class.

@artembilan
Copy link
Member

So, logic there in the DefaultErrorHandler like this:

	public DefaultErrorHandler(@Nullable ConsumerRecordRecoverer recoverer, BackOff backOff,
			@Nullable BackOffHandler backOffHandler) {

		super(recoverer, backOff, backOffHandler, createFallback(backOff, recoverer));
	}

	private static CommonErrorHandler createFallback(BackOff backOff, @Nullable ConsumerRecordRecoverer recoverer) {
		return new FallbackBatchErrorHandler(backOff, recoverer);
	}

So, yeah, whatever we set into that DefaultErrorHandler has no effect.

We probably need to propagate those addRetryableExceptions and addNotRetryableExceptions into that fallbackBatchHandler as we do for many other props:

	public void setRetryListeners(RetryListener... listeners) {
		super.setRetryListeners(listeners);
		if (this.fallbackBatchHandler instanceof FallbackBatchErrorHandler handler) {
			handler.setRetryListeners(listeners);
		}
	}

	public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {
		super.setClassifications(classifications, defaultValue);
		if (this.fallbackBatchHandler instanceof ExceptionClassifier handler) {
			handler.setClassifications(classifications, defaultValue);
		}
	}

BTW, you can use that setClassifications() for now.

Feel free to contribute the fix: https://github.com/spring-projects/spring-kafka/blob/main/CONTRIBUTING.adoc !

@chickenchickenlove
Copy link
Contributor

Hi, @devjeff !
Do you have a plan to work on this issue?
If not, I was wondering if it would be okay for me to take it on!

@devjeff
Copy link
Author

devjeff commented Nov 17, 2024

Hi, @devjeff ! Do you have a plan to work on this issue? If not, I was wondering if it would be okay for me to take it on!

Hi. I had no time to work on it so far. Therefore, it would be nice if you can take it over 👍

@chickenchickenlove
Copy link
Contributor

@devjeff Thanks a lot! I'm going to dig into this 😃

@chickenchickenlove
Copy link
Contributor

chickenchickenlove commented Nov 17, 2024

Hi, @artembilan , @devjeff .

Hm... I think addNotRetryableExceptions() works well in batch listener as well.
I set commonErrorHandler described below.

errorHandler.setRetryListeners(retryListener);
errorHandler.addNotRetryableExceptions(MyCustomException.class);
factory.setCommonErrorHandler(errorHandler);

After batch listener throws MyCustomException,
I check the state of FallbackBatchErrorHandler.
The classifier instance which FallbackBatchErrorHandler has, has MyCustomException which I configured in its classified field describe below.

image

Then, the ErrorHandlingUtils decide to not retry it.
ErrorHandlingUtils delegate to classifier whether failed record should be retried.
classifier decide to not retry MyCustomException described below.

image

In conclusion, IMHO, it does works well.

For your information

ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
this.seeker, this.recoverer, this.logger, getLogLevel(), this.retryListeners, getClassifier(),
this.reclassifyOnExceptionChange);

FallbackBatchErrorHandler gets classifier by using getClassifier().

protected BinaryExceptionClassifier getClassifier() {
return this.classifier;
}

ExceptionClassifier returns this.classifier.
this.classifier is from DefaultErrorHandler which I configured before.

Exception recoveryException = thrownException;
Exception lastException = unwrapIfNeeded(thrownException);
Boolean retryable = classifier.classify(lastException);
while (Boolean.TRUE.equals(retryable) && nextBackOff != BackOffExecution.STOP) {

Then, ErrorHandlingUtils uses it to determine whether to retry or not retry.

@devjeff
Copy link
Author

devjeff commented Nov 19, 2024

Yes, I've seen this code, but the exceptions were not added to the classifier of the FallbackBatchErrorHandler. The classifier from the DefaultErrorHandler is not forwarded to the FallbackBatchErrorHandler in any way. Maybe you encountered some default behaviour. Can you try to add the exception to retryable exceptions and check if this works?

errorHandler.addRetryableExceptions(MyCustomException.class);

@chickenchickenlove
Copy link
Contributor

chickenchickenlove commented Nov 19, 2024

@devjeff
Yes, same result.
image
image
This images demonstrate that addNotRetryableExceptions() works well in case of batch.
Please see,

  1. retryable : false
  2. lastException : MyCustomException
  3. classifier -> classified -> MyCustomException -> False

If I configure addRetryableExceptions(MyCustomException.class),
image

  1. retryable : true
  2. lastException : MyCustomException
  3. classifier -> classified -> No MyCustomException

In conclusion,
If I set addNotRetryableExceptions(MyCustomException), retryable is false.
If I set addRetryableExceptions(MyCustomException), retryable is true.

If you are still in trouble, how about checking whether the ConcurrentKafkaListenerContainerFactory you created has been properly registered as a Spring bean?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants