Correct the delete by query endpoint to match the OpenSearch API #350

Merged
9 changes: 8 additions & 1 deletion .github/workflows/build_hive.yml

```diff
@@ -47,4 +47,11 @@ jobs:
         uses: gradle/gradle-build-action@v2
 
       - name: Build with Gradle
-        run: ./gradlew opensearch-hadoop-hive:build
+        run: ./gradlew opensearch-hadoop-hive:build
+
+      - name: Publish Test Results
+        if: failure()
+        uses: actions/upload-artifact@v2
+        with:
+          name: test-results-${{ matrix.os }}
+          path: '**/build/test-results'
```
9 changes: 8 additions & 1 deletion .github/workflows/build_mr.yml

```diff
@@ -47,4 +47,11 @@ jobs:
         uses: gradle/gradle-build-action@v2
 
       - name: Build with Gradle
-        run: ./gradlew opensearch-hadoop-mr:build
+        run: ./gradlew opensearch-hadoop-mr:build
+
+      - name: Publish Test Results
+        if: failure()
+        uses: actions/upload-artifact@v2
+        with:
+          name: test-results-${{ matrix.os }}
+          path: '**/build/test-results'
```
9 changes: 8 additions & 1 deletion .github/workflows/build_spark.yml

```diff
@@ -47,4 +47,11 @@ jobs:
         uses: gradle/gradle-build-action@v2
 
       - name: Build with Gradle
-        run: ./gradlew opensearch-spark:build
+        run: ./gradlew opensearch-spark:build
+
+      - name: Publish Test Results
+        if: failure()
+        uses: actions/upload-artifact@v2
+        with:
+          name: test-results-${{ matrix.os }}
+          path: '**/build/test-results'
```
9 changes: 8 additions & 1 deletion .github/workflows/build_spark_20.yml

```diff
@@ -47,4 +47,11 @@ jobs:
         uses: gradle/gradle-build-action@v2
 
       - name: Build with Gradle
-        run: ./gradlew opensearch-spark-20:integrationTest
+        run: ./gradlew opensearch-spark-20:integrationTest
+
+      - name: Publish Test Results
+        if: failure()
+        uses: actions/upload-artifact@v2
+        with:
+          name: test-results-${{ matrix.os }}
+          path: '**/build/test-results'
```
9 changes: 8 additions & 1 deletion .github/workflows/build_spark_20_scala_210.yml

```diff
@@ -47,4 +47,11 @@ jobs:
         uses: gradle/gradle-build-action@v2
 
       - name: Build with Gradle
-        run: ./gradlew opensearch-spark-20:integrationTestSpark20scala210
+        run: ./gradlew opensearch-spark-20:integrationTestSpark20scala210
+
+      - name: Publish Test Results
+        if: failure()
+        uses: actions/upload-artifact@v2
+        with:
+          name: test-results-${{ matrix.os }}
+          path: '**/build/test-results'
```
9 changes: 8 additions & 1 deletion .github/workflows/build_spark_20_scala_211.yml

```diff
@@ -47,4 +47,11 @@ jobs:
         uses: gradle/gradle-build-action@v2
 
       - name: Build with Gradle
-        run: ./gradlew opensearch-spark-20:integrationTestSpark20scala210
+        run: ./gradlew opensearch-spark-20:integrationTestSpark20scala210
+
+      - name: Publish Test Results
+        if: failure()
+        uses: actions/upload-artifact@v2
+        with:
+          name: test-results-${{ matrix.os }}
+          path: '**/build/test-results'
```
9 changes: 8 additions & 1 deletion .github/workflows/build_spark_30.yml

```diff
@@ -47,4 +47,11 @@ jobs:
         uses: gradle/gradle-build-action@v2
 
       - name: Build with Gradle
-        run: ./gradlew opensearch-spark-30:integrationTest
+        run: ./gradlew opensearch-spark-30:integrationTest
+
+      - name: Publish Test Results
+        if: failure()
+        uses: actions/upload-artifact@v2
+        with:
+          name: test-results-${{ matrix.os }}
+          path: '**/build/test-results'
```
9 changes: 8 additions & 1 deletion .github/workflows/build_spark_30_scala_213.yml

```diff
@@ -47,4 +47,11 @@ jobs:
         uses: gradle/gradle-build-action@v2
 
       - name: Build with Gradle
-        run: ./gradlew opensearch-spark-30:integrationTestSpark30scala213
+        run: ./gradlew opensearch-spark-30:integrationTestSpark30scala213
+
+      - name: Publish Test Results
+        if: failure()
+        uses: actions/upload-artifact@v2
+        with:
+          name: test-results-${{ matrix.os }}
+          path: '**/build/test-results'
```
1 change: 1 addition & 0 deletions CHANGELOG.md

```diff
@@ -29,6 +29,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 ### Removed
 
 ### Fixed
+- Corrected the delete by query endpoint to match the OpenSearch API ([#350](https://github.com/opensearch-project/opensearch-hadoop/pull/350))
 
 ### Security
 
```
AbstractRestQueryTest.java

```diff
@@ -69,7 +69,7 @@ public class AbstractRestQueryTest {
     @Before
     public void start() throws IOException {
         version = TestUtils.getOpenSearchClusterInfo().getMajorVersion();
-        settings = new TestSettings("rest/savebulk");
+        settings = new TestSettings("rest_save_bulk");
         settings.setInternalVersion(version);
         //testSettings.setPort(9200)
         settings.setProperty(ConfigurationOptions.OPENSEARCH_SERIALIZATION_WRITER_VALUE_CLASS, JdkValueWriter.class.getName());
```
AbstractRestSaveTest.java

```diff
@@ -34,6 +34,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.junit.Ignore;
 import org.opensearch.hadoop.OpenSearchHadoopIllegalArgumentException;
 import org.opensearch.hadoop.cfg.ConfigurationOptions;
 import org.opensearch.hadoop.cfg.Settings;
```
```diff
@@ -66,9 +67,11 @@ public class AbstractRestSaveTest {
 
     private static final Log LOG = LogFactory.getLog(AbstractRestSaveTest.class);
 
+    private static final JsonUtils.Query HITS_TOTAL_VALUE = JsonUtils.query("hits").get("total").get("value");
+
     @Test
     public void testBulkWrite() throws Exception {
-        TestSettings testSettings = new TestSettings("rest/savebulk");
+        TestSettings testSettings = new TestSettings("rest_save_bulk");
         //testSettings.setPort(9200)
         testSettings.setProperty(ConfigurationOptions.OPENSEARCH_SERIALIZATION_WRITER_VALUE_CLASS, JdkValueWriter.class.getName());
         RestRepository client = new RestRepository(testSettings);
```
```diff
@@ -83,16 +86,18 @@ public void testBulkWrite() throws Exception {
             line.put("name", in.next());
             line.put("url", in.next());
             line.put("picture", in.next());
+            in.nextLine();
             client.writeToIndex(line);
             line.clear();
         }
 
         client.close();
     }
 
+    @Ignore("OpenSearch throws an error on empty bulk request")
     @Test
     public void testEmptyBulkWrite() throws Exception {
-        TestSettings testSettings = new TestSettings("rest/emptybulk");
+        TestSettings testSettings = new TestSettings("rest_empty_bulk");
         testSettings.setInternalClusterInfo(TestUtils.getOpenSearchClusterInfo());
         testSettings.setProperty(ConfigurationOptions.OPENSEARCH_SERIALIZATION_WRITER_VALUE_CLASS, JdkValueWriter.class.getName());
         RestRepository restRepo = new RestRepository(testSettings);
```
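The added `in.nextLine()` matters because `Scanner.next()` reads whitespace-delimited tokens without consuming the rest of the current line, so any trailing content would bleed into the next record. A minimal standalone sketch of that behavior, with made-up sample data:

```java
import java.util.Scanner;

public class ScannerSketch {
    public static void main(String[] args) {
        // Two records; the first has extra trailing content on its line.
        Scanner in = new Scanner("Alice http://a.example pic1 trailing\nBob http://b.example pic2\n");
        while (in.hasNextLine()) {
            String name = in.next();
            String url = in.next();
            String picture = in.next();
            in.nextLine(); // discard whatever remains on this line ("trailing")
            System.out.println(name + " " + url + " " + picture);
        }
    }
}
```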
```diff
@@ -105,8 +110,9 @@ public void testEmptyBulkWrite() throws Exception {
 
     @Test
     public void testRepositoryDelete() throws Exception {
-        Settings settings = new TestSettings("rest/deletebulk");
-        RestUtils.delete("rest");
+        String index = "rest_delete_bulk";
+        Settings settings = new TestSettings(index);
+        RestUtils.delete(index);
         InitializationUtils.discoverClusterInfo(settings, LOG);
         settings.setProperty(ConfigurationOptions.OPENSEARCH_SERIALIZATION_WRITER_VALUE_CLASS, JdkValueWriter.class.getName());
         settings.setProperty(ConfigurationOptions.OPENSEARCH_MAPPING_DEFAULT_EXTRACTOR_CLASS, ConstantFieldExtractor.class.getName());
```

```diff
@@ -120,18 +126,18 @@ public void testRepositoryDelete() throws Exception {
         String doc = "{\"index\":{\"_id\":\"" + StringUtils.jsonEncoding(id) + "\"}}\n{\"field\":1}\n";
         repository.writeProcessedToIndex(new BytesArray(doc));
         repository.flush();
-        RestUtils.refresh("rest");
+        RestUtils.refresh(index);
 
-        assertThat(JsonUtils.query("hits").get("total").apply(JsonUtils.asMap(RestUtils.get("rest/deletebulk/_search"))), is(equalTo(1)));
+        assertThat(HITS_TOTAL_VALUE.apply(JsonUtils.asMap(RestUtils.get(index + "/_search"))), is(equalTo(1)));
 
         repository.delete();
 
-        assertThat(JsonUtils.query("hits").get("total").apply(JsonUtils.asMap(RestUtils.get("rest/deletebulk/_search"))), is(equalTo(0)));
+        assertThat(HITS_TOTAL_VALUE.apply(JsonUtils.asMap(RestUtils.get(index + "/_search"))), is(equalTo(0)));
     }
 
     @Test
     public void testRepositoryDeleteEmptyIndex() throws Exception {
-        Settings settings = new TestSettings("delete_empty/test");
+        Settings settings = new TestSettings("delete_empty");
         RestUtils.delete("delete_empty");
         InitializationUtils.discoverClusterInfo(settings, LOG);
         settings.setProperty(ConfigurationOptions.OPENSEARCH_SERIALIZATION_WRITER_VALUE_CLASS, JdkValueWriter.class.getName());
```

```diff
@@ -143,11 +149,11 @@ public void testRepositoryDeleteEmptyIndex() throws Exception {
         RestRepository repository = new RestRepository(settings);
         repository.touch();
 
-        assertThat(JsonUtils.query("hits").get("total").apply(JsonUtils.asMap(RestUtils.get("delete_empty/test/_search"))), is(equalTo(0)));
+        assertThat(HITS_TOTAL_VALUE.apply(JsonUtils.asMap(RestUtils.get("delete_empty/_search"))), is(equalTo(0)));
 
         repository.delete();
 
-        assertThat(JsonUtils.query("hits").get("total").apply(JsonUtils.asMap(RestUtils.get("delete_empty/test/_search"))), is(equalTo(0)));
+        assertThat(HITS_TOTAL_VALUE.apply(JsonUtils.asMap(RestUtils.get("delete_empty/_search"))), is(equalTo(0)));
     }
```
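The new `HITS_TOTAL_VALUE` helper reflects the OpenSearch search response shape, where `hits.total` is an object such as `{"value": 1, "relation": "eq"}` rather than the bare number returned by older Elasticsearch versions, hence the extra `.get("value")` step. A self-contained sketch of the lookup, using a hand-built response map in place of a real search result:

```java
import java.util.HashMap;
import java.util.Map;

public class HitsTotalSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // Stand-in for JsonUtils.asMap(...) applied to a search response.
        Map<String, Object> total = new HashMap<>();
        total.put("value", 1);
        total.put("relation", "eq");
        Map<String, Object> hits = new HashMap<>();
        hits.put("total", total);
        Map<String, Object> response = new HashMap<>();
        response.put("hits", hits);

        // Equivalent of JsonUtils.query("hits").get("total").get("value").apply(response).
        Map<String, Object> h = (Map<String, Object>) response.get("hits");
        Map<String, Object> t = (Map<String, Object>) h.get("total");
        System.out.println(t.get("value")); // prints 1
    }
}
```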
RestSuite.java

```diff
@@ -35,7 +35,6 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
-@Ignore
 @RunWith(Suite.class)
 @Suite.SuiteClasses({ AbstractRestSaveTest.class, AbstractRestQueryTest.class })
 public class RestSuite {
```
RestClient.java

```diff
@@ -238,7 +238,7 @@ public int getResponseCode() {
     public BulkActionResponse bulk(Resource resource, TrackingBytesArray data) {
         // NB: dynamically get the stats since the transport can change
         long start = network.transportStats().netTotalTime;
-        Response response = execute(PUT, resource.bulk(), data);
+        Response response = execute(POST, resource.bulk(), data);
         long spent = network.transportStats().netTotalTime - start;
 
         stats.bulkTotal++;
```
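Switching the verb aligns the connector with the OpenSearch Bulk API, which is exposed as a POST endpoint accepting newline-delimited JSON. A standalone JDK-only sketch of the request shape; the host, index, and sample document are placeholders, not taken from the connector:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class BulkPostSketch {
    public static void main(String[] args) throws Exception {
        // Assumed local cluster and index; both are placeholders.
        URL url = new URL("http://localhost:9200/rest_save_bulk/_bulk");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST"); // the verb this PR switches to (was PUT)
        conn.setRequestProperty("Content-Type", "application/x-ndjson");
        conn.setDoOutput(true);
        // Bulk payload: an action line followed by a source line, newline-terminated.
        String ndjson = "{\"index\":{\"_id\":\"1\"}}\n{\"field\":1}\n";
        try (OutputStream os = conn.getOutputStream()) {
            os.write(ndjson.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("HTTP " + conn.getResponseCode());
    }
}
```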
```diff
@@ -499,6 +499,13 @@ public boolean delete(String indexOrType) {
         return (res.status() == HttpStatus.OK ? true : false);
     }
 
+    public int deleteByQuery(String indexOrType, QueryBuilder query) {
+        BytesArray body = searchRequest(query);
+        Request req = new SimpleRequest(POST, null, indexOrType + "/_delete_by_query", body);
+        Response res = executeNotFoundAllowed(req);
+        return parseContent(res.body(), "deleted");
+    }
+
     public boolean deleteScroll(String scrollId) {
         BytesArray body = new BytesArray(("{\"scroll_id\":[\"" + scrollId + "\"]}").getBytes(StringUtils.UTF_8));
         Request req = new SimpleRequest(DELETE, null, "_search/scroll", body);
```
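For context, a standalone JDK-only sketch of the exchange the new method performs: POST to `<index>/_delete_by_query` with a query body, then read the `deleted` count out of the JSON response (here a crude regex stands in for `parseContent`). Host, port, and index name are assumptions:

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DeleteByQuerySketch {
    public static void main(String[] args) throws Exception {
        // Assumed local cluster and index name; adjust for a real environment.
        URL url = new URL("http://localhost:9200/rest_delete_bulk/_delete_by_query");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        // Match-all query body, the shape MatchAllQueryBuilder is expected to emit.
        byte[] body = "{\"query\":{\"match_all\":{}}}".getBytes(StandardCharsets.UTF_8);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body);
        }
        try (InputStream is = conn.getInputStream();
             Scanner s = new Scanner(is, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
            String json = s.hasNext() ? s.next() : "";
            // Crude stand-in for parseContent(res.body(), "deleted").
            Matcher m = Pattern.compile("\"deleted\"\\s*:\\s*(\\d+)").matcher(json);
            System.out.println(m.find() ? "deleted: " + m.group(1) : json);
        }
    }
}
```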
RestRepository.java

```diff
@@ -44,6 +44,7 @@
 import org.opensearch.hadoop.cfg.Settings;
 import org.opensearch.hadoop.rest.bulk.BulkProcessor;
 import org.opensearch.hadoop.rest.bulk.BulkResponse;
+import org.opensearch.hadoop.rest.query.MatchAllQueryBuilder;
 import org.opensearch.hadoop.rest.query.QueryUtils;
 import org.opensearch.hadoop.rest.stats.Stats;
 import org.opensearch.hadoop.rest.stats.StatsAware;
```
```diff
@@ -376,15 +377,16 @@ public boolean touch() {
     }
 
     public void delete() {
-        // try first a blind delete by query (since the plugin might be installed)
+        // try first a blind delete by query
         try {
-            if (resources.getResourceWrite().isTyped()) {
-                client.delete(resources.getResourceWrite().index() + "/" + resources.getResourceWrite().type() + "/_query?q=*");
-            } else {
-                client.delete(resources.getResourceWrite().index() + "/_query?q=*");
-            }
+            Resource res = resources.getResourceWrite();
+            client.deleteByQuery(
+                    res.isTyped()
+                            ? res.index() + "/" + res.type()
+                            : res.index(),
+                    MatchAllQueryBuilder.MATCH_ALL);
         } catch (OpenSearchHadoopInvalidRequest ehir) {
-            log.info("Skipping delete by query as the plugin is not installed...");
+            log.error("Delete by query was not successful...", ehir);
         }
 
         // in ES 2.0 and higher this means scrolling and deleting the docs by hand...
```