Skip to content

Commit

Permalink
Add UT for BatchRunnableExecutor and BatchAllocators
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <[email protected]>
  • Loading branch information
imRishN committed Jul 22, 2024
1 parent cf9c9dd commit 25588fa
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,2 +1,99 @@
package org.opensearch.common.util;public class BatchRunnableExecutorTest {
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

import org.junit.Before;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.TimeoutAwareRunnable;
import org.opensearch.test.OpenSearchTestCase;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

import static org.mockito.Mockito.*;

public class BatchRunnableExecutorTest extends OpenSearchTestCase {
private Supplier<TimeValue> timeoutSupplier;
private TimeoutAwareRunnable runnable1;
private TimeoutAwareRunnable runnable2;
private TimeoutAwareRunnable runnable3;
private List<TimeoutAwareRunnable> runnableList;

public void testRunWithoutTimeout() {
timeoutSupplier = mock(Supplier.class);
runnable1 = mock(TimeoutAwareRunnable.class);
runnable2 = mock(TimeoutAwareRunnable.class);
runnable3 = mock(TimeoutAwareRunnable.class);
runnableList = Arrays.asList(runnable1, runnable2, runnable3);
when(timeoutSupplier.get()).thenReturn(TimeValue.timeValueSeconds(1));
BatchRunnableExecutor executor = new BatchRunnableExecutor(runnableList, timeoutSupplier);
executor.run();
verify(runnable1, times(1)).run();
verify(runnable2, times(1)).run();
verify(runnable3, times(1)).run();
verify(runnable1, never()).onTimeout();
verify(runnable2, never()).onTimeout();
verify(runnable3, never()).onTimeout();
}

public void testRunWithTimeout() {
timeoutSupplier = mock(Supplier.class);
runnable1 = mock(TimeoutAwareRunnable.class);
runnable2 = mock(TimeoutAwareRunnable.class);
runnable3 = mock(TimeoutAwareRunnable.class);
runnableList = Arrays.asList(runnable1, runnable2, runnable3);
when(timeoutSupplier.get()).thenReturn(TimeValue.timeValueNanos(1));
BatchRunnableExecutor executor = new BatchRunnableExecutor(runnableList, timeoutSupplier);
executor.run();
verify(runnable1, times(1)).onTimeout();
verify(runnable2, times(1)).onTimeout();
verify(runnable3, times(1)).onTimeout();
verify(runnable1, never()).run();
verify(runnable2, never()).run();
verify(runnable3, never()).run();
}

// public void testRunWithPartialTimeout() {
// timeoutSupplier = mock(Supplier.class);
// runnable1 = mock(TimeoutAwareRunnable.class);
// runnable2 = mock(TimeoutAwareRunnable.class);
// runnable3 = mock(TimeoutAwareRunnable.class);
// runnableList = Arrays.asList(runnable1, runnable2, runnable3);
// when(timeoutSupplier.get()).thenReturn(TimeValue.timeValueMillis(100));
// BatchRunnableExecutor executor = new BatchRunnableExecutor(runnableList, timeoutSupplier);
// doAnswer(invocationOnMock -> {
// Thread.sleep(1000);
// return null;
// }).when(runnable1).run();
// executor.run();
// verify(runnable1, times(1)).run();
// verify(runnable2, never()).run();
// verify(runnable3, never()).run();
// verify(runnable2, times(1)).onTimeout();
// verify(runnable3, times(1)).onTimeout();
// }

public void testRunWithEmptyRunnableList() {
timeoutSupplier = mock(Supplier.class);
runnable1 = mock(TimeoutAwareRunnable.class);
runnable2 = mock(TimeoutAwareRunnable.class);
runnable3 = mock(TimeoutAwareRunnable.class);
runnableList = Arrays.asList(runnable1, runnable2, runnable3);
BatchRunnableExecutor executor = new BatchRunnableExecutor(Collections.emptyList(), timeoutSupplier);
executor.run();
verify(runnable1, never()).onTimeout();
verify(runnable2, never()).onTimeout();
verify(runnable3, never()).onTimeout();
verify(runnable1, never()).run();
verify(runnable2, never()).run();
verify(runnable3, never()).run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,7 @@
import org.opensearch.test.IndexSettingsModule;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;

import static org.opensearch.cluster.routing.UnassignedInfo.Reason.CLUSTER_RECOVERED;
Expand Down Expand Up @@ -256,6 +249,47 @@ public void testAllocateUnassignedBatchThrottlingAllocationDeciderIsHonoured() {
assertEquals(UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED, ignoredShards.get(0).unassignedInfo().getLastAllocationStatus());
}

public void testAllocateUnassignedBatchOnTimeoutWithMatchingPrimaryShards() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), clusterSettings, random());
setUpShards(1);
final RoutingAllocation routingAllocation = routingAllocationWithOnePrimary(allocationDeciders, CLUSTER_RECOVERED, "allocId-0");
ShardRouting shardRouting = routingAllocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard();

List<ShardRouting> shardRoutings = Arrays.asList(shardRouting);
batchAllocator.allocateUnassignedBatchOnTimeout(shardRoutings, routingAllocation);

List<ShardRouting> ignoredShards = routingAllocation.routingNodes().unassigned().ignored();
assertEquals(1, ignoredShards.size());
assertEquals(UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED, ignoredShards.get(0).unassignedInfo().getLastAllocationStatus());
}

