Skip to content

Commit dbf3eea

Browse files
committed
handle segment granularity changes
1 parent 2ae0c26 commit dbf3eea

File tree

14 files changed

+295
-89
lines changed

14 files changed

+295
-89
lines changed

core/src/main/scala/com/metamx/tranquility/beam/Beam.scala

+10-1
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717
package com.metamx.tranquility.beam
1818

1919
import com.twitter.util.Future
20+
import org.joda.time.Interval
2021

2122
/**
2223
* Beams can accept events and forward them along. The propagate method may throw a DefunctBeamException, which means
2324
* the beam should be discarded (after calling close()).
2425
*/
25-
trait Beam[A]
26+
trait Beam[A] extends DiscoverableInterval
2627
{
2728
/**
2829
* Request propagation of events. The operation may fail in various ways, which tend to be specific to
@@ -40,3 +41,11 @@ class DefunctBeamException(s: String, t: Throwable) extends Exception(s, t)
4041
{
4142
def this(s: String) = this(s, null)
4243
}
44+
45+
trait DiscoverableInterval
46+
{
47+
/**
48+
* Returns the interval handled by the Beam, can return None if there is no associated interval
49+
* */
50+
def getInterval(): Option[Interval]
51+
}

core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala

+150-65
Large diffs are not rendered by default.

core/src/main/scala/com/metamx/tranquility/beam/HttpBeam.scala

