diff --git a/zion/src/main/scala/edu/uci/ics/cloudberry/zion/actor/BerryClient.scala b/zion/src/main/scala/edu/uci/ics/cloudberry/zion/actor/BerryClient.scala index 1667c519f..f59c3ade8 100644 --- a/zion/src/main/scala/edu/uci/ics/cloudberry/zion/actor/BerryClient.scala +++ b/zion/src/main/scala/edu/uci/ics/cloudberry/zion/actor/BerryClient.scala @@ -39,7 +39,7 @@ class BerryClient(val jsonParser: JSONParser, private case class QueryGroup(ts: DateTime, curSender: ActorRef, queries: Seq[QueryInfo], postTransform: IPostTransform) - private case class Initial(ts: DateTime, sender: ActorRef, targetMillis: Long, queries: Seq[Query], infos: Seq[DataSetInfo], postTransform: IPostTransform) + private case class Initial(ts: DateTime, sender: ActorRef, targetMillis: Long, queries: Seq[Query], infos: Map[String, DataSetInfo], postTransform: IPostTransform) private case class PartialResult(queryGroup: QueryGroup, jsons: Seq[JsArray]) @@ -52,12 +52,12 @@ class BerryClient(val jsonParser: JSONParser, case request: Request => handleNewRequest(request, out.getOrElse(sender())) case initial: Initial if initial.ts == curKey => - val queryInfos = initial.queries.zip(initial.infos).map { - case (query, info) => - val bound = query.getTimeInterval(info.schema.timeField).getOrElse(new TInterval(info.dataInterval.getStart, DateTime.now)) - val merger = planner.calculateMergeFunc(query, info.schema) - val queryWOTime = query.copy(filter = query.filter.filterNot(_.field == info.schema.timeField)) - QueryInfo(queryWOTime, info, bound, merger) + val queryInfos = initial.queries.map { query => + val info = initial.infos(query.dataset) + val bound = query.getTimeInterval(info.schema.timeField).getOrElse(new TInterval(info.dataInterval.getStart, DateTime.now)) + val merger = planner.calculateMergeFunc(query, info.schema) + val queryWOTime = query.copy(filter = query.filter.filterNot(_.field == info.schema.timeField)) + QueryInfo(queryWOTime, info, bound, merger) } val min = queryInfos.map(_.queryBound.getStartMillis).min val max = queryInfos.map(_.queryBound.getEndMillis).max @@ -95,7 +95,8 @@ class BerryClient(val jsonParser: JSONParser, } } else { val targetMillis = runOption.sliceMills - self ! Initial(key, curSender, targetMillis, queries, seqInfos.map(_.get), request.postTransform) + val mapInfos = seqInfos.map(_.get).map(info => info.name -> info).toMap + self ! Initial(key, curSender, targetMillis, queries, mapInfos, request.postTransform) } } } diff --git a/zion/src/test/scala/edu/uci/ics/cloudberry/zion/actor/ReactiveBerryClientTest.scala b/zion/src/test/scala/edu/uci/ics/cloudberry/zion/actor/ReactiveBerryClientTest.scala index c11154746..2a1c9bbf7 100644 --- a/zion/src/test/scala/edu/uci/ics/cloudberry/zion/actor/ReactiveBerryClientTest.scala +++ b/zion/src/test/scala/edu/uci/ics/cloudberry/zion/actor/ReactiveBerryClientTest.scala @@ -7,7 +7,7 @@ import edu.uci.ics.cloudberry.zion.TInterval import edu.uci.ics.cloudberry.zion.common.Config import edu.uci.ics.cloudberry.zion.model.impl.QueryPlanner.{IMerger, Unioner} import edu.uci.ics.cloudberry.zion.model.impl.{JSONParser, QueryPlanner, TestQuery} -import edu.uci.ics.cloudberry.zion.model.schema.{CreateView, Query, QueryExeOption, TimeField} +import edu.uci.ics.cloudberry.zion.model.schema.{CreateView, Query, QueryExeOption, Field, TimeField} import org.joda.time.{DateTime, DateTimeZone} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer @@ -67,6 +67,45 @@ class ReactiveBerryClientTest extends TestkitExample with SpecificationLike with |} """.stripMargin) + val dayCountJSON = Json.parse( + s""" + |{ + | "dataset": "twitter.ds_tweet", + | "filter": [ + | { + | "field": "create_at", + | "relation": "inRange", + | "values": [ + | "${TimeField.TimeFormat.print(startTime)}", + | "${TimeField.TimeFormat.print(endTime)}" + | ] + | }], + | "group": { + | "by": [ + | { + | "field": "create_at", + | "apply": { + | "name": "interval", + | "args" : { + | "unit": "day" + | } + | }, + | "as": "day" + | } + | ], + | "aggregate": [ + | { + | "field": "*", + | "apply": { + | "name" : "count" + | }, + | "as": "count" + | } + | ] + | } + |} + """.stripMargin) + val endTime2 = new DateTime(2016, 11, 30, 0, 0) val hourCountJSON2 = Json.parse( s""" @@ -113,13 +152,15 @@ class ReactiveBerryClientTest extends TestkitExample with SpecificationLike with |} """.stripMargin) + def makeOptionJsonObj(json: JsValue): JsObject = { json.asInstanceOf[JsObject] + ("option" -> Json.obj(QueryExeOption.TagSliceMillis -> JsNumber(50))) } def getRet(i: Int) = JsArray(Seq(JsObject(Seq("hour" -> JsNumber(i), "count" -> JsNumber(i))))) - sequential + def getGroupAs(query: Query): Option[Field] = + query.groups.get.bys(0).as "Client" should { "slice the query into small pieces and return the merged result incrementally" in { @@ -178,6 +219,62 @@ class ReactiveBerryClientTest extends TestkitExample with SpecificationLike with sender.expectMsg(JsArray(Seq(getRet(1) ++ getRet(2) ++ getRet(3)))) ok } + + "slice a query batch should generate a slice for each query" in { + val sender = new TestProbe(system) + val dataManager = new TestProbe(system) + val parser = new JSONParser + val mockPlanner = mock[QueryPlanner] + when(mockPlanner.calculateMergeFunc(any, any)).thenReturn(QueryPlanner.Unioner) + //Return the input query + when(mockPlanner.makePlan(any, any, any)).thenAnswer(new Answer[(Seq[Query], IMerger)] { + override def answer(invocation: InvocationOnMock): (Seq[Query], IMerger) = { + val query = invocation.getArguments().head.asInstanceOf[Query] + (Seq(query), Unioner) + } + }) + + val client = system.actorOf(BerryClient.props(parser, dataManager.ref, mockPlanner, Config.Default)) + sender.send(client, makeOptionJsonObj(JsObject(Seq("batch" -> JsArray(Seq(hourCountJSON, dayCountJSON)))))) + + val askInfo = dataManager.receiveOne(5 seconds).asInstanceOf[DataStoreManager.AskInfo] + askInfo.who must_== "twitter.ds_tweet" + dataManager.reply(Some(TestQuery.sourceInfo)) + + dataManager.receiveOne(5 seconds).asInstanceOf[DataStoreManager.AskInfoAndViews] + dataManager.reply(Seq(TestQuery.sourceInfo)) + + dataManager.receiveOne(5 seconds).asInstanceOf[DataStoreManager.AskInfoAndViews] + dataManager.reply(Seq(TestQuery.sourceInfo)) + + val slicedQ1 = dataManager.receiveOne(5 seconds).asInstanceOf[Query] + val interval1 = slicedQ1.getTimeInterval(TimeField("create_at")).get + interval1.getEnd must_== endTime + interval1.toDurationMillis must_== Config.Default.FirstQueryTimeGap.toMillis + + dataManager.reply( + getGroupAs(slicedQ1) match { + case Some(TimeField("hour", _)) => getRet(1) + case Some(TimeField("day", _)) => getRet(2) + } + ) + + val slicedQ2 = dataManager.receiveOne(5 seconds).asInstanceOf[Query] + val interval2 = slicedQ2.getTimeInterval(TimeField("create_at")).get + interval2.getEnd must_== endTime + interval2.toDurationMillis must_== Config.Default.FirstQueryTimeGap.toMillis + + dataManager.reply( + getGroupAs(slicedQ2) match { + case Some(TimeField("hour", _)) => getRet(1) + case Some(TimeField("day", _)) => getRet(2) + } + ) + sender.expectMsg(JsArray(Seq(getRet(1), getRet(2)))) + + ok + } + "suggest the view at the end of the last query finishes" in { val sender = new TestProbe(system) val dataManager = new TestProbe(system)