public void testAllocateUnassignedBatchOnTimeoutWithNoMatchingPrimaryShards() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), clusterSettings, random());
setUpShards(1);
final RoutingAllocation routingAllocation = routingAllocationWithOnePrimary(allocationDeciders, CLUSTER_RECOVERED, "allocId-0");
List<ShardRouting> shardRoutings = new ArrayList<>();
batchAllocator.allocateUnassignedBatchOnTimeout(shardRoutings, routingAllocation);

List<ShardRouting> ignoredShards = routingAllocation.routingNodes().unassigned().ignored();
assertEquals(0, ignoredShards.size());
}

public void testAllocateUnassignedBatchOnTimeoutWithNonPrimaryShards() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), clusterSettings, random());
setUpShards(1);
final RoutingAllocation routingAllocation = routingAllocationWithOnePrimary(allocationDeciders, CLUSTER_RECOVERED, "allocId-0");

ShardRouting shardRouting = routingAllocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).replicaShards().get(0);
List<ShardRouting> shardRoutings = Arrays.asList(shardRouting);
batchAllocator.allocateUnassignedBatchOnTimeout(shardRoutings, routingAllocation);

List<ShardRouting> ignoredShards = routingAllocation.routingNodes().unassigned().ignored();
assertEquals(0, ignoredShards.size());
}

private RoutingAllocation routingAllocationWithOnePrimary(
AllocationDeciders deciders,
UnassignedInfo.Reason reason,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,30 @@ public void testAllocateUnassignedBatchThrottlingAllocationDeciderIsHonoured() t
assertEquals(UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED, allocateUnassignedDecision.getAllocationStatus());
}

public void testAllocateUnassignedBatchOnTimeoutWithUnassignedReplicaShard() {
RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders());
final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
List<ShardRouting> shards = new ArrayList<>();
while (iterator.hasNext()) {
shards.add(iterator.next());
}
testBatchAllocator.allocateUnassignedBatchOnTimeout(shards, allocation);
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertEquals(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.routingNodes().unassigned().ignored().get(0).unassignedInfo().getLastAllocationStatus());
}

public void testAllocateUnassignedBatchOnTimeoutWithAlreadyRecoveringReplicaShard() {
RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders());
final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
List<ShardRouting> shards = new ArrayList<>();
while (iterator.hasNext()) {
shards.add(iterator.next());
}
testBatchAllocator.allocateUnassignedBatchOnTimeout(shards, allocation);
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0));
}

private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders) {
return onePrimaryOnNode1And1Replica(deciders, Settings.EMPTY, UnassignedInfo.Reason.CLUSTER_RECOVERED);
}
Expand Down

0 comments on commit 25588fa

Please sign in to comment.