diff --git a/Cargo.lock b/Cargo.lock
index 78865754d9903..772fb3b4a6412 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4160,7 +4160,6 @@ dependencies = [
  "serde_json",
  "sha2",
  "siphasher",
- "sys-info",
  "tantivy",
  "tantivy-common",
  "tantivy-fst",
diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs
index 69e4c1b40056a..6c77659ef3cfd 100644
--- a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs
+++ b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs
@@ -12,21 +12,31 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
 use databend_common_exception::Result;
 use databend_common_expression::BlockThresholds;
+use databend_common_expression::Column;
 use databend_common_expression::DataBlock;
+use databend_common_expression::Value;
+use databend_common_pipeline_core::processors::InputPort;
+use databend_common_pipeline_core::processors::OutputPort;
+use databend_common_pipeline_core::processors::ProcessorPtr;
 use databend_common_pipeline_core::Pipeline;
 
 use crate::processors::AccumulatingTransform;
 use crate::processors::BlockCompactMeta;
 use crate::processors::TransformCompactBlock;
 use crate::processors::TransformPipelineHelper;
+use crate::Transform;
+use crate::Transformer;
 
 pub fn build_compact_block_pipeline(
     pipeline: &mut Pipeline,
     thresholds: BlockThresholds,
 ) -> Result<()> {
     let output_len = pipeline.output_len();
+    pipeline.add_transform(ConvertToFullTransform::create)?;
     pipeline.try_resize(1)?;
     pipeline.add_accumulating_transformer(|| BlockCompactBuilder::new(thresholds));
     pipeline.try_resize(output_len)?;
@@ -34,6 +44,26 @@ pub fn build_compact_block_pipeline(
     Ok(())
 }
 
+pub(crate) struct ConvertToFullTransform;
+
+impl ConvertToFullTransform {
+    pub(crate) fn create(input: Arc<InputPort>, output: Arc<OutputPort>) -> Result<ProcessorPtr> {
+        Ok(ProcessorPtr::create(Transformer::create(
+            input,
+            output,
+            ConvertToFullTransform {},
+        )))
+    }
+}
+
+impl Transform for ConvertToFullTransform {
+    const NAME: &'static str = "ConvertToFullTransform";
+
+    fn transform(&mut self, data: DataBlock) -> Result<DataBlock> {
+        Ok(data.consume_convert_to_full())
+    }
+}
+
 pub struct BlockCompactBuilder {
     thresholds: BlockThresholds,
     // Holds blocks that are partially accumulated but haven't reached the threshold.
@@ -63,7 +93,7 @@ impl AccumulatingTransform for BlockCompactBuilder {
 
     fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
         let num_rows = data.num_rows();
-        let num_bytes = data.memory_size();
+        let num_bytes = memory_size(&data);
 
         if !self.thresholds.check_for_compact(num_rows, num_bytes) {
             // holding slices of blocks to merge later may lead to oom, so
@@ -112,3 +142,18 @@ impl AccumulatingTransform for BlockCompactBuilder {
         }
     }
 }
+
+pub(crate) fn memory_size(data_block: &DataBlock) -> usize {
+    data_block
+        .columns()
+        .iter()
+        .map(|entry| match &entry.value {
+            Value::Column(Column::Nullable(col)) if col.validity.true_count() == 0 => {
+                // For `Nullable` columns with no valid values,
+                // only the size of the validity bitmap is counted.
+                col.validity.as_slice().0.len()
+            }
+            _ => entry.memory_size(),
+        })
+        .sum()
+}
diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs
index 85215b0b58b6f..adcd014acb56f 100644
--- a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs
+++ b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs
@@ -79,7 +79,7 @@ impl AccumulatingTransform for BlockCompactNoSplitBuilder {
 
     fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
         self.accumulated_rows += data.num_rows();
-        self.accumulated_bytes += data.memory_size();
+        self.accumulated_bytes += crate::processors::memory_size(&data);
         if !self
             .thresholds
             .check_large_enough(self.accumulated_rows, self.accumulated_bytes)
diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_dummy.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_dummy.rs
index 1dc6ee7d0d39a..17318b7716801 100644
--- a/src/query/pipeline/transforms/src/processors/transforms/transform_dummy.rs
+++ b/src/query/pipeline/transforms/src/processors/transforms/transform_dummy.rs
@@ -27,7 +27,6 @@ use crate::processors::transforms::Transformer;
 pub struct TransformDummy;
 
 impl TransformDummy {
-    #[allow(dead_code)]
     pub fn create(input: Arc<InputPort>, output: Arc<OutputPort>) -> ProcessorPtr {
         ProcessorPtr::create(Transformer::create(input, output, TransformDummy {}))
     }
diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs
index 3c695cf3465e9..0218fa5399d2f 100644
--- a/src/query/service/src/interpreters/hook/compact_hook.rs
+++ b/src/query/service/src/interpreters/hook/compact_hook.rs
@@ -203,6 +203,7 @@ async fn compact_table(
         &compact_target.database,
         &compact_target.table,
     )?;
+    ctx.set_enable_sort_spill(false);
     let recluster = RelOperator::Recluster(Recluster {
         catalog: compact_target.catalog,
         database: compact_target.database,
diff --git a/src/query/service/src/pipelines/builders/builder_recluster.rs b/src/query/service/src/pipelines/builders/builder_recluster.rs
index f57e149feab6b..50742d09be4e0 100644
--- a/src/query/service/src/pipelines/builders/builder_recluster.rs
+++ b/src/query/service/src/pipelines/builders/builder_recluster.rs
@@ -154,7 +154,6 @@ impl PipelineBuilder {
                 let sort_block_size =
                     block_thresholds.calc_rows_per_block(task.total_bytes, task.total_rows);
 
-                self.ctx.set_enable_sort_spill(false);
                 let sort_pipeline_builder =
                     SortPipelineBuilder::create(self.ctx.clone(), schema, Arc::new(sort_descs))?
                         .with_block_size_hit(sort_block_size)
diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs
index bb7240f586abd..1e544adcd7b61 100644
--- a/src/query/sql/src/executor/physical_plan.rs
+++ b/src/query/sql/src/executor/physical_plan.rs
@@ -584,8 +584,8 @@ impl PhysicalPlan {
             | PhysicalPlan::CompactSource(_)
             | PhysicalPlan::ReplaceAsyncSourcer(_)
             | PhysicalPlan::Recluster(_)
-            | PhysicalPlan::HilbertSerialize(_)
             | PhysicalPlan::RecursiveCteScan(_) => Box::new(std::iter::empty()),
+            PhysicalPlan::HilbertSerialize(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::Filter(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::EvalScalar(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::AggregateExpand(plan) => Box::new(std::iter::once(plan.input.as_ref())),
diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs
index 5cdccd69f9e91..dfab899623cf7 100644
--- a/src/query/sql/src/executor/physical_plan_visitor.rs
+++ b/src/query/sql/src/executor/physical_plan_visitor.rs
@@ -128,7 +128,12 @@ pub trait PhysicalPlanReplacer {
     }
 
     fn replace_hilbert_serialize(&mut self, plan: &HilbertSerialize) -> Result<PhysicalPlan> {
-        Ok(PhysicalPlan::HilbertSerialize(Box::new(plan.clone())))
+        let input = self.replace(&plan.input)?;
+        Ok(PhysicalPlan::HilbertSerialize(Box::new(HilbertSerialize {
+            plan_id: plan.plan_id,
+            input: Box::new(input),
+            table_info: plan.table_info.clone(),
+        })))
     }
 
     fn replace_table_scan(&mut self, plan: &TableScan) -> Result<PhysicalPlan> {
diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs
index 219247b538dc8..6a0b91f1081e7 100644
--- a/src/query/sql/src/planner/binder/ddl/table.rs
+++ b/src/query/sql/src/planner/binder/ddl/table.rs
@@ -1141,22 +1141,19 @@ impl Binder {
         let partitions = settings.get_hilbert_num_range_ids()?;
         let sample_size = settings.get_hilbert_sample_size_per_block()?;
 
-        let keys_bounds_str = cluster_key_strs
-            .iter()
-            .map(|s| format!("range_bound({partitions}, {sample_size})({s}) AS {s}_bound"))
-            .collect::<Vec<_>>()
-            .join(", ");
-
-        let hilbert_keys_str = cluster_key_strs
-            .iter()
-            .map(|s| {
-                format!(
-                    "hilbert_key(cast(ifnull(range_partition_id({table}.{s}, _keys_bound.{s}_bound), {}) as uint16))",
-                    partitions
-                )
-            })
-            .collect::<Vec<_>>()
-            .join(", ");
+        let mut keys_bounds = Vec::with_capacity(cluster_key_strs.len());
+        let mut hilbert_keys = Vec::with_capacity(cluster_key_strs.len());
+        for (index, cluster_key_str) in cluster_key_strs.into_iter().enumerate() {
+            keys_bounds.push(format!(
+                "range_bound({partitions}, {sample_size})({cluster_key_str}) AS bound_{index}"
+            ));
+            hilbert_keys.push(format!(
+                "hilbert_key(cast(ifnull(range_partition_id({table}.{cluster_key_str}, \
+                _keys_bound.bound_{index}), {partitions}) as uint16))"
+            ));
+        }
+        let keys_bounds_str = keys_bounds.join(", ");
+        let hilbert_keys_str = hilbert_keys.join(", ");
 
         let quote = settings.get_sql_dialect()?.default_ident_quote();
         let schema = tbl.schema_with_stream();
@@ -1174,20 +1171,20 @@
 
         let query = format!(
             "WITH _keys_bound AS ( \
-                SELECT \
-                    {keys_bounds_str} \
-                FROM {database}.{table} \
-                ), \
-                _source_data AS ( \
-                SELECT \
-                    {output_with_table_str}, \
-                    hilbert_index([{hilbert_keys_str}], 2) AS _hilbert_index \
-                FROM {database}.{table}, _keys_bound \
-                ) \
             SELECT \
-                {output_str} \
-            FROM _source_data \
-            ORDER BY _hilbert_index"
+                {keys_bounds_str} \
+            FROM {database}.{table} \
+            ), \
+            _source_data AS ( \
+            SELECT \
+                {output_with_table_str}, \
+                hilbert_index([{hilbert_keys_str}], 2) AS _hilbert_index \
+            FROM {database}.{table}, _keys_bound \
+            ) \
+            SELECT \
+                {output_str} \
+            FROM _source_data \
+            ORDER BY _hilbert_index"
         );
         let tokens = tokenize_sql(query.as_str())?;
         let (stmt, _) = parse_sql(&tokens, self.dialect)?;
diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml
index f7d9369ffbab4..da10c20cbd544 100644
--- a/src/query/storages/fuse/Cargo.toml
+++ b/src/query/storages/fuse/Cargo.toml
@@ -67,7 +67,6 @@ serde = { workspace = true }
 serde_json = { workspace = true }
 sha2 = { workspace = true }
 siphasher = { workspace = true }
-sys-info = { workspace = true }
 tantivy = { workspace = true }
 tantivy-common = { workspace = true }
 tantivy-fst = { workspace = true }
diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
index 5de6cb538442b..866d8c9633367 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
@@ -20,6 +20,7 @@ use std::collections::HashSet;
 use std::sync::Arc;
 
 use databend_common_base::runtime::execute_futures_in_parallel;
+use databend_common_base::runtime::GLOBAL_MEM_STAT;
 use databend_common_catalog::plan::Partitions;
 use databend_common_catalog::plan::PartitionsShuffleKind;
 use databend_common_catalog::plan::ReclusterParts;
@@ -195,10 +196,13 @@ impl ReclusterMutator {
             }
         }
 
-        // Compute memory threshold and maximum number of blocks allowed for reclustering
-        let mem_info = sys_info::mem_info().map_err(ErrorCode::from_std_error)?;
-        let recluster_block_size = self.ctx.get_settings().get_recluster_block_size()? as usize;
-        let memory_threshold = recluster_block_size.min(mem_info.avail as usize * 1024 * 30 / 100);
+        // Compute memory threshold and maximum number of blocks allowed for reclustering.
+        let settings = self.ctx.get_settings();
+        let avail_memory_usage =
+            settings.get_max_memory_usage()? - GLOBAL_MEM_STAT.get_memory_usage().max(0) as u64;
+        let memory_threshold = settings
+            .get_recluster_block_size()?
+            .min(avail_memory_usage * 30 / 100) as usize;
         // specify a rather small value, so that `recluster_block_size` might be tuned to lower value.
         let max_blocks_num =
             (memory_threshold / self.block_thresholds.max_bytes_per_block).max(2) * self.max_tasks;
diff --git a/src/query/storages/system/src/streams_table.rs b/src/query/storages/system/src/streams_table.rs
index 9a30db09adf21..f752186b84fea 100644
--- a/src/query/storages/system/src/streams_table.rs
+++ b/src/query/storages/system/src/streams_table.rs
@@ -185,7 +185,7 @@ impl AsyncSystemTable for StreamsTable {
 
         let mut handlers = Vec::new();
         for table in tables {
-            // If db1 is visible, do not means db1.table1 is visible. An user may have a grant about db1.table2, so db1 is visible
+            // If db1 is visible, do not mean db1.table1 is visible. A user may have a grant about db1.table2, so db1 is visible
             // for her, but db1.table1 may be not visible. So we need an extra check about table here after db visibility check.
             let t_id = table.get_id();
             if visibility_checker.check_table_visibility(
diff --git a/tests/suites/1_stateful/07_stage_attachment/07_0000_insert_with_stage.result b/tests/suites/1_stateful/07_stage_attachment/07_0000_insert_with_stage.result
index 2ec060c70b1bf..75b9375de4942 100755
--- a/tests/suites/1_stateful/07_stage_attachment/07_0000_insert_with_stage.result
+++ b/tests/suites/1_stateful/07_stage_attachment/07_0000_insert_with_stage.result
@@ -1,7 +1,7 @@
 sample.csv
 Succeeded
 96
-125
+192
 null
 1 'Beijing' 100 China
 2 'Shanghai' 80 China
diff --git a/tests/suites/1_stateful/07_stage_attachment/07_0001_replace_with_stage.result b/tests/suites/1_stateful/07_stage_attachment/07_0001_replace_with_stage.result
index 98cb421d64e09..d4b70df445dd2 100644
--- a/tests/suites/1_stateful/07_stage_attachment/07_0001_replace_with_stage.result
+++ b/tests/suites/1_stateful/07_stage_attachment/07_0001_replace_with_stage.result
@@ -1,6 +1,6 @@
 sample.csv
 96
-125
+192
 null
 1 'Beijing' 100 China
 2 'Shanghai' 80 China
diff --git a/tests/suites/1_stateful/07_stage_attachment/07_0002_insert_with_stage_deduplicate.result b/tests/suites/1_stateful/07_stage_attachment/07_0002_insert_with_stage_deduplicate.result
index 3d6fc1564c68d..fdd3289d0ac19 100644
--- a/tests/suites/1_stateful/07_stage_attachment/07_0002_insert_with_stage_deduplicate.result
+++ b/tests/suites/1_stateful/07_stage_attachment/07_0002_insert_with_stage_deduplicate.result
@@ -1,6 +1,6 @@
 sample.csv
 96
-125
+192
 null
 1 'Beijing' 100 China
 2 'Shanghai' 80 China