9 changes: 9 additions & 0 deletions src/query/expression/src/aggregate/aggregate_hashtable.rs
@@ -46,6 +46,7 @@ pub struct AggregateHashTable {

current_radix_bits: u64,
hash_index: HashIndex,
hash_index_resize_count: usize,
}

unsafe impl Send for AggregateHashTable {}
@@ -80,6 +81,7 @@ impl AggregateHashTable {
),
hash_index: HashIndex::with_capacity(capacity),
config,
hash_index_resize_count: 0,
}
}

@@ -113,6 +115,7 @@ impl AggregateHashTable {
capacity_mask: capacity - 1,
},
config,
hash_index_resize_count: 0,
}
}

@@ -398,10 +401,12 @@ impl AggregateHashTable {
if self.hash_index.capacity == self.config.max_partial_capacity {
return;
}
self.hash_index_resize_count += 1;
self.hash_index = HashIndex::with_capacity(new_capacity);
return;
}

self.hash_index_resize_count += 1;
let mut hash_index = HashIndex::with_capacity(new_capacity);

// iterate over payloads and copy to new entries
@@ -454,4 +459,8 @@ impl AggregateHashTable {
.sum::<usize>()
+ self.hash_index.allocated_bytes()
}

pub fn hash_index_resize_count(&self) -> usize {
self.hash_index_resize_count
}
}
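Both resize paths above bump the new counter: the partial-aggregation path that swaps in a fresh index (unless the index is already at `max_partial_capacity`), and the general path that rebuilds the index and re-inserts every existing entry from the payloads. For intuition about the magnitudes this exposes, here is a self-contained sketch — not Databend code; the doubling growth policy and the 0.7 load factor are assumptions made purely for illustration:

```rust
// Standalone sketch, not Databend code: estimate how many times a
// doubling hash index must rebuild before it can hold `groups` entries.
// The 0.7 load factor and the doubling policy are illustrative assumptions.
fn estimated_resizes(mut capacity: usize, groups: usize, load_factor: f64) -> usize {
    let mut resizes = 0;
    while (groups as f64) > capacity as f64 * load_factor {
        capacity *= 2;
        resizes += 1;
    }
    resizes
}

fn main() {
    // Starting from 64 slots, one million distinct groups imply 15 rebuilds
    // under these assumptions — the kind of figure the new log field surfaces.
    println!("{}", estimated_resizes(64, 1_000_000, 0.7));
}
```

Since the general resize path re-inserts all existing entries on each rebuild, a large value in the new log field is a hint that the initial capacity estimate was too small for the workload.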
@@ -18,6 +18,7 @@ mod aggregator_params;
mod build_partition_bucket;
mod new_aggregate;
mod serde;
mod statistics;
mod transform_aggregate_expand;
mod transform_aggregate_final;
mod transform_aggregate_partial;
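The new `mod statistics;` line registers the module with the aggregator's parent module so that sibling transforms can import it by path, as the two files below do. A toy illustration of that visibility rule — a single-file crate layout, not Databend's:

```rust
// Toy layout, not Databend's: a module must be declared in its parent
// before sibling modules can path to its items.
mod statistics {
    pub struct AggregationStatistics;
}

mod transform {
    // Sibling modules reach the type through the shared parent path.
    pub fn make() -> crate::statistics::AggregationStatistics {
        crate::statistics::AggregationStatistics
    }
}

fn main() {
    let _stats = transform::make();
}
```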
@@ -13,12 +13,9 @@
// limitations under the License.

use std::sync::Arc;
use std::time::Instant;
use std::vec;

use bumpalo::Bump;
use databend_common_base::base::convert_byte_size;
use databend_common_base::base::convert_number_size;
use databend_common_catalog::plan::AggIndexMeta;
use databend_common_exception::Result;
use databend_common_expression::AggregateHashTable;
@@ -52,6 +49,7 @@ use crate::pipelines::processors::transforms::aggregator::SharedPartitionStream;
use crate::pipelines::processors::transforms::aggregator::aggregate_exchange_injector::scatter_partitioned_payload;
use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta;
use crate::pipelines::processors::transforms::aggregator::exchange_defines;
use crate::pipelines::processors::transforms::aggregator::statistics::AggregationStatistics;
use crate::servers::flight::v1::exchange::serde::serialize_block;
use crate::sessions::QueryContext;

@@ -67,52 +65,6 @@ impl Default for HashTable {
}
}

struct PartialAggregationStatistics {
start: Instant,
first_block_start: Option<Instant>,
processed_bytes: usize,
processed_rows: usize,
}

impl PartialAggregationStatistics {
fn new() -> Self {
Self {
start: Instant::now(),
first_block_start: None,
processed_bytes: 0,
processed_rows: 0,
}
}

fn record_block(&mut self, rows: usize, bytes: usize) {
self.processed_rows += rows;
self.processed_bytes += bytes;
if self.first_block_start.is_none() {
self.first_block_start = Some(Instant::now());
}
}

fn log_finish_statistics(&self, hashtable: &AggregateHashTable) {
let elapsed = self.start.elapsed().as_secs_f64();
let real_elapsed = self
.first_block_start
.as_ref()
.map(|t| t.elapsed().as_secs_f64())
.unwrap_or(elapsed);

log::info!(
"[TRANSFORM-AGGREGATOR] Aggregation completed: {} → {} rows in {:.2}s (real: {:.2}s), throughput: {} rows/sec, {}/sec, total: {}",
self.processed_rows,
hashtable.payload.len(),
elapsed,
real_elapsed,
convert_number_size(self.processed_rows as f64 / elapsed),
convert_byte_size(self.processed_bytes as f64 / elapsed),
convert_byte_size(self.processed_bytes as f64),
);
}
}

enum Spiller {
Standalone(NewAggregateSpiller),
// (local_pos, spillers for all)
@@ -269,7 +221,7 @@ pub struct NewTransformPartialAggregate {
hash_table: HashTable,
probe_state: ProbeState,
params: Arc<AggregatorParams>,
statistics: PartialAggregationStatistics,
statistics: AggregationStatistics,
settings: MemorySettings,
spillers: Spiller,
}
@@ -302,7 +254,7 @@ impl NewTransformPartialAggregate {
hash_table,
probe_state: ProbeState::default(),
settings: MemorySettings::from_aggregate_settings(&ctx)?,
statistics: PartialAggregationStatistics::new(),
statistics: AggregationStatistics::new("NewPartialAggregate"),
spillers,
},
))
@@ -40,6 +40,7 @@ use super::new_final_aggregate_state::RoundPhase;
use crate::pipelines::processors::transforms::aggregator::AggregateMeta;
use crate::pipelines::processors::transforms::aggregator::AggregatePayload;
use crate::pipelines::processors::transforms::aggregator::AggregatorParams;
use crate::pipelines::processors::transforms::aggregator::statistics::AggregationStatistics;

pub struct NewFinalAggregateTransform {
input: Arc<InputPort>,
@@ -61,6 +62,8 @@ pub struct NewFinalAggregateTransform {

/// spill
spiller: NewAggregateSpiller,

statistics: AggregationStatistics,
}

impl NewFinalAggregateTransform {
@@ -88,6 +91,7 @@ impl NewFinalAggregateTransform {
barrier,
shared_state,
spiller,
statistics: AggregationStatistics::new("NewFinalAggregate"),
}))
}

@@ -219,6 +223,9 @@ impl NewFinalAggregateTransform {
match meta {
AggregateMeta::Serialized(payload) => match agg_hashtable.as_mut() {
Some(ht) => {
let rows = payload.data_block.num_rows();
let bytes = payload.data_block.memory_size();
self.statistics.record_block(rows, bytes);
let payload = payload.convert_to_partitioned_payload(
self.params.group_data_types.clone(),
self.params.aggregate_functions.clone(),
@@ -229,6 +236,9 @@
ht.combine_payloads(&payload, &mut self.flush_state)?;
}
None => {
let rows = payload.data_block.num_rows();
let bytes = payload.data_block.memory_size();
self.statistics.record_block(rows, bytes);
agg_hashtable = Some(payload.convert_to_aggregate_table(
self.params.group_data_types.clone(),
self.params.aggregate_functions.clone(),
@@ … @@
},
AggregateMeta::AggregatePayload(payload) => match agg_hashtable.as_mut() {
Some(ht) => {
let rows = payload.payload.len();
let bytes = payload.payload.memory_size();
self.statistics.record_block(rows, bytes);
ht.combine_payload(&payload.payload, &mut self.flush_state)?;
}
None => {
let rows = payload.payload.len();
let bytes = payload.payload.memory_size();
self.statistics.record_block(rows, bytes);
let capacity =
AggregateHashTable::get_capacity_for_count(payload.payload.len());
let mut hashtable = AggregateHashTable::new_with_capacity(
@@ … @@
}

let output_block = if let Some(mut ht) = agg_hashtable {
self.statistics.log_finish_statistics(&ht);
let mut blocks = vec![];
self.flush_state.clear();

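In the transform above, every incoming `AggregateMeta` payload is measured with `record_block` before it is combined into the hash table, and `log_finish_statistics` fires once when the accumulated table is flushed. A simplified, self-contained stand-in for that lifecycle — toy types, not the actual transform:

```rust
// Simplified stand-in for the record-then-log lifecycle used by
// NewFinalAggregateTransform; the real code records DataBlock/Payload
// sizes and logs through AggregationStatistics.
struct Stats {
    rows: usize,
    bytes: usize,
}

impl Stats {
    fn record_block(&mut self, rows: usize, bytes: usize) {
        self.rows += rows;
        self.bytes += bytes;
    }
    fn log_finish(&mut self) {
        println!("combined {} rows ({} bytes)", self.rows, self.bytes);
        // Counters reset so the next round starts from zero, mirroring
        // log_finish_statistics in the statistics module below.
        self.rows = 0;
        self.bytes = 0;
    }
}

fn main() {
    let mut stats = Stats { rows: 0, bytes: 0 };
    for (rows, bytes) in [(1_000, 64_000), (2_000, 128_000)] {
        stats.record_block(rows, bytes); // once per incoming payload
    }
    stats.log_finish(); // once per flush
}
```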
@@ -0,0 +1,74 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Instant;

use databend_common_base::base::convert_byte_size;
use databend_common_base::base::convert_number_size;
use databend_common_expression::AggregateHashTable;

pub struct AggregationStatistics {
stage: &'static str,
start: Instant,
first_block_start: Option<Instant>,
processed_bytes: usize,
processed_rows: usize,
}

impl AggregationStatistics {
pub fn new(stage: &'static str) -> Self {
Self {
stage,
start: Instant::now(),
first_block_start: None,
processed_bytes: 0,
processed_rows: 0,
}
}

pub fn record_block(&mut self, rows: usize, bytes: usize) {
self.processed_rows += rows;
self.processed_bytes += bytes;
if self.first_block_start.is_none() {
self.first_block_start = Some(Instant::now());
}
}

pub fn log_finish_statistics(&mut self, hashtable: &AggregateHashTable) {
let elapsed = self.start.elapsed().as_secs_f64();
let real_elapsed = self
.first_block_start
.as_ref()
.map(|t| t.elapsed().as_secs_f64())
.unwrap_or(elapsed);

log::info!(
"[TRANSFORM-AGGREGATOR][{}] Aggregation completed: {} → {} rows in {:.2}s (real: {:.2}s), throughput: {} rows/sec, {}/sec, total: {}, hash index resizes: {}",
self.stage,
self.processed_rows,
hashtable.payload.len(),
elapsed,
real_elapsed,
convert_number_size(self.processed_rows as f64 / elapsed),
convert_byte_size(self.processed_bytes as f64 / elapsed),
convert_byte_size(self.processed_bytes as f64),
hashtable.hash_index_resize_count(),
);

self.processed_rows = 0;
self.processed_bytes = 0;
self.first_block_start = None;
self.start = Instant::now();
}
}
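The throughput fields in the log line are plain ratios over wall-clock seconds; `convert_number_size` and `convert_byte_size` only render the raw values in humanized form. A worked example of that arithmetic with illustrative inputs — the values are assumptions, not measurements:

```rust
// Illustrative arithmetic only: how the throughput fields in the log line
// above are derived. The row and byte counts are made-up inputs.
fn main() {
    let processed_rows = 10_000_000_usize; // assumed, not measured
    let processed_bytes = 2_147_483_648_usize; // assumed 2 GiB of input
    let elapsed = 4.0_f64; // seconds since `start`

    println!("{:.0} rows/sec", processed_rows as f64 / elapsed); // 2500000
    println!("{:.0} bytes/sec", processed_bytes as f64 / elapsed); // 536870912
}
```

The separate `real` figure, measured from the first recorded block, shows how much of `elapsed` was spent waiting before any input arrived; resetting the counters at the end lets a single statistics instance span multiple rounds.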