Skip to content

Commit

Permalink
Fix refresh policy back to WAIT_FOR other than writing query result (o…
Browse files Browse the repository at this point in the history
…pensearch-project#554)

Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 authored Aug 12, 2024
1 parent 61c9620 commit def26cf
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
new IndexRequest()
.index(metadataLogIndexName)
.id(logEntryWithId.id())
.setRefreshPolicy(options.getRefreshPolicy())
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
.source(toJson(logEntryWithId), XContentType.JSON),
RequestOptions.DEFAULT));
}
Expand All @@ -166,7 +166,7 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) {
client -> client.update(
new UpdateRequest(metadataLogIndexName, logEntry.id())
.doc(toJson(logEntry), XContentType.JSON)
.setRefreshPolicy(options.getRefreshPolicy())
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
.setIfSeqNo((Long) logEntry.entryVersion().get("seqNo").get())
.setIfPrimaryTerm((Long) logEntry.entryVersion().get("primaryTerm").get()),
RequestOptions.DEFAULT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;

import java.io.IOException;
Expand All @@ -26,12 +25,10 @@ public class OpenSearchUpdater {

private final String indexName;
private final FlintClient flintClient;
private final FlintOptions options;

public OpenSearchUpdater(String indexName, FlintClient flintClient, FlintOptions options) {
public OpenSearchUpdater(String indexName, FlintClient flintClient) {
this.indexName = indexName;
this.flintClient = flintClient;
this.options = options;
}

public void upsert(String id, String doc) {
Expand Down Expand Up @@ -64,7 +61,7 @@ private void updateDocument(String id, String doc, boolean upsert, long seqNo, l
assertIndexExist(client, indexName);
UpdateRequest updateRequest = new UpdateRequest(indexName, id)
.doc(doc, XContentType.JSON)
.setRefreshPolicy(options.getRefreshPolicy());
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);

if (upsert) {
updateRequest.docAsUpsert(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ class FlintREPLITSuite extends SparkFunSuite with OpenSearchSuite with JobTest {

flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava));
osClient = new OSClient(new FlintOptions(openSearchOptions.asJava))
val options = new FlintOptions(openSearchOptions.asJava)
updater = new OpenSearchUpdater(requestIndex, new FlintOpenSearchClient(options), options)
updater = new OpenSearchUpdater(
requestIndex,
new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)))
}

override def afterEach(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ class OpenSearchUpdaterSuite extends OpenSearchTransactionSuite with Matchers {
override def beforeAll(): Unit = {
super.beforeAll()
flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava));
val options = new FlintOptions(openSearchOptions.asJava)
updater = new OpenSearchUpdater(testMetaLogIndex, new FlintOpenSearchClient(options), options)
updater = new OpenSearchUpdater(
testMetaLogIndex,
new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)))
}

test("upsert flintJob should success") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging {
}

def createUpdater(indexName: String): OpenSearchUpdater =
new OpenSearchUpdater(indexName, flintClient, flintOptions)
new OpenSearchUpdater(indexName, flintClient)

def getDoc(osIndexName: String, id: String): GetResponse = {
using(flintClient.createClient()) { client =>
Expand Down

0 comments on commit def26cf

Please sign in to comment.