From b9f2a3986a993660ba300751f9ffc3f06860433c Mon Sep 17 00:00:00 2001 From: Eric Date: Wed, 8 Nov 2023 21:24:33 +0000 Subject: [PATCH] Revert "Add missing tags and MV support (#2336) (#2346)" This reverts commit 8791bb01ea9f96e304f7cea08abb313e36f419ce. Signed-off-by: Eric --- common/build.gradle | 2 +- integ-test/build.gradle | 2 +- ppl/build.gradle | 2 +- .../src/main/antlr/FlintSparkSqlExtensions.g4 | 34 --- spark/src/main/antlr/SparkSqlBase.g4 | 5 - spark/src/main/antlr/SqlBaseLexer.g4 | 1 - spark/src/main/antlr/SqlBaseParser.g4 | 1 - .../spark/data/constants/SparkConstants.java | 2 + .../dispatcher/SparkQueryDispatcher.java | 33 +-- .../spark/dispatcher/model/IndexDetails.java | 145 +++--------- .../sql/spark/dispatcher/model/JobType.java | 37 --- .../sql/spark/utils/SQLQueryUtils.java | 47 ++-- .../dispatcher/SparkQueryDispatcherTest.java | 222 ++++++------------ .../FlintIndexMetadataReaderImplTest.java | 58 +++-- .../sql/spark/flint/IndexDetailsTest.java | 13 +- .../sql/spark/utils/SQLQueryUtilsTest.java | 43 +--- 16 files changed, 187 insertions(+), 460 deletions(-) delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/JobType.java diff --git a/common/build.gradle b/common/build.gradle index 3a04e87fe7..507ad6c0d6 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -34,7 +34,7 @@ repositories { dependencies { api "org.antlr:antlr4-runtime:4.7.1" api group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' - api group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}" + api group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.20.0' api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3' implementation 'com.github.babbel:okhttp-aws-signer:1.0.2' diff --git a/integ-test/build.gradle b/integ-test/build.gradle index c48d43d3e5..dd646d7a66 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -167,7 +167,7 @@ dependencies { testImplementation group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}" testImplementation group: 'org.opensearch.driver', name: 'opensearch-sql-jdbc', version: System.getProperty("jdbcDriverVersion", '1.2.0.0') testImplementation group: 'org.hamcrest', name: 'hamcrest', version: '2.1' - implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}" + implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.20.0' testImplementation project(':opensearch-sql-plugin') testImplementation project(':legacy') testImplementation('org.junit.jupiter:junit-jupiter-api:5.6.2') diff --git a/ppl/build.gradle b/ppl/build.gradle index 6d0a67c443..7408d7ad2b 100644 --- a/ppl/build.gradle +++ b/ppl/build.gradle @@ -49,7 +49,7 @@ dependencies { implementation "org.antlr:antlr4-runtime:4.7.1" implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' api group: 'org.json', name: 'json', version: '20231013' - implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}" + implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.20.0' api project(':common') api project(':core') api project(':protocol') diff --git a/spark/src/main/antlr/FlintSparkSqlExtensions.g4 b/spark/src/main/antlr/FlintSparkSqlExtensions.g4 index c4af2779d1..e8e0264f28 100644 --- a/spark/src/main/antlr/FlintSparkSqlExtensions.g4 +++ b/spark/src/main/antlr/FlintSparkSqlExtensions.g4 @@ -17,7 +17,6 @@ singleStatement statement : skippingIndexStatement | coveringIndexStatement - | materializedViewStatement ; skippingIndexStatement @@ -77,39 +76,6 @@ dropCoveringIndexStatement : DROP INDEX indexName ON tableName ; -materializedViewStatement - : createMaterializedViewStatement - | showMaterializedViewStatement - | describeMaterializedViewStatement - | dropMaterializedViewStatement - ; - -createMaterializedViewStatement - : CREATE MATERIALIZED VIEW (IF NOT EXISTS)? mvName=multipartIdentifier - AS query=materializedViewQuery - (WITH LEFT_PAREN propertyList RIGHT_PAREN)? - ; - -showMaterializedViewStatement - : SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier - ; - -describeMaterializedViewStatement - : (DESC | DESCRIBE) MATERIALIZED VIEW mvName=multipartIdentifier - ; - -dropMaterializedViewStatement - : DROP MATERIALIZED VIEW mvName=multipartIdentifier - ; - -/* - * Match all remaining tokens in non-greedy way - * so WITH clause won't be captured by this rule. - */ -materializedViewQuery - : .+? - ; - indexColTypeList : indexColType (COMMA indexColType)* ; diff --git a/spark/src/main/antlr/SparkSqlBase.g4 b/spark/src/main/antlr/SparkSqlBase.g4 index 533d851ba6..4ac1ced5c4 100644 --- a/spark/src/main/antlr/SparkSqlBase.g4 +++ b/spark/src/main/antlr/SparkSqlBase.g4 @@ -154,7 +154,6 @@ COMMA: ','; DOT: '.'; -AS: 'AS'; CREATE: 'CREATE'; DESC: 'DESC'; DESCRIBE: 'DESCRIBE'; @@ -162,18 +161,14 @@ DROP: 'DROP'; EXISTS: 'EXISTS'; FALSE: 'FALSE'; IF: 'IF'; -IN: 'IN'; INDEX: 'INDEX'; INDEXES: 'INDEXES'; -MATERIALIZED: 'MATERIALIZED'; NOT: 'NOT'; ON: 'ON'; PARTITION: 'PARTITION'; REFRESH: 'REFRESH'; SHOW: 'SHOW'; TRUE: 'TRUE'; -VIEW: 'VIEW'; -VIEWS: 'VIEWS'; WITH: 'WITH'; diff --git a/spark/src/main/antlr/SqlBaseLexer.g4 b/spark/src/main/antlr/SqlBaseLexer.g4 index e8b5cb012f..d9128de0f5 100644 --- a/spark/src/main/antlr/SqlBaseLexer.g4 +++ b/spark/src/main/antlr/SqlBaseLexer.g4 @@ -447,7 +447,6 @@ PIPE: '|'; CONCAT_PIPE: '||'; HAT: '^'; COLON: ':'; -DOUBLE_COLON: '::'; ARROW: '->'; FAT_ARROW : '=>'; HENT_START: '/*+'; diff --git a/spark/src/main/antlr/SqlBaseParser.g4 b/spark/src/main/antlr/SqlBaseParser.g4 index 84a31dafed..77a9108e06 100644 --- a/spark/src/main/antlr/SqlBaseParser.g4 +++ b/spark/src/main/antlr/SqlBaseParser.g4 @@ -957,7 +957,6 @@ primaryExpression | CASE whenClause+ (ELSE elseExpression=expression)? END #searchedCase | CASE value=expression whenClause+ (ELSE elseExpression=expression)? END #simpleCase | name=(CAST | TRY_CAST) LEFT_PAREN expression AS dataType RIGHT_PAREN #cast - | primaryExpression DOUBLE_COLON dataType #castByColon | STRUCT LEFT_PAREN (argument+=namedExpression (COMMA argument+=namedExpression)*)? RIGHT_PAREN #struct | FIRST LEFT_PAREN expression (IGNORE NULLS)? RIGHT_PAREN #first | ANY_VALUE LEFT_PAREN expression (IGNORE NULLS)? RIGHT_PAREN #any_value diff --git a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index e8659c680c..85ce3c4989 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -26,6 +26,8 @@ public class SparkConstants { public static final String FLINT_INTEGRATION_JAR = "s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar"; // TODO should be replaced with mvn jar. + public static final String FLINT_CATALOG_JAR = + "s3://flint-data-dp-eu-west-1-beta/code/flint/flint-catalog.jar"; public static final String FLINT_DEFAULT_HOST = "localhost"; public static final String FLINT_DEFAULT_PORT = "9200"; public static final String FLINT_DEFAULT_SCHEME = "http"; diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index ff7ccf8c08..882f2663d9 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -38,8 +38,8 @@ import org.opensearch.sql.spark.client.StartJobRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; +import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; import org.opensearch.sql.spark.dispatcher.model.IndexDetails; -import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.execution.session.CreateSessionRequest; import org.opensearch.sql.spark.execution.session.Session; import org.opensearch.sql.spark.execution.session.SessionId; @@ -59,8 +59,9 @@ public class SparkQueryDispatcher { public static final String INDEX_TAG_KEY = "index"; public static final String DATASOURCE_TAG_KEY = "datasource"; + public static final String SCHEMA_TAG_KEY = "schema"; + public static final String TABLE_TAG_KEY = "table"; public static final String CLUSTER_NAME_TAG_KEY = "cluster"; - public static final String JOB_TYPE_TAG_KEY = "job_type"; private EMRServerlessClient emrServerlessClient; @@ -110,8 +111,6 @@ private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryR if (SQLQueryUtils.isIndexQuery(dispatchQueryRequest.getQuery())) { IndexDetails indexDetails = SQLQueryUtils.extractIndexDetails(dispatchQueryRequest.getQuery()); - fillMissingDetails(dispatchQueryRequest, indexDetails); - if (indexDetails.isDropIndex()) { return handleDropIndexQuery(dispatchQueryRequest, indexDetails); } else { @@ -122,29 +121,17 @@ private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryR } } - // TODO: Revisit this logic. - // Currently, Spark if datasource is not provided in query. - // Spark Assumes the datasource to be catalog. - // This is required to handle drop index case properly when datasource name is not provided. - private static void fillMissingDetails( - DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) { - if (indexDetails.getFullyQualifiedTableName() != null - && indexDetails.getFullyQualifiedTableName().getDatasourceName() == null) { - indexDetails - .getFullyQualifiedTableName() - .setDatasourceName(dispatchQueryRequest.getDatasource()); - } - } - private DispatchQueryResponse handleIndexQuery( DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) { + FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName(); DataSourceMetadata dataSourceMetadata = this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()); dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata); String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query"; Map tags = getDefaultTagsForJobSubmission(dispatchQueryRequest); - tags.put(INDEX_TAG_KEY, indexDetails.openSearchIndexName()); - tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText()); + tags.put(INDEX_TAG_KEY, indexDetails.getIndexName()); + tags.put(TABLE_TAG_KEY, fullyQualifiedTableName.getTableName()); + tags.put(SCHEMA_TAG_KEY, fullyQualifiedTableName.getSchemaName()); StartJobRequest startJobRequest = new StartJobRequest( dispatchQueryRequest.getQuery(), @@ -155,12 +142,12 @@ private DispatchQueryResponse handleIndexQuery( .dataSource( dataSourceService.getRawDataSourceMetadata( dispatchQueryRequest.getDatasource())) - .structuredStreaming(indexDetails.isAutoRefresh()) + .structuredStreaming(indexDetails.getAutoRefresh()) .extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams()) .build() .toString(), tags, - indexDetails.isAutoRefresh(), + indexDetails.getAutoRefresh(), dataSourceMetadata.getResultIndex()); String jobId = emrServerlessClient.startJobRun(startJobRequest); return new DispatchQueryResponse( @@ -191,7 +178,6 @@ private DispatchQueryResponse handleNonIndexQuery(DispatchQueryRequest dispatchQ session = createdSession.get(); } else { // create session if not exist - tags.put(JOB_TYPE_TAG_KEY, JobType.INTERACTIVE.getText()); session = sessionManager.createSession( new CreateSessionRequest( @@ -218,7 +204,6 @@ private DispatchQueryResponse handleNonIndexQuery(DispatchQueryRequest dispatchQ dataSourceMetadata.getResultIndex(), session.getSessionId().getSessionId()); } else { - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); StartJobRequest startJobRequest = new StartJobRequest( dispatchQueryRequest.getQuery(), diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java index 42e2905e67..1cc66da9fc 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java @@ -5,129 +5,56 @@ package org.opensearch.sql.spark.dispatcher.model; -import com.google.common.base.Preconditions; +import lombok.AllArgsConstructor; +import lombok.Data; import lombok.EqualsAndHashCode; -import lombok.Getter; -import org.apache.commons.lang3.StringUtils; +import lombok.NoArgsConstructor; import org.opensearch.sql.spark.flint.FlintIndexType; /** Index details in an async query. */ -@Getter +@Data +@AllArgsConstructor +@NoArgsConstructor @EqualsAndHashCode public class IndexDetails { - - public static final String STRIP_CHARS = "`"; - private String indexName; private FullyQualifiedTableName fullyQualifiedTableName; // by default, auto_refresh = false; - private boolean autoRefresh; + private Boolean autoRefresh = false; private boolean isDropIndex; - // materialized view special case where - // table name and mv name are combined. - private String mvName; private FlintIndexType indexType; - private IndexDetails() {} - - public static IndexDetailsBuilder builder() { - return new IndexDetailsBuilder(); - } - - // Builder class - public static class IndexDetailsBuilder { - private final IndexDetails indexDetails; - - public IndexDetailsBuilder() { - indexDetails = new IndexDetails(); - } - - public IndexDetailsBuilder indexName(String indexName) { - indexDetails.indexName = indexName; - return this; - } - - public IndexDetailsBuilder fullyQualifiedTableName(FullyQualifiedTableName tableName) { - indexDetails.fullyQualifiedTableName = tableName; - return this; - } - - public IndexDetailsBuilder autoRefresh(Boolean autoRefresh) { - indexDetails.autoRefresh = autoRefresh; - return this; - } - - public IndexDetailsBuilder isDropIndex(boolean isDropIndex) { - indexDetails.isDropIndex = isDropIndex; - return this; - } - - public IndexDetailsBuilder mvName(String mvName) { - indexDetails.mvName = mvName; - return this; - } - - public IndexDetailsBuilder indexType(FlintIndexType indexType) { - indexDetails.indexType = indexType; - return this; - } - - public IndexDetails build() { - Preconditions.checkNotNull(indexDetails.indexType, "Index Type can't be null"); - switch (indexDetails.indexType) { - case COVERING: - Preconditions.checkNotNull( - indexDetails.indexName, "IndexName can't be null for Covering Index."); - Preconditions.checkNotNull( - indexDetails.fullyQualifiedTableName, "TableName can't be null for Covering Index."); - break; - case SKIPPING: - Preconditions.checkNotNull( - indexDetails.fullyQualifiedTableName, "TableName can't be null for Skipping Index."); - break; - case MATERIALIZED_VIEW: - Preconditions.checkNotNull(indexDetails.mvName, "Materialized view name can't be null"); - break; - } - - return indexDetails; - } - } - public String openSearchIndexName() { FullyQualifiedTableName fullyQualifiedTableName = getFullyQualifiedTableName(); - String indexName = StringUtils.EMPTY; - switch (getIndexType()) { - case COVERING: - indexName = - "flint" - + "_" - + StringUtils.strip(fullyQualifiedTableName.getDatasourceName(), STRIP_CHARS) - + "_" - + StringUtils.strip(fullyQualifiedTableName.getSchemaName(), STRIP_CHARS) - + "_" - + StringUtils.strip(fullyQualifiedTableName.getTableName(), STRIP_CHARS) - + "_" - + StringUtils.strip(getIndexName(), STRIP_CHARS) - + "_" - + getIndexType().getSuffix(); - break; - case SKIPPING: - indexName = - "flint" - + "_" - + StringUtils.strip(fullyQualifiedTableName.getDatasourceName(), STRIP_CHARS) - + "_" - + StringUtils.strip(fullyQualifiedTableName.getSchemaName(), STRIP_CHARS) - + "_" - + StringUtils.strip(fullyQualifiedTableName.getTableName(), STRIP_CHARS) - + "_" - + getIndexType().getSuffix(); - break; - case MATERIALIZED_VIEW: - indexName = "flint" + "_" + StringUtils.strip(getMvName(), STRIP_CHARS).toLowerCase(); - break; + if (FlintIndexType.SKIPPING.equals(getIndexType())) { + String indexName = + "flint" + + "_" + + fullyQualifiedTableName.getDatasourceName() + + "_" + + fullyQualifiedTableName.getSchemaName() + + "_" + + fullyQualifiedTableName.getTableName() + + "_" + + getIndexType().getSuffix(); + return indexName.toLowerCase(); + } else if (FlintIndexType.COVERING.equals(getIndexType())) { + String indexName = + "flint" + + "_" + + fullyQualifiedTableName.getDatasourceName() + + "_" + + fullyQualifiedTableName.getSchemaName() + + "_" + + fullyQualifiedTableName.getTableName() + + "_" + + getIndexName() + + "_" + + getIndexType().getSuffix(); + return indexName.toLowerCase(); + } else { + throw new UnsupportedOperationException( + String.format("Unsupported Index Type : %s", getIndexType())); } - return indexName.toLowerCase(); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/JobType.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/JobType.java deleted file mode 100644 index 01f5f422e9..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/JobType.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.dispatcher.model; - -public enum JobType { - INTERACTIVE("interactive"), - STREAMING("streaming"), - BATCH("batch"); - - private String text; - - JobType(String text) { - this.text = text; - } - - public String getText() { - return this.text; - } - - /** - * Get JobType from text. - * - * @param text text. - * @return JobType {@link JobType}. - */ - public static JobType fromString(String text) { - for (JobType JobType : JobType.values()) { - if (JobType.text.equalsIgnoreCase(text)) { - return JobType; - } - } - throw new IllegalArgumentException("No JobType with text " + text + " found"); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java b/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java index 4816f1c2cd..f6b75d49ef 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java +++ b/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java @@ -52,7 +52,7 @@ public static IndexDetails extractIndexDetails(String sqlQuery) { flintSparkSqlExtensionsParser.statement(); FlintSQLIndexDetailsVisitor flintSQLIndexDetailsVisitor = new FlintSQLIndexDetailsVisitor(); statementContext.accept(flintSQLIndexDetailsVisitor); - return flintSQLIndexDetailsVisitor.getIndexDetailsBuilder().build(); + return flintSQLIndexDetailsVisitor.getIndexDetails(); } public static boolean isIndexQuery(String sqlQuery) { @@ -117,29 +117,29 @@ public Void visitCreateTableHeader(SqlBaseParser.CreateTableHeaderContext ctx) { public static class FlintSQLIndexDetailsVisitor extends FlintSparkSqlExtensionsBaseVisitor { - @Getter private final IndexDetails.IndexDetailsBuilder indexDetailsBuilder; + @Getter private final IndexDetails indexDetails; public FlintSQLIndexDetailsVisitor() { - this.indexDetailsBuilder = new IndexDetails.IndexDetailsBuilder(); + this.indexDetails = new IndexDetails(); } @Override public Void visitIndexName(FlintSparkSqlExtensionsParser.IndexNameContext ctx) { - indexDetailsBuilder.indexName(ctx.getText()); + indexDetails.setIndexName(ctx.getText()); return super.visitIndexName(ctx); } @Override public Void visitTableName(FlintSparkSqlExtensionsParser.TableNameContext ctx) { - indexDetailsBuilder.fullyQualifiedTableName(new FullyQualifiedTableName(ctx.getText())); + indexDetails.setFullyQualifiedTableName(new FullyQualifiedTableName(ctx.getText())); return super.visitTableName(ctx); } @Override public Void visitCreateSkippingIndexStatement( FlintSparkSqlExtensionsParser.CreateSkippingIndexStatementContext ctx) { - indexDetailsBuilder.isDropIndex(false); - indexDetailsBuilder.indexType(FlintIndexType.SKIPPING); + indexDetails.setDropIndex(false); + indexDetails.setIndexType(FlintIndexType.SKIPPING); visitPropertyList(ctx.propertyList()); return super.visitCreateSkippingIndexStatement(ctx); } @@ -147,47 +147,28 @@ public Void visitCreateSkippingIndexStatement( @Override public Void visitCreateCoveringIndexStatement( FlintSparkSqlExtensionsParser.CreateCoveringIndexStatementContext ctx) { - indexDetailsBuilder.isDropIndex(false); - indexDetailsBuilder.indexType(FlintIndexType.COVERING); + indexDetails.setDropIndex(false); + indexDetails.setIndexType(FlintIndexType.COVERING); visitPropertyList(ctx.propertyList()); return super.visitCreateCoveringIndexStatement(ctx); } - @Override - public Void visitCreateMaterializedViewStatement( - FlintSparkSqlExtensionsParser.CreateMaterializedViewStatementContext ctx) { - indexDetailsBuilder.isDropIndex(false); - indexDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW); - indexDetailsBuilder.mvName(ctx.mvName.getText()); - visitPropertyList(ctx.propertyList()); - return super.visitCreateMaterializedViewStatement(ctx); - } - @Override public Void visitDropCoveringIndexStatement( FlintSparkSqlExtensionsParser.DropCoveringIndexStatementContext ctx) { - indexDetailsBuilder.isDropIndex(true); - indexDetailsBuilder.indexType(FlintIndexType.COVERING); + indexDetails.setDropIndex(true); + indexDetails.setIndexType(FlintIndexType.COVERING); return super.visitDropCoveringIndexStatement(ctx); } @Override public Void visitDropSkippingIndexStatement( FlintSparkSqlExtensionsParser.DropSkippingIndexStatementContext ctx) { - indexDetailsBuilder.isDropIndex(true); - indexDetailsBuilder.indexType(FlintIndexType.SKIPPING); + indexDetails.setDropIndex(true); + indexDetails.setIndexType(FlintIndexType.SKIPPING); return super.visitDropSkippingIndexStatement(ctx); } - @Override - public Void visitDropMaterializedViewStatement( - FlintSparkSqlExtensionsParser.DropMaterializedViewStatementContext ctx) { - indexDetailsBuilder.isDropIndex(true); - indexDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW); - indexDetailsBuilder.mvName(ctx.mvName.getText()); - return super.visitDropMaterializedViewStatement(ctx); - } - @Override public Void visitPropertyList(FlintSparkSqlExtensionsParser.PropertyListContext ctx) { if (ctx != null) { @@ -199,7 +180,7 @@ public Void visitPropertyList(FlintSparkSqlExtensionsParser.PropertyListContext // https://github.com/apache/spark/blob/v3.5.0/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala#L35 to unescape string literal if (propertyKey(property.key).toLowerCase(Locale.ROOT).contains("auto_refresh")) { if (propertyValue(property.value).toLowerCase(Locale.ROOT).contains("true")) { - indexDetailsBuilder.autoRefresh(true); + indexDetails.setAutoRefresh(true); } } }); diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 700acb973e..4acccae0e2 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -67,7 +67,6 @@ import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; import org.opensearch.sql.spark.dispatcher.model.IndexDetails; -import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.execution.session.Session; import org.opensearch.sql.spark.execution.session.SessionId; import org.opensearch.sql.spark.execution.session.SessionManager; @@ -125,7 +124,6 @@ void testDispatchSelectQuery() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("job_type", JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -180,7 +178,6 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("job_type", JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -236,7 +233,6 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("job_type", JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -369,9 +365,10 @@ void testDispatchSelectQueryFailedCreateSession() { void testDispatchIndexQuery() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); - tags.put("index", "flint_my_glue_default_http_logs_elb_and_requesturi_index"); + tags.put("table", "http_logs"); + tags.put("index", "elb_and_requestUri"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("job_type", JobType.STREAMING.getText()); + tags.put("schema", "default"); String query = "CREATE INDEX elb_and_requestUri ON my_glue.default.http_logs(l_orderkey, l_quantity) WITH" + " (auto_refresh = true)"; @@ -429,7 +426,7 @@ void testDispatchWithPPLQuery() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("job_type", JobType.BATCH.getText()); + String query = "source = my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -484,7 +481,7 @@ void testDispatchQueryWithoutATableAndDataSourceName() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("job_type", JobType.BATCH.getText()); + String query = "show tables"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -538,70 +535,13 @@ void testDispatchQueryWithoutATableAndDataSourceName() { void testDispatchIndexQueryWithoutADatasourceName() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); - tags.put("index", "flint_my_glue_default_http_logs_elb_and_requesturi_index"); + tags.put("table", "http_logs"); + tags.put("index", "elb_and_requestUri"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("job_type", JobType.STREAMING.getText()); - String query = - "CREATE INDEX elb_and_requestUri ON default.http_logs(l_orderkey, l_quantity) WITH" - + " (auto_refresh = true)"; - String sparkSubmitParameters = - withStructuredStreaming( - constructExpectedSparkSubmitParameterString( - "sigv4", - new HashMap<>() { - { - put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); - } - })); - when(emrServerlessClient.startJobRun( - new StartJobRequest( - query, - "TEST_CLUSTER:index-query", - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - true, - any()))) - .thenReturn(EMR_JOB_ID); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch( - new DispatchQueryRequest( - EMRS_APPLICATION_ID, - query, - "my_glue", - LangType.SQL, - EMRS_EXECUTION_ROLE, - TEST_CLUSTER_NAME)); - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - StartJobRequest expected = - new StartJobRequest( - query, - "TEST_CLUSTER:index-query", - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - true, - null); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); - Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); - verifyNoInteractions(flintIndexMetadataReader); - } + tags.put("schema", "default"); - @Test - void testDispatchMaterializedViewQuery() { - HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("index", "flint_mv_1"); - tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("job_type", JobType.STREAMING.getText()); String query = - "CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs WITH" + "CREATE INDEX elb_and_requestUri ON default.http_logs(l_orderkey, l_quantity) WITH" + " (auto_refresh = true)"; String sparkSubmitParameters = withStructuredStreaming( @@ -901,15 +841,13 @@ void testGetQueryResponseOfDropIndex() { @Test void testDropIndexQuery() throws ExecutionException, InterruptedException { String query = "DROP INDEX size_year ON my_glue.default.http_logs"; - IndexDetails indexDetails = - IndexDetails.builder() - .indexName("size_year") - .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) - .autoRefresh(false) - .isDropIndex(true) - .indexType(FlintIndexType.COVERING) - .build(); - when(flintIndexMetadataReader.getFlintIndexMetadata(indexDetails)) + when(flintIndexMetadataReader.getFlintIndexMetadata( + new IndexDetails( + "size_year", + new FullyQualifiedTableName("my_glue.default.http_logs"), + false, + true, + FlintIndexType.COVERING))) .thenReturn(flintIndexMetadata); when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); // auto_refresh == true @@ -938,7 +876,15 @@ void testDropIndexQuery() throws ExecutionException, InterruptedException { TEST_CLUSTER_NAME)); verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexDetails); + verify(flintIndexMetadataReader, times(1)) + .getFlintIndexMetadata( + new IndexDetails( + "size_year", + new FullyQualifiedTableName("my_glue.default.http_logs"), + false, + true, + FlintIndexType.COVERING)); + SparkQueryDispatcher.DropIndexResult dropIndexResult = SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); @@ -948,14 +894,13 @@ void testDropIndexQuery() throws ExecutionException, InterruptedException { @Test void testDropSkippingIndexQuery() throws ExecutionException, InterruptedException { String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; - IndexDetails indexDetails = - IndexDetails.builder() - .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) - .autoRefresh(false) - .isDropIndex(true) - .indexType(FlintIndexType.SKIPPING) - .build(); - when(flintIndexMetadataReader.getFlintIndexMetadata(indexDetails)) + when(flintIndexMetadataReader.getFlintIndexMetadata( + new IndexDetails( + null, + new FullyQualifiedTableName("my_glue.default.http_logs"), + false, + true, + FlintIndexType.SKIPPING))) .thenReturn(flintIndexMetadata); when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); when(flintIndexMetadata.isAutoRefresh()).thenReturn(true); @@ -982,7 +927,14 @@ void testDropSkippingIndexQuery() throws ExecutionException, InterruptedExceptio TEST_CLUSTER_NAME)); verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexDetails); + verify(flintIndexMetadataReader, times(1)) + .getFlintIndexMetadata( + new IndexDetails( + null, + new FullyQualifiedTableName("my_glue.default.http_logs"), + false, + true, + FlintIndexType.SKIPPING)); SparkQueryDispatcher.DropIndexResult dropIndexResult = SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); @@ -993,14 +945,13 @@ void testDropSkippingIndexQuery() throws ExecutionException, InterruptedExceptio void testDropSkippingIndexQueryAutoRefreshFalse() throws ExecutionException, InterruptedException { String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; - IndexDetails indexDetails = - IndexDetails.builder() - .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) - .autoRefresh(false) - .isDropIndex(true) - .indexType(FlintIndexType.SKIPPING) - .build(); - when(flintIndexMetadataReader.getFlintIndexMetadata(indexDetails)) + when(flintIndexMetadataReader.getFlintIndexMetadata( + new IndexDetails( + null, + new FullyQualifiedTableName("my_glue.default.http_logs"), + false, + true, + FlintIndexType.SKIPPING))) .thenReturn(flintIndexMetadata); when(flintIndexMetadata.isAutoRefresh()).thenReturn(false); @@ -1021,7 +972,14 @@ void testDropSkippingIndexQueryAutoRefreshFalse() TEST_CLUSTER_NAME)); verify(emrServerlessClient, times(0)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexDetails); + verify(flintIndexMetadataReader, times(1)) + .getFlintIndexMetadata( + new IndexDetails( + null, + new FullyQualifiedTableName("my_glue.default.http_logs"), + false, + true, + FlintIndexType.SKIPPING)); SparkQueryDispatcher.DropIndexResult dropIndexResult = SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); @@ -1032,14 +990,13 @@ void testDropSkippingIndexQueryAutoRefreshFalse() void testDropSkippingIndexQueryDeleteIndexException() throws ExecutionException, InterruptedException { String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; - IndexDetails indexDetails = - IndexDetails.builder() - .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) - .autoRefresh(false) - .isDropIndex(true) - .indexType(FlintIndexType.SKIPPING) - .build(); - when(flintIndexMetadataReader.getFlintIndexMetadata(indexDetails)) + when(flintIndexMetadataReader.getFlintIndexMetadata( + new IndexDetails( + null, + new FullyQualifiedTableName("my_glue.default.http_logs"), + false, + true, + FlintIndexType.SKIPPING))) .thenReturn(flintIndexMetadata); when(flintIndexMetadata.isAutoRefresh()).thenReturn(false); @@ -1061,7 +1018,14 @@ void testDropSkippingIndexQueryDeleteIndexException() TEST_CLUSTER_NAME)); verify(emrServerlessClient, times(0)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexDetails); + verify(flintIndexMetadataReader, times(1)) + .getFlintIndexMetadata( + new IndexDetails( + null, + new FullyQualifiedTableName("my_glue.default.http_logs"), + false, + true, + FlintIndexType.SKIPPING)); SparkQueryDispatcher.DropIndexResult dropIndexResult = SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); Assertions.assertEquals(JobRunState.FAILED.toString(), dropIndexResult.getStatus()); @@ -1071,52 +1035,6 @@ void testDropSkippingIndexQueryDeleteIndexException() Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); } - @Test - void testDropMVQuery() throws ExecutionException, InterruptedException { - String query = "DROP MATERIALIZED VIEW mv_1"; - IndexDetails indexDetails = - IndexDetails.builder() - .mvName("mv_1") - .isDropIndex(true) - .fullyQualifiedTableName(null) - .indexType(FlintIndexType.MATERIALIZED_VIEW) - .build(); - when(flintIndexMetadataReader.getFlintIndexMetadata(indexDetails)) - .thenReturn(flintIndexMetadata); - when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); - // auto_refresh == true - when(flintIndexMetadata.isAutoRefresh()).thenReturn(true); - - when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID)) - .thenReturn( - new CancelJobRunResult() - .withJobRunId(EMR_JOB_ID) - .withApplicationId(EMRS_APPLICATION_ID)); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); - - AcknowledgedResponse acknowledgedResponse = mock(AcknowledgedResponse.class); - when(openSearchClient.admin().indices().delete(any()).get()).thenReturn(acknowledgedResponse); - when(acknowledgedResponse.isAcknowledged()).thenReturn(true); - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch( - new DispatchQueryRequest( - EMRS_APPLICATION_ID, - query, - "my_glue", - LangType.SQL, - EMRS_EXECUTION_ROLE, - TEST_CLUSTER_NAME)); - verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); - verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexDetails); - SparkQueryDispatcher.DropIndexResult dropIndexResult = - SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); - Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); - Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); - } - @Test void testDispatchQueryWithExtraSparkSubmitParameters() { DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java index 3cc40e0df5..b0c8491b0b 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java @@ -44,12 +44,12 @@ void testGetJobIdFromFlintSkippingIndexMetadata() { FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata( - IndexDetails.builder() - .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) - .autoRefresh(false) - .isDropIndex(true) - .indexType(FlintIndexType.SKIPPING) - .build()); + new IndexDetails( + null, + new FullyQualifiedTableName("mys3.default.http_logs"), + false, + true, + FlintIndexType.SKIPPING)); Assertions.assertEquals("00fdmvv9hp8u0o0q", indexMetadata.getJobId()); } @@ -64,13 +64,12 @@ void testGetJobIdFromFlintCoveringIndexMetadata() { FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata( - IndexDetails.builder() - .indexName("cv1") - .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) - .autoRefresh(false) - .isDropIndex(true) - .indexType(FlintIndexType.COVERING) - .build()); + new IndexDetails( + "cv1", + new FullyQualifiedTableName("mys3.default.http_logs"), + false, + true, + FlintIndexType.COVERING)); Assertions.assertEquals("00fdmvv9hp8u0o0q", indexMetadata.getJobId()); } @@ -87,17 +86,34 @@ void testGetJobIDWithNPEException() { IllegalArgumentException.class, () -> flintIndexMetadataReader.getFlintIndexMetadata( - IndexDetails.builder() - .indexName("cv1") - .fullyQualifiedTableName( - new FullyQualifiedTableName("mys3.default.http_logs")) - .autoRefresh(false) - .isDropIndex(true) - .indexType(FlintIndexType.COVERING) - .build())); + new IndexDetails( + "cv1", + new FullyQualifiedTableName("mys3.default.http_logs"), + false, + true, + FlintIndexType.COVERING))); Assertions.assertEquals("Provided Index doesn't exist", illegalArgumentException.getMessage()); } + @SneakyThrows + @Test + void testGetJobIdFromUnsupportedIndex() { + FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); + UnsupportedOperationException unsupportedOperationException = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> + flintIndexMetadataReader.getFlintIndexMetadata( + new IndexDetails( + "cv1", + new FullyQualifiedTableName("mys3.default.http_logs"), + false, + true, + FlintIndexType.MATERIALIZED_VIEW))); + Assertions.assertEquals( + "Unsupported Index Type : MATERIALIZED_VIEW", unsupportedOperationException.getMessage()); + } + @SneakyThrows public void mockNodeClientIndicesMappings(String indexName, String mappings) { GetMappingsResponse mockResponse = mock(GetMappingsResponse.class); diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexDetailsTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexDetailsTest.java index cf6b5f8f2b..46fa4f7dbe 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexDetailsTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexDetailsTest.java @@ -16,13 +16,12 @@ public class IndexDetailsTest { public void skippingIndexName() { assertEquals( "flint_mys3_default_http_logs_skipping_index", - IndexDetails.builder() - .indexName("invalid") - .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) - .autoRefresh(false) - .isDropIndex(true) - .indexType(FlintIndexType.SKIPPING) - .build() + new IndexDetails( + "invalid", + new FullyQualifiedTableName("mys3.default.http_logs"), + false, + true, + FlintIndexType.SKIPPING) .openSearchIndexName()); } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java b/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java index 01759c2bdd..af892fa097 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java @@ -6,7 +6,6 @@ package org.opensearch.sql.spark.utils; import static org.opensearch.sql.spark.utils.SQLQueryUtilsTest.IndexQuery.index; -import static org.opensearch.sql.spark.utils.SQLQueryUtilsTest.IndexQuery.mv; import static org.opensearch.sql.spark.utils.SQLQueryUtilsTest.IndexQuery.skippingIndex; import lombok.Getter; @@ -113,67 +112,50 @@ void testExtractionFromFlintIndexQueries() { Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName()); } - @Test - void testExtractionFromFlintMVQuery() { - String createCoveredIndexQuery = - "CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs WITH" - + " (auto_refresh = true)"; - Assertions.assertTrue(SQLQueryUtils.isIndexQuery(createCoveredIndexQuery)); - IndexDetails indexDetails = SQLQueryUtils.extractIndexDetails(createCoveredIndexQuery); - FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName(); - Assertions.assertNull(indexDetails.getIndexName()); - Assertions.assertNull(fullyQualifiedTableName); - Assertions.assertEquals("mv_1", indexDetails.getMvName()); - } - /** https://github.com/opensearch-project/sql/issues/2206 */ @Test void testAutoRefresh() { Assertions.assertFalse( - SQLQueryUtils.extractIndexDetails(skippingIndex().getQuery()).isAutoRefresh()); + SQLQueryUtils.extractIndexDetails(skippingIndex().getQuery()).getAutoRefresh()); Assertions.assertFalse( SQLQueryUtils.extractIndexDetails( skippingIndex().withProperty("auto_refresh", "false").getQuery()) - .isAutoRefresh()); + .getAutoRefresh()); Assertions.assertTrue( SQLQueryUtils.extractIndexDetails( skippingIndex().withProperty("auto_refresh", "true").getQuery()) - .isAutoRefresh()); + .getAutoRefresh()); Assertions.assertTrue( SQLQueryUtils.extractIndexDetails( skippingIndex().withProperty("\"auto_refresh\"", "true").getQuery()) - .isAutoRefresh()); + .getAutoRefresh()); Assertions.assertTrue( SQLQueryUtils.extractIndexDetails( skippingIndex().withProperty("\"auto_refresh\"", "\"true\"").getQuery()) - .isAutoRefresh()); + .getAutoRefresh()); Assertions.assertFalse( SQLQueryUtils.extractIndexDetails( skippingIndex().withProperty("auto_refresh", "1").getQuery()) - .isAutoRefresh()); + .getAutoRefresh()); Assertions.assertFalse( SQLQueryUtils.extractIndexDetails(skippingIndex().withProperty("interval", "1").getQuery()) - .isAutoRefresh()); + .getAutoRefresh()); - Assertions.assertFalse(SQLQueryUtils.extractIndexDetails(index().getQuery()).isAutoRefresh()); + Assertions.assertFalse(SQLQueryUtils.extractIndexDetails(index().getQuery()).getAutoRefresh()); Assertions.assertFalse( SQLQueryUtils.extractIndexDetails(index().withProperty("auto_refresh", "false").getQuery()) - .isAutoRefresh()); + .getAutoRefresh()); Assertions.assertTrue( SQLQueryUtils.extractIndexDetails(index().withProperty("auto_refresh", "true").getQuery()) - .isAutoRefresh()); - - Assertions.assertTrue( - SQLQueryUtils.extractIndexDetails(mv().withProperty("auto_refresh", "true").getQuery()) - .isAutoRefresh()); + .getAutoRefresh()); } @Getter @@ -194,11 +176,6 @@ public static IndexQuery index() { "CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, " + "l_quantity)"); } - public static IndexQuery mv() { - return new IndexQuery( - "CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs"); - } - public IndexQuery withProperty(String key, String value) { query = String.format("%s with (%s = %s)", query, key, value); return this;