Commit 4e42331: Address comments
Signed-off-by: Lakshya Taragi <[email protected]>
ltaragi committed Oct 1, 2024
1 parent c865f65 commit 4e42331
Showing 10 changed files with 127 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
- Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
@@ -1012,4 +1012,70 @@ public void testAsyncTranslogDurabilityRestrictionsThroughIdxTemplates() throws
.get()
);
}

public void testCloseIndexWithNoOpSyncAndFlushForSyncTranslog() throws InterruptedException {
internalCluster().startNodes(3);
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "5s"))
.get();
Settings.Builder settings = Settings.builder()
.put(remoteStoreIndexSettings(0, 10000L, -1))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s");
createIndex(INDEX_NAME, settings.build());
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
if (randomBoolean()) {
for (int i = 0; i < randomIntBetween(1, 5); i++) {
indexSingleDoc(INDEX_NAME);
}
flushAndRefresh(INDEX_NAME);
}
// Index a single doc to start the async IO processor, which introduces a buffer-interval wait before the next sync.
indexSingleDoc(INDEX_NAME);
// Count down the latch so the main thread can flush after a short sleep.
latch.countDown();
// Index another doc; in this case the flush will have happened before the sync.
indexSingleDoc(INDEX_NAME);
}).start();
// Wait for at least one doc to be ingested.
latch.await();
// Sleep for some time so that the next doc lands in the Lucene buffer. If the flush happens before doc #2
// gets indexed, we fall into the happy case where the index close succeeds anyway.
Thread.sleep(1000);
// Flush so that the subsequent sync and flushes are no-ops.
flush(INDEX_NAME);
// Closing the index involves translog.sync and shard.flush, which are now no-ops.
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}

public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws InterruptedException {
internalCluster().startNodes(3);
Settings.Builder settings = Settings.builder()
.put(remoteStoreIndexSettings(0, 10000L, -1))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s")
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Durability.ASYNC)
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s");
createIndex(INDEX_NAME, settings.build());
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
// Index some docs to start the async IO processor, which will lead to a 10s wait before the next sync.
indexSingleDoc(INDEX_NAME);
indexSingleDoc(INDEX_NAME);
indexSingleDoc(INDEX_NAME);
// Count down the latch so the main thread can flush after a short sleep.
latch.countDown();
}).start();
// Wait for at least one doc to be ingested.
latch.await();
// Flush so that the subsequent sync and flushes are no-ops.
flush(INDEX_NAME);
// Closing the index involves translog.sync and shard.flush, which are now no-ops.
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}
}
@@ -21,9 +21,8 @@

import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.REMOTE_STORE_NODE_STATS;
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.REMOTE_STORE;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
@@ -195,26 +194,31 @@ public void testLastSuccessfulFetchOfPinnedTimestampsPresentInNodeStats() throws
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL.getKey(), "1m")
.build();
internalCluster().startClusterManagerOnlyNode(pinnedTimestampEnabledSettings);
internalCluster().startDataOnlyNodes(2, pinnedTimestampEnabledSettings);
ensureStableCluster(3);
String remoteNodeName = internalCluster().startDataOnlyNodes(1, pinnedTimestampEnabledSettings).get(0);
ensureStableCluster(2);
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
remoteNodeName
);

logger.info("Sleeping for 70 seconds to wait for fetching of pinned timestamps");
Thread.sleep(70000);
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

long lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
assertTrue(lastSuccessfulFetchOfPinnedTimestamps > 0L);
assertBusy(() -> {
long lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
assertTrue(lastSuccessfulFetchOfPinnedTimestamps > 0L);
NodesStatsResponse nodesStatsResponse = internalCluster().client()
.admin()
.cluster()
.prepareNodesStats()
.addMetric(REMOTE_STORE_NODE_STATS.metricName())
.addMetric(REMOTE_STORE.metricName())
.execute()
.actionGet();
for (NodeStats nodeStats : nodesStatsResponse.getNodes()) {
long lastRecordedFetch = nodeStats.getRemoteStoreNodeStats().getLastSuccessfulFetchOfPinnedTimestamps();
assertTrue(lastRecordedFetch >= lastSuccessfulFetchOfPinnedTimestamps);
}
}, 1, TimeUnit.MINUTES);
});

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
}
}
@@ -221,7 +221,7 @@ public enum Metric {
REPOSITORIES("repositories"),
ADMISSION_CONTROL("admission_control"),
CACHE_STATS("caches"),
REMOTE_STORE_NODE_STATS("remote_store_node_stats");
REMOTE_STORE("remote_store");

private String metricName;

@@ -130,7 +130,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics),
NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics),
NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.REMOTE_STORE_NODE_STATS.containedIn(metrics)
NodesStatsRequest.Metric.REMOTE_STORE.containedIn(metrics)
);
}
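With the metric renamed from REMOTE_STORE_NODE_STATS ("remote_store_node_stats") to REMOTE_STORE ("remote_store"), callers request it by the new name; over REST this presumably maps to GET _nodes/stats/remote_store. Below is a minimal Java sketch, not part of this diff, mirroring the integration-test usage shown earlier (client() and the action classes are the existing OpenSearch APIs):

// Hypothetical usage sketch: request only the remote_store section of node stats.
NodesStatsResponse response = client().admin()
    .cluster()
    .prepareNodesStats()
    .addMetric(NodesStatsRequest.Metric.REMOTE_STORE.metricName())
    .get();
for (NodeStats nodeStats : response.getNodes()) {
    // getRemoteStoreNodeStats() may be null when the metric was not computed for a node.
    RemoteStoreNodeStats remoteStoreStats = nodeStats.getRemoteStoreNodeStats();
}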

3 changes: 1 addition & 2 deletions server/src/main/java/org/opensearch/node/Node.java
@@ -1329,8 +1329,7 @@ protected Node(
segmentReplicationStatsTracker,
repositoryService,
admissionControlService,
cacheService,
remoteStoreNodeService
cacheService
);

final SearchService searchService = newSearchService(
12 changes: 5 additions & 7 deletions server/src/main/java/org/opensearch/node/NodeService.java
@@ -54,7 +54,8 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.MonitorService;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStoreNodeStats;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.plugins.PluginsService;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
import org.opensearch.repositories.RepositoriesService;
@@ -102,7 +103,6 @@ public class NodeService implements Closeable {
private final AdmissionControlService admissionControlService;
private final SegmentReplicationStatsTracker segmentReplicationStatsTracker;
private final CacheService cacheService;
private final RemoteStoreNodeService remoteStoreNodeService;

NodeService(
Settings settings,
@@ -130,8 +130,7 @@ public class NodeService implements Closeable {
SegmentReplicationStatsTracker segmentReplicationStatsTracker,
RepositoriesService repositoriesService,
AdmissionControlService admissionControlService,
CacheService cacheService,
RemoteStoreNodeService remoteStoreNodeService
CacheService cacheService
) {
this.settings = settings;
this.threadPool = threadPool;
@@ -161,7 +160,6 @@ public class NodeService implements Closeable {
clusterService.addStateApplier(searchPipelineService);
this.segmentReplicationStatsTracker = segmentReplicationStatsTracker;
this.cacheService = cacheService;
this.remoteStoreNodeService = remoteStoreNodeService;
}

public NodeInfo info(
@@ -246,7 +244,7 @@ public NodeStats stats(
boolean repositoriesStats,
boolean admissionControl,
boolean cacheService,
boolean remoteStoreNodeService
boolean remoteStoreNodeStats
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
@@ -280,7 +278,7 @@ public NodeStats stats(
repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null,
admissionControl ? this.admissionControlService.stats() : null,
cacheService ? this.cacheService.stats(indices) : null,
remoteStoreNodeService ? this.remoteStoreNodeService.getRemoteStoreNodeStats() : null
remoteStoreNodeStats ? new RemoteStoreNodeStats(RemoteStorePinnedTimestampService.getPinnedTimestamps().v1()) : null
);
}

@@ -18,7 +18,6 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryException;
@@ -250,12 +249,4 @@ public static boolean isMigratingToRemoteStore(Metadata metadata) {

return (isMixedMode && isRemoteStoreMigrationDirection);
}

public RemoteStoreNodeStats getRemoteStoreNodeStats() {
long lastSuccessfulFetchOfPinnedTimestamps = 0;
if (RemoteStoreSettings.isPinnedTimestampsEnabled()) {
lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
}
return new RemoteStoreNodeStats(lastSuccessfulFetchOfPinnedTimestamps);
}
}
@@ -8,29 +8,28 @@

package org.opensearch.node.remotestore;

import org.opensearch.Version;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.indices.RemoteStoreSettings;

import java.io.IOException;
import java.util.Objects;

/**
* Node level remote store stats
* @opensearch.internal
*/
public class RemoteStoreNodeStats implements Writeable, ToXContentFragment {

public static final String STATS_NAME = "remote_store_node_stats";
public static final String STATS_NAME = "remote_store";
public static final String LAST_SUCCESSFUL_FETCH_OF_PINNED_TIMESTAMPS = "last_successful_fetch_of_pinned_timestamps";

/**
* Time stamp for the last successful fetch of pinned timestamps by the {@linkplain RemoteStorePinnedTimestampService}
*/
private long lastSuccessfulFetchOfPinnedTimestamps;
private final long lastSuccessfulFetchOfPinnedTimestamps;

public RemoteStoreNodeStats(final long lastSuccessfulFetchOfPinnedTimestamps) {
this.lastSuccessfulFetchOfPinnedTimestamps = lastSuccessfulFetchOfPinnedTimestamps;
@@ -41,18 +40,12 @@ public long getLastSuccessfulFetchOfPinnedTimestamps() {
}

public RemoteStoreNodeStats(StreamInput in) throws IOException {
// TODO: change version to V_2_18_0
if (in.getVersion().onOrAfter(Version.CURRENT) && RemoteStoreSettings.isPinnedTimestampsEnabled()) {
this.lastSuccessfulFetchOfPinnedTimestamps = in.readOptionalLong();
}
this.lastSuccessfulFetchOfPinnedTimestamps = in.readLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
// TODO: change version to V_2_18_0
if (out.getVersion().onOrAfter(Version.CURRENT) && RemoteStoreSettings.isPinnedTimestampsEnabled()) {
out.writeOptionalLong(this.lastSuccessfulFetchOfPinnedTimestamps);
}
out.writeLong(this.lastSuccessfulFetchOfPinnedTimestamps);
}

@Override
@@ -66,4 +59,21 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public String toString() {
return "RemoteStoreNodeStats{ lastSuccessfulFetchOfPinnedTimestamps=" + lastSuccessfulFetchOfPinnedTimestamps + "}";
}

@Override
public boolean equals(Object o) {
if (o == null) {
return false;
}
if (o.getClass() != RemoteStoreNodeStats.class) {
return false;
}
RemoteStoreNodeStats other = (RemoteStoreNodeStats) o;
return this.lastSuccessfulFetchOfPinnedTimestamps == other.lastSuccessfulFetchOfPinnedTimestamps;
}

@Override
public int hashCode() {
return Objects.hash(lastSuccessfulFetchOfPinnedTimestamps);
}
}
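Because the version-gated, feature-flagged serialization was replaced by an unconditional writeLong/readLong pair, a short round-trip sketch may help illustrate the wire contract together with the new equals/hashCode. This is not part of the commit; it assumes org.opensearch.common.io.stream.BytesStreamOutput, the StreamInput import shown above, and a context that may throw IOException:

// Round-trip sketch: serialize, deserialize, and compare using the new equals/hashCode.
RemoteStoreNodeStats original = new RemoteStoreNodeStats(System.currentTimeMillis());
try (BytesStreamOutput out = new BytesStreamOutput()) {
    original.writeTo(out); // always writes the single long, no version or feature-flag checks
    try (StreamInput in = out.bytes().streamInput()) {
        RemoteStoreNodeStats copy = new RemoteStoreNodeStats(in); // reads the same long back
        if (original.equals(copy) == false || original.hashCode() != copy.hashCode()) {
            throw new AssertionError("round-trip changed RemoteStoreNodeStats");
        }
    }
}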
@@ -95,6 +95,7 @@
import org.opensearch.node.NodeResourceUsageStats;
import org.opensearch.node.NodesResourceUsageStats;
import org.opensearch.node.ResponseCollectorService;
import org.opensearch.node.remotestore.RemoteStoreNodeStats;
import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
@@ -614,6 +615,14 @@ public void testSerialization() throws IOException {
} else {
assertEquals(nodeCacheStats, deserializedNodeCacheStats);
}

RemoteStoreNodeStats remoteStoreNodeStats = nodeStats.getRemoteStoreNodeStats();
RemoteStoreNodeStats deserializedRemoteStoreNodeStats = deserializedNodeStats.getRemoteStoreNodeStats();
if (remoteStoreNodeStats == null) {
assertNull(deserializedRemoteStoreNodeStats);
} else {
assertEquals(remoteStoreNodeStats, deserializedRemoteStoreNodeStats);
}
}
}
}
@@ -996,6 +1005,12 @@ public void apply(String action, AdmissionControlActionType admissionControlActi
nodeCacheStats = new NodeCacheStats(cacheStatsMap, flags);
}

RemoteStoreNodeStats remoteStoreNodeStats = null;
if (frequently()) {
long lastSuccessfulFetchOfPinnedTimestamps = randomNonNegativeLong();
remoteStoreNodeStats = new RemoteStoreNodeStats(lastSuccessfulFetchOfPinnedTimestamps);
}

// TODO: Only remote_store based aspects of NodeIndicesStats are being tested here.
// It is possible to test other metrics in NodeIndicesStats as well since it extends Writeable now
return new NodeStats(
@@ -1028,7 +1043,7 @@ public void apply(String action, AdmissionControlActionType admissionControlActi
null,
admissionControlStats,
nodeCacheStats,
null
remoteStoreNodeStats
);
}

