From baca30918aeffa35338b57f1efaa8ca5f0bc5cbb Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Fri, 12 Apr 2024 11:53:41 +0530 Subject: [PATCH] Incorporate AsyncShardBatchFetch class changes Signed-off-by: Aman Khare --- .../gateway/RecoveryFromGatewayIT.java | 2 +- .../org/opensearch/cluster/ClusterModule.java | 8 +-- .../opensearch/gateway/GatewayAllocator.java | 2 +- .../gateway/ShardsBatchGatewayAllocator.java | 54 +++++++------------ ...sportNodesListShardStoreMetadataBatch.java | 2 +- .../main/java/org/opensearch/node/Node.java | 8 +-- .../cluster/ClusterModuleTests.java | 13 +++-- .../gateway/GatewayAllocatorTests.java | 12 ++--- .../gateway/ShardBatchCacheTests.java | 13 +++-- .../TestShardBatchGatewayAllocator.java | 7 +-- 10 files changed, 51 insertions(+), 70 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index 2454b643ad025..53048c036db49 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -800,7 +800,7 @@ public void testBatchModeEnabled() throws Exception { ensureGreen("test"); assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches()); assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); - assertEquals(0,gatewayAllocator.getNumberOfInFlightFetches()); + assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches()); } public void testBatchModeDisabled() throws Exception { diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index aa9101090b6d5..57c44aa107e5b 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -154,13 +154,7 @@ public ClusterModule( this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins); this.clusterService = clusterService; this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext); - this.allocationService = new AllocationService( - allocationDeciders, - shardsAllocator, - clusterInfoService, - snapshotsInfoService, - settings - ); + this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService); } public static List getNamedWriteables() { diff --git a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java index 444a33c28d3ad..c8ef9364ebba9 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java @@ -83,7 +83,7 @@ public class GatewayAllocator implements ExistingShardsAllocator { private final ConcurrentMap< ShardId, AsyncShardFetch> asyncFetchStarted = ConcurrentCollections - .newConcurrentMap(); + .newConcurrentMap(); private final ConcurrentMap> asyncFetchStore = ConcurrentCollections.newConcurrentMap(); private Set lastSeenEphemeralIds = Collections.emptySet(); diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 69878d83dfa73..0972f7b1398eb 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -33,10 +33,13 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; import org.opensearch.index.store.Store; import org.opensearch.indices.store.ShardAttributes; import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata; import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata; import java.util.Collections; import java.util.HashMap; @@ -48,10 +51,7 @@ import java.util.Set; import java.util.Spliterators; import java.util.concurrent.ConcurrentMap; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -110,9 +110,7 @@ public ShardsBatchGatewayAllocator( @Override public void cleanCaches() { Stream.of(batchIdToStartedShardBatch, batchIdToStoreShardBatch).forEach(b -> { - Releasables.close( - b.values().stream().map(shardsBatch -> shardsBatch.asyncBatch).collect(Collectors.toList()) - ); + Releasables.close(b.values().stream().map(shardsBatch -> shardsBatch.asyncBatch).collect(Collectors.toList())); b.clear(); }); } @@ -227,9 +225,7 @@ protected Set createAndUpdateBatches(RoutingAllocation allocation, boole // get all batched shards Map currentBatchedShards = new HashMap<>(); for (Map.Entry batchEntry : currentBatches.entrySet()) { - batchEntry.getValue().getBatchedShards() - .forEach(shardId -> currentBatchedShards.put(shardId, - batchEntry.getKey())); + batchEntry.getValue().getBatchedShards().forEach(shardId -> currentBatchedShards.put(shardId, batchEntry.getKey())); } Set newShardsToBatch = Sets.newHashSet(); @@ -447,23 +443,11 @@ class InternalBatchAsyncFetch extends AsyncShardB AsyncShardFetch.Lister, T> action, String batchUUId, Class clazz, - BiFunction, T> responseBuilder, - Function> shardsBatchDataGetter, - Supplier emptyResponseBuilder, - Consumer handleFailedShard + V emptyShardResponse, + Predicate emptyShardResponsePredicate, + ShardBatchResponseFactory responseFactory ) { - super( - logger, - type, - map, - action, - batchUUId, - clazz, - responseBuilder, - shardsBatchDataGetter, - emptyResponseBuilder, - handleFailedShard - ); + super(logger, type, map, action, batchUUId, clazz, emptyShardResponse, emptyShardResponsePredicate, responseFactory); } @Override @@ -622,11 +606,10 @@ public ShardsBatch(String batchId, Map shardsWithInfo, bool shardIdsMap, batchStartedAction, batchId, - TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard.class, - TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch::new, - TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch::getNodeGatewayStartedShardsBatch, - () -> new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard(null, false, null, null), - this::removeShard + GatewayStartedShard.class, + new GatewayStartedShard(null, false, null, null), + GatewayStartedShard::isEmpty, + new ShardBatchResponseFactory<>(true) ); } else { asyncBatch = new InternalBatchAsyncFetch<>( @@ -635,11 +618,10 @@ public ShardsBatch(String batchId, Map shardsWithInfo, bool shardIdsMap, batchStoreAction, batchId, - TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata.class, - TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch::new, - TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch::getNodeStoreFilesMetadataBatch, - this::buildEmptyReplicaShardResponse, - this::removeShard + NodeStoreFilesMetadata.class, + new NodeStoreFilesMetadata(new StoreFilesMetadata(null, Store.MetadataSnapshot.EMPTY, Collections.emptyList()), null), + NodeStoreFilesMetadata::isEmpty, + new ShardBatchResponseFactory<>(false) ); } } diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java index 85d5bff4677ef..68dffa62f7b2c 100644 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java @@ -276,7 +276,7 @@ public void writeTo(StreamOutput out) throws IOException { } } - boolean isEmpty(NodeStoreFilesMetadata response) { + public static boolean isEmpty(NodeStoreFilesMetadata response) { return response.storeFilesMetadata() == null || response.storeFilesMetadata().isEmpty() && response.getStoreFileFetchException() == null; } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index a61e675508ac0..f500994289d30 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -131,12 +131,12 @@ import org.opensearch.extensions.ExtensionsManager; import org.opensearch.extensions.NoopExtensionsManager; import org.opensearch.gateway.GatewayAllocator; -import org.opensearch.gateway.ShardsBatchGatewayAllocator; import org.opensearch.gateway.GatewayMetaState; import org.opensearch.gateway.GatewayModule; import org.opensearch.gateway.GatewayService; import org.opensearch.gateway.MetaStateService; import org.opensearch.gateway.PersistedClusterStateService; +import org.opensearch.gateway.ShardsBatchGatewayAllocator; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.http.HttpServerTransport; import org.opensearch.identity.IdentityService; @@ -1324,8 +1324,10 @@ protected Node( // completes we trigger another reroute to try the allocation again. This means there is a circular dependency: the allocation // service needs access to the existing shards allocators (e.g. the GatewayAllocator, ShardsBatchGatewayAllocator) which // need to be able to trigger a reroute, which needs to call into the allocation service. We close the loop here: - clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class), - injector.getInstance(ShardsBatchGatewayAllocator.class)); + clusterModule.setExistingShardsAllocators( + injector.getInstance(GatewayAllocator.class), + injector.getInstance(ShardsBatchGatewayAllocator.class) + ); List pluginLifecycleComponents = pluginComponents.stream() .filter(p -> p instanceof LifecycleComponent) diff --git a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java index f8413f8ed6120..557e4dc2ca8c5 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java @@ -78,7 +78,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -298,8 +297,10 @@ public void testRejectsReservedExistingShardsAllocatorName() { null, threadContext ); - expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator(), - new TestShardBatchGatewayAllocator())); + expectThrows( + IllegalArgumentException.class, + () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator(), new TestShardBatchGatewayAllocator()) + ); } public void testRejectsDuplicateExistingShardsAllocatorName() { @@ -311,8 +312,10 @@ public void testRejectsDuplicateExistingShardsAllocatorName() { null, threadContext ); - expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator(), - new TestShardBatchGatewayAllocator())); + expectThrows( + IllegalArgumentException.class, + () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator(), new TestShardBatchGatewayAllocator()) + ); } private static ClusterPlugin existingShardsAllocatorPlugin(final String allocatorName) { diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index ee269e0264b8b..604a5e62192c2 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -255,9 +255,9 @@ public void testGetBatchIdExisting() { for (String batchId : primaryBatches) { if (shardRouting.primary() == true && testShardsBatchGatewayAllocator.getBatchIdToStartedShardBatch() - .get(batchId) - .getBatchedShards() - .contains(shardRouting.shardId())) { + .get(batchId) + .getBatchedShards() + .contains(shardRouting.shardId())) { if (shardIdToBatchIdForStartedShards.containsKey(shardRouting.shardId())) { fail("found duplicate shard routing for shard. One shard cant be in multiple batches " + shardRouting.shardId()); } @@ -272,9 +272,9 @@ public void testGetBatchIdExisting() { for (String batchId : replicaBatches) { if (shardRouting.primary() == false && testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch() - .get(batchId) - .getBatchedShards() - .contains(shardRouting.shardId())) { + .get(batchId) + .getBatchedShards() + .contains(shardRouting.shardId())) { if (shardIdToBatchIdForStoreShards.containsKey(shardRouting.shardId())) { fail("found duplicate shard routing for shard. One shard cant be in multiple batches " + shardRouting.shardId()); } diff --git a/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java b/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java index 1b42a31a4fd84..12030ad41d508 100644 --- a/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java +++ b/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java @@ -29,8 +29,7 @@ public class ShardBatchCacheTests extends OpenSearchAllocationTestCase { private static final String BATCH_ID = "b1"; private final DiscoveryNode node1 = newNode("node1"); private final DiscoveryNode node2 = newNode("node2"); - // Needs to be enabled once ShardsBatchGatewayAllocator is pushed - // private final Map batchInfo = new HashMap<>(); + private final Map batchInfo = new HashMap<>(); private AsyncShardBatchFetch.ShardBatchCache shardCache; private List shardsInBatch = new ArrayList<>(); private static final int NUMBER_OF_SHARDS_DEFAULT = 10; @@ -162,7 +161,7 @@ public void testShardsDataWithException() { null ); - // assertEquals(5, batchInfo.size()); + assertEquals(10, batchInfo.size()); assertEquals(2, fetchData.size()); assertEquals(10, fetchData.get(node1).getNodeGatewayStartedShardsBatch().size()); assertTrue(fetchData.get(node2).getNodeGatewayStartedShardsBatch().isEmpty()); @@ -210,10 +209,10 @@ private void fillShards(Map shardAttributesMap, int nu for (ShardId shardId : shardsInBatch) { ShardAttributes attr = new ShardAttributes(""); shardAttributesMap.put(shardId, attr); - // batchInfo.put( - // shardId, - // new ShardsBatchGatewayAllocator.ShardEntry(attr, randomShardRouting(shardId.getIndexName(), shardId.id())) - // ); + batchInfo.put( + shardId, + new ShardsBatchGatewayAllocator.ShardEntry(attr, randomShardRouting(shardId.getIndexName(), shardId.id())) + ); } } diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java index 8721f40b2546b..53a4e90adb976 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java @@ -18,6 +18,7 @@ import org.opensearch.gateway.PrimaryShardBatchAllocator; import org.opensearch.gateway.ReplicaShardBatchAllocator; import org.opensearch.gateway.ShardsBatchGatewayAllocator; +import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper; import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch; @@ -46,15 +47,15 @@ protected AsyncShardFetch.FetchResult> entry : knownAllocations.entrySet()) { String nodeId = entry.getKey(); Map shardsOnNode = entry.getValue(); - HashMap adaptedResponse = new HashMap<>(); + HashMap adaptedResponse = new HashMap<>(); for (ShardRouting shardRouting : eligibleShards) { ShardId shardId = shardRouting.shardId(); Set ignoreNodes = allocation.getIgnoreNodes(shardId); if (shardsOnNode.containsKey(shardId) && ignoreNodes.contains(nodeId) == false && currentNodes.nodeExists(nodeId)) { - TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeShard = - new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard( + TransportNodesGatewayStartedShardHelper.GatewayStartedShard nodeShard = + new TransportNodesGatewayStartedShardHelper.GatewayStartedShard( shardsOnNode.get(shardId).allocationId().getId(), shardsOnNode.get(shardId).primary(), getReplicationCheckpoint(shardId, nodeId)