diff --git a/.github/workflows/README.md b/.github/workflows/README.md
index f921ad78393ca..24116f32dd630 100644
--- a/.github/workflows/README.md
+++ b/.github/workflows/README.md
@@ -51,6 +51,66 @@ using this for very simple tasks such as applying labels or adding comments to P
_We must never run the untrusted PR code in the elevated `pull_request_target` context_
+## Our Workflows
+
+### Trunk Build
+
+The [ci.yml](ci.yml) workflow runs when commits are pushed to trunk. It calls into [build.yml](build.yml)
+to run our main build. Trunk builds do not read from the Gradle cache, but they do
+write to it. The test catalog is also only updated from trunk builds.
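+
+Roughly, the relationship between the two files looks like the sketch below. The `is-trunk`
+input name is illustrative only; see [build.yml](build.yml) for the inputs it actually accepts.
+
+```yaml
+# ci.yml (sketch): delegate the main build to the reusable build.yml workflow
+jobs:
+  build:
+    uses: ./.github/workflows/build.yml
+    with:
+      is-trunk: ${{ github.ref == 'refs/heads/trunk' }}
+```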
+
+### PR Build
+
+Similar to trunk, this workflow starts in [ci.yml](ci.yml) and calls into [build.yml](build.yml).
+Unlike trunk, the PR builds _will_ utilize the Gradle cache.
+
+### PR Triage
+
+To get the attention of committers, we have a triage workflow for Pull Requests
+opened by non-committers. This workflow consists of three files:
+
+* [pr-update.yml](pr-update.yml) When a PR is opened, adds the `triage` label if the PR
+ was opened by a non-committer.
+* [pr-reviewed-trigger.yml](pr-reviewed-trigger.yml) Runs when any PR is reviewed.
+ Used as a trigger for the next workflow.
+* [pr-reviewed.yml](pr-reviewed.yml) Removes the `triage` label after a PR has been reviewed.
+
+_The pr-update.yml workflow includes pull_request_target!_
+
+For committers to avoid having this label added, their membership in the ASF GitHub
+organization must be public. Here are the steps to take:
+
+* Navigate to the ASF organization's "People" page https://github.com/orgs/apache/people
+* Find yourself
+* Change "Organization Visibility" to Public
+
+Full documentation for this process can be found in GitHub's docs: https://docs.github.com/en/account-and-profile/setting-up-and-managing-your-personal-account-on-github/managing-your-membership-in-organizations/publicizing-or-hiding-organization-membership
+
+If you are a committer and do not want your membership in the ASF org listed as public,
+you will need to remove the `triage` label manually.
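+This can be done from the label widget in the PR sidebar or, for example, with the GitHub CLI:
+`gh pr edit <PR_NUMBER> --remove-label triage`.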
+
+### CI Approved
+
+Due to a combination of GitHub security settings and ASF policy, we require explicit
+approval of workflows on PRs submitted by non-committers (and non-contributors).
+To simplify this process, we have a `ci-approved` label which, once applied, automatically
+approves these workflows.
+
+There are two files related to this workflow:
+
+* [pr-labeled.yml](pr-labeled.yml) approves any pending workflow runs on PRs that have
+been labeled with `ci-approved`
+* [ci-requested.yml](ci-requested.yml) automatically approves future CI requests
+if the PR has the `ci-approved` label
+
+_The pr-labeled.yml workflow includes pull_request_target!_
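+
+As a rough sketch of the idea (the real logic lives in [pr-labeled.yml](pr-labeled.yml); the step
+below and its use of the GitHub REST API are illustrative, not copied from it):
+
+```yaml
+# Sketch: approve any workflow runs waiting on approval for the labeled PR
+- uses: actions/github-script@v7
+  with:
+    script: |
+      const runs = await github.rest.actions.listWorkflowRunsForRepo({
+        owner: context.repo.owner,
+        repo: context.repo.repo,
+        status: 'action_required',
+        head_sha: context.payload.pull_request.head.sha
+      });
+      for (const run of runs.data.workflow_runs) {
+        await github.rest.actions.approveWorkflowRun({
+          owner: context.repo.owner,
+          repo: context.repo.repo,
+          run_id: run.id
+        });
+      }
+```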
+
+### Stale PRs
+
+This one is straightforward. Using the "actions/stale" GitHub Action, we automatically
+label and eventually close PRs which have not had activity for some time. See the
+[stale.yml](stale.yml) workflow file for specifics.
+
## GitHub Actions Quirks
### Composite Actions
diff --git a/.github/workflows/pr-reviewed-trigger.yml b/.github/workflows/pr-reviewed-trigger.yml
new file mode 100644
index 0000000000000..f089176ff4b23
--- /dev/null
+++ b/.github/workflows/pr-reviewed-trigger.yml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Pull Request Reviewed
+
+on:
+ pull_request_review:
+ types:
+ - submitted
+
+jobs:
+ # This job is a workaround for the fact that pull_request_review lacks the necessary permissions to modify PRs.
+ # Also, there is no pull_request_target analog to pull_request_review. This approach is taken from
+ # https://securitylab.github.com/resources/github-actions-preventing-pwn-requests/.
+ pr-review-trigger:
+ name: Reviewed
+ runs-on: ubuntu-latest
+ steps:
+ - name: Env
+ run: printenv
+ env:
+ GITHUB_CONTEXT: ${{ toJson(github) }}
+ - name: Capture PR Number
+ run:
+ echo ${{ github.event.pull_request.number }} >> pr-number.txt
+ - name: Archive Event
+ uses: actions/upload-artifact@v4
+ with:
+ name: pr-number.txt
+ path: pr-number.txt
diff --git a/.github/workflows/pr-reviewed.yml b/.github/workflows/pr-reviewed.yml
new file mode 100644
index 0000000000000..2f6cae8a4fe97
--- /dev/null
+++ b/.github/workflows/pr-reviewed.yml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Remove Triage Label
+
+on:
+ workflow_run:
+ workflows: [Pull Request Reviewed]
+ types:
+ - completed
+
+jobs:
+ # This job runs with elevated permissions and the ability to modify pull requests. The steps taken here
+ # should be limited to updating labels and adding comments to PRs. This approach is taken from
+ # https://securitylab.github.com/resources/github-actions-preventing-pwn-requests/.
+ remove-triage:
+ if: ${{ github.event.workflow_run.conclusion == 'success' }}
+ runs-on: ubuntu-latest
+ steps:
+ - name: Env
+ run: printenv
+ env:
+ GITHUB_CONTEXT: ${{ toJson(github) }}
+ - uses: actions/download-artifact@v4
+ with:
+ github-token: ${{ github.token }}
+ run-id: ${{ github.event.workflow_run.id }}
+ name: pr-number.txt
+ - name: Remove label
+ uses: actions/github-script@v7
+ with:
+ github-token: ${{ secrets.GITHUB_TOKEN }}
+ script: |
+ var fs = require('fs');
+ var pr_number = Number(fs.readFileSync('./pr-number.txt'));
+ await github.rest.issues.removeLabel({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ issue_number: pr_number,
+ name: 'triage'
+ });
diff --git a/.github/workflows/pr-update.yml b/.github/workflows/pr-update.yml
index 31e0038705499..e1cd7214d6c36 100644
--- a/.github/workflows/pr-update.yml
+++ b/.github/workflows/pr-update.yml
@@ -25,9 +25,11 @@ on:
# * https://securitylab.github.com/resources/github-actions-preventing-pwn-requests/
pull_request_target:
types: [opened, reopened, synchronize]
+ branches:
+ - trunk
jobs:
- label_PRs:
+ add-labeler-labels:
name: Labeler
permissions:
contents: read
@@ -45,3 +47,24 @@ jobs:
PR_NUM: ${{github.event.number}}
run: |
./.github/scripts/label_small.sh
+
+ add-triage-label:
+ if: github.event.action == 'opened' || github.event.action == 'reopened'
+ name: Add triage label
+ runs-on: ubuntu-latest
+ permissions:
+ pull-requests: write
+ steps:
+ - name: Env
+ run: printenv
+ env:
+ GITHUB_CONTEXT: ${{ toJson(github) }}
+ # If the PR is from a non-committer, add triage label
+ - if: |
+ github.event.pull_request.author_association != 'MEMBER' &&
+ github.event.pull_request.author_association != 'OWNER'
+ env:
+ GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ GH_REPO: ${{ github.repository }}
+ NUMBER: ${{ github.event.pull_request.number }}
+ run: gh pr edit "$NUMBER" --add-label triage
diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml
index 9382d4173e94c..6ceb074f62c10 100644
--- a/.github/workflows/stale.yml
+++ b/.github/workflows/stale.yml
@@ -35,6 +35,22 @@ permissions:
pull-requests: write
jobs:
+ needs-attention:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/stale@v9
+ with:
+ debug-only: ${{ inputs.dryRun || false }}
+ operations-per-run: ${{ inputs.operationsPerRun || 500 }}
+ days-before-stale: 7
+ days-before-close: -1
+ ignore-pr-updates: true
+ only-pr-labels: 'triage'
+ stale-pr-label: 'needs-attention'
+ stale-pr-message: |
+ A label of 'needs-attention' was automatically added to this PR in order to raise the
+ attention of the committers. Once this issue has been triaged, the `triage` label
+ should be removed to prevent this automation from happening again.
stale:
runs-on: ubuntu-latest
steps:
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
index 4cbc5b4b43bb3..dd1b4b4cb5c7e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
@@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -42,9 +43,11 @@ public class ConsumerGroupDescription {
private final GroupState groupState;
private final Node coordinator;
private final Set<AclOperation> authorizedOperations;
+ private final Optional<Integer> groupEpoch;
+ private final Optional<Integer> targetAssignmentEpoch;
/**
- * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupState, Node)}.
+ * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set, Optional, Optional)}.
*/
@Deprecated
public ConsumerGroupDescription(String groupId,
@@ -57,7 +60,7 @@ public ConsumerGroupDescription(String groupId,
}
/**
- * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupState, Node, Set)}.
+ * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set, Optional, Optional)}.
*/
@Deprecated
public ConsumerGroupDescription(String groupId,
@@ -71,7 +74,7 @@ public ConsumerGroupDescription(String groupId,
}
/**
- * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set)}.
+ * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set, Optional, Optional)}.
*/
@Deprecated
public ConsumerGroupDescription(String groupId,
@@ -90,25 +93,8 @@ public ConsumerGroupDescription(String groupId,
this.groupState = GroupState.parse(state.name());
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
- }
-
- public ConsumerGroupDescription(String groupId,
- boolean isSimpleConsumerGroup,
- Collection<MemberDescription> members,
- String partitionAssignor,
- GroupState groupState,
- Node coordinator) {
- this(groupId, isSimpleConsumerGroup, members, partitionAssignor, groupState, coordinator, Collections.emptySet());
- }
-
- public ConsumerGroupDescription(String groupId,
- boolean isSimpleConsumerGroup,
- Collection<MemberDescription> members,
- String partitionAssignor,
- GroupState groupState,
- Node coordinator,
- Set<AclOperation> authorizedOperations) {
- this(groupId, isSimpleConsumerGroup, members, partitionAssignor, GroupType.CLASSIC, groupState, coordinator, authorizedOperations);
+ this.groupEpoch = Optional.empty();
+ this.targetAssignmentEpoch = Optional.empty();
}
public ConsumerGroupDescription(String groupId,
@@ -118,7 +104,9 @@ public ConsumerGroupDescription(String groupId,
GroupType type,
GroupState groupState,
Node coordinator,
- Set<AclOperation> authorizedOperations) {
+ Set<AclOperation> authorizedOperations,
+ Optional<Integer> groupEpoch,
+ Optional<Integer> targetAssignmentEpoch) {
this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
this.members = members == null ? Collections.emptyList() : List.copyOf(members);
@@ -127,6 +115,8 @@ public ConsumerGroupDescription(String groupId,
this.groupState = groupState;
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
+ this.groupEpoch = groupEpoch;
+ this.targetAssignmentEpoch = targetAssignmentEpoch;
}
@Override
@@ -141,12 +131,15 @@ public boolean equals(final Object o) {
type == that.type &&
groupState == that.groupState &&
Objects.equals(coordinator, that.coordinator) &&
- Objects.equals(authorizedOperations, that.authorizedOperations);
+ Objects.equals(authorizedOperations, that.authorizedOperations) &&
+ Objects.equals(groupEpoch, that.groupEpoch) &&
+ Objects.equals(targetAssignmentEpoch, that.targetAssignmentEpoch);
}
@Override
public int hashCode() {
- return Objects.hash(groupId, isSimpleConsumerGroup, members, partitionAssignor, type, groupState, coordinator, authorizedOperations);
+ return Objects.hash(groupId, isSimpleConsumerGroup, members, partitionAssignor, type, groupState, coordinator,
+ authorizedOperations, groupEpoch, targetAssignmentEpoch);
}
/**
@@ -215,6 +208,24 @@ public Set<AclOperation> authorizedOperations() {
return authorizedOperations;
}
+ /**
+ * The epoch of the consumer group.
+ * The optional is set to an integer if it is a {@link GroupType#CONSUMER} group, and to empty if it
+ * is a {@link GroupType#CLASSIC} group.
+ */
+ public Optional<Integer> groupEpoch() {
+ return groupEpoch;
+ }
+
+ /**
+ * The epoch of the target assignment.
+ * The optional is set to an integer if it is a {@link GroupType#CONSUMER} group, and to empty if it
+ * is a {@link GroupType#CLASSIC} group.
+ */
+ public Optional<Integer> targetAssignmentEpoch() {
+ return targetAssignmentEpoch;
+ }
+
@Override
public String toString() {
return "(groupId=" + groupId +
@@ -225,6 +236,8 @@ public String toString() {
", groupState=" + groupState +
", coordinator=" + coordinator +
", authorizedOperations=" + authorizedOperations +
+ ", groupEpoch=" + groupEpoch.orElse(null) +
+ ", targetAssignmentEpoch=" + targetAssignmentEpoch.orElse(null) +
")";
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
index 5ca7dba86f8f4..0785f2e67155f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.clients.admin;
+import org.apache.kafka.common.GroupType;
+
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
@@ -30,13 +32,18 @@ public class MemberDescription {
private final String host;
private final MemberAssignment assignment;
private final Optional<MemberAssignment> targetAssignment;
+ private final Optional<Integer> memberEpoch;
+ private final Optional<Boolean> upgraded;
- public MemberDescription(String memberId,
+ public MemberDescription(
+ String memberId,
Optional<String> groupInstanceId,
String clientId,
String host,
MemberAssignment assignment,
- Optional<MemberAssignment> targetAssignment
+ Optional<MemberAssignment> targetAssignment,
+ Optional<Integer> memberEpoch,
+ Optional<Boolean> upgraded
) {
this.memberId = memberId == null ? "" : memberId;
this.groupInstanceId = groupInstanceId;
@@ -45,8 +52,38 @@ public MemberDescription(String memberId,
this.assignment = assignment == null ?
new MemberAssignment(Collections.emptySet()) : assignment;
this.targetAssignment = targetAssignment;
+ this.memberEpoch = memberEpoch;
+ this.upgraded = upgraded;
}
+ /**
+ * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}.
+ */
+ @Deprecated
+ public MemberDescription(
+ String memberId,
+ Optional<String> groupInstanceId,
+ String clientId,
+ String host,
+ MemberAssignment assignment,
+ Optional<MemberAssignment> targetAssignment
+ ) {
+ this(
+ memberId,
+ groupInstanceId,
+ clientId,
+ host,
+ assignment,
+ targetAssignment,
+ Optional.empty(),
+ Optional.empty()
+ );
+ }
+
+ /**
+ * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}.
+ */
+ @Deprecated
public MemberDescription(
String memberId,
Optional<String> groupInstanceId,
@@ -64,6 +101,10 @@ public MemberDescription(
);
}
+ /**
+ * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}.
+ */
+ @Deprecated
public MemberDescription(String memberId,
String clientId,
String host,
@@ -81,12 +122,14 @@ public boolean equals(Object o) {
clientId.equals(that.clientId) &&
host.equals(that.host) &&
assignment.equals(that.assignment) &&
- targetAssignment.equals(that.targetAssignment);
+ targetAssignment.equals(that.targetAssignment) &&
+ memberEpoch.equals(that.memberEpoch) &&
+ upgraded.equals(that.upgraded);
}
@Override
public int hashCode() {
- return Objects.hash(memberId, groupInstanceId, clientId, host, assignment, targetAssignment);
+ return Objects.hash(memberId, groupInstanceId, clientId, host, assignment, targetAssignment, memberEpoch, upgraded);
}
/**
@@ -131,6 +174,25 @@ public Optional<MemberAssignment> targetAssignment() {
return targetAssignment;
}
+ /**
+ * The epoch of the group member.
+ * The optional is set to an integer if the member is in a {@link GroupType#CONSUMER} group, and to empty if it
+ * is in a {@link GroupType#CLASSIC} group.
+ */
+ public Optional<Integer> memberEpoch() {
+ return memberEpoch;
+ }
+
+ /**
+ * The flag indicating whether a member within a {@link GroupType#CONSUMER} group uses the
+ * {@link GroupType#CONSUMER} protocol.
+ * The optional is set to true if it does, to false if it does not, and to empty if it is unknown or if the group
+ * is a {@link GroupType#CLASSIC} group.
+ */
+ public Optional<Boolean> upgraded() {
+ return upgraded;
+ }
+
@Override
public String toString() {
return "(memberId=" + memberId +
@@ -138,6 +200,9 @@ public String toString() {
", clientId=" + clientId +
", host=" + host +
", assignment=" + assignment +
- ", targetAssignment=" + targetAssignment + ")";
+ ", targetAssignment=" + targetAssignment +
+ ", memberEpoch=" + memberEpoch.orElse(null) +
+ ", upgraded=" + upgraded.orElse(null) +
+ ")";
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java
index 77c04c5d5f02e..686ee43a44b2b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java
@@ -136,7 +136,10 @@ public ApiResult<CoordinatorKey, ClassicGroupDescription> handleResponse(
Optional.ofNullable(groupMember.groupInstanceId()),
groupMember.clientId(),
groupMember.clientHost(),
- new MemberAssignment(partitions)));
+ new MemberAssignment(partitions),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty()));
});
final ClassicGroupDescription classicGroupDescription =
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
index 1d911e2f0c7f4..457675e92675a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
@@ -222,7 +222,9 @@ private ApiResult handledConsumerGroup
groupMember.clientId(),
groupMember.clientHost(),
new MemberAssignment(convertAssignment(groupMember.assignment())),
- Optional.of(new MemberAssignment(convertAssignment(groupMember.targetAssignment())))
+ Optional.of(new MemberAssignment(convertAssignment(groupMember.targetAssignment()))),
+ Optional.of(groupMember.memberEpoch()),
+ groupMember.memberType() == -1 ? Optional.empty() : Optional.of(groupMember.memberType() == 1)
))
);
@@ -235,7 +237,9 @@ private ApiResult handledConsumerGroup
GroupType.CONSUMER,
GroupState.parse(describedGroup.groupState()),
coordinator,
- authorizedOperations
+ authorizedOperations,
+ Optional.of(describedGroup.groupEpoch()),
+ Optional.of(describedGroup.assignmentEpoch())
);
completed.put(groupIdKey, consumerGroupDescription);
}
@@ -281,7 +285,10 @@ private ApiResult handledClassicGroupR
Optional.ofNullable(groupMember.groupInstanceId()),
groupMember.clientId(),
groupMember.clientHost(),
- new MemberAssignment(partitions)));
+ new MemberAssignment(partitions),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty()));
}
final ConsumerGroupDescription consumerGroupDescription =
new ConsumerGroupDescription(groupIdKey.idValue, protocolType.isEmpty(),
@@ -290,7 +297,9 @@ private ApiResult handledClassicGroupR
GroupType.CLASSIC,
GroupState.parse(describedGroup.groupState()),
coordinator,
- authorizedOperations);
+ authorizedOperations,
+ Optional.empty(),
+ Optional.empty());
completed.put(groupIdKey, consumerGroupDescription);
} else {
failed.put(groupIdKey, new IllegalArgumentException(
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index eda01ee3599f9..6aa334d487bde 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -992,12 +992,12 @@ UnsentRequest buildRequest() {
ShareAcknowledgeRequest.Builder requestBuilder = sessionHandler.newShareAcknowledgeBuilder(groupId, fetchConfig);
isProcessed = false;
+ Node nodeToSend = metadata.fetch().nodeById(nodeId);
if (requestBuilder == null) {
handleSessionErrorCode(Errors.SHARE_SESSION_NOT_FOUND);
return null;
- } else {
- Node nodeToSend = metadata.fetch().nodeById(nodeId);
+ } else if (nodeToSend != null) {
nodesWithPendingRequests.add(nodeId);
log.trace("Building acknowledgements to send : {}", finalAcknowledgementsToSend);
@@ -1019,6 +1019,8 @@ UnsentRequest buildRequest() {
};
return unsentRequest.whenComplete(responseHandler);
}
+
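+ // Reached only when the target node was not found in the current metadata; no acknowledge request can be built.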
+ return null;
}
int getInFlightAcknowledgementsCount(TopicIdPartition tip) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1b1ffcc9d15de..0229c43cb8b3c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1219,11 +1219,19 @@ private void ensureValidRecordSize(int size) {
* flush all buffered records before performing the commit. This ensures that all the {@link #send(ProducerRecord)}
* calls made since the previous {@link #beginTransaction()} are completed before the commit.
*
+ * <p>
+ * <b>Important:</b> This method should not be used within the callback provided to
+ * {@link #send(ProducerRecord, Callback)}. Invoking <code>flush()</code> in this context will cause a deadlock.
+ * </p>
*
* @throws InterruptException If the thread is interrupted while blocked
*/
@Override
public void flush() {
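+ // Send callbacks run on the producer's I/O thread. flush() blocks until all in-flight records
+ // complete, which can never happen if the I/O thread itself is the one blocking here.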
+ if (Thread.currentThread() == this.ioThread) {
+ log.error("KafkaProducer.flush() invocation inside a callback will cause a deadlock.");
+ }
+
log.trace("Flushing accumulated records in producer.");
long start = time.nanoseconds();
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index b0b48e33c67ff..44f6e1f5a8891 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -4057,6 +4057,7 @@ public void testDescribeOldAndNewConsumerGroups() throws Exception {
.setTopicName("foo")
.setPartitions(singletonList(1))
)))
+ .setMemberType((byte) 1)
)),
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId("grp2")
@@ -4110,14 +4111,18 @@ public void testDescribeOldAndNewConsumerGroups() throws Exception {
),
Optional.of(new MemberAssignment(
Collections.singleton(new TopicPartition("foo", 1))
- ))
+ )),
+ Optional.of(10),
+ Optional.of(true)
)
),
"range",
GroupType.CONSUMER,
GroupState.STABLE,
env.cluster().controller(),
- Collections.emptySet()
+ Collections.emptySet(),
+ Optional.of(10),
+ Optional.of(10)
));
expectedResult.put("grp2", new ConsumerGroupDescription(
"grp2",
@@ -4130,14 +4135,19 @@ public void testDescribeOldAndNewConsumerGroups() throws Exception {
"clientHost",
new MemberAssignment(
Collections.singleton(new TopicPartition("bar", 0))
- )
+ ),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty()
)
),
"range",
GroupType.CLASSIC,
GroupState.STABLE,
env.cluster().controller(),
- Collections.emptySet()
+ Collections.emptySet(),
+ Optional.empty(),
+ Optional.empty()
));
assertEquals(expectedResult, result.all().get());
@@ -8674,7 +8684,10 @@ private static MemberDescription convertToMemberDescriptions(DescribedGroupMembe
Optional.ofNullable(member.groupInstanceId()),
member.clientId(),
member.clientHost(),
- assignment);
+ assignment,
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty());
}
private static ShareMemberDescription convertToShareMemberDescriptions(ShareGroupDescribeResponseData.Member member,
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java
index 0bddc618cfc03..16ce11d7361e5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java
@@ -99,5 +99,38 @@ public void testNonEqual() {
assertNotEquals(STATIC_MEMBER_DESCRIPTION, newInstanceDescription);
assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(), newInstanceDescription.hashCode());
+
+ MemberDescription newTargetAssignmentDescription = new MemberDescription(MEMBER_ID,
+ INSTANCE_ID,
+ CLIENT_ID,
+ HOST,
+ ASSIGNMENT,
+ Optional.of(ASSIGNMENT),
+ Optional.empty(),
+ Optional.empty());
+ assertNotEquals(STATIC_MEMBER_DESCRIPTION, newTargetAssignmentDescription);
+ assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(), newTargetAssignmentDescription.hashCode());
+
+ MemberDescription newMemberEpochDescription = new MemberDescription(MEMBER_ID,
+ INSTANCE_ID,
+ CLIENT_ID,
+ HOST,
+ ASSIGNMENT,
+ Optional.empty(),
+ Optional.of(1),
+ Optional.empty());
+ assertNotEquals(STATIC_MEMBER_DESCRIPTION, newMemberEpochDescription);
+ assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(), newMemberEpochDescription.hashCode());
+
+ MemberDescription newIsClassicDescription = new MemberDescription(MEMBER_ID,
+ INSTANCE_ID,
+ CLIENT_ID,
+ HOST,
+ ASSIGNMENT,
+ Optional.empty(),
+ Optional.empty(),
+ Optional.of(false));
+ assertNotEquals(STATIC_MEMBER_DESCRIPTION, newIsClassicDescription);
+ assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(), newIsClassicDescription.hashCode());
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
index cfbf67e2090d8..20cf0b761e641 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
@@ -22,6 +22,7 @@
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -54,6 +55,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -152,29 +154,46 @@ public void testInvalidBuildRequest() {
@Test
public void testSuccessfulHandleConsumerGroupResponse() {
DescribeConsumerGroupsHandler handler = new DescribeConsumerGroupsHandler(false, logContext);
- Collection<MemberDescription> members = singletonList(new MemberDescription(
- "memberId",
- Optional.of("instanceId"),
- "clientId",
- "host",
- new MemberAssignment(Set.of(
- new TopicPartition("foo", 0),
- new TopicPartition("bar", 1))
+ Collection<MemberDescription> members = List.of(
+ new MemberDescription(
+ "memberId",
+ Optional.of("instanceId"),
+ "clientId",
+ "host",
+ new MemberAssignment(Set.of(
+ new TopicPartition("foo", 0)
+ )),
+ Optional.of(new MemberAssignment(Set.of(
+ new TopicPartition("foo", 1)
+ ))),
+ Optional.of(10),
+ Optional.of(true)
),
- Optional.of(new MemberAssignment(Set.of(
- new TopicPartition("foo", 1),
- new TopicPartition("bar", 2)
- )))
- ));
+ new MemberDescription(
+ "memberId-classic",
+ Optional.of("instanceId-classic"),
+ "clientId-classic",
+ "host",
+ new MemberAssignment(Set.of(
+ new TopicPartition("bar", 0)
+ )),
+ Optional.of(new MemberAssignment(Set.of(
+ new TopicPartition("bar", 1)
+ ))),
+ Optional.of(9),
+ Optional.of(false)
+ ));
ConsumerGroupDescription expected = new ConsumerGroupDescription(
groupId1,
false,
members,
"range",
GroupType.CONSUMER,
- ConsumerGroupState.STABLE,
+ GroupState.STABLE,
coordinator,
- Collections.emptySet()
+ Collections.emptySet(),
+ Optional.of(10),
+ Optional.of(10)
);
AdminApiHandler.ApiResult<CoordinatorKey, ConsumerGroupDescription> result = handler.handleResponse(
coordinator,
@@ -189,7 +208,7 @@ public void testSuccessfulHandleConsumerGroupResponse() {
.setAssignmentEpoch(10)
.setAssignorName("range")
.setAuthorizedOperations(Utils.to32BitField(emptySet()))
- .setMembers(singletonList(
+ .setMembers(List.of(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId("memberId")
.setInstanceId("instanceId")
@@ -200,27 +219,44 @@ public void testSuccessfulHandleConsumerGroupResponse() {
.setSubscribedTopicNames(singletonList("foo"))
.setSubscribedTopicRegex("regex")
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
- .setTopicPartitions(Arrays.asList(
+ .setTopicPartitions(List.of(
new ConsumerGroupDescribeResponseData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setTopicName("foo")
- .setPartitions(Collections.singletonList(0)),
+ .setPartitions(Collections.singletonList(0))
+ )))
+ .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List.of(
new ConsumerGroupDescribeResponseData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
- .setTopicName("bar")
+ .setTopicName("foo")
.setPartitions(Collections.singletonList(1))
)))
- .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
- .setTopicPartitions(Arrays.asList(
+ .setMemberType((byte) 1),
+ new ConsumerGroupDescribeResponseData.Member()
+ .setMemberId("memberId-classic")
+ .setInstanceId("instanceId-classic")
+ .setClientHost("host")
+ .setClientId("clientId-classic")
+ .setMemberEpoch(9)
+ .setRackId("rackid")
+ .setSubscribedTopicNames(singletonList("bar"))
+ .setSubscribedTopicRegex("regex")
+ .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List.of(
new ConsumerGroupDescribeResponseData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
- .setTopicName("foo")
- .setPartitions(Collections.singletonList(1)),
+ .setTopicName("bar")
+ .setPartitions(Collections.singletonList(0))
+ )))
+ .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List.of(
new ConsumerGroupDescribeResponseData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setTopicName("bar")
- .setPartitions(Collections.singletonList(2))
+ .setPartitions(Collections.singletonList(1))
)))
+ .setMemberType((byte) 0)
))
))
)
@@ -232,9 +268,13 @@ public void testSuccessfulHandleConsumerGroupResponse() {
public void testSuccessfulHandleClassicGroupResponse() {
Collection<MemberDescription> members = singletonList(new MemberDescription(
"memberId",
+ Optional.empty(),
"clientId",
"host",
- new MemberAssignment(tps)));
+ new MemberAssignment(tps),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty()));
ConsumerGroupDescription expected = new ConsumerGroupDescription(
groupId1,
true,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
index ab33c8f45d7fd..b4f649de579ae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
@@ -150,6 +150,7 @@ private ConsumerConfig initConsumerConfigWithClassTypes(List<Object> classTypes)
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classTypes);
+ props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
return new ConsumerConfig(props);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index 8fc23f6255acf..9bc5d66d9c3a0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -1068,7 +1068,6 @@ private ConsumerConfig config() {
prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS));
prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(DEFAULT_RETRY_BACKOFF_MS));
prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, String.valueOf(DEFAULT_RETRY_BACKOFF_MAX_MS));
- prop.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(DEFAULT_HEARTBEAT_INTERVAL_MS));
return new ConsumerConfig(prop);
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
index 549720766858d..f9e46571795bc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
@@ -655,7 +655,6 @@ private ConsumerConfig config() {
prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS));
prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(DEFAULT_RETRY_BACKOFF_MS));
prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, String.valueOf(DEFAULT_RETRY_BACKOFF_MAX_MS));
- prop.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(DEFAULT_HEARTBEAT_INTERVAL_MS));
return new ConsumerConfig(prop);
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 120e03c6f8e3b..43af6b274b6ac 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -121,9 +121,7 @@ CreateConnectorRequest parseConnectorConfigurationFile(String filePath) throws I
File connectorConfigurationFile = Paths.get(filePath).toFile();
try {
- Map<String, String> connectorConfigs = objectMapper.readValue(
- connectorConfigurationFile,
- new TypeReference<Map<String, String>>() { });
+ Map<String, String> connectorConfigs = objectMapper.readValue(connectorConfigurationFile, new TypeReference<>() { });
if (!connectorConfigs.containsKey(NAME_CONFIG)) {
throw new ConnectException("Connector configuration at '" + filePath + "' is missing the mandatory '" + NAME_CONFIG + "' "
@@ -136,8 +134,7 @@ CreateConnectorRequest parseConnectorConfigurationFile(String filePath) throws I
try {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- CreateConnectorRequest createConnectorRequest = objectMapper.readValue(connectorConfigurationFile,
- new TypeReference<CreateConnectorRequest>() { });
+ CreateConnectorRequest createConnectorRequest = objectMapper.readValue(connectorConfigurationFile, new TypeReference<>() { });
if (createConnectorRequest.config().containsKey(NAME_CONFIG)) {
if (!createConnectorRequest.config().get(NAME_CONFIG).equals(createConnectorRequest.name())) {
throw new ConnectException("Connector name configuration in 'config' doesn't match the one specified in 'name' at '" + filePath
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index ff62c25eee585..430cad52b8f6f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -81,8 +81,7 @@ public ConnectMetrics(String workerId, WorkerConfig config, Time time, String cl
.timeWindow(sampleWindowMs, TimeUnit.MILLISECONDS).recordLevel(
Sensor.RecordingLevel.forName(metricsRecordingLevel));
- Map<String, Object> contextLabels = new HashMap<>();
- contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+ Map<String, Object> contextLabels = new HashMap<>(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, clusterId);
Object groupId = config.originals().get(DistributedConfig.GROUP_ID_CONFIG);
if (groupId != null) {
@@ -391,8 +390,7 @@ public Sensor sensor(String name, MetricConfig config, Sensor... parents) {
public synchronized Sensor sensor(String name, MetricConfig config, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
// We need to make sure that all sensor names are unique across all groups, so use the sensor prefix
Sensor result = metrics.sensor(sensorPrefix + name, config, Long.MAX_VALUE, recordingLevel, parents);
- if (result != null)
- sensorNames.add(result.name());
+ sensorNames.add(result.name());
return result;
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index bcff615c4147e..d837776be3829 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -321,7 +321,7 @@ private void commitTransaction() {
error = flushError.get();
if (error != null) {
- recordCommitFailure(time.milliseconds() - started, null);
+ recordCommitFailure(time.milliseconds() - started);
offsetWriter.cancelFlush();
throw maybeWrapProducerSendException(
"Failed to flush offsets and/or records for task " + id,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
index 7c10f42148e00..9dddec09ae340 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
@@ -22,7 +22,7 @@
/**
* Utility class that tracks the current state and the duration of time spent in each state.
- * This class is threadsafe.
+ * This class is thread-safe.
*/
public class StateTracker {
@@ -60,7 +60,7 @@ public State currentState() {
/**
* An immutable record of the accumulated times at the most recent state change. This class is required to
- * efficiently make {@link StateTracker} threadsafe.
+ * efficiently make {@link StateTracker} thread-safe.
*/
private static final class StateChange {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 1f4e930ae5a46..424de8f3de5b1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -228,7 +228,7 @@ protected void iteration() {
// Maybe commit
if (!committing && (context.isCommitRequested() || now >= nextCommit)) {
- commitOffsets(now, false);
+ commitOffsets(now);
nextCommit = now + offsetCommitIntervalMs;
context.clearCommitRequest();
}
@@ -282,7 +282,7 @@ private void onCommitCompleted(Throwable error, long seqno, Map offsets, boolean cl
}
}
- private void commitOffsets(long now, boolean closing) {
- commitOffsets(now, closing, consumer.assignment());
+ private void commitOffsets(long now) {
+ commitOffsets(now, false, consumer.assignment());
}
private void commitOffsets(long now, boolean closing, Collection<TopicPartition> topicPartitions) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 0d0eba32d86c4..55cc097083d02 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -262,11 +262,11 @@ public boolean commitOffsets() {
shouldFlush = offsetWriter.beginFlush(timeout - time.milliseconds(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warn("{} Interrupted while waiting for previous offset flush to complete, cancelling", this);
- recordCommitFailure(time.milliseconds() - started, e);
+ recordCommitFailure(time.milliseconds() - started);
return false;
} catch (TimeoutException e) {
log.warn("{} Timed out while waiting for previous offset flush to complete, cancelling", this);
- recordCommitFailure(time.milliseconds() - started, e);
+ recordCommitFailure(time.milliseconds() - started);
return false;
}
if (!shouldFlush) {
@@ -292,7 +292,7 @@ public boolean commitOffsets() {
// any data
if (flushFuture == null) {
offsetWriter.cancelFlush();
- recordCommitFailure(time.milliseconds() - started, null);
+ recordCommitFailure(time.milliseconds() - started);
return false;
}
try {
@@ -304,17 +304,17 @@ public boolean commitOffsets() {
} catch (InterruptedException e) {
log.warn("{} Flush of offsets interrupted, cancelling", this);
offsetWriter.cancelFlush();
- recordCommitFailure(time.milliseconds() - started, e);
+ recordCommitFailure(time.milliseconds() - started);
return false;
} catch (ExecutionException e) {
log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
offsetWriter.cancelFlush();
- recordCommitFailure(time.milliseconds() - started, e);
+ recordCommitFailure(time.milliseconds() - started);
return false;
} catch (TimeoutException e) {
log.error("{} Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets", this);
offsetWriter.cancelFlush();
- recordCommitFailure(time.milliseconds() - started, null);
+ recordCommitFailure(time.milliseconds() - started);
return false;
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 98171fe47b6aa..9b70572fe24a7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -356,17 +356,16 @@ protected void recordActiveTopic(String topic) {
* @param duration the length of time in milliseconds for the commit attempt to complete
*/
protected void recordCommitSuccess(long duration) {
- taskMetricsGroup.recordCommit(duration, true, null);
+ taskMetricsGroup.recordCommit(duration, true);
}
/**
* Record that offsets have been committed.
*
* @param duration the length of time in milliseconds for the commit attempt to complete
- * @param error the unexpected error that occurred; may be null in the case of timeouts or interruptions
*/
- protected void recordCommitFailure(long duration, Throwable error) {
- taskMetricsGroup.recordCommit(duration, false, error);
+ protected void recordCommitFailure(long duration) {
+ taskMetricsGroup.recordCommit(duration, false);
}
/**
@@ -434,7 +433,7 @@ void close() {
metricGroup.close();
}
- void recordCommit(long duration, boolean success, Throwable error) {
+ void recordCommit(long duration, boolean success) {
if (success) {
commitTime.record(duration);
commitAttempts.record(1.0d);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 69310a91bcc5f..ff7a9d3149d58 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1510,7 +1510,7 @@ public void restartConnectorAndTasks(RestartRequest request, Callback<ConnectorStateInfo> callback) {
Optional<RestartPlan> plan = buildRestartPlan(request);
- if (!plan.isPresent()) {
+ if (plan.isEmpty()) {
callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
} else {
callback.onCompletion(null, plan.get().restartConnectorStateInfo());
@@ -1558,7 +1558,7 @@ void processRestartRequests() {
protected synchronized void doRestartConnectorAndTasks(RestartRequest request) {
String connectorName = request.connectorName();
Optional<RestartPlan> maybePlan = buildRestartPlan(request);
- if (!maybePlan.isPresent()) {
+ if (maybePlan.isEmpty()) {
log.debug("Skipping restart of connector '{}' since no status is available: {}", connectorName, request);
return;
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index a3982f070a433..c89eb33082fbe 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -89,8 +89,7 @@ public WorkerGroupMember(DistributedConfig config,
.tags(metricsTags);
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
- Map<String, Object> contextLabels = new HashMap<>();
- contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+ Map<String, Object> contextLabels = new HashMap<>(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, config.kafkaClusterId());
contextLabels.put(WorkerConfig.CONNECT_GROUP_ID, config.getString(DistributedConfig.GROUP_ID_CONFIG));
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, contextLabels);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
index ed09d4a37a005..56f559dc245e0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
@@ -20,7 +20,6 @@
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.transforms.predicates.Predicate;
-import java.io.PrintStream;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -53,29 +52,22 @@ private <P extends Predicate<?>> DocInfo(Class<P> predicateClass, String overvie
.sorted(Comparator.comparing(docInfo -> docInfo.predicateName))
.collect(Collectors.toList());
- private static void printPredicateHtml(PrintStream out, DocInfo docInfo) {
- out.println("
");
-
- out.print("
");
-
- out.println(docInfo.overview);
-
- out.println("
");
-
- out.println(docInfo.configDef.toHtml(6, key -> docInfo.predicateName + "_" + key));
-
- out.println("
");
- }
-
- private static void printHtml(PrintStream out) {
+ private static String toHtml() {
+ StringBuilder b = new StringBuilder();
for (final DocInfo docInfo : PREDICATES) {
- printPredicateHtml(out, docInfo);
+ b.append("\n");
+ b.append("
\n");
+ b.append(docInfo.overview + "\n");
+ b.append("
\n");
+ b.append(docInfo.configDef.toHtml(6, key -> docInfo.predicateName + "_" + key) + "\n");
+ b.append("
\n");
}
+ return b.toString();
}
public static void main(String... args) {
- printHtml(System.out);
+ System.out.println(toHtml());
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
index 2c7250eb588c3..100f938bd9b5d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
@@ -34,7 +34,6 @@
import org.apache.kafka.connect.transforms.TimestampRouter;
import org.apache.kafka.connect.transforms.ValueToKey;
-import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;
@@ -71,30 +70,23 @@ private DocInfo(String transformationName, String overview, ConfigDef configDef)
new DocInfo(ValueToKey.class.getName(), ValueToKey.OVERVIEW_DOC, ValueToKey.CONFIG_DEF)
);
- private static void printTransformationHtml(PrintStream out, DocInfo docInfo) {
- out.println("");
-
- out.print("
");
-
- out.println(docInfo.overview);
-
- out.println("
");
-
- out.println(docInfo.configDef.toHtml(6, key -> docInfo.transformationName + "_" + key));
-
- out.println("
");
- }
-
- private static void printHtml(PrintStream out) {
+ private static String toHtml() {
+ StringBuilder b = new StringBuilder();
for (final DocInfo docInfo : TRANSFORMATIONS) {
- printTransformationHtml(out, docInfo);
+ b.append("\n");
+ b.append("
\n");
+ b.append(docInfo.overview + "\n");
+ b.append("
\n");
+ b.append(docInfo.configDef.toHtml(6, key -> docInfo.transformationName + "_" + key) + "\n");
+ b.append("
\n");
}
+ return b.toString();
}
public static void main(String... args) {
- printHtml(System.out);
+ System.out.println(toHtml());
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java
index c09eba62a2377..fd62fc172f4cf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java
@@ -32,7 +32,7 @@ default void recordStage(Stage stage) {
}
default Callback<T> chainStaging(Callback<T> chained) {
- return new Callback() {
+ return new Callback<>() {
@Override
public void recordStage(Stage stage) {
Callback.this.recordStage(stage);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 23f6d8a9c4937..bcc9f94b1fb29 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -159,12 +159,12 @@ public KafkaBasedLog(String topic,
* @param initializer the function that should be run when this log is {@link #start() started}; may be null
*/
public KafkaBasedLog(String topic,
- Map<String, Object> producerConfigs,
- Map<String, Object> consumerConfigs,
- Supplier<TopicAdmin> topicAdminSupplier,
- Callback<ConsumerRecord<K, V>> consumedCallback,
- Time time,
- java.util.function.Consumer<TopicAdmin> initializer) {
+ Map<String, Object> producerConfigs,
+ Map<String, Object> consumerConfigs,
+ Supplier<TopicAdmin> topicAdminSupplier,
+ Callback<ConsumerRecord<K, V>> consumedCallback,
+ Time time,
+ java.util.function.Consumer<TopicAdmin> initializer) {
this.topic = topic;
this.producerConfigs = producerConfigs;
this.consumerConfigs = consumerConfigs;
@@ -212,7 +212,7 @@ public static <K, V> KafkaBasedLog<K, V> withExistingClients(String topic,
) {
Objects.requireNonNull(topicAdmin);
Objects.requireNonNull(readTopicPartition);
- return new KafkaBasedLog(topic,
+ return new KafkaBasedLog<>(topic,
Collections.emptyMap(),
Collections.emptyMap(),
() -> topicAdmin,
@@ -266,8 +266,8 @@ public void start(boolean reportErrorsToCallback) {
// Then create the producer and consumer
producer = Optional.ofNullable(createProducer());
- if (!producer.isPresent())
- log.trace("Creating read-only KafkaBasedLog for topic " + topic);
+ if (producer.isEmpty())
+ log.trace("Creating read-only KafkaBasedLog for topic {}", topic);
consumer = createConsumer();
List<TopicPartition> partitions = new ArrayList<>();
@@ -308,13 +308,13 @@ public void start(boolean reportErrorsToCallback) {
thread = new WorkThread();
thread.start();
- log.info("Finished reading KafkaBasedLog for topic " + topic);
+ log.info("Finished reading KafkaBasedLog for topic {}", topic);
- log.info("Started KafkaBasedLog for topic " + topic);
+ log.info("Started KafkaBasedLog for topic {}", topic);
}
public void stop() {
- log.info("Stopping KafkaBasedLog for topic " + topic);
+ log.info("Stopping KafkaBasedLog for topic {}", topic);
synchronized (this) {
stopRequested = true;
@@ -338,7 +338,7 @@ public void stop() {
// do not close the admin client, since we don't own it
admin = null;
- log.info("Stopped KafkaBasedLog for topic " + topic);
+ log.info("Stopped KafkaBasedLog for topic {}", topic);
}
/**
@@ -466,16 +466,16 @@ protected boolean readPartition(TopicPartition topicPartition) {
return true;
}
- private void poll(long timeoutMs) {
+ private void poll() {
try {
- ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(timeoutMs));
+ ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
for (ConsumerRecord<K, V> record : records)
consumedCallback.onCompletion(null, record);
} catch (WakeupException e) {
// Expected on get() or stop(). The calling code should handle this
throw e;
} catch (KafkaException e) {
- log.error("Error polling: " + e);
+ log.error("Error polling: ", e);
if (reportErrorsToCallback) {
consumedCallback.onCompletion(e, null);
}
@@ -507,7 +507,7 @@ private void readToLogEnd(boolean shouldRetry) {
} else {
log.trace("Behind end offset {} for {}; last-read offset is {}",
endOffset, topicPartition, lastConsumedOffset);
- poll(Integer.MAX_VALUE);
+ poll();
break;
}
}
@@ -609,7 +609,7 @@ public void run() {
}
try {
- poll(Integer.MAX_VALUE);
+ poll();
} catch (WakeupException e) {
// See previous comment, both possible causes of this wakeup are handled by starting this loop again
continue;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
index a0a68e0e81e76..cb8d51c0a43fd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
@@ -59,7 +59,7 @@ public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> des
// visible for testing
static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs, Time time) throws Exception {
- // if null supplier or string is provided, the message will be default to "callabe"
+ // if null supplier or string is provided, the message will be default to "callable"
final String descriptionStr = Optional.ofNullable(description)
.map(Supplier::get)
.orElse("callable");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java
index 0ad3889b5f0ea..df2da55278005 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java
@@ -20,29 +20,21 @@
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.Set;
public class SafeObjectInputStream extends ObjectInputStream {
- protected static final Set<String> DEFAULT_NO_DESERIALIZE_CLASS_NAMES;
-
- static {
-
- Set<String> s = new HashSet<>();
- s.add("org.apache.commons.collections.functors.InvokerTransformer");
- s.add("org.apache.commons.collections.functors.InstantiateTransformer");
- s.add("org.apache.commons.collections4.functors.InvokerTransformer");
- s.add("org.apache.commons.collections4.functors.InstantiateTransformer");
- s.add("org.codehaus.groovy.runtime.ConvertedClosure");
- s.add("org.codehaus.groovy.runtime.MethodClosure");
- s.add("org.springframework.beans.factory.ObjectFactory");
- s.add("com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl");
- s.add("org.apache.xalan.xsltc.trax.TemplatesImpl");
- DEFAULT_NO_DESERIALIZE_CLASS_NAMES = Collections.unmodifiableSet(s);
- }
-
+ protected static final Set<String> DEFAULT_NO_DESERIALIZE_CLASS_NAMES = Set.of(
+ "org.apache.commons.collections.functors.InvokerTransformer",
+ "org.apache.commons.collections.functors.InstantiateTransformer",
+ "org.apache.commons.collections4.functors.InvokerTransformer",
+ "org.apache.commons.collections4.functors.InstantiateTransformer",
+ "org.codehaus.groovy.runtime.ConvertedClosure",
+ "org.codehaus.groovy.runtime.MethodClosure",
+ "org.springframework.beans.factory.ObjectFactory",
+ "com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl",
+ "org.apache.xalan.xsltc.trax.TemplatesImpl"
+ );
public SafeObjectInputStream(InputStream in) throws IOException {
super(in);
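
The hunk above only changes how the deny-list constant is built; the enforcement hook sits outside the shown context. As a hedged sketch of the usual pattern for such a stream, assuming the blocked names are checked in `resolveClass` (the class below is illustrative, not the repository's exact implementation):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
import java.util.Set;

public class DenyListObjectInputStream extends ObjectInputStream {
    // Illustrative deny-list; the real class uses DEFAULT_NO_DESERIALIZE_CLASS_NAMES.
    private static final Set<String> BLOCKED = Set.of(
            "org.apache.commons.collections.functors.InvokerTransformer",
            "org.codehaus.groovy.runtime.MethodClosure");

    public DenyListObjectInputStream(InputStream in) throws IOException {
        super(in);
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        // Reject known deserialization-gadget classes before they are loaded.
        if (BLOCKED.contains(desc.getName())) {
            throw new InvalidClassException(desc.getName(), "Deserialization of this class is not allowed");
        }
        return super.resolveClass(desc);
    }
}
```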
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 0974f35d16c71..f4374d18500ea 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -74,8 +74,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -408,15 +406,6 @@ private void assertErrorHandlingMetricValue(String name, double expected) {
assertEquals(expected, measured, 0.001d);
}
- private void verifyCloseSource() throws IOException {
- verify(producer).close(any(Duration.class));
- verify(admin).close(any(Duration.class));
- verify(offsetReader).close();
- verify(offsetStore).stop();
- // headerConverter.close() can throw IOException
- verify(headerConverter).close();
- }
-
private void expectTopicCreation(String topic) {
if (enableTopicCreation) {
when(admin.describeTopics(topic)).thenReturn(Collections.emptyMap());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 14e29cded9ca6..65262983d9f8b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -2149,7 +2149,7 @@ public void testAlterOffsetsSourceConnector(boolean enableTopicCreation) throws
@ParameterizedTest
@ValueSource(booleans = {true, false})
@SuppressWarnings("unchecked")
- public void testAlterOffsetsSourceConnectorError(boolean enableTopicCreation) throws Exception {
+ public void testAlterOffsetsSourceConnectorError(boolean enableTopicCreation) {
setup(enableTopicCreation);
mockKafkaClusterId();
mockInternalConverters();
@@ -2188,7 +2188,7 @@ public void testAlterOffsetsSourceConnectorError(boolean enableTopicCreation) th
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testNormalizeSourceConnectorOffsets(boolean enableTopicCreation) throws Exception {
+ public void testNormalizeSourceConnectorOffsets(boolean enableTopicCreation) {
setup(enableTopicCreation);
Map<Map<String, Object>, Map<String, Object>> offsets = Collections.singletonMap(
Collections.singletonMap("filename", "/path/to/filename"),
@@ -2334,7 +2334,7 @@ private void alterOffsetsSinkConnector(Map, Map> parti
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testAlterOffsetsSinkConnectorAlterOffsetsError(boolean enableTopicCreation) throws Exception {
+ public void testAlterOffsetsSinkConnectorAlterOffsetsError(boolean enableTopicCreation) {
setup(enableTopicCreation);
mockKafkaClusterId();
String connectorClass = SampleSinkConnector.class.getName();
@@ -2375,7 +2375,7 @@ public void testAlterOffsetsSinkConnectorAlterOffsetsError(boolean enableTopicCr
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testAlterOffsetsSinkConnectorDeleteOffsetsError(boolean enableTopicCreation) throws Exception {
+ public void testAlterOffsetsSinkConnectorDeleteOffsetsError(boolean enableTopicCreation) {
setup(enableTopicCreation);
mockKafkaClusterId();
String connectorClass = SampleSinkConnector.class.getName();
@@ -2426,7 +2426,7 @@ public void testAlterOffsetsSinkConnectorDeleteOffsetsError(boolean enableTopicC
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testAlterOffsetsSinkConnectorSynchronousError(boolean enableTopicCreation) throws Exception {
+ public void testAlterOffsetsSinkConnectorSynchronousError(boolean enableTopicCreation) {
setup(enableTopicCreation);
mockKafkaClusterId();
String connectorClass = SampleSinkConnector.class.getName();
@@ -2557,7 +2557,7 @@ public void testResetOffsetsSinkConnector(boolean enableTopicCreation) throws Ex
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testResetOffsetsSinkConnectorDeleteConsumerGroupError(boolean enableTopicCreation) throws Exception {
+ public void testResetOffsetsSinkConnectorDeleteConsumerGroupError(boolean enableTopicCreation) {
setup(enableTopicCreation);
mockKafkaClusterId();
String connectorClass = SampleSinkConnector.class.getName();
@@ -2594,7 +2594,7 @@ public void testResetOffsetsSinkConnectorDeleteConsumerGroupError(boolean enable
@ParameterizedTest
@ValueSource(booleans = {true, false})
@SuppressWarnings("unchecked")
- public void testModifySourceConnectorOffsetsTimeout(boolean enableTopicCreation) throws Exception {
+ public void testModifySourceConnectorOffsetsTimeout(boolean enableTopicCreation) {
setup(enableTopicCreation);
mockKafkaClusterId();
Time time = new MockTime();
@@ -2630,7 +2630,7 @@ public void testModifySourceConnectorOffsetsTimeout(boolean enableTopicCreation)
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testModifyOffsetsSinkConnectorTimeout(boolean enableTopicCreation) throws Exception {
+ public void testModifyOffsetsSinkConnectorTimeout(boolean enableTopicCreation) {
setup(enableTopicCreation);
mockKafkaClusterId();
String connectorClass = SampleSinkConnector.class.getName();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
index 462d02f3e6d8e..06b0e3fb55cc4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
@@ -17,7 +17,6 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
-import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
import org.apache.kafka.connect.storage.AppliedConnectorConfig;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -30,35 +29,11 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class WorkerTestUtils {
- public static WorkerLoad emptyWorkerLoad(String worker) {
- return new WorkerLoad.Builder(worker).build();
- }
-
- public WorkerLoad workerLoad(String worker, int connectorStart, int connectorNum,
- int taskStart, int taskNum) {
- return new WorkerLoad.Builder(worker).with(
- newConnectors(connectorStart, connectorStart + connectorNum),
- newTasks(taskStart, taskStart + taskNum)).build();
- }
-
- public static List<String> newConnectors(int start, int end) {
- return IntStream.range(start, end)
- .mapToObj(i -> "connector" + i)
- .collect(Collectors.toList());
- }
-
- public static List<ConnectorTaskId> newTasks(int start, int end) {
- return IntStream.range(start, end)
- .mapToObj(i -> new ConnectorTaskId("task", i))
- .collect(Collectors.toList());
- }
-
public static ClusterConfigState clusterConfigState(long offset,
int connectorNum,
int taskNum) {
@@ -82,24 +57,6 @@ public static ClusterConfigState clusterConfigState(long offset,
Collections.emptySet());
}
- public static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
- long givenOffset,
- Map<String, ExtendedAssignment> givenAssignments) {
- return givenAssignments.entrySet().stream()
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- e -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, e.getValue())));
- }
-
- public static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
- long givenOffset,
- int start,
- int connectorNum) {
- return IntStream.range(start, connectorNum + 1)
- .mapToObj(i -> new SimpleEntry<>("worker" + i, new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, null)))
- .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
- }
-
public static Map<String, Integer> connectorTaskCounts(int start,
int connectorNum,
int taskCounts) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java
index f76f0585f1aff..e58444ccd4d77 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java
@@ -75,8 +75,6 @@ public void testListenersConfigAllowedValues() {
props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812");
config = RestServerConfig.forPublic(null, props);
assertEquals(Arrays.asList("http://a.b:9999", "https://a.b:7812"), config.listeners());
-
- config = RestServerConfig.forPublic(null, props);
}
@Test
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 392d082d3305e..700284a9c66ee 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -972,9 +972,4 @@ private Stubber expectAndCallbackNotLeaderException(final ArgumentCaptor {
- T run() throws Throwable;
- }
-
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index d0968db29ced2..4173d9a357c45 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -164,7 +164,6 @@ public class KafkaConfigBackingStoreTest {
new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)),
new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2))
);
- private static final Struct TARGET_STATE_STARTED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED");
private static final Struct TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1)
.put("state", "PAUSED")
.put("state.v2", "PAUSED");
@@ -1658,12 +1657,6 @@ private void expectRead(LinkedHashMap<String, byte[]> serializedValues,
}
}
- private void expectRead(final String key, final byte[] serializedValue, Struct deserializedValue) {
- LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
- serializedData.put(key, serializedValue);
- expectRead(serializedData, Collections.singletonMap(key, deserializedValue));
- }
-
// This map needs to maintain ordering
private Answer<Future<Void>> expectReadToEnd(final Map<String, byte[]> serializedConfigs) {
return invocation -> {
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 6e95e36020b8d..b2b1ab856c04f 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -17,7 +17,6 @@
package kafka.log.remote;
import kafka.cluster.Partition;
-import kafka.log.AsyncOffsetReadFutureHolder;
import kafka.log.UnifiedLog;
import kafka.server.DelayedRemoteListOffsets;
@@ -74,6 +73,7 @@
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.AbortedTxn;
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
deleted file mode 100644
index 77662c3b11464..0000000000000
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ /dev/null
@@ -1,307 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder}
-import kafka.server.KafkaConfig
-import kafka.utils.{Logging, ToolsUtils}
-import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils}
-import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.utils.{Exit, Time, Utils}
-import org.apache.kafka.server.config.ZkConfigs
-import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
-import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
-import org.apache.zookeeper.KeeperException
-import org.apache.zookeeper.KeeperException.Code
-import org.apache.zookeeper.client.ZKClientConfig
-import org.apache.zookeeper.data.Stat
-
-import scala.annotation.tailrec
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-import scala.concurrent._
-import scala.concurrent.duration._
-
-/**
- * This tool is to be used when making access to ZooKeeper authenticated or
- * the other way around, when removing authenticated access. The exact steps
- * to migrate a Kafka cluster from unsecure to secure with respect to ZooKeeper
- * access are the following:
- *
- * 1- Perform a rolling upgrade of Kafka servers, setting zookeeper.set.acl to false
- * and passing a valid JAAS login file via the system property
- * java.security.auth.login.config
- * 2- Perform a second rolling upgrade keeping the system property for the login file
- * and now setting zookeeper.set.acl to true
- * 3- Finally run this tool. There is a script under ./bin. Run
- * ./bin/zookeeper-security-migration.sh --help
- * to see the configuration parameters. An example of running it is the following:
- * ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181
- *
- * To convert a cluster from secure to unsecure, we need to perform the following
- * steps:
- * 1- Perform a rolling upgrade setting zookeeper.set.acl to false for each server
- * 2- Run this migration tool, setting zookeeper.acl to unsecure
- * 3- Perform another rolling upgrade to remove the system property setting the
- * login file (java.security.auth.login.config).
- */
-
-object ZkSecurityMigrator extends Logging {
- private val usageMessage = ("ZooKeeper Migration Tool Help. This tool updates the ACLs of "
- + "znodes as part of the process of setting up ZooKeeper "
- + "authentication.")
- private val tlsConfigFileOption = "zk-tls-config-file"
-
- def run(args: Array[String]): Unit = {
- val jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
- val opts = new ZkSecurityMigratorOptions(args)
-
- CommandLineUtils.maybePrintHelpOrVersion(opts, usageMessage)
-
- // Must have either SASL or TLS mutual authentication enabled to use this tool.
- // Instantiate the client config we will use so that we take into account config provided via the CLI option
- // and system properties passed via -D parameters if no CLI option is given.
- val zkClientConfig = createZkClientConfigFromOption(opts.options, opts.zkTlsConfigFile).getOrElse(new ZKClientConfig())
- val tlsClientAuthEnabled = KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig)
- if (jaasFile == null && !tlsClientAuthEnabled) {
- val errorMsg = s"No JAAS configuration file has been specified and no TLS client certificate has been specified. Please make sure that you set " +
- s"the system property ${JaasUtils.JAVA_LOGIN_CONFIG_PARAM} or provide a ZooKeeper client TLS configuration via --$tlsConfigFileOption " +
- s"identifying at least ${ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG}, ${ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG}, and ${ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG}"
- System.err.println("ERROR: %s".format(errorMsg))
- throw new IllegalArgumentException("Incorrect configuration")
- }
-
- if (!tlsClientAuthEnabled && !JaasUtils.isZkSaslEnabled) {
- val errorMsg = "Security isn't enabled, most likely the file isn't set properly: %s".format(jaasFile)
- System.out.println("ERROR: %s".format(errorMsg))
- throw new IllegalArgumentException("Incorrect configuration")
- }
-
- val zkAcl = opts.options.valueOf(opts.zkAclOpt) match {
- case "secure" =>
- info("zookeeper.acl option is secure")
- true
- case "unsecure" =>
- info("zookeeper.acl option is unsecure")
- false
- case _ =>
- ToolsUtils.printUsageAndExit(opts.parser, usageMessage)
- }
- val zkUrl = opts.options.valueOf(opts.zkUrlOpt)
- val zkSessionTimeout = opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue
- val zkConnectionTimeout = opts.options.valueOf(opts.zkConnectionTimeoutOpt).intValue
- val zkClient = KafkaZkClient(zkUrl, zkAcl, zkSessionTimeout, zkConnectionTimeout,
- Int.MaxValue, Time.SYSTEM, zkClientConfig = zkClientConfig, name = "ZkSecurityMigrator", enableEntityConfigControllerCheck = false)
- val enablePathCheck = opts.options.has(opts.enablePathCheckOpt)
- val migrator = new ZkSecurityMigrator(zkClient)
- migrator.run(enablePathCheck)
- }
-
- def main(args: Array[String]): Unit = {
- try {
- run(args)
- } catch {
- case e: Exception =>
- e.printStackTrace()
- // must exit with non-zero status so system tests will know we failed
- Exit.exit(1)
- }
- }
-
- def createZkClientConfigFromFile(filename: String) : ZKClientConfig = {
- val zkTlsConfigFileProps = Utils.loadProps(filename, ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.asJava)
- val zkClientConfig = new ZKClientConfig() // Initializes based on any system properties that have been set
- // Now override any set system properties with explicitly-provided values from the config file
- // Emit INFO logs due to camel-case property names encouraging mistakes -- help people see mistakes they make
- info(s"Found ${zkTlsConfigFileProps.size()} ZooKeeper client configuration properties in file $filename")
- zkTlsConfigFileProps.asScala.foreachEntry { (key, value) =>
- info(s"Setting $key")
- KafkaConfig.setZooKeeperClientProperty(zkClientConfig, key, value)
- }
- zkClientConfig
- }
-
- private[admin] def createZkClientConfigFromOption(options: OptionSet, option: OptionSpec[String]) : Option[ZKClientConfig] =
- if (!options.has(option))
- None
- else
- Some(createZkClientConfigFromFile(options.valueOf(option)))
-
- private class ZkSecurityMigratorOptions(args: Array[String]) extends CommandDefaultOptions(args) {
- val zkAclOpt: OptionSpec[String] = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure."
- + " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String])
- val zkUrlOpt: OptionSpec[String] = parser.accepts("zookeeper.connect", "Sets the ZooKeeper connect string (ensemble). This parameter " +
- "takes a comma-separated list of host:port pairs.").withRequiredArg().defaultsTo("localhost:2181").
- ofType(classOf[String])
- val zkSessionTimeoutOpt: OptionSpec[Integer] = parser.accepts("zookeeper.session.timeout", "Sets the ZooKeeper session timeout.").
- withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000)
- val zkConnectionTimeoutOpt: OptionSpec[Integer] = parser.accepts("zookeeper.connection.timeout", "Sets the ZooKeeper connection timeout.").
- withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000)
- val enablePathCheckOpt: OptionSpecBuilder = parser.accepts("enable.path.check", "Checks if all the root paths exist in ZooKeeper " +
- "before migration. If not, exit the command.")
- val zkTlsConfigFile: OptionSpec[String] = parser.accepts(tlsConfigFileOption,
- "Identifies the file where ZooKeeper client TLS connectivity properties are defined. Any properties other than " +
- ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.mkString(", ") + " are ignored.")
- .withRequiredArg().describedAs("ZooKeeper TLS configuration").ofType(classOf[String])
- options = parser.parse(args : _*)
- }
-}
-
-class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
- private val zkSecurityMigratorUtils = new ZkSecurityMigratorUtils(zkClient)
- private val futures = new mutable.Queue[Future[String]]
-
- private def setAcl(path: String, setPromise: Promise[String]): Unit = {
- info("Setting ACL for path %s".format(path))
- zkSecurityMigratorUtils.currentZooKeeper.setACL(path, zkClient.defaultAcls(path).asJava, -1, SetACLCallback, setPromise)
- }
-
- private def retrieveChildren(path: String, childrenPromise: Promise[String]): Unit = {
- info("Getting children to set ACLs for path %s".format(path))
- zkSecurityMigratorUtils.currentZooKeeper.getChildren(path, false, GetChildrenCallback, childrenPromise)
- }
-
- private def setAclIndividually(path: String): Unit = {
- val setPromise = Promise[String]()
- futures.synchronized {
- futures += setPromise.future
- }
- setAcl(path, setPromise)
- }
-
- private def setAclsRecursively(path: String): Unit = {
- val setPromise = Promise[String]()
- val childrenPromise = Promise[String]()
- futures.synchronized {
- futures += setPromise.future
- futures += childrenPromise.future
- }
- setAcl(path, setPromise)
- retrieveChildren(path, childrenPromise)
- }
-
- private object GetChildrenCallback extends ChildrenCallback {
- def processResult(rc: Int,
- path: String,
- ctx: Object,
- children: java.util.List[String]): Unit = {
- val zkHandle = zkSecurityMigratorUtils.currentZooKeeper
- val promise = ctx.asInstanceOf[Promise[String]]
- Code.get(rc) match {
- case Code.OK =>
- // Set ACL for each child
- children.asScala.map { child =>
- path match {
- case "/" => s"/$child"
- case path => s"$path/$child"
- }
- }.foreach(setAclsRecursively)
- promise success "done"
- case Code.CONNECTIONLOSS =>
- zkHandle.getChildren(path, false, GetChildrenCallback, ctx)
- case Code.NONODE =>
- warn("Node is gone, it could be have been legitimately deleted: %s".format(path))
- promise success "done"
- case Code.SESSIONEXPIRED =>
- // Starting a new session isn't really a problem, but it'd complicate
- // the logic of the tool, so we quit and let the user re-run it.
- System.out.println("ZooKeeper session expired while changing ACLs")
- promise failure KeeperException.create(Code.get(rc))
- case _ =>
- System.out.println("Unexpected return code: %d".format(rc))
- promise failure KeeperException.create(Code.get(rc))
- }
- }
- }
-
- private object SetACLCallback extends StatCallback {
- def processResult(rc: Int,
- path: String,
- ctx: Object,
- stat: Stat): Unit = {
- val zkHandle = zkSecurityMigratorUtils.currentZooKeeper
- val promise = ctx.asInstanceOf[Promise[String]]
-
- Code.get(rc) match {
- case Code.OK =>
- info("Successfully set ACLs for %s".format(path))
- promise success "done"
- case Code.CONNECTIONLOSS =>
- zkHandle.setACL(path, zkClient.defaultAcls(path).asJava, -1, SetACLCallback, ctx)
- case Code.NONODE =>
- warn("Znode is gone, it could be have been legitimately deleted: %s".format(path))
- promise success "done"
- case Code.SESSIONEXPIRED =>
- // Starting a new session isn't really a problem, but it'd complicate
- // the logic of the tool, so we quit and let the user re-run it.
- System.out.println("ZooKeeper session expired while changing ACLs")
- promise failure KeeperException.create(Code.get(rc))
- case _ =>
- System.out.println("Unexpected return code: %d".format(rc))
- promise failure KeeperException.create(Code.get(rc))
- }
- }
- }
-
- private def run(enablePathCheck: Boolean): Unit = {
- try {
- setAclIndividually("/")
- checkPathExistenceAndMaybeExit(enablePathCheck)
- for (path <- ZkData.SecureRootPaths) {
- debug("Going to set ACL for %s".format(path))
- if (path == ControllerZNode.path && !zkClient.pathExists(path)) {
- debug("Ignoring to set ACL for %s, because it doesn't exist".format(path))
- } else {
- zkClient.makeSurePersistentPathExists(path)
- setAclsRecursively(path)
- }
- }
-
- @tailrec
- def recurse(): Unit = {
- val future = futures.synchronized {
- futures.headOption
- }
- future match {
- case Some(a) =>
- Await.result(a, 6000 millis)
- futures.synchronized { futures.dequeue() }
- recurse()
- case None =>
- }
- }
- recurse()
-
- } finally {
- zkClient.close()
- }
- }
-
- private def checkPathExistenceAndMaybeExit(enablePathCheck: Boolean): Unit = {
- val nonExistingSecureRootPaths = ZkData.SecureRootPaths.filterNot(zkClient.pathExists)
- if (nonExistingSecureRootPaths.nonEmpty) {
- println(s"Warning: The following secure root paths do not exist in ZooKeeper: ${nonExistingSecureRootPaths.mkString(",")}")
- println("That might be due to an incorrect chroot is specified when executing the command.")
- if (enablePathCheck) {
- println("Exit the command.")
- // must exit with non-zero status so system tests will know we failed
- Exit.exit(1)
- }
- }
- }
-}
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 7bc3c03391c7d..67d3d3d3624a9 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -391,6 +391,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
producerEpoch: Short,
partitions: collection.Set[TopicPartition],
responseCallback: AddPartitionsCallback,
+ clientTransactionVersion: TransactionVersion,
requestLocal: RequestLocal = RequestLocal.noCaching): Unit = {
if (transactionalId == null || transactionalId.isEmpty) {
debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request")
@@ -420,7 +421,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
// this is an optimization: if the partitions are already in the metadata reply OK immediately
Left(Errors.NONE)
} else {
- Right(coordinatorEpoch, txnMetadata.prepareAddPartitions(partitions.toSet, time.milliseconds()))
+ Right(coordinatorEpoch, txnMetadata.prepareAddPartitions(partitions.toSet, time.milliseconds(), clientTransactionVersion))
}
}
}
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index dc52ea134020a..31daebac76391 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -255,7 +255,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
def prepareNoTransit(): TxnTransitMetadata = {
// do not call transitTo as it will set the pending state, a follow-up call to abort the transaction will set its pending state
TxnTransitMetadata(producerId, previousProducerId, nextProducerId, producerEpoch, lastProducerEpoch, txnTimeoutMs, state, topicPartitions.toSet,
- txnStartTimestamp, txnLastUpdateTimestamp, TransactionVersion.TV_0)
+ txnStartTimestamp, txnLastUpdateTimestamp, clientTransactionVersion)
}
def prepareFenceProducerEpoch(): TxnTransitMetadata = {
@@ -267,7 +267,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
val bumpedEpoch = if (hasFailedEpochFence) producerEpoch else (producerEpoch + 1).toShort
prepareTransitionTo(PrepareEpochFence, producerId, bumpedEpoch, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs,
- topicPartitions.toSet, txnStartTimestamp, txnLastUpdateTimestamp)
+ topicPartitions.toSet, txnStartTimestamp, txnLastUpdateTimestamp, clientTransactionVersion)
}
def prepareIncrementProducerEpoch(newTxnTimeoutMs: Int,
@@ -306,7 +306,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
epochBumpResult match {
case Right((nextEpoch, lastEpoch)) => Right(prepareTransitionTo(Empty, producerId, nextEpoch, lastEpoch, newTxnTimeoutMs,
- immutable.Set.empty[TopicPartition], -1, updateTimestamp))
+ immutable.Set.empty[TopicPartition], -1, updateTimestamp, clientTransactionVersion))
case Left(err) => Left(err)
}
@@ -320,17 +320,17 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
throw new IllegalStateException("Cannot rotate producer ids while a transaction is still pending")
prepareTransitionTo(Empty, newProducerId, 0, if (recordLastEpoch) producerEpoch else RecordBatch.NO_PRODUCER_EPOCH,
- newTxnTimeoutMs, immutable.Set.empty[TopicPartition], -1, updateTimestamp)
+ newTxnTimeoutMs, immutable.Set.empty[TopicPartition], -1, updateTimestamp, clientTransactionVersion)
}
- def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long): TxnTransitMetadata = {
+ def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long, clientTransactionVersion: TransactionVersion): TxnTransitMetadata = {
val newTxnStartTimestamp = state match {
case Empty | CompleteAbort | CompleteCommit => updateTimestamp
case _ => txnStartTimestamp
}
prepareTransitionTo(Ongoing, producerId, producerEpoch, lastProducerEpoch, txnTimeoutMs,
- (topicPartitions ++ addedTopicPartitions).toSet, newTxnStartTimestamp, updateTimestamp)
+ (topicPartitions ++ addedTopicPartitions).toSet, newTxnStartTimestamp, updateTimestamp, clientTransactionVersion)
}
def prepareAbortOrCommit(newState: TransactionState, clientTransactionVersion: TransactionVersion, nextProducerId: Long, updateTimestamp: Long, noPartitionAdded: Boolean): TxnTransitMetadata = {
@@ -371,7 +371,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
def prepareDead(): TxnTransitMetadata = {
prepareTransitionTo(Dead, producerId, producerEpoch, lastProducerEpoch, txnTimeoutMs, Set.empty[TopicPartition],
- txnStartTimestamp, txnLastUpdateTimestamp)
+ txnStartTimestamp, txnLastUpdateTimestamp, clientTransactionVersion)
}
/**
@@ -394,8 +394,9 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
updatedTxnTimeoutMs: Int,
updatedTopicPartitions: immutable.Set[TopicPartition],
updatedTxnStartTimestamp: Long,
- updateTimestamp: Long): TxnTransitMetadata = {
- prepareTransitionTo(updatedState, updatedProducerId, RecordBatch.NO_PRODUCER_ID, updatedEpoch, updatedLastEpoch, updatedTxnTimeoutMs, updatedTopicPartitions, updatedTxnStartTimestamp, updateTimestamp, TransactionVersion.TV_0)
+ updateTimestamp: Long,
+ clientTransactionVersion: TransactionVersion): TxnTransitMetadata = {
+ prepareTransitionTo(updatedState, updatedProducerId, RecordBatch.NO_PRODUCER_ID, updatedEpoch, updatedLastEpoch, updatedTxnTimeoutMs, updatedTopicPartitions, updatedTxnStartTimestamp, updateTimestamp, clientTransactionVersion)
}
private def prepareTransitionTo(updatedState: TransactionState,
@@ -613,7 +614,8 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
s"pendingState=$pendingState, " +
s"topicPartitions=$topicPartitions, " +
s"txnStartTimestamp=$txnStartTimestamp, " +
- s"txnLastUpdateTimestamp=$txnLastUpdateTimestamp)"
+ s"txnLastUpdateTimestamp=$txnLastUpdateTimestamp, " +
+ s"clientTransactionVersion=$clientTransactionVersion)"
}
override def equals(that: Any): Boolean = that match {
@@ -626,13 +628,14 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
state.equals(other.state) &&
topicPartitions.equals(other.topicPartitions) &&
txnStartTimestamp == other.txnStartTimestamp &&
- txnLastUpdateTimestamp == other.txnLastUpdateTimestamp
+ txnLastUpdateTimestamp == other.txnLastUpdateTimestamp &&
+ clientTransactionVersion == other.clientTransactionVersion
case _ => false
}
override def hashCode(): Int = {
val fields = Seq(transactionalId, producerId, producerEpoch, txnTimeoutMs, state, topicPartitions,
- txnStartTimestamp, txnLastUpdateTimestamp)
+ txnStartTimestamp, txnLastUpdateTimestamp, clientTransactionVersion)
fields.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}
diff --git a/core/src/main/scala/kafka/log/OffsetResultHolder.scala b/core/src/main/scala/kafka/log/OffsetResultHolder.scala
index 64b78c6cee912..89951dbb96f2b 100644
--- a/core/src/main/scala/kafka/log/OffsetResultHolder.scala
+++ b/core/src/main/scala/kafka/log/OffsetResultHolder.scala
@@ -18,8 +18,7 @@ package kafka.log
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
-
-import java.util.concurrent.{CompletableFuture, Future}
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder
case class OffsetResultHolder(timestampAndOffsetOpt: Option[TimestampAndOffset],
futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None) {
@@ -27,12 +26,3 @@ case class OffsetResultHolder(timestampAndOffsetOpt: Option[TimestampAndOffset],
var maybeOffsetsError: Option[ApiException] = None
var lastFetchableOffset: Option[Long] = None
}
-
-/**
- * A remote log offset read task future holder. It contains two futures:
- * 1. JobFuture - Use this future to cancel the running job.
- * 2. TaskFuture - Use this future to get the result of the job/computation.
- */
-case class AsyncOffsetReadFutureHolder[T](jobFuture: Future[Void], taskFuture: CompletableFuture[T]) {
-
-}
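
The deleted Scala case class above has moved to `org.apache.kafka.storage.internals.log` (see the import changes elsewhere in this patch). A sketch of an equivalent Java holder, inferred from the deleted case class and its doc comment; the actual shape of the moved class is an assumption here:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/**
 * A remote log offset read task future holder (sketch).
 * jobFuture  - use this to cancel the running job.
 * taskFuture - use this to obtain the result of the job/computation.
 */
public record AsyncOffsetReadFutureHolderSketch<T>(Future<Void> jobFuture,
                                                   CompletableFuture<T> taskFuture) {
}
```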
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7a5ac37f0a3e4..ba5eef40e5cec 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2328,14 +2328,11 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendResponseMaybeThrottle(request, createResponse)
}
- // If the request is greater than version 4, we know the client supports transaction version 2.
- val clientTransactionVersion = if (endTxnRequest.version() > 4) TransactionVersion.TV_2 else TransactionVersion.TV_0
-
txnCoordinator.handleEndTransaction(endTxnRequest.data.transactionalId,
endTxnRequest.data.producerId,
endTxnRequest.data.producerEpoch,
endTxnRequest.result(),
- clientTransactionVersion,
+ TransactionVersion.transactionVersionForEndTxn(endTxnRequest),
sendResponseCallback,
requestLocal)
} else
@@ -2614,6 +2611,7 @@ class KafkaApis(val requestChannel: RequestChannel,
transaction.producerEpoch,
authorizedPartitions,
sendResponseCallback,
+ TransactionVersion.transactionVersionForAddPartitionsToTxn(addPartitionsToTxnRequest),
requestLocal)
} else {
txnCoordinator.handleVerifyPartitionsInTransaction(transactionalId,
@@ -2673,6 +2671,7 @@ class KafkaApis(val requestChannel: RequestChannel,
addOffsetsToTxnRequest.data.producerEpoch,
Set(offsetTopicPartition),
sendResponseCallback,
+ TransactionVersion.TV_0, // This request will always come from a client that is not using TV 2.
requestLocal)
}
}
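
The inline check removed above ("request version > 4 implies transaction version 2") is now encapsulated in `TransactionVersion.transactionVersionForEndTxn`, with a sibling helper used for AddPartitionsToTxn. A hedged sketch of the mapping the EndTxn helper presumably performs; the real helper's signature and return type are not shown in this patch and are assumptions here:

```java
public final class EndTxnVersionMappingSketch {
    private EndTxnVersionMappingSketch() {
    }

    /**
     * Returns 2 (TV_2) for EndTxn request versions greater than 4, otherwise 0 (TV_0),
     * mirroring the inline logic removed from KafkaApis.
     */
    public static int transactionVersionForEndTxn(short endTxnRequestVersion) {
        return endTxnRequestVersion > 4 ? 2 : 0;
    }
}
```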
diff --git a/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala b/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala
index 702d0a4ccb8ef..d9fb9e6d059db 100644
--- a/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala
+++ b/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala
@@ -16,10 +16,10 @@
*/
package kafka.server
-import kafka.log.AsyncOffsetReadFutureHolder
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder
class ListOffsetsPartitionStatus(val futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]],
val lastFetchableOffset: Option[Long],
diff --git a/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala b/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala
deleted file mode 100644
index 31a7ba2907379..0000000000000
--- a/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.zk
-
-import org.apache.zookeeper.ZooKeeper
-
-/**
- * This class should only be used in ZkSecurityMigrator tool.
- * This class will be removed after we migrate ZkSecurityMigrator away from ZK's asynchronous API.
- * @param kafkaZkClient
- */
-class ZkSecurityMigratorUtils(val kafkaZkClient: KafkaZkClient) {
-
- def currentZooKeeper: ZooKeeper = kafkaZkClient.currentZooKeeper
-
-}
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java b/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java
index ce027f8f91510..1313c8e2898ed 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java
@@ -16,7 +16,6 @@
*/
package kafka.log.remote;
-import kafka.log.AsyncOffsetReadFutureHolder;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
@@ -28,6 +27,7 @@
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index be853d9d990bf..99aefe0e51b87 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.config.ServerLogConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@@ -50,6 +51,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
def generateConfigs: scala.collection.Seq[KafkaConfig] = {
val overridingProps = new Properties()
val numServers = 2
+ overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toShort)
overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 4.toString)
TestUtils.createBrokerConfigs(
numServers,
@@ -367,7 +369,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_16176"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSendToPartitionWithFollowerShutdownShouldNotTimeout(quorum: String, groupProtocol: String): Unit = {
// This test produces to a leader that has follower that is shutting down. It shows that
// the produce request succeed, do not timeout and do not need to be retried.
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 7c360285561f1..fbdff9c4fc33e 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1914,12 +1914,17 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// Test that we can get information about the test consumer group.
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId))
var testGroupDescription = describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
+ assertEquals(groupType == GroupType.CLASSIC, testGroupDescription.groupEpoch.isEmpty)
+ assertEquals(groupType == GroupType.CLASSIC, testGroupDescription.targetAssignmentEpoch.isEmpty)
assertEquals(testGroupId, testGroupDescription.groupId())
assertFalse(testGroupDescription.isSimpleConsumerGroup)
assertEquals(groupInstanceSet.size, testGroupDescription.members().size())
val members = testGroupDescription.members()
- members.asScala.foreach(member => assertEquals(testClientId, member.clientId()))
+ members.asScala.foreach { member =>
+ assertEquals(testClientId, member.clientId)
+ assertEquals(if (groupType == GroupType.CLASSIC) Optional.empty else Optional.of(true), member.upgraded)
+ }
val topicPartitionsByTopic = members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic())
topicSet.foreach { topic =>
val topicPartitions = topicPartitionsByTopic.getOrElse(topic, List.empty)
@@ -2051,6 +2056,89 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
}
+ /**
+ * Test the consumer group APIs.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testConsumerGroupWithMemberMigration(quorum: String): Unit = {
+ val config = createConfig
+ client = Admin.create(config)
+ var classicConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ var consumerConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ try {
+ // Verify that initially there are no consumer groups to list.
+ val list1 = client.listConsumerGroups
+ assertEquals(0, list1.all.get.size)
+ assertEquals(0, list1.errors.get.size)
+ assertEquals(0, list1.valid.get.size)
+ val testTopicName = "test_topic"
+ val testNumPartitions = 2
+
+ client.createTopics(util.Arrays.asList(
+ new NewTopic(testTopicName, testNumPartitions, 1.toShort),
+ )).all.get
+ waitForTopics(client, List(testTopicName), List())
+
+ val producer = createProducer()
+ try {
+ producer.send(new ProducerRecord(testTopicName, 0, null, null))
+ producer.send(new ProducerRecord(testTopicName, 1, null, null))
+ producer.flush()
+ } finally {
+ Utils.closeQuietly(producer, "producer")
+ }
+
+ val testGroupId = "test_group_id"
+ val testClassicClientId = "test_classic_client_id"
+ val testConsumerClientId = "test_consumer_client_id"
+
+ val newConsumerConfig = new Properties(consumerConfig)
+ newConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, testClassicClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name)
+
+ classicConsumer = createConsumer(configOverrides = newConsumerConfig)
+ classicConsumer.subscribe(List(testTopicName).asJava)
+ classicConsumer.poll(JDuration.ofMillis(1000))
+
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, testConsumerClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name)
+ consumerConsumer = createConsumer(configOverrides = newConsumerConfig)
+ consumerConsumer.subscribe(List(testTopicName).asJava)
+ consumerConsumer.poll(JDuration.ofMillis(1000))
+
+ TestUtils.waitUntilTrue(() => {
+ classicConsumer.poll(JDuration.ofMillis(100))
+ consumerConsumer.poll(JDuration.ofMillis(100))
+ val describeConsumerGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+ describeConsumerGroupResult.containsKey(testGroupId) &&
+ describeConsumerGroupResult.get(testGroupId).groupState == GroupState.STABLE &&
+ describeConsumerGroupResult.get(testGroupId).members.size == 2
+ }, s"Expected to find 2 members in a stable group $testGroupId")
+
+ val describeConsumerGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+ val group = describeConsumerGroupResult.get(testGroupId)
+ assertNotNull(group)
+ assertEquals(Optional.of(2), group.groupEpoch)
+ assertEquals(Optional.of(2), group.targetAssignmentEpoch)
+
+ val classicMember = group.members.asScala.find(_.clientId == testClassicClientId)
+ assertTrue(classicMember.isDefined)
+ assertEquals(Optional.of(2), classicMember.get.memberEpoch)
+ assertEquals(Optional.of(false), classicMember.get.upgraded)
+
+ val consumerMember = group.members.asScala.find(_.clientId == testConsumerClientId)
+ assertTrue(consumerMember.isDefined)
+ assertEquals(Optional.of(2), consumerMember.get.memberEpoch)
+ assertEquals(Optional.of(true), consumerMember.get.upgraded)
+ } finally {
+ Utils.closeQuietly(classicConsumer, "classicConsumer")
+ Utils.closeQuietly(consumerConsumer, "consumerConsumer")
+ Utils.closeQuietly(client, "adminClient")
+ }
+ }
+
/**
* Test the consumer group APIs.
*/
@@ -2539,9 +2627,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}, "Expected to find all groups")
val classicConsumers = client.describeClassicGroups(groupIds.asJavaCollection).all().get()
- assertNotNull(classicConsumers.get(classicGroupId))
- assertEquals(classicGroupId, classicConsumers.get(classicGroupId).groupId())
- assertEquals("consumer", classicConsumers.get(classicGroupId).protocol())
+ val classicConsumer = classicConsumers.get(classicGroupId)
+ assertNotNull(classicConsumer)
+ assertEquals(classicGroupId, classicConsumer.groupId)
+ assertEquals("consumer", classicConsumer.protocol)
+ assertFalse(classicConsumer.members.isEmpty)
+ classicConsumer.members.forEach(member => assertTrue(member.upgraded.isEmpty))
assertNotNull(classicConsumers.get(simpleGroupId))
assertEquals(simpleGroupId, classicConsumers.get(simpleGroupId).groupId())
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
index f9c6d18b4703b..5eea54b23d1d3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
@@ -193,10 +193,10 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
val consumer = createConsumer()
assertEquals(0, consumer.assignment().size)
- val pattern = new SubscriptionPattern("t.*c")
+ var pattern = new SubscriptionPattern("t.*c")
consumer.subscribe(pattern)
- val assignment = Set(
+ var assignment = Set(
new TopicPartition(topic, 0),
new TopicPartition(topic, 1),
new TopicPartition(topic1, 0),
@@ -204,6 +204,86 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
awaitAssignment(consumer, assignment)
consumer.unsubscribe()
assertEquals(0, consumer.assignment().size)
+
+ // Subscribe to a different pattern to match topic2 (that did not match before)
+ pattern = new SubscriptionPattern(topic2 + ".*")
+ consumer.subscribe(pattern)
+
+ assignment = Set(
+ new TopicPartition(topic2, 0),
+ new TopicPartition(topic2, 1))
+ awaitAssignment(consumer, assignment)
+ }
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testRe2JPatternExpandSubscription(quorum: String, groupProtocol: String): Unit = {
+ val topic1 = "topic1" // matches first pattern
+ createTopic(topic1, 2, brokerCount)
+
+ val topic2 = "topic2" // does not match first pattern
+ createTopic(topic2, 2, brokerCount)
+
+ val consumer = createConsumer()
+ assertEquals(0, consumer.assignment().size)
+
+ var pattern = new SubscriptionPattern("topic1.*")
+ consumer.subscribe(pattern)
+ val assignment = Set(
+ new TopicPartition(topic1, 0),
+ new TopicPartition(topic1, 1))
+ awaitAssignment(consumer, assignment)
+
+ consumer.unsubscribe()
+ assertEquals(0, consumer.assignment().size)
+
+ // Subscribe to a different pattern that should match
+ // the same topics the member already had plus new ones
+ pattern = new SubscriptionPattern("topic1|topic2")
+ consumer.subscribe(pattern)
+
+ val expandedAssignment = assignment ++ Set(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))
+ awaitAssignment(consumer, expandedAssignment)
+ }
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testRe2JPatternSubscriptionAndTopicSubscription(quorum: String, groupProtocol: String): Unit = {
+ val topic1 = "topic1" // matches subscribed pattern
+ createTopic(topic1, 2, brokerCount)
+
+ val topic11 = "topic11" // matches subscribed pattern
+ createTopic(topic11, 2, brokerCount)
+
+ val topic2 = "topic2" // does not match subscribed pattern
+ createTopic(topic2, 2, brokerCount)
+
+ val consumer = createConsumer()
+ assertEquals(0, consumer.assignment().size)
+
+ // Subscribe to pattern
+ val pattern = new SubscriptionPattern("topic1.*")
+ consumer.subscribe(pattern)
+ val patternAssignment = Set(
+ new TopicPartition(topic1, 0),
+ new TopicPartition(topic1, 1),
+ new TopicPartition(topic11, 0),
+ new TopicPartition(topic11, 1))
+ awaitAssignment(consumer, patternAssignment)
+ consumer.unsubscribe()
+ assertEquals(0, consumer.assignment().size)
+
+ // Subscribe to explicit topic names
+ consumer.subscribe(List(topic2).asJava)
+ val assignment = Set(
+ new TopicPartition(topic2, 0),
+ new TopicPartition(topic2, 1))
+ awaitAssignment(consumer, assignment)
+ consumer.unsubscribe()
+
+ // Subscribe to pattern again
+ consumer.subscribe(pattern)
+ awaitAssignment(consumer, patternAssignment)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
index 4061e6aaaf8ff..eaa4589595954 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
@@ -16,7 +16,6 @@
*/
package kafka.server
-import kafka.log.AsyncOffsetReadFutureHolder
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse
@@ -25,6 +24,7 @@ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.util.timer.MockTimer
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.api.Assertions.assertEquals
import org.mockito.ArgumentMatchers.anyBoolean
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index f4d6816da8cc4..ac59f026b0c2f 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -575,7 +575,6 @@ object QuorumTestHarness {
// The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer
// implementation that would otherwise cause tests to fail.
- def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_16176: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17960: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17961: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17964: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index f446eb2bfb2ef..28019efc0c6a4 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -547,6 +547,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
txnMetadata.producerEpoch,
partitions,
resultCallback,
+ TransactionVersion.TV_2,
RequestLocal.withThreadConfinedCaching)
replicaManager.tryCompleteActions()
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index f3302b1293531..ab5ff72cd982b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -209,19 +209,19 @@ class TransactionCoordinatorTest {
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
.thenReturn(Right(None))
- coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback)
+ coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback, TV_0)
assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error)
}
@Test
def shouldRespondWithInvalidRequestAddPartitionsToTransactionWhenTransactionalIdIsEmpty(): Unit = {
- coordinator.handleAddPartitionsToTransaction("", 0L, 1, partitions, errorsCallback)
+ coordinator.handleAddPartitionsToTransaction("", 0L, 1, partitions, errorsCallback, TV_0)
assertEquals(Errors.INVALID_REQUEST, error)
}
@Test
def shouldRespondWithInvalidRequestAddPartitionsToTransactionWhenTransactionalIdIsNull(): Unit = {
- coordinator.handleAddPartitionsToTransaction(null, 0L, 1, partitions, errorsCallback)
+ coordinator.handleAddPartitionsToTransaction(null, 0L, 1, partitions, errorsCallback, TV_0)
assertEquals(Errors.INVALID_REQUEST, error)
}
@@ -230,7 +230,7 @@ class TransactionCoordinatorTest {
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
.thenReturn(Left(Errors.NOT_COORDINATOR))
- coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback)
+ coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback, TV_0)
assertEquals(Errors.NOT_COORDINATOR, error)
}
@@ -239,7 +239,7 @@ class TransactionCoordinatorTest {
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
.thenReturn(Left(Errors.COORDINATOR_LOAD_IN_PROGRESS))
- coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback)
+ coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback, TV_0)
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, error)
}
@@ -313,7 +313,7 @@ class TransactionCoordinatorTest {
new TransactionMetadata(transactionalId, 0, 0, RecordBatch.NO_PRODUCER_ID,
0, RecordBatch.NO_PRODUCER_EPOCH, 0, state, mutable.Set.empty, 0, 0, TV_2)))))
- coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback)
+ coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback, TV_2)
assertEquals(Errors.CONCURRENT_TRANSACTIONS, error)
}
@@ -325,7 +325,7 @@ class TransactionCoordinatorTest {
new TransactionMetadata(transactionalId, 0, 0, RecordBatch.NO_PRODUCER_ID,
10, 9, 0, PrepareCommit, mutable.Set.empty, 0, 0, TV_2)))))
- coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback)
+ coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback, TV_2)
assertEquals(Errors.PRODUCER_FENCED, error)
}
@@ -359,7 +359,7 @@ class TransactionCoordinatorTest {
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
- coordinator.handleAddPartitionsToTransaction(transactionalId, producerId, producerEpoch, partitions, errorsCallback)
+ coordinator.handleAddPartitionsToTransaction(transactionalId, producerId, producerEpoch, partitions, errorsCallback, clientTransactionVersion)
verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
verify(transactionManager).appendTransactionToLog(
@@ -379,7 +379,7 @@ class TransactionCoordinatorTest {
new TransactionMetadata(transactionalId, 0, 0, RecordBatch.NO_PRODUCER_ID,
0, RecordBatch.NO_PRODUCER_EPOCH, 0, Empty, partitions, 0, 0, TV_0)))))
- coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback)
+ coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback, TV_0)
assertEquals(Errors.NONE, error)
verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
index 2da9c96fa2077..6b2d20e69eb65 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
@@ -253,7 +253,7 @@ class TransactionMetadataTest {
clientTransactionVersion = TV_0)
// let new time be smaller; when transiting from Empty the start time would be updated to the update-time
- var transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0)), time.milliseconds() - 1)
+ var transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0)), time.milliseconds() - 1, TV_0)
txnMetadata.completeTransitionTo(transitMetadata)
assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0)), txnMetadata.topicPartitions)
assertEquals(producerId, txnMetadata.producerId)
@@ -263,7 +263,7 @@ class TransactionMetadataTest {
assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
// add another partition, check that in Ongoing state the start timestamp would not change to update time
- transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds() - 2)
+ transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds() - 2, TV_0)
txnMetadata.completeTransitionTo(transitMetadata)
assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)), txnMetadata.topicPartitions)
assertEquals(producerId, txnMetadata.producerId)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 78da50f782bc8..d12df190c8bdf 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -389,7 +389,7 @@ class TransactionStateManagerTest {
// update the metadata to ongoing with two partitions
val newMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
- new TopicPartition("topic1", 1)), time.milliseconds())
+ new TopicPartition("topic1", 1)), time.milliseconds(), TV_0)
// append the new metadata into log
transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch, newMetadata, assertCallback, requestLocal = RequestLocal.withThreadConfinedCaching)
@@ -404,7 +404,7 @@ class TransactionStateManagerTest {
transactionManager.putTransactionStateIfNotExists(txnMetadata1)
expectedError = Errors.COORDINATOR_NOT_AVAILABLE
- var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
+ var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION)
val requestLocal = RequestLocal.withThreadConfinedCaching
@@ -412,19 +412,19 @@ class TransactionStateManagerTest {
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
- failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
+ failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS)
transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
- failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
+ failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
- failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
+ failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.REQUEST_TIMED_OUT)
transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
@@ -437,7 +437,7 @@ class TransactionStateManagerTest {
transactionManager.putTransactionStateIfNotExists(txnMetadata1)
expectedError = Errors.NOT_COORDINATOR
- var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
+ var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.NOT_LEADER_OR_FOLLOWER)
val requestLocal = RequestLocal.withThreadConfinedCaching
@@ -445,7 +445,7 @@ class TransactionStateManagerTest {
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
- failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
+ failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.NONE)
transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch)
transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
@@ -468,7 +468,7 @@ class TransactionStateManagerTest {
transactionManager.putTransactionStateIfNotExists(txnMetadata1)
expectedError = Errors.COORDINATOR_LOAD_IN_PROGRESS
- val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
+ val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.NONE)
transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch)
@@ -482,7 +482,7 @@ class TransactionStateManagerTest {
transactionManager.putTransactionStateIfNotExists(txnMetadata1)
expectedError = Errors.UNKNOWN_SERVER_ERROR
- var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
+ var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE)
val requestLocal = RequestLocal.withThreadConfinedCaching
@@ -490,7 +490,7 @@ class TransactionStateManagerTest {
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
- failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
+ failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.RECORD_LIST_TOO_LARGE)
transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
@@ -503,7 +503,7 @@ class TransactionStateManagerTest {
transactionManager.putTransactionStateIfNotExists(txnMetadata1)
expectedError = Errors.COORDINATOR_NOT_AVAILABLE
- val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
+ val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0)
prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION)
transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, _ => true, RequestLocal.withThreadConfinedCaching)
@@ -522,7 +522,7 @@ class TransactionStateManagerTest {
expectedError = Errors.NOT_COORDINATOR
val newMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
- new TopicPartition("topic1", 1)), time.milliseconds())
+ new TopicPartition("topic1", 1)), time.milliseconds(), TV_0)
// modify the cache while trying to append the new metadata
txnMetadata1.producerEpoch = (txnMetadata1.producerEpoch + 1).toShort
@@ -541,7 +541,7 @@ class TransactionStateManagerTest {
expectedError = Errors.INVALID_PRODUCER_EPOCH
val newMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
- new TopicPartition("topic1", 1)), time.milliseconds())
+ new TopicPartition("topic1", 1)), time.milliseconds(), TV_0)
// modify the cache while trying to append the new metadata
txnMetadata1.pendingState = None
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a009b2f10a242..a36598a5eebca 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -2301,6 +2301,7 @@ class KafkaApisTest extends Logging {
ArgumentMatchers.eq(epoch),
ArgumentMatchers.eq(Set(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition))),
responseCallback.capture(),
+ ArgumentMatchers.eq(TransactionVersion.TV_0),
ArgumentMatchers.eq(requestLocal)
)).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
val kafkaApis = createKafkaApis()
@@ -2359,6 +2360,7 @@ class KafkaApisTest extends Logging {
ArgumentMatchers.eq(epoch),
ArgumentMatchers.eq(Set(topicPartition)),
responseCallback.capture(),
+ ArgumentMatchers.eq(TransactionVersion.TV_0),
ArgumentMatchers.eq(requestLocal)
)).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
val kafkaApis = createKafkaApis()
@@ -2434,6 +2436,7 @@ class KafkaApisTest extends Logging {
ArgumentMatchers.eq(epoch),
ArgumentMatchers.eq(Set(tp0)),
responseCallback.capture(),
+ any[TransactionVersion],
ArgumentMatchers.eq(requestLocal)
)).thenAnswer(_ => responseCallback.getValue.apply(Errors.NONE))
diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
index 84e609fcd9202..4cc3f968d2769 100644
--- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
@@ -49,29 +49,50 @@ class LeaveGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBa
numPartitions = 3
)
+ def instanceId(memberId: String): String = "instance_" + memberId
+ val memberIds = Range(0, 3).map { __ =>
+ Uuid.randomUuid().toString
+ }
+
for (version <- 3 to ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)) {
- val memberId = Uuid.randomUuid().toString
- assertEquals(Errors.NONE.code, consumerGroupHeartbeat(
- groupId = "group",
- memberId = memberId,
- memberEpoch = 0,
- instanceId = "instance-id",
- rebalanceTimeoutMs = 5 * 60 * 1000,
- subscribedTopicNames = List("foo"),
- topicPartitions = List.empty,
- ).errorCode)
+ // Join with all the members.
+ memberIds.foreach { memberId =>
+ assertEquals(Errors.NONE.code, consumerGroupHeartbeat(
+ groupId = "group",
+ memberId = memberId,
+ memberEpoch = 0,
+ instanceId = instanceId(memberId),
+ rebalanceTimeoutMs = 5 * 60 * 1000,
+ subscribedTopicNames = List("foo"),
+ topicPartitions = List.empty,
+ ).errorCode)
+ }
assertEquals(
new LeaveGroupResponseData()
.setMembers(List(
new LeaveGroupResponseData.MemberResponse()
- .setMemberId(memberId)
- .setGroupInstanceId("instance-id")
+ .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+ .setGroupInstanceId(instanceId(memberIds(0))),
+ new LeaveGroupResponseData.MemberResponse()
+ .setMemberId(memberIds(1))
+ .setGroupInstanceId(instanceId(memberIds(1))),
+ new LeaveGroupResponseData.MemberResponse()
+ .setMemberId(memberIds(2))
+ .setGroupInstanceId(null)
).asJava),
classicLeaveGroup(
groupId = "group",
- memberIds = List(JoinGroupRequest.UNKNOWN_MEMBER_ID),
- groupInstanceIds = List("instance-id"),
+ memberIds = List(
+ JoinGroupRequest.UNKNOWN_MEMBER_ID,
+ memberIds(1),
+ memberIds(2)
+ ),
+ groupInstanceIds = List(
+ instanceId(memberIds(0)),
+ instanceId(memberIds(1)),
+ null
+ ),
version = version.toShort
)
)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 5967aeb9cee35..dcd9c4334d75a 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -130,6 +130,8 @@ The kafka.admin.ZkSecurityMigrator
tool was removed.
+
Connect
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index dd0c6954088b4..6b9a5e7cbd5aa 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -6020,7 +6020,7 @@ public CoordinatorResult classicGroup
}
if (group.type() == CLASSIC) {
- return classicGroupLeaveToClassicGroup((ClassicGroup) group, context, request);
+ return classicGroupLeaveToClassicGroup((ClassicGroup) group, request);
} else if (group.type() == CONSUMER) {
return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, request);
} else {
@@ -6046,48 +6046,46 @@ private CoordinatorResult classicGrou
        List<CoordinatorRecord> records = new ArrayList<>();
for (MemberIdentity memberIdentity : request.members()) {
- String memberId = memberIdentity.memberId();
- String instanceId = memberIdentity.groupInstanceId();
String reason = memberIdentity.reason() != null ? memberIdentity.reason() : "not provided";
- ConsumerGroupMember member;
try {
- if (instanceId == null) {
- member = group.getOrMaybeCreateMember(memberId, false);
- throwIfMemberDoesNotUseClassicProtocol(member);
+ ConsumerGroupMember member;
+
+ if (memberIdentity.groupInstanceId() == null) {
+ member = group.getOrMaybeCreateMember(memberIdentity.memberId(), false);
log.info("[GroupId {}] Dynamic member {} has left group " +
"through explicit `LeaveGroup` request; client reason: {}",
- groupId, memberId, reason);
+ groupId, memberIdentity.memberId(), reason);
} else {
- member = group.staticMember(instanceId);
- throwIfStaticMemberIsUnknown(member, instanceId);
+ member = group.staticMember(memberIdentity.groupInstanceId());
+ throwIfStaticMemberIsUnknown(member, memberIdentity.groupInstanceId());
// The LeaveGroup API allows administrative removal of members by GroupInstanceId
// in which case we expect the MemberId to be undefined.
- if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
- throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
- throwIfMemberDoesNotUseClassicProtocol(member);
+ if (!UNKNOWN_MEMBER_ID.equals(memberIdentity.memberId())) {
+ throwIfInstanceIdIsFenced(member, groupId, memberIdentity.memberId(), memberIdentity.groupInstanceId());
}
- memberId = member.memberId();
log.info("[GroupId {}] Static member {} with instance id {} has left group " +
"through explicit `LeaveGroup` request; client reason: {}",
- groupId, memberId, instanceId, reason);
+ groupId, memberIdentity.memberId(), memberIdentity.groupInstanceId(), reason);
}
- removeMember(records, groupId, memberId);
- cancelTimers(groupId, memberId);
+ removeMember(records, groupId, member.memberId());
+ cancelTimers(groupId, member.memberId());
+
memberResponses.add(
new MemberResponse()
- .setMemberId(memberId)
- .setGroupInstanceId(instanceId)
+ .setMemberId(memberIdentity.memberId())
+ .setGroupInstanceId(memberIdentity.groupInstanceId())
);
+
validLeaveGroupMembers.add(member);
} catch (KafkaException e) {
memberResponses.add(
new MemberResponse()
- .setMemberId(memberId)
- .setGroupInstanceId(instanceId)
+ .setMemberId(memberIdentity.memberId())
+ .setGroupInstanceId(memberIdentity.groupInstanceId())
.setErrorCode(Errors.forException(e).code())
);
}
@@ -6126,7 +6124,6 @@ private CoordinatorResult classicGrou
* Handle a classic LeaveGroupRequest to a ClassicGroup.
*
* @param group The ClassicGroup.
- * @param context The request context.
* @param request The actual LeaveGroup request.
*
* @return The LeaveGroup response and the GroupMetadata record to append if the group
@@ -6134,7 +6131,6 @@ private CoordinatorResult classicGrou
*/
    private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeaveToClassicGroup(
ClassicGroup group,
- RequestContext context,
LeaveGroupRequestData request
) throws UnknownMemberIdException {
if (group.isInState(DEAD)) {
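
The refactored consumer-group path above accepts LeaveGroup entries that name a member either by member id (dynamic members) or by group instance id (static members, where the member id may be left as UNKNOWN_MEMBER_ID for administrative removal), and it now echoes back exactly the identifiers the caller supplied. A minimal sketch of such a mixed request, using hypothetical group, member, and instance ids, built from the same generated message classes the handler consumes:

import java.util.List;

import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.requests.JoinGroupRequest;

// Sketch only: "group-id", "member-1" and "instance-2" are hypothetical identifiers.
public class MixedLeaveGroupRequestSketch {

    public static LeaveGroupRequestData build() {
        return new LeaveGroupRequestData()
            .setGroupId("group-id")
            .setMembers(List.of(
                // Dynamic member: identified by member id only, no instance id.
                new MemberIdentity()
                    .setMemberId("member-1")
                    .setGroupInstanceId(null),
                // Static member removed administratively: identified by instance id,
                // with the member id left as UNKNOWN_MEMBER_ID.
                new MemberIdentity()
                    .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
                    .setGroupInstanceId("instance-2")
            ));
    }
}
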
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 18540253487d1..0535f763b4f5f 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -117,8 +117,6 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;
-import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
-import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
@@ -13861,15 +13859,17 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
context.assertJoinTimeout(groupId, memberId2, member2.rebalanceTimeoutMs());
context.assertSessionTimeout(groupId, memberId2, member2.classicMemberMetadata().get().sessionTimeoutMs());
- // Member 1 and member 2 leave the group.
+ // Member 1, member 2 and member 3 leave the group.
        CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> leaveResult = context.sendClassicGroupLeave(
new LeaveGroupRequestData()
.setGroupId("group-id")
.setMembers(List.of(
// Valid member id.
new MemberIdentity()
- .setMemberId(memberId1),
+ .setMemberId(memberId1)
+ .setGroupInstanceId(null),
new MemberIdentity()
+ .setMemberId(UNKNOWN_MEMBER_ID)
.setGroupInstanceId(instanceId2),
// Member that doesn't use the classic protocol.
new MemberIdentity()
@@ -13877,8 +13877,10 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
.setGroupInstanceId(instanceId3),
// Unknown member id.
new MemberIdentity()
- .setMemberId("unknown-member-id"),
+ .setMemberId("unknown-member-id")
+ .setGroupInstanceId(null),
new MemberIdentity()
+ .setMemberId(UNKNOWN_MEMBER_ID)
.setGroupInstanceId("unknown-instance-id"),
// Fenced instance id.
new MemberIdentity()
@@ -13895,11 +13897,10 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
.setMemberId(memberId1),
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId(instanceId2)
- .setMemberId(memberId2),
+ .setMemberId(UNKNOWN_MEMBER_ID),
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId(instanceId3)
- .setMemberId(memberId3)
- .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
+ .setMemberId(memberId3),
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId(null)
.setMemberId("unknown-member-id")
@@ -13908,8 +13909,8 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
.setGroupInstanceId("unknown-instance-id")
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
new LeaveGroupResponseData.MemberResponse()
- .setGroupInstanceId(instanceId3)
.setMemberId("unknown-member-id")
+ .setGroupInstanceId(instanceId3)
.setErrorCode(Errors.FENCED_INSTANCE_ID.code())
)),
leaveResult.response()
@@ -13924,6 +13925,12 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId2),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId2),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId2),
+ // Remove member 3.
+ GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId3),
+ GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId3),
+ GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId3),
+ // Update subscription metadata.
+ GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Collections.emptyMap()),
// Bump the group epoch.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
);
@@ -14045,7 +14052,7 @@ public void testClassicGroupLeaveToConsumerGroupWithoutValidLeaveGroupMember() {
String groupId = "group-id";
String memberId = Uuid.randomUuid().toString();
- // Consumer group without member using the classic protocol.
+ // Consumer group.
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId)
@@ -14058,9 +14065,7 @@ public void testClassicGroupLeaveToConsumerGroupWithoutValidLeaveGroupMember() {
.setGroupId("group-id")
.setMembers(List.of(
new MemberIdentity()
- .setMemberId("unknown-member-id"),
- new MemberIdentity()
- .setMemberId(memberId)
+ .setMemberId("unknown-member-id")
))
);
@@ -14070,10 +14075,6 @@ public void testClassicGroupLeaveToConsumerGroupWithoutValidLeaveGroupMember() {
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId(null)
.setMemberId("unknown-member-id")
- .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
- new LeaveGroupResponseData.MemberResponse()
- .setGroupInstanceId(null)
- .setMemberId(memberId)
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
)),
leaveResult.response()
@@ -16047,9 +16048,15 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
new LeaveGroupRequestData()
.setGroupId(groupId)
.setMembers(List.of(
- new MemberIdentity().setGroupInstanceId(memberId1),
- new MemberIdentity().setGroupInstanceId(memberId2),
- new MemberIdentity().setGroupInstanceId(memberId3)
+ new MemberIdentity()
+ .setMemberId(memberId1)
+ .setGroupInstanceId(null),
+ new MemberIdentity()
+ .setMemberId(memberId2)
+ .setGroupInstanceId(memberId2),
+ new MemberIdentity()
+ .setMemberId(UNKNOWN_MEMBER_ID)
+ .setGroupInstanceId(memberId3)
))
);
@@ -16058,12 +16065,12 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
.setMembers(List.of(
new LeaveGroupResponseData.MemberResponse()
.setMemberId(memberId1)
- .setGroupInstanceId(memberId1),
+ .setGroupInstanceId(null),
new LeaveGroupResponseData.MemberResponse()
.setMemberId(memberId2)
.setGroupInstanceId(memberId2),
new LeaveGroupResponseData.MemberResponse()
- .setMemberId(memberId3)
+ .setMemberId(UNKNOWN_MEMBER_ID)
.setGroupInstanceId(memberId3)
)),
result.response()
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java
index 069440d35c9b9..45546c447b036 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.server.common;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.EndTxnRequest;
+
import java.util.Collections;
import java.util.Map;
@@ -55,6 +58,16 @@ public static TransactionVersion fromFeatureLevel(short version) {
return (TransactionVersion) Feature.TRANSACTION_VERSION.fromFeatureLevel(version, true);
}
+ public static TransactionVersion transactionVersionForAddPartitionsToTxn(AddPartitionsToTxnRequest request) {
+        // If the request version is greater than 3, we know the client supports transaction version 2.
+ return request.version() > 3 ? TV_2 : TV_0;
+ }
+
+ public static TransactionVersion transactionVersionForEndTxn(EndTxnRequest request) {
+        // If the request version is greater than 4, we know the client supports transaction version 2.
+ return request.version() > 4 ? TV_2 : TV_0;
+ }
+
@Override
public String featureName() {
return FEATURE_NAME;
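
The two helpers above gate the coordinator's behavior on the request version alone: AddPartitionsToTxn v4+ and EndTxn v5+ indicate a client on transaction version 2, while anything older maps to TV_0. A minimal sketch of that mapping written against raw version numbers (the class and method names are hypothetical; the real helpers take the request object and read request.version() themselves):

import org.apache.kafka.server.common.TransactionVersion;

// Sketch only: illustrates the gating rule, not the production entry point.
final class TransactionVersionGatingSketch {

    // Mirrors transactionVersionForAddPartitionsToTxn: AddPartitionsToTxn v4+ implies TV_2.
    static TransactionVersion forAddPartitionsVersion(short requestVersion) {
        return requestVersion > 3 ? TransactionVersion.TV_2 : TransactionVersion.TV_0;
    }

    // Mirrors transactionVersionForEndTxn: EndTxn v5+ implies TV_2.
    static TransactionVersion forEndTxnVersion(short requestVersion) {
        return requestVersion > 4 ? TransactionVersion.TV_2 : TransactionVersion.TV_0;
    }

    public static void main(String[] args) {
        System.out.println(forAddPartitionsVersion((short) 3)); // TV_0
        System.out.println(forAddPartitionsVersion((short) 4)); // TV_2
        System.out.println(forEndTxnVersion((short) 4));        // TV_0
        System.out.println(forEndTxnVersion((short) 5));        // TV_2
    }
}
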
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java
new file mode 100644
index 0000000000000..990a5ef67ddf0
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
+/**
+ * A remote log offset read task future holder. It contains two futures:
+ *
+ * JobFuture - Use this future to cancel the running job.
+ * TaskFuture - Use this future to get the result of the job/computation.
+ *
+ */
+public record AsyncOffsetReadFutureHolder<T>(Future<Void> jobFuture, CompletableFuture<T> taskFuture) {
+}
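
A minimal usage sketch of the holder, assuming the record is generic in the task result type and using a hypothetical executor and a placeholder offset value: the taskFuture delivers the computed result, while the jobFuture lets the caller cancel the background job that produces it.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;

// Sketch only: the executor and the placeholder offset value are hypothetical.
public class AsyncOffsetReadFutureHolderSketch {

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletableFuture<Long> taskFuture = new CompletableFuture<>();

        // The background job computes an offset (a placeholder here) and completes the task future.
        Future<Void> jobFuture = pool.submit(() -> {
            taskFuture.complete(42L);
            return null;
        });

        AsyncOffsetReadFutureHolder<Long> holder = new AsyncOffsetReadFutureHolder<>(jobFuture, taskFuture);

        System.out.println("offset = " + holder.taskFuture().get()); // read the result
        holder.jobFuture().cancel(true);                             // or cancel the running job
        pool.shutdown();
    }
}
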
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
index dcb8fd054812d..7a134ac0c9610 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
@@ -32,6 +32,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -139,8 +140,12 @@ public void testAdminRequestsForDescribeNegativeOffsets() throws Exception {
true,
Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions))),
RangeAssignor.class.getName(),
+ GroupType.CLASSIC,
GroupState.STABLE,
- new Node(1, "localhost", 9092));
+ new Node(1, "localhost", 9092),
+ Set.of(),
+ Optional.empty(),
+ Optional.empty());
        Function<Collection<TopicPartition>, ArgumentMatcher<Map<TopicPartition, OffsetSpec>>> offsetsArgMatcher = expectedPartitions ->
topicPartitionOffsets -> topicPartitionOffsets != null && topicPartitionOffsets.keySet().equals(expectedPartitions);
@@ -233,8 +238,12 @@ private DescribeConsumerGroupsResult describeGroupsResult(GroupState groupState)
true,
Collections.singleton(member1),
RangeAssignor.class.getName(),
+ GroupType.CLASSIC,
groupState,
- new Node(1, "localhost", 9092));
+ new Node(1, "localhost", 9092),
+ Set.of(),
+ Optional.empty(),
+ Optional.empty());
        KafkaFutureImpl<ConsumerGroupDescription> future = new KafkaFutureImpl<>();
future.complete(description);
return new DescribeConsumerGroupsResult(Collections.singletonMap(GROUP, future));