Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/common/storage/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,31 @@ pub struct StorageConfig {
pub num_cpus: u64,
pub allow_insecure: bool,
pub params: StorageParams,
/// Global switches that affect the ambient credential chain behavior.
///
/// Notes:
/// - These are runtime-only controls and are not persisted in meta.
/// - They apply to all storage operators created in this process.
pub disable_config_load: bool,
pub disable_instance_profile: bool,
}

/// Runtime-only switches for ambient credential chain behavior.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CredentialChainConfig {
pub disable_config_load: bool,
pub disable_instance_profile: bool,
}

impl CredentialChainConfig {
pub fn init(cfg: CredentialChainConfig) -> databend_common_exception::Result<()> {
GlobalInstance::set(cfg);
Ok(())
}

pub fn try_get() -> Option<CredentialChainConfig> {
GlobalInstance::try_get()
}
}

// TODO: This config should be moved out of common-storage crate.
Expand Down
30 changes: 24 additions & 6 deletions src/common/storage/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use opendal::services;

use crate::StorageConfig;
use crate::StorageHttpClient;
use crate::config::CredentialChainConfig;
use crate::http_client::get_storage_http_client;
use crate::metrics_layer::METRICS_LAYER;
use crate::operator_cache::get_operator_cache;
Expand Down Expand Up @@ -415,19 +416,31 @@ fn init_s3_operator(cfg: &StorageS3Config) -> Result<impl Builder> {
builder = builder.default_storage_class(cfg.storage_class.to_string().as_ref())
}

