From 9b256fbe4626d018405c30d9982fb888a79faf54 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Sun, 21 Jul 2024 05:56:47 +0530 Subject: [PATCH] Simplify batch allocators with timeouts --- .../AllocateUnassignedDecision.java | 2 +- .../routing/allocation/AllocationService.java | 19 ++++++++++-- .../allocation/ExistingShardsAllocator.java | 8 +++-- .../gateway/PrimaryShardBatchAllocator.java | 14 +++++++++ .../gateway/ReplicaShardBatchAllocator.java | 17 +++++++++++ .../gateway/ShardsBatchGatewayAllocator.java | 30 +++++++++++++++---- .../TestShardBatchGatewayAllocator.java | 5 ++-- 7 files changed, 82 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocateUnassignedDecision.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocateUnassignedDecision.java index 4e77ab772f390..43c6c19315cd0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocateUnassignedDecision.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocateUnassignedDecision.java @@ -105,7 +105,7 @@ public class AllocateUnassignedDecision extends AbstractAllocationDecision { private final long remainingDelayInMillis; private final long configuredDelayInMillis; - private AllocateUnassignedDecision( + public AllocateUnassignedDecision( AllocationStatus allocationStatus, DiscoveryNode assignedNode, String allocationId, diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index 5ad3a2fd47ce3..9eeb75d4f05aa 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -73,7 +73,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import static java.util.Collections.emptyList; @@ -100,6 +102,7 @@ public class AllocationService { private final ClusterInfoService clusterInfoService; private SnapshotsInfoService snapshotsInfoService; private final ClusterManagerMetrics clusterManagerMetrics; + private final long maxRunTimeoutInMillis = 5; // only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator public AllocationService( @@ -617,10 +620,22 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { private void allocateAllUnassignedShards(RoutingAllocation allocation) { ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME); - allocator.allocateAllUnassignedShards(allocation, true); + executeTimedRunnables(allocator.allocateAllUnassignedShards(allocation, true), () -> maxRunTimeoutInMillis); allocator.afterPrimariesBeforeReplicas(allocation); // Replicas Assignment - allocator.allocateAllUnassignedShards(allocation, false); + executeTimedRunnables(allocator.allocateAllUnassignedShards(allocation, false), () -> maxRunTimeoutInMillis); + } + + private void executeTimedRunnables(List> runnables, Supplier maxRunTimeSupplier) { + Collections.shuffle(runnables); + long startTime = System.nanoTime(); + for (Consumer workQueue : runnables) { + if (System.nanoTime() - startTime < TimeValue.timeValueMillis(maxRunTimeSupplier.get()).nanos()) { + workQueue.accept(false); + } else { + workQueue.accept(true); + } + } } private void disassociateDeadNodes(RoutingAllocation allocation) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java index fb2a37237f8b6..097ee95d186fa 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java @@ -41,7 +41,9 @@ import org.opensearch.gateway.GatewayAllocator; import org.opensearch.gateway.ShardsBatchGatewayAllocator; +import java.util.ArrayList; import java.util.List; +import java.util.function.Consumer; /** * Searches for, and allocates, shards for which there is an existing on-disk copy somewhere in the cluster. The default implementation is @@ -108,14 +110,16 @@ void allocateUnassigned( * * Allocation service will currently run the default implementation of it implemented by {@link ShardsBatchGatewayAllocator} */ - default void allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) { + default List> allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) { RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); + List> runnables = new ArrayList<>(); while (iterator.hasNext()) { ShardRouting shardRouting = iterator.next(); if (shardRouting.primary() == primary) { - allocateUnassigned(shardRouting, allocation, iterator); + runnables.add((t) -> allocateUnassigned(shardRouting, allocation, iterator)); } } + return runnables; } /** diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java index c493bf717c97f..617902b7506a9 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java @@ -12,6 +12,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.core.index.shard.ShardId; @@ -121,6 +122,19 @@ public void allocateUnassignedBatch(List shardRoutings, RoutingAll logger.trace("Finished shard allocation execution for unassigned primary shards: {}", shardRoutings.size()); } + public void allocateUnassignedBatchOnTimeout(List shardRoutings, RoutingAllocation allocation) { + Set batchShardRoutingSet = new HashSet<>(shardRoutings); + RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); + while (iterator.hasNext()) { + ShardRouting unassignedShard = iterator.next(); + AllocateUnassignedDecision allocationDecision; + if (unassignedShard.primary() && batchShardRoutingSet.contains(unassignedShard)) { + allocationDecision = new AllocateUnassignedDecision(UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED, null, null, null, false, 0L, 0L); + executeDecision(unassignedShard, allocationDecision, allocation, iterator); + } + } + } + /** * Transforms {@link FetchResult} of {@link NodeGatewayStartedShardsBatch} to {@link List} of {@link TransportNodesListGatewayStartedShards.NodeGatewayStartedShards}. *

diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java index 7c75f2a5d1a8f..99171b17a99e9 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java @@ -168,6 +168,23 @@ public void allocateUnassignedBatch(List shardRoutings, RoutingAll logger.trace("Finished shard allocation execution for unassigned replica shards: {}", shardRoutings.size()); } + public void allocateUnassignedBatchOnTimeout(List shardRoutings, RoutingAllocation allocation) { + Set shardIdsFromBatch = new HashSet<>(); + for (ShardRouting shardRouting : shardRoutings) { + ShardId shardId = shardRouting.shardId(); + shardIdsFromBatch.add(shardId); + } + RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); + while (iterator.hasNext()) { + ShardRouting unassignedShard = iterator.next(); + AllocateUnassignedDecision allocationDecision; + if (!unassignedShard.primary() && shardIdsFromBatch.contains(unassignedShard.shardId())) { + allocationDecision = new AllocateUnassignedDecision(UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED, null, null, null, false, 0L, 0L); + executeDecision(unassignedShard, allocationDecision, allocation, iterator); + } + } + } + private AllocateUnassignedDecision getUnassignedShardAllocationDecision( ShardRouting shardRouting, RoutingAllocation allocation, diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 3c0797cd450d2..6247f4a554f9f 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -41,6 +41,7 @@ import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper; import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -51,6 +52,7 @@ import java.util.Set; import java.util.Spliterators; import java.util.concurrent.ConcurrentMap; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -187,14 +189,14 @@ public void allocateUnassigned( } @Override - public void allocateAllUnassignedShards(final RoutingAllocation allocation, boolean primary) { + public List> allocateAllUnassignedShards(final RoutingAllocation allocation, boolean primary) { assert primaryShardBatchAllocator != null; assert replicaShardBatchAllocator != null; - innerAllocateUnassignedBatch(allocation, primaryShardBatchAllocator, replicaShardBatchAllocator, primary); + return innerAllocateUnassignedBatch(allocation, primaryShardBatchAllocator, replicaShardBatchAllocator, primary); } - protected void innerAllocateUnassignedBatch( + protected List> innerAllocateUnassignedBatch( RoutingAllocation allocation, PrimaryShardBatchAllocator primaryBatchShardAllocator, ReplicaShardBatchAllocator replicaBatchShardAllocator, @@ -203,21 +205,37 @@ protected void innerAllocateUnassignedBatch( // create batches for unassigned shards Set batchesToAssign = createAndUpdateBatches(allocation, primary); if (batchesToAssign.isEmpty()) { - return; + return Collections.emptyList(); } + List> runnables = new ArrayList<>(); if (primary) { batchIdToStartedShardBatch.values() .stream() .filter(batch -> batchesToAssign.contains(batch.batchId)) .forEach( - shardsBatch -> primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation) + shardsBatch -> runnables.add((timedOut) -> { + if(timedOut) { + primaryBatchShardAllocator.allocateUnassignedBatchOnTimeout(shardsBatch.getBatchedShardRoutings(), allocation); + } else { + primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation); + } + }) ); } else { batchIdToStoreShardBatch.values() .stream() .filter(batch -> batchesToAssign.contains(batch.batchId)) - .forEach(batch -> replicaBatchShardAllocator.allocateUnassignedBatch(batch.getBatchedShardRoutings(), allocation)); + .forEach( + batch -> runnables.add((timedOut) -> { + if(timedOut) { + replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(batch.getBatchedShardRoutings(), allocation); + } else { + replicaBatchShardAllocator.allocateUnassignedBatch(batch.getBatchedShardRoutings(), allocation); + } + }) + ); } + return runnables; } // visible for testing diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java index fbb39c284f0ff..9b11ebc8cd429 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; public class TestShardBatchGatewayAllocator extends ShardsBatchGatewayAllocator { @@ -102,9 +103,9 @@ protected boolean hasInitiatedFetching(ShardRouting shard) { }; @Override - public void allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) { + public List> allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) { currentNodes = allocation.nodes(); - innerAllocateUnassignedBatch(allocation, primaryBatchShardAllocator, replicaBatchShardAllocator, primary); + return innerAllocateUnassignedBatch(allocation, primaryBatchShardAllocator, replicaBatchShardAllocator, primary); } @Override