From cff6cf359b145c59d04721390bb27b9a5ca8fa21 Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Thu, 26 Oct 2023 13:06:29 -0700 Subject: [PATCH] Refactoring for tags usage in test files and also added explicit deny list setting. Signed-off-by: Vamsi Manohar --- .../dispatcher/SparkQueryDispatcher.java | 2 +- ...AsyncQueryExecutorServiceImplSpecTest.java | 28 +++++--- .../dispatcher/SparkQueryDispatcherTest.java | 67 ++++++++++--------- 3 files changed, 54 insertions(+), 43 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 5e80259e09..b603ee6909 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -55,7 +55,7 @@ public class SparkQueryDispatcher { private static final Logger LOG = LogManager.getLogger(); public static final String INDEX_TAG_KEY = "index"; public static final String DATASOURCE_TAG_KEY = "datasource"; - public static final String CLUSTER_NAME_TAG_KEY = "cluster"; + public static final String CLUSTER_NAME_TAG_KEY = "domain_ident"; public static final String JOB_TYPE_TAG_KEY = "type"; private EMRServerlessClient emrServerlessClient; diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 4bc894c1b2..39ec132442 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -5,8 +5,7 @@ package org.opensearch.sql.spark.asyncquery; -import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_ENABLED_SETTING; -import static 
org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_LIMIT_SETTING; +import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.*; import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_REQUEST_INDEX; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_SESSION_ID; @@ -28,12 +27,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import lombok.Getter; import org.junit.After; import org.junit.Before; @@ -105,9 +99,18 @@ public List> getSettings() { @Before public void setup() { clusterService = clusterService(); + client = (NodeClient) cluster().client(); + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .putList(DATASOURCE_URI_HOSTS_DENY_LIST.getKey(), Collections.emptyList()) + .build()) + .get(); clusterSettings = clusterService.getClusterSettings(); pluginSettings = new OpenSearchSettings(clusterSettings); - client = (NodeClient) cluster().client(); dataSourceService = createDataSourceService(); dataSourceService.createDataSource( new DataSourceMetadata( @@ -144,6 +147,13 @@ public void clean() { .setTransientSettings( Settings.builder().putNull(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey()).build()) .get(); + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().putNull(DATASOURCE_URI_HOSTS_DENY_LIST.getKey()).build()) + .get(); } @Test diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java 
b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 95b6033d12..aaef4db6b8 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -33,6 +33,7 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_USERNAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; +import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.*; import com.amazonaws.services.emrserverless.model.CancelJobRunResult; import com.amazonaws.services.emrserverless.model.GetJobRunResult; @@ -120,9 +121,9 @@ void setUp() { @Test void testDispatchSelectQuery() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("type", JobType.BATCH.getText()); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -175,9 +176,9 @@ void testDispatchSelectQuery() { @Test void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("type", JobType.BATCH.getText()); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -231,9 +232,9 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { @Test 
void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("type", JobType.BATCH.getText()); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -346,10 +347,10 @@ void testDispatchSelectQueryFailedCreateSession() { @Test void testDispatchIndexQuery() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("index", "flint_my_glue_default_http_logs_elb_and_requesturi_index"); - tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("type", JobType.STREAMING.getText()); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(INDEX_TAG_KEY, "flint_my_glue_default_http_logs_elb_and_requesturi_index"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText()); String query = "CREATE INDEX elb_and_requestUri ON my_glue.default.http_logs(l_orderkey, l_quantity) WITH" + " (auto_refresh = true)"; @@ -405,9 +406,9 @@ void testDispatchIndexQuery() { @Test void testDispatchWithPPLQuery() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("type", JobType.BATCH.getText()); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); String query = "source = my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -460,9 +461,9 @@ void testDispatchWithPPLQuery() { @Test void testDispatchQueryWithoutATableAndDataSourceName() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("cluster", TEST_CLUSTER_NAME); - 
tags.put("type", JobType.BATCH.getText()); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); String query = "show tables"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -515,10 +516,10 @@ void testDispatchQueryWithoutATableAndDataSourceName() { @Test void testDispatchIndexQueryWithoutADatasourceName() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("index", "flint_my_glue_default_http_logs_elb_and_requesturi_index"); - tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("type", JobType.STREAMING.getText()); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(INDEX_TAG_KEY, "flint_my_glue_default_http_logs_elb_and_requesturi_index"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText()); String query = "CREATE INDEX elb_and_requestUri ON default.http_logs(l_orderkey, l_quantity) WITH" + " (auto_refresh = true)"; @@ -574,10 +575,10 @@ void testDispatchIndexQueryWithoutADatasourceName() { @Test void testDispatchMaterializedViewQuery() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("index", "flint_mv_1"); - tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("type", JobType.STREAMING.getText()); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(INDEX_TAG_KEY, "flint_mv_1"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText()); String query = "CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs WITH" + " (auto_refresh = true)"; @@ -633,8 +634,8 @@ void testDispatchMaterializedViewQuery() { @Test void testDispatchShowMVQuery() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("cluster", TEST_CLUSTER_NAME); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); 
String query = "SHOW MATERIALIZED VIEW IN mys3.default"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -687,8 +688,8 @@ void testDispatchShowMVQuery() { @Test void testRefreshIndexQuery() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("cluster", TEST_CLUSTER_NAME); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); String query = "REFRESH SKIPPING INDEX ON my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -741,8 +742,8 @@ void testRefreshIndexQuery() { @Test void testDispatchDescribeIndexQuery() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("cluster", TEST_CLUSTER_NAME); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); String query = "DESCRIBE SKIPPING INDEX ON mys3.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString(