diff --git a/.ci/bwcVersions b/.ci/bwcVersions index ccb47c1a3b724..5587b8e5784c5 100644 --- a/.ci/bwcVersions +++ b/.ci/bwcVersions @@ -39,4 +39,5 @@ BWC_VERSION: - "2.16.1" - "2.17.0" - "2.17.1" + - "2.17.2" - "2.18.0" diff --git a/CHANGELOG.md b/CHANGELOG.md index 681e05bd4250c..35d512358249a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,10 +11,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424)) - Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916)) - Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967)) -- Remove identity-related feature flagged code from the RestController ([#15430](https://github.com/opensearch-project/OpenSearch/pull/15430)) - Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923)) - Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718)) - Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976)) +- Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611)) +- [S3 Repository] Change default retry mechanism of s3 clients to Standard Mode ([#15978](https://github.com/opensearch-project/OpenSearch/pull/15978)) - [Star Tree - Search] Add support for metric aggregations with/without term query ([15289](https://github.com/opensearch-project/OpenSearch/pull/15289)) ### Dependencies @@ -32,9 +33,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `dnsjava:dnsjava` from 3.6.1 to 3.6.2 ([#16041](https://github.com/opensearch-project/OpenSearch/pull/16041)) - Bump `com.maxmind.geoip2:geoip2` from 4.2.0 to 4.2.1 ([#16042](https://github.com/opensearch-project/OpenSearch/pull/16042)) - Bump `com.maxmind.db:maxmind-db` from 3.1.0 to 3.1.1 ([#16137](https://github.com/opensearch-project/OpenSearch/pull/16137)) +- Bump `com.azure:azure-core-http-netty` from 1.15.3 to 1.15.4 ([#16133](https://github.com/opensearch-project/OpenSearch/pull/16133)) +- Bump `org.jline:jline` from 3.26.3 to 3.27.0 ([#16135](https://github.com/opensearch-project/OpenSearch/pull/16135)) ### Changed - Add support for docker compose v2 in TestFixturesPlugin ([#16049](https://github.com/opensearch-project/OpenSearch/pull/16049)) +- Remove identity-related feature flagged code from the RestController ([#15430](https://github.com/opensearch-project/OpenSearch/pull/15430)) +- Remove Identity FeatureFlag ([#16024](https://github.com/opensearch-project/OpenSearch/pull/16024)) +- Ensure RestHandler.Wrapper delegates all implementations to the wrapped handler ([#16154](https://github.com/opensearch-project/OpenSearch/pull/16154)) ### Deprecated @@ -49,6 +55,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Avoid infinite loop when `flat_object` field contains invalid token ([#15985](https://github.com/opensearch-project/OpenSearch/pull/15985)) - Fix infinite loop in nested agg ([#15931](https://github.com/opensearch-project/OpenSearch/pull/15931)) - Fix race condition in node-join and node-left ([#15521](https://github.com/opensearch-project/OpenSearch/pull/15521)) +- Streaming bulk request hangs ([#16158](https://github.com/opensearch-project/OpenSearch/pull/16158)) ### Security diff --git a/README.md b/README.md index 03728aee0135c..95fbac7bbecf1 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ [![Open Issues](https://img.shields.io/github/issues/opensearch-project/OpenSearch)](https://github.com/opensearch-project/OpenSearch/issues) [![Open Pull Requests](https://img.shields.io/github/issues-pr/opensearch-project/OpenSearch)](https://github.com/opensearch-project/OpenSearch/pulls) [![2.18.0 Open Issues](https://img.shields.io/github/issues/opensearch-project/OpenSearch/v2.18.0)](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3A"v2.18.0") -[![2.17.1 Open Issues](https://img.shields.io/github/issues/opensearch-project/OpenSearch/v2.17.1)](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3A"v2.17.1") +[![2.17.2 Open Issues](https://img.shields.io/github/issues/opensearch-project/OpenSearch/v2.17.2)](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3A"v2.17.2") [![3.0.0 Open Issues](https://img.shields.io/github/issues/opensearch-project/OpenSearch/v3.0.0)](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3A"v3.0.0") [![GHA gradle check](https://github.com/opensearch-project/OpenSearch/actions/workflows/gradle-check.yml/badge.svg)](https://github.com/opensearch-project/OpenSearch/actions/workflows/gradle-check.yml) [![GHA validate pull request](https://github.com/opensearch-project/OpenSearch/actions/workflows/wrapper.yml/badge.svg)](https://github.com/opensearch-project/OpenSearch/actions/workflows/wrapper.yml) diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/OpenSearchRestHighLevelClientTestCase.java b/client/rest-high-level/src/test/java/org/opensearch/client/OpenSearchRestHighLevelClientTestCase.java index b0a7d1e3578c0..b512117c42f65 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/OpenSearchRestHighLevelClientTestCase.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/OpenSearchRestHighLevelClientTestCase.java @@ -90,6 +90,7 @@ public abstract class OpenSearchRestHighLevelClientTestCase extends OpenSearchRestTestCase { protected static final String CONFLICT_PIPELINE_ID = "conflict_pipeline"; + protected static final double DOUBLE_DELTA = 0.000001; private static RestHighLevelClient restHighLevelClient; private static boolean async = Booleans.parseBoolean(System.getProperty("tests.rest.async", "false")); diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/RankEvalIT.java b/client/rest-high-level/src/test/java/org/opensearch/client/RankEvalIT.java index 01fdd489aa7d8..6da8a29a9789e 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/RankEvalIT.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/RankEvalIT.java @@ -158,7 +158,7 @@ public void testMetrics() throws IOException { RankEvalRequest rankEvalRequest = new RankEvalRequest(spec, new String[] { "index", "index2" }); RankEvalResponse response = execute(rankEvalRequest, highLevelClient()::rankEval, highLevelClient()::rankEvalAsync); - assertEquals(expectedScores[i], response.getMetricScore(), Double.MIN_VALUE); + assertEquals(expectedScores[i], response.getMetricScore(), DOUBLE_DELTA); i++; } } diff --git a/libs/core/src/main/java/org/opensearch/Version.java b/libs/core/src/main/java/org/opensearch/Version.java index b86b8459fb8a8..5d38f85c40c89 100644 --- a/libs/core/src/main/java/org/opensearch/Version.java +++ b/libs/core/src/main/java/org/opensearch/Version.java @@ -110,6 +110,7 @@ public class Version implements Comparable, ToXContentFragment { public static final Version V_2_16_1 = new Version(2160199, org.apache.lucene.util.Version.LUCENE_9_11_1); public static final Version V_2_17_0 = new Version(2170099, org.apache.lucene.util.Version.LUCENE_9_11_1); public static final Version V_2_17_1 = new Version(2170199, org.apache.lucene.util.Version.LUCENE_9_11_1); + public static final Version V_2_17_2 = new Version(2170299, org.apache.lucene.util.Version.LUCENE_9_11_1); public static final Version V_2_18_0 = new Version(2180099, org.apache.lucene.util.Version.LUCENE_9_11_1); public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_12_0); public static final Version CURRENT = V_3_0_0; diff --git a/plugins/repository-azure/build.gradle b/plugins/repository-azure/build.gradle index 2892bdba51ba6..4baf79e619be9 100644 --- a/plugins/repository-azure/build.gradle +++ b/plugins/repository-azure/build.gradle @@ -48,7 +48,7 @@ dependencies { api 'com.azure:azure-json:1.1.0' api 'com.azure:azure-xml:1.1.0' api 'com.azure:azure-storage-common:12.25.1' - api 'com.azure:azure-core-http-netty:1.15.3' + api 'com.azure:azure-core-http-netty:1.15.4' api "io.netty:netty-codec-dns:${versions.netty}" api "io.netty:netty-codec-socks:${versions.netty}" api "io.netty:netty-codec-http2:${versions.netty}" diff --git a/plugins/repository-azure/licenses/azure-core-http-netty-1.15.3.jar.sha1 b/plugins/repository-azure/licenses/azure-core-http-netty-1.15.3.jar.sha1 deleted file mode 100644 index 3cea52ba67ce5..0000000000000 --- a/plugins/repository-azure/licenses/azure-core-http-netty-1.15.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -03b5bd5f5c16eea71f130119dbfb1fe5239f806a \ No newline at end of file diff --git a/plugins/repository-azure/licenses/azure-core-http-netty-1.15.4.jar.sha1 b/plugins/repository-azure/licenses/azure-core-http-netty-1.15.4.jar.sha1 new file mode 100644 index 0000000000000..97e6fad264294 --- /dev/null +++ b/plugins/repository-azure/licenses/azure-core-http-netty-1.15.4.jar.sha1 @@ -0,0 +1 @@ +489a38c9e6efb5ce01fbd276d8cb6c0e89000459 \ No newline at end of file diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java index d691cad9c9d03..8bbef168de89c 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java @@ -18,6 +18,7 @@ import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption; +import software.amazon.awssdk.core.retry.RetryMode; import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.core.retry.backoff.BackoffStrategy; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; @@ -239,7 +240,9 @@ static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSett RetryPolicy.builder() .numRetries(clientSettings.maxRetries) .throttlingBackoffStrategy( - clientSettings.throttleRetries ? BackoffStrategy.defaultThrottlingStrategy() : BackoffStrategy.none() + clientSettings.throttleRetries + ? BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD) + : BackoffStrategy.none() ) .build() ) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java index fe81da31432f4..3d5e121778ba9 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java @@ -42,6 +42,7 @@ import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.core.retry.RetryMode; import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.core.retry.backoff.BackoffStrategy; import software.amazon.awssdk.http.SystemPropertyTlsKeyManagersProvider; @@ -330,6 +331,8 @@ static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSett ); if (!clientSettings.throttleRetries) { retryPolicy.throttlingBackoffStrategy(BackoffStrategy.none()); + } else { + retryPolicy.throttlingBackoffStrategy(BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD)); } return clientOverrideConfiguration.retryPolicy(retryPolicy.build()).build(); } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/AwsS3ServiceImplTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/AwsS3ServiceImplTests.java index b80b857644f2a..e7312157d7a33 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/AwsS3ServiceImplTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/AwsS3ServiceImplTests.java @@ -35,6 +35,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryMode; import software.amazon.awssdk.core.retry.backoff.BackoffStrategy; import software.amazon.awssdk.http.apache.ProxyConfiguration; @@ -364,7 +365,7 @@ private void launchAWSConfigurationTest( if (expectedUseThrottleRetries) { assertThat( clientOverrideConfiguration.retryPolicy().get().throttlingBackoffStrategy(), - is(BackoffStrategy.defaultThrottlingStrategy()) + is(BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD)) ); } else { assertThat(clientOverrideConfiguration.retryPolicy().get().throttlingBackoffStrategy(), is(BackoffStrategy.none())); diff --git a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java index 6f3895fffa437..1b60023da0329 100644 --- a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java +++ b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Locale; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -297,4 +298,31 @@ public void testStreamingBadStream() throws IOException { assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200)); assertThat(streamingResponse.getWarnings(), empty()); } + + public void testStreamingLargeDocument() throws IOException { + final Stream stream = Stream.of( + String.format( + Locale.getDefault(), + "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n{ \"name\": \"%s\" }\n", + randomAlphaOfLength(5000) + ) + ); + + final Duration delay = Duration.ofMillis(1); + final StreamingRequest streamingRequest = new StreamingRequest<>( + "POST", + "/_bulk/stream", + Flux.fromStream(stream).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) + ); + + final StreamingResponse streamingResponse = client().streamRequest(streamingRequest); + + StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8))) + .expectNextMatches(s -> s.contains("\"type\":\"illegal_argument_exception\"")) + .expectComplete() + .verify(); + + assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(streamingResponse.getWarnings(), empty()); + } } diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/IdentityAuthenticationIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/IdentityAuthenticationIT.java index 1a806b033eb8a..14346b8910c76 100644 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/IdentityAuthenticationIT.java +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/IdentityAuthenticationIT.java @@ -38,7 +38,6 @@ public class IdentityAuthenticationIT extends HttpSmokeTestCase { protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - .put(FeatureFlags.IDENTITY, "true") .build(); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java index 2fcda8c2d2f27..024e0e952eea5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java @@ -9,6 +9,8 @@ package org.opensearch.remotestore; import org.opensearch.action.LatchedActionListener; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -20,6 +22,8 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; +import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.REMOTE_STORE; + @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase { static final String INDEX_NAME = "remote-store-test-idx-1"; @@ -180,4 +184,41 @@ public void onFailure(Exception e) { assertBusy(() -> assertEquals(Set.of(timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2())); remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3)); } + + public void testLastSuccessfulFetchOfPinnedTimestampsPresentInNodeStats() throws Exception { + logger.info("Starting up cluster manager"); + logger.info("cluster.remote_store.pinned_timestamps.enabled set to true"); + logger.info("cluster.remote_store.pinned_timestamps.scheduler_interval set to minimum value of 1minute"); + Settings pinnedTimestampEnabledSettings = Settings.builder() + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL.getKey(), "1m") + .build(); + internalCluster().startClusterManagerOnlyNode(pinnedTimestampEnabledSettings); + String remoteNodeName = internalCluster().startDataOnlyNodes(1, pinnedTimestampEnabledSettings).get(0); + ensureStableCluster(2); + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + remoteNodeName + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + assertBusy(() -> { + long lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1(); + assertTrue(lastSuccessfulFetchOfPinnedTimestamps > 0L); + NodesStatsResponse nodesStatsResponse = internalCluster().client() + .admin() + .cluster() + .prepareNodesStats() + .addMetric(REMOTE_STORE.metricName()) + .execute() + .actionGet(); + for (NodeStats nodeStats : nodesStatsResponse.getNodes()) { + long lastRecordedFetch = nodeStats.getRemoteStoreNodeStats().getLastSuccessfulFetchOfPinnedTimestamps(); + assertTrue(lastRecordedFetch >= lastSuccessfulFetchOfPinnedTimestamps); + } + }); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3)); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java index c3214022df663..8b6869aa1d81a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java @@ -116,7 +116,7 @@ public void testStatusApiConsistency() { assertEquals(snapshotStatus.getStats().getTime(), snapshotInfo.endTime() - snapshotInfo.startTime()); } - public void testStatusAPICallForShallowCopySnapshot() { + public void testStatusAPICallForShallowCopySnapshot() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used for the test"); internalCluster().startClusterManagerOnlyNode(); internalCluster().startDataOnlyNode(); @@ -136,15 +136,24 @@ public void testStatusAPICallForShallowCopySnapshot() { final String snapshot = "snapshot"; createFullSnapshot(snapshotRepoName, snapshot); - final SnapshotStatus snapshotStatus = getSnapshotStatus(snapshotRepoName, snapshot); - assertThat(snapshotStatus.getState(), is(SnapshotsInProgress.State.SUCCESS)); + assertBusy(() -> { + final SnapshotStatus snapshotStatus = client().admin() + .cluster() + .prepareSnapshotStatus(snapshotRepoName) + .setSnapshots(snapshot) + .execute() + .actionGet() + .getSnapshots() + .get(0); + assertThat(snapshotStatus.getState(), is(SnapshotsInProgress.State.SUCCESS)); - final SnapshotIndexShardStatus snapshotShardState = stateFirstShard(snapshotStatus, indexName); - assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE)); - assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0)); - assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L)); - assertThat(snapshotShardState.getStats().getIncrementalFileCount(), greaterThan(0)); - assertThat(snapshotShardState.getStats().getIncrementalSize(), greaterThan(0L)); + final SnapshotIndexShardStatus snapshotShardState = stateFirstShard(snapshotStatus, indexName); + assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE)); + assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0)); + assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L)); + assertThat(snapshotShardState.getStats().getIncrementalFileCount(), greaterThan(0)); + assertThat(snapshotShardState.getStats().getIncrementalSize(), greaterThan(0L)); + }, 20, TimeUnit.SECONDS); } public void testStatusAPICallInProgressSnapshot() throws Exception { @@ -193,7 +202,7 @@ public void testExceptionOnMissingSnapBlob() throws IOException { ); } - public void testExceptionOnMissingShardLevelSnapBlob() throws IOException { + public void testExceptionOnMissingShardLevelSnapBlob() throws Exception { disableRepoConsistencyCheck("This test intentionally corrupts the repository"); final Path repoPath = randomRepoPath(); @@ -216,11 +225,12 @@ public void testExceptionOnMissingShardLevelSnapBlob() throws IOException { repoPath.resolve(resolvePath(indexId, "0")) .resolve(BlobStoreRepository.SNAPSHOT_PREFIX + snapshotInfo.snapshotId().getUUID() + ".dat") ); - - expectThrows( - SnapshotMissingException.class, - () -> client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").execute().actionGet() - ); + assertBusy(() -> { + expectThrows( + SnapshotMissingException.class, + () -> client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").execute().actionGet() + ); + }, 20, TimeUnit.SECONDS); } public void testGetSnapshotsWithoutIndices() throws Exception { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 0917a0baff1ab..c91260778f037 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -59,6 +59,7 @@ import org.opensearch.monitor.process.ProcessStats; import org.opensearch.node.AdaptiveSelectionStats; import org.opensearch.node.NodesResourceUsageStats; +import org.opensearch.node.remotestore.RemoteStoreNodeStats; import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats; import org.opensearch.repositories.RepositoriesStats; import org.opensearch.script.ScriptCacheStats; @@ -162,6 +163,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private NodeCacheStats nodeCacheStats; + @Nullable + private RemoteStoreNodeStats remoteStoreNodeStats; + public NodeStats(StreamInput in) throws IOException { super(in); timestamp = in.readVLong(); @@ -243,6 +247,12 @@ public NodeStats(StreamInput in) throws IOException { } else { nodeCacheStats = null; } + // TODO: change version to V_2_18_0 + if (in.getVersion().onOrAfter(Version.CURRENT)) { + remoteStoreNodeStats = in.readOptionalWriteable(RemoteStoreNodeStats::new); + } else { + remoteStoreNodeStats = null; + } } public NodeStats( @@ -274,7 +284,8 @@ public NodeStats( @Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats, @Nullable RepositoriesStats repositoriesStats, @Nullable AdmissionControlStats admissionControlStats, - @Nullable NodeCacheStats nodeCacheStats + @Nullable NodeCacheStats nodeCacheStats, + @Nullable RemoteStoreNodeStats remoteStoreNodeStats ) { super(node); this.timestamp = timestamp; @@ -305,6 +316,7 @@ public NodeStats( this.repositoriesStats = repositoriesStats; this.admissionControlStats = admissionControlStats; this.nodeCacheStats = nodeCacheStats; + this.remoteStoreNodeStats = remoteStoreNodeStats; } public long getTimestamp() { @@ -467,6 +479,11 @@ public NodeCacheStats getNodeCacheStats() { return nodeCacheStats; } + @Nullable + public RemoteStoreNodeStats getRemoteStoreNodeStats() { + return remoteStoreNodeStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -525,6 +542,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_14_0)) { out.writeOptionalWriteable(nodeCacheStats); } + // TODO: change version to V_2_18_0 + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalWriteable(remoteStoreNodeStats); + } } @Override @@ -631,6 +652,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getNodeCacheStats() != null) { getNodeCacheStats().toXContent(builder, params); } + if (getRemoteStoreNodeStats() != null) { + getRemoteStoreNodeStats().toXContent(builder, params); + } return builder; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index f1f9f93afdad2..a5b00ed82d3cb 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -220,7 +220,8 @@ public enum Metric { SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"), REPOSITORIES("repositories"), ADMISSION_CONTROL("admission_control"), - CACHE_STATS("caches"); + CACHE_STATS("caches"), + REMOTE_STORE("remote_store"); private String metricName; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 2c808adc97c7a..a98d245af872b 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -129,7 +129,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics), NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics), NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics), - NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics) + NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics), + NodesStatsRequest.Metric.REMOTE_STORE.containedIn(metrics) ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index a49ca2035783c..c4b3524cf6da5 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -174,6 +174,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq false, false, false, + false, false ); List shardsStats = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index 73375f9eaa413..59d999798868e 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -31,7 +31,6 @@ protected FeatureFlagSettings( public static final Set> BUILT_IN_FEATURE_FLAGS = Set.of( FeatureFlags.EXTENSIONS_SETTING, - FeatureFlags.IDENTITY_SETTING, FeatureFlags.TELEMETRY_SETTING, FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING, FeatureFlags.TIERED_REMOTE_INDEX_SETTING, diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index a5acea004c3b2..6df68013a8119 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -40,11 +40,6 @@ public class FeatureFlags { */ public static final String EXTENSIONS = "opensearch.experimental.feature.extensions.enabled"; - /** - * Gates the functionality of identity. - */ - public static final String IDENTITY = "opensearch.experimental.feature.identity.enabled"; - /** * Gates the functionality of telemetry framework. */ @@ -82,8 +77,6 @@ public class FeatureFlags { public static final Setting EXTENSIONS_SETTING = Setting.boolSetting(EXTENSIONS, false, Property.NodeScope); - public static final Setting IDENTITY_SETTING = Setting.boolSetting(IDENTITY, false, Property.NodeScope); - public static final Setting TELEMETRY_SETTING = Setting.boolSetting(TELEMETRY, false, Property.NodeScope); public static final Setting DATETIME_FORMATTER_CACHING_SETTING = Setting.boolSetting( @@ -138,7 +131,6 @@ public class FeatureFlags { private static final List> ALL_FEATURE_FLAG_SETTINGS = List.of( REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING, EXTENSIONS_SETTING, - IDENTITY_SETTING, TELEMETRY_SETTING, DATETIME_FORMATTER_CACHING_SETTING, TIERED_REMOTE_INDEX_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java index d6739c4572d1a..aa007f5da15b3 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java @@ -12,8 +12,10 @@ import org.apache.logging.log4j.Logger; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.DiffableStringMap; +import org.opensearch.common.CheckedFunction; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -22,11 +24,15 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParseException; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; import com.jcraft.jzlib.JZlib; @@ -37,6 +43,7 @@ */ public class ClusterStateChecksum implements ToXContentFragment, Writeable { + public static final int COMPONENT_SIZE = 11; static final String ROUTING_TABLE_CS = "routing_table"; static final String NODES_CS = "discovery_nodes"; static final String BLOCKS_CS = "blocks"; @@ -65,62 +72,103 @@ public class ClusterStateChecksum implements ToXContentFragment, Writeable { long indicesChecksum; long clusterStateChecksum; - public ClusterStateChecksum(ClusterState clusterState) { - try ( - BytesStreamOutput out = new BytesStreamOutput(); - BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out) - ) { - clusterState.routingTable().writeVerifiableTo(checksumOut); - routingTableChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - clusterState.nodes().writeVerifiableTo(checksumOut); - nodesChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - clusterState.coordinationMetadata().writeVerifiableTo(checksumOut); - coordinationMetadataChecksum = checksumOut.getChecksum(); - - // Settings create sortedMap by default, so no explicit sorting required here. - checksumOut.reset(); - Settings.writeSettingsToStream(clusterState.metadata().persistentSettings(), checksumOut); - settingMetadataChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - Settings.writeSettingsToStream(clusterState.metadata().transientSettings(), checksumOut); - transientSettingsMetadataChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - clusterState.metadata().templatesMetadata().writeVerifiableTo(checksumOut); - templatesMetadataChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - checksumOut.writeStringCollection(clusterState.metadata().customs().keySet()); - customMetadataMapChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - ((DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings()).writeTo(checksumOut); - hashesOfConsistentSettingsChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - checksumOut.writeMapValues( + public ClusterStateChecksum(ClusterState clusterState, ThreadPool threadpool) { + long start = threadpool.relativeTimeInNanos(); + ExecutorService executorService = threadpool.executor(ThreadPool.Names.REMOTE_STATE_CHECKSUM); + CountDownLatch latch = new CountDownLatch(COMPONENT_SIZE); + + executeChecksumTask((stream) -> { + clusterState.routingTable().writeVerifiableTo(stream); + return null; + }, checksum -> routingTableChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + clusterState.nodes().writeVerifiableTo(stream); + return null; + }, checksum -> nodesChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + clusterState.coordinationMetadata().writeVerifiableTo(stream); + return null; + }, checksum -> coordinationMetadataChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + Settings.writeSettingsToStream(clusterState.metadata().persistentSettings(), stream); + return null; + }, checksum -> settingMetadataChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + Settings.writeSettingsToStream(clusterState.metadata().transientSettings(), stream); + return null; + }, checksum -> transientSettingsMetadataChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + clusterState.metadata().templatesMetadata().writeVerifiableTo(stream); + return null; + }, checksum -> templatesMetadataChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + stream.writeStringCollection(clusterState.metadata().customs().keySet()); + return null; + }, checksum -> customMetadataMapChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + ((DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings()).writeTo(stream); + return null; + }, checksum -> hashesOfConsistentSettingsChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + stream.writeMapValues( clusterState.metadata().indices(), - (stream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) stream) + (checksumStream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) checksumStream) ); - indicesChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - clusterState.blocks().writeVerifiableTo(checksumOut); - blocksChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - checksumOut.writeStringCollection(clusterState.customs().keySet()); - clusterStateCustomsChecksum = checksumOut.getChecksum(); - } catch (IOException e) { - logger.error("Failed to create checksum for cluster state.", e); + return null; + }, checksum -> indicesChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + clusterState.blocks().writeVerifiableTo(stream); + return null; + }, checksum -> blocksChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + stream.writeStringCollection(clusterState.customs().keySet()); + return null; + }, checksum -> clusterStateCustomsChecksum = checksum, executorService, latch); + + try { + latch.await(); + } catch (InterruptedException e) { throw new RemoteStateTransferException("Failed to create checksum for cluster state.", e); } createClusterStateChecksum(); + logger.debug("Checksum execution time {}", TimeValue.nsecToMSec(threadpool.relativeTimeInNanos() - start)); + } + + private void executeChecksumTask( + CheckedFunction checksumTask, + Consumer checksumConsumer, + ExecutorService executorService, + CountDownLatch latch + ) { + executorService.execute(() -> { + try { + long checksum = createChecksum(checksumTask); + checksumConsumer.accept(checksum); + latch.countDown(); + } catch (IOException e) { + throw new RemoteStateTransferException("Failed to execute checksum task", e); + } + }); + } + + private long createChecksum(CheckedFunction task) throws IOException { + try ( + BytesStreamOutput out = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out) + ) { + task.apply(checksumOut); + return checksumOut.getChecksum(); + } } private void createClusterStateChecksum() { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index ece29180f9cf5..ce5e57b79dadb 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -332,7 +332,9 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat uploadedMetadataResults, previousClusterUUID, clusterStateDiffManifest, - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) + ? new ClusterStateChecksum(clusterState, threadpool) + : null, false, codecVersion ); @@ -539,7 +541,9 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), clusterStateDiffManifest, - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) + ? new ClusterStateChecksum(clusterState, threadpool) + : null, false, previousManifest.getCodecVersion() ); @@ -1010,7 +1014,9 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted( uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), previousManifest.getDiffManifest(), - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) + ? new ClusterStateChecksum(clusterState, threadpool) + : null, true, previousManifest.getCodecVersion() ); @@ -1631,7 +1637,7 @@ void validateClusterStateFromChecksum( String localNodeId, boolean isFullStateDownload ) { - ClusterStateChecksum newClusterStateChecksum = new ClusterStateChecksum(clusterState); + ClusterStateChecksum newClusterStateChecksum = new ClusterStateChecksum(clusterState, threadpool); List failedValidation = newClusterStateChecksum.getMismatchEntities(manifest.getClusterStateChecksum()); if (failedValidation.isEmpty()) { return; diff --git a/server/src/main/java/org/opensearch/identity/IdentityService.java b/server/src/main/java/org/opensearch/identity/IdentityService.java index 03f937180f4ba..33066fae5a80d 100644 --- a/server/src/main/java/org/opensearch/identity/IdentityService.java +++ b/server/src/main/java/org/opensearch/identity/IdentityService.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchException; +import org.opensearch.common.annotation.InternalApi; import org.opensearch.common.settings.Settings; import org.opensearch.identity.noop.NoopIdentityPlugin; import org.opensearch.identity.tokens.TokenManager; @@ -22,8 +23,9 @@ /** * Identity and access control for OpenSearch * - * @opensearch.experimental + * @opensearch.internal * */ +@InternalApi public class IdentityService { private static final Logger log = LogManager.getLogger(IdentityService.class); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 4962d72d8728a..56d04df5921ee 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -526,11 +526,7 @@ protected Node( FeatureFlags.initializeFeatureFlags(settings); final List identityPlugins = new ArrayList<>(); - if (FeatureFlags.isEnabled(FeatureFlags.IDENTITY)) { - // If identity is enabled load plugins implementing the extension point - logger.info("Identity on so found plugins implementing: " + pluginsService.filterPlugins(IdentityPlugin.class).toString()); - identityPlugins.addAll(pluginsService.filterPlugins(IdentityPlugin.class)); - } + identityPlugins.addAll(pluginsService.filterPlugins(IdentityPlugin.class)); final Set additionalRoles = pluginsService.filterPlugins(Plugin.class) .stream() diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 1eb38ea63ad5a..9671fda14375d 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -54,6 +54,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.ingest.IngestService; import org.opensearch.monitor.MonitorService; +import org.opensearch.node.remotestore.RemoteStoreNodeStats; import org.opensearch.plugins.PluginsService; import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; import org.opensearch.repositories.RepositoriesService; @@ -241,7 +242,8 @@ public NodeStats stats( boolean segmentReplicationTrackerStats, boolean repositoriesStats, boolean admissionControl, - boolean cacheService + boolean cacheService, + boolean remoteStoreNodeStats ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -274,7 +276,8 @@ public NodeStats stats( segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null, repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null, admissionControl ? this.admissionControlService.stats() : null, - cacheService ? this.cacheService.stats(indices) : null + cacheService ? this.cacheService.stats(indices) : null, + remoteStoreNodeStats ? new RemoteStoreNodeStats() : null ); } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java new file mode 100644 index 0000000000000..8da8a17e21839 --- /dev/null +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.node.remotestore; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +/** + * Node level remote store stats + * @opensearch.internal + */ +public class RemoteStoreNodeStats implements Writeable, ToXContentFragment { + + public static final String STATS_NAME = "remote_store"; + public static final String LAST_SUCCESSFUL_FETCH_OF_PINNED_TIMESTAMPS = "last_successful_fetch_of_pinned_timestamps"; + + /** + * Time stamp for the last successful fetch of pinned timestamps by the {@linkplain RemoteStorePinnedTimestampService} + */ + private final long lastSuccessfulFetchOfPinnedTimestamps; + + public RemoteStoreNodeStats() { + this.lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1(); + } + + public long getLastSuccessfulFetchOfPinnedTimestamps() { + return this.lastSuccessfulFetchOfPinnedTimestamps; + } + + public RemoteStoreNodeStats(StreamInput in) throws IOException { + this.lastSuccessfulFetchOfPinnedTimestamps = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(this.lastSuccessfulFetchOfPinnedTimestamps); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(STATS_NAME); + builder.field(LAST_SUCCESSFUL_FETCH_OF_PINNED_TIMESTAMPS, this.lastSuccessfulFetchOfPinnedTimestamps); + return builder.endObject(); + } + + @Override + public String toString() { + return "RemoteStoreNodeStats{ lastSuccessfulFetchOfPinnedTimestamps=" + lastSuccessfulFetchOfPinnedTimestamps + "}"; + } + + @Override + public boolean equals(Object o) { + if (o == null) { + return false; + } + if (o.getClass() != RemoteStoreNodeStats.class) { + return false; + } + RemoteStoreNodeStats other = (RemoteStoreNodeStats) o; + return this.lastSuccessfulFetchOfPinnedTimestamps == other.lastSuccessfulFetchOfPinnedTimestamps; + } + + @Override + public int hashCode() { + return Objects.hash(lastSuccessfulFetchOfPinnedTimestamps); + } +} diff --git a/server/src/main/java/org/opensearch/rest/RestController.java b/server/src/main/java/org/opensearch/rest/RestController.java index 4f87c01258396..c17f723c13f2a 100644 --- a/server/src/main/java/org/opensearch/rest/RestController.java +++ b/server/src/main/java/org/opensearch/rest/RestController.java @@ -709,7 +709,7 @@ public void sendResponse(RestResponse response) { prepareResponse(response.status(), Map.of("Content-Type", List.of(response.contentType()))); } - Mono.ignoreElements(this).then(Mono.just(response)).subscribe(delegate::sendResponse); + Mono.from(this).ignoreElement().then(Mono.just(response)).subscribe(delegate::sendResponse); } @Override diff --git a/server/src/main/java/org/opensearch/rest/RestHandler.java b/server/src/main/java/org/opensearch/rest/RestHandler.java index 143cbd472ed07..7c3a369be61b9 100644 --- a/server/src/main/java/org/opensearch/rest/RestHandler.java +++ b/server/src/main/java/org/opensearch/rest/RestHandler.java @@ -192,6 +192,16 @@ public List replacedRoutes() { public boolean allowSystemIndexAccessByDefault() { return delegate.allowSystemIndexAccessByDefault(); } + + @Override + public boolean isActionPaginated() { + return delegate.isActionPaginated(); + } + + @Override + public boolean supportsStreaming() { + return delegate.supportsStreaming(); + } } /** diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 81220ab171b34..d795fd252b7fc 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -53,6 +53,7 @@ import org.opensearch.core.service.ReportingService; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.gateway.remote.ClusterStateChecksum; import org.opensearch.node.Node; import java.io.IOException; @@ -118,6 +119,7 @@ public static class Names { public static final String REMOTE_RECOVERY = "remote_recovery"; public static final String REMOTE_STATE_READ = "remote_state_read"; public static final String INDEX_SEARCHER = "index_searcher"; + public static final String REMOTE_STATE_CHECKSUM = "remote_state_checksum"; } /** @@ -191,6 +193,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.REMOTE_RECOVERY, ThreadPoolType.SCALING); map.put(Names.REMOTE_STATE_READ, ThreadPoolType.SCALING); map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE); + map.put(Names.REMOTE_STATE_CHECKSUM, ThreadPoolType.FIXED); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); } @@ -307,6 +310,10 @@ public ThreadPool( runnableTaskListener ) ); + builders.put( + Names.REMOTE_STATE_CHECKSUM, + new FixedExecutorBuilder(settings, Names.REMOTE_STATE_CHECKSUM, ClusterStateChecksum.COMPONENT_SIZE, 1000) + ); for (final ExecutorBuilder builder : customBuilders) { if (builders.containsKey(builder.name())) { diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 11902728eed07..34065daff2b8a 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -95,6 +95,7 @@ import org.opensearch.node.NodeResourceUsageStats; import org.opensearch.node.NodesResourceUsageStats; import org.opensearch.node.ResponseCollectorService; +import org.opensearch.node.remotestore.RemoteStoreNodeStats; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; @@ -614,6 +615,14 @@ public void testSerialization() throws IOException { } else { assertEquals(nodeCacheStats, deserializedNodeCacheStats); } + + RemoteStoreNodeStats remoteStoreNodeStats = nodeStats.getRemoteStoreNodeStats(); + RemoteStoreNodeStats deserializedRemoteStoreNodeStats = deserializedNodeStats.getRemoteStoreNodeStats(); + if (remoteStoreNodeStats == null) { + assertNull(deserializedRemoteStoreNodeStats); + } else { + assertEquals(remoteStoreNodeStats, deserializedRemoteStoreNodeStats); + } } } } @@ -996,6 +1005,16 @@ public void apply(String action, AdmissionControlActionType admissionControlActi nodeCacheStats = new NodeCacheStats(cacheStatsMap, flags); } + RemoteStoreNodeStats remoteStoreNodeStats = null; + if (frequently()) { + remoteStoreNodeStats = new RemoteStoreNodeStats() { + @Override + public long getLastSuccessfulFetchOfPinnedTimestamps() { + return 123456L; + } + }; + } + // TODO: Only remote_store based aspects of NodeIndicesStats are being tested here. // It is possible to test other metrics in NodeIndicesStats as well since it extends Writeable now return new NodeStats( @@ -1027,7 +1046,8 @@ public void apply(String action, AdmissionControlActionType admissionControlActi segmentReplicationRejectionStats, null, admissionControlStats, - nodeCacheStats + nodeCacheStats, + remoteStoreNodeStats ); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java index 1c4a77905d73f..823661ba14abf 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java @@ -349,6 +349,7 @@ private ClusterStatsNodeResponse createClusterStatsNodeResponse( null, null, null, + null, null ); if (defaultBehavior) { diff --git a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java index 5539dd26dd52d..cd050fb346563 100644 --- a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java @@ -195,6 +195,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -226,6 +227,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -257,6 +259,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ) ); @@ -319,6 +322,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -350,6 +354,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -381,6 +386,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ) ); diff --git a/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java b/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java index 6b6c7b96266d3..6d9d1aad3c5d5 100644 --- a/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java +++ b/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java @@ -14,7 +14,6 @@ import static org.opensearch.common.util.FeatureFlags.DATETIME_FORMATTER_CACHING; import static org.opensearch.common.util.FeatureFlags.EXTENSIONS; -import static org.opensearch.common.util.FeatureFlags.IDENTITY; public class FeatureFlagTests extends OpenSearchTestCase { @@ -40,7 +39,7 @@ public void testNonBooleanFeatureFlag() { } public void testBooleanFeatureFlagWithDefaultSetToFalse() { - final String testFlag = IDENTITY; + final String testFlag = EXTENSIONS; FeatureFlags.initializeFeatureFlags(Settings.EMPTY); assertNotNull(testFlag); assertFalse(FeatureFlags.isEnabled(testFlag)); @@ -49,15 +48,13 @@ public void testBooleanFeatureFlagWithDefaultSetToFalse() { public void testBooleanFeatureFlagInitializedWithEmptySettingsAndDefaultSetToFalse() { final String testFlag = DATETIME_FORMATTER_CACHING; FeatureFlags.initializeFeatureFlags(Settings.EMPTY); - assertNotNull(testFlag); assertFalse(FeatureFlags.isEnabled(testFlag)); } public void testInitializeFeatureFlagsWithExperimentalSettings() { - FeatureFlags.initializeFeatureFlags(Settings.builder().put(IDENTITY, true).build()); - assertTrue(FeatureFlags.isEnabled(IDENTITY)); + FeatureFlags.initializeFeatureFlags(Settings.builder().put(EXTENSIONS, true).build()); + assertTrue(FeatureFlags.isEnabled(EXTENSIONS)); assertFalse(FeatureFlags.isEnabled(DATETIME_FORMATTER_CACHING)); - assertFalse(FeatureFlags.isEnabled(EXTENSIONS)); // reset FeatureFlags to defaults FeatureFlags.initializeFeatureFlags(Settings.EMPTY); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 3f9aa1245cab3..09c2933680be3 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -34,6 +34,9 @@ import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; import java.io.IOException; import java.util.ArrayList; @@ -64,6 +67,14 @@ public class ClusterMetadataManifestTests extends OpenSearchTestCase { + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @After + public void teardown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } + public void testClusterMetadataManifestXContentV0() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path", CODEC_V0); ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() @@ -214,7 +225,7 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { "indicesRoutingDiffPath" ) ) - .checksum(new ClusterStateChecksum(createClusterState())) + .checksum(new ClusterStateChecksum(createClusterState(), threadPool)) .build(); { // Mutate Cluster Term EqualsHashCodeTestUtils.checkEqualsAndHashCode( @@ -647,7 +658,7 @@ public void testClusterMetadataManifestXContentV4() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); UploadedMetadataAttribute uploadedMetadataAttribute = new UploadedMetadataAttribute("attribute_name", "testing_attribute"); final StringKeyDiffProvider routingTableIncrementalDiff = Mockito.mock(StringKeyDiffProvider.class); - ClusterStateChecksum checksum = new ClusterStateChecksum(createClusterState()); + ClusterStateChecksum checksum = new ClusterStateChecksum(createClusterState(), threadPool); ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() .clusterTerm(1L) .stateVersion(1L) diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java index 0203e56dd2d5c..9b98187053a39 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java @@ -34,6 +34,9 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; import java.io.IOException; import java.util.EnumSet; @@ -41,14 +44,21 @@ import java.util.Map; public class ClusterStateChecksumTests extends OpenSearchTestCase { + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @After + public void teardown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } public void testClusterStateChecksumEmptyClusterState() { - ClusterStateChecksum checksum = new ClusterStateChecksum(ClusterState.EMPTY_STATE); + ClusterStateChecksum checksum = new ClusterStateChecksum(ClusterState.EMPTY_STATE, threadPool); assertNotNull(checksum); } public void testClusterStateChecksum() { - ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState()); + ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState(), threadPool); assertNotNull(checksum); assertTrue(checksum.routingTableChecksum != 0); assertTrue(checksum.nodesChecksum != 0); @@ -65,8 +75,8 @@ public void testClusterStateChecksum() { } public void testClusterStateMatchChecksum() { - ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState()); - ClusterStateChecksum newChecksum = new ClusterStateChecksum(generateClusterState()); + ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState(), threadPool); + ClusterStateChecksum newChecksum = new ClusterStateChecksum(generateClusterState(), threadPool); assertNotNull(checksum); assertNotNull(newChecksum); assertEquals(checksum.routingTableChecksum, newChecksum.routingTableChecksum); @@ -84,7 +94,7 @@ public void testClusterStateMatchChecksum() { } public void testXContentConversion() throws IOException { - ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState()); + ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState(), threadPool); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); checksum.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -97,7 +107,7 @@ public void testXContentConversion() throws IOException { } public void testSerialization() throws IOException { - ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState()); + ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState(), threadPool); BytesStreamOutput output = new BytesStreamOutput(); checksum.writeTo(output); @@ -109,10 +119,10 @@ public void testSerialization() throws IOException { public void testGetMismatchEntities() { ClusterState clsState1 = generateClusterState(); - ClusterStateChecksum checksum = new ClusterStateChecksum(clsState1); + ClusterStateChecksum checksum = new ClusterStateChecksum(clsState1, threadPool); assertTrue(checksum.getMismatchEntities(checksum).isEmpty()); - ClusterStateChecksum checksum2 = new ClusterStateChecksum(clsState1); + ClusterStateChecksum checksum2 = new ClusterStateChecksum(clsState1, threadPool); assertTrue(checksum.getMismatchEntities(checksum2).isEmpty()); ClusterState clsState2 = ClusterState.builder(ClusterName.DEFAULT) @@ -122,7 +132,7 @@ public void testGetMismatchEntities() { .customs(Map.of()) .metadata(Metadata.EMPTY_METADATA) .build(); - ClusterStateChecksum checksum3 = new ClusterStateChecksum(clsState2); + ClusterStateChecksum checksum3 = new ClusterStateChecksum(clsState2, threadPool); List mismatches = checksum.getMismatchEntities(checksum3); assertFalse(mismatches.isEmpty()); assertEquals(11, mismatches.size()); @@ -151,8 +161,8 @@ public void testGetMismatchEntitiesUnorderedInput() { ClusterState state2 = ClusterState.builder(state1).nodes(nodes1).build(); ClusterState state3 = ClusterState.builder(state1).nodes(nodes2).build(); - ClusterStateChecksum checksum1 = new ClusterStateChecksum(state2); - ClusterStateChecksum checksum2 = new ClusterStateChecksum(state3); + ClusterStateChecksum checksum1 = new ClusterStateChecksum(state2, threadPool); + ClusterStateChecksum checksum2 = new ClusterStateChecksum(state3, threadPool); assertEquals(checksum2, checksum1); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 56857285fa8d3..35a8ae16cacf7 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -3123,7 +3123,7 @@ public void testWriteFullMetadataSuccessWithChecksumValidationEnabled() throws I .previousClusterUUID("prev-cluster-uuid") .routingTableVersion(1L) .indicesRouting(List.of(uploadedIndiceRoutingMetadata)) - .checksum(new ClusterStateChecksum(clusterState)) + .checksum(new ClusterStateChecksum(clusterState, threadPool)) .build(); assertThat(manifest.getIndices().size(), is(1)); @@ -3193,7 +3193,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationEnabled() t final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indices(Collections.emptyList()) - .checksum(new ClusterStateChecksum(clusterState)) + .checksum(new ClusterStateChecksum(clusterState, threadPool)) .build(); when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); @@ -3219,7 +3219,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationEnabled() t .previousClusterUUID("prev-cluster-uuid") .routingTableVersion(1) .indicesRouting(List.of(uploadedIndiceRoutingMetadata)) - .checksum(new ClusterStateChecksum(clusterState)) + .checksum(new ClusterStateChecksum(clusterState, threadPool)) .build(); assertThat(manifest.getIndices().size(), is(1)); @@ -3245,7 +3245,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationModeNone() final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indices(Collections.emptyList()) - .checksum(new ClusterStateChecksum(clusterState)) + .checksum(new ClusterStateChecksum(clusterState, threadPool)) .build(); when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); @@ -3271,7 +3271,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationModeNone() .previousClusterUUID("prev-cluster-uuid") .routingTableVersion(1) .indicesRouting(List.of(uploadedIndiceRoutingMetadata)) - .checksum(new ClusterStateChecksum(clusterState)) + .checksum(new ClusterStateChecksum(clusterState, threadPool)) .build(); assertThat(manifest.getIndices().size(), is(1)); @@ -3349,7 +3349,7 @@ public void testGetClusterStateForManifestWithChecksumValidationEnabled() throws initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).build(); remoteClusterStateService.start(); RemoteClusterStateService mockService = spy(remoteClusterStateService); @@ -3382,7 +3382,7 @@ public void testGetClusterStateForManifestWithChecksumValidationModeNone() throw initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.NONE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).build(); remoteClusterStateService.start(); RemoteClusterStateService mockService = spy(remoteClusterStateService); @@ -3415,7 +3415,7 @@ public void testGetClusterStateForManifestWithChecksumValidationEnabledWithMisma initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).build(); remoteClusterStateService.start(); RemoteClusterStateService mockService = spy(remoteClusterStateService); @@ -3465,7 +3465,7 @@ public void testGetClusterStateForManifestWithChecksumValidationDebugWithMismatc ); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).build(); remoteClusterStateService.start(); RemoteClusterStateService mockService = spy(remoteClusterStateService); @@ -3505,7 +3505,7 @@ public void testGetClusterStateUsingDiffWithChecksum() throws IOException { initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); remoteClusterStateService.start(); @@ -3547,7 +3547,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeNone() throws IOExceptio initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.NONE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); remoteClusterStateService.start(); @@ -3589,7 +3589,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeDebugMismatch() throws I initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.DEBUG); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); remoteClusterStateService.start(); @@ -3630,7 +3630,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeTraceMismatch() throws I initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.TRACE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); remoteClusterStateService.start(); @@ -3692,7 +3692,7 @@ public void testGetClusterStateUsingDiffWithChecksumMismatch() throws IOExceptio initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); remoteClusterStateService.start(); diff --git a/server/src/test/java/org/opensearch/rest/BaseRestHandlerTests.java b/server/src/test/java/org/opensearch/rest/BaseRestHandlerTests.java index 45653e9d8e4d6..7534dcd93944a 100644 --- a/server/src/test/java/org/opensearch/rest/BaseRestHandlerTests.java +++ b/server/src/test/java/org/opensearch/rest/BaseRestHandlerTests.java @@ -35,6 +35,8 @@ import org.opensearch.client.node.NodeClient; import org.opensearch.common.Table; import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.rest.RestStatus; import org.opensearch.rest.RestHandler.ReplacedRoute; import org.opensearch.rest.RestHandler.Route; import org.opensearch.rest.RestRequest.Method; @@ -46,15 +48,22 @@ import org.opensearch.threadpool.ThreadPool; import java.io.IOException; +import java.lang.reflect.Modifier; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import static org.hamcrest.core.StringContains.containsString; import static org.hamcrest.object.HasToString.hasToString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; public class BaseRestHandlerTests extends OpenSearchTestCase { private NodeClient mockClient; @@ -288,4 +297,36 @@ public void testReplaceRoutesMethod() throws Exception { } } + public void testRestHandlerWrapper() throws Exception { + RestHandler rh = new RestHandler() { + @Override + public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception { + new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY); + } + }; + RestHandler handlerSpy = spy(rh); + RestHandler.Wrapper rhWrapper = new RestHandler.Wrapper(handlerSpy); + + List overridableMethods = Arrays.stream(RestHandler.class.getMethods()) + .filter( + m -> !(Modifier.isPrivate(m.getModifiers()) || Modifier.isStatic(m.getModifiers()) || Modifier.isFinal(m.getModifiers())) + ) + .collect(Collectors.toList()); + + for (java.lang.reflect.Method method : overridableMethods) { + int argCount = method.getParameterCount(); + Object[] args = new Object[argCount]; + for (int i = 0; i < argCount; i++) { + args[i] = any(); + } + if (args.length > 0) { + method.invoke(rhWrapper, args); + } else { + method.invoke(rhWrapper); + } + method.invoke(verify(handlerSpy, times(1)), args); + } + verifyNoMoreInteractions(handlerSpy); + } + } diff --git a/test/fixtures/hdfs-fixture/build.gradle b/test/fixtures/hdfs-fixture/build.gradle index 99b0386a48808..bc63362980bea 100644 --- a/test/fixtures/hdfs-fixture/build.gradle +++ b/test/fixtures/hdfs-fixture/build.gradle @@ -77,7 +77,7 @@ dependencies { api "ch.qos.logback:logback-core:1.5.8" api "ch.qos.logback:logback-classic:1.2.13" api "org.jboss.xnio:xnio-nio:3.8.16.Final" - api 'org.jline:jline:3.26.3' + api 'org.jline:jline:3.27.0' api 'org.apache.commons:commons-configuration2:2.11.0' api 'com.nimbusds:nimbus-jose-jwt:9.41.1' api ('org.apache.kerby:kerb-admin:2.1.0') { diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java index 35ca5d80aeb4e..ded457601c0ae 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java @@ -125,7 +125,8 @@ List adjustNodesStats(List nodesStats) { nodeStats.getSegmentReplicationRejectionStats(), nodeStats.getRepositoriesStats(), nodeStats.getAdmissionControlStats(), - nodeStats.getNodeCacheStats() + nodeStats.getNodeCacheStats(), + nodeStats.getRemoteStoreNodeStats() ); }).collect(Collectors.toList()); } diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 7adff82e72245..fa5fb736f518f 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2752,6 +2752,7 @@ public void ensureEstimatedStats() { false, false, false, + false, false ); assertThat(