From fb39b2edaf5dfa66f00146d2fb61e59021a764cf Mon Sep 17 00:00:00 2001 From: Chris Eager <79161849+eager-signal@users.noreply.github.com> Date: Thu, 29 Jun 2023 14:56:41 -0500 Subject: [PATCH] Improve two `@Disabled` flaky tests --- .../storage/MessagesCacheTest.java | 42 +++++++++---------- .../websocket/WebSocketConnectionTest.java | 26 ++++-------- 2 files changed, 27 insertions(+), 41 deletions(-) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index 18e39f02d..55092f292 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -7,12 +7,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -46,7 +45,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; @@ -64,10 +62,10 @@ import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; +import reactor.test.publisher.TestPublisher; class MessagesCacheTest { @@ -586,14 +584,14 @@ void testGetAllMessagesLimitsAndBackpressure() { // is subscribed. Rather, we should be fetching in pages to satisfy downstream requests, so that memory usage // is limited to few pages of messages - // we use a combination of Flux.just() and Sinks to control when data is “fetched” from the cache. The initial - // Flux.just()s are pages that are readily available, on demand. By design, there are more of these pages than - // the initial prefetch. The sinks allow us to create extra demand but defer producing values to satisfy the demand - // until later on. + // we use a combination of Flux.just() and TestPublishers to control when data is “fetched” and emitted from the + // cache. The initial Flux.just()s are pages that are readily available, on demand. By design, there are more of + // these pages than the initial prefetch. The publishers allow us to create extra demand but defer producing + // values to satisfy the demand until later on. - final AtomicReference> page4Sink = new AtomicReference<>(); - final AtomicReference> page56Sink = new AtomicReference<>(); - final AtomicReference> emptyFinalPageSink = new AtomicReference<>(); + final TestPublisher page4Publisher = TestPublisher.create(); + final TestPublisher page56Publisher = TestPublisher.create(); + final TestPublisher emptyFinalPagePublisher = TestPublisher.create(); final Deque> pages = new ArrayDeque<>(); pages.add(generatePage()); @@ -608,9 +606,9 @@ void testGetAllMessagesLimitsAndBackpressure() { .thenReturn(Flux.just(pages.pop())) .thenReturn(Flux.just(pages.pop())) .thenReturn(Flux.just(pages.pop())) - .thenReturn(Flux.create(sink -> page4Sink.compareAndSet(null, sink))) - .thenReturn(Flux.create(sink -> page56Sink.compareAndSet(null, sink))) - .thenReturn(Flux.create(sink -> emptyFinalPageSink.compareAndSet(null, sink))) + .thenReturn(Flux.from(page4Publisher)) + .thenReturn(Flux.from(page56Publisher)) + .thenReturn(Flux.from(emptyFinalPagePublisher)) .thenReturn(Flux.empty()); final Flux allMessages = messagesCache.getAllMessages(UUID.randomUUID(), 1L); @@ -635,9 +633,9 @@ void testGetAllMessagesLimitsAndBackpressure() { .thenRequest(halfPage) // page 0.5 requested .expectNextCount(halfPage) // page 0.5 produced // page 0.5 produced, 1.5 remain, so no additional interactions with the cache cluster - .then(() -> verify(reactiveCommands, times(expectedReactiveCommandInvocations.get())).evalsha(any(), + .then(() -> verify(reactiveCommands, atMost(expectedReactiveCommandInvocations.get())).evalsha(any(), any(), any(), any())) - .then(() -> assertNull(page4Sink.get(), "page 4 should not have been fetched yet")) + .then(page4Publisher::assertWasNotRequested) .thenRequest(page) // page 1.5 requested .expectNextCount(page) // page 1.5 produced @@ -647,26 +645,26 @@ void testGetAllMessagesLimitsAndBackpressure() { // also NB: times() checks cumulative calls, hence addAndGet .then(() -> verify(reactiveCommands, times(expectedReactiveCommandInvocations.addAndGet(1))).evalsha(any(), any(), any(), any())) - .then(() -> assertNotNull(page4Sink.get(), "page 4 should have been fetched")) + .then(page4Publisher::assertWasSubscribed) .thenRequest(page + halfPage) // page 3 requested .expectNextCount(page + halfPage) // page 1.5–3 produced .thenRequest(halfPage) // page 3.5 requested - .then(() -> assertNull(page56Sink.get(), "page 5 should not have been fetched yet")) - .then(() -> page4Sink.get().next(pages.pop()).complete()) + .then(page56Publisher::assertWasNotRequested) + .then(() -> page4Publisher.emit(pages.pop())) .expectNextCount(halfPage) // page 3.5 produced .then(() -> verify(reactiveCommands, times(expectedReactiveCommandInvocations.addAndGet(1))).evalsha(any(), any(), any(), any())) - .then(() -> assertNotNull(page56Sink.get(), "page 5 should have been fetched")) + .then(page56Publisher::assertWasSubscribed) .thenRequest(page) // page 4.5 requested .expectNextCount(halfPage) // page 4 produced .thenRequest(page * 4) // request more demand than we will ultimately satisfy - .then(() -> page56Sink.get().next(pages.pop()).next(pages.pop()).complete()) + .then(() -> page56Publisher.next(pages.pop()).next(pages.pop()).complete()) .expectNextCount(page + page) // page 5 and 6 produced - .then(() -> emptyFinalPageSink.get().complete()) + .then(emptyFinalPagePublisher::complete) // confirm that cache calls increased by 2: one for page 5-and-6 (we got a two-fer in next(pop()).next(pop()), // and one for the final, empty page .then(() -> verify(reactiveCommands, times(expectedReactiveCommandInvocations.addAndGet(2))).evalsha(any(), diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index efbed8f8b..1e55e3aaa 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -44,13 +44,10 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.auth.AccountAuthenticator; @@ -68,10 +65,10 @@ import org.whispersystems.websocket.messages.WebSocketResponseMessage; import org.whispersystems.websocket.session.WebSocketSessionContext; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; +import reactor.test.publisher.TestPublisher; class WebSocketConnectionTest { @@ -744,7 +741,6 @@ void testRetrieveMessageExceptionClientDisconnected() { } @Test - @Disabled("This test is flaky") void testReactivePublisherLimitRate() { final UUID accountUuid = UUID.randomUUID(); @@ -754,18 +750,10 @@ void testReactivePublisherLimitRate() { when(account.getNumber()).thenReturn("+14152222222"); when(account.getUuid()).thenReturn(accountUuid); - final int totalMessages = 10; - final AtomicReference> sink = new AtomicReference<>(); + final int totalMessages = 1000; - final AtomicLong maxRequest = new AtomicLong(-1); - final Flux flux = Flux.create(s -> { - sink.set(s); - s.onRequest(n -> { - if (maxRequest.get() < n) { - maxRequest.set(n); - } - }); - }); + final TestPublisher testPublisher = TestPublisher.createCold(); + final Flux flux = Flux.from(testPublisher); when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(deviceId), anyBoolean())) .thenReturn(flux); @@ -790,16 +778,16 @@ void testReactivePublisherLimitRate() { .thenRequest(totalMessages * 2) .then(() -> { for (long i = 0; i < totalMessages; i++) { - sink.get().next(createMessage(UUID.randomUUID(), accountUuid, 1111 * i + 1, "message " + i)); + testPublisher.next(createMessage(UUID.randomUUID(), accountUuid, 1111 * i + 1, "message " + i)); } - sink.get().complete(); + testPublisher.complete(); }) .expectNextCount(totalMessages) .expectComplete() .log() .verify(); - assertEquals(WebSocketConnection.MESSAGE_PUBLISHER_LIMIT_RATE, maxRequest.get()); + testPublisher.assertMaxRequested(WebSocketConnection.MESSAGE_PUBLISHER_LIMIT_RATE); } @Test