diff --git a/src/query/service/src/physical_plans/physical_aggregate_final.rs b/src/query/service/src/physical_plans/physical_aggregate_final.rs
index a87b6b6b8cc6f..6a5f8241c04ae 100644
--- a/src/query/service/src/physical_plans/physical_aggregate_final.rs
+++ b/src/query/service/src/physical_plans/physical_aggregate_final.rs
@@ -32,6 +32,7 @@ use databend_common_sql::optimizer::ir::SExpr;
 use databend_common_sql::plans::Aggregate;
 use databend_common_sql::plans::AggregateMode;
 use databend_common_sql::plans::ConstantTableScan;
+use databend_common_sql::plans::ScalarItem;
 use itertools::Itertools;
 
 use super::AggregateExpand;
@@ -259,111 +260,8 @@ impl PhysicalPlanBuilder {
                 .map(|item| Ok(item.scalar.as_expr()?.sql_display()))
                 .collect::<Result<Vec<_>>>()?;
 
-            let mut agg_funcs: Vec<AggregateFunctionDesc> = agg
-                .aggregate_functions
-                .iter()
-                .map(|v| match &v.scalar {
-                    ScalarExpr::AggregateFunction(agg) => {
-                        let arg_indices = agg
-                            .args
-                            .iter()
-                            .map(|arg| {
-                                if let ScalarExpr::BoundColumnRef(col) = arg {
-                                    Ok(col.column.index)
-                                } else {
-                                    Err(ErrorCode::Internal(
-                                        "Aggregate function argument must be a BoundColumnRef"
-                                            .to_string(),
-                                    ))
-                                }
-                            })
-                            .collect::<Result<Vec<_>>>()?;
-                        let args = arg_indices
-                            .iter()
-                            .map(|i| {
-                                Ok(input_schema
-                                    .field_with_name(&i.to_string())?
-                                    .data_type()
-                                    .clone())
-                            })
-                            .collect::<Result<_>>()?;
-                        let sort_desc_indices = agg.sort_descs
-                            .iter()
-                            .map(|desc| {
-                                if let ScalarExpr::BoundColumnRef(col) = &desc.expr {
-                                    Ok(col.column.index)
-                                } else {
-                                    Err(ErrorCode::Internal(
-                                        "Aggregate function description must be a BoundColumnRef"
-                                            .to_string(),
-                                    ))
-                                }
-                            })
-                            .collect::<Result<_>>()?;
-                        let sort_descs = agg.sort_descs
-                            .iter()
-                            .map(|desc| desc.try_into())
-                            .collect::<Result<_>>()?;
-                        Ok(AggregateFunctionDesc {
-                            sig: AggregateFunctionSignature {
-                                name: agg.func_name.clone(),
-                                udaf: None,
-                                return_type: *agg.return_type.clone(),
-                                args,
-                                params: agg.params.clone(),
-                                sort_descs,
-                            },
-                            output_column: v.index,
-                            arg_indices,
-                            sort_desc_indices,
-                            display: v.scalar.as_expr()?.sql_display(),
-                        })
-                    }
-                    ScalarExpr::UDAFCall(udaf) => {
-                        let arg_indices = udaf
-                            .arguments
-                            .iter()
-                            .map(|arg| {
-                                if let ScalarExpr::BoundColumnRef(col) = arg {
-                                    Ok(col.column.index)
-                                } else {
-                                    Err(ErrorCode::Internal(
-                                        "Aggregate function argument must be a BoundColumnRef"
-                                            .to_string(),
-                                    ))
-                                }
-                            })
-                            .collect::<Result<Vec<_>>>()?;
-                        let args = arg_indices
-                            .iter()
-                            .map(|i| {
-                                Ok(input_schema
-                                    .field_with_name(&i.to_string())?
-                                    .data_type()
-                                    .clone())
-                            })
-                            .collect::<Result<_>>()?;
-
-                        Ok(AggregateFunctionDesc {
-                            sig: AggregateFunctionSignature {
-                                name: udaf.name.clone(),
-                                udaf: Some((udaf.udf_type.clone(), udaf.state_fields.clone())),
-                                return_type: *udaf.return_type.clone(),
-                                args,
-                                params: vec![],
-                                sort_descs: vec![],
-                            },
-                            output_column: v.index,
-                            arg_indices,
-                            sort_desc_indices: vec![],
-                            display: v.scalar.as_expr()?.sql_display(),
-                        })
-                    }
-                    _ => Err(ErrorCode::Internal(
-                        "Expected aggregate function".to_string(),
-                    )),
-                })
-                .collect::<Result<_>>()?;
+            let mut agg_funcs =
+                build_aggregate_function(&agg.aggregate_functions, &input_schema)?;
 
             let settings = self.ctx.get_settings();
             let mut group_by_shuffle_mode = settings.get_group_by_shuffle_mode()?;
@@ -507,111 +405,8 @@ impl PhysicalPlanBuilder {
                 aggregate.input.output_schema()?
             };
 
-            let mut agg_funcs: Vec<AggregateFunctionDesc> = agg
-                .aggregate_functions
-                .iter()
-                .map(|v| match &v.scalar {
-                    ScalarExpr::AggregateFunction(agg) => {
-                        let arg_indices = agg
-                            .args
-                            .iter()
-                            .map(|arg| {
-                                if let ScalarExpr::BoundColumnRef(col) = arg {
-                                    Ok(col.column.index)
-                                } else {
-                                    Err(ErrorCode::Internal(
-                                        "Aggregate function argument must be a BoundColumnRef"
-                                            .to_string(),
-                                    ))
-                                }
-                            })
-                            .collect::<Result<Vec<_>>>()?;
-                        let sort_desc_indices = agg.sort_descs
-                            .iter()
-                            .map(|desc| {
-                                if let ScalarExpr::BoundColumnRef(col) = &desc.expr {
-                                    Ok(col.column.index)
-                                } else {
-                                    Err(ErrorCode::Internal(
-                                        "Aggregate function sort description must be a BoundColumnRef"
-                                            .to_string(),
-                                    ))
-                                }
-                            })
-                            .collect::<Result<_>>()?;
-                        let args = arg_indices
-                            .iter()
-                            .map(|i| {
-                                Ok(input_schema
-                                    .field_with_name(&i.to_string())?
-                                    .data_type()
-                                    .clone())
-                            })
-                            .collect::<Result<_>>()?;
-                        let sort_descs = agg.sort_descs
-                            .iter()
-                            .map(|desc| desc.try_into())
-                            .collect::<Result<_>>()?;
-                        Ok(AggregateFunctionDesc {
-                            sig: AggregateFunctionSignature {
-                                name: agg.func_name.clone(),
-                                udaf: None,
-                                return_type: *agg.return_type.clone(),
-                                args,
-                                params: agg.params.clone(),
-                                sort_descs,
-                            },
-                            output_column: v.index,
-                            arg_indices,
-                            sort_desc_indices,
-                            display: v.scalar.as_expr()?.sql_display(),
-                        })
-                    }
-                    ScalarExpr::UDAFCall(udaf) => {
-                        let arg_indices = udaf
-                            .arguments
-                            .iter()
-                            .map(|arg| {
-                                if let ScalarExpr::BoundColumnRef(col) = arg {
-                                    Ok(col.column.index)
-                                } else {
-                                    Err(ErrorCode::Internal(
-                                        "Aggregate function argument must be a BoundColumnRef"
-                                            .to_string(),
-                                    ))
-                                }
-                            })
-                            .collect::<Result<Vec<_>>>()?;
-                        let args = arg_indices
-                            .iter()
-                            .map(|i| {
-                                Ok(input_schema
-                                    .field_with_name(&i.to_string())?
-                                    .data_type()
-                                    .clone())
-                            })
-                            .collect::<Result<_>>()?;
-
-                        Ok(AggregateFunctionDesc {
-                            sig: AggregateFunctionSignature {
-                                name: udaf.name.clone(),
-                                udaf: Some((udaf.udf_type.clone(), udaf.state_fields.clone())),
-                                return_type: *udaf.return_type.clone(),
-                                args,
-                                params: vec![],
-                                sort_descs: vec![],
-                            },
-                            output_column: v.index,
-                            arg_indices,
-                            sort_desc_indices: vec![],
-                            display: v.scalar.as_expr()?.sql_display(),
-                        })
-                    }
-                    _ => Err(ErrorCode::Internal(
-                        "Expected aggregate function".to_string(),
-                    )),
-                })
-                .collect::<Result<_>>()?;
+            let mut agg_funcs =
+                build_aggregate_function(&agg.aggregate_functions, &input_schema)?;
 
             if let Some(grouping_sets) = agg.grouping_sets.as_ref() {
                 // The argument types are wrapped nullable due to `AggregateExpand` plan. We should recover them to original types.
@@ -676,3 +471,113 @@ impl PhysicalPlanBuilder {
         Ok(result)
     }
 }
+
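+/// Convert the bound aggregate items (`AggregateFunction` and `UDAFCall` scalars) into
+/// `AggregateFunctionDesc`s, resolving argument and sort-description columns to indices
+/// in `input_schema`. Shared by the partial and final aggregate build paths above.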
+fn build_aggregate_function(
+    agg_functions: &[ScalarItem],
+    input_schema: &DataSchemaRef,
+) -> Result<Vec<AggregateFunctionDesc>> {
+    agg_functions
+        .iter()
+        .map(|v| match &v.scalar {
+            ScalarExpr::AggregateFunction(agg) => {
+                let arg_indices = agg
+                    .args
+                    .iter()
+                    .map(|arg| {
+                        if let ScalarExpr::BoundColumnRef(col) = arg {
+                            Ok(col.column.index)
+                        } else {
+                            Err(ErrorCode::Internal(
+                                "Aggregate function argument must be a BoundColumnRef".to_string(),
+                            ))
+                        }
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                let sort_desc_indices = agg
+                    .sort_descs
+                    .iter()
+                    .map(|desc| {
+                        if let ScalarExpr::BoundColumnRef(col) = &desc.expr {
+                            Ok(col.column.index)
+                        } else {
+                            Err(ErrorCode::Internal(
+                                "Aggregate function sort description must be a BoundColumnRef"
+                                    .to_string(),
+                            ))
+                        }
+                    })
+                    .collect::<Result<_>>()?;
+                let args = arg_indices
+                    .iter()
+                    .map(|i| {
+                        Ok(input_schema
+                            .field_with_name(&i.to_string())?
+                            .data_type()
+                            .clone())
+                    })
+                    .collect::<Result<_>>()?;
+                let sort_descs = agg
+                    .sort_descs
+                    .iter()
+                    .map(|desc| desc.try_into())
+                    .collect::<Result<_>>()?;
+                Ok(AggregateFunctionDesc {
+                    sig: AggregateFunctionSignature {
+                        name: agg.func_name.clone(),
+                        udaf: None,
+                        return_type: *agg.return_type.clone(),
+                        args,
+                        params: agg.params.clone(),
+                        sort_descs,
+                    },
+                    output_column: v.index,
+                    arg_indices,
+                    sort_desc_indices,
+                    display: v.scalar.as_expr()?.sql_display(),
+                })
+            }
+            ScalarExpr::UDAFCall(udaf) => {
+                let arg_indices = udaf
+                    .arguments
+                    .iter()
+                    .map(|arg| {
+                        if let ScalarExpr::BoundColumnRef(col) = arg {
+                            Ok(col.column.index)
+                        } else {
+                            Err(ErrorCode::Internal(
+                                "Aggregate function argument must be a BoundColumnRef".to_string(),
+                            ))
+                        }
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                let args = arg_indices
+                    .iter()
+                    .map(|i| {
+                        Ok(input_schema
+                            .field_with_name(&i.to_string())?
+                            .data_type()
+                            .clone())
+                    })
+                    .collect::<Result<_>>()?;
+
+                Ok(AggregateFunctionDesc {
+                    sig: AggregateFunctionSignature {
+                        name: udaf.name.clone(),
+                        udaf: Some((udaf.udf_type.clone(), udaf.state_fields.clone())),
+                        return_type: *udaf.return_type.clone(),
+                        args,
+                        params: vec![],
+                        sort_descs: vec![],
+                    },
+                    output_column: v.index,
+                    arg_indices,
+                    sort_desc_indices: vec![],
+                    display: v.scalar.as_expr()?.sql_display(),
+                })
+            }
+            _ => Err(ErrorCode::Internal(
+                "Expected aggregate function".to_string(),
+            )),
+        })
+        .collect::<Result<_>>()
+}
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs
index b380ccbd7facb..ebd0c62e4e9d8 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs
@@ -20,7 +20,10 @@ mod new_transform_final_aggregate;
 mod transform_partition_bucket_scatter;
 
 pub use datablock_splitter::split_partitioned_meta_into_datablocks;
+pub use new_aggregate_spiller::LocalPartitionStream;
+pub use new_aggregate_spiller::NewAggregateSpillReader;
 pub use new_aggregate_spiller::NewAggregateSpiller;
+pub use new_aggregate_spiller::PartitionStream;
 pub use new_aggregate_spiller::SharedPartitionStream;
 pub use new_final_aggregate_state::FinalAggregateSharedState;
 pub use new_transform_aggregate_partial::NewTransformPartialAggregate;
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs
index 45b02e1e1c69e..bd56f9dd60d55 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs
@@ -225,41 +225,44 @@ impl AggregatePayloadWriters {
     }
 }
 
-struct SharedPartitionStreamInner {
-    partition_stream: BlockPartitionStream,
-    worker_count: usize,
-    finish_count: usize,
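+/// Splits incoming data blocks by partition id ahead of spilling. `partition` may hand
+/// back blocks that are ready to be flushed; `finish` drains whatever is still buffered
+/// as `(partition id, block)` pairs.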
+pub trait PartitionStream: Send {
+    fn finish(&mut self) -> Vec<(usize, DataBlock)>;
+    fn partition(&mut self, partition_id: usize, block: DataBlock) -> Vec<(usize, DataBlock)>;
 }
 
-impl SharedPartitionStreamInner {
-    pub fn finish(&mut self) -> Vec<(usize, DataBlock)> {
-        self.finish_count += 1;
-
-        if self.finish_count == self.worker_count {
-            self.finish_count = 0;
-
-            let ids = self.partition_stream.partition_ids();
+pub struct LocalPartitionStream {
+    partition_stream: BlockPartitionStream,
+}
 
-            let mut pending_blocks = Vec::with_capacity(ids.len());
+impl LocalPartitionStream {
+    pub fn new(max_rows: usize, max_bytes: usize, partition_count: usize) -> Self {
+        let partition_stream = BlockPartitionStream::create(max_rows, max_bytes, partition_count);
+        Self { partition_stream }
+    }
+}
 
-            for id in ids {
-                if let Some(block) = self.partition_stream.finalize_partition(id) {
-                    pending_blocks.push((id, block));
-                }
+impl PartitionStream for LocalPartitionStream {
+    fn finish(&mut self) -> Vec<(usize, DataBlock)> {
+        let ids = self.partition_stream.partition_ids();
+        let mut pending_blocks = Vec::with_capacity(ids.len());
+        for id in ids {
+            if let Some(block) = self.partition_stream.finalize_partition(id) {
+                pending_blocks.push((id, block));
             }
-            return pending_blocks;
         }
-        vec![]
+        pending_blocks
     }
 
-    pub fn partition(&mut self, partition_id: u64, block: DataBlock) -> Vec<(usize, DataBlock)> {
-        let indices = vec![partition_id; block.num_rows()];
+    fn partition(&mut self, partition_id: usize, block: DataBlock) -> Vec<(usize, DataBlock)> {
+        let indices = vec![partition_id as u64; block.num_rows()];
         self.partition_stream.partition(indices, block, true)
     }
+}
 
-    pub fn update_worker_count(&mut self, worker_count: usize) {
-        self.worker_count = worker_count;
-    }
+struct SharedPartitionStreamInner {
+    partition_stream: BlockPartitionStream,
+    worker_count: usize,
+    finish_count: usize,
 }
 
 #[derive(Clone)]
@@ -284,35 +287,61 @@ impl SharedPartitionStream {
         }
     }
 
-    pub fn finish(&self) -> Vec<(usize, DataBlock)> {
+    pub fn do_finish(&self) -> Vec<(usize, DataBlock)> {
         let mut inner = self.inner.lock();
-        inner.finish()
+        inner.finish_count += 1;
+
+        if inner.finish_count == inner.worker_count {
+            inner.finish_count = 0;
+
+            let ids = inner.partition_stream.partition_ids();
+
+            let mut pending_blocks = Vec::with_capacity(ids.len());
+
+            for id in ids {
+                if let Some(block) = inner.partition_stream.finalize_partition(id) {
+                    pending_blocks.push((id, block));
+                }
+            }
+            return pending_blocks;
+        }
+        vec![]
     }
 
-    pub fn partition(&self, partition_id: usize, block: DataBlock) -> Vec<(usize, DataBlock)> {
+    pub fn do_partition(&self, partition_id: usize, block: DataBlock) -> Vec<(usize, DataBlock)> {
         let mut inner = self.inner.lock();
-        inner.partition(partition_id as u64, block)
+        let indices = vec![partition_id as u64; block.num_rows()];
+        inner.partition_stream.partition(indices, block, true)
     }
 
     pub fn update_worker_count(&self, worker_count: usize) {
         let mut inner = self.inner.lock();
-        inner.update_worker_count(worker_count);
+        inner.worker_count = worker_count;
    }
 }
 
-pub struct NewAggregateSpiller {
+impl PartitionStream for SharedPartitionStream {
+    fn finish(&mut self) -> Vec<(usize, DataBlock)> {
+        self.do_finish()
+    }
+
+    fn partition(&mut self, partition_id: usize, block: DataBlock) -> Vec<(usize, DataBlock)> {
+        self.do_partition(partition_id, block)
+    }
+}
+
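+/// Aggregate spiller generic over its partition stream: `SharedPartitionStream` is
+/// shared between workers, while `LocalPartitionStream` buffers for a single worker.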
+pub struct NewAggregateSpiller<P: PartitionStream> {
     pub memory_settings: MemorySettings,
     read_setting: ReadSettings,
-    partition_count: usize,
-    partition_stream: SharedPartitionStream,
+    partition_stream: P,
     payload_writers: AggregatePayloadWriters,
 }
 
-impl NewAggregateSpiller {
+impl<P: PartitionStream> NewAggregateSpiller<P> {
     pub fn try_create(
         ctx: Arc<QueryContext>,
         partition_count: usize,
-        partition_stream: SharedPartitionStream,
+        partition_stream: P,
         is_local: bool,
     ) -> Result<Self> {
         let memory_settings = MemorySettings::from_aggregate_settings(&ctx)?;
@@ -326,7 +355,6 @@ impl NewAggregateSpiller {
         Ok(Self {
             memory_settings,
             read_setting,
-            partition_count,
             partition_stream,
             payload_writers,
         })
@@ -351,50 +379,11 @@ impl NewAggregateSpiller {
     }
 
     pub fn restore(&self, payload: NewSpilledPayload) -> Result<AggregateMeta> {
-        let NewSpilledPayload {
-            bucket,
-            location,
-            row_group,
-        } = payload;
-
-        let data_operator = DataOperator::instance();
-        let target = SpillTarget::from_storage_params(data_operator.spill_params());
-        let operator = data_operator.spill_operator();
-        let buffer_pool = SpillsBufferPool::instance();
-
-        let mut reader = buffer_pool.reader(
-            operator.clone(),
-            location.clone(),
-            vec![row_group.clone()],
-            target,
-        )?;
-
-        let instant = Instant::now();
-        let data_block = reader.read(self.read_setting)?;
-        let elapsed = instant.elapsed();
-
-        let read_size = reader.read_bytes();
-
-        info!(
-            "Read aggregate spill finished: (bucket: {}, location: {}, bytes: {}, rows: {}, elapsed: {:?})",
-            bucket,
-            location,
-            read_size,
-            row_group.num_rows(),
-            elapsed
-        );
-
-        if let Some(block) = data_block {
-            Ok(AggregateMeta::Serialized(SerializedPayload {
-                bucket,
-                data_block: block,
-                max_partition_count: self.partition_count,
-            }))
-        } else {
-            Err(ErrorCode::Internal("read empty block from final aggregate"))
-        }
+        restore_payload(self.read_setting, payload)
     }
+}
 
+impl NewAggregateSpiller<SharedPartitionStream> {
     pub fn update_activate_worker(&self, activate_worker: usize) {
         self.partition_stream.update_worker_count(activate_worker);
     }
@@ -410,6 +399,71 @@
     }
 }
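+/// Read-only counterpart of `NewAggregateSpiller`: restores spilled aggregate payloads
+/// without owning a partition stream or write path.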
+pub struct NewAggregateSpillReader {
+    read_setting: ReadSettings,
+}
+
+impl NewAggregateSpillReader {
+    pub fn try_create(ctx: Arc<QueryContext>) -> Result<Self> {
+        let table_ctx: Arc<dyn TableContext> = ctx;
+        let read_setting = ReadSettings::from_settings(&table_ctx.get_settings())?;
+        Ok(Self { read_setting })
+    }
+
+    pub fn restore(&self, payload: NewSpilledPayload) -> Result<AggregateMeta> {
+        restore_payload(self.read_setting, payload)
+    }
+}
+
+fn restore_payload(
+    read_setting: ReadSettings,
+    payload: NewSpilledPayload,
+) -> Result<AggregateMeta> {
+    let NewSpilledPayload {
+        bucket,
+        location,
+        row_group,
+    } = payload;
+
+    let data_operator = DataOperator::instance();
+    let target = SpillTarget::from_storage_params(data_operator.spill_params());
+    let operator = data_operator.spill_operator();
+    let buffer_pool = SpillsBufferPool::instance();
+
+    let mut reader = buffer_pool.reader(
+        operator.clone(),
+        location.clone(),
+        vec![row_group.clone()],
+        target,
+    )?;
+
+    let instant = Instant::now();
+    let data_block = reader.read(read_setting)?;
+    let elapsed = instant.elapsed();
+
+    let read_size = reader.read_bytes();
+
+    info!(
+        "Read aggregate spill finished: (bucket: {}, location: {}, bytes: {}, rows: {}, elapsed: {:?})",
+        bucket,
+        location,
+        read_size,
+        row_group.num_rows(),
+        elapsed
+    );
+
+    if let Some(block) = data_block {
+        Ok(AggregateMeta::Serialized(SerializedPayload {
+            bucket,
+            data_block: block,
+            // New aggregator no longer uses this field; keep 0 for deprecated compatibility.
+            max_partition_count: 0,
+        }))
+    } else {
+        Err(ErrorCode::Internal("read empty block from final aggregate"))
+    }
+}
+
 fn flush_write_profile(ctx: &Arc<QueryContext>, stats: WriteStats, _is_local: bool) {
     if stats.count == 0 && stats.bytes == 0 && stats.rows == 0 {
         return;
     }
@@ -435,6 +489,7 @@ mod tests {
     use databend_common_expression::types::Int32Type;
 
     use crate::pipelines::processors::transforms::aggregator::NewAggregateSpiller;
+    use crate::pipelines::processors::transforms::aggregator::new_aggregate::LocalPartitionStream;
     use crate::pipelines::processors::transforms::aggregator::new_aggregate::SharedPartitionStream;
     use crate::test_kits::TestFixture;
 
@@ -466,4 +521,33 @@
 
         Ok(())
     }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn test_aggregate_payload_writers_local_stream() -> Result<()> {
+        let fixture = TestFixture::setup().await?;
+        let ctx = fixture.new_query_ctx().await?;
+
+        let partition_count = 4;
+        let partition_stream = LocalPartitionStream::new(1024, 1024 * 1024, partition_count);
+        let mut spiller =
+            NewAggregateSpiller::try_create(ctx.clone(), partition_count, partition_stream, true)?;
+
+        let block = DataBlock::new_from_columns(vec![Int32Type::from_data(vec![1i32, 2, 3])]);
+
+        spiller.spill(0, block.clone())?;
+        spiller.spill(2, block)?;
+
+        let payloads = spiller.spill_finish()?;
+
+        assert_eq!(payloads.len(), 2);
+
+        let spilled_files = ctx.get_spilled_files();
+        assert_eq!(spilled_files.len(), 2);
+
+        let buckets: HashSet<_> = payloads.iter().map(|p| p.bucket).collect();
+        assert!(buckets.contains(&0));
+        assert!(buckets.contains(&2));
+
+        Ok(())
+    }
 }