From 286ea8c03e57df08ad55c8d77c86e284cf00ec8d Mon Sep 17 00:00:00 2001
From: Robert Casey
Date: Sat, 18 Feb 2017 14:46:46 -0600
Subject: [PATCH] Created function tryParseOffsetMessage to attempt to parse a
 Kafka offset message retrieved from the internal committed offset topic:

* Handles messages of other types and of questionable correctness.
* Added 100% unit-test coverage for this new function.

Add robustness to the log-end-offset getter thread:

* No longer shutting down the application on error. Instead, closing and
  destroying the client and re-creating it.
* Sleeping on error before re-creating the client and continuing to process.

Deal with thread-safety issues on shared memory between threads that retrieve
data from Kafka.

Respect the command-line arg kafkaOffsetForceFromStart, starting consumer
offset listener clients from the beginning of the log by implementing a
ConsumerRebalanceListener.

Begin to reduce usage of Zookeeper:

* Override getTopics() in KafkaOffsetGetter to retrieve topics directly from
  the Kafka broker.
* Override getClusterViz() in KafkaOffsetGetter to retrieve cluster
  information directly from the Kafka broker.

Stopped polluting consumer groups in Zookeeper by not creating a unique
consumer group name for the consumer-offset and log-end-offset listeners at
each client instantiation.

Improved createNewAdminClient code, simplifying the error paths and properly
calling close on error.

Use constants for all properties in createNewKafkaConsumer().

Fixed some bad grammar in error messages.

Refactored some of the error-handling paths, simplifying them.

Closing all Kafka clients on error so connections do not leak.

Fixed the silly com.twitter util-core dependency in build.sbt.

(See the notes after the patch for small sketches of the thread-safety,
rebalance-listener, and key-framing patterns used here.)
---
 build.sbt                                   |   2 +-
 .../com/quantifind/kafka/OffsetGetter.scala | 441 +++++++------
 .../kafka/core/KafkaOffsetGetter.scala      | 618 +++++++++++-------
 .../kafka/offsetapp/OffsetGetterApp.scala   |   2 +-
 .../kafka/offsetapp/OffsetGetterWeb.scala   |   2 +-
 .../core/KafkaMessageProtocolHelper.scala   | 177 +++++
 .../kafka/core/KafkaOffsetGetterSpec.scala  | 126 +++-
 7 files changed, 879 insertions(+), 489 deletions(-)
 create mode 100644 src/test/scala/com/quantifind/kafka/core/KafkaMessageProtocolHelper.scala

diff --git a/build.sbt b/build.sbt
index 2f2ad8c..325086d 100644
--- a/build.sbt
+++ b/build.sbt
@@ -15,7 +15,7 @@ libraryDependencies ++= Seq(
   "com.quantifind" %% "sumac" % "0.3.0",
   "org.apache.kafka" %% "kafka" % "0.9.0.1",
   "org.reflections" % "reflections" % "0.9.10",
-  "com.twitter" % "util-core_2.11" % "6.40.0",
+  "com.twitter" %% "util-core" % "6.40.0",
   "com.typesafe.slick" %% "slick" % "2.1.0",
   "org.xerial" % "sqlite-jdbc" % "3.7.2",
   "org.mockito" % "mockito-all" % "1.10.19" % "test",
diff --git a/src/main/scala/com/quantifind/kafka/OffsetGetter.scala b/src/main/scala/com/quantifind/kafka/OffsetGetter.scala
index 0b6de78..7ed9244 100644
--- a/src/main/scala/com/quantifind/kafka/OffsetGetter.scala
+++ b/src/main/scala/com/quantifind/kafka/OffsetGetter.scala
@@ -1,17 +1,16 @@
 package com.quantifind.kafka

+import java.util.concurrent.Executors
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.quantifind.kafka.OffsetGetter.{BrokerInfo, KafkaInfo, OffsetInfo}
 import com.quantifind.kafka.core._
 import com.quantifind.kafka.offsetapp.OffsetGetterArgs
-import com.quantifind.kafka.OffsetGetter.{BrokerInfo, KafkaInfo, OffsetInfo}
 import com.quantifind.utils.ZkUtilsWrapper
 import com.twitter.util.Time
-
-import java.util.concurrent.atomic.AtomicBoolean
-
 import kafka.common.BrokerNotAvailableException
 import
kafka.consumer.{ConsumerConnector, SimpleConsumer} import kafka.utils.{Json, Logging, ZkUtils} - import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.security.JaasUtils @@ -22,233 +21,233 @@ import scala.util.control.NonFatal case class Node(name: String, children: Seq[Node] = Seq()) case class TopicDetails(consumers: Seq[ConsumerDetail]) + case class TopicDetailsWrapper(consumers: TopicDetails) case class TopicAndConsumersDetails(active: Seq[KafkaInfo], inactive: Seq[KafkaInfo]) + case class TopicAndConsumersDetailsWrapper(consumers: TopicAndConsumersDetails) case class ConsumerDetail(name: String) trait OffsetGetter extends Logging { - val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map() - - def zkUtils: ZkUtilsWrapper - - // kind of interface methods - def getTopicList(group: String): List[String] - - def getGroups: Seq[String] - - def getTopicMap: Map[String, Seq[String]] - - def getActiveTopicMap: Map[String, Seq[String]] - - def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] - - // get the Kafka simple consumer so that we can fetch broker offsets - protected def getConsumer(bid: Int): Option[SimpleConsumer] = { - try { - zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + bid) match { - case (Some(brokerInfoString), _) => - Json.parseFull(brokerInfoString) match { - case Some(m) => - val brokerInfo = m.asInstanceOf[Map[String, Any]] - val host = brokerInfo.get("host").get.asInstanceOf[String] - val port = brokerInfo.get("port").get.asInstanceOf[Int] - Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerOffsetChecker")) - case None => - throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)) - } - case (None, _) => - throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)) - } - } catch { - case t: Throwable => - error("Could not parse broker info", t) - None - } - } - - protected def processTopic(group: String, topic: String): Seq[OffsetInfo] = { - val pidMap = zkUtils.getPartitionsForTopics(Seq(topic)) - for { - partitions <- pidMap.get(topic).toSeq - pid <- partitions.sorted - info <- processPartition(group, topic, pid) - } yield info - } - - protected def brokerInfo(): Iterable[BrokerInfo] = { - for { - (bid, consumerOpt) <- consumerMap - consumer <- consumerOpt - } yield BrokerInfo(id = bid, host = consumer.host, port = consumer.port) - } - - protected def offsetInfo(group: String, topics: Seq[String] = Seq()): Seq[OffsetInfo] = { - - val topicList = if (topics.isEmpty) { - getTopicList(group) - } else { - topics - } - - topicList.sorted.flatMap(processTopic(group, _)) - } - - - // get information about a consumer group and the topics it consumes - def getInfo(group: String, topics: Seq[String] = Seq()): KafkaInfo = { - val off = offsetInfo(group, topics) - val brok = brokerInfo() - KafkaInfo( - name = group, - brokers = brok.toSeq, - offsets = off - ) - } - - // get list of all topics - def getTopics: Seq[String] = { - try { - zkUtils.getChildren(ZkUtils.BrokerTopicsPath).sortWith(_ < _) - } catch { - case NonFatal(t) => - error(s"could not get topics because of ${t.getMessage}", t) - Seq() - } - } - - def getClusterViz: Node = { - val clusterNodes = zkUtils.getAllBrokersInCluster().map((broker) => { - Node(broker.toString(), Seq()) - }) - Node("KafkaCluster", clusterNodes) - } - - /** - * Returns details for a given topic such as the consumers pulling off of it - */ - def getTopicDetail(topic: String): TopicDetails = { - val topicMap = 
getActiveTopicMap - - if (topicMap.contains(topic)) { - TopicDetails(topicMap(topic).map(consumer => { - ConsumerDetail(consumer.toString) - }).toSeq) - } else { - TopicDetails(Seq(ConsumerDetail("Unable to find Active Consumers"))) - } - } - - def mapConsumerDetails(consumers: Seq[String]): Seq[ConsumerDetail] = - consumers.map(consumer => ConsumerDetail(consumer.toString)) - - /** - * Returns details for a given topic such as the active consumers pulling off of it - * and for each of the active consumers it will return the consumer data - */ - def getTopicAndConsumersDetail(topic: String): TopicAndConsumersDetailsWrapper = { - val topicMap = getTopicMap - val activeTopicMap = getActiveTopicMap - - val activeConsumers = if (activeTopicMap.contains(topic)) { - mapConsumersToKafkaInfo(activeTopicMap(topic), topic) - } else { - Seq() - } - - val inactiveConsumers = if (!activeTopicMap.contains(topic) && topicMap.contains(topic)) { - mapConsumersToKafkaInfo(topicMap(topic), topic) - } else if (activeTopicMap.contains(topic) && topicMap.contains(topic)) { - mapConsumersToKafkaInfo(topicMap(topic).diff(activeTopicMap(topic)), topic) - } else { - Seq() - } - - TopicAndConsumersDetailsWrapper(TopicAndConsumersDetails(activeConsumers, inactiveConsumers)) - } - - def mapConsumersToKafkaInfo(consumers: Seq[String], topic: String): Seq[KafkaInfo] = - consumers.map(getInfo(_, Seq(topic))) - - - def getActiveTopics: Node = { - val topicMap = getActiveTopicMap - - Node("ActiveTopics", topicMap.map { - case (s: String, ss: Seq[String]) => { - Node(s, ss.map(consumer => Node(consumer))) - - } - }.toSeq) - } - - def close() { - // TODO: What is going on here? This code is broken - /* - for (consumerOpt <- consumerMap.values) { - consumerOpt match { - case Some(consumer) => consumer.close() - case None => // ignore - } - } - */ - } + val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map() + + def zkUtils: ZkUtilsWrapper + + // kind of interface methods + def getTopicList(group: String): List[String] + + def getGroups: Seq[String] + + def getTopicMap: Map[String, Seq[String]] + + def getActiveTopicMap: Map[String, Seq[String]] + + def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] + + // get the Kafka simple consumer so that we can fetch broker offsets + protected def getConsumer(bid: Int): Option[SimpleConsumer] = { + try { + zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + bid) match { + case (Some(brokerInfoString), _) => + Json.parseFull(brokerInfoString) match { + case Some(m) => + val brokerInfo = m.asInstanceOf[Map[String, Any]] + val host = brokerInfo.get("host").get.asInstanceOf[String] + val port = brokerInfo.get("port").get.asInstanceOf[Int] + Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerOffsetChecker")) + case None => + throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)) + } + case (None, _) => + throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)) + } + } catch { + case t: Throwable => + error("Could not parse broker info", t) + None + } + } + + protected def processTopic(group: String, topic: String): Seq[OffsetInfo] = { + val pidMap = zkUtils.getPartitionsForTopics(Seq(topic)) + for { + partitions <- pidMap.get(topic).toSeq + pid <- partitions.sorted + info <- processPartition(group, topic, pid) + } yield info + } + + protected def brokerInfo(): Iterable[BrokerInfo] = { + for { + (bid, consumerOpt) <- consumerMap + consumer <- consumerOpt + } yield BrokerInfo(id = bid, 
host = consumer.host, port = consumer.port) + } + + protected def offsetInfo(group: String, topics: Seq[String] = Seq()): Seq[OffsetInfo] = { + + val topicList = if (topics.isEmpty) { + getTopicList(group) + } else { + topics + } + + topicList.sorted.flatMap(processTopic(group, _)) + } + + + // get information about a consumer group and the topics it consumes + def getInfo(group: String, topics: Seq[String] = Seq()): KafkaInfo = { + val off = offsetInfo(group, topics) + val brok = brokerInfo() + KafkaInfo( + name = group, + brokers = brok.toSeq, + offsets = off + ) + } + + // get list of all topics + def getTopics: Seq[String] = { + try { + zkUtils.getChildren(ZkUtils.BrokerTopicsPath).sortWith(_ < _) + } catch { + case NonFatal(t) => + error(s"could not get topics because of ${t.getMessage}", t) + Seq() + } + } + + def getClusterViz: Node = { + val clusterNodes = zkUtils.getAllBrokersInCluster().map((broker) => { + Node(broker.toString(), Seq()) + }) + Node("KafkaCluster", clusterNodes) + } + + /** + * Returns details for a given topic such as the consumers pulling off of it + */ + def getTopicDetail(topic: String): TopicDetails = { + val topicMap = getActiveTopicMap + + if (topicMap.contains(topic)) { + TopicDetails(topicMap(topic).map(consumer => { + ConsumerDetail(consumer.toString) + }).toSeq) + } else { + TopicDetails(Seq(ConsumerDetail("Unable to find Active Consumers"))) + } + } + + def mapConsumerDetails(consumers: Seq[String]): Seq[ConsumerDetail] = + consumers.map(consumer => ConsumerDetail(consumer.toString)) + + /** + * Returns details for a given topic such as the active consumers pulling off of it + * and for each of the active consumers it will return the consumer data + */ + def getTopicAndConsumersDetail(topic: String): TopicAndConsumersDetailsWrapper = { + val topicMap = getTopicMap + val activeTopicMap = getActiveTopicMap + + val activeConsumers = if (activeTopicMap.contains(topic)) { + mapConsumersToKafkaInfo(activeTopicMap(topic), topic) + } else { + Seq() + } + + val inactiveConsumers = if (!activeTopicMap.contains(topic) && topicMap.contains(topic)) { + mapConsumersToKafkaInfo(topicMap(topic), topic) + } else if (activeTopicMap.contains(topic) && topicMap.contains(topic)) { + mapConsumersToKafkaInfo(topicMap(topic).diff(activeTopicMap(topic)), topic) + } else { + Seq() + } + + TopicAndConsumersDetailsWrapper(TopicAndConsumersDetails(activeConsumers, inactiveConsumers)) + } + + def mapConsumersToKafkaInfo(consumers: Seq[String], topic: String): Seq[KafkaInfo] = + consumers.map(getInfo(_, Seq(topic))) + + + def getActiveTopics: Node = { + val topicMap = getActiveTopicMap + + Node("ActiveTopics", topicMap.map { + case (s: String, ss: Seq[String]) => { + Node(s, ss.map(consumer => Node(consumer))) + + } + }.toSeq) + } } object OffsetGetter { - case class KafkaInfo(name: String, brokers: Seq[BrokerInfo], offsets: Seq[OffsetInfo]) - - case class BrokerInfo(id: Int, host: String, port: Int) - - case class OffsetInfo(group: String, - topic: String, - partition: Int, - offset: Long, - logSize: Long, - owner: Option[String], - creation: Time, - modified: Time) { - val lag = logSize - offset - } - - val kafkaOffsetListenerStarted: AtomicBoolean = new AtomicBoolean(false) - var zkUtils: ZkUtilsWrapper = null - var consumerConnector: ConsumerConnector = null - var newKafkaConsumer: KafkaConsumer[String, String] = null - - def createZkUtils(args: OffsetGetterArgs): ZkUtils = { - ZkUtils(args.zk, - args.zkSessionTimeout.toMillis.toInt, - args.zkConnectionTimeout.toMillis.toInt, - 
JaasUtils.isZkSecurityEnabled()) - } - - def getInstance(args: OffsetGetterArgs): OffsetGetter = { - - if (kafkaOffsetListenerStarted.compareAndSet(false, true)) { - zkUtils = new ZkUtilsWrapper(createZkUtils(args)) - - args.offsetStorage.toLowerCase match { - - case "kafka" => - // Start threads which will collect topic/partition/offset/client data on a schedule - KafkaOffsetGetter.startAdminClient(args) - KafkaOffsetGetter.startTopicPartitionOffsetGetter(args) - KafkaOffsetGetter.startCommittedOffsetListener(args) - } - } - - args.offsetStorage.toLowerCase match { - case "kafka" => - new KafkaOffsetGetter(args) - case "storm" => - new StormOffsetGetter(zkUtils, args.stormZKOffsetBase) - case _ => - new ZKOffsetGetter(zkUtils) - } - } + case class KafkaInfo(name: String, brokers: Seq[BrokerInfo], offsets: Seq[OffsetInfo]) + + case class BrokerInfo(id: Int, host: String, port: Int) + + case class OffsetInfo(group: String, + topic: String, + partition: Int, + offset: Long, + logSize: Long, + owner: Option[String], + creation: Time, + modified: Time) { + val lag = logSize - offset + } + + val kafkaOffsetListenerStarted: AtomicBoolean = new AtomicBoolean(false) + var zkUtils: ZkUtilsWrapper = null + var consumerConnector: ConsumerConnector = null + var newKafkaConsumer: KafkaConsumer[String, String] = null + + def createZkUtils(args: OffsetGetterArgs): ZkUtils = { + ZkUtils(args.zk, + args.zkSessionTimeout.toMillis.toInt, + args.zkConnectionTimeout.toMillis.toInt, + JaasUtils.isZkSecurityEnabled()) + } + + def getInstance(args: OffsetGetterArgs): OffsetGetter = { + + if (kafkaOffsetListenerStarted.compareAndSet(false, true)) { + + zkUtils = new ZkUtilsWrapper(createZkUtils(args)) + + if (args.offsetStorage.toLowerCase == "kafka") { + + val adminClientExecutor = Executors.newSingleThreadExecutor() + adminClientExecutor.submit(new Runnable() { + def run() = KafkaOffsetGetter.startAdminClient(args) + }) + + val logEndOffsetExecutor = Executors.newSingleThreadExecutor() + logEndOffsetExecutor.submit(new Runnable() { + def run() = KafkaOffsetGetter.startLogEndOffsetGetter(args) + }) + + val committedOffsetExecutor = Executors.newSingleThreadExecutor() + committedOffsetExecutor.submit(new Runnable() { + def run() = KafkaOffsetGetter.startCommittedOffsetListener(args) + }) + } + } + + args.offsetStorage.toLowerCase match { + case "kafka" => + new KafkaOffsetGetter(zkUtils, args) + case "storm" => + new StormOffsetGetter(zkUtils, args.stormZKOffsetBase) + case _ => + new ZKOffsetGetter(zkUtils) + } + } } \ No newline at end of file diff --git a/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala b/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala index 588940f..6f6a62e 100644 --- a/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala +++ b/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala @@ -1,21 +1,21 @@ package com.quantifind.kafka.core -import com.quantifind.kafka.OffsetGetter +import java.nio.{BufferUnderflowException, ByteBuffer} +import java.util +import java.util.{Arrays, Properties} + import com.quantifind.kafka.OffsetGetter.OffsetInfo import com.quantifind.kafka.offsetapp.OffsetGetterArgs +import com.quantifind.kafka.{Node, OffsetGetter} +import com.quantifind.utils.ZkUtilsWrapper import com.twitter.util.Time - -import java.nio.ByteBuffer -import java.util.{Arrays, Properties} - import kafka.admin.AdminClient -import kafka.common.{OffsetAndMetadata, TopicAndPartition} +import kafka.common.{KafkaException, OffsetAndMetadata, 
TopicAndPartition} import kafka.coordinator._ -import kafka.utils.{Logging, ZkUtils} - +import kafka.utils.Logging import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.{PartitionInfo, TopicPartition} import scala.collection._ import scala.concurrent.ExecutionContext.Implicits.global @@ -25,329 +25,441 @@ import scala.concurrent.{Await, Future, duration} /** * Created by rcasey on 11/16/2016. */ -class KafkaOffsetGetter(args: OffsetGetterArgs) extends OffsetGetter { - - import KafkaOffsetGetter._ - - // TODO: We will get all data from the Kafka broker in this class. This is here simply to satisfy - // the OffsetGetter dependency until it can be refactored. - override val zkUtils = null - - override def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] = { - - val topicAndPartition = TopicAndPartition(topic, pid) - - committedOffsetMap.get(GroupTopicPartition(group, topicAndPartition)) map { offsetMetaData => - - // BIT O'HACK to deal with timing: - // Due to thread and processing timing, it is possible that the value we have for the topicPartition's - // logEndOffset has not yet been updated to match the value on the broker. When this happens, report the - // topicPartition's logEndOffset as "committedOffset - lag", as the topicPartition's logEndOffset is *AT LEAST* - // this value - val logEndOffset: Option[Long] = topicPartitionOffsetsMap.get(topicAndPartition) - val committedOffset: Long = offsetMetaData.offset - val lag: Long = logEndOffset.get - committedOffset - val logEndOffsetReported: Long = if (lag < 0) committedOffset - lag else logEndOffset.get - - // Get client information if we can find an associated client - var clientString: Option[String] = Option("NA") - val filteredClients = clients.filter(c => (c.group == group && c.topicPartitions.contains(topicAndPartition))) - if (!filteredClients.isEmpty) { - val client: ClientGroup = filteredClients.head - clientString = Option(client.clientId + client.clientHost) - } - - OffsetInfo(group = group, - topic = topic, - partition = pid, - offset = committedOffset, - logSize = logEndOffsetReported, - owner = clientString, - creation = Time.fromMilliseconds(offsetMetaData.commitTimestamp), - modified = Time.fromMilliseconds(offsetMetaData.expireTimestamp)) - } - } - - override def getGroups: Seq[String] = { - topicAndGroups.groupBy(_.group).keySet.toSeq.sorted - } - - override def getTopicList(group: String): List[String] = { - topicAndGroups.filter(_.group == group).groupBy(_.topic).keySet.toList.sorted - } - - override def getTopicMap: Map[String, scala.Seq[String]] = { - topicAndGroups.groupBy(_.topic).mapValues(_.map(_.group).toSeq) - } - - override def getActiveTopicMap: Map[String, Seq[String]] = { - getTopicMap - } +class KafkaOffsetGetter(zkUtilsWrapper: ZkUtilsWrapper, args: OffsetGetterArgs) extends OffsetGetter { + + import KafkaOffsetGetter._ + + // TODO: We will get all data from the Kafka broker in this class. This is here simply to satisfy + // the OffsetGetter dependency until it can be refactored. 
+ override val zkUtils = zkUtilsWrapper + + override def processPartition(group: String, topic: String, partitionId: Int): Option[OffsetInfo] = { + + val topicPartition = new TopicPartition(topic, partitionId) + val topicAndPartition = TopicAndPartition(topic, partitionId) + + committedOffsetMap.get(GroupTopicPartition(group, topicAndPartition)) map { offsetMetaData => + + // BIT O'HACK to deal with timing: + // Due to thread and processing timing, it is possible that the value we have for the topicPartition's + // logEndOffset has not yet been updated to match the value on the broker. When this happens, report the + // topicPartition's logEndOffset as "committedOffset - lag", as the topicPartition's logEndOffset is *AT LEAST* + // this value + val logEndOffset: Option[Long] = logEndOffsetsMap.get(topicPartition) + val committedOffset: Long = offsetMetaData.offset + val lag: Long = logEndOffset.get - committedOffset + val logEndOffsetReported: Long = if (lag < 0) committedOffset - lag else logEndOffset.get + + // Get client information if we can find an associated client + var clientString: Option[String] = Option("NA") + val filteredClients = clients.filter(c => (c.group == group && c.topicPartitions.contains(topicPartition))) + if (!filteredClients.isEmpty) { + val client: ClientGroup = filteredClients.head + clientString = Option(client.clientId + client.clientHost) + } + + OffsetInfo(group = group, + topic = topic, + partition = partitionId, + offset = committedOffset, + logSize = logEndOffsetReported, + owner = clientString, + creation = Time.fromMilliseconds(offsetMetaData.commitTimestamp), + modified = Time.fromMilliseconds(offsetMetaData.expireTimestamp)) + } + } + + override def getGroups: Seq[String] = { + topicAndGroups.groupBy(_.group).keySet.toSeq.sorted + } + + override def getTopicList(group: String): List[String] = { + topicAndGroups.filter(_.group == group).groupBy(_.topic).keySet.toList.sorted + } + + override def getTopicMap: Map[String, scala.Seq[String]] = { + topicAndGroups.groupBy(_.topic).mapValues(_.map(_.group).toSeq) + } + + override def getActiveTopicMap: Map[String, Seq[String]] = { + getTopicMap + } + + override def getTopics: Seq[String] = { + topicPartitionsMap.keys.toSeq.sorted + } + + override def getClusterViz: Node = { + val clusterNodes = topicPartitionsMap.values.map(partition => { + Node(partition.get(0).leader().host() + ":" + partition.get(0).leader().port(), Seq()) + }).toSet.toSeq.sortWith(_.name < _.name) + Node("KafkaCluster", clusterNodes) + } } object KafkaOffsetGetter extends Logging { - val committedOffsetMap: mutable.Map[GroupTopicPartition, OffsetAndMetadata] = mutable.HashMap() - val topicPartitionOffsetsMap: mutable.Map[TopicAndPartition, Long] = mutable.HashMap() - val topicAndGroups: mutable.Set[TopicAndGroup] = mutable.HashSet() - val clients: mutable.Set[ClientGroup] = mutable.HashSet() + val committedOffsetMap: concurrent.Map[GroupTopicPartition, OffsetAndMetadata] = concurrent.TrieMap() + val logEndOffsetsMap: concurrent.Map[TopicPartition, Long] = concurrent.TrieMap() + + // Swap the object on update + var activeTopicPartitions: immutable.Set[TopicAndPartition] = immutable.HashSet() + var clients: immutable.Set[ClientGroup] = immutable.HashSet() + var topicAndGroups: immutable.Set[TopicAndGroup] = immutable.HashSet() + var topicPartitionsMap: immutable.Map[String, util.List[PartitionInfo]] = immutable.HashMap() + + + private def createNewKafkaConsumer(args: OffsetGetterArgs, group: String, autoCommitOffset: Boolean): 
KafkaConsumer[Array[Byte], Array[Byte]] = { + + val props: Properties = new Properties + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, args.kafkaBrokers) + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, args.kafkaSecurityProtocol) + props.put(ConsumerConfig.GROUP_ID_CONFIG, group) + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, if (autoCommitOffset) "true" else "false") + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (args.kafkaOffsetForceFromStart) "earliest" else "latest") + + new KafkaConsumer[Array[Byte], Array[Byte]](props) + } + + private def createNewAdminClient(args: OffsetGetterArgs): AdminClient = { + + val sleepAfterFailedAdminClientConnect: Int = 30000 + var adminClient: AdminClient = null - var tpOffsetGetterConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = null; + val props: Properties = new Properties + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, args.kafkaBrokers) + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, args.kafkaSecurityProtocol) - case class TopicAndGroup(topic: String, group: String) + while (null == adminClient) { - case class ClientGroup(group: String, clientId: String, clientHost: String, topicPartitions: Set[TopicAndPartition]) + try { - private def createNewKafkaConsumer(args: OffsetGetterArgs, group: String): KafkaConsumer[Array[Byte], Array[Byte]] = { + info("Creating new Kafka AdminClient to get consumer and group info."); + adminClient = AdminClient.create(props) + } - val props: Properties = new Properties - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, args.kafkaBrokers) - props.put(ConsumerConfig.GROUP_ID_CONFIG, group) - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") - props.put("security.protocol", args.kafkaSecurityProtocol) + catch { - new KafkaConsumer[Array[Byte], Array[Byte]](props) - } + case e: Throwable => - private def createNewAdminClient(args: OffsetGetterArgs): AdminClient = { + if (null != adminClient) { - val sleepAfterFailedAdminClientConnect: Int = 60000 - var hasGoodAdminClient: Boolean = false - var adminClient: AdminClient = null + adminClient.close() + adminClient = null + } - while (!hasGoodAdminClient) { + val errorMsg = "Error creating an AdminClient. Will attempt to re-create in %d seconds".format(sleepAfterFailedAdminClientConnect) + error(errorMsg, e) + Thread.sleep(sleepAfterFailedAdminClientConnect) + } + } - try { + info("Created admin client: " + adminClient) + adminClient + } - info("Creating new Kafka AdminClient to get consumer and group info."); - val props: Properties = new Properties - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, args.kafkaBrokers) - props.put("security.protocol", args.kafkaSecurityProtocol) + /** + * Attempts to parse a kafka message as an offset message. 
+ *
+ * @author Robert Casey (rcasey212@gmail.com)
+ * @param message message retrieved from the Kafka client's poll() method
+ * @return key-value of GroupTopicPartition and OffsetAndMetadata if the message was a valid offset message,
+ *         otherwise None
+ */
+ def tryParseOffsetMessage(message: ConsumerRecord[Array[Byte], Array[Byte]]): Option[(GroupTopicPartition, OffsetAndMetadata)] = {
-        adminClient = AdminClient.create(props)
-        hasGoodAdminClient = true
-      }
+   try {
+     // If the message has a null key, there is nothing that can be done
+     if (message.key == null) {
-      catch {
+       info("Ignoring message with a null key.")
+       return None
+     }
-        case e: Throwable =>
-          adminClient = null
-          hasGoodAdminClient = false
-          val errorMsg = "Error creating an AdminClient. Will attempt to re-create in %d seconds".format(sleepAfterFailedAdminClientConnect)
-          error(errorMsg, e)
-          Thread.sleep(sleepAfterFailedAdminClientConnect)
-      }
-    }
+     val baseKey: BaseKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(message.key))
-    adminClient
-  }
+     // Match on the key to see if the message is an offset message
+     baseKey match {
-  def startCommittedOffsetListener(args: OffsetGetterArgs) = {
+       // This is the type we are looking for
+       case b: OffsetKey =>
+         val messageBody: Array[Byte] = message.value()
-    var offsetConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = null;
+         if (messageBody != null) {
-    if (null == offsetConsumer) {
+           val gtp: GroupTopicPartition = b.key
+           val offsetAndMetadata: OffsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(messageBody))
+           return Option((gtp, offsetAndMetadata))
+         }
+         else {
-      logger.info("Creating new Kafka Client to get consumer group committed offsets")
+           // Have run into circumstances where Kafka sends an offset message with a good key, but with a
+           // null body/value. Return None if the body/value is null, as the message is not parsable
+           info("Ignoring offset message with NULL body/value.")
+           return None
+         }
-      val group: String = "kafka-offset-getter-client-" + System.currentTimeMillis
-      val consumerOffsetTopic = "__consumer_offsets"
+       // Return None for all non-offset messages
+       case _ =>
+         info("Ignoring non-offset message.")
+         return None
+     }
+   } catch {
-      offsetConsumer = createNewKafkaConsumer(args, group)
-      offsetConsumer.subscribe(Arrays.asList(consumerOffsetTopic))
+     case malformedEx@(_: BufferUnderflowException | _: KafkaException) =>
+       val errorMsg = String.format("The message was malformed and does not conform to a type of (BaseKey, OffsetAndMetadata). Ignoring this message.")
+       error(errorMsg, malformedEx)
+       return None
-    }
-    Future {
-      try {
+     case e: Throwable =>
+       val errorMsg = String.format("An unhandled exception was thrown while attempting to determine the validity of a message as an offset message.
This message will be ignored.") + error(errorMsg, e) + return None + } + } - while (true) { + def startCommittedOffsetListener(args: OffsetGetterArgs) = { - val records: ConsumerRecords[Array[Byte], Array[Byte]] = offsetConsumer.poll(100) + val group: String = "kafka-monitor-committedOffsetListener" + val consumerOffsetTopic = "__consumer_offsets" + var offsetConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = null - if (0 != records.count) { + while (true) { - val iter = records.iterator() + try { - while (iter.hasNext()) { + if (null == offsetConsumer) { - val record: ConsumerRecord[Array[Byte], Array[Byte]] = iter.next() - val baseKey: BaseKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key)) - baseKey match { + logger.info("Creating new Kafka Client to get consumer group committed offsets") + offsetConsumer = createNewKafkaConsumer(args, group, false) - case b: OffsetKey => - val offsetAndMetadata: OffsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value)) - val gtp: GroupTopicPartition = b.key + offsetConsumer.subscribe( + Arrays.asList(consumerOffsetTopic), + new ConsumerRebalanceListener { + override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = { - val existingCommittedOffsetMap: Option[OffsetAndMetadata] = committedOffsetMap.get(gtp) + if (args.kafkaOffsetForceFromStart) { - // Update only if the new message brings a change in offset - if (!existingCommittedOffsetMap.isDefined || existingCommittedOffsetMap.get.offset != offsetAndMetadata.offset) { + val topicPartitionIterator = partitions.iterator() - val group: String = gtp.group - val topic: String = gtp.topicPartition.topic - val partition: Long = gtp.topicPartition.partition - val offset: Long = offsetAndMetadata.offset - logger.info(s"Updating committed offset: g:$group,t:$topic,p:$partition: $offset") + while (topicPartitionIterator.hasNext()) { - committedOffsetMap += (gtp -> offsetAndMetadata) - } + val topicPartition: TopicPartition = topicPartitionIterator.next() + offsetConsumer.seekToBeginning(topicPartition) + } + } + } - case _ => // do nothing with these other messages - } - } - } - } + override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = { + /* Do nothing for now */ + } + }) + } - } catch { + val messages: ConsumerRecords[Array[Byte], Array[Byte]] = offsetConsumer.poll(500) + val messageIterator = messages.iterator() - case e: Throwable => - val errorMsg = String.format("The Kafka Client reading consumer group committed offsets has thrown an unhandled exception.") - fatal(errorMsg, e) - System.exit(1) - } - } - } + while (messageIterator.hasNext()) { - // Retrieve ConsumerGroup, Consumer, Topic, and Partition information - def startAdminClient(args: OffsetGetterArgs) = { + val message: ConsumerRecord[Array[Byte], Array[Byte]] = messageIterator.next() + val offsetMessage: Option[(GroupTopicPartition, OffsetAndMetadata)] = tryParseOffsetMessage(message) - val sleepOnException: Int = 60000 - val sleepOnDataRetrieval: Int = 10000 + if (offsetMessage.isDefined) { - Future { + // Deal with the offset message + val messageOffsetMap: (GroupTopicPartition, OffsetAndMetadata) = offsetMessage.get + val gtp: GroupTopicPartition = messageOffsetMap._1 + val offsetAndMetadata: OffsetAndMetadata = messageOffsetMap._2 - while (true) { + // Get current offset for topic-partition + val existingCommittedOffsetMap: Option[OffsetAndMetadata] = committedOffsetMap.get(gtp) - var adminClient: AdminClient = null + // Update committed 
offset only if the new message brings a change in offset for the topic-partition: + // a changed offset for an existing topic-partition, or a new topic-partition + if (!existingCommittedOffsetMap.isDefined || existingCommittedOffsetMap.get.offset != offsetAndMetadata.offset) { - try { + val group: String = gtp.group + val topic: String = gtp.topicPartition.topic + val partition: Long = gtp.topicPartition.partition + val offset: Long = offsetAndMetadata.offset - adminClient = createNewAdminClient(args) - var hasError: Boolean = false + info(s"Updating committed offset: g:$group,t:$topic,p:$partition: $offset") + committedOffsetMap += messageOffsetMap + } + } + } + } catch { - while (!hasError) { + case e: Throwable => { + val errorMsg = String.format("An unhandled exception was thrown while reading messages from the committed offsets topic.") + error(errorMsg, e) - lazy val f = Future { + if (null != offsetConsumer) { - try { + offsetConsumer.close() + offsetConsumer = null + } + } + } + } + } - def groupOverviews = adminClient.listAllConsumerGroupsFlattened() + def startAdminClient(args: OffsetGetterArgs) = { - groupOverviews.foreach((groupOverview: GroupOverview) => { + val sleepOnDataRetrieval: Int = 30000 + val sleepOnError: Int = 60000 + val awaitForResults: Int = 30000 + var adminClient: AdminClient = null - val groupId = groupOverview.groupId; - val consumerGroupSummary = adminClient.describeConsumerGroup(groupId) + while (true) { - consumerGroupSummary.foreach((consumerSummary) => { + try { - val clientId = consumerSummary.clientId - val clientHost = consumerSummary.clientHost + if (null == adminClient) { - val topicPartitions: List[TopicPartition] = consumerSummary.assignment - var topicAndPartitions: mutable.Set[TopicAndPartition] = mutable.HashSet() + adminClient = createNewAdminClient(args) + } - topicPartitions.foreach((topicPartition) => { + var hasError: Boolean = false - val topic = topicPartition.topic() - val partitionId = topicPartition.partition() - val topicAndPartition: TopicAndPartition = TopicAndPartition(topic, partitionId) + while (!hasError) { - topicAndGroups += TopicAndGroup(topic, groupId) - topicAndPartitions += topicAndPartition - }) + lazy val f = Future { - clients += ClientGroup(groupId, clientId, clientHost, topicAndPartitions) - }) - }) - } + try { - catch { + val newTopicAndGroups: mutable.Set[TopicAndGroup] = mutable.HashSet() + val newClients: mutable.Set[ClientGroup] = mutable.HashSet() + val newActiveTopicPartitions: mutable.HashSet[TopicAndPartition] = mutable.HashSet() - case e: Throwable => + val groupOverviews = adminClient.listAllConsumerGroupsFlattened() - hasError = true - if (null != adminClient) { - adminClient.close() - } - val errorMsg: String = "Kafka AdminClient polling aborted due to unexpected exception." 
- error(errorMsg, e) - } - } + groupOverviews.foreach((groupOverview: GroupOverview) => { - Await.result(f, duration.pairIntToDuration(30, duration.SECONDS)) + val groupId: String = groupOverview.groupId; + val consumerGroupSummary: List[AdminClient#ConsumerSummary] = adminClient.describeConsumerGroup(groupId) - if (hasError) { + consumerGroupSummary.foreach((consumerSummary) => { - Thread.sleep(sleepOnException) - } else { + val clientId: String = consumerSummary.clientId + val clientHost: String = consumerSummary.clientHost - Thread.sleep(sleepOnDataRetrieval) - } - } - } + val topicPartitions: List[TopicPartition] = consumerSummary.assignment - catch { + topicPartitions.foreach((topicPartition) => { - case ex: java.util.concurrent.TimeoutException => { - error("The AdminClient timed out. It will be closed and restarted.") - if (null != adminClient) { - adminClient.close() - } - } - } - } - } - } + newActiveTopicPartitions += TopicAndPartition(topicPartition.topic(), topicPartition.partition()) + newTopicAndGroups += TopicAndGroup(topicPartition.topic(), groupId) + }) - def startTopicPartitionOffsetGetter(args: OffsetGetterArgs) = { + newClients += ClientGroup(groupId, clientId, clientHost, topicPartitions.toSet) + }) + }) - Future { + activeTopicPartitions = newActiveTopicPartitions.toSet + clients = newClients.toSet + topicAndGroups = newTopicAndGroups.toSet + } - try { + catch { - while (true) { + case e: Throwable => - // Get list of distinct TopicAndPartitions - val distinctTopicPartitions: List[TopicAndPartition] = clients.flatMap(c => c.topicPartitions).toList.distinct + val errorMsg: String = "Kafka AdminClient polling aborted due to an unexpected exception." + error(errorMsg, e) + hasError = true + if (null != adminClient) { - if (null == tpOffsetGetterConsumer) { + adminClient.close() + adminClient = null + } + } + } - val group: String = "KafkaOffsetMonitor-TopicPartOffsetGetter-" + System.currentTimeMillis() - tpOffsetGetterConsumer = createNewKafkaConsumer(args, group) + Await.result(f, duration.pairIntToDuration(awaitForResults, duration.MILLISECONDS)) - // Do I need to assign to each topicPartition? - } + if (hasError) { + Thread.sleep(sleepOnError) + } else { + Thread.sleep(sleepOnDataRetrieval) + } + } + } - // Iterate over each distinct TopicPartition - distinctTopicPartitions.foreach(topicAndPartition => { + catch { - // Get the LogEndOffset for the TopicPartition - val topicPartition: TopicPartition = new TopicPartition(topicAndPartition.topic, topicAndPartition.partition) - tpOffsetGetterConsumer.assign(Arrays.asList(topicPartition)) - tpOffsetGetterConsumer.seekToEnd(topicPartition) - val logEndOffset: Long = tpOffsetGetterConsumer.position(topicPartition) + case tex: java.util.concurrent.TimeoutException => { + warn("The AdminClient timed out. 
It will be closed and restarted.", tex) + if (null != adminClient) { + adminClient.close() + adminClient = null + } + } + } + } + } - // Update the TopicPartition map with the current LogEndOffset if it exists, else add a new map - if (topicPartitionOffsetsMap.contains(topicAndPartition)) { - topicPartitionOffsetsMap.update(topicAndPartition, logEndOffset) - } - else { - topicPartitionOffsetsMap += (topicAndPartition -> logEndOffset) - } - }) - } + def startLogEndOffsetGetter(args: OffsetGetterArgs) = { - } catch { + val group: String = "kafka-monitor-LogEndOffsetGetter" + val sleepOnDataRetrieval: Int = 10000 + val sleepOnError: Int = 30000 + var logEndOffsetGetter: KafkaConsumer[Array[Byte], Array[Byte]] = null + + while (true) { + + try { + + while (null == logEndOffsetGetter) { + + logEndOffsetGetter = createNewKafkaConsumer(args, group, false) + } + + // Get topic-partitions + topicPartitionsMap = JavaConversions.mapAsScalaMap(logEndOffsetGetter.listTopics()).toMap + val distinctPartitionInfo: Seq[PartitionInfo] = (topicPartitionsMap.values).flatten(listPartitionInfo => JavaConversions.asScalaBuffer(listPartitionInfo)).toSeq + + // Iterate over each distinct PartitionInfo + distinctPartitionInfo.foreach(partitionInfo => { + + // Get the LogEndOffset for the TopicPartition + val topicPartition: TopicPartition = new TopicPartition(partitionInfo.topic, partitionInfo.partition) + logEndOffsetGetter.assign(Arrays.asList(topicPartition)) + logEndOffsetGetter.seekToEnd(topicPartition) + val logEndOffset: Long = logEndOffsetGetter.position(topicPartition) + + // Update the TopicPartition map with the current LogEndOffset if it exists, else add a new entry to the map + if (logEndOffsetsMap.contains(topicPartition)) { + logEndOffsetsMap.update(topicPartition, logEndOffset) + } + else { + logEndOffsetsMap += (topicPartition -> logEndOffset) + } + }) + + Thread.sleep(sleepOnDataRetrieval) + } + + catch { + + case e: Throwable => { + val errorMsg = String.format("The Kafka Client reading topic/partition LogEndOffsets has thrown an unhandled exception. 
Will attempt to reconnect.") + error(errorMsg, e) + + if (null != logEndOffsetGetter) { + + logEndOffsetGetter.close() + logEndOffsetGetter = null + } + } + } + } + } +} - case e: Throwable => - val errorMsg = String.format("The Kafka Client reading topic/partition LogEndOffsets has thrown an unhandled exception.") - fatal(errorMsg, e) - System.exit(1) - } - } +case class TopicAndGroup(topic: String, group: String) - } -} \ No newline at end of file +case class ClientGroup(group: String, clientId: String, clientHost: String, topicPartitions: Set[TopicPartition]) \ No newline at end of file diff --git a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala index 9294a58..efd6d7d 100644 --- a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala +++ b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala @@ -111,7 +111,7 @@ object OffsetGetterApp extends ArgMain[OffsetGetterArgsWGT] { } } finally { - if (og != null) og.close() + } } } diff --git a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterWeb.scala b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterWeb.scala index 718a17f..759429b 100644 --- a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterWeb.scala +++ b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterWeb.scala @@ -87,7 +87,7 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging { og = OffsetGetter.getInstance(args) f(og) } finally { - if (og != null) og.close() + } } diff --git a/src/test/scala/com/quantifind/kafka/core/KafkaMessageProtocolHelper.scala b/src/test/scala/com/quantifind/kafka/core/KafkaMessageProtocolHelper.scala new file mode 100644 index 0000000..4255923 --- /dev/null +++ b/src/test/scala/com/quantifind/kafka/core/KafkaMessageProtocolHelper.scala @@ -0,0 +1,177 @@ +package com.quantifind.kafka.core + +import java.nio.ByteBuffer + +import kafka.common.{KafkaException, OffsetAndMetadata} +import org.apache.kafka.common.protocol.types.Type.{BYTES, INT32, INT64, STRING} +import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct} + +/** + * The objective of this object is to provide helpers to serialize data into Kafka's internal Group Metadata protocol. + * This is used to test the handling of various commit & metadata messages which appear in Kafka's + * internal __consumer_offsets topic. + * + * The contents of this object are copy-pasted from kafka.coordinator.GroupMetadataManager. This was done for several + * reasons: + * 1. I am unable to mock the static functions I call in the Kafka codebase and therefore have to test the Kafka + * codebase along with my code. + * 2. To test our handling of committed offset messages, I need to synthesize messages, good and bad, which can appear + * on the internal Kafka topic that stores the offset commit location for each consumer-group, topic, and partition. + * 3. These serialization helper functions are private in the Kafka codebase, and therefore inaccessible for + * use outside of the GroupMetadataManager object. 
+ * + * author: Robert Casey (rcasey212@gmail.com) + */ +object KafkaMessageProtocolHelper { + + private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort + private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort + + private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING), + new Field("topic", STRING), + new Field("partition", INT32)) + private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group") + private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic") + private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition") + + private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64), + new Field("metadata", STRING, "Associated metadata.", ""), + new Field("timestamp", INT64)) + private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset") + private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata") + private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp") + + private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64), + new Field("metadata", STRING, "Associated metadata.", ""), + new Field("commit_timestamp", INT64), + new Field("expire_timestamp", INT64)) + private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset") + private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata") + private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp") + private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp") + + private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING)) + private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group") + + private val MEMBER_METADATA_V0 = new Schema(new Field("member_id", STRING), + new Field("client_id", STRING), + new Field("client_host", STRING), + new Field("session_timeout", INT32), + new Field("subscription", BYTES), + new Field("assignment", BYTES)) + private val MEMBER_METADATA_MEMBER_ID_V0 = MEMBER_METADATA_V0.get("member_id") + private val MEMBER_METADATA_CLIENT_ID_V0 = MEMBER_METADATA_V0.get("client_id") + private val MEMBER_METADATA_CLIENT_HOST_V0 = MEMBER_METADATA_V0.get("client_host") + private val MEMBER_METADATA_SESSION_TIMEOUT_V0 = MEMBER_METADATA_V0.get("session_timeout") + private val MEMBER_METADATA_SUBSCRIPTION_V0 = MEMBER_METADATA_V0.get("subscription") + private val MEMBER_METADATA_ASSIGNMENT_V0 = MEMBER_METADATA_V0.get("assignment") + + + private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(new Field("protocol_type", STRING), + new Field("generation", INT32), + new Field("protocol", STRING), + new Field("leader", STRING), + new Field("members", new ArrayOf(MEMBER_METADATA_V0))) + private val GROUP_METADATA_PROTOCOL_TYPE_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol_type") + private val GROUP_METADATA_GENERATION_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("generation") + private val GROUP_METADATA_PROTOCOL_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol") + private val GROUP_METADATA_LEADER_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("leader") + private val GROUP_METADATA_MEMBERS_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("members") + + // map of versions to key schemas as data types + private val MESSAGE_TYPE_SCHEMAS = Map( + 0 -> OFFSET_COMMIT_KEY_SCHEMA, + 1 -> OFFSET_COMMIT_KEY_SCHEMA, + 2 -> GROUP_METADATA_KEY_SCHEMA) + + // map of version of offset 
value schemas + private val OFFSET_VALUE_SCHEMAS = Map( + 0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0, + 1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1) + private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort + + // map of version of group metadata value schemas + private val GROUP_VALUE_SCHEMAS = Map(0 -> GROUP_METADATA_VALUE_SCHEMA_V0) + private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 0.toShort + + private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION) + private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION) + + private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(CURRENT_OFFSET_VALUE_SCHEMA_VERSION) + private val CURRENT_GROUP_VALUE_SCHEMA = schemaForGroup(CURRENT_GROUP_VALUE_SCHEMA_VERSION) + + private def schemaForKey(version: Int) = { + val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version) + schemaOpt match { + case Some(schema) => schema + case _ => throw new KafkaException("Unknown offset schema version " + version) + } + } + + private def schemaForOffset(version: Int) = { + val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version) + schemaOpt match { + case Some(schema) => schema + case _ => throw new KafkaException("Unknown offset schema version " + version) + } + } + + private def schemaForGroup(version: Int) = { + val schemaOpt = GROUP_VALUE_SCHEMAS.get(version) + schemaOpt match { + case Some(schema) => schema + case _ => throw new KafkaException("Unknown group metadata version " + version) + } + } + + /** + * Generates the key for group metadata message for given group + * + * @return key bytes for group metadata message + */ + def groupMetadataKey(group: String): Array[Byte] = { + val key = new Struct(CURRENT_GROUP_KEY_SCHEMA) + key.set(GROUP_KEY_GROUP_FIELD, group) + + val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf) + byteBuffer.putShort(CURRENT_GROUP_KEY_SCHEMA_VERSION) + key.writeTo(byteBuffer) + byteBuffer.array() + } + + /** + * Generates the key for offset commit message for given (group, topic, partition) + * + * @return key for offset commit message + */ + def offsetCommitKey(group: String, topic: String, partition: Int, versionId: Short = 0): Array[Byte] = { + val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA) + key.set(OFFSET_KEY_GROUP_FIELD, group) + key.set(OFFSET_KEY_TOPIC_FIELD, topic) + key.set(OFFSET_KEY_PARTITION_FIELD, partition) + + val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf) + byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION) + key.writeTo(byteBuffer) + byteBuffer.array() + } + + /** + * Generates the payload for offset commit message from given offset and metadata + * + * @param offsetAndMetadata consumer's current offset and metadata + * @return payload for offset commit message + */ + def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = { + // generate commit value with schema version 1 + val value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA) + value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset) + value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata) + value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp) + value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp) + val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf) + byteBuffer.putShort(CURRENT_OFFSET_VALUE_SCHEMA_VERSION) + value.writeTo(byteBuffer) + byteBuffer.array() + } +} diff --git a/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala 
b/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala index 8bfbc92..ff5e5da 100644 --- a/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala +++ b/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala @@ -1,27 +1,32 @@ package com.quantifind.kafka.core import com.quantifind.kafka.offsetapp.OffsetGetterArgs +import com.quantifind.utils.ZkUtilsWrapper +import java.nio.{BufferUnderflowException, ByteBuffer} + import kafka.api.{OffsetRequest, OffsetResponse, PartitionOffsetsResponse} -import kafka.common.{OffsetAndMetadata, TopicAndPartition} -import kafka.coordinator.GroupTopicPartition +import kafka.common.{OffsetAndMetadata, OffsetMetadata, TopicAndPartition} +import kafka.coordinator._ import kafka.consumer.SimpleConsumer -import kafka.utils.ZkUtils +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition import org.mockito.Matchers._ import org.mockito.Mockito._ import org.mockito.{Mockito, Matchers => MockitoMatchers} import org.scalatest._ + class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers { trait Fixture { - val mockedZkUtil = Mockito.mock(classOf[ZkUtils]) + val mockedZkUtil = Mockito.mock(classOf[ZkUtilsWrapper]) val mockedConsumer = Mockito.mock(classOf[SimpleConsumer]) val testPartitionLeader = 1 val args = new OffsetGetterArgs - val offsetGetter = new KafkaOffsetGetter(args) + val offsetGetter = new KafkaOffsetGetter(mockedZkUtil, args) offsetGetter.consumerMap += (testPartitionLeader -> Some(mockedConsumer)) } @@ -34,17 +39,17 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers { val logEndOffset = 102 val topicAndPartition = TopicAndPartition(testTopic, testPartition) - val groupTopicPartition = GroupTopicPartition(testGroup, topicAndPartition) + val topicPartition = new TopicPartition(testTopic, testPartition) + val groupTopicPartition = GroupTopicPartition(testGroup, TopicAndPartition(testTopic, testPartition)) val offsetAndMetadata = OffsetAndMetadata(committedOffset, "meta", System.currentTimeMillis) - KafkaOffsetGetter.committedOffsetMap += (groupTopicPartition -> offsetAndMetadata) - //topicPartitionOffsetsMap - KafkaOffsetGetter.topicPartitionOffsetsMap += (topicAndPartition -> logEndOffset) + KafkaOffsetGetter.committedOffsetMap += (groupTopicPartition -> offsetAndMetadata) + KafkaOffsetGetter.logEndOffsetsMap += (topicPartition -> logEndOffset) when(mockedZkUtil.getLeaderForPartition(MockitoMatchers.eq(testTopic), MockitoMatchers.eq(testPartition))) - .thenReturn(Some(testPartitionLeader)) + .thenReturn(Some(testPartitionLeader)) - val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(0,Seq(logEndOffset))) + val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(0, Seq(logEndOffset))) val offsetResponse = OffsetResponse(1, partitionErrorAndOffsets) when(mockedConsumer.getOffsetsBefore(any[OffsetRequest])).thenReturn(offsetResponse) @@ -57,6 +62,103 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers { offsetInfo.logSize shouldBe logEndOffset case None => fail("Failed to build offset data") } - + } + + "tryParseOffsetMessage" should "return None for messages with null keys" in { + + val topic: String = "topic-test" + val partition: Int = 0 + val offset: Long = 1 + val group: String = "group-test" + val key: Array[Byte] = null + val value: Array[Byte] = null + + val cRecord: ConsumerRecord[Array[Byte], Array[Byte]] = new ConsumerRecord[Array[Byte], Array[Byte]](topic, 
partition, offset, key, value) + + KafkaOffsetGetter.tryParseOffsetMessage(cRecord) shouldBe None + } + + "tryParseOffsetMessage" should "return None for messages with malformed keys" in { + + val topic: String = "topic-test" + val partition: Int = 0 + val offset: Long = 1 + val group: String = "group-test" + val keySmall: Array[Byte] = Array[Byte](1) + val keyBig: Array[Byte] = BigInt(Long.MaxValue).toByteArray + val value: Array[Byte] = Array[Byte](1) + + val cRecordSmall: ConsumerRecord[Array[Byte], Array[Byte]] = new ConsumerRecord[Array[Byte], Array[Byte]](topic, partition, offset, keySmall, value) + KafkaOffsetGetter.tryParseOffsetMessage(cRecordSmall) shouldBe None + + val cRecordBig: ConsumerRecord[Array[Byte], Array[Byte]] = new ConsumerRecord[Array[Byte], Array[Byte]](topic, partition, offset, keyBig, value) + KafkaOffsetGetter.tryParseOffsetMessage(cRecordBig) shouldBe None + } + + "tryParseOffsetMessage" should "return None for messages with any key other than OffsetKey" in { + + val topic: String = "topic-test" + val partition: Int = 0 + val offset: Long = 1 + val group: String = "group-test" + val key: Array[Byte] = KafkaMessageProtocolHelper.groupMetadataKey(group) + val value: Array[Byte] = Array[Byte](1) + val cRecord: ConsumerRecord[Array[Byte], Array[Byte]] = new ConsumerRecord[Array[Byte], Array[Byte]](topic, partition, offset, key, value) + + KafkaOffsetGetter.tryParseOffsetMessage(cRecord) shouldBe None + } + + "tryParseOffsetMessage" should "return None for messages with a valid OffsetKey and a null body/value" in { + + val topic: String = "topic-test" + val partition: Int = 0 + val offset: Long = 1 + val group: String = "group-test" + + val key: Array[Byte] = KafkaMessageProtocolHelper.offsetCommitKey(group, topic, partition, 1.toShort) + val value: Array[Byte] = null + val cRecord: ConsumerRecord[Array[Byte], Array[Byte]] = new ConsumerRecord[Array[Byte], Array[Byte]](topic, partition, offset, key, value) + + KafkaOffsetGetter.tryParseOffsetMessage(cRecord) shouldBe None + } + + "tryParseOffsetMessage" should "return None for messages with a valid OffsetKey and a malformed body/value" in { + + val topic: String = "topic-test" + val partition: Int = 0 + val offset: Long = 1 + val group: String = "group-test" + + val key: Array[Byte] = KafkaMessageProtocolHelper.offsetCommitKey(group, topic, partition, 1.toShort) + val value: Array[Byte] = Array[Byte](1) + val cRecord: ConsumerRecord[Array[Byte], Array[Byte]] = new ConsumerRecord[Array[Byte], Array[Byte]](topic, partition, offset, key, value) + + KafkaOffsetGetter.tryParseOffsetMessage(cRecord) shouldBe None + } + + "tryParseOffsetMessage" should "parse and return valid data for messages with a valid key of type OffsetKey and a valid body/value of type OffsetAndMetadata " in { + + val topic: String = "topic-test" + val partition: Int = 0 + val offset: Long = 1 + val metadata: String = "metadata-test" + val group: String = "group-test" + val commitTimestamp: Long = 12345 + val offsetMetadata: OffsetMetadata = new OffsetMetadata(offset, metadata) + val offsetAndMetadata: OffsetAndMetadata = new OffsetAndMetadata(offsetMetadata, commitTimestamp, commitTimestamp) + + val key: Array[Byte] = KafkaMessageProtocolHelper.offsetCommitKey(group, topic, partition, 1.toShort) + val value: Array[Byte] = KafkaMessageProtocolHelper.offsetCommitValue(offsetAndMetadata) + val cRecord: ConsumerRecord[Array[Byte], Array[Byte]] = new ConsumerRecord[Array[Byte], Array[Byte]](topic, partition, offset, key, value) + + val offsetMessage: 
Option[(GroupTopicPartition, OffsetAndMetadata)] = KafkaOffsetGetter.tryParseOffsetMessage(cRecord) + offsetMessage.isDefined shouldBe true + + val messageOffsetMap: (GroupTopicPartition, OffsetAndMetadata) = offsetMessage.get + val gtp: GroupTopicPartition = messageOffsetMap._1 + val offMeta: OffsetAndMetadata = messageOffsetMap._2 + gtp.group shouldBe group + gtp.topicPartition shouldBe TopicAndPartition(topic, partition) + offMeta shouldBe offsetAndMetadata } } \ No newline at end of file
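
Notes on the patterns used in this patch.

Note 1 — shared state between the collector threads and the web threads. The
maps that are updated key-by-key (committedOffsetMap, logEndOffsetsMap) are
published through lock-free concurrent.TrieMap instances, while collections
that are rebuilt wholesale on each AdminClient poll (activeTopicPartitions,
clients, topicAndGroups) are swapped in as complete immutable sets. Below is a
minimal sketch of the two patterns; the names are illustrative, not from the
patch, and the @volatile on the swapped reference is an addition the patch
itself omits (it guarantees readers on other threads see the swap promptly):

import scala.collection.concurrent.TrieMap
import scala.collection.immutable

object SharedStateSketch {
  // Per-key updates: TrieMap supports concurrent single-key writes and reads.
  val logEndOffsets: TrieMap[String, Long] = TrieMap()

  // Wholesale updates: build a fresh immutable set, then swap the reference.
  // Readers always observe either the old snapshot or the new one, never a
  // half-built collection.
  @volatile var activeGroups: immutable.Set[String] = immutable.HashSet()

  def publish(topic: String, offset: Long, freshGroups: immutable.Set[String]): Unit = {
    logEndOffsets.update(topic, offset) // atomic per-key update
    activeGroups = freshGroups          // single reference swap, no in-place mutation
  }
}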
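
Note 2 — why kafkaOffsetForceFromStart is honored inside a
ConsumerRebalanceListener. With the new consumer API, seeking is only legal on
partitions the group coordinator has actually assigned, so the seek must
happen in onPartitionsAssigned rather than right after subscribe(). A minimal
standalone sketch of the same pattern against the Kafka 0.9 consumer API; the
object, method name, and parameters are illustrative:

import java.util
import java.util.Arrays

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

object RebalanceSketch {
  def subscribeFromStart(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
                         topic: String,
                         forceFromStart: Boolean): Unit = {
    consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener {
      // Seek only once partitions are assigned; a seek on an unassigned partition fails.
      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
        if (forceFromStart) {
          val it = partitions.iterator()
          while (it.hasNext) consumer.seekToBeginning(it.next())
        }
      }

      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = ()
    })
  }
}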
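
Note 3 — the key framing the malformed-key tests exercise. Every key in
__consumer_offsets is a two-byte schema version followed by the versioned
struct, so the one-byte key in the tests underflows the buffer and the
oversized key decodes to an unknown version, raising KafkaException inside
readMessageKey. A quick, hypothetical round-trip against the test helper in
this patch:

import java.nio.ByteBuffer

import com.quantifind.kafka.core.KafkaMessageProtocolHelper

object FramingSketch extends App {
  // Build an offset-commit key for (group, topic, partition) and peek at the framing.
  val key: Array[Byte] = KafkaMessageProtocolHelper.offsetCommitKey("group-test", "topic-test", 0)
  val buf: ByteBuffer = ByteBuffer.wrap(key)

  // The first two bytes carry the schema version; the helper writes version 1.
  val version: Short = buf.getShort()
  assert(version == 1)
}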