From e5598a9d8c7ec134740af48911cc60e72bf638eb Mon Sep 17 00:00:00 2001 From: Kirk True Date: Mon, 9 Dec 2024 23:01:51 -0800 Subject: [PATCH 01/13] KAFKA-18040; fix for test that ensures produce during follower shutdown (#18108) Test lacked the proper configuration for the offset topic replication. As a result, when the follower was shut down, the coordinator did not failover properly. Reviewers: TaiJuWu , David Jacot --- .../scala/integration/kafka/api/BaseProducerSendTest.scala | 4 +++- .../scala/integration/kafka/server/QuorumTestHarness.scala | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index be853d9d990bf..99aefe0e51b87 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -34,6 +34,7 @@ import org.apache.kafka.common.network.{ConnectionMode, ListenerName} import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.{KafkaException, TopicPartition} +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.server.config.ServerLogConfigs import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} @@ -50,6 +51,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { def generateConfigs: scala.collection.Seq[KafkaConfig] = { val overridingProps = new Properties() val numServers = 2 + overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toShort) overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 4.toString) TestUtils.createBrokerConfigs( numServers, @@ -367,7 +369,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_16176")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testSendToPartitionWithFollowerShutdownShouldNotTimeout(quorum: String, groupProtocol: String): Unit = { // This test produces to a leader that has follower that is shutting down. It shows that // the produce request succeed, do not timeout and do not need to be retried. diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index f4d6816da8cc4..ac59f026b0c2f 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -575,7 +575,6 @@ object QuorumTestHarness { // The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer // implementation that would otherwise cause tests to fail. - def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_16176: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17960: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17961: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17964: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly From 08aa8ec3bfdbf2c508ceaf3072bba516366d7a1a Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 10 Dec 2024 09:00:08 +0100 Subject: [PATCH 02/13] KAFKA-18178 Remove ZkSecurityMigrator (#18092) Reviewers: Chia-Ping Tsai --- .../kafka/admin/ZkSecurityMigrator.scala | 307 ------------------ .../kafka/zk/ZkSecurityMigratorUtils.scala | 30 -- docs/upgrade.html | 2 + 3 files changed, 2 insertions(+), 337 deletions(-) delete mode 100644 core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala delete mode 100644 core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala deleted file mode 100644 index 77662c3b11464..0000000000000 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ /dev/null @@ -1,307 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.admin - -import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder} -import kafka.server.KafkaConfig -import kafka.utils.{Logging, ToolsUtils} -import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils} -import org.apache.kafka.common.security.JaasUtils -import org.apache.kafka.common.utils.{Exit, Time, Utils} -import org.apache.kafka.server.config.ZkConfigs -import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} -import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback} -import org.apache.zookeeper.KeeperException -import org.apache.zookeeper.KeeperException.Code -import org.apache.zookeeper.client.ZKClientConfig -import org.apache.zookeeper.data.Stat - -import scala.annotation.tailrec -import scala.collection.mutable -import scala.jdk.CollectionConverters._ -import scala.concurrent._ -import scala.concurrent.duration._ - -/** - * This tool is to be used when making access to ZooKeeper authenticated or - * the other way around, when removing authenticated access. The exact steps - * to migrate a Kafka cluster from unsecure to secure with respect to ZooKeeper - * access are the following: - * - * 1- Perform a rolling upgrade of Kafka servers, setting zookeeper.set.acl to false - * and passing a valid JAAS login file via the system property - * java.security.auth.login.config - * 2- Perform a second rolling upgrade keeping the system property for the login file - * and now setting zookeeper.set.acl to true - * 3- Finally run this tool. There is a script under ./bin. Run - * ./bin/zookeeper-security-migration.sh --help - * to see the configuration parameters. An example of running it is the following: - * ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181 - * - * To convert a cluster from secure to unsecure, we need to perform the following - * steps: - * 1- Perform a rolling upgrade setting zookeeper.set.acl to false for each server - * 2- Run this migration tool, setting zookeeper.acl to unsecure - * 3- Perform another rolling upgrade to remove the system property setting the - * login file (java.security.auth.login.config). - */ - -object ZkSecurityMigrator extends Logging { - private val usageMessage = ("ZooKeeper Migration Tool Help. This tool updates the ACLs of " - + "znodes as part of the process of setting up ZooKeeper " - + "authentication.") - private val tlsConfigFileOption = "zk-tls-config-file" - - def run(args: Array[String]): Unit = { - val jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) - val opts = new ZkSecurityMigratorOptions(args) - - CommandLineUtils.maybePrintHelpOrVersion(opts, usageMessage) - - // Must have either SASL or TLS mutual authentication enabled to use this tool. - // Instantiate the client config we will use so that we take into account config provided via the CLI option - // and system properties passed via -D parameters if no CLI option is given. - val zkClientConfig = createZkClientConfigFromOption(opts.options, opts.zkTlsConfigFile).getOrElse(new ZKClientConfig()) - val tlsClientAuthEnabled = KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig) - if (jaasFile == null && !tlsClientAuthEnabled) { - val errorMsg = s"No JAAS configuration file has been specified and no TLS client certificate has been specified. Please make sure that you set " + - s"the system property ${JaasUtils.JAVA_LOGIN_CONFIG_PARAM} or provide a ZooKeeper client TLS configuration via --$tlsConfigFileOption " + - s"identifying at least ${ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG}, ${ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG}, and ${ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG}" - System.err.println("ERROR: %s".format(errorMsg)) - throw new IllegalArgumentException("Incorrect configuration") - } - - if (!tlsClientAuthEnabled && !JaasUtils.isZkSaslEnabled) { - val errorMsg = "Security isn't enabled, most likely the file isn't set properly: %s".format(jaasFile) - System.out.println("ERROR: %s".format(errorMsg)) - throw new IllegalArgumentException("Incorrect configuration") - } - - val zkAcl = opts.options.valueOf(opts.zkAclOpt) match { - case "secure" => - info("zookeeper.acl option is secure") - true - case "unsecure" => - info("zookeeper.acl option is unsecure") - false - case _ => - ToolsUtils.printUsageAndExit(opts.parser, usageMessage) - } - val zkUrl = opts.options.valueOf(opts.zkUrlOpt) - val zkSessionTimeout = opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue - val zkConnectionTimeout = opts.options.valueOf(opts.zkConnectionTimeoutOpt).intValue - val zkClient = KafkaZkClient(zkUrl, zkAcl, zkSessionTimeout, zkConnectionTimeout, - Int.MaxValue, Time.SYSTEM, zkClientConfig = zkClientConfig, name = "ZkSecurityMigrator", enableEntityConfigControllerCheck = false) - val enablePathCheck = opts.options.has(opts.enablePathCheckOpt) - val migrator = new ZkSecurityMigrator(zkClient) - migrator.run(enablePathCheck) - } - - def main(args: Array[String]): Unit = { - try { - run(args) - } catch { - case e: Exception => - e.printStackTrace() - // must exit with non-zero status so system tests will know we failed - Exit.exit(1) - } - } - - def createZkClientConfigFromFile(filename: String) : ZKClientConfig = { - val zkTlsConfigFileProps = Utils.loadProps(filename, ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.asJava) - val zkClientConfig = new ZKClientConfig() // Initializes based on any system properties that have been set - // Now override any set system properties with explicitly-provided values from the config file - // Emit INFO logs due to camel-case property names encouraging mistakes -- help people see mistakes they make - info(s"Found ${zkTlsConfigFileProps.size()} ZooKeeper client configuration properties in file $filename") - zkTlsConfigFileProps.asScala.foreachEntry { (key, value) => - info(s"Setting $key") - KafkaConfig.setZooKeeperClientProperty(zkClientConfig, key, value) - } - zkClientConfig - } - - private[admin] def createZkClientConfigFromOption(options: OptionSet, option: OptionSpec[String]) : Option[ZKClientConfig] = - if (!options.has(option)) - None - else - Some(createZkClientConfigFromFile(options.valueOf(option))) - - private class ZkSecurityMigratorOptions(args: Array[String]) extends CommandDefaultOptions(args) { - val zkAclOpt: OptionSpec[String] = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure." - + " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String]) - val zkUrlOpt: OptionSpec[String] = parser.accepts("zookeeper.connect", "Sets the ZooKeeper connect string (ensemble). This parameter " + - "takes a comma-separated list of host:port pairs.").withRequiredArg().defaultsTo("localhost:2181"). - ofType(classOf[String]) - val zkSessionTimeoutOpt: OptionSpec[Integer] = parser.accepts("zookeeper.session.timeout", "Sets the ZooKeeper session timeout."). - withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000) - val zkConnectionTimeoutOpt: OptionSpec[Integer] = parser.accepts("zookeeper.connection.timeout", "Sets the ZooKeeper connection timeout."). - withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000) - val enablePathCheckOpt: OptionSpecBuilder = parser.accepts("enable.path.check", "Checks if all the root paths exist in ZooKeeper " + - "before migration. If not, exit the command.") - val zkTlsConfigFile: OptionSpec[String] = parser.accepts(tlsConfigFileOption, - "Identifies the file where ZooKeeper client TLS connectivity properties are defined. Any properties other than " + - ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.mkString(", ") + " are ignored.") - .withRequiredArg().describedAs("ZooKeeper TLS configuration").ofType(classOf[String]) - options = parser.parse(args : _*) - } -} - -class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging { - private val zkSecurityMigratorUtils = new ZkSecurityMigratorUtils(zkClient) - private val futures = new mutable.Queue[Future[String]] - - private def setAcl(path: String, setPromise: Promise[String]): Unit = { - info("Setting ACL for path %s".format(path)) - zkSecurityMigratorUtils.currentZooKeeper.setACL(path, zkClient.defaultAcls(path).asJava, -1, SetACLCallback, setPromise) - } - - private def retrieveChildren(path: String, childrenPromise: Promise[String]): Unit = { - info("Getting children to set ACLs for path %s".format(path)) - zkSecurityMigratorUtils.currentZooKeeper.getChildren(path, false, GetChildrenCallback, childrenPromise) - } - - private def setAclIndividually(path: String): Unit = { - val setPromise = Promise[String]() - futures.synchronized { - futures += setPromise.future - } - setAcl(path, setPromise) - } - - private def setAclsRecursively(path: String): Unit = { - val setPromise = Promise[String]() - val childrenPromise = Promise[String]() - futures.synchronized { - futures += setPromise.future - futures += childrenPromise.future - } - setAcl(path, setPromise) - retrieveChildren(path, childrenPromise) - } - - private object GetChildrenCallback extends ChildrenCallback { - def processResult(rc: Int, - path: String, - ctx: Object, - children: java.util.List[String]): Unit = { - val zkHandle = zkSecurityMigratorUtils.currentZooKeeper - val promise = ctx.asInstanceOf[Promise[String]] - Code.get(rc) match { - case Code.OK => - // Set ACL for each child - children.asScala.map { child => - path match { - case "/" => s"/$child" - case path => s"$path/$child" - } - }.foreach(setAclsRecursively) - promise success "done" - case Code.CONNECTIONLOSS => - zkHandle.getChildren(path, false, GetChildrenCallback, ctx) - case Code.NONODE => - warn("Node is gone, it could be have been legitimately deleted: %s".format(path)) - promise success "done" - case Code.SESSIONEXPIRED => - // Starting a new session isn't really a problem, but it'd complicate - // the logic of the tool, so we quit and let the user re-run it. - System.out.println("ZooKeeper session expired while changing ACLs") - promise failure KeeperException.create(Code.get(rc)) - case _ => - System.out.println("Unexpected return code: %d".format(rc)) - promise failure KeeperException.create(Code.get(rc)) - } - } - } - - private object SetACLCallback extends StatCallback { - def processResult(rc: Int, - path: String, - ctx: Object, - stat: Stat): Unit = { - val zkHandle = zkSecurityMigratorUtils.currentZooKeeper - val promise = ctx.asInstanceOf[Promise[String]] - - Code.get(rc) match { - case Code.OK => - info("Successfully set ACLs for %s".format(path)) - promise success "done" - case Code.CONNECTIONLOSS => - zkHandle.setACL(path, zkClient.defaultAcls(path).asJava, -1, SetACLCallback, ctx) - case Code.NONODE => - warn("Znode is gone, it could be have been legitimately deleted: %s".format(path)) - promise success "done" - case Code.SESSIONEXPIRED => - // Starting a new session isn't really a problem, but it'd complicate - // the logic of the tool, so we quit and let the user re-run it. - System.out.println("ZooKeeper session expired while changing ACLs") - promise failure KeeperException.create(Code.get(rc)) - case _ => - System.out.println("Unexpected return code: %d".format(rc)) - promise failure KeeperException.create(Code.get(rc)) - } - } - } - - private def run(enablePathCheck: Boolean): Unit = { - try { - setAclIndividually("/") - checkPathExistenceAndMaybeExit(enablePathCheck) - for (path <- ZkData.SecureRootPaths) { - debug("Going to set ACL for %s".format(path)) - if (path == ControllerZNode.path && !zkClient.pathExists(path)) { - debug("Ignoring to set ACL for %s, because it doesn't exist".format(path)) - } else { - zkClient.makeSurePersistentPathExists(path) - setAclsRecursively(path) - } - } - - @tailrec - def recurse(): Unit = { - val future = futures.synchronized { - futures.headOption - } - future match { - case Some(a) => - Await.result(a, 6000 millis) - futures.synchronized { futures.dequeue() } - recurse() - case None => - } - } - recurse() - - } finally { - zkClient.close() - } - } - - private def checkPathExistenceAndMaybeExit(enablePathCheck: Boolean): Unit = { - val nonExistingSecureRootPaths = ZkData.SecureRootPaths.filterNot(zkClient.pathExists) - if (nonExistingSecureRootPaths.nonEmpty) { - println(s"Warning: The following secure root paths do not exist in ZooKeeper: ${nonExistingSecureRootPaths.mkString(",")}") - println("That might be due to an incorrect chroot is specified when executing the command.") - if (enablePathCheck) { - println("Exit the command.") - // must exit with non-zero status so system tests will know we failed - Exit.exit(1) - } - } - } -} diff --git a/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala b/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala deleted file mode 100644 index 31a7ba2907379..0000000000000 --- a/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.zk - -import org.apache.zookeeper.ZooKeeper - -/** - * This class should only be used in ZkSecurityMigrator tool. - * This class will be removed after we migrate ZkSecurityMigrator away from ZK's asynchronous API. - * @param kafkaZkClient - */ -class ZkSecurityMigratorUtils(val kafkaZkClient: KafkaZkClient) { - - def currentZooKeeper: ZooKeeper = kafkaZkClient.currentZooKeeper - -} diff --git a/docs/upgrade.html b/docs/upgrade.html index 5967aeb9cee35..dcd9c4334d75a 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -130,6 +130,8 @@
Notable changes in 4 and it will fall directly if the broker doesn't support incrementalAlterConfigs API, which means the broker version is prior to 2.3.x. See KIP-1011 for more details. +
  • The kafka.admin.ZkSecurityMigrator tool was removed. +
  • Connect From c8380ae77950bbd2161cf14cbdeb007562655f2f Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Tue, 10 Dec 2024 21:02:20 +0800 Subject: [PATCH 03/13] KAFKA-17750: Extend kafka-consumer-groups command line tool to support new consumer group (part 2) (#18034) * Add fields `groupEpoch` and `targetAssignmentEpoch` to `ConsumerGroupDescription.java`. * Add fields `memberEpoch` and `upgraded` to `MemberDescription.java`. * Add assertion to `PlaintextAdminIntegrationTest#testDescribeClassicGroups` to make sure member in classic group returns `upgraded` as `Optional.empty`. * Add new case `testConsumerGroupWithMemberMigration` to `PlaintextAdminIntegrationTest` to make sure migration member has correct `upgraded` value. Add assertion for `groupEpoch`, `targetAssignmentEpoch`, `memberEpoch` as well. Reviewers: David Jacot Signed-off-by: PoAn Yang --- .../admin/ConsumerGroupDescription.java | 63 +++++++----- .../clients/admin/MemberDescription.java | 75 +++++++++++++- .../DescribeClassicGroupsHandler.java | 5 +- .../DescribeConsumerGroupsHandler.java | 17 +++- .../clients/admin/KafkaAdminClientTest.java | 23 ++++- .../clients/admin/MemberDescriptionTest.java | 33 +++++++ .../DescribeConsumerGroupsHandlerTest.java | 90 ++++++++++++----- .../api/PlaintextAdminIntegrationTest.scala | 99 ++++++++++++++++++- .../group/ConsumerGroupServiceTest.java | 13 ++- 9 files changed, 347 insertions(+), 71 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java index 4cbc5b4b43bb3..dd1b4b4cb5c7e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -42,9 +43,11 @@ public class ConsumerGroupDescription { private final GroupState groupState; private final Node coordinator; private final Set authorizedOperations; + private final Optional groupEpoch; + private final Optional targetAssignmentEpoch; /** - * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupState, Node)}. + * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set, Optional, Optional)}. */ @Deprecated public ConsumerGroupDescription(String groupId, @@ -57,7 +60,7 @@ public ConsumerGroupDescription(String groupId, } /** - * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupState, Node, Set)}. + * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set, Optional, Optional)}. */ @Deprecated public ConsumerGroupDescription(String groupId, @@ -71,7 +74,7 @@ public ConsumerGroupDescription(String groupId, } /** - * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set)}. + * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set, Optional, Optional)}. */ @Deprecated public ConsumerGroupDescription(String groupId, @@ -90,25 +93,8 @@ public ConsumerGroupDescription(String groupId, this.groupState = GroupState.parse(state.name()); this.coordinator = coordinator; this.authorizedOperations = authorizedOperations; - } - - public ConsumerGroupDescription(String groupId, - boolean isSimpleConsumerGroup, - Collection members, - String partitionAssignor, - GroupState groupState, - Node coordinator) { - this(groupId, isSimpleConsumerGroup, members, partitionAssignor, groupState, coordinator, Collections.emptySet()); - } - - public ConsumerGroupDescription(String groupId, - boolean isSimpleConsumerGroup, - Collection members, - String partitionAssignor, - GroupState groupState, - Node coordinator, - Set authorizedOperations) { - this(groupId, isSimpleConsumerGroup, members, partitionAssignor, GroupType.CLASSIC, groupState, coordinator, authorizedOperations); + this.groupEpoch = Optional.empty(); + this.targetAssignmentEpoch = Optional.empty(); } public ConsumerGroupDescription(String groupId, @@ -118,7 +104,9 @@ public ConsumerGroupDescription(String groupId, GroupType type, GroupState groupState, Node coordinator, - Set authorizedOperations) { + Set authorizedOperations, + Optional groupEpoch, + Optional targetAssignmentEpoch) { this.groupId = groupId == null ? "" : groupId; this.isSimpleConsumerGroup = isSimpleConsumerGroup; this.members = members == null ? Collections.emptyList() : List.copyOf(members); @@ -127,6 +115,8 @@ public ConsumerGroupDescription(String groupId, this.groupState = groupState; this.coordinator = coordinator; this.authorizedOperations = authorizedOperations; + this.groupEpoch = groupEpoch; + this.targetAssignmentEpoch = targetAssignmentEpoch; } @Override @@ -141,12 +131,15 @@ public boolean equals(final Object o) { type == that.type && groupState == that.groupState && Objects.equals(coordinator, that.coordinator) && - Objects.equals(authorizedOperations, that.authorizedOperations); + Objects.equals(authorizedOperations, that.authorizedOperations) && + Objects.equals(groupEpoch, that.groupEpoch) && + Objects.equals(targetAssignmentEpoch, that.targetAssignmentEpoch); } @Override public int hashCode() { - return Objects.hash(groupId, isSimpleConsumerGroup, members, partitionAssignor, type, groupState, coordinator, authorizedOperations); + return Objects.hash(groupId, isSimpleConsumerGroup, members, partitionAssignor, type, groupState, coordinator, + authorizedOperations, groupEpoch, targetAssignmentEpoch); } /** @@ -215,6 +208,24 @@ public Set authorizedOperations() { return authorizedOperations; } + /** + * The epoch of the consumer group. + * The optional is set to an integer if it is a {@link GroupType#CONSUMER} group, and to empty if it + * is a {@link GroupType#CLASSIC} group. + */ + public Optional groupEpoch() { + return groupEpoch; + } + + /** + * The epoch of the target assignment. + * The optional is set to an integer if it is a {@link GroupType#CONSUMER} group, and to empty if it + * is a {@link GroupType#CLASSIC} group. + */ + public Optional targetAssignmentEpoch() { + return targetAssignmentEpoch; + } + @Override public String toString() { return "(groupId=" + groupId + @@ -225,6 +236,8 @@ public String toString() { ", groupState=" + groupState + ", coordinator=" + coordinator + ", authorizedOperations=" + authorizedOperations + + ", groupEpoch=" + groupEpoch.orElse(null) + + ", targetAssignmentEpoch=" + targetAssignmentEpoch.orElse(null) + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java index 5ca7dba86f8f4..0785f2e67155f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.clients.admin; +import org.apache.kafka.common.GroupType; + import java.util.Collections; import java.util.Objects; import java.util.Optional; @@ -30,13 +32,18 @@ public class MemberDescription { private final String host; private final MemberAssignment assignment; private final Optional targetAssignment; + private final Optional memberEpoch; + private final Optional upgraded; - public MemberDescription(String memberId, + public MemberDescription( + String memberId, Optional groupInstanceId, String clientId, String host, MemberAssignment assignment, - Optional targetAssignment + Optional targetAssignment, + Optional memberEpoch, + Optional upgraded ) { this.memberId = memberId == null ? "" : memberId; this.groupInstanceId = groupInstanceId; @@ -45,8 +52,38 @@ public MemberDescription(String memberId, this.assignment = assignment == null ? new MemberAssignment(Collections.emptySet()) : assignment; this.targetAssignment = targetAssignment; + this.memberEpoch = memberEpoch; + this.upgraded = upgraded; } + /** + * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}. + */ + @Deprecated + public MemberDescription( + String memberId, + Optional groupInstanceId, + String clientId, + String host, + MemberAssignment assignment, + Optional targetAssignment + ) { + this( + memberId, + groupInstanceId, + clientId, + host, + assignment, + targetAssignment, + Optional.empty(), + Optional.empty() + ); + } + + /** + * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}. + */ + @Deprecated public MemberDescription( String memberId, Optional groupInstanceId, @@ -64,6 +101,10 @@ public MemberDescription( ); } + /** + * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}. + */ + @Deprecated public MemberDescription(String memberId, String clientId, String host, @@ -81,12 +122,14 @@ public boolean equals(Object o) { clientId.equals(that.clientId) && host.equals(that.host) && assignment.equals(that.assignment) && - targetAssignment.equals(that.targetAssignment); + targetAssignment.equals(that.targetAssignment) && + memberEpoch.equals(that.memberEpoch) && + upgraded.equals(that.upgraded); } @Override public int hashCode() { - return Objects.hash(memberId, groupInstanceId, clientId, host, assignment, targetAssignment); + return Objects.hash(memberId, groupInstanceId, clientId, host, assignment, targetAssignment, memberEpoch, upgraded); } /** @@ -131,6 +174,25 @@ public Optional targetAssignment() { return targetAssignment; } + /** + * The epoch of the group member. + * The optional is set to an integer if the member is in a {@link GroupType#CONSUMER} group, and to empty if it + * is in a {@link GroupType#CLASSIC} group. + */ + public Optional memberEpoch() { + return memberEpoch; + } + + /** + * The flag indicating whether a member within a {@link GroupType#CONSUMER} group uses the + * {@link GroupType#CONSUMER} protocol. + * The optional is set to true if it does, to false if it does not, and to empty if it is unknown or if the group + * is a {@link GroupType#CLASSIC} group. + */ + public Optional upgraded() { + return upgraded; + } + @Override public String toString() { return "(memberId=" + memberId + @@ -138,6 +200,9 @@ public String toString() { ", clientId=" + clientId + ", host=" + host + ", assignment=" + assignment + - ", targetAssignment=" + targetAssignment + ")"; + ", targetAssignment=" + targetAssignment + + ", memberEpoch=" + memberEpoch.orElse(null) + + ", upgraded=" + upgraded.orElse(null) + + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java index 77c04c5d5f02e..686ee43a44b2b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java @@ -136,7 +136,10 @@ public ApiResult handleResponse( Optional.ofNullable(groupMember.groupInstanceId()), groupMember.clientId(), groupMember.clientHost(), - new MemberAssignment(partitions))); + new MemberAssignment(partitions), + Optional.empty(), + Optional.empty(), + Optional.empty())); }); final ClassicGroupDescription classicGroupDescription = diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java index 1d911e2f0c7f4..457675e92675a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java @@ -222,7 +222,9 @@ private ApiResult handledConsumerGroup groupMember.clientId(), groupMember.clientHost(), new MemberAssignment(convertAssignment(groupMember.assignment())), - Optional.of(new MemberAssignment(convertAssignment(groupMember.targetAssignment()))) + Optional.of(new MemberAssignment(convertAssignment(groupMember.targetAssignment()))), + Optional.of(groupMember.memberEpoch()), + groupMember.memberType() == -1 ? Optional.empty() : Optional.of(groupMember.memberType() == 1) )) ); @@ -235,7 +237,9 @@ private ApiResult handledConsumerGroup GroupType.CONSUMER, GroupState.parse(describedGroup.groupState()), coordinator, - authorizedOperations + authorizedOperations, + Optional.of(describedGroup.groupEpoch()), + Optional.of(describedGroup.assignmentEpoch()) ); completed.put(groupIdKey, consumerGroupDescription); } @@ -281,7 +285,10 @@ private ApiResult handledClassicGroupR Optional.ofNullable(groupMember.groupInstanceId()), groupMember.clientId(), groupMember.clientHost(), - new MemberAssignment(partitions))); + new MemberAssignment(partitions), + Optional.empty(), + Optional.empty(), + Optional.empty())); } final ConsumerGroupDescription consumerGroupDescription = new ConsumerGroupDescription(groupIdKey.idValue, protocolType.isEmpty(), @@ -290,7 +297,9 @@ private ApiResult handledClassicGroupR GroupType.CLASSIC, GroupState.parse(describedGroup.groupState()), coordinator, - authorizedOperations); + authorizedOperations, + Optional.empty(), + Optional.empty()); completed.put(groupIdKey, consumerGroupDescription); } else { failed.put(groupIdKey, new IllegalArgumentException( diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index b0b48e33c67ff..44f6e1f5a8891 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -4057,6 +4057,7 @@ public void testDescribeOldAndNewConsumerGroups() throws Exception { .setTopicName("foo") .setPartitions(singletonList(1)) ))) + .setMemberType((byte) 1) )), new ConsumerGroupDescribeResponseData.DescribedGroup() .setGroupId("grp2") @@ -4110,14 +4111,18 @@ public void testDescribeOldAndNewConsumerGroups() throws Exception { ), Optional.of(new MemberAssignment( Collections.singleton(new TopicPartition("foo", 1)) - )) + )), + Optional.of(10), + Optional.of(true) ) ), "range", GroupType.CONSUMER, GroupState.STABLE, env.cluster().controller(), - Collections.emptySet() + Collections.emptySet(), + Optional.of(10), + Optional.of(10) )); expectedResult.put("grp2", new ConsumerGroupDescription( "grp2", @@ -4130,14 +4135,19 @@ public void testDescribeOldAndNewConsumerGroups() throws Exception { "clientHost", new MemberAssignment( Collections.singleton(new TopicPartition("bar", 0)) - ) + ), + Optional.empty(), + Optional.empty(), + Optional.empty() ) ), "range", GroupType.CLASSIC, GroupState.STABLE, env.cluster().controller(), - Collections.emptySet() + Collections.emptySet(), + Optional.empty(), + Optional.empty() )); assertEquals(expectedResult, result.all().get()); @@ -8674,7 +8684,10 @@ private static MemberDescription convertToMemberDescriptions(DescribedGroupMembe Optional.ofNullable(member.groupInstanceId()), member.clientId(), member.clientHost(), - assignment); + assignment, + Optional.empty(), + Optional.empty(), + Optional.empty()); } private static ShareMemberDescription convertToShareMemberDescriptions(ShareGroupDescribeResponseData.Member member, diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java index 0bddc618cfc03..16ce11d7361e5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java @@ -99,5 +99,38 @@ public void testNonEqual() { assertNotEquals(STATIC_MEMBER_DESCRIPTION, newInstanceDescription); assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(), newInstanceDescription.hashCode()); + + MemberDescription newTargetAssignmentDescription = new MemberDescription(MEMBER_ID, + INSTANCE_ID, + CLIENT_ID, + HOST, + ASSIGNMENT, + Optional.of(ASSIGNMENT), + Optional.empty(), + Optional.empty()); + assertNotEquals(STATIC_MEMBER_DESCRIPTION, newTargetAssignmentDescription); + assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(), newTargetAssignmentDescription.hashCode()); + + MemberDescription newMemberEpochDescription = new MemberDescription(MEMBER_ID, + INSTANCE_ID, + CLIENT_ID, + HOST, + ASSIGNMENT, + Optional.empty(), + Optional.of(1), + Optional.empty()); + assertNotEquals(STATIC_MEMBER_DESCRIPTION, newMemberEpochDescription); + assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(), newMemberEpochDescription.hashCode()); + + MemberDescription newIsClassicDescription = new MemberDescription(MEMBER_ID, + INSTANCE_ID, + CLIENT_ID, + HOST, + ASSIGNMENT, + Optional.empty(), + Optional.empty(), + Optional.of(false)); + assertNotEquals(STATIC_MEMBER_DESCRIPTION, newIsClassicDescription); + assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(), newIsClassicDescription.hashCode()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java index cfbf67e2090d8..20cf0b761e641 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -54,6 +55,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; +import java.util.List; import java.util.Optional; import java.util.Set; @@ -152,29 +154,46 @@ public void testInvalidBuildRequest() { @Test public void testSuccessfulHandleConsumerGroupResponse() { DescribeConsumerGroupsHandler handler = new DescribeConsumerGroupsHandler(false, logContext); - Collection members = singletonList(new MemberDescription( - "memberId", - Optional.of("instanceId"), - "clientId", - "host", - new MemberAssignment(Set.of( - new TopicPartition("foo", 0), - new TopicPartition("bar", 1)) + Collection members = List.of( + new MemberDescription( + "memberId", + Optional.of("instanceId"), + "clientId", + "host", + new MemberAssignment(Set.of( + new TopicPartition("foo", 0) + )), + Optional.of(new MemberAssignment(Set.of( + new TopicPartition("foo", 1) + ))), + Optional.of(10), + Optional.of(true) ), - Optional.of(new MemberAssignment(Set.of( - new TopicPartition("foo", 1), - new TopicPartition("bar", 2) - ))) - )); + new MemberDescription( + "memberId-classic", + Optional.of("instanceId-classic"), + "clientId-classic", + "host", + new MemberAssignment(Set.of( + new TopicPartition("bar", 0) + )), + Optional.of(new MemberAssignment(Set.of( + new TopicPartition("bar", 1) + ))), + Optional.of(9), + Optional.of(false) + )); ConsumerGroupDescription expected = new ConsumerGroupDescription( groupId1, false, members, "range", GroupType.CONSUMER, - ConsumerGroupState.STABLE, + GroupState.STABLE, coordinator, - Collections.emptySet() + Collections.emptySet(), + Optional.of(10), + Optional.of(10) ); AdminApiHandler.ApiResult result = handler.handleResponse( coordinator, @@ -189,7 +208,7 @@ public void testSuccessfulHandleConsumerGroupResponse() { .setAssignmentEpoch(10) .setAssignorName("range") .setAuthorizedOperations(Utils.to32BitField(emptySet())) - .setMembers(singletonList( + .setMembers(List.of( new ConsumerGroupDescribeResponseData.Member() .setMemberId("memberId") .setInstanceId("instanceId") @@ -200,27 +219,44 @@ public void testSuccessfulHandleConsumerGroupResponse() { .setSubscribedTopicNames(singletonList("foo")) .setSubscribedTopicRegex("regex") .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() - .setTopicPartitions(Arrays.asList( + .setTopicPartitions(List.of( new ConsumerGroupDescribeResponseData.TopicPartitions() .setTopicId(Uuid.randomUuid()) .setTopicName("foo") - .setPartitions(Collections.singletonList(0)), + .setPartitions(Collections.singletonList(0)) + ))) + .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List.of( new ConsumerGroupDescribeResponseData.TopicPartitions() .setTopicId(Uuid.randomUuid()) - .setTopicName("bar") + .setTopicName("foo") .setPartitions(Collections.singletonList(1)) ))) - .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment() - .setTopicPartitions(Arrays.asList( + .setMemberType((byte) 1), + new ConsumerGroupDescribeResponseData.Member() + .setMemberId("memberId-classic") + .setInstanceId("instanceId-classic") + .setClientHost("host") + .setClientId("clientId-classic") + .setMemberEpoch(9) + .setRackId("rackid") + .setSubscribedTopicNames(singletonList("bar")) + .setSubscribedTopicRegex("regex") + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List.of( new ConsumerGroupDescribeResponseData.TopicPartitions() .setTopicId(Uuid.randomUuid()) - .setTopicName("foo") - .setPartitions(Collections.singletonList(1)), + .setTopicName("bar") + .setPartitions(Collections.singletonList(0)) + ))) + .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List.of( new ConsumerGroupDescribeResponseData.TopicPartitions() .setTopicId(Uuid.randomUuid()) .setTopicName("bar") - .setPartitions(Collections.singletonList(2)) + .setPartitions(Collections.singletonList(1)) ))) + .setMemberType((byte) 0) )) )) ) @@ -232,9 +268,13 @@ public void testSuccessfulHandleConsumerGroupResponse() { public void testSuccessfulHandleClassicGroupResponse() { Collection members = singletonList(new MemberDescription( "memberId", + Optional.empty(), "clientId", "host", - new MemberAssignment(tps))); + new MemberAssignment(tps), + Optional.empty(), + Optional.empty(), + Optional.empty())); ConsumerGroupDescription expected = new ConsumerGroupDescription( groupId1, true, diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 64d9cc94c2dda..bd381f0306ecc 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -1921,12 +1921,17 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // Test that we can get information about the test consumer group. assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId)) var testGroupDescription = describeWithFakeGroupResult.describedGroups().get(testGroupId).get() + assertEquals(groupType == GroupType.CLASSIC, testGroupDescription.groupEpoch.isEmpty) + assertEquals(groupType == GroupType.CLASSIC, testGroupDescription.targetAssignmentEpoch.isEmpty) assertEquals(testGroupId, testGroupDescription.groupId()) assertFalse(testGroupDescription.isSimpleConsumerGroup) assertEquals(groupInstanceSet.size, testGroupDescription.members().size()) val members = testGroupDescription.members() - members.asScala.foreach(member => assertEquals(testClientId, member.clientId())) + members.asScala.foreach { member => + assertEquals(testClientId, member.clientId) + assertEquals(if (groupType == GroupType.CLASSIC) Optional.empty else Optional.of(true), member.upgraded) + } val topicPartitionsByTopic = members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic()) topicSet.foreach { topic => val topicPartitions = topicPartitionsByTopic.getOrElse(topic, List.empty) @@ -2058,6 +2063,89 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } + /** + * Test the consumer group APIs. + */ + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testConsumerGroupWithMemberMigration(quorum: String): Unit = { + val config = createConfig + client = Admin.create(config) + var classicConsumer: Consumer[Array[Byte], Array[Byte]] = null + var consumerConsumer: Consumer[Array[Byte], Array[Byte]] = null + try { + // Verify that initially there are no consumer groups to list. + val list1 = client.listConsumerGroups + assertEquals(0, list1.all.get.size) + assertEquals(0, list1.errors.get.size) + assertEquals(0, list1.valid.get.size) + val testTopicName = "test_topic" + val testNumPartitions = 2 + + client.createTopics(util.Arrays.asList( + new NewTopic(testTopicName, testNumPartitions, 1.toShort), + )).all.get + waitForTopics(client, List(testTopicName), List()) + + val producer = createProducer() + try { + producer.send(new ProducerRecord(testTopicName, 0, null, null)) + producer.send(new ProducerRecord(testTopicName, 1, null, null)) + producer.flush() + } finally { + Utils.closeQuietly(producer, "producer") + } + + val testGroupId = "test_group_id" + val testClassicClientId = "test_classic_client_id" + val testConsumerClientId = "test_consumer_client_id" + + val newConsumerConfig = new Properties(consumerConfig) + newConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) + newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, testClassicClientId) + consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name) + + classicConsumer = createConsumer(configOverrides = newConsumerConfig) + classicConsumer.subscribe(List(testTopicName).asJava) + classicConsumer.poll(JDuration.ofMillis(1000)) + + newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, testConsumerClientId) + consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name) + consumerConsumer = createConsumer(configOverrides = newConsumerConfig) + consumerConsumer.subscribe(List(testTopicName).asJava) + consumerConsumer.poll(JDuration.ofMillis(1000)) + + TestUtils.waitUntilTrue(() => { + classicConsumer.poll(JDuration.ofMillis(100)) + consumerConsumer.poll(JDuration.ofMillis(100)) + val describeConsumerGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava).all.get + describeConsumerGroupResult.containsKey(testGroupId) && + describeConsumerGroupResult.get(testGroupId).groupState == GroupState.STABLE && + describeConsumerGroupResult.get(testGroupId).members.size == 2 + }, s"Expected to find 2 members in a stable group $testGroupId") + + val describeConsumerGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava).all.get + val group = describeConsumerGroupResult.get(testGroupId) + assertNotNull(group) + assertEquals(Optional.of(2), group.groupEpoch) + assertEquals(Optional.of(2), group.targetAssignmentEpoch) + + val classicMember = group.members.asScala.find(_.clientId == testClassicClientId) + assertTrue(classicMember.isDefined) + assertEquals(Optional.of(2), classicMember.get.memberEpoch) + assertEquals(Optional.of(false), classicMember.get.upgraded) + + val consumerMember = group.members.asScala.find(_.clientId == testConsumerClientId) + assertTrue(consumerMember.isDefined) + assertEquals(Optional.of(2), consumerMember.get.memberEpoch) + assertEquals(Optional.of(true), consumerMember.get.upgraded) + } finally { + Utils.closeQuietly(classicConsumer, "classicConsumer") + Utils.closeQuietly(consumerConsumer, "consumerConsumer") + Utils.closeQuietly(client, "adminClient") + } + } + /** * Test the consumer group APIs. */ @@ -2546,9 +2634,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { }, "Expected to find all groups") val classicConsumers = client.describeClassicGroups(groupIds.asJavaCollection).all().get() - assertNotNull(classicConsumers.get(classicGroupId)) - assertEquals(classicGroupId, classicConsumers.get(classicGroupId).groupId()) - assertEquals("consumer", classicConsumers.get(classicGroupId).protocol()) + val classicConsumer = classicConsumers.get(classicGroupId) + assertNotNull(classicConsumer) + assertEquals(classicGroupId, classicConsumer.groupId) + assertEquals("consumer", classicConsumer.protocol) + assertFalse(classicConsumer.members.isEmpty) + classicConsumer.members.forEach(member => assertTrue(member.upgraded.isEmpty)) assertNotNull(classicConsumers.get(simpleGroupId)) assertEquals(simpleGroupId, classicConsumers.get(simpleGroupId).groupId()) diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java index dcb8fd054812d..7a134ac0c9610 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.GroupState; +import org.apache.kafka.common.GroupType; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -139,8 +140,12 @@ public void testAdminRequestsForDescribeNegativeOffsets() throws Exception { true, Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions))), RangeAssignor.class.getName(), + GroupType.CLASSIC, GroupState.STABLE, - new Node(1, "localhost", 9092)); + new Node(1, "localhost", 9092), + Set.of(), + Optional.empty(), + Optional.empty()); Function, ArgumentMatcher>> offsetsArgMatcher = expectedPartitions -> topicPartitionOffsets -> topicPartitionOffsets != null && topicPartitionOffsets.keySet().equals(expectedPartitions); @@ -233,8 +238,12 @@ private DescribeConsumerGroupsResult describeGroupsResult(GroupState groupState) true, Collections.singleton(member1), RangeAssignor.class.getName(), + GroupType.CLASSIC, groupState, - new Node(1, "localhost", 9092)); + new Node(1, "localhost", 9092), + Set.of(), + Optional.empty(), + Optional.empty()); KafkaFutureImpl future = new KafkaFutureImpl<>(); future.complete(description); return new DescribeConsumerGroupsResult(Collections.singletonMap(GROUP, future)); From b99c22770aafb873935d6daed4b942d326c05aba Mon Sep 17 00:00:00 2001 From: Lianet Magrans <98415067+lianetm@users.noreply.github.com> Date: Tue, 10 Dec 2024 09:01:02 -0500 Subject: [PATCH 04/13] regex integration tests (#18079) Reviewers: David Jacot --- .../PlaintextConsumerSubscriptionTest.scala | 84 ++++++++++++++++++- 1 file changed, 82 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala index f9c6d18b4703b..5eea54b23d1d3 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala @@ -193,10 +193,10 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest { val consumer = createConsumer() assertEquals(0, consumer.assignment().size) - val pattern = new SubscriptionPattern("t.*c") + var pattern = new SubscriptionPattern("t.*c") consumer.subscribe(pattern) - val assignment = Set( + var assignment = Set( new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic1, 0), @@ -204,6 +204,86 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest { awaitAssignment(consumer, assignment) consumer.unsubscribe() assertEquals(0, consumer.assignment().size) + + // Subscribe to a different pattern to match topic2 (that did not match before) + pattern = new SubscriptionPattern(topic2 + ".*") + consumer.subscribe(pattern) + + assignment = Set( + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1)) + awaitAssignment(consumer, assignment) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testRe2JPatternExpandSubscription(quorum: String, groupProtocol: String): Unit = { + val topic1 = "topic1" // matches first pattern + createTopic(topic1, 2, brokerCount) + + val topic2 = "topic2" // does not match first pattern + createTopic(topic2, 2, brokerCount) + + val consumer = createConsumer() + assertEquals(0, consumer.assignment().size) + + var pattern = new SubscriptionPattern("topic1.*") + consumer.subscribe(pattern) + val assignment = Set( + new TopicPartition(topic1, 0), + new TopicPartition(topic1, 1)) + awaitAssignment(consumer, assignment) + + consumer.unsubscribe() + assertEquals(0, consumer.assignment().size) + + // Subscribe to a different pattern that should match + // the same topics the member already had plus new ones + pattern = new SubscriptionPattern("topic1|topic2") + consumer.subscribe(pattern) + + val expandedAssignment = assignment ++ Set(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)) + awaitAssignment(consumer, expandedAssignment) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testRe2JPatternSubscriptionAndTopicSubscription(quorum: String, groupProtocol: String): Unit = { + val topic1 = "topic1" // matches subscribed pattern + createTopic(topic1, 2, brokerCount) + + val topic11 = "topic11" // matches subscribed pattern + createTopic(topic11, 2, brokerCount) + + val topic2 = "topic2" // does not match subscribed pattern + createTopic(topic2, 2, brokerCount) + + val consumer = createConsumer() + assertEquals(0, consumer.assignment().size) + + // Subscribe to pattern + val pattern = new SubscriptionPattern("topic1.*") + consumer.subscribe(pattern) + val patternAssignment = Set( + new TopicPartition(topic1, 0), + new TopicPartition(topic1, 1), + new TopicPartition(topic11, 0), + new TopicPartition(topic11, 1)) + awaitAssignment(consumer, patternAssignment) + consumer.unsubscribe() + assertEquals(0, consumer.assignment().size) + + // Subscribe to explicit topic names + consumer.subscribe(List(topic2).asJava) + val assignment = Set( + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1)) + awaitAssignment(consumer, assignment) + consumer.unsubscribe() + + // Subscribe to pattern again + consumer.subscribe(pattern) + awaitAssignment(consumer, patternAssignment) } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) From f57fd2d9fd1c8a240d6c15ad35e83fd9283a958f Mon Sep 17 00:00:00 2001 From: TengYao Chi Date: Tue, 10 Dec 2024 23:27:42 +0800 Subject: [PATCH 05/13] MINOR: Logs warning message when user invoke producer#flush within callback (#18112) Reviewers: Andrew Schofield --- .../org/apache/kafka/clients/producer/KafkaProducer.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 1b1ffcc9d15de..0229c43cb8b3c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -1219,11 +1219,19 @@ private void ensureValidRecordSize(int size) { * flush all buffered records before performing the commit. This ensures that all the {@link #send(ProducerRecord)} * calls made since the previous {@link #beginTransaction()} are completed before the commit. *

    + *

    + * Important: This method should not be used within the callback provided to + * {@link #send(ProducerRecord, Callback)}. Invoking flush() in this context will cause a deadlock. + *

    * * @throws InterruptException If the thread is interrupted while blocked */ @Override public void flush() { + if (Thread.currentThread() == this.ioThread) { + log.error("KafkaProducer.flush() invocation inside a callback will cause a deadlock."); + } + log.trace("Flushing accumulated records in producer."); long start = time.nanoseconds(); From 45835a0e459b3d1a7203d73c94e7b393bdc89dca Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Tue, 10 Dec 2024 22:25:38 +0500 Subject: [PATCH 06/13] MINOR: Cleanup connect runtime module (#18074) Reviewers: Mickael Maison --- .../kafka/connect/cli/ConnectStandalone.java | 7 +-- .../kafka/connect/runtime/ConnectMetrics.java | 6 +-- .../runtime/ExactlyOnceWorkerSourceTask.java | 2 +- .../kafka/connect/runtime/StateTracker.java | 4 +- .../kafka/connect/runtime/WorkerSinkTask.java | 8 ++-- .../connect/runtime/WorkerSourceTask.java | 12 +++--- .../kafka/connect/runtime/WorkerTask.java | 9 ++-- .../distributed/DistributedHerder.java | 4 +- .../distributed/WorkerGroupMember.java | 3 +- .../kafka/connect/tools/PredicateDoc.java | 32 ++++++-------- .../connect/tools/TransformationDoc.java | 32 ++++++-------- .../apache/kafka/connect/util/Callback.java | 2 +- .../kafka/connect/util/KafkaBasedLog.java | 36 ++++++++-------- .../apache/kafka/connect/util/RetryUtil.java | 2 +- .../connect/util/SafeObjectInputStream.java | 30 +++++-------- .../runtime/ErrorHandlingTaskTest.java | 11 ----- .../kafka/connect/runtime/WorkerTest.java | 16 +++---- .../connect/runtime/WorkerTestUtils.java | 43 ------------------- .../runtime/rest/RestServerConfigTest.java | 2 - .../resources/ConnectorsResourceTest.java | 5 --- .../storage/KafkaConfigBackingStoreTest.java | 7 --- 21 files changed, 87 insertions(+), 186 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java index 120e03c6f8e3b..43af6b274b6ac 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java @@ -121,9 +121,7 @@ CreateConnectorRequest parseConnectorConfigurationFile(String filePath) throws I File connectorConfigurationFile = Paths.get(filePath).toFile(); try { - Map connectorConfigs = objectMapper.readValue( - connectorConfigurationFile, - new TypeReference>() { }); + Map connectorConfigs = objectMapper.readValue(connectorConfigurationFile, new TypeReference<>() { }); if (!connectorConfigs.containsKey(NAME_CONFIG)) { throw new ConnectException("Connector configuration at '" + filePath + "' is missing the mandatory '" + NAME_CONFIG + "' " @@ -136,8 +134,7 @@ CreateConnectorRequest parseConnectorConfigurationFile(String filePath) throws I try { objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - CreateConnectorRequest createConnectorRequest = objectMapper.readValue(connectorConfigurationFile, - new TypeReference() { }); + CreateConnectorRequest createConnectorRequest = objectMapper.readValue(connectorConfigurationFile, new TypeReference<>() { }); if (createConnectorRequest.config().containsKey(NAME_CONFIG)) { if (!createConnectorRequest.config().get(NAME_CONFIG).equals(createConnectorRequest.name())) { throw new ConnectException("Connector name configuration in 'config' doesn't match the one specified in 'name' at '" + filePath diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java index ff62c25eee585..430cad52b8f6f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java @@ -81,8 +81,7 @@ public ConnectMetrics(String workerId, WorkerConfig config, Time time, String cl .timeWindow(sampleWindowMs, TimeUnit.MILLISECONDS).recordLevel( Sensor.RecordingLevel.forName(metricsRecordingLevel)); - Map contextLabels = new HashMap<>(); - contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); + Map contextLabels = new HashMap<>(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, clusterId); Object groupId = config.originals().get(DistributedConfig.GROUP_ID_CONFIG); if (groupId != null) { @@ -391,8 +390,7 @@ public Sensor sensor(String name, MetricConfig config, Sensor... parents) { public synchronized Sensor sensor(String name, MetricConfig config, Sensor.RecordingLevel recordingLevel, Sensor... parents) { // We need to make sure that all sensor names are unique across all groups, so use the sensor prefix Sensor result = metrics.sensor(sensorPrefix + name, config, Long.MAX_VALUE, recordingLevel, parents); - if (result != null) - sensorNames.add(result.name()); + sensorNames.add(result.name()); return result; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java index bcff615c4147e..d837776be3829 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java @@ -321,7 +321,7 @@ private void commitTransaction() { error = flushError.get(); if (error != null) { - recordCommitFailure(time.milliseconds() - started, null); + recordCommitFailure(time.milliseconds() - started); offsetWriter.cancelFlush(); throw maybeWrapProducerSendException( "Failed to flush offsets and/or records for task " + id, diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java index 7c10f42148e00..9dddec09ae340 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java @@ -22,7 +22,7 @@ /** * Utility class that tracks the current state and the duration of time spent in each state. - * This class is threadsafe. + * This class is thread-safe. */ public class StateTracker { @@ -60,7 +60,7 @@ public State currentState() { /** * An immutable record of the accumulated times at the most recent state change. This class is required to - * efficiently make {@link StateTracker} threadsafe. + * efficiently make {@link StateTracker} thread-safe. */ private static final class StateChange { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 1f4e930ae5a46..424de8f3de5b1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -228,7 +228,7 @@ protected void iteration() { // Maybe commit if (!committing && (context.isCommitRequested() || now >= nextCommit)) { - commitOffsets(now, false); + commitOffsets(now); nextCommit = now + offsetCommitIntervalMs; context.clearCommitRequest(); } @@ -282,7 +282,7 @@ private void onCommitCompleted(Throwable error, long seqno, Map offsets, boolean cl } } - private void commitOffsets(long now, boolean closing) { - commitOffsets(now, closing, consumer.assignment()); + private void commitOffsets(long now) { + commitOffsets(now, false, consumer.assignment()); } private void commitOffsets(long now, boolean closing, Collection topicPartitions) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 0d0eba32d86c4..55cc097083d02 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -262,11 +262,11 @@ public boolean commitOffsets() { shouldFlush = offsetWriter.beginFlush(timeout - time.milliseconds(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.warn("{} Interrupted while waiting for previous offset flush to complete, cancelling", this); - recordCommitFailure(time.milliseconds() - started, e); + recordCommitFailure(time.milliseconds() - started); return false; } catch (TimeoutException e) { log.warn("{} Timed out while waiting for previous offset flush to complete, cancelling", this); - recordCommitFailure(time.milliseconds() - started, e); + recordCommitFailure(time.milliseconds() - started); return false; } if (!shouldFlush) { @@ -292,7 +292,7 @@ public boolean commitOffsets() { // any data if (flushFuture == null) { offsetWriter.cancelFlush(); - recordCommitFailure(time.milliseconds() - started, null); + recordCommitFailure(time.milliseconds() - started); return false; } try { @@ -304,17 +304,17 @@ public boolean commitOffsets() { } catch (InterruptedException e) { log.warn("{} Flush of offsets interrupted, cancelling", this); offsetWriter.cancelFlush(); - recordCommitFailure(time.milliseconds() - started, e); + recordCommitFailure(time.milliseconds() - started); return false; } catch (ExecutionException e) { log.error("{} Flush of offsets threw an unexpected exception: ", this, e); offsetWriter.cancelFlush(); - recordCommitFailure(time.milliseconds() - started, e); + recordCommitFailure(time.milliseconds() - started); return false; } catch (TimeoutException e) { log.error("{} Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets", this); offsetWriter.cancelFlush(); - recordCommitFailure(time.milliseconds() - started, null); + recordCommitFailure(time.milliseconds() - started); return false; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index 98171fe47b6aa..9b70572fe24a7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -356,17 +356,16 @@ protected void recordActiveTopic(String topic) { * @param duration the length of time in milliseconds for the commit attempt to complete */ protected void recordCommitSuccess(long duration) { - taskMetricsGroup.recordCommit(duration, true, null); + taskMetricsGroup.recordCommit(duration, true); } /** * Record that offsets have been committed. * * @param duration the length of time in milliseconds for the commit attempt to complete - * @param error the unexpected error that occurred; may be null in the case of timeouts or interruptions */ - protected void recordCommitFailure(long duration, Throwable error) { - taskMetricsGroup.recordCommit(duration, false, error); + protected void recordCommitFailure(long duration) { + taskMetricsGroup.recordCommit(duration, false); } /** @@ -434,7 +433,7 @@ void close() { metricGroup.close(); } - void recordCommit(long duration, boolean success, Throwable error) { + void recordCommit(long duration, boolean success) { if (success) { commitTime.record(duration); commitAttempts.record(1.0d); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 69310a91bcc5f..ff7a9d3149d58 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -1510,7 +1510,7 @@ public void restartConnectorAndTasks(RestartRequest request, Callback plan = buildRestartPlan(request); - if (!plan.isPresent()) { + if (plan.isEmpty()) { callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); } else { callback.onCompletion(null, plan.get().restartConnectorStateInfo()); @@ -1558,7 +1558,7 @@ void processRestartRequests() { protected synchronized void doRestartConnectorAndTasks(RestartRequest request) { String connectorName = request.connectorName(); Optional maybePlan = buildRestartPlan(request); - if (!maybePlan.isPresent()) { + if (maybePlan.isEmpty()) { log.debug("Skipping restart of connector '{}' since no status is available: {}", connectorName, request); return; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index a3982f070a433..c89eb33082fbe 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -89,8 +89,7 @@ public WorkerGroupMember(DistributedConfig config, .tags(metricsTags); List reporters = CommonClientConfigs.metricsReporters(clientId, config); - Map contextLabels = new HashMap<>(); - contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); + Map contextLabels = new HashMap<>(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, config.kafkaClusterId()); contextLabels.put(WorkerConfig.CONNECT_GROUP_ID, config.getString(DistributedConfig.GROUP_ID_CONFIG)); MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, contextLabels); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java index ed09d4a37a005..56f559dc245e0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java @@ -20,7 +20,6 @@ import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.transforms.predicates.Predicate; -import java.io.PrintStream; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -53,29 +52,22 @@ private

    > DocInfo(Class

    predicateClass, String overvie .sorted(Comparator.comparing(docInfo -> docInfo.predicateName)) .collect(Collectors.toList()); - private static void printPredicateHtml(PrintStream out, DocInfo docInfo) { - out.println("

    "); - - out.print("
    "); - out.print("" + docInfo.predicateName + ""); - out.println("
    "); - - out.println(docInfo.overview); - - out.println("

    "); - - out.println(docInfo.configDef.toHtml(6, key -> docInfo.predicateName + "_" + key)); - - out.println("

    "); - } - - private static void printHtml(PrintStream out) { + private static String toHtml() { + StringBuilder b = new StringBuilder(); for (final DocInfo docInfo : PREDICATES) { - printPredicateHtml(out, docInfo); + b.append("
    \n"); + b.append("
    "); + b.append("" + docInfo.predicateName + ""); + b.append("
    \n"); + b.append(docInfo.overview + "\n"); + b.append("

    \n"); + b.append(docInfo.configDef.toHtml(6, key -> docInfo.predicateName + "_" + key) + "\n"); + b.append("

    \n"); } + return b.toString(); } public static void main(String... args) { - printHtml(System.out); + System.out.println(toHtml()); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java index 2c7250eb588c3..100f938bd9b5d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java @@ -34,7 +34,6 @@ import org.apache.kafka.connect.transforms.TimestampRouter; import org.apache.kafka.connect.transforms.ValueToKey; -import java.io.PrintStream; import java.util.Arrays; import java.util.List; @@ -71,30 +70,23 @@ private DocInfo(String transformationName, String overview, ConfigDef configDef) new DocInfo(ValueToKey.class.getName(), ValueToKey.OVERVIEW_DOC, ValueToKey.CONFIG_DEF) ); - private static void printTransformationHtml(PrintStream out, DocInfo docInfo) { - out.println("
    "); - - out.print("
    "); - out.print("" + docInfo.transformationName + ""); - out.println("
    "); - - out.println(docInfo.overview); - - out.println("

    "); - - out.println(docInfo.configDef.toHtml(6, key -> docInfo.transformationName + "_" + key)); - - out.println("

    "); - } - - private static void printHtml(PrintStream out) { + private static String toHtml() { + StringBuilder b = new StringBuilder(); for (final DocInfo docInfo : TRANSFORMATIONS) { - printTransformationHtml(out, docInfo); + b.append("
    \n"); + b.append("
    "); + b.append("" + docInfo.transformationName + ""); + b.append("
    \n"); + b.append(docInfo.overview + "\n"); + b.append("

    \n"); + b.append(docInfo.configDef.toHtml(6, key -> docInfo.transformationName + "_" + key) + "\n"); + b.append("

    \n"); } + return b.toString(); } public static void main(String... args) { - printHtml(System.out); + System.out.println(toHtml()); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java index c09eba62a2377..fd62fc172f4cf 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java @@ -32,7 +32,7 @@ default void recordStage(Stage stage) { } default Callback chainStaging(Callback chained) { - return new Callback() { + return new Callback<>() { @Override public void recordStage(Stage stage) { Callback.this.recordStage(stage); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index 23f6d8a9c4937..bcc9f94b1fb29 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -159,12 +159,12 @@ public KafkaBasedLog(String topic, * @param initializer the function that should be run when this log is {@link #start() started}; may be null */ public KafkaBasedLog(String topic, - Map producerConfigs, - Map consumerConfigs, - Supplier topicAdminSupplier, - Callback> consumedCallback, - Time time, - java.util.function.Consumer initializer) { + Map producerConfigs, + Map consumerConfigs, + Supplier topicAdminSupplier, + Callback> consumedCallback, + Time time, + java.util.function.Consumer initializer) { this.topic = topic; this.producerConfigs = producerConfigs; this.consumerConfigs = consumerConfigs; @@ -212,7 +212,7 @@ public static KafkaBasedLog withExistingClients(String topic, ) { Objects.requireNonNull(topicAdmin); Objects.requireNonNull(readTopicPartition); - return new KafkaBasedLog(topic, + return new KafkaBasedLog<>(topic, Collections.emptyMap(), Collections.emptyMap(), () -> topicAdmin, @@ -266,8 +266,8 @@ public void start(boolean reportErrorsToCallback) { // Then create the producer and consumer producer = Optional.ofNullable(createProducer()); - if (!producer.isPresent()) - log.trace("Creating read-only KafkaBasedLog for topic " + topic); + if (producer.isEmpty()) + log.trace("Creating read-only KafkaBasedLog for topic {}", topic); consumer = createConsumer(); List partitions = new ArrayList<>(); @@ -308,13 +308,13 @@ public void start(boolean reportErrorsToCallback) { thread = new WorkThread(); thread.start(); - log.info("Finished reading KafkaBasedLog for topic " + topic); + log.info("Finished reading KafkaBasedLog for topic {}", topic); - log.info("Started KafkaBasedLog for topic " + topic); + log.info("Started KafkaBasedLog for topic {}", topic); } public void stop() { - log.info("Stopping KafkaBasedLog for topic " + topic); + log.info("Stopping KafkaBasedLog for topic {}", topic); synchronized (this) { stopRequested = true; @@ -338,7 +338,7 @@ public void stop() { // do not close the admin client, since we don't own it admin = null; - log.info("Stopped KafkaBasedLog for topic " + topic); + log.info("Stopped KafkaBasedLog for topic {}", topic); } /** @@ -466,16 +466,16 @@ protected boolean readPartition(TopicPartition topicPartition) { return true; } - private void poll(long timeoutMs) { + private void poll() { try { - ConsumerRecords records = consumer.poll(Duration.ofMillis(timeoutMs)); + ConsumerRecords records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE)); for (ConsumerRecord record : records) consumedCallback.onCompletion(null, record); } catch (WakeupException e) { // Expected on get() or stop(). The calling code should handle this throw e; } catch (KafkaException e) { - log.error("Error polling: " + e); + log.error("Error polling: ", e); if (reportErrorsToCallback) { consumedCallback.onCompletion(e, null); } @@ -507,7 +507,7 @@ private void readToLogEnd(boolean shouldRetry) { } else { log.trace("Behind end offset {} for {}; last-read offset is {}", endOffset, topicPartition, lastConsumedOffset); - poll(Integer.MAX_VALUE); + poll(); break; } } @@ -609,7 +609,7 @@ public void run() { } try { - poll(Integer.MAX_VALUE); + poll(); } catch (WakeupException e) { // See previous comment, both possible causes of this wakeup are handled by starting this loop again continue; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java index a0a68e0e81e76..cb8d51c0a43fd 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java @@ -59,7 +59,7 @@ public static T retryUntilTimeout(Callable callable, Supplier des // visible for testing static T retryUntilTimeout(Callable callable, Supplier description, Duration timeoutDuration, long retryBackoffMs, Time time) throws Exception { - // if null supplier or string is provided, the message will be default to "callabe" + // if null supplier or string is provided, the message will be default to "callable" final String descriptionStr = Optional.ofNullable(description) .map(Supplier::get) .orElse("callable"); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java index 0ad3889b5f0ea..df2da55278005 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java @@ -20,29 +20,21 @@ import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectStreamClass; -import java.util.Collections; -import java.util.HashSet; import java.util.Set; public class SafeObjectInputStream extends ObjectInputStream { - protected static final Set DEFAULT_NO_DESERIALIZE_CLASS_NAMES; - - static { - - Set s = new HashSet<>(); - s.add("org.apache.commons.collections.functors.InvokerTransformer"); - s.add("org.apache.commons.collections.functors.InstantiateTransformer"); - s.add("org.apache.commons.collections4.functors.InvokerTransformer"); - s.add("org.apache.commons.collections4.functors.InstantiateTransformer"); - s.add("org.codehaus.groovy.runtime.ConvertedClosure"); - s.add("org.codehaus.groovy.runtime.MethodClosure"); - s.add("org.springframework.beans.factory.ObjectFactory"); - s.add("com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl"); - s.add("org.apache.xalan.xsltc.trax.TemplatesImpl"); - DEFAULT_NO_DESERIALIZE_CLASS_NAMES = Collections.unmodifiableSet(s); - } - + protected static final Set DEFAULT_NO_DESERIALIZE_CLASS_NAMES = Set.of( + "org.apache.commons.collections.functors.InvokerTransformer", + "org.apache.commons.collections.functors.InstantiateTransformer", + "org.apache.commons.collections4.functors.InvokerTransformer", + "org.apache.commons.collections4.functors.InstantiateTransformer", + "org.codehaus.groovy.runtime.ConvertedClosure", + "org.codehaus.groovy.runtime.MethodClosure", + "org.springframework.beans.factory.ObjectFactory", + "com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl", + "org.apache.xalan.xsltc.trax.TemplatesImpl" + ); public SafeObjectInputStream(InputStream in) throws IOException { super(in); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index 0974f35d16c71..f4374d18500ea 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -74,8 +74,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -408,15 +406,6 @@ private void assertErrorHandlingMetricValue(String name, double expected) { assertEquals(expected, measured, 0.001d); } - private void verifyCloseSource() throws IOException { - verify(producer).close(any(Duration.class)); - verify(admin).close(any(Duration.class)); - verify(offsetReader).close(); - verify(offsetStore).stop(); - // headerConverter.close() can throw IOException - verify(headerConverter).close(); - } - private void expectTopicCreation(String topic) { if (enableTopicCreation) { when(admin.describeTopics(topic)).thenReturn(Collections.emptyMap()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 14e29cded9ca6..65262983d9f8b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -2149,7 +2149,7 @@ public void testAlterOffsetsSourceConnector(boolean enableTopicCreation) throws @ParameterizedTest @ValueSource(booleans = {true, false}) @SuppressWarnings("unchecked") - public void testAlterOffsetsSourceConnectorError(boolean enableTopicCreation) throws Exception { + public void testAlterOffsetsSourceConnectorError(boolean enableTopicCreation) { setup(enableTopicCreation); mockKafkaClusterId(); mockInternalConverters(); @@ -2188,7 +2188,7 @@ public void testAlterOffsetsSourceConnectorError(boolean enableTopicCreation) th @ParameterizedTest @ValueSource(booleans = {true, false}) - public void testNormalizeSourceConnectorOffsets(boolean enableTopicCreation) throws Exception { + public void testNormalizeSourceConnectorOffsets(boolean enableTopicCreation) { setup(enableTopicCreation); Map, Map> offsets = Collections.singletonMap( Collections.singletonMap("filename", "/path/to/filename"), @@ -2334,7 +2334,7 @@ private void alterOffsetsSinkConnector(Map, Map> parti @ParameterizedTest @ValueSource(booleans = {true, false}) - public void testAlterOffsetsSinkConnectorAlterOffsetsError(boolean enableTopicCreation) throws Exception { + public void testAlterOffsetsSinkConnectorAlterOffsetsError(boolean enableTopicCreation) { setup(enableTopicCreation); mockKafkaClusterId(); String connectorClass = SampleSinkConnector.class.getName(); @@ -2375,7 +2375,7 @@ public void testAlterOffsetsSinkConnectorAlterOffsetsError(boolean enableTopicCr @ParameterizedTest @ValueSource(booleans = {true, false}) - public void testAlterOffsetsSinkConnectorDeleteOffsetsError(boolean enableTopicCreation) throws Exception { + public void testAlterOffsetsSinkConnectorDeleteOffsetsError(boolean enableTopicCreation) { setup(enableTopicCreation); mockKafkaClusterId(); String connectorClass = SampleSinkConnector.class.getName(); @@ -2426,7 +2426,7 @@ public void testAlterOffsetsSinkConnectorDeleteOffsetsError(boolean enableTopicC @ParameterizedTest @ValueSource(booleans = {true, false}) - public void testAlterOffsetsSinkConnectorSynchronousError(boolean enableTopicCreation) throws Exception { + public void testAlterOffsetsSinkConnectorSynchronousError(boolean enableTopicCreation) { setup(enableTopicCreation); mockKafkaClusterId(); String connectorClass = SampleSinkConnector.class.getName(); @@ -2557,7 +2557,7 @@ public void testResetOffsetsSinkConnector(boolean enableTopicCreation) throws Ex @ParameterizedTest @ValueSource(booleans = {true, false}) - public void testResetOffsetsSinkConnectorDeleteConsumerGroupError(boolean enableTopicCreation) throws Exception { + public void testResetOffsetsSinkConnectorDeleteConsumerGroupError(boolean enableTopicCreation) { setup(enableTopicCreation); mockKafkaClusterId(); String connectorClass = SampleSinkConnector.class.getName(); @@ -2594,7 +2594,7 @@ public void testResetOffsetsSinkConnectorDeleteConsumerGroupError(boolean enable @ParameterizedTest @ValueSource(booleans = {true, false}) @SuppressWarnings("unchecked") - public void testModifySourceConnectorOffsetsTimeout(boolean enableTopicCreation) throws Exception { + public void testModifySourceConnectorOffsetsTimeout(boolean enableTopicCreation) { setup(enableTopicCreation); mockKafkaClusterId(); Time time = new MockTime(); @@ -2630,7 +2630,7 @@ public void testModifySourceConnectorOffsetsTimeout(boolean enableTopicCreation) @ParameterizedTest @ValueSource(booleans = {true, false}) - public void testModifyOffsetsSinkConnectorTimeout(boolean enableTopicCreation) throws Exception { + public void testModifyOffsetsSinkConnectorTimeout(boolean enableTopicCreation) { setup(enableTopicCreation); mockKafkaClusterId(); String connectorClass = SampleSinkConnector.class.getName(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java index 462d02f3e6d8e..06b0e3fb55cc4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java @@ -17,7 +17,6 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment; -import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState; import org.apache.kafka.connect.storage.AppliedConnectorConfig; import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -30,35 +29,11 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; public class WorkerTestUtils { - public static WorkerLoad emptyWorkerLoad(String worker) { - return new WorkerLoad.Builder(worker).build(); - } - - public WorkerLoad workerLoad(String worker, int connectorStart, int connectorNum, - int taskStart, int taskNum) { - return new WorkerLoad.Builder(worker).with( - newConnectors(connectorStart, connectorStart + connectorNum), - newTasks(taskStart, taskStart + taskNum)).build(); - } - - public static List newConnectors(int start, int end) { - return IntStream.range(start, end) - .mapToObj(i -> "connector" + i) - .collect(Collectors.toList()); - } - - public static List newTasks(int start, int end) { - return IntStream.range(start, end) - .mapToObj(i -> new ConnectorTaskId("task", i)) - .collect(Collectors.toList()); - } - public static ClusterConfigState clusterConfigState(long offset, int connectorNum, int taskNum) { @@ -82,24 +57,6 @@ public static ClusterConfigState clusterConfigState(long offset, Collections.emptySet()); } - public static Map memberConfigs(String givenLeader, - long givenOffset, - Map givenAssignments) { - return givenAssignments.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, e.getValue()))); - } - - public static Map memberConfigs(String givenLeader, - long givenOffset, - int start, - int connectorNum) { - return IntStream.range(start, connectorNum + 1) - .mapToObj(i -> new SimpleEntry<>("worker" + i, new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, null))) - .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)); - } - public static Map connectorTaskCounts(int start, int connectorNum, int taskCounts) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java index f76f0585f1aff..e58444ccd4d77 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java @@ -75,8 +75,6 @@ public void testListenersConfigAllowedValues() { props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812"); config = RestServerConfig.forPublic(null, props); assertEquals(Arrays.asList("http://a.b:9999", "https://a.b:7812"), config.listeners()); - - config = RestServerConfig.forPublic(null, props); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index 392d082d3305e..700284a9c66ee 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -972,9 +972,4 @@ private Stubber expectAndCallbackNotLeaderException(final ArgumentCaptor { - T run() throws Throwable; - } - } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index d0968db29ced2..4173d9a357c45 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -164,7 +164,6 @@ public class KafkaConfigBackingStoreTest { new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)), new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2)) ); - private static final Struct TARGET_STATE_STARTED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED"); private static final Struct TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1) .put("state", "PAUSED") .put("state.v2", "PAUSED"); @@ -1658,12 +1657,6 @@ private void expectRead(LinkedHashMap serializedValues, } } - private void expectRead(final String key, final byte[] serializedValue, Struct deserializedValue) { - LinkedHashMap serializedData = new LinkedHashMap<>(); - serializedData.put(key, serializedValue); - expectRead(serializedData, Collections.singletonMap(key, deserializedValue)); - } - // This map needs to maintain ordering private Answer> expectReadToEnd(final Map serializedConfigs) { return invocation -> { From b8dadb741c1a6f6c44d8e058ff5863f3e29528d5 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 10 Dec 2024 12:34:09 -0500 Subject: [PATCH 07/13] MINOR Add PR triage workflow (#17881) Automatically adds a "triage" label to PRs from the community. After 7 days, if no review has been made and the "triage" label is still present, a "needs-attention" label is added. Reviewers: Chia-Ping Tsai , Mickael Maison --- .github/workflows/README.md | 48 ++++++++++++++++++++ .github/workflows/pr-reviewed-trigger.yml | 42 ++++++++++++++++++ .github/workflows/pr-reviewed.yml | 53 +++++++++++++++++++++++ .github/workflows/pr-update.yml | 25 ++++++++++- .github/workflows/stale.yml | 16 +++++++ 5 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/pr-reviewed-trigger.yml create mode 100644 .github/workflows/pr-reviewed.yml diff --git a/.github/workflows/README.md b/.github/workflows/README.md index f921ad78393ca..26f22cb27414d 100644 --- a/.github/workflows/README.md +++ b/.github/workflows/README.md @@ -51,6 +51,54 @@ using this for very simple tasks such as applying labels or adding comments to P _We must never run the untrusted PR code in the elevated `pull_request_target` context_ +## Our Workflows + +### Trunk Build + +The [ci.yml](ci.yml) is run when commits are pushed to trunk. This calls into [build.yml](build.yml) +to run our main build. In the trunk build, we do not read from the Gradle cache, +but we do write to it. Also, the test catalog is only updated from trunk builds. + +### PR Build + +Similar to trunk, this workflow starts in [ci.yml](ci.yml) and calls into [build.yml](build.yml). +Unlike trunk, the PR builds _will_ utilize the Gradle cache. + +### PR Triage + +In order to get the attention of committers, we have a triage workflow for Pull Requests +opened by non-committers. This workflow consists of three files: + +* [pr-update.yml](pr-update.yml) When a PR is created add the `triage` label if the PR + was opened by a non-committer. +* [pr-reviewed-trigger.yml](pr-reviewed-trigger.yml) Runs when any PR is reviewed. + Used as a trigger for the next workflow +* [pr-reviewed.yml](pr-reviewed.yml) Remove the `triage` label after a PR has been reviewed + +_The pr-update.yml workflow includes pull_request_target!_ + +### CI Approved + +Due to a combination of GitHub security and ASF's policy, we required explicit +approval of workflows on PRs submitted by non-committers (and non-contributors). +To simply this process, we have a `ci-approved` label which automatically approves +these workflows. + +There are two files related to this workflow: + +* [pr-labeled.yml](pr-labeled.yml) approves a pending approval for PRs that have +been labeled with `ci-approved` +* [ci-requested.yml](ci-requested.yml) approves future CI requests automatically +if the PR has the `ci-approved` label + +_The pr-labeled.yml workflow includes pull_request_target!_ + +### Stale PRs + +This one is straightforward. Using the "actions/stale" GitHub Action, we automatically +label and eventually close PRs which have not had activity for some time. See the +[stale.yml](stale.yml) workflow file for specifics. + ## GitHub Actions Quirks ### Composite Actions diff --git a/.github/workflows/pr-reviewed-trigger.yml b/.github/workflows/pr-reviewed-trigger.yml new file mode 100644 index 0000000000000..f089176ff4b23 --- /dev/null +++ b/.github/workflows/pr-reviewed-trigger.yml @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Pull Request Reviewed + +on: + pull_request_review: + types: + - submitted + +jobs: + # This job is a workaround for the fact that pull_request_review lacks necessary permissions to modify PRs. + # Also, there is no pull_request_target analog to pull_request_review. The approach taken here is taken from + # https://securitylab.github.com/resources/github-actions-preventing-pwn-requests/. + pr-review-trigger: + name: Reviewed + runs-on: ubuntu-latest + steps: + - name: Env + run: printenv + env: + GITHUB_CONTEXT: ${{ toJson(github) }} + - name: Capture PR Number + run: + echo ${{ github.event.pull_request.number }} >> pr-number.txt + - name: Archive Event + uses: actions/upload-artifact@v4 + with: + name: pr-number.txt + path: pr-number.txt diff --git a/.github/workflows/pr-reviewed.yml b/.github/workflows/pr-reviewed.yml new file mode 100644 index 0000000000000..2f6cae8a4fe97 --- /dev/null +++ b/.github/workflows/pr-reviewed.yml @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Remove Triage Label + +on: + workflow_run: + workflows: [Pull Request Reviewed] + types: + - completed + +jobs: + # This job runs with elevated permissions and the ability to modify pull requests. The steps taken here + # should be limited to updating labels and adding comments to PRs. This approach is taken from + # https://securitylab.github.com/resources/github-actions-preventing-pwn-requests/. + remove-triage: + if: ${{ github.event.workflow_run.conclusion == 'success' }} + runs-on: ubuntu-latest + steps: + - name: Env + run: printenv + env: + GITHUB_CONTEXT: ${{ toJson(github) }} + - uses: actions/download-artifact@v4 + with: + github-token: ${{ github.token }} + run-id: ${{ github.event.workflow_run.id }} + name: pr-number.txt + - name: Remove label + uses: actions/github-script@v7 + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + script: | + var fs = require('fs'); + var pr_number = Number(fs.readFileSync('./pr-number.txt')); + await github.rest.issues.removeLabel({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: pr_number, + name: 'triage' + }); diff --git a/.github/workflows/pr-update.yml b/.github/workflows/pr-update.yml index 31e0038705499..e1cd7214d6c36 100644 --- a/.github/workflows/pr-update.yml +++ b/.github/workflows/pr-update.yml @@ -25,9 +25,11 @@ on: # * https://securitylab.github.com/resources/github-actions-preventing-pwn-requests/ pull_request_target: types: [opened, reopened, synchronize] + branches: + - trunk jobs: - label_PRs: + add-labeler-labels: name: Labeler permissions: contents: read @@ -45,3 +47,24 @@ jobs: PR_NUM: ${{github.event.number}} run: | ./.github/scripts/label_small.sh + + add-triage-label: + if: github.event.action == 'opened' || github.event.action == 'reopened' + name: Add triage label + runs-on: ubuntu-latest + permissions: + pull-requests: write + steps: + - name: Env + run: printenv + env: + GITHUB_CONTEXT: ${{ toJson(github) }} + # If the PR is from a non-committer, add triage label + - if: | + github.event.pull_request.author_association != 'MEMBER' && + github.event.pull_request.author_association != 'OWNER' + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GH_REPO: ${{ github.repository }} + NUMBER: ${{ github.event.pull_request.number }} + run: gh pr edit "$NUMBER" --add-label triage diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml index 9382d4173e94c..6ceb074f62c10 100644 --- a/.github/workflows/stale.yml +++ b/.github/workflows/stale.yml @@ -35,6 +35,22 @@ permissions: pull-requests: write jobs: + needs-attention: + runs-on: ubuntu-latest + steps: + - uses: actions/stale@v9 + with: + debug-only: ${{ inputs.dryRun || false }} + operations-per-run: ${{ inputs.operationsPerRun || 500 }} + days-before-stale: 7 + days-before-close: -1 + ignore-pr-updates: true + only-pr-labels: 'triage' + stale-pr-label: 'needs-attention' + stale-pr-message: | + A label of 'needs-attention' was automatically added to this PR in order to raise the + attention of the committers. Once this issue has been triaged, the `triage` label + should be removed to prevent this automation from happening again. stale: runs-on: ubuntu-latest steps: From 7a31b9eae8192dc14779d3e64587988b019cfb6e Mon Sep 17 00:00:00 2001 From: ShivsundarR Date: Tue, 10 Dec 2024 12:38:25 -0500 Subject: [PATCH 08/13] Add null check (#18119) Reviewers: Andrew Schofield --- .../consumer/internals/ShareConsumeRequestManager.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index eda01ee3599f9..6aa334d487bde 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -992,12 +992,12 @@ UnsentRequest buildRequest() { ShareAcknowledgeRequest.Builder requestBuilder = sessionHandler.newShareAcknowledgeBuilder(groupId, fetchConfig); isProcessed = false; + Node nodeToSend = metadata.fetch().nodeById(nodeId); if (requestBuilder == null) { handleSessionErrorCode(Errors.SHARE_SESSION_NOT_FOUND); return null; - } else { - Node nodeToSend = metadata.fetch().nodeById(nodeId); + } else if (nodeToSend != null) { nodesWithPendingRequests.add(nodeId); log.trace("Building acknowledgements to send : {}", finalAcknowledgementsToSend); @@ -1019,6 +1019,8 @@ UnsentRequest buildRequest() { }; return unsentRequest.whenComplete(responseHandler); } + + return null; } int getInFlightAcknowledgementsCount(TopicIdPartition tip) { From dadc7cc477041ec6bc2534c19327eb7c1eccde22 Mon Sep 17 00:00:00 2001 From: Lianet Magrans <98415067+lianetm@users.noreply.github.com> Date: Tue, 10 Dec 2024 13:11:32 -0500 Subject: [PATCH 09/13] update test configs (#18123) Reviewers: Andrew Schofield , Kirk True --- .../kafka/clients/consumer/ConsumerPartitionAssignorTest.java | 1 + .../consumer/internals/ConsumerHeartbeatRequestManagerTest.java | 1 - .../consumer/internals/ShareHeartbeatRequestManagerTest.java | 1 - 3 files changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java index ab33c8f45d7fd..b4f649de579ae 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java @@ -150,6 +150,7 @@ private ConsumerConfig initConsumerConfigWithClassTypes(List classTypes) props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classTypes); + props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name()); return new ConsumerConfig(props); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java index 8fc23f6255acf..9bc5d66d9c3a0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java @@ -1068,7 +1068,6 @@ private ConsumerConfig config() { prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS)); prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(DEFAULT_RETRY_BACKOFF_MS)); prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, String.valueOf(DEFAULT_RETRY_BACKOFF_MAX_MS)); - prop.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(DEFAULT_HEARTBEAT_INTERVAL_MS)); return new ConsumerConfig(prop); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java index 549720766858d..f9e46571795bc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java @@ -655,7 +655,6 @@ private ConsumerConfig config() { prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS)); prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(DEFAULT_RETRY_BACKOFF_MS)); prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, String.valueOf(DEFAULT_RETRY_BACKOFF_MAX_MS)); - prop.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(DEFAULT_HEARTBEAT_INTERVAL_MS)); return new ConsumerConfig(prop); } From d9a71e083bcb20adee5469e8fc0e16ba39d95f82 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 10 Dec 2024 15:33:37 -0500 Subject: [PATCH 10/13] MINOR add note about GitHub org visibility (#18128) Reviewers: Mickael Maison --- .github/workflows/README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/.github/workflows/README.md b/.github/workflows/README.md index 26f22cb27414d..24116f32dd630 100644 --- a/.github/workflows/README.md +++ b/.github/workflows/README.md @@ -77,6 +77,18 @@ opened by non-committers. This workflow consists of three files: _The pr-update.yml workflow includes pull_request_target!_ +For committers to avoid having this label added, their membership in the ASF GitHub +organization must be public. Here are the steps to take: + +* Navigate to the ASF organization's "People" page https://github.com/orgs/apache/people +* Find yourself +* Change "Organization Visibility" to Public + +Full documentation for this process can be found in GitHub's docs: https://docs.github.com/en/account-and-profile/setting-up-and-managing-your-personal-account-on-github/managing-your-membership-in-organizations/publicizing-or-hiding-organization-membership + +If you are a committer and do not want your membership in the ASF org listed as public, +you will need to remove the `triage` label manually. + ### CI Approved Due to a combination of GitHub security and ASF's policy, we required explicit From 3cf8745243026b83d97fbc80f2b1fe03f1455701 Mon Sep 17 00:00:00 2001 From: Justine Olshan Date: Tue, 10 Dec 2024 12:59:01 -0800 Subject: [PATCH 11/13] MINOR: Add clientTransactionVersion to AddPartitionsToTxn requests and persist the value across transitions (#18086) We can better keep track of which transactions use TV_2 by storing this information in the clientTransactionVersion field and persisting it across state transitions. Also updated some logging and equality code to include this information. Added a test to ensure version persists. There aren't many TV2 transitions that don't specify TV, but I did test the InitProducerId + epoch overflow case. Reviewers: Artem Livshits , Jeff Kim --- .../transaction/TransactionCoordinator.scala | 3 ++- .../transaction/TransactionMetadata.scala | 27 ++++++++++--------- .../main/scala/kafka/server/KafkaApis.scala | 7 +++-- ...ransactionCoordinatorConcurrencyTest.scala | 1 + .../TransactionCoordinatorTest.scala | 18 ++++++------- .../transaction/TransactionMetadataTest.scala | 4 +-- .../TransactionStateManagerTest.scala | 26 +++++++++--------- .../unit/kafka/server/KafkaApisTest.scala | 3 +++ .../server/common/TransactionVersion.java | 13 +++++++++ 9 files changed, 61 insertions(+), 41 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 7bc3c03391c7d..67d3d3d3624a9 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -391,6 +391,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig, producerEpoch: Short, partitions: collection.Set[TopicPartition], responseCallback: AddPartitionsCallback, + clientTransactionVersion: TransactionVersion, requestLocal: RequestLocal = RequestLocal.noCaching): Unit = { if (transactionalId == null || transactionalId.isEmpty) { debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request") @@ -420,7 +421,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig, // this is an optimization: if the partitions are already in the metadata reply OK immediately Left(Errors.NONE) } else { - Right(coordinatorEpoch, txnMetadata.prepareAddPartitions(partitions.toSet, time.milliseconds())) + Right(coordinatorEpoch, txnMetadata.prepareAddPartitions(partitions.toSet, time.milliseconds(), clientTransactionVersion)) } } } diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala index dc52ea134020a..31daebac76391 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala @@ -255,7 +255,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String, def prepareNoTransit(): TxnTransitMetadata = { // do not call transitTo as it will set the pending state, a follow-up call to abort the transaction will set its pending state TxnTransitMetadata(producerId, previousProducerId, nextProducerId, producerEpoch, lastProducerEpoch, txnTimeoutMs, state, topicPartitions.toSet, - txnStartTimestamp, txnLastUpdateTimestamp, TransactionVersion.TV_0) + txnStartTimestamp, txnLastUpdateTimestamp, clientTransactionVersion) } def prepareFenceProducerEpoch(): TxnTransitMetadata = { @@ -267,7 +267,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String, val bumpedEpoch = if (hasFailedEpochFence) producerEpoch else (producerEpoch + 1).toShort prepareTransitionTo(PrepareEpochFence, producerId, bumpedEpoch, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, - topicPartitions.toSet, txnStartTimestamp, txnLastUpdateTimestamp) + topicPartitions.toSet, txnStartTimestamp, txnLastUpdateTimestamp, clientTransactionVersion) } def prepareIncrementProducerEpoch(newTxnTimeoutMs: Int, @@ -306,7 +306,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String, epochBumpResult match { case Right((nextEpoch, lastEpoch)) => Right(prepareTransitionTo(Empty, producerId, nextEpoch, lastEpoch, newTxnTimeoutMs, - immutable.Set.empty[TopicPartition], -1, updateTimestamp)) + immutable.Set.empty[TopicPartition], -1, updateTimestamp, clientTransactionVersion)) case Left(err) => Left(err) } @@ -320,17 +320,17 @@ private[transaction] class TransactionMetadata(val transactionalId: String, throw new IllegalStateException("Cannot rotate producer ids while a transaction is still pending") prepareTransitionTo(Empty, newProducerId, 0, if (recordLastEpoch) producerEpoch else RecordBatch.NO_PRODUCER_EPOCH, - newTxnTimeoutMs, immutable.Set.empty[TopicPartition], -1, updateTimestamp) + newTxnTimeoutMs, immutable.Set.empty[TopicPartition], -1, updateTimestamp, clientTransactionVersion) } - def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long): TxnTransitMetadata = { + def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long, clientTransactionVersion: TransactionVersion): TxnTransitMetadata = { val newTxnStartTimestamp = state match { case Empty | CompleteAbort | CompleteCommit => updateTimestamp case _ => txnStartTimestamp } prepareTransitionTo(Ongoing, producerId, producerEpoch, lastProducerEpoch, txnTimeoutMs, - (topicPartitions ++ addedTopicPartitions).toSet, newTxnStartTimestamp, updateTimestamp) + (topicPartitions ++ addedTopicPartitions).toSet, newTxnStartTimestamp, updateTimestamp, clientTransactionVersion) } def prepareAbortOrCommit(newState: TransactionState, clientTransactionVersion: TransactionVersion, nextProducerId: Long, updateTimestamp: Long, noPartitionAdded: Boolean): TxnTransitMetadata = { @@ -371,7 +371,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String, def prepareDead(): TxnTransitMetadata = { prepareTransitionTo(Dead, producerId, producerEpoch, lastProducerEpoch, txnTimeoutMs, Set.empty[TopicPartition], - txnStartTimestamp, txnLastUpdateTimestamp) + txnStartTimestamp, txnLastUpdateTimestamp, clientTransactionVersion) } /** @@ -394,8 +394,9 @@ private[transaction] class TransactionMetadata(val transactionalId: String, updatedTxnTimeoutMs: Int, updatedTopicPartitions: immutable.Set[TopicPartition], updatedTxnStartTimestamp: Long, - updateTimestamp: Long): TxnTransitMetadata = { - prepareTransitionTo(updatedState, updatedProducerId, RecordBatch.NO_PRODUCER_ID, updatedEpoch, updatedLastEpoch, updatedTxnTimeoutMs, updatedTopicPartitions, updatedTxnStartTimestamp, updateTimestamp, TransactionVersion.TV_0) + updateTimestamp: Long, + clientTransactionVersion: TransactionVersion): TxnTransitMetadata = { + prepareTransitionTo(updatedState, updatedProducerId, RecordBatch.NO_PRODUCER_ID, updatedEpoch, updatedLastEpoch, updatedTxnTimeoutMs, updatedTopicPartitions, updatedTxnStartTimestamp, updateTimestamp, clientTransactionVersion) } private def prepareTransitionTo(updatedState: TransactionState, @@ -613,7 +614,8 @@ private[transaction] class TransactionMetadata(val transactionalId: String, s"pendingState=$pendingState, " + s"topicPartitions=$topicPartitions, " + s"txnStartTimestamp=$txnStartTimestamp, " + - s"txnLastUpdateTimestamp=$txnLastUpdateTimestamp)" + s"txnLastUpdateTimestamp=$txnLastUpdateTimestamp, " + + s"clientTransactionVersion=$clientTransactionVersion)" } override def equals(that: Any): Boolean = that match { @@ -626,13 +628,14 @@ private[transaction] class TransactionMetadata(val transactionalId: String, state.equals(other.state) && topicPartitions.equals(other.topicPartitions) && txnStartTimestamp == other.txnStartTimestamp && - txnLastUpdateTimestamp == other.txnLastUpdateTimestamp + txnLastUpdateTimestamp == other.txnLastUpdateTimestamp && + clientTransactionVersion == other.clientTransactionVersion case _ => false } override def hashCode(): Int = { val fields = Seq(transactionalId, producerId, producerEpoch, txnTimeoutMs, state, topicPartitions, - txnStartTimestamp, txnLastUpdateTimestamp) + txnStartTimestamp, txnLastUpdateTimestamp, clientTransactionVersion) fields.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 7a5ac37f0a3e4..ba5eef40e5cec 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2328,14 +2328,11 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendResponseMaybeThrottle(request, createResponse) } - // If the request is greater than version 4, we know the client supports transaction version 2. - val clientTransactionVersion = if (endTxnRequest.version() > 4) TransactionVersion.TV_2 else TransactionVersion.TV_0 - txnCoordinator.handleEndTransaction(endTxnRequest.data.transactionalId, endTxnRequest.data.producerId, endTxnRequest.data.producerEpoch, endTxnRequest.result(), - clientTransactionVersion, + TransactionVersion.transactionVersionForEndTxn(endTxnRequest), sendResponseCallback, requestLocal) } else @@ -2614,6 +2611,7 @@ class KafkaApis(val requestChannel: RequestChannel, transaction.producerEpoch, authorizedPartitions, sendResponseCallback, + TransactionVersion.transactionVersionForAddPartitionsToTxn(addPartitionsToTxnRequest), requestLocal) } else { txnCoordinator.handleVerifyPartitionsInTransaction(transactionalId, @@ -2673,6 +2671,7 @@ class KafkaApis(val requestChannel: RequestChannel, addOffsetsToTxnRequest.data.producerEpoch, Set(offsetTopicPartition), sendResponseCallback, + TransactionVersion.TV_0, // This request will always come from the client not using TV 2. requestLocal) } } diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala index f446eb2bfb2ef..28019efc0c6a4 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala @@ -547,6 +547,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren txnMetadata.producerEpoch, partitions, resultCallback, + TransactionVersion.TV_2, RequestLocal.withThreadConfinedCaching) replicaManager.tryCompleteActions() } diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala index f3302b1293531..ab5ff72cd982b 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala @@ -209,19 +209,19 @@ class TransactionCoordinatorTest { when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId))) .thenReturn(Right(None)) - coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback) + coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback, TV_0) assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error) } @Test def shouldRespondWithInvalidRequestAddPartitionsToTransactionWhenTransactionalIdIsEmpty(): Unit = { - coordinator.handleAddPartitionsToTransaction("", 0L, 1, partitions, errorsCallback) + coordinator.handleAddPartitionsToTransaction("", 0L, 1, partitions, errorsCallback, TV_0) assertEquals(Errors.INVALID_REQUEST, error) } @Test def shouldRespondWithInvalidRequestAddPartitionsToTransactionWhenTransactionalIdIsNull(): Unit = { - coordinator.handleAddPartitionsToTransaction(null, 0L, 1, partitions, errorsCallback) + coordinator.handleAddPartitionsToTransaction(null, 0L, 1, partitions, errorsCallback, TV_0) assertEquals(Errors.INVALID_REQUEST, error) } @@ -230,7 +230,7 @@ class TransactionCoordinatorTest { when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId))) .thenReturn(Left(Errors.NOT_COORDINATOR)) - coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback) + coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback, TV_0) assertEquals(Errors.NOT_COORDINATOR, error) } @@ -239,7 +239,7 @@ class TransactionCoordinatorTest { when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId))) .thenReturn(Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)) - coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback) + coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback, TV_0) assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, error) } @@ -313,7 +313,7 @@ class TransactionCoordinatorTest { new TransactionMetadata(transactionalId, 0, 0, RecordBatch.NO_PRODUCER_ID, 0, RecordBatch.NO_PRODUCER_EPOCH, 0, state, mutable.Set.empty, 0, 0, TV_2))))) - coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback) + coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback, TV_2) assertEquals(Errors.CONCURRENT_TRANSACTIONS, error) } @@ -325,7 +325,7 @@ class TransactionCoordinatorTest { new TransactionMetadata(transactionalId, 0, 0, RecordBatch.NO_PRODUCER_ID, 10, 9, 0, PrepareCommit, mutable.Set.empty, 0, 0, TV_2))))) - coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback) + coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback, TV_2) assertEquals(Errors.PRODUCER_FENCED, error) } @@ -359,7 +359,7 @@ class TransactionCoordinatorTest { when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId))) .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata)))) - coordinator.handleAddPartitionsToTransaction(transactionalId, producerId, producerEpoch, partitions, errorsCallback) + coordinator.handleAddPartitionsToTransaction(transactionalId, producerId, producerEpoch, partitions, errorsCallback, clientTransactionVersion) verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId)) verify(transactionManager).appendTransactionToLog( @@ -379,7 +379,7 @@ class TransactionCoordinatorTest { new TransactionMetadata(transactionalId, 0, 0, RecordBatch.NO_PRODUCER_ID, 0, RecordBatch.NO_PRODUCER_EPOCH, 0, Empty, partitions, 0, 0, TV_0))))) - coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback) + coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback, TV_0) assertEquals(Errors.NONE, error) verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId)) } diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala index 2da9c96fa2077..6b2d20e69eb65 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala @@ -253,7 +253,7 @@ class TransactionMetadataTest { clientTransactionVersion = TV_0) // let new time be smaller; when transiting from Empty the start time would be updated to the update-time - var transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0)), time.milliseconds() - 1) + var transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0)), time.milliseconds() - 1, TV_0) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0)), txnMetadata.topicPartitions) assertEquals(producerId, txnMetadata.producerId) @@ -263,7 +263,7 @@ class TransactionMetadataTest { assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp) // add another partition, check that in Ongoing state the start timestamp would not change to update time - transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds() - 2) + transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds() - 2, TV_0) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)), txnMetadata.topicPartitions) assertEquals(producerId, txnMetadata.producerId) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 78da50f782bc8..d12df190c8bdf 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -389,7 +389,7 @@ class TransactionStateManagerTest { // update the metadata to ongoing with two partitions val newMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0), - new TopicPartition("topic1", 1)), time.milliseconds()) + new TopicPartition("topic1", 1)), time.milliseconds(), TV_0) // append the new metadata into log transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch, newMetadata, assertCallback, requestLocal = RequestLocal.withThreadConfinedCaching) @@ -404,7 +404,7 @@ class TransactionStateManagerTest { transactionManager.putTransactionStateIfNotExists(txnMetadata1) expectedError = Errors.COORDINATOR_NOT_AVAILABLE - var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) + var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0) prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION) val requestLocal = RequestLocal.withThreadConfinedCaching @@ -412,19 +412,19 @@ class TransactionStateManagerTest { assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) - failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) + failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0) prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) - failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) + failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0) prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) - failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) + failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0) prepareForTxnMessageAppend(Errors.REQUEST_TIMED_OUT) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) @@ -437,7 +437,7 @@ class TransactionStateManagerTest { transactionManager.putTransactionStateIfNotExists(txnMetadata1) expectedError = Errors.NOT_COORDINATOR - var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) + var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0) prepareForTxnMessageAppend(Errors.NOT_LEADER_OR_FOLLOWER) val requestLocal = RequestLocal.withThreadConfinedCaching @@ -445,7 +445,7 @@ class TransactionStateManagerTest { assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) - failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) + failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0) prepareForTxnMessageAppend(Errors.NONE) transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal) @@ -468,7 +468,7 @@ class TransactionStateManagerTest { transactionManager.putTransactionStateIfNotExists(txnMetadata1) expectedError = Errors.COORDINATOR_LOAD_IN_PROGRESS - val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) + val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0) prepareForTxnMessageAppend(Errors.NONE) transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch) @@ -482,7 +482,7 @@ class TransactionStateManagerTest { transactionManager.putTransactionStateIfNotExists(txnMetadata1) expectedError = Errors.UNKNOWN_SERVER_ERROR - var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) + var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0) prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE) val requestLocal = RequestLocal.withThreadConfinedCaching @@ -490,7 +490,7 @@ class TransactionStateManagerTest { assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) - failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) + failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0) prepareForTxnMessageAppend(Errors.RECORD_LIST_TOO_LARGE) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) @@ -503,7 +503,7 @@ class TransactionStateManagerTest { transactionManager.putTransactionStateIfNotExists(txnMetadata1) expectedError = Errors.COORDINATOR_NOT_AVAILABLE - val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) + val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds(), TV_0) prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, _ => true, RequestLocal.withThreadConfinedCaching) @@ -522,7 +522,7 @@ class TransactionStateManagerTest { expectedError = Errors.NOT_COORDINATOR val newMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0), - new TopicPartition("topic1", 1)), time.milliseconds()) + new TopicPartition("topic1", 1)), time.milliseconds(), TV_0) // modify the cache while trying to append the new metadata txnMetadata1.producerEpoch = (txnMetadata1.producerEpoch + 1).toShort @@ -541,7 +541,7 @@ class TransactionStateManagerTest { expectedError = Errors.INVALID_PRODUCER_EPOCH val newMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0), - new TopicPartition("topic1", 1)), time.milliseconds()) + new TopicPartition("topic1", 1)), time.milliseconds(), TV_0) // modify the cache while trying to append the new metadata txnMetadata1.pendingState = None diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index a009b2f10a242..a36598a5eebca 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -2301,6 +2301,7 @@ class KafkaApisTest extends Logging { ArgumentMatchers.eq(epoch), ArgumentMatchers.eq(Set(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition))), responseCallback.capture(), + ArgumentMatchers.eq(TransactionVersion.TV_0), ArgumentMatchers.eq(requestLocal) )).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED)) val kafkaApis = createKafkaApis() @@ -2359,6 +2360,7 @@ class KafkaApisTest extends Logging { ArgumentMatchers.eq(epoch), ArgumentMatchers.eq(Set(topicPartition)), responseCallback.capture(), + ArgumentMatchers.eq(TransactionVersion.TV_0), ArgumentMatchers.eq(requestLocal) )).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED)) val kafkaApis = createKafkaApis() @@ -2434,6 +2436,7 @@ class KafkaApisTest extends Logging { ArgumentMatchers.eq(epoch), ArgumentMatchers.eq(Set(tp0)), responseCallback.capture(), + any[TransactionVersion], ArgumentMatchers.eq(requestLocal) )).thenAnswer(_ => responseCallback.getValue.apply(Errors.NONE)) diff --git a/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java index 069440d35c9b9..45546c447b036 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.server.common; +import org.apache.kafka.common.requests.AddPartitionsToTxnRequest; +import org.apache.kafka.common.requests.EndTxnRequest; + import java.util.Collections; import java.util.Map; @@ -55,6 +58,16 @@ public static TransactionVersion fromFeatureLevel(short version) { return (TransactionVersion) Feature.TRANSACTION_VERSION.fromFeatureLevel(version, true); } + public static TransactionVersion transactionVersionForAddPartitionsToTxn(AddPartitionsToTxnRequest request) { + // If the request is greater than version 3, we know the client supports transaction version 2. + return request.version() > 3 ? TV_2 : TV_0; + } + + public static TransactionVersion transactionVersionForEndTxn(EndTxnRequest request) { + // If the request is greater than version 4, we know the client supports transaction version 2. + return request.version() > 4 ? TV_2 : TV_0; + } + @Override public String featureName() { return FEATURE_NAME; From 57737a357fad495964c1aefc593b67d107098d38 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 11 Dec 2024 08:17:32 +0100 Subject: [PATCH 12/13] KAFKA-18188; Admin LeaveGroup should allow removing member using consumer protocol by member id (#18116) The LeaveGroup API is used by the admin client to remove static members or remove all members from the group. The latter does not work because the API does not allow removing a member using the CONSUMER protocol by member id. Moreover, the response should only include the member id if the member id was included in the request. This patch fixes both issues. Reviewers: Dongnuo Lyu , Christo Lolov , Jeff Kim --- .../kafka/server/LeaveGroupRequestTest.scala | 49 +++++++++++++----- .../group/GroupMetadataManager.java | 42 +++++++-------- .../group/GroupMetadataManagerTest.java | 51 +++++++++++-------- 3 files changed, 83 insertions(+), 59 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala index 84e609fcd9202..4cc3f968d2769 100644 --- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala @@ -49,29 +49,50 @@ class LeaveGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBa numPartitions = 3 ) + def instanceId(memberId: String): String = "instance_" + memberId + val memberIds = Range(0, 3).map { __ => + Uuid.randomUuid().toString + } + for (version <- 3 to ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)) { - val memberId = Uuid.randomUuid().toString - assertEquals(Errors.NONE.code, consumerGroupHeartbeat( - groupId = "group", - memberId = memberId, - memberEpoch = 0, - instanceId = "instance-id", - rebalanceTimeoutMs = 5 * 60 * 1000, - subscribedTopicNames = List("foo"), - topicPartitions = List.empty, - ).errorCode) + // Join with all the members. + memberIds.foreach { memberId => + assertEquals(Errors.NONE.code, consumerGroupHeartbeat( + groupId = "group", + memberId = memberId, + memberEpoch = 0, + instanceId = instanceId(memberId), + rebalanceTimeoutMs = 5 * 60 * 1000, + subscribedTopicNames = List("foo"), + topicPartitions = List.empty, + ).errorCode) + } assertEquals( new LeaveGroupResponseData() .setMembers(List( new LeaveGroupResponseData.MemberResponse() - .setMemberId(memberId) - .setGroupInstanceId("instance-id") + .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID) + .setGroupInstanceId(instanceId(memberIds(0))), + new LeaveGroupResponseData.MemberResponse() + .setMemberId(memberIds(1)) + .setGroupInstanceId(instanceId(memberIds(1))), + new LeaveGroupResponseData.MemberResponse() + .setMemberId(memberIds(2)) + .setGroupInstanceId(null) ).asJava), classicLeaveGroup( groupId = "group", - memberIds = List(JoinGroupRequest.UNKNOWN_MEMBER_ID), - groupInstanceIds = List("instance-id"), + memberIds = List( + JoinGroupRequest.UNKNOWN_MEMBER_ID, + memberIds(1), + memberIds(2) + ), + groupInstanceIds = List( + instanceId(memberIds(0)), + instanceId(memberIds(1)), + null + ), version = version.toShort ) ) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index dd0c6954088b4..6b9a5e7cbd5aa 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -6020,7 +6020,7 @@ public CoordinatorResult classicGroup } if (group.type() == CLASSIC) { - return classicGroupLeaveToClassicGroup((ClassicGroup) group, context, request); + return classicGroupLeaveToClassicGroup((ClassicGroup) group, request); } else if (group.type() == CONSUMER) { return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, request); } else { @@ -6046,48 +6046,46 @@ private CoordinatorResult classicGrou List records = new ArrayList<>(); for (MemberIdentity memberIdentity : request.members()) { - String memberId = memberIdentity.memberId(); - String instanceId = memberIdentity.groupInstanceId(); String reason = memberIdentity.reason() != null ? memberIdentity.reason() : "not provided"; - ConsumerGroupMember member; try { - if (instanceId == null) { - member = group.getOrMaybeCreateMember(memberId, false); - throwIfMemberDoesNotUseClassicProtocol(member); + ConsumerGroupMember member; + + if (memberIdentity.groupInstanceId() == null) { + member = group.getOrMaybeCreateMember(memberIdentity.memberId(), false); log.info("[GroupId {}] Dynamic member {} has left group " + "through explicit `LeaveGroup` request; client reason: {}", - groupId, memberId, reason); + groupId, memberIdentity.memberId(), reason); } else { - member = group.staticMember(instanceId); - throwIfStaticMemberIsUnknown(member, instanceId); + member = group.staticMember(memberIdentity.groupInstanceId()); + throwIfStaticMemberIsUnknown(member, memberIdentity.groupInstanceId()); // The LeaveGroup API allows administrative removal of members by GroupInstanceId // in which case we expect the MemberId to be undefined. - if (!UNKNOWN_MEMBER_ID.equals(memberId)) { - throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); - throwIfMemberDoesNotUseClassicProtocol(member); + if (!UNKNOWN_MEMBER_ID.equals(memberIdentity.memberId())) { + throwIfInstanceIdIsFenced(member, groupId, memberIdentity.memberId(), memberIdentity.groupInstanceId()); } - memberId = member.memberId(); log.info("[GroupId {}] Static member {} with instance id {} has left group " + "through explicit `LeaveGroup` request; client reason: {}", - groupId, memberId, instanceId, reason); + groupId, memberIdentity.memberId(), memberIdentity.groupInstanceId(), reason); } - removeMember(records, groupId, memberId); - cancelTimers(groupId, memberId); + removeMember(records, groupId, member.memberId()); + cancelTimers(groupId, member.memberId()); + memberResponses.add( new MemberResponse() - .setMemberId(memberId) - .setGroupInstanceId(instanceId) + .setMemberId(memberIdentity.memberId()) + .setGroupInstanceId(memberIdentity.groupInstanceId()) ); + validLeaveGroupMembers.add(member); } catch (KafkaException e) { memberResponses.add( new MemberResponse() - .setMemberId(memberId) - .setGroupInstanceId(instanceId) + .setMemberId(memberIdentity.memberId()) + .setGroupInstanceId(memberIdentity.groupInstanceId()) .setErrorCode(Errors.forException(e).code()) ); } @@ -6126,7 +6124,6 @@ private CoordinatorResult classicGrou * Handle a classic LeaveGroupRequest to a ClassicGroup. * * @param group The ClassicGroup. - * @param context The request context. * @param request The actual LeaveGroup request. * * @return The LeaveGroup response and the GroupMetadata record to append if the group @@ -6134,7 +6131,6 @@ private CoordinatorResult classicGrou */ private CoordinatorResult classicGroupLeaveToClassicGroup( ClassicGroup group, - RequestContext context, LeaveGroupRequestData request ) throws UnknownMemberIdException { if (group.isInState(DEAD)) { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 18540253487d1..0535f763b4f5f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -117,8 +117,6 @@ import java.util.stream.IntStream; import java.util.stream.Stream; -import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol; -import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection; import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH; @@ -13861,15 +13859,17 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) context.assertJoinTimeout(groupId, memberId2, member2.rebalanceTimeoutMs()); context.assertSessionTimeout(groupId, memberId2, member2.classicMemberMetadata().get().sessionTimeoutMs()); - // Member 1 and member 2 leave the group. + // Member 1, member 2 and member 3 leave the group. CoordinatorResult leaveResult = context.sendClassicGroupLeave( new LeaveGroupRequestData() .setGroupId("group-id") .setMembers(List.of( // Valid member id. new MemberIdentity() - .setMemberId(memberId1), + .setMemberId(memberId1) + .setGroupInstanceId(null), new MemberIdentity() + .setMemberId(UNKNOWN_MEMBER_ID) .setGroupInstanceId(instanceId2), // Member that doesn't use the classic protocol. new MemberIdentity() @@ -13877,8 +13877,10 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) .setGroupInstanceId(instanceId3), // Unknown member id. new MemberIdentity() - .setMemberId("unknown-member-id"), + .setMemberId("unknown-member-id") + .setGroupInstanceId(null), new MemberIdentity() + .setMemberId(UNKNOWN_MEMBER_ID) .setGroupInstanceId("unknown-instance-id"), // Fenced instance id. new MemberIdentity() @@ -13895,11 +13897,10 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) .setMemberId(memberId1), new LeaveGroupResponseData.MemberResponse() .setGroupInstanceId(instanceId2) - .setMemberId(memberId2), + .setMemberId(UNKNOWN_MEMBER_ID), new LeaveGroupResponseData.MemberResponse() .setGroupInstanceId(instanceId3) - .setMemberId(memberId3) - .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()), + .setMemberId(memberId3), new LeaveGroupResponseData.MemberResponse() .setGroupInstanceId(null) .setMemberId("unknown-member-id") @@ -13908,8 +13909,8 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) .setGroupInstanceId("unknown-instance-id") .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()), new LeaveGroupResponseData.MemberResponse() - .setGroupInstanceId(instanceId3) .setMemberId("unknown-member-id") + .setGroupInstanceId(instanceId3) .setErrorCode(Errors.FENCED_INSTANCE_ID.code()) )), leaveResult.response() @@ -13924,6 +13925,12 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId2), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId2), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId2), + // Remove member 3. + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId3), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId3), + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId3), + // Update subscription metadata. + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Collections.emptyMap()), // Bump the group epoch. GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) ); @@ -14045,7 +14052,7 @@ public void testClassicGroupLeaveToConsumerGroupWithoutValidLeaveGroupMember() { String groupId = "group-id"; String memberId = Uuid.randomUuid().toString(); - // Consumer group without member using the classic protocol. + // Consumer group. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) @@ -14058,9 +14065,7 @@ public void testClassicGroupLeaveToConsumerGroupWithoutValidLeaveGroupMember() { .setGroupId("group-id") .setMembers(List.of( new MemberIdentity() - .setMemberId("unknown-member-id"), - new MemberIdentity() - .setMemberId(memberId) + .setMemberId("unknown-member-id") )) ); @@ -14070,10 +14075,6 @@ public void testClassicGroupLeaveToConsumerGroupWithoutValidLeaveGroupMember() { new LeaveGroupResponseData.MemberResponse() .setGroupInstanceId(null) .setMemberId("unknown-member-id") - .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()), - new LeaveGroupResponseData.MemberResponse() - .setGroupInstanceId(null) - .setMemberId(memberId) .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) )), leaveResult.response() @@ -16047,9 +16048,15 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) new LeaveGroupRequestData() .setGroupId(groupId) .setMembers(List.of( - new MemberIdentity().setGroupInstanceId(memberId1), - new MemberIdentity().setGroupInstanceId(memberId2), - new MemberIdentity().setGroupInstanceId(memberId3) + new MemberIdentity() + .setMemberId(memberId1) + .setGroupInstanceId(null), + new MemberIdentity() + .setMemberId(memberId2) + .setGroupInstanceId(memberId2), + new MemberIdentity() + .setMemberId(UNKNOWN_MEMBER_ID) + .setGroupInstanceId(memberId3) )) ); @@ -16058,12 +16065,12 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) .setMembers(List.of( new LeaveGroupResponseData.MemberResponse() .setMemberId(memberId1) - .setGroupInstanceId(memberId1), + .setGroupInstanceId(null), new LeaveGroupResponseData.MemberResponse() .setMemberId(memberId2) .setGroupInstanceId(memberId2), new LeaveGroupResponseData.MemberResponse() - .setMemberId(memberId3) + .setMemberId(UNKNOWN_MEMBER_ID) .setGroupInstanceId(memberId3) )), result.response() From 7591868aead54fff7d5e8a44c5e06746ed34866b Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Wed, 11 Dec 2024 10:56:47 +0100 Subject: [PATCH 13/13] KAFKA-18179: Move AsyncOffsetReadFutureHolder to storage module (#18095) Reviewers: Christo Lolov --- .../kafka/log/remote/RemoteLogManager.java | 2 +- .../scala/kafka/log/OffsetResultHolder.scala | 12 +------- .../server/ListOffsetsPartitionStatus.scala | 2 +- .../log/remote/RemoteLogOffsetReaderTest.java | 2 +- .../server/DelayedRemoteListOffsetsTest.scala | 2 +- .../log/AsyncOffsetReadFutureHolder.java | 30 +++++++++++++++++++ 6 files changed, 35 insertions(+), 15 deletions(-) create mode 100644 storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 6e95e36020b8d..b2b1ab856c04f 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -17,7 +17,6 @@ package kafka.log.remote; import kafka.cluster.Partition; -import kafka.log.AsyncOffsetReadFutureHolder; import kafka.log.UnifiedLog; import kafka.server.DelayedRemoteListOffsets; @@ -74,6 +73,7 @@ import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.internals.log.AbortedTxn; +import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder; import org.apache.kafka.storage.internals.log.EpochEntry; import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; diff --git a/core/src/main/scala/kafka/log/OffsetResultHolder.scala b/core/src/main/scala/kafka/log/OffsetResultHolder.scala index 64b78c6cee912..89951dbb96f2b 100644 --- a/core/src/main/scala/kafka/log/OffsetResultHolder.scala +++ b/core/src/main/scala/kafka/log/OffsetResultHolder.scala @@ -18,8 +18,7 @@ package kafka.log import org.apache.kafka.common.errors.ApiException import org.apache.kafka.common.record.FileRecords.TimestampAndOffset - -import java.util.concurrent.{CompletableFuture, Future} +import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder case class OffsetResultHolder(timestampAndOffsetOpt: Option[TimestampAndOffset], futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None) { @@ -27,12 +26,3 @@ case class OffsetResultHolder(timestampAndOffsetOpt: Option[TimestampAndOffset], var maybeOffsetsError: Option[ApiException] = None var lastFetchableOffset: Option[Long] = None } - -/** - * A remote log offset read task future holder. It contains two futures: - * 1. JobFuture - Use this future to cancel the running job. - * 2. TaskFuture - Use this future to get the result of the job/computation. - */ -case class AsyncOffsetReadFutureHolder[T](jobFuture: Future[Void], taskFuture: CompletableFuture[T]) { - -} diff --git a/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala b/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala index 702d0a4ccb8ef..d9fb9e6d059db 100644 --- a/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala +++ b/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala @@ -16,10 +16,10 @@ */ package kafka.server -import kafka.log.AsyncOffsetReadFutureHolder import org.apache.kafka.common.errors.ApiException import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse import org.apache.kafka.common.record.FileRecords.TimestampAndOffset +import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder class ListOffsetsPartitionStatus(val futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]], val lastFetchableOffset: Option[Long], diff --git a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java b/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java index ce027f8f91510..1313c8e2898ed 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java @@ -16,7 +16,6 @@ */ package kafka.log.remote; -import kafka.log.AsyncOffsetReadFutureHolder; import kafka.utils.TestUtils; import org.apache.kafka.common.TopicPartition; @@ -28,6 +27,7 @@ import org.apache.kafka.server.util.MockTime; import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder; import org.apache.kafka.storage.internals.log.LogDirFailureChannel; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala index 4061e6aaaf8ff..eaa4589595954 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala @@ -16,7 +16,6 @@ */ package kafka.server -import kafka.log.AsyncOffsetReadFutureHolder import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.NotLeaderOrFollowerException import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse @@ -25,6 +24,7 @@ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.requests.ListOffsetsResponse import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey} import org.apache.kafka.server.util.timer.MockTimer +import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder import org.junit.jupiter.api.{AfterEach, Test} import org.junit.jupiter.api.Assertions.assertEquals import org.mockito.ArgumentMatchers.anyBoolean diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java new file mode 100644 index 0000000000000..990a5ef67ddf0 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + +/** + * A remote log offset read task future holder. It contains two futures: + *
      + *
    1. JobFuture - Use this future to cancel the running job. + *
    2. TaskFuture - Use this future to get the result of the job/computation. + *
    + */ +public record AsyncOffsetReadFutureHolder(Future jobFuture, CompletableFuture taskFuture) { +}