Skip to content

Commit a9fe0c2

Browse files
artembilanolegz
authored andcommitted
GH-3040: Add virtual threads customizer for RabbitMQ binder
Fixes: #3040 Resolves #3041 This change adds out-of-the-box `ListenerContainerCustomizer<AbstractMessageListenerContainer>` to set `VirtualThreadTaskExecutor` into an `AbstractMessageListenerContainer` created by the binder when `Threading.VIRTUAL` condition is met
1 parent a1c7773 commit a9fe0c2

File tree

1 file changed

+16
-1
lines changed

1 file changed

+16
-1
lines changed

binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/config/RabbitMessageChannelBinderConfiguration.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2023 the original author or authors.
2+
* Copyright 2015-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,12 +24,15 @@
2424
import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory;
2525
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
2626
import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy;
27+
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
2728
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
2829
import org.springframework.amqp.support.postprocessor.DelegatingDecompressingPostProcessor;
2930
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
3031
import org.springframework.beans.factory.annotation.Autowired;
3132
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
33+
import org.springframework.boot.autoconfigure.condition.ConditionalOnThreading;
3234
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
35+
import org.springframework.boot.autoconfigure.thread.Threading;
3336
import org.springframework.boot.context.properties.EnableConfigurationProperties;
3437
import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder;
3538
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitBinderConfigurationProperties;
@@ -42,6 +45,7 @@
4245
import org.springframework.context.annotation.Bean;
4346
import org.springframework.context.annotation.Configuration;
4447
import org.springframework.context.annotation.Import;
48+
import org.springframework.core.task.VirtualThreadTaskExecutor;
4549
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
4650
import org.springframework.integration.amqp.inbound.AmqpMessageSource;
4751
import org.springframework.lang.Nullable;
@@ -150,4 +154,15 @@ MessagePostProcessor gZipPostProcessor() {
150154
RabbitExchangeQueueProvisioner provisioningProvider(List<DeclarableCustomizer> customizers) {
151155
return new RabbitExchangeQueueProvisioner(this.rabbitConnectionFactory, customizers);
152156
}
157+
158+
@Bean
159+
@ConditionalOnThreading(Threading.VIRTUAL)
160+
ListenerContainerCustomizer<MessageListenerContainer> listenerContainerVirtualThreadExecutorCustomizer() {
161+
return (container, destinationName, group) -> {
162+
if (container instanceof AbstractMessageListenerContainer listenerContainer) {
163+
listenerContainer.setTaskExecutor(new VirtualThreadTaskExecutor(destinationName + "-"));
164+
}
165+
};
166+
}
167+
153168
}

0 commit comments

Comments
 (0)