Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a way to terminate unfinished requests after graceful shutdown #5941

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common;

import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.common.util.Sampler;
import com.linecorp.armeria.server.Server;

/**
* A {@link CancellationException} raised when a {@link Server} cannot handle a request because it's shutting
* down.
*/
@UnstableApi
public final class ShuttingDownException extends CancellationException {
private static final long serialVersionUID = -4963725400532294491L;

private static final ShuttingDownException INSTANCE = new ShuttingDownException(false);

/**
* Returns a singleton {@link ShuttingDownException} or newly-created exception depending on
* the result of {@link Sampler#isSampled(Object)} of {@link Flags#verboseExceptionSampler()}.
*/
public static ShuttingDownException get() {
return Flags.verboseExceptionSampler().isSampled(ShuttingDownException.class) ?
new ShuttingDownException() : INSTANCE;
}

private ShuttingDownException() {}

private ShuttingDownException(@SuppressWarnings("unused") boolean dummy) {
super(null, null, false, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.linecorp.armeria.internal.common;

import static com.linecorp.armeria.internal.client.ClosedStreamExceptionUtil.newClosedSessionException;
import static java.util.Objects.requireNonNull;

import java.util.AbstractMap.SimpleImmutableEntry;
Expand Down Expand Up @@ -392,7 +391,7 @@ protected final void updateClosedId(int id) {
protected abstract boolean isPing(int id);

@Override
public final void close() {
public final void close(Throwable cause) {
if (closed) {
return;
}
Expand All @@ -403,7 +402,6 @@ public final void close() {
return;
}

final ClosedSessionException cause = newClosedSessionException(ch);
for (Queue<Entry<HttpObject, ChannelPromise>> queue : pendingWritesMap.values()) {
for (;;) {
final Entry<HttpObject, ChannelPromise> e = queue.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ protected final boolean isStreamPresentAndWritable(int streamId) {
}

@Override
public final void close() {
public final void close(Throwable unused) {
closed = true;
keepAliveHandler().destroy();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ default ChannelFuture writeReset(int id, int streamId, Http2Error error,
/**
* Releases the resources related with this encoder and fails any unfinished writes.
*/
void close();
void close(Throwable cause);

/**
* Returns {@code true} if {@link #close()} is called.
* Returns {@code true} if {@link #close(Throwable)} is called.
*/
boolean isClosed();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ final class AggregatingDecodedHttpRequest extends AggregatingStreamMessage<HttpO
private boolean isNormallyClosed;

private final CompletableFuture<Void> aggregationFuture = new CompletableFuture<>();
private final CompletableFuture<Void> whenResponseSent = new CompletableFuture<>();

AggregatingDecodedHttpRequest(EventLoop eventLoop, int id, int streamId, RequestHeaders headers,
boolean keepAlive, long maxRequestLength,
Expand Down Expand Up @@ -89,6 +90,12 @@ public CompletableFuture<AggregatedHttpRequest> aggregate(AggregationOptions opt
return super.aggregate(options);
}

@Nullable
@Override
public ServiceRequestContext requestContext() {
return ctx;
}

@Override
public RoutingContext routingContext() {
return routingCtx;
Expand Down Expand Up @@ -234,6 +241,11 @@ public CompletableFuture<Void> whenAggregated() {
return aggregationFuture;
}

@Override
public CompletableFuture<Void> whenResponseSent() {
return whenResponseSent;
}

@Override
public ExchangeType exchangeType() {
return exchangeType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ static DecodedHttpRequest of(boolean endOfStream, EventLoop eventLoop, int id, i

void init(ServiceRequestContext ctx);

@Nullable
ServiceRequestContext requestContext();

RoutingContext routingContext();

/**
Expand Down Expand Up @@ -118,6 +121,11 @@ default CompletableFuture<Void> whenAggregated() {
return null;
}

/**
* Returns a {@link CompletableFuture} that is completed when the response is fully sent.
*/
CompletableFuture<Void> whenResponseSent();

/**
* Returns the {@link ExchangeType} that determines whether to stream an {@link HttpRequest} or
* {@link HttpResponse}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.server;

import java.time.Duration;
import java.util.Objects;
import java.util.function.BiFunction;

import com.google.common.base.MoreObjects;

import com.linecorp.armeria.common.HttpRequest;

final class DefaultGracefulShutdown implements GracefulShutdown {

private final Duration quietPeriod;
private final Duration timeout;
private final BiFunction<ServiceRequestContext, HttpRequest, Throwable> errorFunction;

DefaultGracefulShutdown(Duration quietPeriod, Duration timeout,
BiFunction<ServiceRequestContext, HttpRequest, Throwable> errorFunction) {
this.quietPeriod = quietPeriod;
this.timeout = timeout;
this.errorFunction = errorFunction;
}

@Override
public Duration quietPeriod() {
return quietPeriod;
}

@Override
public Duration timeout() {
return timeout;
}

@Override
public Throwable shutdownError(ServiceRequestContext ctx, HttpRequest request) {
return errorFunction.apply(ctx, request);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DefaultGracefulShutdown)) {
return false;
}
final DefaultGracefulShutdown that = (DefaultGracefulShutdown) o;
return quietPeriod.equals(that.quietPeriod) &&
timeout.equals(that.timeout) &&
errorFunction.equals(that.errorFunction);
}

@Override
public int hashCode() {
return Objects.hash(quietPeriod, timeout, errorFunction);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("quietPeriod", quietPeriod)
.add("timeout", timeout)
.add("errorFunction", errorFunction)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ final class DefaultServerConfig implements ServerConfig {
private final int http1MaxHeaderSize;
private final int http1MaxChunkSize;

private final Duration gracefulShutdownQuietPeriod;
private final Duration gracefulShutdownTimeout;
private final GracefulShutdown gracefulShutdown;

private final BlockingTaskExecutor blockingTaskExecutor;

Expand Down Expand Up @@ -135,7 +134,7 @@ final class DefaultServerConfig implements ServerConfig {
long http2MaxStreamsPerConnection, int http2MaxFrameSize, long http2MaxHeaderListSize,
int http2MaxResetFramesPerWindow, int http2MaxResetFramesWindowSeconds,
int http1MaxInitialLineLength, int http1MaxHeaderSize,
int http1MaxChunkSize, Duration gracefulShutdownQuietPeriod, Duration gracefulShutdownTimeout,
int http1MaxChunkSize, GracefulShutdown gracefulShutdown,
BlockingTaskExecutor blockingTaskExecutor,
MeterRegistry meterRegistry, int proxyProtocolMaxTlvSize,
Map<ChannelOption<?>, Object> channelOptions,
Expand Down Expand Up @@ -183,12 +182,7 @@ final class DefaultServerConfig implements ServerConfig {
http1MaxHeaderSize, "http1MaxHeaderSize");
this.http1MaxChunkSize = validateNonNegative(
http1MaxChunkSize, "http1MaxChunkSize");
this.gracefulShutdownQuietPeriod = validateNonNegative(requireNonNull(
gracefulShutdownQuietPeriod), "gracefulShutdownQuietPeriod");
this.gracefulShutdownTimeout = validateNonNegative(requireNonNull(
gracefulShutdownTimeout), "gracefulShutdownTimeout");
validateGreaterThanOrEqual(gracefulShutdownTimeout, "gracefulShutdownTimeout",
gracefulShutdownQuietPeriod, "gracefulShutdownQuietPeriod");
this.gracefulShutdown = requireNonNull(gracefulShutdown, "gracefulShutdown");

requireNonNull(blockingTaskExecutor, "blockingTaskExecutor");
this.blockingTaskExecutor = monitorBlockingTaskExecutor(blockingTaskExecutor, meterRegistry);
Expand Down Expand Up @@ -369,7 +363,7 @@ static Duration validateNonNegative(Duration duration, String fieldName) {
static void validateGreaterThanOrEqual(Duration larger, String largerFieldName,
Duration smaller, String smallerFieldName) {
if (larger.compareTo(smaller) < 0) {
throw new IllegalArgumentException(largerFieldName + " must be greater than or equal to" +
throw new IllegalArgumentException(largerFieldName + " must be greater than or equal to " +
smallerFieldName);
}
}
Expand Down Expand Up @@ -586,12 +580,17 @@ public int http2MaxResetFramesWindowSeconds() {

@Override
public Duration gracefulShutdownQuietPeriod() {
return gracefulShutdownQuietPeriod;
return gracefulShutdown.quietPeriod();
}

@Override
public Duration gracefulShutdownTimeout() {
return gracefulShutdownTimeout;
return gracefulShutdown.timeout();
}

@Override
public GracefulShutdown gracefulShutdown() {
return gracefulShutdown;
}

@Override
Expand Down Expand Up @@ -702,7 +701,7 @@ public String toString() {
http2InitialConnectionWindowSize(), http2InitialStreamWindowSize(),
http2MaxStreamsPerConnection(), http2MaxFrameSize(), http2MaxHeaderListSize(),
http1MaxInitialLineLength(), http1MaxHeaderSize(), http1MaxChunkSize(),
proxyProtocolMaxTlvSize(), gracefulShutdownQuietPeriod(), gracefulShutdownTimeout(),
proxyProtocolMaxTlvSize(), gracefulShutdown(),
blockingTaskExecutor(),
meterRegistry(), channelOptions(), childChannelOptions(),
clientAddressSources(), clientAddressTrustedProxyFilter(), clientAddressFilter(),
Expand All @@ -723,7 +722,7 @@ static String toString(
int http2InitialStreamWindowSize, long http2MaxStreamsPerConnection, int http2MaxFrameSize,
long http2MaxHeaderListSize, long http1MaxInitialLineLength, long http1MaxHeaderSize,
long http1MaxChunkSize, int proxyProtocolMaxTlvSize,
Duration gracefulShutdownQuietPeriod, Duration gracefulShutdownTimeout,
GracefulShutdown gracefulShutdown,
@Nullable BlockingTaskExecutor blockingTaskExecutor,
@Nullable MeterRegistry meterRegistry,
Map<ChannelOption<?>, ?> channelOptions, Map<ChannelOption<?>, ?> childChannelOptions,
Expand Down Expand Up @@ -799,10 +798,8 @@ static String toString(
buf.append(http1MaxChunkSize);
buf.append("B, proxyProtocolMaxTlvSize: ");
buf.append(proxyProtocolMaxTlvSize);
buf.append("B, gracefulShutdownQuietPeriod: ");
buf.append(gracefulShutdownQuietPeriod);
buf.append(", gracefulShutdownTimeout: ");
buf.append(gracefulShutdownTimeout);
buf.append("B, gracefulShutdown: ");
buf.append(gracefulShutdown);
if (blockingTaskExecutor != null) {
buf.append(", blockingTaskExecutor: ");
buf.append(blockingTaskExecutor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.ShuttingDownException;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.internal.common.RequestContextExtension;
Expand Down Expand Up @@ -95,6 +96,11 @@ public HttpResponse onServiceException(ServiceRequestContext ctx, Throwable caus
return internalRenderStatus(ctx, ctx.request().headers(), status, cause);
}

if (cause instanceof ShuttingDownException) {
return internalRenderStatus(ctx, ctx.request().headers(),
HttpStatus.SERVICE_UNAVAILABLE, cause);
}

return internalRenderStatus(ctx, ctx.request().headers(),
HttpStatus.INTERNAL_SERVER_ERROR, cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ final class EmptyContentDecodedHttpRequest implements DecodedHttpRequest {

@Nullable
private CompletableFuture<AggregatedHttpRequest> aggregateFuture;
private final CompletableFuture<Void> whenResponseSent = new CompletableFuture<>();

@Nullable
private HttpResponse response;
Expand All @@ -84,6 +85,12 @@ public void init(ServiceRequestContext ctx) {
this.ctx = ctx;
}

@Nullable
@Override
public ServiceRequestContext requestContext() {
return ctx;
}

@Override
public RoutingContext routingContext() {
return routingContext;
Expand Down Expand Up @@ -234,6 +241,11 @@ public boolean isResponseAborted() {
return abortResponseCause != null;
}

@Override
public CompletableFuture<Void> whenResponseSent() {
return whenResponseSent;
}

@Override
public ExchangeType exchangeType() {
return exchangeType;
Expand Down
Loading
Loading