From 53bce1cf79e580b1c2cf247691ea65b25aed0e93 Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Mon, 15 Apr 2024 14:01:47 -0700 Subject: [PATCH] add async-query-core module Signed-off-by: Vamsi Manohar --- settings.gradle | 2 +- spark/build.gradle | 10 +++ spark/src/main/antlr/SqlBaseLexer.g4 | 1 + spark/src/main/antlr/SqlBaseParser.g4 | 4 +- .../model/AsyncQueryJobMetadata.java | 20 ++--- .../spark/dispatcher/BatchQueryHandler.java | 4 +- .../sql/spark/dispatcher/IndexDMLHandler.java | 9 +- .../dispatcher/InteractiveQueryHandler.java | 2 +- .../dispatcher/SparkQueryDispatcher.java | 6 +- .../execution/session/InteractiveSession.java | 4 - .../session/SessionConfigSupplier.java | 6 ++ .../execution/session/SessionManager.java | 17 +--- .../spark/execution/statement/Statement.java | 2 - ...yncQueryJobMetadataXContentSerializer.java | 2 +- .../flint/FlintIndexMetadataService.java | 7 ++ .../flint/FlintIndexMetadataServiceImpl.java | 10 +++ .../flint/operation/FlintIndexOpVacuum.java | 16 ++-- .../response/JobExecutionResponseReader.java | 84 ++++--------------- .../JobExecutionResponseReaderImpl.java | 81 ++++++++++++++++++ .../config/AsyncExecutorServiceModule.java | 23 ++--- .../AsyncQueryExecutorServiceSpec.java | 15 ++-- .../AsyncQueryGetResultSpecTest.java | 8 +- ...yncQueryJobMetadataStorageServiceTest.java | 2 +- .../FlintStreamingJobHouseKeeperTaskTest.java | 3 + .../spark/dispatcher/IndexDMLHandlerTest.java | 16 ++-- .../dispatcher/SparkQueryDispatcherTest.java | 19 ++--- .../session/InteractiveSessionTest.java | 16 ++-- .../execution/session/SessionManagerTest.java | 5 +- .../execution/statement/StatementTest.java | 48 ++++++----- ...ueryJobMetadataXContentSerializerTest.java | 4 +- ...AsyncQueryExecutionResponseReaderTest.java | 28 ++++--- 31 files changed, 257 insertions(+), 217 deletions(-) create mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionConfigSupplier.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReaderImpl.java diff --git a/settings.gradle b/settings.gradle index 2140ad6c9e..928f4e90a5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -21,4 +21,4 @@ include 'prometheus' include 'benchmarks' include 'datasources' include 'spark' - +include 'async-query-core' diff --git a/spark/build.gradle b/spark/build.gradle index c221c4e36c..01a12082c8 100644 --- a/spark/build.gradle +++ b/spark/build.gradle @@ -8,6 +8,7 @@ plugins { id "io.freefair.lombok" id 'jacoco' id 'antlr' + id 'com.github.johnrengelman.shadow' } repositories { @@ -75,6 +76,15 @@ dependencies { testImplementation project(':opensearch') } +shadowJar { + archiveBaseName.set('async-query-core') + archiveVersion.set('1.0.0') // Set the desired version + archiveClassifier.set('all') + + from sourceSets.main.output + configurations = [project.configurations.runtimeClasspath] +} + test { useJUnitPlatform { includeEngines("junit-jupiter") diff --git a/spark/src/main/antlr/SqlBaseLexer.g4 b/spark/src/main/antlr/SqlBaseLexer.g4 index e2b178d34b..83e40c4a20 100644 --- a/spark/src/main/antlr/SqlBaseLexer.g4 +++ b/spark/src/main/antlr/SqlBaseLexer.g4 @@ -182,6 +182,7 @@ ELSE: 'ELSE'; END: 'END'; ESCAPE: 'ESCAPE'; ESCAPED: 'ESCAPED'; +EVOLUTION: 'EVOLUTION'; EXCEPT: 'EXCEPT'; EXCHANGE: 'EXCHANGE'; EXCLUDE: 'EXCLUDE'; diff --git a/spark/src/main/antlr/SqlBaseParser.g4 b/spark/src/main/antlr/SqlBaseParser.g4 index 3d00851658..60b67b0802 100644 --- a/spark/src/main/antlr/SqlBaseParser.g4 +++ b/spark/src/main/antlr/SqlBaseParser.g4 @@ -480,7 +480,7 @@ dmlStatementNoWith | fromClause multiInsertQueryBody+ #multiInsertQuery | DELETE FROM identifierReference tableAlias whereClause? #deleteFromTable | UPDATE identifierReference tableAlias setClause whereClause? #updateTable - | MERGE INTO target=identifierReference targetAlias=tableAlias + | MERGE (WITH SCHEMA EVOLUTION)? INTO target=identifierReference targetAlias=tableAlias USING (source=identifierReference | LEFT_PAREN sourceQuery=query RIGHT_PAREN) sourceAlias=tableAlias ON mergeCondition=booleanExpression @@ -1399,6 +1399,7 @@ ansiNonReserved | DOUBLE | DROP | ESCAPED + | EVOLUTION | EXCHANGE | EXCLUDE | EXISTS @@ -1715,6 +1716,7 @@ nonReserved | END | ESCAPE | ESCAPED + | EVOLUTION | EXCHANGE | EXCLUDE | EXECUTE diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java index cbb5779699..d62f55a5b2 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java @@ -25,7 +25,7 @@ public class AsyncQueryJobMetadata extends StateModel { private final AsyncQueryId queryId; private final String applicationId; private final String jobId; - private final String resultIndex; + private final String resultLocation; // optional sessionId. private final String sessionId; // since 2.13 @@ -41,12 +41,12 @@ public class AsyncQueryJobMetadata extends StateModel { @EqualsAndHashCode.Exclude private final long primaryTerm; public AsyncQueryJobMetadata( - AsyncQueryId queryId, String applicationId, String jobId, String resultIndex) { + AsyncQueryId queryId, String applicationId, String jobId, String resultLocation) { this( queryId, applicationId, jobId, - resultIndex, + resultLocation, null, null, JobType.INTERACTIVE, @@ -59,13 +59,13 @@ public AsyncQueryJobMetadata( AsyncQueryId queryId, String applicationId, String jobId, - String resultIndex, + String resultLocation, String sessionId) { this( queryId, applicationId, jobId, - resultIndex, + resultLocation, sessionId, null, JobType.INTERACTIVE, @@ -78,7 +78,7 @@ public AsyncQueryJobMetadata( AsyncQueryId queryId, String applicationId, String jobId, - String resultIndex, + String resultLocation, String sessionId, String datasourceName, JobType jobType, @@ -87,7 +87,7 @@ public AsyncQueryJobMetadata( queryId, applicationId, jobId, - resultIndex, + resultLocation, sessionId, datasourceName, jobType, @@ -100,7 +100,7 @@ public AsyncQueryJobMetadata( AsyncQueryId queryId, String applicationId, String jobId, - String resultIndex, + String resultLocation, String sessionId, String datasourceName, JobType jobType, @@ -110,7 +110,7 @@ public AsyncQueryJobMetadata( this.queryId = queryId; this.applicationId = applicationId; this.jobId = jobId; - this.resultIndex = resultIndex; + this.resultLocation = resultLocation; this.sessionId = sessionId; this.datasourceName = datasourceName; this.jobType = jobType; @@ -131,7 +131,7 @@ public static AsyncQueryJobMetadata copy( copy.getQueryId(), copy.getApplicationId(), copy.getJobId(), - copy.getResultIndex(), + copy.getResultLocation(), copy.getSessionId(), copy.datasourceName, copy.jobType, diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index e9356e5bed..a414eed48f 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -38,8 +38,8 @@ public class BatchQueryHandler extends AsyncQueryHandler { protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) { // either empty json when the result is not available or data with status // Fetch from Result Index - return jobExecutionResponseReader.getResultFromOpensearchIndex( - asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex()); + return jobExecutionResponseReader.getResultWithJobId( + asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultLocation()); } @Override diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java index 233e2d14c6..3f412dab6d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -15,7 +15,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.json.JSONObject; -import org.opensearch.client.Client; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; @@ -54,8 +53,6 @@ public class IndexDMLHandler extends AsyncQueryHandler { private final FlintIndexStateModelService flintIndexStateModelService; private final IndexDMLResultStorageService indexDMLResultStorageService; - private final Client client; - public static boolean isIndexDMLQuery(String jobId) { return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId) || DML_QUERY_JOB_ID.equalsIgnoreCase(jobId); } @@ -137,7 +134,9 @@ private void executeIndexOp( case VACUUM: FlintIndexOp indexVacuumOp = new FlintIndexOpVacuum( - flintIndexStateModelService, dispatchQueryRequest.getDatasource(), client); + flintIndexStateModelService, + dispatchQueryRequest.getDatasource(), + flintIndexMetadataService); indexVacuumOp.apply(indexMetadata); break; default: @@ -163,7 +162,7 @@ private FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexDetails) protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) { String queryId = asyncQueryJobMetadata.getQueryId().getId(); return jobExecutionResponseReader.getResultWithQueryId( - queryId, asyncQueryJobMetadata.getResultIndex()); + queryId, asyncQueryJobMetadata.getResultLocation()); } @Override diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java index 7602988d26..a794fbbc92 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java @@ -45,7 +45,7 @@ public class InteractiveQueryHandler extends AsyncQueryHandler { protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) { String queryId = asyncQueryJobMetadata.getQueryId().getId(); return jobExecutionResponseReader.getResultWithQueryId( - queryId, asyncQueryJobMetadata.getResultIndex()); + queryId, asyncQueryJobMetadata.getResultLocation()); } @Override diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index d3d0e9ec94..cfeb72c0fe 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -9,7 +9,6 @@ import java.util.Map; import lombok.AllArgsConstructor; import org.json.JSONObject; -import org.opensearch.client.Client; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; @@ -48,8 +47,6 @@ public class SparkQueryDispatcher { private FlintIndexMetadataService flintIndexMetadataService; - private Client client; - private SessionManager sessionManager; private LeaseManager leaseManager; @@ -166,8 +163,7 @@ private IndexDMLHandler createIndexDMLHandler(EMRServerlessClient emrServerlessC jobExecutionResponseReader, flintIndexMetadataService, flintIndexStateModelService, - indexDMLResultStorageService, - client); + indexDMLResultStorageService); } // TODO: Revisit this logic. diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java index 82d4b2b3a4..033b1e7e6b 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java @@ -24,7 +24,6 @@ import org.opensearch.sql.spark.execution.statement.StatementId; import org.opensearch.sql.spark.execution.statement.StatementStorageService; import org.opensearch.sql.spark.execution.statestore.SessionStorageService; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.rest.model.LangType; import org.opensearch.sql.spark.utils.TimeProvider; @@ -41,7 +40,6 @@ public class InteractiveSession implements Session { public static final String SESSION_ID_TAG_KEY = "sid"; private final SessionId sessionId; - private final StateStore stateStore; private final StatementStorageService statementStorageService; private final SessionStorageService sessionStorageService; private final EMRServerlessClient serverlessClient; @@ -103,7 +101,6 @@ public StatementId submit(QueryRequest request) { .sessionId(sessionId) .applicationId(sessionModel.getApplicationId()) .jobId(sessionModel.getJobId()) - .stateStore(stateStore) .statementStorageService(statementStorageService) .statementId(statementId) .langType(LangType.SQL) @@ -139,7 +136,6 @@ public Optional get(StatementId stID) { .langType(model.getLangType()) .query(model.getQuery()) .queryId(model.getQueryId()) - .stateStore(stateStore) .statementStorageService(statementStorageService) .statementModel(model) .build()); diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionConfigSupplier.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionConfigSupplier.java new file mode 100644 index 0000000000..081c102fce --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionConfigSupplier.java @@ -0,0 +1,6 @@ +package org.opensearch.sql.spark.execution.session; + +public interface SessionConfigSupplier { + + Long getSessionInactivityTimeoutMillis(); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java index 1babd8712d..4be59c86a0 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java @@ -5,15 +5,12 @@ package org.opensearch.sql.spark.execution.session; -import static org.opensearch.sql.common.setting.Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS; import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId; import java.util.Optional; -import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.execution.statement.StatementStorageService; import org.opensearch.sql.spark.execution.statestore.SessionStorageService; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.utils.RealTimeProvider; /** @@ -22,31 +19,26 @@ *

todo. add Session cache and Session sweeper. */ public class SessionManager { - private final StateStore stateStore; private final StatementStorageService statementStorageService; - private final SessionStorageService sessionStorageService; private final EMRServerlessClientFactory emrServerlessClientFactory; - private Settings settings; + private SessionConfigSupplier sessionConfigSupplier; public SessionManager( - StateStore stateStore, StatementStorageService statementStorageService, SessionStorageService sessionStorageService, EMRServerlessClientFactory emrServerlessClientFactory, - Settings settings) { - this.stateStore = stateStore; + SessionConfigSupplier sessionConfigSupplier) { this.statementStorageService = statementStorageService; this.sessionStorageService = sessionStorageService; this.emrServerlessClientFactory = emrServerlessClientFactory; - this.settings = settings; + this.sessionConfigSupplier = sessionConfigSupplier; } public Session createSession(CreateSessionRequest request) { InteractiveSession session = InteractiveSession.builder() .sessionId(newSessionId(request.getDatasourceName())) - .stateStore(stateStore) .statementStorageService(statementStorageService) .sessionStorageService(sessionStorageService) .serverlessClient(emrServerlessClientFactory.getClient()) @@ -80,13 +72,12 @@ public Optional getSession(SessionId sid, String dataSourceName) { InteractiveSession session = InteractiveSession.builder() .sessionId(sid) - .stateStore(stateStore) .statementStorageService(statementStorageService) .sessionStorageService(sessionStorageService) .serverlessClient(emrServerlessClientFactory.getClient()) .sessionModel(model.get()) .sessionInactivityTimeoutMilli( - settings.getSettingValue(SESSION_INACTIVITY_TIMEOUT_MILLIS)) + sessionConfigSupplier.getSessionInactivityTimeoutMillis()) .timeProvider(new RealTimeProvider()) .build(); return Optional.ofNullable(session); diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java index e42af72c7a..80a354e400 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java @@ -15,7 +15,6 @@ import org.opensearch.index.engine.DocumentMissingException; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.sql.spark.execution.session.SessionId; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.rest.model.LangType; /** Statement represent query to execute in session. One statement map to one session. */ @@ -32,7 +31,6 @@ public class Statement { private final String datasourceName; private final String query; private final String queryId; - private final StateStore stateStore; private final StatementStorageService statementStorageService; @Setter private StatementModel statementModel; diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializer.java index 4b9790ca96..7aa56a1af8 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializer.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializer.java @@ -35,7 +35,7 @@ public XContentBuilder toXContent(AsyncQueryJobMetadata jobMetadata, ToXContent. .field("type", TYPE_JOBMETA) .field("jobId", jobMetadata.getJobId()) .field("applicationId", jobMetadata.getApplicationId()) - .field("resultIndex", jobMetadata.getResultIndex()) + .field("resultIndex", jobMetadata.getResultLocation()) .field("sessionId", jobMetadata.getSessionId()) .field(DATASOURCE_NAME, jobMetadata.getDatasourceName()) .field(JOB_TYPE, jobMetadata.getJobType().getText().toLowerCase(Locale.ROOT)) diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataService.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataService.java index ad274e429e..d460c8e4c5 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataService.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataService.java @@ -27,4 +27,11 @@ public interface FlintIndexMetadataService { * @param flintIndexOptions flintIndexOptions. */ void updateIndexToManualRefresh(String indexName, FlintIndexOptions flintIndexOptions); + + /** + * Deletes FlintIndex. + * + * @param indexName indexName. + */ + void deleteFlintIndex(String indexName); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImpl.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImpl.java index 893b33b39d..d490fa4c21 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImpl.java @@ -31,7 +31,9 @@ import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; @@ -87,6 +89,14 @@ public void updateIndexToManualRefresh(String indexName, FlintIndexOptions flint client.admin().indices().preparePutMapping(indexName).setSource(flintMetadataMap).get(); } + @Override + public void deleteFlintIndex(String indexName) { + LOGGER.info("Vacuuming Flint index {}", indexName); + DeleteIndexRequest request = new DeleteIndexRequest().indices(indexName); + AcknowledgedResponse response = client.admin().indices().delete(request).actionGet(); + LOGGER.info("OpenSearch index delete result: {}", response.isAcknowledged()); + } + private void validateFlintIndexOptions( String kind, Map existingOptions, Map newOptions) { if ((newOptions.containsKey(INCREMENTAL_REFRESH) diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java index 6e0d386664..6d6d01d249 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java @@ -7,10 +7,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; -import org.opensearch.action.support.master.AcknowledgedResponse; -import org.opensearch.client.Client; import org.opensearch.sql.spark.flint.FlintIndexMetadata; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; import org.opensearch.sql.spark.flint.FlintIndexStateModelService; @@ -21,14 +19,14 @@ public class FlintIndexOpVacuum extends FlintIndexOp { private static final Logger LOG = LogManager.getLogger(); /** OpenSearch client. */ - private final Client client; + private final FlintIndexMetadataService flintIndexMetadataService; public FlintIndexOpVacuum( FlintIndexStateModelService flintIndexStateModelService, String datasourceName, - Client client) { + FlintIndexMetadataService flintIndexMetadataService) { super(flintIndexStateModelService, datasourceName); - this.client = client; + this.flintIndexMetadataService = flintIndexMetadataService; } @Override @@ -43,11 +41,7 @@ FlintIndexState transitioningState() { @Override public void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintIndex) { - LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName()); - DeleteIndexRequest request = - new DeleteIndexRequest().indices(flintIndexMetadata.getOpensearchIndexName()); - AcknowledgedResponse response = client.admin().indices().delete(request).actionGet(); - LOG.info("OpenSearch index delete result: {}", response.isAcknowledged()); + flintIndexMetadataService.deleteFlintIndex(flintIndexMetadata.getOpensearchIndexName()); } @Override diff --git a/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java b/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java index e4773310f0..5eff49c0cc 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java +++ b/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java @@ -1,79 +1,23 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - package org.opensearch.sql.spark.response; -import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; -import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD; -import static org.opensearch.sql.spark.data.constants.SparkConstants.JOB_ID_FIELD; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.json.JSONObject; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.client.Client; -import org.opensearch.common.action.ActionFuture; -import org.opensearch.index.IndexNotFoundException; -import org.opensearch.index.query.QueryBuilder; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.search.SearchHit; -import org.opensearch.search.builder.SearchSourceBuilder; - -public class JobExecutionResponseReader { - private final Client client; - private static final Logger LOG = LogManager.getLogger(); +public interface JobExecutionResponseReader { /** - * JobExecutionResponseReader for spark query. + * Retrieves the result from the OpenSearch index based on the job ID. * - * @param client Opensearch client + * @param jobId The job ID. + * @param resultLocation The location identifier where the result is stored (optional). + * @return A JSONObject containing the result data. */ - public JobExecutionResponseReader(Client client) { - this.client = client; - } - - public JSONObject getResultFromOpensearchIndex(String jobId, String resultIndex) { - return searchInSparkIndex(QueryBuilders.termQuery(JOB_ID_FIELD, jobId), resultIndex); - } + JSONObject getResultWithJobId(String jobId, String resultLocation); - public JSONObject getResultWithQueryId(String queryId, String resultIndex) { - return searchInSparkIndex(QueryBuilders.termQuery("queryId", queryId), resultIndex); - } - - private JSONObject searchInSparkIndex(QueryBuilder query, String resultIndex) { - SearchRequest searchRequest = new SearchRequest(); - String searchResultIndex = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; - searchRequest.indices(searchResultIndex); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(query); - searchRequest.source(searchSourceBuilder); - ActionFuture searchResponseActionFuture; - JSONObject data = new JSONObject(); - try { - searchResponseActionFuture = client.search(searchRequest); - } catch (IndexNotFoundException e) { - // if there is no result index (e.g., EMR-S hasn't created the index yet), we return empty - // json - LOG.info(resultIndex + " is not created yet."); - return data; - } catch (Exception e) { - throw new RuntimeException(e); - } - SearchResponse searchResponse = searchResponseActionFuture.actionGet(); - if (searchResponse.status().getStatus() != 200) { - throw new RuntimeException( - "Fetching result from " - + searchResultIndex - + " index failed with status : " - + searchResponse.status()); - } else { - for (SearchHit searchHit : searchResponse.getHits().getHits()) { - data.put(DATA_FIELD, searchHit.getSourceAsMap()); - } - return data; - } - } + /** + * Retrieves the result from the OpenSearch index based on the query ID. + * + * @param queryId The query ID. + * @param resultLocation The location identifier where the result is stored (optional). + * @return A JSONObject containing the result data. + */ + JSONObject getResultWithQueryId(String queryId, String resultLocation); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReaderImpl.java b/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReaderImpl.java new file mode 100644 index 0000000000..4a0ce04a69 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReaderImpl.java @@ -0,0 +1,81 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.response; + +import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; +import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD; +import static org.opensearch.sql.spark.data.constants.SparkConstants.JOB_ID_FIELD; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.SearchHit; +import org.opensearch.search.builder.SearchSourceBuilder; + +public class JobExecutionResponseReaderImpl implements JobExecutionResponseReader { + private final Client client; + private static final Logger LOG = LogManager.getLogger(); + + /** + * JobExecutionResponseReader for spark query. + * + * @param client Opensearch client + */ + public JobExecutionResponseReaderImpl(Client client) { + this.client = client; + } + + @Override + public JSONObject getResultWithJobId(String jobId, String resultLocation) { + return searchInSparkIndex(QueryBuilders.termQuery(JOB_ID_FIELD, jobId), resultLocation); + } + + @Override + public JSONObject getResultWithQueryId(String queryId, String resultLocation) { + return searchInSparkIndex(QueryBuilders.termQuery("queryId", queryId), resultLocation); + } + + private JSONObject searchInSparkIndex(QueryBuilder query, String resultIndex) { + SearchRequest searchRequest = new SearchRequest(); + String searchResultIndex = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; + searchRequest.indices(searchResultIndex); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(query); + searchRequest.source(searchSourceBuilder); + ActionFuture searchResponseActionFuture; + JSONObject data = new JSONObject(); + try { + searchResponseActionFuture = client.search(searchRequest); + } catch (IndexNotFoundException e) { + // if there is no result index (e.g., EMR-S hasn't created the index yet), we return empty + // json + LOG.info(resultIndex + " is not created yet."); + return data; + } catch (Exception e) { + throw new RuntimeException(e); + } + SearchResponse searchResponse = searchResponseActionFuture.actionGet(); + if (searchResponse.status().getStatus() != 200) { + throw new RuntimeException( + "Fetching result from " + + searchResultIndex + + " index failed with status : " + + searchResponse.status()); + } else { + for (SearchHit searchHit : searchResponse.getHits().getHits()) { + data.put(DATA_FIELD, searchHit.getSourceAsMap()); + } + return data; + } + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index e0e9283d4c..379508bf30 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -26,6 +26,7 @@ import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier; import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplierImpl; import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; +import org.opensearch.sql.spark.execution.session.SessionConfigSupplier; import org.opensearch.sql.spark.execution.session.SessionManager; import org.opensearch.sql.spark.execution.statement.OpenSearchStatementStorageService; import org.opensearch.sql.spark.execution.statement.StatementStorageService; @@ -38,7 +39,7 @@ import org.opensearch.sql.spark.flint.OpenSearchFlintIndexStateModelService; import org.opensearch.sql.spark.flint.OpenSearchIndexDMLResultStorageService; import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; -import org.opensearch.sql.spark.response.JobExecutionResponseReader; +import org.opensearch.sql.spark.response.JobExecutionResponseReaderImpl; @RequiredArgsConstructor public class AsyncExecutorServiceModule extends AbstractModule { @@ -75,9 +76,8 @@ public StateStore stateStore(NodeClient client, ClusterService clusterService) { public SparkQueryDispatcher sparkQueryDispatcher( EMRServerlessClientFactory emrServerlessClientFactory, DataSourceService dataSourceService, - JobExecutionResponseReader jobExecutionResponseReader, + JobExecutionResponseReaderImpl jobExecutionResponseReaderImpl, FlintIndexMetadataServiceImpl flintIndexMetadataReader, - NodeClient client, SessionManager sessionManager, DefaultLeaseManager defaultLeaseManager, FlintIndexStateModelService flintIndexStateModelService, @@ -85,9 +85,8 @@ public SparkQueryDispatcher sparkQueryDispatcher( return new SparkQueryDispatcher( emrServerlessClientFactory, dataSourceService, - jobExecutionResponseReader, + jobExecutionResponseReaderImpl, flintIndexMetadataReader, - client, sessionManager, defaultLeaseManager, flintIndexStateModelService, @@ -100,13 +99,12 @@ public SessionManager sessionManager( StatementStorageService statementStorageService, SessionStorageService sessionStorageService, EMRServerlessClientFactory emrServerlessClientFactory, - Settings settings) { + SessionConfigSupplier sessionConfigSupplier) { return new SessionManager( - stateStore, statementStorageService, sessionStorageService, emrServerlessClientFactory, - settings); + sessionConfigSupplier); } @Provides @@ -153,8 +151,13 @@ public FlintIndexMetadataServiceImpl flintIndexMetadataReader(NodeClient client) } @Provides - public JobExecutionResponseReader jobExecutionResponseReader(NodeClient client) { - return new JobExecutionResponseReader(client); + public JobExecutionResponseReaderImpl jobExecutionResponseReader(NodeClient client) { + return new JobExecutionResponseReaderImpl(client); + } + + @Provides + public SessionConfigSupplier sessionConfigSupplier(Settings settings) { + return () -> settings.getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS); } private void registerStateStoreMetrics(StateStore stateStore) { diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index af608b423a..622bf6c4e0 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -72,7 +72,7 @@ import org.opensearch.sql.spark.flint.OpenSearchFlintIndexStateModelService; import org.opensearch.sql.spark.flint.OpenSearchIndexDMLResultStorageService; import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; -import org.opensearch.sql.spark.response.JobExecutionResponseReader; +import org.opensearch.sql.spark.response.JobExecutionResponseReaderImpl; import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.test.OpenSearchIntegTestCase; @@ -206,13 +206,13 @@ private DataSourceServiceImpl createDataSourceService() { protected AsyncQueryExecutorService createAsyncQueryExecutorService( EMRServerlessClientFactory emrServerlessClientFactory) { return createAsyncQueryExecutorService( - emrServerlessClientFactory, new JobExecutionResponseReader(client)); + emrServerlessClientFactory, new JobExecutionResponseReaderImpl(client)); } /** Pass a custom response reader which can mock interaction between PPL plugin and EMR-S job. */ protected AsyncQueryExecutorService createAsyncQueryExecutorService( EMRServerlessClientFactory emrServerlessClientFactory, - JobExecutionResponseReader jobExecutionResponseReader) { + JobExecutionResponseReaderImpl jobExecutionResponseReaderImpl) { StateStore stateStore = new StateStore(client, clusterService); AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService = new OpensearchAsyncQueryJobMetadataStorageService(stateStore); @@ -220,15 +220,16 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( new SparkQueryDispatcher( emrServerlessClientFactory, this.dataSourceService, - jobExecutionResponseReader, + jobExecutionResponseReaderImpl, new FlintIndexMetadataServiceImpl(client), - client, new SessionManager( - stateStore, statementStorageService, sessionStorageService, emrServerlessClientFactory, - pluginSettings), + () -> + pluginSettings.getSettingValue( + org.opensearch.sql.common.setting.Settings.Key + .SESSION_INACTIVITY_TIMEOUT_MILLIS)), new DefaultLeaseManager(pluginSettings, stateStore), flintIndexStateModelService, indexDMLResultStorageService); diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index ac2d6b3690..3cd96b58f8 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -30,7 +30,7 @@ import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; import org.opensearch.sql.spark.flint.FlintIndexType; -import org.opensearch.sql.spark.response.JobExecutionResponseReader; +import org.opensearch.sql.spark.response.JobExecutionResponseReaderImpl; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; import org.opensearch.sql.spark.rest.model.LangType; @@ -421,9 +421,9 @@ private class AssertionHelper { * current interaction. Intercept both get methods for different query handler which * will only call either of them. */ - new JobExecutionResponseReader(client) { + new JobExecutionResponseReaderImpl(client) { @Override - public JSONObject getResultFromOpensearchIndex(String jobId, String resultIndex) { + public JSONObject getResultWithJobId(String jobId, String resultIndex) { return interaction.interact(new InteractionStep(emrClient, jobId, resultIndex)); } @@ -492,7 +492,7 @@ private InteractionStep(LocalEMRSClient emrClient, String queryId, String result /** Simulate PPL plugin search query_execution_result */ JSONObject pluginSearchQueryResult() { - return new JobExecutionResponseReader(client).getResultWithQueryId(queryId, resultIndex); + return new JobExecutionResponseReaderImpl(client).getResultWithQueryId(queryId, resultIndex); } /** Simulate EMR-S bulk writes query_execution_result with refresh = wait_for */ diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/OpensearchAsyncQueryAsyncQueryJobMetadataStorageServiceTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/OpensearchAsyncQueryAsyncQueryJobMetadataStorageServiceTest.java index 20c944fd0a..6f19ef0c44 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/OpensearchAsyncQueryAsyncQueryJobMetadataStorageServiceTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/OpensearchAsyncQueryAsyncQueryJobMetadataStorageServiceTest.java @@ -69,7 +69,7 @@ public void testStoreJobMetadataWithResultExtraData() { assertTrue(actual.isPresent()); assertEquals(expected, actual.get()); - assertEquals("resultIndex", actual.get().getResultIndex()); + assertEquals("resultIndex", actual.get().getResultLocation()); assertEquals(MOCK_SESSION_ID, actual.get().getSessionId()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTaskTest.java b/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTaskTest.java index 008f976542..950fe5347f 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTaskTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTaskTest.java @@ -511,6 +511,9 @@ public Map getFlintIndexMetadata(String indexPattern @Override public void updateIndexToManualRefresh( String indexName, FlintIndexOptions flintIndexOptions) {} + + @Override + public void deleteFlintIndex(String indexName) {} }; FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = new FlintStreamingJobHouseKeeperTask( diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java index df4ce67c82..59b909a923 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java @@ -38,14 +38,14 @@ import org.opensearch.sql.spark.flint.FlintIndexStateModelService; import org.opensearch.sql.spark.flint.FlintIndexType; import org.opensearch.sql.spark.flint.IndexDMLResultStorageService; -import org.opensearch.sql.spark.response.JobExecutionResponseReader; +import org.opensearch.sql.spark.response.JobExecutionResponseReaderImpl; import org.opensearch.sql.spark.rest.model.LangType; @ExtendWith(MockitoExtension.class) class IndexDMLHandlerTest { @Mock private EMRServerlessClient emrServerlessClient; - @Mock private JobExecutionResponseReader jobExecutionResponseReader; + @Mock private JobExecutionResponseReaderImpl jobExecutionResponseReaderImpl; @Mock private FlintIndexMetadataService flintIndexMetadataService; @Mock private FlintIndexStateModelService flintIndexStateModelService; @Mock private IndexDMLResultStorageService indexDMLResultStorageService; @@ -54,7 +54,7 @@ class IndexDMLHandlerTest { @Test public void getResponseFromExecutor() { JSONObject result = - new IndexDMLHandler(null, null, null, null, null, null).getResponseFromExecutor(null); + new IndexDMLHandler(null, null, null, null, null).getResponseFromExecutor(null); assertEquals("running", result.getString(STATUS_FIELD)); assertEquals("", result.getString(ERROR_FIELD)); @@ -65,11 +65,10 @@ public void testWhenIndexDetailsAreNotFound() { IndexDMLHandler indexDMLHandler = new IndexDMLHandler( emrServerlessClient, - jobExecutionResponseReader, + jobExecutionResponseReaderImpl, flintIndexMetadataService, flintIndexStateModelService, - indexDMLResultStorageService, - client); + indexDMLResultStorageService); DispatchQueryRequest dispatchQueryRequest = new DispatchQueryRequest( EMRS_APPLICATION_ID, @@ -108,11 +107,10 @@ public void testWhenIndexDetailsWithInvalidQueryActionType() { IndexDMLHandler indexDMLHandler = new IndexDMLHandler( emrServerlessClient, - jobExecutionResponseReader, + jobExecutionResponseReaderImpl, flintIndexMetadataService, flintIndexStateModelService, - indexDMLResultStorageService, - client); + indexDMLResultStorageService); DispatchQueryRequest dispatchQueryRequest = new DispatchQueryRequest( EMRS_APPLICATION_ID, diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 3df5b23dc4..8854058ec5 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -76,7 +76,7 @@ import org.opensearch.sql.spark.flint.FlintIndexStateModelService; import org.opensearch.sql.spark.flint.IndexDMLResultStorageService; import org.opensearch.sql.spark.leasemanager.LeaseManager; -import org.opensearch.sql.spark.response.JobExecutionResponseReader; +import org.opensearch.sql.spark.response.JobExecutionResponseReaderImpl; import org.opensearch.sql.spark.rest.model.LangType; @ExtendWith(MockitoExtension.class) @@ -85,7 +85,7 @@ public class SparkQueryDispatcherTest { @Mock private EMRServerlessClient emrServerlessClient; @Mock private EMRServerlessClientFactory emrServerlessClientFactory; @Mock private DataSourceService dataSourceService; - @Mock private JobExecutionResponseReader jobExecutionResponseReader; + @Mock private JobExecutionResponseReaderImpl jobExecutionResponseReaderImpl; @Mock private FlintIndexMetadataService flintIndexMetadataService; @Mock(answer = RETURNS_DEEP_STUBS) @@ -117,9 +117,8 @@ void setUp() { new SparkQueryDispatcher( emrServerlessClientFactory, dataSourceService, - jobExecutionResponseReader, + jobExecutionResponseReaderImpl, flintIndexMetadataService, - openSearchClient, sessionManager, leaseManager, flintIndexStateModelService, @@ -818,7 +817,7 @@ void testGetQueryResponse() { .thenReturn(new GetJobRunResult().withJobRun(new JobRun().withState(JobRunState.PENDING))); // simulate result index is not created yet - when(jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null)) + when(jobExecutionResponseReaderImpl.getResultWithJobId(EMR_JOB_ID, null)) .thenReturn(new JSONObject()); JSONObject result = sparkQueryDispatcher.getQueryResponse(asyncQueryJobMetadata()); Assertions.assertEquals("PENDING", result.get("status")); @@ -832,7 +831,7 @@ void testGetQueryResponseWithSession() { doReturn(StatementState.WAITING).when(statement).getStatementState(); doReturn(new JSONObject()) - .when(jobExecutionResponseReader) + .when(jobExecutionResponseReaderImpl) .getResultWithQueryId(eq(MOCK_STATEMENT_ID), any()); JSONObject result = sparkQueryDispatcher.getQueryResponse( @@ -846,7 +845,7 @@ void testGetQueryResponseWithSession() { void testGetQueryResponseWithInvalidSession() { doReturn(Optional.empty()).when(sessionManager).getSession(eq(new SessionId(MOCK_SESSION_ID))); doReturn(new JSONObject()) - .when(jobExecutionResponseReader) + .when(jobExecutionResponseReaderImpl) .getResultWithQueryId(eq(MOCK_STATEMENT_ID), any()); IllegalArgumentException exception = Assertions.assertThrows( @@ -865,7 +864,7 @@ void testGetQueryResponseWithStatementNotExist() { doReturn(Optional.of(session)).when(sessionManager).getSession(new SessionId(MOCK_SESSION_ID)); doReturn(Optional.empty()).when(session).get(any()); doReturn(new JSONObject()) - .when(jobExecutionResponseReader) + .when(jobExecutionResponseReaderImpl) .getResultWithQueryId(eq(MOCK_STATEMENT_ID), any()); IllegalArgumentException exception = @@ -886,10 +885,10 @@ void testGetQueryResponseWithSuccess() { resultMap.put(STATUS_FIELD, "SUCCESS"); resultMap.put(ERROR_FIELD, ""); queryResult.put(DATA_FIELD, resultMap); - when(jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null)) + when(jobExecutionResponseReaderImpl.getResultWithJobId(EMR_JOB_ID, null)) .thenReturn(queryResult); JSONObject result = sparkQueryDispatcher.getQueryResponse(asyncQueryJobMetadata()); - verify(jobExecutionResponseReader, times(1)).getResultFromOpensearchIndex(EMR_JOB_ID, null); + verify(jobExecutionResponseReaderImpl, times(1)).getResultWithJobId(EMR_JOB_ID, null); Assertions.assertEquals( new HashSet<>(Arrays.asList(DATA_FIELD, STATUS_FIELD, ERROR_FIELD)), result.keySet()); JSONObject dataJson = new JSONObject(); diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java index 9a646e0cda..3601922e23 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java @@ -20,6 +20,7 @@ import org.junit.Test; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; @@ -67,7 +68,6 @@ public void openCloseSession() { InteractiveSession session = InteractiveSession.builder() .sessionId(sessionId) - .stateStore(stateStore) .statementStorageService(statementStorageService) .sessionStorageService(sessionStorageService) .serverlessClient(emrsClient) @@ -95,7 +95,6 @@ public void openSessionFailedConflict() { InteractiveSession session = InteractiveSession.builder() .sessionId(sessionId) - .stateStore(stateStore) .sessionStorageService(sessionStorageService) .statementStorageService(statementStorageService) .serverlessClient(emrsClient) @@ -105,7 +104,6 @@ public void openSessionFailedConflict() { InteractiveSession duplicateSession = InteractiveSession.builder() .sessionId(sessionId) - .stateStore(stateStore) .statementStorageService(statementStorageService) .sessionStorageService(sessionStorageService) .serverlessClient(emrsClient) @@ -122,7 +120,6 @@ public void closeNotExistSession() { InteractiveSession session = InteractiveSession.builder() .sessionId(sessionId) - .stateStore(stateStore) .statementStorageService(statementStorageService) .sessionStorageService(sessionStorageService) .serverlessClient(emrsClient) @@ -141,11 +138,12 @@ public void sessionManagerCreateSession() { EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; Session session = new SessionManager( - stateStore, statementStorageService, sessionStorageService, emrServerlessClientFactory, - sessionSetting()) + () -> + sessionSetting() + .getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)) .createSession(createSessionRequest()); TestSession testSession = testSession(session, sessionStorageService); @@ -157,11 +155,10 @@ public void sessionManagerGetSession() { EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; SessionManager sessionManager = new SessionManager( - stateStore, statementStorageService, sessionStorageService, emrServerlessClientFactory, - sessionSetting()); + () -> sessionSetting().getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)); Session session = sessionManager.createSession(createSessionRequest()); Optional managerSession = sessionManager.getSession(session.getSessionId()); @@ -174,11 +171,10 @@ public void sessionManagerGetSessionNotExist() { EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; SessionManager sessionManager = new SessionManager( - stateStore, statementStorageService, sessionStorageService, emrServerlessClientFactory, - sessionSetting()); + () -> sessionSetting().getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)); Optional managerSession = sessionManager.getSession(SessionId.newSessionId("no-exist")); diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java index f4e9fa4e8b..88dac149cf 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java @@ -33,11 +33,12 @@ public class SessionManagerTest { public void sessionEnable() { Assertions.assertTrue( new SessionManager( - stateStore, statementStorageService, sessionStorageService, emrServerlessClientFactory, - sessionSetting()) + () -> + sessionSetting() + .getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)) .isEnabled()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java index 29a2c9f87c..ead374d76a 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java @@ -20,6 +20,7 @@ import org.junit.Test; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.execution.session.InteractiveSessionTest; @@ -70,7 +71,6 @@ public void openThenCancelStatement() { .datasourceName(DS_NAME) .query("query") .queryId("statementId") - .stateStore(stateStore) .statementStorageService(statementStorageService) .build(); @@ -97,7 +97,6 @@ public void openFailedBecauseConflict() { .datasourceName(DS_NAME) .query("query") .queryId("statementId") - .stateStore(stateStore) .statementStorageService(statementStorageService) .build(); st.open(); @@ -113,7 +112,6 @@ public void openFailedBecauseConflict() { .datasourceName(DS_NAME) .query("query") .queryId("statementId") - .stateStore(stateStore) .statementStorageService(statementStorageService) .build(); IllegalStateException exception = assertThrows(IllegalStateException.class, dupSt::open); @@ -133,7 +131,6 @@ public void cancelNotExistStatement() { .datasourceName(DS_NAME) .query("query") .queryId("statementId") - .stateStore(stateStore) .statementStorageService(statementStorageService) .build(); st.open(); @@ -159,7 +156,6 @@ public void cancelFailedBecauseOfConflict() { .datasourceName(DS_NAME) .query("query") .queryId("statementId") - .stateStore(stateStore) .statementStorageService(statementStorageService) .build(); st.open(); @@ -248,7 +244,6 @@ public void cancelRunningStatementSuccess() { .datasourceName(DS_NAME) .query("query") .queryId("statementId") - .stateStore(stateStore) .statementStorageService(statementStorageService) .build(); @@ -270,11 +265,12 @@ public void submitStatementInRunningSession() { EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; Session session = new SessionManager( - stateStore, statementStorageService, sessionStorageService, emrServerlessClientFactory, - sessionSetting()) + () -> + sessionSetting() + .getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)) .createSession(createSessionRequest()); // App change state to running @@ -290,11 +286,12 @@ public void submitStatementInNotStartedState() { EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; Session session = new SessionManager( - stateStore, statementStorageService, sessionStorageService, emrServerlessClientFactory, - sessionSetting()) + () -> + sessionSetting() + .getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)) .createSession(createSessionRequest()); StatementId statementId = session.submit(queryRequest()); @@ -306,11 +303,12 @@ public void failToSubmitStatementInDeadState() { EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; Session session = new SessionManager( - stateStore, statementStorageService, sessionStorageService, emrServerlessClientFactory, - sessionSetting()) + () -> + sessionSetting() + .getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)) .createSession(createSessionRequest()); sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.DEAD, DS_NAME); @@ -328,11 +326,12 @@ public void failToSubmitStatementInFailState() { EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; Session session = new SessionManager( - stateStore, statementStorageService, sessionStorageService, emrServerlessClientFactory, - sessionSetting()) + () -> + sessionSetting() + .getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)) .createSession(createSessionRequest()); sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.FAIL, DS_NAME); @@ -350,11 +349,12 @@ public void newStatementFieldAssert() { EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; Session session = new SessionManager( - stateStore, statementStorageService, sessionStorageService, emrServerlessClientFactory, - sessionSetting()) + () -> + sessionSetting() + .getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)) .createSession(createSessionRequest()); StatementId statementId = session.submit(queryRequest()); Optional statement = session.get(statementId); @@ -374,11 +374,12 @@ public void failToSubmitStatementInDeletedSession() { EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; Session session = new SessionManager( - stateStore, statementStorageService, sessionStorageService, emrServerlessClientFactory, - sessionSetting()) + () -> + sessionSetting() + .getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)) .createSession(createSessionRequest()); // other's delete session @@ -396,11 +397,12 @@ public void getStatementSuccess() { EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; Session session = new SessionManager( - stateStore, statementStorageService, sessionStorageService, emrServerlessClientFactory, - sessionSetting()) + () -> + sessionSetting() + .getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)) .createSession(createSessionRequest()); // App change state to running sessionStorageService.updateSessionState( @@ -418,11 +420,12 @@ public void getStatementNotExist() { EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; Session session = new SessionManager( - stateStore, statementStorageService, sessionStorageService, emrServerlessClientFactory, - sessionSetting()) + () -> + sessionSetting() + .getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)) .createSession(createSessionRequest()); // App change state to running sessionStorageService.updateSessionState( @@ -496,7 +499,6 @@ private Statement createStatement(StatementId stId) { .datasourceName(DS_NAME) .query("query") .queryId("statementId") - .stateStore(stateStore) .statementStorageService(statementStorageService) .build(); st.open(); diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializerTest.java index d393c383c6..02f5f5c0c6 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializerTest.java @@ -79,7 +79,7 @@ void fromXContentShouldDeserializeAsyncQueryJobMetadata() throws Exception { assertEquals("query1", jobMetadata.getQueryId().getId()); assertEquals("job1", jobMetadata.getJobId()); assertEquals("app1", jobMetadata.getApplicationId()); - assertEquals("result1", jobMetadata.getResultIndex()); + assertEquals("result1", jobMetadata.getResultLocation()); assertEquals("session1", jobMetadata.getSessionId()); assertEquals("datasource1", jobMetadata.getDatasourceName()); assertEquals(JobType.INTERACTIVE, jobMetadata.getJobType()); @@ -175,7 +175,7 @@ void fromXContentShouldDeserializeAsyncQueryWithJobTypeNUll() throws Exception { assertEquals("query1", jobMetadata.getQueryId().getId()); assertEquals("job1", jobMetadata.getJobId()); assertEquals("app1", jobMetadata.getApplicationId()); - assertEquals("result1", jobMetadata.getResultIndex()); + assertEquals("result1", jobMetadata.getResultLocation()); assertEquals("session1", jobMetadata.getSessionId()); assertEquals("datasource1", jobMetadata.getDatasourceName()); assertNull(jobMetadata.getJobType()); diff --git a/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java b/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java index bbaf6f0f59..e76be5f955 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java @@ -46,9 +46,9 @@ public void testGetResultFromOpensearchIndex() { new SearchHits( new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F)); Mockito.when(searchHit.getSourceAsMap()).thenReturn(Map.of("stepId", EMR_JOB_ID)); - JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); - assertFalse( - jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null).isEmpty()); + JobExecutionResponseReaderImpl jobExecutionResponseReaderImpl = + new JobExecutionResponseReaderImpl(client); + assertFalse(jobExecutionResponseReaderImpl.getResultWithJobId(EMR_JOB_ID, null).isEmpty()); } @Test @@ -61,9 +61,9 @@ public void testGetResultFromCustomIndex() { new SearchHits( new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F)); Mockito.when(searchHit.getSourceAsMap()).thenReturn(Map.of("stepId", EMR_JOB_ID)); - JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); - assertFalse( - jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, "foo").isEmpty()); + JobExecutionResponseReaderImpl jobExecutionResponseReaderImpl = + new JobExecutionResponseReaderImpl(client); + assertFalse(jobExecutionResponseReaderImpl.getResultWithJobId(EMR_JOB_ID, "foo").isEmpty()); } @Test @@ -72,11 +72,12 @@ public void testInvalidSearchResponse() { when(searchResponseActionFuture.actionGet()).thenReturn(searchResponse); when(searchResponse.status()).thenReturn(RestStatus.NO_CONTENT); - JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); + JobExecutionResponseReaderImpl jobExecutionResponseReaderImpl = + new JobExecutionResponseReaderImpl(client); RuntimeException exception = assertThrows( RuntimeException.class, - () -> jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null)); + () -> jobExecutionResponseReaderImpl.getResultWithJobId(EMR_JOB_ID, null)); Assertions.assertEquals( "Fetching result from " + DEFAULT_RESULT_INDEX @@ -88,17 +89,18 @@ public void testInvalidSearchResponse() { @Test public void testSearchFailure() { when(client.search(any())).thenThrow(RuntimeException.class); - JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); + JobExecutionResponseReaderImpl jobExecutionResponseReaderImpl = + new JobExecutionResponseReaderImpl(client); assertThrows( RuntimeException.class, - () -> jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null)); + () -> jobExecutionResponseReaderImpl.getResultWithJobId(EMR_JOB_ID, null)); } @Test public void testIndexNotFoundException() { when(client.search(any())).thenThrow(IndexNotFoundException.class); - JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); - assertTrue( - jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, "foo").isEmpty()); + JobExecutionResponseReaderImpl jobExecutionResponseReaderImpl = + new JobExecutionResponseReaderImpl(client); + assertTrue(jobExecutionResponseReaderImpl.getResultWithJobId(EMR_JOB_ID, "foo").isEmpty()); } }