From a7aeaa733de89d110330e311723ec3ae7b803118 Mon Sep 17 00:00:00 2001
From: dmilan77
Date: Thu, 20 Dec 2018 10:44:53 -0500
Subject: [PATCH 1/8] Added consumerConfig for Kafka. Needed for SSL/TLS

---
 .../kafka/core/KafkaOffsetGetter.scala    | 33 ++++++++++++-------
 .../kafka/offsetapp/OffsetGetterApp.scala |  2 ++
 2 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala b/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala
index 7ea1c1b..29b00fc 100644
--- a/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala
+++ b/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala
@@ -1,5 +1,6 @@
 package com.quantifind.kafka.core
 
+import java.io.FileInputStream
 import java.nio.{BufferUnderflowException, ByteBuffer}
 import java.util
 import java.util.concurrent.atomic.AtomicReference
@@ -10,7 +11,6 @@ import com.morningstar.kafka.KafkaOffsetMetadata
 import com.morningstar.kafka.KafkaOffsetStorage
 import com.morningstar.kafka.KafkaTopicPartition
 import com.morningstar.kafka.KafkaTopicPartitionLogEndOffset
-
 import com.quantifind.kafka.OffsetGetter.OffsetInfo
 import com.quantifind.kafka.offsetapp.OffsetGetterArgs
 import com.quantifind.kafka.{Node, OffsetGetter}
@@ -195,14 +195,19 @@ object KafkaOffsetGetter extends Logging {
   private def createNewKafkaConsumer(args: OffsetGetterArgs, group: String, autoCommitOffset: Boolean): KafkaConsumer[Array[Byte], Array[Byte]] = {
 
     val props: Properties = new Properties
-    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, args.kafkaBrokers)
-    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, args.kafkaSecurityProtocol)
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, if (autoCommitOffset) "true" else "false")
-    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (args.kafkaOffsetForceFromStart) "earliest" else "latest")
+    if (args.consumerConfig.isEmpty) {
+      props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, args.kafkaBrokers)
+      props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, args.kafkaSecurityProtocol)
+      props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
+      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, if (autoCommitOffset) "true" else "false")
+      props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
+      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (args.kafkaOffsetForceFromStart) "earliest" else "latest")
+    }
+    else {
+      props.load(new FileInputStream(args.consumerConfig))
+    }
 
     new KafkaConsumer[Array[Byte], Array[Byte]](props)
   }
@@ -213,8 +218,14 @@ object KafkaOffsetGetter extends Logging {
 
     var adminClient: AdminClient = null
 
     val props: Properties = new Properties
-    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, args.kafkaBrokers)
-    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, args.kafkaSecurityProtocol)
+
+    if (args.consumerConfig.isEmpty) {
+      props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, args.kafkaBrokers)
+      props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, args.kafkaSecurityProtocol)
+    }
+    else {
+      props.load(new FileInputStream(args.consumerConfig))
+    }
 
     while (null == adminClient) {

diff --git a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala
index a7fc541..54c6016 100644
--- a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala
+++ b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala
@@ -45,6 +45,8 @@ class OffsetGetterArgs extends FieldArgs {
 
   var kafkaSecurityProtocol = "PLAINTEXT"
 
+  var consumerConfig = ""
+
   @Required
   var zk: String = _

From 3e9c1091acd6ddbda5fdeb2d796772ca3f8833da Mon Sep 17 00:00:00 2001
From: dmilan77
Date: Thu, 20 Dec 2018 10:45:13 -0500
Subject: [PATCH 2/8] Added consumerConfig for Kafka. Needed for SSL/TLS

---
 README.md | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 632c3d9..226ee99 100644
--- a/README.md
+++ b/README.md
@@ -66,8 +66,8 @@ java -Djava.security.auth.login.config=conf/server-client-jaas.conf \
      --port 8081 \
      --refresh 10.seconds \
      --retain 2.days \
-     --dbName offsetapp_kafka
-
+     --dbName offsetapp_kafka \
+     --consumerConfig consumerConfig-File-Location
 ```
 The arguments are:
 
 - **offsetStorage** valid options are ''kafka'', ''zookeeper'', or ''storm''. Anything else falls back to ''zookeeper''
 - **zk** the ZooKeeper hosts
 - **kafkaBrokers** comma-separated list of Kafka broker hosts (ex. "host1:port,host2:port"). Required only when using offsetStorage "kafka".
 - **kafkaSecurityProtocol** security protocol to use when connecting to kafka brokers (default: ''PLAINTEXT'', optional: ''SASL_PLAINTEXT'')
+- **consumerConfig** path to a Kafka consumer config file. Needed when connecting to an SSL/TLS-secured Kafka cluster
 - **port** the port on which the app will be made available
 - **refresh** how often should the app refresh and store a point in the DB
 - **retain** how long should points be kept in the DB

From bbe3d5d6dcc64378c0d5643b0a0c72b05de4a515 Mon Sep 17 00:00:00 2001
From: dmilan77
Date: Fri, 21 Dec 2018 15:34:50 -0500
Subject: [PATCH 3/8] * copy DEV to master working tree

---
 README.md | 44 +++++++----
 build.sbt | 8 +-
 kafka-config.property | 15 ++++
 .../kafka/core/KafkaOffsetGetter.scala | 74 +++++++++----------
 .../kafka/offsetapp/OffsetGetterApp.scala | 7 +-
 .../quantifind/utils/UnfilteredWebApp.scala | 16 +++-
 src/test/resources/log4j.properties | 2 +-
 .../kafka/core/KafkaOffsetGetterSpec.scala | 5 +-
 start_kafkaMonitor.sh | 12 +++
 9 files changed, 116 insertions(+), 67 deletions(-)
 create mode 100644 kafka-config.property
 create mode 100644 start_kafkaMonitor.sh

diff --git a/README.md b/README.md
index 226ee99..4e3ca14 100644
--- a/README.md
+++ b/README.md
@@ -55,29 +55,44 @@ If you do not want to build it manually, just download the [current jar](https:/
 This is a small web app, you can run it locally or on a server, as long as you have access to the Kafka broker(s) and ZooKeeper nodes storing kafka data.
 ```
-
+# For http
 java -Djava.security.auth.login.config=conf/server-client-jaas.conf \
      -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \
      com.quantifind.kafka.offsetapp.OffsetGetterWeb \
      --offsetStorage kafka \
-     --kafkaBrokers kafkabroker01:6667,kafkabroker02:6667 \
-     --kafkaSecurityProtocol SASL_PLAINTEXT \
      --zk zkserver01,zkserver02 \
-     --port 8081 \
+     --port 8080 \
      --refresh 10.seconds \
      --retain 2.days \
      --dbName offsetapp_kafka \
-     --consumerConfig consumerConfig-File-Location
+     --consumerConfig consumerConfig-File
+
+# For https
+java -Djava.security.auth.login.config=conf/server-client-jaas.conf \
+     -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \
+     -Djetty.ssl.keyStore=//localhost-keystore.jks \
+     -Djetty.ssl.keyStorePassword=trustme \
+     -Djetty.ssl.trustStore=//all-truststore.jks \
+     -Djetty.ssl.trustStorePassword=trustme \
+     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
+     --offsetStorage kafka \
+     --kafkaSecurityProtocol SASL_SSL \
+     --zk zkserver01,zkserver02 \
+     --port 8080 \
+     --refresh 10.seconds \
+     --retain 2.days \
+     --dbName offsetapp_kafka \
+     --consumerConfig consumerConfig-File
+
 ```
 The arguments are:
 
 - **offsetStorage** valid options are ''kafka'', ''zookeeper'', or ''storm''. Anything else falls back to ''zookeeper''
 - **zk** the ZooKeeper hosts
-- **kafkaBrokers** comma-separated list of Kafka broker hosts (ex. "host1:port,host2:port"). Required only when using offsetStorage "kafka".
-- **kafkaSecurityProtocol** security protocol to use when connecting to kafka brokers (default: ''PLAINTEXT'', optional: ''SASL_PLAINTEXT'')
 - **consumerConfig** path to a Kafka consumer config file. Needed when connecting to an SSL/TLS-secured Kafka cluster
 - **port** the port on which the app will be made available
+- **protocol** `http` or `https` (default: `http`)
 - **refresh** how often should the app refresh and store a point in the DB
 - **retain** how long should points be kept in the DB
 - **dbName** where to store the history (default 'offsetapp')
@@ -127,16 +142,17 @@ As long as this is true you will need to use local maven repo and just publish K
 
 Assuming you have a custom implementation of OffsetInfoReporter in a jar file, running it is as simple as adding the jar to the classpath when running the app:
 
 ```
-java -cp KafkaOffsetMonitor-assembly-0.4.6.jar:kafka-offset-monitor-another-db-reporter.jar \
-     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
-     --zk zkserver01,zkserver02 \
-     --port 8080 \
+java -Djava.security.auth.login.config=//kafka_client_jaas.conf \
+     -cp KafkaOffsetMonitor-assembly-0.4.7-dmilan-SNAPSHOT.jar \
+     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
+     --zk ${MASTER1} \
+     --port 8089 \
      --refresh 10.seconds \
-     --retain 2.days
-     --pluginsArgs anotherDbHost=host1,anotherDbPort=555
+     --retain 2.days \
+     --dbName offsetapp_kafka \
+     --consumerConfig //kafka-config.property
 ```
-For complete working example you can check [kafka-offset-monitor-graphite](https://github.com/allegro/kafka-offset-monitor-graphite), a plugin reporting offset information to Graphite.
 Contributing
 ============

diff --git a/build.sbt b/build.sbt
index 02589f8..758ef71 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,6 +1,6 @@
 name := "KafkaOffsetMonitor"
-version := "0.4.6-SNAPSHOT"
-scalaVersion := "2.11.11"
+version := "0.4.7-dmilan-SNAPSHOT"
+scalaVersion := "2.11.12"
 organization := "com.quantifind"
 
 scalacOptions ++= Seq("-deprecation", "-unchecked", "-optimize", "-feature")
@@ -13,11 +13,11 @@ libraryDependencies ++= Seq(
   "net.databinder" %% "unfiltered-jetty" % "0.8.4",
   "net.databinder" %% "unfiltered-json4s" % "0.8.4",
   "com.quantifind" %% "sumac" % "0.3.0",
-  "org.apache.kafka" %% "kafka" % "0.9.0.1",
+  "org.apache.kafka" %% "kafka" % "1.1.0",
   "org.reflections" % "reflections" % "0.9.11",
   "com.twitter" %% "util-core" % "7.1.0",
   "com.typesafe.slick" %% "slick" % "2.1.0",
-  "org.xerial" % "sqlite-jdbc" % "3.18.0",
+  "org.xerial" % "sqlite-jdbc" % "3.25.2",
   "com.google.code.gson" % "gson" % "2.8.2",
   "com.google.guava" % "guava" % "20.0",
   "javax.ws.rs" % "javax.ws.rs-api" % "2.0-m16",
diff --git a/kafka-config.property b/kafka-config.property
new file mode 100644
index 0000000..4fd24ec
--- /dev/null
+++ b/kafka-config.property
@@ -0,0 +1,15 @@
+bootstrap.servers = localhost:9092
+key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
+key.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer
+#sasl.jaas.config = com.sun.security.auth.module.Krb5LoginModule required useTicketCache=false renewTicket=true serviceName="kafka" useKeyTab=true keyTab="/secure-path/kafka.headless.keytab" principal="kafka@EXAMPLE.COM";
+#sasl.kerberos.service.name = kafka
+#sasl.mechanism = GSSAPI
+security.protocol = PLAINTEXT
+value.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer
+value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
+#ssl.keymanager.algorithm = SunX509
+#ssl.protocol = TLSv1.2
+#ssl.trustmanager.algorithm = SunX509
+#ssl.truststore.location = /secure-path/all-truststore.jks
+#ssl.truststore.password = StRoNgPaSsWoRd
+#ssl.truststore.type = JKS
\ No newline at end of file
diff --git a/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala b/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala
index 29b00fc..94a378c 100644
--- a/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala
+++ b/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala
@@ -16,17 +16,21 @@ import com.quantifind.kafka.offsetapp.OffsetGetterArgs
 import com.quantifind.kafka.{Node, OffsetGetter}
 import com.quantifind.utils.ZkUtilsWrapper
 import com.twitter.util.Time
-import kafka.admin.AdminClient
-import kafka.common.{KafkaException, OffsetAndMetadata, TopicAndPartition}
-import kafka.coordinator._
+import kafka.common.TopicAndPartition
+import kafka.coordinator.group._
 import kafka.utils.Logging
+import kafka.common.OffsetAndMetadata
+import kafka.admin.AdminClient
+import kafka.coordinator.group.GroupTopicPartition
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.common.{PartitionInfo, TopicPartition}
+import org.apache.kafka.common.{KafkaException, PartitionInfo, TopicPartition}
+import scala.collection.mutable.ArrayBuffer
 
 import scala.collection.{mutable, _}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.{Await, Future, duration}
+import scala.collection.JavaConverters._
 
 /**
@@ -43,8 +50,7 @@ class KafkaOffsetGetter(zkUtilsWrapper: ZkUtilsWrapper, args: OffsetGetterArgs)
   override def processPartition(group: String, topic: String, partitionId: Int): Option[OffsetInfo] = {
 
     val topicPartition = new TopicPartition(topic, partitionId)
-    val topicAndPartition = TopicAndPartition(topic, partitionId)
-    val optionalOffsetMetaData: Option[OffsetAndMetadata] = committedOffsetMap.get(GroupTopicPartition(group, topicAndPartition))
+    val optionalOffsetMetaData: Option[OffsetAndMetadata] = committedOffsetMap.get(GroupTopicPartition(group, topicPartition))
 
     if (!optionalOffsetMetaData.isDefined) {
       error(s"processPartition: Could not find group-topic-partition in committedOffsetsMap, g:$group,t:$topic,p:$partitionId")
@@ -195,19 +204,15 @@ object KafkaOffsetGetter extends Logging {
   private def createNewKafkaConsumer(args: OffsetGetterArgs, group: String, autoCommitOffset: Boolean): KafkaConsumer[Array[Byte], Array[Byte]] = {
 
     val props: Properties = new Properties
-    if (args.consumerConfig.isEmpty) {
-      props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, args.kafkaBrokers)
-      props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, args.kafkaSecurityProtocol)
-      props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
-      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, if (autoCommitOffset) "true" else "false")
-      props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
-      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (args.kafkaOffsetForceFromStart) "earliest" else "latest")
-    }
-    else {
-      props.load(new FileInputStream(args.consumerConfig))
-    }
+
+    props.load(new FileInputStream(args.consumerConfig))
+
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, if (autoCommitOffset) "true" else "false")
+    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (args.kafkaOffsetForceFromStart) "earliest" else "latest")
 
     new KafkaConsumer[Array[Byte], Array[Byte]](props)
   }
@@ -219,13 +224,9 @@ object KafkaOffsetGetter extends Logging {
 
     var adminClient: AdminClient = null
 
     val props: Properties = new Properties
-    if (args.consumerConfig.isEmpty) {
-      props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, args.kafkaBrokers)
-      props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, args.kafkaSecurityProtocol)
-    }
-    else {
-      props.load(new FileInputStream(args.consumerConfig))
-    }
+
+    props.load(new FileInputStream(args.consumerConfig))
 
     while (null == adminClient) {
@@ -285,6 +286,7 @@ object KafkaOffsetGetter extends Logging {
 
       if (messageBody != null) {
         val gtp: GroupTopicPartition = b.key
+
         val offsetAndMetadata: OffsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(messageBody))
         return Option(gtp, offsetAndMetadata)
       }
@@ -338,12 +340,7 @@ object KafkaOffsetGetter extends Logging {
       if (args.kafkaOffsetForceFromStart) {
 
         val topicPartitionIterator = partitions.iterator()
-
-        while (topicPartitionIterator.hasNext()) {
-
-          val topicPartition: TopicPartition = topicPartitionIterator.next()
-          offsetConsumer.seekToBeginning(topicPartition)
-        }
+        offsetConsumer.seekToBeginning(partitions)
       }
     }
@@ -433,13 +430,14 @@ object KafkaOffsetGetter extends Logging {
 
       groupOverviews.foreach((groupOverview: GroupOverview) => {
 
-        val groupId: String = groupOverview.groupId;
-        val consumerGroupSummary: List[AdminClient#ConsumerSummary] = adminClient.describeConsumerGroup(groupId)
+        val groupId: String = groupOverview.groupId
+
+        val consumerGroupSummary: Option[List[AdminClient#ConsumerSummary]] = adminClient.describeConsumerGroup(groupId).consumers
 
-        consumerGroupSummary.foreach((consumerSummary) => {
+        consumerGroupSummary.getOrElse(List.empty).foreach((consumerSummary) => {
 
           val clientId: String = consumerSummary.clientId
-          val clientHost: String = consumerSummary.clientHost
+          val clientHost: String = consumerSummary.host
 
           val topicPartitions: List[TopicPartition] = consumerSummary.assignment
@@ -521,8 +519,10 @@ object KafkaOffsetGetter extends Logging {
 
       // Get the LogEndOffset for the TopicPartition
       val topicPartition: TopicPartition = new TopicPartition(partitionInfo.topic, partitionInfo.partition)
+
       logEndOffsetGetter.assign(Arrays.asList(topicPartition))
-      logEndOffsetGetter.seekToEnd(topicPartition)
+
+      logEndOffsetGetter.seekToEnd(ArrayBuffer(topicPartition).asJava)
       val logEndOffset: Long = logEndOffsetGetter.position(topicPartition)
 
       // Update KafkaOffsetStorage
diff --git a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala
index 54c6016..a26b979 100644
--- a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala
+++ b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala
@@ -35,15 +35,12 @@ class OffsetGetterArgsWGT extends OffsetGetterArgs {
 
 class OffsetGetterArgs extends FieldArgs {
 
-  var offsetStorage: String = "zookeeper"
+  var offsetStorage: String = "kafka"
 
-  var kafkaOffsetForceFromStart = false
+  var kafkaOffsetForceFromStart = true
 
   var stormZKOffsetBase = "/stormconsumers"
 
-  var kafkaBrokers: String = _
-
-  var kafkaSecurityProtocol = "PLAINTEXT"
 
   var consumerConfig = ""
 
diff --git a/src/main/scala/com/quantifind/utils/UnfilteredWebApp.scala b/src/main/scala/com/quantifind/utils/UnfilteredWebApp.scala
index 5308649..e6cc2d4 100644
--- a/src/main/scala/com/quantifind/utils/UnfilteredWebApp.scala
+++ b/src/main/scala/com/quantifind/utils/UnfilteredWebApp.scala
@@ -24,10 +24,17 @@ trait UnfilteredWebApp[T <: Arguments] extends ArgMain[T] {
   override def main(parsed: T) {
     val root = getClass.getResource(htmlRoot)
     println("serving resources from: " + root)
-    unfiltered.jetty.Http(parsed.port)
-      .resources(root) //whatever is not matched by our filter will be served from the resources folder (html, css, ...)
-      .filter(setup(parsed))
-      .run(_ => afterStart(), _ => afterStop())
+    if (parsed.protocol.equals("https")) {
+      unfiltered.jetty.Https(parsed.port)
+        .resources(root) //whatever is not matched by our filter will be served from the resources folder (html, css, ...)
+        .filter(setup(parsed))
+        .run(_ => afterStart(), _ => afterStop())
+    } else {
+      unfiltered.jetty.Http(parsed.port)
+        .resources(root) //whatever is not matched by our filter will be served from the resources folder (html, css, ...)
+        .filter(setup(parsed))
+        .run(_ => afterStart(), _ => afterStop())
+    }
   }
 }
@@ -36,6 +43,7 @@ object UnfilteredWebApp {
 
   trait Arguments extends FieldArgs {
     var port = Port.any
+    var protocol: String = "http"
   }
 }
diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties
index 8419583..5d60613 100644
--- a/src/test/resources/log4j.properties
+++ b/src/test/resources/log4j.properties
@@ -1,5 +1,5 @@
 # Root logger option
-log4j.rootLogger=INFO, stdout
+log4j.rootLogger=ERROR, stdout
 
 # Direct log messages to stdout
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
diff --git a/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala b/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala
index 2f84a6c..d05abc6 100644
--- a/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala
+++ b/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala
@@ -8,6 +8,7 @@ import kafka.api.{OffsetRequest, OffsetResponse, PartitionOffsetsResponse}
 import kafka.common.{OffsetAndMetadata, OffsetMetadata, TopicAndPartition}
 import kafka.coordinator._
 import kafka.consumer.SimpleConsumer
+import kafka.coordinator.group.GroupTopicPartition
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.TopicPartition
 import org.mockito.Matchers._
@@ -42,7 +43,7 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {
     val topicAndPartition = TopicAndPartition(testTopic, testPartition)
     val topicPartition = new TopicPartition(testTopic, testPartition)
-    val groupTopicPartition = GroupTopicPartition(testGroup, TopicAndPartition(testTopic, testPartition))
+    val groupTopicPartition = GroupTopicPartition(testGroup, topicPartition)
     val offsetAndMetadata = OffsetAndMetadata(committedOffset, "meta", System.currentTimeMillis)
     KafkaOffsetGetter.committedOffsetMap += (groupTopicPartition -> offsetAndMetadata)
@@ -51,7 +52,7 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {
     when(mockedZkUtil.getLeaderForPartition(MockitoMatchers.eq(testTopic), MockitoMatchers.eq(testPartition)))
       .thenReturn(Some(testPartitionLeader))
 
-    val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(0, Seq(logEndOffset)))
+    val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(org.apache.kafka.common.protocol.Errors.NONE, Seq(logEndOffset)))
     val offsetResponse = OffsetResponse(1, partitionErrorAndOffsets)
     when(mockedConsumer.getOffsetsBefore(any[OffsetRequest])).thenReturn(offsetResponse)
     when(offsetGetterSpy.isGroupActive(any[String])).thenReturn(true)
diff --git a/start_kafkaMonitor.sh b/start_kafkaMonitor.sh
new file mode 100644
index 0000000..b02c41b
--- /dev/null
+++ b/start_kafkaMonitor.sh
@@ -0,0 +1,12 @@
+#!/usr/bin/env bash
+ZOOKEEPER_HOST=zk1.server.com
+
+java -Djava.security.auth.login.config=/opt/interset/etc/jaas/kafka_client_jaas.conf \
+     -cp //KafkaOffsetMonitor/KafkaOffsetMonitor-assembly-0.4.7-dmilan-SNAPSHOT.jar \
+     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
+     --zk ${ZOOKEEPER_HOST} \
+     --port 9144 \
+     --refresh 10.seconds \
+     --retain 2.days \
+     --dbName offsetapp_kafka \
+     --consumerConfig //kafka-config.property
\ No newline at end of file

From 10cd8074cf92a152608c2f0f5214bdd6df54a666 Mon Sep 17 00:00:00 2001
From: dmilan77
Date: Fri, 21 Dec 2018 15:52:35 -0500
Subject: [PATCH 4/8] changed travis

---
 .travis.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index d7ccbba..2e8db24 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,5 +1,5 @@
 language: scala
 scala:
-  - 2.11.8
+  - 2.11.12
 jdk:
-  - oraclejdk8
\ No newline at end of file
+  - oraclejdk8

From ef9156a53f63f2106b756e0b24381d115cca9282 Mon Sep 17 00:00:00 2001
From: dmilan77
Date: Fri, 21 Dec 2018 16:40:44 -0500
Subject: [PATCH 5/8] fixed test code

---
 .../com/quantifind/kafka/core/StormOffsetGetter.scala | 6 ++++--
 .../quantifind/kafka/core/KafkaOffsetGetterSpec.scala | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala b/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala
index a987086..21be064 100644
--- a/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala
+++ b/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala
@@ -8,6 +8,7 @@ import com.twitter.util.Time
 import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.common.TopicAndPartition
 import kafka.utils.Json
+import kafka.utils.json.{JsonArray, JsonValue}
 import org.I0Itec.zkclient.exception.ZkNoNodeException
 import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.data.Stat
@@ -86,8 +87,9 @@ class StormOffsetGetter(theZkUtils: ZkUtilsWrapper, zkOffsetBase: String) extend
       println(stateJson)
       Json.parseFull(stateJson) match {
         case Some(m) =>
-          val spoutState = m.asInstanceOf[Map[String, Any]]
-          List(spoutState.getOrElse("topic", "Unknown Topic").toString)
+          // Kafka 1.1 parses to a JsonValue instead of a raw Map
+          val spoutState: JsonValue = m
+          List(spoutState.asJsonObject.get("topic").map(_.to[String]).getOrElse("Unknown Topic"))
         case None => List()
       }
diff --git a/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala b/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala
index d05abc6..7f4777f 100644
--- a/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala
+++ b/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala
@@ -162,7 +162,7 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {
         val gtp: GroupTopicPartition = messageOffsetMap._1
         val offMeta: OffsetAndMetadata = messageOffsetMap._2
         gtp.group shouldBe group
-        gtp.topicPartition shouldBe TopicAndPartition(topic, partition)
+        gtp.topicPartition shouldBe new TopicPartition(topic, partition)
         offMeta shouldBe offsetAndMetadata
       }
 }

From 2f927944a3a4494d1a9b4eb30baf742114f7f7bb Mon Sep 17 00:00:00 2001
From: dmilan77
Date: Fri, 21 Dec 2018 16:55:32 -0500
Subject: [PATCH 6/8] changed README

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 4e3ca14..f65167b 100644
--- a/README.md
+++ b/README.md
@@ -50,7 +50,7 @@ $ sbt clean assembly
 Running It
 ===========
 
-If you do not want to build it manually, just download the [current jar](https://github.com/Morningstar/kafka-offset-monitor/releases/latest).
+If you do not want to build it manually, just download the [current jar](https://github.com/dmilan77/kafka-offset-monitor/releases/latest).
 
 This is a small web app, you can run it locally or on a server, as long as you have access to the Kafka broker(s) and ZooKeeper nodes storing kafka data.
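Note on the JSON change in PATCH 5/8: after the Kafka 1.1.0 upgrade in PATCH 3/8, `kafka.utils.Json.parseFull` returns an `Option[JsonValue]` rather than an `Option[Any]`, so the old `asInstanceOf[Map[String, Any]]` cast in StormOffsetGetter no longer applies. A minimal sketch of reading the spout-state topic through the new `kafka.utils.json` API follows; the state JSON is a hypothetical example, not taken from the patches:

```
import kafka.utils.Json
import kafka.utils.json.JsonValue

object SpoutStateExample extends App {
  // Hypothetical spout state of the shape StormOffsetGetter reads from ZooKeeper.
  val stateJson = """{"topic": "events", "offset": 42, "partition": 0}"""

  val topic: String = Json.parseFull(stateJson) match {
    case Some(js: JsonValue) =>
      // asJsonObject fails for non-object values; get returns None for missing fields.
      js.asJsonObject.get("topic").map(_.to[String]).getOrElse("Unknown Topic")
    case None => "Unknown Topic"
  }

  println(topic) // prints: events
}
```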
From 04dd40566096ba157dd6478e8ce13248cbaff916 Mon Sep 17 00:00:00 2001
From: dmilan77
Date: Tue, 25 Dec 2018 14:23:33 -0500
Subject: [PATCH 7/8] changed SSL documentation

---
 README.md | 2 +-
 start_kafkaMonitor.sh | 11 ++++++++---
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index f65167b..fcf357b 100644
--- a/README.md
+++ b/README.md
@@ -76,7 +76,7 @@ java -Djava.security.auth.login.config=conf/server-client-jaas.conf \
      -Djetty.ssl.trustStorePassword=trustme \
      com.quantifind.kafka.offsetapp.OffsetGetterWeb \
      --offsetStorage kafka \
-     --kafkaSecurityProtocol SASL_SSL \
+     --protocol https \
      --zk zkserver01,zkserver02 \
      --port 8080 \
      --refresh 10.seconds \
diff --git a/start_kafkaMonitor.sh b/start_kafkaMonitor.sh
index b02c41b..18c737a 100644
--- a/start_kafkaMonitor.sh
+++ b/start_kafkaMonitor.sh
@@ -1,11 +1,16 @@
 #!/usr/bin/env bash
 ZOOKEEPER_HOST=zk1.server.com
 
-java -Djava.security.auth.login.config=/opt/interset/etc/jaas/kafka_client_jaas.conf \
-     -cp //KafkaOffsetMonitor/KafkaOffsetMonitor-assembly-0.4.7-dmilan-SNAPSHOT.jar \
+java -Djava.security.auth.login.config=kafka_client_jaas.conf \
+     -Djetty.ssl.keyStore=keystore.jks \
+     -Djetty.ssl.keyStorePassword=keypassword \
+     -Djetty.ssl.trustStore=truststore.jks \
+     -Djetty.ssl.trustStorePassword=trustpassword \
+     -cp KafkaOffsetMonitor-assembly-0.4.7-dmilan-SNAPSHOT.jar \
      com.quantifind.kafka.offsetapp.OffsetGetterWeb \
      --zk ${ZOOKEEPER_HOST} \
-     --port 9144 \
+     --port 8443 \
+     --protocol https \
      --refresh 10.seconds \
      --retain 2.days \
      --dbName offsetapp_kafka \

From 7dc9f243f677eb11d2b81ba65348e1732e73cd8a Mon Sep 17 00:00:00 2001
From: dmilan77
Date: Wed, 26 Dec 2018 10:31:51 -0500
Subject: [PATCH 8/8] consumerConfig required

---
 .../com/quantifind/kafka/offsetapp/OffsetGetterApp.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala
index a26b979..0caa26c 100644
--- a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala
+++ b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala
@@ -41,8 +41,8 @@ class OffsetGetterArgs extends FieldArgs {
 
   var stormZKOffsetBase = "/stormconsumers"
 
-
-  var consumerConfig = ""
+  @Required
+  var consumerConfig: String = _
 
   @Required
   var zk: String = _
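With PATCH 8/8, `--consumerConfig` is mandatory, so every broker-connection setting lives in that file. For reference, a minimal sketch of such a file for an SSL/TLS-secured cluster; the hostnames, paths, and passwords are placeholders, and the keystore entries apply only when the brokers require mutual TLS:

```
bootstrap.servers=kafkabroker01:9093,kafkabroker02:9093
security.protocol=SSL
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.truststore.location=/secure-path/all-truststore.jks
ssl.truststore.password=StRoNgPaSsWoRd
# Uncomment when the brokers require client (mutual) TLS authentication:
#ssl.keystore.location=/secure-path/client-keystore.jks
#ssl.keystore.password=StRoNgPaSsWoRd
#ssl.key.password=StRoNgPaSsWoRd
```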