From 7efbed4e497bf4943746a2d976336f56f00da07a Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Mon, 21 Oct 2024 17:27:09 +0100 Subject: [PATCH] KAFKA-17545: Removing process fetch queue (#17534) The PR removed the process fetch queue as we have moved to share fetch purgatory. Reviewers: Abhinav Dixit , Jun Rao --- .../server/share/SharePartitionManager.java | 97 +++---------------- .../share/SharePartitionManagerTest.java | 80 --------------- 2 files changed, 15 insertions(+), 162 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 33bd5c376066d..84ccc28565c32 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -69,8 +69,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; import scala.jdk.javaapi.CollectionConverters; @@ -102,16 +100,6 @@ public class SharePartitionManager implements AutoCloseable { */ private final ShareSessionCache cache; - /** - * The fetch queue stores the share fetch requests that are waiting to be processed. - */ - private final ConcurrentLinkedQueue fetchQueue; - - /** - * The process fetch queue lock is used to ensure that only one thread is processing the fetch queue at a time. - */ - private final AtomicBoolean processFetchQueueLock; - /** * The group config manager is used to retrieve the values for dynamic group configurations */ @@ -184,30 +172,27 @@ private SharePartitionManager( GroupConfigManager groupConfigManager, Metrics metrics ) { - this.replicaManager = replicaManager; - this.time = time; - this.cache = cache; - this.partitionCacheMap = partitionCacheMap; - this.fetchQueue = new ConcurrentLinkedQueue<>(); - this.processFetchQueueLock = new AtomicBoolean(false); - this.defaultRecordLockDurationMs = defaultRecordLockDurationMs; - this.timer = new SystemTimerReaper("share-group-lock-timeout-reaper", - new SystemTimer("share-group-lock-timeout")); - this.maxDeliveryCount = maxDeliveryCount; - this.maxInFlightMessages = maxInFlightMessages; - this.persister = persister; - this.groupConfigManager = groupConfigManager; - this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time); + this(replicaManager, + time, + cache, + partitionCacheMap, + defaultRecordLockDurationMs, + new SystemTimerReaper("share-group-lock-timeout-reaper", + new SystemTimer("share-group-lock-timeout")), + maxDeliveryCount, + maxInFlightMessages, + persister, + groupConfigManager, + metrics + ); } // Visible for testing. - @SuppressWarnings({"checkstyle:ParameterNumber"}) SharePartitionManager( ReplicaManager replicaManager, Time time, ShareSessionCache cache, Map partitionCacheMap, - ConcurrentLinkedQueue fetchQueue, int defaultRecordLockDurationMs, Timer timer, int maxDeliveryCount, @@ -220,8 +205,6 @@ private SharePartitionManager( this.time = time; this.cache = cache; this.partitionCacheMap = partitionCacheMap; - this.fetchQueue = fetchQueue; - this.processFetchQueueLock = new AtomicBoolean(false); this.defaultRecordLockDurationMs = defaultRecordLockDurationMs; this.timer = timer; this.maxDeliveryCount = maxDeliveryCount; @@ -252,9 +235,7 @@ public CompletableFuture> fetchMessages( partitionMaxBytes.keySet(), groupId, fetchParams); CompletableFuture> future = new CompletableFuture<>(); - ShareFetchData shareFetchData = new ShareFetchData(fetchParams, groupId, memberId, future, partitionMaxBytes); - fetchQueue.add(shareFetchData); - maybeProcessFetchQueue(); + processShareFetch(new ShareFetchData(fetchParams, groupId, memberId, future, partitionMaxBytes)); return future; } @@ -530,13 +511,6 @@ private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, Set shareFetchData.future().completeExceptionally( - Errors.BROKER_NOT_AVAILABLE.exception())); - fetchQueue.clear(); - } } private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) { @@ -547,31 +521,11 @@ private static String partitionsToLogString(Collection partiti return ShareSession.partitionsToLogString(partitions, log.isTraceEnabled()); } - /** - * Recursive function to process all the fetch requests present inside the fetch queue - */ // Visible for testing. - void maybeProcessFetchQueue() { - if (!acquireProcessFetchQueueLock()) { - // The queue is already being processed hence avoid re-triggering. - return; - } - - ShareFetchData shareFetchData = fetchQueue.poll(); - if (shareFetchData == null) { - // No more requests to process, so release the lock. Though we should not reach here as the lock - // is acquired only when there are requests in the queue. But still, it's safe to release the lock. - releaseProcessFetchQueueLock(); - return; - } - + void processShareFetch(ShareFetchData shareFetchData) { if (shareFetchData.partitionMaxBytes().isEmpty()) { // If there are no partitions to fetch then complete the future with an empty map. shareFetchData.future().complete(Collections.emptyMap()); - // Release the lock so that other threads can process the queue. - releaseProcessFetchQueueLock(); - if (!fetchQueue.isEmpty()) - maybeProcessFetchQueue(); return; } @@ -590,7 +544,6 @@ void maybeProcessFetchQueue() { sharePartition.maybeInitialize().whenComplete((result, throwable) -> { if (throwable != null) { maybeCompleteInitializationWithException(sharePartitionKey, shareFetchData.future(), throwable); - return; } }); }); @@ -611,20 +564,9 @@ void maybeProcessFetchQueue() { // Add the share fetch to the delayed share fetch purgatory to process the fetch request. addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, this), delayedShareFetchWatchKeys); - - // Release the lock so that other threads can process the queue. - releaseProcessFetchQueueLock(); - // If there are more requests in the queue, then process them. - if (!fetchQueue.isEmpty()) - maybeProcessFetchQueue(); - } catch (Exception e) { // In case exception occurs then release the locks so queue can be further processed. log.error("Error processing fetch queue for share partitions", e); - releaseProcessFetchQueueLock(); - // If there are more requests in the queue, then process them. - if (!fetchQueue.isEmpty()) - maybeProcessFetchQueue(); } } @@ -679,15 +621,6 @@ private void maybeCompleteInitializationWithException( future.completeExceptionally(throwable); } - // Visible for testing. - boolean acquireProcessFetchQueueLock() { - return processFetchQueueLock.compareAndSet(false, true); - } - - private void releaseProcessFetchQueueLock() { - processFetchQueueLock.set(false); - } - private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition topicIdPartition) { return new SharePartitionKey(groupId, topicIdPartition); } diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 8a6e729a167bd..c21e0b149aef4 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -26,7 +26,6 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.errors.FencedStateEpochException; import org.apache.kafka.common.errors.InvalidRecordStateException; @@ -92,7 +91,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -121,7 +119,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @Timeout(120) @@ -1253,33 +1250,6 @@ public void testCloseSharePartitionManager() throws Exception { sharePartitionManager.close(); // Verify that the timer object in sharePartitionManager is closed by checking the calls to timer.close() and persister.stop(). Mockito.verify(timer, times(1)).close(); - Mockito.verify(persister, times(1)).stop(); - } - - @Test - public void testCloseShouldCompletePendingFetchRequests() throws Exception { - String groupId = "grp"; - Uuid memberId = Uuid.randomUuid(); - FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, - 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); - Uuid fooId = Uuid.randomUuid(); - TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0)); - Map partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES); - - SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().build(); - - // Acquire the fetch lock so fetch requests keep waiting in the queue. - assertTrue(sharePartitionManager.acquireProcessFetchQueueLock()); - CompletableFuture> future = - sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes); - // Verify that the fetch request is not completed. - assertFalse(future.isDone()); - - // Closing the sharePartitionManager closes pending fetch requests in the fetch queue. - sharePartitionManager.close(); - // Verify that the fetch request is now completed. - assertTrue(future.isDone()); - assertFutureThrows(future, BrokerNotAvailableException.class); } @Test @@ -1640,49 +1610,6 @@ public void testAcknowledgeEmptyPartitionCacheMap() { assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode()); } - @Test - public void testFetchQueueProcessingWhenFrontItemIsEmpty() { - String groupId = "grp"; - String memberId = Uuid.randomUuid().toString(); - FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS, - 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); - TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); - Map partitionMaxBytes = new HashMap<>(); - partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); - - final Time time = new MockTime(); - ReplicaManager replicaManager = mock(ReplicaManager.class); - - ShareFetchData shareFetchData1 = new ShareFetchData( - fetchParams, groupId, memberId, new CompletableFuture<>(), Collections.emptyMap()); - ShareFetchData shareFetchData2 = new ShareFetchData( - fetchParams, groupId, memberId, new CompletableFuture<>(), partitionMaxBytes); - - ConcurrentLinkedQueue fetchQueue = new ConcurrentLinkedQueue<>(); - // First request added to fetch queue is empty i.e. no topic partitions to fetch. - fetchQueue.add(shareFetchData1); - // Second request added to fetch queue has a topic partition to fetch. - fetchQueue.add(shareFetchData2); - - DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( - "TestShareFetch", mockTimer, replicaManager.localBrokerId(), - DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); - mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory); - - SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withReplicaManager(replicaManager) - .withTime(time) - .withTimer(mockTimer) - .withFetchQueue(fetchQueue).build(); - - doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); - - sharePartitionManager.maybeProcessFetchQueue(); - - // Verifying that the second item in the fetchQueue is processed, even though the first item is empty. - verify(replicaManager, times(1)).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); - } - @Test public void testAcknowledgeCompletesDelayedShareFetchRequest() { String groupId = "grp"; @@ -2314,7 +2241,6 @@ static class SharePartitionManagerBuilder { private Persister persister = NoOpShareStatePersister.getInstance(); private Timer timer = new MockTimer(); private Metrics metrics = new Metrics(); - private ConcurrentLinkedQueue fetchQueue = new ConcurrentLinkedQueue<>(); private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) { this.replicaManager = replicaManager; @@ -2351,11 +2277,6 @@ private SharePartitionManagerBuilder withMetrics(Metrics metrics) { return this; } - private SharePartitionManagerBuilder withFetchQueue(ConcurrentLinkedQueue fetchQueue) { - this.fetchQueue = fetchQueue; - return this; - } - public static SharePartitionManagerBuilder builder() { return new SharePartitionManagerBuilder(); } @@ -2365,7 +2286,6 @@ public SharePartitionManager build() { time, cache, partitionCacheMap, - fetchQueue, DEFAULT_RECORD_LOCK_DURATION_MS, timer, MAX_DELIVERY_COUNT,