Skip to content

Commit

Permalink
Remove shards from batches if they are not present in unassigned list…
Browse files Browse the repository at this point in the history
… from allocation object

Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Apr 4, 2024
1 parent a938d95 commit 52c9d1c
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@
import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.support.ActionTestUtils;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.ElectionSchedulerFactory;
Expand Down Expand Up @@ -1187,6 +1189,53 @@ public void testShardStoreFetchCorruptedIndexUsingBatchAction() throws Exception
assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata.get(shardId2), shardId2);
}

/**
 * Checks that an index in red state can be deleted while the existing-shards allocator runs in batch mode.
 * <p>
 * The cluster is started with one cluster-manager-only node (batch mode enabled) and two data-only nodes.
 * Three indices with two primaries and no replicas, plus one index with a single primary and one replica,
 * are created. After one data node is stopped, the test expects the replica-less indices to turn red. A
 * reroute is then issued so that shard batches exist before the red index {@code test} is deleted. The
 * delete must be acknowledged, {@code testg} must reach yellow, and {@code test} must no longer exist.
 */
public void testDeleteRedIndexInBatchMode() throws Exception {
    internalCluster().startClusterManagerOnlyNodes(
        1,
        Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()
    );
    // The returned node names are not needed by this test, so the result is not bound to a local.
    internalCluster().startDataOnlyNodes(2);

    // All three replica-less indices share the same shape, so build their settings once.
    final Settings twoPrimariesNoReplica = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
        .build();
    for (String index : new String[] { "test", "test1", "test2" }) {
        createIndex(index, twoPrimariesNoReplica);
    }
    // "testg" has a replica, so it is expected to stay available after a data node is lost.
    createIndex(
        "testg",
        Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
    );

    ensureGreen("test", "test1", "test2", "testg");
    internalCluster().stopRandomDataNode();
    // One cluster-manager node and one data node remain.
    ensureStableCluster(2);

    ensureRed("test", "test1", "test2");

    assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));

    logger.info("--> Now do a reroute so batches are created"); // to avoid any race condition in test
    ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
    assertTrue(clusterRerouteResponse.isAcknowledged());

    // Deleting a red index whose shards are already batched is the scenario under test.
    AcknowledgedResponse deleteIndexResponse = client().admin().indices().prepareDelete("test").get();
    assertTrue(deleteIndexResponse.isAcknowledged());

    ensureYellow("testg");
    IndicesExistsResponse indexExistResponse = client().admin().indices().prepareExists("test").get();
    assertFalse(indexExistResponse.isExists());
}

private void prepareIndices(String[] indices, int numberOfPrimaryShards, int numberOfReplicaShards) {
for (String index : indices) {
createIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ protected Set<String> createAndUpdateBatches(RoutingAllocation allocation, boole
}

Set<ShardRouting> shardsToBatch = Sets.newHashSet();
Set<ShardId> batchedShardsToAssign = Sets.newHashSet();
// add all unassigned shards to the batch if they are not already in a batch
unassigned.forEach(shardRouting -> {
if ((currentBatchShards.containsKey(shardRouting.shardId()) == false) && (shardRouting.primary() == primary)) {
Expand All @@ -251,8 +252,12 @@ else if (shardRouting.primary() == primary) {
String batchId = currentBatchShards.get(shardRouting.shardId());
batchesToBeAssigned.add(batchId);
currentBatches.get(batchId).batchInfo.get(shardRouting.shardId()).setShardRouting(shardRouting);
batchedShardsToAssign.add(shardRouting.shardId());
}
});

refreshShardBatches(currentBatches, batchedShardsToAssign);

Iterator<ShardRouting> iterator = shardsToBatch.iterator();
assert maxBatchSize > 0 : "Shards batch size must be greater than 0";

Expand Down Expand Up @@ -283,6 +288,23 @@ else if (shardRouting.primary() == primary) {
return batchesToBeAssigned;
}

/**
 * Prunes every batch in {@code currentBatches} so that it only keeps shards contained in
 * {@code batchedShardsToAssign}.
 * <p>
 * A shard can leave the allocation's unassigned list without a "start" or "failed" event, because
 * AllocationService.reroute may also be invoked directly by API flows such as DeleteIndices. Such a
 * shard is removed from its batch here and its entry is cleared from the batch's cache.
 *
 * @param currentBatches        batches to prune, keyed by batch id
 * @param batchedShardsToAssign ids of already-batched shards that are still present in the unassigned list
 */
private void refreshShardBatches(ConcurrentMap<String, ShardsBatch> currentBatches, Set<ShardId> batchedShardsToAssign) {
    for (ShardsBatch batch : currentBatches.values()) {
        final Iterator<ShardId> batchedShards = batch.getBatchedShards().iterator();
        while (batchedShards.hasNext()) {
            final ShardId candidate = batchedShards.next();
            if (batchedShardsToAssign.contains(candidate)) {
                // still waiting for assignment, keep it in the batch
                continue;
            }
            // remove through the iterator so the batch can be modified while it is being walked
            batchedShards.remove();
            batch.clearShardFromCache(candidate);
        }
    }
}

private void addBatch(ShardsBatch shardsBatch, boolean primary) {
ConcurrentMap<String, ShardsBatch> batches = primary ? batchIdToStartedShardBatch : batchIdToStoreShardBatch;
if (batches.containsKey(shardsBatch.getBatchId())) {
Expand Down Expand Up @@ -640,11 +662,15 @@ private TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata buildEm

/**
 * Takes the given shard out of this batch and clears it from the async fetch cache.
 * <p>
 * The cache is cleared exactly once, through {@code clearShardFromCache}, which delegates to
 * {@code asyncBatch.clearShard}; calling {@code asyncBatch.clearShard} directly as well would be redundant.
 *
 * @param shard routing entry whose shard id is removed from the batch
 */
private void removeFromBatch(ShardRouting shard) {
    removeShard(shard.shardId());
    clearShardFromCache(shard.shardId());
    // assert that fetcher and shards are the same as batched shards
    assert batchInfo.size() == asyncBatch.shardAttributesMap.size() : "Shards size is not equal to fetcher size";
}

/**
 * Clears the given shard from {@code asyncBatch} by delegating to its {@code clearShard} method.
 * Used by {@code removeFromBatch} and by {@code refreshShardBatches}.
 *
 * @param shardId id of the shard to clear
 */
private void clearShardFromCache(ShardId shardId) {
asyncBatch.clearShard(shardId);
}

/**
 * Collects the shard routing held by every entry of {@code batchInfo}.
 *
 * @return a new list with one {@code ShardRouting} per batched shard entry
 */
public List<ShardRouting> getBatchedShardRoutings() {
    final List<ShardRouting> routings = new java.util.ArrayList<>();
    for (ShardEntry entry : batchInfo.values()) {
        routings.add(entry.getShardRouting());
    }
    return routings;
}
Expand Down

0 comments on commit 52c9d1c

Please sign in to comment.