KAFKA-17848: Fixing share purgatory request and locks handling (#17583)
For delayed share fetch, tryComplete can be called again after onComplete, because requests are processed by parallel threads. The locks acquired in tryComplete are then left held, since onComplete is never invoked for a request that has already been completed.

Reviewers: Abhinav Dixit <[email protected]>, Andrew Schofield <[email protected]>, Jun Rao <[email protected]>
apoorvmittal10 authored Oct 26, 2024
1 parent d7ac865 commit 397ae59
Showing 4 changed files with 114 additions and 58 deletions.
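
To make the failure mode concrete before the diffs, here is a minimal, hypothetical sketch of the interleaving the commit message describes (simplified names, not the actual purgatory classes). The patch below addresses it by guarding safeTryComplete with isCompleted and by releasing partition locks when forceComplete returns false.

import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch only: simplified names, not the real kafka.server.DelayedOperation API.
class DelayedFetchRaceSketch {
    private final AtomicBoolean completed = new AtomicBoolean(false);

    // Thread A (e.g. expiration, or an earlier successful check) completes the operation.
    boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();            // sends the response and releases partition locks
            return true;
        }
        return false;                // someone else already completed the operation
    }

    // Thread B: a later purgatory pass calls tryComplete on the same, already completed operation.
    boolean tryComplete() {
        acquirePartitionLocks();     // locks are taken before the completion status is checked...
        return forceComplete();      // ...this returns false, onComplete never runs again,
                                     // and without an explicit release the locks stay held.
    }

    void onComplete() { }
    void acquirePartitionLocks() { }
}
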
56 changes: 31 additions & 25 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -79,9 +79,9 @@ public void onExpiration() {
*/
@Override
public void onComplete() {
log.trace("Completing the delayed share fetch request for group {}, member {}, " +
"topic partitions {}", shareFetchData.groupId(),
shareFetchData.memberId(), shareFetchData.partitionMaxBytes().keySet());
log.trace("Completing the delayed share fetch request for group {}, member {}, "
+ "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
topicPartitionDataFromTryComplete.keySet());

if (shareFetchData.future().isDone())
return;
@@ -100,7 +100,7 @@ public void onComplete() {
return;
}
log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
topicPartitionData, shareFetchData.groupId(), shareFetchData.fetchParams());
topicPartitionData, shareFetchData.groupId(), shareFetchData.fetchParams());

try {
Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
@@ -149,17 +149,17 @@ public void onComplete() {
*/
@Override
public boolean tryComplete() {
log.trace("Try to complete the delayed share fetch request for group {}, member {}, topic partitions {}",
shareFetchData.groupId(), shareFetchData.memberId(),
shareFetchData.partitionMaxBytes().keySet());

topicPartitionDataFromTryComplete = acquirablePartitions();

if (!topicPartitionDataFromTryComplete.isEmpty())
return forceComplete();
log.info("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
"topic partitions {}", shareFetchData.groupId(),
shareFetchData.memberId(), shareFetchData.partitionMaxBytes().keySet());
if (!topicPartitionDataFromTryComplete.isEmpty()) {
boolean completedByMe = forceComplete();
                // If the invocation of forceComplete is unsuccessful, the request has already been
                // completed elsewhere, hence release the acquired locks.
if (!completedByMe) {
releasePartitionLocks(shareFetchData.groupId(), topicPartitionDataFromTryComplete.keySet());
}
return completedByMe;
}
return false;
}

@@ -182,22 +182,28 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
// Add the share partition to the list of partitions to be fetched only if we can
// acquire the fetch lock on it.
if (sharePartition.maybeAcquireFetchLock()) {
// If the share partition is already at capacity, we should not attempt to fetch.
if (sharePartition.canAcquireRecords()) {
topicPartitionData.put(
try {
// If the share partition is already at capacity, we should not attempt to fetch.
if (sharePartition.canAcquireRecords()) {
topicPartitionData.put(
topicIdPartition,
new FetchRequest.PartitionData(
topicIdPartition.topicId(),
sharePartition.nextFetchOffset(),
0,
partitionMaxBytes,
Optional.empty()
topicIdPartition.topicId(),
sharePartition.nextFetchOffset(),
0,
partitionMaxBytes,
Optional.empty()
)
);
} else {
sharePartition.releaseFetchLock();
log.trace("Record lock partition limit exceeded for SharePartition {}, " +
);
} else {
sharePartition.releaseFetchLock();
log.trace("Record lock partition limit exceeded for SharePartition {}, " +
"cannot acquire more records", sharePartition);
}
} catch (Exception e) {
log.error("Error checking condition for SharePartition: {}", sharePartition, e);
                // Release the lock if an error occurred.
sharePartition.releaseFetchLock();
}
}
});
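
As an aside, the try/catch added to acquirablePartitions follows an acquire-then-guard-then-release discipline. Below is a self-contained sketch of that same pattern, using a hypothetical stand-in interface rather than the real SharePartition; it is illustrative only and not part of the patch.

// Illustrative only; FetchLockable is a hypothetical stand-in for SharePartition.
interface FetchLockable {
    boolean maybeAcquireFetchLock();
    boolean canAcquireRecords();   // may throw at runtime
    void releaseFetchLock();
}

final class FetchLockDiscipline {
    // Returns true only if the lock is held and records can be acquired; in every
    // other case the lock is released before returning, so it can never leak.
    static boolean tryReserve(FetchLockable partition) {
        if (!partition.maybeAcquireFetchLock()) {
            return false;
        }
        try {
            if (partition.canAcquireRecords()) {
                return true;                  // keep the lock; the fetch path releases it later
            }
            partition.releaseFetchLock();     // partition at capacity, release immediately
            return false;
        } catch (RuntimeException e) {
            partition.releaseFetchLock();     // release on unexpected errors too
            return false;                     // the patch logs the error instead of rethrowing
        }
    }
}
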
16 changes: 10 additions & 6 deletions core/src/main/java/kafka/server/share/SharePartition.java
@@ -1846,14 +1846,15 @@ private AcquisitionLockTimerTask acquisitionLockTimerTask(
}

private void releaseAcquisitionLockOnTimeout(String memberId, long firstOffset, long lastOffset) {
List<PersisterStateBatch> stateBatches;
lock.writeLock().lock();
try {
Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(firstOffset);
if (floorOffset == null) {
log.error("Base offset {} not found for share partition: {}-{}", firstOffset, groupId, topicIdPartition);
return;
}
List<PersisterStateBatch> stateBatches = new ArrayList<>();
stateBatches = new ArrayList<>();
NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(floorOffset.getKey(), true, lastOffset, true);
for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
InFlightBatch inFlightBatch = entry.getValue();
@@ -1884,16 +1885,19 @@ && checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.las
// Even if write share group state RPC call fails, we will still go ahead with the state transition.
// Update the cached state and start and end offsets after releasing the acquisition lock on timeout.
maybeUpdateCachedStateAndOffsets();

// If we have an acquisition lock timeout for a share-partition, then we should check if
// there is a pending share fetch request for the share-partition and complete it.
DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition());
replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
});
}
} finally {
lock.writeLock().unlock();
}

        // Skip the null check for stateBatches; it is always initialized if execution reaches this point.
if (!stateBatches.isEmpty()) {
// If we have an acquisition lock timeout for a share-partition, then we should check if
// there is a pending share fetch request for the share-partition and complete it.
DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition());
replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
}
}

private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch inFlightBatch,
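
The SharePartition change above also moves the completeDelayedShareFetchRequest call out of the write-lock critical section. A hedged, minimal sketch of that collect-under-lock, act-after-unlock pattern follows (hypothetical names, not the actual SharePartition internals).

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative pattern only: state is mutated under the write lock, while callbacks
// that may re-enter other subsystems (such as the purgatory) run after unlocking.
final class DeferCallbackSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    void releaseOnTimeout(Runnable completePendingShareFetch) {
        List<String> releasedBatches = new ArrayList<>();
        lock.writeLock().lock();
        try {
            releasedBatches.add("timed-out-batch");   // update cached state under the lock
        } finally {
            lock.writeLock().unlock();
        }
        // With no lock held, it is now safe to trigger the delayed share fetch completion.
        if (!releasedBatches.isEmpty()) {
            completePendingShareFetch.run();
        }
    }
}
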
7 changes: 6 additions & 1 deletion core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -119,7 +119,12 @@ abstract class DelayedOperation(delayMs: Long,
/**
* Thread-safe variant of tryComplete()
*/
private[server] def safeTryComplete(): Boolean = inLock(lock)(tryComplete())
private[server] def safeTryComplete(): Boolean = inLock(lock) {
if (isCompleted)
false
else
tryComplete()
}

/*
* run() method defines a task that is executed on timeout
93 changes: 67 additions & 26 deletions core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -64,6 +64,7 @@
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -73,6 +74,9 @@
public class DelayedShareFetchTest {
private static final int MAX_WAIT_MS = 5000;
private static final int MAX_FETCH_RECORDS = 100;
private static final FetchParams FETCH_PARAMS = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty());

private static Timer mockTimer;

@@ -107,10 +111,8 @@ public void testDelayedShareFetchTryCompleteReturnsFalse() {
when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0);
when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1);

ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);

when(sp0.canAcquireRecords()).thenReturn(false);
when(sp1.canAcquireRecords()).thenReturn(false);
@@ -145,10 +147,8 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() {
when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0);
when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1);

ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);

when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
@@ -189,10 +189,8 @@ public void testEmptyFutureReturnedByDelayedShareFetchOnComplete() {
when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0);
when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1);

ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);

when(sp0.canAcquireRecords()).thenReturn(false);
when(sp1.canAcquireRecords()).thenReturn(false);
@@ -232,10 +230,8 @@ public void testReplicaManagerFetchShouldHappenOnComplete() {
when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0);
when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1);

ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);

when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
@@ -273,10 +269,8 @@ public void testToCompleteAnAlreadyCompletedFuture() {
when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0);

CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes, MAX_FETCH_RECORDS);
ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes, MAX_FETCH_RECORDS);

when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp0.canAcquireRecords()).thenReturn(false);
@@ -328,9 +322,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
when(sharePartitionManager1.sharePartition(groupId, tp1)).thenReturn(sp1);
when(sharePartitionManager1.sharePartition(groupId, tp2)).thenReturn(sp2);

ShareFetchData shareFetchData1 = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
ShareFetchData shareFetchData1 = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes1, MAX_FETCH_RECORDS);

DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
@@ -357,9 +349,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
Map<TopicIdPartition, Integer> partitionMaxBytes2 = new HashMap<>();
partitionMaxBytes2.put(tp1, PARTITION_MAX_BYTES);
partitionMaxBytes2.put(tp2, PARTITION_MAX_BYTES);
ShareFetchData shareFetchData2 = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
ShareFetchData shareFetchData2 = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes2, MAX_FETCH_RECORDS);

doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@@ -397,6 +387,57 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
Mockito.verify(replicaManager, times(0)).tryCompleteActions();
}

@Test
public void testLocksReleasedForCompletedFetch() {
String groupId = "grp";
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));

SharePartition sp0 = mock(SharePartition.class);
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp0.canAcquireRecords()).thenReturn(true);

SharePartitionManager sharePartitionManager1 = mock(SharePartitionManager.class);
when(sharePartitionManager1.sharePartition(groupId, tp0)).thenReturn(sp0);

ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), Map.of(tp0, PARTITION_MAX_BYTES), MAX_FETCH_RECORDS);

DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetchData)
.withSharePartitionManager(sharePartitionManager1)
.build();

DelayedShareFetch spy = spy(delayedShareFetch);
doReturn(false).when(spy).forceComplete();

assertFalse(spy.tryComplete());
Mockito.verify(sp0, times(1)).releaseFetchLock();
}

@Test
public void testLocksReleasedAcquireException() {
String groupId = "grp";
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));

SharePartition sp0 = mock(SharePartition.class);
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp0.canAcquireRecords()).thenThrow(new RuntimeException("Acquire exception"));

SharePartitionManager sharePartitionManager1 = mock(SharePartitionManager.class);
when(sharePartitionManager1.sharePartition(groupId, tp0)).thenReturn(sp0);

ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), Map.of(tp0, PARTITION_MAX_BYTES), MAX_FETCH_RECORDS);

DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetchData)
.withSharePartitionManager(sharePartitionManager1)
.build();

assertFalse(delayedShareFetch.tryComplete());
Mockito.verify(sp0, times(1)).releaseFetchLock();
}

static class DelayedShareFetchBuilder {
ShareFetchData shareFetchData = mock(ShareFetchData.class);
private ReplicaManager replicaManager = mock(ReplicaManager.class);
