Skip to content

Commit

Permalink
Created function tryParseOffsetMessage to attempt to parse a kafka of…
Browse files Browse the repository at this point in the history
…fset message retrieved from the internal committed offset topic:

  * Handles messages of other types and questionable correctness.
  * Added 100% unit-test coverage for this new function.
Add robustness to the log-end-offset getter thread:
  * No longer shutting down the application on error. Instead, closing and destroying the client and re-creating it.
  * Sleeping on error before re-creating client and continuing to process
Deal with thread-safety issues on shared memory between threads that retrieve data from Kafka.
Respect command-line arg kafkaOffsetForceFromStart, starting consumer offset listener clients from the beginning of the log by implementin a ConsumerRebalanceListener.
Begin to reduce usage of Zookeeper:
  * Override getTopics() in KafkaOffsetGetter to retrieve topics directly from the Kafka broker
  * Override getClusterVis() in KafkaOffsetGetter to retrieve cluster
  information directly from the Kafka broker.
Stoped polluting consumer groups in zookeeper by not creatin a unique consumer group name for the consumer-offset and log-end-offset listener at each client instantiation.
Improved createNewAdminClient code, simplifying the error paths and property calling close on error.
Use constants for all property in createNewKafkaConsumer().
Fixed some bad grammar in error messages.
Re-factored some of the error handling paths, simplifying them.
Closing all kafka clients on error so connections do not leak.
Fixed silly com.twitter.util-core dependency in build.sbt.
  • Loading branch information
rcasey212 committed Feb 18, 2017
1 parent fe1c86b commit 286ea8c
Show file tree
Hide file tree
Showing 7 changed files with 879 additions and 489 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ libraryDependencies ++= Seq(
"com.quantifind" %% "sumac" % "0.3.0",
"org.apache.kafka" %% "kafka" % "0.9.0.1",
"org.reflections" % "reflections" % "0.9.10",
"com.twitter" % "util-core_2.11" % "6.40.0",
"com.twitter" %% "util-core" % "6.40.0",
"com.typesafe.slick" %% "slick" % "2.1.0",
"org.xerial" % "sqlite-jdbc" % "3.7.2",
"org.mockito" % "mockito-all" % "1.10.19" % "test",
Expand Down
Loading

0 comments on commit 286ea8c

Please sign in to comment.