Skip to content

Commit

Permalink
Add test for Redis timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
eager-signal committed Jun 30, 2023
1 parent 463dd9d commit 457ecf1
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
Expand All @@ -22,81 +23,122 @@
import io.lettuce.core.event.EventBus;
import io.lettuce.core.resource.ClientResources;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import reactor.core.publisher.Flux;

class FaultTolerantRedisClusterTest {

private RedisAdvancedClusterCommands<String, String> clusterCommands;
private FaultTolerantRedisCluster faultTolerantCluster;

@SuppressWarnings("unchecked")
@BeforeEach
public void setUp() {
final RedisClusterClient clusterClient = mock(RedisClusterClient.class);
final StatefulRedisClusterConnection<String, String> clusterConnection = mock(StatefulRedisClusterConnection.class);
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = mock(StatefulRedisClusterPubSubConnection.class);
final ClientResources clientResources = mock(ClientResources.class);
final EventBus eventBus = mock(EventBus.class);

clusterCommands = mock(RedisAdvancedClusterCommands.class);

when(clusterClient.connect()).thenReturn(clusterConnection);
when(clusterClient.connectPubSub()).thenReturn(pubSubConnection);
when(clusterClient.getResources()).thenReturn(clientResources);
when(clusterConnection.sync()).thenReturn(clusterCommands);
when(clientResources.eventBus()).thenReturn(eventBus);
when(eventBus.get()).thenReturn(mock(Flux.class));

final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration();
breakerConfiguration.setFailureRateThreshold(100);
breakerConfiguration.setSlidingWindowSize(1);
breakerConfiguration.setSlidingWindowMinimumNumberOfCalls(1);
breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE);

final RetryConfiguration retryConfiguration = new RetryConfiguration();
retryConfiguration.setMaxAttempts(3);
retryConfiguration.setWaitDuration(0);

faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2),
breakerConfiguration, retryConfiguration);
}
private RedisAdvancedClusterCommands<String, String> clusterCommands;
private FaultTolerantRedisCluster faultTolerantCluster;

@Test
void testBreaker() {
when(clusterCommands.get(anyString()))
.thenReturn("value")
.thenThrow(new RuntimeException("Badness has ensued."));
@SuppressWarnings("unchecked")
@BeforeEach
public void setUp() {
final RedisClusterClient clusterClient = mock(RedisClusterClient.class);
final StatefulRedisClusterConnection<String, String> clusterConnection = mock(StatefulRedisClusterConnection.class);
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = mock(
StatefulRedisClusterPubSubConnection.class);
final ClientResources clientResources = mock(ClientResources.class);
final EventBus eventBus = mock(EventBus.class);

clusterCommands = mock(RedisAdvancedClusterCommands.class);

when(clusterClient.connect()).thenReturn(clusterConnection);
when(clusterClient.connectPubSub()).thenReturn(pubSubConnection);
when(clusterClient.getResources()).thenReturn(clientResources);
when(clusterConnection.sync()).thenReturn(clusterCommands);
when(clientResources.eventBus()).thenReturn(eventBus);
when(eventBus.get()).thenReturn(mock(Flux.class));

final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration();
breakerConfiguration.setFailureRateThreshold(100);
breakerConfiguration.setSlidingWindowSize(1);
breakerConfiguration.setSlidingWindowMinimumNumberOfCalls(1);
breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE);

final RetryConfiguration retryConfiguration = new RetryConfiguration();
retryConfiguration.setMaxAttempts(3);
retryConfiguration.setWaitDuration(0);

faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2),
breakerConfiguration, retryConfiguration);
}

@Test
void testBreaker() {
when(clusterCommands.get(anyString()))
.thenReturn("value")
.thenThrow(new RuntimeException("Badness has ensued."));

assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));

