
Commit ad7c179

Author: Anna Zubenko
Commit message: Scalafmt run
1 parent: 231bf56

537 files changed: 8944 additions, 10400 deletions

Large commits have some content hidden by default; only a subset of the 537 changed files is shown below.
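Every change in this commit is mechanical: scalafmt re-indents continuation lines, puts long argument lists one argument per line, and (apparently via a RedundantBraces-style rewrite) drops braces around single-expression method bodies and around interpolated identifiers such as ${topic}. Many removed/added line pairs in the diffs below differ only in whitespace. The repository's scalafmt configuration is not part of this commit, so the before/after sketch here only illustrates those patterns on invented code:

// Before (invented example, not from the repository):
def greet(name: String): Unit = {
  Console.out.println(s"Hello, ${name}")
}

// After a scalafmt run with a RedundantBraces-style rewrite enabled (assumed, not confirmed by this commit):
def greet(name: String): Unit =
  Console.out.println(s"Hello, $name")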

green-river/build.sbt

Lines changed: 31 additions & 32 deletions
@@ -1,9 +1,10 @@
 lazy val commonSettings = Seq(
-  version := "1.0",
-  scalaVersion := "2.11.8",
+  version := "1.0",
+  scalaVersion := "2.11.8",
   updateOptions := updateOptions.value.withCachedResolution(true),
   scalacOptions ++= List(
-    "-encoding", "UTF-8",
+    "-encoding",
+    "UTF-8",
     "-target:jvm-1.8",
     "-feature",
     "-unchecked",
@@ -19,12 +20,11 @@ lazy val commonSettings = Seq(
   )
 )
 
-
-lazy val greenRiver = (project in file(".")).
-  settings(commonSettings).
-  settings(
-    name := "green-river",
-    version := "1.0",
+lazy val greenRiver = (project in file("."))
+  .settings(commonSettings)
+  .settings(
+    name := "green-river",
+    version := "1.0",
     resolvers ++= Seq(
       "confluent" at "http://packages.confluent.io/maven"
     ),
@@ -35,28 +35,28 @@ lazy val greenRiver = (project in file(".")).
 
       Seq(
         // Config
-        "com.typesafe" % "config" % "1.3.0",
+        "com.typesafe" % "config" % "1.3.0",
         // Cache
-        "com.github.cb372" %% "scalacache-lrumap" % "0.8.1",
+        "com.github.cb372" %% "scalacache-lrumap" % "0.8.1",
         // JSON
-        "org.json4s" %% "json4s-core" % json4sV,
-        "org.json4s" %% "json4s-jackson" % json4sV,
-        "org.json4s" %% "json4s-ext" % json4sV,
+        "org.json4s" %% "json4s-core" % json4sV,
+        "org.json4s" %% "json4s-jackson" % json4sV,
+        "org.json4s" %% "json4s-ext" % json4sV,
         // Search
-        "org.apache.kafka" % "kafka_2.11" % "0.9.0.1",
-        "io.confluent" % "kafka-avro-serializer" % "1.0",
-        "com.sksamuel.elastic4s" %% "elastic4s-core" % "2.3.0",
+        "org.apache.kafka" % "kafka_2.11" % "0.9.0.1",
+        "io.confluent" % "kafka-avro-serializer" % "1.0",
+        "com.sksamuel.elastic4s" %% "elastic4s-core" % "2.3.0",
         // Akka
-        "com.typesafe.akka" %% "akka-slf4j" % akkaV,
-        "com.typesafe.akka" %% "akka-actor" % akkaV,
-        "com.typesafe.akka" %% "akka-agent" % akkaV,
-        "com.typesafe.akka" %% "akka-stream" % akkaV,
-        "com.typesafe.akka" %% "akka-http-core" % akkaV,
-        "de.heikoseeberger" %% "akka-http-json4s" % "1.6.0",
+        "com.typesafe.akka" %% "akka-slf4j" % akkaV,
+        "com.typesafe.akka" %% "akka-actor" % akkaV,
+        "com.typesafe.akka" %% "akka-agent" % akkaV,
+        "com.typesafe.akka" %% "akka-stream" % akkaV,
+        "com.typesafe.akka" %% "akka-http-core" % akkaV,
+        "de.heikoseeberger" %% "akka-http-json4s" % "1.6.0",
         // Cats
-        "org.typelevel" %% "cats" % "0.5.0",
+        "org.typelevel" %% "cats" % "0.5.0",
         // Testing
-        "org.scalatest" %% "scalatest" % scalaTestV % "test"
+        "org.scalatest" %% "scalatest" % scalaTestV % "test"
       )
     },
     (mainClass in Compile) := Some("consumer.Main"),
@@ -86,15 +86,14 @@ lazy val greenRiver = (project in file(".")).
        |val conn = Phoenix(connInfo)
      """.stripMargin,
     assemblyMergeStrategy in assembly := {
-      case PathList("org", "joda", xs @ _*) ⇒ MergeStrategy.last
-      case x ⇒ {
-          val old = (assemblyMergeStrategy in assembly).value
-          old(x)
-        }
+      case PathList("org", "joda", xs @ _ *) ⇒ MergeStrategy.last
+      case x ⇒ {
+        val old = (assemblyMergeStrategy in assembly).value
+        old(x)
+      }
     }
     //test in assembly := {}
-  )
-
+  )
 
 lazy val consume = inputKey[Unit]("Runs the Kafka Consumers")
 consume := { (runMain in Compile).partialInput(" consumer.Main").evaluated }
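The custom assemblyMergeStrategy case above only touches paths under org/joda: sbt-assembly splits each conflicting archive path into a PathList of segments, the first case keeps the last copy seen, and everything else falls through to the previously configured strategy via old(x). A dependency-free sketch of that prefix match (illustrative only, not project code):

object MergeStrategySketch {
  // Mirrors the PathList("org", "joda", xs @ _ *) pattern: true for any path whose
  // first two segments are "org" and "joda".
  def isJodaPath(segments: List[String]): Boolean = segments match {
    case "org" :: "joda" :: _ ⇒ true
    case _                    ⇒ false
  }

  def main(args: Array[String]): Unit = {
    println(isJodaPath(List("org", "joda", "time", "DateTime.class"))) // true
    println(isJodaPath(List("META-INF", "MANIFEST.MF")))               // false
  }
}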

green-river/src/main/scala/consumer/AvroProcessor.scala

Lines changed: 18 additions & 24 deletions
@@ -119,8 +119,7 @@ class AvroProcessor(schemaRegistryUrl: String, processor: JsonProcessor)(implici
     extends AbstractKafkaAvroDeserializer
     with MessageProcessor {
 
-  this.schemaRegistry = new CachedSchemaRegistryClient(
-      schemaRegistryUrl, DEFAULT_MAX_SCHEMAS_PER_SUBJECT)
+  this.schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryUrl, DEFAULT_MAX_SCHEMAS_PER_SUBJECT)
   val encoderFactory = EncoderFactory.get()
 
   register("scoped_activities-value", AvroProcessor.activitySchema)
@@ -129,13 +128,13 @@ class AvroProcessor(schemaRegistryUrl: String, processor: JsonProcessor)(implici
   register("scoped_activities-key", AvroProcessor.keySchema)
   register("scoped_activity_trails-key", AvroProcessor.keySchema)
 
-  def process(offset: Long, topic: String, key: Array[Byte], message: Array[Byte]): Future[Unit] = {
+  def process(offset: Long, topic: String, key: Array[Byte], message: Array[Byte]): Future[Unit] =
     try {
 
       val keyJson =
         if (key == null || key.isEmpty) {
           Console.err.println(
-              s"Warning, message has no key for topic ${topic}: ${new String(message, "UTF-8")}")
+              s"Warning, message has no key for topic $topic: ${new String(message, "UTF-8")}")
           ""
         } else {
           deserializeAvro(key)
@@ -150,12 +149,11 @@ class AvroProcessor(schemaRegistryUrl: String, processor: JsonProcessor)(implici
           val readableKey = new String(key, "UTF-8")
           val readableMessage = new String(message, "UTF-8")
           Console.err.println(
-              s"Error deserializing avro message with key $readableKey: error $e\n\t$readableMessage")
+              s"Error deserializing avro message with key $readableKey: error $e\n\t$readableMessage")
           e
         }
       case e: Throwable ⇒ Future.failed(e)
     }
-  }
 
   def deserializeAvro(v: Array[Byte]): String = {
     val obj = deserialize(v)
@@ -177,13 +175,11 @@ class AvroProcessor(schemaRegistryUrl: String, processor: JsonProcessor)(implici
   */
 object AvroJsonHelper {
 
-  def transformJson(json: String, fields: List[String] = List.empty): String = {
+  def transformJson(json: String, fields: List[String] = List.empty): String =
     compact(render(transformJsonRaw(json, fields)))
-  }
 
-  def transformJsonRaw(json: String, fields: List[String] = List.empty): JValue = {
+  def transformJsonRaw(json: String, fields: List[String] = List.empty): JValue =
     JsonTransformers.camelCase(stringToJson(deannotateAvroTypes(parse(json)), fields))
-  }
 
   private def convertType(typeName: String, value: JValue): JValue =
     typeName match {
@@ -192,26 +188,24 @@ object AvroJsonHelper
       case _ ⇒ value
     }
 
-  private def deannotateAvroTypes(input: JValue): JValue = {
+  private def deannotateAvroTypes(input: JValue): JValue =
    input.transformField {
      case JField(name, (JObject(JField(typeName, value) :: Nil))) ⇒ {
-          (name, convertType(typeName, value))
-        }
+        (name, convertType(typeName, value))
+      }
    }
-  }
 
-  private def stringToJson(input: JValue, fields: List[String]): JValue = {
+  private def stringToJson(input: JValue, fields: List[String]): JValue =
    input.transformField {
      case JField(name, JString(text)) if fields.contains(name) ⇒ {
-          // Try to parse the text as json, otherwise treat it as text
-          try {
-            (name, parse(text))
-          } catch {
-            case NonFatal(e) ⇒
-              Console.println(s"Error during parsing field $name: ${e.getMessage}")
-              (name, JString(text))
-          }
+        // Try to parse the text as json, otherwise treat it as text
+        try {
+          (name, parse(text))
+        } catch {
+          case NonFatal(e) ⇒
+            Console.println(s"Error during parsing field $name: ${e.getMessage}")
+            (name, JString(text))
         }
+      }
    }
-  }
 }
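The deannotateAvroTypes transform above unwraps Avro's JSON encoding of union values, where each value is wrapped in a single-field object keyed by its type name. A standalone json4s sketch of that unwrapping (field names and values are invented; this is not project code, and it skips the convertType step):

import org.json4s._
import org.json4s.jackson.JsonMethods._

object DeannotateSketch extends App {
  // {"field": {"type": value}} becomes {"field": value}, mirroring the pattern
  // JField(name, JObject(JField(typeName, value) :: Nil)) used in AvroJsonHelper.
  val annotated = parse("""{"customer_id": {"long": 42}, "name": {"string": "Max"}}""")

  val deannotated = annotated.transformField {
    case JField(name, JObject(JField(_, value) :: Nil)) ⇒ (name, value)
  }

  println(compact(render(deannotated))) // {"customer_id":42,"name":"Max"}
}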

green-river/src/main/scala/consumer/Main.scala

Lines changed: 10 additions & 9 deletions
@@ -81,15 +81,16 @@ object Main {
     conf.indexTopics.foreach {
       case (index, topics) ⇒
         val esProcessor =
-          new ElasticSearchProcessor(uri = conf.elasticSearchUrl,
-                                     cluster = conf.elasticSearchCluster,
-                                     indexName = index,
-                                     topics = topics,
-                                     jsonTransformers =
-                                       Workers.topicTransformers(conf.connectionInfo()))
-
-        Console.err.println(s"index: ${index}")
-        Console.err.println(s"topics: ${topics}")
+          new ElasticSearchProcessor(
+              uri = conf.elasticSearchUrl,
+              cluster = conf.elasticSearchCluster,
+              indexName = index,
+              topics = topics,
+              jsonTransformers = Workers.topicTransformers(conf.connectionInfo())
+          )
+
+        Console.err.println(s"index: $index")
+        Console.err.println(s"topics: $topics")
         // Create ES mappings
         esProcessor.createMappings()
         Console.out.println(s"Done")

green-river/src/main/scala/consumer/MainConfig.scala

Lines changed: 18 additions & 18 deletions
@@ -40,31 +40,31 @@ object MainConfig {
     val scopedIndexTopics = getIndexMap(conf, "kafka.scoped.indices");
 
     MainConfig(
-      activityTopic = conf.getString(s"$env.activity.kafka.topic"),
-      avroSchemaRegistryUrl = conf.getString(s"$env.avro.schemaRegistryUrl"),
-      elasticSearchCluster = conf.getString(s"$env.elastic.cluster"),
-      elasticSearchIndex = conf.getString(s"$env.elastic.index"),
-      elasticSearchUrl = conf.getString(s"$env.elastic.host"),
-      kafkaBroker = conf.getString(s"$env.kafka.broker"),
-      kafkaGroupId = conf.getString(s"$env.kafka.groupId"),
-      indexTopics = indexTopics,
-      scopedIndexTopics = scopedIndexTopics,
-      phoenixPass = conf.getString(s"$env.activity.phoenix.pass"),
-      phoenixUri = conf.getString(s"$env.activity.phoenix.url"),
-      phoenixUser = conf.getString(s"$env.activity.phoenix.user"),
-      phoenixOrg = conf.getString(s"$env.activity.phoenix.org"),
-      maxConnections = conf.getInt(s"$env.max.connections"),
-      startFromBeginning = conf.getBoolean(s"$env.consume.restart"),
-      doSetup = conf.getBoolean(s"$env.elastic.setup")
+      activityTopic = conf.getString(s"$env.activity.kafka.topic"),
+      avroSchemaRegistryUrl = conf.getString(s"$env.avro.schemaRegistryUrl"),
+      elasticSearchCluster = conf.getString(s"$env.elastic.cluster"),
+      elasticSearchIndex = conf.getString(s"$env.elastic.index"),
+      elasticSearchUrl = conf.getString(s"$env.elastic.host"),
+      kafkaBroker = conf.getString(s"$env.kafka.broker"),
+      kafkaGroupId = conf.getString(s"$env.kafka.groupId"),
+      indexTopics = indexTopics,
+      scopedIndexTopics = scopedIndexTopics,
+      phoenixPass = conf.getString(s"$env.activity.phoenix.pass"),
+      phoenixUri = conf.getString(s"$env.activity.phoenix.url"),
+      phoenixUser = conf.getString(s"$env.activity.phoenix.user"),
+      phoenixOrg = conf.getString(s"$env.activity.phoenix.org"),
+      maxConnections = conf.getInt(s"$env.max.connections"),
+      startFromBeginning = conf.getBoolean(s"$env.consume.restart"),
+      doSetup = conf.getBoolean(s"$env.elastic.setup")
     )
   }
 
   def getIndexMap(conf: Config, key: String): IndexTopicMap = {
     val topicConf = conf.getConfig(key)
     topicConf.entrySet.foldLeft(Map[String, Seq[String]]()) {
       case (m, entry) ⇒ {
-          m + (entry.getKey → topicConf.getStringList(entry.getKey).toSeq)
-        }
+        m + (entry.getKey → topicConf.getStringList(entry.getKey).toSeq)
+      }
     }
   }
 }
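MainConfig resolves every key under an environment prefix held in env (the $env.… paths above). A Typesafe Config sketch of that shape, abridged to a few of the keys; the "local" prefix and all values are invented, only the key paths come from the code:

import com.typesafe.config.ConfigFactory

object MainConfigSketch extends App {
  // Abridged, invented configuration; only the key paths mirror what MainConfig reads.
  val conf = ConfigFactory.parseString("""
    local {
      activity.kafka.topic = "activities"
      avro.schemaRegistryUrl = "http://localhost:8081"
      elastic { host = "localhost", cluster = "elasticsearch", index = "phoenix", setup = false }
      kafka { broker = "localhost:9092", groupId = "green-river" }
      max.connections = 10
      consume.restart = false
    }
  """)

  val env = "local"
  println(conf.getString(s"$env.kafka.broker"))     // localhost:9092
  println(conf.getBoolean(s"$env.consume.restart")) // false
}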

green-river/src/main/scala/consumer/MultiTopicConsumer.scala

Lines changed: 12 additions & 23 deletions
@@ -19,63 +19,54 @@ import scala.language.postfixOps
 import org.elasticsearch.client.transport.NoNodeAvailableException
 
 private object Sync {
-  def commit[A, B](consumer: KafkaConsumer[A, B]): Unit = {
+  def commit[A, B](consumer: KafkaConsumer[A, B]): Unit =
     try {
       consumer.commitSync()
     } catch {
       case e: CommitFailedException ⇒ Console.err.println(s"Failed to commit: $e")
       case e: KafkaException ⇒ Console.err.println(s"Unexpectedly to commit: $e")
     }
-  }
 
-  def commit[A, B](
-      consumer: KafkaConsumer[A, B], offsets: Map[TopicPartition, OffsetAndMetadata]): Unit = {
+  def commit[A, B](consumer: KafkaConsumer[A, B], offsets: Map[TopicPartition, OffsetAndMetadata]): Unit =
     try {
       consumer.commitSync(offsets)
     } catch {
       case e: CommitFailedException ⇒ Console.err.println(s"Failed to commit: $e")
       case e: KafkaException ⇒ Console.err.println(s"Unexpectedly to commit: $e")
     }
-  }
 }
 
-private case class StartFromBeginning[A, B](consumer: KafkaConsumer[A, B])
-    extends ConsumerRebalanceListener {
+private case class StartFromBeginning[A, B](consumer: KafkaConsumer[A, B]) extends ConsumerRebalanceListener {
 
-  def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
+  def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit =
     Sync.commit(consumer)
-  }
 
-  def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = {
+  def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit =
     partitions.foreach { p ⇒
-      Console.out.println(
-          s"Consuming from beggining for topic ${p.topic} using partition ${p.partition}")
+      Console.out.println(s"Consuming from beggining for topic ${p.topic} using partition ${p.partition}")
       consumer.seekToBeginning(p)
     }
-  }
 }
 
 private case class StartFromLastCommit[A, B](consumer: KafkaConsumer[A, B])
     extends ConsumerRebalanceListener {
 
-  def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
+  def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit =
     Sync.commit(consumer)
-  }
 
-  def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = {
+  def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit =
     partitions.foreach { p ⇒
       val offsetMetadata = consumer.committed(p)
       if (offsetMetadata == null) {
         Console.out.println(
-            s"No offset commited. Consuming from beggining for topic ${p.topic} using partition ${p.partition}")
+            s"No offset commited. Consuming from beggining for topic ${p.topic} using partition ${p.partition}")
         consumer.seekToBeginning(p)
       } else {
         Console.out.println(
-            s"Consuming from offset ${offsetMetadata.offset} for topic ${p.topic} using partition ${p.partition}")
+            s"Consuming from offset ${offsetMetadata.offset} for topic ${p.topic} using partition ${p.partition}")
         consumer.seek(p, offsetMetadata.offset)
       }
     }
-  }
 }
 
 case class ProcessOffsetsResult(ok: Map[TopicPartition, OffsetAndMetadata] = Map.empty,
@@ -106,7 +97,7 @@ class MultiTopicConsumer(topics: Seq[String],
   val consumer = new RawConsumer(props)
   subscribe(topics, startFromBeginning)
 
-  def readForever(): Unit = {
+  def readForever(): Unit =
     while (true) {
       val records = consumer.poll(timeout)
 
@@ -152,8 +143,7 @@ class MultiTopicConsumer(topics: Seq[String],
         case Some(e) if e.isInstanceOf[NoNodeAvailableException] ⇒
           result.errorTopicAndOffset.foreach {
             case (tp, offset) ⇒
-              Console.err.println(
-                  s"NoNodeAvailableException workaround, seek to offset $offset again")
+              Console.err.println(s"NoNodeAvailableException workaround, seek to offset $offset again")
              consumer.seek(tp, offset)
          }
        case Some(e) if e.isInstanceOf[TryAgainLater] ⇒
@@ -164,7 +154,6 @@ class MultiTopicConsumer(topics: Seq[String],
         case _ ⇒
       }
     }
-  }
 
   def subscribe(topics: Seq[String], startFromBeginning: Boolean): Unit = {
     Console.out.println(s"Subscribing to topics: $topics")