Skip to content

Commit

Permalink
KAFKA-17545: Removing process fetch queue (#17534)
Browse files Browse the repository at this point in the history
The PR removed the process fetch queue as we have moved to share fetch purgatory.

Reviewers: Abhinav Dixit <[email protected]>, Jun Rao <[email protected]>
  • Loading branch information
apoorvmittal10 authored Oct 21, 2024
1 parent 6e8df29 commit 7efbed4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 162 deletions.
97 changes: 15 additions & 82 deletions core/src/main/java/kafka/server/share/SharePartitionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import scala.jdk.javaapi.CollectionConverters;

Expand Down Expand Up @@ -102,16 +100,6 @@ public class SharePartitionManager implements AutoCloseable {
*/
private final ShareSessionCache cache;

/**
* The fetch queue stores the share fetch requests that are waiting to be processed.
*/
private final ConcurrentLinkedQueue<ShareFetchData> fetchQueue;

/**
* The process fetch queue lock is used to ensure that only one thread is processing the fetch queue at a time.
*/
private final AtomicBoolean processFetchQueueLock;

/**
* The group config manager is used to retrieve the values for dynamic group configurations
*/
Expand Down Expand Up @@ -184,30 +172,27 @@ private SharePartitionManager(
GroupConfigManager groupConfigManager,
Metrics metrics
) {
this.replicaManager = replicaManager;
this.time = time;
this.cache = cache;
this.partitionCacheMap = partitionCacheMap;
this.fetchQueue = new ConcurrentLinkedQueue<>();
this.processFetchQueueLock = new AtomicBoolean(false);
this.defaultRecordLockDurationMs = defaultRecordLockDurationMs;
this.timer = new SystemTimerReaper("share-group-lock-timeout-reaper",
new SystemTimer("share-group-lock-timeout"));
this.maxDeliveryCount = maxDeliveryCount;
this.maxInFlightMessages = maxInFlightMessages;
this.persister = persister;
this.groupConfigManager = groupConfigManager;
this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
this(replicaManager,
time,
cache,
partitionCacheMap,
defaultRecordLockDurationMs,
new SystemTimerReaper("share-group-lock-timeout-reaper",
new SystemTimer("share-group-lock-timeout")),
maxDeliveryCount,
maxInFlightMessages,
persister,
groupConfigManager,
metrics
);
}

// Visible for testing.
@SuppressWarnings({"checkstyle:ParameterNumber"})
SharePartitionManager(
ReplicaManager replicaManager,
Time time,
ShareSessionCache cache,
Map<SharePartitionKey, SharePartition> partitionCacheMap,
ConcurrentLinkedQueue<ShareFetchData> fetchQueue,
int defaultRecordLockDurationMs,
Timer timer,
int maxDeliveryCount,
Expand All @@ -220,8 +205,6 @@ private SharePartitionManager(
this.time = time;
this.cache = cache;
this.partitionCacheMap = partitionCacheMap;
this.fetchQueue = fetchQueue;
this.processFetchQueueLock = new AtomicBoolean(false);
this.defaultRecordLockDurationMs = defaultRecordLockDurationMs;
this.timer = timer;
this.maxDeliveryCount = maxDeliveryCount;
Expand Down Expand Up @@ -252,9 +235,7 @@ public CompletableFuture<Map<TopicIdPartition, PartitionData>> fetchMessages(
partitionMaxBytes.keySet(), groupId, fetchParams);

CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
ShareFetchData shareFetchData = new ShareFetchData(fetchParams, groupId, memberId, future, partitionMaxBytes);
fetchQueue.add(shareFetchData);
maybeProcessFetchQueue();
processShareFetch(new ShareFetchData(fetchParams, groupId, memberId, future, partitionMaxBytes));

return future;
}
Expand Down Expand Up @@ -530,13 +511,6 @@ private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, Set<Delay
@Override
public void close() throws Exception {
this.timer.close();
this.persister.stop();
if (!fetchQueue.isEmpty()) {
log.warn("Closing SharePartitionManager with pending fetch requests count: {}", fetchQueue.size());
fetchQueue.forEach(shareFetchData -> shareFetchData.future().completeExceptionally(
Errors.BROKER_NOT_AVAILABLE.exception()));
fetchQueue.clear();
}
}

private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) {
Expand All @@ -547,31 +521,11 @@ private static String partitionsToLogString(Collection<TopicIdPartition> partiti
return ShareSession.partitionsToLogString(partitions, log.isTraceEnabled());
}

/**
* Recursive function to process all the fetch requests present inside the fetch queue
*/
// Visible for testing.
void maybeProcessFetchQueue() {
if (!acquireProcessFetchQueueLock()) {
// The queue is already being processed hence avoid re-triggering.
return;
}

ShareFetchData shareFetchData = fetchQueue.poll();
if (shareFetchData == null) {
// No more requests to process, so release the lock. Though we should not reach here as the lock
// is acquired only when there are requests in the queue. But still, it's safe to release the lock.
releaseProcessFetchQueueLock();
return;
}

void processShareFetch(ShareFetchData shareFetchData) {
if (shareFetchData.partitionMaxBytes().isEmpty()) {
// If there are no partitions to fetch then complete the future with an empty map.
shareFetchData.future().complete(Collections.emptyMap());
// Release the lock so that other threads can process the queue.
releaseProcessFetchQueueLock();
if (!fetchQueue.isEmpty())
maybeProcessFetchQueue();
return;
}

Expand All @@ -590,7 +544,6 @@ void maybeProcessFetchQueue() {
sharePartition.maybeInitialize().whenComplete((result, throwable) -> {
if (throwable != null) {
maybeCompleteInitializationWithException(sharePartitionKey, shareFetchData.future(), throwable);
return;
}
});
});
Expand All @@ -611,20 +564,9 @@ void maybeProcessFetchQueue() {
// Add the share fetch to the delayed share fetch purgatory to process the fetch request.
addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, this),
delayedShareFetchWatchKeys);

// Release the lock so that other threads can process the queue.
releaseProcessFetchQueueLock();
// If there are more requests in the queue, then process them.
if (!fetchQueue.isEmpty())
maybeProcessFetchQueue();

} catch (Exception e) {
// In case exception occurs then release the locks so queue can be further processed.
log.error("Error processing fetch queue for share partitions", e);
releaseProcessFetchQueueLock();
// If there are more requests in the queue, then process them.
if (!fetchQueue.isEmpty())
maybeProcessFetchQueue();
}
}

Expand Down Expand Up @@ -679,15 +621,6 @@ private void maybeCompleteInitializationWithException(
future.completeExceptionally(throwable);
}

// Visible for testing.
boolean acquireProcessFetchQueueLock() {
return processFetchQueueLock.compareAndSet(false, true);
}

private void releaseProcessFetchQueueLock() {
processFetchQueueLock.set(false);
}

private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition topicIdPartition) {
return new SharePartitionKey(groupId, topicIdPartition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
Expand Down Expand Up @@ -92,7 +91,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -121,7 +119,6 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@Timeout(120)
Expand Down Expand Up @@ -1253,33 +1250,6 @@ public void testCloseSharePartitionManager() throws Exception {
sharePartitionManager.close();
// Verify that the timer object in sharePartitionManager is closed by checking the calls to timer.close() and persister.stop().
Mockito.verify(timer, times(1)).close();
Mockito.verify(persister, times(1)).stop();
}

@Test
public void testCloseShouldCompletePendingFetchRequests() throws Exception {
String groupId = "grp";
Uuid memberId = Uuid.randomUuid();
FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty());
Uuid fooId = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);

SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().build();

// Acquire the fetch lock so fetch requests keep waiting in the queue.
assertTrue(sharePartitionManager.acquireProcessFetchQueueLock());
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes);
// Verify that the fetch request is not completed.
assertFalse(future.isDone());

