Supports Kafka consumer config file parameter #39

Open · wants to merge 8 commits into base: master
4 changes: 2 additions & 2 deletions .travis.yml
@@ -1,5 +1,5 @@
language: scala
scala:
- 2.11.8
- 2.11.12
jdk:
- oraclejdk8
- oraclejdk8
47 changes: 32 additions & 15 deletions README.md
@@ -50,33 +50,49 @@ $ sbt clean assembly
Running It
===========

If you do not want to build it manually, just download the [current jar](https://github.com/Morningstar/kafka-offset-monitor/releases/latest).
If you do not want to build it manually, just download the [current jar](https://github.com/dmilan77/kafka-offset-monitor/releases/latest).

This is a small web app; you can run it locally or on a server, as long as you have access to the Kafka broker(s) and the ZooKeeper nodes storing Kafka data.

```

# For http
java -Djava.security.auth.login.config=conf/server-client-jaas.conf \
-cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--zk zkserver01,zkserver02 \
--port 8080 \
--refresh 10.seconds \
--retain 2.days \
--dbName offsetapp_kafka \
--consumerConfig consumerConfig-File

# For https
java -Djava.security.auth.login.config=conf/server-client-jaas.conf \
-cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \
-Djetty.ssl.keyStore=/<path-to-keystore>/localhost-keystore.jks \
-Djetty.ssl.keyStorePassword=trustme \
-Djetty.ssl.trustStore=/<path-to-truststore>/all-truststore.jks \
-Djetty.ssl.trustStorePassword=trustme \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--kafkaBrokers kafkabroker01:6667,kafkabroker02:6667 \
--kafkaSecurityProtocol SASL_PLAINTEXT \
--protocol https \
--zk zkserver01,zkserver02 \
--port 8081 \
--port 8080 \
--refresh 10.seconds \
--retain 2.days \
--dbName offsetapp_kafka
--dbName offsetapp_kafka \
--consumerConfig consumerConfig-File

```

The arguments are:

- **offsetStorage** valid options are ''kafka'', ''zookeeper'', or ''storm''. Anything else falls back to ''zookeeper''
- **zk** the ZooKeeper hosts
- **kafkaBrokers** comma-separated list of Kafka broker hosts (e.g. "host1:port,host2:port"). Required only when using offsetStorage "kafka".
- **kafkaSecurityProtocol** security protocol to use when connecting to kafka brokers (default: ''PLAINTEXT'', optional: ''SASL_PLAINTEXT'')
- **consumerConfig** path to a Kafka consumer config (properties) file; needed for SSL/TLS- or SASL-secured Kafka (a minimal sketch follows this list)
- **port** the port on which the app will be made available
- **protocol** `http` or `https`. Default: `http`
- **refresh** how often should the app refresh and store a point in the DB
- **retain** how long should points be kept in the DB
- **dbName** where to store the history (default 'offsetapp')
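
A minimal `--consumerConfig` file is plain Kafka consumer properties. A sketch for a SASL_PLAINTEXT cluster (host and service name are placeholders; see `kafka-config.property` in this PR for a fuller template):

```
bootstrap.servers=kafkabroker01:6667
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
```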
@@ -126,16 +142,17 @@ As long as this is true you will need to use local maven repo and just publish K
Assuming you have a custom implementation of OffsetInfoReporter in a jar file, running it is as simple as adding the jar to the classpath when running the app:

```
java -cp KafkaOffsetMonitor-assembly-0.4.6.jar:kafka-offset-monitor-another-db-reporter.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk zkserver01,zkserver02 \
--port 8080 \
java -Djava.security.auth.login.config=/<path>/kafka_client_jaas.conf \
-cp KafkaOffsetMonitor-assembly-0.4.7-dmilan-SNAPSHOT.jar:kafka-offset-monitor-another-db-reporter.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk ${MASTER1} \
--port 8089 \
--refresh 10.seconds \
--retain 2.days
--pluginsArgs anotherDbHost=host1,anotherDbPort=555
--retain 2.days \
--dbName offsetapp_kafka \
--consumerConfig /<path-to-jaas>/kafka-config.property \
--pluginsArgs anotherDbHost=host1,anotherDbPort=555
```

For a complete working example, see [kafka-offset-monitor-graphite](https://github.com/allegro/kafka-offset-monitor-graphite), a plugin reporting offset information to Graphite.

Contributing
============
8 changes: 4 additions & 4 deletions build.sbt
@@ -1,6 +1,6 @@
name := "KafkaOffsetMonitor"
version := "0.4.6-SNAPSHOT"
scalaVersion := "2.11.11"
version := "0.4.7-dmilan-SNAPSHOT"
scalaVersion := "2.11.12"
organization := "com.quantifind"

scalacOptions ++= Seq("-deprecation", "-unchecked", "-optimize", "-feature")
@@ -13,11 +13,11 @@ libraryDependencies ++= Seq(
"net.databinder" %% "unfiltered-jetty" % "0.8.4",
"net.databinder" %% "unfiltered-json4s" % "0.8.4",
"com.quantifind" %% "sumac" % "0.3.0",
"org.apache.kafka" %% "kafka" % "0.9.0.1",
"org.apache.kafka" %% "kafka" % "1.1.0",
"org.reflections" % "reflections" % "0.9.11",
"com.twitter" %% "util-core" % "7.1.0",
"com.typesafe.slick" %% "slick" % "2.1.0",
"org.xerial" % "sqlite-jdbc" % "3.18.0",
"org.xerial" % "sqlite-jdbc" % "3.25.2",
"com.google.code.gson" % "gson" % "2.8.2",
"com.google.guava" % "guava" % "20.0",
"javax.ws.rs" % "javax.ws.rs-api" % "2.0-m16",
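(The jump from Kafka 0.9.0.1 to 1.1.0 is what drives most of the source changes below: the coordinator classes moved into `kafka.coordinator.group`, `GroupTopicPartition` is now keyed by the client-side `TopicPartition`, the consumer's seek methods take collections, and `AdminClient.describeConsumerGroup` returns a summary object instead of a bare list.)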
15 changes: 15 additions & 0 deletions kafka-config.property
@@ -0,0 +1,15 @@
bootstrap.servers = localhost:9092
key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
key.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer
#sasl.jaas.config = com.sun.security.auth.module.Krb5LoginModule required useTicketCache=false renewTicket=true serviceName="kafka" useKeyTab=true keyTab="/secure-path/kafka.headless.keytab" principal="[email protected]";
#sasl.kerberos.service.name = kafka
#sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
value.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer
value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
#ssl.keymanager.algorithm = SunX509
#ssl.protocol = TLSv1.2
#ssl.trustmanager.algorithm = SunX509
#ssl.truststore.location = /secure-path/all-truststore.jks
#ssl.truststore.password = StRoNgPaSsWoRd
#ssl.truststore.type = JKS
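
For a TLS listener, a sketch of the same file with the `ssl.*` block enabled instead (paths and the password are the placeholder values from the comments above):

```
security.protocol = SSL
ssl.protocol = TLSv1.2
ssl.truststore.location = /secure-path/all-truststore.jks
ssl.truststore.password = StRoNgPaSsWoRd
ssl.truststore.type = JKS
```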
55 changes: 33 additions & 22 deletions src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala
@@ -1,5 +1,6 @@
package com.quantifind.kafka.core

import java.io.FileInputStream
import java.nio.{BufferUnderflowException, ByteBuffer}
import java.util
import java.util.concurrent.atomic.AtomicReference
@@ -10,23 +11,29 @@ import com.morningstar.kafka.KafkaOffsetMetadata
import com.morningstar.kafka.KafkaOffsetStorage
import com.morningstar.kafka.KafkaTopicPartition
import com.morningstar.kafka.KafkaTopicPartitionLogEndOffset

import com.quantifind.kafka.OffsetGetter.OffsetInfo
import com.quantifind.kafka.offsetapp.OffsetGetterArgs
import com.quantifind.kafka.{Node, OffsetGetter}
import com.quantifind.utils.ZkUtilsWrapper
import com.twitter.util.Time
import kafka.admin.AdminClient
import kafka.common.{KafkaException, OffsetAndMetadata, TopicAndPartition}
import kafka.coordinator._
import kafka.common.TopicAndPartition
import kafka.coordinator.group._
import kafka.utils.Logging
import kafka.common.OffsetAndMetadata
import kafka.admin.AdminClient
import kafka.coordinator.group.GroupTopicPartition
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import org.apache.kafka.common.{KafkaException, PartitionInfo, TopicPartition}

import scala.collection.mutable.ArrayBuffer
import scala.collection.{mutable, _}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future, duration}
import scala.collection.JavaConverters._

/**
@@ -43,8 +50,8 @@ class KafkaOffsetGetter(zkUtilsWrapper: ZkUtilsWrapper, args: OffsetGetterArgs)
override def processPartition(group: String, topic: String, partitionId: Int): Option[OffsetInfo] = {

val topicPartition = new TopicPartition(topic, partitionId)
val topicAndPartition = TopicAndPartition(topic, partitionId)
val optionalOffsetMetaData: Option[OffsetAndMetadata] = committedOffsetMap.get(GroupTopicPartition(group, topicAndPartition))
val optionalOffsetMetaData: Option[OffsetAndMetadata] = committedOffsetMap.get(GroupTopicPartition(group, topicPartition))

if (optionalOffsetMetaData.isEmpty) {
error(s"processPartition: Could not find group-topic-partition in committedOffsetsMap, g:$group,t:$topic,p:$partitionId")
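
The substantive change in this hunk: Kafka 1.1.0's `GroupTopicPartition` (now in `kafka.coordinator.group`) wraps the client-side `org.apache.kafka.common.TopicPartition` rather than the old core `TopicAndPartition`, so the committed-offset map is keyed accordingly. A minimal sketch of the new lookup, using the names from the surrounding code:

```scala
import kafka.coordinator.group.GroupTopicPartition
import org.apache.kafka.common.TopicPartition

// committedOffsetMap: concurrent.Map[GroupTopicPartition, OffsetAndMetadata]
val key = GroupTopicPartition(group, new TopicPartition(topic, partitionId))
val committed = committedOffsetMap.get(key) // Option[OffsetAndMetadata]
```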
@@ -181,6 +188,8 @@ class KafkaOffsetGetter(zkUtilsWrapper: ZkUtilsWrapper, args: OffsetGetterArgs)

object KafkaOffsetGetter extends Logging {

val committedOffsetMap: concurrent.Map[GroupTopicPartition, OffsetAndMetadata] = concurrent.TrieMap()
val logEndOffsetsMap: concurrent.Map[TopicPartition, Long] = concurrent.TrieMap()
val kafkaOffsetStorage: KafkaOffsetStorage = new KafkaOffsetStorage();
@@ -195,8 +204,9 @@ object KafkaOffsetGetter extends Logging {
private def createNewKafkaConsumer(args: OffsetGetterArgs, group: String, autoCommitOffset: Boolean): KafkaConsumer[Array[Byte], Array[Byte]] = {

val props: Properties = new Properties
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, args.kafkaBrokers)
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, args.kafkaSecurityProtocol)

props.load(new FileInputStream(args.consumerConfig))

props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, if (autoCommitOffset) "true" else "false")
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
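
Condensed, the configuration pattern the consumer (and the AdminClient below) now follows: seed all connection and security settings from the external properties file, then layer instance-specific settings on top. A sketch, with `consumerConfigPath` standing in for `args.consumerConfig`:

```scala
import java.io.FileInputStream
import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerConfig

def loadConsumerProps(consumerConfigPath: String, group: String): Properties = {
  val props = new Properties
  // bootstrap.servers, security.protocol, SASL/SSL settings, etc. all come
  // from the file instead of individual CLI flags
  val in = new FileInputStream(consumerConfigPath)
  try props.load(in) finally in.close()
  // per-instance settings still override whatever the file contains
  props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
  props
}
```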
@@ -213,8 +223,10 @@
var adminClient: AdminClient = null

val props: Properties = new Properties
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, args.kafkaBrokers)
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, args.kafkaSecurityProtocol)

props.load(new FileInputStream(args.consumerConfig))

while (null == adminClient) {

@@ -274,6 +286,7 @@ object KafkaOffsetGetter extends Logging {
if (messageBody != null) {

val gtp: GroupTopicPartition = b.key

val offsetAndMetadata: OffsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(messageBody))
return Option(gtp, offsetAndMetadata)
}
@@ -327,12 +340,7 @@ object KafkaOffsetGetter extends Logging {
if (args.kafkaOffsetForceFromStart) {

val topicPartitionIterator = partitions.iterator()

while (topicPartitionIterator.hasNext()) {

val topicPartition: TopicPartition = topicPartitionIterator.next()
offsetConsumer.seekToBeginning(topicPartition)
}
offsetConsumer.seekToBeginning(partitions)
}
}

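(For context: the per-partition loop was required by the 0.9.x client, whose `seekToBeginning` took varargs; clients from 0.10.1 onward take a `java.util.Collection[TopicPartition]`, so the whole assignment can be rewound in one call, as above.)
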
@@ -422,13 +430,14 @@

groupOverviews.foreach((groupOverview: GroupOverview) => {

val groupId: String = groupOverview.groupId;
val consumerGroupSummary: List[AdminClient#ConsumerSummary] = adminClient.describeConsumerGroup(groupId)
val groupId: String = groupOverview.groupId

val consumerGroupSummary: Option[List[AdminClient#ConsumerSummary]] = adminClient.describeConsumerGroup(groupId).consumers

consumerGroupSummary.foreach((consumerSummary) => {
// guard against groups with no active members (consumers == None)
consumerGroupSummary.getOrElse(List.empty).foreach((consumerSummary) => {

val clientId: String = consumerSummary.clientId
val clientHost: String = consumerSummary.clientHost
val clientHost: String = consumerSummary.host

val topicPartitions: List[TopicPartition] = consumerSummary.assignment

@@ -510,8 +519,10 @@

// Get the LogEndOffset for the TopicPartition
val topicPartition: TopicPartition = new TopicPartition(partitionInfo.topic, partitionInfo.partition)

logEndOffsetGetter.assign(Arrays.asList(topicPartition))
logEndOffsetGetter.seekToEnd(topicPartition)

logEndOffsetGetter.seekToEnd(ArrayBuffer(topicPartition).asJava)
val logEndOffset: Long = logEndOffsetGetter.position(topicPartition)

// Update KafkaOffsetStorage
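Condensed, the assign/seek/position sequence above is how a bare (group-less) consumer reads a partition's log-end offset; a sketch with an illustrative method name:

```scala
import java.util.Collections

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

def logEndOffset(consumer: KafkaConsumer[Array[Byte], Array[Byte]], tp: TopicPartition): Long = {
  consumer.assign(Collections.singletonList(tp))     // point the consumer at exactly one partition
  consumer.seekToEnd(Collections.singletonList(tp))  // mark it for seeking to the end
  consumer.position(tp)                              // fetches and returns the log-end offset
}
```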
11 changes: 9 additions & 2 deletions src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala
@@ -8,6 +8,7 @@ import com.twitter.util.Time
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.utils.Json
import kafka.utils.json.JsonValue
import org.I0Itec.zkclient.exception.ZkNoNodeException
import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.data.Stat
@@ -86,8 +87,14 @@ class StormOffsetGetter(theZkUtils: ZkUtilsWrapper, zkOffsetBase: String) extend
Json.parseFull(stateJson) match {
case Some(m) =>
val spoutState = m.asInstanceOf[Map[String, Any]]
List(spoutState.getOrElse("topic", "Unknown Topic").toString)
// Kafka 1.1.0's Json.parseFull returns a kafka.utils.json.JsonValue instead of nested Maps;
// read the top-level "topic" field through the JsonObject helpers
val spoutStateValue: JsonValue = m
val spoutState = spoutStateValue.asJsonObject
List(spoutState.get("topic").map(_.to[String]).getOrElse("Unknown Topic"))
case None =>
List()
}
@@ -35,15 +35,14 @@ class OffsetGetterArgsWGT extends OffsetGetterArgs {

class OffsetGetterArgs extends FieldArgs {

var offsetStorage: String = "zookeeper"
var offsetStorage: String = "kafka"

var kafkaOffsetForceFromStart = false
var kafkaOffsetForceFromStart = true

var stormZKOffsetBase = "/stormconsumers"

var kafkaBrokers: String = _

var kafkaSecurityProtocol = "PLAINTEXT"
@Required
var consumerConfig: String = _

@Required
var zk: String = _
16 changes: 12 additions & 4 deletions src/main/scala/com/quantifind/utils/UnfilteredWebApp.scala
@@ -24,10 +24,17 @@ trait UnfilteredWebApp[T <: Arguments] extends ArgMain[T] {
override def main(parsed: T) {
val root = getClass.getResource(htmlRoot)
println("serving resources from: " + root)
unfiltered.jetty.Http(parsed.port)
.resources(root) //whatever is not matched by our filter will be served from the resources folder (html, css, ...)
.filter(setup(parsed))
.run(_ => afterStart(), _ => afterStop())
if (parsed.protocol.equals("https")) {
unfiltered.jetty.Https(parsed.port)
.resources(root) //whatever is not matched by our filter will be served from the resources folder (html, css, ...)
.filter(setup(parsed))
.run(_ => afterStart(), _ => afterStop())
} else {
unfiltered.jetty.Http(parsed.port)
.resources(root) //whatever is not matched by our filter will be served from the resources folder (html, css, ...)
.filter(setup(parsed))
.run(_ => afterStart(), _ => afterStop())
}
}

}
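
Note that the `https` branch does not configure TLS itself: unfiltered's `Https` server reads its keystore from JVM system properties, which is why the README example in this PR launches the app with flags like the following (paths and passwords are placeholders):

```
-Djetty.ssl.keyStore=/<path-to-keystore>/localhost-keystore.jks
-Djetty.ssl.keyStorePassword=trustme
-Djetty.ssl.trustStore=/<path-to-truststore>/all-truststore.jks
-Djetty.ssl.trustStorePassword=trustme
```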
@@ -36,6 +43,7 @@ object UnfilteredWebApp {

trait Arguments extends FieldArgs {
var port = Port.any
var protocol: String = "http"
}

}
2 changes: 1 addition & 1 deletion src/test/resources/log4j.properties
@@ -1,5 +1,5 @@
# Root logger option
log4j.rootLogger=INFO, stdout
log4j.rootLogger=ERROR, stdout

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
@@ -8,6 +8,7 @@ import kafka.api.{OffsetRequest, OffsetResponse, PartitionOffsetsResponse}
import kafka.common.{OffsetAndMetadata, OffsetMetadata, TopicAndPartition}
import kafka.coordinator._
import kafka.consumer.SimpleConsumer
import kafka.coordinator.group.GroupTopicPartition
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.mockito.Matchers._
@@ -42,7 +43,7 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {

val topicAndPartition = TopicAndPartition(testTopic, testPartition)
val topicPartition = new TopicPartition(testTopic, testPartition)
val groupTopicPartition = GroupTopicPartition(testGroup, TopicAndPartition(testTopic, testPartition))
val groupTopicPartition = GroupTopicPartition(testGroup, topicPartition)
val offsetAndMetadata = OffsetAndMetadata(committedOffset, "meta", System.currentTimeMillis)

KafkaOffsetGetter.committedOffsetMap += (groupTopicPartition -> offsetAndMetadata)
@@ -51,7 +52,7 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {
when(mockedZkUtil.getLeaderForPartition(MockitoMatchers.eq(testTopic), MockitoMatchers.eq(testPartition)))
.thenReturn(Some(testPartitionLeader))

val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(0, Seq(logEndOffset)))
val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(org.apache.kafka.common.protocol.Errors.NONE, Seq(logEndOffset)))
val offsetResponse = OffsetResponse(1, partitionErrorAndOffsets)
when(mockedConsumer.getOffsetsBefore(any[OffsetRequest])).thenReturn(offsetResponse)
when(offsetGetterSpy.isGroupActive(any[String])).thenReturn(true)
@@ -161,7 +162,7 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {
val gtp: GroupTopicPartition = messageOffsetMap._1
val offMeta: OffsetAndMetadata = messageOffsetMap._2
gtp.group shouldBe group
gtp.topicPartition shouldBe TopicAndPartition(topic, partition)
gtp.topicPartition shouldBe new TopicPartition(topic, partition)
offMeta shouldBe offsetAndMetadata
}
}