diff --git a/src/query/ee/src/resource_management/resources_management_kubernetes.rs b/src/query/ee/src/resource_management/resources_management_kubernetes.rs index ea362f1346ae4..478c0e49e0ae4 100644 --- a/src/query/ee/src/resource_management/resources_management_kubernetes.rs +++ b/src/query/ee/src/resource_management/resources_management_kubernetes.rs @@ -32,6 +32,10 @@ impl KubernetesResourcesManagement { #[async_trait::async_trait] impl ResourcesManagement for KubernetesResourcesManagement { + fn support_forward_warehouse_request(&self) -> bool { + false + } + async fn init_node(&self, _: &mut NodeInfo) -> Result<()> { Err(ErrorCode::Unimplemented( "Unimplemented kubernetes resources management", diff --git a/src/query/ee/src/resource_management/resources_management_self_managed.rs b/src/query/ee/src/resource_management/resources_management_self_managed.rs index fdd97fd6c14ff..33d76f693166c 100644 --- a/src/query/ee/src/resource_management/resources_management_self_managed.rs +++ b/src/query/ee/src/resource_management/resources_management_self_managed.rs @@ -41,6 +41,10 @@ impl SelfManagedResourcesManagement { #[async_trait::async_trait] impl ResourcesManagement for SelfManagedResourcesManagement { + fn support_forward_warehouse_request(&self) -> bool { + false + } + async fn init_node(&self, node: &mut NodeInfo) -> Result<()> { let config = GlobalConfig::instance(); node.cluster_id = config.query.cluster_id.clone(); diff --git a/src/query/ee/src/resource_management/resources_management_system.rs b/src/query/ee/src/resource_management/resources_management_system.rs index 2b929a8f024b0..12c98d1b2aeb3 100644 --- a/src/query/ee/src/resource_management/resources_management_system.rs +++ b/src/query/ee/src/resource_management/resources_management_system.rs @@ -38,6 +38,10 @@ impl SystemResourcesManagement { #[async_trait::async_trait] impl ResourcesManagement for SystemResourcesManagement { + fn support_forward_warehouse_request(&self) -> bool { + true + } + async fn init_node(&self, node: &mut NodeInfo) -> Result<()> { let config = GlobalConfig::instance(); assert!(config.query.cluster_id.is_empty()); diff --git a/src/query/ee_features/resources_management/src/resources_management.rs b/src/query/ee_features/resources_management/src/resources_management.rs index 6a010f5591692..dd59b6b860337 100644 --- a/src/query/ee_features/resources_management/src/resources_management.rs +++ b/src/query/ee_features/resources_management/src/resources_management.rs @@ -26,6 +26,8 @@ use databend_common_meta_types::NodeType; #[async_trait::async_trait] pub trait ResourcesManagement: Sync + Send + 'static { + fn support_forward_warehouse_request(&self) -> bool; + async fn init_node(&self, node: &mut NodeInfo) -> Result<()>; async fn create_warehouse(&self, name: String, nodes: Vec) -> Result<()>; @@ -77,6 +79,10 @@ pub struct DummyResourcesManagement; #[async_trait::async_trait] impl ResourcesManagement for DummyResourcesManagement { + fn support_forward_warehouse_request(&self) -> bool { + false + } + async fn init_node(&self, node: &mut NodeInfo) -> Result<()> { let config = GlobalConfig::instance(); node.cluster_id = config.query.cluster_id.clone(); diff --git a/src/query/service/src/servers/http/middleware/session.rs b/src/query/service/src/servers/http/middleware/session.rs index 0c4b991d5b3d2..567e95f4775f5 100644 --- a/src/query/service/src/servers/http/middleware/session.rs +++ b/src/query/service/src/servers/http/middleware/session.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; +use databend_common_base::base::GlobalInstance; use databend_common_base::headers::HEADER_DEDUPLICATE_LABEL; use databend_common_base::headers::HEADER_NODE_ID; use databend_common_base::headers::HEADER_QUERY_ID; @@ -30,6 +31,7 @@ use databend_common_exception::Result; use databend_common_meta_app::principal::user_token::TokenType; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_types::NodeInfo; +use databend_enterprise_resources_management::ResourcesManagement; use fastrace::func_name; use headers::authorization::Basic; use headers::authorization::Bearer; @@ -537,46 +539,54 @@ impl Endpoint for HTTPSessionEndpoint { }; } } else if let Some(warehouse) = headers.get(HEADER_WAREHOUSE) { - req.headers_mut().remove(HEADER_WAREHOUSE); - - let warehouse = warehouse - .to_str() - .map_err(|e| { - HttpErrorCode::bad_request(ErrorCode::BadArguments(format!( - "Invalid Header ({HEADER_WAREHOUSE}: {warehouse:?}): {e}" - ))) - })? - .to_string(); - - let cluster_discovery = ClusterDiscovery::instance(); - - let forward_node = cluster_discovery.find_node_by_warehouse(&warehouse).await; - - match forward_node { - Err(error) => { - return Err(HttpErrorCode::server_error(error).into()); - } - Ok(None) => { - let msg = format!("Not find the '{}' warehouse; it is possible that all nodes of the warehouse have gone offline. Please exit the client and reconnect, or use `use warehouse `", warehouse); - warn!("{}", msg); - return Err(Error::from(HttpErrorCode::bad_request( - ErrorCode::UnknownWarehouse(msg), - ))); - } - Ok(Some(node)) => { - let local_id = GlobalConfig::instance().query.node_id.clone(); - if node.id != local_id { - log::info!( - "forwarding /v1{} from {} to warehouse {}({})", - req.uri(), - local_id, - warehouse, - node.id - ); - return forward_request(req, node).await; + let resources_management = GlobalInstance::get::>(); + if resources_management.support_forward_warehouse_request() { + req.headers_mut().remove(HEADER_WAREHOUSE); + + let warehouse = warehouse + .to_str() + .map_err(|e| { + HttpErrorCode::bad_request(ErrorCode::BadArguments(format!( + "Invalid value for header ({HEADER_WAREHOUSE}: {warehouse:?}): {e}" + ))) + })? + .to_string(); + + let cluster_discovery = ClusterDiscovery::instance(); + + let forward_node = cluster_discovery.find_node_by_warehouse(&warehouse).await; + + match forward_node { + Err(error) => { + return Err(HttpErrorCode::server_error( + error.add_message_back("(while in warehouse request forward)"), + ) + .into()); + } + Ok(None) => { + let msg = format!("Not find the '{}' warehouse; it is possible that all nodes of the warehouse have gone offline. Please exit the client and reconnect, or use `use warehouse `", warehouse); + warn!("{}", msg); + return Err(Error::from(HttpErrorCode::bad_request( + ErrorCode::UnknownWarehouse(msg), + ))); + } + Ok(Some(node)) => { + let local_id = GlobalConfig::instance().query.node_id.clone(); + if node.id != local_id { + log::info!( + "forwarding /v1{} from {} to warehouse {}({})", + req.uri(), + local_id, + warehouse, + node.id + ); + return forward_request(req, node).await; + } } } } + + log::warn!("Ignore header ({HEADER_WAREHOUSE}: {warehouse:?})"); } }