-
Notifications
You must be signed in to change notification settings - Fork 631
Description
Describe the issue
I'm using SCDF 2.11
with Kafka
and I'm trying to implement processors in which we don't want to lose any message.
So we want that our processors use transactional producer and consumer which acks messages only if they are processed OK or, when an error occurs, if a message has been put in DLQ.
But when I have an error in the processor, the acks of the consumed offset failed after creating the message in the DLQ so the offset is consumed again and again and I end up with many identical message in th DLQ.
To Reproduce
Here is my configuration :
# Producer config
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=my-tx-
spring.cloud.stream.kafka.binder.transaction.producer.configuration.acks=all
spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=5
spring.cloud.stream.kafka.binder.transaction.producer.configuration.max.block.ms=5000
spring.cloud.stream.kafka.binder.transaction.producer.configuration.delivery.timeout.ms=4500
spring.cloud.stream.kafka.binder.transaction.producer.configuration.request.timeout.ms=2000
spring.cloud.stream.kafka.binder.transaction.producer.configuration.linger.ms=0
spring.cloud.stream.kafka.binder.transaction.producer.configuration.batch.size=0
# Consumer config
spring.cloud.stream.kafka.binder.consumer.isolation.level=read_committed
spring.cloud.stream.kafka.bindings.input.consumer.ackMode=MANUAL_IMMEDIATE
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
spring.cloud.stream.bindings.input.group=mygroup
Here is an extract of the stacktrace :
o.s.k.t.KafkaTransactionManager : Participating in existing transaction
o.s.c.s.b.k.KafkaMessageChannelBinder : Sent to DLQ a message with key='mykey' and payload='byte[1281]' received from 0: mytopic@152
o.s.t.support.TransactionTemplate : Initiating transaction rollback on application exception
java.lang.NullPointerException: Cannot invoke "org.springframework.kafka.support.Acknowledgment.acknowledge()" because the return value of "org.springframework.messaging.MessageHeaders.get(Object, java.lang.Class)" is null
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$DlqSender.sendToDlq(KafkaMessageChannelBinder.java:1639) ~[spring-cloud-stream-binder-kafka-4.1.5.jar:4.1.5]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$handleRecordForDlq$10(KafkaMessageChannelBinder.java:1273) ~[spring-cloud-stream-binder-kafka-4.1.5.jar:4.1.5]
at org.springframework.transaction.support.TransactionOperations.lambda$executeWithoutResult$0(TransactionOperations.java:68) ~[spring-tx-6.1.16.jar:6.1.16]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-6.1.16.jar:6.1.16]
at org.springframework.transaction.support.TransactionOperations.executeWithoutResult(TransactionOperations.java:67) ~[spring-tx-6.1.16.jar:6.1.16]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.handleRecordForDlq(KafkaMessageChannelBinder.java:1272) ~[spring-cloud-stream-binder-kafka-4.1.5.jar:4.1.5]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$getErrorMessageHandler$6(KafkaMessageChannelBinder.java:1136) ~[spring-cloud-stream-binder-kafka-4.1.5.jar:
Version of the framework
- Spring Boot 3.3.7
- Spring cloud stream binder Kafka 4.1.5
- Spring Kafka 3.2.6
Additional context
It seems that here in KafkaMessageChannelBinder, the ErrorMessage is created without the originalMessage :
Line 732 in 3a937de
.send(new ErrorMessage( |
So headers of the original message cannot be retrieved here :
Line 1265 in 3a937de
if (message instanceof ErrorMessage errorMessage) { |
And then a NPE is raised here :
Line 1630 in 3a937de
messageHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge(); |