diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/ConcurrencyBasedRetryLimiter.java b/core/src/main/java/com/linecorp/armeria/client/retry/ConcurrencyBasedRetryLimiter.java new file mode 100644 index 00000000000..28fddb81a78 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/retry/ConcurrencyBasedRetryLimiter.java @@ -0,0 +1,55 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.retry; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.base.MoreObjects; + +import com.linecorp.armeria.client.ClientRequestContext; + +final class ConcurrencyBasedRetryLimiter implements RetryLimiter { + + private final long maxRequests; + private final AtomicLong activeRequests = new AtomicLong(); + + ConcurrencyBasedRetryLimiter(long maxRequests) { + checkArgument(maxRequests > 0, "maxRequests must be positive: %s.", maxRequests); + this.maxRequests = maxRequests; + } + + @Override + public boolean shouldRetry(ClientRequestContext ctx) { + final long cnt = activeRequests.incrementAndGet(); + if (cnt > maxRequests) { + activeRequests.decrementAndGet(); + return false; + } + ctx.log().whenComplete().thenRun(activeRequests::decrementAndGet); + return true; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("activeRequests", activeRequests) + .add("maxRequests", maxRequests) + .toString(); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryConfig.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryConfig.java index 31f7892815d..44d953668b5 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryConfig.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryConfig.java @@ -78,6 +78,7 @@ static RetryConfigBuilder builder0( private final int maxTotalAttempts; private final long responseTimeoutMillisForEachAttempt; private final int maxContentLength; + private final RetryLimiter retryLimiter; @Nullable private final RetryRule retryRule; @@ -88,9 +89,10 @@ static RetryConfigBuilder builder0( @Nullable private RetryRuleWithContent fromRetryRule; - RetryConfig(RetryRule retryRule, int maxTotalAttempts, long responseTimeoutMillisForEachAttempt) { + RetryConfig(RetryRule retryRule, int maxTotalAttempts, long responseTimeoutMillisForEachAttempt, + RetryLimiter retryLimiter) { this(requireNonNull(retryRule, "retryRule"), null, - maxTotalAttempts, responseTimeoutMillisForEachAttempt, 0); + maxTotalAttempts, responseTimeoutMillisForEachAttempt, 0, retryLimiter); checkArguments(maxTotalAttempts, responseTimeoutMillisForEachAttempt); } @@ -98,9 +100,10 @@ static RetryConfigBuilder builder0( RetryRuleWithContent retryRuleWithContent, int maxContentLength, int maxTotalAttempts, - long responseTimeoutMillisForEachAttempt) { + long responseTimeoutMillisForEachAttempt, + RetryLimiter retryLimiter) { this(null, requireNonNull(retryRuleWithContent, "retryRuleWithContent"), - maxTotalAttempts, responseTimeoutMillisForEachAttempt, maxContentLength); + maxTotalAttempts, responseTimeoutMillisForEachAttempt, maxContentLength, retryLimiter); } private RetryConfig( @@ -108,7 +111,8 @@ private RetryConfig( @Nullable RetryRuleWithContent retryRuleWithContent, int maxTotalAttempts, long responseTimeoutMillisForEachAttempt, - int maxContentLength) { + int maxContentLength, RetryLimiter retryLimiter) { + this.retryLimiter = new RetryLimiters.CatchingRetryLimiter(retryLimiter); checkArguments(maxTotalAttempts, responseTimeoutMillisForEachAttempt); this.retryRule = retryRule; this.retryRuleWithContent = retryRuleWithContent; @@ -147,7 +151,8 @@ public RetryConfigBuilder toBuilder() { } return builder .maxTotalAttempts(maxTotalAttempts) - .responseTimeoutMillisForEachAttempt(responseTimeoutMillisForEachAttempt); + .responseTimeoutMillisForEachAttempt(responseTimeoutMillisForEachAttempt) + .retryLimiter(retryLimiter); } /** @@ -197,6 +202,13 @@ public boolean needsContentInRule() { return retryRuleWithContent != null; } + /** + * Returns the configured {@link RetryLimiter}. + */ + public RetryLimiter retryLimiter() { + return retryLimiter; + } + /** * Returns whether the associated {@link RetryRule} or {@link RetryRuleWithContent} requires * response trailers. diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryConfigBuilder.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryConfigBuilder.java index cbda418e44f..5d759e93fcf 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryConfigBuilder.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryConfigBuilder.java @@ -24,6 +24,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; +import com.linecorp.armeria.client.retry.RetryLimiters.AlwaysRetryLimiter; import com.linecorp.armeria.common.Flags; import com.linecorp.armeria.common.Response; import com.linecorp.armeria.common.annotation.Nullable; @@ -42,6 +43,7 @@ public final class RetryConfigBuilder { private final RetryRule retryRule; @Nullable private final RetryRuleWithContent retryRuleWithContent; + private RetryLimiter retryLimiter = AlwaysRetryLimiter.INSTANCE; /** * Creates a {@link RetryConfigBuilder} with this {@link RetryRule}. @@ -111,19 +113,30 @@ public RetryConfigBuilder responseTimeoutForEachAttempt(Duration responseTime return this; } + /** + * Sets a {@link RetryLimiter} which may limit retry requests. + * @see RetryLimiter + */ + public RetryConfigBuilder retryLimiter(RetryLimiter retryLimiter) { + this.retryLimiter = requireNonNull(retryLimiter, "retryLimiter"); + return this; + } + /** * Returns a newly-created {@link RetryConfig} from this {@link RetryConfigBuilder}'s values. */ public RetryConfig build() { if (retryRule != null) { - return new RetryConfig<>(retryRule, maxTotalAttempts, responseTimeoutMillisForEachAttempt); + return new RetryConfig<>(retryRule, maxTotalAttempts, responseTimeoutMillisForEachAttempt, + retryLimiter); } assert retryRuleWithContent != null; return new RetryConfig<>( retryRuleWithContent, maxContentLength, maxTotalAttempts, - responseTimeoutMillisForEachAttempt); + responseTimeoutMillisForEachAttempt, + retryLimiter); } @Override @@ -139,6 +152,7 @@ ToStringHelper toStringHelper() { .add("retryRuleWithContent", retryRuleWithContent) .add("maxTotalAttempts", maxTotalAttempts) .add("responseTimeoutMillisForEachAttempt", responseTimeoutMillisForEachAttempt) - .add("maxContentLength", maxContentLength); + .add("maxContentLength", maxContentLength) + .add("retryLimiter", retryLimiter); } } diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryDecision.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryDecision.java index ed4bcad3c2a..44ae89d9e4e 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryDecision.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryDecision.java @@ -18,6 +18,9 @@ import static java.util.Objects.requireNonNull; +import com.google.common.base.MoreObjects; +import com.google.common.base.MoreObjects.ToStringHelper; + import com.linecorp.armeria.common.annotation.Nullable; /** @@ -26,27 +29,44 @@ */ public final class RetryDecision { - private static final RetryDecision NO_RETRY = new RetryDecision(null); - private static final RetryDecision NEXT = new RetryDecision(null); - static final RetryDecision DEFAULT = new RetryDecision(Backoff.ofDefault()); + private static final RetryDecision NO_RETRY = new RetryDecision(null, -1); + private static final RetryDecision NEXT = new RetryDecision(null, 0); + static final RetryDecision DEFAULT = new RetryDecision(Backoff.ofDefault(), 1); /** * Returns a {@link RetryDecision} that retries with the specified {@link Backoff}. + * The permits will be {@code 1} by default. */ public static RetryDecision retry(Backoff backoff) { - if (backoff == Backoff.ofDefault()) { + return retry(backoff, 1); + } + + /** + * Returns a {@link RetryDecision} that retries with the specified {@link Backoff}. + */ + @SuppressWarnings("FloatingPointEquality") + public static RetryDecision retry(Backoff backoff, double permits) { + if (backoff == DEFAULT.backoff() && permits == DEFAULT.permits()) { return DEFAULT; } - return new RetryDecision(requireNonNull(backoff, "backoff")); + return new RetryDecision(requireNonNull(backoff, "backoff"), permits); } /** * Returns a {@link RetryDecision} that never retries. + * The permits will be {@code -1} by default. */ public static RetryDecision noRetry() { return NO_RETRY; } + /** + * Returns a {@link RetryDecision} that never retries. + */ + public static RetryDecision noRetry(double permits) { + return new RetryDecision(null, permits); + } + /** * Returns a {@link RetryDecision} that skips the current {@link RetryRule} and * tries to retry with the next {@link RetryRule}. @@ -57,9 +77,11 @@ public static RetryDecision next() { @Nullable private final Backoff backoff; + private final double permits; - private RetryDecision(@Nullable Backoff backoff) { + private RetryDecision(@Nullable Backoff backoff, double permits) { this.backoff = backoff; + this.permits = permits; } @Nullable @@ -67,14 +89,29 @@ Backoff backoff() { return backoff; } + /** + * The number of permits associated with this {@link RetryDecision}. + * This may be used by {@link RetryLimiter} to determine whether retry requests should + * be limited or not. The semantics of whether or how the returned value affects {@link RetryLimiter} + * depends on what type of {@link RetryLimiter} is used. + */ + public double permits() { + return permits; + } + @Override public String toString() { - if (this == NO_RETRY) { - return "RetryDecision(NO_RETRY)"; - } else if (this == NEXT) { - return "RetryDecision(NEXT)"; + final ToStringHelper stringHelper = MoreObjects.toStringHelper(this); + if (this == NEXT) { + stringHelper.add("type", "NEXT"); + } else if (backoff != null) { + stringHelper.add("type", "RETRY"); } else { - return "RetryDecision(RETRY(" + backoff + "))"; + stringHelper.add("type", "NO_RETRY"); } + return stringHelper.omitNullValues() + .add("backoff", backoff) + .add("permits", permits) + .toString(); } } diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryLimitedException.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryLimitedException.java new file mode 100644 index 00000000000..d55745096c8 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryLimitedException.java @@ -0,0 +1,44 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.retry; + +import com.linecorp.armeria.common.Flags; + +/** + * An exception thrown when a retry is limited by a {@link RetryLimiter}. + */ +public final class RetryLimitedException extends RuntimeException { + + private static final long serialVersionUID = 7203512016805562689L; + + private static final RetryLimitedException INSTANCE = new RetryLimitedException(false); + + /** + * Returns an instance of {@link RetryLimitedException} sampled by {@link Flags#verboseExceptionSampler()}. + */ + public static RetryLimitedException of() { + return isSampled() ? new RetryLimitedException(true) : INSTANCE; + } + + private RetryLimitedException(boolean enableSuppression) { + super(null, null, enableSuppression, isSampled()); + } + + private static boolean isSampled() { + return Flags.verboseExceptionSampler().isSampled(RetryLimitedException.class); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryLimiter.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryLimiter.java new file mode 100644 index 00000000000..03d436d4c76 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryLimiter.java @@ -0,0 +1,80 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.retry; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.common.annotation.UnstableApi; + +/** + * Provides a way to limit the number of retries. + * This may be useful for situations where prolonged service degradation may trigger + * an explosion of retries, which may further exacerbate the overall system. + *
{@code
+ * var rule = RetryRule.of(RetryRule.builder()
+ *                     .onStatus(SERVICE_UNAVAILABLE)
+ *                     // increment permits
+ *                     .build(RetryDecision.retry(Backoff.ofDefault(), 1)),
+ *            RetryRule.builder()
+ *                     .onStatus(OK)
+ *                     // decrement permits
+ *                     .build(RetryDecision.retry(Backoff.ofDefault(), -1)));
+ * var config = RetryConfig.builder(rule)
+ *                         // max 5 concurrent retry request
+ *                         .retryLimiter(RetryLimiter.concurrencyLimiting(5))
+ *                         .build();
+ * var decorator = RetryingClient.newDecorator(config)
+ * }
+ */ +@UnstableApi +public interface RetryLimiter { + + /** + * Returns a {@link RetryLimiter} which limits the number of concurrent retry requests. + * This limiter does not consider {@link RetryDecision#permits()} when limiting retries. + */ + static RetryLimiter concurrencyLimiting(long maxRequests) { + return new ConcurrencyBasedRetryLimiter(maxRequests); + } + + /** + * A token based {@link RetryLimiter} influenced by gRPC's retry throttling algorithm. + * Given {@code maxTokens} amount of tokens, each retry will consume {@link RetryDecision#permits()} + * amount of tokens. Negative {@link RetryDecision#permits()} will replenish tokens. + * Retries will be allowed if more than {@code threshold} amount of tokens are available. + */ + static RetryLimiter tokenBased(int maxTokens, int threshold) { + return new TokenBasedRetryLimiter(maxTokens, threshold); + } + + /** + * A callback method which is invoked before each retry (excluding the first request). + * The returned value determines whether a retry request should be executed, or whether a + * {@link RetryLimitedException} will be thrown. + * + * @param ctx the {@link ClientRequestContext} of the current request which will be executed + * @return {@code true} if the request should be executed + */ + boolean shouldRetry(ClientRequestContext ctx); + + /** + * A callback method which is invoked after a {@link RetryDecision} has been determined. + * + * @param ctx the {@link ClientRequestContext} of the request used to derive the {@link RetryDecision} + * @param decision the computed {@link RetryDecision} + */ + default void handleDecision(ClientRequestContext ctx, RetryDecision decision) {} +} diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryLimiters.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryLimiters.java new file mode 100644 index 00000000000..ddeec57e2f9 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryLimiters.java @@ -0,0 +1,65 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.retry; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.linecorp.armeria.client.ClientRequestContext; + +final class RetryLimiters { + + static final class CatchingRetryLimiter implements RetryLimiter { + + private static final Logger logger = LoggerFactory.getLogger(CatchingRetryLimiter.class); + + private final RetryLimiter delegate; + + CatchingRetryLimiter(RetryLimiter delegate) { + this.delegate = delegate; + } + + @Override + public boolean shouldRetry(ClientRequestContext ctx) { + try { + return delegate.shouldRetry(ctx); + } catch (Exception e) { + logger.warn("Unexpected error when invoking RetryLimiter.shouldRetry: ", e); + return false; + } + } + + @Override + public void handleDecision(ClientRequestContext ctx, RetryDecision decision) { + try { + delegate.handleDecision(ctx, decision); + } catch (Exception e) { + logger.warn("Unexpected error when invoking RetryLimiter.handleDecision: ", e); + } + } + } + + static final class AlwaysRetryLimiter implements RetryLimiter { + + static final RetryLimiter INSTANCE = new AlwaysRetryLimiter(); + + @Override + public boolean shouldRetry(ClientRequestContext ctx) { + return true; + } + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryRuleBuilder.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryRuleBuilder.java index 5cdbeae24b8..4e230b0a23c 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryRuleBuilder.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryRuleBuilder.java @@ -63,8 +63,11 @@ public RetryRule thenNoRetry() { return build(RetryDecision.noRetry()); } - private RetryRule build(RetryDecision decision) { - if (decision != RetryDecision.noRetry() && + /** + * Returns a newly created {@link RetryRule} based on the {@link RetryDecision}. + */ + public RetryRule build(RetryDecision decision) { + if (decision.backoff() != null && exceptionFilter() == null && responseHeadersFilter() == null && responseTrailersFilter() == null && grpcTrailersFilter() == null && totalDurationFilter() == null) { diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryRuleWithContentBuilder.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryRuleWithContentBuilder.java index f17c68120e2..24494fc0141 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryRuleWithContentBuilder.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryRuleWithContentBuilder.java @@ -63,11 +63,14 @@ public RetryRuleWithContent thenNoRetry() { return build(RetryDecision.noRetry()); } - RetryRuleWithContent build(RetryDecision decision) { + /** + * Returns a newly created {@link RetryRuleWithContent} based on the {@link RetryDecision}. + */ + public RetryRuleWithContent build(RetryDecision decision) { final BiFunction> responseFilter = responseFilter(); final boolean hasResponseFilter = responseFilter != null; - if (decision != RetryDecision.noRetry() && exceptionFilter() == null && + if (decision.backoff() != null && exceptionFilter() == null && responseHeadersFilter() == null && responseTrailersFilter() == null && grpcTrailersFilter() == null && !hasResponseFilter) { throw new IllegalStateException("Should set at least one retry rule if a backoff was set."); diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java index d914bfece68..5969f640e30 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java @@ -312,6 +312,15 @@ private void doExecute0(ClientRequestContext ctx, HttpRequestDuplicator rootReqD return; } + final RetryConfig config = mappedRetryConfig(ctx); + if (!initialAttempt) { + final boolean shouldRetry = config.retryLimiter().shouldRetry(derivedCtx); + if (!shouldRetry) { + handleException(ctx, rootReqDuplicator, future, RetryLimitedException.of(), initialAttempt); + return; + } + } + final HttpRequest ctxReq = derivedCtx.request(); assert ctxReq != null; final HttpResponse response; @@ -328,7 +337,6 @@ private void doExecute0(ClientRequestContext ctx, HttpRequestDuplicator rootReqD (context, cause) -> HttpResponse.ofFailure(cause), ctxReq, false); } - final RetryConfig config = mappedRetryConfig(ctx); if (!ctx.exchangeType().isResponseStreaming() || config.requiresResponseTrailers()) { response.aggregate().handle((aggregated, cause) -> { if (cause != null) { @@ -366,7 +374,7 @@ private void handleResponseWithoutContent(RetryConfig config, Clie f.handle((decision, shouldRetryCause) -> { warnIfExceptionIsRaised(retryRule, shouldRetryCause); handleRetryDecision(decision, ctx, derivedCtx, rootReqDuplicator, - originalReq, returnedRes, future, response); + originalReq, returnedRes, future, response, config); return null; }); } catch (Throwable cause) { @@ -413,7 +421,8 @@ private void handleStreamingResponse(RetryConfig retryConfig, Clie warnIfExceptionIsRaised(ruleWithContent, cause); truncatingHttpResponse.abort(); handleRetryDecision(decision, ctx, derivedCtx, rootReqDuplicator, - originalReq, returnedRes, future, duplicated); + originalReq, returnedRes, future, duplicated, + retryConfig); return null; }); } catch (Throwable cause) { @@ -451,7 +460,7 @@ private void handleAggregatedResponse(RetryConfig retryConfig, Cli warnIfExceptionIsRaised(ruleWithContent, cause); handleRetryDecision( decision, ctx, derivedCtx, rootReqDuplicator, originalReq, - returnedRes, future, aggregatedRes.toHttpResponse()); + returnedRes, future, aggregatedRes.toHttpResponse(), retryConfig); return null; }); } catch (Throwable cause) { @@ -524,7 +533,12 @@ private static void handleException(ClientRequestContext ctx, private void handleRetryDecision(@Nullable RetryDecision decision, ClientRequestContext ctx, ClientRequestContext derivedCtx, HttpRequestDuplicator rootReqDuplicator, HttpRequest originalReq, HttpResponse returnedRes, - CompletableFuture future, HttpResponse originalRes) { + CompletableFuture future, HttpResponse originalRes, + RetryConfig config) { + if (decision != null) { + config.retryLimiter().handleDecision(derivedCtx, decision); + } + final Backoff backoff = decision != null ? decision.backoff() : null; if (backoff != null) { final long millisAfter = useRetryAfter ? getRetryAfterMillis(derivedCtx) : -1; diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java index a1c0f2749c7..a35c5e220ba 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java @@ -25,6 +25,7 @@ import com.linecorp.armeria.client.ClientRequestContext; import com.linecorp.armeria.client.ResponseTimeoutException; import com.linecorp.armeria.client.RpcClient; +import com.linecorp.armeria.client.UnprocessedRequestException; import com.linecorp.armeria.client.endpoint.EndpointGroup; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.Request; @@ -170,6 +171,16 @@ private void doExecute0(ClientRequestContext ctx, RpcRequest req, mutator -> mutator.add(ARMERIA_RETRY_COUNT, StringUtil.toString(totalAttempts - 1))); } + final RetryConfig config = mappedRetryConfig(ctx); + if (!initialAttempt) { + final boolean shouldRetry = config.retryLimiter().shouldRetry(derivedCtx); + if (!shouldRetry) { + handleException(ctx, future, UnprocessedRequestException.of(RetryLimitedException.of()), + initialAttempt); + return; + } + } + final RpcResponse res; final ClientRequestContextExtension ctxExtension = derivedCtx.as(ClientRequestContextExtension.class); @@ -196,6 +207,10 @@ private void doExecute0(ClientRequestContext ctx, RpcRequest req, try { assert retryRule != null; retryRule.shouldRetry(derivedCtx, res, cause).handle((decision, unused3) -> { + if (decision != null) { + config.retryLimiter().handleDecision(derivedCtx, decision); + } + final Backoff backoff = decision != null ? decision.backoff() : null; if (backoff != null) { final long nextDelay = getNextDelay(derivedCtx, backoff); diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/TokenBasedRetryLimiter.java b/core/src/main/java/com/linecorp/armeria/client/retry/TokenBasedRetryLimiter.java new file mode 100644 index 00000000000..117add103f4 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/retry/TokenBasedRetryLimiter.java @@ -0,0 +1,79 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.retry; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.MoreObjects; + +import com.linecorp.armeria.client.ClientRequestContext; + +final class TokenBasedRetryLimiter implements RetryLimiter { + + /** + * The number of tokens starts at maxTokens. The token_count will always be between 0 and maxTokens. + */ + private final int maxTokens; + + /** + * The threshold. Retries are allowed if the token count is greater than the threshold + */ + private final int threshold; + + private final AtomicInteger tokenCount; + + TokenBasedRetryLimiter(int maxTokens, int threshold) { + checkArgument(maxTokens > 0, "maxTokens must be positive: %s.", maxTokens); + checkArgument(threshold >= 0 && threshold < maxTokens, + "invalid threshold: %s (>=0 && <%s)", threshold, maxTokens); + this.maxTokens = maxTokens; + this.threshold = threshold; + tokenCount = new AtomicInteger(maxTokens); + } + + @Override + public boolean shouldRetry(ClientRequestContext ctx) { + return tokenCount.get() > threshold; + } + + @Override + public void handleDecision(ClientRequestContext ctx, RetryDecision decision) { + final int permits = (int) decision.permits(); + if (permits == 0) { + return; + } + boolean updated; + do { + final int currentCount = tokenCount.get(); + int newCount = currentCount - permits; + newCount = Math.max(newCount, 0); + newCount = Math.min(newCount, maxTokens); + updated = tokenCount.compareAndSet(currentCount, newCount); + } while (!updated); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("maxTokens", maxTokens) + .add("threshold", threshold) + .add("tokenCount", tokenCount) + .toString(); + } +} diff --git a/core/src/test/java/com/linecorp/armeria/client/retry/RetryLimiterIntegrationTest.java b/core/src/test/java/com/linecorp/armeria/client/retry/RetryLimiterIntegrationTest.java new file mode 100644 index 00000000000..5554418ef4c --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/client/retry/RetryLimiterIntegrationTest.java @@ -0,0 +1,142 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.retry; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.ArrayDeque; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +import com.linecorp.armeria.client.BlockingWebClient; +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.ResponseHeaders; + +class RetryLimiterIntegrationTest { + + @Test + void concurrencyLimiting() throws Exception { + final Backoff fixed = Backoff.fixed(0); + final RetryRule rule = + RetryRule.builder() + .onStatus(HttpStatus.OK) + .build(RetryDecision.retry(fixed, 1)); + final int maxRequests = 3; + final RetryConfig config = + RetryConfig.builder(rule) + .retryLimiter(RetryLimiter.concurrencyLimiting(maxRequests)) + .build(); + + final AtomicInteger counter = new AtomicInteger(); + final ArrayDeque deque = new ArrayDeque<>(); + final WebClient client = + WebClient.builder("http://foo.com") + .decorator((delegate, ctx, req) -> { + counter.incrementAndGet(); + if (ctx.log().partial().currentAttempt() > 1) { + return HttpResponse.streaming(); + } + return HttpResponse.of(200); + }) + .decorator(RetryingClient.newDecorator(config)) + .build(); + for (int i = 0; i < maxRequests; i++) { + deque.add(client.get("/")); + } + + assertThatThrownBy(() -> client.blocking().get("/")).isInstanceOf(RetryLimitedException.class); + while (!deque.isEmpty()) { + deque.poll().abort(); + } + assertThat(counter).hasValue(maxRequests * 2 + 1); + } + + @Test + void bucketLimiting() throws Exception { + final Backoff fixed = Backoff.fixed(0); + // emulates grpc retry throttling behavior + final RetryRule retryRule = RetryRule.of( + RetryRule.builder() + .onResponseHeaders((ctx, trailers) -> trailers.containsInt("grpc-status", 0)) + .build(RetryDecision.noRetry(-1)), + RetryRule.builder() + .onResponseHeaders((ctx, trailers) -> trailers.containsInt("grpc-status", 11)) + .build(RetryDecision.retry(fixed, 1)) + ); + final RetryConfig config = RetryConfig.builder(retryRule) + .retryLimiter(RetryLimiter.tokenBased(3, 1)) + .build(); + + final AtomicInteger counter = new AtomicInteger(); + final BlockingWebClient client = + WebClient.builder("http://foo.com") + .decorator((delegate, ctx, req) -> { + counter.incrementAndGet(); + return HttpResponse.of(ResponseHeaders.builder(200) + .add("grpc-status", "11") + .build()); + }) + .decorator(RetryingClient.newDecorator(config)) + .build() + .blocking(); + assertThatThrownBy(() -> client.get("/")).isInstanceOf(RetryLimitedException.class); + assertThat(counter).hasValue(2); + } + + @Test + void throwingLimiter() throws Exception { + final Backoff fixed = Backoff.fixed(0); + final RetryRule rule = + RetryRule.builder() + .onStatus(HttpStatus.OK) + .build(RetryDecision.retry(fixed, 1)); + final int maxRequests = 3; + final RetryConfig config = + RetryConfig.builder(rule) + .retryLimiter(new RetryLimiter() { + @Override + public boolean shouldRetry(ClientRequestContext ctx) { + throw new RuntimeException(); + } + + @Override + public void handleDecision(ClientRequestContext ctx, RetryDecision decision) { + throw new RuntimeException(); + } + }) + .maxTotalAttempts(maxRequests) + .build(); + + final AtomicInteger counter = new AtomicInteger(); + final BlockingWebClient client = + WebClient.builder("http://foo.com") + .decorator((delegate, ctx, req) -> { + counter.incrementAndGet(); + return HttpResponse.of(200); + }) + .decorator(RetryingClient.newDecorator(config)) + .build().blocking(); + + assertThatThrownBy(() -> client.get("/")).isInstanceOf(RetryLimitedException.class); + assertThat(counter).hasValue(1); + } +} diff --git a/core/src/test/java/com/linecorp/armeria/client/retry/RetryLimiterTest.java b/core/src/test/java/com/linecorp/armeria/client/retry/RetryLimiterTest.java new file mode 100644 index 00000000000..f45dc5ffc53 --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/client/retry/RetryLimiterTest.java @@ -0,0 +1,124 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.retry; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import org.junit.jupiter.api.Test; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; + +class RetryLimiterTest { + + @Test + void concurrencyLimitingAllowsRetriesWithinLimit() { + final RetryLimiter limiter = RetryLimiter.concurrencyLimiting(10); + final ClientRequestContext ctx = ctx(); + + assertThat(limiter.shouldRetry(ctx)).isTrue(); + assertThat(limiter.shouldRetry(ctx)).isTrue(); + } + + @Test + void concurrencyLimitingBlocksExcessiveRetries() throws Exception { + final RetryLimiter limiter = RetryLimiter.concurrencyLimiting(1); + final ClientRequestContext ctx1 = ctx(); + final ClientRequestContext ctx2 = ctx(); + + assertThat(limiter.shouldRetry(ctx1)).isTrue(); + assertThat(limiter.shouldRetry(ctx2)).isFalse(); + + ctx1.cancel(); + // cancel doesn't synchronously complete futures, so we wait until the limiter is available again + await().untilAsserted(() -> assertThat(limiter.shouldRetry(ctx2)).isTrue()); + } + + @Test + void concurrencyLimitingIgnoresDecisionHandling() { + final RetryLimiter limiter = RetryLimiter.concurrencyLimiting(10); + final ClientRequestContext ctx = ctx(); + final RetryDecision zeroDecision = RetryDecision.retry(Backoff.ofDefault(), 0.0); + final RetryDecision positiveDecision = RetryDecision.retry(Backoff.ofDefault(), 1.0); + final RetryDecision negativeDecision = RetryDecision.retry(Backoff.ofDefault(), -1.0); + + assertThat(limiter.shouldRetry(ctx)).isTrue(); + limiter.handleDecision(ctx, zeroDecision); + limiter.handleDecision(ctx, positiveDecision); + limiter.handleDecision(ctx, negativeDecision); + assertThat(limiter.shouldRetry(ctx)).isTrue(); + } + + @Test + void tokenBasedInitiallyAllowsRetries() { + final RetryLimiter limiter = RetryLimiter.tokenBased(10, 1); + final ClientRequestContext ctx = ctx(); + + assertThat(limiter.shouldRetry(ctx)).isTrue(); + } + + @Test + void tokenBasedBlocksWhenTokensExhausted() { + final RetryLimiter limiter = RetryLimiter.tokenBased(10, 5); + final ClientRequestContext ctx = ctx(); + final RetryDecision positiveDecision = RetryDecision.retry(Backoff.ofDefault(), 1.0); + + for (int i = 0; i < 5; i++) { + assertThat(limiter.shouldRetry(ctx)).isTrue(); + limiter.handleDecision(ctx, positiveDecision); + } + + assertThat(limiter.shouldRetry(ctx)).isFalse(); + } + + @Test + void tokenBasedReplenishesTokens() { + final RetryLimiter limiter = RetryLimiter.tokenBased(10, 5); + final ClientRequestContext ctx = ctx(); + final RetryDecision positiveDecision = RetryDecision.retry(Backoff.ofDefault(), 1.0); + final RetryDecision negativeDecision = RetryDecision.retry(Backoff.ofDefault(), -1.0); + + for (int i = 0; i < 5; i++) { + assertThat(limiter.shouldRetry(ctx)).isTrue(); + limiter.handleDecision(ctx, positiveDecision); + } + // blocked + assertThat(limiter.shouldRetry(ctx)).isFalse(); + + // replenish tokens + limiter.handleDecision(ctx, negativeDecision); + assertThat(limiter.shouldRetry(ctx)).isTrue(); + } + + @Test + void tokenBasedIgnoresZeroPermits() { + final RetryLimiter limiter = RetryLimiter.tokenBased(2, 1); + final ClientRequestContext ctx = ctx(); + final RetryDecision zeroDecision = RetryDecision.retry(Backoff.ofDefault(), 0.0); + + for (int i = 0; i < 5; i++) { + assertThat(limiter.shouldRetry(ctx)).isTrue(); + limiter.handleDecision(ctx, zeroDecision); + } + } + + private static ClientRequestContext ctx() { + return ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + } +} diff --git a/thrift/thrift0.13/src/test/java/com/linecorp/armeria/client/thrift/RpcRetryLimiterTest.java b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/client/thrift/RpcRetryLimiterTest.java new file mode 100644 index 00000000000..2854a225526 --- /dev/null +++ b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/client/thrift/RpcRetryLimiterTest.java @@ -0,0 +1,63 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.thrift; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +import com.linecorp.armeria.client.UnprocessedRequestException; +import com.linecorp.armeria.client.retry.Backoff; +import com.linecorp.armeria.client.retry.RetryConfig; +import com.linecorp.armeria.client.retry.RetryDecision; +import com.linecorp.armeria.client.retry.RetryLimitedException; +import com.linecorp.armeria.client.retry.RetryLimiter; +import com.linecorp.armeria.client.retry.RetryRule; +import com.linecorp.armeria.client.retry.RetryingRpcClient; +import com.linecorp.armeria.common.RpcResponse; + +import testing.thrift.main.HelloService; + +class RpcRetryLimiterTest { + + @Test + void basicCase() throws Exception { + final Backoff fixed = Backoff.fixed(0); + // simulates grpc retry throttling behavior + final RetryRule retryRule = RetryRule.builder() + .onException() + .build(RetryDecision.retry(fixed)); + final RetryConfig config = RetryConfig.builderForRpc(retryRule) + .retryLimiter(RetryLimiter.tokenBased(3, 1)) + .build(); + + final AtomicInteger counter = new AtomicInteger(); + final HelloService.Iface iface = ThriftClients.builder("http://foo.com") + .rpcDecorator((delegate, ctx, req) -> { + counter.incrementAndGet(); + return RpcResponse.ofFailure(new RuntimeException()); + }) + .rpcDecorator(RetryingRpcClient.newDecorator(config)) + .build(HelloService.Iface.class); + assertThatThrownBy(() -> iface.hello("hello")).isInstanceOf(UnprocessedRequestException.class) + .hasCauseInstanceOf(RetryLimitedException.class); + assertThat(counter).hasValue(2); + } +}