From 14350be2f60d4ff45fe904707a9ed5d451cf9acf Mon Sep 17 00:00:00 2001 From: Violeta Georgieva <696661+violetagg@users.noreply.github.com> Date: Wed, 9 Jul 2025 15:24:03 +0300 Subject: [PATCH 1/6] Add HTTP/2 PING-based connection health checks Introduce PING frame support for proactive connection health monitoring on idle HTTP/2 connections. The feature sends PING frames to detect unresponsive peers and close stale connections. Configuration via Http2SettingsSpec: - pingAckTimeout: Timeout for ACK response - pingAckDropThreshold: Maximum PING attempts (default: 1) Requires idle timeout configuration: - Client: ConnectionProvider.maxIdleTime - Server: HttpServer.idleTimeout The feature is opt-in and integrates automatically with connection pool eviction (client) and idle timeout handling (server). This implementation is based on PR #3612, reduces the scope and adds HttpClient support. Co-authored-by: raccoonback Signed-off-by: Violeta Georgieva <696661+violetagg@users.noreply.github.com> --- docs/modules/ROOT/pages/http-client.adoc | 35 ++ docs/modules/ROOT/pages/http-server.adoc | 35 ++ .../resources/PooledConnectionProvider.java | 5 + .../http/client/liveness/Application.java | 52 +++ .../http/server/liveness/Application.java | 47 ++ reactor-netty-http/build.gradle | 1 + .../netty/http/Http2ConnectionLiveness.java | 225 +++++++++ .../reactor/netty/http/Http2SettingsSpec.java | 148 ++++++ .../netty/http/HttpConnectionLiveness.java | 81 ++++ .../netty/http/IdleTimeoutHandler.java | 133 ++++++ .../http/client/Http2ConnectionProvider.java | 106 ++++- .../reactor/netty/http/client/Http2Pool.java | 14 +- .../netty/http/server/HttpServerConfig.java | 39 +- .../netty/http/server/HttpTrafficHandler.java | 6 +- .../netty/http/server/IdleTimeoutHandler.java | 85 ---- .../reactor-netty-http/reflect-config.json | 4 +- .../http/Http2ConnectionLivenessTest.java | 432 ++++++++++++++++++ .../netty/http/Http2SettingsSpecTests.java | 65 +++ .../http/server/HttpIdleTimeoutTest.java | 155 +++++++ 19 files changed, 1562 insertions(+), 106 deletions(-) create mode 100644 reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/liveness/Application.java create mode 100644 reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/liveness/Application.java create mode 100644 reactor-netty-http/src/main/java/reactor/netty/http/Http2ConnectionLiveness.java create mode 100644 reactor-netty-http/src/main/java/reactor/netty/http/HttpConnectionLiveness.java create mode 100644 reactor-netty-http/src/main/java/reactor/netty/http/IdleTimeoutHandler.java delete mode 100644 reactor-netty-http/src/main/java/reactor/netty/http/server/IdleTimeoutHandler.java create mode 100644 reactor-netty-http/src/test/java/reactor/netty/http/Http2ConnectionLivenessTest.java create mode 100644 reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java diff --git a/docs/modules/ROOT/pages/http-client.adoc b/docs/modules/ROOT/pages/http-client.adoc index 9eb4125778..8aafb5847e 100644 --- a/docs/modules/ROOT/pages/http-client.adoc +++ b/docs/modules/ROOT/pages/http-client.adoc @@ -680,6 +680,41 @@ include::{examples-dir}/channeloptions/Application.java[lines=18..54] <4> Configures the time between individual `keepalive` probes to 1 minute. <5> Configures the maximum number of TCP `keepalive` probes to 8. +=== HTTP/2 Connection Health Check +For `HTTP/2 connections`, it's recommended to configure `PING` frames for connection maintenance and health monitoring. +The `HttpClient` uses `PING` frames to detect unresponsive connections when `maxIdleTime` on the connection pool is configured. + +Benefits of using `PING` frames: + +- Actively probes connection liveness when idle +- Detects unresponsive connections through timely health checks +- Closes stale connections automatically +- Maintains reliable connection pool state + +[NOTE] +==== +`HTTP/2` `PING` frame-based health checks require `maxIdleTime` on the connection pool to be configured as a prerequisite. +Without idle timeout configuration, PING frames will not be sent and connections may remain open indefinitely, +preventing proper detection of inactive or unresponsive connections. + +The idle timeout and PING settings work together: + +1. Connection becomes idle (no activity for `maxIdleTime`) +2. A PING frame is sent to check if the peer is responsive +3. If no ACK within `pingAckTimeout`, retry based on `pingAckDropThreshold` +4. If all attempts fail, the connection is closed +==== + +To enable `PING` frames for `HTTP/2` connections, configure the `HttpClient` as follows: + +{examples-link}/liveness/Application.java +---- +include::{examples-dir}/liveness/Application.java[lines=27..52] +---- +<1> Configure `maxIdleTime` to enable PING-based health checks. When a connection is idle for this duration, a PING frame will be sent. +<2> Configure `pingAckTimeout` - the maximum time to wait for each PING ACK response. If no ACK is received within this timeout, another PING is sent (up to the threshold). +<3> Configure `pingAckDropThreshold` - the maximum number of PING frames to send before closing the connection. For example, `2` means send up to 2 PING frames (if first fails, try once more) before closing. + [[ssl-tls-timeout]] ==== SSL/TLS Timeout `HttpClient` supports the SSL/TLS functionality provided by Netty. diff --git a/docs/modules/ROOT/pages/http-server.adoc b/docs/modules/ROOT/pages/http-server.adoc index 06f2bcac64..daaa12bab4 100644 --- a/docs/modules/ROOT/pages/http-server.adoc +++ b/docs/modules/ROOT/pages/http-server.adoc @@ -789,6 +789,41 @@ include::{examples-dir}/idle/timeout/Application.java[lines=18..35] ---- <1> Configures the default idle timeout to 1 second. +=== HTTP/2 Connection Health Check +For `HTTP/2 connections`, it's recommended to configure `PING` frames for connection maintenance and health monitoring. +The `HttpServer` uses `PING` frames to detect unresponsive clients when `idleTimeout` is configured. + +Benefits of using `PING` frames: + +- Actively probes connection liveness when idle +- Detects unresponsive clients through timely health checks +- Closes stale connections automatically +- Prevents resource exhaustion from dead connections + +[NOTE] +==== +`HTTP/2` `PING` frame-based health checks require `idleTimeout` to be configured as a prerequisite. +Without idle timeout configuration, PING frames will not be sent and connections may remain open indefinitely, +preventing proper detection of inactive or unresponsive connections. + +The idle timeout and PING settings work together: + +1. Connection becomes idle (no activity for `idleTimeout`) +2. A PING frame is sent to check if the client is responsive +3. If no ACK within `pingAckTimeout`, retry based on `pingAckDropThreshold` +4. If all attempts fail, the connection is closed +==== + +To enable `PING` frames for `HTTP/2` connections, configure the `HttpServer` as follows: + +{examples-link}/liveness/Application.java +---- +include::{examples-dir}/liveness/Application.java[lines=26..50] +---- +<1> Configure `idleTimeout` to enable PING-based health checks. When a connection is idle for this duration, a PING frame will be sent. +<2> Configure `pingAckTimeout` - the maximum time to wait for each PING ACK response. If no ACK is received within this timeout, another PING is sent (up to the threshold). +<3> Configure `pingAckDropThreshold` - the maximum number of PING frames to send before closing the connection. For example, `2` means send up to 2 PING frames (if first fails, try once more) before closing. + [[http-server-ssl-tls-timeout]] === SSL/TLS Timeout `HttpServer` supports the SSL/TLS functionality provided by Netty. diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index afa4fdc9c0..b1e654da42 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -644,6 +644,11 @@ public AllocationStrategy allocationStrategy() { return allocationStrategy; } + @Nullable + public BiPredicate evictionPredicate() { + return evictionPredicate; + } + public long maxIdleTime() { return this.maxIdleTime; } diff --git a/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/liveness/Application.java b/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/liveness/Application.java new file mode 100644 index 0000000000..55c2e28815 --- /dev/null +++ b/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/liveness/Application.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.examples.documentation.http.client.liveness; + +import io.netty.handler.codec.http.HttpHeaders; +import reactor.core.publisher.Mono; +import reactor.netty.http.HttpProtocol; +import reactor.netty.http.client.HttpClient; +import reactor.netty.resources.ConnectionProvider; +import reactor.util.function.Tuple2; + +import java.time.Duration; + +public class Application { + + public static void main(String[] args) { + ConnectionProvider provider = + ConnectionProvider.builder("liveness") + .maxIdleTime(Duration.ofSeconds(2)) //<1> + .build(); + + HttpClient client = + HttpClient.create(provider) + .protocol(HttpProtocol.H2) + .secure() + .http2Settings(builder -> builder.pingAckTimeout(Duration.ofMillis(600)) //<2> + .pingAckDropThreshold(2)); //<3> + + Tuple2 response = + client.get() + .uri("https://example.com/") + .responseSingle((res, bytes) -> bytes.asString() + .zipWith(Mono.just(res.responseHeaders()))) + .block(); + + System.out.println("Used stream ID: " + response.getT2().get("x-http2-stream-id")); + System.out.println("Response: " + response.getT1()); + } +} diff --git a/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/liveness/Application.java b/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/liveness/Application.java new file mode 100644 index 0000000000..808b399405 --- /dev/null +++ b/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/liveness/Application.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.examples.documentation.http.server.liveness; + +import reactor.netty.DisposableServer; +import reactor.netty.http.Http2SslContextSpec; +import reactor.netty.http.HttpProtocol; +import reactor.netty.http.server.HttpServer; + +import java.io.File; +import java.time.Duration; + +public class Application { + + public static void main(String[] args) { + File cert = new File("certificate.crt"); + File key = new File("private.key"); + + Http2SslContextSpec http2SslContextSpec = Http2SslContextSpec.forServer(cert, key); + + DisposableServer server = + HttpServer.create() + .port(8080) + .protocol(HttpProtocol.H2) + .secure(spec -> spec.sslContext(http2SslContextSpec)) + .idleTimeout(Duration.ofSeconds(1)) //<1> + .http2Settings(builder -> builder.pingAckTimeout(Duration.ofMillis(600)) //<2> + .pingAckDropThreshold(2)) //<3> + .bindNow(); + + server.onDispose() + .block(); + } +} diff --git a/reactor-netty-http/build.gradle b/reactor-netty-http/build.gradle index 823b83d582..a8cfd402c6 100644 --- a/reactor-netty-http/build.gradle +++ b/reactor-netty-http/build.gradle @@ -144,6 +144,7 @@ dependencies { testImplementation "io.specto:hoverfly-java-junit5:$hoverflyJavaVersion" testImplementation "org.apache.tomcat.embed:tomcat-embed-core:$tomcatVersion" testImplementation "io.projectreactor:reactor-test:$testAddonVersion" + testImplementation "io.projectreactor.addons:reactor-extra:$reactorAddonsVersion" testImplementation "org.assertj:assertj-core:$assertJVersion" testImplementation "org.awaitility:awaitility:$awaitilityVersion" testImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion" diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/Http2ConnectionLiveness.java b/reactor-netty-http/src/main/java/reactor/netty/http/Http2ConnectionLiveness.java new file mode 100644 index 0000000000..fa726edb53 --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/Http2ConnectionLiveness.java @@ -0,0 +1,225 @@ +/* + * Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.Http2DataFrame; +import io.netty.handler.codec.http2.Http2FrameCodec; +import io.netty.handler.codec.http2.Http2FrameWriter; +import io.netty.handler.codec.http2.Http2HeadersFrame; +import io.netty.handler.codec.http2.Http2PingFrame; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadLocalRandom; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static reactor.netty.ReactorNetty.format; + +/** + * Supports connection health checks using HTTP/2 PING frames. + * + *

