Closed
55 commits
19f29cf
WIP: Attempting to short-circuit event call in updateFetchPositions()
kirktrue Aug 6, 2025
2691b22
Experiment: skip the call to updateFetchPositions by accessing shared…
kirktrue Aug 7, 2025
f62bf8a
Setting cachedSubscriptionHasAllFetchPositions to false when assignme…
kirktrue Aug 7, 2025
386fafb
Reverting use of sharedOffsetsState.offsetFetcherUtils() and retrievi…
kirktrue Aug 8, 2025
9d6a23a
Renaming to CommitOffsetsSharedState
kirktrue Aug 8, 2025
ecccc15
Merge branch 'trunk' into KAFKA-19589-reduce-events-in-update-fetch-p…
kirktrue Aug 10, 2025
cc35591
KAFKA-19588: Reduce number of events generated in AsyncKafkaConsumer.…
kirktrue Aug 10, 2025
0c4dff6
Updates
kirktrue Aug 10, 2025
2bcbc6b
Merge branch 'trunk' into KAFKA-19589-reduce-events-in-update-fetch-p…
kirktrue Aug 10, 2025
e799cce
Merge branch 'trunk' into KAFKA-19589-reduce-events-in-update-fetch-p…
kirktrue Aug 13, 2025
7550adb
Update to check the SubscriptionState instead of relying solely on th…
kirktrue Aug 15, 2025
d45011c
Updated naming
kirktrue Aug 15, 2025
9d1a934
Made AutoCommitState a top-level class and refactoring to support
kirktrue Aug 17, 2025
4242127
[WIP] Clean up
kirktrue Aug 17, 2025
e26b71c
Revert changes to CommitRequestManager to fix failures in CommitReque…
kirktrue Aug 17, 2025
92e6cfe
Merge branch 'trunk' into KAFKA-19588-avoid-poll-event
kirktrue Aug 18, 2025
30f1380
Merge branch 'trunk' into KAFKA-19589-reduce-events-in-update-fetch-p…
kirktrue Aug 18, 2025
b24ed3c
Merge branch 'trunk' into KAFKA-19589-reduce-events-in-update-fetch-p…
kirktrue Aug 19, 2025
197cff4
Merge branch 'trunk' into KAFKA-19588-avoid-poll-event
kirktrue Aug 19, 2025
1852c83
Remove unused state listener logic from CommitOffsetsSharedState
kirktrue Aug 23, 2025
d2310fc
Merge branch 'trunk' into KAFKA-19589-reduce-events-in-update-fetch-p…
kirktrue Aug 25, 2025
e4b983d
Merge branch 'trunk' into KAFKA-19588-avoid-poll-event
kirktrue Aug 25, 2025
336213a
Refactored so that NetworkClientDelegate's metadata error was availab…
kirktrue Aug 26, 2025
38666db
Merge branch 'trunk' into KAFKA-19589-reduce-events-in-update-fetch-p…
kirktrue Aug 26, 2025
be5a406
Clearing the metadata error exception when thrown
kirktrue Aug 26, 2025
4cca2d8
Merge branch 'trunk' into KAFKA-19588-avoid-poll-event
kirktrue Aug 26, 2025
0e28d8c
Remove redundant autoCommitState timer update in poll
kirktrue Aug 26, 2025
1ea59a1
Updates for clarity
kirktrue Aug 27, 2025
6595dbc
Merge branch 'trunk' into KAFKA-19589-reduce-events-in-update-fetch-p…
kirktrue Aug 27, 2025
fa7a887
Added comments and reverted unnecessary changes
kirktrue Aug 27, 2025
12a4d0e
Ugh. Typo
kirktrue Aug 27, 2025
b26d7cf
Merge branch 'trunk' into KAFKA-19588-avoid-poll-event
kirktrue Aug 27, 2025
a06ca40
Refactoring to make the intention a little more obvious and in keepin…
kirktrue Aug 27, 2025
d6fe1fd
Reverting unnecessary whitespace diffs
kirktrue Aug 27, 2025
23b16a9
Merge branch 'trunk' into KAFKA-19589-reduce-events-in-update-fetch-p…
kirktrue Aug 27, 2025
3643e3e
Merge branch 'trunk' into KAFKA-19588-avoid-poll-event
kirktrue Aug 28, 2025
278b06b
Optimization to send only one PollEvent per poll() does not work, nee…
kirktrue Aug 28, 2025
d95cc22
Merge branch 'trunk' into KAFKA-19588-avoid-poll-event
kirktrue Aug 28, 2025
b190c7d
Merge branch 'trunk' into KAFKA-19589-reduce-events-in-update-fetch-p…
kirktrue Aug 28, 2025
9cf406f
Updated AbstractMembershipManager comments related to use of SharedAu…
kirktrue Aug 28, 2025
9469b29
Refactoring and JavaDoc clarification
kirktrue Aug 29, 2025
3c00853
Removed superfluous boolean return value from CheckAndUpdatePositions…
kirktrue Aug 29, 2025
04fcedd
Updated CheckAndUpdatePositionsEvent doc to specify that event comple…
kirktrue Aug 29, 2025
7dde181
Merge branch 'trunk' into KAFKA-19588-avoid-poll-event
kirktrue Sep 1, 2025
10deac6
s/Shared/ThreadSafe/g
kirktrue Sep 1, 2025
b057da6
Refactor auto-commit check in consumer state
kirktrue Sep 1, 2025
6e01aa0
Reverted unnecessary changes
kirktrue Sep 1, 2025
f523d99
Refactor ThreadSafeAutoCommitState and ThreadSafeConsumerState
kirktrue Sep 1, 2025
72de8a5
Merge branch 'trunk' into KAFKA-19589-reduce-events-in-update-fetch-p…
kirktrue Sep 1, 2025
964ce69
Merge branch 'trunk' into KAFKA-19589-reduce-events-in-update-fetch-p…
kirktrue Sep 3, 2025
f486e98
Merge branch 'trunk' into KAFKA-19588-avoid-poll-event
kirktrue Sep 4, 2025
f6bdd23
Merge branch 'trunk' into KAFKA-19589-reduce-events-in-update-fetch-p…
kirktrue Sep 4, 2025
897a632
Merge remote-tracking branch 'origin/KAFKA-19588-avoid-poll-event' in…
kirktrue Sep 18, 2025
4562413
Merge remote-tracking branch 'origin/KAFKA-19589-reduce-events-in-upd…
kirktrue Sep 18, 2025
8f563d2
Minor changes to reduce diff noise
kirktrue Sep 18, 2025
@@ -110,6 +110,16 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
*/
private final ConsumerMetadata metadata;

/**
* Keeps track of the auto-commit state.
*
* <p/>
*
* <em>Note</em>: per its class name, this state is <em>shared</em> with the application thread, so care must be
* taken to evaluate how it's used elsewhere when updating related logic.
*/
protected final ThreadSafeAutoCommitState autoCommitState;

/**
* Logger.
*/
@@ -137,10 +147,17 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl

/**
* If there is a reconciliation running (triggering commit, callbacks) for the
* assignmentReadyToReconcile. This will be true if {@link #maybeReconcile(boolean)} has been triggered
* after receiving a heartbeat response, or a metadata update.
* assignmentReadyToReconcile. {@link ThreadSafeReconciliationState#isInProgress()} will be true if
* {@link #maybeReconcile(boolean)} has been triggered after receiving a heartbeat response, or a metadata update.
* Calling code should generally favor {@link #reconciliationInProgress()} for its clarity over direct use of
* this state.
*
* <p/>
*
* <em>Note</em>: per its class name, this state is <em>shared</em> with the application thread, so care must be
* taken to evaluate how it's used elsewhere when updating related logic.
*/
private boolean reconciliationInProgress;
private final ThreadSafeReconciliationState reconciliationState;

/**
* True if a reconciliation is in progress and the member rejoined the group since the start
@@ -192,8 +209,6 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
*/
private boolean isPollTimerExpired;

private final boolean autoCommitEnabled;

/**
* Indicate the operation on consumer group membership that the consumer will perform when leaving the group.
* The property should remain {@code GroupMembershipOperation.DEFAULT} until the consumer is closing.
@@ -208,7 +223,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
Logger log,
Time time,
RebalanceMetricsManager metricsManager,
boolean autoCommitEnabled) {
ThreadSafeConsumerState threadSafeConsumerState) {
this.groupId = groupId;
this.state = MemberState.UNSUBSCRIBED;
this.subscriptions = subscriptions;
@@ -220,7 +235,8 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
this.stateUpdatesListeners = new ArrayList<>();
this.time = time;
this.metricsManager = metricsManager;
this.autoCommitEnabled = autoCommitEnabled;
this.autoCommitState = threadSafeConsumerState.autoCommitState();
this.reconciliationState = threadSafeConsumerState.reconciliationState();
}

/**
@@ -530,7 +546,7 @@ public void transitionToJoining() {
"the member is in FATAL state");
return;
}
if (reconciliationInProgress) {
if (reconciliationInProgress()) {
rejoinedWhileReconciliationInProgress = true;
}
resetEpoch();
@@ -830,7 +846,7 @@ public void maybeReconcile(boolean canCommit) {
"current assignment.");
return;
}
if (reconciliationInProgress) {
if (reconciliationInProgress()) {
log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. " +
"Assignment {} will be handled in the next reconciliation loop.", currentTargetAssignment);
return;
@@ -850,7 +866,7 @@ public void maybeReconcile(boolean canCommit) {
return;
}

if (autoCommitEnabled && !canCommit) return;
if (autoCommitState.isAutoCommitEnabled() && !canCommit) return;
markReconciliationInProgress();

// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are
@@ -963,7 +979,7 @@ private void revokeAndAssign(LocalAssignment resolvedAssignment,
log.error("Reconciliation failed.", error);
markReconciliationCompleted();
} else {
if (reconciliationInProgress && !maybeAbortReconciliation()) {
if (reconciliationInProgress() && !maybeAbortReconciliation()) {
currentAssignment = resolvedAssignment;

signalReconciliationCompleting();
@@ -1034,15 +1050,15 @@ protected CompletableFuture<Void> signalPartitionsLost(Set<TopicPartition> parti
* Visible for testing.
*/
void markReconciliationInProgress() {
reconciliationInProgress = true;
reconciliationState.setInProgress(true);
rejoinedWhileReconciliationInProgress = false;
}

/**
* Visible for testing.
*/
void markReconciliationCompleted() {
reconciliationInProgress = false;
reconciliationState.setInProgress(false);
rejoinedWhileReconciliationInProgress = false;
}

@@ -1372,7 +1388,7 @@ Map<Uuid, SortedSet<Integer>> topicPartitionsAwaitingReconciliation() {
* by a call to {@link #maybeReconcile(boolean)}. Visible for testing.
*/
boolean reconciliationInProgress() {
return reconciliationInProgress;
return reconciliationState.isInProgress();
}

/**
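The hunks above replace the plain `boolean reconciliationInProgress` field with a state object that the application thread can also read. The diff does not show how `ThreadSafeReconciliationState` is implemented, so the following is only a minimal sketch of one way such a holder could work, assuming an `AtomicBoolean` is sufficient. The class names `ReconciliationFlagSketch` and `ReconciliationFlagDemo` are hypothetical and not part of the PR; only the method names `setInProgress` and `isInProgress` mirror the diff.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a reconciliation flag shared between two threads.
// This is NOT the PR's ThreadSafeReconciliationState; it only mirrors its two method names.
final class ReconciliationFlagSketch {
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    // Intended to be called by the background (network) thread.
    void setInProgress(boolean value) {
        inProgress.set(value);
    }

    // Can be read from the application thread without taking a lock.
    boolean isInProgress() {
        return inProgress.get();
    }
}

final class ReconciliationFlagDemo {
    public static void main(String[] args) throws InterruptedException {
        ReconciliationFlagSketch flag = new ReconciliationFlagSketch();

        // Simulate the background thread marking a reconciliation as started.
        Thread background = new Thread(() -> flag.setInProgress(true));
        background.start();
        background.join();

        // The application thread observes the updated value.
        System.out.println("in progress: " + flag.isInProgress());
    }
}
```

Keeping the flag behind accessor methods, as `reconciliationInProgress()` does in the diff, lets call sites stay unchanged if the underlying representation changes later.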
@@ -320,13 +320,11 @@ private StreamsRebalanceListenerInvoker streamsRebalanceListenerInvoker() {
private final long retryBackoffMs;
private final int requestTimeoutMs;
private final Duration defaultApiTimeoutMs;
private final boolean autoCommitEnabled;
private final ThreadSafeAsyncConsumerState threadSafeConsumerState;
private volatile boolean closed = false;
// Init value is needed to avoid NPE in case of exception raised in the constructor
private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();

// to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
private boolean cachedSubscriptionHasAllFetchPositions;
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
@@ -388,7 +386,6 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
GroupRebalanceConfig.ProtocolType.CONSUMER
);
this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
LogContext logContext = createLogContext(config, groupRebalanceConfig);
this.backgroundEventQueue = backgroundEventQueue;
this.log = logContext.logger(getClass());
@@ -430,6 +427,19 @@ public AsyncKafkaConsumer(final ConsumerConfig config,

// This FetchBuffer is shared between the application and network threads.
this.fetchBuffer = new FetchBuffer(logContext);
this.threadSafeConsumerState = new ThreadSafeAsyncConsumerState(
ThreadSafeAutoCommitState.fromConfig(
logContext,
config,
time
),
logContext,
metadata,
subscriptions,
time,
retryBackoffMs,
apiVersions
);
final Supplier<NetworkClientDelegate> networkClientDelegateSupplier = NetworkClientDelegate.supplier(time,
logContext,
metadata,
@@ -440,7 +450,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
backgroundEventHandler,
false,
asyncConsumerMetrics
asyncConsumerMetrics,
threadSafeConsumerState
);
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
this.groupMetadata.set(initializeGroupMetadata(config, groupRebalanceConfig));
@@ -459,7 +470,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
metrics,
offsetCommitCallbackInvoker,
memberStateListener,
streamsRebalanceData
streamsRebalanceData,
threadSafeConsumerState
);
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext,
metadata,
@@ -473,7 +485,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
requestManagersSupplier,
asyncConsumerMetrics
asyncConsumerMetrics,
threadSafeConsumerState
);
this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
logContext,
@@ -532,7 +545,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
int requestTimeoutMs,
int defaultApiTimeoutMs,
String groupId,
boolean autoCommitEnabled) {
ThreadSafeAsyncConsumerState threadSafeConsumerState) {
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
this.clientId = clientId;
@@ -557,7 +570,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics);
this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, CONSUMER_METRIC_GROUP);
this.clientTelemetryReporter = Optional.empty();
this.autoCommitEnabled = autoCommitEnabled;
this.threadSafeConsumerState = threadSafeConsumerState;
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue,
@@ -577,7 +590,6 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
this.fetchBuffer = new FetchBuffer(logContext);
this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
this.time = time;
@@ -623,6 +635,19 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
new RebalanceCallbackMetricsManager(metrics)
);
ApiVersions apiVersions = new ApiVersions();
this.threadSafeConsumerState = new ThreadSafeAsyncConsumerState(
ThreadSafeAutoCommitState.fromConfig(
logContext,
config,
time
),
logContext,
metadata,
subscriptions,
time,
retryBackoffMs,
apiVersions
);
Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () -> new NetworkClientDelegate(
time,
config,
@@ -631,7 +656,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
metadata,
backgroundEventHandler,
false,
asyncConsumerMetrics
asyncConsumerMetrics,
threadSafeConsumerState
);
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(
@@ -650,7 +676,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
metrics,
offsetCommitCallbackInvoker,
memberStateListener,
Optional.empty()
Optional.empty(),
threadSafeConsumerState
);
Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
logContext,
@@ -665,7 +692,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
requestManagersSupplier,
asyncConsumerMetrics);
asyncConsumerMetrics,
threadSafeConsumerState);
this.streamsRebalanceListenerInvoker = Optional.empty();
this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper = new CompletableEventReaper(logContext);
@@ -682,7 +710,8 @@ ApplicationEventHandler build(
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
final Supplier<RequestManagers> requestManagersSupplier,
final AsyncConsumerMetrics asyncConsumerMetrics
final AsyncConsumerMetrics asyncConsumerMetrics,
final ThreadSafeConsumerState threadSafeConsumerState
);

}
@@ -833,14 +862,7 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
}

do {
PollEvent event = new PollEvent(timer.currentTimeMs());
// Make sure to let the background thread know that we are still polling.
// This will trigger async auto-commits of consumed positions when hitting
// the interval time or reconciling new assignments
applicationEventHandler.add(event);
// Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests
// retrieve the positions to commit before proceeding with fetching new records
ConsumerUtils.getResult(event.reconcileAndAutoCommit(), defaultApiTimeoutMs.toMillis());
sendPollEvent(timer);

// We must not allow wake-ups between polling for fetches and returning the records.
// If the polled fetches are not empty the consumed position has already been updated in the polling
@@ -876,6 +898,29 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
}
}

private void sendPollEvent(Timer timer) {
long currentTimeMs = timer.currentTimeMs();

// Make sure to let the background thread know that we are still polling.
PollEvent event = new PollEvent(currentTimeMs);

if (threadSafeConsumerState.canSkipWaitingOnPoll(currentTimeMs)) {
// This will *not* trigger async auto-commits of consumed positions, as the shared Timer for the
// auto-commit interval will not change between the application thread and the network thread. The
// same is true of the reconciliation state: it will not change between the ThreadSafeConsumerState
// check above and the processing of the PollEvent.
applicationEventHandler.add(event);
} else {
// This will trigger async auto-commits of consumed positions when hitting
// the interval time or reconciling new assignments
applicationEventHandler.add(event);

// Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests
// retrieve the positions to commit before proceeding with fetching new records
ConsumerUtils.getResult(event.reconcileAndAutoCommit(), defaultApiTimeoutMs.toMillis());
}
}

/**
* Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and
* partitions.
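The `sendPollEvent` hunk above always enqueues the `PollEvent` but only blocks on it when `canSkipWaitingOnPoll(currentTimeMs)` returns false. The body of that method is not part of this diff. Going only by the code comments (auto-commit interval timer and reconciliation state), a rough sketch of the decision might look like the following. Everything here is an assumption for illustration: the class `PollWaitSketch`, its fields, and the exact rule are hypothetical and may differ from the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the "can the application thread skip waiting?" check.
// Assumed rule: wait only if a reconciliation is running or an auto-commit is due.
final class PollWaitSketch {
    private final boolean autoCommitEnabled;
    private final AtomicLong nextAutoCommitMs;
    private final AtomicBoolean reconciliationInProgress = new AtomicBoolean(false);

    PollWaitSketch(boolean autoCommitEnabled, long firstAutoCommitDeadlineMs) {
        this.autoCommitEnabled = autoCommitEnabled;
        this.nextAutoCommitMs = new AtomicLong(firstAutoCommitDeadlineMs);
    }

    // Would be updated by the background thread in a real consumer.
    void setReconciliationInProgress(boolean value) {
        reconciliationInProgress.set(value);
    }

    // Would be called by the background thread after each auto-commit.
    void resetAutoCommitDeadline(long deadlineMs) {
        nextAutoCommitMs.set(deadlineMs);
    }

    boolean canSkipWaitingOnPoll(long nowMs) {
        if (reconciliationInProgress.get()) {
            return false; // commits triggered by reconciliation must see current positions
        }
        if (autoCommitEnabled && nowMs >= nextAutoCommitMs.get()) {
            return false; // auto-commit interval has elapsed
        }
        return true;
    }
}

final class PollWaitDemo {
    public static void main(String[] args) {
        PollWaitSketch state = new PollWaitSketch(true, 5_000L);
        System.out.println("before deadline: " + state.canSkipWaitingOnPoll(1_000L));
        System.out.println("at deadline: " + state.canSkipWaitingOnPoll(5_000L));
    }
}
```

In this model the common case (no reconciliation, interval not yet elapsed) costs two atomic reads on the application thread instead of a blocking round trip to the network thread.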
@@ -1488,7 +1533,7 @@ private void autoCommitOnClose(final Timer timer) {
if (groupMetadata.get().isEmpty() || applicationEventHandler == null)
return;

if (autoCommitEnabled)
if (threadSafeConsumerState.autoCommitState().isAutoCommitEnabled())
commitSyncAllConsumed(timer);

applicationEventHandler.add(new CommitOnCloseEvent());
@@ -1777,9 +1822,9 @@ private Fetch<K, V> pollForFetches(Timer timer) {
// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure

// NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
// NOTE: for hasAllFetchPositions to return the correct answer, we MUST call
// updateAssignmentMetadataIfNeeded before this method.
if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
if (!subscriptions.hasAllFetchPositions() && pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}

@@ -1834,11 +1879,21 @@ private Fetch<K, V> collectFetch() {
* defined
*/
private boolean updateFetchPositions(final Timer timer) {
cachedSubscriptionHasAllFetchPositions = false;
// Fetch position validation is in the hot path for poll() and the cost of thread interaction for
// event processing is *very* heavy, CPU-wise. In a stable system, the positions are valid; having the
// network thread check the validity yields the same answer 99%+ of the time. But calling the
// network thread to determine that is very expensive.
//
// Instead, let the *application thread* determine if any partitions need their positions updated. If not,
// the application thread can skip sending an event to the network thread that will simply end up coming
// to the same conclusion, albeit much slower.
if (threadSafeConsumerState.canSkipUpdateFetchPositions())
return true;

try {
CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer));
wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future());
cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
} catch (TimeoutException e) {
return false;
} finally {
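The comment in the `updateFetchPositions` hunk describes a fast path: the application thread checks locally whether any partition needs its position updated, and only sends an event to the network thread when one does. The implementation of `canSkipUpdateFetchPositions()` is not shown in this diff. The sketch below illustrates the general pattern only, under the assumption that position validity can be tracked in a concurrent map. `FetchPositionFastPathSketch`, its string partition keys, and the event counter are all hypothetical and do not reflect Kafka's `SubscriptionState`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a local fast-path check that avoids a cross-thread event.
final class FetchPositionFastPathSketch {
    // partition name -> whether it currently has a valid fetch position
    private final Map<String, Boolean> hasValidPosition = new ConcurrentHashMap<>();
    // counts the simulated round trips to the network thread
    private final AtomicInteger eventsSent = new AtomicInteger();

    void setPosition(String partition, boolean valid) {
        hasValidPosition.put(partition, valid);
    }

    // Cheap check performed on the application thread.
    boolean canSkipUpdate() {
        return !hasValidPosition.containsValue(false);
    }

    boolean updateFetchPositions() {
        if (canSkipUpdate()) {
            return true; // fast path: no event is sent
        }
        // Slow path: stands in for sending an event and waiting for the network thread.
        eventsSent.incrementAndGet();
        hasValidPosition.replaceAll((partition, valid) -> true);
        return true;
    }

    int eventsSent() {
        return eventsSent.get();
    }
}

final class FetchPositionFastPathDemo {
    public static void main(String[] args) {
        FetchPositionFastPathSketch consumer = new FetchPositionFastPathSketch();
        consumer.setPosition("topic-0", true);
        consumer.updateFetchPositions();  // all positions valid: skipped
        consumer.setPosition("topic-1", false);
        consumer.updateFetchPositions();  // one position missing: event sent
        consumer.updateFetchPositions();  // valid again: skipped
        System.out.println("events sent: " + consumer.eventsSent());
    }
}
```

The trade-off is that the local check must stay consistent with what the network thread would have concluded, which is why the state it reads needs to be safe to share across threads.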