Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add tcp user timeout config knob #14

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tokio-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ phf = "0.11"
postgres-protocol = { version = "0.6.4", path = "../postgres-protocol" }
postgres-types = { version = "0.2.4", path = "../postgres-types" }
serde = { version = "1.0", optional = true }
socket2 = "0.4"
socket2 = { version = "0.4", features = ["all"] }
tokio = { version = "1.0", features = ["io-util"] }
tokio-util = { version = "0.7", features = ["codec"] }

Expand Down
1 change: 1 addition & 0 deletions tokio-postgres/src/cancel_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ where
&config.host,
config.port,
config.connect_timeout,
config.tcp_user_timeout,
config.keepalive.as_ref(),
)
.await?;
Expand Down
1 change: 1 addition & 0 deletions tokio-postgres/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ pub(crate) struct SocketConfig {
pub host: Host,
pub port: u16,
pub connect_timeout: Option<Duration>,
pub tcp_user_timeout: Option<Duration>,
pub keepalive: Option<KeepaliveConfig>,
}

Expand Down
29 changes: 29 additions & 0 deletions tokio-postgres/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ pub enum Host {
/// omitted or the empty string.
/// * `connect_timeout` - The time limit in seconds applied to each socket-level connection attempt. Note that hostnames
/// can resolve to multiple IP addresses, and this limit is applied to each address. Defaults to no timeout.
/// * `tcp_user_timeout` - The time limit that transmitted data may remain unacknowledged before a connection is forcibly closed.
/// This is ignored for Unix domain socket connections. It is only supported on systems where TCP_USER_TIMEOUT is available
/// and will default to the system default if omitted or set to 0; on other systems, it has no effect.
/// * `keepalives` - Controls the use of TCP keepalive. A value of 0 disables keepalive and nonzero integers enable it.
/// This option is ignored when connecting with Unix sockets. Defaults to on.
/// * `keepalives_idle` - The number of seconds of inactivity after which a keepalive message is sent to the server.
Expand Down Expand Up @@ -183,6 +186,7 @@ pub struct Config {
pub(crate) host: Vec<Host>,
pub(crate) port: Vec<u16>,
pub(crate) connect_timeout: Option<Duration>,
pub(crate) tcp_user_timeout: Option<Duration>,
pub(crate) keepalives: bool,
pub(crate) keepalive_config: KeepaliveConfig,
pub(crate) target_session_attrs: TargetSessionAttrs,
Expand Down Expand Up @@ -217,6 +221,7 @@ impl Config {
host: vec![],
port: vec![],
connect_timeout: None,
tcp_user_timeout: None,
keepalives: true,
keepalive_config,
target_session_attrs: TargetSessionAttrs::Any,
Expand Down Expand Up @@ -407,6 +412,21 @@ impl Config {
self.connect_timeout.as_ref()
}

/// Sets the TCP user timeout.
///
/// This is ignored for Unix domain socket connections. It is only supported on systems where
/// TCP_USER_TIMEOUT is available and will default to the system default if omitted or set to 0;
/// on other systems, it has no effect.
pub fn tcp_user_timeout(&mut self, tcp_user_timeout: Duration) -> &mut Config {
self.tcp_user_timeout = Some(tcp_user_timeout);
self
}

/// Gets the TCP user timeout, if one has been set with the
/// `user_timeout` method.
pub fn get_tcp_user_timeout(&self) -> Option<&Duration> {
self.tcp_user_timeout.as_ref()
}
/// Controls the use of TCP keepalive.
///
/// This is ignored for Unix domain socket connections. Defaults to `true`.
Expand Down Expand Up @@ -578,6 +598,14 @@ impl Config {
self.connect_timeout(Duration::from_secs(timeout as u64));
}
}
"tcp_user_timeout" => {
let timeout = value
.parse::<i64>()
.map_err(|_| Error::config_parse(Box::new(InvalidValue("tcp_user_timeout"))))?;
if timeout > 0 {
self.tcp_user_timeout(Duration::from_secs(timeout as u64));
}
}
"keepalives" => {
let keepalives = value
.parse::<u64>()
Expand Down Expand Up @@ -713,6 +741,7 @@ impl fmt::Debug for Config {
.field("host", &self.host)
.field("port", &self.port)
.field("connect_timeout", &self.connect_timeout)
.field("tcp_user_timeout", &self.tcp_user_timeout)
.field("keepalives", &self.keepalives)
.field("keepalives_idle", &self.keepalive_config.idle)
.field("keepalives_interval", &self.keepalive_config.interval)
Expand Down
2 changes: 2 additions & 0 deletions tokio-postgres/src/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ where
host,
port,
config.connect_timeout,
config.tcp_user_timeout,
if config.keepalives {
Some(&config.keepalive_config)
} else {
Expand Down Expand Up @@ -118,6 +119,7 @@ where
host: host.clone(),
port,
connect_timeout: config.connect_timeout,
tcp_user_timeout: config.tcp_user_timeout,
keepalive: if config.keepalives {
Some(config.keepalive_config.clone())
} else {
Expand Down
12 changes: 11 additions & 1 deletion tokio-postgres/src/connect_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub(crate) async fn connect_socket(
host: &Host,
port: u16,
connect_timeout: Option<Duration>,
tcp_user_timeout: Option<Duration>,
keepalive_config: Option<&KeepaliveConfig>,
) -> Result<Socket, Error> {
match host {
Expand All @@ -35,8 +36,17 @@ pub(crate) async fn connect_socket(
};

stream.set_nodelay(true).map_err(Error::connect)?;

let sock_ref = SockRef::from(&stream);
#[cfg(target_os = "linux")]
{
sock_ref
.set_tcp_user_timeout(tcp_user_timeout)
.map_err(Error::connect)?;
}

if let Some(keepalive_config) = keepalive_config {
SockRef::from(&stream)
sock_ref
.set_tcp_keepalive(&TcpKeepalive::from(keepalive_config))
.map_err(Error::connect)?;
}
Expand Down