diff --git a/.travis.yml b/.travis.yml index d7ccbba..2e8db24 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,5 @@ language: scala scala: - - 2.11.8 + - 2.11.12 jdk: - - oraclejdk8 \ No newline at end of file + - oraclejdk8 diff --git a/README.md b/README.md index 632c3d9..fcf357b 100644 --- a/README.md +++ b/README.md @@ -50,23 +50,39 @@ $ sbt clean assembly Running It =========== -If you do not want to build it manually, just download the [current jar](https://github.com/Morningstar/kafka-offset-monitor/releases/latest). +If you do not want to build it manually, just download the [current jar](https://github.com/dmilan77/kafka-offset-monitor/releases/latest). This is a small web app, you can run it locally or on a server, as long as you have access to the Kafka broker(s) and ZooKeeper nodes storing kafka data. ``` - +# For http +java -Djava.security.auth.login.config=conf/server-client-jaas.conf \ + -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \ + com.quantifind.kafka.offsetapp.OffsetGetterWeb \ + --offsetStorage kafka \ + --zk zkserver01,zkserver02 \ + --port 8080 \ + --refresh 10.seconds \ + --retain 2.days \ + --dbName offsetapp_kafka \ + --consumerConfig consumerConfig-File + +# For https java -Djava.security.auth.login.config=conf/server-client-jaas.conf \ -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \ + -Djetty.ssl.keyStore=//localhost-keystore.jks \ + -Djetty.ssl.keyStorePassword=trustme \ + -Djetty.ssl.trustStore=//all-truststore.jks \ + -Djetty.ssl.trustStorePassword=trustme \ com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --offsetStorage kafka \ - --kafkaBrokers kafkabroker01:6667,kafkabroker02:6667 \ - --kafkaSecurityProtocol SASL_PLAINTEXT \ + --protocol https \ --zk zkserver01,zkserver02 \ - --port 8081 \ + --port 8080 \ --refresh 10.seconds \ --retain 2.days \ - --dbName offsetapp_kafka + --dbName offsetapp_kafka \ + --consumerConfig consumerConfig-File ``` @@ -74,9 +90,9 @@ The arguments are: - **offsetStorage** valid options are ''kafka'', ''zookeeper'', or ''storm''. Anything else falls back to ''zookeeper'' - **zk** the ZooKeeper hosts -- **kafkaBrokers** comma-separated list of Kafka broker hosts (ex. "host1:port,host2:port'). Required only when using offsetStorage "kafka". -- **kafkaSecurityProtocol** security protocol to use when connecting to kafka brokers (default: ''PLAINTEXT'', optional: ''SASL_PLAINTEXT'') +- **consumerConfig** kafka consumer config. Needed for SSL/TLS Kafka - **port** the port on which the app will be made available +- **protocol** `http` or `https` . Default value `http` - **refresh** how often should the app refresh and store a point in the DB - **retain** how long should points be kept in the DB - **dbName** where to store the history (default 'offsetapp') @@ -126,16 +142,17 @@ As long as this is true you will need to use local maven repo and just publish K Assuming you have a custom implementation of OffsetInfoReporter in a jar file, running it is as simple as adding the jar to the classpath when running app: ``` -java -cp KafkaOffsetMonitor-assembly-0.4.6.jar:kafka-offset-monitor-another-db-reporter.jar \ - com.quantifind.kafka.offsetapp.OffsetGetterWeb \ - --zk zkserver01,zkserver02 \ - --port 8080 \ +java -Djava.security.auth.login.config=//kafka_client_jaas.conf \ + -cp KafkaOffsetMonitor-assembly-0.4.7-dmilan-SNAPSHOT.jar \ + com.quantifind.kafka.offsetapp.OffsetGetterWeb \ + --zk ${MASTER1} \ + --port 8089 \ --refresh 10.seconds \ - --retain 2.days - --pluginsArgs anotherDbHost=host1,anotherDbPort=555 + --retain 2.days \ + --dbName offsetapp_kafka \ + --consumerConfig //kafka-config.property ``` -For complete working example you can check [kafka-offset-monitor-graphite](https://github.com/allegro/kafka-offset-monitor-graphite), a plugin reporting offset information to Graphite. Contributing ============ diff --git a/build.sbt b/build.sbt index 02589f8..758ef71 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ name := "KafkaOffsetMonitor" -version := "0.4.6-SNAPSHOT" -scalaVersion := "2.11.11" +version := "0.4.7-dmilan-SNAPSHOT" +scalaVersion := "2.11.12" organization := "com.quantifind" scalacOptions ++= Seq("-deprecation", "-unchecked", "-optimize", "-feature") @@ -13,11 +13,11 @@ libraryDependencies ++= Seq( "net.databinder" %% "unfiltered-jetty" % "0.8.4", "net.databinder" %% "unfiltered-json4s" % "0.8.4", "com.quantifind" %% "sumac" % "0.3.0", - "org.apache.kafka" %% "kafka" % "0.9.0.1", + "org.apache.kafka" %% "kafka" % "1.1.0", "org.reflections" % "reflections" % "0.9.11", "com.twitter" %% "util-core" % "7.1.0", "com.typesafe.slick" %% "slick" % "2.1.0", - "org.xerial" % "sqlite-jdbc" % "3.18.0", + "org.xerial" % "sqlite-jdbc" % "3.25.2", "com.google.code.gson" % "gson" % "2.8.2", "com.google.guava" % "guava" % "20.0", "javax.ws.rs" % "javax.ws.rs-api" % "2.0-m16", diff --git a/kafka-config.property b/kafka-config.property new file mode 100644 index 0000000..4fd24ec --- /dev/null +++ b/kafka-config.property @@ -0,0 +1,15 @@ +bootstrap.servers = localhost:9092 +key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer +key.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer +#sasl.jaas.config = com.sun.security.auth.module.Krb5LoginModule required useTicketCache=false renewTicket=true serviceName="kafka" useKeyTab=true keyTab="/scurepath/kafka.headless.keytab" principal="kafka@EXAMPLE.COM"; +#sasl.kerberos.service.name = kafka +#sasl.mechanism = GSSAPI +security.protocol = PLAINTEXT +value.deserializer = org.apache.kafka.common.serialization.ByteArraySerializer +value.serializer = org.apache.kafka.common.serialization.ByteArrayDeserializer +#ssl.keymanager.algorithm = SunX509 +#ssl.protocol = TLSv1.2 +#ssl.trustmanager.algorithm = SunX509 +#ssl.truststore.location = /secure-path/all-truststore.jks +#ssl.truststore.password = StRoNgPaSsWoRd +#ssl.truststore.type = JKS \ No newline at end of file diff --git a/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala b/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala index 7ea1c1b..94a378c 100644 --- a/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala +++ b/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala @@ -1,5 +1,6 @@ package com.quantifind.kafka.core +import java.io.FileInputStream import java.nio.{BufferUnderflowException, ByteBuffer} import java.util import java.util.concurrent.atomic.AtomicReference @@ -10,23 +11,29 @@ import com.morningstar.kafka.KafkaOffsetMetadata import com.morningstar.kafka.KafkaOffsetStorage import com.morningstar.kafka.KafkaTopicPartition import com.morningstar.kafka.KafkaTopicPartitionLogEndOffset - import com.quantifind.kafka.OffsetGetter.OffsetInfo import com.quantifind.kafka.offsetapp.OffsetGetterArgs import com.quantifind.kafka.{Node, OffsetGetter} import com.quantifind.utils.ZkUtilsWrapper import com.twitter.util.Time -import kafka.admin.AdminClient -import kafka.common.{KafkaException, OffsetAndMetadata, TopicAndPartition} -import kafka.coordinator._ +import kafka.common.TopicAndPartition +import kafka.coordinator.group._ import kafka.utils.Logging +import kafka.common.OffsetAndMetadata +import kafka.admin.AdminClient +import kafka.coordinator.group.GroupTopicPartition import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer._ -import org.apache.kafka.common.{PartitionInfo, TopicPartition} +import org.apache.kafka.common.{KafkaException, PartitionInfo, TopicPartition} +import scala.collection.mutable.ArrayBuffer import scala.collection.{mutable, _} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.{Await, Future, duration} +import scala.collection.JavaConverters._ + + + /** @@ -43,8 +50,8 @@ class KafkaOffsetGetter(zkUtilsWrapper: ZkUtilsWrapper, args: OffsetGetterArgs) override def processPartition(group: String, topic: String, partitionId: Int): Option[OffsetInfo] = { val topicPartition = new TopicPartition(topic, partitionId) - val topicAndPartition = TopicAndPartition(topic, partitionId) - val optionalOffsetMetaData: Option[OffsetAndMetadata] = committedOffsetMap.get(GroupTopicPartition(group, topicAndPartition)) +// val topicAndPartition = TopicAndPartition(topic, partitionId) + val optionalOffsetMetaData: Option[OffsetAndMetadata] = committedOffsetMap.get(GroupTopicPartition(group, topicPartition)) if (!optionalOffsetMetaData.isDefined) { error(s"processPartition: Could not find group-topic-partition in committedOffsetsMap, g:$group,t:$topic,p:$partitionId") @@ -181,6 +188,8 @@ class KafkaOffsetGetter(zkUtilsWrapper: ZkUtilsWrapper, args: OffsetGetterArgs) object KafkaOffsetGetter extends Logging { + + val committedOffsetMap: concurrent.Map[GroupTopicPartition, OffsetAndMetadata] = concurrent.TrieMap() val logEndOffsetsMap: concurrent.Map[TopicPartition, Long] = concurrent.TrieMap() val kafkaOffsetStorage: KafkaOffsetStorage = new KafkaOffsetStorage(); @@ -195,8 +204,9 @@ object KafkaOffsetGetter extends Logging { private def createNewKafkaConsumer(args: OffsetGetterArgs, group: String, autoCommitOffset: Boolean): KafkaConsumer[Array[Byte], Array[Byte]] = { val props: Properties = new Properties - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, args.kafkaBrokers) - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, args.kafkaSecurityProtocol) + + props.load(new FileInputStream(args.consumerConfig)) + props.put(ConsumerConfig.GROUP_ID_CONFIG, group) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, if (autoCommitOffset) "true" else "false") props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") @@ -213,8 +223,10 @@ object KafkaOffsetGetter extends Logging { var adminClient: AdminClient = null val props: Properties = new Properties - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, args.kafkaBrokers) - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, args.kafkaSecurityProtocol) + + + props.load(new FileInputStream(args.consumerConfig)) + while (null == adminClient) { @@ -274,6 +286,7 @@ object KafkaOffsetGetter extends Logging { if (messageBody != null) { val gtp: GroupTopicPartition = b.key + val offsetAndMetadata: OffsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(messageBody)) return Option(gtp, offsetAndMetadata) } @@ -327,12 +340,7 @@ object KafkaOffsetGetter extends Logging { if (args.kafkaOffsetForceFromStart) { val topicPartitionIterator = partitions.iterator() - - while (topicPartitionIterator.hasNext()) { - - val topicPartition: TopicPartition = topicPartitionIterator.next() - offsetConsumer.seekToBeginning(topicPartition) - } + offsetConsumer.seekToBeginning(partitions) } } @@ -422,13 +430,14 @@ object KafkaOffsetGetter extends Logging { groupOverviews.foreach((groupOverview: GroupOverview) => { - val groupId: String = groupOverview.groupId; - val consumerGroupSummary: List[AdminClient#ConsumerSummary] = adminClient.describeConsumerGroup(groupId) + val groupId: String = groupOverview.groupId + + val consumerGroupSummary: Option[List[AdminClient#ConsumerSummary]] = adminClient.describeConsumerGroup(groupId).consumers - consumerGroupSummary.foreach((consumerSummary) => { + consumerGroupSummary.get.foreach((consumerSummary) => { val clientId: String = consumerSummary.clientId - val clientHost: String = consumerSummary.clientHost + val clientHost: String = consumerSummary.host val topicPartitions: List[TopicPartition] = consumerSummary.assignment @@ -510,8 +519,10 @@ object KafkaOffsetGetter extends Logging { // Get the LogEndOffset for the TopicPartition val topicPartition: TopicPartition = new TopicPartition(partitionInfo.topic, partitionInfo.partition) + logEndOffsetGetter.assign(Arrays.asList(topicPartition)) - logEndOffsetGetter.seekToEnd(topicPartition) + + logEndOffsetGetter.seekToEnd(ArrayBuffer(topicPartition).asJava) val logEndOffset: Long = logEndOffsetGetter.position(topicPartition) // Update KafkaOffsetStorage diff --git a/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala b/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala index a987086..21be064 100644 --- a/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala +++ b/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala @@ -8,6 +8,7 @@ import com.twitter.util.Time import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo} import kafka.common.TopicAndPartition import kafka.utils.Json +import kafka.utils.json.{JsonArray, JsonValue} import org.I0Itec.zkclient.exception.ZkNoNodeException import org.apache.kafka.common.TopicPartition import org.apache.zookeeper.data.Stat @@ -86,8 +87,14 @@ class StormOffsetGetter(theZkUtils: ZkUtilsWrapper, zkOffsetBase: String) extend println(stateJson) Json.parseFull(stateJson) match { case Some(m) => - val spoutState = m.asInstanceOf[Map[String, Any]] - List(spoutState.getOrElse("topic", "Unknown Topic").toString) + println(m) + val spoutStateValue:JsonValue = m.asInstanceOf[JsonValue] +// spoutStateValue.toString() +// +// val spoutState = spoutStateArray.asInstanceOf[Map[String, Any]] +// +// List(spoutState.getOrElse("topic", "Unknown Topic").toString) + List("testtopic") case None => List() } diff --git a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala index a7fc541..0caa26c 100644 --- a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala +++ b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala @@ -35,15 +35,14 @@ class OffsetGetterArgsWGT extends OffsetGetterArgs { class OffsetGetterArgs extends FieldArgs { - var offsetStorage: String = "zookeeper" + var offsetStorage: String = "kafka" - var kafkaOffsetForceFromStart = false + var kafkaOffsetForceFromStart = true var stormZKOffsetBase = "/stormconsumers" - var kafkaBrokers: String = _ - - var kafkaSecurityProtocol = "PLAINTEXT" + @Required + var consumerConfig: String = _ @Required var zk: String = _ diff --git a/src/main/scala/com/quantifind/utils/UnfilteredWebApp.scala b/src/main/scala/com/quantifind/utils/UnfilteredWebApp.scala index 5308649..e6cc2d4 100644 --- a/src/main/scala/com/quantifind/utils/UnfilteredWebApp.scala +++ b/src/main/scala/com/quantifind/utils/UnfilteredWebApp.scala @@ -24,10 +24,17 @@ trait UnfilteredWebApp[T <: Arguments] extends ArgMain[T] { override def main(parsed: T) { val root = getClass.getResource(htmlRoot) println("serving resources from: " + root) - unfiltered.jetty.Http(parsed.port) - .resources(root) //whatever is not matched by our filter will be served from the resources folder (html, css, ...) - .filter(setup(parsed)) - .run(_ => afterStart(), _ => afterStop()) + if (parsed.protocol.equals("https")) { + unfiltered.jetty.Https(parsed.port) + .resources(root) //whatever is not matched by our filter will be served from the resources folder (html, css, ...) + .filter(setup(parsed)) + .run(_ => afterStart(), _ => afterStop()) + }else{ + unfiltered.jetty.Http(parsed.port) + .resources(root) //whatever is not matched by our filter will be served from the resources folder (html, css, ...) + .filter(setup(parsed)) + .run(_ => afterStart(), _ => afterStop()) + } } } @@ -36,6 +43,7 @@ object UnfilteredWebApp { trait Arguments extends FieldArgs { var port = Port.any + var protocol:String = "http" } } diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties index 8419583..5d60613 100644 --- a/src/test/resources/log4j.properties +++ b/src/test/resources/log4j.properties @@ -1,5 +1,5 @@ # Root logger option -log4j.rootLogger=INFO, stdout +log4j.rootLogger=ERROR, stdout # Direct log messages to stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender diff --git a/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala b/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala index 2f84a6c..7f4777f 100644 --- a/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala +++ b/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala @@ -8,6 +8,7 @@ import kafka.api.{OffsetRequest, OffsetResponse, PartitionOffsetsResponse} import kafka.common.{OffsetAndMetadata, OffsetMetadata, TopicAndPartition} import kafka.coordinator._ import kafka.consumer.SimpleConsumer +import kafka.coordinator.group.GroupTopicPartition import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.mockito.Matchers._ @@ -42,7 +43,7 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers { val topicAndPartition = TopicAndPartition(testTopic, testPartition) val topicPartition = new TopicPartition(testTopic, testPartition) - val groupTopicPartition = GroupTopicPartition(testGroup, TopicAndPartition(testTopic, testPartition)) + val groupTopicPartition = GroupTopicPartition(testGroup, topicPartition) val offsetAndMetadata = OffsetAndMetadata(committedOffset, "meta", System.currentTimeMillis) KafkaOffsetGetter.committedOffsetMap += (groupTopicPartition -> offsetAndMetadata) @@ -51,7 +52,7 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers { when(mockedZkUtil.getLeaderForPartition(MockitoMatchers.eq(testTopic), MockitoMatchers.eq(testPartition))) .thenReturn(Some(testPartitionLeader)) - val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(0, Seq(logEndOffset))) + val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(org.apache.kafka.common.protocol.Errors.ILLEGAL_SASL_STATE, Seq(logEndOffset))) val offsetResponse = OffsetResponse(1, partitionErrorAndOffsets) when(mockedConsumer.getOffsetsBefore(any[OffsetRequest])).thenReturn(offsetResponse) when(offsetGetterSpy.isGroupActive(any[String])).thenReturn(true) @@ -161,7 +162,7 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers { val gtp: GroupTopicPartition = messageOffsetMap._1 val offMeta: OffsetAndMetadata = messageOffsetMap._2 gtp.group shouldBe group - gtp.topicPartition shouldBe TopicAndPartition(topic, partition) + gtp.topicPartition shouldBe new TopicPartition(topic, partition) offMeta shouldBe offsetAndMetadata } } diff --git a/start_kafkaMonitor.sh b/start_kafkaMonitor.sh new file mode 100644 index 0000000..18c737a --- /dev/null +++ b/start_kafkaMonitor.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +ZOOKEEPER_HOST=zk1.server.com + +java -Djava.security.auth.login.config=kafka_client_jaas.conf \ + -Djetty.ssl.keyStore=keystore.jks \ + -Djetty.ssl.keyStorePassword=keypassword \ + -Djetty.ssl.trustStore=truststore.jks \ + -Djetty.ssl.trustStorePassword=trustpassword \ + -cp KafkaOffsetMonitor-assembly-0.4.7-dmilan-SNAPSHOT.jar \ + com.quantifind.kafka.offsetapp.OffsetGetterWeb \ + --zk ${ZOOKEEPER_HOST} \ + --port 8443 \ + --protocol https \ + --refresh 10.seconds \ + --retain 2.days \ + --dbName offsetapp_kafka \ + --consumerConfig //kafka-config.property \ No newline at end of file