diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java index ffe01c089e7bf..520766cd0fedb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java @@ -110,6 +110,16 @@ public abstract class AbstractMembershipManager impl */ private final ConsumerMetadata metadata; + /** + * Keeps track of the auto-commit state. + * + *

+ * + * Note: per its class name, this state is shared with the application thread, so care must be + * taken to evaluate how it's used elsewhere when updating related logic. + */ + protected final ThreadSafeAutoCommitState autoCommitState; + /** * Logger. */ @@ -137,10 +147,17 @@ public abstract class AbstractMembershipManager impl /** * If there is a reconciliation running (triggering commit, callbacks) for the - * assignmentReadyToReconcile. This will be true if {@link #maybeReconcile(boolean)} has been triggered - * after receiving a heartbeat response, or a metadata update. + * assignmentReadyToReconcile. {@link ThreadSafeReconciliationState#isInProgress()} will be true if + * {@link #maybeReconcile(boolean)} has been triggered after receiving a heartbeat response, or a metadata update. + * Calling code should generally favor {@link #reconciliationInProgress()} for its clarity over direct use of + * this state. + * + *

+ * + * Note: per its class name, this state is shared with the application thread, so care must be + * taken to evaluate how it's used elsewhere when updating related logic. */ - private boolean reconciliationInProgress; + private final ThreadSafeReconciliationState reconciliationState; /** * True if a reconciliation is in progress and the member rejoined the group since the start @@ -192,8 +209,6 @@ public abstract class AbstractMembershipManager impl */ private boolean isPollTimerExpired; - private final boolean autoCommitEnabled; - /** * Indicate the operation on consumer group membership that the consumer will perform when leaving the group. * The property should remain {@code GroupMembershipOperation.DEFAULT} until the consumer is closing. @@ -208,7 +223,7 @@ public abstract class AbstractMembershipManager impl Logger log, Time time, RebalanceMetricsManager metricsManager, - boolean autoCommitEnabled) { + ThreadSafeConsumerState threadSafeConsumerState) { this.groupId = groupId; this.state = MemberState.UNSUBSCRIBED; this.subscriptions = subscriptions; @@ -220,7 +235,8 @@ public abstract class AbstractMembershipManager impl this.stateUpdatesListeners = new ArrayList<>(); this.time = time; this.metricsManager = metricsManager; - this.autoCommitEnabled = autoCommitEnabled; + this.autoCommitState = threadSafeConsumerState.autoCommitState(); + this.reconciliationState = threadSafeConsumerState.reconciliationState(); } /** @@ -530,7 +546,7 @@ public void transitionToJoining() { "the member is in FATAL state"); return; } - if (reconciliationInProgress) { + if (reconciliationInProgress()) { rejoinedWhileReconciliationInProgress = true; } resetEpoch(); @@ -830,7 +846,7 @@ public void maybeReconcile(boolean canCommit) { "current assignment."); return; } - if (reconciliationInProgress) { + if (reconciliationInProgress()) { log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. 
" + "Assignment {} will be handled in the next reconciliation loop.", currentTargetAssignment); return; @@ -850,7 +866,7 @@ public void maybeReconcile(boolean canCommit) { return; } - if (autoCommitEnabled && !canCommit) return; + if (autoCommitState.isAutoCommitEnabled() && !canCommit) return; markReconciliationInProgress(); // Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are @@ -963,7 +979,7 @@ private void revokeAndAssign(LocalAssignment resolvedAssignment, log.error("Reconciliation failed.", error); markReconciliationCompleted(); } else { - if (reconciliationInProgress && !maybeAbortReconciliation()) { + if (reconciliationInProgress() && !maybeAbortReconciliation()) { currentAssignment = resolvedAssignment; signalReconciliationCompleting(); @@ -1034,7 +1050,7 @@ protected CompletableFuture signalPartitionsLost(Set parti * Visible for testing. */ void markReconciliationInProgress() { - reconciliationInProgress = true; + reconciliationState.setInProgress(true); rejoinedWhileReconciliationInProgress = false; } @@ -1042,7 +1058,7 @@ void markReconciliationInProgress() { * Visible for testing. */ void markReconciliationCompleted() { - reconciliationInProgress = false; + reconciliationState.setInProgress(false); rejoinedWhileReconciliationInProgress = false; } @@ -1372,7 +1388,7 @@ Map> topicPartitionsAwaitingReconciliation() { * by a call to {@link #maybeReconcile(boolean)}. Visible for testing. 
*/ boolean reconciliationInProgress() { - return reconciliationInProgress; + return reconciliationState.isInProgress(); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 938ae909027d0..96ae317361c60 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -320,13 +320,11 @@ private StreamsRebalanceListenerInvoker streamsRebalanceListenerInvoker() { private final long retryBackoffMs; private final int requestTimeoutMs; private final Duration defaultApiTimeoutMs; - private final boolean autoCommitEnabled; + private final ThreadSafeAsyncConsumerState threadSafeConsumerState; private volatile boolean closed = false; // Init value is needed to avoid NPE in case of exception raised in the constructor private Optional clientTelemetryReporter = Optional.empty(); - // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates - private boolean cachedSubscriptionHasAllFetchPositions; private final WakeupTrigger wakeupTrigger = new WakeupTrigger(); private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker; @@ -388,7 +386,6 @@ public AsyncKafkaConsumer(final ConsumerConfig config, GroupRebalanceConfig.ProtocolType.CONSUMER ); this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); - this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); LogContext logContext = createLogContext(config, groupRebalanceConfig); this.backgroundEventQueue = backgroundEventQueue; this.log = logContext.logger(getClass()); @@ -430,6 +427,19 @@ public AsyncKafkaConsumer(final ConsumerConfig config, // This FetchBuffer is shared between 
the application and network threads. this.fetchBuffer = new FetchBuffer(logContext); + this.threadSafeConsumerState = new ThreadSafeAsyncConsumerState( + ThreadSafeAutoCommitState.fromConfig( + logContext, + config, + time + ), + logContext, + metadata, + subscriptions, + time, + retryBackoffMs, + apiVersions + ); final Supplier networkClientDelegateSupplier = NetworkClientDelegate.supplier(time, logContext, metadata, @@ -440,7 +450,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config, clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), backgroundEventHandler, false, - asyncConsumerMetrics + asyncConsumerMetrics, + threadSafeConsumerState ); this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); this.groupMetadata.set(initializeGroupMetadata(config, groupRebalanceConfig)); @@ -459,7 +470,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config, metrics, offsetCommitCallbackInvoker, memberStateListener, - streamsRebalanceData + streamsRebalanceData, + threadSafeConsumerState ); final Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, @@ -473,7 +485,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, - asyncConsumerMetrics + asyncConsumerMetrics, + threadSafeConsumerState ); this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker( logContext, @@ -532,7 +545,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config, int requestTimeoutMs, int defaultApiTimeoutMs, String groupId, - boolean autoCommitEnabled) { + ThreadSafeAsyncConsumerState threadSafeConsumerState) { this.log = logContext.logger(getClass()); this.subscriptions = subscriptions; this.clientId = clientId; @@ -557,7 +570,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config, this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics); this.asyncConsumerMetrics = new 
AsyncConsumerMetrics(metrics, CONSUMER_METRIC_GROUP); this.clientTelemetryReporter = Optional.empty(); - this.autoCommitEnabled = autoCommitEnabled; + this.threadSafeConsumerState = threadSafeConsumerState; this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); this.backgroundEventHandler = new BackgroundEventHandler( backgroundEventQueue, @@ -577,7 +590,6 @@ public AsyncKafkaConsumer(final ConsumerConfig config, this.log = logContext.logger(getClass()); this.subscriptions = subscriptions; this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); - this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); this.fetchBuffer = new FetchBuffer(logContext); this.isolationLevel = IsolationLevel.READ_UNCOMMITTED; this.time = time; @@ -623,6 +635,19 @@ public AsyncKafkaConsumer(final ConsumerConfig config, new RebalanceCallbackMetricsManager(metrics) ); ApiVersions apiVersions = new ApiVersions(); + this.threadSafeConsumerState = new ThreadSafeAsyncConsumerState( + ThreadSafeAutoCommitState.fromConfig( + logContext, + config, + time + ), + logContext, + metadata, + subscriptions, + time, + retryBackoffMs, + apiVersions + ); Supplier networkClientDelegateSupplier = () -> new NetworkClientDelegate( time, config, @@ -631,7 +656,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config, metadata, backgroundEventHandler, false, - asyncConsumerMetrics + asyncConsumerMetrics, + threadSafeConsumerState ); this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); Supplier requestManagersSupplier = RequestManagers.supplier( @@ -650,7 +676,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config, metrics, offsetCommitCallbackInvoker, memberStateListener, - Optional.empty() + Optional.empty(), + threadSafeConsumerState ); Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( logContext, @@ -665,7 +692,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config, 
applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, - asyncConsumerMetrics); + asyncConsumerMetrics, + threadSafeConsumerState); this.streamsRebalanceListenerInvoker = Optional.empty(); this.backgroundEventProcessor = new BackgroundEventProcessor(); this.backgroundEventReaper = new CompletableEventReaper(logContext); @@ -682,7 +710,8 @@ ApplicationEventHandler build( final Supplier applicationEventProcessorSupplier, final Supplier networkClientDelegateSupplier, final Supplier requestManagersSupplier, - final AsyncConsumerMetrics asyncConsumerMetrics + final AsyncConsumerMetrics asyncConsumerMetrics, + final ThreadSafeConsumerState threadSafeConsumerState ); } @@ -833,14 +862,7 @@ public ConsumerRecords poll(final Duration timeout) { } do { - PollEvent event = new PollEvent(timer.currentTimeMs()); - // Make sure to let the background thread know that we are still polling. - // This will trigger async auto-commits of consumed positions when hitting - // the interval time or reconciling new assignments - applicationEventHandler.add(event); - // Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests - // retrieve the positions to commit before proceeding with fetching new records - ConsumerUtils.getResult(event.reconcileAndAutoCommit(), defaultApiTimeoutMs.toMillis()); + sendPollEvent(timer); // We must not allow wake-ups between polling for fetches and returning the records. // If the polled fetches are not empty the consumed position has already been updated in the polling @@ -876,6 +898,29 @@ public ConsumerRecords poll(final Duration timeout) { } } + private void sendPollEvent(Timer timer) { + long currentTimeMs = timer.currentTimeMs(); + + // Make sure to let the background thread know that we are still polling. 
+ PollEvent event = new PollEvent(currentTimeMs); + + if (threadSafeConsumerState.canSkipWaitingOnPoll(currentTimeMs)) { + // This will *not* trigger async auto-commits of consumed positions as the shared Timer for + // auto-commit interval will not change between the application thread and the network thread. This + // is also true of the reconciliation state. The state will not change between the ThreadSafeConsumerState + // check above and the processing of the PollEvent. + applicationEventHandler.add(event); + } else { + // This will trigger async auto-commits of consumed positions when hitting + // the interval time or reconciling new assignments + applicationEventHandler.add(event); + + // Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests + // retrieve the positions to commit before proceeding with fetching new records + ConsumerUtils.getResult(event.reconcileAndAutoCommit(), defaultApiTimeoutMs.toMillis()); + } + } + /** * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and * partitions. @@ -1488,7 +1533,7 @@ private void autoCommitOnClose(final Timer timer) { if (groupMetadata.get().isEmpty() || applicationEventHandler == null) return; - if (autoCommitEnabled) + if (threadSafeConsumerState.autoCommitState().isAutoCommitEnabled()) commitSyncAllConsumed(timer); applicationEventHandler.add(new CommitOnCloseEvent()); @@ -1777,9 +1822,9 @@ private Fetch pollForFetches(Timer timer) { // We do not want to be stuck blocking in poll if we are missing some positions // since the offset lookup may be backing off after a failure - // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call + // NOTE: for hasAllFetchPositions to return the correct answer, we MUST call // updateAssignmentMetadataIfNeeded before this method. 
- if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) { + if (!subscriptions.hasAllFetchPositions() && pollTimeout > retryBackoffMs) { pollTimeout = retryBackoffMs; } @@ -1834,11 +1879,21 @@ private Fetch collectFetch() { * defined */ private boolean updateFetchPositions(final Timer timer) { - cachedSubscriptionHasAllFetchPositions = false; + // Fetch position validation is in the hot path for poll() and the cost of thread interaction for + // event processing is *very* heavy, CPU-wise. In a stable system, the positions are valid; having the + // network thread check the validity yields the same answer 99%+ of the time. But calling the + // network thread to determine that is very expensive. + // + // Instead, let the *application thread* determine if any partitions need their positions updated. If not, + // the application thread can skip sending an event to the network thread that will simply end up coming + // to the same conclusion, albeit much slower. + if (threadSafeConsumerState.canSkipUpdateFetchPositions()) + return true; + try { CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer)); wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future()); - cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent); + applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent); } catch (TimeoutException e) { return false; } finally { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index fe4d3806f2af4..dadf048e77823 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -78,7 +78,7 @@ public class CommitRequestManager 
implements RequestManager, MemberStateListener private final ConsumerMetadata metadata; private final LogContext logContext; private final Logger log; - private final Optional autoCommitState; + private final ThreadSafeAutoCommitState autoCommitState; private final CoordinatorRequestManager coordinatorRequestManager; private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; private final OffsetCommitMetricsManager metricsManager; @@ -115,7 +115,8 @@ public CommitRequestManager( final String groupId, final Optional groupInstanceId, final Metrics metrics, - final ConsumerMetadata metadata) { + final ConsumerMetadata metadata, + final ThreadSafeConsumerState threadSafeConsumerState) { this(time, logContext, subscriptions, @@ -128,10 +129,12 @@ public CommitRequestManager( config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG), OptionalDouble.empty(), metrics, - metadata); + metadata, + threadSafeConsumerState); } // Visible for testing + @SuppressWarnings({"checkstyle:ParameterNumber"}) CommitRequestManager( final Time time, final LogContext logContext, @@ -145,19 +148,14 @@ public CommitRequestManager( final long retryBackoffMaxMs, final OptionalDouble jitter, final Metrics metrics, - final ConsumerMetadata metadata) { + final ConsumerMetadata metadata, + final ThreadSafeConsumerState threadSafeConsumerState) { Objects.requireNonNull(coordinatorRequestManager, "Coordinator is needed upon committing offsets"); this.time = time; this.logContext = logContext; this.log = logContext.logger(getClass()); this.pendingRequests = new PendingRequests(); - if (config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { - final long autoCommitInterval = - Integer.toUnsignedLong(config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); - this.autoCommitState = Optional.of(new AutoCommitState(time, autoCommitInterval, logContext)); - } else { - this.autoCommitState = Optional.empty(); - } + this.autoCommitState = threadSafeConsumerState.autoCommitState(); 
this.coordinatorRequestManager = coordinatorRequestManager; this.groupId = groupId; this.groupInstanceId = groupInstanceId; @@ -212,7 +210,7 @@ public void signalClose() { */ @Override public long maximumTimeToWait(long currentTimeMs) { - return autoCommitState.map(ac -> ac.remainingMs(currentTimeMs)).orElse(Long.MAX_VALUE); + return autoCommitState.remainingMs(); } private static long findMinTime(final Collection requests, final long currentTimeMs) { @@ -239,12 +237,11 @@ private KafkaException maybeWrapAsTimeoutException(Throwable t) { * failed. */ private CompletableFuture> requestAutoCommit(final OffsetCommitRequestState requestState) { - AutoCommitState autocommit = autoCommitState.get(); CompletableFuture> result; if (requestState.offsets.isEmpty()) { result = CompletableFuture.completedFuture(Collections.emptyMap()); } else { - autocommit.setInflightCommitStatus(true); + autoCommitState.setInflightCommit(true); OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState); result = request.future; result.whenComplete(autoCommitCallback(request.offsets)); @@ -266,7 +263,7 @@ private CompletableFuture> requestAutoCom * response for the in-flight is received. */ private void maybeAutoCommitAsync() { - if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) { + if (autoCommitState.shouldAutoCommit()) { OffsetCommitRequestState requestState = createOffsetCommitRequest( subscriptions.allConsumed(), Long.MAX_VALUE); @@ -319,7 +316,7 @@ private void maybeResetTimerWithBackoff(final CompletableFuture maybeAutoCommitSyncBeforeRebalance(final long deadlineMs) { - if (!autoCommitEnabled()) { + if (!autoCommitState.isAutoCommitEnabled()) { return CompletableFuture.completedFuture(null); } @@ -366,7 +363,7 @@ private void autoCommitSyncBeforeRebalanceWithRetries(OffsetCommitRequestState r */ private BiConsumer, ? 
super Throwable> autoCommitCallback(final Map allConsumedOffsets) { return (response, throwable) -> { - autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false)); + autoCommitState.setInflightCommit(false); if (throwable == null) { offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets); log.debug("Completed auto-commit of offsets {}", allConsumedOffsets); @@ -566,7 +563,7 @@ private boolean isStaleEpochErrorAndValidEpochAvailable(Throwable error) { } private void updateAutoCommitTimer(final long currentTimeMs) { - this.autoCommitState.ifPresent(t -> t.updateTimer(currentTimeMs)); + this.autoCommitState.updateTimer(currentTimeMs); } // Visible for testing @@ -600,7 +597,7 @@ public void onMemberEpochUpdated(Optional memberEpoch, String memberId) * @return True if auto-commit is enabled as defined in the config {@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} */ public boolean autoCommitEnabled() { - return autoCommitState.isPresent(); + return autoCommitState.isAutoCommitEnabled(); } /** @@ -609,7 +606,7 @@ public boolean autoCommitEnabled() { * perform no action. */ public void resetAutoCommitTimer() { - autoCommitState.ifPresent(AutoCommitState::resetTimer); + autoCommitState.resetInterval(); } /** @@ -617,7 +614,7 @@ public void resetAutoCommitTimer() { * sent out then. If auto-commit is not enabled this will perform no action. */ public void resetAutoCommitTimer(long retryBackoffMs) { - autoCommitState.ifPresent(s -> s.resetTimer(retryBackoffMs)); + autoCommitState.resetInterval(retryBackoffMs); } /** @@ -1301,59 +1298,6 @@ private void maybeFailOnCoordinatorFatalError() { } } - /** - * Encapsulates the state of auto-committing and manages the auto-commit timer. 
- */ - private static class AutoCommitState { - private final Timer timer; - private final long autoCommitInterval; - private boolean hasInflightCommit; - - private final Logger log; - - public AutoCommitState( - final Time time, - final long autoCommitInterval, - final LogContext logContext) { - this.autoCommitInterval = autoCommitInterval; - this.timer = time.timer(autoCommitInterval); - this.hasInflightCommit = false; - this.log = logContext.logger(getClass()); - } - - public boolean shouldAutoCommit() { - if (!this.timer.isExpired()) { - return false; - } - if (this.hasInflightCommit) { - log.trace("Skipping auto-commit on the interval because a previous one is still in-flight."); - return false; - } - return true; - } - - public void resetTimer() { - this.timer.reset(autoCommitInterval); - } - - public void resetTimer(long retryBackoffMs) { - this.timer.reset(retryBackoffMs); - } - - public long remainingMs(final long currentTimeMs) { - this.timer.update(currentTimeMs); - return this.timer.remainingMs(); - } - - public void updateTimer(final long currentTimeMs) { - this.timer.update(currentTimeMs); - } - - public void setInflightCommitStatus(final boolean inflightCommitStatus) { - this.hasInflightCommit = inflightCommitStatus; - } - } - static class MemberInfo { String memberId = ""; Optional memberEpoch = Optional.empty(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java index e07424a63938e..3c9c76cdd149d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java @@ -151,7 +151,7 @@ public ConsumerMembershipManager(String groupId, BackgroundEventHandler backgroundEventHandler, Time time, Metrics metrics, - boolean autoCommitEnabled) { + 
ThreadSafeConsumerState threadSafeConsumerState) { this(groupId, groupInstanceId, rackId, @@ -164,7 +164,7 @@ public ConsumerMembershipManager(String groupId, backgroundEventHandler, time, new ConsumerRebalanceMetricsManager(metrics), - autoCommitEnabled); + threadSafeConsumerState); } // Visible for testing @@ -180,14 +180,14 @@ public ConsumerMembershipManager(String groupId, BackgroundEventHandler backgroundEventHandler, Time time, RebalanceMetricsManager metricsManager, - boolean autoCommitEnabled) { + ThreadSafeConsumerState threadSafeConsumerState) { super(groupId, subscriptions, metadata, logContext.logger(ConsumerMembershipManager.class), time, metricsManager, - autoCommitEnabled); + threadSafeConsumerState); this.groupInstanceId = groupInstanceId; this.rackId = rackId; this.rebalanceTimeoutMs = rebalanceTimeoutMs; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index d2d178a88c38b..5f2e1dbadfea2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -63,6 +63,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { private final Supplier networkClientDelegateSupplier; private final Supplier requestManagersSupplier; private final AsyncConsumerMetrics asyncConsumerMetrics; + private final ThreadSafeExceptionReference metadataError; private ApplicationEventProcessor applicationEventProcessor; private NetworkClientDelegate networkClientDelegate; private RequestManagers requestManagers; @@ -79,7 +80,8 @@ public ConsumerNetworkThread(LogContext logContext, Supplier applicationEventProcessorSupplier, Supplier networkClientDelegateSupplier, Supplier requestManagersSupplier, - AsyncConsumerMetrics asyncConsumerMetrics) { + 
AsyncConsumerMetrics asyncConsumerMetrics, + ThreadSafeConsumerState threadSafeConsumerState) { super(BACKGROUND_THREAD_NAME, true); this.time = time; this.log = logContext.logger(getClass()); @@ -90,6 +92,7 @@ public ConsumerNetworkThread(LogContext logContext, this.requestManagersSupplier = requestManagersSupplier; this.running = true; this.asyncConsumerMetrics = asyncConsumerMetrics; + this.metadataError = threadSafeConsumerState.metadataError(); } @Override @@ -378,7 +381,7 @@ private void maybeFailOnMetadataError(List> events) { if (subscriptionMetadataEvent.isEmpty()) return; - networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError -> + metadataError.getClearAndRun(metadataError -> subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError)) ); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 3c280e39d0279..194f4b1a957d6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -70,7 +70,7 @@ public class NetworkClientDelegate implements AutoCloseable { private final int requestTimeoutMs; private final Queue unsentRequests; private final long retryBackoffMs; - private Optional metadataError; + private final ThreadSafeExceptionReference metadataError; private final boolean notifyMetadataErrorsViaErrorQueue; private final AsyncConsumerMetrics asyncConsumerMetrics; @@ -82,7 +82,8 @@ public NetworkClientDelegate( final Metadata metadata, final BackgroundEventHandler backgroundEventHandler, final boolean notifyMetadataErrorsViaErrorQueue, - final AsyncConsumerMetrics asyncConsumerMetrics) { + final AsyncConsumerMetrics asyncConsumerMetrics, + final ThreadSafeConsumerState threadSafeConsumerState) { this.time 
= time; this.client = client; this.metadata = metadata; @@ -91,7 +92,7 @@ public NetworkClientDelegate( this.unsentRequests = new ArrayDeque<>(); this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); - this.metadataError = Optional.empty(); + this.metadataError = threadSafeConsumerState.metadataError(); this.notifyMetadataErrorsViaErrorQueue = notifyMetadataErrorsViaErrorQueue; this.asyncConsumerMetrics = asyncConsumerMetrics; } @@ -163,7 +164,7 @@ private void maybePropagateMetadataError() { if (notifyMetadataErrorsViaErrorQueue) { backgroundEventHandler.add(new ErrorEvent(e)); } else { - metadataError = Optional.of(e); + metadataError.set(e); } } } @@ -247,12 +248,6 @@ private ClientRequest makeClientRequest( unsent.handler ); } - - public Optional getAndClearMetadataError() { - Optional metadataError = this.metadataError; - this.metadataError = Optional.empty(); - return metadataError; - } public Node leastLoadedNode() { return this.client.leastLoadedNode(time.milliseconds()).node(); @@ -453,7 +448,8 @@ public static Supplier supplier(final Time time, final ClientTelemetrySender clientTelemetrySender, final BackgroundEventHandler backgroundEventHandler, final boolean notifyMetadataErrorsViaErrorQueue, - final AsyncConsumerMetrics asyncConsumerMetrics) { + final AsyncConsumerMetrics asyncConsumerMetrics, + final ThreadSafeConsumerState threadSafeConsumerState) { return new CachedSupplier<>() { @Override protected NetworkClientDelegate create() { @@ -467,7 +463,7 @@ protected NetworkClientDelegate create() { metadata, throttleTimeSensor, clientTelemetrySender); - return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics); + return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, 
asyncConsumerMetrics, threadSafeConsumerState); } }; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java index 4c8d10ad323ac..45a4b6b25640e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java @@ -54,7 +54,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -93,12 +92,7 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou private final NetworkClientDelegate networkClientDelegate; private final CommitRequestManager commitRequestManager; private final long defaultApiTimeoutMs; - - /** - * Exception that occurred while updating positions after the triggering event had already - * expired. It will be propagated and cleared on the next call to update fetch positions. 
- */ - private final AtomicReference cachedUpdatePositionsException = new AtomicReference<>(); + private final ThreadSafeExceptionReference positionsUpdateError; /** * This holds the last OffsetFetch request triggered to retrieve committed offsets to update @@ -113,12 +107,12 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState, final ConsumerMetadata metadata, final IsolationLevel isolationLevel, final Time time, - final long retryBackoffMs, final int requestTimeoutMs, final long defaultApiTimeoutMs, final ApiVersions apiVersions, final NetworkClientDelegate networkClientDelegate, final CommitRequestManager commitRequestManager, + final ThreadSafeAsyncConsumerState threadSafeConsumerState, final LogContext logContext) { requireNonNull(subscriptionState); requireNonNull(metadata); @@ -139,13 +133,13 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState, this.defaultApiTimeoutMs = defaultApiTimeoutMs; this.apiVersions = apiVersions; this.networkClientDelegate = networkClientDelegate; - this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, subscriptionState, - time, retryBackoffMs, apiVersions); + this.offsetFetcherUtils = threadSafeConsumerState.offsetFetcherUtils(); // Register the cluster metadata update callback. Note this only relies on the // requestsToRetry initialized above, and won't be invoked until all managers are // initialized and the network thread started. this.metadata.addClusterUpdateListener(this); this.commitRequestManager = commitRequestManager; + this.positionsUpdateError = threadSafeConsumerState.positionsUpdateError(); } private static class PendingFetchCommittedRequest { @@ -231,8 +225,8 @@ public CompletableFuture> fetchO * on {@link SubscriptionState#hasAllFetchPositions()}). It will complete immediately, with true, if all positions * are already available. If some positions are missing, the future will complete once the offsets are retrieved and positions are updated. 
*/ - public CompletableFuture updateFetchPositions(long deadlineMs) { - CompletableFuture result = new CompletableFuture<>(); + public CompletableFuture updateFetchPositions(long deadlineMs) { + CompletableFuture result = new CompletableFuture<>(); try { if (maybeCompleteWithPreviousException(result)) { @@ -243,7 +237,7 @@ public CompletableFuture updateFetchPositions(long deadlineMs) { if (subscriptionState.hasAllFetchPositions()) { // All positions are already available - result.complete(true); + result.complete(null); return result; } @@ -252,7 +246,7 @@ public CompletableFuture updateFetchPositions(long deadlineMs) { if (error != null) { result.completeExceptionally(error); } else { - result.complete(subscriptionState.hasAllFetchPositions()); + result.complete(null); } }); @@ -262,13 +256,8 @@ public CompletableFuture updateFetchPositions(long deadlineMs) { return result; } - private boolean maybeCompleteWithPreviousException(CompletableFuture result) { - Throwable cachedException = cachedUpdatePositionsException.getAndSet(null); - if (cachedException != null) { - result.completeExceptionally(cachedException); - return true; - } - return false; + private boolean maybeCompleteWithPreviousException(CompletableFuture result) { + return positionsUpdateError.getClearAndRun(result::completeExceptionally); } /** @@ -318,7 +307,7 @@ private void cacheExceptionIfEventExpired(CompletableFuture result, long d result.whenComplete((__, error) -> { boolean updatePositionsExpired = time.milliseconds() >= deadlineMs; if (error != null && updatePositionsExpired) { - cachedUpdatePositionsException.set(error); + positionsUpdateError.set(error); } }); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index ae39753f3d8e8..9075108c8c67b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -167,7 +167,8 @@ public static Supplier supplier(final Time time, final Metrics metrics, final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, final MemberStateListener applicationThreadMemberStateListener, - final Optional streamsRebalanceData + final Optional streamsRebalanceData, + final ThreadSafeAsyncConsumerState threadSafeConsumerState ) { return new CachedSupplier<>() { @Override @@ -216,7 +217,8 @@ protected RequestManagers create() { groupRebalanceConfig.groupId, groupRebalanceConfig.groupInstanceId, metrics, - metadata); + metadata, + threadSafeConsumerState); if (streamsRebalanceData.isPresent()) { streamsMembershipManager = new StreamsMembershipManager( groupRebalanceConfig.groupId, @@ -258,7 +260,7 @@ protected RequestManagers create() { backgroundEventHandler, time, metrics, - config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); + threadSafeConsumerState); // Update the group member ID label in the client telemetry reporter. // According to KIP-1082, the consumer will generate the member ID as the incarnation ID of the process. 
@@ -286,12 +288,12 @@ protected RequestManagers create() { metadata, fetchConfig.isolationLevel, time, - retryBackoffMs, requestTimeoutMs, defaultApiTimeoutMs, apiVersions, networkClientDelegate, commitRequestManager, + threadSafeConsumerState, logContext); return new RequestManagers( diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 12b01b5482e32..26e2279bd9b5f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -269,6 +269,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) { // This FetchBuffer is shared between the application and network threads. this.fetchBuffer = new ShareFetchBuffer(logContext); + ThreadSafeConsumerState threadSafeConsumerState = new ThreadSafeShareConsumerState(); final Supplier networkClientDelegateSupplier = NetworkClientDelegate.supplier( time, logContext, @@ -280,7 +281,8 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) { clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), backgroundEventHandler, true, - asyncConsumerMetrics + asyncConsumerMetrics, + threadSafeConsumerState ); this.completedAcknowledgements = new LinkedList<>(); @@ -312,7 +314,8 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) { applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, - asyncConsumerMetrics); + asyncConsumerMetrics, + threadSafeConsumerState); this.backgroundEventProcessor = new BackgroundEventProcessor(); this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext); @@ -383,8 +386,9 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) { this.backgroundEventHandler = new 
BackgroundEventHandler( backgroundEventQueue, time, asyncConsumerMetrics); + ThreadSafeConsumerState threadSafeConsumerState = new ThreadSafeShareConsumerState(); final Supplier networkClientDelegateSupplier = - () -> new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, true, asyncConsumerMetrics); + () -> new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, true, asyncConsumerMetrics, threadSafeConsumerState); GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig( config, @@ -418,7 +422,8 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) { applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, - asyncConsumerMetrics); + asyncConsumerMetrics, + threadSafeConsumerState); this.backgroundEventProcessor = new BackgroundEventProcessor(); this.backgroundEventReaper = new CompletableEventReaper(logContext); @@ -483,7 +488,8 @@ ApplicationEventHandler build( final Supplier applicationEventProcessorSupplier, final Supplier networkClientDelegateSupplier, final Supplier requestManagersSupplier, - final AsyncConsumerMetrics asyncConsumerMetrics + final AsyncConsumerMetrics asyncConsumerMetrics, + final ThreadSafeConsumerState threadSafeConsumerState ); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java index 47ab87edb358d..162326f57801b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java @@ -107,7 +107,7 @@ public ShareMembershipManager(LogContext logContext, logContext.logger(ShareMembershipManager.class), time, metricsManager, - false); + new ThreadSafeShareConsumerState()); this.rackId = rackId; } diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeAsyncConsumerState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeAsyncConsumerState.java new file mode 100644 index 0000000000000..e08629a0ce705 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeAsyncConsumerState.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; + +public class ThreadSafeAsyncConsumerState extends ThreadSafeConsumerState { + + private final SubscriptionState subscriptions; + private final OffsetFetcherUtils offsetFetcherUtils; + private final ThreadSafeAutoCommitState autoCommitState; + + /** + * Exception that occurred while updating positions after the triggering event had already + * expired. It will be propagated and cleared on the next call to update fetch positions. 
+ */ + private final ThreadSafeExceptionReference positionsUpdateError; + + public ThreadSafeAsyncConsumerState(ThreadSafeAutoCommitState autoCommitState, + LogContext logContext, + ConsumerMetadata metadata, + SubscriptionState subscriptions, + Time time, + long retryBackoffMs, + ApiVersions apiVersions) { + this.autoCommitState = autoCommitState; + this.subscriptions = subscriptions; + this.offsetFetcherUtils = new OffsetFetcherUtils( + logContext, + metadata, + subscriptions, + time, + retryBackoffMs, + apiVersions + ); + this.positionsUpdateError = new ThreadSafeExceptionReference(); + } + + public ThreadSafeAutoCommitState autoCommitState() { + return autoCommitState; + } + + OffsetFetcherUtils offsetFetcherUtils() { + return offsetFetcherUtils; + } + + public ThreadSafeExceptionReference positionsUpdateError() { + return positionsUpdateError; + } + + public boolean canSkipUpdateFetchPositions() { + positionsUpdateError.maybeThrowException(); + metadataError.maybeClearAndThrowException(); + + // In cases of metadata updates, getPartitionsToValidate() will review the partitions and + // determine which, if any, need to be validated. If any partitions require validation, the + // update fetch positions step can't be skipped. + if (!offsetFetcherUtils.getPartitionsToValidate().isEmpty()) + return false; + + // If there are no partitions in the AWAIT_RESET, AWAIT_VALIDATION, or INITIALIZING states, it's ok to skip. 
+ return subscriptions.hasAllFetchPositions(); + } + + public boolean canSkipWaitingOnPoll(long currentTimeMs) { + if (reconciliationState.isInProgress()) + return false; + + autoCommitState.updateTimer(currentTimeMs); + return !autoCommitState.shouldAutoCommit(); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeAutoCommitState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeAutoCommitState.java new file mode 100644 index 0000000000000..af450d9a2263f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeAutoCommitState.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +import org.slf4j.Logger; + +import java.time.Duration; + +/** + * Encapsulates the state of auto-committing and manages the auto-commit timer. 
+ */ +public interface ThreadSafeAutoCommitState { + + /** + * @return {@code true} if auto-commit is enabled as defined in the configuration + * {@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG}. + */ + boolean isAutoCommitEnabled(); + + boolean shouldAutoCommit(); + + /** + * Reset the auto-commit timer to the {@link ConsumerConfig#AUTO_COMMIT_INTERVAL_MS_CONFIG auto-commit interval}, + * so that the next auto-commit is sent out on the interval starting from now. If auto-commit is disabled this will + * perform no action. + */ + void resetInterval(); + + /** + * Reset the auto-commit timer to the provided interval, so that the next auto-commit is + * sent out then. If auto-commit is disabled this will perform no action. + */ + void resetInterval(final long interval); + + /** + * Return the number of milliseconds remaining on the timer based on the most previous call to + * {@link #updateTimer(long)} and {@link #resetInterval()}/{@link #resetInterval(long)}. + */ + long remainingMs(); + + /** + * Updates the timer to the timestamp provided. + * + *

+ * + * Note that the timer doesn't update automatically on its own, nor is it updated periodically by the background + * thread. The timer's notion of the current time is only updated through this mechanism. It is expected that + * this method will only be called during {@link AsyncKafkaConsumer#poll(Duration)} invocation by the application + * thread. The network thread is free to update the auto-commit interval via either {@link #resetInterval()} or + * {@link #resetInterval(long)}. + */ + void updateTimer(final long currentTimeMs); + + void setInflightCommit(final boolean hasInflightCommit); + + static ThreadSafeAutoCommitState fromConfig(final LogContext logContext, + final ConsumerConfig config, + final Time time) { + if (config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { + final long interval = Integer.toUnsignedLong(config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); + return new AutoCommitEnabled(logContext, time, interval); + } else { + return AUTO_COMMIT_DISABLED; + } + } + + class AutoCommitEnabled implements ThreadSafeAutoCommitState { + + private final Logger log; + private final Timer timer; + private final long autoCommitInterval; + private boolean hasInflightCommit; + + public AutoCommitEnabled(final LogContext logContext, + final Time time, + final long autoCommitInterval) { + this.log = logContext.logger(ThreadSafeAutoCommitState.class); + this.timer = time.timer(autoCommitInterval); + this.autoCommitInterval = autoCommitInterval; + this.hasInflightCommit = false; + } + + @Override + public boolean isAutoCommitEnabled() { + return true; + } + + @Override + public synchronized boolean shouldAutoCommit() { + if (!timer.isExpired()) { + return false; + } + + if (hasInflightCommit) { + log.trace("Skipping auto-commit on the interval because a previous one is still in-flight."); + return false; + } + + return true; + } + + @Override + public synchronized void resetInterval() { + timer.reset(autoCommitInterval); + } + + @Override 
+ public synchronized void resetInterval(final long interval) { + timer.reset(interval); + } + + @Override + public synchronized long remainingMs() { + return timer.remainingMs(); + } + + @Override + public synchronized void updateTimer(final long currentTimeMs) { + timer.update(currentTimeMs); + } + + @Override + public synchronized void setInflightCommit(final boolean hasInflightCommit) { + this.hasInflightCommit = hasInflightCommit; + } + } + + ThreadSafeAutoCommitState AUTO_COMMIT_DISABLED = new ThreadSafeAutoCommitState() { + @Override + public boolean isAutoCommitEnabled() { + return false; + } + + @Override + public boolean shouldAutoCommit() { + return false; + } + + @Override + public void resetInterval() { + // No op + } + + @Override + public void resetInterval(final long interval) { + // No op + } + + @Override + public long remainingMs() { + return Long.MAX_VALUE; + } + + @Override + public void updateTimer(final long currentTimeMs) { + // No op + } + + @Override + public void setInflightCommit(final boolean inflightCommitStatus) { + // No op + } + }; +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeConsumerState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeConsumerState.java new file mode 100644 index 0000000000000..df2baf3d06886 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeConsumerState.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +public abstract class ThreadSafeConsumerState { + + protected final ThreadSafeExceptionReference metadataError; + + protected final ThreadSafeReconciliationState reconciliationState; + + protected ThreadSafeConsumerState() { + this.reconciliationState = new ThreadSafeReconciliationState(); + this.metadataError = new ThreadSafeExceptionReference(); + } + + public abstract ThreadSafeAutoCommitState autoCommitState(); + + public ThreadSafeExceptionReference metadataError() { + return metadataError; + } + + public ThreadSafeReconciliationState reconciliationState() { + return reconciliationState; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeExceptionReference.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeExceptionReference.java new file mode 100644 index 0000000000000..2e0df0cdd490f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeExceptionReference.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.KafkaException; + +import java.util.concurrent.CompletionException; +import java.util.function.Consumer; + +/** + * {@code ThreadSafeExceptionReference} builds on top of {@link ThreadSafeReference} both to be more explicit + * about the contents and to provide utility methods. + */ +public class ThreadSafeExceptionReference extends ThreadSafeReference { + + private static final Consumer THROW_EXCEPTION = exception -> { + // Unwrap the ExecutionException to model what ConsumerUtils.getResult() does when handling exceptions + // from the call to Future.get(). + if (exception instanceof CompletionException) + exception = exception.getCause(); + + throw ConsumerUtils.maybeWrapAsKafkaException(exception); + }; + + /** + * If the underlying error is present, this will throw the error and clear it. + * + *

+ * + * Note: if the exception is wrapped in a {@link CompletionException}, it will be unwrapped. However, if + * the underlying error is not a subclass of {@link KafkaException}, it will be wrapped as such + * so that it is an unchecked exception. + */ + public void maybeClearAndThrowException() { + getClearAndRun(THROW_EXCEPTION); + } + + /** + * If the underlying error is present, this will throw the error. + * + *

+ * + * Note: if the exception is wrapped in a {@link CompletionException}, it will be unwrapped. However, if + * the underlying error is not a subclass of {@link KafkaException}, it will be wrapped as such + * so that it is an unchecked exception. + */ + public void maybeThrowException() { + ifPresent(THROW_EXCEPTION); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeReconciliationState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeReconciliationState.java new file mode 100644 index 0000000000000..b2d7d36c3f210 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeReconciliationState.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.events.PollEvent; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This class is constructed from within the {@link ThreadSafeConsumerState} instance, which means it's available + * for both the application and network threads to use. 
The main user is the {@link AbstractMembershipManager} for + * mutations and the {@link ThreadSafeConsumerState#canSkipWaitingOnPoll(long)} method for determining if the costly + * {@link PollEvent} can be sent in the background or not. + * + *

+ * + * Yes, this class is a wrapper around a simple {@link AtomicBoolean}, but the intention behind dedicating a class + * to it hopefully makes the shared nature and its purpose more apparent. + */ +public class ThreadSafeReconciliationState { + + private final AtomicBoolean value; + + public ThreadSafeReconciliationState() { + this(false); + } + + public ThreadSafeReconciliationState(boolean value) { + this.value = new AtomicBoolean(value); + } + + public boolean isInProgress() { + return value.get(); + } + + public void setInProgress(boolean value) { + this.value.set(value); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeReference.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeReference.java new file mode 100644 index 0000000000000..d5d4bfd59a6a7 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeReference.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +/** + * {@code ThreadSafeReference} serves as a thread-safe primitive around an object reference that provides + * utility methods for more ergonomic use. + */ +public class ThreadSafeReference { + + private final AtomicReference reference = new AtomicReference<>(); + + /** + * Thin wrapper around {@link AtomicReference#get()} that provides a null-safe API via {@link Optional}. + */ + public Optional get() { + return Optional.ofNullable(reference.get()); + } + + /** + * If the underlying reference is nonnull, the given {@link Consumer action} is invoked. + */ + public void ifPresent(Consumer action) { + get().ifPresent(action); + } + + /** + * Thin wrapper around {@link AtomicReference#getAndSet(Object)} that provides a null-safe API via {@link Optional}. + */ + public Optional getAndClear() { + return Optional.ofNullable(reference.getAndSet(null)); + } + + /** + * Wrapper around {@link #getAndClear()} and {@link Optional#ifPresent(Consumer)} that retrieves and clears out + * the underlying reference in a single, atomic operation, and then invokes the given {@link Consumer} if the + * value was present. Lastly, it returns the present/empty flag so that the caller can short-circuit with less + * boilerplate. 
+ */ + public boolean getClearAndRun(Consumer action) { + Optional value = getAndClear(); + + if (value.isPresent()) { + action.accept(value.get()); + return true; + } else { + return false; + } + } + + public void set(T value) { + reference.set(value); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeShareConsumerState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeShareConsumerState.java new file mode 100644 index 0000000000000..d471a4f816d54 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ThreadSafeShareConsumerState.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ShareConsumer; + +/** + * This class stores shared state needed by both the application thread ({@link AsyncKafkaConsumer}) and the + * network thread ({@link ConsumerNetworkThread}) for the {@link ShareConsumer}. 
+ */ +public class ThreadSafeShareConsumerState extends ThreadSafeConsumerState { + + @Override + public ThreadSafeAutoCommitState autoCommitState() { + return ThreadSafeAutoCommitState.AUTO_COMMIT_DISABLED; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java index 6ab827b617c19..1123b9b54f35e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerUtils; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate; import org.apache.kafka.clients.consumer.internals.RequestManagers; +import org.apache.kafka.clients.consumer.internals.ThreadSafeConsumerState; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.internals.IdempotentCloser; @@ -56,7 +57,8 @@ public ApplicationEventHandler(final LogContext logContext, final Supplier applicationEventProcessorSupplier, final Supplier networkClientDelegateSupplier, final Supplier requestManagersSupplier, - final AsyncConsumerMetrics asyncConsumerMetrics) { + final AsyncConsumerMetrics asyncConsumerMetrics, + final ThreadSafeConsumerState threadSafeConsumerState) { this.log = logContext.logger(ApplicationEventHandler.class); this.time = time; this.applicationEventQueue = applicationEventQueue; @@ -68,7 +70,9 @@ public ApplicationEventHandler(final LogContext logContext, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, - asyncConsumerMetrics); + asyncConsumerMetrics, + threadSafeConsumerState + ); this.networkThread.start(); } diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 853c5484df5be..e6ad32777157f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -457,7 +457,7 @@ private void process(final ResetOffsetEvent event) { * them to update positions in the subscription state. */ private void process(final CheckAndUpdatePositionsEvent event) { - CompletableFuture future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs()); + CompletableFuture future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs()); future.whenComplete(complete(event.future())); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java index 5f1ced33e3a09..efd4450fb2a35 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.TopicPartition; @@ -27,10 +28,11 @@ * offsets and update positions when it gets them. This will first attempt to use the committed offsets if available. If * no committed offsets available, it will use the partition offsets retrieved from the leader. *

- * The event completes with a boolean indicating if all assigned partitions have valid fetch positions - * (based on {@link SubscriptionState#hasAllFetchPositions()}). + * The event completes when {@link OffsetsRequestManager} has completed its attempt to update the positions. There + * is no guarantee that {@link SubscriptionState#hasAllFetchPositions()} will return {@code true} just because the + * event has completed. */ -public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent { +public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent { public CheckAndUpdatePositionsEvent(long deadlineMs) { super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java index 402697227ee80..3d769c0e321eb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java @@ -44,6 +44,7 @@ public class ApplicationEventHandlerTest { private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class); private final RequestManagers requestManagers = mock(RequestManagers.class); private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); + private final ThreadSafeConsumerState threadSafeConsumerState = mock(ThreadSafeConsumerState.class); @ParameterizedTest @MethodSource("org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetricsTest#groupNameProvider") @@ -58,7 +59,8 @@ public void testRecordApplicationEventQueueSize(String groupName) { () -> applicationEventProcessor, () -> networkClientDelegate, () -> requestManagers, - asyncConsumerMetrics + asyncConsumerMetrics, + threadSafeConsumerState )) { // add event 
applicationEventHandler.add(new PollEvent(time.milliseconds())); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 9aef4cf7f5293..4b7f9d947e91d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.Metadata.LeaderAndEpoch; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NodeApiVersions; @@ -138,6 +139,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED; +import static org.apache.kafka.clients.consumer.internals.ThreadSafeAutoCommitState.AUTO_COMMIT_DISABLED; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.test.TestUtils.requiredConsumerConfig; @@ -223,7 +225,7 @@ private AsyncKafkaConsumer newConsumerWithStreamRebalanceData( new StringDeserializer(), new StringDeserializer(), time, - (logContext, time, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler, + (logContext, time, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics, metadataError) -> 
applicationEventHandler, logContext -> backgroundEventReaper, (logContext, consumerMetadata, subscriptionState, fetchConfig, deserializers, fetchMetricsManager, time) -> fetchCollector, (consumerConfig, subscriptionState, logContext, clusterResourceListeners) -> metadata, @@ -238,7 +240,7 @@ private AsyncKafkaConsumer newConsumer(ConsumerConfig config) { new StringDeserializer(), new StringDeserializer(), time, - (logContext, time, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler, + (logContext, time, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics, metadataError) -> applicationEventHandler, logContext -> backgroundEventReaper, (logContext, consumerMetadata, subscriptionState, fetchConfig, deserializers, fetchMetricsManager, time) -> fetchCollector, (consumerConfig, subscriptionState, logContext, clusterResourceListeners) -> metadata, @@ -274,7 +276,7 @@ private AsyncKafkaConsumer newConsumer( requestTimeoutMs, defaultApiTimeoutMs, "group-id", - false); + new ThreadSafeAsyncConsumerState(AUTO_COMMIT_DISABLED, new LogContext(), metadata, subscriptions, time, retryBackoffMs, new ApiVersions())); } @Test @@ -417,7 +419,7 @@ public void testWakeupBeforeCallingPoll() { final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(null); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(tp)); @@ -439,7 +441,7 
@@ public void testWakeupAfterEmptyFetch() { consumer.wakeup(); return Fetch.empty(); }).doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(null); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(tp)); @@ -463,7 +465,7 @@ public void testWakeupAfterNonEmptyFetch() { consumer.wakeup(); return Fetch.forPartition(tp, records, true, new OffsetAndMetadata(4, Optional.of(0), "")); }).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(null); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(tp)); @@ -482,7 +484,7 @@ public void testCommitInRebalanceCallback() { final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(null); SortedSet sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); sortedPartitions.add(tp); CompletableBackgroundEvent e = new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, sortedPartitions); @@ -522,7 +524,7 @@ public void testClearWakeupTriggerAfterPoll() { ); doReturn(Fetch.forPartition(tp, records, true, new OffsetAndMetadata(4, Optional.of(0), 
""))) .when(fetchCollector).collectFetch(any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(null); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(tp)); @@ -668,7 +670,8 @@ public void testEnsurePollExecutedCommitAsyncCallbacks() { MockCommitCallback callback = new MockCommitCallback(); completeCommitAsyncApplicationEventSuccessfully(); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); + when(metadata.updateVersion()).thenReturn(-1); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(null); completeAssignmentChangeEventSuccessfully(); consumer.assign(Collections.singleton(new TopicPartition("foo", 0))); @@ -1338,6 +1341,7 @@ private MemberStateListener captureGroupMetadataUpdateListener(final MockedStati any(), any(), applicationThreadMemberStateListener.capture(), + any(), any() )); return applicationThreadMemberStateListener.getValue(); @@ -1409,7 +1413,8 @@ private Optional captureStreamRebalanceData(final MockedSt any(), any(), any(), - streamRebalanceData.capture() + streamRebalanceData.capture(), + any() )); return streamRebalanceData.getValue(); } @@ -1481,7 +1486,7 @@ public void testListenerCallbacksInvoke(List consumer.poll(Duration.ZERO)); } @@ -1641,7 +1646,7 @@ public void testEnsurePollEventSentOnConsumerPoll() { doAnswer(invocation -> Fetch.forPartition(tp, records, true, new OffsetAndMetadata(3, Optional.of(0), ""))) .when(fetchCollector) .collectFetch(Mockito.any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); + 
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(null); completeTopicSubscriptionChangeEventSuccessfully(); consumer.subscribe(singletonList("topic1")); @@ -1660,7 +1665,8 @@ private Properties requiredConsumerConfigAndGroupId(final String groupId) { private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout() { completeFetchedCommittedOffsetApplicationEventExceptionally(new TimeoutException()); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); + when(metadata.updateVersion()).thenReturn(-1); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(null); completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(new TopicPartition("t1", 1))); @@ -1699,7 +1705,7 @@ public void testLongPollWaitIsLimited() { }).doAnswer(invocation -> Fetch.forPartition(tp, records, true, nextOffsetAndMetadata) ).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(null); markReconcileAndAutoCommitCompleteForPollEvent(); // And then poll for up to 10000ms, which should return 2 records without timing out @@ -1797,7 +1803,7 @@ public void testPollThrowsInterruptExceptionIfInterrupted() { final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(null); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); 
completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(tp)); @@ -1836,7 +1842,7 @@ void testReaperInvokedInPoll() { doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); completeTopicSubscriptionChangeEventSuccessfully(); consumer.subscribe(Collections.singletonList("topic")); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(null); markReconcileAndAutoCommitCompleteForPollEvent(); consumer.poll(Duration.ZERO); verify(backgroundEventReaper).reap(time.milliseconds()); @@ -1893,7 +1899,7 @@ public void testSeekToEnd() { public void testUpdatePatternSubscriptionEventGeneratedOnlyIfPatternUsed() { consumer = newConsumer(); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(null); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); completeTopicPatternSubscriptionChangeEventSuccessfully(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 26d39715d27ad..b48459722ab4f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.consumer.CommitFailedException; import 
org.apache.kafka.clients.consumer.ConsumerConfig; @@ -138,7 +139,19 @@ public void setup() { @Test public void testOffsetFetchRequestStateToStringBase() { ConsumerConfig config = mock(ConsumerConfig.class); - + ThreadSafeConsumerState threadSafeConsumerState = new ThreadSafeAsyncConsumerState( + ThreadSafeAutoCommitState.fromConfig( + logContext, + config, + time + ), + logContext, + metadata, + subscriptionState, + time, + retryBackoffMs, + new ApiVersions() + ); CommitRequestManager commitRequestManager = new CommitRequestManager( time, logContext, @@ -152,7 +165,8 @@ public void testOffsetFetchRequestStateToStringBase() { retryBackoffMaxMs, OptionalDouble.of(0), metrics, - metadata); + metadata, + threadSafeConsumerState); commitRequestManager.onMemberEpochUpdated(Optional.of(1), Uuid.randomUuid().toString()); Set requestedPartitions = Collections.singleton(new TopicPartition("topic-1", 1)); @@ -1569,11 +1583,25 @@ private CommitRequestManager create(final boolean autoCommitEnabled, final long if (autoCommitEnabled) props.setProperty(GROUP_ID_CONFIG, TestUtils.randomString(10)); + ConsumerConfig config = new ConsumerConfig(props); + ThreadSafeConsumerState threadSafeConsumerState = new ThreadSafeAsyncConsumerState( + ThreadSafeAutoCommitState.fromConfig( + logContext, + config, + time + ), + logContext, + metadata, + subscriptionState, + time, + retryBackoffMs, + new ApiVersions() + ); return spy(new CommitRequestManager( this.time, this.logContext, this.subscriptionState, - new ConsumerConfig(props), + config, this.coordinatorRequestManager, this.offsetCommitCallbackInvoker, DEFAULT_GROUP_ID, @@ -1582,7 +1610,8 @@ private CommitRequestManager create(final boolean autoCommitEnabled, final long retryBackoffMaxMs, OptionalDouble.of(0), metrics, - metadata)); + metadata, + threadSafeConsumerState)); } private ClientResponse buildOffsetFetchClientResponse( diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java index aa8c7bdb1dcae..739c2a2092dc5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.consumer.CloseOptions; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; @@ -141,10 +142,23 @@ private ConsumerMembershipManager createMembershipManagerJoiningGroup(String gro } private ConsumerMembershipManager createMembershipManager(String groupInstanceId) { + ThreadSafeConsumerState threadSafeConsumerState = new ThreadSafeAsyncConsumerState( + new ThreadSafeAutoCommitState.AutoCommitEnabled( + LOG_CONTEXT, + time, + 1000 + ), + LOG_CONTEXT, + metadata, + subscriptionState, + time, + 1000, + new ApiVersions() + ); ConsumerMembershipManager manager = spy(new ConsumerMembershipManager( GROUP_ID, Optional.ofNullable(groupInstanceId), Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(), subscriptionState, commitRequestManager, metadata, LOG_CONTEXT, - backgroundEventHandler, time, rebalanceMetricsManager, true)); + backgroundEventHandler, time, rebalanceMetricsManager, threadSafeConsumerState)); assertMemberIdIsGenerated(manager.memberId()); return manager; } @@ -154,10 +168,14 @@ private ConsumerMembershipManager createMembershipManagerJoiningGroup( String serverAssignor, String rackId ) { + ThreadSafeConsumerState threadSafeConsumerState = new ThreadSafeAsyncConsumerState( + new ThreadSafeAutoCommitState.AutoCommitEnabled(LOG_CONTEXT, time, 1000), + LOG_CONTEXT, 
metadata, subscriptionState, time, 1000, new ApiVersions() + ); ConsumerMembershipManager manager = spy(new ConsumerMembershipManager( GROUP_ID, Optional.ofNullable(groupInstanceId), Optional.ofNullable(rackId), REBALANCE_TIMEOUT, Optional.ofNullable(serverAssignor), subscriptionState, commitRequestManager, - metadata, LOG_CONTEXT, backgroundEventHandler, time, rebalanceMetricsManager, true)); + metadata, LOG_CONTEXT, backgroundEventHandler, time, rebalanceMetricsManager, threadSafeConsumerState)); assertMemberIdIsGenerated(manager.memberId()); manager.transitionToJoining(); return manager; @@ -242,10 +260,14 @@ public void testTransitionToFatal() { @Test public void testTransitionToFailedWhenTryingToJoin() { + ThreadSafeConsumerState threadSafeConsumerState = new ThreadSafeAsyncConsumerState( + new ThreadSafeAutoCommitState.AutoCommitEnabled(LOG_CONTEXT, time, 1000), + LOG_CONTEXT, metadata, subscriptionState, time, 1000, new ApiVersions() + ); ConsumerMembershipManager membershipManager = new ConsumerMembershipManager( GROUP_ID, Optional.empty(), Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(), subscriptionState, commitRequestManager, metadata, LOG_CONTEXT, - backgroundEventHandler, time, rebalanceMetricsManager, true); + backgroundEventHandler, time, rebalanceMetricsManager, threadSafeConsumerState); assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); membershipManager.transitionToJoining(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java index 35ccb17dfab43..e8835062829dd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java @@ -68,6 +68,7 @@ public class ConsumerNetworkThreadTest { private final RequestManagers 
requestManagers; private final CompletableEventReaper applicationEventReaper; private final AsyncConsumerMetrics asyncConsumerMetrics; + private final ThreadSafeConsumerState threadSafeConsumerState; ConsumerNetworkThreadTest() { this.networkClientDelegate = mock(NetworkClientDelegate.class); @@ -80,6 +81,7 @@ public class ConsumerNetworkThreadTest { this.time = new MockTime(); this.applicationEventQueue = new LinkedBlockingQueue<>(); this.asyncConsumerMetrics = mock(AsyncConsumerMetrics.class); + this.threadSafeConsumerState = mock(ThreadSafeConsumerState.class); LogContext logContext = new LogContext(); this.consumerNetworkThread = new ConsumerNetworkThread( @@ -90,7 +92,8 @@ public class ConsumerNetworkThreadTest { () -> applicationEventProcessor, () -> networkClientDelegate, () -> requestManagers, - asyncConsumerMetrics + asyncConsumerMetrics, + threadSafeConsumerState ); } @@ -219,7 +222,8 @@ public void testRunOnceRecordTimeBetweenNetworkThreadPoll(String groupName) { () -> applicationEventProcessor, () -> networkClientDelegate, () -> requestManagers, - asyncConsumerMetrics + asyncConsumerMetrics, + threadSafeConsumerState )) { consumerNetworkThread.initializeResources(); @@ -254,7 +258,8 @@ public void testRunOnceRecordApplicationEventQueueSizeAndApplicationEventQueueTi () -> applicationEventProcessor, () -> networkClientDelegate, () -> requestManagers, - asyncConsumerMetrics + asyncConsumerMetrics, + threadSafeConsumerState )) { consumerNetworkThread.initializeResources(); @@ -332,7 +337,8 @@ private void testInitializeResourcesError(Supplier networ () -> applicationEventProcessor, networkClientDelegateSupplier, requestManagersSupplier, - asyncConsumerMetrics + asyncConsumerMetrics, + threadSafeConsumerState )) { assertThrows(KafkaException.class, thread::initializeResources, "initializeResources should fail because one or more Supplier throws an error on get()"); assertDoesNotThrow(thread::cleanup, "cleanup() should not cause an error because all 
references are checked before use"); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index f806ab65b6b65..c971ead835368 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -4214,7 +4214,7 @@ public TestableNetworkClientDelegate(Time time, Metadata metadata, BackgroundEventHandler backgroundEventHandler, boolean notifyMetadataErrorsViaErrorQueue) { - super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class)); + super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class), mock(ThreadSafeAsyncConsumerState.class)); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java index 0347423137b57..f0542b91b5435 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java @@ -74,13 +74,18 @@ public class NetworkClientDelegateTest { private MockClient client; private Metadata metadata; private BackgroundEventHandler backgroundEventHandler; + private ThreadSafeConsumerState threadSafeConsumerState; @BeforeEach public void setup() { this.time = new MockTime(0); this.metadata = mock(Metadata.class); this.backgroundEventHandler = mock(BackgroundEventHandler.class); + this.threadSafeConsumerState = mock(ThreadSafeConsumerState.class); this.client = new MockClient(time, 
Collections.singletonList(mockNode())); + + ThreadSafeExceptionReference metadataError = new ThreadSafeExceptionReference(); + when(this.threadSafeConsumerState.metadataError()).thenReturn(metadataError); } @Test @@ -220,10 +225,11 @@ public void testPropagateMetadataError() { doThrow(authException).when(metadata).maybeThrowAnyException(); NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(false); - assertTrue(networkClientDelegate.getAndClearMetadataError().isEmpty()); + ThreadSafeExceptionReference metadataErrorRef = threadSafeConsumerState.metadataError(); + assertTrue(metadataErrorRef.getAndClear().isEmpty()); networkClientDelegate.poll(0, time.milliseconds()); - Optional metadataError = networkClientDelegate.getAndClearMetadataError(); + Optional metadataError = metadataErrorRef.getAndClear(); assertTrue(metadataError.isPresent()); assertInstanceOf(AuthenticationException.class, metadataError.get()); assertEquals(authException.getMessage(), metadataError.get().getMessage()); @@ -301,7 +307,8 @@ public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErro this.metadata, this.backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, - asyncConsumerMetrics + asyncConsumerMetrics, + threadSafeConsumerState ); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java index ed96b81790002..bfea920bfcd1e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java @@ -112,12 +112,15 @@ public void setup() { metadata, DEFAULT_ISOLATION_LEVEL, time, - RETRY_BACKOFF_MS, REQUEST_TIMEOUT_MS, DEFAULT_API_TIMEOUT_MS, apiVersions, mock(NetworkClientDelegate.class), commitRequestManager, + new ThreadSafeAsyncConsumerState( + new 
ThreadSafeAutoCommitState.AutoCommitEnabled(logContext, time, 1000), + logContext, metadata, subscriptionState, time, RETRY_BACKOFF_MS, apiVersions + ), logContext ); } @@ -676,7 +679,7 @@ public void testUpdatePositionsWithCommittedOffsets() { // Call to updateFetchPositions. Should send an OffsetFetch request and use the response to set positions CompletableFuture> fetchResult = new CompletableFuture<>(); when(commitRequestManager.fetchOffsets(initPartitions1, internalFetchCommittedTimeout)).thenReturn(fetchResult); - CompletableFuture updatePositions1 = requestManager.updateFetchPositions(time.milliseconds()); + CompletableFuture updatePositions1 = requestManager.updateFetchPositions(time.milliseconds()); assertFalse(updatePositions1.isDone(), "Update positions should wait for the OffsetFetch request"); verify(commitRequestManager).fetchOffsets(initPartitions1, internalFetchCommittedTimeout); @@ -705,13 +708,13 @@ public void testUpdatePositionsWithCommittedOffsetsReusesRequest() { // call to updateFetchPositions. 
Should send an OffsetFetch request CompletableFuture> fetchResult = new CompletableFuture<>(); when(commitRequestManager.fetchOffsets(initPartitions1, internalFetchCommittedTimeout)).thenReturn(fetchResult); - CompletableFuture updatePositions1 = requestManager.updateFetchPositions(time.milliseconds()); + CompletableFuture updatePositions1 = requestManager.updateFetchPositions(time.milliseconds()); assertFalse(updatePositions1.isDone(), "Update positions should wait for the OffsetFetch request"); verify(commitRequestManager).fetchOffsets(initPartitions1, internalFetchCommittedTimeout); clearInvocations(commitRequestManager); // Call to updateFetchPositions again with the same set of initializing partitions should reuse request - CompletableFuture updatePositions2 = requestManager.updateFetchPositions(time.milliseconds()); + CompletableFuture updatePositions2 = requestManager.updateFetchPositions(time.milliseconds()); verify(commitRequestManager, never()).fetchOffsets(initPartitions1, internalFetchCommittedTimeout); // Receive response with committed offsets, should complete both calls @@ -738,7 +741,7 @@ public void testUpdatePositionsDoesNotApplyOffsetsIfPartitionNotInitializingAnym // call to updateFetchPositions will trigger an OffsetFetch request for tp1 (won't complete just yet) CompletableFuture> fetchResult = new CompletableFuture<>(); when(commitRequestManager.fetchOffsets(initPartitions1, internalFetchCommittedTimeout)).thenReturn(fetchResult); - CompletableFuture updatePositions1 = requestManager.updateFetchPositions(time.milliseconds()); + CompletableFuture updatePositions1 = requestManager.updateFetchPositions(time.milliseconds()); assertFalse(updatePositions1.isDone()); verify(commitRequestManager).fetchOffsets(initPartitions1, internalFetchCommittedTimeout); clearInvocations(commitRequestManager); @@ -766,7 +769,7 @@ public void testUpdatePositionsDoesNotResetPositionBeforeRetrievingOffsetsForNew // call to updateFetchPositions will trigger an 
OffsetFetch request for tp1 (won't complete just yet) CompletableFuture> fetchResult = new CompletableFuture<>(); when(commitRequestManager.fetchOffsets(initPartitions1, internalFetchCommittedTimeout)).thenReturn(fetchResult); - CompletableFuture updatePositions1 = requestManager.updateFetchPositions(time.milliseconds()); + CompletableFuture updatePositions1 = requestManager.updateFetchPositions(time.milliseconds()); assertFalse(updatePositions1.isDone()); verify(commitRequestManager).fetchOffsets(initPartitions1, internalFetchCommittedTimeout); clearInvocations(commitRequestManager); @@ -801,12 +804,15 @@ public void testRemoteListOffsetsRequestTimeoutMs() { metadata, DEFAULT_ISOLATION_LEVEL, time, - RETRY_BACKOFF_MS, requestTimeoutMs, defaultApiTimeoutMs, apiVersions, mock(NetworkClientDelegate.class), commitRequestManager, + new ThreadSafeAsyncConsumerState( + new ThreadSafeAutoCommitState.AutoCommitEnabled(new LogContext(), time, 1000), + new LogContext(), metadata, subscriptionState, time, RETRY_BACKOFF_MS, apiVersions + ), new LogContext() ); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java index 67628c513406a..2630e18b58f75 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java @@ -31,6 +31,7 @@ import java.util.Properties; import java.util.UUID; +import static org.apache.kafka.clients.consumer.internals.ThreadSafeAutoCommitState.AUTO_COMMIT_DISABLED; import static org.apache.kafka.test.TestUtils.requiredConsumerConfig; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -50,23 +51,30 @@ public void testMemberStateListenerRegistered() { config, GroupRebalanceConfig.ProtocolType.CONSUMER ); + 
LogContext logContext = new LogContext(); + MockTime time = new MockTime(); + ConsumerMetadata metadata = mock(ConsumerMetadata.class); + SubscriptionState subscriptions = mock(SubscriptionState.class); + ApiVersions apiVersions = mock(ApiVersions.class); + long retryBackoffMs = 1000L; final RequestManagers requestManagers = RequestManagers.supplier( - new MockTime(), - new LogContext(), + time, + logContext, mock(BackgroundEventHandler.class), - mock(ConsumerMetadata.class), - mock(SubscriptionState.class), + metadata, + subscriptions, mock(FetchBuffer.class), config, groupRebalanceConfig, - mock(ApiVersions.class), + apiVersions, mock(FetchMetricsManager.class), () -> mock(NetworkClientDelegate.class), Optional.empty(), new Metrics(), mock(OffsetCommitCallbackInvoker.class), listener, - Optional.empty() + Optional.empty(), + new ThreadSafeAsyncConsumerState(AUTO_COMMIT_DISABLED, logContext, metadata, subscriptions, time, retryBackoffMs, apiVersions) ).get(); assertTrue(requestManagers.consumerMembershipManager.isPresent()); assertTrue(requestManagers.streamsMembershipManager.isEmpty()); @@ -90,23 +98,30 @@ public void testStreamMemberStateListenerRegistered() { config, GroupRebalanceConfig.ProtocolType.CONSUMER ); + LogContext logContext = new LogContext(); + MockTime time = new MockTime(); + ConsumerMetadata metadata = mock(ConsumerMetadata.class); + SubscriptionState subscriptions = mock(SubscriptionState.class); + ApiVersions apiVersions = mock(ApiVersions.class); + long retryBackoffMs = 1000L; final RequestManagers requestManagers = RequestManagers.supplier( - new MockTime(), - new LogContext(), + time, + logContext, mock(BackgroundEventHandler.class), - mock(ConsumerMetadata.class), - mock(SubscriptionState.class), + metadata, + subscriptions, mock(FetchBuffer.class), config, groupRebalanceConfig, - mock(ApiVersions.class), + apiVersions, mock(FetchMetricsManager.class), () -> mock(NetworkClientDelegate.class), Optional.empty(), new Metrics(), 
mock(OffsetCommitCallbackInvoker.class), listener, - Optional.of(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())) + Optional.of(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())), + new ThreadSafeAsyncConsumerState(AUTO_COMMIT_DISABLED, logContext, metadata, subscriptions, time, retryBackoffMs, apiVersions) ).get(); assertTrue(requestManagers.streamsMembershipManager.isPresent()); assertTrue(requestManagers.streamsGroupHeartbeatRequestManager.isPresent()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index a4268b7eca0a7..b4fbca673073f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -2750,10 +2750,10 @@ public TestableNetworkClientDelegate(Time time, ConsumerConfig config, LogContext logContext, KafkaClient client, - Metadata metadata, + ConsumerMetadata metadata, BackgroundEventHandler backgroundEventHandler, boolean notifyMetadataErrorsViaErrorQueue) { - super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class)); + super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class), new ThreadSafeShareConsumerState()); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java index 5dddd0772df2f..d4244fa90d186 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java @@ -141,7 +141,7 @@ private ShareConsumerImpl newConsumer(ConsumerConfig config) { new StringDeserializer(), new StringDeserializer(), time, - (a, b, c, d, e, f, g, h) -> applicationEventHandler, + (a, b, c, d, e, f, g, h, i) -> applicationEventHandler, a -> backgroundEventReaper, (a, b, c, d, e) -> fetchCollector, backgroundEventQueue