Skip to content

Commit

Permalink
Add tumble windowing function (opensearch-project#37)
Browse files Browse the repository at this point in the history
* Add tumble function and IT

Signed-off-by: Chen Dai <[email protected]>

* Add UT

Signed-off-by: Chen Dai <[email protected]>

* Add assertion on DF schema in IT

Signed-off-by: Chen Dai <[email protected]>

* Update Javadoc

Signed-off-by: Chen Dai <[email protected]>

* Remove select alias in IT

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Sep 20, 2023
1 parent bd68653 commit e72f054
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.flint.spark

import org.opensearch.flint.spark.function.TumbleFunction
import org.opensearch.flint.spark.sql.FlintSparkSqlParser

import org.apache.spark.sql.SparkSessionExtensions
Expand All @@ -18,6 +19,9 @@ class FlintSparkExtensions extends (SparkSessionExtensions => Unit) {
extensions.injectParser { (spark, parser) =>
new FlintSparkSqlParser(parser)
}

extensions.injectFunction(TumbleFunction.description)

extensions.injectOptimizerRule { spark =>
new FlintSparkOptimizer(spark)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.function

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.functions.window

/**
 * Tumble windowing function: buckets each row into a fixed-size time window
 * with no overlap between consecutive windows.
 */
object TumbleFunction {

  /** Identifier under which the function is registered with Spark. */
  val identifier: FunctionIdentifier = FunctionIdentifier("tumble")

  /**
   * Expression metadata: evaluating tumble produces a new struct column
   * (delegated to Spark's built-in window function, hence the "window" usage).
   */
  val exprInfo: ExpressionInfo = new ExpressionInfo(classOf[Column].getCanonicalName, "window")

  /**
   * Builds the function implementation from its argument expressions.
   * Expects exactly two arguments: the time column and the window duration.
   */
  val functionBuilder: Seq[Expression] => Expression =
    (args: Seq[Expression]) => {
      require(args.size == 2, "column name and window expression are required")

      // Delegate the actual windowing logic to Spark's existing window() function.
      // NOTE(review): duration is recovered via Expression.toString — assumes the
      // second argument is a string literal; verify for non-literal inputs.
      val Seq(timeColumn, windowDuration) = args
      window(new Column(timeColumn), windowDuration.toString()).expr
    }

  /**
   * Registration bundle (identifier, metadata, builder) consumed by the Spark
   * extension's injectFunction hook.
   */
  val description: (FunctionIdentifier, ExpressionInfo, FunctionBuilder) =
    (identifier, exprInfo, functionBuilder)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.function

import org.scalatest.matchers.should.Matchers

import org.apache.spark.FlintSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.functions.{col, lit}

class TumbleFunctionSuite extends FlintSuite with Matchers {

  test("should accept column name and window expression as arguments") {
    // Building with both required arguments must not throw.
    val args = Seq(col("timestamp").expr, lit("10 minutes").expr)
    TumbleFunction.functionBuilder(args)
  }

  test("should fail if only column name provided") {
    // A single argument violates the builder's arity precondition.
    an[IllegalArgumentException] should be thrownBy {
      TumbleFunction.functionBuilder(Seq(col("timestamp").expr))
    }
  }

  test("should fail if argument type is wrong") {
    // A non-string duration literal is rejected during analysis.
    an[AnalysisException] should be thrownBy {
      TumbleFunction.functionBuilder(Seq(col("timestamp").expr, lit(10).expr))
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import java.sql.Timestamp

import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.FlintSuite
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.types.StructType

class FlintSparkWindowingFunctionITSuite extends QueryTest with FlintSuite {

  test("tumble windowing function") {
    // Three events: two inside the first 10-minute window, one in the second.
    val events = Seq(
      (1L, "2023-01-01 00:00:00"),
      (2L, "2023-01-01 00:09:00"),
      (3L, "2023-01-01 00:15:00"))
    val inputDF = spark.createDataFrame(events).toDF("id", "timestamp")

    val resultDF = inputDF.selectExpr("TUMBLE(timestamp, '10 minutes')")

    // The function yields a single non-null struct column of window bounds.
    val expectedSchema =
      StructType.fromDDL("window struct<start:timestamp,end:timestamp> NOT NULL")
    resultDF.schema shouldBe expectedSchema

    val expectedRows = Seq(
      Row(Row(ts("2023-01-01 00:00:00"), ts("2023-01-01 00:10:00"))),
      Row(Row(ts("2023-01-01 00:00:00"), ts("2023-01-01 00:10:00"))),
      Row(Row(ts("2023-01-01 00:10:00"), ts("2023-01-01 00:20:00"))))
    checkAnswer(resultDF, expectedRows)
  }

  // Parses "yyyy-MM-dd HH:mm:ss" into a java.sql.Timestamp for expected rows.
  private def ts(value: String): Timestamp = Timestamp.valueOf(value)
}

0 comments on commit e72f054

Please sign in to comment.