assertThrows(RedisException.class,
() -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO")));

assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));
final RedisException redisException = assertThrows(RedisException.class,
() -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO")));

assertThrows(RedisException.class,
() -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO")));
assertTrue(redisException.getCause() instanceof CallNotPermittedException);
}

final RedisException redisException = assertThrows(RedisException.class,
() -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO")));
@Test
void testRetry() {
when(clusterCommands.get(anyString()))
.thenThrow(new RedisCommandTimeoutException())
.thenThrow(new RedisCommandTimeoutException())
.thenReturn("value");

assertTrue(redisException.getCause() instanceof CallNotPermittedException);
assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));

when(clusterCommands.get(anyString()))
.thenThrow(new RedisCommandTimeoutException())
.thenThrow(new RedisCommandTimeoutException())
.thenThrow(new RedisCommandTimeoutException())
.thenReturn("value");

assertThrows(RedisCommandTimeoutException.class,
() -> faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));

}

@Nested
class WithRealCluster {

private static final Duration TIMEOUT = Duration.ofMillis(50);

private static final RetryConfiguration retryConfiguration = new RetryConfiguration();

static {
retryConfiguration.setMaxAttempts(1);
retryConfiguration.setWaitDuration(50);
}

@RegisterExtension
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder()
.retryConfiguration(retryConfiguration)
.timeout(TIMEOUT)
.build();

@Test
void testRetry() {
when(clusterCommands.get(anyString()))
.thenThrow(new RedisCommandTimeoutException())
.thenThrow(new RedisCommandTimeoutException())
.thenReturn("value");
void testTimeout() {
final FaultTolerantRedisCluster cluster = REDIS_CLUSTER_EXTENSION.getRedisCluster();

assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));
assertTimeoutPreemptively(Duration.ofSeconds(1), () -> {
final ExecutionException asyncException = assertThrows(ExecutionException.class,
() -> cluster.withCluster(connection -> connection.async().blpop(TIMEOUT.toMillis() * 2, "key")).get());
assertTrue(asyncException.getCause() instanceof RedisCommandTimeoutException);

when(clusterCommands.get(anyString()))
.thenThrow(new RedisCommandTimeoutException())
.thenThrow(new RedisCommandTimeoutException())
.thenThrow(new RedisCommandTimeoutException())
.thenReturn("value");
assertThrows(RedisCommandTimeoutException.class,
() -> cluster.withCluster(connection -> connection.sync().blpop(TIMEOUT.toMillis() * 2, "key")));
});

assertThrows(RedisCommandTimeoutException.class, () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,23 @@
import redis.embedded.RedisServer;
import redis.embedded.exceptions.EmbeddedRedisException;

public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback, AfterEachCallback {
public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback,
AfterEachCallback {

private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(2);
private static final int NODE_COUNT = 2;

private static final RedisServer[] CLUSTER_NODES = new RedisServer[NODE_COUNT];

private final Duration timeout;
private final RetryConfiguration retryConfiguration;
private FaultTolerantRedisCluster redisCluster;

public RedisClusterExtension(final Duration timeout, final RetryConfiguration retryConfiguration) {
this.timeout = timeout;
this.retryConfiguration = retryConfiguration;
}


public static RedisClusterExtensionBuilder builder() {
return new RedisClusterExtensionBuilder();
Expand Down Expand Up @@ -78,9 +87,9 @@ public void beforeEach(final ExtensionContext context) throws Exception {

redisCluster = new FaultTolerantRedisCluster("test-cluster",
RedisClusterClient.create(urls.stream().map(RedisURI::create).collect(Collectors.toList())),
Duration.ofSeconds(2),
timeout,
new CircuitBreakerConfiguration(),
new RetryConfiguration());
retryConfiguration);

redisCluster.useCluster(connection -> {
boolean setAll = false;
Expand Down Expand Up @@ -208,11 +217,24 @@ private static RedisServer startWithRetries(final int attemptsLeft) throws Excep

public static class RedisClusterExtensionBuilder {

private Duration timeout = DEFAULT_TIMEOUT;
private RetryConfiguration retryConfiguration = new RetryConfiguration();

private RedisClusterExtensionBuilder() {
}

RedisClusterExtensionBuilder timeout(Duration timeout) {
this.timeout = timeout;
return this;
}

RedisClusterExtensionBuilder retryConfiguration(RetryConfiguration retryConfiguration) {
this.retryConfiguration = retryConfiguration;
return this;
}

public RedisClusterExtension build() {
return new RedisClusterExtension();
return new RedisClusterExtension(timeout, retryConfiguration);
}
}
}

0 comments on commit 457ecf1

Please sign in to comment.