From def26cf66271b99d6f27aa624d3d43e1c429cb71 Mon Sep 17 00:00:00 2001 From: Tomoyuki MORITA Date: Mon, 12 Aug 2024 09:40:06 -0700 Subject: [PATCH] Fix refresh policy back to WAIT_FOR other than writing query result (#554) Signed-off-by: Tomoyuki Morita --- .../flint/core/storage/FlintOpenSearchMetadataLog.java | 4 ++-- .../opensearch/flint/core/storage/OpenSearchUpdater.java | 7 ++----- .../scala/org/apache/spark/sql/FlintREPLITSuite.scala | 5 +++-- .../org/opensearch/flint/core/OpenSearchUpdaterSuite.scala | 5 +++-- .../src/main/scala/org/apache/spark/sql/OSClient.scala | 2 +- 5 files changed, 11 insertions(+), 12 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index 62dd01683..7944de5ae 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -155,7 +155,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { new IndexRequest() .index(metadataLogIndexName) .id(logEntryWithId.id()) - .setRefreshPolicy(options.getRefreshPolicy()) + .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL) .source(toJson(logEntryWithId), XContentType.JSON), RequestOptions.DEFAULT)); } @@ -166,7 +166,7 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) { client -> client.update( new UpdateRequest(metadataLogIndexName, logEntry.id()) .doc(toJson(logEntry), XContentType.JSON) - .setRefreshPolicy(options.getRefreshPolicy()) + .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL) .setIfSeqNo((Long) logEntry.entryVersion().get("seqNo").get()) .setIfPrimaryTerm((Long) logEntry.entryVersion().get("primaryTerm").get()), RequestOptions.DEFAULT)); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java index d9dc54783..0d84b4956 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java @@ -6,7 +6,6 @@ import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.core.FlintClient; -import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; import java.io.IOException; @@ -26,12 +25,10 @@ public class OpenSearchUpdater { private final String indexName; private final FlintClient flintClient; - private final FlintOptions options; - public OpenSearchUpdater(String indexName, FlintClient flintClient, FlintOptions options) { + public OpenSearchUpdater(String indexName, FlintClient flintClient) { this.indexName = indexName; this.flintClient = flintClient; - this.options = options; } public void upsert(String id, String doc) { @@ -64,7 +61,7 @@ private void updateDocument(String id, String doc, boolean upsert, long seqNo, l assertIndexExist(client, indexName); UpdateRequest updateRequest = new UpdateRequest(indexName, id) .doc(doc, XContentType.JSON) - .setRefreshPolicy(options.getRefreshPolicy()); + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); if (upsert) { updateRequest.docAsUpsert(true); diff --git a/integ-test/src/integration/scala/org/apache/spark/sql/FlintREPLITSuite.scala b/integ-test/src/integration/scala/org/apache/spark/sql/FlintREPLITSuite.scala index b75ff0ce9..1d86a6589 100644 --- a/integ-test/src/integration/scala/org/apache/spark/sql/FlintREPLITSuite.scala +++ b/integ-test/src/integration/scala/org/apache/spark/sql/FlintREPLITSuite.scala @@ -118,8 +118,9 @@ class FlintREPLITSuite extends SparkFunSuite with OpenSearchSuite with JobTest { flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)); osClient = new OSClient(new FlintOptions(openSearchOptions.asJava)) - val options = new FlintOptions(openSearchOptions.asJava) - updater = new OpenSearchUpdater(requestIndex, new FlintOpenSearchClient(options), options) + updater = new OpenSearchUpdater( + requestIndex, + new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))) } override def afterEach(): Unit = { diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/OpenSearchUpdaterSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/core/OpenSearchUpdaterSuite.scala index b235ecdd5..1c4da53fb 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/core/OpenSearchUpdaterSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/core/OpenSearchUpdaterSuite.scala @@ -32,8 +32,9 @@ class OpenSearchUpdaterSuite extends OpenSearchTransactionSuite with Matchers { override def beforeAll(): Unit = { super.beforeAll() flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)); - val options = new FlintOptions(openSearchOptions.asJava) - updater = new OpenSearchUpdater(testMetaLogIndex, new FlintOpenSearchClient(options), options) + updater = new OpenSearchUpdater( + testMetaLogIndex, + new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))) } test("upsert flintJob should success") { diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala index ebac04876..422cfc947 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala @@ -111,7 +111,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { } def createUpdater(indexName: String): OpenSearchUpdater = - new OpenSearchUpdater(indexName, flintClient, flintOptions) + new OpenSearchUpdater(indexName, flintClient) def getDoc(osIndexName: String, id: String): GetResponse = { using(flintClient.createClient()) { client =>