diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 8ddf7558e1..0d2b4b950c 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -59,11 +59,9 @@ where h1_parser_config: ParserConfig::default(), h1_max_headers: None, #[cfg(feature = "server")] - h1_header_read_timeout: None, + h1_header_read_timeout: TimeoutState::default(), #[cfg(feature = "server")] - h1_header_read_timeout_fut: None, - #[cfg(feature = "server")] - h1_header_read_timeout_running: false, + h1_graceful_shutdown_first_byte_read_timeout: TimeoutState::default(), #[cfg(feature = "server")] date_header: true, #[cfg(feature = "server")] @@ -144,7 +142,14 @@ where #[cfg(feature = "server")] pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) { - self.state.h1_header_read_timeout = Some(val); + self.state.h1_header_read_timeout.timeout = Some(val); + } + + #[cfg(feature = "server")] + pub(crate) fn set_http1_graceful_shutdown_first_byte_read_timeout(&mut self, val: Duration) { + self.state + .h1_graceful_shutdown_first_byte_read_timeout + .timeout = Some(val); } #[cfg(feature = "server")] @@ -209,6 +214,19 @@ where read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE } + fn close_if_inactive(&mut self) { + // When a graceful shutdown is triggered we wait for up to some + // `Duration` to allow for the client to begin transmitting bytes to the + // server. + // If that duration has elapsed and the connection is still idle, or + // no bytes have been received on the connection, then we close it. + // This prevents inactive connections from keeping the server alive + // despite having no intention of sending a request. + if self.is_idle() || self.has_initial_read_write_state() { + self.state.close(); + } + } + pub(super) fn poll_read_head( &mut self, cx: &mut Context<'_>, @@ -217,24 +235,50 @@ where trace!("Conn::read_head"); #[cfg(feature = "server")] - if !self.state.h1_header_read_timeout_running { - if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout { + if !self.state.h1_header_read_timeout.is_running { + if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout.timeout { + self.state.h1_header_read_timeout.is_running = true; let deadline = Instant::now() + h1_header_read_timeout; - self.state.h1_header_read_timeout_running = true; - match self.state.h1_header_read_timeout_fut { + match self.state.h1_header_read_timeout.deadline_fut { Some(ref mut h1_header_read_timeout_fut) => { trace!("resetting h1 header read timeout timer"); self.state.timer.reset(h1_header_read_timeout_fut, deadline); } None => { trace!("setting h1 header read timeout timer"); - self.state.h1_header_read_timeout_fut = + self.state.h1_header_read_timeout.deadline_fut = Some(self.state.timer.sleep_until(deadline)); } } } } + #[cfg(feature = "server")] + if !self + .state + .h1_graceful_shutdown_first_byte_read_timeout + .is_running + { + if let Some(h1_graceful_shutdown_timeout) = self + .state + .h1_graceful_shutdown_first_byte_read_timeout + .timeout + { + if h1_graceful_shutdown_timeout == Duration::from_secs(0) { + self.close_if_inactive(); + } else { + self.state + .h1_graceful_shutdown_first_byte_read_timeout + .is_running = true; + + let deadline = Instant::now() + h1_graceful_shutdown_timeout; + self.state + .h1_graceful_shutdown_first_byte_read_timeout + .deadline_fut = Some(self.state.timer.sleep_until(deadline)); + } + } + } + let msg = match self.io.parse::( cx, ParseContext { @@ -254,12 +298,12 @@ where Poll::Ready(Err(e)) => return self.on_read_head_error(e), Poll::Pending => { #[cfg(feature = "server")] - if self.state.h1_header_read_timeout_running { + if self.state.h1_header_read_timeout.is_running { if let Some(ref mut h1_header_read_timeout_fut) = - self.state.h1_header_read_timeout_fut + self.state.h1_header_read_timeout.deadline_fut { if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() { - self.state.h1_header_read_timeout_running = false; + self.state.h1_header_read_timeout.is_running = false; warn!("read header from client timeout"); return Poll::Ready(Some(Err(crate::Error::new_header_timeout()))); @@ -267,14 +311,34 @@ where } } + #[cfg(feature = "server")] + if self + .state + .h1_graceful_shutdown_first_byte_read_timeout + .is_running + { + if let Some(ref mut h1_graceful_shutdown_timeout_fut) = self + .state + .h1_graceful_shutdown_first_byte_read_timeout + .deadline_fut + { + if Pin::new(h1_graceful_shutdown_timeout_fut) + .poll(cx) + .is_ready() + { + self.close_if_inactive(); + } + } + } + return Poll::Pending; } }; #[cfg(feature = "server")] { - self.state.h1_header_read_timeout_running = false; - self.state.h1_header_read_timeout_fut = None; + self.state.h1_header_read_timeout.is_running = false; + self.state.h1_header_read_timeout.deadline_fut = None; } // Note: don't deconstruct `msg` into local variables, it appears @@ -872,15 +936,15 @@ where self.state.close_write(); } + #[cfg(feature = "server")] + pub(crate) fn is_idle(&mut self) -> bool { + self.state.is_idle() + } + #[cfg(feature = "server")] pub(crate) fn disable_keep_alive(&mut self) { - if self.state.is_idle() { - trace!("disable_keep_alive; closing idle connection"); - self.state.close(); - } else { - trace!("disable_keep_alive; in-progress connection"); - self.state.disable_keep_alive(); - } + trace!("disable_keep_alive"); + self.state.disable_keep_alive(); } pub(crate) fn take_error(&mut self) -> crate::Result<()> { @@ -926,11 +990,11 @@ struct State { h1_parser_config: ParserConfig, h1_max_headers: Option, #[cfg(feature = "server")] - h1_header_read_timeout: Option, + h1_header_read_timeout: TimeoutState, + /// If a graceful shutdown is initiated, and the `TimeoutState` duration has elapsed without + /// receiving any bytes from the client, the connection will be closed. #[cfg(feature = "server")] - h1_header_read_timeout_fut: Option>>, - #[cfg(feature = "server")] - h1_header_read_timeout_running: bool, + h1_graceful_shutdown_first_byte_read_timeout: TimeoutState, #[cfg(feature = "server")] date_header: bool, #[cfg(feature = "server")] @@ -1144,6 +1208,14 @@ impl State { } } +#[derive(Default)] +#[cfg(feature = "server")] +struct TimeoutState { + timeout: Option, + deadline_fut: Option>>, + is_running: bool, +} + #[cfg(test)] mod tests { #[cfg(all(feature = "nightly", not(miri)))] diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 79ea48be9f..ccb1c43b8f 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -1,3 +1,8 @@ +use crate::rt::{Read, Write}; +use bytes::{Buf, Bytes}; +use futures_util::ready; +use http::Request; +use std::time::Duration; use std::{ error::Error as StdError, future::Future, @@ -6,11 +11,6 @@ use std::{ task::{Context, Poll}, }; -use crate::rt::{Read, Write}; -use bytes::{Buf, Bytes}; -use futures_util::ready; -use http::Request; - use super::{Http1Transaction, Wants}; use crate::body::{Body, DecodedLength, Incoming as IncomingBody}; #[cfg(feature = "client")] @@ -90,13 +90,12 @@ where #[cfg(feature = "server")] pub(crate) fn disable_keep_alive(&mut self) { self.conn.disable_keep_alive(); + } - // If keep alive has been disabled and no read or write has been seen on - // the connection yet, we must be in a state where the server is being asked to - // shut down before any data has been seen on the connection - if self.conn.is_write_closed() || self.conn.has_initial_read_write_state() { - self.close(); - } + #[cfg(feature = "server")] + pub(crate) fn set_graceful_shutdown_first_byte_read_timeout(&mut self, read_timeout: Duration) { + self.conn + .set_http1_graceful_shutdown_first_byte_read_timeout(read_timeout); } pub(crate) fn into_inner(self) -> (I, Bytes, D) { diff --git a/src/server/conn/http1.rs b/src/server/conn/http1.rs index 097497bf41..84713dca47 100644 --- a/src/server/conn/http1.rs +++ b/src/server/conn/http1.rs @@ -123,7 +123,8 @@ where B: Body + 'static, B::Error: Into>, { - /// Start a graceful shutdown process for this connection. + /// Start a graceful shutdown process for this connection, using the default + /// [`GracefulShutdownConfig`]. /// /// This `Connection` should continue to be polled until shutdown /// can finish. @@ -133,8 +134,29 @@ where /// This should only be called while the `Connection` future is still /// pending. If called after `Connection::poll` has resolved, this does /// nothing. - pub fn graceful_shutdown(mut self: Pin<&mut Self>) { + pub fn graceful_shutdown(self: Pin<&mut Self>) { + self.graceful_shutdown_with_config(GracefulShutdownConfig::default()); + } + + /// Start a graceful shutdown process for this connection. + /// + /// This `Connection` should continue to be polled until shutdown can finish. + /// + /// Requires a [`Timer`] set by [`Builder::timer`]. + /// + /// # Note + /// + /// This should only be called while the `Connection` future is still + /// pending. If called after `Connection::poll` has resolved, this does + /// nothing. + /// + /// # Panics + /// If [`GracefulShutdownConfig::first_byte_read_timeout`] was configured to greater than zero + /// nanoseconds, but no timer was set, then the `Connection` will panic when it is next polled. + pub fn graceful_shutdown_with_config(mut self: Pin<&mut Self>, config: GracefulShutdownConfig) { self.conn.disable_keep_alive(); + self.conn + .set_graceful_shutdown_first_byte_read_timeout(config.first_byte_read_timeout); } /// Return the inner IO object, and additional information. @@ -526,3 +548,73 @@ where } } } + +/// Configuration for graceful shutdowns. +/// +/// # Example +/// +/// ``` +/// # use hyper::{body::Incoming, Request, Response}; +/// # use hyper::service::Service; +/// # use hyper::server::conn::http1::Builder; +/// # use hyper::rt::{Read, Write}; +/// # use std::time::Duration; +/// # use hyper::server::conn::http1::GracefulShutdownConfig; +/// # async fn run(some_io: I, some_service: S) +/// # where +/// # I: Read + Write + Unpin + Send + 'static, +/// # S: Service, Response=hyper::Response> + Send + 'static, +/// # S::Error: Into>, +/// # S::Future: Send, +/// # { +/// let http = Builder::new(); +/// let conn = http.serve_connection(some_io, some_service); +/// +/// let mut config = GracefulShutdownConfig::default(); +/// config.first_byte_read_timeout(Duration::from_secs(2)); +/// +/// conn.graceful_shutdown_with_config(config); +/// conn.await.unwrap(); +/// # } +/// # fn main() {} +/// ``` +#[derive(Debug)] +pub struct GracefulShutdownConfig { + first_byte_read_timeout: Duration, +} +impl Default for GracefulShutdownConfig { + fn default() -> Self { + GracefulShutdownConfig { + first_byte_read_timeout: Duration::from_secs(0), + } + } +} +impl GracefulShutdownConfig { + /// It is possible for a client to open a connection and begin transmitting bytes, but have the + /// server initiate a graceful shutdown just before it sees any of the client's bytes. + /// + /// The more traffic that a server receives, the more likely this race condition is to occur for + /// some of the open connections. + /// + /// The `first_byte_read_timeout` controls how long the server waits for the first bytes of a + /// final request to be received from the client. + /// + /// If no bytes were received from the client between the time that keep alive was disabled and + /// the `first_byte_timeout` duration, the connection is considered inactive and the server will + /// close it. + /// + /// # Recommendations + /// Servers are recommended to use a `first_byte_read_timeout` that reduces the likelihood of + /// the client receiving an error due to the connection closing just after they began + /// transmitting their final request. + /// For most internet connections, a roughly one second timeout should be enough time for the + /// server to begin receiving the client's request's bytes. + /// + /// # Default + /// A default of 0 seconds was chosen to remain backwards compatible with version of hyper that + /// did not have this `first_byte_read_timeout` configuration. + pub fn first_byte_read_timeout(&mut self, timeout: Duration) -> &mut Self { + self.first_byte_read_timeout = timeout; + self + } +} diff --git a/tests/server.rs b/tests/server.rs index f72cf62702..ecf5025d9f 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -27,6 +27,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener as TkTcpListener, TcpListener, TcpStream as TkTcpStream}; use hyper::body::{Body, Incoming as IncomingBody}; +use hyper::server::conn::http1::GracefulShutdownConfig; use hyper::server::conn::{http1, http2}; use hyper::service::{service_fn, Service}; use hyper::{Method, Request, Response, StatusCode, Uri, Version}; @@ -1348,6 +1349,38 @@ async fn http1_graceful_shutdown_after_upgrade() { conn.as_mut().graceful_shutdown(); } +/// When hyper reached 1.0 the `Connection::graceful_shutdown` did not require a timer. +/// It would immediately close connections that were idle or had not received any bytes. +/// A later release made it possible to wait some Duration before closing the inactive connection. +/// Here we confirm that `Connection::graceful_shutdown` does not require a timer. +#[tokio::test] +async fn http1_graceful_shutdown_no_timer_required_for_zero_second_next_byte_timeout() { + let (listener, addr) = setup_tcp_listener(); + + tokio::spawn(async move { + let mut stream = TkTcpStream::connect(addr).await.unwrap(); + + stream.write_all(b"GET / HTTP/1.1\r\n\r\n").await.unwrap(); + + let mut buf = vec![]; + stream.read_to_end(&mut buf).await.unwrap(); + }); + + let socket = listener.accept().await.unwrap().0; + let socket = TokioIo::new(socket); + + // Construct a builder that does not call the `.timer` method. + let future = http1::Builder::new().serve_connection(socket, HelloWorld); + pin!(future); + + future.as_mut().graceful_shutdown(); + + tokio::time::timeout(Duration::from_secs(5), future) + .await + .unwrap() + .unwrap(); +} + #[tokio::test] async fn empty_parse_eof_does_not_return_error() { let (listener, addr) = setup_tcp_listener(); @@ -2352,6 +2385,66 @@ async fn graceful_shutdown_before_first_request_no_block() { .expect("error receiving response"); } +#[cfg(feature = "http1")] +#[tokio::test] +async fn graceful_shutdown_grace_period_for_first_byte() { + // (Client wait before sending, Server first byte timeout, Expected completed response) + let test_cases = [ + // When the client sends bytes before the grace period we expect a response. + (500, 1000, true), + // When the client sends bytes after the grace period we do not expect a response. + (1000, 500, false), + ]; + for (client_wait_before_sending, server_first_byte_timeout, expected_to_respond) in test_cases { + let (listener, addr) = setup_tcp_listener(); + + let graceful_shutdown_first_byte_timeout = Duration::from_millis(server_first_byte_timeout); + let client_wait_before_sending = Duration::from_millis(client_wait_before_sending); + + tokio::spawn(async move { + let socket = listener.accept().await.unwrap().0; + let socket = TokioIo::new(socket); + + let future = http1::Builder::new() + .timer(TokioTimer) + .serve_connection(socket, HelloWorld); + pin!(future); + + let mut graceful_config = GracefulShutdownConfig::default(); + graceful_config.first_byte_read_timeout(graceful_shutdown_first_byte_timeout); + future + .as_mut() + .graceful_shutdown_with_config(graceful_config); + + future.await.unwrap(); + }); + + let mut stream = TkTcpStream::connect(addr).await.unwrap(); + + tokio::time::sleep(client_wait_before_sending).await; + stream.write_all(b"GET / HTTP/1.1\r\n\r\n").await.unwrap(); + + let mut buf = vec![]; + stream.read_to_end(&mut buf).await.unwrap(); + + if expected_to_respond { + assert!(buf.starts_with(b"HTTP/1.1 200 OK\r\nconnection: close")); + } else { + assert!(buf.is_empty()); + } + + // Since the server was gracefully shut down it should not respond to any further requests. + { + stream.write_all(b"GET / HTTP/1.1\r\n\r\n").await.unwrap(); + + let mut buf = vec![]; + stream.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(buf.len(), 0); + } + } +} + #[test] fn streaming_body() { use futures_util::StreamExt;