Merge branch 'trunk' into KAFKA-9366

frankvicky committed Dec 10, 2024
2 parents 23e391b + d208abb, commit 38f6155
Showing 67 changed files with 1,035 additions and 2,453 deletions.
8 changes: 0 additions & 8 deletions bin/kafka-run-class.sh
@@ -116,14 +116,6 @@ else
       CLASSPATH="$file":"$CLASSPATH"
     fi
   done
-  if [ "$SHORT_VERSION_NO_DOTS" = "0100" ]; then
-    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.8.jar":"$CLASSPATH"
-    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.6.jar":"$CLASSPATH"
-  fi
-  if [ "$SHORT_VERSION_NO_DOTS" = "0101" ]; then
-    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.9.jar":"$CLASSPATH"
-    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.8.jar":"$CLASSPATH"
-  fi
 fi

 for file in "$streams_dependant_clients_lib_dir"/rocksdb*.jar;
59 changes: 2 additions & 57 deletions build.gradle
@@ -50,7 +50,7 @@ ext {
   minClientJavaVersion = 11
   minNonClientJavaVersion = 17
   // The connect:api module also belongs to the clients module, but it has already been bumped to JDK 17 as part of KIP-1032.
-  modulesNeedingJava11 = [":clients", ":streams", ":streams:test-utils", ":streams-scala", ":test-common:test-common-runtime"]
+  modulesNeedingJava11 = [":clients", ":generator", ":streams", ":streams:test-utils", ":streams-scala", ":test-common:test-common-runtime"]

   buildVersionFileName = "kafka-version.properties"
@@ -129,8 +129,7 @@ ext {
     if (name in ["compileTestJava", "compileTestScala"]) {
       options.compilerArgs << "-parameters"
     } else if (name in ["compileJava", "compileScala"]) {
-      if (!project.path.startsWith(":connect") && !project.path.startsWith(":storage"))
-        options.compilerArgs << "-Xlint:-rawtypes"
+      options.compilerArgs << "-Xlint:-rawtypes"
       options.compilerArgs << "-Xlint:all"
       options.compilerArgs << "-Xlint:-serial"
       options.compilerArgs << "-Xlint:-try"
@@ -2759,9 +2758,6 @@ project(':streams') {
     ':streams:integration-tests',
     ':streams:test-utils:test',
     ':streams:streams-scala:test',
-    ':streams:upgrade-system-tests-0100:test',
-    ':streams:upgrade-system-tests-0101:test',
-    ':streams:upgrade-system-tests-0102:test',
     ':streams:upgrade-system-tests-0110:test',
     ':streams:upgrade-system-tests-10:test',
     ':streams:upgrade-system-tests-11:test',
@@ -2970,57 +2966,6 @@ project(':streams:examples') {
     }
 }

-project(':streams:upgrade-system-tests-0100') {
-    base {
-        archivesName = "kafka-streams-upgrade-system-tests-0100"
-    }
-
-    dependencies {
-        testImplementation(libs.kafkaStreams_0100) {
-            exclude group: 'org.slf4j', module: 'slf4j-log4j12'
-            exclude group: 'log4j', module: 'log4j'
-        }
-        testRuntimeOnly libs.junitJupiter
-    }
-
-    systemTestLibs {
-        dependsOn testJar
-    }
-}
-
-project(':streams:upgrade-system-tests-0101') {
-    base {
-        archivesName = "kafka-streams-upgrade-system-tests-0101"
-    }
-
-    dependencies {
-        testImplementation(libs.kafkaStreams_0101) {
-            exclude group: 'org.slf4j', module: 'slf4j-log4j12'
-            exclude group: 'log4j', module: 'log4j'
-        }
-        testRuntimeOnly libs.junitJupiter
-    }
-
-    systemTestLibs {
-        dependsOn testJar
-    }
-}
-
-project(':streams:upgrade-system-tests-0102') {
-    base {
-        archivesName = "kafka-streams-upgrade-system-tests-0102"
-    }
-
-    dependencies {
-        testImplementation libs.kafkaStreams_0102
-        testRuntimeOnly libs.junitJupiter
-    }
-
-    systemTestLibs {
-        dependsOn testJar
-    }
-}
-
 project(':streams:upgrade-system-tests-0110') {
     base{
         archivesName = "kafka-streams-upgrade-system-tests-0110"
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
@@ -47,7 +47,7 @@
     <suppress checks="JavaNCSS"
               files="(RemoteLogManagerTest|SharePartitionTest).java"/>
     <suppress checks="ClassDataAbstractionCoupling|ClassFanOutComplexity" files="SharePartitionManagerTest"/>
-    <suppress checks="CyclomaticComplexity|ClassDataAbstractionCoupling" files="SharePartition.java"/>
+    <suppress checks="CyclomaticComplexity" files="SharePartition.java"/>

    <!-- server tests -->
    <suppress checks="MethodLength|JavaNCSS|NPath" files="DescribeTopicPartitionsRequestHandlerTest.java"/>
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

@@ -83,9 +83,7 @@
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
-import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
@@ -353,7 +351,8 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
                 metrics,
                 fetchMetricsManager.throttleTimeSensor(),
                 clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
-                backgroundEventHandler);
+                backgroundEventHandler,
+                false);
         this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
         this.groupMetadata.set(initializeGroupMetadata(config, groupRebalanceConfig));
         final Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time,
@@ -524,7 +523,8 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
             logContext,
             client,
             metadata,
-            backgroundEventHandler
+            backgroundEventHandler,
+            false
         );
         this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
         Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(
@@ -1574,12 +1574,9 @@ public void unsubscribe() {
                     subscriptions.assignedPartitions());

             try {
-                // If users subscribe to a topic with invalid name or without permission, they will get some exceptions.
-                // Because network thread keeps trying to send MetadataRequest or ConsumerGroupHeartbeatRequest in the background,
-                // there will be some error events in the background queue.
+                // If users have fatal error, they will get some exceptions in the background queue.
                 // When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
-                processBackgroundEvents(unsubscribeEvent.future(), timer,
-                    e -> e instanceof InvalidTopicException || e instanceof TopicAuthorizationException || e instanceof GroupAuthorizationException);
+                processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof GroupAuthorizationException);
                 log.info("Unsubscribed all topics or patterns and assigned partitions");
             } catch (TimeoutException e) {
                 log.error("Failed while waiting for the unsubscribe event to complete");
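The narrowed predicate above means unsubscribe now swallows only GroupAuthorizationException while waiting; other background errors surface to the caller. A minimal sketch of that wait-with-ignorable-errors pattern (hypothetical helper, not the actual processBackgroundEvents implementation):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;

// Hypothetical, simplified analogue of waiting on an event future while
// ignoring a caller-selected class of background errors.
final class BackgroundWait {
    static void await(CompletableFuture<Void> future,
                      Predicate<Throwable> ignoreError) throws Exception {
        try {
            future.get();
        } catch (ExecutionException e) {
            // Surface the error unless the caller marked it ignorable,
            // mirroring e -> e instanceof GroupAuthorizationException above.
            if (!ignoreError.test(e.getCause()))
                throw e;
        }
    }
}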
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java

@@ -20,6 +20,7 @@
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import org.apache.kafka.common.internals.IdempotentCloser;
@@ -40,6 +41,7 @@
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;

 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
@@ -154,6 +156,8 @@ void runOnce() {
                 .reduce(Long.MAX_VALUE, Math::min);

         reapExpiredApplicationEvents(currentTimeMs);
+        List<CompletableEvent<?>> uncompletedEvents = applicationEventReaper.uncompletedEvents();
+        maybeFailOnMetadataError(uncompletedEvents);
     }

     /**
@@ -165,9 +169,13 @@ private void processApplicationEvents() {

         for (ApplicationEvent event : events) {
             try {
-                if (event instanceof CompletableEvent)
+                if (event instanceof CompletableEvent) {
                     applicationEventReaper.add((CompletableEvent<?>) event);

+                    // Check if there are any metadata errors and fail the CompletableEvent if an error is present.
+                    // This call is meant to handle "immediately completed events" which may not enter the awaiting state,
+                    // so metadata errors need to be checked and handled right away.
+                    maybeFailOnMetadataError(List.of((CompletableEvent<?>) event));
+                }
                 applicationEventProcessor.process(event);
             } catch (Throwable t) {
                 log.warn("Error processing event {}", t.getMessage(), t);
@@ -325,4 +333,21 @@ void cleanup() {
             log.debug("Closed the consumer network thread");
         }
     }
+
+    /**
+     * If there is a metadata error, complete all uncompleted events that require subscription metadata.
+     */
+    private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
+        List<? extends CompletableApplicationEvent<?>> subscriptionMetadataEvent = events.stream()
+                .filter(e -> e instanceof CompletableApplicationEvent<?>)
+                .map(e -> (CompletableApplicationEvent<?>) e)
+                .filter(CompletableApplicationEvent::requireSubscriptionMetadata)
+                .collect(Collectors.toList());
+
+        if (subscriptionMetadataEvent.isEmpty())
+            return;
+        networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError ->
+                subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError))
+        );
+    }
 }
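Taken together, the hunks above add one step to each runOnce() iteration: after reaping expired events, any still-pending events that need subscription metadata are failed with a stored metadata error. A compressed sketch of that flow under simplified, hypothetical types (the real CompletableApplicationEvent and NetworkClientDelegate carry more state):

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical, simplified model of the new failure path added above.
final class MetadataErrorPropagation {
    interface PendingEvent {
        boolean requireSubscriptionMetadata();
        CompletableFuture<?> future();
    }

    static void maybeFail(List<PendingEvent> uncompleted, Optional<Exception> metadataError) {
        List<PendingEvent> affected = uncompleted.stream()
                .filter(PendingEvent::requireSubscriptionMetadata)
                .collect(Collectors.toList());
        if (affected.isEmpty())
            return; // no event needs metadata; keep the stored error for a later check
        metadataError.ifPresent(err ->
                affected.forEach(e -> e.future().completeExceptionally(err)));
    }
}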
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java

@@ -69,14 +69,17 @@ public class NetworkClientDelegate implements AutoCloseable {
     private final int requestTimeoutMs;
     private final Queue<UnsentRequest> unsentRequests;
     private final long retryBackoffMs;
+    private Optional<Exception> metadataError;
+    private final boolean notifyMetadataErrorsViaErrorQueue;

     public NetworkClientDelegate(
             final Time time,
             final ConsumerConfig config,
             final LogContext logContext,
             final KafkaClient client,
             final Metadata metadata,
-            final BackgroundEventHandler backgroundEventHandler) {
+            final BackgroundEventHandler backgroundEventHandler,
+            final boolean notifyMetadataErrorsViaErrorQueue) {
         this.time = time;
         this.client = client;
         this.metadata = metadata;
@@ -85,6 +88,8 @@
         this.unsentRequests = new ArrayDeque<>();
         this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
         this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.metadataError = Optional.empty();
+        this.notifyMetadataErrorsViaErrorQueue = notifyMetadataErrorsViaErrorQueue;
     }

     // Visible for testing
@@ -150,7 +155,11 @@ private void maybePropagateMetadataError() {
         try {
             metadata.maybeThrowAnyException();
         } catch (Exception e) {
-            backgroundEventHandler.add(new ErrorEvent(e));
+            if (notifyMetadataErrorsViaErrorQueue) {
+                backgroundEventHandler.add(new ErrorEvent(e));
+            } else {
+                metadataError = Optional.of(e);
+            }
         }
     }
@@ -230,6 +239,12 @@ private ClientRequest makeClientRequest(
             unsent.handler
         );
     }
+
+    public Optional<Exception> getAndClearMetadataError() {
+        Optional<Exception> metadataError = this.metadataError;
+        this.metadataError = Optional.empty();
+        return metadataError;
+    }

     public Node leastLoadedNode() {
         return this.client.leastLoadedNode(time.milliseconds()).node();
@@ -412,7 +427,8 @@ public static Supplier<NetworkClientDelegate> supplier(final Time time,
                                                            final Metrics metrics,
                                                            final Sensor throttleTimeSensor,
                                                            final ClientTelemetrySender clientTelemetrySender,
-                                                           final BackgroundEventHandler backgroundEventHandler) {
+                                                           final BackgroundEventHandler backgroundEventHandler,
+                                                           final boolean notifyMetadataErrorsViaErrorQueue) {
         return new CachedSupplier<>() {
             @Override
             protected NetworkClientDelegate create() {
@@ -426,7 +442,7 @@ protected NetworkClientDelegate create() {
                         metadata,
                         throttleTimeSensor,
                         clientTelemetrySender);
-                return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler);
+                return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue);
             }
         };
     }
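getAndClearMetadataError() is a get-and-clear accessor: the stored error is handed to exactly one caller and then reset, so a single metadata failure cannot fail later, unrelated events. A standalone sketch of the same idiom (hypothetical class, not Kafka code):

import java.util.Optional;

// Hypothetical holder demonstrating the get-and-clear idiom used above.
// Like the field in NetworkClientDelegate, this is only safe when record()
// and getAndClear() run on the same (network) thread.
final class OneShotError {
    private Optional<Exception> error = Optional.empty();

    void record(Exception e) {
        error = Optional.of(e);          // remember the most recent error
    }

    Optional<Exception> getAndClear() {
        Optional<Exception> current = error;
        error = Optional.empty();        // hand it out at most once
        return current;
    }
}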
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java

@@ -990,16 +990,18 @@ UnsentRequest buildRequest() {
             }

             ShareAcknowledgeRequest.Builder requestBuilder = sessionHandler.newShareAcknowledgeBuilder(groupId, fetchConfig);
-            Node nodeToSend = metadata.fetch().nodeById(nodeId);

-            log.trace("Building acknowledgements to send : {}", finalAcknowledgementsToSend);
-            nodesWithPendingRequests.add(nodeId);
             isProcessed = false;

             if (requestBuilder == null) {
                 handleSessionErrorCode(Errors.SHARE_SESSION_NOT_FOUND);
                 return null;
             } else {
+                Node nodeToSend = metadata.fetch().nodeById(nodeId);
+                nodesWithPendingRequests.add(nodeId);
+
+                log.trace("Building acknowledgements to send : {}", finalAcknowledgementsToSend);
+
                 inFlightAcknowledgements.putAll(finalAcknowledgementsToSend);
                 if (incompleteAcknowledgements.isEmpty()) {
                     acknowledgementsToSend.clear();
@@ -1082,12 +1084,16 @@ void handleAcknowledgeTimedOut(TopicIdPartition tip) {
          * being sent.
          */
         void handleSessionErrorCode(Errors errorCode) {
-            inFlightAcknowledgements.forEach((tip, acks) -> {
+            Map<TopicIdPartition, Acknowledgements> acknowledgementsMapToClear =
+                    incompleteAcknowledgements.isEmpty() ? acknowledgementsToSend : incompleteAcknowledgements;
+
+            acknowledgementsMapToClear.forEach((tip, acks) -> {
                 if (acks != null) {
                     acks.setAcknowledgeErrorCode(errorCode);
                 }
                 resultHandler.complete(tip, acks, onCommitAsync());
             });
+            acknowledgementsMapToClear.clear();
             processingComplete();
         }
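With this change, handleSessionErrorCode fails whichever acknowledgements are actually outstanding: the incomplete (previously attempted) set when present, otherwise the not-yet-sent set, and clears that map afterwards. A toy sketch of the selection (hypothetical simplified types):

import java.util.Map;

// Hypothetical helper mirroring the selection in handleSessionErrorCode:
// prefer acks that were sent but not completed; fall back to unsent acks.
final class AckSelection {
    static <K, V> Map<K, V> mapToClear(Map<K, V> incomplete, Map<K, V> toSend) {
        return incomplete.isEmpty() ? toSend : incomplete;
    }
}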
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java

@@ -279,7 +279,8 @@ private enum AcknowledgementMode {
                 metrics,
                 shareFetchMetricsManager.throttleTimeSensor(),
                 clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
-                backgroundEventHandler
+                backgroundEventHandler,
+                true
         );
         this.completedAcknowledgements = new LinkedList<>();
@@ -378,7 +379,7 @@ private enum AcknowledgementMode {
         final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);

         final Supplier<NetworkClientDelegate> networkClientDelegateSupplier =
-                () -> new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler);
+                () -> new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, true);

         GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
             config,
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java

@@ -178,6 +178,8 @@ public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, FetchConfig fetchConfig) {
     public ShareAcknowledgeRequest.Builder newShareAcknowledgeBuilder(String groupId, FetchConfig fetchConfig) {
         if (nextMetadata.isNewSession()) {
             // A share session cannot be started with a ShareAcknowledge request
+            nextPartitions.clear();
+            nextAcknowledgements.clear();
             return null;
         }
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java

@@ -18,6 +18,9 @@
 package org.apache.kafka.clients.consumer.internals.events;

 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Duration;

 /**
  * Event to check if all assigned partitions have fetch positions. If there are positions missing, it will fetch
@@ -32,4 +35,15 @@ public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Boolean> {
     public CheckAndUpdatePositionsEvent(long deadlineMs) {
         super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs);
     }
+
+    /**
+     * Indicates that this event requires subscription metadata to be present
+     * for its execution. This is used to ensure that metadata errors are
+     * handled correctly during the {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#poll(Duration) poll}
+     * or {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#position(TopicPartition) position} process.
+     */
+    @Override
+    public boolean requireSubscriptionMetadata() {
+        return true;
+    }
 }
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java

@@ -52,4 +52,8 @@ public long deadlineMs() {
     protected String toStringBase() {
         return super.toStringBase() + ", future=" + future + ", deadlineMs=" + deadlineMs;
     }
+
+    public boolean requireSubscriptionMetadata() {
+        return false;
+    }
 }
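Because the base implementation returns false, events are unaffected by metadata errors unless they explicitly opt in, as CheckAndUpdatePositionsEvent does above. A toy illustration of that opt-in default-override pattern (hypothetical classes, not the Kafka event hierarchy):

// Hypothetical classes illustrating the opt-in override; only events that
// override the default participate in metadata-error propagation.
abstract class BaseEvent {
    public boolean requireSubscriptionMetadata() {
        return false; // default: not failed on metadata errors
    }
}

final class PositionsEvent extends BaseEvent {
    @Override
    public boolean requireSubscriptionMetadata() {
        return true;  // opts in, like CheckAndUpdatePositionsEvent
    }
}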