From 2a315ef5bc48ec37f0ddfcf9b60f9b98cce13fd9 Mon Sep 17 00:00:00 2001 From: Neetika Singhal Date: Thu, 16 May 2024 16:12:31 -0700 Subject: [PATCH] Add rest, transport, service layer changes for Tiering Signed-off-by: Neetika Singhal --- .../org/opensearch/action/ActionModule.java | 5 + .../action/tiering/RestWarmTieringAction.java | 49 +++ .../action/tiering/TieringIndexRequest.java | 129 +++++++ .../tiering/TransportWarmTieringAction.java | 120 ++++++ .../action/tiering/WarmTieringAction.java | 27 ++ .../cluster/routing/RoutingPool.java | 5 +- .../common/settings/ClusterSettings.java | 2 + .../org/opensearch/index/IndexModule.java | 66 ++++ .../main/java/org/opensearch/node/Node.java | 10 + .../tiering/HotToWarmTieringService.java | 357 +++++++++++++++++ .../tiering/TieringClusterStateListener.java | 82 ++++ .../opensearch/tiering/TieringService.java | 312 +++++++++++++++ .../tiering/TieringServiceValidator.java | 347 +++++++++++++++++ .../action/tiering/TieringRequestTests.java | 72 ++++ .../tiering/TieringRequestValidatorTests.java | 358 ++++++++++++++++++ 15 files changed, 1940 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/action/tiering/RestWarmTieringAction.java create mode 100644 server/src/main/java/org/opensearch/action/tiering/TieringIndexRequest.java create mode 100644 server/src/main/java/org/opensearch/action/tiering/TransportWarmTieringAction.java create mode 100644 server/src/main/java/org/opensearch/action/tiering/WarmTieringAction.java create mode 100644 server/src/main/java/org/opensearch/tiering/HotToWarmTieringService.java create mode 100644 server/src/main/java/org/opensearch/tiering/TieringClusterStateListener.java create mode 100644 server/src/main/java/org/opensearch/tiering/TieringService.java create mode 100644 server/src/main/java/org/opensearch/tiering/TieringServiceValidator.java create mode 100644 server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestTests.java create mode 100644 server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestValidatorTests.java diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 5e2b62614fc47..7295808c40d84 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -288,6 +288,9 @@ import org.opensearch.action.termvectors.TransportMultiTermVectorsAction; import org.opensearch.action.termvectors.TransportShardMultiTermsVectorAction; import org.opensearch.action.termvectors.TransportTermVectorsAction; +import org.opensearch.action.tiering.RestWarmTieringAction; +import org.opensearch.action.tiering.TransportWarmTieringAction; +import org.opensearch.action.tiering.WarmTieringAction; import org.opensearch.action.update.TransportUpdateAction; import org.opensearch.action.update.UpdateAction; import org.opensearch.client.node.NodeClient; @@ -633,6 +636,7 @@ public void reg actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class); actions.register(CloneSnapshotAction.INSTANCE, TransportCloneSnapshotAction.class); actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class); + actions.register(WarmTieringAction.INSTANCE, TransportWarmTieringAction.class); actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class); actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class); @@ -964,6 +968,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestNodeAttrsAction()); registerHandler.accept(new RestRepositoriesAction()); registerHandler.accept(new RestSnapshotAction()); + registerHandler.accept(new RestWarmTieringAction()); registerHandler.accept(new RestTemplatesAction()); // Point in time API diff --git a/server/src/main/java/org/opensearch/action/tiering/RestWarmTieringAction.java b/server/src/main/java/org/opensearch/action/tiering/RestWarmTieringAction.java new file mode 100644 index 0000000000000..2035801d0da9e --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/RestWarmTieringAction.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.tiering; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.opensearch.rest.RestRequest.Method.POST; + +/** + * Rest Tiering API class to move index from hot to warm + * + * @opensearch.experimental + */ +public class RestWarmTieringAction extends BaseRestHandler { + + @Override + public List routes() { + return singletonList(new RestHandler.Route(POST, "/{index}/_tier/_warm")); + } + + @Override + public String getName() { + return "tiering_warm"; + } + + @Override + protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + final TieringIndexRequest tieringIndexRequest = new TieringIndexRequest(request.param("index")); + tieringIndexRequest.timeout(request.paramAsTime("timeout", tieringIndexRequest.timeout())); + tieringIndexRequest.clusterManagerNodeTimeout( + request.paramAsTime("cluster_manager_timeout", tieringIndexRequest.clusterManagerNodeTimeout()) + ); + tieringIndexRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", false)); + return channel -> client.admin().cluster().execute(WarmTieringAction.INSTANCE, tieringIndexRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/opensearch/action/tiering/TieringIndexRequest.java b/server/src/main/java/org/opensearch/action/tiering/TieringIndexRequest.java new file mode 100644 index 0000000000000..3125e418a04ca --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/TieringIndexRequest.java @@ -0,0 +1,129 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.tiering; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.IndicesRequest; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.master.AcknowledgedRequest; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Objects; + +import static org.opensearch.action.ValidateActions.addValidationError; + +/** + * Represents the tiering request for indices + * to move to a different tier + * + * @opensearch.experimental + */ +@ExperimentalApi +public class TieringIndexRequest extends AcknowledgedRequest implements IndicesRequest.Replaceable { + + private String[] indices; + private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, true); + private boolean waitForCompletion; + public TieringIndexRequest() { + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (indices == null || indices.length == 0) { + validationException = addValidationError("Mandatory parameter - indices is missing from the request", validationException); + } + return validationException; + } + + public TieringIndexRequest(String... indices) { + this.indices = indices; + } + + public TieringIndexRequest(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + waitForCompletion = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + indicesOptions.writeIndicesOptions(out); + out.writeBoolean(waitForCompletion); + } + + @Override + public String[] indices() { + return indices; + } + + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + @Override + public TieringIndexRequest indices(String... indices) { + this.indices = indices; + return this; + } + + public TieringIndexRequest indicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + + /** + * If this parameter is set to true the operation will wait for completion of tiering process before returning. + * + * @param waitForCompletion if true the operation will wait for completion + * @return this request + */ + public TieringIndexRequest waitForCompletion(boolean waitForCompletion) { + this.waitForCompletion = waitForCompletion; + return this; + } + + /** + * Returns wait for completion setting + * + * @return true if the operation will wait for completion + */ + public boolean waitForCompletion() { + return waitForCompletion; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TieringIndexRequest that = (TieringIndexRequest) o; + return clusterManagerNodeTimeout.equals(that.clusterManagerNodeTimeout) + && timeout.equals(that.timeout) + && Objects.equals(indicesOptions, that.indicesOptions) + && Arrays.equals(indices, that.indices) + && waitForCompletion == that.waitForCompletion; + } + + @Override + public int hashCode() { + return Objects.hash(clusterManagerNodeTimeout, timeout, indicesOptions, waitForCompletion, Arrays.hashCode(indices)); + } +} diff --git a/server/src/main/java/org/opensearch/action/tiering/TransportWarmTieringAction.java b/server/src/main/java/org/opensearch/action/tiering/TransportWarmTieringAction.java new file mode 100644 index 0000000000000..ce55b8369ac42 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/TransportWarmTieringAction.java @@ -0,0 +1,120 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.tiering.TieringClusterStateListener; +import org.opensearch.tiering.TieringService; +import org.opensearch.transport.TransportService; + +import java.io.IOException; + +import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS; + +/** + * Transport Tiering API class to move index from hot to warm + * + * @opensearch.experimental + */ +public class TransportWarmTieringAction extends TransportClusterManagerNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportWarmTieringAction.class); + private final TieringService tieringService; + private final Client client; + @Inject + public TransportWarmTieringAction(TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, TieringService tieringService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, Client client) { + super(WarmTieringAction.NAME, transportService, clusterService, threadPool, actionFilters, + TieringIndexRequest::new, indexNameExpressionResolver); + this.client = client; + this.tieringService = tieringService; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(TieringIndexRequest request, ClusterState state) { + ClusterBlockException blockException = + state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + if (blockException == null) { + // Check indices level block + blockException = state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, + indexNameExpressionResolver.concreteIndexNames(clusterService.state(), request)); + } + return blockException; + } + + @Override + protected void clusterManagerOperation(TieringIndexRequest request, ClusterState state, + ActionListener listener) throws Exception { + // Collect node stats to get node level filesystem info. The response will be used for performing some + // validations. + client.admin().cluster().prepareNodesStats().clear().addMetric(FS.metricName()).execute( + new ActionListener() { + @Override + public void onResponse(NodesStatsResponse nodesStatsResponse) { + // Collect index level stats. This response is also used for validations. + client.admin().indices().prepareStats().clear().setStore(true).setIndices(request.indices()).execute( + new ActionListener() { + @Override + public void onResponse(IndicesStatsResponse indicesStatsResponse) { + tieringService.tier(request, nodesStatsResponse, indicesStatsResponse, + ActionListener.delegateFailure(listener, (delegatedListener, acknowledgedResponse) -> { + if (request.waitForCompletion()) { + TieringClusterStateListener.createAndRegisterListener( + clusterService, + new AcknowledgedResponse(acknowledgedResponse.isAcknowledged()), + delegatedListener + ); + } else { + delegatedListener.onResponse(new AcknowledgedResponse(true)); + } + })); + } + @Override + public void onFailure(Exception e) { + logger.debug("Indices stats call failed with exception", e); + listener.onFailure(e); + } + }); + } + @Override + public void onFailure(Exception e) { + logger.debug("Node stats call failed with exception", e); + listener.onFailure(e); + } + }); + } +} diff --git a/server/src/main/java/org/opensearch/action/tiering/WarmTieringAction.java b/server/src/main/java/org/opensearch/action/tiering/WarmTieringAction.java new file mode 100644 index 0000000000000..ffb4640e1f626 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/WarmTieringAction.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.tiering; + +import org.opensearch.action.ActionType; +import org.opensearch.action.support.master.AcknowledgedResponse; + +/** + * Tiering action class to move index from hot to warm + * + * @opensearch.experimental + */ +public class WarmTieringAction extends ActionType { + + public static final WarmTieringAction INSTANCE = new WarmTieringAction(); + public static final String NAME = "indices:admin/tiering/warm"; + + public WarmTieringAction() { + super(NAME, AcknowledgedResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java index db10ad61c7d6d..04f76bd844bac 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java @@ -11,6 +11,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.index.IndexModule; /** * {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods @@ -58,6 +59,8 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all * @return {@link RoutingPool} for the given index. */ public static RoutingPool getIndexPool(IndexMetadata indexMetadata) { - return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY; + return indexMetadata.isRemoteSnapshot() || + IndexModule.DataLocalityType.PARTIAL.name() + .equals(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey())) ? REMOTE_CAPABLE : LOCAL_ONLY; } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 09f32884e0ae1..60728132e7804 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -165,6 +165,7 @@ import org.opensearch.tasks.consumer.TopNSearchTasksLogger; import org.opensearch.telemetry.TelemetrySettings; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.tiering.HotToWarmTieringService; import org.opensearch.transport.ProxyConnectionStrategy; import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.RemoteConnectionStrategy; @@ -278,6 +279,7 @@ public void apply(Settings value, Settings current, Settings previous) { FsRepository.REPOSITORIES_CHUNK_SIZE_SETTING, FsRepository.REPOSITORIES_COMPRESS_SETTING, FsRepository.REPOSITORIES_LOCATION_SETTING, + HotToWarmTieringService.WARM_TIERING_MAX_SHARD_SIZE, IndicesQueryCache.INDICES_CACHE_QUERY_SIZE_SETTING, IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING, IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 3c4cb4fd596c1..751873a401004 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -48,6 +48,7 @@ import org.opensearch.common.CheckedFunction; import org.opensearch.common.SetOnce; import org.opensearch.common.TriFunction; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.logging.DeprecationLogger; import org.opensearch.common.settings.Setting; @@ -107,6 +108,8 @@ import java.util.function.Function; import java.util.function.Supplier; +import static org.apache.logging.log4j.util.Strings.toRootUpperCase; + /** * IndexModule represents the central extension point for index level custom implementations like: *
    @@ -160,6 +163,26 @@ public final class IndexModule { Property.NodeScope ); + /** + * Index setting which used to determine how the data is cached locally fully or partially + */ + public static final Setting INDEX_STORE_LOCALITY_SETTING = new Setting<>( + "index.store.data_locality", + DataLocalityType.FULL.name(), + DataLocalityType::getValueOf, + Property.IndexScope, + Property.NodeScope, + Property.PrivateIndex + ); + + public static final Setting INDEX_TIERING_STATE = new Setting<>( + "index.tiering.state", + TieringState.HOT.name(), + Function.identity(), + Property.IndexScope, + Property.PrivateIndex + ); + /** Which lucene file extensions to load with the mmap directory when using hybridfs store. This settings is ignored if {@link #INDEX_STORE_HYBRID_NIO_EXTENSIONS} is set. * This is an expert setting. * @see Lucene File Extensions. @@ -585,6 +608,49 @@ public static Type defaultStoreType(final boolean allowMmap) { } } + /** + * Indicates the locality of the data - whether it will be cached fully or partially + */ + @ExperimentalApi + public enum DataLocalityType { + /** + * Indicates that all the data will be cached locally + */ + FULL, + /** + * Indicates that only a subset of the data will be cached locally + */ + PARTIAL; + + private static final Map LOCALITY_TYPES; + + static { + final Map localityTypes = new HashMap<>(values().length); + for (final DataLocalityType dataLocalityType : values()) { + localityTypes.put(dataLocalityType.name(), dataLocalityType); + } + LOCALITY_TYPES = Collections.unmodifiableMap(localityTypes); + } + + public static DataLocalityType getValueOf(final String localityType) { + Objects.requireNonNull(localityType, "No locality type given."); + final String localityTypeName = toRootUpperCase(localityType.trim()); + final DataLocalityType type = LOCALITY_TYPES.get(localityTypeName); + if (type != null) { + return type; + } + throw new IllegalArgumentException("Unknown locality type constant [" + localityType + "]."); + } + } + + @ExperimentalApi + public enum TieringState { + HOT, + HOT_TO_WARM, + WARM, + WARM_TO_HOT; + } + public IndexService newIndexService( IndexService.IndexCreationContext indexCreationContext, NodeEnvironment environment, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index f7a901335f34a..61fc3fad8d107 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -252,6 +252,8 @@ import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.tiering.HotToWarmTieringService; +import org.opensearch.tiering.TieringService; import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportInterceptor; @@ -1173,6 +1175,13 @@ protected Node( remoteClusterStateService ); + HotToWarmTieringService tieringService = new HotToWarmTieringService( + settings, + clusterService, + clusterModule.getIndexNameExpressionResolver(), + clusterModule.getAllocationService() + ); + final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor( settings, clusterService::state, @@ -1369,6 +1378,7 @@ protected Node( b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus); b.bind(RestoreService.class).toInstance(restoreService); b.bind(RemoteStoreRestoreService.class).toInstance(remoteStoreRestoreService); + b.bind(TieringService.class).toInstance(tieringService); b.bind(RerouteService.class).toInstance(rerouteService); b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator); b.bind(FsHealthService.class).toInstance(fsHealthService); diff --git a/server/src/main/java/org/opensearch/tiering/HotToWarmTieringService.java b/server/src/main/java/org/opensearch/tiering/HotToWarmTieringService.java new file mode 100644 index 0000000000000..1551e35ff5c93 --- /dev/null +++ b/server/src/main/java/org/opensearch/tiering/HotToWarmTieringService.java @@ -0,0 +1,357 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.tiering.TieringIndexRequest; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.NotClusterManagerException; +import org.opensearch.cluster.ack.ClusterStateUpdateResponse; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.UUIDs; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexNotFoundException; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING; +import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; +import static org.opensearch.tiering.TieringServiceValidator.validateHotToWarm; + +/** + * Service responsible for tiering indices from hot to warm + * @opensearch.experimental + */ +public class HotToWarmTieringService extends TieringService { + private static final Logger logger = LogManager.getLogger(HotToWarmTieringService.class); + + public static final Setting HOT_TO_WARM_START_TIME_SETTING = + Setting.longSetting("index.tiering.hot_to_warm.start_time", System.currentTimeMillis(), 0, Setting.Property.IndexScope, Setting.Property.PrivateIndex); + + public static final Setting HOT_TO_WARM_END_TIME_SETTING = + Setting.longSetting("index.tiering.hot_to_warm.end_time", 0, 0, Setting.Property.IndexScope, Setting.Property.PrivateIndex); + + /** + * The maximum shard size to be accepted for warm migration. We reject requests once shard size exceed this value. + */ + static final ByteSizeValue DEFAULT_WARM_TIERING_MAX_SHARD_SIZE = new ByteSizeValue(100, ByteSizeUnit.GB); + public static final Setting WARM_TIERING_MAX_SHARD_SIZE = + Setting.byteSizeSetting("warm.tiering.max_shard_size", DEFAULT_WARM_TIERING_MAX_SHARD_SIZE, + Setting.Property.Dynamic, Setting.Property.NodeScope); + + public static final String TIERING_INDEX_UUID = "index.tiering.uuid"; + private Map> indexShardTieringStatus = new HashMap<>(); + + @Inject + public HotToWarmTieringService(Settings settings, ClusterService clusterService, + IndexNameExpressionResolver indexNameExpressionResolver, + AllocationService allocationService) { + super(settings, clusterService, indexNameExpressionResolver, allocationService); + ClusterSettings clusterSettings = clusterService.getClusterSettings(); + clusterSettings.addSettingsUpdateConsumer(WARM_TIERING_MAX_SHARD_SIZE, this::setWarmTieringMaxShardSize); + setWarmTieringMaxShardSize(clusterSettings.get(WARM_TIERING_MAX_SHARD_SIZE)); + setBytesThresholdLow(ByteSizeValue.parseBytesSizeValue("0b", clusterSettings.get(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING))); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + + // TODO add handling for master switch, dangling indices, master reload + + if(event.routingTableChanged()) { + updateIndexShardStatus(event.state()); + } + } + + private void updateIndexShardStatus(ClusterState clusterState) { + List routingTable; + Set relocationCompletedIndices = new HashSet<>(); + Set failedIndices = new HashSet<>(); + for(Index index: indexShardTieringStatus.keySet()) { + try { + // Ensure index is not deleted + routingTable = clusterState.routingTable().allShards(index.getName()); + } catch (IndexNotFoundException ex) { + // Index already deleted nothing to do + logger.warn("Index [{}] deleted before hot to warm relocation finished", index.getName()); + return; + } + + Map shardTieringStatusMap = indexShardTieringStatus.getOrDefault(index, new HashMap<>()); + List processingShards = shardTieringStatusMap.keySet().stream() + .filter(shardId -> shardTieringStatusMap.get(shardId).state() == State.INIT || shardTieringStatusMap.get(shardId).state() == State.PROCESSING) + .collect(Collectors.toList()); + if (processingShards.isEmpty()) { + // No shards are in processing state, nothing to do + // This means that tiering for the index is completed - either failed or successful + continue; + } + boolean relocationCompleted = true; + for (ShardRouting shard : routingTable) { + if (shardTieringStatusMap.get(shard.shardId()) != null && + (State.SUCCESSFUL.equals(shardTieringStatusMap.get(shard.shardId()).state()) || State.FAILED.equals(shardTieringStatusMap.get(shard.shardId()).state()))) { + continue; + } + TieringService.State tieringState; + String reason = null; + boolean isShardFoundOnSearchNode = clusterState.getNodes().get(shard.currentNodeId()).isSearchNode(); + boolean isShardRelocatingToSearchNode = clusterState.getNodes().get(shard.relocatingNodeId()).isSearchNode(); + + if (shard.active() && isShardFoundOnSearchNode) { + tieringState = State.SUCCESSFUL; + } else if (isShardFoundOnSearchNode || isShardRelocatingToSearchNode) { + tieringState = State.PROCESSING; + relocationCompleted = false; + } else { + tieringState = State.FAILED; + relocationCompleted = false; + failedIndices.add(index); + reason = "Shard is neither allocated nor relocating to the search node, current node: " + shard.currentNodeId() + + ", relocating node: " + shard.relocatingNodeId(); + if (shard.unassigned()) { + reason += " Shard is unassigned due to " + shard.unassignedInfo().getReason(); + } + } + shardTieringStatusMap.put(shard.shardId(), new ShardTieringStatus(shard.currentNodeId(), shard.state(), tieringState, reason)); + } + indexShardTieringStatus.put(index, shardTieringStatusMap); + if (relocationCompleted) { + logger.info("Hot to warm relocation completed for index [{}]", index.getName()); + relocationCompletedIndices.add(index); + } + } + if (!relocationCompletedIndices.isEmpty()) { + processSuccessfullyTieredIndices(relocationCompletedIndices); + } + if (!failedIndices.isEmpty()) { + processFailedIndices(failedIndices); + } + } + + private void processFailedIndices(Set indices) { + clusterService.submitStateUpdateTask("process hot to warm tiering for failed indices", new ClusterStateUpdateTask(Priority.URGENT) { + + @Override + public ClusterState execute(ClusterState currentState) { + final Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + + for (Index index : indices) { + final IndexMetadata indexMetadata = metadataBuilder.get(index.getName()); + Settings.Builder indexSettingsBuilder = Settings.builder().put(indexMetadata.getSettings()); + + // update tiering settings here + indexSettingsBuilder.put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL); + indexSettingsBuilder.put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.HOT); + indexSettingsBuilder.put(HOT_TO_WARM_END_TIME_SETTING.getKey(), System.currentTimeMillis()); + + // Update index settings version + final IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata).settings(indexSettingsBuilder); + builder.settingsVersion(1 + builder.settingsVersion()); + metadataBuilder.put(builder); + } + + final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + + ClusterState updatedState = ClusterState.builder(currentState) + .metadata(metadataBuilder) + .routingTable(currentState.routingTable()) + .blocks(blocks) + .build(); + + // now, reroute to trigger shard relocation for shards to go back to hot nodes + updatedState = allocationService.reroute(updatedState, "hot to warm revert tiering"); + + return updatedState; + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn((Supplier) () -> new ParameterizedMessage("failed to complete hot to warm tiering for indices " + + "[{}]", indices), e); + } + + @Override + public void onNoLongerClusterManager(String source) { + this.onFailure(source, new NotClusterManagerException("no longer cluster manager. source: [" + source + "]")); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("Cluster state updated for source " + source); + } + }); + } + + private void processSuccessfullyTieredIndices(Set indices) { + clusterService.submitStateUpdateTask("complete hot to warm tiering", new ClusterStateUpdateTask(Priority.URGENT) { + + @Override + public ClusterState execute(ClusterState currentState) { + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + final Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + + for (Index index : indices) { + final IndexMetadata indexMetadata = metadataBuilder.get(index.getName()); + Settings.Builder indexSettingsBuilder = Settings.builder().put(indexMetadata.getSettings()); + // put/update tiering settings here + indexSettingsBuilder.put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.WARM); + indexSettingsBuilder.put(HOT_TO_WARM_END_TIME_SETTING.getKey(), System.currentTimeMillis()); + + // Update index settings version + final IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata).settings(indexSettingsBuilder); + builder.settingsVersion(1 + builder.settingsVersion()); + metadataBuilder.put(builder); + } + + final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + + ClusterState updatedState = ClusterState.builder(currentState) + .metadata(metadataBuilder) + .routingTable(routingTableBuilder.build()) + .blocks(blocks) + .build(); + + return updatedState; + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn((Supplier) () -> new ParameterizedMessage("failed to complete hot to warm tiering for indices " + + "[{}]", indices), e); + } + + @Override + public void onNoLongerClusterManager(String source) { + this.onFailure(source, new NotClusterManagerException("no longer cluster manager. source: [" + source + "]")); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("Cluster state updated for source " + source); + for (Index index : indices) { + indexShardTieringStatus.remove(index); + } + } + }); + } + + @Override + public void tier(final TieringIndexRequest request, final NodesStatsResponse nodesStatsResponse, + final IndicesStatsResponse indicesStatsResponse, final ActionListener listener) { + clusterService.submitStateUpdateTask("start hot to warm tiering", new ClusterStateUpdateTask(Priority.URGENT) { + + @Override + public ClusterState execute(ClusterState currentState) { + int noOfWarmReplicas = 1; + final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(currentState, request); + + Index[] validatedIndices = validateHotToWarm(currentState, concreteIndices, nodesStatsResponse, indicesStatsResponse, warmTieringMaxShardSize, bytesThresholdLow); + final String[] actualIndices = new String[validatedIndices.length]; + for (int i = 0; i < validatedIndices.length; i++) { + actualIndices[i] = validatedIndices[i].getName(); + } + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + final Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + + routingTableBuilder.updateNumberOfReplicas(noOfWarmReplicas, actualIndices); + metadataBuilder.updateNumberOfReplicas(noOfWarmReplicas, actualIndices); + + for (Index index : validatedIndices) { + final IndexMetadata indexMetadata = metadataBuilder.get(index.getName()); + Settings.Builder indexSettingsBuilder = Settings.builder().put(indexMetadata.getSettings()); + // put additional settings here + indexSettingsBuilder.put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL); + indexSettingsBuilder.put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.HOT_TO_WARM); + indexSettingsBuilder.put(TIERING_INDEX_UUID, UUIDs.randomBase64UUID()); + indexSettingsBuilder.put(HOT_TO_WARM_START_TIME_SETTING.getKey(), System.currentTimeMillis()); + indexSettingsBuilder.put(HOT_TO_WARM_END_TIME_SETTING.getKey(), -1); + + // Update index settings version + final IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata).settings(indexSettingsBuilder); + builder.settingsVersion(1 + builder.settingsVersion()); + metadataBuilder.put(builder); + Map shardTieringStatus = new HashMap<>(); + for (int shard = 0; shard < indexMetadata.getNumberOfShards(); shard++) { + ShardId shardIdObj = new ShardId(indexMetadata.getIndex(), shard); + shardTieringStatus.put(shardIdObj, new ShardTieringStatus(currentState.nodes().getLocalNodeId(), null)); + } + indexShardTieringStatus.put(index, shardTieringStatus); + } + + final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + + ClusterState updatedState = ClusterState.builder(currentState) + .metadata(metadataBuilder) + .routingTable(routingTableBuilder.build()) + .blocks(blocks) + .build(); + + // now, reroute to trigger shard relocation for the dedicated case + updatedState = allocationService.reroute(updatedState, "hot to warm tiering"); + + return updatedState; + } + + @Override + public void onFailure(String source, Exception e) { + // TODO check if can print only the failed indices + logger.warn((Supplier) () -> new ParameterizedMessage("failed to start warm tiering for indices " + + "[{}]", (Object) request.indices()), e); + listener.onFailure(e); + } + + @Override + public void onNoLongerClusterManager(String source) { + this.onFailure(source, new NotClusterManagerException("no longer cluster manager. source: [" + source + "]")); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("Cluster state updated for source " + source); + listener.onResponse(new ClusterStateUpdateResponse(true)); + } + + @Override + public TimeValue timeout() { + return request.clusterManagerNodeTimeout(); + } + }); + } +} diff --git a/server/src/main/java/org/opensearch/tiering/TieringClusterStateListener.java b/server/src/main/java/org/opensearch/tiering/TieringClusterStateListener.java new file mode 100644 index 0000000000000..8954458e8eba6 --- /dev/null +++ b/server/src/main/java/org/opensearch/tiering/TieringClusterStateListener.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterStateListener; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.action.ActionResponse; + +/** + * Transport listener for cluster state updates + * + * @opensearch.internal + * @opensearch.experimental + */ +public class TieringClusterStateListener implements ClusterStateListener { + + private static final Logger logger = LogManager.getLogger(TieringClusterStateListener.class); + + private final ClusterService clusterService; + private final ActionListener listener; + + private TieringClusterStateListener( + ClusterService clusterService, + AcknowledgedResponse response, + ActionListener listener + ) { + this.clusterService = clusterService; + this.listener = listener; + } + + @Override + public void clusterChanged(ClusterChangedEvent changedEvent) { + //TODO: to be added + } + + /** + * Creates a cluster state listener and registers it with the cluster service. The listener passed as a + * parameter will be called when the restore is complete. + */ + public static void createAndRegisterListener( + ClusterService clusterService, + AcknowledgedResponse response, + ActionListener listener + ) { + clusterService.addListener(new TieringClusterStateListener(clusterService, response, listener)); + } +} diff --git a/server/src/main/java/org/opensearch/tiering/TieringService.java b/server/src/main/java/org/opensearch/tiering/TieringService.java new file mode 100644 index 0000000000000..1daa484437ac7 --- /dev/null +++ b/server/src/main/java/org/opensearch/tiering/TieringService.java @@ -0,0 +1,312 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.tiering.TieringIndexRequest; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterStateListener; +import org.opensearch.cluster.ack.ClusterStateUpdateResponse; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.common.unit.ByteSizeValue; + +import java.io.IOException; +import java.util.Objects; + +/** + * Service responsible for tiering indices + * @opensearch.experimental + */ +public abstract class TieringService extends AbstractLifecycleComponent implements ClusterStateListener { + + private static final Logger logger = LogManager.getLogger(TieringService.class); + + protected final ClusterService clusterService; + + protected final IndexNameExpressionResolver indexNameExpressionResolver; + + protected final AllocationService allocationService; + protected volatile ByteSizeValue warmTieringMaxShardSize; + protected volatile ByteSizeValue bytesThresholdLow; + + public TieringService(Settings settings, ClusterService clusterService, + IndexNameExpressionResolver indexNameExpressionResolver, AllocationService allocationService) { + super(); + this.clusterService = clusterService; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.allocationService = allocationService; + + if (DiscoveryNode.isClusterManagerNode(settings)) { + clusterService.addListener(this); + } + } + + @Override + protected void doStart() { + + } + + @Override + protected void doStop() { + + } + + @Override + protected void doClose() throws IOException { + + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + } + + + /** + * Tier indices from hot to warm / warm to hot + * @param request - tiering request + * @param nodesStatsResponse - node stats response of the nodes in the cluster + * @param indicesStatsResponse - indices stats response of the indices in the cluster + * @param listener - call back listener + */ + public abstract void tier(final TieringIndexRequest request, final NodesStatsResponse nodesStatsResponse, + final IndicesStatsResponse indicesStatsResponse, final ActionListener listener); + + /** + * @param warmTieringMaxShardSize - maximum shard size for hot to warm migration to set + */ + protected void setWarmTieringMaxShardSize(ByteSizeValue warmTieringMaxShardSize) { + this.warmTieringMaxShardSize = warmTieringMaxShardSize; + } + + public ByteSizeValue getBytesThresholdLow() { + return bytesThresholdLow; + } + + public void setBytesThresholdLow(ByteSizeValue bytesThresholdLow) { + this.bytesThresholdLow = bytesThresholdLow; + } + + /** + * Represents status of a tiering shard + * + * @opensearch.experimental + */ + @ExperimentalApi + public static class ShardTieringStatus implements Writeable { + private State state; + private ShardRoutingState shardRoutingState; + private String nodeId; + private String reason; + + private ShardTieringStatus() {} + + /** + * Constructs a new shard tiering status in initializing state on the given node + * + * @param nodeId node id + */ + public ShardTieringStatus(String nodeId) { + this(nodeId, null); + } + + /** + * Constructs a new shard tiering status in with specified state on the given node + * + * @param nodeId node id + * @param shardRoutingState shard state + */ + public ShardTieringStatus(String nodeId, ShardRoutingState shardRoutingState) { + this(nodeId, shardRoutingState, State.INIT, null); + } + + /** + * Constructs a new shard tiering status in with specified state on the given node with specified failure reason + * + * @param nodeId node id + * @param state shard state + * @param reason failure reason + */ + public ShardTieringStatus(String nodeId, ShardRoutingState shardRoutingState, State state, String reason) { + this.nodeId = nodeId; + this.state = state; + this.reason = reason; + this.shardRoutingState = shardRoutingState; + } + + /** + * Returns current state + * + * @return current state + */ + public State state() { + return state; + } + + /** + * Returns node id of the node where shared is getting tiered + * + * @return node id + */ + public String nodeId() { + return nodeId; + } + + /** + * Returns failure reason + * + * @return failure reason + */ + public String reason() { + return reason; + } + + /** + * Reads tiering status from stream input + * + * @param in stream input + * @return restore status + */ + public static ShardTieringStatus readShardRestoreStatus(StreamInput in) throws IOException { + ShardTieringStatus shardTieringStatus = new ShardTieringStatus(); + shardTieringStatus.readFrom(in); + return shardTieringStatus; + } + + /** + * Reads tiering status from stream input + * + * @param in stream input + */ + public void readFrom(StreamInput in) throws IOException { + nodeId = in.readOptionalString(); + state = State.fromValue(in.readByte()); + reason = in.readOptionalString(); + } + + /** + * Writes tiering status to stream output + * + * @param out stream input + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(nodeId); + out.writeByte(state.value); + out.writeOptionalString(reason); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ShardTieringStatus status = (ShardTieringStatus) o; + return state == status.state && Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason); + } + + @Override + public int hashCode() { + return Objects.hash(state, nodeId, reason); + } + } + + + /** + * Tiering state + * + * @opensearch.experimental + */ + @ExperimentalApi + public enum State { + /** + * Initializing state + */ + INIT((byte) 0), + /** + * Processing state + */ + PROCESSING((byte) 1), + /** + * Tiering finished successfully + */ + SUCCESSFUL((byte) 2), + /** + * Tiering failed + */ + FAILED((byte) 3); + + private final byte value; + + /** + * Constructs new state + * + * @param value state code + */ + State(byte value) { + this.value = value; + } + + /** + * Returns state code + * + * @return state code + */ + public byte value() { + return value; + } + + /** + * Returns true if tiering completed (either successfully or with failure) + * + * @return true if tiering completed + */ + public boolean completed() { + return this == SUCCESSFUL || this == FAILED; + } + + /** + * Returns state corresponding to state code + * + * @param value stat code + * @return state + */ + public static State fromValue(byte value) { + switch (value) { + case 0: + return INIT; + case 1: + return PROCESSING; + case 2: + return SUCCESSFUL; + case 3: + return FAILED; + default: + throw new IllegalArgumentException("No tiering state for value [" + value + "]"); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/tiering/TieringServiceValidator.java b/server/src/main/java/org/opensearch/tiering/TieringServiceValidator.java new file mode 100644 index 0000000000000..ae32528866066 --- /dev/null +++ b/server/src/main/java/org/opensearch/tiering/TieringServiceValidator.java @@ -0,0 +1,347 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.indices.stats.IndexStats; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.cluster.health.ClusterIndexHealth; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; + +/** + * Validator class to validate the tiering requests of the index + * @opensearch.experimental + */ +public class TieringServiceValidator { + + private static final Logger logger = LogManager.getLogger(TieringServiceValidator.class); + + /** + * Validates the tiering request for indices going from hot to warm tier + * + * @param currentState current cluster state + * @param indices array of indices to be validated + * @param nodesStatsResponse node stats response of the nodes in the cluster + * @param indicesStatsResponse indices stats response of the indices in the cluster + * @param warmTieringMaxShardSize warm tiering max shard size + * @param bytesThresholdLow disk threshold low + * @return array of indices that are accepted for tiering + */ + public static Index[] validateHotToWarm(final ClusterState currentState, final Index[] indices, + final NodesStatsResponse nodesStatsResponse, + final IndicesStatsResponse indicesStatsResponse, + final ByteSizeValue warmTieringMaxShardSize, + final ByteSizeValue bytesThresholdLow) { + String indexNames = Arrays.stream(indices) + .map(Index::getName) + .collect(Collectors.joining(", ")); + // can we have common validate class + validateTieredRemoteIndexFeatureFlag(indexNames); + validateSearchNodes(currentState, indexNames); + validateDiskThresholdWaterMarkNotBreached(currentState, nodesStatsResponse, bytesThresholdLow); + + List notHotIndices = new ArrayList<>(); + List notRemoteStoreBacked = new ArrayList<>(); + List closedIndices = new ArrayList<>(); + List redIndices = new ArrayList<>(); + List indicesWithLargeShard = new ArrayList<>(); + List acceptedIndices = new ArrayList<>(); + + for (Index index : indices) { + if (!validateHotIndex(currentState, index)) { + notHotIndices.add(index.getName()); + continue; + } + if (!validateRemoteStoreIndex(currentState, index)) { + notRemoteStoreBacked.add(index.getName()); + continue; + } + if (!validateOpenIndex(currentState, index)) { + closedIndices.add(index.getName()); + continue; + } + if (!validateIndexHealth(currentState, index)) { + redIndices.add(index.getName()); + continue; + } + if(!validateMaxShardSize(indicesStatsResponse, index.getName(), warmTieringMaxShardSize)) { + indicesWithLargeShard.add(index.getName()); + continue; + } + acceptedIndices.add(index); + } + + if (!notHotIndices.isEmpty()) { + logger.warn("Rejecting tiering request for indices [{}] because they are because they are already in the warm tier or the tiering is currently in progress.", notRemoteStoreBacked); + } + if (!notRemoteStoreBacked.isEmpty()) { + logger.warn("Rejecting tiering request for indices [{}] because they are because they are not remote store enabled.", notRemoteStoreBacked); + } + if (!closedIndices.isEmpty()) { + logger.warn("Rejecting tiering request for indices [{}] because they are closed.", closedIndices); + } + if (!redIndices.isEmpty()) { + logger.warn("Rejecting tiering request for indices [{}] because they are red.", redIndices); + } + + if(!indicesWithLargeShard.isEmpty()) { + logger.warn("Rejecting tiering request for indices [{}] because they exceed" + + " the warm tiering shard size limit of [{}] GiB.", indicesWithLargeShard, warmTieringMaxShardSize.getGb()); + } + + acceptedIndices.addAll(validateEligibleNodesCapacity(nodesStatsResponse, indicesStatsResponse, currentState, indices, 0)); + logger.info("Successfully accepted indices for tiering [{}]", acceptedIndices); + + return acceptedIndices.toArray(Index.EMPTY_ARRAY); + } + + /** + * Validates that the tiered remote index feature flag is enabled in the current cluster state. + * + * @param indexNames the names of the indices being validated + * @throws IllegalArgumentException if the tiered remote index feature flag is not enabled + */ + public static void validateTieredRemoteIndexFeatureFlag(final String indexNames) { + if (!FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX_SETTING)) { + final String errorMsg = "Rejecting tiering request for indices [" + indexNames + + "] because feature flag [opensearch.experimental.feature.tiered_remote_index.enabled] is not enabled"; + logger.warn(errorMsg); + throw new IllegalArgumentException(errorMsg); + } + } + + /** + * Validates that there are eligible nodes with the search role in the current cluster state. + * (only for the dedicated case - to be removed later) + * + * @param currentState the current cluster state + * @param indexNames the names of the indices being validated + * @throws IllegalArgumentException if there are no eligible search nodes in the cluster + */ + public static void validateSearchNodes(final ClusterState currentState, final String indexNames) { + if (getEligibleNodes(currentState).isEmpty()) { + final String errorMsg = "Rejecting tiering request for indices [" + indexNames + "] because there are no nodes found with the search role"; + logger.warn(errorMsg); + throw new IllegalArgumentException(errorMsg); + } + } + + /** + * Validates that the specified index has the remote store setting enabled. + * + * @param state the current cluster state + * @param index the index to be validated + * @return true if the remote store setting is enabled for the index, false otherwise + */ + public static boolean validateRemoteStoreIndex(final ClusterState state, final Index index) { + return IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(state.metadata().getIndexSafe(index).getSettings()); + } + + /** + * Validates that the specified index is in the "hot" tiering state. + * + * @param state the current cluster state + * @param index the index to be validated + * @return true if the index is in the "hot" tiering state, false otherwise + */ + public static boolean validateHotIndex(final ClusterState state, final Index index) { + return IndexModule.TieringState.HOT.name().equals(INDEX_TIERING_STATE.get(state.metadata().getIndexSafe(index).getSettings())); + } + + /** + * Validates the health of the specified index in the current cluster state. + * + * @param currentState the current cluster state + * @param index the index to be validated + * @return true if the index health is not in the "red" state, false otherwise + */ + public static boolean validateIndexHealth(final ClusterState currentState, final Index index) { + final IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index); + final IndexMetadata indexMetadata = currentState.metadata().index(index); + final ClusterIndexHealth indexHealth = new ClusterIndexHealth(indexMetadata, indexRoutingTable); + return !ClusterHealthStatus.RED.equals(indexHealth.getStatus()); + } + + /** + * Validates that the specified index is in the open state in the current cluster state. + * + * @param currentState the current cluster state + * @param index the index to be validated + * @return true if the index is in the open state, false otherwise + */ + public static boolean validateOpenIndex(final ClusterState currentState, final Index index) { + return currentState.metadata().index(index).getState() == IndexMetadata.State.OPEN; + } + + /** + * Validates that the disk threshold low watermark is not breached on any of the eligible nodes in the cluster. + * + * @param currentState the current cluster state + * @param nodesStats the current nodes statistics for the cluster + * @param bytesThresholdLow the low watermark threshold for disk usage, in bytes + * @throws IllegalArgumentException if the disk threshold low watermark is breached on all eligible nodes + */ + public static void validateDiskThresholdWaterMarkNotBreached(final ClusterState currentState, final NodesStatsResponse nodesStats, final ByteSizeValue bytesThresholdLow) { + final Set eligibleNodes = getEligibleNodes(currentState); + int count = 0; + for (NodeStats nodeStat : nodesStats.getNodes()) { + if (eligibleNodes.contains(nodeStat.getNode())) { + final long totalAvailBytesOnNode = Math.max(0, nodeStat.getFs().getTotal().getAvailable().getBytes()); + if (totalAvailBytesOnNode < bytesThresholdLow.getBytes()) { + logger.warn("Disk threshold low watermark breached on the node [{}]", nodeStat.getNode().getId()); + count++; + } + } + } + + if (count == eligibleNodes.size()) { + throw new IllegalArgumentException("Disk threshold low watermark breached on all the search nodes, hence no new index can be accepted for tiering "); + } + } + + /** + * Validates that the size of all shards in the specified index do not exceed the maximum shard size limit. + * + * @param indicesStats the current indices statistics for the cluster + * @param index the index to be validated + * @param warmTieringMaxShardSize the maximum allowed shard size for the warm tier + * @return true if all shards in the index are within the size limit, false otherwise + */ + public static boolean validateMaxShardSize(final IndicesStatsResponse indicesStats, final String index, + final ByteSizeValue warmTieringMaxShardSize) { + // Find all shards stats from the given request index + final ShardStats[] requestIndexShardStats = indicesStats.getIndex(index).getShards(); + + for (ShardStats shardStats : requestIndexShardStats) { + long currentShardBytes = Math.max(0, shardStats.getStats().store.getSizeInBytes()); + ByteSizeValue currentShardSize = new ByteSizeValue(currentShardBytes); + + // Check if the shard size exceed the max shard size limit + if (warmTieringMaxShardSize.compareTo(currentShardSize) < 0) { + return false; + } + } + return true; + } + + /** + * Validates the capacity of eligible nodes in the cluster to accommodate the specified indices. + * Best effort since nodes/index might fail to report their stats. + * + * @param nodesStats the current nodes statistics for the cluster + * @param indicesStats the current indices statistics for the cluster + * @param currentState the current cluster state + * @param indices the indices to be validated + * @param bufferSpace the amount of buffer space to reserve on each node + * @return a list of indices that can be accommodated by the eligible nodes + */ + public static List validateEligibleNodesCapacity(final NodesStatsResponse nodesStats, final IndicesStatsResponse indicesStats, + final ClusterState currentState, final Index[] indices, + final long bufferSpace) { + + final Set eligibleNodes = getEligibleNodes(currentState); + long totalAvailableBytesInWarmTier = getTotalAvailableBytesInWarmTier(nodesStats, eligibleNodes, bufferSpace); + + // It's possible that index stats is missing for the index to be migrated. + // Assume 0 size in that case, but still validate that we have at least some (i.e. > 0) headroom + // with the warm capacity. + Map indexSizes = new HashMap<>(); + Arrays.stream(indices) + .forEach(index-> { + IndexStats requestIndexStats = indicesStats.getIndex(index.getName()); + if (requestIndexStats != null) { + indexSizes.put(index, requestIndexStats.getPrimaries().store.sizeInBytes()); + } else { + indexSizes.put(index, 0L); + logger.warn("Couldn't get size of index [" + index + "], going to assume 0 size"); + } + }); + + if (indexSizes.values().stream().mapToLong(Long::longValue).sum() < totalAvailableBytesInWarmTier) { + return Arrays.asList(indices); + } + HashMap sortedIndexSizes = indexSizes.entrySet().stream() + .sorted(Map.Entry.comparingByValue()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, HashMap::new)); + + long requestIndexBytes = 0L; + List acceptedIndices = new ArrayList<>(); + List rejectedIndices = new ArrayList<>(); + for(Index index: sortedIndexSizes.keySet()) { + requestIndexBytes += sortedIndexSizes.get(index); + if (requestIndexBytes < totalAvailableBytesInWarmTier) { + acceptedIndices.add(index); + } else { + rejectedIndices.add(index.getName()); + } + } + if (!rejectedIndices.isEmpty()) { + logger.warn("Rejecting tiering request for indices [{}] because total available bytes is less than the " + + "requested bytes.", rejectedIndices); + } + return acceptedIndices; + } + + /** + * Calculates the total available bytes in the warm tier of the cluster. + * + * @param nodesStats the current nodes statistics for the cluster + * @param eligibleNodes the set of eligible nodes in the cluster + * @param bufferSpace the amount of buffer space to reserve on each node + * @return the total available bytes in the warm tier + */ + public static long getTotalAvailableBytesInWarmTier(final NodesStatsResponse nodesStats, final Set eligibleNodes, + final long bufferSpace) { + long totalAvailableBytes = 0; + for (NodeStats nodeStat : nodesStats.getNodes()) { + if (eligibleNodes.contains(nodeStat.getNode())) { + final long totalBytesOnNode = Math.max(0, nodeStat.getFs().getTotal().getTotal().getBytes()); + final long totalAvailBytesOnNode = Math.max(0, nodeStat.getFs().getTotal().getAvailable().getBytes()); + final long diskSpaceBufferBytes = Math.round(Math.min(bufferSpace, totalBytesOnNode * .2)); + totalAvailableBytes += Math.max(0, (totalAvailBytesOnNode - diskSpaceBufferBytes)); + } + } + return totalAvailableBytes; + } + + /** + * Retrieves the set of eligible(search) nodes from the current cluster state. + * + * @param currentState the current cluster state + * @return the set of eligible nodes + */ + public static Set getEligibleNodes(final ClusterState currentState) { + final Map nodes = currentState.getNodes().getDataNodes(); + return nodes.values().stream() + .filter(DiscoveryNode::isSearchNode) + .collect(Collectors.toSet()); + } +} diff --git a/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestTests.java b/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestTests.java new file mode 100644 index 0000000000000..246c37bdb556f --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestTests.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.tiering; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.tiering.TieringIndexRequest; +import org.opensearch.common.UUIDs; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.snapshots.Snapshot; +import org.opensearch.snapshots.SnapshotId; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.equalTo; + + +public class TieringRequestTests extends OpenSearchTestCase { + + public void testTieringRequestWithListOfIndices() { + TieringIndexRequest request = new TieringIndexRequest(); + request.indices("foo", "bar", "baz"); + request.indices("test").indicesOptions(IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())); + ActionRequestValidationException validationException = request.validate(); + assertNull(validationException); + } + + public void testTieringRequestWithIndexPattern() { + TieringIndexRequest request = new TieringIndexRequest(); + request.indices("foo-*"); + ActionRequestValidationException validationException = request.validate(); + assertNull(validationException); + } + + public void testTieringRequestWithNullOrEmptyIndices() { + TieringIndexRequest request = new TieringIndexRequest(); + ActionRequestValidationException validationException = request.validate(); + assertNotNull(validationException); + request.indices(); + validationException = request.validate(); + assertNotNull(validationException); + } + + public void testSerDeOfTieringRequest() throws IOException { + TieringIndexRequest request = new TieringIndexRequest("test"); + try(BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + try(StreamInput in = out.bytes().streamInput()) { + final TieringIndexRequest deserializedRequest = new TieringIndexRequest(in); + assertEquals(request, deserializedRequest); + } + } + } + + public void testTieringRequestEquals() { + final TieringIndexRequest original = new TieringIndexRequest("test"); + original.indicesOptions(IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())); + final TieringIndexRequest expected = new TieringIndexRequest(original.indices()); + expected.indicesOptions(original.indicesOptions()); + assertThat(expected, equalTo(original)); + assertThat(expected.indices(), equalTo(original.indices())); + assertThat(expected.indicesOptions(), equalTo(original.indicesOptions())); + } +} diff --git a/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestValidatorTests.java b/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestValidatorTests.java new file mode 100644 index 0000000000000..45108d14f32b9 --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestValidatorTests.java @@ -0,0 +1,358 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.tiering; + +import org.opensearch.Version; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.indices.stats.CommonStats; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.admin.indices.stats.IndicesStatsTests; +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.store.StoreStats; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.tiering.TieringServiceValidator; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import static java.util.Collections.emptyList; + +public class TieringRequestValidatorTests extends OpenSearchTestCase { + + private TieringServiceValidator validator = new TieringServiceValidator(); + + public void testValidateSearchNodes() { + + ClusterState clusterStateWithSearchNodes = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(createNodes(2, 0, 0)) + .build(); + + //throws no errors + validator.validateSearchNodes(clusterStateWithSearchNodes, "test_index"); + } + + public void testValidateSearchNodesThrowError() { + + ClusterState clusterStateWithNoSearchNodes = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(createNodes(0, 1, 1)) + .build(); + //throws error + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> validator.validateSearchNodes(clusterStateWithNoSearchNodes, "test")); + } + + public void testValidRemoteStoreIndex() { + + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + + ClusterState clusterState1 = buildClusterState(indexName, indexUuid, + Settings.builder() + .put(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .build() + ); + + assertTrue(validator.validateRemoteStoreIndex(clusterState1, new Index(indexName, indexUuid))); + } + + public void testNotRemoteStoreIndex() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + assertFalse(validator.validateRemoteStoreIndex(buildClusterState(indexName, indexUuid, Settings.EMPTY), new Index(indexName, indexUuid))); + } + + public void testValidHotIndex() { + + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + assertTrue(validator.validateHotIndex(buildClusterState(indexName, indexUuid, Settings.EMPTY), new Index(indexName, indexUuid))); + } + + public void testNotHotIndex() { + + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + + IndexModule.TieringState tieringState = randomBoolean() ? IndexModule.TieringState.HOT_TO_WARM : IndexModule.TieringState.WARM; + + ClusterState clusterState = buildClusterState(indexName, indexUuid, + Settings.builder() + .put(IndexModule.INDEX_TIERING_STATE.getKey(), tieringState) + .build() + ); + assertFalse(validator.validateHotIndex(clusterState, new Index(indexName, indexUuid))); + } + + public void testValidateIndexHealth() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + ClusterState clusterState = buildClusterState(indexName, indexUuid, Settings.EMPTY); + assertTrue(validator.validateIndexHealth(clusterState, new Index(indexName, indexUuid))); + } + + public void testValidOpenIndex() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + assertTrue(validator.validateOpenIndex(buildClusterState(indexName, indexUuid, Settings.EMPTY), new Index(indexName, indexUuid))); + } + + public void testNotValidOpenIndex() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + assertFalse(validator.validateOpenIndex(buildClusterState(indexName, indexUuid, Settings.EMPTY, IndexMetadata.State.CLOSE), new Index(indexName, indexUuid))); + } + + public void testValidateDiskThresholdWaterMarkNotBreached() { + DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), Version.CURRENT); + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(DiscoveryNodes.builder() + .add(node1) + .add(node2) + .build()) + .build(); + + NodesStatsResponse nodesStatsResponse = createNodesStatsResponse(List.of(node1, node2), clusterState.getClusterName(), 100, 90, 100); + // throws no error + validator.validateDiskThresholdWaterMarkNotBreached(clusterState, nodesStatsResponse, new ByteSizeValue(20)); + } + + public void testValidateDiskThresholdWaterMarkNotBreachedThrowsError() { + DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), Version.CURRENT); + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(DiscoveryNodes.builder() + .add(node1) + .add(node2) + .build()) + .build(); + + NodesStatsResponse nodesStatsResponse = createNodesStatsResponse(List.of(node1, node2), clusterState.getClusterName(), 100, 90, 20); + // throws error + expectThrows(IllegalArgumentException.class, () -> validator.validateDiskThresholdWaterMarkNotBreached(clusterState, nodesStatsResponse, new ByteSizeValue(100))); + } + + public void testValidateMaxShardSize() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + + ClusterState clusterState = buildClusterState(indexName, indexUuid, Settings.EMPTY); + ShardRouting[] shardRoutings = clusterState.routingTable().allShards(indexName) + .stream() + .filter(ShardRouting::primary) + .toArray(ShardRouting[]::new); + + Index index = new Index(indexName, indexUuid); + // Setting shard per size as 10 mb + long sizePerShard = 10 * 1024 * 1024; + IndicesStatsResponse indicesStatsResponse = createIndicesStatsResponse(index, clusterState, sizePerShard); + assertTrue(validator.validateMaxShardSize(indicesStatsResponse, indexName, new ByteSizeValue(100, ByteSizeUnit.MB))); + } + + public void testValidateMaxShardSizeViolated() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + + ClusterState clusterState = buildClusterState(indexName, indexUuid, Settings.EMPTY); + ShardRouting[] shardRoutings = clusterState.routingTable().allShards(indexName) + .stream() + .filter(ShardRouting::primary) + .toArray(ShardRouting[]::new); + + Index index = new Index(indexName, indexUuid); + // Setting shard per size as 100 mb + long sizePerShard = 100 * 1024 * 1024; + IndicesStatsResponse indicesStatsResponse = createIndicesStatsResponse(index, clusterState, sizePerShard); + assertFalse(validator.validateMaxShardSize(indicesStatsResponse, indexName, new ByteSizeValue(50, ByteSizeUnit.MB))); + } + + public void testValidateEligibleNodesCapacity() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + + DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), Version.CURRENT); + ClusterState clusterState = ClusterState.builder(buildClusterState(indexName, indexUuid, Settings.EMPTY)) + .nodes(DiscoveryNodes.builder() + .add(node1) + .add(node2) + .build()) + .build(); + + NodesStatsResponse nodesStatsResponse = createNodesStatsResponse(List.of(node1, node2), clusterState.getClusterName(), 100, 90, 100); + Index index = new Index(indexName, indexUuid); + // Setting shard per size as 10 b + long sizePerShard = 10; + IndicesStatsResponse indicesStatsResponse = createIndicesStatsResponse(index, clusterState, sizePerShard); + Index[] indices = new Index[]{index}; + List acceptedIndices = validator.validateEligibleNodesCapacity(nodesStatsResponse, indicesStatsResponse, clusterState, indices, 0); + + assertEquals(indices.length, acceptedIndices.size()); + assertEquals(Arrays.asList(indices), acceptedIndices); + + nodesStatsResponse = createNodesStatsResponse(List.of(node1, node2), clusterState.getClusterName(), 100, 90, 100); + // Setting shard per size as 10 mb + sizePerShard = 10 * 1024; + indicesStatsResponse = createIndicesStatsResponse(index, clusterState, sizePerShard); + acceptedIndices = validator.validateEligibleNodesCapacity(nodesStatsResponse, indicesStatsResponse, clusterState, indices, 0); + + assertEquals(0, acceptedIndices.size()); + } + + public void testGetTotalAvailableBytesInWarmTier() { + DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), Version.CURRENT); + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(DiscoveryNodes.builder() + .add(node1) + .add(node2) + .build()) + .build(); + NodesStatsResponse nodesStatsResponse = createNodesStatsResponse(List.of(node1, node2), clusterState.getClusterName(), 100, 90, 100); + assertEquals(200, validator.getTotalAvailableBytesInWarmTier(nodesStatsResponse, Set.of(node1, node2), 0)); + } + + public void testEligibleNodes() { + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(createNodes(2, 0, 0)) + .build(); + + assertEquals(2, validator.getEligibleNodes(clusterState).size()); + + clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(createNodes(0, 1, 1)) + .build(); + assertEquals(0, validator.getEligibleNodes(clusterState).size()); + } + + private static ClusterState buildClusterState(String indexName, String indexUuid, Settings settings) { + return buildClusterState(indexName, indexUuid, settings, IndexMetadata.State.OPEN); + } + + private static ClusterState buildClusterState(String indexName, String indexUuid, Settings settings, IndexMetadata.State state) { + Settings combinedSettings = Settings.builder() + .put(settings) + .put(createDefaultIndexSettings(indexUuid)) + .build(); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder(indexName).settings(combinedSettings).state(state)) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metadata.index(indexName)) + .build(); + + return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + } + + private static Settings createDefaultIndexSettings(String indexUuid) { + return Settings.builder() + .put("index.version.created", Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, indexUuid) + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 2) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .build(); + } + + private DiscoveryNodes createNodes(int numOfSearchNodes, int numOfDataNodes, int numOfIngestNodes) { + DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder(); + for (int i = 0; i < numOfSearchNodes; i++) { + discoveryNodesBuilder.add(new DiscoveryNode("node-s" + i, buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), Version.CURRENT)); + } + for (int i = 0; i < numOfDataNodes; i++) { + discoveryNodesBuilder.add(new DiscoveryNode("node-d" + i, buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.singleton(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT)); + } + for (int i = 0; i < numOfIngestNodes; i++) { + discoveryNodesBuilder.add(new DiscoveryNode("node-i" + i, buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.singleton(DiscoveryNodeRole.INGEST_ROLE), Version.CURRENT)); + } + return discoveryNodesBuilder.build(); + } + + private ShardStats[] createShardStats(ShardRouting[] shardRoutings, long sizePerShard, Index index) { + ShardStats[] shardStats = new ShardStats[shardRoutings.length]; + for (int i = 1; i <= shardRoutings.length; i++) { + ShardRouting shardRouting = shardRoutings[i-1]; + Path path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve(String.valueOf(shardRouting.id())); + CommonStats commonStats = new CommonStats(); + commonStats.store = new StoreStats(sizePerShard, 0); + shardStats[i-1] = new ShardStats(shardRouting, new ShardPath(false, path, path, shardRouting.shardId()), + commonStats, null, null, null); + } + return shardStats; + } + + private IndicesStatsResponse createIndicesStatsResponse(Index index, ClusterState clusterState, long sizePerShard) { + ShardRouting[] shardRoutings = clusterState.routingTable().allShards(index.getName()) + .stream() + .filter(ShardRouting::primary) + .toArray(ShardRouting[]::new); + + ShardStats[] shardStats = createShardStats(shardRoutings, sizePerShard, index); + return IndicesStatsTests.newIndicesStatsResponse(shardStats, + shardStats.length, + shardStats.length, + 0, + emptyList()); + } + private NodesStatsResponse createNodesStatsResponse(List nodes, ClusterName clusterName, long total, long free, + long available) { + FsInfo fsInfo = new FsInfo(0, + null, + new FsInfo.Path[]{new FsInfo.Path("/path", "/dev/sda", total, free, available)} + ); + List nodeStats = new ArrayList<>(); + for (DiscoveryNode node : nodes) { + nodeStats.add(new NodeStats(node, 0, null, null, null, + null, null, fsInfo, null, null, null, null, + null, null, null, null, null, + null, null, null, null, + null, null, null, null, null, + null, null, null)); + } + return new NodesStatsResponse(clusterName, nodeStats, new ArrayList<>()); + } +}