Skip to content

Commit

Permalink
Fix a bug that only the first query in a batch is executed by BerryCl…
Browse files Browse the repository at this point in the history
…ient (#292)

* fix a bug that only the first query in a batch is executed by BerryClient

* Add a test case for slicing a query batch

* Introduce another dayCountJSON for testing slicing query batch

* fix the testcase
  • Loading branch information
luochen01 authored Apr 4, 2017
1 parent f40f2a6 commit fea3892
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class BerryClient(val jsonParser: JSONParser,

private case class QueryGroup(ts: DateTime, curSender: ActorRef, queries: Seq[QueryInfo], postTransform: IPostTransform)

private case class Initial(ts: DateTime, sender: ActorRef, targetMillis: Long, queries: Seq[Query], infos: Seq[DataSetInfo], postTransform: IPostTransform)
private case class Initial(ts: DateTime, sender: ActorRef, targetMillis: Long, queries: Seq[Query], infos: Map[String, DataSetInfo], postTransform: IPostTransform)

private case class PartialResult(queryGroup: QueryGroup, jsons: Seq[JsArray])

Expand All @@ -52,12 +52,12 @@ class BerryClient(val jsonParser: JSONParser,
case request: Request =>
handleNewRequest(request, out.getOrElse(sender()))
case initial: Initial if initial.ts == curKey =>
val queryInfos = initial.queries.zip(initial.infos).map {
case (query, info) =>
val bound = query.getTimeInterval(info.schema.timeField).getOrElse(new TInterval(info.dataInterval.getStart, DateTime.now))
val merger = planner.calculateMergeFunc(query, info.schema)
val queryWOTime = query.copy(filter = query.filter.filterNot(_.field == info.schema.timeField))
QueryInfo(queryWOTime, info, bound, merger)
val queryInfos = initial.queries.map { query =>
val info = initial.infos(query.dataset)
val bound = query.getTimeInterval(info.schema.timeField).getOrElse(new TInterval(info.dataInterval.getStart, DateTime.now))
val merger = planner.calculateMergeFunc(query, info.schema)
val queryWOTime = query.copy(filter = query.filter.filterNot(_.field == info.schema.timeField))
QueryInfo(queryWOTime, info, bound, merger)
}
val min = queryInfos.map(_.queryBound.getStartMillis).min
val max = queryInfos.map(_.queryBound.getEndMillis).max
Expand Down Expand Up @@ -95,7 +95,8 @@ class BerryClient(val jsonParser: JSONParser,
}
} else {
val targetMillis = runOption.sliceMills
self ! Initial(key, curSender, targetMillis, queries, seqInfos.map(_.get), request.postTransform)
val mapInfos = seqInfos.map(_.get).map(info => info.name -> info).toMap
self ! Initial(key, curSender, targetMillis, queries, mapInfos, request.postTransform)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import edu.uci.ics.cloudberry.zion.TInterval
import edu.uci.ics.cloudberry.zion.common.Config
import edu.uci.ics.cloudberry.zion.model.impl.QueryPlanner.{IMerger, Unioner}
import edu.uci.ics.cloudberry.zion.model.impl.{JSONParser, QueryPlanner, TestQuery}
import edu.uci.ics.cloudberry.zion.model.schema.{CreateView, Query, QueryExeOption, TimeField}
import edu.uci.ics.cloudberry.zion.model.schema.{CreateView, Query, QueryExeOption, Field, TimeField}
import org.joda.time.{DateTime, DateTimeZone}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
Expand Down Expand Up @@ -67,6 +67,45 @@ class ReactiveBerryClientTest extends TestkitExample with SpecificationLike with
|}
""".stripMargin)

val dayCountJSON = Json.parse(
s"""
|{
| "dataset": "twitter.ds_tweet",
| "filter": [
| {
| "field": "create_at",
| "relation": "inRange",
| "values": [
| "${TimeField.TimeFormat.print(startTime)}",
| "${TimeField.TimeFormat.print(endTime)}"
| ]
| }],
| "group": {
| "by": [
| {
| "field": "create_at",
| "apply": {
| "name": "interval",
| "args" : {
| "unit": "day"
| }
| },
| "as": "day"
| }
| ],
| "aggregate": [
| {
| "field": "*",
| "apply": {
| "name" : "count"
| },
| "as": "count"
| }
| ]
| }
|}
""".stripMargin)

val endTime2 = new DateTime(2016, 11, 30, 0, 0)
val hourCountJSON2 = Json.parse(
s"""
Expand Down Expand Up @@ -113,13 +152,15 @@ class ReactiveBerryClientTest extends TestkitExample with SpecificationLike with
|}
""".stripMargin)


def makeOptionJsonObj(json: JsValue): JsObject = {
json.asInstanceOf[JsObject] + ("option" -> Json.obj(QueryExeOption.TagSliceMillis -> JsNumber(50)))
}

def getRet(i: Int) = JsArray(Seq(JsObject(Seq("hour" -> JsNumber(i), "count" -> JsNumber(i)))))

sequential
def getGroupAs(query: Query): Option[Field] =
query.groups.get.bys(0).as

"Client" should {
"slice the query into small pieces and return the merged result incrementally" in {
Expand Down Expand Up @@ -178,6 +219,62 @@ class ReactiveBerryClientTest extends TestkitExample with SpecificationLike with
sender.expectMsg(JsArray(Seq(getRet(1) ++ getRet(2) ++ getRet(3))))
ok
}

"slice a query batch should generate a slice for each query" in {
val sender = new TestProbe(system)
val dataManager = new TestProbe(system)
val parser = new JSONParser
val mockPlanner = mock[QueryPlanner]
when(mockPlanner.calculateMergeFunc(any, any)).thenReturn(QueryPlanner.Unioner)
//Return the input query
when(mockPlanner.makePlan(any, any, any)).thenAnswer(new Answer[(Seq[Query], IMerger)] {
override def answer(invocation: InvocationOnMock): (Seq[Query], IMerger) = {
val query = invocation.getArguments().head.asInstanceOf[Query]
(Seq(query), Unioner)
}
})

val client = system.actorOf(BerryClient.props(parser, dataManager.ref, mockPlanner, Config.Default))
sender.send(client, makeOptionJsonObj(JsObject(Seq("batch" -> JsArray(Seq(hourCountJSON, dayCountJSON))))))

val askInfo = dataManager.receiveOne(5 seconds).asInstanceOf[DataStoreManager.AskInfo]
askInfo.who must_== "twitter.ds_tweet"
dataManager.reply(Some(TestQuery.sourceInfo))

dataManager.receiveOne(5 seconds).asInstanceOf[DataStoreManager.AskInfoAndViews]
dataManager.reply(Seq(TestQuery.sourceInfo))

dataManager.receiveOne(5 seconds).asInstanceOf[DataStoreManager.AskInfoAndViews]
dataManager.reply(Seq(TestQuery.sourceInfo))

val slicedQ1 = dataManager.receiveOne(5 seconds).asInstanceOf[Query]
val interval1 = slicedQ1.getTimeInterval(TimeField("create_at")).get
interval1.getEnd must_== endTime
interval1.toDurationMillis must_== Config.Default.FirstQueryTimeGap.toMillis

dataManager.reply(
getGroupAs(slicedQ1) match {
case Some(TimeField("hour", _)) => getRet(1)
case Some(TimeField("day", _)) => getRet(2)
}
)

val slicedQ2 = dataManager.receiveOne(5 seconds).asInstanceOf[Query]
val interval2 = slicedQ2.getTimeInterval(TimeField("create_at")).get
interval2.getEnd must_== endTime
interval2.toDurationMillis must_== Config.Default.FirstQueryTimeGap.toMillis

dataManager.reply(
getGroupAs(slicedQ2) match {
case Some(TimeField("hour", _)) => getRet(1)
case Some(TimeField("day", _)) => getRet(2)
}
)
sender.expectMsg(JsArray(Seq(getRet(1), getRet(2))))

ok
}

"suggest the view at the end of the last query finishes" in {
val sender = new TestProbe(system)
val dataManager = new TestProbe(system)
Expand Down

0 comments on commit fea3892

Please sign in to comment.