From 5a7d450f7367ce4de0094a7005d22e6a34ca8bf8 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Fri, 26 Jul 2024 00:21:34 +0530 Subject: [PATCH 01/13] Optimise unassigned shards iteration after allocator timeout Signed-off-by: Rishab Nahata --- .../common/util/BatchRunnableExecutor.java | 2 ++ .../gateway/BaseGatewayShardAllocator.java | 9 ++---- .../gateway/ShardsBatchGatewayAllocator.java | 28 +++++++++++++------ .../PrimaryShardBatchAllocatorTests.java | 13 +++++---- .../ReplicaShardBatchAllocatorTests.java | 8 +++--- 5 files changed, 35 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/BatchRunnableExecutor.java b/server/src/main/java/org/opensearch/common/util/BatchRunnableExecutor.java index d3d3304cb909a..220d42ffa2d59 100644 --- a/server/src/main/java/org/opensearch/common/util/BatchRunnableExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/BatchRunnableExecutor.java @@ -61,6 +61,8 @@ public void run() { "Time taken to execute timed runnables in this cycle:[{}ms]", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) ); + onComplete(); } + public void onComplete(){} } diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index 0d6af943d39e0..aaa712ef5e523 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -82,17 +82,12 @@ public void allocateUnassigned( executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler); } - protected void allocateUnassignedBatchOnTimeout(List shardRoutings, RoutingAllocation allocation, boolean primary) { - Set shardIdsFromBatch = new HashSet<>(); - for (ShardRouting shardRouting : shardRoutings) { - ShardId shardId = shardRouting.shardId(); - shardIdsFromBatch.add(shardId); - } + protected void allocateUnassignedBatchOnTimeout(Set shardIds, RoutingAllocation allocation, boolean primary) { RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); while (iterator.hasNext()) { ShardRouting unassignedShard = iterator.next(); AllocateUnassignedDecision allocationDecision; - if (unassignedShard.primary() == primary && shardIdsFromBatch.contains(unassignedShard.shardId())) { + if (unassignedShard.primary() == primary && shardIds.contains(unassignedShard.shardId())) { allocationDecision = AllocateUnassignedDecision.throttle(null); executeDecision(unassignedShard, allocationDecision, allocation, iterator); } diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 55f5388d8f454..cd596c12fd36e 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -118,6 +118,8 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { protected final ConcurrentMap batchIdToStoreShardBatch = ConcurrentCollections.newConcurrentMap(); private final TransportNodesListGatewayStartedShardsBatch batchStartedAction; private final TransportNodesListShardStoreMetadataBatch batchStoreAction; + private Set timedOutPrimaryShardIds; + private Set timedOutReplicaShardIds; @Inject public ShardsBatchGatewayAllocator( @@ -242,17 +244,14 @@ protected BatchRunnableExecutor innerAllocateUnassignedBatch( } List runnables = new ArrayList<>(); if (primary) { + timedOutPrimaryShardIds = new HashSet<>(); batchIdToStartedShardBatch.values() .stream() .filter(batch -> batchesToAssign.contains(batch.batchId)) .forEach(shardsBatch -> runnables.add(new TimeoutAwareRunnable() { @Override public void onTimeout() { - primaryBatchShardAllocator.allocateUnassignedBatchOnTimeout( - shardsBatch.getBatchedShardRoutings(), - allocation, - true - ); + timedOutPrimaryShardIds.addAll(shardsBatch.getBatchedShards()); } @Override @@ -260,15 +259,22 @@ public void run() { primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation); } })); - return new BatchRunnableExecutor(runnables, () -> primaryShardsBatchGatewayAllocatorTimeout); + return new BatchRunnableExecutor(runnables, () -> primaryShardsBatchGatewayAllocatorTimeout) { + @Override + public void onComplete() { + logger.trace("Triggering oncomplete after timeout for [{}] primary shards", timedOutPrimaryShardIds.size()); + primaryBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutPrimaryShardIds, allocation, true); + } + }; } else { + timedOutReplicaShardIds = new HashSet<>(); batchIdToStoreShardBatch.values() .stream() .filter(batch -> batchesToAssign.contains(batch.batchId)) .forEach(batch -> runnables.add(new TimeoutAwareRunnable() { @Override public void onTimeout() { - replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(batch.getBatchedShardRoutings(), allocation, false); + timedOutReplicaShardIds.addAll(batch.getBatchedShards()); } @Override @@ -276,7 +282,13 @@ public void run() { replicaBatchShardAllocator.allocateUnassignedBatch(batch.getBatchedShardRoutings(), allocation); } })); - return new BatchRunnableExecutor(runnables, () -> replicaShardsBatchGatewayAllocatorTimeout); + return new BatchRunnableExecutor(runnables, () -> replicaShardsBatchGatewayAllocatorTimeout) { + @Override + public void onComplete() { + logger.trace("Triggering oncomplete after timeout for [{}] replica shards", timedOutReplicaShardIds.size()); + replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutReplicaShardIds, allocation, false); + } + }; } } diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java index 270cf465d0f80..89acfe2169ef3 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java @@ -264,8 +264,9 @@ public void testAllocateUnassignedBatchOnTimeoutWithMatchingPrimaryShards() { final RoutingAllocation routingAllocation = routingAllocationWithOnePrimary(allocationDeciders, CLUSTER_RECOVERED, "allocId-0"); ShardRouting shardRouting = routingAllocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard(); - List shardRoutings = Arrays.asList(shardRouting); - batchAllocator.allocateUnassignedBatchOnTimeout(shardRoutings, routingAllocation, true); + Set shardIds = new HashSet<>(); + shardIds.add(shardRouting.shardId()); + batchAllocator.allocateUnassignedBatchOnTimeout(shardIds, routingAllocation, true); List ignoredShards = routingAllocation.routingNodes().unassigned().ignored(); assertEquals(1, ignoredShards.size()); @@ -277,8 +278,7 @@ public void testAllocateUnassignedBatchOnTimeoutWithNoMatchingPrimaryShards() { AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), clusterSettings, random()); setUpShards(1); final RoutingAllocation routingAllocation = routingAllocationWithOnePrimary(allocationDeciders, CLUSTER_RECOVERED, "allocId-0"); - List shardRoutings = new ArrayList<>(); - batchAllocator.allocateUnassignedBatchOnTimeout(shardRoutings, routingAllocation, true); + batchAllocator.allocateUnassignedBatchOnTimeout(new HashSet<>(), routingAllocation, true); List ignoredShards = routingAllocation.routingNodes().unassigned().ignored(); assertEquals(0, ignoredShards.size()); @@ -296,8 +296,9 @@ public void testAllocateUnassignedBatchOnTimeoutWithNonPrimaryShards() { .shard(shardId.id()) .replicaShards() .get(0); - List shardRoutings = Arrays.asList(shardRouting); - batchAllocator.allocateUnassignedBatchOnTimeout(shardRoutings, routingAllocation, false); + Set shardIds = new HashSet<>(); + shardIds.add(shardRouting.shardId()); + batchAllocator.allocateUnassignedBatchOnTimeout(shardIds, routingAllocation, false); List ignoredShards = routingAllocation.routingNodes().unassigned().ignored(); assertEquals(1, ignoredShards.size()); diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java index 435fd78be2bcd..78ed3f2c7d38c 100644 --- a/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java @@ -720,9 +720,9 @@ public void testAllocateUnassignedBatchThrottlingAllocationDeciderIsHonoured() t public void testAllocateUnassignedBatchOnTimeoutWithUnassignedReplicaShard() { RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders()); final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); - List shards = new ArrayList<>(); + Set shards = new HashSet<>(); while (iterator.hasNext()) { - shards.add(iterator.next()); + shards.add(iterator.next().shardId()); } testBatchAllocator.allocateUnassignedBatchOnTimeout(shards, allocation, false); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -736,9 +736,9 @@ public void testAllocateUnassignedBatchOnTimeoutWithUnassignedReplicaShard() { public void testAllocateUnassignedBatchOnTimeoutWithAlreadyRecoveringReplicaShard() { RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); - List shards = new ArrayList<>(); + Set shards = new HashSet<>(); while (iterator.hasNext()) { - shards.add(iterator.next()); + shards.add(iterator.next().shardId()); } testBatchAllocator.allocateUnassignedBatchOnTimeout(shards, allocation, false); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0)); From 2ed62a868cbe95b065db17ac5c8415df42c5e3e7 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Fri, 26 Jul 2024 00:22:41 +0530 Subject: [PATCH 02/13] Fix spotless Signed-off-by: Rishab Nahata --- .../java/org/opensearch/common/util/BatchRunnableExecutor.java | 2 +- .../java/org/opensearch/gateway/BaseGatewayShardAllocator.java | 1 - .../org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java | 1 - 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/BatchRunnableExecutor.java b/server/src/main/java/org/opensearch/common/util/BatchRunnableExecutor.java index 220d42ffa2d59..b2002a94eaed6 100644 --- a/server/src/main/java/org/opensearch/common/util/BatchRunnableExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/BatchRunnableExecutor.java @@ -64,5 +64,5 @@ public void run() { onComplete(); } - public void onComplete(){} + public void onComplete() {} } diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index aaa712ef5e523..a3e0b590a9743 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -47,7 +47,6 @@ import org.opensearch.core.index.shard.ShardId; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Set; diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java index 89acfe2169ef3..7484502610bdb 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java @@ -41,7 +41,6 @@ import org.junit.Before; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; From 7508e93199b285356d3cde0ae01099d4dd213505 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Fri, 26 Jul 2024 11:49:17 +0530 Subject: [PATCH 03/13] Add java doc and minor fix Signed-off-by: Rishab Nahata --- .../org/opensearch/common/util/BatchRunnableExecutor.java | 5 +++++ .../org/opensearch/gateway/BaseGatewayShardAllocator.java | 3 +++ 2 files changed, 8 insertions(+) diff --git a/server/src/main/java/org/opensearch/common/util/BatchRunnableExecutor.java b/server/src/main/java/org/opensearch/common/util/BatchRunnableExecutor.java index b2002a94eaed6..cfe2bbb85bda4 100644 --- a/server/src/main/java/org/opensearch/common/util/BatchRunnableExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/BatchRunnableExecutor.java @@ -64,5 +64,10 @@ public void run() { onComplete(); } + /** + * Callback method that is invoked after all {@link TimeoutAwareRunnable} instances in the batch have been processed. + * By default, this method does nothing, but it can be overridden by subclasses or modified in the implementation if + * there is a need to perform additional actions once the batch execution is completed. + */ public void onComplete() {} } diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index a3e0b590a9743..41704545c7a6f 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -82,6 +82,9 @@ public void allocateUnassigned( } protected void allocateUnassignedBatchOnTimeout(Set shardIds, RoutingAllocation allocation, boolean primary) { + if (shardIds.isEmpty()) { + return; + } RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); while (iterator.hasNext()) { ShardRouting unassignedShard = iterator.next(); From b32e7c8bce08fc20fd15bad55a4de78cd5634c16 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 27 Jul 2024 08:59:44 +0530 Subject: [PATCH 04/13] Trigger Build Signed-off-by: Rishab Nahata From 09fea0c7326ad3cc486d42f5098a958090ee182c Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 27 Jul 2024 13:34:18 +0530 Subject: [PATCH 05/13] Trigger Build Signed-off-by: Rishab Nahata From a7f0f0bb514b9816069bffdd996f6078ee5baf68 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 27 Jul 2024 16:55:08 +0530 Subject: [PATCH 06/13] Update test Signed-off-by: Rishab Nahata --- .../util/BatchRunnableExecutorTests.java | 37 +++++++++++++++++-- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/opensearch/common/util/BatchRunnableExecutorTests.java b/server/src/test/java/org/opensearch/common/util/BatchRunnableExecutorTests.java index 269f89faec54d..2f63ae43d0ded 100644 --- a/server/src/test/java/org/opensearch/common/util/BatchRunnableExecutorTests.java +++ b/server/src/test/java/org/opensearch/common/util/BatchRunnableExecutorTests.java @@ -15,6 +15,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.function.Supplier; import static org.mockito.Mockito.atMost; @@ -42,7 +43,13 @@ public void setupRunnables() { public void testRunWithoutTimeout() { setupRunnables(); timeoutSupplier = () -> TimeValue.timeValueSeconds(1); - BatchRunnableExecutor executor = new BatchRunnableExecutor(runnableList, timeoutSupplier); + CountDownLatch countDownLatch = new CountDownLatch(1); + BatchRunnableExecutor executor = new BatchRunnableExecutor(runnableList, timeoutSupplier) { + @Override + public void onComplete() { + countDownLatch.countDown(); + } + }; executor.run(); verify(runnable1, times(1)).run(); verify(runnable2, times(1)).run(); @@ -50,12 +57,19 @@ public void testRunWithoutTimeout() { verify(runnable1, never()).onTimeout(); verify(runnable2, never()).onTimeout(); verify(runnable3, never()).onTimeout(); + assertEquals(0, countDownLatch.getCount()); } public void testRunWithTimeout() { setupRunnables(); timeoutSupplier = () -> TimeValue.timeValueNanos(1); - BatchRunnableExecutor executor = new BatchRunnableExecutor(runnableList, timeoutSupplier); + CountDownLatch countDownLatch = new CountDownLatch(1); + BatchRunnableExecutor executor = new BatchRunnableExecutor(runnableList, timeoutSupplier) { + @Override + public void onComplete() { + countDownLatch.countDown(); + } + }; executor.run(); verify(runnable1, times(1)).onTimeout(); verify(runnable2, times(1)).onTimeout(); @@ -63,12 +77,19 @@ public void testRunWithTimeout() { verify(runnable1, never()).run(); verify(runnable2, never()).run(); verify(runnable3, never()).run(); + assertEquals(0, countDownLatch.getCount()); } public void testRunWithPartialTimeout() { setupRunnables(); timeoutSupplier = () -> TimeValue.timeValueMillis(50); - BatchRunnableExecutor executor = new BatchRunnableExecutor(runnableList, timeoutSupplier); + CountDownLatch countDownLatch = new CountDownLatch(1); + BatchRunnableExecutor executor = new BatchRunnableExecutor(runnableList, timeoutSupplier) { + @Override + public void onComplete() { + countDownLatch.countDown(); + } + }; doAnswer(invocation -> { Thread.sleep(100); return null; @@ -81,11 +102,18 @@ public void testRunWithPartialTimeout() { verify(runnable3, atMost(1)).onTimeout(); verify(runnable2, atMost(1)).onTimeout(); verify(runnable3, atMost(1)).onTimeout(); + assertEquals(0, countDownLatch.getCount()); } public void testRunWithEmptyRunnableList() { setupRunnables(); - BatchRunnableExecutor executor = new BatchRunnableExecutor(Collections.emptyList(), timeoutSupplier); + CountDownLatch countDownLatch = new CountDownLatch(1); + BatchRunnableExecutor executor = new BatchRunnableExecutor(Collections.emptyList(), timeoutSupplier) { + @Override + public void onComplete() { + countDownLatch.countDown(); + } + }; executor.run(); verify(runnable1, never()).onTimeout(); verify(runnable2, never()).onTimeout(); @@ -93,5 +121,6 @@ public void testRunWithEmptyRunnableList() { verify(runnable1, never()).run(); verify(runnable2, never()).run(); verify(runnable3, never()).run(); + assertEquals(1, countDownLatch.getCount()); } } From 89cc56d7b42b1530433fad0c54508a20f3b2ae11 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 30 Jul 2024 22:58:33 +0530 Subject: [PATCH 07/13] Add test for pre-emptive return Signed-off-by: Rishab Nahata --- .../PrimaryShardBatchAllocatorTests.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java index 7484502610bdb..48183fed66671 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java @@ -303,6 +303,25 @@ public void testAllocateUnassignedBatchOnTimeoutWithNonPrimaryShards() { assertEquals(1, ignoredShards.size()); } + public void testAllocateUnassignedBatchOnTimeoutWithNoShards() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), clusterSettings, random()); + setUpShards(1); + final RoutingAllocation routingAllocation = routingAllocationWithOnePrimary(allocationDeciders, CLUSTER_RECOVERED, "allocId-0"); + + ShardRouting shardRouting = routingAllocation.routingTable() + .getIndicesRouting() + .get("test") + .shard(shardId.id()) + .replicaShards() + .get(0); + Set shardIds = new HashSet<>(); + batchAllocator.allocateUnassignedBatchOnTimeout(shardIds, routingAllocation, false); + + List ignoredShards = routingAllocation.routingNodes().unassigned().ignored(); + assertEquals(0, ignoredShards.size()); + } + private RoutingAllocation routingAllocationWithOnePrimary( AllocationDeciders deciders, UnassignedInfo.Reason reason, From ee8e7c535479822941cb552a7fc87ab2a93ee8d8 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 31 Jul 2024 00:43:48 +0530 Subject: [PATCH 08/13] Add test Signed-off-by: Rishab Nahata --- .../gateway/ShardsBatchGatewayAllocator.java | 13 +++++++++++-- .../opensearch/gateway/GatewayAllocatorTests.java | 13 +++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index cd596c12fd36e..b1dbfb6066df3 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -794,11 +794,20 @@ public int getNumberOfStoreShardBatches() { return batchIdToStoreShardBatch.size(); } - private void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatewayAllocatorTimeout) { + protected void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatewayAllocatorTimeout) { this.primaryShardsBatchGatewayAllocatorTimeout = primaryShardsBatchGatewayAllocatorTimeout; } - private void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatewayAllocatorTimeout) { + protected void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatewayAllocatorTimeout) { this.replicaShardsBatchGatewayAllocatorTimeout = replicaShardsBatchGatewayAllocatorTimeout; } + + // for tests + public Set getTimedOutPrimaryShardIds() { + return timedOutPrimaryShardIds; + } + + public Set getTimedOutReplicaShardIds() { + return timedOutReplicaShardIds; + } } diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index bd56123f6df1f..951d7ab515307 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -32,6 +32,7 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BatchRunnableExecutor; import org.opensearch.common.util.set.Sets; import org.opensearch.core.index.shard.ShardId; @@ -368,6 +369,18 @@ public void testCreatePrimaryAndReplicaExecutorOfSizeTwo() { assertEquals(executor.getTimeoutAwareRunnables().size(), 2); } + public void testCollectTimedOutShards() { + createIndexAndUpdateClusterState(2, 50, 2); + testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); + testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); + BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true); + executor.run(); + assertEquals(100, testShardsBatchGatewayAllocator.getTimedOutPrimaryShardIds().size()); + executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false); + executor.run(); + assertEquals(100, testShardsBatchGatewayAllocator.getTimedOutReplicaShardIds().size()); + } + private void createIndexAndUpdateClusterState(int count, int numberOfShards, int numberOfReplicas) { if (count == 0) return; Metadata.Builder metadata = Metadata.builder(); From fad910ed219e72b9532069463e9e601dcfa3b74a Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 31 Jul 2024 01:16:18 +0530 Subject: [PATCH 09/13] Fix spotless Signed-off-by: Rishab Nahata --- .../test/java/org/opensearch/gateway/GatewayAllocatorTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index 49e9b8c550242..8849a5cfdcf60 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -385,6 +385,7 @@ public void testCollectTimedOutShards() { executor.run(); assertEquals(100, testShardsBatchGatewayAllocator.getTimedOutReplicaShardIds().size()); } + public void testPrimaryAllocatorTimeout() { // Valid setting with timeout = 20s Settings build = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "20s").build(); From 235ea109ebff51a28e8829c3bb88fb3fa28f879b Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 31 Jul 2024 11:53:36 +0530 Subject: [PATCH 10/13] Trigger Build Signed-off-by: Rishab Nahata From 3cfa046dd49fbf82ff0abff49103769ae2c19922 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 7 Aug 2024 15:47:07 +0530 Subject: [PATCH 11/13] move timeout set inside method Signed-off-by: Rishab Nahata --- .../gateway/ShardsBatchGatewayAllocator.java | 15 ++------------- .../opensearch/gateway/GatewayAllocatorTests.java | 13 ------------- 2 files changed, 2 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index b560e13ddffcf..d18304ea73ed0 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -153,8 +153,6 @@ public void validate(TimeValue timeValue) { protected final ConcurrentMap batchIdToStoreShardBatch = ConcurrentCollections.newConcurrentMap(); private final TransportNodesListGatewayStartedShardsBatch batchStartedAction; private final TransportNodesListShardStoreMetadataBatch batchStoreAction; - private Set timedOutPrimaryShardIds; - private Set timedOutReplicaShardIds; @Inject public ShardsBatchGatewayAllocator( @@ -279,7 +277,7 @@ protected BatchRunnableExecutor innerAllocateUnassignedBatch( } List runnables = new ArrayList<>(); if (primary) { - timedOutPrimaryShardIds = new HashSet<>(); + Set timedOutPrimaryShardIds = new HashSet<>(); batchIdToStartedShardBatch.values() .stream() .filter(batch -> batchesToAssign.contains(batch.batchId)) @@ -302,7 +300,7 @@ public void onComplete() { } }; } else { - timedOutReplicaShardIds = new HashSet<>(); + Set timedOutReplicaShardIds = new HashSet<>(); batchIdToStoreShardBatch.values() .stream() .filter(batch -> batchesToAssign.contains(batch.batchId)) @@ -865,13 +863,4 @@ protected void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatew protected void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatewayAllocatorTimeout) { this.replicaShardsBatchGatewayAllocatorTimeout = replicaShardsBatchGatewayAllocatorTimeout; } - - // for tests - public Set getTimedOutPrimaryShardIds() { - return timedOutPrimaryShardIds; - } - - public Set getTimedOutReplicaShardIds() { - return timedOutReplicaShardIds; - } } diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index 8849a5cfdcf60..1596a0b566b28 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -32,7 +32,6 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BatchRunnableExecutor; import org.opensearch.common.util.set.Sets; import org.opensearch.core.index.shard.ShardId; @@ -374,18 +373,6 @@ public void testCreatePrimaryAndReplicaExecutorOfSizeTwo() { assertEquals(executor.getTimeoutAwareRunnables().size(), 2); } - public void testCollectTimedOutShards() { - createIndexAndUpdateClusterState(2, 50, 2); - testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); - testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); - BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true); - executor.run(); - assertEquals(100, testShardsBatchGatewayAllocator.getTimedOutPrimaryShardIds().size()); - executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false); - executor.run(); - assertEquals(100, testShardsBatchGatewayAllocator.getTimedOutReplicaShardIds().size()); - } - public void testPrimaryAllocatorTimeout() { // Valid setting with timeout = 20s Settings build = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "20s").build(); From 89338798eceee1fa58fee85699e935689d0d6dec Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 8 Aug 2024 11:39:47 +0530 Subject: [PATCH 12/13] Trigger Build Signed-off-by: Rishab Nahata From 9aee4e94b5ed123a321420c5c7c82671e743df51 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 8 Aug 2024 15:19:22 +0530 Subject: [PATCH 13/13] Add test Signed-off-by: Rishab Nahata --- .../gateway/GatewayAllocatorTests.java | 21 +++++++++++++++++++ .../TestShardBatchGatewayAllocator.java | 21 +++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index 1596a0b566b28..c7eae77d6deba 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -32,6 +32,7 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BatchRunnableExecutor; import org.opensearch.common.util.set.Sets; import org.opensearch.core.index.shard.ShardId; @@ -45,6 +46,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING; @@ -423,6 +426,24 @@ public void testReplicaAllocatorTimeout() { assertEquals(-1, REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis()); } + public void testCollectTimedOutShards() throws InterruptedException { + createIndexAndUpdateClusterState(2, 5, 2); + CountDownLatch latch = new CountDownLatch(10); + testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(latch); + testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); + testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); + BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true); + executor.run(); + assertTrue(latch.await(1, TimeUnit.MINUTES)); + latch = new CountDownLatch(10); + testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(latch); + testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); + testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); + executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false); + executor.run(); + assertTrue(latch.await(1, TimeUnit.MINUTES)); + } + private void createIndexAndUpdateClusterState(int count, int numberOfShards, int numberOfReplicas) { if (count == 0) return; Metadata.Builder metadata = Metadata.builder(); diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java index 0eb4bb6935bac..156b1d7c620e6 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java @@ -29,13 +29,20 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; public class TestShardBatchGatewayAllocator extends ShardsBatchGatewayAllocator { + CountDownLatch latch; + public TestShardBatchGatewayAllocator() { } + public TestShardBatchGatewayAllocator(CountDownLatch latch) { + this.latch = latch; + } + public TestShardBatchGatewayAllocator(long maxBatchSize) { super(maxBatchSize); } @@ -83,6 +90,13 @@ protected AsyncShardFetch.FetchResult(foundShards, shardsToIgnoreNodes); } + + @Override + protected void allocateUnassignedBatchOnTimeout(Set shardIds, RoutingAllocation allocation, boolean primary) { + for (int i = 0; i < shardIds.size(); i++) { + latch.countDown(); + } + } }; ReplicaShardBatchAllocator replicaBatchShardAllocator = new ReplicaShardBatchAllocator() { @@ -100,6 +114,13 @@ protected AsyncShardFetch.FetchResult shardIds, RoutingAllocation allocation, boolean primary) { + for (int i = 0; i < shardIds.size(); i++) { + latch.countDown(); + } + } }; @Override