// Closing the sharePartitionManager closes pending fetch requests in the fetch queue.
sharePartitionManager.close();
// Verify that the fetch request is now completed.
assertTrue(future.isDone());
assertFutureThrows(future, BrokerNotAvailableException.class);
}

@Test
Expand Down Expand Up @@ -1640,49 +1610,6 @@ public void testAcknowledgeEmptyPartitionCacheMap() {
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode());
}

@Test
public void testFetchQueueProcessingWhenFrontItemIsEmpty() {
String groupId = "grp";
String memberId = Uuid.randomUuid().toString();
FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty());
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);

final Time time = new MockTime();
ReplicaManager replicaManager = mock(ReplicaManager.class);

ShareFetchData shareFetchData1 = new ShareFetchData(
fetchParams, groupId, memberId, new CompletableFuture<>(), Collections.emptyMap());
ShareFetchData shareFetchData2 = new ShareFetchData(
fetchParams, groupId, memberId, new CompletableFuture<>(), partitionMaxBytes);

ConcurrentLinkedQueue<ShareFetchData> fetchQueue = new ConcurrentLinkedQueue<>();
// First request added to fetch queue is empty i.e. no topic partitions to fetch.
fetchQueue.add(shareFetchData1);
// Second request added to fetch queue has a topic partition to fetch.
fetchQueue.add(shareFetchData2);

DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withReplicaManager(replicaManager)
.withTime(time)
.withTimer(mockTimer)
.withFetchQueue(fetchQueue).build();

doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

sharePartitionManager.maybeProcessFetchQueue();

// Verifying that the second item in the fetchQueue is processed, even though the first item is empty.
verify(replicaManager, times(1)).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
}

@Test
public void testAcknowledgeCompletesDelayedShareFetchRequest() {
String groupId = "grp";
Expand Down Expand Up @@ -2314,7 +2241,6 @@ static class SharePartitionManagerBuilder {
private Persister persister = NoOpShareStatePersister.getInstance();
private Timer timer = new MockTimer();
private Metrics metrics = new Metrics();
private ConcurrentLinkedQueue<ShareFetchData> fetchQueue = new ConcurrentLinkedQueue<>();

private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) {
this.replicaManager = replicaManager;
Expand Down Expand Up @@ -2351,11 +2277,6 @@ private SharePartitionManagerBuilder withMetrics(Metrics metrics) {
return this;
}

private SharePartitionManagerBuilder withFetchQueue(ConcurrentLinkedQueue<ShareFetchData> fetchQueue) {
this.fetchQueue = fetchQueue;
return this;
}

public static SharePartitionManagerBuilder builder() {
return new SharePartitionManagerBuilder();
}
Expand All @@ -2365,7 +2286,6 @@ public SharePartitionManager build() {
time,
cache,
partitionCacheMap,
fetchQueue,
DEFAULT_RECORD_LOCK_DURATION_MS,
timer,
MAX_DELIVERY_COUNT,
Expand Down

0 comments on commit 7efbed4

Please sign in to comment.