Skip to content

Commit

Permalink
Build failure fix after rebase
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Apr 25, 2024
1 parent 3b871d2 commit 9a81564
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,17 @@

package org.opensearch.gateway;

import org.opensearch.Version;
import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
Expand Down Expand Up @@ -1245,165 +1244,6 @@ private void createNIndices(int n, String prefix) {
}
}

public void testSingleShardFetchUsingBatchAction() {
String indexName = "test";
int numOfShards = 1;
prepareIndex(indexName, numOfShards);
Map<ShardId, ShardAttributes> shardIdShardAttributesMap = prepareRequestMap(new String[] { indexName }, numOfShards);

ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName).get();

TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch response;
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListGatewayStartedShardsBatch.class),
new TransportNodesListGatewayStartedShardsBatch.Request(searchShardsResponse.getNodes(), shardIdShardAttributesMap)
);
final Index index = resolveIndex(indexName);
final ShardId shardId = new ShardId(index, 0);
GatewayStartedShard gatewayStartedShard = response.getNodesMap()
.get(searchShardsResponse.getNodes()[0].getId())
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNodeGatewayStartedShardsHappyCase(gatewayStartedShard);
}

public void testShardFetchMultiNodeMultiIndexesUsingBatchAction() {
// start node
internalCluster().startNode();
String indexName1 = "test1";
String indexName2 = "test2";
int numShards = internalCluster().numDataNodes();
// assign one primary shard each to the data nodes
prepareIndex(indexName1, numShards);
prepareIndex(indexName2, numShards);
Map<ShardId, ShardAttributes> shardIdShardAttributesMap = prepareRequestMap(new String[] { indexName1, indexName2 }, numShards);
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName1, indexName2).get();
assertEquals(internalCluster().numDataNodes(), searchShardsResponse.getNodes().length);
TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch response;
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListGatewayStartedShardsBatch.class),
new TransportNodesListGatewayStartedShardsBatch.Request(searchShardsResponse.getNodes(), shardIdShardAttributesMap)
);
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
ShardId shardId = clusterSearchShardsGroup.getShardId();
assertEquals(1, clusterSearchShardsGroup.getShards().length);
String nodeId = clusterSearchShardsGroup.getShards()[0].currentNodeId();
GatewayStartedShard gatewayStartedShard = response.getNodesMap().get(nodeId).getNodeGatewayStartedShardsBatch().get(shardId);
assertNodeGatewayStartedShardsHappyCase(gatewayStartedShard);
}
}

public void testShardFetchCorruptedShardsUsingBatchAction() throws Exception {
String indexName = "test";
int numOfShards = 1;
prepareIndex(indexName, numOfShards);
Map<ShardId, ShardAttributes> shardIdShardAttributesMap = prepareRequestMap(new String[] { indexName }, numOfShards);
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName).get();
final Index index = resolveIndex(indexName);
final ShardId shardId = new ShardId(index, 0);
corruptShard(searchShardsResponse.getNodes()[0].getName(), shardId);
TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch response;
internalCluster().restartNode(searchShardsResponse.getNodes()[0].getName());
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListGatewayStartedShardsBatch.class),
new TransportNodesListGatewayStartedShardsBatch.Request(getDiscoveryNodes(), shardIdShardAttributesMap)
);
DiscoveryNode[] discoveryNodes = getDiscoveryNodes();
GatewayStartedShard gatewayStartedShard = response.getNodesMap()
.get(discoveryNodes[0].getId())
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNotNull(gatewayStartedShard.storeException());
assertNotNull(gatewayStartedShard.allocationId());
assertTrue(gatewayStartedShard.primary());
}

public void testSingleShardStoreFetchUsingBatchAction() throws ExecutionException, InterruptedException {
String indexName = "test";
DiscoveryNode[] nodes = getDiscoveryNodes();
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response = prepareAndSendRequest(
new String[] { indexName },
nodes
);
Index index = resolveIndex(indexName);
ShardId shardId = new ShardId(index, 0);
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata = response.getNodesMap()
.get(nodes[0].getId())
.getNodeStoreFilesMetadataBatch()
.get(shardId);
assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata, shardId);
}

public void testShardStoreFetchMultiNodeMultiIndexesUsingBatchAction() throws Exception {
internalCluster().startNodes(2);
String indexName1 = "test1";
String indexName2 = "test2";
DiscoveryNode[] nodes = getDiscoveryNodes();
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response = prepareAndSendRequest(
new String[] { indexName1, indexName2 },
nodes
);
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName1, indexName2).get();
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
ShardId shardId = clusterSearchShardsGroup.getShardId();
ShardRouting[] shardRoutings = clusterSearchShardsGroup.getShards();
assertEquals(2, shardRoutings.length);
for (ShardRouting shardRouting : shardRoutings) {
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata = response.getNodesMap()
.get(shardRouting.currentNodeId())
.getNodeStoreFilesMetadataBatch()
.get(shardId);
assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata, shardId);
}
}
}

public void testShardStoreFetchNodeNotConnectedUsingBatchAction() {
DiscoveryNode nonExistingNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
String indexName = "test";
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response = prepareAndSendRequest(
new String[] { indexName },
new DiscoveryNode[] { nonExistingNode }
);
assertTrue(response.hasFailures());
assertEquals(1, response.failures().size());
assertEquals(nonExistingNode.getId(), response.failures().get(0).nodeId());
}

