diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index df1a033ecc380..733db8abece99 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -129,7 +129,7 @@ const CATALOG_HIVE: &str = "hive"; /// It's forbidden to do any breaking changes on this struct. /// Only adding new fields is allowed. /// This same rules should be applied to all fields of this struct. -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Args)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Args)] #[serde(default)] pub struct Config { // Query engine config. @@ -3189,7 +3189,7 @@ impl TryInto for LocalConfig { } } -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Args)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Args)] #[serde(default)] pub struct CacheConfig { /// The data in meta-service using key `TenantOwnershipObjectIdent` @@ -3533,7 +3533,7 @@ impl Default for DiskCacheKeyReloadPolicy { } } -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Args)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Args)] #[serde(default)] pub struct DiskCacheConfig { /// Max bytes of cached raw table data. Default 20GB, set it to 0 to disable it. @@ -3570,6 +3570,10 @@ pub struct DiskCacheConfig { )] #[serde(default = "bool_true")] pub sync_data: bool, + + #[clap(flatten)] + #[serde(default)] + pub ratios: DiskCacheRatioConfig, } impl Default for DiskCacheConfig { @@ -3578,6 +3582,72 @@ impl Default for DiskCacheConfig { } } +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Args)] +#[serde(default)] +pub struct DiskCacheRatioConfig { + /// Weight used to derive on-disk column data cache capacity when no explicit size is provided. + #[clap( + long = "cache-disk-column-data-ratio", + value_name = "VALUE", + default_value = "1" + )] + pub column_data: f64, + + /// Weight used to derive on-disk bloom index filter cache capacity when no explicit size is provided. + #[clap( + long = "cache-disk-bloom-index-filter-ratio", + value_name = "VALUE", + default_value = "0" + )] + pub bloom_index_filter: f64, + + /// Weight used to derive on-disk bloom index meta cache capacity when no explicit size is provided. + #[clap( + long = "cache-disk-bloom-index-meta-ratio", + value_name = "VALUE", + default_value = "0" + )] + pub bloom_index_meta: f64, + + /// Weight used to derive on-disk inverted index filter cache capacity when no explicit size is provided. + #[clap( + long = "cache-disk-inverted-index-filter-ratio", + value_name = "VALUE", + default_value = "0" + )] + pub inverted_index_filter: f64, + + /// Weight used to derive on-disk inverted index meta cache capacity when no explicit size is provided. + #[clap( + long = "cache-disk-inverted-index-meta-ratio", + value_name = "VALUE", + default_value = "0" + )] + pub inverted_index_meta: f64, + + /// Weight used to derive on-disk vector index filter cache capacity when no explicit size is provided. + #[clap( + long = "cache-disk-vector-index-filter-ratio", + value_name = "VALUE", + default_value = "0" + )] + pub vector_index_filter: f64, + + /// Weight used to derive on-disk vector index meta cache capacity when no explicit size is provided. + #[clap( + long = "cache-disk-vector-index-meta-ratio", + value_name = "VALUE", + default_value = "0" + )] + pub vector_index_meta: f64, +} + +impl Default for DiskCacheRatioConfig { + fn default() -> Self { + inner::DiskCacheRatioConfig::default().into() + } +} + #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Args)] #[serde(default)] pub struct SpillConfig { @@ -3850,6 +3920,7 @@ mod cache_config_converters { max_bytes: value.max_bytes, path: value.path, sync_data: value.sync_data, + ratios: value.ratios.into(), }) } } @@ -3860,6 +3931,35 @@ mod cache_config_converters { max_bytes: value.max_bytes, path: value.path, sync_data: value.sync_data, + ratios: value.ratios.into(), + } + } + } + + impl From for inner::DiskCacheRatioConfig { + fn from(value: DiskCacheRatioConfig) -> Self { + Self { + column_data: value.column_data, + bloom_index_filter: value.bloom_index_filter, + bloom_index_meta: value.bloom_index_meta, + inverted_index_filter: value.inverted_index_filter, + inverted_index_meta: value.inverted_index_meta, + vector_index_filter: value.vector_index_filter, + vector_index_meta: value.vector_index_meta, + } + } + } + + impl From for DiskCacheRatioConfig { + fn from(value: inner::DiskCacheRatioConfig) -> Self { + Self { + column_data: value.column_data, + bloom_index_filter: value.bloom_index_filter, + bloom_index_meta: value.bloom_index_meta, + inverted_index_filter: value.inverted_index_filter, + inverted_index_meta: value.inverted_index_meta, + vector_index_filter: value.vector_index_filter, + vector_index_meta: value.vector_index_meta, } } } diff --git a/src/query/config/src/inner.rs b/src/query/config/src/inner.rs index 5d17517fb0329..f9a218132bb9c 100644 --- a/src/query/config/src/inner.rs +++ b/src/query/config/src/inner.rs @@ -44,7 +44,7 @@ use crate::BuiltInConfig; /// Inner config for query. /// /// All function should implement based on this Config. -#[derive(Clone, Default, PartialEq, Eq)] +#[derive(Clone, Default, PartialEq)] pub struct InnerConfig { // Query engine config. pub query: QueryConfig, @@ -554,7 +554,7 @@ impl Default for LocalConfig { } } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq)] pub struct CacheConfig { /// The data in meta-service using key `TenantOwnershipObjectIdent` pub meta_service_ownership_cache: bool, @@ -726,7 +726,7 @@ impl Display for DiskCacheKeyReloadPolicy { } } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq)] pub struct DiskCacheConfig { /// Max bytes of cached raw table data. Default 20GB, set it to 0 to disable it. pub max_bytes: u64, @@ -739,6 +739,9 @@ pub struct DiskCacheConfig { /// it's recommended to set this to true to prevent the container from /// being killed due to high dirty page memory usage. pub sync_data: bool, + + /// Ratio weights used to derive per-cache disk capacities when explicit sizes are not specified. + pub ratios: DiskCacheRatioConfig, } impl Default for DiskCacheConfig { @@ -747,6 +750,32 @@ impl Default for DiskCacheConfig { max_bytes: 21474836480, path: "./.databend/_cache".to_owned(), sync_data: true, + ratios: DiskCacheRatioConfig::default(), + } + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct DiskCacheRatioConfig { + pub column_data: f64, + pub bloom_index_filter: f64, + pub bloom_index_meta: f64, + pub inverted_index_filter: f64, + pub inverted_index_meta: f64, + pub vector_index_filter: f64, + pub vector_index_meta: f64, +} + +impl Default for DiskCacheRatioConfig { + fn default() -> Self { + Self { + column_data: 1.0, + bloom_index_filter: 0.0, + bloom_index_meta: 0.0, + inverted_index_filter: 0.0, + inverted_index_meta: 0.0, + vector_index_filter: 0.0, + vector_index_meta: 0.0, } } } diff --git a/src/query/config/src/lib.rs b/src/query/config/src/lib.rs index 07efa5a0bc065..445c18516ca3c 100644 --- a/src/query/config/src/lib.rs +++ b/src/query/config/src/lib.rs @@ -47,6 +47,7 @@ pub use inner::CatalogConfig; pub use inner::CatalogHiveConfig; pub use inner::DiskCacheConfig as DiskCacheInnerConfig; pub use inner::DiskCacheKeyReloadPolicy; +pub use inner::DiskCacheRatioConfig as DiskCacheRatioInnerConfig; pub use inner::InnerConfig; pub use inner::MetaConfig; pub use inner::QueryConfig; diff --git a/src/query/storages/common/cache/src/manager.rs b/src/query/storages/common/cache/src/manager.rs index 192a47ed27388..3f68a305198db 100644 --- a/src/query/storages/common/cache/src/manager.rs +++ b/src/query/storages/common/cache/src/manager.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::AtomicBool; @@ -21,6 +22,7 @@ use databend_common_base::base::GlobalInstance; use databend_common_config::CacheConfig; use databend_common_config::CacheStorageTypeInnerConfig; use databend_common_config::DiskCacheKeyReloadPolicy; +use databend_common_config::DiskCacheRatioInnerConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use log::info; @@ -65,6 +67,87 @@ static DEFAULT_PARQUET_META_DATA_CACHE_ITEMS: usize = 3000; // Eventually, we should refactor the compute node configurations instead, to make those options more sensible. const TABLE_DATA_DISK_CACHE_SIZE_THRESHOLD: usize = 1024; +#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] +enum DiskCacheKind { + ColumnData, + BloomIndexFilter, + BloomIndexMeta, + InvertedIndexFilter, + InvertedIndexMeta, + VectorIndexFilter, + VectorIndexMeta, +} + +impl DiskCacheKind { + const ALL: [DiskCacheKind; 7] = [ + DiskCacheKind::ColumnData, + DiskCacheKind::BloomIndexFilter, + DiskCacheKind::BloomIndexMeta, + DiskCacheKind::InvertedIndexFilter, + DiskCacheKind::InvertedIndexMeta, + DiskCacheKind::VectorIndexFilter, + DiskCacheKind::VectorIndexMeta, + ]; + + fn as_str(&self) -> &'static str { + match self { + DiskCacheKind::ColumnData => "column_data", + DiskCacheKind::BloomIndexFilter => "bloom_index_filter", + DiskCacheKind::BloomIndexMeta => "bloom_index_meta", + DiskCacheKind::InvertedIndexFilter => "inverted_index_filter", + DiskCacheKind::InvertedIndexMeta => "inverted_index_meta", + DiskCacheKind::VectorIndexFilter => "vector_index_filter", + DiskCacheKind::VectorIndexMeta => "vector_index_meta", + } + } +} + +impl fmt::Display for DiskCacheKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +#[derive(Clone, Copy)] +enum AllocationSource { + Override, + Ratio, +} + +impl AllocationSource { + fn as_str(&self) -> &'static str { + match self { + AllocationSource::Override => "override", + AllocationSource::Ratio => "ratio", + } + } +} + +#[derive(Default, Debug)] +struct DiskCacheSizes { + column_data: usize, + bloom_index_filter: usize, + bloom_index_meta: usize, + inverted_index_filter: usize, + inverted_index_meta: usize, + vector_index_filter: usize, + vector_index_meta: usize, +} + +impl DiskCacheSizes { + fn set(&mut self, kind: DiskCacheKind, bytes: usize) { + match kind { + DiskCacheKind::ColumnData => self.column_data = bytes, + DiskCacheKind::BloomIndexFilter => self.bloom_index_filter = bytes, + DiskCacheKind::BloomIndexMeta => self.bloom_index_meta = bytes, + DiskCacheKind::InvertedIndexFilter => self.inverted_index_filter = bytes, + DiskCacheKind::InvertedIndexMeta => self.inverted_index_meta = bytes, + DiskCacheKind::VectorIndexFilter => self.vector_index_filter = bytes, + DiskCacheKind::VectorIndexMeta => self.vector_index_meta = bytes, + } + } +} + #[derive(Default)] struct CacheSlot { cache: RwLock>, @@ -158,6 +241,7 @@ impl CacheManager { let tenant_id = tenant_id.into(); let on_disk_cache_sync_data = config.disk_cache_config.sync_data; let allows_on_disk_cache = AtomicBool::new(ee_mode); + let disk_cache_sizes = derive_disk_cache_sizes(config)?; let on_disk_cache_queue_size: u32 = if config.table_data_cache_population_queue_size > 0 { config.table_data_cache_population_queue_size @@ -196,7 +280,7 @@ impl CacheManager { Unit::Bytes, &table_data_on_disk_cache_path, on_disk_cache_queue_size, - config.disk_cache_config.max_bytes as usize, + disk_cache_sizes.column_data, config.data_cache_key_reload_policy.clone(), on_disk_cache_sync_data, ee_mode, @@ -273,7 +357,7 @@ impl CacheManager { Unit::Bytes, &bloom_filter_on_disk_cache_path, on_disk_cache_queue_size, - config.disk_cache_table_bloom_index_data_size as usize, + disk_cache_sizes.bloom_index_filter, DiskCacheKeyReloadPolicy::Fuzzy, on_disk_cache_sync_data, ee_mode, @@ -291,7 +375,7 @@ impl CacheManager { Unit::Count, &bloom_filter_meta_on_disk_cache_path, on_disk_cache_queue_size, - config.disk_cache_table_bloom_index_meta_size as usize, + disk_cache_sizes.bloom_index_meta, DiskCacheKeyReloadPolicy::Fuzzy, on_disk_cache_sync_data, ee_mode, @@ -309,7 +393,7 @@ impl CacheManager { Unit::Count, &inverted_index_meta_on_disk_cache_path, on_disk_cache_queue_size, - config.disk_cache_inverted_index_meta_size as usize, + disk_cache_sizes.inverted_index_meta, DiskCacheKeyReloadPolicy::Fuzzy, on_disk_cache_sync_data, ee_mode, @@ -335,7 +419,7 @@ impl CacheManager { Unit::Bytes, &inverted_index_file_on_disk_cache_path, on_disk_cache_queue_size, - config.disk_cache_inverted_index_data_size as usize, + disk_cache_sizes.inverted_index_filter, DiskCacheKeyReloadPolicy::Fuzzy, on_disk_cache_sync_data, ee_mode, @@ -353,7 +437,7 @@ impl CacheManager { Unit::Count, &vector_index_meta_on_disk_cache_path, on_disk_cache_queue_size, - config.disk_cache_vector_index_meta_size as usize, + disk_cache_sizes.vector_index_meta, DiskCacheKeyReloadPolicy::Fuzzy, on_disk_cache_sync_data, ee_mode, @@ -379,7 +463,7 @@ impl CacheManager { Unit::Bytes, &vector_index_file_on_disk_cache_path, on_disk_cache_queue_size, - config.disk_cache_vector_index_data_size as usize, + disk_cache_sizes.vector_index_filter, DiskCacheKeyReloadPolicy::Fuzzy, on_disk_cache_sync_data, ee_mode, @@ -851,6 +935,106 @@ impl CacheManager { } } +fn derive_disk_cache_sizes(config: &CacheConfig) -> Result { + let total_bytes = config.disk_cache_config.max_bytes as usize; + let ratios: &DiskCacheRatioInnerConfig = &config.disk_cache_config.ratios; + let mut sizes = DiskCacheSizes::default(); + let mut log_entries: Vec<(DiskCacheKind, usize, AllocationSource)> = Vec::new(); + let mut ratio_entries: Vec<(DiskCacheKind, f64)> = Vec::new(); + let mut ratio_sum = 0.0; + + for kind in DiskCacheKind::ALL { + let override_bytes = disk_cache_override_bytes(kind, config); + if override_bytes > 0 { + let bytes = override_bytes as usize; + sizes.set(kind, bytes); + log_entries.push((kind, bytes, AllocationSource::Override)); + } else { + let ratio = disk_cache_ratio_value(kind, ratios); + if ratio < 0.0 { + return Err(ErrorCode::BadArguments(format!( + "Disk cache ratio for {} must be non-negative, got {}", + kind, ratio + ))); + } + if ratio > 0.0 { + ratio_sum += ratio; + } + ratio_entries.push((kind, ratio)); + } + } + + if ratio_sum == 0.0 { + if let Some(entry) = ratio_entries + .iter_mut() + .find(|(kind, _)| *kind == DiskCacheKind::ColumnData) + { + entry.1 = 1.0; + ratio_sum = 1.0; + } + } + + for (kind, ratio) in ratio_entries { + let bytes = if ratio_sum == 0.0 || ratio <= 0.0 { + 0 + } else { + ((ratio / ratio_sum) * total_bytes as f64).round() as usize + }; + sizes.set(kind, bytes); + log_entries.push((kind, bytes, AllocationSource::Ratio)); + } + + log_disk_cache_allocations(total_bytes, ratio_sum, &log_entries); + Ok(sizes) +} + +fn disk_cache_override_bytes(kind: DiskCacheKind, config: &CacheConfig) -> u64 { + match kind { + DiskCacheKind::ColumnData => 0, + DiskCacheKind::BloomIndexFilter => config.disk_cache_table_bloom_index_data_size, + DiskCacheKind::BloomIndexMeta => config.disk_cache_table_bloom_index_meta_size, + DiskCacheKind::InvertedIndexFilter => config.disk_cache_inverted_index_data_size, + DiskCacheKind::InvertedIndexMeta => config.disk_cache_inverted_index_meta_size, + DiskCacheKind::VectorIndexFilter => config.disk_cache_vector_index_data_size, + DiskCacheKind::VectorIndexMeta => config.disk_cache_vector_index_meta_size, + } +} + +fn disk_cache_ratio_value(kind: DiskCacheKind, ratios: &DiskCacheRatioInnerConfig) -> f64 { + match kind { + DiskCacheKind::ColumnData => ratios.column_data, + DiskCacheKind::BloomIndexFilter => ratios.bloom_index_filter, + DiskCacheKind::BloomIndexMeta => ratios.bloom_index_meta, + DiskCacheKind::InvertedIndexFilter => ratios.inverted_index_filter, + DiskCacheKind::InvertedIndexMeta => ratios.inverted_index_meta, + DiskCacheKind::VectorIndexFilter => ratios.vector_index_filter, + DiskCacheKind::VectorIndexMeta => ratios.vector_index_meta, + } +} + +fn log_disk_cache_allocations( + total_bytes: usize, + weight_sum: f64, + entries: &[(DiskCacheKind, usize, AllocationSource)], +) { + let mut parts = Vec::with_capacity(DiskCacheKind::ALL.len()); + for kind in DiskCacheKind::ALL { + let (bytes, source) = entries + .iter() + .rev() + .find(|(k, _, _)| *k == kind) + .map(|(_, bytes, source)| (*bytes, source.as_str())) + .unwrap_or((0, AllocationSource::Ratio.as_str())); + parts.push(format!("{}={}({})", kind, bytes, source)); + } + info!( + "[CacheManager] Disk cache allocation plan (global {}, weight_sum {:.2}): {}", + total_bytes, + weight_sum, + parts.join(", ") + ); +} + const MEMORY_CACHE_TABLE_DATA: &str = "memory_cache_table_data"; const MEMORY_CACHE_PARQUET_META_DATA: &str = "memory_cache_parquet_meta_data"; const MEMORY_CACHE_PRUNE_PARTITIONS: &str = "memory_cache_prune_partitions"; @@ -1369,4 +1553,50 @@ mod tests { Ok(()) } + + #[test] + fn test_disk_cache_ratio_distribution() -> Result<()> { + let mut cache_config = CacheConfig::default(); + cache_config.disk_cache_config.max_bytes = 1000; + cache_config.disk_cache_config.ratios.bloom_index_filter = 1.0; + + let sizes = derive_disk_cache_sizes(&cache_config)?; + assert_eq!(sizes.column_data, 500); + assert_eq!(sizes.bloom_index_filter, 500); + assert_eq!(sizes.vector_index_filter, 0); + Ok(()) + } + + #[test] + fn test_disk_cache_ratio_fallback() -> Result<()> { + let mut cache_config = CacheConfig::default(); + cache_config.disk_cache_config.max_bytes = 2048; + cache_config.disk_cache_config.ratios.column_data = 0.0; + + let sizes = derive_disk_cache_sizes(&cache_config)?; + assert_eq!(sizes.column_data, 2048); + Ok(()) + } + + #[test] + fn test_disk_cache_ratio_override_precedence() -> Result<()> { + let mut cache_config = CacheConfig::default(); + cache_config.disk_cache_config.max_bytes = 1000; + cache_config.disk_cache_table_bloom_index_data_size = 600; + cache_config.disk_cache_config.ratios.bloom_index_filter = 1.0; + + let sizes = derive_disk_cache_sizes(&cache_config)?; + assert_eq!(sizes.bloom_index_filter, 600); + assert_eq!(sizes.column_data, 1000); + Ok(()) + } + + #[test] + fn test_disk_cache_ratio_negative_should_fail() { + let mut cache_config = CacheConfig::default(); + cache_config.disk_cache_config.ratios.column_data = -0.1; + + let err = derive_disk_cache_sizes(&cache_config).unwrap_err(); + assert!(err.message().contains("Disk cache ratio")); + } }