From 97176b46ab5e54460d2f5ae715705e7b45ae16bd Mon Sep 17 00:00:00 2001 From: ferblaca Date: Tue, 1 Jul 2025 09:34:45 +0200 Subject: [PATCH] Add test to verify correct bean conditional resolution order in a multi-binder scenario This change adds KafkaMultiBinderCustomConfigurationTests to verify that user configuration classes specified via spring.main.sources are properly loaded before binder configuration classes. This ensures that @ConditionalOnMissingBean annotations in the binder configurations correctly detect user-provided beans. The test confirms that when using KafkaBinderConfiguration with @ConditionalOnMissingBean(KafkaBinderMetrics.class), a custom KafkaBinderMetrics bean configured via spring.main.sources is correctly detected and used instead of the default implementation. Related to fix in gh-3114 Signed-off-by: ferblaca --- ...kaMultiBinderCustomConfigurationTests.java | 145 ++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaMultiBinderCustomConfigurationTests.java diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaMultiBinderCustomConfigurationTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaMultiBinderCustomConfigurationTests.java new file mode 100644 index 0000000000..c3c3768f79 --- /dev/null +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaMultiBinderCustomConfigurationTests.java @@ -0,0 +1,145 @@ +/* + * Copyright 2023-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka.integration; + +import java.lang.reflect.Field; +import java.util.Map; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.binder.Binder; +import org.springframework.cloud.stream.binder.DefaultBinderFactory; +import org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics; +import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder; +import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.util.ReflectionUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration tests to verify that custom configurations defined in spring.main.sources + * are properly loaded before the default binder configuration in multi-binder scenarios. + * + * @author Fernando Blanch + * @since 4.1.0 + */ +@SpringBootTest(classes = KafkaMultiBinderCustomConfigurationTests.class, + webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = { + "spring.cloud.stream.defaultBinder=kafka1", + "spring.cloud.stream.binders.kafka1.type=kafka", + "spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}", + "spring.cloud.stream.binders.kafka2.type=kafka", + "spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}", + "spring.cloud.stream.binders.kafka2.environment.spring.main.sources=" + + "org.springframework.cloud.stream.binder.kafka.integration.KafkaMultiBinderCustomConfigurationTests$CustomConfiguration" +}) +@DirtiesContext +@EnableAutoConfiguration +@EmbeddedKafka(controlledShutdown = true) +class KafkaMultiBinderCustomConfigurationTests { + + @Autowired + private DefaultBinderFactory binderFactory; + + /** + * Verifies that the custom user configuration is loaded from spring.main.sources + */ + @Test + void binderKafka2UsesCustomConfigurationIsLoadedFromSpringMainSources() throws IllegalAccessException { + // Force initialization of binders + Binder kafka2Binder = binderFactory.getBinder("kafka2", Object.class); + assertThat(kafka2Binder).isNotNull(); + + // Get the kafka2 binder context + ConfigurableApplicationContext kafka2Context = getBinderContext("kafka2"); + assertThat(kafka2Context).isNotNull(); + + // Verify that our custom bean is used instead of the default one + KafkaBinderMetrics kafkaBinderMetrics = kafka2Context.getBean(KafkaBinderMetrics.class); + assertThat(kafkaBinderMetrics).isInstanceOf(CustomKafkaBinderMetrics.class); + } + + /** + * Verifies that the default configuration is used when no custom user configuration is provided + */ + @Test + void binderKafka1UsesDefaultBeanFromKafkaBinderMetricsConfigurationWithMultiBinder() throws IllegalAccessException { + // Force initialization of binders + Binder kafka1Binder = binderFactory.getBinder("kafka1", Object.class); + assertThat(kafka1Binder).isNotNull(); + + ConfigurableApplicationContext kafka1Context = getBinderContext("kafka1"); + assertThat(kafka1Context).isNotNull(); + + // Verify that the metrics bean is from KafkaBinderMetricsConfigurationWithMultiBinder configuration + // (not a CustomKafkaBinderMetrics instance) + KafkaBinderMetrics kafka1BinderMetrics = kafka1Context.getBean(KafkaBinderMetrics.class); + assertThat(kafka1BinderMetrics).isNotInstanceOf(CustomKafkaBinderMetrics.class); + } + + /** + * Helper method to get the binder context from the binderInstanceCache field in DefaultBinderFactory + */ + private ConfigurableApplicationContext getBinderContext(String binderName) throws IllegalAccessException { + Field binderInstanceCacheField = ReflectionUtils.findField(DefaultBinderFactory.class, "binderInstanceCache"); + assertThat(binderInstanceCacheField).isNotNull(); + ReflectionUtils.makeAccessible(binderInstanceCacheField); + @SuppressWarnings("unchecked") + Map, ConfigurableApplicationContext>> binderInstanceCache = + (Map, ConfigurableApplicationContext>>) binderInstanceCacheField.get(this.binderFactory); + return binderInstanceCache.get(binderName).getValue(); + } + + /** + * Custom configuration that provides a custom KafkaBinderMetrics + */ + static class CustomConfiguration { + + @Bean + MeterRegistry meterRegistry() { + return new SimpleMeterRegistry(); + } + + @Bean + KafkaBinderMetrics kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder, + KafkaBinderConfigurationProperties configurationProperties, + MeterRegistry meterRegistry) { + return new CustomKafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties, meterRegistry); + } + + } + + static class CustomKafkaBinderMetrics extends KafkaBinderMetrics { + + CustomKafkaBinderMetrics(KafkaMessageChannelBinder binder, + KafkaBinderConfigurationProperties binderConfigurationProperties, + MeterRegistry meterRegistry) { + super(binder, binderConfigurationProperties, null, meterRegistry); + } + + } + +}