diff --git a/src/query/expression/src/aggregate/aggregate_hashtable.rs b/src/query/expression/src/aggregate/aggregate_hashtable.rs index 19774a6d437a8..246199700f423 100644 --- a/src/query/expression/src/aggregate/aggregate_hashtable.rs +++ b/src/query/expression/src/aggregate/aggregate_hashtable.rs @@ -26,6 +26,7 @@ use super::HashTableConfig; use super::LOAD_FACTOR; use super::MAX_PAGE_SIZE; use super::Payload; +use super::ProbeBatchStat; use super::group_hash_entries; use super::hash_index::AdapterImpl; use super::hash_index::HashIndex; @@ -113,6 +114,8 @@ impl AggregateHashTable { count: 0, capacity, capacity_mask: capacity - 1, + probe_batch_stats: Vec::new(), + probe_batch_index: 0, }, config, hash_index_resize_count: 0, @@ -402,7 +405,10 @@ impl AggregateHashTable { return; } self.hash_index_resize_count += 1; - self.hash_index = HashIndex::with_capacity(new_capacity); + let mut hash_index = HashIndex::with_capacity(new_capacity); + hash_index.probe_batch_stats = std::mem::take(&mut self.hash_index.probe_batch_stats); + hash_index.probe_batch_index = self.hash_index.probe_batch_index; + self.hash_index = hash_index; return; } @@ -433,6 +439,8 @@ impl AggregateHashTable { } } + hash_index.probe_batch_stats = std::mem::take(&mut self.hash_index.probe_batch_stats); + hash_index.probe_batch_index = self.hash_index.probe_batch_index; self.hash_index = hash_index } @@ -463,4 +471,8 @@ impl AggregateHashTable { pub fn hash_index_resize_count(&self) -> usize { self.hash_index_resize_count } + + pub fn take_probe_batch_stats(&mut self) -> Vec { + self.hash_index.take_probe_batch_stats() + } } diff --git a/src/query/expression/src/aggregate/hash_index.rs b/src/query/expression/src/aggregate/hash_index.rs index b8032982b828d..ea26cd3d3bd07 100644 --- a/src/query/expression/src/aggregate/hash_index.rs +++ b/src/query/expression/src/aggregate/hash_index.rs @@ -26,10 +26,20 @@ pub(super) struct HashIndex { pub count: usize, pub capacity: usize, pub capacity_mask: usize, + pub probe_batch_stats: Vec, + pub probe_batch_index: usize, } const INCREMENT_BITS: usize = 5; +#[derive(Clone, Debug)] +pub struct ProbeBatchStat { + pub batch_index: usize, + pub row_count: usize, + pub avg_probe_distance: f64, + pub median_probe_distance: f64, +} + /// Derive an odd probing step from the high bits of the hash so the walk spans all slots. /// /// this will generate a step in the range [1, 2^INCREMENT_BITS) based on hash and always odd. @@ -60,26 +70,30 @@ impl HashIndex { count: 0, capacity, capacity_mask, + probe_batch_stats: Vec::new(), + probe_batch_index: 0, } } - fn find_or_insert(&mut self, mut slot: usize, hash: u64) -> (usize, bool) { + fn find_or_insert(&mut self, mut slot: usize, hash: u64) -> (usize, bool, usize) { let salt = Entry::hash_to_salt(hash); let entries = self.entries.as_mut_slice(); + let mut probe_distance = 0; loop { debug_assert!(entries.get(slot).is_some()); // SAFETY: slot is always in range let entry = unsafe { entries.get_unchecked_mut(slot) }; if entry.is_occupied() { if entry.get_salt() == salt { - return (slot, false); + return (slot, false, probe_distance); } else { slot = next_slot(slot, hash, self.capacity_mask); + probe_distance += 1; continue; } } else { entry.set_salt(salt); - return (slot, true); + return (slot, true, probe_distance); } } } @@ -186,6 +200,7 @@ impl HashIndex { for (i, row) in state.no_match_vector[..row_count].iter_mut().enumerate() { *row = i.into(); state.slots[i] = init_slot(state.group_hashes[i], self.capacity_mask); + state.probe_distances[i] = 0; } let mut new_group_count = 0; @@ -199,9 +214,10 @@ impl HashIndex { // 1. inject new_group_count, new_entry_count, need_compare_count, no_match_count for row in state.no_match_vector[..remaining_entries].iter().copied() { let slot = &mut state.slots[row]; - let is_new; - - (*slot, is_new) = self.find_or_insert(*slot, state.group_hashes[row]); + let (new_slot, is_new, probe_distance) = + self.find_or_insert(*slot, state.group_hashes[row]); + *slot = new_slot; + state.probe_distances[row.to_usize()] += probe_distance; if is_new { state.empty_vector[new_entry_count] = row; new_entry_count += 1; @@ -246,14 +262,46 @@ impl HashIndex { let slot = &mut state.slots[row]; let hash = state.group_hashes[row]; *slot = next_slot(*slot, hash, self.capacity_mask); + state.probe_distances[row.to_usize()] += 1; } remaining_entries = no_match_count; } + self.record_probe_batch_stats(&state.probe_distances[..row_count]); self.count += new_group_count; new_group_count } + + fn record_probe_batch_stats(&mut self, probe_distances: &[usize]) { + if probe_distances.is_empty() { + return; + } + + let mut distances = probe_distances.to_vec(); + let count = distances.len(); + let sum: u64 = distances.iter().map(|value| *value as u64).sum(); + let avg = sum as f64 / count as f64; + + distances.sort_unstable(); + let median = if count % 2 == 1 { + distances[count / 2] as f64 + } else { + (distances[count / 2 - 1] as f64 + distances[count / 2] as f64) / 2.0 + }; + + self.probe_batch_stats.push(ProbeBatchStat { + batch_index: self.probe_batch_index, + row_count: count, + avg_probe_distance: avg, + median_probe_distance: median, + }); + self.probe_batch_index += 1; + } + + pub fn take_probe_batch_stats(&mut self) -> Vec { + std::mem::take(&mut self.probe_batch_stats) + } } pub(super) struct AdapterImpl<'a> { diff --git a/src/query/expression/src/aggregate/mod.rs b/src/query/expression/src/aggregate/mod.rs index d8629cb2bc557..d91af960eb5e6 100644 --- a/src/query/expression/src/aggregate/mod.rs +++ b/src/query/expression/src/aggregate/mod.rs @@ -36,6 +36,7 @@ pub use aggregate_function_state::*; pub use aggregate_hashtable::*; pub use group_hash::*; use hash_index::Entry; +pub use hash_index::ProbeBatchStat; pub use partitioned_payload::*; pub use payload::*; pub use payload_flush::*; diff --git a/src/query/expression/src/aggregate/probe_state.rs b/src/query/expression/src/aggregate/probe_state.rs index 7b9b04243fb26..463e5b4c1b26c 100644 --- a/src/query/expression/src/aggregate/probe_state.rs +++ b/src/query/expression/src/aggregate/probe_state.rs @@ -35,6 +35,7 @@ pub struct ProbeState { pub(super) group_compare_vector: [RowID; BATCH_SIZE], pub(super) no_match_vector: [RowID; BATCH_SIZE], pub(super) slots: [usize; BATCH_SIZE], + pub(super) probe_distances: [usize; BATCH_SIZE], pub(super) row_count: usize, pub partition_entries: Vec<(u16, SelectVector)>, @@ -51,6 +52,7 @@ impl Default for ProbeState { no_match_vector: [RowID::default(); BATCH_SIZE], empty_vector: [RowID::default(); BATCH_SIZE], slots: [0; BATCH_SIZE], + probe_distances: [0; BATCH_SIZE], row_count: 0, partition_entries: vec![], diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs index 81fe71cc24448..2a9d3c06ca08b 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs @@ -27,11 +27,13 @@ use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::DataBlock; use databend_common_expression::HashTableConfig; use databend_common_expression::PayloadFlushState; +use databend_common_expression::ProbeBatchStat; use databend_common_pipeline::core::Event; use databend_common_pipeline::core::InputPort; use databend_common_pipeline::core::OutputPort; use databend_common_pipeline::core::Processor; use databend_common_pipeline_transforms::MemorySettings; +use log::info; use crate::pipelines::memory_settings::MemorySettingsExt; use crate::pipelines::processors::transforms::aggregator::AggregateMeta; @@ -256,6 +258,8 @@ impl NewTransformFinalAggregate { let config = ht.config.clone(); self.statistics.log_finish_statistics(&ht); + let probe_stats = ht.take_probe_batch_stats(); + log_probe_batch_stats(&probe_stats); let mut blocks = vec![]; self.flush_state.clear(); @@ -315,6 +319,23 @@ impl NewTransformFinalAggregate { } } +fn log_probe_batch_stats(stats: &[ProbeBatchStat]) { + if stats.is_empty() { + return; + } + + info!( + "hash_index probe_distance batches: total_batches={}", + stats.len() + ); + for stat in stats.iter().rev().take(100).rev() { + info!( + "hash_index probe_distance batch={} count={} avg={:.3} median={:.3}", + stat.batch_index, stat.row_count, stat.avg_probe_distance, stat.median_probe_distance + ); + } +} + #[async_trait::async_trait] impl Processor for NewTransformFinalAggregate { fn name(&self) -> String { diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs index e095bc3f6b17d..8d05c5c7fe14c 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs @@ -20,11 +20,13 @@ use databend_common_expression::AggregateHashTable; use databend_common_expression::DataBlock; use databend_common_expression::HashTableConfig; use databend_common_expression::PayloadFlushState; +use databend_common_expression::ProbeBatchStat; use databend_common_pipeline::core::InputPort; use databend_common_pipeline::core::OutputPort; use databend_common_pipeline::core::Processor; use databend_common_pipeline_transforms::processors::BlockMetaTransform; use databend_common_pipeline_transforms::processors::BlockMetaTransformer; +use log::info; use crate::pipelines::processors::transforms::aggregator::AggregatorParams; use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta; @@ -112,6 +114,9 @@ impl TransformFinalAggregate { let mut blocks = vec![]; self.flush_state.clear(); + let probe_stats = ht.take_probe_batch_stats(); + log_probe_batch_stats(&probe_stats); + loop { if ht.merge_result(&mut self.flush_state)? { let mut entries = self.flush_state.take_aggregate_results(); @@ -134,6 +139,23 @@ impl TransformFinalAggregate { } } +fn log_probe_batch_stats(stats: &[ProbeBatchStat]) { + if stats.is_empty() { + return; + } + + info!( + "hash_index probe_distance batches: total_batches={}", + stats.len() + ); + for stat in stats { + info!( + "hash_index probe_distance batch={} count={} avg={:.3} median={:.3}", + stat.batch_index, stat.row_count, stat.avg_probe_distance, stat.median_probe_distance + ); + } +} + impl BlockMetaTransform for TransformFinalAggregate { const NAME: &'static str = "TransformFinalAggregate";