diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/config/RabbitMessageChannelBinderConfiguration.java b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/config/RabbitMessageChannelBinderConfiguration.java index 1ffb084f9f..0555f2ae27 100644 --- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/config/RabbitMessageChannelBinderConfiguration.java +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/config/RabbitMessageChannelBinderConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2023 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,12 +24,15 @@ import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy; +import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; import org.springframework.amqp.rabbit.listener.MessageListenerContainer; import org.springframework.amqp.support.postprocessor.DelegatingDecompressingPostProcessor; import org.springframework.amqp.support.postprocessor.GZipPostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.amqp.RabbitProperties; +import org.springframework.boot.autoconfigure.condition.ConditionalOnThreading; import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration; +import org.springframework.boot.autoconfigure.thread.Threading; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder; import org.springframework.cloud.stream.binder.rabbit.properties.RabbitBinderConfigurationProperties; @@ -42,6 +45,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import org.springframework.core.task.VirtualThreadTaskExecutor; import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter; import org.springframework.integration.amqp.inbound.AmqpMessageSource; import org.springframework.lang.Nullable; @@ -150,4 +154,15 @@ MessagePostProcessor gZipPostProcessor() { RabbitExchangeQueueProvisioner provisioningProvider(List customizers) { return new RabbitExchangeQueueProvisioner(this.rabbitConnectionFactory, customizers); } + + @Bean + @ConditionalOnThreading(Threading.VIRTUAL) + ListenerContainerCustomizer listenerContainerVirtualThreadExecutorCustomizer() { + return (container, destinationName, group) -> { + if (container instanceof AbstractMessageListenerContainer listenerContainer) { + listenerContainer.setTaskExecutor(new VirtualThreadTaskExecutor(destinationName + "-")); + } + }; + } + }