-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support on-demand incremental refresh (#234)
* Add index refresher (Signed-off-by: Chen Dai <[email protected]>)
* Add incremental index option
* Fix broken IT
* Fix broken UT
* Update user manual and javadoc
* Rename index refresher
* Add UT for index refresh
* Add more SQL IT
* Fix javadoc and comments
* Address PR comments
* Fix broken IT

---------

Signed-off-by: Chen Dai <[email protected]>
- Loading branch information
Showing
20 changed files
with
558 additions
and
144 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
111 changes: 111 additions & 0 deletions
111
...park-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.spark.refresh | ||
|
||
import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions} | ||
import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh} | ||
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode} | ||
|
||
import org.apache.spark.sql.{DataFrame, Row, SparkSession} | ||
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE | ||
import org.apache.spark.sql.flint.config.FlintSparkConf | ||
import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY | ||
import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger} | ||
|
||
/** | ||
* Index refresh that auto refreshes the index by index options provided. | ||
* | ||
* @param indexName | ||
* Flint index name | ||
* @param index | ||
* Flint index | ||
*/ | ||
/**
 * Index refresh in AUTO mode: starts a Spark streaming job that keeps the Flint index
 * up to date, configured by the index options provided.
 *
 * @param indexName
 *   Flint index name
 * @param index
 *   Flint index
 */
class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) extends FlintSparkIndexRefresh {

  override def refreshMode: RefreshMode = AUTO

  override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
    val indexOptions = index.options
    val sourceTable = index.metadata().source
    index match {
      // Index types implementing StreamingRefresh stream their own build plan straight to the sink
      case streamingIndex: StreamingRefresh =>
        logInfo("Start refreshing index in streaming style")
        val streamingJob = streamingIndex
          .buildStream(spark)
          .writeStream
          .queryName(indexName)
          .format(FLINT_DATASOURCE)
          .options(flintSparkConf.properties)
          .addSinkOptions(indexOptions, flintSparkConf)
          .start(indexName)
        Some(streamingJob.id.toString)

      // Otherwise, fall back to streaming the source table and running a full
      // batch refresh for every micro-batch via foreachBatch
      case _ =>
        logInfo("Start refreshing index in foreach streaming style")
        val foreachJob = spark.readStream
          .options(indexOptions.extraSourceOptions(sourceTable))
          .table(quotedTableName(sourceTable))
          .writeStream
          .queryName(indexName)
          .addSinkOptions(indexOptions, flintSparkConf)
          .foreachBatch { (batchDF: DataFrame, _: Long) =>
            new FullIndexRefresh(indexName, index, Some(batchDF))
              .start(spark, flintSparkConf)
            () // discard return value above and return unit to use right overridden method
          }
          .start()
        Some(foreachJob.id.toString)
    }
  }

  // Implicit class so the sink-option helpers below can join the DataStreamWriter fluent chain
  private implicit class FlintDataStreamWriter(val dataStream: DataStreamWriter[Row]) {

    /** Applies every user-provided sink option (checkpoint, trigger, output mode, extras). */
    def addSinkOptions(
        options: FlintSparkIndexOptions,
        flintSparkConf: FlintSparkConf): DataStreamWriter[Row] = {
      dataStream
        .addCheckpointLocation(options.checkpointLocation(), flintSparkConf.isCheckpointMandatory)
        .addRefreshInterval(options.refreshInterval())
        .addAvailableNowTrigger(options.incrementalRefresh())
        .addOutputMode(options.outputMode())
        .options(options.extraSinkOptions())
    }

    /** Sets checkpoint location if given; fails fast when one is required but absent. */
    def addCheckpointLocation(
        checkpointLocation: Option[String],
        isCheckpointMandatory: Boolean): DataStreamWriter[Row] = {
      checkpointLocation
        .map(location => dataStream.option("checkpointLocation", location))
        .getOrElse {
          if (isCheckpointMandatory) {
            throw new IllegalStateException(
              s"Checkpoint location is mandatory for incremental refresh if ${CHECKPOINT_MANDATORY.key} enabled")
          }
          dataStream
        }
    }

    /** Applies a processing-time trigger when a refresh interval is configured. */
    def addRefreshInterval(refreshInterval: Option[String]): DataStreamWriter[Row] =
      refreshInterval.fold(dataStream) { interval =>
        dataStream.trigger(Trigger.ProcessingTime(interval))
      }

    /** Uses the available-now trigger when incremental refresh is requested. */
    def addAvailableNowTrigger(incrementalRefresh: Boolean): DataStreamWriter[Row] =
      if (!incrementalRefresh) dataStream
      else dataStream.trigger(Trigger.AvailableNow())

    /** Applies the configured output mode, if any. */
    def addOutputMode(outputMode: Option[String]): DataStreamWriter[Row] =
      outputMode.fold(dataStream)(dataStream.outputMode)
  }
}
Oops, something went wrong.