diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java index ea7c2fadbe..06aad01924 100644 --- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java +++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java @@ -37,6 +37,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -57,6 +58,7 @@ import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; +import org.springframework.core.codec.CodecException; import org.springframework.integration.channel.AbstractMessageChannel; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.config.GlobalChannelInterceptor; @@ -203,6 +205,23 @@ void test_2249() { } } + // For more context on this test: https://github.com/spring-cloud/spring-cloud-stream/issues/3078 + @Test + void functionInvocationWrapperNullError() { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + TestChannelBinderConfiguration.getCompleteConfiguration( + EmptyConfiguration.class)).web(WebApplicationType.NONE).run( + "--spring.cloud.stream.source=outputA", + "--spring.jmx.enabled=false")) { + StreamBridge streamBridge = context.getBean(StreamBridge.class); + var exception = Assertions.assertThrows(RuntimeException.class, () -> streamBridge.send("outputA-out-0", + new CodecException("invalidException") + )); + + assertThat(exception.getMessage()).isEqualTo("org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper returned null"); + } + } + // For more context on this test: https://github.com/spring-cloud/spring-cloud-stream/issues/2815 @Test void ensurePartitioningWorksWhenNativeEncodingEnabled() { diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java index 42baa958d1..688fe49011 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java @@ -228,9 +228,13 @@ public boolean send(String bindingName, @Nullable String binderName, Object data lock.unlock(); } - if (resultMessage == null - && ((Message) messageToSend).getPayload().getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) { - resultMessage = messageToSend; + if (resultMessage == null) { + if (((Message) messageToSend).getPayload().getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) { + resultMessage = messageToSend; + } + else { + throw new RuntimeException(functionToInvoke.getClass().getName() + " returned null"); + } } resultMessage = (Message) this.functionInvocationHelper.postProcessResult(resultMessage, null);