Merge branch 'trunk' into KAFKA-9366
frankvicky committed Dec 8, 2024
2 parents 9bf59d9 + 104fa57 commit 23e391b
Showing 289 changed files with 8,785 additions and 3,690 deletions.
12 changes: 12 additions & 0 deletions .github/configs/labeler.yml
@@ -124,3 +124,15 @@ kraft:
- any-glob-to-any-file:
- 'metadata/**'
- 'raft/**'

dependencies:
- changed-files:
- any-glob-to-any-file:
- "grade/dependencies.gradle"
- "LICENSE-binary"

tiered-storage:
- changed-files:
- any-glob-to-any-file:
- '*/src/*/java/org/apache/kafka/server/log/remote/**'
- '*/src/*/java/kafka/log/remote/**'
2 changes: 1 addition & 1 deletion README.md
@@ -97,7 +97,7 @@ fail due to code changes. You can just run:
Using compiled files:

KAFKA_CLUSTER_ID="$(./bin/kafka-storage.sh random-uuid)"
-./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
+./bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
./bin/kafka-server-start.sh config/kraft/reconfig-server.properties

Using docker image:
22 changes: 21 additions & 1 deletion build.gradle
@@ -205,6 +205,11 @@ allprojects {
options.links "https://docs.oracle.com/en/java/javase/${JavaVersion.current().majorVersion}/docs/api/"
}

tasks.withType(Checkstyle) {
minHeapSize = "200m"
maxHeapSize = "1g"
}

clean {
delete "${projectDir}/src/generated"
delete "${projectDir}/src/generated-test"
@@ -548,6 +553,7 @@ subprojects {
task quarantinedTest(type: Test, dependsOn: compileJava) {
ext {
isGithubActions = System.getenv('GITHUB_ACTIONS') != null
hadFailure = false // Used to track if any tests failed, see afterSuite below
}

// Disable caching and up-to-date for this task. We always want quarantined tests
@@ -557,7 +563,7 @@
outputs.cacheIf { false }

maxParallelForks = maxTestForks
-ignoreFailures = userIgnoreFailures
+ignoreFailures = userIgnoreFailures || ext.isGithubActions

maxHeapSize = defaultMaxHeapSize
jvmArgs = defaultJvmArgs
@@ -586,6 +592,13 @@
}
}

// As we process results, check if there were any test failures.
afterSuite { desc, result ->
if (result.resultType == TestResult.ResultType.FAILURE) {
ext.hadFailure = true
}
}

// This closure will copy JUnit XML files out of the sub-project's build directory and into
// a top-level build/junit-xml directory. This is necessary to avoid reporting on tests which
// were not run, but instead were restored via FROM-CACHE. See KAFKA-17479 for more details.
@@ -599,6 +612,11 @@
ant.include(name: "**/*.xml")
}
}
// If there were any test failures, we want to fail the task to prevent the failures
// from being cached.
if (ext.hadFailure) {
throw new GradleException("Failing this task since '${project.name}:${name}' had test failures.")
}
}
}
}
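The quarantinedTest wiring above records suite failures in afterSuite, lets the report-copying step run, and only then throws so that a failed run cannot be stored in the build cache. A minimal Java sketch of that track-then-fail ordering, with hypothetical names and placeholder results rather than the real Gradle API:

import java.util.List;

public class FailureTrackingExample {
    public static void main(String[] args) {
        boolean hadFailure = false;

        // Process every result first, remembering whether anything failed.
        for (String result : List.of("PASS", "FAIL", "PASS")) {
            if (result.equals("FAIL")) {
                hadFailure = true;
            }
        }

        // Post-processing (copying reports) still runs before we give up.
        copyReports();

        // Only then fail, so the failure is surfaced but the failed results are not cached.
        if (hadFailure) {
            throw new IllegalStateException("Failing because at least one test suite failed.");
        }
    }

    private static void copyReports() {
        System.out.println("copying JUnit XML reports...");
    }
}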
@@ -2447,6 +2465,7 @@ project(':tools') {
implementation libs.log4j2Core
implementation libs.log4j1Bridge2Api
implementation libs.joptSimple
implementation libs.re2j

implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
implementation libs.jacksonJaxrsJsonProvider
@@ -3541,6 +3560,7 @@ project(':connect:runtime') {
api project(':connect:transforms')

implementation libs.slf4jApi
implementation libs.slf4jLog4j2
implementation libs.log4j2Api
implementation libs.log4j2Core
implementation libs.log4j1Bridge2Api
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
@@ -335,6 +335,7 @@
<allow pkg="org.apache.kafka.tools"/>
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="scala"/>
<allow pkg="com.google.re2j"/>
<subpackage name="share">
<allow pkg="org.apache.kafka.server.share" />
<allow pkg="org.apache.kafka.coordinator.share" />
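This new rule allows com.google.re2j imports in the tools code, matching the libs.re2j dependency added to the ':tools' project above. RE2/J's API closely mirrors java.util.regex, so usage looks roughly like the sketch below; the pattern string is invented for illustration and the re2j artifact is assumed to be on the classpath:

import com.google.re2j.Matcher;
import com.google.re2j.Pattern;

public class Re2jExample {
    public static void main(String[] args) {
        // RE2/J guarantees linear-time matching, unlike backtracking java.util.regex.
        Pattern pattern = Pattern.compile("consumer-group-[0-9]+");
        Matcher matcher = pattern.matcher("consumer-group-42");
        System.out.println(matcher.matches()); // true
    }
}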
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
@@ -47,6 +47,7 @@
<suppress checks="JavaNCSS"
files="(RemoteLogManagerTest|SharePartitionTest).java"/>
<suppress checks="ClassDataAbstractionCoupling|ClassFanOutComplexity" files="SharePartitionManagerTest"/>
<suppress checks="CyclomaticComplexity|ClassDataAbstractionCoupling" files="SharePartition.java"/>

<!-- server tests -->
<suppress checks="MethodLength|JavaNCSS|NPath" files="DescribeTopicPartitionsRequestHandlerTest.java"/>
@@ -64,6 +64,7 @@ public class DescribeConsumerGroupsHandler implements AdminApiHandler<Coordinato
private final Logger log;
private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
private final Set<String> useClassicGroupApi;
private final Map<String, String> groupIdNotFoundErrorMessages;

public DescribeConsumerGroupsHandler(
boolean includeAuthorizedOperations,
@@ -73,6 +74,7 @@ public DescribeConsumerGroupsHandler(
this.log = logContext.logger(DescribeConsumerGroupsHandler.class);
this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
this.useClassicGroupApi = new HashSet<>();
this.groupIdNotFoundErrorMessages = new HashMap<>();
}

private static Set<CoordinatorKey> buildKeySet(Collection<String> groupIds) {
@@ -255,7 +257,7 @@ private ApiResult<CoordinatorKey, ConsumerGroupDescription> handledClassicGroupR
handleError(
groupIdKey,
error,
-null,
+describedGroup.errorMessage(),
failed,
groupsToUnmap,
false
@@ -354,11 +356,18 @@ private void handleError(
case GROUP_ID_NOT_FOUND:
if (isConsumerGroupResponse) {
log.debug("`{}` request for group id {} failed because the group is not " +
"a new consumer group. Will retry with `DescribeGroups` API.", apiName, groupId.idValue);
"a new consumer group. Will retry with `DescribeGroups` API. {}",
apiName, groupId.idValue, errorMsg != null ? errorMsg : "");
useClassicGroupApi.add(groupId.idValue);

// The error message from the ConsumerGroupDescribe API is more informative to the user
// than the error message from the classic group API. Capture it and use it if we get the
// same error code for the classic group API also.
groupIdNotFoundErrorMessages.put(groupId.idValue, errorMsg);
} else {
log.error("`{}` request for group id {} failed because the group does not exist.", apiName, groupId.idValue);
failed.put(groupId, error.exception(errorMsg));
log.debug("`{}` request for group id {} failed because the group does not exist. {}",
apiName, groupId.idValue, errorMsg != null ? errorMsg : "");
failed.put(groupId, error.exception(groupIdNotFoundErrorMessages.getOrDefault(groupId.idValue, errorMsg)));
}
break;

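The handler change above keeps the more descriptive GROUP_ID_NOT_FOUND message returned by the ConsumerGroupDescribe API and falls back to it if the classic DescribeGroups retry reports the same error. A self-contained sketch of that capture-and-fall-back pattern, with hypothetical method and group names rather than the real admin client types:

import java.util.HashMap;
import java.util.Map;

public class ErrorMessageFallbackExample {
    // Messages captured from the first, more informative API, keyed by group id.
    private final Map<String, String> capturedMessages = new HashMap<>();

    void onNewApiFailure(String groupId, String errorMessage) {
        // Remember the richer message before retrying with the older API.
        capturedMessages.put(groupId, errorMessage);
    }

    String onClassicApiFailure(String groupId, String errorMessage) {
        // Prefer the captured message; fall back to whatever the classic API returned.
        return capturedMessages.getOrDefault(groupId, errorMessage);
    }

    public static void main(String[] args) {
        ErrorMessageFallbackExample handler = new ErrorMessageFallbackExample();
        handler.onNewApiFailure("my-group", "Group my-group is not a consumer group.");
        System.out.println(handler.onClassicApiFailure("my-group", null));
    }
}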
@@ -37,7 +37,6 @@

import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -177,17 +176,9 @@ private void handleError(
break;

case GROUP_ID_NOT_FOUND:
-// In order to maintain compatibility with describeConsumerGroups, an unknown group ID is
-// reported as a DEAD share group, and the admin client operation did not fail
log.debug("`DescribeShareGroups` request for group id {} failed because the group does not exist. {}",
groupId.idValue, errorMsg != null ? errorMsg : "");
-final ShareGroupDescription shareGroupDescription =
-new ShareGroupDescription(groupId.idValue,
-Collections.emptySet(),
-GroupState.DEAD,
-coordinator,
-validAclOperations(describedGroup.authorizedOperations()));
-completed.put(groupId, shareGroupDescription);
+failed.put(groupId, error.exception(errorMsg));
break;

default:
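With this change an unknown share group now fails the per-group result instead of being reported as a synthetic DEAD group description. A rough sketch of the difference from a caller's point of view, using plain CompletableFuture and made-up strings in place of the real ShareGroupDescription:

import java.util.concurrent.CompletableFuture;

public class FailFutureExample {
    public static void main(String[] args) {
        // Old behaviour (roughly): complete with a placeholder "DEAD" description.
        CompletableFuture<String> placeholder = new CompletableFuture<>();
        placeholder.complete("group-1: DEAD");
        System.out.println("old behaviour: " + placeholder.join());

        // New behaviour: complete exceptionally so callers see the real error.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("Group group-1 not found."));

        failed.whenComplete((description, error) -> {
            if (error != null) {
                System.out.println("describe failed: " + error.getMessage());
            } else {
                System.out.println(description);
            }
        });
    }
}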
@@ -996,14 +996,6 @@ UnsentRequest buildRequest() {
nodesWithPendingRequests.add(nodeId);
isProcessed = false;

-BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
-if (error != null) {
-handleShareAcknowledgeFailure(nodeToSend, requestBuilder.data(), this, error, clientResponse.receivedTimeMs());
-} else {
-handleShareAcknowledgeSuccess(nodeToSend, requestBuilder.data(), this, clientResponse, clientResponse.receivedTimeMs());
-}
-};
-
if (requestBuilder == null) {
handleSessionErrorCode(Errors.SHARE_SESSION_NOT_FOUND);
return null;
@@ -1014,7 +1006,16 @@
} else {
incompleteAcknowledgements.clear();
}
-return new UnsentRequest(requestBuilder, Optional.of(nodeToSend)).whenComplete(responseHandler);
+
+UnsentRequest unsentRequest = new UnsentRequest(requestBuilder, Optional.of(nodeToSend));
+BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
+if (error != null) {
+handleShareAcknowledgeFailure(nodeToSend, requestBuilder.data(), this, error, unsentRequest.handler().completionTimeMs());
+} else {
+handleShareAcknowledgeSuccess(nodeToSend, requestBuilder.data(), this, clientResponse, unsentRequest.handler().completionTimeMs());
+}
+};
+return unsentRequest.whenComplete(responseHandler);
}
}

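The buildRequest change above moves the requestBuilder null check ahead of the callback and reads the timestamp from the unsent request's own handler, which is available even when clientResponse is null on failure. A small sketch of that ordering with hypothetical types, not the real ShareConsumeRequestManager classes:

import java.util.function.BiConsumer;

public class SafeCompletionTimeExample {

    static final class Request {
        private long completionTimeMs;

        long completionTimeMs() {
            return completionTimeMs;
        }

        void complete(String response, Throwable error, BiConsumer<String, Throwable> handler) {
            completionTimeMs = System.currentTimeMillis();
            handler.accept(response, error);
        }
    }

    public static void main(String[] args) {
        // Build the request first, then capture it in the handler so the callback can
        // read a completion time that exists even when the response object is null.
        Request request = new Request();
        BiConsumer<String, Throwable> handler = (response, error) -> {
            if (error != null) {
                System.out.println("failed at " + request.completionTimeMs() + ": " + error.getMessage());
            } else {
                System.out.println("succeeded at " + request.completionTimeMs() + ": " + response);
            }
        };

        request.complete(null, new RuntimeException("network error"), handler);
    }
}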
@@ -297,7 +297,7 @@ public KafkaProducer(final Map<String, Object> configs) {
*/
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)),
-keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
+keySerializer, valueSerializer, null, null, null, new ApiVersions(), Time.SYSTEM);
}

/**
@@ -351,6 +351,7 @@ private void warnIfPartitionerDeprecated() {
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors<K, V> interceptors,
ApiVersions apiVersions,
Time time) {
try {
this.producerConfig = config;
@@ -423,7 +424,7 @@ private void warnIfPartitionerDeprecated() {
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
int deliveryTimeoutMs = configureDeliveryTimeout(config, log);

-this.apiVersions = new ApiVersions();
+this.apiVersions = apiVersions;
this.transactionManager = configureTransactionState(config, logContext);
// There is no need to do work required for adaptive partitioning, if we use a custom partitioner.
boolean enableAdaptivePartitioning = partitioner == null &&
@@ -1509,6 +1510,11 @@ String getClientId() {
return clientId;
}

// Visible for testing
TransactionManager getTransactionManager() {
return transactionManager;
}

private static class ClusterAndWaitTime {
final Cluster cluster;
final long waitedOnMetadataMs;
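The KafkaProducer change threads an ApiVersions instance through the internal constructor instead of creating it inline, so tests can inject their own. A generic constructor-injection sketch with invented class names standing in for the real ones:

public class InjectionExample {

    static class ApiVersionsStub {
        @Override
        public String toString() {
            return "api-versions";
        }
    }

    static class Producer {
        private final ApiVersionsStub apiVersions;

        // Public convenience constructor keeps the old behaviour.
        Producer() {
            this(new ApiVersionsStub());
        }

        // Internal constructor lets tests inject a canned instance.
        Producer(ApiVersionsStub apiVersions) {
            this.apiVersions = apiVersions;
        }

        ApiVersionsStub apiVersions() {
            return apiVersions;
        }
    }

    public static void main(String[] args) {
        ApiVersionsStub injected = new ApiVersionsStub();
        Producer producer = new Producer(injected);
        System.out.println(producer.apiVersions() == injected); // true: the injected instance is used
    }
}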
@@ -904,16 +904,25 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
}

String transactionalId = null;

// When we use the transaction V1 protocol, we set the request version upper limit to
// LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 so that the broker knows we're using transaction protocol V1.
boolean useTransactionV1Version = false;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
if (!transactionManager.isTransactionV2Enabled()) {
useTransactionV1Version = true;
}
}

ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
new ProduceRequestData()
.setAcks(acks)
.setTimeoutMs(timeout)
.setTransactionalId(transactionalId)
-.setTopicData(tpd));
+.setTopicData(tpd),
+useTransactionV1Version
+);
RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());

String nodeId = Integer.toString(destination);
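The Sender change above caps the ProduceRequest version when transaction V2 is not enabled, so the broker can tell which transaction protocol the client is using. A hedged sketch of version capping with made-up version numbers, not the actual ProduceRequest constants:

public class VersionCapExample {

    static final short LATEST_VERSION = 12;
    static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 11;

    // Pick the maximum request version to advertise, depending on the protocol in use.
    static short maxVersion(boolean useTransactionV1Version) {
        return useTransactionV1Version ? LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 : LATEST_VERSION;
    }

    public static void main(String[] args) {
        System.out.println("transaction V1 cap: " + maxVersion(true));   // 11
        System.out.println("transaction V2 cap: " + maxVersion(false));  // 12
    }
}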