Skip to content

Commit fb749bf

Browse files
GavinGavinNo1fjy
authored andcommitted
support rollup function in druid 0.9.2 (#210)
* support rollup function in druid 0.9.2 * give isRollup a default value; use NoopEmitter for this dummy emitter
1 parent 33cd9cb commit fb749bf

File tree

19 files changed

+132
-73
lines changed

19 files changed

+132
-73
lines changed

build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ val jacksonOneVersion = "1.9.13"
1414
// See https://github.com/druid-io/druid/pull/1669, https://github.com/druid-io/tranquility/pull/81 before upgrading Jackson
1515
val jacksonTwoVersion = "2.4.6"
1616
val jacksonTwoModuleScalaVersion = "2.4.5"
17-
val druidVersion = "0.9.1"
17+
val druidVersion = "0.9.2"
1818
val guiceVersion = "4.0"
1919
val flinkVersion = "1.0.3"
2020
val finagleVersion = "6.31.0"

core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ class DruidBeamMaker[A](
8888
"granularitySpec" -> Map(
8989
"type" -> "uniform",
9090
"segmentGranularity" -> beamTuning.segmentGranularity,
91-
"queryGranularity" -> queryGranularityMap
91+
"queryGranularity" -> queryGranularityMap,
92+
"rollup" -> rollup.isRollup
9293
)
9394
)
9495
val ioConfigMap = Map(

core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala

+4-2
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,13 @@ import scala.reflect.runtime.universe.typeTag
8787
* val dataSource = "foo"
8888
* val dimensions = Seq("bar")
8989
* val aggregators = Seq(new LongSumAggregatorFactory("baz", "baz"))
90+
* val isRollup = true
9091
* val sender = DruidBeams
9192
* .builder[Map[String, Any]](eventMap => new DateTime(eventMap("timestamp")))
9293
* .curator(curator)
9394
* .discoveryPath("/test/discovery")
9495
* .location(DruidLocation(new DruidEnvironment("druid:local:indexer", "druid:local:firehose:%s"), dataSource))
95-
* .rollup(DruidRollup(dimensions, aggregators, QueryGranularities.MINUTE))
96+
* .rollup(DruidRollup(dimensions, aggregators, QueryGranularities.MINUTE, isRollup))
9697
* .tuning(new ClusteredBeamTuning(Granularity.HOUR, 10.minutes, 1, 1))
9798
* .buildTranquilizer()
9899
* val future = sender.send(Map("timestamp" -> "2010-01-02T03:04:05.678Z", "bar" -> "hey", "baz" -> 3))
@@ -336,7 +337,8 @@ object DruidBeams
336337
)
337338
},
338339
aggregators = fireDepartment.getDataSchema.getAggregators,
339-
indexGranularity = fireDepartment.getDataSchema.getGranularitySpec.getQueryGranularity
340+
indexGranularity = fireDepartment.getDataSchema.getGranularitySpec.getQueryGranularity,
341+
isRollup = fireDepartment.getDataSchema.getGranularitySpec.isRollup
340342
)
341343
builder(inputFnFn(rollup, mkparser, timestampSpec), timestamperFn(timestampSpec))
342344
.curatorFactory(

core/src/main/scala/com/metamx/tranquility/druid/DruidRollup.scala

+13-7
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ import scala.collection.JavaConverters._
3535
class DruidRollup(
3636
val dimensions: DruidDimensions,
3737
val aggregators: IndexedSeq[AggregatorFactory],
38-
val indexGranularity: QueryGranularity
38+
val indexGranularity: QueryGranularity,
39+
val isRollup: Boolean = true
3940
)
4041
{
4142
private val additionalExclusions: Set[String] = {
@@ -176,10 +177,11 @@ object DruidRollup
176177
def apply(
177178
dimensions: DruidDimensions,
178179
aggregators: Seq[AggregatorFactory],
179-
indexGranularity: QueryGranularity
180+
indexGranularity: QueryGranularity,
181+
isRollup: Boolean
180182
) =
181183
{
182-
new DruidRollup(dimensions, aggregators.toIndexedSeq, indexGranularity)
184+
new DruidRollup(dimensions, aggregators.toIndexedSeq, indexGranularity, isRollup)
183185
}
184186

185187
/**
@@ -192,13 +194,15 @@ object DruidRollup
192194
def create(
193195
dimensions: DruidDimensions,
194196
aggregators: java.util.List[AggregatorFactory],
195-
indexGranularity: QueryGranularity
197+
indexGranularity: QueryGranularity,
198+
isRollup: Boolean
196199
): DruidRollup =
197200
{
198201
new DruidRollup(
199202
dimensions,
200203
aggregators.asScala.toIndexedSeq,
201-
indexGranularity
204+
indexGranularity,
205+
isRollup
202206
)
203207
}
204208

@@ -208,13 +212,15 @@ object DruidRollup
208212
def create(
209213
dimensions: java.util.List[String],
210214
aggregators: java.util.List[AggregatorFactory],
211-
indexGranularity: QueryGranularity
215+
indexGranularity: QueryGranularity,
216+
isRollup: Boolean
212217
): DruidRollup =
213218
{
214219
new DruidRollup(
215220
SpecificDruidDimensions(dimensions.asScala, Vector.empty),
216221
aggregators.asScala.toIndexedSeq,
217-
indexGranularity
222+
indexGranularity,
223+
isRollup
218224
)
219225
}
220226
}

core/src/test/java/com/metamx/tranquility/javatests/JavaApiTest.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ public void testSpecificDimensionsRollupConfiguration() throws Exception
5252
final DruidRollup rollup = DruidRollup.create(
5353
DruidDimensions.specific(dimensions),
5454
aggregators,
55-
QueryGranularities.MINUTE
55+
QueryGranularities.MINUTE,
56+
true
5657
);
5758
Assert.assertTrue(rollup.dimensions() instanceof SpecificDruidDimensions);
5859
Assert.assertEquals("column", ((SpecificDruidDimensions) rollup.dimensions()).dimensions().iterator().next());
@@ -64,7 +65,8 @@ public void testSchemalessDimensionsRollupConfiguration() throws Exception
6465
final DruidRollup rollup = DruidRollup.create(
6566
DruidDimensions.schemaless(),
6667
aggregators,
67-
QueryGranularities.MINUTE
68+
QueryGranularities.MINUTE,
69+
true
6870
);
6971
Assert.assertTrue(rollup.dimensions() instanceof SchemalessDruidDimensions);
7072
Assert.assertEquals(0, ((SchemalessDruidDimensions) rollup.dimensions()).dimensionExclusions().size());
@@ -76,7 +78,8 @@ public void testSchemalessDimensionsWithExclusionsRollupConfiguration() throws E
7678
final DruidRollup rollup = DruidRollup.create(
7779
DruidDimensions.schemalessWithExclusions(dimensions),
7880
aggregators,
79-
QueryGranularities.MINUTE
81+
QueryGranularities.MINUTE,
82+
true
8083
);
8184
Assert.assertTrue(rollup.dimensions() instanceof SchemalessDruidDimensions);
8285
Assert.assertEquals("column", ((SchemalessDruidDimensions) rollup.dimensions()).dimensionExclusions().iterator().next());
@@ -96,7 +99,8 @@ public void testSchemalessDimensionsWithExclusionsAndSpatialDimensionsRollupConf
9699
)
97100
),
98101
aggregators,
99-
QueryGranularities.MINUTE
102+
QueryGranularities.MINUTE,
103+
true
100104
);
101105
Assert.assertTrue(rollup.dimensions() instanceof SchemalessDruidDimensions);
102106
Assert.assertEquals("column", ((SchemalessDruidDimensions) rollup.dimensions()).dimensionExclusions().iterator().next());

core/src/test/resources/tranquility-core.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@
3737
"granularitySpec" : {
3838
"queryGranularity" : "none",
3939
"type" : "uniform",
40-
"segmentGranularity" : "hour"
40+
"segmentGranularity" : "hour",
41+
"rollup": false
4142
},
4243
"dataSource" : "foo"
4344
},

core/src/test/resources/tranquility-core.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ dataSources:
2121
type: uniform
2222
segmentGranularity: hour
2323
queryGranularity: none
24+
rollup: false
2425

2526
ioConfig:
2627
type: realtime

core/src/test/scala/com/metamx/tranquility/test/DirectDruidTest.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ object DirectDruidTest
9696
Vector(MultipleFieldDruidSpatialDimension("coord.geo", Seq("lat", "lon")))
9797
),
9898
IndexedSeq(new LongSumAggregatorFactory("barr", "bar")),
99-
QueryGranularities.MINUTE
99+
QueryGranularities.MINUTE,
100+
true
100101
)
101102
val druidEnvironment = new DruidEnvironment(
102103
"druid/tranquility/indexer" /* Slashes should be converted to colons */ ,

core/src/test/scala/com/metamx/tranquility/test/DruidBeamTest.scala

+65-39
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,11 @@
2020
package com.metamx.tranquility.test
2121

2222
import com.fasterxml.jackson.databind
23-
import com.fasterxml.jackson.databind.DeserializationContext
24-
import com.fasterxml.jackson.databind.InjectableValues
25-
import com.fasterxml.jackson.databind.ObjectMapper
23+
import com.fasterxml.jackson.databind.{ObjectReader, DeserializationContext, InjectableValues, ObjectMapper}
2624
import com.google.inject.Key
2725
import com.metamx.common.Granularity
2826
import com.metamx.common.scala.untyped.Dict
29-
import com.metamx.emitter.core.Emitter
30-
import com.metamx.emitter.core.Event
27+
import com.metamx.emitter.core.NoopEmitter
3128
import com.metamx.emitter.service.ServiceEmitter
3229
import com.metamx.tranquility.beam.ClusteredBeamTuning
3330
import com.metamx.tranquility.druid.DruidBeamConfig
@@ -56,6 +53,30 @@ import _root_.scala.collection.JavaConverters._
5653

5754
class DruidBeamTest extends FunSuite with Matchers
5855
{
56+
private def emptyEmitter(): ServiceEmitter = new ServiceEmitter(
57+
"service", "host", new NoopEmitter()
58+
)
59+
60+
private def defaultObjectReader(): ObjectReader = DruidGuicer.Default.objectMapper.reader(
61+
new InjectableValues
62+
{
63+
override def findInjectableValue(
64+
valueId: Any,
65+
ctxt: DeserializationContext,
66+
forProperty: databind.BeanProperty,
67+
beanInstance: scala.Any
68+
): AnyRef =
69+
{
70+
valueId match {
71+
case k: Key[_] if k.getTypeLiteral.getRawType == classOf[ChatHandlerProvider] => new NoopChatHandlerProvider
72+
case k: Key[_] if k.getTypeLiteral.getRawType == classOf[ObjectMapper] => DruidGuicer.Default.objectMapper
73+
case k: Key[_] if k.getTypeLiteral.getRawType == classOf[EventReceiverFirehoseRegister] =>
74+
new EventReceiverFirehoseRegister
75+
}
76+
}
77+
}
78+
).withType(classOf[Task])
79+
5980
test("GenerateAvailabilityGroup") {
6081
val dt = new DateTime("2010-02-03T04:34:56.789", DateTimeZone.forID("America/Los_Angeles"))
6182
assert(DruidBeamMaker.generateAvailabilityGroup("x", dt, 1) === "x-2010-02-03T12:34:56.789Z-0001")
@@ -123,23 +144,13 @@ class DruidBeamTest extends FunSuite with Matchers
123144
DruidRollup(
124145
dimensions = SpecificDruidDimensions(Seq("dim1", "dim2"), Seq(DruidSpatialDimension.singleField("spatial1"))),
125146
aggregators = Seq(new LongSumAggregatorFactory("met1", "met1")),
126-
indexGranularity = QueryGranularities.MINUTE
147+
indexGranularity = QueryGranularities.MINUTE,
148+
true
127149
),
128150
new TimestampSpec("ts", "iso", null),
129151
null,
130152
null,
131-
new ServiceEmitter(
132-
"service", "host", new Emitter
133-
{
134-
override def flush(): Unit = ???
135-
136-
override def emit(event: Event): Unit = ???
137-
138-
override def close(): Unit = ???
139-
140-
override def start(): Unit = ???
141-
}
142-
),
153+
emptyEmitter(),
143154
null,
144155
DruidGuicer.Default.objectMapper
145156
)
@@ -151,27 +162,7 @@ class DruidBeamTest extends FunSuite with Matchers
151162
1,
152163
2
153164
)
154-
val objectReader = DruidGuicer.Default.objectMapper.reader(
155-
new InjectableValues
156-
{
157-
override def findInjectableValue(
158-
valueId: Any,
159-
ctxt: DeserializationContext,
160-
forProperty: databind.BeanProperty,
161-
beanInstance: scala.Any
162-
): AnyRef =
163-
{
164-
valueId match {
165-
case k: Key[_] if k.getTypeLiteral.getRawType == classOf[ChatHandlerProvider] => new NoopChatHandlerProvider
166-
case k: Key[_] if k.getTypeLiteral.getRawType == classOf[ObjectMapper] => DruidGuicer.Default.objectMapper
167-
case k: Key[_] if k.getTypeLiteral.getRawType == classOf[EventReceiverFirehoseRegister] =>
168-
new EventReceiverFirehoseRegister
169-
}
170-
}
171-
}
172-
).withType(classOf[Task])
173-
174-
val task = objectReader.readValue(taskBytes).asInstanceOf[RealtimeIndexTask]
165+
val task = defaultObjectReader().readValue(taskBytes).asInstanceOf[RealtimeIndexTask]
175166
task.getId should be("index_realtime_mydatasource_2000-01-01T00:00:00.000Z_1_2")
176167
task.getDataSource should be("mydatasource")
177168
task.getTaskResource.getAvailabilityGroup should be("mygroup")
@@ -202,4 +193,39 @@ class DruidBeamTest extends FunSuite with Matchers
202193
parseSpec.getDimensionsSpec.getDimensions.asScala.map(_.getName) should be(Seq("dim1", "dim2", "spatial1"))
203194
parseSpec.getDimensionsSpec.getSpatialDimensions.asScala.map(_.getDimName) should be(Seq("spatial1"))
204195
}
196+
197+
test("Attribute isRollup should be passed to task") {
198+
val isRollup = false
199+
val druidBeamMaker = new DruidBeamMaker[Dict](
200+
DruidBeamConfig(),
201+
DruidLocation.create("druid/overlord", "mydatasource"),
202+
ClusteredBeamTuning(),
203+
DruidTuning().toMap,
204+
DruidRollup(
205+
dimensions = SpecificDruidDimensions(Seq(), Seq()),
206+
aggregators = Seq(),
207+
indexGranularity = QueryGranularities.NONE,
208+
// isRollup is set for test.
209+
isRollup
210+
),
211+
new TimestampSpec("ts", "iso", null),
212+
null,
213+
null,
214+
emptyEmitter(),
215+
null,
216+
DruidGuicer.Default.objectMapper
217+
)
218+
219+
val interval = new Interval("2000/PT1H", ISOChronology.getInstanceUTC)
220+
val taskBytes = druidBeamMaker.taskBytes(
221+
interval,
222+
"mygroup",
223+
"myfirehose",
224+
1,
225+
2
226+
)
227+
val task = defaultObjectReader().readValue(taskBytes).asInstanceOf[RealtimeIndexTask]
228+
229+
task.getRealtimeIngestionSchema.getDataSchema.getGranularitySpec.isRollup should be(isRollup)
230+
}
205231
}

core/src/test/scala/com/metamx/tranquility/test/DruidRollupTest.scala

+14-7
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ class DruidRollupTest extends FunSuite with Matchers
3636
val rollup = DruidRollup(
3737
SpecificDruidDimensions(Vector("hey", "what"), Vector.empty),
3838
Seq(new CountAggregatorFactory("heyyo")),
39-
QueryGranularities.NONE
39+
QueryGranularities.NONE,
40+
true
4041
)
4142
rollup.validate()
4243
}
@@ -46,7 +47,8 @@ class DruidRollupTest extends FunSuite with Matchers
4647
DruidRollup(
4748
SpecificDruidDimensions(Vector("hey", "what"), Vector.empty),
4849
Seq(new CountAggregatorFactory("hey")),
49-
QueryGranularities.NONE
50+
QueryGranularities.NONE,
51+
true
5052
)
5153
}
5254
e.getMessage should be("Duplicate columns: hey")
@@ -57,7 +59,8 @@ class DruidRollupTest extends FunSuite with Matchers
5759
DruidRollup(
5860
SpecificDruidDimensions(Vector("what"), Vector.empty),
5961
Seq(new CountAggregatorFactory("hey"), new LongSumAggregatorFactory("hey", "blah")),
60-
QueryGranularities.NONE
62+
QueryGranularities.NONE,
63+
true
6164
)
6265
}
6366
e.getMessage should be("Duplicate columns: hey")
@@ -68,7 +71,8 @@ class DruidRollupTest extends FunSuite with Matchers
6871
DruidRollup(
6972
SpecificDruidDimensions(Vector("what", "what"), Vector.empty),
7073
Seq(new CountAggregatorFactory("hey")),
71-
QueryGranularities.NONE
74+
QueryGranularities.NONE,
75+
true
7276
)
7377
}
7478
e.getMessage should be("Duplicate columns: what")
@@ -78,7 +82,8 @@ class DruidRollupTest extends FunSuite with Matchers
7882
val rollup = DruidRollup(
7983
SpecificDruidDimensions(Vector("e", "f", "a", "b", "z", "t"), Vector.empty),
8084
Seq(new CountAggregatorFactory("hey")),
81-
QueryGranularities.NONE
85+
QueryGranularities.NONE,
86+
true
8287
)
8388
rollup.dimensions.specMap.get("dimensions").asInstanceOf[java.util.List[String]].asScala should
8489
be(Seq("e", "f", "a", "b", "z", "t"))
@@ -88,7 +93,8 @@ class DruidRollupTest extends FunSuite with Matchers
8893
val rollup = DruidRollup(
8994
SpecificDruidDimensions(Seq("foo", "bar")),
9095
Seq(new LongSumAggregatorFactory("hey", "there")),
91-
QueryGranularities.NONE
96+
QueryGranularities.NONE,
97+
true
9298
)
9399
val timestampSpec = new TimestampSpec("t", "auto", null)
94100
rollup.isStringDimension(timestampSpec, "t") should be(false)
@@ -104,7 +110,8 @@ class DruidRollupTest extends FunSuite with Matchers
104110
val rollup = DruidRollup(
105111
SchemalessDruidDimensions(Set("qux")),
106112
Seq(new LongSumAggregatorFactory("hey", "there")),
107-
QueryGranularities.NONE
113+
QueryGranularities.NONE,
114+
true
108115
)
109116
val timestampSpec = new TimestampSpec("t", "auto", null)
110117
rollup.isStringDimension(timestampSpec, "t") should be(false)

0 commit comments

Comments
 (0)