+3
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ class HttpBeam[A: Timestamper](
5858
emitter: ServiceEmitter
5959
) extends Beam[A] with Logging
6060
{
61+
62+
def getInterval() = None
63+
6164
private[this] implicit val timer: Timer = DefaultTimer.twitter
6265

6366
private[this] val port = if (uri.port > 0) {

core/src/main/scala/com/metamx/tranquility/beam/MemoryBeam.scala

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ class MemoryBeam[A](
2828
jsonWriter: JsonWriter[A]
2929
) extends Beam[A]
3030
{
31+
def getInterval() = None
32+
3133
def propagate(events: Seq[A]) = {
3234
events.map(event => Jackson.parse[Dict](jsonWriter.asBytes(event))) foreach {
3335
d =>

core/src/main/scala/com/metamx/tranquility/beam/MergingPartitioningBeam.scala

+7
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,12 @@ class MergingPartitioningBeam[A](
4444
Future.collect(beams map (_.close())) map (_ => ())
4545
}
4646

47+
def getInterval() = {
48+
beams.headOption match {
49+
case Some(x) => x.getInterval()
50+
case None => None
51+
}
52+
}
53+
4754
override def toString = s"MergingPartitioningBeam(${beams.mkString(", ")})"
4855
}

core/src/main/scala/com/metamx/tranquility/beam/NoopBeam.scala

+4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package com.metamx.tranquility.beam
1818

1919
import com.twitter.util.Future
20+
import org.joda.time.Interval
21+
import org.joda.time.chrono.ISOChronology
2022

2123
class NoopBeam[A] extends Beam[A]
2224
{
@@ -25,4 +27,6 @@ class NoopBeam[A] extends Beam[A]
2527
def close() = Future.Done
2628

2729
override def toString = "NoopBeam()"
30+
31+
def getInterval() = Some(new Interval(0, 0, ISOChronology.getInstanceUTC))
2832
}

core/src/main/scala/com/metamx/tranquility/beam/RoundRobinBeam.scala

+7
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,11 @@ class RoundRobinBeam[A](
3838
}
3939

4040
override def toString = "RoundRobinBeam(%s)" format beams.mkString(", ")
41+
42+
def getInterval() = {
43+
beams.headOption match {
44+
case Some(x) => x.getInterval()
45+
case None => None
46+
}
47+
}
4148
}

core/src/main/scala/com/metamx/tranquility/druid/DruidBeam.scala

+4-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ package com.metamx.tranquility.druid
1919
import com.metamx.common.scala.Logging
2020
import com.metamx.common.scala.Predef._
2121
import com.metamx.emitter.service.ServiceEmitter
22-
import com.metamx.tranquility.beam.Beam
23-
import com.metamx.tranquility.beam.DefunctBeamException
22+
import com.metamx.tranquility.beam.{Beam, DefunctBeamException}
2423
import com.metamx.tranquility.finagle._
2524
import com.metamx.tranquility.typeclass.ObjectWriter
2625
import com.twitter.util.Closable
@@ -44,6 +43,9 @@ class DruidBeam[A](
4443
objectWriter: ObjectWriter[A]
4544
) extends Beam[A] with Logging with Closable
4645
{
46+
47+
def getInterval() = Some(interval)
48+
4749
private[this] val clients = Map(
4850
tasks map {
4951
task =>

core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala

+1-8
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,6 @@ class DruidBeamMaker[A: Timestamper](
138138
}
139139

140140
override def newBeam(interval: Interval, partition: Int) = {
141-
require(
142-
beamTuning.segmentGranularity.widen(interval) == interval,
143-
"Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
144-
)
145141
val availabilityGroup = DruidBeamMaker.generateBaseFirehoseId(
146142
location.dataSource,
147143
beamTuning.segmentGranularity,
@@ -190,10 +186,7 @@ class DruidBeamMaker[A: Timestamper](
190186
// Backwards compatibility (see toDict).
191187
beamTuning.segmentBucket(new DateTime(d("timestamp"), ISOChronology.getInstanceUTC))
192188
}
193-
require(
194-
beamTuning.segmentGranularity.widen(interval) == interval,
195-
"Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
196-
)
189+
197190
val partition = int(d("partition"))
198191
val tasks = if (d contains "tasks") {
199192
list(d("tasks")).map(dict(_)).map(d => TaskPointer(str(d("id")), str(d("firehoseId"))))

core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala

+3
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,9 @@ object DruidBeams
221221
def close() = clusteredBeam.close() map (_ => lifecycle.stop())
222222

223223
override def toString = clusteredBeam.toString
224+
225+
def getInterval() = clusteredBeam.getInterval()
226+
224227
}
225228
}
226229

core/src/test/scala/com/metamx/tranquility/test/BeamPacketizerTest.scala

+2
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ class BeamPacketizerTest extends FunSuite with Logging
5656
}
5757

5858
override def close() = memoryBeam.close()
59+
60+
def getInterval() = None
5961
}
6062

6163
val acked = new AtomicLong()

core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala

+95-13
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import org.apache.curator.framework.CuratorFramework
4444
import org.joda.time.DateTimeZone
4545
import org.joda.time.DateTime
4646
import org.joda.time.Interval
47+
import org.joda.time.chrono.ISOChronology
4748
import org.scala_tools.time.Implicits._
4849
import org.scalatest.BeforeAndAfter
4950
import org.scalatest.FunSuite
@@ -68,7 +69,14 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
6869
SimpleEvent(new DateTime("2012-01-01T01:10Z"), Map("foo" -> "e")),
6970
SimpleEvent(new DateTime("2012-01-01T01:20Z"), Map("foo" -> "f")),
7071
SimpleEvent(new DateTime("2012-01-01T03:05Z"), Map("foo" -> "g")),
71-
SimpleEvent(new DateTime("2012-01-01T03:20Z"), Map("foo" -> "h"))
72+
SimpleEvent(new DateTime("2012-01-01T03:20Z"), Map("foo" -> "h")),
73+
SimpleEvent(new DateTime("2012-01-01T01:05Z"), Map("foo" -> "i")),
74+
SimpleEvent(new DateTime("2012-01-01T01:06Z"), Map("foo" -> "j")),
75+
SimpleEvent(new DateTime("2012-01-01T01:07Z"), Map("foo" -> "k")),
76+
SimpleEvent(new DateTime("2012-01-01T01:06Z"), Map("foo" -> "l")),
77+
SimpleEvent(new DateTime("2012-01-01T01:05Z"), Map("foo" -> "m")),
78+
SimpleEvent(new DateTime("2012-01-01T01:09Z"), Map("foo" -> "n")),
79+
SimpleEvent(new DateTime("2012-01-01T01:10Z"), Map("foo" -> "o"))
7280
) map {
7381
x => x.fields("foo") -> x
7482
}).toMap
@@ -79,14 +87,18 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
7987
val localZone = new DateTime().getZone
8088

8189
def buffers = _lock.synchronized {
82-
_buffers.values.map(x => (x.timestamp.withZone(localZone), x.partition, x.open, x.buffer.toSeq)).toSet
90+
_buffers.values.map(x => (x.interval.start.withZone(localZone), x.partition, x.open, x.buffer.toSeq)).toSet
91+
}
92+
93+
def buffersWithInterval = _lock.synchronized {
94+
_buffers.values.map(x => (x.interval, x.partition, x.open, x.buffer.toSeq)).toSet
8395
}
8496

8597
def beamsList = _lock.synchronized {
8698
_beams.toList
8799
}
88100

89-
class EventBuffer(val timestamp: DateTime, val partition: Int)
101+
class EventBuffer(val interval: Interval, val partition: Int)
90102
{
91103
val buffer: mutable.Buffer[SimpleEvent] = mutable.ListBuffer()
92104
@volatile var open: Boolean = true
@@ -109,20 +121,24 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
109121
def close() = {
110122
beam.close()
111123
}
124+
125+
def getInterval() = None
112126
}
113127

114-
class TestingBeam(val timestamp: DateTime, val partition: Int, val uuid: String = UUID.randomUUID().toString)
128+
class TestingBeam(val interval: Interval, val partition: Int, val uuid: String = UUID.randomUUID().toString)
115129
extends Beam[SimpleEvent]
116130
{
117131
_lock.synchronized {
118132
_beams += this
119133
}
120134

135+
def getInterval() = Some(interval)
136+
121137
def propagate(_events: Seq[SimpleEvent]) = _lock.synchronized {
122138
if (_events.contains(events("defunct"))) {
123139
Future.exception(new DefunctBeamException("Defunct"))
124140
} else {
125-
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(timestamp, partition))
141+
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(interval, partition))
126142
buffer.open = true
127143
buffer.buffer ++= _events
128144
Future.value(_events.size)
@@ -131,35 +147,35 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
131147

132148
def close() = _lock.synchronized {
133149
_beams -= this
134-
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(timestamp, partition))
150+
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(interval, partition))
135151
buffer.open = false
136152
Future.Done
137153
}
138154

139155
def toDict = Dict(
140-
"timestamp" -> timestamp.toString(),
156+
"interval" -> interval.toString,
141157
"partition" -> partition,
142158
"uuid" -> uuid
143159
)
144160
}
145161

146162
class TestingBeamMaker extends BeamMaker[SimpleEvent, TestingBeam]
147163
{
148-
def newBeam(interval: Interval, partition: Int) = new TestingBeam(interval.start, partition)
164+
def newBeam(interval: Interval, partition: Int) = new TestingBeam(interval, partition)
149165

150166
def toDict(beam: TestingBeam) = {
151167
Dict(
152-
"timestamp" -> beam.timestamp.toString(),
168+
"interval" -> beam.interval.toString,
153169
"partition" -> beam.partition,
154170
"uuid" -> beam.uuid
155171
)
156172
}
157173

158174
def fromDict(d: Dict) = {
159-
val timestamp = new DateTime(d("timestamp"))
175+
val interval= new Interval(d("interval"))
160176
val partition = int(d("partition"))
161177
val uuid = str(d("uuid"))
162-
new TestingBeam(timestamp, partition, uuid)
178+
new TestingBeam(interval, partition, uuid)
163179
}
164180
}
165181

@@ -353,6 +369,72 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
353369
}
354370
}
355371

