KAFKA-19925: Fix transaction timeout handling during broker upgrades #21161
Conversation
```diff
 self.perform_upgrade(from_kafka_version)

-copier_timeout_sec = 180
+copier_timeout_sec = 360
```
Note: due to timeouts and re-creation of the producer, this copier_timeout needed to be increased. I experimented a bit and found that 360s was a consistently reliable value.
As I described in https://issues.apache.org/jira/browse/KAFKA-20000, the performance regression is caused by the backoff logic. Therefore, I suggest fixing the underlying issue instead of increasing the timeout.
Kindly asking: is this something to consider? If so, I would add some tests for this adjustment.
Thanks.
@Pankraz76 thanks for the effort. As Justine suggested, hardcoding the timeout is a bit coarse-grained. Please refer to KAFKA-20000 for more discussion.
@chia7712 can you take a look when you get a chance please?
@FrancisGodinho thanks for your patch. I have identified some underlying issues in e2e and TV2. Addressing them should allow us to achieve more stable transaction behavior. Please check https://issues.apache.org/jira/browse/KAFKA-19999 and https://issues.apache.org/jira/browse/KAFKA-20000 for more details.
Pankraz76 left a comment:
+1
@Pankraz76 thanks for the comments, can you re-review please?
Pankraz76 left a comment:
The issue is very well documented, thanks for the effort.
```diff
 self.perform_upgrade(from_kafka_version)

-copier_timeout_sec = 180
+copier_timeout_sec = 360
```
Suggested change:
```suggestion
copier_timeout_sec = 360
```
Sorry, again this is something for SCA, taking the off-topic changes away upfront. Spotless and Rewrite are both ready to fix this on their own.
```java
// in case the producer gets stuck here, create a new one and continue the loop
try { producer.close(Duration.ofSeconds(0)); } catch (Exception ignore) {}
parsedArgs.getAttrs().put("transactionalId", parsedArgs.getString("transactionalId") + producerNumber++);
producer = createProducer(parsedArgs);
producer.initTransactions();
resetToLastCommittedPositions(consumer);
```
Suggested change:
```suggestion
circuitBreaker(); // in case the producer gets stuck here, create a new one and continue the loop
```
This could be given a dedicated method, applying the single responsibility principle and giving each concern more focus. Here it is just about breaking the circuit; how that is actually done seems to be a (randomly) changing implementation detail.
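For illustration, a minimal sketch of such an extraction, assuming the existing createProducer and resetToLastCommittedPositions helpers in TransactionalMessageCopier; the method name and the return-value plumbing are hypothetical:

```java
import java.time.Duration;

import net.sourceforge.argparse4j.inf.Namespace;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

// Hypothetical extraction: the main loop only states the intent ("break the
// circuit"), while the details of how recovery happens live in one place.
private static KafkaProducer<String, String> circuitBreaker(
        KafkaProducer<String, String> stuckProducer,
        KafkaConsumer<String, String> consumer,
        Namespace parsedArgs,
        int producerNumber) {
    // Close the stuck producer without waiting on in-flight requests.
    try {
        stuckProducer.close(Duration.ofSeconds(0));
    } catch (Exception ignore) {
        // Best effort: the old producer may already be unusable.
    }
    // Use a fresh transactional id so the new producer is not routed back
    // into the stuck transaction state.
    parsedArgs.getAttrs().put("transactionalId",
            parsedArgs.getString("transactionalId") + producerNumber);
    KafkaProducer<String, String> producer = createProducer(parsedArgs);
    producer.initTransactions();
    // Rewind the consumer so records are neither skipped nor double-copied.
    resetToLastCommittedPositions(consumer);
    return producer;
}
```

The catch block in the main loop would then reduce to something like producer = circuitBreaker(producer, consumer, parsedArgs, producerNumber++);.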
Thanks for your work 🙇‍♂️
I left some minor comments in this PR and in Jira as well.
However, if you decide to implement this on the server side instead of the client side, please feel free to ignore my review comments. Also, it seems that the cause of the TimeoutException via TV2 is resolved by this PR (https://github.com/apache/kafka/pulls?q=is%3Apr+is%3Aclosed+KAFKA-19999). If so, please feel free to ignore my review.
```diff
-producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
+try {
+    producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
+} catch (KafkaException e) {
```
IMHO, shouldn't we focus on handling the TimeoutException?
AFAIK, CONCURRENT_TRANSACTIONS eventually manifests as a TimeoutException on the client side. I'm concerned that broad-scope retries might mask other underlying errors. In those cases, we need to clearly identify the root cause to take appropriate action.
What do you think?
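For illustration, a sketch of what that narrower handling could look like, assuming the close-and-recreate block from the diff above is factored into a hypothetical recoverProducer() helper; only the client-side TimeoutException triggers recovery, and every other KafkaException still propagates:

```java
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.TimeoutException;

try {
    producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
} catch (TimeoutException e) {
    // Known stuck case: CONCURRENT_TRANSACTIONS eventually surfaces as a
    // client-side timeout, so only this case triggers recovery.
    producer = recoverProducer(); // hypothetical close-and-recreate helper
} catch (KafkaException e) {
    // Anything else is unexpected here; rethrow so the root cause stays
    // visible instead of being masked by a broad retry.
    throw e;
}
```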
```java
} catch (KafkaException e) {
    // in case the producer gets stuck here, create a new one and continue the loop
    try { producer.close(Duration.ofSeconds(0)); } catch (Exception ignore) {}
    parsedArgs.getAttrs().put("transactionalId", parsedArgs.getString("transactionalId") + producerNumber++);
```
Is there a safer way to generate a globally unique transactionalId to avoid collisions?
Simply appending an incremental number (producerNumber++) to the user-provided ID seems risky in a shared cluster environment. If the generated ID happens to match an existing transactionalId of a running production application, it could trigger the fencing mechanism, unintentionally aborting the active transactions of that application.
Perhaps appending a UUID or a random suffix would be a safer approach to ensure uniqueness?
What do you think?
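For illustration, a minimal sketch of the random-suffix idea (the helper name is hypothetical):

```java
import java.util.UUID;

// Derive a globally unique transactional id from the user-provided base id,
// so a recreated copier cannot collide with, and fence, another application
// that happens to use the same base id plus a small integer suffix.
static String uniqueTransactionalId(String baseId) {
    return baseId + "-" + UUID.randomUUID();
}
```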
Agreed, the root cause is the livelock, but fixing the retry is still valuable. +1 to trying the server-side approach. Let's keep this PR and handle the server-side fix in a separate PR.
@FrancisGodinho, thanks for the update! Following @chia7712's suggestion to try the server-side approach (retrying internally on the broker side), I wanted to check on ownership for the follow-up work. Are you planning to implement the server-side fix as a separate PR? If so, I'm happy to help with reviews/testing. If you'd prefer to focus on the current client-side PR, I can pick up the follow-up server-side PR (or co-author it) to help move KAFKA-20000 forward. Totally up to you! CC @chia7712
@chickenchickenlove @chia7712 yeah, I think retrying on the server side would be better as well, since it means less logic for the client. I'd like to take a stab at it since I will have some time this week, but I'll let you know if I need help/guidance (thanks for offering!). I'll implement the server-side changes as a separate PR (since it's also a separate ticket) and ping you once I have some progress!
Problem
During broker upgrades, the sendOffsetsToTransaction call would sometimes hang. Logs showed that it continuously returned errorCode=51, which is CONCURRENT_TRANSACTIONS. The test would eventually hit its timeout and fail. This happened for every single version upgrade and occurred in around 30% of the runs.

Resolution
The problem above left the producer in a broken state, and even after 5-10 minutes of waiting it didn't resolve itself (even when we waited a few minutes past the transaction.max.timeout.ms time). I tried multiple solutions, including waiting extended periods of time and retrying sendOffsetsToTransaction multiple times whenever the timeout occurred. Unfortunately, the producer was just permanently stuck, always receiving errorCode=51. In this case, the recommended resolution in the Kafka docs is to close the previous producer and create a new one: https://kafka.apache.org/documentation/#usingtransactions

Reusing the old transactional.id would continue to lead to a stuck state, so this fix creates a brand new producer with a new ID and then rewinds the consumer offsets to ensure exactly-once semantics.
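For reference, a minimal self-contained sketch of that close-and-recreate pattern; the bootstrap servers, serializers, and id prefix are placeholders, not the exact code in this PR:

```java
import java.time.Duration;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// Close the stuck producer immediately, then start over with a fresh
// transactional id so the coordinator does not route the new producer
// back into the same stuck transaction.
static KafkaProducer<String, String> recreateProducer(
        KafkaProducer<String, String> stuck, String bootstrapServers) {
    try {
        stuck.close(Duration.ZERO);
    } catch (Exception ignored) {
        // Best effort: the old producer is already broken.
    }
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // A new id: reusing the old transactional.id kept leading back into the stuck state.
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "copier-" + UUID.randomUUID());
    KafkaProducer<String, String> fresh = new KafkaProducer<>(props);
    fresh.initTransactions();
    return fresh;
}
```

After recreating the producer, the copier rewinds the consumer to the last committed positions so that records are neither skipped nor duplicated.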
Testing and Validation
Previously, I was able to run the test for a single version upgrade and have it fail within the first 5-10 runs. After the fix, I was able to run it 40 times continuously with 0 failures. I also ran the full test (all versions) ~5 times with 9/9 cases passing.