Skip to content

Commit

Permalink
Remove timeout from decide allocate unassigned
Browse files (browse the repository at this point in the history)
Signed-off-by: Rishab Nahata <[email protected]>
  • Loading branch information
imRishN committed Jul 22, 2024
1 parent 3d8b9e4 commit cf9c9dd
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -163,13 +162,6 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.NodeScope
);

/**
 * Time-valued setting registered under the key
 * {@code cluster.routing.allocation.allocate_unassigned_timeout}.
 * <p>
 * Declared with a default of {@link TimeValue#MINUS_ONE} and the
 * {@code NodeScope} and {@code Dynamic} properties.
 * <p>
 * NOTE(review): the only visible consumer compares
 * {@code System.nanoTime() - startTime} against {@code allocateUnassignedTimeout.nanos()}
 * without special-casing a negative value; confirm how the {@code MINUS_ONE} default is
 * meant to behave (presumably "no timeout" -- TODO verify).
 */
public static final Setting<TimeValue> ALLOCATE_UNASSIGNED_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.routing.allocation.allocate_unassigned_timeout",
TimeValue.MINUS_ONE,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private volatile boolean movePrimaryFirst;
private volatile ShardMovementStrategy shardMovementStrategy;

Expand All @@ -180,7 +172,6 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile float shardBalanceFactor;
private volatile WeightFunction weightFunction;
private volatile float threshold;
private volatile TimeValue allocateUnassignedTimeout;

public BalancedShardsAllocator(Settings settings) {
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
Expand All @@ -196,7 +187,6 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
setAllocateUnassignedTimeout(ALLOCATE_UNASSIGNED_TIMEOUT_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
Expand All @@ -205,7 +195,6 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer);
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
clusterSettings.addSettingsUpdateConsumer(ALLOCATE_UNASSIGNED_TIMEOUT_SETTING, this::setAllocateUnassignedTimeout);
}

/**
Expand Down Expand Up @@ -280,10 +269,6 @@ private void setThreshold(float threshold) {
this.threshold = threshold;
}

/**
 * Stores the given timeout in the {@code allocateUnassignedTimeout} field of this allocator.
 * <p>
 * Called from the constructor with the initial value of
 * {@code ALLOCATE_UNASSIGNED_TIMEOUT_SETTING} and registered as that setting's update consumer.
 *
 * @param allocateUnassignedTimeout the new timeout value to keep
 */
private void setAllocateUnassignedTimeout(TimeValue allocateUnassignedTimeout) {
this.allocateUnassignedTimeout = allocateUnassignedTimeout;
}

@Override
public void allocate(RoutingAllocation allocation) {
if (allocation.routingNodes().size() == 0) {
Expand All @@ -297,8 +282,7 @@ public void allocate(RoutingAllocation allocation) {
weightFunction,
threshold,
preferPrimaryShardBalance,
preferPrimaryShardRebalance,
allocateUnassignedTimeout
preferPrimaryShardRebalance
);
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
Expand All @@ -321,13 +305,12 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
weightFunction,
threshold,
preferPrimaryShardBalance,
preferPrimaryShardRebalance,
allocateUnassignedTimeout
preferPrimaryShardRebalance
);
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
if (shard.unassigned()) {
allocateUnassignedDecision = localShardsBalancer.decideAllocateUnassigned(shard, startTime);
allocateUnassignedDecision = localShardsBalancer.decideAllocateUnassigned(shard);
} else {
moveDecision = localShardsBalancer.decideMove(shard);
if (moveDecision.isDecisionTaken() && moveDecision.canRemain()) {
Expand Down Expand Up @@ -576,7 +559,7 @@ public Balancer(
float threshold,
boolean preferPrimaryBalance
) {
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, TimeValue.MINUS_ONE);
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.gateway.PriorityComparator;

import java.util.ArrayList;
Expand Down Expand Up @@ -62,7 +61,6 @@ public class LocalShardsBalancer extends ShardsBalancer {

private final boolean preferPrimaryBalance;
private final boolean preferPrimaryRebalance;
private final TimeValue allocateUnassignedTimeout;
private final BalancedShardsAllocator.WeightFunction weight;

private final float threshold;
Expand All @@ -80,8 +78,7 @@ public LocalShardsBalancer(
BalancedShardsAllocator.WeightFunction weight,
float threshold,
boolean preferPrimaryBalance,
boolean preferPrimaryRebalance,
TimeValue allocateUnassignedTimeout
boolean preferPrimaryRebalance
) {
this.logger = logger;
this.allocation = allocation;
Expand All @@ -98,7 +95,6 @@ public LocalShardsBalancer(
this.preferPrimaryBalance = preferPrimaryBalance;
this.preferPrimaryRebalance = preferPrimaryRebalance;
this.shardMovementStrategy = shardMovementStrategy;
this.allocateUnassignedTimeout = allocateUnassignedTimeout;
}

/**
Expand Down Expand Up @@ -747,7 +743,6 @@ private Map<String, BalancedShardsAllocator.ModelNode> buildModelFromAssigned()
*/
@Override
void allocateUnassigned() {
long startTime = System.nanoTime();
RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
assert !nodes.isEmpty();
if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -803,7 +798,7 @@ void allocateUnassigned() {
do {
for (int i = 0; i < primaryLength; i++) {
ShardRouting shard = primary[i];
final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard, startTime);
final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard);
final String assignedNodeId = allocationDecision.getTargetNode() != null
? allocationDecision.getTargetNode().getId()
: null;
Expand Down Expand Up @@ -876,8 +871,6 @@ void allocateUnassigned() {
secondaryLength = 0;
} while (primaryLength > 0);
// clear everything we have either added it or moved to ignoreUnassigned
logger.debug("Time taken in allocate unassigned [{}]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
}

/**
Expand All @@ -887,17 +880,12 @@ void allocateUnassigned() {
* is of type {@link Decision.Type#NO}, then the assigned node will be null.
*/
@Override
AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard, long startTime) {
AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) {
if (shard.assignedToNode()) {
// we only make decisions for unassigned shards here
return AllocateUnassignedDecision.NOT_TAKEN;
}

if (System.nanoTime() - startTime > allocateUnassignedTimeout.nanos()) {
logger.info("Timed out while running Local shard balancer allocate unassigned - outer loop");
return AllocateUnassignedDecision.throttle(null);
}

final boolean explain = allocation.debugDecision();
Decision shardLevelDecision = allocation.deciders().canAllocate(shard, allocation);
if (shardLevelDecision.type() == Decision.Type.NO && explain == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ private Map<String, Integer> calculateNodePrimaryShardCount(List<RoutingNode> re
}

/**
 * Decision operations are not supported by this balancer: the method body does nothing
 * except throw.
 *
 * @param shardRouting the shard a decision was requested for (unused)
 * @throws UnsupportedOperationException always
 */
@Override
AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting, long startTime) {
AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting) {
throw new UnsupportedOperationException("remote shards balancer does not support decision operations");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public abstract class ShardsBalancer {
* @param shardRouting the shard for which the decision has to be made
* @return the allocation decision
*/
abstract AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting, long startTime);
abstract AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting);

/**
* Makes a decision on whether to move a started shard to another node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,6 @@ public void apply(Settings value, Settings current, Settings previous) {
BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING,
BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING,
BalancedShardsAllocator.THRESHOLD_SETTING,
BalancedShardsAllocator.ALLOCATE_UNASSIGNED_TIMEOUT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_TYPE,
Expand Down

0 comments on commit cf9c9dd

Please sign in to comment.