372+
test("IncreaseGranularity") {
373+
withLocalCurator {
374+
curator =>
375+
val oldTuning = defaultTuning.copy(segmentGranularity = Granularity.MINUTE, windowPeriod = 1.minute)
376+
val newTuning = oldTuning.copy(segmentGranularity = Granularity.FIVE_MINUTE)
377+
378+
val beamsA = newBeams(curator, oldTuning)
379+
beamsA.timekeeper.now = start
380+
beamsA.blockagate(Seq("i") map events)
381+
beamsA.blockagate(Seq("i") map events)
382+
beamsA.timekeeper.now = start + 1.minute
383+
beamsA.blockagate(Seq("j") map events)
384+
beamsA.blockagate(Seq("j") map events)
385+
386+
val beamsB = newBeams(curator, newTuning)
387+
beamsB.timekeeper.now = start + 2.minute
388+
beamsB.blockagate(Seq("k") map events)
389+
beamsB.blockagate(Seq("k") map events)
390+
beamsB.blockagate(Seq("l") map events)
391+
beamsB.blockagate(Seq("l") map events)
392+
beamsB.blockagate(Seq("m") map events)
393+
beamsB.blockagate(Seq("m") map events)
394+
beamsB.blockagate(Seq("n") map events)
395+
beamsB.blockagate(Seq("n") map events)
396+
397+
Await.result(beamsA.close())
398+
399+
assert(buffersWithInterval === Set(
400+
(new Interval("2012-01-01T01:05Z/2012-01-01T01:06Z", ISOChronology.getInstanceUTC), 0, false, Seq("i") map events),
401+
(new Interval("2012-01-01T01:05Z/2012-01-01T01:06Z", ISOChronology.getInstanceUTC), 1, false, Seq("i") map events),
402+
// "j" and "l" are in same partition as diff beams were used to propagate them
403+
(new Interval("2012-01-01T01:06Z/2012-01-01T01:07Z", ISOChronology.getInstanceUTC), 0, false, Seq("j", "l") map events),
404+
(new Interval("2012-01-01T01:06Z/2012-01-01T01:07Z", ISOChronology.getInstanceUTC), 1, false, Seq("j", "l") map events),
405+
(new Interval("2012-01-01T01:07Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 0, true, Seq("k", "n") map events),
406+
(new Interval("2012-01-01T01:07Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 1, true, Seq("k", "n") map events)
407+
))
408+
}
409+
}
410+
411+
test("DecreaseGranularity") {
412+
withLocalCurator {
413+
curator =>
414+
val oldTuning = defaultTuning.copy(segmentGranularity = Granularity.FIVE_MINUTE)
415+
val newTuning = oldTuning.copy(segmentGranularity = Granularity.MINUTE)
416+
417+
val beamsA = newBeams(curator, oldTuning)
418+
beamsA.timekeeper.now = start
419+
beamsA.blockagate(Seq("i") map events)
420+
421+
val beamsB = newBeams(curator, newTuning)
422+
beamsB.timekeeper.now = start + 4.minute
423+
beamsB.blockagate(Seq("j") map events)
424+
beamsB.blockagate(Seq("n") map events)
425+
beamsB.blockagate(Seq("o") map events)
426+
beamsB.blockagate(Seq("o") map events)
427+
Await.result(beamsB.close())
428+
429+
assert(buffersWithInterval === Set(
430+
(new Interval("2012-01-01T01:05Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 0, false, Seq("i", "j") map events),
431+
(new Interval("2012-01-01T01:05Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 1, false, Seq("n") map events),
432+
(new Interval("2012-01-01T01:10Z/2012-01-01T01:11Z", ISOChronology.getInstanceUTC), 0, false, Seq("o") map events),
433+
(new Interval("2012-01-01T01:10Z/2012-01-01T01:11Z", ISOChronology.getInstanceUTC), 1, false, Seq("o") map events)
434+
))
435+
}
436+
}
437+
356438
test("DefunctBeam") {
357439
withLocalCurator {
358440
curator =>
@@ -385,10 +467,10 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
385467
))
386468
val desired = List("2012-01-01T00Z", "2012-01-01T00Z", "2012-01-01T01Z", "2012-01-01T01Z").map(new DateTime(_))
387469
val startTime = System.currentTimeMillis()
388-
while (System.currentTimeMillis() < startTime + 2000 && beamsList.map(_.timestamp).sortBy(_.millis) != desired) {
470+
while (System.currentTimeMillis() < startTime + 2000 && beamsList.map(_.interval.start).sortBy(_.millis) != desired) {
389471
Thread.sleep(100)
390472
}
391-
assert(beamsList.map(_.timestamp).sortBy(_.millis) === desired)
473+
assert(beamsList.map(_.interval.start).sortBy(_.millis) === desired)
392474
}
393475
}
394476

core/src/test/scala/com/metamx/tranquility/test/HashPartitionBeamTest.scala

+5
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import com.metamx.tranquility.beam.HashPartitionBeam
2424
import com.twitter.util.Await
2525
import com.twitter.util.Future
2626
import java.util.concurrent.CopyOnWriteArrayList
27+
import org.joda.time.Interval
2728
import org.scalatest.FunSuite
2829
import org.scalatest.Matchers
2930
import scala.collection.JavaConverters._
@@ -44,6 +45,10 @@ class HashPartitionBeamTest extends FunSuite with Matchers
4445
Future(events.size)
4546
}
4647

48+
override def getInterval() = {
49+
None
50+
}
51+
4752
override def close() = Future.Done
4853
}
4954

storm/src/test/scala/com/metamx/tranquility/test/StormBoltTest.scala

+2
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ class SimpleBeam extends Beam[SimpleEvent]
5252
}
5353

5454
def close() = Future.Done
55+
56+
def getInterval() = None
5557
}
5658

5759
object SimpleBeam

0 commit comments

Comments
 (0)