KAFKA-14438: Throw if async consumer configured with invalid group ID (#14872)

Verifies that the group ID passed into the async consumer is valid: if the group ID is not null, it must be neither empty nor consist only of whitespace.

This change also stores the group ID in the group metadata, because KAFKA-15281, which covers the group metadata API, will build on it.
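As a quick illustration of the new behavior (a hypothetical standalone sketch, not part of this commit): constructing a consumer whose group.id is empty or whitespace-only now fails fast during construction. The group.protocol=consumer setting used here to route construction to the async consumer is an assumption of the sketch.

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class InvalidGroupIdExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // Assumption: group.protocol=consumer selects the async consumer implementation.
            props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
            // Empty and whitespace-only group IDs are now rejected up front.
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "   ");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Never reached: the constructor throws before the consumer is usable.
            } catch (KafkaException e) {
                // "Failed to construct kafka consumer", caused by an InvalidGroupIdException.
                System.err.println(e.getMessage() + " <- " + e.getCause());
            }
        }
    }

Previously such a consumer could be constructed and only failed once it touched the group management or offset commit APIs; now the misconfiguration surfaces at the earliest possible point, as the PlaintextConsumerTest change below shows.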

Reviewers: Lucas Brutschy <[email protected]>, Kirk True <[email protected]>
cadonna authored Dec 3, 2023
1 parent bce2d4a commit 0cf227d
Showing 3 changed files with 124 additions and 31 deletions.
AsyncKafkaConsumer.java

@@ -65,6 +65,7 @@
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
@@ -132,7 +133,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     private final ApplicationEventHandler applicationEventHandler;
     private final Time time;
-    private final Optional<String> groupId;
+    private Optional<ConsumerGroupMetadata> groupMetadata;
     private final KafkaConsumerMetrics kafkaConsumerMetrics;
     private Logger log;
     private final String clientId;
@@ -162,8 +163,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     private boolean cachedSubscriptionHasAllFetchPositions;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
     private boolean isFenced = false;
-    private final Optional<String> groupInstanceId;
-
     private final OffsetCommitCallbackInvoker invoker = new OffsetCommitCallbackInvoker();
 
     // currentThread holds the threadId of the current thread accessing the AsyncKafkaConsumer
@@ -175,18 +174,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                               final Deserializer<K> keyDeserializer,
                               final Deserializer<V> valueDeserializer) {
         try {
-            GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
-                GroupRebalanceConfig.ProtocolType.CONSUMER);
-            this.groupInstanceId = groupRebalanceConfig.groupInstanceId;
-            this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
+            GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
+                config,
+                GroupRebalanceConfig.ProtocolType.CONSUMER
+            );
             this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
             LogContext logContext = createLogContext(config, groupRebalanceConfig);
             this.log = logContext.logger(getClass());
-            groupId.ifPresent(groupIdStr -> {
-                if (groupIdStr.isEmpty()) {
-                    log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
-                }
-            });
 
             log.debug("Initializing the Kafka consumer");
             this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
@@ -249,11 +243,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                 config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
             );
 
-            // no coordinator will be constructed for the default (null) group id
-            if (!groupId.isPresent()) {
-                config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
-                config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
-            }
+            this.groupMetadata = initializeGroupMetadata(config, groupRebalanceConfig);
 
             // The FetchCollector is only used on the application thread.
             this.fetchCollector = new FetchCollector<>(logContext,
@@ -307,15 +297,14 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.time = time;
         this.backgroundEventProcessor = new BackgroundEventProcessor(logContext, backgroundEventQueue);
         this.metrics = metrics;
-        this.groupId = Optional.ofNullable(groupId);
+        this.groupMetadata = initializeGroupMetadata(groupId, Optional.empty());
         this.metadata = metadata;
         this.retryBackoffMs = retryBackoffMs;
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
         this.deserializers = deserializers;
         this.applicationEventHandler = applicationEventHandler;
         this.assignors = assignors;
         this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
-        this.groupInstanceId = Optional.empty();
     }
 
     // Visible for testing
@@ -336,13 +325,11 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.interceptors = new ConsumerInterceptors<>(Collections.emptyList());
         this.time = time;
         this.metrics = new Metrics(time);
-        this.groupId = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_ID_CONFIG));
         this.metadata = metadata;
         this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
         this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
         this.assignors = assignors;
-        this.groupInstanceId = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
 
         ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
         FetchMetricsManager fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
@@ -360,6 +347,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             GroupRebalanceConfig.ProtocolType.CONSUMER
         );
 
+        this.groupMetadata = initializeGroupMetadata(config, groupRebalanceConfig);
+
         BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
         BlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
         this.backgroundEventProcessor = new BackgroundEventProcessor(logContext, backgroundEventQueue);
@@ -397,6 +386,36 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             requestManagersSupplier);
     }
 
+    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final ConsumerConfig config,
+                                                                    final GroupRebalanceConfig groupRebalanceConfig) {
+        final Optional<ConsumerGroupMetadata> groupMetadata = initializeGroupMetadata(
+            groupRebalanceConfig.groupId,
+            groupRebalanceConfig.groupInstanceId
+        );
+        if (!groupMetadata.isPresent()) {
+            config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+            config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+        }
+        return groupMetadata;
+    }
+
+    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final String groupId,
+                                                                    final Optional<String> groupInstanceId) {
+        if (groupId != null) {
+            if (groupId.isEmpty()) {
+                throw new InvalidGroupIdException("The configured group.id should not be an empty string or whitespace.");
+            } else {
+                return Optional.of(new ConsumerGroupMetadata(
+                    groupId,
+                    JoinGroupRequest.UNKNOWN_GENERATION_ID,
+                    JoinGroupRequest.UNKNOWN_MEMBER_ID,
+                    groupInstanceId
+                ));
+            }
+        }
+        return Optional.empty();
+    }
+
     /**
      * poll implementation using {@link ApplicationEventHandler}.
      * 1. Poll for background events. If there's a fetch response event, process the record and return it. If it is
@@ -675,9 +694,9 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
     }
 
     private void maybeThrowInvalidGroupIdException() {
-        if (!groupId.isPresent() || groupId.get().isEmpty()) {
+        if (!groupMetadata.isPresent()) {
             throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
-                "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
+                    "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
         }
     }
@@ -1055,7 +1074,7 @@ public void unsubscribe() {
         acquireAndEnsureOpen();
         try {
             fetchBuffer.retainAll(Collections.emptySet());
-            if (groupId.isPresent()) {
+            if (groupMetadata.isPresent()) {
                 UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent();
                 applicationEventHandler.add(unsubscribeApplicationEvent);
                 try {
@@ -1186,7 +1205,7 @@ private boolean updateFetchPositions(final Timer timer) {
      * according to config {@link CommonClientConfigs#GROUP_ID_CONFIG}
      */
     private boolean isCommittedOffsetsManagementEnabled() {
-        return groupId.isPresent();
+        return groupMetadata.isPresent();
     }
 
     /**
@@ -1374,8 +1393,15 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() {
 
     private void maybeThrowFencedInstanceException() {
         if (isFenced) {
-            throw new FencedInstanceIdException("Get fenced exception for group.instance.id " +
-                groupInstanceId.orElse("null"));
+            String groupInstanceId = "unknown";
+            if (!groupMetadata.isPresent()) {
+                log.error("No group metadata found although a group ID was provided. This is a bug!");
+            } else if (!groupMetadata.get().groupInstanceId().isPresent()) {
+                log.error("No group instance ID found although the consumer is fenced. This is a bug!");
+            } else {
+                groupInstanceId = groupMetadata.get().groupInstanceId().get();
+            }
+            throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + groupInstanceId);
         }
     }
AsyncKafkaConsumerTest.java

@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -50,6 +51,7 @@
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
@@ -63,6 +65,7 @@
 import org.mockito.MockedConstruction;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
+import org.opentest4j.AssertionFailedError;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -73,6 +76,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -85,6 +89,7 @@
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -755,6 +760,70 @@ public void testSubscriptionOnEmptyTopic() {
         assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic)));
     }
 
+    @Test
+    public void testGroupIdNull() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer<String, String> consumer =
+                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+            assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdNotNullAndValid() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer<String, String> consumer =
+                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+            assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdEmpty() {
+        testInvalidGroupId("");
+    }
+
+    @Test
+    public void testGroupIdOnlyWhitespaces() {
+        testInvalidGroupId(" ");
+    }
+
+    private void testInvalidGroupId(final String groupId) {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        final Exception exception = assertThrows(
+            KafkaException.class,
+            () -> new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())
+        );
+
+        assertEquals("Failed to construct kafka consumer", exception.getMessage());
+    }
+
+    private Properties requiredConsumerProperties() {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        return props;
+    }
+
     private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean committedOffsetsEnabled) {
         // Uncompleted future that will time out if used
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
PlaintextConsumerTest.scala

@@ -24,7 +24,7 @@ import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.{NewPartitions, NewTopic}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.{MetricName, PartitionInfo, TopicPartition}
+import org.apache.kafka.common.{KafkaException, MetricName, PartitionInfo, TopicPartition}
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicException}
 import org.apache.kafka.common.header.Headers
@@ -2027,10 +2027,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer1Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
     consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
     consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
-    val consumer1 = createConsumer(configOverrides = consumer1Config)
 
-    consumer1.assign(List(tp).asJava)
-    assertThrows(classOf[InvalidGroupIdException], () => consumer1.commitSync())
+    assertThrows(classOf[KafkaException], () => createConsumer(configOverrides = consumer1Config))
   }
 
   // Static membership temporarily not supported in consumer group protocol
