From 21c17963b5711f04f6a7d58896e40eba874a40cf Mon Sep 17 00:00:00 2001
From: dantengsky
Date: Sat, 25 Jan 2025 13:37:42 +0800
Subject: [PATCH] refactor: refine `replace into` by caching individual BlockMeta

---
 src/query/storages/common/cache/src/caches.rs |  6 +-
 src/query/storages/common/cache/src/lib.rs    |  1 +
 .../mutator/replace_into_operation_agg.rs     | 64 ++++++++++++++-----
 3 files changed, 51 insertions(+), 20 deletions(-)

diff --git a/src/query/storages/common/cache/src/caches.rs b/src/query/storages/common/cache/src/caches.rs
index d86e647c4aec5..b3530b4626841 100644
--- a/src/query/storages/common/cache/src/caches.rs
+++ b/src/query/storages/common/cache/src/caches.rs
@@ -44,7 +44,7 @@ pub type CompactSegmentInfoCache = InMemoryLruCache<CompactSegmentInfo>;
 pub type SegmentBlockMetasCache = InMemoryLruCache<Vec<Arc<BlockMeta>>>;
 
 /// In-memory cache of individual BlockMeta.
-pub type BlockMetaCache = InMemoryLruCache<Arc<BlockMeta>>;
+pub type BlockMetaCache = InMemoryLruCache<BlockMeta>;
 
 /// In memory object cache of TableSnapshot
 pub type TableSnapshotCache = InMemoryLruCache<TableSnapshot>;
@@ -186,8 +186,8 @@ impl From<Vec<Arc<BlockMeta>>> for CacheValue<Vec<Arc<BlockMeta>>> {
     }
 }
 
-impl From<Arc<BlockMeta>> for CacheValue<Arc<BlockMeta>> {
-    fn from(value: Arc<BlockMeta>) -> Self {
+impl From<BlockMeta> for CacheValue<BlockMeta> {
+    fn from(value: BlockMeta) -> Self {
         CacheValue {
             inner: Arc::new(value),
             mem_bytes: 0,
diff --git a/src/query/storages/common/cache/src/lib.rs b/src/query/storages/common/cache/src/lib.rs
index 44b3826a4935d..bd56ea554b3fc 100644
--- a/src/query/storages/common/cache/src/lib.rs
+++ b/src/query/storages/common/cache/src/lib.rs
@@ -25,6 +25,7 @@ mod temp_dir;
 
 pub use cache::CacheAccessor;
 pub use cache::Unit;
+pub use caches::BlockMetaCache;
 pub use caches::CacheValue;
 pub use caches::CachedObject;
 pub use caches::SegmentBlockMetasCache;
diff --git a/src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs b/src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs
index 5eb33fb9192cb..7172ceae31304 100644
--- a/src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs
+++ b/src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs
@@ -43,6 +43,9 @@ use databend_common_metrics::storage::*;
 use databend_common_sql::evaluator::BlockOperator;
 use databend_common_sql::executor::physical_plans::OnConflictField;
 use databend_common_sql::StreamContext;
+use databend_storages_common_cache::BlockMetaCache;
+use databend_storages_common_cache::CacheAccessor;
+use databend_storages_common_cache::CacheManager;
 use databend_storages_common_cache::LoadParams;
 use databend_storages_common_index::filters::Filter;
 use databend_storages_common_index::filters::Xor8Filter;
@@ -100,6 +103,8 @@ struct AggregationContext {
     io_request_semaphore: Arc<Semaphore>,
     // generate stream columns if necessary
     stream_ctx: Option<StreamContext>,
+
+    block_meta_cache: Option<BlockMetaCache>,
 }
 
 // Apply MergeIntoOperations to segments
@@ -209,6 +214,7 @@ impl ReplaceIntoOperationAggregator {
                 block_builder,
                 io_request_semaphore,
                 stream_ctx,
+                block_meta_cache: CacheManager::instance().get_block_meta_cache(),
             }),
         })
     }
@@ -291,6 +297,8 @@ impl ReplaceIntoOperationAggregator {
     #[async_backtrace::framed]
     pub async fn apply(&mut self) -> Result<Option<MutationLogs>> {
+        let block_meta_cache = &self.aggregation_ctx.block_meta_cache;
+
         metrics_inc_replace_number_apply_deletion();
 
         // track number of segments and blocks after pruning (per merge action application)
 
@@ -317,7 +325,7 @@ impl ReplaceIntoOperationAggregator {
         let mut mutation_log_handlers = Vec::new();
         let mut num_rows_mutated = 0;
         for (segment_idx, block_deletion) in self.deletion_accumulator.deletions.drain() {
-            let (path, ver) = self
+            let (segment_path, ver) = self
                 .aggregation_ctx
                 .segment_locations
                 .get(&segment_idx)
@@ -329,19 +337,41 @@ impl ReplaceIntoOperationAggregator {
                 })?;
 
             let load_param = LoadParams {
-                location: path.clone(),
+                location: segment_path.clone(),
                 len_hint: None,
                 ver: *ver,
                 put_cache: true,
             };
 
-            let compact_segment_info = aggregation_ctx.segment_reader.read(&load_param).await?;
-            let segment_info: SegmentInfo = compact_segment_info.try_into()?;
+            // Retain SegmentInfo to avoid repeatedly extracting it from CompactSegmentInfo later.
+            let mut opt_segment_info: Option<SegmentInfo> = None;
 
             for (block_index, keys) in block_deletion {
+                let block_cache_key = format!("{segment_path}-{block_index}");
+                let block_meta = match block_meta_cache.get(&block_cache_key) {
+                    Some(block_meta) => block_meta,
+                    None => {
+                        let block_meta = if let Some(segment_info) = &opt_segment_info {
+                            segment_info.blocks[block_index].clone()
+                        } else {
+                            let compact_segment_info =
+                                aggregation_ctx.segment_reader.read(&load_param).await?;
+                            let segment_info: SegmentInfo = compact_segment_info.try_into()?;
+                            let block_meta = segment_info.blocks[block_index].clone();
+                            opt_segment_info = Some(segment_info);
+                            block_meta
+                        };
+                        // A query node typically processes only a subset of the BlockMeta in a given segment.
+                        // Therefore, even though all BlockMeta of a segment are available here, not all are populated into the cache.
+                        block_meta_cache.insert(block_cache_key, block_meta.as_ref().clone());
+                        block_meta
+                    }
+                };
+
                 let permit =
                     acquire_task_permit(aggregation_ctx.io_request_semaphore.clone()).await?;
-                let block_meta = segment_info.blocks[block_index].clone();
+
+                // let block_meta = segment_info.blocks[block_index].clone();
                 let aggregation_ctx = aggregation_ctx.clone();
                 num_rows_mutated += block_meta.row_count;
                 // self.aggregation_ctx.
@@ -604,7 +634,7 @@ impl AggregationContext {
         if let Some(stats) = column_stats {
             let max = stats.max();
             let min = stats.min();
-            std::cmp::min(key_max, max) >= std::cmp::max(key_min,min)
+            std::cmp::min(key_max, max) >= std::cmp::max(key_min, min)
                 || // coincide overlap
                 (max == key_max && min == key_min)
         } else {
@@ -630,22 +660,22 @@ impl AggregationContext {
         let reader = reader.clone();
         GlobalIORuntime::instance()
             .spawn(async move {
-                    let column_chunks = merged_io_read_result.columns_chunks()?;
-                    reader.deserialize_chunks(
-                        block_meta_ptr.location.0.as_str(),
-                        block_meta_ptr.row_count as usize,
-                        &block_meta_ptr.compression,
-                        &block_meta_ptr.col_metas,
-                        column_chunks,
-                        &storage_format,
-                    )
-                })
+                let column_chunks = merged_io_read_result.columns_chunks()?;
+                reader.deserialize_chunks(
+                    block_meta_ptr.location.0.as_str(),
+                    block_meta_ptr.row_count as usize,
+                    &block_meta_ptr.compression,
+                    &block_meta_ptr.col_metas,
+                    column_chunks,
+                    &storage_format,
+                )
+            })
             .await
             .map_err(|e| {
                 ErrorCode::Internal(
                     "unexpected, failed to join aggregation context read block tasks for replace into.",
                 )
-                    .add_message_back(e.to_string())
+                .add_message_back(e.to_string())
             })?
     }
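
For readers skimming the diff, the core of the change is the read-through lookup inside `apply`: build a key from the segment location and block index, try the BlockMeta cache, and only on a miss load the segment (at most once per segment, memoized in `opt_segment_info`) and populate the cache with the single block that was touched. The minimal sketch below isolates that pattern; `BlockMetaCache`, `BlockMeta`, `SegmentInfo`, and `load_segment` here are simplified stand-ins invented for illustration, not the real Databend types (the real cache is an `InMemoryLruCache` obtained from `CacheManager`, and loading goes through `segment_reader.read` plus a `CompactSegmentInfo` to `SegmentInfo` conversion).

use std::collections::HashMap;
use std::sync::Arc;

#[derive(Clone)]
struct BlockMeta {
    row_count: u64,
}

struct SegmentInfo {
    blocks: Vec<Arc<BlockMeta>>,
}

// Stand-in for InMemoryLruCache<BlockMeta>: values are stored behind an Arc,
// so `get` hands out a cheap clone of the Arc, never of the meta itself.
#[derive(Default)]
struct BlockMetaCache {
    entries: HashMap<String, Arc<BlockMeta>>,
}

impl BlockMetaCache {
    fn get(&self, key: &str) -> Option<Arc<BlockMeta>> {
        self.entries.get(key).cloned()
    }

    fn insert(&mut self, key: String, value: BlockMeta) {
        self.entries.insert(key, Arc::new(value));
    }
}

// Pretend segment loader; in the patch this is `segment_reader.read(&load_param)`
// followed by the CompactSegmentInfo -> SegmentInfo conversion.
fn load_segment(_path: &str) -> SegmentInfo {
    SegmentInfo {
        blocks: vec![Arc::new(BlockMeta { row_count: 42 })],
    }
}

// Fetch one BlockMeta: cache first, and on a miss load (and memoize) the
// owning segment, then populate the cache with only the block touched.
fn block_meta_for(
    cache: &mut BlockMetaCache,
    opt_segment_info: &mut Option<SegmentInfo>,
    segment_path: &str,
    block_index: usize,
) -> Arc<BlockMeta> {
    // Same key scheme as the patch: segment location + block index.
    let key = format!("{segment_path}-{block_index}");
    if let Some(hit) = cache.get(&key) {
        return hit;
    }
    let segment = opt_segment_info.get_or_insert_with(|| load_segment(segment_path));
    let meta = segment.blocks[block_index].clone();
    cache.insert(key, meta.as_ref().clone());
    meta
}

fn main() {
    let mut cache = BlockMetaCache::default();
    let mut opt_segment: Option<SegmentInfo> = None;

    let first = block_meta_for(&mut cache, &mut opt_segment, "seg/1", 0); // miss: loads the segment
    let second = block_meta_for(&mut cache, &mut opt_segment, "seg/1", 0); // hit: no segment load
    assert_eq!(first.row_count, second.row_count);
}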
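Two design points are visible in the diff itself. First, the cache key `{segment_path}-{block_index}` ties each entry to a concrete segment location; because fuse segments are immutable and mutations write new segment files rather than updating old ones in place, stale entries are never looked up again and simply age out of the LRU. Second, the cache now stores `BlockMeta` values directly (`InMemoryLruCache<BlockMeta>` with `From<BlockMeta> for CacheValue<BlockMeta>`) instead of `Arc<BlockMeta>`, which avoids wrapping an `Arc` in another `Arc` when a value is inserted.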