Skip to content

Commit

Permalink
Using WeightRampingUpStrategy with AbstractEndpointSelector can f…
Browse files Browse the repository at this point in the history
…ail to select initial endpoints (#5752)

Motivation:

Following the change of #5693, now calling `EndpointGroup#selectNow` may return `null` if the `RampingUpEndpointWeightSelector#updateEndpoints` is not completed yet since updates are always scheduled on the event loop.

https://github.com/line/armeria/blob/8bad3305a8e1843c049537395368266215245df6/core/src/main/java/com/linecorp/armeria/client/endpoint/WeightRampingUpStrategy.java#L149

In addition, most `EndpointGroup` implementations use `AbstractEndpointSelector`.
Whenever a change is made to the `EndpointGroup#endpoints`, [refreshEndpoints](https://github.com/line/armeria/blob/8bad3305a8e1843c049537395368266215245df6/core/src/main/java/com/linecorp/armeria/client/endpoint/AbstractEndpointSelector.java#L148) is called.

The expectation here is that [updateEndpoints](https://github.com/line/armeria/blob/8bad3305a8e1843c049537395368266215245df6/core/src/main/java/com/linecorp/armeria/client/endpoint/AbstractEndpointSelector.java#L150) updates the internal state of the endpoint selector, and then pending futures invoke `selectNow` again.

https://github.com/line/armeria/blob/8bad3305a8e1843c049537395368266215245df6/core/src/main/java/com/linecorp/armeria/client/endpoint/AbstractEndpointSelector.java#L148-L158

However, `WeightRampingUpStrategy` completes `updateEndpoints` asynchronously. Therefore, `pendingIf` futures may call `RampingUpEndpointWeightSelector#selectNow` although `RampingUpEndpointWeightSelector` didn't update its state yet. Consequently, some pending futures may never have a chance to be updated unless the endpoints is updated again.

Modifications:

- Modified `AbstractEndpointSelector#updateNewEndpoints` to return a `CompletableFuture<Void>` instead of `void`
- `RampingUpEndpointWeightSelector#updateNewEndpoints` completes the returned future once the endpoints have completely updated the internal state

Result:

- `WeightRampingUpStrategy` works with `AbstractEndpointSelector`

<!--
Visit this URL to learn more about how to write a pull request description:
https://armeria.dev/community/developer-guide#how-to-write-pull-request-description
-->
  • Loading branch information
jrhee17 authored Jun 12, 2024
1 parent 9495a06 commit 76e5267
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,21 +147,24 @@ protected final void initialize() {

private void refreshEndpoints(List<Endpoint> endpoints) {
// Allow subclasses to update the endpoints first.
updateNewEndpoints(endpoints);

lock.lock();
try {
pendingFutures.removeIf(ListeningFuture::tryComplete);
} finally {
lock.unlock();
}
updateNewEndpoints(endpoints).handle((ignored, e) -> {
lock.lock();
try {
pendingFutures.removeIf(ListeningFuture::tryComplete);
} finally {
lock.unlock();
}
return null;
});
}

/**
* Invoked when the {@link EndpointGroup} has been updated.
*/
@UnstableApi
protected void updateNewEndpoints(List<Endpoint> endpoints) {}
protected CompletableFuture<Void> updateNewEndpoints(List<Endpoint> endpoints) {
return UnmodifiableFuture.completedFuture(null);
}

private void addPendingFuture(ListeningFuture future) {
lock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand Down Expand Up @@ -144,9 +145,9 @@ final class RampingUpEndpointWeightSelector extends AbstractEndpointSelector {
}

@Override
protected void updateNewEndpoints(List<Endpoint> endpoints) {
protected CompletableFuture<Void> updateNewEndpoints(List<Endpoint> endpoints) {
// Use the executor so the order of endpoints change is guaranteed.
executor.execute(() -> updateEndpoints(endpoints));
return CompletableFuture.runAsync(() -> updateEndpoints(endpoints), executor);
}

private long computeCreateTimestamp(Endpoint endpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.ImmutableList;
Expand All @@ -28,6 +29,7 @@
import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.UnmodifiableFuture;

final class WeightedRoundRobinStrategy implements EndpointSelectionStrategy {

Expand Down Expand Up @@ -62,11 +64,12 @@ private static final class WeightedRoundRobinSelector extends AbstractEndpointSe
}

@Override
protected void updateNewEndpoints(List<Endpoint> endpoints) {
protected CompletableFuture<Void> updateNewEndpoints(List<Endpoint> endpoints) {
final EndpointsAndWeights endpointsAndWeights = this.endpointsAndWeights;
if (endpointsAndWeights == null || endpointsAndWeights.endpoints != endpoints) {
this.endpointsAndWeights = new EndpointsAndWeights(endpoints);
}
return UnmodifiableFuture.completedFuture(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ void testSelectionTimeoutException() {
.hasRootCauseInstanceOf(EndpointSelectionTimeoutException.class);
}

@Test
void testRampingUpInitialSelection() {
final DynamicEndpointGroup endpointGroup =
new DynamicEndpointGroup(EndpointSelectionStrategy.rampingUp());
final Endpoint endpoint = Endpoint.of("foo.com");
endpointGroup.setEndpoints(ImmutableList.of(endpoint));
final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/"));
final Endpoint selected = endpointGroup.select(ctx, ctx.eventLoop()).join();
assertThat(selected).isEqualTo(endpoint);
}

private static AbstractEndpointSelector newSelector(EndpointGroup endpointGroup) {
final AbstractEndpointSelector selector = new AbstractEndpointSelector(endpointGroup) {

Expand Down

0 comments on commit 76e5267

Please sign in to comment.