From e0571397345b9cd28cb7eb80670655de8c45c2a6 Mon Sep 17 00:00:00 2001 From: Travis Riegler Date: Sat, 6 Jul 2024 16:17:16 -0400 Subject: [PATCH] Group Kafka back-off properties Kafka back-off policy properties "delay", "maxDelay", "multiplier", and "randomBackOff" are now grouped under a common prefix of "backoff": - spring.kafka.retry.topic.backoff.delay - spring.kafka.retry.topic.backoff.maxDelay - spring.kafka.retry.topic.backoff.multiplier - spring.kafka.retry.topic.backoff.random --- .../kafka/KafkaAutoConfiguration.java | 14 +-- .../autoconfigure/kafka/KafkaProperties.java | 95 +++++++++++++++++-- 2 files changed, 94 insertions(+), 15 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index 18e02adb854a..0e0a42df0603 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -32,7 +32,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic.Backoff; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.ssl.SslBundles; @@ -186,7 +186,7 @@ public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaTemplate .useSingleTopicForSameIntervals() .suffixTopicsWithIndexValues() .doNotAutoCreateRetryTopics(); - setBackOffPolicy(builder, retryTopic); + setBackOffPolicy(builder, retryTopic.getBackoff()); return builder.create(kafkaTemplate); } @@ -214,15 +214,15 @@ private void applyKafkaConnectionDetailsForAdmin(Map properties, } } - private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) { - long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0; + private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Backoff retryTopicBackoff) { + long delay = (retryTopicBackoff.getDelay() != null) ? retryTopicBackoff.getDelay().toMillis() : 0; if (delay > 0) { PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); BackOffPolicyBuilder backOffPolicy = BackOffPolicyBuilder.newBuilder(); map.from(delay).to(backOffPolicy::delay); - map.from(retryTopic.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay); - map.from(retryTopic.getMultiplier()).to(backOffPolicy::multiplier); - map.from(retryTopic.isRandomBackOff()).to(backOffPolicy::random); + map.from(retryTopicBackoff.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay); + map.from(retryTopicBackoff.getMultiplier()).to(backOffPolicy::multiplier); + map.from(retryTopicBackoff.isRandom()).to(backOffPolicy::random); builder.customBackoff((SleepingBackOffPolicy) backOffPolicy.build()); } else { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index c74fa291f734..c2a321dea3a4 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.DeprecatedConfigurationProperty; import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException; import org.springframework.boot.convert.DurationUnit; @@ -1644,36 +1645,114 @@ public void setAttempts(int attempts) { this.attempts = attempts; } + @DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.delay", since = "3.4.0") + @Deprecated(since = "3.4.0", forRemoval = true) public Duration getDelay() { - return this.delay; + return getBackoff().getDelay(); } + @Deprecated(since = "3.4.0", forRemoval = true) public void setDelay(Duration delay) { - this.delay = delay; + getBackoff().setDelay(delay); } + @DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.multiplier", + since = "3.4.0") + @Deprecated(since = "3.4.0", forRemoval = true) public double getMultiplier() { - return this.multiplier; + return getBackoff().getMultiplier(); } + @Deprecated(since = "3.4.0", forRemoval = true) public void setMultiplier(double multiplier) { - this.multiplier = multiplier; + getBackoff().setMultiplier(multiplier); } + @DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.maxDelay", since = "3.4.0") + @Deprecated(since = "3.4.0", forRemoval = true) public Duration getMaxDelay() { - return this.maxDelay; + return getBackoff().getMaxDelay(); } + @Deprecated(since = "3.4.0", forRemoval = true) public void setMaxDelay(Duration maxDelay) { - this.maxDelay = maxDelay; + getBackoff().setMaxDelay(maxDelay); } + @DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.randomBackOff", + since = "3.4.0") + @Deprecated(since = "3.4.0", forRemoval = true) public boolean isRandomBackOff() { - return this.randomBackOff; + return getBackoff().isRandom(); } + @Deprecated(since = "3.4.0", forRemoval = true) public void setRandomBackOff(boolean randomBackOff) { - this.randomBackOff = randomBackOff; + getBackoff().setRandom(randomBackOff); + } + + private final Backoff backoff = new Backoff(); + + public Backoff getBackoff() { + return this.backoff; + } + + public static class Backoff { + + /** + * Canonical backoff period. Used as an initial value in the exponential + * case, and as a minimum value in the uniform case. + */ + private Duration delay = Duration.ofSeconds(1); + + /** + * Multiplier to use for generating the next backoff delay. + */ + private double multiplier = 0.0; + + /** + * Maximum wait between retries. If less than the delay then the default + * of 30 seconds is applied. + */ + private Duration maxDelay = Duration.ZERO; + + /** + * Whether to have the backoff delays. + */ + private boolean random = false; + + public Duration getDelay() { + return this.delay; + } + + public void setDelay(Duration delay) { + this.delay = delay; + } + + public double getMultiplier() { + return this.multiplier; + } + + public void setMultiplier(double multiplier) { + this.multiplier = multiplier; + } + + public Duration getMaxDelay() { + return this.maxDelay; + } + + public void setMaxDelay(Duration maxDelay) { + this.maxDelay = maxDelay; + } + + public boolean isRandom() { + return this.random; + } + + public void setRandom(boolean random) { + this.random = random; + } + } }