From f446de04f29726c5b1e2d9b3cc598ccc75b3621f Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 8 Feb 2024 12:41:07 -0800 Subject: [PATCH] Support on-demand incremental refresh (#234) * Add index refresher Signed-off-by: Chen Dai * Add incremental index option Signed-off-by: Chen Dai * Fix broken IT Signed-off-by: Chen Dai * Fix broken UT Signed-off-by: Chen Dai * Update user manual and javadoc Signed-off-by: Chen Dai * Rename index refresher Signed-off-by: Chen Dai * Add UT for index refresh Signed-off-by: Chen Dai * Add more SQL IT Signed-off-by: Chen Dai * Fix javadoc and comments Signed-off-by: Chen Dai * Address PR comments Signed-off-by: Chen Dai * Fix broken IT Signed-off-by: Chen Dai --------- Signed-off-by: Chen Dai --- docs/index.md | 24 +++- .../opensearch/flint/spark/FlintSpark.scala | 136 ++---------------- .../flint/spark/FlintSparkIndexOptions.scala | 15 +- .../spark/refresh/AutoIndexRefresh.scala | 111 ++++++++++++++ .../refresh/FlintSparkIndexRefresh.scala | 68 +++++++++ .../spark/refresh/FullIndexRefresh.scala | 45 ++++++ .../refresh/IncrementalIndexRefresh.scala | 45 ++++++ .../FlintSparkCoveringIndexAstBuilder.scala | 1 - ...FlintSparkMaterializedViewAstBuilder.scala | 1 - .../FlintSparkSkippingIndexAstBuilder.scala | 1 - .../spark/FlintSparkIndexOptionsSuite.scala | 4 + .../spark/FlintSparkIndexRefreshSuite.scala | 51 +++++++ .../mv/FlintSparkMaterializedViewSuite.scala | 9 +- .../FlintSparkCoveringIndexITSuite.scala | 5 +- .../FlintSparkCoveringIndexSqlITSuite.scala | 31 +++- .../FlintSparkMaterializedViewITSuite.scala | 1 + ...FlintSparkMaterializedViewSqlITSuite.scala | 31 +++- .../FlintSparkSkippingIndexITSuite.scala | 68 ++++++++- .../FlintSparkSkippingIndexSqlITSuite.scala | 33 ++++- .../spark/FlintSparkTransactionITSuite.scala | 22 ++- 20 files changed, 558 insertions(+), 144 deletions(-) create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala create mode 100644 flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexRefreshSuite.scala diff --git a/docs/index.md b/docs/index.md index 84ba54d4b..1832953be 100644 --- a/docs/index.md +++ b/docs/index.md @@ -27,6 +27,25 @@ Please see the following example in which Index Building Logic and Query Rewrite | MinMax | CREATE SKIPPING INDEX
ON alb_logs
(
  request_processing_time MIN_MAX
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  MIN(request_processing_time) AS request_processing_time_min,
  MAX(request_processing_time) AS request_processing_time_max,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE request_processing_time = 100
=>
SELECT *
FROM alb_logs (input_files =
SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE request_processing_time_min <= 100
    AND 100 <= request_processing_time_max
)
WHERE request_processing_time = 100 | | BloomFilter | CREATE SKIPPING INDEX
ON alb_logs
(
  client_ip BLOOM_FILTER
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  BLOOM_FILTER_AGG(client_ip) AS client_ip,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE client_ip = '127.0.0.1'
=>
SELECT *
FROM alb_logs (input_files =
  SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE BLOOM_FILTER_MIGHT_CONTAIN(client_ip, '127.0.0.1') = true
)
WHERE client_ip = '127.0.0.1' | +### Flint Index Refresh + +- **Auto Refresh:** + - This feature allows the Flint Index to automatically refresh. Users can configure such as frequency of auto-refresh based on their preferences. +- **Manual Refresh:** + - Users have the option to manually trigger a refresh for the Flint Index. This provides flexibility and control over when the refresh occurs. + - **Full Refresh:** + - Initiates a comprehensive update of the Flint Index, fetching all available data and ensuring the most up-to-date information is displayed. + - **Incremental Refresh:** + - Performs an incremental update by fetching only the new data since the last refresh. This is useful for optimizing the refresh process and reducing resource usage. + +The refresh mode is influenced by the index options specified during index creation, particularly the `auto_refresh` and `incremental_refresh` options. These options collectively define the behavior of the refresh mode when creating an index as below. Find more details in [Create Index Options](#create-index-options). + +| Refresh Mode | auto_refresh | incremental_refresh | +|---------------------|--------------|---------------------| +| Auto Refresh | true | | +| Full Refresh | false | false | +| Incremental Refresh | false | true | + ### Flint Index Specification #### Metadata @@ -263,9 +282,10 @@ VACUUM MATERIALIZED VIEW alb_logs_metrics User can provide the following options in `WITH` clause of create statement: -+ `auto_refresh`: triggers Incremental Refresh immediately after index create complete if true. Otherwise, user has to trigger Full Refresh by `REFRESH` statement manually. ++ `auto_refresh`: default value is false. Automatically refresh the index if set to true. Otherwise, user has to trigger refresh by `REFRESH` statement manually. + `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, next micro batch will be generated as soon as the previous one complete processing. -+ `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart. ++ `incremental_refresh`: default value is false. incrementally refresh the index if set to true. Otherwise, fully refresh the entire index. This only applicable when auto refresh disabled. ++ `checkpoint_location`: a string as the location path for refresh job checkpoint (auto or incremental). The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart. + `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on materialized view if it has aggregation in the query. + `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied. + `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied. diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index dc85affb1..fba818a0f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -12,20 +12,19 @@ import org.json4s.native.Serialization import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY -import org.opensearch.flint.spark.FlintSpark.RefreshMode.{AUTO, MANUAL, RefreshMode} -import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, ID_COLUMN, StreamingRefresh} +import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.mv.FlintSparkMaterializedView +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.AUTO import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, Row, SparkSession} -import org.apache.spark.sql.SaveMode._ +import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE import org.apache.spark.sql.flint.config.FlintSparkConf -import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN} -import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger} +import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN} /** * Flint Spark integration API entrypoint. @@ -130,8 +129,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { * * @param indexName * index name - * @param mode - * refresh mode * @return * refreshing job ID (empty if batch job for now) */ @@ -139,7 +136,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { logInfo(s"Refreshing Flint index $indexName") val index = describeIndex(indexName) .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) - val mode = if (index.options.autoRefresh()) AUTO else MANUAL + val indexRefresh = FlintSparkIndexRefresh.create(indexName, index) try { flintClient @@ -149,17 +146,16 @@ class FlintSpark(val spark: SparkSession) extends Logging { latest.copy(state = REFRESHING, createTime = System.currentTimeMillis())) .finalLog(latest => { // Change state to active if full, otherwise update index state regularly - if (mode == MANUAL) { - logInfo("Updating index state to active") - latest.copy(state = ACTIVE) - } else { - // Schedule regular update and return log entry as refreshing state + if (indexRefresh.refreshMode == AUTO) { logInfo("Scheduling index state monitor") flintIndexMonitor.startMonitor(indexName) latest + } else { + logInfo("Updating index state to active") + latest.copy(state = ACTIVE) } }) - .commit(_ => doRefreshIndex(index, indexName, mode)) + .commit(_ => indexRefresh.start(spark, flintSparkConf)) } catch { case e: Exception => logError("Failed to refresh Flint index", e) @@ -292,7 +288,10 @@ class FlintSpark(val spark: SparkSession) extends Logging { flintIndexMonitor.startMonitor(indexName) latest.copy(state = REFRESHING) }) - .commit(_ => doRefreshIndex(index.get, indexName, AUTO)) + .commit(_ => + FlintSparkIndexRefresh + .create(indexName, index.get) + .start(spark, flintSparkConf)) logInfo("Recovery complete") true @@ -333,67 +332,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { spark.read.format(FLINT_DATASOURCE).load(indexName) } - // TODO: move to separate class - private def doRefreshIndex( - index: FlintSparkIndex, - indexName: String, - mode: RefreshMode): Option[String] = { - logInfo(s"Refreshing index $indexName in $mode mode") - val options = index.options - val tableName = index.metadata().source - - // Batch refresh Flint index from the given source data frame - def batchRefresh(df: Option[DataFrame] = None): Unit = { - index - .build(spark, df) - .write - .format(FLINT_DATASOURCE) - .options(flintSparkConf.properties) - .mode(Overwrite) - .save(indexName) - } - - val jobId = mode match { - case MANUAL => - logInfo("Start refreshing index in batch style") - batchRefresh() - None - - // Flint index has specialized logic and capability for incremental refresh - case AUTO if index.isInstanceOf[StreamingRefresh] => - logInfo("Start refreshing index in streaming style") - val job = - index - .asInstanceOf[StreamingRefresh] - .buildStream(spark) - .writeStream - .queryName(indexName) - .format(FLINT_DATASOURCE) - .options(flintSparkConf.properties) - .addSinkOptions(options) - .start(indexName) - Some(job.id.toString) - - // Otherwise, fall back to foreachBatch + batch refresh - case AUTO => - logInfo("Start refreshing index in foreach streaming style") - val job = spark.readStream - .options(options.extraSourceOptions(tableName)) - .table(quotedTableName(tableName)) - .writeStream - .queryName(indexName) - .addSinkOptions(options) - .foreachBatch { (batchDF: DataFrame, _: Long) => - batchRefresh(Some(batchDF)) - } - .start() - Some(job.id.toString) - } - - logInfo("Refresh index complete") - jobId - } - private def stopRefreshingJob(indexName: String): Unit = { logInfo(s"Terminating refreshing job $indexName") val job = spark.streams.active.find(_.name == indexName) @@ -403,48 +341,4 @@ class FlintSpark(val spark: SparkSession) extends Logging { logWarning("Refreshing job not found") } } - - // Using Scala implicit class to avoid breaking method chaining of Spark data frame fluent API - private implicit class FlintDataStreamWriter(val dataStream: DataStreamWriter[Row]) { - - def addSinkOptions(options: FlintSparkIndexOptions): DataStreamWriter[Row] = { - dataStream - .addCheckpointLocation(options.checkpointLocation()) - .addRefreshInterval(options.refreshInterval()) - .addOutputMode(options.outputMode()) - .options(options.extraSinkOptions()) - } - - def addCheckpointLocation(checkpointLocation: Option[String]): DataStreamWriter[Row] = { - checkpointLocation match { - case Some(location) => dataStream.option("checkpointLocation", location) - case None if flintSparkConf.isCheckpointMandatory => - throw new IllegalStateException( - s"Checkpoint location is mandatory for incremental refresh if ${CHECKPOINT_MANDATORY.key} enabled") - case _ => dataStream - } - } - - def addRefreshInterval(refreshInterval: Option[String]): DataStreamWriter[Row] = { - refreshInterval - .map(interval => dataStream.trigger(Trigger.ProcessingTime(interval))) - .getOrElse(dataStream) - } - - def addOutputMode(outputMode: Option[String]): DataStreamWriter[Row] = { - outputMode.map(dataStream.outputMode).getOrElse(dataStream) - } - } -} - -object FlintSpark { - - /** - * Index refresh mode: FULL: refresh on current source data in batch style at one shot - * INCREMENTAL: auto refresh on new data in continuous streaming style - */ - object RefreshMode extends Enumeration { - type RefreshMode = Value - val MANUAL, AUTO = Value - } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index ffb479b54..9107a8a66 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -8,7 +8,7 @@ package org.opensearch.flint.spark import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization -import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY} +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY} import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames /** @@ -39,6 +39,15 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { */ def refreshInterval(): Option[String] = getOptionValue(REFRESH_INTERVAL) + /** + * Is refresh incremental or full. This only applies to manual refresh. + * + * @return + * incremental option value + */ + def incrementalRefresh(): Boolean = + getOptionValue(INCREMENTAL_REFRESH).getOrElse("false").toBoolean + /** * The checkpoint location which maybe required by Flint index's refresh. * @@ -103,6 +112,9 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { if (!options.contains(AUTO_REFRESH.toString)) { map += (AUTO_REFRESH.toString -> autoRefresh().toString) } + if (!options.contains(INCREMENTAL_REFRESH.toString)) { + map += (INCREMENTAL_REFRESH.toString -> incrementalRefresh().toString) + } map.result() } @@ -131,6 +143,7 @@ object FlintSparkIndexOptions { type OptionName = Value val AUTO_REFRESH: OptionName.Value = Value("auto_refresh") val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval") + val INCREMENTAL_REFRESH: OptionName.Value = Value("incremental_refresh") val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location") val WATERMARK_DELAY: OptionName.Value = Value("watermark_delay") val OUTPUT_MODE: OptionName.Value = Value("output_mode") diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala new file mode 100644 index 000000000..09428f80d --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala @@ -0,0 +1,111 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.refresh + +import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions} +import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh} +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode} + +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE +import org.apache.spark.sql.flint.config.FlintSparkConf +import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY +import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger} + +/** + * Index refresh that auto refreshes the index by index options provided. + * + * @param indexName + * Flint index name + * @param index + * Flint index + */ +class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) extends FlintSparkIndexRefresh { + + override def refreshMode: RefreshMode = AUTO + + override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = { + val options = index.options + val tableName = index.metadata().source + index match { + // Flint index has specialized logic and capability for incremental refresh + case refresh: StreamingRefresh => + logInfo("Start refreshing index in streaming style") + val job = + refresh + .buildStream(spark) + .writeStream + .queryName(indexName) + .format(FLINT_DATASOURCE) + .options(flintSparkConf.properties) + .addSinkOptions(options, flintSparkConf) + .start(indexName) + Some(job.id.toString) + + // Otherwise, fall back to foreachBatch + batch refresh + case _ => + logInfo("Start refreshing index in foreach streaming style") + val job = spark.readStream + .options(options.extraSourceOptions(tableName)) + .table(quotedTableName(tableName)) + .writeStream + .queryName(indexName) + .addSinkOptions(options, flintSparkConf) + .foreachBatch { (batchDF: DataFrame, _: Long) => + new FullIndexRefresh(indexName, index, Some(batchDF)) + .start(spark, flintSparkConf) + () // discard return value above and return unit to use right overridden method + } + .start() + Some(job.id.toString) + } + } + + // Using Scala implicit class to avoid breaking method chaining of Spark data frame fluent API + private implicit class FlintDataStreamWriter(val dataStream: DataStreamWriter[Row]) { + + def addSinkOptions( + options: FlintSparkIndexOptions, + flintSparkConf: FlintSparkConf): DataStreamWriter[Row] = { + dataStream + .addCheckpointLocation(options.checkpointLocation(), flintSparkConf.isCheckpointMandatory) + .addRefreshInterval(options.refreshInterval()) + .addAvailableNowTrigger(options.incrementalRefresh()) + .addOutputMode(options.outputMode()) + .options(options.extraSinkOptions()) + } + + def addCheckpointLocation( + checkpointLocation: Option[String], + isCheckpointMandatory: Boolean): DataStreamWriter[Row] = { + checkpointLocation match { + case Some(location) => dataStream.option("checkpointLocation", location) + case None if isCheckpointMandatory => + throw new IllegalStateException( + s"Checkpoint location is mandatory for incremental refresh if ${CHECKPOINT_MANDATORY.key} enabled") + case _ => dataStream + } + } + + def addRefreshInterval(refreshInterval: Option[String]): DataStreamWriter[Row] = { + refreshInterval + .map(interval => dataStream.trigger(Trigger.ProcessingTime(interval))) + .getOrElse(dataStream) + } + + def addAvailableNowTrigger(incrementalRefresh: Boolean): DataStreamWriter[Row] = { + if (incrementalRefresh) { + dataStream.trigger(Trigger.AvailableNow()) + } else { + dataStream + } + } + + def addOutputMode(outputMode: Option[String]): DataStreamWriter[Row] = { + outputMode.map(dataStream.outputMode).getOrElse(dataStream) + } + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala new file mode 100644 index 000000000..3c929d8e3 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.refresh + +import org.opensearch.flint.spark.FlintSparkIndex +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.RefreshMode + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.flint.config.FlintSparkConf + +/** + * Flint Spark index refresh that sync index data with source in style defined by concrete + * implementation class. + */ +trait FlintSparkIndexRefresh extends Logging { + + /** + * @return + * refresh mode + */ + def refreshMode: RefreshMode + + /** + * Start refreshing the index. + * + * @param spark + * Spark session to submit job + * @param flintSparkConf + * Flint Spark configuration + * @return + * optional Spark job ID + */ + def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] +} + +object FlintSparkIndexRefresh { + + /** Index refresh mode */ + object RefreshMode extends Enumeration { + type RefreshMode = Value + val AUTO, FULL, INCREMENTAL = Value + } + + /** + * Create concrete index refresh implementation for the given index. + * + * @param indexName + * Flint index name + * @param index + * Flint index + * @return + * index refresh + */ + def create(indexName: String, index: FlintSparkIndex): FlintSparkIndexRefresh = { + val options = index.options + if (options.autoRefresh()) { + new AutoIndexRefresh(indexName, index) + } else if (options.incrementalRefresh()) { + new IncrementalIndexRefresh(indexName, index) + } else { + new FullIndexRefresh(indexName, index) + } + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala new file mode 100644 index 000000000..be09c2c36 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.refresh + +import org.opensearch.flint.spark.FlintSparkIndex +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{FULL, RefreshMode} + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.SaveMode.Overwrite +import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE +import org.apache.spark.sql.flint.config.FlintSparkConf + +/** + * Index refresh that fully refreshes the index from the given source data frame. + * + * @param indexName + * Flint index name + * @param index + * Flint index + * @param source + * refresh from this data frame representing a micro batch or from the beginning + */ +class FullIndexRefresh( + indexName: String, + index: FlintSparkIndex, + source: Option[DataFrame] = None) + extends FlintSparkIndexRefresh { + + override def refreshMode: RefreshMode = FULL + + override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = { + logInfo(s"Start refreshing index $indexName in full mode") + index + .build(spark, source) + .write + .format(FLINT_DATASOURCE) + .options(flintSparkConf.properties) + .mode(Overwrite) + .save(indexName) + None + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala new file mode 100644 index 000000000..418ada902 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.refresh + +import org.opensearch.flint.spark.FlintSparkIndex +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{INCREMENTAL, RefreshMode} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.flint.config.FlintSparkConf + +/** + * Index refresh that incrementally refreshes the index from the last checkpoint. + * + * @param indexName + * Flint index name + * @param index + * Flint index + */ +class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex) + extends FlintSparkIndexRefresh { + + override def refreshMode: RefreshMode = INCREMENTAL + + override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = { + logInfo(s"Start refreshing index $indexName in incremental mode") + + // TODO: move this to validation method together in future + if (index.options.checkpointLocation().isEmpty) { + throw new IllegalStateException("Checkpoint location is required by incremental refresh") + } + + // Reuse auto refresh which uses AvailableNow trigger and will stop once complete + val jobId = + new AutoIndexRefresh(indexName, index) + .start(spark, flintSparkConf) + + spark.streams + .get(jobId.get) + .awaitTermination() + None + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala index eae401a69..14fa21240 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala @@ -7,7 +7,6 @@ package org.opensearch.flint.spark.sql.covering import org.antlr.v4.runtime.tree.RuleNode import org.opensearch.flint.spark.FlintSpark -import org.opensearch.flint.spark.FlintSpark.RefreshMode import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala index a67803a18..5b31890bb 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala @@ -9,7 +9,6 @@ import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` import org.antlr.v4.runtime.tree.RuleNode import org.opensearch.flint.spark.FlintSpark -import org.opensearch.flint.spark.FlintSpark.RefreshMode import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala index 73bff5cba..9b638f36f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala @@ -9,7 +9,6 @@ import scala.collection.JavaConverters.collectionAsScalaIterableConverter import org.antlr.v4.runtime.tree.RuleNode import org.opensearch.flint.spark.FlintSpark -import org.opensearch.flint.spark.FlintSpark.RefreshMode import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET} diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala index b678096ca..212d91e13 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala @@ -15,6 +15,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { test("should return lowercase name as option name") { AUTO_REFRESH.toString shouldBe "auto_refresh" REFRESH_INTERVAL.toString shouldBe "refresh_interval" + INCREMENTAL_REFRESH.toString shouldBe "incremental_refresh" CHECKPOINT_LOCATION.toString shouldBe "checkpoint_location" WATERMARK_DELAY.toString shouldBe "watermark_delay" OUTPUT_MODE.toString shouldBe "output_mode" @@ -27,6 +28,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { Map( "auto_refresh" -> "true", "refresh_interval" -> "1 Minute", + "incremental_refresh" -> "true", "checkpoint_location" -> "s3://test/", "watermark_delay" -> "30 Seconds", "output_mode" -> "complete", @@ -44,6 +46,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { options.autoRefresh() shouldBe true options.refreshInterval() shouldBe Some("1 Minute") + options.incrementalRefresh() shouldBe true options.checkpointLocation() shouldBe Some("s3://test/") options.watermarkDelay() shouldBe Some("30 Seconds") options.outputMode() shouldBe Some("complete") @@ -85,6 +88,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { options.optionsWithDefault shouldBe Map( "auto_refresh" -> "false", + "incremental_refresh" -> "false", "refresh_interval" -> "1 Minute") } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexRefreshSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexRefreshSuite.scala new file mode 100644 index 000000000..e9226e1c8 --- /dev/null +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexRefreshSuite.scala @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import org.mockito.Mockito.{when, RETURNS_DEEP_STUBS} +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._ +import org.scalatest.matchers.should.Matchers +import org.scalatestplus.mockito.MockitoSugar.mock + +import org.apache.spark.FlintSuite + +class FlintSparkIndexRefreshSuite extends FlintSuite with Matchers { + + /** Test index name */ + val indexName: String = "test" + + /** Mock Flint index */ + var index: FlintSparkIndex = _ + + override def beforeEach(): Unit = { + index = mock[FlintSparkIndex](RETURNS_DEEP_STUBS) + } + + test("should auto refresh if auto refresh option enabled") { + when(index.options.autoRefresh()).thenReturn(true) + + val refresh = FlintSparkIndexRefresh.create(indexName, index) + refresh.refreshMode shouldBe AUTO + } + + test("should full refresh if both auto and incremental refresh option disabled") { + when(index.options.autoRefresh()).thenReturn(false) + when(index.options.incrementalRefresh()).thenReturn(false) + + val refresh = FlintSparkIndexRefresh.create(indexName, index) + refresh.refreshMode shouldBe FULL + } + + test( + "should incremental refresh if auto refresh disabled but incremental refresh option enabled") { + when(index.options.autoRefresh()).thenReturn(false) + when(index.options.incrementalRefresh()).thenReturn(true) + + val refresh = FlintSparkIndexRefresh.create(indexName, index) + refresh.refreshMode shouldBe INCREMENTAL + } +} diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala index b7746d44a..c1df42883 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala @@ -5,12 +5,12 @@ package org.opensearch.flint.spark.mv -import scala.collection.JavaConverters.mapAsJavaMapConverter +import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter} import org.opensearch.flint.spark.FlintSparkIndexOptions import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE import org.opensearch.flint.spark.mv.FlintSparkMaterializedViewSuite.{streamingRelation, StreamingDslLogicalPlan} -import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} +import org.scalatest.matchers.should.Matchers.{contain, convertToAnyShouldWrapper, the} import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.FlintSuite @@ -77,9 +77,8 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { Map("test_col" -> "integer"), indexOptions) - mv.metadata().options shouldBe Map( - "auto_refresh" -> "true", - "index_settings" -> indexSettings).asJava + mv.metadata().options.asScala should contain allOf ("auto_refresh" -> "true", + "index_settings" -> indexSettings) mv.metadata().indexSettings shouldBe Some(indexSettings) } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index d1996359f..a77d261cd 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -59,7 +59,10 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { | "columnType": "int" | }], | "source": "spark_catalog.default.ci_test", - | "options": { "auto_refresh": "false" }, + | "options": { + | "auto_refresh": "false", + | "incremental_refresh": "false" + | }, | "properties": { | "filterCondition": "age > 30" | } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 450da14c9..3c9e06257 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -136,7 +136,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { } } - test("create covering index with manual refresh") { + test("create covering index with full refresh") { sql(s""" | CREATE INDEX $testIndex ON $testTable | (name, age) @@ -151,6 +151,35 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { indexData.count() shouldBe 2 } + test("create covering index with incremental refresh") { + withTempDir { checkpointDir => + sql(s""" + | CREATE INDEX $testIndex ON $testTable + | (name, age) + | WITH ( + | incremental_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + | """.stripMargin) + + // Refresh all present source data as of now + sql(s"REFRESH INDEX $testIndex ON $testTable") + flint.queryIndex(testFlintIndex).count() shouldBe 2 + + // New data won't be refreshed until refresh statement triggered + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=5) + | VALUES ('Hello', 50, 'Vancouver') + |""".stripMargin) + flint.queryIndex(testFlintIndex).count() shouldBe 2 + + // New data is refreshed incrementally + sql(s"REFRESH INDEX $testIndex ON $testTable") + flint.queryIndex(testFlintIndex).count() shouldBe 3 + } + } + test("create covering index on table without database name") { sql(s"CREATE INDEX $testIndex ON covering_sql_test (name)") diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 4df6dc55b..586b4e877 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -73,6 +73,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | }], | "options": { | "auto_refresh": "true", + | "incremental_refresh": "false", | "checkpoint_location": "s3://test/", | "watermark_delay": "30 Seconds" | }, diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index ed702c7a1..20b7f3d55 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -129,7 +129,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } - test("create materialized view with manual refresh") { + test("create materialized view with full refresh") { sql(s""" | CREATE MATERIALIZED VIEW $testMvName | AS $testQuery @@ -146,6 +146,35 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { indexData.count() shouldBe 4 } + test("create materialized view with incremental refresh") { + withTempDir { checkpointDir => + sql(s""" + | CREATE MATERIALIZED VIEW $testMvName + | AS $testQuery + | WITH ( + | incremental_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}', + | watermark_delay = '1 Second' + | ) + | """.stripMargin) + + // Refresh all present source data as of now + sql(s"REFRESH MATERIALIZED VIEW $testMvName") + flint.queryIndex(testFlintIndex).count() shouldBe 3 + + // New data won't be refreshed until refresh statement triggered + sql(s""" + | INSERT INTO $testTable VALUES + | (TIMESTAMP '2023-10-01 04:00:00', 'F', 25, 'Vancouver') + | """.stripMargin) + flint.queryIndex(testFlintIndex).count() shouldBe 3 + + // New data is refreshed incrementally + sql(s"REFRESH MATERIALIZED VIEW $testMvName") + flint.queryIndex(testFlintIndex).count() shouldBe 4 + } + } + test("create materialized view if not exists") { sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery") flint.describeIndex(testFlintIndex) shouldBe defined diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index e68efdb7e..c3b8caffd 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -7,10 +7,13 @@ package org.opensearch.flint.spark import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.json4s.native.JsonMethods._ +import org.opensearch.client.RequestOptions import org.opensearch.flint.core.FlintVersion.current import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.skipping.FlintSparkSkippingFileIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName +import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.reindex.DeleteByQueryRequest import org.scalatest.matchers.{Matcher, MatchResult} import org.scalatest.matchers.must.Matchers._ import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper @@ -27,9 +30,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { private val testTable = "spark_catalog.default.test" private val testIndex = getSkippingIndexName(testTable) - override def beforeAll(): Unit = { - super.beforeAll() - + override def beforeEach(): Unit = { + super.beforeEach() createPartitionedMultiRowTable(testTable) } @@ -38,6 +40,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { // Delete all test indices deleteTestIndex(testIndex) + sql(s"DROP TABLE $testTable") } test("create skipping index with metadata successfully") { @@ -92,7 +95,10 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "columnType": "string" | }], | "source": "spark_catalog.default.test", - | "options": { "auto_refresh": "false" }, + | "options": { + | "auto_refresh": "false", + | "incremental_refresh": "false" + | }, | "properties": {} | }, | "properties": { @@ -121,7 +127,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | } |""".stripMargin) - index.get.options shouldBe FlintSparkIndexOptions(Map("auto_refresh" -> "false")) + index.get.options shouldBe FlintSparkIndexOptions( + Map("auto_refresh" -> "false", "incremental_refresh" -> "false")) } test("create skipping index with index options successfully") { @@ -142,6 +149,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { optionJson should matchJson(""" | { | "auto_refresh": "true", + | "incremental_refresh": "false", | "refresh_interval": "1 Minute", | "checkpoint_location": "s3a://test/", | "index_settings": "{\"number_of_shards\": 3,\"number_of_replicas\": 2}" @@ -184,6 +192,51 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { } test("incremental refresh skipping index successfully") { + withTempDir { checkpointDir => + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options( + FlintSparkIndexOptions( + Map( + "incremental_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath))) + .create() + + flint.refreshIndex(testIndex) shouldBe empty + flint.queryIndex(testIndex).collect().toSet should have size 2 + + // Delete all index data intentionally and generate a new source file + openSearchClient.deleteByQuery( + new DeleteByQueryRequest(testIndex).setQuery(QueryBuilders.matchAllQuery()), + RequestOptions.DEFAULT) + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=4) + | VALUES ('Hello', 35, 'Vancouver') + | """.stripMargin) + + // Expect to only refresh the new file + flint.refreshIndex(testIndex) shouldBe empty + flint.queryIndex(testIndex).collect().toSet should have size 1 + } + } + + test("should fail if incremental refresh without checkpoint location") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true"))) + .create() + + assertThrows[IllegalStateException] { + flint.refreshIndex(testIndex) + } + } + + test("auto refresh skipping index successfully") { // Create Flint index and wait for complete flint .skippingIndex() @@ -565,7 +618,10 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "columnType": "struct" | }], | "source": "$testTable", - | "options": { "auto_refresh": "false" }, + | "options": { + | "auto_refresh": "false", + | "incremental_refresh": "false" + | }, | "properties": {} | }, | "properties": { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 3f94762a5..ca14a555c 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -27,7 +27,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { private val testTable = "spark_catalog.default.skipping_sql_test" private val testIndex = getSkippingIndexName(testTable) - override def beforeAll(): Unit = { + override def beforeEach(): Unit = { super.beforeAll() createPartitionedMultiRowTable(testTable) @@ -37,6 +37,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { super.afterEach() deleteTestIndex(testIndex) + sql(s"DROP TABLE $testTable") } test("create skipping index with auto refresh") { @@ -142,7 +143,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { } } - test("create skipping index with manual refresh") { + test("create skipping index with full refresh") { sql(s""" | CREATE SKIPPING INDEX ON $testTable | ( @@ -161,6 +162,34 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { indexData.count() shouldBe 2 } + test("create skipping index with incremental refresh") { + withTempDir { checkpointDir => + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( year PARTITION ) + | WITH ( + | incremental_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + | """.stripMargin) + + // Refresh all present source data as of now + sql(s"REFRESH SKIPPING INDEX ON $testTable") + flint.queryIndex(testIndex).count() shouldBe 2 + + // New data won't be refreshed until refresh statement triggered + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=5) + | VALUES ('Hello', 50, 'Vancouver') + |""".stripMargin) + flint.queryIndex(testIndex).count() shouldBe 2 + + sql(s"REFRESH SKIPPING INDEX ON $testTable") + flint.queryIndex(testIndex).count() shouldBe 3 + } + } + test("should fail if refresh an auto refresh skipping index") { sql(s""" | CREATE SKIPPING INDEX ON $testTable diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala index a2b93648e..b27275539 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala @@ -64,7 +64,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match (parse(mapping) \ "_meta" \ "latestId").extract[String] shouldBe testLatestId } - test("manual refresh index") { + test("full refresh index") { flint .skippingIndex() .onTable(testTable) @@ -78,6 +78,26 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match } test("incremental refresh index") { + withTempDir { checkpointDir => + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options( + FlintSparkIndexOptions( + Map( + "incremental_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath))) + .create() + flint.refreshIndex(testFlintIndex) + + val latest = latestLogEntry(testLatestId) + latest should contain("state" -> "active") + latest("jobStartTime").asInstanceOf[Number].longValue() should be > 0L + } + } + + test("auto refresh index") { flint .skippingIndex() .onTable(testTable)