diff --git a/src/query/expression/src/aggregate/aggregate_hashtable.rs b/src/query/expression/src/aggregate/aggregate_hashtable.rs
index 15dbdd8ae4bbe..19774a6d437a8 100644
--- a/src/query/expression/src/aggregate/aggregate_hashtable.rs
+++ b/src/query/expression/src/aggregate/aggregate_hashtable.rs
@@ -46,6 +46,7 @@ pub struct AggregateHashTable {
     current_radix_bits: u64,
     hash_index: HashIndex,
+    hash_index_resize_count: usize,
 }
 
 unsafe impl Send for AggregateHashTable {}
@@ -80,6 +81,7 @@ impl AggregateHashTable {
             ),
             hash_index: HashIndex::with_capacity(capacity),
             config,
+            hash_index_resize_count: 0,
         }
     }
@@ -113,6 +115,7 @@ impl AggregateHashTable {
                 capacity_mask: capacity - 1,
             },
             config,
+            hash_index_resize_count: 0,
         }
     }
@@ -398,10 +401,12 @@ impl AggregateHashTable {
             if self.hash_index.capacity == self.config.max_partial_capacity {
                 return;
             }
+            self.hash_index_resize_count += 1;
             self.hash_index = HashIndex::with_capacity(new_capacity);
             return;
         }
 
+        self.hash_index_resize_count += 1;
         let mut hash_index = HashIndex::with_capacity(new_capacity);
 
         // iterate over payloads and copy to new entries
@@ -454,4 +459,8 @@ impl AggregateHashTable {
             .sum::<usize>()
             + self.hash_index.allocated_bytes()
     }
+
+    pub fn hash_index_resize_count(&self) -> usize {
+        self.hash_index_resize_count
+    }
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs
index 880668c2a932b..1c791ba964f19 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs
@@ -18,6 +18,7 @@ mod aggregator_params;
 mod build_partition_bucket;
 mod new_aggregate;
 mod serde;
+mod statistics;
 mod transform_aggregate_expand;
 mod transform_aggregate_final;
 mod transform_aggregate_partial;
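The counter is bumped on both reallocation paths in the resize hunk — the early-return path that only swaps in a larger `HashIndex`, and the rebuild path that also copies entries back in from the payloads — and the new accessor exposes the total for the completion log added in `statistics.rs` below. A minimal, self-contained sketch of the pattern; `HashIndex` and `Table` here are simplified stand-ins, not the real Databend types:

```rust
// Simplified stand-in for the real index in aggregate_hashtable.rs.
struct HashIndex {
    capacity: usize,
}

impl HashIndex {
    fn with_capacity(capacity: usize) -> Self {
        HashIndex { capacity }
    }
}

struct Table {
    hash_index: HashIndex,
    hash_index_resize_count: usize,
}

impl Table {
    // As in the patch, every reallocation of the index bumps the counter,
    // regardless of whether entries are rehashed afterwards.
    fn resize(&mut self, new_capacity: usize) {
        self.hash_index_resize_count += 1;
        self.hash_index = HashIndex::with_capacity(new_capacity);
    }

    fn hash_index_resize_count(&self) -> usize {
        self.hash_index_resize_count
    }
}

fn main() {
    let mut table = Table {
        hash_index: HashIndex::with_capacity(1024),
        hash_index_resize_count: 0,
    };
    table.resize(2048);
    table.resize(4096);
    println!(
        "capacity {} after {} resizes",
        table.hash_index.capacity,
        table.hash_index_resize_count()
    );
}
```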
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs
index 2cd1eb63fb28a..d41b4aabf1225 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_partial.rs
@@ -13,12 +13,9 @@
 // limitations under the License.
 
 use std::sync::Arc;
-use std::time::Instant;
 use std::vec;
 
 use bumpalo::Bump;
-use databend_common_base::base::convert_byte_size;
-use databend_common_base::base::convert_number_size;
 use databend_common_catalog::plan::AggIndexMeta;
 use databend_common_exception::Result;
 use databend_common_expression::AggregateHashTable;
@@ -52,6 +49,7 @@
 use crate::pipelines::processors::transforms::aggregator::SharedPartitionStream;
 use crate::pipelines::processors::transforms::aggregator::aggregate_exchange_injector::scatter_partitioned_payload;
 use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta;
 use crate::pipelines::processors::transforms::aggregator::exchange_defines;
+use crate::pipelines::processors::transforms::aggregator::statistics::AggregationStatistics;
 use crate::servers::flight::v1::exchange::serde::serialize_block;
 use crate::sessions::QueryContext;
@@ -67,52 +65,6 @@ impl Default for HashTable {
     }
 }
 
-struct PartialAggregationStatistics {
-    start: Instant,
-    first_block_start: Option<Instant>,
-    processed_bytes: usize,
-    processed_rows: usize,
-}
-
-impl PartialAggregationStatistics {
-    fn new() -> Self {
-        Self {
-            start: Instant::now(),
-            first_block_start: None,
-            processed_bytes: 0,
-            processed_rows: 0,
-        }
-    }
-
-    fn record_block(&mut self, rows: usize, bytes: usize) {
-        self.processed_rows += rows;
-        self.processed_bytes += bytes;
-        if self.first_block_start.is_none() {
-            self.first_block_start = Some(Instant::now());
-        }
-    }
-
-    fn log_finish_statistics(&self, hashtable: &AggregateHashTable) {
-        let elapsed = self.start.elapsed().as_secs_f64();
-        let real_elapsed = self
-            .first_block_start
-            .as_ref()
-            .map(|t| t.elapsed().as_secs_f64())
-            .unwrap_or(elapsed);
-
-        log::info!(
-            "[TRANSFORM-AGGREGATOR] Aggregation completed: {} → {} rows in {:.2}s (real: {:.2}s), throughput: {} rows/sec, {}/sec, total: {}",
-            self.processed_rows,
-            hashtable.payload.len(),
-            elapsed,
-            real_elapsed,
-            convert_number_size(self.processed_rows as f64 / elapsed),
-            convert_byte_size(self.processed_bytes as f64 / elapsed),
-            convert_byte_size(self.processed_bytes as f64),
-        );
-    }
-}
-
 enum Spiller {
     Standalone(NewAggregateSpiller),
     // (local_pos, spillers for all)
@@ -269,7 +221,7 @@ pub struct NewTransformPartialAggregate {
     hash_table: HashTable,
     probe_state: ProbeState,
     params: Arc<AggregatorParams>,
-    statistics: PartialAggregationStatistics,
+    statistics: AggregationStatistics,
     settings: MemorySettings,
     spillers: Spiller,
 }
@@ -302,7 +254,7 @@ impl NewTransformPartialAggregate {
                 hash_table,
                 probe_state: ProbeState::default(),
                 settings: MemorySettings::from_aggregate_settings(&ctx)?,
-                statistics: PartialAggregationStatistics::new(),
+                statistics: AggregationStatistics::new("NewPartialAggregate"),
                 spillers,
             },
         ))
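With the shared type in place, a completion log from the partial stage (format defined in `statistics.rs` below) would look roughly like the line here — the figures are illustrative, not from a real run:

```
[TRANSFORM-AGGREGATOR][NewPartialAggregate] Aggregation completed: 10000000 → 65536 rows in 2.50s (real: 2.31s), throughput: 4 million rows/sec, 122.07 MiB/sec, total: 305.18 MiB, hash index resizes: 3
```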
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs
index f41ec14819514..dab9289ab602c 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs
@@ -40,6 +40,7 @@ use super::new_final_aggregate_state::RoundPhase;
 use crate::pipelines::processors::transforms::aggregator::AggregateMeta;
 use crate::pipelines::processors::transforms::aggregator::AggregatePayload;
 use crate::pipelines::processors::transforms::aggregator::AggregatorParams;
+use crate::pipelines::processors::transforms::aggregator::statistics::AggregationStatistics;
 
 pub struct NewFinalAggregateTransform {
     input: Arc<InputPort>,
@@ -61,6 +62,8 @@ pub struct NewFinalAggregateTransform {
 
     /// spill
     spiller: NewAggregateSpiller,
+
+    statistics: AggregationStatistics,
 }
 
 impl NewFinalAggregateTransform {
@@ -88,6 +91,7 @@ impl NewFinalAggregateTransform {
             barrier,
             shared_state,
             spiller,
+            statistics: AggregationStatistics::new("NewFinalAggregate"),
         }))
     }
@@ -219,6 +223,9 @@ impl NewFinalAggregateTransform {
         match meta {
             AggregateMeta::Serialized(payload) => match agg_hashtable.as_mut() {
                 Some(ht) => {
+                    let rows = payload.data_block.num_rows();
+                    let bytes = payload.data_block.memory_size();
+                    self.statistics.record_block(rows, bytes);
                     let payload = payload.convert_to_partitioned_payload(
                         self.params.group_data_types.clone(),
                         self.params.aggregate_functions.clone(),
@@ -229,6 +236,9 @@
                     ht.combine_payloads(&payload, &mut self.flush_state)?;
                 }
                 None => {
+                    let rows = payload.data_block.num_rows();
+                    let bytes = payload.data_block.memory_size();
+                    self.statistics.record_block(rows, bytes);
                     agg_hashtable = Some(payload.convert_to_aggregate_table(
                         self.params.group_data_types.clone(),
                         self.params.aggregate_functions.clone(),
@@ -241,9 +251,15 @@
             },
             AggregateMeta::AggregatePayload(payload) => match agg_hashtable.as_mut() {
                 Some(ht) => {
+                    let rows = payload.payload.len();
+                    let bytes = payload.payload.memory_size();
+                    self.statistics.record_block(rows, bytes);
                     ht.combine_payload(&payload.payload, &mut self.flush_state)?;
                 }
                 None => {
+                    let rows = payload.payload.len();
+                    let bytes = payload.payload.memory_size();
+                    self.statistics.record_block(rows, bytes);
                     let capacity =
                         AggregateHashTable::get_capacity_for_count(payload.payload.len());
                     let mut hashtable = AggregateHashTable::new_with_capacity(
@@ -263,6 +279,7 @@
         }
 
         let output_block = if let Some(mut ht) = agg_hashtable {
+            self.statistics.log_finish_statistics(&ht);
             let mut blocks = vec![];
             self.flush_state.clear();
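The additions above all follow one shape: read `num_rows()`/`len()` and `memory_size()` before the payload is moved into the combine or convert step, then emit a single summary once the accumulated table is flushed. A condensed, self-contained sketch of that ordering; `Payload`, `RoundStats`, and `combine` are simplified stand-ins for `AggregatePayload`, the shared `AggregationStatistics`, and the hashtable combine step:

```rust
struct Payload {
    rows: usize,
    bytes: usize,
}

#[derive(Default)]
struct RoundStats {
    rows: usize,
    bytes: usize,
}

impl RoundStats {
    fn record_block(&mut self, rows: usize, bytes: usize) {
        self.rows += rows;
        self.bytes += bytes;
    }
}

fn combine(merged: &mut Vec<Payload>, payload: Payload) {
    merged.push(payload); // stand-in for the hashtable combine step
}

fn main() {
    let mut stats = RoundStats::default();
    let mut table: Option<Vec<Payload>> = None;

    for payload in (0..4).map(|_| Payload { rows: 65536, bytes: 1 << 20 }) {
        // Sizes are recorded before the payload is moved, mirroring the
        // record_block calls placed ahead of combine/convert above.
        stats.record_block(payload.rows, payload.bytes);
        match table.as_mut() {
            Some(t) => combine(t, payload),
            None => table = Some(vec![payload]),
        }
    }

    if table.is_some() {
        // One summary per round, mirroring log_finish_statistics.
        println!("round consumed {} rows, {} bytes", stats.rows, stats.bytes);
    }
}
```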
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/statistics.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/statistics.rs
new file mode 100644
index 0000000000000..ebf538a9c1f19
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/statistics.rs
@@ -0,0 +1,74 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::Instant;
+
+use databend_common_base::base::convert_byte_size;
+use databend_common_base::base::convert_number_size;
+use databend_common_expression::AggregateHashTable;
+
+pub struct AggregationStatistics {
+    stage: &'static str,
+    start: Instant,
+    first_block_start: Option<Instant>,
+    processed_bytes: usize,
+    processed_rows: usize,
+}
+
+impl AggregationStatistics {
+    pub fn new(stage: &'static str) -> Self {
+        Self {
+            stage,
+            start: Instant::now(),
+            first_block_start: None,
+            processed_bytes: 0,
+            processed_rows: 0,
+        }
+    }
+
+    pub fn record_block(&mut self, rows: usize, bytes: usize) {
+        self.processed_rows += rows;
+        self.processed_bytes += bytes;
+        if self.first_block_start.is_none() {
+            self.first_block_start = Some(Instant::now());
+        }
+    }
+
+    pub fn log_finish_statistics(&mut self, hashtable: &AggregateHashTable) {
+        let elapsed = self.start.elapsed().as_secs_f64();
+        let real_elapsed = self
+            .first_block_start
+            .as_ref()
+            .map(|t| t.elapsed().as_secs_f64())
+            .unwrap_or(elapsed);
+
+        log::info!(
+            "[TRANSFORM-AGGREGATOR][{}] Aggregation completed: {} → {} rows in {:.2}s (real: {:.2}s), throughput: {} rows/sec, {}/sec, total: {}, hash index resizes: {}",
+            self.stage,
+            self.processed_rows,
+            hashtable.payload.len(),
+            elapsed,
+            real_elapsed,
+            convert_number_size(self.processed_rows as f64 / elapsed),
+            convert_byte_size(self.processed_bytes as f64 / elapsed),
+            convert_byte_size(self.processed_bytes as f64),
+            hashtable.hash_index_resize_count(),
+        );
+
+        self.processed_rows = 0;
+        self.processed_bytes = 0;
+        self.first_block_start = None;
+        self.start = Instant::now();
+    }
+}
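Two details of the new module are worth noting. First, `real` time is measured from the first recorded block rather than from construction, so time the operator spends idle waiting for upstream input is excluded from the throughput picture. Second, unlike the `PartialAggregationStatistics` it replaces, `log_finish_statistics` takes `&mut self` and resets all counters afterwards, which would let a transform that runs several rounds report each round separately. A minimal sketch of the two time windows, using only `std`:

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

// `elapsed` counts from construction, `real` only from the first block.
fn main() {
    let start = Instant::now();
    let mut first_block_start: Option<Instant> = None;

    sleep(Duration::from_millis(50)); // operator idle, no input yet
    first_block_start.get_or_insert_with(Instant::now); // first block arrives
    sleep(Duration::from_millis(50)); // time spent actually processing

    let elapsed = start.elapsed().as_secs_f64();
    let real_elapsed = first_block_start
        .as_ref()
        .map(|t| t.elapsed().as_secs_f64())
        .unwrap_or(elapsed);

    // elapsed ≈ 0.10s, real ≈ 0.05s: `real` excludes the idle window.
    println!("elapsed: {:.2}s, real: {:.2}s", elapsed, real_elapsed);
}
```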