refactor: refine replace into by caching individual BlockMeta #17368

Merged

6 changes: 3 additions & 3 deletions src/query/storages/common/cache/src/caches.rs
@@ -44,7 +44,7 @@ pub type CompactSegmentInfoCache = InMemoryLruCache<CompactSegmentInfo>;
 pub type SegmentBlockMetasCache = InMemoryLruCache<Vec<Arc<BlockMeta>>>;
 
 /// In-memory cache of individual BlockMeta.
-pub type BlockMetaCache = InMemoryLruCache<Arc<BlockMeta>>;
+pub type BlockMetaCache = InMemoryLruCache<BlockMeta>;
 
 /// In memory object cache of TableSnapshot
 pub type TableSnapshotCache = InMemoryLruCache<TableSnapshot>;
@@ -186,8 +186,8 @@ impl From<Vec<Arc<BlockMeta>>> for CacheValue<Vec<Arc<BlockMeta>>> {
     }
 }
 
-impl From<Arc<BlockMeta>> for CacheValue<Arc<BlockMeta>> {
-    fn from(value: Arc<BlockMeta>) -> Self {
+impl From<BlockMeta> for CacheValue<BlockMeta> {
+    fn from(value: BlockMeta) -> Self {
         CacheValue {
             inner: Arc::new(value),
             mem_bytes: 0,
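Worth noting: with the old `From<Arc<BlockMeta>>` impl, `Arc::new(value)` produced an `Arc<Arc<BlockMeta>>` inside the cache; storing plain `BlockMeta` lets `CacheValue` perform the single `Arc` wrap, and lookups hand out `Arc<BlockMeta>` clones. A minimal sketch of the pattern, with `Cache` and `CacheValue` as simplified stand-ins rather than the real `InMemoryLruCache` implementation:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Simplified stand-in for Databend's CacheValue: owns the value behind one Arc.
struct CacheValue<T> {
    inner: Arc<T>,
    mem_bytes: usize,
}

impl<T> From<T> for CacheValue<T> {
    fn from(value: T) -> Self {
        // The plain value is wrapped exactly once, on insertion.
        CacheValue { inner: Arc::new(value), mem_bytes: 0 }
    }
}

// Toy map-backed cache standing in for InMemoryLruCache<T>.
struct Cache<T> {
    map: HashMap<String, CacheValue<T>>,
}

impl<T> Cache<T> {
    fn insert(&mut self, key: String, value: T) {
        self.map.insert(key, value.into());
    }

    // Lookups return a cheap Arc clone; all callers share one allocation.
    fn get(&self, key: &str) -> Option<Arc<T>> {
        self.map.get(key).map(|v| v.inner.clone())
    }
}
```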
1 change: 1 addition & 0 deletions src/query/storages/common/cache/src/lib.rs
@@ -25,6 +25,7 @@ mod temp_dir;
 
 pub use cache::CacheAccessor;
 pub use cache::Unit;
+pub use caches::BlockMetaCache;
 pub use caches::CacheValue;
 pub use caches::CachedObject;
 pub use caches::SegmentBlockMetasCache;
@@ -43,6 +43,9 @@ use databend_common_metrics::storage::*;
 use databend_common_sql::evaluator::BlockOperator;
 use databend_common_sql::executor::physical_plans::OnConflictField;
 use databend_common_sql::StreamContext;
+use databend_storages_common_cache::BlockMetaCache;
+use databend_storages_common_cache::CacheAccessor;
+use databend_storages_common_cache::CacheManager;
 use databend_storages_common_cache::LoadParams;
 use databend_storages_common_index::filters::Filter;
 use databend_storages_common_index::filters::Xor8Filter;
@@ -100,6 +103,8 @@ struct AggregationContext {
     io_request_semaphore: Arc<Semaphore>,
     // generate stream columns if necessary
     stream_ctx: Option<StreamContext>,
+
+    block_meta_cache: Option<BlockMetaCache>,
 }
 
 // Apply MergeIntoOperations to segments
@@ -209,6 +214,7 @@ impl ReplaceIntoOperationAggregator {
                 block_builder,
                 io_request_semaphore,
                 stream_ctx,
+                block_meta_cache: CacheManager::instance().get_block_meta_cache(),
             }),
         })
     }
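`get_block_meta_cache()` returns an `Option`, so the field stays `None` when the block meta cache is disabled, yet `apply` below calls `get`/`insert` on it directly. That shape works when the accessor trait is implemented for `Option<cache>` as well, with `None` acting as an always-miss cache; presumably this is why `CacheAccessor` is imported above. An illustrative sketch of the idea, not Databend's actual trait:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Illustrative accessor trait; the real CacheAccessor has a richer interface.
trait Accessor<T> {
    fn get(&self, key: &str) -> Option<Arc<T>>;
    fn insert(&self, key: String, value: T);
}

// Toy map-backed cache standing in for InMemoryLruCache<T>.
struct MapCache<T> {
    map: Mutex<HashMap<String, Arc<T>>>,
}

impl<T> Accessor<T> for MapCache<T> {
    fn get(&self, key: &str) -> Option<Arc<T>> {
        self.map.lock().unwrap().get(key).cloned()
    }
    fn insert(&self, key: String, value: T) {
        self.map.lock().unwrap().insert(key, Arc::new(value));
    }
}

// The useful part: Option<C> is itself an accessor, so a disabled cache (None)
// behaves as "always miss, drop on insert" with no branching at call sites.
impl<T, C: Accessor<T>> Accessor<T> for Option<C> {
    fn get(&self, key: &str) -> Option<Arc<T>> {
        self.as_ref().and_then(|c| c.get(key))
    }
    fn insert(&self, key: String, value: T) {
        if let Some(c) = self {
            c.insert(key, value);
        }
    }
}
```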
@@ -291,6 +297,8 @@ impl ReplaceIntoOperationAggregator {
 impl ReplaceIntoOperationAggregator {
     #[async_backtrace::framed]
     pub async fn apply(&mut self) -> Result<Option<MutationLogs>> {
+        let block_meta_cache = &self.aggregation_ctx.block_meta_cache;
+
         metrics_inc_replace_number_apply_deletion();
 
         // track number of segments and blocks after pruning (per merge action application)
@@ -317,7 +325,7 @@
         let mut mutation_log_handlers = Vec::new();
         let mut num_rows_mutated = 0;
         for (segment_idx, block_deletion) in self.deletion_accumulator.deletions.drain() {
-            let (path, ver) = self
+            let (segment_path, ver) = self
                 .aggregation_ctx
                 .segment_locations
                 .get(&segment_idx)
@@ -329,19 +337,41 @@
                 })?;
 
             let load_param = LoadParams {
-                location: path.clone(),
+                location: segment_path.clone(),
                 len_hint: None,
                 ver: *ver,
                 put_cache: true,
             };
 
-            let compact_segment_info = aggregation_ctx.segment_reader.read(&load_param).await?;
-            let segment_info: SegmentInfo = compact_segment_info.try_into()?;
+            // Retain SegmentInfo to avoid repeatedly extracting it from CompactSegmentInfo later.
+            let mut opt_segment_info: Option<SegmentInfo> = None;
 
             for (block_index, keys) in block_deletion {
+                let block_cache_key = format!("{segment_path}-{block_index}");
+                let block_meta = match block_meta_cache.get(&block_cache_key) {
+                    Some(block_meta) => block_meta,
+                    None => {
+                        let block_meta = if let Some(segment_info) = &opt_segment_info {
+                            segment_info.blocks[block_index].clone()
+                        } else {
+                            let compact_segment_info =
+                                aggregation_ctx.segment_reader.read(&load_param).await?;
+                            let segment_info: SegmentInfo = compact_segment_info.try_into()?;
+                            let block_meta = segment_info.blocks[block_index].clone();
+                            opt_segment_info = Some(segment_info);
+                            block_meta
+                        };
+                        // A query node typically processes only a subset of the BlockMeta in a given segment.
+                        // Therefore, even though all BlockMeta of a segment are available here, not all are populated into the cache.
+                        block_meta_cache.insert(block_cache_key, block_meta.as_ref().clone());
+                        block_meta
+                    }
+                };
+
                 let permit =
                     acquire_task_permit(aggregation_ctx.io_request_semaphore.clone()).await?;
-                let block_meta = segment_info.blocks[block_index].clone();
                 let aggregation_ctx = aggregation_ctx.clone();
                 num_rows_mutated += block_meta.row_count;
                 // self.aggregation_ctx.
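The new loop keys each `BlockMeta` by `segment_path` plus `block_index` and defers reading the segment until the first cache miss, so a fully cached segment is never fetched and a partly cached one is deserialized at most once. A condensed sketch of that read-through shape, where `load_segment`, the toy types, and the plain `HashMap` are hypothetical stand-ins for the Databend APIs:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Simplified stand-ins for the segment and block metadata types.
struct BlockMeta { row_count: u64 }
struct SegmentInfo { blocks: Vec<Arc<BlockMeta>> }

// Stand-in for segment_reader.read(&load_param) followed by try_into::<SegmentInfo>().
fn load_segment(_path: &str) -> SegmentInfo {
    SegmentInfo { blocks: vec![Arc::new(BlockMeta { row_count: 42 })] }
}

fn meta_for_block(
    cache: &mut HashMap<String, Arc<BlockMeta>>,
    segment: &mut Option<SegmentInfo>, // deserialized lazily, at most once per segment
    segment_path: &str,
    block_index: usize,
) -> Arc<BlockMeta> {
    let key = format!("{segment_path}-{block_index}");
    if let Some(meta) = cache.get(&key) {
        return meta.clone(); // hit: the segment never has to be read
    }
    let seg = segment.get_or_insert_with(|| load_segment(segment_path));
    let meta = seg.blocks[block_index].clone();
    // Populate only the block actually touched, mirroring the comment in the diff.
    cache.insert(key, meta.clone());
    meta
}
```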
@@ -604,7 +634,7 @@ impl AggregationContext {
         if let Some(stats) = column_stats {
             let max = stats.max();
             let min = stats.min();
-            std::cmp::min(key_max, max) >= std::cmp::max(key_min,min)
+            std::cmp::min(key_max, max) >= std::cmp::max(key_min, min)
                 || // coincide overlap
                 (max == key_max && min == key_min)
         } else {
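The predicate above is the standard closed-interval intersection test: `[min, max]` and `[key_min, key_max]` overlap iff the smaller of the two maxima is at least the larger of the two minima. A tiny self-contained check, using a hypothetical helper rather than Databend code:

```rust
fn ranges_overlap(a_min: i64, a_max: i64, b_min: i64, b_max: i64) -> bool {
    // Closed intervals [a_min, a_max] and [b_min, b_max] intersect iff
    // the smaller maximum is not below the larger minimum.
    std::cmp::min(a_max, b_max) >= std::cmp::max(a_min, b_min)
}

fn main() {
    assert!(ranges_overlap(1, 5, 4, 9)); // partial overlap
    assert!(ranges_overlap(2, 2, 1, 7)); // single point inside the other range
    assert!(!ranges_overlap(1, 2, 4, 9)); // disjoint
}
```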
@@ -630,22 +660,22 @@
         let reader = reader.clone();
         GlobalIORuntime::instance()
             .spawn(async move {
                 let column_chunks = merged_io_read_result.columns_chunks()?;
                 reader.deserialize_chunks(
                     block_meta_ptr.location.0.as_str(),
                     block_meta_ptr.row_count as usize,
                     &block_meta_ptr.compression,
                     &block_meta_ptr.col_metas,
                     column_chunks,
                     &storage_format,
                 )
             })
             .await
             .map_err(|e| {
                 ErrorCode::Internal(
                     "unexpected, failed to join aggregation context read block tasks for replace into.",
                 )
                 .add_message_back(e.to_string())
             })?
     }
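The block above offloads chunk deserialization to `GlobalIORuntime` and maps a join failure (panic or cancellation of the spawned task) into an internal error while keeping the task's own `Result` intact. The same spawn-then-join shape in plain tokio, with `decode` and the `String` error type as hypothetical stand-ins for the Databend APIs:

```rust
use tokio::task::JoinError;

// Hypothetical stand-in for the deserialization work done in the spawned task.
fn decode(bytes: Vec<u8>) -> Result<String, String> {
    String::from_utf8(bytes).map_err(|e| e.to_string())
}

async fn spawn_and_join(bytes: Vec<u8>) -> Result<String, String> {
    tokio::spawn(async move { decode(bytes) })
        .await
        // A JoinError (panic/cancellation) is distinct from the task's own error:
        // surface it as an internal error, keeping the original message.
        .map_err(|e: JoinError| format!("failed to join decode task: {e}"))?
}

#[tokio::main]
async fn main() {
    println!("{:?}", spawn_and_join(b"hello".to_vec()).await);
}
```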