public void testShardStoreFetchCorruptedIndexUsingBatchAction() throws Exception {
internalCluster().startNodes(2);
String index1Name = "test1";
String index2Name = "test2";
prepareIndices(new String[] { index1Name, index2Name }, 1, 1);
Map<ShardId, ShardAttributes> shardAttributesMap = prepareRequestMap(new String[] { index1Name, index2Name }, 1);
Index index1 = resolveIndex(index1Name);
ShardId shardId1 = new ShardId(index1, 0);
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(index1Name).get();
assertEquals(2, searchShardsResponse.getNodes().length);

// corrupt test1 index shards
corruptShard(searchShardsResponse.getNodes()[0].getName(), shardId1);
corruptShard(searchShardsResponse.getNodes()[1].getName(), shardId1);
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(false).get();
DiscoveryNode[] discoveryNodes = getDiscoveryNodes();
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response;
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListShardStoreMetadataBatch.class),
new TransportNodesListShardStoreMetadataBatch.Request(shardAttributesMap, discoveryNodes)
);
Map<ShardId, TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata> nodeStoreFilesMetadata = response.getNodesMap()
.get(discoveryNodes[0].getId())
.getNodeStoreFilesMetadataBatch();
// We don't store exception in case of corrupt index, rather just return an empty response
assertNull(nodeStoreFilesMetadata.get(shardId1).getStoreFileFetchException());
assertEquals(shardId1, nodeStoreFilesMetadata.get(shardId1).storeFilesMetadata().shardId());
assertTrue(nodeStoreFilesMetadata.get(shardId1).storeFilesMetadata().isEmpty());

Index index2 = resolveIndex(index2Name);
ShardId shardId2 = new ShardId(index2, 0);
assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata.get(shardId2), shardId2);
}

public void testDeleteRedIndexInBatchMode() throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
Expand Down Expand Up @@ -1454,57 +1294,4 @@ public void testDeleteRedIndexInBatchMode() throws Exception {
assertFalse(indexExistResponse.isExists());
}

private void prepareIndices(String[] indices, int numberOfPrimaryShards, int numberOfReplicaShards) {
for (String index : indices) {
createIndex(
index,
Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, numberOfPrimaryShards)
.put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicaShards)
.build()
);
index(index, "type", "1", Collections.emptyMap());
flush(index);
}
}

private TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch prepareAndSendRequest(
String[] indices,
DiscoveryNode[] nodes
) {
Map<ShardId, ShardAttributes> shardAttributesMap = null;
prepareIndices(indices, 1, 1);
shardAttributesMap = prepareRequestMap(indices, 1);
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response;
return ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListShardStoreMetadataBatch.class),
new TransportNodesListShardStoreMetadataBatch.Request(shardAttributesMap, nodes)
);
}

private void assertNodeStoreFilesMetadataSuccessCase(
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata,
ShardId shardId
) {
assertNull(nodeStoreFilesMetadata.getStoreFileFetchException());
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFileMetadata = nodeStoreFilesMetadata.storeFilesMetadata();
assertFalse(storeFileMetadata.isEmpty());
assertEquals(shardId, storeFileMetadata.shardId());
assertNotNull(storeFileMetadata.peerRecoveryRetentionLeases());
}

private void assertNodeGatewayStartedShardsHappyCase(GatewayStartedShard gatewayStartedShard) {
assertNull(gatewayStartedShard.storeException());
assertNotNull(gatewayStartedShard.allocationId());
assertTrue(gatewayStartedShard.primary());
}

private void prepareIndex(String indexName, int numberOfPrimaryShards) {
createIndex(
indexName,
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numberOfPrimaryShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build()
);
index(indexName, "type", "1", Collections.emptyMap());
flush(indexName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PriorityComparator;
Expand Down Expand Up @@ -201,9 +200,9 @@ private ClusterState buildResult(ClusterState oldState, RoutingAllocation alloca
if (restoreInProgress != null) {
RestoreInProgress updatedRestoreInProgress = allocation.updateRestoreInfoWithRoutingChanges(restoreInProgress);
if (updatedRestoreInProgress != restoreInProgress) {
ImmutableOpenMap.Builder<String, ClusterState.Custom> customsBuilder = ImmutableOpenMap.builder(allocation.getCustoms());
final Map<String, ClusterState.Custom> customsBuilder = new HashMap<>(allocation.getCustoms());
customsBuilder.put(RestoreInProgress.TYPE, updatedRestoreInProgress);
newStateBuilder.customs(customsBuilder.build());
newStateBuilder.customs(customsBuilder);
}
}
return newStateBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,26 +81,6 @@ public interface ExistingShardsAllocator {
Setting.Property.NodeScope
);

/**
* Boolean setting to enable/disable batch allocation of unassigned shards already existing on disk.
* This will allow sending all Unassigned Shards to the ExistingShard Allocator to make decision to allocate
* in one or more go.
* <p>
* Enable this setting if your ExistingShardAllocator is implementing the
* {@link ExistingShardsAllocator#allocateAllUnassignedShards(RoutingAllocation, boolean)} method.
* The default implementation of this method is not optimized and assigns shards one by one.
* <p>
* If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be use for it , i.e,
* {@link ShardsBatchGatewayAllocator}.
* <p>
* This setting is experimental at this point.
*/
Setting<Boolean> EXISTING_SHARDS_ALLOCATOR_BATCH_MODE = Setting.boolSetting(
"cluster.allocator.existing_shards_allocator.batch_enabled",
false,
Setting.Property.NodeScope
);

/**
* Called before starting a round of allocation, allowing the allocator to invalidate some caches if appropriate.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.opensearch.gateway.GatewayService;
import org.opensearch.gateway.PersistedClusterStateService;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
Expand Down Expand Up @@ -270,7 +271,6 @@ public void apply(Settings value, Settings current, Settings previous) {
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING,
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE,
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED,
FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING,
FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING,
FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_SETTING,
Expand Down

0 comments on commit 9a81564

Please sign in to comment.