Concurrency and Virtual Threads #2915
-
I recently switched my service over to use virtual threads, and enabled them for Kafka as well:

@Bean
public <V> ConcurrentKafkaListenerContainerFactory<String, V> kafkaListenerContainerFactory(
        ObjectMapper objectMapper
) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, V>();
    factory.setAutoStartup(true);
    factory.setConsumerFactory(kafkaConsumerFactory(objectMapper));
    factory.setContainerCustomizer(container -> {
        // Commit the offset as soon as the listener acknowledges the record.
        container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Run each consumer on a virtual thread instead of a platform thread.
        container.getContainerProperties().setListenerTaskExecutor(createVirtualThreadAsyncTaskExecutor());
    });
    return factory;
}

private AsyncTaskExecutor createVirtualThreadAsyncTaskExecutor() {
    var asyncTaskExecutor = new SimpleAsyncTaskExecutor();
    // No throttling: every submitted task gets its own new thread.
    asyncTaskExecutor.setConcurrencyLimit(SimpleAsyncTaskExecutor.UNBOUNDED_CONCURRENCY);
    asyncTaskExecutor.setVirtualThreads(true);
    return asyncTaskExecutor;
}

With this pattern, should I be invoking
Replies: 2 comments 2 replies
-
You still need to set concurrency if you want to consume multiple records from different partitions concurrently; each consumer will get its own virtual thread.
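To illustrate the point above, here is a minimal plain-JDK model (Java 21+), not Spring Kafka code. It assumes each consumer is a single loop on its own virtual thread and that partitions are split round-robin; the class `ConsumerConcurrencyModel` and its `run` method are hypothetical names made up for this sketch. In Spring Kafka itself, the corresponding setting would be `setConcurrency(...)` on the `ConcurrentKafkaListenerContainerFactory`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ConsumerConcurrencyModel {

    /**
     * Starts `concurrency` consumer loops, one virtual thread each, and splits
     * `partitions` partitions among them round-robin. Returns how many distinct
     * threads actually handled at least one partition.
     */
    static int run(int partitions, int concurrency) {
        Set<Long> workerThreadIds = ConcurrentHashMap.newKeySet();
        List<Thread> consumers = new ArrayList<>();

        for (int c = 0; c < concurrency; c++) {
            final int consumerIndex = c;
            // One virtual thread per consumer, mirroring "each consumer will
            // get its own virtual thread".
            Thread consumer = Thread.ofVirtual().start(() -> {
                for (int p = consumerIndex; p < partitions; p += concurrency) {
                    // Stand-in for processing the records of partition p.
                    workerThreadIds.add(Thread.currentThread().threadId());
                }
            });
            consumers.add(consumer);
        }

        for (Thread consumer : consumers) {
            try {
                consumer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for consumers", e);
            }
        }
        return workerThreadIds.size();
    }

    public static void main(String[] args) {
        System.out.println(run(6, 1)); // prints 1: one consumer handles all six partitions serially
        System.out.println(run(6, 3)); // prints 3: three consumers, two partitions each
    }
}
```

In this model, switching to virtual threads changes only what kind of thread each consumer runs on. The number of partitions handled in parallel is still set by the number of consumers, and consumers beyond the partition count sit idle (`run(2, 4)` returns 2).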
-
Is it necessary to set the
You still need to set concurrency if you want to consume multiple records from different partitions concurrently; each consumer will get its own virtual thread.