diff --git a/src/query/storages/fuse/src/operations/read/fuse_source.rs b/src/query/storages/fuse/src/operations/read/fuse_source.rs
index 7dc5cabc92f47..b4d1616613814 100644
--- a/src/query/storages/fuse/src/operations/read/fuse_source.rs
+++ b/src/query/storages/fuse/src/operations/read/fuse_source.rs
@@ -38,6 +38,7 @@ use crate::io::BlockReader;
 use crate::io::VirtualColumnReader;
 use crate::operations::read::DeserializeDataTransform;
 use crate::operations::read::NativeDeserializeDataTransform;
+use crate::operations::read::TransformRuntimeFilter;
 use crate::operations::read::TransformRuntimeFilterWait;
 use crate::operations::read::block_partition_receiver_source::BlockPartitionReceiverSource;
 use crate::operations::read::block_partition_source::BlockPartitionSource;
@@ -119,6 +120,19 @@ pub fn build_fuse_native_source_pipeline(
         )
     })?;
 
+    pipeline.add_transform(|input, output| {
+        let mut output_schema = plan.schema().as_ref().clone();
+        output_schema.remove_internal_fields();
+        let output_schema = (&output_schema).into();
+        Ok(TransformRuntimeFilter::create(
+            ctx.clone(),
+            plan.scan_id,
+            output_schema,
+            input,
+            output,
+        ))
+    })?;
+
     pipeline.try_resize(max_threads)?;
 
     Ok(())
@@ -211,6 +225,19 @@ pub fn build_fuse_parquet_source_pipeline(
         )
     })?;
 
+    pipeline.add_transform(|input, output| {
+        let mut output_schema = plan.schema().as_ref().clone();
+        output_schema.remove_internal_fields();
+        let output_schema = (&output_schema).into();
+        Ok(TransformRuntimeFilter::create(
+            ctx.clone(),
+            plan.scan_id,
+            output_schema,
+            input,
+            output,
+        ))
+    })?;
+
     Ok(())
 }
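Both builders append the same trailing stage, so bloom probing now happens once per decoded DataBlock on both read paths instead of inside each deserializer. A rough sketch of the resulting stage order, assuming the wait transform precedes deserialization as in the current builders (only TransformRuntimeFilter is new here):

    data source
      -> TransformRuntimeFilterWait         // presumably parks until runtime filters are ready
      -> (Native)DeserializeDataTransform   // pure decoding; bloom probing removed below
      -> TransformRuntimeFilter             // probes bloom filters, records scan progress

The schema handed to the transform is plan.schema() with internal fields stripped, so the column_name -> FieldIndex lookup inside the probe resolves against exactly the columns the deserializer emits.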
diff --git a/src/query/storages/fuse/src/operations/read/mod.rs b/src/query/storages/fuse/src/operations/read/mod.rs
index f0df63e6d77ee..d0cb238dd0153 100644
--- a/src/query/storages/fuse/src/operations/read/mod.rs
+++ b/src/query/storages/fuse/src/operations/read/mod.rs
@@ -21,6 +21,7 @@ mod parquet_data_source;
 mod parquet_data_source_deserializer;
 mod parquet_data_transform_reader;
 mod parquet_rows_fetcher;
+mod runtime_filter_transform;
 mod runtime_filter_wait;
 
 mod block_partition_meta;
@@ -33,5 +34,6 @@ pub use fuse_rows_fetcher::row_fetch_processor;
 pub use fuse_source::build_fuse_parquet_source_pipeline;
 pub use native_data_source_deserializer::NativeDeserializeDataTransform;
 pub use parquet_data_source_deserializer::DeserializeDataTransform;
+pub use runtime_filter_transform::TransformRuntimeFilter;
 pub use runtime_filter_wait::TransformRuntimeFilterWait;
 pub use util::need_reserve_block_info;
diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs
index 476d6f1a4446d..4a8d220504ea7 100644
--- a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs
+++ b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs
@@ -16,20 +16,12 @@ use std::any::Any;
 use std::collections::BTreeMap;
 use std::collections::HashSet;
 use std::collections::VecDeque;
-use std::ops::BitAnd;
 use std::sync::Arc;
 
-use databend_common_base::base::Progress;
-use databend_common_base::base::ProgressValues;
-use databend_common_base::runtime::profile::Profile;
-use databend_common_base::runtime::profile::ProfileStatisticsName;
 use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::PartInfoPtr;
 use databend_common_catalog::plan::PushDownInfo;
 use databend_common_catalog::plan::TopK;
-use databend_common_catalog::runtime_filter_info::RuntimeBloomFilter;
-use databend_common_catalog::runtime_filter_info::RuntimeFilterEntry;
-use databend_common_catalog::runtime_filter_info::RuntimeFilterStats;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 use databend_common_expression::BlockEntry;
@@ -41,7 +33,6 @@ use databend_common_expression::DataBlock;
 use databend_common_expression::DataSchema;
 use databend_common_expression::Evaluator;
 use databend_common_expression::Expr;
-use databend_common_expression::FieldIndex;
 use databend_common_expression::FilterExecutor;
 use databend_common_expression::FunctionContext;
 use databend_common_expression::Scalar;
@@ -69,7 +60,6 @@ use crate::fuse_part::FuseBlockPartInfo;
 use crate::io::AggIndexReader;
 use crate::io::BlockReader;
 use crate::operations::read::data_source_with_meta::DataSourceWithMeta;
-use crate::pruning::ExprBloomFilter;
 
 /// A helper struct to store the intermediate state while reading a native partition.
 #[derive(Default)]
@@ -196,10 +186,8 @@ pub struct NativeDeserializeDataTransform {
     output_data: Option<DataBlock>,
     parts: VecDeque<PartInfoPtr>,
     columns: VecDeque<NativeDataSource>,
-    scan_progress: Arc<Progress>,
 
     // Structures for table scan information:
-    scan_id: usize,
     block_reader: Arc<BlockReader>,
     src_schema: DataSchema,
     output_schema: DataSchema,
@@ -213,10 +201,6 @@ pub struct NativeDeserializeDataTransform {
     prewhere_filter: Arc<Option<Expr>>,
     filter_executor: Option<FilterExecutor>,
 
-    // Structures for the bloom runtime filter:
-    ctx: Arc<dyn TableContext>,
-    bloom_runtime_filter: Option<Vec<BloomRuntimeFilterRef>>,
-
    // Structures for aggregating index:
     index_reader: Arc<Option<AggIndexReader>>,
     remain_columns: Vec<usize>,
@@ -233,14 +217,6 @@ pub struct NativeDeserializeDataTransform {
     need_reserve_block_info: bool,
 }
 
-#[derive(Clone)]
-struct BloomRuntimeFilterRef {
-    column_index: FieldIndex,
-    filter_id: usize,
-    filter: RuntimeBloomFilter,
-    stats: Arc<RuntimeFilterStats>,
-}
-
 impl NativeDeserializeDataTransform {
     #[allow(clippy::too_many_arguments)]
     pub fn create(
@@ -252,7 +228,6 @@ impl NativeDeserializeDataTransform {
         output: Arc<OutputPort>,
         index_reader: Arc<Option<AggIndexReader>>,
     ) -> Result<ProcessorPtr> {
-        let scan_progress = ctx.get_scan_progress();
         let (need_reserve_block_info, _) = need_reserve_block_info(ctx.clone(), plan.table_index);
 
         let src_schema: DataSchema = (block_reader.schema().as_ref()).into();
@@ -311,10 +286,7 @@ impl NativeDeserializeDataTransform {
         let output_schema: DataSchema = (&output_schema).into();
         Ok(ProcessorPtr::create(Box::new(
             NativeDeserializeDataTransform {
-                ctx,
-                scan_id: plan.scan_id,
                 func_ctx,
-                scan_progress,
                 block_reader,
                 input,
                 output,
@@ -331,7 +303,6 @@ impl NativeDeserializeDataTransform {
                 top_k,
                 index_reader,
                 base_block_ids: plan.base_block_ids.clone(),
-                bloom_runtime_filter: None,
                 read_state: ReadPartState::new(),
                 need_reserve_block_info,
             },
@@ -352,17 +323,10 @@
         ))
     }
 
-    fn add_output_block(&mut self, data_block: DataBlock) {
-        let rows = data_block.num_rows();
-        if rows == 0 {
+    fn set_output_block(&mut self, data_block: DataBlock) {
+        if data_block.num_rows() == 0 {
             return;
         }
-        let progress_values = ProgressValues {
-            rows,
-            bytes: data_block.memory_size(),
-        };
-        self.scan_progress.incr(&progress_values);
-        Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, data_block.memory_size());
         self.output_data = Some(data_block);
     }
 
@@ -532,15 +496,7 @@
                 continue;
             }
 
-            // 4. check and evaluator the bloom runtime filter.
-            if !self.read_and_check_bloom_runtime_filter()? {
-                // skip current pages.
-                self.skipped_pages += 1;
-                self.read_state.skip_pages();
-                continue;
-            }
-
-            // 5. read remain columns and generate a data block.
+            // 4. read remain columns and generate a data block.
             if !self.read_remain_columns()? {
                 debug_assert!(self.read_state.is_finished());
                 return Ok(None);
@@ -549,7 +505,7 @@
             .block_reader
             .build_block(&self.read_state.columns, None)?;
 
-        // 6. fill missing fields with default values.
+        // 5. fill missing fields with default values.
         if self.read_state.if_need_fill_defaults {
             block = self
                 .block_reader
@@ -636,70 +592,6 @@
         Ok(true)
     }
 
-    // TODO(xudong): add selectivity prediction
-    /// Read and check the column for the bloom runtime filter (only one column).
-    ///
-    /// Returns false if skip the current page or the partition is finished.
-    fn read_and_check_bloom_runtime_filter(&mut self) -> Result<bool> {
-        if let Some(bloom_runtime_filter) = self.bloom_runtime_filter.as_ref() {
-            let mut bitmaps = Vec::with_capacity(bloom_runtime_filter.len());
-            for runtime_filter in bloom_runtime_filter.iter() {
-                let start = std::time::Instant::now();
-                let column = if let Some((_, column)) = self
-                    .read_state
-                    .columns
-                    .iter()
-                    .find(|(i, _)| i == &runtime_filter.column_index)
-                {
-                    (runtime_filter.column_index, column.clone())
-                } else if !self.read_state.read_page(runtime_filter.column_index)? {
-                    debug_assert!(self.read_state.is_finished());
-                    return Ok(false);
-                } else {
-                    // The runtime filter column must be the last column to read.
-                    let (i, column) = self.read_state.columns.last().unwrap();
-                    debug_assert_eq!(i, &runtime_filter.column_index);
-                    (runtime_filter.column_index, column.clone())
-                };
-
-                let probe_block = self.block_reader.build_block(&[column], None)?;
-                let mut bitmap = MutableBitmap::from_len_zeroed(probe_block.num_rows());
-                let probe_column = probe_block.get_last_column().clone();
-                // Apply the filter to the probe column.
-                ExprBloomFilter::new(&runtime_filter.filter).apply(probe_column, &mut bitmap)?;
-
-                let unset_bits = bitmap.null_count();
-                let elapsed = start.elapsed();
-                runtime_filter
-                    .stats
-                    .record_bloom(elapsed.as_nanos() as u64, unset_bits as u64);
-                if unset_bits == bitmap.len() {
-                    // skip current page.
-                    return Ok(false);
-                }
-                if unset_bits != 0 {
-                    bitmaps.push(bitmap);
-                }
-            }
-            if !bitmaps.is_empty() {
-                let rf_bitmap = bitmaps
-                    .into_iter()
-                    .reduce(|acc, rf_filter| acc.bitand(&rf_filter.into()))
-                    .unwrap();
-
-                let filter_executor = self.filter_executor.as_mut().unwrap();
-                let filter_count = if let Some(count) = self.read_state.filtered_count {
-                    filter_executor.select_bitmap(count, rf_bitmap)
-                } else {
-                    filter_executor.from_bitmap(rf_bitmap)
-                };
-                self.read_state.filtered_count = Some(filter_count);
-            }
-        }
-
-        Ok(true)
-    }
-
     /// Update the top-k heap with by the topk column.
     ///
     /// Returns false if skip the current page.
@@ -746,46 +638,6 @@
         Ok(true)
     }
 
-    /// Try to get bloom runtime filter from context.
-    fn try_init_bloom_runtime_filter(&mut self) {
-        if self.bloom_runtime_filter.is_none() {
-            let bloom_filters = self
-                .ctx
-                .get_runtime_filters(self.scan_id)
-                .into_iter()
-                .filter_map(|entry| {
-                    let filter_id = entry.id;
-                    let RuntimeFilterEntry { bloom, stats, .. } = entry;
-                    let bloom = bloom?;
-                    let column_index = self.src_schema.index_of(bloom.column_name.as_str()).ok()?;
-                    Some(BloomRuntimeFilterRef {
-                        column_index,
-                        filter_id,
-                        filter: bloom.filter.clone(),
-                        stats,
-                    })
-                })
-                .collect::<Vec<_>>();
-            if !bloom_filters.is_empty() {
-                let mut filter_ids = bloom_filters
-                    .iter()
-                    .map(|f| f.filter_id)
-                    .collect::<Vec<_>>();
-                filter_ids.sort_unstable();
-                log::info!(
-                    "RUNTIME-FILTER: scan_id={} bloom_filters={} filter_ids={:?}",
-                    self.scan_id,
-                    bloom_filters.len(),
-                    filter_ids
-                );
-                self.bloom_runtime_filter = Some(bloom_filters);
-                if self.filter_executor.is_none() {
-                    self.filter_executor = Some(new_dummy_filter_executor(self.func_ctx.clone()));
-                }
-            }
-        }
-    }
-
     /// Pre-process the partition before reading it.
     fn pre_process_partition(&mut self) -> Result<()> {
         debug_assert!(!self.columns.is_empty());
@@ -806,7 +658,7 @@
         let part = self.parts.front().unwrap();
         let fuse_part = FuseBlockPartInfo::from_part(part)?;
         let block = self.build_default_block(fuse_part)?;
-        self.add_output_block(block);
+        self.set_output_block(block);
         self.finish_partition();
 
         return Ok(());
@@ -924,9 +776,6 @@
     }
 
     fn process(&mut self) -> Result<()> {
-        // Try to get the bloom runtime filter from the context if existed.
-        self.try_init_bloom_runtime_filter();
-
         // Only if current read state is finished can we start to read a new partition.
         if self.read_state.is_finished() {
             if let Some(columns) = self.columns.front_mut() {
@@ -957,7 +806,7 @@
                 )?;
 
                 self.finish_partition();
-                self.add_output_block(data_block);
+                self.set_output_block(data_block);
                 return Ok(());
             }
@@ -974,7 +823,7 @@
         // Each `process` try to produce one `DataBlock`.
         if let Some(block) = self.read_pages()? {
             let block = self.post_process_block(block)?;
-            self.add_output_block(block);
+            self.set_output_block(block);
         } else {
             // No more data can be read from current partition.
             self.finish_partition();
@@ -986,7 +835,7 @@
 
 /// Build a dummy filter executor to retain a selection.
 ///
-/// This method may be used by `update_topk_heap` and `read_and_check_bloom_runtime_filter`.
+/// This method may be used by `update_topk_heap`.
 fn new_dummy_filter_executor(func_ctx: FunctionContext) -> FilterExecutor {
     let dummy_expr = Expr::Constant(Constant {
         span: None,
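What the native path loses here is page-level skipping: read_and_check_bloom_runtime_filter could discard a whole page (and merge the selection into filter_executor via filtered_count) before the remaining columns were even read, while the new transform only sees fully decoded blocks. What survives unchanged is the combine step, identical in the removed code and in the new transform: each bloom filter yields one selection bitmap over the rows, and the bitmaps are AND-reduced so a row is kept only if every filter keeps it. A self-contained model of that reduction, with plain Vec<bool> standing in for databend's Bitmap:

    fn combine_selections(bitmaps: Vec<Vec<bool>>) -> Option<Vec<bool>> {
        // Mirrors `bitmaps.into_iter().reduce(|acc, b| acc.bitand(&b.into()))`
        // in the patch; None means no filter produced a bitmap.
        bitmaps.into_iter().reduce(|acc, next| {
            acc.iter().zip(next.iter()).map(|(a, b)| *a && *b).collect()
        })
    }

    fn main() {
        let by_filter_a = vec![true, true, false, true];
        let by_filter_b = vec![true, false, false, true];
        // Only rows 0 and 3 pass both filters.
        assert_eq!(
            combine_selections(vec![by_filter_a, by_filter_b]),
            Some(vec![true, false, false, true])
        );
    }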
diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs
index d329701ffe443..473ba70988c46 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs
@@ -13,37 +13,25 @@
 // limitations under the License.
 
 use std::any::Any;
-use std::ops::BitAnd;
 use std::sync::Arc;
 use std::time::Instant;
 
-use databend_common_base::base::Progress;
-use databend_common_base::base::ProgressValues;
-use databend_common_base::runtime::profile::Profile;
-use databend_common_base::runtime::profile::ProfileStatisticsName;
 use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::PartInfoPtr;
-use databend_common_catalog::runtime_filter_info::RuntimeBloomFilter;
-use databend_common_catalog::runtime_filter_info::RuntimeFilterEntry;
-use databend_common_catalog::runtime_filter_info::RuntimeFilterStats;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 use databend_common_expression::BlockMetaInfoDowncast;
 use databend_common_expression::DataBlock;
 use databend_common_expression::DataField;
 use databend_common_expression::DataSchema;
-use databend_common_expression::FieldIndex;
 use databend_common_expression::Scalar;
-use databend_common_expression::types::Bitmap;
 use databend_common_expression::types::DataType;
-use databend_common_expression::types::MutableBitmap;
 use databend_common_metrics::storage::*;
 use databend_common_pipeline::core::Event;
 use databend_common_pipeline::core::InputPort;
 use databend_common_pipeline::core::OutputPort;
 use databend_common_pipeline::core::Processor;
 use databend_common_pipeline::core::ProcessorPtr;
-use roaring::RoaringTreemap;
 
 use super::parquet_data_source::ParquetDataSource;
 use super::util::add_data_block_meta;
@@ -53,12 +41,8 @@ use crate::io::AggIndexReader;
 use crate::io::BlockReader;
 use crate::io::VirtualColumnReader;
 use crate::operations::read::data_source_with_meta::DataSourceWithMeta;
-use crate::pruning::ExprBloomFilter;
 
 pub struct DeserializeDataTransform {
-    ctx: Arc<dyn TableContext>,
-    scan_id: usize,
-    scan_progress: Arc<Progress>,
     block_reader: Arc<BlockReader>,
 
     input: Arc<InputPort>,
@@ -73,18 +57,9 @@ pub struct DeserializeDataTransform {
     virtual_reader: Arc<Option<VirtualColumnReader>>,
 
     base_block_ids: Option<Scalar>,
-    cached_runtime_filter: Option<Vec<BloomRuntimeFilterRef>>,
     need_reserve_block_info: bool,
 }
 
-#[derive(Clone)]
-struct BloomRuntimeFilterRef {
-    column_index: FieldIndex,
-    filter_id: usize,
-    filter: RuntimeBloomFilter,
-    stats: Arc<RuntimeFilterStats>,
-}
-
 unsafe impl Send for DeserializeDataTransform {}
 
 impl DeserializeDataTransform {
@@ -97,8 +72,6 @@ impl DeserializeDataTransform {
         index_reader: Arc<Option<AggIndexReader>>,
         virtual_reader: Arc<Option<VirtualColumnReader>>,
     ) -> Result<ProcessorPtr> {
-        let scan_progress = ctx.get_scan_progress();
-
         let mut src_schema: DataSchema = (block_reader.schema().as_ref()).into();
         if let Some(virtual_reader) = virtual_reader.as_ref() {
             let mut fields = src_schema.fields().clone();
@@ -117,9 +90,6 @@ impl DeserializeDataTransform {
         let output_schema: DataSchema = (&output_schema).into();
         let (need_reserve_block_info, _) = need_reserve_block_info(ctx.clone(), plan.table_index);
         Ok(ProcessorPtr::create(Box::new(DeserializeDataTransform {
-            ctx: ctx.clone(),
-            scan_id: plan.scan_id,
-            scan_progress,
             block_reader,
             input,
             output,
@@ -131,75 +101,9 @@ impl DeserializeDataTransform {
             index_reader,
             virtual_reader,
             base_block_ids: plan.base_block_ids.clone(),
-            cached_runtime_filter: None,
             need_reserve_block_info,
         })))
     }
-
-    fn runtime_filter(&mut self, data_block: DataBlock) -> Result<Option<Bitmap>> {
-        // Check if already cached runtime filters
-        if self.cached_runtime_filter.is_none() {
-            let bloom_filters = self
-                .ctx
-                .get_runtime_filters(self.scan_id)
-                .into_iter()
-                .filter_map(|entry| {
-                    let filter_id = entry.id;
-                    let RuntimeFilterEntry { bloom, stats, .. } = entry;
-                    let bloom = bloom?;
-                    let column_index = self.src_schema.index_of(bloom.column_name.as_str()).ok()?;
-                    Some(BloomRuntimeFilterRef {
-                        column_index,
-                        filter_id,
-                        filter: bloom.filter.clone(),
-                        stats,
-                    })
-                })
-                .collect::<Vec<_>>();
-            if bloom_filters.is_empty() {
-                return Ok(None);
-            }
-            let mut filter_ids = bloom_filters
-                .iter()
-                .map(|f| f.filter_id)
-                .collect::<Vec<_>>();
-            filter_ids.sort_unstable();
-            log::info!(
-                "RUNTIME-FILTER: scan_id={} bloom_filters={} filter_ids={:?}",
-                self.scan_id,
-                bloom_filters.len(),
-                filter_ids
-            );
-            self.cached_runtime_filter = Some(bloom_filters);
-        }
-
-        let mut bitmaps = vec![];
-        for runtime_filter in self.cached_runtime_filter.as_ref().unwrap().iter() {
-            let mut bitmap = MutableBitmap::from_len_zeroed(data_block.num_rows());
-            let probe_block_entry = data_block.get_by_offset(runtime_filter.column_index);
-            let probe_column = probe_block_entry.to_column();
-
-            // Apply bloom filter
-            let start = Instant::now();
-            ExprBloomFilter::new(&runtime_filter.filter).apply(probe_column, &mut bitmap)?;
-            let elapsed = start.elapsed();
-            let unset_bits = bitmap.null_count();
-            runtime_filter
-                .stats
-                .record_bloom(elapsed.as_nanos() as u64, unset_bits as u64);
-            bitmaps.push(bitmap);
-        }
-        if !bitmaps.is_empty() {
-            let rf_bitmap = bitmaps
-                .into_iter()
-                .reduce(|acc, rf_filter| acc.bitand(&rf_filter.into()))
-                .unwrap();
-
-            Ok(rf_bitmap.into())
-        } else {
-            Ok(None)
-        }
-    }
 }
 
 #[async_trait::async_trait]
@@ -267,16 +171,6 @@
                     let agg_index_reader = self.index_reader.as_ref().as_ref().unwrap();
                     let block = agg_index_reader.deserialize_parquet_data(actual_part, data)?;
 
-                    let progress_values = ProgressValues {
-                        rows: block.num_rows(),
-                        bytes: block.memory_size(),
-                    };
-                    self.scan_progress.incr(&progress_values);
-                    Profile::record_usize_profile(
-                        ProfileStatisticsName::ScanBytes,
-                        block.memory_size(),
-                    );
-
                     self.output_data = Some(block);
                 }
                 ParquetDataSource::Normal((data, virtual_data)) => {
@@ -292,29 +186,6 @@
                         &part.location,
                     )?;
 
-                    let origin_num_rows = data_block.num_rows();
-
-                    let mut filter = None;
-                    let bloom_start = Instant::now();
-
-                    let rows_before = data_block.num_rows();
-                    if let Some(bitmap) = self.runtime_filter(data_block.clone())? {
-                        data_block = data_block.filter_with_bitmap(&bitmap)?;
-                        filter = Some(bitmap);
-                        let rows_after = data_block.num_rows();
-                        let bloom_duration = bloom_start.elapsed();
-                        Profile::record_usize_profile(
-                            ProfileStatisticsName::RuntimeFilterBloomTime,
-                            bloom_duration.as_nanos() as usize,
-                        );
-                        if rows_before > rows_after {
-                            Profile::record_usize_profile(
-                                ProfileStatisticsName::RuntimeFilterBloomRowsFiltered,
-                                rows_before - rows_after,
-                            );
-                        }
-                    }
-
                     // Add optional virtual columns
                     if let Some(virtual_reader) = self.virtual_reader.as_ref() {
                         data_block = virtual_reader
@@ -328,33 +199,12 @@
                         );
                     }
 
-                    let progress_values = ProgressValues {
-                        rows: data_block.num_rows(),
-                        bytes: data_block.memory_size(),
-                    };
-                    self.scan_progress.incr(&progress_values);
-                    Profile::record_usize_profile(
-                        ProfileStatisticsName::ScanBytes,
-                        data_block.memory_size(),
-                    );
-
                     let mut data_block =
                         data_block.resort(&self.src_schema, &self.output_schema)?;
 
                     // Fill `BlockMetaIndex` as `DataBlock.meta` if query internal columns,
                     // `TransformAddInternalColumns` will generate internal columns using `BlockMetaIndex` in next pipeline.
-                    let offsets = if self.block_reader.query_internal_columns() {
-                        filter.as_ref().map(|bitmap| {
-                            RoaringTreemap::from_sorted_iter(
-                                (0..origin_num_rows)
-                                    .filter(|i| unsafe { bitmap.get_bit_unchecked(*i) })
-                                    .map(|i| i as u64),
-                            )
-                            .unwrap()
-                        })
-                    } else {
-                        None
-                    };
+                    let offsets = None;
 
                     data_block = add_data_block_meta(
                         data_block,
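The parquet deserializer no longer computes offsets for BlockMetaIndex: since it no longer filters rows, its output keeps a 1:1 mapping to the source block and offsets = None is always correct. The offset bookkeeping moves into TransformRuntimeFilter::maybe_update_internal_column_meta below, which rewrites the existing InternalColumnMeta after filtering. A reduced model of what those offsets mean, i.e. the positions in the pre-filter block that survived, which TransformAddInternalColumns later uses to build internal columns such as _row_id:

    fn kept_offsets(bitmap: &[bool]) -> Vec<u64> {
        // `true` bits mark surviving rows; record their original positions.
        bitmap
            .iter()
            .enumerate()
            .filter(|(_, keep)| **keep)
            .map(|(i, _)| i as u64)
            .collect()
    }

    fn main() {
        // 5-row block, bitmap keeps rows 0, 2 and 4.
        assert_eq!(kept_offsets(&[true, false, true, false, true]), vec![0, 2, 4]);
    }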
diff --git a/src/query/storages/fuse/src/operations/read/runtime_filter_transform.rs b/src/query/storages/fuse/src/operations/read/runtime_filter_transform.rs
new file mode 100644
index 0000000000000..2e0867c2ef92a
--- /dev/null
+++ b/src/query/storages/fuse/src/operations/read/runtime_filter_transform.rs
@@ -0,0 +1,341 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::ops::BitAnd;
+use std::sync::Arc;
+use std::time::Instant;
+
+use databend_common_base::base::Progress;
+use databend_common_base::base::ProgressValues;
+use databend_common_base::runtime::profile::Profile;
+use databend_common_base::runtime::profile::ProfileStatisticsName;
+use databend_common_catalog::plan::InternalColumnMeta;
+use databend_common_catalog::runtime_filter_info::RuntimeBloomFilter;
+use databend_common_catalog::runtime_filter_info::RuntimeFilterEntry;
+use databend_common_catalog::runtime_filter_info::RuntimeFilterStats;
+use databend_common_catalog::table_context::TableContext;
+use databend_common_exception::Result;
+use databend_common_expression::BlockMetaInfoDowncast;
+use databend_common_expression::DataBlock;
+use databend_common_expression::DataSchema;
+use databend_common_expression::FieldIndex;
+use databend_common_expression::types::Bitmap;
+use databend_common_expression::types::MutableBitmap;
+use databend_common_pipeline::core::Event;
+use databend_common_pipeline::core::InputPort;
+use databend_common_pipeline::core::OutputPort;
+use databend_common_pipeline::core::Processor;
+use databend_common_pipeline::core::ProcessorPtr;
+use roaring::RoaringTreemap;
+
+use crate::pruning::ExprBloomFilter;
+
+pub struct TransformRuntimeFilter {
+    ctx: Arc<dyn TableContext>,
+    scan_id: usize,
+    schema: DataSchema,
+    scan_progress: Arc<Progress>,
+
+    input: Arc<InputPort>,
+    output: Arc<OutputPort>,
+    output_data: Option<DataBlock>,
+
+    cached_bloom_filters: Option<Vec<BloomRuntimeFilterRef>>,
+}
+
+#[derive(Clone)]
+struct BloomRuntimeFilterRef {
+    column_index: FieldIndex,
+    filter_id: usize,
+    filter: RuntimeBloomFilter,
+    stats: Arc<RuntimeFilterStats>,
+}
+
+impl TransformRuntimeFilter {
+    pub fn create(
+        ctx: Arc<dyn TableContext>,
+        scan_id: usize,
+        schema: DataSchema,
+        input: Arc<InputPort>,
+        output: Arc<OutputPort>,
+    ) -> ProcessorPtr {
+        let scan_progress = ctx.get_scan_progress();
+        ProcessorPtr::create(Box::new(Self {
+            ctx,
+            scan_id,
+            schema,
+            scan_progress,
+            input,
+            output,
+            output_data: None,
+            cached_bloom_filters: None,
+        }))
+    }
+
+    fn try_init_bloom_filters(&mut self) {
+        if self.cached_bloom_filters.is_some() {
+            return;
+        }
+
+        let bloom_filters = self
+            .ctx
+            .get_runtime_filters(self.scan_id)
+            .into_iter()
+            .filter_map(|entry| {
+                let filter_id = entry.id;
+                let RuntimeFilterEntry { bloom, stats, .. } = entry;
+                let bloom = bloom?;
+                let column_index = self.schema.index_of(bloom.column_name.as_str()).ok()?;
+                Some(BloomRuntimeFilterRef {
+                    column_index,
+                    filter_id,
+                    filter: bloom.filter.clone(),
+                    stats,
+                })
+            })
+            .collect::<Vec<_>>();
+
+        if bloom_filters.is_empty() {
+            return;
+        }
+
+        let mut filter_ids = bloom_filters
+            .iter()
+            .map(|f| f.filter_id)
+            .collect::<Vec<_>>();
+        filter_ids.sort_unstable();
+        log::info!(
+            "RUNTIME-FILTER: scan_id={} bloom_filters={} filter_ids={:?}",
+            self.scan_id,
+            bloom_filters.len(),
+            filter_ids
+        );
+        self.cached_bloom_filters = Some(bloom_filters);
+    }
+
+    fn apply_bloom_runtime_filters(&mut self, data_block: &DataBlock) -> Result<Option<Bitmap>> {
+        self.try_init_bloom_filters();
+        let Some(bloom_filters) = self.cached_bloom_filters.as_ref() else {
+            return Ok(None);
+        };
+
+        let mut bitmaps = Vec::with_capacity(bloom_filters.len());
+        for runtime_filter in bloom_filters.iter() {
+            let mut bitmap = MutableBitmap::from_len_zeroed(data_block.num_rows());
+            let probe_block_entry = data_block.get_by_offset(runtime_filter.column_index);
+            let probe_column = probe_block_entry.to_column();
+
+            let start = Instant::now();
+            ExprBloomFilter::new(&runtime_filter.filter).apply(probe_column, &mut bitmap)?;
+            let elapsed = start.elapsed();
+
+            let unset_bits = bitmap.null_count();
+            runtime_filter
+                .stats
+                .record_bloom(elapsed.as_nanos() as u64, unset_bits as u64);
+            bitmaps.push(bitmap);
+        }
+
+        if bitmaps.is_empty() {
+            return Ok(None);
+        }
+
+        let rf_bitmap = bitmaps
+            .into_iter()
+            .reduce(|acc, rf_filter| acc.bitand(&rf_filter.into()))
+            .unwrap();
+
+        Ok(Some(rf_bitmap.into()))
+    }
+
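+    /// After `filter_with_bitmap`, any `InternalColumnMeta` carried by the block
+    /// still describes the pre-filter row layout, so its offsets, matched
+    /// rows/scores and vector scores must be remapped onto the surviving rows.
+    /// For a kept row at original position `i`, `kept_positions.rank(i) - 1`
+    /// gives its new index among the survivors.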
+    fn maybe_update_internal_column_meta(
+        mut block: DataBlock,
+        bitmap: &Bitmap,
+    ) -> Result<DataBlock> {
+        let Some(meta) = block.take_meta() else {
+            return Ok(block);
+        };
+
+        let mut internal = match InternalColumnMeta::downcast_from_err(meta) {
+            Ok(internal) => internal,
+            Err(meta) => return block.add_meta(Some(meta)),
+        };
+
+        let num_rows_before = bitmap.len();
+        if num_rows_before == 0 {
+            return block.add_meta(Some(internal.boxed()));
+        }
+
+        let kept_positions = RoaringTreemap::from_sorted_iter(
+            (0..num_rows_before)
+                .filter(|i| unsafe { bitmap.get_bit_unchecked(*i) })
+                .map(|i| i as u64),
+        )
+        .unwrap();
+
+        let new_offsets = match &internal.offsets {
+            None => Some(RoaringTreemap::from_sorted_iter(kept_positions.iter()).unwrap()),
+            Some(offsets) => {
+                let mut selected = Vec::with_capacity(kept_positions.len() as usize);
+                for (idx, origin_row) in offsets.iter().take(num_rows_before).enumerate() {
+                    if unsafe { bitmap.get_bit_unchecked(idx) } {
+                        selected.push(origin_row);
+                    }
+                }
+                Some(RoaringTreemap::from_sorted_iter(selected).unwrap())
+            }
+        };
+        internal.offsets = new_offsets;
+
+        match (internal.matched_rows.take(), internal.matched_scores.take()) {
+            (Some(rows), Some(scores)) => {
+                debug_assert_eq!(rows.len(), scores.len());
+                let mut new_rows = Vec::with_capacity(rows.len());
+                let mut new_scores = Vec::with_capacity(scores.len());
+                for (idx, score) in rows.into_iter().zip(scores.into_iter()) {
+                    if kept_positions.contains(idx as u64) {
+                        new_rows.push((kept_positions.rank(idx as u64) - 1) as usize);
+                        new_scores.push(score);
+                    }
+                }
+                internal.matched_rows = Some(new_rows);
+                internal.matched_scores = Some(new_scores);
+            }
+            (Some(rows), None) => {
+                internal.matched_rows = Some(
+                    rows.into_iter()
+                        .filter(|idx| kept_positions.contains(*idx as u64))
+                        .map(|idx| (kept_positions.rank(idx as u64) - 1) as usize)
+                        .collect(),
+                );
+                internal.matched_scores = None;
+            }
+            (None, other) => {
+                internal.matched_rows = None;
+                internal.matched_scores = other;
+            }
+        }
+
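+        // Vector-search scores are (row index, score) pairs over the old layout;
+        // drop entries for filtered rows and remap the survivors with the same
+        // rank()-based trick used for matched rows above.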
+        if let Some(vector_scores) = internal.vector_scores.take() {
+            internal.vector_scores = Some(
+                vector_scores
+                    .into_iter()
+                    .filter(|(idx, _score)| kept_positions.contains(*idx as u64))
+                    .map(|(idx, score)| ((kept_positions.rank(idx as u64) - 1) as usize, score))
+                    .collect(),
+            );
+        }
+
+        block.add_meta(Some(internal.boxed()))
+    }
+
+    fn add_output_block(&mut self, data_block: &DataBlock) {
+        let progress_values = ProgressValues {
+            rows: data_block.num_rows(),
+            bytes: data_block.memory_size(),
+        };
+        self.scan_progress.incr(&progress_values);
+        Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, data_block.memory_size());
+    }
+}
+
+#[async_trait::async_trait]
+impl Processor for TransformRuntimeFilter {
+    fn name(&self) -> String {
+        String::from("TransformRuntimeFilter")
+    }
+
+    fn as_any(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn event(&mut self) -> Result<Event> {
+        if self.output.is_finished() {
+            self.input.finish();
+            return Ok(Event::Finished);
+        }
+
+        if !self.output.can_push() {
+            self.input.set_not_need_data();
+            return Ok(Event::NeedConsume);
+        }
+
+        if let Some(data_block) = self.output_data.take() {
+            self.add_output_block(&data_block);
+            self.output.push_data(Ok(data_block));
+            return Ok(Event::NeedConsume);
+        }
+
+        if self.input.has_data() {
+            return Ok(Event::Sync);
+        }
+
+        if self.input.is_finished() {
+            self.output.finish();
+            return Ok(Event::Finished);
+        }
+
+        self.input.set_need_data();
+        Ok(Event::NeedData)
+    }
+
+    #[async_backtrace::framed]
+    async fn async_process(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    fn process(&mut self) -> Result<()> {
+        let Some(data) = self.input.pull_data() else {
+            return Ok(());
+        };
+
+        let data_block = data?;
+        if data_block.num_rows() == 0 {
+            self.output_data = Some(data_block);
+            return Ok(());
+        }
+
+        let bloom_start = Instant::now();
+        let Some(bitmap) = self.apply_bloom_runtime_filters(&data_block)? else {
+            self.output_data = Some(data_block);
+            return Ok(());
+        };
+
+        let rows_before = data_block.num_rows();
+        let output_block = if bitmap.null_count() == 0 {
+            data_block
+        } else {
+            let mut filtered = data_block.filter_with_bitmap(&bitmap)?;
+            filtered = Self::maybe_update_internal_column_meta(filtered, &bitmap)?;
+            filtered
+        };
+
+        let rows_after = output_block.num_rows();
+        let bloom_duration = bloom_start.elapsed();
+        Profile::record_usize_profile(
+            ProfileStatisticsName::RuntimeFilterBloomTime,
+            bloom_duration.as_nanos() as usize,
+        );
+        if rows_before > rows_after {
+            Profile::record_usize_profile(
+                ProfileStatisticsName::RuntimeFilterBloomRowsFiltered,
+                rows_before - rows_after,
+            );
+        }
+
+        self.output_data = Some(output_block);
+        Ok(())
+    }
+}
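The rank(i) - 1 remapping used throughout maybe_update_internal_column_meta can be sanity-checked with a sorted list of kept positions standing in for the RoaringTreemap: for a member, rank counts the kept positions less than or equal to it, so subtracting one yields its index among the survivors.

    fn remap(kept: &[u64], original_row: u64) -> Option<usize> {
        // For rows that survived, the slice index equals rank(original_row) - 1;
        // rows that were filtered out have no new position.
        kept.binary_search(&original_row).ok()
    }

    fn main() {
        let kept = [1u64, 4, 7]; // original rows 1, 4 and 7 survive the filter
        assert_eq!(remap(&kept, 4), Some(1)); // old row 4 -> new row 1
        assert_eq!(remap(&kept, 5), None); // old row 5 was filtered out
    }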