Skip to content

Commit 7c03486

Browse files
committed
Merge pull request #168 from mesos/kafka_0_9_support
Kafka_0_9_support
2 parents d4c4072 + 65318dd commit 7c03486

File tree

9 files changed

+127
-17
lines changed

9 files changed

+127
-17
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ out/*
22
project/*
33
.gradle
44
*.iml
5+
*.ipr
56
todo.txt
67

78
src/docker/.docker

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
apply plugin: 'scala'
22
apply plugin: 'idea'
33

4-
version = '0.9.3.0'
4+
version = '0.9.4.0'
55

66
jar.archiveName = "kafka-mesos-${version}.jar"
77

kafka-mesos.properties

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ debug=true
33

44
user=vagrant
55

6-
storage=file:kafka-mesos.json
6+
storage=zk:mesos-kafka-scheduler
77

88
master=master:5050
99

10-
zk=master:2181
11-
12-
api=http://192.168.3.1:7000
10+
zk=master:2181/chroot
1311

12+
#for testing on the vagrant master via ./kafka-mesos.sh scheduler
13+
#you will eventually want to run this on a scheduler i.e marathon
14+
#change the IP to what is service discoverable & routable for your setup
15+
api=http://192.168.3.5:7000

src/scala/ly/stealth/mesos/kafka/BrokerServer.scala

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,27 +81,46 @@ object BrokerServer {
8181
val serverClass = loader.loadClass("kafka.server.KafkaServerStartable")
8282
val configClass = loader.loadClass("kafka.server.KafkaConfig")
8383

84-
val props: Properties = this.props(options, "server.properties")
85-
val config: Object = configClass.getConstructor(classOf[Properties]).newInstance(props).asInstanceOf[Object]
84+
val config: Object = newKafkaConfig(this.props(options, "server.properties"))
8685
val server: Object = serverClass.getConstructor(configClass).newInstance(config).asInstanceOf[Object]
8786

8887
server
8988
}
9089

9190
def startReporters(options: util.Map[String, String]): Object = {
92-
val configClass = loader.loadClass("kafka.server.KafkaConfig")
93-
94-
val props: Properties = this.props(options, "server.properties")
95-
val config: Object = configClass.getConstructor(classOf[Properties]).newInstance(props).asInstanceOf[Object]
96-
9791
val metricsReporter = loader.loadClass("kafka.metrics.KafkaMetricsReporter$").getField("MODULE$").get(null)
9892
val metricsReporterClass = metricsReporter.getClass
99-
val verifiableProps = config.getClass.getMethod("props").invoke(config)
93+
94+
val props = this.props(options, "server.properties")
95+
val verifiablePropsClass: Class[_] = loader.loadClass("kafka.utils.VerifiableProperties")
96+
val verifiableProps: Object = verifiablePropsClass.getConstructor(classOf[Properties]).newInstance(props).asInstanceOf[Object]
97+
10098
metricsReporterClass.getMethod("startReporters", verifiableProps.getClass).invoke(metricsReporter, verifiableProps)
10199
}
100+
101+
private def newKafkaConfig(props: Properties): Object = {
102+
val configClass = loader.loadClass("kafka.server.KafkaConfig")
103+
var config: Object = null
102104

105+
// in kafka <= 0.8.x constructor is KafkaConfig(java.util.Properties)
106+
try { config = configClass.getConstructor(classOf[Properties]).newInstance(props).asInstanceOf[Object] }
107+
catch { case e: NoSuchMethodException => }
108+
109+
if (config == null) {
110+
// in kafka 0.9.0.0 constructor is KafkaConfig(java.util.Map[_,_])
111+
val map: util.Map[_,_] = props.toMap.asInstanceOf[Map[_,_]]
112+
try { config = configClass.getConstructor(classOf[util.Map[String, String]]).newInstance(map).asInstanceOf[Object] }
113+
catch { case e: NoSuchMethodError => }
114+
}
115+
116+
if (config == null) throw new IllegalStateException("Can't create KafkaConfig. Unsupported kafka distro?")
117+
config
118+
}
119+
103120
def configureLog4j(options: util.Map[String, String]): Unit = {
121+
System.setProperty("kafka.logs.dir", "" + new File(Distro.dir, "log"))
104122
val props: Properties = this.props(options, "log4j.properties")
123+
105124
val configurator: Class[_] = loader.loadClass("org.apache.log4j.PropertyConfigurator")
106125
configurator.getMethod("configure", classOf[Properties]).invoke(null, props)
107126
}
@@ -141,13 +160,31 @@ object BrokerServer {
141160
// This is required, because current jar have classes incompatible with classes from kafka distro.
142161
class Loader(urls: Array[URL]) extends URLClassLoader(urls) {
143162
val snappyHackedClasses = Array[String]("org.xerial.snappy.SnappyNativeAPI", "org.xerial.snappy.SnappyNative", "org.xerial.snappy.SnappyErrorCode")
163+
var snappyHackEnabled = false
164+
checkSnappyVersion
165+
166+
def checkSnappyVersion {
167+
var jarName: String = null
168+
for (url <- urls) {
169+
val fileName = new File(url.getFile).getName
170+
if (fileName.matches("snappy.*jar")) jarName = fileName
171+
}
172+
173+
if (jarName == null) return
174+
val hIdx = jarName.lastIndexOf("-")
175+
val extIdx = jarName.lastIndexOf(".jar")
176+
if (hIdx == -1 || extIdx == -1) return
177+
178+
val version = new Util.Version(jarName.substring(hIdx + 1, extIdx))
179+
snappyHackEnabled = version.compareTo(new Util.Version(1,1,0)) <= 0
180+
}
144181

145182
override protected def loadClass(name: String, resolve: Boolean): Class[_] = {
146183
getClassLoadingLock(name) synchronized {
147184
// Handle Snappy class loading hack:
148185
// Snappy injects 3 classes and native lib to root ClassLoader
149186
// See - org.xerial.snappy.SnappyLoader.injectSnappyNativeLoader
150-
if (snappyHackedClasses.contains(name))
187+
if (snappyHackEnabled && snappyHackedClasses.contains(name))
151188
return super.loadClass(name, true)
152189

153190
// Check class is loaded

src/scala/ly/stealth/mesos/kafka/HttpServer.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import scala.util.parsing.json.JSONObject
3434
object HttpServer {
3535
var jar: File = null
3636
var kafkaDist: File = null
37+
var kafkaVersion: Util.Version = null
3738

3839
val logger = Logger.getLogger(HttpServer.getClass)
3940
var server: Server = null
@@ -90,6 +91,13 @@ object HttpServer {
9091

9192
if (jar == null) throw new IllegalStateException(jarMask + " not found in current dir")
9293
if (kafkaDist == null) throw new IllegalStateException(kafkaMask + " not found in in current dir")
94+
95+
// extract version
96+
val distName: String = kafkaDist.getName
97+
val tgzIdx = distName.lastIndexOf(".tgz")
98+
val hIdx = distName.lastIndexOf("-")
99+
if (tgzIdx == -1 || hIdx == -1) throw new IllegalStateException("Can't extract version number from " + distName)
100+
kafkaVersion = new Util.Version(distName.substring(hIdx + 1, tgzIdx))
93101
}
94102

95103
private class Servlet extends HttpServlet {

src/scala/ly/stealth/mesos/kafka/Scheduler.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import java.util
2424
import com.google.protobuf.ByteString
2525
import java.util.{Collections, Date}
2626
import scala.collection.JavaConversions._
27-
import ly.stealth.mesos.kafka.Util.{Period, Str}
27+
import ly.stealth.mesos.kafka.Util.{Version, Period, Str}
2828

2929
object Scheduler extends org.apache.mesos.Scheduler {
3030
private val logger: Logger = Logger.getLogger(this.getClass)
@@ -70,6 +70,9 @@ object Scheduler extends org.apache.mesos.Scheduler {
7070
"host.name" -> offer.getHostname
7171
)
7272

73+
if (HttpServer.kafkaVersion.compareTo(new Version("0.9")) >= 0)
74+
defaults += ("listeners" -> s"PLAINTEXT://:${reservation.port}")
75+
7376
if (reservation.volume != null)
7477
defaults += ("log.dirs" -> "data/kafka-logs")
7578

src/scala/ly/stealth/mesos/kafka/Util.scala

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,38 @@ object Util {
240240
override def toString: String = if (start == end) "" + start else start + ".." + end
241241
}
242242

243+
class Version(s: String) extends Comparable[Version] {
244+
private var parts: Array[Int] = null
245+
246+
def this(args: Int*) {
247+
this(args.mkString("."))
248+
}
249+
250+
parse
251+
private def parse {
252+
parts = if (s != "") s.split("\\.", -1).map(Integer.parseInt) else new Array[Int](0)
253+
}
254+
255+
def asList: List[Int] = parts.toList
256+
257+
def compareTo(v: Version): Int = {
258+
for (i <- 0 until Math.min(parts.length, v.parts.length)) {
259+
val diff = parts(i) - v.parts(i)
260+
if (diff != 0) return diff
261+
}
262+
263+
parts.length - v.parts.length
264+
}
265+
266+
override def hashCode(): Int = toString.hashCode
267+
268+
override def equals(obj: scala.Any): Boolean = {
269+
obj.isInstanceOf[Version] && toString == "" + obj
270+
}
271+
272+
override def toString: String = parts.mkString(".")
273+
}
274+
243275
class BindAddress(s: String) {
244276
private var _source: String = null
245277
private var _value: String = null

src/test/ly/stealth/mesos/kafka/MesosTestCase.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class MesosTestCase {
8282
}
8383

8484
HttpServer.jar = createTempFile("executor.jar", "executor")
85-
HttpServer.kafkaDist = createTempFile("kafka.tgz", "kafka")
85+
HttpServer.kafkaDist = createTempFile("kafka-0.9.3.0.tgz", "kafka")
8686
}
8787

8888
@After

src/test/ly/stealth/mesos/kafka/UtilTest.scala

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package ly.stealth.mesos.kafka
1919

2020
import org.junit.Test
2121
import org.junit.Assert._
22-
import ly.stealth.mesos.kafka.Util.{BindAddress, Period, Range}
22+
import ly.stealth.mesos.kafka.Util.{BindAddress, Period, Range, Version}
2323
import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
2424
import java.util
2525

@@ -260,6 +260,33 @@ class UtilTest {
260260
assertEquals("0", "" + new Range("0..0"))
261261
}
262262

263+
@Test
264+
def Version_init {
265+
assertEquals(List(), new Version().asList)
266+
assertEquals(List(1,0), new Version(1,0).asList)
267+
assertEquals(List(1,2,3,4), new Version("1.2.3.4").asList)
268+
269+
try { new Version(" "); fail() }
270+
catch { case e: IllegalArgumentException => }
271+
272+
try { new Version("."); fail() }
273+
catch { case e: IllegalArgumentException => }
274+
275+
try { new Version("a"); fail() }
276+
catch { case e: IllegalArgumentException => }
277+
}
278+
279+
@Test
280+
def Version_compareTo {
281+
assertEquals(0, new Version().compareTo(new Version()))
282+
assertEquals(0, new Version(0).compareTo(new Version(0)))
283+
284+
assertTrue(new Version(0).compareTo(new Version(1)) < 0)
285+
assertTrue(new Version(0).compareTo(new Version(0, 0)) < 0)
286+
287+
assertTrue(new Version(0, 9, 0, 0).compareTo(new Version(0, 8, 2, 0)) > 0)
288+
}
289+
263290
// BindAddress
264291
@Test
265292
def BindAddress_init {

0 commit comments

Comments
 (0)