27 changes: 27 additions & 0 deletions src/query/storages/fuse/src/operations/read/fuse_source.rs
@@ -38,6 +38,7 @@ use crate::io::BlockReader;
use crate::io::VirtualColumnReader;
use crate::operations::read::DeserializeDataTransform;
use crate::operations::read::NativeDeserializeDataTransform;
+ use crate::operations::read::TransformRuntimeFilter;
use crate::operations::read::TransformRuntimeFilterWait;
use crate::operations::read::block_partition_receiver_source::BlockPartitionReceiverSource;
use crate::operations::read::block_partition_source::BlockPartitionSource;
Expand Down Expand Up @@ -119,6 +120,19 @@ pub fn build_fuse_native_source_pipeline(
)
})?;

+ pipeline.add_transform(|input, output| {
+ let mut output_schema = plan.schema().as_ref().clone();
+ output_schema.remove_internal_fields();
+ let output_schema = (&output_schema).into();
+ Ok(TransformRuntimeFilter::create(
+ ctx.clone(),
+ plan.scan_id,
+ output_schema,
+ input,
+ output,
+ ))
+ })?;
+
pipeline.try_resize(max_threads)?;

Ok(())
@@ -211,6 +225,19 @@ pub fn build_fuse_parquet_source_pipeline(
)
})?;

+ pipeline.add_transform(|input, output| {
+ let mut output_schema = plan.schema().as_ref().clone();
+ output_schema.remove_internal_fields();
+ let output_schema = (&output_schema).into();
+ Ok(TransformRuntimeFilter::create(
+ ctx.clone(),
+ plan.scan_id,
+ output_schema,
+ input,
+ output,
+ ))
+ })?;
+
Ok(())
}

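Note: the diff adds the same trailing stage to both the native and the Parquet read pipelines — a `TransformRuntimeFilter` keyed by `plan.scan_id`, built against the plan schema with internal fields stripped. The body of `runtime_filter_transform.rs` is not shown in this diff, so the following is only a minimal, self-contained sketch of what such a stage does conceptually: drop the rows of each incoming block that fail a filter received at runtime. The `Block` and `RuntimeFilter` types and the `filter_block` helper are illustrative stand-ins, not Databend's actual API.

```rust
use std::collections::HashSet;

// Stand-in for a DataBlock: a row-major batch of i64 columns.
struct Block {
    rows: Vec<Vec<i64>>,
}

// Stand-in for a runtime filter: an allow-set over one column,
// e.g. the distinct join keys observed on the build side.
struct RuntimeFilter {
    column_index: usize,
    allowed: HashSet<i64>,
}

// Keep only the rows that every runtime filter accepts; this mirrors
// the job of a runtime-filter transform at the end of a scan pipeline.
fn filter_block(block: Block, filters: &[RuntimeFilter]) -> Block {
    let rows = block
        .rows
        .into_iter()
        .filter(|row| filters.iter().all(|f| f.allowed.contains(&row[f.column_index])))
        .collect();
    Block { rows }
}

fn main() {
    let block = Block {
        rows: vec![vec![1, 10], vec![2, 20], vec![3, 30]],
    };
    let filter = RuntimeFilter {
        column_index: 0,
        allowed: [1, 3].into_iter().collect(),
    };
    let filtered = filter_block(block, &[filter]);
    assert_eq!(filtered.rows, vec![vec![1, 10], vec![3, 30]]);
    println!("{} rows survived", filtered.rows.len());
}
```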
2 changes: 2 additions & 0 deletions src/query/storages/fuse/src/operations/read/mod.rs
@@ -21,6 +21,7 @@ mod parquet_data_source;
mod parquet_data_source_deserializer;
mod parquet_data_transform_reader;
mod parquet_rows_fetcher;
+ mod runtime_filter_transform;
mod runtime_filter_wait;

mod block_partition_meta;
@@ -33,5 +34,6 @@ pub use fuse_rows_fetcher::row_fetch_processor;
pub use fuse_source::build_fuse_parquet_source_pipeline;
pub use native_data_source_deserializer::NativeDeserializeDataTransform;
pub use parquet_data_source_deserializer::DeserializeDataTransform;
+ pub use runtime_filter_transform::TransformRuntimeFilter;
pub use runtime_filter_wait::TransformRuntimeFilterWait;
pub use util::need_reserve_block_info;
src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs
@@ -16,20 +16,12 @@ use std::any::Any;
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::collections::VecDeque;
- use std::ops::BitAnd;
use std::sync::Arc;

- use databend_common_base::base::Progress;
- use databend_common_base::base::ProgressValues;
- use databend_common_base::runtime::profile::Profile;
- use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_catalog::plan::DataSourcePlan;
use databend_common_catalog::plan::PartInfoPtr;
use databend_common_catalog::plan::PushDownInfo;
use databend_common_catalog::plan::TopK;
- use databend_common_catalog::runtime_filter_info::RuntimeBloomFilter;
- use databend_common_catalog::runtime_filter_info::RuntimeFilterEntry;
- use databend_common_catalog::runtime_filter_info::RuntimeFilterStats;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::BlockEntry;
@@ -41,7 +33,6 @@ use databend_common_expression::DataBlock;
use databend_common_expression::DataSchema;
use databend_common_expression::Evaluator;
use databend_common_expression::Expr;
- use databend_common_expression::FieldIndex;
use databend_common_expression::FilterExecutor;
use databend_common_expression::FunctionContext;
use databend_common_expression::Scalar;
@@ -69,7 +60,6 @@ use crate::fuse_part::FuseBlockPartInfo;
use crate::io::AggIndexReader;
use crate::io::BlockReader;
use crate::operations::read::data_source_with_meta::DataSourceWithMeta;
- use crate::pruning::ExprBloomFilter;

/// A helper struct to store the intermediate state while reading a native partition.
#[derive(Default)]
@@ -196,10 +186,8 @@ pub struct NativeDeserializeDataTransform {
output_data: Option<DataBlock>,
parts: VecDeque<PartInfoPtr>,
columns: VecDeque<NativeDataSource>,
- scan_progress: Arc<Progress>,

// Structures for table scan information:
- scan_id: usize,
block_reader: Arc<BlockReader>,
src_schema: DataSchema,
output_schema: DataSchema,
@@ -213,10 +201,6 @@
prewhere_filter: Arc<Option<Expr>>,
filter_executor: Option<FilterExecutor>,

- // Structures for the bloom runtime filter:
- ctx: Arc<dyn TableContext>,
- bloom_runtime_filter: Option<Vec<BloomRuntimeFilterRef>>,
-
// Structures for aggregating index:
index_reader: Arc<Option<AggIndexReader>>,
remain_columns: Vec<usize>,
@@ -233,14 +217,6 @@
need_reserve_block_info: bool,
}

- #[derive(Clone)]
- struct BloomRuntimeFilterRef {
- column_index: FieldIndex,
- filter_id: usize,
- filter: RuntimeBloomFilter,
- stats: Arc<RuntimeFilterStats>,
- }
-
impl NativeDeserializeDataTransform {
#[allow(clippy::too_many_arguments)]
pub fn create(
@@ -252,7 +228,6 @@ impl NativeDeserializeDataTransform {
output: Arc<OutputPort>,
index_reader: Arc<Option<AggIndexReader>>,
) -> Result<ProcessorPtr> {
- let scan_progress = ctx.get_scan_progress();
let (need_reserve_block_info, _) = need_reserve_block_info(ctx.clone(), plan.table_index);
let src_schema: DataSchema = (block_reader.schema().as_ref()).into();

@@ -311,10 +286,7 @@
let output_schema: DataSchema = (&output_schema).into();
Ok(ProcessorPtr::create(Box::new(
NativeDeserializeDataTransform {
- ctx,
- scan_id: plan.scan_id,
func_ctx,
- scan_progress,
block_reader,
input,
output,
@@ -331,7 +303,6 @@
top_k,
index_reader,
base_block_ids: plan.base_block_ids.clone(),
- bloom_runtime_filter: None,
read_state: ReadPartState::new(),
need_reserve_block_info,
},
@@ -352,17 +323,10 @@
))
}

- fn add_output_block(&mut self, data_block: DataBlock) {
- let rows = data_block.num_rows();
- if rows == 0 {
+ fn set_output_block(&mut self, data_block: DataBlock) {
+ if data_block.num_rows() == 0 {
return;
}
- let progress_values = ProgressValues {
- rows,
- bytes: data_block.memory_size(),
- };
- self.scan_progress.incr(&progress_values);
- Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, data_block.memory_size());
self.output_data = Some(data_block);
}

@@ -532,15 +496,7 @@ impl NativeDeserializeDataTransform {
continue;
}

- // 4. check and evaluator the bloom runtime filter.
- if !self.read_and_check_bloom_runtime_filter()? {
- // skip current pages.
- self.skipped_pages += 1;
- self.read_state.skip_pages();
- continue;
- }
-
- // 5. read remain columns and generate a data block.
+ // 4. read remain columns and generate a data block.
if !self.read_remain_columns()? {
debug_assert!(self.read_state.is_finished());
return Ok(None);
@@ -549,7 +505,7 @@
.block_reader
.build_block(&self.read_state.columns, None)?;

- // 6. fill missing fields with default values.
+ // 5. fill missing fields with default values.
if self.read_state.if_need_fill_defaults {
block = self
.block_reader
@@ -636,70 +592,6 @@
Ok(true)
}

- // TODO(xudong): add selectivity prediction
- /// Read and check the column for the bloom runtime filter (only one column).
- ///
- /// Returns false if skip the current page or the partition is finished.
- fn read_and_check_bloom_runtime_filter(&mut self) -> Result<bool> {
- if let Some(bloom_runtime_filter) = self.bloom_runtime_filter.as_ref() {
- let mut bitmaps = Vec::with_capacity(bloom_runtime_filter.len());
- for runtime_filter in bloom_runtime_filter.iter() {
- let start = std::time::Instant::now();
- let column = if let Some((_, column)) = self
- .read_state
- .columns
- .iter()
- .find(|(i, _)| i == &runtime_filter.column_index)
- {
- (runtime_filter.column_index, column.clone())
- } else if !self.read_state.read_page(runtime_filter.column_index)? {
- debug_assert!(self.read_state.is_finished());
- return Ok(false);
- } else {
- // The runtime filter column must be the last column to read.
- let (i, column) = self.read_state.columns.last().unwrap();
- debug_assert_eq!(i, &runtime_filter.column_index);
- (runtime_filter.column_index, column.clone())
- };
-
- let probe_block = self.block_reader.build_block(&[column], None)?;
- let mut bitmap = MutableBitmap::from_len_zeroed(probe_block.num_rows());
- let probe_column = probe_block.get_last_column().clone();
- // Apply the filter to the probe column.
- ExprBloomFilter::new(&runtime_filter.filter).apply(probe_column, &mut bitmap)?;
-
- let unset_bits = bitmap.null_count();
- let elapsed = start.elapsed();
- runtime_filter
- .stats
- .record_bloom(elapsed.as_nanos() as u64, unset_bits as u64);
- if unset_bits == bitmap.len() {
- // skip current page.
- return Ok(false);
- }
- if unset_bits != 0 {
- bitmaps.push(bitmap);
- }
- }
- if !bitmaps.is_empty() {
- let rf_bitmap = bitmaps
- .into_iter()
- .reduce(|acc, rf_filter| acc.bitand(&rf_filter.into()))
- .unwrap();
-
- let filter_executor = self.filter_executor.as_mut().unwrap();
- let filter_count = if let Some(count) = self.read_state.filtered_count {
- filter_executor.select_bitmap(count, rf_bitmap)
- } else {
- filter_executor.from_bitmap(rf_bitmap)
- };
- self.read_state.filtered_count = Some(filter_count);
- }
- }
-
- Ok(true)
- }
-
/// Update the top-k heap by the topk column.
///
/// Returns false if the current page should be skipped.
@@ -746,46 +638,6 @@
Ok(true)
}

- /// Try to get bloom runtime filter from context.
- fn try_init_bloom_runtime_filter(&mut self) {
- if self.bloom_runtime_filter.is_none() {
- let bloom_filters = self
- .ctx
- .get_runtime_filters(self.scan_id)
- .into_iter()
- .filter_map(|entry| {
- let filter_id = entry.id;
- let RuntimeFilterEntry { bloom, stats, .. } = entry;
- let bloom = bloom?;
- let column_index = self.src_schema.index_of(bloom.column_name.as_str()).ok()?;
- Some(BloomRuntimeFilterRef {
- column_index,
- filter_id,
- filter: bloom.filter.clone(),
- stats,
- })
- })
- .collect::<Vec<_>>();
- if !bloom_filters.is_empty() {
- let mut filter_ids = bloom_filters
- .iter()
- .map(|f| f.filter_id)
- .collect::<Vec<_>>();
- filter_ids.sort_unstable();
- log::info!(
- "RUNTIME-FILTER: scan_id={} bloom_filters={} filter_ids={:?}",
- self.scan_id,
- bloom_filters.len(),
- filter_ids
- );
- self.bloom_runtime_filter = Some(bloom_filters);
- if self.filter_executor.is_none() {
- self.filter_executor = Some(new_dummy_filter_executor(self.func_ctx.clone()));
- }
- }
- }
- }
-
/// Pre-process the partition before reading it.
fn pre_process_partition(&mut self) -> Result<()> {
debug_assert!(!self.columns.is_empty());
@@ -806,7 +658,7 @@
let part = self.parts.front().unwrap();
let fuse_part = FuseBlockPartInfo::from_part(part)?;
let block = self.build_default_block(fuse_part)?;
- self.add_output_block(block);
+ self.set_output_block(block);

self.finish_partition();
return Ok(());
@@ -924,9 +776,6 @@ impl Processor for NativeDeserializeDataTransform {
}

fn process(&mut self) -> Result<()> {
- // Try to get the bloom runtime filter from the context if existed.
- self.try_init_bloom_runtime_filter();
-
// Only if current read state is finished can we start to read a new partition.
if self.read_state.is_finished() {
if let Some(columns) = self.columns.front_mut() {
@@ -957,7 +806,7 @@
)?;

self.finish_partition();
- self.add_output_block(data_block);
+ self.set_output_block(data_block);
return Ok(());
}

@@ -974,7 +823,7 @@
// Each `process` try to produce one `DataBlock`.
if let Some(block) = self.read_pages()? {
let block = self.post_process_block(block)?;
- self.add_output_block(block);
+ self.set_output_block(block);
} else {
// No more data can be read from current partition.
self.finish_partition();
@@ -986,7 +835,7 @@

/// Build a dummy filter executor to retain a selection.
///
- /// This method may be used by `update_topk_heap` and `read_and_check_bloom_runtime_filter`.
+ /// This method may be used by `update_topk_heap`.
fn new_dummy_filter_executor(func_ctx: FunctionContext) -> FilterExecutor {
let dummy_expr = Expr::Constant(Constant {
span: None,
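Note: the bulk of this file's diff deletes the per-page bloom-filter probing that the new `TransformRuntimeFilter` stage now owns. The core of the removed `read_and_check_bloom_runtime_filter` was a fold: one survival bitmap per bloom filter, combined with a bitwise AND before being handed to the `FilterExecutor`. Below is a standalone sketch of that reduction, using `Vec<bool>` as a stand-in for the arrow-style `MutableBitmap` (an assumption made for brevity).

```rust
// true = the row passed the bloom probe for one filter.
type Bitmap = Vec<bool>;

// Fold the per-filter bitmaps with AND, as the removed code did with
// `bitmaps.into_iter().reduce(|acc, b| acc.bitand(&b.into()))`:
// a row survives only if every bloom filter accepts it.
fn combine_bitmaps(bitmaps: Vec<Bitmap>) -> Option<Bitmap> {
    bitmaps.into_iter().reduce(|acc, next| {
        acc.iter().zip(next.iter()).map(|(a, b)| *a && *b).collect()
    })
}

fn main() {
    let filter_a = vec![true, true, false, true];
    let filter_b = vec![true, false, false, true];
    let combined = combine_bitmaps(vec![filter_a, filter_b]).unwrap();
    assert_eq!(combined, vec![true, false, false, true]);
    // An all-false bitmap for any single filter meant the whole page
    // could be skipped before the remaining columns were even read.
}
```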
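Note: the surviving `new_dummy_filter_executor` helper (whose doc comment now mentions only `update_topk_heap`) exists so a `FilterExecutor` can carry a row selection even when no real predicate runs. A minimal model of that idea — an always-true predicate paired with an externally maintained selection — using none of Databend's actual types:

```rust
// A trivial model of "dummy filter + externally maintained selection":
// the predicate always passes, so the selection is whatever earlier
// stages (e.g. a top-k heap) decided to keep.
struct SelectionFilter {
    selection: Vec<u32>, // indices of surviving rows
}

impl SelectionFilter {
    // Start from an always-true predicate: every row selected.
    fn all(num_rows: u32) -> Self {
        Self { selection: (0..num_rows).collect() }
    }

    // Intersect the current selection with a bitmap from another stage.
    fn select_bitmap(&mut self, bitmap: &[bool]) {
        self.selection.retain(|&i| bitmap[i as usize]);
    }
}

fn main() {
    let mut filter = SelectionFilter::all(4);
    filter.select_bitmap(&[true, false, true, true]);
    filter.select_bitmap(&[true, true, false, true]);
    assert_eq!(filter.selection, vec![0, 3]);
}
```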