Skip to content

Commit

Permalink
KAFKA-18160 Interrupting or waking up onPartitionsAssigned in AsyncCo…
Browse files Browse the repository at this point in the history
…nsumer can cause the ConsumerRebalanceListenerCallbackCompletedEvent to be skipped (#18089)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
brandboat authored Dec 15, 2024
1 parent fef625c commit 0815d70
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1175,10 +1175,16 @@ private CompletableFuture<Void> assignPartitions(

// Invoke user call back.
CompletableFuture<Void> result = signalPartitionsAssigned(addedPartitions);
// Enable newly added partitions to start fetching and updating positions for them.
result.whenComplete((__, exception) -> {
if (exception == null) {
// Enable newly added partitions to start fetching and updating positions for them.
subscriptions.enablePartitionsAwaitingCallback(addedPartitions);
// Enable assigned partitions to start fetching and updating positions for them.
// We use assignedPartitions here instead of addedPartitions because there's a chance that the callback
// might throw an exception, leaving addedPartitions empty. This would result in the poll operation
// returning no records, as no topic partitions are marked as fetchable. In contrast, with the classic consumer,
// if the first callback fails but the next one succeeds, polling can still retrieve data. To align with
// this behavior, we rely on assignedPartitions to avoid such scenarios.
subscriptions.enablePartitionsAwaitingCallback(toTopicPartitionSet(assignedPartitions));
} else {
// Keeping newly added partitions as non-fetchable after the callback failure.
// They will be retried on the next reconciliation loop, until it succeeds or the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
Expand Down Expand Up @@ -2072,23 +2073,27 @@ static ConsumerRebalanceListenerCallbackCompletedEvent invokeRebalanceCallbacks(
ConsumerRebalanceListenerMethodName methodName,
SortedSet<TopicPartition> partitions,
CompletableFuture<Void> future) {
final Exception e;
Exception e;

switch (methodName) {
case ON_PARTITIONS_REVOKED:
e = rebalanceListenerInvoker.invokePartitionsRevoked(partitions);
break;
try {
switch (methodName) {
case ON_PARTITIONS_REVOKED:
e = rebalanceListenerInvoker.invokePartitionsRevoked(partitions);
break;

case ON_PARTITIONS_ASSIGNED:
e = rebalanceListenerInvoker.invokePartitionsAssigned(partitions);
break;
case ON_PARTITIONS_ASSIGNED:
e = rebalanceListenerInvoker.invokePartitionsAssigned(partitions);
break;

case ON_PARTITIONS_LOST:
e = rebalanceListenerInvoker.invokePartitionsLost(partitions);
break;
case ON_PARTITIONS_LOST:
e = rebalanceListenerInvoker.invokePartitionsLost(partitions);
break;

default:
throw new IllegalArgumentException("The method " + methodName.fullyQualifiedMethodName() + " to invoke was not expected");
default:
throw new IllegalArgumentException("The method " + methodName.fullyQualifiedMethodName() + " to invoke was not expected");
}
} catch (WakeupException | InterruptException ex) {
e = ex;
}

final Optional<KafkaException> error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,8 +898,8 @@ public synchronized void assignFromSubscribedAwaitingCallback(Collection<TopicPa
}

/**
* Enable fetching and updating positions for the given partitions that were added to the
* assignment, but waiting for the onPartitionsAssigned callback to complete. This is
* Enable fetching and updating positions for the given partitions that were assigned to the
* consumer, but waiting for the onPartitionsAssigned callback to complete. This is
* expected to be used by the async consumer.
*/
public synchronized void enablePartitionsAwaitingCallback(Collection<TopicPartition> partitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions;
Expand Down Expand Up @@ -96,6 +98,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@SuppressWarnings("ClassDataAbstractionCoupling")
public class ConsumerMembershipManagerTest {

private static final String GROUP_ID = "test-group";
Expand Down Expand Up @@ -1738,14 +1741,20 @@ public void testListenerCallbacksBasic() {

@Test
public void testListenerCallbacksThrowsErrorOnPartitionsRevoked() {
testErrorsOnPartitionsRevoked(new WakeupException());
testErrorsOnPartitionsRevoked(new InterruptException("Intentional onPartitionsRevoked() error"));
testErrorsOnPartitionsRevoked(new IllegalArgumentException("Intentional onPartitionsRevoked() error"));
}

private void testErrorsOnPartitionsRevoked(RuntimeException error) {
// Step 1: set up mocks
String topicName = "topic1";
Uuid topicId = Uuid.randomUuid();

ConsumerMembershipManager membershipManager = createMemberInStableState();
mockOwnedPartition(membershipManager, topicId, topicName);
CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener(
Optional.of(new IllegalArgumentException("Intentional onPartitionsRevoked() error")),
Optional.ofNullable(error),
Optional.empty(),
Optional.empty()
);
Expand Down Expand Up @@ -1792,14 +1801,20 @@ public void testListenerCallbacksThrowsErrorOnPartitionsRevoked() {

@Test
public void testListenerCallbacksThrowsErrorOnPartitionsAssigned() {
testErrorsOnPartitionsAssigned(new WakeupException());
testErrorsOnPartitionsAssigned(new InterruptException("Intentional error"));
testErrorsOnPartitionsAssigned(new IllegalArgumentException("Intentional error"));
}

private void testErrorsOnPartitionsAssigned(RuntimeException error) {
// Step 1: set up mocks
ConsumerMembershipManager membershipManager = createMemberInStableState();
String topicName = "topic1";
Uuid topicId = Uuid.randomUuid();
mockOwnedPartition(membershipManager, topicId, topicName);
CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener(
Optional.empty(),
Optional.of(new IllegalArgumentException("Intentional onPartitionsAssigned() error")),
Optional.ofNullable(error),
Optional.empty()
);
ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
Expand Down Expand Up @@ -1879,7 +1894,7 @@ public void testAddedPartitionsTemporarilyDisabledAwaitingOnPartitionsAssignedCa
true
);

verify(subscriptionState).enablePartitionsAwaitingCallback(addedPartitions);
verify(subscriptionState).enablePartitionsAwaitingCallback(assignedPartitions);
}

@Test
Expand Down Expand Up @@ -1915,12 +1930,14 @@ public void testAddedPartitionsNotEnabledAfterFailedOnPartitionsAssignedCallback

@Test
public void testOnPartitionsLostNoError() {
testOnPartitionsLost(Optional.empty());
testOnPartitionsLost(null);
}

@Test
public void testOnPartitionsLostError() {
testOnPartitionsLost(Optional.of(new KafkaException("Intentional error for test")));
testOnPartitionsLost(new KafkaException("Intentional error for test"));
testOnPartitionsLost(new WakeupException());
testOnPartitionsLost(new InterruptException("Intentional error for test"));
}

private void assertLeaveGroupDueToExpiredPollAndTransitionToStale(ConsumerMembershipManager membershipManager) {
Expand Down Expand Up @@ -2054,7 +2071,7 @@ private void mockPartitionOwnedAndNewPartitionAdded(String topicName,
receiveAssignment(topicId, Arrays.asList(partitionOwned, partitionAdded), membershipManager);
}

private void testOnPartitionsLost(Optional<RuntimeException> lostError) {
private void testOnPartitionsLost(RuntimeException lostError) {
// Step 1: set up mocks
ConsumerMembershipManager membershipManager = createMemberInStableState();
String topicName = "topic1";
Expand All @@ -2063,7 +2080,7 @@ private void testOnPartitionsLost(Optional<RuntimeException> lostError) {
CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener(
Optional.empty(),
Optional.empty(),
lostError
Optional.ofNullable(lostError)
);
ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();

Expand Down

This file was deleted.

Loading

0 comments on commit 0815d70

Please sign in to comment.