diff --git a/build.gradle.kts b/build.gradle.kts index a7f32410..870afdf9 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -144,7 +144,7 @@ allprojects { testLogging { events(SKIPPED, PASSED, FAILED) - showStandardStreams = false // change to true to get log output from tests + showStandardStreams = true // change to true to get log output from tests exceptionFormat = FULL } diff --git a/commons/src/testFixtures/java/one/tomorrow/transactionaloutbox/commons/ProxiedContainerSupport.java b/commons/src/testFixtures/java/one/tomorrow/transactionaloutbox/commons/ProxiedContainerSupport.java index 88acd562..8fbc3b43 100644 --- a/commons/src/testFixtures/java/one/tomorrow/transactionaloutbox/commons/ProxiedContainerSupport.java +++ b/commons/src/testFixtures/java/one/tomorrow/transactionaloutbox/commons/ProxiedContainerSupport.java @@ -17,7 +17,9 @@ import eu.rekawek.toxiproxy.Proxy; import eu.rekawek.toxiproxy.ToxiproxyClient; +import eu.rekawek.toxiproxy.model.Toxic; import eu.rekawek.toxiproxy.model.ToxicDirection; +import lombok.SneakyThrows; import org.testcontainers.containers.ToxiproxyContainer; import java.io.IOException; @@ -38,6 +40,7 @@ public interface ProxiedContainerSupport { * Cuts the connection by setting bandwidth in both directions to zero. * @param shouldCutConnection true if the connection should be cut, or false if it should be re-enabled */ + @SneakyThrows default void setConnectionCut(boolean shouldCutConnection) { synchronized (isCurrentlyCut) { if (shouldCutConnection != isCurrentlyCut.get()) { @@ -47,8 +50,8 @@ default void setConnectionCut(boolean shouldCutConnection) { getProxy().toxics().bandwidth(CUT_CONNECTION_UPSTREAM, ToxicDirection.UPSTREAM, 0); isCurrentlyCut.set(true); } else { - getProxy().toxics().get(CUT_CONNECTION_DOWNSTREAM).remove(); - getProxy().toxics().get(CUT_CONNECTION_UPSTREAM).remove(); + removeToxicIfPresent(CUT_CONNECTION_DOWNSTREAM); + removeToxicIfPresent(CUT_CONNECTION_UPSTREAM); isCurrentlyCut.set(false); } } catch (IOException e) { @@ -58,6 +61,24 @@ default void setConnectionCut(boolean shouldCutConnection) { } } + private void removeToxicIfPresent(String name) throws IOException { + Toxic toxic = getToxic(name); + if (toxic != null) { + toxic.remove(); + } + } + + private Toxic getToxic(String name) { + try { + return getProxy().toxics().get(name); + } catch (IOException e) { + if (e.getMessage().contains("[404]")) { + return null; + } + throw new RuntimeException("Could not get toxic", e); + } + } + static Proxy createProxy(String service, ToxiproxyContainer toxiproxy, int exposedPort) { final ToxiproxyClient toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort()); Proxy proxy; diff --git a/outbox-kafka-spring-reactive/src/test/java/one/tomorrow/transactionaloutbox/reactive/service/OutboxProcessorIntegrationTest.java b/outbox-kafka-spring-reactive/src/test/java/one/tomorrow/transactionaloutbox/reactive/service/OutboxProcessorIntegrationTest.java index f1b48d6f..4cb4b571 100644 --- a/outbox-kafka-spring-reactive/src/test/java/one/tomorrow/transactionaloutbox/reactive/service/OutboxProcessorIntegrationTest.java +++ b/outbox-kafka-spring-reactive/src/test/java/one/tomorrow/transactionaloutbox/reactive/service/OutboxProcessorIntegrationTest.java @@ -54,6 +54,7 @@ import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; @FlywayTest @SuppressWarnings({"unused", "ConstantConditions"}) @@ -167,8 +168,13 @@ void should_processRecordsInOrder_whenKafkaIsTemporarilyNotAvailable() { // then List outboxRecords = getSortedById(outboxRecordMonos); - Iterator> kafkaRecordsIter = consumeAndDeduplicateRecords(outboxRecords.size(), Duration.ofSeconds(30)) - .iterator(); + Collection> kafkaRecords = consumeAndDeduplicateRecords(outboxRecords.size(), Duration.ofSeconds(30)); + + List outboxRecordIds = outboxRecords.stream().map(OutboxRecord::getId).toList(); + List kafkaRecordIds = kafkaRecords.stream().map(rec -> toLong(rec.headers().lastHeader(HEADERS_SEQUENCE_NAME).value())).toList(); + assertEquals(outboxRecordIds, kafkaRecordIds, "OutboxRecord ids and Kafka record ids do not match"); + + Iterator> kafkaRecordsIter = kafkaRecords.iterator(); for (OutboxRecord outboxRecord : outboxRecords) { assertConsumedRecord(outboxRecord, eventSource, kafkaRecordsIter.next()); }