Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/common/storage/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,31 @@ pub struct StorageConfig {
pub num_cpus: u64,
pub allow_insecure: bool,
pub params: StorageParams,
/// Global switches that affect the ambient credential chain behavior.
///
/// Notes:
/// - These are runtime-only controls and are not persisted in meta.
/// - They apply to all storage operators created in this process.
pub disable_config_load: bool,
pub disable_instance_profile: bool,
}

/// Runtime-only switches for ambient credential chain behavior.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CredentialChainConfig {
pub disable_config_load: bool,
pub disable_instance_profile: bool,
}

impl CredentialChainConfig {
pub fn init(cfg: CredentialChainConfig) -> databend_common_exception::Result<()> {
GlobalInstance::set(cfg);
Ok(())
}

pub fn try_get() -> Option<CredentialChainConfig> {
GlobalInstance::try_get()
}
}

// TODO: This config should be moved out of common-storage crate.
Expand Down
43 changes: 37 additions & 6 deletions src/common/storage/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ use opendal_layer_immutable_index::ImmutableIndexLayer;

use crate::StorageConfig;
use crate::StorageHttpClient;
use crate::config::CredentialChainConfig;
use crate::http_client::get_storage_http_client;
use crate::metrics_layer::METRICS_LAYER;
use crate::operator_cache::get_operator_cache;
Expand All @@ -71,6 +72,15 @@ use crate::runtime_layer::RuntimeLayer;
static METRIC_OPENDAL_RETRIES_COUNT: LazyLock<FamilyCounter<Vec<(&'static str, String)>>> =
LazyLock::new(|| register_counter_family("opendal_retries_count"));

fn set_allow_credential_chain_if_missing(params: &mut StorageParams, allow: bool) {
if let StorageParams::S3(cfg) = params {
if cfg.allow_credential_chain.is_some() {
return;
}
cfg.allow_credential_chain = Some(allow);
}
}

/// init_operator will init an opendal operator based on storage config.
pub fn init_operator(cfg: &StorageParams) -> Result<Operator> {
let cache = get_operator_cache();
Expand Down Expand Up @@ -416,19 +426,31 @@ fn init_s3_operator(cfg: &StorageS3Config) -> Result<impl Builder> {
builder = builder.default_storage_class(cfg.storage_class.to_string().as_ref())
}

// Disable credential loader
if cfg.disable_credential_loader {
// If allow_credential_chain is not set, default to false for security.
let allow_credential_chain = cfg.allow_credential_chain.unwrap_or(false);

// Disallowing the credential chain forces unsigned or fully explicit access.
if !allow_credential_chain {
builder = builder.disable_config_load().disable_ec2_metadata();
} else if let Some(global) = CredentialChainConfig::try_get() {
// Apply global limits to the credential chain.
if global.disable_config_load {
builder = builder.disable_config_load();
}
if global.disable_instance_profile {
builder = builder.disable_ec2_metadata();
}
}

// If credential loading is disabled and no credentials are provided, use unsigned requests.
// This allows accessing public buckets reliably in environments where signing could be rejected.
if cfg.disable_credential_loader
if !allow_credential_chain
&& cfg.access_key_id.is_empty()
&& cfg.secret_access_key.is_empty()
&& cfg.security_token.is_empty()
&& cfg.role_arn.is_empty()
{
// Allow anonymous is actually forcing unsigned requests in OpenDAL.
builder = builder.allow_anonymous();
}

Expand Down Expand Up @@ -569,6 +591,10 @@ impl DataOperator {
conf: &StorageConfig,
spill_params: Option<StorageParams>,
) -> databend_common_exception::Result<()> {
CredentialChainConfig::init(CredentialChainConfig {
disable_config_load: conf.disable_config_load,
disable_instance_profile: conf.disable_instance_profile,
})?;
GlobalInstance::set(Self::try_create(conf, spill_params).await?);

Ok(())
Expand All @@ -579,19 +605,24 @@ impl DataOperator {
conf: &StorageConfig,
spill_params: Option<StorageParams>,
) -> databend_common_exception::Result<DataOperator> {
let operator = init_operator(&conf.params)?;
check_operator(&operator, &conf.params).await?;
let mut data_params = conf.params.clone();
// Global data operator must allow ambient credentials unless explicitly disabled.
set_allow_credential_chain_if_missing(&mut data_params, true);
let operator = init_operator(&data_params)?;
check_operator(&operator, &data_params).await?;

// Init spill operator
let mut params = spill_params.as_ref().unwrap_or(&conf.params).clone();
// Spill operator shares the same default credential policy as the global operator.
set_allow_credential_chain_if_missing(&mut params, true);
// Always use Standard storage class if spill to s3 object storage
set_s3_storage_class(&mut params, S3StorageClass::Standard);
let spill_operator = init_operator(&params)?;
check_operator(&spill_operator, &params).await?;

Ok(DataOperator {
operator,
params: conf.params.clone(),
params: data_params,
spill_operator,
spill_params,
})
Expand Down
9 changes: 5 additions & 4 deletions src/common/storage/src/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,13 @@ impl StageFileInfo {

pub fn init_stage_operator(stage_info: &StageInfo) -> Result<Operator> {
if stage_info.stage_type == StageType::External {
// External S3 stages don't load credentials by default; `role_arn` opts into assume-role.
// External S3 stages disallow the ambient credential chain by default.
// `role_arn` opts into using the credential chain as the source credential.
let storage = match stage_info.stage_params.storage.clone() {
StorageParams::S3(mut cfg) => {
if cfg.role_arn.is_empty() {
cfg.disable_credential_loader = true;
}
let allow_credential_chain =
stage_info.allow_credential_chain || !cfg.role_arn.is_empty();
cfg.allow_credential_chain = Some(allow_credential_chain);
StorageParams::S3(cfg)
}
v => v,
Expand Down
15 changes: 11 additions & 4 deletions src/meta/app-storage/src/storage_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,6 @@ impl StorageParams {
s1.master_key = s2.master_key;
s1.network_config = s2.network_config;
s1.disable_credential_loader = s2.disable_credential_loader;
// Remove disable_credential_loader is role_arn has been set.
if !s1.role_arn.is_empty() {
s1.disable_credential_loader = false;
}
Ok(Self::S3(s1))
}
(s1, s2) => Err(ErrorCode::StorageOther(format!(
Expand Down Expand Up @@ -483,7 +479,16 @@ pub struct StorageS3Config {
pub root: String,
/// This flag is used internally to control whether databend load
/// credentials from environment like env, profile and web token.
///
/// Deprecated: prefer the runtime `allow_credential_chain` policy plus global
/// `storage.disable_config_load` / `storage.disable_instance_profile`.
pub disable_credential_loader: bool,
/// Runtime-only override for whether ambient credential chains are allowed.
///
/// This value is never serialized/persisted and is only used when building
/// operators in the current process.
#[serde(skip)]
pub allow_credential_chain: Option<bool>,
/// Enable this flag to send API in virtual host style.
///
/// - Virtual Host Style: `https://bucket.s3.amazonaws.com`
Expand All @@ -509,6 +514,7 @@ impl Default for StorageS3Config {
master_key: "".to_string(),
root: "".to_string(),
disable_credential_loader: false,
allow_credential_chain: None,
enable_virtual_host_style: false,
role_arn: "".to_string(),
external_id: "".to_string(),
Expand All @@ -527,6 +533,7 @@ impl Debug for StorageS3Config {
.field("storage_class", &self.storage_class)
.field("root", &self.root)
.field("disable_credential_loader", &self.disable_credential_loader)
.field("allow_credential_chain", &self.allow_credential_chain)
.field("enable_virtual_host_style", &self.enable_virtual_host_style)
.field("role_arn", &self.role_arn)
.field("external_id", &self.external_id)
Expand Down
10 changes: 10 additions & 0 deletions src/meta/app/src/principal/user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,11 @@ pub struct StageInfo {
pub stage_name: String,
pub stage_type: StageType,
pub stage_params: StageParams,
/// Runtime flag to allow ambient credential chain usage for stage operators.
///
/// This value is not persisted to meta and must be set explicitly when needed.
#[serde(skip)]
pub allow_credential_chain: bool,
// on `COPY INTO xx FROM 's3://xxx?ak=?&sk=?'`, the URL(ExternalLocation) will be treated as an temporary stage.
pub is_temporary: bool,
pub file_format_params: FileFormatParams,
Expand Down Expand Up @@ -473,6 +478,11 @@ impl StageInfo {
self
}

pub fn with_allow_credential_chain(mut self, allow: bool) -> StageInfo {
self.allow_credential_chain = allow;
self
}

/// Get the prefix of stage.
///
/// Use this function to get the prefix of this stage in the data operator.
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/src/config_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ impl FromToProto for StorageS3Config {
root: p.root,
master_key: p.master_key,
disable_credential_loader: p.disable_credential_loader,
allow_credential_chain: None,
enable_virtual_host_style: p.enable_virtual_host_style,
role_arn: p.role_arn,
external_id: p.external_id,
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/src/stage_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ impl FromToProto for mt::principal::StageInfo {
|| Incompatible::new("StageInfo.stage_params cannot be None".to_string()),
)?)?,
is_temporary: false,
allow_credential_chain: false,
file_format_params,
copy_options: mt::principal::CopyOptions::from_pb(p.copy_options.ok_or_else(
|| Incompatible::new("StageInfo.copy_options cannot be None".to_string()),
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/user_proto_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub(crate) fn test_fs_stage_info() -> mt::principal::StageInfo {
}),
},
is_temporary: false,
allow_credential_chain: false,
file_format_params: mt::principal::FileFormatParams::Json(
mt::principal::JsonFileFormatParams {
compression: mt::principal::StageFileCompression::Bz2,
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/v025_user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ fn test_decode_v25_user_stage() -> anyhow::Result<()> {
}),
},
is_temporary: false,
allow_credential_chain: false,
file_format_params: mt::principal::FileFormatParams::Json(
mt::principal::JsonFileFormatParams {
compression: mt::principal::StageFileCompression::Bz2,
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/v035_user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ fn test_decode_v35_user_stage() -> anyhow::Result<()> {
}),
},
is_temporary: false,
allow_credential_chain: false,
file_format_params: mt::principal::FileFormatParams::Json(
mt::principal::JsonFileFormatParams {
compression: mt::principal::StageFileCompression::Bz2,
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/v042_s3_stage_new_field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ fn test_decode_v42_s3_stage_new_field() -> anyhow::Result<()> {
}),
},
is_temporary: false,
allow_credential_chain: false,
file_format_params: mt::principal::FileFormatParams::Json(
mt::principal::JsonFileFormatParams {
compression: mt::principal::StageFileCompression::Bz2,
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/v066_stage_create_on.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ fn test_decode_v66_stage() -> anyhow::Result<()> {
}),
},
is_temporary: false,
allow_credential_chain: false,
file_format_params: mt::principal::FileFormatParams::Parquet(
mt::principal::ParquetFileFormatParams {
compression: StageFileCompression::Zstd,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ fn test_v077_s3_remove_allow_anonymous() -> anyhow::Result<()> {
}),
},
is_temporary: false,
allow_credential_chain: false,
file_format_params: mt::principal::FileFormatParams::Json(
mt::principal::JsonFileFormatParams {
compression: mt::principal::StageFileCompression::Bz2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ fn test_v117_webhdfs_add_disable_list_batch() -> anyhow::Result<()> {
}),
},
is_temporary: false,
allow_credential_chain: false,
file_format_params: mt::principal::FileFormatParams::Json(
mt::principal::JsonFileFormatParams {
compression: mt::principal::StageFileCompression::Bz2,
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/v118_webhdfs_add_user_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ fn test_v118_webhdfs_add_user_name() -> anyhow::Result<()> {
}),
},
is_temporary: false,
allow_credential_chain: false,
file_format_params: mt::principal::FileFormatParams::Json(
mt::principal::JsonFileFormatParams {
compression: mt::principal::StageFileCompression::Bz2,
Expand Down
16 changes: 16 additions & 0 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,14 @@ pub struct StorageConfig {
#[clap(long = "storage-allow-insecure")]
pub allow_insecure: bool,

/// Disable loading credentials from env/shared config/web identity files globally.
#[clap(long = "storage-disable-config-load")]
pub disable_config_load: bool,

/// Disable all instance-profile based credential providers globally.
#[clap(long = "storage-disable-instance-profile")]
pub disable_instance_profile: bool,

#[clap(long, value_name = "VALUE", default_value_t)]
pub storage_retry_timeout: u64,

Expand Down Expand Up @@ -432,6 +440,8 @@ impl From<InnerStorageConfig> for StorageConfig {
storage_num_cpus: inner.num_cpus,
typ: "".to_string(),
allow_insecure: inner.allow_insecure,
disable_config_load: inner.disable_config_load,
disable_instance_profile: inner.disable_instance_profile,
// use default for each config instead of using `..Default::default`
// using `..Default::default` is calling `Self::default`
// and `Self::default` relies on `InnerStorage::into()`
Expand Down Expand Up @@ -542,6 +552,9 @@ impl From<InnerStorageConfig> for StorageConfig {
v => unreachable!("{v:?} should not be used as storage backend"),
}

cfg.disable_config_load = inner.disable_config_load;
cfg.disable_instance_profile = inner.disable_instance_profile;

cfg
}
}
Expand Down Expand Up @@ -587,6 +600,8 @@ impl TryInto<InnerStorageConfig> for StorageConfig {
Ok(InnerStorageConfig {
num_cpus: self.storage_num_cpus,
allow_insecure: self.allow_insecure,
disable_config_load: self.disable_config_load,
disable_instance_profile: self.disable_instance_profile,
params: {
match self.typ.as_str() {
"azblob" => {
Expand Down Expand Up @@ -1008,6 +1023,7 @@ impl TryInto<InnerStorageS3Config> for S3StorageConfig {
master_key: self.master_key,
root: self.root,
disable_credential_loader: false,
allow_credential_chain: None,
enable_virtual_host_style: self.enable_virtual_host_style,
role_arn: self.s3_role_arn,
external_id: self.s3_external_id,
Expand Down
9 changes: 6 additions & 3 deletions src/query/service/tests/it/sql/expr/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ async fn test_parse_uri_location() -> Result<()> {
security_token: "session_token".to_string(),
master_key: "".to_string(),
root: "/tmp/".to_string(),
disable_credential_loader: true,
disable_credential_loader: false,
allow_credential_chain: None,
enable_virtual_host_style: false,
role_arn: "".to_string(),
external_id: "".to_string(),
Expand Down Expand Up @@ -212,7 +213,8 @@ async fn test_parse_uri_location() -> Result<()> {
security_token: "security_token".to_string(),
master_key: "".to_string(),
root: "/tmp/".to_string(),
disable_credential_loader: true,
disable_credential_loader: false,
allow_credential_chain: None,
enable_virtual_host_style: false,
role_arn: "".to_string(),
external_id: "".to_string(),
Expand Down Expand Up @@ -246,7 +248,7 @@ async fn test_parse_uri_location() -> Result<()> {
security_token: "security_token".to_string(),
master_key: "".to_string(),
root: "/tmp/".to_string(),
disable_credential_loader: true,
disable_credential_loader: false,
enable_virtual_host_style: false,
role_arn: "".to_string(),
external_id: "".to_string(),
Expand Down Expand Up @@ -276,6 +278,7 @@ async fn test_parse_uri_location() -> Result<()> {
master_key: "".to_string(),
root: "/tmp/".to_string(),
disable_credential_loader: false,
allow_credential_chain: None,
enable_virtual_host_style: false,
role_arn: "aws::iam::xxxx".to_string(),
external_id: "".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
| 'storage' | 'cos.root' | '' | '' |
| 'storage' | 'cos.secret_id' | '' | '' |
| 'storage' | 'cos.secret_key' | '' | '' |
| 'storage' | 'disable_config_load' | 'false' | '' |
| 'storage' | 'disable_instance_profile' | 'false' | '' |
| 'storage' | 'fs.data_path' | '_data' | '' |
| 'storage' | 'gcs.bucket' | '' | '' |
| 'storage' | 'gcs.credential' | '' | '' |
Expand Down
Loading
Loading