Skip to content

Commit

Permalink
Improve two @Disabled flaky tests
Browse files Browse the repository at this point in the history
  • Loading branch information
eager-signal committed Jun 29, 2023
1 parent d7bf815 commit fb39b2e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -46,7 +45,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
Expand All @@ -64,10 +62,10 @@
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

class MessagesCacheTest {

Expand Down Expand Up @@ -586,14 +584,14 @@ void testGetAllMessagesLimitsAndBackpressure() {
// is subscribed. Rather, we should be fetching in pages to satisfy downstream requests, so that memory usage
// is limited to few pages of messages

// we use a combination of Flux.just() and Sinks to control when data is “fetched” from the cache. The initial
// Flux.just()s are pages that are readily available, on demand. By design, there are more of these pages than
// the initial prefetch. The sinks allow us to create extra demand but defer producing values to satisfy the demand
// until later on.
// we use a combination of Flux.just() and TestPublishers to control when data is “fetched” and emitted from the
// cache. The initial Flux.just()s are pages that are readily available, on demand. By design, there are more of
// these pages than the initial prefetch. The publishers allow us to create extra demand but defer producing
// values to satisfy the demand until later on.

final AtomicReference<FluxSink<Object>> page4Sink = new AtomicReference<>();
final AtomicReference<FluxSink<Object>> page56Sink = new AtomicReference<>();
final AtomicReference<FluxSink<Object>> emptyFinalPageSink = new AtomicReference<>();
final TestPublisher<Object> page4Publisher = TestPublisher.create();
final TestPublisher<Object> page56Publisher = TestPublisher.create();
final TestPublisher<Object> emptyFinalPagePublisher = TestPublisher.create();

final Deque<List<byte[]>> pages = new ArrayDeque<>();
pages.add(generatePage());
Expand All @@ -608,9 +606,9 @@ void testGetAllMessagesLimitsAndBackpressure() {
.thenReturn(Flux.just(pages.pop()))
.thenReturn(Flux.just(pages.pop()))
.thenReturn(Flux.just(pages.pop()))
.thenReturn(Flux.create(sink -> page4Sink.compareAndSet(null, sink)))
.thenReturn(Flux.create(sink -> page56Sink.compareAndSet(null, sink)))
.thenReturn(Flux.create(sink -> emptyFinalPageSink.compareAndSet(null, sink)))
.thenReturn(Flux.from(page4Publisher))
.thenReturn(Flux.from(page56Publisher))
.thenReturn(Flux.from(emptyFinalPagePublisher))
.thenReturn(Flux.empty());

final Flux<?> allMessages = messagesCache.getAllMessages(UUID.randomUUID(), 1L);
Expand All @@ -635,9 +633,9 @@ void testGetAllMessagesLimitsAndBackpressure() {
.thenRequest(halfPage) // page 0.5 requested
.expectNextCount(halfPage) // page 0.5 produced
// page 0.5 produced, 1.5 remain, so no additional interactions with the cache cluster
.then(() -> verify(reactiveCommands, times(expectedReactiveCommandInvocations.get())).evalsha(any(),
.then(() -> verify(reactiveCommands, atMost(expectedReactiveCommandInvocations.get())).evalsha(any(),
any(), any(), any()))
.then(() -> assertNull(page4Sink.get(), "page 4 should not have been fetched yet"))
.then(page4Publisher::assertWasNotRequested)
.thenRequest(page) // page 1.5 requested
.expectNextCount(page) // page 1.5 produced

Expand All @@ -647,26 +645,26 @@ void testGetAllMessagesLimitsAndBackpressure() {
// also NB: times() checks cumulative calls, hence addAndGet
.then(() -> verify(reactiveCommands, times(expectedReactiveCommandInvocations.addAndGet(1))).evalsha(any(),
any(), any(), any()))
.then(() -> assertNotNull(page4Sink.get(), "page 4 should have been fetched"))
.then(page4Publisher::assertWasSubscribed)
.thenRequest(page + halfPage) // page 3 requested
.expectNextCount(page + halfPage) // page 1.5–3 produced

.thenRequest(halfPage) // page 3.5 requested
.then(() -> assertNull(page56Sink.get(), "page 5 should not have been fetched yet"))
.then(() -> page4Sink.get().next(pages.pop()).complete())
.then(page56Publisher::assertWasNotRequested)
.then(() -> page4Publisher.emit(pages.pop()))
.expectNextCount(halfPage) // page 3.5 produced
.then(() -> verify(reactiveCommands, times(expectedReactiveCommandInvocations.addAndGet(1))).evalsha(any(),
any(), any(), any()))
.then(() -> assertNotNull(page56Sink.get(), "page 5 should have been fetched"))
.then(page56Publisher::assertWasSubscribed)

.thenRequest(page) // page 4.5 requested
.expectNextCount(halfPage) // page 4 produced

.thenRequest(page * 4) // request more demand than we will ultimately satisfy

.then(() -> page56Sink.get().next(pages.pop()).next(pages.pop()).complete())
.then(() -> page56Publisher.next(pages.pop()).next(pages.pop()).complete())
.expectNextCount(page + page) // page 5 and 6 produced
.then(() -> emptyFinalPageSink.get().complete())
.then(emptyFinalPagePublisher::complete)
// confirm that cache calls increased by 2: one for page 5-and-6 (we got a two-fer in next(pop()).next(pop()),
// and one for the final, empty page
.then(() -> verify(reactiveCommands, times(expectedReactiveCommandInvocations.addAndGet(2))).evalsha(any(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,10 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
Expand All @@ -68,10 +65,10 @@
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
import org.whispersystems.websocket.session.WebSocketSessionContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

class WebSocketConnectionTest {

Expand Down Expand Up @@ -744,7 +741,6 @@ void testRetrieveMessageExceptionClientDisconnected() {
}

@Test
@Disabled("This test is flaky")
void testReactivePublisherLimitRate() {
final UUID accountUuid = UUID.randomUUID();

Expand All @@ -754,18 +750,10 @@ void testReactivePublisherLimitRate() {
when(account.getNumber()).thenReturn("+14152222222");
when(account.getUuid()).thenReturn(accountUuid);

final int totalMessages = 10;
final AtomicReference<FluxSink<Envelope>> sink = new AtomicReference<>();
final int totalMessages = 1000;

final AtomicLong maxRequest = new AtomicLong(-1);
final Flux<Envelope> flux = Flux.create(s -> {
sink.set(s);
s.onRequest(n -> {
if (maxRequest.get() < n) {
maxRequest.set(n);
}
});
});
final TestPublisher<Envelope> testPublisher = TestPublisher.createCold();
final Flux<Envelope> flux = Flux.from(testPublisher);

when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(deviceId), anyBoolean()))
.thenReturn(flux);
Expand All @@ -790,16 +778,16 @@ void testReactivePublisherLimitRate() {
.thenRequest(totalMessages * 2)
.then(() -> {
for (long i = 0; i < totalMessages; i++) {
sink.get().next(createMessage(UUID.randomUUID(), accountUuid, 1111 * i + 1, "message " + i));
testPublisher.next(createMessage(UUID.randomUUID(), accountUuid, 1111 * i + 1, "message " + i));
}
sink.get().complete();
testPublisher.complete();
})
.expectNextCount(totalMessages)
.expectComplete()
.log()
.verify();

assertEquals(WebSocketConnection.MESSAGE_PUBLISHER_LIMIT_RATE, maxRequest.get());
testPublisher.assertMaxRequested(WebSocketConnection.MESSAGE_PUBLISHER_LIMIT_RATE);
}

@Test
Expand Down

0 comments on commit fb39b2e

Please sign in to comment.