Add create, drop and refresh covering index SQL support (opensearch-project#32)

* Separate out skipping index AST builder

Signed-off-by: Chen Dai <[email protected]>

* Add create index statement and IT

Signed-off-by: Chen Dai <[email protected]>

* Refactor Flint SQL IT

Signed-off-by: Chen Dai <[email protected]>

* Refactor Flint AST builder for mix-in

Signed-off-by: Chen Dai <[email protected]>

* Extract util methods from each AST builder

Signed-off-by: Chen Dai <[email protected]>

* Add refresh and drop statement support

Signed-off-by: Chen Dai <[email protected]>

* Update doc

Signed-off-by: Chen Dai <[email protected]>

* Fix full table name in AST builder

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
dai-chen authored Sep 20, 2023
1 parent 7434e5a commit bd68653
Showing 9 changed files with 361 additions and 127 deletions.
49 changes: 46 additions & 3 deletions docs/index.md
@@ -10,10 +10,11 @@ A Flint index is ...

### Feature Highlights

- Skipping Index
- Skipping Index: accelerate data scans by maintaining a compact aggregate data structure, which includes the kinds below (see the Scala sketch after this list):
  - Partition: skip data scans by maintaining and filtering the partitioned column value per file.
  - MinMax: skip data scans by maintaining the lower and upper bound of the indexed column per file.
  - ValueSet: skip data scans by building a unique value set of the indexed column per file.
- Covering Index: create an index on selected columns within the source dataset to improve query performance
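For concreteness, here is a hedged Scala sketch of how the three skipping index kinds map onto the Flint Spark builder API used in this commit; the builder methods `addPartitions`, `addValueSet`, and `addMinMax` are the ones the skipping index AST builder invokes, while the session setup and column names are assumptions:

```scala
// Sketch only: assumes a SparkSession `spark` and the Flint Spark
// integration on the classpath. The builder methods below are the ones
// the skipping index AST builder in this commit invokes; the column
// names are hypothetical.
import org.opensearch.flint.spark.FlintSpark

val flint = new FlintSpark(spark)

flint
  .skippingIndex()
  .onTable("alb_logs")
  .addPartitions("year")          // Partition: per-file partition value
  .addMinMax("request_time")      // MinMax: per-file lower/upper bound
  .addValueSet("elb_status_code") // ValueSet: per-file distinct value set
  .create()
```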

Please see the following example, in which the Index Building Logic and Query Rewrite Logic columns show the basic idea behind each skipping index implementation.

@@ -117,7 +118,7 @@ High level API is dependent on query engine implementation. Please see Query Eng

### SQL

DDL statement:
#### Skipping Index

```sql
CREATE SKIPPING INDEX
@@ -128,7 +129,7 @@ WITH (auto_refresh = (true|false))

REFRESH SKIPPING INDEX ON <object>

DESCRIBE SKIPPING INDEX ON <object>
[DESC|DESCRIBE] SKIPPING INDEX ON <object>

DROP SKIPPING INDEX ON <object>

@@ -157,6 +158,38 @@ DESCRIBE SKIPPING INDEX ON alb_logs
DROP SKIPPING INDEX ON alb_logs
```

#### Covering Index

```sql
CREATE INDEX name ON <object>
( column [, ...] )
WHERE <filter_predicate>
WITH (auto_refresh = (true|false))

REFRESH INDEX name ON <object>

SHOW [INDEX|INDEXES] ON <object>

[DESC|DESCRIBE] INDEX name ON <object>

DROP INDEX name ON <object>
```

Example:

```sql
CREATE INDEX elb_and_requestUri
ON alb_logs ( elb, requestUri )

REFRESH INDEX elb_and_requestUri ON alb_logs

SHOW INDEX ON alb_logs

DESCRIBE INDEX elb_and_requestUri ON alb_logs

DROP INDEX elb_and_requestUri ON alb_logs
```
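The OpenSearch index backing a covering index gets a derived name; the Scala example later in this document uses `flint_alb_logs_elb_and_requestUri_index`, and the AST builder below delegates to `FlintSparkCoveringIndex.getFlintIndexName`. A minimal sketch of the assumed convention:

```scala
// Hedged sketch of the naming convention inferred from the example name
// flint_alb_logs_elb_and_requestUri_index; the authoritative logic is
// FlintSparkCoveringIndex.getFlintIndexName, and the table name it
// receives may carry a qualifying database prefix.
def flintCoveringIndexName(indexName: String, tableName: String): String =
  s"flint_${tableName.replace('.', '_')}_${indexName}_index"

println(flintCoveringIndexName("elb_and_requestUri", "alb_logs"))
// flint_alb_logs_elb_and_requestUri_index
```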

## Index Store

### OpenSearch
@@ -264,6 +297,7 @@ Here is an example for Flint Spark integration:
```scala
val flint = new FlintSpark(spark)

// Skipping index
flint.skippingIndex()
.onTable("alb_logs")
.filterBy("time > 2023-04-01 00:00:00")
@@ -273,6 +307,15 @@ flint.skippingIndex()
.create()

flint.refresh("flint_alb_logs_skipping_index", FULL)

// Covering index
flint.coveringIndex()
.name("elb_and_requestUri")
.onTable("alb_logs")
.addIndexColumns("elb", "requestUri")
.create()

flint.refresh("flint_alb_logs_elb_and_requestUri_index")
```
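Two refresh modes appear in this commit: `RefreshMode.FULL`, used when an explicit `REFRESH ... INDEX` statement runs, and `RefreshMode.INCREMENTAL`, triggered at creation time when `auto_refresh = true` is set. A minimal sketch of that dispatch, assuming the `FlintSpark` API shown in the AST builders below:

```scala
// Sketch: refresh-mode dispatch as performed by the AST builders in
// this commit; `flint` is a FlintSpark instance and `indexName` is the
// derived Flint index name.
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSpark.RefreshMode

def afterCreate(flint: FlintSpark, indexName: String, autoRefresh: Boolean): Unit =
  if (autoRefresh) flint.refreshIndex(indexName, RefreshMode.INCREMENTAL)

def onRefreshStatement(flint: FlintSpark, indexName: String): Unit =
  flint.refreshIndex(indexName, RefreshMode.FULL)
```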

#### Skipping Index Provider SPI
21 changes: 21 additions & 0 deletions flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -16,6 +16,7 @@ singleStatement

statement
: skippingIndexStatement
| coveringIndexStatement
;

skippingIndexStatement
@@ -43,6 +44,26 @@ dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName=multipartIdentifier
;

coveringIndexStatement
: createCoveringIndexStatement
| refreshCoveringIndexStatement
| dropCoveringIndexStatement
;

createCoveringIndexStatement
: CREATE INDEX indexName=identifier ON tableName=multipartIdentifier
LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshCoveringIndexStatement
: REFRESH INDEX indexName=identifier ON tableName=multipartIdentifier
;

dropCoveringIndexStatement
: DROP INDEX indexName=identifier ON tableName=multipartIdentifier
;

indexColTypeList
: indexColType (COMMA indexColType)*
;
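A hedged Scala sketch of driving the generated parser with these rules; `FlintSparkSqlExtensionsParser` and the `singleStatement` entry rule appear in this commit, while the lexer class name follows ANTLR's `<GrammarName>Lexer` codegen convention and is an assumption:

```scala
// Sketch: parse a covering index statement and build the Spark command.
// CharStreams and CommonTokenStream are standard ANTLR 4 runtime
// classes; FlintSparkSqlExtensionsLexer is the assumed generated lexer
// name, and the Flint classes live in org.opensearch.flint.spark.sql.
import org.antlr.v4.runtime.{CharStreams, CommonTokenStream}

val sql = "CREATE INDEX elb_and_requestUri ON alb_logs (elb, requestUri)"
val lexer = new FlintSparkSqlExtensionsLexer(CharStreams.fromString(sql))
val parser = new FlintSparkSqlExtensionsParser(new CommonTokenStream(lexer))

val tree = parser.singleStatement()          // entry rule from the grammar
val command = new FlintSparkSqlAstBuilder().visit(tree)
```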
8 changes: 8 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -85,6 +85,14 @@ grammar SparkSqlBase;
}


multipartIdentifierPropertyList
: multipartIdentifierProperty (COMMA multipartIdentifierProperty)*
;

multipartIdentifierProperty
: multipartIdentifier (options=propertyList)?
;

propertyList
: property (COMMA property)*
;
flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
@@ -7,84 +7,34 @@ package org.opensearch.flint.spark.sql

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.PropertyListContext
import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Command
import org.apache.spark.sql.types.StringType

/**
* Flint Spark AST builder that builds Spark command for Flint index statement.
* This class mixes in all other AST builders and provides util methods.
*/
class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command] {
class FlintSparkSqlAstBuilder
extends FlintSparkSqlExtensionsBaseVisitor[Command]
with FlintSparkSkippingIndexAstBuilder
with FlintSparkCoveringIndexAstBuilder {

override def visitCreateSkippingIndexStatement(
ctx: CreateSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
// Create skipping index
val indexBuilder = flint
.skippingIndex()
.onTable(getFullTableName(flint, ctx.tableName))

ctx.indexColTypeList().indexColType().forEach { colTypeCtx =>
val colName = colTypeCtx.identifier().getText
val skipType = SkippingKind.withName(colTypeCtx.skipType.getText)
skipType match {
case PARTITION => indexBuilder.addPartitions(colName)
case VALUE_SET => indexBuilder.addValueSet(colName)
case MIN_MAX => indexBuilder.addMinMax(colName)
}
}
indexBuilder.create()

// Trigger auto refresh if enabled
if (isAutoRefreshEnabled(ctx.propertyList())) {
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.refreshIndex(indexName, RefreshMode.INCREMENTAL)
}
Seq.empty
}

override def visitRefreshSkippingIndexStatement(
ctx: RefreshSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.refreshIndex(indexName, RefreshMode.FULL)
Seq.empty
}

override def visitDescribeSkippingIndexStatement(
ctx: DescribeSkippingIndexStatementContext): Command = {
val outputSchema = Seq(
AttributeReference("indexed_col_name", StringType, nullable = false)(),
AttributeReference("data_type", StringType, nullable = false)(),
AttributeReference("skip_type", StringType, nullable = false)())

FlintSparkSqlCommand(outputSchema) { flint =>
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint
.describeIndex(indexName)
.map { case index: FlintSparkSkippingIndex =>
index.indexedColumns.map(strategy =>
Row(strategy.columnName, strategy.columnType, strategy.kind.toString))
}
.getOrElse(Seq.empty)
}
}
override def aggregateResult(aggregate: Command, nextResult: Command): Command =
if (nextResult != null) nextResult else aggregate
}
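The `aggregateResult` override is what makes the mix-in approach work: each trait handles only its own statements and the default visitor result is null, so aggregation must keep the non-null `Command`. A standalone sketch of that behavior (simplified types, not the generated visitor):

```scala
// Standalone sketch: only one mix-in produces a result for a given
// statement; preferring the non-null child result keeps it.
object AggregateResultSketch extends App {
  def aggregateResult(aggregate: String, nextResult: String): String =
    if (nextResult != null) nextResult else aggregate

  val childResults = Seq(null, "createCoveringIndexCommand", null)
  println(childResults.foldLeft(null: String)(aggregateResult))
  // prints: createCoveringIndexCommand
}
```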

override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.deleteIndex(indexName)
Seq.empty
}
object FlintSparkSqlAstBuilder {

private def isAutoRefreshEnabled(ctx: PropertyListContext): Boolean = {
/**
* Check if auto_refresh is true in property list.
*
* @param ctx
* property list
*/
def isAutoRefreshEnabled(ctx: PropertyListContext): Boolean = {
if (ctx == null) {
false
} else {
@@ -99,10 +49,16 @@ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command
}
}

private def getSkippingIndexName(flint: FlintSpark, tableNameCtx: RuleNode): String =
FlintSparkSkippingIndex.getSkippingIndexName(getFullTableName(flint, tableNameCtx))

private def getFullTableName(flint: FlintSpark, tableNameCtx: RuleNode): String = {
/**
* Get the full table name, qualified with the current database if none is specified.
*
* @param flint
*   Flint Spark which has access to Spark Catalog
* @param tableNameCtx
*   table name
* @return
*   qualified table name
*/
def getFullTableName(flint: FlintSpark, tableNameCtx: RuleNode): String = {
val tableName = tableNameCtx.getText
if (tableName.contains(".")) {
tableName
@@ -111,7 +67,4 @@ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command
s"$db.$tableName"
}
}

override def aggregateResult(aggregate: Command, nextResult: Command): Command =
if (nextResult != null) nextResult else aggregate
}
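A hedged sketch of the table-name qualification logic above; the collapsed lines presumably obtain the current database from the session catalog, so that part is an assumption:

```scala
// Sketch of getFullTableName's visible logic: a name containing '.' is
// already qualified; otherwise prefix the current database (how the
// database is obtained is assumed, matching the s"$db.$tableName" line).
def qualify(tableName: String, currentDb: String): String =
  if (tableName.contains(".")) tableName else s"$currentDb.$tableName"

assert(qualify("alb_logs", "default") == "default.alb_logs")
assert(qualify("mydb.alb_logs", "default") == "mydb.alb_logs")
```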
flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
@@ -0,0 +1,75 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.sql.covering

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, isAutoRefreshEnabled}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateCoveringIndexStatementContext, DropCoveringIndexStatementContext, RefreshCoveringIndexStatementContext}

import org.apache.spark.sql.catalyst.plans.logical.Command

/**
* Flint Spark AST builder that builds Spark command for Flint covering index statement.
*/
trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[Command] {

override def visitCreateCoveringIndexStatement(
ctx: CreateCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val indexName = ctx.indexName.getText
val tableName = ctx.tableName.getText
val indexBuilder =
flint
.coveringIndex()
.name(indexName)
.onTable(tableName)

ctx.indexColumns.multipartIdentifierProperty().forEach { indexColCtx =>
val colName = indexColCtx.multipartIdentifier().getText
indexBuilder.addIndexColumns(colName)
}
indexBuilder.create()

// Trigger auto refresh if enabled
if (isAutoRefreshEnabled(ctx.propertyList())) {
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.refreshIndex(flintIndexName, RefreshMode.INCREMENTAL)
}
Seq.empty
}
}

override def visitRefreshCoveringIndexStatement(
ctx: RefreshCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.refreshIndex(flintIndexName, RefreshMode.FULL)
Seq.empty
}
}

override def visitDropCoveringIndexStatement(
ctx: DropCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.deleteIndex(flintIndexName)
Seq.empty
}
}

private def getFlintIndexName(
flint: FlintSpark,
indexNameCtx: RuleNode,
tableNameCtx: RuleNode): String = {
val indexName = indexNameCtx.getText
val fullTableName = getFullTableName(flint, tableNameCtx)
FlintSparkCoveringIndex.getFlintIndexName(indexName, fullTableName)
}
}
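End to end, these visitors are reached through Spark SQL once the Flint extensions are registered with the session. A hedged usage sketch in the style of the integration tests this commit mentions; the extension registration and table setup are assumptions:

```scala
// Sketch: exercising the new covering index statements via spark.sql.
// Assumes a SparkSession built with the Flint SQL extensions enabled
// (the exact extension class/config name is an assumption) and an
// existing alb_logs table.
spark.sql("CREATE INDEX elb_and_requestUri ON alb_logs (elb, requestUri) WITH (auto_refresh = true)")
spark.sql("REFRESH INDEX elb_and_requestUri ON alb_logs")
spark.sql("DROP INDEX elb_and_requestUri ON alb_logs")
```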
