diff --git a/src/common/grpc/src/dns_resolver.rs b/src/common/grpc/src/dns_resolver.rs index 8caae83188a8c..0a470e60c53c6 100644 --- a/src/common/grpc/src/dns_resolver.rs +++ b/src/common/grpc/src/dns_resolver.rs @@ -140,6 +140,13 @@ impl Future for DNSServiceFuture { } } +#[derive(Clone, Copy, Debug, Default)] +pub struct TcpKeepAliveConfig { + pub time: Option, + pub interval: Option, + pub retries: Option, +} + pub struct ConnectionFactory; impl ConnectionFactory { @@ -147,12 +154,24 @@ impl ConnectionFactory { addr: impl ToString, timeout: Option, rpc_client_config: Option, + keep_alive: Option, ) -> std::result::Result { let endpoint = Self::create_rpc_endpoint(addr, timeout, rpc_client_config)?; let mut inner_connector = HttpConnector::new_with_resolver(DNSService); inner_connector.set_nodelay(true); - inner_connector.set_keepalive(None); + match keep_alive { + Some(config) => { + inner_connector.set_keepalive(config.time); + inner_connector.set_keepalive_interval(config.interval); + inner_connector.set_keepalive_retries(config.retries); + } + None => { + inner_connector.set_keepalive(None); + inner_connector.set_keepalive_interval(None); + inner_connector.set_keepalive_retries(None); + } + } inner_connector.enforce_http(false); inner_connector.set_connect_timeout(timeout); diff --git a/src/common/grpc/src/lib.rs b/src/common/grpc/src/lib.rs index e511837113d00..b75d9a2731bce 100644 --- a/src/common/grpc/src/lib.rs +++ b/src/common/grpc/src/lib.rs @@ -20,6 +20,7 @@ pub use dns_resolver::ConnectionFactory; pub use dns_resolver::DNSResolver; pub use dns_resolver::DNSService; pub use dns_resolver::GrpcConnectionError; +pub use dns_resolver::TcpKeepAliveConfig; pub use grpc_token::GrpcClaim; pub use grpc_token::GrpcToken; diff --git a/src/meta/client/src/channel_manager.rs b/src/meta/client/src/channel_manager.rs index 416126f823470..c8ecc7f37d3e1 100644 --- a/src/meta/client/src/channel_manager.rs +++ b/src/meta/client/src/channel_manager.rs @@ -142,19 +142,24 @@ impl MetaChannelManager { async fn build_channel(&self, addr: &String) -> Result { info!("MetaChannelManager::build_channel to {}", addr); - let ch = ConnectionFactory::create_rpc_channel(addr, self.timeout, self.tls_config.clone()) - .await - .map_err(|e| match e { - GrpcConnectionError::InvalidUri { .. } => MetaNetworkError::BadAddressFormat( - AnyError::new(&e).add_context(|| "while creating rpc channel"), - ), - GrpcConnectionError::TLSConfigError { .. } => MetaNetworkError::TLSConfigError( - AnyError::new(&e).add_context(|| "while creating rpc channel"), - ), - GrpcConnectionError::CannotConnect { .. } => MetaNetworkError::ConnectionError( - ConnectionError::new(e, "while creating rpc channel"), - ), - })?; + let ch = ConnectionFactory::create_rpc_channel( + addr, + self.timeout, + self.tls_config.clone(), + None, + ) + .await + .map_err(|e| match e { + GrpcConnectionError::InvalidUri { .. } => MetaNetworkError::BadAddressFormat( + AnyError::new(&e).add_context(|| "while creating rpc channel"), + ), + GrpcConnectionError::TLSConfigError { .. } => MetaNetworkError::TLSConfigError( + AnyError::new(&e).add_context(|| "while creating rpc channel"), + ), + GrpcConnectionError::CannotConnect { .. } => MetaNetworkError::ConnectionError( + ConnectionError::new(e, "while creating rpc channel"), + ), + })?; Ok(ch) } } diff --git a/src/meta/client/tests/it/grpc_client.rs b/src/meta/client/tests/it/grpc_client.rs index 4b1198dc958f1..43bbf612da900 100644 --- a/src/meta/client/tests/it/grpc_client.rs +++ b/src/meta/client/tests/it/grpc_client.rs @@ -68,7 +68,7 @@ async fn test_grpc_client_handshake_timeout() { // our mock grpc server's handshake impl will sleep 2secs. // see: GrpcServiceForTestImp.handshake let timeout = Duration::from_secs(1); - let c = ConnectionFactory::create_rpc_channel(srv_addr.clone(), Some(timeout), None) + let c = ConnectionFactory::create_rpc_channel(srv_addr.clone(), Some(timeout), None, None) .await .unwrap(); @@ -86,7 +86,7 @@ async fn test_grpc_client_handshake_timeout() { // handshake success { let timeout = Duration::from_secs(3); - let c = ConnectionFactory::create_rpc_channel(srv_addr, Some(timeout), None) + let c = ConnectionFactory::create_rpc_channel(srv_addr, Some(timeout), None, None) .await .unwrap(); diff --git a/src/meta/service/src/meta_node/meta_node.rs b/src/meta/service/src/meta_node/meta_node.rs index c680686a1b71e..4b4151ee35943 100644 --- a/src/meta/service/src/meta_node/meta_node.rs +++ b/src/meta/service/src/meta_node/meta_node.rs @@ -937,7 +937,7 @@ impl MetaNode { addr, timeout ); - let chan_res = ConnectionFactory::create_rpc_channel(addr, timeout, None).await; + let chan_res = ConnectionFactory::create_rpc_channel(addr, timeout, None, None).await; let chan = match chan_res { Ok(c) => c, Err(e) => { diff --git a/src/meta/service/tests/it/grpc/metasrv_grpc_handshake.rs b/src/meta/service/tests/it/grpc/metasrv_grpc_handshake.rs index 8a56d8bb131fd..1d9b414cbf376 100644 --- a/src/meta/service/tests/it/grpc/metasrv_grpc_handshake.rs +++ b/src/meta/service/tests/it/grpc/metasrv_grpc_handshake.rs @@ -52,8 +52,9 @@ async fn test_metasrv_handshake() -> anyhow::Result<()> { let (_tc, addr) = start_metasrv().await?; - let c = ConnectionFactory::create_rpc_channel(addr, Some(Duration::from_millis(1000)), None) - .await?; + let c = + ConnectionFactory::create_rpc_channel(addr, Some(Duration::from_millis(1000)), None, None) + .await?; let (mut client, _once) = MetaChannelManager::new_real_client(c); info!("--- client has smaller ver than S.min_cli_ver"); diff --git a/src/query/service/src/clusters/cluster.rs b/src/query/service/src/clusters/cluster.rs index f517de54b0a9d..0f1d85ba77d97 100644 --- a/src/query/service/src/clusters/cluster.rs +++ b/src/query/service/src/clusters/cluster.rs @@ -54,6 +54,7 @@ use databend_common_meta_store::MetaStoreProvider; use databend_common_meta_types::NodeInfo; use databend_common_meta_types::SeqV; use databend_common_metrics::cluster::*; +use databend_common_settings::FlightKeepAliveParams; use databend_common_settings::Settings; use databend_common_telemetry::report_node_telemetry; use databend_common_version::DATABEND_TELEMETRY_API_KEY; @@ -74,6 +75,7 @@ use serde::Serialize; use tokio::time::sleep; use crate::servers::flight::FlightClient; +use crate::servers::flight::keep_alive::build_keep_alive_config; pub struct ClusterDiscovery { local_id: String, @@ -188,7 +190,9 @@ impl ClusterHelper for Cluster { let mut attempt = 0; loop { - let mut conn = create_client(&config, &flight_address).await?; + let mut conn = + create_client(&config, &flight_address, flight_params.keep_alive) + .await?; match conn .do_action::<_, Res>( path, @@ -312,7 +316,13 @@ impl ClusterDiscovery { if config.query.check_connection_before_schedule && node.id != self.local_id { let start_at = Instant::now(); - if let Err(cause) = create_client(config, &node.flight_address).await { + if let Err(cause) = create_client( + config, + &node.flight_address, + FlightKeepAliveParams::default(), + ) + .await + { warn!( "Cannot connect node [{:?}] after {:?}s, remove it in query. cause: {:?}", node.flight_address, @@ -964,7 +974,11 @@ impl ClusterHeartbeat { } #[async_backtrace::framed] -pub async fn create_client(config: &InnerConfig, address: &str) -> Result { +pub async fn create_client( + config: &InnerConfig, + address: &str, + keep_alive: FlightKeepAliveParams, +) -> Result { let timeout = if config.query.rpc_client_timeout_secs > 0 { Some(Duration::from_secs(config.query.rpc_client_timeout_secs)) } else { @@ -976,9 +990,16 @@ pub async fn create_client(config: &InnerConfig, address: &str) -> Result(SET_PRIORITY, message, flight_params) diff --git a/src/query/service/src/interpreters/interpreter_system_action.rs b/src/query/service/src/interpreters/interpreter_system_action.rs index d996b635a528c..8f50332496599 100644 --- a/src/query/service/src/interpreters/interpreter_system_action.rs +++ b/src/query/service/src/interpreters/interpreter_system_action.rs @@ -81,6 +81,7 @@ impl Interpreter for SystemActionInterpreter { timeout: settings.get_flight_client_timeout()?, retry_times: settings.get_flight_max_retry_times()?, retry_interval: settings.get_flight_retry_interval()?, + keep_alive: settings.get_flight_keep_alive_params()?, }; warehouse .do_action::<_, ()>(SYSTEM_ACTION, message, flight_params) diff --git a/src/query/service/src/interpreters/interpreter_table_truncate.rs b/src/query/service/src/interpreters/interpreter_table_truncate.rs index 1c704dcc3247e..92a9d84868635 100644 --- a/src/query/service/src/interpreters/interpreter_table_truncate.rs +++ b/src/query/service/src/interpreters/interpreter_table_truncate.rs @@ -100,6 +100,7 @@ impl Interpreter for TruncateTableInterpreter { timeout: settings.get_flight_client_timeout()?, retry_times: settings.get_flight_max_retry_times()?, retry_interval: settings.get_flight_retry_interval()?, + keep_alive: settings.get_flight_keep_alive_params()?, }; warehouse .do_action::<_, ()>(TRUNCATE_TABLE, message, flight_params) diff --git a/src/query/service/src/servers/admin/v1/query_dump.rs b/src/query/service/src/servers/admin/v1/query_dump.rs index fdd0f46085c68..5fa9871b9c87b 100644 --- a/src/query/service/src/servers/admin/v1/query_dump.rs +++ b/src/query/service/src/servers/admin/v1/query_dump.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use databend_common_config::GlobalConfig; use databend_common_exception::Result; +use databend_common_settings::FlightKeepAliveParams; use http::StatusCode; use poem::IntoResponse; use poem::web::Json; @@ -65,6 +66,7 @@ async fn get_running_query_dump(query_id: &str) -> Result(GET_RUNNING_QUERY_DUMP, message, flight_params) diff --git a/src/query/service/src/servers/flight/keep_alive.rs b/src/query/service/src/servers/flight/keep_alive.rs new file mode 100644 index 0000000000000..5408a8876c1ad --- /dev/null +++ b/src/query/service/src/servers/flight/keep_alive.rs @@ -0,0 +1,28 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_grpc::TcpKeepAliveConfig; +use databend_common_settings::FlightKeepAliveParams; + +pub fn build_keep_alive_config(params: FlightKeepAliveParams) -> Option { + if params.is_disabled() { + None + } else { + Some(TcpKeepAliveConfig { + time: params.time, + interval: params.interval, + retries: params.retries, + }) + } +} diff --git a/src/query/service/src/servers/flight/mod.rs b/src/query/service/src/servers/flight/mod.rs index 177c747d6d8ec..8a44ec605dd20 100644 --- a/src/query/service/src/servers/flight/mod.rs +++ b/src/query/service/src/servers/flight/mod.rs @@ -14,6 +14,7 @@ mod flight_client; mod flight_service; +pub(crate) mod keep_alive; mod request_builder; pub mod v1; diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index 815f26072c065..b7ac693863ac9 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -39,6 +39,7 @@ use databend_common_grpc::ConnectionFactory; use databend_common_pipeline::core::ExecutionInfo; use databend_common_pipeline::core::always_callback; use databend_common_pipeline::core::basic_callback; +use databend_common_settings::FlightKeepAliveParams; use fastrace::prelude::*; use log::warn; use parking_lot::Mutex; @@ -67,6 +68,7 @@ use crate::servers::flight::FlightClient; use crate::servers::flight::FlightExchange; use crate::servers::flight::FlightReceiver; use crate::servers::flight::FlightSender; +use crate::servers::flight::keep_alive::build_keep_alive_config; use crate::servers::flight::v1::actions::INIT_QUERY_FRAGMENTS; use crate::servers::flight::v1::actions::START_PREPARED_QUERY; use crate::servers::flight::v1::actions::init_query_fragments; @@ -184,6 +186,11 @@ impl DataExchangeManager { let config = GlobalConfig::instance(); let with_cur_rt = env.create_rpc_clint_with_current_rt; + let settings = match ctx { + Some(ref ctx) => ctx.get_settings(), + None => env.settings.clone(), + }; + let keep_alive = settings.get_flight_keep_alive_params()?; let mut request_exchanges = HashMap::new(); let mut targets_exchanges = HashMap::>::new(); @@ -203,8 +210,10 @@ impl DataExchangeManager { let query_id = env.query_id.clone(); let address = source.flight_address.clone(); + let keep_alive_params = keep_alive; flight_exchanges.push(async move { - let mut flight_client = Self::create_client(&address, with_cur_rt).await?; + let mut flight_client = + Self::create_client(&address, with_cur_rt, keep_alive_params).await?; Ok::(match edge { Edge::Fragment(channel) => QueryExchange::Fragment { @@ -326,9 +335,14 @@ impl DataExchangeManager { } #[async_backtrace::framed] - pub async fn create_client(address: &str, use_current_rt: bool) -> Result { + pub async fn create_client( + address: &str, + use_current_rt: bool, + keep_alive: FlightKeepAliveParams, + ) -> Result { let config = GlobalConfig::instance(); let address = address.to_string(); + let keep_alive_config = build_keep_alive_config(keep_alive); let task = async move { match config.tls_query_cli_enabled() { true => Ok(FlightClient::new(FlightServiceClient::new( @@ -336,11 +350,18 @@ impl DataExchangeManager { address.to_owned(), None, Some(config.query.to_rpc_client_tls_config()), + keep_alive_config, ) .await?, ))), false => Ok(FlightClient::new(FlightServiceClient::new( - ConnectionFactory::create_rpc_channel(address.to_owned(), None, None).await?, + ConnectionFactory::create_rpc_channel( + address.to_owned(), + None, + None, + keep_alive_config, + ) + .await?, ))), } }; @@ -474,6 +495,7 @@ impl DataExchangeManager { timeout: settings.get_flight_client_timeout()?, retry_times: settings.get_flight_max_retry_times()?, retry_interval: settings.get_flight_retry_interval()?, + keep_alive: settings.get_flight_keep_alive_params()?, }; let mut root_fragment_ids = actions.get_root_fragment_ids()?; let conf = GlobalConfig::instance(); diff --git a/src/query/service/tests/it/servers/flight/flight_service.rs b/src/query/service/tests/it/servers/flight/flight_service.rs index 3f4613eade52e..f2c74668334b6 100644 --- a/src/query/service/tests/it/servers/flight/flight_service.rs +++ b/src/query/service/tests/it/servers/flight/flight_service.rs @@ -54,14 +54,15 @@ async fn test_tls_rpc_server() -> Result<()> { }); // normal case - let conn = ConnectionFactory::create_rpc_channel(listener_address, None, tls_conf).await?; + let conn = + ConnectionFactory::create_rpc_channel(listener_address, None, tls_conf, None).await?; let mut f_client = FlightServiceClient::new(conn); let r = f_client.list_actions(Empty {}).await; assert!(r.is_ok()); // client access without tls enabled will be failed // - channel can still be created, but communication will be failed - let channel = ConnectionFactory::create_rpc_channel(listener_address, None, None).await?; + let channel = ConnectionFactory::create_rpc_channel(listener_address, None, None, None).await?; let mut f_client = FlightServiceClient::new(channel); let r = f_client.list_actions(Empty {}).await; assert!(r.is_err()); @@ -96,7 +97,7 @@ async fn test_tls_rpc_server_invalid_client_config() -> Result<()> { domain_name: TEST_CN_NAME.to_string(), }; - let r = ConnectionFactory::create_rpc_channel("fake:1234", None, Some(client_conf)).await; + let r = ConnectionFactory::create_rpc_channel("fake:1234", None, Some(client_conf), None).await; assert!(r.is_err()); let e = r.unwrap_err(); match e { diff --git a/src/query/settings/src/lib.rs b/src/query/settings/src/lib.rs index c7f284ce09e01..53b33a4e26c2d 100644 --- a/src/query/settings/src/lib.rs +++ b/src/query/settings/src/lib.rs @@ -26,4 +26,5 @@ pub use settings_default::SettingMode; pub use settings_default::SettingRange; pub use settings_default::SettingScope; pub use settings_getter_setter::FlightCompression; +pub use settings_getter_setter::FlightKeepAliveParams; pub use settings_getter_setter::OutofMemoryBehavior; diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 45cf828dc937c..7efd9f212e9e4 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -281,6 +281,27 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=u64::MAX)), }), + ("flight_client_keep_alive_time_secs", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Sets the idle time in seconds before a flight TCP connection sends keepalive probes. 0 disables keepalive.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=u64::MAX)), + }), + ("flight_client_keep_alive_interval_secs", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Sets the interval in seconds between two flight TCP keepalive probes. 0 disables keepalive.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=u64::MAX)), + }), + ("flight_client_keep_alive_retries", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Sets the number of TCP keepalive retries for flight connections before declaring the peer unreachable. 0 disables keepalive.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=u64::MAX)), + }), ("http_handler_result_timeout_secs", DefaultSettingValue { value: { let result_timeout_secs = global_conf.as_ref().map(|conf| conf.query.http_handler_result_timeout_secs) diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 0b2ee123cd5dc..0ad3c78c5cdc3 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::convert::TryFrom; use std::str::FromStr; +use std::time::Duration; use databend_common_ast::parser::Dialect; use databend_common_base::base::BuildInfoRef; @@ -36,6 +38,19 @@ pub enum FlightCompression { Zstd, } +#[derive(Clone, Copy, Debug, Default)] +pub struct FlightKeepAliveParams { + pub time: Option, + pub interval: Option, + pub retries: Option, +} + +impl FlightKeepAliveParams { + pub fn is_disabled(&self) -> bool { + self.time.is_none() && self.interval.is_none() && self.retries.is_none() + } +} + #[derive(Clone, Copy)] pub enum SpillFileFormat { Arrow, @@ -284,6 +299,33 @@ impl Settings { self.try_get_u64("flight_client_timeout") } + pub fn get_flight_keep_alive_params(&self) -> Result { + fn secs_to_duration(value: u64) -> Option { + if value == 0 { + None + } else { + Some(Duration::from_secs(value)) + } + } + + let retries_raw = self.try_get_u64("flight_client_keep_alive_retries")?; + let retries = if retries_raw == 0 { + None + } else { + Some(u32::try_from(retries_raw).map_err(|_| { + ErrorCode::BadArguments( + "flight_client_keep_alive_retries must be less than or equal to u32::MAX", + ) + })?) + }; + + Ok(FlightKeepAliveParams { + time: secs_to_duration(self.try_get_u64("flight_client_keep_alive_time_secs")?), + interval: secs_to_duration(self.try_get_u64("flight_client_keep_alive_interval_secs")?), + retries, + }) + } + // Get storage read buffer size. pub fn get_storage_read_buffer_size(&self) -> Result { self.try_get_u64("storage_read_buffer_size") diff --git a/src/query/settings/tests/it/setting.rs b/src/query/settings/tests/it/setting.rs index c37bc9929200f..a3f1e380f1e27 100644 --- a/src/query/settings/tests/it/setting.rs +++ b/src/query/settings/tests/it/setting.rs @@ -218,3 +218,48 @@ async fn test_set_data_retention_time_in_days_from_config() { assert_eq!(expect, format!("{}", result.unwrap_err())); } } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_flight_keep_alive_settings() { + let settings = Settings::create(Tenant::new_literal("test")); + + let params = settings.get_flight_keep_alive_params().unwrap(); + assert!(params.is_disabled()); + + settings + .set_setting( + "flight_client_keep_alive_time_secs".to_string(), + "15".to_string(), + ) + .unwrap(); + settings + .set_setting( + "flight_client_keep_alive_interval_secs".to_string(), + "5".to_string(), + ) + .unwrap(); + settings + .set_setting( + "flight_client_keep_alive_retries".to_string(), + "3".to_string(), + ) + .unwrap(); + + let params = settings.get_flight_keep_alive_params().unwrap(); + assert_eq!(params.time.unwrap().as_secs(), 15); + assert_eq!(params.interval.unwrap().as_secs(), 5); + assert_eq!(params.retries.unwrap(), 3); + + // Values larger than u32::MAX should trigger an error when read. + settings + .set_setting( + "flight_client_keep_alive_retries".to_string(), + (u32::MAX as u64 + 1).to_string(), + ) + .unwrap(); + let err = settings.get_flight_keep_alive_params().unwrap_err(); + assert!( + err.to_string() + .contains("flight_client_keep_alive_retries must be less than or equal to u32::MAX") + ); +}