Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ impl KubernetesResourcesManagement {

#[async_trait::async_trait]
impl ResourcesManagement for KubernetesResourcesManagement {
fn support_forward_warehouse_request(&self) -> bool {
false
}

async fn init_node(&self, _: &mut NodeInfo) -> Result<()> {
Err(ErrorCode::Unimplemented(
"Unimplemented kubernetes resources management",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ impl SelfManagedResourcesManagement {

#[async_trait::async_trait]
impl ResourcesManagement for SelfManagedResourcesManagement {
fn support_forward_warehouse_request(&self) -> bool {
false
}

async fn init_node(&self, node: &mut NodeInfo) -> Result<()> {
let config = GlobalConfig::instance();
node.cluster_id = config.query.cluster_id.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ impl SystemResourcesManagement {

#[async_trait::async_trait]
impl ResourcesManagement for SystemResourcesManagement {
fn support_forward_warehouse_request(&self) -> bool {
true
}

async fn init_node(&self, node: &mut NodeInfo) -> Result<()> {
let config = GlobalConfig::instance();
assert!(config.query.cluster_id.is_empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use databend_common_meta_types::NodeType;

#[async_trait::async_trait]
pub trait ResourcesManagement: Sync + Send + 'static {
fn support_forward_warehouse_request(&self) -> bool;

async fn init_node(&self, node: &mut NodeInfo) -> Result<()>;

async fn create_warehouse(&self, name: String, nodes: Vec<SelectedNode>) -> Result<()>;
Expand Down Expand Up @@ -77,6 +79,10 @@ pub struct DummyResourcesManagement;

#[async_trait::async_trait]
impl ResourcesManagement for DummyResourcesManagement {
fn support_forward_warehouse_request(&self) -> bool {
false
}

async fn init_node(&self, node: &mut NodeInfo) -> Result<()> {
let config = GlobalConfig::instance();
node.cluster_id = config.query.cluster_id.clone();
Expand Down
84 changes: 47 additions & 37 deletions src/query/service/src/servers/http/middleware/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use databend_common_base::base::GlobalInstance;
use databend_common_base::headers::HEADER_DEDUPLICATE_LABEL;
use databend_common_base::headers::HEADER_NODE_ID;
use databend_common_base::headers::HEADER_QUERY_ID;
Expand All @@ -30,6 +31,7 @@ use databend_common_exception::Result;
use databend_common_meta_app::principal::user_token::TokenType;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_types::NodeInfo;
use databend_enterprise_resources_management::ResourcesManagement;
use fastrace::func_name;
use headers::authorization::Basic;
use headers::authorization::Bearer;
Expand Down Expand Up @@ -537,46 +539,54 @@ impl<E: Endpoint> Endpoint for HTTPSessionEndpoint<E> {
};
}
} else if let Some(warehouse) = headers.get(HEADER_WAREHOUSE) {
req.headers_mut().remove(HEADER_WAREHOUSE);

let warehouse = warehouse
.to_str()
.map_err(|e| {
HttpErrorCode::bad_request(ErrorCode::BadArguments(format!(
"Invalid Header ({HEADER_WAREHOUSE}: {warehouse:?}): {e}"
)))
})?
.to_string();

let cluster_discovery = ClusterDiscovery::instance();

let forward_node = cluster_discovery.find_node_by_warehouse(&warehouse).await;

match forward_node {
Err(error) => {
return Err(HttpErrorCode::server_error(error).into());
}
Ok(None) => {
let msg = format!("Not find the '{}' warehouse; it is possible that all nodes of the warehouse have gone offline. Please exit the client and reconnect, or use `use warehouse <new_warehouse>`", warehouse);
warn!("{}", msg);
return Err(Error::from(HttpErrorCode::bad_request(
ErrorCode::UnknownWarehouse(msg),
)));
}
Ok(Some(node)) => {
let local_id = GlobalConfig::instance().query.node_id.clone();
if node.id != local_id {
log::info!(
"forwarding /v1{} from {} to warehouse {}({})",
req.uri(),
local_id,
warehouse,
node.id
);
return forward_request(req, node).await;
let resources_management = GlobalInstance::get::<Arc<dyn ResourcesManagement>>();
if resources_management.support_forward_warehouse_request() {
req.headers_mut().remove(HEADER_WAREHOUSE);

let warehouse = warehouse
.to_str()
.map_err(|e| {
HttpErrorCode::bad_request(ErrorCode::BadArguments(format!(
"Invalid value for header ({HEADER_WAREHOUSE}: {warehouse:?}): {e}"
)))
})?
.to_string();

let cluster_discovery = ClusterDiscovery::instance();

let forward_node = cluster_discovery.find_node_by_warehouse(&warehouse).await;

match forward_node {
Err(error) => {
return Err(HttpErrorCode::server_error(
error.add_message_back("(while in warehouse request forward)"),
)
.into());
}
Ok(None) => {
let msg = format!("Not find the '{}' warehouse; it is possible that all nodes of the warehouse have gone offline. Please exit the client and reconnect, or use `use warehouse <new_warehouse>`", warehouse);
warn!("{}", msg);
return Err(Error::from(HttpErrorCode::bad_request(
ErrorCode::UnknownWarehouse(msg),
)));
}
Ok(Some(node)) => {
let local_id = GlobalConfig::instance().query.node_id.clone();
if node.id != local_id {
log::info!(
"forwarding /v1{} from {} to warehouse {}({})",
req.uri(),
local_id,
warehouse,
node.id
);
return forward_request(req, node).await;
}
}
}
}

log::warn!("Ignore header ({HEADER_WAREHOUSE}: {warehouse:?})");
}
}

Expand Down
Loading