Skip to content

Commit efe07c0

Browse files
committed
Merge branch 'release/0.6.0'
2 parents a65199f + 820f0e9 commit efe07c0

10 files changed

Lines changed: 51 additions & 36 deletions

File tree

CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# tormenta #
22

3+
### Version 0.6.0 ###
4+
* Add logging, make storm provided: https://github.com/twitter/tormenta/pull/47
5+
* Upgrades Avro/Bijection-avro: https://github.com/twitter/tormenta/pull/49
6+
37
### Version 0.5.4 ###
48
* Add Scheme "withHandler" Test: https://github.com/twitter/tormenta/pull/38
59
* Fixes Implicit resolution in GenericAvroSchemeLaws: https://github.com/twitter/tormenta/pull/40

project/Build.scala

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings
66
import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact
77

88
object TormentaBuild extends Build {
9+
10+
lazy val slf4jVersion = "1.6.6"
11+
lazy val stormVersion = "0.9.0-wip15"
12+
913
val extraSettings =
1014
Project.defaultSettings ++ mimaDefaultSettings
1115

@@ -19,12 +23,14 @@ object TormentaBuild extends Build {
1923

2024
val sharedSettings = extraSettings ++ ciSettings ++ Seq(
2125
organization := "com.twitter",
22-
version := "0.5.4",
26+
version := "0.6.0",
2327
scalaVersion := "2.9.3",
2428
crossScalaVersions := Seq("2.9.3", "2.10.0"),
2529
javacOptions ++= Seq("-source", "1.6", "-target", "1.6"),
2630
javacOptions in doc := Seq("-source", "1.6"),
2731
libraryDependencies ++= Seq(
32+
"org.slf4j" % "slf4j-api" % slf4jVersion,
33+
"storm" % "storm" % stormVersion % "provided",
2834
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
2935
"org.scala-tools.testing" %% "specs" % "1.6.9" % "test"
3036
),
@@ -93,7 +99,7 @@ object TormentaBuild extends Build {
9399
def youngestForwardCompatible(subProj: String) =
94100
Some(subProj)
95101
.filterNot(unreleasedModules.contains(_))
96-
.map { s => "com.twitter" % ("tormenta-" + s + "_2.9.3") % "0.5.3" }
102+
.map { s => "com.twitter" % ("tormenta-" + s + "_2.9.3") % "0.6.0" }
97103

98104
lazy val tormenta = Project(
99105
id = "tormenta",
@@ -119,9 +125,7 @@ object TormentaBuild extends Build {
119125
)
120126
}
121127

122-
lazy val tormentaCore = module("core").settings(
123-
libraryDependencies += "storm" % "storm" % "0.9.0-wip15"
124-
)
128+
lazy val tormentaCore = module("core")
125129

126130
lazy val tormentaTwitter = module("twitter").settings(
127131
libraryDependencies += "org.twitter4j" % "twitter4j-stream" % "3.0.3"
@@ -137,8 +141,8 @@ object TormentaBuild extends Build {
137141

138142
lazy val tormentaAvro = module("avro").settings(
139143
libraryDependencies ++= Seq(
140-
"org.apache.avro" % "avro" % "1.7.4",
141-
"com.twitter" %% "bijection-core" % "0.5.3",
142-
"com.twitter" %% "bijection-avro" % "0.5.3")
144+
"org.apache.avro" % "avro" % "1.7.5",
145+
"com.twitter" %% "bijection-core" % "0.6.0",
146+
"com.twitter" %% "bijection-avro" % "0.6.0")
143147
).dependsOn(tormentaCore % "test->test;compile->compile")
144148
}

project/plugins.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns)
1+
resolvers += Resolver.url("sbt-plugin-releases included", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns)
22

33
resolvers += "jgit-repo" at "http://download.eclipse.org/jgit/maven"
44

tormenta-avro/src/main/scala/com/twitter/tormenta/scheme/avro/generic/GenericAvroScheme.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package com.twitter.tormenta.scheme.avro.generic
1818

1919
import org.apache.avro.generic.GenericRecord
2020
import org.apache.avro.Schema
21-
import com.twitter.bijection.avro.AvroCodecs
21+
import com.twitter.bijection.avro.GenericAvroCodecs
2222
import com.twitter.bijection.Injection.connect
2323
import com.twitter.tormenta.scheme.avro.AvroScheme
2424
import com.twitter.tormenta.scheme.Scheme
@@ -33,7 +33,7 @@ object GenericAvroScheme {
3333

3434
class GenericAvroScheme[T <: GenericRecord](schema: Schema) extends Scheme[T] with AvroScheme[T] {
3535
def decode(bytes: Array[Byte]): TraversableOnce[T] = {
36-
implicit val inj = AvroCodecs[T](schema)
36+
implicit val inj = GenericAvroCodecs[T](schema)
3737
decodeRecord(bytes)
3838
}
3939
}
@@ -44,7 +44,7 @@ object BinaryAvroScheme {
4444

4545
class BinaryAvroScheme[T <: GenericRecord](schema: Schema) extends Scheme[T] with AvroScheme[T] {
4646
def decode(bytes: Array[Byte]): TraversableOnce[T] = {
47-
implicit val inj = AvroCodecs.toBinary[T](schema)
47+
implicit val inj = GenericAvroCodecs.toBinary[T](schema)
4848
decodeRecord(bytes)
4949
}
5050
}
@@ -55,7 +55,7 @@ object JsonAvroScheme {
5555

5656
class JsonAvroScheme[T <: GenericRecord](schema: Schema)extends Scheme[T] with AvroScheme[T] {
5757
def decode(bytes: Array[Byte]): TraversableOnce[T] = {
58-
implicit val avroInj = AvroCodecs.toJson[T](schema)
58+
implicit val avroInj = GenericAvroCodecs.toJson[T](schema)
5959
implicit val inj = connect[T, String, Array[Byte]]
6060
decodeRecord(bytes)
6161
}

tormenta-avro/src/main/scala/com/twitter/tormenta/scheme/avro/specific/SpecificAvroScheme.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package com.twitter.tormenta.scheme.avro.specific
1919
import org.apache.avro.Schema
2020
import com.twitter.tormenta.scheme.avro.AvroScheme
2121
import com.twitter.tormenta.scheme.Scheme
22-
import com.twitter.bijection.avro.AvroCodecs
22+
import com.twitter.bijection.avro.SpecificAvroCodecs
2323
import com.twitter.bijection.Injection._
2424
import org.apache.avro.specific.SpecificRecordBase
2525

@@ -33,7 +33,7 @@ object SpecificAvroScheme {
3333

3434
class SpecificAvroScheme[T <: SpecificRecordBase : Manifest] extends Scheme[T] with AvroScheme[T] {
3535
def decode(bytes: Array[Byte]): TraversableOnce[T] = {
36-
implicit val inj = AvroCodecs[T]
36+
implicit val inj = SpecificAvroCodecs[T]
3737
decodeRecord(bytes)
3838
}
3939
}
@@ -44,7 +44,7 @@ object BinaryAvroScheme {
4444

4545
class BinaryAvroScheme[T <: SpecificRecordBase : Manifest] extends Scheme[T] with AvroScheme[T] {
4646
def decode(bytes: Array[Byte]): TraversableOnce[T] = {
47-
implicit val inj = AvroCodecs.toBinary[T]
47+
implicit val inj = SpecificAvroCodecs.toBinary[T]
4848
decodeRecord(bytes)
4949
}
5050
}
@@ -55,7 +55,7 @@ object JsonAvroScheme {
5555

5656
class JsonAvroScheme[T <: SpecificRecordBase : Manifest](schema: Schema) extends Scheme[T] with AvroScheme[T] {
5757
def decode(bytes: Array[Byte]): TraversableOnce[T] = {
58-
implicit val avroInj = AvroCodecs.toJson[T](schema)
58+
implicit val avroInj = SpecificAvroCodecs.toJson[T](schema)
5959
implicit val inj = connect[T, String, Array[Byte]]
6060
decodeRecord(bytes)
6161
}

tormenta-avro/src/test/scala/com/twitter/tormenta/scheme/avro/GenericAvroSchemeLaws.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import org.scalacheck.Properties
2020
import org.apache.avro.Schema
2121
import org.apache.avro.generic.{GenericData, GenericRecord}
2222
import com.twitter.bijection.Injection
23-
import com.twitter.bijection.avro.AvroCodecs
23+
import com.twitter.bijection.avro.GenericAvroCodecs
2424
import com.twitter.tormenta.scheme.avro.generic.{JsonAvroScheme, BinaryAvroScheme, GenericAvroScheme}
2525
import com.twitter.bijection.Injection._
2626
import com.twitter.tormenta.scheme.Scheme
@@ -47,40 +47,40 @@ object GenericAvroSchemeLaws extends Properties("GenericAvroScheme") with BaseAv
4747
}
4848

4949
property("round trips Generic Record using Generic Avro Scheme") = {
50-
implicit val inj = AvroCodecs[GenericRecord](testSchema)
50+
implicit val inj = GenericAvroCodecs[GenericRecord](testSchema)
5151
implicit val scheme = GenericAvroScheme[GenericRecord](testSchema)
5252
roundTripsGenericRecord
5353
}
5454

5555
property("round trips Generic Record using Binary Avro Scheme") = {
56-
implicit val inj = AvroCodecs.toBinary[GenericRecord](testSchema)
56+
implicit val inj = GenericAvroCodecs.toBinary[GenericRecord](testSchema)
5757
implicit val scheme = BinaryAvroScheme[GenericRecord](testSchema)
5858
roundTripsGenericRecord
5959
}
6060

6161
property("round trips Generic Record using Json Avro Scheme") = {
62-
implicit val avinj = AvroCodecs.toJson[GenericRecord](testSchema)
62+
implicit val avinj = GenericAvroCodecs.toJson[GenericRecord](testSchema)
6363
implicit val scheme = JsonAvroScheme[GenericRecord](testSchema)
6464
implicit val inj = connect[GenericRecord, String, Array[Byte]]
6565
roundTripsGenericRecord
6666
}
6767

6868
property("Simulates Generic Avro Scheme failure") = {
69-
implicit val jinj = AvroCodecs.toJson[GenericRecord](testSchema) //passing wrong injection to produce incorrect bytes
69+
implicit val jinj = GenericAvroCodecs.toJson[GenericRecord](testSchema) //passing wrong injection to produce incorrect bytes
7070
implicit val inj = connect[GenericRecord, String, Array[Byte]]
7171
implicit val scheme = BinaryAvroScheme[GenericRecord](testSchema).withHandler(t=>List(failedGenericRecord))
7272
simulateGenericRecordFailure
7373
}
7474

7575
property("simulate Binary Avro Scheme") = {
76-
implicit val inj = AvroCodecs[GenericRecord](testSchema) //passing wrong injection to produce incorrect bytes
76+
implicit val inj = GenericAvroCodecs[GenericRecord](testSchema) //passing wrong injection to produce incorrect bytes
7777
implicit val scheme = BinaryAvroScheme[GenericRecord](testSchema).withHandler(t=>List(failedGenericRecord))
7878
simulateGenericRecordFailure
7979
}
8080

8181
property("round trips Generic Record using Json Avro Scheme") = {
8282
implicit val scheme = JsonAvroScheme[GenericRecord](testSchema).withHandler(t=>List(failedGenericRecord))
83-
implicit val inj = AvroCodecs[GenericRecord](testSchema) //passing wrong injection to produce incorrect bytes
83+
implicit val inj = GenericAvroCodecs[GenericRecord](testSchema) //passing wrong injection to produce incorrect bytes
8484
simulateGenericRecordFailure
8585
}
8686

tormenta-avro/src/test/scala/com/twitter/tormenta/scheme/avro/SpecificAvroSchemeLaws.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package com.twitter.tormenta.scheme.avro
1818

1919
import org.scalacheck.Properties
2020
import com.twitter.bijection.Injection
21-
import com.twitter.bijection.avro.AvroCodecs
21+
import com.twitter.bijection.avro.SpecificAvroCodecs
2222

2323
import com.twitter.bijection.Injection._
2424
import avro.FiscalRecord
@@ -48,40 +48,40 @@ object SpecificAvroSchemeLaws extends Properties("SpecificAvroScheme") with Base
4848

4949

5050
property("round trips Specific Record using Specific Avro Scheme") = {
51-
implicit val inj = AvroCodecs[FiscalRecord]
51+
implicit val inj = SpecificAvroCodecs[FiscalRecord]
5252
implicit val scheme = SpecificAvroScheme[FiscalRecord]
5353
roundTripsSpecificRecord
5454
}
5555

5656
property("round trips Specific Record using Binary Avro Scheme") = {
57-
implicit val inj = AvroCodecs.toBinary[FiscalRecord]
57+
implicit val inj = SpecificAvroCodecs.toBinary[FiscalRecord]
5858
implicit val scheme = BinaryAvroScheme[FiscalRecord]
5959
roundTripsSpecificRecord
6060
}
6161

6262
property("round trips Specific Record using Json Avro Scheme") = {
63-
implicit val avinj = AvroCodecs.toJson[FiscalRecord](testSchema)
63+
implicit val avinj = SpecificAvroCodecs.toJson[FiscalRecord](testSchema)
6464
implicit val scheme = JsonAvroScheme[FiscalRecord](testSchema)
6565
implicit val inj = connect[FiscalRecord, String, Array[Byte]]
6666
roundTripsSpecificRecord
6767
}
6868

6969
property("Simulates Specific Avro Scheme failure") = {
70-
implicit val jinj = AvroCodecs.toJson[FiscalRecord](testSchema) //passing wrong injection to produce incorrect bytes
70+
implicit val jinj = SpecificAvroCodecs.toJson[FiscalRecord](testSchema) //passing wrong injection to produce incorrect bytes
7171
implicit val inj = connect[FiscalRecord, String, Array[Byte]]
7272
implicit val scheme = BinaryAvroScheme[FiscalRecord].withHandler(t=>List(failedGenericRecord))
7373
simulateSpecificRecordFailure
7474
}
7575

7676
property("simulate Binary Avro Scheme") = {
77-
implicit val inj = AvroCodecs[FiscalRecord] //passing wrong injection to produce incorrect bytes
77+
implicit val inj = SpecificAvroCodecs[FiscalRecord] //passing wrong injection to produce incorrect bytes
7878
implicit val scheme = BinaryAvroScheme[FiscalRecord].withHandler(t=>List(failedGenericRecord))
7979
simulateSpecificRecordFailure
8080
}
8181

8282
property("round trips Specific Record using Json Avro Scheme") = {
8383
implicit val scheme = JsonAvroScheme[FiscalRecord](testSchema).withHandler(t=>List(failedGenericRecord))
84-
implicit val inj = AvroCodecs[FiscalRecord] //passing wrong injection to produce incorrect bytes
84+
implicit val inj = SpecificAvroCodecs[FiscalRecord] //passing wrong injection to produce incorrect bytes
8585
simulateSpecificRecordFailure
8686
}
8787

tormenta-avro/src/test/scala/com/twitter/tormenta/scheme/spout/GenericRecordTopologyTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import backtype.storm.Testing
2727
import org.specs._
2828
import scala.collection.JavaConverters._
2929
import com.twitter.tormenta.AvroTestHelper
30-
import com.twitter.bijection.avro.AvroCodecs
30+
import com.twitter.bijection.avro.GenericAvroCodecs
3131
import org.apache.avro.generic.GenericRecord
3232

3333
/**
@@ -36,7 +36,7 @@ import org.apache.avro.generic.GenericRecord
3636
*/
3737

3838
object GenericRecordTopologyTest extends Specification with AvroTestHelper {
39-
val inj = AvroCodecs[GenericRecord](testSchema)
39+
val inj = GenericAvroCodecs[GenericRecord](testSchema)
4040

4141
val genericSpout = TraversableSpout[GenericRecord](List(
4242
buildGenericAvroRecord("2010-01-01", 1, 1),

tormenta-avro/src/test/scala/com/twitter/tormenta/scheme/spout/SpecificRecordTopologyTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package com.twitter.tormenta.scheme.spout
1818

1919
import org.specs.Specification
2020
import com.twitter.tormenta.AvroTestHelper
21-
import com.twitter.bijection.avro.AvroCodecs
21+
import com.twitter.bijection.avro.SpecificAvroCodecs
2222
import com.twitter.tormenta.spout.{TraversableSpout, Spout}
2323
import backtype.storm.topology.TopologyBuilder
2424
import backtype.storm.{Testing, LocalCluster}
@@ -32,7 +32,7 @@ import scala.collection.JavaConverters._
3232
* @since 9/25/13
3333
*/
3434
object SpecificRecordTopologyTest extends Specification with AvroTestHelper {
35-
val inj = AvroCodecs[FiscalRecord]
35+
val inj = SpecificAvroCodecs[FiscalRecord]
3636

3737
val specificSpout: Spout[Array[Byte]] = TraversableSpout[FiscalRecord](List(
3838
buildSpecificAvroRecord("2010-01-01", 1, 1),

tormenta-core/src/main/scala/com/twitter/tormenta/scheme/Scheme.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import backtype.storm.spout.MultiScheme
2121
import java.util.{ List => JList }
2222
import scala.collection.JavaConverters._
2323
import java.io.Serializable
24+
import org.slf4j.LoggerFactory
2425

2526
/**
2627
* @author Oscar Boykin
@@ -42,7 +43,13 @@ trait Scheme[+T] extends MultiScheme with Serializable { self =>
4243
*/
4344
def decode(bytes: Array[Byte]): TraversableOnce[T]
4445

45-
def handle(t: Throwable): TraversableOnce[T] = List.empty
46+
def handle(t: Throwable): TraversableOnce[T] = {
47+
// We assume this is rare enough that the perf hit of
48+
// getLogger+getClass is better than
49+
// forcing a new variable on everyone, even those that override this
50+
LoggerFactory.getLogger(getClass).error("decoding error, ignoring", t)
51+
List.empty
52+
}
4653

4754
def withHandler[U >: T](fn: Throwable => TraversableOnce[U]): Scheme[U] =
4855
new Scheme[U] {

0 commit comments

Comments
 (0)