Combining DefaultAfterRollbackProcessor and CommonErrorHandler #3427
Unanswered
tommyk-gears
asked this question in
Q&A
Replies: 0 comments
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
I have a use case with a transactional record listener where want to stop the listener if a deserialization error occurs, and other errors I want to handle (and sometimes recover) in an
AfterRollbackProcessor
.For this use case I have
CommonDelegatingErrorHandler
that;RecordDeserializationException
errors to aCommonContainerStoppingErrorHandler
CommonErrorHandler
that always throws an exception (this is needed to have the transaction rolled back as far as I understand). (As an alternative I have also tried with aDefaultErrorHandler
with a recoverer that always throws an exception, but with same results).Then I have a
DefaultAfterRollbackProcessor
that actually performs recovery in some way depending on the error etc (e.g. send records to a DLT, or just swallow errors - effectively skipping the message, etc).The issue I face is that after a record has been recovered in the
DefaultAfterRollbackProcessor
, it is still redelivered on next poll iteration. If I remove theCommonErrorHandler
the message is skipped after successful recovery as expected.After some debugging it seems like
DefaultAfterRollbackProcessor
may need to seek forward after this line, since there may have been previous seeks to an earlier offset (e.g. fromDefaultErrorHandler
after recovery failed there).I tried replacing that line of code with this (the last line with
consumer.seek
is essentially what I added);And with this change my error handling setup works as expected as far as I can tell. But I am unsure how
kafkaTemplate.sendOffsetsToTransaction
relates toconsumer.seek
. What if weseek
to one offset and then send another offset insendOffsetsToTransaction
- what is the expected result?I guess my real question is - am I just doing this totally wrong? Is there a better way to handle both deserialization errors and other errors in a transactional record listener? Or is my conclusion regarding the extra seek in
DefaultAfterRollbackProcessor#process
valid?I added a commit with a few tests where the behaviour can be observed (the commit also contains the change to
DefaultAfterRollbackProcessor
in order to make the tests pass).Beta Was this translation helpful? Give feedback.
All reactions