// Disable credential loader
if cfg.disable_credential_loader {
// If allow_credential_chain is not set, default to false for security.
let allow_credential_chain = cfg.allow_credential_chain.unwrap_or(false);

// Disallowing the credential chain forces unsigned or fully explicit access.
if !allow_credential_chain {
builder = builder.disable_config_load().disable_ec2_metadata();
} else if let Some(global) = CredentialChainConfig::try_get() {
// Apply global limits to the credential chain.
if global.disable_config_load {
builder = builder.disable_config_load();
}
if global.disable_instance_profile {
builder = builder.disable_ec2_metadata();
}
}

// If credential loading is disabled and no credentials are provided, use unsigned requests.
// This allows accessing public buckets reliably in environments where signing could be rejected.
if cfg.disable_credential_loader
if !allow_credential_chain
&& cfg.access_key_id.is_empty()
&& cfg.secret_access_key.is_empty()
&& cfg.security_token.is_empty()
&& cfg.role_arn.is_empty()
{
// Allow anonymous is actually forcing unsigned requests in OpenDAL.
builder = builder.allow_anonymous();
}

Expand Down Expand Up @@ -568,6 +581,10 @@ impl DataOperator {
conf: &StorageConfig,
spill_params: Option<StorageParams>,
) -> databend_common_exception::Result<()> {
CredentialChainConfig::init(CredentialChainConfig {
disable_config_load: conf.disable_config_load,
disable_instance_profile: conf.disable_instance_profile,
})?;
GlobalInstance::set(Self::try_create(conf, spill_params).await?);

Ok(())
Expand All @@ -578,8 +595,9 @@ impl DataOperator {
conf: &StorageConfig,
spill_params: Option<StorageParams>,
) -> databend_common_exception::Result<DataOperator> {
let operator = init_operator(&conf.params)?;
check_operator(&operator, &conf.params).await?;
let data_params = conf.params.clone();
let operator = init_operator(&data_params)?;
check_operator(&operator, &data_params).await?;

// Init spill operator
let mut params = spill_params.as_ref().unwrap_or(&conf.params).clone();
Expand All @@ -590,7 +608,7 @@ impl DataOperator {

Ok(DataOperator {
operator,
params: conf.params.clone(),
params: data_params,
spill_operator,
spill_params,
})
Expand Down
9 changes: 5 additions & 4 deletions src/common/storage/src/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,13 @@ impl StageFileInfo {

pub fn init_stage_operator(stage_info: &StageInfo) -> Result<Operator> {
if stage_info.stage_type == StageType::External {
// External S3 stages don't load credentials by default; `role_arn` opts into assume-role.
// External S3 stages disallow the ambient credential chain by default.
// `role_arn` opts into using the credential chain as the source credential.
let storage = match stage_info.stage_params.storage.clone() {
StorageParams::S3(mut cfg) => {
if cfg.role_arn.is_empty() {
cfg.disable_credential_loader = true;
}
let allow_credential_chain =
stage_info.allow_credential_chain || !cfg.role_arn.is_empty();
cfg.allow_credential_chain = Some(allow_credential_chain);
StorageParams::S3(cfg)
}
v => v,
Expand Down
17 changes: 13 additions & 4 deletions src/meta/app-storage/src/storage_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,7 @@ impl StorageParams {
s1.master_key = s2.master_key;
s1.network_config = s2.network_config;
s1.disable_credential_loader = s2.disable_credential_loader;
// Remove disable_credential_loader is role_arn has been set.
if !s1.role_arn.is_empty() {
s1.disable_credential_loader = false;
}
s1.allow_credential_chain = s2.allow_credential_chain;
Ok(Self::S3(s1))
}
(s1, s2) => Err(ErrorCode::StorageOther(format!(
Expand Down Expand Up @@ -483,7 +480,17 @@ pub struct StorageS3Config {
pub root: String,
/// This flag is used internally to control whether databend load
/// credentials from environment like env, profile and web token.
///
/// Deprecated: prefer the runtime `allow_credential_chain` policy plus global
/// `storage.disable_config_load` / `storage.disable_instance_profile`.
#[serde(skip)]
pub disable_credential_loader: bool,
/// Runtime-only override for whether ambient credential chains are allowed.
///
/// This value is never serialized/persisted and is only used when building
/// operators in the current process.
#[serde(skip)]
pub allow_credential_chain: Option<bool>,
/// Enable this flag to send API in virtual host style.
///
/// - Virtual Host Style: `https://bucket.s3.amazonaws.com`
Expand All @@ -509,6 +516,7 @@ impl Default for StorageS3Config {
master_key: "".to_string(),
root: "".to_string(),
disable_credential_loader: false,
allow_credential_chain: None,
enable_virtual_host_style: false,
role_arn: "".to_string(),
external_id: "".to_string(),
Expand All @@ -527,6 +535,7 @@ impl Debug for StorageS3Config {
.field("storage_class", &self.storage_class)
.field("root", &self.root)
.field("disable_credential_loader", &self.disable_credential_loader)
.field("allow_credential_chain", &self.allow_credential_chain)
.field("enable_virtual_host_style", &self.enable_virtual_host_style)
.field("role_arn", &self.role_arn)
.field("external_id", &self.external_id)
Expand Down
10 changes: 10 additions & 0 deletions src/meta/app/src/principal/user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,11 @@ pub struct StageInfo {
pub stage_name: String,
pub stage_type: StageType,
pub stage_params: StageParams,
/// Runtime flag to allow ambient credential chain usage for stage operators.
///
/// This value is not persisted to meta and must be set explicitly when needed.
#[serde(skip)]
pub allow_credential_chain: bool,
// on `COPY INTO xx FROM 's3://xxx?ak=?&sk=?'`, the URL(ExternalLocation) will be treated as an temporary stage.
pub is_temporary: bool,
pub file_format_params: FileFormatParams,
Expand Down Expand Up @@ -473,6 +478,11 @@ impl StageInfo {
self
}

pub fn with_allow_credential_chain(mut self, allow: bool) -> StageInfo {
self.allow_credential_chain = allow;
self
}

/// Get the prefix of stage.
///
/// Use this function to get the prefix of this stage in the data operator.
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/src/config_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ impl FromToProto for StorageS3Config {
root: p.root,
master_key: p.master_key,
disable_credential_loader: p.disable_credential_loader,
allow_credential_chain: None,
enable_virtual_host_style: p.enable_virtual_host_style,
role_arn: p.role_arn,
external_id: p.external_id,
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/src/stage_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ impl FromToProto for mt::principal::StageInfo {
|| Incompatible::new("StageInfo.stage_params cannot be None".to_string()),
)?)?,
is_temporary: false,
allow_credential_chain: false,
file_format_params,
copy_options: mt::principal::CopyOptions::from_pb(p.copy_options.ok_or_else(
|| Incompatible::new("StageInfo.copy_options cannot be None".to_string()),
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/user_proto_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub(crate) fn test_fs_stage_info() -> mt::principal::StageInfo {
}),
},
is_temporary: false,
allow_credential_chain: false,
file_format_params: mt::principal::FileFormatParams::Json(
mt::principal::JsonFileFormatParams {
compression: mt::principal::StageFileCompression::Bz2,
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/v025_user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ fn test_decode_v25_user_stage() -> anyhow::Result<()> {
}),
},
is_temporary: false,
allow_credential_chain: false,
file_format_params: mt::principal::FileFormatParams::Json(
mt::principal::JsonFileFormatParams {
compression: mt::principal::StageFileCompression::Bz2,
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/v035_user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ fn test_decode_v35_user_stage() -> anyhow::Result<()> {
}),
},
is_temporary: false,
allow_credential_chain: false,
file_format_params: mt::principal::FileFormatParams::Json(
mt::principal::JsonFileFormatParams {
compression: mt::principal::StageFileCompression::Bz2,
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/v042_s3_stage_new_field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ fn test_decode_v42_s3_stage_new_field() -> anyhow::Result<()> {
}),
},
is_temporary: false,
allow_credential_chain: false,
file_format_params: mt::principal::FileFormatParams::Json(
mt::principal::JsonFileFormatParams {
compression: mt::principal::StageFileCompression::Bz2,
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/v066_stage_create_on.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ fn test_decode_v66_stage() -> anyhow::Result<()> {
}),
},
is_temporary: false,
allow_credential_chain: false,
file_format_params: mt::principal::FileFormatParams::Parquet(
mt::principal::ParquetFileFormatParams {
compression: StageFileCompression::Zstd,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ fn test_v077_s3_remove_allow_anonymous() -> anyhow::Result<()> {
}),
},
is_temporary: false,
allow_credential_chain: false,
file_format_params: mt::principal::FileFormatParams::Json(
mt::principal::JsonFileFormatParams {
compression: mt::principal::StageFileCompression::Bz2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ fn test_v117_webhdfs_add_disable_list_batch() -> anyhow::Result<()> {
}),
},
is_temporary: false,
allow_credential_chain: false,
file_format_params: mt::principal::FileFormatParams::Json(
mt::principal::JsonFileFormatParams {
compression: mt::principal::StageFileCompression::Bz2,
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/v118_webhdfs_add_user_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ fn test_v118_webhdfs_add_user_name() -> anyhow::Result<()> {
}),
},
is_temporary: false,
allow_credential_chain: false,
file_format_params: mt::principal::FileFormatParams::Json(
mt::principal::JsonFileFormatParams {
compression: mt::principal::StageFileCompression::Bz2,
Expand Down
16 changes: 16 additions & 0 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,14 @@ pub struct StorageConfig {
#[clap(long = "storage-allow-insecure")]
pub allow_insecure: bool,

/// Disable loading credentials from env/shared config/web identity files globally.
#[clap(long = "storage-disable-config-load")]
pub disable_config_load: bool,

/// Disable all instance-profile based credential providers globally.
#[clap(long = "storage-disable-instance-profile")]
pub disable_instance_profile: bool,

#[clap(long, value_name = "VALUE", default_value_t)]
pub storage_retry_timeout: u64,

Expand Down Expand Up @@ -432,6 +440,8 @@ impl From<InnerStorageConfig> for StorageConfig {
storage_num_cpus: inner.num_cpus,
typ: "".to_string(),
allow_insecure: inner.allow_insecure,
disable_config_load: inner.disable_config_load,
disable_instance_profile: inner.disable_instance_profile,
// use default for each config instead of using `..Default::default`
// using `..Default::default` is calling `Self::default`
// and `Self::default` relies on `InnerStorage::into()`
Expand Down Expand Up @@ -542,6 +552,9 @@ impl From<InnerStorageConfig> for StorageConfig {
v => unreachable!("{v:?} should not be used as storage backend"),
}

cfg.disable_config_load = inner.disable_config_load;
cfg.disable_instance_profile = inner.disable_instance_profile;

cfg
}
}
Expand Down Expand Up @@ -587,6 +600,8 @@ impl TryInto<InnerStorageConfig> for StorageConfig {
Ok(InnerStorageConfig {
num_cpus: self.storage_num_cpus,
allow_insecure: self.allow_insecure,
disable_config_load: self.disable_config_load,
disable_instance_profile: self.disable_instance_profile,
params: {
match self.typ.as_str() {
"azblob" => {
Expand Down Expand Up @@ -1008,6 +1023,7 @@ impl TryInto<InnerStorageS3Config> for S3StorageConfig {
master_key: self.master_key,
root: self.root,
disable_credential_loader: false,
allow_credential_chain: None,
enable_virtual_host_style: self.enable_virtual_host_style,
role_arn: self.s3_role_arn,
external_id: self.s3_external_id,
Expand Down
12 changes: 12 additions & 0 deletions src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ impl InnerConfig {
// Handle auto detect for storage params.
cfg.storage.params = cfg.storage.params.auto_detect().await?;

// Set default allow_credential_chain to true for config storage params.
if let StorageParams::S3(s3) = &mut cfg.storage.params
&& s3.allow_credential_chain.is_none()
{
s3.allow_credential_chain = Some(true);
}
if let Some(StorageParams::S3(s3)) = &mut cfg.spill.storage_params
&& s3.allow_credential_chain.is_none()
{
s3.allow_credential_chain = Some(true);
}

if check_meta {
cfg.meta.check_valid()?;
}
Expand Down
21 changes: 20 additions & 1 deletion src/query/service/src/history_tables/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::TableSchemaRef;
use databend_common_meta_app::storage::StorageParams;
use databend_common_sql::Planner;
use databend_common_sql::binder::parse_uri_location;
use databend_common_sql::plans::Plan;
Expand Down Expand Up @@ -154,7 +155,10 @@ pub async fn should_reset(

// External1 -> External2
// return error to prevent cyclic conversion
if current_storage_params != Some(&new_storage_params) {
let is_same_params = current_storage_params
.map(|sp| storage_params_equal(sp, &new_storage_params))
.unwrap_or(false);
if !is_same_params {
info!(
"Storage parameters have changed, current {:?} vs new {:?}",
current_storage_params, new_storage_params
Expand Down Expand Up @@ -186,6 +190,21 @@ pub async fn get_log_table(context: Arc<QueryContext>) -> Result<Option<Arc<dyn
Ok(Some(table?))
}

fn storage_params_equal(lhs: &StorageParams, rhs: &StorageParams) -> bool {
match (lhs, rhs) {
(StorageParams::S3(left), StorageParams::S3(right)) => {
let mut left = left.clone();
let mut right = right.clone();
left.disable_credential_loader = false;
right.disable_credential_loader = false;
left.allow_credential_chain = None;
right.allow_credential_chain = None;
left == right
}
_ => lhs == rhs,
}
}

#[cfg(test)]
mod tests {
mod alter_table_tests {
Expand Down
Loading