Skip to content

Commit

Permalink
Group Kafka back-off properties
Browse files Browse the repository at this point in the history
Kafka back-off policy properties "delay", "maxDelay",
"multiplier", and "randomBackOff" are now grouped
under a common prefix of "backoff":
    - spring.kafka.retry.topic.backoff.delay
    - spring.kafka.retry.topic.backoff.maxDelay
    - spring.kafka.retry.topic.backoff.multiplier
    - spring.kafka.retry.topic.backoff.random
  • Loading branch information
travisriegler committed Jul 6, 2024
1 parent 69630bb commit e057139
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic.Backoff;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.ssl.SslBundles;
Expand Down Expand Up @@ -186,7 +186,7 @@ public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaTemplate<?, ?>
.useSingleTopicForSameIntervals()
.suffixTopicsWithIndexValues()
.doNotAutoCreateRetryTopics();
setBackOffPolicy(builder, retryTopic);
setBackOffPolicy(builder, retryTopic.getBackoff());
return builder.create(kafkaTemplate);
}

Expand Down Expand Up @@ -214,15 +214,15 @@ private void applyKafkaConnectionDetailsForAdmin(Map<String, Object> properties,
}
}

private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) {
long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0;
private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Backoff retryTopicBackoff) {
long delay = (retryTopicBackoff.getDelay() != null) ? retryTopicBackoff.getDelay().toMillis() : 0;
if (delay > 0) {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
BackOffPolicyBuilder backOffPolicy = BackOffPolicyBuilder.newBuilder();
map.from(delay).to(backOffPolicy::delay);
map.from(retryTopic.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay);
map.from(retryTopic.getMultiplier()).to(backOffPolicy::multiplier);
map.from(retryTopic.isRandomBackOff()).to(backOffPolicy::random);
map.from(retryTopicBackoff.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay);
map.from(retryTopicBackoff.getMultiplier()).to(backOffPolicy::multiplier);
map.from(retryTopicBackoff.isRandom()).to(backOffPolicy::random);
builder.customBackoff((SleepingBackOffPolicy<?>) backOffPolicy.build());
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
import org.springframework.boot.convert.DurationUnit;
Expand Down Expand Up @@ -1644,36 +1645,114 @@ public void setAttempts(int attempts) {
this.attempts = attempts;
}

@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.delay", since = "3.4.0")
@Deprecated(since = "3.4.0", forRemoval = true)
public Duration getDelay() {
return this.delay;
return getBackoff().getDelay();
}

@Deprecated(since = "3.4.0", forRemoval = true)
public void setDelay(Duration delay) {
this.delay = delay;
getBackoff().setDelay(delay);
}

@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.multiplier",
since = "3.4.0")
@Deprecated(since = "3.4.0", forRemoval = true)
public double getMultiplier() {
return this.multiplier;
return getBackoff().getMultiplier();
}

@Deprecated(since = "3.4.0", forRemoval = true)
public void setMultiplier(double multiplier) {
this.multiplier = multiplier;
getBackoff().setMultiplier(multiplier);
}

@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.maxDelay", since = "3.4.0")
@Deprecated(since = "3.4.0", forRemoval = true)
public Duration getMaxDelay() {
return this.maxDelay;
return getBackoff().getMaxDelay();
}

@Deprecated(since = "3.4.0", forRemoval = true)
public void setMaxDelay(Duration maxDelay) {
this.maxDelay = maxDelay;
getBackoff().setMaxDelay(maxDelay);
}

@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.randomBackOff",
since = "3.4.0")
@Deprecated(since = "3.4.0", forRemoval = true)
public boolean isRandomBackOff() {
return this.randomBackOff;
return getBackoff().isRandom();
}

@Deprecated(since = "3.4.0", forRemoval = true)
public void setRandomBackOff(boolean randomBackOff) {
this.randomBackOff = randomBackOff;
getBackoff().setRandom(randomBackOff);
}

private final Backoff backoff = new Backoff();

public Backoff getBackoff() {
return this.backoff;
}

public static class Backoff {

/**
* Canonical backoff period. Used as an initial value in the exponential
* case, and as a minimum value in the uniform case.
*/
private Duration delay = Duration.ofSeconds(1);

/**
* Multiplier to use for generating the next backoff delay.
*/
private double multiplier = 0.0;

/**
* Maximum wait between retries. If less than the delay then the default
* of 30 seconds is applied.
*/
private Duration maxDelay = Duration.ZERO;

/**
* Whether to have the backoff delays.
*/
private boolean random = false;

public Duration getDelay() {
return this.delay;
}

public void setDelay(Duration delay) {
this.delay = delay;
}

public double getMultiplier() {
return this.multiplier;
}

public void setMultiplier(double multiplier) {
this.multiplier = multiplier;
}

public Duration getMaxDelay() {
return this.maxDelay;
}

public void setMaxDelay(Duration maxDelay) {
this.maxDelay = maxDelay;
}

public boolean isRandom() {
return this.random;
}

public void setRandom(boolean random) {
this.random = random;
}

}

}
Expand Down

0 comments on commit e057139

Please sign in to comment.