Fix Session state bug and improve Query Efficiency in REPL (#245)
* Fix Session state bug and improve Query Efficiency in REPL

This PR fixes a bug in FlintREPL's session management and optimizes how queries are executed. It addresses an issue where marking a session as 'dead' inadvertently triggered the creation of a new, unnecessary session; the new session then entered a spin-wait state, producing duplicate jobs.

The improvements include:
- **Introduction of `earlyExitFlag`**: A new flag, `earlyExitFlag`, is set to `true` under two conditions: when a job is excluded, or when it is not associated with the current session's job run ID. The shutdown hook evaluates this flag to decide whether the session state should be marked as 'dead'. This prevents the SQL plugin from creating duplicate sessions and ensures resources are used more efficiently (see the sketch after this list).
- **Query Method Optimization**: Query execution has been switched from the scroll API to plain search, eliminating the creation of unnecessary scroll contexts and improving the performance and efficiency of query operations.
- **Reversion of a Previous Commit**: The PR reverts a previous change (be82024) now that the related issue in the SQL plugin (opensearch-project/sql#2436) has been resolved, further streamlining the operation and maintenance of the system.
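
To make the first point concrete, here is a minimal sketch of how such a flag can gate the shutdown hook. Only the name `earlyExitFlag` comes from this change; the surrounding object, helper, and values are hypothetical:

```scala
// Illustrative sketch only; FlintREPL's real control flow is more involved.
object EarlyExitSketch {
  @volatile var earlyExitFlag: Boolean = false

  // Stub standing in for the real session-state update in the request index.
  def markSessionDead(): Unit = println("session state -> dead")

  def evaluateJob(jobId: String, excludedJobIds: Seq[String], sessionJobRunId: String): Unit = {
    // The job is excluded, or the session record points at a different job run:
    // this process should exit without tearing down the shared session.
    if (excludedJobIds.contains(jobId) || jobId != sessionJobRunId) {
      earlyExitFlag = true
    }
  }

  def main(args: Array[String]): Unit = {
    sys.addShutdownHook {
      // Only mark the session 'dead' when this job legitimately owned it; otherwise
      // the SQL plugin would observe the dead state and spawn a duplicate session.
      if (!earlyExitFlag) markSessionDead()
    }
    evaluateJob("job-2", excludedJobIds = Seq("job-2"), sessionJobRunId = "job-1")
  }
}
```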

**Testing**:
1. Integration tests were added to cover both REPL and streaming job functionalities, ensuring the robustness of the fixes.
2. Manual testing was conducted to validate the bug fix.

Signed-off-by: Kaituo Li <[email protected]>

* added Javadoc

Signed-off-by: Kaituo Li <[email protected]>

* fix IT by restoring the env variable change

Signed-off-by: Kaituo Li <[email protected]>

---------

Signed-off-by: Kaituo Li <[email protected]>
kaituo authored Feb 9, 2024
1 parent f446de0 commit 9c15194
Showing 23 changed files with 1,279 additions and 59 deletions.
9 changes: 6 additions & 3 deletions build.sbt
@@ -42,8 +42,9 @@ lazy val commonSettings = Seq(
testScalastyle := (Test / scalastyle).toTask("").value,
Test / test := ((Test / test) dependsOn testScalastyle).value)

// running `scalafmtAll` includes all subprojects under root
lazy val root = (project in file("."))
.aggregate(flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication)
.aggregate(flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication, integtest)
.disablePlugins(AssemblyPlugin)
.settings(name := "flint", publish / skip := true)

@@ -159,7 +160,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))

// Test assembly package with integration test.
lazy val integtest = (project in file("integ-test"))
.dependsOn(flintSparkIntegration % "test->test", pplSparkIntegration % "test->test" )
.dependsOn(flintSparkIntegration % "test->test", pplSparkIntegration % "test->test", sparkSqlApplication % "test->test")
.settings(
commonSettings,
name := "integ-test",
@@ -175,7 +176,9 @@ lazy val integtest = (project in file("integ-test"))
"org.opensearch.client" % "opensearch-java" % "2.6.0" % "test"
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
libraryDependencies ++= deps(sparkVersion),
Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value))
Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value,
(sparkSqlApplication / assembly).value
))

lazy val standaloneCosmetic = project
.settings(
FlintOptions.java
@@ -77,6 +77,8 @@ public class FlintOptions implements Serializable {

public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000;

public static final int DEFAULT_INACTIVITY_LIMIT_MILLIS = 10 * 60 * 1000;

public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
OpenSearchQueryReader.java
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.Strings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* {@link OpenSearchReader} using search. https://opensearch.org/docs/latest/api-reference/search/
*/
public class OpenSearchQueryReader extends OpenSearchReader {

private static final Logger LOG = Logger.getLogger(OpenSearchQueryReader.class.getName());

public OpenSearchQueryReader(IRestHighLevelClient client, String indexName, SearchSourceBuilder searchSourceBuilder) {
super(client, new SearchRequest().indices(indexName).source(searchSourceBuilder));
}

/**
* Execute the search request once and return the response.
*/
Optional<SearchResponse> search(SearchRequest request) throws IOException {
return Optional.of(client.search(request, RequestOptions.DEFAULT));
}

/**
* No scroll context is created, so there is nothing to clean up.
*/
void clean() throws IOException {}
}
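
A hedged sketch of how a caller might use this reader, assuming it inherits the usual `hasNext`/`next`/`close` iteration from its parent and that an `IRestHighLevelClient` is already in scope; the index name and query are placeholders:

```scala
import org.opensearch.flint.core.IRestHighLevelClient
import org.opensearch.flint.core.storage.OpenSearchQueryReader
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder

def readRunningSessions(client: IRestHighLevelClient): Unit = {
  val source = new SearchSourceBuilder()
    .query(QueryBuilders.termQuery("state", "running"))
    .size(100) // a single search page; no server-side scroll context is created

  val reader = new OpenSearchQueryReader(client, "query_results", source)
  try {
    while (reader.hasNext) println(reader.next())
  } finally {
    reader.close() // clean() is a no-op here, unlike the scroll-based reader
  }
}
```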
OpenSearchReader.java
@@ -5,6 +5,7 @@

package org.opensearch.flint.core.storage;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.flint.core.IRestHighLevelClient;
@@ -48,6 +49,13 @@ public OpenSearchReader(IRestHighLevelClient client, SearchRequest searchRequest
iterator = searchHits.iterator();
}
return iterator.hasNext();
} catch (OpenSearchStatusException e) {
// e.g., org.opensearch.OpenSearchStatusException: OpenSearch exception [type=index_not_found_exception, reason=no such index [query_results2]]
if (e.getMessage() != null && (e.getMessage().contains("index_not_found_exception"))) {
return false;
} else {
throw e;
}
} catch (IOException e) {
// TODO: log the error.
throw new RuntimeException(e);
DimensionUtilsTest.java
@@ -57,13 +57,20 @@ public void testGetDimensionsFromSystemEnv() throws NoSuchFieldException, IllegalAccessException
Field field = classOfMap.getDeclaredField("m");
field.setAccessible(true);
Map<String, String> writeableEnvironmentVariables = (Map<String, String>)field.get(System.getenv());
writeableEnvironmentVariables.put("TEST_VAR", "dummy1");
writeableEnvironmentVariables.put("SERVERLESS_EMR_JOB_ID", "dummy2");
Dimension result1 = DimensionUtils.constructDimension("TEST_VAR", parts);
assertEquals("TEST_VAR", result1.getName());
assertEquals("dummy1", result1.getValue());
Dimension result2 = DimensionUtils.constructDimension("jobId", parts);
assertEquals("jobId", result2.getName());
assertEquals("dummy2", result2.getValue());
try {
writeableEnvironmentVariables.put("TEST_VAR", "dummy1");
writeableEnvironmentVariables.put("SERVERLESS_EMR_JOB_ID", "dummy2");
Dimension result1 = DimensionUtils.constructDimension("TEST_VAR", parts);
assertEquals("TEST_VAR", result1.getName());
assertEquals("dummy1", result1.getValue());
Dimension result2 = DimensionUtils.constructDimension("jobId", parts);
assertEquals("jobId", result2.getName());
assertEquals("dummy2", result2.getValue());
} finally {
// The system environment is shared with other tests, so remove these entries before exiting.
writeableEnvironmentVariables.remove("SERVERLESS_EMR_JOB_ID");
writeableEnvironmentVariables.remove("TEST_VAR");
}

}
}
FlintSparkConf.scala
@@ -146,7 +146,30 @@ object FlintSparkConf {
.datasourceOption()
.doc("socket duration in milliseconds")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_SOCKET_TIMEOUT_MILLIS))

val DATA_SOURCE_NAME =
FlintConfig(s"spark.flint.datasource.name")
.doc("data source name")
.createOptional()
val JOB_TYPE =
FlintConfig(s"spark.flint.job.type")
.doc("Flint job type. Including interactive and streaming")
.createWithDefault("interactive")
val SESSION_ID =
FlintConfig(s"spark.flint.job.sessionId")
.doc("Flint session id")
.createOptional()
val REQUEST_INDEX =
FlintConfig(s"spark.flint.job.requestIndex")
.doc("Request index")
.createOptional()
val EXCLUDE_JOB_IDS =
FlintConfig(s"spark.flint.deployment.excludeJobs")
.doc("Exclude job ids")
.createOptional()
val REPL_INACTIVITY_TIMEOUT_MILLIS =
FlintConfig(s"spark.flint.job.inactivityLimitMillis")
.doc("inactivity timeout")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_INACTIVITY_LIMIT_MILLIS))
}

/**
@@ -196,11 +219,18 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
CUSTOM_AWS_CREDENTIALS_PROVIDER,
USERNAME,
PASSWORD,
SOCKET_TIMEOUT_MILLIS)
SOCKET_TIMEOUT_MILLIS,
JOB_TYPE,
REPL_INACTIVITY_TIMEOUT_MILLIS)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.toMap

val optionsWithoutDefault = Seq(RETRYABLE_EXCEPTION_CLASS_NAMES)
val optionsWithoutDefault = Seq(
RETRYABLE_EXCEPTION_CLASS_NAMES,
DATA_SOURCE_NAME,
SESSION_ID,
REQUEST_INDEX,
EXCLUDE_JOB_IDS)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.flatMap {
case (_, None) => None
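Together these options configure an interactive REPL job. A sketch of setting them on a Spark session, with placeholder values; the comma-separated format for excluded job IDs is an assumption:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("flint-repl-sketch")
  .master("local[*]")
  .config("spark.flint.datasource.name", "my_datasource")      // DATA_SOURCE_NAME
  .config("spark.flint.job.type", "interactive")               // JOB_TYPE
  .config("spark.flint.job.sessionId", "session-123")          // SESSION_ID
  .config("spark.flint.job.requestIndex", "query_request")     // REQUEST_INDEX
  .config("spark.flint.deployment.excludeJobs", "job-1,job-2") // EXCLUDE_JOB_IDS
  .config("spark.flint.job.inactivityLimitMillis", "600000")   // 10 minutes, the default
  .getOrCreate()
```
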
FlintInstance.scala
@@ -25,7 +25,14 @@ class FlintInstance(
val lastUpdateTime: Long,
val jobStartTime: Long = 0,
val excludedJobIds: Seq[String] = Seq.empty[String],
val error: Option[String] = None) {}
val error: Option[String] = None) {
override def toString: String = {
val excludedJobIdsStr = excludedJobIds.mkString("[", ", ", "]")
val errorStr = error.getOrElse("None")
s"FlintInstance(applicationId=$applicationId, jobId=$jobId, sessionId=$sessionId, state=$state, " +
s"lastUpdateTime=$lastUpdateTime, jobStartTime=$jobStartTime, excludedJobIds=$excludedJobIdsStr, error=$errorStr)"
}
}

object FlintInstance {

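
For illustration, a hypothetical construction of a session record followed by the new log-friendly output; the leading constructor parameter names are inferred from the fields referenced in `toString`, and all values are made up:

```scala
val instance = new FlintInstance(
  applicationId = "app-1",
  jobId = "job-1",
  sessionId = "session-1",
  state = "running",
  lastUpdateTime = 1707436800000L,
  excludedJobIds = Seq("old-job"))

// Prints a single line along the lines of:
// FlintInstance(applicationId=app-1, jobId=job-1, sessionId=session-1, state=running,
//   lastUpdateTime=1707436800000, jobStartTime=0, excludedJobIds=[old-job], error=None)
println(instance)
```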
