From 823b4871c129a4f52cc12951585306a2e8e6ae60 Mon Sep 17 00:00:00 2001 From: Lakshya Taragi Date: Mon, 18 Mar 2024 16:13:09 +0530 Subject: [PATCH] Index Setting Changes Signed-off-by: Lakshya Taragi --- .../RemoteStoreMigrationAllocationIT.java | 573 ++++++++++++++++++ .../RemoteStoreMigrationSettingsUpdateIT.java | 125 ++++ .../cluster/metadata/IndexMetadata.java | 2 + .../metadata/MetadataCreateIndexService.java | 53 +- .../remotestore/RemoteStoreNodeService.java | 15 + .../MetadataCreateIndexServiceTests.java | 132 ++++ 6 files changed, 899 insertions(+), 1 deletion(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationAllocationIT.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationSettingsUpdateIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationAllocationIT.java new file mode 100644 index 0000000000000..5fcb6939184aa --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationAllocationIT.java @@ -0,0 +1,573 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotemigration; + +import org.opensearch.action.admin.cluster.allocation.ClusterAllocationExplanation; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.support.ActiveShardCount; +import org.opensearch.client.Client; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; +import org.opensearch.cluster.routing.allocation.MoveDecision; +import org.opensearch.cluster.routing.allocation.NodeAllocationResult; +import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.MIXED; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.NONE; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.REMOTE_STORE; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) +public class RemoteStoreMigrationAllocationIT extends MigrationBaseTestCase { + + public static final String TEST_INDEX = "test_index"; + public static final String NAME = "remote_store_migration"; + + private static final ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + private Client client; + + // tests for primary shard copy allocation with MIXED mode and REMOTE_STORE direction + + public void testAllocateNewPrimaryShardForMixedModeAndRemoteStoreDirection() throws Exception { + logger.info(" --> initialize cluster"); + initializeCluster(false); + + logger.info(" --> add remote and non-remote nodes"); + setClusterMode(MIXED.mode); + addRemote = true; + String remoteNodeName = internalCluster().startNode(); + addRemote = false; + String nonRemoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode remoteNode = assertNodeInCluster(remoteNodeName); + DiscoveryNode nonRemoteNode = assertNodeInCluster(nonRemoteNodeName); + + logger.info(" --> verify expected decision for allocating a new primary shard on a non-remote node"); + prepareIndexWithoutReplica(Optional.empty()); + + logger.info(" --> set remote_store direction"); + setDirection(REMOTE_STORE.direction); + + Decision decision = getDecisionForTargetNode(nonRemoteNode, true, true, false); + assertEquals(Decision.Type.NO, decision.type()); + assertEquals( + "[remote_store migration_direction]: primary shard copy can not be allocated to a non-remote node", + decision.getExplanation().toLowerCase(Locale.ROOT) + ); + + logger.info(" --> attempt allocation on non-remote node"); + attemptAllocation(nonRemoteNodeName); + + logger.info(" --> verify non-allocation of primary shard on non-remote node"); + assertNonAllocation(true); + + logger.info(" --> verify expected decision for allocating a new primary shard on a remote node"); + prepareDecisions(); + decision = getDecisionForTargetNode(remoteNode, true, true, false); + assertEquals(Decision.Type.YES, decision.type()); + assertEquals( + "[remote_store migration_direction]: primary shard copy can be allocated to a remote node", + decision.getExplanation().toLowerCase(Locale.ROOT) + ); + + logger.info(" --> attempt allocation on remote node"); + attemptAllocation(remoteNodeName); + ensureGreen(TEST_INDEX); + + logger.info(" --> verify allocation of primary shard"); + assertAllocation(true, remoteNode); + } + + // tests for replica shard copy allocation with MIXED mode and REMOTE_STORE direction + + public void testDontAllocateNewReplicaShardOnRemoteNodeIfPrimaryShardOnNonRemoteNodeForMixedModeAndRemoteStoreDirection() + throws Exception { + logger.info(" --> initialize cluster"); + initializeCluster(false); + + logger.info(" --> add remote and non-remote nodes"); + setClusterMode(MIXED.mode); + addRemote = false; + String nonRemoteNodeName = internalCluster().startNode(); + addRemote = true; + String remoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode nonRemoteNode = assertNodeInCluster(nonRemoteNodeName); + DiscoveryNode remoteNode = assertNodeInCluster(remoteNodeName); + + logger.info(" --> allocate primary shard on non-remote node"); + prepareIndexWithAllocatedPrimary(nonRemoteNode, Optional.empty()); + + logger.info(" --> set remote_store direction"); + setDirection(REMOTE_STORE.direction); + + logger.info(" --> verify expected decision for replica shard"); + prepareDecisions(); + Decision decision = getDecisionForTargetNode(remoteNode, false, true, false); + assertEquals(Decision.Type.NO, decision.type()); + assertEquals( + "[remote_store migration_direction]: replica shard copy can not be allocated to a remote node since primary shard copy is not yet migrated to remote", + decision.getExplanation().toLowerCase(Locale.ROOT) + ); + + logger.info(" --> attempt allocation of replica shard on remote node"); + attemptAllocation(remoteNodeName); + + logger.info(" --> verify non-allocation of replica shard"); + assertNonAllocation(false); + } + + public void testAllocateNewReplicaShardOnRemoteNodeIfPrimaryShardOnRemoteNodeForMixedModeAndRemoteStoreDirection() throws Exception { + logger.info(" --> initialize cluster"); + initializeCluster(false); + + logger.info(" --> set mixed cluster compatibility mode"); + setClusterMode(MIXED.mode); + + logger.info(" --> add remote and non-remote nodes"); + addRemote = true; + String remoteNodeName1 = internalCluster().startNode(); + String remoteNodeName2 = internalCluster().startNode(); + addRemote = false; + String nonRemoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode remoteNode1 = assertNodeInCluster(remoteNodeName1); + DiscoveryNode remoteNode2 = assertNodeInCluster(remoteNodeName2); + DiscoveryNode nonRemoteNode = assertNodeInCluster(nonRemoteNodeName); + + logger.info(" --> allocate primary shard on remote node"); + prepareIndexWithAllocatedPrimary(remoteNode1, Optional.empty()); + + logger.info(" --> set remote_store direction"); + setDirection(REMOTE_STORE.direction); + + logger.info(" --> verify expected decision for replica shard"); + prepareDecisions(); + Decision decision = getDecisionForTargetNode(remoteNode2, false, true, false); + assertEquals(Decision.Type.YES, decision.type()); + assertEquals( + "[remote_store migration_direction]: replica shard copy can be allocated to a remote node since primary shard copy has been migrated to remote", + decision.getExplanation().toLowerCase(Locale.ROOT) + ); + } + + public void testAllocateNewReplicaShardOnNonRemoteNodeIfPrimaryShardOnNonRemoteNodeForMixedModeAndRemoteStoreDirection() + throws Exception { + logger.info(" --> initialize cluster"); + initializeCluster(false); + + logger.info(" --> add remote and non-remote nodes"); + setClusterMode(MIXED.mode); + addRemote = true; + String remoteNodeName = internalCluster().startNode(); + addRemote = false; + String nonRemoteNodeName1 = internalCluster().startNode(); + String nonRemoteNodeName2 = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode remoteNode = assertNodeInCluster(remoteNodeName); + DiscoveryNode nonRemoteNode1 = assertNodeInCluster(nonRemoteNodeName1); + DiscoveryNode nonRemoteNode2 = assertNodeInCluster(nonRemoteNodeName2); + + logger.info(" --> allocate primary shard on non-remote node"); + prepareIndexWithAllocatedPrimary(nonRemoteNode1, Optional.empty()); + + logger.info(" --> set remote_store direction"); + setDirection(REMOTE_STORE.direction); + + logger.info(" --> verify expected decision for replica shard"); + prepareDecisions(); + Decision decision = getDecisionForTargetNode(nonRemoteNode2, false, true, false); + Decision.Type expectedType = Decision.Type.YES; + String expectedReason = "[remote_store migration_direction]: replica shard copy can be allocated to a non-remote node"; + + assertEquals(expectedType, decision.type()); + assertEquals(expectedReason, decision.getExplanation().toLowerCase(Locale.ROOT)); + + logger.info(" --> allocate replica shard on the other non-remote node"); + attemptAllocation(nonRemoteNodeName2); + ensureGreen(TEST_INDEX); + + logger.info(" --> verify allocation of replica shard"); + assertAllocation(false, nonRemoteNode2); + } + + public void testAllocateNewReplicaShardOnNonRemoteNodeIfPrimaryShardOnRemoteNodeForMixedModeAndRemoteStoreDirection() throws Exception { + logger.info(" --> initialize cluster"); + initializeCluster(false); + + logger.info(" --> set mixed cluster compatibility mode"); + + logger.info(" --> add remote and non-remote nodes"); + setClusterMode(MIXED.mode); + addRemote = false; + String nonRemoteNodeName = internalCluster().startNode(); + addRemote = true; + String remoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode nonRemoteNode = assertNodeInCluster(nonRemoteNodeName); + DiscoveryNode remoteNode = assertNodeInCluster(remoteNodeName); + + logger.info(" --> allocate primary on remote node"); + prepareIndexWithAllocatedPrimary(remoteNode, Optional.empty()); + + logger.info(" --> set remote_store direction"); + setDirection(REMOTE_STORE.direction); + + logger.info(" --> verify expected decision for replica shard"); + prepareDecisions(); + Decision decision = getDecisionForTargetNode(nonRemoteNode, false, true, false); + + Decision.Type expectedType = Decision.Type.YES; + assertEquals(expectedType, decision.type()); + assertEquals( + "[remote_store migration_direction]: replica shard copy can be allocated to a non-remote node", + decision.getExplanation().toLowerCase(Locale.ROOT) + ); + } + + // test for STRICT mode + + public void testAlwaysAllocateNewShardForStrictMode() throws Exception { + boolean isRemoteCluster = randomBoolean(); + boolean isReplicaAllocation = randomBoolean(); + + logger.info(" --> initialize cluster and add nodes"); + List nodes = new ArrayList<>(); + if (isRemoteCluster) { + initializeCluster(true); + + logger.info(" --> add remote nodes"); + String remoteNodeName1 = internalCluster().startNode(); + String remoteNodeName2 = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode remoteNode1 = assertNodeInCluster(remoteNodeName1); + DiscoveryNode remoteNode2 = assertNodeInCluster(remoteNodeName2); + nodes.add(remoteNode1); + nodes.add(remoteNode2); + } else { + initializeCluster(false); + setClusterMode(STRICT.mode); + addRemote = false; + String nonRemoteNodeName1 = internalCluster().startNode(); + String nonRemoteNodeName2 = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode nonRemoteNode1 = assertNodeInCluster(nonRemoteNodeName1); + DiscoveryNode nonRemoteNode2 = assertNodeInCluster(nonRemoteNodeName2); + nodes.add(nonRemoteNode1); + nodes.add(nonRemoteNode2); + } + + logger.info(" --> verify expected decision for allocating a new shard on a non-remote node"); + if (isReplicaAllocation) { + prepareIndexWithAllocatedPrimary(nodes.get(0), Optional.empty()); + } else { + prepareIndexWithoutReplica(Optional.empty()); + } + + logger.info(" --> set remote_store direction"); + setDirection(REMOTE_STORE.direction); + + DiscoveryNode targetNode = isReplicaAllocation ? nodes.get(1) : nodes.get(0); + + assertEquals( + (isRemoteCluster ? "true" : null), + client.admin() + .cluster() + .prepareState() + .execute() + .actionGet() + .getState() + .getMetadata() + .index(TEST_INDEX) + .getSettings() + .get(SETTING_REMOTE_STORE_ENABLED) + ); + + prepareDecisions(); + Decision decision = getDecisionForTargetNode(targetNode, !isReplicaAllocation, true, false); + assertEquals(Decision.Type.YES, decision.type()); + String expectedReason = String.format( + Locale.ROOT, + "[remote_store migration_direction]: %s shard copy can be allocated to a %s node for strict compatibility mode", + (isReplicaAllocation ? "replica" : "primary"), + (isRemoteCluster ? "remote" : "non-remote") + ); + assertEquals(expectedReason, decision.getExplanation().toLowerCase(Locale.ROOT)); + + logger.info(" --> attempt allocation"); + attemptAllocation(targetNode.getName()); + ensureGreen(TEST_INDEX); + + logger.info(" --> verify allocation of primary shard"); + assertAllocation(!isReplicaAllocation, targetNode); + } + + // test for remote store backed index + + public void testDontAllocateToNonRemoteNodeForRemoteStoreBackedIndex() throws Exception { + logger.info(" --> initialize cluster with remote master node"); + initializeCluster(true); + + logger.info(" --> add remote and non-remote nodes"); + String remoteNodeName = internalCluster().startNode(); + setClusterMode(MIXED.mode); + addRemote = false; + String nonRemoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode remoteNode = assertNodeInCluster(remoteNodeName); + DiscoveryNode nonRemoteNode = assertNodeInCluster(nonRemoteNodeName); + + boolean isReplicaAllocation = randomBoolean(); + + logger.info(" --> verify expected decision for allocating a new shard on a non-remote node"); + if (isReplicaAllocation) { + prepareIndexWithAllocatedPrimary(remoteNode, Optional.empty()); + } else { + prepareIndexWithoutReplica(Optional.empty()); + } + + assertEquals( + "true", + client.admin() + .cluster() + .prepareState() + .execute() + .actionGet() + .getState() + .getMetadata() + .index(TEST_INDEX) + .getSettings() + .get(SETTING_REMOTE_STORE_ENABLED) + ); + + setDirection(REMOTE_STORE.direction); + prepareDecisions(); + Decision decision = getDecisionForTargetNode(nonRemoteNode, !isReplicaAllocation, false, false); + assertEquals(Decision.Type.NO, decision.type()); + String expectedReason = String.format( + Locale.ROOT, + "[remote_store migration_direction]: %s shard copy can not be allocated to a non-remote node because a remote store backed index's shard copy can only be allocated to a remote node", + (isReplicaAllocation ? "replica" : "primary") + ); + assertEquals(expectedReason, decision.getExplanation().toLowerCase(Locale.ROOT)); + + logger.info(" --> attempt allocation of shard on non-remote node"); + attemptAllocation(nonRemoteNodeName); + + logger.info(" --> verify non-allocation of shard"); + assertNonAllocation(!isReplicaAllocation); + } + + // bootstrap a cluster + public void initializeCluster(boolean remoteClusterManager) { + addRemote = remoteClusterManager; + internalCluster().setBootstrapClusterManagerNodeIndex(0); + internalCluster().startNodes(1); + client = internalCluster().client(); + setClusterMode(STRICT.mode); + setDirection(NONE.direction); + } + + // set the compatibility mode of cluster [strict, mixed] + public static void setClusterMode(String mode) { + updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), mode)); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + } + + // set the migration direction for cluster [remote_store, docrep, none] + public static void setDirection(String direction) { + updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), direction)); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + } + + // verify that the given nodeName exists in cluster + public static DiscoveryNode assertNodeInCluster(String nodeName) { + Map nodes = internalCluster().client().admin().cluster().prepareState().get().getState().nodes().getNodes(); + DiscoveryNode discoveryNode = null; + for (Map.Entry entry : nodes.entrySet()) { + DiscoveryNode node = entry.getValue(); + if (node.getName().equals(nodeName)) { + discoveryNode = node; + break; + } + } + assertNotNull(discoveryNode); + return discoveryNode; + } + + // returns a comma-separated list of node names excluding `except` + public static String allNodesExcept(String except) { + StringBuilder exclude = new StringBuilder(); + DiscoveryNodes allNodes = internalCluster().client().admin().cluster().prepareState().get().getState().nodes(); + for (DiscoveryNode node : allNodes) { + if (node.getName().equals(except) == false) { + exclude.append(node.getName()).append(","); + } + } + return exclude.toString(); + } + + // obtain decision for allocation/relocation of a shard to a given node + private Decision getDecisionForTargetNode( + DiscoveryNode targetNode, + boolean isPrimary, + boolean includeYesDecisions, + boolean isRelocation + ) { + ClusterAllocationExplanation explanation = client.admin() + .cluster() + .prepareAllocationExplain() + .setIndex(TEST_INDEX) + .setShard(0) + .setPrimary(isPrimary) + .setIncludeYesDecisions(includeYesDecisions) + .get() + .getExplanation(); + + Decision requiredDecision = null; + List nodeAllocationResults; + if (isRelocation) { + MoveDecision moveDecision = explanation.getShardAllocationDecision().getMoveDecision(); + nodeAllocationResults = moveDecision.getNodeDecisions(); + } else { + AllocateUnassignedDecision allocateUnassignedDecision = explanation.getShardAllocationDecision().getAllocateDecision(); + nodeAllocationResults = allocateUnassignedDecision.getNodeDecisions(); + } + + for (NodeAllocationResult nodeAllocationResult : nodeAllocationResults) { + if (nodeAllocationResult.getNode().equals(targetNode)) { + for (Decision decision : nodeAllocationResult.getCanAllocateDecision().getDecisions()) { + if (decision.label().equals(NAME)) { + requiredDecision = decision; + break; + } + } + } + } + + assertNotNull(requiredDecision); + return requiredDecision; + } + + // create a new test index + public static void prepareIndexWithoutReplica(Optional name) { + String indexName = name.orElse(TEST_INDEX); + internalCluster().client() + .admin() + .indices() + .prepareCreate(indexName) + .setSettings( + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.routing.allocation.exclude._name", allNodesExcept(null)) + ) + .execute() + .actionGet(); + } + + public void prepareIndexWithAllocatedPrimary(DiscoveryNode primaryShardNode, Optional name) { + String indexName = name.orElse(TEST_INDEX); + client.admin() + .indices() + .prepareCreate(indexName) + .setSettings( + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put("index.routing.allocation.include._name", primaryShardNode.getName()) + .put("index.routing.allocation.exclude._name", allNodesExcept(primaryShardNode.getName())) + ) + .setWaitForActiveShards(ActiveShardCount.ONE) + .execute() + .actionGet(); + + ensureYellowAndNoInitializingShards(TEST_INDEX); + + logger.info(" --> verify allocation of primary shard"); + assertAllocation(true, primaryShardNode); + + logger.info(" --> verify non-allocation of replica shard"); + assertNonAllocation(false); + } + + // get allocation and relocation decisions for all nodes + private void prepareDecisions() { + client.admin() + .indices() + .prepareUpdateSettings(TEST_INDEX) + .setSettings(Settings.builder().put("index.routing.allocation.exclude._name", allNodesExcept(null))) + .execute() + .actionGet(); + } + + private void attemptAllocation(String targetNodeName) { + client.admin() + .indices() + .prepareUpdateSettings(TEST_INDEX) + .setSettings( + Settings.builder() + .put("index.routing.allocation.include._name", targetNodeName) + .put("index.routing.allocation.exclude._name", allNodesExcept(targetNodeName)) + ) + .execute() + .actionGet(); + } + + private ShardRouting getShardRouting(boolean isPrimary) { + IndexShardRoutingTable table = client.admin() + .cluster() + .prepareState() + .execute() + .actionGet() + .getState() + .getRoutingTable() + .index(TEST_INDEX) + .shard(0); + return (isPrimary ? table.primaryShard() : table.replicaShards().get(0)); + } + + // verify that shard does not exist at targetNode + private void assertNonAllocation(boolean isPrimary) { + if (isPrimary) { + ensureRed(TEST_INDEX); + } else { + ensureYellowAndNoInitializingShards(TEST_INDEX); + } + ShardRouting shardRouting = getShardRouting(isPrimary); + assertFalse(shardRouting.active()); + assertNull(shardRouting.currentNodeId()); + assertEquals(ShardRoutingState.UNASSIGNED, shardRouting.state()); + } + + // verify that shard exists at targetNode + private void assertAllocation(boolean isPrimary, DiscoveryNode targetNode) { + ShardRouting shardRouting = getShardRouting(isPrimary); + assertTrue(shardRouting.active()); + assertNotNull(shardRouting.currentNodeId()); + assertEquals(shardRouting.currentNodeId(), targetNode.getId()); + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationSettingsUpdateIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationSettingsUpdateIT.java new file mode 100644 index 0000000000000..f93080b63c8d9 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationSettingsUpdateIT.java @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotemigration; + +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.index.IndexSettings; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.Optional; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.MIXED; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.NONE; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.REMOTE_STORE; +import static org.opensearch.remotemigration.RemoteStoreMigrationAllocationIT.TEST_INDEX; +import static org.opensearch.remotemigration.RemoteStoreMigrationAllocationIT.assertNodeInCluster; +import static org.opensearch.remotemigration.RemoteStoreMigrationAllocationIT.prepareIndexWithoutReplica; +import static org.opensearch.remotemigration.RemoteStoreMigrationAllocationIT.setClusterMode; +import static org.opensearch.remotemigration.RemoteStoreMigrationAllocationIT.setDirection; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) +public class RemoteStoreMigrationSettingsUpdateIT extends MigrationBaseTestCase { + + private Client client; + + // remote store backed index setting tests + + public void testNewIndexIsRemoteStoreBackedForRemoteStoreDirectionAndMixedMode() { + logger.info(" --> initialize cluster: gives non remote cluster manager"); + initializeCluster(false); + + String indexName1 = "test_index_1"; + String indexName2 = "test_index_2"; + + logger.info(" --> add non-remote node"); + addRemote = false; + String nonRemoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + assertNodeInCluster(nonRemoteNodeName); + + logger.info(" --> create an index"); + prepareIndexWithoutReplica(Optional.of(indexName1)); + + logger.info(" --> verify that non remote-backed index is created"); + assertNonRemoteStoreBackedIndex(indexName1); + + logger.info(" --> set mixed cluster compatibility mode and remote_store direction"); + setClusterMode(MIXED.mode); + setDirection(REMOTE_STORE.direction); + + logger.info(" --> add remote node"); + addRemote = true; + String remoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + assertNodeInCluster(remoteNodeName); + + logger.info(" --> create another index"); + prepareIndexWithoutReplica(Optional.of(indexName2)); + + logger.info(" --> verify that remote backed index is created"); + assertRemoteStoreBackedIndex(indexName2); + } + + // verify that the created index is not remote store backed + private void assertNonRemoteStoreBackedIndex(String indexName) { + Settings indexSettings = client.admin().indices().prepareGetIndex().execute().actionGet().getSettings().get(indexName); + assertEquals(ReplicationType.DOCUMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE)); + assertNull(indexSettings.get(SETTING_REMOTE_STORE_ENABLED)); + assertNull(indexSettings.get(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY)); + assertNull(indexSettings.get(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY)); + } + + // verify that the created index is remote store backed + private void assertRemoteStoreBackedIndex(String indexName) { + Settings indexSettings = client.admin().indices().prepareGetIndex().execute().actionGet().getSettings().get(indexName); + assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE)); + assertEquals("true", indexSettings.get(SETTING_REMOTE_STORE_ENABLED)); + assertEquals(REPOSITORY_NAME, indexSettings.get(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY)); + assertEquals(REPOSITORY_2_NAME, indexSettings.get(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY)); + assertEquals( + IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, + INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(indexSettings) + ); + } + + // restore indices from a snapshot + private void restoreSnapshot(String snapshotRepoName, String snapshotName, String restoredIndexName) { + RestoreSnapshotResponse restoreSnapshotResponse = client.admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepoName, snapshotName) + .setWaitForCompletion(false) + .setIndices(TEST_INDEX) + .setRenamePattern(TEST_INDEX) + .setRenameReplacement(restoredIndexName) + .get(); + + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); + ensureGreen(restoredIndexName); + } + + private void initializeCluster(boolean remoteClusterManager) { + addRemote = remoteClusterManager; + internalCluster().setBootstrapClusterManagerNodeIndex(0); + internalCluster().startNodes(1); + client = internalCluster().client(); + setClusterMode(STRICT.mode); + setDirection(NONE.direction); + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 80b78cfe154f1..bb0e10c7035bd 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -319,6 +319,8 @@ public Iterator> settings() { Property.Final ); + public static final String SETTING_REMOTE_STORE_PREFIX = "index.remote_store."; + public static final String SETTING_REMOTE_STORE_ENABLED = "index.remote_store.enabled"; public static final String SETTING_REMOTE_SEGMENT_STORE_REPOSITORY = "index.remote_store.segment.repository"; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index 451871b10d5eb..6c912a49ee5c4 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -52,6 +52,7 @@ import org.opensearch.cluster.block.ClusterBlock; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; @@ -144,6 +145,7 @@ import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.isMigratingToRemoteStore; /** * Service responsible for submitting create index requests @@ -950,6 +952,7 @@ static Settings aggregateIndexSettings( updateReplicationStrategy(indexSettingsBuilder, request.settings(), settings, combinedTemplateSettings); updateRemoteStoreSettings(indexSettingsBuilder, settings); + updateRemoteStoreSettingsForMigration(indexSettingsBuilder, currentState, clusterSettings, request.index()); if (sourceMetadata != null) { assert request.resizeType() != null; @@ -1029,7 +1032,7 @@ private static void updateReplicationStrategy( * @param settingsBuilder index settings builder to be updated with relevant settings * @param clusterSettings cluster level settings */ - private static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, Settings clusterSettings) { + public static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, Settings clusterSettings) { if (isRemoteDataAttributePresent(clusterSettings)) { settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true) .put( @@ -1047,6 +1050,54 @@ private static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, } } + /** + * Updates index settings to enable remote store by default for migration towards remote store backed clusters + * @param settingsBuilder index settings builder to be updated with relevant settings + * @param clusterState state of cluster + * @param clusterSettings cluster level settings + */ + public static void updateRemoteStoreSettingsForMigration( + Settings.Builder settingsBuilder, + ClusterState clusterState, + ClusterSettings clusterSettings, + String indexName + ) { + String value = settingsBuilder.get(SETTING_REMOTE_STORE_ENABLED); + if (value != null && value.toLowerCase(Locale.ROOT).equals("true")) { + return; + } + + if (isMigratingToRemoteStore(clusterSettings)) { + String segmentRepo, translogRepo; + Optional remoteNode = clusterState.nodes() + .getNodes() + .values() + .stream() + .filter(DiscoveryNode::isRemoteStoreNode) + .findFirst(); + if (remoteNode.isPresent()) { + translogRepo = remoteNode.get() + .getAttributes() + .get(RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY); + segmentRepo = remoteNode.get() + .getAttributes() + .get(RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY); + if (segmentRepo != null && translogRepo != null) { + settingsBuilder.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(SETTING_REMOTE_STORE_ENABLED, true) + .put(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, segmentRepo) + .put(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, translogRepo); + } else { + ValidationException validationException = new ValidationException(); + validationException.addValidationErrors( + Collections.singletonList("Cluster is migrating to remote store but no remote node found, failing index creation") + ); + throw new IndexCreationException(indexName, validationException); + } + } + } + } + public static void validateStoreTypeSettings(Settings settings) { // deprecate simplefs store type: if (IndexModule.Type.SIMPLEFS.match(IndexModule.INDEX_STORE_TYPE_SETTING.get(settings))) { diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java index 33b182dd3cc97..b9b956e4870ca 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java @@ -14,6 +14,7 @@ import org.opensearch.cluster.metadata.RepositoriesMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.util.FeatureFlags; import org.opensearch.repositories.RepositoriesService; @@ -223,4 +224,18 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode return existingRepositories; } } + + /** + * To check if the cluster is undergoing remote store migration + * @param clusterSettings cluster level settings + * @return + * true For REMOTE_STORE migration direction and MIXED compatibility mode, + * false otherwise + */ + public static boolean isMigratingToRemoteStore(ClusterSettings clusterSettings) { + boolean isMixedMode = clusterSettings.get(REMOTE_STORE_COMPATIBILITY_MODE_SETTING).equals(CompatibilityMode.MIXED); + boolean isRemoteStoreMigrationDirection = clusterSettings.get(MIGRATION_DIRECTION_SETTING).equals(Direction.REMOTE_STORE); + + return (isMixedMode == true && isRemoteStoreMigrationDirection == true); + } } diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java index a2f19b8c694d0..e53484c21c019 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -55,6 +55,7 @@ import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.UUIDs; import org.opensearch.common.compress.CompressedXContent; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.IndexScopedSettings; @@ -62,6 +63,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -82,6 +84,7 @@ import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.indices.SystemIndices; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.snapshots.EmptySnapshotsInfoService; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; @@ -136,6 +139,7 @@ import static org.opensearch.cluster.metadata.MetadataCreateIndexService.getIndexNumberOfRoutingShards; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.parseV1Mappings; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.resolveAndValidateAliases; +import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; import static org.opensearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTING; import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING; @@ -150,6 +154,8 @@ import static org.opensearch.node.Node.NODE_ATTRIBUTES; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; @@ -1551,6 +1557,132 @@ public void testRemoteStoreOverrideTranslogRepoIndexSettings() { })); } + public void testNewIndexIsRemoteStoreBackedForRemoteStoreDirectionAndMixedMode() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build()); + + // non-remote cluster manager node + DiscoveryNode nonRemoteClusterManagerNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + emptyMap(), + singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + Version.CURRENT + ); + + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() + .add(nonRemoteClusterManagerNode) + .localNodeId(nonRemoteClusterManagerNode.getId()) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); + + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test"); + + Settings indexSettings = aggregateIndexSettings( + clusterState, + request, + Settings.EMPTY, + null, + Settings.EMPTY, + IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + randomShardLimitService(), + Collections.emptySet(), + clusterSettings + ); + + verifyRemoteStoreIndexSettings( + indexSettings, + null, + null, + null, + ReplicationType.DOCUMENT.toString(), + IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL + ); + + // remote data node + Map attributes = new HashMap<>(); + attributes.put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-segment-repo-1"); + attributes.put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-translog-repo-1"); + DiscoveryNode remoteDataNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + attributes, + Set.of(DiscoveryNodeRole.INGEST_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE, DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ); + + discoveryNodes = DiscoveryNodes.builder(discoveryNodes).add(remoteDataNode).localNodeId(remoteDataNode.getId()).build(); + + clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); + + Settings remoteStoreMigrationSettings = Settings.builder() + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED) + .put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE) + .build(); + + clusterSettings = new ClusterSettings(remoteStoreMigrationSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + indexSettings = aggregateIndexSettings( + clusterState, + request, + Settings.EMPTY, + null, + Settings.EMPTY, + IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + randomShardLimitService(), + Collections.emptySet(), + clusterSettings + ); + + verifyRemoteStoreIndexSettings( + indexSettings, + "true", + "my-segment-repo-1", + "my-translog-repo-1", + ReplicationType.SEGMENT.toString(), + IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL + ); + + Map missingTranslogAttribute = Map.of(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-segment-repo-1"); + + DiscoveryNodes finalDiscoveryNodes = DiscoveryNodes.builder() + .add(nonRemoteClusterManagerNode) + .add( + new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + missingTranslogAttribute, + Set.of(DiscoveryNodeRole.INGEST_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE, DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ) + ) + .build(); + + ClusterState finalClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(finalDiscoveryNodes).build(); + ClusterSettings finalClusterSettings = clusterSettings; + + final IndexCreationException error = expectThrows(IndexCreationException.class, () -> { + aggregateIndexSettings( + finalClusterState, + request, + Settings.EMPTY, + null, + Settings.EMPTY, + IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + randomShardLimitService(), + Collections.emptySet(), + finalClusterSettings + ); + }); + + assertThat( + error.getCause().getMessage(), + containsString("Cluster is migrating to remote store but no remote node found, failing index creation") + ); + } + public void testBuildIndexMetadata() { IndexMetadata sourceIndexMetadata = IndexMetadata.builder("parent") .settings(Settings.builder().put("index.version.created", Version.CURRENT).build())