diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java index 922b433f5..415a4b775 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; @@ -22,81 +23,122 @@ import io.lettuce.core.event.EventBus; import io.lettuce.core.resource.ClientResources; import java.time.Duration; +import java.util.concurrent.ExecutionException; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import reactor.core.publisher.Flux; class FaultTolerantRedisClusterTest { - private RedisAdvancedClusterCommands clusterCommands; - private FaultTolerantRedisCluster faultTolerantCluster; - - @SuppressWarnings("unchecked") - @BeforeEach - public void setUp() { - final RedisClusterClient clusterClient = mock(RedisClusterClient.class); - final StatefulRedisClusterConnection clusterConnection = mock(StatefulRedisClusterConnection.class); - final StatefulRedisClusterPubSubConnection pubSubConnection = mock(StatefulRedisClusterPubSubConnection.class); - final ClientResources clientResources = mock(ClientResources.class); - final EventBus eventBus = mock(EventBus.class); - - clusterCommands = mock(RedisAdvancedClusterCommands.class); - - when(clusterClient.connect()).thenReturn(clusterConnection); - when(clusterClient.connectPubSub()).thenReturn(pubSubConnection); - when(clusterClient.getResources()).thenReturn(clientResources); - when(clusterConnection.sync()).thenReturn(clusterCommands); - when(clientResources.eventBus()).thenReturn(eventBus); - when(eventBus.get()).thenReturn(mock(Flux.class)); - - final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration(); - breakerConfiguration.setFailureRateThreshold(100); - breakerConfiguration.setSlidingWindowSize(1); - breakerConfiguration.setSlidingWindowMinimumNumberOfCalls(1); - breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE); - - final RetryConfiguration retryConfiguration = new RetryConfiguration(); - retryConfiguration.setMaxAttempts(3); - retryConfiguration.setWaitDuration(0); - - faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2), - breakerConfiguration, retryConfiguration); - } + private RedisAdvancedClusterCommands clusterCommands; + private FaultTolerantRedisCluster faultTolerantCluster; - @Test - void testBreaker() { - when(clusterCommands.get(anyString())) - .thenReturn("value") - .thenThrow(new RuntimeException("Badness has ensued.")); + @SuppressWarnings("unchecked") + @BeforeEach + public void setUp() { + final RedisClusterClient clusterClient = mock(RedisClusterClient.class); + final StatefulRedisClusterConnection clusterConnection = mock(StatefulRedisClusterConnection.class); + final StatefulRedisClusterPubSubConnection pubSubConnection = mock( + StatefulRedisClusterPubSubConnection.class); + final ClientResources clientResources = mock(ClientResources.class); + final EventBus eventBus = mock(EventBus.class); + + clusterCommands = mock(RedisAdvancedClusterCommands.class); + + when(clusterClient.connect()).thenReturn(clusterConnection); + when(clusterClient.connectPubSub()).thenReturn(pubSubConnection); + when(clusterClient.getResources()).thenReturn(clientResources); + when(clusterConnection.sync()).thenReturn(clusterCommands); + when(clientResources.eventBus()).thenReturn(eventBus); + when(eventBus.get()).thenReturn(mock(Flux.class)); + + final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration(); + breakerConfiguration.setFailureRateThreshold(100); + breakerConfiguration.setSlidingWindowSize(1); + breakerConfiguration.setSlidingWindowMinimumNumberOfCalls(1); + breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE); + + final RetryConfiguration retryConfiguration = new RetryConfiguration(); + retryConfiguration.setMaxAttempts(3); + retryConfiguration.setWaitDuration(0); + + faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2), + breakerConfiguration, retryConfiguration); + } + + @Test + void testBreaker() { + when(clusterCommands.get(anyString())) + .thenReturn("value") + .thenThrow(new RuntimeException("Badness has ensued.")); + + assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key"))); + + assertThrows(RedisException.class, + () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO"))); - assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key"))); + final RedisException redisException = assertThrows(RedisException.class, + () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO"))); - assertThrows(RedisException.class, - () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO"))); + assertTrue(redisException.getCause() instanceof CallNotPermittedException); + } - final RedisException redisException = assertThrows(RedisException.class, - () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO"))); + @Test + void testRetry() { + when(clusterCommands.get(anyString())) + .thenThrow(new RedisCommandTimeoutException()) + .thenThrow(new RedisCommandTimeoutException()) + .thenReturn("value"); - assertTrue(redisException.getCause() instanceof CallNotPermittedException); + assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key"))); + + when(clusterCommands.get(anyString())) + .thenThrow(new RedisCommandTimeoutException()) + .thenThrow(new RedisCommandTimeoutException()) + .thenThrow(new RedisCommandTimeoutException()) + .thenReturn("value"); + + assertThrows(RedisCommandTimeoutException.class, + () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("key"))); + + } + + @Nested + class WithRealCluster { + + private static final Duration TIMEOUT = Duration.ofMillis(50); + + private static final RetryConfiguration retryConfiguration = new RetryConfiguration(); + + static { + retryConfiguration.setMaxAttempts(1); + retryConfiguration.setWaitDuration(50); } + @RegisterExtension + static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder() + .retryConfiguration(retryConfiguration) + .timeout(TIMEOUT) + .build(); + @Test - void testRetry() { - when(clusterCommands.get(anyString())) - .thenThrow(new RedisCommandTimeoutException()) - .thenThrow(new RedisCommandTimeoutException()) - .thenReturn("value"); + void testTimeout() { + final FaultTolerantRedisCluster cluster = REDIS_CLUSTER_EXTENSION.getRedisCluster(); - assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key"))); + assertTimeoutPreemptively(Duration.ofSeconds(1), () -> { + final ExecutionException asyncException = assertThrows(ExecutionException.class, + () -> cluster.withCluster(connection -> connection.async().blpop(TIMEOUT.toMillis() * 2, "key")).get()); + assertTrue(asyncException.getCause() instanceof RedisCommandTimeoutException); - when(clusterCommands.get(anyString())) - .thenThrow(new RedisCommandTimeoutException()) - .thenThrow(new RedisCommandTimeoutException()) - .thenThrow(new RedisCommandTimeoutException()) - .thenReturn("value"); + assertThrows(RedisCommandTimeoutException.class, + () -> cluster.withCluster(connection -> connection.sync().blpop(TIMEOUT.toMillis() * 2, "key"))); + }); - assertThrows(RedisCommandTimeoutException.class, () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("key"))); } + } + } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java index 67b04df06..4d1ca048d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java @@ -32,14 +32,23 @@ import redis.embedded.RedisServer; import redis.embedded.exceptions.EmbeddedRedisException; -public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback, AfterEachCallback { +public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback, + AfterEachCallback { + private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(2); private static final int NODE_COUNT = 2; private static final RedisServer[] CLUSTER_NODES = new RedisServer[NODE_COUNT]; + private final Duration timeout; + private final RetryConfiguration retryConfiguration; private FaultTolerantRedisCluster redisCluster; + public RedisClusterExtension(final Duration timeout, final RetryConfiguration retryConfiguration) { + this.timeout = timeout; + this.retryConfiguration = retryConfiguration; + } + public static RedisClusterExtensionBuilder builder() { return new RedisClusterExtensionBuilder(); @@ -78,9 +87,9 @@ public void beforeEach(final ExtensionContext context) throws Exception { redisCluster = new FaultTolerantRedisCluster("test-cluster", RedisClusterClient.create(urls.stream().map(RedisURI::create).collect(Collectors.toList())), - Duration.ofSeconds(2), + timeout, new CircuitBreakerConfiguration(), - new RetryConfiguration()); + retryConfiguration); redisCluster.useCluster(connection -> { boolean setAll = false; @@ -208,11 +217,24 @@ private static RedisServer startWithRetries(final int attemptsLeft) throws Excep public static class RedisClusterExtensionBuilder { + private Duration timeout = DEFAULT_TIMEOUT; + private RetryConfiguration retryConfiguration = new RetryConfiguration(); + private RedisClusterExtensionBuilder() { } + RedisClusterExtensionBuilder timeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + RedisClusterExtensionBuilder retryConfiguration(RetryConfiguration retryConfiguration) { + this.retryConfiguration = retryConfiguration; + return this; + } + public RedisClusterExtension build() { - return new RedisClusterExtension(); + return new RedisClusterExtension(timeout, retryConfiguration); } } }