This class implements liveness detection for HTTP/2 connections by sending PING frames + * when the connection becomes idle (no read/write activity). The peer must respond with a PING ACK + * within the configured timeout, otherwise the connection is considered unresponsive.

+ * + *

Behavior

+ *
    + *
  • PING frames are only sent when the connection is idle (no active reads/writes)
  • + *
  • Each PING frame must receive an ACK within {@code pingAckTimeout}
  • + *
  • If no ACK is received, retry up to {@code pingAckDropThreshold} times
  • + *
  • If all attempts fail, the connection is closed
  • + *
  • Receiving any HTTP/2 frame (HEADERS, DATA, or PING ACK) cancels the health check
  • + *
+ * + *

Thread Safety

+ *

This class is designed to be used with a single channel. All operations, including + * scheduled tasks, execute on the channel's event loop thread, eliminating the need for + * explicit synchronization. A new instance is created per channel.

+ * + *

Configuration Guidelines

+ *
    + *
  • pingAckTimeout: Should account for expected network latency and load. + * Values that are too short may cause false positives due to temporary delays.
  • + *
  • pingAckDropThreshold: Balance between quick failure detection and tolerance + * for transient issues. Higher values increase detection time but reduce false positives.
  • + *
+ * + * @author raccoonback + * @author Violeta Georgieva + * @since 1.2.12 + */ +public final class Http2ConnectionLiveness implements HttpConnectionLiveness { + + private final Http2FrameWriter http2FrameWriter; + private final int pingAckDropThreshold; + private final long pingAckTimeoutNanos; + + private boolean isPingAckPending; + private long lastSentPingData; + private ScheduledFuture pingScheduler; + private int pingAttempts; + + /** + * Constructs a new {@code Http2ConnectionLiveness} instance. + * + * @param http2FrameCodec the HTTP/2 frame codec + * @param pingAckDropThreshold the maximum number of PING frame transmission attempts before closing the connection + * @param pingAckTimeoutNanos the timeout in nanoseconds for receiving a PING ACK response + */ + public Http2ConnectionLiveness(Http2FrameCodec http2FrameCodec, int pingAckDropThreshold, long pingAckTimeoutNanos) { + this.http2FrameWriter = http2FrameCodec.encoder().frameWriter(); + this.pingAckDropThreshold = pingAckDropThreshold; + this.pingAckTimeoutNanos = pingAckTimeoutNanos; + } + + /** + * Cancels the scheduled ping task. + */ + @Override + public void cancel() { + isPingAckPending = false; + pingAttempts = 0; + if (pingScheduler != null) { + pingScheduler.cancel(false); + pingScheduler = null; + } + } + + /** + * Checks the liveness of the connection and schedules a ping if necessary. + * + * @param ctx the {@link ChannelHandlerContext} of the connection + */ + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public void check(ChannelHandlerContext ctx) { + if (!isPingAckPending) { + if (log.isDebugEnabled()) { + log.debug(format(ctx.channel(), "Connection was idle. Starting probing with PING frame: timeout={} ns."), + pingAckTimeoutNanos); + } + + PingTimeoutTask pingTimeoutTask = new PingTimeoutTask(ctx); + pingTimeoutTask.writePing(); + } + } + + /** + * Receives a message from the peer and processes it if it is a ping frame. + * + * @param msg the message received from the peer + */ + @Override + public void receive(Object msg) { + if (isPingAckPending) { + if (msg instanceof Http2PingFrame) { + Http2PingFrame frame = (Http2PingFrame) msg; + if (frame.ack() && frame.content() == lastSentPingData) { + cancel(); + } + } + else if (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame) { + cancel(); + } + } + } + + /** + * A task that handles ping timeouts. + */ + class PingTimeoutTask implements ChannelFutureListener, Runnable { + + private final ChannelHandlerContext ctx; + + PingTimeoutTask(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + + @Override + public void operationComplete(ChannelFuture future) { + if (!future.channel().isActive()) { + // Channel is not active, don't schedule next check + return; + } + + if (future.isSuccess()) { + if (log.isDebugEnabled()) { + log.debug(format(future.channel(), "PING frame was sent: ping data={}, ping attempts={}."), lastSentPingData, pingAttempts); + } + } + else { + if (log.isDebugEnabled()) { + log.debug(format(future.channel(), "Failed to send PING frame: ping data={}, ping attempts={}. " + + "Will wait timeout and retry based on threshold."), lastSentPingData, pingAttempts); + } + } + + // Schedule timeout check - whether send succeeded or failed, + // wait the configured timeout before retry/close decision + pingScheduler = invokeNextSchedule(); + } + + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public void run() { + Channel channel = ctx.channel(); + if (channel == null || !channel.isActive() || !isPingAckPending) { + return; + } + + if (isExceedAckDropThreshold()) { + if (log.isDebugEnabled()) { + log.debug(format(channel, "Closing connection due to delayed PING ACK response: timeout={} ns, attempts={}, threshold={}."), + pingAckTimeoutNanos, pingAttempts, pingAckDropThreshold); + } + + //"FutureReturnValueIgnored" this is deliberate + ctx.close(); + } + else { + if (log.isDebugEnabled()) { + log.debug(format(channel, "PING ACK response delayed: timeout={} ns, attempts={}, threshold={}. Retrying PING frame."), + pingAckTimeoutNanos, pingAttempts, pingAckDropThreshold); + } + + writePing(); + } + } + + private ScheduledFuture invokeNextSchedule() { + return ctx.executor().schedule(this, pingAckTimeoutNanos, NANOSECONDS); + } + + private boolean isExceedAckDropThreshold() { + return pingAttempts >= pingAckDropThreshold; + } + + private void writePing() { + isPingAckPending = true; + pingAttempts++; + + lastSentPingData = ThreadLocalRandom.current().nextLong(); + + http2FrameWriter.writePing(ctx, false, lastSentPingData, ctx.newPromise()) + .addListener(this); + + ctx.flush(); + } + } +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java b/reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java index 734516cd81..960eb11ec3 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java @@ -19,6 +19,7 @@ import io.netty.handler.codec.http2.Http2Settings; import reactor.util.annotation.Nullable; +import java.time.Duration; import java.util.Objects; /** @@ -124,6 +125,101 @@ public interface Builder { */ Builder maxStreams(long maxStreams); + /** + * Sets the maximum number of PING frame transmission attempts before closing the connection. + * + *

+ * This method configures how many PING frames will be sent without receiving an ACK + * before considering the connection as unresponsive and closing it. + * Each PING waits for {@link #pingAckTimeout(Duration)} before either receiving an ACK + * (which resets the health check) or timing out and sending the next PING. + *

+ * + *

+ * Example with {@code pingAckDropThreshold=1}: + *

    + *
  1. Connection becomes idle
  2. + *
  3. First PING frame is sent
  4. + *
  5. Wait up to {@code pingAckTimeout} for ACK
  6. + *
  7. If no ACK received, connection is closed (1 attempt limit reached)
  8. + *
+ *

+ * + *

+ * Example with {@code pingAckDropThreshold=2}: + *

    + *
  1. Connection becomes idle
  2. + *
  3. First PING frame is sent
  4. + *
  5. Wait up to {@code pingAckTimeout} for ACK
  6. + *
  7. If no ACK received, second PING frame is sent
  8. + *
  9. Wait up to {@code pingAckTimeout} for ACK
  10. + *
  11. If no ACK received, connection is closed (2 attempt limit reached)
  12. + *
+ *

+ * + *

+ * A lower threshold detects connection failures more quickly but may lead + * to premature disconnections if network latency is high. A higher threshold + * tolerates more packet loss or delays but increases the time to detect truly dead connections. + *

+ * + *

+ * The default {@code pingAckDropThreshold} is {@code 1}, meaning only one PING frame + * will be sent. If no ACK is received within {@code pingAckTimeout}, the connection closes immediately. + *

+ * + * @param pingAckDropThreshold the maximum number of PING transmission attempts without receiving ACK. + * Must be a positive integer (minimum 1). Default is 1. + * @return {@code this} + * @since 1.2.12 + */ + default Builder pingAckDropThreshold(int pingAckDropThreshold) { + return this; + } + + /** + * Sets the timeout for receiving an ACK response to HTTP/2 PING frames. + * + *

+ * This method configures how long to wait for a PING ACK response before + * either retrying or closing the connection (based on {@link #pingAckDropThreshold(int)}). + * This timeout is used in conjunction with the idle timeout to detect unresponsive connections. + *

+ * + *

+ * When a connection becomes idle (no reads/writes for the configured idle timeout duration), + * a PING frame is sent to check if the peer is still responsive. If no ACK is received + * within the {@code pingAckTimeout} duration, another PING attempt may be made + * (depending on {@code pingAckDropThreshold}). If all attempts fail, the connection is closed. + *

+ * + *

+ * Important: This setting only takes effect when used together with: + *

    + *
  • For client: {@code ConnectionProvider} with {@code maxIdleTime} configured
  • + *
  • For server: {@code HttpServer} with {@code idleTimeout} configured
  • + *
+ * Without an idle timeout, PING frames will not be sent. + *

+ * + *

+ * The timeout should be chosen based on your network conditions and requirements: + *

    + *
  • Too short: May cause false positives and premature connection closures due to temporary delays
  • + *
  • Too long: Delays detection of truly unresponsive connections
  • + *
+ * Consider your expected network latency, load patterns, and tolerance for stale connections. + *

+ * + * @param pingAckTimeout the timeout duration to wait for a PING ACK response. + * Must be a positive value. + * @return {@code this} + * @since 1.2.12 + */ + default Builder pingAckTimeout(Duration pingAckTimeout) { + return this; + } + /** * Sets the {@code SETTINGS_ENABLE_PUSH} value. * @@ -259,6 +355,27 @@ public Long maxStreams() { return maxStreams; } + /** + * Returns the configured {@code pingAckDropThreshold} value. + * + * @return the configured {@code pingAckDropThreshold} value + * @since 1.2.12 + */ + public Integer pingAckDropThreshold() { + return pingAckDropThreshold; + } + + /** + * Returns the configured {@code pingAckTimeout} value or null. + * + * @return the configured {@code pingAckTimeout} value or null + * @since 1.2.12 + */ + @Nullable + public Duration pingAckTimeout() { + return pingAckTimeout; + } + /** * Returns the configured {@code SETTINGS_ENABLE_PUSH} value or null. * @@ -289,6 +406,8 @@ public boolean equals(Object o) { Objects.equals(maxFrameSize, that.maxFrameSize) && maxHeaderListSize.equals(that.maxHeaderListSize) && Objects.equals(maxStreams, that.maxStreams) && + pingAckDropThreshold.equals(that.pingAckDropThreshold) && + Objects.equals(pingAckTimeout, that.pingAckTimeout) && Objects.equals(pushEnabled, that.pushEnabled); } @@ -306,6 +425,8 @@ public int hashCode() { result = 31 * result + (maxFrameSize == null ? 0 : maxFrameSize); result = 31 * result + (maxHeaderListSize == null ? 0 : Long.hashCode(maxHeaderListSize)); result = 31 * result + (maxStreams == null ? 0 : Long.hashCode(maxStreams)); + result = 31 * result + pingAckDropThreshold; + result = 31 * result + (pingAckTimeout == null ? 0 : Objects.hashCode(pingAckTimeout)); result = 31 * result + (pushEnabled == null ? 0 : Boolean.hashCode(pushEnabled)); return result; } @@ -321,6 +442,8 @@ public int hashCode() { final Integer maxFrameSize; final Long maxHeaderListSize; final Long maxStreams; + final Integer pingAckDropThreshold; + final Duration pingAckTimeout; final Boolean pushEnabled; Http2SettingsSpec(Build build) { @@ -342,16 +465,22 @@ public int hashCode() { maxFrameSize = settings.maxFrameSize(); maxHeaderListSize = settings.maxHeaderListSize(); maxStreams = build.maxStreams; + pingAckDropThreshold = build.pingAckDropThreshold; + pingAckTimeout = build.pingAckTimeout; pushEnabled = settings.pushEnabled(); } static final class Build implements Builder { + static final int DEFAULT_PING_ACK_DROP_THRESHOLD = 1; + Boolean connectProtocolEnabled; Integer maxDecodedRstFramesPerWindow; Integer maxDecodedRstFramesSecondsPerWindow; Integer maxEncodedRstFramesPerWindow; Integer maxEncodedRstFramesSecondsPerWindow; Long maxStreams; + Integer pingAckDropThreshold = Integer.valueOf(DEFAULT_PING_ACK_DROP_THRESHOLD); + Duration pingAckTimeout; final Http2Settings http2Settings = Http2Settings.defaultSettings(); @Override @@ -430,6 +559,25 @@ public Builder maxStreams(long maxStreams) { return this; } + @Override + public Builder pingAckDropThreshold(int pingAckDropThreshold) { + if (pingAckDropThreshold < 1) { + throw new IllegalArgumentException("pingAckDropThreshold must be positive"); + } + this.pingAckDropThreshold = Integer.valueOf(pingAckDropThreshold); + return this; + } + + @Override + public Builder pingAckTimeout(Duration pingAckTimeout) { + Objects.requireNonNull(pingAckTimeout, "pingAckTimeout"); + if (pingAckTimeout.isNegative() || pingAckTimeout.isZero()) { + throw new IllegalArgumentException("pingAckTimeout must be positive"); + } + this.pingAckTimeout = pingAckTimeout; + return this; + } + /* @Override public Builder pushEnabled(boolean pushEnabled) { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/HttpConnectionLiveness.java b/reactor-netty-http/src/main/java/reactor/netty/http/HttpConnectionLiveness.java new file mode 100644 index 0000000000..3fac68dc8b --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/HttpConnectionLiveness.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http; + +import io.netty.channel.ChannelHandlerContext; +import reactor.util.Logger; +import reactor.util.Loggers; + +import static reactor.netty.ReactorNetty.format; + +/** + * Interface for checking the liveness of an HTTP connection. + * This interface provides methods to cancel started probes or any active work, + * process messages received from the remote peer, and start probing the connection for the liveness. + * + * @author raccoonback + * @author Violeta Georgieva + * @since 1.2.12 + */ +public interface HttpConnectionLiveness { + + Logger log = Loggers.getLogger(HttpConnectionLiveness.class); + + /** + * {@link HttpConnectionLiveness} that immediately closes the HTTP connection on read idle, without starting any probing. + */ + HttpConnectionLiveness CLOSE = new HttpConnectionLiveness() { + + @Override + public void cancel() { + // no op + } + + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public void check(ChannelHandlerContext ctx) { + if (log.isDebugEnabled()) { + log.debug(format(ctx.channel(), "Connection was idle, as per configuration the connection will be closed.")); + } + // FutureReturnValueIgnored is deliberate + ctx.close(); + } + + @Override + public void receive(Object msg) { + // no op + } + }; + + /** + * Cancels started probes or any active work. + */ + void cancel(); + + /** + * Closes or starts probing the connection for the liveness. + * + * @param ctx the {@link ChannelHandlerContext} of the connection + */ + void check(ChannelHandlerContext ctx); + + /** + * Processes messages received from the remote peer. + * + * @param msg the message received from the remote peer + */ + void receive(Object msg); +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/IdleTimeoutHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/IdleTimeoutHandler.java new file mode 100644 index 0000000000..5ea247862b --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/IdleTimeoutHandler.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2022-2025 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.HttpServerUpgradeHandler; +import io.netty.handler.codec.http2.Http2FrameCodec; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import reactor.netty.NettyPipeline; +import reactor.util.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +/** + * A handler that manages idle timeout for HTTP connections. + * This handler will close the connection if it remains idle for the specified duration. + * It may also check the liveness of the HTTP connection with sending series of ping messages. + * + * @author raccoonback + * @author Violeta Georgieva + * @since 1.2.12 + */ +public final class IdleTimeoutHandler extends IdleStateHandler { + + private final HttpConnectionLiveness httpConnectionLiveness; + + private IdleTimeoutHandler(long idleTimeout, HttpConnectionLiveness httpConnectionLiveness) { + super(0, 0, idleTimeout, TimeUnit.MILLISECONDS); + this.httpConnectionLiveness = httpConnectionLiveness; + } + + @Override + protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) { + if (evt.state() == IdleState.ALL_IDLE) { + httpConnectionLiveness.check(ctx); + } + + ctx.fireUserEventTriggered(evt); + } + + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + httpConnectionLiveness.receive(msg); + + super.channelRead(ctx, msg); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + httpConnectionLiveness.cancel(); + + super.channelInactive(ctx); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + httpConnectionLiveness.cancel(); + + super.handlerRemoved(ctx); + } + + /** + * Adds an idle timeout handler to the pipeline. + * + * @param pipeline the channel pipeline + * @param idleTimeout the idle timeout duration + * @param httpConnectionLiveness the HTTP connection liveness checker + * @since 1.2.12 + */ + public static void addIdleTimeoutHandler(ChannelPipeline pipeline, @Nullable Duration idleTimeout, + HttpConnectionLiveness httpConnectionLiveness) { + if (idleTimeout != null && pipeline.get(NettyPipeline.IdleTimeoutHandler) == null) { + String baseName = null; + if (pipeline.get(NettyPipeline.HttpCodec) != null) { + baseName = NettyPipeline.HttpCodec; + } + else { + ChannelHandler http2FrameCodec = pipeline.get(Http2FrameCodec.class); + if (http2FrameCodec != null) { + baseName = pipeline.context(http2FrameCodec).name(); + } + else { + ChannelHandler httpServerUpgradeHandler = pipeline.get(HttpServerUpgradeHandler.class); + if (httpServerUpgradeHandler != null) { + baseName = pipeline.context(httpServerUpgradeHandler).name(); + } + else { + ChannelHandler httpServerCodec = pipeline.get(HttpServerCodec.class); + if (httpServerCodec != null) { + baseName = pipeline.context(httpServerCodec).name(); + } + } + } + } + + pipeline.addAfter(baseName, + NettyPipeline.IdleTimeoutHandler, + new IdleTimeoutHandler(idleTimeout.toMillis(), httpConnectionLiveness)); + } + } + + /** + * Removes the idle timeout handler from the pipeline if it exists. + * + * @param pipeline the channel pipeline from which the handler will be removed + * @since 1.2.12 + */ + public static void removeIdleTimeoutHandler(ChannelPipeline pipeline) { + if (pipeline.get(NettyPipeline.IdleTimeoutHandler) != null) { + pipeline.remove(NettyPipeline.IdleTimeoutHandler); + } + } +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index 13b344f57a..ef55226151 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -37,6 +37,11 @@ import reactor.netty.ConnectionObserver; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.channel.ChannelOperations; +import reactor.netty.http.Http2ConnectionLiveness; +import reactor.netty.http.Http2SettingsSpec; +import reactor.netty.http.IdleTimeoutHandler; +import reactor.netty.internal.shaded.reactor.pool.PoolConfig; +import reactor.netty.internal.shaded.reactor.pool.decorators.GracefulShutdownInstrumentedPool; import reactor.netty.resources.ConnectionPoolMetrics; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.PooledConnectionProvider; @@ -63,6 +68,7 @@ import static reactor.netty.ReactorNetty.setChannelContext; import static reactor.netty.http.client.HttpClientState.STREAM_CONFIGURED; import static reactor.netty.http.client.HttpClientState.UPGRADE_REJECTED; +import static reactor.netty.http.client.HttpClientState.UPGRADE_SUCCESSFUL; /** * An HTTP/2 implementation for pooled {@link ConnectionProvider}. @@ -121,12 +127,16 @@ protected CoreSubscriber> createDisposableAcquire( SocketAddress proxyAddress = ((ClientTransportConfig) config).proxyProvider() != null ? ((ClientTransportConfig) config).proxyProvider().getProxyAddress() : null; Function uriTagValue = null; + Http2SettingsSpec http2SettingsSpec = null; if (config instanceof HttpClientConfig) { - acceptGzip = ((HttpClientConfig) config).acceptGzip; - uriTagValue = ((HttpClientConfig) config).uriTagValue; + HttpClientConfig httpClientConfig = (HttpClientConfig) config; + acceptGzip = httpClientConfig.acceptGzip; + uriTagValue = httpClientConfig.uriTagValue; + http2SettingsSpec = httpClientConfig.http2Settings; } return new DisposableAcquire(connectionObserver, config.channelOperationsProvider(), - acceptGzip, metricsRecorder, pendingAcquireTimeout, pool, proxyAddress, remoteAddress, sink, currentContext, uriTagValue); + acceptGzip, http2SettingsSpec, metricsRecorder, pendingAcquireTimeout, pool, proxyAddress, remoteAddress, + sink, currentContext, uriTagValue); } @Override @@ -253,6 +263,7 @@ static final class DisposableAcquire final ConnectionObserver obs; final ChannelOperations.OnSetup opsFactory; final boolean acceptGzip; + final Http2SettingsSpec http2SettingsSpec; final ChannelMetricsRecorder metricsRecorder; final long pendingAcquireTimeout; final InstrumentedPool pool; @@ -269,6 +280,7 @@ static final class DisposableAcquire ConnectionObserver obs, ChannelOperations.OnSetup opsFactory, boolean acceptGzip, + @Nullable Http2SettingsSpec http2SettingsSpec, @Nullable ChannelMetricsRecorder metricsRecorder, long pendingAcquireTimeout, InstrumentedPool pool, @@ -282,6 +294,7 @@ static final class DisposableAcquire this.obs = obs; this.opsFactory = opsFactory; this.acceptGzip = acceptGzip; + this.http2SettingsSpec = http2SettingsSpec; this.metricsRecorder = metricsRecorder; this.pendingAcquireTimeout = pendingAcquireTimeout; this.pool = pool; @@ -298,6 +311,7 @@ static final class DisposableAcquire this.obs = parent.obs; this.opsFactory = parent.opsFactory; this.acceptGzip = parent.acceptGzip; + this.http2SettingsSpec = parent.http2SettingsSpec; this.metricsRecorder = parent.metricsRecorder; this.pendingAcquireTimeout = parent.pendingAcquireTimeout; this.pool = parent.pool; @@ -364,6 +378,16 @@ else if (p.state != null) { return; } + Http2Pool http2Pool = http2Pool(); + if (http2Pool.evictionPredicate != null) { + ChannelHandlerContext frameCodec = http2PooledRef(pooledRef).slot.http2FrameCodecCtx(); + if (frameCodec != null) { + IdleTimeoutHandler.addIdleTimeoutHandler(channel.pipeline(), Duration.ofMillis(http2Pool.maxIdleTime), + new Http2ConnectionLiveness(((Http2FrameCodec) frameCodec.handler()), + http2SettingsSpec.pingAckDropThreshold(), http2SettingsSpec.pingAckTimeout().toNanos())); + } + } + if (getChannelContext(channel) != null) { setChannelContext(channel, null); } @@ -375,6 +399,17 @@ public void onStateChange(Connection connection, State newState) { if (newState == UPGRADE_REJECTED) { invalidate(connection.channel().attr(OWNER).get()); } + else if (newState == UPGRADE_SUCCESSFUL) { + Http2Pool http2Pool = http2Pool(); + if (http2Pool.evictionPredicate != null) { + ChannelHandlerContext frameCodec = http2PooledRef(pooledRef).slot.http2FrameCodecCtx(); + if (frameCodec != null) { + IdleTimeoutHandler.addIdleTimeoutHandler(connection.channel().pipeline(), Duration.ofMillis(http2Pool.maxIdleTime), + new Http2ConnectionLiveness(((Http2FrameCodec) frameCodec.handler()), + http2SettingsSpec.pingAckDropThreshold(), http2SettingsSpec.pingAckTimeout().toNanos())); + } + } + } obs.onStateChange(connection, newState); } @@ -470,6 +505,13 @@ boolean isH2cUpgrade() { return false; } + Http2Pool http2Pool() { + if (pool instanceof GracefulShutdownInstrumentedPool) { + return (Http2Pool) ((GracefulShutdownInstrumentedPool) pool).getOriginalPool(); + } + return (Http2Pool) pool; + } + boolean notHttp2() { Channel channel = pooledRef.poolable().channel(); Http2Pool.Http2PooledRef http2PooledRef = http2PooledRef(pooledRef); @@ -579,16 +621,28 @@ static final class PooledConnectionAllocator { PoolFactory poolFactory, SocketAddress remoteAddress, @Nullable AddressResolverGroup resolver) { - this.parent = parent; this.config = (HttpClientConfig) config; + Function, InstrumentedPool> poolConfigFunction; + Http2SettingsSpec http2SettingsSpec = this.config.http2SettingsSpec(); + if (poolFactory.evictionPredicate() == null && poolFactory.maxIdleTime() != -1 && + http2SettingsSpec != null && http2SettingsSpec.pingAckTimeout() != null) { + Http11EvictionPredicate http11EvictionPredicate = + new Http11EvictionPredicate(poolFactory.maxIdleTime(), poolFactory.maxLifeTime()); + Http2EvictionPredicate http2EvictionPredicate = new Http2EvictionPredicate(poolFactory.maxLifeTime()); + this.parent = parent.mutate().evictionPredicate(http11EvictionPredicate).build(); + poolConfigFunction = poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy(), + http2EvictionPredicate, poolFactory.maxIdleTime()); + } + else { + this.parent = parent; + poolConfigFunction = poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy()); + } this.remoteAddress = remoteAddress; this.resolver = resolver; this.pool = id == null ? - poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, - poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) : + poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, poolConfigFunction) : poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, - new MicrometerPoolMetricsRecorder(id, name, remoteAddress), - poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())); + new MicrometerPoolMetricsRecorder(id, name, remoteAddress), poolConfigFunction); } Publisher connectChannel() { @@ -600,5 +654,41 @@ Publisher connectChannel() { (connection, metadata) -> false; static final Function> DEFAULT_DESTROY_HANDLER = connection -> Mono.empty(); + + static final class Http11EvictionPredicate implements BiPredicate { + final long maxIdleTime; + final long maxLifeTime; + + Http11EvictionPredicate(long maxIdleTime, long maxLifeTime) { + this.maxIdleTime = maxIdleTime; + this.maxLifeTime = maxLifeTime; + } + + @Override + public boolean test(Connection connection, ConnectionMetadata meta) { + ConnectionObserver owner = connection.channel().attr(OWNER).get(); + boolean isNotHttp2 = true; + if (owner instanceof DisposableAcquire) { + Http2Pool.Http2PooledRef http2PooledRef = http2PooledRef(((DisposableAcquire) owner).pooledRef); + ChannelHandlerContext frameCodec = http2PooledRef.slot.http2FrameCodecCtx(); + isNotHttp2 = frameCodec == null; + } + return (isNotHttp2 && maxIdleTime != -1 && meta.idleTime() >= maxIdleTime) + || (maxLifeTime != -1 && meta.lifeTime() >= maxLifeTime); + } + } + + static final class Http2EvictionPredicate implements BiPredicate { + final long maxLifeTime; + + Http2EvictionPredicate(long maxLifeTime) { + this.maxLifeTime = maxLifeTime; + } + + @Override + public boolean test(Connection connection, PooledRefMetadata meta) { + return maxLifeTime != -1 && meta.lifeTime() >= maxLifeTime; + } + } } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index fa7af532d8..418f5d8569 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.BiPredicate; import java.util.function.Function; import io.netty.channel.Channel; @@ -157,12 +158,19 @@ class Http2Pool implements InstrumentedPool, InstrumentedPool.PoolMe final Long maxConcurrentStreams; final int minConnections; final PoolConfig poolConfig; + final BiPredicate evictionPredicate; + final long maxIdleTime; long lastInteractionTimestamp; Disposable evictionTask; Http2Pool(PoolConfig poolConfig, @Nullable ConnectionProvider.AllocationStrategy allocationStrategy) { + this(poolConfig, allocationStrategy, null, -1); + } + + Http2Pool(PoolConfig poolConfig, @Nullable ConnectionProvider.AllocationStrategy allocationStrategy, + @Nullable BiPredicate evictionPredicate, long maxIdleTime) { this.clock = poolConfig.clock(); this.connections = new ConcurrentLinkedQueue<>(); this.lastInteractionTimestamp = clock.millis(); @@ -171,6 +179,8 @@ class Http2Pool implements InstrumentedPool, InstrumentedPool.PoolMe this.minConnections = allocationStrategy == null ? 0 : allocationStrategy.permitMinimum(); this.pending = new ConcurrentLinkedDeque<>(); this.poolConfig = poolConfig; + this.evictionPredicate = evictionPredicate; + this.maxIdleTime = maxIdleTime; recordInteractionTimestamp(); scheduleEviction(); @@ -614,7 +624,9 @@ Slot findConnection(ConcurrentLinkedQueue resources) { } boolean testEvictionPredicate(Slot slot) { - return poolConfig.evictionPredicate().test(slot.connection, slot); + return evictionPredicate == null ? + poolConfig.evictionPredicate().test(slot.connection, slot) : + evictionPredicate.test(slot.connection, slot); } static void pendingAcquireLimitReached(Borrower borrower, int maxPending) { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java index 4a6fc5dc2f..c1b2c0f666 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java @@ -65,10 +65,13 @@ import reactor.netty.channel.AbstractChannelMetricsHandler; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.channel.ChannelOperations; +import reactor.netty.http.Http2ConnectionLiveness; import reactor.netty.http.Http2SettingsSpec; import reactor.netty.http.Http3SettingsSpec; +import reactor.netty.http.HttpConnectionLiveness; import reactor.netty.http.HttpProtocol; import reactor.netty.http.HttpResources; +import reactor.netty.http.IdleTimeoutHandler; import reactor.netty.http.logging.HttpMessageLogFactory; import reactor.netty.http.logging.ReactorNettyHttpMessageLogFactory; import reactor.netty.http.server.compression.HttpCompressionOptionsSpec; @@ -728,7 +731,10 @@ static void configureH2Pipeline(ChannelPipeline p, cookieDecoder, cookieEncoder, errorLogEnabled, errorLog, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle, methodTagValue, metricsRecorder, minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue))); - IdleTimeoutHandler.addIdleTimeoutHandler(p, idleTimeout); + IdleTimeoutHandler.addIdleTimeoutHandler(p, idleTimeout, + http2SettingsSpec != null && http2SettingsSpec.pingAckTimeout() != null ? + new Http2ConnectionLiveness(http2FrameCodec, http2SettingsSpec.pingAckDropThreshold(), http2SettingsSpec.pingAckTimeout().toNanos()) : + HttpConnectionLiveness.CLOSE); if (metricsRecorder != null) { if (metricsRecorder instanceof MicrometerHttpServerMetricsRecorder) { @@ -780,10 +786,10 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, Http11OrH2CleartextCodec upgrader = new Http11OrH2CleartextCodec(accessLogEnabled, accessLog, compressionOptions, compressPredicate, cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, enableGracefulShutdown, - errorLogEnabled, errorLog, formDecoderProvider, forwardedHeaderHandler, http2SettingsSpec, httpMessageLogFactory, listener, mapHandle, + errorLogEnabled, errorLog, formDecoderProvider, forwardedHeaderHandler, idleTimeout, http2SettingsSpec, httpMessageLogFactory, listener, mapHandle, methodTagValue, metricsRecorder, minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue, decoder.validateHeaders()); - ChannelHandler http2ServerHandler = new H2CleartextCodec(upgrader, http2SettingsSpec != null ? http2SettingsSpec.maxStreams() : null); + ChannelHandler http2ServerHandler = new H2CleartextCodec(upgrader, http2SettingsSpec != null ? http2SettingsSpec.maxStreams() : null, idleTimeout, http2SettingsSpec); HttpServerUpgradeHandler httpServerUpgradeHandler = readTimeout == null && requestTimeout == null ? new HttpServerUpgradeHandler(httpServerCodec, upgrader, decoder.h2cMaxContentLength()) : @@ -999,12 +1005,14 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter { final boolean addHttp2FrameCodec; final boolean removeMetricsHandler; final Long maxStreams; + final Duration idleTimeout; + final Http2SettingsSpec http2SettingsSpec; /** * Used when full H2 preface is received. */ - H2CleartextCodec(Http11OrH2CleartextCodec upgrader, @Nullable Long maxStreams) { - this(upgrader, true, true, maxStreams); + H2CleartextCodec(Http11OrH2CleartextCodec upgrader, @Nullable Long maxStreams, @Nullable Duration idleTimeout, @Nullable Http2SettingsSpec http2SettingsSpec) { + this(upgrader, true, true, maxStreams, idleTimeout, http2SettingsSpec); } /** @@ -1012,11 +1020,13 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter { * is added by {@link Http2ServerUpgradeCodec} */ H2CleartextCodec(Http11OrH2CleartextCodec upgrader, boolean addHttp2FrameCodec, boolean removeMetricsHandler, - @Nullable Long maxStreams) { + @Nullable Long maxStreams, @Nullable Duration idleTimeout, @Nullable Http2SettingsSpec http2SettingsSpec) { this.upgrader = upgrader; this.addHttp2FrameCodec = addHttp2FrameCodec; this.removeMetricsHandler = removeMetricsHandler; this.maxStreams = maxStreams; + this.idleTimeout = idleTimeout; + this.http2SettingsSpec = http2SettingsSpec; } @Override @@ -1053,6 +1063,14 @@ public void handlerAdded(ChannelHandlerContext ctx) { } pipeline.remove(NettyPipeline.HttpTrafficHandler); pipeline.remove(NettyPipeline.ReactiveBridge); + + if (idleTimeout != null) { + IdleTimeoutHandler.removeIdleTimeoutHandler(pipeline); + IdleTimeoutHandler.addIdleTimeoutHandler(pipeline, idleTimeout, + http2SettingsSpec != null && http2SettingsSpec.pingAckTimeout() != null ? + new Http2ConnectionLiveness(upgrader.http2FrameCodec, http2SettingsSpec.pingAckDropThreshold(), http2SettingsSpec.pingAckTimeout().toNanos()) : + HttpConnectionLiveness.CLOSE); + } } } @@ -1162,6 +1180,8 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer final Duration readTimeout; final Duration requestTimeout; final Function uriTagValue; + final Duration idleTimeout; + final Http2SettingsSpec http2SettingsSpec; Http11OrH2CleartextCodec( boolean accessLogEnabled, @@ -1176,6 +1196,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer @Nullable Function errorLog, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, + @Nullable Duration idleTimeout, @Nullable Http2SettingsSpec http2SettingsSpec, HttpMessageLogFactory httpMessageLogFactory, ConnectionObserver listener, @@ -1242,6 +1263,8 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer this.readTimeout = readTimeout; this.requestTimeout = requestTimeout; this.uriTagValue = uriTagValue; + this.idleTimeout = idleTimeout; + this.http2SettingsSpec = http2SettingsSpec; } /** @@ -1259,7 +1282,7 @@ protected void initChannel(Channel ch) { @Nullable public HttpServerUpgradeHandler.UpgradeCodec newUpgradeCodec(CharSequence protocol) { if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) { - return new Http2ServerUpgradeCodec(http2FrameCodec, new H2CleartextCodec(this, false, false, maxStreams)); + return new Http2ServerUpgradeCodec(http2FrameCodec, new H2CleartextCodec(this, false, false, maxStreams, idleTimeout, http2SettingsSpec)); } else { return null; @@ -1379,7 +1402,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { // When the server is configured with HTTP/1.1 and H2 and HTTP/1.1 is negotiated, // when channelActive event happens, this HttpTrafficHandler is still not in the pipeline, // and will not be able to add IdleTimeoutHandler. So in this use case add IdleTimeoutHandler here. - IdleTimeoutHandler.addIdleTimeoutHandler(ctx.pipeline(), idleTimeout); + IdleTimeoutHandler.addIdleTimeoutHandler(ctx.pipeline(), idleTimeout, HttpConnectionLiveness.CLOSE); return; } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java index 7079ac2ea2..dad5020201 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java @@ -51,6 +51,8 @@ import reactor.netty.ConnectionObserver; import reactor.netty.ReactorNetty; import reactor.netty.channel.ChannelOperations; +import reactor.netty.http.HttpConnectionLiveness; +import reactor.netty.http.IdleTimeoutHandler; import reactor.netty.http.logging.HttpMessageArgProviderFactory; import reactor.netty.http.logging.HttpMessageLogFactory; import reactor.netty.http.server.compression.HttpCompressionOptionsSpec; @@ -155,7 +157,7 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception { @Override public void channelActive(ChannelHandlerContext ctx) { - IdleTimeoutHandler.addIdleTimeoutHandler(ctx.pipeline(), idleTimeout); + IdleTimeoutHandler.addIdleTimeoutHandler(ctx.pipeline(), idleTimeout, HttpConnectionLiveness.CLOSE); ctx.fireChannelActive(); } @@ -546,7 +548,7 @@ void handleLastHttpContent(Object msg, ChannelPromise promise) { ctx.executor().execute(this); } else { - IdleTimeoutHandler.addIdleTimeoutHandler(ctx.pipeline(), idleTimeout); + IdleTimeoutHandler.addIdleTimeoutHandler(ctx.pipeline(), idleTimeout, HttpConnectionLiveness.CLOSE); ctx.read(); } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/IdleTimeoutHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/IdleTimeoutHandler.java deleted file mode 100644 index d31f6310b8..0000000000 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/IdleTimeoutHandler.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty.http.server; - -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPipeline; -import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.codec.http.HttpServerUpgradeHandler; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; -import io.netty.handler.timeout.IdleStateHandler; -import reactor.netty.NettyPipeline; -import reactor.util.annotation.Nullable; - -import java.time.Duration; -import java.util.concurrent.TimeUnit; - -import static reactor.netty.ReactorNetty.format; - -final class IdleTimeoutHandler extends IdleStateHandler { - - IdleTimeoutHandler(long idleTimeout) { - super(idleTimeout, 0, 0, TimeUnit.MILLISECONDS); - } - - @Override - @SuppressWarnings("FutureReturnValueIgnored") - protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) { - if (evt.state() == IdleState.READER_IDLE) { - if (HttpServerOperations.log.isDebugEnabled()) { - HttpServerOperations.log.debug(format(ctx.channel(), - "Connection was idle for [{}ms], as per configuration the connection will be closed."), - getReaderIdleTimeInMillis()); - } - // FutureReturnValueIgnored is deliberate - ctx.close(); - } - ctx.fireUserEventTriggered(evt); - } - - static void addIdleTimeoutHandler(ChannelPipeline pipeline, @Nullable Duration idleTimeout) { - if (idleTimeout != null && pipeline.get(NettyPipeline.IdleTimeoutHandler) == null) { - String baseName = null; - if (pipeline.get(NettyPipeline.HttpCodec) != null) { - baseName = NettyPipeline.HttpCodec; - } - else { - ChannelHandler httpServerUpgradeHandler = pipeline.get(HttpServerUpgradeHandler.class); - if (httpServerUpgradeHandler != null) { - baseName = pipeline.context(httpServerUpgradeHandler).name(); - } - else { - ChannelHandler httpServerCodec = pipeline.get(HttpServerCodec.class); - if (httpServerCodec != null) { - baseName = pipeline.context(httpServerCodec).name(); - } - } - } - - pipeline.addAfter(baseName, - NettyPipeline.IdleTimeoutHandler, - new IdleTimeoutHandler(idleTimeout.toMillis())); - } - } - - static void removeIdleTimeoutHandler(ChannelPipeline pipeline) { - if (pipeline.get(NettyPipeline.IdleTimeoutHandler) != null) { - pipeline.remove(NettyPipeline.IdleTimeoutHandler); - } - } -} diff --git a/reactor-netty-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty-http/reflect-config.json b/reactor-netty-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty-http/reflect-config.json index 65c4c74cdd..b4a26c7a4e 100644 --- a/reactor-netty-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty-http/reflect-config.json +++ b/reactor-netty-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty-http/reflect-config.json @@ -260,9 +260,9 @@ }, { "condition": { - "typeReachable": "reactor.netty.http.server.IdleTimeoutHandler" + "typeReachable": "reactor.netty.http.IdleTimeoutHandler" }, - "name": "reactor.netty.http.server.IdleTimeoutHandler", + "name": "reactor.netty.http.IdleTimeoutHandler", "queryAllPublicMethods": true }, { diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/Http2ConnectionLivenessTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/Http2ConnectionLivenessTest.java new file mode 100644 index 0000000000..eb1db550f9 --- /dev/null +++ b/reactor-netty-http/src/test/java/reactor/netty/http/Http2ConnectionLivenessTest.java @@ -0,0 +1,432 @@ +/* + * Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http2.DefaultHttp2PingFrame; +import io.netty.handler.codec.http2.Http2FrameCodec; +import io.netty.handler.codec.http2.Http2PingFrame; +import io.netty.handler.codec.http2.Http2StreamChannel; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.handler.ssl.util.SelfSignedCertificate; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Named; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import reactor.core.publisher.Mono; +import reactor.function.Consumer3; +import reactor.netty.BaseHttpTest; +import reactor.netty.NettyPipeline; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.server.HttpServer; +import reactor.netty.resources.ConnectionProvider; +import reactor.netty.tcp.SslProvider; +import reactor.test.StepVerifier; +import reactor.util.annotation.Nullable; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static io.netty.handler.codec.http.HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * This test class verifies {@link HttpConnectionLiveness} with server side. + * + * @author raccoonback + * @author Violeta Georgieva + * @since 1.2.12 + */ +class Http2ConnectionLivenessTest extends BaseHttpTest { + + static Http2SslContextSpec clientCtx2; + static Http2SslContextSpec serverCtx2; + static SelfSignedCertificate ssc; + + @BeforeAll + static void createSelfSignedCertificate() throws Exception { + ssc = new SelfSignedCertificate(); + clientCtx2 = Http2SslContextSpec.forClient() + .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE)); + serverCtx2 = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()); + } + + @ParameterizedTest + @MethodSource("http2CompatibleProtocols") + void serverPingAckFrameWithinThreshold(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, + @Nullable SslProvider.ProtocolSslContextSpec serverCtx, @Nullable SslProvider.ProtocolSslContextSpec clientCtx) throws Exception { + Http2PingFrameHandler clientHandler = new Http2PingFrameHandler((ctx, frame, receivedPingTimes) -> { + if (receivedPingTimes.size() % 2 == 0) { + ctx.writeAndFlush(new DefaultHttp2PingFrame(frame.content(), true)) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + } + }); + testServer(clientCtx, clientHandler, clientProtocols, 2, serverCtx, + builder -> builder.pingAckTimeout(Duration.ofMillis(100)) + .pingAckDropThreshold(2), + serverProtocols, "serverPingAckFrameWithinThreshold", Duration.ofMillis(600)); + } + + @ParameterizedTest + @MethodSource("http2CompatibleProtocols") + void clientPingAckFrameWithinThreshold(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, + @Nullable SslProvider.ProtocolSslContextSpec serverCtx, @Nullable SslProvider.ProtocolSslContextSpec clientCtx) throws Exception { + Http2PingFrameHandler serverHandler = new Http2PingFrameHandler((ctx, frame, receivedPingTimes) -> { + if (receivedPingTimes.size() % 2 == 0) { + ctx.writeAndFlush(new DefaultHttp2PingFrame(frame.content(), true)) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + } + }); + testClient(clientCtx, + builder -> builder.pingAckTimeout(Duration.ofMillis(100)) + .pingAckDropThreshold(2), + clientProtocols, Duration.ZERO, serverCtx, serverHandler, serverProtocols, 2, + "clientPingAckFrameWithinThreshold", Duration.ofMillis(600)); + } + + @ParameterizedTest + @MethodSource("http2CompatibleProtocols") + void serverAckPingFrameWithinTimeout(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, + @Nullable SslProvider.ProtocolSslContextSpec serverCtx, @Nullable SslProvider.ProtocolSslContextSpec clientCtx) throws Exception { + testServer(clientCtx, new Http2PingFrameHandler(), clientProtocols, 1, serverCtx, + builder -> builder.pingAckTimeout(Duration.ofMillis(100)), + serverProtocols, "serverAckPingFrameWithinTimeout", Duration.ofMillis(500)); + } + + @ParameterizedTest + @MethodSource("http2CompatibleProtocols") + void clientAckPingFrameWithinTimeout(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, + @Nullable SslProvider.ProtocolSslContextSpec serverCtx, @Nullable SslProvider.ProtocolSslContextSpec clientCtx) throws Exception { + testClient(clientCtx, builder -> builder.pingAckTimeout(Duration.ofMillis(100)), + clientProtocols, Duration.ZERO, serverCtx, new Http2PingFrameHandler(), serverProtocols, 1, + "clientAckPingFrameWithinTimeout", Duration.ofMillis(500)); + } + + @ParameterizedTest + @MethodSource("http2CompatibleProtocols") + void serverCloseConnectionIfPingFrameDelayed(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, + @Nullable SslProvider.ProtocolSslContextSpec serverCtx, @Nullable SslProvider.ProtocolSslContextSpec clientCtx) throws Exception { + Http2PingFrameHandler clientHandler = new Http2PingFrameHandler((ctx, frame, receivedPingTimes) -> {}); + testServer(clientCtx, clientHandler, clientProtocols, 1, serverCtx, + builder -> builder.pingAckTimeout(Duration.ofMillis(100)), + serverProtocols, "serverCloseConnectionIfPingFrameDelayed", null); + + clientHandler = new Http2PingFrameHandler((ctx, frame, receivedPingTimes) -> {}); + testServer(clientCtx, clientHandler, clientProtocols, 2, serverCtx, + builder -> builder.pingAckTimeout(Duration.ofMillis(100)) + .pingAckDropThreshold(2), + serverProtocols, "serverCloseConnectionIfPingFrameDelayed", null); + } + + @ParameterizedTest + @MethodSource("http2CompatibleProtocols") + void clientCloseConnectionIfPingFrameDelayed(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, + @Nullable SslProvider.ProtocolSslContextSpec serverCtx, @Nullable SslProvider.ProtocolSslContextSpec clientCtx) throws Exception { + Http2PingFrameHandler serverHandler = new Http2PingFrameHandler((ctx, frame, receivedPingTimes) -> {}); + testClient(clientCtx, builder -> builder.pingAckTimeout(Duration.ofMillis(100)), + clientProtocols, Duration.ZERO, serverCtx, serverHandler, serverProtocols, 1, + "clientCloseConnectionIfPingFrameDelayed", null); + + serverHandler = new Http2PingFrameHandler((ctx, frame, receivedPingTimes) -> {}); + testClient(clientCtx, + builder -> builder.pingAckTimeout(Duration.ofMillis(100)) + .pingAckDropThreshold(2), + clientProtocols, Duration.ZERO, serverCtx, serverHandler, serverProtocols, 2, + "clientCloseConnectionIfPingFrameDelayed", null); + } + + @ParameterizedTest + @MethodSource("http2CompatibleProtocols") + void serverCloseConnectionWithoutPingCheckWhenNotConfigured(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, + @Nullable SslProvider.ProtocolSslContextSpec serverCtx, @Nullable SslProvider.ProtocolSslContextSpec clientCtx) throws Exception { + testServer(clientCtx, new Http2PingFrameHandler(), clientProtocols, 0, serverCtx, null, + serverProtocols, "serverCloseConnectionWithoutPingCheckWhenNotConfigured", null); + } + + @ParameterizedTest + @MethodSource("http2CompatibleProtocols") + void clientCloseConnectionWithoutPingCheckWhenNotConfigured(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, + @Nullable SslProvider.ProtocolSslContextSpec serverCtx, @Nullable SslProvider.ProtocolSslContextSpec clientCtx) throws Exception { + testClient(clientCtx, null, + clientProtocols, Duration.ofMillis(300), serverCtx, new Http2PingFrameHandler(), serverProtocols, 0, + "clientCloseConnectionWithoutPingCheckWhenNotConfigured", null); + } + + @SuppressWarnings("deprecation") + void testClient( + @Nullable SslProvider.ProtocolSslContextSpec clientCtx, + @Nullable Consumer clientHttp2Settings, + HttpProtocol[] clientProtocols, + Duration evictionInBackground, + @Nullable SslProvider.ProtocolSslContextSpec serverCtx, + Http2PingFrameHandler serverHandler, + HttpProtocol[] serverProtocols, + int serverReceivedPingTimes, + String serverResponse, + @Nullable Duration waitTime) throws Exception { + disposableServer = + (serverCtx == null ? createServer() : createServer().secure(spec -> spec.sslContext(serverCtx))) + .protocol(serverProtocols) + .doOnConnection(conn -> { + Channel channel = conn.channel() instanceof Http2StreamChannel ? conn.channel().parent() : conn.channel(); + Http2FrameCodec http2FrameCodec = channel.pipeline().get(Http2FrameCodec.class); + if (http2FrameCodec != null) { + if (serverHandler.consumer != null) { + setValueReflection(http2FrameCodec.decoder(), true); + } + channel.pipeline().addLast(serverHandler); + } + }) + .handle((req, resp) -> resp.sendString(Mono.just(serverResponse))) + .bindNow(); + + ConnectionProvider provider = + ConnectionProvider.builder("testClient") + .maxIdleTime(Duration.ofMillis(300)) + .evictInBackground(evictionInBackground) + .build(); + try { + HttpClient client = (clientCtx == null ? + createClient(provider, disposableServer::address) : + createClient(provider, disposableServer::address).secure(spec -> spec.sslContext(clientCtx))); + + if (clientHttp2Settings != null) { + client = client.http2Settings(clientHttp2Settings); + } + + AtomicReference connectedClientChannel = new AtomicReference<>(); + CountDownLatch connectedClientChannelClosed = new CountDownLatch(1); + client.protocol(clientProtocols) + .doOnResponse((req, conn) -> { + if (waitTime != null) { + connectedClientChannel.set(conn.channel()); + } + else { + conn.channel().parent().closeFuture() + .addListener(f -> connectedClientChannelClosed.countDown()); + } + }) + .get() + .uri("/") + .responseContent() + .aggregate() + .asString() + .as(StepVerifier::create) + .expectNext(serverResponse) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + if (waitTime != null) { + Mono.delay(waitTime) + .block(); + + assertThat(connectedClientChannel.get()).isNotNull(); + assertThat(connectedClientChannel.get().parent().isOpen()).isEqualTo(true); + } + else { + assertThat(connectedClientChannelClosed.await(5, TimeUnit.SECONDS)).isTrue(); + } + + assertThat(serverHandler.getReceivedPingTimes()).hasSize(serverReceivedPingTimes); + } + finally { + provider.disposeLater() + .block(Duration.ofSeconds(5)); + } + } + + @SuppressWarnings("deprecation") + void testServer( + @Nullable SslProvider.ProtocolSslContextSpec clientCtx, + Http2PingFrameHandler clientHandler, + HttpProtocol[] clientProtocols, + int clientReceivedPingTimes, + @Nullable SslProvider.ProtocolSslContextSpec serverCtx, + @Nullable Consumer serverHttp2Settings, + HttpProtocol[] serverProtocols, + String serverResponse, + @Nullable Duration waitTime) throws Exception { + AtomicReference connectedServerChannel = new AtomicReference<>(); + CountDownLatch connectedServerChannelClosed = new CountDownLatch(1); + HttpServer server = + (serverCtx == null ? createServer() : createServer().secure(spec -> spec.sslContext(serverCtx))) + .protocol(serverProtocols) + .doOnConnection(conn -> { + if (waitTime != null) { + connectedServerChannel.set(conn.channel()); + } + else { + conn.channel().parent().closeFuture() + .addListener(f -> connectedServerChannelClosed.countDown()); + } + }) + .idleTimeout(Duration.ofMillis(300)); + if (serverHttp2Settings != null) { + server = server.http2Settings(serverHttp2Settings); + } + disposableServer = + server.handle((req, resp) -> resp.sendString(Mono.just(serverResponse))) + .bindNow(); + + (clientCtx == null ? createClient(disposableServer::address) : createClient(disposableServer::address).secure(spec -> spec.sslContext(clientCtx))) + .protocol(clientProtocols) + .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { + Http2FrameCodec http2FrameCodec = channel.pipeline().get(Http2FrameCodec.class); + if (http2FrameCodec != null) { + if (clientHandler.consumer != null) { + setValueReflection(http2FrameCodec.decoder(), false); + } + channel.pipeline().addLast(clientHandler); + } + else if (channel.pipeline().get(NettyPipeline.H2OrHttp11Codec) != null) { + channel.pipeline().addAfter(NettyPipeline.H2OrHttp11Codec, "test", new ChannelInboundHandlerAdapter() { + + @Override + public void channelActive(ChannelHandlerContext ctx) { + ctx.fireChannelActive(); + Http2FrameCodec http2FrameCodec = channel.pipeline().get(Http2FrameCodec.class); + if (http2FrameCodec != null) { + if (clientHandler.consumer != null) { + setValueReflection(http2FrameCodec.decoder(), false); + } + channel.pipeline().addLast(clientHandler); + } + ctx.channel().pipeline().remove(this); + } + }); + } + else if (channel.pipeline().get(NettyPipeline.H2CUpgradeHandler) != null) { + channel.pipeline().addAfter(NettyPipeline.H2CUpgradeHandler, "test", new ChannelInboundHandlerAdapter() { + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + ctx.fireUserEventTriggered(evt); + if (evt == UPGRADE_SUCCESSFUL) { + Http2FrameCodec http2FrameCodec = channel.pipeline().get(Http2FrameCodec.class); + if (http2FrameCodec != null) { + if (clientHandler.consumer != null) { + setValueReflection(http2FrameCodec.decoder(), false); + } + channel.pipeline().addLast(clientHandler); + } + ctx.channel().pipeline().remove(this); + } + } + }); + } + }) + .get() + .uri("/") + .responseContent() + .aggregate() + .asString() + .as(StepVerifier::create) + .expectNext(serverResponse) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + if (waitTime != null) { + Mono.delay(waitTime) + .block(); + + assertThat(connectedServerChannel.get()).isNotNull(); + assertThat(connectedServerChannel.get().parent().isOpen()).isEqualTo(true); + } + else { + assertThat(connectedServerChannelClosed.await(5, TimeUnit.SECONDS)).isTrue(); + } + + assertThat(clientHandler.getReceivedPingTimes()).hasSize(clientReceivedPingTimes); + } + + static void setValueReflection(Object obj, boolean onServer) { + try { + Field delegate = obj.getClass().getSuperclass().getDeclaredField("delegate"); + delegate.setAccessible(true); + Object delegateObj = delegate.get(obj); + if (onServer) { + delegate = delegateObj.getClass().getSuperclass().getDeclaredField("delegate"); + delegate.setAccessible(true); + delegateObj = delegate.get(delegateObj); + } + Field field = delegateObj.getClass().getDeclaredField("autoAckPing"); + field.setAccessible(true); + field.setBoolean(delegateObj, false); + } + catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + static Stream http2CompatibleProtocols() { + return Stream.of( + Arguments.of(new HttpProtocol[]{HttpProtocol.H2}, new HttpProtocol[]{HttpProtocol.H2}, + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2}, new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2}, + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2C}, new HttpProtocol[]{HttpProtocol.H2C}, null, null), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C}, null, null), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, null, null) + ); + } + + private static final class Http2PingFrameHandler extends SimpleChannelInboundHandler { + + private final List receivedPingTimes = new ArrayList<>(); + + private final Consumer3> consumer; + + private Http2PingFrameHandler() { + this(null); + } + + private Http2PingFrameHandler(@Nullable Consumer3> consumer) { + this.consumer = consumer; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Http2PingFrame frame) { + receivedPingTimes.add(LocalDateTime.now(ZoneId.systemDefault())); + if (consumer != null) { + consumer.accept(ctx, frame, receivedPingTimes); + } + } + + List getReceivedPingTimes() { + return receivedPingTimes.stream() + .sorted() + .collect(Collectors.toList()); + } + } +} diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/Http2SettingsSpecTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/Http2SettingsSpecTests.java index 43aaa62923..b93d36e739 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/Http2SettingsSpecTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/Http2SettingsSpecTests.java @@ -19,6 +19,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.time.Duration; + import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -46,6 +48,8 @@ public void connectProtocolEnabled() { assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); assertThat(spec.maxStreams()).isNull(); + assertThat(spec.pingAckDropThreshold()).isEqualTo(1); + assertThat(spec.pingAckTimeout()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -64,6 +68,8 @@ void headerTableSize() { assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); assertThat(spec.maxStreams()).isNull(); + assertThat(spec.pingAckDropThreshold()).isEqualTo(1); + assertThat(spec.pingAckTimeout()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -89,6 +95,8 @@ void initialWindowSize() { assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); assertThat(spec.maxStreams()).isNull(); + assertThat(spec.pingAckDropThreshold()).isEqualTo(1); + assertThat(spec.pingAckTimeout()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -114,6 +122,8 @@ void maxConcurrentStreams() { assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); assertThat(spec.maxStreams()).isNull(); + assertThat(spec.pingAckDropThreshold()).isEqualTo(1); + assertThat(spec.pingAckTimeout()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -191,6 +201,8 @@ void maxFrameSize() { assertThat(spec.maxFrameSize()).isEqualTo(16384); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); assertThat(spec.maxStreams()).isNull(); + assertThat(spec.pingAckDropThreshold()).isEqualTo(1); + assertThat(spec.pingAckTimeout()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -216,6 +228,8 @@ void maxHeaderListSize() { assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(123); assertThat(spec.maxStreams()).isNull(); + assertThat(spec.pingAckDropThreshold()).isEqualTo(1); + assertThat(spec.pingAckTimeout()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -241,6 +255,8 @@ public void maxStreamsNoMaxConcurrentStreams() { assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); assertThat(spec.maxConcurrentStreams()).isEqualTo(123); + assertThat(spec.pingAckDropThreshold()).isEqualTo(1); + assertThat(spec.pingAckTimeout()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -259,6 +275,8 @@ public void maxStreamsWithMaxConcurrentStreams_1() { assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); assertThat(spec.maxStreams()).isEqualTo(123); + assertThat(spec.pingAckDropThreshold()).isEqualTo(1); + assertThat(spec.pingAckTimeout()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -277,6 +295,8 @@ public void maxStreamsWithMaxConcurrentStreams_2() { assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); assertThat(spec.maxStreams()).isEqualTo(456); + assertThat(spec.pingAckDropThreshold()).isEqualTo(1); + assertThat(spec.pingAckTimeout()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -287,6 +307,51 @@ public void maxStreamsBadValues() { .withMessageContaining("maxStreams must be positive"); } + @Test + void pingAckDropThreshold() { + builder.pingAckDropThreshold(1); + Http2SettingsSpec spec = builder.build(); + assertThat(spec.connectProtocolEnabled()).isNull(); + assertThat(spec.headerTableSize()).isNull(); + assertThat(spec.initialWindowSize()).isNull(); + assertThat(spec.maxConcurrentStreams()).isNull(); + assertThat(spec.maxFrameSize()).isNull(); + assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.maxStreams()).isNull(); + assertThat(spec.pingAckDropThreshold()).isEqualTo(1); + assertThat(spec.pingAckTimeout()).isNull(); + assertThat(spec.pushEnabled()).isNull(); + } + + @Test + public void pingAckDropThresholdBadValues() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> builder.pingAckDropThreshold(-1)) + .withMessageContaining("pingAckDropThreshold must be positive"); + } + + @Test + void pingAckTimeout() { + builder.pingAckTimeout(Duration.ofMillis(100)); + Http2SettingsSpec spec = builder.build(); + assertThat(spec.connectProtocolEnabled()).isNull(); + assertThat(spec.headerTableSize()).isNull(); + assertThat(spec.initialWindowSize()).isNull(); + assertThat(spec.maxConcurrentStreams()).isNull(); + assertThat(spec.maxFrameSize()).isNull(); + assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.maxStreams()).isNull(); + assertThat(spec.pingAckDropThreshold()).isEqualTo(1); + assertThat(spec.pingAckTimeout()).isEqualTo(Duration.ofMillis(100)); + assertThat(spec.pushEnabled()).isNull(); + } + + @Test + public void pingAckTimeoutBadValues() { + assertThatExceptionOfType(NullPointerException.class) + .isThrownBy(() -> builder.pingAckTimeout(null)); + } + /* @Test public void pushEnabled() { diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java new file mode 100644 index 0000000000..50ca729b28 --- /dev/null +++ b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.server; + +import io.netty.channel.Channel; +import io.netty.handler.codec.http2.Http2StreamChannel; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.handler.ssl.util.SelfSignedCertificate; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Named; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import reactor.core.publisher.Mono; +import reactor.netty.BaseHttpTest; +import reactor.netty.http.Http11SslContextSpec; +import reactor.netty.http.Http2SslContextSpec; +import reactor.netty.http.HttpProtocol; +import reactor.netty.tcp.SslProvider; +import reactor.test.StepVerifier; +import reactor.util.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +class HttpIdleTimeoutTest extends BaseHttpTest { + + static Http2SslContextSpec clientCtx2; + static Http11SslContextSpec clientCtx11; + static Http2SslContextSpec serverCtx2; + static Http11SslContextSpec serverCtx11; + static SelfSignedCertificate ssc; + + @BeforeAll + static void createSelfSignedCertificate() throws Exception { + ssc = new SelfSignedCertificate(); + clientCtx2 = Http2SslContextSpec.forClient() + .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE)); + clientCtx11 = Http11SslContextSpec.forClient() + .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE)); + serverCtx2 = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()); + serverCtx11 = Http11SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()); + } + + @ParameterizedTest + @MethodSource("httpCompatibleProtocols") + @SuppressWarnings("deprecation") + void closedAfterIdleTimeout(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, + @Nullable SslProvider.ProtocolSslContextSpec serverCtx, @Nullable SslProvider.ProtocolSslContextSpec clientCtx) { + AtomicReference channel = new AtomicReference<>(); + disposableServer = + (serverCtx == null ? createServer() : createServer().secure(spec -> spec.sslContext(serverCtx))) + .protocol(serverProtocols) + .idleTimeout(Duration.ofMillis(100)) + .handle((req, resp) -> + resp.withConnection(conn -> + channel.set(conn.channel() instanceof Http2StreamChannel ? conn.channel().parent() : conn.channel())) + .sendString(Mono.just("closedAfterIdleTimeout"))) + .bindNow(); + + (clientCtx == null ? createClient(disposableServer::address) : createClient(disposableServer::address).secure(spec -> spec.sslContext(clientCtx))) + .protocol(clientProtocols) + .get() + .uri("/") + .responseContent() + .aggregate() + .asString() + .as(StepVerifier::create) + .expectNext("closedAfterIdleTimeout") + .expectComplete() + .verify(Duration.ofSeconds(1)); + + Mono.delay(Duration.ofMillis(200)) + .block(); + + assertThat(channel.get()).isNotNull(); + assertThat(channel.get().isOpen()).isFalse(); + } + + @ParameterizedTest + @MethodSource("httpCompatibleProtocols") + @SuppressWarnings("deprecation") + void openedBeforeIdleTimeout(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, + @Nullable SslProvider.ProtocolSslContextSpec serverCtx, @Nullable SslProvider.ProtocolSslContextSpec clientCtx) { + AtomicReference channel = new AtomicReference<>(); + disposableServer = + (serverCtx == null ? createServer() : createServer().secure(spec -> spec.sslContext(serverCtx))) + .protocol(serverProtocols) + .idleTimeout(Duration.ofMillis(500)) + .handle((req, resp) -> + resp.withConnection(conn -> + channel.set(conn.channel() instanceof Http2StreamChannel ? conn.channel().parent() : conn.channel())) + .sendString(Mono.just("openedBeforeIdleTimeout"))) + .bindNow(); + + (clientCtx == null ? createClient(disposableServer::address) : createClient(disposableServer::address).secure(spec -> spec.sslContext(clientCtx))) + .protocol(clientProtocols) + .get() + .uri("/") + .responseContent() + .aggregate() + .asString() + .as(StepVerifier::create) + .expectNext("openedBeforeIdleTimeout") + .expectComplete() + .verify(Duration.ofSeconds(1)); + + Mono.delay(Duration.ofMillis(100)) + .block(); + + assertThat(channel.get()).isNotNull(); + assertThat(channel.get().isOpen()).isTrue(); + } + + static Stream httpCompatibleProtocols() { + return Stream.of( + Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null), + Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, null, null), + Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, + Named.of("Http11SslContextSpec", serverCtx11), Named.of("Http11SslContextSpec", clientCtx11)), + Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, + Named.of("Http11SslContextSpec", serverCtx11), Named.of("Http11SslContextSpec", clientCtx11)), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2}, new HttpProtocol[]{HttpProtocol.H2}, + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2}, new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http11SslContextSpec", clientCtx11)), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2}, + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2C}, new HttpProtocol[]{HttpProtocol.H2C}, null, null), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C}, null, null), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, null, null) + ); + } +} From 1aba391da2fd2e7ec6ea3c0fa9f6aae5f355a765 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva <696661+violetagg@users.noreply.github.com> Date: Fri, 24 Oct 2025 19:16:10 +0300 Subject: [PATCH 2/6] Fix checkstyle errors Signed-off-by: Violeta Georgieva <696661+violetagg@users.noreply.github.com> --- .../main/java/reactor/netty/http/Http2ConnectionLiveness.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/Http2ConnectionLiveness.java b/reactor-netty-http/src/main/java/reactor/netty/http/Http2ConnectionLiveness.java index fa726edb53..245f5deb56 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/Http2ConnectionLiveness.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/Http2ConnectionLiveness.java @@ -169,8 +169,8 @@ public void operationComplete(ChannelFuture future) { "Will wait timeout and retry based on threshold."), lastSentPingData, pingAttempts); } } - - // Schedule timeout check - whether send succeeded or failed, + + // Schedule timeout check - whether send succeeded or failed, // wait the configured timeout before retry/close decision pingScheduler = invokeNextSchedule(); } From af0d64e46dad8870a3afe51dbe8b703573687b32 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva <696661+violetagg@users.noreply.github.com> Date: Wed, 29 Oct 2025 10:06:50 +0200 Subject: [PATCH 3/6] [test] Increase the waiting time in the tests Signed-off-by: Violeta Georgieva <696661+violetagg@users.noreply.github.com> --- .../java/reactor/netty/http/Http2ConnectionLivenessTest.java | 4 ++-- .../java/reactor/netty/http/server/HttpIdleTimeoutTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/Http2ConnectionLivenessTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/Http2ConnectionLivenessTest.java index eb1db550f9..bc182e163e 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/Http2ConnectionLivenessTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/Http2ConnectionLivenessTest.java @@ -242,7 +242,7 @@ void testClient( .as(StepVerifier::create) .expectNext(serverResponse) .expectComplete() - .verify(Duration.ofSeconds(5)); + .verify(Duration.ofSeconds(10)); if (waitTime != null) { Mono.delay(waitTime) @@ -350,7 +350,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { .as(StepVerifier::create) .expectNext(serverResponse) .expectComplete() - .verify(Duration.ofSeconds(5)); + .verify(Duration.ofSeconds(10)); if (waitTime != null) { Mono.delay(waitTime) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java index 50ca729b28..ae8391c9c2 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java @@ -84,7 +84,7 @@ void closedAfterIdleTimeout(HttpProtocol[] serverProtocols, HttpProtocol[] clien .as(StepVerifier::create) .expectNext("closedAfterIdleTimeout") .expectComplete() - .verify(Duration.ofSeconds(1)); + .verify(Duration.ofSeconds(5)); Mono.delay(Duration.ofMillis(200)) .block(); From 9cfe163badcdf4ed05fef71e01052e3487cbb33a Mon Sep 17 00:00:00 2001 From: Violeta Georgieva <696661+violetagg@users.noreply.github.com> Date: Wed, 29 Oct 2025 10:59:49 +0200 Subject: [PATCH 4/6] [test] Fix flaky test Signed-off-by: Violeta Georgieva <696661+violetagg@users.noreply.github.com> --- .../netty/http/Http2ConnectionLivenessTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/Http2ConnectionLivenessTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/Http2ConnectionLivenessTest.java index bc182e163e..6592f5f571 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/Http2ConnectionLivenessTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/Http2ConnectionLivenessTest.java @@ -92,7 +92,7 @@ void serverPingAckFrameWithinThreshold(HttpProtocol[] serverProtocols, HttpProto testServer(clientCtx, clientHandler, clientProtocols, 2, serverCtx, builder -> builder.pingAckTimeout(Duration.ofMillis(100)) .pingAckDropThreshold(2), - serverProtocols, "serverPingAckFrameWithinThreshold", Duration.ofMillis(600)); + serverProtocols, "serverPingAckFrameWithinThreshold", Duration.ofMillis(550)); } @ParameterizedTest @@ -109,7 +109,7 @@ void clientPingAckFrameWithinThreshold(HttpProtocol[] serverProtocols, HttpProto builder -> builder.pingAckTimeout(Duration.ofMillis(100)) .pingAckDropThreshold(2), clientProtocols, Duration.ZERO, serverCtx, serverHandler, serverProtocols, 2, - "clientPingAckFrameWithinThreshold", Duration.ofMillis(600)); + "clientPingAckFrameWithinThreshold", Duration.ofMillis(550)); } @ParameterizedTest @@ -118,7 +118,7 @@ void serverAckPingFrameWithinTimeout(HttpProtocol[] serverProtocols, HttpProtoco @Nullable SslProvider.ProtocolSslContextSpec serverCtx, @Nullable SslProvider.ProtocolSslContextSpec clientCtx) throws Exception { testServer(clientCtx, new Http2PingFrameHandler(), clientProtocols, 1, serverCtx, builder -> builder.pingAckTimeout(Duration.ofMillis(100)), - serverProtocols, "serverAckPingFrameWithinTimeout", Duration.ofMillis(500)); + serverProtocols, "serverAckPingFrameWithinTimeout", Duration.ofMillis(450)); } @ParameterizedTest @@ -127,7 +127,7 @@ void clientAckPingFrameWithinTimeout(HttpProtocol[] serverProtocols, HttpProtoco @Nullable SslProvider.ProtocolSslContextSpec serverCtx, @Nullable SslProvider.ProtocolSslContextSpec clientCtx) throws Exception { testClient(clientCtx, builder -> builder.pingAckTimeout(Duration.ofMillis(100)), clientProtocols, Duration.ZERO, serverCtx, new Http2PingFrameHandler(), serverProtocols, 1, - "clientAckPingFrameWithinTimeout", Duration.ofMillis(500)); + "clientAckPingFrameWithinTimeout", Duration.ofMillis(450)); } @ParameterizedTest @@ -249,7 +249,7 @@ void testClient( .block(); assertThat(connectedClientChannel.get()).isNotNull(); - assertThat(connectedClientChannel.get().parent().isOpen()).isEqualTo(true); + assertThat(connectedClientChannel.get().parent().isActive()).isEqualTo(true); } else { assertThat(connectedClientChannelClosed.await(5, TimeUnit.SECONDS)).isTrue(); @@ -357,7 +357,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { .block(); assertThat(connectedServerChannel.get()).isNotNull(); - assertThat(connectedServerChannel.get().parent().isOpen()).isEqualTo(true); + assertThat(connectedServerChannel.get().parent().isActive()).isEqualTo(true); } else { assertThat(connectedServerChannelClosed.await(5, TimeUnit.SECONDS)).isTrue(); From 8c4559f180a69447b0839dba9c3c8553df8db142 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva <696661+violetagg@users.noreply.github.com> Date: Wed, 29 Oct 2025 11:32:59 +0200 Subject: [PATCH 5/6] [test] Increase the waiting time in the tests Signed-off-by: Violeta Georgieva <696661+violetagg@users.noreply.github.com> --- .../java/reactor/netty/http/server/HttpIdleTimeoutTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java index ae8391c9c2..63bc66081e 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java @@ -119,7 +119,7 @@ void openedBeforeIdleTimeout(HttpProtocol[] serverProtocols, HttpProtocol[] clie .as(StepVerifier::create) .expectNext("openedBeforeIdleTimeout") .expectComplete() - .verify(Duration.ofSeconds(1)); + .verify(Duration.ofSeconds(5)); Mono.delay(Duration.ofMillis(100)) .block(); From 5480236a864673fd5b9b7b6ecc08e78f5d1d35b9 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva <696661+violetagg@users.noreply.github.com> Date: Wed, 29 Oct 2025 13:55:15 +0200 Subject: [PATCH 6/6] [test] Increase the waiting time in the tests Signed-off-by: Violeta Georgieva <696661+violetagg@users.noreply.github.com> --- .../java/reactor/netty/http/server/HttpIdleTimeoutTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java index 63bc66081e..9100c3b9cb 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java @@ -84,7 +84,7 @@ void closedAfterIdleTimeout(HttpProtocol[] serverProtocols, HttpProtocol[] clien .as(StepVerifier::create) .expectNext("closedAfterIdleTimeout") .expectComplete() - .verify(Duration.ofSeconds(5)); + .verify(Duration.ofSeconds(10)); Mono.delay(Duration.ofMillis(200)) .block(); @@ -119,7 +119,7 @@ void openedBeforeIdleTimeout(HttpProtocol[] serverProtocols, HttpProtocol[] clie .as(StepVerifier::create) .expectNext("openedBeforeIdleTimeout") .expectComplete() - .verify(Duration.ofSeconds(5)); + .verify(Duration.ofSeconds(10)); Mono.delay(Duration.ofMillis(100)) .block();