Skip to content

Commit fb3c889

Browse files
authored
Merge pull request #3079 from StevenPG/bugfix/handle-null-result-on-function-invocation-streambridge
Add Null Handling for FunctionToInvoke in StreamBridge along with Test
2 parents 7178dbf + 25427bc commit fb3c889

File tree

2 files changed

+26
-3
lines changed
  • core
    • spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function
    • spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function

2 files changed

+26
-3
lines changed

core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.stream.Collectors;
3838
import java.util.stream.IntStream;
3939

40+
import org.junit.jupiter.api.Assertions;
4041
import org.junit.jupiter.api.BeforeAll;
4142
import org.junit.jupiter.api.Test;
4243

@@ -57,6 +58,7 @@
5758
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
5859
import org.springframework.context.ConfigurableApplicationContext;
5960
import org.springframework.context.annotation.Bean;
61+
import org.springframework.core.codec.CodecException;
6062
import org.springframework.integration.channel.AbstractMessageChannel;
6163
import org.springframework.integration.channel.DirectChannel;
6264
import org.springframework.integration.config.GlobalChannelInterceptor;
@@ -203,6 +205,23 @@ void test_2249() {
203205
}
204206
}
205207

208+
// For more context on this test: https://github.com/spring-cloud/spring-cloud-stream/issues/3078
209+
@Test
210+
void functionInvocationWrapperNullError() {
211+
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
212+
TestChannelBinderConfiguration.getCompleteConfiguration(
213+
EmptyConfiguration.class)).web(WebApplicationType.NONE).run(
214+
"--spring.cloud.stream.source=outputA",
215+
"--spring.jmx.enabled=false")) {
216+
StreamBridge streamBridge = context.getBean(StreamBridge.class);
217+
var exception = Assertions.assertThrows(RuntimeException.class, () -> streamBridge.send("outputA-out-0",
218+
new CodecException("invalidException")
219+
));
220+
221+
assertThat(exception.getMessage()).isEqualTo("org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper returned null");
222+
}
223+
}
224+
206225
// For more context on this test: https://github.com/spring-cloud/spring-cloud-stream/issues/2815
207226
@Test
208227
void ensurePartitioningWorksWhenNativeEncodingEnabled() {

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,9 +228,13 @@ public boolean send(String bindingName, @Nullable String binderName, Object data
228228
lock.unlock();
229229
}
230230

231-
if (resultMessage == null
232-
&& ((Message) messageToSend).getPayload().getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) {
233-
resultMessage = messageToSend;
231+
if (resultMessage == null) {
232+
if (((Message) messageToSend).getPayload().getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) {
233+
resultMessage = messageToSend;
234+
}
235+
else {
236+
throw new RuntimeException(functionToInvoke.getClass().getName() + " returned null");
237+
}
234238
}
235239

236240
resultMessage = (Message<?>) this.functionInvocationHelper.postProcessResult(resultMessage, null);

0 commit comments

Comments
 (0)