diff --git a/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/impl/AsterixQueryGenerator.scala b/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/impl/AsterixQueryGenerator.scala
new file mode 100644
index 000000000..7c778dc82
--- /dev/null
+++ b/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/impl/AsterixQueryGenerator.scala
@@ -0,0 +1,279 @@
+package edu.uci.ics.cloudberry.zion.model.impl
+
+import edu.uci.ics.cloudberry.zion.model.datastore.{IQLGenerator, QueryParsingException}
+import edu.uci.ics.cloudberry.zion.model.schema._
+
+
+/**
+  * Defines the constant strings for the query languages supported by AsterixDB.
+  */
+trait AsterixImpl {
+
+  val aggregateFuncMap: Map[AggregateFunc, String]
+
+  def getAggregateStr(aggregate: AggregateFunc): String = {
+    aggregateFuncMap.get(aggregate) match {
+      case Some(impl) => impl
+      case None => throw new QueryParsingException(s"No implementation is provided for aggregate function ${aggregate.name}")
+    }
+  }
+
+  val datetime: String
+  val round: String
+
+  val dayTimeDuration: String
+  val yearMonthDuration: String
+  val getIntervalStartDatetime: String
+  val intervalBin: String
+
+  val spatialIntersect: String
+  val createRectangle: String
+  val createPoint: String
+  val spatialCell: String
+  val getPoints: String
+
+
+  val similarityJaccard: String
+  val contains: String
+  val wordTokens: String
+
+}
+
+object AsterixImpl {
+
+
+}
+
+
+abstract class AsterixQueryGenerator extends IQLGenerator {
+
+  /**
+    * Represents the expressions for a [[Field]].
+    *
+    * @param refExpr the expression used by subsequent statements to refer to this field
+    * @param defExpr the expression that defines the field
+    *
+    */
+  case class FieldExpr(refExpr: String, defExpr: String)
+
+  /**
+    * Partial parse result produced after parsing each [[Statement]].
+    *
+    * @param strs    a sequence of parsed query strings, which will be composed together later.
+    * @param exprMap a new field expression map
+    */
+  case class ParsedResult(strs: Seq[String], exprMap: Map[String, FieldExpr])
+
+  protected def typeImpl: AsterixImpl
+
+  protected def quote: String
+
+  protected def sourceVar: String
+
+  protected def lookupVar: String
+
+  protected def unnestVar: String
+
+  protected def groupVar: String
+
+  protected def globalAggrVar: String
+
+  protected def outerSelectVar: String
+
+  /**
+    * The suffix (such as ";") appended to the query string.
+    * @return
+    */
+  protected def suffix: String
+
+
+  /**
+    * Returns a query string after parsing the query object.
+    *
+    * @param query     the [[IQuery]] object containing the query details
+    * @param schemaMap a map from each dataset name to its [[Schema]]
+    * @return query string
+    **/
+  def generate(query: IQuery, schemaMap: Map[String, Schema]): String = {
+    val result = query match {
+      case q: Query =>
+        parseQuery(q, schemaMap)
+      case q: CreateView => parseCreate(q, schemaMap)
+      case q: AppendView => parseAppend(q, schemaMap)
+      case q: UpsertRecord => parseUpsert(q, schemaMap)
+      case q: DropView => ???
+      case _ => ???
+    }
+    s"$result$suffix"
+  }
+
+  protected def parseQuery(query: Query, schemaMap: Map[String, Schema]): String
+
+  protected def parseCreate(query: CreateView, schemaMap: Map[String, Schema]): String
+
+  protected def parseAppend(query: AppendView, schemaMap: Map[String, Schema]): String
+
+  protected def parseUpsert(query: UpsertRecord, schemaMap: Map[String, Schema]): String
+
+
+  def calcResultSchema(query: Query, schema: Schema): Schema = {
+    if (query.lookup.isEmpty && query.groups.isEmpty && query.select.isEmpty) {
+      schema.copy()
+    } else {
+      ???
+    }
+  }
+
+  protected def initExprMap(query: Query, schemaMap: Map[String, Schema]): Map[String, FieldExpr] = {
+    val schema = schemaMap(query.dataset)
+    schema.fieldMap.mapValues { f =>
+      f.dataType match {
+        case DataType.Record => FieldExpr(sourceVar, sourceVar)
+        case DataType.Hierarchy => FieldExpr(sourceVar, sourceVar) // TODO rethink this type: a type or just a relation between types?
+        case _ => {
+          // Wrap each name component in quotes so that it cannot clash with a SQL reserved keyword
+          val quoted = f.name.split('.').map(name => s"$quote$name$quote").mkString(".")
+          FieldExpr(s"$sourceVar.$quoted", s"$sourceVar.$quoted")
+        }
+      }
+    }
+  }
+
+
+  //TODO consider a /*+ skip-index */ hint if the relation's selectivity is not high enough
+  protected def parseFilterRelation(filter: FilterStatement, fieldExpr: String): String = {
+    filter.field.dataType match {
+      case DataType.Number =>
+        parseNumberRelation(filter, fieldExpr)
+      case DataType.Time =>
+        parseTimeRelation(filter, fieldExpr)
+      case DataType.Point =>
+        parsePointRelation(filter, fieldExpr)
+      case DataType.Boolean => ???
+      case DataType.String => ???
+      case DataType.Text =>
+        parseTextRelation(filter, fieldExpr)
+      case DataType.Bag => ???
+      case DataType.Hierarchy =>
+        throw new QueryParsingException("the Hierarchy type doesn't support any relations.")
+      case _ => throw new QueryParsingException(s"unknown datatype: ${filter.field.dataType}")
+    }
+  }
+
+
+  protected def parseNumberRelation(filter: FilterStatement,
+                                    fieldExpr: String): String
+
+  protected def parseTimeRelation(filter: FilterStatement,
+                                  fieldExpr: String): String = {
+    filter.relation match {
+      case Relation.inRange => {
+        s"$fieldExpr >= ${typeImpl.datetime}('${filter.values(0)}') and $fieldExpr < ${typeImpl.datetime}('${filter.values(1)}')"
+      }
+      case _ => {
+        s"$fieldExpr ${filter.relation} ${typeImpl.datetime}('${filter.values(0)}')"
+      }
+    }
+  }
+
+
+  protected def parsePointRelation(filter: FilterStatement,
+                                   fieldExpr: String): String = {
+    val values = filter.values.map(_.asInstanceOf[Seq[Double]])
+    filter.relation match {
+      case Relation.inRange => {
+        s"""${typeImpl.spatialIntersect}($fieldExpr,
+           |  ${typeImpl.createRectangle}(${typeImpl.createPoint}(${values(0)(0)},${values(0)(1)}),
+           |  ${typeImpl.createPoint}(${values(1)(0)},${values(1)(1)})))
+           |""".stripMargin
+      }
+    }
+  }
+
+  protected def parseTextRelation(filter: FilterStatement,
+                                  fieldExpr: String): String = {
+    val first = s"${typeImpl.similarityJaccard}(${typeImpl.wordTokens}($fieldExpr), ${typeImpl.wordTokens}('${filter.values.head}')) > 0.0"
+    val rest = filter.values.tail.map(keyword => s"""and ${typeImpl.contains}($fieldExpr, "$keyword")""")
+    (first +: rest).mkString("\n")
+  }
+
+
+  protected def parseGeoCell(scale: Double, fieldExpr: String, dataType: DataType.Value): String = {
+    val origin = s"${typeImpl.createPoint}(0.0,0.0)"
+    s"${typeImpl.getPoints}(${typeImpl.spatialCell}(${fieldExpr}, $origin, ${1 / scale}, ${1 / scale}))[0]"
+  }
+
+  protected def parseAggregateFunc(aggregate: AggregateStatement,
+                                   fieldExpr: String): String
+
+
+  protected def parseIntervalDuration(interval: Interval): String = {
+    import TimeUnit._
+    // ISO 8601 duration format: PnYnMnDTnHnMn.mmmS
+    interval.unit match {
+      case Second => s""" ${typeImpl.dayTimeDuration}("PT${interval.x}S") """
+      case Minute => s""" ${typeImpl.dayTimeDuration}("PT${interval.x}M") """
+      case Hour => s""" ${typeImpl.dayTimeDuration}("PT${interval.x}H") """
+      case Day => s""" ${typeImpl.dayTimeDuration}("P${interval.x}D") """
+      case Week => s""" ${typeImpl.dayTimeDuration}("P${interval.x * 7}D") """
+      case Month => s""" ${typeImpl.yearMonthDuration}("P${interval.x}M") """
+      case Year => s""" ${typeImpl.yearMonthDuration}("P${interval.x}Y") """
+    }
+  }
+
+  protected def parseGroupByFunc(groupBy: ByStatement, fieldExpr: String): String = {
+    groupBy.funcOpt match {
+      case Some(func) =>
+        func match {
+          case bin: Bin => s"${typeImpl.round}($fieldExpr/${bin.scale})*${bin.scale}"
+          case interval: Interval =>
+            val duration = parseIntervalDuration(interval)
+            s"${typeImpl.getIntervalStartDatetime}(${typeImpl.intervalBin}($fieldExpr, ${typeImpl.datetime}('1990-01-01T00:00:00.000Z'), $duration))"
+          case level: Level =>
+            //TODO remove this data type
+            val hierarchyField = groupBy.field.asInstanceOf[HierarchyField]
+            val field = hierarchyField.levels.find(_._1 == level.levelTag).get
+            s"$fieldExpr.${field._2}"
+          case GeoCellTenth => parseGeoCell(10, fieldExpr, groupBy.field.dataType)
+          case GeoCellHundredth => parseGeoCell(100, fieldExpr, groupBy.field.dataType)
+          case GeoCellThousandth => parseGeoCell(1000, fieldExpr, groupBy.field.dataType)
+          case _ => throw new QueryParsingException(s"unknown function: ${func.name}")
+        }
+      case None => fieldExpr
+    }
+  }
+
+
+  protected def genDDL(schema: Schema): String = {
+    //FIXME this function is wrong for nested types that contain multiple sub-fields
+    def mkNestDDL(names: List[String], typeStr: String): String = {
+      names match {
+        case List(e) => s" $e : $typeStr"
+        case e :: tail => s" $e : { ${mkNestDDL(tail, typeStr)} }"
+      }
+    }
+
+    val fields = schema.fieldMap.values.filter(f => f.dataType != DataType.Hierarchy && f != AllField).map {
+      f => mkNestDDL(f.name.split("\\.").toList, fieldType2ADMType(f) + (if (f.isOptional) "?" else ""))
+    }
+    s"""
+       |create type ${schema.typeName} if not exists as open {
+       |${fields.mkString(",\n")}
+       |}
+    """.stripMargin
+  }
+
+  protected def fieldType2ADMType(field: Field): String = {
+    field.dataType match {
+      case DataType.Number => "double"
+      case DataType.Time => "datetime"
+      case DataType.Point => "point"
+      case DataType.Boolean => "boolean"
+      case DataType.String => "string"
+      case DataType.Text => "string"
+      case DataType.Bag => s"{{${fieldType2ADMType(Field("", field.asInstanceOf[BagField].innerType))}}}"
+      case DataType.Hierarchy => ??? // should be skipped
+      case DataType.Record => ???
+    }
+  }
+}
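Note: to see how the time-bucketing pieces above compose, consider a group-by over an hourly Interval. parseIntervalDuration renders the ISO 8601 duration literal, and parseGroupByFunc wraps the field expression with the dialect's interval-binning functions, anchored at 1990-01-01. Under the SQL++ dialect defined in the next file, the hour bucket of t.`create_at` therefore renders as the following string, which the new tests assert verbatim:

    get_interval_start_datetime(interval_bin(t.`create_at`, datetime('1990-01-01T00:00:00.000Z'), day_time_duration("PT1H") ))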
diff --git a/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/impl/SQLPPGenerator.scala b/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/impl/SQLPPGenerator.scala
new file mode 100644
index 000000000..024dcae1f
--- /dev/null
+++ b/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/impl/SQLPPGenerator.scala
@@ -0,0 +1,340 @@
+package edu.uci.ics.cloudberry.zion.model.impl
+
+import edu.uci.ics.cloudberry.zion.model.datastore.{IQLGenerator, IQLGeneratorFactory, QueryParsingException}
+import edu.uci.ics.cloudberry.zion.model.schema._
+import play.api.libs.json.Json
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Provides the constant query strings for SQL++.
+  */
+object SQLPPAsterixImpl extends AsterixImpl {
+  override val aggregateFuncMap: Map[AggregateFunc, String] = Map(
+    Count -> "coll_count",
+    Max -> "coll_max",
+    Min -> "coll_min",
+    Avg -> "coll_avg",
+    Sum -> "coll_sum"
+  )
+
+
+  val datetime: String = "datetime"
+  val round: String = "round"
+
+  val dayTimeDuration: String = "day_time_duration"
+  val yearMonthDuration: String = "year_month_duration"
+  val getIntervalStartDatetime: String = "get_interval_start_datetime"
+  val intervalBin: String = "interval_bin"
+
+  val spatialIntersect: String = "spatial_intersect"
+  val createRectangle: String = "create_rectangle"
+  val createPoint: String = "create_point"
+  val spatialCell: String = "spatial_cell"
+  val getPoints: String = "get_points"
+
+  val similarityJaccard: String = "similarity_jaccard"
+  val contains: String = "contains"
+  val wordTokens: String = "word_tokens"
+
+
+}
+
+class SQLPPGenerator extends AsterixQueryGenerator {
+
+  protected val typeImpl: AsterixImpl = SQLPPAsterixImpl
+
+  protected val sourceVar: String = "t"
+
+  protected val unnestVar: String = "unnest"
+
+  protected val lookupVar: String = "l"
+
+  protected val groupVar: String = "g"
+
+  protected val globalAggrVar: String = "c"
+
+  protected val outerSelectVar: String = "s"
+
+  protected val quote = "`"
+
+  protected val suffix: String = ";"
+
+  def parseCreate(create: CreateView, schemaMap: Map[String, Schema]): String = {
+    val sourceSchema = schemaMap(create.query.dataset)
+    val resultSchema = calcResultSchema(create.query, schemaMap(create.query.dataset))
+    val ddl: String = genDDL(resultSchema)
+    val createDataSet =
+      s"""
+         |drop dataset ${create.dataset} if exists;
+         |create dataset ${create.dataset}(${resultSchema.typeName}) primary key ${resultSchema.primaryKey.map(_.name).mkString(",")} //with filter on '${resultSchema.timeField.name}'
+         |""".stripMargin
+    val insert =
+      s"""
+         |insert into ${create.dataset} (
+         |${parseQuery(create.query, schemaMap)}
+         |)""".stripMargin
+    ddl + createDataSet + insert
+  }
+
+  def parseAppend(append: AppendView, schemaMap: Map[String, Schema]): String = {
+    s"""
+       |upsert into ${append.dataset} (
+       |${parseQuery(append.query, schemaMap)}
+       |)""".stripMargin
+  }
+
+  def parseUpsert(q: UpsertRecord, schemaMap: Map[String, Schema]): String = {
+    s"""
+       |upsert into ${q.dataset} (
+       |${Json.toJson(q.records)}
+       |)""".stripMargin
+  }
+
+  def parseQuery(query: Query, schemaMap: Map[String, Schema]): String = {
+
+    val exprMap: Map[String, FieldExpr] = initExprMap(query, schemaMap)
+
+    val resultAfterLookup = parseLookup(query.lookup, exprMap)
+    val lookupStr = resultAfterLookup.strs(0)
+    val fromStr = s"from ${query.dataset} $sourceVar $lookupStr".trim
+
+    val resultAfterUnnest = parseUnnest(query.unnest, resultAfterLookup.exprMap)
+    val unnestStr = resultAfterUnnest.strs(0)
+    val unnestTests = resultAfterUnnest.strs.tail
+
+    val resultAfterFilter = parseFilter(query.filter, resultAfterUnnest.exprMap, unnestTests)
+    val filterStr = resultAfterFilter.strs(0)
+
+    val resultAfterGroup = parseGroupby(query.groups, resultAfterFilter.exprMap)
+    val groupStr = resultAfterGroup.strs(0)
+
+    val resultAfterSelect = parseSelect(query.select, resultAfterGroup.exprMap, query)
+    val projectStr = resultAfterSelect.strs(0)
+    val orderStr = resultAfterSelect.strs(1)
+    val limitStr = resultAfterSelect.strs(2)
+    val offsetStr = resultAfterSelect.strs(3)
+
+
+    val queryStr = Seq(
+      projectStr,
+      fromStr,
+      unnestStr,
+      filterStr,
+      groupStr,
+      orderStr,
+      limitStr,
+      offsetStr).filter(!_.isEmpty).mkString("\n")
+
+    val resultAfterGlobalAggr = parseGlobalAggr(query.globalAggr, resultAfterSelect.exprMap, queryStr)
+    resultAfterGlobalAggr.strs.head
+  }
+
+
+  private def parseLookup(lookups: Seq[LookupStatement],
+                          exprMap: Map[String, FieldExpr]): ParsedResult = {
+    val producedExprs = mutable.Map.newBuilder[String, FieldExpr]
+
+    val lookupStr = lookups.zipWithIndex.map {
+      case (lookup, id) =>
+        val lookupExpr = s"l$id"
+        val conditions = lookup.lookupKeys.zip(lookup.sourceKeys).map {
+          case (lookupKey, sourceKey) =>
+            val sourceExpr = exprMap(sourceKey.name)
+            s"$lookupExpr.${lookupKey.name} = ${sourceExpr.refExpr}"
+        }
+        lookup.as.zip(lookup.selectValues).foreach {
+          case (as, selectValue) =>
+            val expr = s"$lookupExpr.$quote${selectValue.name}$quote"
+            producedExprs += (as.name -> FieldExpr(expr, expr))
+        }
+        s"""left outer join ${lookup.dataset} $lookupExpr on ${conditions.mkString(" and ")}"""
+    }.mkString("\n")
+
+    ParsedResult(Seq(lookupStr), (producedExprs ++= exprMap).result().toMap)
+  }
+
+  private def parseFilter(filters: Seq[FilterStatement], exprMap: Map[String, FieldExpr], unnestTestStrs: Seq[String]): ParsedResult = {
+    if (filters.isEmpty && unnestTestStrs.isEmpty) {
+      ParsedResult(Seq(""), exprMap)
+    } else {
+      val filterStrs = filters.map { filter =>
+        parseFilterRelation(filter, exprMap(filter.field.name).refExpr)
+      }
+      val filterStr = (unnestTestStrs ++ filterStrs).mkString("where ", " and ", "")
+
+      ParsedResult(Seq(filterStr), exprMap)
+    }
+  }
+
+  private def parseUnnest(unnest: Seq[UnnestStatement],
+                          exprMap: Map[String, FieldExpr]): ParsedResult = {
+    val producedExprs = mutable.Map.newBuilder[String, FieldExpr]
+    val unnestTestStrs = new ListBuffer[String]
+    val unnestStr = unnest.zipWithIndex.map {
+      case (unnest, id) =>
+        val expr = exprMap(unnest.field.name)
+        val newExpr = s"${quote}unnest$id$quote"
+        producedExprs += (unnest.as.name -> FieldExpr(newExpr, newExpr))
+        if (unnest.field.isOptional) {
+          unnestTestStrs += s"not(is_null(${expr.refExpr}))"
+        }
+        s"unnest ${expr.refExpr} $newExpr"
+    }.mkString("\n")
+
+    unnestTestStrs.prepend(unnestStr)
+    ParsedResult(unnestTestStrs.toSeq, (producedExprs ++= exprMap).result().toMap)
+  }
+
+  private def parseGroupby(groupOpt: Option[GroupStatement],
+                           exprMap: Map[String, FieldExpr]): ParsedResult = {
+    groupOpt match {
+      case Some(group) =>
+        val producedExprs = mutable.Map.newBuilder[String, FieldExpr]
+        val groupStrs = group.bys.map { by =>
+          val fieldExpr = exprMap(by.field.name)
+          val as = by.as.getOrElse(by.field)
+          val groupExpr = parseGroupByFunc(by, fieldExpr.refExpr)
+          val newExpr = s"$quote${as.name}$quote"
+          producedExprs += (as.name -> FieldExpr(newExpr, newExpr))
+          s"$groupExpr as $newExpr"
+        }
+        val groupStr = s"group by ${groupStrs.mkString(",")} group as $groupVar"
+
+        group.aggregates.foreach { aggr =>
+          val fieldExpr = exprMap(aggr.field.name)
+          // defExpr: the aggregate expression computed over the group
+          val aggrExpr = parseAggregateFunc(aggr, fieldExpr.refExpr)
+          // refExpr: the alias subsequent statements use to refer to the aggregated value
+          val newExpr = s"$quote${aggr.as.name}$quote"
+          producedExprs += aggr.as.name -> FieldExpr(newExpr, aggrExpr)
+        }
+
+        ParsedResult(Seq(groupStr), producedExprs.result().toMap)
+      case None => ParsedResult(Seq(""), exprMap)
+    }
+  }
+
+
+  private def parseSelect(selectOpt: Option[SelectStatement],
+                          exprMap: Map[String, FieldExpr], query: Query): ParsedResult = {
+    selectOpt match {
+      case Some(select) =>
+        val producedExprs = mutable.Map.newBuilder[String, FieldExpr]
+        val orderStrs = select.orderOn.zip(select.order).map {
+          case (orderOn, order) =>
+            val expr = exprMap(orderOn.name).refExpr
+            val orderStr = if (order == SortOrder.DSC) "desc" else ""
+            s"${expr} $orderStr"
+        }
+        val orderStr = if (orderStrs.nonEmpty) {
+          orderStrs.mkString("order by ", ",", "")
+        } else {
+          ""
+        }
+        val limitStr = s"limit ${select.limit}"
+        val offsetStr = s"offset ${select.offset}"
+
+        if (select.fields.isEmpty) {
+          producedExprs ++= exprMap
+        } else {
+          select.fields.foreach {
+            case AllField => producedExprs ++= exprMap
+            case field => producedExprs += field.name -> exprMap(field.name)
+          }
+        }
+        val newExprMap = producedExprs.result().toMap
+        val projectStr = if (select.fields.isEmpty) {
+          if (query.hasUnnest || query.hasGroup) {
+            parseProject(exprMap)
+          } else {
+            s"select value $sourceVar"
+          }
+        } else {
+          parseProject(newExprMap)
+        }
+        ParsedResult(Seq(projectStr, orderStr, limitStr, offsetStr), newExprMap)
+
+      case None =>
+        val projectStr =
+          if (query.hasUnnest || query.hasGroup) {
+            parseProject(exprMap)
+          } else {
+            s"select value $sourceVar"
+          }
+        ParsedResult(Seq(projectStr, "", "", ""), exprMap)
+    }
+
+  }
+
+  private def parseProject(exprMap: Map[String, FieldExpr]): String = {
+    exprMap.filter {
+      case (field, expr) => field != "*" && expr.refExpr != sourceVar
+    }.map {
+      case (field, expr) => s"${expr.defExpr} as $quote$field$quote"
+    }.mkString("select ", ",", "")
+  }
+
+  private def parseGlobalAggr(globalAggrOpt: Option[GlobalAggregateStatement],
+                              exprMap: Map[String, FieldExpr],
+                              queryStr: String): ParsedResult = {
+    globalAggrOpt match {
+      case Some(globalAggr) =>
+        val producedExprs = mutable.Map.newBuilder[String, FieldExpr]
+        val aggr = globalAggr.aggregate
+        val funcName = typeImpl.getAggregateStr(aggr.func)
+
+        val newDefExpr = if (aggr.func == Count) {
+          globalAggrVar
+        } else {
+          s"${globalAggrVar}.$quote${aggr.field.name}$quote"
+        }
+        val newRefExpr = s"$quote${aggr.as.name}$quote"
+
+        producedExprs += aggr.as.name -> FieldExpr(newRefExpr, newDefExpr)
+        val result =
+          s"""
+             |select $funcName(
+             |(select value $newDefExpr from ($queryStr) as $globalAggrVar)
+             |) as $quote${aggr.as.name}$quote""".stripMargin
+        ParsedResult(Seq(result), producedExprs.result().toMap)
+      case None =>
+        ParsedResult(Seq(queryStr), exprMap)
+    }
+  }
+
+  protected def parseNumberRelation(filter: FilterStatement,
+                                    fieldExpr: String): String = {
+    filter.relation match {
+      case Relation.inRange =>
+        if (filter.values.size != 2) throw new QueryParsingException(s"relation: ${filter.relation} requires two parameters")
+        s"$fieldExpr >= ${filter.values(0)} and $fieldExpr < ${filter.values(1)}"
+      case Relation.in =>
+        s"$fieldExpr in [ ${filter.values.mkString(",")} ]"
+      case _ =>
+        s"$fieldExpr ${filter.relation} ${filter.values.head}"
+    }
+  }
+
+  protected def parseAggregateFunc(aggr: AggregateStatement,
+                                   fieldExpr: String): String = {
+    def aggFuncExpr(aggFunc: String): String = {
+      if (aggr.field.name.equals("*")) {
+        s"$aggFunc($groupVar)"
+      } else {
+        s"$aggFunc( (select value $groupVar.$fieldExpr from $groupVar) )"
+      }
+    }
+
+    aggr.func match {
+      case topK: TopK => ???
+      case DistinctCount => ???
+      case _ => aggFuncExpr(typeImpl.getAggregateStr(aggr.func))
+    }
+  }
+}
+
+object SQLPPGenerator extends IQLGeneratorFactory {
+  override def apply(): IQLGenerator = new SQLPPGenerator()
+}
diff --git a/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/schema/Query.scala b/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/schema/Query.scala
index af261eb1f..4c9881945 100644
--- a/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/schema/Query.scala
+++ b/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/schema/Query.scala
@@ -35,6 +35,12 @@ case class Query(dataset: String,
                  isEstimable: Boolean = false
                 ) extends IReadQuery {
 
+  def hasUnnest: Boolean = unnest.nonEmpty
+
+  def hasGroup: Boolean = groups.isDefined
+
+  def hasSelect: Boolean = select.isDefined
+
   import TimeField.TimeFormat
 
   def setInterval(field: Field, interval: org.joda.time.Interval): Query = {
@@ -174,11 +180,11 @@ case class GlobalAggregateStatement(aggregate: AggregateStatement
                                    ) extends Statement
 
 object SortOrder extends Enumeration {
-  val ASC, DSC = Value
+  val ASC, DSC = Value
 }
 
 case class SelectStatement(orderOn: Seq[Field],
-                           order:Seq[SortOrder.Value],
+                           order: Seq[SortOrder.Value],
                            limit: Int,
                            offset: Int,
                            fields: Seq[Field]
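Note: the spec below drives the generator end to end by building a Query and a schema map, then calling generate. A minimal sketch of that flow, using the TwitterDataSet, twitterSchema, and timeFilter fixtures the spec imports from TestQuery:

    val parser = new SQLPPGenerator
    // A filter-only query: no lookup, unnest, group, or select statement.
    val query = new Query(TwitterDataSet, Seq.empty, Seq(timeFilter), Seq.empty, None, None)
    // Yields a statement of the shape:
    //   select value t
    //   from twitter.ds_tweet t
    //   where t.`create_at` >= datetime('...') and t.`create_at` < datetime('...');
    val sqlpp: String = parser.generate(query, Map(TwitterDataSet -> twitterSchema))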
diff --git a/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/SQLPPGeneratorTest.scala b/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/SQLPPGeneratorTest.scala
new file mode 100644
index 000000000..f937b2260
--- /dev/null
+++ b/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/SQLPPGeneratorTest.scala
@@ -0,0 +1,604 @@
+package edu.uci.ics.cloudberry.zion.model.impl
+
+import edu.uci.ics.cloudberry.zion.model.schema._
+import org.specs2.mutable.Specification
+
+
+class SQLPPGeneratorTest extends Specification {
+
+  import TestQuery._
+
+  val parser = new SQLPPGenerator
+
+  "SQLPPGenerator generate" should {
+
+    "translate a simple unnest query" in {
+      val query = new Query(TwitterDataSet, Seq.empty, Seq.empty, Seq(unnestHashTag), None, Some(selectTop10))
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """select t.`favorite_count` as `favorite_count`,t.`geo_tag`.`countyID` as `geo_tag.countyID`,t.`user_mentions` as `user_mentions`,`unnest0` as `tag`,t.`user`.`id` as `user.id`,t.`geo_tag`.`cityID` as `geo_tag.cityID`,t.`is_retweet` as `is_retweet`,t.`text` as `text`,t.`retweet_count` as `retweet_count`,t.`in_reply_to_user` as `in_reply_to_user`,t.`id` as `id`,t.`coordinate` as `coordinate`,t.`in_reply_to_status` as `in_reply_to_status`,t.`user`.`status_count` as `user.status_count`,t.`geo_tag`.`stateID` as `geo_tag.stateID`,t.`create_at` as `create_at`,t.`lang` as `lang`,t.`hashtags` as `hashtags`
+          |from twitter.ds_tweet t
+          |unnest t.`hashtags` `unnest0`
+          |where not(is_null(t.`hashtags`))
+          |limit 10
+          |offset 0;""".stripMargin.trim)
+    }
+
+    "translate a simple filter by time and group by time query" in {
+      val filter = Seq(timeFilter)
+      val group = GroupStatement(Seq(byHour), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, filter, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_count(g) as `count`,`hour` as `hour`
+          |from twitter.ds_tweet t
+          |where t.`create_at` >= datetime('2016-01-01T00:00:00.000Z') and t.`create_at` < datetime('2016-12-01T00:00:00.000Z')
+          |group by get_interval_start_datetime(interval_bin(t.`create_at`, datetime('1990-01-01T00:00:00.000Z'), day_time_duration("PT1H") )) as `hour` group as g;
+          | """.stripMargin.trim)
+    }
+
+    "translate a text contain filter and group by time query" in {
+      val filter = Seq(textFilter)
+      val group = GroupStatement(Seq(byHour), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, filter, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_count(g) as `count`,`hour` as `hour`
+          |from twitter.ds_tweet t
+          |where similarity_jaccard(word_tokens(t.`text`), word_tokens('zika')) > 0.0
+          |and contains(t.`text`, "virus")
+          |group by get_interval_start_datetime(interval_bin(t.`create_at`, datetime('1990-01-01T00:00:00.000Z'), day_time_duration("PT1H") )) as `hour` group as g;
+          | """.stripMargin.trim)
+    }
+
+    "translate a geo id set filter group by time query" in {
+      val filter = Seq(stateFilter)
+      val group = GroupStatement(Seq(byHour), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, filter, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_count(g) as `count`,`hour` as `hour`
+          |from twitter.ds_tweet t
+          |where t.`geo_tag`.`stateID` in [ 37,51,24,11,10,34,42,9,44 ]
+          |group by get_interval_start_datetime(interval_bin(t.`create_at`, datetime('1990-01-01T00:00:00.000Z'), day_time_duration("PT1H") )) as `hour` group as g;
+          | """.stripMargin.trim)
+    }
+
+    "translate a text contain + time + geo id set filter and group by time + spatial cube" in {
+      val filter = Seq(textFilter, timeFilter, stateFilter)
+      val group = GroupStatement(Seq(byHour, byState), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, filter, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_count(g) as `count`,`state` as `state`,`hour` as `hour`
+          |from twitter.ds_tweet t
+          |where similarity_jaccard(word_tokens(t.`text`), word_tokens('zika')) > 0.0
+          |and contains(t.`text`, "virus") and t.`create_at` >= datetime('2016-01-01T00:00:00.000Z') and t.`create_at` < datetime('2016-12-01T00:00:00.000Z') and t.`geo_tag`.`stateID` in [ 37,51,24,11,10,34,42,9,44 ]
+          |group by get_interval_start_datetime(interval_bin(t.`create_at`, datetime('1990-01-01T00:00:00.000Z'), day_time_duration("PT1H") )) as `hour`,t.geo_tag.stateID as `state` group as g;
+          | """.stripMargin.trim)
+    }
+
+    "translate a text contain + time + geo id set filter and sample tweets" in {
+      val filter = Seq(textFilter, timeFilter, stateFilter)
+      val query = new Query(TwitterDataSet, Seq.empty, filter, Seq.empty, None, Some(selectRecent))
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select t.`user`.`id` as `user.id`,t.`create_at` as `create_at`,t.`id` as `id`
+          |from twitter.ds_tweet t
+          |where similarity_jaccard(word_tokens(t.`text`), word_tokens('zika')) > 0.0
+          |and contains(t.`text`, "virus") and t.`create_at` >= datetime('2016-01-01T00:00:00.000Z') and t.`create_at` < datetime('2016-12-01T00:00:00.000Z') and t.`geo_tag`.`stateID` in [ 37,51,24,11,10,34,42,9,44 ]
+          |order by t.`create_at` desc
+          |limit 100
+          |offset 0;
+          | """.stripMargin.trim)
+    }
+
+    "translate a text contain + time + geo id set filter and group by hashtags" in {
+      val filter = Seq(textFilter, timeFilter, stateFilter)
+      val group = GroupStatement(Seq(byTag), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, filter, Seq(unnestHashTag), Some(group), Some(selectTop10Tag))
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_count(g) as `count`,`tag` as `tag`
+          |from twitter.ds_tweet t
+          |unnest t.`hashtags` `unnest0`
+          |where not(is_null(t.`hashtags`)) and similarity_jaccard(word_tokens(t.`text`), word_tokens('zika')) > 0.0
+          |and contains(t.`text`, "virus") and t.`create_at` >= datetime('2016-01-01T00:00:00.000Z') and t.`create_at` < datetime('2016-12-01T00:00:00.000Z') and t.`geo_tag`.`stateID` in [ 37,51,24,11,10,34,42,9,44 ]
+          |group by `unnest0` as `tag` group as g
+          |order by `count` desc
+          |limit 10
+          |offset 0;
+          | """.stripMargin.trim)
+    }
+
+    "translate a simple filter by time and group by time query max id" in {
+      val filter = Seq(timeFilter)
+      val group = GroupStatement(Seq(byHour), Seq(aggrMax))
+      val query = new Query(TwitterDataSet, Seq.empty, filter, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_max( (select value g.t.`id` from g) ) as `max`,`hour` as `hour`
+          |from twitter.ds_tweet t
+          |where t.`create_at` >= datetime('2016-01-01T00:00:00.000Z') and t.`create_at` < datetime('2016-12-01T00:00:00.000Z')
+          |group by get_interval_start_datetime(interval_bin(t.`create_at`, datetime('1990-01-01T00:00:00.000Z'), day_time_duration("PT1H") )) as `hour` group as g;
+          | """.stripMargin.trim)
+    }
+
+    "translate a simple filter by time and group by time query min id" in {
+      val filter = Seq(timeFilter)
+      val group = GroupStatement(Seq(byHour), Seq(aggrMin))
+      val query = new Query(TwitterDataSet, Seq.empty, filter, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select `hour` as `hour`,coll_min( (select value g.t.`id` from g) ) as `min`
+          |from twitter.ds_tweet t
+          |where t.`create_at` >= datetime('2016-01-01T00:00:00.000Z') and t.`create_at` < datetime('2016-12-01T00:00:00.000Z')
+          |group by get_interval_start_datetime(interval_bin(t.`create_at`, datetime('1990-01-01T00:00:00.000Z'), day_time_duration("PT1H") )) as `hour` group as g;
+          | """.stripMargin.trim)
+    }
+
+    "translate a simple filter by time and group by time query sum id" in {
+      val filter = Seq(timeFilter)
+      val group = GroupStatement(Seq(byHour), Seq(aggrSum))
+      val query = new Query(TwitterDataSet, Seq.empty, filter, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select `hour` as `hour`,coll_sum( (select value g.t.`id` from g) ) as `sum`
+          |from twitter.ds_tweet t
+          |where t.`create_at` >= datetime('2016-01-01T00:00:00.000Z') and t.`create_at` < datetime('2016-12-01T00:00:00.000Z')
+          |group by get_interval_start_datetime(interval_bin(t.`create_at`, datetime('1990-01-01T00:00:00.000Z'), day_time_duration("PT1H") )) as `hour` group as g;
+          | """.stripMargin.trim)
+    }
+
+    "translate a simple filter by time and group by time query avg id" in {
+      val filter = Seq(timeFilter)
+      val group = GroupStatement(Seq(byHour), Seq(aggrAvg))
+      val query = new Query(TwitterDataSet, Seq.empty, filter, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_avg( (select value g.t.`id` from g) ) as `avg`,`hour` as `hour`
+          |from twitter.ds_tweet t
+          |where t.`create_at` >= datetime('2016-01-01T00:00:00.000Z') and t.`create_at` < datetime('2016-12-01T00:00:00.000Z')
+          |group by get_interval_start_datetime(interval_bin(t.`create_at`, datetime('1990-01-01T00:00:00.000Z'), day_time_duration("PT1H") )) as `hour` group as g;
+          | """.stripMargin.trim)
+    }
+
+    "translate a text contain filter and group by geocell 10th" in {
+      val filter = Seq(textFilter)
+      val group = GroupStatement(Seq(byGeocell10), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, filter, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_count(g) as `count`,`cell` as `cell`
+          |from twitter.ds_tweet t
+          |where similarity_jaccard(word_tokens(t.`text`), word_tokens('zika')) > 0.0
+          |and contains(t.`text`, "virus")
+          |group by get_points(spatial_cell(t.`coordinate`, create_point(0.0,0.0), 0.1, 0.1))[0] as `cell` group as g;
+        """.stripMargin.trim)
+    }
+
+    "translate a text contain filter and group by geocell 100th" in {
+      val filter = Seq(textFilter)
+      val group = GroupStatement(Seq(byGeocell100), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, filter, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_count(g) as `count`,`cell` as `cell`
+          |from twitter.ds_tweet t
+          |where similarity_jaccard(word_tokens(t.`text`), word_tokens('zika')) > 0.0
+          |and contains(t.`text`, "virus")
+          |group by get_points(spatial_cell(t.`coordinate`, create_point(0.0,0.0), 0.01, 0.01))[0] as `cell` group as g;
+        """.stripMargin.trim)
+    }
+
+    "translate a text contain filter and group by geocell 1000th" in {
+      val filter = Seq(textFilter)
+      val group = GroupStatement(Seq(byGeocell1000), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, filter, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_count(g) as `count`,`cell` as `cell`
+          |from twitter.ds_tweet t
+          |where similarity_jaccard(word_tokens(t.`text`), word_tokens('zika')) > 0.0
+          |and contains(t.`text`, "virus")
+          |group by get_points(spatial_cell(t.`coordinate`, create_point(0.0,0.0), 0.001, 0.001))[0] as `cell` group as g;
+        """.stripMargin.trim)
+    }
+
+    "translate a text contain filter and group by bin" in {
+      val filter = Seq(textFilter)
+      val group = GroupStatement(Seq(byBin), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, filter, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_count(g) as `count`,`state` as `state`
+          |from twitter.ds_tweet t
+          |where similarity_jaccard(word_tokens(t.`text`), word_tokens('zika')) > 0.0
+          |and contains(t.`text`, "virus")
+          |group by round(t.`geo_tag`.`stateID`/10)*10 as `state` group as g;
+          | """.stripMargin.trim)
+    }
+
+    "translate a group by geocell without filter" in {
+      val group = GroupStatement(Seq(byGeocell1000), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, Seq.empty, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_count(g) as `count`,`cell` as `cell`
+          |from twitter.ds_tweet t
+          |group by get_points(spatial_cell(t.`coordinate`, create_point(0.0,0.0), 0.001, 0.001))[0] as `cell` group as g;
+        """.stripMargin.trim)
+    }
+
+    "translate a text contain filter and select 10" in {
+      val filter = Seq(textFilter)
+      val query = new Query(TwitterDataSet, Seq.empty, filter, Seq.empty, None, Some(selectTop10))
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select value t
+          |from twitter.ds_tweet t
+          |where similarity_jaccard(word_tokens(t.`text`), word_tokens('zika')) > 0.0
+          |and contains(t.`text`, "virus")
+          |limit 10
+          |offset 0;
+          | """.stripMargin.trim)
+    }
+
+    "translate group by second" in {
+      val group = GroupStatement(Seq(bySecond), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, Seq.empty, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_count(g) as `count`,`sec` as `sec`
+          |from twitter.ds_tweet t
+          |group by get_interval_start_datetime(interval_bin(t.`create_at`, datetime('1990-01-01T00:00:00.000Z'), day_time_duration("PT1S") )) as `sec` group as g;
+          | """.stripMargin.trim)
+    }
+
+    "translate group by minute" in {
+      val group = GroupStatement(Seq(byMinute), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, Seq.empty, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_count(g) as `count`,`min` as `min`
+          |from twitter.ds_tweet t
+          |group by get_interval_start_datetime(interval_bin(t.`create_at`, datetime('1990-01-01T00:00:00.000Z'), day_time_duration("PT1M") )) as `min` group as g;
+          | """.stripMargin.trim)
+    }
+
+    "translate group by day" in {
+      val group = GroupStatement(Seq(byDay), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, Seq.empty, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_count(g) as `count`,`day` as `day`
+          |from twitter.ds_tweet t
+          |group by get_interval_start_datetime(interval_bin(t.`create_at`, datetime('1990-01-01T00:00:00.000Z'), day_time_duration("P1D") )) as `day` group as g;
+          | """.stripMargin.trim)
+    }
+
+    "translate group by week" in {
+      val group = GroupStatement(Seq(byWeek), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, Seq.empty, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_count(g) as `count`,`week` as `week`
+          |from twitter.ds_tweet t
+          |group by get_interval_start_datetime(interval_bin(t.`create_at`, datetime('1990-01-01T00:00:00.000Z'), day_time_duration("P7D") )) as `week` group as g;
+          | """.stripMargin.trim)
+    }
+
+    "translate group by month" in {
+      val group = GroupStatement(Seq(byMonth), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, Seq.empty, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_count(g) as `count`,`month` as `month`
+          |from twitter.ds_tweet t
+          |group by get_interval_start_datetime(interval_bin(t.`create_at`, datetime('1990-01-01T00:00:00.000Z'), year_month_duration("P1M") )) as `month` group as g;
+          | """.stripMargin.trim)
+    }
+
+    "translate group by year" in {
+      val group = GroupStatement(Seq(byYear), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, Seq.empty, Seq.empty, Some(group), None)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select coll_count(g) as `count`,`year` as `year`
+          |from twitter.ds_tweet t
+          |group by get_interval_start_datetime(interval_bin(t.`create_at`, datetime('1990-01-01T00:00:00.000Z'), year_month_duration("P1Y") )) as `year` group as g;
+          | """.stripMargin.trim)
+    }
+
+    "translate a count cardinality query without group by" in {
+      val globalAggr = GlobalAggregateStatement(aggrCount)
+      val query = new Query(dataset = TwitterDataSet, globalAggr = Some(globalAggr))
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """select coll_count(
+          |(select value c from (select value t
+          |from twitter.ds_tweet t) as c)
+          |) as `count`;""".stripMargin)
+    }
+
+    "translate get min field value query without group by" in {
+      val globalAggr = GlobalAggregateStatement(aggrMin)
+      val query = new Query(dataset = TwitterDataSet, globalAggr = Some(globalAggr))
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """select coll_min(
+          |(select value c.`id` from (select value t
+          |from twitter.ds_tweet t) as c)
+          |) as `min`;""".stripMargin)
+    }
+
+    "translate get max field value query without group by" in {
+      val globalAggr = GlobalAggregateStatement(aggrMax)
+      val query = new Query(dataset = TwitterDataSet, globalAggr = Some(globalAggr))
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """select coll_max(
+          |(select value c.`id` from (select value t
+          |from twitter.ds_tweet t) as c)
+          |) as `max`;""".stripMargin)
+    }
+
+    "translate a count cardinality query with filter without group by" in {
+      val filter = Seq(timeFilter)
+      val globalAggr = GlobalAggregateStatement(aggrCount)
+      val query = new Query(dataset = TwitterDataSet, filter = filter, globalAggr = Some(globalAggr))
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """select coll_count(
+          |(select value c from (select value t
+          |from twitter.ds_tweet t
+          |where t.`create_at` >= datetime('2016-01-01T00:00:00.000Z') and t.`create_at` < datetime('2016-12-01T00:00:00.000Z')) as c)
+          |) as `count`;""".stripMargin)
+    }
+
+    "translate a min cardinality query with filter without group by" in {
+      val filter = Seq(timeFilter)
+      val globalAggr = GlobalAggregateStatement(aggrMin)
+      val query = new Query(dataset = TwitterDataSet, filter = filter, globalAggr = Some(globalAggr))
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """select coll_min(
+          |(select value c.`id` from (select value t
+          |from twitter.ds_tweet t
+          |where t.`create_at` >= datetime('2016-01-01T00:00:00.000Z') and t.`create_at` < datetime('2016-12-01T00:00:00.000Z')) as c)
+          |) as `min`;""".stripMargin)
+    }
+
+    "translate a max cardinality query with unnest with group by with select" in {
+
+      val filter = Seq(textFilter, timeFilter, stateFilter)
+      val globalAggr = GlobalAggregateStatement(aggrMaxGroupBy)
+      val group = GroupStatement(Seq(byTag), Seq(aggrCount))
+      val query = new Query(TwitterDataSet, Seq.empty, filter, Seq(unnestHashTag), Some(group), Some(selectTop10Tag), Some(globalAggr))
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """select coll_max(
+          |(select value c.`count` from (select coll_count(g) as `count`,`tag` as `tag`
+          |from twitter.ds_tweet t
+          |unnest t.`hashtags` `unnest0`
+          |where not(is_null(t.`hashtags`)) and similarity_jaccard(word_tokens(t.`text`), word_tokens('zika')) > 0.0
+          |and contains(t.`text`, "virus") and t.`create_at` >= datetime('2016-01-01T00:00:00.000Z') and t.`create_at` < datetime('2016-12-01T00:00:00.000Z') and t.`geo_tag`.`stateID` in [ 37,51,24,11,10,34,42,9,44 ]
+          |group by `unnest0` as `tag` group as g
+          |order by `count` desc
+          |limit 10
+          |offset 0) as c)
+          |) as `max`;""".stripMargin.trim)
+    }
+
+    "translate a count cardinality query with select" in {
+      val globalAggr = GlobalAggregateStatement(aggrCount)
+      val query = new Query(dataset = TwitterDataSet, select = Some(selectTop10), globalAggr = Some(globalAggr))
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """select coll_count(
+          |(select value c from (select value t
+          |from twitter.ds_tweet t
+          |limit 10
+          |offset 0) as c)
+          |) as `count`;""".stripMargin.trim)
+    }
+
+    "translate lookup one table with one join key" in {
+      val populationDataSet = PopulationDataStore.DatasetName
+      val populationSchema = PopulationDataStore.PopulationSchema
+
+      val selectStatement = SelectStatement(Seq.empty, Seq.empty, 0, 0, Seq(AllField, population))
+      val lookup = Seq(lookupPopulation)
+      val filter = Seq(textFilter)
+      val query = new Query(TwitterDataSet, lookup, filter, Seq.empty, select = Some(selectStatement))
+      val result = parser.generate(query, schemaMap = Map(TwitterDataSet -> twitterSchema, populationDataSet -> populationSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select t.`favorite_count` as `favorite_count`,t.`geo_tag`.`countyID` as `geo_tag.countyID`,t.`user_mentions` as `user_mentions`,l0.`population` as `population`,t.`user`.`id` as `user.id`,t.`geo_tag`.`cityID` as `geo_tag.cityID`,t.`is_retweet` as `is_retweet`,t.`text` as `text`,t.`retweet_count` as `retweet_count`,t.`in_reply_to_user` as `in_reply_to_user`,t.`id` as `id`,t.`coordinate` as `coordinate`,t.`in_reply_to_status` as `in_reply_to_status`,t.`user`.`status_count` as `user.status_count`,t.`geo_tag`.`stateID` as `geo_tag.stateID`,t.`create_at` as `create_at`,t.`lang` as `lang`,t.`hashtags` as `hashtags`
+          |from twitter.ds_tweet t left outer join twitter.US_population l0 on l0.stateID = t.`geo_tag`.`stateID`
+          |where similarity_jaccard(word_tokens(t.`text`), word_tokens('zika')) > 0.0
+          |and contains(t.`text`, "virus")
+          |limit 0
+          |offset 0;""".stripMargin.trim
+      )
+    }
+
+    "parseLookup should be able to handle multiple fields in the lookup statement" in {
+      val populationDataSet = PopulationDataStore.DatasetName
+      val populationSchema = PopulationDataStore.PopulationSchema
+
+      val selectStatement = SelectStatement(Seq.empty, Seq.empty, 0, 0, Seq(AllField, population, stateID))
+      val lookup = LookupStatement(Seq(geoStateID), populationDataSet, Seq(stateID), Seq(population, stateID),
+        Seq(population, stateID))
+      val filter = Seq(textFilter)
+      val query = new Query(TwitterDataSet, Seq(lookup), filter, Seq.empty, select = Some(selectStatement))
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema, populationDataSet -> populationSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select t.`favorite_count` as `favorite_count`,t.`geo_tag`.`countyID` as `geo_tag.countyID`,t.`user_mentions` as `user_mentions`,l0.`population` as `population`,l0.`stateID` as `stateID`,t.`user`.`id` as `user.id`,t.`geo_tag`.`cityID` as `geo_tag.cityID`,t.`is_retweet` as `is_retweet`,t.`text` as `text`,t.`retweet_count` as `retweet_count`,t.`in_reply_to_user` as `in_reply_to_user`,t.`id` as `id`,t.`coordinate` as `coordinate`,t.`in_reply_to_status` as `in_reply_to_status`,t.`user`.`status_count` as `user.status_count`,t.`geo_tag`.`stateID` as `geo_tag.stateID`,t.`create_at` as `create_at`,t.`lang` as `lang`,t.`hashtags` as `hashtags`
+          |from twitter.ds_tweet t left outer join twitter.US_population l0 on l0.stateID = t.`geo_tag`.`stateID`
+          |where similarity_jaccard(word_tokens(t.`text`), word_tokens('zika')) > 0.0
+          |and contains(t.`text`, "virus")
+          |limit 0
+          |offset 0;""".stripMargin.trim
+      )
+    }
+
+    "translate lookup multiple tables with one join key on each" in {
+      val populationDataSet = PopulationDataStore.DatasetName
+      val populationSchema = PopulationDataStore.PopulationSchema
+
+      val literacyDataSet = LiteracyDataStore.DatasetName
+      val literacySchema = LiteracyDataStore.LiteracySchema
+
+      val selectValues = Seq(AllField, population, literacy)
+      val selectStatement = SelectStatement(Seq.empty, Seq.empty, 0, 0, selectValues)
+      val filter = Seq(textFilter)
+      val query = new Query(TwitterDataSet,
+        lookup = Seq(lookupPopulation, lookupLiteracy),
+        filter, Seq.empty,
+        select = Some(selectStatement))
+
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema, populationDataSet -> populationSchema,
+        literacyDataSet -> literacySchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """
+          |select t.`favorite_count` as `favorite_count`,t.`geo_tag`.`countyID` as `geo_tag.countyID`,t.`user_mentions` as `user_mentions`,l0.`population` as `population`,t.`user`.`id` as `user.id`,t.`geo_tag`.`cityID` as `geo_tag.cityID`,t.`is_retweet` as `is_retweet`,t.`text` as `text`,t.`retweet_count` as `retweet_count`,l1.`literacy` as `literacy`,t.`in_reply_to_user` as `in_reply_to_user`,t.`id` as `id`,t.`coordinate` as `coordinate`,t.`in_reply_to_status` as `in_reply_to_status`,t.`user`.`status_count` as `user.status_count`,t.`geo_tag`.`stateID` as `geo_tag.stateID`,t.`create_at` as `create_at`,t.`lang` as `lang`,t.`hashtags` as `hashtags`
+          |from twitter.ds_tweet t left outer join twitter.US_population l0 on l0.stateID = t.`geo_tag`.`stateID`
+          |left outer join twitter.US_literacy l1 on l1.stateID = t.`geo_tag`.`stateID`
+          |where similarity_jaccard(word_tokens(t.`text`), word_tokens('zika')) > 0.0
+          |and contains(t.`text`, "virus")
+          |limit 0
+          |offset 0;""".stripMargin.trim
+      )
+    }
+
+
+    "translate group by query having lookup with one join key" in {
+      val populationDataSet = PopulationDataStore.DatasetName
+      val populationSchema = PopulationDataStore.PopulationSchema
+
+      val selectValues = Seq(population)
+      val group = Some(groupPopulationSum)
+      val lookup = LookupStatement(
+        sourceKeys = Seq(geoStateID),
+        dataset = populationDataSet,
+        lookupKeys = Seq(stateID),
+        selectValues,
+        as = selectValues)
+      val filter = Seq(textFilter)
+      val query = new Query(TwitterDataSet, Seq(lookup), filter, Seq.empty, group)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema, populationDataSet -> populationSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """select `state` as `state`,coll_sum( (select value g.l0.`population` from g) ) as `sum`
+          |from twitter.ds_tweet t left outer join twitter.US_population l0 on l0.stateID = t.`geo_tag`.`stateID`
+          |where similarity_jaccard(word_tokens(t.`text`), word_tokens('zika')) > 0.0
+          |and contains(t.`text`, "virus")
+          |group by t.geo_tag.stateID as `state` group as g;""".stripMargin.trim
+      )
+    }
+
+    "translate a text contain + time + geo id set filter and group day and state and aggregate topK hashtags" in {
+      ok
+    }
+
+
+    "translate a filter having point relation with select" in {
+      val filter = Seq(pointFilter)
+      val select = Option(selectRecent)
+      val query = new Query(dataset = TwitterDataSet, filter = filter, select = select)
+      val result = parser.generate(query, Map(TwitterDataSet -> twitterSchema))
+      removeEmptyLine(result) must_== unifyNewLine(
+        """select t.`user`.`id` as `user.id`,t.`create_at` as `create_at`,t.`id` as `id`
+          |from twitter.ds_tweet t
+          |where spatial_intersect(t.`coordinate`,
+          |  create_rectangle(create_point(0.0,0.0),
+          |  create_point(1.0,1.0)))
+          |order by t.`create_at` desc
+          |limit 100
+          |offset 0;""".stripMargin)
+    }
+  }
+
+  "SQLPPGenerator calcResultSchema" should {
+    "return the input schema if the query is subset filter only" in {
+      val schema = parser.calcResultSchema(zikaCreateQuery, TwitterDataStore.TwitterSchema)
+      schema must_== TwitterDataStore.TwitterSchema
+    }
+    "return the aggregated schema for aggregation queries" in {
+      ok
+    }
+  }
+
+  "SQLPPGenerator createView" should {
+    "generate the ddl for the twitter dataset" in {
+      val ddl = parser.generate(CreateView("zika", zikaCreateQuery), Map(TwitterDataSet -> TwitterDataStore.TwitterSchema))
+      removeEmptyLine(ddl) must_== unifyNewLine(
+        """
+          |create type twitter.typeTweet if not exists as open {
+          | favorite_count : double,
+          | geo_tag : { countyID : double },
+          | user_mentions : {{double}}?,
+          | user : { id : double },
+          | geo_tag : { cityID : double },
+          | is_retweet : boolean,
+          | text : string,
+          | retweet_count : double,
+          | in_reply_to_user : double,
+          | id : double,
+          | coordinate : point,
+          | in_reply_to_status : double,
+          | user : { status_count : double },
+          | geo_tag : { stateID : double },
+          | create_at : datetime,
+          | lang : string,
+          | hashtags : {{string}}?
+          |}
+          |drop dataset zika if exists;
+          |create dataset zika(twitter.typeTweet) primary key id //with filter on 'create_at'
+          |insert into zika (
+          |select value t
+          |from twitter.ds_tweet t
+          |where similarity_jaccard(word_tokens(t.`text`), word_tokens('zika')) > 0.0
+          |);""".stripMargin.trim)
+    }
+  }
+
+  "SQLPPGenerator appendView" should {
+    "generate the upsert query" in {
+      val timeFilter = FilterStatement(TimeField(TwitterDataStore.TimeFieldName), None, Relation.inRange, Seq(startTime, endTime))
+      val sql = parser.generate(AppendView("zika", zikaCreateQuery.copy(filter = Seq(timeFilter) ++ zikaCreateQuery.filter)), Map("twitter.ds_tweet" -> TwitterDataStore.TwitterSchema))
+      removeEmptyLine(sql) must_== unifyNewLine(
+        """
+          |upsert into zika (
+          |select value t
+          |from twitter.ds_tweet t
+          |where t.`create_at` >= datetime('2016-01-01T00:00:00.000Z') and t.`create_at` < datetime('2016-12-01T00:00:00.000Z') and similarity_jaccard(word_tokens(t.`text`), word_tokens('zika')) > 0.0
+          |);
+        """.stripMargin.trim)
+    }
+  }
+}
\ No newline at end of file
diff --git a/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/TestQuery.scala b/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/TestQuery.scala
index a5fc26588..6a7bc02cb 100644
--- a/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/TestQuery.scala
+++ b/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/TestQuery.scala
@@ -45,6 +45,7 @@ object TestQuery {
   val stateFilter = FilterStatement(geoStateID, None, Relation.in, stateValue)
   val retweetFilter = FilterStatement(isRetweet, None, Relation.isTrue, Seq.empty)
   val bagFilter = FilterStatement(hashtags, None, Relation.contains, Seq(BagField("tags", DataType.String, false)))
+  val pointFilter = FilterStatement(coordinate, None, Relation.inRange, Seq(Seq(0.0, 0.0), Seq(1.0, 1.0)))
 
   val intValues = Seq(1)
   val stringValue = Seq("English")
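Note: the two values of the new pointFilter fixture are the corners of a bounding box. parsePointRelation translates them into the spatial_intersect predicate asserted by the "filter having point relation" test above:

    spatial_intersect(t.`coordinate`,
      create_rectangle(create_point(0.0,0.0),
      create_point(1.0,1.0)))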