Skip to content

Commit

Permalink
Simplify batch allocators with timeouts
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <[email protected]>
  • Loading branch information
Bukhtawar authored and imRishN committed Jul 22, 2024
1 parent 6b8b3ef commit 6d2a9d3
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class AllocateUnassignedDecision extends AbstractAllocationDecision {
private final long remainingDelayInMillis;
private final long configuredDelayInMillis;

private AllocateUnassignedDecision(
public AllocateUnassignedDecision(
AllocationStatus allocationStatus,
DiscoveryNode assignedNode,
String allocationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
Expand All @@ -100,6 +102,7 @@ public class AllocationService {
private final ClusterInfoService clusterInfoService;
private SnapshotsInfoService snapshotsInfoService;
private final ClusterManagerMetrics clusterManagerMetrics;
private final long maxRunTimeoutInMillis = 5;

// only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator
public AllocationService(
Expand Down Expand Up @@ -617,10 +620,22 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {

private void allocateAllUnassignedShards(RoutingAllocation allocation) {
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
allocator.allocateAllUnassignedShards(allocation, true);
executeTimedRunnables(allocator.allocateAllUnassignedShards(allocation, true), () -> maxRunTimeoutInMillis);
allocator.afterPrimariesBeforeReplicas(allocation);
// Replicas Assignment
allocator.allocateAllUnassignedShards(allocation, false);
executeTimedRunnables(allocator.allocateAllUnassignedShards(allocation, false), () -> maxRunTimeoutInMillis);
}

private void executeTimedRunnables(List<Consumer<Boolean>> runnables, Supplier<Long> maxRunTimeSupplier) {
Collections.shuffle(runnables);
long startTime = System.nanoTime();
for (Consumer<Boolean> workQueue : runnables) {
if (System.nanoTime() - startTime < TimeValue.timeValueMillis(maxRunTimeSupplier.get()).nanos()) {
workQueue.accept(false);
} else {
workQueue.accept(true);
}
}
}

private void disassociateDeadNodes(RoutingAllocation allocation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
* Searches for, and allocates, shards for which there is an existing on-disk copy somewhere in the cluster. The default implementation is
Expand Down Expand Up @@ -108,14 +110,16 @@ void allocateUnassigned(
*
* Allocation service will currently run the default implementation of it implemented by {@link ShardsBatchGatewayAllocator}
*/
default void allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
default List<Consumer<Boolean>> allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
List<Consumer<Boolean>> runnables = new ArrayList<>();
while (iterator.hasNext()) {
ShardRouting shardRouting = iterator.next();
if (shardRouting.primary() == primary) {
allocateUnassigned(shardRouting, allocation, iterator);
runnables.add((t) -> allocateUnassigned(shardRouting, allocation, iterator));
}
}
return runnables;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.core.index.shard.ShardId;
Expand Down Expand Up @@ -121,6 +122,19 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
logger.trace("Finished shard allocation execution for unassigned primary shards: {}", shardRoutings.size());
}

public void allocateUnassignedBatchOnTimeout(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
Set<ShardRouting> batchShardRoutingSet = new HashSet<>(shardRoutings);
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();
AllocateUnassignedDecision allocationDecision;
if (unassignedShard.primary() && batchShardRoutingSet.contains(unassignedShard)) {
allocationDecision = new AllocateUnassignedDecision(UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED, null, null, null, false, 0L, 0L);
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
}
}
}

/**
* Transforms {@link FetchResult} of {@link NodeGatewayStartedShardsBatch} to {@link List} of {@link TransportNodesListGatewayStartedShards.NodeGatewayStartedShards}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,23 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
logger.trace("Finished shard allocation execution for unassigned replica shards: {}", shardRoutings.size());
}

public void allocateUnassignedBatchOnTimeout(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
Set<ShardId> shardIdsFromBatch = new HashSet<>();
for (ShardRouting shardRouting : shardRoutings) {
ShardId shardId = shardRouting.shardId();
shardIdsFromBatch.add(shardId);
}
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();
AllocateUnassignedDecision allocationDecision;
if (!unassignedShard.primary() && shardIdsFromBatch.contains(unassignedShard.shardId())) {
allocationDecision = new AllocateUnassignedDecision(UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED, null, null, null, false, 0L, 0L);
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
}
}
}

private AllocateUnassignedDecision getUnassignedShardAllocationDecision(
ShardRouting shardRouting,
RoutingAllocation allocation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -51,6 +52,7 @@
import java.util.Set;
import java.util.Spliterators;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -187,14 +189,14 @@ public void allocateUnassigned(
}

@Override
public void allocateAllUnassignedShards(final RoutingAllocation allocation, boolean primary) {
public List<Consumer<Boolean>> allocateAllUnassignedShards(final RoutingAllocation allocation, boolean primary) {

assert primaryShardBatchAllocator != null;
assert replicaShardBatchAllocator != null;
innerAllocateUnassignedBatch(allocation, primaryShardBatchAllocator, replicaShardBatchAllocator, primary);
return innerAllocateUnassignedBatch(allocation, primaryShardBatchAllocator, replicaShardBatchAllocator, primary);
}

protected void innerAllocateUnassignedBatch(
protected List<Consumer<Boolean>> innerAllocateUnassignedBatch(
RoutingAllocation allocation,
PrimaryShardBatchAllocator primaryBatchShardAllocator,
ReplicaShardBatchAllocator replicaBatchShardAllocator,
Expand All @@ -203,21 +205,37 @@ protected void innerAllocateUnassignedBatch(
// create batches for unassigned shards
Set<String> batchesToAssign = createAndUpdateBatches(allocation, primary);
if (batchesToAssign.isEmpty()) {
return;
return Collections.emptyList();
}
List<Consumer<Boolean>> runnables = new ArrayList<>();
if (primary) {
batchIdToStartedShardBatch.values()
.stream()
.filter(batch -> batchesToAssign.contains(batch.batchId))
.forEach(
shardsBatch -> primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation)
shardsBatch -> runnables.add((timedOut) -> {
if(timedOut) {
primaryBatchShardAllocator.allocateUnassignedBatchOnTimeout(shardsBatch.getBatchedShardRoutings(), allocation);
} else {
primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation);
}
})
);
} else {
batchIdToStoreShardBatch.values()
.stream()
.filter(batch -> batchesToAssign.contains(batch.batchId))
.forEach(batch -> replicaBatchShardAllocator.allocateUnassignedBatch(batch.getBatchedShardRoutings(), allocation));
.forEach(
batch -> runnables.add((timedOut) -> {
if(timedOut) {
replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(batch.getBatchedShardRoutings(), allocation);
} else {
replicaBatchShardAllocator.allocateUnassignedBatch(batch.getBatchedShardRoutings(), allocation);
}
})
);
}
return runnables;
}

// visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

public class TestShardBatchGatewayAllocator extends ShardsBatchGatewayAllocator {

Expand Down Expand Up @@ -102,9 +103,9 @@ protected boolean hasInitiatedFetching(ShardRouting shard) {
};

@Override
public void allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
public List<Consumer<Boolean>> allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
currentNodes = allocation.nodes();
innerAllocateUnassignedBatch(allocation, primaryBatchShardAllocator, replicaBatchShardAllocator, primary);
return innerAllocateUnassignedBatch(allocation, primaryBatchShardAllocator, replicaBatchShardAllocator, primary);
}

@Override
Expand Down

0 comments on commit 6d2a9d3

Please sign in to comment.