KAFKA-19588: Reduce waiting for event completion in AsyncKafkaConsumer.poll() #20363
base: trunk
…poll() We create—and wait on—PollEvent in Consumer.poll() to ensure we wait for reconciliation and/or auto-commit. However, reconciliation is relatively rare, and auto-commit only happens every N seconds, so the remainder of the time, we should try to avoid sending poll events.
@lianetm Could you add the |
Thanks @kirktrue, took a first look, one high-level concern for now.
// the interval time or reconciling new assignments
applicationEventHandler.add(event);

if (reconciliationInProgress.get() || autoCommitState.shouldAutoCommit()) {
Couldn't we end up with a race condition here if the app thread sees autoCommitState.shouldAutoCommit() false at this point (because the interval hasn't expired just yet), but by the time the background checks the same when processing the poll event the interval has expired?
In that case, I expect the background would trigger the auto-commit while the app thread moved on to updating positions for fetching (and that leads to a whole new set of race conditions that we already dealt with before). Basically, whatever change we introduce here to not wait on Poll needs to ensure that we retrieve the positions to commit before moving on to update fetch positions; that's the main challenge I expect with this change. Thinking, but not sure yet how to address that if we don't wait on Poll. Thoughts?
I agree with @lianetm that this is opening up the risk of race conditions. However, I think the principle here is a good one. The risk part here is related to the auto-commit timer. If auto-commit is not enabled, we absolutely know that we are not racing with the auto-commit timer. If it is enabled, we are potentially in a race. So, a slight twist on this can safely optimise when auto-commit is not enabled.
Agreed, and trying to push it a bit more, even with auto-commit enabled, maybe we can still optimise if we check the timer in the app thread, and then use an atomic var "timerExpired" that the background can reuse (instead of checking the timer again).
So we would optimise if auto-commit is disabled, or if it's enabled but the timer hasn't expired.
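To make the idea above concrete, here is a minimal sketch of how the app thread could evaluate the timer once and publish the result through an atomic flag that the background thread consumes. Everything in it (the class `AutoCommitGate`, its method names, and the use of a plain deadline instead of Kafka's `Timer`) is hypothetical and only illustrates the proposal; it is not code from this PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the "timerExpired" idea; not the classes used in the PR.
final class AutoCommitGate {
    private final boolean autoCommitEnabled;
    private final long intervalMs;
    // Written by the background thread on reset, read by the application thread.
    private volatile long deadlineMs;
    // Set by the application thread, consumed by the background thread.
    private final AtomicBoolean timerExpired = new AtomicBoolean(false);

    AutoCommitGate(boolean autoCommitEnabled, long intervalMs, long nowMs) {
        this.autoCommitEnabled = autoCommitEnabled;
        this.intervalMs = intervalMs;
        this.deadlineMs = nowMs + intervalMs;
    }

    // Application thread: decide once per poll() whether the poll event must be awaited.
    boolean shouldWaitOnPoll(boolean reconciliationInProgress, long nowMs) {
        if (autoCommitEnabled && nowMs >= deadlineMs) {
            timerExpired.set(true);
        }
        return reconciliationInProgress || timerExpired.get();
    }

    // Background thread: reuse the application thread's decision instead of
    // checking the timer again, then start the next interval.
    boolean consumeExpired(long nowMs) {
        if (timerExpired.compareAndSet(true, false)) {
            deadlineMs = nowMs + intervalMs;
            return true;
        }
        return false;
    }
}
```

With this shape, the wait is skipped when auto-commit is disabled or when it is enabled but the interval has not yet elapsed, and the background thread never reaches a different conclusion about expiry than the app thread did.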
> Couldn't we end up with a race condition here if the app thread sees autoCommitState.shouldAutoCommit() false at this point (because interval hasn't expired just yet), but by the time the background checks the same when processing the poll event the interval expired?
No, simply because nothing updates the timer's sense of the current time between when the application thread submits the PollEvent and when it's processed by the network thread.
The application thread determines the timestamp used by the auto-commit timer via PollEvent (and AssignmentChangeEvent). And it's only checked when CommitRequestManager.updateTimerAndMaybeCommit() is invoked by the ApplicationEventProcessor. So when we update the timer in the application thread via AutoCommitState.updateTimer() and then call AutoCommitState.shouldAutoCommit(), we should get the same result in either thread, at least until the auto-commit is actually kicked off.
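As an illustration of the argument above, the following is a small sketch of a timer whose notion of "now" only advances when a caller explicitly passes in a timestamp. The class `ExplicitClockTimer` and its methods are hypothetical stand-ins, not Kafka's `Timer` or the PR's `AutoCommitState`; the point is only that two threads asking "expired?" between updates must get the same answer.

```java
// Hypothetical sketch: a timer that never reads the wall clock on its own.
final class ExplicitClockTimer {
    private final long intervalMs;
    private long currentTimeMs; // guarded by "this"
    private long deadlineMs;    // guarded by "this"

    ExplicitClockTimer(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.currentTimeMs = nowMs;
        this.deadlineMs = nowMs + intervalMs;
    }

    // Called with the timestamp chosen by the application thread.
    // The timer's clock never moves backwards.
    synchronized void update(long nowMs) {
        currentTimeMs = Math.max(currentTimeMs, nowMs);
    }

    // Depends only on the last update(), so the result is identical in
    // either thread until update() or reset() is called again.
    synchronized boolean isExpired() {
        return currentTimeMs >= deadlineMs;
    }

    // Called when the auto-commit is kicked off, to start the next interval.
    synchronized void reset() {
        deadlineMs = currentTimeMs + intervalMs;
    }
}
```

Under this assumption, real time passing between submitting the event and processing it has no effect on the expiry check, because only `update()` moves the clock.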
Just a couple of trivial initial comments. I've skimmed over the PR and understand the overall flow now. I'll do a more in-depth review shortly.
 * Reset the auto-commit timer to the provided time (backoff), so that the next auto-commit is
 * sent out then. If auto-commit is disabled this will perform no action.
 */
void resetTimer(long retryBackoffMs);
nit: Why not final long retryBackoffMs also?
Done.
this.log = logContext.logger(AutoCommitState.class);
this.timer = time.timer(autoCommitInterval);
this.autoCommitInterval = autoCommitInterval;
this.hasInflightCommit = new AtomicBoolean();
Doesn't the presence of synchronized on all of these methods make the use of AtomicBoolean redundant?
Yes. Fixed.
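For readers following the point about redundancy, a rough sketch of the simplified shape: when every access goes through a synchronized method on the same object, a plain boolean field is already safely published and updated, so wrapping it in an AtomicBoolean adds nothing. The class `InflightCommitFlag` and its methods are hypothetical and not taken from the PR.

```java
// Hypothetical sketch: a plain field guarded by the object's monitor.
final class InflightCommitFlag {
    private boolean hasInflightCommit; // guarded by "this"

    // Returns true only for the caller that moves the flag from false to true.
    synchronized boolean tryStart() {
        if (hasInflightCommit) {
            return false;
        }
        hasInflightCommit = true;
        return true;
    }

    // Marks the in-flight commit as finished.
    synchronized void complete() {
        hasInflightCommit = false;
    }

    synchronized boolean hasInflightCommit() {
        return hasInflightCommit;
    }
}
```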
…g with the style from KAFKA-19589
@lianetm @AndrewJSchofield: Could you please add the `ci-approved` label? Thanks!
@chia7712 @frankvicky—would you or someone you know to be familiar with the new |
@kirktrue: Thanks for the PR.
I have taken a quick pass and left a comment.
I will deep dive shortly.
...nts/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
…toCommitState and SharedReconciliationState
Renames and refactors SharedConsumerState, SharedAutoCommitState, and SharedReconciliationState to ThreadSafe* equivalents. Introduces ThreadSafeConsumerState and ThreadSafeShareConsumerState abstractions. Updates all usages, constructors, and tests to use the new thread-safe classes, improving clarity and consistency in shared state management between application and network threads.
Replaces the isAutoCommitEnabled() method in ThreadSafeAsyncConsumerState with direct access to autoCommitState().isAutoCommitEnabled(). Removes the redundant method and updates AsyncKafkaConsumer to use the new approach.